package sfu

import (
	"fmt"
	"math"
	"strings"
	"sync"

	"github.com/pion/webrtc/v3"

	"github.com/livekit/protocol/logger"

	"github.com/livekit/livekit-server/pkg/sfu/buffer"
)

//
// Forwarder
//
const (
	// FlagPauseOnDowngrade enables dropping packets of the current layer while
	// a lower target layer is pending and the last allocation is deficient
	// (see getTranslationParamsVideo).
	FlagPauseOnDowngrade = true

	// FlagFilterRTX enables filtering of NACKs in FilterRTX.
	FlagFilterRTX = true

	// TransitionCostSpatial is the cost charged for a spatial layer switch when
	// evaluating down transitions in ProvisionalAllocateGetBestWeightedTransition.
	TransitionCostSpatial = 10
)

// -------------------------------------------------------------------

// ForwardingStatus summarizes how close forwarding is to what is desired,
// as reported by Forwarder.GetForwardingStatus.
type ForwardingStatus int

const (
	// ForwardingStatusOff is reported when there is no valid target layer.
	ForwardingStatusOff ForwardingStatus = iota
	// ForwardingStatusPartial is reported when the target spatial layer is below
	// both the max subscribed layer and the highest available layer.
	ForwardingStatusPartial
	// ForwardingStatusOptimal is reported in all other cases (including muted
	// or no available layers).
	ForwardingStatusOptimal
)

// -------------------------------------------------------------------

// VideoStreamingChange describes whether an allocation pauses or resumes a stream.
type VideoStreamingChange int

const (
	VideoStreamingChangeNone VideoStreamingChange = iota
	VideoStreamingChangePausing
	VideoStreamingChangeResuming
)

// String returns the name of the change, or its numeric value if unknown.
func (v VideoStreamingChange) String() string {
	switch v {
	case VideoStreamingChangeNone:
		return "NONE"
	case VideoStreamingChangePausing:
		return "PAUSING"
	case VideoStreamingChangeResuming:
		return "RESUMING"
	default:
		return fmt.Sprintf("%d", int(v))
	}
}

// -------------------------------------------------------------------

// VideoAllocationState is the state recorded with each VideoAllocation.
type VideoAllocationState int

const (
	VideoAllocationStateNone VideoAllocationState = iota
	VideoAllocationStateMuted
	VideoAllocationStateFeedDry
	VideoAllocationStateAwaitingMeasurement
	VideoAllocationStateOptimal
	VideoAllocationStateDeficient
)

// String returns the name of the state, or its numeric value if unknown.
func (v VideoAllocationState) String() string {
	switch v {
	case VideoAllocationStateNone:
		return "NONE"
	case VideoAllocationStateMuted:
		return "MUTED"
	case VideoAllocationStateFeedDry:
		return "FEED_DRY"
	case VideoAllocationStateAwaitingMeasurement:
		return "AWAITING_MEASUREMENT"
	case VideoAllocationStateOptimal:
		return "OPTIMAL"
	case VideoAllocationStateDeficient:
		return "DEFICIENT"
	default:
		return fmt.Sprintf("%d", int(v))
	}
}

// VideoAllocation is the result of an allocation decision made by a Forwarder.
type VideoAllocation struct {
	state              VideoAllocationState
	change             VideoStreamingChange
	bandwidthRequested int64
	bandwidthDelta     int64
	availableLayers    []int32
	bitrates           Bitrates
	targetLayers       VideoLayers
	distanceToDesired  int32
}

// String returns a human readable representation of the allocation.
func (v VideoAllocation) String() string {
	return fmt.Sprintf("VideoAllocation{state: %s, change: %s, bw: %d, del: %d, avail: %+v, rates: %+v, target: %s}",
		v.state, v.change, v.bandwidthRequested, v.bandwidthDelta, v.availableLayers, v.bitrates, v.targetLayers)
}

var (
	// VideoAllocationDefault is the allocation a new Forwarder starts with:
	// zero values with invalid target layers.
	VideoAllocationDefault = VideoAllocation{
		targetLayers: InvalidLayers,
	}
)

// -------------------------------------------------------------------

// VideoAllocationProvisional holds the snapshot taken by
// ProvisionalAllocatePrepare and the layers chosen by subsequent
// ProvisionalAllocate* calls, until ProvisionalAllocateCommit applies them.
type VideoAllocationProvisional struct {
	layers          VideoLayers
	muted           bool
	bitrates        Bitrates
	availableLayers []int32
	maxLayers       VideoLayers
}

// -------------------------------------------------------------------

// VideoTransition describes a move between layers and the bandwidth change
// associated with it.
type VideoTransition struct {
	from           VideoLayers
	to             VideoLayers
	bandwidthDelta int64
}

// -------------------------------------------------------------------

// TranslationParams tells the caller of GetTranslationParams whether a packet
// should be dropped and, if not, how it should be translated.
type TranslationParams struct {
	shouldDrop            bool
	isDroppingRelevant    bool
	isSwitchingToMaxLayer bool
	rtp                   *TranslationParamsRTP
	vp8                   *TranslationParamsVP8
}

// -------------------------------------------------------------------

// VideoLayers is a (spatial, temporal) layer pair.
type VideoLayers struct {
	spatial  int32
	temporal int32
}

// String returns a human readable representation of the layers.
func (v VideoLayers) String() string {
	return fmt.Sprintf("VideoLayers{s: %d, t: %d}", v.spatial, v.temporal)
}

// GreaterThan reports whether v is higher than v2, comparing spatial layer
// first and temporal layer second.
func (v VideoLayers) GreaterThan(v2 VideoLayers) bool {
	return v.spatial > v2.spatial || (v.spatial == v2.spatial && v.temporal > v2.temporal)
}

// SpatialGreaterThanOrEqual reports whether the spatial layer of v is at
// least that of v2.
func (v VideoLayers) SpatialGreaterThanOrEqual(v2 VideoLayers) bool {
	return v.spatial >= v2.spatial
}

// IsValid reports whether both spatial and temporal layers are set to
// something other than their invalid markers.
func (v VideoLayers) IsValid() bool {
	return v.spatial != InvalidLayerSpatial && v.temporal != InvalidLayerTemporal
}

