Tightening up stats and also counting primary/rtx/padding separately (#247)

* Tightening up stats and also counting primary/rtx/padding separately

* Fix tests

* annotate type stored in atomic.Value
This commit is contained in:
Raja Subramanian
2021-12-10 19:30:27 +05:30
committed by GitHub
parent bda2e9cc59
commit 8c774f144e
3 changed files with 108 additions and 30 deletions
+86 -24
View File
@@ -7,6 +7,7 @@ import (
"io"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/pion/rtcp"
@@ -68,6 +69,11 @@ var (
type ReceiverReportListener func(dt *DownTrack, report *rtcp.ReceiverReport)
type PacketStats struct {
octets uint32
packets uint32
}
// DownTrack implements TrackLocal, is the track used to write packets
// to SFU Subscriber, the track handle the packets for simple, simulcast
// and SVC Publisher.
@@ -99,8 +105,10 @@ type DownTrack struct {
closeOnce sync.Once
// Report helpers
octetCount atomicUint32
packetCount atomicUint32
primaryStats atomic.Value // contains *PacketStats
rtxStats atomic.Value // contains *PacketStats
paddingStats atomic.Value // contains *PacketStats
lossFraction atomicUint8
// Debug info
@@ -149,6 +157,10 @@ func NewDownTrack(c webrtc.RTPCodecCapability, r TrackReceiver, bf *buffer.Facto
forwarder: NewForwarder(c, kind),
}
d.primaryStats.Store(new(PacketStats))
d.rtxStats.Store(new(PacketStats))
d.paddingStats.Store(new(PacketStats))
return d, nil
}
@@ -255,7 +267,9 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
d.receiver.SendPLI(layer)
}
if tp.shouldDrop {
d.pktsDropped.add(1)
if tp.isDroppingRelevant {
d.pktsDropped.add(1)
}
return err
}
@@ -290,23 +304,24 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
_, err = d.writeStream.WriteRTP(hdr, payload)
if err == nil {
pktSize := hdr.MarshalSize() + len(payload)
for _, f := range d.onPacketSent {
f(d, hdr.MarshalSize()+len(payload))
f(d, pktSize)
}
d.UpdatePrimaryStats(uint32(pktSize))
} else {
d.pktsDropped.add(1)
}
// LK-TODO maybe include RTP header size also
d.UpdateStats(uint32(len(payload)))
return err
}
// WritePaddingRTP tries to write as many padding only RTP packets as necessary
// to satisfy given size to the DownTrack
func (d *DownTrack) WritePaddingRTP(bytesToSend int) int {
if d.packetCount.get() == 0 {
primaryStats := d.primaryStats.Load().(*PacketStats)
if primaryStats.packets == 0 {
return 0
}
@@ -377,9 +392,14 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int) int {
return bytesSent
}
// LK-TODO - check if we should keep separate padding stats
size := hdr.MarshalSize() + len(payload)
d.UpdateStats(uint32(size))
d.UpdatePaddingStats(uint32(size))
// LK-TOOD-START:
// Maybe call onPacketSent callbacks? But, a couple of issues to think about
// - Analytics - should padding bytes be counted?
// - StreamAllocator probing - should not include padding
// Maybe a separate callback for `onPaddingSent`?
// LK-TODO-END
// LK-TODO-START
// NACK buffer for these probe packets.
@@ -583,14 +603,37 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport {
}
}
func (d *DownTrack) UpdateStats(packetLen uint32) {
d.octetCount.add(packetLen)
d.packetCount.add(1)
func (d *DownTrack) UpdatePrimaryStats(packetLen uint32) {
primaryStats, _ := d.primaryStats.Load().(*PacketStats)
primaryStats.octets += packetLen
primaryStats.packets += 1
d.primaryStats.Store(primaryStats)
}
func (d *DownTrack) UpdateRtxStats(packetLen uint32) {
rtxStats, _ := d.rtxStats.Load().(*PacketStats)
rtxStats.octets += packetLen
rtxStats.packets += 1
d.rtxStats.Store(rtxStats)
}
func (d *DownTrack) UpdatePaddingStats(packetLen uint32) {
paddingStats, _ := d.paddingStats.Load().(*PacketStats)
paddingStats.octets += packetLen
paddingStats.packets += 1
d.paddingStats.Store(paddingStats)
}
func (d *DownTrack) writeBlankFrameRTP() error {
// don't send if nothing has been sent
if d.packetCount.get() == 0 {
primaryStats := d.primaryStats.Load().(*PacketStats)
if primaryStats.packets == 0 {
return nil
}
@@ -624,14 +667,24 @@ func (d *DownTrack) writeBlankFrameRTP() error {
return err
}
var pktSize int
switch d.mime {
case "video/vp8":
err = d.writeVP8BlankFrame(&hdr, frameEndNeeded)
pktSize, err = d.writeVP8BlankFrame(&hdr, frameEndNeeded)
case "video/h264":
err = d.writeH264BlankFrame(&hdr, frameEndNeeded)
pktSize, err = d.writeH264BlankFrame(&hdr, frameEndNeeded)
default:
return nil
}
if err != nil {
return err
}
for _, f := range d.onPacketSent {
f(d, pktSize)
}
d.UpdatePrimaryStats(uint32(pktSize))
// only the first frame will need frameEndNeeded to close out the
// previous picture, rest are small key frames
@@ -641,7 +694,7 @@ func (d *DownTrack) writeBlankFrameRTP() error {
return nil
}
func (d *DownTrack) writeVP8BlankFrame(hdr *rtp.Header, frameEndNeeded bool) error {
func (d *DownTrack) writeVP8BlankFrame(hdr *rtp.Header, frameEndNeeded bool) (int, error) {
blankVP8 := d.forwarder.GetPaddingVP8(frameEndNeeded)
// 1x1 key frame
@@ -652,16 +705,16 @@ func (d *DownTrack) writeVP8BlankFrame(hdr *rtp.Header, frameEndNeeded bool) err
vp8Header := payload[:blankVP8.HeaderSize]
err := blankVP8.MarshalTo(vp8Header)
if err != nil {
return err
return 0, err
}
copy(payload[blankVP8.HeaderSize:], VP8KeyFrame1x1)
_, err = d.writeStream.WriteRTP(hdr, payload)
return err
return hdr.MarshalSize() + len(payload), err
}
func (d *DownTrack) writeH264BlankFrame(hdr *rtp.Header, frameEndNeeded bool) error {
func (d *DownTrack) writeH264BlankFrame(hdr *rtp.Header, frameEndNeeded bool) (int, error) {
// TODO - Jie Zeng
// now use STAP-A to compose sps, pps, idr together, most decoder support packetization-mode 1.
// if client only support packetization-mode 0, use single nalu unit packet
@@ -676,7 +729,7 @@ func (d *DownTrack) writeH264BlankFrame(hdr *rtp.Header, frameEndNeeded bool) er
offset += len(payload)
}
_, err := d.writeStream.WriteRTP(hdr, buf[:offset])
return err
return hdr.MarshalSize() + offset, err
}
func (d *DownTrack) handleRTCP(bytes []byte) {
@@ -811,13 +864,22 @@ func (d *DownTrack) retransmitPackets(nackedPackets []packetMeta) {
if _, err = d.writeStream.WriteRTP(&pkt.Header, payload); err != nil {
Logger.Error(err, "Writing rtx packet err")
} else {
d.UpdateStats(uint32(n))
pktSize := pkt.Header.MarshalSize() + len(payload)
for _, f := range d.onPacketSent {
f(d, pktSize)
}
d.UpdateRtxStats(uint32(pktSize))
}
}
}
func (d *DownTrack) getSRStats() (octets, packets uint32) {
return d.octetCount.get(), d.packetCount.get()
func (d *DownTrack) getSRStats() (uint32, uint32) {
primary := d.primaryStats.Load().(*PacketStats)
rtx := d.rtxStats.Load().(*PacketStats)
padding := d.paddingStats.Load().(*PacketStats)
return primary.octets + rtx.octets + padding.octets, primary.packets + rtx.packets + padding.packets
}
// writes RTP header extensions of track
+18 -4
View File
@@ -114,10 +114,11 @@ var (
)
type TranslationParams struct {
shouldDrop bool
shouldSendPLI bool
rtp *TranslationParamsRTP
vp8 *TranslationParamsVP8
shouldDrop bool
isDroppingRelevant bool
shouldSendPLI bool
rtp *TranslationParamsRTP
vp8 *TranslationParamsVP8
}
type VideoLayers struct {
@@ -755,9 +756,13 @@ func (f *Forwarder) getTranslationParamsAudio(extPkt *buffer.ExtPacket) (*Transl
if err != nil {
tp.shouldDrop = true
if err == ErrPaddingOnlyPacket || err == ErrDuplicatePacket || err == ErrOutOfOrderSequenceNumberCacheMiss {
if err == ErrOutOfOrderSequenceNumberCacheMiss {
tp.isDroppingRelevant = true
}
return tp, nil
}
tp.isDroppingRelevant = true
return tp, err
}
@@ -813,6 +818,7 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in
// applied restriction from client requested restriction.
//
tp.shouldDrop = true
tp.isDroppingRelevant = true
return tp, nil
}
@@ -858,9 +864,13 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in
if err != nil {
tp.shouldDrop = true
if err == ErrPaddingOnlyPacket || err == ErrDuplicatePacket || err == ErrOutOfOrderSequenceNumberCacheMiss {
if err == ErrOutOfOrderSequenceNumberCacheMiss {
tp.isDroppingRelevant = true
}
return tp, nil
}
tp.isDroppingRelevant = true
return tp, err
}
@@ -887,9 +897,13 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in
// filtered temporal layer, update sequence number offset to prevent holes
f.rtpMunger.PacketDropped(extPkt)
}
if err == ErrOutOfOrderVP8PictureIdCacheMiss {
tp.isDroppingRelevant = true
}
return tp, nil
}
tp.isDroppingRelevant = true
return tp, err
}
+4 -2
View File
@@ -680,7 +680,8 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) {
extPkt, err = testutils.GetTestExtPacket(params)
expectedTP = TranslationParams{
shouldDrop: true,
shouldDrop: true,
isDroppingRelevant: true,
}
actualTP, err = f.GetTranslationParams(extPkt, 0)
require.NoError(t, err)
@@ -899,7 +900,8 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) {
}
extPkt, _ = testutils.GetTestExtPacketVP8(params, vp8)
expectedTP = TranslationParams{
shouldDrop: true,
shouldDrop: true,
isDroppingRelevant: true,
}
actualTP, err = f.GetTranslationParams(extPkt, 0)
require.NoError(t, err)