Add packet to forwarding stats only if packet is forwarded. (#4056)

Packets not being forwarded were getting included in forwarding stats
calculation and skewing the measurement towards a smaller number.

The latency measurement does not include the batch IO of packets on
send. With a 2ms batching, that will add an average latency of 1ms.
This commit is contained in:
Raja Subramanian
2025-11-06 12:31:49 +05:30
committed by GitHub
parent f6909192bb
commit ae5fb7e882
10 changed files with 73 additions and 57 deletions
+7 -7
View File
@@ -89,23 +89,23 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
}
rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService, objectStore)
topicFormatter := rpc.NewTopicFormatter()
v, err := rpc.NewTypedRoomClient(clientParams)
roomClient, err := rpc.NewTypedRoomClient(clientParams)
if err != nil {
return nil, err
}
v2, err := rpc.NewTypedParticipantClient(clientParams)
participantClient, err := rpc.NewTypedParticipantClient(clientParams)
if err != nil {
return nil, err
}
roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, v, v2)
roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, roomClient, participantClient)
if err != nil {
return nil, err
}
v3, err := rpc.NewTypedAgentDispatchInternalClient(clientParams)
agentDispatchInternalClient, err := rpc.NewTypedAgentDispatchInternalClient(clientParams)
if err != nil {
return nil, err
}
agentDispatchService := NewAgentDispatchService(v3, topicFormatter, roomAllocator, router)
agentDispatchService := NewAgentDispatchService(agentDispatchInternalClient, topicFormatter, roomAllocator, router)
egressService := NewEgressService(egressClient, rtcEgressLauncher, ioInfoService, roomService)
ingressConfig := getIngressConfig(conf)
ingressClient, err := rpc.NewIngressClient(clientParams)
@@ -120,11 +120,11 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
}
sipService := NewSIPService(sipConfig, nodeID, messageBus, sipClient, sipStore, roomService, telemetryService)
rtcService := NewRTCService(conf, roomAllocator, router, telemetryService)
v4, err := rpc.NewTypedWHIPParticipantClient(clientParams)
whipParticipantClient, err := rpc.NewTypedWHIPParticipantClient(clientParams)
if err != nil {
return nil, err
}
serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, v4)
serviceWHIPService, err := NewWHIPService(conf, router, roomAllocator, clientParams, topicFormatter, whipParticipantClient)
if err != nil {
return nil, err
}
+15 -15
View File
@@ -57,7 +57,7 @@ type TrackSender interface {
UpTrackMaxPublishedLayerChange(maxPublishedLayer int32)
UpTrackMaxTemporalLayerSeenChange(maxTemporalLayerSeen int32)
UpTrackBitrateReport(availableLayers []int32, bitrates Bitrates)
WriteRTP(p *buffer.ExtPacket, layer int32) error
WriteRTP(p *buffer.ExtPacket, layer int32) bool
Close()
IsClosed() bool
// ID is the globally unique identifier for this Track.
@@ -94,13 +94,13 @@ const (
// -------------------------------------------------------------------
var (
ErrUnknownKind = errors.New("unknown kind of codec")
ErrOutOfOrderSequenceNumberCacheMiss = errors.New("out-of-order sequence number not found in cache")
ErrPaddingOnlyPacket = errors.New("padding only packet that need not be forwarded")
ErrDuplicatePacket = errors.New("duplicate packet")
ErrPaddingNotOnFrameBoundary = errors.New("padding cannot send on non-frame boundary")
ErrDownTrackAlreadyBound = errors.New("already bound")
ErrPayloadOverflow = errors.New("payload overflow")
errUnknownKind = errors.New("unknown kind of codec")
errOutOfOrderSequenceNumberCacheMiss = errors.New("out-of-order sequence number not found in cache")
errPaddingOnlyPacket = errors.New("padding only packet that need not be forwarded")
errDuplicatePacket = errors.New("duplicate packet")
errPaddingNotOnFrameBoundary = errors.New("padding cannot send on non-frame boundary")
errDownTrackAlreadyBound = errors.New("already bound")
errPayloadOverflow = errors.New("payload overflow")
)
var (
@@ -452,7 +452,7 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters,
d.bindLock.Lock()
if d.bindState.Load() != bindStateUnbound {
d.bindLock.Unlock()
return webrtc.RTPCodecParameters{}, ErrDownTrackAlreadyBound
return webrtc.RTPCodecParameters{}, errDownTrackAlreadyBound
}
// the TrackLocalContext's codec parameters will be set to the bound codec after Bind returns,
@@ -984,9 +984,9 @@ func (d *DownTrack) maxLayerNotifierWorker() {
}
// WriteRTP writes an RTP Packet to the DownTrack
func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) bool {
if !d.writable.Load() {
return nil
return false
}
tp, err := d.forwarder.GetTranslationParams(extPkt, layer)
@@ -994,7 +994,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
if err != nil {
d.params.Logger.Errorw("could not get translation params", err)
}
return err
return false
}
poolEntity := PacketFactory.Get().(*[]byte)
@@ -1003,12 +1003,12 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
n := copy(payload[len(tp.codecBytes):], extPkt.Packet.Payload[tp.incomingHeaderSize:])
if n != len(extPkt.Packet.Payload[tp.incomingHeaderSize:]) {
d.params.Logger.Errorw(
"payload overflow", nil,
"payload overflow", errPayloadOverflow,
"want", len(extPkt.Packet.Payload[tp.incomingHeaderSize:]),
"have", n,
)
PacketFactory.Put(poolEntity)
return ErrPayloadOverflow
return false
}
payload = payload[:len(tp.codecBytes)+n]
@@ -1126,7 +1126,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
sal.OnResume(d)
}
}
return nil
return true
}
// WritePaddingRTP tries to write as many padding only RTP packets as necessary
+2 -3
View File
@@ -86,10 +86,10 @@ func (d *DownTrackSpreader) HasDownTrack(subscriberID livekit.ParticipantID) boo
return ok
}
func (d *DownTrackSpreader) Broadcast(writer func(TrackSender)) int {
func (d *DownTrackSpreader) Broadcast(writer func(TrackSender)) {
downTracks := d.GetDownTracks()
if len(downTracks) == 0 {
return 0
return
}
threshold := uint64(d.params.Threshold)
@@ -101,7 +101,6 @@ func (d *DownTrackSpreader) Broadcast(writer func(TrackSender)) int {
// WriteRTP takes about 50µs on average, so we write to 2 down tracks per loop.
step := uint64(2)
utils.ParallelExec(downTracks, threshold, step, writer)
return len(downTracks)
}
func (d *DownTrackSpreader) DownTrackCount() int {
+2 -2
View File
@@ -1696,7 +1696,7 @@ func (f *Forwarder) GetTranslationParams(extPkt *buffer.ExtPacket, layer int32)
return TranslationParams{
shouldDrop: true,
}, ErrUnknownKind
}, errUnknownKind
}
func (f *Forwarder) getRefLayerRTPTimestamp(ts uint32, refLayer, targetLayer int32) (uint32, error) {
@@ -2038,7 +2038,7 @@ func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer i
tpRTP, err := f.rtpMunger.UpdateAndGetSnTs(extPkt, tp.marker)
if err != nil {
tp.shouldDrop = true
if err == ErrPaddingOnlyPacket || err == ErrDuplicatePacket || err == ErrOutOfOrderSequenceNumberCacheMiss {
if err == errPaddingOnlyPacket || err == errDuplicatePacket || err == errOutOfOrderSequenceNumberCacheMiss {
return nil
}
return err
+8 -5
View File
@@ -148,7 +148,7 @@ type TrackReceiver interface {
}
type REDTransformer interface {
ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) int
ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) int32
ForwardRTCPSenderReport(
payloadType webrtc.PayloadType,
layer int32,
@@ -818,16 +818,19 @@ func (w *WebRTCReceiver) forwardRTP(layer int32, buff *buffer.Buffer) {
continue
}
writeCount := w.downTrackSpreader.Broadcast(func(dt TrackSender) {
_ = dt.WriteRTP(pkt, spatialLayer)
var writeCount atomic.Int32
w.downTrackSpreader.Broadcast(func(dt TrackSender) {
if dt.WriteRTP(pkt, spatialLayer) {
writeCount.Inc()
}
})
if rt := w.redTransformer.Load(); rt != nil {
writeCount += rt.(REDTransformer).ForwardRTP(pkt, spatialLayer)
writeCount.Add(rt.(REDTransformer).ForwardRTP(pkt, spatialLayer))
}
// track delay/jitter
if writeCount > 0 && w.forwardStats != nil {
if writeCount.Load() > 0 && w.forwardStats != nil {
if latency, isHigh := w.forwardStats.Update(pkt.Arrival, mono.UnixNano()); isHigh {
w.logger.Infow(
"high forwarding latency",
+13 -7
View File
@@ -56,7 +56,7 @@ func NewRedPrimaryReceiver(receiver TrackReceiver, dsp DownTrackSpreaderParams)
}
}
func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) int {
func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) int32 {
// extract primary payload from RED and forward to downtracks
if r.downTrackSpreader.DownTrackCount() == 0 {
return 0
@@ -64,9 +64,13 @@ func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int3
if pkt.Packet.PayloadType != r.redPT {
// forward non-red packet directly
return r.downTrackSpreader.Broadcast(func(dt TrackSender) {
_ = dt.WriteRTP(pkt, spatialLayer)
var writeCount atomic.Int32
r.downTrackSpreader.Broadcast(func(dt TrackSender) {
if dt.WriteRTP(pkt, spatialLayer) {
writeCount.Inc()
}
})
return writeCount.Load()
}
pkts, err := r.getSendPktsFromRed(pkt.Packet)
@@ -75,7 +79,7 @@ func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int3
return 0
}
var count int
var writeCount atomic.Int32
for i, sendPkt := range pkts {
pPkt := *pkt
if i != len(pkts)-1 {
@@ -88,11 +92,13 @@ func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int3
// not modify the ExtPacket.RawPacket here for performance since it is not used by the DownTrack,
// otherwise it should be set to the correct value (marshal the primary rtp packet)
count += r.downTrackSpreader.Broadcast(func(dt TrackSender) {
_ = dt.WriteRTP(&pPkt, spatialLayer)
r.downTrackSpreader.Broadcast(func(dt TrackSender) {
if dt.WriteRTP(&pPkt, spatialLayer) {
writeCount.Inc()
}
})
}
return count
return writeCount.Load()
}
func (r *RedPrimaryReceiver) ForwardRTCPSenderReport(
+13 -5
View File
@@ -58,7 +58,7 @@ func NewRedReceiver(receiver TrackReceiver, dsp DownTrackSpreaderParams) *RedRec
}
}
func (r *RedReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) int {
func (r *RedReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) int32 {
// encode RED payload from primary payload and forward to downtracks
if r.downTrackSpreader.DownTrackCount() == 0 {
return 0
@@ -66,9 +66,13 @@ func (r *RedReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) int
// fallback to primary codec if payload size exceeds redundant block length
if len(pkt.Packet.Payload) >= maxRedPayload {
return r.downTrackSpreader.Broadcast(func(dt TrackSender) {
_ = dt.WriteRTP(pkt, spatialLayer)
var writeCount atomic.Int32
r.downTrackSpreader.Broadcast(func(dt TrackSender) {
if dt.WriteRTP(pkt, spatialLayer) {
writeCount.Inc()
}
})
return writeCount.Load()
}
redLen, err := r.encodeRedForPrimary(pkt.Packet, r.redPayloadBuf[:])
@@ -85,9 +89,13 @@ func (r *RedReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int32) int
// not modify the ExtPacket.RawPacket here for performance since it is not used by the DownTrack,
// otherwise it should be set to the correct value (marshal the primary rtp packet)
return r.downTrackSpreader.Broadcast(func(dt TrackSender) {
_ = dt.WriteRTP(&pPkt, spatialLayer)
var writeCount atomic.Int32
r.downTrackSpreader.Broadcast(func(dt TrackSender) {
if dt.WriteRTP(&pPkt, spatialLayer) {
writeCount.Inc()
}
})
return writeCount.Load()
}
func (r *RedReceiver) ForwardRTCPSenderReport(
+2 -2
View File
@@ -36,10 +36,10 @@ type dummyDowntrack struct {
receivedPkts []*rtp.Packet
}
func (dt *dummyDowntrack) WriteRTP(p *buffer.ExtPacket, _ int32) error {
func (dt *dummyDowntrack) WriteRTP(p *buffer.ExtPacket, _ int32) bool {
dt.lastReceivedPkt = p.Packet
dt.receivedPkts = append(dt.receivedPkts, p.Packet)
return nil
return true
}
func (dt *dummyDowntrack) TrackInfoAvailable() {}
+5 -5
View File
@@ -207,7 +207,7 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket, marker bool) (Tra
if err != nil {
return TranslationParamsRTP{
snOrdering: SequenceNumberOrderingOutOfOrder,
}, ErrOutOfOrderSequenceNumberCacheMiss
}, errOutOfOrderSequenceNumberCacheMiss
}
extSequenceNumber := extPkt.ExtSequenceNumber - snOffset
@@ -223,7 +223,7 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket, marker bool) (Tra
)
return TranslationParamsRTP{
snOrdering: SequenceNumberOrderingOutOfOrder,
}, ErrOutOfOrderSequenceNumberCacheMiss
}, errOutOfOrderSequenceNumberCacheMiss
}
return TranslationParamsRTP{
@@ -245,13 +245,13 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket, marker bool) (Tra
return TranslationParamsRTP{
snOrdering: SequenceNumberOrderingContiguous,
}, ErrPaddingOnlyPacket
}, errPaddingOnlyPacket
}
// can get duplicate packet due to FEC
return TranslationParamsRTP{
snOrdering: SequenceNumberOrderingDuplicate,
}, ErrDuplicatePacket
}, errDuplicatePacket
}
func (r *RTPMunger) FilterRTX(nacks []uint16) []uint16 {
@@ -284,7 +284,7 @@ func (r *RTPMunger) UpdateAndGetPaddingSnTs(
tsOffset := 0
if !r.lastMarker {
if !forceMarker {
return nil, ErrPaddingNotOnFrameBoundary
return nil, errPaddingNotOnFrameBoundary
}
// if forcing frame end, use timestamp of latest received frame for the first one
+6 -6
View File
@@ -228,7 +228,7 @@ func TestOutOfOrderSequenceNumber(t *testing.T) {
}
tp, err = r.UpdateAndGetSnTs(extPkt, extPkt.Packet.Marker)
require.Error(t, err, ErrOutOfOrderSequenceNumberCacheMiss)
require.Error(t, err, errOutOfOrderSequenceNumberCacheMiss)
require.Equal(t, tpExpected, tp)
}
@@ -252,7 +252,7 @@ func TestDuplicateSequenceNumber(t *testing.T) {
}
tp, err := r.UpdateAndGetSnTs(extPkt, extPkt.Packet.Marker)
require.ErrorIs(t, err, ErrDuplicatePacket)
require.ErrorIs(t, err, errDuplicatePacket)
require.Equal(t, tpExpected, tp)
}
@@ -274,7 +274,7 @@ func TestPaddingOnlyPacket(t *testing.T) {
tp, err := r.UpdateAndGetSnTs(extPkt, extPkt.Packet.Marker)
require.Error(t, err)
require.ErrorIs(t, err, ErrPaddingOnlyPacket)
require.ErrorIs(t, err, errPaddingOnlyPacket)
require.Equal(t, tpExpected, tp)
require.Equal(t, uint64(23333), r.extHighestIncomingSN)
require.Equal(t, uint64(23333), r.extLastSN)
@@ -366,7 +366,7 @@ func TestGapInSequenceNumber(t *testing.T) {
}
tp, err = r.UpdateAndGetSnTs(extPkt, extPkt.Packet.Marker)
require.ErrorIs(t, err, ErrPaddingOnlyPacket)
require.ErrorIs(t, err, errPaddingOnlyPacket)
require.Equal(t, tpExpected, tp)
require.Equal(t, uint64(65536+2), r.extHighestIncomingSN)
require.Equal(t, uint64(65536+1), r.extLastSN)
@@ -417,7 +417,7 @@ func TestGapInSequenceNumber(t *testing.T) {
}
tp, err = r.UpdateAndGetSnTs(extPkt, extPkt.Packet.Marker)
require.ErrorIs(t, err, ErrPaddingOnlyPacket)
require.ErrorIs(t, err, errPaddingOnlyPacket)
require.Equal(t, tpExpected, tp)
require.Equal(t, uint64(65536+5), r.extHighestIncomingSN)
require.Equal(t, uint64(65536+3), r.extLastSN)
@@ -521,7 +521,7 @@ func TestUpdateAndGetPaddingSnTs(t *testing.T) {
// getting padding without forcing marker should fail
_, err := r.UpdateAndGetPaddingSnTs(10, 10, 5, false, 0)
require.Error(t, err)
require.ErrorIs(t, err, ErrPaddingNotOnFrameBoundary)
require.ErrorIs(t, err, errPaddingNotOnFrameBoundary)
// forcing a marker should not error out.
// And timestamp on first padding should be the same as the last one.