diff --git a/pkg/sfu/buffer/buffer_base.go b/pkg/sfu/buffer/buffer_base.go index b288296d2..3dd24387d 100644 --- a/pkg/sfu/buffer/buffer_base.go +++ b/pkg/sfu/buffer/buffer_base.go @@ -272,6 +272,8 @@ func (b *BufferBase) SetLogger(lgr logger.Logger) { } func (b *BufferBase) Bind(rtpParameters webrtc.RTPParameters, codec webrtc.RTPCodecCapability, bitrate int) error { + b.logger.Debugw("binding track") + b.Lock() defer b.Unlock() @@ -487,6 +489,8 @@ func (b *BufferBase) stopRTPStats(reason string) (stats *livekit.RTPStats, stats } func (b *BufferBase) RestartStream(reason string) { + b.logger.Infow("stream restart", "reason", reason) + b.Lock() defer b.Unlock() diff --git a/pkg/sfu/receiver_base.go b/pkg/sfu/receiver_base.go index 8293abbd7..a18e83ca9 100644 --- a/pkg/sfu/receiver_base.go +++ b/pkg/sfu/receiver_base.go @@ -368,6 +368,13 @@ func (r *ReceiverBase) Restart(reason string) { } func (r *ReceiverBase) restartInternal(reason string, isDetected bool) { + r.params.Logger.Debugw( + "restart receiver", + "reason", reason, + "isDetected", isDetected, + "isClosed", r.IsClosed(), + ) + if r.IsClosed() { return } @@ -375,6 +382,7 @@ func (r *ReceiverBase) restartInternal(reason string, isDetected bool) { // 1. guard against concurrent restarts r.bufferMu.Lock() if r.restartInProgress { + r.params.Logger.Debugw("restart receiver, skipping duplicate") r.bufferMu.Unlock() return } @@ -382,30 +390,38 @@ func (r *ReceiverBase) restartInternal(reason string, isDetected bool) { // 2. advance forwarder generation r.forwardersGeneration.Inc() + r.params.Logger.Debugw( + "restart receiver, advanced forwarder generation", + "forwardersGeneration", r.forwardersGeneration.Load(), + ) r.bufferMu.Unlock() // 3. restart all the buffers - // if a stream was detected, skip external restart + // if a stream restart was detected, skip external restart // // NOTE: The case of external restart and detected restart (which usually comes from one buffer) // racing will miss restart on all buffers if detected restart from one buffer adds the guard // against concurrent restart. But, that condition should be very rare if at all. // External restart happens when the underlying track changes or when seeking if !isDetected { - for _, buff := range r.GetAllBuffers() { + for layer, buff := range r.GetAllBuffers() { if buff == nil { continue } + r.params.Logger.Debugw("restart receiver, restarting buffer", "layer", layer) buff.RestartStream(reason) } + r.params.Logger.Debugw("restart receiver, restarted buffers") } // 4. wait for the forwarders to finish r.waitForForwardersStop() + r.params.Logger.Debugw("restart receiver, forwarders stopped") // 5. reset stream tracker r.streamTrackerManager.RemoveAllTrackers() + r.params.Logger.Debugw("restart receiver, stream trackers removed") // 6. signal attached downtracks to resync so that they can have proper sequencing on a receiver restart r.downTrackSpreader.Broadcast(func(dt TrackSender) { @@ -414,9 +430,14 @@ func (r *ReceiverBase) restartInternal(reason string, isDetected bool) { if rt := r.loadREDTransformer(); rt != nil { rt.OnStreamRestart() } + r.params.Logger.Debugw("restart receiver, down tracks signalled") // 7. move forwarder generation ahead r.startForwarderGeneration() + r.params.Logger.Debugw( + "restart receiver, restarted forwarder generation", + "forwardersGeneration", r.forwardersGeneration.Load(), + ) r.bufferMu.Lock() // 8. release restart hold @@ -428,8 +449,10 @@ func (r *ReceiverBase) restartInternal(reason string, isDetected bool) { continue } + r.params.Logger.Debugw("restart receiver, restarting forwarder", "layer", layer) r.startForwarderForBufferLocked(int32(layer), buff) } + r.params.Logger.Debugw("restart receiver, restarted forwarders") r.bufferMu.Unlock() } @@ -888,16 +911,17 @@ func (r *ReceiverBase) startForwarderGeneration() { func (r *ReceiverBase) waitForForwardersStop() { r.bufferMu.Lock() - forwarderWaitGroup := r.forwardersWaitGroup + forwardersWaitGroup := r.forwardersWaitGroup r.bufferMu.Unlock() - if forwarderWaitGroup != nil { - forwarderWaitGroup.Wait() + if forwardersWaitGroup != nil { + forwardersWaitGroup.Wait() } } func (r *ReceiverBase) startForwarderForBufferLocked(layer int32, buff buffer.BufferProvider) { if r.restartInProgress { + r.params.Logger.Debugw("restart in progress, deferring starting forwarder", "layer", layer) return }