From 3fb93135f5f2e41c8ff6924b5c4b2cf2d7f83899 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sat, 6 May 2023 11:52:57 +0530 Subject: [PATCH] Experimental flag to try time stamp adjustment to control drift. (#1687) * Experimental flag to try time stamp adjustment to control drift. There is a config to enable this. Using a PID controller to try and keep the sample rate at expected value. Need to be seen if this works well. Adjustment are limited to 25 ms max at a time to ensure there are no large jumps. And it is applied when doing RTCP sender report which happens once in 5 seconds currently for both audio and video tracks. A nice introduction to PID controllers - https://alphaville.github.io/qub/pid-101/#/ Implementation borrowed from - https://github.com/pms67/PID A few things TODO 1. PID controller tuning is a process. Have picked values from test from that implementation above. May not be the best. Need to try. 2. Can potentially run this more often. Rather than running it only when running RTCP sender report (which is once in 5 seconds now), can potentially run it every second and limit the amount of change to something like 10 ms max. * remove unused variable * debug log a bit more --- pkg/config/config.go | 3 + pkg/rtc/mediatracksubscriptions.go | 1 + pkg/rtc/participant.go | 5 + pkg/rtc/types/interfaces.go | 2 + .../typesfakes/fake_local_participant.go | 65 ++++++ pkg/service/roommanager.go | 6 + pkg/sfu/buffer/rtpstats.go | 185 ++++++++++++++---- pkg/sfu/downtrack.go | 30 +-- pkg/sfu/forwarder.go | 7 + pkg/sfu/rtpmunger.go | 27 +++ pkg/sfu/streamtrackermanager.go | 16 ++ 11 files changed, 298 insertions(+), 49 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 315fb3aae..cdab66a70 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -107,6 +107,9 @@ type RTCConfig struct { // force a reconnect on a subscription error ReconnectOnSubscriptionError *bool `yaml:"reconnect_on_subscription_error,omitempty"` + + // allow time stamp adjust to keep drift low, this is experimental + AllowTimestampAdjustment *bool `yaml:"allow_timestamp_adjustment,omitempty"` } type TURNServer struct { diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index af82fe224..bbf439e43 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -104,6 +104,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * sub.GetBufferFactory(), subscriberID, t.params.ReceiverConfig.PacketBufferSize, + sub.GetAllowTimestampAdjustment(), LoggerWithTrack(sub.GetLogger(), trackID, t.params.IsRelayed), ) if err != nil { diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 793defe93..91008754c 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -92,6 +92,7 @@ type ParticipantParams struct { SubscriberAllowPause bool SubscriptionLimitAudio int32 SubscriptionLimitVideo int32 + AllowTimestampAdjustment bool } type ParticipantImpl struct { @@ -228,6 +229,10 @@ func (p *ParticipantImpl) GetAdaptiveStream() bool { return p.params.AdaptiveStream } +func (p *ParticipantImpl) GetAllowTimestampAdjustment() bool { + return p.params.AllowTimestampAdjustment +} + func (p *ParticipantImpl) ID() livekit.ParticipantID { return p.params.SID } diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 84989179a..1a0a9532c 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -341,6 +341,8 @@ type LocalParticipant interface { // down stream bandwidth management SetSubscriberAllowPause(allowPause bool) SetSubscriberChannelCapacity(channelCapacity int64) + + GetAllowTimestampAdjustment() bool } // Room is a container of participants, and can provide room-level actions diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index e4ddfea5f..3fbdcbc7d 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -165,6 +165,16 @@ type FakeLocalParticipant struct { getAdaptiveStreamReturnsOnCall map[int]struct { result1 bool } + GetAllowTimestampAdjustmentStub func() bool + getAllowTimestampAdjustmentMutex sync.RWMutex + getAllowTimestampAdjustmentArgsForCall []struct { + } + getAllowTimestampAdjustmentReturns struct { + result1 bool + } + getAllowTimestampAdjustmentReturnsOnCall map[int]struct { + result1 bool + } GetAudioLevelStub func() (float64, bool) getAudioLevelMutex sync.RWMutex getAudioLevelArgsForCall []struct { @@ -1612,6 +1622,59 @@ func (fake *FakeLocalParticipant) GetAdaptiveStreamReturnsOnCall(i int, result1 }{result1} } +func (fake *FakeLocalParticipant) GetAllowTimestampAdjustment() bool { + fake.getAllowTimestampAdjustmentMutex.Lock() + ret, specificReturn := fake.getAllowTimestampAdjustmentReturnsOnCall[len(fake.getAllowTimestampAdjustmentArgsForCall)] + fake.getAllowTimestampAdjustmentArgsForCall = append(fake.getAllowTimestampAdjustmentArgsForCall, struct { + }{}) + stub := fake.GetAllowTimestampAdjustmentStub + fakeReturns := fake.getAllowTimestampAdjustmentReturns + fake.recordInvocation("GetAllowTimestampAdjustment", []interface{}{}) + fake.getAllowTimestampAdjustmentMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) GetAllowTimestampAdjustmentCallCount() int { + fake.getAllowTimestampAdjustmentMutex.RLock() + defer fake.getAllowTimestampAdjustmentMutex.RUnlock() + return len(fake.getAllowTimestampAdjustmentArgsForCall) +} + +func (fake *FakeLocalParticipant) GetAllowTimestampAdjustmentCalls(stub func() bool) { + fake.getAllowTimestampAdjustmentMutex.Lock() + defer fake.getAllowTimestampAdjustmentMutex.Unlock() + fake.GetAllowTimestampAdjustmentStub = stub +} + +func (fake *FakeLocalParticipant) GetAllowTimestampAdjustmentReturns(result1 bool) { + fake.getAllowTimestampAdjustmentMutex.Lock() + defer fake.getAllowTimestampAdjustmentMutex.Unlock() + fake.GetAllowTimestampAdjustmentStub = nil + fake.getAllowTimestampAdjustmentReturns = struct { + result1 bool + }{result1} +} + +func (fake *FakeLocalParticipant) GetAllowTimestampAdjustmentReturnsOnCall(i int, result1 bool) { + fake.getAllowTimestampAdjustmentMutex.Lock() + defer fake.getAllowTimestampAdjustmentMutex.Unlock() + fake.GetAllowTimestampAdjustmentStub = nil + if fake.getAllowTimestampAdjustmentReturnsOnCall == nil { + fake.getAllowTimestampAdjustmentReturnsOnCall = make(map[int]struct { + result1 bool + }) + } + fake.getAllowTimestampAdjustmentReturnsOnCall[i] = struct { + result1 bool + }{result1} +} + func (fake *FakeLocalParticipant) GetAudioLevel() (float64, bool) { fake.getAudioLevelMutex.Lock() ret, specificReturn := fake.getAudioLevelReturnsOnCall[len(fake.getAudioLevelArgsForCall)] @@ -5456,6 +5519,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.debugInfoMutex.RUnlock() fake.getAdaptiveStreamMutex.RLock() defer fake.getAdaptiveStreamMutex.RUnlock() + fake.getAllowTimestampAdjustmentMutex.RLock() + defer fake.getAllowTimestampAdjustmentMutex.RUnlock() fake.getAudioLevelMutex.RLock() defer fake.getAudioLevelMutex.RUnlock() fake.getBufferFactoryMutex.RLock() diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 5b24c44bc..c9986e92a 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -309,6 +309,11 @@ func (r *RoomManager) StartSession( if pi.SubscriberAllowPause != nil { subscriberAllowPause = *pi.SubscriberAllowPause } + // default do not allow timestamp adjustment + allowTimestampAdjustment := false + if r.config.RTC.AllowTimestampAdjustment != nil { + allowTimestampAdjustment = *r.config.RTC.AllowTimestampAdjustment + } participant, err = rtc.NewParticipant(rtc.ParticipantParams{ Identity: pi.Identity, Name: pi.Name, @@ -343,6 +348,7 @@ func (r *RoomManager) StartSession( SubscriberAllowPause: subscriberAllowPause, SubscriptionLimitAudio: r.config.Limit.SubscriptionLimitAudio, SubscriptionLimitVideo: r.config.Limit.SubscriptionLimitVideo, + AllowTimestampAdjustment: allowTimestampAdjustment, }) if err != nil { return err diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index 339959e6d..9b274f4e3 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -174,21 +174,28 @@ type RTPStats struct { rtt uint32 maxRtt uint32 - srData *RTCPSenderReportData - lastSRTime time.Time - lastSRNTP mediatransportutil.NtpTime + srData *RTCPSenderReportData + lastSRTime time.Time + lastSRNTP mediatransportutil.NtpTime + pidController *PIDController nextSnapshotId uint32 snapshots map[uint32]*Snapshot } func NewRTPStats(params RTPStatsParams) *RTPStats { - return &RTPStats{ + r := &RTPStats{ params: params, logger: params.Logger, nextSnapshotId: FirstSnapshotId, snapshots: make(map[uint32]*Snapshot), + pidController: NewPIDController(), } + + r.pidController.SetGains(2.0, 0.5, 0.25) + r.pidController.SetDerivativeLPF(0.02) + r.pidController.SetOutputLimits(-0.025*float64(params.ClockRate), 0.025*float64(params.ClockRate)) + return r } func (r *RTPStats) Seed(from *RTPStats) { @@ -731,37 +738,35 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) { } // TODO-REMOVE-AFTER-DEBUG-START - if r.params.ClockRate != 90000 { // log only for audio as it is less frequent - ntpTime := srData.NTPTimestamp.Time() + ntpTime := srData.NTPTimestamp.Time() - var ntpDiffSinceLast, arrivalDiffSinceLast time.Duration - var rtpDiffSinceLast uint32 - if r.srData != nil { - ntpDiffSinceLast = ntpTime.Sub(r.srData.NTPTimestamp.Time()) - rtpDiffSinceLast = srData.RTPTimestamp - r.srData.RTPTimestamp - arrivalDiffSinceLast = srData.ArrivalTime.Sub(r.srData.ArrivalTime) - } - - timeSinceFirst := srData.NTPTimestamp.Time().Sub(r.firstTime) - rtpDiffSinceFirst := getExtTS(srData.RTPTimestamp, r.tsCycles) - r.extStartTS - drift := int64(uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9) - rtpDiffSinceFirst) - driftMs := (float64(drift) * 1000) / float64(r.params.ClockRate) - - r.logger.Debugw( - "received sender report", - "ntp", ntpTime, - "rtp", srData.RTPTimestamp, - "arrival", srData.ArrivalTime, - "ntpDiff", ntpDiffSinceLast, - "rtpDiff", rtpDiffSinceLast, - "arrivalDiff", arrivalDiffSinceLast, - "expectedTimeDiff", float64(rtpDiffSinceLast)/float64(r.params.ClockRate), - "timeSinceFirst", timeSinceFirst, - "rtpDiffSinceFirst", rtpDiffSinceFirst, - "drift", drift, - "driftMs", driftMs, - ) + var ntpDiffSinceLast, arrivalDiffSinceLast time.Duration + var rtpDiffSinceLast uint32 + if r.srData != nil { + ntpDiffSinceLast = ntpTime.Sub(r.srData.NTPTimestamp.Time()) + rtpDiffSinceLast = srData.RTPTimestamp - r.srData.RTPTimestamp + arrivalDiffSinceLast = srData.ArrivalTime.Sub(r.srData.ArrivalTime) } + + timeSinceFirst := time.Since(r.firstTime) // ideally should use NTP time from SR, but that is a different time base, now is a resonable approximation + rtpDiffSinceFirst := getExtTS(srData.RTPTimestamp, r.tsCycles) - r.extStartTS + drift := int64(uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9) - rtpDiffSinceFirst) + driftMs := (float64(drift) * 1000) / float64(r.params.ClockRate) + + r.logger.Debugw( + "received sender report", + "ntp", ntpTime, + "rtp", srData.RTPTimestamp, + "arrival", srData.ArrivalTime, + "ntpDiff", ntpDiffSinceLast, + "rtpDiff", rtpDiffSinceLast, + "arrivalDiff", arrivalDiffSinceLast, + "expectedTimeDiff", float64(rtpDiffSinceLast)/float64(r.params.ClockRate), + "timeSinceFirst", timeSinceFirst, + "rtpDiffSinceFirst", rtpDiffSinceFirst, + "drift", drift, + "driftMs", driftMs, + ) // TODO-REMOVE-AFTER-DEBUG-END srDataCopy := *srData @@ -806,12 +811,12 @@ func (r *RTPStats) GetExpectedRTPTimestamp(at time.Time) (uint32, error) { return uint32(expectedExtRTP), nil } -func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport { +func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) (*rtcp.SenderReport, float64) { r.lock.Lock() defer r.lock.Unlock() if !r.initialized { - return nil + return nil, 0.0 } // construct current time based on monotonic clock @@ -836,6 +841,23 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport { timeSinceHighest := time.Since(r.highestTime) nowRTP := r.highestTS + uint32(timeSinceHighest.Nanoseconds()*int64(r.params.ClockRate)/1e9) + // TODO-REMOVE-AFTER-DEBUG-START + timeSinceFirst = nowNTP.Time().Sub(r.firstTime) + rtpDiffSinceFirst := getExtTS(nowRTP, r.tsCycles) - r.extStartTS + measurement := float64(rtpDiffSinceFirst) / timeSinceFirst.Seconds() + pidOutput := r.pidController.Update( + float64(r.params.ClockRate), + measurement, + now, + ) + r.logger.Debugw( + "pid controller output", + "measurement", measurement, + "errorTerm", float64(r.params.ClockRate)-measurement, + "pidOutput", pidOutput, + ) + // TODO-REMOVE-AFTER-DEBUG-STOP + // TODO-REMOVE-AFTER-DEBUG-START ntpTime := nowNTP.Time() @@ -843,8 +865,6 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport { rtpDiffLocal := int32(nowRTP - r.highestTS) rtpOffsetLocal := int32(nowRTP - r.highestTS - uint32(ntpDiffLocal.Nanoseconds()*int64(r.params.ClockRate)/1e9)) - timeSinceFirst = nowNTP.Time().Sub(r.firstTime) - rtpDiffSinceFirst := getExtTS(nowRTP, r.tsCycles) - r.extStartTS drift := int64(uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9) - rtpDiffSinceFirst) driftMs := (float64(drift) * 1000) / float64(r.params.ClockRate) @@ -853,6 +873,7 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport { "highestTS", r.highestTS, "highestTime", r.highestTime.String(), "reportTS", nowRTP, + "expectedTS", uint32(expectedExtRTP), "reportTime", ntpTime.String(), "rtpDiffLocal", rtpDiffLocal, "ntpDiffLocal", ntpDiffLocal, @@ -861,6 +882,7 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport { "rtpDiffSinceFirst", rtpDiffSinceFirst, "drift", drift, "driftMs", driftMs, + "rate", measurement, ) // TODO-REMOVE-AFTER-DEBUG-END @@ -873,7 +895,7 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport { RTPTime: nowRTP, PacketCount: r.getTotalPacketsPrimary() + r.packetsDuplicate + r.packetsPadding, OctetCount: uint32(r.bytes + r.bytesDuplicate + r.bytesPadding), - } + }, pidOutput } func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8, snapshotId uint32) *rtcp.ReceptionReport { @@ -1747,3 +1769,90 @@ func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo { Firs: firs, } } + +// ------------------------------------------------------------------- + +type PIDController struct { + kp, ki, kd float64 + + tau float64 // low pass filter of D, time constant + + outMin, outMax float64 + isOutLimitsSet bool + + iMin, iMax float64 + isILimitsSet bool + + iVal, dVal float64 + + prevError, prevMeasurement float64 + prevMeasurementTime time.Time +} + +func NewPIDController() *PIDController { + return &PIDController{} +} + +func (p *PIDController) SetGains(kp, ki, kd float64) { + p.kp = kp + p.ki = ki + p.kd = kd +} + +func (p *PIDController) SetDerivativeLPF(tau float64) { + p.tau = tau +} + +func (p *PIDController) SetOutputLimits(min, max float64) { + p.outMin = min + p.outMax = max + p.isOutLimitsSet = true +} + +func (p *PIDController) SetIntegralLimits(min, max float64) { + p.iMin = min + p.iMax = max + p.isILimitsSet = true +} + +func (p *PIDController) Update(setpoint, measurement float64, at time.Time) float64 { + diff := setpoint - measurement + if p.prevMeasurementTime.IsZero() { + p.prevError = diff + p.prevMeasurement = measurement + p.prevMeasurementTime = at + return 0 + } + + proportional := p.kp * diff + + duration := at.Sub(p.prevMeasurementTime).Seconds() + p.iVal = p.iVal + (0.5 * p.ki * duration * (diff + p.prevError)) + if p.isILimitsSet { + if p.iVal > p.iMax { + p.iVal = p.iMax + } + if p.iVal < p.iMin { + p.iVal = p.iMin + } + } + + p.dVal = (-2.0 * p.kd * (measurement - p.prevMeasurement)) + (((2.0*p.tau - duration) * p.dVal) / (2.0*p.tau + duration)) + + output := proportional + p.iVal + p.dVal + if p.isOutLimitsSet { + if output > p.outMax { + output = p.outMax + } + if output < p.outMin { + output = p.outMin + } + } + + p.prevError = diff + p.prevMeasurement = measurement + p.prevMeasurementTime = at + return output +} + +// ------------------------------------------------------------------- diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index b69f4dccb..e9bca290f 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -185,6 +185,8 @@ type DownTrack struct { sequencer *sequencer bufferFactory *buffer.Factory + allowTimestampAdjustment bool + forwarder *Forwarder upstreamCodecs []webrtc.RTPCodecParameters @@ -252,6 +254,7 @@ func NewDownTrack( bf *buffer.Factory, subID livekit.ParticipantID, mt int, + allowTimestampAdjustment bool, logger logger.Logger, ) (*DownTrack, error) { var kind webrtc.RTPCodecType @@ -265,16 +268,17 @@ func NewDownTrack( } d := &DownTrack{ - logger: logger, - id: r.TrackID(), - subscriberID: subID, - maxTrack: mt, - streamID: r.StreamID(), - bufferFactory: bf, - receiver: r, - upstreamCodecs: codecs, - kind: kind, - codec: codecs[0].RTPCodecCapability, + logger: logger, + id: r.TrackID(), + subscriberID: subID, + maxTrack: mt, + streamID: r.StreamID(), + bufferFactory: bf, + allowTimestampAdjustment: allowTimestampAdjustment, + receiver: r, + upstreamCodecs: codecs, + kind: kind, + codec: codecs[0].RTPCodecCapability, } d.forwarder = NewForwarder( d.kind, @@ -1113,7 +1117,11 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport { return nil } - return d.rtpStats.GetRtcpSenderReport(d.ssrc) + sr, tsAdjust := d.rtpStats.GetRtcpSenderReport(d.ssrc) + if d.allowTimestampAdjustment { + d.forwarder.AdjustTimestamp(tsAdjust) + } + return sr } func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan struct{} { diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index e8e8167b0..b347be756 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -1630,6 +1630,13 @@ func (f *Forwarder) GetRTPMungerParams() RTPMungerParams { return f.rtpMunger.GetParams() } +func (f *Forwarder) AdjustTimestamp(tsAdjust float64) { + f.lock.Lock() + defer f.lock.Unlock() + + f.rtpMunger.UpdateTsOffset(uint32(tsAdjust)) +} + // ----------------------------------------------------------------------------- func getOptimalBandwidthNeeded(muted bool, pubMuted bool, maxPublishedLayer int32, brs Bitrates, maxLayer buffer.VideoLayer) int64 { diff --git a/pkg/sfu/rtpmunger.go b/pkg/sfu/rtpmunger.go index 35ea2b25f..8fa1d95d4 100644 --- a/pkg/sfu/rtpmunger.go +++ b/pkg/sfu/rtpmunger.go @@ -58,6 +58,7 @@ type RTPMungerParams struct { highestIncomingSN uint16 lastSN uint16 snOffset uint16 + highestIncomingTS uint32 lastTS uint32 tsOffset uint32 lastMarker bool @@ -88,6 +89,7 @@ func (r *RTPMunger) GetParams() RTPMungerParams { highestIncomingSN: r.highestIncomingSN, lastSN: r.lastSN, snOffset: r.snOffset, + highestIncomingTS: r.highestIncomingTS, lastTS: r.lastTS, tsOffset: r.tsOffset, lastMarker: r.lastMarker, @@ -110,6 +112,7 @@ func (r *RTPMunger) SeedLast(state RTPMungerState) { func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) { r.highestIncomingSN = extPkt.Packet.SequenceNumber - 1 + r.highestIncomingTS = extPkt.Packet.Timestamp if !r.started { r.lastSN = extPkt.Packet.SequenceNumber r.lastTS = extPkt.Packet.Timestamp @@ -122,6 +125,7 @@ func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) { func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint16, tsAdjust uint32) { r.highestIncomingSN = extPkt.Packet.SequenceNumber - 1 + r.highestIncomingTS = extPkt.Packet.Timestamp r.snOffset = extPkt.Packet.SequenceNumber - r.lastSN - snAdjust r.tsOffset = extPkt.Packet.Timestamp - r.lastTS - tsAdjust @@ -138,6 +142,10 @@ func (r *RTPMunger) PacketDropped(extPkt *buffer.ExtPacket) { r.lastSN = extPkt.Packet.SequenceNumber - r.snOffset } +func (r *RTPMunger) UpdateTsOffset(tsAdjust uint32) { + r.tsOffset -= tsAdjust +} + func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationParamsRTP, error) { // if out-of-order, look up sequence number offset cache diff := extPkt.Packet.SequenceNumber - r.highestIncomingSN @@ -200,8 +208,27 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara mungedSN := extPkt.Packet.SequenceNumber - r.snOffset mungedTS := extPkt.Packet.Timestamp - r.tsOffset + // with timestamp adjustment, it is possible that the adjustment causes munged timestamp to move backwards, + // detect that and adjust so that it does not move back + if extPkt.Packet.Timestamp != r.highestIncomingTS && (((mungedTS - r.lastTS) == 0) || (mungedTS-r.lastTS) > (1<<31)) { + adjustedMungedTS := r.lastTS + 1 + adjustedTSOffset := extPkt.Packet.Timestamp - adjustedMungedTS + r.logger.Infow( + "adjust out-of-order timestamp offset", + "mungedTS", mungedTS, + "lastTS", r.lastTS, + "incomingTS", extPkt.Packet.Timestamp, + "offset", r.tsOffset, + "adjustedMungedTS", adjustedMungedTS, + "adjustedTSOffset", adjustedTSOffset, + ) + mungedTS = adjustedMungedTS + r.tsOffset = adjustedTSOffset + } + r.highestIncomingSN = extPkt.Packet.SequenceNumber r.lastSN = mungedSN + r.highestIncomingTS = extPkt.Packet.Timestamp r.lastTS = mungedTS r.lastMarker = extPkt.Packet.Marker diff --git a/pkg/sfu/streamtrackermanager.go b/pkg/sfu/streamtrackermanager.go index 256b5c4ce..ee5f039e7 100644 --- a/pkg/sfu/streamtrackermanager.go +++ b/pkg/sfu/streamtrackermanager.go @@ -494,9 +494,11 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer in return 0, fmt.Errorf("invalid layer, target: %d, reference: %d", layer, referenceLayer) } + /* TODO-RESTORE-AFTER-DEBUG - this is just fast path, below calculations should yield same if layer == referenceLayer { return ts, nil } + */ var srLayer *buffer.RTCPSenderReportData if int(layer) < len(s.senderReports) { @@ -521,6 +523,20 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer in ntpDiff := srRef.NTPTimestamp.Time().Sub(srLayer.NTPTimestamp.Time()) rtpDiff := ntpDiff.Nanoseconds() * int64(s.clockRate) / 1e9 normalizedTS := srLayer.RTPTimestamp + uint32(rtpDiff) + s.logger.Debugw( + "getting reference timestaml", + "layer", layer, + "referenceLayer", referenceLayer, + "incomingTS", ts, + "layerNTP", srLayer.NTPTimestamp.Time().String(), + "refNTP", srRef.NTPTimestamp.Time().String(), + "ntpDiff", ntpDiff.String(), + "layerRTP", srLayer.RTPTimestamp, + "refRTP", srRef.RTPTimestamp, + "rtpDiff", rtpDiff, + "normalizedTS", normalizedTS, + "mappedTS", ts+(srRef.RTPTimestamp-normalizedTS), + ) // now that both RTP timestamps correspond to roughly the same NTP time, // the diff between them is the offset in RTP timestamp units between layer and referenceLayer.