// -------------------------------------------------------------------

const (
	InvalidLayerSpatial  = int32(-1)
	InvalidLayerTemporal = int32(-1)

	DefaultMaxLayerSpatial  = int32(2)
	DefaultMaxLayerTemporal = int32(3)
)

var (
	// InvalidLayers marks "no layers", e.g. a paused stream.
	InvalidLayers = VideoLayers{
		spatial:  InvalidLayerSpatial,
		temporal: InvalidLayerTemporal,
	}
)

// -------------------------------------------------------------------

// Forwarder tracks, for one forwarded track, the mute state, the max/current/target
// layers and the last allocation, and produces per-packet translation parameters.
// All exported methods take the internal lock.
type Forwarder struct {
	lock   sync.RWMutex
	codec  webrtc.RTPCodecCapability
	kind   webrtc.RTPCodecType
	logger logger.Logger

	muted bool

	started  bool
	lastSSRC uint32
	lTSCalc  int64

	maxLayers     VideoLayers
	currentLayers VideoLayers
	targetLayers  VideoLayers

	provisional *VideoAllocationProvisional

	lastAllocation VideoAllocation

	availableLayers []int32

	rtpMunger *RTPMunger
	vp8Munger *VP8Munger

	isTemporalSupported bool
}

// NewForwarder creates a Forwarder for the given codec and kind.
//
// Current and target layers start out invalid. A VP8 munger is created and
// temporal layer support is enabled only when the codec mime type is
// "video/vp8" (case-insensitive). For video, max layers start with an invalid
// spatial layer and DefaultMaxLayerTemporal; for anything else max layers are
// invalid.
func NewForwarder(codec webrtc.RTPCodecCapability, kind webrtc.RTPCodecType, logger logger.Logger) *Forwarder {
	f := &Forwarder{
		codec:  codec,
		kind:   kind,
		logger: logger,

		// start off with nothing, let streamallocator set things
		currentLayers: InvalidLayers,
		targetLayers:  InvalidLayers,

		lastAllocation: VideoAllocationDefault,

		rtpMunger: NewRTPMunger(logger),
	}

	if strings.EqualFold(codec.MimeType, "video/vp8") {
		f.isTemporalSupported = true
		f.vp8Munger = NewVP8Munger(logger)
	}

	if f.kind == webrtc.RTPCodecTypeVideo {
		f.maxLayers = VideoLayers{spatial: InvalidLayerSpatial, temporal: DefaultMaxLayerTemporal}
	} else {
		f.maxLayers = InvalidLayers
	}

	return f
}

// Mute sets the mute state. It returns whether the state changed along with
// the current max layers.
func (f *Forwarder) Mute(val bool) (bool, VideoLayers) {
	f.lock.Lock()
	defer f.lock.Unlock()

	if f.muted == val {
		return false, f.maxLayers
	}

	f.muted = val

	// resync when muted so that sequence numbers do not jump on unmute
	if val {
		f.resyncLocked()
	}

	return true, f.maxLayers
}

// IsMuted reports the current mute state.
func (f *Forwarder) IsMuted() bool {
	f.lock.RLock()
	defer f.lock.RUnlock()

	return f.muted
}

// SetMaxSpatialLayer updates the max spatial layer. It returns whether the
// value changed, the max layers and the current layers. It is a no-op for
// audio or when the value is unchanged.
func (f *Forwarder) SetMaxSpatialLayer(spatialLayer int32) (bool, VideoLayers, VideoLayers) {
	f.lock.Lock()
	defer f.lock.Unlock()

	if f.kind == webrtc.RTPCodecTypeAudio || spatialLayer == f.maxLayers.spatial {
		return false, f.maxLayers, f.currentLayers
	}

	f.maxLayers.spatial = spatialLayer

	return true, f.maxLayers, f.currentLayers
}

// SetMaxTemporalLayer updates the max temporal layer. It returns whether the
// value changed, the max layers and the current layers. It is a no-op for
// audio or when the value is unchanged.
func (f *Forwarder) SetMaxTemporalLayer(temporalLayer int32) (bool, VideoLayers, VideoLayers) {
	f.lock.Lock()
	defer f.lock.Unlock()

	if f.kind == webrtc.RTPCodecTypeAudio || temporalLayer == f.maxLayers.temporal {
		return false, f.maxLayers, f.currentLayers
	}

	f.maxLayers.temporal = temporalLayer

	return true, f.maxLayers, f.currentLayers
}

// MaxLayers returns the max layers.
func (f *Forwarder) MaxLayers() VideoLayers {
	f.lock.RLock()
	defer f.lock.RUnlock()

	return f.maxLayers
}

// CurrentLayers returns the layers currently locked to.
func (f *Forwarder) CurrentLayers() VideoLayers {
	f.lock.RLock()
	defer f.lock.RUnlock()

	return f.currentLayers
}

// TargetLayers returns the layers selected by the last allocation.
func (f *Forwarder) TargetLayers() VideoLayers {
	f.lock.RLock()
	defer f.lock.RUnlock()

	return f.targetLayers
}

// GetForwardingStatus reports how the target layers compare with what could
// be forwarded. Muted tracks and tracks without available layers report
// optimal.
func (f *Forwarder) GetForwardingStatus() ForwardingStatus {
	f.lock.RLock()
	defer f.lock.RUnlock()

	if f.muted || len(f.availableLayers) == 0 {
		return ForwardingStatusOptimal
	}

	if f.targetLayers == InvalidLayers {
		return ForwardingStatusOff
	}

	// NOTE(review): uses the last entry of availableLayers as the highest
	// available layer, i.e. presumes the slice is sorted ascending.
	if f.targetLayers.spatial < f.maxLayers.spatial && f.targetLayers.spatial < f.availableLayers[len(f.availableLayers)-1] {
		return ForwardingStatusPartial
	}

	return ForwardingStatusOptimal
}

// UpTrackLayersChange records the spatial layers currently available.
// The slice is stored as is (not copied).
func (f *Forwarder) UpTrackLayersChange(availableLayers []int32) {
	f.lock.Lock()
	defer f.lock.Unlock()

	f.availableLayers = availableLayers
}

