Experimental flag to try time stamp adjustment to control drift. (#1687)

* Experimental flag to try time stamp adjustment to control drift.

There is a config to enable this.

Using a PID controller to try and keep the sample rate at expected
value. Need to be seen if this works well. Adjustment are limited
to 25 ms max at a time to ensure there are no large jumps.
And it is applied when doing RTCP sender report which happens
once in 5 seconds currently for both audio and video tracks.

A nice introduction to PID controllers - https://alphaville.github.io/qub/pid-101/#/
Implementation borrowed from - https://github.com/pms67/PID

A few things TODO
1. PID controller tuning is a process. Have picked values from test from
   that implementation above. May not be the best. Need to try.
2. Can potentially run this more often. Rather than running it only when
   running RTCP sender report (which is once in 5 seconds now), can
   potentially run it every second and limit the amount of change to
   something like 10 ms max.

* remove unused variable

* debug log a bit more
This commit is contained in:
Raja Subramanian
2023-05-06 11:52:57 +05:30
committed by GitHub
parent ae5d2c3261
commit 3fb93135f5
11 changed files with 298 additions and 49 deletions

View File

@@ -107,6 +107,9 @@ type RTCConfig struct {
// force a reconnect on a subscription error
ReconnectOnSubscriptionError *bool `yaml:"reconnect_on_subscription_error,omitempty"`
// allow time stamp adjust to keep drift low, this is experimental
AllowTimestampAdjustment *bool `yaml:"allow_timestamp_adjustment,omitempty"`
}
type TURNServer struct {

View File

@@ -104,6 +104,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *
sub.GetBufferFactory(),
subscriberID,
t.params.ReceiverConfig.PacketBufferSize,
sub.GetAllowTimestampAdjustment(),
LoggerWithTrack(sub.GetLogger(), trackID, t.params.IsRelayed),
)
if err != nil {

View File

@@ -92,6 +92,7 @@ type ParticipantParams struct {
SubscriberAllowPause bool
SubscriptionLimitAudio int32
SubscriptionLimitVideo int32
AllowTimestampAdjustment bool
}
type ParticipantImpl struct {
@@ -228,6 +229,10 @@ func (p *ParticipantImpl) GetAdaptiveStream() bool {
return p.params.AdaptiveStream
}
func (p *ParticipantImpl) GetAllowTimestampAdjustment() bool {
return p.params.AllowTimestampAdjustment
}
func (p *ParticipantImpl) ID() livekit.ParticipantID {
return p.params.SID
}

View File

@@ -341,6 +341,8 @@ type LocalParticipant interface {
// down stream bandwidth management
SetSubscriberAllowPause(allowPause bool)
SetSubscriberChannelCapacity(channelCapacity int64)
GetAllowTimestampAdjustment() bool
}
// Room is a container of participants, and can provide room-level actions

View File

@@ -165,6 +165,16 @@ type FakeLocalParticipant struct {
getAdaptiveStreamReturnsOnCall map[int]struct {
result1 bool
}
GetAllowTimestampAdjustmentStub func() bool
getAllowTimestampAdjustmentMutex sync.RWMutex
getAllowTimestampAdjustmentArgsForCall []struct {
}
getAllowTimestampAdjustmentReturns struct {
result1 bool
}
getAllowTimestampAdjustmentReturnsOnCall map[int]struct {
result1 bool
}
GetAudioLevelStub func() (float64, bool)
getAudioLevelMutex sync.RWMutex
getAudioLevelArgsForCall []struct {
@@ -1612,6 +1622,59 @@ func (fake *FakeLocalParticipant) GetAdaptiveStreamReturnsOnCall(i int, result1
}{result1}
}
func (fake *FakeLocalParticipant) GetAllowTimestampAdjustment() bool {
fake.getAllowTimestampAdjustmentMutex.Lock()
ret, specificReturn := fake.getAllowTimestampAdjustmentReturnsOnCall[len(fake.getAllowTimestampAdjustmentArgsForCall)]
fake.getAllowTimestampAdjustmentArgsForCall = append(fake.getAllowTimestampAdjustmentArgsForCall, struct {
}{})
stub := fake.GetAllowTimestampAdjustmentStub
fakeReturns := fake.getAllowTimestampAdjustmentReturns
fake.recordInvocation("GetAllowTimestampAdjustment", []interface{}{})
fake.getAllowTimestampAdjustmentMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeLocalParticipant) GetAllowTimestampAdjustmentCallCount() int {
fake.getAllowTimestampAdjustmentMutex.RLock()
defer fake.getAllowTimestampAdjustmentMutex.RUnlock()
return len(fake.getAllowTimestampAdjustmentArgsForCall)
}
func (fake *FakeLocalParticipant) GetAllowTimestampAdjustmentCalls(stub func() bool) {
fake.getAllowTimestampAdjustmentMutex.Lock()
defer fake.getAllowTimestampAdjustmentMutex.Unlock()
fake.GetAllowTimestampAdjustmentStub = stub
}
func (fake *FakeLocalParticipant) GetAllowTimestampAdjustmentReturns(result1 bool) {
fake.getAllowTimestampAdjustmentMutex.Lock()
defer fake.getAllowTimestampAdjustmentMutex.Unlock()
fake.GetAllowTimestampAdjustmentStub = nil
fake.getAllowTimestampAdjustmentReturns = struct {
result1 bool
}{result1}
}
func (fake *FakeLocalParticipant) GetAllowTimestampAdjustmentReturnsOnCall(i int, result1 bool) {
fake.getAllowTimestampAdjustmentMutex.Lock()
defer fake.getAllowTimestampAdjustmentMutex.Unlock()
fake.GetAllowTimestampAdjustmentStub = nil
if fake.getAllowTimestampAdjustmentReturnsOnCall == nil {
fake.getAllowTimestampAdjustmentReturnsOnCall = make(map[int]struct {
result1 bool
})
}
fake.getAllowTimestampAdjustmentReturnsOnCall[i] = struct {
result1 bool
}{result1}
}
func (fake *FakeLocalParticipant) GetAudioLevel() (float64, bool) {
fake.getAudioLevelMutex.Lock()
ret, specificReturn := fake.getAudioLevelReturnsOnCall[len(fake.getAudioLevelArgsForCall)]
@@ -5456,6 +5519,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
defer fake.debugInfoMutex.RUnlock()
fake.getAdaptiveStreamMutex.RLock()
defer fake.getAdaptiveStreamMutex.RUnlock()
fake.getAllowTimestampAdjustmentMutex.RLock()
defer fake.getAllowTimestampAdjustmentMutex.RUnlock()
fake.getAudioLevelMutex.RLock()
defer fake.getAudioLevelMutex.RUnlock()
fake.getBufferFactoryMutex.RLock()

View File

@@ -309,6 +309,11 @@ func (r *RoomManager) StartSession(
if pi.SubscriberAllowPause != nil {
subscriberAllowPause = *pi.SubscriberAllowPause
}
// default do not allow timestamp adjustment
allowTimestampAdjustment := false
if r.config.RTC.AllowTimestampAdjustment != nil {
allowTimestampAdjustment = *r.config.RTC.AllowTimestampAdjustment
}
participant, err = rtc.NewParticipant(rtc.ParticipantParams{
Identity: pi.Identity,
Name: pi.Name,
@@ -343,6 +348,7 @@ func (r *RoomManager) StartSession(
SubscriberAllowPause: subscriberAllowPause,
SubscriptionLimitAudio: r.config.Limit.SubscriptionLimitAudio,
SubscriptionLimitVideo: r.config.Limit.SubscriptionLimitVideo,
AllowTimestampAdjustment: allowTimestampAdjustment,
})
if err != nil {
return err

View File

@@ -174,21 +174,28 @@ type RTPStats struct {
rtt uint32
maxRtt uint32
srData *RTCPSenderReportData
lastSRTime time.Time
lastSRNTP mediatransportutil.NtpTime
srData *RTCPSenderReportData
lastSRTime time.Time
lastSRNTP mediatransportutil.NtpTime
pidController *PIDController
nextSnapshotId uint32
snapshots map[uint32]*Snapshot
}
func NewRTPStats(params RTPStatsParams) *RTPStats {
return &RTPStats{
r := &RTPStats{
params: params,
logger: params.Logger,
nextSnapshotId: FirstSnapshotId,
snapshots: make(map[uint32]*Snapshot),
pidController: NewPIDController(),
}
r.pidController.SetGains(2.0, 0.5, 0.25)
r.pidController.SetDerivativeLPF(0.02)
r.pidController.SetOutputLimits(-0.025*float64(params.ClockRate), 0.025*float64(params.ClockRate))
return r
}
func (r *RTPStats) Seed(from *RTPStats) {
@@ -731,37 +738,35 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
}
// TODO-REMOVE-AFTER-DEBUG-START
if r.params.ClockRate != 90000 { // log only for audio as it is less frequent
ntpTime := srData.NTPTimestamp.Time()
ntpTime := srData.NTPTimestamp.Time()
var ntpDiffSinceLast, arrivalDiffSinceLast time.Duration
var rtpDiffSinceLast uint32
if r.srData != nil {
ntpDiffSinceLast = ntpTime.Sub(r.srData.NTPTimestamp.Time())
rtpDiffSinceLast = srData.RTPTimestamp - r.srData.RTPTimestamp
arrivalDiffSinceLast = srData.ArrivalTime.Sub(r.srData.ArrivalTime)
}
timeSinceFirst := srData.NTPTimestamp.Time().Sub(r.firstTime)
rtpDiffSinceFirst := getExtTS(srData.RTPTimestamp, r.tsCycles) - r.extStartTS
drift := int64(uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9) - rtpDiffSinceFirst)
driftMs := (float64(drift) * 1000) / float64(r.params.ClockRate)
r.logger.Debugw(
"received sender report",
"ntp", ntpTime,
"rtp", srData.RTPTimestamp,
"arrival", srData.ArrivalTime,
"ntpDiff", ntpDiffSinceLast,
"rtpDiff", rtpDiffSinceLast,
"arrivalDiff", arrivalDiffSinceLast,
"expectedTimeDiff", float64(rtpDiffSinceLast)/float64(r.params.ClockRate),
"timeSinceFirst", timeSinceFirst,
"rtpDiffSinceFirst", rtpDiffSinceFirst,
"drift", drift,
"driftMs", driftMs,
)
var ntpDiffSinceLast, arrivalDiffSinceLast time.Duration
var rtpDiffSinceLast uint32
if r.srData != nil {
ntpDiffSinceLast = ntpTime.Sub(r.srData.NTPTimestamp.Time())
rtpDiffSinceLast = srData.RTPTimestamp - r.srData.RTPTimestamp
arrivalDiffSinceLast = srData.ArrivalTime.Sub(r.srData.ArrivalTime)
}
timeSinceFirst := time.Since(r.firstTime) // ideally should use NTP time from SR, but that is a different time base, now is a resonable approximation
rtpDiffSinceFirst := getExtTS(srData.RTPTimestamp, r.tsCycles) - r.extStartTS
drift := int64(uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9) - rtpDiffSinceFirst)
driftMs := (float64(drift) * 1000) / float64(r.params.ClockRate)
r.logger.Debugw(
"received sender report",
"ntp", ntpTime,
"rtp", srData.RTPTimestamp,
"arrival", srData.ArrivalTime,
"ntpDiff", ntpDiffSinceLast,
"rtpDiff", rtpDiffSinceLast,
"arrivalDiff", arrivalDiffSinceLast,
"expectedTimeDiff", float64(rtpDiffSinceLast)/float64(r.params.ClockRate),
"timeSinceFirst", timeSinceFirst,
"rtpDiffSinceFirst", rtpDiffSinceFirst,
"drift", drift,
"driftMs", driftMs,
)
// TODO-REMOVE-AFTER-DEBUG-END
srDataCopy := *srData
@@ -806,12 +811,12 @@ func (r *RTPStats) GetExpectedRTPTimestamp(at time.Time) (uint32, error) {
return uint32(expectedExtRTP), nil
}
func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport {
func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) (*rtcp.SenderReport, float64) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.initialized {
return nil
return nil, 0.0
}
// construct current time based on monotonic clock
@@ -836,6 +841,23 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport {
timeSinceHighest := time.Since(r.highestTime)
nowRTP := r.highestTS + uint32(timeSinceHighest.Nanoseconds()*int64(r.params.ClockRate)/1e9)
// TODO-REMOVE-AFTER-DEBUG-START
timeSinceFirst = nowNTP.Time().Sub(r.firstTime)
rtpDiffSinceFirst := getExtTS(nowRTP, r.tsCycles) - r.extStartTS
measurement := float64(rtpDiffSinceFirst) / timeSinceFirst.Seconds()
pidOutput := r.pidController.Update(
float64(r.params.ClockRate),
measurement,
now,
)
r.logger.Debugw(
"pid controller output",
"measurement", measurement,
"errorTerm", float64(r.params.ClockRate)-measurement,
"pidOutput", pidOutput,
)
// TODO-REMOVE-AFTER-DEBUG-STOP
// TODO-REMOVE-AFTER-DEBUG-START
ntpTime := nowNTP.Time()
@@ -843,8 +865,6 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport {
rtpDiffLocal := int32(nowRTP - r.highestTS)
rtpOffsetLocal := int32(nowRTP - r.highestTS - uint32(ntpDiffLocal.Nanoseconds()*int64(r.params.ClockRate)/1e9))
timeSinceFirst = nowNTP.Time().Sub(r.firstTime)
rtpDiffSinceFirst := getExtTS(nowRTP, r.tsCycles) - r.extStartTS
drift := int64(uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9) - rtpDiffSinceFirst)
driftMs := (float64(drift) * 1000) / float64(r.params.ClockRate)
@@ -853,6 +873,7 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport {
"highestTS", r.highestTS,
"highestTime", r.highestTime.String(),
"reportTS", nowRTP,
"expectedTS", uint32(expectedExtRTP),
"reportTime", ntpTime.String(),
"rtpDiffLocal", rtpDiffLocal,
"ntpDiffLocal", ntpDiffLocal,
@@ -861,6 +882,7 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport {
"rtpDiffSinceFirst", rtpDiffSinceFirst,
"drift", drift,
"driftMs", driftMs,
"rate", measurement,
)
// TODO-REMOVE-AFTER-DEBUG-END
@@ -873,7 +895,7 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport {
RTPTime: nowRTP,
PacketCount: r.getTotalPacketsPrimary() + r.packetsDuplicate + r.packetsPadding,
OctetCount: uint32(r.bytes + r.bytesDuplicate + r.bytesPadding),
}
}, pidOutput
}
func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8, snapshotId uint32) *rtcp.ReceptionReport {
@@ -1747,3 +1769,90 @@ func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo {
Firs: firs,
}
}
// -------------------------------------------------------------------
type PIDController struct {
kp, ki, kd float64
tau float64 // low pass filter of D, time constant
outMin, outMax float64
isOutLimitsSet bool
iMin, iMax float64
isILimitsSet bool
iVal, dVal float64
prevError, prevMeasurement float64
prevMeasurementTime time.Time
}
func NewPIDController() *PIDController {
return &PIDController{}
}
func (p *PIDController) SetGains(kp, ki, kd float64) {
p.kp = kp
p.ki = ki
p.kd = kd
}
func (p *PIDController) SetDerivativeLPF(tau float64) {
p.tau = tau
}
func (p *PIDController) SetOutputLimits(min, max float64) {
p.outMin = min
p.outMax = max
p.isOutLimitsSet = true
}
func (p *PIDController) SetIntegralLimits(min, max float64) {
p.iMin = min
p.iMax = max
p.isILimitsSet = true
}
func (p *PIDController) Update(setpoint, measurement float64, at time.Time) float64 {
diff := setpoint - measurement
if p.prevMeasurementTime.IsZero() {
p.prevError = diff
p.prevMeasurement = measurement
p.prevMeasurementTime = at
return 0
}
proportional := p.kp * diff
duration := at.Sub(p.prevMeasurementTime).Seconds()
p.iVal = p.iVal + (0.5 * p.ki * duration * (diff + p.prevError))
if p.isILimitsSet {
if p.iVal > p.iMax {
p.iVal = p.iMax
}
if p.iVal < p.iMin {
p.iVal = p.iMin
}
}
p.dVal = (-2.0 * p.kd * (measurement - p.prevMeasurement)) + (((2.0*p.tau - duration) * p.dVal) / (2.0*p.tau + duration))
output := proportional + p.iVal + p.dVal
if p.isOutLimitsSet {
if output > p.outMax {
output = p.outMax
}
if output < p.outMin {
output = p.outMin
}
}
p.prevError = diff
p.prevMeasurement = measurement
p.prevMeasurementTime = at
return output
}
// -------------------------------------------------------------------

View File

@@ -185,6 +185,8 @@ type DownTrack struct {
sequencer *sequencer
bufferFactory *buffer.Factory
allowTimestampAdjustment bool
forwarder *Forwarder
upstreamCodecs []webrtc.RTPCodecParameters
@@ -252,6 +254,7 @@ func NewDownTrack(
bf *buffer.Factory,
subID livekit.ParticipantID,
mt int,
allowTimestampAdjustment bool,
logger logger.Logger,
) (*DownTrack, error) {
var kind webrtc.RTPCodecType
@@ -265,16 +268,17 @@ func NewDownTrack(
}
d := &DownTrack{
logger: logger,
id: r.TrackID(),
subscriberID: subID,
maxTrack: mt,
streamID: r.StreamID(),
bufferFactory: bf,
receiver: r,
upstreamCodecs: codecs,
kind: kind,
codec: codecs[0].RTPCodecCapability,
logger: logger,
id: r.TrackID(),
subscriberID: subID,
maxTrack: mt,
streamID: r.StreamID(),
bufferFactory: bf,
allowTimestampAdjustment: allowTimestampAdjustment,
receiver: r,
upstreamCodecs: codecs,
kind: kind,
codec: codecs[0].RTPCodecCapability,
}
d.forwarder = NewForwarder(
d.kind,
@@ -1113,7 +1117,11 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport {
return nil
}
return d.rtpStats.GetRtcpSenderReport(d.ssrc)
sr, tsAdjust := d.rtpStats.GetRtcpSenderReport(d.ssrc)
if d.allowTimestampAdjustment {
d.forwarder.AdjustTimestamp(tsAdjust)
}
return sr
}
func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan struct{} {

View File

@@ -1630,6 +1630,13 @@ func (f *Forwarder) GetRTPMungerParams() RTPMungerParams {
return f.rtpMunger.GetParams()
}
func (f *Forwarder) AdjustTimestamp(tsAdjust float64) {
f.lock.Lock()
defer f.lock.Unlock()
f.rtpMunger.UpdateTsOffset(uint32(tsAdjust))
}
// -----------------------------------------------------------------------------
func getOptimalBandwidthNeeded(muted bool, pubMuted bool, maxPublishedLayer int32, brs Bitrates, maxLayer buffer.VideoLayer) int64 {

View File

@@ -58,6 +58,7 @@ type RTPMungerParams struct {
highestIncomingSN uint16
lastSN uint16
snOffset uint16
highestIncomingTS uint32
lastTS uint32
tsOffset uint32
lastMarker bool
@@ -88,6 +89,7 @@ func (r *RTPMunger) GetParams() RTPMungerParams {
highestIncomingSN: r.highestIncomingSN,
lastSN: r.lastSN,
snOffset: r.snOffset,
highestIncomingTS: r.highestIncomingTS,
lastTS: r.lastTS,
tsOffset: r.tsOffset,
lastMarker: r.lastMarker,
@@ -110,6 +112,7 @@ func (r *RTPMunger) SeedLast(state RTPMungerState) {
func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) {
r.highestIncomingSN = extPkt.Packet.SequenceNumber - 1
r.highestIncomingTS = extPkt.Packet.Timestamp
if !r.started {
r.lastSN = extPkt.Packet.SequenceNumber
r.lastTS = extPkt.Packet.Timestamp
@@ -122,6 +125,7 @@ func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) {
func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint16, tsAdjust uint32) {
r.highestIncomingSN = extPkt.Packet.SequenceNumber - 1
r.highestIncomingTS = extPkt.Packet.Timestamp
r.snOffset = extPkt.Packet.SequenceNumber - r.lastSN - snAdjust
r.tsOffset = extPkt.Packet.Timestamp - r.lastTS - tsAdjust
@@ -138,6 +142,10 @@ func (r *RTPMunger) PacketDropped(extPkt *buffer.ExtPacket) {
r.lastSN = extPkt.Packet.SequenceNumber - r.snOffset
}
func (r *RTPMunger) UpdateTsOffset(tsAdjust uint32) {
r.tsOffset -= tsAdjust
}
func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationParamsRTP, error) {
// if out-of-order, look up sequence number offset cache
diff := extPkt.Packet.SequenceNumber - r.highestIncomingSN
@@ -200,8 +208,27 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara
mungedSN := extPkt.Packet.SequenceNumber - r.snOffset
mungedTS := extPkt.Packet.Timestamp - r.tsOffset
// with timestamp adjustment, it is possible that the adjustment causes munged timestamp to move backwards,
// detect that and adjust so that it does not move back
if extPkt.Packet.Timestamp != r.highestIncomingTS && (((mungedTS - r.lastTS) == 0) || (mungedTS-r.lastTS) > (1<<31)) {
adjustedMungedTS := r.lastTS + 1
adjustedTSOffset := extPkt.Packet.Timestamp - adjustedMungedTS
r.logger.Infow(
"adjust out-of-order timestamp offset",
"mungedTS", mungedTS,
"lastTS", r.lastTS,
"incomingTS", extPkt.Packet.Timestamp,
"offset", r.tsOffset,
"adjustedMungedTS", adjustedMungedTS,
"adjustedTSOffset", adjustedTSOffset,
)
mungedTS = adjustedMungedTS
r.tsOffset = adjustedTSOffset
}
r.highestIncomingSN = extPkt.Packet.SequenceNumber
r.lastSN = mungedSN
r.highestIncomingTS = extPkt.Packet.Timestamp
r.lastTS = mungedTS
r.lastMarker = extPkt.Packet.Marker

View File

@@ -494,9 +494,11 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer in
return 0, fmt.Errorf("invalid layer, target: %d, reference: %d", layer, referenceLayer)
}
/* TODO-RESTORE-AFTER-DEBUG - this is just fast path, below calculations should yield same
if layer == referenceLayer {
return ts, nil
}
*/
var srLayer *buffer.RTCPSenderReportData
if int(layer) < len(s.senderReports) {
@@ -521,6 +523,20 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer in
ntpDiff := srRef.NTPTimestamp.Time().Sub(srLayer.NTPTimestamp.Time())
rtpDiff := ntpDiff.Nanoseconds() * int64(s.clockRate) / 1e9
normalizedTS := srLayer.RTPTimestamp + uint32(rtpDiff)
s.logger.Debugw(
"getting reference timestaml",
"layer", layer,
"referenceLayer", referenceLayer,
"incomingTS", ts,
"layerNTP", srLayer.NTPTimestamp.Time().String(),
"refNTP", srRef.NTPTimestamp.Time().String(),
"ntpDiff", ntpDiff.String(),
"layerRTP", srLayer.RTPTimestamp,
"refRTP", srRef.RTPTimestamp,
"rtpDiff", rtpDiff,
"normalizedTS", normalizedTS,
"mappedTS", ts+(srRef.RTPTimestamp-normalizedTS),
)
// now that both RTP timestamps correspond to roughly the same NTP time,
// the diff between them is the offset in RTP timestamp units between layer and referenceLayer.