From ab906d710c53cf248bbd3ca1ebe73325fb7e8661 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 27 Oct 2025 17:40:39 +0530 Subject: [PATCH] Prevent leakage of previous codec after codec regression. (#4035) * Prevent leakage of previous codec after codec regression. In the window between forwarder restart and determining codec, the old codec packet could leak through. Prevent tha by doing the restart and codec determination atomically on a codec regression. * tidy * use locked function --- pkg/sfu/downtrack.go | 4 ++-- pkg/sfu/forwarder.go | 8 +++++++- pkg/sfu/receiver.go | 2 +- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index a7ed1953b..4736c233b 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -709,9 +709,9 @@ func (d *DownTrack) handleUpstreamCodecChange(mimeType string) { "oldCodec", oldCodec, "newCodec", codec.RTPCodecCapability, ) - d.forwarder.Restart() receiver := d.Receiver() - d.forwarder.DetermineCodec(codec.RTPCodecCapability, receiver.HeaderExtensions(), receiver.VideoLayerMode()) + d.forwarder.Restart(codec.RTPCodecCapability, receiver.HeaderExtensions(), receiver.VideoLayerMode()) + d.connectionStats.UpdateCodec(d.Mime(), isFECEnabled) } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 8c59db68c..04439a597 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -310,6 +310,10 @@ func (f *Forwarder) DetermineCodec(codec webrtc.RTPCodecCapability, extensions [ f.lock.Lock() defer f.lock.Unlock() + f.determineCodecLocked(codec, extensions, videoLayerMode) +} + +func (f *Forwarder) determineCodecLocked(codec webrtc.RTPCodecCapability, extensions []webrtc.RTPHeaderExtensionParameter, videoLayerMode livekit.VideoLayer_Mode) { if videoLayerMode == livekit.VideoLayer_ONE_SPATIAL_LAYER_PER_STREAM_INCOMPLETE_RTCP_SR { f.skipReferenceTS = true } @@ -1630,7 +1634,7 @@ func (f *Forwarder) CheckSync() (bool, int32) { return f.vls.CheckSync() } -func (f *Forwarder) Restart() { +func (f *Forwarder) Restart(codec webrtc.RTPCodecCapability, extensions []webrtc.RTPHeaderExtensionParameter, videoLayerMode livekit.VideoLayer_Mode) { f.lock.Lock() defer f.lock.Unlock() @@ -1644,6 +1648,8 @@ func (f *Forwarder) Restart() { } f.lastSwitchExtIncomingTS = 0 f.refVideoLayerMode = livekit.VideoLayer_MODE_UNUSED + + f.determineCodecLocked(codec, extensions, videoLayerMode) } func (f *Forwarder) FilterRTX(nacks []uint16) (filtered []uint16, disallowedLayers [buffer.DefaultMaxLayerSpatial + 1]bool) { diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 52ea1326b..ec069f621 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -602,7 +602,7 @@ func (w *WebRTCReceiver) GetLayeredBitrate() ([]int32, Bitrates) { return w.streamTrackerManager.GetLayeredBitrate() } -// OnCloseHandler method to be called on remote tracked removed +// OnCloseHandler method to be called on remote track removed func (w *WebRTCReceiver) OnCloseHandler(fn func()) { w.onCloseHandler = fn }