mirror of
https://github.com/livekit/livekit.git
synced 2026-05-14 20:35:27 +00:00
Adjust for hold time when fowarding RTCP report. (#3956)
* Adjust for hold time when fowarding RTCP report. When passing through RTCP sender report, holding it for some time before sending means the remote receiver could see varying amount of propagation delay if the remote uses something like local_clock - ntp_sender_report_time and adapting to it. Ideally, SFU should just forward RTCP Sender Report, but the current pull model to group RTCP sender reports makes it a bigger change. So, adjust it by hold time. Also add a initial condition for one-way-delay estimator which can init with a smaller value of latency if the first sample to measure one-way-delay itself experienced higher delay than the prevailing conditions. * variable name * log as duration
This commit is contained in:
@@ -3457,7 +3457,6 @@ func (p *ParticipantImpl) handleTrackPublished(track types.MediaTrack, isMigrate
|
||||
p.Identity(),
|
||||
track.ToProto(),
|
||||
)
|
||||
|
||||
}
|
||||
|
||||
p.pendingTracksLock.Lock()
|
||||
|
||||
@@ -381,7 +381,11 @@ func (r *rtpStatsBase) GetRtt() uint32 {
|
||||
return r.rtt
|
||||
}
|
||||
|
||||
func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *livekit.RTCPSenderReportState, tsOffset uint64, extStartTS uint64) (err error, loggingFields []interface{}) {
|
||||
func (r *rtpStatsBase) maybeAdjustFirstPacketTime(
|
||||
srData *livekit.RTCPSenderReportState,
|
||||
tsOffset uint64,
|
||||
extStartTS uint64,
|
||||
) (adjustment int64, err error, loggingFields []interface{}) {
|
||||
nowNano := mono.UnixNano()
|
||||
if time.Duration(nowNano-r.startTime) > cFirstPacketTimeAdjustWindow {
|
||||
return
|
||||
@@ -405,6 +409,7 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *livekit.RTCPSenderRepo
|
||||
timeSinceFirst := time.Duration(nowNano - r.firstTime)
|
||||
now := r.firstTime + timeSinceFirst.Nanoseconds()
|
||||
firstTime := now - samplesDuration.Nanoseconds()
|
||||
adjustment = r.firstTime - firstTime
|
||||
|
||||
getFields := func() []interface{} {
|
||||
return []interface{}{
|
||||
@@ -412,7 +417,7 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *livekit.RTCPSenderRepo
|
||||
"nowTime", time.Unix(0, now),
|
||||
"before", time.Unix(0, r.firstTime),
|
||||
"after", time.Unix(0, firstTime),
|
||||
"adjustment", time.Duration(r.firstTime - firstTime),
|
||||
"adjustment", time.Duration(adjustment),
|
||||
"extNowTS", extNowTS,
|
||||
"extStartTS", extStartTS,
|
||||
"srData", WrappedRTCPSenderReportStateLogger{srData},
|
||||
@@ -425,15 +430,16 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *livekit.RTCPSenderRepo
|
||||
}
|
||||
|
||||
if firstTime < r.firstTime {
|
||||
if r.firstTime-firstTime > cFirstPacketTimeAdjustThreshold {
|
||||
if adjustment > cFirstPacketTimeAdjustThreshold {
|
||||
err = errors.New("adjusting first packet time, too big, ignoring")
|
||||
loggingFields = getFields()
|
||||
} else {
|
||||
r.logger.Debugw("adjusting first packet time", getFields()...)
|
||||
r.firstTimeAdjustment += time.Duration(r.firstTime - firstTime)
|
||||
r.firstTimeAdjustment += time.Duration(adjustment)
|
||||
r.firstTime = firstTime
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -542,9 +542,11 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *livekit.RTCPSenderRep
|
||||
r.updatePropagationDelayAndRecordSenderReport(srDataExt)
|
||||
r.checkRTPClockSkewAgainstMediaPathForSenderReport(srDataExt)
|
||||
|
||||
if err, loggingFields := r.maybeAdjustFirstPacketTime(r.srNewest, 0, r.timestamp.GetExtendedStart()); err != nil {
|
||||
adjustment, err, loggingFields := r.maybeAdjustFirstPacketTime(r.srNewest, 0, r.timestamp.GetExtendedStart())
|
||||
if err != nil {
|
||||
r.logger.Infow(err.Error(), append(loggingFields, "rtpStats", lockedRTPStatsReceiverLogEncoder{r})...)
|
||||
}
|
||||
r.propagationDelayEstimator.InitialAdjustment(adjustment)
|
||||
return true
|
||||
}
|
||||
|
||||
|
||||
@@ -800,7 +800,7 @@ func (r *RTPStatsSender) MaybeAdjustFirstPacketTime(publisherSRData *livekit.RTC
|
||||
return
|
||||
}
|
||||
|
||||
if err, loggingFields := r.maybeAdjustFirstPacketTime(publisherSRData, tsOffset, r.extStartTS); err != nil {
|
||||
if _, err, loggingFields := r.maybeAdjustFirstPacketTime(publisherSRData, tsOffset, r.extStartTS); err != nil {
|
||||
r.logger.Infow(err.Error(), append(loggingFields, "rtpStats", lockedRTPStatsSenderLogEncoder{r})...)
|
||||
}
|
||||
}
|
||||
@@ -835,11 +835,12 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *livek
|
||||
nowRTPExt uint64
|
||||
)
|
||||
if passThrough {
|
||||
reportTime = publisherSRData.At
|
||||
reportTimeAdjusted = publisherSRData.AtAdjusted
|
||||
timeSincePublisherSR := time.Duration(mono.UnixNano() - publisherSRData.At)
|
||||
reportTime = publisherSRData.At + timeSincePublisherSR.Nanoseconds()
|
||||
reportTimeAdjusted = publisherSRData.AtAdjusted + timeSincePublisherSR.Nanoseconds()
|
||||
|
||||
nowNTP = mediatransportutil.NtpTime(publisherSRData.NtpTimestamp)
|
||||
nowRTPExt = publisherSRData.RtpTimestampExt - tsOffset
|
||||
nowNTP = mediatransportutil.ToNtpTime(mediatransportutil.NtpTime(publisherSRData.NtpTimestamp).Time().Add(timeSincePublisherSR))
|
||||
nowRTPExt = publisherSRData.RtpTimestampExt - tsOffset + uint64(timeSincePublisherSR.Nanoseconds()*int64(r.params.ClockRate)/1e9)
|
||||
} else {
|
||||
timeSincePublisherSRAdjusted := time.Duration(mono.UnixNano() - publisherSRData.AtAdjusted)
|
||||
reportTimeAdjusted = publisherSRData.AtAdjusted + timeSincePublisherSRAdjusted.Nanoseconds()
|
||||
|
||||
@@ -34,7 +34,7 @@ type OWDEstimatorParams struct {
|
||||
}
|
||||
|
||||
var OWDEstimatorParamsDefault = OWDEstimatorParams{
|
||||
// OWD (One-Way-Delay) Estimator is used to estimate propagation delay between sender and receicer.
|
||||
// OWD (One-Way-Delay) Estimator is used to estimate propagation delay between sender and receiver.
|
||||
// As they operate on different clock domains, it is not possible to get exact propagation delay easily.
|
||||
// So, this module is an estimator using a simple approach explained below. It should not be used for
|
||||
// things that require high accuracy.
|
||||
@@ -72,6 +72,7 @@ type OWDEstimator struct {
|
||||
params OWDEstimatorParams
|
||||
|
||||
initialized bool
|
||||
initialAdjustmentDone bool
|
||||
lastSenderClockTimeNs int64
|
||||
lastPropagationDelayNs int64
|
||||
lastDeltaPropagationDelayNs int64
|
||||
@@ -174,6 +175,22 @@ func (o *OWDEstimator) Update(senderClockTimeNs int64, receiverClockTimeNs int64
|
||||
return o.estimatedPropagationDelayNs, stepChange
|
||||
}
|
||||
|
||||
func (o *OWDEstimator) InitialAdjustment(adjustmentNs int64) int64 {
|
||||
if o.initialAdjustmentDone {
|
||||
return o.estimatedPropagationDelayNs
|
||||
}
|
||||
|
||||
o.initialAdjustmentDone = true
|
||||
// one time adjustment at init
|
||||
// example: when this is used to measure one-way-delay of RTCP sender reports,
|
||||
// it is possible that the first sender report is delayed and experiences more
|
||||
// than existing propagation delay. This allows adjustment of initial estimate.
|
||||
if adjustmentNs < 0 && -adjustmentNs < o.estimatedPropagationDelayNs {
|
||||
o.estimatedPropagationDelayNs += adjustmentNs
|
||||
}
|
||||
return o.estimatedPropagationDelayNs
|
||||
}
|
||||
|
||||
func (o *OWDEstimator) EstimatedPropagationDelay() int64 {
|
||||
return o.estimatedPropagationDelayNs
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user