From f0ca262bcf987f9e89f0e32d1f06ffdf747be5a4 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 29 Aug 2023 13:21:57 +0530 Subject: [PATCH] Prevent erroneous stream pause. (#2008) --- pkg/sfu/downtrack.go | 2 +- pkg/sfu/forwarder.go | 12 +-- pkg/sfu/forwarder_test.go | 51 +++++++---- pkg/sfu/streamallocator/streamallocator.go | 98 ++++++++++++++++------ pkg/sfu/streamallocator/track.go | 2 +- 5 files changed, 114 insertions(+), 51 deletions(-) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 707640bd2..44414d7c8 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -1135,7 +1135,7 @@ func (d *DownTrack) ProvisionalAllocateReset() { d.forwarder.ProvisionalAllocateReset() } -func (d *DownTrack) ProvisionalAllocate(availableChannelCapacity int64, layers buffer.VideoLayer, allowPause bool, allowOvershoot bool) int64 { +func (d *DownTrack) ProvisionalAllocate(availableChannelCapacity int64, layers buffer.VideoLayer, allowPause bool, allowOvershoot bool) (bool, int64) { return d.forwarder.ProvisionalAllocate(availableChannelCapacity, layers, allowPause, allowOvershoot) } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index eab995bb8..9421f238c 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -728,7 +728,7 @@ func (f *Forwarder) ProvisionalAllocateReset() { f.provisional.allocatedLayer = buffer.InvalidLayer } -func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer buffer.VideoLayer, allowPause bool, allowOvershoot bool) int64 { +func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer buffer.VideoLayer, allowPause bool, allowOvershoot bool) (bool, int64) { f.lock.Lock() defer f.lock.Unlock() @@ -737,12 +737,12 @@ func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer bu f.provisional.maxSeenLayer.Spatial == buffer.InvalidLayerSpatial || !f.provisional.maxLayer.IsValid() || ((!allowOvershoot || !f.vls.IsOvershootOkay()) && layer.GreaterThan(f.provisional.maxLayer)) { - return 0 + return false, 0 } requiredBitrate := f.provisional.Bitrates[layer.Spatial][layer.Temporal] if requiredBitrate == 0 { - return 0 + return false, 0 } alreadyAllocatedBitrate := int64(0) @@ -753,7 +753,7 @@ func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer bu // a layer under maximum fits, take it if !layer.GreaterThan(f.provisional.maxLayer) && requiredBitrate <= (availableChannelCapacity+alreadyAllocatedBitrate) { f.provisional.allocatedLayer = layer - return requiredBitrate - alreadyAllocatedBitrate + return true, requiredBitrate - alreadyAllocatedBitrate } // @@ -766,10 +766,10 @@ func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer bu // if !allowPause && (!f.provisional.allocatedLayer.IsValid() || !layer.GreaterThan(f.provisional.allocatedLayer)) { f.provisional.allocatedLayer = layer - return requiredBitrate - alreadyAllocatedBitrate + return true, requiredBitrate - alreadyAllocatedBitrate } - return 0 + return false, 0 } func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot bool) VideoTransition { diff --git a/pkg/sfu/forwarder_test.go b/pkg/sfu/forwarder_test.go index 9c3be7c7a..b0e9bc2ea 100644 --- a/pkg/sfu/forwarder_test.go +++ b/pkg/sfu/forwarder_test.go @@ -400,20 +400,25 @@ func TestForwarderProvisionalAllocate(t *testing.T) { f.ProvisionalAllocatePrepare(nil, bitrates) - usedBitrate := f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, true, false) + isCandidate, usedBitrate := f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, true, false) + require.True(t, isCandidate) require.Equal(t, bitrates[0][0], usedBitrate) - usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, true, false) + isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, true, false) + require.True(t, isCandidate) require.Equal(t, bitrates[2][3]-bitrates[0][0], usedBitrate) - usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 3}, true, false) + isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 3}, true, false) + require.True(t, isCandidate) require.Equal(t, bitrates[0][3]-bitrates[2][3], usedBitrate) - usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 2}, true, false) + isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 2}, true, false) + require.True(t, isCandidate) require.Equal(t, bitrates[1][2]-bitrates[0][3], usedBitrate) // available not enough to reach (2, 2), allocating at (2, 2) should not succeed - usedBitrate = f.ProvisionalAllocate(bitrates[2][2]-bitrates[1][2]-1, buffer.VideoLayer{Spatial: 2, Temporal: 2}, true, false) + isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][2]-bitrates[1][2]-1, buffer.VideoLayer{Spatial: 2, Temporal: 2}, true, false) + require.False(t, isCandidate) require.Equal(t, int64(0), usedBitrate) // committing should set target to (1, 2) @@ -440,7 +445,8 @@ func TestForwarderProvisionalAllocate(t *testing.T) { // when nothing fits and pausing disallowed, should allocate (0, 0) f.vls.SetTarget(buffer.InvalidLayer) f.ProvisionalAllocatePrepare(nil, bitrates) - usedBitrate = f.ProvisionalAllocate(0, buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, false) + isCandidate, usedBitrate = f.ProvisionalAllocate(0, buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, false) + require.True(t, isCandidate) require.Equal(t, int64(1), usedBitrate) // committing should set target to (0, 0) @@ -477,15 +483,18 @@ func TestForwarderProvisionalAllocate(t *testing.T) { f.ProvisionalAllocatePrepare(nil, bitrates) - usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true) + isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true) + require.False(t, isCandidate) require.Equal(t, int64(0), usedBitrate) // overshoot should succeed - usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true) + isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true) + require.True(t, isCandidate) require.Equal(t, bitrates[2][3], usedBitrate) // overshoot should succeed - this should win as this is lesser overshoot - usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true) + isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true) + require.True(t, isCandidate) require.Equal(t, bitrates[1][3]-bitrates[2][3], usedBitrate) // committing should set target to (1, 3) @@ -524,15 +533,18 @@ func TestForwarderProvisionalAllocate(t *testing.T) { f.ProvisionalAllocatePrepare(nil, bitrates) // all the provisional allocations should not succeed because the feed is dry - usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true) + isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true) + require.False(t, isCandidate) require.Equal(t, int64(0), usedBitrate) // overshoot should not succeed - usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true) + isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true) + require.False(t, isCandidate) require.Equal(t, int64(0), usedBitrate) // overshoot should not succeed - usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true) + isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true) + require.False(t, isCandidate) require.Equal(t, int64(0), usedBitrate) // committing should set target to (0, 2), i. e. leave it at current for opportunistic forwarding @@ -562,15 +574,18 @@ func TestForwarderProvisionalAllocate(t *testing.T) { f.ProvisionalAllocatePrepare(nil, bitrates) // all the provisional allocations below should not succeed because the feed is dry - usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true) + isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true) + require.False(t, isCandidate) require.Equal(t, int64(0), usedBitrate) // overshoot should not succeed - usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true) + isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true) + require.False(t, isCandidate) require.Equal(t, int64(0), usedBitrate) // overshoot should not succeed - usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true) + isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true) + require.False(t, isCandidate) require.Equal(t, int64(0), usedBitrate) expectedResult = VideoAllocation{ @@ -604,10 +619,12 @@ func TestForwarderProvisionalAllocateMute(t *testing.T) { f.Mute(true) f.ProvisionalAllocatePrepare(nil, bitrates) - usedBitrate := f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, true, false) + isCandidate, usedBitrate := f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, true, false) + require.False(t, isCandidate) require.Equal(t, int64(0), usedBitrate) - usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 2}, true, true) + isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 2}, true, true) + require.False(t, isCandidate) require.Equal(t, int64(0), usedBitrate) // committing should set target to buffer.InvalidLayer as track is muted diff --git a/pkg/sfu/streamallocator/streamallocator.go b/pkg/sfu/streamallocator/streamallocator.go index 6751f29cb..f9baf7e5c 100644 --- a/pkg/sfu/streamallocator/streamallocator.go +++ b/pkg/sfu/streamallocator/streamallocator.go @@ -883,9 +883,15 @@ func (s *StreamAllocator) allocateTrack(track *Track) { return } - // this track is currently not streaming and needs bits to start. - // first try an allocation using available headroom - availableChannelCapacity := s.getAvailableHeadroom(false) + // already streaming at some layer and transition is not requesting any change, i. e. BandwidthDelta == 0 + if transition.From.IsValid() && transition.BandwidthDelta == 0 { + return + } + + // this track is currently not streaming and needs bits to start OR streaming at some layer and wants more bits. + // NOTE: With co-operative transition, tracks should not be asking for more if already streaming, but handle that case any way. + // first try an allocation using available headroom, current consumption of this track is discounted to calculate headroom. + availableChannelCapacity := s.getAvailableHeadroomWithoutTracks(false, []*Track{track}) if availableChannelCapacity > 0 { track.ProvisionalAllocateReset() // to reset allocation from co-operative transition above and try fresh @@ -899,21 +905,30 @@ func (s *StreamAllocator) allocateTrack(track *Track) { Temporal: temporal, } - usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layer, s.allowPause, FlagAllowOvershootWhileDeficient) + isCandidate, usedChannelCapacity := track.ProvisionalAllocate( + availableChannelCapacity, + layer, + s.allowPause, + FlagAllowOvershootWhileDeficient, + ) if availableChannelCapacity < usedChannelCapacity { break alloc_loop } - bestLayer = layer + if isCandidate { + bestLayer = layer + } } } if bestLayer.IsValid() { - // found layer that can fit in available headroom - update := NewStreamStateUpdate() - allocation := track.ProvisionalAllocateCommit() - updateStreamStateChange(track, allocation, update) - s.maybeSendUpdate(update) + if bestLayer.GreaterThan(transition.From) { + // found layer that can fit in available headroom, take it if it is better than existing + update := NewStreamStateUpdate() + allocation := track.ProvisionalAllocateCommit() + updateStreamStateChange(track, allocation, update) + s.maybeSendUpdate(update) + } s.adjustState() return @@ -923,11 +938,6 @@ func (s *StreamAllocator) allocateTrack(track *Track) { transition = track.ProvisionalAllocateGetCooperativeTransition(FlagAllowOvershootWhileDeficient) // get transition again to reset above allocation attempt using available headroom } - // track is currently streaming at minimum - if transition.BandwidthDelta == 0 { - return - } - // if there is not enough headroom, try to redistribute starting with tracks that are closest to their desired. bandwidthAcquired := int64(0) var contributingTracks []*Track @@ -1018,17 +1028,31 @@ func (s *StreamAllocator) maybeBoostDeficientTracks() { update := NewStreamStateUpdate() - for _, track := range s.getMaxDistanceSortedDeficient() { - allocation, boosted := track.AllocateNextHigher(availableChannelCapacity, FlagAllowOvershootInCatchup) - if !boosted { - continue + sortedTracks := s.getMaxDistanceSortedDeficient() +boost_loop: + for { + for idx, track := range sortedTracks { + allocation, boosted := track.AllocateNextHigher(availableChannelCapacity, FlagAllowOvershootInCatchup) + if !boosted { + if idx == len(sortedTracks)-1 { + // all tracks tried + break boost_loop + } + continue + } + + updateStreamStateChange(track, allocation, update) + + availableChannelCapacity -= allocation.BandwidthDelta + if availableChannelCapacity <= 0 { + break boost_loop + } + + break // sort again below as the track that was just boosted could still be farthest from its desired } - - updateStreamStateChange(track, allocation, update) - - availableChannelCapacity -= allocation.BandwidthDelta - if availableChannelCapacity <= 0 { - break + sortedTracks = s.getMaxDistanceSortedDeficient() + if len(sortedTracks) == 0 { + break // nothing available to boost } } @@ -1103,7 +1127,7 @@ func (s *StreamAllocator) allocateAllTracks() { } for _, track := range sorted { - usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layer, s.allowPause, FlagAllowOvershootWhileDeficient) + _, usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layer, s.allowPause, FlagAllowOvershootWhileDeficient) availableChannelCapacity -= usedChannelCapacity if availableChannelCapacity < 0 { availableChannelCapacity = 0 @@ -1174,10 +1198,32 @@ func (s *StreamAllocator) getExpectedBandwidthUsage() int64 { return expected } +func (s *StreamAllocator) getExpectedBandwidthUsageWithoutTracks(filteredTracks []*Track) int64 { + expected := int64(0) + for _, track := range s.getTracks() { + filtered := false + for _, ft := range filteredTracks { + if ft == track { + filtered = true + break + } + } + if !filtered { + expected += track.BandwidthRequested() + } + } + + return expected +} + func (s *StreamAllocator) getAvailableHeadroom(allowOverride bool) int64 { return s.getAvailableChannelCapacity(allowOverride) - s.getExpectedBandwidthUsage() } +func (s *StreamAllocator) getAvailableHeadroomWithoutTracks(allowOverride bool, filteredTracks []*Track) int64 { + return s.getAvailableChannelCapacity(allowOverride) - s.getExpectedBandwidthUsageWithoutTracks(filteredTracks) +} + func (s *StreamAllocator) getNackDelta() (uint32, uint32) { aggPacketDelta := uint32(0) aggRepeatedNackDelta := uint32(0) diff --git a/pkg/sfu/streamallocator/track.go b/pkg/sfu/streamallocator/track.go index 6ccae215b..528792928 100644 --- a/pkg/sfu/streamallocator/track.go +++ b/pkg/sfu/streamallocator/track.go @@ -164,7 +164,7 @@ func (t *Track) ProvisionalAllocateReset() { t.downTrack.ProvisionalAllocateReset() } -func (t *Track) ProvisionalAllocate(availableChannelCapacity int64, layer buffer.VideoLayer, allowPause bool, allowOvershoot bool) int64 { +func (t *Track) ProvisionalAllocate(availableChannelCapacity int64, layer buffer.VideoLayer, allowPause bool, allowOvershoot bool) (bool, int64) { return t.downTrack.ProvisionalAllocate(availableChannelCapacity, layer, allowPause, allowOvershoot) }