mirror of
https://github.com/livekit/livekit.git
synced 2026-05-15 16:06:09 +00:00
Prevent large spikes in propagation delay (#2615)
* Prevent large spikes in propagation delay
A few tweaks
- Large spike in propagation delay due to congested channel results in
long term estimate getting high value. Ignore outliers in long term
estimate.
- Introduce a new field for adjusted arrival time as adjusting the
arrival time in place meant it got applied again across the relay and
that caused different propagation delay on remote nodes.
- Reset path change counters as long as there is any sample that is not
higher than the multiple of long term. There was a case of
o Sample with high value that triggered path change start.
o Then some samples with high enough delta, but did not meet the
criteria for increasing counter further.
o Some time later, another sample met the threshold and that triggered
a path change re-init.
* do not adapt to large delta
This commit is contained in:
@@ -114,6 +114,7 @@ type RTCPSenderReportData struct {
|
||||
RTPTimestampExt uint64
|
||||
NTPTimestamp mediatransportutil.NtpTime
|
||||
At time.Time
|
||||
AtAdjusted time.Time
|
||||
}
|
||||
|
||||
func (r *RTCPSenderReportData) ToString() string {
|
||||
@@ -121,7 +122,7 @@ func (r *RTCPSenderReportData) ToString() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
return fmt.Sprintf("ntp: %s, rtp: %d, extRtp: %d, at: %s", r.NTPTimestamp.Time().String(), r.RTPTimestamp, r.RTPTimestampExt, r.At.String())
|
||||
return fmt.Sprintf("ntp: %s, rtp: %d, extRtp: %d, at: %s, atAdj: %s", r.NTPTimestamp.Time().String(), r.RTPTimestamp, r.RTPTimestampExt, r.At.String(), r.AtAdjusted.String())
|
||||
}
|
||||
|
||||
func (r *RTCPSenderReportData) MarshalLogObject(e zapcore.ObjectEncoder) error {
|
||||
@@ -133,6 +134,7 @@ func (r *RTCPSenderReportData) MarshalLogObject(e zapcore.ObjectEncoder) error {
|
||||
e.AddUint32("RTPTimestamp", r.RTPTimestamp)
|
||||
e.AddUint64("RTPTimestampExt", r.RTPTimestampExt)
|
||||
e.AddTime("At", r.At)
|
||||
e.AddTime("AtAdjusted", r.AtAdjusted)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -495,7 +497,7 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *RTCPSenderReportData,
|
||||
// abnormal delay (maybe due to pacing or maybe due to queuing
|
||||
// in some network element along the way), push back first time
|
||||
// to an earlier instance.
|
||||
timeSinceReceive := time.Since(srData.At)
|
||||
timeSinceReceive := time.Since(srData.AtAdjusted)
|
||||
extNowTS := srData.RTPTimestampExt - tsOffset + uint64(timeSinceReceive.Nanoseconds()*int64(r.params.ClockRate)/1e9)
|
||||
samplesDiff := int64(extNowTS - extStartTS)
|
||||
if samplesDiff < 0 {
|
||||
|
||||
@@ -43,20 +43,19 @@ const (
|
||||
|
||||
cPropagationDelaySpikeAdaptationFactor = float64(0.5)
|
||||
|
||||
cPropagationDelayDeltaMaxInterval = 10 * time.Second
|
||||
|
||||
// To account for path changes mid-stream, if the delta of the propagation delay is consistently higher, reset.
|
||||
// Reset at whichever of the below happens later.
|
||||
// 1. 10 seconds of persistent high delta.
|
||||
// 2. at least 2 reports with high delta.
|
||||
// 2. at least 2 consecutive reports with high delta.
|
||||
//
|
||||
// A long term version of delta of propagation delay is maintained and delta propagation delay exceeding
|
||||
// a factor of the long term version is considered a sharp increase. That will trigger the start of the
|
||||
// A long term estimate of delta of propagation delay is maintained and delta propagation delay exceeding
|
||||
// a factor of the long term estimate is considered a sharp increase. That will trigger the start of the
|
||||
// path change condition and if it persists, propagation delay will be reset.
|
||||
cPropagationDelayDeltaThresholdMin = 10 * time.Millisecond
|
||||
cPropagationDelayDeltaThresholdMaxFactor = 2
|
||||
cPropagationDelayDeltaHighResetNumReports = 2
|
||||
cPropagationDelayDeltaHighResetWait = 10 * time.Second
|
||||
cPropagationDelayDeltaThresholdMin = 10 * time.Millisecond
|
||||
cPropagationDelayDeltaThresholdMaxFactor = 2
|
||||
cPropagationDelayDeltaHighResetNumReports = 2
|
||||
cPropagationDelayDeltaHighResetWait = 10 * time.Second
|
||||
cPropagationDelayDeltaLongTermAdaptationThreshold = 50 * time.Millisecond
|
||||
)
|
||||
|
||||
type RTPFlowState struct {
|
||||
@@ -369,6 +368,7 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData)
|
||||
"receivedDeltaPropagationDelay", deltaPropagationDelay.String(),
|
||||
"deltaHighCount", r.propagationDelayDeltaHighCount,
|
||||
"sinceDeltaHighStart", time.Since(r.propagationDelayDeltaHighStartTime).String(),
|
||||
"propagationDelaySpike", r.propagationDelaySpike.String(),
|
||||
"first", r.srFirst,
|
||||
"last", r.srNewest,
|
||||
"current", &srDataCopy,
|
||||
@@ -395,9 +395,11 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData)
|
||||
r.logger.Debugw("initializing propagation delay", getPropagationFields()...)
|
||||
} else {
|
||||
deltaPropagationDelay = propagationDelay - r.propagationDelay
|
||||
if deltaPropagationDelay.Abs() > cPropagationDelayDeltaThresholdMin { // ignore small changes for path change consideration
|
||||
if r.longTermDeltaPropagationDelay != 0 && deltaPropagationDelay > 0 && deltaPropagationDelay > r.longTermDeltaPropagationDelay*time.Duration(cPropagationDelayDeltaThresholdMaxFactor) {
|
||||
r.logger.Debugw("sharp increase in propagation delay, skipping", getPropagationFields()...) // TODO-REMOVE
|
||||
if deltaPropagationDelay > cPropagationDelayDeltaThresholdMin { // ignore small changes for path change consideration
|
||||
if r.longTermDeltaPropagationDelay != 0 &&
|
||||
deltaPropagationDelay > 0 &&
|
||||
deltaPropagationDelay > r.longTermDeltaPropagationDelay*time.Duration(cPropagationDelayDeltaThresholdMaxFactor) {
|
||||
r.logger.Debugw("sharp increase in propagation delay", getPropagationFields()...) // TODO-REMOVE
|
||||
r.propagationDelayDeltaHighCount++
|
||||
if r.propagationDelayDeltaHighStartTime.IsZero() {
|
||||
r.propagationDelayDeltaHighStartTime = time.Now()
|
||||
@@ -412,6 +414,8 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData)
|
||||
r.logger.Debugw("re-initializing propagation delay", append(getPropagationFields(), "newPropagationDelay", propagationDelay.String())...)
|
||||
initPropagationDelay(r.propagationDelaySpike)
|
||||
}
|
||||
} else {
|
||||
resetDelta()
|
||||
}
|
||||
} else {
|
||||
resetDelta()
|
||||
@@ -432,13 +436,17 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData)
|
||||
if r.longTermDeltaPropagationDelay == 0 {
|
||||
r.longTermDeltaPropagationDelay = deltaPropagationDelay
|
||||
} else {
|
||||
sinceLastReport := srDataCopy.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time())
|
||||
adaptationFactor := min(1.0, float64(sinceLastReport)/float64(cPropagationDelayDeltaMaxInterval))
|
||||
r.longTermDeltaPropagationDelay += time.Duration(adaptationFactor * float64(deltaPropagationDelay-r.longTermDeltaPropagationDelay))
|
||||
if deltaPropagationDelay < cPropagationDelayDeltaLongTermAdaptationThreshold {
|
||||
// do not adapt to large +ve spikes, can happen when channel is congested and reports are delivered very late
|
||||
// if the spike is in fact a path change, it will persist and handled by path change detection above
|
||||
sinceLastReport := srDataCopy.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time())
|
||||
adaptationFactor := min(1.0, float64(sinceLastReport)/float64(cPropagationDelayDeltaHighResetWait))
|
||||
r.longTermDeltaPropagationDelay += time.Duration(adaptationFactor * float64(deltaPropagationDelay-r.longTermDeltaPropagationDelay))
|
||||
}
|
||||
}
|
||||
}
|
||||
// adjust receive time to estimated propagation delay
|
||||
srDataCopy.At = ntpTime.Add(r.propagationDelay)
|
||||
srDataCopy.AtAdjusted = ntpTime.Add(r.propagationDelay)
|
||||
r.srNewest = &srDataCopy
|
||||
|
||||
r.maybeAdjustFirstPacketTime(r.srNewest, 0, r.timestamp.GetExtendedStart())
|
||||
|
||||
@@ -633,8 +633,8 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *RTCPS
|
||||
return nil
|
||||
}
|
||||
|
||||
timeSincePublisherSR := time.Since(publisherSRData.At)
|
||||
now := publisherSRData.At.Add(timeSincePublisherSR)
|
||||
timeSincePublisherSR := time.Since(publisherSRData.AtAdjusted)
|
||||
now := publisherSRData.AtAdjusted.Add(timeSincePublisherSR)
|
||||
nowNTP := mediatransportutil.ToNtpTime(now)
|
||||
nowRTPExt := publisherSRData.RTPTimestampExt - tsOffset + uint64(timeSincePublisherSR.Nanoseconds()*int64(r.params.ClockRate)/1e9)
|
||||
|
||||
@@ -643,6 +643,7 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *RTCPS
|
||||
RTPTimestamp: uint32(nowRTPExt),
|
||||
RTPTimestampExt: nowRTPExt,
|
||||
At: now,
|
||||
AtAdjusted: now,
|
||||
}
|
||||
|
||||
getFields := func() []interface{} {
|
||||
|
||||
Reference in New Issue
Block a user