mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 22:05:39 +00:00
Do not restart on receiver side. (#2224)
* Do not restart on receiver side. Restart with wrap back causes issues in the forwarding path as the subscriber assumes the extended type from receiver side does not restart. Restart was an attempt to include as many packets as possible, but in practice is not super useful. So, taking it out. Can clean up a bit more stuff, but want to run this first and check for any oddities. * fix test
This commit is contained in:
@@ -129,31 +129,30 @@ func (r *RTPStatsReceiver) Update(
|
||||
pktSize := uint64(hdrSize + payloadSize + paddingSize)
|
||||
gapSN := int64(resSN.ExtendedVal - resSN.PreExtendedHighest)
|
||||
if gapSN <= 0 { // duplicate OR out-of-order
|
||||
if payloadSize == 0 {
|
||||
// do not start on a padding only packet
|
||||
if resTS.IsRestart {
|
||||
r.logger.Infow(
|
||||
"rolling back timestamp restart",
|
||||
"tsBefore", resTS.PreExtendedStart,
|
||||
"tsAfter", r.timestamp.GetExtendedStart(),
|
||||
"snBefore", resSN.PreExtendedStart,
|
||||
"snAfter", r.sequenceNumber.GetExtendedStart(),
|
||||
)
|
||||
r.timestamp.RollbackRestart(resTS.PreExtendedStart)
|
||||
}
|
||||
if resSN.IsRestart {
|
||||
r.logger.Infow(
|
||||
"rolling back sequence number restart",
|
||||
"snBefore", resSN.PreExtendedStart,
|
||||
"snAfter", r.sequenceNumber.GetExtendedStart(),
|
||||
"tsBefore", resTS.PreExtendedStart,
|
||||
"tsAfter", r.timestamp.GetExtendedStart(),
|
||||
)
|
||||
r.sequenceNumber.RollbackRestart(resSN.PreExtendedStart)
|
||||
flowState.IsNotHandled = true
|
||||
return
|
||||
}
|
||||
// before start, don't restart
|
||||
if resTS.IsRestart {
|
||||
r.logger.Infow(
|
||||
"rolling back timestamp restart",
|
||||
"tsBefore", resTS.PreExtendedStart,
|
||||
"tsAfter", r.timestamp.GetExtendedStart(),
|
||||
"snBefore", resSN.PreExtendedStart,
|
||||
"snAfter", r.sequenceNumber.GetExtendedStart(),
|
||||
)
|
||||
r.timestamp.RollbackRestart(resTS.PreExtendedStart)
|
||||
}
|
||||
if resSN.IsRestart {
|
||||
r.logger.Infow(
|
||||
"rolling back sequence number restart",
|
||||
"snBefore", resSN.PreExtendedStart,
|
||||
"snAfter", r.sequenceNumber.GetExtendedStart(),
|
||||
"tsBefore", resTS.PreExtendedStart,
|
||||
"tsAfter", r.timestamp.GetExtendedStart(),
|
||||
)
|
||||
r.sequenceNumber.RollbackRestart(resSN.PreExtendedStart)
|
||||
flowState.IsNotHandled = true
|
||||
return
|
||||
}
|
||||
|
||||
if -gapSN >= cNumSequenceNumbers/2 {
|
||||
r.logger.Warnw(
|
||||
"large sequence number gap negative", nil,
|
||||
|
||||
@@ -130,7 +130,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
|
||||
require.Equal(t, timestamp, r.timestamp.GetHighest())
|
||||
require.Equal(t, timestamp, uint32(r.timestamp.GetExtendedHighest()))
|
||||
|
||||
// out-of-order
|
||||
// out-of-order, would cause a restart which is disallowed
|
||||
packet = getPacket(sequenceNumber-10, timestamp-30000, 1000)
|
||||
flowState = r.Update(
|
||||
time.Now(),
|
||||
@@ -142,14 +142,15 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
|
||||
0,
|
||||
)
|
||||
require.False(t, flowState.HasLoss)
|
||||
require.True(t, flowState.IsNotHandled)
|
||||
require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest())
|
||||
require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest()))
|
||||
require.Equal(t, timestamp, r.timestamp.GetHighest())
|
||||
require.Equal(t, timestamp, uint32(r.timestamp.GetExtendedHighest()))
|
||||
require.Equal(t, uint64(1), r.packetsOutOfOrder)
|
||||
require.Equal(t, uint64(0), r.packetsOutOfOrder)
|
||||
require.Equal(t, uint64(0), r.packetsDuplicate)
|
||||
|
||||
// duplicate
|
||||
// duplicate of the above out-of-order packet, but would not be handled as it causes a restart
|
||||
packet = getPacket(sequenceNumber-10, timestamp-30000, 1000)
|
||||
flowState = r.Update(
|
||||
time.Now(),
|
||||
@@ -161,12 +162,13 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
|
||||
0,
|
||||
)
|
||||
require.False(t, flowState.HasLoss)
|
||||
require.True(t, flowState.IsNotHandled)
|
||||
require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest())
|
||||
require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest()))
|
||||
require.Equal(t, timestamp, r.timestamp.GetHighest())
|
||||
require.Equal(t, timestamp, uint32(r.timestamp.GetExtendedHighest()))
|
||||
require.Equal(t, uint64(2), r.packetsOutOfOrder)
|
||||
require.Equal(t, uint64(1), r.packetsDuplicate)
|
||||
require.Equal(t, uint64(0), r.packetsOutOfOrder)
|
||||
require.Equal(t, uint64(0), r.packetsDuplicate)
|
||||
|
||||
// loss
|
||||
sequenceNumber += 10
|
||||
@@ -184,10 +186,10 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
|
||||
require.True(t, flowState.HasLoss)
|
||||
require.Equal(t, uint64(sequenceNumber-9), flowState.LossStartInclusive)
|
||||
require.Equal(t, uint64(sequenceNumber), flowState.LossEndExclusive)
|
||||
require.Equal(t, uint64(17), r.packetsLost)
|
||||
require.Equal(t, uint64(9), r.packetsLost)
|
||||
|
||||
// out-of-order should decrement number of lost packets
|
||||
packet = getPacket(sequenceNumber-15, timestamp-45000, 1000)
|
||||
packet = getPacket(sequenceNumber-6, timestamp-45000, 1000)
|
||||
flowState = r.Update(
|
||||
time.Now(),
|
||||
packet.Header.SequenceNumber,
|
||||
@@ -202,9 +204,9 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
|
||||
require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest()))
|
||||
require.Equal(t, timestamp, r.timestamp.GetHighest())
|
||||
require.Equal(t, timestamp, uint32(r.timestamp.GetExtendedHighest()))
|
||||
require.Equal(t, uint64(3), r.packetsOutOfOrder)
|
||||
require.Equal(t, uint64(1), r.packetsDuplicate)
|
||||
require.Equal(t, uint64(16), r.packetsLost)
|
||||
require.Equal(t, uint64(1), r.packetsOutOfOrder)
|
||||
require.Equal(t, uint64(0), r.packetsDuplicate)
|
||||
require.Equal(t, uint64(8), r.packetsLost)
|
||||
|
||||
// test sequence number history
|
||||
// with a gap
|
||||
@@ -223,7 +225,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
|
||||
require.True(t, flowState.HasLoss)
|
||||
require.Equal(t, uint64(sequenceNumber-1), flowState.LossStartInclusive)
|
||||
require.Equal(t, uint64(sequenceNumber), flowState.LossEndExclusive)
|
||||
require.Equal(t, uint64(17), r.packetsLost)
|
||||
require.Equal(t, uint64(9), r.packetsLost)
|
||||
require.False(t, r.history.IsSet(uint64(sequenceNumber)-1))
|
||||
|
||||
// out-of-order
|
||||
@@ -240,8 +242,8 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
|
||||
0,
|
||||
)
|
||||
require.False(t, flowState.HasLoss)
|
||||
require.Equal(t, uint64(16), r.packetsLost)
|
||||
require.Equal(t, uint64(4), r.packetsOutOfOrder)
|
||||
require.Equal(t, uint64(8), r.packetsLost)
|
||||
require.Equal(t, uint64(2), r.packetsOutOfOrder)
|
||||
require.True(t, r.history.IsSet(uint64(sequenceNumber)))
|
||||
|
||||
// padding only
|
||||
@@ -257,8 +259,8 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
|
||||
25,
|
||||
)
|
||||
require.False(t, flowState.HasLoss)
|
||||
require.Equal(t, uint64(16), r.packetsLost)
|
||||
require.Equal(t, uint64(4), r.packetsOutOfOrder)
|
||||
require.Equal(t, uint64(8), r.packetsLost)
|
||||
require.Equal(t, uint64(2), r.packetsOutOfOrder)
|
||||
require.True(t, r.history.IsSet(uint64(sequenceNumber)))
|
||||
require.True(t, r.history.IsSet(uint64(sequenceNumber)-1))
|
||||
require.True(t, r.history.IsSet(uint64(sequenceNumber)-2))
|
||||
|
||||
Reference in New Issue
Block a user