From 2f35128ac25d54bf0c9b151ce71b9f79cd331d52 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 22 Nov 2021 23:21:26 +0530 Subject: [PATCH] Prioritize down tracks based on max layer(s) (#202) Use max layer subscription of down tracks to prioritize bandwidth allocation. The ordering is - Highest spatial layer - Highest temporal layer, if spatial layer matches - Down track id if both layers match (this is to prevent unnecessary re-ordering) Testing: -------- (Really need to make a DownTrack interface so that we can mock it and use it in tests) For now, just a sanity check. --- pkg/sfu/downtrack.go | 19 +++++- pkg/sfu/streamallocator.go | 136 ++++++++++++++++++++++++++++++------- 2 files changed, 130 insertions(+), 25 deletions(-) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 6ddfddd88..f6838c2a6 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -160,11 +160,18 @@ func NewDownTrack(c webrtc.RTPCodecCapability, r Receiver, bf *buffer.Factory, p codec: c, munger: NewMunger(), } + if strings.ToLower(c.MimeType) == "video/vp8" { d.vp8Munger = NewVP8Munger() } - d.maxSpatialLayer.set(2) - d.maxTemporalLayer.set(2) + + if d.Kind() == webrtc.RTPCodecTypeVideo { + d.maxSpatialLayer.set(2) + d.maxTemporalLayer.set(2) + } else { + d.maxSpatialLayer.set(-1) + d.maxTemporalLayer.set(-1) + } return d, nil } @@ -483,6 +490,10 @@ func (d *DownTrack) SetMaxSpatialLayer(spatialLayer int32) error { return ErrSpatialNotSupported } + if spatialLayer == d.MaxSpatialLayer() { + return nil + } + d.maxSpatialLayer.set(spatialLayer) if d.onSubscribedLayersChanged != nil { @@ -498,6 +509,10 @@ func (d *DownTrack) MaxSpatialLayer() int32 { } func (d *DownTrack) SetMaxTemporalLayer(temporalLayer int32) { + if temporalLayer == d.MaxTemporalLayer() { + return + } + d.maxTemporalLayer.set(temporalLayer) if d.onSubscribedLayersChanged != nil { diff --git a/pkg/sfu/streamallocator.go b/pkg/sfu/streamallocator.go index b1cc19702..44110fb87 100644 --- a/pkg/sfu/streamallocator.go +++ b/pkg/sfu/streamallocator.go @@ -100,6 +100,7 @@ package sfu import ( "math" + "sort" "sync" "time" @@ -183,8 +184,9 @@ type StreamAllocator struct { boostedChannelCapacity uint64 lastBoostTime time.Time - tracksMu sync.RWMutex - tracks map[string]*Track + tracksMu sync.RWMutex + tracks map[string]*Track + tracksSorted TrackSorter prober *Prober @@ -240,24 +242,44 @@ func (s *StreamAllocator) AddTrack(downTrack *DownTrack) { downTrack.OnPacketSent(s.onPacketSent) s.tracksMu.Lock() - s.tracks[downTrack.ID()] = NewTrack(downTrack) + track := newTrack(downTrack) + s.tracks[downTrack.ID()] = track + + s.tracksSorted = append(s.tracksSorted, track) + sort.Sort(s.tracksSorted) + s.tracksMu.Unlock() - s.postEvent(Signal_ADD_TRACK, downTrack) + s.postEvent(Event{ + Signal: Signal_ADD_TRACK, + DownTrack: downTrack, + }) } func (s *StreamAllocator) RemoveTrack(downTrack *DownTrack) { s.tracksMu.Lock() - if _, ok := s.tracks[downTrack.ID()]; !ok { s.tracksMu.Unlock() return } delete(s.tracks, downTrack.ID()) + + n := len(s.tracksSorted) + for idx, track := range s.tracksSorted { + if track.DownTrack() == downTrack { + s.tracksSorted[idx] = s.tracksSorted[n-1] + s.tracksSorted = s.tracksSorted[:n-1] + break + } + } + sort.Sort(s.tracksSorted) s.tracksMu.Unlock() - s.postEvent(Signal_REMOVE_TRACK, downTrack) + s.postEvent(Event{ + Signal: Signal_REMOVE_TRACK, + DownTrack: downTrack, + }) } func (s *StreamAllocator) onREMB(downTrack *DownTrack, remb *rtcp.ReceiverEstimatedMaximumBitrate) { @@ -323,7 +345,9 @@ func (s *StreamAllocator) onREMB(downTrack *DownTrack, remb *rtcp.ReceiverEstima s.estimateMu.Unlock() if signal != Signal_NONE { - s.postEvent(signal, nil) + s.postEvent(Event{ + Signal: signal, + }) } } @@ -349,27 +373,49 @@ func (s *StreamAllocator) onReceiverReport(downTrack *DownTrack, rr *rtcp.Receiv // called when feeding track's simulcast layer availability changes func (s *StreamAllocator) onAvailableLayersChanged(downTrack *DownTrack, layerAdded bool) { // LK-TODO: Look at processing specific downtrack + signal := Signal_AVAILABLE_LAYERS_REMOVE if layerAdded { - s.postEvent(Signal_AVAILABLE_LAYERS_ADD, downTrack) - } else { - s.postEvent(Signal_AVAILABLE_LAYERS_REMOVE, downTrack) + signal = Signal_AVAILABLE_LAYERS_ADD } + s.postEvent(Event{ + Signal: signal, + DownTrack: downTrack, + }) } // called when subscription settings changes (muting/unmuting of track) func (s *StreamAllocator) onSubscriptionChanged(downTrack *DownTrack) { // LK-TODO: Look at processing specific downtrack - s.postEvent(Signal_SUBSCRIPTION_CHANGE, downTrack) + s.postEvent(Event{ + Signal: Signal_SUBSCRIPTION_CHANGE, + DownTrack: downTrack, + }) } // called when subscribed layers changes (limiting max layers) func (s *StreamAllocator) onSubscribedLayersChanged(downTrack *DownTrack, maxSpatialLayer int32, maxTemporalLayer int32) { - // LK-TODO: Look at processing specific downtrack - s.postEvent(Signal_SUBSCRIBED_LAYERS_CHANGE, downTrack) + // LK-TODO: Look at processing specific downtrack for reallocation + s.tracksMu.Lock() + track, ok := s.tracks[downTrack.ID()] + if !ok { + s.tracksMu.Unlock() + return + } + + track.UpdateMaxLayers(maxSpatialLayer, maxTemporalLayer) + + sort.Sort(s.tracksSorted) + s.tracksMu.Unlock() + + s.postEvent(Event{ + Signal: Signal_SUBSCRIBED_LAYERS_CHANGE, + DownTrack: downTrack, + }) } // called when DownTrack sends a packet func (s *StreamAllocator) onPacketSent(downTrack *DownTrack, size int) { + // bandwidth allocation is limited to video, so ignore audio if downTrack.Kind() == webrtc.RTPCodecTypeAudio { return } @@ -399,7 +445,7 @@ func (s *StreamAllocator) onSendProbe(bytesToSend int) int { return bytesSent } -func (s *StreamAllocator) postEvent(signal Signal, downTrack *DownTrack) { +func (s *StreamAllocator) postEvent(event Event) { s.chMu.RLock() defer s.chMu.RUnlock() @@ -407,10 +453,7 @@ func (s *StreamAllocator) postEvent(signal Signal, downTrack *DownTrack) { return } - s.eventCh <- []Event{Event{ - Signal: signal, - DownTrack: downTrack, - }} + s.eventCh <- []Event{event} } func (s *StreamAllocator) processEvents() { @@ -448,10 +491,14 @@ func (s *StreamAllocator) ping() { s.estimateMu.Unlock() if signal != Signal_NONE { - s.postEvent(signal, nil) + s.postEvent(Event{ + Signal: signal, + }) } - s.postEvent(Signal_PERIODIC_PING, nil) + s.postEvent(Event{ + Signal: Signal_PERIODIC_PING, + }) } } @@ -768,7 +815,7 @@ func (s *StreamAllocator) allocate() { if availableChannelCapacity < s.boostedChannelCapacity { availableChannelCapacity = s.boostedChannelCapacity } - for _, track := range s.tracks { + for _, track := range s.tracksSorted { // // `audio` tracks will do nothing in this method. // @@ -985,6 +1032,8 @@ func (s *StreamAllocator) maybeGratuitousProbe() { s.setState(State_GRATUITOUS_PROBING) } +//------------------------------------------------ + type Track struct { // LK-TODO-START // Check if we can do without a lock? @@ -1009,12 +1058,18 @@ type Track struct { maxTemporalLayer int32 } -func NewTrack(downTrack *DownTrack) *Track { +func newTrack(downTrack *DownTrack) *Track { return &Track{ - downTrack: downTrack, + downTrack: downTrack, + maxSpatialLayer: downTrack.MaxSpatialLayer(), + maxTemporalLayer: downTrack.MaxTemporalLayer(), } } +func (t *Track) DownTrack() *DownTrack { + return t.downTrack +} + // LK-TODO this should probably be maintained in downTrack and this module can query what it needs func (t *Track) UpdatePacketStats(rr *rtcp.ReceiverReport) { t.lock.Lock() @@ -1033,6 +1088,11 @@ func (t *Track) UpdatePacketStats(rr *rtcp.ReceiverReport) { } } +func (t *Track) UpdateMaxLayers(maxSpatialLayer, maxTemporalLayer int32) { + t.maxSpatialLayer = maxSpatialLayer + t.maxTemporalLayer = maxTemporalLayer +} + func (t *Track) GetPacketStats() (webrtc.RTPCodecType, uint32, uint32) { t.lock.RLock() defer t.lock.RUnlock() @@ -1072,3 +1132,33 @@ func (t *Track) BandwidthRequested() uint64 { func (t *Track) BandwidthOptimal() uint64 { return t.optimalBandwidthNeeded } + +//------------------------------------------------ + +type TrackSorter []*Track + +func (t TrackSorter) Len() int { + return len(t) +} + +func (t TrackSorter) Swap(i, j int) { + t[i], t[j] = t[j], t[i] +} + +func (t TrackSorter) Less(i, j int) bool { + // highest spatial layers have higher priority + if t[i].maxSpatialLayer != t[j].maxSpatialLayer { + return t[i].maxSpatialLayer > t[j].maxSpatialLayer + } + + // highest temporal layers have priority if max spatial layers match + if t[i].maxTemporalLayer != t[j].maxTemporalLayer { + return t[i].maxTemporalLayer > t[j].maxTemporalLayer + } + + // use track id to keep ordering if nothing else changes + // LK-TODO: ideally should be sorting, compare and then re-allocate only if order changed + return t[i].downTrack.ID() < t[j].downTrack.ID() +} + +//------------------------------------------------