From 1fb8964814eb3eca32f0baa6cec55b6a3b0df050 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 19 Aug 2024 12:03:45 +0530 Subject: [PATCH] Refactor propagation delay estimator. (#2941) * Refactor propagation delay estimator. NOTE: It is not possible to calculate OWD (one-way-delay) in a passive fashion. So, this should not be used for anything requiring high precision. But, mainly factoring it out as a separate object just in case it can be re-used. TODO: - probably has some edge case that is not handled well - maybe path change detection can be improved - will write UT later. This is just purely splitting it out from what was embedded in RTPStatsReceiver. * fix labels * precision -> accuracy --- pkg/sfu/buffer/rtpstats_receiver.go | 131 +++------------------ pkg/sfu/utils/owd_estimator.go | 175 ++++++++++++++++++++++++++++ 2 files changed, 191 insertions(+), 115 deletions(-) create mode 100644 pkg/sfu/utils/owd_estimator.go diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index 3ac1461f0..2ae60072b 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -30,34 +30,6 @@ import ( const ( cHistorySize = 8192 - // RTCP Sender Reports are re-based to SFU time base so that all subscriber side - // can have the same time base (i. e. SFU time base). To convert publisher side - // RTCP Sender Reports to SFU timebase, a propagation delay is maintained. - // propagation_delay = time_of_report_reception - ntp_timestamp_in_report - // - // Propagation delay is adapted continuously. If it falls, adapt quickly to the - // lower value as that could be the real propagation delay. If it rises, adapt slowly - // as it might be a temporary change or slow drift. See below for handling of high deltas - // which could be a result of a path change. - cPropagationDelayFallFactor = float64(0.9) - cPropagationDelayRiseFactor = float64(0.1) - - cPropagationDelaySpikeAdaptationFactor = float64(0.5) - - // To account for path changes mid-stream, if the delta of the propagation delay is consistently higher, reset. - // Reset at whichever of the below happens later. - // 1. 10 seconds of persistent high delta. - // 2. at least 2 consecutive reports with high delta. - // - // A long term estimate of delta of propagation delay is maintained and delta propagation delay exceeding - // a factor of the long term estimate is considered a sharp increase. That will trigger the start of the - // path change condition and if it persists, propagation delay will be reset. - cPropagationDelayDeltaThresholdMin = 10 * time.Millisecond - cPropagationDelayDeltaThresholdMaxFactor = 2 - cPropagationDelayDeltaHighResetNumReports = 2 - cPropagationDelayDeltaHighResetWait = 10 * time.Second - cPropagationDelayDeltaLongTermAdaptationThreshold = 50 * time.Millisecond - // number of seconds the current report RTP timestamp can be off from expected RTP timestamp cReportSlack = float64(60.0) ) @@ -106,11 +78,7 @@ type RTPStatsReceiver struct { history *protoutils.Bitmap[uint64] - propagationDelay time.Duration - longTermDeltaPropagationDelay time.Duration - propagationDelayDeltaHighCount int - propagationDelayDeltaHighStartTime time.Time - propagationDelaySpike time.Duration + propagationDelayEstimator *utils.OWDEstimator clockSkewCount int clockSkewMediaPathCount int @@ -122,11 +90,12 @@ type RTPStatsReceiver struct { func NewRTPStatsReceiver(params RTPStatsParams) *RTPStatsReceiver { return &RTPStatsReceiver{ - rtpStatsBase: newRTPStatsBase(params), - sequenceNumber: utils.NewWrapAround[uint16, uint64](utils.WrapAroundParams{IsRestartAllowed: false}), - tsRolloverThreshold: (1 << 31) * 1e9 / int64(params.ClockRate), - timestamp: utils.NewWrapAround[uint32, uint64](utils.WrapAroundParams{IsRestartAllowed: false}), - history: protoutils.NewBitmap[uint64](cHistorySize), + rtpStatsBase: newRTPStatsBase(params), + sequenceNumber: utils.NewWrapAround[uint16, uint64](utils.WrapAroundParams{IsRestartAllowed: false}), + tsRolloverThreshold: (1 << 31) * 1e9 / int64(params.ClockRate), + timestamp: utils.NewWrapAround[uint32, uint64](utils.WrapAroundParams{IsRestartAllowed: false}), + history: protoutils.NewBitmap[uint64](cHistorySize), + propagationDelayEstimator: utils.NewOWDEstimator(utils.OWDEstimatorParamsDefault), } } @@ -512,88 +481,21 @@ func (r *RTPStatsReceiver) checkRTPClockSkewAgainstMediaPathForSenderReport(srDa } func (r *RTPStatsReceiver) updatePropagationDelayAndRecordSenderReport(srData *RTCPSenderReportData) { - var propagationDelay time.Duration - var deltaPropagationDelay time.Duration - getPropagationFields := func() []interface{} { - return []interface{}{ - "receivedPropagationDelay", propagationDelay.String(), - "receivedDeltaPropagationDelay", deltaPropagationDelay.String(), - "deltaHighCount", r.propagationDelayDeltaHighCount, - "sinceDeltaHighStart", time.Since(r.propagationDelayDeltaHighStartTime).String(), - "propagationDelaySpike", r.propagationDelaySpike.String(), - "current", srData, + senderClockTime := srData.NTPTimestamp.Time() + estimatedPropagationDelay, stepChange := r.propagationDelayEstimator.Update(senderClockTime, srData.At) + if stepChange { + r.logger.Debugw( + "propagation delay step change", + "currentSenderReport", srData, "rtpStats", lockedRTPStatsReceiverLogEncoder{r}, - } - } - resetDelta := func() { - r.propagationDelayDeltaHighCount = 0 - r.propagationDelayDeltaHighStartTime = time.Time{} - r.propagationDelaySpike = 0 - } - initPropagationDelay := func(pd time.Duration) { - r.propagationDelay = pd - - r.longTermDeltaPropagationDelay = 0 - - resetDelta() + ) } - ntpTime := srData.NTPTimestamp.Time() - propagationDelay = srData.At.Sub(ntpTime) if r.srFirst == nil { r.srFirst = srData - initPropagationDelay(propagationDelay) - r.logger.Debugw("initializing propagation delay", getPropagationFields()...) - } else { - deltaPropagationDelay = propagationDelay - r.propagationDelay - if deltaPropagationDelay > cPropagationDelayDeltaThresholdMin { // ignore small changes for path change consideration - if r.longTermDeltaPropagationDelay != 0 && - deltaPropagationDelay > r.longTermDeltaPropagationDelay*time.Duration(cPropagationDelayDeltaThresholdMaxFactor) { - r.logger.Debugw("sharp increase in propagation delay", getPropagationFields()...) - r.propagationDelayDeltaHighCount++ - if r.propagationDelayDeltaHighStartTime.IsZero() { - r.propagationDelayDeltaHighStartTime = time.Now() - } - if r.propagationDelaySpike == 0 { - r.propagationDelaySpike = propagationDelay - } else { - r.propagationDelaySpike += time.Duration(cPropagationDelaySpikeAdaptationFactor * float64(propagationDelay-r.propagationDelaySpike)) - } - - if r.propagationDelayDeltaHighCount >= cPropagationDelayDeltaHighResetNumReports && time.Since(r.propagationDelayDeltaHighStartTime) >= cPropagationDelayDeltaHighResetWait { - r.logger.Debugw("re-initializing propagation delay", append(getPropagationFields(), "newPropagationDelay", r.propagationDelaySpike.String())...) - initPropagationDelay(r.propagationDelaySpike) - } - } else { - resetDelta() - } - } else { - resetDelta() - - factor := cPropagationDelayFallFactor - if propagationDelay > r.propagationDelay { - factor = cPropagationDelayRiseFactor - } - r.propagationDelay += time.Duration(factor * float64(propagationDelay-r.propagationDelay)) - } - - if deltaPropagationDelay < cPropagationDelayDeltaLongTermAdaptationThreshold { - if r.longTermDeltaPropagationDelay == 0 { - r.longTermDeltaPropagationDelay = deltaPropagationDelay - } else { - // do not adapt to large +ve spikes, can happen when channel is congested and reports are delivered very late - // if the spike is in fact a path change, it will persist and handled by path change detection above - sinceLastReport := srData.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time()) - adaptationFactor := min(1.0, float64(sinceLastReport)/float64(cPropagationDelayDeltaHighResetWait)) - r.longTermDeltaPropagationDelay += time.Duration(adaptationFactor * float64(deltaPropagationDelay-r.longTermDeltaPropagationDelay)) - } - } - if r.longTermDeltaPropagationDelay < 0 { - r.longTermDeltaPropagationDelay = 0 - } } // adjust receive time to estimated propagation delay - srData.AtAdjusted = ntpTime.Add(r.propagationDelay) + srData.AtAdjusted = senderClockTime.Add(estimatedPropagationDelay) r.srNewest = srData } @@ -792,8 +694,7 @@ func (r lockedRTPStatsReceiverLogEncoder) MarshalLogObject(e zapcore.ObjectEncod e.AddUint64("extStartTS", r.timestamp.GetExtendedStart()) e.AddUint64("extHighestTS", r.timestamp.GetExtendedHighest()) - e.AddDuration("propagationDelay", r.propagationDelay) - e.AddDuration("longTermDeltaPropagationDelay", r.longTermDeltaPropagationDelay) + e.AddObject("propagationDelayEstimator", r.propagationDelayEstimator) return nil } diff --git a/pkg/sfu/utils/owd_estimator.go b/pkg/sfu/utils/owd_estimator.go new file mode 100644 index 000000000..90f5fe1e2 --- /dev/null +++ b/pkg/sfu/utils/owd_estimator.go @@ -0,0 +1,175 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "time" + + "go.uber.org/zap/zapcore" +) + +type OWDEstimatorParams struct { + PropagationDelayFallFactor float64 + PropagationDelayRiseFactor float64 + + PropagationDelaySpikeAdaptationFactor float64 + + PropagationDelayDeltaThresholdMin time.Duration + PropagationDelayDeltaThresholdMaxFactor int + PropagationDelayDeltaHighResetNumReports int + PropagationDelayDeltaHighResetWait time.Duration + PropagationDelayDeltaLongTermAdaptationThreshold time.Duration +} + +var OWDEstimatorParamsDefault = OWDEstimatorParams{ + // OWD (One-Way-Delay) Estimator is used to estimate propagation delay between sender and receicer. + // As they operate on different clock domains, it is not possible to get exact propagation delay easily. + // So, this module is an estimator using a simple approach explained below. It should not be used for + // things that require high accuracy. + // + // One example is RTCP Sender Reports getting re-based to SFU time base so that all subscriber side + // can have the same time base (i. e. SFU time base). To convert publisher side + // RTCP Sender Reports to SFU timebase, a propagation delay is maintained. + // propagation_delay = time_of_report_reception - ntp_timestamp_in_report + // + // Propagation delay is adapted continuously. If it falls, adapt quickly to the + // lower value as that could be the real propagation delay. If it rises, adapt slowly + // as it might be a temporary change or slow drift. See below for handling of high deltas + // which could be a result of a path change. + PropagationDelayFallFactor: 0.9, + PropagationDelayRiseFactor: 0.1, + + PropagationDelaySpikeAdaptationFactor: 0.5, + + // To account for path changes mid-stream, if the delta of the propagation delay is consistently higher, reset. + // Reset at whichever of the below happens later. + // 1. 10 seconds of persistent high delta. + // 2. at least 2 consecutive reports with high delta. + // + // A long term estimate of delta of propagation delay is maintained and delta propagation delay exceeding + // a factor of the long term estimate is considered a sharp increase. That will trigger the start of the + // path change condition and if it persists, propagation delay will be reset. + PropagationDelayDeltaThresholdMin: 10 * time.Millisecond, + PropagationDelayDeltaThresholdMaxFactor: 2, + PropagationDelayDeltaHighResetNumReports: 2, + PropagationDelayDeltaHighResetWait: 10 * time.Second, + PropagationDelayDeltaLongTermAdaptationThreshold: 50 * time.Millisecond, +} + +type OWDEstimator struct { + params OWDEstimatorParams + + initialized bool + lastSenderClockTime time.Time + lastPropagationDelay time.Duration + lastDeltaPropagationDelay time.Duration + estimatedPropagationDelay time.Duration + longTermDeltaPropagationDelay time.Duration + propagationDelayDeltaHighCount int + propagationDelayDeltaHighStartTime time.Time + propagationDelaySpike time.Duration +} + +func NewOWDEstimator(params OWDEstimatorParams) *OWDEstimator { + return &OWDEstimator{ + params: params, + } +} + +func (o *OWDEstimator) MarshalLogObject(e zapcore.ObjectEncoder) error { + if o != nil { + e.AddTime("lastSenderClockTime", o.lastSenderClockTime) + e.AddDuration("lastPropagationDelay", o.lastPropagationDelay) + e.AddDuration("lastDeltaPropagationDelay", o.lastDeltaPropagationDelay) + e.AddDuration("estimatedPropagationDelay", o.estimatedPropagationDelay) + e.AddDuration("longTermDeltaPropagationDelay", o.longTermDeltaPropagationDelay) + e.AddInt("propagationDelayDeltaHighCount", o.propagationDelayDeltaHighCount) + e.AddTime("propagationDelayDeltaHighStartTime", o.propagationDelayDeltaHighStartTime) + e.AddDuration("propagationDelaySpike", o.propagationDelaySpike) + } + return nil +} + +func (o *OWDEstimator) Update(senderClockTime time.Time, receiverClockTime time.Time) (time.Duration, bool) { + resetDelta := func() { + o.propagationDelayDeltaHighCount = 0 + o.propagationDelayDeltaHighStartTime = time.Time{} + o.propagationDelaySpike = 0 + } + + initPropagationDelay := func(pd time.Duration) { + o.estimatedPropagationDelay = pd + o.longTermDeltaPropagationDelay = 0 + resetDelta() + } + + o.lastPropagationDelay = receiverClockTime.Sub(senderClockTime) + if !o.initialized { + o.initialized = true + o.lastSenderClockTime = senderClockTime + initPropagationDelay(o.lastPropagationDelay) + return o.estimatedPropagationDelay, true + } + + stepChange := false + o.lastDeltaPropagationDelay = o.lastPropagationDelay - o.estimatedPropagationDelay + // check for path changes, i. e. a step jump increase in propagation delay observed over time + if o.lastDeltaPropagationDelay > o.params.PropagationDelayDeltaThresholdMin { // ignore small changes for path change consideration + if o.longTermDeltaPropagationDelay != 0 && + o.lastDeltaPropagationDelay > o.longTermDeltaPropagationDelay*time.Duration(o.params.PropagationDelayDeltaThresholdMaxFactor) { + o.propagationDelayDeltaHighCount++ + if o.propagationDelayDeltaHighStartTime.IsZero() { + o.propagationDelayDeltaHighStartTime = time.Now() + } + if o.propagationDelaySpike == 0 { + o.propagationDelaySpike = o.lastPropagationDelay + } else { + o.propagationDelaySpike += time.Duration(o.params.PropagationDelaySpikeAdaptationFactor * float64(o.lastPropagationDelay-o.propagationDelaySpike)) + } + + if o.propagationDelayDeltaHighCount >= o.params.PropagationDelayDeltaHighResetNumReports && time.Since(o.propagationDelayDeltaHighStartTime) >= o.params.PropagationDelayDeltaHighResetWait { + stepChange = true + initPropagationDelay(o.propagationDelaySpike) + } + } else { + resetDelta() + } + } else { + resetDelta() + + factor := o.params.PropagationDelayFallFactor + if o.lastPropagationDelay > o.estimatedPropagationDelay { + factor = o.params.PropagationDelayRiseFactor + } + o.estimatedPropagationDelay += time.Duration(factor * float64(o.lastPropagationDelay-o.estimatedPropagationDelay)) + } + + if o.lastDeltaPropagationDelay < o.params.PropagationDelayDeltaLongTermAdaptationThreshold { + if o.longTermDeltaPropagationDelay == 0 { + o.longTermDeltaPropagationDelay = o.lastDeltaPropagationDelay + } else { + // do not adapt to large +ve spikes, can happen when channel is congested and reports are delivered very late + // if the spike is in fact a path change, it will persist and handled by path change detection above + sinceLast := senderClockTime.Sub(o.lastSenderClockTime) + adaptationFactor := min(1.0, float64(sinceLast)/float64(o.params.PropagationDelayDeltaHighResetWait)) + o.longTermDeltaPropagationDelay += time.Duration(adaptationFactor * float64(o.lastDeltaPropagationDelay-o.longTermDeltaPropagationDelay)) + } + } + if o.longTermDeltaPropagationDelay < 0 { + o.longTermDeltaPropagationDelay = 0 + } + o.lastSenderClockTime = senderClockTime + return o.estimatedPropagationDelay, stepChange +}