mirror of
https://github.com/livekit/livekit.git
synced 2026-05-15 03:05:26 +00:00
Do not start forawarding on out-of-order packet. (#3985)
It is posible that a subscriber joins when a publisher has reconnected and has received a flood of retransmitted packets due to NACKing the gap caused by the publisher reconnecting. Starting on that spurt means the subscriber gets a burst of unpaced packets that could lead to issues with calculating render time (especially obvious in cases like egress).
This commit is contained in:
+14
-2
@@ -42,7 +42,6 @@ import (
|
||||
"github.com/livekit/livekit-server/pkg/sfu/videolayerselector/temporallayerselector"
|
||||
)
|
||||
|
||||
// Forwarder
|
||||
const (
|
||||
FlagPauseOnDowngrade = true
|
||||
FlagFilterRTX = false
|
||||
@@ -55,6 +54,11 @@ const (
|
||||
SwitchAheadThresholdSeconds = float64(0.025) // 25ms
|
||||
)
|
||||
|
||||
var (
|
||||
errSkipStartOnOutOfOrderPacket = errors.New("skip start on out-of-order packet")
|
||||
errSwitchPointTooFarBehind = errors.New("switch point too far behind")
|
||||
)
|
||||
|
||||
// -------------------------------------------------------------------
|
||||
|
||||
type VideoPauseReason int
|
||||
@@ -1693,6 +1697,10 @@ func (f *Forwarder) getRefLayerRTPTimestamp(ts uint32, refLayer, targetLayer int
|
||||
|
||||
func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) error {
|
||||
if !f.started {
|
||||
if extPkt.IsOutOfOrder {
|
||||
return errSkipStartOnOutOfOrderPacket
|
||||
}
|
||||
|
||||
f.started = true
|
||||
f.referenceLayerSpatial = layer
|
||||
f.rtpMunger.SetLastSnTs(extPkt)
|
||||
@@ -1708,6 +1716,10 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
|
||||
)
|
||||
return nil
|
||||
} else if f.referenceLayerSpatial == buffer.InvalidLayerSpatial {
|
||||
if extPkt.IsOutOfOrder {
|
||||
return errSkipStartOnOutOfOrderPacket
|
||||
}
|
||||
|
||||
f.referenceLayerSpatial = layer
|
||||
f.codecMunger.SetLast(extPkt)
|
||||
f.logger.Debugw(
|
||||
@@ -1887,7 +1899,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
|
||||
// (like "have waited for too long for layer switch, nothing available, switch to whatever is available" kind of condition).
|
||||
logTransition("layer switch, reference too far behind", extExpectedTS, extRefTS, extLastTS, diffSeconds)
|
||||
|
||||
return errors.New("switch point too far behind")
|
||||
return errSwitchPointTooFarBehind
|
||||
}
|
||||
|
||||
// use a nominal increase to ensure that timestamp is always moving forward
|
||||
|
||||
@@ -1273,22 +1273,41 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
|
||||
f := newForwarder(testutils.TestOpusCodec, webrtc.RTPCodecTypeAudio)
|
||||
|
||||
params := &testutils.TestExtPacketParams{
|
||||
SequenceNumber: 23332,
|
||||
Timestamp: 0xabcdef,
|
||||
SSRC: 0x12345678,
|
||||
PayloadSize: 20,
|
||||
IsOutOfOrder: true,
|
||||
}
|
||||
extPkt, _ := testutils.GetTestExtPacket(params)
|
||||
|
||||
// should not start on an out-of-order packet
|
||||
expectedTP := TranslationParams{
|
||||
shouldDrop: true,
|
||||
}
|
||||
actualTP, err := f.GetTranslationParams(extPkt, 0)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedTP, actualTP)
|
||||
require.False(t, f.started)
|
||||
require.Zero(t, f.lastSSRC)
|
||||
|
||||
params = &testutils.TestExtPacketParams{
|
||||
SequenceNumber: 23333,
|
||||
Timestamp: 0xabcdef,
|
||||
SSRC: 0x12345678,
|
||||
PayloadSize: 20,
|
||||
}
|
||||
extPkt, _ := testutils.GetTestExtPacket(params)
|
||||
extPkt, _ = testutils.GetTestExtPacket(params)
|
||||
|
||||
// should lock onto the first packet
|
||||
expectedTP := TranslationParams{
|
||||
// should lock onto the first in-order packet
|
||||
expectedTP = TranslationParams{
|
||||
rtp: TranslationParamsRTP{
|
||||
snOrdering: SequenceNumberOrderingContiguous,
|
||||
extSequenceNumber: 23333,
|
||||
extTimestamp: 0xabcdef,
|
||||
},
|
||||
}
|
||||
actualTP, err := f.GetTranslationParams(extPkt, 0)
|
||||
actualTP, err = f.GetTranslationParams(extPkt, 0)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedTP, actualTP)
|
||||
require.True(t, f.started)
|
||||
@@ -1437,11 +1456,12 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) {
|
||||
f := newForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo)
|
||||
|
||||
params := &testutils.TestExtPacketParams{
|
||||
SequenceNumber: 23333,
|
||||
SequenceNumber: 23332,
|
||||
Timestamp: 0xabcdef,
|
||||
SSRC: 0x12345678,
|
||||
PayloadSize: 20,
|
||||
SetMarker: true,
|
||||
Marker: true,
|
||||
IsOutOfOrder: true,
|
||||
}
|
||||
vp8 := &buffer.VP8{
|
||||
FirstByte: 25,
|
||||
@@ -1460,13 +1480,47 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) {
|
||||
}
|
||||
extPkt, _ := testutils.GetTestExtPacketVP8(params, vp8)
|
||||
|
||||
// no target layers, should drop
|
||||
// should not start on an out-of-order packet
|
||||
expectedTP := TranslationParams{
|
||||
shouldDrop: true,
|
||||
}
|
||||
actualTP, err := f.GetTranslationParams(extPkt, 0)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedTP, actualTP)
|
||||
require.False(t, f.started)
|
||||
require.Zero(t, f.lastSSRC)
|
||||
|
||||
params = &testutils.TestExtPacketParams{
|
||||
SequenceNumber: 23333,
|
||||
Timestamp: 0xabcdef,
|
||||
SSRC: 0x12345678,
|
||||
PayloadSize: 20,
|
||||
Marker: true,
|
||||
}
|
||||
vp8 = &buffer.VP8{
|
||||
FirstByte: 25,
|
||||
I: true,
|
||||
M: true,
|
||||
PictureID: 13467,
|
||||
L: true,
|
||||
TL0PICIDX: 233,
|
||||
T: true,
|
||||
TID: 0,
|
||||
Y: true,
|
||||
K: true,
|
||||
KEYIDX: 23,
|
||||
HeaderSize: 6,
|
||||
IsKeyFrame: false,
|
||||
}
|
||||
extPkt, _ = testutils.GetTestExtPacketVP8(params, vp8)
|
||||
|
||||
// no target layers, should drop
|
||||
expectedTP = TranslationParams{
|
||||
shouldDrop: true,
|
||||
}
|
||||
actualTP, err = f.GetTranslationParams(extPkt, 0)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedTP, actualTP)
|
||||
|
||||
// although target layer matches, not a key frame, so should drop
|
||||
f.vls.SetTarget(buffer.VideoLayer{
|
||||
|
||||
@@ -570,7 +570,7 @@ func TestIsOnFrameBoundary(t *testing.T) {
|
||||
|
||||
// packet with RTP marker
|
||||
params = &testutils.TestExtPacketParams{
|
||||
SetMarker: true,
|
||||
Marker: true,
|
||||
SequenceNumber: 23334,
|
||||
Timestamp: 0xabcdef,
|
||||
SSRC: 0x12345678,
|
||||
|
||||
@@ -26,7 +26,7 @@ import (
|
||||
// -----------------------------------------------------------
|
||||
|
||||
type TestExtPacketParams struct {
|
||||
SetMarker bool
|
||||
Marker bool
|
||||
IsKeyFrame bool
|
||||
PayloadType uint8
|
||||
SequenceNumber uint16
|
||||
@@ -38,6 +38,7 @@ type TestExtPacketParams struct {
|
||||
PaddingSize byte
|
||||
ArrivalTime time.Time
|
||||
VideoLayer buffer.VideoLayer
|
||||
IsOutOfOrder bool
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------
|
||||
@@ -47,7 +48,7 @@ func GetTestExtPacket(params *TestExtPacketParams) (*buffer.ExtPacket, error) {
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
Padding: params.PaddingSize != 0,
|
||||
Marker: params.SetMarker,
|
||||
Marker: params.Marker,
|
||||
PayloadType: params.PayloadType,
|
||||
SequenceNumber: params.SequenceNumber,
|
||||
Timestamp: params.Timestamp,
|
||||
@@ -70,6 +71,7 @@ func GetTestExtPacket(params *TestExtPacketParams) (*buffer.ExtPacket, error) {
|
||||
Packet: &packet,
|
||||
KeyFrame: params.IsKeyFrame,
|
||||
RawPacket: raw,
|
||||
IsOutOfOrder: params.IsOutOfOrder,
|
||||
}
|
||||
|
||||
return ep, nil
|
||||
|
||||
Reference in New Issue
Block a user