Merge remote-tracking branch 'origin/master' into raja_1833

This commit is contained in:
boks1971
2023-08-27 21:51:55 +05:30
16 changed files with 345 additions and 309 deletions
+2 -2
View File
@@ -317,9 +317,9 @@ func (d *DummyReceiver) GetCalculatedClockRate(layer int32) uint32 {
return 0
}
func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) {
func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error) {
if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok {
return r.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer)
return r.GetReferenceLayerRTPTimestamp(ets, layer, referenceLayer)
}
return 0, errors.New("receiver not available")
}
+14 -12
View File
@@ -54,7 +54,8 @@ type pendingPacket struct {
type ExtPacket struct {
VideoLayer
Arrival time.Time
ExtSequenceNumber uint32
ExtSequenceNumber uint64
ExtTimestamp uint64
Packet *rtp.Packet
Payload interface{}
KeyFrame bool
@@ -82,7 +83,7 @@ type Buffer struct {
closed atomic.Bool
mime string
snRangeMap *utils.RangeMap[uint32, uint32]
snRangeMap *utils.RangeMap[uint64, uint64]
latestTSForAudioLevelInitialized bool
latestTSForAudioLevel uint32
@@ -127,7 +128,7 @@ func NewBuffer(ssrc uint32, vp, ap *sync.Pool) *Buffer {
mediaSSRC: ssrc,
videoPool: vp,
audioPool: ap,
snRangeMap: utils.NewRangeMap[uint32, uint32](100),
snRangeMap: utils.NewRangeMap[uint64, uint64](100),
pliThrottle: int64(500 * time.Millisecond),
logger: l.WithComponent(sutils.ComponentPub).WithComponent(sutils.ComponentSFU),
}
@@ -414,21 +415,21 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) {
return
}
extSeqNumber, isOutOfOrder := b.updateStreamState(&rtpPacket, arrivalTime)
flowState := b.updateStreamState(&rtpPacket, arrivalTime)
b.processHeaderExtensions(&rtpPacket, arrivalTime)
if !isOutOfOrder && len(rtpPacket.Payload) == 0 {
if !flowState.IsOutOfOrder && len(rtpPacket.Payload) == 0 {
// drop padding only in-order packet
b.snRangeMap.IncValue(1)
return
}
// add to RTX buffer using sequence number after accounting for dropped padding only packets
snAdjustment, err := b.snRangeMap.GetValue(extSeqNumber)
snAdjustment, err := b.snRangeMap.GetValue(flowState.ExtSequenceNumber)
if err != nil {
b.logger.Errorw("could not get sequence number adjustment", err)
return
}
rtpPacket.Header.SequenceNumber = uint16(extSeqNumber - snAdjustment)
rtpPacket.Header.SequenceNumber = uint16(flowState.ExtSequenceNumber - snAdjustment)
_, err = b.bucket.AddPacketWithSequenceNumber(pkt, rtpPacket.Header.SequenceNumber)
if err != nil {
if err != bucket.ErrRTXPacket {
@@ -441,7 +442,7 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) {
b.doReports(arrivalTime)
ep := b.getExtPacket(&rtpPacket, extSeqNumber, arrivalTime)
ep := b.getExtPacket(&rtpPacket, arrivalTime, flowState)
if ep == nil {
return
}
@@ -499,7 +500,7 @@ func (b *Buffer) doFpsCalc(ep *ExtPacket) {
}
}
func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) (uint32, bool) {
func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) RTPFlowState {
flowState := b.rtpStats.Update(&p.Header, len(p.Payload), int(p.PaddingSize), arrivalTime)
if b.nacker != nil {
@@ -513,7 +514,7 @@ func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) (uint32
}
}
return flowState.ExtSeqNumber, flowState.IsOutOfOrder
return flowState
}
func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime time.Time) {
@@ -546,10 +547,11 @@ func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime time.Time) {
}
}
func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, extSeqNumber uint32, arrivalTime time.Time) *ExtPacket {
func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime time.Time, flowState RTPFlowState) *ExtPacket {
ep := &ExtPacket{
Arrival: arrivalTime,
ExtSequenceNumber: extSeqNumber,
ExtSequenceNumber: flowState.ExtSequenceNumber,
ExtTimestamp: flowState.ExtTimestamp,
Packet: rtpPacket,
VideoLayer: VideoLayer{
Spatial: InvalidLayerSpatial,
+1 -1
View File
@@ -213,7 +213,7 @@ func TestNewBuffer(t *testing.T) {
_, _ = buff.Write(buf)
}
require.Equal(t, uint16(2), buff.rtpStats.sequenceNumber.GetHighest())
require.Equal(t, uint32(65536+2), buff.rtpStats.sequenceNumber.GetExtendedHighest())
require.Equal(t, uint64(65536+2), buff.rtpStats.sequenceNumber.GetExtendedHighest())
})
}
}
+1 -1
View File
@@ -52,7 +52,7 @@ type VP8 struct {
I bool
M bool
PictureID uint16 /* 8 or 16 bits, picture ID */
PictureID uint16 /* 7 or 15 bits, picture ID */
L bool
TL0PICIDX uint8 /* 8 bits temporal level zero index */
+105 -88
View File
@@ -66,23 +66,24 @@ func (d driftResult) String() string {
type RTPFlowState struct {
HasLoss bool
LossStartInclusive uint32
LossEndExclusive uint32
LossStartInclusive uint64
LossEndExclusive uint64
IsOutOfOrder bool
ExtSeqNumber uint32
ExtSequenceNumber uint64
ExtTimestamp uint64
}
type IntervalStats struct {
packets uint32
packets uint64
bytes uint64
headerBytes uint64
packetsPadding uint32
packetsPadding uint64
bytesPadding uint64
headerBytesPadding uint64
packetsLost uint32
packetsOutOfOrder uint32
packetsLost uint64
packetsOutOfOrder uint64
frames uint32
}
@@ -111,12 +112,12 @@ type RTPDeltaInfo struct {
type Snapshot struct {
startTime time.Time
extStartSN uint32
extStartSNOverridden uint32
packetsDuplicate uint32
extStartSN uint64
extStartSNOverridden uint64
packetsDuplicate uint64
bytesDuplicate uint64
headerBytesDuplicate uint64
packetsLostOverridden uint32
packetsLostOverridden uint64
nacks uint32
plis uint32
firs uint32
@@ -152,15 +153,14 @@ type RTPStats struct {
lock sync.RWMutex
initialized bool
resyncOnNextPacket bool
initialized bool
startTime time.Time
endTime time.Time
sequenceNumber *utils.WrapAround[uint16, uint32]
sequenceNumber *utils.WrapAround[uint16, uint64]
extHighestSNOverridden uint32
extHighestSNOverridden uint64
lastRRTime time.Time
lastRR rtcp.ReceptionReport
@@ -178,13 +178,13 @@ type RTPStats struct {
headerBytesDuplicate uint64
bytesPadding uint64
headerBytesPadding uint64
packetsDuplicate uint32
packetsPadding uint32
packetsDuplicate uint64
packetsPadding uint64
packetsOutOfOrder uint32
packetsOutOfOrder uint64
packetsLost uint32
packetsLostOverridden uint32
packetsLost uint64
packetsLostOverridden uint64
frames uint32
@@ -229,7 +229,7 @@ func NewRTPStats(params RTPStatsParams) *RTPStats {
return &RTPStats{
params: params,
logger: params.Logger,
sequenceNumber: utils.NewWrapAround[uint16, uint32](),
sequenceNumber: utils.NewWrapAround[uint16, uint64](),
timestamp: utils.NewWrapAround[uint32, uint64](),
nextSnapshotId: FirstSnapshotId,
snapshots: make(map[uint32]*Snapshot),
@@ -245,7 +245,6 @@ func (r *RTPStats) Seed(from *RTPStats) {
}
r.initialized = from.initialized
r.resyncOnNextPacket = from.resyncOnNextPacket
r.startTime = from.startTime
// do not clone endTime as a non-zero endTime indicates an ended object
@@ -375,17 +374,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
return
}
if r.resyncOnNextPacket {
r.resyncOnNextPacket = false
if r.initialized {
r.sequenceNumber.ResetHighest(rtph.SequenceNumber - 1)
r.timestamp.ResetHighest(rtph.Timestamp)
r.highestTime = packetTime
}
}
var resSN utils.WrapAroundUpdateResult[uint32]
var resSN utils.WrapAroundUpdateResult[uint64]
var resTS utils.WrapAroundUpdateResult[uint64]
if !r.initialized {
if payloadSize == 0 {
@@ -417,8 +406,8 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
"rtp stream start",
"startTime", r.startTime.String(),
"firstTime", r.firstTime.String(),
"startSN", r.sequenceNumber.GetExtendedHighest(),
"startTS", r.timestamp.GetExtendedHighest(),
"startSN", r.sequenceNumber.GetExtendedStart(),
"startTS", r.timestamp.GetExtendedStart(),
)
} else {
resSN = r.sequenceNumber.Update(rtph.SequenceNumber)
@@ -428,8 +417,8 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
hdrSize := uint64(rtph.MarshalSize())
pktSize := hdrSize + uint64(payloadSize+paddingSize)
isDuplicate := false
gapSN := resSN.ExtendedVal - resSN.PreExtendedHighest
if gapSN == 0 || gapSN > (1<<31) { // duplicate OR out-of-order
gapSN := int64(resSN.ExtendedVal - resSN.PreExtendedHighest)
if gapSN <= 0 { // duplicate OR out-of-order
if payloadSize == 0 {
// do not start on a padding only packet
if resTS.IsRestart {
@@ -483,14 +472,15 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
}
flowState.IsOutOfOrder = true
flowState.ExtSeqNumber = resSN.ExtendedVal
flowState.ExtSequenceNumber = resSN.ExtendedVal
flowState.ExtTimestamp = resTS.ExtendedVal
} else { // in-order
// update gap histogram
r.updateGapHistogram(int(gapSN))
// update missing sequence numbers
r.clearSnInfos(resSN.PreExtendedHighest+1, resSN.ExtendedVal)
r.packetsLost += gapSN - 1
r.packetsLost += uint64(gapSN - 1)
r.setSnInfo(resSN.ExtendedVal, resSN.PreExtendedHighest, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker, false)
@@ -505,7 +495,8 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
flowState.LossStartInclusive = resSN.PreExtendedHighest + 1
flowState.LossEndExclusive = resSN.ExtendedVal
}
flowState.ExtSeqNumber = resSN.ExtendedVal
flowState.ExtSequenceNumber = resSN.ExtendedVal
flowState.ExtTimestamp = resTS.ExtendedVal
}
if !isDuplicate {
@@ -527,25 +518,30 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
return
}
func (r *RTPStats) ResyncOnNextPacket() {
func (r *RTPStats) Resync(esn uint64, ets uint64, at time.Time) {
r.lock.Lock()
defer r.lock.Unlock()
r.resyncOnNextPacket = true
if !r.initialized {
return
}
r.sequenceNumber.ResetHighest(esn - 1)
r.timestamp.ResetHighest(ets)
r.highestTime = at
}
func (r *RTPStats) getPacketsExpected() uint32 {
func (r *RTPStats) getPacketsExpected() uint64 {
return r.sequenceNumber.GetExtendedHighest() - r.sequenceNumber.GetExtendedStart() + 1
}
func (r *RTPStats) GetTotalPacketsPrimary() uint32 {
func (r *RTPStats) GetTotalPacketsPrimary() uint64 {
r.lock.RLock()
defer r.lock.RUnlock()
return r.getTotalPacketsPrimary()
}
func (r *RTPStats) getTotalPacketsPrimary() uint32 {
func (r *RTPStats) getTotalPacketsPrimary() uint64 {
packetsExpected := r.getPacketsExpected()
if r.packetsLost > packetsExpected {
// should not happen
@@ -564,7 +560,19 @@ func (r *RTPStats) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt uint32
r.lock.Lock()
defer r.lock.Unlock()
if !r.initialized || !r.endTime.IsZero() || !r.params.IsReceiverReportDriven || rr.LastSequenceNumber < r.sequenceNumber.GetExtendedHighest() {
if !r.initialized || !r.endTime.IsZero() || !r.params.IsReceiverReportDriven || uint64(rr.LastSequenceNumber) < r.sequenceNumber.GetExtendedHighest() {
// it is possible that the `LastSequenceNumber` in the receiver report is before the starting
// sequence number when dummy packets are used to trigger Pion's OnTrack path.
return
}
extHighestSNOverridden := r.extHighestSNOverridden&0xFFFF_FFFF_0000_0000 + uint64(rr.LastSequenceNumber)
if !r.lastRRTime.IsZero() {
if (rr.LastSequenceNumber-r.lastRR.LastSequenceNumber) < (1<<31) && rr.LastSequenceNumber < r.lastRR.LastSequenceNumber {
extHighestSNOverridden += (1 << 32)
}
}
if extHighestSNOverridden < r.sequenceNumber.GetExtendedHighest() {
// it is possible that the `LastSequenceNumber` in the receiver report is before the starting
// sequence number when dummy packets are used to trigger Pion's OnTrack path.
return
@@ -582,9 +590,14 @@ func (r *RTPStats) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt uint32
}
}
if r.lastRRTime.IsZero() || r.extHighestSNOverridden <= rr.LastSequenceNumber {
r.extHighestSNOverridden = rr.LastSequenceNumber
r.packetsLostOverridden = rr.TotalLost
if r.lastRRTime.IsZero() || r.extHighestSNOverridden <= extHighestSNOverridden {
r.extHighestSNOverridden = extHighestSNOverridden
packetsLostOverridden := r.packetsLostOverridden&0xFFFF_FFFF_0000_0000 + uint64(rr.TotalLost)
if (rr.TotalLost-r.lastRR.TotalLost) < (1<<31) && rr.TotalLost < r.lastRR.TotalLost {
packetsLostOverridden += (1 << 32)
}
r.packetsLostOverridden = packetsLostOverridden
if isRttChanged {
r.rtt = rtt
@@ -788,11 +801,11 @@ func (r *RTPStats) MaybeAdjustFirstPacketTime(srData *RTCPSenderReportData) {
defer r.lock.Unlock()
if srData != nil {
r.maybeAdjustFirstPacketTime(srData.RTPTimestamp)
r.maybeAdjustFirstPacketTime(srData.RTPTimestampExt)
}
}
func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) {
func (r *RTPStats) maybeAdjustFirstPacketTime(ets uint64) {
if time.Since(r.startTime) > firstPacketTimeAdjustWindow {
return
}
@@ -803,7 +816,7 @@ func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) {
// abnormal delay (maybe due to pacing or maybe due to queuing
// in some network element along the way), push back first time
// to an earlier instance.
samplesDiff := int32(ts - uint32(r.timestamp.GetExtendedStart()))
samplesDiff := int64(ets - r.timestamp.GetExtendedStart())
if samplesDiff < 0 {
// out-of-order, skip
return
@@ -819,7 +832,7 @@ func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) {
"before", r.firstTime.String(),
"after", firstTime.String(),
"adjustment", r.firstTime.Sub(firstTime),
"nowTS", ts,
"extNowTS", ets,
"extStartTS", r.timestamp.GetExtendedStart(),
)
if r.firstTime.Sub(firstTime) > firstPacketTimeAdjustThreshold {
@@ -829,7 +842,7 @@ func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) {
"before", r.firstTime.String(),
"after", firstTime.String(),
"adjustment", r.firstTime.Sub(firstTime),
"nowTS", ts,
"extNowTS", ets,
"extStartTS", r.timestamp.GetExtendedStart(),
)
} else {
@@ -860,16 +873,21 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
cycles := uint64(0)
if r.srNewest != nil {
cycles = r.srNewest.RTPTimestampExt & 0xFF_FF_FF_FF_00_00_00_00
cycles = r.srNewest.RTPTimestampExt & 0xFFFF_FFFF_0000_0000
if (srData.RTPTimestamp-r.srNewest.RTPTimestamp) < (1<<31) && srData.RTPTimestamp < r.srNewest.RTPTimestamp {
cycles += (1 << 32)
}
ntpDiffSinceLast := srData.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time())
rtpDiff := uint64(ntpDiffSinceLast.Seconds() * float64(r.params.ClockRate))
goArounds := rtpDiff / (1 << 32)
cycles += goArounds * (1 << 32)
}
srDataCopy := *srData
srDataCopy.RTPTimestampExt = uint64(srDataCopy.RTPTimestamp) + cycles
r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestamp)
r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestampExt)
// monitor and log RTP timestamp anomalies
var ntpDiffSinceLast time.Duration
@@ -1018,13 +1036,13 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32)
// monitor and log RTP timestamp anomalies
var ntpDiffSinceLast time.Duration
var rtpDiffSinceLast uint32
var rtpDiffSinceLast uint64
var departureDiffSinceLast time.Duration
var expectedTimeDiffSinceLast float64
var isWarped bool
if r.srNewest != nil {
ntpDiffSinceLast = nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time())
rtpDiffSinceLast = nowRTP - r.srNewest.RTPTimestamp
rtpDiffSinceLast = nowRTPExt - r.srNewest.RTPTimestampExt
departureDiffSinceLast = now.Sub(r.srNewest.At)
expectedTimeDiffSinceLast = float64(rtpDiffSinceLast) / float64(r.params.ClockRate)
@@ -1069,7 +1087,7 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32)
SSRC: ssrc,
NTPTime: uint64(nowNTP),
RTPTime: nowRTP,
PacketCount: r.getTotalPacketsPrimary() + r.packetsDuplicate + r.packetsPadding,
PacketCount: uint32(r.getTotalPacketsPrimary() + r.packetsDuplicate + r.packetsPadding),
OctetCount: uint32(r.bytes + r.bytesDuplicate + r.bytesPadding),
}
}
@@ -1120,8 +1138,8 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8,
return &rtcp.ReceptionReport{
SSRC: ssrc,
FractionLost: fracLost,
TotalLost: r.packetsLost,
LastSequenceNumber: now.extStartSN,
TotalLost: uint32(r.packetsLost),
LastSequenceNumber: uint32(now.extStartSN),
Jitter: uint32(r.jitter),
LastSenderReport: lastSR,
Delay: dlsr,
@@ -1162,16 +1180,16 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo {
return &RTPDeltaInfo{
StartTime: startTime,
Duration: endTime.Sub(startTime),
Packets: packetsExpected - intervalStats.packetsPadding,
Packets: uint32(packetsExpected - intervalStats.packetsPadding),
Bytes: intervalStats.bytes,
HeaderBytes: intervalStats.headerBytes,
PacketsDuplicate: now.packetsDuplicate - then.packetsDuplicate,
PacketsDuplicate: uint32(now.packetsDuplicate - then.packetsDuplicate),
BytesDuplicate: now.bytesDuplicate - then.bytesDuplicate,
HeaderBytesDuplicate: now.headerBytesDuplicate - then.headerBytesDuplicate,
PacketsPadding: intervalStats.packetsPadding,
PacketsPadding: uint32(intervalStats.packetsPadding),
BytesPadding: intervalStats.bytesPadding,
HeaderBytesPadding: intervalStats.headerBytesPadding,
PacketsLost: intervalStats.packetsLost,
PacketsLost: uint32(intervalStats.packetsLost),
Frames: intervalStats.frames,
RttMax: then.maxRtt,
JitterMax: then.maxJitter / float64(r.params.ClockRate) * 1e6,
@@ -1244,18 +1262,18 @@ func (r *RTPStats) DeltaInfoOverridden(snapshotId uint32) *RTPDeltaInfo {
return &RTPDeltaInfo{
StartTime: startTime,
Duration: endTime.Sub(startTime),
Packets: packetsExpected - intervalStats.packetsPadding,
Packets: uint32(packetsExpected - intervalStats.packetsPadding),
Bytes: intervalStats.bytes,
HeaderBytes: intervalStats.headerBytes,
PacketsDuplicate: now.packetsDuplicate - then.packetsDuplicate,
PacketsDuplicate: uint32(now.packetsDuplicate - then.packetsDuplicate),
BytesDuplicate: now.bytesDuplicate - then.bytesDuplicate,
HeaderBytesDuplicate: now.headerBytesDuplicate - then.headerBytesDuplicate,
PacketsPadding: intervalStats.packetsPadding,
PacketsPadding: uint32(intervalStats.packetsPadding),
BytesPadding: intervalStats.bytesPadding,
HeaderBytesPadding: intervalStats.headerBytesPadding,
PacketsLost: packetsLost,
PacketsMissing: intervalStats.packetsLost,
PacketsOutOfOrder: intervalStats.packetsOutOfOrder,
PacketsLost: uint32(packetsLost),
PacketsMissing: uint32(intervalStats.packetsLost),
PacketsOutOfOrder: uint32(intervalStats.packetsOutOfOrder),
Frames: intervalStats.frames,
RttMax: then.maxRtt,
JitterMax: maxJitterTime,
@@ -1391,25 +1409,25 @@ func (r *RTPStats) ToProto() *livekit.RTPStats {
StartTime: timestamppb.New(r.startTime),
EndTime: timestamppb.New(endTime),
Duration: elapsed,
Packets: packets,
Packets: uint32(packets),
PacketRate: packetRate,
Bytes: r.bytes,
HeaderBytes: r.headerBytes,
Bitrate: bitrate,
PacketsLost: packetsLost,
PacketsLost: uint32(packetsLost),
PacketLossRate: packetLostRate,
PacketLossPercentage: packetLostPercentage,
PacketsDuplicate: r.packetsDuplicate,
PacketsDuplicate: uint32(r.packetsDuplicate),
PacketDuplicateRate: packetDuplicateRate,
BytesDuplicate: r.bytesDuplicate,
HeaderBytesDuplicate: r.headerBytesDuplicate,
BitrateDuplicate: bitrateDuplicate,
PacketsPadding: r.packetsPadding,
PacketsPadding: uint32(r.packetsPadding),
PacketPaddingRate: packetPaddingRate,
BytesPadding: r.bytesPadding,
HeaderBytesPadding: r.headerBytesPadding,
BitratePadding: bitratePadding,
PacketsOutOfOrder: r.packetsOutOfOrder,
PacketsOutOfOrder: uint32(r.packetsOutOfOrder),
Frames: r.frames,
FrameRate: frameRate,
KeyFrames: r.keyFrames,
@@ -1456,7 +1474,7 @@ func (r *RTPStats) ToProto() *livekit.RTPStats {
return p
}
func (r *RTPStats) getExtHighestSNAdjusted() uint32 {
func (r *RTPStats) getExtHighestSNAdjusted() uint64 {
if r.params.IsReceiverReportDriven && !r.lastRRTime.IsZero() {
return r.extHighestSNOverridden
}
@@ -1464,7 +1482,7 @@ func (r *RTPStats) getExtHighestSNAdjusted() uint32 {
return r.sequenceNumber.GetExtendedHighest()
}
func (r *RTPStats) getPacketsLost() uint32 {
func (r *RTPStats) getPacketsLost() uint64 {
if r.params.IsReceiverReportDriven && !r.lastRRTime.IsZero() {
return r.packetsLostOverridden
}
@@ -1472,13 +1490,12 @@ func (r *RTPStats) getPacketsLost() uint32 {
return r.packetsLost
}
func (r *RTPStats) getSnInfoOutOfOrderPtr(esn uint32, ehsn uint32) int {
offset := esn - ehsn
if offset > 0 && offset < (1<<31) {
func (r *RTPStats) getSnInfoOutOfOrderPtr(esn uint64, ehsn uint64) int {
if int64(esn-ehsn) > 0 {
return -1 // in-order, not expected, maybe too new
}
offset = ehsn - esn
offset := ehsn - esn
if int(offset) >= SnInfoSize {
// too old, ignore
return -1
@@ -1487,9 +1504,9 @@ func (r *RTPStats) getSnInfoOutOfOrderPtr(esn uint32, ehsn uint32) int {
return (r.snInfoWritePtr - int(offset) - 1) & SnInfoMask
}
func (r *RTPStats) setSnInfo(esn uint32, ehsn uint32, pktSize uint16, hdrSize uint16, payloadSize uint16, marker bool, isOutOfOrder bool) {
func (r *RTPStats) setSnInfo(esn uint64, ehsn uint64, pktSize uint16, hdrSize uint16, payloadSize uint16, marker bool, isOutOfOrder bool) {
writePtr := 0
ooo := (esn - ehsn) > (1 << 31)
ooo := int64(esn-ehsn) < 0
if !ooo {
writePtr = r.snInfoWritePtr
r.snInfoWritePtr = (writePtr + 1) & SnInfoMask
@@ -1508,7 +1525,7 @@ func (r *RTPStats) setSnInfo(esn uint32, ehsn uint32, pktSize uint16, hdrSize ui
snInfo.isOutOfOrder = isOutOfOrder
}
func (r *RTPStats) clearSnInfos(extStartInclusive uint32, extEndExclusive uint32) {
func (r *RTPStats) clearSnInfos(extStartInclusive uint64, extEndExclusive uint64) {
for esn := extStartInclusive; esn != extEndExclusive; esn++ {
snInfo := &r.snInfos[r.snInfoWritePtr]
snInfo.pktSize = 0
@@ -1520,7 +1537,7 @@ func (r *RTPStats) clearSnInfos(extStartInclusive uint32, extEndExclusive uint32
}
}
func (r *RTPStats) isSnInfoLost(esn uint32, ehsn uint32) bool {
func (r *RTPStats) isSnInfoLost(esn uint64, ehsn uint64) bool {
readPtr := r.getSnInfoOutOfOrderPtr(esn, ehsn)
if readPtr < 0 {
return false
@@ -1530,9 +1547,9 @@ func (r *RTPStats) isSnInfoLost(esn uint32, ehsn uint32) bool {
return snInfo.pktSize == 0
}
func (r *RTPStats) getIntervalStats(extStartInclusive uint32, extEndExclusive uint32) (intervalStats IntervalStats) {
func (r *RTPStats) getIntervalStats(extStartInclusive uint64, extEndExclusive uint64) (intervalStats IntervalStats) {
packetsNotFound := uint32(0)
processESN := func(esn uint32, ehsn uint32) {
processESN := func(esn uint64, ehsn uint64) {
readPtr := r.getSnInfoOutOfOrderPtr(esn, ehsn)
if readPtr < 0 {
packetsNotFound++
+11 -11
View File
@@ -114,8 +114,8 @@ func TestRTPStats_Update(t *testing.T) {
require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest()))
require.Equal(t, timestamp, r.timestamp.GetHighest())
require.Equal(t, timestamp, uint32(r.timestamp.GetExtendedHighest()))
require.Equal(t, uint32(1), r.packetsOutOfOrder)
require.Equal(t, uint32(0), r.packetsDuplicate)
require.Equal(t, uint64(1), r.packetsOutOfOrder)
require.Equal(t, uint64(0), r.packetsDuplicate)
// duplicate
packet = getPacket(sequenceNumber-10, timestamp-30000, 1000)
@@ -125,8 +125,8 @@ func TestRTPStats_Update(t *testing.T) {
require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest()))
require.Equal(t, timestamp, r.timestamp.GetHighest())
require.Equal(t, timestamp, uint32(r.timestamp.GetExtendedHighest()))
require.Equal(t, uint32(2), r.packetsOutOfOrder)
require.Equal(t, uint32(1), r.packetsDuplicate)
require.Equal(t, uint64(2), r.packetsOutOfOrder)
require.Equal(t, uint64(1), r.packetsDuplicate)
// loss
sequenceNumber += 10
@@ -134,9 +134,9 @@ func TestRTPStats_Update(t *testing.T) {
packet = getPacket(sequenceNumber, timestamp, 1000)
flowState = r.Update(&packet.Header, len(packet.Payload), 0, time.Now())
require.True(t, flowState.HasLoss)
require.Equal(t, uint32(sequenceNumber-9), flowState.LossStartInclusive)
require.Equal(t, uint32(sequenceNumber), flowState.LossEndExclusive)
require.Equal(t, uint32(17), r.packetsLost)
require.Equal(t, uint64(sequenceNumber-9), flowState.LossStartInclusive)
require.Equal(t, uint64(sequenceNumber), flowState.LossEndExclusive)
require.Equal(t, uint64(17), r.packetsLost)
// out-of-order should decrement number of lost packets
packet = getPacket(sequenceNumber-15, timestamp-45000, 1000)
@@ -146,11 +146,11 @@ func TestRTPStats_Update(t *testing.T) {
require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest()))
require.Equal(t, timestamp, r.timestamp.GetHighest())
require.Equal(t, timestamp, uint32(r.timestamp.GetExtendedHighest()))
require.Equal(t, uint32(3), r.packetsOutOfOrder)
require.Equal(t, uint32(1), r.packetsDuplicate)
require.Equal(t, uint32(16), r.packetsLost)
require.Equal(t, uint64(3), r.packetsOutOfOrder)
require.Equal(t, uint64(1), r.packetsDuplicate)
require.Equal(t, uint64(16), r.packetsLost)
intervalStats := r.getIntervalStats(r.sequenceNumber.GetExtendedStart(), r.sequenceNumber.GetExtendedHighest()+1)
require.Equal(t, uint32(16), intervalStats.packetsLost)
require.Equal(t, uint64(16), intervalStats.packetsLost)
r.Stop()
}
+1 -1
View File
@@ -1738,7 +1738,7 @@ func (d *DownTrack) getDeltaStatsOverridden() map[uint32]*buffer.StreamStatsWith
}
func (d *DownTrack) GetNackStats() (totalPackets uint32, totalRepeatedNACKs uint32) {
totalPackets = d.rtpStats.GetTotalPacketsPrimary()
totalPackets = uint32(d.rtpStats.GetTotalPacketsPrimary())
totalRepeatedNACKs = d.totalRepeatedNACKs.Load()
return
}
+71 -71
View File
@@ -158,8 +158,8 @@ type ForwarderState struct {
Started bool
ReferenceLayerSpatial int32
PreStartTime time.Time
FirstTS uint32
RefTSOffset uint32
ExtFirstTS uint64
RefTSOffset uint64
RTP RTPMungerState
Codec interface{}
}
@@ -170,11 +170,11 @@ func (f ForwarderState) String() string {
case codecmunger.VP8State:
codecString = codecState.String()
}
return fmt.Sprintf("ForwarderState{started: %v, referenceLayerSpatial: %d, preStartTime: %s, firstTS: %d, refTSOffset: %d, rtp: %s, codec: %s}",
return fmt.Sprintf("ForwarderState{started: %v, referenceLayerSpatial: %d, preStartTime: %s, extFirstTS: %d, refTSOffset: %d, rtp: %s, codec: %s}",
f.Started,
f.ReferenceLayerSpatial,
f.PreStartTime.String(),
f.FirstTS,
f.ExtFirstTS,
f.RefTSOffset,
f.RTP.String(),
codecString,
@@ -188,7 +188,7 @@ type Forwarder struct {
codec webrtc.RTPCodecCapability
kind webrtc.RTPCodecType
logger logger.Logger
getReferenceLayerRTPTimestamp func(ts uint32, layer int32, referenceLayer int32) (uint32, error)
getReferenceLayerRTPTimestamp func(ets uint64, layer int32, referenceLayer int32) (uint64, error)
getExpectedRTPTimestamp func(at time.Time) (uint64, error)
muted bool
@@ -197,10 +197,10 @@ type Forwarder struct {
started bool
preStartTime time.Time
firstTS uint32
extFirstTS uint64
lastSSRC uint32
referenceLayerSpatial int32
refTSOffset uint32
refTSOffset uint64
provisional *VideoAllocationProvisional
@@ -216,7 +216,7 @@ type Forwarder struct {
func NewForwarder(
kind webrtc.RTPCodecType,
logger logger.Logger,
getReferenceLayerRTPTimestamp func(ts uint32, layer int32, referenceLayer int32) (uint32, error),
getReferenceLayerRTPTimestamp func(ets uint64, layer int32, referenceLayer int32) (uint64, error),
getExpectedRTPTimestamp func(at time.Time) (uint64, error),
) *Forwarder {
f := &Forwarder{
@@ -336,7 +336,7 @@ func (f *Forwarder) GetState() ForwarderState {
Started: f.started,
ReferenceLayerSpatial: f.referenceLayerSpatial,
PreStartTime: f.preStartTime,
FirstTS: f.firstTS,
ExtFirstTS: f.extFirstTS,
RefTSOffset: f.refTSOffset,
RTP: f.rtpMunger.GetLast(),
Codec: f.codecMunger.GetState(),
@@ -357,7 +357,7 @@ func (f *Forwarder) SeedState(state ForwarderState) {
f.started = true
f.referenceLayerSpatial = state.ReferenceLayerSpatial
f.preStartTime = state.PreStartTime
f.firstTS = state.FirstTS
f.extFirstTS = state.ExtFirstTS
f.refTSOffset = state.RefTSOffset
}
@@ -1445,13 +1445,13 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) (
return nil, nil
}
logTransition := func(message string, expectedTS, refTS, lastTS uint32, diffSeconds float64) {
logTransition := func(message string, extExpectedTS, extRefTS, extLastTS uint64, diffSeconds float64) {
f.logger.Debugw(
message,
"layer", layer,
"expectedTS", expectedTS,
"refTS", refTS,
"lastTS", lastTS,
"extExpectedTS", extExpectedTS,
"extRefTS", extRefTS,
"extLastTS", extLastTS,
"diffSeconds", math.Abs(diffSeconds),
)
}
@@ -1461,20 +1461,20 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) (
// timestamp offset on source change.
//
// There are three timestamps to consider here
// 1. lastTS -> timestamp of last sent packet
// 2. refTS -> timestamp of this packet (after munging) calculated using feed's RTCP sender report
// 3. expectedTS -> expected timestamp of this packet calculated based on elapsed time since first packet
// Ideally, refTS and expectedTS should be very close and lastTS should be before both of those.
// 1. extLastTS -> timestamp of last sent packet
// 2. extRefTS -> timestamp of this packet (after munging) calculated using feed's RTCP sender report
// 3. extExpectedTS -> expected timestamp of this packet calculated based on elapsed time since first packet
// Ideally, extRefTS and extExpectedTS should be very close and extLastTS should be before both of those.
// But, cases like muting/unmuting, clock vagaries, pacing, etc. make them not satisfy those conditions always.
rtpMungerState := f.rtpMunger.GetLast()
lastTS := rtpMungerState.LastTS
refTS := lastTS
expectedTS := lastTS
extLastTS := rtpMungerState.ExtLastTS
extRefTS := extLastTS
extExpectedTS := extLastTS
switchingAt := time.Now()
if f.getReferenceLayerRTPTimestamp != nil {
ts, err := f.getReferenceLayerRTPTimestamp(extPkt.Packet.Timestamp, layer, f.referenceLayerSpatial)
ets, err := f.getReferenceLayerRTPTimestamp(extPkt.ExtTimestamp, layer, f.referenceLayerSpatial)
if err != nil {
// error out if refTS is not available. It can happen when there is no sender report
// error out if extRefTS is not available. It can happen when there is no sender report
// for the layer being switched to. Can especially happen at the start of the track when layer switches are
// potentially happening very quickly. Erroring out and waiting for a layer for which a sender report has been
// received will calculate a better offset, but may result in initial adaptation to take a bit longer depending
@@ -1482,35 +1482,35 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) (
return nil, err
}
refTS = ts
extRefTS = ets
}
if f.getExpectedRTPTimestamp != nil {
tsExt, err := f.getExpectedRTPTimestamp(switchingAt)
if err == nil {
expectedTS = uint32(tsExt)
extExpectedTS = tsExt
} else {
rtpDiff := uint32(0)
rtpDiff := uint64(0)
if !f.preStartTime.IsZero() && f.refTSOffset == 0 {
timeSinceFirst := time.Since(f.preStartTime)
rtpDiff = uint32(timeSinceFirst.Nanoseconds() * int64(f.codec.ClockRate) / 1e9)
f.refTSOffset = f.firstTS + rtpDiff - refTS
rtpDiff = uint64(timeSinceFirst.Nanoseconds() * int64(f.codec.ClockRate) / 1e9)
f.refTSOffset = f.extFirstTS + rtpDiff - extRefTS
f.logger.Infow(
"calculating refTSOffset",
"preStartTime", f.preStartTime.String(),
"firstTS", f.firstTS,
"extFirstTS", f.extFirstTS,
"timeSinceFirst", timeSinceFirst,
"rtpDiff", rtpDiff,
"refTS", refTS,
"extRefTS", extRefTS,
"refTSOffset", f.refTSOffset,
)
}
expectedTS += rtpDiff
extExpectedTS += rtpDiff
}
}
refTS += f.refTSOffset
extRefTS += f.refTSOffset
var nextTS uint32
var extNextTS uint64
if f.lastSSRC == 0 {
// If resuming (e. g. on unmute), keep next timestamp close to expected timestamp.
//
@@ -1525,66 +1525,66 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) (
// increases and pacer starts sending at faster rate.
//
// But, the challenege is distinguishing between the two cases. As a compromise, the difference
// between expectedTS and refTS is thresholded. Difference below the threshold is treated as Case 2
// between extExpectedTS and extRefTS is thresholded. Difference below the threshold is treated as Case 2
// and above as Case 1.
//
// In the event of refTS > expectedTS, use refTS.
// Ideally, refTS should not be ahead of expectedTS, but expectedTS uses the first packet's
// In the event of extRefTS > extExpectedTS, use extRefTS.
// Ideally, extRefTS should not be ahead of extExpectedTS, but extExpectedTS uses the first packet's
// wall clock time. So, if the first packet experienced abmormal latency, it is possible
// for refTS > expectedTS
diffSeconds := float64(int32(expectedTS-refTS)) / float64(f.codec.ClockRate)
// for extRefTS > extExpectedTS
diffSeconds := float64(int64(extExpectedTS-extRefTS)) / float64(f.codec.ClockRate)
if diffSeconds >= 0.0 {
if f.resumeBehindThreshold > 0 && diffSeconds > f.resumeBehindThreshold {
logTransition("resume, reference too far behind", expectedTS, refTS, lastTS, diffSeconds)
nextTS = expectedTS
logTransition("resume, reference too far behind", extExpectedTS, extRefTS, extLastTS, diffSeconds)
extNextTS = extExpectedTS
} else {
nextTS = refTS
extNextTS = extRefTS
}
} else {
if math.Abs(diffSeconds) > SwitchAheadThresholdSeconds {
logTransition("resume, reference too far ahead", expectedTS, refTS, lastTS, diffSeconds)
logTransition("resume, reference too far ahead", extExpectedTS, extRefTS, extLastTS, diffSeconds)
}
nextTS = refTS
extNextTS = extRefTS
}
f.resumeBehindThreshold = 0.0
} else {
// switching between layers, check if refTS is too far behind the last sent
diffSeconds := float64(int32(refTS-lastTS)) / float64(f.codec.ClockRate)
// switching between layers, check if extRefTS is too far behind the last sent
diffSeconds := float64(int64(extRefTS-extLastTS)) / float64(f.codec.ClockRate)
if diffSeconds < 0.0 {
if math.Abs(diffSeconds) > LayerSwitchBehindThresholdSeconds {
// this could be due to pacer trickling out this layer. Error out and wait for a more opportune time.
// AVSYNC-TODO: Consider some forcing function to do the switch
// (like "have waited for too long for layer switch, nothing available, switch to whatever is available" kind of condition).
logTransition("layer switch, reference too far behind", expectedTS, refTS, lastTS, diffSeconds)
logTransition("layer switch, reference too far behind", extExpectedTS, extRefTS, extLastTS, diffSeconds)
return nil, errors.New("switch point too far behind")
}
// use a nominal increase to ensure that timestamp is always moving forward
logTransition("layer switch, reference is slightly behind", expectedTS, refTS, lastTS, diffSeconds)
nextTS = lastTS + 1
logTransition("layer switch, reference is slightly behind", extExpectedTS, extRefTS, extLastTS, diffSeconds)
extNextTS = extLastTS + 1
} else {
diffSeconds = float64(int32(expectedTS-refTS)) / float64(f.codec.ClockRate)
diffSeconds = float64(int64(extExpectedTS-extRefTS)) / float64(f.codec.ClockRate)
if diffSeconds < 0.0 && math.Abs(diffSeconds) > SwitchAheadThresholdSeconds {
logTransition("layer switch, reference too far ahead", expectedTS, refTS, lastTS, diffSeconds)
logTransition("layer switch, reference too far ahead", extExpectedTS, extRefTS, extLastTS, diffSeconds)
}
nextTS = refTS
extNextTS = extRefTS
}
}
if nextTS-lastTS == 0 || nextTS-lastTS > (1<<31) {
f.logger.Debugw("next timestamp is before last, adjusting", "nextTS", nextTS, "lastTS", lastTS)
if int64(extNextTS-extLastTS) <= 0 {
f.logger.Debugw("next timestamp is before last, adjusting", "extNextTS", extNextTS, "extLastTS", extLastTS)
// nominal increase
nextTS = lastTS + 1
extNextTS = extLastTS + 1
}
snOffset := uint32(1)
tsOffset := nextTS - lastTS
snOffset := uint64(1)
tsOffset := extNextTS - extLastTS
if !rtpMungerState.LastMarker {
// If last forwarded packet is not end of frame, synthesise a break in sequence number.
// Else, decoders could try to interpret consecutive packets as part of the same frame
// and potentially cause video corruption.
snOffset++
if tsOffset < f.codec.ClockRate*33/1000 {
tsOffset = f.codec.ClockRate * 33 / 1000
if tsOffset < uint64(f.codec.ClockRate*33/1000) {
tsOffset = uint64(f.codec.ClockRate * 33 / 1000)
}
}
f.rtpMunger.UpdateSnTsOffsets(extPkt, snOffset, tsOffset)
@@ -1594,13 +1594,13 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) (
"source switch",
"switchingAt", switchingAt.String(),
"layer", layer,
"lastTS", lastTS,
"refTS", refTS,
"extLastTS", extLastTS,
"extRefTS", extRefTS,
"refTSOffset", f.refTSOffset,
"referenceLayerSpatial", f.referenceLayerSpatial,
"expectedTS", expectedTS,
"nextTS", nextTS,
"tsJump", nextTS-lastTS,
"extExpectedTS", extExpectedTS,
"extNextTS", extNextTS,
"tsJump", tsOffset,
"nextSN", rtpMungerState.ExtLastSN+snOffset,
"snOffset", snOffset,
)
@@ -1609,7 +1609,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) (
if snOffset != 1 {
eof = &SnTs{
sequenceNumber: uint16(rtpMungerState.ExtLastSN + 1),
timestamp: rtpMungerState.LastTS,
timestamp: uint32(rtpMungerState.ExtLastTS),
}
}
return eof, nil
@@ -1760,7 +1760,7 @@ func (f *Forwarder) maybeStart() {
}
f.rtpMunger.SetLastSnTs(extPkt)
f.firstTS = extPkt.Packet.Timestamp
f.extFirstTS = uint64(extPkt.Packet.Timestamp)
f.logger.Debugw(
"starting with dummy forwarding",
"sequenceNumber", extPkt.Packet.SequenceNumber,
@@ -1797,18 +1797,18 @@ func (f *Forwarder) GetSnTsForBlankFrames(frameRate uint32, numPackets int) ([]S
numPackets++
}
lastTS := f.rtpMunger.GetLast().LastTS
expectedTS := lastTS
extLastTS := f.rtpMunger.GetLast().ExtLastTS
extExpectedTS := extLastTS
if f.getExpectedRTPTimestamp != nil {
tsExt, err := f.getExpectedRTPTimestamp(time.Now())
if err == nil {
expectedTS = uint32(tsExt)
extExpectedTS = tsExt
}
}
if expectedTS-lastTS == 0 || expectedTS-lastTS > (1<<31) {
expectedTS = lastTS + 1
if int64(extExpectedTS-extLastTS) <= 0 {
extExpectedTS = extLastTS + 1
}
snts, err := f.rtpMunger.UpdateAndGetPaddingSnTs(numPackets, f.codec.ClockRate, frameRate, frameEndNeeded, expectedTS)
snts, err := f.rtpMunger.UpdateAndGetPaddingSnTs(numPackets, f.codec.ClockRate, frameRate, frameEndNeeded, extExpectedTS)
return snts, frameEndNeeded, err
}
+3 -3
View File
@@ -81,7 +81,7 @@ type TrackReceiver interface {
GetTemporalLayerFpsForSpatial(layer int32) []float32
GetCalculatedClockRate(layer int32) uint32
GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error)
GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error)
}
// WebRTCReceiver receives a media track
@@ -777,8 +777,8 @@ func (w *WebRTCReceiver) GetCalculatedClockRate(layer int32) uint32 {
return w.streamTrackerManager.GetCalculatedClockRate(layer)
}
func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) {
return w.streamTrackerManager.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer)
func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error) {
return w.streamTrackerManager.GetReferenceLayerRTPTimestamp(ets, layer, referenceLayer)
}
// closes all track senders in parallel, returns when all are closed
+33 -32
View File
@@ -53,13 +53,13 @@ type SnTs struct {
// ----------------------------------------------------------------------
type RTPMungerState struct {
ExtLastSN uint32
LastTS uint32
ExtLastSN uint64
ExtLastTS uint64
LastMarker bool
}
func (r RTPMungerState) String() string {
return fmt.Sprintf("RTPMungerState{extLastSN: %d, lastTS: %d, lastMarker: %v)", r.ExtLastSN, r.LastTS, r.LastMarker)
return fmt.Sprintf("RTPMungerState{extLastSN: %d, extLastTS: %d, lastMarker: %v)", r.ExtLastSN, r.ExtLastTS, r.LastMarker)
}
// ----------------------------------------------------------------------
@@ -67,22 +67,22 @@ func (r RTPMungerState) String() string {
type RTPMunger struct {
logger logger.Logger
extHighestIncomingSN uint32
snRangeMap *utils.RangeMap[uint32, uint32]
extHighestIncomingSN uint64
snRangeMap *utils.RangeMap[uint64, uint64]
extLastSN uint32
lastTS uint32
tsOffset uint32
extLastSN uint64
extLastTS uint64
tsOffset uint64
lastMarker bool
extRtxGateSn uint32
extRtxGateSn uint64
isInRtxGateRegion bool
}
func NewRTPMunger(logger logger.Logger) *RTPMunger {
return &RTPMunger{
logger: logger,
snRangeMap: utils.NewRangeMap[uint32, uint32](100),
snRangeMap: utils.NewRangeMap[uint64, uint64](100),
}
}
@@ -92,7 +92,7 @@ func (r *RTPMunger) DebugInfo() map[string]interface{} {
"ExtHighestIncomingSN": r.extHighestIncomingSN,
"ExtLastSN": r.extLastSN,
"SNOffset": snOffset,
"LastTS": r.lastTS,
"ExtLastTS": r.extLastTS,
"TSOffset": r.tsOffset,
"LastMarker": r.lastMarker,
}
@@ -101,27 +101,27 @@ func (r *RTPMunger) DebugInfo() map[string]interface{} {
func (r *RTPMunger) GetLast() RTPMungerState {
return RTPMungerState{
ExtLastSN: r.extLastSN,
LastTS: r.lastTS,
ExtLastTS: r.extLastTS,
LastMarker: r.lastMarker,
}
}
func (r *RTPMunger) SeedLast(state RTPMungerState) {
r.extLastSN = state.ExtLastSN
r.lastTS = state.LastTS
r.extLastTS = state.ExtLastTS
r.lastMarker = state.LastMarker
}
func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) {
r.extHighestIncomingSN = extPkt.ExtSequenceNumber - 1
r.extLastSN = extPkt.ExtSequenceNumber
r.lastTS = extPkt.Packet.Timestamp
r.extLastTS = extPkt.ExtTimestamp
}
func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint32, tsAdjust uint32) {
func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint64, tsAdjust uint64) {
r.extHighestIncomingSN = extPkt.ExtSequenceNumber - 1
r.snRangeMap.ClearAndResetValue(extPkt.ExtSequenceNumber - r.extLastSN - snAdjust)
r.tsOffset = extPkt.Packet.Timestamp - r.lastTS - tsAdjust
r.tsOffset = extPkt.ExtTimestamp - r.extLastTS - tsAdjust
}
func (r *RTPMunger) PacketDropped(extPkt *buffer.ExtPacket) {
@@ -154,7 +154,7 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara
return &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingOutOfOrder,
sequenceNumber: uint16(extPkt.ExtSequenceNumber - snOffset),
timestamp: extPkt.Packet.Timestamp - r.tsOffset,
timestamp: uint32(extPkt.ExtTimestamp - r.tsOffset),
}, nil
}
@@ -189,10 +189,10 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara
}
extMungedSN := extPkt.ExtSequenceNumber - snOffset
mungedTS := extPkt.Packet.Timestamp - r.tsOffset
extMungedTS := extPkt.ExtTimestamp - r.tsOffset
r.extLastSN = extMungedSN
r.lastTS = mungedTS
r.extLastTS = extMungedTS
r.lastMarker = extPkt.Packet.Marker
if extPkt.KeyFrame {
@@ -207,7 +207,7 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara
return &TranslationParamsRTP{
snOrdering: ordering,
sequenceNumber: uint16(extMungedSN),
timestamp: mungedTS,
timestamp: uint32(extMungedTS),
}, nil
}
@@ -226,7 +226,7 @@ func (r *RTPMunger) FilterRTX(nacks []uint16) []uint16 {
return filtered
}
func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate uint32, forceMarker bool, rtpTimestamp uint32) ([]SnTs, error) {
func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate uint32, forceMarker bool, extRtpTimestamp uint64) ([]SnTs, error) {
useLastTSForFirst := false
tsOffset := 0
if !r.lastMarker {
@@ -240,32 +240,33 @@ func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate
}
extLastSN := r.extLastSN
lastTS := r.lastTS
extLastTS := r.extLastTS
vals := make([]SnTs, num)
for i := 0; i < num; i++ {
extLastSN++
vals[i].sequenceNumber = uint16(extLastSN)
if frameRate != 0 {
if useLastTSForFirst && i == 0 {
vals[i].timestamp = r.lastTS
vals[i].timestamp = uint32(r.extLastTS)
} else {
ts := rtpTimestamp + ((uint32(i+1-tsOffset)*clockRate)+frameRate-1)/frameRate
if (ts-lastTS) == 0 || (ts-lastTS) > (1<<31) {
ts = lastTS + 1
lastTS = ts
ets := extRtpTimestamp + uint64(((uint32(i+1-tsOffset)*clockRate)+frameRate-1)/frameRate)
if int64(ets-extLastTS) <= 0 {
ets = extLastTS + 1
}
vals[i].timestamp = ts
extLastTS = ets
vals[i].timestamp = uint32(ets)
}
} else {
vals[i].timestamp = r.lastTS
vals[i].timestamp = uint32(r.extLastTS)
}
}
r.extLastSN = extLastSN
r.snRangeMap.DecValue(uint32(num))
r.snRangeMap.DecValue(uint64(num))
r.tsOffset -= vals[num-1].timestamp - r.lastTS
r.lastTS = vals[num-1].timestamp
r.tsOffset -= extLastTS - r.extLastTS
r.extLastTS = extLastTS
if forceMarker {
r.lastMarker = true
+53 -53
View File
@@ -41,13 +41,13 @@ func TestSetLastSnTs(t *testing.T) {
require.NotNil(t, extPkt)
r.SetLastSnTs(extPkt)
require.Equal(t, uint32(23332), r.extHighestIncomingSN)
require.Equal(t, uint32(23333), r.extLastSN)
require.Equal(t, uint32(0xabcdef), r.lastTS)
require.Equal(t, uint64(23332), r.extHighestIncomingSN)
require.Equal(t, uint64(23333), r.extLastSN)
require.Equal(t, uint64(0xabcdef), r.extLastTS)
snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(0), snOffset)
require.Equal(t, uint32(0), r.tsOffset)
require.Equal(t, uint64(0), snOffset)
require.Equal(t, uint64(0), r.tsOffset)
}
func TestUpdateSnTsOffsets(t *testing.T) {
@@ -68,13 +68,13 @@ func TestUpdateSnTsOffsets(t *testing.T) {
}
extPkt, _ = testutils.GetTestExtPacket(params)
r.UpdateSnTsOffsets(extPkt, 1, 1)
require.Equal(t, uint32(33332), r.extHighestIncomingSN)
require.Equal(t, uint32(23333), r.extLastSN)
require.Equal(t, uint32(0xabcdef), r.lastTS)
require.Equal(t, uint64(33332), r.extHighestIncomingSN)
require.Equal(t, uint64(23333), r.extLastSN)
require.Equal(t, uint64(0xabcdef), r.extLastTS)
snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(9999), snOffset)
require.Equal(t, uint32(0xffffffff), r.tsOffset)
require.Equal(t, uint64(9999), snOffset)
require.Equal(t, uint64(0xffff_ffff_ffff_ffff), r.tsOffset)
}
func TestPacketDropped(t *testing.T) {
@@ -88,13 +88,13 @@ func TestPacketDropped(t *testing.T) {
}
extPkt, _ := testutils.GetTestExtPacket(params)
r.SetLastSnTs(extPkt)
require.Equal(t, uint32(23332), r.extHighestIncomingSN)
require.Equal(t, uint32(23333), r.extLastSN)
require.Equal(t, uint32(0xabcdef), r.lastTS)
require.Equal(t, uint64(23332), r.extHighestIncomingSN)
require.Equal(t, uint64(23333), r.extLastSN)
require.Equal(t, uint64(0xabcdef), r.extLastTS)
snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(0), snOffset)
require.Equal(t, uint32(0), r.tsOffset)
require.Equal(t, uint64(0), snOffset)
require.Equal(t, uint64(0), r.tsOffset)
r.UpdateAndGetSnTs(extPkt) // update sequence number offset
@@ -106,11 +106,11 @@ func TestPacketDropped(t *testing.T) {
}
extPkt, _ = testutils.GetTestExtPacket(params)
r.PacketDropped(extPkt)
require.Equal(t, uint32(23333), r.extHighestIncomingSN)
require.Equal(t, uint32(23333), r.extLastSN)
require.Equal(t, uint64(23333), r.extHighestIncomingSN)
require.Equal(t, uint64(23333), r.extLastSN)
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(0), snOffset)
require.Equal(t, uint64(0), snOffset)
// drop a head packet and check offset increases
params = &testutils.TestExtPacketParams{
@@ -124,10 +124,10 @@ func TestPacketDropped(t *testing.T) {
r.UpdateAndGetSnTs(extPkt) // update sequence number offset
r.PacketDropped(extPkt)
require.Equal(t, uint32(44443), r.extLastSN)
require.Equal(t, uint64(44443), r.extLastSN)
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(1), snOffset)
require.Equal(t, uint64(1), snOffset)
params = &testutils.TestExtPacketParams{
SequenceNumber: 44445,
@@ -138,10 +138,10 @@ func TestPacketDropped(t *testing.T) {
extPkt, _ = testutils.GetTestExtPacket(params)
r.UpdateAndGetSnTs(extPkt) // update sequence number offset
require.Equal(t, r.extLastSN, uint32(44444))
require.Equal(t, r.extLastSN, uint64(44444))
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(1), snOffset)
require.Equal(t, uint64(1), snOffset)
}
func TestOutOfOrderSequenceNumber(t *testing.T) {
@@ -243,11 +243,11 @@ func TestPaddingOnlyPacket(t *testing.T) {
require.Error(t, err)
require.ErrorIs(t, err, ErrPaddingOnlyPacket)
require.Equal(t, tpExpected, *tp)
require.Equal(t, uint32(23333), r.extHighestIncomingSN)
require.Equal(t, uint32(23333), r.extLastSN)
require.Equal(t, uint64(23333), r.extHighestIncomingSN)
require.Equal(t, uint64(23333), r.extLastSN)
snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(1), snOffset)
require.Equal(t, uint64(1), snOffset)
// padding only packet with a gap should not report an error
params = &testutils.TestExtPacketParams{
@@ -266,11 +266,11 @@ func TestPaddingOnlyPacket(t *testing.T) {
tp, err = r.UpdateAndGetSnTs(extPkt)
require.NoError(t, err)
require.Equal(t, tpExpected, *tp)
require.Equal(t, uint32(23335), r.extHighestIncomingSN)
require.Equal(t, uint32(23334), r.extLastSN)
require.Equal(t, uint64(23335), r.extHighestIncomingSN)
require.Equal(t, uint64(23334), r.extLastSN)
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(1), snOffset)
require.Equal(t, uint64(1), snOffset)
}
func TestGapInSequenceNumber(t *testing.T) {
@@ -307,19 +307,19 @@ func TestGapInSequenceNumber(t *testing.T) {
tp, err := r.UpdateAndGetSnTs(extPkt)
require.NoError(t, err)
require.Equal(t, tpExpected, *tp)
require.Equal(t, uint32(65536+1), r.extHighestIncomingSN)
require.Equal(t, uint32(65536+1), r.extLastSN)
require.Equal(t, uint64(65536+1), r.extHighestIncomingSN)
require.Equal(t, uint64(65536+1), r.extLastSN)
snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(0), snOffset)
require.Equal(t, uint64(0), snOffset)
// ensure missing sequence numbers got recorded in cache
// last received, three missing in between and current received should all be in cache
for i := uint32(65534); i != 65536+1; i++ {
for i := uint64(65534); i != 65536+1; i++ {
offset, err := r.snRangeMap.GetValue(i)
require.NoError(t, err)
require.Equal(t, uint32(0), offset)
require.Equal(t, uint64(0), offset)
}
// a padding only packet should be dropped
@@ -338,11 +338,11 @@ func TestGapInSequenceNumber(t *testing.T) {
tp, err = r.UpdateAndGetSnTs(extPkt)
require.ErrorIs(t, err, ErrPaddingOnlyPacket)
require.Equal(t, tpExpected, *tp)
require.Equal(t, uint32(65536+2), r.extHighestIncomingSN)
require.Equal(t, uint32(65536+1), r.extLastSN)
require.Equal(t, uint64(65536+2), r.extHighestIncomingSN)
require.Equal(t, uint64(65536+1), r.extLastSN)
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(1), snOffset)
require.Equal(t, uint64(1), snOffset)
// a packet with a gap should be adding to missing cache
params = &testutils.TestExtPacketParams{
@@ -363,11 +363,11 @@ func TestGapInSequenceNumber(t *testing.T) {
tp, err = r.UpdateAndGetSnTs(extPkt)
require.NoError(t, err)
require.Equal(t, tpExpected, *tp)
require.Equal(t, uint32(65536+4), r.extHighestIncomingSN)
require.Equal(t, uint32(65536+3), r.extLastSN)
require.Equal(t, uint64(65536+4), r.extHighestIncomingSN)
require.Equal(t, uint64(65536+3), r.extLastSN)
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(1), snOffset)
require.Equal(t, uint64(1), snOffset)
// another contiguous padding only packet should be dropped
params = &testutils.TestExtPacketParams{
@@ -385,11 +385,11 @@ func TestGapInSequenceNumber(t *testing.T) {
tp, err = r.UpdateAndGetSnTs(extPkt)
require.ErrorIs(t, err, ErrPaddingOnlyPacket)
require.Equal(t, tpExpected, *tp)
require.Equal(t, uint32(65536+5), r.extHighestIncomingSN)
require.Equal(t, uint32(65536+3), r.extLastSN)
require.Equal(t, uint64(65536+5), r.extHighestIncomingSN)
require.Equal(t, uint64(65536+3), r.extLastSN)
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(2), snOffset)
require.Equal(t, uint64(2), snOffset)
// a packet with a gap should be adding to missing cache
params = &testutils.TestExtPacketParams{
@@ -410,11 +410,11 @@ func TestGapInSequenceNumber(t *testing.T) {
tp, err = r.UpdateAndGetSnTs(extPkt)
require.NoError(t, err)
require.Equal(t, tpExpected, *tp)
require.Equal(t, uint32(65536+7), r.extHighestIncomingSN)
require.Equal(t, uint32(65536+5), r.extLastSN)
require.Equal(t, uint64(65536+7), r.extHighestIncomingSN)
require.Equal(t, uint64(65536+5), r.extLastSN)
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(2), snOffset)
require.Equal(t, uint64(2), snOffset)
// check the missing packets
params = &testutils.TestExtPacketParams{
@@ -434,11 +434,11 @@ func TestGapInSequenceNumber(t *testing.T) {
tp, err = r.UpdateAndGetSnTs(extPkt)
require.NoError(t, err)
require.Equal(t, tpExpected, *tp)
require.Equal(t, uint32(65536+7), r.extHighestIncomingSN)
require.Equal(t, uint32(65536+5), r.extLastSN)
require.Equal(t, uint64(65536+7), r.extHighestIncomingSN)
require.Equal(t, uint64(65536+5), r.extLastSN)
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(2), snOffset)
require.Equal(t, uint64(2), snOffset)
params = &testutils.TestExtPacketParams{
SequenceNumber: 3,
@@ -457,11 +457,11 @@ func TestGapInSequenceNumber(t *testing.T) {
tp, err = r.UpdateAndGetSnTs(extPkt)
require.NoError(t, err)
require.Equal(t, tpExpected, *tp)
require.Equal(t, uint32(65536+7), r.extHighestIncomingSN)
require.Equal(t, uint32(65536+5), r.extLastSN)
require.Equal(t, uint64(65536+7), r.extHighestIncomingSN)
require.Equal(t, uint64(65536+5), r.extLastSN)
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(2), snOffset)
require.Equal(t, uint64(2), snOffset)
}
func TestUpdateAndGetPaddingSnTs(t *testing.T) {
@@ -493,7 +493,7 @@ func TestUpdateAndGetPaddingSnTs(t *testing.T) {
timestamp: params.Timestamp + ((uint32(i)*clockRate)+frameRate-1)/frameRate,
}
}
snts, err := r.UpdateAndGetPaddingSnTs(numPadding, clockRate, frameRate, true, params.Timestamp)
snts, err := r.UpdateAndGetPaddingSnTs(numPadding, clockRate, frameRate, true, extPkt.ExtTimestamp)
require.NoError(t, err)
require.Equal(t, sntsExpected, snts)
@@ -504,7 +504,7 @@ func TestUpdateAndGetPaddingSnTs(t *testing.T) {
timestamp: snts[len(snts)-1].timestamp + ((uint32(i+1)*clockRate)+frameRate-1)/frameRate,
}
}
snts, err = r.UpdateAndGetPaddingSnTs(numPadding, clockRate, frameRate, false, snts[len(snts)-1].timestamp)
snts, err = r.UpdateAndGetPaddingSnTs(numPadding, clockRate, frameRate, false, uint64(snts[len(snts)-1].timestamp))
require.NoError(t, err)
require.Equal(t, sntsExpected, snts)
}
+5 -5
View File
@@ -76,7 +76,7 @@ type StreamTrackerManager struct {
senderReportMu sync.RWMutex
senderReports [buffer.DefaultMaxLayerSpatial + 1]endsSenderReport
layerOffsets [buffer.DefaultMaxLayerSpatial + 1][buffer.DefaultMaxLayerSpatial + 1]uint32
layerOffsets [buffer.DefaultMaxLayerSpatial + 1][buffer.DefaultMaxLayerSpatial + 1]uint64
closed core.Fuse
@@ -563,10 +563,10 @@ func (s *StreamTrackerManager) updateLayerOffsetLocked(ref, other int32) {
rtpDiff := ntpDiff.Nanoseconds() * int64(s.clockRate) / 1e9
// calculate other layer's time stamp at the same time as ref layer's NTP time
normalizedOtherTS := srOther.RTPTimestamp + uint32(rtpDiff)
normalizedOtherTS := srOther.RTPTimestampExt + uint64(rtpDiff)
// now both layers' time stamp refer to the same NTP time and the diff is the offset between the layers
offset := srRef.RTPTimestamp - normalizedOtherTS
offset := srRef.RTPTimestampExt - normalizedOtherTS
// use minimal offset to indicate value availability in the extremely unlikely case of
// both layers using the same timestamp
@@ -643,7 +643,7 @@ func (s *StreamTrackerManager) GetCalculatedClockRate(layer int32) uint32 {
return uint32(float64(rdsf) / tsf.Seconds())
}
func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) {
func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error) {
s.senderReportMu.RLock()
defer s.senderReportMu.RUnlock()
@@ -655,7 +655,7 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer in
return 0, fmt.Errorf("offset unavailable, target: %d, reference: %d", layer, referenceLayer)
}
return ts + s.layerOffsets[referenceLayer][layer], nil
return ets + s.layerOffsets[referenceLayer][layer], nil
}
func (s *StreamTrackerManager) GetMaxTemporalLayerSeen() int32 {
+3 -1
View File
@@ -32,6 +32,7 @@ type TestExtPacketParams struct {
SequenceNumber uint16
SNCycles int
Timestamp uint32
TSCycles int
SSRC uint32
PayloadSize int
PaddingSize byte
@@ -63,7 +64,8 @@ func GetTestExtPacket(params *TestExtPacketParams) (*buffer.ExtPacket, error) {
ep := &buffer.ExtPacket{
VideoLayer: params.VideoLayer,
ExtSequenceNumber: uint32(params.SNCycles<<16) + uint32(params.SequenceNumber),
ExtSequenceNumber: uint64(params.SNCycles<<16) + uint64(params.SequenceNumber),
ExtTimestamp: uint64(params.TSCycles<<32) + uint64(params.Timestamp),
Arrival: params.ArrivalTime,
Packet: &packet,
KeyFrame: params.IsKeyFrame,
+2 -2
View File
@@ -30,11 +30,11 @@ var (
)
type rangeType interface {
uint32
uint32 | uint64
}
type valueType interface {
uint32
uint32 | uint64
}
type rangeVal[RT rangeType, VT valueType] struct {
+11 -10
View File
@@ -32,7 +32,7 @@ type WrapAround[T number, ET extendedNumber] struct {
initialized bool
start T
highest T
cycles int
cycles ET
}
func NewWrapAround[T number, ET extendedNumber]() *WrapAround[T, ET] {
@@ -78,7 +78,7 @@ func (w *WrapAround[T, ET]) Update(val T) (result WrapAroundUpdateResult[ET]) {
// in-order
if val < w.highest {
w.cycles++
w.cycles += w.fullRange
}
w.highest = val
@@ -88,13 +88,14 @@ func (w *WrapAround[T, ET]) Update(val T) (result WrapAroundUpdateResult[ET]) {
func (w *WrapAround[T, ET]) RollbackRestart(ev ET) {
if w.isWrapBack(w.start, T(ev)) {
w.cycles--
w.cycles -= w.fullRange
}
w.start = T(ev)
}
func (w *WrapAround[T, ET]) ResetHighest(val T) {
w.highest = val
func (w *WrapAround[T, ET]) ResetHighest(ev ET) {
w.highest = T(ev)
w.cycles = ev & ^(w.fullRange - 1)
}
func (w *WrapAround[T, ET]) GetStart() T {
@@ -122,7 +123,7 @@ func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (isRestart bool, preExtended
totalNum := w.GetExtendedHighest() - w.GetExtendedStart() + 1
if totalNum > (w.fullRange >> 1) {
if w.isWrapBack(val, w.highest) {
cycles--
cycles -= w.fullRange
}
extendedVal = w.getExtendedHighest(cycles, val)
return
@@ -134,13 +135,13 @@ func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (isRestart bool, preExtended
preExtendedStart = w.GetExtendedStart()
if w.isWrapBack(val, w.highest) {
w.cycles = 1
w.cycles = w.fullRange
cycles = 0
}
w.start = val
} else {
if w.isWrapBack(val, w.highest) {
cycles--
cycles -= w.fullRange
}
}
extendedVal = w.getExtendedHighest(cycles, val)
@@ -151,6 +152,6 @@ func (w *WrapAround[T, ET]) isWrapBack(earlier T, later T) bool {
return ET(later) < (w.fullRange>>1) && ET(earlier) >= (w.fullRange>>1)
}
func (w *WrapAround[T, ET]) getExtendedHighest(cycles int, val T) ET {
return ET(cycles)*w.fullRange + ET(val)
func (w *WrapAround[T, ET]) getExtendedHighest(cycles ET, val T) ET {
return cycles + ET(val)
}
+29 -16
View File
@@ -194,26 +194,26 @@ func TestWrapAroundUint16(t *testing.T) {
}
}
func TestWrapAroundUint16RollbackRestart(t *testing.T) {
w := NewWrapAround[uint16, uint32]()
func TestWrapAroundUint16RollbackRestartAndResetHighest(t *testing.T) {
w := NewWrapAround[uint16, uint64]()
// initialize
w.Update(23)
require.Equal(t, uint16(23), w.GetStart())
require.Equal(t, uint32(23), w.GetExtendedStart())
require.Equal(t, uint64(23), w.GetExtendedStart())
require.Equal(t, uint16(23), w.GetHighest())
require.Equal(t, uint32(23), w.GetExtendedHighest())
require.Equal(t, uint64(23), w.GetExtendedHighest())
// an in-order update
w.Update(25)
require.Equal(t, uint16(23), w.GetStart())
require.Equal(t, uint32(23), w.GetExtendedStart())
require.Equal(t, uint64(23), w.GetExtendedStart())
require.Equal(t, uint16(25), w.GetHighest())
require.Equal(t, uint32(25), w.GetExtendedHighest())
require.Equal(t, uint64(25), w.GetExtendedHighest())
// force restart without wrap
res := w.Update(12)
expectedResult := WrapAroundUpdateResult[uint32]{
expectedResult := WrapAroundUpdateResult[uint64]{
IsRestart: true,
PreExtendedStart: 23,
PreExtendedHighest: 25,
@@ -221,20 +221,20 @@ func TestWrapAroundUint16RollbackRestart(t *testing.T) {
}
require.Equal(t, expectedResult, res)
require.Equal(t, uint16(12), w.GetStart())
require.Equal(t, uint32(12), w.GetExtendedStart())
require.Equal(t, uint64(12), w.GetExtendedStart())
require.Equal(t, uint16(25), w.GetHighest())
require.Equal(t, uint32(25), w.GetExtendedHighest())
require.Equal(t, uint64(25), w.GetExtendedHighest())
// roll back restart
w.RollbackRestart(res.PreExtendedStart)
require.Equal(t, uint16(23), w.GetStart())
require.Equal(t, uint32(23), w.GetExtendedStart())
require.Equal(t, uint64(23), w.GetExtendedStart())
require.Equal(t, uint16(25), w.GetHighest())
require.Equal(t, uint32(25), w.GetExtendedHighest())
require.Equal(t, uint64(25), w.GetExtendedHighest())
// force restart with wrap
res = w.Update(65533)
expectedResult = WrapAroundUpdateResult[uint32]{
expectedResult = WrapAroundUpdateResult[uint64]{
IsRestart: true,
PreExtendedStart: 23,
PreExtendedHighest: 25,
@@ -242,16 +242,29 @@ func TestWrapAroundUint16RollbackRestart(t *testing.T) {
}
require.Equal(t, expectedResult, res)
require.Equal(t, uint16(65533), w.GetStart())
require.Equal(t, uint32(65533), w.GetExtendedStart())
require.Equal(t, uint64(65533), w.GetExtendedStart())
require.Equal(t, uint16(25), w.GetHighest())
require.Equal(t, uint32(65536+25), w.GetExtendedHighest())
require.Equal(t, uint64(65536+25), w.GetExtendedHighest())
// roll back restart
w.RollbackRestart(res.PreExtendedStart)
require.Equal(t, uint16(23), w.GetStart())
require.Equal(t, uint32(23), w.GetExtendedStart())
require.Equal(t, uint64(23), w.GetExtendedStart())
require.Equal(t, uint16(25), w.GetHighest())
require.Equal(t, uint32(25), w.GetExtendedHighest())
require.Equal(t, uint64(25), w.GetExtendedHighest())
// reset highest
w.ResetHighest(0x1234)
require.Equal(t, uint16(23), w.GetStart())
require.Equal(t, uint64(23), w.GetExtendedStart())
require.Equal(t, uint16(0x1234), w.GetHighest())
require.Equal(t, uint64(0x1234), w.GetExtendedHighest())
w.ResetHighest(0x7f1234)
require.Equal(t, uint16(23), w.GetStart())
require.Equal(t, uint64(23), w.GetExtendedStart())
require.Equal(t, uint16(0x1234), w.GetHighest())
require.Equal(t, uint64(0x7f1234), w.GetExtendedHighest())
}
func TestWrapAroundUint32(t *testing.T) {