Files
livekit/pkg/sfu/buffer/flexfec_test.go
2026-06-10 14:34:06 -07:00

592 lines
18 KiB
Go

// Copyright 2026 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buffer
import (
"fmt"
"testing"
pionflexfec "github.com/pion/interceptor/pkg/flexfec"
"github.com/pion/rtp"
"github.com/pion/transport/v4/packetio"
"github.com/pion/webrtc/v4"
"github.com/stretchr/testify/require"
"github.com/livekit/mediatransportutil/pkg/bucket"
"github.com/livekit/mediatransportutil/pkg/twcc"
"github.com/livekit/protocol/logger"
)
const (
testFECMediaSSRC = uint32(0x12345678)
testFECSSRC = uint32(0x23456789)
testFECPT = uint8(49)
)
// fecPacketStore mimics the primary buffer's packet bucket keyed by extended
// wire sequence number
type fecPacketStore struct {
packets map[uint64][]byte
highest uint64
hasAny bool
window uint64
}
func newFECPacketStore(window int) *fecPacketStore {
return &fecPacketStore{
packets: make(map[uint64][]byte),
window: uint64(window),
}
}
func (s *fecPacketStore) add(esn uint64, raw []byte) {
s.packets[esn] = raw
if !s.hasAny || esn > s.highest {
s.highest = esn
}
s.hasAny = true
}
func (s *fecPacketStore) getPacket(buf []byte, esn uint64) (int, error) {
if !s.hasAny || esn > s.highest {
return 0, bucket.ErrPacketTooNew
}
if s.highest-esn >= s.window {
return 0, bucket.ErrPacketTooOld
}
raw, ok := s.packets[esn]
if !ok {
return 0, bucket.ErrPacketSizeInvalid
}
return copy(buf, raw), nil
}
func (s *fecPacketStore) decoderParams() flexFECDecoderParams {
return flexFECDecoderParams{
Logger: logger.GetLogger(),
SSRC: testFECMediaSSRC,
GetPacket: s.getPacket,
ExtHighestSN: func() (uint64, bool) { return s.highest, s.hasAny },
WindowSize: func() int { return int(s.window) },
}
}
// makeFECMediaPackets builds count consecutive media packets starting at baseSN
// with varying payload sizes, header extensions and marker bits
func makeFECMediaPackets(t *testing.T, baseSN uint16, count int) ([]rtp.Packet, [][]byte) {
t.Helper()
packets := make([]rtp.Packet, 0, count)
raws := make([][]byte, 0, count)
for i := 0; i < count; i++ {
pkt := rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 96,
SequenceNumber: baseSN + uint16(i),
Timestamp: 0x1000 + uint32(i/3)*3000,
SSRC: testFECMediaSSRC,
Marker: i%3 == 2,
},
Payload: make([]byte, 20+(i*37)%600),
}
for j := range pkt.Payload {
pkt.Payload[j] = byte(i + j)
}
if i%3 == 0 {
require.NoError(t, pkt.Header.SetExtension(3, []byte{byte(i), byte(i >> 8)}))
}
raw, err := pkt.Marshal()
require.NoError(t, err)
packets = append(packets, pkt)
raws = append(raws, raw)
}
return packets, raws
}
func encodeFECPackets(t *testing.T, media []rtp.Packet, numFEC uint32) []rtp.Packet {
t.Helper()
fecPackets := pionflexfec.NewFlexEncoder03(testFECPT, testFECSSRC).EncodeFec(media, numFEC)
require.NotEmpty(t, fecPackets)
return fecPackets
}
func TestParseFlexFEC03Header(t *testing.T) {
buildPayload := func(k0, k1, k2 bool, size int) []byte {
payload := make([]byte, size)
payload[8] = 1 // SSRCCount
// protected SSRC
payload[12], payload[13], payload[14], payload[15] = 0x12, 0x34, 0x56, 0x78
// SN base
payload[16], payload[17] = 0x10, 0x01
if k0 {
payload[18] |= 0x80
}
if k1 && size > 20 {
payload[20] |= 0x80
}
if k2 && size > 24 {
payload[24] |= 0x80
}
return payload
}
t.Run("mask sizes", func(t *testing.T) {
// k-bit 0 set, 15 bit mask, offsets 0 and 14
payload := buildPayload(true, false, false, 30)
payload[18] |= 0x40 // offset 0
payload[19] |= 0x01 // offset 14
h, err := parseFlexFEC03Header(payload)
require.NoError(t, err)
require.Equal(t, uint32(0x12345678), h.protectedSSRC)
require.Equal(t, uint16(0x1001), h.snBase)
require.Equal(t, []uint16{0, 14}, h.offsets)
require.Equal(t, 20, h.payloadOffset)
// k-bit 1 set, 15+31 bit masks, offsets 15 and 45
payload = buildPayload(false, true, false, 40)
payload[20] |= 0x40 // offset 15
payload[23] |= 0x01 // offset 45
h, err = parseFlexFEC03Header(payload)
require.NoError(t, err)
require.Equal(t, []uint16{15, 45}, h.offsets)
require.Equal(t, 24, h.payloadOffset)
// k-bit 2 set, 15+31+63 bit masks, offsets 46 and 108
payload = buildPayload(false, false, true, 40)
payload[24] |= 0x40 // offset 46
payload[31] |= 0x01 // offset 108
h, err = parseFlexFEC03Header(payload)
require.NoError(t, err)
require.Equal(t, []uint16{46, 108}, h.offsets)
require.Equal(t, 32, h.payloadOffset)
})
t.Run("rejects", func(t *testing.T) {
// truncated
_, err := parseFlexFEC03Header(make([]byte, 19))
require.ErrorIs(t, err, errFECPacketTruncated)
// truncated with k-bit 0 unset
_, err = parseFlexFEC03Header(buildPayload(false, true, false, 20))
require.ErrorIs(t, err, errFECPacketTruncated)
// truncated with k-bits 0 and 1 unset
_, err = parseFlexFEC03Header(buildPayload(false, false, true, 24))
require.ErrorIs(t, err, errFECPacketTruncated)
// retransmission bit
payload := buildPayload(true, false, false, 30)
payload[0] |= 0x80
_, err = parseFlexFEC03Header(payload)
require.ErrorIs(t, err, errFECRetransmissionBit)
// inflexible mask bit
payload = buildPayload(true, false, false, 30)
payload[0] |= 0x40
_, err = parseFlexFEC03Header(payload)
require.ErrorIs(t, err, errFECInflexibleMask)
// multiple SSRC
payload = buildPayload(true, false, false, 30)
payload[8] = 2
_, err = parseFlexFEC03Header(payload)
require.ErrorIs(t, err, errFECMultipleSSRC)
// k-bit of last mask not set
payload = buildPayload(false, false, false, 40)
_, err = parseFlexFEC03Header(payload)
require.ErrorIs(t, err, errFECLastMaskKBitNotSet)
// empty mask
payload = buildPayload(true, false, false, 30)
_, err = parseFlexFEC03Header(payload)
require.ErrorIs(t, err, errFECEmptyMask)
})
}
func TestFlexFECDecoderRoundTrip(t *testing.T) {
for _, blockSize := range []int{10, 30, 100} {
for _, dropIdx := range []int{0, blockSize / 2, blockSize - 1} {
t.Run(fmt.Sprintf("block=%d,drop=%d", blockSize, dropIdx), func(t *testing.T) {
media, raws := makeFECMediaPackets(t, 100, blockSize)
fecPkt := encodeFECPackets(t, media, 1)[0]
store := newFECPacketStore(1000)
for i, raw := range raws {
if i != dropIdx {
store.add(uint64(media[i].SequenceNumber), raw)
}
}
d := newFlexFECDecoder(store.decoderParams())
recovered := d.AddFEC(fecPkt.SequenceNumber, fecPkt.Payload, 0)
if dropIdx == blockSize-1 {
// the dropped packet is past the highest received one, it
// is not detectable as lost until the stream advances
require.Empty(t, recovered)
next, nextRaws := makeFECMediaPackets(t, 100+uint16(blockSize), 1)
store.add(uint64(next[0].SequenceNumber), nextRaws[0])
recovered = d.OnMediaPacket(next[0].SequenceNumber)
}
require.Len(t, recovered, 1)
require.Equal(t, raws[dropIdx], recovered[0])
stats := d.Stats()
require.Equal(t, uint32(1), stats.PacketsReceived)
require.Equal(t, uint32(1), stats.RecoveryAttempts)
require.Equal(t, uint32(1), stats.PacketsRecovered)
require.Equal(t, uint32(0), stats.RecoveryFailed)
require.False(t, d.HasPending())
})
}
}
}
func TestFlexFECDecoderNoLoss(t *testing.T) {
media, raws := makeFECMediaPackets(t, 5000, 12)
fecPkt := encodeFECPackets(t, media, 1)[0]
store := newFECPacketStore(1000)
for i, raw := range raws {
store.add(uint64(media[i].SequenceNumber), raw)
}
d := newFlexFECDecoder(store.decoderParams())
require.Empty(t, d.AddFEC(fecPkt.SequenceNumber, fecPkt.Payload, 0))
stats := d.Stats()
require.Equal(t, uint32(1), stats.PacketsReceived)
require.Equal(t, uint32(1), stats.PacketsUnused)
require.Equal(t, uint32(0), stats.RecoveryAttempts)
require.False(t, d.HasPending())
}
func TestFlexFECDecoderFECBeforeMedia(t *testing.T) {
media, raws := makeFECMediaPackets(t, 300, 10)
fecPkt := encodeFECPackets(t, media, 1)[0]
store := newFECPacketStore(1000)
d := newFlexFECDecoder(store.decoderParams())
// FEC arrives before any media
require.Empty(t, d.AddFEC(fecPkt.SequenceNumber, fecPkt.Payload, 0))
require.True(t, d.HasPending())
// media arrives, packet 4 is lost
var recovered [][]byte
for i, raw := range raws {
if i == 4 {
continue
}
store.add(uint64(media[i].SequenceNumber), raw)
recovered = append(recovered, d.OnMediaPacket(media[i].SequenceNumber)...)
}
require.Len(t, recovered, 1)
require.Equal(t, raws[4], recovered[0])
require.False(t, d.HasPending())
}
func TestFlexFECDecoderTwoMissing(t *testing.T) {
media, raws := makeFECMediaPackets(t, 700, 10)
fecPkt := encodeFECPackets(t, media, 1)[0]
store := newFECPacketStore(1000)
for i, raw := range raws {
if i == 2 || i == 6 {
continue
}
store.add(uint64(media[i].SequenceNumber), raw)
}
d := newFlexFECDecoder(store.decoderParams())
require.Empty(t, d.AddFEC(fecPkt.SequenceNumber, fecPkt.Payload, 0))
require.True(t, d.HasPending())
require.Equal(t, uint32(0), d.Stats().RecoveryAttempts)
// packet 2 arrives late, e.g. via RTX, packet 6 becomes recoverable
store.add(uint64(media[2].SequenceNumber), raws[2])
recovered := d.OnMediaPacket(media[2].SequenceNumber)
require.Len(t, recovered, 1)
require.Equal(t, raws[6], recovered[0])
require.False(t, d.HasPending())
}
func TestFlexFECDecoderDuplicateFEC(t *testing.T) {
media, raws := makeFECMediaPackets(t, 900, 10)
fecPkt := encodeFECPackets(t, media, 1)[0]
store := newFECPacketStore(1000)
for i, raw := range raws {
if i == 3 || i == 5 {
continue
}
store.add(uint64(media[i].SequenceNumber), raw)
}
d := newFlexFECDecoder(store.decoderParams())
require.Empty(t, d.AddFEC(fecPkt.SequenceNumber, fecPkt.Payload, 0))
require.Empty(t, d.AddFEC(fecPkt.SequenceNumber, fecPkt.Payload, 0))
stats := d.Stats()
require.Equal(t, uint32(2), stats.PacketsReceived)
require.Equal(t, 1, len(d.pending), "duplicate must not be buffered twice")
}
func TestFlexFECDecoderInvalid(t *testing.T) {
store := newFECPacketStore(1000)
d := newFlexFECDecoder(store.decoderParams())
// garbage payload
require.Empty(t, d.AddFEC(1, make([]byte, 10), 0))
// valid header protecting another SSRC
media, _ := makeFECMediaPackets(t, 100, 5)
for i := range media {
media[i].SSRC = testFECMediaSSRC + 1
}
fecPkt := encodeFECPackets(t, media, 1)[0]
require.Empty(t, d.AddFEC(fecPkt.SequenceNumber, fecPkt.Payload, 0))
stats := d.Stats()
require.Equal(t, uint32(2), stats.PacketsReceived)
require.Equal(t, uint32(2), stats.PacketsInvalid)
require.False(t, d.HasPending())
}
func TestFlexFECDecoderWraparound(t *testing.T) {
// media packets crossing the 16-bit sequence number wrap
media, raws := makeFECMediaPackets(t, 65530, 10)
fecPkt := encodeFECPackets(t, media, 1)[0]
const cycle = uint64(7) << 16
esn := func(i int) uint64 {
// 65530..65535 are in the cycle, 0..3 in the next
if media[i].SequenceNumber >= 65530 {
return cycle | uint64(media[i].SequenceNumber)
}
return cycle + (1 << 16) + uint64(media[i].SequenceNumber)
}
for _, dropIdx := range []int{2, 8} { // one drop on each side of the wrap
t.Run(fmt.Sprintf("drop=%d", dropIdx), func(t *testing.T) {
store := newFECPacketStore(1000)
for i, raw := range raws {
if i != dropIdx {
store.add(esn(i), raw)
}
}
d := newFlexFECDecoder(store.decoderParams())
recovered := d.AddFEC(fecPkt.SequenceNumber, fecPkt.Payload, 0)
require.Len(t, recovered, 1)
require.Equal(t, raws[dropIdx], recovered[0])
})
}
}
func TestFlexFECDecoderOverlappingChain(t *testing.T) {
// two FEC packets with overlapping coverage, recovering a packet via the
// first enables the second once the recovered packet is fed back
media, raws := makeFECMediaPackets(t, 2000, 12)
fecA := encodeFECPackets(t, media[:8], 1)[0] // protects 0..7
fecB := encodeFECPackets(t, media[4:], 1)[0] // protects 4..11
store := newFECPacketStore(1000)
for i, raw := range raws {
if i == 6 || i == 10 {
continue
}
store.add(uint64(media[i].SequenceNumber), raw)
}
d := newFlexFECDecoder(store.decoderParams())
// distinct FEC stream sequence numbers, each encoder starts at the same one
// B cannot recover, 6 and 10 missing
require.Empty(t, d.AddFEC(1, fecB.Payload, 0))
// A recovers 6
recovered := d.AddFEC(2, fecA.Payload, 0)
require.Len(t, recovered, 1)
require.Equal(t, raws[6], recovered[0])
// the buffer injects the recovered packet and notifies the decoder,
// which lets B recover 10
store.add(uint64(media[6].SequenceNumber), raws[6])
recovered = d.OnMediaPacket(media[6].SequenceNumber)
require.Len(t, recovered, 1)
require.Equal(t, raws[10], recovered[0])
require.False(t, d.HasPending())
require.Equal(t, uint32(2), d.Stats().PacketsRecovered)
}
func TestFlexFECDecoderStaleEviction(t *testing.T) {
media, raws := makeFECMediaPackets(t, 100, 10)
fecPkt := encodeFECPackets(t, media, 1)[0]
store := newFECPacketStore(50)
for i, raw := range raws {
if i == 2 || i == 6 {
continue
}
store.add(uint64(media[i].SequenceNumber), raw)
}
d := newFlexFECDecoder(store.decoderParams())
require.Empty(t, d.AddFEC(1, fecPkt.Payload, 0))
require.True(t, d.HasPending())
// the stream advances beyond the window, next FEC packet triggers a sweep
next, nextRaws := makeFECMediaPackets(t, 300, 10)
for i, raw := range nextRaws {
store.add(uint64(next[i].SequenceNumber), raw)
}
nextFEC := encodeFECPackets(t, next, 1)[0]
require.Empty(t, d.AddFEC(2, nextFEC.Payload, 0))
stats := d.Stats()
require.Equal(t, uint32(1), stats.RecoveryFailed, "stale FEC with missing packets evicted as failed")
require.Equal(t, uint32(1), stats.PacketsUnused, "fresh FEC with no losses evicted as unused")
require.False(t, d.HasPending())
}
// TestBufferFlexFECIntegration exercises the full path, a FEC packet written to
// the paired FEC buffer recovers a lost media packet into the primary's bucket
func TestBufferFlexFECIntegration(t *testing.T) {
flexfecCodec := webrtc.RTPCodecParameters{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeFlexFEC03,
ClockRate: 90000,
SDPFmtpLine: "repair-window=10000000",
},
PayloadType: 49,
}
media, raws := makeFECMediaPackets(t, 100, 10)
fecPkt := encodeFECPackets(t, media, 1)[0]
fecRaw, err := fecPkt.Marshal()
require.NoError(t, err)
for _, pairFirst := range []bool{true, false} {
t.Run(fmt.Sprintf("pairBeforeFECBuffer=%v", pairFirst), func(t *testing.T) {
factory := NewFactoryOfBufferFactory(500, 200).CreateBufferFactory()
primary := factory.GetOrNew(packetio.RTPBufferPacket, testFECMediaSSRC).(*Buffer)
require.NoError(t, primary.Bind(webrtc.RTPParameters{
Codecs: []webrtc.RTPCodecParameters{vp8Codec, flexfecCodec},
}, vp8Codec.RTPCodecCapability, 0))
require.Equal(t, uint8(49), primary.fecPayloadType, "fec payload type learned at bind")
twccResponder := twcc.NewTransportWideCCResponder()
primary.SetTWCCAndExtID(twccResponder, 5)
// media packets arrive, packet 4 is lost
for i, raw := range raws {
if i == 4 {
continue
}
_, err := primary.Write(raw)
require.NoError(t, err)
}
if pairFirst {
factory.SetFECPair(testFECSSRC, testFECMediaSSRC)
}
fecBuffer := factory.GetOrNew(packetio.RTPBufferPacket, testFECSSRC).(*Buffer)
if !pairFirst {
factory.SetFECPair(testFECSSRC, testFECMediaSSRC)
}
_, err = fecBuffer.Write(fecRaw)
require.NoError(t, err)
// the recovered packet must be byte equal in the primary's bucket
primary.Lock()
ref, ok := primary.extHighestWireSNLocked()
require.True(t, ok)
buf := make([]byte, bucket.RTPMaxPktSize)
n, getErr := primary.getWirePacketLocked(buf, unwrapNearESN(media[4].SequenceNumber, ref))
primary.Unlock()
require.NoError(t, getErr)
require.Equal(t, raws[4], buf[:n])
stats, hasStats := primary.GetFECStreamStats()
require.True(t, hasStats)
require.Equal(t, uint32(1), stats.PacketsReceived)
require.Equal(t, uint32(1), stats.RecoveryAttempts)
require.Equal(t, uint32(1), stats.PacketsRecovered)
})
}
}
// TestBufferFlexFECBeforePairing verifies FEC packets queued in the unbound FEC
// buffer are processed once the pairing is established
func TestBufferFlexFECBeforePairing(t *testing.T) {
media, raws := makeFECMediaPackets(t, 100, 10)
fecPkt := encodeFECPackets(t, media, 1)[0]
fecRaw, err := fecPkt.Marshal()
require.NoError(t, err)
factory := NewFactoryOfBufferFactory(500, 200).CreateBufferFactory()
primary := factory.GetOrNew(packetio.RTPBufferPacket, testFECMediaSSRC).(*Buffer)
require.NoError(t, primary.Bind(webrtc.RTPParameters{
Codecs: []webrtc.RTPCodecParameters{vp8Codec},
}, vp8Codec.RTPCodecCapability, 0))
require.Equal(t, uint8(0), primary.fecPayloadType, "no flexfec codec negotiated")
// FEC packet arrives before the pairing is known, queued in the FEC buffer
fecBuffer := factory.GetOrNew(packetio.RTPBufferPacket, testFECSSRC).(*Buffer)
_, err = fecBuffer.Write(fecRaw)
require.NoError(t, err)
for i, raw := range raws {
if i == 4 {
continue
}
_, err := primary.Write(raw)
require.NoError(t, err)
}
// pairing flushes the queued FEC packet into the primary
factory.SetFECPair(testFECSSRC, testFECMediaSSRC)
stats, hasStats := primary.GetFECStreamStats()
require.True(t, hasStats)
require.Equal(t, uint32(1), stats.PacketsReceived)
require.Equal(t, uint32(1), stats.PacketsRecovered)
}
func TestFlexFECDecoderPendingOverflow(t *testing.T) {
store := newFECPacketStore(100000)
params := store.decoderParams()
params.MaxPendingFEC = 4
d := newFlexFECDecoder(params)
// no media received, FEC packets pile up
for i := 0; i < 6; i++ {
media, _ := makeFECMediaPackets(t, uint16(1000+i*20), 5)
fecPkt := encodeFECPackets(t, media, 1)[0]
require.Empty(t, d.AddFEC(fecPkt.SequenceNumber+uint16(i), fecPkt.Payload, 0))
}
require.Equal(t, 4, len(d.pending))
require.Equal(t, uint32(2), d.Stats().PacketsDiscardedOld)
}