diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index b00406d2b..309f3b2cf 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -7,6 +7,7 @@ import ( "io" "strings" "sync" + "sync/atomic" "time" "github.com/pion/rtcp" @@ -68,6 +69,11 @@ var ( type ReceiverReportListener func(dt *DownTrack, report *rtcp.ReceiverReport) +type PacketStats struct { + octets uint32 + packets uint32 +} + // DownTrack implements TrackLocal, is the track used to write packets // to SFU Subscriber, the track handle the packets for simple, simulcast // and SVC Publisher. @@ -99,8 +105,10 @@ type DownTrack struct { closeOnce sync.Once // Report helpers - octetCount atomicUint32 - packetCount atomicUint32 + primaryStats atomic.Value // contains *PacketStats + rtxStats atomic.Value // contains *PacketStats + paddingStats atomic.Value // contains *PacketStats + lossFraction atomicUint8 // Debug info @@ -149,6 +157,10 @@ func NewDownTrack(c webrtc.RTPCodecCapability, r TrackReceiver, bf *buffer.Facto forwarder: NewForwarder(c, kind), } + d.primaryStats.Store(new(PacketStats)) + d.rtxStats.Store(new(PacketStats)) + d.paddingStats.Store(new(PacketStats)) + return d, nil } @@ -255,7 +267,9 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { d.receiver.SendPLI(layer) } if tp.shouldDrop { - d.pktsDropped.add(1) + if tp.isDroppingRelevant { + d.pktsDropped.add(1) + } return err } @@ -290,23 +304,24 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { _, err = d.writeStream.WriteRTP(hdr, payload) if err == nil { + pktSize := hdr.MarshalSize() + len(payload) for _, f := range d.onPacketSent { - f(d, hdr.MarshalSize()+len(payload)) + f(d, pktSize) } + + d.UpdatePrimaryStats(uint32(pktSize)) } else { d.pktsDropped.add(1) } - // LK-TODO maybe include RTP header size also - d.UpdateStats(uint32(len(payload))) - return err } // WritePaddingRTP tries to write as many padding only RTP packets as necessary // to satisfy given size to the DownTrack func (d *DownTrack) WritePaddingRTP(bytesToSend int) int { - if d.packetCount.get() == 0 { + primaryStats := d.primaryStats.Load().(*PacketStats) + if primaryStats.packets == 0 { return 0 } @@ -377,9 +392,14 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int) int { return bytesSent } - // LK-TODO - check if we should keep separate padding stats size := hdr.MarshalSize() + len(payload) - d.UpdateStats(uint32(size)) + d.UpdatePaddingStats(uint32(size)) + // LK-TOOD-START: + // Maybe call onPacketSent callbacks? But, a couple of issues to think about + // - Analytics - should padding bytes be counted? + // - StreamAllocator probing - should not include padding + // Maybe a separate callback for `onPaddingSent`? + // LK-TODO-END // LK-TODO-START // NACK buffer for these probe packets. @@ -583,14 +603,37 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport { } } -func (d *DownTrack) UpdateStats(packetLen uint32) { - d.octetCount.add(packetLen) - d.packetCount.add(1) +func (d *DownTrack) UpdatePrimaryStats(packetLen uint32) { + primaryStats, _ := d.primaryStats.Load().(*PacketStats) + + primaryStats.octets += packetLen + primaryStats.packets += 1 + + d.primaryStats.Store(primaryStats) +} + +func (d *DownTrack) UpdateRtxStats(packetLen uint32) { + rtxStats, _ := d.rtxStats.Load().(*PacketStats) + + rtxStats.octets += packetLen + rtxStats.packets += 1 + + d.rtxStats.Store(rtxStats) +} + +func (d *DownTrack) UpdatePaddingStats(packetLen uint32) { + paddingStats, _ := d.paddingStats.Load().(*PacketStats) + + paddingStats.octets += packetLen + paddingStats.packets += 1 + + d.paddingStats.Store(paddingStats) } func (d *DownTrack) writeBlankFrameRTP() error { // don't send if nothing has been sent - if d.packetCount.get() == 0 { + primaryStats := d.primaryStats.Load().(*PacketStats) + if primaryStats.packets == 0 { return nil } @@ -624,14 +667,24 @@ func (d *DownTrack) writeBlankFrameRTP() error { return err } + var pktSize int switch d.mime { case "video/vp8": - err = d.writeVP8BlankFrame(&hdr, frameEndNeeded) + pktSize, err = d.writeVP8BlankFrame(&hdr, frameEndNeeded) case "video/h264": - err = d.writeH264BlankFrame(&hdr, frameEndNeeded) + pktSize, err = d.writeH264BlankFrame(&hdr, frameEndNeeded) default: return nil } + if err != nil { + return err + } + + for _, f := range d.onPacketSent { + f(d, pktSize) + } + + d.UpdatePrimaryStats(uint32(pktSize)) // only the first frame will need frameEndNeeded to close out the // previous picture, rest are small key frames @@ -641,7 +694,7 @@ func (d *DownTrack) writeBlankFrameRTP() error { return nil } -func (d *DownTrack) writeVP8BlankFrame(hdr *rtp.Header, frameEndNeeded bool) error { +func (d *DownTrack) writeVP8BlankFrame(hdr *rtp.Header, frameEndNeeded bool) (int, error) { blankVP8 := d.forwarder.GetPaddingVP8(frameEndNeeded) // 1x1 key frame @@ -652,16 +705,16 @@ func (d *DownTrack) writeVP8BlankFrame(hdr *rtp.Header, frameEndNeeded bool) err vp8Header := payload[:blankVP8.HeaderSize] err := blankVP8.MarshalTo(vp8Header) if err != nil { - return err + return 0, err } copy(payload[blankVP8.HeaderSize:], VP8KeyFrame1x1) _, err = d.writeStream.WriteRTP(hdr, payload) - return err + return hdr.MarshalSize() + len(payload), err } -func (d *DownTrack) writeH264BlankFrame(hdr *rtp.Header, frameEndNeeded bool) error { +func (d *DownTrack) writeH264BlankFrame(hdr *rtp.Header, frameEndNeeded bool) (int, error) { // TODO - Jie Zeng // now use STAP-A to compose sps, pps, idr together, most decoder support packetization-mode 1. // if client only support packetization-mode 0, use single nalu unit packet @@ -676,7 +729,7 @@ func (d *DownTrack) writeH264BlankFrame(hdr *rtp.Header, frameEndNeeded bool) er offset += len(payload) } _, err := d.writeStream.WriteRTP(hdr, buf[:offset]) - return err + return hdr.MarshalSize() + offset, err } func (d *DownTrack) handleRTCP(bytes []byte) { @@ -811,13 +864,22 @@ func (d *DownTrack) retransmitPackets(nackedPackets []packetMeta) { if _, err = d.writeStream.WriteRTP(&pkt.Header, payload); err != nil { Logger.Error(err, "Writing rtx packet err") } else { - d.UpdateStats(uint32(n)) + pktSize := pkt.Header.MarshalSize() + len(payload) + for _, f := range d.onPacketSent { + f(d, pktSize) + } + + d.UpdateRtxStats(uint32(pktSize)) } } } -func (d *DownTrack) getSRStats() (octets, packets uint32) { - return d.octetCount.get(), d.packetCount.get() +func (d *DownTrack) getSRStats() (uint32, uint32) { + primary := d.primaryStats.Load().(*PacketStats) + rtx := d.rtxStats.Load().(*PacketStats) + padding := d.paddingStats.Load().(*PacketStats) + + return primary.octets + rtx.octets + padding.octets, primary.packets + rtx.packets + padding.packets } // writes RTP header extensions of track diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index dcfcb4859..8965a0d1b 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -114,10 +114,11 @@ var ( ) type TranslationParams struct { - shouldDrop bool - shouldSendPLI bool - rtp *TranslationParamsRTP - vp8 *TranslationParamsVP8 + shouldDrop bool + isDroppingRelevant bool + shouldSendPLI bool + rtp *TranslationParamsRTP + vp8 *TranslationParamsVP8 } type VideoLayers struct { @@ -755,9 +756,13 @@ func (f *Forwarder) getTranslationParamsAudio(extPkt *buffer.ExtPacket) (*Transl if err != nil { tp.shouldDrop = true if err == ErrPaddingOnlyPacket || err == ErrDuplicatePacket || err == ErrOutOfOrderSequenceNumberCacheMiss { + if err == ErrOutOfOrderSequenceNumberCacheMiss { + tp.isDroppingRelevant = true + } return tp, nil } + tp.isDroppingRelevant = true return tp, err } @@ -813,6 +818,7 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in // applied restriction from client requested restriction. // tp.shouldDrop = true + tp.isDroppingRelevant = true return tp, nil } @@ -858,9 +864,13 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in if err != nil { tp.shouldDrop = true if err == ErrPaddingOnlyPacket || err == ErrDuplicatePacket || err == ErrOutOfOrderSequenceNumberCacheMiss { + if err == ErrOutOfOrderSequenceNumberCacheMiss { + tp.isDroppingRelevant = true + } return tp, nil } + tp.isDroppingRelevant = true return tp, err } @@ -887,9 +897,13 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in // filtered temporal layer, update sequence number offset to prevent holes f.rtpMunger.PacketDropped(extPkt) } + if err == ErrOutOfOrderVP8PictureIdCacheMiss { + tp.isDroppingRelevant = true + } return tp, nil } + tp.isDroppingRelevant = true return tp, err } diff --git a/pkg/sfu/forwarder_test.go b/pkg/sfu/forwarder_test.go index 02f58ab5c..e18964af2 100644 --- a/pkg/sfu/forwarder_test.go +++ b/pkg/sfu/forwarder_test.go @@ -680,7 +680,8 @@ func TestForwarderGetTranslationParamsAudio(t *testing.T) { extPkt, err = testutils.GetTestExtPacket(params) expectedTP = TranslationParams{ - shouldDrop: true, + shouldDrop: true, + isDroppingRelevant: true, } actualTP, err = f.GetTranslationParams(extPkt, 0) require.NoError(t, err) @@ -899,7 +900,8 @@ func TestForwarderGetTranslationParamsVideo(t *testing.T) { } extPkt, _ = testutils.GetTestExtPacketVP8(params, vp8) expectedTP = TranslationParams{ - shouldDrop: true, + shouldDrop: true, + isDroppingRelevant: true, } actualTP, err = f.GetTranslationParams(extPkt, 0) require.NoError(t, err)