From 53d300ba7155669e22a6015eb199ba3e4742fab2 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sun, 12 Jan 2025 00:56:46 +0530 Subject: [PATCH] Use nano time for easier (and hopefully) faster checks/calculations. (#3323) --- go.mod | 6 +- go.sum | 12 ++-- pkg/sfu/downtrack.go | 6 +- pkg/sfu/rtpstats/rtpstats_base.go | 56 ++++++++-------- pkg/sfu/rtpstats/rtpstats_base_lite.go | 77 +++++++++++----------- pkg/sfu/rtpstats/rtpstats_receiver.go | 15 +++-- pkg/sfu/rtpstats/rtpstats_receiver_lite.go | 7 +- pkg/sfu/rtpstats/rtpstats_sender.go | 69 +++++++++---------- pkg/sfu/rtpstats/rtpstats_sender_lite.go | 7 +- pkg/sfu/utils/wraparound.go | 2 +- 10 files changed, 130 insertions(+), 127 deletions(-) diff --git a/go.mod b/go.mod index 727b87d0a..3dbd992d5 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564 - github.com/livekit/protocol v1.30.1-0.20250106062425-83e359fc95bf + github.com/livekit/protocol v1.30.1-0.20250111191311-7b5c5cd3dd53 github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8 github.com/mackerelio/go-osstat v0.2.5 github.com/magefile/mage v1.15.0 @@ -55,7 +55,7 @@ require ( go.uber.org/zap v1.27.0 golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8 golang.org/x/sync v0.10.0 - google.golang.org/protobuf v1.36.1 + google.golang.org/protobuf v1.36.2 gopkg.in/yaml.v3 v3.0.1 ) @@ -136,7 +136,7 @@ require ( golang.org/x/text v0.21.0 // indirect golang.org/x/tools v0.29.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20241015192408-796eee8c2d53 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20241219192143-6b3ec007d9bb // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422 // indirect google.golang.org/grpc v1.69.2 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 47dc1d506..9369c3cec 100644 --- a/go.sum +++ b/go.sum @@ -167,8 +167,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564 h1:GX7KF/V9ExmcfT/2Bdia8aROjkxrgx7WpyH7w9MB4J4= github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564/go.mod h1:36s+wwmU3O40IAhE+MjBWP3W71QRiEE9SfooSBvtBqY= -github.com/livekit/protocol v1.30.1-0.20250106062425-83e359fc95bf h1:6sLvfeHO1qux5LJEHs+qDam0/xLE0693HzD8rXirYnY= -github.com/livekit/protocol v1.30.1-0.20250106062425-83e359fc95bf/go.mod h1:08wT2rI6GecTCwh9n8OA28Gb7ZQNtIR+hX/LccP1TaY= +github.com/livekit/protocol v1.30.1-0.20250111191311-7b5c5cd3dd53 h1:f2yuRKAa4XWo1ILgyZWyYPA8VzOITWbk3B/md51goW8= +github.com/livekit/protocol v1.30.1-0.20250111191311-7b5c5cd3dd53/go.mod h1:08wT2rI6GecTCwh9n8OA28Gb7ZQNtIR+hX/LccP1TaY= github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8 h1:Ibh0LoFl5NW5a1KFJEE0eLxxz7dqqKmYTj/BfCb0PbY= github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= @@ -474,12 +474,12 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto/googleapis/api v0.0.0-20241015192408-796eee8c2d53 h1:fVoAXEKA4+yufmbdVYv+SE73+cPZbbbe8paLsHfkK+U= google.golang.org/genproto/googleapis/api v0.0.0-20241015192408-796eee8c2d53/go.mod h1:riSXTwQ4+nqmPGtobMFyW5FqVAmIs0St6VPp4Ug7CE4= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241219192143-6b3ec007d9bb h1:3oy2tynMOP1QbTC0MsNNAV+Se8M2Bd0A5+x1QHyw+pI= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241219192143-6b3ec007d9bb/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422 h1:3UsHvIr4Wc2aW4brOaSCmcxh9ksica6fHEr8P1XhkYw= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422/go.mod h1:3ENsm/5D1mzDyhpzeRi1NR784I0BcofWBoSc5QqqMK4= google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU= google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= -google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= -google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.2 h1:R8FeyR1/eLmkutZOM5CWghmo5itiG9z0ktFlTVLuTmU= +google.golang.org/protobuf v1.36.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 1c5c3c682..6f7a7fac2 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -1008,7 +1008,7 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa // Hold sending padding packets till first RTCP-RR is received for this RTP stream. // That is definitive proof that the remote side knows about this RTP stream. - if d.rtpStats.LastReceiverReportTime().IsZero() && !paddingOnMute { + if d.rtpStats.LastReceiverReportTime() == 0 && !paddingOnMute { return 0 } @@ -2093,7 +2093,7 @@ func (d *DownTrack) WriteProbePackets(bytesToSend int, usePadding bool) int { if !d.writable.Load() || !d.rtpStats.IsActive() || (d.absSendTimeExtID == 0 && d.transportWideExtID == 0) || - d.rtpStats.LastReceiverReportTime().IsZero() || + d.rtpStats.LastReceiverReportTime() == 0 || d.sequencer == nil { return 0 } @@ -2261,7 +2261,7 @@ func (d *DownTrack) GetDeltaStatsSender() map[uint32]*buffer.StreamStatsWithLaye } func (d *DownTrack) GetPrimaryStreamLastReceiverReportTime() time.Time { - return d.rtpStats.LastReceiverReportTime() + return time.Unix(0, d.rtpStats.LastReceiverReportTime()) } func (d *DownTrack) GetPrimaryStreamPacketsSent() uint64 { diff --git a/pkg/sfu/rtpstats/rtpstats_base.go b/pkg/sfu/rtpstats/rtpstats_base.go index 40ec921c3..d42a40df8 100644 --- a/pkg/sfu/rtpstats/rtpstats_base.go +++ b/pkg/sfu/rtpstats/rtpstats_base.go @@ -24,6 +24,7 @@ import ( "github.com/livekit/mediatransportutil" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/utils" + "github.com/livekit/protocol/utils/mono" ) const ( @@ -298,7 +299,7 @@ func (r *rtpStatsBase) newSnapshotID(extStartSN uint64) uint32 { } if r.initialized { - r.snapshots[id-cFirstSnapshotID] = initSnapshot(time.Now(), extStartSN) + r.snapshots[id-cFirstSnapshotID] = initSnapshot(mono.UnixNano(), extStartSN) } return id } @@ -307,7 +308,7 @@ func (r *rtpStatsBase) UpdateFir(firCount uint32) { r.lock.Lock() defer r.lock.Unlock() - if !r.endTime.IsZero() { + if r.endTime != 0 { return } @@ -318,7 +319,7 @@ func (r *rtpStatsBase) UpdateFirTime() { r.lock.Lock() defer r.lock.Unlock() - if !r.endTime.IsZero() { + if r.endTime != 0 { return } @@ -329,7 +330,7 @@ func (r *rtpStatsBase) UpdateKeyFrame(kfCount uint32) { r.lock.Lock() defer r.lock.Unlock() - if !r.endTime.IsZero() { + if r.endTime != 0 { return } @@ -341,7 +342,7 @@ func (r *rtpStatsBase) UpdateRtt(rtt uint32) { r.lock.Lock() defer r.lock.Unlock() - if !r.endTime.IsZero() { + if r.endTime != 0 { return } @@ -366,7 +367,8 @@ func (r *rtpStatsBase) GetRtt() uint32 { } func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *livekit.RTCPSenderReportState, tsOffset uint64, extStartTS uint64) (err error, loggingFields []interface{}) { - if time.Since(r.startTime) > cFirstPacketTimeAdjustWindow { + nowNano := mono.UnixNano() + if time.Duration(nowNano-r.startTime) > cFirstPacketTimeAdjustWindow { return } @@ -376,7 +378,7 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *livekit.RTCPSenderRepo // abnormal delay (maybe due to pacing or maybe due to queuing // in some network element along the way), push back first time // to an earlier instance. - timeSinceReceive := time.Since(time.Unix(0, srData.AtAdjusted)) + timeSinceReceive := time.Duration(nowNano - srData.AtAdjusted) extNowTS := srData.RtpTimestampExt - tsOffset + uint64(timeSinceReceive.Nanoseconds()*int64(r.params.ClockRate)/1e9) samplesDiff := int64(extNowTS - extStartTS) if samplesDiff < 0 { @@ -385,13 +387,13 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *livekit.RTCPSenderRepo } samplesDuration := time.Duration(float64(samplesDiff) / float64(r.params.ClockRate) * float64(time.Second)) - timeSinceFirst := time.Since(time.Unix(0, r.firstTime)) + timeSinceFirst := time.Duration(nowNano - r.firstTime) now := r.firstTime + timeSinceFirst.Nanoseconds() firstTime := now - samplesDuration.Nanoseconds() getFields := func() []interface{} { return []interface{}{ - "startTime", r.startTime, + "startTime", time.Unix(0, r.startTime), "nowTime", time.Unix(0, now), "before", time.Unix(0, r.firstTime), "after", time.Unix(0, firstTime), @@ -455,7 +457,7 @@ func (r *rtpStatsBase) deltaInfo( "snapshotID", snapshotID, "snapshotNow", now, "snapshotThen", then, - "duration", endTime.Sub(startTime), + "duration", time.Duration(endTime - startTime), "packetsExpected", packetsExpected, } err = errors.New("too many packets expected in delta") @@ -463,8 +465,8 @@ func (r *rtpStatsBase) deltaInfo( } if packetsExpected == 0 { deltaInfo = &RTPDeltaInfo{ - StartTime: startTime, - EndTime: endTime, + StartTime: time.Unix(0, startTime), + EndTime: time.Unix(0, endTime), } return } @@ -481,7 +483,7 @@ func (r *rtpStatsBase) deltaInfo( "snapshotID", snapshotID, "snapshotNow", now, "snapshotThen", then, - "duration", endTime.Sub(startTime), + "duration", time.Duration(endTime - startTime), "packetsExpected", packetsExpected, "packetsPadding", packetsPadding, "packetsLost", packetsLost, @@ -493,8 +495,8 @@ func (r *rtpStatsBase) deltaInfo( } deltaInfo = &RTPDeltaInfo{ - StartTime: startTime, - EndTime: endTime, + StartTime: time.Unix(0, startTime), + EndTime: time.Unix(0, endTime), Packets: uint32(packetsExpected), Bytes: now.bytes - then.bytes, HeaderBytes: now.headerBytes - then.headerBytes, @@ -667,7 +669,7 @@ func (r *rtpStatsBase) getAndResetSnapshot(snapshotID uint32, extStartSN uint64, } // snapshot now - now := r.getSnapshot(time.Now(), extHighestSN+1) + now := r.getSnapshot(mono.UnixNano(), extHighestSN+1) r.snapshots[idx] = now return &then, &now } @@ -765,7 +767,7 @@ func (r *rtpStatsBase) updateGapHistogram(gap int) { } } -func (r *rtpStatsBase) getSnapshot(startTime time.Time, extStartSN uint64) snapshot { +func (r *rtpStatsBase) getSnapshot(startTime int64, extStartSN uint64) snapshot { return snapshot{ snapshotLite: r.getSnapshotLite(startTime, extStartSN), headerBytes: r.headerBytes, @@ -785,7 +787,7 @@ func (r *rtpStatsBase) getSnapshot(startTime time.Time, extStartSN uint64) snaps // ---------------------------------- -func initSnapshot(startTime time.Time, extStartSN uint64) snapshot { +func initSnapshot(startTime int64, extStartSN uint64) snapshot { return snapshot{ snapshotLite: initSnapshotLite(startTime, extStartSN), } @@ -800,8 +802,8 @@ func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo { return nil } - startTime := time.Time{} - endTime := time.Time{} + startTime := int64(0) + endTime := int64(0) packets := uint32(0) bytes := uint64(0) @@ -833,12 +835,12 @@ func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo { continue } - if startTime.IsZero() || startTime.After(deltaInfo.StartTime) { - startTime = deltaInfo.StartTime + if startTime == 0 || startTime > deltaInfo.StartTime.UnixNano() { + startTime = deltaInfo.StartTime.UnixNano() } - if endTime.IsZero() || endTime.Before(deltaInfo.EndTime) { - endTime = deltaInfo.EndTime + if endTime == 0 || endTime < deltaInfo.EndTime.UnixNano() { + endTime = deltaInfo.EndTime.UnixNano() } packets += deltaInfo.Packets @@ -871,13 +873,13 @@ func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo { plis += deltaInfo.Plis firs += deltaInfo.Firs } - if startTime.IsZero() || endTime.IsZero() { + if startTime == 0 || endTime == 0 { return nil } return &RTPDeltaInfo{ - StartTime: startTime, - EndTime: endTime, + StartTime: time.Unix(0, startTime), + EndTime: time.Unix(0, endTime), Packets: packets, Bytes: bytes, HeaderBytes: headerBytes, diff --git a/pkg/sfu/rtpstats/rtpstats_base_lite.go b/pkg/sfu/rtpstats/rtpstats_base_lite.go index cff08e078..6e30a6702 100644 --- a/pkg/sfu/rtpstats/rtpstats_base_lite.go +++ b/pkg/sfu/rtpstats/rtpstats_base_lite.go @@ -22,6 +22,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils/mono" "go.uber.org/zap/zapcore" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -64,7 +65,7 @@ func (r *RTPDeltaInfoLite) MarshalLogObject(e zapcore.ObjectEncoder) error { type snapshotLite struct { isValid bool - startTime time.Time + startTime int64 extStartSN uint64 bytes uint64 @@ -82,7 +83,7 @@ func (s *snapshotLite) MarshalLogObject(e zapcore.ObjectEncoder) error { } e.AddBool("isValid", s.isValid) - e.AddTime("startTime", s.startTime) + e.AddTime("startTime", time.Unix(0, s.startTime)) e.AddUint64("extStartSN", s.extStartSN) e.AddUint64("bytes", s.bytes) e.AddUint64("packetsOutOfOrder", s.packetsOutOfOrder) @@ -106,8 +107,8 @@ type rtpStatsBaseLite struct { initialized bool - startTime time.Time - endTime time.Time + startTime int64 + endTime int64 bytes uint64 @@ -123,7 +124,7 @@ type rtpStatsBaseLite struct { nackRepeated uint32 plis uint32 - lastPli time.Time + lastPli int64 nextSnapshotLiteID uint32 snapshotLites []snapshotLite @@ -178,7 +179,7 @@ func (r *rtpStatsBaseLite) Stop() { r.lock.Lock() defer r.lock.Unlock() - r.endTime = time.Now() + r.endTime = mono.UnixNano() } func (r *rtpStatsBaseLite) newSnapshotLiteID(extStartSN uint64) uint32 { @@ -192,7 +193,7 @@ func (r *rtpStatsBaseLite) newSnapshotLiteID(extStartSN uint64) uint32 { } if r.initialized { - r.snapshotLites[id-cFirstSnapshotID] = initSnapshotLite(time.Now(), extStartSN) + r.snapshotLites[id-cFirstSnapshotID] = initSnapshotLite(mono.UnixNano(), extStartSN) } return id } @@ -201,14 +202,14 @@ func (r *rtpStatsBaseLite) IsActive() bool { r.lock.RLock() defer r.lock.RUnlock() - return r.initialized && r.endTime.IsZero() + return r.initialized && r.endTime == 0 } func (r *rtpStatsBaseLite) UpdateNack(nackCount uint32) { r.lock.Lock() defer r.lock.Unlock() - if !r.endTime.IsZero() { + if r.endTime != 0 { return } @@ -219,7 +220,7 @@ func (r *rtpStatsBaseLite) UpdateNackProcessed(nackAckCount uint32, nackMissCoun r.lock.Lock() defer r.lock.Unlock() - if !r.endTime.IsZero() { + if r.endTime != 0 { return } @@ -232,7 +233,7 @@ func (r *rtpStatsBaseLite) CheckAndUpdatePli(throttle int64, force bool) bool { r.lock.Lock() defer r.lock.Unlock() - if !r.endTime.IsZero() || (!force && time.Now().UnixNano()-r.lastPli.UnixNano() < throttle) { + if r.endTime != 0 || (!force && mono.UnixNano()-r.lastPli < throttle) { return false } r.updatePliLocked(1) @@ -244,7 +245,7 @@ func (r *rtpStatsBaseLite) UpdatePliAndTime(pliCount uint32) { r.lock.Lock() defer r.lock.Unlock() - if !r.endTime.IsZero() { + if r.endTime != 0 { return } @@ -256,7 +257,7 @@ func (r *rtpStatsBaseLite) UpdatePli(pliCount uint32) { r.lock.Lock() defer r.lock.Unlock() - if !r.endTime.IsZero() { + if r.endTime != 0 { return } @@ -271,7 +272,7 @@ func (r *rtpStatsBaseLite) UpdatePliTime() { r.lock.Lock() defer r.lock.Unlock() - if !r.endTime.IsZero() { + if r.endTime != 0 { return } @@ -279,10 +280,10 @@ func (r *rtpStatsBaseLite) UpdatePliTime() { } func (r *rtpStatsBaseLite) updatePliTimeLocked() { - r.lastPli = time.Now() + r.lastPli = mono.UnixNano() } -func (r *rtpStatsBaseLite) LastPli() time.Time { +func (r *rtpStatsBaseLite) LastPli() int64 { r.lock.RLock() defer r.lock.RUnlock() @@ -322,15 +323,15 @@ func (r *rtpStatsBaseLite) deltaInfoLite( "snapshotLiteNow", now, "snapshotLiteThen", then, "packetsExpected", packetsExpected, - "duration", endTime.Sub(startTime).String(), + "duration", time.Duration(endTime - startTime), } err = errors.New("too many packets expected in delta lite") return } if packetsExpected == 0 { deltaInfoLite = &RTPDeltaInfoLite{ - StartTime: startTime, - EndTime: endTime, + StartTime: time.Unix(0, startTime), + EndTime: time.Unix(0, endTime), } return } @@ -346,14 +347,14 @@ func (r *rtpStatsBaseLite) deltaInfoLite( "snapshotLiteThen", then, "packetsExpected", packetsExpected, "packetsLost", packetsLost, - "duration", endTime.Sub(startTime).String(), + "duration", time.Duration(endTime - startTime), } err = errors.New("unexpected number of packets lost in delta lite") } deltaInfoLite = &RTPDeltaInfoLite{ - StartTime: startTime, - EndTime: endTime, + StartTime: time.Unix(0, startTime), + EndTime: time.Unix(0, endTime), Packets: packetsExpected, Bytes: now.bytes - then.bytes, PacketsLost: packetsLost, @@ -369,17 +370,17 @@ func (r *rtpStatsBaseLite) marshalLogObject(e zapcore.ObjectEncoder, packetsExpe } endTime := r.endTime - if endTime.IsZero() { - endTime = time.Now() + if endTime == 0 { + endTime = mono.UnixNano() } - elapsed := endTime.Sub(r.startTime) + elapsed := time.Duration(endTime - r.startTime) if elapsed == 0 { return 0, errors.New("no time elapsed") } elapsedSeconds := elapsed.Seconds() - e.AddTime("startTime", r.startTime) - e.AddTime("endTime", r.endTime) + e.AddTime("startTime", time.Unix(0, r.startTime)) + e.AddTime("endTime", time.Unix(0, r.endTime)) e.AddDuration("elapsed", elapsed) e.AddUint64("packetsExpected", packetsExpected) @@ -424,20 +425,20 @@ func (r *rtpStatsBaseLite) marshalLogObject(e zapcore.ObjectEncoder, packetsExpe e.AddUint32("nackRepeated", r.nackRepeated) e.AddUint32("plis", r.plis) - e.AddTime("lastPli", r.lastPli) + e.AddTime("lastPli", time.Unix(0, r.lastPli)) return elapsedSeconds, nil } func (r *rtpStatsBaseLite) toProto(packetsExpected, packetsSeenMinusPadding, packetsLost uint64) *livekit.RTPStats { - if r.startTime.IsZero() { + if r.startTime == 0 { return nil } endTime := r.endTime - if endTime.IsZero() { - endTime = time.Now() + if endTime == 0 { + endTime = mono.UnixNano() } - elapsed := endTime.Sub(r.startTime).Seconds() + elapsed := time.Duration(endTime - r.startTime).Seconds() if elapsed == 0.0 { return nil } @@ -452,8 +453,8 @@ func (r *rtpStatsBaseLite) toProto(packetsExpected, packetsSeenMinusPadding, pac } p := &livekit.RTPStats{ - StartTime: timestamppb.New(r.startTime), - EndTime: timestamppb.New(endTime), + StartTime: timestamppb.New(time.Unix(0, r.startTime)), + EndTime: timestamppb.New(time.Unix(0, endTime)), Duration: elapsed, Packets: uint32(packetsSeenMinusPadding), PacketRate: packetRate, @@ -468,7 +469,7 @@ func (r *rtpStatsBaseLite) toProto(packetsExpected, packetsSeenMinusPadding, pac NackMisses: r.nackMisses, NackRepeated: r.nackRepeated, Plis: r.plis, - LastPli: timestamppb.New(r.lastPli), + LastPli: timestamppb.New(time.Unix(0, r.lastPli)), } gapsPresent := false @@ -508,7 +509,7 @@ func (r *rtpStatsBaseLite) getAndResetSnapshotLite(snapshotLiteID uint32, extSta } // snapshot now - now := r.getSnapshotLite(time.Now(), extHighestSN+1) + now := r.getSnapshotLite(mono.UnixNano(), extHighestSN+1) r.snapshotLites[idx] = now return &then, &now } @@ -526,7 +527,7 @@ func (r *rtpStatsBaseLite) updateGapHistogram(gap int) { } } -func (r *rtpStatsBaseLite) getSnapshotLite(startTime time.Time, extStartSN uint64) snapshotLite { +func (r *rtpStatsBaseLite) getSnapshotLite(startTime int64, extStartSN uint64) snapshotLite { return snapshotLite{ isValid: true, startTime: startTime, @@ -540,7 +541,7 @@ func (r *rtpStatsBaseLite) getSnapshotLite(startTime time.Time, extStartSN uint6 // ---------------------------------- -func initSnapshotLite(startTime time.Time, extStartSN uint64) snapshotLite { +func initSnapshotLite(startTime int64, extStartSN uint64) snapshotLite { return snapshotLite{ isValid: true, startTime: startTime, diff --git a/pkg/sfu/rtpstats/rtpstats_receiver.go b/pkg/sfu/rtpstats/rtpstats_receiver.go index a3dd7936e..95d92d493 100644 --- a/pkg/sfu/rtpstats/rtpstats_receiver.go +++ b/pkg/sfu/rtpstats/rtpstats_receiver.go @@ -27,6 +27,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" protoutils "github.com/livekit/protocol/utils" + "github.com/livekit/protocol/utils/mono" ) const ( @@ -137,7 +138,7 @@ func (r *RTPStatsReceiver) Update( r.lock.Lock() defer r.lock.Unlock() - if !r.endTime.IsZero() { + if r.endTime != 0 { flowState.IsNotHandled = true return } @@ -181,7 +182,7 @@ func (r *RTPStatsReceiver) Update( r.initialized = true - r.startTime = time.Now() + r.startTime = mono.UnixNano() r.firstTime = packetTime r.highestTime = packetTime @@ -275,14 +276,13 @@ func (r *RTPStatsReceiver) Update( } if r.isInRange(resSN.ExtendedVal, resSN.PreExtendedHighest) { - if r.history.IsSet(resSN.ExtendedVal) { + if r.history.GetAndSet(resSN.ExtendedVal) { r.bytesDuplicate += pktSize r.headerBytesDuplicate += uint64(hdrSize) r.packetsDuplicate++ flowState.IsDuplicate = true } else { r.packetsLost-- - r.history.Set(resSN.ExtendedVal) } } @@ -461,14 +461,15 @@ func (r *RTPStatsReceiver) checkRTPClockSkewAgainstMediaPathForSenderReport(srDa return } - timeSinceSR := time.Since(time.Unix(0, srData.AtAdjusted)) + nowNano := mono.UnixNano() + timeSinceSR := time.Duration(nowNano - srData.AtAdjusted) extNowTSSR := srData.RtpTimestampExt + uint64(timeSinceSR.Nanoseconds()*int64(r.params.ClockRate)/1e9) - timeSinceHighest := time.Since(time.Unix(0, r.highestTime)) + timeSinceHighest := time.Duration(nowNano - r.highestTime) extNowTSHighest := r.timestamp.GetExtendedHighest() + uint64(timeSinceHighest.Nanoseconds()*int64(r.params.ClockRate)/1e9) diffHighest := extNowTSSR - extNowTSHighest - timeSinceFirst := time.Since(time.Unix(0, r.firstTime)) + timeSinceFirst := time.Duration(nowNano - r.firstTime) extNowTSFirst := r.timestamp.GetExtendedStart() + uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9) diffFirst := extNowTSSR - extNowTSFirst diff --git a/pkg/sfu/rtpstats/rtpstats_receiver_lite.go b/pkg/sfu/rtpstats/rtpstats_receiver_lite.go index 5f6ef29c7..84812a323 100644 --- a/pkg/sfu/rtpstats/rtpstats_receiver_lite.go +++ b/pkg/sfu/rtpstats/rtpstats_receiver_lite.go @@ -15,12 +15,11 @@ package rtpstats import ( - "time" - "go.uber.org/zap/zapcore" "github.com/livekit/livekit-server/pkg/sfu/utils" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/utils/mono" ) type RTPFlowStateLite struct { @@ -70,7 +69,7 @@ func (r *RTPStatsReceiverLite) Update(packetTime int64, packetSize int, sequence r.lock.Lock() defer r.lock.Unlock() - if !r.endTime.IsZero() { + if r.endTime != 0 { flowStateLite.IsNotHandled = true return } @@ -79,7 +78,7 @@ func (r *RTPStatsReceiverLite) Update(packetTime int64, packetSize int, sequence if !r.initialized { r.initialized = true - r.startTime = time.Now() + r.startTime = mono.UnixNano() resSN = r.sequenceNumber.Update(sequenceNumber) diff --git a/pkg/sfu/rtpstats/rtpstats_sender.go b/pkg/sfu/rtpstats/rtpstats_sender.go index c47cad3fb..5f883e930 100644 --- a/pkg/sfu/rtpstats/rtpstats_sender.go +++ b/pkg/sfu/rtpstats/rtpstats_sender.go @@ -122,7 +122,7 @@ func (w wrappedReceptionReportsLogger) MarshalLogObject(e zapcore.ObjectEncoder) type senderSnapshot struct { isValid bool - startTime time.Time + startTime int64 extStartSN uint64 bytes uint64 @@ -164,7 +164,7 @@ func (s *senderSnapshot) MarshalLogObject(e zapcore.ObjectEncoder) error { } e.AddBool("isValid", s.isValid) - e.AddTime("startTime", s.startTime) + e.AddTime("startTime", time.Unix(0, s.startTime)) e.AddUint64("extStartSN", s.extStartSN) e.AddUint64("bytes", s.bytes) e.AddUint64("headerBytes", s.headerBytes) @@ -209,7 +209,7 @@ type RTPStatsSender struct { rttMarker rttMarker - lastRRTime time.Time + lastRRTime int64 lastRR rtcp.ReceptionReport extStartTS uint64 @@ -301,7 +301,7 @@ func (r *RTPStatsSender) NewSenderSnapshotId() uint32 { } if r.initialized { - r.senderSnapshots[id-cFirstSnapshotID] = initSenderSnapshot(time.Now(), r.extHighestSN) + r.senderSnapshots[id-cFirstSnapshotID] = initSenderSnapshot(mono.UnixNano(), r.extHighestSN) } return id } @@ -319,7 +319,7 @@ func (r *RTPStatsSender) Update( r.lock.Lock() defer r.lock.Unlock() - if !r.endTime.IsZero() { + if r.endTime != 0 { return } @@ -331,7 +331,7 @@ func (r *RTPStatsSender) Update( r.initialized = true - r.startTime = time.Now() + r.startTime = mono.UnixNano() r.highestTime = packetTime @@ -515,7 +515,7 @@ func (r *RTPStatsSender) UpdateLayerLockPliAndTime(pliCount uint32) { r.lock.Lock() defer r.lock.Unlock() - if !r.endTime.IsZero() { + if r.endTime != 0 { return } @@ -534,12 +534,12 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt r.lock.Lock() defer r.lock.Unlock() - if !r.initialized || !r.endTime.IsZero() { + if !r.initialized || r.endTime != 0 { return } extHighestSNFromRR := r.extHighestSNFromRR&0xFFFF_FFFF_0000_0000 + uint64(rr.LastSequenceNumber) - if !r.lastRRTime.IsZero() { + if r.lastRRTime != 0 { if (rr.LastSequenceNumber-r.lastRR.LastSequenceNumber) < (1<<31) && rr.LastSequenceNumber < r.lastRR.LastSequenceNumber { extHighestSNFromRR += (1 << 32) } @@ -550,10 +550,10 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt return } - if !r.lastRRTime.IsZero() && r.extHighestSNFromRR > extHighestSNFromRR { + if r.lastRRTime != 0 && r.extHighestSNFromRR > extHighestSNFromRR { r.logger.Debugw( fmt.Sprintf("receiver report potentially out of order, highestSN: existing: %d, received: %d", r.extHighestSNFromRR, extHighestSNFromRR), - "sinceLastRR", time.Since(r.lastRRTime), + "sinceLastRR", time.Duration(mono.UnixNano()-r.lastRRTime), "receivedRR", rr, "rtpStats", lockedRTPStatsSenderLogEncoder{r}, ) @@ -609,9 +609,9 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt } if int64(extReceivedRRSN-s.extLastRRSN) < 0 || (extReceivedRRSN-s.extLastRRSN) > (1<<15) { - timeSinceLastRR := time.Since(r.lastRRTime) - if r.lastRRTime.IsZero() { - timeSinceLastRR = time.Since(r.startTime) + timeSinceLastRR := time.Duration(mono.UnixNano() - r.lastRRTime) + if r.lastRRTime == 0 { + timeSinceLastRR = time.Duration(mono.UnixNano() - r.startTime) } r.logger.Infow( "rr interval too big, skipping", @@ -631,9 +631,9 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt eis := &s.intervalStats eis.aggregate(&is) if is.packetsNotFoundMetadata != 0 { - timeSinceLastRR := time.Since(r.lastRRTime) - if r.lastRRTime.IsZero() { - timeSinceLastRR = time.Since(r.startTime) + timeSinceLastRR := time.Duration(mono.UnixNano() - r.lastRRTime) + if r.lastRRTime == 0 { + timeSinceLastRR = time.Duration(mono.UnixNano() - r.startTime) } r.metadataCacheOverflowCount++ if (r.metadataCacheOverflowCount-1)%10 == 0 { @@ -654,12 +654,12 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt s.processedReceptionReports = append(s.processedReceptionReports, rr) } - r.lastRRTime = time.Now() + r.lastRRTime = mono.UnixNano() r.lastRR = rr return } -func (r *RTPStatsSender) LastReceiverReportTime() time.Time { +func (r *RTPStatsSender) LastReceiverReportTime() int64 { r.lock.RLock() defer r.lock.RUnlock() @@ -715,7 +715,7 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *livek nowNTP = mediatransportutil.NtpTime(publisherSRData.NtpTimestamp) nowRTPExt = publisherSRData.RtpTimestampExt - tsOffset } else { - timeSincePublisherSRAdjusted := time.Since(time.Unix(0, publisherSRData.AtAdjusted)) + timeSincePublisherSRAdjusted := time.Duration(mono.UnixNano() - publisherSRData.AtAdjusted) reportTimeAdjusted = publisherSRData.AtAdjusted + timeSincePublisherSRAdjusted.Nanoseconds() reportTime = reportTimeAdjusted @@ -736,17 +736,18 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *livek } ulgr := func() logger.UnlikelyLogger { + nowNano := mono.UnixNano() return r.logger.WithUnlikelyValues( "curr", WrappedRTCPSenderReportStateLogger{srData}, "feed", WrappedRTCPSenderReportStateLogger{publisherSRData}, "tsOffset", tsOffset, - "timeNow", time.Now(), + "timeNow", mono.Now(), "reportTime", time.Unix(0, reportTime), "reportTimeAdjusted", time.Unix(0, reportTimeAdjusted), - "timeSinceHighest", time.Since(time.Unix(0, r.highestTime)), - "timeSinceFirst", time.Since(time.Unix(0, r.firstTime)), - "timeSincePublisherSRAdjusted", time.Since(time.Unix(0, publisherSRData.AtAdjusted)), - "timeSincePublisherSR", time.Since(time.Unix(0, publisherSRData.At)), + "timeSinceHighest", time.Duration(nowNano-r.highestTime), + "timeSinceFirst", time.Duration(nowNano-r.firstTime), + "timeSincePublisherSRAdjusted", time.Duration(nowNano-publisherSRData.AtAdjusted), + "timeSincePublisherSR", time.Duration(nowNano-publisherSRData.At), "nowRTPExt", nowRTPExt, "rtpStats", lockedRTPStatsSenderLogEncoder{r}, ) @@ -815,7 +816,7 @@ func (r *RTPStatsSender) DeltaInfo(snapshotID uint32) *RTPDeltaInfo { func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo { r.lock.Lock() defer r.lock.Unlock() - if r.lastRRTime.IsZero() { + if r.lastRRTime == 0 { return nil } @@ -835,7 +836,7 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo "senderSnapshotNow", now, "senderSnapshotThen", then, "packetsExpected", packetsExpected, - "duration", endTime.Sub(startTime), + "duration", time.Duration(endTime-startTime), "rtpStats", lockedRTPStatsSenderLogEncoder{r}, ) return nil @@ -862,7 +863,7 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo "packetsExpected", packetsExpected, "packetsLost", packetsLost, "packetsLostFeed", packetsLostFeed, - "duration", endTime.Sub(startTime), + "duration", time.Duration(endTime-startTime), "rtpStats", lockedRTPStatsSenderLogEncoder{r}, ) packetsLost = packetsExpected @@ -876,8 +877,8 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6 return &RTPDeltaInfo{ - StartTime: startTime, - EndTime: endTime, + StartTime: time.Unix(0, startTime), + EndTime: time.Unix(0, endTime), Packets: packetsExpected - uint32(now.packetsPadding-then.packetsPadding), Bytes: now.bytes - then.bytes, HeaderBytes: now.headerBytes - then.headerBytes, @@ -933,7 +934,7 @@ func (r *RTPStatsSender) ToProto() *livekit.RTPStats { } func (r *RTPStatsSender) getAndResetSenderSnapshot(senderSnapshotID uint32) (*senderSnapshot, *senderSnapshot) { - if !r.initialized || r.lastRRTime.IsZero() { + if !r.initialized || r.lastRRTime == 0 { return nil, nil } @@ -950,7 +951,7 @@ func (r *RTPStatsSender) getAndResetSenderSnapshot(senderSnapshotID uint32) (*se return &then, &now } -func (r *RTPStatsSender) getSenderSnapshot(startTime time.Time, s *senderSnapshot) senderSnapshot { +func (r *RTPStatsSender) getSenderSnapshot(startTime int64, s *senderSnapshot) senderSnapshot { if s == nil { return senderSnapshot{} } @@ -1118,7 +1119,7 @@ func (r lockedRTPStatsSenderLogEncoder) MarshalLogObject(e zapcore.ObjectEncoder e.AddUint64("extStartTS", r.extStartTS) e.AddUint64("extHighestTS", r.extHighestTS) - e.AddTime("lastRRTime", r.lastRRTime) + e.AddTime("lastRRTime", time.Unix(0, r.lastRRTime)) e.AddReflected("lastRR", r.lastRR) e.AddUint64("extHighestSNFromRR", r.extHighestSNFromRR) e.AddUint64("packetsLostFromRR", r.packetsLostFromRR) @@ -1136,7 +1137,7 @@ func (r lockedRTPStatsSenderLogEncoder) MarshalLogObject(e zapcore.ObjectEncoder // ------------------------------------------------------------------- -func initSenderSnapshot(startTime time.Time, extStartSN uint64) senderSnapshot { +func initSenderSnapshot(startTime int64, extStartSN uint64) senderSnapshot { return senderSnapshot{ isValid: true, startTime: startTime, diff --git a/pkg/sfu/rtpstats/rtpstats_sender_lite.go b/pkg/sfu/rtpstats/rtpstats_sender_lite.go index 9590ae569..ae4223524 100644 --- a/pkg/sfu/rtpstats/rtpstats_sender_lite.go +++ b/pkg/sfu/rtpstats/rtpstats_sender_lite.go @@ -15,9 +15,8 @@ package rtpstats import ( - "time" - "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/utils/mono" "go.uber.org/zap/zapcore" ) @@ -38,14 +37,14 @@ func (r *RTPStatsSenderLite) Update(packetTime int64, packetSize int, extSequenc r.lock.Lock() defer r.lock.Unlock() - if !r.endTime.IsZero() { + if r.endTime != 0 { return } if !r.initialized { r.initialized = true - r.startTime = time.Now() + r.startTime = mono.UnixNano() r.extStartSN = extSequenceNumber r.extHighestSN = extSequenceNumber - 1 diff --git a/pkg/sfu/utils/wraparound.go b/pkg/sfu/utils/wraparound.go index 2a30a6325..c75db3448 100644 --- a/pkg/sfu/utils/wraparound.go +++ b/pkg/sfu/utils/wraparound.go @@ -120,7 +120,7 @@ func (w *WrapAround[T, ET]) UndoUpdate(result WrapAroundUpdateResult[ET]) { } func (w *WrapAround[T, ET]) Rollover(val T, numCycles int) (result WrapAroundUpdateResult[ET]) { - if !w.initialized || numCycles < 0 { + if numCycles < 0 || !w.initialized { return w.Update(val) }