From d6ad857506bb0ebebcc47cd6a2f5ebec091d2eca Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 24 Oct 2023 18:21:59 +0530 Subject: [PATCH] Do not post to closed channels. (#2179) * Do not post to closed channels. Perils of atomics. Hard to imagine, but I guess it could happen. The postMaxLayerNotifier checked for closed and down track was not closed. But, between that check and posting to channel (which is a very small window), the down track could have been closed and the channel (maxLayerNotiferCh) is closed. Protect that channel post + close with the bind lock. * reduce the change * Check for closed inside lock --- pkg/sfu/downtrack.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 880f047e8..98f3f4c12 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -637,14 +637,18 @@ func (d *DownTrack) keyFrameRequester(generation uint32, layer int32) { } func (d *DownTrack) postMaxLayerNotifierEvent() { - if d.IsClosed() || d.kind != webrtc.RTPCodecTypeVideo { + if d.kind != webrtc.RTPCodecTypeVideo { return } - select { - case d.maxLayerNotifierCh <- struct{}{}: - default: + d.bindLock.Lock() + if !d.IsClosed() { + select { + case d.maxLayerNotifierCh <- struct{}{}: + default: + } } + d.bindLock.Unlock() } func (d *DownTrack) maxLayerNotifierWorker() { @@ -930,12 +934,13 @@ func (d *DownTrack) Close() { // 2. in case of session migration, participant migrate from other node, video track should // be resumed with same participant, set flush=false since we don't need to flush decoder. func (d *DownTrack) CloseWithFlush(flush bool) { + d.bindLock.Lock() if d.isClosed.Swap(true) { + d.bindLock.Unlock() // already closed return } - d.bindLock.Lock() d.params.Logger.Debugw("close down track", "flushBlankFrame", flush) if d.bound.Load() { d.forwarder.Mute(true, true) @@ -959,6 +964,7 @@ func (d *DownTrack) CloseWithFlush(flush bool) { } d.bound.Store(false) + d.onBindAndConnectedChange() d.params.Logger.Debugw("closing sender", "kind", d.kind) } d.params.Receiver.DeleteDownTrack(d.params.SubID) @@ -969,13 +975,12 @@ func (d *DownTrack) CloseWithFlush(flush bool) { d.rtcpReader.OnPacket(nil) } + close(d.maxLayerNotifierCh) d.bindLock.Unlock() d.connectionStats.Close() d.rtpStats.Stop() d.params.Logger.Infow("rtp stats", "direction", "downstream", "mime", d.mime, "ssrc", d.ssrc, "stats", d.rtpStats.ToString()) - close(d.maxLayerNotifierCh) - if onCloseHandler := d.getOnCloseHandler(); onCloseHandler != nil { onCloseHandler(!flush) }