From 9afb0873aec89eacdc322efc826bb840f31681e8 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 30 Aug 2023 19:43:24 +0530 Subject: [PATCH 1/5] Do not process packets not processed by RTPStats. (#2015) Seeing the case of a stream starting with padding packets on migration. As publisher in that case is always sending packets. it is possible that the new node gets padding packets at the start. Processing them in buffer leads to trying to drop that padding packet and adding an exclusion range. That fails because the extended sequence number is not available for unprocessed packets. It is okay to drop them as they will be dropped anyway. But, they are useful for bandwidth estimation. So, headers are processed even if the packet is RTPStats unprocessed. --- pkg/sfu/buffer/buffer.go | 7 ++++++- pkg/sfu/buffer/rtpstats.go | 4 ++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 93471804e..a08d75d5a 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -416,7 +416,12 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) { } flowState := b.updateStreamState(&rtpPacket, arrivalTime) + // process header extensions always as padding packets could be used for probing b.processHeaderExtensions(&rtpPacket, arrivalTime) + if flowState.IsNotHandled { + return + } + if len(rtpPacket.Payload) == 0 && (!flowState.IsOutOfOrder || flowState.IsDuplicate) { // drop padding only in-order or duplicate packet if !flowState.IsOutOfOrder { @@ -436,7 +441,7 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) { // 44 - padding only - out-of-order + duplicate - dropped as duplicate // if err := b.snRangeMap.ExcludeRange(flowState.ExtSequenceNumber, flowState.ExtSequenceNumber+1); err != nil { - b.logger.Errorw("could not exclude range", err, "sn", flowState.ExtSequenceNumber) + b.logger.Errorw("could not exclude range", err, "sn", rtpPacket.SequenceNumber, "esn", flowState.ExtSequenceNumber) } } return diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index d9f58ba67..16dd309c8 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -58,6 +58,8 @@ func RTPDriftToString(r *livekit.RTPDrift) string { // ------------------------------------------------------- type RTPFlowState struct { + IsNotHandled bool + HasLoss bool LossStartInclusive uint64 LossEndExclusive uint64 @@ -365,6 +367,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa defer r.lock.Unlock() if !r.endTime.IsZero() { + flowState.IsNotHandled = true return } @@ -373,6 +376,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa if !r.initialized { if payloadSize == 0 { // do not start on a padding only packet + flowState.IsNotHandled = true return } From f9b613be41ad48784773f203f1b282c79b33d67f Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 30 Aug 2023 22:18:27 +0530 Subject: [PATCH 2/5] Log resync data (#2016) --- pkg/sfu/buffer/rtpstats.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index 16dd309c8..978604f97 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -525,6 +525,7 @@ func (r *RTPStats) Resync(esn uint64, ets uint64, at time.Time) { r.sequenceNumber.ResetHighest(esn - 1) r.timestamp.ResetHighest(ets) r.highestTime = at + r.logger.Debugw("resync", "extSequenceNumber", esn, "extTimestamp", ets, "at", at) } func (r *RTPStats) getPacketsExpected() uint64 { From da52167cd967d02407ff3e0d405b4259e6f5e9cc Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 30 Aug 2023 23:08:53 +0530 Subject: [PATCH 3/5] Adjust extended sequence number to account for dropped packets (#2017) --- pkg/sfu/buffer/buffer.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index a08d75d5a..b94cb0265 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -453,7 +453,8 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) { b.logger.Errorw("could not get sequence number adjustment", err, "sn", flowState.ExtSequenceNumber, "payloadSize", len(rtpPacket.Payload)) return } - rtpPacket.Header.SequenceNumber = uint16(flowState.ExtSequenceNumber - snAdjustment) + flowState.ExtSequenceNumber -= snAdjustment + rtpPacket.Header.SequenceNumber = uint16(flowState.ExtSequenceNumber) _, err = b.bucket.AddPacketWithSequenceNumber(pkt, rtpPacket.Header.SequenceNumber) if err != nil { if err != bucket.ErrRTXPacket { From 6e3a20ebf4571ca20f603768363a4712d0cd548e Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 31 Aug 2023 00:33:00 +0530 Subject: [PATCH 4/5] Temporary packet debug (#2018) --- pkg/sfu/buffer/buffer.go | 5 +++++ pkg/sfu/downtrack.go | 6 +++++- pkg/sfu/sequencer.go | 5 +++-- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index b94cb0265..3401d603a 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -440,9 +440,12 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) { // 45 - regular packet - offset = 2 (running offset) - passed through with adjusted sequence number as 43 // 44 - padding only - out-of-order + duplicate - dropped as duplicate // + b.logger.Debugw("PDBG dropping padding", "sn", rtpPacket.Header.SequenceNumber, "esn", flowState.ExtSequenceNumber) // REMOVE if err := b.snRangeMap.ExcludeRange(flowState.ExtSequenceNumber, flowState.ExtSequenceNumber+1); err != nil { b.logger.Errorw("could not exclude range", err, "sn", rtpPacket.SequenceNumber, "esn", flowState.ExtSequenceNumber) } + } else { + b.logger.Debugw("PDBG dropping duplicate padding", "sn", rtpPacket.Header.SequenceNumber, "esn", flowState.ExtSequenceNumber) // REMOVE } return } @@ -453,6 +456,7 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) { b.logger.Errorw("could not get sequence number adjustment", err, "sn", flowState.ExtSequenceNumber, "payloadSize", len(rtpPacket.Payload)) return } + b.logger.Debugw("PDBG incoming packet", "sn", rtpPacket.Header.SequenceNumber, "esn", flowState.ExtSequenceNumber, "adjust", snAdjustment, "size", len(rtpPacket.Payload)) // REMOVE flowState.ExtSequenceNumber -= snAdjustment rtpPacket.Header.SequenceNumber = uint16(flowState.ExtSequenceNumber) _, err = b.bucket.AddPacketWithSequenceNumber(pkt, rtpPacket.Header.SequenceNumber) @@ -494,6 +498,7 @@ func (b *Buffer) patchExtPacket(ep *ExtPacket, buf []byte) *ExtPacket { } pkt.Payload = buf[payloadStart:payloadEnd] ep.Packet = &pkt + b.logger.Debugw("PDBG forwarding packet", "sn", pkt.Header.SequenceNumber, "esn", ep.ExtSequenceNumber, "size", len(ep.Packet.Payload)) // REMOVE return ep } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 44414d7c8..5a3328d26 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -684,7 +684,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { } } if d.sequencer != nil { - d.sequencer.push( + isPushed := d.sequencer.push( extPkt.Packet.SequenceNumber, tp.rtp.sequenceNumber, tp.rtp.timestamp, @@ -693,8 +693,12 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { tp.codecBytes, tp.ddBytes, ) + if !isPushed { + d.params.Logger.Debugw("PDBG sequencer push failed", "isn", extPkt.Packet.SequenceNumber, "iesn", extPkt.ExtSequenceNumber, "layer", layer, "osn", tp.rtp.sequenceNumber, "size", len(payload)) // REMOVE + } } + d.params.Logger.Debugw("PDBG forwarding packet", "isn", extPkt.Packet.SequenceNumber, "iesn", extPkt.ExtSequenceNumber, "layer", layer, "osn", tp.rtp.sequenceNumber, "size", len(payload)) // REMOVE d.pacer.Enqueue(pacer.Packet{ Header: hdr, Extensions: extensions, diff --git a/pkg/sfu/sequencer.go b/pkg/sfu/sequencer.go index a2699427b..e5ed8299a 100644 --- a/pkg/sfu/sequencer.go +++ b/pkg/sfu/sequencer.go @@ -116,13 +116,13 @@ func (s *sequencer) push( layer int8, codecBytes []byte, ddBytes []byte, -) { +) bool { s.Lock() defer s.Unlock() slot, isValid := s.getSlot(offSn) if !isValid { - return + return false } s.meta[s.metaWritePtr] = packetMeta{ @@ -142,6 +142,7 @@ func (s *sequencer) push( if s.metaWritePtr >= len(s.meta) { s.metaWritePtr -= len(s.meta) } + return true } func (s *sequencer) pushPadding(offSn uint16) { From 790954bbe91ec3b0a60e500db3275334852c9781 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 31 Aug 2023 11:45:42 +0530 Subject: [PATCH 5/5] Use RTCP SR to resync. (#2021) Remove packet debug code that was added temporarily. --- pkg/rtc/mediatrack.go | 2 +- pkg/sfu/buffer/buffer.go | 21 ++++--- pkg/sfu/buffer/rtpstats.go | 115 ++++++++++++++++++++++++++++++------- pkg/sfu/downtrack.go | 6 +- pkg/sfu/sequencer.go | 5 +- 5 files changed, 108 insertions(+), 41 deletions(-) diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 72f0a6f3a..365b7434a 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -228,7 +228,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra case *rtcp.SourceDescription: // do nothing for now case *rtcp.SenderReport: - buff.SetSenderReportData(pkt.RTPTime, pkt.NTPTime) + buff.SetSenderReportData(pkt.RTPTime, pkt.NTPTime, pkt.PacketCount) } } }) diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 3401d603a..aaeb08925 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -83,7 +83,8 @@ type Buffer struct { closed atomic.Bool mime string - snRangeMap *utils.RangeMap[uint64, uint64] + snRangeMap *utils.RangeMap[uint64, uint64] + paddingOnlyDrops uint64 latestTSForAudioLevelInitialized bool latestTSForAudioLevel uint32 @@ -440,12 +441,10 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) { // 45 - regular packet - offset = 2 (running offset) - passed through with adjusted sequence number as 43 // 44 - padding only - out-of-order + duplicate - dropped as duplicate // - b.logger.Debugw("PDBG dropping padding", "sn", rtpPacket.Header.SequenceNumber, "esn", flowState.ExtSequenceNumber) // REMOVE if err := b.snRangeMap.ExcludeRange(flowState.ExtSequenceNumber, flowState.ExtSequenceNumber+1); err != nil { b.logger.Errorw("could not exclude range", err, "sn", rtpPacket.SequenceNumber, "esn", flowState.ExtSequenceNumber) } - } else { - b.logger.Debugw("PDBG dropping duplicate padding", "sn", rtpPacket.Header.SequenceNumber, "esn", flowState.ExtSequenceNumber) // REMOVE + b.paddingOnlyDrops++ } return } @@ -456,7 +455,6 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) { b.logger.Errorw("could not get sequence number adjustment", err, "sn", flowState.ExtSequenceNumber, "payloadSize", len(rtpPacket.Payload)) return } - b.logger.Debugw("PDBG incoming packet", "sn", rtpPacket.Header.SequenceNumber, "esn", flowState.ExtSequenceNumber, "adjust", snAdjustment, "size", len(rtpPacket.Payload)) // REMOVE flowState.ExtSequenceNumber -= snAdjustment rtpPacket.Header.SequenceNumber = uint16(flowState.ExtSequenceNumber) _, err = b.bucket.AddPacketWithSequenceNumber(pkt, rtpPacket.Header.SequenceNumber) @@ -498,7 +496,6 @@ func (b *Buffer) patchExtPacket(ep *ExtPacket, buf []byte) *ExtPacket { } pkt.Payload = buf[payloadStart:payloadEnd] ep.Packet = &pkt - b.logger.Debugw("PDBG forwarding packet", "sn", pkt.Header.SequenceNumber, "esn", ep.ExtSequenceNumber, "size", len(ep.Packet.Payload)) // REMOVE return ep } @@ -698,14 +695,16 @@ func (b *Buffer) buildReceptionReport() *rtcp.ReceptionReport { return b.rtpStats.SnapshotRtcpReceptionReport(b.mediaSSRC, b.lastFractionLostToReport, b.rrSnapshotId) } -func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) { +func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64, packetCount uint32) { + b.RLock() srData := &RTCPSenderReportData{ - RTPTimestamp: rtpTime, - NTPTimestamp: mediatransportutil.NtpTime(ntpTime), - At: time.Now(), + RTPTimestamp: rtpTime, + NTPTimestamp: mediatransportutil.NtpTime(ntpTime), + PacketCount: packetCount, + PaddingOnlyDrops: b.paddingOnlyDrops, + At: time.Now(), } - b.RLock() if b.rtpStats != nil { b.rtpStats.SetRtcpSenderReportData(srData) } diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index 978604f97..357b50902 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -131,10 +131,13 @@ type SnInfo struct { } type RTCPSenderReportData struct { - RTPTimestamp uint32 - RTPTimestampExt uint64 - NTPTimestamp mediatransportutil.NtpTime - At time.Time + RTPTimestamp uint32 + RTPTimestampExt uint64 + NTPTimestamp mediatransportutil.NtpTime + PacketCount uint32 + PacketCountExt uint64 + PaddingOnlyDrops uint64 + At time.Time } type RTPStatsParams struct { @@ -149,7 +152,9 @@ type RTPStats struct { lock sync.RWMutex - initialized bool + initialized bool + resyncOnNextPacket bool + shouldDiscountPaddingOnlyDrops bool startTime time.Time endTime time.Time @@ -241,6 +246,8 @@ func (r *RTPStats) Seed(from *RTPStats) { } r.initialized = from.initialized + r.resyncOnNextPacket = from.resyncOnNextPacket + r.shouldDiscountPaddingOnlyDrops = from.shouldDiscountPaddingOnlyDrops r.startTime = from.startTime // do not clone endTime as a non-zero endTime indicates an ended object @@ -371,6 +378,75 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa return } + if r.resyncOnNextPacket { + r.resyncOnNextPacket = false + + if r.initialized { + extHighestSN := r.sequenceNumber.GetExtendedHighest() + var newestPacketCount uint64 + var paddingOnlyDrops uint64 + var extExpectedHighestSN uint64 + var expectedHighestSN uint16 + var snCycles uint64 + + extHighestTS := r.timestamp.GetExtendedHighest() + var newestTS uint64 + var extExpectedHighestTS uint64 + var expectedHighestTS uint32 + var tsCycles uint64 + if r.srNewest != nil { + newestPacketCount = r.srNewest.PacketCountExt + paddingOnlyDrops = r.srNewest.PaddingOnlyDrops + if newestPacketCount != 0 { + extExpectedHighestSN = r.sequenceNumber.GetExtendedStart() + newestPacketCount + if r.shouldDiscountPaddingOnlyDrops { + extExpectedHighestSN -= paddingOnlyDrops + } + expectedHighestSN = uint16(extExpectedHighestSN & 0xFFFF) + snCycles = extExpectedHighestSN & 0xFFFF_FFFF_FFFF_0000 + if rtph.SequenceNumber-expectedHighestSN < (1<<15) && rtph.SequenceNumber < expectedHighestSN { + snCycles += (1 << 16) + } + if snCycles != 0 && expectedHighestSN-rtph.SequenceNumber < (1<<15) && expectedHighestSN < rtph.SequenceNumber { + snCycles -= (1 << 16) + } + } + + newestTS = r.srNewest.RTPTimestampExt + extExpectedHighestTS = newestTS + expectedHighestTS = uint32(extExpectedHighestTS & 0xFFFF_FFFF) + tsCycles = extExpectedHighestTS & 0xFFFF_FFFF_0000_0000 + if rtph.Timestamp-expectedHighestTS < (1<<31) && rtph.Timestamp < expectedHighestTS { + tsCycles += (1 << 32) + } + if tsCycles != 0 && expectedHighestTS-rtph.Timestamp < (1<<31) && expectedHighestTS < rtph.Timestamp { + tsCycles -= (1 << 32) + } + } + r.sequenceNumber.ResetHighest(snCycles + uint64(rtph.SequenceNumber) - 1) + r.timestamp.ResetHighest(tsCycles + uint64(rtph.Timestamp)) + r.highestTime = packetTime + r.logger.Debugw( + "resync", + "newestPacketCount", newestPacketCount, + "paddingOnlyDrops", paddingOnlyDrops, + "extExpectedHighestSN", extExpectedHighestSN, + "expectedHighestSN", expectedHighestSN, + "snCycles", snCycles, + "rtpSN", rtph.SequenceNumber, + "beforeExtHighestSN", extHighestSN, + "afterExtHighestSN", r.sequenceNumber.GetExtendedHighest(), + "newestTS", newestTS, + "extExpectedHighestTS", extExpectedHighestTS, + "expectedHighestTS", expectedHighestTS, + "tsCycles", tsCycles, + "rtpTS", rtph.Timestamp, + "beforeExtHighestTS", extHighestTS, + "afterExtHighestTS", r.timestamp.GetExtendedHighest(), + ) + } + } + var resSN utils.WrapAroundUpdateResult[uint64] var resTS utils.WrapAroundUpdateResult[uint64] if !r.initialized { @@ -515,17 +591,12 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa return } -func (r *RTPStats) Resync(esn uint64, ets uint64, at time.Time) { +func (r *RTPStats) ResyncOnNextPacket(shouldDiscountPaddingOnlyDrops bool) { r.lock.Lock() defer r.lock.Unlock() - if !r.initialized { - return - } - r.sequenceNumber.ResetHighest(esn - 1) - r.timestamp.ResetHighest(ets) - r.highestTime = at - r.logger.Debugw("resync", "extSequenceNumber", esn, "extTimestamp", ets, "at", at) + r.resyncOnNextPacket = true + r.shouldDiscountPaddingOnlyDrops = shouldDiscountPaddingOnlyDrops } func (r *RTPStats) getPacketsExpected() uint64 { @@ -873,21 +944,23 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) { return } - cycles := uint64(0) + tsCycles := uint64(0) + pcCycles := uint64(0) if r.srNewest != nil { - cycles = r.srNewest.RTPTimestampExt & 0xFFFF_FFFF_0000_0000 + tsCycles = r.srNewest.RTPTimestampExt & 0xFFFF_FFFF_0000_0000 if (srData.RTPTimestamp-r.srNewest.RTPTimestamp) < (1<<31) && srData.RTPTimestamp < r.srNewest.RTPTimestamp { - cycles += (1 << 32) + tsCycles += (1 << 32) } - ntpDiffSinceLast := srData.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time()) - rtpDiff := uint64(ntpDiffSinceLast.Seconds() * float64(r.params.ClockRate)) - goArounds := rtpDiff / (1 << 32) - cycles += goArounds * (1 << 32) + pcCycles = r.srNewest.PacketCountExt & 0xFFFF_FFFF_0000_0000 + if (srData.PacketCount-r.srNewest.PacketCount) < (1<<31) && srData.PacketCount < r.srNewest.PacketCount { + pcCycles += (1 << 32) + } } srDataCopy := *srData - srDataCopy.RTPTimestampExt = uint64(srDataCopy.RTPTimestamp) + cycles + srDataCopy.RTPTimestampExt = uint64(srDataCopy.RTPTimestamp) + tsCycles + srDataCopy.PacketCountExt = uint64(srDataCopy.PacketCount) + pcCycles r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestampExt) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 5a3328d26..44414d7c8 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -684,7 +684,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { } } if d.sequencer != nil { - isPushed := d.sequencer.push( + d.sequencer.push( extPkt.Packet.SequenceNumber, tp.rtp.sequenceNumber, tp.rtp.timestamp, @@ -693,12 +693,8 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { tp.codecBytes, tp.ddBytes, ) - if !isPushed { - d.params.Logger.Debugw("PDBG sequencer push failed", "isn", extPkt.Packet.SequenceNumber, "iesn", extPkt.ExtSequenceNumber, "layer", layer, "osn", tp.rtp.sequenceNumber, "size", len(payload)) // REMOVE - } } - d.params.Logger.Debugw("PDBG forwarding packet", "isn", extPkt.Packet.SequenceNumber, "iesn", extPkt.ExtSequenceNumber, "layer", layer, "osn", tp.rtp.sequenceNumber, "size", len(payload)) // REMOVE d.pacer.Enqueue(pacer.Packet{ Header: hdr, Extensions: extensions, diff --git a/pkg/sfu/sequencer.go b/pkg/sfu/sequencer.go index e5ed8299a..a2699427b 100644 --- a/pkg/sfu/sequencer.go +++ b/pkg/sfu/sequencer.go @@ -116,13 +116,13 @@ func (s *sequencer) push( layer int8, codecBytes []byte, ddBytes []byte, -) bool { +) { s.Lock() defer s.Unlock() slot, isValid := s.getSlot(offSn) if !isValid { - return false + return } s.meta[s.metaWritePtr] = packetMeta{ @@ -142,7 +142,6 @@ func (s *sequencer) push( if s.metaWritePtr >= len(s.meta) { s.metaWritePtr -= len(s.meta) } - return true } func (s *sequencer) pushPadding(offSn uint16) {