Reconcile RTP stats with RTX data. (#3252)

* RTX RTPStats

* WIP

* RTCP RTX handler

* reconcile rtx

* cache size

* clean up

* test

* clean up
This commit is contained in:
Raja Subramanian
2024-12-15 14:33:02 +05:30
committed by GitHub
parent 34ccc2a578
commit cfe3178542
5 changed files with 190 additions and 61 deletions
+5 -5
View File
@@ -43,8 +43,8 @@ type ConnectionStatsReceiverProvider interface {
type ConnectionStatsSenderProvider interface {
GetDeltaStatsSender() map[uint32]*buffer.StreamStatsWithLayers
GetLastReceiverReportTime() time.Time
GetTotalPacketsSent() uint64
GetPrimaryStreamLastReceiverReportTime() time.Time
GetPrimaryStreamPacketsSent() uint64
}
type ConnectionStatsParams struct {
@@ -237,7 +237,7 @@ func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) (float32,
streams := cs.params.SenderProvider.GetDeltaStatsSender()
if len(streams) == 0 {
// check for receiver report not received for a while
marker := cs.params.SenderProvider.GetLastReceiverReportTime()
marker := cs.params.SenderProvider.GetPrimaryStreamLastReceiverReportTime()
if marker.IsZero() || streamingStartedAt.After(marker) {
marker = streamingStartedAt
}
@@ -253,7 +253,7 @@ func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) (float32,
// delta stat duration could be large due to not receiving receiver report for a long time (for example, due to mute),
// adjust to streaming start if necessary
if streamingStartedAt.After(cs.params.SenderProvider.GetLastReceiverReportTime()) {
if streamingStartedAt.After(cs.params.SenderProvider.GetPrimaryStreamLastReceiverReportTime()) {
// last receiver report was before streaming started, wait for next one
mos, _ := cs.scorer.GetMOSAndQuality()
return mos, streams
@@ -294,7 +294,7 @@ func (cs *ConnectionStats) updateStreamingStart(at time.Time) time.Time {
cs.lock.Lock()
defer cs.lock.Unlock()
packetsSent := cs.params.SenderProvider.GetTotalPacketsSent()
packetsSent := cs.params.SenderProvider.GetPrimaryStreamPacketsSent()
if packetsSent > cs.packetsSent {
if cs.streamingStartedAt.IsZero() {
// the start could be anywhere after last update, but using `at` as this is not required to be accurate
+105 -40
View File
@@ -142,15 +142,19 @@ var (
// -------------------------------------------------------------------
type DownTrackState struct {
RTPStats *rtpstats.RTPStatsSender
DeltaStatsSenderSnapshotId uint32
ForwarderState *livekit.RTPForwarderState
PlayoutDelayControllerState PlayoutDelayControllerState
RTPStats *rtpstats.RTPStatsSender
DeltaStatsSenderSnapshotId uint32
RTPStatsRTX *rtpstats.RTPStatsSender
DeltaStatsRTXSenderSnapshotId uint32
ForwarderState *livekit.RTPForwarderState
PlayoutDelayControllerState PlayoutDelayControllerState
}
func (d DownTrackState) MarshalLogObject(e zapcore.ObjectEncoder) error {
e.AddObject("RTPStats", d.RTPStats)
e.AddUint32("DeltaStatsSenderSnapshotId", d.DeltaStatsSenderSnapshotId)
e.AddObject("RTPStatsRTX", d.RTPStatsRTX)
e.AddUint32("DeltaStatsRTXSenderSnapshotId", d.DeltaStatsRTXSenderSnapshotId)
e.AddObject("ForwarderState", logger.Proto(d.ForwarderState))
e.AddObject("PlayoutDelayControllerState", d.PlayoutDelayControllerState)
return nil
@@ -291,14 +295,17 @@ type DownTrack struct {
writeStopped atomic.Bool
isReceiverReady bool
rtpStats *rtpstats.RTPStatsSender
rtpStats *rtpstats.RTPStatsSender
deltaStatsSenderSnapshotId uint32
rtpStatsRTX *rtpstats.RTPStatsSender
deltaStatsRTXSenderSnapshotId uint32
totalRepeatedNACKs atomic.Uint32
blankFramesGeneration atomic.Uint32
connectionStats *connectionquality.ConnectionStats
deltaStatsSenderSnapshotId uint32
connectionStats *connectionquality.ConnectionStats
isNACKThrottled atomic.Bool
@@ -363,9 +370,15 @@ func NewDownTrack(params DowntrackParams) (*DownTrack, error) {
d.rtpStats = rtpstats.NewRTPStatsSender(rtpstats.RTPStatsParams{
ClockRate: d.codec.ClockRate,
Logger: d.params.Logger,
})
}, 4096)
d.deltaStatsSenderSnapshotId = d.rtpStats.NewSenderSnapshotId()
d.rtpStatsRTX = rtpstats.NewRTPStatsSender(rtpstats.RTPStatsParams{
ClockRate: d.codec.ClockRate,
Logger: d.params.Logger,
}, 1024)
d.deltaStatsRTXSenderSnapshotId = d.rtpStatsRTX.NewSenderSnapshotId()
d.forwarder = NewForwarder(
d.kind,
d.params.Logger,
@@ -529,7 +542,7 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters,
if d.ssrcRTX != 0 {
if rr := d.params.BufferFactory.GetOrNew(packetio.RTCPBufferPacket, d.ssrcRTX).(*buffer.RTCPReader); rr != nil {
rr.OnPacket(func(pkt []byte) {
d.handleRTCP(pkt)
d.handleRTCPRTX(pkt)
})
d.rtcpReaderRTX = rr
}
@@ -956,11 +969,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
// WritePaddingRTP tries to write as many padding only RTP packets as necessary
// to satisfy given size to the DownTrack
func (d *DownTrack) WritePaddingRTP(
bytesToSend int,
paddingOnMute bool,
forceMarker bool,
) int {
func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMarker bool) int {
if !d.writable.Load() {
return 0
}
@@ -1189,11 +1198,13 @@ func (d *DownTrack) CloseWithFlush(flush bool) {
d.connectionStats.Close()
d.rtpStats.Stop()
d.rtpStatsRTX.Stop()
d.params.Logger.Debugw("rtp stats",
"direction", "downstream",
"mime", d.mime,
"ssrc", d.ssrc,
"stats", d.rtpStats,
"statsRTX", d.rtpStatsRTX,
)
d.maxLayerNotifierChMu.Lock()
@@ -1241,9 +1252,11 @@ func (d *DownTrack) MaxLayer() buffer.VideoLayer {
func (d *DownTrack) GetState() DownTrackState {
dts := DownTrackState{
RTPStats: d.rtpStats,
DeltaStatsSenderSnapshotId: d.deltaStatsSenderSnapshotId,
ForwarderState: d.forwarder.GetState(),
RTPStats: d.rtpStats,
DeltaStatsSenderSnapshotId: d.deltaStatsSenderSnapshotId,
RTPStatsRTX: d.rtpStatsRTX,
DeltaStatsRTXSenderSnapshotId: d.deltaStatsRTXSenderSnapshotId,
ForwarderState: d.forwarder.GetState(),
}
if d.playoutDelay != nil {
@@ -1263,6 +1276,10 @@ func (d *DownTrack) SeedState(state DownTrackState) {
d.playoutDelay.SeedState(state.PlayoutDelayControllerState)
}
}
if state.RTPStatsRTX != nil {
d.rtpStatsRTX.Seed(state.RTPStatsRTX)
d.deltaStatsRTXSenderSnapshotId = state.DeltaStatsRTXSenderSnapshotId
}
d.forwarder.SeedState(state.ForwarderState)
}
@@ -1522,6 +1539,8 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport {
_, tsOffset, refSenderReport := d.forwarder.GetSenderReportParams()
return d.rtpStats.GetRtcpSenderReport(d.ssrc, refSenderReport, tsOffset, !d.params.DisableSenderReportPassThrough)
// not sending RTCP Sender Report for RTX
}
func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan struct{} {
@@ -1702,7 +1721,7 @@ func (d *DownTrack) getH264BlankFrame(_frameEndNeeded bool) ([]byte, error) {
func (d *DownTrack) handleRTCP(bytes []byte) {
pkts, err := rtcp.Unmarshal(bytes)
if err != nil {
d.params.Logger.Errorw("could not unmarshal rtcp receiver packets", err)
d.params.Logger.Errorw("could not unmarshal rtcp receiver packet", err)
return
}
@@ -1754,7 +1773,6 @@ func (d *DownTrack) handleRTCP(bytes []byte) {
if r.SSRC != d.ssrc {
continue
}
rr.Reports = append(rr.Reports, r)
rtt, isRttChanged := d.rtpStats.UpdateFromReceiverReport(r)
if isRttChanged {
@@ -1770,6 +1788,14 @@ func (d *DownTrack) handleRTCP(bytes []byte) {
}
}
}
// RTX-TODO: This is used for media loss proxying only as of 2024-12-15.
// Ideally, this should keep deltas between previous RTCP Receiver Report
// and current report, calculate the loss in the window and reconcile it with
// data in a similar window from RTX stream (to ensure losses are discounted
// for NACKs), but keeping this simple for several reasons
// - media loss proxying is a configurable setting and could be disabled
// - media loss proxying is used for audio only and audio may not have NACKing
// - to keep it simple
if len(rr.Reports) > 0 {
d.listenerLock.RLock()
rrListeners := d.receiverReportListeners
@@ -1842,6 +1868,27 @@ func (d *DownTrack) handleRTCP(bytes []byte) {
}
}
func (d *DownTrack) handleRTCPRTX(bytes []byte) {
pkts, err := rtcp.Unmarshal(bytes)
if err != nil {
d.params.Logger.Errorw("could not unmarshal rtcp rtx receiver packet", err)
return
}
for _, pkt := range pkts {
switch p := pkt.(type) {
case *rtcp.ReceiverReport:
for _, r := range p.Reports {
if r.SSRC != d.ssrcRTX {
continue
}
d.rtpStatsRTX.UpdateFromReceiverReport(r)
}
}
}
}
func (d *DownTrack) SetConnected() {
d.bindLock.Lock()
if !d.connected.Swap(true) {
@@ -1874,11 +1921,12 @@ func (d *DownTrack) retransmitPacket(epm *extPacketMeta, sourcePkt []byte, isPro
SSRC: d.ssrc,
}
rtxOffset := 0
rtxExtSequenceNumber := d.rtxSequenceNumber.Inc()
if d.payloadTypeRTX != 0 && d.ssrcRTX != 0 {
rtxOffset = 2
hdr.PayloadType = d.payloadTypeRTX
hdr.SequenceNumber = uint16(d.rtxSequenceNumber.Inc())
hdr.SequenceNumber = uint16(rtxExtSequenceNumber)
hdr.SSRC = d.ssrcRTX
}
@@ -1926,16 +1974,29 @@ func (d *DownTrack) retransmitPacket(epm *extPacketMeta, sourcePkt []byte, isPro
} else {
payloadSize, paddingSize, isOutOfOrder = len(payload), 0, true
}
d.rtpStats.Update(
mono.UnixNano(),
epm.extSequenceNumber,
epm.extTimestamp,
hdr.Marker,
headerSize,
payloadSize,
paddingSize,
isOutOfOrder,
)
if hdr.SSRC == d.ssrcRTX {
d.rtpStatsRTX.Update(
mono.UnixNano(),
rtxExtSequenceNumber,
0,
hdr.Marker,
headerSize,
payloadSize,
paddingSize,
isOutOfOrder,
)
} else {
d.rtpStats.Update(
mono.UnixNano(),
epm.extSequenceNumber,
epm.extTimestamp,
hdr.Marker,
headerSize,
payloadSize,
paddingSize,
isOutOfOrder,
)
}
d.pacer.Enqueue(&pacer.Packet{
Header: hdr,
HeaderSize: headerSize,
@@ -2022,13 +2083,14 @@ func (d *DownTrack) WriteProbePackets(bytesToSend int, usePadding bool) int {
return 0
}
rtxExtSequenceNumber := d.rtxSequenceNumber.Inc()
for i := 0; i < num; i++ {
hdr := &rtp.Header{
Version: 2,
Padding: true,
Marker: false,
PayloadType: d.payloadTypeRTX,
SequenceNumber: uint16(d.rtxSequenceNumber.Inc()),
SequenceNumber: uint16(rtxExtSequenceNumber),
Timestamp: 0,
SSRC: d.ssrcRTX,
}
@@ -2040,18 +2102,16 @@ func (d *DownTrack) WriteProbePackets(bytesToSend int, usePadding bool) int {
hdrSize := hdr.MarshalSize()
payloadSize := len(payload)
/* RTX-TODO
d.rtpStats.Update(
d.rtpStatsRTX.Update(
mono.UnixNano(),
snts[i].extSequenceNumber,
snts[i].extTimestamp,
rtxExtSequenceNumber,
0,
hdr.Marker,
hdrSize,
0,
payloadSize,
false,
)
*/
d.pacer.Enqueue(&pacer.Packet{
Header: hdr,
HeaderSize: hdrSize,
@@ -2148,7 +2208,7 @@ func (d *DownTrack) GetConnectionScoreAndQuality() (float32, livekit.ConnectionQ
}
func (d *DownTrack) GetTrackStats() *livekit.RTPStats {
return d.rtpStats.ToProto()
return rtpstats.ReconcileRTPStatsWithRTX(d.rtpStats.ToProto(), d.rtpStatsRTX.ToProto())
}
func (d *DownTrack) deltaStats(ds *rtpstats.RTPDeltaInfo) map[uint32]*buffer.StreamStatsWithLayers {
@@ -2168,14 +2228,19 @@ func (d *DownTrack) deltaStats(ds *rtpstats.RTPDeltaInfo) map[uint32]*buffer.Str
}
func (d *DownTrack) GetDeltaStatsSender() map[uint32]*buffer.StreamStatsWithLayers {
return d.deltaStats(d.rtpStats.DeltaInfoSender(d.deltaStatsSenderSnapshotId))
return d.deltaStats(
rtpstats.ReconcileRTPDeltaInfoWithRTX(
d.rtpStats.DeltaInfoSender(d.deltaStatsSenderSnapshotId),
d.rtpStatsRTX.DeltaInfoSender(d.deltaStatsRTXSenderSnapshotId),
),
)
}
func (d *DownTrack) GetLastReceiverReportTime() time.Time {
func (d *DownTrack) GetPrimaryStreamLastReceiverReportTime() time.Time {
return d.rtpStats.LastReceiverReportTime()
}
func (d *DownTrack) GetTotalPacketsSent() uint64 {
func (d *DownTrack) GetPrimaryStreamPacketsSent() uint64 {
return d.rtpStats.GetPacketsSeenMinusPadding()
}
+1 -1
View File
@@ -26,7 +26,7 @@ import (
)
func TestPlayoutDelay(t *testing.T) {
stats := rtpstats.NewRTPStatsSender(rtpstats.RTPStatsParams{ClockRate: 900000, Logger: logger.GetLogger()})
stats := rtpstats.NewRTPStatsSender(rtpstats.RTPStatsParams{ClockRate: 900000, Logger: logger.GetLogger()}, 128)
c, err := NewPlayoutDelayController(100, 120, logger.GetLogger(), stats)
require.NoError(t, err)
+64
View File
@@ -54,6 +54,7 @@ type RTPDeltaInfo struct {
RttMax uint32
JitterMax float64
Nacks uint32
NackRepeated uint32
Plis uint32
Firs uint32
}
@@ -81,6 +82,7 @@ func (r *RTPDeltaInfo) MarshalLogObject(e zapcore.ObjectEncoder) error {
e.AddUint32("RttMax", r.RttMax)
e.AddFloat64("JitterMax", r.JitterMax)
e.AddUint32("Nacks", r.Nacks)
e.AddUint32("NackRepeated", r.NackRepeated)
e.AddUint32("Plis", r.Plis)
e.AddUint32("Firs", r.Firs)
return nil
@@ -897,4 +899,66 @@ func AggregateRTPDeltaInfo(deltaInfoList []*RTPDeltaInfo) *RTPDeltaInfo {
}
}
func ReconcileRTPStatsWithRTX(primaryStats *livekit.RTPStats, rtxStats *livekit.RTPStats) *livekit.RTPStats {
if primaryStats == nil || rtxStats == nil {
return primaryStats
}
primaryStats.PacketsDuplicate += rtxStats.Packets
primaryStats.PacketDuplicateRate = float64(primaryStats.PacketsDuplicate) / primaryStats.Duration
primaryStats.BytesDuplicate += rtxStats.Bytes
primaryStats.HeaderBytesDuplicate += rtxStats.HeaderBytes
primaryStats.BitrateDuplicate = float64(primaryStats.BytesDuplicate) * 8.0 / primaryStats.Duration
primaryStats.PacketsPadding += rtxStats.PacketsPadding
primaryStats.PacketPaddingRate = float64(primaryStats.PacketsPadding) / primaryStats.Duration
primaryStats.BytesPadding += rtxStats.BytesPadding
primaryStats.HeaderBytesPadding += rtxStats.HeaderBytesPadding
primaryStats.BitratePadding = float64(primaryStats.BytesPadding) * 8.0 / primaryStats.Duration
// RTX non-padding packets are responses to NACKs, that should discount packets lost,
lossAdjustment := rtxStats.Packets - rtxStats.PacketsLost - primaryStats.NackRepeated
if int32(lossAdjustment) < 0 {
lossAdjustment = 0
}
if lossAdjustment >= primaryStats.PacketsLost {
primaryStats.PacketsLost = 0
} else {
primaryStats.PacketsLost -= lossAdjustment
}
primaryStats.PacketLossRate = float64(primaryStats.PacketsLost) / primaryStats.Duration
primaryStats.PacketLossPercentage = float32(primaryStats.PacketsLost) / float32(primaryStats.Packets+primaryStats.PacketsPadding+primaryStats.PacketsLost) * 100.0
return primaryStats
}
func ReconcileRTPDeltaInfoWithRTX(primaryDeltaInfo *RTPDeltaInfo, rtxDeltaInfo *RTPDeltaInfo) *RTPDeltaInfo {
if primaryDeltaInfo == nil || rtxDeltaInfo == nil {
return primaryDeltaInfo
}
primaryDeltaInfo.PacketsDuplicate += rtxDeltaInfo.Packets
primaryDeltaInfo.BytesDuplicate += rtxDeltaInfo.Bytes
primaryDeltaInfo.HeaderBytesDuplicate += rtxDeltaInfo.HeaderBytes
primaryDeltaInfo.PacketsPadding += rtxDeltaInfo.PacketsPadding
primaryDeltaInfo.BytesPadding += rtxDeltaInfo.BytesPadding
primaryDeltaInfo.HeaderBytesPadding += rtxDeltaInfo.HeaderBytesPadding
// RTX non-padding packets are responses to NACKs, that should discount packets lost
lossAdjustment := rtxDeltaInfo.Packets - rtxDeltaInfo.PacketsLost - primaryDeltaInfo.NackRepeated
if int32(lossAdjustment) < 0 {
lossAdjustment = 0
}
if lossAdjustment >= primaryDeltaInfo.PacketsLost {
primaryDeltaInfo.PacketsLost = 0
} else {
primaryDeltaInfo.PacketsLost -= lossAdjustment
}
return primaryDeltaInfo
}
// -------------------------------------------------------------------
+15 -15
View File
@@ -29,11 +29,6 @@ import (
"github.com/livekit/protocol/utils/mono"
)
const (
cSnInfoSize = 4096
cSnInfoMask = cSnInfoSize - 1
)
// -------------------------------------------------------------------
type snInfoFlag byte
@@ -147,9 +142,10 @@ type senderSnapshot struct {
frames uint32
nacks uint32
plis uint32
firs uint32
nacks uint32
nackRepeated uint32
plis uint32
firs uint32
maxRtt uint32
maxJitterFeed float64
@@ -223,7 +219,7 @@ type RTPStatsSender struct {
jitterFromRR float64
maxJitterFromRR float64
snInfos [cSnInfoSize]snInfo
snInfos []snInfo
layerLockPlis uint32
lastLayerLockPli time.Time
@@ -238,9 +234,10 @@ type RTPStatsSender struct {
timeReversedCount int
}
func NewRTPStatsSender(params RTPStatsParams) *RTPStatsSender {
func NewRTPStatsSender(params RTPStatsParams, cacheSize int) *RTPStatsSender {
return &RTPStatsSender{
rtpStatsBase: newRTPStatsBase(params),
snInfos: make([]snInfo, cacheSize),
nextSenderSnapshotID: cFirstSnapshotID,
senderSnapshots: make([]senderSnapshot, 2),
}
@@ -271,7 +268,8 @@ func (r *RTPStatsSender) Seed(from *RTPStatsSender) {
r.jitterFromRR = from.jitterFromRR
r.maxJitterFromRR = from.maxJitterFromRR
r.snInfos = from.snInfos
r.snInfos = make([]snInfo, len(from.snInfos))
copy(r.snInfos, from.snInfos)
r.layerLockPlis = from.layerLockPlis
r.lastLayerLockPli = from.lastLayerLockPli
@@ -891,6 +889,7 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo
RttMax: then.maxRtt,
JitterMax: maxJitterTime,
Nacks: now.nacks - then.nacks,
NackRepeated: now.nackRepeated - then.nackRepeated,
Plis: now.plis - then.plis,
Firs: now.firs - then.firs,
}
@@ -968,6 +967,7 @@ func (r *RTPStatsSender) getSenderSnapshot(startTime time.Time, s *senderSnapsho
packetsLostFromRR: r.packetsLostFromRR,
frames: s.frames + s.intervalStats.frames,
nacks: r.nacks,
nackRepeated: r.nackRepeated,
plis: r.plis,
firs: r.firs,
maxRtt: r.rtt,
@@ -979,12 +979,12 @@ func (r *RTPStatsSender) getSenderSnapshot(startTime time.Time, s *senderSnapsho
func (r *RTPStatsSender) getSnInfoOutOfOrderSlot(esn uint64, ehsn uint64) int {
offset := int64(ehsn - esn)
if offset >= cSnInfoSize || offset < 0 {
if offset >= int64(len(r.snInfos)) || offset < 0 {
// too old OR too new (i. e. ahead of highest)
return -1
}
return int(esn & cSnInfoMask)
return int(esn) % len(r.snInfos)
}
func (r *RTPStatsSender) setSnInfo(esn uint64, ehsn uint64, pktSize uint16, hdrSize uint8, payloadSize uint16, marker bool, isOutOfOrder bool) {
@@ -995,7 +995,7 @@ func (r *RTPStatsSender) setSnInfo(esn uint64, ehsn uint64, pktSize uint16, hdrS
return
}
} else {
slot = int(esn & cSnInfoMask)
slot = int(esn) % len(r.snInfos)
}
snInfo := &r.snInfos[slot]
@@ -1019,7 +1019,7 @@ func (r *RTPStatsSender) clearSnInfos(extStartInclusive uint64, extEndExclusive
}
for esn := extStartInclusive; esn != extEndExclusive; esn++ {
snInfo := &r.snInfos[esn&cSnInfoMask]
snInfo := &r.snInfos[int(esn)%len(r.snInfos)]
snInfo.pktSize = 0
snInfo.hdrSize = 0
snInfo.flags = 0