From 0d7477178e2b7d326b0337c8eca8298f6ea848e2 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 20 Oct 2023 00:44:39 +0530 Subject: [PATCH] More fine grained filtering NACKs after a key frame. (#2159) * More fine grained filtering NACKs after a key frame. There are applications with periodic key frame. So, a packet lost before a key frame will not be retransmitted. But, decoder could wait (jitter buffer, play out time) and cause a stutter. Idea behind disabling NACKs after key frame was another knob to throttle retransmission bit rate. But, with spaced out retransmissions and max retransmissions per sequence number, there are throttles. This would provide more throttling, but affects some applications. So, disabling filtering NACKs after a key frame. Introducing another flag to disallow layers. This would still be quite useful, i. e. under congestion the stream allocator would move the target lower. But, because of congestion, higher layer would have lost a bunch of packets. Client would NACK those. Retransmitting those higher layer packets would congest the channel more. The new flag (default enabled) would disallow higher layers retransmission. This was happening before this change also, just splitting out the flag for more control. * split flag --- pkg/sfu/buffer/rtpstats_receiver.go | 4 +++- pkg/sfu/forwarder.go | 27 ++++++++++++++------------- pkg/sfu/receiver.go | 5 ++--- 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index fe906eb98..164c0928b 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -315,7 +315,9 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) timeSinceFirst := srData.NTPTimestamp.Time().Sub(r.srFirst.NTPTimestamp.Time()).Seconds() rtpDiffSinceFirst := srDataCopy.RTPTimestampExt - r.srFirst.RTPTimestampExt calculatedClockRateFromFirst := float64(rtpDiffSinceFirst) / timeSinceFirst - if math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromLast) > 0.2*float64(r.params.ClockRate) || math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromFirst) > 0.2*float64(r.params.ClockRate) { + + if (timeSinceLast > 0.2 && math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromLast) > 0.2*float64(r.params.ClockRate)) || + (timeSinceFirst > 0.2 && math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromFirst) > 0.2*float64(r.params.ClockRate)) { r.logger.Infow( "clock rate skew", "first", r.srFirst.ToString(), diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index cebb5704b..ea6ea29c1 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -38,7 +38,8 @@ import ( // Forwarder const ( FlagPauseOnDowngrade = true - FlagFilterRTX = true + FlagFilterRTX = false + FlagFilterRTXLayers = true TransitionCostSpatial = 10 ResumeBehindThresholdSeconds = float64(0.2) // 200ms @@ -1399,15 +1400,14 @@ func (f *Forwarder) CheckSync() (locked bool, layer int32) { } func (f *Forwarder) FilterRTX(nacks []uint16) (filtered []uint16, disallowedLayers [buffer.DefaultMaxLayerSpatial + 1]bool) { - if !FlagFilterRTX { - filtered = nacks - return - } - f.lock.RLock() defer f.lock.RUnlock() - filtered = f.rtpMunger.FilterRTX(nacks) + if !FlagFilterRTX { + filtered = nacks + } else { + filtered = f.rtpMunger.FilterRTX(nacks) + } // // Curb RTX when deficient for two cases @@ -1417,14 +1417,15 @@ func (f *Forwarder) FilterRTX(nacks []uint16) (filtered []uint16, disallowedLaye // // Without the curb, when congestion hits, RTX rate could be so high that it further congests the channel. // - currentLayer := f.vls.GetCurrent() - targetLayer := f.vls.GetTarget() - for layer := int32(0); layer < buffer.DefaultMaxLayerSpatial+1; layer++ { - if f.isDeficientLocked() && (targetLayer.Spatial < currentLayer.Spatial || layer > currentLayer.Spatial) { - disallowedLayers[layer] = true + if FlagFilterRTXLayers { + currentLayer := f.vls.GetCurrent() + targetLayer := f.vls.GetTarget() + for layer := int32(0); layer < buffer.DefaultMaxLayerSpatial+1; layer++ { + if f.isDeficientLocked() && (targetLayer.Spatial < currentLayer.Spatial || layer > currentLayer.Spatial) { + disallowedLayers[layer] = true + } } } - return } diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index e7ba1fb93..560a9246d 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -210,9 +210,6 @@ func NewWebRTCReceiver( isRED: IsRedCodec(track.Codec().MimeType), } - w.streamTrackerManager = NewStreamTrackerManager(logger, trackInfo, w.isSVC, w.codec.ClockRate, trackersConfig) - w.streamTrackerManager.SetListener(w) - for _, opt := range opts { w = opt(w) } @@ -235,6 +232,8 @@ func NewWebRTCReceiver( }) w.connectionStats.Start(w.trackInfo) + w.streamTrackerManager = NewStreamTrackerManager(logger, trackInfo, w.isSVC, w.codec.ClockRate, trackersConfig) + w.streamTrackerManager.SetListener(w) // SVC-TODO: Handle DD for non-SVC cases??? if w.isSVC { for _, ext := range receiver.GetParameters().HeaderExtensions {