diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index a5b58524b..573a71d9c 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -66,33 +66,6 @@ var ( H264KeyFrame2x2 = [][]byte{H264KeyFrame2x2SPS, H264KeyFrame2x2PPS, H264KeyFrame2x2IDR} ) -type TranslationParamsRTP struct { - snOrdering SequenceNumberOrdering - sequenceNumber uint16 - timestamp uint32 -} - -type TranslationParamsVP8 struct { - header *buffer.VP8 -} - -type TranslationParams struct { - shouldDrop bool - shouldSendPLI bool - rtp *TranslationParamsRTP - vp8 *TranslationParamsVP8 -} - -type SnTs struct { - sequenceNumber uint16 - timestamp uint32 -} - -type VideoLayers struct { - spatial int32 - temporal int32 -} - type ReceiverReportListener func(dt *DownTrack, report *rtcp.ReceiverReport) // DownTrack implements TrackLocal, is the track used to write packets @@ -547,7 +520,7 @@ func (d *DownTrack) FinalizeAllocate() { d.forwarder.FinalizeAllocate(d.receiver.GetBitrateTemporalCumulative()) } -func (d *DownTrack) AllocateNextHigher() bool { +func (d *DownTrack) AllocateNextHigher() VideoAllocationResult { return d.forwarder.AllocateNextHigher(d.receiver.GetBitrateTemporalCumulative()) } @@ -585,12 +558,12 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport { return nil } - currentSpatialLayer := d.forwarder.CurrentSpatialLayer() - if currentSpatialLayer == InvalidSpatialLayer { + currentLayers := d.forwarder.CurrentLayers() + if currentLayers == InvalidLayers { return nil } - srRTP, srNTP := d.receiver.GetSenderReportTime(currentSpatialLayer) + srRTP, srNTP := d.receiver.GetSenderReportTime(currentLayers.spatial) if srRTP == 0 { return nil } @@ -719,10 +692,10 @@ func (d *DownTrack) handleRTCP(bytes []byte) { pliOnce := true sendPliOnce := func() { if pliOnce { - targetSpatialLayer := d.forwarder.TargetSpatialLayer() - if targetSpatialLayer != InvalidSpatialLayer { + targetLayers := d.forwarder.TargetLayers() + if targetLayers != InvalidLayers { d.lastPli.set(time.Now().UnixNano()) - d.receiver.SendPLI(targetSpatialLayer) + d.receiver.SendPLI(targetLayers.spatial) pliOnce = false } } @@ -933,7 +906,7 @@ func (d *DownTrack) DebugInfo() map[string]interface{} { "MimeType": d.codec.MimeType, "Bound": d.bound.get(), "Muted": d.forwarder.Muted(), - "CurrentSpatialLayer": d.forwarder.CurrentSpatialLayer, + "CurrentSpatialLayer": d.forwarder.CurrentLayers().spatial, "Stats": stats, } } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 1e50c8e52..bbf70e2a9 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -1,6 +1,7 @@ package sfu import ( + "math" "strings" "sync" @@ -13,17 +14,8 @@ import ( // Forwarder // const ( - InvalidSpatialLayer = -1 - InvalidTemporalLayer = -1 -) - -type SequenceNumberOrdering int - -const ( - SequenceNumberOrderingContiguous SequenceNumberOrdering = iota - SequenceNumberOrderingOutOfOrder - SequenceNumberOrderingGap - SequenceNumberOrderingDuplicate + DefaultMaxSpatialLayer = int32(2) + DefaultMaxTemporalLayer = int32(3) ) type ForwardingStatus int @@ -34,6 +26,20 @@ const ( ForwardingStatusOptimal ) +type LayerDirection int + +const ( + LayerDirectionLowToHigh LayerDirection = iota + LayerDirectionHighToLow +) + +type LayerPreference int + +const ( + LayerPreferenceSpatial LayerPreference = iota + LayerPreferenceTemporal +) + type VideoStreamingChange int const ( @@ -58,8 +64,28 @@ type VideoAllocationResult struct { state VideoAllocationState bandwidthRequested int64 bandwidthDelta int64 + layersChanged bool } +type TranslationParams struct { + shouldDrop bool + shouldSendPLI bool + rtp *TranslationParamsRTP + vp8 *TranslationParamsVP8 +} + +type VideoLayers struct { + spatial int32 + temporal int32 +} + +var ( + InvalidLayers = VideoLayers{ + spatial: -1, + temporal: -1, + } +) + type Forwarder struct { lock sync.RWMutex codec webrtc.RTPCodecCapability @@ -71,13 +97,9 @@ type Forwarder struct { lastSSRC uint32 lTSCalc int64 - maxSpatialLayer int32 - currentSpatialLayer int32 - targetSpatialLayer int32 - - maxTemporalLayer int32 - currentTemporalLayer int32 - targetTemporalLayer int32 + maxLayers VideoLayers + currentLayers VideoLayers + targetLayers VideoLayers lastAllocationState VideoAllocationState lastAllocationRequestBps int64 @@ -94,10 +116,8 @@ func NewForwarder(codec webrtc.RTPCodecCapability, kind webrtc.RTPCodecType) *Fo kind: kind, // start off with nothing, let streamallocator set things - currentSpatialLayer: InvalidSpatialLayer, - targetSpatialLayer: InvalidSpatialLayer, - currentTemporalLayer: InvalidTemporalLayer, - targetTemporalLayer: InvalidTemporalLayer, + currentLayers: InvalidLayers, + targetLayers: InvalidLayers, lastAllocationState: VideoAllocationStateNone, @@ -109,11 +129,12 @@ func NewForwarder(codec webrtc.RTPCodecCapability, kind webrtc.RTPCodecType) *Fo } if f.kind == webrtc.RTPCodecTypeVideo { - f.maxSpatialLayer = 2 - f.maxTemporalLayer = 2 + f.maxLayers = VideoLayers{ + spatial: DefaultMaxSpatialLayer, + temporal: DefaultMaxTemporalLayer, + } } else { - f.maxSpatialLayer = InvalidSpatialLayer - f.maxTemporalLayer = InvalidTemporalLayer + f.maxLayers = InvalidLayers } return f @@ -142,67 +163,58 @@ func (f *Forwarder) SetMaxSpatialLayer(spatialLayer int32) (bool, VideoLayers) { f.lock.Lock() defer f.lock.Unlock() - if spatialLayer == f.maxSpatialLayer { - return false, VideoLayers{} + if f.kind == webrtc.RTPCodecTypeAudio || spatialLayer == f.maxLayers.spatial { + return false, InvalidLayers } - f.maxSpatialLayer = spatialLayer + f.maxLayers.spatial = spatialLayer - return true, VideoLayers{ - spatial: f.maxSpatialLayer, - temporal: f.maxTemporalLayer, - } -} - -func (f *Forwarder) CurrentSpatialLayer() int32 { - f.lock.RLock() - defer f.lock.RUnlock() - - return f.currentSpatialLayer -} - -func (f *Forwarder) TargetSpatialLayer() int32 { - f.lock.RLock() - defer f.lock.RUnlock() - - return f.targetSpatialLayer + return true, f.maxLayers } func (f *Forwarder) SetMaxTemporalLayer(temporalLayer int32) (bool, VideoLayers) { f.lock.Lock() defer f.lock.Unlock() - if temporalLayer == f.maxTemporalLayer { - return false, VideoLayers{} + if f.kind == webrtc.RTPCodecTypeAudio || temporalLayer == f.maxLayers.temporal { + return false, InvalidLayers } - f.maxTemporalLayer = temporalLayer + f.maxLayers.temporal = temporalLayer - return true, VideoLayers{ - spatial: f.maxSpatialLayer, - temporal: f.maxTemporalLayer, - } + return true, f.maxLayers } func (f *Forwarder) MaxLayers() VideoLayers { f.lock.RLock() defer f.lock.RUnlock() - return VideoLayers{ - spatial: f.maxSpatialLayer, - temporal: f.maxTemporalLayer, - } + return f.maxLayers +} + +func (f *Forwarder) CurrentLayers() VideoLayers { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.currentLayers +} + +func (f *Forwarder) TargetLayers() VideoLayers { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.targetLayers } func (f *Forwarder) GetForwardingStatus() ForwardingStatus { f.lock.RLock() defer f.lock.RUnlock() - if f.targetSpatialLayer == InvalidSpatialLayer { + if f.targetLayers == InvalidLayers { return ForwardingStatusOff } - if f.targetSpatialLayer < f.maxSpatialLayer { + if f.targetLayers.spatial < f.maxLayers.spatial { return ForwardingStatusPartial } @@ -217,17 +229,13 @@ func (f *Forwarder) UptrackLayersChange(availableLayers []uint16) { } func (f *Forwarder) disable() { - f.currentSpatialLayer = InvalidSpatialLayer - f.targetSpatialLayer = InvalidSpatialLayer - - f.currentTemporalLayer = InvalidTemporalLayer - f.targetTemporalLayer = InvalidTemporalLayer + f.currentLayers = InvalidLayers + f.targetLayers = InvalidLayers } func (f *Forwarder) getOptimalBandwidthNeeded(brs [3][4]int64) int64 { - // LK-TODO for temporal preference, traverse the bitrates array the other way - for i := f.maxSpatialLayer; i >= 0; i-- { - for j := f.maxTemporalLayer; j >= 0; j-- { + for i := f.maxLayers.spatial; i >= 0; i-- { + for j := f.maxLayers.temporal; j >= 0; j-- { if brs[i][j] == 0 { continue } @@ -239,6 +247,143 @@ func (f *Forwarder) getOptimalBandwidthNeeded(brs [3][4]int64) int64 { return 0 } +func (f *Forwarder) toVideoAllocationResult(targetLayers VideoLayers, brs [3][4]int64, optimalBandwidthNeeded int64, canPause bool) (result VideoAllocationResult) { + if targetLayers == InvalidLayers && !canPause { + // do not pause if preserving even if allocation does not fit in available channel capacity. + // although preserving, currently streamed layers could have a different bitrate compared to + // when the allocation was done. But not updating to prevent entropy. Let channel + // changes catch up and update state on a fresh allocation. + result.state = f.lastAllocationState + result.bandwidthRequested = f.lastAllocationRequestBps + return + } + + // change in streaming state? + switch { + case f.targetLayers != InvalidLayers && targetLayers == InvalidLayers: + result.change = VideoStreamingChangePausing + case f.targetLayers == InvalidLayers && targetLayers != InvalidLayers: + result.change = VideoStreamingChangeResuming + } + + // how much bandwidth is needed and delta from previous allocation + if targetLayers == InvalidLayers { + result.bandwidthRequested = 0 + } else { + result.bandwidthRequested = brs[targetLayers.spatial][targetLayers.temporal] + } + result.bandwidthDelta = result.bandwidthRequested - f.lastAllocationRequestBps + + // state of allocation + if result.bandwidthRequested == optimalBandwidthNeeded { + result.state = VideoAllocationStateOptimal + } else { + result.state = VideoAllocationStateDeficient + } + + // have allocated layers changed? + if f.targetLayers != targetLayers { + result.layersChanged = true + } + + return +} + +func (f *Forwarder) updateAllocationState(targetLayers VideoLayers, result VideoAllocationResult) { + f.lastAllocationState = result.state + f.lastAllocationRequestBps = result.bandwidthRequested + + if result.layersChanged { + f.targetLayers = targetLayers + } +} + +func (f *Forwarder) findBestLayers( + minLayers VideoLayers, + maxLayers VideoLayers, + brs [3][4]int64, + optimalBandwidthNeeded int64, + direction LayerDirection, + preference LayerPreference, + availableChannelCapacity int64, + canPause bool, +) (result VideoAllocationResult) { + targetLayers := InvalidLayers + + switch direction { + case LayerDirectionLowToHigh: + switch preference { + case LayerPreferenceSpatial: + for i := minLayers.spatial; i <= maxLayers.spatial; i++ { + for j := minLayers.temporal; j <= maxLayers.temporal; j++ { + if brs[i][j] != 0 && brs[i][j] < availableChannelCapacity { + targetLayers = VideoLayers{ + spatial: i, + temporal: j, + } + break + } + } + if targetLayers != InvalidLayers { + break + } + } + case LayerPreferenceTemporal: + for i := minLayers.temporal; i <= maxLayers.temporal; i++ { + for j := minLayers.spatial; j <= maxLayers.spatial; j++ { + if brs[j][i] != 0 && brs[j][i] < availableChannelCapacity { + targetLayers = VideoLayers{ + spatial: j, + temporal: i, + } + break + } + } + if targetLayers != InvalidLayers { + break + } + } + } + case LayerDirectionHighToLow: + switch preference { + case LayerPreferenceSpatial: + for i := maxLayers.spatial; i >= minLayers.spatial; i-- { + for j := maxLayers.temporal; j >= minLayers.temporal; j-- { + if brs[i][j] != 0 && brs[i][j] < availableChannelCapacity { + targetLayers = VideoLayers{ + spatial: i, + temporal: j, + } + break + } + } + if targetLayers != InvalidLayers { + break + } + } + case LayerPreferenceTemporal: + for i := maxLayers.temporal; i >= minLayers.temporal; i-- { + for j := maxLayers.spatial; j >= minLayers.spatial; j-- { + if brs[j][i] != 0 && brs[j][i] < availableChannelCapacity { + targetLayers = VideoLayers{ + spatial: j, + temporal: i, + } + break + } + } + if targetLayers != InvalidLayers { + break + } + } + } + } + + result = f.toVideoAllocationResult(targetLayers, brs, optimalBandwidthNeeded, canPause) + f.updateAllocationState(targetLayers, result) + return +} + func (f *Forwarder) allocate(availableChannelCapacity int64, canPause bool, brs [3][4]int64) (result VideoAllocationResult) { // should never get called on audio tracks, just for safety if f.kind == webrtc.RTPCodecTypeAudio { @@ -276,23 +421,19 @@ func (f *Forwarder) allocate(availableChannelCapacity int64, canPause bool, brs // channel capacity allows a free pass. // So, resume with the highest layer available <= max subscribed layer // If already resumed, move allocation to the highest available layer <= max subscribed layer - if f.targetSpatialLayer == InvalidSpatialLayer { + if f.targetLayers == InvalidLayers { result.change = VideoStreamingChangeResuming } - f.targetSpatialLayer = int32(f.availableLayers[len(f.availableLayers)-1]) - if f.targetSpatialLayer > f.maxSpatialLayer { - f.targetSpatialLayer = f.maxSpatialLayer - } - - f.targetTemporalLayer = f.maxTemporalLayer - if f.targetTemporalLayer == InvalidTemporalLayer { - f.targetTemporalLayer = 0 + f.targetLayers.spatial = int32(f.availableLayers[len(f.availableLayers)-1]) + if f.targetLayers.spatial > f.maxLayers.spatial { + f.targetLayers.spatial = f.maxLayers.spatial } + f.targetLayers.temporal = int32(math.Max(0, float64(f.maxLayers.temporal))) } else { // if not optimistically started, nothing else to do - if f.targetSpatialLayer == InvalidSpatialLayer { + if f.targetLayers == InvalidLayers { return } @@ -313,54 +454,20 @@ func (f *Forwarder) allocate(availableChannelCapacity int64, canPause bool, brs return } - // LK-TODO for temporal preference, traverse the bitrates array the other way - for i := f.maxSpatialLayer; i >= 0; i-- { - for j := f.maxTemporalLayer; j >= 0; j-- { - if brs[i][j] == 0 { - continue - } - if brs[i][j] < availableChannelCapacity { - if f.targetSpatialLayer == InvalidSpatialLayer { - result.change = VideoStreamingChangeResuming - } - result.bandwidthRequested = brs[i][j] - result.bandwidthDelta = result.bandwidthRequested - f.lastAllocationRequestBps - if result.bandwidthRequested == optimalBandwidthNeeded { - result.state = VideoAllocationStateOptimal - } else { - result.state = VideoAllocationStateDeficient - } - - f.lastAllocationState = result.state - f.lastAllocationRequestBps = result.bandwidthRequested - - f.targetSpatialLayer = int32(i) - f.targetTemporalLayer = int32(j) - return - } - } + minLayers := VideoLayers{ + spatial: 0, + temporal: 0, } - - if !canPause { - // do not pause if preserving - // although preserving, currently streamed layers could have a different bitrate, - // but not updating to prevent entropy increase. - result.state = f.lastAllocationState - return - } - - // no layer fits in the available channel capacity, disable the track - if f.targetSpatialLayer != InvalidSpatialLayer { - result.change = VideoStreamingChangePausing - } - result.state = VideoAllocationStateDeficient - result.bandwidthRequested = 0 - result.bandwidthDelta = result.bandwidthRequested - f.lastAllocationRequestBps - - f.lastAllocationState = result.state - f.lastAllocationRequestBps = result.bandwidthRequested - - f.disable() + result = f.findBestLayers( + minLayers, + f.maxLayers, + brs, + optimalBandwidthNeeded, + LayerDirectionHighToLow, + LayerPreferenceSpatial, + availableChannelCapacity, + canPause, + ) return } @@ -398,87 +505,94 @@ func (f *Forwarder) FinalizeAllocate(brs [3][4]int64) { return } - // LK-TODO for temporal preference, traverse the bitrates array the other way - for i := f.maxSpatialLayer; i >= 0; i-- { - for j := f.maxTemporalLayer; j >= 0; j-- { - if brs[i][j] == 0 { - continue - } - - f.lastAllocationState = VideoAllocationStateOptimal - f.lastAllocationRequestBps = brs[i][j] - - f.targetSpatialLayer = int32(i) - f.targetTemporalLayer = int32(j) - return - } + minLayers := VideoLayers{ + spatial: 0, + temporal: 0, } + f.findBestLayers( + minLayers, + f.maxLayers, + brs, + optimalBandwidthNeeded, + LayerDirectionHighToLow, + LayerPreferenceSpatial, + ChannelCapacityInfinity, + false, + ) } -func (f *Forwarder) AllocateNextHigher(brs [3][4]int64) bool { +func (f *Forwarder) AllocateNextHigher(brs [3][4]int64) (result VideoAllocationResult) { f.lock.Lock() defer f.lock.Unlock() if f.kind == webrtc.RTPCodecTypeAudio { - return false + return + } + + // if not deficient, nothing to do + if f.lastAllocationState != VideoAllocationStateDeficient { + return } // if targets are still pending, don't increase - if f.targetSpatialLayer != InvalidSpatialLayer { - if f.targetSpatialLayer != f.currentSpatialLayer || f.targetTemporalLayer != f.currentTemporalLayer { - return false + if f.targetLayers != InvalidLayers { + if f.targetLayers != f.currentLayers { + return } } optimalBandwidthNeeded := f.getOptimalBandwidthNeeded(brs) if optimalBandwidthNeeded == 0 { - if len(f.availableLayers) == 0 { - f.lastAllocationState = VideoAllocationStateFeedDry - f.lastAllocationRequestBps = 0 - return false + // either feed is dry or awaiting measurement, don't hunt for higher + return + } + + // try moving temporal layer up in currently streaming spatial layer + if f.targetLayers != InvalidLayers { + minLayers := VideoLayers{ + spatial: f.targetLayers.spatial, + temporal: f.targetLayers.temporal + 1, } - - // bitrates not available yet - f.lastAllocationState = VideoAllocationStateAwaitingMeasurement - f.lastAllocationRequestBps = 0 - return false - } - - // try moving temporal layer up in the current spatial layer - nextTemporalLayer := f.currentTemporalLayer + 1 - currentSpatialLayer := f.currentSpatialLayer - if currentSpatialLayer == InvalidSpatialLayer { - currentSpatialLayer = 0 - } - if nextTemporalLayer <= f.maxTemporalLayer && brs[currentSpatialLayer][nextTemporalLayer] > 0 { - f.targetSpatialLayer = currentSpatialLayer - f.targetTemporalLayer = nextTemporalLayer - - f.lastAllocationRequestBps = brs[currentSpatialLayer][nextTemporalLayer] - if f.lastAllocationRequestBps < optimalBandwidthNeeded { - f.lastAllocationState = VideoAllocationStateDeficient - } else { - f.lastAllocationState = VideoAllocationStateOptimal + maxLayers := VideoLayers{ + spatial: f.targetLayers.spatial, + temporal: f.maxLayers.temporal, } - return true - } - - // try moving spatial layer up if already at max temporal layer of current spatial layer - nextSpatialLayer := f.currentSpatialLayer + 1 - if nextSpatialLayer <= f.maxSpatialLayer && brs[nextSpatialLayer][0] > 0 { - f.targetSpatialLayer = nextSpatialLayer - f.targetTemporalLayer = 0 - - f.lastAllocationRequestBps = brs[nextSpatialLayer][0] - if f.lastAllocationRequestBps < optimalBandwidthNeeded { - f.lastAllocationState = VideoAllocationStateDeficient - } else { - f.lastAllocationState = VideoAllocationStateOptimal + result = f.findBestLayers( + minLayers, + maxLayers, + brs, + optimalBandwidthNeeded, + LayerDirectionLowToHigh, + LayerPreferenceSpatial, + ChannelCapacityInfinity, + false, + ) + if result.layersChanged { + return } - return true } - return false + // try moving spatial layer up if temporal layer move up is not available + minLayers := VideoLayers{ + spatial: f.targetLayers.spatial + 1, + temporal: 0, + } + maxLayers := VideoLayers{ + spatial: f.maxLayers.spatial, + temporal: f.maxLayers.temporal, + } + result = f.findBestLayers( + minLayers, + maxLayers, + brs, + optimalBandwidthNeeded, + LayerDirectionLowToHigh, + LayerPreferenceSpatial, + ChannelCapacityInfinity, + false, + ) + + return } func (f *Forwarder) AllocationState() VideoAllocationState { @@ -555,30 +669,30 @@ func (f *Forwarder) getTranslationParamsAudio(extPkt *buffer.ExtPacket) (*Transl func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer int32) (*TranslationParams, error) { tp := &TranslationParams{} - if f.targetSpatialLayer == InvalidSpatialLayer { + if f.targetLayers == InvalidLayers { // stream is paused by streamallocator tp.shouldDrop = true return tp, nil } tp.shouldSendPLI = false - if f.targetSpatialLayer != f.currentSpatialLayer { - if f.targetSpatialLayer == layer { + if f.targetLayers.spatial != f.currentLayers.spatial { + if f.targetLayers.spatial == layer { if extPkt.KeyFrame { // lock to target layer - f.currentSpatialLayer = f.targetSpatialLayer + f.currentLayers.spatial = f.targetLayers.spatial } else { tp.shouldSendPLI = true } } } - if f.currentSpatialLayer != layer { + if f.currentLayers.spatial != layer { tp.shouldDrop = true return tp, nil } - if f.targetSpatialLayer < f.currentSpatialLayer && f.targetSpatialLayer < f.maxSpatialLayer { + if f.targetLayers.spatial < f.currentLayers.spatial && f.targetLayers.spatial < f.maxLayers.spatial { // // If target layer is lower than both the current and // maximum subscribed layer, it is due to bandwidth @@ -655,7 +769,17 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in return tp, nil } - tpVP8, err := f.vp8Munger.UpdateAndGet(extPkt, tpRTP.snOrdering, f.currentTemporalLayer) + // catch up temporal layer if necessary + if f.currentLayers.temporal != f.targetLayers.temporal { + incomingVP8, ok := extPkt.Payload.(buffer.VP8) + if ok { + if incomingVP8.TIDPresent == 1 && incomingVP8.TID <= uint8(f.targetLayers.temporal) { + f.currentLayers.temporal = f.targetLayers.temporal + } + } + } + + tpVP8, err := f.vp8Munger.UpdateAndGet(extPkt, tpRTP.snOrdering, f.currentLayers.temporal) if err != nil { tp.shouldDrop = true if err == ErrFilteredVP8TemporalLayer || err == ErrOutOfOrderVP8PictureIdCacheMiss { @@ -669,13 +793,6 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in return tp, err } - // catch up temporal layer if necessary - if tpVP8 != nil && f.currentTemporalLayer != f.targetTemporalLayer { - if tpVP8.header.TIDPresent == 1 && tpVP8.header.TID <= uint8(f.targetTemporalLayer) { - f.currentTemporalLayer = f.targetTemporalLayer - } - } - tp.rtp = tpRTP tp.vp8 = tpVP8 return tp, nil @@ -691,7 +808,7 @@ func (f *Forwarder) GetSnTsForPadding(num int) ([]SnTs, error) { // force a frame marker as a restart of the stream will // start with a key frame which will reset the decoder. forceMarker := false - if f.targetSpatialLayer == InvalidSpatialLayer { + if f.targetLayers == InvalidLayers { forceMarker = true } return f.rtpMunger.UpdateAndGetPaddingSnTs(num, 0, 0, forceMarker) diff --git a/pkg/sfu/forwarder_test.go b/pkg/sfu/forwarder_test.go new file mode 100644 index 000000000..b213dac20 --- /dev/null +++ b/pkg/sfu/forwarder_test.go @@ -0,0 +1,1174 @@ +package sfu + +import ( + "reflect" + "testing" + + "github.com/livekit/livekit-server/pkg/sfu/buffer" + "github.com/livekit/livekit-server/pkg/sfu/testutils" + + "github.com/pion/webrtc/v3" + + "github.com/stretchr/testify/require" +) + +func TestForwarderMute(t *testing.T) { + f := NewForwarder(testutils.TestOpusCodec, webrtc.RTPCodecTypeAudio) + require.False(t, f.Muted()) + require.False(t, f.Mute(false)) // no change in mute state + require.False(t, f.Muted()) + require.True(t, f.Mute(true)) + require.True(t, f.Muted()) + require.True(t, f.Mute(false)) + require.False(t, f.Muted()) +} + +func TestForwarderLayersAudio(t *testing.T) { + f := NewForwarder(testutils.TestOpusCodec, webrtc.RTPCodecTypeAudio) + + require.Equal(t, InvalidLayers, f.MaxLayers()) + + require.Equal(t, InvalidLayers, f.CurrentLayers()) + require.Equal(t, InvalidLayers, f.TargetLayers()) + + changed, layers := f.SetMaxSpatialLayer(1) + require.False(t, changed) + require.Equal(t, InvalidLayers, layers) + + changed, layers = f.SetMaxTemporalLayer(1) + require.False(t, changed) + require.Equal(t, InvalidLayers, layers) + + require.Equal(t, InvalidLayers, f.MaxLayers()) +} + +func TestForwarderLayersVideo(t *testing.T) { + f := NewForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo) + + maxLayers := f.MaxLayers() + expectedLayers := VideoLayers{ + spatial: DefaultMaxSpatialLayer, + temporal: DefaultMaxTemporalLayer, + } + require.Equal(t, expectedLayers, maxLayers) + + require.Equal(t, InvalidLayers, f.CurrentLayers()) + require.Equal(t, InvalidLayers, f.TargetLayers()) + + changed, layers := f.SetMaxSpatialLayer(DefaultMaxSpatialLayer) + require.False(t, changed) + require.Equal(t, InvalidLayers, layers) + + changed, layers = f.SetMaxSpatialLayer(DefaultMaxSpatialLayer - 1) + require.True(t, changed) + expectedLayers = VideoLayers{ + spatial: DefaultMaxSpatialLayer - 1, + temporal: DefaultMaxTemporalLayer, + } + require.Equal(t, expectedLayers, layers) + require.Equal(t, expectedLayers, f.MaxLayers()) + + changed, layers = f.SetMaxTemporalLayer(DefaultMaxTemporalLayer) + require.False(t, changed) + require.Equal(t, InvalidLayers, layers) + + changed, layers = f.SetMaxTemporalLayer(DefaultMaxTemporalLayer - 1) + require.True(t, changed) + expectedLayers = VideoLayers{ + spatial: DefaultMaxSpatialLayer - 1, + temporal: DefaultMaxTemporalLayer - 1, + } + require.Equal(t, expectedLayers, layers) + require.Equal(t, expectedLayers, f.MaxLayers()) +} + +func TestForwarderGetForwardingStatus(t *testing.T) { + f := NewForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo) + + require.Equal(t, ForwardingStatusOff, f.GetForwardingStatus()) + + f.targetLayers.spatial = DefaultMaxSpatialLayer - 1 + require.Equal(t, ForwardingStatusPartial, f.GetForwardingStatus()) + + f.targetLayers.spatial = DefaultMaxSpatialLayer + require.Equal(t, ForwardingStatusOptimal, f.GetForwardingStatus()) +} + +func TestForwarderUptrackLayersChange(t *testing.T) { + f := NewForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo) + + require.Nil(t, f.availableLayers) + + availableLayers := []uint16{0, 1, 2} + f.UptrackLayersChange(availableLayers) + require.Equal(t, availableLayers, f.availableLayers) + + availableLayers = []uint16{0, 2} + f.UptrackLayersChange(availableLayers) + require.Equal(t, availableLayers, f.availableLayers) + + availableLayers = []uint16{} + f.UptrackLayersChange(availableLayers) + require.Equal(t, availableLayers, f.availableLayers) +} + +func TestForwarderAllocate(t *testing.T) { + f := NewForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo) + + emptyBitrates := [DefaultMaxSpatialLayer + 1][DefaultMaxTemporalLayer + 1]int64{} + bitrates := [DefaultMaxSpatialLayer + 1][DefaultMaxTemporalLayer + 1]int64{ + {2, 3, 0, 0}, + {4, 0, 0, 5}, + {0, 7, 0, 0}, + } + + // muted should not consume any bandwidth + f.Mute(true) + f.disable() + expectedResult := VideoAllocationResult{ + change: VideoStreamingChangeNone, + state: VideoAllocationStateMuted, + bandwidthRequested: 0, + bandwidthDelta: 0, + } + result := f.Allocate(ChannelCapacityInfinity, bitrates) + require.Equal(t, expectedResult, result) + require.Equal(t, VideoAllocationStateMuted, f.lastAllocationState) + require.Equal(t, int64(0), f.lastAllocationRequestBps) + + // feed dry state + f.Mute(false) + f.lastAllocationState = VideoAllocationStateNone + f.disable() + expectedResult = VideoAllocationResult{ + change: VideoStreamingChangeNone, + state: VideoAllocationStateFeedDry, + bandwidthRequested: 0, + bandwidthDelta: 0, + } + result = f.Allocate(ChannelCapacityInfinity, emptyBitrates) + require.Equal(t, expectedResult, result) + require.Equal(t, VideoAllocationStateFeedDry, f.lastAllocationState) + require.Equal(t, int64(0), f.lastAllocationRequestBps) + + // awaiting measurement, i. e. bitrates are not available, but layers available + f.lastAllocationState = VideoAllocationStateNone + f.disable() + f.UptrackLayersChange([]uint16{0}) + expectedResult = VideoAllocationResult{ + change: VideoStreamingChangeResuming, + state: VideoAllocationStateAwaitingMeasurement, + bandwidthRequested: 0, + bandwidthDelta: 0, + } + result = f.Allocate(ChannelCapacityInfinity, emptyBitrates) + require.Equal(t, expectedResult, result) + require.Equal(t, VideoAllocationStateAwaitingMeasurement, f.lastAllocationState) + require.Equal(t, int64(0), f.lastAllocationRequestBps) + expectedTargetLayers := VideoLayers{ + spatial: 0, + temporal: DefaultMaxTemporalLayer, + } + require.Equal(t, expectedTargetLayers, f.TargetLayers()) + require.Equal(t, InvalidLayers, f.CurrentLayers()) + + // while awaiting measurement, less than infinite channel capacity should pause the stream + expectedResult = VideoAllocationResult{ + change: VideoStreamingChangePausing, + state: VideoAllocationStateDeficient, + bandwidthRequested: 0, + bandwidthDelta: 0, + } + result = f.Allocate(ChannelCapacityInfinity-1, emptyBitrates) + require.Equal(t, expectedResult, result) + require.Equal(t, VideoAllocationStateDeficient, f.lastAllocationState) + require.Equal(t, int64(0), f.lastAllocationRequestBps) + require.Equal(t, InvalidLayers, f.CurrentLayers()) + require.Equal(t, InvalidLayers, f.TargetLayers()) + + // allocate using bitrates and less than infinite channel capacity, but enough for optimal + expectedResult = VideoAllocationResult{ + change: VideoStreamingChangeResuming, + state: VideoAllocationStateOptimal, + bandwidthRequested: bitrates[2][1], + bandwidthDelta: bitrates[2][1], + layersChanged: true, + } + result = f.Allocate(ChannelCapacityInfinity-1, bitrates) + require.Equal(t, expectedResult, result) + require.Equal(t, VideoAllocationStateOptimal, f.lastAllocationState) + require.Equal(t, bitrates[2][1], f.lastAllocationRequestBps) + require.Equal(t, InvalidLayers, f.CurrentLayers()) + expectedTargetLayers = VideoLayers{ + spatial: 2, + temporal: 1, + } + require.Equal(t, expectedTargetLayers, f.TargetLayers()) + + // give it a bitrate that is less than optimal + expectedResult = VideoAllocationResult{ + change: VideoStreamingChangeNone, + state: VideoAllocationStateDeficient, + bandwidthRequested: bitrates[1][3], + bandwidthDelta: bitrates[1][3] - bitrates[2][1], + layersChanged: true, + } + result = f.Allocate(bitrates[2][1]-1, bitrates) + require.Equal(t, expectedResult, result) + require.Equal(t, VideoAllocationStateDeficient, f.lastAllocationState) + require.Equal(t, bitrates[1][3], f.lastAllocationRequestBps) + require.Equal(t, InvalidLayers, f.CurrentLayers()) + expectedTargetLayers = VideoLayers{ + spatial: 1, + temporal: 3, + } + require.Equal(t, expectedTargetLayers, f.TargetLayers()) + + // give it a bitrate that cannot fit any layer + expectedResult = VideoAllocationResult{ + change: VideoStreamingChangePausing, + state: VideoAllocationStateDeficient, + bandwidthRequested: 0, + bandwidthDelta: 0 - bitrates[1][3], + layersChanged: true, + } + result = f.Allocate(bitrates[0][0]-1, bitrates) + require.Equal(t, expectedResult, result) + require.Equal(t, VideoAllocationStateDeficient, f.lastAllocationState) + require.Equal(t, int64(0), f.lastAllocationRequestBps) + require.Equal(t, InvalidLayers, f.CurrentLayers()) + require.Equal(t, InvalidLayers, f.TargetLayers()) +} + +func TestForwarderTryAllocate(t *testing.T) { + f := NewForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo) + + // adjust target layers per given additional channel capacity (which can be negative), + bitrates := [DefaultMaxSpatialLayer + 1][DefaultMaxTemporalLayer + 1]int64{ + {2, 3, 0, 0}, + {4, 0, 0, 5}, + {0, 7, 0, 0}, + } + + f.lastAllocationState = VideoAllocationStateDeficient + f.lastAllocationRequestBps = bitrates[1][3] + f.targetLayers = VideoLayers{ + spatial: 1, + temporal: 3, + } + + expectedResult := VideoAllocationResult{ + change: VideoStreamingChangeNone, + state: VideoAllocationStateDeficient, + bandwidthRequested: bitrates[0][1], + bandwidthDelta: bitrates[0][1] - bitrates[1][3], + layersChanged: true, + } + result := f.TryAllocate(-1, bitrates) + require.Equal(t, expectedResult, result) + require.Equal(t, VideoAllocationStateDeficient, f.lastAllocationState) + require.Equal(t, bitrates[0][1], f.lastAllocationRequestBps) + require.Equal(t, InvalidLayers, f.CurrentLayers()) + expectedTargetLayers := VideoLayers{ + spatial: 0, + temporal: 1, + } + require.Equal(t, expectedTargetLayers, f.TargetLayers()) + + // but should not pause even if no layer fits, i. e. preserve current + expectedResult = VideoAllocationResult{ + change: VideoStreamingChangeNone, + state: VideoAllocationStateDeficient, + bandwidthRequested: bitrates[0][1], + bandwidthDelta: 0, + } + result = f.TryAllocate(-2, bitrates) + require.Equal(t, expectedResult, result) + require.Equal(t, VideoAllocationStateDeficient, f.lastAllocationState) + require.Equal(t, bitrates[0][1], f.lastAllocationRequestBps) + require.Equal(t, InvalidLayers, f.CurrentLayers()) + require.Equal(t, expectedTargetLayers, f.TargetLayers()) + + // can catch up to optimal given enough additional channel capacity + expectedResult = VideoAllocationResult{ + change: VideoStreamingChangeNone, + state: VideoAllocationStateOptimal, + bandwidthRequested: bitrates[2][1], + bandwidthDelta: bitrates[2][1] - bitrates[0][1], + layersChanged: true, + } + result = f.TryAllocate(10, bitrates) + require.Equal(t, expectedResult, result) + require.Equal(t, VideoAllocationStateOptimal, f.lastAllocationState) + require.Equal(t, bitrates[2][1], f.lastAllocationRequestBps) + require.Equal(t, InvalidLayers, f.CurrentLayers()) + expectedTargetLayers = VideoLayers{ + spatial: 2, + temporal: 1, + } + require.Equal(t, expectedTargetLayers, f.TargetLayers()) +} + +func TestForwarderFinalizeAllocate(t *testing.T) { + f := NewForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo) + + bitrates := [DefaultMaxSpatialLayer + 1][DefaultMaxTemporalLayer + 1]int64{ + {1, 2, 3, 4}, + {5, 6, 7, 8}, + {9, 10, 11, 12}, + } + // FinalizeAllocate should do nothing unless Forwarder allocation state is VideoAllocationStateAwaitingMeasurement + f.FinalizeAllocate(bitrates) + require.Equal(t, VideoAllocationStateNone, f.lastAllocationState) + + f.lastAllocationState = VideoAllocationStateMuted + f.disable() + f.FinalizeAllocate(bitrates) + require.Equal(t, VideoAllocationStateMuted, f.lastAllocationState) + + f.lastAllocationState = VideoAllocationStateAwaitingMeasurement + f.disable() + f.FinalizeAllocate(bitrates) + require.Equal(t, VideoAllocationStateOptimal, f.lastAllocationState) + require.Equal(t, InvalidLayers, f.CurrentLayers()) + expectedTargetLayers := VideoLayers{ + spatial: 2, + temporal: 3, + } + require.Equal(t, expectedTargetLayers, f.TargetLayers()) + + // no layers available => feed dry + f.lastAllocationState = VideoAllocationStateAwaitingMeasurement + f.disable() + f.FinalizeAllocate([DefaultMaxSpatialLayer + 1][DefaultMaxTemporalLayer + 1]int64{}) + require.Equal(t, VideoAllocationStateFeedDry, f.lastAllocationState) + + // layers available, but still awaiting measurement + f.lastAllocationState = VideoAllocationStateAwaitingMeasurement + f.disable() + f.UptrackLayersChange([]uint16{0, 1}) + f.FinalizeAllocate([DefaultMaxSpatialLayer + 1][DefaultMaxTemporalLayer + 1]int64{}) + require.Equal(t, VideoAllocationStateAwaitingMeasurement, f.lastAllocationState) + + // sparse layers + bitrates = [DefaultMaxSpatialLayer + 1][DefaultMaxTemporalLayer + 1]int64{ + {1, 2, 0, 0}, + {5, 0, 0, 6}, + {0, 0, 0, 0}, + } + f.lastAllocationState = VideoAllocationStateAwaitingMeasurement + f.disable() + f.FinalizeAllocate(bitrates) + require.Equal(t, VideoAllocationStateOptimal, f.lastAllocationState) + require.Equal(t, InvalidLayers, f.CurrentLayers()) + expectedTargetLayers = VideoLayers{ + spatial: 1, + temporal: 3, + } + require.Equal(t, expectedTargetLayers, f.TargetLayers()) +} + +func TestForwarderAllocateNextHigher(t *testing.T) { + f := NewForwarder(testutils.TestOpusCodec, webrtc.RTPCodecTypeAudio) + + emptyBitrates := [DefaultMaxSpatialLayer + 1][DefaultMaxTemporalLayer + 1]int64{} + bitrates := [DefaultMaxSpatialLayer + 1][DefaultMaxTemporalLayer + 1]int64{ + {2, 3, 0, 0}, + {4, 0, 0, 5}, + {0, 7, 0, 0}, + } + + result := f.AllocateNextHigher(bitrates) + require.Equal(t, VideoAllocationResult{}, result) // no layer for audio + + f = NewForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo) + + // when not in deficient state, does not boost + f.lastAllocationState = VideoAllocationStateNone + result = f.AllocateNextHigher(bitrates) + require.Equal(t, VideoAllocationResult{}, result) + + // if layers have not caught up, should not allocate next layer + f.lastAllocationState = VideoAllocationStateDeficient + f.targetLayers.spatial = 0 + result = f.AllocateNextHigher(bitrates) + require.Equal(t, VideoAllocationResult{}, result) + f.currentLayers.spatial = 0 + + f.targetLayers.temporal = 0 + result = f.AllocateNextHigher(bitrates) + require.Equal(t, VideoAllocationResult{}, result) + f.currentLayers.temporal = 0 + + f.lastAllocationRequestBps = bitrates[0][0] + + // empty bitrates cannot increase layer + result = f.AllocateNextHigher(emptyBitrates) + require.Equal(t, VideoAllocationResult{}, result) + + // move from (0, 0) -> (0, 1), i. e. a higher temporal layer is available in the same spatial layer + expectedResult := VideoAllocationResult{ + change: VideoStreamingChangeNone, + state: VideoAllocationStateDeficient, + bandwidthRequested: 3, + bandwidthDelta: 1, + layersChanged: true, + } + result = f.AllocateNextHigher(bitrates) + require.Equal(t, expectedResult, result) + require.Equal(t, VideoAllocationStateDeficient, f.lastAllocationState) + require.Equal(t, bitrates[0][1], f.lastAllocationRequestBps) + expectedTargetLayers := VideoLayers{ + spatial: 0, + temporal: 1, + } + require.Equal(t, expectedTargetLayers, f.TargetLayers()) + + // move from (0, 1) -> (1, 0), i. e. a higher spatial layer is available + f.currentLayers.temporal = 1 + expectedResult = VideoAllocationResult{ + change: VideoStreamingChangeNone, + state: VideoAllocationStateDeficient, + bandwidthRequested: 4, + bandwidthDelta: 1, + layersChanged: true, + } + result = f.AllocateNextHigher(bitrates) + require.Equal(t, expectedResult, result) + require.Equal(t, VideoAllocationStateDeficient, f.lastAllocationState) + require.Equal(t, bitrates[1][0], f.lastAllocationRequestBps) + expectedTargetLayers = VideoLayers{ + spatial: 1, + temporal: 0, + } + require.Equal(t, expectedTargetLayers, f.TargetLayers()) + + // next higher, move from (1, 0) -> (1, 3), still deficient though + f.currentLayers.spatial = 1 + f.currentLayers.temporal = 0 + expectedResult = VideoAllocationResult{ + change: VideoStreamingChangeNone, + state: VideoAllocationStateDeficient, + bandwidthRequested: 5, + bandwidthDelta: 1, + layersChanged: true, + } + result = f.AllocateNextHigher(bitrates) + require.Equal(t, expectedResult, result) + require.Equal(t, VideoAllocationStateDeficient, f.lastAllocationState) + require.Equal(t, bitrates[1][3], f.lastAllocationRequestBps) + expectedTargetLayers = VideoLayers{ + spatial: 1, + temporal: 3, + } + require.Equal(t, expectedTargetLayers, f.TargetLayers()) + + // next higher, move from (1, 3) -> (2, 1), optimal allocation + f.currentLayers.temporal = 3 + expectedResult = VideoAllocationResult{ + change: VideoStreamingChangeNone, + state: VideoAllocationStateOptimal, + bandwidthRequested: 7, + bandwidthDelta: 2, + layersChanged: true, + } + result = f.AllocateNextHigher(bitrates) + require.Equal(t, expectedResult, result) + require.Equal(t, VideoAllocationStateOptimal, f.lastAllocationState) + require.Equal(t, bitrates[2][1], f.lastAllocationRequestBps) + expectedTargetLayers = VideoLayers{ + spatial: 2, + temporal: 1, + } + require.Equal(t, expectedTargetLayers, f.TargetLayers()) + + // ask again, should return false as there is no where to go higher + f.currentLayers.spatial = 2 + f.currentLayers.temporal = 1 + result = f.AllocateNextHigher(bitrates) + require.Equal(t, VideoAllocationResult{}, result) + require.Equal(t, VideoAllocationStateOptimal, f.lastAllocationState) + require.Equal(t, bitrates[2][1], f.lastAllocationRequestBps) + require.Equal(t, expectedTargetLayers, f.TargetLayers()) + + // turn off everything, allocating next layer should result + f.disable() + f.lastAllocationState = VideoAllocationStateDeficient + f.lastAllocationRequestBps = 0 + + expectedResult = VideoAllocationResult{ + change: VideoStreamingChangeResuming, + state: VideoAllocationStateDeficient, + bandwidthRequested: 2, + bandwidthDelta: 2, + layersChanged: true, + } + result = f.AllocateNextHigher(bitrates) + require.Equal(t, expectedResult, result) + require.Equal(t, VideoAllocationStateDeficient, f.lastAllocationState) + require.Equal(t, bitrates[0][0], f.lastAllocationRequestBps) + expectedTargetLayers = VideoLayers{ + spatial: 0, + temporal: 0, + } + require.Equal(t, expectedTargetLayers, f.TargetLayers()) +} + +func TestForwarderGetTranslationParamsMuted(t *testing.T) { + f := NewForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo) + f.Mute(true) + + params := &testutils.TestExtPacketParams{ + SequenceNumber: 23333, + Timestamp: 0xabcdef, + SSRC: 0x12345678, + } + extPkt, err := testutils.GetTestExtPacket(params) + require.NoError(t, err) + require.NotNil(t, extPkt) + + expectedTP := TranslationParams{ + shouldDrop: true, + } + actualTP, err := f.GetTranslationParams(extPkt, 0) + require.NoError(t, err) + require.Equal(t, expectedTP, *actualTP) +} + +func TestForwarderGetTranslationParamsAudio(t *testing.T) { + f := NewForwarder(testutils.TestOpusCodec, webrtc.RTPCodecTypeAudio) + + params := &testutils.TestExtPacketParams{ + IsHead: true, + SequenceNumber: 23333, + Timestamp: 0xabcdef, + SSRC: 0x12345678, + PayloadSize: 20, + } + extPkt, err := testutils.GetTestExtPacket(params) + + // should lock onto the first packet + expectedTP := TranslationParams{ + rtp: &TranslationParamsRTP{ + snOrdering: SequenceNumberOrderingContiguous, + sequenceNumber: 23333, + timestamp: 0xabcdef, + }, + } + actualTP, err := f.GetTranslationParams(extPkt, 0) + require.NoError(t, err) + require.True(t, reflect.DeepEqual(expectedTP, *actualTP)) + require.True(t, f.started) + require.Equal(t, f.lastSSRC, params.SSRC) + + // send a duplicate, should be dropped + expectedTP = TranslationParams{ + shouldDrop: true, + } + actualTP, err = f.GetTranslationParams(extPkt, 0) + require.NoError(t, err) + require.True(t, reflect.DeepEqual(expectedTP, *actualTP)) + + // out-of-order packet not in cache should be dropped + params = &testutils.TestExtPacketParams{ + SequenceNumber: 23332, + Timestamp: 0xabcdef, + SSRC: 0x12345678, + PayloadSize: 20, + } + extPkt, err = testutils.GetTestExtPacket(params) + + expectedTP = TranslationParams{ + shouldDrop: true, + } + actualTP, err = f.GetTranslationParams(extPkt, 0) + require.NoError(t, err) + require.True(t, reflect.DeepEqual(expectedTP, *actualTP)) + + // padding only packet in order should be dropped + params = &testutils.TestExtPacketParams{ + IsHead: true, + SequenceNumber: 23334, + Timestamp: 0xabcdef, + SSRC: 0x12345678, + } + extPkt, err = testutils.GetTestExtPacket(params) + + expectedTP = TranslationParams{ + shouldDrop: true, + } + actualTP, err = f.GetTranslationParams(extPkt, 0) + require.NoError(t, err) + require.True(t, reflect.DeepEqual(expectedTP, *actualTP)) + + // in order packet should be forwarded + params = &testutils.TestExtPacketParams{ + IsHead: true, + SequenceNumber: 23335, + Timestamp: 0xabcdef, + SSRC: 0x12345678, + PayloadSize: 20, + } + extPkt, err = testutils.GetTestExtPacket(params) + + expectedTP = TranslationParams{ + rtp: &TranslationParamsRTP{ + snOrdering: SequenceNumberOrderingContiguous, + sequenceNumber: 23334, + timestamp: 0xabcdef, + }, + } + actualTP, err = f.GetTranslationParams(extPkt, 0) + require.NoError(t, err) + require.True(t, reflect.DeepEqual(expectedTP, *actualTP)) + + // switching source should lock onto the new source, but sequence number should be contiguous + params = &testutils.TestExtPacketParams{ + IsHead: true, + SequenceNumber: 123, + Timestamp: 0xfedcba, + SSRC: 0x87654321, + PayloadSize: 20, + } + extPkt, err = testutils.GetTestExtPacket(params) + + expectedTP = TranslationParams{ + rtp: &TranslationParamsRTP{ + snOrdering: SequenceNumberOrderingContiguous, + sequenceNumber: 23335, + timestamp: 0xabcdf0, + }, + } + actualTP, err = f.GetTranslationParams(extPkt, 0) + require.NoError(t, err) + require.True(t, reflect.DeepEqual(expectedTP, *actualTP)) + require.Equal(t, f.lastSSRC, params.SSRC) +} + +func TestForwarderGetTranslationParamsVideo(t *testing.T) { + f := NewForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo) + + params := &testutils.TestExtPacketParams{ + IsHead: true, + SequenceNumber: 23333, + Timestamp: 0xabcdef, + SSRC: 0x12345678, + PayloadSize: 20, + } + vp8 := &buffer.VP8{ + FirstByte: 25, + PictureIDPresent: 1, + PictureID: 13467, + MBit: true, + TL0PICIDXPresent: 1, + TL0PICIDX: 233, + TIDPresent: 1, + TID: 1, + Y: 1, + KEYIDXPresent: 1, + KEYIDX: 23, + HeaderSize: 6, + IsKeyFrame: false, + } + extPkt, _ := testutils.GetTestExtPacketVP8(params, vp8) + + // no target layers, should drop + expectedTP := TranslationParams{ + shouldDrop: true, + } + actualTP, err := f.GetTranslationParams(extPkt, 0) + require.NoError(t, err) + require.Equal(t, expectedTP, *actualTP) + + // although target layer matches, not a key frame, so should drop and ask to send PLI + f.targetLayers = VideoLayers{ + spatial: 0, + temporal: 1, + } + expectedTP = TranslationParams{ + shouldDrop: true, + shouldSendPLI: true, + } + actualTP, err = f.GetTranslationParams(extPkt, 0) + require.NoError(t, err) + require.Equal(t, expectedTP, *actualTP) + + // should lock onto packet (target layer and key frame) + vp8 = &buffer.VP8{ + FirstByte: 25, + PictureIDPresent: 1, + PictureID: 13467, + MBit: true, + TL0PICIDXPresent: 1, + TL0PICIDX: 233, + TIDPresent: 1, + TID: 1, + Y: 1, + KEYIDXPresent: 1, + KEYIDX: 23, + HeaderSize: 6, + IsKeyFrame: true, + } + extPkt, _ = testutils.GetTestExtPacketVP8(params, vp8) + expectedTP = TranslationParams{ + rtp: &TranslationParamsRTP{ + snOrdering: SequenceNumberOrderingContiguous, + sequenceNumber: 23333, + timestamp: 0xabcdef, + }, + vp8: &TranslationParamsVP8{ + header: &buffer.VP8{ + FirstByte: 25, + PictureIDPresent: 1, + PictureID: 13467, + MBit: true, + TL0PICIDXPresent: 1, + TL0PICIDX: 233, + TIDPresent: 1, + TID: 1, + Y: 1, + KEYIDXPresent: 1, + KEYIDX: 23, + HeaderSize: 6, + IsKeyFrame: true, + }, + }, + } + actualTP, err = f.GetTranslationParams(extPkt, 0) + require.NoError(t, err) + require.True(t, reflect.DeepEqual(expectedTP, *actualTP)) + require.True(t, f.started) + require.Equal(t, f.lastSSRC, params.SSRC) + + // send a duplicate, should be dropped + expectedTP = TranslationParams{ + shouldDrop: true, + } + actualTP, err = f.GetTranslationParams(extPkt, 0) + require.NoError(t, err) + require.True(t, reflect.DeepEqual(expectedTP, *actualTP)) + + // out-of-order packet not in cache should be dropped + params = &testutils.TestExtPacketParams{ + SequenceNumber: 23332, + Timestamp: 0xabcdef, + SSRC: 0x12345678, + PayloadSize: 20, + } + extPkt, _ = testutils.GetTestExtPacketVP8(params, vp8) + expectedTP = TranslationParams{ + shouldDrop: true, + } + actualTP, err = f.GetTranslationParams(extPkt, 0) + require.NoError(t, err) + require.True(t, reflect.DeepEqual(expectedTP, *actualTP)) + + // padding only packet in order should be dropped + params = &testutils.TestExtPacketParams{ + IsHead: true, + SequenceNumber: 23334, + Timestamp: 0xabcdef, + SSRC: 0x12345678, + } + extPkt, _ = testutils.GetTestExtPacketVP8(params, vp8) + expectedTP = TranslationParams{ + shouldDrop: true, + } + actualTP, err = f.GetTranslationParams(extPkt, 0) + require.NoError(t, err) + require.True(t, reflect.DeepEqual(expectedTP, *actualTP)) + + // in order packet should be forwarded + params = &testutils.TestExtPacketParams{ + IsHead: true, + SequenceNumber: 23335, + Timestamp: 0xabcdef, + SSRC: 0x12345678, + PayloadSize: 20, + } + extPkt, _ = testutils.GetTestExtPacketVP8(params, vp8) + expectedTP = TranslationParams{ + rtp: &TranslationParamsRTP{ + snOrdering: SequenceNumberOrderingContiguous, + sequenceNumber: 23334, + timestamp: 0xabcdef, + }, + vp8: &TranslationParamsVP8{ + header: &buffer.VP8{ + FirstByte: 25, + PictureIDPresent: 1, + PictureID: 13467, + MBit: true, + TL0PICIDXPresent: 1, + TL0PICIDX: 233, + TIDPresent: 1, + TID: 1, + Y: 1, + KEYIDXPresent: 1, + KEYIDX: 23, + HeaderSize: 6, + IsKeyFrame: true, + }, + }, + } + actualTP, err = f.GetTranslationParams(extPkt, 0) + require.NoError(t, err) + require.True(t, reflect.DeepEqual(expectedTP, *actualTP)) + + // temporal layer higher than target, should be dropped + params = &testutils.TestExtPacketParams{ + IsHead: true, + SequenceNumber: 23336, + Timestamp: 0xabcdef, + SSRC: 0x12345678, + PayloadSize: 20, + } + vp8 = &buffer.VP8{ + FirstByte: 25, + PictureIDPresent: 1, + PictureID: 13468, + MBit: true, + TL0PICIDXPresent: 1, + TL0PICIDX: 233, + TIDPresent: 1, + TID: 2, + Y: 1, + KEYIDXPresent: 1, + KEYIDX: 23, + HeaderSize: 6, + IsKeyFrame: true, + } + extPkt, _ = testutils.GetTestExtPacketVP8(params, vp8) + expectedTP = TranslationParams{ + shouldDrop: true, + } + actualTP, err = f.GetTranslationParams(extPkt, 0) + require.NoError(t, err) + require.True(t, reflect.DeepEqual(expectedTP, *actualTP)) + + // RTP sequence number and VP8 picture id should be contiguous after dropping higher temporal layer picture + params = &testutils.TestExtPacketParams{ + IsHead: true, + SequenceNumber: 23337, + Timestamp: 0xabcdef, + SSRC: 0x12345678, + PayloadSize: 20, + } + vp8 = &buffer.VP8{ + FirstByte: 25, + PictureIDPresent: 1, + PictureID: 13469, + MBit: true, + TL0PICIDXPresent: 1, + TL0PICIDX: 234, + TIDPresent: 1, + TID: 0, + Y: 1, + KEYIDXPresent: 1, + KEYIDX: 23, + HeaderSize: 6, + IsKeyFrame: false, + } + extPkt, _ = testutils.GetTestExtPacketVP8(params, vp8) + expectedTP = TranslationParams{ + rtp: &TranslationParamsRTP{ + snOrdering: SequenceNumberOrderingContiguous, + sequenceNumber: 23335, + timestamp: 0xabcdef, + }, + vp8: &TranslationParamsVP8{ + header: &buffer.VP8{ + FirstByte: 25, + PictureIDPresent: 1, + PictureID: 13468, + MBit: true, + TL0PICIDXPresent: 1, + TL0PICIDX: 234, + TIDPresent: 1, + TID: 0, + Y: 1, + KEYIDXPresent: 1, + KEYIDX: 23, + HeaderSize: 6, + IsKeyFrame: false, + }, + }, + } + actualTP, err = f.GetTranslationParams(extPkt, 0) + require.NoError(t, err) + require.True(t, reflect.DeepEqual(expectedTP, *actualTP)) + + // switching SSRC (happens for new layer or new track source) + // should lock onto the new source, but sequence number should be contiguous + f.targetLayers = VideoLayers{ + spatial: 1, + temporal: 1, + } + + params = &testutils.TestExtPacketParams{ + IsHead: true, + SequenceNumber: 123, + Timestamp: 0xfedcba, + SSRC: 0x87654321, + PayloadSize: 20, + } + vp8 = &buffer.VP8{ + FirstByte: 25, + PictureIDPresent: 1, + PictureID: 45, + MBit: false, + TL0PICIDXPresent: 1, + TL0PICIDX: 12, + TIDPresent: 1, + TID: 0, + Y: 1, + KEYIDXPresent: 1, + KEYIDX: 30, + HeaderSize: 5, + IsKeyFrame: true, + } + extPkt, _ = testutils.GetTestExtPacketVP8(params, vp8) + + expectedTP = TranslationParams{ + rtp: &TranslationParamsRTP{ + snOrdering: SequenceNumberOrderingContiguous, + sequenceNumber: 23336, + timestamp: 0xabcdf0, + }, + vp8: &TranslationParamsVP8{ + header: &buffer.VP8{ + FirstByte: 25, + PictureIDPresent: 1, + PictureID: 13469, + MBit: true, + TL0PICIDXPresent: 1, + TL0PICIDX: 235, + TIDPresent: 1, + TID: 0, + Y: 1, + KEYIDXPresent: 1, + KEYIDX: 24, + HeaderSize: 6, + IsKeyFrame: true, + }, + }, + } + actualTP, err = f.GetTranslationParams(extPkt, 1) + require.NoError(t, err) + require.True(t, reflect.DeepEqual(expectedTP, *actualTP)) + require.Equal(t, f.lastSSRC, params.SSRC) +} + +func TestForwardGetSnTsForPadding(t *testing.T) { + f := NewForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo) + + params := &testutils.TestExtPacketParams{ + IsHead: true, + SequenceNumber: 23333, + Timestamp: 0xabcdef, + SSRC: 0x12345678, + PayloadSize: 20, + } + vp8 := &buffer.VP8{ + FirstByte: 25, + PictureIDPresent: 1, + PictureID: 13467, + MBit: true, + TL0PICIDXPresent: 1, + TL0PICIDX: 233, + TIDPresent: 1, + TID: 13, + Y: 1, + KEYIDXPresent: 1, + KEYIDX: 23, + HeaderSize: 6, + IsKeyFrame: true, + } + extPkt, _ := testutils.GetTestExtPacketVP8(params, vp8) + + f.targetLayers = VideoLayers{ + spatial: 0, + temporal: 1, + } + f.currentLayers = InvalidLayers + + // send it through so that forwarder locks onto stream + f.GetTranslationParams(extPkt, 0) + + // pause stream and get padding, it should still work + f.disable() + + // should get back frame end needed as the last packet did not have RTP marker set + snts, err := f.GetSnTsForPadding(5) + require.NoError(t, err) + + numPadding := 5 + clockRate := uint32(0) + frameRate := uint32(5) + var sntsExpected []SnTs = make([]SnTs, numPadding) + for i := 0; i < numPadding; i++ { + sntsExpected[i] = SnTs{ + sequenceNumber: 23333 + uint16(i) + 1, + timestamp: 0xabcdef + (uint32(i)*clockRate)/frameRate, + } + } + require.Equal(t, sntsExpected, snts) + + // now that there is a marker, timestamp should jump on first padding when asked again + snts, err = f.GetSnTsForPadding(numPadding) + require.NoError(t, err) + + for i := 0; i < numPadding; i++ { + sntsExpected[i] = SnTs{ + sequenceNumber: 23338 + uint16(i) + 1, + timestamp: 0xabcdef + (uint32(i+1)*clockRate)/frameRate, + } + } + require.Equal(t, sntsExpected, snts) +} + +func TestForwardGetSnTsForBlankFrames(t *testing.T) { + f := NewForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo) + + params := &testutils.TestExtPacketParams{ + IsHead: true, + SequenceNumber: 23333, + Timestamp: 0xabcdef, + SSRC: 0x12345678, + PayloadSize: 20, + } + vp8 := &buffer.VP8{ + FirstByte: 25, + PictureIDPresent: 1, + PictureID: 13467, + MBit: true, + TL0PICIDXPresent: 1, + TL0PICIDX: 233, + TIDPresent: 1, + TID: 13, + Y: 1, + KEYIDXPresent: 1, + KEYIDX: 23, + HeaderSize: 6, + IsKeyFrame: true, + } + extPkt, _ := testutils.GetTestExtPacketVP8(params, vp8) + + f.targetLayers = VideoLayers{ + spatial: 0, + temporal: 1, + } + f.currentLayers = InvalidLayers + + // send it through so that forwarder locks onto stream + f.GetTranslationParams(extPkt, 0) + + // should get back frame end needed as the last packet did not have RTP marker set + snts, frameEndNeeded, err := f.GetSnTsForBlankFrames() + require.NoError(t, err) + require.True(t, frameEndNeeded) + + // there should be one more than RTPBlankFramesMax as one would have been allocated to end previous frame + numPadding := RTPBlankFramesMax + 1 + clockRate := testutils.TestVP8Codec.ClockRate + frameRate := uint32(30) + var sntsExpected []SnTs = make([]SnTs, numPadding) + for i := 0; i < numPadding; i++ { + sntsExpected[i] = SnTs{ + sequenceNumber: 23333 + uint16(i) + 1, + timestamp: 0xabcdef + (uint32(i)*clockRate)/frameRate, + } + } + require.Equal(t, sntsExpected, snts) + + // now that there is a marker, timestamp should jump on first padding when asked again + // also number of padding should be RTPBlnkFramesMax + snts, frameEndNeeded, err = f.GetSnTsForBlankFrames() + require.NoError(t, err) + require.False(t, frameEndNeeded) + + numPadding = RTPBlankFramesMax + sntsExpected = sntsExpected[:numPadding] + for i := 0; i < numPadding; i++ { + sntsExpected[i] = SnTs{ + sequenceNumber: 23340 + uint16(i) + 1, + timestamp: 0xabcdef + (uint32(i+1)*clockRate)/frameRate, + } + } + require.Equal(t, sntsExpected, snts) +} + +func TestForwardGetPaddingVP8(t *testing.T) { + f := NewForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo) + + params := &testutils.TestExtPacketParams{ + IsHead: true, + SequenceNumber: 23333, + Timestamp: 0xabcdef, + SSRC: 0x12345678, + PayloadSize: 20, + } + vp8 := &buffer.VP8{ + FirstByte: 25, + PictureIDPresent: 1, + PictureID: 13467, + MBit: true, + TL0PICIDXPresent: 1, + TL0PICIDX: 233, + TIDPresent: 1, + TID: 13, + Y: 1, + KEYIDXPresent: 1, + KEYIDX: 23, + HeaderSize: 6, + IsKeyFrame: true, + } + extPkt, _ := testutils.GetTestExtPacketVP8(params, vp8) + + f.targetLayers = VideoLayers{ + spatial: 0, + temporal: 1, + } + f.currentLayers = InvalidLayers + + // send it through so that forwarder locks onto stream + f.GetTranslationParams(extPkt, 0) + + // getting padding with frame end needed, should repeat the last picture id + expectedVP8 := buffer.VP8{ + FirstByte: 16, + PictureIDPresent: 1, + PictureID: 13467, + MBit: true, + TL0PICIDXPresent: 1, + TL0PICIDX: 233, + TIDPresent: 1, + TID: 0, + Y: 1, + KEYIDXPresent: 1, + KEYIDX: 23, + HeaderSize: 6, + IsKeyFrame: true, + } + blankVP8 := f.GetPaddingVP8(true) + require.True(t, reflect.DeepEqual(expectedVP8, *blankVP8)) + + // getting padding with no frame end needed, should get next picture id + expectedVP8 = buffer.VP8{ + FirstByte: 16, + PictureIDPresent: 1, + PictureID: 13468, + MBit: true, + TL0PICIDXPresent: 1, + TL0PICIDX: 234, + TIDPresent: 1, + TID: 0, + Y: 1, + KEYIDXPresent: 1, + KEYIDX: 24, + HeaderSize: 6, + IsKeyFrame: true, + } + blankVP8 = f.GetPaddingVP8(false) + require.True(t, reflect.DeepEqual(expectedVP8, *blankVP8)) +} diff --git a/pkg/sfu/rtpmunger.go b/pkg/sfu/rtpmunger.go index 82d839d85..b09f44fd4 100644 --- a/pkg/sfu/rtpmunger.go +++ b/pkg/sfu/rtpmunger.go @@ -7,6 +7,26 @@ import ( // // RTPMunger // +type SequenceNumberOrdering int + +const ( + SequenceNumberOrderingContiguous SequenceNumberOrdering = iota + SequenceNumberOrderingOutOfOrder + SequenceNumberOrderingGap + SequenceNumberOrderingDuplicate +) + +type TranslationParamsRTP struct { + snOrdering SequenceNumberOrdering + sequenceNumber uint16 + timestamp uint32 +} + +type SnTs struct { + sequenceNumber uint16 + timestamp uint32 +} + type RTPMungerParams struct { highestIncomingSN uint16 lastSN uint16 diff --git a/pkg/sfu/streamallocator.go b/pkg/sfu/streamallocator.go index 76ba8fdca..bd95d92dd 100644 --- a/pkg/sfu/streamallocator.go +++ b/pkg/sfu/streamallocator.go @@ -939,8 +939,14 @@ func (s *StreamAllocator) maybeBoostLayer() { continue } - if track.AllocateNextHigher() { + result := track.AllocateNextHigher() + if result.layersChanged { s.lastBoostTime = time.Now() + + update := NewStreamStateUpdate() + update.HandleStreamingChange(result.change, track) + s.maybeSendUpdate(update) + break } } @@ -1143,7 +1149,7 @@ func (t *Track) FinalizeAllocate() { t.downTrack.FinalizeAllocate() } -func (t *Track) AllocateNextHigher() bool { +func (t *Track) AllocateNextHigher() VideoAllocationResult { return t.downTrack.AllocateNextHigher() } diff --git a/pkg/sfu/testutils/data.go b/pkg/sfu/testutils/data.go index eddb7fc91..09ec3e8b7 100644 --- a/pkg/sfu/testutils/data.go +++ b/pkg/sfu/testutils/data.go @@ -4,6 +4,7 @@ import ( "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/pion/rtp" + "github.com/pion/webrtc/v3" ) //----------------------------------------------------------- @@ -63,8 +64,21 @@ func GetTestExtPacketVP8(params *TestExtPacketParams, vp8 *buffer.VP8) (*buffer. return nil, err } + ep.KeyFrame = vp8.IsKeyFrame ep.Payload = *vp8 return ep, nil } //-------------------------------------- + +var TestVP8Codec = webrtc.RTPCodecCapability{ + MimeType: "video/vp8", + ClockRate: 90000, +} + +var TestOpusCodec = webrtc.RTPCodecCapability{ + MimeType: "audio/opus", + ClockRate: 48000, +} + +//-------------------------------------- diff --git a/pkg/sfu/vp8munger.go b/pkg/sfu/vp8munger.go index 278953828..80f9210cc 100644 --- a/pkg/sfu/vp8munger.go +++ b/pkg/sfu/vp8munger.go @@ -9,6 +9,10 @@ import ( // // VP8 munger // +type TranslationParamsVP8 struct { + header *buffer.VP8 +} + type VP8MungerParams struct { pictureIdWrapHandler VP8PictureIdWrapHandler extLastPictureId int32