From 0d8848cfcd7747aafc7e24366db8a5cbbd10b860 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 5 May 2022 13:09:59 +0530 Subject: [PATCH] Do not count padding packets in stream tracker. (#667) * Do not count padding packets in stream tracker. There are cases where publisher uses padding only packets in a layer to probe the channel. Treating those as valid layer packets makes stream tracker declare that the layer is active where in reality, it is not. Also, removing check for out-of-order packets. Out-of-order packets can happen and should be counted in bitrate calculation. There is the extreme case of heavy loss which might skew it, but that is an extreme case. * Fix test --- pkg/sfu/streamtracker.go | 15 +++------------ pkg/sfu/streamtracker_test.go | 24 ++++++++++++------------ 2 files changed, 15 insertions(+), 24 deletions(-) diff --git a/pkg/sfu/streamtracker.go b/pkg/sfu/streamtracker.go index 0a929e1e0..41db4bcbc 100644 --- a/pkg/sfu/streamtracker.go +++ b/pkg/sfu/streamtracker.go @@ -64,9 +64,6 @@ type StreamTracker struct { // only access within detectWorker cycleCount uint32 - // only access by the same goroutine as Observe - lastSN uint16 - lastBitrateReport time.Time bytesForBitrate [4]int64 bitrate [4]int64 @@ -184,7 +181,7 @@ func (s *StreamTracker) Observe(sn uint16, temporalLayer int32, pktSize int, pay s.lock.Lock() defer s.lock.Unlock() - if s.isStopped || s.paused { + if s.isStopped || s.paused || payloadSize == 0 { return } @@ -192,11 +189,10 @@ func (s *StreamTracker) Observe(sn uint16, temporalLayer int32, pktSize int, pay // first packet s.initialized = true - s.lastSN = sn s.countSinceLast = 1 s.lastBitrateReport = time.Now() - if temporalLayer >= 0 && payloadSize > 0 { + if temporalLayer >= 0 { s.bytesForBitrate[temporalLayer] += int64(pktSize) } @@ -206,14 +202,9 @@ func (s *StreamTracker) Observe(sn uint16, temporalLayer int32, pktSize int, pay return } - // ignore out-of-order SNs - if (sn - s.lastSN) > uint16(1<<15) { - return - } - s.lastSN = sn s.countSinceLast++ - if temporalLayer >= 0 && payloadSize > 0 { + if temporalLayer >= 0 { s.bytesForBitrate[temporalLayer] += int64(pktSize) } } diff --git a/pkg/sfu/streamtracker_test.go b/pkg/sfu/streamtracker_test.go index c5fe855d3..6d9b2c8b0 100644 --- a/pkg/sfu/streamtracker_test.go +++ b/pkg/sfu/streamtracker_test.go @@ -32,7 +32,7 @@ func TestStreamTracker(t *testing.T) { require.Equal(t, StreamStatusStopped, tracker.Status()) // observe first packet - tracker.Observe(1, 0, 0, 0) + tracker.Observe(1, 0, 20, 10) testutils.WithTimeout(t, func() string { if callbackCalled.Load() { @@ -53,7 +53,7 @@ func TestStreamTracker(t *testing.T) { tracker.Start() require.Equal(t, StreamStatusStopped, tracker.Status()) - tracker.Observe(1, 0, 0, 0) + tracker.Observe(1, 0, 20, 10) testutils.WithTimeout(t, func() string { if tracker.Status() == StreamStatusActive { return "" @@ -89,7 +89,7 @@ func TestStreamTracker(t *testing.T) { tracker.Start() require.Equal(t, StreamStatusStopped, tracker.Status()) - tracker.Observe(1, 0, 0, 0) + tracker.Observe(1, 0, 20, 10) testutils.WithTimeout(t, func() string { if tracker.Status() == StreamStatusActive { return "" @@ -100,11 +100,11 @@ func TestStreamTracker(t *testing.T) { tracker.maybeSetStatus(StreamStatusStopped) - tracker.Observe(2, 0, 0, 0) + tracker.Observe(2, 0, 20, 10) tracker.detectChanges() require.Equal(t, StreamStatusStopped, tracker.Status()) - tracker.Observe(3, 0, 0, 0) + tracker.Observe(3, 0, 20, 10) tracker.detectChanges() require.Equal(t, StreamStatusActive, tracker.Status()) @@ -114,7 +114,7 @@ func TestStreamTracker(t *testing.T) { t.Run("changes to inactive when paused", func(t *testing.T) { tracker := newStreamTracker(5, 60, 500*time.Millisecond) tracker.Start() - tracker.Observe(1, 0, 0, 0) + tracker.Observe(1, 0, 20, 10) testutils.WithTimeout(t, func() string { if tracker.Status() == StreamStatusActive { return "" @@ -140,7 +140,7 @@ func TestStreamTracker(t *testing.T) { require.Equal(t, StreamStatusStopped, tracker.Status()) // observe first packet - tracker.Observe(1, 0, 0, 0) + tracker.Observe(1, 0, 20, 10) testutils.WithTimeout(t, func() string { if callbackCalled.Load() == 1 { @@ -154,10 +154,10 @@ func TestStreamTracker(t *testing.T) { require.Equal(t, uint32(1), callbackCalled.Load()) // observe a few more - tracker.Observe(2, 0, 0, 0) - tracker.Observe(3, 0, 0, 0) - tracker.Observe(4, 0, 0, 0) - tracker.Observe(5, 0, 0, 0) + tracker.Observe(2, 0, 20, 10) + tracker.Observe(3, 0, 20, 10) + tracker.Observe(4, 0, 20, 10) + tracker.Observe(5, 0, 20, 10) tracker.detectChanges() // should still be active @@ -168,7 +168,7 @@ func TestStreamTracker(t *testing.T) { require.Equal(t, StreamStatusStopped, tracker.Status()) // first packet after reset - tracker.Observe(1, 0, 0, 0) + tracker.Observe(1, 0, 20, 10) testutils.WithTimeout(t, func() string { if callbackCalled.Load() == 2 {