Refactor propagation delay estimator. (#2941)

* Refactor propagation delay estimator.

NOTE: It is not possible to calculate OWD (one-way-delay) in a passive
fashion. So, this should not be used for anything requiring high
precision.

But, mainly factoring it out as a separate object just in case it can be
re-used.

TODO:
- probably has some edge case that is not handled well
- maybe path change detection can be improved
- will write UT later. This is just purely splitting it out from what
  was embedded in RTPStatsReceiver.

* fix labels

* precision -> accuracy
This commit is contained in:
Raja Subramanian
2024-08-19 12:03:45 +05:30
committed by GitHub
parent ebc6e49d52
commit 1fb8964814
2 changed files with 191 additions and 115 deletions
+16 -115
View File
@@ -30,34 +30,6 @@ import (
const (
cHistorySize = 8192
// RTCP Sender Reports are re-based to SFU time base so that all subscriber side
// can have the same time base (i. e. SFU time base). To convert publisher side
// RTCP Sender Reports to SFU timebase, a propagation delay is maintained.
// propagation_delay = time_of_report_reception - ntp_timestamp_in_report
//
// Propagation delay is adapted continuously. If it falls, adapt quickly to the
// lower value as that could be the real propagation delay. If it rises, adapt slowly
// as it might be a temporary change or slow drift. See below for handling of high deltas
// which could be a result of a path change.
cPropagationDelayFallFactor = float64(0.9)
cPropagationDelayRiseFactor = float64(0.1)
cPropagationDelaySpikeAdaptationFactor = float64(0.5)
// To account for path changes mid-stream, if the delta of the propagation delay is consistently higher, reset.
// Reset at whichever of the below happens later.
// 1. 10 seconds of persistent high delta.
// 2. at least 2 consecutive reports with high delta.
//
// A long term estimate of delta of propagation delay is maintained and delta propagation delay exceeding
// a factor of the long term estimate is considered a sharp increase. That will trigger the start of the
// path change condition and if it persists, propagation delay will be reset.
cPropagationDelayDeltaThresholdMin = 10 * time.Millisecond
cPropagationDelayDeltaThresholdMaxFactor = 2
cPropagationDelayDeltaHighResetNumReports = 2
cPropagationDelayDeltaHighResetWait = 10 * time.Second
cPropagationDelayDeltaLongTermAdaptationThreshold = 50 * time.Millisecond
// number of seconds the current report RTP timestamp can be off from expected RTP timestamp
cReportSlack = float64(60.0)
)
@@ -106,11 +78,7 @@ type RTPStatsReceiver struct {
history *protoutils.Bitmap[uint64]
propagationDelay time.Duration
longTermDeltaPropagationDelay time.Duration
propagationDelayDeltaHighCount int
propagationDelayDeltaHighStartTime time.Time
propagationDelaySpike time.Duration
propagationDelayEstimator *utils.OWDEstimator
clockSkewCount int
clockSkewMediaPathCount int
@@ -122,11 +90,12 @@ type RTPStatsReceiver struct {
func NewRTPStatsReceiver(params RTPStatsParams) *RTPStatsReceiver {
return &RTPStatsReceiver{
rtpStatsBase: newRTPStatsBase(params),
sequenceNumber: utils.NewWrapAround[uint16, uint64](utils.WrapAroundParams{IsRestartAllowed: false}),
tsRolloverThreshold: (1 << 31) * 1e9 / int64(params.ClockRate),
timestamp: utils.NewWrapAround[uint32, uint64](utils.WrapAroundParams{IsRestartAllowed: false}),
history: protoutils.NewBitmap[uint64](cHistorySize),
rtpStatsBase: newRTPStatsBase(params),
sequenceNumber: utils.NewWrapAround[uint16, uint64](utils.WrapAroundParams{IsRestartAllowed: false}),
tsRolloverThreshold: (1 << 31) * 1e9 / int64(params.ClockRate),
timestamp: utils.NewWrapAround[uint32, uint64](utils.WrapAroundParams{IsRestartAllowed: false}),
history: protoutils.NewBitmap[uint64](cHistorySize),
propagationDelayEstimator: utils.NewOWDEstimator(utils.OWDEstimatorParamsDefault),
}
}
@@ -512,88 +481,21 @@ func (r *RTPStatsReceiver) checkRTPClockSkewAgainstMediaPathForSenderReport(srDa
}
func (r *RTPStatsReceiver) updatePropagationDelayAndRecordSenderReport(srData *RTCPSenderReportData) {
var propagationDelay time.Duration
var deltaPropagationDelay time.Duration
getPropagationFields := func() []interface{} {
return []interface{}{
"receivedPropagationDelay", propagationDelay.String(),
"receivedDeltaPropagationDelay", deltaPropagationDelay.String(),
"deltaHighCount", r.propagationDelayDeltaHighCount,
"sinceDeltaHighStart", time.Since(r.propagationDelayDeltaHighStartTime).String(),
"propagationDelaySpike", r.propagationDelaySpike.String(),
"current", srData,
senderClockTime := srData.NTPTimestamp.Time()
estimatedPropagationDelay, stepChange := r.propagationDelayEstimator.Update(senderClockTime, srData.At)
if stepChange {
r.logger.Debugw(
"propagation delay step change",
"currentSenderReport", srData,
"rtpStats", lockedRTPStatsReceiverLogEncoder{r},
}
}
resetDelta := func() {
r.propagationDelayDeltaHighCount = 0
r.propagationDelayDeltaHighStartTime = time.Time{}
r.propagationDelaySpike = 0
}
initPropagationDelay := func(pd time.Duration) {
r.propagationDelay = pd
r.longTermDeltaPropagationDelay = 0
resetDelta()
)
}
ntpTime := srData.NTPTimestamp.Time()
propagationDelay = srData.At.Sub(ntpTime)
if r.srFirst == nil {
r.srFirst = srData
initPropagationDelay(propagationDelay)
r.logger.Debugw("initializing propagation delay", getPropagationFields()...)
} else {
deltaPropagationDelay = propagationDelay - r.propagationDelay
if deltaPropagationDelay > cPropagationDelayDeltaThresholdMin { // ignore small changes for path change consideration
if r.longTermDeltaPropagationDelay != 0 &&
deltaPropagationDelay > r.longTermDeltaPropagationDelay*time.Duration(cPropagationDelayDeltaThresholdMaxFactor) {
r.logger.Debugw("sharp increase in propagation delay", getPropagationFields()...)
r.propagationDelayDeltaHighCount++
if r.propagationDelayDeltaHighStartTime.IsZero() {
r.propagationDelayDeltaHighStartTime = time.Now()
}
if r.propagationDelaySpike == 0 {
r.propagationDelaySpike = propagationDelay
} else {
r.propagationDelaySpike += time.Duration(cPropagationDelaySpikeAdaptationFactor * float64(propagationDelay-r.propagationDelaySpike))
}
if r.propagationDelayDeltaHighCount >= cPropagationDelayDeltaHighResetNumReports && time.Since(r.propagationDelayDeltaHighStartTime) >= cPropagationDelayDeltaHighResetWait {
r.logger.Debugw("re-initializing propagation delay", append(getPropagationFields(), "newPropagationDelay", r.propagationDelaySpike.String())...)
initPropagationDelay(r.propagationDelaySpike)
}
} else {
resetDelta()
}
} else {
resetDelta()
factor := cPropagationDelayFallFactor
if propagationDelay > r.propagationDelay {
factor = cPropagationDelayRiseFactor
}
r.propagationDelay += time.Duration(factor * float64(propagationDelay-r.propagationDelay))
}
if deltaPropagationDelay < cPropagationDelayDeltaLongTermAdaptationThreshold {
if r.longTermDeltaPropagationDelay == 0 {
r.longTermDeltaPropagationDelay = deltaPropagationDelay
} else {
// do not adapt to large +ve spikes, can happen when channel is congested and reports are delivered very late
// if the spike is in fact a path change, it will persist and handled by path change detection above
sinceLastReport := srData.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time())
adaptationFactor := min(1.0, float64(sinceLastReport)/float64(cPropagationDelayDeltaHighResetWait))
r.longTermDeltaPropagationDelay += time.Duration(adaptationFactor * float64(deltaPropagationDelay-r.longTermDeltaPropagationDelay))
}
}
if r.longTermDeltaPropagationDelay < 0 {
r.longTermDeltaPropagationDelay = 0
}
}
// adjust receive time to estimated propagation delay
srData.AtAdjusted = ntpTime.Add(r.propagationDelay)
srData.AtAdjusted = senderClockTime.Add(estimatedPropagationDelay)
r.srNewest = srData
}
@@ -792,8 +694,7 @@ func (r lockedRTPStatsReceiverLogEncoder) MarshalLogObject(e zapcore.ObjectEncod
e.AddUint64("extStartTS", r.timestamp.GetExtendedStart())
e.AddUint64("extHighestTS", r.timestamp.GetExtendedHighest())
e.AddDuration("propagationDelay", r.propagationDelay)
e.AddDuration("longTermDeltaPropagationDelay", r.longTermDeltaPropagationDelay)
e.AddObject("propagationDelayEstimator", r.propagationDelayEstimator)
return nil
}
+175
View File
@@ -0,0 +1,175 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utils
import (
"time"
"go.uber.org/zap/zapcore"
)
// OWDEstimatorParams holds the tunables consumed by OWDEstimator.Update.
type OWDEstimatorParams struct {
// PropagationDelayFallFactor is the smoothing weight applied to
// (measured - estimated) when the measured delay is not above the estimate.
PropagationDelayFallFactor float64
// PropagationDelayRiseFactor is the smoothing weight applied to
// (measured - estimated) when the measured delay is above the estimate.
PropagationDelayRiseFactor float64
// PropagationDelaySpikeAdaptationFactor is the smoothing weight used to blend
// successive high-delta measurements into the tracked spike value.
PropagationDelaySpikeAdaptationFactor float64
// PropagationDelayDeltaThresholdMin is the delta (measured - estimated) that must
// be exceeded before a sample is considered for path change detection; samples at
// or below it are smoothed into the estimate instead.
PropagationDelayDeltaThresholdMin time.Duration
// PropagationDelayDeltaThresholdMaxFactor is the multiple of the long term delta
// that a delta must exceed to be counted as a high delta.
PropagationDelayDeltaThresholdMaxFactor int
// PropagationDelayDeltaHighResetNumReports is the minimum number of consecutive
// high-delta updates required before the estimate is reset.
PropagationDelayDeltaHighResetNumReports int
// PropagationDelayDeltaHighResetWait is the minimum time high deltas must persist
// before the estimate is reset. It is also the interval used to normalize the
// adaptation factor of the long term delta.
PropagationDelayDeltaHighResetWait time.Duration
// PropagationDelayDeltaLongTermAdaptationThreshold is the upper bound (exclusive)
// on deltas that are folded into the long term delta.
PropagationDelayDeltaLongTermAdaptationThreshold time.Duration
}
// OWDEstimatorParamsDefault is a ready-made parameter set for NewOWDEstimator.
// The comments inside the literal explain the estimation approach and the role
// of each group of values.
var OWDEstimatorParamsDefault = OWDEstimatorParams{
// OWD (One-Way-Delay) Estimator is used to estimate propagation delay between sender and receiver.
// As they operate on different clock domains, it is not possible to get exact propagation delay easily.
// So, this module is an estimator using a simple approach explained below. It should not be used for
// things that require high accuracy.
//
// One example is RTCP Sender Reports getting re-based to SFU time base so that all subscriber side
// can have the same time base (i.e. SFU time base). To convert publisher side
// RTCP Sender Reports to SFU timebase, a propagation delay is maintained.
// propagation_delay = time_of_report_reception - ntp_timestamp_in_report
//
// Propagation delay is adapted continuously. If it falls, adapt quickly to the
// lower value as that could be the real propagation delay. If it rises, adapt slowly
// as it might be a temporary change or slow drift. See below for handling of high deltas
// which could be a result of a path change.
PropagationDelayFallFactor: 0.9,
PropagationDelayRiseFactor: 0.1,
PropagationDelaySpikeAdaptationFactor: 0.5,
// To account for path changes mid-stream, if the delta of the propagation delay is consistently higher, reset.
// Reset at whichever of the below happens later.
// 1. 10 seconds of persistent high delta.
// 2. at least 2 consecutive reports with high delta.
//
// A long term estimate of delta of propagation delay is maintained and delta propagation delay exceeding
// a factor of the long term estimate is considered a sharp increase. That will trigger the start of the
// path change condition and if it persists, propagation delay will be reset.
PropagationDelayDeltaThresholdMin: 10 * time.Millisecond,
PropagationDelayDeltaThresholdMaxFactor: 2,
PropagationDelayDeltaHighResetNumReports: 2,
PropagationDelayDeltaHighResetWait: 10 * time.Second,
PropagationDelayDeltaLongTermAdaptationThreshold: 50 * time.Millisecond,
}
// OWDEstimator maintains a running estimate of the propagation delay between a
// sender clock and a receiver clock. State is advanced only by Update.
type OWDEstimator struct {
// params is the configuration supplied to NewOWDEstimator.
params OWDEstimatorParams
// initialized is set by the first call to Update, which seeds the estimate.
initialized bool
// lastSenderClockTime is the sender clock time passed to the most recent Update.
lastSenderClockTime time.Time
// lastPropagationDelay is receiver time minus sender time from the most recent Update.
lastPropagationDelay time.Duration
// lastDeltaPropagationDelay is lastPropagationDelay minus the estimate as it stood
// when the most recent (non-initial) Update began.
lastDeltaPropagationDelay time.Duration
// estimatedPropagationDelay is the current estimate, as returned by Update.
estimatedPropagationDelay time.Duration
// longTermDeltaPropagationDelay is a smoothed delta; Update never leaves it negative.
longTermDeltaPropagationDelay time.Duration
// propagationDelayDeltaHighCount counts consecutive updates classified as high delta.
propagationDelayDeltaHighCount int
// propagationDelayDeltaHighStartTime is the local time (time.Now) at which the
// current run of high deltas started; zero when there is no such run.
propagationDelayDeltaHighStartTime time.Time
// propagationDelaySpike is the smoothed measured delay over the current run of
// high deltas; it becomes the new estimate when a reset is triggered.
propagationDelaySpike time.Duration
}
// NewOWDEstimator returns an estimator configured with params. All other state
// starts at its zero value; the first call to Update seeds the estimate.
func NewOWDEstimator(params OWDEstimatorParams) *OWDEstimator {
	estimator := new(OWDEstimator)
	estimator.params = params
	return estimator
}
// MarshalLogObject writes the estimator's internal state to the given encoder so
// the estimator can be logged as a structured object. A nil receiver writes
// nothing. It always returns nil.
func (o *OWDEstimator) MarshalLogObject(e zapcore.ObjectEncoder) error {
	if o == nil {
		return nil
	}

	// most recent observation
	e.AddTime("lastSenderClockTime", o.lastSenderClockTime)
	e.AddDuration("lastPropagationDelay", o.lastPropagationDelay)
	e.AddDuration("lastDeltaPropagationDelay", o.lastDeltaPropagationDelay)

	// running estimates
	e.AddDuration("estimatedPropagationDelay", o.estimatedPropagationDelay)
	e.AddDuration("longTermDeltaPropagationDelay", o.longTermDeltaPropagationDelay)

	// path change detection state
	e.AddInt("propagationDelayDeltaHighCount", o.propagationDelayDeltaHighCount)
	e.AddTime("propagationDelayDeltaHighStartTime", o.propagationDelayDeltaHighStartTime)
	e.AddDuration("propagationDelaySpike", o.propagationDelaySpike)
	return nil
}
// Update folds one observation into the estimator and returns the current
// propagation delay estimate along with a flag indicating a step change.
//
// senderClockTime is the timestamp in the sender's clock domain and
// receiverClockTime is the matching reception time in the receiver's clock
// domain; the measured delay is receiverClockTime - senderClockTime.
//
// The returned flag is true on the very first call (the estimate is seeded with
// the measured delay) and whenever a persistent sharp increase causes the
// estimate to be re-initialized; otherwise it is false.
//
// NOTE: persistence of a high delta is timed with time.Now/time.Since, i.e. on
// the local clock at the moment of the call, not with receiverClockTime.
func (o *OWDEstimator) Update(senderClockTime time.Time, receiverClockTime time.Time) (time.Duration, bool) {
	// resetDelta clears the path change detection state.
	resetDelta := func() {
		o.propagationDelayDeltaHighCount = 0
		o.propagationDelayDeltaHighStartTime = time.Time{}
		o.propagationDelaySpike = 0
	}

	// initPropagationDelay (re)seeds the estimate and restarts long term tracking.
	initPropagationDelay := func(pd time.Duration) {
		o.estimatedPropagationDelay = pd
		o.longTermDeltaPropagationDelay = 0
		resetDelta()
	}

	o.lastPropagationDelay = receiverClockTime.Sub(senderClockTime)
	if !o.initialized {
		o.initialized = true
		o.lastSenderClockTime = senderClockTime
		initPropagationDelay(o.lastPropagationDelay)
		return o.estimatedPropagationDelay, true
	}

	stepChange := false
	o.lastDeltaPropagationDelay = o.lastPropagationDelay - o.estimatedPropagationDelay
	// check for path changes, i.e. a step jump increase in propagation delay observed over time
	if o.lastDeltaPropagationDelay > o.params.PropagationDelayDeltaThresholdMin { // ignore small changes for path change consideration
		if o.longTermDeltaPropagationDelay != 0 &&
			o.lastDeltaPropagationDelay > o.longTermDeltaPropagationDelay*time.Duration(o.params.PropagationDelayDeltaThresholdMaxFactor) {
			o.propagationDelayDeltaHighCount++
			if o.propagationDelayDeltaHighStartTime.IsZero() {
				o.propagationDelayDeltaHighStartTime = time.Now()
			}
			if o.propagationDelaySpike == 0 {
				o.propagationDelaySpike = o.lastPropagationDelay
			} else {
				o.propagationDelaySpike += time.Duration(o.params.PropagationDelaySpikeAdaptationFactor * float64(o.lastPropagationDelay-o.propagationDelaySpike))
			}

			// reset only when the high delta has been seen often enough AND for long enough
			if o.propagationDelayDeltaHighCount >= o.params.PropagationDelayDeltaHighResetNumReports && time.Since(o.propagationDelayDeltaHighStartTime) >= o.params.PropagationDelayDeltaHighResetWait {
				stepChange = true
				initPropagationDelay(o.propagationDelaySpike)
			}
		} else {
			resetDelta()
		}
	} else {
		resetDelta()

		// fall quickly, rise slowly
		factor := o.params.PropagationDelayFallFactor
		if o.lastPropagationDelay > o.estimatedPropagationDelay {
			factor = o.params.PropagationDelayRiseFactor
		}
		o.estimatedPropagationDelay += time.Duration(factor * float64(o.lastPropagationDelay-o.estimatedPropagationDelay))
	}

	if o.lastDeltaPropagationDelay < o.params.PropagationDelayDeltaLongTermAdaptationThreshold {
		if o.longTermDeltaPropagationDelay == 0 {
			o.longTermDeltaPropagationDelay = o.lastDeltaPropagationDelay
		} else {
			// do not adapt to large +ve spikes, can happen when channel is congested and reports are delivered very late
			// if the spike is in fact a path change, it will persist and handled by path change detection above
			sinceLast := senderClockTime.Sub(o.lastSenderClockTime)
			// Clamp to [0, 1]: a sender clock time older than the previous one (e.g. a
			// reordered report) makes sinceLast negative, and a negative factor would
			// push the long term delta away from the sample instead of toward it.
			adaptationFactor := max(0.0, min(1.0, float64(sinceLast)/float64(o.params.PropagationDelayDeltaHighResetWait)))
			o.longTermDeltaPropagationDelay += time.Duration(adaptationFactor * float64(o.lastDeltaPropagationDelay-o.longTermDeltaPropagationDelay))
		}
	}
	if o.longTermDeltaPropagationDelay < 0 {
		o.longTermDeltaPropagationDelay = 0
	}

	o.lastSenderClockTime = senderClockTime
	return o.estimatedPropagationDelay, stepChange
}