mirror of
https://github.com/livekit/livekit.git
synced 2026-04-04 16:55:45 +00:00
Prevent erroneous stream pause. (#2008)
This commit is contained in:
@@ -1135,7 +1135,7 @@ func (d *DownTrack) ProvisionalAllocateReset() {
|
||||
d.forwarder.ProvisionalAllocateReset()
|
||||
}
|
||||
|
||||
func (d *DownTrack) ProvisionalAllocate(availableChannelCapacity int64, layers buffer.VideoLayer, allowPause bool, allowOvershoot bool) int64 {
|
||||
func (d *DownTrack) ProvisionalAllocate(availableChannelCapacity int64, layers buffer.VideoLayer, allowPause bool, allowOvershoot bool) (bool, int64) {
|
||||
return d.forwarder.ProvisionalAllocate(availableChannelCapacity, layers, allowPause, allowOvershoot)
|
||||
}
|
||||
|
||||
|
||||
@@ -728,7 +728,7 @@ func (f *Forwarder) ProvisionalAllocateReset() {
|
||||
f.provisional.allocatedLayer = buffer.InvalidLayer
|
||||
}
|
||||
|
||||
func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer buffer.VideoLayer, allowPause bool, allowOvershoot bool) int64 {
|
||||
func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer buffer.VideoLayer, allowPause bool, allowOvershoot bool) (bool, int64) {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
|
||||
@@ -737,12 +737,12 @@ func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer bu
|
||||
f.provisional.maxSeenLayer.Spatial == buffer.InvalidLayerSpatial ||
|
||||
!f.provisional.maxLayer.IsValid() ||
|
||||
((!allowOvershoot || !f.vls.IsOvershootOkay()) && layer.GreaterThan(f.provisional.maxLayer)) {
|
||||
return 0
|
||||
return false, 0
|
||||
}
|
||||
|
||||
requiredBitrate := f.provisional.Bitrates[layer.Spatial][layer.Temporal]
|
||||
if requiredBitrate == 0 {
|
||||
return 0
|
||||
return false, 0
|
||||
}
|
||||
|
||||
alreadyAllocatedBitrate := int64(0)
|
||||
@@ -753,7 +753,7 @@ func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer bu
|
||||
// a layer under maximum fits, take it
|
||||
if !layer.GreaterThan(f.provisional.maxLayer) && requiredBitrate <= (availableChannelCapacity+alreadyAllocatedBitrate) {
|
||||
f.provisional.allocatedLayer = layer
|
||||
return requiredBitrate - alreadyAllocatedBitrate
|
||||
return true, requiredBitrate - alreadyAllocatedBitrate
|
||||
}
|
||||
|
||||
//
|
||||
@@ -766,10 +766,10 @@ func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layer bu
|
||||
//
|
||||
if !allowPause && (!f.provisional.allocatedLayer.IsValid() || !layer.GreaterThan(f.provisional.allocatedLayer)) {
|
||||
f.provisional.allocatedLayer = layer
|
||||
return requiredBitrate - alreadyAllocatedBitrate
|
||||
return true, requiredBitrate - alreadyAllocatedBitrate
|
||||
}
|
||||
|
||||
return 0
|
||||
return false, 0
|
||||
}
|
||||
|
||||
func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot bool) VideoTransition {
|
||||
|
||||
@@ -400,20 +400,25 @@ func TestForwarderProvisionalAllocate(t *testing.T) {
|
||||
|
||||
f.ProvisionalAllocatePrepare(nil, bitrates)
|
||||
|
||||
usedBitrate := f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, true, false)
|
||||
isCandidate, usedBitrate := f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, true, false)
|
||||
require.True(t, isCandidate)
|
||||
require.Equal(t, bitrates[0][0], usedBitrate)
|
||||
|
||||
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, true, false)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, true, false)
|
||||
require.True(t, isCandidate)
|
||||
require.Equal(t, bitrates[2][3]-bitrates[0][0], usedBitrate)
|
||||
|
||||
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 3}, true, false)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 3}, true, false)
|
||||
require.True(t, isCandidate)
|
||||
require.Equal(t, bitrates[0][3]-bitrates[2][3], usedBitrate)
|
||||
|
||||
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 2}, true, false)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 2}, true, false)
|
||||
require.True(t, isCandidate)
|
||||
require.Equal(t, bitrates[1][2]-bitrates[0][3], usedBitrate)
|
||||
|
||||
// available not enough to reach (2, 2), allocating at (2, 2) should not succeed
|
||||
usedBitrate = f.ProvisionalAllocate(bitrates[2][2]-bitrates[1][2]-1, buffer.VideoLayer{Spatial: 2, Temporal: 2}, true, false)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][2]-bitrates[1][2]-1, buffer.VideoLayer{Spatial: 2, Temporal: 2}, true, false)
|
||||
require.False(t, isCandidate)
|
||||
require.Equal(t, int64(0), usedBitrate)
|
||||
|
||||
// committing should set target to (1, 2)
|
||||
@@ -440,7 +445,8 @@ func TestForwarderProvisionalAllocate(t *testing.T) {
|
||||
// when nothing fits and pausing disallowed, should allocate (0, 0)
|
||||
f.vls.SetTarget(buffer.InvalidLayer)
|
||||
f.ProvisionalAllocatePrepare(nil, bitrates)
|
||||
usedBitrate = f.ProvisionalAllocate(0, buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, false)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(0, buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, false)
|
||||
require.True(t, isCandidate)
|
||||
require.Equal(t, int64(1), usedBitrate)
|
||||
|
||||
// committing should set target to (0, 0)
|
||||
@@ -477,15 +483,18 @@ func TestForwarderProvisionalAllocate(t *testing.T) {
|
||||
|
||||
f.ProvisionalAllocatePrepare(nil, bitrates)
|
||||
|
||||
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true)
|
||||
require.False(t, isCandidate)
|
||||
require.Equal(t, int64(0), usedBitrate)
|
||||
|
||||
// overshoot should succeed
|
||||
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true)
|
||||
require.True(t, isCandidate)
|
||||
require.Equal(t, bitrates[2][3], usedBitrate)
|
||||
|
||||
// overshoot should succeed - this should win as this is lesser overshoot
|
||||
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true)
|
||||
require.True(t, isCandidate)
|
||||
require.Equal(t, bitrates[1][3]-bitrates[2][3], usedBitrate)
|
||||
|
||||
// committing should set target to (1, 3)
|
||||
@@ -524,15 +533,18 @@ func TestForwarderProvisionalAllocate(t *testing.T) {
|
||||
f.ProvisionalAllocatePrepare(nil, bitrates)
|
||||
|
||||
// all the provisional allocations should not succeed because the feed is dry
|
||||
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true)
|
||||
require.False(t, isCandidate)
|
||||
require.Equal(t, int64(0), usedBitrate)
|
||||
|
||||
// overshoot should not succeed
|
||||
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true)
|
||||
require.False(t, isCandidate)
|
||||
require.Equal(t, int64(0), usedBitrate)
|
||||
|
||||
// overshoot should not succeed
|
||||
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true)
|
||||
require.False(t, isCandidate)
|
||||
require.Equal(t, int64(0), usedBitrate)
|
||||
|
||||
// committing should set target to (0, 2), i. e. leave it at current for opportunistic forwarding
|
||||
@@ -562,15 +574,18 @@ func TestForwarderProvisionalAllocate(t *testing.T) {
|
||||
f.ProvisionalAllocatePrepare(nil, bitrates)
|
||||
|
||||
// all the provisional allocations below should not succeed because the feed is dry
|
||||
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, false, true)
|
||||
require.False(t, isCandidate)
|
||||
require.Equal(t, int64(0), usedBitrate)
|
||||
|
||||
// overshoot should not succeed
|
||||
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 2, Temporal: 3}, false, true)
|
||||
require.False(t, isCandidate)
|
||||
require.Equal(t, int64(0), usedBitrate)
|
||||
|
||||
// overshoot should not succeed
|
||||
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 3}, false, true)
|
||||
require.False(t, isCandidate)
|
||||
require.Equal(t, int64(0), usedBitrate)
|
||||
|
||||
expectedResult = VideoAllocation{
|
||||
@@ -604,10 +619,12 @@ func TestForwarderProvisionalAllocateMute(t *testing.T) {
|
||||
f.Mute(true)
|
||||
f.ProvisionalAllocatePrepare(nil, bitrates)
|
||||
|
||||
usedBitrate := f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, true, false)
|
||||
isCandidate, usedBitrate := f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 0, Temporal: 0}, true, false)
|
||||
require.False(t, isCandidate)
|
||||
require.Equal(t, int64(0), usedBitrate)
|
||||
|
||||
usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 2}, true, true)
|
||||
isCandidate, usedBitrate = f.ProvisionalAllocate(bitrates[2][3], buffer.VideoLayer{Spatial: 1, Temporal: 2}, true, true)
|
||||
require.False(t, isCandidate)
|
||||
require.Equal(t, int64(0), usedBitrate)
|
||||
|
||||
// committing should set target to buffer.InvalidLayer as track is muted
|
||||
|
||||
@@ -883,9 +883,15 @@ func (s *StreamAllocator) allocateTrack(track *Track) {
|
||||
return
|
||||
}
|
||||
|
||||
// this track is currently not streaming and needs bits to start.
|
||||
// first try an allocation using available headroom
|
||||
availableChannelCapacity := s.getAvailableHeadroom(false)
|
||||
// already streaming at some layer and transition is not requesting any change, i. e. BandwidthDelta == 0
|
||||
if transition.From.IsValid() && transition.BandwidthDelta == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// this track is currently not streaming and needs bits to start OR streaming at some layer and wants more bits.
|
||||
// NOTE: With co-operative transition, tracks should not be asking for more if already streaming, but handle that case any way.
|
||||
// first try an allocation using available headroom, current consumption of this track is discounted to calculate headroom.
|
||||
availableChannelCapacity := s.getAvailableHeadroomWithoutTracks(false, []*Track{track})
|
||||
if availableChannelCapacity > 0 {
|
||||
track.ProvisionalAllocateReset() // to reset allocation from co-operative transition above and try fresh
|
||||
|
||||
@@ -899,21 +905,30 @@ func (s *StreamAllocator) allocateTrack(track *Track) {
|
||||
Temporal: temporal,
|
||||
}
|
||||
|
||||
usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layer, s.allowPause, FlagAllowOvershootWhileDeficient)
|
||||
isCandidate, usedChannelCapacity := track.ProvisionalAllocate(
|
||||
availableChannelCapacity,
|
||||
layer,
|
||||
s.allowPause,
|
||||
FlagAllowOvershootWhileDeficient,
|
||||
)
|
||||
if availableChannelCapacity < usedChannelCapacity {
|
||||
break alloc_loop
|
||||
}
|
||||
|
||||
bestLayer = layer
|
||||
if isCandidate {
|
||||
bestLayer = layer
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if bestLayer.IsValid() {
|
||||
// found layer that can fit in available headroom
|
||||
update := NewStreamStateUpdate()
|
||||
allocation := track.ProvisionalAllocateCommit()
|
||||
updateStreamStateChange(track, allocation, update)
|
||||
s.maybeSendUpdate(update)
|
||||
if bestLayer.GreaterThan(transition.From) {
|
||||
// found layer that can fit in available headroom, take it if it is better than existing
|
||||
update := NewStreamStateUpdate()
|
||||
allocation := track.ProvisionalAllocateCommit()
|
||||
updateStreamStateChange(track, allocation, update)
|
||||
s.maybeSendUpdate(update)
|
||||
}
|
||||
|
||||
s.adjustState()
|
||||
return
|
||||
@@ -923,11 +938,6 @@ func (s *StreamAllocator) allocateTrack(track *Track) {
|
||||
transition = track.ProvisionalAllocateGetCooperativeTransition(FlagAllowOvershootWhileDeficient) // get transition again to reset above allocation attempt using available headroom
|
||||
}
|
||||
|
||||
// track is currently streaming at minimum
|
||||
if transition.BandwidthDelta == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// if there is not enough headroom, try to redistribute starting with tracks that are closest to their desired.
|
||||
bandwidthAcquired := int64(0)
|
||||
var contributingTracks []*Track
|
||||
@@ -1018,17 +1028,31 @@ func (s *StreamAllocator) maybeBoostDeficientTracks() {
|
||||
|
||||
update := NewStreamStateUpdate()
|
||||
|
||||
for _, track := range s.getMaxDistanceSortedDeficient() {
|
||||
allocation, boosted := track.AllocateNextHigher(availableChannelCapacity, FlagAllowOvershootInCatchup)
|
||||
if !boosted {
|
||||
continue
|
||||
sortedTracks := s.getMaxDistanceSortedDeficient()
|
||||
boost_loop:
|
||||
for {
|
||||
for idx, track := range sortedTracks {
|
||||
allocation, boosted := track.AllocateNextHigher(availableChannelCapacity, FlagAllowOvershootInCatchup)
|
||||
if !boosted {
|
||||
if idx == len(sortedTracks)-1 {
|
||||
// all tracks tried
|
||||
break boost_loop
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
updateStreamStateChange(track, allocation, update)
|
||||
|
||||
availableChannelCapacity -= allocation.BandwidthDelta
|
||||
if availableChannelCapacity <= 0 {
|
||||
break boost_loop
|
||||
}
|
||||
|
||||
break // sort again below as the track that was just boosted could still be farthest from its desired
|
||||
}
|
||||
|
||||
updateStreamStateChange(track, allocation, update)
|
||||
|
||||
availableChannelCapacity -= allocation.BandwidthDelta
|
||||
if availableChannelCapacity <= 0 {
|
||||
break
|
||||
sortedTracks = s.getMaxDistanceSortedDeficient()
|
||||
if len(sortedTracks) == 0 {
|
||||
break // nothing available to boost
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1103,7 +1127,7 @@ func (s *StreamAllocator) allocateAllTracks() {
|
||||
}
|
||||
|
||||
for _, track := range sorted {
|
||||
usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layer, s.allowPause, FlagAllowOvershootWhileDeficient)
|
||||
_, usedChannelCapacity := track.ProvisionalAllocate(availableChannelCapacity, layer, s.allowPause, FlagAllowOvershootWhileDeficient)
|
||||
availableChannelCapacity -= usedChannelCapacity
|
||||
if availableChannelCapacity < 0 {
|
||||
availableChannelCapacity = 0
|
||||
@@ -1174,10 +1198,32 @@ func (s *StreamAllocator) getExpectedBandwidthUsage() int64 {
|
||||
return expected
|
||||
}
|
||||
|
||||
func (s *StreamAllocator) getExpectedBandwidthUsageWithoutTracks(filteredTracks []*Track) int64 {
|
||||
expected := int64(0)
|
||||
for _, track := range s.getTracks() {
|
||||
filtered := false
|
||||
for _, ft := range filteredTracks {
|
||||
if ft == track {
|
||||
filtered = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !filtered {
|
||||
expected += track.BandwidthRequested()
|
||||
}
|
||||
}
|
||||
|
||||
return expected
|
||||
}
|
||||
|
||||
func (s *StreamAllocator) getAvailableHeadroom(allowOverride bool) int64 {
|
||||
return s.getAvailableChannelCapacity(allowOverride) - s.getExpectedBandwidthUsage()
|
||||
}
|
||||
|
||||
func (s *StreamAllocator) getAvailableHeadroomWithoutTracks(allowOverride bool, filteredTracks []*Track) int64 {
|
||||
return s.getAvailableChannelCapacity(allowOverride) - s.getExpectedBandwidthUsageWithoutTracks(filteredTracks)
|
||||
}
|
||||
|
||||
func (s *StreamAllocator) getNackDelta() (uint32, uint32) {
|
||||
aggPacketDelta := uint32(0)
|
||||
aggRepeatedNackDelta := uint32(0)
|
||||
|
||||
@@ -164,7 +164,7 @@ func (t *Track) ProvisionalAllocateReset() {
|
||||
t.downTrack.ProvisionalAllocateReset()
|
||||
}
|
||||
|
||||
func (t *Track) ProvisionalAllocate(availableChannelCapacity int64, layer buffer.VideoLayer, allowPause bool, allowOvershoot bool) int64 {
|
||||
func (t *Track) ProvisionalAllocate(availableChannelCapacity int64, layer buffer.VideoLayer, allowPause bool, allowOvershoot bool) (bool, int64) {
|
||||
return t.downTrack.ProvisionalAllocate(availableChannelCapacity, layer, allowPause, allowOvershoot)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user