// getOptimalBandwidthNeeded returns the bitrate of the highest layer within
// max layers that has a non-zero bitrate, or 0 if there is none.
// should be called with lock held
func (f *Forwarder) getOptimalBandwidthNeeded(brs Bitrates) int64 {
	for i := f.maxLayers.spatial; i >= 0; i-- {
		for j := f.maxLayers.temporal; j >= 0; j-- {
			if brs[i][j] == 0 {
				continue
			}

			return brs[i][j]
		}
	}

	return 0
}

// bitrateAvailable reports whether every layer in availableLayers that is not
// above the max spatial layer has a non-zero bitrate in at least one temporal
// layer up to the max temporal layer.
// should be called with lock held
func (f *Forwarder) bitrateAvailable(brs Bitrates, availableLayers []int32) bool {
	for _, layer := range availableLayers {
		if layer > f.maxLayers.spatial {
			// not subscribed to this layer, its bitrate is not needed
			continue
		}

		measured := false
		for t := f.maxLayers.temporal; t >= 0; t-- {
			if brs[layer][t] != 0 {
				measured = true
				break
			}
		}
		if !measured {
			return false
		}
	}

	return true
}

// getDistanceToDesired counts the layers with non-zero bitrate, walking down
// from max layers, that are encountered before reaching targetLayers.
// It is 0 when muted.
// should be called with lock held
func (f *Forwarder) getDistanceToDesired(brs Bitrates, targetLayers VideoLayers) int32 {
	if f.muted {
		return 0
	}

	distance := int32(0)
	for s := f.maxLayers.spatial; s >= 0; s-- {
		found := false
		for t := f.maxLayers.temporal; t >= 0; t-- {
			if brs[s][t] == 0 {
				continue
			}
			if s == targetLayers.spatial && t == targetLayers.temporal {
				found = true
				break
			}

			distance++
		}

		if found {
			break
		}
	}

	return distance
}

// IsDeficient reports whether the last allocation is in the deficient state.
func (f *Forwarder) IsDeficient() bool {
	f.lock.RLock()
	defer f.lock.RUnlock()

	return f.lastAllocation.state == VideoAllocationStateDeficient
}

// BandwidthRequested returns the bitrate in brs of the current target layers,
// or 0 when there are no target layers.
func (f *Forwarder) BandwidthRequested(brs Bitrates) int64 {
	f.lock.RLock()
	defer f.lock.RUnlock()

	if f.targetLayers == InvalidLayers {
		return 0
	}

	return brs[f.targetLayers.spatial][f.targetLayers.temporal]
}

// DistanceToDesired returns the distance recorded with the last allocation.
func (f *Forwarder) DistanceToDesired() int32 {
	f.lock.RLock()
	defer f.lock.RUnlock()

	return f.lastAllocation.distanceToDesired
}

// AllocateOptimal allocates the highest layer within max layers that has a
// non-zero bitrate, without regard to channel capacity, and records and
// returns the resulting allocation. For audio the last allocation is returned
// unchanged.
func (f *Forwarder) AllocateOptimal(brs Bitrates) VideoAllocation {
	f.lock.Lock()
	defer f.lock.Unlock()

	if f.kind == webrtc.RTPCodecTypeAudio {
		return f.lastAllocation
	}

	state := VideoAllocationStateNone
	change := VideoStreamingChangeNone
	bandwidthRequested := int64(0)
	targetLayers := InvalidLayers

	switch {
	case f.muted:
		state = VideoAllocationStateMuted
	case len(f.availableLayers) == 0:
		// feed is dry
		state = VideoAllocationStateFeedDry
	case !f.bitrateAvailable(brs, f.availableLayers):
		// feed bitrate not yet calculated for all available layers
		state = VideoAllocationStateAwaitingMeasurement

		//
		// Resume with the highest layer available <= max subscribed layer
		// If already resumed, move allocation to the highest available layer <= max subscribed layer
		//
		targetLayers.spatial = int32(math.Min(float64(f.maxLayers.spatial), float64(f.availableLayers[len(f.availableLayers)-1])))
		targetLayers.temporal = int32(math.Max(0, float64(f.maxLayers.temporal)))

		if f.targetLayers == InvalidLayers && targetLayers.IsValid() {
			change = VideoStreamingChangeResuming
		}
	default:
		// allocate best layer available
		for s := f.maxLayers.spatial; s >= 0; s-- {
			for t := f.maxLayers.temporal; t >= 0; t-- {
				if brs[s][t] == 0 {
					continue
				}

				targetLayers = VideoLayers{
					spatial:  s,
					temporal: t,
				}

				bandwidthRequested = brs[s][t]
				state = VideoAllocationStateOptimal

				if f.targetLayers == InvalidLayers {
					change = VideoStreamingChangeResuming
				}
				break
			}

			if bandwidthRequested != 0 {
				break
			}
		}
	}

	// a partially valid pair (e.g. max spatial layer not set yet) is treated as no layers
	if !targetLayers.IsValid() {
		targetLayers = InvalidLayers
	}

	f.lastAllocation = VideoAllocation{
		state:              state,
		change:             change,
		bandwidthRequested: bandwidthRequested,
		bandwidthDelta:     bandwidthRequested - f.lastAllocation.bandwidthRequested,
		availableLayers:    f.availableLayers,
		bitrates:           brs,
		targetLayers:       targetLayers,
		distanceToDesired:  f.getDistanceToDesired(brs, targetLayers),
	}
	f.targetLayers = f.lastAllocation.targetLayers
	if f.targetLayers == InvalidLayers {
		f.resyncLocked()
	}

	return f.lastAllocation
}

// ProvisionalAllocatePrepare starts a provisional allocation by snapshotting
// the mute state, available layers and max layers together with the given
// bitrates. The other ProvisionalAllocate* methods dereference this snapshot
// and so must be called after this one.
func (f *Forwarder) ProvisionalAllocatePrepare(bitrates Bitrates) {
	f.lock.Lock()
	defer f.lock.Unlock()

	f.provisional = &VideoAllocationProvisional{
		layers:          InvalidLayers,
		muted:           f.muted,
		bitrates:        bitrates,
		availableLayers: f.availableLayers,
		maxLayers:       f.maxLayers,
	}
}

