From 4872f2051dfcf64e48c6e19e43a8d8630c24f3be Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 6 Nov 2025 13:29:21 +0530 Subject: [PATCH] Return write count from WriteRTP. (#4059) * Log write count atomic. * Return write count from WriteRTP. Apologies for the frequent changes on this. With relays, the down track could write to several targets. So, use count to have an accurate indication of how may subscribers were written to. --- pkg/sfu/downtrack.go | 12 ++++++------ pkg/sfu/receiver.go | 4 +--- pkg/sfu/redprimaryreceiver.go | 8 ++------ pkg/sfu/redreceiver.go | 8 ++------ pkg/sfu/redreceiver_test.go | 4 ++-- 5 files changed, 13 insertions(+), 23 deletions(-) diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 35b99e355..4cd0042af 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -57,7 +57,7 @@ type TrackSender interface { UpTrackMaxPublishedLayerChange(maxPublishedLayer int32) UpTrackMaxTemporalLayerSeenChange(maxTemporalLayerSeen int32) UpTrackBitrateReport(availableLayers []int32, bitrates Bitrates) - WriteRTP(p *buffer.ExtPacket, layer int32) bool + WriteRTP(p *buffer.ExtPacket, layer int32) int32 Close() IsClosed() bool // ID is the globally unique identifier for this Track. @@ -984,9 +984,9 @@ func (d *DownTrack) maxLayerNotifierWorker() { } // WriteRTP writes an RTP Packet to the DownTrack -func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) bool { +func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) int32 { if !d.writable.Load() { - return false + return 0 } tp, err := d.forwarder.GetTranslationParams(extPkt, layer) @@ -994,7 +994,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) bool { if err != nil { d.params.Logger.Errorw("could not get translation params", err) } - return false + return 0 } poolEntity := PacketFactory.Get().(*[]byte) @@ -1008,7 +1008,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) bool { "have", n, ) PacketFactory.Put(poolEntity) - return false + return 0 } payload = payload[:len(tp.codecBytes)+n] @@ -1126,7 +1126,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) bool { sal.OnResume(d) } } - return true + return 1 } // WritePaddingRTP tries to write as many padding only RTP packets as necessary diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index a0e2117a5..26daf3644 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -820,9 +820,7 @@ func (w *WebRTCReceiver) forwardRTP(layer int32, buff *buffer.Buffer) { var writeCount atomic.Int32 w.downTrackSpreader.Broadcast(func(dt TrackSender) { - if dt.WriteRTP(pkt, spatialLayer) { - writeCount.Inc() - } + writeCount.Add(dt.WriteRTP(pkt, spatialLayer)) }) if rt := w.redTransformer.Load(); rt != nil { diff --git a/pkg/sfu/redprimaryreceiver.go b/pkg/sfu/redprimaryreceiver.go index 3bef9ee1a..7e884a433 100644 --- a/pkg/sfu/redprimaryreceiver.go +++ b/pkg/sfu/redprimaryreceiver.go @@ -66,9 +66,7 @@ func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int3 // forward non-red packet directly var writeCount atomic.Int32 r.downTrackSpreader.Broadcast(func(dt TrackSender) { - if dt.WriteRTP(pkt, spatialLayer) { - writeCount.Inc() - } + writeCount.Add(dt.WriteRTP(pkt, spatialLayer)) }) return writeCount.Load() } @@ -93,9 +91,7 @@ func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int3 // not modify the ExtPacket.RawPacket here for performance since it is not used by the DownTrack, // otherwise it should be set to the correct value (marshal the primary rtp packet) r.downTrackSpreader.Broadcast(func(dt TrackSender) { - if dt.WriteRTP(&pPkt, spatialLayer) { - writeCount.Inc() - } + writeCount.Add(dt.WriteRTP(&pPkt, spatialLayer)) }) } return writeCount.Load() diff --git a/pkg/sfu/redreceiver.go b/pkg/sfu/redreceiver.go index ea64ea33b..8e4d68af3 100644 --- a/pkg/sfu/redreceiver.go +++ b/pkg/sfu/redreceiver.go @@ -68,9 +68,7 @@ func (r *RedReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) int3 if len(pkt.Packet.Payload) >= maxRedPayload { var writeCount atomic.Int32 r.downTrackSpreader.Broadcast(func(dt TrackSender) { - if dt.WriteRTP(pkt, spatialLayer) { - writeCount.Inc() - } + writeCount.Add(dt.WriteRTP(pkt, spatialLayer)) }) return writeCount.Load() } @@ -91,9 +89,7 @@ func (r *RedReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) int3 // otherwise it should be set to the correct value (marshal the primary rtp packet) var writeCount atomic.Int32 r.downTrackSpreader.Broadcast(func(dt TrackSender) { - if dt.WriteRTP(&pPkt, spatialLayer) { - writeCount.Inc() - } + writeCount.Add(dt.WriteRTP(&pPkt, spatialLayer)) }) return writeCount.Load() } diff --git a/pkg/sfu/redreceiver_test.go b/pkg/sfu/redreceiver_test.go index 765a30ae3..a8ad32a71 100644 --- a/pkg/sfu/redreceiver_test.go +++ b/pkg/sfu/redreceiver_test.go @@ -36,10 +36,10 @@ type dummyDowntrack struct { receivedPkts []*rtp.Packet } -func (dt *dummyDowntrack) WriteRTP(p *buffer.ExtPacket, _ int32) bool { +func (dt *dummyDowntrack) WriteRTP(p *buffer.ExtPacket, _ int32) int32 { dt.lastReceivedPkt = p.Packet dt.receivedPkts = append(dt.receivedPkts, p.Packet) - return true + return 1 } func (dt *dummyDowntrack) TrackInfoAvailable() {}