mirror of
https://github.com/livekit/livekit.git
synced 2026-04-01 00:05:40 +00:00
Do not post to closed channels. (#2179)
* Do not post to closed channels. Perils of atomics. Hard to imagine, but I guess it could happen. The postMaxLayerNotifier checked for closed and down track was not closed. But, between that check and posting to channel (which is a very small window), the down track could have been closed and the channel (maxLayerNotiferCh) is closed. Protect that channel post + close with the bind lock. * reduce the change * Check for closed inside lock
This commit is contained in:
@@ -637,14 +637,18 @@ func (d *DownTrack) keyFrameRequester(generation uint32, layer int32) {
|
||||
}
|
||||
|
||||
func (d *DownTrack) postMaxLayerNotifierEvent() {
|
||||
if d.IsClosed() || d.kind != webrtc.RTPCodecTypeVideo {
|
||||
if d.kind != webrtc.RTPCodecTypeVideo {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case d.maxLayerNotifierCh <- struct{}{}:
|
||||
default:
|
||||
d.bindLock.Lock()
|
||||
if !d.IsClosed() {
|
||||
select {
|
||||
case d.maxLayerNotifierCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
d.bindLock.Unlock()
|
||||
}
|
||||
|
||||
func (d *DownTrack) maxLayerNotifierWorker() {
|
||||
@@ -930,12 +934,13 @@ func (d *DownTrack) Close() {
|
||||
// 2. in case of session migration, participant migrate from other node, video track should
|
||||
// be resumed with same participant, set flush=false since we don't need to flush decoder.
|
||||
func (d *DownTrack) CloseWithFlush(flush bool) {
|
||||
d.bindLock.Lock()
|
||||
if d.isClosed.Swap(true) {
|
||||
d.bindLock.Unlock()
|
||||
// already closed
|
||||
return
|
||||
}
|
||||
|
||||
d.bindLock.Lock()
|
||||
d.params.Logger.Debugw("close down track", "flushBlankFrame", flush)
|
||||
if d.bound.Load() {
|
||||
d.forwarder.Mute(true, true)
|
||||
@@ -959,6 +964,7 @@ func (d *DownTrack) CloseWithFlush(flush bool) {
|
||||
}
|
||||
|
||||
d.bound.Store(false)
|
||||
d.onBindAndConnectedChange()
|
||||
d.params.Logger.Debugw("closing sender", "kind", d.kind)
|
||||
}
|
||||
d.params.Receiver.DeleteDownTrack(d.params.SubID)
|
||||
@@ -969,13 +975,12 @@ func (d *DownTrack) CloseWithFlush(flush bool) {
|
||||
d.rtcpReader.OnPacket(nil)
|
||||
}
|
||||
|
||||
close(d.maxLayerNotifierCh)
|
||||
d.bindLock.Unlock()
|
||||
d.connectionStats.Close()
|
||||
d.rtpStats.Stop()
|
||||
d.params.Logger.Infow("rtp stats", "direction", "downstream", "mime", d.mime, "ssrc", d.ssrc, "stats", d.rtpStats.ToString())
|
||||
|
||||
close(d.maxLayerNotifierCh)
|
||||
|
||||
if onCloseHandler := d.getOnCloseHandler(); onCloseHandler != nil {
|
||||
onCloseHandler(!flush)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user