diff --git a/pkg/config/config.go b/pkg/config/config.go index 315fb3aae..cdab66a70 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -107,6 +107,9 @@ type RTCConfig struct { // force a reconnect on a subscription error ReconnectOnSubscriptionError *bool `yaml:"reconnect_on_subscription_error,omitempty"` + + // allow time stamp adjust to keep drift low, this is experimental + AllowTimestampAdjustment *bool `yaml:"allow_timestamp_adjustment,omitempty"` } type TURNServer struct { diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index af82fe224..bbf439e43 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -104,6 +104,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * sub.GetBufferFactory(), subscriberID, t.params.ReceiverConfig.PacketBufferSize, + sub.GetAllowTimestampAdjustment(), LoggerWithTrack(sub.GetLogger(), trackID, t.params.IsRelayed), ) if err != nil { diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 793defe93..91008754c 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -92,6 +92,7 @@ type ParticipantParams struct { SubscriberAllowPause bool SubscriptionLimitAudio int32 SubscriptionLimitVideo int32 + AllowTimestampAdjustment bool } type ParticipantImpl struct { @@ -228,6 +229,10 @@ func (p *ParticipantImpl) GetAdaptiveStream() bool { return p.params.AdaptiveStream } +func (p *ParticipantImpl) GetAllowTimestampAdjustment() bool { + return p.params.AllowTimestampAdjustment +} + func (p *ParticipantImpl) ID() livekit.ParticipantID { return p.params.SID } diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 84989179a..1a0a9532c 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -341,6 +341,8 @@ type LocalParticipant interface { // down stream bandwidth management SetSubscriberAllowPause(allowPause bool) SetSubscriberChannelCapacity(channelCapacity int64) + + GetAllowTimestampAdjustment() bool } // Room is a container of participants, and can provide room-level actions diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index e4ddfea5f..3fbdcbc7d 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -165,6 +165,16 @@ type FakeLocalParticipant struct { getAdaptiveStreamReturnsOnCall map[int]struct { result1 bool } + GetAllowTimestampAdjustmentStub func() bool + getAllowTimestampAdjustmentMutex sync.RWMutex + getAllowTimestampAdjustmentArgsForCall []struct { + } + getAllowTimestampAdjustmentReturns struct { + result1 bool + } + getAllowTimestampAdjustmentReturnsOnCall map[int]struct { + result1 bool + } GetAudioLevelStub func() (float64, bool) getAudioLevelMutex sync.RWMutex getAudioLevelArgsForCall []struct { @@ -1612,6 +1622,59 @@ func (fake *FakeLocalParticipant) GetAdaptiveStreamReturnsOnCall(i int, result1 }{result1} } +func (fake *FakeLocalParticipant) GetAllowTimestampAdjustment() bool { + fake.getAllowTimestampAdjustmentMutex.Lock() + ret, specificReturn := fake.getAllowTimestampAdjustmentReturnsOnCall[len(fake.getAllowTimestampAdjustmentArgsForCall)] + fake.getAllowTimestampAdjustmentArgsForCall = append(fake.getAllowTimestampAdjustmentArgsForCall, struct { + }{}) + stub := fake.GetAllowTimestampAdjustmentStub + fakeReturns := fake.getAllowTimestampAdjustmentReturns + fake.recordInvocation("GetAllowTimestampAdjustment", []interface{}{}) + fake.getAllowTimestampAdjustmentMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) GetAllowTimestampAdjustmentCallCount() int { + fake.getAllowTimestampAdjustmentMutex.RLock() + defer fake.getAllowTimestampAdjustmentMutex.RUnlock() + return len(fake.getAllowTimestampAdjustmentArgsForCall) +} + +func (fake *FakeLocalParticipant) GetAllowTimestampAdjustmentCalls(stub func() bool) { + fake.getAllowTimestampAdjustmentMutex.Lock() + defer fake.getAllowTimestampAdjustmentMutex.Unlock() + fake.GetAllowTimestampAdjustmentStub = stub +} + +func (fake *FakeLocalParticipant) GetAllowTimestampAdjustmentReturns(result1 bool) { + fake.getAllowTimestampAdjustmentMutex.Lock() + defer fake.getAllowTimestampAdjustmentMutex.Unlock() + fake.GetAllowTimestampAdjustmentStub = nil + fake.getAllowTimestampAdjustmentReturns = struct { + result1 bool + }{result1} +} + +func (fake *FakeLocalParticipant) GetAllowTimestampAdjustmentReturnsOnCall(i int, result1 bool) { + fake.getAllowTimestampAdjustmentMutex.Lock() + defer fake.getAllowTimestampAdjustmentMutex.Unlock() + fake.GetAllowTimestampAdjustmentStub = nil + if fake.getAllowTimestampAdjustmentReturnsOnCall == nil { + fake.getAllowTimestampAdjustmentReturnsOnCall = make(map[int]struct { + result1 bool + }) + } + fake.getAllowTimestampAdjustmentReturnsOnCall[i] = struct { + result1 bool + }{result1} +} + func (fake *FakeLocalParticipant) GetAudioLevel() (float64, bool) { fake.getAudioLevelMutex.Lock() ret, specificReturn := fake.getAudioLevelReturnsOnCall[len(fake.getAudioLevelArgsForCall)] @@ -5456,6 +5519,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.debugInfoMutex.RUnlock() fake.getAdaptiveStreamMutex.RLock() defer fake.getAdaptiveStreamMutex.RUnlock() + fake.getAllowTimestampAdjustmentMutex.RLock() + defer fake.getAllowTimestampAdjustmentMutex.RUnlock() fake.getAudioLevelMutex.RLock() defer fake.getAudioLevelMutex.RUnlock() fake.getBufferFactoryMutex.RLock() diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 5b24c44bc..c9986e92a 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -309,6 +309,11 @@ func (r *RoomManager) StartSession( if pi.SubscriberAllowPause != nil { subscriberAllowPause = *pi.SubscriberAllowPause } + // default do not allow timestamp adjustment + allowTimestampAdjustment := false + if r.config.RTC.AllowTimestampAdjustment != nil { + allowTimestampAdjustment = *r.config.RTC.AllowTimestampAdjustment + } participant, err = rtc.NewParticipant(rtc.ParticipantParams{ Identity: pi.Identity, Name: pi.Name, @@ -343,6 +348,7 @@ func (r *RoomManager) StartSession( SubscriberAllowPause: subscriberAllowPause, SubscriptionLimitAudio: r.config.Limit.SubscriptionLimitAudio, SubscriptionLimitVideo: r.config.Limit.SubscriptionLimitVideo, + AllowTimestampAdjustment: allowTimestampAdjustment, }) if err != nil { return err diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index 339959e6d..9b274f4e3 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -174,21 +174,28 @@ type RTPStats struct { rtt uint32 maxRtt uint32 - srData *RTCPSenderReportData - lastSRTime time.Time - lastSRNTP mediatransportutil.NtpTime + srData *RTCPSenderReportData + lastSRTime time.Time + lastSRNTP mediatransportutil.NtpTime + pidController *PIDController nextSnapshotId uint32 snapshots map[uint32]*Snapshot } func NewRTPStats(params RTPStatsParams) *RTPStats { - return &RTPStats{ + r := &RTPStats{ params: params, logger: params.Logger, nextSnapshotId: FirstSnapshotId, snapshots: make(map[uint32]*Snapshot), + pidController: NewPIDController(), } + + r.pidController.SetGains(2.0, 0.5, 0.25) + r.pidController.SetDerivativeLPF(0.02) + r.pidController.SetOutputLimits(-0.025*float64(params.ClockRate), 0.025*float64(params.ClockRate)) + return r } func (r *RTPStats) Seed(from *RTPStats) { @@ -731,37 +738,35 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) { } // TODO-REMOVE-AFTER-DEBUG-START - if r.params.ClockRate != 90000 { // log only for audio as it is less frequent - ntpTime := srData.NTPTimestamp.Time() + ntpTime := srData.NTPTimestamp.Time() - var ntpDiffSinceLast, arrivalDiffSinceLast time.Duration - var rtpDiffSinceLast uint32 - if r.srData != nil { - ntpDiffSinceLast = ntpTime.Sub(r.srData.NTPTimestamp.Time()) - rtpDiffSinceLast = srData.RTPTimestamp - r.srData.RTPTimestamp - arrivalDiffSinceLast = srData.ArrivalTime.Sub(r.srData.ArrivalTime) - } - - timeSinceFirst := srData.NTPTimestamp.Time().Sub(r.firstTime) - rtpDiffSinceFirst := getExtTS(srData.RTPTimestamp, r.tsCycles) - r.extStartTS - drift := int64(uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9) - rtpDiffSinceFirst) - driftMs := (float64(drift) * 1000) / float64(r.params.ClockRate) - - r.logger.Debugw( - "received sender report", - "ntp", ntpTime, - "rtp", srData.RTPTimestamp, - "arrival", srData.ArrivalTime, - "ntpDiff", ntpDiffSinceLast, - "rtpDiff", rtpDiffSinceLast, - "arrivalDiff", arrivalDiffSinceLast, - "expectedTimeDiff", float64(rtpDiffSinceLast)/float64(r.params.ClockRate), - "timeSinceFirst", timeSinceFirst, - "rtpDiffSinceFirst", rtpDiffSinceFirst, - "drift", drift, - "driftMs", driftMs, - ) + var ntpDiffSinceLast, arrivalDiffSinceLast time.Duration + var rtpDiffSinceLast uint32 + if r.srData != nil { + ntpDiffSinceLast = ntpTime.Sub(r.srData.NTPTimestamp.Time()) + rtpDiffSinceLast = srData.RTPTimestamp - r.srData.RTPTimestamp + arrivalDiffSinceLast = srData.ArrivalTime.Sub(r.srData.ArrivalTime) } + + timeSinceFirst := time.Since(r.firstTime) // ideally should use NTP time from SR, but that is a different time base, now is a resonable approximation + rtpDiffSinceFirst := getExtTS(srData.RTPTimestamp, r.tsCycles) - r.extStartTS + drift := int64(uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9) - rtpDiffSinceFirst) + driftMs := (float64(drift) * 1000) / float64(r.params.ClockRate) + + r.logger.Debugw( + "received sender report", + "ntp", ntpTime, + "rtp", srData.RTPTimestamp, + "arrival", srData.ArrivalTime, + "ntpDiff", ntpDiffSinceLast, + "rtpDiff", rtpDiffSinceLast, + "arrivalDiff", arrivalDiffSinceLast, + "expectedTimeDiff", float64(rtpDiffSinceLast)/float64(r.params.ClockRate), + "timeSinceFirst", timeSinceFirst, + "rtpDiffSinceFirst", rtpDiffSinceFirst, + "drift", drift, + "driftMs", driftMs, + ) // TODO-REMOVE-AFTER-DEBUG-END srDataCopy := *srData @@ -806,12 +811,12 @@ func (r *RTPStats) GetExpectedRTPTimestamp(at time.Time) (uint32, error) { return uint32(expectedExtRTP), nil } -func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport { +func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) (*rtcp.SenderReport, float64) { r.lock.Lock() defer r.lock.Unlock() if !r.initialized { - return nil + return nil, 0.0 } // construct current time based on monotonic clock @@ -836,6 +841,23 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport { timeSinceHighest := time.Since(r.highestTime) nowRTP := r.highestTS + uint32(timeSinceHighest.Nanoseconds()*int64(r.params.ClockRate)/1e9) + // TODO-REMOVE-AFTER-DEBUG-START + timeSinceFirst = nowNTP.Time().Sub(r.firstTime) + rtpDiffSinceFirst := getExtTS(nowRTP, r.tsCycles) - r.extStartTS + measurement := float64(rtpDiffSinceFirst) / timeSinceFirst.Seconds() + pidOutput := r.pidController.Update( + float64(r.params.ClockRate), + measurement, + now, + ) + r.logger.Debugw( + "pid controller output", + "measurement", measurement, + "errorTerm", float64(r.params.ClockRate)-measurement, + "pidOutput", pidOutput, + ) + // TODO-REMOVE-AFTER-DEBUG-STOP + // TODO-REMOVE-AFTER-DEBUG-START ntpTime := nowNTP.Time() @@ -843,8 +865,6 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport { rtpDiffLocal := int32(nowRTP - r.highestTS) rtpOffsetLocal := int32(nowRTP - r.highestTS - uint32(ntpDiffLocal.Nanoseconds()*int64(r.params.ClockRate)/1e9)) - timeSinceFirst = nowNTP.Time().Sub(r.firstTime) - rtpDiffSinceFirst := getExtTS(nowRTP, r.tsCycles) - r.extStartTS drift := int64(uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9) - rtpDiffSinceFirst) driftMs := (float64(drift) * 1000) / float64(r.params.ClockRate) @@ -853,6 +873,7 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport { "highestTS", r.highestTS, "highestTime", r.highestTime.String(), "reportTS", nowRTP, + "expectedTS", uint32(expectedExtRTP), "reportTime", ntpTime.String(), "rtpDiffLocal", rtpDiffLocal, "ntpDiffLocal", ntpDiffLocal, @@ -861,6 +882,7 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport { "rtpDiffSinceFirst", rtpDiffSinceFirst, "drift", drift, "driftMs", driftMs, + "rate", measurement, ) // TODO-REMOVE-AFTER-DEBUG-END @@ -873,7 +895,7 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport { RTPTime: nowRTP, PacketCount: r.getTotalPacketsPrimary() + r.packetsDuplicate + r.packetsPadding, OctetCount: uint32(r.bytes + r.bytesDuplicate + r.bytesPadding), - } + }, pidOutput } func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8, snapshotId uint32) *rtcp.ReceptionReport { @@ -1747,3 +1769,90 @@ func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo { Firs: firs, } } + +// ------------------------------------------------------------------- + +type PIDController struct { + kp, ki, kd float64 + + tau float64 // low pass filter of D, time constant + + outMin, outMax float64 + isOutLimitsSet bool + + iMin, iMax float64 + isILimitsSet bool + + iVal, dVal float64 + + prevError, prevMeasurement float64 + prevMeasurementTime time.Time +} + +func NewPIDController() *PIDController { + return &PIDController{} +} + +func (p *PIDController) SetGains(kp, ki, kd float64) { + p.kp = kp + p.ki = ki + p.kd = kd +} + +func (p *PIDController) SetDerivativeLPF(tau float64) { + p.tau = tau +} + +func (p *PIDController) SetOutputLimits(min, max float64) { + p.outMin = min + p.outMax = max + p.isOutLimitsSet = true +} + +func (p *PIDController) SetIntegralLimits(min, max float64) { + p.iMin = min + p.iMax = max + p.isILimitsSet = true +} + +func (p *PIDController) Update(setpoint, measurement float64, at time.Time) float64 { + diff := setpoint - measurement + if p.prevMeasurementTime.IsZero() { + p.prevError = diff + p.prevMeasurement = measurement + p.prevMeasurementTime = at + return 0 + } + + proportional := p.kp * diff + + duration := at.Sub(p.prevMeasurementTime).Seconds() + p.iVal = p.iVal + (0.5 * p.ki * duration * (diff + p.prevError)) + if p.isILimitsSet { + if p.iVal > p.iMax { + p.iVal = p.iMax + } + if p.iVal < p.iMin { + p.iVal = p.iMin + } + } + + p.dVal = (-2.0 * p.kd * (measurement - p.prevMeasurement)) + (((2.0*p.tau - duration) * p.dVal) / (2.0*p.tau + duration)) + + output := proportional + p.iVal + p.dVal + if p.isOutLimitsSet { + if output > p.outMax { + output = p.outMax + } + if output < p.outMin { + output = p.outMin + } + } + + p.prevError = diff + p.prevMeasurement = measurement + p.prevMeasurementTime = at + return output +} + +// ------------------------------------------------------------------- diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index b69f4dccb..e9bca290f 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -185,6 +185,8 @@ type DownTrack struct { sequencer *sequencer bufferFactory *buffer.Factory + allowTimestampAdjustment bool + forwarder *Forwarder upstreamCodecs []webrtc.RTPCodecParameters @@ -252,6 +254,7 @@ func NewDownTrack( bf *buffer.Factory, subID livekit.ParticipantID, mt int, + allowTimestampAdjustment bool, logger logger.Logger, ) (*DownTrack, error) { var kind webrtc.RTPCodecType @@ -265,16 +268,17 @@ func NewDownTrack( } d := &DownTrack{ - logger: logger, - id: r.TrackID(), - subscriberID: subID, - maxTrack: mt, - streamID: r.StreamID(), - bufferFactory: bf, - receiver: r, - upstreamCodecs: codecs, - kind: kind, - codec: codecs[0].RTPCodecCapability, + logger: logger, + id: r.TrackID(), + subscriberID: subID, + maxTrack: mt, + streamID: r.StreamID(), + bufferFactory: bf, + allowTimestampAdjustment: allowTimestampAdjustment, + receiver: r, + upstreamCodecs: codecs, + kind: kind, + codec: codecs[0].RTPCodecCapability, } d.forwarder = NewForwarder( d.kind, @@ -1113,7 +1117,11 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport { return nil } - return d.rtpStats.GetRtcpSenderReport(d.ssrc) + sr, tsAdjust := d.rtpStats.GetRtcpSenderReport(d.ssrc) + if d.allowTimestampAdjustment { + d.forwarder.AdjustTimestamp(tsAdjust) + } + return sr } func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan struct{} { diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index e8e8167b0..b347be756 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -1630,6 +1630,13 @@ func (f *Forwarder) GetRTPMungerParams() RTPMungerParams { return f.rtpMunger.GetParams() } +func (f *Forwarder) AdjustTimestamp(tsAdjust float64) { + f.lock.Lock() + defer f.lock.Unlock() + + f.rtpMunger.UpdateTsOffset(uint32(tsAdjust)) +} + // ----------------------------------------------------------------------------- func getOptimalBandwidthNeeded(muted bool, pubMuted bool, maxPublishedLayer int32, brs Bitrates, maxLayer buffer.VideoLayer) int64 { diff --git a/pkg/sfu/rtpmunger.go b/pkg/sfu/rtpmunger.go index 35ea2b25f..8fa1d95d4 100644 --- a/pkg/sfu/rtpmunger.go +++ b/pkg/sfu/rtpmunger.go @@ -58,6 +58,7 @@ type RTPMungerParams struct { highestIncomingSN uint16 lastSN uint16 snOffset uint16 + highestIncomingTS uint32 lastTS uint32 tsOffset uint32 lastMarker bool @@ -88,6 +89,7 @@ func (r *RTPMunger) GetParams() RTPMungerParams { highestIncomingSN: r.highestIncomingSN, lastSN: r.lastSN, snOffset: r.snOffset, + highestIncomingTS: r.highestIncomingTS, lastTS: r.lastTS, tsOffset: r.tsOffset, lastMarker: r.lastMarker, @@ -110,6 +112,7 @@ func (r *RTPMunger) SeedLast(state RTPMungerState) { func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) { r.highestIncomingSN = extPkt.Packet.SequenceNumber - 1 + r.highestIncomingTS = extPkt.Packet.Timestamp if !r.started { r.lastSN = extPkt.Packet.SequenceNumber r.lastTS = extPkt.Packet.Timestamp @@ -122,6 +125,7 @@ func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) { func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint16, tsAdjust uint32) { r.highestIncomingSN = extPkt.Packet.SequenceNumber - 1 + r.highestIncomingTS = extPkt.Packet.Timestamp r.snOffset = extPkt.Packet.SequenceNumber - r.lastSN - snAdjust r.tsOffset = extPkt.Packet.Timestamp - r.lastTS - tsAdjust @@ -138,6 +142,10 @@ func (r *RTPMunger) PacketDropped(extPkt *buffer.ExtPacket) { r.lastSN = extPkt.Packet.SequenceNumber - r.snOffset } +func (r *RTPMunger) UpdateTsOffset(tsAdjust uint32) { + r.tsOffset -= tsAdjust +} + func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationParamsRTP, error) { // if out-of-order, look up sequence number offset cache diff := extPkt.Packet.SequenceNumber - r.highestIncomingSN @@ -200,8 +208,27 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara mungedSN := extPkt.Packet.SequenceNumber - r.snOffset mungedTS := extPkt.Packet.Timestamp - r.tsOffset + // with timestamp adjustment, it is possible that the adjustment causes munged timestamp to move backwards, + // detect that and adjust so that it does not move back + if extPkt.Packet.Timestamp != r.highestIncomingTS && (((mungedTS - r.lastTS) == 0) || (mungedTS-r.lastTS) > (1<<31)) { + adjustedMungedTS := r.lastTS + 1 + adjustedTSOffset := extPkt.Packet.Timestamp - adjustedMungedTS + r.logger.Infow( + "adjust out-of-order timestamp offset", + "mungedTS", mungedTS, + "lastTS", r.lastTS, + "incomingTS", extPkt.Packet.Timestamp, + "offset", r.tsOffset, + "adjustedMungedTS", adjustedMungedTS, + "adjustedTSOffset", adjustedTSOffset, + ) + mungedTS = adjustedMungedTS + r.tsOffset = adjustedTSOffset + } + r.highestIncomingSN = extPkt.Packet.SequenceNumber r.lastSN = mungedSN + r.highestIncomingTS = extPkt.Packet.Timestamp r.lastTS = mungedTS r.lastMarker = extPkt.Packet.Marker diff --git a/pkg/sfu/streamtrackermanager.go b/pkg/sfu/streamtrackermanager.go index 256b5c4ce..ee5f039e7 100644 --- a/pkg/sfu/streamtrackermanager.go +++ b/pkg/sfu/streamtrackermanager.go @@ -494,9 +494,11 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer in return 0, fmt.Errorf("invalid layer, target: %d, reference: %d", layer, referenceLayer) } + /* TODO-RESTORE-AFTER-DEBUG - this is just fast path, below calculations should yield same if layer == referenceLayer { return ts, nil } + */ var srLayer *buffer.RTCPSenderReportData if int(layer) < len(s.senderReports) { @@ -521,6 +523,20 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer in ntpDiff := srRef.NTPTimestamp.Time().Sub(srLayer.NTPTimestamp.Time()) rtpDiff := ntpDiff.Nanoseconds() * int64(s.clockRate) / 1e9 normalizedTS := srLayer.RTPTimestamp + uint32(rtpDiff) + s.logger.Debugw( + "getting reference timestaml", + "layer", layer, + "referenceLayer", referenceLayer, + "incomingTS", ts, + "layerNTP", srLayer.NTPTimestamp.Time().String(), + "refNTP", srRef.NTPTimestamp.Time().String(), + "ntpDiff", ntpDiff.String(), + "layerRTP", srLayer.RTPTimestamp, + "refRTP", srRef.RTPTimestamp, + "rtpDiff", rtpDiff, + "normalizedTS", normalizedTS, + "mappedTS", ts+(srRef.RTPTimestamp-normalizedTS), + ) // now that both RTP timestamps correspond to roughly the same NTP time, // the diff between them is the offset in RTP timestamp units between layer and referenceLayer.