Use receiver report stats for loss/rtt/jitter. (#1781)

* Use receiver report stats for loss/rtt/jitter.

Reversing a bit of https://github.com/livekit/livekit/pull/1664.
That PR did two snapshots (one based on what SFU is sending
and one based on combination of what SFU is sending reconciled with
stats reported from client via RTCP Receiver Report). That PR
reported SFU only view to analytics. But, that view does not have
information about loss seen by client in the downstream.
Also, that does not have RTT/jitter information. The rationale behind
using SFU only view is that SFU should report what it sends irrespective
of client is receiving or not. But, that view did not have proper
loss/RTT/jitter.

So, switch back to reporting SFU + receiver report reconciled view.
The down side is that when receiver reports are not receiver,
packets sent/bytes sent will not be reported to analytics.

An option is to report SFU only view if there are no receiver reports.
But, it becomes complex because of the offset. Receiver report would
acknowledge certain range whereas SFU only view could be different
because of propagation delay. To simplify, just using the reconciled
view to report to analytics. Using the available view will require
a bunch more work to produce accurate data.
(NOTE: all this started due to a bug where RTCP was not restarted on
a track resume which killed receiver reports and we went on this path
to distinguish between publisher stopping vs RTCP receiver report not
happening)

One optimisation to here here concerns the check to see if publisher is sending data.
Using a full DeltaInfo for that is an overkill. Can do a lighter weight
for that later.

* return available streams

* fix test
This commit is contained in:
Raja Subramanian
2023-06-09 23:31:25 +05:30
committed by GitHub
parent f518f5d743
commit 72ed5b19f7
2 changed files with 95 additions and 58 deletions

View File

@@ -16,8 +16,6 @@ import (
const (
UpdateInterval = 5 * time.Second
processThreshold = 0.95
noStatsTooLongMultiplier = 2
noReceiverReportTooLongThreshold = 30 * time.Second
)
@@ -120,9 +118,9 @@ func (cs *ConnectionStats) updateScoreWithAggregate(agg *buffer.RTPDeltaInfo, at
return mos
}
func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) float32 {
func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) (float32, map[uint32]*buffer.StreamStatsWithLayers) {
if cs.params.GetDeltaStatsOverridden == nil || cs.params.GetLastReceiverReportTime == nil {
return MinMOS
return MinMOS, nil
}
cs.lock.RLock()
@@ -131,7 +129,7 @@ func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) float32 {
if streamingStartedAt.IsZero() {
// not streaming, just return current score
mos, _ := cs.scorer.GetMOSAndQuality()
return mos
return mos, nil
}
streams := cs.params.GetDeltaStatsOverridden()
@@ -143,12 +141,12 @@ func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) float32 {
}
if time.Since(marker) > noReceiverReportTooLongThreshold {
// have not received receiver report for a long time when streaming, run with nil stat
return cs.updateScoreWithAggregate(nil, at)
return cs.updateScoreWithAggregate(nil, at), nil
}
// wait for receiver report, return current score
mos, _ := cs.scorer.GetMOSAndQuality()
return mos
return mos, nil
}
// delta stat duration could be large due to not receiving receiver report for a long time (for example, due to mute),
@@ -157,17 +155,27 @@ func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) float32 {
if streamingStartedAt.After(cs.params.GetLastReceiverReportTime()) {
// last receiver report was before streaming started, wait for next one
mos, _ := cs.scorer.GetMOSAndQuality()
return mos
return mos, streams
}
if streamingStartedAt.After(agg.StartTime) {
agg.Duration = agg.StartTime.Add(agg.Duration).Sub(streamingStartedAt)
agg.StartTime = streamingStartedAt
}
return cs.updateScoreWithAggregate(agg, at)
return cs.updateScoreWithAggregate(agg, at), streams
}
func (cs *ConnectionStats) updateScore(streams map[uint32]*buffer.StreamStatsWithLayers, at time.Time) float32 {
func (cs *ConnectionStats) updateScore(at time.Time) (float32, map[uint32]*buffer.StreamStatsWithLayers) {
if cs.params.GetDeltaStats == nil {
return MinMOS, nil
}
streams := cs.params.GetDeltaStats()
if len(streams) == 0 {
mos, _ := cs.scorer.GetMOSAndQuality()
return mos, nil
}
deltaInfoList := make([]*buffer.RTPDeltaInfo, 0, len(streams))
for _, s := range streams {
deltaInfoList = append(deltaInfoList, s.RTPStats)
@@ -185,7 +193,7 @@ func (cs *ConnectionStats) updateScore(streams map[uint32]*buffer.StreamStatsWit
return cs.updateScoreFromReceiverReport(at)
}
return cs.updateScoreWithAggregate(agg, at)
return cs.updateScoreWithAggregate(agg, at), streams
}
func (cs *ConnectionStats) maybeSetStreamingStart(at time.Time) {
@@ -203,18 +211,9 @@ func (cs *ConnectionStats) clearStreamingStart() {
}
func (cs *ConnectionStats) getStat(at time.Time) {
if cs.params.GetDeltaStats == nil {
return
}
score, streams := cs.updateScore(at)
streams := cs.params.GetDeltaStats()
if len(streams) == 0 {
return
}
score := cs.updateScore(streams, at)
if cs.onStatsUpdate != nil {
if cs.onStatsUpdate != nil && len(streams) != 0 {
analyticsStreams := make([]*livekit.AnalyticsStream, 0, len(streams))
for ssrc, stream := range streams {
as := toAnalyticsStream(ssrc, stream.RTPStats)
@@ -317,6 +316,13 @@ func toAggregateDeltaInfo(streams map[uint32]*buffer.StreamStatsWithLayers) *buf
}
func toAnalyticsStream(ssrc uint32, deltaStats *buffer.RTPDeltaInfo) *livekit.AnalyticsStream {
// discount the feed side loss when reporting forwarded track stats
packetsLost := deltaStats.PacketsLost
if deltaStats.PacketsMissing > packetsLost {
packetsLost = 0
} else {
packetsLost -= deltaStats.PacketsMissing
}
return &livekit.AnalyticsStream{
Ssrc: ssrc,
PrimaryPackets: deltaStats.Packets,
@@ -325,7 +331,7 @@ func toAnalyticsStream(ssrc uint32, deltaStats *buffer.RTPDeltaInfo) *livekit.An
RetransmitBytes: deltaStats.BytesDuplicate,
PaddingPackets: deltaStats.PacketsPadding,
PaddingBytes: deltaStats.BytesPadding,
PacketsLost: deltaStats.PacketsLost,
PacketsLost: packetsLost,
Frames: deltaStats.Frames,
Rtt: deltaStats.RttMax,
Jitter: uint32(deltaStats.JitterMax),

View File

@@ -12,19 +12,30 @@ import (
"github.com/livekit/protocol/logger"
)
func newConnectionStats(mimeType string, isFECEnabled bool, includeRTT bool, includeJitter bool) *ConnectionStats {
func newConnectionStats(
mimeType string,
isFECEnabled bool,
includeRTT bool,
includeJitter bool,
getDeltaStats func() map[uint32]*buffer.StreamStatsWithLayers,
) *ConnectionStats {
return NewConnectionStats(ConnectionStatsParams{
MimeType: mimeType,
IsFECEnabled: isFECEnabled,
IncludeRTT: includeRTT,
IncludeJitter: includeJitter,
GetDeltaStats: getDeltaStats,
Logger: logger.GetLogger(),
})
}
func TestConnectionQuality(t *testing.T) {
t.Run("quality scorer state machine", func(t *testing.T) {
cs := newConnectionStats("audio/opus", false, true, true)
var streams map[uint32]*buffer.StreamStatsWithLayers
getDeltaStats := func() map[uint32]*buffer.StreamStatsWithLayers {
return streams
}
cs := newConnectionStats("audio/opus", false, true, true, getDeltaStats)
duration := 5 * time.Second
now := time.Now()
@@ -32,13 +43,13 @@ func TestConnectionQuality(t *testing.T) {
cs.UpdateMute(false, now.Add(-1*time.Second))
// no data and not enough unmute time should return default state which is EXCELLENT quality
cs.updateScore(nil, now)
cs.updateScore(now)
mos, quality := cs.GetScoreAndQuality()
require.Greater(t, float32(4.6), mos)
require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality)
// best conditions (no loss, jitter/rtt = 0) - quality should stay EXCELLENT
streams := map[uint32]*buffer.StreamStatsWithLayers{
streams = map[uint32]*buffer.StreamStatsWithLayers{
1: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
@@ -47,7 +58,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(streams, now.Add(duration))
cs.updateScore(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.6), mos)
require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality)
@@ -72,7 +83,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(streams, now.Add(duration))
cs.updateScore(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(2.1), mos)
require.Equal(t, livekit.ConnectionQuality_POOR, quality)
@@ -90,7 +101,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(streams, now.Add(duration))
cs.updateScore(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.1), mos)
require.Equal(t, livekit.ConnectionQuality_GOOD, quality)
@@ -106,7 +117,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(streams, now.Add(duration))
cs.updateScore(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.1), mos)
require.Equal(t, livekit.ConnectionQuality_GOOD, quality)
@@ -122,7 +133,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(streams, now.Add(duration))
cs.updateScore(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.6), mos)
require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality)
@@ -139,7 +150,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(streams, now.Add(duration))
cs.updateScore(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.1), mos)
require.Equal(t, livekit.ConnectionQuality_GOOD, quality)
@@ -155,7 +166,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(streams, now.Add(duration))
cs.updateScore(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.1), mos)
require.Equal(t, livekit.ConnectionQuality_GOOD, quality)
@@ -171,7 +182,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(streams, now.Add(duration))
cs.updateScore(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.6), mos)
require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality)
@@ -188,7 +199,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(streams, now.Add(duration))
cs.updateScore(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(2.1), mos)
require.Equal(t, livekit.ConnectionQuality_POOR, quality)
@@ -212,7 +223,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(streams, now.Add(duration))
cs.updateScore(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.6), mos)
require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality)
@@ -228,7 +239,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(streams, now.Add(duration))
cs.updateScore(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(2.1), mos)
require.Equal(t, livekit.ConnectionQuality_POOR, quality)
@@ -250,7 +261,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(streams, now.Add(duration))
cs.updateScore(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.6), mos)
require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality)
@@ -274,7 +285,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(streams, now.Add(duration))
cs.updateScore(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.1), mos)
require.Equal(t, livekit.ConnectionQuality_GOOD, quality)
@@ -298,7 +309,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(streams, now.Add(duration))
cs.updateScore(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.1), mos)
require.Equal(t, livekit.ConnectionQuality_GOOD, quality)
@@ -324,7 +335,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(streams, now.Add(duration))
cs.updateScore(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.1), mos)
require.Equal(t, livekit.ConnectionQuality_GOOD, quality)
@@ -349,14 +360,18 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(streams, now.Add(duration))
cs.updateScore(now.Add(duration))
mos, quality = cs.GetScoreAndQuality()
require.Greater(t, float32(4.1), mos)
require.Equal(t, livekit.ConnectionQuality_GOOD, quality)
})
t.Run("quality scorer dependent rtt", func(t *testing.T) {
cs := newConnectionStats("audio/opus", false, false, true)
var streams map[uint32]*buffer.StreamStatsWithLayers
getDeltaStats := func() map[uint32]*buffer.StreamStatsWithLayers {
return streams
}
cs := newConnectionStats("audio/opus", false, false, true, getDeltaStats)
duration := 5 * time.Second
now := time.Now()
@@ -366,7 +381,7 @@ func TestConnectionQuality(t *testing.T) {
// RTT does not knock quality down because it is dependent and hence not taken into account
// at 2% loss, quality should stay at EXCELLENT purely based on loss. With high RTT (700 ms)
// quality should drop to GOOD if RTT were taken into consideration
streams := map[uint32]*buffer.StreamStatsWithLayers{
streams = map[uint32]*buffer.StreamStatsWithLayers{
1: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
@@ -377,14 +392,18 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(streams, now.Add(duration))
cs.updateScore(now.Add(duration))
mos, quality := cs.GetScoreAndQuality()
require.Greater(t, float32(4.6), mos)
require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality)
})
t.Run("quality scorer dependent jitter", func(t *testing.T) {
cs := newConnectionStats("audio/opus", false, true, false)
var streams map[uint32]*buffer.StreamStatsWithLayers
getDeltaStats := func() map[uint32]*buffer.StreamStatsWithLayers {
return streams
}
cs := newConnectionStats("audio/opus", false, true, false, getDeltaStats)
duration := 5 * time.Second
now := time.Now()
@@ -394,7 +413,7 @@ func TestConnectionQuality(t *testing.T) {
// Jitter does not knock quality down because it is dependent and hence not taken into account
// at 2% loss, quality should stay at EXCELLENT purely based on loss. With high jitter (200 ms)
// quality should drop to GOOD if jitter were taken into consideration
streams := map[uint32]*buffer.StreamStatsWithLayers{
streams = map[uint32]*buffer.StreamStatsWithLayers{
1: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
@@ -405,7 +424,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(streams, now.Add(duration))
cs.updateScore(now.Add(duration))
mos, quality := cs.GetScoreAndQuality()
require.Greater(t, float32(4.6), mos)
require.Equal(t, livekit.ConnectionQuality_EXCELLENT, quality)
@@ -549,14 +568,18 @@ func TestConnectionQuality(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
cs := newConnectionStats(tc.mimeType, tc.isFECEnabled, true, true)
var streams map[uint32]*buffer.StreamStatsWithLayers
getDeltaStats := func() map[uint32]*buffer.StreamStatsWithLayers {
return streams
}
cs := newConnectionStats(tc.mimeType, tc.isFECEnabled, true, true, getDeltaStats)
duration := 5 * time.Second
now := time.Now()
cs.Start(&livekit.TrackInfo{Type: livekit.TrackType_AUDIO}, now.Add(-duration))
for _, eq := range tc.expectedQualities {
streams := map[uint32]*buffer.StreamStatsWithLayers{
streams = map[uint32]*buffer.StreamStatsWithLayers{
123: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
@@ -566,7 +589,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(streams, now.Add(duration))
cs.updateScore(now.Add(duration))
mos, quality := cs.GetScoreAndQuality()
require.Greater(t, eq.expectedMOS, mos)
require.Equal(t, eq.expectedQuality, quality)
@@ -642,7 +665,11 @@ func TestConnectionQuality(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
cs := newConnectionStats("video/vp8", false, true, true)
var streams map[uint32]*buffer.StreamStatsWithLayers
getDeltaStats := func() map[uint32]*buffer.StreamStatsWithLayers {
return streams
}
cs := newConnectionStats("video/vp8", false, true, true, getDeltaStats)
duration := 5 * time.Second
now := time.Now()
@@ -652,7 +679,7 @@ func TestConnectionQuality(t *testing.T) {
cs.AddBitrateTransition(tr.bitrate, now.Add(tr.offset))
}
streams := map[uint32]*buffer.StreamStatsWithLayers{
streams = map[uint32]*buffer.StreamStatsWithLayers{
123: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
@@ -662,7 +689,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(streams, now.Add(duration))
cs.updateScore(now.Add(duration))
mos, quality := cs.GetScoreAndQuality()
require.Greater(t, tc.expectedMOS, mos)
require.Equal(t, tc.expectedQuality, quality)
@@ -729,7 +756,11 @@ func TestConnectionQuality(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
cs := newConnectionStats("video/vp8", false, true, true)
var streams map[uint32]*buffer.StreamStatsWithLayers
getDeltaStats := func() map[uint32]*buffer.StreamStatsWithLayers {
return streams
}
cs := newConnectionStats("video/vp8", false, true, true, getDeltaStats)
duration := 5 * time.Second
now := time.Now()
@@ -739,7 +770,7 @@ func TestConnectionQuality(t *testing.T) {
cs.AddLayerTransition(tr.distance, now.Add(tr.offset))
}
streams := map[uint32]*buffer.StreamStatsWithLayers{
streams = map[uint32]*buffer.StreamStatsWithLayers{
123: {
RTPStats: &buffer.RTPDeltaInfo{
StartTime: now,
@@ -748,7 +779,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
}
cs.updateScore(streams, now.Add(duration))
cs.updateScore(now.Add(duration))
mos, quality := cs.GetScoreAndQuality()
require.Greater(t, tc.expectedMOS, mos)
require.Equal(t, tc.expectedQuality, quality)