Files
livekit/pkg/sfu/streamtracker_test.go
T
Raja Subramanian 641858832a Address edge case stream allocation (#544)
* Handle an edge case in layer lock.

A very edge case
- Available layer: [0, 1, 2], but bitrate is not yet available.
We set it to layer 2 awaiting measurement.
- Measurements for layers 0 and 1 come through.
- Still no key frame for layer 2.
- Finalize layers runs and sees that bitrate is available for 0 and 1.
It finalizes layer 1.
- Layer 1 key frame comes (because we asked key frame of layer 2,
publisher sends key frame for all layers). Locks to layer 1.
- No more events happen to switch to layer 2.

Changes
-------
- Move bit rate measurement to StreamTrackerManager. Allows re-use
in relay.
- Make bit rate availability (from zero -> non-zero) an event
and let it flow through the stream allocation flow so that we
always have an event when bit rate measurement becomes available.
This gets rid of finalization, which I was unhappy with anyway as
it was polling every second.
- Removing REMB stuff from buffer. We do not use it.
It is incorrect anyway. REMB should be at peer connection level.

* Fix test

* fix test

* Simplify allocate

* Simplify/clean up
2022-03-21 14:53:31 +05:30

187 lines
4.8 KiB
Go

package sfu
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"github.com/livekit/livekit-server/pkg/testutils"
"github.com/livekit/protocol/logger"
)
// newStreamTracker is a test helper that assembles StreamTrackerParams from
// the given sample count, cycle count and cycle duration, attaches the logger
// obtained from logger.GetLogger, and returns the tracker produced by
// NewStreamTracker.
func newStreamTracker(samplesRequired uint32, cyclesRequired uint32, cycleDuration time.Duration) *StreamTracker {
	params := StreamTrackerParams{
		Logger:          logger.Logger(logger.GetLogger()),
		CycleDuration:   cycleDuration,
		CyclesRequired:  cyclesRequired,
		SamplesRequired: samplesRequired,
	}
	return NewStreamTracker(params)
}
// TestStreamTracker checks the status transitions reported by StreamTracker:
// activation on the first observed packet, deactivation after a cycle with no
// packets, re-activation after enough cycles, deactivation while paused, and
// re-activation after Reset.
//
// Every subtest defers tracker.Stop() right after Start() so the tracker is
// stopped even when a require assertion ends the subtest early.
func TestStreamTracker(t *testing.T) {
	// A tracker starts out stopped; the first observed packet must switch it
	// to active and invoke the status-changed callback.
	t.Run("flips to active on first observe", func(t *testing.T) {
		callbackCalled := atomic.NewBool(false)
		tracker := newStreamTracker(5, 60, 500*time.Millisecond)
		tracker.Start()
		defer tracker.Stop()
		tracker.OnStatusChanged(func(status StreamStatus) {
			callbackCalled.Store(true)
		})
		require.Equal(t, StreamStatusStopped, tracker.Status())

		// observe first packet
		tracker.Observe(1, 0, 0)

		// The closure reports success with "" and otherwise returns the
		// failure message.
		testutils.WithTimeout(t, func() string {
			if callbackCalled.Load() {
				return ""
			}
			return "first packet didn't activate stream"
		})

		require.Equal(t, StreamStatusActive, tracker.Status())
		require.True(t, callbackCalled.Load())
	})

	// Once active, a single detectChanges pass with no further packets must
	// switch the tracker back to stopped and invoke the callback.
	t.Run("flips to inactive immediately", func(t *testing.T) {
		tracker := newStreamTracker(5, 60, 500*time.Millisecond)
		tracker.Start()
		defer tracker.Stop()
		require.Equal(t, StreamStatusStopped, tracker.Status())

		tracker.Observe(1, 0, 0)
		testutils.WithTimeout(t, func() string {
			if tracker.Status() == StreamStatusActive {
				return ""
			}
			return "first packet did not activate stream"
		})

		// Register the callback only after activation so that it records the
		// transition triggered below, not the initial activation.
		callbackCalled := atomic.NewBool(false)
		tracker.OnStatusChanged(func(status StreamStatus) {
			callbackCalled.Store(true)
		})
		require.Equal(t, StreamStatusActive, tracker.Status())

		// run a single iteration
		tracker.detectChanges()

		testutils.WithTimeout(t, func() string {
			if callbackCalled.Load() {
				return ""
			}
			return "inactive cycle did not declare stream stopped"
		})

		require.Equal(t, StreamStatusStopped, tracker.Status())
		require.True(t, callbackCalled.Load())
	})

	// With samplesRequired=1 and cyclesRequired=2, a tracker forced to stopped
	// is expected to stay stopped after one cycle containing a packet and to
	// become active after the second such cycle.
	t.Run("flips back to active after iterations", func(t *testing.T) {
		tracker := newStreamTracker(1, 2, 500*time.Millisecond)
		tracker.Start()
		defer tracker.Stop()
		require.Equal(t, StreamStatusStopped, tracker.Status())

		tracker.Observe(1, 0, 0)
		testutils.WithTimeout(t, func() string {
			if tracker.Status() == StreamStatusActive {
				return ""
			}
			return "first packet did not activate stream"
		})

		// Force the stopped state directly, then drive cycles by hand.
		tracker.maybeSetStatus(StreamStatusStopped)

		tracker.Observe(2, 0, 0)
		tracker.detectChanges()
		require.Equal(t, StreamStatusStopped, tracker.Status())

		tracker.Observe(3, 0, 0)
		tracker.detectChanges()
		require.Equal(t, StreamStatusActive, tracker.Status())
	})

	// Pausing an active tracker and running one detectChanges pass must leave
	// it stopped.
	t.Run("changes to inactive when paused", func(t *testing.T) {
		tracker := newStreamTracker(5, 60, 500*time.Millisecond)
		tracker.Start()
		defer tracker.Stop()

		tracker.Observe(1, 0, 0)
		testutils.WithTimeout(t, func() string {
			if tracker.Status() == StreamStatusActive {
				return ""
			}
			return "first packet did not activate stream"
		})

		tracker.SetPaused(true)
		tracker.detectChanges()
		require.Equal(t, StreamStatusStopped, tracker.Status())
	})

	// After Reset the tracker reports stopped again, and the next observed
	// packet must re-activate it, invoking the callback a second time.
	t.Run("flips back to active on first observe after reset", func(t *testing.T) {
		callbackCalled := atomic.NewUint32(0)
		tracker := newStreamTracker(5, 60, 500*time.Millisecond)
		tracker.Start()
		defer tracker.Stop()
		tracker.OnStatusChanged(func(status StreamStatus) {
			callbackCalled.Inc()
		})
		require.Equal(t, StreamStatusStopped, tracker.Status())

		// observe first packet
		tracker.Observe(1, 0, 0)

		testutils.WithTimeout(t, func() string {
			if callbackCalled.Load() == 1 {
				return ""
			}
			return fmt.Sprintf("expected onStatusChanged to be called once, actual: %d", callbackCalled.Load())
		})

		require.Equal(t, StreamStatusActive, tracker.Status())
		require.Equal(t, uint32(1), callbackCalled.Load())

		// observe a few more
		tracker.Observe(2, 0, 0)
		tracker.Observe(3, 0, 0)
		tracker.Observe(4, 0, 0)
		tracker.Observe(5, 0, 0)
		tracker.detectChanges()

		// should still be active
		require.Equal(t, StreamStatusActive, tracker.Status())

		// Reset. The first packet after reset should flip state again
		tracker.Reset()
		require.Equal(t, StreamStatusStopped, tracker.Status())

		// first packet after reset
		tracker.Observe(1, 0, 0)

		testutils.WithTimeout(t, func() string {
			if callbackCalled.Load() == 2 {
				return ""
			}
			return fmt.Sprintf("expected onStatusChanged to be called twice, actual %d", callbackCalled.Load())
		})

		require.Equal(t, StreamStatusActive, tracker.Status())
		require.Equal(t, uint32(2), callbackCalled.Load())
	})
}