// ProvisionalAllocate tries to provisionally move to the given layers within
// availableChannelCapacity. It returns the additional bitrate consumed
// relative to the layers already provisionally allocated, or 0 if the layers
// were not taken.
func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layers VideoLayers, allowPause bool) int64 {
	f.lock.Lock()
	defer f.lock.Unlock()

	if f.provisional.muted || !f.provisional.maxLayers.IsValid() || layers.GreaterThan(f.provisional.maxLayers) {
		return 0
	}

	requiredBitrate := f.provisional.bitrates[layers.spatial][layers.temporal]
	if requiredBitrate == 0 {
		return 0
	}

	alreadyAllocatedBitrate := int64(0)
	if f.provisional.layers != InvalidLayers {
		alreadyAllocatedBitrate = f.provisional.bitrates[f.provisional.layers.spatial][f.provisional.layers.temporal]
	}

	if requiredBitrate <= (availableChannelCapacity + alreadyAllocatedBitrate) {
		f.provisional.layers = layers
		return requiredBitrate - alreadyAllocatedBitrate
	}

	// when pause is disallowed, pick the layer if none allocated already or something lower is available
	if !allowPause && (f.provisional.layers == InvalidLayers || !layers.GreaterThan(f.provisional.layers)) {
		f.provisional.layers = layers
		return requiredBitrate - alreadyAllocatedBitrate
	}

	return 0
}

// ProvisionalAllocateGetCooperativeTransition selects provisional layers using
// the co-operative scheme described below and returns the resulting transition.
func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition() VideoTransition {
	//
	// This is called when a track needs a change (could be mute/unmute, subscribed layers changed, published layers changed)
	// when channel is congested.
	//
	// The goal is to provide a co-operative transition. Co-operative stream allocation aims to keep all the streams active
	// as much as possible.
	//
	// When channel is congested, effecting a transition which will consume more bits will lead to more congestion.
	// So, this routine does the following
	//   1. When muting, it is not going to increase consumption.
	//   2. If the stream is currently active and the transition needs more bits (higher layers = more bits), do not make the up move.
	//      The higher layer requirement could be due to a new published layer becoming available or subscribed layers changing.
	//   3. If the new target layers are lower than current target, take the move down and save bits.
	//   4. If not currently streaming, find the minimum layers that can unpause the stream.
	//
	// To summarize, co-operative streaming means
	//   - Try to keep tracks streaming, i.e. no pauses at the expense of some streams not being at optimal layers
	//   - Do not make an upgrade as it could affect other tracks
	//
	f.lock.Lock()
	defer f.lock.Unlock()

	if f.provisional.muted {
		f.provisional.layers = InvalidLayers
		return VideoTransition{
			from:           f.targetLayers,
			to:             InvalidLayers,
			bandwidthDelta: 0 - f.lastAllocation.bandwidthRequested,
			// LK-TODO should this take current bitrate of current target layers?
		}
	}

	// check if we should preserve current target
	if f.targetLayers != InvalidLayers {
		// what is the highest that is available
		maximalLayers := InvalidLayers
		maximalBandwidthRequired := int64(0)
		for s := f.provisional.maxLayers.spatial; s >= 0; s-- {
			for t := f.provisional.maxLayers.temporal; t >= 0; t-- {
				if f.provisional.bitrates[s][t] != 0 {
					maximalLayers = VideoLayers{spatial: s, temporal: t}
					maximalBandwidthRequired = f.provisional.bitrates[s][t]
					break
				}
			}

			if maximalBandwidthRequired != 0 {
				break
			}
		}

		if maximalLayers != InvalidLayers {
			if !f.targetLayers.GreaterThan(maximalLayers) && (f.provisional.bitrates[f.targetLayers.spatial][f.targetLayers.temporal] != 0) {
				// currently streaming and wanting an upgrade, just preserve current target in the cooperative scheme of things
				f.provisional.layers = f.targetLayers
				return VideoTransition{
					from:           f.targetLayers,
					to:             f.targetLayers,
					bandwidthDelta: 0,
				}
			}

			if f.targetLayers.GreaterThan(maximalLayers) {
				// maximalLayers <= f.targetLayers, make the down move
				f.provisional.layers = maximalLayers
				return VideoTransition{
					from:           f.targetLayers,
					to:             maximalLayers,
					bandwidthDelta: maximalBandwidthRequired - f.lastAllocation.bandwidthRequested,
				}
			}
		}
	}

	// currently not streaming, find minimal
	// NOTE: a layer in feed could have paused and there could be other options than going back to minimal,
	// but the cooperative scheme knocks things back to minimal
	minimalLayers := InvalidLayers
	bandwidthRequired := int64(0)
	for s := int32(0); s <= f.provisional.maxLayers.spatial; s++ {
		for t := int32(0); t <= f.provisional.maxLayers.temporal; t++ {
			if f.provisional.bitrates[s][t] != 0 {
				minimalLayers = VideoLayers{spatial: s, temporal: t}
				bandwidthRequired = f.provisional.bitrates[s][t]
				break
			}
		}

		if bandwidthRequired != 0 {
			break
		}
	}

	targetLayers := f.targetLayers
	if targetLayers == InvalidLayers || targetLayers.GreaterThan(minimalLayers) || (f.provisional.bitrates[targetLayers.spatial][targetLayers.temporal] == 0) {
		targetLayers = minimalLayers
	}

	f.provisional.layers = targetLayers
	return VideoTransition{
		from:           f.targetLayers,
		to:             targetLayers,
		bandwidthDelta: bandwidthRequired - f.lastAllocation.bandwidthRequested,
	}
}

