mirror of
https://github.com/livekit/livekit.git
synced 2026-07-02 13:32:19 +00:00
195adeb38c
Implements FlexFEC-03 (RFC 8627) end to end in the SFU, in both directions: - Upstream: decode FlexFEC repair packets from publishers and recover lost media in the receive buffer before forwarding (pkg/sfu/flexfec/decoder.go, pkg/sfu/buffer). Recovered packets are surfaced via Prometheus counters. - Downstream: generate FlexFEC for subscribers off the downtrack, paced alongside media (pkg/sfu/downtrack_fec.go, pkg/sfu/pacer). - Negotiation: advertise and match flexfec-03 in the media engine and transport SDP, gated by new config knobs (pkg/rtc, pkg/config, config-sample.yaml). - Telemetry: livekit_fec_* metrics for sent/received/recovered packets (pkg/telemetry/prometheus/packets.go). Tests: unit coverage for the decoder, buffer recovery, downtrack generation, and transport negotiation, plus an end-to-end integration test and the test-client support it needs (test/flexfec_test.go, test/client).
356 lines
10 KiB
Go
356 lines
10 KiB
Go
// Copyright 2026 LiveKit, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package flexfec
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"math/rand"
|
|
"testing"
|
|
|
|
pionflexfec "github.com/pion/interceptor/pkg/flexfec"
|
|
"github.com/pion/rtp"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/livekit/protocol/logger"
|
|
)
|
|
|
|
// Identifiers shared by the decoder tests in this file: the SSRC and RTP
// payload type used for the FlexFEC repair stream, and the SSRC and payload
// type used for the protected media stream.
const (
	testFECSSRC   uint32 = 1234
	testMediaSSRC uint32 = 5678
	testFECPT     uint8  = 115
	testMediaPT   uint8  = 96
)
|
|
|
|
func makeMediaPackets(t *testing.T, baseSN uint16, count int) []rtp.Packet {
|
|
t.Helper()
|
|
rng := rand.New(rand.NewSource(int64(baseSN)))
|
|
packets := make([]rtp.Packet, 0, count)
|
|
for i := 0; i < count; i++ {
|
|
payload := make([]byte, 100+rng.Intn(900))
|
|
rng.Read(payload)
|
|
packets = append(packets, rtp.Packet{
|
|
Header: rtp.Header{
|
|
Version: 2,
|
|
PayloadType: testMediaPT,
|
|
SequenceNumber: baseSN + uint16(i),
|
|
Timestamp: 3000 * uint32(i),
|
|
SSRC: testMediaSSRC,
|
|
Marker: i == count-1,
|
|
},
|
|
Payload: payload,
|
|
})
|
|
}
|
|
return packets
|
|
}
|
|
|
|
func encodeFEC(t *testing.T, mediaPackets []rtp.Packet, numFEC uint32) []rtp.Packet {
|
|
t.Helper()
|
|
encoder := pionflexfec.NewFlexEncoder03(testFECPT, testFECSSRC)
|
|
fecPackets := encoder.EncodeFec(mediaPackets, numFEC)
|
|
require.NotEmpty(t, fecPackets)
|
|
return fecPackets
|
|
}
|
|
|
|
func requirePacketEqual(t *testing.T, expected *rtp.Packet, actual *rtp.Packet) {
|
|
t.Helper()
|
|
require.Equal(t, expected.SequenceNumber, actual.SequenceNumber)
|
|
require.Equal(t, expected.Timestamp, actual.Timestamp)
|
|
require.Equal(t, expected.PayloadType, actual.PayloadType)
|
|
require.Equal(t, expected.SSRC, actual.SSRC)
|
|
require.Equal(t, expected.Marker, actual.Marker)
|
|
require.Equal(t, expected.Payload, actual.Payload)
|
|
}
|
|
|
|
func TestDecoderRecoversSingleLoss(t *testing.T) {
|
|
media := makeMediaPackets(t, 100, 5)
|
|
fec := encodeFEC(t, media, 1)
|
|
|
|
decoder := NewDecoder(testFECSSRC, testMediaSSRC, logger.GetLogger())
|
|
|
|
// drop media[2], feed the rest
|
|
var recovered []*rtp.Packet
|
|
for i := range media {
|
|
if i == 2 {
|
|
continue
|
|
}
|
|
recovered = append(recovered, decoder.DecodeFec(&media[i])...)
|
|
}
|
|
require.Empty(t, recovered)
|
|
|
|
for i := range fec {
|
|
recovered = append(recovered, decoder.DecodeFec(&fec[i])...)
|
|
}
|
|
|
|
require.Len(t, recovered, 1)
|
|
requirePacketEqual(t, &media[2], recovered[0])
|
|
|
|
stats := decoder.Stats()
|
|
assert.Equal(t, uint64(len(fec)), stats.FECPacketsReceived)
|
|
assert.Equal(t, uint64(1), stats.PacketsRecovered)
|
|
assert.Equal(t, uint64(0), stats.FECPacketsDiscarded)
|
|
}
|
|
|
|
func TestDecoderRecoversWithLateMedia(t *testing.T) {
|
|
// FEC arrives while two packets are missing; recovery happens once one of
|
|
// them shows up late. Exercises updateCoveringFecPackets and the
|
|
// stable-pointer window.
|
|
media := makeMediaPackets(t, 200, 5)
|
|
fec := encodeFEC(t, media, 1)
|
|
|
|
decoder := NewDecoder(testFECSSRC, testMediaSSRC, logger.GetLogger())
|
|
|
|
var recovered []*rtp.Packet
|
|
for _, i := range []int{0, 3, 4} {
|
|
recovered = append(recovered, decoder.DecodeFec(&media[i])...)
|
|
}
|
|
for i := range fec {
|
|
recovered = append(recovered, decoder.DecodeFec(&fec[i])...)
|
|
}
|
|
// two packets missing from the protected window, nothing recoverable yet
|
|
require.Empty(t, recovered)
|
|
|
|
// late arrival of media[1] leaves only media[2] missing
|
|
recovered = decoder.DecodeFec(&media[1])
|
|
require.Len(t, recovered, 1)
|
|
requirePacketEqual(t, &media[2], recovered[0])
|
|
}
|
|
|
|
func TestDecoderRecoversMultipleWindows(t *testing.T) {
|
|
decoder := NewDecoder(testFECSSRC, testMediaSSRC, logger.GetLogger())
|
|
encoder := pionflexfec.NewFlexEncoder03(testFECPT, testFECSSRC)
|
|
|
|
var allRecovered []*rtp.Packet
|
|
dropped := make(map[uint16]*rtp.Packet)
|
|
baseSN := uint16(1000)
|
|
for window := 0; window < 10; window++ {
|
|
media := makeMediaPackets(t, baseSN, 10)
|
|
fecPackets := encoder.EncodeFec(media, 1)
|
|
require.NotEmpty(t, fecPackets)
|
|
|
|
dropIdx := window % 10
|
|
for i := range media {
|
|
if i == dropIdx {
|
|
dropped[media[i].SequenceNumber] = &media[i]
|
|
continue
|
|
}
|
|
allRecovered = append(allRecovered, decoder.DecodeFec(&media[i])...)
|
|
}
|
|
for i := range fecPackets {
|
|
allRecovered = append(allRecovered, decoder.DecodeFec(&fecPackets[i])...)
|
|
}
|
|
baseSN += 10
|
|
}
|
|
|
|
require.Len(t, allRecovered, 10)
|
|
for _, rec := range allRecovered {
|
|
expected, ok := dropped[rec.SequenceNumber]
|
|
require.True(t, ok, "recovered unexpected sequence number %d", rec.SequenceNumber)
|
|
requirePacketEqual(t, expected, rec)
|
|
}
|
|
assert.Equal(t, uint64(10), decoder.Stats().PacketsRecovered)
|
|
}
|
|
|
|
func TestDecoderSequenceNumberWrap(t *testing.T) {
|
|
media := makeMediaPackets(t, 65533, 5) // spans 65533..1
|
|
fec := encodeFEC(t, media, 1)
|
|
|
|
decoder := NewDecoder(testFECSSRC, testMediaSSRC, logger.GetLogger())
|
|
|
|
var recovered []*rtp.Packet
|
|
for i := range media {
|
|
if i == 3 { // sequence number 0
|
|
continue
|
|
}
|
|
recovered = append(recovered, decoder.DecodeFec(&media[i])...)
|
|
}
|
|
for i := range fec {
|
|
recovered = append(recovered, decoder.DecodeFec(&fec[i])...)
|
|
}
|
|
|
|
require.Len(t, recovered, 1)
|
|
requirePacketEqual(t, &media[3], recovered[0])
|
|
}
|
|
|
|
func TestDecoderDiscardsForeignProtectedSSRC(t *testing.T) {
|
|
media := makeMediaPackets(t, 300, 5)
|
|
fec := encodeFEC(t, media, 1)
|
|
|
|
// decoder bound to a different protected stream
|
|
decoder := NewDecoder(testFECSSRC, testMediaSSRC+1, logger.GetLogger())
|
|
recovered := decoder.DecodeFec(&fec[0])
|
|
require.Empty(t, recovered)
|
|
|
|
stats := decoder.Stats()
|
|
assert.Equal(t, uint64(1), stats.FECPacketsReceived)
|
|
assert.Equal(t, uint64(1), stats.FECPacketsDiscarded)
|
|
}
|
|
|
|
func TestDecoderDiscardsMalformedFEC(t *testing.T) {
|
|
decoder := NewDecoder(testFECSSRC, testMediaSSRC, logger.GetLogger())
|
|
|
|
for _, payload := range [][]byte{
|
|
nil,
|
|
{0x00},
|
|
make([]byte, 10),
|
|
func() []byte { // R bit set
|
|
p := make([]byte, pionflexfec.BaseFec03HeaderSize+4)
|
|
p[0] = 0x80
|
|
p[8] = 1
|
|
return p
|
|
}(),
|
|
func() []byte { // multiple protected ssrcs
|
|
p := make([]byte, pionflexfec.BaseFec03HeaderSize+4)
|
|
p[8] = 2
|
|
return p
|
|
}(),
|
|
func() []byte { // empty mask
|
|
p := make([]byte, pionflexfec.BaseFec03HeaderSize+4)
|
|
p[8] = 1
|
|
binary.BigEndian.PutUint32(p[12:], testMediaSSRC)
|
|
binary.BigEndian.PutUint16(p[18:], 0x8000)
|
|
return p
|
|
}(),
|
|
} {
|
|
pkt := &rtp.Packet{
|
|
Header: rtp.Header{
|
|
Version: 2,
|
|
PayloadType: testFECPT,
|
|
SequenceNumber: uint16(rand.Intn(65536)),
|
|
SSRC: testFECSSRC,
|
|
},
|
|
Payload: payload,
|
|
}
|
|
require.NotPanics(t, func() {
|
|
require.Empty(t, decoder.DecodeFec(pkt))
|
|
})
|
|
}
|
|
|
|
stats := decoder.Stats()
|
|
assert.Equal(t, uint64(6), stats.FECPacketsReceived)
|
|
assert.Equal(t, uint64(6), stats.FECPacketsDiscarded)
|
|
}
|
|
|
|
func TestDecoderDiscardsDuplicateFEC(t *testing.T) {
|
|
media := makeMediaPackets(t, 400, 5)
|
|
fec := encodeFEC(t, media, 1)
|
|
|
|
decoder := NewDecoder(testFECSSRC, testMediaSSRC, logger.GetLogger())
|
|
for i := range media {
|
|
decoder.DecodeFec(&media[i])
|
|
}
|
|
require.Empty(t, decoder.DecodeFec(&fec[0]))
|
|
require.Empty(t, decoder.DecodeFec(&fec[0]))
|
|
|
|
stats := decoder.Stats()
|
|
assert.Equal(t, uint64(2), stats.FECPacketsReceived)
|
|
assert.Equal(t, uint64(1), stats.FECPacketsDiscarded)
|
|
}
|
|
|
|
func TestDecoderInputMemoryReuse(t *testing.T) {
|
|
// the decoder must not retain references to caller-owned packet memory
|
|
media := makeMediaPackets(t, 500, 5)
|
|
fec := encodeFEC(t, media, 1)
|
|
|
|
decoder := NewDecoder(testFECSSRC, testMediaSSRC, logger.GetLogger())
|
|
|
|
scratch := &rtp.Packet{}
|
|
feed := func(src *rtp.Packet) []*rtp.Packet {
|
|
buf, err := src.Marshal()
|
|
require.NoError(t, err)
|
|
require.NoError(t, scratch.Unmarshal(buf))
|
|
out := decoder.DecodeFec(scratch)
|
|
// clobber the scratch memory the decoder saw
|
|
for i := range scratch.Payload {
|
|
scratch.Payload[i] = 0xde
|
|
}
|
|
return out
|
|
}
|
|
|
|
var recovered []*rtp.Packet
|
|
for i := range media {
|
|
if i == 2 {
|
|
continue
|
|
}
|
|
recovered = append(recovered, feed(&media[i])...)
|
|
}
|
|
for i := range fec {
|
|
recovered = append(recovered, feed(&fec[i])...)
|
|
}
|
|
|
|
require.Len(t, recovered, 1)
|
|
requirePacketEqual(t, &media[2], recovered[0])
|
|
}
|
|
|
|
func TestDecoderTwoFECPacketsTwoLosses(t *testing.T) {
|
|
// with 2 FEC packets over 10 media packets, the coverage interleaves, so
|
|
// two losses landing in different coverage groups are both recoverable
|
|
media := makeMediaPackets(t, 600, 10)
|
|
fec := encodeFEC(t, media, 2)
|
|
require.Len(t, fec, 2)
|
|
|
|
decoder := NewDecoder(testFECSSRC, testMediaSSRC, logger.GetLogger())
|
|
|
|
var recovered []*rtp.Packet
|
|
for i := range media {
|
|
if i == 2 || i == 3 {
|
|
continue
|
|
}
|
|
recovered = append(recovered, decoder.DecodeFec(&media[i])...)
|
|
}
|
|
for i := range fec {
|
|
recovered = append(recovered, decoder.DecodeFec(&fec[i])...)
|
|
}
|
|
|
|
recoveredSNs := make(map[uint16]bool)
|
|
for _, r := range recovered {
|
|
recoveredSNs[r.SequenceNumber] = true
|
|
}
|
|
// at least one of the two losses must be recovered; both when the losses
|
|
// fall in distinct coverage groups
|
|
require.NotEmpty(t, recovered)
|
|
for _, r := range recovered {
|
|
expectedIdx := int(r.SequenceNumber - 600)
|
|
requirePacketEqual(t, &media[expectedIdx], r)
|
|
}
|
|
require.True(t, recoveredSNs[602] || recoveredSNs[603])
|
|
}
|
|
|
|
func TestDecoderResetsOnBigSequenceGap(t *testing.T) {
|
|
decoder := NewDecoder(testFECSSRC, testMediaSSRC, logger.GetLogger())
|
|
|
|
media := makeMediaPackets(t, 100, 110)
|
|
for i := range media {
|
|
decoder.DecodeFec(&media[i])
|
|
}
|
|
|
|
// jump far ahead, decoder should reset its windows rather than misuse
|
|
// stale state
|
|
farMedia := makeMediaPackets(t, 30000, 5)
|
|
fec := encodeFEC(t, farMedia, 1)
|
|
var recovered []*rtp.Packet
|
|
for i := range farMedia {
|
|
if i == 1 {
|
|
continue
|
|
}
|
|
recovered = append(recovered, decoder.DecodeFec(&farMedia[i])...)
|
|
}
|
|
for i := range fec {
|
|
recovered = append(recovered, decoder.DecodeFec(&fec[i])...)
|
|
}
|
|
require.Len(t, recovered, 1)
|
|
requirePacketEqual(t, &farMedia[1], recovered[0])
|
|
}
|