From b91cd2e4eac2cfd22a6422a2cea0ae01c2eeb98f Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sat, 27 Dec 2025 17:17:16 +0530 Subject: [PATCH] Rework receiver restart. (#4202) * Rework receiver restart. - Protect against concurrent restarts - Clean up and consolidate code around restarts - Use `RestartStream` of buffer rather than creating new buffers. * fix test --- pkg/sfu/buffer/buffer_base.go | 61 +++++---- pkg/sfu/buffer/buffer_test.go | 58 +++++++-- pkg/sfu/downtrack.go | 8 +- pkg/sfu/receiver_base.go | 227 +++++++++++++++++++++++----------- pkg/utils/opsqueue.go | 1 - 5 files changed, 250 insertions(+), 105 deletions(-) diff --git a/pkg/sfu/buffer/buffer_base.go b/pkg/sfu/buffer/buffer_base.go index ec9e6a0a8..dc54be875 100644 --- a/pkg/sfu/buffer/buffer_base.go +++ b/pkg/sfu/buffer/buffer_base.go @@ -77,7 +77,6 @@ type ExtPacket struct { AbsCaptureTimeExt *act.AbsCaptureTime IsOutOfOrder bool IsBuffered bool - IsRestart bool } // VideoSize represents video resolution @@ -111,9 +110,10 @@ type BufferProvider interface { GetSenderReportData() *livekit.RTCPSenderReportState OnRtcpSenderReport(fn func()) - OnFpsChanged(f func()) + OnFpsChanged(fn func()) OnVideoSizeChanged(fn func([]VideoSize)) OnCodecChange(fn func(webrtc.RTPCodecParameters)) + OnStreamRestart(fn func(string)) StartKeyFrameSeeder() StopKeyFrameSeeder() @@ -128,6 +128,8 @@ type BufferProvider interface { oobSequenceNumber uint16, ) (uint64, error) + RestartStream(reason string) + CloseWithReason(reason string) (*livekit.RTPStats, error) } @@ -190,6 +192,7 @@ type BufferBase struct { onFpsChanged func() onVideoSizeChanged func([]VideoSize) onCodecChange func(webrtc.RTPCodecParameters) + onStreamRestart func(string) // video size tracking for multiple spatial layers currentVideoSize [DefaultMaxLayerSpatial + 1]VideoSize @@ -212,7 +215,7 @@ type BufferBase struct { keyFrameSeederGeneration atomic.Int32 - restartOnNextPacket bool + isRestartPending bool isClosed atomic.Bool } @@ -479,8 +482,16 @@ func (b *BufferBase) stopRTPStats(reason string) (stats *livekit.RTPStats, stats return } -func (b *BufferBase) restartStream() { - b.logger.Infow("stream restart") +func (b *BufferBase) RestartStream(reason string) { + b.Lock() + defer b.Unlock() + + b.restartStreamLocked(reason) + b.readCond.Broadcast() +} + +func (b *BufferBase) restartStreamLocked(reason string) { + b.logger.Infow("stream restart", "reason", reason) // stop b.StopKeyFrameSeeder() @@ -509,13 +520,18 @@ func (b *BufferBase) restartStream() { b.createDDParserAndFrameRateCalculator() } + b.frameRateCalculated = false if b.frameRateCalculator[0] == nil { b.createFrameRateCalculator() } b.StartKeyFrameSeeder() - b.restartOnNextPacket = true + b.isRestartPending = true + + if f := b.onStreamRestart; f != nil { + go f(reason) + } } func (b *BufferBase) createDDParserAndFrameRateCalculator() { @@ -559,6 +575,12 @@ func (b *BufferBase) ReadExtended(buf []byte) (*ExtPacket, error) { return nil, io.EOF } + if b.isRestartPending { + b.isRestartPending = false + b.Unlock() + return nil, nil + } + if b.extPackets.Len() > 0 { ep := b.extPackets.PopFront() patched := b.patchExtPacket(ep, buf) @@ -731,7 +753,7 @@ func (b *BufferBase) HandleIncomingPacketLocked( return 0, fmt.Errorf("unhandled reason: %s", flowState.UnhandledReason.String()) } - b.restartStream() + b.restartStreamLocked("discontinuity") flowState = b.rtpStats.Update( arrivalTime, @@ -835,7 +857,7 @@ func (b *BufferBase) HandleIncomingPacketLocked( return 0, err } - ep := b.getExtPacket(rtpPacket, arrivalTime, isBuffered, b.restartOnNextPacket, flowState) + ep := b.getExtPacket(rtpPacket, arrivalTime, isBuffered, flowState) if ep == nil { return 0, errors.New("could not get ext packet") } @@ -856,7 +878,6 @@ func (b *BufferBase) HandleIncomingPacketLocked( b.updateNACKState(rtpPacket.SequenceNumber, flowState) } - b.restartOnNextPacket = false return ep.ExtSequenceNumber, nil } @@ -931,6 +952,7 @@ func (b *BufferBase) handleCodecChange(newPT uint8) { ) return } + b.logger.Infow( "codec changed", "oldPayload", b.payloadType, "newPayload", newPT, @@ -940,30 +962,18 @@ func (b *BufferBase) handleCodecChange(newPT uint8) { b.payloadType = newPT b.rtxPayloadType = rtxPt b.mime = mime.NormalizeMimeType(newCodec.MimeType) - b.frameRateCalculated = false - if b.ddExtID != 0 { - b.createDDParserAndFrameRateCalculator() - } - - if b.frameRateCalculator[0] == nil { - b.createFrameRateCalculator() - } - - b.bucket.ResyncOnNextPacket() + b.restartStreamLocked("codec-change") if f := b.onCodecChange; f != nil { go f(newCodec) } - - b.StartKeyFrameSeeder() } func (b *BufferBase) getExtPacket( rtpPacket *rtp.Packet, arrivalTime int64, isBuffered bool, - isRestart bool, flowState rtpstats.RTPFlowState, ) *ExtPacket { ep := ExtPacketFactory.Get().(*ExtPacket) @@ -978,7 +988,6 @@ func (b *BufferBase) getExtPacket( }, IsOutOfOrder: flowState.IsOutOfOrder, IsBuffered: isBuffered, - IsRestart: isRestart, } if len(ep.Packet.Payload) == 0 { @@ -1387,6 +1396,12 @@ func (b *BufferBase) OnCodecChange(fn func(webrtc.RTPCodecParameters)) { b.Unlock() } +func (b *BufferBase) OnStreamRestart(fn func(string)) { + b.Lock() + b.onStreamRestart = fn + b.Unlock() +} + // checkVideoSizeChange checks if video size has changed for a specific spatial layer and fires callback func (b *BufferBase) checkVideoSizeChange(videoSizes []VideoSize) { if len(videoSizes) > len(b.currentVideoSize) { diff --git a/pkg/sfu/buffer/buffer_test.go b/pkg/sfu/buffer/buffer_test.go index abd977d63..9b6e8b026 100644 --- a/pkg/sfu/buffer/buffer_test.go +++ b/pkg/sfu/buffer/buffer_test.go @@ -15,6 +15,7 @@ package buffer import ( + "fmt" "math" "sync" "testing" @@ -330,6 +331,7 @@ func TestCodecChange(t *testing.T) { buff := NewBuffer(123, 1, 1) require.NotNil(t, buff) changedCodec := make(chan webrtc.RTPCodecParameters, 1) + restartCleared := make(chan struct{}, 1) buff.OnCodecChange(func(rp webrtc.RTPCodecParameters) { select { case changedCodec <- rp: @@ -337,6 +339,17 @@ func TestCodecChange(t *testing.T) { t.Fatalf("codec change not consumed") } }) + buff.OnStreamRestart(func(reason string) { + require.Equal(t, "codec-change", reason) + + // read once to clear pending restart + var buf [1500]byte + extPkt, err := buff.ReadExtended(buf[:]) + require.NoError(t, err) + require.Nil(t, extPkt) + + restartCleared <- struct{}{} + }) h265Pkt := rtp.Packet{ Header: rtp.Header{ @@ -359,20 +372,28 @@ func TestCodecChange(t *testing.T) { case <-time.After(100 * time.Millisecond): } - buff.Bind(webrtc.RTPParameters{ - HeaderExtensions: nil, - Codecs: []webrtc.RTPCodecParameters{vp8Codec, h265Codec}, - }, vp8Codec.RTPCodecCapability, 0) + // Bind sets up VP8 as expected codec, + // packet written to the buffer above before Bind is H.265, + // that should trigger a codec change and a stream restart + // when the queued packets from Write before Bind get flushed + buff.Bind( + webrtc.RTPParameters{ + HeaderExtensions: nil, + Codecs: []webrtc.RTPCodecParameters{vp8Codec, h265Codec}, + }, + vp8Codec.RTPCodecCapability, + 0, + ) select { case c := <-changedCodec: require.Equal(t, h265Codec, c) - case <-time.After(1 * time.Second): t.Fatalf("expected codec change") } + <-restartCleared - // codec change after bind + // second codec change - writing VP8 packet after Bind should trigger another codec change vp8Pkt := rtp.Packet{ Header: rtp.Header{ Version: 2, @@ -391,12 +412,17 @@ func TestCodecChange(t *testing.T) { select { case c := <-changedCodec: require.Equal(t, vp8Codec, c) - case <-time.After(1 * time.Second): t.Fatalf("expected codec change") } + fmt.Printf("done second codec change\n") // REMOVE + <-restartCleared // out of order pkts can't cause codec change + // rewrite the VP8 packet to start the sequence after a stream restart + _, err = buff.Write(buf) + require.NoError(t, err) + h265Pkt.SequenceNumber = 2 h265Pkt.Timestamp = 2 buf, err = h265Pkt.Marshal() @@ -409,7 +435,7 @@ func TestCodecChange(t *testing.T) { case <-time.After(100 * time.Millisecond): } - // unknown codec should not cause change + // unknown codec should not cause change even if it is in order h265Pkt.SequenceNumber = 4 h265Pkt.Timestamp = 4 h265Pkt.PayloadType = 117 @@ -422,6 +448,22 @@ func TestCodecChange(t *testing.T) { t.Fatalf("unexpected codec change") case <-time.After(100 * time.Millisecond): } + + // an in-order packet should change codec again + h265Pkt.SequenceNumber = 5 + h265Pkt.Timestamp = 5 + h265Pkt.PayloadType = 116 + buf, err = h265Pkt.Marshal() + require.NoError(t, err) + _, err = buff.Write(buf) + require.NoError(t, err) + select { + case c := <-changedCodec: + require.Equal(t, h265Codec, c) + case <-time.After(1 * time.Second): + t.Fatalf("expected codec change") + } + <-restartCleared } func BenchmarkMemcpu(b *testing.B) { diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index e10d8033d..42dfed00e 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -1276,8 +1276,8 @@ func (d *DownTrack) handleMute(muted bool, changed bool) { // no layers required due to publisher mute (bit of circular dependency), // there will be a delay in layers turning back on when unmute happens. // Unmute path will require - // 1. unmute signalling out-of-band from publisher received by down track(s) - // 2. down track(s) notifying max layer + // 1. unmute signalling out-of-band from publisher received by downtrack(s) + // 2. downtrack(s) notifying max layer // 3. out-of-band notification about max layer sent back to the publisher // 4. publisher starts layer(s) // Ideally, on publisher mute, whatever layers were active remain active and @@ -1331,7 +1331,7 @@ func (d *DownTrack) CloseWithFlush(flush bool, isEnding bool) { return } - d.params.Logger.Debugw("close down track", "flushBlankFrame", flush) + d.params.Logger.Debugw("close downtrack", "flushBlankFrame", flush) if d.bindState.Load() == bindStateBound { d.forwarder.Mute(true, true) @@ -1447,7 +1447,7 @@ func (d *DownTrack) SeedState(state DownTrackState) { } if state.RTPStats != nil || state.ForwarderState != nil { - d.params.Logger.Debugw("seeding down track state", "state", state) + d.params.Logger.Debugw("seeding downtrack state", "state", state) } if state.RTPStats != nil { d.rtpStats.Seed(state.RTPStats) diff --git a/pkg/sfu/receiver_base.go b/pkg/sfu/receiver_base.go index d91143974..3751b896f 100644 --- a/pkg/sfu/receiver_base.go +++ b/pkg/sfu/receiver_base.go @@ -82,10 +82,6 @@ var ( // -------------------------------------- -type AudioLevelHandle func(level uint8, duration uint32) - -// -------------------------------------- - type Bitrates [buffer.DefaultMaxLayerSpatial + 1][buffer.DefaultMaxLayerTemporal + 1]int64 // -------------------------------------- @@ -223,6 +219,10 @@ type ReceiverBase struct { redTransformer atomic.Value // redTransformer interface + forwardersGeneration atomic.Uint32 + forwardersWaitGroup *sync.WaitGroup + restartInProgress bool + isClosed atomic.Bool } @@ -249,6 +249,8 @@ func NewReceiverBase(params ReceiverBaseParams, trackInfo *livekit.TrackInfo, co ) r.streamTrackerManager.SetListener(r) + r.startForwarderGeneration() + return r } @@ -345,33 +347,81 @@ func (r *ReceiverBase) UpdateTrackInfo(ti *livekit.TrackInfo) { } r.trackInfo = utils.CloneProto(ti) // MUTABLE-TRACKINFO-TODO: notify buffers, buffers may need to resize retransmission buffer if there is layer change - - if shouldResync { - r.resyncLocked("update-track-info") - } r.bufferMu.Unlock() r.streamTrackerManager.UpdateTrackInfo(ti) + + if shouldResync { + r.Restart("update-track-info") + } } func (r *ReceiverBase) Restart(reason string) { - r.bufferMu.Lock() - defer r.bufferMu.Unlock() - - r.resyncLocked(reason) + r.params.Logger.Infow("restarting receiver", "reason", reason) + r.restartInternal(reason, false) } -func (r *ReceiverBase) resyncLocked(reason string) { - // resync to avoid gaps in the forwarded sequence number - r.params.Logger.Debugw("resync receiver", "reason", reason) - r.clearAllBuffersLocked("resync") +func (r *ReceiverBase) restartInternal(reason string, isDetected bool) { + if r.IsClosed() { + return + } + // 1. guard against concurrent restarts + r.bufferMu.Lock() + if r.restartInProgress { + r.bufferMu.Unlock() + return + } + r.restartInProgress = true + r.bufferMu.Unlock() + + // 2. restart all the buffers + // if a stream was detected, skip external restart + // + // NOTE: The case of external restart and detected restart (which usually comes from one buffer) + // racing will miss restart on all buffers if detected restart from one buffer adds the guard + // against concurrent restart. But, that condition should be very rare if at all. + // External restart happens when the underlying track changes or when seeking + if !isDetected { + for _, buff := range r.GetAllBuffers() { + if buff == nil { + continue + } + + buff.RestartStream(reason) + } + } + + // 3. wait for the forwarders to finish + r.stopForwarderGeneration() + + // 4. reset stream tracker + r.streamTrackerManager.RemoveAllTrackers() + + // 5. signal attached downtracks to resync so that they can have proper sequencing on a receiver restart r.downTrackSpreader.Broadcast(func(dt TrackSender) { - dt.Resync() + dt.ReceiverRestart() }) if rt := r.redTransformer.Load(); rt != nil { - rt.(REDTransformer).ResyncDownTracks() + rt.(REDTransformer).OnStreamRestart() } + + // 6. move forwarder generation ahead + r.startForwarderGeneration() + + r.bufferMu.Lock() + // 7. release restart hold + r.restartInProgress = false + + // 8. restart forwarders + for layer, buff := range r.buffers { + if buff == nil { + continue + } + + r.startForwarderForBufferLocked(int32(layer), buff) + } + r.bufferMu.Unlock() } func (r *ReceiverBase) OnMaxLayerChange(fn func(mimeType mime.MimeType, maxLayer int32)) { @@ -675,6 +725,9 @@ func (r *ReceiverBase) setupBuffer(buff buffer.BufferProvider, layer int32, rtt if r.Kind() == webrtc.RTPCodecTypeVideo && layer == 0 { buff.OnCodecChange(r.handleCodecChange) } + buff.OnStreamRestart(func(reason string) { + r.restartInternal(reason, true) + }) var duration time.Duration switch layer { @@ -705,8 +758,9 @@ func (r *ReceiverBase) AddBuffer(buff buffer.BufferProvider, layer int32) { } func (r *ReceiverBase) StartBuffer(buff buffer.BufferProvider, layer int32) { - r.params.Logger.Debugw("starting forwarder", "layer", layer) - go r.forwardRTP(layer, buff) + r.bufferMu.Lock() + r.startForwarderForBufferLocked(layer, buff) + r.bufferMu.Unlock() } func (r *ReceiverBase) GetAllBuffers() [buffer.DefaultMaxLayerSpatial + 1]buffer.BufferProvider { @@ -723,18 +777,18 @@ func (r *ReceiverBase) GetAllBuffers() [buffer.DefaultMaxLayerSpatial + 1]buffer func (r *ReceiverBase) ClearAllBuffers(reason string) { r.bufferMu.Lock() - defer r.bufferMu.Unlock() - - r.clearAllBuffersLocked(reason) -} - -func (r *ReceiverBase) clearAllBuffersLocked(reason string) { - for idx := range len(r.buffers) { - if r.buffers[idx] != nil { - r.buffers[idx].CloseWithReason(reason) - } + buffers := r.buffers + for idx := range r.buffers { r.buffers[idx] = nil } + r.bufferMu.Unlock() + + for _, buff := range buffers { + if buff == nil { + continue + } + buff.CloseWithReason(reason) + } r.streamTrackerManager.RemoveAllTrackers() } @@ -788,16 +842,58 @@ func (r *ReceiverBase) GetAudioLevel() (float64, bool) { return 0, false } -func (r *ReceiverBase) forwardRTP(layer int32, buff buffer.BufferProvider) { +func (r *ReceiverBase) startForwarderGeneration() { + r.bufferMu.Lock() + defer r.bufferMu.Unlock() + + r.forwardersGeneration.Inc() + r.forwardersWaitGroup = &sync.WaitGroup{} +} + +func (r *ReceiverBase) stopForwarderGeneration() { + r.bufferMu.Lock() + r.forwardersGeneration.Inc() + forwarderWaitGroup := r.forwardersWaitGroup + r.bufferMu.Unlock() + + if forwarderWaitGroup != nil { + forwarderWaitGroup.Wait() + } +} + +func (r *ReceiverBase) startForwarderForBufferLocked(layer int32, buff buffer.BufferProvider) { + if r.restartInProgress { + return + } + + r.forwardersWaitGroup.Add(1) + + r.params.Logger.Debugw("starting forwarder", "layer", layer) + go r.forwardRTP(layer, buff, r.forwardersGeneration.Load(), r.forwardersWaitGroup) +} + +func (r *ReceiverBase) forwardRTP( + layer int32, + buff buffer.BufferProvider, + forwarderGeneration uint32, + wg *sync.WaitGroup, +) { + var ( + extPkt *buffer.ExtPacket + err error + ) + numPacketsForwarded := 0 numPacketsDropped := 0 defer func() { - if r.params.IsSelfClosing { - r.Close("forwarder-done", false) + if err == io.EOF { + if r.params.IsSelfClosing { + r.Close("forwarder-done", false) - r.streamTrackerManager.RemoveTracker(layer) - if r.videoLayerMode == livekit.VideoLayer_MULTIPLE_SPATIAL_LAYERS_PER_STREAM { - r.streamTrackerManager.RemoveAllTrackers() + r.streamTrackerManager.RemoveTracker(layer) + if r.videoLayerMode == livekit.VideoLayer_MULTIPLE_SPATIAL_LAYERS_PER_STREAM { + r.streamTrackerManager.RemoveAllTrackers() + } } } @@ -807,6 +903,7 @@ func (r *ReceiverBase) forwardRTP(layer int32, buff buffer.BufferProvider) { "numPacketsForwarded", numPacketsForwarded, "numPacketsDropped", numPacketsDropped, ) + wg.Done() }() var spatialTrackers [buffer.DefaultMaxLayerSpatial + 1]streamtracker.StreamTrackerWorker @@ -817,29 +914,22 @@ func (r *ReceiverBase) forwardRTP(layer int32, buff buffer.BufferProvider) { pktBuf := make([]byte, bucket.RTPMaxPktSize) r.params.Logger.Debugw("starting forwarding", "layer", layer) - for { - pkt, err := buff.ReadExtended(pktBuf) + + for r.forwardersGeneration.Load() == forwarderGeneration { + extPkt, err = buff.ReadExtended(pktBuf) if err == io.EOF { return } + if extPkt == nil { + continue + } dequeuedAt := mono.UnixNano() - if pkt.IsRestart { - r.params.Logger.Infow("stream restarted", "layer", layer) - r.downTrackSpreader.Broadcast(func(dt TrackSender) { - dt.ReceiverRestart() - }) - - if rt := r.redTransformer.Load(); rt != nil { - rt.(REDTransformer).OnStreamRestart() - } - } - - if pkt.Packet.PayloadType != uint8(r.params.Codec.PayloadType) { + if extPkt.Packet.PayloadType != uint8(r.params.Codec.PayloadType) { // drop packets as we don't support codec fallback directly r.params.Logger.Debugw( "dropping packet - payload mismatch", - "packetPayloadType", pkt.Packet.PayloadType, + "packetPayloadType", extPkt.Packet.PayloadType, "payloadType", r.params.Codec.PayloadType, ) numPacketsDropped++ @@ -847,15 +937,15 @@ func (r *ReceiverBase) forwardRTP(layer int32, buff buffer.BufferProvider) { } spatialLayer := layer - if pkt.Spatial >= 0 { + if extPkt.Spatial >= 0 { // svc packet, take spatial layer info from packet - spatialLayer = pkt.Spatial + spatialLayer = extPkt.Spatial } if int(spatialLayer) >= len(spatialTrackers) { r.params.Logger.Errorw( "unexpected spatial layer", nil, "spatialLayer", spatialLayer, - "pktSpatialLayer", pkt.Spatial, + "pktSpatialLayer", extPkt.Spatial, ) numPacketsDropped++ continue @@ -863,22 +953,21 @@ func (r *ReceiverBase) forwardRTP(layer int32, buff buffer.BufferProvider) { var writeCount atomic.Int32 r.downTrackSpreader.Broadcast(func(dt TrackSender) { - writeCount.Add(dt.WriteRTP(pkt, spatialLayer)) + writeCount.Add(dt.WriteRTP(extPkt, spatialLayer)) }) - if rt := r.redTransformer.Load(); rt != nil { - writeCount.Add(rt.(REDTransformer).ForwardRTP(pkt, spatialLayer)) + writeCount.Add(rt.(REDTransformer).ForwardRTP(extPkt, spatialLayer)) } // track delay/jitter - if writeCount.Load() > 0 && r.forwardStats != nil && !pkt.IsBuffered { - if latency, isHigh := r.forwardStats.Update(pkt.Arrival, mono.UnixNano()); isHigh { + if writeCount.Load() > 0 && r.forwardStats != nil && !extPkt.IsBuffered { + if latency, isHigh := r.forwardStats.Update(extPkt.Arrival, mono.UnixNano()); isHigh { r.params.Logger.Debugw( "high forwarding latency", "latency", time.Duration(latency), - "queuingLatency", time.Duration(dequeuedAt-pkt.Arrival), + "queuingLatency", time.Duration(dequeuedAt-extPkt.Arrival), "writeCount", writeCount.Load(), - "isOutOfOrder", pkt.IsOutOfOrder, + "isOutOfOrder", extPkt.IsOutOfOrder, "layer", layer, ) } @@ -889,7 +978,7 @@ func (r *ReceiverBase) forwardRTP(layer int32, buff buffer.BufferProvider) { if spatialTrackers[spatialLayer] == nil { spatialTrackers[spatialLayer] = r.streamTrackerManager.GetTracker(spatialLayer) if spatialTrackers[spatialLayer] == nil { - if r.videoLayerMode == livekit.VideoLayer_MULTIPLE_SPATIAL_LAYERS_PER_STREAM && pkt.DependencyDescriptor != nil { + if r.videoLayerMode == livekit.VideoLayer_MULTIPLE_SPATIAL_LAYERS_PER_STREAM && extPkt.DependencyDescriptor != nil { r.streamTrackerManager.AddDependencyDescriptorTrackers() } spatialTrackers[spatialLayer] = r.streamTrackerManager.AddTracker(spatialLayer) @@ -897,19 +986,19 @@ func (r *ReceiverBase) forwardRTP(layer int32, buff buffer.BufferProvider) { } if spatialTrackers[spatialLayer] != nil { spatialTrackers[spatialLayer].Observe( - pkt.Temporal, - len(pkt.RawPacket), - len(pkt.Packet.Payload), - pkt.Packet.Marker, - pkt.Packet.Timestamp, - pkt.DependencyDescriptor, + extPkt.Temporal, + len(extPkt.RawPacket), + len(extPkt.Packet.Payload), + extPkt.Packet.Marker, + extPkt.Packet.Timestamp, + extPkt.DependencyDescriptor, ) } } numPacketsForwarded++ - buffer.ReleaseExtPacket(pkt) + buffer.ReleaseExtPacket(extPkt) } } diff --git a/pkg/utils/opsqueue.go b/pkg/utils/opsqueue.go index 7236165ea..65e0dafe2 100644 --- a/pkg/utils/opsqueue.go +++ b/pkg/utils/opsqueue.go @@ -128,7 +128,6 @@ func (oq *opsQueueBase[T]) Enqueue(op T) { select { case oq.wake <- struct{}{}: default: - oq.params.Logger.Debugw("could not wake ops queue", "name", oq.params.Name) } } }