Remove parked layer feature. (#1927)

* Remove parked layer feature.

Not worth the added complexity.

Several reasons
- Not seeing black frames on pub mute always.
- If they are there, it can consume more than 30kbps if the parked layer
  is high res. That is wasted bandwidth downstream when pub is muted.
- On resume, client some time sends PLI and that triggers a key frame
  request.

But, leaving the separate `PubMuted` flag in forwarder in case we can
use it for better handling.

* need the request spatial
This commit is contained in:
Raja Subramanian
2023-08-02 14:02:29 +05:30
committed by GitHub
parent f3a0e3e71c
commit 0dc92ef273
8 changed files with 36 additions and 206 deletions
+7 -7
View File
@@ -215,14 +215,14 @@ func (t *SubscribedTrack) UpdateVideoLayer() {
return
}
t.logger.Debugw("updating video layer",
"settings", settings,
)
t.logger.Debugw("updating video layer", "settings", settings)
spatial := t.spatialLayerFromSettings(settings)
t.DownTrack().SetMaxSpatialLayer(spatial)
if settings.Fps > 0 {
t.DownTrack().SetMaxTemporalLayer(t.MediaTrack().GetTemporalLayerForSpatialFps(spatial, settings.Fps, t.DownTrack().Codec().MimeType))
if settings.Width > 0 || settings.Fps > 0 {
spatial := t.spatialLayerFromSettings(settings)
t.DownTrack().SetMaxSpatialLayer(spatial)
if settings.Fps > 0 {
t.DownTrack().SetMaxTemporalLayer(t.MediaTrack().GetTemporalLayerForSpatialFps(spatial, settings.Fps, t.DownTrack().Codec().MimeType))
}
}
}
-5
View File
@@ -302,11 +302,6 @@ func NewDownTrack(params DowntrackParams) (*DownTrack, error) {
d.params.Receiver.GetReferenceLayerRTPTimestamp,
d.getExpectedRTPTimestamp,
)
d.forwarder.OnParkedLayerExpired(func() {
if sal := d.getStreamAllocatorListener(); sal != nil {
sal.OnSubscriptionChanged(d)
}
})
d.rtpStats = buffer.NewRTPStats(buffer.RTPStatsParams{
ClockRate: d.codec.ClockRate,
+10 -111
View File
@@ -36,10 +36,9 @@ import (
// Forwarder
const (
FlagPauseOnDowngrade = true
FlagFilterRTX = true
TransitionCostSpatial = 10
ParkedLayerWaitDuration = 2 * time.Second
FlagPauseOnDowngrade = true
FlagFilterRTX = true
TransitionCostSpatial = 10
ResumeBehindThresholdSeconds = float64(0.1) // 100ms
LayerSwitchBehindThresholdSeconds = float64(0.05) // 50ms
@@ -124,7 +123,6 @@ type VideoAllocationProvisional struct {
Bitrates Bitrates
maxLayer buffer.VideoLayer
currentLayer buffer.VideoLayer
parkedLayer buffer.VideoLayer
allocatedLayer buffer.VideoLayer
}
@@ -199,8 +197,6 @@ type Forwarder struct {
referenceLayerSpatial int32
refTSOffset uint32
parkedLayerTimer *time.Timer
provisional *VideoAllocationProvisional
lastAllocation VideoAllocation
@@ -210,8 +206,6 @@ type Forwarder struct {
vls videolayerselector.VideoLayerSelector
codecMunger codecmunger.CodecMunger
onParkedLayerExpired func()
}
func NewForwarder(
@@ -266,20 +260,6 @@ func (f *Forwarder) SetMaxTemporalLayerSeen(maxTemporalLayerSeen int32) bool {
return true
}
func (f *Forwarder) OnParkedLayerExpired(fn func()) {
f.lock.Lock()
defer f.lock.Unlock()
f.onParkedLayerExpired = fn
}
func (f *Forwarder) getOnParkedLayerExpired() func() {
f.lock.RLock()
defer f.lock.RUnlock()
return f.onParkedLayerExpired
}
func (f *Forwarder) DetermineCodec(codec webrtc.RTPCodecCapability, extensions []webrtc.RTPHeaderExtensionParameter) {
f.lock.Lock()
defer f.lock.Unlock()
@@ -428,22 +408,10 @@ func (f *Forwarder) PubMute(pubMuted bool) bool {
f.logger.Debugw("setting forwarder pub mute", "pubMuted", pubMuted)
f.pubMuted = pubMuted
if f.kind == webrtc.RTPCodecTypeAudio {
// for audio resync when pub muted so that sequence numbers do not jump on unmute
// audio stops forwarding during pub mute too
if pubMuted {
f.resyncLocked()
}
} else {
// Do not resync on publisher mute as forwarding can continue on unmute using same layer.
// On unmute, park current layers as streaming can continue without a key frame when publisher starts the stream.
targetLayer := f.vls.GetTarget()
if !pubMuted && targetLayer.IsValid() && f.vls.GetCurrent().Spatial == targetLayer.Spatial {
f.setupParkedLayer(targetLayer)
f.vls.SetCurrent(buffer.InvalidLayer)
}
// resync when pub muted so that sequence numbers do not jump on unmute
if pubMuted {
f.resyncLocked()
}
return true
}
@@ -476,9 +444,6 @@ func (f *Forwarder) SetMaxSpatialLayer(spatialLayer int32) (bool, buffer.VideoLa
f.logger.Debugw("setting max spatial layer", "layer", spatialLayer)
f.vls.SetMaxSpatial(spatialLayer)
f.clearParkedLayer()
return true, f.vls.GetMax()
}
@@ -497,9 +462,6 @@ func (f *Forwarder) SetMaxTemporalLayer(temporalLayer int32) (bool, buffer.Video
f.logger.Debugw("setting max temporal layer", "layer", temporalLayer)
f.vls.SetMaxTemporal(temporalLayer)
f.clearParkedLayer()
return true, f.vls.GetMax()
}
@@ -607,7 +569,6 @@ func (f *Forwarder) AllocateOptimal(availableLayers []int32, brs Bitrates, allow
maxLayer := f.vls.GetMax()
maxSeenLayer := f.vls.GetMaxSeen()
parkedLayer := f.vls.GetParked()
currentLayer := f.vls.GetCurrent()
requestSpatial := f.vls.GetRequestSpatial()
alloc := VideoAllocation{
@@ -653,14 +614,6 @@ func (f *Forwarder) AllocateOptimal(availableLayers []int32, brs Bitrates, allow
case f.pubMuted:
alloc.PauseReason = VideoPauseReasonPubMuted
// leave it at current layers for opportunistic resume
alloc.TargetLayer = currentLayer
alloc.RequestLayerSpatial = alloc.TargetLayer.Spatial
case parkedLayer.IsValid():
// if parked on a layer, let it continue
alloc.TargetLayer = parkedLayer
alloc.RequestLayerSpatial = alloc.TargetLayer.Spatial
default:
// lots of different events could end up here
@@ -754,7 +707,6 @@ func (f *Forwarder) ProvisionalAllocatePrepare(availableLayers []int32, Bitrates
Bitrates: Bitrates,
maxLayer: f.vls.GetMax(),
currentLayer: f.vls.GetCurrent(),
parkedLayer: f.vls.GetParked(),
}
f.provisional.availableLayers = make([]int32, len(availableLayers))
@@ -837,19 +789,11 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b
existingTargetLayer := f.vls.GetTarget()
if f.provisional.muted || f.provisional.pubMuted {
bandwidthRequired := int64(0)
f.provisional.allocatedLayer = buffer.InvalidLayer
if f.provisional.pubMuted {
// leave it at current for opportunistic forwarding, there is still bandwidth saving with publisher mute
f.provisional.allocatedLayer = f.provisional.currentLayer
if f.provisional.allocatedLayer.IsValid() {
bandwidthRequired = f.provisional.Bitrates[f.provisional.allocatedLayer.Spatial][f.provisional.allocatedLayer.Temporal]
}
}
return VideoTransition{
From: f.vls.GetTarget(),
To: f.provisional.allocatedLayer,
BandwidthDelta: bandwidthRequired - getBandwidthNeeded(f.provisional.Bitrates, existingTargetLayer, f.lastAllocation.BandwidthRequested),
BandwidthDelta: -getBandwidthNeeded(f.provisional.Bitrates, existingTargetLayer, f.lastAllocation.BandwidthRequested),
}
}
@@ -941,12 +885,7 @@ func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot b
// if nothing available, just leave target at current to enable opportunistic forwarding in case current resumes
if !targetLayer.IsValid() {
if f.provisional.parkedLayer.IsValid() {
targetLayer = f.provisional.parkedLayer
} else {
targetLayer = f.provisional.currentLayer
}
targetLayer = f.provisional.currentLayer
if targetLayer.IsValid() {
bandwidthRequired = f.provisional.Bitrates[targetLayer.Spatial][targetLayer.Temporal]
}
@@ -981,7 +920,6 @@ func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() VideoTransiti
targetLayer := f.vls.GetTarget()
if f.provisional.muted || f.provisional.pubMuted {
// if publisher muted, give up opportunistic resume and give back the bandwidth
f.provisional.allocatedLayer = buffer.InvalidLayer
return VideoTransition{
From: targetLayer,
@@ -1007,11 +945,7 @@ func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() VideoTransiti
// feed has gone dry, just leave target at current to enable opportunistic forwarding in case current resumes.
// Note that this is giving back bits and opportunistic forwarding resuming might trigger congestion again,
// but that should be handled by stream allocator.
if f.provisional.parkedLayer.IsValid() {
f.provisional.allocatedLayer = f.provisional.parkedLayer
} else {
f.provisional.allocatedLayer = f.provisional.currentLayer
}
f.provisional.allocatedLayer = f.provisional.currentLayer
return VideoTransition{
From: targetLayer,
To: f.provisional.allocatedLayer,
@@ -1138,7 +1072,6 @@ func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation {
}
}
f.clearParkedLayer()
return f.updateAllocation(alloc, "cooperative")
}
@@ -1382,7 +1315,6 @@ func (f *Forwarder) Pause(availableLayers []int32, brs Bitrates) VideoAllocation
alloc.PauseReason = VideoPauseReasonBandwidth
}
f.clearParkedLayer()
return f.updateAllocation(alloc, "pause")
}
@@ -1427,31 +1359,6 @@ func (f *Forwarder) Resync() {
func (f *Forwarder) resyncLocked() {
f.vls.SetCurrent(buffer.InvalidLayer)
f.lastSSRC = 0
f.clearParkedLayer()
}
func (f *Forwarder) clearParkedLayer() {
f.vls.SetParked(buffer.InvalidLayer)
if f.parkedLayerTimer != nil {
f.parkedLayerTimer.Stop()
f.parkedLayerTimer = nil
}
}
func (f *Forwarder) setupParkedLayer(parkedLayer buffer.VideoLayer) {
f.clearParkedLayer()
f.vls.SetParked(parkedLayer)
f.parkedLayerTimer = time.AfterFunc(ParkedLayerWaitDuration, func() {
f.lock.Lock()
notify := f.vls.GetParked().IsValid()
f.clearParkedLayer()
f.lock.Unlock()
if onParkedLayerExpired := f.getOnParkedLayerExpired(); onParkedLayerExpired != nil && notify {
onParkedLayerExpired()
}
})
}
func (f *Forwarder) CheckSync() (locked bool, layer int32) {
@@ -1495,8 +1402,7 @@ func (f *Forwarder) GetTranslationParams(extPkt *buffer.ExtPacket, layer int32)
f.lock.Lock()
defer f.lock.Unlock()
// Video: Do not drop on publisher mute to enable resume on publisher unmute without a key frame.
if f.muted {
if f.muted || f.pubMuted {
return &TranslationParams{
shouldDrop: true,
}, nil
@@ -1504,13 +1410,6 @@ func (f *Forwarder) GetTranslationParams(extPkt *buffer.ExtPacket, layer int32)
switch f.kind {
case webrtc.RTPCodecTypeAudio:
// Audio: Blank frames are injected on publisher mute to ensure decoder does not get stuck at a noise frame. So, do not forward.
if f.pubMuted {
return &TranslationParams{
shouldDrop: true,
}, nil
}
return f.getTranslationParamsAudio(extPkt, layer)
case webrtc.RTPCodecTypeVideo:
return f.getTranslationParamsVideo(extPkt, layer)
+1 -22
View File
@@ -201,34 +201,13 @@ func TestForwarderAllocateOptimal(t *testing.T) {
f.PubMute(false)
// when parked layers valid, should stay there
f.vls.SetParked(buffer.VideoLayer{
Spatial: 0,
Temporal: 1,
})
expectedResult = VideoAllocation{
PauseReason: VideoPauseReasonFeedDry,
BandwidthRequested: 0,
BandwidthDelta: 0,
Bitrates: emptyBitrates,
TargetLayer: f.vls.GetParked(),
RequestLayerSpatial: f.vls.GetParked().Spatial,
MaxLayer: buffer.DefaultMaxLayer,
DistanceToDesired: 0,
}
result = f.AllocateOptimal(nil, emptyBitrates, true)
require.Equal(t, expectedResult, result)
require.Equal(t, expectedResult, f.lastAllocation)
require.Equal(t, f.vls.GetParked(), f.TargetLayer())
f.vls.SetParked(buffer.InvalidLayer)
// when max layers changes, target is opportunistic, but requested spatial layer should be at max
f.SetMaxTemporalLayerSeen(3)
f.vls.SetMax(buffer.VideoLayer{Spatial: 1, Temporal: 3})
expectedResult = VideoAllocation{
PauseReason: VideoPauseReasonNone,
BandwidthRequested: bitrates[1][3],
BandwidthDelta: bitrates[1][3] - bitrates[0][1],
BandwidthDelta: bitrates[1][3],
BandwidthNeeded: bitrates[1][3],
Bitrates: bitrates,
TargetLayer: buffer.DefaultMaxLayer,
+1 -19
View File
@@ -33,9 +33,6 @@ type Base struct {
requestSpatial int32
parkedLayer buffer.VideoLayer
previousParkedLayer buffer.VideoLayer
currentLayer buffer.VideoLayer
previousLayer buffer.VideoLayer
}
@@ -48,8 +45,6 @@ func NewBase(logger logger.Logger) *Base {
targetLayer: buffer.InvalidLayer, // start off with nothing, let streamallocator/opportunistic forwarder set the target
previousTargetLayer: buffer.InvalidLayer,
requestSpatial: buffer.InvalidLayerSpatial,
parkedLayer: buffer.InvalidLayer,
previousParkedLayer: buffer.InvalidLayer,
currentLayer: buffer.InvalidLayer,
previousLayer: buffer.InvalidLayer,
}
@@ -98,7 +93,7 @@ func (b *Base) GetRequestSpatial() int32 {
func (b *Base) CheckSync() (locked bool, layer int32) {
layer = b.GetRequestSpatial()
locked = layer == b.GetCurrent().Spatial || b.GetParked().IsValid()
locked = layer == b.GetCurrent().Spatial
return
}
@@ -118,14 +113,6 @@ func (b *Base) GetMaxSeen() buffer.VideoLayer {
return b.maxSeenLayer
}
func (b *Base) SetParked(parkedLayer buffer.VideoLayer) {
b.parkedLayer = parkedLayer
}
func (b *Base) GetParked() buffer.VideoLayer {
return b.parkedLayer
}
func (b *Base) SetCurrent(currentLayer buffer.VideoLayer) {
b.currentLayer = currentLayer
}
@@ -143,15 +130,12 @@ func (b *Base) Rollback() {
"rolling back",
"previous", b.previousLayer,
"current", b.currentLayer,
"previousParked", b.previousParkedLayer,
"parked", b.parkedLayer,
"previousTarget", b.previousTargetLayer,
"target", b.targetLayer,
"max", b.maxLayer,
"req", b.requestSpatial,
"maxSeen", b.maxSeenLayer,
)
b.parkedLayer = b.previousParkedLayer
b.currentLayer = b.previousLayer
b.targetLayer = b.previousTargetLayer
}
@@ -170,8 +154,6 @@ func (b *Base) SelectTemporal(extPkt *buffer.ExtPacket) (int32, bool) {
"updating temporal layer",
"previous", b.previousLayer,
"current", b.currentLayer,
"previousParked", b.previousParkedLayer,
"parked", b.parkedLayer,
"previousTarget", b.previousTargetLayer,
"target", b.targetLayer,
"max", b.maxLayer,
@@ -286,10 +286,6 @@ func (d *DependencyDescriptor) updateActiveDecodeTargets(activeDecodeTargetsBitm
func (d *DependencyDescriptor) CheckSync() (locked bool, layer int32) {
layer = d.GetRequestSpatial()
if d.GetParked().IsValid() {
return true, layer
}
d.decodeTargetsLock.RLock()
defer d.decodeTargetsLock.RUnlock()
for _, dt := range d.decodeTargets {
+17 -35
View File
@@ -40,10 +40,8 @@ func (s *Simulcast) IsOvershootOkay() bool {
}
func (s *Simulcast) Select(extPkt *buffer.ExtPacket, layer int32) (result VideoLayerSelectorResult) {
populateSwitches := func(isSwitching bool, isActive bool, reason string) {
if isSwitching {
result.IsSwitching = true
}
populateSwitches := func(isActive bool, reason string) {
result.IsSwitching = true
if !isActive {
result.IsResuming = true
@@ -54,8 +52,6 @@ func (s *Simulcast) Select(extPkt *buffer.ExtPacket, layer int32) (result VideoL
reason,
"previous", s.previousLayer,
"current", s.currentLayer,
"previousParked", s.previousParkedLayer,
"parked", s.parkedLayer,
"previousTarget", s.previousTargetLayer,
"target", s.targetLayer,
"max", s.maxLayer,
@@ -70,44 +66,30 @@ func (s *Simulcast) Select(extPkt *buffer.ExtPacket, layer int32) (result VideoL
if s.currentLayer.Spatial != s.targetLayer.Spatial {
currentLayer := s.currentLayer
// Three things to check when not locked to target
// 1. Resumable layer - don't need a key frame
// 2. Opportunistic layer upgrade - needs a key frame
// 3. Need to downgrade - needs a key frame
isSwitching := true
// Two things to check when not locked to target
// 1. Opportunistic layer upgrade - needs a key frame
// 2. Need to downgrade - needs a key frame
isActive := s.currentLayer.IsValid()
found := false
reason := ""
if s.parkedLayer.IsValid() {
if s.parkedLayer.Spatial == layer {
reason = "resuming at parked layer"
currentLayer = s.parkedLayer
isSwitching = false
if extPkt.KeyFrame {
if layer > s.currentLayer.Spatial && layer <= s.targetLayer.Spatial {
reason = "upgrading layer"
found = true
}
} else {
if extPkt.KeyFrame {
if layer > s.currentLayer.Spatial && layer <= s.targetLayer.Spatial {
reason = "upgrading layer"
found = true
}
if layer < s.currentLayer.Spatial && layer >= s.targetLayer.Spatial {
reason = "downgrading layer"
found = true
}
if layer < s.currentLayer.Spatial && layer >= s.targetLayer.Spatial {
reason = "downgrading layer"
found = true
}
if found {
currentLayer.Spatial = layer
currentLayer.Temporal = extPkt.VideoLayer.Temporal
}
if found {
currentLayer.Spatial = layer
currentLayer.Temporal = extPkt.VideoLayer.Temporal
}
}
if found {
s.previousParkedLayer = s.parkedLayer
s.parkedLayer = buffer.InvalidLayer
s.previousLayer = s.currentLayer
s.currentLayer = currentLayer
@@ -116,7 +98,7 @@ func (s *Simulcast) Select(extPkt *buffer.ExtPacket, layer int32) (result VideoL
s.targetLayer.Spatial = s.currentLayer.Spatial
}
populateSwitches(isSwitching, isActive, reason)
populateSwitches(isActive, reason)
}
}
@@ -130,7 +112,7 @@ func (s *Simulcast) Select(extPkt *buffer.ExtPacket, layer int32) (result VideoL
s.targetLayer.Spatial = layer
}
populateSwitches(true, true, "adjusting overshoot")
populateSwitches(true, "adjusting overshoot")
}
result.RTPMarker = extPkt.Packet.Marker
@@ -51,9 +51,6 @@ type VideoLayerSelector interface {
SetMaxSeenTemporal(layer int32)
GetMaxSeen() buffer.VideoLayer
SetParked(parkedLayer buffer.VideoLayer)
GetParked() buffer.VideoLayer
SetCurrent(currentLayer buffer.VideoLayer)
GetCurrent() buffer.VideoLayer