diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 93471804e..a08d75d5a 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -416,7 +416,12 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) { } flowState := b.updateStreamState(&rtpPacket, arrivalTime) + // process header extensions always as padding packets could be used for probing b.processHeaderExtensions(&rtpPacket, arrivalTime) + if flowState.IsNotHandled { + return + } + if len(rtpPacket.Payload) == 0 && (!flowState.IsOutOfOrder || flowState.IsDuplicate) { // drop padding only in-order or duplicate packet if !flowState.IsOutOfOrder { @@ -436,7 +441,7 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) { // 44 - padding only - out-of-order + duplicate - dropped as duplicate // if err := b.snRangeMap.ExcludeRange(flowState.ExtSequenceNumber, flowState.ExtSequenceNumber+1); err != nil { - b.logger.Errorw("could not exclude range", err, "sn", flowState.ExtSequenceNumber) + b.logger.Errorw("could not exclude range", err, "sn", rtpPacket.SequenceNumber, "esn", flowState.ExtSequenceNumber) } } return diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index d9f58ba67..16dd309c8 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -58,6 +58,8 @@ func RTPDriftToString(r *livekit.RTPDrift) string { // ------------------------------------------------------- type RTPFlowState struct { + IsNotHandled bool + HasLoss bool LossStartInclusive uint64 LossEndExclusive uint64 @@ -365,6 +367,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa defer r.lock.Unlock() if !r.endTime.IsZero() { + flowState.IsNotHandled = true return } @@ -373,6 +376,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa if !r.initialized { if payloadSize == 0 { // do not start on a padding only packet + flowState.IsNotHandled = true return }