mirror of
https://github.com/livekit/livekit.git
synced 2026-05-15 07:25:30 +00:00
Add debug for receiver restart. (#4328)
* Add debug for receiver restart. Have a suspicion that something is deadlocking between restart receiver and buffer bind during replay. Adding debug to get a better picture of state of receiver restart. * consistent logging
This commit is contained in:
@@ -272,6 +272,8 @@ func (b *BufferBase) SetLogger(lgr logger.Logger) {
|
||||
}
|
||||
|
||||
func (b *BufferBase) Bind(rtpParameters webrtc.RTPParameters, codec webrtc.RTPCodecCapability, bitrate int) error {
|
||||
b.logger.Debugw("binding track")
|
||||
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
|
||||
@@ -487,6 +489,8 @@ func (b *BufferBase) stopRTPStats(reason string) (stats *livekit.RTPStats, stats
|
||||
}
|
||||
|
||||
func (b *BufferBase) RestartStream(reason string) {
|
||||
b.logger.Infow("stream restart", "reason", reason)
|
||||
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
|
||||
|
||||
@@ -368,6 +368,13 @@ func (r *ReceiverBase) Restart(reason string) {
|
||||
}
|
||||
|
||||
func (r *ReceiverBase) restartInternal(reason string, isDetected bool) {
|
||||
r.params.Logger.Debugw(
|
||||
"restart receiver",
|
||||
"reason", reason,
|
||||
"isDetected", isDetected,
|
||||
"isClosed", r.IsClosed(),
|
||||
)
|
||||
|
||||
if r.IsClosed() {
|
||||
return
|
||||
}
|
||||
@@ -375,6 +382,7 @@ func (r *ReceiverBase) restartInternal(reason string, isDetected bool) {
|
||||
// 1. guard against concurrent restarts
|
||||
r.bufferMu.Lock()
|
||||
if r.restartInProgress {
|
||||
r.params.Logger.Debugw("restart receiver, skipping duplicate")
|
||||
r.bufferMu.Unlock()
|
||||
return
|
||||
}
|
||||
@@ -382,30 +390,38 @@ func (r *ReceiverBase) restartInternal(reason string, isDetected bool) {
|
||||
|
||||
// 2. advance forwarder generation
|
||||
r.forwardersGeneration.Inc()
|
||||
r.params.Logger.Debugw(
|
||||
"restart receiver, advanced forwarder generation",
|
||||
"forwardersGeneration", r.forwardersGeneration.Load(),
|
||||
)
|
||||
r.bufferMu.Unlock()
|
||||
|
||||
// 3. restart all the buffers
|
||||
// if a stream was detected, skip external restart
|
||||
// if a stream restart was detected, skip external restart
|
||||
//
|
||||
// NOTE: The case of external restart and detected restart (which usually comes from one buffer)
|
||||
// racing will miss restart on all buffers if detected restart from one buffer adds the guard
|
||||
// against concurrent restart. But, that condition should be very rare if at all.
|
||||
// External restart happens when the underlying track changes or when seeking
|
||||
if !isDetected {
|
||||
for _, buff := range r.GetAllBuffers() {
|
||||
for layer, buff := range r.GetAllBuffers() {
|
||||
if buff == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
r.params.Logger.Debugw("restart receiver, restarting buffer", "layer", layer)
|
||||
buff.RestartStream(reason)
|
||||
}
|
||||
r.params.Logger.Debugw("restart receiver, restarted buffers")
|
||||
}
|
||||
|
||||
// 4. wait for the forwarders to finish
|
||||
r.waitForForwardersStop()
|
||||
r.params.Logger.Debugw("restart receiver, forwarders stopped")
|
||||
|
||||
// 5. reset stream tracker
|
||||
r.streamTrackerManager.RemoveAllTrackers()
|
||||
r.params.Logger.Debugw("restart receiver, stream trackers removed")
|
||||
|
||||
// 6. signal attached downtracks to resync so that they can have proper sequencing on a receiver restart
|
||||
r.downTrackSpreader.Broadcast(func(dt TrackSender) {
|
||||
@@ -414,9 +430,14 @@ func (r *ReceiverBase) restartInternal(reason string, isDetected bool) {
|
||||
if rt := r.loadREDTransformer(); rt != nil {
|
||||
rt.OnStreamRestart()
|
||||
}
|
||||
r.params.Logger.Debugw("restart receiver, down tracks signalled")
|
||||
|
||||
// 7. move forwarder generation ahead
|
||||
r.startForwarderGeneration()
|
||||
r.params.Logger.Debugw(
|
||||
"restart receiver, restarted forwarder generation",
|
||||
"forwardersGeneration", r.forwardersGeneration.Load(),
|
||||
)
|
||||
|
||||
r.bufferMu.Lock()
|
||||
// 8. release restart hold
|
||||
@@ -428,8 +449,10 @@ func (r *ReceiverBase) restartInternal(reason string, isDetected bool) {
|
||||
continue
|
||||
}
|
||||
|
||||
r.params.Logger.Debugw("restart receiver, restarting forwarder", "layer", layer)
|
||||
r.startForwarderForBufferLocked(int32(layer), buff)
|
||||
}
|
||||
r.params.Logger.Debugw("restart receiver, restarted forwarders")
|
||||
r.bufferMu.Unlock()
|
||||
}
|
||||
|
||||
@@ -888,16 +911,17 @@ func (r *ReceiverBase) startForwarderGeneration() {
|
||||
|
||||
func (r *ReceiverBase) waitForForwardersStop() {
|
||||
r.bufferMu.Lock()
|
||||
forwarderWaitGroup := r.forwardersWaitGroup
|
||||
forwardersWaitGroup := r.forwardersWaitGroup
|
||||
r.bufferMu.Unlock()
|
||||
|
||||
if forwarderWaitGroup != nil {
|
||||
forwarderWaitGroup.Wait()
|
||||
if forwardersWaitGroup != nil {
|
||||
forwardersWaitGroup.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
func (r *ReceiverBase) startForwarderForBufferLocked(layer int32, buff buffer.BufferProvider) {
|
||||
if r.restartInProgress {
|
||||
r.params.Logger.Debugw("restart in progress, deferring starting forwarder", "layer", layer)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user