From 12a9d74acbbbf2b96f1bae99cd3f03413cb2f518 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 6 Nov 2023 10:41:56 +0530 Subject: [PATCH] Do not restart on receiver side. (#2224) * Do not restart on receiver side. Restart with wrap back causes issues in the forwarding path as the subscriber assumes the extended type from receiver side does not restart. Restart was an attempt to include as many packets as possible, but in practice is not super useful. So, taking it out. Can clean up a bit more stuff, but want to run this first and check for any oddities. * fix test --- pkg/sfu/buffer/rtpstats_receiver.go | 47 ++++++++++++------------ pkg/sfu/buffer/rtpstats_receiver_test.go | 32 ++++++++-------- 2 files changed, 40 insertions(+), 39 deletions(-) diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index 3869ebae8..dad7c66ef 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -129,31 +129,30 @@ func (r *RTPStatsReceiver) Update( pktSize := uint64(hdrSize + payloadSize + paddingSize) gapSN := int64(resSN.ExtendedVal - resSN.PreExtendedHighest) if gapSN <= 0 { // duplicate OR out-of-order - if payloadSize == 0 { - // do not start on a padding only packet - if resTS.IsRestart { - r.logger.Infow( - "rolling back timestamp restart", - "tsBefore", resTS.PreExtendedStart, - "tsAfter", r.timestamp.GetExtendedStart(), - "snBefore", resSN.PreExtendedStart, - "snAfter", r.sequenceNumber.GetExtendedStart(), - ) - r.timestamp.RollbackRestart(resTS.PreExtendedStart) - } - if resSN.IsRestart { - r.logger.Infow( - "rolling back sequence number restart", - "snBefore", resSN.PreExtendedStart, - "snAfter", r.sequenceNumber.GetExtendedStart(), - "tsBefore", resTS.PreExtendedStart, - "tsAfter", r.timestamp.GetExtendedStart(), - ) - r.sequenceNumber.RollbackRestart(resSN.PreExtendedStart) - flowState.IsNotHandled = true - return - } + // before start, don't restart + if resTS.IsRestart { + r.logger.Infow( + "rolling back timestamp restart", + "tsBefore", resTS.PreExtendedStart, + "tsAfter", r.timestamp.GetExtendedStart(), + "snBefore", resSN.PreExtendedStart, + "snAfter", r.sequenceNumber.GetExtendedStart(), + ) + r.timestamp.RollbackRestart(resTS.PreExtendedStart) } + if resSN.IsRestart { + r.logger.Infow( + "rolling back sequence number restart", + "snBefore", resSN.PreExtendedStart, + "snAfter", r.sequenceNumber.GetExtendedStart(), + "tsBefore", resTS.PreExtendedStart, + "tsAfter", r.timestamp.GetExtendedStart(), + ) + r.sequenceNumber.RollbackRestart(resSN.PreExtendedStart) + flowState.IsNotHandled = true + return + } + if -gapSN >= cNumSequenceNumbers/2 { r.logger.Warnw( "large sequence number gap negative", nil, diff --git a/pkg/sfu/buffer/rtpstats_receiver_test.go b/pkg/sfu/buffer/rtpstats_receiver_test.go index 34fea0f4b..7a39c1b54 100644 --- a/pkg/sfu/buffer/rtpstats_receiver_test.go +++ b/pkg/sfu/buffer/rtpstats_receiver_test.go @@ -130,7 +130,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { require.Equal(t, timestamp, r.timestamp.GetHighest()) require.Equal(t, timestamp, uint32(r.timestamp.GetExtendedHighest())) - // out-of-order + // out-of-order, would cause a restart which is disallowed packet = getPacket(sequenceNumber-10, timestamp-30000, 1000) flowState = r.Update( time.Now(), @@ -142,14 +142,15 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { 0, ) require.False(t, flowState.HasLoss) + require.True(t, flowState.IsNotHandled) require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest()) require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest())) require.Equal(t, timestamp, r.timestamp.GetHighest()) require.Equal(t, timestamp, uint32(r.timestamp.GetExtendedHighest())) - require.Equal(t, uint64(1), r.packetsOutOfOrder) + require.Equal(t, uint64(0), r.packetsOutOfOrder) require.Equal(t, uint64(0), r.packetsDuplicate) - // duplicate + // duplicate of the above out-of-order packet, but would not be handled as it causes a restart packet = getPacket(sequenceNumber-10, timestamp-30000, 1000) flowState = r.Update( time.Now(), @@ -161,12 +162,13 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { 0, ) require.False(t, flowState.HasLoss) + require.True(t, flowState.IsNotHandled) require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest()) require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest())) require.Equal(t, timestamp, r.timestamp.GetHighest()) require.Equal(t, timestamp, uint32(r.timestamp.GetExtendedHighest())) - require.Equal(t, uint64(2), r.packetsOutOfOrder) - require.Equal(t, uint64(1), r.packetsDuplicate) + require.Equal(t, uint64(0), r.packetsOutOfOrder) + require.Equal(t, uint64(0), r.packetsDuplicate) // loss sequenceNumber += 10 @@ -184,10 +186,10 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { require.True(t, flowState.HasLoss) require.Equal(t, uint64(sequenceNumber-9), flowState.LossStartInclusive) require.Equal(t, uint64(sequenceNumber), flowState.LossEndExclusive) - require.Equal(t, uint64(17), r.packetsLost) + require.Equal(t, uint64(9), r.packetsLost) // out-of-order should decrement number of lost packets - packet = getPacket(sequenceNumber-15, timestamp-45000, 1000) + packet = getPacket(sequenceNumber-6, timestamp-45000, 1000) flowState = r.Update( time.Now(), packet.Header.SequenceNumber, @@ -202,9 +204,9 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest())) require.Equal(t, timestamp, r.timestamp.GetHighest()) require.Equal(t, timestamp, uint32(r.timestamp.GetExtendedHighest())) - require.Equal(t, uint64(3), r.packetsOutOfOrder) - require.Equal(t, uint64(1), r.packetsDuplicate) - require.Equal(t, uint64(16), r.packetsLost) + require.Equal(t, uint64(1), r.packetsOutOfOrder) + require.Equal(t, uint64(0), r.packetsDuplicate) + require.Equal(t, uint64(8), r.packetsLost) // test sequence number history // with a gap @@ -223,7 +225,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { require.True(t, flowState.HasLoss) require.Equal(t, uint64(sequenceNumber-1), flowState.LossStartInclusive) require.Equal(t, uint64(sequenceNumber), flowState.LossEndExclusive) - require.Equal(t, uint64(17), r.packetsLost) + require.Equal(t, uint64(9), r.packetsLost) require.False(t, r.history.IsSet(uint64(sequenceNumber)-1)) // out-of-order @@ -240,8 +242,8 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { 0, ) require.False(t, flowState.HasLoss) - require.Equal(t, uint64(16), r.packetsLost) - require.Equal(t, uint64(4), r.packetsOutOfOrder) + require.Equal(t, uint64(8), r.packetsLost) + require.Equal(t, uint64(2), r.packetsOutOfOrder) require.True(t, r.history.IsSet(uint64(sequenceNumber))) // padding only @@ -257,8 +259,8 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { 25, ) require.False(t, flowState.HasLoss) - require.Equal(t, uint64(16), r.packetsLost) - require.Equal(t, uint64(4), r.packetsOutOfOrder) + require.Equal(t, uint64(8), r.packetsLost) + require.Equal(t, uint64(2), r.packetsOutOfOrder) require.True(t, r.history.IsSet(uint64(sequenceNumber))) require.True(t, r.history.IsSet(uint64(sequenceNumber)-1)) require.True(t, r.history.IsSet(uint64(sequenceNumber)-2))