// ProvisionalAllocateGetBestWeightedTransition selects, as provisional layers,
// the down transition from the current target with the best bandwidth saved
// per unit cost (see below) and returns that transition.
func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() VideoTransition {
	//
	// This is called when a track needs a change (could be mute/unmute, subscribed layers changed, published layers changed)
	// when channel is congested.
	//
	// The goal is to keep all tracks streaming as much as possible. So, the track that needs a change needs bandwidth to be unpaused.
	//
	// This tries to figure out how much this track can contribute back to the pool to enable the track that needs to be unpaused.
	//   1. Track muted OR feed dry - can contribute everything back in case it was using bandwidth.
	//   2. Look at all possible down transitions from current target and find the best offer.
	//      Best offer is calculated as bandwidth saved moving to a down layer divided by cost.
	//      Cost has two components
	//        a. Transition cost: Spatial layer switch is expensive due to key frame requirement, but temporal layer switch is free.
	//        b. Quality cost: The farther away from desired layers, the higher the quality cost.
	//
	f.lock.Lock()
	defer f.lock.Unlock()

	if f.provisional.muted {
		f.provisional.layers = InvalidLayers
		return VideoTransition{
			from:           f.targetLayers,
			to:             InvalidLayers,
			bandwidthDelta: 0 - f.lastAllocation.bandwidthRequested,
			// LK-TODO should this take current bitrate of current target layers?
		}
	}

	maxReachableLayerTemporal := InvalidLayerTemporal
	for t := f.provisional.maxLayers.temporal; t >= 0; t-- {
		for s := f.provisional.maxLayers.spatial; s >= 0; s-- {
			if f.provisional.bitrates[s][t] != 0 {
				maxReachableLayerTemporal = t
				break
			}
		}
		if maxReachableLayerTemporal != InvalidLayerTemporal {
			break
		}
	}

	if maxReachableLayerTemporal == InvalidLayerTemporal {
		// feed has gone dry,
		f.provisional.layers = InvalidLayers
		return VideoTransition{
			from:           f.targetLayers,
			to:             InvalidLayers,
			bandwidthDelta: 0 - f.lastAllocation.bandwidthRequested,
			// LK-TODO should this take current bitrate of current target layers?
		}
	}

	// starting from minimum to target, find transition which gives the best
	// transition taking into account bits saved vs cost of such a transition
	bestLayers := InvalidLayers
	bestBandwidthDelta := int64(0)
	bestValue := float32(0)
	for s := int32(0); s <= f.targetLayers.spatial; s++ {
		for t := int32(0); t <= f.targetLayers.temporal; t++ {
			if s == f.targetLayers.spatial && t == f.targetLayers.temporal {
				break
			}

			bandwidthDelta := int64(math.Max(float64(0), float64(f.lastAllocation.bandwidthRequested-f.provisional.bitrates[s][t])))

			transitionCost := int32(0)
			if f.targetLayers.spatial != s {
				transitionCost = TransitionCostSpatial
			}

			qualityCost := (maxReachableLayerTemporal+1)*(f.targetLayers.spatial-s) + (f.targetLayers.temporal - t)

			value := float32(0)
			if (transitionCost + qualityCost) != 0 {
				value = float32(bandwidthDelta) / float32(transitionCost+qualityCost)
			}
			if value > bestValue || (value == bestValue && bandwidthDelta > bestBandwidthDelta) {
				bestValue = value
				bestBandwidthDelta = bandwidthDelta
				bestLayers = VideoLayers{spatial: s, temporal: t}
			}
		}
	}

	f.provisional.layers = bestLayers
	return VideoTransition{
		from:           f.targetLayers,
		to:             bestLayers,
		bandwidthDelta: bestBandwidthDelta,
	}
}

// ProvisionalAllocateCommit turns the provisional layers into the target
// layers, records the resulting allocation and returns it.
func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation {
	f.lock.Lock()
	defer f.lock.Unlock()

	state := VideoAllocationStateNone
	change := VideoStreamingChangeNone
	bandwidthRequested := int64(0)

	switch {
	case f.provisional.muted:
		state = VideoAllocationStateMuted
	case len(f.provisional.availableLayers) == 0:
		// feed is dry
		state = VideoAllocationStateFeedDry
	case f.provisional.layers == InvalidLayers:
		state = VideoAllocationStateDeficient

		if f.targetLayers != InvalidLayers {
			change = VideoStreamingChangePausing
		}
	default:
		bandwidthRequested = f.provisional.bitrates[f.provisional.layers.spatial][f.provisional.layers.temporal]
		if bandwidthRequested == f.getOptimalBandwidthNeeded(f.provisional.bitrates) {
			state = VideoAllocationStateOptimal
		} else {
			state = VideoAllocationStateDeficient
		}

		if f.targetLayers == InvalidLayers {
			change = VideoStreamingChangeResuming
		}
	}

	f.lastAllocation = VideoAllocation{
		state:              state,
		change:             change,
		bandwidthRequested: bandwidthRequested,
		bandwidthDelta:     bandwidthRequested - f.lastAllocation.bandwidthRequested,
		availableLayers:    f.provisional.availableLayers,
		bitrates:           f.provisional.bitrates,
		targetLayers:       f.provisional.layers,
		distanceToDesired:  f.getDistanceToDesired(f.provisional.bitrates, f.provisional.layers),
	}
	f.targetLayers = f.lastAllocation.targetLayers
	if f.targetLayers == InvalidLayers {
		f.resyncLocked()
	}

	return f.lastAllocation
}

