diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 35b99e355..4cd0042af 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -57,7 +57,7 @@ type TrackSender interface { UpTrackMaxPublishedLayerChange(maxPublishedLayer int32) UpTrackMaxTemporalLayerSeenChange(maxTemporalLayerSeen int32) UpTrackBitrateReport(availableLayers []int32, bitrates Bitrates) - WriteRTP(p *buffer.ExtPacket, layer int32) bool + WriteRTP(p *buffer.ExtPacket, layer int32) int32 Close() IsClosed() bool // ID is the globally unique identifier for this Track. @@ -984,9 +984,9 @@ func (d *DownTrack) maxLayerNotifierWorker() { } // WriteRTP writes an RTP Packet to the DownTrack -func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) bool { +func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) int32 { if !d.writable.Load() { - return false + return 0 } tp, err := d.forwarder.GetTranslationParams(extPkt, layer) @@ -994,7 +994,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) bool { if err != nil { d.params.Logger.Errorw("could not get translation params", err) } - return false + return 0 } poolEntity := PacketFactory.Get().(*[]byte) @@ -1008,7 +1008,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) bool { "have", n, ) PacketFactory.Put(poolEntity) - return false + return 0 } payload = payload[:len(tp.codecBytes)+n] @@ -1126,7 +1126,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) bool { sal.OnResume(d) } } - return true + return 1 } // WritePaddingRTP tries to write as many padding only RTP packets as necessary diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index a0e2117a5..26daf3644 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -820,9 +820,7 @@ func (w *WebRTCReceiver) forwardRTP(layer int32, buff *buffer.Buffer) { var writeCount atomic.Int32 w.downTrackSpreader.Broadcast(func(dt TrackSender) { - if dt.WriteRTP(pkt, spatialLayer) { - writeCount.Inc() - } + writeCount.Add(dt.WriteRTP(pkt, spatialLayer)) }) if rt := w.redTransformer.Load(); rt != nil { diff --git a/pkg/sfu/redprimaryreceiver.go b/pkg/sfu/redprimaryreceiver.go index 3bef9ee1a..7e884a433 100644 --- a/pkg/sfu/redprimaryreceiver.go +++ b/pkg/sfu/redprimaryreceiver.go @@ -66,9 +66,7 @@ func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int3 // forward non-red packet directly var writeCount atomic.Int32 r.downTrackSpreader.Broadcast(func(dt TrackSender) { - if dt.WriteRTP(pkt, spatialLayer) { - writeCount.Inc() - } + writeCount.Add(dt.WriteRTP(pkt, spatialLayer)) }) return writeCount.Load() } @@ -93,9 +91,7 @@ func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int3 // not modify the ExtPacket.RawPacket here for performance since it is not used by the DownTrack, // otherwise it should be set to the correct value (marshal the primary rtp packet) r.downTrackSpreader.Broadcast(func(dt TrackSender) { - if dt.WriteRTP(&pPkt, spatialLayer) { - writeCount.Inc() - } + writeCount.Add(dt.WriteRTP(&pPkt, spatialLayer)) }) } return writeCount.Load() diff --git a/pkg/sfu/redreceiver.go b/pkg/sfu/redreceiver.go index ea64ea33b..8e4d68af3 100644 --- a/pkg/sfu/redreceiver.go +++ b/pkg/sfu/redreceiver.go @@ -68,9 +68,7 @@ func (r *RedReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) int3 if len(pkt.Packet.Payload) >= maxRedPayload { var writeCount atomic.Int32 r.downTrackSpreader.Broadcast(func(dt TrackSender) { - if dt.WriteRTP(pkt, spatialLayer) { - writeCount.Inc() - } + writeCount.Add(dt.WriteRTP(pkt, spatialLayer)) }) return writeCount.Load() } @@ -91,9 +89,7 @@ func (r *RedReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) int3 // otherwise it should be set to the correct value (marshal the primary rtp packet) var writeCount atomic.Int32 r.downTrackSpreader.Broadcast(func(dt TrackSender) { - if dt.WriteRTP(&pPkt, spatialLayer) { - writeCount.Inc() - } + writeCount.Add(dt.WriteRTP(&pPkt, spatialLayer)) }) return writeCount.Load() } diff --git a/pkg/sfu/redreceiver_test.go b/pkg/sfu/redreceiver_test.go index 765a30ae3..a8ad32a71 100644 --- a/pkg/sfu/redreceiver_test.go +++ b/pkg/sfu/redreceiver_test.go @@ -36,10 +36,10 @@ type dummyDowntrack struct { receivedPkts []*rtp.Packet } -func (dt *dummyDowntrack) WriteRTP(p *buffer.ExtPacket, _ int32) bool { +func (dt *dummyDowntrack) WriteRTP(p *buffer.ExtPacket, _ int32) int32 { dt.lastReceivedPkt = p.Packet dt.receivedPkts = append(dt.receivedPkts, p.Packet) - return true + return 1 } func (dt *dummyDowntrack) TrackInfoAvailable() {}