From 75ec31f237102d7b9bd175cc008184109dd5dc13 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 19 Apr 2022 12:37:53 +0530 Subject: [PATCH] Support starting off with not forwarding video (#623) --- pkg/rtc/mediatrackreceiver.go | 2 +- pkg/rtc/subscribedtrack.go | 6 +++- pkg/sfu/forwarder.go | 61 ++++++++++++++++++++++------------- pkg/sfu/forwarder_test.go | 33 +++++++++++++------ 4 files changed, 68 insertions(+), 34 deletions(-) diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index c523378fe..daceda756 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -91,7 +91,7 @@ func (t *MediaTrackReceiver) Restart() { t.lock.Unlock() if receiver != nil { - receiver.SetMaxExpectedSpatialLayer(sfu.DefaultMaxLayerSpatial) + receiver.SetMaxExpectedSpatialLayer(SpatialLayerForQuality(livekit.VideoQuality_HIGH)) t.MediaTrackSubscriptions.Restart() } } diff --git a/pkg/rtc/subscribedtrack.go b/pkg/rtc/subscribedtrack.go index b03579366..b9f013fd9 100644 --- a/pkg/rtc/subscribedtrack.go +++ b/pkg/rtc/subscribedtrack.go @@ -24,6 +24,7 @@ type SubscribedTrackParams struct { MediaTrack types.MediaTrack DownTrack *sfu.DownTrack } + type SubscribedTrack struct { params SubscribedTrackParams subMuted atomic.Bool @@ -36,10 +37,13 @@ type SubscribedTrack struct { } func NewSubscribedTrack(params SubscribedTrackParams) *SubscribedTrack { - return &SubscribedTrack{ + s := &SubscribedTrack{ params: params, debouncer: debounce.New(subscriptionDebounceInterval), } + + s.params.DownTrack.SetMaxSpatialLayer(SpatialLayerForQuality(livekit.VideoQuality_HIGH)) + return s } func (t *SubscribedTrack) OnBind(f func()) { diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 444ffe231..9936582f1 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -17,10 +17,13 @@ import ( // Forwarder // const ( - FlagPauseOnDowngrade = true - FlagFilterRTX = true + FlagPauseOnDowngrade = true + FlagFilterRTX = true + TransitionCostSpatial = 10 ) +// ------------------------------------------------------------------- + type ForwardingStatus int const ( @@ -29,6 +32,8 @@ const ( ForwardingStatusOptimal ) +// ------------------------------------------------------------------- + type VideoStreamingChange int const ( @@ -50,6 +55,8 @@ func (v VideoStreamingChange) String() string { } } +// ------------------------------------------------------------------- + type VideoAllocationState int const ( @@ -102,22 +109,25 @@ var ( } ) +// ------------------------------------------------------------------- + type VideoAllocationProvisional struct { layers VideoLayers muted bool bitrates Bitrates availableLayers []int32 + maxLayers VideoLayers } +// ------------------------------------------------------------------- + type VideoTransition struct { from VideoLayers to VideoLayers bandwidthDelta int64 } -const ( - TransitionCostSpatial = 10 -) +// ------------------------------------------------------------------- type TranslationParams struct { shouldDrop bool @@ -127,6 +137,8 @@ type TranslationParams struct { vp8 *TranslationParamsVP8 } +// ------------------------------------------------------------------- + type VideoLayers struct { spatial int32 temporal int32 @@ -144,6 +156,12 @@ func (v VideoLayers) SpatialGreaterThanOrEqual(v2 VideoLayers) bool { return v.spatial >= v2.spatial } +func (v VideoLayers) IsValid() bool { + return v.spatial != InvalidLayerSpatial && v.temporal != InvalidLayerTemporal +} + +// ------------------------------------------------------------------- + const ( InvalidLayerSpatial = int32(-1) InvalidLayerTemporal = int32(-1) @@ -157,13 +175,10 @@ var ( spatial: InvalidLayerSpatial, temporal: InvalidLayerTemporal, } - - DefaultMaxLayers = VideoLayers{ - spatial: DefaultMaxLayerSpatial, - temporal: DefaultMaxLayerTemporal, - } ) +// ------------------------------------------------------------------- + type Forwarder struct { lock sync.RWMutex codec webrtc.RTPCodecCapability @@ -213,7 +228,7 @@ func NewForwarder(codec webrtc.RTPCodecCapability, kind webrtc.RTPCodecType, log } if f.kind == webrtc.RTPCodecTypeVideo { - f.maxLayers = DefaultMaxLayers + f.maxLayers = VideoLayers{spatial: InvalidLayerSpatial, temporal: DefaultMaxLayerTemporal} } else { f.maxLayers = InvalidLayers } @@ -436,7 +451,7 @@ func (f *Forwarder) AllocateOptimal(brs Bitrates) VideoAllocation { targetLayers.spatial = int32(math.Min(float64(f.maxLayers.spatial), float64(f.availableLayers[len(f.availableLayers)-1]))) targetLayers.temporal = int32(math.Max(0, float64(f.maxLayers.temporal))) - if f.targetLayers == InvalidLayers { + if f.targetLayers == InvalidLayers && targetLayers.IsValid() { change = VideoStreamingChangeResuming } default: @@ -467,6 +482,9 @@ func (f *Forwarder) AllocateOptimal(brs Bitrates) VideoAllocation { } } + if !targetLayers.IsValid() { + targetLayers = InvalidLayers + } f.lastAllocation = VideoAllocation{ state: state, change: change, @@ -494,6 +512,7 @@ func (f *Forwarder) ProvisionalAllocatePrepare(bitrates Bitrates) { muted: f.muted, bitrates: bitrates, availableLayers: f.availableLayers, + maxLayers: f.maxLayers, } } @@ -501,11 +520,7 @@ func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layers V f.lock.Lock() defer f.lock.Unlock() - if f.provisional.muted { - return 0 - } - - if layers.GreaterThan(f.maxLayers) { + if f.provisional.muted || !f.provisional.maxLayers.IsValid() || layers.GreaterThan(f.provisional.maxLayers) { return 0 } @@ -571,8 +586,8 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition() VideoTransitio // what is the highest that is available maximalLayers := InvalidLayers maximalBandwidthRequired := int64(0) - for s := f.maxLayers.spatial; s >= 0; s-- { - for t := f.maxLayers.temporal; t >= 0; t-- { + for s := f.provisional.maxLayers.spatial; s >= 0; s-- { + for t := f.provisional.maxLayers.temporal; t >= 0; t-- { if f.provisional.bitrates[s][t] != 0 { maximalLayers = VideoLayers{spatial: s, temporal: t} maximalBandwidthRequired = f.provisional.bitrates[s][t] @@ -613,8 +628,8 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition() VideoTransitio // but the cooperative scheme knocks things back to minimal minimalLayers := InvalidLayers bandwidthRequired := int64(0) - for s := int32(0); s <= f.maxLayers.spatial; s++ { - for t := int32(0); t <= f.maxLayers.temporal; t++ { + for s := int32(0); s <= f.provisional.maxLayers.spatial; s++ { + for t := int32(0); t <= f.provisional.maxLayers.temporal; t++ { if f.provisional.bitrates[s][t] != 0 { minimalLayers = VideoLayers{spatial: s, temporal: t} bandwidthRequired = f.provisional.bitrates[s][t] @@ -669,8 +684,8 @@ func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() VideoTransiti } maxReachableLayerTemporal := InvalidLayerTemporal - for t := f.maxLayers.temporal; t >= 0; t-- { - for s := f.maxLayers.spatial; s >= 0; s-- { + for t := f.provisional.maxLayers.temporal; t >= 0; t-- { + for s := f.provisional.maxLayers.spatial; s >= 0; s-- { if f.provisional.bitrates[s][t] != 0 { maxReachableLayerTemporal = t break diff --git a/pkg/sfu/forwarder_test.go b/pkg/sfu/forwarder_test.go index 918d7544b..39e38b9c2 100644 --- a/pkg/sfu/forwarder_test.go +++ b/pkg/sfu/forwarder_test.go @@ -62,17 +62,18 @@ func TestForwarderLayersVideo(t *testing.T) { f := newForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo) maxLayers := f.MaxLayers() - expectedLayers := VideoLayers{ - spatial: DefaultMaxLayerSpatial, - temporal: DefaultMaxLayerTemporal, - } + expectedLayers := VideoLayers{spatial: InvalidLayerSpatial, temporal: DefaultMaxLayerTemporal} require.Equal(t, expectedLayers, maxLayers) require.Equal(t, InvalidLayers, f.CurrentLayers()) require.Equal(t, InvalidLayers, f.TargetLayers()) + expectedLayers = VideoLayers{ + spatial: DefaultMaxLayerSpatial, + temporal: DefaultMaxLayerTemporal, + } changed, maxLayers, currentLayers := f.SetMaxSpatialLayer(DefaultMaxLayerSpatial) - require.False(t, changed) + require.True(t, changed) require.Equal(t, expectedLayers, maxLayers) require.Equal(t, InvalidLayers, currentLayers) @@ -89,10 +90,6 @@ func TestForwarderLayersVideo(t *testing.T) { f.currentLayers = VideoLayers{spatial: 0, temporal: 1} changed, maxLayers, currentLayers = f.SetMaxSpatialLayer(DefaultMaxLayerSpatial - 1) require.False(t, changed) - expectedLayers = VideoLayers{ - spatial: DefaultMaxLayerSpatial - 1, - temporal: DefaultMaxLayerTemporal, - } require.Equal(t, expectedLayers, maxLayers) require.Equal(t, expectedLayers, f.MaxLayers()) require.Equal(t, VideoLayers{spatial: 0, temporal: 1}, currentLayers) @@ -115,6 +112,8 @@ func TestForwarderLayersVideo(t *testing.T) { func TestForwarderGetForwardingStatus(t *testing.T) { f := newForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo) + f.SetMaxSpatialLayer(DefaultMaxLayerSpatial) + f.SetMaxTemporalLayer(DefaultMaxLayerTemporal) // no available layers, should be optimal require.Equal(t, ForwardingStatusOptimal, f.GetForwardingStatus()) @@ -163,6 +162,8 @@ func TestForwarderUpTrackLayersChange(t *testing.T) { func TestForwarderAllocate(t *testing.T) { f := newForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo) + f.SetMaxSpatialLayer(DefaultMaxLayerSpatial) + f.SetMaxTemporalLayer(DefaultMaxLayerTemporal) emptyBitrates := Bitrates{} bitrates := Bitrates{ @@ -254,6 +255,8 @@ func TestForwarderAllocate(t *testing.T) { func TestForwarderProvisionalAllocate(t *testing.T) { f := newForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo) + f.SetMaxSpatialLayer(DefaultMaxLayerSpatial) + f.SetMaxTemporalLayer(DefaultMaxLayerTemporal) availableLayers := []int32{0, 1, 2} bitrates := Bitrates{ @@ -365,6 +368,8 @@ func TestForwarderProvisionalAllocateMute(t *testing.T) { func TestForwarderProvisionalAllocateGetCooperativeTransition(t *testing.T) { f := newForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo) + f.SetMaxSpatialLayer(DefaultMaxLayerSpatial) + f.SetMaxTemporalLayer(DefaultMaxLayerTemporal) availableLayers := []int32{0, 1, 2} bitrates := Bitrates{ @@ -460,6 +465,8 @@ func TestForwarderProvisionalAllocateGetCooperativeTransition(t *testing.T) { func TestForwarderProvisionalAllocateGetBestWeightedTransition(t *testing.T) { f := newForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo) + f.SetMaxSpatialLayer(DefaultMaxLayerSpatial) + f.SetMaxTemporalLayer(DefaultMaxLayerTemporal) bitrates := Bitrates{ {1, 2, 3, 4}, @@ -482,6 +489,8 @@ func TestForwarderProvisionalAllocateGetBestWeightedTransition(t *testing.T) { func TestForwarderAllocateNextHigher(t *testing.T) { f := newForwarder(testutils.TestOpusCodec, webrtc.RTPCodecTypeAudio) + f.SetMaxSpatialLayer(DefaultMaxLayerSpatial) + f.SetMaxTemporalLayer(DefaultMaxLayerTemporal) emptyBitrates := Bitrates{} bitrates := Bitrates{ @@ -495,6 +504,8 @@ func TestForwarderAllocateNextHigher(t *testing.T) { require.False(t, boosted) f = newForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo) + f.SetMaxSpatialLayer(DefaultMaxLayerSpatial) + f.SetMaxTemporalLayer(DefaultMaxLayerTemporal) // when not in deficient state, does not boost f.lastAllocation.state = VideoAllocationStateNone @@ -688,6 +699,8 @@ func TestForwarderAllocateNextHigher(t *testing.T) { func TestForwarderPause(t *testing.T) { f := newForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo) + f.SetMaxSpatialLayer(DefaultMaxLayerSpatial) + f.SetMaxTemporalLayer(DefaultMaxLayerTemporal) availableLayers := []int32{0, 1, 2} bitrates := Bitrates{ @@ -720,6 +733,8 @@ func TestForwarderPause(t *testing.T) { func TestForwarderPauseMute(t *testing.T) { f := newForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo) + f.SetMaxSpatialLayer(DefaultMaxLayerSpatial) + f.SetMaxTemporalLayer(DefaultMaxLayerTemporal) availableLayers := []int32{0, 1, 2} bitrates := Bitrates{