diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index a350055a7..eac717003 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -3457,7 +3457,6 @@ func (p *ParticipantImpl) handleTrackPublished(track types.MediaTrack, isMigrate p.Identity(), track.ToProto(), ) - } p.pendingTracksLock.Lock() diff --git a/pkg/sfu/rtpstats/rtpstats_base.go b/pkg/sfu/rtpstats/rtpstats_base.go index 8844581e6..1d3260234 100644 --- a/pkg/sfu/rtpstats/rtpstats_base.go +++ b/pkg/sfu/rtpstats/rtpstats_base.go @@ -381,7 +381,11 @@ func (r *rtpStatsBase) GetRtt() uint32 { return r.rtt } -func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *livekit.RTCPSenderReportState, tsOffset uint64, extStartTS uint64) (err error, loggingFields []interface{}) { +func (r *rtpStatsBase) maybeAdjustFirstPacketTime( + srData *livekit.RTCPSenderReportState, + tsOffset uint64, + extStartTS uint64, +) (adjustment int64, err error, loggingFields []interface{}) { nowNano := mono.UnixNano() if time.Duration(nowNano-r.startTime) > cFirstPacketTimeAdjustWindow { return @@ -405,6 +409,7 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *livekit.RTCPSenderRepo timeSinceFirst := time.Duration(nowNano - r.firstTime) now := r.firstTime + timeSinceFirst.Nanoseconds() firstTime := now - samplesDuration.Nanoseconds() + adjustment = r.firstTime - firstTime getFields := func() []interface{} { return []interface{}{ @@ -412,7 +417,7 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *livekit.RTCPSenderRepo "nowTime", time.Unix(0, now), "before", time.Unix(0, r.firstTime), "after", time.Unix(0, firstTime), - "adjustment", time.Duration(r.firstTime - firstTime), + "adjustment", time.Duration(adjustment), "extNowTS", extNowTS, "extStartTS", extStartTS, "srData", WrappedRTCPSenderReportStateLogger{srData}, @@ -425,15 +430,16 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *livekit.RTCPSenderRepo } if firstTime < r.firstTime { - if r.firstTime-firstTime > cFirstPacketTimeAdjustThreshold { + if adjustment > cFirstPacketTimeAdjustThreshold { err = errors.New("adjusting first packet time, too big, ignoring") loggingFields = getFields() } else { r.logger.Debugw("adjusting first packet time", getFields()...) - r.firstTimeAdjustment += time.Duration(r.firstTime - firstTime) + r.firstTimeAdjustment += time.Duration(adjustment) r.firstTime = firstTime } } + return } diff --git a/pkg/sfu/rtpstats/rtpstats_receiver.go b/pkg/sfu/rtpstats/rtpstats_receiver.go index 35531e640..4b2424f1c 100644 --- a/pkg/sfu/rtpstats/rtpstats_receiver.go +++ b/pkg/sfu/rtpstats/rtpstats_receiver.go @@ -542,9 +542,11 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *livekit.RTCPSenderRep r.updatePropagationDelayAndRecordSenderReport(srDataExt) r.checkRTPClockSkewAgainstMediaPathForSenderReport(srDataExt) - if err, loggingFields := r.maybeAdjustFirstPacketTime(r.srNewest, 0, r.timestamp.GetExtendedStart()); err != nil { + adjustment, err, loggingFields := r.maybeAdjustFirstPacketTime(r.srNewest, 0, r.timestamp.GetExtendedStart()) + if err != nil { r.logger.Infow(err.Error(), append(loggingFields, "rtpStats", lockedRTPStatsReceiverLogEncoder{r})...) } + r.propagationDelayEstimator.InitialAdjustment(adjustment) return true } diff --git a/pkg/sfu/rtpstats/rtpstats_sender.go b/pkg/sfu/rtpstats/rtpstats_sender.go index a361231c8..db43fa21c 100644 --- a/pkg/sfu/rtpstats/rtpstats_sender.go +++ b/pkg/sfu/rtpstats/rtpstats_sender.go @@ -800,7 +800,7 @@ func (r *RTPStatsSender) MaybeAdjustFirstPacketTime(publisherSRData *livekit.RTC return } - if err, loggingFields := r.maybeAdjustFirstPacketTime(publisherSRData, tsOffset, r.extStartTS); err != nil { + if _, err, loggingFields := r.maybeAdjustFirstPacketTime(publisherSRData, tsOffset, r.extStartTS); err != nil { r.logger.Infow(err.Error(), append(loggingFields, "rtpStats", lockedRTPStatsSenderLogEncoder{r})...) } } @@ -835,11 +835,12 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *livek nowRTPExt uint64 ) if passThrough { - reportTime = publisherSRData.At - reportTimeAdjusted = publisherSRData.AtAdjusted + timeSincePublisherSR := time.Duration(mono.UnixNano() - publisherSRData.At) + reportTime = publisherSRData.At + timeSincePublisherSR.Nanoseconds() + reportTimeAdjusted = publisherSRData.AtAdjusted + timeSincePublisherSR.Nanoseconds() - nowNTP = mediatransportutil.NtpTime(publisherSRData.NtpTimestamp) - nowRTPExt = publisherSRData.RtpTimestampExt - tsOffset + nowNTP = mediatransportutil.ToNtpTime(mediatransportutil.NtpTime(publisherSRData.NtpTimestamp).Time().Add(timeSincePublisherSR)) + nowRTPExt = publisherSRData.RtpTimestampExt - tsOffset + uint64(timeSincePublisherSR.Nanoseconds()*int64(r.params.ClockRate)/1e9) } else { timeSincePublisherSRAdjusted := time.Duration(mono.UnixNano() - publisherSRData.AtAdjusted) reportTimeAdjusted = publisherSRData.AtAdjusted + timeSincePublisherSRAdjusted.Nanoseconds() diff --git a/pkg/sfu/utils/owd_estimator.go b/pkg/sfu/utils/owd_estimator.go index b25e2604b..7267262c9 100644 --- a/pkg/sfu/utils/owd_estimator.go +++ b/pkg/sfu/utils/owd_estimator.go @@ -34,7 +34,7 @@ type OWDEstimatorParams struct { } var OWDEstimatorParamsDefault = OWDEstimatorParams{ - // OWD (One-Way-Delay) Estimator is used to estimate propagation delay between sender and receicer. + // OWD (One-Way-Delay) Estimator is used to estimate propagation delay between sender and receiver. // As they operate on different clock domains, it is not possible to get exact propagation delay easily. // So, this module is an estimator using a simple approach explained below. It should not be used for // things that require high accuracy. @@ -72,6 +72,7 @@ type OWDEstimator struct { params OWDEstimatorParams initialized bool + initialAdjustmentDone bool lastSenderClockTimeNs int64 lastPropagationDelayNs int64 lastDeltaPropagationDelayNs int64 @@ -174,6 +175,22 @@ func (o *OWDEstimator) Update(senderClockTimeNs int64, receiverClockTimeNs int64 return o.estimatedPropagationDelayNs, stepChange } +func (o *OWDEstimator) InitialAdjustment(adjustmentNs int64) int64 { + if o.initialAdjustmentDone { + return o.estimatedPropagationDelayNs + } + + o.initialAdjustmentDone = true + // one time adjustment at init + // example: when this is used to measure one-way-delay of RTCP sender reports, + // it is possible that the first sender report is delayed and experiences more + // than existing propagation delay. This allows adjustment of initial estimate. + if adjustmentNs < 0 && -adjustmentNs < o.estimatedPropagationDelayNs { + o.estimatedPropagationDelayNs += adjustmentNs + } + return o.estimatedPropagationDelayNs +} + func (o *OWDEstimator) EstimatedPropagationDelay() int64 { return o.estimatedPropagationDelayNs }