From d78fdbf2a8ec8197780af9d59f34d9ed50f848fd Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 21 Aug 2024 13:32:20 +0530 Subject: [PATCH] Handle another old packet condition. (#2947) * Handle another old packet condition. With this detection, the sequence number can be rolled over even when TS rollover is not possible. For example, a track at 300 pps can rollver the sequence number space in minutes compared to 13h+ for video time stamp to roll over. * fix typo --- pkg/sfu/buffer/rtpstats_receiver.go | 64 +++++++++++++++++++---------- 1 file changed, 43 insertions(+), 21 deletions(-) diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index 2ae60072b..b9778ef6d 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -32,6 +32,8 @@ const ( // number of seconds the current report RTP timestamp can be off from expected RTP timestamp cReportSlack = float64(60.0) + + cTSJumpTooHighFactor = 100 ) // --------------------------------------------------------------------- @@ -144,6 +146,7 @@ func (r *RTPStatsReceiver) Update( var gapSN int64 var resTS utils.WrapAroundUpdateResult[uint64] var timeSinceHighest int64 + var expectedTSJump int64 var tsRolloverCount int getLoggingFields := func() []interface{} { @@ -153,6 +156,7 @@ func (r *RTPStatsReceiver) Update( "resTS", resTS, "gapTS", int64(resTS.ExtendedVal - resTS.PreExtendedHighest), "timeSinceHighest", time.Duration(timeSinceHighest), + "expectedTSJump", expectedTSJump, "tsRolloverCount", tsRolloverCount, "packetTime", time.Unix(0, packetTime).String(), "sequenceNumber", sequenceNumber, @@ -214,12 +218,37 @@ func (r *RTPStatsReceiver) Update( } gapTS := int64(resTS.ExtendedVal - resTS.PreExtendedHighest) - // it is possible to reecive old packets, - // as it is not possible to detect how far to roll back sequence number, ignore old packets + // it is possible to reecive old packets in two different scenarios + // as it is not possible to detect how far to roll back, ignore old packets + // + // Case 1: + // Very old time stamp, happens under the following conditions + // - resume after long mute, big time stamp jump + // - an out of order packet from before the mute arrives (unsure what causes this + // very old packet to be trasmitted from remote), causing time stamp to jump back + // to before mute, but it appears like it has rolled over. + // Use a threshold against expected to ignore these. + if gapSN < 0 && gapTS > 0 { + expectedTSJump = timeSinceHighest * int64(r.params.ClockRate) / 1e9 + if gapTS > expectedTSJump*cTSJumpTooHighFactor { + r.sequenceNumber.UndoUpdate(resSN) + r.timestamp.UndoUpdate(resTS) + r.logger.Warnw( + "dropping old packet, timestamp", nil, + getLoggingFields()..., + ) + flowState.IsNotHandled = true + return + } + } + + // Case 2: + // Sequence number looks like it is moving forward, but it is actually a very old packet. if gapTS < 0 && gapSN > 0 { r.sequenceNumber.UndoUpdate(resSN) + r.timestamp.UndoUpdate(resTS) r.logger.Warnw( - "dropping old packet", nil, + "dropping old packet, sequence number", nil, getLoggingFields()..., ) flowState.IsNotHandled = true @@ -228,25 +257,18 @@ func (r *RTPStatsReceiver) Update( // it is possible that sequence number has rolled over too if gapSN < 0 && gapTS > 0 && payloadSize > 0 { - if tsRolloverCount >= 0 { - // not possible to know how many cycles of sequence number roll over could have happened, - // use 1 to ensure that it at least does not go backwards - resSN = r.sequenceNumber.Rollover(sequenceNumber, 1) - if resSN.IsUnhandled { - flowState.IsNotHandled = true - return - } - - r.logger.Warnw( - "forcing sequence number rollover", nil, - getLoggingFields()..., - ) - } else { - r.logger.Warnw( - "forcing sequence number rollover skipped", nil, - getLoggingFields()..., - ) + // not possible to know how many cycles of sequence number roll over could have happened, + // use 1 to ensure that it at least does not go backwards + resSN = r.sequenceNumber.Rollover(sequenceNumber, 1) + if resSN.IsUnhandled { + flowState.IsNotHandled = true + return } + + r.logger.Warnw( + "forcing sequence number rollover", nil, + getLoggingFields()..., + ) } } gapSN = int64(resSN.ExtendedVal - resSN.PreExtendedHighest)