From 00ff2ab941442f535ccf11b9f6bb5c6cb5208d64 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 26 Sep 2025 18:57:21 +0530 Subject: [PATCH] Adjust for hold time when fowarding RTCP report. (#3956) * Adjust for hold time when fowarding RTCP report. When passing through RTCP sender report, holding it for some time before sending means the remote receiver could see varying amount of propagation delay if the remote uses something like local_clock - ntp_sender_report_time and adapting to it. Ideally, SFU should just forward RTCP Sender Report, but the current pull model to group RTCP sender reports makes it a bigger change. So, adjust it by hold time. Also add a initial condition for one-way-delay estimator which can init with a smaller value of latency if the first sample to measure one-way-delay itself experienced higher delay than the prevailing conditions. * variable name * log as duration --- pkg/rtc/participant.go | 1 - pkg/sfu/rtpstats/rtpstats_base.go | 14 ++++++++++---- pkg/sfu/rtpstats/rtpstats_receiver.go | 4 +++- pkg/sfu/rtpstats/rtpstats_sender.go | 11 ++++++----- pkg/sfu/utils/owd_estimator.go | 19 ++++++++++++++++++- 5 files changed, 37 insertions(+), 12 deletions(-) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index a350055a7..eac717003 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -3457,7 +3457,6 @@ func (p *ParticipantImpl) handleTrackPublished(track types.MediaTrack, isMigrate p.Identity(), track.ToProto(), ) - } p.pendingTracksLock.Lock() diff --git a/pkg/sfu/rtpstats/rtpstats_base.go b/pkg/sfu/rtpstats/rtpstats_base.go index 8844581e6..1d3260234 100644 --- a/pkg/sfu/rtpstats/rtpstats_base.go +++ b/pkg/sfu/rtpstats/rtpstats_base.go @@ -381,7 +381,11 @@ func (r *rtpStatsBase) GetRtt() uint32 { return r.rtt } -func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *livekit.RTCPSenderReportState, tsOffset uint64, extStartTS uint64) (err error, loggingFields []interface{}) { +func (r *rtpStatsBase) maybeAdjustFirstPacketTime( + srData *livekit.RTCPSenderReportState, + tsOffset uint64, + extStartTS uint64, +) (adjustment int64, err error, loggingFields []interface{}) { nowNano := mono.UnixNano() if time.Duration(nowNano-r.startTime) > cFirstPacketTimeAdjustWindow { return @@ -405,6 +409,7 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *livekit.RTCPSenderRepo timeSinceFirst := time.Duration(nowNano - r.firstTime) now := r.firstTime + timeSinceFirst.Nanoseconds() firstTime := now - samplesDuration.Nanoseconds() + adjustment = r.firstTime - firstTime getFields := func() []interface{} { return []interface{}{ @@ -412,7 +417,7 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *livekit.RTCPSenderRepo "nowTime", time.Unix(0, now), "before", time.Unix(0, r.firstTime), "after", time.Unix(0, firstTime), - "adjustment", time.Duration(r.firstTime - firstTime), + "adjustment", time.Duration(adjustment), "extNowTS", extNowTS, "extStartTS", extStartTS, "srData", WrappedRTCPSenderReportStateLogger{srData}, @@ -425,15 +430,16 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *livekit.RTCPSenderRepo } if firstTime < r.firstTime { - if r.firstTime-firstTime > cFirstPacketTimeAdjustThreshold { + if adjustment > cFirstPacketTimeAdjustThreshold { err = errors.New("adjusting first packet time, too big, ignoring") loggingFields = getFields() } else { r.logger.Debugw("adjusting first packet time", getFields()...) - r.firstTimeAdjustment += time.Duration(r.firstTime - firstTime) + r.firstTimeAdjustment += time.Duration(adjustment) r.firstTime = firstTime } } + return } diff --git a/pkg/sfu/rtpstats/rtpstats_receiver.go b/pkg/sfu/rtpstats/rtpstats_receiver.go index 35531e640..4b2424f1c 100644 --- a/pkg/sfu/rtpstats/rtpstats_receiver.go +++ b/pkg/sfu/rtpstats/rtpstats_receiver.go @@ -542,9 +542,11 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *livekit.RTCPSenderRep r.updatePropagationDelayAndRecordSenderReport(srDataExt) r.checkRTPClockSkewAgainstMediaPathForSenderReport(srDataExt) - if err, loggingFields := r.maybeAdjustFirstPacketTime(r.srNewest, 0, r.timestamp.GetExtendedStart()); err != nil { + adjustment, err, loggingFields := r.maybeAdjustFirstPacketTime(r.srNewest, 0, r.timestamp.GetExtendedStart()) + if err != nil { r.logger.Infow(err.Error(), append(loggingFields, "rtpStats", lockedRTPStatsReceiverLogEncoder{r})...) } + r.propagationDelayEstimator.InitialAdjustment(adjustment) return true } diff --git a/pkg/sfu/rtpstats/rtpstats_sender.go b/pkg/sfu/rtpstats/rtpstats_sender.go index a361231c8..db43fa21c 100644 --- a/pkg/sfu/rtpstats/rtpstats_sender.go +++ b/pkg/sfu/rtpstats/rtpstats_sender.go @@ -800,7 +800,7 @@ func (r *RTPStatsSender) MaybeAdjustFirstPacketTime(publisherSRData *livekit.RTC return } - if err, loggingFields := r.maybeAdjustFirstPacketTime(publisherSRData, tsOffset, r.extStartTS); err != nil { + if _, err, loggingFields := r.maybeAdjustFirstPacketTime(publisherSRData, tsOffset, r.extStartTS); err != nil { r.logger.Infow(err.Error(), append(loggingFields, "rtpStats", lockedRTPStatsSenderLogEncoder{r})...) } } @@ -835,11 +835,12 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *livek nowRTPExt uint64 ) if passThrough { - reportTime = publisherSRData.At - reportTimeAdjusted = publisherSRData.AtAdjusted + timeSincePublisherSR := time.Duration(mono.UnixNano() - publisherSRData.At) + reportTime = publisherSRData.At + timeSincePublisherSR.Nanoseconds() + reportTimeAdjusted = publisherSRData.AtAdjusted + timeSincePublisherSR.Nanoseconds() - nowNTP = mediatransportutil.NtpTime(publisherSRData.NtpTimestamp) - nowRTPExt = publisherSRData.RtpTimestampExt - tsOffset + nowNTP = mediatransportutil.ToNtpTime(mediatransportutil.NtpTime(publisherSRData.NtpTimestamp).Time().Add(timeSincePublisherSR)) + nowRTPExt = publisherSRData.RtpTimestampExt - tsOffset + uint64(timeSincePublisherSR.Nanoseconds()*int64(r.params.ClockRate)/1e9) } else { timeSincePublisherSRAdjusted := time.Duration(mono.UnixNano() - publisherSRData.AtAdjusted) reportTimeAdjusted = publisherSRData.AtAdjusted + timeSincePublisherSRAdjusted.Nanoseconds() diff --git a/pkg/sfu/utils/owd_estimator.go b/pkg/sfu/utils/owd_estimator.go index b25e2604b..7267262c9 100644 --- a/pkg/sfu/utils/owd_estimator.go +++ b/pkg/sfu/utils/owd_estimator.go @@ -34,7 +34,7 @@ type OWDEstimatorParams struct { } var OWDEstimatorParamsDefault = OWDEstimatorParams{ - // OWD (One-Way-Delay) Estimator is used to estimate propagation delay between sender and receicer. + // OWD (One-Way-Delay) Estimator is used to estimate propagation delay between sender and receiver. // As they operate on different clock domains, it is not possible to get exact propagation delay easily. // So, this module is an estimator using a simple approach explained below. It should not be used for // things that require high accuracy. @@ -72,6 +72,7 @@ type OWDEstimator struct { params OWDEstimatorParams initialized bool + initialAdjustmentDone bool lastSenderClockTimeNs int64 lastPropagationDelayNs int64 lastDeltaPropagationDelayNs int64 @@ -174,6 +175,22 @@ func (o *OWDEstimator) Update(senderClockTimeNs int64, receiverClockTimeNs int64 return o.estimatedPropagationDelayNs, stepChange } +func (o *OWDEstimator) InitialAdjustment(adjustmentNs int64) int64 { + if o.initialAdjustmentDone { + return o.estimatedPropagationDelayNs + } + + o.initialAdjustmentDone = true + // one time adjustment at init + // example: when this is used to measure one-way-delay of RTCP sender reports, + // it is possible that the first sender report is delayed and experiences more + // than existing propagation delay. This allows adjustment of initial estimate. + if adjustmentNs < 0 && -adjustmentNs < o.estimatedPropagationDelayNs { + o.estimatedPropagationDelayNs += adjustmentNs + } + return o.estimatedPropagationDelayNs +} + func (o *OWDEstimator) EstimatedPropagationDelay() int64 { return o.estimatedPropagationDelayNs }