Split Forwarder into its own file to make sfu.DownTrack smaller. (#217)

This commit is contained in:
Raja Subramanian
2021-12-01 01:26:55 +05:30
committed by GitHub
parent 5e7f93c954
commit 57ee033d67
4 changed files with 1281 additions and 1267 deletions

File diff suppressed because it is too large Load Diff

711
pkg/sfu/forwarder.go Normal file
View File

@@ -0,0 +1,711 @@
package sfu
import (
"strings"
"sync"
"github.com/pion/webrtc/v3"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
)
//
// Forwarder
//
type VideoStreamingChange int
const (
VideoStreamingChangeNone VideoStreamingChange = iota
VideoStreamingChangePausing
VideoStreamingChangeResuming
)
type VideoAllocationState int
const (
VideoAllocationStateNone VideoAllocationState = iota
VideoAllocationStateMuted
VideoAllocationStateFeedDry
VideoAllocationStateAwaitingMeasurement
VideoAllocationStateOptimal
VideoAllocationStateDeficient
)
type VideoAllocationResult struct {
change VideoStreamingChange
state VideoAllocationState
bandwidthRequested int64
bandwidthDelta int64
}
type Forwarder struct {
lock sync.RWMutex
codec webrtc.RTPCodecCapability
kind webrtc.RTPCodecType
muted bool
started bool
lastSSRC uint32
lTSCalc int64
maxSpatialLayer int32
currentSpatialLayer int32
targetSpatialLayer int32
maxTemporalLayer int32
currentTemporalLayer int32
targetTemporalLayer int32
lastAllocationState VideoAllocationState
lastAllocationRequestBps int64
availableLayers []uint16
rtpMunger *RTPMunger
vp8Munger *VP8Munger
}
func NewForwarder(codec webrtc.RTPCodecCapability, kind webrtc.RTPCodecType) *Forwarder {
f := &Forwarder{
codec: codec,
kind: kind,
// start off with nothing, let streamallocator set things
currentSpatialLayer: InvalidSpatialLayer,
targetSpatialLayer: InvalidSpatialLayer,
currentTemporalLayer: InvalidTemporalLayer,
targetTemporalLayer: InvalidTemporalLayer,
lastAllocationState: VideoAllocationStateNone,
rtpMunger: NewRTPMunger(),
}
if strings.ToLower(codec.MimeType) == "video/vp8" {
f.vp8Munger = NewVP8Munger()
}
if f.kind == webrtc.RTPCodecTypeVideo {
f.maxSpatialLayer = 2
f.maxTemporalLayer = 2
} else {
f.maxSpatialLayer = InvalidSpatialLayer
f.maxTemporalLayer = InvalidTemporalLayer
}
return f
}
func (f *Forwarder) Mute(val bool) bool {
f.lock.Lock()
defer f.lock.Unlock()
if f.muted == val {
return false
}
f.muted = val
return true
}
func (f *Forwarder) Muted() bool {
f.lock.RLock()
defer f.lock.RUnlock()
return f.muted
}
func (f *Forwarder) SetMaxSpatialLayer(spatialLayer int32) (bool, VideoLayers) {
f.lock.Lock()
defer f.lock.Unlock()
if spatialLayer == f.maxSpatialLayer {
return false, VideoLayers{}
}
f.maxSpatialLayer = spatialLayer
return true, VideoLayers{
spatial: f.maxSpatialLayer,
temporal: f.maxTemporalLayer,
}
}
func (f *Forwarder) CurrentSpatialLayer() int32 {
f.lock.RLock()
defer f.lock.RUnlock()
return f.currentSpatialLayer
}
func (f *Forwarder) TargetSpatialLayer() int32 {
f.lock.RLock()
defer f.lock.RUnlock()
return f.targetSpatialLayer
}
func (f *Forwarder) SetMaxTemporalLayer(temporalLayer int32) (bool, VideoLayers) {
f.lock.Lock()
defer f.lock.Unlock()
if temporalLayer == f.maxTemporalLayer {
return false, VideoLayers{}
}
f.maxTemporalLayer = temporalLayer
return true, VideoLayers{
spatial: f.maxSpatialLayer,
temporal: f.maxTemporalLayer,
}
}
func (f *Forwarder) MaxLayers() VideoLayers {
f.lock.RLock()
defer f.lock.RUnlock()
return VideoLayers{
spatial: f.maxSpatialLayer,
temporal: f.maxTemporalLayer,
}
}
func (f *Forwarder) GetForwardingStatus() ForwardingStatus {
f.lock.RLock()
defer f.lock.RUnlock()
if f.targetSpatialLayer == InvalidSpatialLayer {
return ForwardingStatusOff
}
if f.targetSpatialLayer < f.maxSpatialLayer {
return ForwardingStatusPartial
}
return ForwardingStatusOptimal
}
func (f *Forwarder) UptrackLayersChange(availableLayers []uint16) {
f.lock.Lock()
defer f.lock.Unlock()
f.availableLayers = availableLayers
}
func (f *Forwarder) disable() {
f.currentSpatialLayer = InvalidSpatialLayer
f.targetSpatialLayer = InvalidSpatialLayer
f.currentTemporalLayer = InvalidTemporalLayer
f.targetTemporalLayer = InvalidTemporalLayer
}
func (f *Forwarder) getOptimalBandwidthNeeded(brs [3][4]int64) int64 {
optimalBandwidthNeeded := int64(0)
for i := f.maxSpatialLayer; i >= 0; i-- {
for j := f.maxTemporalLayer; j >= 0; j-- {
if brs[i][j] == 0 {
continue
}
if optimalBandwidthNeeded == 0 {
optimalBandwidthNeeded = brs[i][j]
break
}
}
if optimalBandwidthNeeded != 0 {
break
}
}
return optimalBandwidthNeeded
}
func (f *Forwarder) allocate(availableChannelCapacity int64, canPause bool, brs [3][4]int64) (result VideoAllocationResult) {
// should never get called on audio tracks, just for safety
if f.kind == webrtc.RTPCodecTypeAudio {
return
}
if f.muted {
result.state = VideoAllocationStateMuted
result.bandwidthRequested = 0
result.bandwidthDelta = result.bandwidthRequested - f.lastAllocationRequestBps
f.lastAllocationState = result.state
f.lastAllocationRequestBps = result.bandwidthRequested
return
}
optimalBandwidthNeeded := f.getOptimalBandwidthNeeded(brs)
if optimalBandwidthNeeded == 0 {
if len(f.availableLayers) == 0 {
// feed is dry
result.state = VideoAllocationStateFeedDry
result.bandwidthRequested = 0
result.bandwidthDelta = result.bandwidthRequested - f.lastAllocationRequestBps
f.lastAllocationState = result.state
f.lastAllocationRequestBps = result.bandwidthRequested
return
}
// feed bitrate is not yet calculated
result.state = VideoAllocationStateAwaitingMeasurement
f.lastAllocationState = result.state
if availableChannelCapacity == ChannelCapacityInfinity {
// channel capacity allows a free pass.
// So, resume with the highest layer available <= max subscribed layer
// if already optimistically started, nothing else to do
if f.targetSpatialLayer != InvalidSpatialLayer {
return
}
f.targetSpatialLayer = int32(f.availableLayers[len(f.availableLayers)-1])
if f.targetSpatialLayer > f.maxSpatialLayer {
f.targetSpatialLayer = f.maxSpatialLayer
}
f.targetTemporalLayer = f.maxTemporalLayer
if f.targetTemporalLayer == InvalidTemporalLayer {
f.targetTemporalLayer = 0
}
result.change = VideoStreamingChangeResuming
} else {
// if not optimistically started, nothing else to do
if f.targetSpatialLayer == InvalidSpatialLayer {
return
}
if canPause {
// disable it as it is not known how big this stream is
// and if it will fit in the available channel capacity
result.change = VideoStreamingChangePausing
result.state = VideoAllocationStateDeficient
result.bandwidthRequested = 0
result.bandwidthDelta = result.bandwidthRequested - f.lastAllocationRequestBps
f.lastAllocationState = result.state
f.lastAllocationRequestBps = result.bandwidthRequested
f.disable()
}
}
return
}
// LK-TODO for temporal preference, traverse the bitrates array the other way
for i := f.maxSpatialLayer; i >= 0; i-- {
for j := f.maxTemporalLayer; j >= 0; j-- {
if brs[i][j] == 0 {
continue
}
if brs[i][j] < availableChannelCapacity {
if f.targetSpatialLayer == InvalidSpatialLayer {
result.change = VideoStreamingChangeResuming
}
result.bandwidthRequested = brs[i][j]
result.bandwidthDelta = result.bandwidthRequested - f.lastAllocationRequestBps
if result.bandwidthRequested == optimalBandwidthNeeded {
result.state = VideoAllocationStateOptimal
} else {
result.state = VideoAllocationStateDeficient
}
f.lastAllocationState = result.state
f.lastAllocationRequestBps = result.bandwidthRequested
f.targetSpatialLayer = int32(i)
f.targetTemporalLayer = int32(j)
return
}
}
}
if !canPause {
// do not pause if preserving
// although preserving, currently streamed layers could have a different bitrate,
// but not updating to prevent entropy increase.
result.state = f.lastAllocationState
return
}
// no layer fits in the available channel capacity, disable the track
if f.targetSpatialLayer != InvalidSpatialLayer {
result.change = VideoStreamingChangePausing
}
result.state = VideoAllocationStateDeficient
result.bandwidthRequested = 0
result.bandwidthDelta = result.bandwidthRequested - f.lastAllocationRequestBps
f.lastAllocationState = result.state
f.lastAllocationRequestBps = result.bandwidthRequested
f.disable()
return
}
func (f *Forwarder) Allocate(availableChannelCapacity int64, brs [3][4]int64) VideoAllocationResult {
f.lock.Lock()
defer f.lock.Unlock()
return f.allocate(availableChannelCapacity, true, brs)
}
func (f *Forwarder) TryAllocate(additionalChannelCapacity int64, brs [3][4]int64) VideoAllocationResult {
f.lock.Lock()
defer f.lock.Unlock()
return f.allocate(f.lastAllocationRequestBps+additionalChannelCapacity, false, brs)
}
func (f *Forwarder) FinalizeAllocate(brs [3][4]int64) {
f.lock.Lock()
defer f.lock.Unlock()
if f.lastAllocationState != VideoAllocationStateAwaitingMeasurement {
return
}
optimalBandwidthNeeded := f.getOptimalBandwidthNeeded(brs)
if optimalBandwidthNeeded == 0 {
if len(f.availableLayers) == 0 {
// feed dry
f.lastAllocationState = VideoAllocationStateFeedDry
f.lastAllocationRequestBps = 0
}
// still awaiting measurement
return
}
// LK-TODO for temporal preference, traverse the bitrates array the other way
for i := f.maxSpatialLayer; i >= 0; i-- {
for j := f.maxTemporalLayer; j >= 0; j-- {
if brs[i][j] == 0 {
continue
}
f.lastAllocationState = VideoAllocationStateOptimal
f.lastAllocationRequestBps = brs[i][j]
f.targetSpatialLayer = int32(i)
f.targetTemporalLayer = int32(j)
break
}
}
}
func (f *Forwarder) AllocateNextHigher(brs [3][4]int64) bool {
f.lock.Lock()
defer f.lock.Unlock()
if f.kind == webrtc.RTPCodecTypeAudio {
return false
}
// if targets are still pending, don't increase
if f.targetSpatialLayer != InvalidSpatialLayer {
if f.targetSpatialLayer != f.currentSpatialLayer || f.targetTemporalLayer != f.currentTemporalLayer {
return false
}
}
optimalBandwidthNeeded := f.getOptimalBandwidthNeeded(brs)
if optimalBandwidthNeeded == 0 {
if len(f.availableLayers) == 0 {
f.lastAllocationState = VideoAllocationStateFeedDry
f.lastAllocationRequestBps = 0
return false
}
// bitrates not available yet
f.lastAllocationState = VideoAllocationStateAwaitingMeasurement
f.lastAllocationRequestBps = 0
return false
}
// try moving temporal layer up in the current spatial layer
nextTemporalLayer := f.currentTemporalLayer + 1
currentSpatialLayer := f.currentSpatialLayer
if currentSpatialLayer == InvalidSpatialLayer {
currentSpatialLayer = 0
}
if nextTemporalLayer <= f.maxTemporalLayer && brs[currentSpatialLayer][nextTemporalLayer] > 0 {
f.targetSpatialLayer = currentSpatialLayer
f.targetTemporalLayer = nextTemporalLayer
f.lastAllocationRequestBps = brs[currentSpatialLayer][nextTemporalLayer]
if f.lastAllocationRequestBps < optimalBandwidthNeeded {
f.lastAllocationState = VideoAllocationStateDeficient
} else {
f.lastAllocationState = VideoAllocationStateOptimal
}
return true
}
// try moving spatial layer up if already at max temporal layer of current spatial layer
nextSpatialLayer := f.currentSpatialLayer + 1
if nextSpatialLayer <= f.maxSpatialLayer && brs[nextSpatialLayer][0] > 0 {
f.targetSpatialLayer = nextSpatialLayer
f.targetTemporalLayer = 0
f.lastAllocationRequestBps = brs[nextSpatialLayer][0]
if f.lastAllocationRequestBps < optimalBandwidthNeeded {
f.lastAllocationState = VideoAllocationStateDeficient
} else {
f.lastAllocationState = VideoAllocationStateOptimal
}
return true
}
return false
}
func (f *Forwarder) AllocationState() VideoAllocationState {
f.lock.RLock()
defer f.lock.RUnlock()
return f.lastAllocationState
}
func (f *Forwarder) AllocationBandwidth() int64 {
f.lock.RLock()
defer f.lock.RUnlock()
return f.lastAllocationRequestBps
}
func (f *Forwarder) GetTranslationParams(extPkt *buffer.ExtPacket, layer int32) (*TranslationParams, error) {
f.lock.Lock()
defer f.lock.Unlock()
if f.muted {
return &TranslationParams{
shouldDrop: true,
}, nil
}
switch f.kind {
case webrtc.RTPCodecTypeAudio:
return f.getTranslationParamsAudio(extPkt)
case webrtc.RTPCodecTypeVideo:
return f.getTranslationParamsVideo(extPkt, layer)
}
return nil, ErrUnknownKind
}
// should be called with lock held
func (f *Forwarder) getTranslationParamsAudio(extPkt *buffer.ExtPacket) (*TranslationParams, error) {
if f.lastSSRC != extPkt.Packet.SSRC {
if !f.started {
// start of stream
f.started = true
f.rtpMunger.SetLastSnTs(extPkt)
} else {
// LK-TODO-START
// TS offset of 1 is not accurate. It should ideally
// be driven by packetization of the incoming track.
// But, on a track switch, won't have any historic data
// of a new track though.
// LK-TODO-END
f.rtpMunger.UpdateSnTsOffsets(extPkt, 1, 1)
}
f.lastSSRC = extPkt.Packet.SSRC
}
tp := &TranslationParams{}
tpRTP, err := f.rtpMunger.UpdateAndGetSnTs(extPkt)
if err != nil {
tp.shouldDrop = true
if err == ErrPaddingOnlyPacket || err == ErrDuplicatePacket || err == ErrOutOfOrderSequenceNumberCacheMiss {
return tp, nil
}
return tp, err
}
tp.rtp = tpRTP
return tp, nil
}
// should be called with lock held
func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer int32) (*TranslationParams, error) {
tp := &TranslationParams{}
if f.targetSpatialLayer == InvalidSpatialLayer {
// stream is paused by streamallocator
tp.shouldDrop = true
return tp, nil
}
tp.shouldSendPLI = false
if f.targetSpatialLayer != f.currentSpatialLayer {
if f.targetSpatialLayer == layer {
if extPkt.KeyFrame {
// lock to target layer
f.currentSpatialLayer = f.targetSpatialLayer
} else {
tp.shouldSendPLI = true
}
}
}
if f.currentSpatialLayer != layer {
tp.shouldDrop = true
return tp, nil
}
if f.targetSpatialLayer < f.currentSpatialLayer && f.targetSpatialLayer < f.maxSpatialLayer {
//
// If target layer is lower than both the current and
// maximum subscribed layer, it is due to bandwidth
// constraints that the target layer has been switched down.
// Continuing to send higher layer will only exacerbate the
// situation by putting more stress on the channel. So, drop it.
//
// In the other direction, it is okay to keep forwarding till
// switch point to get a smoother stream till the higher
// layer key frame arrives.
//
// Note that in the case of client subscription layer restriction
// coinciding with server restriction due to bandwidth limitation,
// this will take client subscription as the winning vote and
// continue to stream current spatial layer till switch point.
// That could lead to congesting the channel.
// LK-TODO: Improve the above case, i. e. distinguish server
// applied restriction from client requested restriction.
//
tp.shouldDrop = true
return tp, nil
}
if f.lastSSRC != extPkt.Packet.SSRC {
if !f.started {
f.started = true
f.rtpMunger.SetLastSnTs(extPkt)
if f.vp8Munger != nil {
f.vp8Munger.SetLast(extPkt)
}
} else {
// LK-TODO-START
// The below offset calculation is not technically correct.
// Timestamps based on the system time of an intermediate box like
// SFU is not going to be accurate. Packets arrival/processing
// are subject to vagaries of network delays, SFU processing etc.
// But, the correct way is a lot harder. Will have to
// look at RTCP SR to get timestamps and figure out alignment
// of layers and use that during layer switch. That can
// get tricky. Given the complexity of that approach, maybe
// this is just fine till it is not :-).
// LK-TODO-END
// Compute how much time passed between the old RTP extPkt
// and the current packet, and fix timestamp on source change
tDiffMs := (extPkt.Arrival - f.lTSCalc) / 1e6
td := uint32((tDiffMs * (int64(f.codec.ClockRate) / 1000)) / 1000)
if td == 0 {
td = 1
}
f.rtpMunger.UpdateSnTsOffsets(extPkt, 1, td)
if f.vp8Munger != nil {
f.vp8Munger.UpdateOffsets(extPkt)
}
}
f.lastSSRC = extPkt.Packet.SSRC
}
f.lTSCalc = extPkt.Arrival
tpRTP, err := f.rtpMunger.UpdateAndGetSnTs(extPkt)
if err != nil {
tp.shouldDrop = true
if err == ErrPaddingOnlyPacket || err == ErrDuplicatePacket || err == ErrOutOfOrderSequenceNumberCacheMiss {
return tp, nil
}
return tp, err
}
if f.vp8Munger == nil {
tp.rtp = tpRTP
return tp, nil
}
tpVP8, err := f.vp8Munger.UpdateAndGet(extPkt, tpRTP.snOrdering, f.currentTemporalLayer)
if err != nil {
tp.shouldDrop = true
if err == ErrFilteredVP8TemporalLayer || err == ErrOutOfOrderVP8PictureIdCacheMiss {
if err == ErrFilteredVP8TemporalLayer {
// filtered temporal layer, update sequence number offset to prevent holes
f.rtpMunger.PacketDropped(extPkt)
}
return tp, nil
}
return tp, err
}
// catch up temporal layer if necessary
if tpVP8 != nil && f.currentTemporalLayer != f.targetTemporalLayer {
if tpVP8.header.TIDPresent == 1 && tpVP8.header.TID <= uint8(f.targetTemporalLayer) {
f.currentTemporalLayer = f.targetTemporalLayer
}
}
tp.rtp = tpRTP
tp.vp8 = tpVP8
return tp, nil
}
func (f *Forwarder) GetSnTsForPadding(num int) ([]SnTs, error) {
f.lock.Lock()
defer f.lock.Unlock()
// padding is used for probing. Padding packets should be
// at frame boundaries only to ensure decoder sequencer does
// not get out-of-sync. But, when a stream is paused,
// force a frame marker as a restart of the stream will
// start with a key frame which will reset the decoder.
forceMarker := false
if f.targetSpatialLayer == InvalidSpatialLayer {
forceMarker = true
}
return f.rtpMunger.UpdateAndGetPaddingSnTs(num, 0, 0, forceMarker)
}
func (f *Forwarder) GetSnTsForBlankFrames() ([]SnTs, bool, error) {
f.lock.Lock()
defer f.lock.Unlock()
num := RTPBlankFramesMax
frameEndNeeded := !f.rtpMunger.IsOnFrameBoundary()
if frameEndNeeded {
num++
}
snts, err := f.rtpMunger.UpdateAndGetPaddingSnTs(num, f.codec.ClockRate, 30, frameEndNeeded)
return snts, frameEndNeeded, err
}
func (f *Forwarder) GetPaddingVP8(frameEndNeeded bool) (*buffer.VP8, error) {
f.lock.Lock()
defer f.lock.Unlock()
return f.vp8Munger.UpdateAndGetPadding(!frameEndNeeded)
}
func (f *Forwarder) GetRTPMungerParams() RTPMungerParams {
f.lock.RLock()
defer f.lock.RUnlock()
return f.rtpMunger.GetParams()
}

