From f117ee511f9338c3bb6b5ae1b76cfa8cb5ba8e6b Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 7 Nov 2025 16:55:18 +0530 Subject: [PATCH] Track start up delay. (#4061) * Debug high forwarding latency missing. * log highest * log condition * update log * log * log * change log * Track start up delay. Digging into forwarding latency, there are a few things 1. Seems to be caused due to forwarding packets queued before bind. They would be in the queue till bind. There are two ways it is showing up a. Bind itself is delayed and releasing queued packets causes the high forwarding latency. b. There is a significant gap between bind and first packet being pulled off the queue to be forwarded, in one example 100ms. (a) is understandable if the signalling delays things. Can drop these packets without forwarding or indicate in the packet that it is a queued packet and drop it from forwarding latency calculation. Dropping is probably better as down stream components like egress will see a burst in these situations. (b) looks like go scheduling latency? Unsure. Logging more to understand this better. * log start --- pkg/sfu/buffer/buffer.go | 6 ++++++ pkg/sfu/receiver.go | 7 +++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 3dd4bf517..4ceb8430c 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -225,6 +225,7 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili return nil } + b.logger.Debugw("binding track") if codec.ClockRate == 0 { b.logger.Warnw("invalid codec", nil, "params", params, "codec", codec, "bitrates", bitrates) return errInvalidCodec @@ -322,6 +323,7 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili } } + b.logger.Debugw("releasing queued packets on bind") for _, pp := range b.pPackets { b.calc(pp.packet, nil, pp.arrivalTime, false) } @@ -415,6 +417,10 @@ func (b *Buffer) Write(pkt []byte) (n int, err error) { packet := make([]byte, len(pkt)) copy(packet, pkt) + if len(b.pPackets) == 0 { + b.logger.Debugw("received first packet") + } + startIdx := 0 overflow := len(b.pPackets) - max(b.maxVideoPkts, b.maxAudioPkts) if overflow > 0 { diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 26daf3644..c03110031 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -471,6 +471,7 @@ func (w *WebRTCReceiver) AddUpTrack(track TrackRemote, buff *buffer.Buffer) erro buff.SetPaused(w.streamTrackerManager.IsPaused()) go w.forwardRTP(layer, buff) + w.logger.Debugw("starting forwarder", "layer", layer) return nil } @@ -785,6 +786,7 @@ func (w *WebRTCReceiver) forwardRTP(layer int32, buff *buffer.Buffer) { } pktBuf := make([]byte, bucket.MaxPktSize) + w.logger.Debugw("starting forwarding", "layer", layer) for { pkt, err := buff.ReadExtended(pktBuf) if err == io.EOF { @@ -832,10 +834,11 @@ func (w *WebRTCReceiver) forwardRTP(layer int32, buff *buffer.Buffer) { if latency, isHigh := w.forwardStats.Update(pkt.Arrival, mono.UnixNano()); isHigh { w.logger.Infow( "high forwarding latency", - "latency", latency, + "latency", time.Duration(latency), + "queuingLatency", time.Duration(dequeuedAt-pkt.Arrival), "writeCount", writeCount.Load(), - "queuingLatency", dequeuedAt-pkt.Arrival, "isOutOfOrder", pkt.IsOutOfOrder, + "layer", layer, ) } }