From 0dc92ef2739de0c263878af68750cf165ac6bffc Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 2 Aug 2023 14:02:29 +0530 Subject: [PATCH] Remove parked layer feature. (#1927) * Remove parked layer feature. Not worth the added complexity. Several reasons - Not seeing black frames on pub mute always. - If they are there, it can consume more than 30kbps if the parked layer is high res. That is wasted bandwidth downstream when pub is muted. - On resume, client some time sends PLI and that triggers a key frame request. But, leaving the separate `PubMuted` flag in forwarder in case we can use it for better handling. * need the request spatial --- pkg/rtc/subscribedtrack.go | 14 +- pkg/sfu/downtrack.go | 5 - pkg/sfu/forwarder.go | 121 ++---------------- pkg/sfu/forwarder_test.go | 23 +--- pkg/sfu/videolayerselector/base.go | 20 +-- .../dependencydescriptor.go | 4 - pkg/sfu/videolayerselector/simulcast.go | 52 +++----- .../videolayerselector/videolayerselector.go | 3 - 8 files changed, 36 insertions(+), 206 deletions(-) diff --git a/pkg/rtc/subscribedtrack.go b/pkg/rtc/subscribedtrack.go index c586ec3a7..10bf25fcb 100644 --- a/pkg/rtc/subscribedtrack.go +++ b/pkg/rtc/subscribedtrack.go @@ -215,14 +215,14 @@ func (t *SubscribedTrack) UpdateVideoLayer() { return } - t.logger.Debugw("updating video layer", - "settings", settings, - ) + t.logger.Debugw("updating video layer", "settings", settings) - spatial := t.spatialLayerFromSettings(settings) - t.DownTrack().SetMaxSpatialLayer(spatial) - if settings.Fps > 0 { - t.DownTrack().SetMaxTemporalLayer(t.MediaTrack().GetTemporalLayerForSpatialFps(spatial, settings.Fps, t.DownTrack().Codec().MimeType)) + if settings.Width > 0 || settings.Fps > 0 { + spatial := t.spatialLayerFromSettings(settings) + t.DownTrack().SetMaxSpatialLayer(spatial) + if settings.Fps > 0 { + t.DownTrack().SetMaxTemporalLayer(t.MediaTrack().GetTemporalLayerForSpatialFps(spatial, settings.Fps, t.DownTrack().Codec().MimeType)) + } } } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index d1d10b92c..8f66820e2 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -302,11 +302,6 @@ func NewDownTrack(params DowntrackParams) (*DownTrack, error) { d.params.Receiver.GetReferenceLayerRTPTimestamp, d.getExpectedRTPTimestamp, ) - d.forwarder.OnParkedLayerExpired(func() { - if sal := d.getStreamAllocatorListener(); sal != nil { - sal.OnSubscriptionChanged(d) - } - }) d.rtpStats = buffer.NewRTPStats(buffer.RTPStatsParams{ ClockRate: d.codec.ClockRate, diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index ec7db7753..6ab522da1 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -36,10 +36,9 @@ import ( // Forwarder const ( - FlagPauseOnDowngrade = true - FlagFilterRTX = true - TransitionCostSpatial = 10 - ParkedLayerWaitDuration = 2 * time.Second + FlagPauseOnDowngrade = true + FlagFilterRTX = true + TransitionCostSpatial = 10 ResumeBehindThresholdSeconds = float64(0.1) // 100ms LayerSwitchBehindThresholdSeconds = float64(0.05) // 50ms @@ -124,7 +123,6 @@ type VideoAllocationProvisional struct { Bitrates Bitrates maxLayer buffer.VideoLayer currentLayer buffer.VideoLayer - parkedLayer buffer.VideoLayer allocatedLayer buffer.VideoLayer } @@ -199,8 +197,6 @@ type Forwarder struct { referenceLayerSpatial int32 refTSOffset uint32 - parkedLayerTimer *time.Timer - provisional *VideoAllocationProvisional lastAllocation VideoAllocation @@ -210,8 +206,6 @@ type Forwarder struct { vls videolayerselector.VideoLayerSelector codecMunger codecmunger.CodecMunger - - onParkedLayerExpired func() } func NewForwarder( @@ -266,20 +260,6 @@ func (f *Forwarder) SetMaxTemporalLayerSeen(maxTemporalLayerSeen int32) bool { return true } -func (f *Forwarder) OnParkedLayerExpired(fn func()) { - f.lock.Lock() - defer f.lock.Unlock() - - f.onParkedLayerExpired = fn -} - -func (f *Forwarder) getOnParkedLayerExpired() func() { - f.lock.RLock() - defer f.lock.RUnlock() - - return f.onParkedLayerExpired -} - func (f *Forwarder) DetermineCodec(codec webrtc.RTPCodecCapability, extensions []webrtc.RTPHeaderExtensionParameter) { f.lock.Lock() defer f.lock.Unlock() @@ -428,22 +408,10 @@ func (f *Forwarder) PubMute(pubMuted bool) bool { f.logger.Debugw("setting forwarder pub mute", "pubMuted", pubMuted) f.pubMuted = pubMuted - if f.kind == webrtc.RTPCodecTypeAudio { - // for audio resync when pub muted so that sequence numbers do not jump on unmute - // audio stops forwarding during pub mute too - if pubMuted { - f.resyncLocked() - } - } else { - // Do not resync on publisher mute as forwarding can continue on unmute using same layer. - // On unmute, park current layers as streaming can continue without a key frame when publisher starts the stream. - targetLayer := f.vls.GetTarget() - if !pubMuted && targetLayer.IsValid() && f.vls.GetCurrent().Spatial == targetLayer.Spatial { - f.setupParkedLayer(targetLayer) - f.vls.SetCurrent(buffer.InvalidLayer) - } + // resync when pub muted so that sequence numbers do not jump on unmute + if pubMuted { + f.resyncLocked() } - return true } @@ -476,9 +444,6 @@ func (f *Forwarder) SetMaxSpatialLayer(spatialLayer int32) (bool, buffer.VideoLa f.logger.Debugw("setting max spatial layer", "layer", spatialLayer) f.vls.SetMaxSpatial(spatialLayer) - - f.clearParkedLayer() - return true, f.vls.GetMax() } @@ -497,9 +462,6 @@ func (f *Forwarder) SetMaxTemporalLayer(temporalLayer int32) (bool, buffer.Video f.logger.Debugw("setting max temporal layer", "layer", temporalLayer) f.vls.SetMaxTemporal(temporalLayer) - - f.clearParkedLayer() - return true, f.vls.GetMax() } @@ -607,7 +569,6 @@ func (f *Forwarder) AllocateOptimal(availableLayers []int32, brs Bitrates, allow maxLayer := f.vls.GetMax() maxSeenLayer := f.vls.GetMaxSeen() - parkedLayer := f.vls.GetParked() currentLayer := f.vls.GetCurrent() requestSpatial := f.vls.GetRequestSpatial() alloc := VideoAllocation{ @@ -653,14 +614,6 @@ func (f *Forwarder) AllocateOptimal(availableLayers []int32, brs Bitrates, allow case f.pubMuted: alloc.PauseReason = VideoPauseReasonPubMuted - // leave it at current layers for opportunistic resume - alloc.TargetLayer = currentLayer - alloc.RequestLayerSpatial = alloc.TargetLayer.Spatial - - case parkedLayer.IsValid(): - // if parked on a layer, let it continue - alloc.TargetLayer = parkedLayer - alloc.RequestLayerSpatial = alloc.TargetLayer.Spatial default: // lots of different events could end up here @@ -754,7 +707,6 @@ func (f *Forwarder) ProvisionalAllocatePrepare(availableLayers []int32, Bitrates Bitrates: Bitrates, maxLayer: f.vls.GetMax(), currentLayer: f.vls.GetCurrent(), - parkedLayer: f.vls.GetParked(), } f.provisional.availableLayers = make([]int32, len(availableLayers)) @@ -837,19 +789,11 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b existingTargetLayer := f.vls.GetTarget() if f.provisional.muted || f.provisional.pubMuted { - bandwidthRequired := int64(0) f.provisional.allocatedLayer = buffer.InvalidLayer - if f.provisional.pubMuted { - // leave it at current for opportunistic forwarding, there is still bandwidth saving with publisher mute - f.provisional.allocatedLayer = f.provisional.currentLayer - if f.provisional.allocatedLayer.IsValid() { - bandwidthRequired = f.provisional.Bitrates[f.provisional.allocatedLayer.Spatial][f.provisional.allocatedLayer.Temporal] - } - } return VideoTransition{ From: f.vls.GetTarget(), To: f.provisional.allocatedLayer, - BandwidthDelta: bandwidthRequired - getBandwidthNeeded(f.provisional.Bitrates, existingTargetLayer, f.lastAllocation.BandwidthRequested), + BandwidthDelta: -getBandwidthNeeded(f.provisional.Bitrates, existingTargetLayer, f.lastAllocation.BandwidthRequested), } } @@ -941,12 +885,7 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b // if nothing available, just leave target at current to enable opportunistic forwarding in case current resumes if !targetLayer.IsValid() { - if f.provisional.parkedLayer.IsValid() { - targetLayer = f.provisional.parkedLayer - } else { - targetLayer = f.provisional.currentLayer - } - + targetLayer = f.provisional.currentLayer if targetLayer.IsValid() { bandwidthRequired = f.provisional.Bitrates[targetLayer.Spatial][targetLayer.Temporal] } @@ -981,7 +920,6 @@ func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() VideoTransiti targetLayer := f.vls.GetTarget() if f.provisional.muted || f.provisional.pubMuted { - // if publisher muted, give up opportunistic resume and give back the bandwidth f.provisional.allocatedLayer = buffer.InvalidLayer return VideoTransition{ From: targetLayer, @@ -1007,11 +945,7 @@ func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() VideoTransiti // feed has gone dry, just leave target at current to enable opportunistic forwarding in case current resumes. // Note that this is giving back bits and opportunistic forwarding resuming might trigger congestion again, // but that should be handled by stream allocator. - if f.provisional.parkedLayer.IsValid() { - f.provisional.allocatedLayer = f.provisional.parkedLayer - } else { - f.provisional.allocatedLayer = f.provisional.currentLayer - } + f.provisional.allocatedLayer = f.provisional.currentLayer return VideoTransition{ From: targetLayer, To: f.provisional.allocatedLayer, @@ -1138,7 +1072,6 @@ func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation { } } - f.clearParkedLayer() return f.updateAllocation(alloc, "cooperative") } @@ -1382,7 +1315,6 @@ func (f *Forwarder) Pause(availableLayers []int32, brs Bitrates) VideoAllocation alloc.PauseReason = VideoPauseReasonBandwidth } - f.clearParkedLayer() return f.updateAllocation(alloc, "pause") } @@ -1427,31 +1359,6 @@ func (f *Forwarder) Resync() { func (f *Forwarder) resyncLocked() { f.vls.SetCurrent(buffer.InvalidLayer) f.lastSSRC = 0 - f.clearParkedLayer() -} - -func (f *Forwarder) clearParkedLayer() { - f.vls.SetParked(buffer.InvalidLayer) - if f.parkedLayerTimer != nil { - f.parkedLayerTimer.Stop() - f.parkedLayerTimer = nil - } -} - -func (f *Forwarder) setupParkedLayer(parkedLayer buffer.VideoLayer) { - f.clearParkedLayer() - - f.vls.SetParked(parkedLayer) - f.parkedLayerTimer = time.AfterFunc(ParkedLayerWaitDuration, func() { - f.lock.Lock() - notify := f.vls.GetParked().IsValid() - f.clearParkedLayer() - f.lock.Unlock() - - if onParkedLayerExpired := f.getOnParkedLayerExpired(); onParkedLayerExpired != nil && notify { - onParkedLayerExpired() - } - }) } func (f *Forwarder) CheckSync() (locked bool, layer int32) { @@ -1495,8 +1402,7 @@ func (f *Forwarder) GetTranslationParams(extPkt *buffer.ExtPacket, layer int32) f.lock.Lock() defer f.lock.Unlock() - // Video: Do not drop on publisher mute to enable resume on publisher unmute without a key frame. - if f.muted { + if f.muted || f.pubMuted { return &TranslationParams{ shouldDrop: true, }, nil @@ -1504,13 +1410,6 @@ func (f *Forwarder) GetTranslationParams(extPkt *buffer.ExtPacket, layer int32) switch f.kind { case webrtc.RTPCodecTypeAudio: - // Audio: Blank frames are injected on publisher mute to ensure decoder does not get stuck at a noise frame. So, do not forward. - if f.pubMuted { - return &TranslationParams{ - shouldDrop: true, - }, nil - } - return f.getTranslationParamsAudio(extPkt, layer) case webrtc.RTPCodecTypeVideo: return f.getTranslationParamsVideo(extPkt, layer) diff --git a/pkg/sfu/forwarder_test.go b/pkg/sfu/forwarder_test.go index 935009792..01b224634 100644 --- a/pkg/sfu/forwarder_test.go +++ b/pkg/sfu/forwarder_test.go @@ -201,34 +201,13 @@ func TestForwarderAllocateOptimal(t *testing.T) { f.PubMute(false) - // when parked layers valid, should stay there - f.vls.SetParked(buffer.VideoLayer{ - Spatial: 0, - Temporal: 1, - }) - expectedResult = VideoAllocation{ - PauseReason: VideoPauseReasonFeedDry, - BandwidthRequested: 0, - BandwidthDelta: 0, - Bitrates: emptyBitrates, - TargetLayer: f.vls.GetParked(), - RequestLayerSpatial: f.vls.GetParked().Spatial, - MaxLayer: buffer.DefaultMaxLayer, - DistanceToDesired: 0, - } - result = f.AllocateOptimal(nil, emptyBitrates, true) - require.Equal(t, expectedResult, result) - require.Equal(t, expectedResult, f.lastAllocation) - require.Equal(t, f.vls.GetParked(), f.TargetLayer()) - f.vls.SetParked(buffer.InvalidLayer) - // when max layers changes, target is opportunistic, but requested spatial layer should be at max f.SetMaxTemporalLayerSeen(3) f.vls.SetMax(buffer.VideoLayer{Spatial: 1, Temporal: 3}) expectedResult = VideoAllocation{ PauseReason: VideoPauseReasonNone, BandwidthRequested: bitrates[1][3], - BandwidthDelta: bitrates[1][3] - bitrates[0][1], + BandwidthDelta: bitrates[1][3], BandwidthNeeded: bitrates[1][3], Bitrates: bitrates, TargetLayer: buffer.DefaultMaxLayer, diff --git a/pkg/sfu/videolayerselector/base.go b/pkg/sfu/videolayerselector/base.go index 6708e7030..346632cb9 100644 --- a/pkg/sfu/videolayerselector/base.go +++ b/pkg/sfu/videolayerselector/base.go @@ -33,9 +33,6 @@ type Base struct { requestSpatial int32 - parkedLayer buffer.VideoLayer - previousParkedLayer buffer.VideoLayer - currentLayer buffer.VideoLayer previousLayer buffer.VideoLayer } @@ -48,8 +45,6 @@ func NewBase(logger logger.Logger) *Base { targetLayer: buffer.InvalidLayer, // start off with nothing, let streamallocator/opportunistic forwarder set the target previousTargetLayer: buffer.InvalidLayer, requestSpatial: buffer.InvalidLayerSpatial, - parkedLayer: buffer.InvalidLayer, - previousParkedLayer: buffer.InvalidLayer, currentLayer: buffer.InvalidLayer, previousLayer: buffer.InvalidLayer, } @@ -98,7 +93,7 @@ func (b *Base) GetRequestSpatial() int32 { func (b *Base) CheckSync() (locked bool, layer int32) { layer = b.GetRequestSpatial() - locked = layer == b.GetCurrent().Spatial || b.GetParked().IsValid() + locked = layer == b.GetCurrent().Spatial return } @@ -118,14 +113,6 @@ func (b *Base) GetMaxSeen() buffer.VideoLayer { return b.maxSeenLayer } -func (b *Base) SetParked(parkedLayer buffer.VideoLayer) { - b.parkedLayer = parkedLayer -} - -func (b *Base) GetParked() buffer.VideoLayer { - return b.parkedLayer -} - func (b *Base) SetCurrent(currentLayer buffer.VideoLayer) { b.currentLayer = currentLayer } @@ -143,15 +130,12 @@ func (b *Base) Rollback() { "rolling back", "previous", b.previousLayer, "current", b.currentLayer, - "previousParked", b.previousParkedLayer, - "parked", b.parkedLayer, "previousTarget", b.previousTargetLayer, "target", b.targetLayer, "max", b.maxLayer, "req", b.requestSpatial, "maxSeen", b.maxSeenLayer, ) - b.parkedLayer = b.previousParkedLayer b.currentLayer = b.previousLayer b.targetLayer = b.previousTargetLayer } @@ -170,8 +154,6 @@ func (b *Base) SelectTemporal(extPkt *buffer.ExtPacket) (int32, bool) { "updating temporal layer", "previous", b.previousLayer, "current", b.currentLayer, - "previousParked", b.previousParkedLayer, - "parked", b.parkedLayer, "previousTarget", b.previousTargetLayer, "target", b.targetLayer, "max", b.maxLayer, diff --git a/pkg/sfu/videolayerselector/dependencydescriptor.go b/pkg/sfu/videolayerselector/dependencydescriptor.go index 555437c6d..9d29427be 100644 --- a/pkg/sfu/videolayerselector/dependencydescriptor.go +++ b/pkg/sfu/videolayerselector/dependencydescriptor.go @@ -286,10 +286,6 @@ func (d *DependencyDescriptor) updateActiveDecodeTargets(activeDecodeTargetsBitm func (d *DependencyDescriptor) CheckSync() (locked bool, layer int32) { layer = d.GetRequestSpatial() - if d.GetParked().IsValid() { - return true, layer - } - d.decodeTargetsLock.RLock() defer d.decodeTargetsLock.RUnlock() for _, dt := range d.decodeTargets { diff --git a/pkg/sfu/videolayerselector/simulcast.go b/pkg/sfu/videolayerselector/simulcast.go index b4a5c527e..6b20787b7 100644 --- a/pkg/sfu/videolayerselector/simulcast.go +++ b/pkg/sfu/videolayerselector/simulcast.go @@ -40,10 +40,8 @@ func (s *Simulcast) IsOvershootOkay() bool { } func (s *Simulcast) Select(extPkt *buffer.ExtPacket, layer int32) (result VideoLayerSelectorResult) { - populateSwitches := func(isSwitching bool, isActive bool, reason string) { - if isSwitching { - result.IsSwitching = true - } + populateSwitches := func(isActive bool, reason string) { + result.IsSwitching = true if !isActive { result.IsResuming = true @@ -54,8 +52,6 @@ func (s *Simulcast) Select(extPkt *buffer.ExtPacket, layer int32) (result VideoL reason, "previous", s.previousLayer, "current", s.currentLayer, - "previousParked", s.previousParkedLayer, - "parked", s.parkedLayer, "previousTarget", s.previousTargetLayer, "target", s.targetLayer, "max", s.maxLayer, @@ -70,44 +66,30 @@ func (s *Simulcast) Select(extPkt *buffer.ExtPacket, layer int32) (result VideoL if s.currentLayer.Spatial != s.targetLayer.Spatial { currentLayer := s.currentLayer - // Three things to check when not locked to target - // 1. Resumable layer - don't need a key frame - // 2. Opportunistic layer upgrade - needs a key frame - // 3. Need to downgrade - needs a key frame - isSwitching := true + // Two things to check when not locked to target + // 1. Opportunistic layer upgrade - needs a key frame + // 2. Need to downgrade - needs a key frame isActive := s.currentLayer.IsValid() found := false reason := "" - if s.parkedLayer.IsValid() { - if s.parkedLayer.Spatial == layer { - reason = "resuming at parked layer" - currentLayer = s.parkedLayer - isSwitching = false + if extPkt.KeyFrame { + if layer > s.currentLayer.Spatial && layer <= s.targetLayer.Spatial { + reason = "upgrading layer" found = true } - } else { - if extPkt.KeyFrame { - if layer > s.currentLayer.Spatial && layer <= s.targetLayer.Spatial { - reason = "upgrading layer" - found = true - } - if layer < s.currentLayer.Spatial && layer >= s.targetLayer.Spatial { - reason = "downgrading layer" - found = true - } + if layer < s.currentLayer.Spatial && layer >= s.targetLayer.Spatial { + reason = "downgrading layer" + found = true + } - if found { - currentLayer.Spatial = layer - currentLayer.Temporal = extPkt.VideoLayer.Temporal - } + if found { + currentLayer.Spatial = layer + currentLayer.Temporal = extPkt.VideoLayer.Temporal } } if found { - s.previousParkedLayer = s.parkedLayer - s.parkedLayer = buffer.InvalidLayer - s.previousLayer = s.currentLayer s.currentLayer = currentLayer @@ -116,7 +98,7 @@ func (s *Simulcast) Select(extPkt *buffer.ExtPacket, layer int32) (result VideoL s.targetLayer.Spatial = s.currentLayer.Spatial } - populateSwitches(isSwitching, isActive, reason) + populateSwitches(isActive, reason) } } @@ -130,7 +112,7 @@ func (s *Simulcast) Select(extPkt *buffer.ExtPacket, layer int32) (result VideoL s.targetLayer.Spatial = layer } - populateSwitches(true, true, "adjusting overshoot") + populateSwitches(true, "adjusting overshoot") } result.RTPMarker = extPkt.Packet.Marker diff --git a/pkg/sfu/videolayerselector/videolayerselector.go b/pkg/sfu/videolayerselector/videolayerselector.go index 545196eae..186c8f64d 100644 --- a/pkg/sfu/videolayerselector/videolayerselector.go +++ b/pkg/sfu/videolayerselector/videolayerselector.go @@ -51,9 +51,6 @@ type VideoLayerSelector interface { SetMaxSeenTemporal(layer int32) GetMaxSeen() buffer.VideoLayer - SetParked(parkedLayer buffer.VideoLayer) - GetParked() buffer.VideoLayer - SetCurrent(currentLayer buffer.VideoLayer) GetCurrent() buffer.VideoLayer