diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index ca4a8621c..b27338946 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -42,7 +42,6 @@ import ( "github.com/livekit/livekit-server/pkg/sfu/videolayerselector/temporallayerselector" ) -// Forwarder const ( FlagPauseOnDowngrade = true FlagFilterRTX = false @@ -55,6 +54,11 @@ const ( SwitchAheadThresholdSeconds = float64(0.025) // 25ms ) +var ( + errSkipStartOnOutOfOrderPacket = errors.New("skip start on out-of-order packet") + errSwitchPointTooFarBehind = errors.New("switch point too far behind") +) + // ------------------------------------------------------------------- type VideoPauseReason int @@ -1693,6 +1697,10 @@ func (f *Forwarder) getRefLayerRTPTimestamp(ts uint32, refLayer, targetLayer int func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) error { if !f.started { + if extPkt.IsOutOfOrder { + return errSkipStartOnOutOfOrderPacket + } + f.started = true f.referenceLayerSpatial = layer f.rtpMunger.SetLastSnTs(extPkt) @@ -1708,6 +1716,10 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e ) return nil } else if f.referenceLayerSpatial == buffer.InvalidLayerSpatial { + if extPkt.IsOutOfOrder { + return errSkipStartOnOutOfOrderPacket + } + f.referenceLayerSpatial = layer f.codecMunger.SetLast(extPkt) f.logger.Debugw( @@ -1887,7 +1899,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e // (like "have waited for too long for layer switch, nothing available, switch to whatever is available" kind of condition). logTransition("layer switch, reference too far behind", extExpectedTS, extRefTS, extLastTS, diffSeconds) - return errors.New("switch point too far behind") + return errSwitchPointTooFarBehind } // use a nominal increase to ensure that timestamp is always moving forward diff --git a/pkg/sfu/forwarder_test.go b/pkg/sfu/forwarder_test.go index 54bdbbd14..649ebbcc1 100644 --- a/pkg/sfu/forwarder_test.go +++ b/pkg/sfu/forwarder_test.go @@ -1273,22 +1273,41 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { f := newForwarder(testutils.TestOpusCodec, webrtc.RTPCodecTypeAudio) params := &testutils.TestExtPacketParams{ + SequenceNumber: 23332, + Timestamp: 0xabcdef, + SSRC: 0x12345678, + PayloadSize: 20, + IsOutOfOrder: true, + } + extPkt, _ := testutils.GetTestExtPacket(params) + + // should not start on an out-of-order packet + expectedTP := TranslationParams{ + shouldDrop: true, + } + actualTP, err := f.GetTranslationParams(extPkt, 0) + require.NoError(t, err) + require.Equal(t, expectedTP, actualTP) + require.False(t, f.started) + require.Zero(t, f.lastSSRC) + + params = &testutils.TestExtPacketParams{ SequenceNumber: 23333, Timestamp: 0xabcdef, SSRC: 0x12345678, PayloadSize: 20, } - extPkt, _ := testutils.GetTestExtPacket(params) + extPkt, _ = testutils.GetTestExtPacket(params) - // should lock onto the first packet - expectedTP := TranslationParams{ + // should lock onto the first in-order packet + expectedTP = TranslationParams{ rtp: TranslationParamsRTP{ snOrdering: SequenceNumberOrderingContiguous, extSequenceNumber: 23333, extTimestamp: 0xabcdef, }, } - actualTP, err := f.GetTranslationParams(extPkt, 0) + actualTP, err = f.GetTranslationParams(extPkt, 0) require.NoError(t, err) require.Equal(t, expectedTP, actualTP) require.True(t, f.started) @@ -1437,11 +1456,12 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) { f := newForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo) params := &testutils.TestExtPacketParams{ - SequenceNumber: 23333, + SequenceNumber: 23332, Timestamp: 0xabcdef, SSRC: 0x12345678, PayloadSize: 20, - SetMarker: true, + Marker: true, + IsOutOfOrder: true, } vp8 := &buffer.VP8{ FirstByte: 25, @@ -1460,13 +1480,47 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) { } extPkt, _ := testutils.GetTestExtPacketVP8(params, vp8) - // no target layers, should drop + // should not start on an out-of-order packet expectedTP := TranslationParams{ shouldDrop: true, } actualTP, err := f.GetTranslationParams(extPkt, 0) require.NoError(t, err) require.Equal(t, expectedTP, actualTP) + require.False(t, f.started) + require.Zero(t, f.lastSSRC) + + params = &testutils.TestExtPacketParams{ + SequenceNumber: 23333, + Timestamp: 0xabcdef, + SSRC: 0x12345678, + PayloadSize: 20, + Marker: true, + } + vp8 = &buffer.VP8{ + FirstByte: 25, + I: true, + M: true, + PictureID: 13467, + L: true, + TL0PICIDX: 233, + T: true, + TID: 0, + Y: true, + K: true, + KEYIDX: 23, + HeaderSize: 6, + IsKeyFrame: false, + } + extPkt, _ = testutils.GetTestExtPacketVP8(params, vp8) + + // no target layers, should drop + expectedTP = TranslationParams{ + shouldDrop: true, + } + actualTP, err = f.GetTranslationParams(extPkt, 0) + require.NoError(t, err) + require.Equal(t, expectedTP, actualTP) // although target layer matches, not a key frame, so should drop f.vls.SetTarget(buffer.VideoLayer{ diff --git a/pkg/sfu/rtpmunger_test.go b/pkg/sfu/rtpmunger_test.go index 08e51a128..df90ec2d4 100644 --- a/pkg/sfu/rtpmunger_test.go +++ b/pkg/sfu/rtpmunger_test.go @@ -570,7 +570,7 @@ func TestIsOnFrameBoundary(t *testing.T) { // packet with RTP marker params = &testutils.TestExtPacketParams{ - SetMarker: true, + Marker: true, SequenceNumber: 23334, Timestamp: 0xabcdef, SSRC: 0x12345678, diff --git a/pkg/sfu/testutils/data.go b/pkg/sfu/testutils/data.go index 7f7c96872..a4ebc7bed 100644 --- a/pkg/sfu/testutils/data.go +++ b/pkg/sfu/testutils/data.go @@ -26,7 +26,7 @@ import ( // ----------------------------------------------------------- type TestExtPacketParams struct { - SetMarker bool + Marker bool IsKeyFrame bool PayloadType uint8 SequenceNumber uint16 @@ -38,6 +38,7 @@ type TestExtPacketParams struct { PaddingSize byte ArrivalTime time.Time VideoLayer buffer.VideoLayer + IsOutOfOrder bool } // ----------------------------------------------------------- @@ -47,7 +48,7 @@ func GetTestExtPacket(params *TestExtPacketParams) (*buffer.ExtPacket, error) { Header: rtp.Header{ Version: 2, Padding: params.PaddingSize != 0, - Marker: params.SetMarker, + Marker: params.Marker, PayloadType: params.PayloadType, SequenceNumber: params.SequenceNumber, Timestamp: params.Timestamp, @@ -70,6 +71,7 @@ func GetTestExtPacket(params *TestExtPacketParams) (*buffer.ExtPacket, error) { Packet: &packet, KeyFrame: params.IsKeyFrame, RawPacket: raw, + IsOutOfOrder: params.IsOutOfOrder, } return ep, nil