diff --git a/pkg/rtc/wrappedreceiver.go b/pkg/rtc/wrappedreceiver.go index 593a62487..853d2c4f4 100644 --- a/pkg/rtc/wrappedreceiver.go +++ b/pkg/rtc/wrappedreceiver.go @@ -317,9 +317,9 @@ func (d *DummyReceiver) GetCalculatedClockRate(layer int32) uint32 { return 0 } -func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) { +func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error) { if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok { - return r.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer) + return r.GetReferenceLayerRTPTimestamp(ets, layer, referenceLayer) } return 0, errors.New("receiver not available") } diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 0550f9cf6..939f5c6d8 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -54,7 +54,8 @@ type pendingPacket struct { type ExtPacket struct { VideoLayer Arrival time.Time - ExtSequenceNumber uint32 + ExtSequenceNumber uint64 + ExtTimestamp uint64 Packet *rtp.Packet Payload interface{} KeyFrame bool @@ -82,7 +83,7 @@ type Buffer struct { closed atomic.Bool mime string - snRangeMap *utils.RangeMap[uint32, uint32] + snRangeMap *utils.RangeMap[uint64, uint64] latestTSForAudioLevelInitialized bool latestTSForAudioLevel uint32 @@ -127,7 +128,7 @@ func NewBuffer(ssrc uint32, vp, ap *sync.Pool) *Buffer { mediaSSRC: ssrc, videoPool: vp, audioPool: ap, - snRangeMap: utils.NewRangeMap[uint32, uint32](100), + snRangeMap: utils.NewRangeMap[uint64, uint64](100), pliThrottle: int64(500 * time.Millisecond), logger: l.WithComponent(sutils.ComponentPub).WithComponent(sutils.ComponentSFU), } @@ -414,21 +415,21 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) { return } - extSeqNumber, isOutOfOrder := b.updateStreamState(&rtpPacket, arrivalTime) + flowState := b.updateStreamState(&rtpPacket, arrivalTime) b.processHeaderExtensions(&rtpPacket, arrivalTime) - if !isOutOfOrder && len(rtpPacket.Payload) == 0 { + if !flowState.IsOutOfOrder && len(rtpPacket.Payload) == 0 { // drop padding only in-order packet b.snRangeMap.IncValue(1) return } // add to RTX buffer using sequence number after accounting for dropped padding only packets - snAdjustment, err := b.snRangeMap.GetValue(extSeqNumber) + snAdjustment, err := b.snRangeMap.GetValue(flowState.ExtSequenceNumber) if err != nil { b.logger.Errorw("could not get sequence number adjustment", err) return } - rtpPacket.Header.SequenceNumber = uint16(extSeqNumber - snAdjustment) + rtpPacket.Header.SequenceNumber = uint16(flowState.ExtSequenceNumber - snAdjustment) _, err = b.bucket.AddPacketWithSequenceNumber(pkt, rtpPacket.Header.SequenceNumber) if err != nil { if err != bucket.ErrRTXPacket { @@ -441,7 +442,7 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) { b.doReports(arrivalTime) - ep := b.getExtPacket(&rtpPacket, extSeqNumber, arrivalTime) + ep := b.getExtPacket(&rtpPacket, arrivalTime, flowState) if ep == nil { return } @@ -499,7 +500,7 @@ func (b *Buffer) doFpsCalc(ep *ExtPacket) { } } -func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) (uint32, bool) { +func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) RTPFlowState { flowState := b.rtpStats.Update(&p.Header, len(p.Payload), int(p.PaddingSize), arrivalTime) if b.nacker != nil { @@ -513,7 +514,7 @@ func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) (uint32 } } - return flowState.ExtSeqNumber, flowState.IsOutOfOrder + return flowState } func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime time.Time) { @@ -546,10 +547,11 @@ func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime time.Time) { } } -func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, extSeqNumber uint32, arrivalTime time.Time) *ExtPacket { +func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime time.Time, flowState RTPFlowState) *ExtPacket { ep := &ExtPacket{ Arrival: arrivalTime, - ExtSequenceNumber: extSeqNumber, + ExtSequenceNumber: flowState.ExtSequenceNumber, + ExtTimestamp: flowState.ExtTimestamp, Packet: rtpPacket, VideoLayer: VideoLayer{ Spatial: InvalidLayerSpatial, diff --git a/pkg/sfu/buffer/buffer_test.go b/pkg/sfu/buffer/buffer_test.go index 7f3edf125..7f685186c 100644 --- a/pkg/sfu/buffer/buffer_test.go +++ b/pkg/sfu/buffer/buffer_test.go @@ -213,7 +213,7 @@ func TestNewBuffer(t *testing.T) { _, _ = buff.Write(buf) } require.Equal(t, uint16(2), buff.rtpStats.sequenceNumber.GetHighest()) - require.Equal(t, uint32(65536+2), buff.rtpStats.sequenceNumber.GetExtendedHighest()) + require.Equal(t, uint64(65536+2), buff.rtpStats.sequenceNumber.GetExtendedHighest()) }) } } diff --git a/pkg/sfu/buffer/helpers.go b/pkg/sfu/buffer/helpers.go index 274b73e47..5be302959 100644 --- a/pkg/sfu/buffer/helpers.go +++ b/pkg/sfu/buffer/helpers.go @@ -52,7 +52,7 @@ type VP8 struct { I bool M bool - PictureID uint16 /* 8 or 16 bits, picture ID */ + PictureID uint16 /* 7 or 15 bits, picture ID */ L bool TL0PICIDX uint8 /* 8 bits temporal level zero index */ diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index a55c12a42..3687a0ac9 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -66,23 +66,24 @@ func (d driftResult) String() string { type RTPFlowState struct { HasLoss bool - LossStartInclusive uint32 - LossEndExclusive uint32 + LossStartInclusive uint64 + LossEndExclusive uint64 IsOutOfOrder bool - ExtSeqNumber uint32 + ExtSequenceNumber uint64 + ExtTimestamp uint64 } type IntervalStats struct { - packets uint32 + packets uint64 bytes uint64 headerBytes uint64 - packetsPadding uint32 + packetsPadding uint64 bytesPadding uint64 headerBytesPadding uint64 - packetsLost uint32 - packetsOutOfOrder uint32 + packetsLost uint64 + packetsOutOfOrder uint64 frames uint32 } @@ -111,12 +112,12 @@ type RTPDeltaInfo struct { type Snapshot struct { startTime time.Time - extStartSN uint32 - extStartSNOverridden uint32 - packetsDuplicate uint32 + extStartSN uint64 + extStartSNOverridden uint64 + packetsDuplicate uint64 bytesDuplicate uint64 headerBytesDuplicate uint64 - packetsLostOverridden uint32 + packetsLostOverridden uint64 nacks uint32 plis uint32 firs uint32 @@ -152,15 +153,14 @@ type RTPStats struct { lock sync.RWMutex - initialized bool - resyncOnNextPacket bool + initialized bool startTime time.Time endTime time.Time - sequenceNumber *utils.WrapAround[uint16, uint32] + sequenceNumber *utils.WrapAround[uint16, uint64] - extHighestSNOverridden uint32 + extHighestSNOverridden uint64 lastRRTime time.Time lastRR rtcp.ReceptionReport @@ -178,13 +178,13 @@ type RTPStats struct { headerBytesDuplicate uint64 bytesPadding uint64 headerBytesPadding uint64 - packetsDuplicate uint32 - packetsPadding uint32 + packetsDuplicate uint64 + packetsPadding uint64 - packetsOutOfOrder uint32 + packetsOutOfOrder uint64 - packetsLost uint32 - packetsLostOverridden uint32 + packetsLost uint64 + packetsLostOverridden uint64 frames uint32 @@ -229,7 +229,7 @@ func NewRTPStats(params RTPStatsParams) *RTPStats { return &RTPStats{ params: params, logger: params.Logger, - sequenceNumber: utils.NewWrapAround[uint16, uint32](), + sequenceNumber: utils.NewWrapAround[uint16, uint64](), timestamp: utils.NewWrapAround[uint32, uint64](), nextSnapshotId: FirstSnapshotId, snapshots: make(map[uint32]*Snapshot), @@ -245,7 +245,6 @@ func (r *RTPStats) Seed(from *RTPStats) { } r.initialized = from.initialized - r.resyncOnNextPacket = from.resyncOnNextPacket r.startTime = from.startTime // do not clone endTime as a non-zero endTime indicates an ended object @@ -375,17 +374,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa return } - if r.resyncOnNextPacket { - r.resyncOnNextPacket = false - - if r.initialized { - r.sequenceNumber.ResetHighest(rtph.SequenceNumber - 1) - r.timestamp.ResetHighest(rtph.Timestamp) - r.highestTime = packetTime - } - } - - var resSN utils.WrapAroundUpdateResult[uint32] + var resSN utils.WrapAroundUpdateResult[uint64] var resTS utils.WrapAroundUpdateResult[uint64] if !r.initialized { if payloadSize == 0 { @@ -417,8 +406,8 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa "rtp stream start", "startTime", r.startTime.String(), "firstTime", r.firstTime.String(), - "startSN", r.sequenceNumber.GetExtendedHighest(), - "startTS", r.timestamp.GetExtendedHighest(), + "startSN", r.sequenceNumber.GetExtendedStart(), + "startTS", r.timestamp.GetExtendedStart(), ) } else { resSN = r.sequenceNumber.Update(rtph.SequenceNumber) @@ -428,8 +417,8 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa hdrSize := uint64(rtph.MarshalSize()) pktSize := hdrSize + uint64(payloadSize+paddingSize) isDuplicate := false - gapSN := resSN.ExtendedVal - resSN.PreExtendedHighest - if gapSN == 0 || gapSN > (1<<31) { // duplicate OR out-of-order + gapSN := int64(resSN.ExtendedVal - resSN.PreExtendedHighest) + if gapSN <= 0 { // duplicate OR out-of-order if payloadSize == 0 { // do not start on a padding only packet if resTS.IsRestart { @@ -483,14 +472,15 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa } flowState.IsOutOfOrder = true - flowState.ExtSeqNumber = resSN.ExtendedVal + flowState.ExtSequenceNumber = resSN.ExtendedVal + flowState.ExtTimestamp = resTS.ExtendedVal } else { // in-order // update gap histogram r.updateGapHistogram(int(gapSN)) // update missing sequence numbers r.clearSnInfos(resSN.PreExtendedHighest+1, resSN.ExtendedVal) - r.packetsLost += gapSN - 1 + r.packetsLost += uint64(gapSN - 1) r.setSnInfo(resSN.ExtendedVal, resSN.PreExtendedHighest, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker, false) @@ -505,7 +495,8 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa flowState.LossStartInclusive = resSN.PreExtendedHighest + 1 flowState.LossEndExclusive = resSN.ExtendedVal } - flowState.ExtSeqNumber = resSN.ExtendedVal + flowState.ExtSequenceNumber = resSN.ExtendedVal + flowState.ExtTimestamp = resTS.ExtendedVal } if !isDuplicate { @@ -527,25 +518,30 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa return } -func (r *RTPStats) ResyncOnNextPacket() { +func (r *RTPStats) Resync(esn uint64, ets uint64, at time.Time) { r.lock.Lock() defer r.lock.Unlock() - r.resyncOnNextPacket = true + if !r.initialized { + return + } + r.sequenceNumber.ResetHighest(esn - 1) + r.timestamp.ResetHighest(ets) + r.highestTime = at } -func (r *RTPStats) getPacketsExpected() uint32 { +func (r *RTPStats) getPacketsExpected() uint64 { return r.sequenceNumber.GetExtendedHighest() - r.sequenceNumber.GetExtendedStart() + 1 } -func (r *RTPStats) GetTotalPacketsPrimary() uint32 { +func (r *RTPStats) GetTotalPacketsPrimary() uint64 { r.lock.RLock() defer r.lock.RUnlock() return r.getTotalPacketsPrimary() } -func (r *RTPStats) getTotalPacketsPrimary() uint32 { +func (r *RTPStats) getTotalPacketsPrimary() uint64 { packetsExpected := r.getPacketsExpected() if r.packetsLost > packetsExpected { // should not happen @@ -564,7 +560,19 @@ func (r *RTPStats) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt uint32 r.lock.Lock() defer r.lock.Unlock() - if !r.initialized || !r.endTime.IsZero() || !r.params.IsReceiverReportDriven || rr.LastSequenceNumber < r.sequenceNumber.GetExtendedHighest() { + if !r.initialized || !r.endTime.IsZero() || !r.params.IsReceiverReportDriven || uint64(rr.LastSequenceNumber) < r.sequenceNumber.GetExtendedHighest() { + // it is possible that the `LastSequenceNumber` in the receiver report is before the starting + // sequence number when dummy packets are used to trigger Pion's OnTrack path. + return + } + + extHighestSNOverridden := r.extHighestSNOverridden&0xFFFF_FFFF_0000_0000 + uint64(rr.LastSequenceNumber) + if !r.lastRRTime.IsZero() { + if (rr.LastSequenceNumber-r.lastRR.LastSequenceNumber) < (1<<31) && rr.LastSequenceNumber < r.lastRR.LastSequenceNumber { + extHighestSNOverridden += (1 << 32) + } + } + if extHighestSNOverridden < r.sequenceNumber.GetExtendedHighest() { // it is possible that the `LastSequenceNumber` in the receiver report is before the starting // sequence number when dummy packets are used to trigger Pion's OnTrack path. return @@ -582,9 +590,14 @@ func (r *RTPStats) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt uint32 } } - if r.lastRRTime.IsZero() || r.extHighestSNOverridden <= rr.LastSequenceNumber { - r.extHighestSNOverridden = rr.LastSequenceNumber - r.packetsLostOverridden = rr.TotalLost + if r.lastRRTime.IsZero() || r.extHighestSNOverridden <= extHighestSNOverridden { + r.extHighestSNOverridden = extHighestSNOverridden + + packetsLostOverridden := r.packetsLostOverridden&0xFFFF_FFFF_0000_0000 + uint64(rr.TotalLost) + if (rr.TotalLost-r.lastRR.TotalLost) < (1<<31) && rr.TotalLost < r.lastRR.TotalLost { + packetsLostOverridden += (1 << 32) + } + r.packetsLostOverridden = packetsLostOverridden if isRttChanged { r.rtt = rtt @@ -788,11 +801,11 @@ func (r *RTPStats) MaybeAdjustFirstPacketTime(srData *RTCPSenderReportData) { defer r.lock.Unlock() if srData != nil { - r.maybeAdjustFirstPacketTime(srData.RTPTimestamp) + r.maybeAdjustFirstPacketTime(srData.RTPTimestampExt) } } -func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) { +func (r *RTPStats) maybeAdjustFirstPacketTime(ets uint64) { if time.Since(r.startTime) > firstPacketTimeAdjustWindow { return } @@ -803,7 +816,7 @@ func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) { // abnormal delay (maybe due to pacing or maybe due to queuing // in some network element along the way), push back first time // to an earlier instance. - samplesDiff := int32(ts - uint32(r.timestamp.GetExtendedStart())) + samplesDiff := int64(ets - r.timestamp.GetExtendedStart()) if samplesDiff < 0 { // out-of-order, skip return @@ -819,7 +832,7 @@ func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) { "before", r.firstTime.String(), "after", firstTime.String(), "adjustment", r.firstTime.Sub(firstTime), - "nowTS", ts, + "extNowTS", ets, "extStartTS", r.timestamp.GetExtendedStart(), ) if r.firstTime.Sub(firstTime) > firstPacketTimeAdjustThreshold { @@ -829,7 +842,7 @@ func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) { "before", r.firstTime.String(), "after", firstTime.String(), "adjustment", r.firstTime.Sub(firstTime), - "nowTS", ts, + "extNowTS", ets, "extStartTS", r.timestamp.GetExtendedStart(), ) } else { @@ -860,16 +873,21 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) { cycles := uint64(0) if r.srNewest != nil { - cycles = r.srNewest.RTPTimestampExt & 0xFF_FF_FF_FF_00_00_00_00 + cycles = r.srNewest.RTPTimestampExt & 0xFFFF_FFFF_0000_0000 if (srData.RTPTimestamp-r.srNewest.RTPTimestamp) < (1<<31) && srData.RTPTimestamp < r.srNewest.RTPTimestamp { cycles += (1 << 32) } + + ntpDiffSinceLast := srData.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time()) + rtpDiff := uint64(ntpDiffSinceLast.Seconds() * float64(r.params.ClockRate)) + goArounds := rtpDiff / (1 << 32) + cycles += goArounds * (1 << 32) } srDataCopy := *srData srDataCopy.RTPTimestampExt = uint64(srDataCopy.RTPTimestamp) + cycles - r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestamp) + r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestampExt) // monitor and log RTP timestamp anomalies var ntpDiffSinceLast time.Duration @@ -1018,13 +1036,13 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32) // monitor and log RTP timestamp anomalies var ntpDiffSinceLast time.Duration - var rtpDiffSinceLast uint32 + var rtpDiffSinceLast uint64 var departureDiffSinceLast time.Duration var expectedTimeDiffSinceLast float64 var isWarped bool if r.srNewest != nil { ntpDiffSinceLast = nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time()) - rtpDiffSinceLast = nowRTP - r.srNewest.RTPTimestamp + rtpDiffSinceLast = nowRTPExt - r.srNewest.RTPTimestampExt departureDiffSinceLast = now.Sub(r.srNewest.At) expectedTimeDiffSinceLast = float64(rtpDiffSinceLast) / float64(r.params.ClockRate) @@ -1069,7 +1087,7 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32) SSRC: ssrc, NTPTime: uint64(nowNTP), RTPTime: nowRTP, - PacketCount: r.getTotalPacketsPrimary() + r.packetsDuplicate + r.packetsPadding, + PacketCount: uint32(r.getTotalPacketsPrimary() + r.packetsDuplicate + r.packetsPadding), OctetCount: uint32(r.bytes + r.bytesDuplicate + r.bytesPadding), } } @@ -1120,8 +1138,8 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8, return &rtcp.ReceptionReport{ SSRC: ssrc, FractionLost: fracLost, - TotalLost: r.packetsLost, - LastSequenceNumber: now.extStartSN, + TotalLost: uint32(r.packetsLost), + LastSequenceNumber: uint32(now.extStartSN), Jitter: uint32(r.jitter), LastSenderReport: lastSR, Delay: dlsr, @@ -1162,16 +1180,16 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo { return &RTPDeltaInfo{ StartTime: startTime, Duration: endTime.Sub(startTime), - Packets: packetsExpected - intervalStats.packetsPadding, + Packets: uint32(packetsExpected - intervalStats.packetsPadding), Bytes: intervalStats.bytes, HeaderBytes: intervalStats.headerBytes, - PacketsDuplicate: now.packetsDuplicate - then.packetsDuplicate, + PacketsDuplicate: uint32(now.packetsDuplicate - then.packetsDuplicate), BytesDuplicate: now.bytesDuplicate - then.bytesDuplicate, HeaderBytesDuplicate: now.headerBytesDuplicate - then.headerBytesDuplicate, - PacketsPadding: intervalStats.packetsPadding, + PacketsPadding: uint32(intervalStats.packetsPadding), BytesPadding: intervalStats.bytesPadding, HeaderBytesPadding: intervalStats.headerBytesPadding, - PacketsLost: intervalStats.packetsLost, + PacketsLost: uint32(intervalStats.packetsLost), Frames: intervalStats.frames, RttMax: then.maxRtt, JitterMax: then.maxJitter / float64(r.params.ClockRate) * 1e6, @@ -1244,18 +1262,18 @@ func (r *RTPStats) DeltaInfoOverridden(snapshotId uint32) *RTPDeltaInfo { return &RTPDeltaInfo{ StartTime: startTime, Duration: endTime.Sub(startTime), - Packets: packetsExpected - intervalStats.packetsPadding, + Packets: uint32(packetsExpected - intervalStats.packetsPadding), Bytes: intervalStats.bytes, HeaderBytes: intervalStats.headerBytes, - PacketsDuplicate: now.packetsDuplicate - then.packetsDuplicate, + PacketsDuplicate: uint32(now.packetsDuplicate - then.packetsDuplicate), BytesDuplicate: now.bytesDuplicate - then.bytesDuplicate, HeaderBytesDuplicate: now.headerBytesDuplicate - then.headerBytesDuplicate, - PacketsPadding: intervalStats.packetsPadding, + PacketsPadding: uint32(intervalStats.packetsPadding), BytesPadding: intervalStats.bytesPadding, HeaderBytesPadding: intervalStats.headerBytesPadding, - PacketsLost: packetsLost, - PacketsMissing: intervalStats.packetsLost, - PacketsOutOfOrder: intervalStats.packetsOutOfOrder, + PacketsLost: uint32(packetsLost), + PacketsMissing: uint32(intervalStats.packetsLost), + PacketsOutOfOrder: uint32(intervalStats.packetsOutOfOrder), Frames: intervalStats.frames, RttMax: then.maxRtt, JitterMax: maxJitterTime, @@ -1391,25 +1409,25 @@ func (r *RTPStats) ToProto() *livekit.RTPStats { StartTime: timestamppb.New(r.startTime), EndTime: timestamppb.New(endTime), Duration: elapsed, - Packets: packets, + Packets: uint32(packets), PacketRate: packetRate, Bytes: r.bytes, HeaderBytes: r.headerBytes, Bitrate: bitrate, - PacketsLost: packetsLost, + PacketsLost: uint32(packetsLost), PacketLossRate: packetLostRate, PacketLossPercentage: packetLostPercentage, - PacketsDuplicate: r.packetsDuplicate, + PacketsDuplicate: uint32(r.packetsDuplicate), PacketDuplicateRate: packetDuplicateRate, BytesDuplicate: r.bytesDuplicate, HeaderBytesDuplicate: r.headerBytesDuplicate, BitrateDuplicate: bitrateDuplicate, - PacketsPadding: r.packetsPadding, + PacketsPadding: uint32(r.packetsPadding), PacketPaddingRate: packetPaddingRate, BytesPadding: r.bytesPadding, HeaderBytesPadding: r.headerBytesPadding, BitratePadding: bitratePadding, - PacketsOutOfOrder: r.packetsOutOfOrder, + PacketsOutOfOrder: uint32(r.packetsOutOfOrder), Frames: r.frames, FrameRate: frameRate, KeyFrames: r.keyFrames, @@ -1456,7 +1474,7 @@ func (r *RTPStats) ToProto() *livekit.RTPStats { return p } -func (r *RTPStats) getExtHighestSNAdjusted() uint32 { +func (r *RTPStats) getExtHighestSNAdjusted() uint64 { if r.params.IsReceiverReportDriven && !r.lastRRTime.IsZero() { return r.extHighestSNOverridden } @@ -1464,7 +1482,7 @@ func (r *RTPStats) getExtHighestSNAdjusted() uint32 { return r.sequenceNumber.GetExtendedHighest() } -func (r *RTPStats) getPacketsLost() uint32 { +func (r *RTPStats) getPacketsLost() uint64 { if r.params.IsReceiverReportDriven && !r.lastRRTime.IsZero() { return r.packetsLostOverridden } @@ -1472,13 +1490,12 @@ func (r *RTPStats) getPacketsLost() uint32 { return r.packetsLost } -func (r *RTPStats) getSnInfoOutOfOrderPtr(esn uint32, ehsn uint32) int { - offset := esn - ehsn - if offset > 0 && offset < (1<<31) { +func (r *RTPStats) getSnInfoOutOfOrderPtr(esn uint64, ehsn uint64) int { + if int64(esn-ehsn) > 0 { return -1 // in-order, not expected, maybe too new } - offset = ehsn - esn + offset := ehsn - esn if int(offset) >= SnInfoSize { // too old, ignore return -1 @@ -1487,9 +1504,9 @@ func (r *RTPStats) getSnInfoOutOfOrderPtr(esn uint32, ehsn uint32) int { return (r.snInfoWritePtr - int(offset) - 1) & SnInfoMask } -func (r *RTPStats) setSnInfo(esn uint32, ehsn uint32, pktSize uint16, hdrSize uint16, payloadSize uint16, marker bool, isOutOfOrder bool) { +func (r *RTPStats) setSnInfo(esn uint64, ehsn uint64, pktSize uint16, hdrSize uint16, payloadSize uint16, marker bool, isOutOfOrder bool) { writePtr := 0 - ooo := (esn - ehsn) > (1 << 31) + ooo := int64(esn-ehsn) < 0 if !ooo { writePtr = r.snInfoWritePtr r.snInfoWritePtr = (writePtr + 1) & SnInfoMask @@ -1508,7 +1525,7 @@ func (r *RTPStats) setSnInfo(esn uint32, ehsn uint32, pktSize uint16, hdrSize ui snInfo.isOutOfOrder = isOutOfOrder } -func (r *RTPStats) clearSnInfos(extStartInclusive uint32, extEndExclusive uint32) { +func (r *RTPStats) clearSnInfos(extStartInclusive uint64, extEndExclusive uint64) { for esn := extStartInclusive; esn != extEndExclusive; esn++ { snInfo := &r.snInfos[r.snInfoWritePtr] snInfo.pktSize = 0 @@ -1520,7 +1537,7 @@ func (r *RTPStats) clearSnInfos(extStartInclusive uint32, extEndExclusive uint32 } } -func (r *RTPStats) isSnInfoLost(esn uint32, ehsn uint32) bool { +func (r *RTPStats) isSnInfoLost(esn uint64, ehsn uint64) bool { readPtr := r.getSnInfoOutOfOrderPtr(esn, ehsn) if readPtr < 0 { return false @@ -1530,9 +1547,9 @@ func (r *RTPStats) isSnInfoLost(esn uint32, ehsn uint32) bool { return snInfo.pktSize == 0 } -func (r *RTPStats) getIntervalStats(extStartInclusive uint32, extEndExclusive uint32) (intervalStats IntervalStats) { +func (r *RTPStats) getIntervalStats(extStartInclusive uint64, extEndExclusive uint64) (intervalStats IntervalStats) { packetsNotFound := uint32(0) - processESN := func(esn uint32, ehsn uint32) { + processESN := func(esn uint64, ehsn uint64) { readPtr := r.getSnInfoOutOfOrderPtr(esn, ehsn) if readPtr < 0 { packetsNotFound++ diff --git a/pkg/sfu/buffer/rtpstats_test.go b/pkg/sfu/buffer/rtpstats_test.go index 1a803578d..e59a21de2 100644 --- a/pkg/sfu/buffer/rtpstats_test.go +++ b/pkg/sfu/buffer/rtpstats_test.go @@ -114,8 +114,8 @@ func TestRTPStats_Update(t *testing.T) { require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest())) require.Equal(t, timestamp, r.timestamp.GetHighest()) require.Equal(t, timestamp, uint32(r.timestamp.GetExtendedHighest())) - require.Equal(t, uint32(1), r.packetsOutOfOrder) - require.Equal(t, uint32(0), r.packetsDuplicate) + require.Equal(t, uint64(1), r.packetsOutOfOrder) + require.Equal(t, uint64(0), r.packetsDuplicate) // duplicate packet = getPacket(sequenceNumber-10, timestamp-30000, 1000) @@ -125,8 +125,8 @@ func TestRTPStats_Update(t *testing.T) { require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest())) require.Equal(t, timestamp, r.timestamp.GetHighest()) require.Equal(t, timestamp, uint32(r.timestamp.GetExtendedHighest())) - require.Equal(t, uint32(2), r.packetsOutOfOrder) - require.Equal(t, uint32(1), r.packetsDuplicate) + require.Equal(t, uint64(2), r.packetsOutOfOrder) + require.Equal(t, uint64(1), r.packetsDuplicate) // loss sequenceNumber += 10 @@ -134,9 +134,9 @@ func TestRTPStats_Update(t *testing.T) { packet = getPacket(sequenceNumber, timestamp, 1000) flowState = r.Update(&packet.Header, len(packet.Payload), 0, time.Now()) require.True(t, flowState.HasLoss) - require.Equal(t, uint32(sequenceNumber-9), flowState.LossStartInclusive) - require.Equal(t, uint32(sequenceNumber), flowState.LossEndExclusive) - require.Equal(t, uint32(17), r.packetsLost) + require.Equal(t, uint64(sequenceNumber-9), flowState.LossStartInclusive) + require.Equal(t, uint64(sequenceNumber), flowState.LossEndExclusive) + require.Equal(t, uint64(17), r.packetsLost) // out-of-order should decrement number of lost packets packet = getPacket(sequenceNumber-15, timestamp-45000, 1000) @@ -146,11 +146,11 @@ func TestRTPStats_Update(t *testing.T) { require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest())) require.Equal(t, timestamp, r.timestamp.GetHighest()) require.Equal(t, timestamp, uint32(r.timestamp.GetExtendedHighest())) - require.Equal(t, uint32(3), r.packetsOutOfOrder) - require.Equal(t, uint32(1), r.packetsDuplicate) - require.Equal(t, uint32(16), r.packetsLost) + require.Equal(t, uint64(3), r.packetsOutOfOrder) + require.Equal(t, uint64(1), r.packetsDuplicate) + require.Equal(t, uint64(16), r.packetsLost) intervalStats := r.getIntervalStats(r.sequenceNumber.GetExtendedStart(), r.sequenceNumber.GetExtendedHighest()+1) - require.Equal(t, uint32(16), intervalStats.packetsLost) + require.Equal(t, uint64(16), intervalStats.packetsLost) r.Stop() } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 04c93cc25..bc9cf2ca9 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -1738,7 +1738,7 @@ func (d *DownTrack) getDeltaStatsOverridden() map[uint32]*buffer.StreamStatsWith } func (d *DownTrack) GetNackStats() (totalPackets uint32, totalRepeatedNACKs uint32) { - totalPackets = d.rtpStats.GetTotalPacketsPrimary() + totalPackets = uint32(d.rtpStats.GetTotalPacketsPrimary()) totalRepeatedNACKs = d.totalRepeatedNACKs.Load() return } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 894c0bfe9..dc9652989 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -158,8 +158,8 @@ type ForwarderState struct { Started bool ReferenceLayerSpatial int32 PreStartTime time.Time - FirstTS uint32 - RefTSOffset uint32 + ExtFirstTS uint64 + RefTSOffset uint64 RTP RTPMungerState Codec interface{} } @@ -170,11 +170,11 @@ func (f ForwarderState) String() string { case codecmunger.VP8State: codecString = codecState.String() } - return fmt.Sprintf("ForwarderState{started: %v, referenceLayerSpatial: %d, preStartTime: %s, firstTS: %d, refTSOffset: %d, rtp: %s, codec: %s}", + return fmt.Sprintf("ForwarderState{started: %v, referenceLayerSpatial: %d, preStartTime: %s, extFirstTS: %d, refTSOffset: %d, rtp: %s, codec: %s}", f.Started, f.ReferenceLayerSpatial, f.PreStartTime.String(), - f.FirstTS, + f.ExtFirstTS, f.RefTSOffset, f.RTP.String(), codecString, @@ -188,7 +188,7 @@ type Forwarder struct { codec webrtc.RTPCodecCapability kind webrtc.RTPCodecType logger logger.Logger - getReferenceLayerRTPTimestamp func(ts uint32, layer int32, referenceLayer int32) (uint32, error) + getReferenceLayerRTPTimestamp func(ets uint64, layer int32, referenceLayer int32) (uint64, error) getExpectedRTPTimestamp func(at time.Time) (uint64, error) muted bool @@ -197,10 +197,10 @@ type Forwarder struct { started bool preStartTime time.Time - firstTS uint32 + extFirstTS uint64 lastSSRC uint32 referenceLayerSpatial int32 - refTSOffset uint32 + refTSOffset uint64 provisional *VideoAllocationProvisional @@ -216,7 +216,7 @@ type Forwarder struct { func NewForwarder( kind webrtc.RTPCodecType, logger logger.Logger, - getReferenceLayerRTPTimestamp func(ts uint32, layer int32, referenceLayer int32) (uint32, error), + getReferenceLayerRTPTimestamp func(ets uint64, layer int32, referenceLayer int32) (uint64, error), getExpectedRTPTimestamp func(at time.Time) (uint64, error), ) *Forwarder { f := &Forwarder{ @@ -336,7 +336,7 @@ func (f *Forwarder) GetState() ForwarderState { Started: f.started, ReferenceLayerSpatial: f.referenceLayerSpatial, PreStartTime: f.preStartTime, - FirstTS: f.firstTS, + ExtFirstTS: f.extFirstTS, RefTSOffset: f.refTSOffset, RTP: f.rtpMunger.GetLast(), Codec: f.codecMunger.GetState(), @@ -357,7 +357,7 @@ func (f *Forwarder) SeedState(state ForwarderState) { f.started = true f.referenceLayerSpatial = state.ReferenceLayerSpatial f.preStartTime = state.PreStartTime - f.firstTS = state.FirstTS + f.extFirstTS = state.ExtFirstTS f.refTSOffset = state.RefTSOffset } @@ -1445,13 +1445,13 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) ( return nil, nil } - logTransition := func(message string, expectedTS, refTS, lastTS uint32, diffSeconds float64) { + logTransition := func(message string, extExpectedTS, extRefTS, extLastTS uint64, diffSeconds float64) { f.logger.Debugw( message, "layer", layer, - "expectedTS", expectedTS, - "refTS", refTS, - "lastTS", lastTS, + "extExpectedTS", extExpectedTS, + "extRefTS", extRefTS, + "extLastTS", extLastTS, "diffSeconds", math.Abs(diffSeconds), ) } @@ -1461,20 +1461,20 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) ( // timestamp offset on source change. // // There are three timestamps to consider here - // 1. lastTS -> timestamp of last sent packet - // 2. refTS -> timestamp of this packet (after munging) calculated using feed's RTCP sender report - // 3. expectedTS -> expected timestamp of this packet calculated based on elapsed time since first packet - // Ideally, refTS and expectedTS should be very close and lastTS should be before both of those. + // 1. extLastTS -> timestamp of last sent packet + // 2. extRefTS -> timestamp of this packet (after munging) calculated using feed's RTCP sender report + // 3. extExpectedTS -> expected timestamp of this packet calculated based on elapsed time since first packet + // Ideally, extRefTS and extExpectedTS should be very close and extLastTS should be before both of those. // But, cases like muting/unmuting, clock vagaries, pacing, etc. make them not satisfy those conditions always. rtpMungerState := f.rtpMunger.GetLast() - lastTS := rtpMungerState.LastTS - refTS := lastTS - expectedTS := lastTS + extLastTS := rtpMungerState.ExtLastTS + extRefTS := extLastTS + extExpectedTS := extLastTS switchingAt := time.Now() if f.getReferenceLayerRTPTimestamp != nil { - ts, err := f.getReferenceLayerRTPTimestamp(extPkt.Packet.Timestamp, layer, f.referenceLayerSpatial) + ets, err := f.getReferenceLayerRTPTimestamp(extPkt.ExtTimestamp, layer, f.referenceLayerSpatial) if err != nil { - // error out if refTS is not available. It can happen when there is no sender report + // error out if extRefTS is not available. It can happen when there is no sender report // for the layer being switched to. Can especially happen at the start of the track when layer switches are // potentially happening very quickly. Erroring out and waiting for a layer for which a sender report has been // received will calculate a better offset, but may result in initial adaptation to take a bit longer depending @@ -1482,35 +1482,35 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) ( return nil, err } - refTS = ts + extRefTS = ets } if f.getExpectedRTPTimestamp != nil { tsExt, err := f.getExpectedRTPTimestamp(switchingAt) if err == nil { - expectedTS = uint32(tsExt) + extExpectedTS = tsExt } else { - rtpDiff := uint32(0) + rtpDiff := uint64(0) if !f.preStartTime.IsZero() && f.refTSOffset == 0 { timeSinceFirst := time.Since(f.preStartTime) - rtpDiff = uint32(timeSinceFirst.Nanoseconds() * int64(f.codec.ClockRate) / 1e9) - f.refTSOffset = f.firstTS + rtpDiff - refTS + rtpDiff = uint64(timeSinceFirst.Nanoseconds() * int64(f.codec.ClockRate) / 1e9) + f.refTSOffset = f.extFirstTS + rtpDiff - extRefTS f.logger.Infow( "calculating refTSOffset", "preStartTime", f.preStartTime.String(), - "firstTS", f.firstTS, + "extFirstTS", f.extFirstTS, "timeSinceFirst", timeSinceFirst, "rtpDiff", rtpDiff, - "refTS", refTS, + "extRefTS", extRefTS, "refTSOffset", f.refTSOffset, ) } - expectedTS += rtpDiff + extExpectedTS += rtpDiff } } - refTS += f.refTSOffset + extRefTS += f.refTSOffset - var nextTS uint32 + var extNextTS uint64 if f.lastSSRC == 0 { // If resuming (e. g. on unmute), keep next timestamp close to expected timestamp. // @@ -1525,66 +1525,66 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) ( // increases and pacer starts sending at faster rate. // // But, the challenege is distinguishing between the two cases. As a compromise, the difference - // between expectedTS and refTS is thresholded. Difference below the threshold is treated as Case 2 + // between extExpectedTS and extRefTS is thresholded. Difference below the threshold is treated as Case 2 // and above as Case 1. // - // In the event of refTS > expectedTS, use refTS. - // Ideally, refTS should not be ahead of expectedTS, but expectedTS uses the first packet's + // In the event of extRefTS > extExpectedTS, use extRefTS. + // Ideally, extRefTS should not be ahead of extExpectedTS, but extExpectedTS uses the first packet's // wall clock time. So, if the first packet experienced abmormal latency, it is possible - // for refTS > expectedTS - diffSeconds := float64(int32(expectedTS-refTS)) / float64(f.codec.ClockRate) + // for extRefTS > extExpectedTS + diffSeconds := float64(int64(extExpectedTS-extRefTS)) / float64(f.codec.ClockRate) if diffSeconds >= 0.0 { if f.resumeBehindThreshold > 0 && diffSeconds > f.resumeBehindThreshold { - logTransition("resume, reference too far behind", expectedTS, refTS, lastTS, diffSeconds) - nextTS = expectedTS + logTransition("resume, reference too far behind", extExpectedTS, extRefTS, extLastTS, diffSeconds) + extNextTS = extExpectedTS } else { - nextTS = refTS + extNextTS = extRefTS } } else { if math.Abs(diffSeconds) > SwitchAheadThresholdSeconds { - logTransition("resume, reference too far ahead", expectedTS, refTS, lastTS, diffSeconds) + logTransition("resume, reference too far ahead", extExpectedTS, extRefTS, extLastTS, diffSeconds) } - nextTS = refTS + extNextTS = extRefTS } f.resumeBehindThreshold = 0.0 } else { - // switching between layers, check if refTS is too far behind the last sent - diffSeconds := float64(int32(refTS-lastTS)) / float64(f.codec.ClockRate) + // switching between layers, check if extRefTS is too far behind the last sent + diffSeconds := float64(int64(extRefTS-extLastTS)) / float64(f.codec.ClockRate) if diffSeconds < 0.0 { if math.Abs(diffSeconds) > LayerSwitchBehindThresholdSeconds { // this could be due to pacer trickling out this layer. Error out and wait for a more opportune time. // AVSYNC-TODO: Consider some forcing function to do the switch // (like "have waited for too long for layer switch, nothing available, switch to whatever is available" kind of condition). - logTransition("layer switch, reference too far behind", expectedTS, refTS, lastTS, diffSeconds) + logTransition("layer switch, reference too far behind", extExpectedTS, extRefTS, extLastTS, diffSeconds) return nil, errors.New("switch point too far behind") } // use a nominal increase to ensure that timestamp is always moving forward - logTransition("layer switch, reference is slightly behind", expectedTS, refTS, lastTS, diffSeconds) - nextTS = lastTS + 1 + logTransition("layer switch, reference is slightly behind", extExpectedTS, extRefTS, extLastTS, diffSeconds) + extNextTS = extLastTS + 1 } else { - diffSeconds = float64(int32(expectedTS-refTS)) / float64(f.codec.ClockRate) + diffSeconds = float64(int64(extExpectedTS-extRefTS)) / float64(f.codec.ClockRate) if diffSeconds < 0.0 && math.Abs(diffSeconds) > SwitchAheadThresholdSeconds { - logTransition("layer switch, reference too far ahead", expectedTS, refTS, lastTS, diffSeconds) + logTransition("layer switch, reference too far ahead", extExpectedTS, extRefTS, extLastTS, diffSeconds) } - nextTS = refTS + extNextTS = extRefTS } } - if nextTS-lastTS == 0 || nextTS-lastTS > (1<<31) { - f.logger.Debugw("next timestamp is before last, adjusting", "nextTS", nextTS, "lastTS", lastTS) + if int64(extNextTS-extLastTS) <= 0 { + f.logger.Debugw("next timestamp is before last, adjusting", "extNextTS", extNextTS, "extLastTS", extLastTS) // nominal increase - nextTS = lastTS + 1 + extNextTS = extLastTS + 1 } - snOffset := uint32(1) - tsOffset := nextTS - lastTS + snOffset := uint64(1) + tsOffset := extNextTS - extLastTS if !rtpMungerState.LastMarker { // If last forwarded packet is not end of frame, synthesise a break in sequence number. // Else, decoders could try to interpret consecutive packets as part of the same frame // and potentially cause video corruption. snOffset++ - if tsOffset < f.codec.ClockRate*33/1000 { - tsOffset = f.codec.ClockRate * 33 / 1000 + if tsOffset < uint64(f.codec.ClockRate*33/1000) { + tsOffset = uint64(f.codec.ClockRate * 33 / 1000) } } f.rtpMunger.UpdateSnTsOffsets(extPkt, snOffset, tsOffset) @@ -1594,13 +1594,13 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) ( "source switch", "switchingAt", switchingAt.String(), "layer", layer, - "lastTS", lastTS, - "refTS", refTS, + "extLastTS", extLastTS, + "extRefTS", extRefTS, "refTSOffset", f.refTSOffset, "referenceLayerSpatial", f.referenceLayerSpatial, - "expectedTS", expectedTS, - "nextTS", nextTS, - "tsJump", nextTS-lastTS, + "extExpectedTS", extExpectedTS, + "extNextTS", extNextTS, + "tsJump", tsOffset, "nextSN", rtpMungerState.ExtLastSN+snOffset, "snOffset", snOffset, ) @@ -1609,7 +1609,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) ( if snOffset != 1 { eof = &SnTs{ sequenceNumber: uint16(rtpMungerState.ExtLastSN + 1), - timestamp: rtpMungerState.LastTS, + timestamp: uint32(rtpMungerState.ExtLastTS), } } return eof, nil @@ -1760,7 +1760,7 @@ func (f *Forwarder) maybeStart() { } f.rtpMunger.SetLastSnTs(extPkt) - f.firstTS = extPkt.Packet.Timestamp + f.extFirstTS = uint64(extPkt.Packet.Timestamp) f.logger.Debugw( "starting with dummy forwarding", "sequenceNumber", extPkt.Packet.SequenceNumber, @@ -1797,18 +1797,18 @@ func (f *Forwarder) GetSnTsForBlankFrames(frameRate uint32, numPackets int) ([]S numPackets++ } - lastTS := f.rtpMunger.GetLast().LastTS - expectedTS := lastTS + extLastTS := f.rtpMunger.GetLast().ExtLastTS + extExpectedTS := extLastTS if f.getExpectedRTPTimestamp != nil { tsExt, err := f.getExpectedRTPTimestamp(time.Now()) if err == nil { - expectedTS = uint32(tsExt) + extExpectedTS = tsExt } } - if expectedTS-lastTS == 0 || expectedTS-lastTS > (1<<31) { - expectedTS = lastTS + 1 + if int64(extExpectedTS-extLastTS) <= 0 { + extExpectedTS = extLastTS + 1 } - snts, err := f.rtpMunger.UpdateAndGetPaddingSnTs(numPackets, f.codec.ClockRate, frameRate, frameEndNeeded, expectedTS) + snts, err := f.rtpMunger.UpdateAndGetPaddingSnTs(numPackets, f.codec.ClockRate, frameRate, frameEndNeeded, extExpectedTS) return snts, frameEndNeeded, err } diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 8aa5dcac3..c725dbb69 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -81,7 +81,7 @@ type TrackReceiver interface { GetTemporalLayerFpsForSpatial(layer int32) []float32 GetCalculatedClockRate(layer int32) uint32 - GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) + GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error) } // WebRTCReceiver receives a media track @@ -777,8 +777,8 @@ func (w *WebRTCReceiver) GetCalculatedClockRate(layer int32) uint32 { return w.streamTrackerManager.GetCalculatedClockRate(layer) } -func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) { - return w.streamTrackerManager.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer) +func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error) { + return w.streamTrackerManager.GetReferenceLayerRTPTimestamp(ets, layer, referenceLayer) } // closes all track senders in parallel, returns when all are closed diff --git a/pkg/sfu/rtpmunger.go b/pkg/sfu/rtpmunger.go index a04140a6b..5aa9fa145 100644 --- a/pkg/sfu/rtpmunger.go +++ b/pkg/sfu/rtpmunger.go @@ -53,13 +53,13 @@ type SnTs struct { // ---------------------------------------------------------------------- type RTPMungerState struct { - ExtLastSN uint32 - LastTS uint32 + ExtLastSN uint64 + ExtLastTS uint64 LastMarker bool } func (r RTPMungerState) String() string { - return fmt.Sprintf("RTPMungerState{extLastSN: %d, lastTS: %d, lastMarker: %v)", r.ExtLastSN, r.LastTS, r.LastMarker) + return fmt.Sprintf("RTPMungerState{extLastSN: %d, extLastTS: %d, lastMarker: %v)", r.ExtLastSN, r.ExtLastTS, r.LastMarker) } // ---------------------------------------------------------------------- @@ -67,22 +67,22 @@ func (r RTPMungerState) String() string { type RTPMunger struct { logger logger.Logger - extHighestIncomingSN uint32 - snRangeMap *utils.RangeMap[uint32, uint32] + extHighestIncomingSN uint64 + snRangeMap *utils.RangeMap[uint64, uint64] - extLastSN uint32 - lastTS uint32 - tsOffset uint32 + extLastSN uint64 + extLastTS uint64 + tsOffset uint64 lastMarker bool - extRtxGateSn uint32 + extRtxGateSn uint64 isInRtxGateRegion bool } func NewRTPMunger(logger logger.Logger) *RTPMunger { return &RTPMunger{ logger: logger, - snRangeMap: utils.NewRangeMap[uint32, uint32](100), + snRangeMap: utils.NewRangeMap[uint64, uint64](100), } } @@ -92,7 +92,7 @@ func (r *RTPMunger) DebugInfo() map[string]interface{} { "ExtHighestIncomingSN": r.extHighestIncomingSN, "ExtLastSN": r.extLastSN, "SNOffset": snOffset, - "LastTS": r.lastTS, + "ExtLastTS": r.extLastTS, "TSOffset": r.tsOffset, "LastMarker": r.lastMarker, } @@ -101,27 +101,27 @@ func (r *RTPMunger) DebugInfo() map[string]interface{} { func (r *RTPMunger) GetLast() RTPMungerState { return RTPMungerState{ ExtLastSN: r.extLastSN, - LastTS: r.lastTS, + ExtLastTS: r.extLastTS, LastMarker: r.lastMarker, } } func (r *RTPMunger) SeedLast(state RTPMungerState) { r.extLastSN = state.ExtLastSN - r.lastTS = state.LastTS + r.extLastTS = state.ExtLastTS r.lastMarker = state.LastMarker } func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) { r.extHighestIncomingSN = extPkt.ExtSequenceNumber - 1 r.extLastSN = extPkt.ExtSequenceNumber - r.lastTS = extPkt.Packet.Timestamp + r.extLastTS = extPkt.ExtTimestamp } -func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint32, tsAdjust uint32) { +func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint64, tsAdjust uint64) { r.extHighestIncomingSN = extPkt.ExtSequenceNumber - 1 r.snRangeMap.ClearAndResetValue(extPkt.ExtSequenceNumber - r.extLastSN - snAdjust) - r.tsOffset = extPkt.Packet.Timestamp - r.lastTS - tsAdjust + r.tsOffset = extPkt.ExtTimestamp - r.extLastTS - tsAdjust } func (r *RTPMunger) PacketDropped(extPkt *buffer.ExtPacket) { @@ -154,7 +154,7 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara return &TranslationParamsRTP{ snOrdering: SequenceNumberOrderingOutOfOrder, sequenceNumber: uint16(extPkt.ExtSequenceNumber - snOffset), - timestamp: extPkt.Packet.Timestamp - r.tsOffset, + timestamp: uint32(extPkt.ExtTimestamp - r.tsOffset), }, nil } @@ -189,10 +189,10 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara } extMungedSN := extPkt.ExtSequenceNumber - snOffset - mungedTS := extPkt.Packet.Timestamp - r.tsOffset + extMungedTS := extPkt.ExtTimestamp - r.tsOffset r.extLastSN = extMungedSN - r.lastTS = mungedTS + r.extLastTS = extMungedTS r.lastMarker = extPkt.Packet.Marker if extPkt.KeyFrame { @@ -207,7 +207,7 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara return &TranslationParamsRTP{ snOrdering: ordering, sequenceNumber: uint16(extMungedSN), - timestamp: mungedTS, + timestamp: uint32(extMungedTS), }, nil } @@ -226,7 +226,7 @@ func (r *RTPMunger) FilterRTX(nacks []uint16) []uint16 { return filtered } -func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate uint32, forceMarker bool, rtpTimestamp uint32) ([]SnTs, error) { +func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate uint32, forceMarker bool, extRtpTimestamp uint64) ([]SnTs, error) { useLastTSForFirst := false tsOffset := 0 if !r.lastMarker { @@ -240,32 +240,33 @@ func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate } extLastSN := r.extLastSN - lastTS := r.lastTS + extLastTS := r.extLastTS vals := make([]SnTs, num) for i := 0; i < num; i++ { extLastSN++ vals[i].sequenceNumber = uint16(extLastSN) + if frameRate != 0 { if useLastTSForFirst && i == 0 { - vals[i].timestamp = r.lastTS + vals[i].timestamp = uint32(r.extLastTS) } else { - ts := rtpTimestamp + ((uint32(i+1-tsOffset)*clockRate)+frameRate-1)/frameRate - if (ts-lastTS) == 0 || (ts-lastTS) > (1<<31) { - ts = lastTS + 1 - lastTS = ts + ets := extRtpTimestamp + uint64(((uint32(i+1-tsOffset)*clockRate)+frameRate-1)/frameRate) + if int64(ets-extLastTS) <= 0 { + ets = extLastTS + 1 } - vals[i].timestamp = ts + extLastTS = ets + vals[i].timestamp = uint32(ets) } } else { - vals[i].timestamp = r.lastTS + vals[i].timestamp = uint32(r.extLastTS) } } r.extLastSN = extLastSN - r.snRangeMap.DecValue(uint32(num)) + r.snRangeMap.DecValue(uint64(num)) - r.tsOffset -= vals[num-1].timestamp - r.lastTS - r.lastTS = vals[num-1].timestamp + r.tsOffset -= extLastTS - r.extLastTS + r.extLastTS = extLastTS if forceMarker { r.lastMarker = true diff --git a/pkg/sfu/rtpmunger_test.go b/pkg/sfu/rtpmunger_test.go index 9f4ea976d..2ade34e7b 100644 --- a/pkg/sfu/rtpmunger_test.go +++ b/pkg/sfu/rtpmunger_test.go @@ -41,13 +41,13 @@ func TestSetLastSnTs(t *testing.T) { require.NotNil(t, extPkt) r.SetLastSnTs(extPkt) - require.Equal(t, uint32(23332), r.extHighestIncomingSN) - require.Equal(t, uint32(23333), r.extLastSN) - require.Equal(t, uint32(0xabcdef), r.lastTS) + require.Equal(t, uint64(23332), r.extHighestIncomingSN) + require.Equal(t, uint64(23333), r.extLastSN) + require.Equal(t, uint64(0xabcdef), r.extLastTS) snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN) require.NoError(t, err) - require.Equal(t, uint32(0), snOffset) - require.Equal(t, uint32(0), r.tsOffset) + require.Equal(t, uint64(0), snOffset) + require.Equal(t, uint64(0), r.tsOffset) } func TestUpdateSnTsOffsets(t *testing.T) { @@ -68,13 +68,13 @@ func TestUpdateSnTsOffsets(t *testing.T) { } extPkt, _ = testutils.GetTestExtPacket(params) r.UpdateSnTsOffsets(extPkt, 1, 1) - require.Equal(t, uint32(33332), r.extHighestIncomingSN) - require.Equal(t, uint32(23333), r.extLastSN) - require.Equal(t, uint32(0xabcdef), r.lastTS) + require.Equal(t, uint64(33332), r.extHighestIncomingSN) + require.Equal(t, uint64(23333), r.extLastSN) + require.Equal(t, uint64(0xabcdef), r.extLastTS) snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN) require.NoError(t, err) - require.Equal(t, uint32(9999), snOffset) - require.Equal(t, uint32(0xffffffff), r.tsOffset) + require.Equal(t, uint64(9999), snOffset) + require.Equal(t, uint64(0xffff_ffff_ffff_ffff), r.tsOffset) } func TestPacketDropped(t *testing.T) { @@ -88,13 +88,13 @@ func TestPacketDropped(t *testing.T) { } extPkt, _ := testutils.GetTestExtPacket(params) r.SetLastSnTs(extPkt) - require.Equal(t, uint32(23332), r.extHighestIncomingSN) - require.Equal(t, uint32(23333), r.extLastSN) - require.Equal(t, uint32(0xabcdef), r.lastTS) + require.Equal(t, uint64(23332), r.extHighestIncomingSN) + require.Equal(t, uint64(23333), r.extLastSN) + require.Equal(t, uint64(0xabcdef), r.extLastTS) snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN) require.NoError(t, err) - require.Equal(t, uint32(0), snOffset) - require.Equal(t, uint32(0), r.tsOffset) + require.Equal(t, uint64(0), snOffset) + require.Equal(t, uint64(0), r.tsOffset) r.UpdateAndGetSnTs(extPkt) // update sequence number offset @@ -106,11 +106,11 @@ func TestPacketDropped(t *testing.T) { } extPkt, _ = testutils.GetTestExtPacket(params) r.PacketDropped(extPkt) - require.Equal(t, uint32(23333), r.extHighestIncomingSN) - require.Equal(t, uint32(23333), r.extLastSN) + require.Equal(t, uint64(23333), r.extHighestIncomingSN) + require.Equal(t, uint64(23333), r.extLastSN) snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN) require.NoError(t, err) - require.Equal(t, uint32(0), snOffset) + require.Equal(t, uint64(0), snOffset) // drop a head packet and check offset increases params = &testutils.TestExtPacketParams{ @@ -124,10 +124,10 @@ func TestPacketDropped(t *testing.T) { r.UpdateAndGetSnTs(extPkt) // update sequence number offset r.PacketDropped(extPkt) - require.Equal(t, uint32(44443), r.extLastSN) + require.Equal(t, uint64(44443), r.extLastSN) snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN) require.NoError(t, err) - require.Equal(t, uint32(1), snOffset) + require.Equal(t, uint64(1), snOffset) params = &testutils.TestExtPacketParams{ SequenceNumber: 44445, @@ -138,10 +138,10 @@ func TestPacketDropped(t *testing.T) { extPkt, _ = testutils.GetTestExtPacket(params) r.UpdateAndGetSnTs(extPkt) // update sequence number offset - require.Equal(t, r.extLastSN, uint32(44444)) + require.Equal(t, r.extLastSN, uint64(44444)) snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN) require.NoError(t, err) - require.Equal(t, uint32(1), snOffset) + require.Equal(t, uint64(1), snOffset) } func TestOutOfOrderSequenceNumber(t *testing.T) { @@ -243,11 +243,11 @@ func TestPaddingOnlyPacket(t *testing.T) { require.Error(t, err) require.ErrorIs(t, err, ErrPaddingOnlyPacket) require.Equal(t, tpExpected, *tp) - require.Equal(t, uint32(23333), r.extHighestIncomingSN) - require.Equal(t, uint32(23333), r.extLastSN) + require.Equal(t, uint64(23333), r.extHighestIncomingSN) + require.Equal(t, uint64(23333), r.extLastSN) snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN) require.NoError(t, err) - require.Equal(t, uint32(1), snOffset) + require.Equal(t, uint64(1), snOffset) // padding only packet with a gap should not report an error params = &testutils.TestExtPacketParams{ @@ -266,11 +266,11 @@ func TestPaddingOnlyPacket(t *testing.T) { tp, err = r.UpdateAndGetSnTs(extPkt) require.NoError(t, err) require.Equal(t, tpExpected, *tp) - require.Equal(t, uint32(23335), r.extHighestIncomingSN) - require.Equal(t, uint32(23334), r.extLastSN) + require.Equal(t, uint64(23335), r.extHighestIncomingSN) + require.Equal(t, uint64(23334), r.extLastSN) snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN) require.NoError(t, err) - require.Equal(t, uint32(1), snOffset) + require.Equal(t, uint64(1), snOffset) } func TestGapInSequenceNumber(t *testing.T) { @@ -307,19 +307,19 @@ func TestGapInSequenceNumber(t *testing.T) { tp, err := r.UpdateAndGetSnTs(extPkt) require.NoError(t, err) require.Equal(t, tpExpected, *tp) - require.Equal(t, uint32(65536+1), r.extHighestIncomingSN) - require.Equal(t, uint32(65536+1), r.extLastSN) + require.Equal(t, uint64(65536+1), r.extHighestIncomingSN) + require.Equal(t, uint64(65536+1), r.extLastSN) snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN) require.NoError(t, err) - require.Equal(t, uint32(0), snOffset) + require.Equal(t, uint64(0), snOffset) // ensure missing sequence numbers got recorded in cache // last received, three missing in between and current received should all be in cache - for i := uint32(65534); i != 65536+1; i++ { + for i := uint64(65534); i != 65536+1; i++ { offset, err := r.snRangeMap.GetValue(i) require.NoError(t, err) - require.Equal(t, uint32(0), offset) + require.Equal(t, uint64(0), offset) } // a padding only packet should be dropped @@ -338,11 +338,11 @@ func TestGapInSequenceNumber(t *testing.T) { tp, err = r.UpdateAndGetSnTs(extPkt) require.ErrorIs(t, err, ErrPaddingOnlyPacket) require.Equal(t, tpExpected, *tp) - require.Equal(t, uint32(65536+2), r.extHighestIncomingSN) - require.Equal(t, uint32(65536+1), r.extLastSN) + require.Equal(t, uint64(65536+2), r.extHighestIncomingSN) + require.Equal(t, uint64(65536+1), r.extLastSN) snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN) require.NoError(t, err) - require.Equal(t, uint32(1), snOffset) + require.Equal(t, uint64(1), snOffset) // a packet with a gap should be adding to missing cache params = &testutils.TestExtPacketParams{ @@ -363,11 +363,11 @@ func TestGapInSequenceNumber(t *testing.T) { tp, err = r.UpdateAndGetSnTs(extPkt) require.NoError(t, err) require.Equal(t, tpExpected, *tp) - require.Equal(t, uint32(65536+4), r.extHighestIncomingSN) - require.Equal(t, uint32(65536+3), r.extLastSN) + require.Equal(t, uint64(65536+4), r.extHighestIncomingSN) + require.Equal(t, uint64(65536+3), r.extLastSN) snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN) require.NoError(t, err) - require.Equal(t, uint32(1), snOffset) + require.Equal(t, uint64(1), snOffset) // another contiguous padding only packet should be dropped params = &testutils.TestExtPacketParams{ @@ -385,11 +385,11 @@ func TestGapInSequenceNumber(t *testing.T) { tp, err = r.UpdateAndGetSnTs(extPkt) require.ErrorIs(t, err, ErrPaddingOnlyPacket) require.Equal(t, tpExpected, *tp) - require.Equal(t, uint32(65536+5), r.extHighestIncomingSN) - require.Equal(t, uint32(65536+3), r.extLastSN) + require.Equal(t, uint64(65536+5), r.extHighestIncomingSN) + require.Equal(t, uint64(65536+3), r.extLastSN) snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN) require.NoError(t, err) - require.Equal(t, uint32(2), snOffset) + require.Equal(t, uint64(2), snOffset) // a packet with a gap should be adding to missing cache params = &testutils.TestExtPacketParams{ @@ -410,11 +410,11 @@ func TestGapInSequenceNumber(t *testing.T) { tp, err = r.UpdateAndGetSnTs(extPkt) require.NoError(t, err) require.Equal(t, tpExpected, *tp) - require.Equal(t, uint32(65536+7), r.extHighestIncomingSN) - require.Equal(t, uint32(65536+5), r.extLastSN) + require.Equal(t, uint64(65536+7), r.extHighestIncomingSN) + require.Equal(t, uint64(65536+5), r.extLastSN) snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN) require.NoError(t, err) - require.Equal(t, uint32(2), snOffset) + require.Equal(t, uint64(2), snOffset) // check the missing packets params = &testutils.TestExtPacketParams{ @@ -434,11 +434,11 @@ func TestGapInSequenceNumber(t *testing.T) { tp, err = r.UpdateAndGetSnTs(extPkt) require.NoError(t, err) require.Equal(t, tpExpected, *tp) - require.Equal(t, uint32(65536+7), r.extHighestIncomingSN) - require.Equal(t, uint32(65536+5), r.extLastSN) + require.Equal(t, uint64(65536+7), r.extHighestIncomingSN) + require.Equal(t, uint64(65536+5), r.extLastSN) snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN) require.NoError(t, err) - require.Equal(t, uint32(2), snOffset) + require.Equal(t, uint64(2), snOffset) params = &testutils.TestExtPacketParams{ SequenceNumber: 3, @@ -457,11 +457,11 @@ func TestGapInSequenceNumber(t *testing.T) { tp, err = r.UpdateAndGetSnTs(extPkt) require.NoError(t, err) require.Equal(t, tpExpected, *tp) - require.Equal(t, uint32(65536+7), r.extHighestIncomingSN) - require.Equal(t, uint32(65536+5), r.extLastSN) + require.Equal(t, uint64(65536+7), r.extHighestIncomingSN) + require.Equal(t, uint64(65536+5), r.extLastSN) snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN) require.NoError(t, err) - require.Equal(t, uint32(2), snOffset) + require.Equal(t, uint64(2), snOffset) } func TestUpdateAndGetPaddingSnTs(t *testing.T) { @@ -493,7 +493,7 @@ func TestUpdateAndGetPaddingSnTs(t *testing.T) { timestamp: params.Timestamp + ((uint32(i)*clockRate)+frameRate-1)/frameRate, } } - snts, err := r.UpdateAndGetPaddingSnTs(numPadding, clockRate, frameRate, true, params.Timestamp) + snts, err := r.UpdateAndGetPaddingSnTs(numPadding, clockRate, frameRate, true, extPkt.ExtTimestamp) require.NoError(t, err) require.Equal(t, sntsExpected, snts) @@ -504,7 +504,7 @@ func TestUpdateAndGetPaddingSnTs(t *testing.T) { timestamp: snts[len(snts)-1].timestamp + ((uint32(i+1)*clockRate)+frameRate-1)/frameRate, } } - snts, err = r.UpdateAndGetPaddingSnTs(numPadding, clockRate, frameRate, false, snts[len(snts)-1].timestamp) + snts, err = r.UpdateAndGetPaddingSnTs(numPadding, clockRate, frameRate, false, uint64(snts[len(snts)-1].timestamp)) require.NoError(t, err) require.Equal(t, sntsExpected, snts) } diff --git a/pkg/sfu/streamtrackermanager.go b/pkg/sfu/streamtrackermanager.go index 9aa82f91e..19fd266e9 100644 --- a/pkg/sfu/streamtrackermanager.go +++ b/pkg/sfu/streamtrackermanager.go @@ -76,7 +76,7 @@ type StreamTrackerManager struct { senderReportMu sync.RWMutex senderReports [buffer.DefaultMaxLayerSpatial + 1]endsSenderReport - layerOffsets [buffer.DefaultMaxLayerSpatial + 1][buffer.DefaultMaxLayerSpatial + 1]uint32 + layerOffsets [buffer.DefaultMaxLayerSpatial + 1][buffer.DefaultMaxLayerSpatial + 1]uint64 closed core.Fuse @@ -563,10 +563,10 @@ func (s *StreamTrackerManager) updateLayerOffsetLocked(ref, other int32) { rtpDiff := ntpDiff.Nanoseconds() * int64(s.clockRate) / 1e9 // calculate other layer's time stamp at the same time as ref layer's NTP time - normalizedOtherTS := srOther.RTPTimestamp + uint32(rtpDiff) + normalizedOtherTS := srOther.RTPTimestampExt + uint64(rtpDiff) // now both layers' time stamp refer to the same NTP time and the diff is the offset between the layers - offset := srRef.RTPTimestamp - normalizedOtherTS + offset := srRef.RTPTimestampExt - normalizedOtherTS // use minimal offset to indicate value availability in the extremely unlikely case of // both layers using the same timestamp @@ -643,7 +643,7 @@ func (s *StreamTrackerManager) GetCalculatedClockRate(layer int32) uint32 { return uint32(float64(rdsf) / tsf.Seconds()) } -func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) { +func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error) { s.senderReportMu.RLock() defer s.senderReportMu.RUnlock() @@ -655,7 +655,7 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer in return 0, fmt.Errorf("offset unavailable, target: %d, reference: %d", layer, referenceLayer) } - return ts + s.layerOffsets[referenceLayer][layer], nil + return ets + s.layerOffsets[referenceLayer][layer], nil } func (s *StreamTrackerManager) GetMaxTemporalLayerSeen() int32 { diff --git a/pkg/sfu/testutils/data.go b/pkg/sfu/testutils/data.go index c30757b3d..1bebc420b 100644 --- a/pkg/sfu/testutils/data.go +++ b/pkg/sfu/testutils/data.go @@ -32,6 +32,7 @@ type TestExtPacketParams struct { SequenceNumber uint16 SNCycles int Timestamp uint32 + TSCycles int SSRC uint32 PayloadSize int PaddingSize byte @@ -63,7 +64,8 @@ func GetTestExtPacket(params *TestExtPacketParams) (*buffer.ExtPacket, error) { ep := &buffer.ExtPacket{ VideoLayer: params.VideoLayer, - ExtSequenceNumber: uint32(params.SNCycles<<16) + uint32(params.SequenceNumber), + ExtSequenceNumber: uint64(params.SNCycles<<16) + uint64(params.SequenceNumber), + ExtTimestamp: uint64(params.TSCycles<<32) + uint64(params.Timestamp), Arrival: params.ArrivalTime, Packet: &packet, KeyFrame: params.IsKeyFrame, diff --git a/pkg/sfu/utils/rangemap.go b/pkg/sfu/utils/rangemap.go index 3654791f0..acaa4a320 100644 --- a/pkg/sfu/utils/rangemap.go +++ b/pkg/sfu/utils/rangemap.go @@ -30,11 +30,11 @@ var ( ) type rangeType interface { - uint32 + uint32 | uint64 } type valueType interface { - uint32 + uint32 | uint64 } type rangeVal[RT rangeType, VT valueType] struct { diff --git a/pkg/sfu/utils/wraparound.go b/pkg/sfu/utils/wraparound.go index 8845f2c19..5dd2b1e93 100644 --- a/pkg/sfu/utils/wraparound.go +++ b/pkg/sfu/utils/wraparound.go @@ -32,7 +32,7 @@ type WrapAround[T number, ET extendedNumber] struct { initialized bool start T highest T - cycles int + cycles ET } func NewWrapAround[T number, ET extendedNumber]() *WrapAround[T, ET] { @@ -78,7 +78,7 @@ func (w *WrapAround[T, ET]) Update(val T) (result WrapAroundUpdateResult[ET]) { // in-order if val < w.highest { - w.cycles++ + w.cycles += w.fullRange } w.highest = val @@ -88,13 +88,14 @@ func (w *WrapAround[T, ET]) Update(val T) (result WrapAroundUpdateResult[ET]) { func (w *WrapAround[T, ET]) RollbackRestart(ev ET) { if w.isWrapBack(w.start, T(ev)) { - w.cycles-- + w.cycles -= w.fullRange } w.start = T(ev) } -func (w *WrapAround[T, ET]) ResetHighest(val T) { - w.highest = val +func (w *WrapAround[T, ET]) ResetHighest(ev ET) { + w.highest = T(ev) + w.cycles = ev & ^(w.fullRange - 1) } func (w *WrapAround[T, ET]) GetStart() T { @@ -122,7 +123,7 @@ func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (isRestart bool, preExtended totalNum := w.GetExtendedHighest() - w.GetExtendedStart() + 1 if totalNum > (w.fullRange >> 1) { if w.isWrapBack(val, w.highest) { - cycles-- + cycles -= w.fullRange } extendedVal = w.getExtendedHighest(cycles, val) return @@ -134,13 +135,13 @@ func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (isRestart bool, preExtended preExtendedStart = w.GetExtendedStart() if w.isWrapBack(val, w.highest) { - w.cycles = 1 + w.cycles = w.fullRange cycles = 0 } w.start = val } else { if w.isWrapBack(val, w.highest) { - cycles-- + cycles -= w.fullRange } } extendedVal = w.getExtendedHighest(cycles, val) @@ -151,6 +152,6 @@ func (w *WrapAround[T, ET]) isWrapBack(earlier T, later T) bool { return ET(later) < (w.fullRange>>1) && ET(earlier) >= (w.fullRange>>1) } -func (w *WrapAround[T, ET]) getExtendedHighest(cycles int, val T) ET { - return ET(cycles)*w.fullRange + ET(val) +func (w *WrapAround[T, ET]) getExtendedHighest(cycles ET, val T) ET { + return cycles + ET(val) } diff --git a/pkg/sfu/utils/wraparound_test.go b/pkg/sfu/utils/wraparound_test.go index c01729108..9b69f105f 100644 --- a/pkg/sfu/utils/wraparound_test.go +++ b/pkg/sfu/utils/wraparound_test.go @@ -194,26 +194,26 @@ func TestWrapAroundUint16(t *testing.T) { } } -func TestWrapAroundUint16RollbackRestart(t *testing.T) { - w := NewWrapAround[uint16, uint32]() +func TestWrapAroundUint16RollbackRestartAndResetHighest(t *testing.T) { + w := NewWrapAround[uint16, uint64]() // initialize w.Update(23) require.Equal(t, uint16(23), w.GetStart()) - require.Equal(t, uint32(23), w.GetExtendedStart()) + require.Equal(t, uint64(23), w.GetExtendedStart()) require.Equal(t, uint16(23), w.GetHighest()) - require.Equal(t, uint32(23), w.GetExtendedHighest()) + require.Equal(t, uint64(23), w.GetExtendedHighest()) // an in-order update w.Update(25) require.Equal(t, uint16(23), w.GetStart()) - require.Equal(t, uint32(23), w.GetExtendedStart()) + require.Equal(t, uint64(23), w.GetExtendedStart()) require.Equal(t, uint16(25), w.GetHighest()) - require.Equal(t, uint32(25), w.GetExtendedHighest()) + require.Equal(t, uint64(25), w.GetExtendedHighest()) // force restart without wrap res := w.Update(12) - expectedResult := WrapAroundUpdateResult[uint32]{ + expectedResult := WrapAroundUpdateResult[uint64]{ IsRestart: true, PreExtendedStart: 23, PreExtendedHighest: 25, @@ -221,20 +221,20 @@ func TestWrapAroundUint16RollbackRestart(t *testing.T) { } require.Equal(t, expectedResult, res) require.Equal(t, uint16(12), w.GetStart()) - require.Equal(t, uint32(12), w.GetExtendedStart()) + require.Equal(t, uint64(12), w.GetExtendedStart()) require.Equal(t, uint16(25), w.GetHighest()) - require.Equal(t, uint32(25), w.GetExtendedHighest()) + require.Equal(t, uint64(25), w.GetExtendedHighest()) // roll back restart w.RollbackRestart(res.PreExtendedStart) require.Equal(t, uint16(23), w.GetStart()) - require.Equal(t, uint32(23), w.GetExtendedStart()) + require.Equal(t, uint64(23), w.GetExtendedStart()) require.Equal(t, uint16(25), w.GetHighest()) - require.Equal(t, uint32(25), w.GetExtendedHighest()) + require.Equal(t, uint64(25), w.GetExtendedHighest()) // force restart with wrap res = w.Update(65533) - expectedResult = WrapAroundUpdateResult[uint32]{ + expectedResult = WrapAroundUpdateResult[uint64]{ IsRestart: true, PreExtendedStart: 23, PreExtendedHighest: 25, @@ -242,16 +242,29 @@ func TestWrapAroundUint16RollbackRestart(t *testing.T) { } require.Equal(t, expectedResult, res) require.Equal(t, uint16(65533), w.GetStart()) - require.Equal(t, uint32(65533), w.GetExtendedStart()) + require.Equal(t, uint64(65533), w.GetExtendedStart()) require.Equal(t, uint16(25), w.GetHighest()) - require.Equal(t, uint32(65536+25), w.GetExtendedHighest()) + require.Equal(t, uint64(65536+25), w.GetExtendedHighest()) // roll back restart w.RollbackRestart(res.PreExtendedStart) require.Equal(t, uint16(23), w.GetStart()) - require.Equal(t, uint32(23), w.GetExtendedStart()) + require.Equal(t, uint64(23), w.GetExtendedStart()) require.Equal(t, uint16(25), w.GetHighest()) - require.Equal(t, uint32(25), w.GetExtendedHighest()) + require.Equal(t, uint64(25), w.GetExtendedHighest()) + + // reset highest + w.ResetHighest(0x1234) + require.Equal(t, uint16(23), w.GetStart()) + require.Equal(t, uint64(23), w.GetExtendedStart()) + require.Equal(t, uint16(0x1234), w.GetHighest()) + require.Equal(t, uint64(0x1234), w.GetExtendedHighest()) + + w.ResetHighest(0x7f1234) + require.Equal(t, uint16(23), w.GetStart()) + require.Equal(t, uint64(23), w.GetExtendedStart()) + require.Equal(t, uint16(0x1234), w.GetHighest()) + require.Equal(t, uint64(0x7f1234), w.GetExtendedHighest()) } func TestWrapAroundUint32(t *testing.T) {