Do not process packets not processed by RTPStats. (#2015)

Seeing the case of a stream starting with padding packets
on migration. As publisher in that case is always sending packets.
it is possible that the new node gets padding packets at the start.
Processing them in buffer leads to trying to drop that padding packet
and adding an exclusion range. That fails because the extended
sequence number is not available for unprocessed packets.

It is okay to drop them as they will be dropped anyway.
But, they are useful for bandwidth estimation. So, headers are processed
even if the packet is RTPStats unprocessed.
This commit is contained in:
Raja Subramanian
2023-08-30 19:43:24 +05:30
committed by GitHub
parent 126872047d
commit 9afb0873ae
2 changed files with 10 additions and 1 deletions
+6 -1
View File
@@ -416,7 +416,12 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) {
}
flowState := b.updateStreamState(&rtpPacket, arrivalTime)
// process header extensions always as padding packets could be used for probing
b.processHeaderExtensions(&rtpPacket, arrivalTime)
if flowState.IsNotHandled {
return
}
if len(rtpPacket.Payload) == 0 && (!flowState.IsOutOfOrder || flowState.IsDuplicate) {
// drop padding only in-order or duplicate packet
if !flowState.IsOutOfOrder {
@@ -436,7 +441,7 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) {
// 44 - padding only - out-of-order + duplicate - dropped as duplicate
//
if err := b.snRangeMap.ExcludeRange(flowState.ExtSequenceNumber, flowState.ExtSequenceNumber+1); err != nil {
b.logger.Errorw("could not exclude range", err, "sn", flowState.ExtSequenceNumber)
b.logger.Errorw("could not exclude range", err, "sn", rtpPacket.SequenceNumber, "esn", flowState.ExtSequenceNumber)
}
}
return
+4
View File
@@ -58,6 +58,8 @@ func RTPDriftToString(r *livekit.RTPDrift) string {
// -------------------------------------------------------
type RTPFlowState struct {
IsNotHandled bool
HasLoss bool
LossStartInclusive uint64
LossEndExclusive uint64
@@ -365,6 +367,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
defer r.lock.Unlock()
if !r.endTime.IsZero() {
flowState.IsNotHandled = true
return
}
@@ -373,6 +376,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
if !r.initialized {
if payloadSize == 0 {
// do not start on a padding only packet
flowState.IsNotHandled = true
return
}