diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 98f3f4c12..86188797b 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -269,7 +269,9 @@ type DownTrack struct { pacer pacer.Pacer - maxLayerNotifierCh chan struct{} + maxLayerNotifierChMu sync.RWMutex + maxLayerNotifierCh chan struct{} + maxLayerNotifierChClosed bool cbMu sync.RWMutex onStatsUpdate func(dt *DownTrack, stat *livekit.AnalyticsStat) @@ -641,14 +643,14 @@ func (d *DownTrack) postMaxLayerNotifierEvent() { return } - d.bindLock.Lock() - if !d.IsClosed() { + d.maxLayerNotifierChMu.RLock() + if !d.maxLayerNotifierChClosed { select { case d.maxLayerNotifierCh <- struct{}{}: default: } } - d.bindLock.Unlock() + d.maxLayerNotifierChMu.RUnlock() } func (d *DownTrack) maxLayerNotifierWorker() { @@ -934,13 +936,12 @@ func (d *DownTrack) Close() { // 2. in case of session migration, participant migrate from other node, video track should // be resumed with same participant, set flush=false since we don't need to flush decoder. func (d *DownTrack) CloseWithFlush(flush bool) { - d.bindLock.Lock() if d.isClosed.Swap(true) { - d.bindLock.Unlock() // already closed return } + d.bindLock.Lock() d.params.Logger.Debugw("close down track", "flushBlankFrame", flush) if d.bound.Load() { d.forwarder.Mute(true, true) @@ -975,12 +976,16 @@ func (d *DownTrack) CloseWithFlush(flush bool) { d.rtcpReader.OnPacket(nil) } - close(d.maxLayerNotifierCh) d.bindLock.Unlock() d.connectionStats.Close() d.rtpStats.Stop() d.params.Logger.Infow("rtp stats", "direction", "downstream", "mime", d.mime, "ssrc", d.ssrc, "stats", d.rtpStats.ToString()) + d.maxLayerNotifierChMu.Lock() + d.maxLayerNotifierChClosed = true + close(d.maxLayerNotifierCh) + d.maxLayerNotifierChMu.Unlock() + if onCloseHandler := d.getOnCloseHandler(); onCloseHandler != nil { onCloseHandler(!flush) }