From acbd4ea10406aea9255c399ec0e45c02037ccb04 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 8 Jul 2024 11:07:20 +0530 Subject: [PATCH] Handle cases of long mute/rollover of time stamp. (#2842) * Handle cases of long mute/rollover of time stamp. There are cases where the track is muted for long enough for timestamp roll over to happen. There are no packets in that window (typically there should be black frames (for video) or silence (for audio)). But, maybe the pause based implementation of mute is causing this. Anyhow, use time since last packet to gauge how much roll over should have happened and use that to update time stamp. There will be really edge cases where this could also fail (for e. g. packet time is affected by propagation delay, so it could theoretically happen that mute/unmute + packet reception could happen exactly around that rollover point and miscalculate, but should be rare). As this happen per packet on receive side, changing time to `UnixNano()` to make it more efficient to check this. * spelling * tests * test util * tests --- pkg/sfu/audio/audiolevel.go | 13 ++-- pkg/sfu/audio/audiolevel_test.go | 16 ++--- pkg/sfu/buffer/buffer.go | 32 ++++----- pkg/sfu/buffer/rtpstats_base.go | 59 ++++++++-------- pkg/sfu/buffer/rtpstats_receiver.go | 59 +++++++++++----- pkg/sfu/buffer/rtpstats_receiver_test.go | 20 +++--- pkg/sfu/buffer/rtpstats_sender.go | 27 ++++++-- pkg/sfu/downtrack.go | 10 +-- pkg/sfu/forwardstats.go | 21 +++--- pkg/sfu/receiver.go | 2 +- pkg/sfu/redprimaryreceiver.go | 2 +- pkg/sfu/sequencer.go | 10 +-- pkg/sfu/sequencer_test.go | 20 +++--- pkg/sfu/testutils/data.go | 2 +- pkg/sfu/utils/wraparound.go | 30 +++++++++ pkg/sfu/utils/wraparound_test.go | 85 ++++++++++++++++++++++++ 16 files changed, 283 insertions(+), 125 deletions(-) diff --git a/pkg/sfu/audio/audiolevel.go b/pkg/sfu/audio/audiolevel.go index f908788d8..bd4ea4a72 100644 --- a/pkg/sfu/audio/audiolevel.go +++ b/pkg/sfu/audio/audiolevel.go @@ -17,7 +17,6 @@ package audio import ( "math" "sync" - "time" ) const ( @@ -46,7 +45,7 @@ type AudioLevel struct { loudestObservedLevel uint8 activeDuration uint32 // ms observedDuration uint32 // ms - lastObservedAt time.Time + lastObservedAt int64 } func NewAudioLevel(params AudioLevelParams) *AudioLevel { @@ -67,7 +66,7 @@ func NewAudioLevel(params AudioLevelParams) *AudioLevel { } // Observes a new frame -func (l *AudioLevel) Observe(level uint8, durationMs uint32, arrivalTime time.Time) { +func (l *AudioLevel) Observe(level uint8, durationMs uint32, arrivalTime int64) { l.lock.Lock() defer l.lock.Unlock() @@ -101,8 +100,8 @@ func (l *AudioLevel) Observe(level uint8, durationMs uint32, arrivalTime time.Ti } } -// returns current soothed audio level -func (l *AudioLevel) GetLevel(now time.Time) (float64, bool) { +// returns current smoothed audio level +func (l *AudioLevel) GetLevel(now int64) (float64, bool) { l.lock.Lock() defer l.lock.Unlock() @@ -111,8 +110,8 @@ func (l *AudioLevel) GetLevel(now time.Time) (float64, bool) { return l.smoothedLevel, l.smoothedLevel >= l.activeThreshold } -func (l *AudioLevel) resetIfStaleLocked(arrivalTime time.Time) { - if arrivalTime.Sub(l.lastObservedAt).Milliseconds() < int64(2*l.params.ObserveDuration) { +func (l *AudioLevel) resetIfStaleLocked(arrivalTime int64) { + if (arrivalTime-l.lastObservedAt)/1e6 < int64(2*l.params.ObserveDuration) { return } diff --git a/pkg/sfu/audio/audiolevel_test.go b/pkg/sfu/audio/audiolevel_test.go index 84d5a34dd..a5ecba189 100644 --- a/pkg/sfu/audio/audiolevel_test.go +++ b/pkg/sfu/audio/audiolevel_test.go @@ -34,13 +34,13 @@ func TestAudioLevel(t *testing.T) { clock := time.Now() a := createAudioLevel(defaultActiveLevel, defaultPercentile, defaultObserveDuration) - _, noisy := a.GetLevel(clock) + _, noisy := a.GetLevel(clock.UnixNano()) require.False(t, noisy) observeSamples(a, 28, 5, clock) clock = clock.Add(5 * 20 * time.Millisecond) - _, noisy = a.GetLevel(clock) + _, noisy = a.GetLevel(clock.UnixNano()) require.False(t, noisy) }) @@ -51,7 +51,7 @@ func TestAudioLevel(t *testing.T) { observeSamples(a, 35, 100, clock) clock = clock.Add(100 * 20 * time.Millisecond) - _, noisy := a.GetLevel(clock) + _, noisy := a.GetLevel(clock.UnixNano()) require.False(t, noisy) }) @@ -66,7 +66,7 @@ func TestAudioLevel(t *testing.T) { observeSamples(a, 35, 1, clock) clock = clock.Add(20 * time.Millisecond) - _, noisy := a.GetLevel(clock) + _, noisy := a.GetLevel(clock.UnixNano()) require.False(t, noisy) }) @@ -81,7 +81,7 @@ func TestAudioLevel(t *testing.T) { observeSamples(a, 29, 8, clock) clock = clock.Add(8 * 20 * time.Millisecond) - level, noisy := a.GetLevel(clock) + level, noisy := a.GetLevel(clock.UnixNano()) require.True(t, noisy) require.Greater(t, level, ConvertAudioLevel(float64(defaultActiveLevel))) require.Less(t, level, ConvertAudioLevel(float64(25))) @@ -93,14 +93,14 @@ func TestAudioLevel(t *testing.T) { observeSamples(a, 25, 100, clock) clock = clock.Add(100 * 20 * time.Millisecond) - level, noisy := a.GetLevel(clock) + level, noisy := a.GetLevel(clock.UnixNano()) require.True(t, noisy) require.Greater(t, level, ConvertAudioLevel(float64(defaultActiveLevel))) require.Less(t, level, ConvertAudioLevel(float64(20))) // let enough time pass to make the samples stale clock = clock.Add(1500 * time.Millisecond) - level, noisy = a.GetLevel(clock) + level, noisy = a.GetLevel(clock.UnixNano()) require.Equal(t, float64(0.0), level) require.False(t, noisy) }) @@ -116,6 +116,6 @@ func createAudioLevel(activeLevel uint8, minPercentile uint8, observeDuration ui func observeSamples(a *AudioLevel, level uint8, count int, baseTime time.Time) { for i := 0; i < count; i++ { - a.Observe(level, 20, baseTime.Add(+time.Duration(i*20)*time.Millisecond)) + a.Observe(level, 20, baseTime.Add(+time.Duration(i*20)*time.Millisecond).UnixNano()) } } diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 81ab7b5fe..1170118c6 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -45,20 +45,20 @@ import ( ) const ( - ReportDelta = time.Second + ReportDelta = 1e9 InitPacketBufferSizeVideo = 300 InitPacketBufferSizeAudio = 70 ) type pendingPacket struct { - arrivalTime time.Time + arrivalTime int64 packet []byte } type ExtPacket struct { VideoLayer - Arrival time.Time + Arrival int64 ExtSequenceNumber uint64 ExtTimestamp uint64 Packet *rtp.Packet @@ -84,7 +84,7 @@ type Buffer struct { closeOnce sync.Once mediaSSRC uint32 clockRate uint32 - lastReport time.Time + lastReport int64 twccExtID uint8 audioLevelExtID uint8 bound bool @@ -212,7 +212,7 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili b.ppsSnapshotId = b.rtpStats.NewSnapshotId() b.clockRate = codec.ClockRate - b.lastReport = time.Now() + b.lastReport = time.Now().UnixNano() b.mime = strings.ToLower(codec.MimeType) for _, codecParameter := range params.Codecs { if strings.EqualFold(codecParameter.MimeType, codec.MimeType) { @@ -340,10 +340,10 @@ func (b *Buffer) Write(pkt []byte) (n int, err error) { } } - now := time.Now() + now := time.Now().UnixNano() if b.twcc != nil && b.twccExtID != 0 && !b.closed.Load() { if ext := rtpPacket.GetExtension(b.twccExtID); ext != nil { - b.twcc.Push(rtpPacket.SSRC, binary.BigEndian.Uint16(ext[0:2]), now.UnixNano(), rtpPacket.Marker) + b.twcc.Push(rtpPacket.SSRC, binary.BigEndian.Uint16(ext[0:2]), now, rtpPacket.Marker) } } @@ -373,7 +373,7 @@ func (b *Buffer) Write(pkt []byte) (n int, err error) { } b.payloadType = rtpPacket.PayloadType - b.calc(pkt, &rtpPacket, time.Now(), false) + b.calc(pkt, &rtpPacket, now, false) b.Unlock() b.readCond.Broadcast() return @@ -398,7 +398,7 @@ func (b *Buffer) SetPrimaryBufferForRTX(primaryBuffer *Buffer) { } } -func (b *Buffer) writeRTX(rtxPkt *rtp.Packet, arrivalTime time.Time) (n int, err error) { +func (b *Buffer) writeRTX(rtxPkt *rtp.Packet, arrivalTime int64) (n int, err error) { b.Lock() defer b.Unlock() if !b.bound { @@ -541,7 +541,7 @@ func (b *Buffer) SetRTT(rtt uint32) { } } -func (b *Buffer) calc(rawPkt []byte, rtpPacket *rtp.Packet, arrivalTime time.Time, isRTX bool) { +func (b *Buffer) calc(rawPkt []byte, rtpPacket *rtp.Packet, arrivalTime int64, isRTX bool) { defer func() { b.doNACKs() @@ -740,7 +740,7 @@ func (b *Buffer) doFpsCalc(ep *ExtPacket) { } } -func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) RTPFlowState { +func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime int64) RTPFlowState { flowState := b.rtpStats.Update( arrivalTime, p.Header.SequenceNumber, @@ -764,7 +764,7 @@ func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) RTPFlow return flowState } -func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime time.Time, isRTX bool) { +func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime int64, isRTX bool) { if b.audioLevelExtID != 0 && !isRTX { if !b.latestTSForAudioLevelInitialized { b.latestTSForAudioLevelInitialized = true @@ -786,7 +786,7 @@ func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime time.Time, i } } -func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime time.Time, flowState RTPFlowState) *ExtPacket { +func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime int64, flowState RTPFlowState) *ExtPacket { ep := &ExtPacket{ Arrival: arrivalTime, ExtSequenceNumber: flowState.ExtSequenceNumber, @@ -887,8 +887,8 @@ func (b *Buffer) doNACKs() { } } -func (b *Buffer) doReports(arrivalTime time.Time) { - if time.Since(b.lastReport) < ReportDelta { +func (b *Buffer) doReports(arrivalTime int64) { + if arrivalTime-b.lastReport < ReportDelta { return } @@ -1090,7 +1090,7 @@ func (b *Buffer) GetAudioLevel() (float64, bool) { return 0, false } - return b.audioLevel.GetLevel(time.Now()) + return b.audioLevel.GetLevel(time.Now().UnixNano()) } func (b *Buffer) OnFpsChanged(f func()) { diff --git a/pkg/sfu/buffer/rtpstats_base.go b/pkg/sfu/buffer/rtpstats_base.go index 07c906e70..553c0cbd8 100644 --- a/pkg/sfu/buffer/rtpstats_base.go +++ b/pkg/sfu/buffer/rtpstats_base.go @@ -35,7 +35,7 @@ const ( cFirstSnapshotID = 1 cFirstPacketTimeAdjustWindow = 2 * time.Minute - cFirstPacketTimeAdjustThreshold = 15 * time.Second + cFirstPacketTimeAdjustThreshold = 15 * 1e9 cPassthroughNTPTimestamp = true @@ -175,9 +175,9 @@ type rtpStatsBase struct { startTime time.Time endTime time.Time - firstTime time.Time + firstTime int64 firstTimeAdjustment time.Duration - highestTime time.Time + highestTime int64 lastTransit uint64 lastJitterExtTimestamp uint64 @@ -506,7 +506,7 @@ func (r *rtpStatsBase) GetRtt() uint32 { return r.rtt } -func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *RTCPSenderReportData, tsOffset uint64, extStartTS uint64) { +func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *RTCPSenderReportData, tsOffset uint64, extStartTS uint64) (err error, loggingFields []interface{}) { if time.Since(r.startTime) > cFirstPacketTimeAdjustWindow { return } @@ -526,17 +526,17 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *RTCPSenderReportData, } samplesDuration := time.Duration(float64(samplesDiff) / float64(r.params.ClockRate) * float64(time.Second)) - timeSinceFirst := time.Since(r.firstTime) - now := r.firstTime.Add(timeSinceFirst) - firstTime := now.Add(-samplesDuration) + timeSinceFirst := time.Since(time.Unix(0, r.firstTime)) + now := r.firstTime + timeSinceFirst.Nanoseconds() + firstTime := now - samplesDuration.Nanoseconds() getFields := func() []interface{} { return []interface{}{ "startTime", r.startTime.String(), - "nowTime", now.String(), - "before", r.firstTime.String(), - "after", firstTime.String(), - "adjustment", r.firstTime.Sub(firstTime).String(), + "nowTime", time.Unix(0, now).String(), + "before", time.Unix(0, r.firstTime).String(), + "after", time.Unix(0, firstTime).String(), + "adjustment", time.Duration(r.firstTime - firstTime).String(), "extNowTS", extNowTS, "extStartTS", extStartTS, "srData", srData, @@ -548,15 +548,17 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *RTCPSenderReportData, } } - if firstTime.Before(r.firstTime) { - if r.firstTime.Sub(firstTime) > cFirstPacketTimeAdjustThreshold { - r.logger.Infow("adjusting first packet time, too big, ignoring", getFields()...) + if firstTime < r.firstTime { + if r.firstTime-firstTime > cFirstPacketTimeAdjustThreshold { + err = errors.New("adjusting first packet time, too big, ignoring") + loggingFields = getFields() } else { r.logger.Debugw("adjusting first packet time", getFields()...) - r.firstTimeAdjustment += r.firstTime.Sub(firstTime) + r.firstTimeAdjustment += time.Duration(r.firstTime - firstTime) r.firstTime = firstTime } } + return } func (r *rtpStatsBase) getTotalPacketsPrimary(extStartSN, extHighestSN uint64) uint64 { @@ -659,9 +661,9 @@ func (r *rtpStatsBase) MarshalLogObject(e zapcore.ObjectEncoder) error { e.AddTime("startTime", r.startTime) e.AddTime("endTime", r.endTime) - e.AddTime("firstTime", r.firstTime) + e.AddTime("firstTime", time.Unix(0, r.firstTime)) e.AddDuration("firstTimeAdjustment", r.firstTimeAdjustment) - e.AddTime("highestTime", r.highestTime) + e.AddTime("highestTime", time.Unix(0, r.highestTime)) e.AddUint64("bytes", r.bytes) e.AddUint64("headerBytes", r.headerBytes) @@ -907,7 +909,7 @@ func (r *rtpStatsBase) toProto( return p } -func (r *rtpStatsBase) updateJitter(ets uint64, packetTime time.Time) float64 { +func (r *rtpStatsBase) updateJitter(ets uint64, packetTime int64) float64 { // Do not update jitter on multiple packets of same frame. // All packets of a frame have the same time stamp. // NOTE: This does not protect against using more than one packet of the same frame @@ -916,8 +918,8 @@ func (r *rtpStatsBase) updateJitter(ets uint64, packetTime time.Time) float64 { // In this case, p2f1 (packet 2, frame 1) will still be used in jitter calculation // although it is the second packet of a frame because of out-of-order receival. if r.lastJitterExtTimestamp != ets { - timeSinceFirst := packetTime.Sub(r.firstTime) - packetTimeRTP := uint64(timeSinceFirst.Nanoseconds() * int64(r.params.ClockRate) / 1e9) + timeSinceFirst := packetTime - r.firstTime + packetTimeRTP := uint64(timeSinceFirst * int64(r.params.ClockRate) / 1e9) transit := packetTimeRTP - ets if r.lastTransit != 0 { @@ -963,21 +965,22 @@ func (r *rtpStatsBase) getAndResetSnapshot(snapshotID uint32, extStartSN uint64, } func (r *rtpStatsBase) getDrift(extStartTS, extHighestTS uint64) (packetDrift *livekit.RTPDrift, ntpReportDrift *livekit.RTPDrift, rebasedReportDrift *livekit.RTPDrift) { - if !r.firstTime.IsZero() { - elapsed := r.highestTime.Sub(r.firstTime) + if r.firstTime != 0 { + elapsed := r.highestTime - r.firstTime rtpClockTicks := extHighestTS - extStartTS - driftSamples := int64(rtpClockTicks - uint64(elapsed.Nanoseconds()*int64(r.params.ClockRate)/1e9)) - if elapsed.Seconds() > 0.0 { + driftSamples := int64(rtpClockTicks - uint64(elapsed*int64(r.params.ClockRate)/1e9)) + if elapsed > 0 { + elapsedSeconds := time.Duration(elapsed).Seconds() packetDrift = &livekit.RTPDrift{ - StartTime: timestamppb.New(r.firstTime), - EndTime: timestamppb.New(r.highestTime), - Duration: elapsed.Seconds(), + StartTime: timestamppb.New(time.Unix(0, r.firstTime)), + EndTime: timestamppb.New(time.Unix(0, r.highestTime)), + Duration: elapsedSeconds, StartTimestamp: extStartTS, EndTimestamp: extHighestTS, RtpClockTicks: rtpClockTicks, DriftSamples: driftSamples, DriftMs: (float64(driftSamples) * 1000) / float64(r.params.ClockRate), - ClockRate: float64(rtpClockTicks) / elapsed.Seconds(), + ClockRate: float64(rtpClockTicks) / elapsedSeconds, } } } diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index 1fe7c269e..9bf8f6c22 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -101,7 +101,8 @@ type RTPStatsReceiver struct { sequenceNumber *utils.WrapAround[uint16, uint64] - timestamp *utils.WrapAround[uint32, uint64] + tsRolloverThreshold int64 + timestamp *utils.WrapAround[uint32, uint64] history *protoutils.Bitmap[uint64] @@ -115,14 +116,16 @@ type RTPStatsReceiver struct { outOfOrderSenderReportCount int largeJumpCount int largeJumpNegativeCount int + timeReversedCount int } func NewRTPStatsReceiver(params RTPStatsParams) *RTPStatsReceiver { return &RTPStatsReceiver{ - rtpStatsBase: newRTPStatsBase(params), - sequenceNumber: utils.NewWrapAround[uint16, uint64](utils.WrapAroundParams{IsRestartAllowed: false}), - timestamp: utils.NewWrapAround[uint32, uint64](utils.WrapAroundParams{IsRestartAllowed: false}), - history: protoutils.NewBitmap[uint64](cHistorySize), + rtpStatsBase: newRTPStatsBase(params), + sequenceNumber: utils.NewWrapAround[uint16, uint64](utils.WrapAroundParams{IsRestartAllowed: false}), + tsRolloverThreshold: (1 << 31) * 1e9 / int64(params.ClockRate), + timestamp: utils.NewWrapAround[uint32, uint64](utils.WrapAroundParams{IsRestartAllowed: false}), + history: protoutils.NewBitmap[uint64](cHistorySize), } } @@ -133,8 +136,18 @@ func (r *RTPStatsReceiver) NewSnapshotId() uint32 { return r.newSnapshotID(r.sequenceNumber.GetExtendedHighest()) } +func (r *RTPStatsReceiver) getTSRolloverCount(diffNano int64) int { + if diffNano < r.tsRolloverThreshold { + // time not more than rollover threshold + return 0 + } + + excess := int((diffNano - r.tsRolloverThreshold) * int64(r.params.ClockRate) / 1e9) + return excess/(1<<32) + 1 +} + func (r *RTPStatsReceiver) Update( - packetTime time.Time, + packetTime int64, sequenceNumber uint16, timestamp uint32, marker bool, @@ -151,6 +164,7 @@ func (r *RTPStatsReceiver) Update( } var resSN utils.WrapAroundUpdateResult[uint64] + var gapSN int64 var resTS utils.WrapAroundUpdateResult[uint64] if !r.initialized { if payloadSize == 0 { @@ -184,20 +198,23 @@ func (r *RTPStatsReceiver) Update( flowState.IsNotHandled = true return } - resTS = r.timestamp.Update(timestamp) + + gapSN = int64(resSN.ExtendedVal - resSN.PreExtendedHighest) + if gapSN <= 0 { + resTS = r.timestamp.Update(timestamp) + } else { + resTS = r.timestamp.Rollover(timestamp, r.getTSRolloverCount(packetTime-r.highestTime)) + } } pktSize := uint64(hdrSize + payloadSize + paddingSize) - gapSN := int64(resSN.ExtendedVal - resSN.PreExtendedHighest) getLoggingFields := func() []interface{} { return []interface{}{ - "prevSN", resSN.PreExtendedHighest, - "currSN", resSN.ExtendedVal, + "resSN", resSN, "gapSN", gapSN, - "prevTS", resTS.PreExtendedHighest, - "currTS", resTS.ExtendedVal, + "resTS", resTS, "gapTS", int64(resTS.ExtendedVal - resTS.PreExtendedHighest), - "packetTime", packetTime.String(), + "packetTime", time.Unix(0, packetTime).String(), "sequenceNumber", sequenceNumber, "timestamp", timestamp, "marker", marker, @@ -238,7 +255,7 @@ func (r *RTPStatsReceiver) Update( } } } else { // in-order - if gapSN >= cSequenceNumberLargeJumpThreshold || resTS.ExtendedVal < resTS.PreExtendedHighest { + if gapSN >= cSequenceNumberLargeJumpThreshold { r.largeJumpCount++ if (r.largeJumpCount-1)%100 == 0 { r.logger.Warnw( @@ -248,6 +265,16 @@ func (r *RTPStatsReceiver) Update( } } + if resTS.ExtendedVal < resTS.PreExtendedHighest { + r.timeReversedCount++ + if (r.timeReversedCount-1)%100 == 0 { + r.logger.Warnw( + "time reversed", nil, + append(getLoggingFields(), "count", r.timeReversedCount)..., + ) + } + } + // update gap histogram r.updateGapHistogram(int(gapSN)) @@ -478,7 +505,9 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) srDataCopy.AtAdjusted = ntpTime.Add(r.propagationDelay) r.srNewest = &srDataCopy - r.maybeAdjustFirstPacketTime(r.srNewest, 0, r.timestamp.GetExtendedStart()) + if err, loggingFields := r.maybeAdjustFirstPacketTime(r.srNewest, 0, r.timestamp.GetExtendedStart()); err != nil { + r.logger.Infow(err.Error(), append(loggingFields, "rtpStats", lockedRTPStatsReceiverLogEncoder{r})...) + } return true } diff --git a/pkg/sfu/buffer/rtpstats_receiver_test.go b/pkg/sfu/buffer/rtpstats_receiver_test.go index 1816b6689..075578dd8 100644 --- a/pkg/sfu/buffer/rtpstats_receiver_test.go +++ b/pkg/sfu/buffer/rtpstats_receiver_test.go @@ -61,7 +61,7 @@ func Test_RTPStatsReceiver(t *testing.T) { for i := 0; i < packetsPerFrame; i++ { packet := getPacket(sequenceNumber, timestamp, packetSize) r.Update( - time.Now(), + time.Now().UnixNano(), packet.Header.SequenceNumber, packet.Header.Timestamp, packet.Header.Marker, @@ -97,7 +97,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { timestamp := uint32(rand.Float64() * float64(1<<32)) packet := getPacket(sequenceNumber, timestamp, 1000) flowState := r.Update( - time.Now(), + time.Now().UnixNano(), packet.Header.SequenceNumber, packet.Header.Timestamp, packet.Header.Marker, @@ -117,7 +117,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { timestamp += 3000 packet = getPacket(sequenceNumber, timestamp, 1000) flowState = r.Update( - time.Now(), + time.Now().UnixNano(), packet.Header.SequenceNumber, packet.Header.Timestamp, packet.Header.Marker, @@ -134,7 +134,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { // out-of-order, would cause a restart which is disallowed packet = getPacket(sequenceNumber-10, timestamp-30000, 1000) flowState = r.Update( - time.Now(), + time.Now().UnixNano(), packet.Header.SequenceNumber, packet.Header.Timestamp, packet.Header.Marker, @@ -154,7 +154,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { // duplicate of the above out-of-order packet, but would not be handled as it causes a restart packet = getPacket(sequenceNumber-10, timestamp-30000, 1000) flowState = r.Update( - time.Now(), + time.Now().UnixNano(), packet.Header.SequenceNumber, packet.Header.Timestamp, packet.Header.Marker, @@ -176,7 +176,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { timestamp += 30000 packet = getPacket(sequenceNumber, timestamp, 1000) flowState = r.Update( - time.Now(), + time.Now().UnixNano(), packet.Header.SequenceNumber, packet.Header.Timestamp, packet.Header.Marker, @@ -192,7 +192,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { // out-of-order should decrement number of lost packets packet = getPacket(sequenceNumber-6, timestamp-45000, 1000) flowState = r.Update( - time.Now(), + time.Now().UnixNano(), packet.Header.SequenceNumber, packet.Header.Timestamp, packet.Header.Marker, @@ -215,7 +215,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { timestamp += 6000 packet = getPacket(sequenceNumber, timestamp, 1000) flowState = r.Update( - time.Now(), + time.Now().UnixNano(), packet.Header.SequenceNumber, packet.Header.Timestamp, packet.Header.Marker, @@ -234,7 +234,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { timestamp -= 3000 packet = getPacket(sequenceNumber, timestamp, 999) flowState = r.Update( - time.Now(), + time.Now().UnixNano(), packet.Header.SequenceNumber, packet.Header.Timestamp, packet.Header.Marker, @@ -251,7 +251,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { sequenceNumber += 2 packet = getPacket(sequenceNumber, timestamp, 0) flowState = r.Update( - time.Now(), + time.Now().UnixNano(), packet.Header.SequenceNumber, packet.Header.Timestamp, packet.Header.Marker, diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index ffb420229..5576a3dd0 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -166,6 +166,7 @@ type RTPStatsSender struct { metadataCacheOverflowCount int largeJumpNegativeCount int largeJumpCount int + timeReversedCount int } func NewRTPStatsSender(params RTPStatsParams) *RTPStatsSender { @@ -233,7 +234,7 @@ func (r *RTPStatsSender) NewSenderSnapshotId() uint32 { } func (r *RTPStatsSender) Update( - packetTime time.Time, + packetTime int64, extSequenceNumber uint64, extTimestamp uint64, marker bool, @@ -358,16 +359,26 @@ func (r *RTPStatsSender) Update( } } } else { // in-order - if gapSN >= cSequenceNumberLargeJumpThreshold || extTimestamp < r.extHighestTS { + if gapSN >= cSequenceNumberLargeJumpThreshold { r.largeJumpCount++ if (r.largeJumpCount-1)%100 == 0 { r.logger.Warnw( - "large sequence number gap OR time reversed", nil, + "large sequence number gap", nil, append(getLoggingFields(), "count", r.largeJumpCount)..., ) } } + if extTimestamp < r.extHighestTS { + r.timeReversedCount++ + if (r.timeReversedCount-1)%100 == 0 { + r.logger.Warnw( + "time reversed", nil, + append(getLoggingFields(), "count", r.timeReversedCount)..., + ) + } + } + // update gap histogram r.updateGapHistogram(int(gapSN)) @@ -575,7 +586,9 @@ func (r *RTPStatsSender) MaybeAdjustFirstPacketTime(publisherSRData *RTCPSenderR return } - r.maybeAdjustFirstPacketTime(publisherSRData, tsOffset, r.extStartTS) + if err, loggingFields := r.maybeAdjustFirstPacketTime(publisherSRData, tsOffset, r.extStartTS); err != nil { + r.logger.Infow(err.Error(), append(loggingFields, "rtpStats", lockedRTPStatsSenderLogEncoder{r})...) + } } func (r *RTPStatsSender) GetExpectedRTPTimestamp(at time.Time) (expectedTSExt uint64, err error) { @@ -587,7 +600,7 @@ func (r *RTPStatsSender) GetExpectedRTPTimestamp(at time.Time) (expectedTSExt ui return } - timeDiff := at.Sub(r.firstTime) + timeDiff := at.Sub(time.Unix(0, r.firstTime)) expectedRTPDiff := timeDiff.Nanoseconds() * int64(r.params.ClockRate) / 1e9 expectedTSExt = r.extStartTS + uint64(expectedRTPDiff) return @@ -630,8 +643,8 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *RTCPS "tsOffset", tsOffset, "timeNow", time.Now().String(), "now", now.String(), - "timeSinceHighest", now.Sub(r.highestTime).String(), - "timeSinceFirst", now.Sub(r.firstTime).String(), + "timeSinceHighest", now.Sub(time.Unix(0, r.highestTime)).String(), + "timeSinceFirst", now.Sub(time.Unix(0, r.firstTime)).String(), "timeSincePublisherSRAdjusted", timeSincePublisherSRAdjusted.String(), "timeSincePublisherSR", time.Since(publisherSRData.At).String(), "nowRTPExt", nowRTPExt, diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 553d7abb0..c52958ec6 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -905,7 +905,7 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa &hdr, len(payload), &sendPacketMetadata{ - packetTime: time.Now(), + packetTime: time.Now().UnixNano(), extSequenceNumber: snts[i].extSequenceNumber, extTimestamp: snts[i].extTimestamp, isPadding: true, @@ -1438,7 +1438,7 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan } d.sendingPacket(&hdr, len(payload), &sendPacketMetadata{ - packetTime: time.Now(), + packetTime: time.Now().UnixNano(), extSequenceNumber: snts[i].extSequenceNumber, extTimestamp: snts[i].extTimestamp, }) @@ -1801,7 +1801,7 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) { len(payload), &sendPacketMetadata{ layer: int32(epm.layer), - packetTime: time.Now(), + packetTime: time.Now().UnixNano(), extSequenceNumber: epm.extSequenceNumber, extTimestamp: epm.extTimestamp, isRTX: true, @@ -2011,7 +2011,7 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() { &hdr, len(payload), &sendPacketMetadata{ - packetTime: time.Now(), + packetTime: time.Now().UnixNano(), extSequenceNumber: snts[i].extSequenceNumber, extTimestamp: snts[i].extTimestamp, // although this is using empty frames, mark as padding as these are used to trigger Pion OnTrack only @@ -2053,7 +2053,7 @@ func (d *DownTrack) handleRTCPSenderReportData(publisherSRData *buffer.RTCPSende type sendPacketMetadata struct { layer int32 - packetTime time.Time + packetTime int64 extSequenceNumber uint64 extTimestamp uint64 isKeyFrame bool diff --git a/pkg/sfu/forwardstats.go b/pkg/sfu/forwardstats.go index 0924e199a..5de0df3f4 100644 --- a/pkg/sfu/forwardstats.go +++ b/pkg/sfu/forwardstats.go @@ -12,10 +12,10 @@ import ( ) type ForwardStats struct { - lock sync.Mutex - lastLeftMs atomic.Int64 - latency *utils.LatencyAggregate - closeCh chan struct{} + lock sync.Mutex + lastLeftNano atomic.Int64 + latency *utils.LatencyAggregate + closeCh chan struct{} } func NewForwardStats(latencyUpdateInterval, reportInterval, latencyWindowLength time.Duration) *ForwardStats { @@ -28,22 +28,21 @@ func NewForwardStats(latencyUpdateInterval, reportInterval, latencyWindowLength return s } -func (s *ForwardStats) Update(arrival, left time.Time) { - transit := left.Sub(arrival) +func (s *ForwardStats) Update(arrival, left int64) { + transit := left - arrival // ignore if transit is too large or negative, this could happen if system time is adjusted - if transit < 0 || transit > 5*time.Second { + if transit < 0 || time.Duration(transit) > 5*time.Second { return } - leftMs := left.UnixMilli() - lastMs := s.lastLeftMs.Load() - if leftMs < lastMs || !s.lastLeftMs.CompareAndSwap(lastMs, leftMs) { + lastLeftNano := s.lastLeftNano.Load() + if left < lastLeftNano || !s.lastLeftNano.CompareAndSwap(lastLeftNano, left) { return } s.lock.Lock() defer s.lock.Unlock() - s.latency.Update(time.Duration(arrival.UnixNano()), float64(transit)) + s.latency.Update(time.Duration(arrival), float64(transit)) } func (s *ForwardStats) GetStats() (latency, jitter time.Duration) { diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index b0ccc9431..97214577b 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -756,7 +756,7 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) { } if writeCount > 0 && w.forwardStats != nil { - w.forwardStats.Update(pkt.Arrival, time.Now()) + w.forwardStats.Update(pkt.Arrival, time.Now().UnixNano()) } if spatialTracker != nil { diff --git a/pkg/sfu/redprimaryreceiver.go b/pkg/sfu/redprimaryreceiver.go index 70cc1945e..aea9bdf6b 100644 --- a/pkg/sfu/redprimaryreceiver.go +++ b/pkg/sfu/redprimaryreceiver.go @@ -73,7 +73,7 @@ func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int3 for i, sendPkt := range pkts { pPkt := *pkt if i != len(pkts)-1 { - // patch extended sequence number and time stmap for all but the last packet, + // patch extended sequence number and time stamp for all but the last packet, // last packet is the primary payload pPkt.ExtSequenceNumber -= uint64(pkts[len(pkts)-1].SequenceNumber - pkts[i].SequenceNumber) pPkt.ExtTimestamp -= uint64(pkts[len(pkts)-1].Timestamp - pkts[i].Timestamp) diff --git a/pkg/sfu/sequencer.go b/pkg/sfu/sequencer.go index c876b63c9..22750a037 100644 --- a/pkg/sfu/sequencer.go +++ b/pkg/sfu/sequencer.go @@ -104,7 +104,7 @@ type sequencer struct { func newSequencer(size int, maybeSparse bool, logger logger.Logger) *sequencer { s := &sequencer{ size: size, - startTime: time.Now().UnixMilli(), + startTime: time.Now().UnixNano(), meta: make([]packetMeta, size), rtt: defaultRtt, logger: logger, @@ -128,7 +128,7 @@ func (s *sequencer) setRTT(rtt uint32) { } func (s *sequencer) push( - packetTime time.Time, + packetTime int64, extIncomingSN, extModifiedSN uint64, extModifiedTS uint64, marker bool, @@ -296,7 +296,7 @@ func (s *sequencer) getExtPacketMetas(seqNo []uint16) []extPacketMeta { snOffset := uint64(0) var err error extPacketMetas := make([]extPacketMeta, 0, len(seqNo)) - refTime := s.getRefTime(time.Now()) + refTime := s.getRefTime(time.Now().UnixNano()) highestSN := uint16(s.extHighestSN) highestTS := uint32(s.extHighestTS) for _, sn := range seqNo { @@ -357,8 +357,8 @@ func (s *sequencer) getExtPacketMetas(seqNo []uint16) []extPacketMeta { return extPacketMetas } -func (s *sequencer) getRefTime(at time.Time) uint32 { - return uint32(at.UnixMilli() - s.startTime) +func (s *sequencer) getRefTime(at int64) uint32 { + return uint32((at - s.startTime) / 1e6) } func (s *sequencer) updateSNOffset() { diff --git a/pkg/sfu/sequencer_test.go b/pkg/sfu/sequencer_test.go index f8eabe045..773c54309 100644 --- a/pkg/sfu/sequencer_test.go +++ b/pkg/sfu/sequencer_test.go @@ -29,11 +29,11 @@ func Test_sequencer(t *testing.T) { off := uint16(15) for i := uint64(1); i < 518; i++ { - seq.push(time.Now(), i, i+uint64(off), 123, true, 2, nil, 0, nil, nil) + seq.push(time.Now().UnixNano(), i, i+uint64(off), 123, true, 2, nil, 0, nil, nil) } // send the last two out-of-order - seq.push(time.Now(), 519, 519+uint64(off), 123, false, 2, nil, 0, nil, nil) - seq.push(time.Now(), 518, 518+uint64(off), 123, true, 2, nil, 0, nil, nil) + seq.push(time.Now().UnixNano(), 519, 519+uint64(off), 123, false, 2, nil, 0, nil, nil) + seq.push(time.Now().UnixNano(), 518, 518+uint64(off), 123, true, 2, nil, 0, nil, nil) req := []uint16{57, 58, 62, 63, 513, 514, 515, 516, 517} res := seq.getExtPacketMetas(req) @@ -63,14 +63,14 @@ func Test_sequencer(t *testing.T) { require.Equal(t, val.extTimestamp, uint64(123)) } - seq.push(time.Now(), 521, 521+uint64(off), 123, true, 1, nil, 0, nil, nil) + seq.push(time.Now().UnixNano(), 521, 521+uint64(off), 123, true, 1, nil, 0, nil, nil) m := seq.getExtPacketMetas([]uint16{521 + off}) require.Equal(t, 0, len(m)) time.Sleep((ignoreRetransmission + 10) * time.Millisecond) m = seq.getExtPacketMetas([]uint16{521 + off}) require.Equal(t, 1, len(m)) - seq.push(time.Now(), 505, 505+uint64(off), 123, false, 1, nil, 0, nil, nil) + seq.push(time.Now().UnixNano(), 505, 505+uint64(off), 123, false, 1, nil, 0, nil, nil) m = seq.getExtPacketMetas([]uint16{505 + off}) require.Equal(t, 0, len(m)) time.Sleep((ignoreRetransmission + 10) * time.Millisecond) @@ -157,7 +157,7 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) { } else { if i.seqNo%5 == 0 { n.push( - time.Now(), + time.Now().UnixNano(), i.seqNo, i.seqNo+tt.fields.offset, 123, @@ -171,7 +171,7 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) { } else { if i.seqNo%2 == 0 { n.push( - time.Now(), + time.Now().UnixNano(), i.seqNo, i.seqNo+tt.fields.offset, 123, @@ -184,7 +184,7 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) { ) } else { n.push( - time.Now(), + time.Now().UnixNano(), i.seqNo, i.seqNo+tt.fields.offset, 123, @@ -311,7 +311,7 @@ func Test_sequencer_getNACKSeqNo_no_exclusion(t *testing.T) { } else { if i.seqNo%2 == 0 { n.push( - time.Now(), + time.Now().UnixNano(), i.seqNo, i.seqNo+tt.fields.offset, 123, @@ -324,7 +324,7 @@ func Test_sequencer_getNACKSeqNo_no_exclusion(t *testing.T) { ) } else { n.push( - time.Now(), + time.Now().UnixNano(), i.seqNo, i.seqNo+tt.fields.offset, 123, diff --git a/pkg/sfu/testutils/data.go b/pkg/sfu/testutils/data.go index 1bebc420b..04903a26e 100644 --- a/pkg/sfu/testutils/data.go +++ b/pkg/sfu/testutils/data.go @@ -66,7 +66,7 @@ func GetTestExtPacket(params *TestExtPacketParams) (*buffer.ExtPacket, error) { VideoLayer: params.VideoLayer, ExtSequenceNumber: uint64(params.SNCycles<<16) + uint64(params.SequenceNumber), ExtTimestamp: uint64(params.TSCycles<<32) + uint64(params.Timestamp), - Arrival: params.ArrivalTime, + Arrival: params.ArrivalTime.UnixNano(), Packet: &packet, KeyFrame: params.IsKeyFrame, RawPacket: raw, diff --git a/pkg/sfu/utils/wraparound.go b/pkg/sfu/utils/wraparound.go index e37d5ca27..a1a0441ca 100644 --- a/pkg/sfu/utils/wraparound.go +++ b/pkg/sfu/utils/wraparound.go @@ -16,6 +16,8 @@ package utils import ( "unsafe" + + "go.uber.org/zap/zapcore" ) type number interface { @@ -65,6 +67,19 @@ type WrapAroundUpdateResult[ET extendedNumber] struct { ExtendedVal ET } +func (w *WrapAroundUpdateResult[ET]) MarshalLogObject(e zapcore.ObjectEncoder) error { + if w == nil { + return nil + } + + e.AddBool("IsUnhandled", w.IsUnhandled) + e.AddBool("IsRestart", w.IsRestart) + e.AddUint64("PreExtendedStart", uint64(w.PreExtendedStart)) + e.AddUint64("PreExtendedHighest", uint64(w.PreExtendedHighest)) + e.AddUint64("ExtendedVal", uint64(w.ExtendedVal)) + return nil +} + func (w *WrapAround[T, ET]) Update(val T) (result WrapAroundUpdateResult[ET]) { if !w.initialized { result.PreExtendedHighest = ET(val) - 1 @@ -96,6 +111,21 @@ func (w *WrapAround[T, ET]) Update(val T) (result WrapAroundUpdateResult[ET]) { return } +func (w *WrapAround[T, ET]) Rollover(val T, numCycles int) (result WrapAroundUpdateResult[ET]) { + if !w.initialized || numCycles == 0 { + return w.Update(val) + } + + result.PreExtendedHighest = w.extendedHighest + + w.cycles += ET(numCycles) * w.fullRange + w.highest = val + + w.updateExtendedHighest() + result.ExtendedVal = w.extendedHighest + return +} + func (w *WrapAround[T, ET]) RollbackRestart(ev ET) { if w.isWrapBack(w.start, T(ev)) { w.cycles -= w.fullRange diff --git a/pkg/sfu/utils/wraparound_test.go b/pkg/sfu/utils/wraparound_test.go index 018e227fc..76cb8fdb3 100644 --- a/pkg/sfu/utils/wraparound_test.go +++ b/pkg/sfu/utils/wraparound_test.go @@ -448,6 +448,91 @@ func TestWrapAroundUint16WrapAroundRestartDuplicate(t *testing.T) { require.Equal(t, uint64(65568), w.GetExtendedHighest()) } +func TestWrapAroundUint16Rollover(t *testing.T) { + w := NewWrapAround[uint16, uint32](WrapAroundParams{IsRestartAllowed: false}) + testCases := []struct { + name string + input uint16 + numCycles int + updated WrapAroundUpdateResult[uint32] + start uint16 + extendedStart uint32 + highest uint16 + extendedHighest uint32 + }{ + // initialize - should initialize irrespective of numCycles + { + name: "initialize", + input: 10, + numCycles: 10, + updated: WrapAroundUpdateResult[uint32]{ + IsRestart: false, + PreExtendedStart: 0, + PreExtendedHighest: 9, + ExtendedVal: 10, + }, + start: 10, + extendedStart: 10, + highest: 10, + extendedHighest: 10, + }, + // zero cycles - should just do an update + { + name: "zero", + input: 8, + updated: WrapAroundUpdateResult[uint32]{ + IsUnhandled: true, + // the following fields are not valid when `IsUnhandled = true`, but code fills it in + // and they are filled in here for testing purposes + PreExtendedHighest: 10, + ExtendedVal: 8, + }, + start: 10, + extendedStart: 10, + highest: 10, + extendedHighest: 10, + }, + // one cycle + { + name: "one cycle", + input: (1 << 16) - 6, + numCycles: 1, + updated: WrapAroundUpdateResult[uint32]{ + PreExtendedHighest: 10, + ExtendedVal: (1 << 16) - 6 + (1 << 16), + }, + start: 10, + extendedStart: 10, + highest: (1 << 16) - 6, + extendedHighest: (1 << 16) - 6 + (1 << 16), + }, + // two cycles + { + name: "two cycles", + input: (1 << 16) - 7, + numCycles: 2, + updated: WrapAroundUpdateResult[uint32]{ + PreExtendedHighest: (1 << 16) - 6 + (1 << 16), + ExtendedVal: (1 << 16) - 7 + 3*(1<<16), + }, + start: 10, + extendedStart: 10, + highest: (1 << 16) - 7, + extendedHighest: (1 << 16) - 7 + 3*(1<<16), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.updated, w.Rollover(tc.input, tc.numCycles)) + require.Equal(t, tc.start, w.GetStart()) + require.Equal(t, tc.extendedStart, w.GetExtendedStart()) + require.Equal(t, tc.highest, w.GetHighest()) + require.Equal(t, tc.extendedHighest, w.GetExtendedHighest()) + }) + } +} + func TestWrapAroundUint32(t *testing.T) { w := NewWrapAround[uint32, uint64](WrapAroundParams{IsRestartAllowed: true}) testCases := []struct {