From ae5fb7e882d319a58d8f42ab3e3f92659ea89c66 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 6 Nov 2025 12:31:49 +0530 Subject: [PATCH] Add packet to forwarding stats only if packet is forwarded. (#4056) Packets not being forwarded were getting included in forwarding stats calculation and skewing the measurement towards a smaller number. The latency measurement does not include the batch IO of packets on send. With a 2ms batching, that will add an average latency of 1ms. --- pkg/service/wire_gen.go | 14 +++++++------- pkg/sfu/downtrack.go | 30 +++++++++++++++--------------- pkg/sfu/downtrackspreader.go | 5 ++--- pkg/sfu/forwarder.go | 4 ++-- pkg/sfu/receiver.go | 13 ++++++++----- pkg/sfu/redprimaryreceiver.go | 20 +++++++++++++------- pkg/sfu/redreceiver.go | 18 +++++++++++++----- pkg/sfu/redreceiver_test.go | 4 ++-- pkg/sfu/rtpmunger.go | 10 +++++----- pkg/sfu/rtpmunger_test.go | 12 ++++++------ 10 files changed, 73 insertions(+), 57 deletions(-) diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 22b1e0c1c..b33cc1744 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -89,23 +89,23 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService, objectStore) topicFormatter := rpc.NewTopicFormatter() - v, err := rpc.NewTypedRoomClient(clientParams) + roomClient, err := rpc.NewTypedRoomClient(clientParams) if err != nil { return nil, err } - v2, err := rpc.NewTypedParticipantClient(clientParams) + participantClient, err := rpc.NewTypedParticipantClient(clientParams) if err != nil { return nil, err } - roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, v, v2) + roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, roomClient, participantClient) if err != nil { return nil, err } - v3, err := rpc.NewTypedAgentDispatchInternalClient(clientParams) + agentDispatchInternalClient, err := rpc.NewTypedAgentDispatchInternalClient(clientParams) if err != nil { return nil, err } - agentDispatchService := NewAgentDispatchService(v3, topicFormatter, roomAllocator, router) + agentDispatchService := NewAgentDispatchService(agentDispatchInternalClient, topicFormatter, roomAllocator, router) egressService := NewEgressService(egressClient, rtcEgressLauncher, ioInfoService, roomService) ingressConfig := getIngressConfig(conf) ingressClient, err := rpc.NewIngressClient(clientParams) @@ -120,11 +120,11 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } sipService := NewSIPService(sipConfig, nodeID, messageBus, sipClient, sipStore, roomService, telemetryService) rtcService := NewRTCService(conf, roomAllocator, router, telemetryService) - v4, err := rpc.NewTypedWHIPParticipantClient(clientParams) + whipParticipantClient, err := rpc.NewTypedWHIPParticipantClient(clientParams) if err != nil { return nil, err } - serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, v4) + serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, whipParticipantClient) if err != nil { return nil, err } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 79a559ca0..35b99e355 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -57,7 +57,7 @@ type TrackSender interface { UpTrackMaxPublishedLayerChange(maxPublishedLayer int32) UpTrackMaxTemporalLayerSeenChange(maxTemporalLayerSeen int32) UpTrackBitrateReport(availableLayers []int32, bitrates Bitrates) - WriteRTP(p *buffer.ExtPacket, layer int32) error + WriteRTP(p *buffer.ExtPacket, layer int32) bool Close() IsClosed() bool // ID is the globally unique identifier for this Track. @@ -94,13 +94,13 @@ const ( // ------------------------------------------------------------------- var ( - ErrUnknownKind = errors.New("unknown kind of codec") - ErrOutOfOrderSequenceNumberCacheMiss = errors.New("out-of-order sequence number not found in cache") - ErrPaddingOnlyPacket = errors.New("padding only packet that need not be forwarded") - ErrDuplicatePacket = errors.New("duplicate packet") - ErrPaddingNotOnFrameBoundary = errors.New("padding cannot send on non-frame boundary") - ErrDownTrackAlreadyBound = errors.New("already bound") - ErrPayloadOverflow = errors.New("payload overflow") + errUnknownKind = errors.New("unknown kind of codec") + errOutOfOrderSequenceNumberCacheMiss = errors.New("out-of-order sequence number not found in cache") + errPaddingOnlyPacket = errors.New("padding only packet that need not be forwarded") + errDuplicatePacket = errors.New("duplicate packet") + errPaddingNotOnFrameBoundary = errors.New("padding cannot send on non-frame boundary") + errDownTrackAlreadyBound = errors.New("already bound") + errPayloadOverflow = errors.New("payload overflow") ) var ( @@ -452,7 +452,7 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters, d.bindLock.Lock() if d.bindState.Load() != bindStateUnbound { d.bindLock.Unlock() - return webrtc.RTPCodecParameters{}, ErrDownTrackAlreadyBound + return webrtc.RTPCodecParameters{}, errDownTrackAlreadyBound } // the TrackLocalContext's codec parameters will be set to the bound codec after Bind returns, @@ -984,9 +984,9 @@ func (d *DownTrack) maxLayerNotifierWorker() { } // WriteRTP writes an RTP Packet to the DownTrack -func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { +func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) bool { if !d.writable.Load() { - return nil + return false } tp, err := d.forwarder.GetTranslationParams(extPkt, layer) @@ -994,7 +994,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { if err != nil { d.params.Logger.Errorw("could not get translation params", err) } - return err + return false } poolEntity := PacketFactory.Get().(*[]byte) @@ -1003,12 +1003,12 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { n := copy(payload[len(tp.codecBytes):], extPkt.Packet.Payload[tp.incomingHeaderSize:]) if n != len(extPkt.Packet.Payload[tp.incomingHeaderSize:]) { d.params.Logger.Errorw( - "payload overflow", nil, + "payload overflow", errPayloadOverflow, "want", len(extPkt.Packet.Payload[tp.incomingHeaderSize:]), "have", n, ) PacketFactory.Put(poolEntity) - return ErrPayloadOverflow + return false } payload = payload[:len(tp.codecBytes)+n] @@ -1126,7 +1126,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { sal.OnResume(d) } } - return nil + return true } // WritePaddingRTP tries to write as many padding only RTP packets as necessary diff --git a/pkg/sfu/downtrackspreader.go b/pkg/sfu/downtrackspreader.go index 98057a0d4..456a2ab3b 100644 --- a/pkg/sfu/downtrackspreader.go +++ b/pkg/sfu/downtrackspreader.go @@ -86,10 +86,10 @@ func (d *DownTrackSpreader) HasDownTrack(subscriberID livekit.ParticipantID) boo return ok } -func (d *DownTrackSpreader) Broadcast(writer func(TrackSender)) int { +func (d *DownTrackSpreader) Broadcast(writer func(TrackSender)) { downTracks := d.GetDownTracks() if len(downTracks) == 0 { - return 0 + return } threshold := uint64(d.params.Threshold) @@ -101,7 +101,6 @@ func (d *DownTrackSpreader) Broadcast(writer func(TrackSender)) int { // WriteRTP takes about 50µs on average, so we write to 2 down tracks per loop. step := uint64(2) utils.ParallelExec(downTracks, threshold, step, writer) - return len(downTracks) } func (d *DownTrackSpreader) DownTrackCount() int { diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 8c59db68c..ce2771f23 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -1696,7 +1696,7 @@ func (f *Forwarder) GetTranslationParams(extPkt *buffer.ExtPacket, layer int32) return TranslationParams{ shouldDrop: true, - }, ErrUnknownKind + }, errUnknownKind } func (f *Forwarder) getRefLayerRTPTimestamp(ts uint32, refLayer, targetLayer int32) (uint32, error) { @@ -2038,7 +2038,7 @@ func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer i tpRTP, err := f.rtpMunger.UpdateAndGetSnTs(extPkt, tp.marker) if err != nil { tp.shouldDrop = true - if err == ErrPaddingOnlyPacket || err == ErrDuplicatePacket || err == ErrOutOfOrderSequenceNumberCacheMiss { + if err == errPaddingOnlyPacket || err == errDuplicatePacket || err == errOutOfOrderSequenceNumberCacheMiss { return nil } return err diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index f25250f86..8de2aff67 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -148,7 +148,7 @@ type TrackReceiver interface { } type REDTransformer interface { - ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) int + ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) int32 ForwardRTCPSenderReport( payloadType webrtc.PayloadType, layer int32, @@ -818,16 +818,19 @@ func (w *WebRTCReceiver) forwardRTP(layer int32, buff *buffer.Buffer) { continue } - writeCount := w.downTrackSpreader.Broadcast(func(dt TrackSender) { - _ = dt.WriteRTP(pkt, spatialLayer) + var writeCount atomic.Int32 + w.downTrackSpreader.Broadcast(func(dt TrackSender) { + if dt.WriteRTP(pkt, spatialLayer) { + writeCount.Inc() + } }) if rt := w.redTransformer.Load(); rt != nil { - writeCount += rt.(REDTransformer).ForwardRTP(pkt, spatialLayer) + writeCount.Add(rt.(REDTransformer).ForwardRTP(pkt, spatialLayer)) } // track delay/jitter - if writeCount > 0 && w.forwardStats != nil { + if writeCount.Load() > 0 && w.forwardStats != nil { if latency, isHigh := w.forwardStats.Update(pkt.Arrival, mono.UnixNano()); isHigh { w.logger.Infow( "high forwarding latency", diff --git a/pkg/sfu/redprimaryreceiver.go b/pkg/sfu/redprimaryreceiver.go index e18a57143..3bef9ee1a 100644 --- a/pkg/sfu/redprimaryreceiver.go +++ b/pkg/sfu/redprimaryreceiver.go @@ -56,7 +56,7 @@ func NewRedPrimaryReceiver(receiver TrackReceiver, dsp DownTrackSpreaderParams) } } -func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) int { +func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) int32 { // extract primary payload from RED and forward to downtracks if r.downTrackSpreader.DownTrackCount() == 0 { return 0 @@ -64,9 +64,13 @@ func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int3 if pkt.Packet.PayloadType != r.redPT { // forward non-red packet directly - return r.downTrackSpreader.Broadcast(func(dt TrackSender) { - _ = dt.WriteRTP(pkt, spatialLayer) + var writeCount atomic.Int32 + r.downTrackSpreader.Broadcast(func(dt TrackSender) { + if dt.WriteRTP(pkt, spatialLayer) { + writeCount.Inc() + } }) + return writeCount.Load() } pkts, err := r.getSendPktsFromRed(pkt.Packet) @@ -75,7 +79,7 @@ func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int3 return 0 } - var count int + var writeCount atomic.Int32 for i, sendPkt := range pkts { pPkt := *pkt if i != len(pkts)-1 { @@ -88,11 +92,13 @@ func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int3 // not modify the ExtPacket.RawPacket here for performance since it is not used by the DownTrack, // otherwise it should be set to the correct value (marshal the primary rtp packet) - count += r.downTrackSpreader.Broadcast(func(dt TrackSender) { - _ = dt.WriteRTP(&pPkt, spatialLayer) + r.downTrackSpreader.Broadcast(func(dt TrackSender) { + if dt.WriteRTP(&pPkt, spatialLayer) { + writeCount.Inc() + } }) } - return count + return writeCount.Load() } func (r *RedPrimaryReceiver) ForwardRTCPSenderReport( diff --git a/pkg/sfu/redreceiver.go b/pkg/sfu/redreceiver.go index 9151d259d..ea64ea33b 100644 --- a/pkg/sfu/redreceiver.go +++ b/pkg/sfu/redreceiver.go @@ -58,7 +58,7 @@ func NewRedReceiver(receiver TrackReceiver, dsp DownTrackSpreaderParams) *RedRec } } -func (r *RedReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) int { +func (r *RedReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) int32 { // encode RED payload from primary payload and forward to downtracks if r.downTrackSpreader.DownTrackCount() == 0 { return 0 @@ -66,9 +66,13 @@ func (r *RedReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) int // fallback to primary codec if payload size exceeds redundant block length if len(pkt.Packet.Payload) >= maxRedPayload { - return r.downTrackSpreader.Broadcast(func(dt TrackSender) { - _ = dt.WriteRTP(pkt, spatialLayer) + var writeCount atomic.Int32 + r.downTrackSpreader.Broadcast(func(dt TrackSender) { + if dt.WriteRTP(pkt, spatialLayer) { + writeCount.Inc() + } }) + return writeCount.Load() } redLen, err := r.encodeRedForPrimary(pkt.Packet, r.redPayloadBuf[:]) @@ -85,9 +89,13 @@ func (r *RedReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) int // not modify the ExtPacket.RawPacket here for performance since it is not used by the DownTrack, // otherwise it should be set to the correct value (marshal the primary rtp packet) - return r.downTrackSpreader.Broadcast(func(dt TrackSender) { - _ = dt.WriteRTP(&pPkt, spatialLayer) + var writeCount atomic.Int32 + r.downTrackSpreader.Broadcast(func(dt TrackSender) { + if dt.WriteRTP(&pPkt, spatialLayer) { + writeCount.Inc() + } }) + return writeCount.Load() } func (r *RedReceiver) ForwardRTCPSenderReport( diff --git a/pkg/sfu/redreceiver_test.go b/pkg/sfu/redreceiver_test.go index 16a1340b4..765a30ae3 100644 --- a/pkg/sfu/redreceiver_test.go +++ b/pkg/sfu/redreceiver_test.go @@ -36,10 +36,10 @@ type dummyDowntrack struct { receivedPkts []*rtp.Packet } -func (dt *dummyDowntrack) WriteRTP(p *buffer.ExtPacket, _ int32) error { +func (dt *dummyDowntrack) WriteRTP(p *buffer.ExtPacket, _ int32) bool { dt.lastReceivedPkt = p.Packet dt.receivedPkts = append(dt.receivedPkts, p.Packet) - return nil + return true } func (dt *dummyDowntrack) TrackInfoAvailable() {} diff --git a/pkg/sfu/rtpmunger.go b/pkg/sfu/rtpmunger.go index ed3f85870..c1cd73005 100644 --- a/pkg/sfu/rtpmunger.go +++ b/pkg/sfu/rtpmunger.go @@ -207,7 +207,7 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket, marker bool) (Tra if err != nil { return TranslationParamsRTP{ snOrdering: SequenceNumberOrderingOutOfOrder, - }, ErrOutOfOrderSequenceNumberCacheMiss + }, errOutOfOrderSequenceNumberCacheMiss } extSequenceNumber := extPkt.ExtSequenceNumber - snOffset @@ -223,7 +223,7 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket, marker bool) (Tra ) return TranslationParamsRTP{ snOrdering: SequenceNumberOrderingOutOfOrder, - }, ErrOutOfOrderSequenceNumberCacheMiss + }, errOutOfOrderSequenceNumberCacheMiss } return TranslationParamsRTP{ @@ -245,13 +245,13 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket, marker bool) (Tra return TranslationParamsRTP{ snOrdering: SequenceNumberOrderingContiguous, - }, ErrPaddingOnlyPacket + }, errPaddingOnlyPacket } // can get duplicate packet due to FEC return TranslationParamsRTP{ snOrdering: SequenceNumberOrderingDuplicate, - }, ErrDuplicatePacket + }, errDuplicatePacket } func (r *RTPMunger) FilterRTX(nacks []uint16) []uint16 { @@ -284,7 +284,7 @@ func (r *RTPMunger) UpdateAndGetPaddingSnTs( tsOffset := 0 if !r.lastMarker { if !forceMarker { - return nil, ErrPaddingNotOnFrameBoundary + return nil, errPaddingNotOnFrameBoundary } // if forcing frame end, use timestamp of latest received frame for the first one diff --git a/pkg/sfu/rtpmunger_test.go b/pkg/sfu/rtpmunger_test.go index df90ec2d4..9e519120c 100644 --- a/pkg/sfu/rtpmunger_test.go +++ b/pkg/sfu/rtpmunger_test.go @@ -228,7 +228,7 @@ func TestOutOfOrderSequenceNumber(t *testing.T) { } tp, err = r.UpdateAndGetSnTs(extPkt, extPkt.Packet.Marker) - require.Error(t, err, ErrOutOfOrderSequenceNumberCacheMiss) + require.Error(t, err, errOutOfOrderSequenceNumberCacheMiss) require.Equal(t, tpExpected, tp) } @@ -252,7 +252,7 @@ func TestDuplicateSequenceNumber(t *testing.T) { } tp, err := r.UpdateAndGetSnTs(extPkt, extPkt.Packet.Marker) - require.ErrorIs(t, err, ErrDuplicatePacket) + require.ErrorIs(t, err, errDuplicatePacket) require.Equal(t, tpExpected, tp) } @@ -274,7 +274,7 @@ func TestPaddingOnlyPacket(t *testing.T) { tp, err := r.UpdateAndGetSnTs(extPkt, extPkt.Packet.Marker) require.Error(t, err) - require.ErrorIs(t, err, ErrPaddingOnlyPacket) + require.ErrorIs(t, err, errPaddingOnlyPacket) require.Equal(t, tpExpected, tp) require.Equal(t, uint64(23333), r.extHighestIncomingSN) require.Equal(t, uint64(23333), r.extLastSN) @@ -366,7 +366,7 @@ func TestGapInSequenceNumber(t *testing.T) { } tp, err = r.UpdateAndGetSnTs(extPkt, extPkt.Packet.Marker) - require.ErrorIs(t, err, ErrPaddingOnlyPacket) + require.ErrorIs(t, err, errPaddingOnlyPacket) require.Equal(t, tpExpected, tp) require.Equal(t, uint64(65536+2), r.extHighestIncomingSN) require.Equal(t, uint64(65536+1), r.extLastSN) @@ -417,7 +417,7 @@ func TestGapInSequenceNumber(t *testing.T) { } tp, err = r.UpdateAndGetSnTs(extPkt, extPkt.Packet.Marker) - require.ErrorIs(t, err, ErrPaddingOnlyPacket) + require.ErrorIs(t, err, errPaddingOnlyPacket) require.Equal(t, tpExpected, tp) require.Equal(t, uint64(65536+5), r.extHighestIncomingSN) require.Equal(t, uint64(65536+3), r.extLastSN) @@ -521,7 +521,7 @@ func TestUpdateAndGetPaddingSnTs(t *testing.T) { // getting padding without forcing marker should fail _, err := r.UpdateAndGetPaddingSnTs(10, 10, 5, false, 0) require.Error(t, err) - require.ErrorIs(t, err, ErrPaddingNotOnFrameBoundary) + require.ErrorIs(t, err, errPaddingNotOnFrameBoundary) // forcing a marker should not error out. // And timestamp on first padding should be the same as the last one.