diff --git a/pkg/sfu/audio/audiolevel.go b/pkg/sfu/audio/audiolevel.go index f908788d8..bd4ea4a72 100644 --- a/pkg/sfu/audio/audiolevel.go +++ b/pkg/sfu/audio/audiolevel.go @@ -17,7 +17,6 @@ package audio import ( "math" "sync" - "time" ) const ( @@ -46,7 +45,7 @@ type AudioLevel struct { loudestObservedLevel uint8 activeDuration uint32 // ms observedDuration uint32 // ms - lastObservedAt time.Time + lastObservedAt int64 } func NewAudioLevel(params AudioLevelParams) *AudioLevel { @@ -67,7 +66,7 @@ func NewAudioLevel(params AudioLevelParams) *AudioLevel { } // Observes a new frame -func (l *AudioLevel) Observe(level uint8, durationMs uint32, arrivalTime time.Time) { +func (l *AudioLevel) Observe(level uint8, durationMs uint32, arrivalTime int64) { l.lock.Lock() defer l.lock.Unlock() @@ -101,8 +100,8 @@ func (l *AudioLevel) Observe(level uint8, durationMs uint32, arrivalTime time.Ti } } -// returns current soothed audio level -func (l *AudioLevel) GetLevel(now time.Time) (float64, bool) { +// returns current smoothed audio level +func (l *AudioLevel) GetLevel(now int64) (float64, bool) { l.lock.Lock() defer l.lock.Unlock() @@ -111,8 +110,8 @@ func (l *AudioLevel) GetLevel(now time.Time) (float64, bool) { return l.smoothedLevel, l.smoothedLevel >= l.activeThreshold } -func (l *AudioLevel) resetIfStaleLocked(arrivalTime time.Time) { - if arrivalTime.Sub(l.lastObservedAt).Milliseconds() < int64(2*l.params.ObserveDuration) { +func (l *AudioLevel) resetIfStaleLocked(arrivalTime int64) { + if (arrivalTime-l.lastObservedAt)/1e6 < int64(2*l.params.ObserveDuration) { return } diff --git a/pkg/sfu/audio/audiolevel_test.go b/pkg/sfu/audio/audiolevel_test.go index 84d5a34dd..a5ecba189 100644 --- a/pkg/sfu/audio/audiolevel_test.go +++ b/pkg/sfu/audio/audiolevel_test.go @@ -34,13 +34,13 @@ func TestAudioLevel(t *testing.T) { clock := time.Now() a := createAudioLevel(defaultActiveLevel, defaultPercentile, defaultObserveDuration) - _, noisy := a.GetLevel(clock) + _, noisy := a.GetLevel(clock.UnixNano()) require.False(t, noisy) observeSamples(a, 28, 5, clock) clock = clock.Add(5 * 20 * time.Millisecond) - _, noisy = a.GetLevel(clock) + _, noisy = a.GetLevel(clock.UnixNano()) require.False(t, noisy) }) @@ -51,7 +51,7 @@ func TestAudioLevel(t *testing.T) { observeSamples(a, 35, 100, clock) clock = clock.Add(100 * 20 * time.Millisecond) - _, noisy := a.GetLevel(clock) + _, noisy := a.GetLevel(clock.UnixNano()) require.False(t, noisy) }) @@ -66,7 +66,7 @@ func TestAudioLevel(t *testing.T) { observeSamples(a, 35, 1, clock) clock = clock.Add(20 * time.Millisecond) - _, noisy := a.GetLevel(clock) + _, noisy := a.GetLevel(clock.UnixNano()) require.False(t, noisy) }) @@ -81,7 +81,7 @@ func TestAudioLevel(t *testing.T) { observeSamples(a, 29, 8, clock) clock = clock.Add(8 * 20 * time.Millisecond) - level, noisy := a.GetLevel(clock) + level, noisy := a.GetLevel(clock.UnixNano()) require.True(t, noisy) require.Greater(t, level, ConvertAudioLevel(float64(defaultActiveLevel))) require.Less(t, level, ConvertAudioLevel(float64(25))) @@ -93,14 +93,14 @@ func TestAudioLevel(t *testing.T) { observeSamples(a, 25, 100, clock) clock = clock.Add(100 * 20 * time.Millisecond) - level, noisy := a.GetLevel(clock) + level, noisy := a.GetLevel(clock.UnixNano()) require.True(t, noisy) require.Greater(t, level, ConvertAudioLevel(float64(defaultActiveLevel))) require.Less(t, level, ConvertAudioLevel(float64(20))) // let enough time pass to make the samples stale clock = clock.Add(1500 * time.Millisecond) - level, noisy = a.GetLevel(clock) + level, noisy = a.GetLevel(clock.UnixNano()) require.Equal(t, float64(0.0), level) require.False(t, noisy) }) @@ -116,6 +116,6 @@ func createAudioLevel(activeLevel uint8, minPercentile uint8, observeDuration ui func observeSamples(a *AudioLevel, level uint8, count int, baseTime time.Time) { for i := 0; i < count; i++ { - a.Observe(level, 20, baseTime.Add(+time.Duration(i*20)*time.Millisecond)) + a.Observe(level, 20, baseTime.Add(+time.Duration(i*20)*time.Millisecond).UnixNano()) } } diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 81ab7b5fe..1170118c6 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -45,20 +45,20 @@ import ( ) const ( - ReportDelta = time.Second + ReportDelta = 1e9 InitPacketBufferSizeVideo = 300 InitPacketBufferSizeAudio = 70 ) type pendingPacket struct { - arrivalTime time.Time + arrivalTime int64 packet []byte } type ExtPacket struct { VideoLayer - Arrival time.Time + Arrival int64 ExtSequenceNumber uint64 ExtTimestamp uint64 Packet *rtp.Packet @@ -84,7 +84,7 @@ type Buffer struct { closeOnce sync.Once mediaSSRC uint32 clockRate uint32 - lastReport time.Time + lastReport int64 twccExtID uint8 audioLevelExtID uint8 bound bool @@ -212,7 +212,7 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili b.ppsSnapshotId = b.rtpStats.NewSnapshotId() b.clockRate = codec.ClockRate - b.lastReport = time.Now() + b.lastReport = time.Now().UnixNano() b.mime = strings.ToLower(codec.MimeType) for _, codecParameter := range params.Codecs { if strings.EqualFold(codecParameter.MimeType, codec.MimeType) { @@ -340,10 +340,10 @@ func (b *Buffer) Write(pkt []byte) (n int, err error) { } } - now := time.Now() + now := time.Now().UnixNano() if b.twcc != nil && b.twccExtID != 0 && !b.closed.Load() { if ext := rtpPacket.GetExtension(b.twccExtID); ext != nil { - b.twcc.Push(rtpPacket.SSRC, binary.BigEndian.Uint16(ext[0:2]), now.UnixNano(), rtpPacket.Marker) + b.twcc.Push(rtpPacket.SSRC, binary.BigEndian.Uint16(ext[0:2]), now, rtpPacket.Marker) } } @@ -373,7 +373,7 @@ func (b *Buffer) Write(pkt []byte) (n int, err error) { } b.payloadType = rtpPacket.PayloadType - b.calc(pkt, &rtpPacket, time.Now(), false) + b.calc(pkt, &rtpPacket, now, false) b.Unlock() b.readCond.Broadcast() return @@ -398,7 +398,7 @@ func (b *Buffer) SetPrimaryBufferForRTX(primaryBuffer *Buffer) { } } -func (b *Buffer) writeRTX(rtxPkt *rtp.Packet, arrivalTime time.Time) (n int, err error) { +func (b *Buffer) writeRTX(rtxPkt *rtp.Packet, arrivalTime int64) (n int, err error) { b.Lock() defer b.Unlock() if !b.bound { @@ -541,7 +541,7 @@ func (b *Buffer) SetRTT(rtt uint32) { } } -func (b *Buffer) calc(rawPkt []byte, rtpPacket *rtp.Packet, arrivalTime time.Time, isRTX bool) { +func (b *Buffer) calc(rawPkt []byte, rtpPacket *rtp.Packet, arrivalTime int64, isRTX bool) { defer func() { b.doNACKs() @@ -740,7 +740,7 @@ func (b *Buffer) doFpsCalc(ep *ExtPacket) { } } -func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) RTPFlowState { +func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime int64) RTPFlowState { flowState := b.rtpStats.Update( arrivalTime, p.Header.SequenceNumber, @@ -764,7 +764,7 @@ func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) RTPFlow return flowState } -func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime time.Time, isRTX bool) { +func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime int64, isRTX bool) { if b.audioLevelExtID != 0 && !isRTX { if !b.latestTSForAudioLevelInitialized { b.latestTSForAudioLevelInitialized = true @@ -786,7 +786,7 @@ func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime time.Time, i } } -func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime time.Time, flowState RTPFlowState) *ExtPacket { +func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime int64, flowState RTPFlowState) *ExtPacket { ep := &ExtPacket{ Arrival: arrivalTime, ExtSequenceNumber: flowState.ExtSequenceNumber, @@ -887,8 +887,8 @@ func (b *Buffer) doNACKs() { } } -func (b *Buffer) doReports(arrivalTime time.Time) { - if time.Since(b.lastReport) < ReportDelta { +func (b *Buffer) doReports(arrivalTime int64) { + if arrivalTime-b.lastReport < ReportDelta { return } @@ -1090,7 +1090,7 @@ func (b *Buffer) GetAudioLevel() (float64, bool) { return 0, false } - return b.audioLevel.GetLevel(time.Now()) + return b.audioLevel.GetLevel(time.Now().UnixNano()) } func (b *Buffer) OnFpsChanged(f func()) { diff --git a/pkg/sfu/buffer/rtpstats_base.go b/pkg/sfu/buffer/rtpstats_base.go index 07c906e70..553c0cbd8 100644 --- a/pkg/sfu/buffer/rtpstats_base.go +++ b/pkg/sfu/buffer/rtpstats_base.go @@ -35,7 +35,7 @@ const ( cFirstSnapshotID = 1 cFirstPacketTimeAdjustWindow = 2 * time.Minute - cFirstPacketTimeAdjustThreshold = 15 * time.Second + cFirstPacketTimeAdjustThreshold = 15 * 1e9 cPassthroughNTPTimestamp = true @@ -175,9 +175,9 @@ type rtpStatsBase struct { startTime time.Time endTime time.Time - firstTime time.Time + firstTime int64 firstTimeAdjustment time.Duration - highestTime time.Time + highestTime int64 lastTransit uint64 lastJitterExtTimestamp uint64 @@ -506,7 +506,7 @@ func (r *rtpStatsBase) GetRtt() uint32 { return r.rtt } -func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *RTCPSenderReportData, tsOffset uint64, extStartTS uint64) { +func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *RTCPSenderReportData, tsOffset uint64, extStartTS uint64) (err error, loggingFields []interface{}) { if time.Since(r.startTime) > cFirstPacketTimeAdjustWindow { return } @@ -526,17 +526,17 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *RTCPSenderReportData, } samplesDuration := time.Duration(float64(samplesDiff) / float64(r.params.ClockRate) * float64(time.Second)) - timeSinceFirst := time.Since(r.firstTime) - now := r.firstTime.Add(timeSinceFirst) - firstTime := now.Add(-samplesDuration) + timeSinceFirst := time.Since(time.Unix(0, r.firstTime)) + now := r.firstTime + timeSinceFirst.Nanoseconds() + firstTime := now - samplesDuration.Nanoseconds() getFields := func() []interface{} { return []interface{}{ "startTime", r.startTime.String(), - "nowTime", now.String(), - "before", r.firstTime.String(), - "after", firstTime.String(), - "adjustment", r.firstTime.Sub(firstTime).String(), + "nowTime", time.Unix(0, now).String(), + "before", time.Unix(0, r.firstTime).String(), + "after", time.Unix(0, firstTime).String(), + "adjustment", time.Duration(r.firstTime - firstTime).String(), "extNowTS", extNowTS, "extStartTS", extStartTS, "srData", srData, @@ -548,15 +548,17 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *RTCPSenderReportData, } } - if firstTime.Before(r.firstTime) { - if r.firstTime.Sub(firstTime) > cFirstPacketTimeAdjustThreshold { - r.logger.Infow("adjusting first packet time, too big, ignoring", getFields()...) + if firstTime < r.firstTime { + if r.firstTime-firstTime > cFirstPacketTimeAdjustThreshold { + err = errors.New("adjusting first packet time, too big, ignoring") + loggingFields = getFields() } else { r.logger.Debugw("adjusting first packet time", getFields()...) - r.firstTimeAdjustment += r.firstTime.Sub(firstTime) + r.firstTimeAdjustment += time.Duration(r.firstTime - firstTime) r.firstTime = firstTime } } + return } func (r *rtpStatsBase) getTotalPacketsPrimary(extStartSN, extHighestSN uint64) uint64 { @@ -659,9 +661,9 @@ func (r *rtpStatsBase) MarshalLogObject(e zapcore.ObjectEncoder) error { e.AddTime("startTime", r.startTime) e.AddTime("endTime", r.endTime) - e.AddTime("firstTime", r.firstTime) + e.AddTime("firstTime", time.Unix(0, r.firstTime)) e.AddDuration("firstTimeAdjustment", r.firstTimeAdjustment) - e.AddTime("highestTime", r.highestTime) + e.AddTime("highestTime", time.Unix(0, r.highestTime)) e.AddUint64("bytes", r.bytes) e.AddUint64("headerBytes", r.headerBytes) @@ -907,7 +909,7 @@ func (r *rtpStatsBase) toProto( return p } -func (r *rtpStatsBase) updateJitter(ets uint64, packetTime time.Time) float64 { +func (r *rtpStatsBase) updateJitter(ets uint64, packetTime int64) float64 { // Do not update jitter on multiple packets of same frame. // All packets of a frame have the same time stamp. // NOTE: This does not protect against using more than one packet of the same frame @@ -916,8 +918,8 @@ func (r *rtpStatsBase) updateJitter(ets uint64, packetTime time.Time) float64 { // In this case, p2f1 (packet 2, frame 1) will still be used in jitter calculation // although it is the second packet of a frame because of out-of-order receival. if r.lastJitterExtTimestamp != ets { - timeSinceFirst := packetTime.Sub(r.firstTime) - packetTimeRTP := uint64(timeSinceFirst.Nanoseconds() * int64(r.params.ClockRate) / 1e9) + timeSinceFirst := packetTime - r.firstTime + packetTimeRTP := uint64(timeSinceFirst * int64(r.params.ClockRate) / 1e9) transit := packetTimeRTP - ets if r.lastTransit != 0 { @@ -963,21 +965,22 @@ func (r *rtpStatsBase) getAndResetSnapshot(snapshotID uint32, extStartSN uint64, } func (r *rtpStatsBase) getDrift(extStartTS, extHighestTS uint64) (packetDrift *livekit.RTPDrift, ntpReportDrift *livekit.RTPDrift, rebasedReportDrift *livekit.RTPDrift) { - if !r.firstTime.IsZero() { - elapsed := r.highestTime.Sub(r.firstTime) + if r.firstTime != 0 { + elapsed := r.highestTime - r.firstTime rtpClockTicks := extHighestTS - extStartTS - driftSamples := int64(rtpClockTicks - uint64(elapsed.Nanoseconds()*int64(r.params.ClockRate)/1e9)) - if elapsed.Seconds() > 0.0 { + driftSamples := int64(rtpClockTicks - uint64(elapsed*int64(r.params.ClockRate)/1e9)) + if elapsed > 0 { + elapsedSeconds := time.Duration(elapsed).Seconds() packetDrift = &livekit.RTPDrift{ - StartTime: timestamppb.New(r.firstTime), - EndTime: timestamppb.New(r.highestTime), - Duration: elapsed.Seconds(), + StartTime: timestamppb.New(time.Unix(0, r.firstTime)), + EndTime: timestamppb.New(time.Unix(0, r.highestTime)), + Duration: elapsedSeconds, StartTimestamp: extStartTS, EndTimestamp: extHighestTS, RtpClockTicks: rtpClockTicks, DriftSamples: driftSamples, DriftMs: (float64(driftSamples) * 1000) / float64(r.params.ClockRate), - ClockRate: float64(rtpClockTicks) / elapsed.Seconds(), + ClockRate: float64(rtpClockTicks) / elapsedSeconds, } } } diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index 1fe7c269e..9bf8f6c22 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -101,7 +101,8 @@ type RTPStatsReceiver struct { sequenceNumber *utils.WrapAround[uint16, uint64] - timestamp *utils.WrapAround[uint32, uint64] + tsRolloverThreshold int64 + timestamp *utils.WrapAround[uint32, uint64] history *protoutils.Bitmap[uint64] @@ -115,14 +116,16 @@ type RTPStatsReceiver struct { outOfOrderSenderReportCount int largeJumpCount int largeJumpNegativeCount int + timeReversedCount int } func NewRTPStatsReceiver(params RTPStatsParams) *RTPStatsReceiver { return &RTPStatsReceiver{ - rtpStatsBase: newRTPStatsBase(params), - sequenceNumber: utils.NewWrapAround[uint16, uint64](utils.WrapAroundParams{IsRestartAllowed: false}), - timestamp: utils.NewWrapAround[uint32, uint64](utils.WrapAroundParams{IsRestartAllowed: false}), - history: protoutils.NewBitmap[uint64](cHistorySize), + rtpStatsBase: newRTPStatsBase(params), + sequenceNumber: utils.NewWrapAround[uint16, uint64](utils.WrapAroundParams{IsRestartAllowed: false}), + tsRolloverThreshold: (1 << 31) * 1e9 / int64(params.ClockRate), + timestamp: utils.NewWrapAround[uint32, uint64](utils.WrapAroundParams{IsRestartAllowed: false}), + history: protoutils.NewBitmap[uint64](cHistorySize), } } @@ -133,8 +136,18 @@ func (r *RTPStatsReceiver) NewSnapshotId() uint32 { return r.newSnapshotID(r.sequenceNumber.GetExtendedHighest()) } +func (r *RTPStatsReceiver) getTSRolloverCount(diffNano int64) int { + if diffNano < r.tsRolloverThreshold { + // time not more than rollover threshold + return 0 + } + + excess := int((diffNano - r.tsRolloverThreshold) * int64(r.params.ClockRate) / 1e9) + return excess/(1<<32) + 1 +} + func (r *RTPStatsReceiver) Update( - packetTime time.Time, + packetTime int64, sequenceNumber uint16, timestamp uint32, marker bool, @@ -151,6 +164,7 @@ func (r *RTPStatsReceiver) Update( } var resSN utils.WrapAroundUpdateResult[uint64] + var gapSN int64 var resTS utils.WrapAroundUpdateResult[uint64] if !r.initialized { if payloadSize == 0 { @@ -184,20 +198,23 @@ func (r *RTPStatsReceiver) Update( flowState.IsNotHandled = true return } - resTS = r.timestamp.Update(timestamp) + + gapSN = int64(resSN.ExtendedVal - resSN.PreExtendedHighest) + if gapSN <= 0 { + resTS = r.timestamp.Update(timestamp) + } else { + resTS = r.timestamp.Rollover(timestamp, r.getTSRolloverCount(packetTime-r.highestTime)) + } } pktSize := uint64(hdrSize + payloadSize + paddingSize) - gapSN := int64(resSN.ExtendedVal - resSN.PreExtendedHighest) getLoggingFields := func() []interface{} { return []interface{}{ - "prevSN", resSN.PreExtendedHighest, - "currSN", resSN.ExtendedVal, + "resSN", resSN, "gapSN", gapSN, - "prevTS", resTS.PreExtendedHighest, - "currTS", resTS.ExtendedVal, + "resTS", resTS, "gapTS", int64(resTS.ExtendedVal - resTS.PreExtendedHighest), - "packetTime", packetTime.String(), + "packetTime", time.Unix(0, packetTime).String(), "sequenceNumber", sequenceNumber, "timestamp", timestamp, "marker", marker, @@ -238,7 +255,7 @@ func (r *RTPStatsReceiver) Update( } } } else { // in-order - if gapSN >= cSequenceNumberLargeJumpThreshold || resTS.ExtendedVal < resTS.PreExtendedHighest { + if gapSN >= cSequenceNumberLargeJumpThreshold { r.largeJumpCount++ if (r.largeJumpCount-1)%100 == 0 { r.logger.Warnw( @@ -248,6 +265,16 @@ func (r *RTPStatsReceiver) Update( } } + if resTS.ExtendedVal < resTS.PreExtendedHighest { + r.timeReversedCount++ + if (r.timeReversedCount-1)%100 == 0 { + r.logger.Warnw( + "time reversed", nil, + append(getLoggingFields(), "count", r.timeReversedCount)..., + ) + } + } + // update gap histogram r.updateGapHistogram(int(gapSN)) @@ -478,7 +505,9 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) srDataCopy.AtAdjusted = ntpTime.Add(r.propagationDelay) r.srNewest = &srDataCopy - r.maybeAdjustFirstPacketTime(r.srNewest, 0, r.timestamp.GetExtendedStart()) + if err, loggingFields := r.maybeAdjustFirstPacketTime(r.srNewest, 0, r.timestamp.GetExtendedStart()); err != nil { + r.logger.Infow(err.Error(), append(loggingFields, "rtpStats", lockedRTPStatsReceiverLogEncoder{r})...) + } return true } diff --git a/pkg/sfu/buffer/rtpstats_receiver_test.go b/pkg/sfu/buffer/rtpstats_receiver_test.go index 1816b6689..075578dd8 100644 --- a/pkg/sfu/buffer/rtpstats_receiver_test.go +++ b/pkg/sfu/buffer/rtpstats_receiver_test.go @@ -61,7 +61,7 @@ func Test_RTPStatsReceiver(t *testing.T) { for i := 0; i < packetsPerFrame; i++ { packet := getPacket(sequenceNumber, timestamp, packetSize) r.Update( - time.Now(), + time.Now().UnixNano(), packet.Header.SequenceNumber, packet.Header.Timestamp, packet.Header.Marker, @@ -97,7 +97,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { timestamp := uint32(rand.Float64() * float64(1<<32)) packet := getPacket(sequenceNumber, timestamp, 1000) flowState := r.Update( - time.Now(), + time.Now().UnixNano(), packet.Header.SequenceNumber, packet.Header.Timestamp, packet.Header.Marker, @@ -117,7 +117,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { timestamp += 3000 packet = getPacket(sequenceNumber, timestamp, 1000) flowState = r.Update( - time.Now(), + time.Now().UnixNano(), packet.Header.SequenceNumber, packet.Header.Timestamp, packet.Header.Marker, @@ -134,7 +134,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { // out-of-order, would cause a restart which is disallowed packet = getPacket(sequenceNumber-10, timestamp-30000, 1000) flowState = r.Update( - time.Now(), + time.Now().UnixNano(), packet.Header.SequenceNumber, packet.Header.Timestamp, packet.Header.Marker, @@ -154,7 +154,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { // duplicate of the above out-of-order packet, but would not be handled as it causes a restart packet = getPacket(sequenceNumber-10, timestamp-30000, 1000) flowState = r.Update( - time.Now(), + time.Now().UnixNano(), packet.Header.SequenceNumber, packet.Header.Timestamp, packet.Header.Marker, @@ -176,7 +176,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { timestamp += 30000 packet = getPacket(sequenceNumber, timestamp, 1000) flowState = r.Update( - time.Now(), + time.Now().UnixNano(), packet.Header.SequenceNumber, packet.Header.Timestamp, packet.Header.Marker, @@ -192,7 +192,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { // out-of-order should decrement number of lost packets packet = getPacket(sequenceNumber-6, timestamp-45000, 1000) flowState = r.Update( - time.Now(), + time.Now().UnixNano(), packet.Header.SequenceNumber, packet.Header.Timestamp, packet.Header.Marker, @@ -215,7 +215,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { timestamp += 6000 packet = getPacket(sequenceNumber, timestamp, 1000) flowState = r.Update( - time.Now(), + time.Now().UnixNano(), packet.Header.SequenceNumber, packet.Header.Timestamp, packet.Header.Marker, @@ -234,7 +234,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { timestamp -= 3000 packet = getPacket(sequenceNumber, timestamp, 999) flowState = r.Update( - time.Now(), + time.Now().UnixNano(), packet.Header.SequenceNumber, packet.Header.Timestamp, packet.Header.Marker, @@ -251,7 +251,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { sequenceNumber += 2 packet = getPacket(sequenceNumber, timestamp, 0) flowState = r.Update( - time.Now(), + time.Now().UnixNano(), packet.Header.SequenceNumber, packet.Header.Timestamp, packet.Header.Marker, diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index ffb420229..5576a3dd0 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -166,6 +166,7 @@ type RTPStatsSender struct { metadataCacheOverflowCount int largeJumpNegativeCount int largeJumpCount int + timeReversedCount int } func NewRTPStatsSender(params RTPStatsParams) *RTPStatsSender { @@ -233,7 +234,7 @@ func (r *RTPStatsSender) NewSenderSnapshotId() uint32 { } func (r *RTPStatsSender) Update( - packetTime time.Time, + packetTime int64, extSequenceNumber uint64, extTimestamp uint64, marker bool, @@ -358,16 +359,26 @@ func (r *RTPStatsSender) Update( } } } else { // in-order - if gapSN >= cSequenceNumberLargeJumpThreshold || extTimestamp < r.extHighestTS { + if gapSN >= cSequenceNumberLargeJumpThreshold { r.largeJumpCount++ if (r.largeJumpCount-1)%100 == 0 { r.logger.Warnw( - "large sequence number gap OR time reversed", nil, + "large sequence number gap", nil, append(getLoggingFields(), "count", r.largeJumpCount)..., ) } } + if extTimestamp < r.extHighestTS { + r.timeReversedCount++ + if (r.timeReversedCount-1)%100 == 0 { + r.logger.Warnw( + "time reversed", nil, + append(getLoggingFields(), "count", r.timeReversedCount)..., + ) + } + } + // update gap histogram r.updateGapHistogram(int(gapSN)) @@ -575,7 +586,9 @@ func (r *RTPStatsSender) MaybeAdjustFirstPacketTime(publisherSRData *RTCPSenderR return } - r.maybeAdjustFirstPacketTime(publisherSRData, tsOffset, r.extStartTS) + if err, loggingFields := r.maybeAdjustFirstPacketTime(publisherSRData, tsOffset, r.extStartTS); err != nil { + r.logger.Infow(err.Error(), append(loggingFields, "rtpStats", lockedRTPStatsSenderLogEncoder{r})...) + } } func (r *RTPStatsSender) GetExpectedRTPTimestamp(at time.Time) (expectedTSExt uint64, err error) { @@ -587,7 +600,7 @@ func (r *RTPStatsSender) GetExpectedRTPTimestamp(at time.Time) (expectedTSExt ui return } - timeDiff := at.Sub(r.firstTime) + timeDiff := at.Sub(time.Unix(0, r.firstTime)) expectedRTPDiff := timeDiff.Nanoseconds() * int64(r.params.ClockRate) / 1e9 expectedTSExt = r.extStartTS + uint64(expectedRTPDiff) return @@ -630,8 +643,8 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *RTCPS "tsOffset", tsOffset, "timeNow", time.Now().String(), "now", now.String(), - "timeSinceHighest", now.Sub(r.highestTime).String(), - "timeSinceFirst", now.Sub(r.firstTime).String(), + "timeSinceHighest", now.Sub(time.Unix(0, r.highestTime)).String(), + "timeSinceFirst", now.Sub(time.Unix(0, r.firstTime)).String(), "timeSincePublisherSRAdjusted", timeSincePublisherSRAdjusted.String(), "timeSincePublisherSR", time.Since(publisherSRData.At).String(), "nowRTPExt", nowRTPExt, diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 553d7abb0..c52958ec6 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -905,7 +905,7 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa &hdr, len(payload), &sendPacketMetadata{ - packetTime: time.Now(), + packetTime: time.Now().UnixNano(), extSequenceNumber: snts[i].extSequenceNumber, extTimestamp: snts[i].extTimestamp, isPadding: true, @@ -1438,7 +1438,7 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan } d.sendingPacket(&hdr, len(payload), &sendPacketMetadata{ - packetTime: time.Now(), + packetTime: time.Now().UnixNano(), extSequenceNumber: snts[i].extSequenceNumber, extTimestamp: snts[i].extTimestamp, }) @@ -1801,7 +1801,7 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) { len(payload), &sendPacketMetadata{ layer: int32(epm.layer), - packetTime: time.Now(), + packetTime: time.Now().UnixNano(), extSequenceNumber: epm.extSequenceNumber, extTimestamp: epm.extTimestamp, isRTX: true, @@ -2011,7 +2011,7 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() { &hdr, len(payload), &sendPacketMetadata{ - packetTime: time.Now(), + packetTime: time.Now().UnixNano(), extSequenceNumber: snts[i].extSequenceNumber, extTimestamp: snts[i].extTimestamp, // although this is using empty frames, mark as padding as these are used to trigger Pion OnTrack only @@ -2053,7 +2053,7 @@ func (d *DownTrack) handleRTCPSenderReportData(publisherSRData *buffer.RTCPSende type sendPacketMetadata struct { layer int32 - packetTime time.Time + packetTime int64 extSequenceNumber uint64 extTimestamp uint64 isKeyFrame bool diff --git a/pkg/sfu/forwardstats.go b/pkg/sfu/forwardstats.go index 0924e199a..5de0df3f4 100644 --- a/pkg/sfu/forwardstats.go +++ b/pkg/sfu/forwardstats.go @@ -12,10 +12,10 @@ import ( ) type ForwardStats struct { - lock sync.Mutex - lastLeftMs atomic.Int64 - latency *utils.LatencyAggregate - closeCh chan struct{} + lock sync.Mutex + lastLeftNano atomic.Int64 + latency *utils.LatencyAggregate + closeCh chan struct{} } func NewForwardStats(latencyUpdateInterval, reportInterval, latencyWindowLength time.Duration) *ForwardStats { @@ -28,22 +28,21 @@ func NewForwardStats(latencyUpdateInterval, reportInterval, latencyWindowLength return s } -func (s *ForwardStats) Update(arrival, left time.Time) { - transit := left.Sub(arrival) +func (s *ForwardStats) Update(arrival, left int64) { + transit := left - arrival // ignore if transit is too large or negative, this could happen if system time is adjusted - if transit < 0 || transit > 5*time.Second { + if transit < 0 || time.Duration(transit) > 5*time.Second { return } - leftMs := left.UnixMilli() - lastMs := s.lastLeftMs.Load() - if leftMs < lastMs || !s.lastLeftMs.CompareAndSwap(lastMs, leftMs) { + lastLeftNano := s.lastLeftNano.Load() + if left < lastLeftNano || !s.lastLeftNano.CompareAndSwap(lastLeftNano, left) { return } s.lock.Lock() defer s.lock.Unlock() - s.latency.Update(time.Duration(arrival.UnixNano()), float64(transit)) + s.latency.Update(time.Duration(arrival), float64(transit)) } func (s *ForwardStats) GetStats() (latency, jitter time.Duration) { diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index b0ccc9431..97214577b 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -756,7 +756,7 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) { } if writeCount > 0 && w.forwardStats != nil { - w.forwardStats.Update(pkt.Arrival, time.Now()) + w.forwardStats.Update(pkt.Arrival, time.Now().UnixNano()) } if spatialTracker != nil { diff --git a/pkg/sfu/redprimaryreceiver.go b/pkg/sfu/redprimaryreceiver.go index 70cc1945e..aea9bdf6b 100644 --- a/pkg/sfu/redprimaryreceiver.go +++ b/pkg/sfu/redprimaryreceiver.go @@ -73,7 +73,7 @@ func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int3 for i, sendPkt := range pkts { pPkt := *pkt if i != len(pkts)-1 { - // patch extended sequence number and time stmap for all but the last packet, + // patch extended sequence number and time stamp for all but the last packet, // last packet is the primary payload pPkt.ExtSequenceNumber -= uint64(pkts[len(pkts)-1].SequenceNumber - pkts[i].SequenceNumber) pPkt.ExtTimestamp -= uint64(pkts[len(pkts)-1].Timestamp - pkts[i].Timestamp) diff --git a/pkg/sfu/sequencer.go b/pkg/sfu/sequencer.go index c876b63c9..22750a037 100644 --- a/pkg/sfu/sequencer.go +++ b/pkg/sfu/sequencer.go @@ -104,7 +104,7 @@ type sequencer struct { func newSequencer(size int, maybeSparse bool, logger logger.Logger) *sequencer { s := &sequencer{ size: size, - startTime: time.Now().UnixMilli(), + startTime: time.Now().UnixNano(), meta: make([]packetMeta, size), rtt: defaultRtt, logger: logger, @@ -128,7 +128,7 @@ func (s *sequencer) setRTT(rtt uint32) { } func (s *sequencer) push( - packetTime time.Time, + packetTime int64, extIncomingSN, extModifiedSN uint64, extModifiedTS uint64, marker bool, @@ -296,7 +296,7 @@ func (s *sequencer) getExtPacketMetas(seqNo []uint16) []extPacketMeta { snOffset := uint64(0) var err error extPacketMetas := make([]extPacketMeta, 0, len(seqNo)) - refTime := s.getRefTime(time.Now()) + refTime := s.getRefTime(time.Now().UnixNano()) highestSN := uint16(s.extHighestSN) highestTS := uint32(s.extHighestTS) for _, sn := range seqNo { @@ -357,8 +357,8 @@ func (s *sequencer) getExtPacketMetas(seqNo []uint16) []extPacketMeta { return extPacketMetas } -func (s *sequencer) getRefTime(at time.Time) uint32 { - return uint32(at.UnixMilli() - s.startTime) +func (s *sequencer) getRefTime(at int64) uint32 { + return uint32((at - s.startTime) / 1e6) } func (s *sequencer) updateSNOffset() { diff --git a/pkg/sfu/sequencer_test.go b/pkg/sfu/sequencer_test.go index f8eabe045..773c54309 100644 --- a/pkg/sfu/sequencer_test.go +++ b/pkg/sfu/sequencer_test.go @@ -29,11 +29,11 @@ func Test_sequencer(t *testing.T) { off := uint16(15) for i := uint64(1); i < 518; i++ { - seq.push(time.Now(), i, i+uint64(off), 123, true, 2, nil, 0, nil, nil) + seq.push(time.Now().UnixNano(), i, i+uint64(off), 123, true, 2, nil, 0, nil, nil) } // send the last two out-of-order - seq.push(time.Now(), 519, 519+uint64(off), 123, false, 2, nil, 0, nil, nil) - seq.push(time.Now(), 518, 518+uint64(off), 123, true, 2, nil, 0, nil, nil) + seq.push(time.Now().UnixNano(), 519, 519+uint64(off), 123, false, 2, nil, 0, nil, nil) + seq.push(time.Now().UnixNano(), 518, 518+uint64(off), 123, true, 2, nil, 0, nil, nil) req := []uint16{57, 58, 62, 63, 513, 514, 515, 516, 517} res := seq.getExtPacketMetas(req) @@ -63,14 +63,14 @@ func Test_sequencer(t *testing.T) { require.Equal(t, val.extTimestamp, uint64(123)) } - seq.push(time.Now(), 521, 521+uint64(off), 123, true, 1, nil, 0, nil, nil) + seq.push(time.Now().UnixNano(), 521, 521+uint64(off), 123, true, 1, nil, 0, nil, nil) m := seq.getExtPacketMetas([]uint16{521 + off}) require.Equal(t, 0, len(m)) time.Sleep((ignoreRetransmission + 10) * time.Millisecond) m = seq.getExtPacketMetas([]uint16{521 + off}) require.Equal(t, 1, len(m)) - seq.push(time.Now(), 505, 505+uint64(off), 123, false, 1, nil, 0, nil, nil) + seq.push(time.Now().UnixNano(), 505, 505+uint64(off), 123, false, 1, nil, 0, nil, nil) m = seq.getExtPacketMetas([]uint16{505 + off}) require.Equal(t, 0, len(m)) time.Sleep((ignoreRetransmission + 10) * time.Millisecond) @@ -157,7 +157,7 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) { } else { if i.seqNo%5 == 0 { n.push( - time.Now(), + time.Now().UnixNano(), i.seqNo, i.seqNo+tt.fields.offset, 123, @@ -171,7 +171,7 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) { } else { if i.seqNo%2 == 0 { n.push( - time.Now(), + time.Now().UnixNano(), i.seqNo, i.seqNo+tt.fields.offset, 123, @@ -184,7 +184,7 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) { ) } else { n.push( - time.Now(), + time.Now().UnixNano(), i.seqNo, i.seqNo+tt.fields.offset, 123, @@ -311,7 +311,7 @@ func Test_sequencer_getNACKSeqNo_no_exclusion(t *testing.T) { } else { if i.seqNo%2 == 0 { n.push( - time.Now(), + time.Now().UnixNano(), i.seqNo, i.seqNo+tt.fields.offset, 123, @@ -324,7 +324,7 @@ func Test_sequencer_getNACKSeqNo_no_exclusion(t *testing.T) { ) } else { n.push( - time.Now(), + time.Now().UnixNano(), i.seqNo, i.seqNo+tt.fields.offset, 123, diff --git a/pkg/sfu/testutils/data.go b/pkg/sfu/testutils/data.go index 1bebc420b..04903a26e 100644 --- a/pkg/sfu/testutils/data.go +++ b/pkg/sfu/testutils/data.go @@ -66,7 +66,7 @@ func GetTestExtPacket(params *TestExtPacketParams) (*buffer.ExtPacket, error) { VideoLayer: params.VideoLayer, ExtSequenceNumber: uint64(params.SNCycles<<16) + uint64(params.SequenceNumber), ExtTimestamp: uint64(params.TSCycles<<32) + uint64(params.Timestamp), - Arrival: params.ArrivalTime, + Arrival: params.ArrivalTime.UnixNano(), Packet: &packet, KeyFrame: params.IsKeyFrame, RawPacket: raw, diff --git a/pkg/sfu/utils/wraparound.go b/pkg/sfu/utils/wraparound.go index e37d5ca27..a1a0441ca 100644 --- a/pkg/sfu/utils/wraparound.go +++ b/pkg/sfu/utils/wraparound.go @@ -16,6 +16,8 @@ package utils import ( "unsafe" + + "go.uber.org/zap/zapcore" ) type number interface { @@ -65,6 +67,19 @@ type WrapAroundUpdateResult[ET extendedNumber] struct { ExtendedVal ET } +func (w *WrapAroundUpdateResult[ET]) MarshalLogObject(e zapcore.ObjectEncoder) error { + if w == nil { + return nil + } + + e.AddBool("IsUnhandled", w.IsUnhandled) + e.AddBool("IsRestart", w.IsRestart) + e.AddUint64("PreExtendedStart", uint64(w.PreExtendedStart)) + e.AddUint64("PreExtendedHighest", uint64(w.PreExtendedHighest)) + e.AddUint64("ExtendedVal", uint64(w.ExtendedVal)) + return nil +} + func (w *WrapAround[T, ET]) Update(val T) (result WrapAroundUpdateResult[ET]) { if !w.initialized { result.PreExtendedHighest = ET(val) - 1 @@ -96,6 +111,21 @@ func (w *WrapAround[T, ET]) Update(val T) (result WrapAroundUpdateResult[ET]) { return } +func (w *WrapAround[T, ET]) Rollover(val T, numCycles int) (result WrapAroundUpdateResult[ET]) { + if !w.initialized || numCycles == 0 { + return w.Update(val) + } + + result.PreExtendedHighest = w.extendedHighest + + w.cycles += ET(numCycles) * w.fullRange + w.highest = val + + w.updateExtendedHighest() + result.ExtendedVal = w.extendedHighest + return +} + func (w *WrapAround[T, ET]) RollbackRestart(ev ET) { if w.isWrapBack(w.start, T(ev)) { w.cycles -= w.fullRange diff --git a/pkg/sfu/utils/wraparound_test.go b/pkg/sfu/utils/wraparound_test.go index 018e227fc..76cb8fdb3 100644 --- a/pkg/sfu/utils/wraparound_test.go +++ b/pkg/sfu/utils/wraparound_test.go @@ -448,6 +448,91 @@ func TestWrapAroundUint16WrapAroundRestartDuplicate(t *testing.T) { require.Equal(t, uint64(65568), w.GetExtendedHighest()) } +func TestWrapAroundUint16Rollover(t *testing.T) { + w := NewWrapAround[uint16, uint32](WrapAroundParams{IsRestartAllowed: false}) + testCases := []struct { + name string + input uint16 + numCycles int + updated WrapAroundUpdateResult[uint32] + start uint16 + extendedStart uint32 + highest uint16 + extendedHighest uint32 + }{ + // initialize - should initialize irrespective of numCycles + { + name: "initialize", + input: 10, + numCycles: 10, + updated: WrapAroundUpdateResult[uint32]{ + IsRestart: false, + PreExtendedStart: 0, + PreExtendedHighest: 9, + ExtendedVal: 10, + }, + start: 10, + extendedStart: 10, + highest: 10, + extendedHighest: 10, + }, + // zero cycles - should just do an update + { + name: "zero", + input: 8, + updated: WrapAroundUpdateResult[uint32]{ + IsUnhandled: true, + // the following fields are not valid when `IsUnhandled = true`, but code fills it in + // and they are filled in here for testing purposes + PreExtendedHighest: 10, + ExtendedVal: 8, + }, + start: 10, + extendedStart: 10, + highest: 10, + extendedHighest: 10, + }, + // one cycle + { + name: "one cycle", + input: (1 << 16) - 6, + numCycles: 1, + updated: WrapAroundUpdateResult[uint32]{ + PreExtendedHighest: 10, + ExtendedVal: (1 << 16) - 6 + (1 << 16), + }, + start: 10, + extendedStart: 10, + highest: (1 << 16) - 6, + extendedHighest: (1 << 16) - 6 + (1 << 16), + }, + // two cycles + { + name: "two cycles", + input: (1 << 16) - 7, + numCycles: 2, + updated: WrapAroundUpdateResult[uint32]{ + PreExtendedHighest: (1 << 16) - 6 + (1 << 16), + ExtendedVal: (1 << 16) - 7 + 3*(1<<16), + }, + start: 10, + extendedStart: 10, + highest: (1 << 16) - 7, + extendedHighest: (1 << 16) - 7 + 3*(1<<16), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.updated, w.Rollover(tc.input, tc.numCycles)) + require.Equal(t, tc.start, w.GetStart()) + require.Equal(t, tc.extendedStart, w.GetExtendedStart()) + require.Equal(t, tc.highest, w.GetHighest()) + require.Equal(t, tc.extendedHighest, w.GetExtendedHighest()) + }) + } +} + func TestWrapAroundUint32(t *testing.T) { w := NewWrapAround[uint32, uint64](WrapAroundParams{IsRestartAllowed: true}) testCases := []struct {