From f3a13569eeb3ab36cc1cdb2b54721577defa5881 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 6 Nov 2024 12:28:30 +0530 Subject: [PATCH] Use int64 nanoseconds and reduce conversion in a few places (#3159) --- pkg/metric/metric_timestamper.go | 8 +-- pkg/sfu/rtpstats/rtpstats_receiver.go | 6 +- pkg/sfu/streamallocator/trenddetector.go | 4 +- pkg/sfu/utils/owd_estimator.go | 86 ++++++++++++------------ 4 files changed, 52 insertions(+), 52 deletions(-) diff --git a/pkg/metric/metric_timestamper.go b/pkg/metric/metric_timestamper.go index 614cf51c8..ceacf634a 100644 --- a/pkg/metric/metric_timestamper.go +++ b/pkg/metric/metric_timestamper.go @@ -98,18 +98,18 @@ func (m *MetricTimestamper) maybeRunOWDEstimator(batch *livekit.MetricsBatch) in if time.Since(m.lastOWDEstimatorRunAt) < m.params.Config.OneWayDelayEstimatorMinInterval && m.batchesSinceLastOWDEstimatorRun < m.params.Config.OneWayDelayEstimatorMaxBatch { m.batchesSinceLastOWDEstimatorRun++ - return m.owdEstimator.EstimatedPropagationDelay().Nanoseconds() + return m.owdEstimator.EstimatedPropagationDelay() } senderClockTime := batch.GetTimestampMs() if senderClockTime == 0 { m.batchesSinceLastOWDEstimatorRun++ - return m.owdEstimator.EstimatedPropagationDelay().Nanoseconds() + return m.owdEstimator.EstimatedPropagationDelay() } m.lastOWDEstimatorRunAt = time.Now() m.batchesSinceLastOWDEstimatorRun = 1 - estimatedOWD, _ := m.owdEstimator.Update(time.UnixMilli(senderClockTime), mono.Now()) - return estimatedOWD.Nanoseconds() + estimatedOWDNs, _ := m.owdEstimator.Update(senderClockTime*1e6, mono.UnixNano()) + return estimatedOWDNs } diff --git a/pkg/sfu/rtpstats/rtpstats_receiver.go b/pkg/sfu/rtpstats/rtpstats_receiver.go index ef5308ce0..a3dd7936e 100644 --- a/pkg/sfu/rtpstats/rtpstats_receiver.go +++ b/pkg/sfu/rtpstats/rtpstats_receiver.go @@ -495,8 +495,8 @@ func (r *RTPStatsReceiver) checkRTPClockSkewAgainstMediaPathForSenderReport(srDa } func (r *RTPStatsReceiver) updatePropagationDelayAndRecordSenderReport(srData *livekit.RTCPSenderReportState) { - senderClockTime := mediatransportutil.NtpTime(srData.NtpTimestamp).Time() - estimatedPropagationDelay, stepChange := r.propagationDelayEstimator.Update(senderClockTime, time.Unix(0, srData.At)) + senderClockTime := mediatransportutil.NtpTime(srData.NtpTimestamp).Time().UnixNano() + estimatedPropagationDelay, stepChange := r.propagationDelayEstimator.Update(senderClockTime, srData.At) if stepChange { r.logger.Debugw( "propagation delay step change", @@ -509,7 +509,7 @@ func (r *RTPStatsReceiver) updatePropagationDelayAndRecordSenderReport(srData *l r.srFirst = srData } // adjust receive time to estimated propagation delay - srData.AtAdjusted = senderClockTime.Add(estimatedPropagationDelay).UnixNano() + srData.AtAdjusted = senderClockTime + estimatedPropagationDelay r.srNewest = srData } diff --git a/pkg/sfu/streamallocator/trenddetector.go b/pkg/sfu/streamallocator/trenddetector.go index 7c0938482..a42e54a1f 100644 --- a/pkg/sfu/streamallocator/trenddetector.go +++ b/pkg/sfu/streamallocator/trenddetector.go @@ -123,13 +123,13 @@ func (t *TrendDetector) AddValue(value int64) { // Bandwidth estimate is received periodically. If the estimate does not change, it will be repeated. // When there is congestion, there are several estimates received with decreasing values. // - // Using a sliding window, collapsing repeated values and waiting for falling trend is to ensure that + // Using a sliding window, collapsing repeated values and waiting for falling trend to ensure that // the reaction is not too fast, i. e. reacting to falling values too quick could mean a lot of re-allocation // resulting in layer switches, key frames and more congestion. // // But, on the flip side, estimate could fall once or twice within a sliding window and stay there. // In those cases, using a collapse window to record a value even if it is duplicate. By doing that, - // a trend could be detected eventually. If will be delayed, but that is fine with slow changing estimates. + // a trend could be detected eventually. It will be delayed, but that is fine with slow changing estimates. var lastSample *trendDetectorSample if len(t.samples) != 0 { lastSample = &t.samples[len(t.samples)-1] diff --git a/pkg/sfu/utils/owd_estimator.go b/pkg/sfu/utils/owd_estimator.go index 7a5d22b91..b25e2604b 100644 --- a/pkg/sfu/utils/owd_estimator.go +++ b/pkg/sfu/utils/owd_estimator.go @@ -27,7 +27,7 @@ type OWDEstimatorParams struct { PropagationDelaySpikeAdaptationFactor float64 PropagationDelayDeltaThresholdMin time.Duration - PropagationDelayDeltaThresholdMaxFactor int + PropagationDelayDeltaThresholdMaxFactor int64 PropagationDelayDeltaHighResetNumReports int PropagationDelayDeltaHighResetWait time.Duration PropagationDelayDeltaLongTermAdaptationThreshold time.Duration @@ -72,14 +72,14 @@ type OWDEstimator struct { params OWDEstimatorParams initialized bool - lastSenderClockTime time.Time - lastPropagationDelay time.Duration - lastDeltaPropagationDelay time.Duration - estimatedPropagationDelay time.Duration - longTermDeltaPropagationDelay time.Duration + lastSenderClockTimeNs int64 + lastPropagationDelayNs int64 + lastDeltaPropagationDelayNs int64 + estimatedPropagationDelayNs int64 + longTermDeltaPropagationDelayNs int64 propagationDelayDeltaHighCount int propagationDelayDeltaHighStartTime time.Time - propagationDelaySpike time.Duration + propagationDelaySpikeNs int64 } func NewOWDEstimator(params OWDEstimatorParams) *OWDEstimator { @@ -90,58 +90,58 @@ func NewOWDEstimator(params OWDEstimatorParams) *OWDEstimator { func (o *OWDEstimator) MarshalLogObject(e zapcore.ObjectEncoder) error { if o != nil { - e.AddTime("lastSenderClockTime", o.lastSenderClockTime) - e.AddDuration("lastPropagationDelay", o.lastPropagationDelay) - e.AddDuration("lastDeltaPropagationDelay", o.lastDeltaPropagationDelay) - e.AddDuration("estimatedPropagationDelay", o.estimatedPropagationDelay) - e.AddDuration("longTermDeltaPropagationDelay", o.longTermDeltaPropagationDelay) + e.AddTime("lastSenderClockTimeNs", time.Unix(0, o.lastSenderClockTimeNs)) + e.AddDuration("lastPropagationDelayNs", time.Duration(o.lastPropagationDelayNs)) + e.AddDuration("lastDeltaPropagationDelayNs", time.Duration(o.lastDeltaPropagationDelayNs)) + e.AddDuration("estimatedPropagationDelayNs", time.Duration(o.estimatedPropagationDelayNs)) + e.AddDuration("longTermDeltaPropagationDelayNs", time.Duration(o.longTermDeltaPropagationDelayNs)) e.AddInt("propagationDelayDeltaHighCount", o.propagationDelayDeltaHighCount) e.AddTime("propagationDelayDeltaHighStartTime", o.propagationDelayDeltaHighStartTime) - e.AddDuration("propagationDelaySpike", o.propagationDelaySpike) + e.AddDuration("propagationDelaySpikeNs", time.Duration(o.propagationDelaySpikeNs)) } return nil } -func (o *OWDEstimator) Update(senderClockTime time.Time, receiverClockTime time.Time) (time.Duration, bool) { +func (o *OWDEstimator) Update(senderClockTimeNs int64, receiverClockTimeNs int64) (int64, bool) { resetDelta := func() { o.propagationDelayDeltaHighCount = 0 o.propagationDelayDeltaHighStartTime = time.Time{} - o.propagationDelaySpike = 0 + o.propagationDelaySpikeNs = 0 } - initPropagationDelay := func(pd time.Duration) { - o.estimatedPropagationDelay = pd - o.longTermDeltaPropagationDelay = 0 + initPropagationDelay := func(pd int64) { + o.estimatedPropagationDelayNs = pd + o.longTermDeltaPropagationDelayNs = 0 resetDelta() } - o.lastPropagationDelay = receiverClockTime.Sub(senderClockTime) + o.lastPropagationDelayNs = receiverClockTimeNs - senderClockTimeNs if !o.initialized { o.initialized = true - o.lastSenderClockTime = senderClockTime - initPropagationDelay(o.lastPropagationDelay) - return o.estimatedPropagationDelay, true + o.lastSenderClockTimeNs = senderClockTimeNs + initPropagationDelay(o.lastPropagationDelayNs) + return o.estimatedPropagationDelayNs, true } stepChange := false - o.lastDeltaPropagationDelay = o.lastPropagationDelay - o.estimatedPropagationDelay + o.lastDeltaPropagationDelayNs = o.lastPropagationDelayNs - o.estimatedPropagationDelayNs // check for path changes, i. e. a step jump increase in propagation delay observed over time - if o.lastDeltaPropagationDelay > o.params.PropagationDelayDeltaThresholdMin { // ignore small changes for path change consideration - if o.longTermDeltaPropagationDelay != 0 && - o.lastDeltaPropagationDelay > o.longTermDeltaPropagationDelay*time.Duration(o.params.PropagationDelayDeltaThresholdMaxFactor) { + if o.lastDeltaPropagationDelayNs > o.params.PropagationDelayDeltaThresholdMin.Nanoseconds() { // ignore small changes for path change consideration + if o.longTermDeltaPropagationDelayNs != 0 && + o.lastDeltaPropagationDelayNs > o.longTermDeltaPropagationDelayNs*o.params.PropagationDelayDeltaThresholdMaxFactor { o.propagationDelayDeltaHighCount++ if o.propagationDelayDeltaHighStartTime.IsZero() { o.propagationDelayDeltaHighStartTime = time.Now() } - if o.propagationDelaySpike == 0 { - o.propagationDelaySpike = o.lastPropagationDelay + if o.propagationDelaySpikeNs == 0 { + o.propagationDelaySpikeNs = o.lastPropagationDelayNs } else { - o.propagationDelaySpike += time.Duration(o.params.PropagationDelaySpikeAdaptationFactor * float64(o.lastPropagationDelay-o.propagationDelaySpike)) + o.propagationDelaySpikeNs += int64(o.params.PropagationDelaySpikeAdaptationFactor * float64(o.lastPropagationDelayNs-o.propagationDelaySpikeNs)) } if o.propagationDelayDeltaHighCount >= o.params.PropagationDelayDeltaHighResetNumReports && time.Since(o.propagationDelayDeltaHighStartTime) >= o.params.PropagationDelayDeltaHighResetWait { stepChange = true - initPropagationDelay(o.propagationDelaySpike) + initPropagationDelay(o.propagationDelaySpikeNs) } } else { resetDelta() @@ -150,30 +150,30 @@ func (o *OWDEstimator) Update(senderClockTime time.Time, receiverClockTime time. resetDelta() factor := o.params.PropagationDelayFallFactor - if o.lastPropagationDelay > o.estimatedPropagationDelay { + if o.lastPropagationDelayNs > o.estimatedPropagationDelayNs { factor = o.params.PropagationDelayRiseFactor } - o.estimatedPropagationDelay += time.Duration(factor * float64(o.lastPropagationDelay-o.estimatedPropagationDelay)) + o.estimatedPropagationDelayNs += int64(factor * float64(o.lastPropagationDelayNs-o.estimatedPropagationDelayNs)) } - if o.lastDeltaPropagationDelay < o.params.PropagationDelayDeltaLongTermAdaptationThreshold { - if o.longTermDeltaPropagationDelay == 0 { - o.longTermDeltaPropagationDelay = o.lastDeltaPropagationDelay + if o.lastDeltaPropagationDelayNs < o.params.PropagationDelayDeltaLongTermAdaptationThreshold.Nanoseconds() { + if o.longTermDeltaPropagationDelayNs == 0 { + o.longTermDeltaPropagationDelayNs = o.lastDeltaPropagationDelayNs } else { // do not adapt to large +ve spikes, can happen when channel is congested and reports are delivered very late // if the spike is in fact a path change, it will persist and handled by path change detection above - sinceLast := senderClockTime.Sub(o.lastSenderClockTime) + sinceLast := senderClockTimeNs - o.lastSenderClockTimeNs adaptationFactor := min(1.0, float64(sinceLast)/float64(o.params.PropagationDelayDeltaHighResetWait)) - o.longTermDeltaPropagationDelay += time.Duration(adaptationFactor * float64(o.lastDeltaPropagationDelay-o.longTermDeltaPropagationDelay)) + o.longTermDeltaPropagationDelayNs += int64(adaptationFactor * float64(o.lastDeltaPropagationDelayNs-o.longTermDeltaPropagationDelayNs)) } } - if o.longTermDeltaPropagationDelay < 0 { - o.longTermDeltaPropagationDelay = 0 + if o.longTermDeltaPropagationDelayNs < 0 { + o.longTermDeltaPropagationDelayNs = 0 } - o.lastSenderClockTime = senderClockTime - return o.estimatedPropagationDelay, stepChange + o.lastSenderClockTimeNs = senderClockTimeNs + return o.estimatedPropagationDelayNs, stepChange } -func (o *OWDEstimator) EstimatedPropagationDelay() time.Duration { - return o.estimatedPropagationDelay +func (o *OWDEstimator) EstimatedPropagationDelay() int64 { + return o.estimatedPropagationDelayNs }