170
pkg/sfu/rtpmunger.go Normal file
View File

@@ -0,0 +1,170 @@
package sfu
import (
"github.com/livekit/livekit-server/pkg/sfu/buffer"
)
//
// RTPMunger
//
type RTPMungerParams struct {
highestIncomingSN uint16
lastSN uint16
snOffset uint16
lastTS uint32
tsOffset uint32
lastMarker bool
missingSNs map[uint16]uint16
}
type RTPMunger struct {
RTPMungerParams
}
func NewRTPMunger() *RTPMunger {
return &RTPMunger{RTPMungerParams: RTPMungerParams{
missingSNs: make(map[uint16]uint16, 10),
}}
}
func (r *RTPMunger) GetParams() RTPMungerParams {
return RTPMungerParams{
highestIncomingSN: r.highestIncomingSN,
lastSN: r.lastSN,
snOffset: r.snOffset,
lastTS: r.lastTS,
tsOffset: r.tsOffset,
lastMarker: r.lastMarker,
}
}
func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) {
r.highestIncomingSN = extPkt.Packet.SequenceNumber - 1
r.lastSN = extPkt.Packet.SequenceNumber
r.lastTS = extPkt.Packet.Timestamp
}
func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint16, tsAdjust uint32) {
r.highestIncomingSN = extPkt.Packet.SequenceNumber - 1
r.snOffset = extPkt.Packet.SequenceNumber - r.lastSN - snAdjust
r.tsOffset = extPkt.Packet.Timestamp - r.lastTS - tsAdjust
// clear incoming missing sequence numbers on layer/source switch
r.missingSNs = make(map[uint16]uint16, 10)
}
func (r *RTPMunger) PacketDropped(extPkt *buffer.ExtPacket) {
if !extPkt.Head {
return
}
r.highestIncomingSN = extPkt.Packet.SequenceNumber
r.snOffset += 1
}
func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationParamsRTP, error) {
// if out-of-order, look up missing sequence number cache
if !extPkt.Head {
snOffset, ok := r.missingSNs[extPkt.Packet.SequenceNumber]
if !ok {
return &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingOutOfOrder,
}, ErrOutOfOrderSequenceNumberCacheMiss
}
delete(r.missingSNs, extPkt.Packet.SequenceNumber)
return &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingOutOfOrder,
sequenceNumber: extPkt.Packet.SequenceNumber - snOffset,
timestamp: extPkt.Packet.Timestamp - r.tsOffset,
}, nil
}
ordering := SequenceNumberOrderingContiguous
// if there are gaps, record it in missing sequence number cache
diff := extPkt.Packet.SequenceNumber - r.highestIncomingSN
if diff > 1 {
ordering = SequenceNumberOrderingGap
for i := r.highestIncomingSN + 1; i != extPkt.Packet.SequenceNumber; i++ {
r.missingSNs[i] = r.snOffset
}
} else {
// can get duplicate packet due to FEC
if diff == 0 {
return &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingDuplicate,
}, ErrDuplicatePacket
}
// if padding only packet, can be dropped and sequence number adjusted
// as it is contiguous and in order. That means this is the highest
// incoming sequence number and it is a good point to adjust
// sequence number offset.
if len(extPkt.Packet.Payload) == 0 {
r.highestIncomingSN = extPkt.Packet.SequenceNumber
r.snOffset += 1
return &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingContiguous,
}, ErrPaddingOnlyPacket
}
}
// in-order incoming packet, may or may not be contiguous.
// In the case of loss (i. e. incoming sequence number is not contiguous),
// forward even if it is a padding only packet. With temporal scalability,
// it is unclear if the current packet should be dropped if it is not
// contiguous. Hence forward anything that is not contiguous.
// Reference: http://www.rtcbits.com/2017/04/howto-implement-temporal-scalability.html
mungedSN := extPkt.Packet.SequenceNumber - r.snOffset
mungedTS := extPkt.Packet.Timestamp - r.tsOffset
r.highestIncomingSN = extPkt.Packet.SequenceNumber
r.lastSN = mungedSN
r.lastTS = mungedTS
r.lastMarker = extPkt.Packet.Marker
return &TranslationParamsRTP{
snOrdering: ordering,
sequenceNumber: mungedSN,
timestamp: mungedTS,
}, nil
}
func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate uint32, forceMarker bool) ([]SnTs, error) {
tsOffset := 0
if !r.lastMarker {
if !forceMarker {
return nil, ErrPaddingNotOnFrameBoundary
} else {
// if forcing frame end, use timestamp of latest received frame for the first one
tsOffset = 1
}
}
vals := make([]SnTs, num)
for i := 0; i < num; i++ {
vals[i].sequenceNumber = r.lastSN + uint16(i) + 1
if frameRate != 0 {
vals[i].timestamp = r.lastTS + uint32(i+1-tsOffset)*(clockRate/frameRate)
} else {
vals[i].timestamp = r.lastTS
}
}
r.lastSN = vals[num-1].sequenceNumber
r.snOffset -= uint16(num)
if forceMarker {
r.lastMarker = true
}
return vals, nil
}
func (r *RTPMunger) IsOnFrameBoundary() bool {
return r.lastMarker
}

