Merge remote-tracking branch 'origin/master' into raja_min_packets

This commit is contained in:
boks1971
2023-08-27 11:07:03 +05:30
16 changed files with 505 additions and 449 deletions
+8 -4
View File
@@ -26,7 +26,7 @@ import (
var opusCodecCapability = webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus, ClockRate: 48000, Channels: 2, SDPFmtpLine: "minptime=10;useinbandfec=1"}
var redCodecCapability = webrtc.RTPCodecCapability{MimeType: sfu.MimeTypeAudioRed, ClockRate: 48000, Channels: 2, SDPFmtpLine: "111/111"}
func registerCodecs(me *webrtc.MediaEngine, codecs []*livekit.Codec, rtcpFeedback RTCPFeedbackConfig) error {
func registerCodecs(me *webrtc.MediaEngine, codecs []*livekit.Codec, rtcpFeedback RTCPFeedbackConfig, filterOutH264HighProfile bool) error {
opusCodec := opusCodecCapability
opusCodec.RTCPFeedback = rtcpFeedback.Audio
var opusPayload webrtc.PayloadType
@@ -49,6 +49,7 @@ func registerCodecs(me *webrtc.MediaEngine, codecs []*livekit.Codec, rtcpFeedbac
}
}
h264HighProfileFmtp := "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=640032"
for _, codec := range []webrtc.RTPCodecParameters{
{
RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8, ClockRate: 90000, RTCPFeedback: rtcpFeedback.Video},
@@ -71,7 +72,7 @@ func registerCodecs(me *webrtc.MediaEngine, codecs []*livekit.Codec, rtcpFeedbac
PayloadType: 108,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264, ClockRate: 90000, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=640032", RTCPFeedback: rtcpFeedback.Video},
RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264, ClockRate: 90000, SDPFmtpLine: h264HighProfileFmtp, RTCPFeedback: rtcpFeedback.Video},
PayloadType: 123,
},
{
@@ -79,6 +80,9 @@ func registerCodecs(me *webrtc.MediaEngine, codecs []*livekit.Codec, rtcpFeedbac
PayloadType: 35,
},
} {
if filterOutH264HighProfile && codec.RTPCodecCapability.SDPFmtpLine == h264HighProfileFmtp {
continue
}
if IsCodecEnabled(codecs, codec.RTPCodecCapability) {
if err := me.RegisterCodec(codec, webrtc.RTPCodecTypeVideo); err != nil {
return err
@@ -104,9 +108,9 @@ func registerHeaderExtensions(me *webrtc.MediaEngine, rtpHeaderExtension RTPHead
return nil
}
func createMediaEngine(codecs []*livekit.Codec, config DirectionConfig) (*webrtc.MediaEngine, error) {
func createMediaEngine(codecs []*livekit.Codec, config DirectionConfig, filterOutH264HighProfile bool) (*webrtc.MediaEngine, error) {
me := &webrtc.MediaEngine{}
if err := registerCodecs(me, codecs, config.RTCPFeedback); err != nil {
if err := registerCodecs(me, codecs, config.RTCPFeedback, filterOutH264HighProfile); err != nil {
return nil, err
}
+4 -1
View File
@@ -263,7 +263,10 @@ func newPeerConnection(params TransportParams, onBandwidthEstimator func(estimat
directionConfig.RTPHeaderExtension.Video = append(directionConfig.RTPHeaderExtension.Video, rtpextension.PlayoutDelayURI)
}
me, err := createMediaEngine(params.EnabledCodecs, directionConfig)
// Some of the browser clients do not handle H.264 High Profile in signalling properly.
// They still decode if the actual stream is H.264 High Profile, but do not handle it well in signalling.
// So, disable H.264 High Profile for SUBSCRIBER peer connection to ensure it is not offered.
me, err := createMediaEngine(params.EnabledCodecs, directionConfig, params.IsOfferer)
if err != nil {
return nil, nil, err
}
+6 -5
View File
@@ -54,6 +54,7 @@ type pendingPacket struct {
type ExtPacket struct {
VideoLayer
Arrival time.Time
ExtSequenceNumber uint32
Packet *rtp.Packet
Payload interface{}
KeyFrame bool
@@ -83,7 +84,6 @@ type Buffer struct {
snRangeMap *utils.RangeMap[uint32, uint32]
// supported feedbacks
latestTSForAudioLevelInitialized bool
latestTSForAudioLevel uint32
@@ -441,7 +441,7 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) {
b.doReports(arrivalTime)
ep := b.getExtPacket(&rtpPacket, arrivalTime)
ep := b.getExtPacket(&rtpPacket, extSeqNumber, arrivalTime)
if ep == nil {
return
}
@@ -546,10 +546,11 @@ func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime time.Time) {
}
}
func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime time.Time) *ExtPacket {
func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, extSeqNumber uint32, arrivalTime time.Time) *ExtPacket {
ep := &ExtPacket{
Packet: rtpPacket,
Arrival: arrivalTime,
Arrival: arrivalTime,
ExtSequenceNumber: extSeqNumber,
Packet: rtpPacket,
VideoLayer: VideoLayer{
Spatial: InvalidLayerSpatial,
Temporal: InvalidLayerTemporal,
+2 -2
View File
@@ -212,8 +212,8 @@ func TestNewBuffer(t *testing.T) {
buf, _ := p.Marshal()
_, _ = buff.Write(buf)
}
require.Equal(t, uint16(1), buff.rtpStats.cycles)
require.Equal(t, uint16(2), buff.rtpStats.highestSN)
require.Equal(t, uint16(2), buff.rtpStats.sequenceNumber.GetHighest())
require.Equal(t, uint32(65536+2), buff.rtpStats.sequenceNumber.GetExtendedHighest())
})
}
}
+137 -201
View File
@@ -25,6 +25,7 @@ import (
"github.com/pion/rtp"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/livekit/livekit-server/pkg/sfu/utils"
"github.com/livekit/mediatransportutil"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
@@ -157,17 +158,13 @@ type RTPStats struct {
startTime time.Time
endTime time.Time
extStartSN uint32
highestSN uint16
cycles uint16
sequenceNumber *utils.WrapAround[uint16, uint32]
extHighestSNOverridden uint32
lastRRTime time.Time
lastRR rtcp.ReceptionReport
extStartTS uint64
highestTS uint32
tsCycles uint32
timestamp *utils.WrapAround[uint32, uint64]
firstTime time.Time
highestTime time.Time
@@ -232,6 +229,8 @@ func NewRTPStats(params RTPStatsParams) *RTPStats {
return &RTPStats{
params: params,
logger: params.Logger,
sequenceNumber: utils.NewWrapAround[uint16, uint32](),
timestamp: utils.NewWrapAround[uint32, uint64](),
nextSnapshotId: FirstSnapshotId,
snapshots: make(map[uint32]*Snapshot),
}
@@ -251,17 +250,13 @@ func (r *RTPStats) Seed(from *RTPStats) {
r.startTime = from.startTime
// do not clone endTime as a non-zero endTime indicates an ended object
r.extStartSN = from.extStartSN
r.highestSN = from.highestSN
r.cycles = from.cycles
r.sequenceNumber.Seed(from.sequenceNumber)
r.extHighestSNOverridden = from.extHighestSNOverridden
r.lastRRTime = from.lastRRTime
r.lastRR = from.lastRR
r.extStartTS = from.extStartTS
r.highestTS = from.highestTS
r.tsCycles = from.tsCycles
r.timestamp.Seed(from.timestamp)
r.firstTime = from.firstTime
r.highestTime = from.highestTime
@@ -352,10 +347,11 @@ func (r *RTPStats) NewSnapshotId() uint32 {
id := r.nextSnapshotId
if r.initialized {
extStartSN := r.sequenceNumber.GetExtendedStart()
r.snapshots[id] = &Snapshot{
startTime: time.Now(),
extStartSN: r.extStartSN,
extStartSNOverridden: r.extStartSN,
extStartSN: extStartSN,
extStartSNOverridden: extStartSN,
}
}
@@ -379,7 +375,18 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
return
}
first := false
if r.resyncOnNextPacket {
r.resyncOnNextPacket = false
if r.initialized {
r.sequenceNumber.ResetHighest(rtph.SequenceNumber - 1)
r.timestamp.ResetHighest(rtph.Timestamp)
r.highestTime = packetTime
}
}
var resSN utils.WrapAroundUpdateResult[uint32]
var resTS utils.WrapAroundUpdateResult[uint64]
if !r.initialized {
if payloadSize == 0 {
// do not start on a padding only packet
@@ -390,25 +397,19 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
r.startTime = time.Now()
r.extStartSN = uint32(rtph.SequenceNumber)
r.highestSN = rtph.SequenceNumber - 1
r.cycles = 0
r.extStartTS = uint64(rtph.Timestamp)
r.highestTS = rtph.Timestamp
r.tsCycles = 0
r.firstTime = packetTime
r.highestTime = packetTime
first = true
resSN = r.sequenceNumber.Update(rtph.SequenceNumber)
resTS = r.timestamp.Update(rtph.Timestamp)
// initialize snapshots if any
for i := uint32(FirstSnapshotId); i < r.nextSnapshotId; i++ {
extStartSN := r.sequenceNumber.GetExtendedStart()
r.snapshots[i] = &Snapshot{
startTime: r.startTime,
extStartSN: r.extStartSN,
extStartSNOverridden: r.extStartSN,
extStartSN: extStartSN,
extStartSNOverridden: extStartSN,
}
}
@@ -416,92 +417,95 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
"rtp stream start",
"startTime", r.startTime.String(),
"firstTime", r.firstTime.String(),
"startSN", r.extStartSN,
"startTS", r.extStartTS,
"startSN", r.sequenceNumber.GetExtendedHighest(),
"startTS", r.timestamp.GetExtendedHighest(),
)
}
if r.resyncOnNextPacket {
r.resyncOnNextPacket = false
r.highestSN = rtph.SequenceNumber - 1
r.highestTS = rtph.Timestamp
r.highestTime = packetTime
} else {
resSN = r.sequenceNumber.Update(rtph.SequenceNumber)
resTS = r.timestamp.Update(rtph.Timestamp)
}
hdrSize := uint64(rtph.MarshalSize())
pktSize := hdrSize + uint64(payloadSize+paddingSize)
isDuplicate := false
diff := rtph.SequenceNumber - r.highestSN
switch {
// duplicate or out-of-order
case diff == 0 || diff > (1<<15):
if diff != 0 {
gapSN := resSN.ExtendedVal - resSN.PreExtendedHighest
if gapSN == 0 || gapSN > (1<<31) { // duplicate OR out-of-order
if payloadSize == 0 {
// do not start on a padding only packet
if resTS.IsRestart {
r.logger.Infow("rolling back timestamp restart", "tsBefore", r.timestamp.GetExtendedStart(), "tsAfter", resTS.PreExtendedStart)
r.timestamp.RollbackRestart(resTS.PreExtendedStart)
}
if resSN.IsRestart {
r.logger.Infow("rolling back sequence number restart", "snBefore", r.sequenceNumber.GetExtendedStart(), "snAfter", resSN.PreExtendedStart)
r.sequenceNumber.RollbackRestart(resSN.PreExtendedStart)
return
}
}
if gapSN != 0 {
r.packetsOutOfOrder++
}
// adjust start to account for out-of-order packets before a cycle completes
if !r.maybeAdjustStart(rtph, pktSize, hdrSize, payloadSize) {
if !r.isSnInfoLost(rtph.SequenceNumber) {
r.bytesDuplicate += pktSize
r.headerBytesDuplicate += hdrSize
r.packetsDuplicate++
isDuplicate = true
} else {
r.packetsLost--
r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker, true)
if resSN.IsRestart {
r.packetsLost += resSN.PreExtendedStart - resSN.ExtendedVal
extStartSN := r.sequenceNumber.GetExtendedStart()
for _, s := range r.snapshots {
if s.extStartSN == resSN.PreExtendedStart {
s.extStartSN = extStartSN
}
}
r.logger.Infow(
"adjusting start sequence number",
"snBefore", resSN.PreExtendedStart,
"snAfter", resSN.ExtendedVal,
)
}
if resTS.IsRestart {
r.logger.Infow(
"adjusting start timestamp",
"tsBefore", resTS.PreExtendedStart,
"tsAfter", resTS.ExtendedVal,
)
}
if !r.isSnInfoLost(resSN.ExtendedVal, resSN.PreExtendedHighest) {
r.bytesDuplicate += pktSize
r.headerBytesDuplicate += hdrSize
r.packetsDuplicate++
isDuplicate = true
} else {
r.packetsLost--
r.setSnInfo(resSN.ExtendedVal, resSN.PreExtendedHighest, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker, true)
}
flowState.IsOutOfOrder = true
cycles := r.cycles
if rtph.SequenceNumber > r.highestSN {
cycles--
}
flowState.ExtSeqNumber = getExtSN(rtph.SequenceNumber, cycles)
// in-order
default:
flowState.ExtSeqNumber = resSN.ExtendedVal
} else { // in-order
// update gap histogram
r.updateGapHistogram(int(diff))
r.updateGapHistogram(int(gapSN))
// update missing sequence numbers
r.clearSnInfos(r.highestSN+1, rtph.SequenceNumber)
r.packetsLost += uint32(diff - 1)
r.clearSnInfos(resSN.PreExtendedHighest+1, resSN.ExtendedVal)
r.packetsLost += gapSN - 1
r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker, false)
if diff > 1 {
flowState.HasLoss = true
cycles := r.cycles
if r.highestSN+1 < r.highestSN {
cycles++
}
flowState.LossStartInclusive = getExtSN(r.highestSN+1, cycles)
}
if rtph.SequenceNumber < r.highestSN && !first {
r.cycles++
}
r.highestSN = rtph.SequenceNumber
if rtph.Timestamp != r.highestTS {
if rtph.Timestamp < r.highestTS && !first {
r.tsCycles++
}
r.highestTS = rtph.Timestamp
r.setSnInfo(resSN.ExtendedVal, resSN.PreExtendedHighest, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker, false)
if rtph.Timestamp != uint32(resTS.PreExtendedHighest) {
// update only on first packet as same timestamp could be in multiple packets.
// NOTE: this may not be the first packet with this time stamp if there is packet loss.
r.highestTime = packetTime
}
if flowState.HasLoss {
flowState.LossEndExclusive = getExtSN(rtph.SequenceNumber, r.cycles)
if gapSN > 1 {
flowState.HasLoss = true
flowState.LossStartInclusive = resSN.PreExtendedHighest + 1
flowState.LossEndExclusive = resSN.ExtendedVal
}
flowState.ExtSeqNumber = getExtSN(rtph.SequenceNumber, r.cycles)
flowState.ExtSeqNumber = resSN.ExtendedVal
}
if !isDuplicate {
@@ -520,7 +524,6 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
r.updateJitter(rtph, packetTime)
}
}
return
}
@@ -531,54 +534,8 @@ func (r *RTPStats) ResyncOnNextPacket() {
r.resyncOnNextPacket = true
}
func (r *RTPStats) maybeAdjustStart(rtph *rtp.Header, pktSize uint64, hdrSize uint64, payloadSize int) bool {
if (r.getExtHighestSN() - r.extStartSN + 1) >= (NumSequenceNumbers / 2) {
return false
}
if (rtph.SequenceNumber - uint16(r.extStartSN)) < (1 << 15) {
return false
}
if payloadSize == 0 {
// do not start on a padding only packet
r.logger.Infow("adjusting start, skipping on padding only packet")
return true
}
r.packetsLost += uint32(uint16(r.extStartSN)-rtph.SequenceNumber) - 1
snBeforeAdjust := r.extStartSN
r.extStartSN = uint32(rtph.SequenceNumber)
if r.extStartSN > snBeforeAdjust {
// wrapping back
r.cycles++
}
r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker, true)
for _, s := range r.snapshots {
if s.extStartSN == snBeforeAdjust {
s.extStartSN = r.extStartSN
}
}
tsBeforeAdjust := r.extStartTS
r.extStartTS = uint64(rtph.Timestamp)
if r.extStartTS > tsBeforeAdjust {
// wrapping back
r.tsCycles++
}
r.logger.Infow(
"adjusting start",
"snBefore", snBeforeAdjust,
"snAfter", r.extStartSN,
"snCyles", r.cycles,
"tsBefore", tsBeforeAdjust,
"tsAfter", r.extStartTS,
"tsCyles", r.tsCycles,
)
return true
func (r *RTPStats) getPacketsExpected() uint32 {
return r.sequenceNumber.GetExtendedHighest() - r.sequenceNumber.GetExtendedStart() + 1
}
func (r *RTPStats) GetTotalPacketsPrimary() uint32 {
@@ -589,7 +546,7 @@ func (r *RTPStats) GetTotalPacketsPrimary() uint32 {
}
func (r *RTPStats) getTotalPacketsPrimary() uint32 {
packetsExpected := r.getExtHighestSN() - r.extStartSN + 1
packetsExpected := r.getPacketsExpected()
if r.packetsLost > packetsExpected {
// should not happen
return 0
@@ -607,7 +564,7 @@ func (r *RTPStats) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt uint32
r.lock.Lock()
defer r.lock.Unlock()
if !r.initialized || !r.endTime.IsZero() || !r.params.IsReceiverReportDriven || rr.LastSequenceNumber < r.extStartSN {
if !r.initialized || !r.endTime.IsZero() || !r.params.IsReceiverReportDriven || rr.LastSequenceNumber < r.sequenceNumber.GetExtendedHighest() {
// it is possible that the `LastSequenceNumber` in the receiver report is before the starting
// sequence number when dummy packets are used to trigger Pion's OnTrack path.
return
@@ -846,7 +803,7 @@ func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) {
// abnormal delay (maybe due to pacing or maybe due to queuing
// in some network element along the way), push back first time
// to an earlier instance.
samplesDiff := int32(ts - uint32(r.extStartTS))
samplesDiff := int32(ts - uint32(r.timestamp.GetExtendedStart()))
if samplesDiff < 0 {
// out-of-order, skip
return
@@ -863,7 +820,7 @@ func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) {
"after", firstTime.String(),
"adjustment", r.firstTime.Sub(firstTime),
"nowTS", ts,
"extStartTS", r.extStartTS,
"extStartTS", r.timestamp.GetExtendedStart(),
)
if r.firstTime.Sub(firstTime) > firstPacketTimeAdjustThreshold {
r.logger.Infow("first packet time adjustment too big, ignoring",
@@ -873,7 +830,7 @@ func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) {
"after", firstTime.String(),
"adjustment", r.firstTime.Sub(firstTime),
"nowTS", ts,
"extStartTS", r.extStartTS,
"extStartTS", r.timestamp.GetExtendedStart(),
)
} else {
r.firstTime = firstTime
@@ -967,7 +924,7 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
"expectedTimeDiffSinceLast", expectedTimeDiffSinceLast,
"packetDrift", packetDriftResult.String(),
"reportDrift", reportDriftResult.String(),
"highestTS", r.highestTS,
"highestTS", r.timestamp.GetExtendedHighest(),
"highestTime", r.highestTime.String(),
)
}
@@ -1000,7 +957,7 @@ func (r *RTPStats) GetExpectedRTPTimestamp(at time.Time) (expectedTSExt uint64,
timeDiff := at.Sub(r.firstTime)
expectedRTPDiff := timeDiff.Nanoseconds() * int64(r.params.ClockRate) / 1e9
expectedTSExt = r.extStartTS + uint64(expectedRTPDiff)
expectedTSExt = r.timestamp.GetExtendedStart() + uint64(expectedRTPDiff)
return
}
@@ -1018,19 +975,15 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32)
nowNTP := mediatransportutil.ToNtpTime(now)
timeSinceHighest := now.Sub(r.highestTime)
nowRTP := r.highestTS + uint32(timeSinceHighest.Nanoseconds()*int64(r.params.ClockRate)/1e9)
nowRTPExt := r.timestamp.GetExtendedHighest() + uint64(timeSinceHighest.Nanoseconds()*int64(r.params.ClockRate)/1e9)
nowRTP := uint32(nowRTPExt)
// It is possible that publisher is pacing at a slower rate.
// That would make `highestTS` to be lagging the RTP time stamp in the RTCP Sender Report from publisher.
// Check for that using calculated clock rate and use the later time stamp if applicable.
tsCycles := r.tsCycles
if nowRTP < r.highestTS {
tsCycles++
}
nowRTPExt := getExtTS(nowRTP, tsCycles)
var nowRTPExtUsingRate uint64
if calculatedClockRate != 0 {
nowRTPExtUsingRate = r.extStartTS + uint64(float64(calculatedClockRate)*timeSinceFirst.Seconds())
nowRTPExtUsingRate = r.timestamp.GetExtendedStart() + uint64(float64(calculatedClockRate)*timeSinceFirst.Seconds())
if nowRTPExtUsingRate > nowRTPExt {
nowRTPExt = nowRTPExtUsingRate
nowRTP = uint32(nowRTPExt)
@@ -1104,7 +1057,7 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32)
"expectedTimeDiffSinceLast", expectedTimeDiffSinceLast,
"packetDrift", packetDriftResult.String(),
"reportDrift", reportDriftResult.String(),
"highestTS", r.highestTS,
"highestTS", r.timestamp.GetExtendedHighest(),
"highestTime", r.highestTime.String(),
"calculatedClockRate", calculatedClockRate,
"nowRTPExt", nowRTPExt,
@@ -1145,7 +1098,7 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8,
return nil
}
intervalStats := r.getIntervalStats(uint16(then.extStartSN), uint16(now.extStartSN))
intervalStats := r.getIntervalStats(then.extStartSN, now.extStartSN)
packetsLost := intervalStats.packetsLost
lossRate := float32(packetsLost) / float32(packetsExpected)
fracLost := uint8(lossRate * 256.0)
@@ -1205,7 +1158,7 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo {
}
}
intervalStats := r.getIntervalStats(uint16(then.extStartSN), uint16(now.extStartSN))
intervalStats := r.getIntervalStats(then.extStartSN, now.extStartSN)
return &RTPDeltaInfo{
StartTime: startTime,
Duration: endTime.Sub(startTime),
@@ -1260,7 +1213,7 @@ func (r *RTPStats) DeltaInfoOverridden(snapshotId uint32) *RTPDeltaInfo {
return nil
}
intervalStats := r.getIntervalStats(uint16(then.extStartSNOverridden), uint16(now.extStartSNOverridden))
intervalStats := r.getIntervalStats(then.extStartSNOverridden, now.extStartSNOverridden)
packetsLost := now.packetsLostOverridden - then.packetsLostOverridden
if int32(packetsLost) < 0 {
packetsLost = 0
@@ -1321,12 +1274,12 @@ func (r *RTPStats) ToString() string {
r.lock.RLock()
defer r.lock.RUnlock()
expectedPackets := r.getExtHighestSN() - r.extStartSN + 1
expectedPackets := r.getPacketsExpected()
expectedPacketRate := float64(expectedPackets) / p.Duration
str := fmt.Sprintf("t: %+v|%+v|%.2fs", p.StartTime.AsTime().Format(time.UnixDate), p.EndTime.AsTime().Format(time.UnixDate), p.Duration)
str += fmt.Sprintf(", sn: %d|%d", r.extStartSN, r.getExtHighestSN())
str += fmt.Sprintf(", sn: %d|%d", r.sequenceNumber.GetExtendedStart(), r.sequenceNumber.GetExtendedHighest())
str += fmt.Sprintf(", ep: %d|%.2f/s", expectedPackets, expectedPacketRate)
str += fmt.Sprintf(", p: %d|%.2f/s", p.Packets, p.PacketRate)
@@ -1411,7 +1364,7 @@ func (r *RTPStats) ToProto() *livekit.RTPStats {
frameRate := float64(r.frames) / elapsed
packetsExpected := r.getExtHighestSN() - r.extStartSN + 1
packetsExpected := r.getPacketsExpected()
packetsLost := r.getPacketsLost()
packetLostRate := float64(packetsLost) / elapsed
packetLostPercentage := float32(packetsLost) / float32(packetsExpected) * 100.0
@@ -1503,16 +1456,12 @@ func (r *RTPStats) ToProto() *livekit.RTPStats {
return p
}
func (r *RTPStats) getExtHighestSN() uint32 {
return (uint32(r.cycles) << 16) | uint32(r.highestSN)
}
func (r *RTPStats) getExtHighestSNAdjusted() uint32 {
if r.params.IsReceiverReportDriven && !r.lastRRTime.IsZero() {
return r.extHighestSNOverridden
}
return r.getExtHighestSN()
return r.sequenceNumber.GetExtendedHighest()
}
func (r *RTPStats) getPacketsLost() uint32 {
@@ -1523,13 +1472,13 @@ func (r *RTPStats) getPacketsLost() uint32 {
return r.packetsLost
}
func (r *RTPStats) getSnInfoOutOfOrderPtr(sn uint16) int {
offset := sn - r.highestSN
if offset > 0 && offset < (1<<15) {
func (r *RTPStats) getSnInfoOutOfOrderPtr(esn uint32, ehsn uint32) int {
offset := esn - ehsn
if offset > 0 && offset < (1<<31) {
return -1 // in-order, not expected, maybe too new
}
offset = r.highestSN - sn
offset = ehsn - esn
if int(offset) >= SnInfoSize {
// too old, ignore
return -1
@@ -1538,14 +1487,14 @@ func (r *RTPStats) getSnInfoOutOfOrderPtr(sn uint16) int {
return (r.snInfoWritePtr - int(offset) - 1) & SnInfoMask
}
func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, hdrSize uint16, payloadSize uint16, marker bool, isOutOfOrder bool) {
func (r *RTPStats) setSnInfo(esn uint32, ehsn uint32, pktSize uint16, hdrSize uint16, payloadSize uint16, marker bool, isOutOfOrder bool) {
writePtr := 0
ooo := (sn - r.highestSN) > (1 << 15)
ooo := (esn - ehsn) > (1 << 31)
if !ooo {
writePtr = r.snInfoWritePtr
r.snInfoWritePtr = (writePtr + 1) & SnInfoMask
} else {
writePtr = r.getSnInfoOutOfOrderPtr(sn)
writePtr = r.getSnInfoOutOfOrderPtr(esn, ehsn)
if writePtr < 0 {
return
}
@@ -1559,8 +1508,8 @@ func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, hdrSize uint16, payloadS
snInfo.isOutOfOrder = isOutOfOrder
}
func (r *RTPStats) clearSnInfos(startInclusive uint16, endExclusive uint16) {
for sn := startInclusive; sn != endExclusive; sn++ {
func (r *RTPStats) clearSnInfos(extStartInclusive uint32, extEndExclusive uint32) {
for esn := extStartInclusive; esn != extEndExclusive; esn++ {
snInfo := &r.snInfos[r.snInfoWritePtr]
snInfo.pktSize = 0
snInfo.hdrSize = 0
@@ -1571,8 +1520,8 @@ func (r *RTPStats) clearSnInfos(startInclusive uint16, endExclusive uint16) {
}
}
func (r *RTPStats) isSnInfoLost(sn uint16) bool {
readPtr := r.getSnInfoOutOfOrderPtr(sn)
func (r *RTPStats) isSnInfoLost(esn uint32, ehsn uint32) bool {
readPtr := r.getSnInfoOutOfOrderPtr(esn, ehsn)
if readPtr < 0 {
return false
}
@@ -1581,10 +1530,10 @@ func (r *RTPStats) isSnInfoLost(sn uint16) bool {
return snInfo.pktSize == 0
}
func (r *RTPStats) getIntervalStats(startInclusive uint16, endExclusive uint16) (intervalStats IntervalStats) {
func (r *RTPStats) getIntervalStats(extStartInclusive uint32, extEndExclusive uint32) (intervalStats IntervalStats) {
packetsNotFound := uint32(0)
processSN := func(sn uint16) {
readPtr := r.getSnInfoOutOfOrderPtr(sn)
processESN := func(esn uint32, ehsn uint32) {
readPtr := r.getSnInfoOutOfOrderPtr(esn, ehsn)
if readPtr < 0 {
packetsNotFound++
return
@@ -1614,24 +1563,18 @@ func (r *RTPStats) getIntervalStats(startInclusive uint16, endExclusive uint16)
}
}
if startInclusive == endExclusive {
// do a full cycle
for sn := uint32(0); sn < NumSequenceNumbers; sn++ {
processSN(uint16(sn))
}
} else {
for sn := startInclusive; sn != endExclusive; sn++ {
processSN(sn)
}
ehsn := r.sequenceNumber.GetExtendedHighest()
for esn := extStartInclusive; esn != extEndExclusive; esn++ {
processESN(esn, ehsn)
}
if packetsNotFound != 0 {
r.logger.Errorw(
"could not find some packets", nil,
"start", startInclusive,
"end", endExclusive,
"start", extStartInclusive,
"end", extEndExclusive,
"count", packetsNotFound,
"highestSN", r.highestSN,
"highestSN", r.sequenceNumber.GetExtendedHighest(),
)
}
return
@@ -1676,7 +1619,7 @@ func (r *RTPStats) updateJitter(rtph *rtp.Header, packetTime time.Time) {
func (r *RTPStats) getDrift() (packetDrift driftResult, reportDrift driftResult) {
packetDrift.timeSinceFirst = r.highestTime.Sub(r.firstTime)
packetDrift.rtpDiffSinceFirst = getExtTS(r.highestTS, r.tsCycles) - r.extStartTS
packetDrift.rtpDiffSinceFirst = r.timestamp.GetExtendedHighest() - r.timestamp.GetExtendedStart()
packetDrift.driftSamples = int64(packetDrift.rtpDiffSinceFirst - uint64(packetDrift.timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9))
packetDrift.driftMs = (float64(packetDrift.driftSamples) * 1000) / float64(r.params.ClockRate)
if packetDrift.timeSinceFirst.Seconds() != 0 {
@@ -1715,10 +1658,11 @@ func (r *RTPStats) getAndResetSnapshot(snapshotId uint32, override bool) (*Snaps
then := r.snapshots[snapshotId]
if then == nil {
extStartSN := r.sequenceNumber.GetExtendedHighest()
then = &Snapshot{
startTime: r.startTime,
extStartSN: r.extStartSN,
extStartSNOverridden: r.extStartSN,
extStartSN: extStartSN,
extStartSNOverridden: extStartSN,
}
r.snapshots[snapshotId] = then
}
@@ -1733,7 +1677,7 @@ func (r *RTPStats) getAndResetSnapshot(snapshotId uint32, override bool) (*Snaps
// snapshot now
r.snapshots[snapshotId] = &Snapshot{
startTime: startTime,
extStartSN: r.getExtHighestSN() + 1,
extStartSN: r.sequenceNumber.GetExtendedHighest() + 1,
extStartSNOverridden: r.getExtHighestSNAdjusted() + 1,
packetsDuplicate: r.packetsDuplicate,
bytesDuplicate: r.bytesDuplicate,
@@ -1754,14 +1698,6 @@ func (r *RTPStats) getAndResetSnapshot(snapshotId uint32, override bool) (*Snaps
// ----------------------------------
func getExtSN(sn uint16, cycles uint16) uint32 {
return (uint32(cycles) << 16) | uint32(sn)
}
func getExtTS(ts uint32, cycles uint32) uint64 {
return (uint64(cycles) << 32) | uint64(ts)
}
func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats {
if len(statsList) == 0 {
return nil
+21 -11
View File
@@ -90,8 +90,10 @@ func TestRTPStats_Update(t *testing.T) {
flowState := r.Update(&packet.Header, len(packet.Payload), 0, time.Now())
require.False(t, flowState.HasLoss)
require.True(t, r.initialized)
require.Equal(t, sequenceNumber, r.highestSN)
require.Equal(t, timestamp, r.highestTS)
require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest())
require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest()))
require.Equal(t, timestamp, r.timestamp.GetHighest())
require.Equal(t, timestamp, uint32(r.timestamp.GetExtendedHighest()))
// in-order, no loss
sequenceNumber++
@@ -99,15 +101,19 @@ func TestRTPStats_Update(t *testing.T) {
packet = getPacket(sequenceNumber, timestamp, 1000)
flowState = r.Update(&packet.Header, len(packet.Payload), 0, time.Now())
require.False(t, flowState.HasLoss)
require.Equal(t, sequenceNumber, r.highestSN)
require.Equal(t, timestamp, r.highestTS)
require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest())
require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest()))
require.Equal(t, timestamp, r.timestamp.GetHighest())
require.Equal(t, timestamp, uint32(r.timestamp.GetExtendedHighest()))
// out-of-order
packet = getPacket(sequenceNumber-10, timestamp-30000, 1000)
flowState = r.Update(&packet.Header, len(packet.Payload), 0, time.Now())
require.False(t, flowState.HasLoss)
require.Equal(t, sequenceNumber, r.highestSN)
require.Equal(t, timestamp, r.highestTS)
require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest())
require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest()))
require.Equal(t, timestamp, r.timestamp.GetHighest())
require.Equal(t, timestamp, uint32(r.timestamp.GetExtendedHighest()))
require.Equal(t, uint32(1), r.packetsOutOfOrder)
require.Equal(t, uint32(0), r.packetsDuplicate)
@@ -115,8 +121,10 @@ func TestRTPStats_Update(t *testing.T) {
packet = getPacket(sequenceNumber-10, timestamp-30000, 1000)
flowState = r.Update(&packet.Header, len(packet.Payload), 0, time.Now())
require.False(t, flowState.HasLoss)
require.Equal(t, sequenceNumber, r.highestSN)
require.Equal(t, timestamp, r.highestTS)
require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest())
require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest()))
require.Equal(t, timestamp, r.timestamp.GetHighest())
require.Equal(t, timestamp, uint32(r.timestamp.GetExtendedHighest()))
require.Equal(t, uint32(2), r.packetsOutOfOrder)
require.Equal(t, uint32(1), r.packetsDuplicate)
@@ -134,12 +142,14 @@ func TestRTPStats_Update(t *testing.T) {
packet = getPacket(sequenceNumber-15, timestamp-45000, 1000)
flowState = r.Update(&packet.Header, len(packet.Payload), 0, time.Now())
require.False(t, flowState.HasLoss)
require.Equal(t, sequenceNumber, r.highestSN)
require.Equal(t, timestamp, r.highestTS)
require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest())
require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest()))
require.Equal(t, timestamp, r.timestamp.GetHighest())
require.Equal(t, timestamp, uint32(r.timestamp.GetExtendedHighest()))
require.Equal(t, uint32(3), r.packetsOutOfOrder)
require.Equal(t, uint32(1), r.packetsDuplicate)
require.Equal(t, uint32(16), r.packetsLost)
intervalStats := r.getIntervalStats(uint16(r.extStartSN), uint16(r.getExtHighestSN()+1))
intervalStats := r.getIntervalStats(r.sequenceNumber.GetExtendedStart(), r.sequenceNumber.GetExtendedHighest()+1)
require.Equal(t, uint32(16), intervalStats.packetsLost)
r.Stop()
+3 -8
View File
@@ -84,6 +84,7 @@ var (
ErrOutOfOrderSequenceNumberCacheMiss = errors.New("out-of-order sequence number not found in cache")
ErrPaddingOnlyPacket = errors.New("padding only packet that need not be forwarded")
ErrDuplicatePacket = errors.New("duplicate packet")
ErrSequenceNumberOffsetNotFound = errors.New("sequence number offset not found")
ErrPaddingNotOnFrameBoundary = errors.New("padding cannot send on non-frame boundary")
ErrDownTrackAlreadyBound = errors.New("already bound")
)
@@ -1634,16 +1635,10 @@ func (d *DownTrack) translateVP8PacketTo(pkt *rtp.Packet, incomingVP8 *buffer.VP
}
func (d *DownTrack) DebugInfo() map[string]interface{} {
rtpMungerParams := d.forwarder.GetRTPMungerParams()
stats := map[string]interface{}{
"HighestIncomingSN": rtpMungerParams.highestIncomingSN,
"LastSN": rtpMungerParams.lastSN,
"SNOffset": rtpMungerParams.snOffset,
"LastTS": rtpMungerParams.lastTS,
"TSOffset": rtpMungerParams.tsOffset,
"LastMarker": rtpMungerParams.lastMarker,
"LastPli": d.rtpStats.LastPli(),
"LastPli": d.rtpStats.LastPli(),
}
stats["RTPMunger"] = d.forwarder.RTPMungerDebugInfo()
senderReport := d.CreateSenderReport()
if senderReport != nil {
+3 -3
View File
@@ -1585,7 +1585,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
"expectedTS", expectedTS,
"nextTS", nextTS,
"tsJump", nextTS-lastTS,
"nextSN", rtpMungerState.LastSN+1,
"nextSN", rtpMungerState.ExtLastSN+1,
)
f.rtpMunger.UpdateSnTsOffsets(extPkt, 1, nextTS-lastTS)
@@ -1792,11 +1792,11 @@ func (f *Forwarder) GetPadding(frameEndNeeded bool) ([]byte, error) {
return f.codecMunger.UpdateAndGetPadding(!frameEndNeeded)
}
func (f *Forwarder) GetRTPMungerParams() RTPMungerParams {
func (f *Forwarder) RTPMungerDebugInfo() map[string]interface{} {
f.lock.RLock()
defer f.lock.RUnlock()
return f.rtpMunger.GetParams()
return f.rtpMunger.DebugInfo()
}
// -----------------------------------------------------------------------------
+13 -9
View File
@@ -1196,9 +1196,13 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
require.NoError(t, err)
require.Equal(t, expectedTP, *actualTP)
// add a missing sequence number to the cache
f.rtpMunger.snRangeMap.IncValue(10)
f.rtpMunger.snRangeMap.AddRange(23332, 23333)
// out-of-order packet not in cache should be dropped
params = &testutils.TestExtPacketParams{
SequenceNumber: 23332,
SequenceNumber: 23331,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
PayloadSize: 20,
@@ -1239,7 +1243,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
expectedTP = TranslationParams{
rtp: &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingContiguous,
sequenceNumber: 23334,
sequenceNumber: 23324,
timestamp: 0xabcdef,
},
}
@@ -1247,7 +1251,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
require.NoError(t, err)
require.Equal(t, expectedTP, *actualTP)
// padding only packet after a gap should be forwarded
// padding only packet after a gap should not be dropped
params = &testutils.TestExtPacketParams{
SequenceNumber: 23337,
Timestamp: 0xabcdef,
@@ -1258,7 +1262,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
expectedTP = TranslationParams{
rtp: &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingGap,
sequenceNumber: 23336,
sequenceNumber: 23326,
timestamp: 0xabcdef,
},
}
@@ -1278,7 +1282,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
expectedTP = TranslationParams{
rtp: &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingOutOfOrder,
sequenceNumber: 23335,
sequenceNumber: 23325,
timestamp: 0xabcdef,
},
}
@@ -1298,7 +1302,7 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
expectedTP = TranslationParams{
rtp: &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingContiguous,
sequenceNumber: 23337,
sequenceNumber: 23327,
timestamp: 0xabcdf0,
},
}
@@ -1716,7 +1720,7 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) {
require.Equal(t, f.lastSSRC, params.SSRC)
}
func TestForwardGetSnTsForPadding(t *testing.T) {
func TestForwarderGetSnTsForPadding(t *testing.T) {
f := newForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo)
params := &testutils.TestExtPacketParams{
@@ -1783,7 +1787,7 @@ func TestForwardGetSnTsForPadding(t *testing.T) {
require.Equal(t, sntsExpected, snts)
}
func TestForwardGetSnTsForBlankFrames(t *testing.T) {
func TestForwarderGetSnTsForBlankFrames(t *testing.T) {
f := newForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo)
params := &testutils.TestExtPacketParams{
@@ -1860,7 +1864,7 @@ func TestForwardGetSnTsForBlankFrames(t *testing.T) {
require.Equal(t, sntsExpected, snts)
}
func TestForwardGetPaddingVP8(t *testing.T) {
func TestForwarderGetPaddingVP8(t *testing.T) {
f := newForwarder(testutils.TestVP8Codec, webrtc.RTPCodecTypeVideo)
params := &testutils.TestExtPacketParams{
+84 -107
View File
@@ -20,6 +20,7 @@ import (
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/sfu/utils"
)
//
@@ -36,9 +37,6 @@ const (
const (
RtxGateWindow = 2000
SnOffsetCacheSize = 4096
SnOffsetCacheMask = SnOffsetCacheSize - 1
)
type TranslationParamsRTP struct {
@@ -55,97 +53,96 @@ type SnTs struct {
// ----------------------------------------------------------------------
type RTPMungerState struct {
LastSN uint16
LastTS uint32
ExtLastSN uint32
LastTS uint32
}
func (r RTPMungerState) String() string {
return fmt.Sprintf("RTPMungerState{lastSN: %d, lastTS: %d)", r.LastSN, r.LastTS)
return fmt.Sprintf("RTPMungerState{extLastSN: %d, lastTS: %d)", r.ExtLastSN, r.LastTS)
}
// ----------------------------------------------------------------------
type RTPMungerParams struct {
highestIncomingSN uint16
lastSN uint16
snOffset uint16
lastTS uint32
tsOffset uint32
lastMarker bool
snOffsets [SnOffsetCacheSize]uint16
snOffsetsWritePtr int
snOffsetsOccupancy int
rtxGateSn uint16
isInRtxGateRegion bool
}
type RTPMunger struct {
logger logger.Logger
RTPMungerParams
extHighestIncomingSN uint32
snRangeMap *utils.RangeMap[uint32, uint32]
extLastSN uint32
lastTS uint32
tsOffset uint32
lastMarker bool
extRtxGateSn uint32
isInRtxGateRegion bool
}
func NewRTPMunger(logger logger.Logger) *RTPMunger {
return &RTPMunger{
logger: logger,
logger: logger,
snRangeMap: utils.NewRangeMap[uint32, uint32](100),
}
}
func (r *RTPMunger) GetParams() RTPMungerParams {
return RTPMungerParams{
highestIncomingSN: r.highestIncomingSN,
lastSN: r.lastSN,
snOffset: r.snOffset,
lastTS: r.lastTS,
tsOffset: r.tsOffset,
lastMarker: r.lastMarker,
func (r *RTPMunger) DebugInfo() map[string]interface{} {
snOffset, _ := r.snRangeMap.GetValue(r.extHighestIncomingSN)
return map[string]interface{}{
"ExtHighestIncomingSN": r.extHighestIncomingSN,
"ExtLastSN": r.extLastSN,
"SNOffset": snOffset,
"LastTS": r.lastTS,
"TSOffset": r.tsOffset,
"LastMarker": r.lastMarker,
}
}
func (r *RTPMunger) GetLast() RTPMungerState {
return RTPMungerState{
LastSN: r.lastSN,
LastTS: r.lastTS,
ExtLastSN: r.extLastSN,
LastTS: r.lastTS,
}
}
func (r *RTPMunger) SeedLast(state RTPMungerState) {
r.lastSN = state.LastSN
r.extLastSN = state.ExtLastSN
r.lastTS = state.LastTS
}
func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) {
r.highestIncomingSN = extPkt.Packet.SequenceNumber - 1
r.lastSN = extPkt.Packet.SequenceNumber
r.extHighestIncomingSN = extPkt.ExtSequenceNumber - 1
r.extLastSN = extPkt.ExtSequenceNumber
r.lastTS = extPkt.Packet.Timestamp
}
func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint16, tsAdjust uint32) {
r.highestIncomingSN = extPkt.Packet.SequenceNumber - 1
r.snOffset = extPkt.Packet.SequenceNumber - r.lastSN - snAdjust
func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint32, tsAdjust uint32) {
r.extHighestIncomingSN = extPkt.ExtSequenceNumber - 1
r.snRangeMap.ClearAndResetValue(extPkt.ExtSequenceNumber - r.extLastSN - snAdjust)
r.tsOffset = extPkt.Packet.Timestamp - r.lastTS - tsAdjust
// clear offsets cache layer/source switch
r.snOffsetsWritePtr = 0
r.snOffsetsOccupancy = 0
}
func (r *RTPMunger) PacketDropped(extPkt *buffer.ExtPacket) {
if r.highestIncomingSN != extPkt.Packet.SequenceNumber {
if r.extHighestIncomingSN != extPkt.ExtSequenceNumber {
return
}
r.snOffset++
r.lastSN = extPkt.Packet.SequenceNumber - r.snOffset
r.snRangeMap.IncValue(1)
snOffset, err := r.snRangeMap.GetValue(extPkt.ExtSequenceNumber)
if err != nil {
r.logger.Errorw("could not get sequence number offset", err)
return
}
r.extLastSN = extPkt.ExtSequenceNumber - snOffset
}
func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationParamsRTP, error) {
// if out-of-order, look up sequence number offset cache
diff := extPkt.Packet.SequenceNumber - r.highestIncomingSN
if diff > (1 << 15) {
snOffset, isValid := r.getSnOffset(extPkt.Packet.SequenceNumber)
if !isValid {
diff := extPkt.ExtSequenceNumber - r.extHighestIncomingSN
if diff > (1 << 31) {
// out-of-order, look up sequence number offset cache
snOffset, err := r.snRangeMap.GetValue(extPkt.ExtSequenceNumber)
if err != nil {
return &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingOutOfOrder,
}, ErrOutOfOrderSequenceNumberCacheMiss
@@ -153,72 +150,60 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara
return &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingOutOfOrder,
sequenceNumber: extPkt.Packet.SequenceNumber - snOffset,
sequenceNumber: uint16(extPkt.ExtSequenceNumber - snOffset),
timestamp: extPkt.Packet.Timestamp - r.tsOffset,
}, nil
}
// record sn offset
for i := r.highestIncomingSN + 1; i != extPkt.Packet.SequenceNumber+1; i++ {
r.snOffsets[r.snOffsetsWritePtr] = r.snOffset
r.snOffsetsWritePtr = (r.snOffsetsWritePtr + 1) & SnOffsetCacheMask
r.snOffsetsOccupancy++
}
if r.snOffsetsOccupancy > SnOffsetCacheSize {
r.snOffsetsOccupancy = SnOffsetCacheSize
// can get duplicate packet due to FEC
if diff == 0 {
return &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingDuplicate,
}, ErrDuplicatePacket
}
ordering := SequenceNumberOrderingContiguous
if diff > 1 {
ordering = SequenceNumberOrderingGap
} else {
// can get duplicate packet due to FEC
if diff == 0 {
return &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingDuplicate,
}, ErrDuplicatePacket
}
// if padding only packet, can be dropped and sequence number adjusted
// as it is contiguous and in order. That means this is the highest
// incoming sequence number, and it is a good point to adjust
// sequence number offset.
if len(extPkt.Packet.Payload) == 0 {
r.highestIncomingSN = extPkt.Packet.SequenceNumber
r.snOffset++
return &TranslationParamsRTP{
snOrdering: SequenceNumberOrderingContiguous,
}, ErrPaddingOnlyPacket
}
r.snRangeMap.AddRange(r.extHighestIncomingSN+1, extPkt.ExtSequenceNumber)
}
// in-order incoming packet, may or may not be contiguous.
// In the case of loss (i.e. incoming sequence number is not contiguous),
// forward even if it is a padding only packet. With temporal scalability,
// it is unclear if the current packet should be dropped if it is not
// contiguous. Hence, forward anything that is not contiguous.
// Reference: http://www.rtcbits.com/2017/04/howto-implement-temporal-scalability.html
mungedSN := extPkt.Packet.SequenceNumber - r.snOffset
r.extHighestIncomingSN = extPkt.ExtSequenceNumber
// if padding only packet, can be dropped and sequence number adjusted, if contiguous
if diff == 1 && len(extPkt.Packet.Payload) == 0 {
r.snRangeMap.IncValue(1)
return &TranslationParamsRTP{
snOrdering: ordering,
}, ErrPaddingOnlyPacket
}
snOffset, err := r.snRangeMap.GetValue(extPkt.ExtSequenceNumber)
if err != nil {
return &TranslationParamsRTP{
snOrdering: ordering,
}, ErrSequenceNumberOffsetNotFound
}
extMungedSN := extPkt.ExtSequenceNumber - snOffset
mungedTS := extPkt.Packet.Timestamp - r.tsOffset
r.highestIncomingSN = extPkt.Packet.SequenceNumber
r.lastSN = mungedSN
r.extLastSN = extMungedSN
r.lastTS = mungedTS
r.lastMarker = extPkt.Packet.Marker
if extPkt.KeyFrame {
r.rtxGateSn = mungedSN
r.extRtxGateSn = extMungedSN
r.isInRtxGateRegion = true
}
if r.isInRtxGateRegion && (mungedSN-r.rtxGateSn) < (1<<15) && (mungedSN-r.rtxGateSn) > RtxGateWindow {
if r.isInRtxGateRegion && (extMungedSN-r.extRtxGateSn) > RtxGateWindow {
r.isInRtxGateRegion = false
}
return &TranslationParamsRTP{
snOrdering: ordering,
sequenceNumber: mungedSN,
sequenceNumber: uint16(extMungedSN),
timestamp: mungedTS,
}, nil
}
@@ -230,7 +215,7 @@ func (r *RTPMunger) FilterRTX(nacks []uint16) []uint16 {
filtered := make([]uint16, 0, len(nacks))
for _, sn := range nacks {
if (sn - r.rtxGateSn) < (1 << 15) {
if (sn - uint16(r.extRtxGateSn)) < (1 << 15) {
filtered = append(filtered, sn)
}
}
@@ -251,10 +236,12 @@ func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate
tsOffset = 1
}
extLastSN := r.extLastSN
lastTS := r.lastTS
vals := make([]SnTs, num)
for i := 0; i < num; i++ {
vals[i].sequenceNumber = r.lastSN + uint16(i) + 1
extLastSN++
vals[i].sequenceNumber = uint16(extLastSN)
if frameRate != 0 {
if useLastTSForFirst && i == 0 {
vals[i].timestamp = r.lastTS
@@ -271,8 +258,8 @@ func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate
}
}
r.lastSN = vals[num-1].sequenceNumber
r.snOffset -= uint16(num)
r.extLastSN = extLastSN
r.snRangeMap.DecValue(uint32(num))
r.tsOffset -= vals[num-1].timestamp - r.lastTS
r.lastTS = vals[num-1].timestamp
@@ -287,13 +274,3 @@ func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate
func (r *RTPMunger) IsOnFrameBoundary() bool {
return r.lastMarker
}
func (r *RTPMunger) getSnOffset(sn uint16) (uint16, bool) {
diff := r.highestIncomingSN - sn
if int(diff) >= r.snOffsetsOccupancy {
return 0, false
}
readPtr := (r.snOffsetsWritePtr - int(diff) - 1) & SnOffsetCacheMask
return r.snOffsets[readPtr], true
}
+96 -58
View File
@@ -41,10 +41,12 @@ func TestSetLastSnTs(t *testing.T) {
require.NotNil(t, extPkt)
r.SetLastSnTs(extPkt)
require.Equal(t, uint16(23332), r.highestIncomingSN)
require.Equal(t, uint16(23333), r.lastSN)
require.Equal(t, uint32(23332), r.extHighestIncomingSN)
require.Equal(t, uint32(23333), r.extLastSN)
require.Equal(t, uint32(0xabcdef), r.lastTS)
require.Equal(t, uint16(0), r.snOffset)
snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(0), snOffset)
require.Equal(t, uint32(0), r.tsOffset)
}
@@ -66,10 +68,12 @@ func TestUpdateSnTsOffsets(t *testing.T) {
}
extPkt, _ = testutils.GetTestExtPacket(params)
r.UpdateSnTsOffsets(extPkt, 1, 1)
require.Equal(t, uint16(33332), r.highestIncomingSN)
require.Equal(t, uint16(23333), r.lastSN)
require.Equal(t, uint32(33332), r.extHighestIncomingSN)
require.Equal(t, uint32(23333), r.extLastSN)
require.Equal(t, uint32(0xabcdef), r.lastTS)
require.Equal(t, uint16(9999), r.snOffset)
snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(9999), snOffset)
require.Equal(t, uint32(0xffffffff), r.tsOffset)
}
@@ -84,14 +88,15 @@ func TestPacketDropped(t *testing.T) {
}
extPkt, _ := testutils.GetTestExtPacket(params)
r.SetLastSnTs(extPkt)
require.Equal(t, uint16(23332), r.highestIncomingSN)
require.Equal(t, uint16(23333), r.lastSN)
require.Equal(t, uint32(23332), r.extHighestIncomingSN)
require.Equal(t, uint32(23333), r.extLastSN)
require.Equal(t, uint32(0xabcdef), r.lastTS)
require.Equal(t, uint16(0), r.snOffset)
snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(0), snOffset)
require.Equal(t, uint32(0), r.tsOffset)
r.UpdateAndGetSnTs(extPkt) // update sequence number offset
require.Equal(t, 1, r.snOffsetsWritePtr)
// drop a non-head packet, should cause no change in internals
params = &testutils.TestExtPacketParams{
@@ -101,9 +106,11 @@ func TestPacketDropped(t *testing.T) {
}
extPkt, _ = testutils.GetTestExtPacket(params)
r.PacketDropped(extPkt)
require.Equal(t, uint16(23333), r.highestIncomingSN)
require.Equal(t, uint16(23333), r.lastSN)
require.Equal(t, uint16(0), r.snOffset)
require.Equal(t, uint32(23333), r.extHighestIncomingSN)
require.Equal(t, uint32(23333), r.extLastSN)
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(0), snOffset)
// drop a head packet and check offset increases
params = &testutils.TestExtPacketParams{
@@ -115,13 +122,12 @@ func TestPacketDropped(t *testing.T) {
extPkt, _ = testutils.GetTestExtPacket(params)
r.UpdateAndGetSnTs(extPkt) // update sequence number offset
snOffsetWritePtr := (44444 - 23333 + 1) & SnOffsetCacheMask
require.Equal(t, snOffsetWritePtr, r.snOffsetsWritePtr)
require.Equal(t, SnOffsetCacheSize, r.snOffsetsOccupancy)
r.PacketDropped(extPkt)
require.Equal(t, r.lastSN, uint16(44443))
require.Equal(t, uint16(1), r.snOffset)
require.Equal(t, uint32(44443), r.extLastSN)
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(1), snOffset)
params = &testutils.TestExtPacketParams{
SequenceNumber: 44445,
@@ -132,11 +138,10 @@ func TestPacketDropped(t *testing.T) {
extPkt, _ = testutils.GetTestExtPacket(params)
r.UpdateAndGetSnTs(extPkt) // update sequence number offset
require.Equal(t, uint16(1), r.snOffsets[snOffsetWritePtr])
snOffsetWritePtr = (snOffsetWritePtr + 1) & SnOffsetCacheMask
require.Equal(t, snOffsetWritePtr, r.snOffsetsWritePtr)
require.Equal(t, uint16(44444), r.lastSN)
require.Equal(t, uint16(1), r.snOffset)
require.Equal(t, r.extLastSN, uint32(44444))
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(1), snOffset)
}
func TestOutOfOrderSequenceNumber(t *testing.T) {
@@ -152,9 +157,13 @@ func TestOutOfOrderSequenceNumber(t *testing.T) {
r.SetLastSnTs(extPkt)
r.UpdateAndGetSnTs(extPkt)
// add a missing sequence number to the cache
r.snRangeMap.IncValue(10)
r.snRangeMap.AddRange(23332, 23333)
// out-of-order sequence number not in the missing sequence number cache
params = &testutils.TestExtPacketParams{
SequenceNumber: 23332,
SequenceNumber: 23331,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
PayloadSize: 10,
@@ -170,9 +179,13 @@ func TestOutOfOrderSequenceNumber(t *testing.T) {
require.ErrorIs(t, err, ErrOutOfOrderSequenceNumberCacheMiss)
require.Equal(t, tpExpected, *tp)
// add missing sequence number to the cache and try again
r.snOffsets[SnOffsetCacheSize-1] = 10
r.snOffsetsOccupancy++
params = &testutils.TestExtPacketParams{
SequenceNumber: 23332,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
PayloadSize: 10,
}
extPkt, _ = testutils.GetTestExtPacket(params)
tpExpected = TranslationParamsRTP{
snOrdering: SequenceNumberOrderingOutOfOrder,
@@ -230,9 +243,11 @@ func TestPaddingOnlyPacket(t *testing.T) {
require.Error(t, err)
require.ErrorIs(t, err, ErrPaddingOnlyPacket)
require.Equal(t, tpExpected, *tp)
require.Equal(t, uint16(23333), r.highestIncomingSN)
require.Equal(t, uint16(23333), r.lastSN)
require.Equal(t, uint16(1), r.snOffset)
require.Equal(t, uint32(23333), r.extHighestIncomingSN)
require.Equal(t, uint32(23333), r.extLastSN)
snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(1), snOffset)
// padding only packet with a gap should not report an error
params = &testutils.TestExtPacketParams{
@@ -251,9 +266,11 @@ func TestPaddingOnlyPacket(t *testing.T) {
tp, err = r.UpdateAndGetSnTs(extPkt)
require.NoError(t, err)
require.Equal(t, tpExpected, *tp)
require.Equal(t, uint16(23335), r.highestIncomingSN)
require.Equal(t, uint16(23334), r.lastSN)
require.Equal(t, uint16(1), r.snOffset)
require.Equal(t, uint32(23335), r.extHighestIncomingSN)
require.Equal(t, uint32(23334), r.extLastSN)
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(1), snOffset)
}
func TestGapInSequenceNumber(t *testing.T) {
@@ -274,6 +291,7 @@ func TestGapInSequenceNumber(t *testing.T) {
// three lost packets
params = &testutils.TestExtPacketParams{
SequenceNumber: 1,
SNCycles: 1,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
PayloadSize: 33,
@@ -289,22 +307,25 @@ func TestGapInSequenceNumber(t *testing.T) {
tp, err := r.UpdateAndGetSnTs(extPkt)
require.NoError(t, err)
require.Equal(t, tpExpected, *tp)
require.Equal(t, uint16(1), r.highestIncomingSN)
require.Equal(t, uint16(1), r.lastSN)
require.Equal(t, uint16(0), r.snOffset)
require.Equal(t, uint32(65536+1), r.extHighestIncomingSN)
require.Equal(t, uint32(65536+1), r.extLastSN)
snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(0), snOffset)
// ensure missing sequence numbers got recorded in cache
// last received, three missing in between and current received should all be in cache
for i := uint16(65533); i != 2; i++ {
offset, ok := r.getSnOffset(i)
require.True(t, ok)
require.Equal(t, uint16(0), offset)
for i := uint32(65534); i != 65536+1; i++ {
offset, err := r.snRangeMap.GetValue(i)
require.NoError(t, err)
require.Equal(t, uint32(0), offset)
}
// a padding only packet should be dropped
params = &testutils.TestExtPacketParams{
SequenceNumber: 2,
SNCycles: 1,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
}
@@ -317,13 +338,16 @@ func TestGapInSequenceNumber(t *testing.T) {
tp, err = r.UpdateAndGetSnTs(extPkt)
require.ErrorIs(t, err, ErrPaddingOnlyPacket)
require.Equal(t, tpExpected, *tp)
require.Equal(t, uint16(2), r.highestIncomingSN)
require.Equal(t, uint16(1), r.lastSN)
require.Equal(t, uint16(1), r.snOffset)
require.Equal(t, uint32(65536+2), r.extHighestIncomingSN)
require.Equal(t, uint32(65536+1), r.extLastSN)
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(1), snOffset)
// a packet with a gap should be adding to missing cache
params = &testutils.TestExtPacketParams{
SequenceNumber: 4,
SNCycles: 1,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
PayloadSize: 22,
@@ -339,13 +363,16 @@ func TestGapInSequenceNumber(t *testing.T) {
tp, err = r.UpdateAndGetSnTs(extPkt)
require.NoError(t, err)
require.Equal(t, tpExpected, *tp)
require.Equal(t, uint16(4), r.highestIncomingSN)
require.Equal(t, uint16(3), r.lastSN)
require.Equal(t, uint16(1), r.snOffset)
require.Equal(t, uint32(65536+4), r.extHighestIncomingSN)
require.Equal(t, uint32(65536+3), r.extLastSN)
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(1), snOffset)
// another contiguous padding only packet should be dropped
params = &testutils.TestExtPacketParams{
SequenceNumber: 5,
SNCycles: 1,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
}
@@ -358,13 +385,16 @@ func TestGapInSequenceNumber(t *testing.T) {
tp, err = r.UpdateAndGetSnTs(extPkt)
require.ErrorIs(t, err, ErrPaddingOnlyPacket)
require.Equal(t, tpExpected, *tp)
require.Equal(t, uint16(5), r.highestIncomingSN)
require.Equal(t, uint16(3), r.lastSN)
require.Equal(t, uint16(2), r.snOffset)
require.Equal(t, uint32(65536+5), r.extHighestIncomingSN)
require.Equal(t, uint32(65536+3), r.extLastSN)
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(2), snOffset)
// a packet with a gap should be adding to missing cache
params = &testutils.TestExtPacketParams{
SequenceNumber: 7,
SNCycles: 1,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
PayloadSize: 22,
@@ -380,13 +410,16 @@ func TestGapInSequenceNumber(t *testing.T) {
tp, err = r.UpdateAndGetSnTs(extPkt)
require.NoError(t, err)
require.Equal(t, tpExpected, *tp)
require.Equal(t, uint16(7), r.highestIncomingSN)
require.Equal(t, uint16(5), r.lastSN)
require.Equal(t, uint16(2), r.snOffset)
require.Equal(t, uint32(65536+7), r.extHighestIncomingSN)
require.Equal(t, uint32(65536+5), r.extLastSN)
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(2), snOffset)
// check the missing packets
params = &testutils.TestExtPacketParams{
SequenceNumber: 6,
SNCycles: 1,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
}
@@ -401,12 +434,15 @@ func TestGapInSequenceNumber(t *testing.T) {
tp, err = r.UpdateAndGetSnTs(extPkt)
require.NoError(t, err)
require.Equal(t, tpExpected, *tp)
require.Equal(t, uint16(7), r.highestIncomingSN)
require.Equal(t, uint16(5), r.lastSN)
require.Equal(t, uint16(2), r.snOffset)
require.Equal(t, uint32(65536+7), r.extHighestIncomingSN)
require.Equal(t, uint32(65536+5), r.extLastSN)
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(2), snOffset)
params = &testutils.TestExtPacketParams{
SequenceNumber: 3,
SNCycles: 1,
Timestamp: 0xabcdef,
SSRC: 0x12345678,
}
@@ -421,9 +457,11 @@ func TestGapInSequenceNumber(t *testing.T) {
tp, err = r.UpdateAndGetSnTs(extPkt)
require.NoError(t, err)
require.Equal(t, tpExpected, *tp)
require.Equal(t, uint16(7), r.highestIncomingSN)
require.Equal(t, uint16(5), r.lastSN)
require.Equal(t, uint16(2), r.snOffset)
require.Equal(t, uint32(65536+7), r.extHighestIncomingSN)
require.Equal(t, uint32(65536+5), r.extLastSN)
snOffset, err = r.snRangeMap.GetValue(r.extHighestIncomingSN)
require.NoError(t, err)
require.Equal(t, uint32(2), snOffset)
}
func TestUpdateAndGetPaddingSnTs(t *testing.T) {
+7 -5
View File
@@ -30,6 +30,7 @@ type TestExtPacketParams struct {
IsKeyFrame bool
PayloadType uint8
SequenceNumber uint16
SNCycles int
Timestamp uint32
SSRC uint32
PayloadSize int
@@ -61,11 +62,12 @@ func GetTestExtPacket(params *TestExtPacketParams) (*buffer.ExtPacket, error) {
}
ep := &buffer.ExtPacket{
VideoLayer: params.VideoLayer,
Arrival: params.ArrivalTime,
Packet: &packet,
KeyFrame: params.IsKeyFrame,
RawPacket: raw,
VideoLayer: params.VideoLayer,
ExtSequenceNumber: uint32(params.SNCycles<<16) + uint32(params.SequenceNumber),
Arrival: params.ArrivalTime,
Packet: &packet,
KeyFrame: params.IsKeyFrame,
RawPacket: raw,
}
return ep, nil
+10 -1
View File
@@ -59,12 +59,21 @@ func NewRangeMap[RT rangeType, VT valueType](size int) *RangeMap[RT, VT] {
}
}
func (r *RangeMap[RT, VT]) ClearAndResetValue(val VT) {
r.ranges = r.ranges[:0]
r.runningValue = val
}
func (r *RangeMap[RT, VT]) IncValue(inc VT) {
r.runningValue += inc
}
func (r *RangeMap[RT, VT]) DecValue(dec VT) {
r.runningValue -= dec
}
func (r *RangeMap[RT, VT]) AddRange(startInclusive RT, endExclusive RT) error {
if endExclusive-startInclusive > r.halfRange {
if endExclusive == startInclusive || endExclusive-startInclusive > r.halfRange {
return errReversedOrder
}
+7
View File
@@ -56,6 +56,7 @@ func TestRangeMapUint32(t *testing.T) {
value, err = r.GetValue(22)
require.NoError(t, err)
require.Equal(t, uint32(2), value)
// outside range should return 3
value, err = r.GetValue(662)
require.NoError(t, err)
@@ -119,4 +120,10 @@ func TestRangeMapUint32(t *testing.T) {
value, err = r.GetValue(3000)
require.NoError(t, err)
require.Equal(t, uint32(13), value)
// decrement running value
r.DecValue(23)
value, err = r.GetValue(3000)
require.NoError(t, err)
require.Equal(t, uint32((1<<32)-10), value)
}
+24 -14
View File
@@ -49,14 +49,14 @@ func (w *WrapAround[T, ET]) Seed(from *WrapAround[T, ET]) {
w.cycles = from.cycles
}
type wrapAroundUpdateResult[ET extendedNumber] struct {
type WrapAroundUpdateResult[ET extendedNumber] struct {
IsRestart bool
PreExtendedStart ET // valid only if IsRestart = true
PreExtendedHighest ET
ExtendedVal ET
}
func (w *WrapAround[T, ET]) Update(val T) (result wrapAroundUpdateResult[ET]) {
func (w *WrapAround[T, ET]) Update(val T) (result WrapAroundUpdateResult[ET]) {
if !w.initialized {
result.PreExtendedHighest = ET(val) - 1
result.ExtendedVal = ET(val)
@@ -82,10 +82,17 @@ func (w *WrapAround[T, ET]) Update(val T) (result wrapAroundUpdateResult[ET]) {
}
w.highest = val
result.ExtendedVal = ET(w.cycles)*w.fullRange + ET(val)
result.ExtendedVal = w.getExtendedHighest(w.cycles, val)
return
}
func (w *WrapAround[T, ET]) RollbackRestart(ev ET) {
if w.isWrapBack(w.start, T(ev)) {
w.cycles--
}
w.start = T(ev)
}
func (w *WrapAround[T, ET]) ResetHighest(val T) {
w.highest = val
}
@@ -103,14 +110,10 @@ func (w *WrapAround[T, ET]) GetHighest() T {
}
func (w *WrapAround[T, ET]) GetExtendedHighest() ET {
return ET(w.cycles)*w.fullRange + ET(w.highest)
return w.getExtendedHighest(w.cycles, w.highest)
}
func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (isRestart bool, preExtendedStart ET, extendedVal ET) {
isWrapBack := func() bool {
return ET(w.highest) < (w.fullRange>>1) && ET(val) >= (w.fullRange>>1)
}
// re-adjust start if necessary. The conditions are
// 1. Not seen more than half the range yet
// 1. wrap back compared to start and not completed a half cycle, sequences like (10, 65530) in uint16 space
@@ -118,10 +121,10 @@ func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (isRestart bool, preExtended
cycles := w.cycles
totalNum := w.GetExtendedHighest() - w.GetExtendedStart() + 1
if totalNum > (w.fullRange >> 1) {
if isWrapBack() {
if w.isWrapBack(val, w.highest) {
cycles--
}
extendedVal = ET(cycles)*w.fullRange + ET(val)
extendedVal = w.getExtendedHighest(cycles, val)
return
}
@@ -130,17 +133,24 @@ func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (isRestart bool, preExtended
isRestart = true
preExtendedStart = w.GetExtendedStart()
if val > w.highest {
// wrap around
if w.isWrapBack(val, w.highest) {
w.cycles = 1
cycles = 0
}
w.start = val
} else {
if isWrapBack() {
if w.isWrapBack(val, w.highest) {
cycles--
}
}
extendedVal = ET(cycles)*w.fullRange + ET(val)
extendedVal = w.getExtendedHighest(cycles, val)
return
}
func (w *WrapAround[T, ET]) isWrapBack(earlier T, later T) bool {
return ET(later) < (w.fullRange>>1) && ET(earlier) >= (w.fullRange>>1)
}
func (w *WrapAround[T, ET]) getExtendedHighest(cycles int, val T) ET {
return ET(cycles)*w.fullRange + ET(val)
}
+80 -20
View File
@@ -25,7 +25,7 @@ func TestWrapAroundUint16(t *testing.T) {
testCases := []struct {
name string
input uint16
updated wrapAroundUpdateResult[uint32]
updated WrapAroundUpdateResult[uint32]
start uint16
extendedStart uint32
highest uint16
@@ -35,7 +35,7 @@ func TestWrapAroundUint16(t *testing.T) {
{
name: "initialize",
input: 10,
updated: wrapAroundUpdateResult[uint32]{
updated: WrapAroundUpdateResult[uint32]{
IsRestart: false,
PreExtendedStart: 0,
PreExtendedHighest: 9,
@@ -50,7 +50,7 @@ func TestWrapAroundUint16(t *testing.T) {
{
name: "reset start no wrap around",
input: 8,
updated: wrapAroundUpdateResult[uint32]{
updated: WrapAroundUpdateResult[uint32]{
IsRestart: true,
PreExtendedStart: 10,
PreExtendedHighest: 10,
@@ -65,7 +65,7 @@ func TestWrapAroundUint16(t *testing.T) {
{
name: "reset start wrap around",
input: (1 << 16) - 6,
updated: wrapAroundUpdateResult[uint32]{
updated: WrapAroundUpdateResult[uint32]{
IsRestart: true,
PreExtendedStart: 8,
PreExtendedHighest: 10,
@@ -80,7 +80,7 @@ func TestWrapAroundUint16(t *testing.T) {
{
name: "reset start again",
input: (1 << 16) - 12,
updated: wrapAroundUpdateResult[uint32]{
updated: WrapAroundUpdateResult[uint32]{
IsRestart: true,
PreExtendedStart: (1 << 16) - 6,
PreExtendedHighest: (1 << 16) + 10,
@@ -95,7 +95,7 @@ func TestWrapAroundUint16(t *testing.T) {
{
name: "out of order - no restart",
input: (1 << 16) - 3,
updated: wrapAroundUpdateResult[uint32]{
updated: WrapAroundUpdateResult[uint32]{
IsRestart: false,
PreExtendedStart: 0,
PreExtendedHighest: (1 << 16) + 10,
@@ -110,7 +110,7 @@ func TestWrapAroundUint16(t *testing.T) {
{
name: "duplicate",
input: 10,
updated: wrapAroundUpdateResult[uint32]{
updated: WrapAroundUpdateResult[uint32]{
IsRestart: false,
PreExtendedStart: 0,
PreExtendedHighest: (1 << 16) + 10,
@@ -125,7 +125,7 @@ func TestWrapAroundUint16(t *testing.T) {
{
name: "big in-order jump",
input: (1 << 15) - 10,
updated: wrapAroundUpdateResult[uint32]{
updated: WrapAroundUpdateResult[uint32]{
IsRestart: false,
PreExtendedStart: 0,
PreExtendedHighest: (1 << 16) + 10,
@@ -140,7 +140,7 @@ func TestWrapAroundUint16(t *testing.T) {
{
name: "out-of-order after half range",
input: (1 << 15) - 11,
updated: wrapAroundUpdateResult[uint32]{
updated: WrapAroundUpdateResult[uint32]{
IsRestart: false,
PreExtendedStart: 0,
PreExtendedHighest: (1 << 16) + (1 << 15) - 10,
@@ -155,7 +155,7 @@ func TestWrapAroundUint16(t *testing.T) {
{
name: "wrap back out-of-order after half range",
input: (1 << 16) - 1,
updated: wrapAroundUpdateResult[uint32]{
updated: WrapAroundUpdateResult[uint32]{
IsRestart: false,
PreExtendedStart: 0,
PreExtendedHighest: (1 << 16) + (1 << 15) - 10,
@@ -170,7 +170,7 @@ func TestWrapAroundUint16(t *testing.T) {
{
name: "in-order",
input: (1 << 15) + 3,
updated: wrapAroundUpdateResult[uint32]{
updated: WrapAroundUpdateResult[uint32]{
IsRestart: false,
PreExtendedStart: 0,
PreExtendedHighest: (1 << 16) + (1 << 15) - 10,
@@ -194,12 +194,72 @@ func TestWrapAroundUint16(t *testing.T) {
}
}
func TestWrapAroundUint16RollbackRestart(t *testing.T) {
w := NewWrapAround[uint16, uint32]()
// initialize
w.Update(23)
require.Equal(t, uint16(23), w.GetStart())
require.Equal(t, uint32(23), w.GetExtendedStart())
require.Equal(t, uint16(23), w.GetHighest())
require.Equal(t, uint32(23), w.GetExtendedHighest())
// an in-order update
w.Update(25)
require.Equal(t, uint16(23), w.GetStart())
require.Equal(t, uint32(23), w.GetExtendedStart())
require.Equal(t, uint16(25), w.GetHighest())
require.Equal(t, uint32(25), w.GetExtendedHighest())
// force restart without wrap
res := w.Update(12)
expectedResult := WrapAroundUpdateResult[uint32]{
IsRestart: true,
PreExtendedStart: 23,
PreExtendedHighest: 25,
ExtendedVal: 12,
}
require.Equal(t, expectedResult, res)
require.Equal(t, uint16(12), w.GetStart())
require.Equal(t, uint32(12), w.GetExtendedStart())
require.Equal(t, uint16(25), w.GetHighest())
require.Equal(t, uint32(25), w.GetExtendedHighest())
// roll back restart
w.RollbackRestart(res.PreExtendedStart)
require.Equal(t, uint16(23), w.GetStart())
require.Equal(t, uint32(23), w.GetExtendedStart())
require.Equal(t, uint16(25), w.GetHighest())
require.Equal(t, uint32(25), w.GetExtendedHighest())
// force restart with wrap
res = w.Update(65533)
expectedResult = WrapAroundUpdateResult[uint32]{
IsRestart: true,
PreExtendedStart: 23,
PreExtendedHighest: 25,
ExtendedVal: 65533,
}
require.Equal(t, expectedResult, res)
require.Equal(t, uint16(65533), w.GetStart())
require.Equal(t, uint32(65533), w.GetExtendedStart())
require.Equal(t, uint16(25), w.GetHighest())
require.Equal(t, uint32(65536+25), w.GetExtendedHighest())
// roll back restart
w.RollbackRestart(res.PreExtendedStart)
require.Equal(t, uint16(23), w.GetStart())
require.Equal(t, uint32(23), w.GetExtendedStart())
require.Equal(t, uint16(25), w.GetHighest())
require.Equal(t, uint32(25), w.GetExtendedHighest())
}
func TestWrapAroundUint32(t *testing.T) {
w := NewWrapAround[uint32, uint64]()
testCases := []struct {
name string
input uint32
updated wrapAroundUpdateResult[uint64]
updated WrapAroundUpdateResult[uint64]
start uint32
extendedStart uint64
highest uint32
@@ -209,7 +269,7 @@ func TestWrapAroundUint32(t *testing.T) {
{
name: "initialize",
input: 10,
updated: wrapAroundUpdateResult[uint64]{
updated: WrapAroundUpdateResult[uint64]{
IsRestart: false,
PreExtendedStart: 0,
PreExtendedHighest: 9,
@@ -224,7 +284,7 @@ func TestWrapAroundUint32(t *testing.T) {
{
name: "reset start no wrap around",
input: 8,
updated: wrapAroundUpdateResult[uint64]{
updated: WrapAroundUpdateResult[uint64]{
IsRestart: true,
PreExtendedStart: 10,
PreExtendedHighest: 10,
@@ -239,7 +299,7 @@ func TestWrapAroundUint32(t *testing.T) {
{
name: "reset start wrap around",
input: (1 << 32) - 6,
updated: wrapAroundUpdateResult[uint64]{
updated: WrapAroundUpdateResult[uint64]{
IsRestart: true,
PreExtendedStart: 8,
PreExtendedHighest: 10,
@@ -254,7 +314,7 @@ func TestWrapAroundUint32(t *testing.T) {
{
name: "reset start again",
input: (1 << 32) - 12,
updated: wrapAroundUpdateResult[uint64]{
updated: WrapAroundUpdateResult[uint64]{
IsRestart: true,
PreExtendedStart: (1 << 32) - 6,
PreExtendedHighest: (1 << 32) + 10,
@@ -269,7 +329,7 @@ func TestWrapAroundUint32(t *testing.T) {
{
name: "duplicate",
input: 10,
updated: wrapAroundUpdateResult[uint64]{
updated: WrapAroundUpdateResult[uint64]{
IsRestart: false,
PreExtendedStart: 0,
PreExtendedHighest: (1 << 32) + 10,
@@ -284,7 +344,7 @@ func TestWrapAroundUint32(t *testing.T) {
{
name: "big in-order jump",
input: 1 << 31,
updated: wrapAroundUpdateResult[uint64]{
updated: WrapAroundUpdateResult[uint64]{
IsRestart: false,
PreExtendedStart: 0,
PreExtendedHighest: (1 << 32) + 10,
@@ -299,7 +359,7 @@ func TestWrapAroundUint32(t *testing.T) {
{
name: "out-of-order after half range",
input: (1 << 31) - 1,
updated: wrapAroundUpdateResult[uint64]{
updated: WrapAroundUpdateResult[uint64]{
IsRestart: false,
PreExtendedStart: 0,
PreExtendedHighest: (1 << 32) + (1 << 31),
@@ -314,7 +374,7 @@ func TestWrapAroundUint32(t *testing.T) {
{
name: "in-order",
input: (1 << 31) + 3,
updated: wrapAroundUpdateResult[uint64]{
updated: WrapAroundUpdateResult[uint64]{
IsRestart: false,
PreExtendedStart: 0,
PreExtendedHighest: (1 << 32) + (1 << 31),