diff --git a/go.mod b/go.mod index 9afa36623..9ca9921c1 100644 --- a/go.mod +++ b/go.mod @@ -19,8 +19,8 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20240302142739-1c3dd691a1b8 - github.com/livekit/protocol v1.12.1-0.20240331203140-766ababa37ae - github.com/livekit/psrpc v0.5.3-0.20240312110212-61ab09477c30 + github.com/livekit/protocol v1.12.1-0.20240403063258-fc68e8d82b26 + github.com/livekit/psrpc v0.5.3-0.20240327035954-cec3a0e614be github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.8.1 @@ -79,7 +79,7 @@ require ( github.com/mattn/go-runewidth v0.0.9 // indirect github.com/mdlayher/netlink v1.7.1 // indirect github.com/mdlayher/socket v0.4.0 // indirect - github.com/nats-io/nats.go v1.33.1 // indirect + github.com/nats-io/nats.go v1.34.0 // indirect github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/pion/datachannel v1.5.5 // indirect @@ -104,7 +104,7 @@ require ( golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/tools v0.19.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240311173647-c811ad7063a7 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect google.golang.org/grpc v1.62.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 1c6c09cb8..fb236c8f8 100644 --- a/go.sum +++ b/go.sum @@ -132,10 +132,10 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240302142739-1c3dd691a1b8 h1:xawydPEACNO5Ncs2LgioTjWghXQ0eUN1q1RnVUUyVnI= github.com/livekit/mediatransportutil v0.0.0-20240302142739-1c3dd691a1b8/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.12.1-0.20240331203140-766ababa37ae h1:uUc+ou3R6xXkrIRNMYHraGTYhCSs6D4p9++Vwq8CK0E= -github.com/livekit/protocol v1.12.1-0.20240331203140-766ababa37ae/go.mod h1:mcv7L2DWB6iRckI++egmwFU7YEy3W+aLHnusNhioqi0= -github.com/livekit/psrpc v0.5.3-0.20240312110212-61ab09477c30 h1:3GEU6vP+KLTTOEqsFKW+PgIUp+i+s0jaUqogQc/hb7M= -github.com/livekit/psrpc v0.5.3-0.20240312110212-61ab09477c30/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= +github.com/livekit/protocol v1.12.1-0.20240403063258-fc68e8d82b26 h1:8Oog+IH4NVbq7PEXYU7bIW8Kw4rqd0HgtWaOh++9ZcE= +github.com/livekit/protocol v1.12.1-0.20240403063258-fc68e8d82b26/go.mod h1:mcv7L2DWB6iRckI++egmwFU7YEy3W+aLHnusNhioqi0= +github.com/livekit/psrpc v0.5.3-0.20240327035954-cec3a0e614be h1:W1nCFZ19rYAORMBNX82NeVPHjADN0UyORr6refbUXpU= +github.com/livekit/psrpc v0.5.3-0.20240327035954-cec3a0e614be/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= github.com/mackerelio/go-osstat v0.2.4/go.mod h1:Zy+qzGdZs3A9cuIqmgbJvwbmLQH9dJvtio5ZjJTbdlQ= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= @@ -165,8 +165,8 @@ github.com/mdlayher/socket v0.4.0 h1:280wsy40IC9M9q1uPGcLBwXpcTQDtoGwVt+BNoITxIw github.com/mdlayher/socket v0.4.0/go.mod h1:xxFqz5GRCUN3UEOm9CZqEJsAbe1C8OwSK46NlmWuVoc= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= -github.com/nats-io/nats.go v1.33.1 h1:8TxLZZ/seeEfR97qV0/Bl939tpDnt2Z2fK3HkPypj70= -github.com/nats-io/nats.go v1.33.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nats.go v1.34.0 h1:fnxnPCNiwIG5w08rlMcEKTUw4AV/nKyGCOJE8TdhSPk= +github.com/nats-io/nats.go v1.34.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= @@ -440,8 +440,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240311173647-c811ad7063a7 h1:8EeVk1VKMD+GD/neyEHGmz7pFblqPjHoi+PGQIlLx2s= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240311173647-c811ad7063a7/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda h1:LI5DOvAxUPMv/50agcLLoo+AdWc1irS9Rzz4vPuD1V4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk= google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index b533318da..8cfa7e663 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -793,14 +793,16 @@ func (b *Buffer) mayGrowBucket() { return } oldCap := cap - deltaInfo := b.rtpStats.DeltaInfo(b.ppsSnapshotId) - if deltaInfo != nil && deltaInfo.Duration > 500*time.Millisecond { - pps := int(time.Duration(deltaInfo.Packets) * time.Second / deltaInfo.Duration) - for pps > cap && cap < maxPkts { - cap = b.bucket.Grow() - } - if cap > oldCap { - b.logger.Debugw("grow bucket", "from", oldCap, "to", cap, "pps", pps) + if deltaInfo := b.rtpStats.DeltaInfo(b.ppsSnapshotId); deltaInfo != nil { + duration := deltaInfo.EndTime.Sub(deltaInfo.StartTime) + if duration > 500*time.Millisecond { + pps := int(time.Duration(deltaInfo.Packets) * time.Second / duration) + for pps > cap && cap < maxPkts { + cap = b.bucket.Grow() + } + if cap > oldCap { + b.logger.Debugw("grow bucket", "from", oldCap, "to", cap, "pps", pps) + } } } } diff --git a/pkg/sfu/buffer/rtpstats_base.go b/pkg/sfu/buffer/rtpstats_base.go index 6d6203c17..77586d38b 100644 --- a/pkg/sfu/buffer/rtpstats_base.go +++ b/pkg/sfu/buffer/rtpstats_base.go @@ -55,7 +55,7 @@ func RTPDriftToString(r *livekit.RTPDrift) string { type RTPDeltaInfo struct { StartTime time.Time - Duration time.Duration + EndTime time.Time Packets uint32 Bytes uint64 HeaderBytes uint64 @@ -572,7 +572,7 @@ func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighes if packetsExpected == 0 { return &RTPDeltaInfo{ StartTime: startTime, - Duration: endTime.Sub(startTime), + EndTime: endTime, } } @@ -592,7 +592,7 @@ func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighes return &RTPDeltaInfo{ StartTime: startTime, - Duration: endTime.Sub(startTime), + EndTime: endTime, Packets: uint32(packetsExpected), Bytes: now.bytes - then.bytes, HeaderBytes: now.headerBytes - then.headerBytes, @@ -997,9 +997,8 @@ func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo { startTime = deltaInfo.StartTime } - endedAt := deltaInfo.StartTime.Add(deltaInfo.Duration) - if endTime.IsZero() || endTime.Before(endedAt) { - endTime = endedAt + if endTime.IsZero() || endTime.Before(deltaInfo.EndTime) { + endTime = deltaInfo.EndTime } packets += deltaInfo.Packets @@ -1038,7 +1037,7 @@ func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo { return &RTPDeltaInfo{ StartTime: startTime, - Duration: endTime.Sub(startTime), + EndTime: endTime, Packets: packets, Bytes: bytes, HeaderBytes: headerBytes, diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index b7cc94c06..b85efb829 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -776,7 +776,7 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo return &RTPDeltaInfo{ StartTime: startTime, - Duration: endTime.Sub(startTime), + EndTime: endTime, Packets: packetsExpected - uint32(now.packetsPadding-then.packetsPadding), Bytes: now.bytes - then.bytes, HeaderBytes: now.headerBytes - then.headerBytes, diff --git a/pkg/sfu/connectionquality/connectionstats.go b/pkg/sfu/connectionquality/connectionstats.go index c221ea130..6c955dc71 100644 --- a/pkg/sfu/connectionquality/connectionstats.go +++ b/pkg/sfu/connectionquality/connectionstats.go @@ -22,6 +22,7 @@ import ( "github.com/frostbyte73/core" "github.com/pion/webrtc/v3" "go.uber.org/atomic" + "google.golang.org/protobuf/types/known/timestamppb" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -205,7 +206,7 @@ func (cs *ConnectionStats) updateScoreWithAggregate(agg *buffer.RTPDeltaInfo, at var stat windowStat if agg != nil { stat.startedAt = agg.StartTime - stat.duration = agg.Duration + stat.duration = agg.EndTime.Sub(agg.StartTime) stat.packetsExpected = agg.Packets + agg.PacketsPadding stat.packetsLost = agg.PacketsLost stat.packetsMissing = agg.PacketsMissing @@ -263,7 +264,6 @@ func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) (float32, } if streamingStartedAt.After(agg.StartTime) { - agg.Duration = agg.StartTime.Add(agg.Duration).Sub(streamingStartedAt) agg.StartTime = streamingStartedAt } return cs.updateScoreWithAggregate(agg, at), streams @@ -428,6 +428,8 @@ func toAnalyticsStream(ssrc uint32, deltaStats *buffer.RTPDeltaInfo) *livekit.An packetsLost -= deltaStats.PacketsMissing } return &livekit.AnalyticsStream{ + StartTime: timestamppb.New(deltaStats.StartTime), + EndTime: timestamppb.New(deltaStats.EndTime), Ssrc: ssrc, PrimaryPackets: deltaStats.Packets, PrimaryBytes: deltaStats.Bytes, diff --git a/pkg/sfu/connectionquality/connectionstats_test.go b/pkg/sfu/connectionquality/connectionstats_test.go index 569238a40..c34a6d2ce 100644 --- a/pkg/sfu/connectionquality/connectionstats_test.go +++ b/pkg/sfu/connectionquality/connectionstats_test.go @@ -75,7 +75,7 @@ func TestConnectionQuality(t *testing.T) { 1: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, - Duration: duration, + EndTime: now.Add(duration), Packets: 250, }, }, @@ -91,7 +91,7 @@ func TestConnectionQuality(t *testing.T) { 1: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, - Duration: duration, + EndTime: now.Add(duration), Packets: 120, PacketsLost: 30, }, @@ -99,7 +99,7 @@ func TestConnectionQuality(t *testing.T) { 2: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, - Duration: duration, + EndTime: now.Add(duration), Packets: 130, PacketsLost: 0, }, @@ -118,7 +118,7 @@ func TestConnectionQuality(t *testing.T) { 1: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, - Duration: duration, + EndTime: now.Add(duration), Packets: 250, }, }, @@ -134,7 +134,7 @@ func TestConnectionQuality(t *testing.T) { 1: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, - Duration: duration, + EndTime: now.Add(duration), Packets: 250, }, }, @@ -150,7 +150,7 @@ func TestConnectionQuality(t *testing.T) { 1: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, - Duration: duration, + EndTime: now.Add(duration), Packets: 250, }, }, @@ -166,7 +166,7 @@ func TestConnectionQuality(t *testing.T) { 1: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, - Duration: duration, + EndTime: now.Add(duration), Packets: 250, PacketsLost: 13, }, @@ -183,7 +183,7 @@ func TestConnectionQuality(t *testing.T) { 1: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, - Duration: duration, + EndTime: now.Add(duration), Packets: 250, }, }, @@ -199,7 +199,7 @@ func TestConnectionQuality(t *testing.T) { 1: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, - Duration: duration, + EndTime: now.Add(duration), Packets: 250, }, }, @@ -215,7 +215,7 @@ func TestConnectionQuality(t *testing.T) { 1: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, - Duration: duration, + EndTime: now.Add(duration), Packets: 250, PacketsLost: 30, }, @@ -240,7 +240,7 @@ func TestConnectionQuality(t *testing.T) { 1: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, - Duration: duration, + EndTime: now.Add(duration), Packets: 0, }, }, @@ -256,7 +256,7 @@ func TestConnectionQuality(t *testing.T) { 1: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, - Duration: duration, + EndTime: now.Add(duration), Packets: 0, }, }, @@ -281,7 +281,7 @@ func TestConnectionQuality(t *testing.T) { 1: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, - Duration: duration, + EndTime: now.Add(duration), Packets: 250, PacketsLost: 0, }, @@ -301,7 +301,7 @@ func TestConnectionQuality(t *testing.T) { 1: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, - Duration: duration, + EndTime: now.Add(duration), Packets: 50, PacketsLost: 5, }, @@ -323,7 +323,7 @@ func TestConnectionQuality(t *testing.T) { 1: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, - Duration: duration, + EndTime: now.Add(duration), Packets: 250, PacketsLost: 5, RttMax: 400, @@ -349,7 +349,7 @@ func TestConnectionQuality(t *testing.T) { 1: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, - Duration: duration, + EndTime: now.Add(duration), Packets: 250, Bytes: 8_000_000 / 8 / 5, }, @@ -368,7 +368,7 @@ func TestConnectionQuality(t *testing.T) { 1: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, - Duration: duration, + EndTime: now.Add(duration), Packets: 250, Bytes: 8_000_000 / 8 / 5, }, @@ -392,7 +392,7 @@ func TestConnectionQuality(t *testing.T) { 1: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, - Duration: duration, + EndTime: now.Add(duration), Packets: 250, Bytes: 8_000_000 / 8 / 5, }, @@ -419,7 +419,7 @@ func TestConnectionQuality(t *testing.T) { 1: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, - Duration: duration, + EndTime: now.Add(duration), Packets: 250, Bytes: 8_000_000 / 8 / 5, }, @@ -453,7 +453,7 @@ func TestConnectionQuality(t *testing.T) { 1: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, - Duration: duration, + EndTime: now.Add(duration), Packets: 250, PacketsLost: 5, RttMax: 700, @@ -488,7 +488,7 @@ func TestConnectionQuality(t *testing.T) { 1: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, - Duration: duration, + EndTime: now.Add(duration), Packets: 250, PacketsLost: 5, JitterMax: 200, @@ -657,7 +657,7 @@ func TestConnectionQuality(t *testing.T) { 123: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, - Duration: duration, + EndTime: now.Add(duration), Packets: tc.packetsExpected, PacketsLost: uint32(math.Ceil(eq.packetLossPercentage * float64(tc.packetsExpected) / 100.0)), }, @@ -761,7 +761,7 @@ func TestConnectionQuality(t *testing.T) { 123: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, - Duration: duration, + EndTime: now.Add(duration), Packets: 100, Bytes: tc.bytes, }, @@ -855,7 +855,7 @@ func TestConnectionQuality(t *testing.T) { 123: { RTPStats: &buffer.RTPDeltaInfo{ StartTime: now, - Duration: duration, + EndTime: now.Add(duration), Packets: 200, }, }, diff --git a/pkg/sfu/connectionquality/scorer.go b/pkg/sfu/connectionquality/scorer.go index 4ed30faa3..b38355c06 100644 --- a/pkg/sfu/connectionquality/scorer.go +++ b/pkg/sfu/connectionquality/scorer.go @@ -505,7 +505,7 @@ func (q *qualityScorer) isPaused() bool { } func (q *qualityScorer) getPacketLossWeight(stat *windowStat) float64 { - if stat == nil || stat.duration == 0 { + if stat == nil || stat.duration <= 0 { return q.params.PacketLossWeight } diff --git a/pkg/telemetry/statsworker.go b/pkg/telemetry/statsworker.go index d48a6d6d5..18bde9e55 100644 --- a/pkg/telemetry/statsworker.go +++ b/pkg/telemetry/statsworker.go @@ -158,6 +158,8 @@ func coalesce(stats []*livekit.AnalyticsStat) *livekit.AnalyticsStat { } // find aggregates across streams + startTime := time.Time{} + endTime := time.Time{} scoreSum := float32(0.0) // used for average minScore := float32(0.0) // min score in batched stats var scores []float32 // used for median @@ -183,6 +185,16 @@ func coalesce(stats []*livekit.AnalyticsStat) *livekit.AnalyticsStat { } for _, analyticsStream := range stat.Streams { + start := analyticsStream.StartTime.AsTime() + if startTime.IsZero() || startTime.After(start) { + startTime = start + } + + end := analyticsStream.EndTime.AsTime() + if endTime.IsZero() || endTime.Before(end) { + endTime = end + } + if analyticsStream.Rtt > maxRtt { maxRtt = analyticsStream.Rtt } @@ -216,6 +228,8 @@ func coalesce(stats []*livekit.AnalyticsStat) *livekit.AnalyticsStat { } } } + coalescedStream.StartTime = timestamppb.New(startTime) + coalescedStream.EndTime = timestamppb.New(endTime) coalescedStream.Rtt = maxRtt coalescedStream.Jitter = maxJitter