Prioritize down tracks based on max layer(s) (#202)

Use max layer subscription of down tracks to
prioritize bandwidth allocation. The ordering is
- Highest spatial layer
- Highest temporal layer, if spatial layer matches
- Down track id if both layers match (this is to
  prevent unnecessary re-ordering)

Testing:
--------
(Really need to make a DownTrack interface so that
we can mock it and use it in tests)
For now, just a sanity check.
This commit is contained in:
Raja Subramanian
2021-11-22 23:21:26 +05:30
committed by GitHub
parent 391e2f8b31
commit 2f35128ac2
2 changed files with 130 additions and 25 deletions

View File

@@ -160,11 +160,18 @@ func NewDownTrack(c webrtc.RTPCodecCapability, r Receiver, bf *buffer.Factory, p
codec: c,
munger: NewMunger(),
}
if strings.ToLower(c.MimeType) == "video/vp8" {
d.vp8Munger = NewVP8Munger()
}
d.maxSpatialLayer.set(2)
d.maxTemporalLayer.set(2)
if d.Kind() == webrtc.RTPCodecTypeVideo {
d.maxSpatialLayer.set(2)
d.maxTemporalLayer.set(2)
} else {
d.maxSpatialLayer.set(-1)
d.maxTemporalLayer.set(-1)
}
return d, nil
}
@@ -483,6 +490,10 @@ func (d *DownTrack) SetMaxSpatialLayer(spatialLayer int32) error {
return ErrSpatialNotSupported
}
if spatialLayer == d.MaxSpatialLayer() {
return nil
}
d.maxSpatialLayer.set(spatialLayer)
if d.onSubscribedLayersChanged != nil {
@@ -498,6 +509,10 @@ func (d *DownTrack) MaxSpatialLayer() int32 {
}
func (d *DownTrack) SetMaxTemporalLayer(temporalLayer int32) {
if temporalLayer == d.MaxTemporalLayer() {
return
}
d.maxTemporalLayer.set(temporalLayer)
if d.onSubscribedLayersChanged != nil {

View File

@@ -100,6 +100,7 @@ package sfu
import (
"math"
"sort"
"sync"
"time"
@@ -183,8 +184,9 @@ type StreamAllocator struct {
boostedChannelCapacity uint64
lastBoostTime time.Time
tracksMu sync.RWMutex
tracks map[string]*Track
tracksMu sync.RWMutex
tracks map[string]*Track
tracksSorted TrackSorter
prober *Prober
@@ -240,24 +242,44 @@ func (s *StreamAllocator) AddTrack(downTrack *DownTrack) {
downTrack.OnPacketSent(s.onPacketSent)
s.tracksMu.Lock()
s.tracks[downTrack.ID()] = NewTrack(downTrack)
track := newTrack(downTrack)
s.tracks[downTrack.ID()] = track
s.tracksSorted = append(s.tracksSorted, track)
sort.Sort(s.tracksSorted)
s.tracksMu.Unlock()
s.postEvent(Signal_ADD_TRACK, downTrack)
s.postEvent(Event{
Signal: Signal_ADD_TRACK,
DownTrack: downTrack,
})
}
func (s *StreamAllocator) RemoveTrack(downTrack *DownTrack) {
s.tracksMu.Lock()
if _, ok := s.tracks[downTrack.ID()]; !ok {
s.tracksMu.Unlock()
return
}
delete(s.tracks, downTrack.ID())
n := len(s.tracksSorted)
for idx, track := range s.tracksSorted {
if track.DownTrack() == downTrack {
s.tracksSorted[idx] = s.tracksSorted[n-1]
s.tracksSorted = s.tracksSorted[:n-1]
break
}
}
sort.Sort(s.tracksSorted)
s.tracksMu.Unlock()
s.postEvent(Signal_REMOVE_TRACK, downTrack)
s.postEvent(Event{
Signal: Signal_REMOVE_TRACK,
DownTrack: downTrack,
})
}
func (s *StreamAllocator) onREMB(downTrack *DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate) {
@@ -323,7 +345,9 @@ func (s *StreamAllocator) onREMB(downTrack *DownTrack, remb *rtcp.ReceiverEstima
s.estimateMu.Unlock()
if signal != Signal_NONE {
s.postEvent(signal, nil)
s.postEvent(Event{
Signal: signal,
})
}
}
@@ -349,27 +373,49 @@ func (s *StreamAllocator) onReceiverReport(downTrack *DownTrack, rr *rtcp.Receiv
// called when feeding track's simulcast layer availability changes
func (s *StreamAllocator) onAvailableLayersChanged(downTrack *DownTrack, layerAdded bool) {
// LK-TODO: Look at processing specific downtrack
signal := Signal_AVAILABLE_LAYERS_REMOVE
if layerAdded {
s.postEvent(Signal_AVAILABLE_LAYERS_ADD, downTrack)
} else {
s.postEvent(Signal_AVAILABLE_LAYERS_REMOVE, downTrack)
signal = Signal_AVAILABLE_LAYERS_ADD
}
s.postEvent(Event{
Signal: signal,
DownTrack: downTrack,
})
}
// called when subscription settings changes (muting/unmuting of track)
func (s *StreamAllocator) onSubscriptionChanged(downTrack *DownTrack) {
// LK-TODO: Look at processing specific downtrack
s.postEvent(Signal_SUBSCRIPTION_CHANGE, downTrack)
s.postEvent(Event{
Signal: Signal_SUBSCRIPTION_CHANGE,
DownTrack: downTrack,
})
}
// called when subscribed layers changes (limiting max layers)
func (s *StreamAllocator) onSubscribedLayersChanged(downTrack *DownTrack, maxSpatialLayer int32, maxTemporalLayer int32) {
// LK-TODO: Look at processing specific downtrack
s.postEvent(Signal_SUBSCRIBED_LAYERS_CHANGE, downTrack)
// LK-TODO: Look at processing specific downtrack for reallocation
s.tracksMu.Lock()
track, ok := s.tracks[downTrack.ID()]
if !ok {
s.tracksMu.Unlock()
return
}
track.UpdateMaxLayers(maxSpatialLayer, maxTemporalLayer)
sort.Sort(s.tracksSorted)
s.tracksMu.Unlock()
s.postEvent(Event{
Signal: Signal_SUBSCRIBED_LAYERS_CHANGE,
DownTrack: downTrack,
})
}
// called when DownTrack sends a packet
func (s *StreamAllocator) onPacketSent(downTrack *DownTrack, size int) {
// bandwidth allocation is limited to video, so ignore audio
if downTrack.Kind() == webrtc.RTPCodecTypeAudio {
return
}
@@ -399,7 +445,7 @@ func (s *StreamAllocator) onSendProbe(bytesToSend int) int {
return bytesSent
}
func (s *StreamAllocator) postEvent(signal Signal, downTrack *DownTrack) {
func (s *StreamAllocator) postEvent(event Event) {
s.chMu.RLock()
defer s.chMu.RUnlock()
@@ -407,10 +453,7 @@ func (s *StreamAllocator) postEvent(signal Signal, downTrack *DownTrack) {
return
}
s.eventCh <- []Event{Event{
Signal: signal,
DownTrack: downTrack,
}}
s.eventCh <- []Event{event}
}
func (s *StreamAllocator) processEvents() {
@@ -448,10 +491,14 @@ func (s *StreamAllocator) ping() {
s.estimateMu.Unlock()
if signal != Signal_NONE {
s.postEvent(signal, nil)
s.postEvent(Event{
Signal: signal,
})
}
s.postEvent(Signal_PERIODIC_PING, nil)
s.postEvent(Event{
Signal: Signal_PERIODIC_PING,
})
}
}
@@ -768,7 +815,7 @@ func (s *StreamAllocator) allocate() {
if availableChannelCapacity < s.boostedChannelCapacity {
availableChannelCapacity = s.boostedChannelCapacity
}
for _, track := range s.tracks {
for _, track := range s.tracksSorted {
//
// `audio` tracks will do nothing in this method.
//
@@ -985,6 +1032,8 @@ func (s *StreamAllocator) maybeGratuitousProbe() {
s.setState(State_GRATUITOUS_PROBING)
}
//------------------------------------------------
type Track struct {
// LK-TODO-START
// Check if we can do without a lock?
@@ -1009,12 +1058,18 @@ type Track struct {
maxTemporalLayer int32
}
func NewTrack(downTrack *DownTrack) *Track {
func newTrack(downTrack *DownTrack) *Track {
return &Track{
downTrack: downTrack,
downTrack: downTrack,
maxSpatialLayer: downTrack.MaxSpatialLayer(),
maxTemporalLayer: downTrack.MaxTemporalLayer(),
}
}
func (t *Track) DownTrack() *DownTrack {
return t.downTrack
}
// LK-TODO this should probably be maintained in downTrack and this module can query what it needs
func (t *Track) UpdatePacketStats(rr *rtcp.ReceiverReport) {
t.lock.Lock()
@@ -1033,6 +1088,11 @@ func (t *Track) UpdatePacketStats(rr *rtcp.ReceiverReport) {
}
}
func (t *Track) UpdateMaxLayers(maxSpatialLayer, maxTemporalLayer int32) {
t.maxSpatialLayer = maxSpatialLayer
t.maxTemporalLayer = maxTemporalLayer
}
func (t *Track) GetPacketStats() (webrtc.RTPCodecType, uint32, uint32) {
t.lock.RLock()
defer t.lock.RUnlock()
@@ -1072,3 +1132,33 @@ func (t *Track) BandwidthRequested() uint64 {
func (t *Track) BandwidthOptimal() uint64 {
return t.optimalBandwidthNeeded
}
//------------------------------------------------
type TrackSorter []*Track
func (t TrackSorter) Len() int {
return len(t)
}
func (t TrackSorter) Swap(i, j int) {
t[i], t[j] = t[j], t[i]
}
func (t TrackSorter) Less(i, j int) bool {
// highest spatial layers have higher priority
if t[i].maxSpatialLayer != t[j].maxSpatialLayer {
return t[i].maxSpatialLayer > t[j].maxSpatialLayer
}
// highest temporal layers have priority if max spatial layers match
if t[i].maxTemporalLayer != t[j].maxTemporalLayer {
return t[i].maxTemporalLayer > t[j].maxTemporalLayer
}
// use track id to keep ordering if nothing else changes
// LK-TODO: ideally should be sorting, compare and then re-allocate only if order changed
return t[i].downTrack.ID() < t[j].downTrack.ID()
}
//------------------------------------------------