Files
livekit/pkg/sfu/flexfec/decoder_test.go
David Chen 195adeb38c Add FlexFEC-03 support to the SFU: upstream recovery and downstream generation
Implements FlexFEC-03 (RFC 8627) end to end in the SFU, in both directions:

- Upstream: decode FlexFEC repair packets from publishers and recover lost
  media in the receive buffer before forwarding (pkg/sfu/flexfec/decoder.go,
  pkg/sfu/buffer). Recovered packets are surfaced via Prometheus counters.
- Downstream: generate FlexFEC for subscribers off the downtrack, paced
  alongside media (pkg/sfu/downtrack_fec.go, pkg/sfu/pacer).
- Negotiation: advertise and match flexfec-03 in the media engine and
  transport SDP, gated by new config knobs (pkg/rtc, pkg/config,
  config-sample.yaml).
- Telemetry: livekit_fec_* metrics for sent/received/recovered packets
  (pkg/telemetry/prometheus/packets.go).

Tests: unit coverage for the decoder, buffer recovery, downtrack generation,
and transport negotiation, plus an end-to-end integration test and the
test-client support it needs (test/flexfec_test.go, test/client).
2026-06-15 13:41:04 -07:00

356 lines
10 KiB
Go

// Copyright 2026 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package flexfec
import (
"encoding/binary"
"math/rand"
"testing"
pionflexfec "github.com/pion/interceptor/pkg/flexfec"
"github.com/pion/rtp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/livekit/protocol/logger"
)
const (
testFECSSRC = uint32(1234)
testMediaSSRC = uint32(5678)
testFECPT = uint8(115)
testMediaPT = uint8(96)
)
func makeMediaPackets(t *testing.T, baseSN uint16, count int) []rtp.Packet {
t.Helper()
rng := rand.New(rand.NewSource(int64(baseSN)))
packets := make([]rtp.Packet, 0, count)
for i := 0; i < count; i++ {
payload := make([]byte, 100+rng.Intn(900))
rng.Read(payload)
packets = append(packets, rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: testMediaPT,
SequenceNumber: baseSN + uint16(i),
Timestamp: 3000 * uint32(i),
SSRC: testMediaSSRC,
Marker: i == count-1,
},
Payload: payload,
})
}
return packets
}
func encodeFEC(t *testing.T, mediaPackets []rtp.Packet, numFEC uint32) []rtp.Packet {
t.Helper()
encoder := pionflexfec.NewFlexEncoder03(testFECPT, testFECSSRC)
fecPackets := encoder.EncodeFec(mediaPackets, numFEC)
require.NotEmpty(t, fecPackets)
return fecPackets
}
func requirePacketEqual(t *testing.T, expected *rtp.Packet, actual *rtp.Packet) {
t.Helper()
require.Equal(t, expected.SequenceNumber, actual.SequenceNumber)
require.Equal(t, expected.Timestamp, actual.Timestamp)
require.Equal(t, expected.PayloadType, actual.PayloadType)
require.Equal(t, expected.SSRC, actual.SSRC)
require.Equal(t, expected.Marker, actual.Marker)
require.Equal(t, expected.Payload, actual.Payload)
}
func TestDecoderRecoversSingleLoss(t *testing.T) {
media := makeMediaPackets(t, 100, 5)
fec := encodeFEC(t, media, 1)
decoder := NewDecoder(testFECSSRC, testMediaSSRC, logger.GetLogger())
// drop media[2], feed the rest
var recovered []*rtp.Packet
for i := range media {
if i == 2 {
continue
}
recovered = append(recovered, decoder.DecodeFec(&media[i])...)
}
require.Empty(t, recovered)
for i := range fec {
recovered = append(recovered, decoder.DecodeFec(&fec[i])...)
}
require.Len(t, recovered, 1)
requirePacketEqual(t, &media[2], recovered[0])
stats := decoder.Stats()
assert.Equal(t, uint64(len(fec)), stats.FECPacketsReceived)
assert.Equal(t, uint64(1), stats.PacketsRecovered)
assert.Equal(t, uint64(0), stats.FECPacketsDiscarded)
}
func TestDecoderRecoversWithLateMedia(t *testing.T) {
// FEC arrives while two packets are missing; recovery happens once one of
// them shows up late. Exercises updateCoveringFecPackets and the
// stable-pointer window.
media := makeMediaPackets(t, 200, 5)
fec := encodeFEC(t, media, 1)
decoder := NewDecoder(testFECSSRC, testMediaSSRC, logger.GetLogger())
var recovered []*rtp.Packet
for _, i := range []int{0, 3, 4} {
recovered = append(recovered, decoder.DecodeFec(&media[i])...)
}
for i := range fec {
recovered = append(recovered, decoder.DecodeFec(&fec[i])...)
}
// two packets missing from the protected window, nothing recoverable yet
require.Empty(t, recovered)
// late arrival of media[1] leaves only media[2] missing
recovered = decoder.DecodeFec(&media[1])
require.Len(t, recovered, 1)
requirePacketEqual(t, &media[2], recovered[0])
}
func TestDecoderRecoversMultipleWindows(t *testing.T) {
decoder := NewDecoder(testFECSSRC, testMediaSSRC, logger.GetLogger())
encoder := pionflexfec.NewFlexEncoder03(testFECPT, testFECSSRC)
var allRecovered []*rtp.Packet
dropped := make(map[uint16]*rtp.Packet)
baseSN := uint16(1000)
for window := 0; window < 10; window++ {
media := makeMediaPackets(t, baseSN, 10)
fecPackets := encoder.EncodeFec(media, 1)
require.NotEmpty(t, fecPackets)
dropIdx := window % 10
for i := range media {
if i == dropIdx {
dropped[media[i].SequenceNumber] = &media[i]
continue
}
allRecovered = append(allRecovered, decoder.DecodeFec(&media[i])...)
}
for i := range fecPackets {
allRecovered = append(allRecovered, decoder.DecodeFec(&fecPackets[i])...)
}
baseSN += 10
}
require.Len(t, allRecovered, 10)
for _, rec := range allRecovered {
expected, ok := dropped[rec.SequenceNumber]
require.True(t, ok, "recovered unexpected sequence number %d", rec.SequenceNumber)
requirePacketEqual(t, expected, rec)
}
assert.Equal(t, uint64(10), decoder.Stats().PacketsRecovered)
}
func TestDecoderSequenceNumberWrap(t *testing.T) {
media := makeMediaPackets(t, 65533, 5) // spans 65533..1
fec := encodeFEC(t, media, 1)
decoder := NewDecoder(testFECSSRC, testMediaSSRC, logger.GetLogger())
var recovered []*rtp.Packet
for i := range media {
if i == 3 { // sequence number 0
continue
}
recovered = append(recovered, decoder.DecodeFec(&media[i])...)
}
for i := range fec {
recovered = append(recovered, decoder.DecodeFec(&fec[i])...)
}
require.Len(t, recovered, 1)
requirePacketEqual(t, &media[3], recovered[0])
}
func TestDecoderDiscardsForeignProtectedSSRC(t *testing.T) {
media := makeMediaPackets(t, 300, 5)
fec := encodeFEC(t, media, 1)
// decoder bound to a different protected stream
decoder := NewDecoder(testFECSSRC, testMediaSSRC+1, logger.GetLogger())
recovered := decoder.DecodeFec(&fec[0])
require.Empty(t, recovered)
stats := decoder.Stats()
assert.Equal(t, uint64(1), stats.FECPacketsReceived)
assert.Equal(t, uint64(1), stats.FECPacketsDiscarded)
}
func TestDecoderDiscardsMalformedFEC(t *testing.T) {
decoder := NewDecoder(testFECSSRC, testMediaSSRC, logger.GetLogger())
for _, payload := range [][]byte{
nil,
{0x00},
make([]byte, 10),
func() []byte { // R bit set
p := make([]byte, pionflexfec.BaseFec03HeaderSize+4)
p[0] = 0x80
p[8] = 1
return p
}(),
func() []byte { // multiple protected ssrcs
p := make([]byte, pionflexfec.BaseFec03HeaderSize+4)
p[8] = 2
return p
}(),
func() []byte { // empty mask
p := make([]byte, pionflexfec.BaseFec03HeaderSize+4)
p[8] = 1
binary.BigEndian.PutUint32(p[12:], testMediaSSRC)
binary.BigEndian.PutUint16(p[18:], 0x8000)
return p
}(),
} {
pkt := &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: testFECPT,
SequenceNumber: uint16(rand.Intn(65536)),
SSRC: testFECSSRC,
},
Payload: payload,
}
require.NotPanics(t, func() {
require.Empty(t, decoder.DecodeFec(pkt))
})
}
stats := decoder.Stats()
assert.Equal(t, uint64(6), stats.FECPacketsReceived)
assert.Equal(t, uint64(6), stats.FECPacketsDiscarded)
}
func TestDecoderDiscardsDuplicateFEC(t *testing.T) {
media := makeMediaPackets(t, 400, 5)
fec := encodeFEC(t, media, 1)
decoder := NewDecoder(testFECSSRC, testMediaSSRC, logger.GetLogger())
for i := range media {
decoder.DecodeFec(&media[i])
}
require.Empty(t, decoder.DecodeFec(&fec[0]))
require.Empty(t, decoder.DecodeFec(&fec[0]))
stats := decoder.Stats()
assert.Equal(t, uint64(2), stats.FECPacketsReceived)
assert.Equal(t, uint64(1), stats.FECPacketsDiscarded)
}
func TestDecoderInputMemoryReuse(t *testing.T) {
// the decoder must not retain references to caller-owned packet memory
media := makeMediaPackets(t, 500, 5)
fec := encodeFEC(t, media, 1)
decoder := NewDecoder(testFECSSRC, testMediaSSRC, logger.GetLogger())
scratch := &rtp.Packet{}
feed := func(src *rtp.Packet) []*rtp.Packet {
buf, err := src.Marshal()
require.NoError(t, err)
require.NoError(t, scratch.Unmarshal(buf))
out := decoder.DecodeFec(scratch)
// clobber the scratch memory the decoder saw
for i := range scratch.Payload {
scratch.Payload[i] = 0xde
}
return out
}
var recovered []*rtp.Packet
for i := range media {
if i == 2 {
continue
}
recovered = append(recovered, feed(&media[i])...)
}
for i := range fec {
recovered = append(recovered, feed(&fec[i])...)
}
require.Len(t, recovered, 1)
requirePacketEqual(t, &media[2], recovered[0])
}
func TestDecoderTwoFECPacketsTwoLosses(t *testing.T) {
// with 2 FEC packets over 10 media packets, the coverage interleaves, so
// two losses landing in different coverage groups are both recoverable
media := makeMediaPackets(t, 600, 10)
fec := encodeFEC(t, media, 2)
require.Len(t, fec, 2)
decoder := NewDecoder(testFECSSRC, testMediaSSRC, logger.GetLogger())
var recovered []*rtp.Packet
for i := range media {
if i == 2 || i == 3 {
continue
}
recovered = append(recovered, decoder.DecodeFec(&media[i])...)
}
for i := range fec {
recovered = append(recovered, decoder.DecodeFec(&fec[i])...)
}
recoveredSNs := make(map[uint16]bool)
for _, r := range recovered {
recoveredSNs[r.SequenceNumber] = true
}
// at least one of the two losses must be recovered; both when the losses
// fall in distinct coverage groups
require.NotEmpty(t, recovered)
for _, r := range recovered {
expectedIdx := int(r.SequenceNumber - 600)
requirePacketEqual(t, &media[expectedIdx], r)
}
require.True(t, recoveredSNs[602] || recoveredSNs[603])
}
func TestDecoderResetsOnBigSequenceGap(t *testing.T) {
decoder := NewDecoder(testFECSSRC, testMediaSSRC, logger.GetLogger())
media := makeMediaPackets(t, 100, 110)
for i := range media {
decoder.DecodeFec(&media[i])
}
// jump far ahead, decoder should reset its windows rather than misuse
// stale state
farMedia := makeMediaPackets(t, 30000, 5)
fec := encodeFEC(t, farMedia, 1)
var recovered []*rtp.Packet
for i := range farMedia {
if i == 1 {
continue
}
recovered = append(recovered, decoder.DecodeFec(&farMedia[i])...)
}
for i := range fec {
recovered = append(recovered, decoder.DecodeFec(&fec[i])...)
}
require.Len(t, recovered, 1)
requirePacketEqual(t, &farMedia[1], recovered[0])
}