diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index 3869ebae8..dad7c66ef 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -129,31 +129,30 @@ func (r *RTPStatsReceiver) Update( pktSize := uint64(hdrSize + payloadSize + paddingSize) gapSN := int64(resSN.ExtendedVal - resSN.PreExtendedHighest) if gapSN <= 0 { // duplicate OR out-of-order - if payloadSize == 0 { - // do not start on a padding only packet - if resTS.IsRestart { - r.logger.Infow( - "rolling back timestamp restart", - "tsBefore", resTS.PreExtendedStart, - "tsAfter", r.timestamp.GetExtendedStart(), - "snBefore", resSN.PreExtendedStart, - "snAfter", r.sequenceNumber.GetExtendedStart(), - ) - r.timestamp.RollbackRestart(resTS.PreExtendedStart) - } - if resSN.IsRestart { - r.logger.Infow( - "rolling back sequence number restart", - "snBefore", resSN.PreExtendedStart, - "snAfter", r.sequenceNumber.GetExtendedStart(), - "tsBefore", resTS.PreExtendedStart, - "tsAfter", r.timestamp.GetExtendedStart(), - ) - r.sequenceNumber.RollbackRestart(resSN.PreExtendedStart) - flowState.IsNotHandled = true - return - } + // before start, don't restart + if resTS.IsRestart { + r.logger.Infow( + "rolling back timestamp restart", + "tsBefore", resTS.PreExtendedStart, + "tsAfter", r.timestamp.GetExtendedStart(), + "snBefore", resSN.PreExtendedStart, + "snAfter", r.sequenceNumber.GetExtendedStart(), + ) + r.timestamp.RollbackRestart(resTS.PreExtendedStart) } + if resSN.IsRestart { + r.logger.Infow( + "rolling back sequence number restart", + "snBefore", resSN.PreExtendedStart, + "snAfter", r.sequenceNumber.GetExtendedStart(), + "tsBefore", resTS.PreExtendedStart, + "tsAfter", r.timestamp.GetExtendedStart(), + ) + r.sequenceNumber.RollbackRestart(resSN.PreExtendedStart) + flowState.IsNotHandled = true + return + } + if -gapSN >= cNumSequenceNumbers/2 { r.logger.Warnw( "large sequence number gap negative", nil, diff --git a/pkg/sfu/buffer/rtpstats_receiver_test.go b/pkg/sfu/buffer/rtpstats_receiver_test.go index 34fea0f4b..7a39c1b54 100644 --- a/pkg/sfu/buffer/rtpstats_receiver_test.go +++ b/pkg/sfu/buffer/rtpstats_receiver_test.go @@ -130,7 +130,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { require.Equal(t, timestamp, r.timestamp.GetHighest()) require.Equal(t, timestamp, uint32(r.timestamp.GetExtendedHighest())) - // out-of-order + // out-of-order, would cause a restart which is disallowed packet = getPacket(sequenceNumber-10, timestamp-30000, 1000) flowState = r.Update( time.Now(), @@ -142,14 +142,15 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { 0, ) require.False(t, flowState.HasLoss) + require.True(t, flowState.IsNotHandled) require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest()) require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest())) require.Equal(t, timestamp, r.timestamp.GetHighest()) require.Equal(t, timestamp, uint32(r.timestamp.GetExtendedHighest())) - require.Equal(t, uint64(1), r.packetsOutOfOrder) + require.Equal(t, uint64(0), r.packetsOutOfOrder) require.Equal(t, uint64(0), r.packetsDuplicate) - // duplicate + // duplicate of the above out-of-order packet, but would not be handled as it causes a restart packet = getPacket(sequenceNumber-10, timestamp-30000, 1000) flowState = r.Update( time.Now(), @@ -161,12 +162,13 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { 0, ) require.False(t, flowState.HasLoss) + require.True(t, flowState.IsNotHandled) require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest()) require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest())) require.Equal(t, timestamp, r.timestamp.GetHighest()) require.Equal(t, timestamp, uint32(r.timestamp.GetExtendedHighest())) - require.Equal(t, uint64(2), r.packetsOutOfOrder) - require.Equal(t, uint64(1), r.packetsDuplicate) + require.Equal(t, uint64(0), r.packetsOutOfOrder) + require.Equal(t, uint64(0), r.packetsDuplicate) // loss sequenceNumber += 10 @@ -184,10 +186,10 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { require.True(t, flowState.HasLoss) require.Equal(t, uint64(sequenceNumber-9), flowState.LossStartInclusive) require.Equal(t, uint64(sequenceNumber), flowState.LossEndExclusive) - require.Equal(t, uint64(17), r.packetsLost) + require.Equal(t, uint64(9), r.packetsLost) // out-of-order should decrement number of lost packets - packet = getPacket(sequenceNumber-15, timestamp-45000, 1000) + packet = getPacket(sequenceNumber-6, timestamp-45000, 1000) flowState = r.Update( time.Now(), packet.Header.SequenceNumber, @@ -202,9 +204,9 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest())) require.Equal(t, timestamp, r.timestamp.GetHighest()) require.Equal(t, timestamp, uint32(r.timestamp.GetExtendedHighest())) - require.Equal(t, uint64(3), r.packetsOutOfOrder) - require.Equal(t, uint64(1), r.packetsDuplicate) - require.Equal(t, uint64(16), r.packetsLost) + require.Equal(t, uint64(1), r.packetsOutOfOrder) + require.Equal(t, uint64(0), r.packetsDuplicate) + require.Equal(t, uint64(8), r.packetsLost) // test sequence number history // with a gap @@ -223,7 +225,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { require.True(t, flowState.HasLoss) require.Equal(t, uint64(sequenceNumber-1), flowState.LossStartInclusive) require.Equal(t, uint64(sequenceNumber), flowState.LossEndExclusive) - require.Equal(t, uint64(17), r.packetsLost) + require.Equal(t, uint64(9), r.packetsLost) require.False(t, r.history.IsSet(uint64(sequenceNumber)-1)) // out-of-order @@ -240,8 +242,8 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { 0, ) require.False(t, flowState.HasLoss) - require.Equal(t, uint64(16), r.packetsLost) - require.Equal(t, uint64(4), r.packetsOutOfOrder) + require.Equal(t, uint64(8), r.packetsLost) + require.Equal(t, uint64(2), r.packetsOutOfOrder) require.True(t, r.history.IsSet(uint64(sequenceNumber))) // padding only @@ -257,8 +259,8 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { 25, ) require.False(t, flowState.HasLoss) - require.Equal(t, uint64(16), r.packetsLost) - require.Equal(t, uint64(4), r.packetsOutOfOrder) + require.Equal(t, uint64(8), r.packetsLost) + require.Equal(t, uint64(2), r.packetsOutOfOrder) require.True(t, r.history.IsSet(uint64(sequenceNumber))) require.True(t, r.history.IsSet(uint64(sequenceNumber)-1)) require.True(t, r.history.IsSet(uint64(sequenceNumber)-2))