Use int64 nanoseconds and reduce conversion in a few places (#3159)

This commit is contained in:
Raja Subramanian
2024-11-06 12:28:30 +05:30
committed by GitHub
parent 09f140afa8
commit f3a13569ee
4 changed files with 52 additions and 52 deletions
+4 -4
View File
@@ -98,18 +98,18 @@ func (m *MetricTimestamper) maybeRunOWDEstimator(batch *livekit.MetricsBatch) in
if time.Since(m.lastOWDEstimatorRunAt) < m.params.Config.OneWayDelayEstimatorMinInterval &&
m.batchesSinceLastOWDEstimatorRun < m.params.Config.OneWayDelayEstimatorMaxBatch {
m.batchesSinceLastOWDEstimatorRun++
return m.owdEstimator.EstimatedPropagationDelay().Nanoseconds()
return m.owdEstimator.EstimatedPropagationDelay()
}
senderClockTime := batch.GetTimestampMs()
if senderClockTime == 0 {
m.batchesSinceLastOWDEstimatorRun++
return m.owdEstimator.EstimatedPropagationDelay().Nanoseconds()
return m.owdEstimator.EstimatedPropagationDelay()
}
m.lastOWDEstimatorRunAt = time.Now()
m.batchesSinceLastOWDEstimatorRun = 1
estimatedOWD, _ := m.owdEstimator.Update(time.UnixMilli(senderClockTime), mono.Now())
return estimatedOWD.Nanoseconds()
estimatedOWDNs, _ := m.owdEstimator.Update(senderClockTime*1e6, mono.UnixNano())
return estimatedOWDNs
}
+3 -3
View File
@@ -495,8 +495,8 @@ func (r *RTPStatsReceiver) checkRTPClockSkewAgainstMediaPathForSenderReport(srDa
}
func (r *RTPStatsReceiver) updatePropagationDelayAndRecordSenderReport(srData *livekit.RTCPSenderReportState) {
senderClockTime := mediatransportutil.NtpTime(srData.NtpTimestamp).Time()
estimatedPropagationDelay, stepChange := r.propagationDelayEstimator.Update(senderClockTime, time.Unix(0, srData.At))
senderClockTime := mediatransportutil.NtpTime(srData.NtpTimestamp).Time().UnixNano()
estimatedPropagationDelay, stepChange := r.propagationDelayEstimator.Update(senderClockTime, srData.At)
if stepChange {
r.logger.Debugw(
"propagation delay step change",
@@ -509,7 +509,7 @@ func (r *RTPStatsReceiver) updatePropagationDelayAndRecordSenderReport(srData *l
r.srFirst = srData
}
// adjust receive time to estimated propagation delay
srData.AtAdjusted = senderClockTime.Add(estimatedPropagationDelay).UnixNano()
srData.AtAdjusted = senderClockTime + estimatedPropagationDelay
r.srNewest = srData
}
+2 -2
View File
@@ -123,13 +123,13 @@ func (t *TrendDetector) AddValue(value int64) {
// Bandwidth estimate is received periodically. If the estimate does not change, it will be repeated.
// When there is congestion, there are several estimates received with decreasing values.
//
// Using a sliding window, collapsing repeated values and waiting for falling trend is to ensure that
// Using a sliding window, collapsing repeated values and waiting for falling trend to ensure that
// the reaction is not too fast, i. e. reacting to falling values too quick could mean a lot of re-allocation
// resulting in layer switches, key frames and more congestion.
//
// But, on the flip side, estimate could fall once or twice within a sliding window and stay there.
// In those cases, using a collapse window to record a value even if it is duplicate. By doing that,
// a trend could be detected eventually. If will be delayed, but that is fine with slow changing estimates.
// a trend could be detected eventually. It will be delayed, but that is fine with slow changing estimates.
var lastSample *trendDetectorSample
if len(t.samples) != 0 {
lastSample = &t.samples[len(t.samples)-1]
+43 -43
View File
@@ -27,7 +27,7 @@ type OWDEstimatorParams struct {
PropagationDelaySpikeAdaptationFactor float64
PropagationDelayDeltaThresholdMin time.Duration
PropagationDelayDeltaThresholdMaxFactor int
PropagationDelayDeltaThresholdMaxFactor int64
PropagationDelayDeltaHighResetNumReports int
PropagationDelayDeltaHighResetWait time.Duration
PropagationDelayDeltaLongTermAdaptationThreshold time.Duration
@@ -72,14 +72,14 @@ type OWDEstimator struct {
params OWDEstimatorParams
initialized bool
lastSenderClockTime time.Time
lastPropagationDelay time.Duration
lastDeltaPropagationDelay time.Duration
estimatedPropagationDelay time.Duration
longTermDeltaPropagationDelay time.Duration
lastSenderClockTimeNs int64
lastPropagationDelayNs int64
lastDeltaPropagationDelayNs int64
estimatedPropagationDelayNs int64
longTermDeltaPropagationDelayNs int64
propagationDelayDeltaHighCount int
propagationDelayDeltaHighStartTime time.Time
propagationDelaySpike time.Duration
propagationDelaySpikeNs int64
}
func NewOWDEstimator(params OWDEstimatorParams) *OWDEstimator {
@@ -90,58 +90,58 @@ func NewOWDEstimator(params OWDEstimatorParams) *OWDEstimator {
func (o *OWDEstimator) MarshalLogObject(e zapcore.ObjectEncoder) error {
if o != nil {
e.AddTime("lastSenderClockTime", o.lastSenderClockTime)
e.AddDuration("lastPropagationDelay", o.lastPropagationDelay)
e.AddDuration("lastDeltaPropagationDelay", o.lastDeltaPropagationDelay)
e.AddDuration("estimatedPropagationDelay", o.estimatedPropagationDelay)
e.AddDuration("longTermDeltaPropagationDelay", o.longTermDeltaPropagationDelay)
e.AddTime("lastSenderClockTimeNs", time.Unix(0, o.lastSenderClockTimeNs))
e.AddDuration("lastPropagationDelayNs", time.Duration(o.lastPropagationDelayNs))
e.AddDuration("lastDeltaPropagationDelayNs", time.Duration(o.lastDeltaPropagationDelayNs))
e.AddDuration("estimatedPropagationDelayNs", time.Duration(o.estimatedPropagationDelayNs))
e.AddDuration("longTermDeltaPropagationDelayNs", time.Duration(o.longTermDeltaPropagationDelayNs))
e.AddInt("propagationDelayDeltaHighCount", o.propagationDelayDeltaHighCount)
e.AddTime("propagationDelayDeltaHighStartTime", o.propagationDelayDeltaHighStartTime)
e.AddDuration("propagationDelaySpike", o.propagationDelaySpike)
e.AddDuration("propagationDelaySpikeNs", time.Duration(o.propagationDelaySpikeNs))
}
return nil
}
func (o *OWDEstimator) Update(senderClockTime time.Time, receiverClockTime time.Time) (time.Duration, bool) {
func (o *OWDEstimator) Update(senderClockTimeNs int64, receiverClockTimeNs int64) (int64, bool) {
resetDelta := func() {
o.propagationDelayDeltaHighCount = 0
o.propagationDelayDeltaHighStartTime = time.Time{}
o.propagationDelaySpike = 0
o.propagationDelaySpikeNs = 0
}
initPropagationDelay := func(pd time.Duration) {
o.estimatedPropagationDelay = pd
o.longTermDeltaPropagationDelay = 0
initPropagationDelay := func(pd int64) {
o.estimatedPropagationDelayNs = pd
o.longTermDeltaPropagationDelayNs = 0
resetDelta()
}
o.lastPropagationDelay = receiverClockTime.Sub(senderClockTime)
o.lastPropagationDelayNs = receiverClockTimeNs - senderClockTimeNs
if !o.initialized {
o.initialized = true
o.lastSenderClockTime = senderClockTime
initPropagationDelay(o.lastPropagationDelay)
return o.estimatedPropagationDelay, true
o.lastSenderClockTimeNs = senderClockTimeNs
initPropagationDelay(o.lastPropagationDelayNs)
return o.estimatedPropagationDelayNs, true
}
stepChange := false
o.lastDeltaPropagationDelay = o.lastPropagationDelay - o.estimatedPropagationDelay
o.lastDeltaPropagationDelayNs = o.lastPropagationDelayNs - o.estimatedPropagationDelayNs
// check for path changes, i. e. a step jump increase in propagation delay observed over time
if o.lastDeltaPropagationDelay > o.params.PropagationDelayDeltaThresholdMin { // ignore small changes for path change consideration
if o.longTermDeltaPropagationDelay != 0 &&
o.lastDeltaPropagationDelay > o.longTermDeltaPropagationDelay*time.Duration(o.params.PropagationDelayDeltaThresholdMaxFactor) {
if o.lastDeltaPropagationDelayNs > o.params.PropagationDelayDeltaThresholdMin.Nanoseconds() { // ignore small changes for path change consideration
if o.longTermDeltaPropagationDelayNs != 0 &&
o.lastDeltaPropagationDelayNs > o.longTermDeltaPropagationDelayNs*o.params.PropagationDelayDeltaThresholdMaxFactor {
o.propagationDelayDeltaHighCount++
if o.propagationDelayDeltaHighStartTime.IsZero() {
o.propagationDelayDeltaHighStartTime = time.Now()
}
if o.propagationDelaySpike == 0 {
o.propagationDelaySpike = o.lastPropagationDelay
if o.propagationDelaySpikeNs == 0 {
o.propagationDelaySpikeNs = o.lastPropagationDelayNs
} else {
o.propagationDelaySpike += time.Duration(o.params.PropagationDelaySpikeAdaptationFactor * float64(o.lastPropagationDelay-o.propagationDelaySpike))
o.propagationDelaySpikeNs += int64(o.params.PropagationDelaySpikeAdaptationFactor * float64(o.lastPropagationDelayNs-o.propagationDelaySpikeNs))
}
if o.propagationDelayDeltaHighCount >= o.params.PropagationDelayDeltaHighResetNumReports && time.Since(o.propagationDelayDeltaHighStartTime) >= o.params.PropagationDelayDeltaHighResetWait {
stepChange = true
initPropagationDelay(o.propagationDelaySpike)
initPropagationDelay(o.propagationDelaySpikeNs)
}
} else {
resetDelta()
@@ -150,30 +150,30 @@ func (o *OWDEstimator) Update(senderClockTime time.Time, receiverClockTime time.
resetDelta()
factor := o.params.PropagationDelayFallFactor
if o.lastPropagationDelay > o.estimatedPropagationDelay {
if o.lastPropagationDelayNs > o.estimatedPropagationDelayNs {
factor = o.params.PropagationDelayRiseFactor
}
o.estimatedPropagationDelay += time.Duration(factor * float64(o.lastPropagationDelay-o.estimatedPropagationDelay))
o.estimatedPropagationDelayNs += int64(factor * float64(o.lastPropagationDelayNs-o.estimatedPropagationDelayNs))
}
if o.lastDeltaPropagationDelay < o.params.PropagationDelayDeltaLongTermAdaptationThreshold {
if o.longTermDeltaPropagationDelay == 0 {
o.longTermDeltaPropagationDelay = o.lastDeltaPropagationDelay
if o.lastDeltaPropagationDelayNs < o.params.PropagationDelayDeltaLongTermAdaptationThreshold.Nanoseconds() {
if o.longTermDeltaPropagationDelayNs == 0 {
o.longTermDeltaPropagationDelayNs = o.lastDeltaPropagationDelayNs
} else {
// do not adapt to large +ve spikes, can happen when channel is congested and reports are delivered very late
// if the spike is in fact a path change, it will persist and handled by path change detection above
sinceLast := senderClockTime.Sub(o.lastSenderClockTime)
sinceLast := senderClockTimeNs - o.lastSenderClockTimeNs
adaptationFactor := min(1.0, float64(sinceLast)/float64(o.params.PropagationDelayDeltaHighResetWait))
o.longTermDeltaPropagationDelay += time.Duration(adaptationFactor * float64(o.lastDeltaPropagationDelay-o.longTermDeltaPropagationDelay))
o.longTermDeltaPropagationDelayNs += int64(adaptationFactor * float64(o.lastDeltaPropagationDelayNs-o.longTermDeltaPropagationDelayNs))
}
}
if o.longTermDeltaPropagationDelay < 0 {
o.longTermDeltaPropagationDelay = 0
if o.longTermDeltaPropagationDelayNs < 0 {
o.longTermDeltaPropagationDelayNs = 0
}
o.lastSenderClockTime = senderClockTime
return o.estimatedPropagationDelay, stepChange
o.lastSenderClockTimeNs = senderClockTimeNs
return o.estimatedPropagationDelayNs, stepChange
}
func (o *OWDEstimator) EstimatedPropagationDelay() time.Duration {
return o.estimatedPropagationDelay
func (o *OWDEstimator) EstimatedPropagationDelay() int64 {
return o.estimatedPropagationDelayNs
}