From 391e2f8b318765b1bcb96d876e9be0444414328f Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 22 Nov 2021 14:01:25 +0530 Subject: [PATCH] Separate out max layer setting (#201) Small step on the way to making StreamAllocator prioritization of tracks. With the new callback into StreamAllocator, the idea is to use the max layer information to do track prioritization. Testing: -------- Sanity check that sample app works --- pkg/rtc/mediatrack.go | 1 + pkg/rtc/subscribedtrack.go | 11 +++- pkg/service/utils_test.go | 8 +-- pkg/sfu/downtrack.go | 125 ++++++++++++++++++++----------------- pkg/sfu/receiver.go | 4 +- pkg/sfu/streamallocator.go | 29 ++++++++- 6 files changed, 108 insertions(+), 70 deletions(-) diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 2762c819f..97fa4e2d2 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -462,6 +462,7 @@ func (t *MediaTrack) GetQualityForDimension(width, height uint32) livekit.VideoQ return quality } +// LK-TODO: this should probably left up to auto size management and StreamAllocator // this function assumes caller holds lock func (t *MediaTrack) shouldStartWithBestQuality() bool { return len(t.subscribedTracks) < 10 diff --git a/pkg/rtc/subscribedtrack.go b/pkg/rtc/subscribedtrack.go index 4e562624e..5c552bfa9 100644 --- a/pkg/rtc/subscribedtrack.go +++ b/pkg/rtc/subscribedtrack.go @@ -62,10 +62,17 @@ func (t *SubscribedTrack) UpdateSubscriberSettings(enabled bool, quality livekit t.subMuted.TrySet(!enabled) t.updateDownTrackMute() if enabled && t.dt.Kind() == webrtc.RTPCodecTypeVideo { - err := t.dt.SwitchSpatialLayer(spatialLayerForQuality(quality), true) + err := t.dt.SetMaxSpatialLayer(spatialLayerForQuality(quality)) + // LK-TODO-START + // For now, this set max calls into switchSpatialLayer and returns the result. + // When StreamAllocator becomes the one true way to allocate layers, + // there will not a ErrSpatialLayerNotFound error as the allocation + // logic will automatically the best available layer under the max layer. + // So, this check should be removed when enabling StreamAllocator + // LK-TODO-END if err == sfu.ErrSpatialLayerNotFound && quality != livekit.VideoQuality_MEDIUM { // try to switch to middle layer - _ = t.dt.SwitchSpatialLayer(spatialLayerForQuality(livekit.VideoQuality_MEDIUM), true) + _ = t.dt.SetMaxSpatialLayer(spatialLayerForQuality(livekit.VideoQuality_MEDIUM)) } } }) diff --git a/pkg/service/utils_test.go b/pkg/service/utils_test.go index 03d10107a..1eaed9b52 100644 --- a/pkg/service/utils_test.go +++ b/pkg/service/utils_test.go @@ -15,13 +15,13 @@ func redisClient() *redis.Client { func TestIsValidDomain(t *testing.T) { list := map[string]bool{ - "turn.myhost.com": true, - "turn.google.com": true, + "turn.myhost.com": true, + "turn.google.com": true, "https://host.com": false, - "turn://host.com": false, + "turn://host.com": false, } for key, result := range list { service.IsValidDomain(key) require.Equal(t, service.IsValidDomain(key), result) } -} \ No newline at end of file +} diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 5f47277d7..6ddfddd88 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -141,6 +141,9 @@ type DownTrack struct { // subscription change callback onSubscriptionChanged func(dt *DownTrack) + // max layer change callback + onSubscribedLayersChanged func(dt *DownTrack, maxSpatialLayer int32, maxTemporalLayer int32) + // packet sent callback onPacketSent []func(dt *DownTrack, size int) } @@ -160,6 +163,8 @@ func NewDownTrack(c webrtc.RTPCodecCapability, r Receiver, bf *buffer.Factory, p if strings.ToLower(c.MimeType) == "video/vp8" { d.vp8Munger = NewVP8Munger() } + d.maxSpatialLayer.set(2) + d.maxTemporalLayer.set(2) return d, nil } @@ -472,23 +477,40 @@ func (d *DownTrack) TargetSpatialLayer() int32 { return d.targetSpatialLayer.get() } +func (d *DownTrack) SetMaxSpatialLayer(spatialLayer int32) error { + // LK-TODO: support SVC + if d.trackType != SimulcastDownTrack { + return ErrSpatialNotSupported + } + + d.maxSpatialLayer.set(spatialLayer) + + if d.onSubscribedLayersChanged != nil { + d.onSubscribedLayersChanged(d, spatialLayer, d.MaxTemporalLayer()) + } + + // LK-TODO: Remove the following when StreamAllocator is the default way + return d.switchSpatialLayer(spatialLayer) +} + func (d *DownTrack) MaxSpatialLayer() int32 { return d.maxSpatialLayer.get() } -// SwitchSpatialLayer switches the current layer -func (d *DownTrack) SwitchSpatialLayer(targetLayer int32, setAsMax bool) error { - // LK-TODO-START - // This gets called directly from two places outside. - // 1. From livekit-server when suscriber updates TrackSetting - // 2. From sfu.Receiver when a new up track is added - // Make a method in this struct which allows setting subscriber controls. - // StreamAllocator needs to know about subscriber setting changes. - // Only the StreamAllocator should be changing forwarded layers. - // Make a method `SetMaxSpatialLayer()` and remove `setAsMax` from this. - // Trigger callback to StreamAllocator to notify subscription change - // in that method. - // LK-TODO-END +func (d *DownTrack) SetMaxTemporalLayer(temporalLayer int32) { + d.maxTemporalLayer.set(temporalLayer) + + if d.onSubscribedLayersChanged != nil { + d.onSubscribedLayersChanged(d, d.MaxSpatialLayer(), temporalLayer) + } +} + +func (d *DownTrack) MaxTemporalLayer() int32 { + return d.maxTemporalLayer.get() +} + +// switchSpatialLayer switches the current layer +func (d *DownTrack) switchSpatialLayer(targetLayer int32) error { if d.trackType != SimulcastDownTrack { return ErrSpatialNotSupported } @@ -501,13 +523,6 @@ func (d *DownTrack) SwitchSpatialLayer(targetLayer int32, setAsMax bool) error { } d.targetSpatialLayer.set(targetLayer) - if setAsMax { - d.maxSpatialLayer.set(targetLayer) - - if d.onSubscriptionChanged != nil { - d.onSubscriptionChanged(d) - } - } return nil } @@ -547,7 +562,7 @@ func (d *DownTrack) UptrackLayersChange(availableLayers []uint16, layerAdded boo // Available layers change should be signalled to StreamAllocator // and StreamAllocator will take care of adjusting allocations. // LK-TODO-END - if err := d.SwitchSpatialLayer(int32(targetLayer), false); err != nil { + if err := d.switchSpatialLayer(int32(targetLayer)); err != nil { return int32(targetLayer), err } } @@ -561,31 +576,21 @@ func (d *DownTrack) UptrackLayersChange(availableLayers []uint16, layerAdded boo return -1, fmt.Errorf("downtrack %s does not support simulcast", d.id) } -func (d *DownTrack) SwitchTemporalLayer(targetLayer int32, setAsMax bool) { - // LK-TODO-START - // See note in SwitchSpatialLayer to split out setting max layer - // into a separate method and triggering notification to StreamAllocator. - // BTW, looks like `setAsMax` is not set to true at any callsite of this API - // LK-TODO-END - if d.trackType == SimulcastDownTrack { - layer := d.temporalLayer.get() - currentLayer := uint16(layer) - currentTargetLayer := uint16(layer >> 16) - - // Don't switch until previous switch is done or canceled - if currentLayer != currentTargetLayer { - return - } - - d.temporalLayer.set((targetLayer << 16) | int32(currentLayer)) - if setAsMax { - d.maxTemporalLayer.set(targetLayer) - - if d.onSubscriptionChanged != nil { - d.onSubscriptionChanged(d) - } - } +func (d *DownTrack) switchTemporalLayer(targetLayer int32) { + if d.trackType != SimulcastDownTrack { + return } + + layer := d.temporalLayer.get() + currentLayer := uint16(layer) + currentTargetLayer := uint16(layer >> 16) + + // Don't switch until previous switch is done or canceled + if currentLayer != currentTargetLayer { + return + } + + d.temporalLayer.set((targetLayer << 16) | int32(currentLayer)) } // OnCloseHandler method to be called on remote tracked removed @@ -623,6 +628,10 @@ func (d *DownTrack) OnSubscriptionChanged(fn func(dt *DownTrack)) { d.onSubscriptionChanged = fn } +func (d *DownTrack) OnSubscribedLayersChanged(fn func(dt *DownTrack, maxSpatialLayer int32, maxTemporalLayer int32)) { + d.onSubscribedLayersChanged = fn +} + func (d *DownTrack) OnPacketSent(fn func(dt *DownTrack, size int)) { d.onPacketSent = append(d.onPacketSent, fn) } @@ -645,8 +654,8 @@ func (d *DownTrack) AdjustAllocation(availableChannelCapacity uint64) (uint64, u } if brs[i][j] < availableChannelCapacity { d.bandwidthConstrainedMute(false) // just in case it was muted - d.SwitchSpatialLayer(int32(i), false) - d.SwitchTemporalLayer(int32(j), false) + d.switchSpatialLayer(int32(i)) + d.switchTemporalLayer(int32(j)) return brs[i][j], optimalBandwidthNeeded } @@ -718,8 +727,8 @@ func (d *DownTrack) IncreaseAllocation() (bool, uint64, uint64) { } d.bandwidthConstrainedMute(false) - d.SwitchSpatialLayer(int32(0), false) - d.SwitchTemporalLayer(int32(0), false) + d.switchSpatialLayer(int32(0)) + d.switchTemporalLayer(int32(0)) return true, brs[0][0], optimalBandwidthNeeded } @@ -727,7 +736,7 @@ func (d *DownTrack) IncreaseAllocation() (bool, uint64, uint64) { // LK-TODO currentTemporalLayer may be outside available range because of inital value being out of range, fix it nextTemporalLayer := currentTemporalLayer + 1 if nextTemporalLayer <= d.maxTemporalLayer.get() && brs[currentSpatialLayer][nextTemporalLayer] > 0 { - d.SwitchTemporalLayer(nextTemporalLayer, false) + d.switchTemporalLayer(nextTemporalLayer) return true, brs[currentSpatialLayer][nextTemporalLayer], optimalBandwidthNeeded } @@ -735,8 +744,8 @@ func (d *DownTrack) IncreaseAllocation() (bool, uint64, uint64) { // LK-TODO currentTemporalLayer may be outside available range because of inital value being out of range, fix it nextSpatialLayer := currentSpatialLayer + 1 if nextSpatialLayer <= d.maxSpatialLayer.get() && brs[nextSpatialLayer][0] > 0 { - d.SwitchSpatialLayer(nextSpatialLayer, false) - d.SwitchTemporalLayer(0, false) + d.switchSpatialLayer(nextSpatialLayer) + d.switchTemporalLayer(0) return true, brs[nextSpatialLayer][0], optimalBandwidthNeeded } @@ -1303,13 +1312,13 @@ func (d *DownTrack) handleLayerChange(maxRatePacketLoss uint8, expectedMinBitrat if maxRatePacketLoss <= 5 { if currentTemporalLayer < mctl && currentTemporalLayer+1 <= d.maxTemporalLayer.get() && expectedMinBitrate >= 3*cbr/4 { - d.SwitchTemporalLayer(currentTemporalLayer+1, false) + d.switchTemporalLayer(currentTemporalLayer + 1) d.simulcast.switchDelay = time.Now().Add(3 * time.Second) } if currentTemporalLayer >= mctl && expectedMinBitrate >= 3*cbr/2 && currentSpatialLayer+1 <= d.maxSpatialLayer.get() && currentSpatialLayer+1 <= 2 { - if err := d.SwitchSpatialLayer(currentSpatialLayer+1, false); err == nil { - d.SwitchTemporalLayer(0, false) + if err := d.switchSpatialLayer(currentSpatialLayer + 1); err == nil { + d.switchTemporalLayer(0) } d.simulcast.switchDelay = time.Now().Add(5 * time.Second) } @@ -1318,12 +1327,12 @@ func (d *DownTrack) handleLayerChange(maxRatePacketLoss uint8, expectedMinBitrat if (expectedMinBitrate <= 5*cbr/8 || currentTemporalLayer == 0) && currentSpatialLayer > 0 && brs[currentSpatialLayer-1] != 0 { - if err := d.SwitchSpatialLayer(currentSpatialLayer-1, false); err != nil { - d.SwitchTemporalLayer(mtl[currentSpatialLayer-1], false) + if err := d.switchSpatialLayer(currentSpatialLayer - 1); err != nil { + d.switchTemporalLayer(mtl[currentSpatialLayer-1]) } d.simulcast.switchDelay = time.Now().Add(10 * time.Second) } else { - d.SwitchTemporalLayer(currentTemporalLayer-1, false) + d.switchTemporalLayer(currentTemporalLayer - 1) d.simulcast.switchDelay = time.Now().Add(5 * time.Second) } } diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index d0d80e685..70c1d25d0 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -215,7 +215,7 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff if dt != nil { if (bestQualityFirst && layer > dt.CurrentSpatialLayer()) || (!bestQualityFirst && layer < dt.CurrentSpatialLayer()) { - _ = dt.SwitchSpatialLayer(layer, false) + _ = dt.switchSpatialLayer(layer) } } } @@ -281,8 +281,6 @@ func (w *WebRTCReceiver) AddDownTrack(track *DownTrack, bestQualityFirst bool) { w.upTrackMu.RUnlock() track.SetInitialLayers(int32(layer), 2) - track.maxSpatialLayer.set(2) - track.maxTemporalLayer.set(2) track.lastSSRC.set(w.SSRC(layer)) track.trackType = SimulcastDownTrack track.payload = packetFactory.Get().(*[]byte) diff --git a/pkg/sfu/streamallocator.go b/pkg/sfu/streamallocator.go index 9a0401003..b1cc19702 100644 --- a/pkg/sfu/streamallocator.go +++ b/pkg/sfu/streamallocator.go @@ -39,7 +39,9 @@ // This could happen due to publisher throttling layers due to upstream congestion // in its path. // - OnSubscriptionChanged: called when a down track settings are changed resulting -// from client side requests (muting/pausing a video or limiting maximum layer). +// from client side requests (muting/unmuting) +// - OnSubscribedLayersChanged: called when a down track settings are changed resulting +// from client side requests (limiting maximum layer). // - OnPacketSent: called when a media packet is forwarded by the down track. As // this happens once per forwarded packet, processing in this callback should be // kept to a minimum. @@ -88,7 +90,8 @@ // - Signal_AVAILABLE_LAYERS_ADD: Available layers of publisher changed, new layer(s) available. // - Signal_AVAILABLE_LAYERS_REMOVE: Available layers of publisher changed, some previously // available layer(s) not available anymore. -// - Signal_SUBSCRIPTION_CHANGE: Subscription changed (mute/requested layers changed). +// - Signal_SUBSCRIPTION_CHANGE: Subscription changed (mute/unmute) +// - Signal_SUBSCRIBED_LAYERS_CHANGE: Subscribed layers changed (requested layers changed). // - Signal_PERIODIC_PING: Periodic ping // // There are several interesting challenges which are documented in relevant code below. @@ -144,6 +147,7 @@ const ( Signal_AVAILABLE_LAYERS_ADD Signal_AVAILABLE_LAYERS_REMOVE Signal_SUBSCRIPTION_CHANGE + Signal_SUBSCRIBED_LAYERS_CHANGE Signal_PERIODIC_PING ) @@ -232,6 +236,7 @@ func (s *StreamAllocator) AddTrack(downTrack *DownTrack) { downTrack.AddReceiverReportListener(s.onReceiverReport) downTrack.OnAvailableLayersChanged(s.onAvailableLayersChanged) downTrack.OnSubscriptionChanged(s.onSubscriptionChanged) + downTrack.OnSubscribedLayersChanged(s.onSubscribedLayersChanged) downTrack.OnPacketSent(s.onPacketSent) s.tracksMu.Lock() @@ -351,12 +356,18 @@ func (s *StreamAllocator) onAvailableLayersChanged(downTrack *DownTrack, layerAd } } -// called when subscription settings changes +// called when subscription settings changes (muting/unmuting of track) func (s *StreamAllocator) onSubscriptionChanged(downTrack *DownTrack) { // LK-TODO: Look at processing specific downtrack s.postEvent(Signal_SUBSCRIPTION_CHANGE, downTrack) } +// called when subscribed layers changes (limiting max layers) +func (s *StreamAllocator) onSubscribedLayersChanged(downTrack *DownTrack, maxSpatialLayer int32, maxTemporalLayer int32) { + // LK-TODO: Look at processing specific downtrack + s.postEvent(Signal_SUBSCRIBED_LAYERS_CHANGE, downTrack) +} + // called when DownTrack sends a packet func (s *StreamAllocator) onPacketSent(downTrack *DownTrack, size int) { if downTrack.Kind() == webrtc.RTPCodecTypeAudio { @@ -502,6 +513,8 @@ func (s *StreamAllocator) runStatePreCommit(event Event) { s.allocate() case Signal_SUBSCRIPTION_CHANGE: s.allocate() + case Signal_SUBSCRIBED_LAYERS_CHANGE: + s.allocate() case Signal_PERIODIC_PING: } } @@ -523,6 +536,8 @@ func (s *StreamAllocator) runStateStable(event Event) { s.allocate() case Signal_SUBSCRIPTION_CHANGE: s.allocate() + case Signal_SUBSCRIBED_LAYERS_CHANGE: + s.allocate() case Signal_PERIODIC_PING: // if bandwidth estimate has been stable for a while, maybe gratuitously probe s.maybeGratuitousProbe() @@ -566,6 +581,8 @@ func (s *StreamAllocator) runStateDeficient(event Event) { s.allocate() case Signal_SUBSCRIPTION_CHANGE: s.allocate() + case Signal_SUBSCRIBED_LAYERS_CHANGE: + s.allocate() case Signal_PERIODIC_PING: s.maybeProbe() } @@ -599,6 +616,9 @@ func (s *StreamAllocator) runStateGratuitousProbing(event Event) { case Signal_SUBSCRIPTION_CHANGE: s.prober.Reset() s.allocate() + case Signal_SUBSCRIBED_LAYERS_CHANGE: + s.prober.Reset() + s.allocate() case Signal_PERIODIC_PING: if !s.prober.IsRunning() { // try for more @@ -984,6 +1004,9 @@ type Track struct { bandwidthRequested uint64 optimalBandwidthNeeded uint64 + + maxSpatialLayer int32 + maxTemporalLayer int32 } func NewTrack(downTrack *DownTrack) *Track {