From 7ed3af193a3f1ec41e558f70dfa75e1a95d3d248 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 6 Jun 2023 11:28:13 +0530 Subject: [PATCH] No proof that this helps (#1772) --- pkg/config/config.go | 3 - pkg/rtc/mediatracksubscriptions.go | 1 - pkg/rtc/participant.go | 5 - pkg/rtc/types/interfaces.go | 2 - .../typesfakes/fake_local_participant.go | 65 -------- pkg/service/roommanager.go | 6 - pkg/sfu/buffer/rtpstats.go | 153 ++---------------- pkg/sfu/downtrack.go | 30 ++-- pkg/sfu/forwarder.go | 7 - pkg/sfu/rtpmunger.go | 46 +----- pkg/sfu/rtpmunger_test.go | 3 - 11 files changed, 26 insertions(+), 295 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index eded926ca..325eec68c 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -89,9 +89,6 @@ type RTCConfig struct { // force a reconnect on a subscription error ReconnectOnSubscriptionError *bool `yaml:"reconnect_on_subscription_error,omitempty"` - - // allow time stamp adjust to keep drift low, this is experimental - AllowTimestampAdjustment *bool `yaml:"allow_timestamp_adjustment,omitempty"` } type TURNServer struct { diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index 94b6f2b88..007428f50 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -104,7 +104,6 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr * sub.GetBufferFactory(), subscriberID, t.params.ReceiverConfig.PacketBufferSize, - sub.GetAllowTimestampAdjustment(), LoggerWithTrack(sub.GetLogger(), trackID, t.params.IsRelayed), ) if err != nil { diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index ecdfd2082..5c3cdef26 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -93,7 +93,6 @@ type ParticipantParams struct { SubscriberAllowPause bool SubscriptionLimitAudio int32 SubscriptionLimitVideo int32 - AllowTimestampAdjustment bool } type ParticipantImpl struct { @@ -233,10 +232,6 @@ func (p *ParticipantImpl) GetAdaptiveStream() bool { return p.params.AdaptiveStream } -func (p *ParticipantImpl) GetAllowTimestampAdjustment() bool { - return p.params.AllowTimestampAdjustment -} - func (p *ParticipantImpl) ID() livekit.ParticipantID { return p.params.SID } diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index c7c963372..2501d7013 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -341,8 +341,6 @@ type LocalParticipant interface { // down stream bandwidth management SetSubscriberAllowPause(allowPause bool) SetSubscriberChannelCapacity(channelCapacity int64) - - GetAllowTimestampAdjustment() bool } // Room is a container of participants, and can provide room-level actions diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 3fbdcbc7d..e4ddfea5f 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -165,16 +165,6 @@ type FakeLocalParticipant struct { getAdaptiveStreamReturnsOnCall map[int]struct { result1 bool } - GetAllowTimestampAdjustmentStub func() bool - getAllowTimestampAdjustmentMutex sync.RWMutex - getAllowTimestampAdjustmentArgsForCall []struct { - } - getAllowTimestampAdjustmentReturns struct { - result1 bool - } - getAllowTimestampAdjustmentReturnsOnCall map[int]struct { - result1 bool - } GetAudioLevelStub func() (float64, bool) getAudioLevelMutex sync.RWMutex getAudioLevelArgsForCall []struct { @@ -1622,59 +1612,6 @@ func (fake *FakeLocalParticipant) GetAdaptiveStreamReturnsOnCall(i int, result1 }{result1} } -func (fake *FakeLocalParticipant) GetAllowTimestampAdjustment() bool { - fake.getAllowTimestampAdjustmentMutex.Lock() - ret, specificReturn := fake.getAllowTimestampAdjustmentReturnsOnCall[len(fake.getAllowTimestampAdjustmentArgsForCall)] - fake.getAllowTimestampAdjustmentArgsForCall = append(fake.getAllowTimestampAdjustmentArgsForCall, struct { - }{}) - stub := fake.GetAllowTimestampAdjustmentStub - fakeReturns := fake.getAllowTimestampAdjustmentReturns - fake.recordInvocation("GetAllowTimestampAdjustment", []interface{}{}) - fake.getAllowTimestampAdjustmentMutex.Unlock() - if stub != nil { - return stub() - } - if specificReturn { - return ret.result1 - } - return fakeReturns.result1 -} - -func (fake *FakeLocalParticipant) GetAllowTimestampAdjustmentCallCount() int { - fake.getAllowTimestampAdjustmentMutex.RLock() - defer fake.getAllowTimestampAdjustmentMutex.RUnlock() - return len(fake.getAllowTimestampAdjustmentArgsForCall) -} - -func (fake *FakeLocalParticipant) GetAllowTimestampAdjustmentCalls(stub func() bool) { - fake.getAllowTimestampAdjustmentMutex.Lock() - defer fake.getAllowTimestampAdjustmentMutex.Unlock() - fake.GetAllowTimestampAdjustmentStub = stub -} - -func (fake *FakeLocalParticipant) GetAllowTimestampAdjustmentReturns(result1 bool) { - fake.getAllowTimestampAdjustmentMutex.Lock() - defer fake.getAllowTimestampAdjustmentMutex.Unlock() - fake.GetAllowTimestampAdjustmentStub = nil - fake.getAllowTimestampAdjustmentReturns = struct { - result1 bool - }{result1} -} - -func (fake *FakeLocalParticipant) GetAllowTimestampAdjustmentReturnsOnCall(i int, result1 bool) { - fake.getAllowTimestampAdjustmentMutex.Lock() - defer fake.getAllowTimestampAdjustmentMutex.Unlock() - fake.GetAllowTimestampAdjustmentStub = nil - if fake.getAllowTimestampAdjustmentReturnsOnCall == nil { - fake.getAllowTimestampAdjustmentReturnsOnCall = make(map[int]struct { - result1 bool - }) - } - fake.getAllowTimestampAdjustmentReturnsOnCall[i] = struct { - result1 bool - }{result1} -} - func (fake *FakeLocalParticipant) GetAudioLevel() (float64, bool) { fake.getAudioLevelMutex.Lock() ret, specificReturn := fake.getAudioLevelReturnsOnCall[len(fake.getAudioLevelArgsForCall)] @@ -5519,8 +5456,6 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.debugInfoMutex.RUnlock() fake.getAdaptiveStreamMutex.RLock() defer fake.getAdaptiveStreamMutex.RUnlock() - fake.getAllowTimestampAdjustmentMutex.RLock() - defer fake.getAllowTimestampAdjustmentMutex.RUnlock() fake.getAudioLevelMutex.RLock() defer fake.getAudioLevelMutex.RUnlock() fake.getBufferFactoryMutex.RLock() diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 061446b00..056b1b976 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -310,11 +310,6 @@ func (r *RoomManager) StartSession( if pi.SubscriberAllowPause != nil { subscriberAllowPause = *pi.SubscriberAllowPause } - // default do not allow timestamp adjustment - allowTimestampAdjustment := false - if r.config.RTC.AllowTimestampAdjustment != nil { - allowTimestampAdjustment = *r.config.RTC.AllowTimestampAdjustment - } participant, err = rtc.NewParticipant(rtc.ParticipantParams{ Identity: pi.Identity, Name: pi.Name, @@ -349,7 +344,6 @@ func (r *RoomManager) StartSession( SubscriberAllowPause: subscriberAllowPause, SubscriptionLimitAudio: r.config.Limit.SubscriptionLimitAudio, SubscriptionLimitVideo: r.config.Limit.SubscriptionLimitVideo, - AllowTimestampAdjustment: allowTimestampAdjustment, }) if err != nil { return err diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index 51c18c4d7..0c1952a69 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -17,12 +17,11 @@ import ( ) const ( - GapHistogramNumBins = 101 - NumSequenceNumbers = 65536 - FirstSnapshotId = 1 - SnInfoSize = 8192 - SnInfoMask = SnInfoSize - 1 - MeasurementWindowSecondsMin = float64(5.0) + GapHistogramNumBins = 101 + NumSequenceNumbers = 65536 + FirstSnapshotId = 1 + SnInfoSize = 8192 + SnInfoMask = SnInfoSize - 1 ) // ------------------------------------------------------- @@ -201,27 +200,17 @@ type RTPStats struct { srFirst *RTCPSenderReportData srNewest *RTCPSenderReportData - pidController *PIDController - nextSnapshotId uint32 snapshots map[uint32]*Snapshot } func NewRTPStats(params RTPStatsParams) *RTPStats { - r := &RTPStats{ + return &RTPStats{ params: params, logger: params.Logger, nextSnapshotId: FirstSnapshotId, snapshots: make(map[uint32]*Snapshot), - pidController: NewPIDController(params.Logger), } - - r.pidController.SetGains(2.0, 0.5, 0.25) - r.pidController.SetDerivativeLPF(0.02) - outMin, outMax := -0.025*float64(r.params.ClockRate), 0.025*float64(r.params.ClockRate) - r.pidController.SetOutputLimits(outMin, outMax) - r.pidController.SetIntegralLimits(outMin/2.0, outMax/2.0) - return r } func (r *RTPStats) Seed(from *RTPStats) { @@ -324,7 +313,6 @@ func (r *RTPStats) Seed(from *RTPStats) { func (r *RTPStats) SetLogger(logger logger.Logger) { r.logger = logger - r.pidController.SetLogger(logger) } func (r *RTPStats) Stop() { @@ -897,12 +885,12 @@ func (r *RTPStats) GetExpectedRTPTimestamp(at time.Time) (uint32, uint64, error) return uint32(expectedExtRTP), minTS, nil } -func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, srFirst *RTCPSenderReportData, srNewest *RTCPSenderReportData) (*rtcp.SenderReport, float64) { +func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, srFirst *RTCPSenderReportData, srNewest *RTCPSenderReportData) *rtcp.SenderReport { r.lock.Lock() defer r.lock.Unlock() if !r.initialized { - return nil, 0.0 + return nil } // construct current time based on monotonic clock @@ -933,17 +921,6 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, srFirst *RTCPSenderReportDat } } - pidOutput := float64(0.0) - if timeSinceFirst.Seconds() > MeasurementWindowSecondsMin { - rtpDiffSinceFirst := nowRTPExt - r.extStartTS - rate := float64(rtpDiffSinceFirst) / timeSinceFirst.Seconds() - pidOutput = r.pidController.Update( - float64(r.params.ClockRate), - rate, - now, - ) - } - // monitor and log RTP timestamp anomalies var ntpDiffSinceLast time.Duration var rtpDiffSinceLast uint32 @@ -1012,7 +989,7 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, srFirst *RTCPSenderReportDat RTPTime: nowRTP, PacketCount: r.getTotalPacketsPrimary() + r.packetsDuplicate + r.packetsPadding, OctetCount: uint32(r.bytes + r.bytesDuplicate + r.bytesPadding), - }, pidOutput + } } func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8, snapshotId uint32) *rtcp.ReceptionReport { @@ -1922,115 +1899,3 @@ func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo { } // ------------------------------------------------------------------- - -type PIDController struct { - logger logger.Logger - - kp, ki, kd float64 - - tau float64 // low pass filter of D, time constant - - outMin, outMax float64 - isOutLimitsSet bool - - iMin, iMax float64 - isILimitsSet bool - - iVal, dVal float64 - - prevError, prevMeasurement float64 - prevMeasurementTime time.Time -} - -func NewPIDController(logger logger.Logger) *PIDController { - return &PIDController{ - logger: logger, - } -} - -func (p *PIDController) SetLogger(logger logger.Logger) { - p.logger = logger -} - -func (p *PIDController) SetGains(kp, ki, kd float64) { - p.kp = kp - p.ki = ki - p.kd = kd -} - -func (p *PIDController) SetDerivativeLPF(tau float64) { - p.tau = tau -} - -func (p *PIDController) SetOutputLimits(min, max float64) { - p.outMin = min - p.outMax = max - p.isOutLimitsSet = true -} - -func (p *PIDController) SetIntegralLimits(min, max float64) { - p.iMin = min - p.iMax = max - p.isILimitsSet = true -} - -func (p *PIDController) Update(setpoint, measurement float64, at time.Time) float64 { - errorTerm := setpoint - measurement - if p.prevMeasurementTime.IsZero() { - p.prevError = errorTerm - p.prevMeasurement = measurement - p.prevMeasurementTime = at - return 0 - } - - duration := at.Sub(p.prevMeasurementTime).Seconds() - if duration == 0 { - return 0 - } - - proportional := p.kp * errorTerm - - iVal := p.iVal + (0.5 * p.ki * duration * (errorTerm + p.prevError)) - boundIVal := iVal - if p.isILimitsSet { - if iVal > p.iMax { - boundIVal = p.iMax - } - if iVal < p.iMin { - boundIVal = p.iMin - } - } - p.iVal = boundIVal - - p.dVal = -(2.0*p.kd*(measurement-p.prevMeasurement) + (2.0*p.tau-duration)*p.dVal) / (2.0*p.tau + duration) - - output := proportional + p.iVal + p.dVal - boundOutput := output - if p.isOutLimitsSet { - if output > p.outMax { - boundOutput = p.outMax - } - if output < p.outMin { - boundOutput = p.outMin - } - } - - p.prevError = errorTerm - p.prevMeasurement = measurement - p.prevMeasurementTime = at - p.logger.Debugw( - "pid controller", - "setpoint", setpoint, - "measurement", measurement, - "errorTerm", errorTerm, - "proportional", proportional, - "integral", iVal, - "integralLimited", boundIVal, - "derivative", p.dVal, - "output", output, - "outputLimited", boundOutput, - ) - return boundOutput -} - -// ------------------------------------------------------------------- diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index b90d8ebb5..d8850ff64 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -185,8 +185,6 @@ type DownTrack struct { sequencer *sequencer bufferFactory *buffer.Factory - allowTimestampAdjustment bool - forwarder *Forwarder upstreamCodecs []webrtc.RTPCodecParameters @@ -254,7 +252,6 @@ func NewDownTrack( bf *buffer.Factory, subID livekit.ParticipantID, mt int, - allowTimestampAdjustment bool, logger logger.Logger, ) (*DownTrack, error) { var kind webrtc.RTPCodecType @@ -268,17 +265,16 @@ func NewDownTrack( } d := &DownTrack{ - logger: logger, - id: r.TrackID(), - subscriberID: subID, - maxTrack: mt, - streamID: r.StreamID(), - bufferFactory: bf, - allowTimestampAdjustment: allowTimestampAdjustment, - receiver: r, - upstreamCodecs: codecs, - kind: kind, - codec: codecs[0].RTPCodecCapability, + logger: logger, + id: r.TrackID(), + subscriberID: subID, + maxTrack: mt, + streamID: r.StreamID(), + bufferFactory: bf, + receiver: r, + upstreamCodecs: codecs, + kind: kind, + codec: codecs[0].RTPCodecCapability, } d.forwarder = NewForwarder( d.kind, @@ -1123,11 +1119,7 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport { } srFirst, srNewest := d.receiver.GetRTCPSenderReportData(d.forwarder.GetReferenceLayerSpatial()) - sr, tsAdjust := d.rtpStats.GetRtcpSenderReport(d.ssrc, srFirst, srNewest) - if d.allowTimestampAdjustment { - d.forwarder.AdjustTimestamp(tsAdjust) - } - return sr + return d.rtpStats.GetRtcpSenderReport(d.ssrc, srFirst, srNewest) } func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan struct{} { diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 32f8f2f30..db2127065 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -1706,13 +1706,6 @@ func (f *Forwarder) GetRTPMungerParams() RTPMungerParams { return f.rtpMunger.GetParams() } -func (f *Forwarder) AdjustTimestamp(tsAdjust float64) { - f.lock.Lock() - defer f.lock.Unlock() - - f.rtpMunger.UpdateTsOffset(uint32(tsAdjust + 0.5)) -} - // ----------------------------------------------------------------------------- func getOptimalBandwidthNeeded(muted bool, pubMuted bool, maxPublishedLayer int32, brs Bitrates, maxLayer buffer.VideoLayer) int64 { diff --git a/pkg/sfu/rtpmunger.go b/pkg/sfu/rtpmunger.go index a80ad6000..452019507 100644 --- a/pkg/sfu/rtpmunger.go +++ b/pkg/sfu/rtpmunger.go @@ -52,14 +52,12 @@ func (r RTPMungerState) String() string { // ---------------------------------------------------------------------- type RTPMungerParams struct { - highestIncomingSN uint16 - lastSN uint16 - snOffset uint16 - highestIncomingTS uint32 - lastTS uint32 - tsOffset uint32 - tsOffsetAdjustment uint32 - lastMarker bool + highestIncomingSN uint16 + lastSN uint16 + snOffset uint16 + lastTS uint32 + tsOffset uint32 + lastMarker bool snOffsets [SnOffsetCacheSize]uint16 snOffsetsWritePtr int @@ -86,7 +84,6 @@ func (r *RTPMunger) GetParams() RTPMungerParams { highestIncomingSN: r.highestIncomingSN, lastSN: r.lastSN, snOffset: r.snOffset, - highestIncomingTS: r.highestIncomingTS, lastTS: r.lastTS, tsOffset: r.tsOffset, lastMarker: r.lastMarker, @@ -107,14 +104,12 @@ func (r *RTPMunger) SeedLast(state RTPMungerState) { func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) { r.highestIncomingSN = extPkt.Packet.SequenceNumber - 1 - r.highestIncomingTS = extPkt.Packet.Timestamp r.lastSN = extPkt.Packet.SequenceNumber r.lastTS = extPkt.Packet.Timestamp } func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint16, tsAdjust uint32) { r.highestIncomingSN = extPkt.Packet.SequenceNumber - 1 - r.highestIncomingTS = extPkt.Packet.Timestamp r.snOffset = extPkt.Packet.SequenceNumber - r.lastSN - snAdjust r.tsOffset = extPkt.Packet.Timestamp - r.lastTS - tsAdjust @@ -131,10 +126,6 @@ func (r *RTPMunger) PacketDropped(extPkt *buffer.ExtPacket) { r.lastSN = extPkt.Packet.SequenceNumber - r.snOffset } -func (r *RTPMunger) UpdateTsOffset(tsAdjust uint32) { - r.tsOffsetAdjustment = tsAdjust -} - func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationParamsRTP, error) { // if out-of-order, look up sequence number offset cache diff := extPkt.Packet.SequenceNumber - r.highestIncomingSN @@ -188,12 +179,6 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara } } - // apply timestamp offset adjustment at the start of a frame only - if extPkt.Packet.Timestamp != r.highestIncomingTS && r.tsOffsetAdjustment != 0 { - r.tsOffset -= r.tsOffsetAdjustment - r.tsOffsetAdjustment = 0 - } - // in-order incoming packet, may or may not be contiguous. // In the case of loss (i.e. incoming sequence number is not contiguous), // forward even if it is a padding only packet. With temporal scalability, @@ -203,27 +188,8 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara mungedSN := extPkt.Packet.SequenceNumber - r.snOffset mungedTS := extPkt.Packet.Timestamp - r.tsOffset - // with timestamp adjustment, it is possible that the adjustment causes munged timestamp to move backwards, - // detect that and adjust so that it does not move back - if extPkt.Packet.Timestamp != r.highestIncomingTS && (((mungedTS - r.lastTS) == 0) || (mungedTS-r.lastTS) > (1<<31)) { - adjustedMungedTS := r.lastTS + 1 - adjustedTSOffset := extPkt.Packet.Timestamp - adjustedMungedTS - r.logger.Debugw( - "adjust out-of-order timestamp offset", - "mungedTS", mungedTS, - "lastTS", r.lastTS, - "incomingTS", extPkt.Packet.Timestamp, - "offset", r.tsOffset, - "adjustedMungedTS", adjustedMungedTS, - "adjustedTSOffset", adjustedTSOffset, - ) - mungedTS = adjustedMungedTS - r.tsOffset = adjustedTSOffset - } - r.highestIncomingSN = extPkt.Packet.SequenceNumber r.lastSN = mungedSN - r.highestIncomingTS = extPkt.Packet.Timestamp r.lastTS = mungedTS r.lastMarker = extPkt.Packet.Marker diff --git a/pkg/sfu/rtpmunger_test.go b/pkg/sfu/rtpmunger_test.go index 68ea60716..b4d764ca3 100644 --- a/pkg/sfu/rtpmunger_test.go +++ b/pkg/sfu/rtpmunger_test.go @@ -28,7 +28,6 @@ func TestSetLastSnTs(t *testing.T) { r.SetLastSnTs(extPkt) require.Equal(t, uint16(23332), r.highestIncomingSN) - require.Equal(t, uint32(0xabcdef), r.highestIncomingTS) require.Equal(t, uint16(23333), r.lastSN) require.Equal(t, uint32(0xabcdef), r.lastTS) require.Equal(t, uint16(0), r.snOffset) @@ -54,7 +53,6 @@ func TestUpdateSnTsOffsets(t *testing.T) { extPkt, _ = testutils.GetTestExtPacket(params) r.UpdateSnTsOffsets(extPkt, 1, 1) require.Equal(t, uint16(33332), r.highestIncomingSN) - require.Equal(t, uint32(0xabcdef), r.highestIncomingTS) require.Equal(t, uint16(23333), r.lastSN) require.Equal(t, uint32(0xabcdef), r.lastTS) require.Equal(t, uint16(9999), r.snOffset) @@ -73,7 +71,6 @@ func TestPacketDropped(t *testing.T) { extPkt, _ := testutils.GetTestExtPacket(params) r.SetLastSnTs(extPkt) require.Equal(t, uint16(23332), r.highestIncomingSN) - require.Equal(t, uint32(0xabcdef), r.highestIncomingTS) require.Equal(t, uint16(23333), r.lastSN) require.Equal(t, uint32(0xabcdef), r.lastTS) require.Equal(t, uint16(0), r.snOffset)