Add start/end time to AnalyticsStream. (#2618)

* Add start/end time to AnalyticsStream.

* fix test
This commit is contained in:
Raja Subramanian
2024-04-03 12:23:18 +05:30
committed by GitHub
parent 1caa6ff6d0
commit 63b1fba082
9 changed files with 72 additions and 55 deletions
+4 -4
View File
@@ -19,8 +19,8 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20240302142739-1c3dd691a1b8
github.com/livekit/protocol v1.12.1-0.20240331203140-766ababa37ae
github.com/livekit/psrpc v0.5.3-0.20240312110212-61ab09477c30
github.com/livekit/protocol v1.12.1-0.20240403063258-fc68e8d82b26
github.com/livekit/psrpc v0.5.3-0.20240327035954-cec3a0e614be
github.com/mackerelio/go-osstat v0.2.4
github.com/magefile/mage v1.15.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.8.1
@@ -79,7 +79,7 @@ require (
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/mdlayher/netlink v1.7.1 // indirect
github.com/mdlayher/socket v0.4.0 // indirect
github.com/nats-io/nats.go v1.33.1 // indirect
github.com/nats-io/nats.go v1.34.0 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pion/datachannel v1.5.5 // indirect
@@ -104,7 +104,7 @@ require (
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.19.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240311173647-c811ad7063a7 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect
google.golang.org/grpc v1.62.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
+8 -8
View File
@@ -132,10 +132,10 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240302142739-1c3dd691a1b8 h1:xawydPEACNO5Ncs2LgioTjWghXQ0eUN1q1RnVUUyVnI=
github.com/livekit/mediatransportutil v0.0.0-20240302142739-1c3dd691a1b8/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.12.1-0.20240331203140-766ababa37ae h1:uUc+ou3R6xXkrIRNMYHraGTYhCSs6D4p9++Vwq8CK0E=
github.com/livekit/protocol v1.12.1-0.20240331203140-766ababa37ae/go.mod h1:mcv7L2DWB6iRckI++egmwFU7YEy3W+aLHnusNhioqi0=
github.com/livekit/psrpc v0.5.3-0.20240312110212-61ab09477c30 h1:3GEU6vP+KLTTOEqsFKW+PgIUp+i+s0jaUqogQc/hb7M=
github.com/livekit/psrpc v0.5.3-0.20240312110212-61ab09477c30/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/livekit/protocol v1.12.1-0.20240403063258-fc68e8d82b26 h1:8Oog+IH4NVbq7PEXYU7bIW8Kw4rqd0HgtWaOh++9ZcE=
github.com/livekit/protocol v1.12.1-0.20240403063258-fc68e8d82b26/go.mod h1:mcv7L2DWB6iRckI++egmwFU7YEy3W+aLHnusNhioqi0=
github.com/livekit/psrpc v0.5.3-0.20240327035954-cec3a0e614be h1:W1nCFZ19rYAORMBNX82NeVPHjADN0UyORr6refbUXpU=
github.com/livekit/psrpc v0.5.3-0.20240327035954-cec3a0e614be/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
github.com/mackerelio/go-osstat v0.2.4/go.mod h1:Zy+qzGdZs3A9cuIqmgbJvwbmLQH9dJvtio5ZjJTbdlQ=
github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg=
@@ -165,8 +165,8 @@ github.com/mdlayher/socket v0.4.0 h1:280wsy40IC9M9q1uPGcLBwXpcTQDtoGwVt+BNoITxIw
github.com/mdlayher/socket v0.4.0/go.mod h1:xxFqz5GRCUN3UEOm9CZqEJsAbe1C8OwSK46NlmWuVoc=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/nats-io/nats.go v1.33.1 h1:8TxLZZ/seeEfR97qV0/Bl939tpDnt2Z2fK3HkPypj70=
github.com/nats-io/nats.go v1.33.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nats.go v1.34.0 h1:fnxnPCNiwIG5w08rlMcEKTUw4AV/nKyGCOJE8TdhSPk=
github.com/nats-io/nats.go v1.34.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
@@ -440,8 +440,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240311173647-c811ad7063a7 h1:8EeVk1VKMD+GD/neyEHGmz7pFblqPjHoi+PGQIlLx2s=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240311173647-c811ad7063a7/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda h1:LI5DOvAxUPMv/50agcLLoo+AdWc1irS9Rzz4vPuD1V4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk=
google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
+10 -8
View File
@@ -793,14 +793,16 @@ func (b *Buffer) mayGrowBucket() {
return
}
oldCap := cap
deltaInfo := b.rtpStats.DeltaInfo(b.ppsSnapshotId)
if deltaInfo != nil && deltaInfo.Duration > 500*time.Millisecond {
pps := int(time.Duration(deltaInfo.Packets) * time.Second / deltaInfo.Duration)
for pps > cap && cap < maxPkts {
cap = b.bucket.Grow()
}
if cap > oldCap {
b.logger.Debugw("grow bucket", "from", oldCap, "to", cap, "pps", pps)
if deltaInfo := b.rtpStats.DeltaInfo(b.ppsSnapshotId); deltaInfo != nil {
duration := deltaInfo.EndTime.Sub(deltaInfo.StartTime)
if duration > 500*time.Millisecond {
pps := int(time.Duration(deltaInfo.Packets) * time.Second / duration)
for pps > cap && cap < maxPkts {
cap = b.bucket.Grow()
}
if cap > oldCap {
b.logger.Debugw("grow bucket", "from", oldCap, "to", cap, "pps", pps)
}
}
}
}
+6 -7
View File
@@ -55,7 +55,7 @@ func RTPDriftToString(r *livekit.RTPDrift) string {
type RTPDeltaInfo struct {
StartTime time.Time
Duration time.Duration
EndTime time.Time
Packets uint32
Bytes uint64
HeaderBytes uint64
@@ -572,7 +572,7 @@ func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighes
if packetsExpected == 0 {
return &RTPDeltaInfo{
StartTime: startTime,
Duration: endTime.Sub(startTime),
EndTime: endTime,
}
}
@@ -592,7 +592,7 @@ func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighes
return &RTPDeltaInfo{
StartTime: startTime,
Duration: endTime.Sub(startTime),
EndTime: endTime,
Packets: uint32(packetsExpected),
Bytes: now.bytes - then.bytes,
HeaderBytes: now.headerBytes - then.headerBytes,
@@ -997,9 +997,8 @@ func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo {
startTime = deltaInfo.StartTime
}
endedAt := deltaInfo.StartTime.Add(deltaInfo.Duration)
if endTime.IsZero() || endTime.Before(endedAt) {
endTime = endedAt
if endTime.IsZero() || endTime.Before(deltaInfo.EndTime) {
endTime = deltaInfo.EndTime
}
packets += deltaInfo.Packets
@@ -1038,7 +1037,7 @@ func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo {
return &RTPDeltaInfo{
StartTime: startTime,
Duration: endTime.Sub(startTime),
EndTime: endTime,
Packets: packets,
Bytes: bytes,
HeaderBytes: headerBytes,
+1 -1
View File
@@ -776,7 +776,7 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo
return &RTPDeltaInfo{
StartTime: startTime,
Duration: endTime.Sub(startTime),
EndTime: endTime,
Packets: packetsExpected - uint32(now.packetsPadding-then.packetsPadding),
Bytes: now.bytes - then.bytes,
HeaderBytes: now.headerBytes - then.headerBytes,
+4 -2
View File
@@ -22,6 +22,7 @@ import (
"github.com/frostbyte73/core"
"github.com/pion/webrtc/v3"
"go.uber.org/atomic"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
@@ -205,7 +206,7 @@ func (cs *ConnectionStats) updateScoreWithAggregate(agg *buffer.RTPDeltaInfo, at
var stat windowStat
if agg != nil {
stat.startedAt = agg.StartTime
stat.duration = agg.Duration
stat.duration = agg.EndTime.Sub(agg.StartTime)
stat.packetsExpected = agg.Packets + agg.PacketsPadding
stat.packetsLost = agg.PacketsLost
stat.packetsMissing = agg.PacketsMissing
@@ -263,7 +264,6 @@ func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) (float32,
}
if streamingStartedAt.After(agg.StartTime) {
agg.Duration = agg.StartTime.Add(agg.Duration).Sub(streamingStartedAt)
agg.StartTime = streamingStartedAt
}
return cs.updateScoreWithAggregate(agg, at), streams
@@ -428,6 +428,8 @@ func toAnalyticsStream(ssrc uint32, deltaStats *buffer.RTPDeltaInfo) *livekit.An
packetsLost -= deltaStats.PacketsMissing
}
return &livekit.AnalyticsStream{
StartTime: timestamppb.New(deltaStats.StartTime),
EndTime: timestamppb.New(deltaStats.EndTime),
Ssrc: ssrc,
PrimaryPackets: deltaStats.Packets,
PrimaryBytes: deltaStats.Bytes,
@@ -75,7 +75,7 @@ func TestConnectionQuality(t *testing.T) {
1: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
EndTime: now.Add(duration),
Packets: 250,
},
},
@@ -91,7 +91,7 @@ func TestConnectionQuality(t *testing.T) {
1: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
EndTime: now.Add(duration),
Packets: 120,
PacketsLost: 30,
},
@@ -99,7 +99,7 @@ func TestConnectionQuality(t *testing.T) {
2: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
EndTime: now.Add(duration),
Packets: 130,
PacketsLost: 0,
},
@@ -118,7 +118,7 @@ func TestConnectionQuality(t *testing.T) {
1: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
EndTime: now.Add(duration),
Packets: 250,
},
},
@@ -134,7 +134,7 @@ func TestConnectionQuality(t *testing.T) {
1: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
EndTime: now.Add(duration),
Packets: 250,
},
},
@@ -150,7 +150,7 @@ func TestConnectionQuality(t *testing.T) {
1: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
EndTime: now.Add(duration),
Packets: 250,
},
},
@@ -166,7 +166,7 @@ func TestConnectionQuality(t *testing.T) {
1: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
EndTime: now.Add(duration),
Packets: 250,
PacketsLost: 13,
},
@@ -183,7 +183,7 @@ func TestConnectionQuality(t *testing.T) {
1: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
EndTime: now.Add(duration),
Packets: 250,
},
},
@@ -199,7 +199,7 @@ func TestConnectionQuality(t *testing.T) {
1: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
EndTime: now.Add(duration),
Packets: 250,
},
},
@@ -215,7 +215,7 @@ func TestConnectionQuality(t *testing.T) {
1: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
EndTime: now.Add(duration),
Packets: 250,
PacketsLost: 30,
},
@@ -240,7 +240,7 @@ func TestConnectionQuality(t *testing.T) {
1: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
EndTime: now.Add(duration),
Packets: 0,
},
},
@@ -256,7 +256,7 @@ func TestConnectionQuality(t *testing.T) {
1: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
EndTime: now.Add(duration),
Packets: 0,
},
},
@@ -281,7 +281,7 @@ func TestConnectionQuality(t *testing.T) {
1: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
EndTime: now.Add(duration),
Packets: 250,
PacketsLost: 0,
},
@@ -301,7 +301,7 @@ func TestConnectionQuality(t *testing.T) {
1: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
EndTime: now.Add(duration),
Packets: 50,
PacketsLost: 5,
},
@@ -323,7 +323,7 @@ func TestConnectionQuality(t *testing.T) {
1: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
EndTime: now.Add(duration),
Packets: 250,
PacketsLost: 5,
RttMax: 400,
@@ -349,7 +349,7 @@ func TestConnectionQuality(t *testing.T) {
1: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
EndTime: now.Add(duration),
Packets: 250,
Bytes: 8_000_000 / 8 / 5,
},
@@ -368,7 +368,7 @@ func TestConnectionQuality(t *testing.T) {
1: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
EndTime: now.Add(duration),
Packets: 250,
Bytes: 8_000_000 / 8 / 5,
},
@@ -392,7 +392,7 @@ func TestConnectionQuality(t *testing.T) {
1: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
EndTime: now.Add(duration),
Packets: 250,
Bytes: 8_000_000 / 8 / 5,
},
@@ -419,7 +419,7 @@ func TestConnectionQuality(t *testing.T) {
1: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
EndTime: now.Add(duration),
Packets: 250,
Bytes: 8_000_000 / 8 / 5,
},
@@ -453,7 +453,7 @@ func TestConnectionQuality(t *testing.T) {
1: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
EndTime: now.Add(duration),
Packets: 250,
PacketsLost: 5,
RttMax: 700,
@@ -488,7 +488,7 @@ func TestConnectionQuality(t *testing.T) {
1: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
EndTime: now.Add(duration),
Packets: 250,
PacketsLost: 5,
JitterMax: 200,
@@ -657,7 +657,7 @@ func TestConnectionQuality(t *testing.T) {
123: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
EndTime: now.Add(duration),
Packets: tc.packetsExpected,
PacketsLost: uint32(math.Ceil(eq.packetLossPercentage * float64(tc.packetsExpected) / 100.0)),
},
@@ -761,7 +761,7 @@ func TestConnectionQuality(t *testing.T) {
123: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
EndTime: now.Add(duration),
Packets: 100,
Bytes: tc.bytes,
},
@@ -855,7 +855,7 @@ func TestConnectionQuality(t *testing.T) {
123: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
Duration: duration,
EndTime: now.Add(duration),
Packets: 200,
},
},
+1 -1
View File
@@ -505,7 +505,7 @@ func (q *qualityScorer) isPaused() bool {
}
func (q *qualityScorer) getPacketLossWeight(stat *windowStat) float64 {
if stat == nil || stat.duration == 0 {
if stat == nil || stat.duration <= 0 {
return q.params.PacketLossWeight
}
+14
View File
@@ -158,6 +158,8 @@ func coalesce(stats []*livekit.AnalyticsStat) *livekit.AnalyticsStat {
}
// find aggregates across streams
startTime := time.Time{}
endTime := time.Time{}
scoreSum := float32(0.0) // used for average
minScore := float32(0.0) // min score in batched stats
var scores []float32 // used for median
@@ -183,6 +185,16 @@ func coalesce(stats []*livekit.AnalyticsStat) *livekit.AnalyticsStat {
}
for _, analyticsStream := range stat.Streams {
start := analyticsStream.StartTime.AsTime()
if startTime.IsZero() || startTime.After(start) {
startTime = start
}
end := analyticsStream.EndTime.AsTime()
if endTime.IsZero() || endTime.Before(end) {
endTime = end
}
if analyticsStream.Rtt > maxRtt {
maxRtt = analyticsStream.Rtt
}
@@ -216,6 +228,8 @@ func coalesce(stats []*livekit.AnalyticsStat) *livekit.AnalyticsStat {
}
}
}
coalescedStream.StartTime = timestamppb.New(startTime)
coalescedStream.EndTime = timestamppb.New(endTime)
coalescedStream.Rtt = maxRtt
coalescedStream.Jitter = maxJitter