Use nano time for easier (and hopefully) faster checks/calculations. (#3323)

This commit is contained in:
Raja Subramanian
2025-01-12 00:56:46 +05:30
committed by GitHub
parent 28c39efa06
commit 53d300ba71
10 changed files with 130 additions and 127 deletions
+3 -3
View File
@@ -21,7 +21,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564
github.com/livekit/protocol v1.30.1-0.20250106062425-83e359fc95bf
github.com/livekit/protocol v1.30.1-0.20250111191311-7b5c5cd3dd53
github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8
github.com/mackerelio/go-osstat v0.2.5
github.com/magefile/mage v1.15.0
@@ -55,7 +55,7 @@ require (
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8
golang.org/x/sync v0.10.0
google.golang.org/protobuf v1.36.1
google.golang.org/protobuf v1.36.2
gopkg.in/yaml.v3 v3.0.1
)
@@ -136,7 +136,7 @@ require (
golang.org/x/text v0.21.0 // indirect
golang.org/x/tools v0.29.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241015192408-796eee8c2d53 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241219192143-6b3ec007d9bb // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422 // indirect
google.golang.org/grpc v1.69.2 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
+6 -6
View File
@@ -167,8 +167,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564 h1:GX7KF/V9ExmcfT/2Bdia8aROjkxrgx7WpyH7w9MB4J4=
github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564/go.mod h1:36s+wwmU3O40IAhE+MjBWP3W71QRiEE9SfooSBvtBqY=
github.com/livekit/protocol v1.30.1-0.20250106062425-83e359fc95bf h1:6sLvfeHO1qux5LJEHs+qDam0/xLE0693HzD8rXirYnY=
github.com/livekit/protocol v1.30.1-0.20250106062425-83e359fc95bf/go.mod h1:08wT2rI6GecTCwh9n8OA28Gb7ZQNtIR+hX/LccP1TaY=
github.com/livekit/protocol v1.30.1-0.20250111191311-7b5c5cd3dd53 h1:f2yuRKAa4XWo1ILgyZWyYPA8VzOITWbk3B/md51goW8=
github.com/livekit/protocol v1.30.1-0.20250111191311-7b5c5cd3dd53/go.mod h1:08wT2rI6GecTCwh9n8OA28Gb7ZQNtIR+hX/LccP1TaY=
github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8 h1:Ibh0LoFl5NW5a1KFJEE0eLxxz7dqqKmYTj/BfCb0PbY=
github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o=
@@ -474,12 +474,12 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/api v0.0.0-20241015192408-796eee8c2d53 h1:fVoAXEKA4+yufmbdVYv+SE73+cPZbbbe8paLsHfkK+U=
google.golang.org/genproto/googleapis/api v0.0.0-20241015192408-796eee8c2d53/go.mod h1:riSXTwQ4+nqmPGtobMFyW5FqVAmIs0St6VPp4Ug7CE4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241219192143-6b3ec007d9bb h1:3oy2tynMOP1QbTC0MsNNAV+Se8M2Bd0A5+x1QHyw+pI=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241219192143-6b3ec007d9bb/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422 h1:3UsHvIr4Wc2aW4brOaSCmcxh9ksica6fHEr8P1XhkYw=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422/go.mod h1:3ENsm/5D1mzDyhpzeRi1NR784I0BcofWBoSc5QqqMK4=
google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU=
google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk=
google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.2 h1:R8FeyR1/eLmkutZOM5CWghmo5itiG9z0ktFlTVLuTmU=
google.golang.org/protobuf v1.36.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
+3 -3
View File
@@ -1008,7 +1008,7 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa
// Hold sending padding packets till first RTCP-RR is received for this RTP stream.
// That is definitive proof that the remote side knows about this RTP stream.
if d.rtpStats.LastReceiverReportTime().IsZero() && !paddingOnMute {
if d.rtpStats.LastReceiverReportTime() == 0 && !paddingOnMute {
return 0
}
@@ -2093,7 +2093,7 @@ func (d *DownTrack) WriteProbePackets(bytesToSend int, usePadding bool) int {
if !d.writable.Load() ||
!d.rtpStats.IsActive() ||
(d.absSendTimeExtID == 0 && d.transportWideExtID == 0) ||
d.rtpStats.LastReceiverReportTime().IsZero() ||
d.rtpStats.LastReceiverReportTime() == 0 ||
d.sequencer == nil {
return 0
}
@@ -2261,7 +2261,7 @@ func (d *DownTrack) GetDeltaStatsSender() map[uint32]*buffer.StreamStatsWithLaye
}
func (d *DownTrack) GetPrimaryStreamLastReceiverReportTime() time.Time {
return d.rtpStats.LastReceiverReportTime()
return time.Unix(0, d.rtpStats.LastReceiverReportTime())
}
func (d *DownTrack) GetPrimaryStreamPacketsSent() uint64 {
+29 -27
View File
@@ -24,6 +24,7 @@ import (
"github.com/livekit/mediatransportutil"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/utils"
"github.com/livekit/protocol/utils/mono"
)
const (
@@ -298,7 +299,7 @@ func (r *rtpStatsBase) newSnapshotID(extStartSN uint64) uint32 {
}
if r.initialized {
r.snapshots[id-cFirstSnapshotID] = initSnapshot(time.Now(), extStartSN)
r.snapshots[id-cFirstSnapshotID] = initSnapshot(mono.UnixNano(), extStartSN)
}
return id
}
@@ -307,7 +308,7 @@ func (r *rtpStatsBase) UpdateFir(firCount uint32) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
if r.endTime != 0 {
return
}
@@ -318,7 +319,7 @@ func (r *rtpStatsBase) UpdateFirTime() {
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
if r.endTime != 0 {
return
}
@@ -329,7 +330,7 @@ func (r *rtpStatsBase) UpdateKeyFrame(kfCount uint32) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
if r.endTime != 0 {
return
}
@@ -341,7 +342,7 @@ func (r *rtpStatsBase) UpdateRtt(rtt uint32) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
if r.endTime != 0 {
return
}
@@ -366,7 +367,8 @@ func (r *rtpStatsBase) GetRtt() uint32 {
}
func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *livekit.RTCPSenderReportState, tsOffset uint64, extStartTS uint64) (err error, loggingFields []interface{}) {
if time.Since(r.startTime) > cFirstPacketTimeAdjustWindow {
nowNano := mono.UnixNano()
if time.Duration(nowNano-r.startTime) > cFirstPacketTimeAdjustWindow {
return
}
@@ -376,7 +378,7 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *livekit.RTCPSenderRepo
// abnormal delay (maybe due to pacing or maybe due to queuing
// in some network element along the way), push back first time
// to an earlier instance.
timeSinceReceive := time.Since(time.Unix(0, srData.AtAdjusted))
timeSinceReceive := time.Duration(nowNano - srData.AtAdjusted)
extNowTS := srData.RtpTimestampExt - tsOffset + uint64(timeSinceReceive.Nanoseconds()*int64(r.params.ClockRate)/1e9)
samplesDiff := int64(extNowTS - extStartTS)
if samplesDiff < 0 {
@@ -385,13 +387,13 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *livekit.RTCPSenderRepo
}
samplesDuration := time.Duration(float64(samplesDiff) / float64(r.params.ClockRate) * float64(time.Second))
timeSinceFirst := time.Since(time.Unix(0, r.firstTime))
timeSinceFirst := time.Duration(nowNano - r.firstTime)
now := r.firstTime + timeSinceFirst.Nanoseconds()
firstTime := now - samplesDuration.Nanoseconds()
getFields := func() []interface{} {
return []interface{}{
"startTime", r.startTime,
"startTime", time.Unix(0, r.startTime),
"nowTime", time.Unix(0, now),
"before", time.Unix(0, r.firstTime),
"after", time.Unix(0, firstTime),
@@ -455,7 +457,7 @@ func (r *rtpStatsBase) deltaInfo(
"snapshotID", snapshotID,
"snapshotNow", now,
"snapshotThen", then,
"duration", endTime.Sub(startTime),
"duration", time.Duration(endTime - startTime),
"packetsExpected", packetsExpected,
}
err = errors.New("too many packets expected in delta")
@@ -463,8 +465,8 @@ func (r *rtpStatsBase) deltaInfo(
}
if packetsExpected == 0 {
deltaInfo = &RTPDeltaInfo{
StartTime: startTime,
EndTime: endTime,
StartTime: time.Unix(0, startTime),
EndTime: time.Unix(0, endTime),
}
return
}
@@ -481,7 +483,7 @@ func (r *rtpStatsBase) deltaInfo(
"snapshotID", snapshotID,
"snapshotNow", now,
"snapshotThen", then,
"duration", endTime.Sub(startTime),
"duration", time.Duration(endTime - startTime),
"packetsExpected", packetsExpected,
"packetsPadding", packetsPadding,
"packetsLost", packetsLost,
@@ -493,8 +495,8 @@ func (r *rtpStatsBase) deltaInfo(
}
deltaInfo = &RTPDeltaInfo{
StartTime: startTime,
EndTime: endTime,
StartTime: time.Unix(0, startTime),
EndTime: time.Unix(0, endTime),
Packets: uint32(packetsExpected),
Bytes: now.bytes - then.bytes,
HeaderBytes: now.headerBytes - then.headerBytes,
@@ -667,7 +669,7 @@ func (r *rtpStatsBase) getAndResetSnapshot(snapshotID uint32, extStartSN uint64,
}
// snapshot now
now := r.getSnapshot(time.Now(), extHighestSN+1)
now := r.getSnapshot(mono.UnixNano(), extHighestSN+1)
r.snapshots[idx] = now
return &then, &now
}
@@ -765,7 +767,7 @@ func (r *rtpStatsBase) updateGapHistogram(gap int) {
}
}
func (r *rtpStatsBase) getSnapshot(startTime time.Time, extStartSN uint64) snapshot {
func (r *rtpStatsBase) getSnapshot(startTime int64, extStartSN uint64) snapshot {
return snapshot{
snapshotLite: r.getSnapshotLite(startTime, extStartSN),
headerBytes: r.headerBytes,
@@ -785,7 +787,7 @@ func (r *rtpStatsBase) getSnapshot(startTime time.Time, extStartSN uint64) snaps
// ----------------------------------
func initSnapshot(startTime time.Time, extStartSN uint64) snapshot {
func initSnapshot(startTime int64, extStartSN uint64) snapshot {
return snapshot{
snapshotLite: initSnapshotLite(startTime, extStartSN),
}
@@ -800,8 +802,8 @@ func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo {
return nil
}
startTime := time.Time{}
endTime := time.Time{}
startTime := int64(0)
endTime := int64(0)
packets := uint32(0)
bytes := uint64(0)
@@ -833,12 +835,12 @@ func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo {
continue
}
if startTime.IsZero() || startTime.After(deltaInfo.StartTime) {
startTime = deltaInfo.StartTime
if startTime == 0 || startTime > deltaInfo.StartTime.UnixNano() {
startTime = deltaInfo.StartTime.UnixNano()
}
if endTime.IsZero() || endTime.Before(deltaInfo.EndTime) {
endTime = deltaInfo.EndTime
if endTime == 0 || endTime < deltaInfo.EndTime.UnixNano() {
endTime = deltaInfo.EndTime.UnixNano()
}
packets += deltaInfo.Packets
@@ -871,13 +873,13 @@ func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo {
plis += deltaInfo.Plis
firs += deltaInfo.Firs
}
if startTime.IsZero() || endTime.IsZero() {
if startTime == 0 || endTime == 0 {
return nil
}
return &RTPDeltaInfo{
StartTime: startTime,
EndTime: endTime,
StartTime: time.Unix(0, startTime),
EndTime: time.Unix(0, endTime),
Packets: packets,
Bytes: bytes,
HeaderBytes: headerBytes,
+39 -38
View File
@@ -22,6 +22,7 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils/mono"
"go.uber.org/zap/zapcore"
"google.golang.org/protobuf/types/known/timestamppb"
)
@@ -64,7 +65,7 @@ func (r *RTPDeltaInfoLite) MarshalLogObject(e zapcore.ObjectEncoder) error {
type snapshotLite struct {
isValid bool
startTime time.Time
startTime int64
extStartSN uint64
bytes uint64
@@ -82,7 +83,7 @@ func (s *snapshotLite) MarshalLogObject(e zapcore.ObjectEncoder) error {
}
e.AddBool("isValid", s.isValid)
e.AddTime("startTime", s.startTime)
e.AddTime("startTime", time.Unix(0, s.startTime))
e.AddUint64("extStartSN", s.extStartSN)
e.AddUint64("bytes", s.bytes)
e.AddUint64("packetsOutOfOrder", s.packetsOutOfOrder)
@@ -106,8 +107,8 @@ type rtpStatsBaseLite struct {
initialized bool
startTime time.Time
endTime time.Time
startTime int64
endTime int64
bytes uint64
@@ -123,7 +124,7 @@ type rtpStatsBaseLite struct {
nackRepeated uint32
plis uint32
lastPli time.Time
lastPli int64
nextSnapshotLiteID uint32
snapshotLites []snapshotLite
@@ -178,7 +179,7 @@ func (r *rtpStatsBaseLite) Stop() {
r.lock.Lock()
defer r.lock.Unlock()
r.endTime = time.Now()
r.endTime = mono.UnixNano()
}
func (r *rtpStatsBaseLite) newSnapshotLiteID(extStartSN uint64) uint32 {
@@ -192,7 +193,7 @@ func (r *rtpStatsBaseLite) newSnapshotLiteID(extStartSN uint64) uint32 {
}
if r.initialized {
r.snapshotLites[id-cFirstSnapshotID] = initSnapshotLite(time.Now(), extStartSN)
r.snapshotLites[id-cFirstSnapshotID] = initSnapshotLite(mono.UnixNano(), extStartSN)
}
return id
}
@@ -201,14 +202,14 @@ func (r *rtpStatsBaseLite) IsActive() bool {
r.lock.RLock()
defer r.lock.RUnlock()
return r.initialized && r.endTime.IsZero()
return r.initialized && r.endTime == 0
}
func (r *rtpStatsBaseLite) UpdateNack(nackCount uint32) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
if r.endTime != 0 {
return
}
@@ -219,7 +220,7 @@ func (r *rtpStatsBaseLite) UpdateNackProcessed(nackAckCount uint32, nackMissCoun
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
if r.endTime != 0 {
return
}
@@ -232,7 +233,7 @@ func (r *rtpStatsBaseLite) CheckAndUpdatePli(throttle int64, force bool) bool {
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() || (!force && time.Now().UnixNano()-r.lastPli.UnixNano() < throttle) {
if r.endTime != 0 || (!force && mono.UnixNano()-r.lastPli < throttle) {
return false
}
r.updatePliLocked(1)
@@ -244,7 +245,7 @@ func (r *rtpStatsBaseLite) UpdatePliAndTime(pliCount uint32) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
if r.endTime != 0 {
return
}
@@ -256,7 +257,7 @@ func (r *rtpStatsBaseLite) UpdatePli(pliCount uint32) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
if r.endTime != 0 {
return
}
@@ -271,7 +272,7 @@ func (r *rtpStatsBaseLite) UpdatePliTime() {
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
if r.endTime != 0 {
return
}
@@ -279,10 +280,10 @@ func (r *rtpStatsBaseLite) UpdatePliTime() {
}
func (r *rtpStatsBaseLite) updatePliTimeLocked() {
r.lastPli = time.Now()
r.lastPli = mono.UnixNano()
}
func (r *rtpStatsBaseLite) LastPli() time.Time {
func (r *rtpStatsBaseLite) LastPli() int64 {
r.lock.RLock()
defer r.lock.RUnlock()
@@ -322,15 +323,15 @@ func (r *rtpStatsBaseLite) deltaInfoLite(
"snapshotLiteNow", now,
"snapshotLiteThen", then,
"packetsExpected", packetsExpected,
"duration", endTime.Sub(startTime).String(),
"duration", time.Duration(endTime - startTime),
}
err = errors.New("too many packets expected in delta lite")
return
}
if packetsExpected == 0 {
deltaInfoLite = &RTPDeltaInfoLite{
StartTime: startTime,
EndTime: endTime,
StartTime: time.Unix(0, startTime),
EndTime: time.Unix(0, endTime),
}
return
}
@@ -346,14 +347,14 @@ func (r *rtpStatsBaseLite) deltaInfoLite(
"snapshotLiteThen", then,
"packetsExpected", packetsExpected,
"packetsLost", packetsLost,
"duration", endTime.Sub(startTime).String(),
"duration", time.Duration(endTime - startTime),
}
err = errors.New("unexpected number of packets lost in delta lite")
}
deltaInfoLite = &RTPDeltaInfoLite{
StartTime: startTime,
EndTime: endTime,
StartTime: time.Unix(0, startTime),
EndTime: time.Unix(0, endTime),
Packets: packetsExpected,
Bytes: now.bytes - then.bytes,
PacketsLost: packetsLost,
@@ -369,17 +370,17 @@ func (r *rtpStatsBaseLite) marshalLogObject(e zapcore.ObjectEncoder, packetsExpe
}
endTime := r.endTime
if endTime.IsZero() {
endTime = time.Now()
if endTime == 0 {
endTime = mono.UnixNano()
}
elapsed := endTime.Sub(r.startTime)
elapsed := time.Duration(endTime - r.startTime)
if elapsed == 0 {
return 0, errors.New("no time elapsed")
}
elapsedSeconds := elapsed.Seconds()
e.AddTime("startTime", r.startTime)
e.AddTime("endTime", r.endTime)
e.AddTime("startTime", time.Unix(0, r.startTime))
e.AddTime("endTime", time.Unix(0, r.endTime))
e.AddDuration("elapsed", elapsed)
e.AddUint64("packetsExpected", packetsExpected)
@@ -424,20 +425,20 @@ func (r *rtpStatsBaseLite) marshalLogObject(e zapcore.ObjectEncoder, packetsExpe
e.AddUint32("nackRepeated", r.nackRepeated)
e.AddUint32("plis", r.plis)
e.AddTime("lastPli", r.lastPli)
e.AddTime("lastPli", time.Unix(0, r.lastPli))
return elapsedSeconds, nil
}
func (r *rtpStatsBaseLite) toProto(packetsExpected, packetsSeenMinusPadding, packetsLost uint64) *livekit.RTPStats {
if r.startTime.IsZero() {
if r.startTime == 0 {
return nil
}
endTime := r.endTime
if endTime.IsZero() {
endTime = time.Now()
if endTime == 0 {
endTime = mono.UnixNano()
}
elapsed := endTime.Sub(r.startTime).Seconds()
elapsed := time.Duration(endTime - r.startTime).Seconds()
if elapsed == 0.0 {
return nil
}
@@ -452,8 +453,8 @@ func (r *rtpStatsBaseLite) toProto(packetsExpected, packetsSeenMinusPadding, pac
}
p := &livekit.RTPStats{
StartTime: timestamppb.New(r.startTime),
EndTime: timestamppb.New(endTime),
StartTime: timestamppb.New(time.Unix(0, r.startTime)),
EndTime: timestamppb.New(time.Unix(0, endTime)),
Duration: elapsed,
Packets: uint32(packetsSeenMinusPadding),
PacketRate: packetRate,
@@ -468,7 +469,7 @@ func (r *rtpStatsBaseLite) toProto(packetsExpected, packetsSeenMinusPadding, pac
NackMisses: r.nackMisses,
NackRepeated: r.nackRepeated,
Plis: r.plis,
LastPli: timestamppb.New(r.lastPli),
LastPli: timestamppb.New(time.Unix(0, r.lastPli)),
}
gapsPresent := false
@@ -508,7 +509,7 @@ func (r *rtpStatsBaseLite) getAndResetSnapshotLite(snapshotLiteID uint32, extSta
}
// snapshot now
now := r.getSnapshotLite(time.Now(), extHighestSN+1)
now := r.getSnapshotLite(mono.UnixNano(), extHighestSN+1)
r.snapshotLites[idx] = now
return &then, &now
}
@@ -526,7 +527,7 @@ func (r *rtpStatsBaseLite) updateGapHistogram(gap int) {
}
}
func (r *rtpStatsBaseLite) getSnapshotLite(startTime time.Time, extStartSN uint64) snapshotLite {
func (r *rtpStatsBaseLite) getSnapshotLite(startTime int64, extStartSN uint64) snapshotLite {
return snapshotLite{
isValid: true,
startTime: startTime,
@@ -540,7 +541,7 @@ func (r *rtpStatsBaseLite) getSnapshotLite(startTime time.Time, extStartSN uint6
// ----------------------------------
func initSnapshotLite(startTime time.Time, extStartSN uint64) snapshotLite {
func initSnapshotLite(startTime int64, extStartSN uint64) snapshotLite {
return snapshotLite{
isValid: true,
startTime: startTime,
+8 -7
View File
@@ -27,6 +27,7 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
protoutils "github.com/livekit/protocol/utils"
"github.com/livekit/protocol/utils/mono"
)
const (
@@ -137,7 +138,7 @@ func (r *RTPStatsReceiver) Update(
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
if r.endTime != 0 {
flowState.IsNotHandled = true
return
}
@@ -181,7 +182,7 @@ func (r *RTPStatsReceiver) Update(
r.initialized = true
r.startTime = time.Now()
r.startTime = mono.UnixNano()
r.firstTime = packetTime
r.highestTime = packetTime
@@ -275,14 +276,13 @@ func (r *RTPStatsReceiver) Update(
}
if r.isInRange(resSN.ExtendedVal, resSN.PreExtendedHighest) {
if r.history.IsSet(resSN.ExtendedVal) {
if r.history.GetAndSet(resSN.ExtendedVal) {
r.bytesDuplicate += pktSize
r.headerBytesDuplicate += uint64(hdrSize)
r.packetsDuplicate++
flowState.IsDuplicate = true
} else {
r.packetsLost--
r.history.Set(resSN.ExtendedVal)
}
}
@@ -461,14 +461,15 @@ func (r *RTPStatsReceiver) checkRTPClockSkewAgainstMediaPathForSenderReport(srDa
return
}
timeSinceSR := time.Since(time.Unix(0, srData.AtAdjusted))
nowNano := mono.UnixNano()
timeSinceSR := time.Duration(nowNano - srData.AtAdjusted)
extNowTSSR := srData.RtpTimestampExt + uint64(timeSinceSR.Nanoseconds()*int64(r.params.ClockRate)/1e9)
timeSinceHighest := time.Since(time.Unix(0, r.highestTime))
timeSinceHighest := time.Duration(nowNano - r.highestTime)
extNowTSHighest := r.timestamp.GetExtendedHighest() + uint64(timeSinceHighest.Nanoseconds()*int64(r.params.ClockRate)/1e9)
diffHighest := extNowTSSR - extNowTSHighest
timeSinceFirst := time.Since(time.Unix(0, r.firstTime))
timeSinceFirst := time.Duration(nowNano - r.firstTime)
extNowTSFirst := r.timestamp.GetExtendedStart() + uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9)
diffFirst := extNowTSSR - extNowTSFirst
+3 -4
View File
@@ -15,12 +15,11 @@
package rtpstats
import (
"time"
"go.uber.org/zap/zapcore"
"github.com/livekit/livekit-server/pkg/sfu/utils"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/utils/mono"
)
type RTPFlowStateLite struct {
@@ -70,7 +69,7 @@ func (r *RTPStatsReceiverLite) Update(packetTime int64, packetSize int, sequence
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
if r.endTime != 0 {
flowStateLite.IsNotHandled = true
return
}
@@ -79,7 +78,7 @@ func (r *RTPStatsReceiverLite) Update(packetTime int64, packetSize int, sequence
if !r.initialized {
r.initialized = true
r.startTime = time.Now()
r.startTime = mono.UnixNano()
resSN = r.sequenceNumber.Update(sequenceNumber)
+35 -34
View File
@@ -122,7 +122,7 @@ func (w wrappedReceptionReportsLogger) MarshalLogObject(e zapcore.ObjectEncoder)
type senderSnapshot struct {
isValid bool
startTime time.Time
startTime int64
extStartSN uint64
bytes uint64
@@ -164,7 +164,7 @@ func (s *senderSnapshot) MarshalLogObject(e zapcore.ObjectEncoder) error {
}
e.AddBool("isValid", s.isValid)
e.AddTime("startTime", s.startTime)
e.AddTime("startTime", time.Unix(0, s.startTime))
e.AddUint64("extStartSN", s.extStartSN)
e.AddUint64("bytes", s.bytes)
e.AddUint64("headerBytes", s.headerBytes)
@@ -209,7 +209,7 @@ type RTPStatsSender struct {
rttMarker rttMarker
lastRRTime time.Time
lastRRTime int64
lastRR rtcp.ReceptionReport
extStartTS uint64
@@ -301,7 +301,7 @@ func (r *RTPStatsSender) NewSenderSnapshotId() uint32 {
}
if r.initialized {
r.senderSnapshots[id-cFirstSnapshotID] = initSenderSnapshot(time.Now(), r.extHighestSN)
r.senderSnapshots[id-cFirstSnapshotID] = initSenderSnapshot(mono.UnixNano(), r.extHighestSN)
}
return id
}
@@ -319,7 +319,7 @@ func (r *RTPStatsSender) Update(
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
if r.endTime != 0 {
return
}
@@ -331,7 +331,7 @@ func (r *RTPStatsSender) Update(
r.initialized = true
r.startTime = time.Now()
r.startTime = mono.UnixNano()
r.highestTime = packetTime
@@ -515,7 +515,7 @@ func (r *RTPStatsSender) UpdateLayerLockPliAndTime(pliCount uint32) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
if r.endTime != 0 {
return
}
@@ -534,12 +534,12 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt
r.lock.Lock()
defer r.lock.Unlock()
if !r.initialized || !r.endTime.IsZero() {
if !r.initialized || r.endTime != 0 {
return
}
extHighestSNFromRR := r.extHighestSNFromRR&0xFFFF_FFFF_0000_0000 + uint64(rr.LastSequenceNumber)
if !r.lastRRTime.IsZero() {
if r.lastRRTime != 0 {
if (rr.LastSequenceNumber-r.lastRR.LastSequenceNumber) < (1<<31) && rr.LastSequenceNumber < r.lastRR.LastSequenceNumber {
extHighestSNFromRR += (1 << 32)
}
@@ -550,10 +550,10 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt
return
}
if !r.lastRRTime.IsZero() && r.extHighestSNFromRR > extHighestSNFromRR {
if r.lastRRTime != 0 && r.extHighestSNFromRR > extHighestSNFromRR {
r.logger.Debugw(
fmt.Sprintf("receiver report potentially out of order, highestSN: existing: %d, received: %d", r.extHighestSNFromRR, extHighestSNFromRR),
"sinceLastRR", time.Since(r.lastRRTime),
"sinceLastRR", time.Duration(mono.UnixNano()-r.lastRRTime),
"receivedRR", rr,
"rtpStats", lockedRTPStatsSenderLogEncoder{r},
)
@@ -609,9 +609,9 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt
}
if int64(extReceivedRRSN-s.extLastRRSN) < 0 || (extReceivedRRSN-s.extLastRRSN) > (1<<15) {
timeSinceLastRR := time.Since(r.lastRRTime)
if r.lastRRTime.IsZero() {
timeSinceLastRR = time.Since(r.startTime)
timeSinceLastRR := time.Duration(mono.UnixNano() - r.lastRRTime)
if r.lastRRTime == 0 {
timeSinceLastRR = time.Duration(mono.UnixNano() - r.startTime)
}
r.logger.Infow(
"rr interval too big, skipping",
@@ -631,9 +631,9 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt
eis := &s.intervalStats
eis.aggregate(&is)
if is.packetsNotFoundMetadata != 0 {
timeSinceLastRR := time.Since(r.lastRRTime)
if r.lastRRTime.IsZero() {
timeSinceLastRR = time.Since(r.startTime)
timeSinceLastRR := time.Duration(mono.UnixNano() - r.lastRRTime)
if r.lastRRTime == 0 {
timeSinceLastRR = time.Duration(mono.UnixNano() - r.startTime)
}
r.metadataCacheOverflowCount++
if (r.metadataCacheOverflowCount-1)%10 == 0 {
@@ -654,12 +654,12 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt
s.processedReceptionReports = append(s.processedReceptionReports, rr)
}
r.lastRRTime = time.Now()
r.lastRRTime = mono.UnixNano()
r.lastRR = rr
return
}
func (r *RTPStatsSender) LastReceiverReportTime() time.Time {
func (r *RTPStatsSender) LastReceiverReportTime() int64 {
r.lock.RLock()
defer r.lock.RUnlock()
@@ -715,7 +715,7 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *livek
nowNTP = mediatransportutil.NtpTime(publisherSRData.NtpTimestamp)
nowRTPExt = publisherSRData.RtpTimestampExt - tsOffset
} else {
timeSincePublisherSRAdjusted := time.Since(time.Unix(0, publisherSRData.AtAdjusted))
timeSincePublisherSRAdjusted := time.Duration(mono.UnixNano() - publisherSRData.AtAdjusted)
reportTimeAdjusted = publisherSRData.AtAdjusted + timeSincePublisherSRAdjusted.Nanoseconds()
reportTime = reportTimeAdjusted
@@ -736,17 +736,18 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *livek
}
ulgr := func() logger.UnlikelyLogger {
nowNano := mono.UnixNano()
return r.logger.WithUnlikelyValues(
"curr", WrappedRTCPSenderReportStateLogger{srData},
"feed", WrappedRTCPSenderReportStateLogger{publisherSRData},
"tsOffset", tsOffset,
"timeNow", time.Now(),
"timeNow", mono.Now(),
"reportTime", time.Unix(0, reportTime),
"reportTimeAdjusted", time.Unix(0, reportTimeAdjusted),
"timeSinceHighest", time.Since(time.Unix(0, r.highestTime)),
"timeSinceFirst", time.Since(time.Unix(0, r.firstTime)),
"timeSincePublisherSRAdjusted", time.Since(time.Unix(0, publisherSRData.AtAdjusted)),
"timeSincePublisherSR", time.Since(time.Unix(0, publisherSRData.At)),
"timeSinceHighest", time.Duration(nowNano-r.highestTime),
"timeSinceFirst", time.Duration(nowNano-r.firstTime),
"timeSincePublisherSRAdjusted", time.Duration(nowNano-publisherSRData.AtAdjusted),
"timeSincePublisherSR", time.Duration(nowNano-publisherSRData.At),
"nowRTPExt", nowRTPExt,
"rtpStats", lockedRTPStatsSenderLogEncoder{r},
)
@@ -815,7 +816,7 @@ func (r *RTPStatsSender) DeltaInfo(snapshotID uint32) *RTPDeltaInfo {
func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo {
r.lock.Lock()
defer r.lock.Unlock()
if r.lastRRTime.IsZero() {
if r.lastRRTime == 0 {
return nil
}
@@ -835,7 +836,7 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo
"senderSnapshotNow", now,
"senderSnapshotThen", then,
"packetsExpected", packetsExpected,
"duration", endTime.Sub(startTime),
"duration", time.Duration(endTime-startTime),
"rtpStats", lockedRTPStatsSenderLogEncoder{r},
)
return nil
@@ -862,7 +863,7 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo
"packetsExpected", packetsExpected,
"packetsLost", packetsLost,
"packetsLostFeed", packetsLostFeed,
"duration", endTime.Sub(startTime),
"duration", time.Duration(endTime-startTime),
"rtpStats", lockedRTPStatsSenderLogEncoder{r},
)
packetsLost = packetsExpected
@@ -876,8 +877,8 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo
maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6
return &RTPDeltaInfo{
StartTime: startTime,
EndTime: endTime,
StartTime: time.Unix(0, startTime),
EndTime: time.Unix(0, endTime),
Packets: packetsExpected - uint32(now.packetsPadding-then.packetsPadding),
Bytes: now.bytes - then.bytes,
HeaderBytes: now.headerBytes - then.headerBytes,
@@ -933,7 +934,7 @@ func (r *RTPStatsSender) ToProto() *livekit.RTPStats {
}
func (r *RTPStatsSender) getAndResetSenderSnapshot(senderSnapshotID uint32) (*senderSnapshot, *senderSnapshot) {
if !r.initialized || r.lastRRTime.IsZero() {
if !r.initialized || r.lastRRTime == 0 {
return nil, nil
}
@@ -950,7 +951,7 @@ func (r *RTPStatsSender) getAndResetSenderSnapshot(senderSnapshotID uint32) (*se
return &then, &now
}
func (r *RTPStatsSender) getSenderSnapshot(startTime time.Time, s *senderSnapshot) senderSnapshot {
func (r *RTPStatsSender) getSenderSnapshot(startTime int64, s *senderSnapshot) senderSnapshot {
if s == nil {
return senderSnapshot{}
}
@@ -1118,7 +1119,7 @@ func (r lockedRTPStatsSenderLogEncoder) MarshalLogObject(e zapcore.ObjectEncoder
e.AddUint64("extStartTS", r.extStartTS)
e.AddUint64("extHighestTS", r.extHighestTS)
e.AddTime("lastRRTime", r.lastRRTime)
e.AddTime("lastRRTime", time.Unix(0, r.lastRRTime))
e.AddReflected("lastRR", r.lastRR)
e.AddUint64("extHighestSNFromRR", r.extHighestSNFromRR)
e.AddUint64("packetsLostFromRR", r.packetsLostFromRR)
@@ -1136,7 +1137,7 @@ func (r lockedRTPStatsSenderLogEncoder) MarshalLogObject(e zapcore.ObjectEncoder
// -------------------------------------------------------------------
func initSenderSnapshot(startTime time.Time, extStartSN uint64) senderSnapshot {
func initSenderSnapshot(startTime int64, extStartSN uint64) senderSnapshot {
return senderSnapshot{
isValid: true,
startTime: startTime,
+3 -4
View File
@@ -15,9 +15,8 @@
package rtpstats
import (
"time"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/utils/mono"
"go.uber.org/zap/zapcore"
)
@@ -38,14 +37,14 @@ func (r *RTPStatsSenderLite) Update(packetTime int64, packetSize int, extSequenc
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
if r.endTime != 0 {
return
}
if !r.initialized {
r.initialized = true
r.startTime = time.Now()
r.startTime = mono.UnixNano()
r.extStartSN = extSequenceNumber
r.extHighestSN = extSequenceNumber - 1
+1 -1
View File
@@ -120,7 +120,7 @@ func (w *WrapAround[T, ET]) UndoUpdate(result WrapAroundUpdateResult[ET]) {
}
func (w *WrapAround[T, ET]) Rollover(val T, numCycles int) (result WrapAroundUpdateResult[ET]) {
if !w.initialized || numCycles < 0 {
if numCycles < 0 || !w.initialized {
return w.Update(val)
}