From 860702e9dcf93701931e7caee9ec3cd78409a757 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 2 Apr 2024 14:21:20 +0530 Subject: [PATCH] Prevent large spikes in propagation delay (#2615) * Prevent large spikes in propagation delay A few tweaks - Large spike in propagation delay due to congested channel results in long term estimate getting high value. Ignore outliers in long term estimate. - Introduce a new field for adjusted arrival time as adjusting the arrival time in place meant it got applied again across the relay and that caused different propagation delay on remote nodes. - Reset path change counters as long as there is any sample that is not higher than the multiple of long term. There was a case of o Sample with high value that triggered path change start. o Then some samples with high enough delta, but did not meet the criteria for increasing counter further. o Some time later, another sample met the threshold and that triggered a path change re-init. * do not adapt to large delta --- pkg/sfu/buffer/rtpstats_base.go | 6 +++-- pkg/sfu/buffer/rtpstats_receiver.go | 40 +++++++++++++++++------------ pkg/sfu/buffer/rtpstats_sender.go | 5 ++-- 3 files changed, 31 insertions(+), 20 deletions(-) diff --git a/pkg/sfu/buffer/rtpstats_base.go b/pkg/sfu/buffer/rtpstats_base.go index 11c9517c6..6d6203c17 100644 --- a/pkg/sfu/buffer/rtpstats_base.go +++ b/pkg/sfu/buffer/rtpstats_base.go @@ -114,6 +114,7 @@ type RTCPSenderReportData struct { RTPTimestampExt uint64 NTPTimestamp mediatransportutil.NtpTime At time.Time + AtAdjusted time.Time } func (r *RTCPSenderReportData) ToString() string { @@ -121,7 +122,7 @@ func (r *RTCPSenderReportData) ToString() string { return "" } - return fmt.Sprintf("ntp: %s, rtp: %d, extRtp: %d, at: %s", r.NTPTimestamp.Time().String(), r.RTPTimestamp, r.RTPTimestampExt, r.At.String()) + return fmt.Sprintf("ntp: %s, rtp: %d, extRtp: %d, at: %s, atAdj: %s", r.NTPTimestamp.Time().String(), r.RTPTimestamp, r.RTPTimestampExt, 
r.At.String(), r.AtAdjusted.String()) } func (r *RTCPSenderReportData) MarshalLogObject(e zapcore.ObjectEncoder) error { @@ -133,6 +134,7 @@ func (r *RTCPSenderReportData) MarshalLogObject(e zapcore.ObjectEncoder) error { e.AddUint32("RTPTimestamp", r.RTPTimestamp) e.AddUint64("RTPTimestampExt", r.RTPTimestampExt) e.AddTime("At", r.At) + e.AddTime("AtAdjusted", r.AtAdjusted) return nil } @@ -495,7 +497,7 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *RTCPSenderReportData, // abnormal delay (maybe due to pacing or maybe due to queuing // in some network element along the way), push back first time // to an earlier instance. - timeSinceReceive := time.Since(srData.At) + timeSinceReceive := time.Since(srData.AtAdjusted) extNowTS := srData.RTPTimestampExt - tsOffset + uint64(timeSinceReceive.Nanoseconds()*int64(r.params.ClockRate)/1e9) samplesDiff := int64(extNowTS - extStartTS) if samplesDiff < 0 { diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index ecdc0be8c..706be886f 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -43,20 +43,19 @@ const ( cPropagationDelaySpikeAdaptationFactor = float64(0.5) - cPropagationDelayDeltaMaxInterval = 10 * time.Second - // To account for path changes mid-stream, if the delta of the propagation delay is consistently higher, reset. // Reset at whichever of the below happens later. // 1. 10 seconds of persistent high delta. - // 2. at least 2 reports with high delta. + // 2. at least 2 consecutive reports with high delta. // - // A long term version of delta of propagation delay is maintained and delta propagation delay exceeding - // a factor of the long term version is considered a sharp increase. That will trigger the start of the + // A long term estimate of delta of propagation delay is maintained and delta propagation delay exceeding + // a factor of the long term estimate is considered a sharp increase. 
That will trigger the start of the // path change condition and if it persists, propagation delay will be reset. - cPropagationDelayDeltaThresholdMin = 10 * time.Millisecond - cPropagationDelayDeltaThresholdMaxFactor = 2 - cPropagationDelayDeltaHighResetNumReports = 2 - cPropagationDelayDeltaHighResetWait = 10 * time.Second + cPropagationDelayDeltaThresholdMin = 10 * time.Millisecond + cPropagationDelayDeltaThresholdMaxFactor = 2 + cPropagationDelayDeltaHighResetNumReports = 2 + cPropagationDelayDeltaHighResetWait = 10 * time.Second + cPropagationDelayDeltaLongTermAdaptationThreshold = 50 * time.Millisecond ) type RTPFlowState struct { @@ -369,6 +368,7 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) "receivedDeltaPropagationDelay", deltaPropagationDelay.String(), "deltaHighCount", r.propagationDelayDeltaHighCount, "sinceDeltaHighStart", time.Since(r.propagationDelayDeltaHighStartTime).String(), + "propagationDelaySpike", r.propagationDelaySpike.String(), "first", r.srFirst, "last", r.srNewest, "current", &srDataCopy, @@ -395,9 +395,11 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) r.logger.Debugw("initializing propagation delay", getPropagationFields()...) } else { deltaPropagationDelay = propagationDelay - r.propagationDelay - if deltaPropagationDelay.Abs() > cPropagationDelayDeltaThresholdMin { // ignore small changes for path change consideration - if r.longTermDeltaPropagationDelay != 0 && deltaPropagationDelay > 0 && deltaPropagationDelay > r.longTermDeltaPropagationDelay*time.Duration(cPropagationDelayDeltaThresholdMaxFactor) { - r.logger.Debugw("sharp increase in propagation delay, skipping", getPropagationFields()...) 
// TODO-REMOVE + if deltaPropagationDelay > cPropagationDelayDeltaThresholdMin { // ignore small changes for path change consideration + if r.longTermDeltaPropagationDelay != 0 && + deltaPropagationDelay > 0 && + deltaPropagationDelay > r.longTermDeltaPropagationDelay*time.Duration(cPropagationDelayDeltaThresholdMaxFactor) { + r.logger.Debugw("sharp increase in propagation delay", getPropagationFields()...) // TODO-REMOVE r.propagationDelayDeltaHighCount++ if r.propagationDelayDeltaHighStartTime.IsZero() { r.propagationDelayDeltaHighStartTime = time.Now() @@ -412,6 +414,8 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) r.logger.Debugw("re-initializing propagation delay", append(getPropagationFields(), "newPropagationDelay", propagationDelay.String())...) initPropagationDelay(r.propagationDelaySpike) } + } else { + resetDelta() } } else { resetDelta() @@ -432,13 +436,17 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) if r.longTermDeltaPropagationDelay == 0 { r.longTermDeltaPropagationDelay = deltaPropagationDelay } else { - sinceLastReport := srDataCopy.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time()) - adaptationFactor := min(1.0, float64(sinceLastReport)/float64(cPropagationDelayDeltaMaxInterval)) - r.longTermDeltaPropagationDelay += time.Duration(adaptationFactor * float64(deltaPropagationDelay-r.longTermDeltaPropagationDelay)) + if deltaPropagationDelay < cPropagationDelayDeltaLongTermAdaptationThreshold { + // do not adapt to large positive spikes; these can happen when the channel is congested and reports are delivered very late + // if the spike is in fact a path change, it will persist and be handled by the path change detection above + sinceLastReport := srDataCopy.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time()) + adaptationFactor := min(1.0, float64(sinceLastReport)/float64(cPropagationDelayDeltaHighResetWait)) + r.longTermDeltaPropagationDelay += time.Duration(adaptationFactor * 
float64(deltaPropagationDelay-r.longTermDeltaPropagationDelay)) + } } } // adjust receive time to estimated propagation delay - srDataCopy.At = ntpTime.Add(r.propagationDelay) + srDataCopy.AtAdjusted = ntpTime.Add(r.propagationDelay) r.srNewest = &srDataCopy r.maybeAdjustFirstPacketTime(r.srNewest, 0, r.timestamp.GetExtendedStart()) diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index 4fe39c0c5..b7cc94c06 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -633,8 +633,8 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *RTCPS return nil } - timeSincePublisherSR := time.Since(publisherSRData.At) - now := publisherSRData.At.Add(timeSincePublisherSR) + timeSincePublisherSR := time.Since(publisherSRData.AtAdjusted) + now := publisherSRData.AtAdjusted.Add(timeSincePublisherSR) nowNTP := mediatransportutil.ToNtpTime(now) nowRTPExt := publisherSRData.RTPTimestampExt - tsOffset + uint64(timeSincePublisherSR.Nanoseconds()*int64(r.params.ClockRate)/1e9) @@ -643,6 +643,7 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *RTCPS RTPTimestamp: uint32(nowRTPExt), RTPTimestampExt: nowRTPExt, At: now, + AtAdjusted: now, } getFields := func() []interface{} {