From cfe3178542b515dbed4580c1b26ec5049a4c3fa5 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sun, 15 Dec 2024 14:33:02 +0530 Subject: [PATCH] Reconcile RTP stats with RTX data. (#3252) * RTX RTPStats * WIP * RTCP RTX handler * reconcile rtx * cache size * clean up * test * clean up --- pkg/sfu/connectionquality/connectionstats.go | 10 +- pkg/sfu/downtrack.go | 145 ++++++++++++++----- pkg/sfu/playoutdelay_test.go | 2 +- pkg/sfu/rtpstats/rtpstats_base.go | 64 ++++++++ pkg/sfu/rtpstats/rtpstats_sender.go | 30 ++-- 5 files changed, 190 insertions(+), 61 deletions(-) diff --git a/pkg/sfu/connectionquality/connectionstats.go b/pkg/sfu/connectionquality/connectionstats.go index c5a857be4..eb254831a 100644 --- a/pkg/sfu/connectionquality/connectionstats.go +++ b/pkg/sfu/connectionquality/connectionstats.go @@ -43,8 +43,8 @@ type ConnectionStatsReceiverProvider interface { type ConnectionStatsSenderProvider interface { GetDeltaStatsSender() map[uint32]*buffer.StreamStatsWithLayers - GetLastReceiverReportTime() time.Time - GetTotalPacketsSent() uint64 + GetPrimaryStreamLastReceiverReportTime() time.Time + GetPrimaryStreamPacketsSent() uint64 } type ConnectionStatsParams struct { @@ -237,7 +237,7 @@ func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) (float32, streams := cs.params.SenderProvider.GetDeltaStatsSender() if len(streams) == 0 { // check for receiver report not received for a while - marker := cs.params.SenderProvider.GetLastReceiverReportTime() + marker := cs.params.SenderProvider.GetPrimaryStreamLastReceiverReportTime() if marker.IsZero() || streamingStartedAt.After(marker) { marker = streamingStartedAt } @@ -253,7 +253,7 @@ func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) (float32, // delta stat duration could be large due to not receiving receiver report for a long time (for example, due to mute), // adjust to streaming start if necessary - if streamingStartedAt.After(cs.params.SenderProvider.GetLastReceiverReportTime()) { + if streamingStartedAt.After(cs.params.SenderProvider.GetPrimaryStreamLastReceiverReportTime()) { // last receiver report was before streaming started, wait for next one mos, _ := cs.scorer.GetMOSAndQuality() return mos, streams @@ -294,7 +294,7 @@ func (cs *ConnectionStats) updateStreamingStart(at time.Time) time.Time { cs.lock.Lock() defer cs.lock.Unlock() - packetsSent := cs.params.SenderProvider.GetTotalPacketsSent() + packetsSent := cs.params.SenderProvider.GetPrimaryStreamPacketsSent() if packetsSent > cs.packetsSent { if cs.streamingStartedAt.IsZero() { // the start could be anywhere after last update, but using `at` as this is not required to be accurate diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index d5797639c..f75d6502a 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -142,15 +142,19 @@ var ( // ------------------------------------------------------------------- type DownTrackState struct { - RTPStats *rtpstats.RTPStatsSender - DeltaStatsSenderSnapshotId uint32 - ForwarderState *livekit.RTPForwarderState - PlayoutDelayControllerState PlayoutDelayControllerState + RTPStats *rtpstats.RTPStatsSender + DeltaStatsSenderSnapshotId uint32 + RTPStatsRTX *rtpstats.RTPStatsSender + DeltaStatsRTXSenderSnapshotId uint32 + ForwarderState *livekit.RTPForwarderState + PlayoutDelayControllerState PlayoutDelayControllerState } func (d DownTrackState) MarshalLogObject(e zapcore.ObjectEncoder) error { e.AddObject("RTPStats", d.RTPStats) e.AddUint32("DeltaStatsSenderSnapshotId", d.DeltaStatsSenderSnapshotId) + e.AddObject("RTPStatsRTX", d.RTPStatsRTX) + e.AddUint32("DeltaStatsRTXSenderSnapshotId", d.DeltaStatsRTXSenderSnapshotId) e.AddObject("ForwarderState", logger.Proto(d.ForwarderState)) e.AddObject("PlayoutDelayControllerState", d.PlayoutDelayControllerState) return nil @@ -291,14 +295,17 @@ type DownTrack struct { writeStopped atomic.Bool isReceiverReady bool - rtpStats *rtpstats.RTPStatsSender + rtpStats *rtpstats.RTPStatsSender + deltaStatsSenderSnapshotId uint32 + + rtpStatsRTX *rtpstats.RTPStatsSender + deltaStatsRTXSenderSnapshotId uint32 totalRepeatedNACKs atomic.Uint32 blankFramesGeneration atomic.Uint32 - connectionStats *connectionquality.ConnectionStats - deltaStatsSenderSnapshotId uint32 + connectionStats *connectionquality.ConnectionStats isNACKThrottled atomic.Bool @@ -363,9 +370,15 @@ func NewDownTrack(params DowntrackParams) (*DownTrack, error) { d.rtpStats = rtpstats.NewRTPStatsSender(rtpstats.RTPStatsParams{ ClockRate: d.codec.ClockRate, Logger: d.params.Logger, - }) + }, 4096) d.deltaStatsSenderSnapshotId = d.rtpStats.NewSenderSnapshotId() + d.rtpStatsRTX = rtpstats.NewRTPStatsSender(rtpstats.RTPStatsParams{ + ClockRate: d.codec.ClockRate, + Logger: d.params.Logger, + }, 1024) + d.deltaStatsRTXSenderSnapshotId = d.rtpStatsRTX.NewSenderSnapshotId() + d.forwarder = NewForwarder( d.kind, d.params.Logger, @@ -529,7 +542,7 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, if d.ssrcRTX != 0 { if rr := d.params.BufferFactory.GetOrNew(packetio.RTCPBufferPacket, d.ssrcRTX).(*buffer.RTCPReader); rr != nil { rr.OnPacket(func(pkt []byte) { - d.handleRTCP(pkt) + d.handleRTCPRTX(pkt) }) d.rtcpReaderRTX = rr } @@ -956,11 +969,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { // WritePaddingRTP tries to write as many padding only RTP packets as necessary // to satisfy given size to the DownTrack -func (d *DownTrack) WritePaddingRTP( - bytesToSend int, - paddingOnMute bool, - forceMarker bool, -) int { +func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMarker bool) int { if !d.writable.Load() { return 0 } @@ -1189,11 +1198,13 @@ func (d *DownTrack) CloseWithFlush(flush bool) { d.connectionStats.Close() d.rtpStats.Stop() + d.rtpStatsRTX.Stop() d.params.Logger.Debugw("rtp stats", "direction", "downstream", "mime", d.mime, "ssrc", d.ssrc, "stats", d.rtpStats, + "statsRTX", d.rtpStatsRTX, ) d.maxLayerNotifierChMu.Lock() @@ -1241,9 +1252,11 @@ func (d *DownTrack) MaxLayer() buffer.VideoLayer { func (d *DownTrack) GetState() DownTrackState { dts := DownTrackState{ - RTPStats: d.rtpStats, - DeltaStatsSenderSnapshotId: d.deltaStatsSenderSnapshotId, - ForwarderState: d.forwarder.GetState(), + RTPStats: d.rtpStats, + DeltaStatsSenderSnapshotId: d.deltaStatsSenderSnapshotId, + RTPStatsRTX: d.rtpStatsRTX, + DeltaStatsRTXSenderSnapshotId: d.deltaStatsRTXSenderSnapshotId, + ForwarderState: d.forwarder.GetState(), } if d.playoutDelay != nil { @@ -1263,6 +1276,10 @@ func (d *DownTrack) SeedState(state DownTrackState) { d.playoutDelay.SeedState(state.PlayoutDelayControllerState) } } + if state.RTPStatsRTX != nil { + d.rtpStatsRTX.Seed(state.RTPStatsRTX) + d.deltaStatsRTXSenderSnapshotId = state.DeltaStatsRTXSenderSnapshotId + } d.forwarder.SeedState(state.ForwarderState) } @@ -1522,6 +1539,8 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport { _, tsOffset, refSenderReport := d.forwarder.GetSenderReportParams() return d.rtpStats.GetRtcpSenderReport(d.ssrc, refSenderReport, tsOffset, !d.params.DisableSenderReportPassThrough) + + // not sending RTCP Sender Report for RTX } func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan struct{} { @@ -1702,7 +1721,7 @@ func (d *DownTrack) getH264BlankFrame(_frameEndNeeded bool) ([]byte, error) { func (d *DownTrack) handleRTCP(bytes []byte) { pkts, err := rtcp.Unmarshal(bytes) if err != nil { - d.params.Logger.Errorw("could not unmarshal rtcp receiver packets", err) + d.params.Logger.Errorw("could not unmarshal rtcp receiver packet", err) return } @@ -1754,7 +1773,6 @@ func (d *DownTrack) handleRTCP(bytes []byte) { if r.SSRC != d.ssrc { continue } - rr.Reports = append(rr.Reports, r) rtt, isRttChanged := d.rtpStats.UpdateFromReceiverReport(r) if isRttChanged { @@ -1770,6 +1788,14 @@ func (d *DownTrack) handleRTCP(bytes []byte) { } } } + // RTX-TODO: This is used for media loss proxying only as of 2024-12-15. + // Ideally, this should keep deltas between previous RTCP Receiver Report + // and current report, calculate the loss in the window and reconcile it with + // data in a similar window from RTX stream (to ensure losses are discounted + // for NACKs), but keeping this simple for several reasons + // - media loss proxying is a configurable setting and could be disabled + // - media loss proxying is used for audio only and audio may not have NACKing + // - to keep it simple if len(rr.Reports) > 0 { d.listenerLock.RLock() rrListeners := d.receiverReportListeners @@ -1842,6 +1868,27 @@ func (d *DownTrack) handleRTCP(bytes []byte) { } } +func (d *DownTrack) handleRTCPRTX(bytes []byte) { + pkts, err := rtcp.Unmarshal(bytes) + if err != nil { + d.params.Logger.Errorw("could not unmarshal rtcp rtx receiver packet", err) + return + } + + for _, pkt := range pkts { + switch p := pkt.(type) { + case *rtcp.ReceiverReport: + for _, r := range p.Reports { + if r.SSRC != d.ssrcRTX { + continue + } + + d.rtpStatsRTX.UpdateFromReceiverReport(r) + } + } + } +} + func (d *DownTrack) SetConnected() { d.bindLock.Lock() if !d.connected.Swap(true) { @@ -1874,11 +1921,12 @@ func (d *DownTrack) retransmitPacket(epm *extPacketMeta, sourcePkt []byte, isPro SSRC: d.ssrc, } rtxOffset := 0 + rtxExtSequenceNumber := d.rtxSequenceNumber.Inc() if d.payloadTypeRTX != 0 && d.ssrcRTX != 0 { rtxOffset = 2 hdr.PayloadType = d.payloadTypeRTX - hdr.SequenceNumber = uint16(d.rtxSequenceNumber.Inc()) + hdr.SequenceNumber = uint16(rtxExtSequenceNumber) hdr.SSRC = d.ssrcRTX } @@ -1926,16 +1974,29 @@ func (d *DownTrack) retransmitPacket(epm *extPacketMeta, sourcePkt []byte, isPro } else { payloadSize, paddingSize, isOutOfOrder = len(payload), 0, true } - d.rtpStats.Update( - mono.UnixNano(), - epm.extSequenceNumber, - epm.extTimestamp, - hdr.Marker, - headerSize, - payloadSize, - paddingSize, - isOutOfOrder, - ) + if hdr.SSRC == d.ssrcRTX { + d.rtpStatsRTX.Update( + mono.UnixNano(), + rtxExtSequenceNumber, + 0, + hdr.Marker, + headerSize, + payloadSize, + paddingSize, + isOutOfOrder, + ) + } else { + d.rtpStats.Update( + mono.UnixNano(), + epm.extSequenceNumber, + epm.extTimestamp, + hdr.Marker, + headerSize, + payloadSize, + paddingSize, + isOutOfOrder, + ) + } d.pacer.Enqueue(&pacer.Packet{ Header: hdr, HeaderSize: headerSize, @@ -2022,13 +2083,14 @@ func (d *DownTrack) WriteProbePackets(bytesToSend int, usePadding bool) int { return 0 } + rtxExtSequenceNumber := d.rtxSequenceNumber.Inc() for i := 0; i < num; i++ { hdr := &rtp.Header{ Version: 2, Padding: true, Marker: false, PayloadType: d.payloadTypeRTX, - SequenceNumber: uint16(d.rtxSequenceNumber.Inc()), + SequenceNumber: uint16(rtxExtSequenceNumber), Timestamp: 0, SSRC: d.ssrcRTX, } @@ -2040,18 +2102,16 @@ func (d *DownTrack) WriteProbePackets(bytesToSend int, usePadding bool) int { hdrSize := hdr.MarshalSize() payloadSize := len(payload) - /* RTX-TODO - d.rtpStats.Update( + d.rtpStatsRTX.Update( mono.UnixNano(), - snts[i].extSequenceNumber, - snts[i].extTimestamp, + rtxExtSequenceNumber, + 0, hdr.Marker, hdrSize, 0, payloadSize, false, ) - */ d.pacer.Enqueue(&pacer.Packet{ Header: hdr, HeaderSize: hdrSize, @@ -2148,7 +2208,7 @@ func (d *DownTrack) GetConnectionScoreAndQuality() (float32, livekit.ConnectionQ } func (d *DownTrack) GetTrackStats() *livekit.RTPStats { - return d.rtpStats.ToProto() + return rtpstats.ReconcileRTPStatsWithRTX(d.rtpStats.ToProto(), d.rtpStatsRTX.ToProto()) } func (d *DownTrack) deltaStats(ds *rtpstats.RTPDeltaInfo) map[uint32]*buffer.StreamStatsWithLayers { @@ -2168,14 +2228,19 @@ func (d *DownTrack) deltaStats(ds *rtpstats.RTPDeltaInfo) map[uint32]*buffer.Str } func (d *DownTrack) GetDeltaStatsSender() map[uint32]*buffer.StreamStatsWithLayers { - return d.deltaStats(d.rtpStats.DeltaInfoSender(d.deltaStatsSenderSnapshotId)) + return d.deltaStats( + rtpstats.ReconcileRTPDeltaInfoWithRTX( + d.rtpStats.DeltaInfoSender(d.deltaStatsSenderSnapshotId), + d.rtpStatsRTX.DeltaInfoSender(d.deltaStatsRTXSenderSnapshotId), + ), + ) } -func (d *DownTrack) GetLastReceiverReportTime() time.Time { +func (d *DownTrack) GetPrimaryStreamLastReceiverReportTime() time.Time { return d.rtpStats.LastReceiverReportTime() } -func (d *DownTrack) GetTotalPacketsSent() uint64 { +func (d *DownTrack) GetPrimaryStreamPacketsSent() uint64 { return d.rtpStats.GetPacketsSeenMinusPadding() } diff --git a/pkg/sfu/playoutdelay_test.go b/pkg/sfu/playoutdelay_test.go index a39674d1f..419f47300 100644 --- a/pkg/sfu/playoutdelay_test.go +++ b/pkg/sfu/playoutdelay_test.go @@ -26,7 +26,7 @@ import ( ) func TestPlayoutDelay(t *testing.T) { - stats := rtpstats.NewRTPStatsSender(rtpstats.RTPStatsParams{ClockRate: 900000, Logger: logger.GetLogger()}) + stats := rtpstats.NewRTPStatsSender(rtpstats.RTPStatsParams{ClockRate: 900000, Logger: logger.GetLogger()}, 128) c, err := NewPlayoutDelayController(100, 120, logger.GetLogger(), stats) require.NoError(t, err) diff --git a/pkg/sfu/rtpstats/rtpstats_base.go b/pkg/sfu/rtpstats/rtpstats_base.go index 329275749..40ec921c3 100644 --- a/pkg/sfu/rtpstats/rtpstats_base.go +++ b/pkg/sfu/rtpstats/rtpstats_base.go @@ -54,6 +54,7 @@ type RTPDeltaInfo struct { RttMax uint32 JitterMax float64 Nacks uint32 + NackRepeated uint32 Plis uint32 Firs uint32 } @@ -81,6 +82,7 @@ func (r *RTPDeltaInfo) MarshalLogObject(e zapcore.ObjectEncoder) error { e.AddUint32("RttMax", r.RttMax) e.AddFloat64("JitterMax", r.JitterMax) e.AddUint32("Nacks", r.Nacks) + e.AddUint32("NackRepeated", r.NackRepeated) e.AddUint32("Plis", r.Plis) e.AddUint32("Firs", r.Firs) return nil @@ -897,4 +899,66 @@ func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo { } } +func ReconcileRTPStatsWithRTX(primaryStats *livekit.RTPStats, rtxStats *livekit.RTPStats) *livekit.RTPStats { + if primaryStats == nil || rtxStats == nil { + return primaryStats + } + + primaryStats.PacketsDuplicate += rtxStats.Packets + primaryStats.PacketDuplicateRate = float64(primaryStats.PacketsDuplicate) / primaryStats.Duration + + primaryStats.BytesDuplicate += rtxStats.Bytes + primaryStats.HeaderBytesDuplicate += rtxStats.HeaderBytes + primaryStats.BitrateDuplicate = float64(primaryStats.BytesDuplicate) * 8.0 / primaryStats.Duration + + primaryStats.PacketsPadding += rtxStats.PacketsPadding + primaryStats.PacketPaddingRate = float64(primaryStats.PacketsPadding) / primaryStats.Duration + + primaryStats.BytesPadding += rtxStats.BytesPadding + primaryStats.HeaderBytesPadding += rtxStats.HeaderBytesPadding + primaryStats.BitratePadding = float64(primaryStats.BytesPadding) * 8.0 / primaryStats.Duration + + // RTX non-padding packets are responses to NACKs, that should discount packets lost, + lossAdjustment := rtxStats.Packets - rtxStats.PacketsLost - primaryStats.NackRepeated + if int32(lossAdjustment) < 0 { + lossAdjustment = 0 + } + if lossAdjustment >= primaryStats.PacketsLost { + primaryStats.PacketsLost = 0 + } else { + primaryStats.PacketsLost -= lossAdjustment + } + primaryStats.PacketLossRate = float64(primaryStats.PacketsLost) / primaryStats.Duration + primaryStats.PacketLossPercentage = float32(primaryStats.PacketsLost) / float32(primaryStats.Packets+primaryStats.PacketsPadding+primaryStats.PacketsLost) * 100.0 + return primaryStats +} + +func ReconcileRTPDeltaInfoWithRTX(primaryDeltaInfo *RTPDeltaInfo, rtxDeltaInfo *RTPDeltaInfo) *RTPDeltaInfo { + if primaryDeltaInfo == nil || rtxDeltaInfo == nil { + return primaryDeltaInfo + } + + primaryDeltaInfo.PacketsDuplicate += rtxDeltaInfo.Packets + + primaryDeltaInfo.BytesDuplicate += rtxDeltaInfo.Bytes + primaryDeltaInfo.HeaderBytesDuplicate += rtxDeltaInfo.HeaderBytes + + primaryDeltaInfo.PacketsPadding += rtxDeltaInfo.PacketsPadding + + primaryDeltaInfo.BytesPadding += rtxDeltaInfo.BytesPadding + primaryDeltaInfo.HeaderBytesPadding += rtxDeltaInfo.HeaderBytesPadding + + // RTX non-padding packets are responses to NACKs, that should discount packets lost + lossAdjustment := rtxDeltaInfo.Packets - rtxDeltaInfo.PacketsLost - primaryDeltaInfo.NackRepeated + if int32(lossAdjustment) < 0 { + lossAdjustment = 0 + } + if lossAdjustment >= primaryDeltaInfo.PacketsLost { + primaryDeltaInfo.PacketsLost = 0 + } else { + primaryDeltaInfo.PacketsLost -= lossAdjustment + } + return primaryDeltaInfo +} + // ------------------------------------------------------------------- diff --git a/pkg/sfu/rtpstats/rtpstats_sender.go b/pkg/sfu/rtpstats/rtpstats_sender.go index a9f573d56..e3ee972a8 100644 --- a/pkg/sfu/rtpstats/rtpstats_sender.go +++ b/pkg/sfu/rtpstats/rtpstats_sender.go @@ -29,11 +29,6 @@ import ( "github.com/livekit/protocol/utils/mono" ) -const ( - cSnInfoSize = 4096 - cSnInfoMask = cSnInfoSize - 1 -) - // ------------------------------------------------------------------- type snInfoFlag byte @@ -147,9 +142,10 @@ type senderSnapshot struct { frames uint32 - nacks uint32 - plis uint32 - firs uint32 + nacks uint32 + nackRepeated uint32 + plis uint32 + firs uint32 maxRtt uint32 maxJitterFeed float64 @@ -223,7 +219,7 @@ type RTPStatsSender struct { jitterFromRR float64 maxJitterFromRR float64 - snInfos [cSnInfoSize]snInfo + snInfos []snInfo layerLockPlis uint32 lastLayerLockPli time.Time @@ -238,9 +234,10 @@ type RTPStatsSender struct { timeReversedCount int } -func NewRTPStatsSender(params RTPStatsParams) *RTPStatsSender { +func NewRTPStatsSender(params RTPStatsParams, cacheSize int) *RTPStatsSender { return &RTPStatsSender{ rtpStatsBase: newRTPStatsBase(params), + snInfos: make([]snInfo, cacheSize), nextSenderSnapshotID: cFirstSnapshotID, senderSnapshots: make([]senderSnapshot, 2), } @@ -271,7 +268,8 @@ func (r *RTPStatsSender) Seed(from *RTPStatsSender) { r.jitterFromRR = from.jitterFromRR r.maxJitterFromRR = from.maxJitterFromRR - r.snInfos = from.snInfos + r.snInfos = make([]snInfo, len(from.snInfos)) + copy(r.snInfos, from.snInfos) r.layerLockPlis = from.layerLockPlis r.lastLayerLockPli = from.lastLayerLockPli @@ -891,6 +889,7 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo RttMax: then.maxRtt, JitterMax: maxJitterTime, Nacks: now.nacks - then.nacks, + NackRepeated: now.nackRepeated - then.nackRepeated, Plis: now.plis - then.plis, Firs: now.firs - then.firs, } @@ -968,6 +967,7 @@ func (r *RTPStatsSender) getSenderSnapshot(startTime time.Time, s *senderSnapsho packetsLostFromRR: r.packetsLostFromRR, frames: s.frames + s.intervalStats.frames, nacks: r.nacks, + nackRepeated: r.nackRepeated, plis: r.plis, firs: r.firs, maxRtt: r.rtt, @@ -979,12 +979,12 @@ func (r *RTPStatsSender) getSenderSnapshot(startTime time.Time, s *senderSnapsho func (r *RTPStatsSender) getSnInfoOutOfOrderSlot(esn uint64, ehsn uint64) int { offset := int64(ehsn - esn) - if offset >= cSnInfoSize || offset < 0 { + if offset >= int64(len(r.snInfos)) || offset < 0 { // too old OR too new (i. e. ahead of highest) return -1 } - return int(esn & cSnInfoMask) + return int(esn) % len(r.snInfos) } func (r *RTPStatsSender) setSnInfo(esn uint64, ehsn uint64, pktSize uint16, hdrSize uint8, payloadSize uint16, marker bool, isOutOfOrder bool) { @@ -995,7 +995,7 @@ func (r *RTPStatsSender) setSnInfo(esn uint64, ehsn uint64, pktSize uint16, hdrS return } } else { - slot = int(esn & cSnInfoMask) + slot = int(esn) % len(r.snInfos) } snInfo := &r.snInfos[slot] @@ -1019,7 +1019,7 @@ func (r *RTPStatsSender) clearSnInfos(extStartInclusive uint64, extEndExclusive } for esn := extStartInclusive; esn != extEndExclusive; esn++ { - snInfo := &r.snInfos[esn&cSnInfoMask] + snInfo := &r.snInfos[int(esn)%len(r.snInfos)] snInfo.pktSize = 0 snInfo.hdrSize = 0 snInfo.flags = 0