Files
livekit/pkg/sfu/forwarder.go
cnderrauber 7a7fc09372 Add fps calculator for VP8 and DependencyDescriptor (#1110)
* Add fps calculator for VP8 and DependencyDescriptor

* clean code

* unit test

* clean code

* solve comment
2022-10-26 09:28:28 +08:00

1588 lines
45 KiB
Go

package sfu
import (
"fmt"
"math"
"strings"
"sync"
"time"
"github.com/pion/webrtc/v3"
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
dd "github.com/livekit/livekit-server/pkg/sfu/dependencydescriptor"
)
//
// Forwarder
//
const (
FlagPauseOnDowngrade = true
FlagFilterRTX = true
TransitionCostSpatial = 10
)
// -------------------------------------------------------------------
type ForwardingStatus int
const (
ForwardingStatusOff ForwardingStatus = iota
ForwardingStatusPartial
ForwardingStatusOptimal
)
// -------------------------------------------------------------------
type VideoStreamingChange int
const (
VideoStreamingChangeNone VideoStreamingChange = iota
VideoStreamingChangePausing
VideoStreamingChangeResuming
)
func (v VideoStreamingChange) String() string {
switch v {
case VideoStreamingChangeNone:
return "NONE"
case VideoStreamingChangePausing:
return "PAUSING"
case VideoStreamingChangeResuming:
return "RESUMING"
default:
return fmt.Sprintf("%d", int(v))
}
}
// -------------------------------------------------------------------
type VideoAllocationState int
const (
VideoAllocationStateNone VideoAllocationState = iota
VideoAllocationStateMuted
VideoAllocationStateFeedDry
VideoAllocationStateAwaitingMeasurement
VideoAllocationStateOptimal
VideoAllocationStateDeficient
)
func (v VideoAllocationState) String() string {
switch v {
case VideoAllocationStateNone:
return "NONE"
case VideoAllocationStateMuted:
return "MUTED"
case VideoAllocationStateFeedDry:
return "FEED_DRY"
case VideoAllocationStateAwaitingMeasurement:
return "AWAITING_MEASUREMENT"
case VideoAllocationStateOptimal:
return "OPTIMAL"
case VideoAllocationStateDeficient:
return "DEFICIENT"
default:
return fmt.Sprintf("%d", int(v))
}
}
type VideoAllocation struct {
state VideoAllocationState
change VideoStreamingChange
bandwidthRequested int64
bandwidthDelta int64
availableLayers []int32
exemptedLayers []int32
bitrates Bitrates
targetLayers VideoLayers
distanceToDesired int32
}
func (v VideoAllocation) String() string {
return fmt.Sprintf("VideoAllocation{state: %s, change: %s, bw: %d, del: %d, avail: %+v, exempt: %+v, rates: %+v, target: %s, dist: %d}",
v.state, v.change, v.bandwidthRequested, v.bandwidthDelta, v.availableLayers, v.exemptedLayers, v.bitrates, v.targetLayers, v.distanceToDesired)
}
var (
VideoAllocationDefault = VideoAllocation{
targetLayers: InvalidLayers,
}
)
// -------------------------------------------------------------------
type VideoAllocationProvisional struct {
muted bool
bitrates Bitrates
availableLayers []int32
exemptedLayers []int32
maxLayers VideoLayers
allocatedLayers VideoLayers
}
// -------------------------------------------------------------------
type VideoTransition struct {
from VideoLayers
to VideoLayers
bandwidthDelta int64
}
func (v VideoTransition) String() string {
return fmt.Sprintf("VideoTransition{from: %s, to: %s, del: %d}", v.from, v.to, v.bandwidthDelta)
}
// -------------------------------------------------------------------
type TranslationParams struct {
shouldDrop bool
isDroppingRelevant bool
isSwitchingToMaxLayer bool
rtp *TranslationParamsRTP
vp8 *TranslationParamsVP8
ddExtension *dd.DependencyDescriptorExtension
marker bool
// indicates this frame has 'Switch' decode indication for target layer
// TODO : in theory, we need check frame chain is not broken for the target
// but we don't have frame queue now, so just use decode target indication
switchingToTargetLayer bool
}
// -------------------------------------------------------------------
type VideoLayers = buffer.VideoLayer
const (
InvalidLayerSpatial = buffer.InvalidLayerSpatial
InvalidLayerTemporal = buffer.InvalidLayerTemporal
DefaultMaxLayerSpatial = buffer.DefaultMaxLayerSpatial
DefaultMaxLayerTemporal = buffer.DefaultMaxLayerTemporal
)
var (
InvalidLayers = buffer.InvalidLayers
)
// -------------------------------------------------------------------
type ForwarderState struct {
LastTSCalc int64
RTP RTPMungerState
VP8 VP8MungerState
}
func (f ForwarderState) String() string {
return fmt.Sprintf("ForwarderState{lTSCalc: %d, rtp: %s, vp8: %s}",
f.LastTSCalc, f.RTP.String(), f.VP8.String())
}
// -------------------------------------------------------------------
type Forwarder struct {
lock sync.RWMutex
codec webrtc.RTPCodecCapability
kind webrtc.RTPCodecType
logger logger.Logger
muted bool
started bool
lastSSRC uint32
lTSCalc int64
maxLayers VideoLayers
currentLayers VideoLayers
targetLayers VideoLayers
provisional *VideoAllocationProvisional
lastAllocation VideoAllocation
availableLayers []int32
exemptedLayers []int32
rtpMunger *RTPMunger
vp8Munger *VP8Munger
isTemporalSupported bool
ddLayerSelector *DDVideoLayerSelector
}
func NewForwarder(kind webrtc.RTPCodecType, logger logger.Logger) *Forwarder {
f := &Forwarder{
kind: kind,
logger: logger,
// start off with nothing, let streamallocator set things
currentLayers: InvalidLayers,
targetLayers: InvalidLayers,
lastAllocation: VideoAllocationDefault,
rtpMunger: NewRTPMunger(logger),
}
if f.kind == webrtc.RTPCodecTypeVideo {
f.maxLayers = VideoLayers{Spatial: InvalidLayerSpatial, Temporal: DefaultMaxLayerTemporal}
} else {
f.maxLayers = InvalidLayers
}
return f
}
func (f *Forwarder) DetermineCodec(codec webrtc.RTPCodecCapability) {
f.lock.Lock()
defer f.lock.Unlock()
if f.codec.MimeType != "" {
return
}
f.codec = codec
switch strings.ToLower(codec.MimeType) {
case "video/vp8":
f.isTemporalSupported = true
f.vp8Munger = NewVP8Munger(f.logger)
case "video/av1":
// TODO : we only enable dd layer selector for av1 now, at future we can
// enable it for vp9 too
f.ddLayerSelector = NewDDVideoLayerSelector(f.logger)
}
}
func (f *Forwarder) GetState() ForwarderState {
f.lock.RLock()
defer f.lock.RUnlock()
state := ForwarderState{
LastTSCalc: f.lTSCalc,
RTP: f.rtpMunger.GetLast(),
}
if f.vp8Munger != nil {
state.VP8 = f.vp8Munger.GetLast()
}
return state
}
func (f *Forwarder) SeedState(state ForwarderState) {
f.lock.Lock()
defer f.lock.Unlock()
f.lTSCalc = state.LastTSCalc
f.rtpMunger.SeedLast(state.RTP)
if f.vp8Munger != nil {
f.vp8Munger.SeedLast(state.VP8)
}
f.started = true
}
func (f *Forwarder) Mute(muted bool) (bool, VideoLayers) {
f.lock.Lock()
defer f.lock.Unlock()
if f.muted == muted {
return false, f.maxLayers
}
f.logger.Infow("setting forwarder mute", "muted", muted)
f.muted = muted
// resync when muted so that sequence numbers do not jump on unmute
if muted {
f.resyncLocked()
}
return true, f.maxLayers
}
func (f *Forwarder) IsMuted() bool {
f.lock.RLock()
defer f.lock.RUnlock()
return f.muted
}
func (f *Forwarder) SetMaxSpatialLayer(spatialLayer int32) (bool, VideoLayers, VideoLayers) {
f.lock.Lock()
defer f.lock.Unlock()
if f.kind == webrtc.RTPCodecTypeAudio || spatialLayer == f.maxLayers.Spatial {
return false, f.maxLayers, f.currentLayers
}
f.logger.Infow("setting max spatial layer", "layer", spatialLayer)
f.maxLayers.Spatial = spatialLayer
return true, f.maxLayers, f.currentLayers
}
func (f *Forwarder) SetMaxTemporalLayer(temporalLayer int32) (bool, VideoLayers, VideoLayers) {
f.lock.Lock()
defer f.lock.Unlock()
if f.kind == webrtc.RTPCodecTypeAudio || temporalLayer == f.maxLayers.Temporal {
return false, f.maxLayers, f.currentLayers
}
f.logger.Infow("setting max temporal layer", "layer", temporalLayer)
f.maxLayers.Temporal = temporalLayer
return true, f.maxLayers, f.currentLayers
}
func (f *Forwarder) MaxLayers() VideoLayers {
f.lock.RLock()
defer f.lock.RUnlock()
return f.maxLayers
}
func (f *Forwarder) CurrentLayers() VideoLayers {
f.lock.RLock()
defer f.lock.RUnlock()
return f.currentLayers
}
func (f *Forwarder) TargetLayers() VideoLayers {
f.lock.RLock()
defer f.lock.RUnlock()
return f.targetLayers
}
func (f *Forwarder) GetForwardingStatus() ForwardingStatus {
f.lock.RLock()
defer f.lock.RUnlock()
if f.muted || len(f.availableLayers) == 0 {
return ForwardingStatusOptimal
}
if f.targetLayers == InvalidLayers {
return ForwardingStatusOff
}
if f.targetLayers.Spatial < f.maxLayers.Spatial && f.targetLayers.Spatial < f.availableLayers[len(f.availableLayers)-1] {
return ForwardingStatusPartial
}
return ForwardingStatusOptimal
}
func (f *Forwarder) IsReducedQuality() (int32, bool) {
f.lock.RLock()
defer f.lock.RUnlock()
if f.muted || len(f.availableLayers) == 0 || f.targetLayers.Spatial == InvalidLayerSpatial {
return 0, false
}
if f.currentLayers.Spatial != f.targetLayers.Spatial {
//
// Waiting for layer lock, do not declare reduced quality.
// Note the target might actually be a lower layer than current.
//
return 0, false
}
distance := f.maxLayers.Spatial - f.currentLayers.Spatial
if distance < 0 {
distance = 0
}
return distance, f.lastAllocation.state == VideoAllocationStateDeficient
}
func (f *Forwarder) UpTrackLayersChange(availableLayers []int32, exemptedLayers []int32) {
f.lock.Lock()
defer f.lock.Unlock()
if len(availableLayers) > 0 {
f.availableLayers = make([]int32, len(availableLayers))
copy(f.availableLayers, availableLayers)
} else {
f.availableLayers = nil
}
if len(exemptedLayers) > 0 {
f.exemptedLayers = make([]int32, len(exemptedLayers))
copy(f.exemptedLayers, exemptedLayers)
} else {
f.exemptedLayers = nil
}
}
func (f *Forwarder) getOptimalBandwidthNeeded(brs Bitrates, maxLayers VideoLayers) int64 {
if f.muted {
return 0
}
for i := maxLayers.Spatial; i >= 0; i-- {
for j := maxLayers.Temporal; j >= 0; j-- {
if brs[i][j] == 0 {
continue
}
return brs[i][j]
}
}
return 0
}
func (f *Forwarder) bitrateAvailable(brs Bitrates) bool {
neededLayers := 0
bitrateAvailableLayers := 0
for _, layer := range f.availableLayers {
if layer > f.maxLayers.Spatial {
continue
}
//
// Layers could be exempted from stream tracker.
// If such a layer actually stops, it will not
// be removed from available layers as it is exempt.
// But, it could have zero bit rate as it actually stopped.
// So, do not take exempt layers into bitrate availability condition.
//
exempt := false
for _, el := range f.exemptedLayers {
if layer == el {
exempt = true
break
}
}
if exempt {
continue
}
neededLayers++
for t := f.maxLayers.Temporal; t >= 0; t-- {
if brs[layer][t] != 0 {
bitrateAvailableLayers++
break
}
}
}
return bitrateAvailableLayers == neededLayers
}
func (f *Forwarder) getDistanceToDesired(brs Bitrates, targetLayers VideoLayers, maxLayers VideoLayers) int32 {
if f.muted {
return 0
}
found := false
distance := int32(0)
for s := maxLayers.Spatial; s >= 0; s-- {
for t := maxLayers.Temporal; t >= 0; t-- {
if brs[s][t] == 0 {
continue
}
if s == targetLayers.Spatial && t == targetLayers.Temporal {
found = true
break
}
distance++
}
if found {
break
}
}
// maybe overshooting
if !found && targetLayers.IsValid() {
distance = 0
for s := targetLayers.Spatial; s > f.maxLayers.Spatial; s-- {
for t := f.maxLayers.Temporal; t >= 0; t-- {
if targetLayers.Temporal < t || brs[s][t] == 0 {
continue
}
distance--
}
}
}
return distance
}
func (f *Forwarder) IsDeficient() bool {
f.lock.RLock()
defer f.lock.RUnlock()
return f.lastAllocation.state == VideoAllocationStateDeficient
}
func (f *Forwarder) BandwidthRequested(brs Bitrates) int64 {
f.lock.RLock()
defer f.lock.RUnlock()
if f.targetLayers == InvalidLayers {
return 0
}
return brs[f.targetLayers.Spatial][f.targetLayers.Temporal]
}
func (f *Forwarder) DistanceToDesired() int32 {
f.lock.RLock()
defer f.lock.RUnlock()
return f.lastAllocation.distanceToDesired
}
func (f *Forwarder) AllocateOptimal(brs Bitrates, allowOvershoot bool) VideoAllocation {
f.lock.Lock()
defer f.lock.Unlock()
if f.kind == webrtc.RTPCodecTypeAudio {
return f.lastAllocation
}
alloc := VideoAllocation{
availableLayers: f.availableLayers,
exemptedLayers: f.exemptedLayers,
bitrates: brs,
targetLayers: InvalidLayers,
}
switch {
case f.muted:
alloc.state = VideoAllocationStateMuted
case len(f.availableLayers) == 0:
// feed is dry
alloc.state = VideoAllocationStateFeedDry
case !f.bitrateAvailable(brs):
// feed bitrate not yet calculated for all available layers
alloc.state = VideoAllocationStateAwaitingMeasurement
//
// Resume with the highest layer available <= max subscribed layer
// If already resumed, move allocation to the highest available layer <= max subscribed layer
//
alloc.targetLayers = VideoLayers{
Spatial: int32(math.Min(float64(f.maxLayers.Spatial), float64(f.availableLayers[len(f.availableLayers)-1]))),
Temporal: int32(math.Max(0, float64(f.maxLayers.Temporal))),
}
default:
// allocate best layer available
for s := f.maxLayers.Spatial; s >= 0; s-- {
for t := f.maxLayers.Temporal; t >= 0; t-- {
if brs[s][t] == 0 {
continue
}
alloc.targetLayers = VideoLayers{
Spatial: s,
Temporal: t,
}
alloc.bandwidthRequested = brs[s][t]
alloc.state = VideoAllocationStateOptimal
break
}
if alloc.bandwidthRequested != 0 {
break
}
}
if alloc.bandwidthRequested == 0 && f.maxLayers.IsValid() && allowOvershoot {
// if we cannot allocate anything below max layer,
// look for a layer above. It is okay to overshoot
// in optimal allocation (i.e. no bandwidth restrictions).
// It is possible that clients send only a higher layer.
// To accommodate cases like that, try finding a layer
// above the requested maximum to ensure streaming
for s := f.maxLayers.Spatial + 1; s <= DefaultMaxLayerSpatial; s++ {
for t := int32(0); t <= DefaultMaxLayerTemporal; t++ {
if brs[s][t] == 0 {
continue
}
alloc.targetLayers = VideoLayers{
Spatial: s,
Temporal: t,
}
alloc.bandwidthRequested = brs[s][t]
alloc.state = VideoAllocationStateOptimal
f.logger.Infow("allowing overshoot", "maxLayer", f.maxLayers, "targetLayers", alloc.targetLayers)
break
}
if alloc.bandwidthRequested != 0 {
break
}
}
}
if alloc.bandwidthRequested == 0 && f.maxLayers.IsValid() {
// if overshoot was allowed and it did not also find a layer,
// keep target at exempted layer (if available) and the current layer is at that level.
// i. e. exempted layer may really have stopped, so a layer switch to an exempted layer should
// not happen as layer switch will send PLI requests. Just letting it continue at the current
// layer if the current is exempted will protect against any stream tracker misdetects
// OR latch on to the layer quicker when it restarts
if f.currentLayers.IsValid() {
for _, s := range f.exemptedLayers {
if s <= f.maxLayers.Spatial && f.currentLayers.Spatial == s {
alloc.targetLayers = f.currentLayers
alloc.bandwidthRequested = brs[alloc.targetLayers.Spatial][alloc.targetLayers.Temporal]
alloc.state = VideoAllocationStateDeficient
break
}
}
}
}
if !alloc.targetLayers.IsValid() && f.maxLayers.IsValid() {
alloc.state = VideoAllocationStateDeficient
}
}
if !alloc.targetLayers.IsValid() {
alloc.targetLayers = InvalidLayers
}
alloc.bandwidthDelta = alloc.bandwidthRequested - f.lastAllocation.bandwidthRequested
alloc.distanceToDesired = f.getDistanceToDesired(brs, alloc.targetLayers, f.maxLayers)
return f.updateAllocation(alloc, "optimal")
}
func (f *Forwarder) ProvisionalAllocatePrepare(bitrates Bitrates) {
f.lock.Lock()
defer f.lock.Unlock()
f.provisional = &VideoAllocationProvisional{
allocatedLayers: InvalidLayers,
muted: f.muted,
bitrates: bitrates,
maxLayers: f.maxLayers,
}
if len(f.availableLayers) > 0 {
f.provisional.availableLayers = make([]int32, len(f.availableLayers))
copy(f.provisional.availableLayers, f.availableLayers)
}
if len(f.exemptedLayers) > 0 {
f.provisional.exemptedLayers = make([]int32, len(f.exemptedLayers))
copy(f.provisional.exemptedLayers, f.exemptedLayers)
}
}
func (f *Forwarder) ProvisionalAllocate(availableChannelCapacity int64, layers VideoLayers, allowPause bool, allowOvershoot bool) int64 {
f.lock.Lock()
defer f.lock.Unlock()
if f.provisional.muted || !f.provisional.maxLayers.IsValid() || (!allowOvershoot && layers.GreaterThan(f.provisional.maxLayers)) {
return 0
}
maybeAdoptExempted := func() int64 {
br := int64(0)
if f.currentLayers.IsValid() {
for _, s := range f.provisional.exemptedLayers {
if s <= f.provisional.maxLayers.Spatial && f.currentLayers.Spatial == s {
f.provisional.allocatedLayers = f.currentLayers
br = f.provisional.bitrates[f.provisional.allocatedLayers.Spatial][f.provisional.allocatedLayers.Temporal]
break
}
}
}
return br
}
requiredBitrate := f.provisional.bitrates[layers.Spatial][layers.Temporal]
if requiredBitrate == 0 {
return maybeAdoptExempted()
}
alreadyAllocatedBitrate := int64(0)
if f.provisional.allocatedLayers != InvalidLayers {
alreadyAllocatedBitrate = f.provisional.bitrates[f.provisional.allocatedLayers.Spatial][f.provisional.allocatedLayers.Temporal]
}
// a layer under maximum fits, take it
if !layers.GreaterThan(f.provisional.maxLayers) && requiredBitrate <= (availableChannelCapacity+alreadyAllocatedBitrate) {
f.provisional.allocatedLayers = layers
return requiredBitrate - alreadyAllocatedBitrate
}
//
// Given layer does not fit. But overshoot is allowed.
// Could be one of
// 1. a layer below maximum that does not fit
// 2. a layer above maximum which may or may not fit.
// In any of those cases, take the lowest possible layer if pause is not allowed
//
if !allowPause && (f.provisional.allocatedLayers == InvalidLayers || !layers.GreaterThan(f.provisional.allocatedLayers)) {
f.provisional.allocatedLayers = layers
return requiredBitrate - alreadyAllocatedBitrate
}
return maybeAdoptExempted()
}
func (f *Forwarder) ProvisionalAllocateGetCooperativeTransition(allowOvershoot bool) VideoTransition {
//
// This is called when a track needs a change (could be mute/unmute, subscribed layers changed, published layers changed)
// when channel is congested.
//
// The goal is to provide a co-operative transition. Co-operative stream allocation aims to keep all the streams active
// as much as possible.
//
// When channel is congested, effecting a transition which will consume more bits will lead to more congestion.
// So, this routine does the following
// 1. When muting, it is not going to increase consumption.
// 2. If the stream is currently active and the transition needs more bits (higher layers = more bits), do not make the up move.
// The higher layer requirement could be due to a new published layer becoming available or subscribed layers changing.
// 3. If the new target layers are lower than current target, take the move down and save bits.
// 4. If not currently streaming, find the minimum layers that can unpause the stream.
//
// To summarize, co-operative streaming means
// - Try to keep tracks streaming, i.e. no pauses at the expense of some streams not being at optimal layers
// - Do not make an upgrade as it could affect other tracks
//
f.lock.Lock()
defer f.lock.Unlock()
if f.provisional.muted {
f.provisional.allocatedLayers = InvalidLayers
return VideoTransition{
from: f.targetLayers,
to: InvalidLayers,
bandwidthDelta: 0 - f.lastAllocation.bandwidthRequested,
// LK-TODO should this take current bitrate of current target layers?
}
}
// check if we should preserve current target
if f.targetLayers != InvalidLayers {
// what is the highest that is available
maximalLayers := InvalidLayers
maximalBandwidthRequired := int64(0)
for s := f.provisional.maxLayers.Spatial; s >= 0; s-- {
for t := f.provisional.maxLayers.Temporal; t >= 0; t-- {
if f.provisional.bitrates[s][t] != 0 {
maximalLayers = VideoLayers{Spatial: s, Temporal: t}
maximalBandwidthRequired = f.provisional.bitrates[s][t]
break
}
}
if maximalBandwidthRequired != 0 {
break
}
}
if maximalLayers != InvalidLayers {
if !f.targetLayers.GreaterThan(maximalLayers) && f.provisional.bitrates[f.targetLayers.Spatial][f.targetLayers.Temporal] != 0 {
// currently streaming and wanting an upgrade, just preserve current target in the cooperative scheme of things
f.provisional.allocatedLayers = f.targetLayers
return VideoTransition{
from: f.targetLayers,
to: f.targetLayers,
bandwidthDelta: 0,
}
}
if f.targetLayers.GreaterThan(maximalLayers) {
// maximalLayers <= f.targetLayers, make the down move
f.provisional.allocatedLayers = maximalLayers
return VideoTransition{
from: f.targetLayers,
to: maximalLayers,
bandwidthDelta: maximalBandwidthRequired - f.lastAllocation.bandwidthRequested,
}
}
}
}
findNextLayer := func(
minSpatial, maxSpatial int32,
minTemporal, maxTemporal int32,
) (VideoLayers, int64) {
layers := InvalidLayers
bw := int64(0)
for s := minSpatial; s <= maxSpatial; s++ {
for t := minTemporal; t <= maxTemporal; t++ {
if f.provisional.bitrates[s][t] != 0 {
layers = VideoLayers{Spatial: s, Temporal: t}
bw = f.provisional.bitrates[s][t]
break
}
}
if bw != 0 {
break
}
}
return layers, bw
}
targetLayers := f.targetLayers
bandwidthRequired := int64(0)
if targetLayers == InvalidLayers {
// currently not streaming, find minimal
// NOTE: a layer in feed could have paused and there could be other options than going back to minimal,
// but the cooperative scheme knocks things back to minimal
targetLayers, bandwidthRequired = findNextLayer(
0, f.provisional.maxLayers.Spatial,
0, f.provisional.maxLayers.Temporal,
)
}
// could not find a minimal layer, overshoot if allowed
if bandwidthRequired == 0 && f.provisional.maxLayers.IsValid() && allowOvershoot {
targetLayers, bandwidthRequired = findNextLayer(
f.provisional.maxLayers.Spatial+1, DefaultMaxLayerSpatial,
0, DefaultMaxLayerTemporal,
)
}
// adopt exempted layer if current is at one of the exempted layers below maximum
if bandwidthRequired == 0 && f.provisional.maxLayers.IsValid() && f.currentLayers.IsValid() {
for _, s := range f.provisional.exemptedLayers {
if s <= f.provisional.maxLayers.Spatial && f.currentLayers.Spatial == s {
targetLayers = f.currentLayers
bandwidthRequired = f.provisional.bitrates[targetLayers.Spatial][targetLayers.Temporal]
break
}
}
}
// turn off if nothing found, not even an exempted layer to continue with
if bandwidthRequired == 0 && (!f.currentLayers.IsValid() || f.currentLayers != targetLayers) {
targetLayers = InvalidLayers
}
f.provisional.allocatedLayers = targetLayers
return VideoTransition{
from: f.targetLayers,
to: targetLayers,
bandwidthDelta: bandwidthRequired - f.lastAllocation.bandwidthRequested,
}
}
func (f *Forwarder) ProvisionalAllocateGetBestWeightedTransition() VideoTransition {
//
// This is called when a track needs a change (could be mute/unmute, subscribed layers changed, published layers changed)
// when channel is congested.
//
// The goal is to keep all tracks streaming as much as possible. So, the track that needs a change needs bandwidth to be unpaused.
//
// This tries to figure out how much this track can contribute back to the pool to enable the track that needs to be unpaused.
// 1. Track muted OR feed dry - can contribute everything back in case it was using bandwidth.
// 2. Look at all possible down transitions from current target and find the best offer.
// Best offer is calculated as bandwidth saved moving to a down layer divided by cost.
// Cost has two components
// a. Transition cost: Spatial layer switch is expensive due to key frame requirement, but temporal layer switch is free.
// b. Quality cost: The farther away from desired layers, the higher the quality cost.
//
f.lock.Lock()
defer f.lock.Unlock()
if f.provisional.muted {
f.provisional.allocatedLayers = InvalidLayers
return VideoTransition{
from: f.targetLayers,
to: InvalidLayers,
bandwidthDelta: 0 - f.lastAllocation.bandwidthRequested,
// LK-TODO should this take current bitrate of current target layers?
}
}
maxReachableLayerTemporal := InvalidLayerTemporal
for t := f.provisional.maxLayers.Temporal; t >= 0; t-- {
for s := f.provisional.maxLayers.Spatial; s >= 0; s-- {
if f.provisional.bitrates[s][t] != 0 {
maxReachableLayerTemporal = t
break
}
}
if maxReachableLayerTemporal != InvalidLayerTemporal {
break
}
}
if maxReachableLayerTemporal == InvalidLayerTemporal {
// stick to an exempted layer if available
if f.currentLayers.IsValid() {
for _, s := range f.provisional.exemptedLayers {
if s <= f.provisional.maxLayers.Spatial && f.currentLayers.Spatial == s {
f.provisional.allocatedLayers = f.currentLayers
return VideoTransition{
from: f.targetLayers,
to: f.provisional.allocatedLayers,
bandwidthDelta: 0 - f.lastAllocation.bandwidthRequested,
// LK-TODO should this take current bitrate of current target layers?
}
}
}
}
// feed has gone dry,
f.provisional.allocatedLayers = InvalidLayers
return VideoTransition{
from: f.targetLayers,
to: InvalidLayers,
bandwidthDelta: 0 - f.lastAllocation.bandwidthRequested,
// LK-TODO should this take current bitrate of current target layers?
}
}
// starting from minimum to target, find transition which gives the best
// transition taking into account bits saved vs cost of such a transition
bestLayers := InvalidLayers
bestBandwidthDelta := int64(0)
bestValue := float32(0)
for s := int32(0); s <= f.targetLayers.Spatial; s++ {
for t := int32(0); t <= f.targetLayers.Temporal; t++ {
if s == f.targetLayers.Spatial && t == f.targetLayers.Temporal {
break
}
bandwidthDelta := int64(math.Max(float64(0), float64(f.lastAllocation.bandwidthRequested-f.provisional.bitrates[s][t])))
transitionCost := int32(0)
if f.targetLayers.Spatial != s {
transitionCost = TransitionCostSpatial
}
qualityCost := (maxReachableLayerTemporal+1)*(f.targetLayers.Spatial-s) + (f.targetLayers.Temporal - t)
value := float32(0)
if (transitionCost + qualityCost) != 0 {
value = float32(bandwidthDelta) / float32(transitionCost+qualityCost)
}
if value > bestValue || (value == bestValue && bandwidthDelta > bestBandwidthDelta) {
bestValue = value
bestBandwidthDelta = bandwidthDelta
bestLayers = VideoLayers{Spatial: s, Temporal: t}
}
}
}
f.provisional.allocatedLayers = bestLayers
return VideoTransition{
from: f.targetLayers,
to: bestLayers,
bandwidthDelta: bestBandwidthDelta,
}
}
func (f *Forwarder) ProvisionalAllocateCommit() VideoAllocation {
f.lock.Lock()
defer f.lock.Unlock()
alloc := VideoAllocation{
bandwidthRequested: 0,
bandwidthDelta: -f.lastAllocation.bandwidthRequested,
availableLayers: f.provisional.availableLayers,
exemptedLayers: f.provisional.exemptedLayers,
bitrates: f.provisional.bitrates,
targetLayers: f.provisional.allocatedLayers,
distanceToDesired: f.getDistanceToDesired(f.provisional.bitrates, f.provisional.allocatedLayers, f.provisional.maxLayers),
}
switch {
case f.provisional.muted:
alloc.state = VideoAllocationStateMuted
case len(f.provisional.availableLayers) == 0:
// feed is dry
alloc.state = VideoAllocationStateFeedDry
case f.provisional.allocatedLayers == InvalidLayers:
alloc.state = VideoAllocationStateDeficient
default:
optimalBandwidthNeeded := f.getOptimalBandwidthNeeded(f.provisional.bitrates, f.provisional.maxLayers)
bandwidthRequested := f.provisional.bitrates[f.provisional.allocatedLayers.Spatial][f.provisional.allocatedLayers.Temporal]
alloc.bandwidthRequested = bandwidthRequested
alloc.bandwidthDelta = bandwidthRequested - f.lastAllocation.bandwidthRequested
if f.provisional.allocatedLayers.GreaterThan(f.provisional.maxLayers) || (optimalBandwidthNeeded > 0 && bandwidthRequested >= optimalBandwidthNeeded) {
// could be greater than optimal if overshooting
alloc.state = VideoAllocationStateOptimal
} else {
//
// Optimal bandwidth could be 0 if using exempted layer.
// Exempted layer is still treated as deficient.
//
alloc.state = VideoAllocationStateDeficient
}
}
return f.updateAllocation(alloc, "cooperative")
}
func (f *Forwarder) AllocateNextHigher(availableChannelCapacity int64, brs Bitrates, allowOvershoot bool) (VideoAllocation, bool) {
f.lock.Lock()
defer f.lock.Unlock()
if f.kind == webrtc.RTPCodecTypeAudio {
f.lastAllocation.change = VideoStreamingChangeNone
return f.lastAllocation, false
}
// if not deficient, nothing to do
if f.lastAllocation.state != VideoAllocationStateDeficient {
f.lastAllocation.change = VideoStreamingChangeNone
return f.lastAllocation, false
}
// if targets are still pending, don't increase
if f.targetLayers != InvalidLayers && f.targetLayers != f.currentLayers {
f.lastAllocation.change = VideoStreamingChangeNone
return f.lastAllocation, false
}
optimalBandwidthNeeded := f.getOptimalBandwidthNeeded(brs, f.maxLayers)
alreadyAllocated := int64(0)
if f.targetLayers != InvalidLayers {
alreadyAllocated = brs[f.targetLayers.Spatial][f.targetLayers.Temporal]
}
doAllocation := func(
minSpatial, maxSpatial int32,
minTemporal, maxTemporal int32,
) (bool, VideoAllocation, bool) {
for s := minSpatial; s <= maxSpatial; s++ {
for t := minTemporal; t <= maxTemporal; t++ {
bandwidthRequested := brs[s][t]
if bandwidthRequested == 0 {
continue
}
if !allowOvershoot && bandwidthRequested-alreadyAllocated > availableChannelCapacity {
// next higher available layer does not fit, return
f.lastAllocation.change = VideoStreamingChangeNone
return true, f.lastAllocation, false
}
targetLayers := VideoLayers{Spatial: s, Temporal: t}
alloc := VideoAllocation{
state: VideoAllocationStateDeficient,
bandwidthRequested: bandwidthRequested,
bandwidthDelta: bandwidthRequested - alreadyAllocated,
availableLayers: f.availableLayers,
exemptedLayers: f.exemptedLayers,
bitrates: brs,
targetLayers: targetLayers,
distanceToDesired: f.getDistanceToDesired(brs, targetLayers, f.maxLayers),
}
if targetLayers.GreaterThan(f.maxLayers) || (optimalBandwidthNeeded > 0 && bandwidthRequested >= optimalBandwidthNeeded) {
alloc.state = VideoAllocationStateOptimal
}
return true, f.updateAllocation(alloc, "next-higher"), true
}
}
return false, VideoAllocation{}, false
}
done := false
var allocation VideoAllocation
boosted := false
// try moving temporal layer up in currently streaming spatial layer
if f.targetLayers != InvalidLayers {
done, allocation, boosted = doAllocation(
f.targetLayers.Spatial, f.targetLayers.Spatial,
f.targetLayers.Temporal+1, f.maxLayers.Temporal,
)
if done {
return allocation, boosted
}
}
// try moving spatial layer up if temporal layer move up is not available
done, allocation, boosted = doAllocation(
f.targetLayers.Spatial+1, f.maxLayers.Spatial,
0, f.maxLayers.Temporal,
)
if done {
return allocation, boosted
}
if allowOvershoot && f.maxLayers.IsValid() {
done, allocation, boosted = doAllocation(
f.maxLayers.Spatial+1, DefaultMaxLayerSpatial,
0, DefaultMaxLayerTemporal,
)
if done {
return allocation, boosted
}
}
f.lastAllocation.change = VideoStreamingChangeNone
return f.lastAllocation, false
}
func (f *Forwarder) GetNextHigherTransition(brs Bitrates, allowOvershoot bool) (VideoTransition, bool) {
f.lock.Lock()
defer f.lock.Unlock()
if f.kind == webrtc.RTPCodecTypeAudio {
return VideoTransition{}, false
}
// if not deficient, nothing to do
if f.lastAllocation.state != VideoAllocationStateDeficient {
return VideoTransition{}, false
}
// if targets are still pending, don't increase
if f.targetLayers != InvalidLayers && f.targetLayers != f.currentLayers {
return VideoTransition{}, false
}
alreadyAllocated := int64(0)
if f.targetLayers != InvalidLayers {
alreadyAllocated = brs[f.targetLayers.Spatial][f.targetLayers.Temporal]
}
findNextHigher := func(
minSpatial, maxSpatial int32,
minTemporal, maxTemporal int32,
) (bool, VideoTransition, bool) {
for s := minSpatial; s <= maxSpatial; s++ {
for t := minTemporal; t <= maxTemporal; t++ {
bandwidthRequested := brs[s][t]
if bandwidthRequested == 0 {
continue
}
transition := VideoTransition{
from: f.targetLayers,
to: VideoLayers{Spatial: s, Temporal: t},
bandwidthDelta: bandwidthRequested - alreadyAllocated,
}
return true, transition, true
}
}
return false, VideoTransition{}, false
}
done := false
var transition VideoTransition
isAvailable := false
// try moving temporal layer up in currently streaming spatial layer
if f.targetLayers != InvalidLayers {
done, transition, isAvailable = findNextHigher(
f.targetLayers.Spatial, f.targetLayers.Spatial,
f.targetLayers.Temporal+1, f.maxLayers.Temporal,
)
if done {
return transition, isAvailable
}
}
// try moving spatial layer up if temporal layer move up is not available
done, transition, isAvailable = findNextHigher(
f.targetLayers.Spatial+1, f.maxLayers.Spatial,
0, f.maxLayers.Temporal,
)
if done {
return transition, isAvailable
}
if allowOvershoot && f.maxLayers.IsValid() {
done, transition, isAvailable = findNextHigher(
f.maxLayers.Spatial+1, DefaultMaxLayerSpatial,
0, DefaultMaxLayerTemporal,
)
if done {
return transition, isAvailable
}
}
return VideoTransition{}, false
}
func (f *Forwarder) Pause(brs Bitrates) VideoAllocation {
f.lock.Lock()
defer f.lock.Unlock()
alloc := VideoAllocation{
bandwidthRequested: 0,
bandwidthDelta: 0 - f.lastAllocation.bandwidthRequested,
availableLayers: f.availableLayers,
exemptedLayers: f.exemptedLayers,
bitrates: brs,
targetLayers: InvalidLayers,
distanceToDesired: f.getDistanceToDesired(brs, InvalidLayers, f.maxLayers),
}
switch {
case f.muted:
alloc.state = VideoAllocationStateMuted
case len(f.availableLayers) == 0:
// feed is dry
alloc.state = VideoAllocationStateFeedDry
default:
// feed bitrate is not yet calculated or pausing due to lack of bandwidth
alloc.state = VideoAllocationStateDeficient
}
return f.updateAllocation(alloc, "pause")
}
func (f *Forwarder) updateAllocation(alloc VideoAllocation, reason string) VideoAllocation {
if f.targetLayers == InvalidLayers && alloc.targetLayers.IsValid() {
alloc.change = VideoStreamingChangeResuming
} else if f.targetLayers != InvalidLayers && !alloc.targetLayers.IsValid() {
alloc.change = VideoStreamingChangePausing
}
if alloc.state != f.lastAllocation.state || alloc.targetLayers != f.lastAllocation.targetLayers {
f.logger.Infow(fmt.Sprintf("stream allocation: %s", reason), "allocation", alloc)
}
f.lastAllocation = alloc
if len(alloc.availableLayers) > 0 {
f.lastAllocation.availableLayers = make([]int32, len(alloc.availableLayers))
copy(f.lastAllocation.availableLayers, alloc.availableLayers)
}
if len(alloc.exemptedLayers) > 0 {
f.lastAllocation.exemptedLayers = make([]int32, len(alloc.exemptedLayers))
copy(f.lastAllocation.exemptedLayers, alloc.exemptedLayers)
}
f.setTargetLayers(f.lastAllocation.targetLayers)
if f.targetLayers == InvalidLayers {
f.resyncLocked()
}
return f.lastAllocation
}
func (f *Forwarder) setTargetLayers(targetLayers VideoLayers) {
f.targetLayers = targetLayers
if f.ddLayerSelector != nil {
f.ddLayerSelector.SelectLayer(targetLayers)
}
}
func (f *Forwarder) Resync() {
f.lock.Lock()
defer f.lock.Unlock()
f.resyncLocked()
}
func (f *Forwarder) resyncLocked() {
f.currentLayers = InvalidLayers
f.lastSSRC = 0
}
func (f *Forwarder) CheckSync() (locked bool, layer int32) {
f.lock.RLock()
defer f.lock.RUnlock()
layer = f.targetLayers.Spatial
locked = f.targetLayers.Spatial == f.currentLayers.Spatial
return
}
func (f *Forwarder) FilterRTX(nacks []uint16) (filtered []uint16, disallowedLayers [DefaultMaxLayerSpatial + 1]bool) {
if !FlagFilterRTX {
filtered = nacks
return
}
f.lock.RLock()
defer f.lock.RUnlock()
filtered = f.rtpMunger.FilterRTX(nacks)
//
// Curb RTX when deficient for two cases
// 1. Target layer is lower than current layer. When current hits target, a key frame should flush the decoder.
// 2. Requested layer is higher than current. Current layer's key frame should have flushed encoder.
// Remote might ask for older layer because of its jitter buffer, but let it starve as channel is already congested.
//
// Without the curb, when congestion hits, RTX rate could be so high that it further congests the channel.
//
for layer := int32(0); layer < DefaultMaxLayerSpatial+1; layer++ {
if f.lastAllocation.state == VideoAllocationStateDeficient &&
(f.targetLayers.Spatial < f.currentLayers.Spatial || layer > f.currentLayers.Spatial) {
disallowedLayers[layer] = true
}
}
return
}
func (f *Forwarder) GetTranslationParams(extPkt *buffer.ExtPacket, layer int32) (*TranslationParams, error) {
f.lock.Lock()
defer f.lock.Unlock()
if f.muted {
return &TranslationParams{
shouldDrop: true,
}, nil
}
switch f.kind {
case webrtc.RTPCodecTypeAudio:
return f.getTranslationParamsAudio(extPkt)
case webrtc.RTPCodecTypeVideo:
return f.getTranslationParamsVideo(extPkt, layer)
}
return nil, ErrUnknownKind
}
// should be called with lock held
func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, tp *TranslationParams) (*TranslationParams, error) {
if f.lastSSRC != extPkt.Packet.SSRC {
if !f.started {
f.started = true
f.rtpMunger.SetLastSnTs(extPkt)
if f.vp8Munger != nil {
f.vp8Munger.SetLast(extPkt)
}
} else {
// LK-TODO-START
// The below offset calculation is not technically correct.
// Timestamps based on the system time of an intermediate box like
// SFU is not going to be accurate. Packets arrival/processing
// are subject to vagaries of network delays, SFU processing etc.
// But, the correct way is a lot harder. Will have to
// look at RTCP SR to get timestamps and align (and figure out alignment
// of layers and use that during layer switch in simulcast case).
// That can get tricky. Given the complexity of that approach, maybe
// this is just fine till it is not :-).
// LK-TODO-END
// Compute how much time passed between the old RTP extPkt
// and the current packet, and fix timestamp on source change
tDiffMs := (extPkt.Arrival - f.lTSCalc) / 1e6
if tDiffMs < 0 {
tDiffMs = 0
}
td := uint32(tDiffMs * int64(f.codec.ClockRate) / 1000)
if td == 0 {
td = 1
}
f.rtpMunger.UpdateSnTsOffsets(extPkt, 1, td)
if f.vp8Munger != nil {
f.vp8Munger.UpdateOffsets(extPkt)
}
}
f.lastSSRC = extPkt.Packet.SSRC
}
f.lTSCalc = extPkt.Arrival
if tp == nil {
tp = &TranslationParams{}
}
tpRTP, err := f.rtpMunger.UpdateAndGetSnTs(extPkt)
if err != nil {
tp.shouldDrop = true
if err == ErrPaddingOnlyPacket || err == ErrDuplicatePacket || err == ErrOutOfOrderSequenceNumberCacheMiss {
if err == ErrOutOfOrderSequenceNumberCacheMiss {
tp.isDroppingRelevant = true
}
return tp, nil
}
tp.isDroppingRelevant = true
return tp, err
}
tp.rtp = tpRTP
return tp, nil
}
// should be called with lock held
func (f *Forwarder) getTranslationParamsAudio(extPkt *buffer.ExtPacket) (*TranslationParams, error) {
return f.getTranslationParamsCommon(extPkt, nil)
}
// should be called with lock held
func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer int32) (*TranslationParams, error) {
tp := &TranslationParams{}
if f.targetLayers == InvalidLayers {
// stream is paused by streamallocator
tp.shouldDrop = true
return tp, nil
}
if f.ddLayerSelector != nil {
if selected := f.ddLayerSelector.Select(extPkt, tp); !selected {
tp.shouldDrop = true
f.rtpMunger.PacketDropped(extPkt)
return tp, nil
}
}
if f.targetLayers.Spatial != f.currentLayers.Spatial {
if f.targetLayers.Spatial == layer {
if extPkt.KeyFrame || tp.switchingToTargetLayer {
// lock to target layer
f.logger.Infow("locking to target layer", "current", f.currentLayers, "target", f.targetLayers)
f.currentLayers.Spatial = f.targetLayers.Spatial
if !f.isTemporalSupported {
f.currentLayers.Temporal = f.targetLayers.Temporal
}
// TODO : we switch to target layer immediately now since we assume all frame chain is integrity
// if we have frame chain check, should switch only if target chain is not broken and decodable
// if f.ddLayerSelector != nil {
// f.ddLayerSelector.SelectLayer(f.currentLayers)
// }
if f.currentLayers.Spatial >= f.maxLayers.Spatial {
tp.isSwitchingToMaxLayer = true
}
}
}
}
// if we have layer selector, let it decide whether to drop or not
if f.ddLayerSelector == nil && f.currentLayers.Spatial != layer {
tp.shouldDrop = true
return tp, nil
}
if FlagPauseOnDowngrade && f.targetLayers.Spatial < f.currentLayers.Spatial && f.lastAllocation.state == VideoAllocationStateDeficient {
//
// If target layer is lower than both the current and
// maximum subscribed layer, it is due to bandwidth
// constraints that the target layer has been switched down.
// Continuing to send higher layer will only exacerbate the
// situation by putting more stress on the channel. So, drop it.
//
// In the other direction, it is okay to keep forwarding till
// switch point to get a smoother stream till the higher
// layer key frame arrives.
//
// Note that in the case of client subscription layer restriction
// coinciding with server restriction due to bandwidth limitation,
// In the case of subscription change, higher should continue streaming
// to ensure smooth transition.
//
// To differentiate, drop only when in DEFICIENT state.
//
tp.shouldDrop = true
tp.isDroppingRelevant = true
return tp, nil
}
_, err := f.getTranslationParamsCommon(extPkt, tp)
if tp.shouldDrop || f.vp8Munger == nil || len(extPkt.Packet.Payload) == 0 {
return tp, err
}
// catch up temporal layer if necessary
if f.currentLayers.Temporal != f.targetLayers.Temporal {
incomingVP8, ok := extPkt.Payload.(buffer.VP8)
if ok {
if incomingVP8.TIDPresent == 1 && incomingVP8.TID <= uint8(f.targetLayers.Temporal) {
f.currentLayers.Temporal = f.targetLayers.Temporal
}
}
}
tpVP8, err := f.vp8Munger.UpdateAndGet(extPkt, tp.rtp.snOrdering, f.currentLayers.Temporal)
if err != nil {
tp.rtp = nil
tp.shouldDrop = true
if err == ErrFilteredVP8TemporalLayer || err == ErrOutOfOrderVP8PictureIdCacheMiss {
if err == ErrFilteredVP8TemporalLayer {
// filtered temporal layer, update sequence number offset to prevent holes
f.rtpMunger.PacketDropped(extPkt)
}
if err == ErrOutOfOrderVP8PictureIdCacheMiss {
tp.isDroppingRelevant = true
}
return tp, nil
}
tp.isDroppingRelevant = true
return tp, err
}
tp.vp8 = tpVP8
return tp, nil
}
func (f *Forwarder) GetSnTsForPadding(num int) ([]SnTs, error) {
f.lock.Lock()
defer f.lock.Unlock()
// padding is used for probing. Padding packets should be
// at frame boundaries only to ensure decoder sequencer does
// not get out-of-sync. But, when a stream is paused,
// force a frame marker as a restart of the stream will
// start with a key frame which will reset the decoder.
forceMarker := false
if f.targetLayers == InvalidLayers {
forceMarker = true
}
return f.rtpMunger.UpdateAndGetPaddingSnTs(num, 0, 0, forceMarker)
}
func (f *Forwarder) GetSnTsForBlankFrames(frameRate uint32, numPackets int) ([]SnTs, bool, error) {
f.lock.Lock()
defer f.lock.Unlock()
// NOTE: not using diff of current time and previous packet time (lTSCalc) as this
// driven by a timer, there might be slight differences compared to the frame rate.
// As the differences are going to be small (and also not to update RTP time stamp
// by those small differences), not doing the diff.
f.lTSCalc = time.Now().UnixNano()
frameEndNeeded := !f.rtpMunger.IsOnFrameBoundary()
if frameEndNeeded {
numPackets++
}
snts, err := f.rtpMunger.UpdateAndGetPaddingSnTs(numPackets, f.codec.ClockRate, frameRate, frameEndNeeded)
return snts, frameEndNeeded, err
}
func (f *Forwarder) GetPaddingVP8(frameEndNeeded bool) *buffer.VP8 {
f.lock.Lock()
defer f.lock.Unlock()
return f.vp8Munger.UpdateAndGetPadding(!frameEndNeeded)
}
func (f *Forwarder) GetRTPMungerParams() RTPMungerParams {
f.lock.RLock()
defer f.lock.RUnlock()
return f.rtpMunger.GetParams()
}