400
pkg/sfu/vp8munger.go Normal file
View File

@@ -0,0 +1,400 @@
package sfu
import (
"github.com/elliotchance/orderedmap"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
)
//
// VP8 munger
//
type VP8MungerParams struct {
pictureIdWrapHandler VP8PictureIdWrapHandler
extLastPictureId int32
pictureIdOffset int32
pictureIdUsed int
lastTl0PicIdx uint8
tl0PicIdxOffset uint8
tl0PicIdxUsed int
tidUsed int
lastKeyIdx uint8
keyIdxOffset uint8
keyIdxUsed int
missingPictureIds *orderedmap.OrderedMap
lastDroppedPictureId int32
}
type VP8Munger struct {
VP8MungerParams
}
func NewVP8Munger() *VP8Munger {
return &VP8Munger{VP8MungerParams: VP8MungerParams{
missingPictureIds: orderedmap.NewOrderedMap(),
lastDroppedPictureId: -1,
}}
}
func (v *VP8Munger) SetLast(extPkt *buffer.ExtPacket) {
vp8, ok := extPkt.Payload.(buffer.VP8)
if !ok {
return
}
v.pictureIdUsed = vp8.PictureIDPresent
if v.pictureIdUsed == 1 {
v.pictureIdWrapHandler.Init(int32(vp8.PictureID)-1, vp8.MBit)
v.extLastPictureId = int32(vp8.PictureID)
}
v.tl0PicIdxUsed = vp8.TL0PICIDXPresent
if v.tl0PicIdxUsed == 1 {
v.lastTl0PicIdx = vp8.TL0PICIDX
}
v.tidUsed = vp8.TIDPresent
v.keyIdxUsed = vp8.KEYIDXPresent
if v.keyIdxUsed == 1 {
v.lastKeyIdx = vp8.KEYIDX
}
v.lastDroppedPictureId = -1
}
func (v *VP8Munger) UpdateOffsets(extPkt *buffer.ExtPacket) {
vp8, ok := extPkt.Payload.(buffer.VP8)
if !ok {
return
}
if v.pictureIdUsed == 1 {
v.pictureIdWrapHandler.Init(int32(vp8.PictureID)-1, vp8.MBit)
v.pictureIdOffset = int32(vp8.PictureID) - v.extLastPictureId - 1
}
if v.tl0PicIdxUsed == 1 {
v.tl0PicIdxOffset = vp8.TL0PICIDX - v.lastTl0PicIdx - 1
}
if v.keyIdxUsed == 1 {
v.keyIdxOffset = (vp8.KEYIDX - v.lastKeyIdx - 1) & 0x1f
}
// clear missing picture ids on layer switch
v.missingPictureIds = orderedmap.NewOrderedMap()
v.lastDroppedPictureId = -1
}
func (v *VP8Munger) UpdateAndGet(extPkt *buffer.ExtPacket, ordering SequenceNumberOrdering, maxTemporalLayer int32) (*TranslationParamsVP8, error) {
vp8, ok := extPkt.Payload.(buffer.VP8)
if !ok {
return nil, ErrNotVP8
}
extPictureId, newer := v.pictureIdWrapHandler.Unwrap(vp8.PictureID, vp8.MBit)
// if out-of-order, look up missing picture id cache
if !newer {
value, ok := v.missingPictureIds.Get(extPictureId)
if !ok {
return nil, ErrOutOfOrderVP8PictureIdCacheMiss
}
pictureIdOffset := value.(int32)
// the out-of-order picture id cannot be deleted from the cache
// as there could more than one packet in a picture and more
// than one packet of a picture could come out-of-order.
// To prevent picture id cache from growing, it is truncated
// when it reaches a certain size.
mungedPictureId := uint16((extPictureId - pictureIdOffset) & 0x7fff)
vp8Packet := &buffer.VP8{
FirstByte: vp8.FirstByte,
PictureIDPresent: vp8.PictureIDPresent,
PictureID: mungedPictureId,
MBit: mungedPictureId > 127,
TL0PICIDXPresent: vp8.TL0PICIDXPresent,
TL0PICIDX: vp8.TL0PICIDX - v.tl0PicIdxOffset,
TIDPresent: vp8.TIDPresent,
TID: vp8.TID,
Y: vp8.Y,
KEYIDXPresent: vp8.KEYIDXPresent,
KEYIDX: vp8.KEYIDX - v.keyIdxOffset,
IsKeyFrame: vp8.IsKeyFrame,
HeaderSize: vp8.HeaderSize + buffer.VP8PictureIdSizeDiff(mungedPictureId > 127, vp8.MBit),
}
return &TranslationParamsVP8{
header: vp8Packet,
}, nil
}
prevMaxPictureId := v.pictureIdWrapHandler.MaxPictureId()
v.pictureIdWrapHandler.UpdateMaxPictureId(extPictureId, vp8.MBit)
// if there is a gap in sequence number, record possible pictures that
// the missing packets can belong to in missing picture id cache.
// The missing picture cache should contain the previous picture id
// and the current picture id and all the intervening pictures.
// This is to handle a scenario as follows
// o Packet 10 -> Picture ID 10
// o Packet 11 -> missing
// o Packet 12 -> Picture ID 11
// In this case, Packet 11 could belong to either Picture ID 10 (last packet of that picture)
// or Picture ID 11 (first packet of the current picture). Although in this simple case,
// it is possible to deduce that (for example by looking at previous packet's RTP marker
// and check if that was the last packet of Picture 10), it could get complicated when
// the gap is larger.
if ordering == SequenceNumberOrderingGap {
// can drop packet if it belongs to the last dropped picture.
// Example:
// o Packet 10 - Picture 11 - TID that should be dropped
// o Packet 11 - missing
// o Packet 12 - Picture 11 - will be reported as GAP, but belongs to a picture that was dropped and hence can be dropped
// If Packet 11 comes around, it will be reported as OUT_OF_ORDER, but the missing
// picture id cache will not have an entry and hence will be dropped.
if extPictureId == v.lastDroppedPictureId {
return nil, ErrFilteredVP8TemporalLayer
} else {
for lostPictureId := prevMaxPictureId; lostPictureId <= extPictureId; lostPictureId++ {
v.missingPictureIds.Set(lostPictureId, v.pictureIdOffset)
}
// trim cache if necessary
for v.missingPictureIds.Len() > 50 {
el := v.missingPictureIds.Front()
v.missingPictureIds.Delete(el.Key)
}
}
} else {
if vp8.TIDPresent == 1 && vp8.TID > uint8(maxTemporalLayer) {
// adjust only once per picture as a picture could have multiple packets
if vp8.PictureIDPresent == 1 && prevMaxPictureId != extPictureId {
v.lastDroppedPictureId = extPictureId
v.pictureIdOffset += 1
}
return nil, ErrFilteredVP8TemporalLayer
}
}
// in-order incoming sequence number, may or may not be contiguous.
// In the case of loss (i. e. incoming sequence number is not contiguous),
// forward even if it is a filtered layer. With temporal scalability,
// it is unclear if the current packet should be dropped if it is not
// contiguous. Hence forward anything that is not contiguous.
// Reference: http://www.rtcbits.com/2017/04/howto-implement-temporal-scalability.html
extMungedPictureId := extPictureId - v.pictureIdOffset
mungedPictureId := uint16(extMungedPictureId & 0x7fff)
mungedTl0PicIdx := vp8.TL0PICIDX - v.tl0PicIdxOffset
mungedKeyIdx := (vp8.KEYIDX - v.keyIdxOffset) & 0x1f
v.extLastPictureId = extMungedPictureId
v.lastTl0PicIdx = mungedTl0PicIdx
v.lastKeyIdx = mungedKeyIdx
vp8Packet := &buffer.VP8{
FirstByte: vp8.FirstByte,
PictureIDPresent: vp8.PictureIDPresent,
PictureID: mungedPictureId,
MBit: mungedPictureId > 127,
TL0PICIDXPresent: vp8.TL0PICIDXPresent,
TL0PICIDX: mungedTl0PicIdx,
TIDPresent: vp8.TIDPresent,
TID: vp8.TID,
Y: vp8.Y,
KEYIDXPresent: vp8.KEYIDXPresent,
KEYIDX: mungedKeyIdx,
IsKeyFrame: vp8.IsKeyFrame,
HeaderSize: vp8.HeaderSize + buffer.VP8PictureIdSizeDiff(mungedPictureId > 127, vp8.MBit),
}
return &TranslationParamsVP8{
header: vp8Packet,
}, nil
}
func (v *VP8Munger) UpdateAndGetPadding(newPicture bool) (*buffer.VP8, error) {
offset := 0
if newPicture {
offset = 1
}
headerSize := 1
if (v.pictureIdUsed + v.tl0PicIdxUsed + v.tidUsed + v.keyIdxUsed) != 0 {
headerSize += 1
}
extPictureId := v.extLastPictureId
if v.pictureIdUsed == 1 {
extPictureId = v.extLastPictureId + int32(offset)
v.extLastPictureId = extPictureId
v.pictureIdOffset -= int32(offset)
if (extPictureId & 0x7fff) > 127 {
headerSize += 2
} else {
headerSize += 1
}
}
pictureId := uint16(extPictureId & 0x7fff)
tl0PicIdx := uint8(0)
if v.tl0PicIdxUsed == 1 {
tl0PicIdx = v.lastTl0PicIdx + uint8(offset)
v.lastTl0PicIdx = tl0PicIdx
v.tl0PicIdxOffset -= uint8(offset)
headerSize += 1
}
if (v.tidUsed + v.keyIdxUsed) != 0 {
headerSize += 1
}
keyIdx := uint8(0)
if v.keyIdxUsed == 1 {
keyIdx = (v.lastKeyIdx + uint8(offset)) & 0x1f
v.lastKeyIdx = keyIdx
v.keyIdxOffset -= uint8(offset)
}
vp8Packet := &buffer.VP8{
FirstByte: 0x10, // partition 0, start of VP8 Partition, reference frame
PictureIDPresent: v.pictureIdUsed,
PictureID: pictureId,
MBit: pictureId > 127,
TL0PICIDXPresent: v.tl0PicIdxUsed,
TL0PICIDX: tl0PicIdx,
TIDPresent: v.tidUsed,
TID: 0,
Y: 1,
KEYIDXPresent: v.keyIdxUsed,
KEYIDX: keyIdx,
IsKeyFrame: true,
HeaderSize: headerSize,
}
return vp8Packet, nil
}
//-----------------------------
//
// VP8Munger
//
func isWrapping7Bit(val1 int32, val2 int32) bool {
return val2 < val1 && (val1-val2) > (1<<6)
}
func isWrapping15Bit(val1 int32, val2 int32) bool {
return val2 < val1 && (val1-val2) > (1<<14)
}
type VP8PictureIdWrapHandler struct {
maxPictureId int32
maxMBit bool
totalWrap int32
lastWrap int32
}
func (v *VP8PictureIdWrapHandler) Init(extPictureId int32, mBit bool) {
v.maxPictureId = extPictureId
v.maxMBit = mBit
v.totalWrap = 0
v.lastWrap = 0
}
func (v *VP8PictureIdWrapHandler) MaxPictureId() int32 {
return v.maxPictureId
}
// unwrap picture id and update the maxPictureId. return unwrapped value, and whether picture id is newer
func (v *VP8PictureIdWrapHandler) Unwrap(pictureId uint16, mBit bool) (int32, bool) {
//
// VP8 Picture ID is specified very flexibly.
//
// Reference: https://datatracker.ietf.org/doc/html/draft-ietf-payload-vp8
//
// Quoting from the RFC
// ----------------------------
// PictureID: 7 or 15 bits (shown left and right, respectively, in
// Figure 2) not including the M bit. This is a running index of
// the frames, which MAY start at a random value, MUST increase by
// 1 for each subsequent frame, and MUST wrap to 0 after reaching
// the maximum ID (all bits set). The 7 or 15 bits of the
// PictureID go from most significant to least significant,
// beginning with the first bit after the M bit. The sender
// chooses a 7 or 15 bit index and sets the M bit accordingly.
// The receiver MUST NOT assume that the number of bits in
// PictureID stay the same through the session. Having sent a
// 7-bit PictureID with all bits set to 1, the sender may either
// wrap the PictureID to 0, or extend to 15 bits and continue
// incrementing
// ----------------------------
//
// While in practice, senders may not switch between modes indiscriminately,
// it is possible that small picture ids are sent in 7 bits and then switch
// to 15 bits. But, to ensure correctness, this code keeps track of how much
// quantity has wrapped and uses that to figure out if the incoming picture id
// is newer OR out-of-order.
//
maxPictureId := v.maxPictureId
// maxPictureId can be -1 at the start
if maxPictureId > 0 {
if v.maxMBit {
maxPictureId = v.maxPictureId & 0x7fff
} else {
maxPictureId = v.maxPictureId & 0x7f
}
}
var newPictureId int32
if mBit {
newPictureId = int32(pictureId & 0x7fff)
} else {
newPictureId = int32(pictureId & 0x7f)
}
//
// if the new picture id is too far ahead of max, i.e. more than half of last wrap,
// it is out-of-order, unwrap backwards
//
if v.totalWrap > 0 {
if (v.maxPictureId + (v.lastWrap >> 1)) < (newPictureId + v.totalWrap) {
return newPictureId + v.totalWrap - v.lastWrap, false
}
}
//
// check for wrap around based on mode of previous picture id.
// There are three cases here
// 1. Wrapping from 15-bit -> 8-bit (32767 -> 0)
// 2. Wrapping from 15-bit -> 15-bit (32767 -> 0)
// 3. Wrapping from 8-bit -> 8-bit (127 -> 0)
// In all cases, looking at the mode of previous picture id will
// ensure that we are calculating the rap properly.
//
wrap := int32(0)
if v.maxMBit {
if isWrapping15Bit(maxPictureId, newPictureId) {
wrap = 1 << 15
}
} else {
if isWrapping7Bit(maxPictureId, newPictureId) {
wrap = 1 << 7
}
}
v.totalWrap += wrap
if wrap != 0 {
v.lastWrap = wrap
}
newPictureId += v.totalWrap
// >= in the below check as there could be multiple packets per picture
return newPictureId, newPictureId >= v.maxPictureId
}
func (v *VP8PictureIdWrapHandler) UpdateMaxPictureId(extPictureId int32, mBit bool) {
v.maxPictureId = extPictureId
v.maxMBit = mBit
}