diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 2762c819f..97fa4e2d2 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -462,6 +462,7 @@ func (t *MediaTrack) GetQualityForDimension(width, height uint32) livekit.VideoQ return quality } +// LK-TODO: this should probably left up to auto size management and StreamAllocator // this function assumes caller holds lock func (t *MediaTrack) shouldStartWithBestQuality() bool { return len(t.subscribedTracks) < 10 diff --git a/pkg/rtc/subscribedtrack.go b/pkg/rtc/subscribedtrack.go index 4e562624e..5c552bfa9 100644 --- a/pkg/rtc/subscribedtrack.go +++ b/pkg/rtc/subscribedtrack.go @@ -62,10 +62,17 @@ func (t *SubscribedTrack) UpdateSubscriberSettings(enabled bool, quality livekit t.subMuted.TrySet(!enabled) t.updateDownTrackMute() if enabled && t.dt.Kind() == webrtc.RTPCodecTypeVideo { - err := t.dt.SwitchSpatialLayer(spatialLayerForQuality(quality), true) + err := t.dt.SetMaxSpatialLayer(spatialLayerForQuality(quality)) + // LK-TODO-START + // For now, this set max calls into switchSpatialLayer and returns the result. + // When StreamAllocator becomes the one true way to allocate layers, + // there will not a ErrSpatialLayerNotFound error as the allocation + // logic will automatically the best available layer under the max layer. + // So, this check should be removed when enabling StreamAllocator + // LK-TODO-END if err == sfu.ErrSpatialLayerNotFound && quality != livekit.VideoQuality_MEDIUM { // try to switch to middle layer - _ = t.dt.SwitchSpatialLayer(spatialLayerForQuality(livekit.VideoQuality_MEDIUM), true) + _ = t.dt.SetMaxSpatialLayer(spatialLayerForQuality(livekit.VideoQuality_MEDIUM)) } } }) diff --git a/pkg/service/utils_test.go b/pkg/service/utils_test.go index 03d10107a..1eaed9b52 100644 --- a/pkg/service/utils_test.go +++ b/pkg/service/utils_test.go @@ -15,13 +15,13 @@ func redisClient() *redis.Client { func TestIsValidDomain(t *testing.T) { list := map[string]bool{ - "turn.myhost.com": true, - "turn.google.com": true, + "turn.myhost.com": true, + "turn.google.com": true, "https://host.com": false, - "turn://host.com": false, + "turn://host.com": false, } for key, result := range list { service.IsValidDomain(key) require.Equal(t, service.IsValidDomain(key), result) } -} \ No newline at end of file +} diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 5f47277d7..6ddfddd88 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -141,6 +141,9 @@ type DownTrack struct { // subscription change callback onSubscriptionChanged func(dt *DownTrack) + // max layer change callback + onSubscribedLayersChanged func(dt *DownTrack, maxSpatialLayer int32, maxTemporalLayer int32) + // packet sent callback onPacketSent []func(dt *DownTrack, size int) } @@ -160,6 +163,8 @@ func NewDownTrack(c webrtc.RTPCodecCapability, r Receiver, bf *buffer.Factory, p if strings.ToLower(c.MimeType) == "video/vp8" { d.vp8Munger = NewVP8Munger() } + d.maxSpatialLayer.set(2) + d.maxTemporalLayer.set(2) return d, nil } @@ -472,23 +477,40 @@ func (d *DownTrack) TargetSpatialLayer() int32 { return d.targetSpatialLayer.get() } +func (d *DownTrack) SetMaxSpatialLayer(spatialLayer int32) error { + // LK-TODO: support SVC + if d.trackType != SimulcastDownTrack { + return ErrSpatialNotSupported + } + + d.maxSpatialLayer.set(spatialLayer) + + if d.onSubscribedLayersChanged != nil { + d.onSubscribedLayersChanged(d, spatialLayer, d.MaxTemporalLayer()) + } + + // LK-TODO: Remove the following when StreamAllocator is the default way + return d.switchSpatialLayer(spatialLayer) +} + func (d *DownTrack) MaxSpatialLayer() int32 { return d.maxSpatialLayer.get() } -// SwitchSpatialLayer switches the current layer -func (d *DownTrack) SwitchSpatialLayer(targetLayer int32, setAsMax bool) error { - // LK-TODO-START - // This gets called directly from two places outside. - // 1. From livekit-server when suscriber updates TrackSetting - // 2. From sfu.Receiver when a new up track is added - // Make a method in this struct which allows setting subscriber controls. - // StreamAllocator needs to know about subscriber setting changes. - // Only the StreamAllocator should be changing forwarded layers. - // Make a method `SetMaxSpatialLayer()` and remove `setAsMax` from this. - // Trigger callback to StreamAllocator to notify subscription change - // in that method. - // LK-TODO-END +func (d *DownTrack) SetMaxTemporalLayer(temporalLayer int32) { + d.maxTemporalLayer.set(temporalLayer) + + if d.onSubscribedLayersChanged != nil { + d.onSubscribedLayersChanged(d, d.MaxSpatialLayer(), temporalLayer) + } +} + +func (d *DownTrack) MaxTemporalLayer() int32 { + return d.maxTemporalLayer.get() +} + +// switchSpatialLayer switches the current layer +func (d *DownTrack) switchSpatialLayer(targetLayer int32) error { if d.trackType != SimulcastDownTrack { return ErrSpatialNotSupported } @@ -501,13 +523,6 @@ func (d *DownTrack) SwitchSpatialLayer(targetLayer int32, setAsMax bool) error { } d.targetSpatialLayer.set(targetLayer) - if setAsMax { - d.maxSpatialLayer.set(targetLayer) - - if d.onSubscriptionChanged != nil { - d.onSubscriptionChanged(d) - } - } return nil } @@ -547,7 +562,7 @@ func (d *DownTrack) UptrackLayersChange(availableLayers []uint16, layerAdded boo // Available layers change should be signalled to StreamAllocator // and StreamAllocator will take care of adjusting allocations. // LK-TODO-END - if err := d.SwitchSpatialLayer(int32(targetLayer), false); err != nil { + if err := d.switchSpatialLayer(int32(targetLayer)); err != nil { return int32(targetLayer), err } } @@ -561,31 +576,21 @@ func (d *DownTrack) UptrackLayersChange(availableLayers []uint16, layerAdded boo return -1, fmt.Errorf("downtrack %s does not support simulcast", d.id) } -func (d *DownTrack) SwitchTemporalLayer(targetLayer int32, setAsMax bool) { - // LK-TODO-START - // See note in SwitchSpatialLayer to split out setting max layer - // into a separate method and triggering notification to StreamAllocator. - // BTW, looks like `setAsMax` is not set to true at any callsite of this API - // LK-TODO-END - if d.trackType == SimulcastDownTrack { - layer := d.temporalLayer.get() - currentLayer := uint16(layer) - currentTargetLayer := uint16(layer >> 16) - - // Don't switch until previous switch is done or canceled - if currentLayer != currentTargetLayer { - return - } - - d.temporalLayer.set((targetLayer << 16) | int32(currentLayer)) - if setAsMax { - d.maxTemporalLayer.set(targetLayer) - - if d.onSubscriptionChanged != nil { - d.onSubscriptionChanged(d) - } - } +func (d *DownTrack) switchTemporalLayer(targetLayer int32) { + if d.trackType != SimulcastDownTrack { + return } + + layer := d.temporalLayer.get() + currentLayer := uint16(layer) + currentTargetLayer := uint16(layer >> 16) + + // Don't switch until previous switch is done or canceled + if currentLayer != currentTargetLayer { + return + } + + d.temporalLayer.set((targetLayer << 16) | int32(currentLayer)) } // OnCloseHandler method to be called on remote tracked removed @@ -623,6 +628,10 @@ func (d *DownTrack) OnSubscriptionChanged(fn func(dt *DownTrack)) { d.onSubscriptionChanged = fn } +func (d *DownTrack) OnSubscribedLayersChanged(fn func(dt *DownTrack, maxSpatialLayer int32, maxTemporalLayer int32)) { + d.onSubscribedLayersChanged = fn +} + func (d *DownTrack) OnPacketSent(fn func(dt *DownTrack, size int)) { d.onPacketSent = append(d.onPacketSent, fn) } @@ -645,8 +654,8 @@ func (d *DownTrack) AdjustAllocation(availableChannelCapacity uint64) (uint64, u } if brs[i][j] < availableChannelCapacity { d.bandwidthConstrainedMute(false) // just in case it was muted - d.SwitchSpatialLayer(int32(i), false) - d.SwitchTemporalLayer(int32(j), false) + d.switchSpatialLayer(int32(i)) + d.switchTemporalLayer(int32(j)) return brs[i][j], optimalBandwidthNeeded } @@ -718,8 +727,8 @@ func (d *DownTrack) IncreaseAllocation() (bool, uint64, uint64) { } d.bandwidthConstrainedMute(false) - d.SwitchSpatialLayer(int32(0), false) - d.SwitchTemporalLayer(int32(0), false) + d.switchSpatialLayer(int32(0)) + d.switchTemporalLayer(int32(0)) return true, brs[0][0], optimalBandwidthNeeded } @@ -727,7 +736,7 @@ func (d *DownTrack) IncreaseAllocation() (bool, uint64, uint64) { // LK-TODO currentTemporalLayer may be outside available range because of inital value being out of range, fix it nextTemporalLayer := currentTemporalLayer + 1 if nextTemporalLayer <= d.maxTemporalLayer.get() && brs[currentSpatialLayer][nextTemporalLayer] > 0 { - d.SwitchTemporalLayer(nextTemporalLayer, false) + d.switchTemporalLayer(nextTemporalLayer) return true, brs[currentSpatialLayer][nextTemporalLayer], optimalBandwidthNeeded } @@ -735,8 +744,8 @@ func (d *DownTrack) IncreaseAllocation() (bool, uint64, uint64) { // LK-TODO currentTemporalLayer may be outside available range because of inital value being out of range, fix it nextSpatialLayer := currentSpatialLayer + 1 if nextSpatialLayer <= d.maxSpatialLayer.get() && brs[nextSpatialLayer][0] > 0 { - d.SwitchSpatialLayer(nextSpatialLayer, false) - d.SwitchTemporalLayer(0, false) + d.switchSpatialLayer(nextSpatialLayer) + d.switchTemporalLayer(0) return true, brs[nextSpatialLayer][0], optimalBandwidthNeeded } @@ -1303,13 +1312,13 @@ func (d *DownTrack) handleLayerChange(maxRatePacketLoss uint8, expectedMinBitrat if maxRatePacketLoss <= 5 { if currentTemporalLayer < mctl && currentTemporalLayer+1 <= d.maxTemporalLayer.get() && expectedMinBitrate >= 3*cbr/4 { - d.SwitchTemporalLayer(currentTemporalLayer+1, false) + d.switchTemporalLayer(currentTemporalLayer + 1) d.simulcast.switchDelay = time.Now().Add(3 * time.Second) } if currentTemporalLayer >= mctl && expectedMinBitrate >= 3*cbr/2 && currentSpatialLayer+1 <= d.maxSpatialLayer.get() && currentSpatialLayer+1 <= 2 { - if err := d.SwitchSpatialLayer(currentSpatialLayer+1, false); err == nil { - d.SwitchTemporalLayer(0, false) + if err := d.switchSpatialLayer(currentSpatialLayer + 1); err == nil { + d.switchTemporalLayer(0) } d.simulcast.switchDelay = time.Now().Add(5 * time.Second) } @@ -1318,12 +1327,12 @@ func (d *DownTrack) handleLayerChange(maxRatePacketLoss uint8, expectedMinBitrat if (expectedMinBitrate <= 5*cbr/8 || currentTemporalLayer == 0) && currentSpatialLayer > 0 && brs[currentSpatialLayer-1] != 0 { - if err := d.SwitchSpatialLayer(currentSpatialLayer-1, false); err != nil { - d.SwitchTemporalLayer(mtl[currentSpatialLayer-1], false) + if err := d.switchSpatialLayer(currentSpatialLayer - 1); err != nil { + d.switchTemporalLayer(mtl[currentSpatialLayer-1]) } d.simulcast.switchDelay = time.Now().Add(10 * time.Second) } else { - d.SwitchTemporalLayer(currentTemporalLayer-1, false) + d.switchTemporalLayer(currentTemporalLayer - 1) d.simulcast.switchDelay = time.Now().Add(5 * time.Second) } } diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index d0d80e685..70c1d25d0 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -215,7 +215,7 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff if dt != nil { if (bestQualityFirst && layer > dt.CurrentSpatialLayer()) || (!bestQualityFirst && layer < dt.CurrentSpatialLayer()) { - _ = dt.SwitchSpatialLayer(layer, false) + _ = dt.switchSpatialLayer(layer) } } } @@ -281,8 +281,6 @@ func (w *WebRTCReceiver) AddDownTrack(track *DownTrack, bestQualityFirst bool) { w.upTrackMu.RUnlock() track.SetInitialLayers(int32(layer), 2) - track.maxSpatialLayer.set(2) - track.maxTemporalLayer.set(2) track.lastSSRC.set(w.SSRC(layer)) track.trackType = SimulcastDownTrack track.payload = packetFactory.Get().(*[]byte) diff --git a/pkg/sfu/streamallocator.go b/pkg/sfu/streamallocator.go index 9a0401003..b1cc19702 100644 --- a/pkg/sfu/streamallocator.go +++ b/pkg/sfu/streamallocator.go @@ -39,7 +39,9 @@ // This could happen due to publisher throttling layers due to upstream congestion // in its path. // - OnSubscriptionChanged: called when a down track settings are changed resulting -// from client side requests (muting/pausing a video or limiting maximum layer). +// from client side requests (muting/unmuting) +// - OnSubscribedLayersChanged: called when a down track settings are changed resulting +// from client side requests (limiting maximum layer). // - OnPacketSent: called when a media packet is forwarded by the down track. As // this happens once per forwarded packet, processing in this callback should be // kept to a minimum. @@ -88,7 +90,8 @@ // - Signal_AVAILABLE_LAYERS_ADD: Available layers of publisher changed, new layer(s) available. // - Signal_AVAILABLE_LAYERS_REMOVE: Available layers of publisher changed, some previously // available layer(s) not available anymore. -// - Signal_SUBSCRIPTION_CHANGE: Subscription changed (mute/requested layers changed). +// - Signal_SUBSCRIPTION_CHANGE: Subscription changed (mute/unmute) +// - Signal_SUBSCRIBED_LAYERS_CHANGE: Subscribed layers changed (requested layers changed). // - Signal_PERIODIC_PING: Periodic ping // // There are several interesting challenges which are documented in relevant code below. @@ -144,6 +147,7 @@ const ( Signal_AVAILABLE_LAYERS_ADD Signal_AVAILABLE_LAYERS_REMOVE Signal_SUBSCRIPTION_CHANGE + Signal_SUBSCRIBED_LAYERS_CHANGE Signal_PERIODIC_PING ) @@ -232,6 +236,7 @@ func (s *StreamAllocator) AddTrack(downTrack *DownTrack) { downTrack.AddReceiverReportListener(s.onReceiverReport) downTrack.OnAvailableLayersChanged(s.onAvailableLayersChanged) downTrack.OnSubscriptionChanged(s.onSubscriptionChanged) + downTrack.OnSubscribedLayersChanged(s.onSubscribedLayersChanged) downTrack.OnPacketSent(s.onPacketSent) s.tracksMu.Lock() @@ -351,12 +356,18 @@ func (s *StreamAllocator) onAvailableLayersChanged(downTrack *DownTrack, layerAd } } -// called when subscription settings changes +// called when subscription settings changes (muting/unmuting of track) func (s *StreamAllocator) onSubscriptionChanged(downTrack *DownTrack) { // LK-TODO: Look at processing specific downtrack s.postEvent(Signal_SUBSCRIPTION_CHANGE, downTrack) } +// called when subscribed layers changes (limiting max layers) +func (s *StreamAllocator) onSubscribedLayersChanged(downTrack *DownTrack, maxSpatialLayer int32, maxTemporalLayer int32) { + // LK-TODO: Look at processing specific downtrack + s.postEvent(Signal_SUBSCRIBED_LAYERS_CHANGE, downTrack) +} + // called when DownTrack sends a packet func (s *StreamAllocator) onPacketSent(downTrack *DownTrack, size int) { if downTrack.Kind() == webrtc.RTPCodecTypeAudio { @@ -502,6 +513,8 @@ func (s *StreamAllocator) runStatePreCommit(event Event) { s.allocate() case Signal_SUBSCRIPTION_CHANGE: s.allocate() + case Signal_SUBSCRIBED_LAYERS_CHANGE: + s.allocate() case Signal_PERIODIC_PING: } } @@ -523,6 +536,8 @@ func (s *StreamAllocator) runStateStable(event Event) { s.allocate() case Signal_SUBSCRIPTION_CHANGE: s.allocate() + case Signal_SUBSCRIBED_LAYERS_CHANGE: + s.allocate() case Signal_PERIODIC_PING: // if bandwidth estimate has been stable for a while, maybe gratuitously probe s.maybeGratuitousProbe() @@ -566,6 +581,8 @@ func (s *StreamAllocator) runStateDeficient(event Event) { s.allocate() case Signal_SUBSCRIPTION_CHANGE: s.allocate() + case Signal_SUBSCRIBED_LAYERS_CHANGE: + s.allocate() case Signal_PERIODIC_PING: s.maybeProbe() } @@ -599,6 +616,9 @@ func (s *StreamAllocator) runStateGratuitousProbing(event Event) { case Signal_SUBSCRIPTION_CHANGE: s.prober.Reset() s.allocate() + case Signal_SUBSCRIBED_LAYERS_CHANGE: + s.prober.Reset() + s.allocate() case Signal_PERIODIC_PING: if !s.prober.IsRunning() { // try for more @@ -984,6 +1004,9 @@ type Track struct { bandwidthRequested uint64 optimalBandwidthNeeded uint64 + + maxSpatialLayer int32 + maxTemporalLayer int32 } func NewTrack(downTrack *DownTrack) *Track {