diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index da2b2e692..330b93508 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -48,6 +48,55 @@ room: require.Error(t, err) } +func TestFlexFECConfigEnabled(t *testing.T) { + require.False(t, FlexFECConfig{}.Enabled(), "disabled by default") + require.False(t, FlexFECConfig{Publisher: true}.Enabled(), "payload type is required") + require.False(t, FlexFECConfig{Subscriber: true}.Enabled(), "payload type is required") + require.True(t, FlexFECConfig{Publisher: true, PayloadType: 49}.Enabled()) + require.True(t, FlexFECConfig{Subscriber: true, PayloadType: 49}.Enabled()) + require.True(t, FlexFECConfig{Publisher: true, Subscriber: true, PayloadType: 49}.Enabled()) +} + +func TestFlexFECConfigEncoderParams(t *testing.T) { + for _, tc := range []struct { + name string + conf FlexFECConfig + expected [2]uint32 + }{ + { + name: "defaults", + conf: FlexFECConfig{}, + expected: [2]uint32{10, 2}, + }, + { + name: "custom", + conf: FlexFECConfig{NumMediaPackets: 8, NumFecPackets: 3}, + expected: [2]uint32{8, 3}, + }, + { + name: "media_clamped_to_minimum", + conf: FlexFECConfig{NumMediaPackets: 1, NumFecPackets: 1}, + expected: [2]uint32{2, 1}, + }, + { + name: "fec_clamped_below_media", + conf: FlexFECConfig{NumMediaPackets: 5, NumFecPackets: 5}, + expected: [2]uint32{5, 4}, + }, + { + name: "fec_above_media_clamped", + conf: FlexFECConfig{NumMediaPackets: 4, NumFecPackets: 10}, + expected: [2]uint32{4, 3}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + numMediaPackets, numFecPackets := tc.conf.EncoderParams() + require.Equal(t, tc.expected[0], numMediaPackets) + require.Equal(t, tc.expected[1], numFecPackets) + }) + } +} + func TestGeneratedFlags(t *testing.T) { generatedFlags, err := GenerateCLIFlags(nil, false) require.NoError(t, err) diff --git a/pkg/rtc/flexfec_test.go b/pkg/rtc/flexfec_test.go new file mode 100644 index 000000000..064a78155 --- /dev/null +++ b/pkg/rtc/flexfec_test.go @@ -0,0 +1,58 @@ +// Copyright 2024 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rtc + +import ( + "testing" + + "github.com/pion/sdp/v3" + "github.com/stretchr/testify/require" + + "github.com/livekit/protocol/logger" +) + +func TestPublisherFlexFECRepairPairsFromSDP(t *testing.T) { + const offerSDP = "v=0\r\n" + + "o=- 0 0 IN IP4 127.0.0.1\r\n" + + "s=-\r\n" + + "t=0 0\r\n" + + "m=video 9 UDP/TLS/RTP/SAVPF 96 49\r\n" + + "c=IN IP4 0.0.0.0\r\n" + + "a=mid:0\r\n" + + "a=sendonly\r\n" + + "a=rtpmap:96 VP8/90000\r\n" + + "a=rtpmap:49 flexfec-03/90000\r\n" + + "a=ssrc:1111 cname:publisher\r\n" + + "a=ssrc:2222 cname:publisher\r\n" + + "a=ssrc-group:FEC-FR 1111 2222\r\n" + + "m=video 9 UDP/TLS/RTP/SAVPF 96 49\r\n" + + "c=IN IP4 0.0.0.0\r\n" + + "a=mid:1\r\n" + + "a=sendonly\r\n" + + "a=rid:high send\r\n" + + "a=rtpmap:96 VP8/90000\r\n" + + "a=rtpmap:49 flexfec-03/90000\r\n" + + "a=ssrc:3333 cname:publisher\r\n" + + "a=ssrc:4444 cname:publisher\r\n" + + "a=ssrc-group:FEC-FR 3333 4444\r\n" + + var sd sdp.SessionDescription + require.NoError(t, sd.UnmarshalString(offerSDP)) + + require.Equal(t, + map[uint32]uint32{2222: 1111}, + nonSimulcastFECRepairsFromSDP(&sd, logger.GetLogger()), + ) +} diff --git a/pkg/sfu/buffer/flexfec_test.go b/pkg/sfu/buffer/flexfec_test.go index 4f4e3f5d3..abfdabb72 100644 --- a/pkg/sfu/buffer/flexfec_test.go +++ b/pkg/sfu/buffer/flexfec_test.go @@ -25,11 +25,11 @@ import ( "github.com/stretchr/testify/require" ) -// TestFlexFECRecoveryThroughFactory exercises the full SFU receive path: the source and -// repair buffers are created through the factory (as the SRTP layer would), associated -// via SetFECPair, and a lost source packet is recovered from the FlexFEC repair stream -// and forwarded out of the source buffer. -func TestFlexFECRecoveryThroughFactory(t *testing.T) { +// TestPublisherToSFUFlexFECRecoveryThroughFactory exercises the publisher -> SFU receive +// path: the source and repair buffers are created through the factory (as the SRTP layer +// would), associated via SetFECPair, and a lost publisher packet is recovered from the +// FlexFEC repair stream and forwarded out of the source buffer. +func TestPublisherToSFUFlexFECRecoveryThroughFactory(t *testing.T) { const ( mediaSSRC = uint32(0xAABBCCDD) fecSSRC = uint32(0x11223344) @@ -38,28 +38,150 @@ func TestFlexFECRecoveryThroughFactory(t *testing.T) { dropIdx = 6 ) - factory := NewFactoryOfBufferFactory(InitPacketBufferSizeVideo, InitPacketBufferSizeAudio).CreateBufferFactory() + mediaBuf, fecBuf := newFECBufferPair(t, mediaSSRC, fecSSRC) + defer mediaBuf.Close() + defer fecBuf.Close() - mediaBuf, ok := factory.GetOrNew(packetio.RTPBufferPacket, mediaSSRC).(*Buffer) - require.True(t, ok) - mediaBuf.codecType = webrtc.RTPCodecTypeAudio - require.NoError(t, mediaBuf.Bind( - webrtc.RTPParameters{Codecs: []webrtc.RTPCodecParameters{opusCodec}}, - opusCodec.RTPCodecCapability, - 0, - )) + media := makeFECTestMediaPackets(mediaSSRC, baseSeq, numMedia) + fecPackets := makeFECTestRepairPackets(t, media, fecSSRC) + got := collectExtPackets(mediaBuf) + + writeMediaExcept(t, mediaBuf, media, dropIdx) + writeRepairPackets(t, fecBuf, fecPackets) + + requireRecoveredPacket(t, got, media[dropIdx]) +} + +// TestPublisherToSFUFlexFECRecoveryReplaysBufferedRepairPackets covers the ordering that +// can happen on ingress: a FlexFEC repair packet reaches the buffer before the SDP-derived +// FEC-FR association has been applied. The repair buffer must replay its queued packet +// into the source decoder once SetFECPair wires the streams together. +func TestPublisherToSFUFlexFECRecoveryReplaysBufferedRepairPackets(t *testing.T) { + const ( + mediaSSRC = uint32(0x01020304) + fecSSRC = uint32(0x05060708) + baseSeq = uint16(9000) + numMedia = 10 + dropIdx = 3 + ) + + factory := NewFactoryOfBufferFactory(InitPacketBufferSizeVideo, InitPacketBufferSizeAudio).CreateBufferFactory() fecBuf, ok := factory.GetOrNew(packetio.RTPBufferPacket, fecSSRC).(*Buffer) require.True(t, ok) + defer fecBuf.Close() + + media := makeFECTestMediaPackets(mediaSSRC, baseSeq, numMedia) + fecPackets := makeFECTestRepairPackets(t, media, fecSSRC) + + // Repair arrives before the source buffer and FEC-FR association are known. Because + // fecBuf is not associated yet, Buffer queues the packet in pPackets. + writeRepairPackets(t, fecBuf, fecPackets) + + mediaBuf, ok := factory.GetOrNew(packetio.RTPBufferPacket, mediaSSRC).(*Buffer) + require.True(t, ok) + defer mediaBuf.Close() + bindVideoBuffer(t, mediaBuf) + + got := collectExtPackets(mediaBuf) - // associate the repair stream with the source stream (as a=ssrc-group:FEC-FR would) factory.SetFECPair(fecSSRC, mediaSSRC) require.NotNil(t, mediaBuf.fecDecoder) require.Equal(t, mediaBuf, fecBuf.primaryBufferForFEC) - // build media packets and the protecting FlexFEC packet + writeMediaExcept(t, mediaBuf, media, dropIdx) + + requireRecoveredPacket(t, got, media[dropIdx]) +} + +func TestPublisherToSFUFlexFECPairRememberedBeforeBuffers(t *testing.T) { + for _, tc := range []struct { + name string + createSourceFirst bool + }{ + {name: "source_first", createSourceFirst: true}, + {name: "repair_first", createSourceFirst: false}, + } { + t.Run(tc.name, func(t *testing.T) { + const ( + mediaSSRC = uint32(0x0A0B0C0D) + fecSSRC = uint32(0x01010101) + baseSeq = uint16(12000) + numMedia = 10 + dropIdx = 5 + ) + + factory := NewFactoryOfBufferFactory(InitPacketBufferSizeVideo, InitPacketBufferSizeAudio).CreateBufferFactory() + factory.SetFECPair(fecSSRC, mediaSSRC) + + var mediaBuf *Buffer + var fecBuf *Buffer + if tc.createSourceFirst { + mediaBuf = getRTPBuffer(t, factory, mediaSSRC) + bindVideoBuffer(t, mediaBuf) + fecBuf = getRTPBuffer(t, factory, fecSSRC) + } else { + fecBuf = getRTPBuffer(t, factory, fecSSRC) + mediaBuf = getRTPBuffer(t, factory, mediaSSRC) + bindVideoBuffer(t, mediaBuf) + } + defer mediaBuf.Close() + defer fecBuf.Close() + + require.NotNil(t, mediaBuf.fecDecoder) + require.Equal(t, mediaBuf, fecBuf.primaryBufferForFEC) + + media := makeFECTestMediaPackets(mediaSSRC, baseSeq, numMedia) + fecPackets := makeFECTestRepairPackets(t, media, fecSSRC) + got := collectExtPackets(mediaBuf) + + writeMediaExcept(t, mediaBuf, media, dropIdx) + writeRepairPackets(t, fecBuf, fecPackets) + + requireRecoveredPacket(t, got, media[dropIdx]) + }) + } +} + +func newFECBufferPair(t *testing.T, mediaSSRC, fecSSRC uint32) (*Buffer, *Buffer) { + t.Helper() + + factory := NewFactoryOfBufferFactory(InitPacketBufferSizeVideo, InitPacketBufferSizeAudio).CreateBufferFactory() + + mediaBuf := getRTPBuffer(t, factory, mediaSSRC) + bindVideoBuffer(t, mediaBuf) + + fecBuf := getRTPBuffer(t, factory, fecSSRC) + + factory.SetFECPair(fecSSRC, mediaSSRC) + require.NotNil(t, mediaBuf.fecDecoder) + require.Equal(t, mediaBuf, fecBuf.primaryBufferForFEC) + + return mediaBuf, fecBuf +} + +func getRTPBuffer(t *testing.T, factory *Factory, ssrc uint32) *Buffer { + t.Helper() + + b, ok := factory.GetOrNew(packetio.RTPBufferPacket, ssrc).(*Buffer) + require.True(t, ok) + return b +} + +func bindVideoBuffer(t *testing.T, b *Buffer) { + t.Helper() + + b.codecType = webrtc.RTPCodecTypeVideo + require.NoError(t, b.Bind( + webrtc.RTPParameters{Codecs: []webrtc.RTPCodecParameters{vp8Codec}}, + vp8Codec.RTPCodecCapability, + 0, + )) +} + +func makeFECTestMediaPackets(mediaSSRC uint32, baseSeq uint16, numMedia int) []rtp.Packet { media := make([]rtp.Packet, numMedia) - for i := 0; i < numMedia; i++ { + for i := range media { payload := make([]byte, 16) for j := range payload { payload[j] = byte((i*11 + j*5 + 1) & 0xff) @@ -67,7 +189,7 @@ func TestFlexFECRecoveryThroughFactory(t *testing.T) { media[i] = rtp.Packet{ Header: rtp.Header{ Version: 2, - PayloadType: uint8(opusCodec.PayloadType), + PayloadType: uint8(vp8Codec.PayloadType), SequenceNumber: baseSeq + uint16(i), Timestamp: uint32(8000 + i*960), SSRC: mediaSSRC, @@ -75,19 +197,26 @@ func TestFlexFECRecoveryThroughFactory(t *testing.T) { Payload: payload, } } + return media +} + +func makeFECTestRepairPackets(t *testing.T, media []rtp.Packet, fecSSRC uint32) []rtp.Packet { + t.Helper() encoder := flexfec.NewFlexEncoder03(uint8(49), fecSSRC) - mediaForFec := make([]rtp.Packet, numMedia) + mediaForFec := make([]rtp.Packet, len(media)) copy(mediaForFec, media) fecPackets := encoder.EncodeFec(mediaForFec, 1) require.NotEmpty(t, fecPackets) + return fecPackets +} - // collect forwarded packets out of the source buffer - got := make(chan *ExtPacket, numMedia*2) +func collectExtPackets(b *Buffer) <-chan *ExtPacket { + got := make(chan *ExtPacket, 32) go func() { var buf [1500]byte for { - ep, err := mediaBuf.ReadExtended(buf[:]) + ep, err := b.ReadExtended(buf[:]) if err != nil { return } @@ -97,44 +226,51 @@ func TestFlexFECRecoveryThroughFactory(t *testing.T) { } } }() + return got +} - // deliver all source packets except the dropped one - for i := 0; i < numMedia; i++ { +func writeMediaExcept(t *testing.T, b *Buffer, media []rtp.Packet, dropIdx int) { + t.Helper() + + for i := range media { if i == dropIdx { continue } raw, err := media[i].Marshal() require.NoError(t, err) - _, err = mediaBuf.Write(raw) + _, err = b.Write(raw) require.NoError(t, err) } +} + +func writeRepairPackets(t *testing.T, b *Buffer, fecPackets []rtp.Packet) { + t.Helper() - // deliver the repair packet to the FEC buffer -> triggers recovery + injection for _, fp := range fecPackets { raw, err := fp.Marshal() require.NoError(t, err) - _, err = fecBuf.Write(raw) + _, err = b.Write(raw) require.NoError(t, err) } +} + +func requireRecoveredPacket(t *testing.T, got <-chan *ExtPacket, want rtp.Packet) { + t.Helper() - // the dropped source packet must be recovered and forwarded - recoveredSeq := baseSeq + uint16(dropIdx) deadline := time.After(2 * time.Second) seen := map[uint16]*ExtPacket{} for { select { case ep := <-got: seen[ep.Packet.SequenceNumber] = ep - if rec, found := seen[recoveredSeq]; found { - require.Equal(t, mediaSSRC, rec.Packet.SSRC) - require.Equal(t, media[dropIdx].Payload, rec.Packet.Payload) - require.Equal(t, media[dropIdx].Timestamp, rec.Packet.Timestamp) - _ = mediaBuf.Close() + if rec, found := seen[want.SequenceNumber]; found { + require.Equal(t, want.SSRC, rec.Packet.SSRC) + require.Equal(t, want.Payload, rec.Packet.Payload) + require.Equal(t, want.Timestamp, rec.Packet.Timestamp) return } case <-deadline: - _ = mediaBuf.Close() - t.Fatalf("recovered packet seq %d not forwarded; saw %v", recoveredSeq, seqKeys(seen)) + t.Fatalf("recovered packet seq %d not forwarded; saw %v", want.SequenceNumber, seqKeys(seen)) } } } diff --git a/pkg/sfu/flexfec/encoder_test.go b/pkg/sfu/flexfec/encoder_test.go index 06f31d394..ce3816056 100644 --- a/pkg/sfu/flexfec/encoder_test.go +++ b/pkg/sfu/flexfec/encoder_test.go @@ -80,11 +80,12 @@ func drivePooled(wr interceptor.RTPWriter, baseSeq uint16, count int) { } } -// TestEncoderInterceptorRecoversWithPooledPayloads is the regression test for the FEC -// generation bug: our interceptor must copy the payload before retaining it, so that the -// parity it computes matches the bytes actually sent even though the caller recycles the -// payload buffer immediately after each write. -func TestEncoderInterceptorRecoversWithPooledPayloads(t *testing.T) { +// TestSFUToSubscriberFlexFECGenerationRecoversDownlinkLossWithPooledPayloads exercises the +// SFU -> subscriber leg: the SFU encoder interceptor generates a repair stream, one media +// packet is lost on the simulated downlink, and the subscriber-side decoder reconstructs +// the exact packet bytes that were actually written. It also guards the pooled-payload +// bug: the interceptor must copy payloads before retaining them for parity generation. +func TestSFUToSubscriberFlexFECGenerationRecoversDownlinkLossWithPooledPayloads(t *testing.T) { const numMedia, numFec = 5, uint32(1) wr := &recyclingWriter{fecSSRC: testFECSSRC} @@ -118,6 +119,49 @@ func TestEncoderInterceptorRecoversWithPooledPayloads(t *testing.T) { requirePacketsEqual(t, wr.media[dropped], recovered[0]) } +func TestSFUToSubscriberFlexFECGenerationUsesMultipleRepairPackets(t *testing.T) { + const numMedia, numFec = 6, uint32(2) + + wr := &recyclingWriter{fecSSRC: testFECSSRC} + factory := NewEncoderInterceptorFactory(numMedia, numFec) + itc, err := factory.NewInterceptor("") + require.NoError(t, err) + writer := itc.BindLocalStream(fecStreamInfo(), interceptor.RTPWriterFunc(wr.Write)) + + drivePooled(writer, 2000, numMedia) + + require.Len(t, wr.media, numMedia, "all media packets forwarded") + require.Len(t, wr.fec, int(numFec), "configured number of FEC packets generated") + + // Pion's coverage is interleaved by repair-packet index, so with two FEC packets + // one protects media indexes 0/2/4 and the other protects 1/3/5. Drop one from + // each coverage set to prove both generated repair packets are useful. + dropped := map[int]struct{}{ + 1: {}, + 4: {}, + } + dec := NewDecoder(testFECSSRC, testMediaSSRC) + for i, p := range wr.media { + if _, ok := dropped[i]; ok { + continue + } + require.Empty(t, dec.Decode(p)) + } + + recoveredBySeq := make(map[uint16]rtp.Packet) + for _, fp := range wr.fec { + for _, recovered := range dec.Decode(fp) { + recoveredBySeq[recovered.SequenceNumber] = recovered + } + } + + require.Len(t, recoveredBySeq, len(dropped)) + for droppedIdx := range dropped { + want := wr.media[droppedIdx] + requirePacketsEqual(t, want, recoveredBySeq[want.SequenceNumber]) + } +} + // TestPionEncoderCorruptsWithPooledPayloads documents the upstream behaviour that makes // pion's own encoder interceptor unsafe with LiveKit's pooled payloads: because it // retains the payload by reference, recycling the buffer corrupts the parity and the