diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 880f047e8..98f3f4c12 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -637,14 +637,18 @@ func (d *DownTrack) keyFrameRequester(generation uint32, layer int32) { } func (d *DownTrack) postMaxLayerNotifierEvent() { - if d.IsClosed() || d.kind != webrtc.RTPCodecTypeVideo { + if d.kind != webrtc.RTPCodecTypeVideo { return } - select { - case d.maxLayerNotifierCh <- struct{}{}: - default: + d.bindLock.Lock() + if !d.IsClosed() { + select { + case d.maxLayerNotifierCh <- struct{}{}: + default: + } } + d.bindLock.Unlock() } func (d *DownTrack) maxLayerNotifierWorker() { @@ -930,12 +934,13 @@ func (d *DownTrack) Close() { // 2. in case of session migration, participant migrate from other node, video track should // be resumed with same participant, set flush=false since we don't need to flush decoder. func (d *DownTrack) CloseWithFlush(flush bool) { + d.bindLock.Lock() if d.isClosed.Swap(true) { + d.bindLock.Unlock() // already closed return } - d.bindLock.Lock() d.params.Logger.Debugw("close down track", "flushBlankFrame", flush) if d.bound.Load() { d.forwarder.Mute(true, true) @@ -959,6 +964,7 @@ func (d *DownTrack) CloseWithFlush(flush bool) { } d.bound.Store(false) + d.onBindAndConnectedChange() d.params.Logger.Debugw("closing sender", "kind", d.kind) } d.params.Receiver.DeleteDownTrack(d.params.SubID) @@ -969,13 +975,12 @@ func (d *DownTrack) CloseWithFlush(flush bool) { d.rtcpReader.OnPacket(nil) } + close(d.maxLayerNotifierCh) d.bindLock.Unlock() d.connectionStats.Close() d.rtpStats.Stop() d.params.Logger.Infow("rtp stats", "direction", "downstream", "mime", d.mime, "ssrc", d.ssrc, "stats", d.rtpStats.ToString()) - close(d.maxLayerNotifierCh) - if onCloseHandler := d.getOnCloseHandler(); onCloseHandler != nil { onCloseHandler(!flush) }