Deficient state handling when a track needs a change (#261)

* WIP commit

* deficient handling

* Add missing ProvisionalAllocatePrepare

* adjust state on track removal

* Increase test timeout

* - Add comments about cooperative routines
- Take down transition if available in cooperative scheme
- Use layer comparison when taking down transition. Because of when the
  bitrate is measured, it is not always guaranteed bandwidthDelta is -ve
  when moving down.
- Do not add track to stream allocator till bind.

* make comment better

* a bit more clear comments

* Use OnBind on subscribed track
This commit is contained in:
Raja Subramanian
2021-12-16 10:58:34 +05:30
committed by GitHub
parent c08d1e9d72
commit e504b6678c
10 changed files with 509 additions and 26 deletions
+1
View File
@@ -248,6 +248,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error {
downTrack.SetTransceiver(transceiver)
// when outtrack is bound, start loop to send reports
downTrack.OnBind(func() {
go subTrack.Bound()
go t.sendDownTrackBindingReports(sub)
})
downTrack.OnPacketSent(func(_ *sfu.DownTrack, size int) {
+3 -1
View File
@@ -797,7 +797,9 @@ func (p *ParticipantImpl) AddSubscribedTrack(subTrack types.SubscribedTrack) {
p.subscribedTracks[subTrack.ID()] = subTrack
p.lock.Unlock()
p.subscriber.AddTrack(subTrack)
subTrack.OnBind(func() {
p.subscriber.AddTrack(subTrack)
})
p.subscribedTo.Store(subTrack.PublisherIdentity(), struct{}{})
}
+12
View File
@@ -24,6 +24,8 @@ type SubscribedTrack struct {
pubMuted utils.AtomicFlag
settings atomic.Value // *livekit.UpdateTrackSettings
onBind func()
debouncer func(func())
}
@@ -36,6 +38,16 @@ func NewSubscribedTrack(mediaTrack types.MediaTrack, publisherIdentity string, d
}
}
func (t *SubscribedTrack) OnBind(f func()) {
t.onBind = f
}
func (t *SubscribedTrack) Bound() {
if t.onBind != nil {
t.onBind()
}
}
func (t *SubscribedTrack) ID() string {
return t.dt.ID()
}
+1
View File
@@ -139,6 +139,7 @@ type PublishedTrack interface {
//counterfeiter:generate . SubscribedTrack
type SubscribedTrack interface {
OnBind(f func())
ID() string
PublisherIdentity() string
DownTrack() *sfu.DownTrack
@@ -40,6 +40,11 @@ type FakeSubscribedTrack struct {
isMutedReturnsOnCall map[int]struct {
result1 bool
}
OnBindStub func(func())
onBindMutex sync.RWMutex
onBindArgsForCall []struct {
arg1 func()
}
PublishedTrackStub func() types.MediaTrack
publishedTrackMutex sync.RWMutex
publishedTrackArgsForCall []struct {
@@ -247,6 +252,38 @@ func (fake *FakeSubscribedTrack) IsMutedReturnsOnCall(i int, result1 bool) {
}{result1}
}
func (fake *FakeSubscribedTrack) OnBind(arg1 func()) {
fake.onBindMutex.Lock()
fake.onBindArgsForCall = append(fake.onBindArgsForCall, struct {
arg1 func()
}{arg1})
stub := fake.OnBindStub
fake.recordInvocation("OnBind", []interface{}{arg1})
fake.onBindMutex.Unlock()
if stub != nil {
fake.OnBindStub(arg1)
}
}
func (fake *FakeSubscribedTrack) OnBindCallCount() int {
fake.onBindMutex.RLock()
defer fake.onBindMutex.RUnlock()
return len(fake.onBindArgsForCall)
}
func (fake *FakeSubscribedTrack) OnBindCalls(stub func(func())) {
fake.onBindMutex.Lock()
defer fake.onBindMutex.Unlock()
fake.OnBindStub = stub
}
func (fake *FakeSubscribedTrack) OnBindArgsForCall(i int) func() {
fake.onBindMutex.RLock()
defer fake.onBindMutex.RUnlock()
argsForCall := fake.onBindArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeSubscribedTrack) PublishedTrack() types.MediaTrack {
fake.publishedTrackMutex.Lock()
ret, specificReturn := fake.publishedTrackReturnsOnCall[len(fake.publishedTrackArgsForCall)]
@@ -503,6 +540,8 @@ func (fake *FakeSubscribedTrack) Invocations() map[string][][]interface{} {
defer fake.iDMutex.RUnlock()
fake.isMutedMutex.RLock()
defer fake.isMutedMutex.RUnlock()
fake.onBindMutex.RLock()
defer fake.onBindMutex.RUnlock()
fake.publishedTrackMutex.RLock()
defer fake.publishedTrackMutex.RUnlock()
fake.publisherIdentityMutex.RLock()
+9 -1
View File
@@ -287,7 +287,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
}
if d.sequencer != nil {
meta := d.sequencer.push(extPkt.Packet.SequenceNumber, tp.rtp.sequenceNumber, tp.rtp.timestamp, 0, extPkt.Head)
meta := d.sequencer.push(extPkt.Packet.SequenceNumber, tp.rtp.sequenceNumber, tp.rtp.timestamp, uint8(layer), extPkt.Head)
if meta != nil && tp.vp8 != nil {
meta.packVP8(tp.vp8.header)
}
@@ -550,6 +550,14 @@ func (d *DownTrack) ProvisionalAllocate(availableChannelCapacity int64, layers V
return d.forwarder.ProvisionalAllocate(availableChannelCapacity, layers)
}
func (d *DownTrack) ProvisionalAllocateGetCooperativeTransition() VideoTransition {
return d.forwarder.ProvisionalAllocateGetCooperativeTransition()
}
func (d *DownTrack) ProvisionalAllocateGetBestWeightedTransition() VideoTransition {
return d.forwarder.ProvisionalAllocateGetBestWeightedTransition()
}
func (d *DownTrack) ProvisionalAllocateCommit() VideoAllocation {
return d.forwarder.ProvisionalAllocateCommit()
}
+206 -13
View File
@@ -112,18 +112,16 @@ type VideoAllocationProvisional struct {
bitrates Bitrates
}
const (
SpatialTransitionCost = 10
TemporalTransitionCost = 0
)
type VideoAllocationMove struct {
layers VideoLayers
deltaBitrate int64
transitionCost int32
qualityCost int32
type VideoTransition struct {
from VideoLayers
to VideoLayers
bandwidthDelta int64
}
const (
TransitionCostSpatial = 10
)
type TranslationParams struct {
shouldDrop bool
isDroppingRelevant bool
@@ -520,6 +518,203 @@ func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layers V
return 0
}
func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition() VideoTransition {
//
// This is called when a track needs a change (could be mute/unmute, subscribed layers changed, published layers changed)
// when channel is congested.
//
// The goal is to provide a co-operative transition. Co-operative stream allocation aims to keep all the streams active
// as much as possible.
//
// When channel is congested, effecting a transition which will consume more bits will lead to more congestion.
// So, this routine does the following
// 1. When muting, it is not going to increase consumption.
// 2. If the stream is currently active and the transition needs more bits (higher layers = more bits), do not make the up move.
// The higher layer requirement could be due to a new published layer becoming available or subscribed layers changing.
// 3. If the new target layers are lower than current target, take the move down and save bits.
// 4. If not currently streaming, find the minimum layers that can unpause the stream.
//
// To summarize, co-operative streaming means
// - Try to keep tracks streaming, i. e. no pauses even if not at optimal layers
// - Do not make an upgrade as it could affect other tracks
//
f.lock.Lock()
defer f.lock.Unlock()
if f.provisional.muted {
f.provisional.layers = InvalidLayers
return VideoTransition{
from: f.targetLayers,
to: InvalidLayers,
bandwidthDelta: 0 - f.lastAllocation.bandwidthRequested,
// LK-TODO should this take current bitrate of current target layers?
}
}
// check if we should preserve current target
if f.targetLayers != InvalidLayers {
// what is the highest that is available
maximalLayers := InvalidLayers
maximalBandwidthRequired := int64(0)
for s := f.maxLayers.spatial; s >= 0; s-- {
for t := f.maxLayers.temporal; t >= 0; t-- {
if f.provisional.bitrates[s][t] != 0 {
maximalLayers = VideoLayers{spatial: s, temporal: t}
maximalBandwidthRequired = f.provisional.bitrates[s][t]
break
}
}
if maximalBandwidthRequired != 0 {
break
}
}
if maximalLayers != InvalidLayers {
if !f.targetLayers.GreaterThan(maximalLayers) && (f.provisional.bitrates[f.targetLayers.spatial][f.targetLayers.temporal] != 0) {
// currently streaming and wanting an upgrade, just preserve current target in the cooperative scheme of things
f.provisional.layers = f.targetLayers
return VideoTransition{
from: f.targetLayers,
to: f.targetLayers,
bandwidthDelta: 0,
}
}
if f.targetLayers.GreaterThan(maximalLayers) {
// maximalLayers <= f.targetLayers, make the down move
f.provisional.layers = maximalLayers
return VideoTransition{
from: f.targetLayers,
to: maximalLayers,
bandwidthDelta: maximalBandwidthRequired - f.lastAllocation.bandwidthRequested,
}
}
}
}
// currently not streaming, find minimal
// NOTE: a layer in feed could have paused and there could be other options than going back to minimal, but the cooperative scheme knocks things back to minimal
minimalLayers := InvalidLayers
bandwidthRequired := int64(0)
for s := int32(0); s <= f.maxLayers.spatial; s++ {
for t := int32(0); s <= f.maxLayers.temporal; t++ {
if f.provisional.bitrates[s][t] != 0 {
minimalLayers = VideoLayers{spatial: s, temporal: t}
bandwidthRequired = f.provisional.bitrates[s][t]
break
}
}
if bandwidthRequired != 0 {
break
}
}
targetLayers := f.targetLayers
if targetLayers == InvalidLayers || targetLayers.GreaterThan(minimalLayers) || (f.provisional.bitrates[targetLayers.spatial][targetLayers.temporal] == 0) {
targetLayers = minimalLayers
}
f.provisional.layers = targetLayers
return VideoTransition{
from: f.targetLayers,
to: targetLayers,
bandwidthDelta: bandwidthRequired - f.lastAllocation.bandwidthRequested,
}
}
func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() VideoTransition {
//
// This is called when a track needs a change (could be mute/unmute, subscribed layers changed, published layers changed)
// when channel is congested.
//
// The goal is to keep all tracks streaming as much as possible. So, the track that needs a change needs bits to be unpaused.
//
// This tries to figure out how much this track can contribute back to the pool to enable the track that needs to be unpaused.
// 1. Track muted OR feed dry - can contribute everything back in case it was using bits.
// 2. Look at all possible down transitions from current target and find the best offer.
// Best offer is calculated as bits saved moving to a down layer divided by cost.
// Cost has two components
// a. Transition cost: Spatial layer switch is expensive due to key frame requiremnt, but temporal layer switch is free.
// b. Quality cost: The farther away from desired layers, the higher the quality cost.
//
f.lock.Lock()
defer f.lock.Unlock()
if f.provisional.muted {
f.provisional.layers = InvalidLayers
return VideoTransition{
from: f.targetLayers,
to: InvalidLayers,
bandwidthDelta: 0 - f.lastAllocation.bandwidthRequested,
// LK-TODO should this take current bitrate of current target layers?
}
}
maxReachableLayerTemporal := int32(-1)
for t := f.maxLayers.temporal; t >= 0; t-- {
for s := f.maxLayers.spatial; s >= 0; s-- {
if f.provisional.bitrates[s][t] != 0 {
maxReachableLayerTemporal = t
break
}
}
if maxReachableLayerTemporal != -1 {
break
}
}
if maxReachableLayerTemporal == -1 {
// feed has gone dry,
f.provisional.layers = InvalidLayers
return VideoTransition{
from: f.targetLayers,
to: InvalidLayers,
bandwidthDelta: 0 - f.lastAllocation.bandwidthRequested,
}
}
// starting from mimimum to target, find transition which gives the best
// transition taking into account bits saved vs cost of such a transition
bestLayers := InvalidLayers
bestBandwidthDelta := int64(0)
bestValue := float32(0)
for s := int32(0); s <= f.targetLayers.spatial; s++ {
for t := int32(0); t <= f.targetLayers.temporal; t++ {
if s == f.targetLayers.spatial && t == f.targetLayers.temporal {
break
}
bandwidthDelta := int64(math.Max(float64(0), float64(f.lastAllocation.bandwidthRequested-f.provisional.bitrates[s][t])))
transitionCost := int32(0)
if f.targetLayers.spatial != s {
transitionCost = TransitionCostSpatial
}
qualityCost := (maxReachableLayerTemporal+1)*(f.targetLayers.spatial-s) + (f.targetLayers.temporal - t)
value := float32(0)
if (transitionCost + qualityCost) != 0 {
value = float32(bandwidthDelta) / float32(transitionCost+qualityCost)
}
if value > bestValue || (value == bestValue && bandwidthDelta > bestBandwidthDelta) {
bestValue = value
bestBandwidthDelta = bandwidthDelta
bestLayers = VideoLayers{spatial: s, temporal: t}
}
}
}
f.provisional.layers = bestLayers
return VideoTransition{
from: f.targetLayers,
to: bestLayers,
bandwidthDelta: bestBandwidthDelta,
}
}
func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation {
f.lock.Lock()
defer f.lock.Unlock()
@@ -531,7 +726,7 @@ func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation {
bandwidthRequested := int64(0)
switch {
case f.muted:
case f.provisional.muted:
state = VideoAllocationStateMuted
case optimalBandwidthNeeded == 0:
if len(f.availableLayers) == 0 {
@@ -582,8 +777,6 @@ func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation {
f.currentLayers = InvalidLayers
}
f.provisional = nil
return f.lastAllocation
}
+115
View File
@@ -340,6 +340,121 @@ func TestForwarderProvisionalAllocateMute(t *testing.T) {
require.Equal(t, InvalidLayers, f.TargetLayers())
}
func TestForwarderProvisionalAllocateGetCooperativeTransition(t *testing.T) {
f := NewForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo)
bitrates := Bitrates{
{1, 2, 3, 4},
{5, 6, 7, 8},
{9, 10, 0, 0},
}
f.ProvisionalAllocatePrepare(bitrates)
// from scratch (InvalidLayers) should give back layer (0, 0)
expectedTransition := VideoTransition{
from: InvalidLayers,
to: VideoLayers{spatial: 0, temporal: 0},
bandwidthDelta: 1,
}
transition := f.ProvisionalAllocateGetCooperativeTransition()
require.Equal(t, expectedTransition, transition)
// committing should set target to (0, 0)
expectedLayers := VideoLayers{spatial: 0, temporal: 0}
expectedResult := VideoAllocation{
state: VideoAllocationStateDeficient,
change: VideoStreamingChangeResuming,
bandwidthRequested: 1,
bandwidthDelta: 1,
availableLayers: nil,
bitrates: bitrates,
targetLayers: expectedLayers,
distanceToDesired: 9,
}
result := f.ProvisionalAllocateCommit()
require.Equal(t, expectedResult, result)
require.Equal(t, expectedResult, f.lastAllocation)
require.Equal(t, expectedLayers, f.TargetLayers())
// a higher target that is already streaming, just maintain it
targetLayers := VideoLayers{spatial: 2, temporal: 1}
f.targetLayers = targetLayers
f.lastAllocation.bandwidthRequested = 10
expectedTransition = VideoTransition{
from: targetLayers,
to: targetLayers,
bandwidthDelta: 0,
}
transition = f.ProvisionalAllocateGetCooperativeTransition()
require.Equal(t, expectedTransition, transition)
// committing should set target to (2, 1)
expectedLayers = VideoLayers{spatial: 2, temporal: 1}
expectedResult = VideoAllocation{
state: VideoAllocationStateOptimal,
change: VideoStreamingChangeNone,
bandwidthRequested: 10,
bandwidthDelta: 0,
availableLayers: nil,
bitrates: bitrates,
targetLayers: expectedLayers,
distanceToDesired: 0,
}
result = f.ProvisionalAllocateCommit()
require.Equal(t, expectedResult, result)
require.Equal(t, expectedResult, f.lastAllocation)
require.Equal(t, expectedLayers, f.TargetLayers())
// from a target that has become unavailable, should switch to lower available layer
targetLayers = VideoLayers{spatial: 2, temporal: 2}
f.targetLayers = targetLayers
expectedTransition = VideoTransition{
from: targetLayers,
to: VideoLayers{spatial: 2, temporal: 1},
bandwidthDelta: 0,
}
transition = f.ProvisionalAllocateGetCooperativeTransition()
require.Equal(t, expectedTransition, transition)
f.ProvisionalAllocateCommit()
// mute
f.Mute(true)
f.ProvisionalAllocatePrepare(bitrates)
// mute should send target to InvalidLayers
expectedTransition = VideoTransition{
from: VideoLayers{spatial: 2, temporal: 1},
to: InvalidLayers,
bandwidthDelta: -10,
}
transition = f.ProvisionalAllocateGetCooperativeTransition()
require.Equal(t, expectedTransition, transition)
}
func TestForwarderProvisionalAllocateGetBestWeightedTransition(t *testing.T) {
f := NewForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo)
bitrates := Bitrates{
{1, 2, 3, 4},
{5, 6, 7, 8},
{9, 10, 11, 12},
}
f.ProvisionalAllocatePrepare(bitrates)
f.targetLayers = VideoLayers{spatial: 2, temporal: 2}
f.lastAllocation.bandwidthRequested = bitrates[2][2]
expectedTransition := VideoTransition{
from: f.targetLayers,
to: VideoLayers{spatial: 2, temporal: 0},
bandwidthDelta: 2,
}
transition := f.ProvisionalAllocateGetBestWeightedTransition()
require.Equal(t, expectedTransition, transition)
}
func TestForwarderFinalizeAllocate(t *testing.T) {
f := NewForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo)
+122 -10
View File
@@ -134,6 +134,10 @@ type Event struct {
Data interface{}
}
func (e Event) String() string {
return fmt.Sprintf("StreamAllocator:Event{signal: %s, data: %s}", e.Signal, e.Data)
}
func NewStreamAllocator(params StreamAllocatorParams) *StreamAllocator {
s := &StreamAllocator{
logger: params.Logger,
@@ -390,6 +394,7 @@ func (s *StreamAllocator) handleSignalRemoveTrack(event *Event) {
}
// LK-TODO: use any saved bandwidth to re-distribute
s.adjustState()
}
}
@@ -632,11 +637,90 @@ func (s *StreamAllocator) allocateTrack(track *Track) {
return
}
// LK-TODO-START
// Two possible scenarios
// - Down allocate as necessary, take saved bits and re-distribute
// - If up shifting, try to steal from tracks which are closest to the desired
// LK-TODO-END
//
// In DEFICIENT state,
// 1. Find cooperative transition from track that needs allocation.
// 2. If track is currently streaming at minimum, do not do anything.
// 3. If that track is giving back bits, apply the transition.
// 4. If this track needs more, ask for best offer from others and try to use it.
//
track.ProvisionalAllocatePrepare()
transition := track.ProvisionalAllocateGetCooperativeTransition()
// track is currently streaming at minimum
if transition.bandwidthDelta == 0 {
return
}
// downgrade, giving back bits
if transition.from.GreaterThan(transition.to) {
allocation := track.ProvisionalAllocateCommit()
update := NewStreamStateUpdate()
update.HandleStreamingChange(allocation.change, track)
s.maybeSendUpdate(update)
s.adjustState()
return
// LK-TODO-START
// Should use the bits given back to start any paused track.
// Note layer downgrade may actually have positive delta (i. e. consume more bits)
// because of when the measurement is done. Watch for that.
// LK-TODO-END
}
//
// This track is currently not streaming and needs bits to start.
// Try to redistribute starting with tracks that are closest to their desired.
//
var minDistanceSorted MinDistanceSorter
for _, t := range s.managedVideoTracksSorted {
if t != track {
minDistanceSorted = append(minDistanceSorted, t)
}
}
sort.Sort(minDistanceSorted)
bandwidthAcquired := int64(0)
var contributingTracks []*Track
for _, t := range minDistanceSorted {
t.ProvisionalAllocatePrepare()
}
for _, t := range minDistanceSorted {
tx := t.ProvisionalAllocateGetBestWeightedTransition()
if tx.bandwidthDelta < 0 {
contributingTracks = append(contributingTracks, t)
bandwidthAcquired += -tx.bandwidthDelta
if bandwidthAcquired >= transition.bandwidthDelta {
break
}
}
}
if bandwidthAcquired < transition.bandwidthDelta {
// could not get enough from other tracks, let probing deal with starting the track
return
}
// commit the tracks that contributed
update := NewStreamStateUpdate()
for _, t := range contributingTracks {
allocation := t.ProvisionalAllocateCommit()
update.HandleStreamingChange(allocation.change, t)
}
// commit the track that needs change
allocation := track.ProvisionalAllocateCommit()
update.HandleStreamingChange(allocation.change, track)
// LK-TODO if got too much extra, can potentially give it to some deficient track
s.maybeSendUpdate(update)
s.adjustState()
}
func (s *StreamAllocator) allocateAllTracks() {
@@ -727,6 +811,10 @@ func (s *StreamAllocator) maybeSendUpdate(update *StreamStateUpdate) {
}
func (s *StreamAllocator) finalizeTracks() {
for _, t := range s.exemptVideoTracksSorted {
t.FinalizeAllocate()
}
for _, t := range s.managedVideoTracksSorted {
t.FinalizeAllocate()
}
@@ -786,14 +874,14 @@ func (s *StreamAllocator) maybeProbe() {
}
func (s *StreamAllocator) maybeBoostLayer() {
var distanceSorted MaxDistanceSorter
var maxDistanceSorted MaxDistanceSorter
for _, track := range s.managedVideoTracksSorted {
distanceSorted = append(distanceSorted, track)
maxDistanceSorted = append(maxDistanceSorted, track)
}
sort.Sort(distanceSorted)
sort.Sort(maxDistanceSorted)
// boost first deficient track in priority order
for _, track := range distanceSorted {
for _, track := range maxDistanceSorted {
if !track.IsDeficient() {
continue
}
@@ -824,7 +912,7 @@ func (s *StreamAllocator) isTimeToBoost() bool {
}
func (s *StreamAllocator) resetBoost() {
s.lastBoostTime = time.Now()
s.lastBoostTime = time.Time{}
}
func (s *StreamAllocator) maybeGratuitousProbe() bool {
@@ -992,6 +1080,14 @@ func (t *Track) ProvisionalAllocate(availableChannelCapacity int64, layers Video
return t.downTrack.ProvisionalAllocate(availableChannelCapacity, layers)
}
func (t *Track) ProvisionalAllocateGetCooperativeTransition() VideoTransition {
return t.downTrack.ProvisionalAllocateGetCooperativeTransition()
}
func (t *Track) ProvisionalAllocateGetBestWeightedTransition() VideoTransition {
return t.downTrack.ProvisionalAllocateGetBestWeightedTransition()
}
func (t *Track) ProvisionalAllocateCommit() VideoAllocation {
return t.downTrack.ProvisionalAllocateCommit()
}
@@ -1057,3 +1153,19 @@ func (m MaxDistanceSorter) Less(i, j int) bool {
}
//------------------------------------------------
type MinDistanceSorter []*Track
func (m MinDistanceSorter) Len() int {
return len(m)
}
func (m MinDistanceSorter) Swap(i, j int) {
m[i], m[j] = m[j], m[i]
}
func (m MinDistanceSorter) Less(i, j int) bool {
return m[i].DistanceToDesired() < m[j].DistanceToDesired()
}
//------------------------------------------------
+1 -1
View File
@@ -10,7 +10,7 @@ import (
var (
SyncDelay = 100 * time.Millisecond
ConnectTimeout = 10 * time.Second
ConnectTimeout = 30 * time.Second
)
func WithTimeout(t *testing.T, description string, f func() bool) bool {