// AllocateNextHigher tries to move a deficient track one step up (temporal
// layer first, then spatial layer) if the extra bitrate fits in
// availableChannelCapacity. It returns the (possibly updated) last allocation
// and whether a move up was made.
func (f *Forwarder) AllocateNextHigher(availableChannelCapacity int64, brs Bitrates) (VideoAllocation, bool) {
	f.lock.Lock()
	defer f.lock.Unlock()

	if f.kind == webrtc.RTPCodecTypeAudio {
		f.lastAllocation.change = VideoStreamingChangeNone
		return f.lastAllocation, false
	}

	// if not deficient, nothing to do
	if f.lastAllocation.state != VideoAllocationStateDeficient {
		f.lastAllocation.change = VideoStreamingChangeNone
		return f.lastAllocation, false
	}

	// if targets are still pending, don't increase
	if f.targetLayers != InvalidLayers && f.targetLayers != f.currentLayers {
		f.lastAllocation.change = VideoStreamingChangeNone
		return f.lastAllocation, false
	}

	optimalBandwidthNeeded := f.getOptimalBandwidthNeeded(brs)

	alreadyAllocated := int64(0)
	if f.targetLayers != InvalidLayers {
		alreadyAllocated = brs[f.targetLayers.spatial][f.targetLayers.temporal]
	}

	// try moving temporal layer up in currently streaming spatial layer
	if f.targetLayers != InvalidLayers {
		for t := f.targetLayers.temporal + 1; t <= f.maxLayers.temporal; t++ {
			bandwidthRequested := brs[f.targetLayers.spatial][t]
			if bandwidthRequested == 0 {
				continue
			}

			if bandwidthRequested-alreadyAllocated > availableChannelCapacity {
				// next higher available layer does not fit, return
				f.lastAllocation.change = VideoStreamingChangeNone
				return f.lastAllocation, false
			}

			state := VideoAllocationStateOptimal
			if bandwidthRequested != optimalBandwidthNeeded {
				state = VideoAllocationStateDeficient
			}

			targetLayers := VideoLayers{spatial: f.targetLayers.spatial, temporal: t}
			f.lastAllocation = VideoAllocation{
				state:              state,
				change:             VideoStreamingChangeNone,
				bandwidthRequested: bandwidthRequested,
				bandwidthDelta:     bandwidthRequested - alreadyAllocated,
				availableLayers:    f.availableLayers,
				bitrates:           brs,
				targetLayers:       targetLayers,
				distanceToDesired:  f.getDistanceToDesired(brs, targetLayers),
			}
			f.targetLayers = f.lastAllocation.targetLayers
			return f.lastAllocation, true
		}
	}

	// try moving spatial layer up if temporal layer move up is not available
	// (when paused, the target spatial layer is -1, so this starts at layer 0)
	for s := f.targetLayers.spatial + 1; s <= f.maxLayers.spatial; s++ {
		for t := int32(0); t <= f.maxLayers.temporal; t++ {
			bandwidthRequested := brs[s][t]
			if bandwidthRequested == 0 {
				continue
			}

			if bandwidthRequested-alreadyAllocated > availableChannelCapacity {
				// next higher available layer does not fit, return
				f.lastAllocation.change = VideoStreamingChangeNone
				return f.lastAllocation, false
			}

			state := VideoAllocationStateOptimal
			if bandwidthRequested != optimalBandwidthNeeded {
				state = VideoAllocationStateDeficient
			}

			change := VideoStreamingChangeNone
			if f.targetLayers == InvalidLayers {
				change = VideoStreamingChangeResuming
			}

			targetLayers := VideoLayers{spatial: s, temporal: t}
			f.lastAllocation = VideoAllocation{
				state:              state,
				change:             change,
				bandwidthRequested: bandwidthRequested,
				bandwidthDelta:     bandwidthRequested - alreadyAllocated,
				availableLayers:    f.availableLayers,
				bitrates:           brs,
				targetLayers:       targetLayers,
				distanceToDesired:  f.getDistanceToDesired(brs, targetLayers),
			}
			f.targetLayers = f.lastAllocation.targetLayers
			return f.lastAllocation, true
		}
	}

	f.lastAllocation.change = VideoStreamingChangeNone
	return f.lastAllocation, false
}

// GetNextHigherTransition returns the transition AllocateNextHigher would
// attempt (temporal layer first, then spatial layer) without checking channel
// capacity and without changing any allocation state. The boolean is false
// when no move up is available.
func (f *Forwarder) GetNextHigherTransition(brs Bitrates) (VideoTransition, bool) {
	f.lock.Lock()
	defer f.lock.Unlock()

	if f.kind == webrtc.RTPCodecTypeAudio {
		return VideoTransition{}, false
	}

	// if not deficient, nothing to do
	if f.lastAllocation.state != VideoAllocationStateDeficient {
		return VideoTransition{}, false
	}

	// if targets are still pending, don't increase
	if f.targetLayers != InvalidLayers && f.targetLayers != f.currentLayers {
		return VideoTransition{}, false
	}

	alreadyAllocated := int64(0)
	if f.targetLayers != InvalidLayers {
		alreadyAllocated = brs[f.targetLayers.spatial][f.targetLayers.temporal]
	}

	// try moving temporal layer up in currently streaming spatial layer
	if f.targetLayers != InvalidLayers {
		for t := f.targetLayers.temporal + 1; t <= f.maxLayers.temporal; t++ {
			bandwidthRequested := brs[f.targetLayers.spatial][t]
			if bandwidthRequested == 0 {
				continue
			}

			transition := VideoTransition{
				from:           f.targetLayers,
				to:             VideoLayers{spatial: f.targetLayers.spatial, temporal: t},
				bandwidthDelta: bandwidthRequested - alreadyAllocated,
			}

			return transition, true
		}
	}

	// try moving spatial layer up if temporal layer move up is not available
	for s := f.targetLayers.spatial + 1; s <= f.maxLayers.spatial; s++ {
		for t := int32(0); t <= f.maxLayers.temporal; t++ {
			bandwidthRequested := brs[s][t]
			if bandwidthRequested == 0 {
				continue
			}

			transition := VideoTransition{
				from:           f.targetLayers,
				to:             VideoLayers{spatial: s, temporal: t},
				bandwidthDelta: bandwidthRequested - alreadyAllocated,
			}

			return transition, true
		}
	}

	return VideoTransition{}, false
}

// Pause clears the target layers, records an allocation requesting no
// bandwidth and returns it.
func (f *Forwarder) Pause(brs Bitrates) VideoAllocation {
	f.lock.Lock()
	defer f.lock.Unlock()

	state := VideoAllocationStateNone
	change := VideoStreamingChangeNone

	switch {
	case f.muted:
		state = VideoAllocationStateMuted
	case len(f.availableLayers) == 0:
		// feed is dry
		state = VideoAllocationStateFeedDry
	default:
		// feed bitrate is not yet calculated or pausing due to lack of bandwidth
		state = VideoAllocationStateDeficient

		if f.targetLayers != InvalidLayers {
			change = VideoStreamingChangePausing
		}
	}

	f.lastAllocation = VideoAllocation{
		state:              state,
		change:             change,
		bandwidthRequested: 0,
		bandwidthDelta:     0 - f.lastAllocation.bandwidthRequested,
		availableLayers:    f.availableLayers,
		bitrates:           brs,
		targetLayers:       InvalidLayers,
		distanceToDesired:  f.getDistanceToDesired(brs, InvalidLayers),
	}
	f.targetLayers = f.lastAllocation.targetLayers
	if f.targetLayers == InvalidLayers {
		f.resyncLocked()
	}

	return f.lastAllocation
}

