mirror of
https://github.com/livekit/livekit.git
synced 2026-07-02 22:12:18 +00:00
195adeb38c
Implements FlexFEC-03 (RFC 8627) end to end in the SFU, in both directions: - Upstream: decode FlexFEC repair packets from publishers and recover lost media in the receive buffer before forwarding (pkg/sfu/flexfec/decoder.go, pkg/sfu/buffer). Recovered packets are surfaced via Prometheus counters. - Downstream: generate FlexFEC for subscribers off the downtrack, paced alongside media (pkg/sfu/downtrack_fec.go, pkg/sfu/pacer). - Negotiation: advertise and match flexfec-03 in the media engine and transport SDP, gated by new config knobs (pkg/rtc, pkg/config, config-sample.yaml). - Telemetry: livekit_fec_* metrics for sent/received/recovered packets (pkg/telemetry/prometheus/packets.go). Tests: unit coverage for the decoder, buffer recovery, downtrack generation, and transport negotiation, plus an end-to-end integration test and the test-client support it needs (test/flexfec_test.go, test/client).
325 lines
11 KiB
Go
325 lines
11 KiB
Go
// Copyright 2026 LiveKit, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package buffer
|
|
|
|
import (
|
|
"math/rand"
|
|
"testing"
|
|
|
|
pionflexfec "github.com/pion/interceptor/pkg/flexfec"
|
|
"github.com/pion/rtp"
|
|
"github.com/pion/transport/v4/packetio"
|
|
"github.com/pion/webrtc/v4"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
// Fixed identifiers shared by the FEC buffer tests in this file.
const (
	// fecTestMediaSSRC is the SSRC stamped on generated media packets.
	fecTestMediaSSRC uint32 = 0x11111111
	// fecTestFECSSRC is the SSRC handed to the FlexFEC encoder.
	fecTestFECSSRC uint32 = 0x22222222
	// fecTestFECPT is the payload type handed to the FlexFEC encoder.
	fecTestFECPT uint8 = 115
)
|
|
|
|
var flexfecCodec = webrtc.RTPCodecParameters{
|
|
RTPCodecCapability: webrtc.RTPCodecCapability{
|
|
MimeType: webrtc.MimeTypeFlexFEC03,
|
|
ClockRate: 90000,
|
|
SDPFmtpLine: "repair-window=10000000",
|
|
},
|
|
PayloadType: webrtc.PayloadType(fecTestFECPT),
|
|
}
|
|
|
|
func fecTestMediaPackets(t *testing.T, baseSN uint16, count int) []rtp.Packet {
|
|
t.Helper()
|
|
rng := rand.New(rand.NewSource(int64(baseSN)))
|
|
pkts := make([]rtp.Packet, 0, count)
|
|
for i := 0; i < count; i++ {
|
|
payload := make([]byte, 50+rng.Intn(200))
|
|
rng.Read(payload)
|
|
// valid VP8 payload descriptor (S=1, no extensions) so the video
|
|
// packet processing in the buffer accepts the packet
|
|
payload[0] = 0x10
|
|
sn := baseSN + uint16(i)
|
|
pkts = append(pkts, rtp.Packet{
|
|
Header: rtp.Header{
|
|
Version: 2,
|
|
PayloadType: uint8(vp8Codec.PayloadType),
|
|
SequenceNumber: sn,
|
|
// derive timestamp from the sequence number so windows
|
|
// generated separately stay monotonic
|
|
Timestamp: 90000 + 3000*uint32(sn),
|
|
SSRC: fecTestMediaSSRC,
|
|
Marker: i == count-1,
|
|
},
|
|
Payload: payload,
|
|
})
|
|
}
|
|
return pkts
|
|
}
|
|
|
|
func bindFECTestBuffer(t *testing.T, buff *Buffer) {
|
|
t.Helper()
|
|
buff.codecType = webrtc.RTPCodecTypeVideo
|
|
require.NoError(t, buff.Bind(webrtc.RTPParameters{
|
|
Codecs: []webrtc.RTPCodecParameters{vp8Codec, flexfecCodec},
|
|
}, vp8Codec.RTPCodecCapability, 0))
|
|
}
|
|
|
|
func writePacket(t *testing.T, buff *Buffer, pkt *rtp.Packet) {
|
|
t.Helper()
|
|
raw, err := pkt.Marshal()
|
|
require.NoError(t, err)
|
|
_, err = buff.Write(raw)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
// readExtSequenceNumbers drains count ExtPackets and returns sequence number
|
|
// -> extended sequence number of everything seen.
|
|
func readExtSequenceNumbers(t *testing.T, buff *Buffer, count int) map[uint16]uint64 {
|
|
t.Helper()
|
|
seen := make(map[uint16]uint64, count)
|
|
var buf [1500]byte
|
|
for i := 0; i < count; i++ {
|
|
extPkt, err := buff.ReadExtended(buf[:])
|
|
require.NoError(t, err)
|
|
require.NotNil(t, extPkt)
|
|
seen[extPkt.Packet.SequenceNumber] = extPkt.ExtSequenceNumber
|
|
}
|
|
return seen
|
|
}
|
|
|
|
// requireRecoveredInBucket asserts that the dropped packet was placed into
|
|
// the buffer's bucket (where downstream NACKs are served from), matching the
|
|
// behavior of RTX repaired packets.
|
|
func requireRecoveredInBucket(t *testing.T, buff *Buffer, dropped *rtp.Packet, extSNBySN map[uint16]uint64, refSN uint16) {
|
|
t.Helper()
|
|
refExtSN, ok := extSNBySN[refSN]
|
|
require.True(t, ok, "reference sequence number %d not seen", refSN)
|
|
droppedExtSN := refExtSN + uint64(dropped.SequenceNumber-refSN)
|
|
|
|
var buf [1500]byte
|
|
n, err := buff.GetPacket(buf[:], droppedExtSN)
|
|
require.NoError(t, err, "recovered packet not found in bucket")
|
|
|
|
var pkt rtp.Packet
|
|
require.NoError(t, pkt.Unmarshal(buf[:n]))
|
|
require.Equal(t, dropped.SequenceNumber, pkt.SequenceNumber)
|
|
assert.Equal(t, dropped.Timestamp, pkt.Timestamp)
|
|
assert.Equal(t, dropped.Payload, pkt.Payload)
|
|
}
|
|
|
|
func TestBufferFECRecoversDroppedPacket(t *testing.T) {
|
|
factory := NewFactoryOfBufferFactory(500, 200).CreateBufferFactory()
|
|
|
|
primary := factory.GetOrNew(packetio.RTPBufferPacket, fecTestMediaSSRC).(*Buffer)
|
|
fecBuff := factory.GetOrNew(packetio.RTPBufferPacket, fecTestFECSSRC).(*Buffer)
|
|
factory.SetFECPair(fecTestFECSSRC, fecTestMediaSSRC)
|
|
|
|
bindFECTestBuffer(t, primary)
|
|
|
|
var recoveredDelta, receivedDelta int
|
|
primary.OnFECRecovery(func(recovered int, received int, discarded int, bytesReceived int) {
|
|
recoveredDelta += recovered
|
|
receivedDelta += received
|
|
})
|
|
|
|
media := fecTestMediaPackets(t, 100, 10)
|
|
encoder := pionflexfec.NewFlexEncoder03(fecTestFECPT, fecTestFECSSRC)
|
|
fecPackets := encoder.EncodeFec(media, 2)
|
|
require.NotEmpty(t, fecPackets)
|
|
|
|
const droppedIdx = 3
|
|
for i := range media {
|
|
if i == droppedIdx {
|
|
continue
|
|
}
|
|
writePacket(t, primary, &media[i])
|
|
}
|
|
for i := range fecPackets {
|
|
writePacket(t, fecBuff, &fecPackets[i])
|
|
}
|
|
|
|
stats := primary.FECDecoderStats()
|
|
assert.EqualValues(t, len(fecPackets), stats.FECPacketsReceived)
|
|
assert.EqualValues(t, 1, stats.PacketsRecovered)
|
|
assert.EqualValues(t, 0, stats.FECPacketsDiscarded)
|
|
assert.Equal(t, 1, recoveredDelta)
|
|
assert.Equal(t, len(fecPackets), receivedDelta)
|
|
|
|
// the 9 received packets flow through the ext packet pipeline, the
|
|
// recovered one fills the bucket like an RTX repair
|
|
extSNBySN := readExtSequenceNumbers(t, primary, len(media)-1)
|
|
requireRecoveredInBucket(t, primary, &media[droppedIdx], extSNBySN, media[0].SequenceNumber)
|
|
}
|
|
|
|
func TestBufferFECPairAfterPackets(t *testing.T) {
|
|
// FEC packets arriving before the ssrc-group is known are queued as
|
|
// pending and replayed when the pair is established. Media seen before
|
|
// the pairing is not in the decoder window (cold start), so the first
|
|
// window is not recoverable, subsequent windows are.
|
|
factory := NewFactoryOfBufferFactory(500, 200).CreateBufferFactory()
|
|
|
|
primary := factory.GetOrNew(packetio.RTPBufferPacket, fecTestMediaSSRC).(*Buffer)
|
|
bindFECTestBuffer(t, primary)
|
|
|
|
encoder := pionflexfec.NewFlexEncoder03(fecTestFECPT, fecTestFECSSRC)
|
|
media := fecTestMediaPackets(t, 200, 10)
|
|
fecPackets := encoder.EncodeFec(media, 2)
|
|
require.NotEmpty(t, fecPackets)
|
|
|
|
for i := range media {
|
|
if i == 5 {
|
|
continue
|
|
}
|
|
writePacket(t, primary, &media[i])
|
|
}
|
|
|
|
// fec buffer created by first packet arrival, before the pair is declared
|
|
fecBuff := factory.GetOrNew(packetio.RTPBufferPacket, fecTestFECSSRC).(*Buffer)
|
|
for i := range fecPackets {
|
|
writePacket(t, fecBuff, &fecPackets[i])
|
|
}
|
|
|
|
stats := primary.FECDecoderStats()
|
|
require.EqualValues(t, 0, stats.FECPacketsReceived)
|
|
|
|
factory.SetFECPair(fecTestFECSSRC, fecTestMediaSSRC)
|
|
|
|
// pending FEC was replayed into the decoder, no recovery possible for the
|
|
// cold-start window
|
|
stats = primary.FECDecoderStats()
|
|
assert.EqualValues(t, len(fecPackets), stats.FECPacketsReceived)
|
|
assert.EqualValues(t, 0, stats.PacketsRecovered)
|
|
|
|
// the next window recovers normally
|
|
media2 := fecTestMediaPackets(t, 210, 10)
|
|
fecPackets2 := encoder.EncodeFec(media2, 2)
|
|
require.NotEmpty(t, fecPackets2)
|
|
|
|
const droppedIdx = 4
|
|
for i := range media2 {
|
|
if i == droppedIdx {
|
|
continue
|
|
}
|
|
writePacket(t, primary, &media2[i])
|
|
}
|
|
for i := range fecPackets2 {
|
|
writePacket(t, fecBuff, &fecPackets2[i])
|
|
}
|
|
|
|
stats = primary.FECDecoderStats()
|
|
assert.EqualValues(t, 1, stats.PacketsRecovered)
|
|
|
|
extSNBySN := readExtSequenceNumbers(t, primary, len(media)-1+len(media2)-1)
|
|
requireRecoveredInBucket(t, primary, &media2[droppedIdx], extSNBySN, media2[0].SequenceNumber)
|
|
}
|
|
|
|
func TestBufferFECCoupledBeforeBuffersExist(t *testing.T) {
|
|
// pair declared first (from SDP), buffers created later on first packet
|
|
factory := NewFactoryOfBufferFactory(500, 200).CreateBufferFactory()
|
|
factory.SetFECPair(fecTestFECSSRC, fecTestMediaSSRC)
|
|
|
|
primary := factory.GetOrNew(packetio.RTPBufferPacket, fecTestMediaSSRC).(*Buffer)
|
|
fecBuff := factory.GetOrNew(packetio.RTPBufferPacket, fecTestFECSSRC).(*Buffer)
|
|
bindFECTestBuffer(t, primary)
|
|
|
|
media := fecTestMediaPackets(t, 300, 5)
|
|
encoder := pionflexfec.NewFlexEncoder03(fecTestFECPT, fecTestFECSSRC)
|
|
fecPackets := encoder.EncodeFec(media, 1)
|
|
require.NotEmpty(t, fecPackets)
|
|
|
|
const droppedIdx = 2
|
|
for i := range media {
|
|
if i == droppedIdx {
|
|
continue
|
|
}
|
|
writePacket(t, primary, &media[i])
|
|
}
|
|
for i := range fecPackets {
|
|
writePacket(t, fecBuff, &fecPackets[i])
|
|
}
|
|
|
|
stats := primary.FECDecoderStats()
|
|
assert.EqualValues(t, 1, stats.PacketsRecovered)
|
|
|
|
extSNBySN := readExtSequenceNumbers(t, primary, len(media)-1)
|
|
requireRecoveredInBucket(t, primary, &media[droppedIdx], extSNBySN, media[0].SequenceNumber)
|
|
}
|
|
|
|
func TestBufferFECIgnoresUnexpectedPayloadType(t *testing.T) {
|
|
factory := NewFactoryOfBufferFactory(500, 200).CreateBufferFactory()
|
|
|
|
primary := factory.GetOrNew(packetio.RTPBufferPacket, fecTestMediaSSRC).(*Buffer)
|
|
fecBuff := factory.GetOrNew(packetio.RTPBufferPacket, fecTestFECSSRC).(*Buffer)
|
|
factory.SetFECPair(fecTestFECSSRC, fecTestMediaSSRC)
|
|
|
|
// bound without flexfec in negotiated codecs
|
|
primary.codecType = webrtc.RTPCodecTypeVideo
|
|
require.NoError(t, primary.Bind(webrtc.RTPParameters{
|
|
Codecs: []webrtc.RTPCodecParameters{vp8Codec},
|
|
}, vp8Codec.RTPCodecCapability, 0))
|
|
|
|
media := fecTestMediaPackets(t, 400, 5)
|
|
encoder := pionflexfec.NewFlexEncoder03(fecTestFECPT, fecTestFECSSRC)
|
|
fecPackets := encoder.EncodeFec(media, 1)
|
|
require.NotEmpty(t, fecPackets)
|
|
|
|
for i := range media {
|
|
writePacket(t, primary, &media[i])
|
|
}
|
|
for i := range fecPackets {
|
|
writePacket(t, fecBuff, &fecPackets[i])
|
|
}
|
|
|
|
// no flexfec payload type negotiated, decoder must not be created
|
|
stats := primary.FECDecoderStats()
|
|
assert.EqualValues(t, 0, stats.FECPacketsReceived)
|
|
assert.EqualValues(t, 0, stats.PacketsRecovered)
|
|
}
|
|
|
|
func TestBufferFECNACKSuppression(t *testing.T) {
|
|
// a recovered packet must clear the pending NACK for its sequence number
|
|
factory := NewFactoryOfBufferFactory(500, 200).CreateBufferFactory()
|
|
|
|
primary := factory.GetOrNew(packetio.RTPBufferPacket, fecTestMediaSSRC).(*Buffer)
|
|
fecBuff := factory.GetOrNew(packetio.RTPBufferPacket, fecTestFECSSRC).(*Buffer)
|
|
factory.SetFECPair(fecTestFECSSRC, fecTestMediaSSRC)
|
|
bindFECTestBuffer(t, primary)
|
|
|
|
media := fecTestMediaPackets(t, 700, 10)
|
|
encoder := pionflexfec.NewFlexEncoder03(fecTestFECPT, fecTestFECSSRC)
|
|
fecPackets := encoder.EncodeFec(media, 2)
|
|
require.NotEmpty(t, fecPackets)
|
|
|
|
const droppedIdx = 6
|
|
for i := range media {
|
|
if i == droppedIdx {
|
|
continue
|
|
}
|
|
writePacket(t, primary, &media[i])
|
|
}
|
|
|
|
// the only gap is the dropped packet, exactly one queued NACK
|
|
require.NotNil(t, primary.nacker)
|
|
require.Len(t, primary.nacker.Nacks(), 1, "expected queued NACK for dropped packet")
|
|
|
|
for i := range fecPackets {
|
|
writePacket(t, fecBuff, &fecPackets[i])
|
|
}
|
|
require.EqualValues(t, 1, primary.FECDecoderStats().PacketsRecovered)
|
|
|
|
require.Empty(t, primary.nacker.Nacks(), "NACK for recovered packet not suppressed")
|
|
}
|