Fix header size calculation in stats. (#3171)

* Fix header size calculation in stats.

With the pacer inserting some extensions, the header size used in stats
(and more importantly when probing for bandwidth estimation and
metering the bytes to control the probes) was incorrect. The size
was effectively that of the incoming extensions. It would have been
close enough though.

Anyhow, a bit of history
- initially was planning on packaging all the necessary fields into
  pacer packet and pacer would callback after sending, but that was not
  great for a couple of reasons
  - had to send in a bunch of useless data (as far as pacer is
    concerned) into pacer.
  - callback every packet (this is not bad, just a function call which
    happens in the forward path too, but had to lug around the above
    data).
- in the forward path, there is a very rare edge case issue when calling stats update
  after pacer.Enqueue() - details in https://github.com/livekit/livekit/pull/2085,
  but that is a rare case.

Because of those reasons, the update was placed in the forward path
before enqueue, but did not notice the header size issue till now.

As a compromise, `pacer.Enqueue` returns the headerSize and payloadSize.
It uses a dummy header to calculate the size. A real extension will be
added just before sending the packet on the wire. pion/rtp replaces an
extension if one is already present. So, the dummy would be replaced by
the real one before sending on the wire.
a21194ecfb/packet.go (L398)

This does introduce back the second rare edge case, but that is very
rare and even if it happens, not catastrophic.

* cleanup

* add extensions and dummy as well in downtrack to make pacer cleaner
This commit is contained in:
Raja Subramanian
2024-11-12 10:53:57 +05:30
committed by GitHub
parent a825661aff
commit 41fbcec2cd
6 changed files with 174 additions and 233 deletions

View File

@@ -132,6 +132,9 @@ var (
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
}
dummyAbsSendTimeExt, _ = rtp.NewAbsSendTimeExtension(mono.Now()).Marshal()
dummyTransportCCExt, _ = rtp.TransportCCExtension{TransportSequence: 12345}.Marshal()
)
// -------------------------------------------------------------------
@@ -865,32 +868,26 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
}
payload = payload[:len(tp.codecBytes)+n]
hdr, err := d.getTranslatedRTPHeader(extPkt, &tp)
if err != nil {
d.params.Logger.Errorw("could not get translated RTP header", err)
PacketFactory.Put(poolEntity)
return err
// translate RTP header
hdr := &rtp.Header{
Version: extPkt.Packet.Version,
Padding: extPkt.Packet.Padding,
PayloadType: d.getTranslatedPayloadType(extPkt.Packet.PayloadType),
SequenceNumber: uint16(tp.rtp.extSequenceNumber),
Timestamp: uint32(tp.rtp.extTimestamp),
SSRC: d.ssrc,
}
if tp.marker {
hdr.Marker = tp.marker
}
var extensions []pacer.ExtensionData
// add extensions
if tp.ddBytes != nil {
extensions = append(
extensions,
pacer.ExtensionData{
ID: uint8(d.dependencyDescriptorExtID),
Payload: tp.ddBytes,
},
)
hdr.SetExtension(uint8(d.dependencyDescriptorExtID), tp.ddBytes)
}
if d.playoutDelayExtID != 0 && d.playoutDelay != nil {
if val := d.playoutDelay.GetDelayExtension(hdr.SequenceNumber); val != nil {
extensions = append(
extensions,
pacer.ExtensionData{
ID: uint8(d.playoutDelayExtID),
Payload: val,
},
)
hdr.SetExtension(uint8(d.playoutDelayExtID), val)
// NOTE: play out delay extension is not cached in sequencer,
// i. e. they will not be added to retransmitted packet.
@@ -912,20 +909,20 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
_, _, refSenderReport := d.forwarder.GetSenderReportParams()
if refSenderReport != nil {
actExtCopy := *extPkt.AbsCaptureTimeExt
if err = actExtCopy.Rewrite(rtpstats.RTCPSenderReportPropagationDelay(refSenderReport, !d.params.DisableSenderReportPassThrough)); err == nil {
if err = actExtCopy.Rewrite(
rtpstats.RTCPSenderReportPropagationDelay(
refSenderReport,
!d.params.DisableSenderReportPassThrough,
),
); err == nil {
actBytes, err = actExtCopy.Marshal()
if err == nil {
extensions = append(
extensions,
pacer.ExtensionData{
ID: uint8(d.absCaptureTimeExtID),
Payload: actBytes,
},
)
hdr.SetExtension(uint8(d.absCaptureTimeExtID), actBytes)
}
}
}
}
d.addDummyExtensions(hdr)
if d.sequencer != nil {
d.sequencer.push(
@@ -942,22 +939,17 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
)
}
d.sendingPacket(
hdr,
len(payload),
&sendPacketMetadata{
layer: layer,
packetTime: extPkt.Arrival,
extSequenceNumber: tp.rtp.extSequenceNumber,
extTimestamp: tp.rtp.extTimestamp,
isKeyFrame: extPkt.KeyFrame,
isOutOfOrder: extPkt.IsOutOfOrder,
tp: &tp,
},
)
d.pacer.Enqueue(pacer.Packet{
d.updateStats(updateStatsParams{
packetTime: extPkt.Arrival,
extSequenceNumber: tp.rtp.extSequenceNumber,
extTimestamp: tp.rtp.extTimestamp,
isOutOfOrder: extPkt.IsOutOfOrder,
headerSize: hdr.MarshalSize(),
payloadSize: len(payload),
marker: hdr.Marker,
})
d.pacer.Enqueue(&pacer.Packet{
Header: hdr,
Extensions: extensions,
Payload: payload,
AbsSendTimeExtID: uint8(d.absSendTimeExtID),
TransportWideExtID: uint8(d.transportWideExtID),
@@ -965,6 +957,27 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
Pool: PacketFactory,
PoolEntity: poolEntity,
})
if extPkt.KeyFrame {
d.isNACKThrottled.Store(false)
d.rtpStats.UpdateKeyFrame(1)
d.params.Logger.Debugw(
"forwarded key frame",
"layer", layer,
"rtpsn", tp.rtp.extSequenceNumber,
"rtpts", tp.rtp.extTimestamp,
)
}
if tp.isSwitching {
d.postMaxLayerNotifierEvent("switching")
}
if tp.isResuming {
if sal := d.getStreamAllocatorListener(); sal != nil {
sal.OnResume(d)
}
}
return nil
}
@@ -1026,7 +1039,7 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa
bytesSent := 0
for i := 0; i < len(snts); i++ {
hdr := rtp.Header{
hdr := &rtp.Header{
Version: 2,
Padding: true,
Marker: false,
@@ -1034,33 +1047,33 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa
SequenceNumber: uint16(snts[i].extSequenceNumber),
Timestamp: uint32(snts[i].extTimestamp),
SSRC: d.ssrc,
CSRC: []uint32{},
}
d.addDummyExtensions(hdr)
payload := make([]byte, RTPPaddingMaxPayloadSize)
// last byte of padding has padding size including that byte
payload[RTPPaddingMaxPayloadSize-1] = byte(RTPPaddingMaxPayloadSize)
d.sendingPacket(
&hdr,
len(payload),
&sendPacketMetadata{
packetTime: mono.UnixNano(),
extSequenceNumber: snts[i].extSequenceNumber,
extTimestamp: snts[i].extTimestamp,
isPadding: true,
shouldDisableCounter: true,
},
)
d.pacer.Enqueue(pacer.Packet{
Header: &hdr,
hdrSize := hdr.MarshalSize()
payloadSize := len(payload)
d.updateStats(updateStatsParams{
packetTime: mono.UnixNano(),
extSequenceNumber: snts[i].extSequenceNumber,
extTimestamp: snts[i].extTimestamp,
headerSize: hdrSize,
payloadSize: payloadSize,
isPadding: true,
disableCounter: true,
})
d.pacer.Enqueue(&pacer.Packet{
Header: hdr,
Payload: payload,
AbsSendTimeExtID: uint8(d.absSendTimeExtID),
TransportWideExtID: uint8(d.transportWideExtID),
WriteStream: d.writeStream,
})
bytesSent += hdr.MarshalSize() + len(payload)
bytesSent += hdrSize + payloadSize
}
// STREAM_ALLOCATOR-TODO: change this to pull this counter from stream allocator so that counter can be updated in pacer callback
@@ -1572,7 +1585,7 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan
}
for i := 0; i < len(snts); i++ {
hdr := rtp.Header{
hdr := &rtp.Header{
Version: 2,
Padding: false,
Marker: true,
@@ -1580,8 +1593,8 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan
SequenceNumber: uint16(snts[i].extSequenceNumber),
Timestamp: uint32(snts[i].extTimestamp),
SSRC: d.ssrc,
CSRC: []uint32{},
}
d.addDummyExtensions(hdr)
payload, err := getBlankFrame(frameEndNeeded)
if err != nil {
@@ -1590,13 +1603,15 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan
return
}
d.sendingPacket(&hdr, len(payload), &sendPacketMetadata{
d.updateStats(updateStatsParams{
packetTime: mono.UnixNano(),
extSequenceNumber: snts[i].extSequenceNumber,
extTimestamp: snts[i].extTimestamp,
headerSize: hdr.MarshalSize(),
payloadSize: len(payload),
})
d.pacer.Enqueue(pacer.Packet{
Header: &hdr,
d.pacer.Enqueue(&pacer.Packet{
Header: hdr,
Payload: payload,
AbsSendTimeExtID: uint8(d.absSendTimeExtID),
TransportWideExtID: uint8(d.transportWideExtID),
@@ -1924,7 +1939,6 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) {
payload = payload[:int(epm.numCodecBytesOut)+len(pkt.Payload)-int(epm.numCodecBytesIn)]
}
var extensions []pacer.ExtensionData
if d.dependencyDescriptorExtID != 0 {
var ddBytes []byte
if len(epm.ddBytesSlice) != 0 {
@@ -1932,39 +1946,24 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) {
} else {
ddBytes = epm.ddBytes[:epm.ddBytesSize]
}
extensions = append(
extensions,
pacer.ExtensionData{
ID: uint8(d.dependencyDescriptorExtID),
Payload: ddBytes,
},
)
pkt.Header.SetExtension(uint8(d.dependencyDescriptorExtID), ddBytes)
}
if d.absCaptureTimeExtID != 0 && len(epm.actBytes) != 0 {
extensions = append(
extensions,
pacer.ExtensionData{
ID: uint8(d.absCaptureTimeExtID),
Payload: epm.actBytes,
},
)
pkt.Header.SetExtension(uint8(d.absCaptureTimeExtID), epm.actBytes)
}
d.addDummyExtensions(&pkt.Header)
d.sendingPacket(
&pkt.Header,
len(payload),
&sendPacketMetadata{
layer: int32(epm.layer),
packetTime: mono.UnixNano(),
extSequenceNumber: epm.extSequenceNumber,
extTimestamp: epm.extTimestamp,
isRTX: true,
isOutOfOrder: true,
},
)
d.pacer.Enqueue(pacer.Packet{
d.updateStats(updateStatsParams{
packetTime: mono.UnixNano(),
extSequenceNumber: epm.extSequenceNumber,
extTimestamp: epm.extTimestamp,
isOutOfOrder: true,
headerSize: pkt.Header.MarshalSize(),
payloadSize: len(payload),
isRTX: true,
})
d.pacer.Enqueue(&pacer.Packet{
Header: &pkt.Header,
Extensions: extensions,
Payload: payload,
IsRTX: true,
AbsSendTimeExtID: uint8(d.absSendTimeExtID),
@@ -1997,17 +1996,14 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) {
*/
}
func (d *DownTrack) getTranslatedRTPHeader(extPkt *buffer.ExtPacket, tp *TranslationParams) (*rtp.Header, error) {
hdr := extPkt.Packet.Header
hdr.PayloadType = d.getTranslatedPayloadType(hdr.PayloadType)
hdr.Timestamp = uint32(tp.rtp.extTimestamp)
hdr.SequenceNumber = uint16(tp.rtp.extSequenceNumber)
hdr.SSRC = d.ssrc
if tp.marker {
hdr.Marker = tp.marker
func (d *DownTrack) addDummyExtensions(hdr *rtp.Header) {
// add dummy extensions (actual ones will be filed by pacer) to get header size
if d.absSendTimeExtID != 0 {
hdr.SetExtension(uint8(d.absSendTimeExtID), dummyAbsSendTimeExt)
}
if d.transportWideExtID != 0 {
hdr.SetExtension(uint8(d.transportWideExtID), dummyTransportCCExt)
}
return &hdr, nil
}
func (d *DownTrack) getTranslatedPayloadType(src uint8) uint8 {
@@ -2158,7 +2154,7 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() {
return
}
for i := 0; i < len(snts); i++ {
hdr := rtp.Header{
hdr := &rtp.Header{
Version: 2,
Padding: false,
Marker: true,
@@ -2166,8 +2162,8 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() {
SequenceNumber: uint16(snts[i].extSequenceNumber),
Timestamp: uint32(snts[i].extTimestamp),
SSRC: d.ssrc,
CSRC: []uint32{},
}
d.addDummyExtensions(hdr)
payload, err := d.getOpusBlankFrame(false)
if err != nil {
@@ -2175,19 +2171,17 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() {
return
}
d.sendingPacket(
&hdr,
len(payload),
&sendPacketMetadata{
packetTime: mono.UnixNano(),
extSequenceNumber: snts[i].extSequenceNumber,
extTimestamp: snts[i].extTimestamp,
// although this is using empty frames, mark as padding as these are used to trigger Pion OnTrack only
isPadding: true,
},
)
d.pacer.Enqueue(pacer.Packet{
Header: &hdr,
d.updateStats(updateStatsParams{
packetTime: mono.UnixNano(),
extSequenceNumber: snts[i].extSequenceNumber,
extTimestamp: snts[i].extTimestamp,
headerSize: hdr.MarshalSize(),
payloadSize: len(payload),
// although this is using empty frames, mark as padding as these are used to trigger Pion OnTrack only
isPadding: true,
})
d.pacer.Enqueue(&pacer.Packet{
Header: hdr,
Payload: payload,
AbsSendTimeExtID: uint8(d.absSendTimeExtID),
TransportWideExtID: uint8(d.transportWideExtID),
@@ -2219,27 +2213,26 @@ func (d *DownTrack) handleRTCPSenderReportData(publisherSRData *livekit.RTCPSend
d.rtpStats.MaybeAdjustFirstPacketTime(publisherSRData, tsOffset)
}
type sendPacketMetadata struct {
layer int32
packetTime int64
extSequenceNumber uint64
extTimestamp uint64
isKeyFrame bool
isRTX bool
isOutOfOrder bool
isPadding bool
shouldDisableCounter bool
tp *TranslationParams
type updateStatsParams struct {
packetTime int64
extSequenceNumber uint64
extTimestamp uint64
isOutOfOrder bool
headerSize int
payloadSize int
marker bool
isRTX bool
isPadding bool
disableCounter bool
}
func (d *DownTrack) sendingPacket(hdr *rtp.Header, payloadSize int, spmd *sendPacketMetadata) {
hdrSize := hdr.MarshalSize()
if !spmd.shouldDisableCounter {
func (d *DownTrack) updateStats(params updateStatsParams) {
if !params.disableCounter {
// STREAM-ALLOCATOR-TODO: remove this stream allocator bytes counter once stream allocator changes fully to pull bytes counter
size := uint32(hdrSize + payloadSize)
size := uint32(params.headerSize + params.payloadSize)
d.streamAllocatorBytesCounter.Add(size)
/* STREAM-ALLOCATOR-DATA
if spmd.isRTX {
if params.isRTX {
d.bytesRetransmitted.Add(size)
} else {
d.bytesSent.Add(size)
@@ -2248,36 +2241,23 @@ func (d *DownTrack) sendingPacket(hdr *rtp.Header, payloadSize int, spmd *sendPa
}
// update RTPStats
paddingSize := payloadSize
if spmd.isPadding {
payloadSize := params.payloadSize
paddingSize := params.payloadSize
if params.isPadding {
payloadSize = 0
} else {
paddingSize = 0
}
d.rtpStats.Update(spmd.packetTime, spmd.extSequenceNumber, spmd.extTimestamp, hdr.Marker, hdrSize, payloadSize, paddingSize, spmd.isOutOfOrder)
if spmd.isKeyFrame {
d.isNACKThrottled.Store(false)
d.rtpStats.UpdateKeyFrame(1)
d.params.Logger.Debugw(
"forwarded key frame",
"layer", spmd.layer,
"rtpsn", spmd.extSequenceNumber,
"rtpts", spmd.extTimestamp,
)
}
if spmd.tp != nil {
if spmd.tp.isSwitching {
d.postMaxLayerNotifierEvent("switching")
}
if spmd.tp.isResuming {
if sal := d.getStreamAllocatorListener(); sal != nil {
sal.OnResume(d)
}
}
}
d.rtpStats.Update(
params.packetTime,
params.extSequenceNumber,
params.extTimestamp,
params.marker,
params.headerSize,
payloadSize,
paddingSize,
params.isOutOfOrder,
)
}
// -------------------------------------------------------------------------------

View File

@@ -51,7 +51,7 @@ func (b *Base) SendPacket(p *Packet) (int, error) {
}
}()
err := b.writeRTPHeaderExtensions(p)
err := b.patchRTPHeaderExtensions(p)
if err != nil {
b.logger.Errorw("writing rtp header extensions err", err)
return 0, err
@@ -69,21 +69,8 @@ func (b *Base) SendPacket(p *Packet) (int, error) {
return written, nil
}
// writes RTP header extensions of track
func (b *Base) writeRTPHeaderExtensions(p *Packet) error {
// clear out extensions that may have been in the forwarded header
p.Header.Extension = false
p.Header.ExtensionProfile = 0
p.Header.Extensions = []rtp.Extension{}
for _, ext := range p.Extensions {
if ext.ID == 0 || len(ext.Payload) == 0 {
continue
}
p.Header.SetExtension(ext.ID, ext.Payload)
}
// patch just abs-send-time and transport-cc extensions if applicable
func (b *Base) patchRTPHeaderExtensions(p *Packet) error {
sendingAt := mono.Now()
if p.AbsSendTimeExtID != 0 {
sendTime := rtp.NewAbsSendTimeExtension(sendingAt)

View File

@@ -18,6 +18,7 @@ import (
"sync"
"time"
"github.com/frostbyte73/core"
"github.com/gammazero/deque"
"github.com/livekit/livekit-server/pkg/sfu/sendsidebwe"
"github.com/livekit/protocol/logger"
@@ -32,11 +33,11 @@ type LeakyBucket struct {
logger logger.Logger
lock sync.RWMutex
packets deque.Deque[Packet]
interval time.Duration
bitrate int
isStopped bool
lock sync.RWMutex
packets deque.Deque[*Packet]
interval time.Duration
bitrate int
stop core.Fuse
}
func NewLeakyBucket(logger logger.Logger, sendSideBWE *sendsidebwe.SendSideBWE, interval time.Duration, bitrate int) *LeakyBucket {
@@ -67,23 +68,13 @@ func (l *LeakyBucket) SetBitrate(bitrate int) {
}
func (l *LeakyBucket) Stop() {
l.lock.Lock()
if l.isStopped {
l.lock.Unlock()
return
}
l.isStopped = true
l.lock.Unlock()
l.stop.Break()
}
func (l *LeakyBucket) Enqueue(p Packet) {
func (l *LeakyBucket) Enqueue(p *Packet) {
l.lock.Lock()
defer l.lock.Unlock()
if !l.isStopped {
l.packets.PushBack(p)
}
l.packets.PushBack(p)
l.lock.Unlock()
}
func (l *LeakyBucket) sendWorker() {
@@ -121,12 +112,11 @@ func (l *LeakyBucket) sendWorker() {
}
for {
l.lock.Lock()
if l.isStopped {
l.lock.Unlock()
if l.stop.IsBroken() {
return
}
l.lock.Lock()
if l.packets.Len() == 0 {
l.lock.Unlock()
// allow overshoot in next interval with shortage in this interval
@@ -137,7 +127,7 @@ func (l *LeakyBucket) sendWorker() {
p := l.packets.PopFront()
l.lock.Unlock()
written, _ := l.Base.SendPacket(&p)
written, _ := l.Base.SendPacket(p)
toSendBytes -= written
if toSendBytes < 0 {
// overage, wait for next interval

View File

@@ -17,6 +17,7 @@ package pacer
import (
"sync"
"github.com/frostbyte73/core"
"github.com/gammazero/deque"
"github.com/livekit/livekit-server/pkg/sfu/sendsidebwe"
"github.com/livekit/protocol/logger"
@@ -27,10 +28,10 @@ type NoQueue struct {
logger logger.Logger
lock sync.RWMutex
packets deque.Deque[Packet]
wake chan struct{}
isStopped bool
lock sync.RWMutex
packets deque.Deque[*Packet]
wake chan struct{}
stop core.Fuse
}
func NewNoQueue(logger logger.Logger, sendSideBWE *sendsidebwe.SendSideBWE) *NoQueue {
@@ -46,27 +47,17 @@ func NewNoQueue(logger logger.Logger, sendSideBWE *sendsidebwe.SendSideBWE) *NoQ
}
func (n *NoQueue) Stop() {
n.lock.Lock()
if n.isStopped {
n.lock.Unlock()
return
}
close(n.wake)
n.isStopped = true
n.lock.Unlock()
n.stop.Break()
}
func (n *NoQueue) Enqueue(p Packet) {
func (n *NoQueue) Enqueue(p *Packet) {
n.lock.Lock()
defer n.lock.Unlock()
n.packets.PushBack(p)
if n.packets.Len() == 1 && !n.isStopped {
select {
case n.wake <- struct{}{}:
default:
}
n.lock.Unlock()
select {
case n.wake <- struct{}{}:
default:
}
}
@@ -74,12 +65,11 @@ func (n *NoQueue) sendWorker() {
for {
<-n.wake
for {
n.lock.Lock()
if n.isStopped {
n.lock.Unlock()
if n.stop.IsBroken() {
return
}
n.lock.Lock()
if n.packets.Len() == 0 {
n.lock.Unlock()
break
@@ -87,7 +77,7 @@ func (n *NoQueue) sendWorker() {
p := n.packets.PopFront()
n.lock.Unlock()
n.Base.SendPacket(&p)
n.Base.SendPacket(p)
}
}
}

View File

@@ -22,14 +22,8 @@ import (
"github.com/pion/webrtc/v3"
)
type ExtensionData struct {
ID uint8
Payload []byte
}
type Packet struct {
Header *rtp.Header
Extensions []ExtensionData
Payload []byte
IsRTX bool
AbsSendTimeExtID uint8
@@ -40,7 +34,7 @@ type Packet struct {
}
type Pacer interface {
Enqueue(p Packet)
Enqueue(p *Packet)
Stop()
SetInterval(interval time.Duration)

View File

@@ -32,8 +32,8 @@ func NewPassThrough(logger logger.Logger, sendSideBWE *sendsidebwe.SendSideBWE)
func (p *PassThrough) Stop() {
}
func (p *PassThrough) Enqueue(pkt Packet) {
p.Base.SendPacket(&pkt)
func (p *PassThrough) Enqueue(pkt *Packet) {
p.Base.SendPacket(pkt)
}
// ------------------------------------------------