// Resync invalidates the current layers and the last seen SSRC.
func (f *Forwarder) Resync() {
	f.lock.Lock()
	defer f.lock.Unlock()

	f.resyncLocked()
}

// resyncLocked is Resync without locking.
// should be called with lock held
func (f *Forwarder) resyncLocked() {
	f.currentLayers = InvalidLayers
	f.lastSSRC = 0
}

// CheckSync returns the target spatial layer and whether the current spatial
// layer matches it.
func (f *Forwarder) CheckSync() (locked bool, layer int32) {
	f.lock.RLock()
	defer f.lock.RUnlock()

	layer = f.targetLayers.spatial
	locked = f.targetLayers.spatial == f.currentLayers.spatial
	return
}

// FilterRTX filters the given NACKs through the RTP munger and reports, per
// spatial layer, whether retransmission is disallowed. With FlagFilterRTX
// disabled, nacks are returned unchanged and no layer is disallowed.
func (f *Forwarder) FilterRTX(nacks []uint16) (filtered []uint16, disallowedLayers [DefaultMaxLayerSpatial + 1]bool) {
	if !FlagFilterRTX {
		filtered = nacks
		return
	}

	f.lock.RLock()
	defer f.lock.RUnlock()

	filtered = f.rtpMunger.FilterRTX(nacks)

	//
	// Curb RTX when deficient for two cases
	//   1. Target layer is lower than current layer. When current hits target, a key frame should flush the decoder.
	//   2. Requested layer is higher than current. Current layer's key frame should have flushed encoder.
	//      Remote might ask for older layer because of its jitter buffer, but let it starve as channel is already congested.
	//
	// Without the curb, when congestion hits, RTX rate could be so high that it further congests the channel.
	//
	for layer := int32(0); layer < DefaultMaxLayerSpatial+1; layer++ {
		if f.lastAllocation.state == VideoAllocationStateDeficient &&
			(f.targetLayers.spatial < f.currentLayers.spatial || layer > f.currentLayers.spatial) {
			disallowedLayers[layer] = true
		}
	}

	return
}

// GetTranslationParams returns the translation parameters for a packet
// received on the given spatial layer. Packets are dropped while muted.
// ErrUnknownKind is returned for kinds other than audio and video.
func (f *Forwarder) GetTranslationParams(extPkt *buffer.ExtPacket, layer int32) (*TranslationParams, error) {
	f.lock.Lock()
	defer f.lock.Unlock()

	if f.muted {
		return &TranslationParams{
			shouldDrop: true,
		}, nil
	}

	switch f.kind {
	case webrtc.RTPCodecTypeAudio:
		return f.getTranslationParamsAudio(extPkt)
	case webrtc.RTPCodecTypeVideo:
		return f.getTranslationParamsVideo(extPkt, layer)
	}

	return nil, ErrUnknownKind
}

// getTranslationParamsCommon handles SSRC changes (seeding the mungers on the
// first packet, updating offsets on later source switches) and obtains the
// RTP translation parameters for the packet. The returned params are never nil.
// should be called with lock held
func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket) (*TranslationParams, error) {
	if f.lastSSRC != extPkt.Packet.SSRC {
		if !f.started {
			f.started = true
			f.rtpMunger.SetLastSnTs(extPkt)
			if f.vp8Munger != nil {
				f.vp8Munger.SetLast(extPkt)
			}
		} else {
			// LK-TODO-START
			// The below offset calculation is not technically correct.
			// Timestamps based on the system time of an intermediate box like
			// SFU is not going to be accurate. Packets arrival/processing
			// are subject to vagaries of network delays, SFU processing etc.
			// But, the correct way is a lot harder. Will have to
			// look at RTCP SR to get timestamps and align (and figure out alignment
			// of layers and use that during layer switch in simulcast case).
			// That can get tricky. Given the complexity of that approach, maybe
			// this is just fine till it is not :-).
			// LK-TODO-END

			// Compute how much time passed between the old RTP extPkt
			// and the current packet, and fix timestamp on source change
			// NOTE(review): division by 1e6 presumes Arrival is in nanoseconds - confirm
			tDiffMs := (extPkt.Arrival - f.lTSCalc) / 1e6
			if tDiffMs < 0 {
				tDiffMs = 0
			}
			td := uint32(tDiffMs * int64(f.codec.ClockRate) / 1000)
			if td == 0 {
				// always advance the timestamp on a source switch
				td = 1
			}
			f.rtpMunger.UpdateSnTsOffsets(extPkt, 1, td)
			if f.vp8Munger != nil {
				f.vp8Munger.UpdateOffsets(extPkt)
			}
		}

		f.lastSSRC = extPkt.Packet.SSRC
	}

	f.lTSCalc = extPkt.Arrival

	tp := &TranslationParams{}

	tpRTP, err := f.rtpMunger.UpdateAndGetSnTs(extPkt)
	if err != nil {
		tp.shouldDrop = true
		if err == ErrPaddingOnlyPacket || err == ErrDuplicatePacket || err == ErrOutOfOrderSequenceNumberCacheMiss {
			// expected drop conditions, not reported as errors
			if err == ErrOutOfOrderSequenceNumberCacheMiss {
				tp.isDroppingRelevant = true
			}
			return tp, nil
		}

		tp.isDroppingRelevant = true
		return tp, err
	}

	tp.rtp = tpRTP
	return tp, nil
}

// getTranslationParamsAudio returns translation parameters for an audio packet.
// should be called with lock held
func (f *Forwarder) getTranslationParamsAudio(extPkt *buffer.ExtPacket) (*TranslationParams, error) {
	return f.getTranslationParamsCommon(extPkt)
}

