diff --git a/pkg/sfu/streamtracker.go b/pkg/sfu/streamtracker.go index 0a929e1e0..41db4bcbc 100644 --- a/pkg/sfu/streamtracker.go +++ b/pkg/sfu/streamtracker.go @@ -64,9 +64,6 @@ type StreamTracker struct { // only access within detectWorker cycleCount uint32 - // only access by the same goroutine as Observe - lastSN uint16 - lastBitrateReport time.Time bytesForBitrate [4]int64 bitrate [4]int64 @@ -184,7 +181,7 @@ func (s *StreamTracker) Observe(sn uint16, temporalLayer int32, pktSize int, pay s.lock.Lock() defer s.lock.Unlock() - if s.isStopped || s.paused { + if s.isStopped || s.paused || payloadSize == 0 { return } @@ -192,11 +189,10 @@ func (s *StreamTracker) Observe(sn uint16, temporalLayer int32, pktSize int, pay // first packet s.initialized = true - s.lastSN = sn s.countSinceLast = 1 s.lastBitrateReport = time.Now() - if temporalLayer >= 0 && payloadSize > 0 { + if temporalLayer >= 0 { s.bytesForBitrate[temporalLayer] += int64(pktSize) } @@ -206,14 +202,9 @@ func (s *StreamTracker) Observe(sn uint16, temporalLayer int32, pktSize int, pay return } - // ignore out-of-order SNs - if (sn - s.lastSN) > uint16(1<<15) { - return - } - s.lastSN = sn s.countSinceLast++ - if temporalLayer >= 0 && payloadSize > 0 { + if temporalLayer >= 0 { s.bytesForBitrate[temporalLayer] += int64(pktSize) } } diff --git a/pkg/sfu/streamtracker_test.go b/pkg/sfu/streamtracker_test.go index c5fe855d3..6d9b2c8b0 100644 --- a/pkg/sfu/streamtracker_test.go +++ b/pkg/sfu/streamtracker_test.go @@ -32,7 +32,7 @@ func TestStreamTracker(t *testing.T) { require.Equal(t, StreamStatusStopped, tracker.Status()) // observe first packet - tracker.Observe(1, 0, 0, 0) + tracker.Observe(1, 0, 20, 10) testutils.WithTimeout(t, func() string { if callbackCalled.Load() { @@ -53,7 +53,7 @@ func TestStreamTracker(t *testing.T) { tracker.Start() require.Equal(t, StreamStatusStopped, tracker.Status()) - tracker.Observe(1, 0, 0, 0) + tracker.Observe(1, 0, 20, 10) testutils.WithTimeout(t, func() string { if tracker.Status() == StreamStatusActive { return "" @@ -89,7 +89,7 @@ func TestStreamTracker(t *testing.T) { tracker.Start() require.Equal(t, StreamStatusStopped, tracker.Status()) - tracker.Observe(1, 0, 0, 0) + tracker.Observe(1, 0, 20, 10) testutils.WithTimeout(t, func() string { if tracker.Status() == StreamStatusActive { return "" @@ -100,11 +100,11 @@ func TestStreamTracker(t *testing.T) { tracker.maybeSetStatus(StreamStatusStopped) - tracker.Observe(2, 0, 0, 0) + tracker.Observe(2, 0, 20, 10) tracker.detectChanges() require.Equal(t, StreamStatusStopped, tracker.Status()) - tracker.Observe(3, 0, 0, 0) + tracker.Observe(3, 0, 20, 10) tracker.detectChanges() require.Equal(t, StreamStatusActive, tracker.Status()) @@ -114,7 +114,7 @@ func TestStreamTracker(t *testing.T) { t.Run("changes to inactive when paused", func(t *testing.T) { tracker := newStreamTracker(5, 60, 500*time.Millisecond) tracker.Start() - tracker.Observe(1, 0, 0, 0) + tracker.Observe(1, 0, 20, 10) testutils.WithTimeout(t, func() string { if tracker.Status() == StreamStatusActive { return "" @@ -140,7 +140,7 @@ func TestStreamTracker(t *testing.T) { require.Equal(t, StreamStatusStopped, tracker.Status()) // observe first packet - tracker.Observe(1, 0, 0, 0) + tracker.Observe(1, 0, 20, 10) testutils.WithTimeout(t, func() string { if callbackCalled.Load() == 1 { @@ -154,10 +154,10 @@ func TestStreamTracker(t *testing.T) { require.Equal(t, uint32(1), callbackCalled.Load()) // observe a few more - tracker.Observe(2, 0, 0, 0) - tracker.Observe(3, 0, 0, 0) - tracker.Observe(4, 0, 0, 0) - tracker.Observe(5, 0, 0, 0) + tracker.Observe(2, 0, 20, 10) + tracker.Observe(3, 0, 20, 10) + tracker.Observe(4, 0, 20, 10) + tracker.Observe(5, 0, 20, 10) tracker.detectChanges() // should still be active @@ -168,7 +168,7 @@ func TestStreamTracker(t *testing.T) { require.Equal(t, StreamStatusStopped, tracker.Status()) // first packet after reset - tracker.Observe(1, 0, 0, 0) + tracker.Observe(1, 0, 20, 10) testutils.WithTimeout(t, func() string { if callbackCalled.Load() == 2 {