Return write count from WriteRTP. (#4059)

* Log write count atomic.

* Return write count from WriteRTP.

Apologies for the frequent changes on this. With relays, the down track
could write to several targets. So, use count to have an accurate
indication of how may subscribers were written to.
This commit is contained in:
Raja Subramanian
2025-11-06 13:29:21 +05:30
committed by GitHub
parent d0ba46b460
commit 4872f2051d
5 changed files with 13 additions and 23 deletions

View File

@@ -57,7 +57,7 @@ type TrackSender interface {
UpTrackMaxPublishedLayerChange(maxPublishedLayer int32)
UpTrackMaxTemporalLayerSeenChange(maxTemporalLayerSeen int32)
UpTrackBitrateReport(availableLayers []int32, bitrates Bitrates)
WriteRTP(p *buffer.ExtPacket, layer int32) bool
WriteRTP(p *buffer.ExtPacket, layer int32) int32
Close()
IsClosed() bool
// ID is the globally unique identifier for this Track.
@@ -984,9 +984,9 @@ func (d *DownTrack) maxLayerNotifierWorker() {
}
// WriteRTP writes an RTP Packet to the DownTrack
func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) bool {
func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) int32 {
if !d.writable.Load() {
return false
return 0
}
tp, err := d.forwarder.GetTranslationParams(extPkt, layer)
@@ -994,7 +994,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) bool {
if err != nil {
d.params.Logger.Errorw("could not get translation params", err)
}
return false
return 0
}
poolEntity := PacketFactory.Get().(*[]byte)
@@ -1008,7 +1008,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) bool {
"have", n,
)
PacketFactory.Put(poolEntity)
return false
return 0
}
payload = payload[:len(tp.codecBytes)+n]
@@ -1126,7 +1126,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) bool {
sal.OnResume(d)
}
}
return true
return 1
}
// WritePaddingRTP tries to write as many padding only RTP packets as necessary

View File

@@ -820,9 +820,7 @@ func (w *WebRTCReceiver) forwardRTP(layer int32, buff *buffer.Buffer) {
var writeCount atomic.Int32
w.downTrackSpreader.Broadcast(func(dt TrackSender) {
if dt.WriteRTP(pkt, spatialLayer) {
writeCount.Inc()
}
writeCount.Add(dt.WriteRTP(pkt, spatialLayer))
})
if rt := w.redTransformer.Load(); rt != nil {

View File

@@ -66,9 +66,7 @@ func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int3
// forward non-red packet directly
var writeCount atomic.Int32
r.downTrackSpreader.Broadcast(func(dt TrackSender) {
if dt.WriteRTP(pkt, spatialLayer) {
writeCount.Inc()
}
writeCount.Add(dt.WriteRTP(pkt, spatialLayer))
})
return writeCount.Load()
}
@@ -93,9 +91,7 @@ func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int3
// not modify the ExtPacket.RawPacket here for performance since it is not used by the DownTrack,
// otherwise it should be set to the correct value (marshal the primary rtp packet)
r.downTrackSpreader.Broadcast(func(dt TrackSender) {
if dt.WriteRTP(&pPkt, spatialLayer) {
writeCount.Inc()
}
writeCount.Add(dt.WriteRTP(&pPkt, spatialLayer))
})
}
return writeCount.Load()

View File

@@ -68,9 +68,7 @@ func (r *RedReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) int3
if len(pkt.Packet.Payload) >= maxRedPayload {
var writeCount atomic.Int32
r.downTrackSpreader.Broadcast(func(dt TrackSender) {
if dt.WriteRTP(pkt, spatialLayer) {
writeCount.Inc()
}
writeCount.Add(dt.WriteRTP(pkt, spatialLayer))
})
return writeCount.Load()
}
@@ -91,9 +89,7 @@ func (r *RedReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) int3
// otherwise it should be set to the correct value (marshal the primary rtp packet)
var writeCount atomic.Int32
r.downTrackSpreader.Broadcast(func(dt TrackSender) {
if dt.WriteRTP(&pPkt, spatialLayer) {
writeCount.Inc()
}
writeCount.Add(dt.WriteRTP(&pPkt, spatialLayer))
})
return writeCount.Load()
}

View File

@@ -36,10 +36,10 @@ type dummyDowntrack struct {
receivedPkts []*rtp.Packet
}
func (dt *dummyDowntrack) WriteRTP(p *buffer.ExtPacket, _ int32) bool {
func (dt *dummyDowntrack) WriteRTP(p *buffer.ExtPacket, _ int32) int32 {
dt.lastReceivedPkt = p.Packet
dt.receivedPkts = append(dt.receivedPkts, p.Packet)
return true
return 1
}
func (dt *dummyDowntrack) TrackInfoAvailable() {}