mirror of
https://github.com/livekit/livekit.git
synced 2026-05-22 12:46:02 +00:00
Do not count padding packets in stream tracker. (#667)
* Do not count padding packets in stream tracker. There are cases where publisher uses padding only packets in a layer to probe the channel. Treating those as valid layer packets makes stream tracker declare that the layer is active where in reality, it is not. Also, removing check for out-of-order packets. Out-of-order packets can happen and should be counted in bitrate calculation. There is the extreme case of heavy loss which might skew it, but that is an extreme case. * Fix test
This commit is contained in:
@@ -64,9 +64,6 @@ type StreamTracker struct {
|
||||
// only access within detectWorker
|
||||
cycleCount uint32
|
||||
|
||||
// only access by the same goroutine as Observe
|
||||
lastSN uint16
|
||||
|
||||
lastBitrateReport time.Time
|
||||
bytesForBitrate [4]int64
|
||||
bitrate [4]int64
|
||||
@@ -184,7 +181,7 @@ func (s *StreamTracker) Observe(sn uint16, temporalLayer int32, pktSize int, pay
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
if s.isStopped || s.paused {
|
||||
if s.isStopped || s.paused || payloadSize == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -192,11 +189,10 @@ func (s *StreamTracker) Observe(sn uint16, temporalLayer int32, pktSize int, pay
|
||||
// first packet
|
||||
s.initialized = true
|
||||
|
||||
s.lastSN = sn
|
||||
s.countSinceLast = 1
|
||||
|
||||
s.lastBitrateReport = time.Now()
|
||||
if temporalLayer >= 0 && payloadSize > 0 {
|
||||
if temporalLayer >= 0 {
|
||||
s.bytesForBitrate[temporalLayer] += int64(pktSize)
|
||||
}
|
||||
|
||||
@@ -206,14 +202,9 @@ func (s *StreamTracker) Observe(sn uint16, temporalLayer int32, pktSize int, pay
|
||||
return
|
||||
}
|
||||
|
||||
// ignore out-of-order SNs
|
||||
if (sn - s.lastSN) > uint16(1<<15) {
|
||||
return
|
||||
}
|
||||
s.lastSN = sn
|
||||
s.countSinceLast++
|
||||
|
||||
if temporalLayer >= 0 && payloadSize > 0 {
|
||||
if temporalLayer >= 0 {
|
||||
s.bytesForBitrate[temporalLayer] += int64(pktSize)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,7 +32,7 @@ func TestStreamTracker(t *testing.T) {
|
||||
require.Equal(t, StreamStatusStopped, tracker.Status())
|
||||
|
||||
// observe first packet
|
||||
tracker.Observe(1, 0, 0, 0)
|
||||
tracker.Observe(1, 0, 20, 10)
|
||||
|
||||
testutils.WithTimeout(t, func() string {
|
||||
if callbackCalled.Load() {
|
||||
@@ -53,7 +53,7 @@ func TestStreamTracker(t *testing.T) {
|
||||
tracker.Start()
|
||||
require.Equal(t, StreamStatusStopped, tracker.Status())
|
||||
|
||||
tracker.Observe(1, 0, 0, 0)
|
||||
tracker.Observe(1, 0, 20, 10)
|
||||
testutils.WithTimeout(t, func() string {
|
||||
if tracker.Status() == StreamStatusActive {
|
||||
return ""
|
||||
@@ -89,7 +89,7 @@ func TestStreamTracker(t *testing.T) {
|
||||
tracker.Start()
|
||||
require.Equal(t, StreamStatusStopped, tracker.Status())
|
||||
|
||||
tracker.Observe(1, 0, 0, 0)
|
||||
tracker.Observe(1, 0, 20, 10)
|
||||
testutils.WithTimeout(t, func() string {
|
||||
if tracker.Status() == StreamStatusActive {
|
||||
return ""
|
||||
@@ -100,11 +100,11 @@ func TestStreamTracker(t *testing.T) {
|
||||
|
||||
tracker.maybeSetStatus(StreamStatusStopped)
|
||||
|
||||
tracker.Observe(2, 0, 0, 0)
|
||||
tracker.Observe(2, 0, 20, 10)
|
||||
tracker.detectChanges()
|
||||
require.Equal(t, StreamStatusStopped, tracker.Status())
|
||||
|
||||
tracker.Observe(3, 0, 0, 0)
|
||||
tracker.Observe(3, 0, 20, 10)
|
||||
tracker.detectChanges()
|
||||
require.Equal(t, StreamStatusActive, tracker.Status())
|
||||
|
||||
@@ -114,7 +114,7 @@ func TestStreamTracker(t *testing.T) {
|
||||
t.Run("changes to inactive when paused", func(t *testing.T) {
|
||||
tracker := newStreamTracker(5, 60, 500*time.Millisecond)
|
||||
tracker.Start()
|
||||
tracker.Observe(1, 0, 0, 0)
|
||||
tracker.Observe(1, 0, 20, 10)
|
||||
testutils.WithTimeout(t, func() string {
|
||||
if tracker.Status() == StreamStatusActive {
|
||||
return ""
|
||||
@@ -140,7 +140,7 @@ func TestStreamTracker(t *testing.T) {
|
||||
require.Equal(t, StreamStatusStopped, tracker.Status())
|
||||
|
||||
// observe first packet
|
||||
tracker.Observe(1, 0, 0, 0)
|
||||
tracker.Observe(1, 0, 20, 10)
|
||||
|
||||
testutils.WithTimeout(t, func() string {
|
||||
if callbackCalled.Load() == 1 {
|
||||
@@ -154,10 +154,10 @@ func TestStreamTracker(t *testing.T) {
|
||||
require.Equal(t, uint32(1), callbackCalled.Load())
|
||||
|
||||
// observe a few more
|
||||
tracker.Observe(2, 0, 0, 0)
|
||||
tracker.Observe(3, 0, 0, 0)
|
||||
tracker.Observe(4, 0, 0, 0)
|
||||
tracker.Observe(5, 0, 0, 0)
|
||||
tracker.Observe(2, 0, 20, 10)
|
||||
tracker.Observe(3, 0, 20, 10)
|
||||
tracker.Observe(4, 0, 20, 10)
|
||||
tracker.Observe(5, 0, 20, 10)
|
||||
tracker.detectChanges()
|
||||
|
||||
// should still be active
|
||||
@@ -168,7 +168,7 @@ func TestStreamTracker(t *testing.T) {
|
||||
require.Equal(t, StreamStatusStopped, tracker.Status())
|
||||
|
||||
// first packet after reset
|
||||
tracker.Observe(1, 0, 0, 0)
|
||||
tracker.Observe(1, 0, 20, 10)
|
||||
|
||||
testutils.WithTimeout(t, func() string {
|
||||
if callbackCalled.Load() == 2 {
|
||||
|
||||
Reference in New Issue
Block a user