// getTranslationParamsVideo returns translation parameters for a video packet
// received on the given spatial layer. It locks on to the target spatial layer
// when a key frame of that layer arrives, drops packets of other layers and,
// when a VP8 munger exists, applies temporal layer handling.
// should be called with lock held
func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer int32) (*TranslationParams, error) {
	tp := &TranslationParams{}

	if f.targetLayers == InvalidLayers {
		// stream is paused by streamallocator
		tp.shouldDrop = true
		return tp, nil
	}

	if f.targetLayers.spatial != f.currentLayers.spatial {
		if f.targetLayers.spatial == layer {
			if extPkt.KeyFrame {
				// lock to target layer
				f.logger.Debugw("locking to target layer", "current", f.currentLayers, "target", f.targetLayers)
				f.currentLayers.spatial = f.targetLayers.spatial
				if !f.isTemporalSupported {
					f.currentLayers.temporal = f.targetLayers.temporal
				}
				if f.currentLayers.spatial == f.maxLayers.spatial {
					tp.isSwitchingToMaxLayer = true
				}
			}
		}
	}

	if f.currentLayers.spatial != layer {
		tp.shouldDrop = true
		return tp, nil
	}

	if FlagPauseOnDowngrade && f.targetLayers.spatial < f.currentLayers.spatial && f.lastAllocation.state == VideoAllocationStateDeficient {
		//
		// If target layer is lower than both the current and
		// maximum subscribed layer, it is due to bandwidth
		// constraints that the target layer has been switched down.
		// Continuing to send higher layer will only exacerbate the
		// situation by putting more stress on the channel. So, drop it.
		//
		// In the other direction, it is okay to keep forwarding till
		// switch point to get a smoother stream till the higher
		// layer key frame arrives.
		//
		// Note that in the case of client subscription layer restriction
		// coinciding with server restriction due to bandwidth limitation,
		// in the case of subscription change, higher should continue streaming
		// to ensure smooth transition.
		//
		// To differentiate, drop only when in DEFICIENT state.
		//
		tp.shouldDrop = true
		tp.isDroppingRelevant = true
		return tp, nil
	}

	// getTranslationParamsCommon builds a fresh TranslationParams. Carry the
	// max layer switch indication determined above over to it, otherwise the
	// indication would be lost for the very packet that performs the switch.
	isSwitchingToMaxLayer := tp.isSwitchingToMaxLayer
	tp, err := f.getTranslationParamsCommon(extPkt)
	tp.isSwitchingToMaxLayer = isSwitchingToMaxLayer
	if tp.shouldDrop || f.vp8Munger == nil || len(extPkt.Packet.Payload) == 0 {
		return tp, err
	}

	// catch up temporal layer if necessary
	if f.currentLayers.temporal != f.targetLayers.temporal {
		incomingVP8, ok := extPkt.Payload.(buffer.VP8)
		if ok {
			if incomingVP8.TIDPresent == 1 && incomingVP8.TID <= uint8(f.targetLayers.temporal) {
				f.currentLayers.temporal = f.targetLayers.temporal
			}
		}
	}

	tpVP8, err := f.vp8Munger.UpdateAndGet(extPkt, tp.rtp.snOrdering, f.currentLayers.temporal)
	if err != nil {
		tp.rtp = nil
		tp.shouldDrop = true
		if err == ErrFilteredVP8TemporalLayer || err == ErrOutOfOrderVP8PictureIdCacheMiss {
			if err == ErrFilteredVP8TemporalLayer {
				// filtered temporal layer, update sequence number offset to prevent holes
				f.rtpMunger.PacketDropped(extPkt)
			}
			if err == ErrOutOfOrderVP8PictureIdCacheMiss {
				tp.isDroppingRelevant = true
			}
			return tp, nil
		}

		tp.isDroppingRelevant = true
		return tp, err
	}

	tp.vp8 = tpVP8
	return tp, nil
}

// GetSnTsForPadding returns sequence number/timestamp pairs for num padding
// packets.
func (f *Forwarder) GetSnTsForPadding(num int) ([]SnTs, error) {
	f.lock.Lock()
	defer f.lock.Unlock()

	// padding is used for probing. Padding packets should be
	// at frame boundaries only to ensure decoder sequencer does
	// not get out-of-sync. But, when a stream is paused,
	// force a frame marker as a restart of the stream will
	// start with a key frame which will reset the decoder.
	forceMarker := false
	if f.targetLayers == InvalidLayers {
		forceMarker = true
	}
	return f.rtpMunger.UpdateAndGetPaddingSnTs(num, 0, 0, forceMarker)
}

// GetSnTsForBlankFrames returns sequence number/timestamp pairs for
// RTPBlankFramesMax blank frames, plus one extra entry when the stream is not
// on a frame boundary. The boolean reports whether that extra frame end entry
// was needed.
func (f *Forwarder) GetSnTsForBlankFrames() ([]SnTs, bool, error) {
	f.lock.Lock()
	defer f.lock.Unlock()

	num := RTPBlankFramesMax
	frameEndNeeded := !f.rtpMunger.IsOnFrameBoundary()
	if frameEndNeeded {
		num++
	}
	// NOTE(review): 30 is presumably the frame rate used to space timestamps - confirm against RTPMunger
	snts, err := f.rtpMunger.UpdateAndGetPaddingSnTs(num, f.codec.ClockRate, 30, frameEndNeeded)
	return snts, frameEndNeeded, err
}

// GetPaddingVP8 returns the VP8 descriptor to use for a padding packet.
// NOTE(review): the VP8 munger exists only for VP8 codecs (see NewForwarder);
// callers are presumably expected to invoke this for VP8 tracks only - confirm.
func (f *Forwarder) GetPaddingVP8(frameEndNeeded bool) *buffer.VP8 {
	f.lock.Lock()
	defer f.lock.Unlock()

	return f.vp8Munger.UpdateAndGetPadding(!frameEndNeeded)
}

// GetRTPMungerParams returns the parameters of the RTP munger.
func (f *Forwarder) GetRTPMungerParams() RTPMungerParams {
	f.lock.RLock()
	defer f.lock.RUnlock()

	return f.rtpMunger.GetParams()
}