From 9afb0873aec89eacdc322efc826bb840f31681e8 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 30 Aug 2023 19:43:24 +0530 Subject: [PATCH] Do not process packets not processed by RTPStats. (#2015) Seeing the case of a stream starting with padding packets on migration. As publisher in that case is always sending packets. it is possible that the new node gets padding packets at the start. Processing them in buffer leads to trying to drop that padding packet and adding an exclusion range. That fails because the extended sequence number is not available for unprocessed packets. It is okay to drop them as they will be dropped anyway. But, they are useful for bandwidth estimation. So, headers are processed even if the packet is RTPStats unprocessed. --- pkg/sfu/buffer/buffer.go | 7 ++++++- pkg/sfu/buffer/rtpstats.go | 4 ++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 93471804e..a08d75d5a 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -416,7 +416,12 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) { } flowState := b.updateStreamState(&rtpPacket, arrivalTime) + // process header extensions always as padding packets could be used for probing b.processHeaderExtensions(&rtpPacket, arrivalTime) + if flowState.IsNotHandled { + return + } + if len(rtpPacket.Payload) == 0 && (!flowState.IsOutOfOrder || flowState.IsDuplicate) { // drop padding only in-order or duplicate packet if !flowState.IsOutOfOrder { @@ -436,7 +441,7 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) { // 44 - padding only - out-of-order + duplicate - dropped as duplicate // if err := b.snRangeMap.ExcludeRange(flowState.ExtSequenceNumber, flowState.ExtSequenceNumber+1); err != nil { - b.logger.Errorw("could not exclude range", err, "sn", flowState.ExtSequenceNumber) + b.logger.Errorw("could not exclude range", err, "sn", rtpPacket.SequenceNumber, "esn", flowState.ExtSequenceNumber) } } return diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index d9f58ba67..16dd309c8 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -58,6 +58,8 @@ func RTPDriftToString(r *livekit.RTPDrift) string { // ------------------------------------------------------- type RTPFlowState struct { + IsNotHandled bool + HasLoss bool LossStartInclusive uint64 LossEndExclusive uint64 @@ -365,6 +367,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa defer r.lock.Unlock() if !r.endTime.IsZero() { + flowState.IsNotHandled = true return } @@ -373,6 +376,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa if !r.initialized { if payloadSize == 0 { // do not start on a padding only packet + flowState.IsNotHandled = true return }