From ae85e55fd4daf23d0a1ede740288b785b3853c4e Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 15 Mar 2022 17:47:19 +0530 Subject: [PATCH] Using RTPStats across the board (#515) * WIP commit * Clean up --- go.mod | 2 +- go.sum | 5 +- pkg/sfu/buffer/buffer.go | 268 ++++------- pkg/sfu/buffer/buffer_test.go | 6 +- pkg/{rtc => sfu/buffer}/datastats.go | 2 +- pkg/{rtc => sfu/buffer}/datastats_test.go | 2 +- pkg/{rtc => sfu/buffer}/rtpstats.go | 478 +++++++++++++++---- pkg/{rtc => sfu/buffer}/rtpstats_test.go | 51 +- pkg/sfu/buffer/streamstats.go | 27 +- pkg/sfu/connectionquality/connectionstats.go | 130 +---- pkg/sfu/downtrack.go | 222 ++++----- pkg/sfu/receiver.go | 78 ++- 12 files changed, 692 insertions(+), 579 deletions(-) rename pkg/{rtc => sfu/buffer}/datastats.go (99%) rename pkg/{rtc => sfu/buffer}/datastats_test.go (98%) rename pkg/{rtc => sfu/buffer}/rtpstats.go (62%) rename pkg/{rtc => sfu/buffer}/rtpstats_test.go (63%) diff --git a/go.mod b/go.mod index c1ee8f52f..701adce6a 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/pion/interceptor v0.1.7 github.com/pion/logging v0.2.2 github.com/pion/rtcp v1.2.9 - github.com/pion/rtp v1.7.4 + github.com/pion/rtp v1.7.7 github.com/pion/sdp/v3 v3.0.4 github.com/pion/stun v0.3.5 github.com/pion/transport v0.13.0 diff --git a/go.sum b/go.sum index 44c50f6d1..85d9da14d 100644 --- a/go.sum +++ b/go.sum @@ -134,8 +134,6 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA= github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= -github.com/livekit/protocol v0.11.14-0.20220311165704-a91198e400ac h1:86mRiCnqY/wEkKBUO+gYmon0WyCNt1ejVDmxBHteTV8= -github.com/livekit/protocol v0.11.14-0.20220311165704-a91198e400ac/go.mod h1:3pHsWUtQmWaH8mG0cXrQWpbf3Vo+kj0U+In77CEXu90= github.com/livekit/protocol v0.11.14-0.20220314072422-5cea5d444f36 h1:D3lWDCvyMuHNVgP4cjscDdK8DBQQykIJFHnx3ragxhk= github.com/livekit/protocol v0.11.14-0.20220314072422-5cea5d444f36/go.mod h1:3pHsWUtQmWaH8mG0cXrQWpbf3Vo+kj0U+In77CEXu90= github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls= @@ -191,8 +189,9 @@ github.com/pion/rtcp v1.2.6/go.mod h1:52rMNPWFsjr39z9B9MhnkqhPLoeHTv1aN63o/42bWE github.com/pion/rtcp v1.2.9 h1:1ujStwg++IOLIEoOiIQ2s+qBuJ1VN81KW+9pMPsif+U= github.com/pion/rtcp v1.2.9/go.mod h1:qVPhiCzAm4D/rxb6XzKeyZiQK69yJpbUDJSF7TgrqNo= github.com/pion/rtp v1.7.0/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= -github.com/pion/rtp v1.7.4 h1:4dMbjb1SuynU5OpA3kz1zHK+u+eOCQjW3MAeVHf1ODA= github.com/pion/rtp v1.7.4/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= +github.com/pion/rtp v1.7.7 h1:MzaAfCVicTVxiZpM2o99+YFrKxyRsQ38nnIi4vJPuUY= +github.com/pion/rtp v1.7.7/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= github.com/pion/sctp v1.8.0/go.mod h1:xFe9cLMZ5Vj6eOzpyiKjT9SwGM4KpK/8Jbw5//jc+0s= github.com/pion/sctp v1.8.2 h1:yBBCIrUMJ4yFICL3RIvR4eh/H2BTTvlligmSTy+3kiA= github.com/pion/sctp v1.8.2/go.mod h1:xFe9cLMZ5Vj6eOzpyiKjT9SwGM4KpK/8Jbw5//jc+0s= diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index d5299b872..68315ec6a 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -68,20 +68,13 @@ type Buffer struct { lastPacketRead int bitrate atomic.Value bitrateHelper [4]int64 - lastSRNTPTime NtpTime - lastSRRTPTime uint32 - lastSRRecv int64 // Represents wall clock of the most recent sender report arrival - lastTransit uint32 pliThrottle int64 - lastPli int64 - started bool - stats StreamStats - rrSnapshot *receiverReportSnapshot - - highestSN uint16 - cycle uint16 + rtpStats *RTPStats + rrSnapshotId uint32 + rembSnapshotId uint32 + connectionQualitySnapshotId uint32 lastFractionLostToReport uint8 // Last fraction lost from subscribers, should report to publisher; Audio only @@ -97,12 +90,6 @@ type Buffer struct { logger logger.Logger } -type receiverReportSnapshot struct { - extHighestSeqNum uint32 - packetsLost uint32 - lastLossRate float32 -} - // BufferOptions provides configuration options for the buffer type Options struct { MaxBitRate uint64 @@ -136,6 +123,13 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili return } + b.rtpStats = NewRTPStats(RTPStatsParams{ + ClockRate: codec.ClockRate, + }) + b.rrSnapshotId = b.rtpStats.NewSnapshotId() + b.rembSnapshotId = b.rtpStats.NewSnapshotId() + b.connectionQualitySnapshotId = b.rtpStats.NewSnapshotId() + b.callbacksQueue.Start() b.clockRate = codec.ClockRate @@ -269,7 +263,14 @@ func (b *Buffer) Close() error { if b.bucket != nil && b.codecType == webrtc.RTPCodecTypeAudio { b.audioPool.Put(b.bucket.src) } + b.closed.Store(true) + + if b.rtpStats != nil { + b.rtpStats.Stop() + b.logger.Debugw("rtp stats", "stats", b.rtpStats.ToString()) + } + b.callbacksQueue.Enqueue(b.onClose) b.callbacksQueue.Stop() }) @@ -288,17 +289,14 @@ func (b *Buffer) SetPLIThrottle(duration int64) { } func (b *Buffer) SendPLI() { - now := time.Now().UnixNano() - - b.Lock() - throttled := now-b.lastPli < b.pliThrottle - if throttled { - b.Unlock() + b.RLock() + if b.rtpStats == nil || b.rtpStats.TimeSinceLastPli() < b.pliThrottle { + b.RUnlock() return } - b.lastPli = now - b.stats.TotalPLIs++ - b.Unlock() + + b.rtpStats.UpdatePliAndTime(1) + b.RUnlock() b.logger.Debugw("send pli", "ssrc", b.mediaSSRC) pli := []rtcp.Packet{ @@ -314,15 +312,28 @@ func (b *Buffer) SetRTT(rtt uint32) { b.Lock() defer b.Unlock() - b.stats.RTT = rtt + if rtt == 0 { + return + } - if b.nacker != nil && rtt != 0 { + if b.nacker != nil { b.nacker.SetRTT(rtt) } + + if b.rtpStats != nil { + b.rtpStats.UpdateRtt(rtt) + } } -func (b *Buffer) LastPLI() int64 { - return b.lastPli +func (b *Buffer) LastPLI() time.Time { + b.RLock() + defer b.RUnlock() + + if b.rtpStats == nil { + return time.Time{} + } + + return b.rtpStats.LastPli() } func (b *Buffer) calc(pkt []byte, arrivalTime int64) { @@ -349,7 +360,7 @@ func (b *Buffer) calc(pkt []byte, arrivalTime int64) { return } - b.updateStreamState(&p, len(pkt), arrivalTime, isRTX) + flowState := b.updateStreamState(&p, arrivalTime) b.processHeaderExtensions(&p, arrivalTime) @@ -364,7 +375,7 @@ func (b *Buffer) calc(pkt []byte, arrivalTime int64) { return } - ep, temporalLayer := b.getExtPacket(pb, &p, arrivalTime) + ep, temporalLayer := b.getExtPacket(pb, &p, arrivalTime, flowState.IsHighestSN) if ep == nil { return } @@ -379,77 +390,20 @@ func (b *Buffer) calc(pkt []byte, arrivalTime int64) { b.doReports(arrivalTime) } -func (b *Buffer) updateStreamState(p *rtp.Packet, pktSize int, arrivalTime int64, isRTX bool) { - sn := p.SequenceNumber +func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime int64) RTPFlowState { + flowState := b.rtpStats.Update(&p.Header, len(p.Payload), int(p.PaddingSize), arrivalTime) - if !b.started { - b.started = true - b.highestSN = sn + if b.nacker != nil { + b.nacker.Remove(p.SequenceNumber) - b.lastReport = arrivalTime - - b.rrSnapshot = &receiverReportSnapshot{ - extHighestSeqNum: uint32(sn) - 1, - packetsLost: 0, - lastLossRate: 0.0, - } - } else { - diff := sn - b.highestSN - if diff > (1 << 15) { - if !isRTX && b.stats.TotalPacketsLost != 0 { - b.stats.TotalPacketsLost-- + if flowState.HasLoss { + for lost := flowState.LossStartInclusive; lost != flowState.LossEndExclusive; lost++ { + b.nacker.Push(lost) } - - // out-of-order, remove it from nack queue - if b.nacker != nil { - b.nacker.Remove(sn) - } - } else { - if diff > 1 { - b.stats.TotalPacketsLost += (uint32(diff) - 1) - if b.nacker != nil { - for lost := b.highestSN + 1; lost != sn; lost++ { - b.nacker.Push(lost) - } - } - } - - if sn < b.highestSN { - b.cycle++ - } - - b.highestSN = sn } } - switch { - case isRTX: - b.stats.TotalRetransmitPackets++ - b.stats.TotalRetransmitBytes += uint64(pktSize) - case len(p.Payload) == 0: - b.stats.TotalPaddingPackets++ - b.stats.TotalPaddingBytes += uint64(pktSize) - default: - b.stats.TotalPrimaryPackets++ - b.stats.TotalPrimaryBytes += uint64(pktSize) - if p.Marker { - b.stats.TotalFrames++ - } - } - - if !isRTX { - // jitter - arrival := uint32(arrivalTime / 1e6 * int64(b.clockRate/1e3)) - transit := arrival - p.Timestamp - if b.lastTransit != 0 { - d := int32(transit - b.lastTransit) - if d < 0 { - d = -d - } - b.stats.Jitter += (float64(d) - b.stats.Jitter) / 16 - } - b.lastTransit = transit - } + return flowState } func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime int64) { @@ -488,9 +442,9 @@ func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime int64) { } } -func (b *Buffer) getExtPacket(rawPacket []byte, rtpPacket *rtp.Packet, arrivalTime int64) (*ExtPacket, int32) { +func (b *Buffer) getExtPacket(rawPacket []byte, rtpPacket *rtp.Packet, arrivalTime int64, isHighestSN bool) (*ExtPacket, int32) { ep := &ExtPacket{ - Head: rtpPacket.SequenceNumber == b.highestSN, + Head: isHighestSN, Packet: rtpPacket, Arrival: arrivalTime, RawPacket: rawPacket, @@ -531,7 +485,9 @@ func (b *Buffer) doNACKs() { b.callbacksQueue.Enqueue(func() { b.feedbackCB(r) }) - b.stats.TotalNACKs += uint32(numSeqNumsNacked) + if b.rtpStats != nil { + b.rtpStats.UpdateNack(uint32(numSeqNumsNacked)) + } } } @@ -585,17 +541,24 @@ func (b *Buffer) buildNACKPacket() ([]rtcp.Packet, int) { } func (b *Buffer) buildREMBPacket() *rtcp.ReceiverEstimatedMaximumBitrate { - br := b.Bitrate() - - lostRate := float32(0) - if b.rrSnapshot != nil { - lostRate = b.rrSnapshot.lastLossRate + if b.rtpStats == nil { + return nil } + br := b.Bitrate() + s := b.rtpStats.SnapshotInfo(b.rembSnapshotId) + if s == nil { + return nil + } + + lostRate := float32(0.0) + if s.PacketsExpected != 0 { + lostRate = float32(s.PacketsLost) / float32(s.PacketsExpected) + } if lostRate < 0.02 { br = int64(float64(br)*1.09) + 2000 } - if lostRate > .1 { + if lostRate > 0.1 { br = int64(float64(br) * float64(1-0.5*lostRate)) } if br > b.maxBitrate { @@ -612,60 +575,22 @@ func (b *Buffer) buildREMBPacket() *rtcp.ReceiverEstimatedMaximumBitrate { } func (b *Buffer) buildReceptionReport() *rtcp.ReceptionReport { - if b.rrSnapshot == nil { + if b.rtpStats == nil { return nil } - extHighestSeqNum := (uint32(b.cycle) << 16) | uint32(b.highestSN) - expectedInInterval := extHighestSeqNum - b.rrSnapshot.extHighestSeqNum - if expectedInInterval == 0 { - return nil - } - - lostInInterval := b.stats.TotalPacketsLost - b.rrSnapshot.packetsLost - if int32(lostInInterval) < 0 { - // could happen if retransmitted packets arrive and make received greater than expected - lostInInterval = 0 - } - - lossRate := float32(lostInInterval) / float32(expectedInInterval) - fracLost := uint8(lossRate * 256.0) - if b.lastFractionLostToReport > fracLost { - // max of fraction lost from all subscribers is bigger than sfu received, use it. - fracLost = b.lastFractionLostToReport - } - - var dlsr uint32 - if b.lastSRRecv != 0 { - delayMS := uint32((time.Now().UnixNano() - b.lastSRRecv) / 1e6) - dlsr = (delayMS / 1e3) << 16 - dlsr |= (delayMS % 1e3) * 65536 / 1000 - } - - b.rrSnapshot = &receiverReportSnapshot{ - extHighestSeqNum: extHighestSeqNum, - packetsLost: b.stats.TotalPacketsLost, - lastLossRate: lossRate, - } - b.stats.LostRate = lossRate - - return &rtcp.ReceptionReport{ - SSRC: b.mediaSSRC, - FractionLost: fracLost, - TotalLost: b.stats.TotalPacketsLost, - LastSequenceNumber: extHighestSeqNum, - Jitter: uint32(b.stats.Jitter), - LastSenderReport: uint32(b.lastSRNTPTime >> 16), - Delay: dlsr, - } + return b.rtpStats.SnapshotRtcpReceptionReport(b.mediaSSRC, b.lastFractionLostToReport, b.rrSnapshotId) } func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) { - b.Lock() - b.lastSRRTPTime = rtpTime - b.lastSRNTPTime = NtpTime(ntpTime) - b.lastSRRecv = time.Now().UnixNano() - b.Unlock() + b.RLock() + defer b.RUnlock() + + if b.rtpStats == nil { + return + } + + b.rtpStats.SetRtcpSenderReportData(rtpTime, NtpTime(ntpTime), time.Now()) } func (b *Buffer) SetLastFractionLostReport(lost uint8) { @@ -754,27 +679,36 @@ func (b *Buffer) GetClockRate() uint32 { return b.clockRate } -// GetSenderReportData returns the rtp, ntp and nanos of the last sender report -func (b *Buffer) GetSenderReportData() (rtpTime uint32, ntpTime NtpTime, lastReceivedTimeInNanosSinceEpoch int64) { - b.RLock() - defer b.RUnlock() - - return b.lastSRRTPTime, b.lastSRNTPTime, b.lastSRRecv -} - func (b *Buffer) GetStats() *StreamStatsWithLayers { b.RLock() defer b.RUnlock() + if b.rtpStats == nil { + return nil + } + + stats := b.rtpStats.ToProto() + layers := make(map[int]LayerStats) layers[0] = LayerStats{ - TotalPackets: b.stats.TotalPrimaryPackets + b.stats.TotalRetransmitPackets + b.stats.TotalPaddingPackets, - TotalBytes: b.stats.TotalPrimaryBytes + b.stats.TotalRetransmitBytes + b.stats.TotalPaddingBytes, - TotalFrames: b.stats.TotalFrames, + TotalPackets: stats.Packets + stats.PacketsDuplicate + stats.PacketsPadding, + TotalBytes: stats.Bytes + stats.BytesDuplicate + stats.BytesPadding, + TotalFrames: stats.Frames, } return &StreamStatsWithLayers{ - StreamStats: b.stats, - Layers: layers, + RTPStats: stats, + Layers: layers, } } + +func (b *Buffer) GetQualityInfo() *RTPSnapshotInfo { + b.RLock() + defer b.RUnlock() + + if b.rtpStats == nil { + return nil + } + + return b.rtpStats.SnapshotInfo(b.connectionQualitySnapshotId) +} diff --git a/pkg/sfu/buffer/buffer_test.go b/pkg/sfu/buffer/buffer_test.go index c3d9bb14c..a271ebc2d 100644 --- a/pkg/sfu/buffer/buffer_test.go +++ b/pkg/sfu/buffer/buffer_test.go @@ -162,7 +162,6 @@ func TestNewBuffer(t *testing.T) { }, } for _, tt := range tests { - tt := tt t.Run(tt.name, func(t *testing.T) { var TestPackets = []*rtp.Packet{ { @@ -195,7 +194,6 @@ func TestNewBuffer(t *testing.T) { buff := NewBuffer(123, pool, pool) buff.codecType = webrtc.RTPCodecTypeVideo require.NotNil(t, buff) - require.NotNil(t, TestPackets) buff.OnFeedback(func(_ []rtcp.Packet) { }) buff.Bind(webrtc.RTPParameters{ @@ -207,8 +205,8 @@ func TestNewBuffer(t *testing.T) { buf, _ := p.Marshal() _, _ = buff.Write(buf) } - require.Equal(t, uint16(1), buff.cycle) - require.Equal(t, uint16(2), buff.highestSN) + require.Equal(t, uint16(1), buff.rtpStats.cycles) + require.Equal(t, uint16(2), buff.rtpStats.highestSN) }) } } diff --git a/pkg/rtc/datastats.go b/pkg/sfu/buffer/datastats.go similarity index 99% rename from pkg/rtc/datastats.go rename to pkg/sfu/buffer/datastats.go index 7794a607d..1160fa91f 100644 --- a/pkg/rtc/datastats.go +++ b/pkg/sfu/buffer/datastats.go @@ -1,4 +1,4 @@ -package rtc +package buffer import ( "sync" diff --git a/pkg/rtc/datastats_test.go b/pkg/sfu/buffer/datastats_test.go similarity index 98% rename from pkg/rtc/datastats_test.go rename to pkg/sfu/buffer/datastats_test.go index ff0bd69d0..583eb22a3 100644 --- a/pkg/rtc/datastats_test.go +++ b/pkg/sfu/buffer/datastats_test.go @@ -1,4 +1,4 @@ -package rtc +package buffer import ( "testing" diff --git a/pkg/rtc/rtpstats.go b/pkg/sfu/buffer/rtpstats.go similarity index 62% rename from pkg/rtc/rtpstats.go rename to pkg/sfu/buffer/rtpstats.go index b488b79bc..57c301ca8 100644 --- a/pkg/rtc/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -1,4 +1,4 @@ -package rtc +package buffer import ( "fmt" @@ -6,7 +6,6 @@ import ( "sync" "time" - "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/pion/rtcp" @@ -25,6 +24,28 @@ func getPos(sn uint16) (uint16, uint16) { return sn >> 6, sn & 0x3f } +type RTPFlowState struct { + IsHighestSN bool + HasLoss bool + LossStartInclusive uint16 + LossEndExclusive uint16 +} + +type RTPSnapshotInfo struct { + PacketsExpected uint32 + PacketsLost uint32 + MaxJitter float64 + MaxRtt uint32 +} + +type Snapshot struct { + extStartSN uint32 + maxJitter float64 + isJitterOverridden bool + maxJitterOverridden float64 + maxRtt uint32 +} + type RTPStatsParams struct { ClockRate uint32 } @@ -48,17 +69,25 @@ type RTPStats struct { lastTransit uint32 - bytes uint64 - bytesDuplicate uint64 - bytesPadding uint64 - packetsDuplicate uint32 - packetsPadding uint32 - packetsOutOfOrder uint32 - packetsLost uint32 - frames uint32 + bytes uint64 + bytesDuplicate uint64 + bytesPadding uint64 + packetsDuplicate uint32 + packetsPadding uint32 - jitter float64 - maxJitter float64 + packetsOutOfOrder uint32 + + packetsLost uint32 + isPacketsLostOverridden bool + packetsLostOverridden uint32 + + frames uint32 + + jitter float64 + maxJitter float64 + isJitterOverridden bool + jitterOverridden float64 + maxJitterOverridden float64 seenSNs [NumSequenceNumbers / 64]uint64 gapHistogram [GapHistogramNumBins]uint32 @@ -76,13 +105,18 @@ type RTPStats struct { maxRtt uint32 rtpSR uint32 - ntpSR buffer.NtpTime + ntpSR NtpTime arrivalSR int64 + + nextSnapshotId uint32 + snapshots map[uint32]*Snapshot } func NewRTPStats(params RTPStatsParams) *RTPStats { return &RTPStats{ - params: params, + params: params, + nextSnapshotId: 1, + snapshots: make(map[uint32]*Snapshot), } } @@ -93,10 +127,23 @@ func (r *RTPStats) Stop() { r.endTime = time.Now() } -func (r *RTPStats) Update(rtp *rtp.Packet, packetTime int64) (isHighestSN bool, hasLoss bool, lossStartInclusive uint16, lossEndExclusive uint16) { - // RAJA-TODO-START - // 1. Padding packet stats - // RAJA-TODO-END +func (r *RTPStats) NewSnapshotId() uint32 { + r.lock.Lock() + defer r.lock.Unlock() + + id := r.nextSnapshotId + r.nextSnapshotId++ + return id +} + +func (r *RTPStats) IsActive() bool { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.initialized && r.endTime.IsZero() +} + +func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, packetTime int64) (flowState RTPFlowState) { r.lock.Lock() defer r.lock.Unlock() @@ -109,20 +156,24 @@ func (r *RTPStats) Update(rtp *rtp.Packet, packetTime int64) (isHighestSN bool, r.startTime = time.Now() - r.highestSN = rtp.SequenceNumber - 1 - r.highestTS = rtp.Timestamp + r.highestSN = rtph.SequenceNumber - 1 + r.highestTS = rtph.Timestamp r.highestTime = packetTime - r.extStartSN = uint32(rtp.SequenceNumber) + r.extStartSN = uint32(rtph.SequenceNumber) r.cycles = 0 } + pktSize := uint64(rtph.MarshalSize() + payloadSize + paddingSize) + if payloadSize == 0 { + r.packetsPadding++ + r.bytesPadding += pktSize + } else { + r.bytes += pktSize + } + isDuplicate := false - - pktSize := uint64(rtp.MarshalSize()) - r.bytes += pktSize - - diff := rtp.SequenceNumber - r.highestSN + diff := rtph.SequenceNumber - r.highestSN switch { // duplicate or out-of-order case diff == 0 || diff > (1<<15): @@ -131,13 +182,13 @@ func (r *RTPStats) Update(rtp *rtp.Packet, packetTime int64) (isHighestSN bool, } // adjust start to account for out-of-order packets before o cycle completes - if !r.isCycleCompleted() && (rtp.SequenceNumber-uint16(r.extStartSN) > (1 << 15)) { + if !r.isCycleCompleted() && (rtph.SequenceNumber-uint16(r.extStartSN) > (1 << 15)) { // NOTE: current sequence number is counted as loss as it will be deducted in the duplicate check below - r.packetsLost += uint32(uint16(r.extStartSN) - rtp.SequenceNumber) - r.extStartSN = uint32(rtp.SequenceNumber) + r.packetsLost += uint32(uint16(r.extStartSN) - rtph.SequenceNumber) + r.extStartSN = uint32(rtph.SequenceNumber) } - if r.isSeenSN(rtp.SequenceNumber) { + if r.isSeenSN(rtph.SequenceNumber) { r.bytesDuplicate += pktSize r.packetsDuplicate++ isDuplicate = true @@ -147,57 +198,73 @@ func (r *RTPStats) Update(rtp *rtp.Packet, packetTime int64) (isHighestSN bool, // in-order default: - isHighestSN = true + flowState.IsHighestSN = true if diff > 1 { - hasLoss = true - lossStartInclusive = r.highestSN + 1 - lossEndExclusive = rtp.SequenceNumber + flowState.HasLoss = true + flowState.LossStartInclusive = r.highestSN + 1 + flowState.LossEndExclusive = rtph.SequenceNumber } // update gap histogram r.updateGapHistogram(int(diff)) // update missing sequence numbers - for lost := r.highestSN + 1; lost != rtp.SequenceNumber; lost++ { + for lost := r.highestSN + 1; lost != rtph.SequenceNumber; lost++ { r.clearSeenSN(lost) } r.packetsLost += uint32(diff - 1) - if rtp.SequenceNumber < r.highestSN { + if rtph.SequenceNumber < r.highestSN { r.cycles++ } - r.highestSN = rtp.SequenceNumber - r.highestTS = rtp.Timestamp + r.highestSN = rtph.SequenceNumber + r.highestTS = rtph.Timestamp r.highestTime = packetTime - if rtp.Marker { + if rtph.Marker { r.frames++ } } // set current sequence number in seen list - r.setSeenSN(rtp.SequenceNumber) + r.setSeenSN(rtph.SequenceNumber) if !isDuplicate { - r.updateJitter(rtp, packetTime) + r.updateJitter(rtph, packetTime) } return } -func (r *RTPStats) UpdateNack(nackCount int, nackMissCount int) { - r.lock.Lock() - defer r.lock.Unlock() +func (r *RTPStats) GetTotalPackets() uint32 { + r.lock.RLock() + defer r.lock.RUnlock() - if !r.endTime.IsZero() { - return - } - - r.nacks += uint32(nackCount) - r.nackMisses += uint32(nackMissCount) + return r.getNumPacketsSeen() + r.packetsDuplicate + r.packetsPadding } -func (r *RTPStats) UpdatePli() { +func (r *RTPStats) GetTotalPacketsSansDuplicate() uint32 { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.getNumPacketsSeen() + r.packetsPadding +} + +func (r *RTPStats) GetTotalBytes() uint64 { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.bytes + r.bytesDuplicate + r.bytesPadding +} + +func (r *RTPStats) GetTotalBytesSansDuplicate() uint64 { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.bytes + r.bytesPadding +} + +func (r *RTPStats) UpdatePacketsLost(packetsLost uint32) { r.lock.Lock() defer r.lock.Unlock() @@ -205,10 +272,123 @@ func (r *RTPStats) UpdatePli() { return } - r.plis++ + r.isPacketsLostOverridden = true + r.packetsLostOverridden = packetsLost +} + +func (r *RTPStats) UpdateJitter(jitter float64) { + r.lock.Lock() + defer r.lock.Unlock() + + if !r.endTime.IsZero() { + return + } + + r.isJitterOverridden = true + r.jitterOverridden = jitter + if jitter > r.maxJitterOverridden { + r.maxJitterOverridden = jitter + } + + for _, s := range r.snapshots { + s.isJitterOverridden = true + if jitter > s.maxJitterOverridden { + s.maxJitterOverridden = jitter + } + } +} + +func (r *RTPStats) UpdateNackAndMiss(nackCount uint32, nackMissCount uint32) { + r.lock.Lock() + defer r.lock.Unlock() + + if !r.endTime.IsZero() { + return + } + + r.updateNackLocked(nackCount) + r.updateNackMissLocked(nackMissCount) +} + +func (r *RTPStats) UpdateNack(nackCount uint32) { + r.lock.Lock() + defer r.lock.Unlock() + + if !r.endTime.IsZero() { + return + } + + r.updateNackLocked(nackCount) +} + +func (r *RTPStats) updateNackLocked(nackCount uint32) { + r.nacks += nackCount +} + +func (r *RTPStats) UpdateNackMiss(nackMissCount uint32) { + r.lock.Lock() + defer r.lock.Unlock() + + if !r.endTime.IsZero() { + return + } + + r.updateNackMissLocked(nackMissCount) +} + +func (r *RTPStats) updateNackMissLocked(nackMissCount uint32) { + r.nackMisses += nackMissCount +} + +func (r *RTPStats) UpdatePliAndTime(pliCount uint32) { + r.lock.Lock() + defer r.lock.Unlock() + + if !r.endTime.IsZero() { + return + } + + r.updatePliLocked(pliCount) + r.updatePliTimeLocked() +} + +func (r *RTPStats) UpdatePli(pliCount uint32) { + r.lock.Lock() + defer r.lock.Unlock() + + if !r.endTime.IsZero() { + return + } + + r.updatePliLocked(pliCount) +} + +func (r *RTPStats) updatePliLocked(pliCount uint32) { + r.plis += pliCount +} + +func (r *RTPStats) UpdatePliTime() { + r.lock.Lock() + defer r.lock.Unlock() + + if !r.endTime.IsZero() { + return + } + + r.updatePliTimeLocked() +} + +func (r *RTPStats) updatePliTimeLocked() { r.lastPli = time.Now() } +func (r *RTPStats) LastPli() time.Time { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.lastPli +} + func (r *RTPStats) TimeSinceLastPli() int64 { r.lock.RLock() defer r.lock.RUnlock() @@ -216,7 +396,18 @@ func (r *RTPStats) TimeSinceLastPli() int64 { return time.Now().UnixNano() - r.lastPli.UnixNano() } -func (r *RTPStats) UpdateFir() { +func (r *RTPStats) UpdateFir(firCount uint32) { + r.lock.Lock() + defer r.lock.Unlock() + + if !r.endTime.IsZero() { + return + } + + r.firs += firCount +} + +func (r *RTPStats) UpdateFirTime() { r.lock.Lock() defer r.lock.Unlock() @@ -224,7 +415,6 @@ func (r *RTPStats) UpdateFir() { return } - r.firs++ r.lastFir = time.Now() } @@ -240,9 +430,22 @@ func (r *RTPStats) UpdateRtt(rtt uint32) { if r.rtt > r.maxRtt { r.maxRtt = r.rtt } + + for _, s := range r.snapshots { + if rtt > s.maxRtt { + s.maxRtt = rtt + } + } } -func (r *RTPStats) SetRtcpSenderReportData(rtpTS uint32, ntpTS buffer.NtpTime, arrival time.Time) { +func (r *RTPStats) GetRtt() uint32 { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.rtt +} + +func (r *RTPStats) SetRtcpSenderReportData(rtpTS uint32, ntpTS NtpTime, arrival time.Time) { r.lock.Lock() defer r.lock.Unlock() @@ -251,29 +454,6 @@ func (r *RTPStats) SetRtcpSenderReportData(rtpTS uint32, ntpTS buffer.NtpTime, a r.arrivalSR = arrival.UnixNano() } -// RAJA-REMOVE-START -func (r *RTPStats) GetRtcpSenderReportData() (rtpTS uint32, ntpTS buffer.NtpTime) { - r.lock.RLock() - defer r.lock.RUnlock() - - now := time.Now() - ntpTS = buffer.ToNtpTime(now) - - ntpSR := r.highestTime - if r.ntpSR != 0 { - ntpSR = r.ntpSR.Time().UnixNano() - } - - rtpTS = r.highestTS - if r.rtpSR != 0 { - rtpTS = r.rtpSR - } - rtpTS += uint32((now.UnixNano() - ntpSR) * int64(r.params.ClockRate) / 1e9) - return -} - -// RAJA-REMOVE-END - func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport { r.lock.RLock() defer r.lock.RUnlock() @@ -283,7 +463,7 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport { } now := time.Now() - nowNTP := buffer.ToNtpTime(now) + nowNTP := ToNtpTime(now) nowRTP := r.highestTS + uint32((now.UnixNano()-r.highestTime)*int64(r.params.ClockRate)/1e9) return &rtcp.SenderReport{ @@ -295,24 +475,24 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport { } } -func (r *RTPStats) GetRtcpReceptionReport(ssrc uint32, proxyFracLost uint8) *rtcp.ReceptionReport { - // RAJA-TODO-START - // 1. Have to use snapshot or from beginning - // 2. Set up next snapshot - // RAJA-TODO-END - r.lock.RLock() - defer r.lock.RUnlock() +func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8, snapshotId uint32) *rtcp.ReceptionReport { + r.lock.Lock() + snapshot := r.getAndResetSnapshot(snapshotId) + r.lock.Unlock() - if !r.initialized { + if snapshot == nil { return nil } + r.lock.RLock() + defer r.lock.RUnlock() + extHighestSN := r.getExtHighestSN() - packetsExpected := extHighestSN - r.extStartSN + 1 + packetsExpected := extHighestSN - snapshot.extStartSN + 1 if packetsExpected > NumSequenceNumbers { logger.Warnw( "too many packets expected in receiver report", - fmt.Errorf("start: %d, end: %d, expected: %d", r.extStartSN, extHighestSN, packetsExpected), + fmt.Errorf("start: %d, end: %d, expected: %d", snapshot.extStartSN, extHighestSN, packetsExpected), ) return nil } @@ -320,7 +500,7 @@ func (r *RTPStats) GetRtcpReceptionReport(ssrc uint32, proxyFracLost uint8) *rtc return nil } - packetsLost := r.numMissingSNs(uint16(r.extStartSN), uint16(extHighestSN)) + packetsLost := r.numMissingSNs(uint16(snapshot.extStartSN), uint16(extHighestSN)) lossRate := float32(packetsLost) / float32(packetsExpected) fracLost := uint8(lossRate * 256.0) if proxyFracLost > fracLost { @@ -334,17 +514,63 @@ func (r *RTPStats) GetRtcpReceptionReport(ssrc uint32, proxyFracLost uint8) *rtc dlsr |= (delayMS % 1e3) * 65536 / 1000 } + jitter := r.jitter + if r.isJitterOverridden { + jitter = r.jitterOverridden + } + return &rtcp.ReceptionReport{ SSRC: ssrc, FractionLost: fracLost, TotalLost: r.packetsLost, LastSequenceNumber: extHighestSN, - Jitter: uint32(r.jitter), + Jitter: uint32(jitter), LastSenderReport: uint32(r.ntpSR >> 16), Delay: dlsr, } } +func (r *RTPStats) SnapshotInfo(snapshotId uint32) *RTPSnapshotInfo { + r.lock.Lock() + snapshot := r.getAndResetSnapshot(snapshotId) + r.lock.Unlock() + + if snapshot == nil { + return nil + } + + r.lock.RLock() + defer r.lock.RUnlock() + + extHighestSN := r.getExtHighestSN() + packetsExpected := extHighestSN - snapshot.extStartSN + 1 + if packetsExpected > NumSequenceNumbers { + logger.Warnw( + "too many packets expected in loss percentage", + fmt.Errorf("start: %d, end: %d, expected: %d", snapshot.extStartSN, extHighestSN, packetsExpected), + ) + return nil + } + if packetsExpected == 0 { + return nil + } + + packetsLost := r.numMissingSNs(uint16(snapshot.extStartSN), uint16(extHighestSN)) + + maxJitter := snapshot.maxJitter + if snapshot.isJitterOverridden { + maxJitter = snapshot.maxJitterOverridden + } + maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6 + + return &RTPSnapshotInfo{ + PacketsExpected: packetsExpected, + PacketsLost: packetsLost, + MaxJitter: maxJitterTime, + MaxRtt: snapshot.maxRtt, + } +} + func (r *RTPStats) ToString() string { p := r.ToProto() if p == nil { @@ -370,7 +596,13 @@ func (r *RTPStats) ToString() string { str += fmt.Sprintf(", o: %d", p.PacketsOutOfOrder) - str += fmt.Sprintf(", c: %d, j: %d(%.1fus)|%d(%.1fus)", r.params.ClockRate, uint32(r.jitter), p.JitterCurrent, uint32(r.maxJitter), p.JitterMax) + jitter := r.jitter + maxJitter := r.maxJitter + if r.isJitterOverridden { + jitter = r.jitterOverridden + maxJitter = r.maxJitterOverridden + } + str += fmt.Sprintf(", c: %d, j: %d(%.1fus)|%d(%.1fus)", r.params.ClockRate, uint32(jitter), p.JitterCurrent, uint32(maxJitter), p.JitterMax) if len(p.GapHistogram) != 0 { first := true @@ -395,7 +627,7 @@ func (r *RTPStats) ToString() string { str += fmt.Sprintf("%d|%+v", p.Firs, p.LastFir.AsTime().Format(time.UnixDate)) str += ", rtt(ms):" - str += fmt.Sprintf("%d|%+d", p.RttCurrent, p.RttMax) + str += fmt.Sprintf("%d|%d", p.RttCurrent, p.RttMax) return str } @@ -428,8 +660,12 @@ func (r *RTPStats) ToProto() *livekit.RTPStats { frameRate := float64(r.frames) / elapsed - packetLostRate := float64(r.packetsLost) / elapsed - packetLostPercentage := float32(r.packetsLost) / float32(packetsExpected) * 100.0 + packetsLost := r.packetsLost + if r.isPacketsLostOverridden { + packetsLost = r.packetsLostOverridden + } + packetLostRate := float64(packetsLost) / elapsed + packetLostPercentage := float32(packetsLost) / float32(packetsExpected) * 100.0 packetDuplicateRate := float64(r.packetsDuplicate) / elapsed bitrateDuplicate := float64(r.bytesDuplicate) * 8.0 / elapsed @@ -437,8 +673,14 @@ func (r *RTPStats) ToProto() *livekit.RTPStats { packetPaddingRate := float64(r.packetsPadding) / elapsed bitratePadding := float64(r.bytesPadding) * 8.0 / elapsed - jitterTime := r.jitter / float64(r.params.ClockRate) * 1e6 - maxJitterTime := r.maxJitter / float64(r.params.ClockRate) * 1e6 + jitter := r.jitter + maxJitter := r.maxJitter + if r.isJitterOverridden { + jitter = r.jitterOverridden + maxJitter = r.maxJitterOverridden + } + jitterTime := jitter / float64(r.params.ClockRate) * 1e6 + maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6 p := &livekit.RTPStats{ StartTime: timestamppb.New(r.startTime), @@ -537,7 +779,8 @@ func (r *RTPStats) numMissingSNs(startInclusive uint16, endInclusive uint16) uin seen := uint32(0) idx := startIdx - for idx != endIdx+1 { + loopEnd := (endIdx + 1) % uint16(len(r.seenSNs)) + for idx != loopEnd { mask := uint64((1 << 64) - 1) if idx == startIdx { mask &^= uint64((1 << startRem) - 1) @@ -548,18 +791,15 @@ func (r *RTPStats) numMissingSNs(startInclusive uint16, endInclusive uint16) uin seen += uint32(bits.OnesCount64(r.seenSNs[idx] & mask)) - idx++ - if idx == uint16(len(r.seenSNs)) { - idx = 0 - } + idx = (idx + 1) % uint16(len(r.seenSNs)) } return uint32(endInclusive-startInclusive+1) - seen } -func (r *RTPStats) updateJitter(rtp *rtp.Packet, packetTime int64) { +func (r *RTPStats) updateJitter(rtph *rtp.Header, packetTime int64) { packetTimeRTP := uint32(packetTime / 1e6 * int64(r.params.ClockRate/1e3)) - transit := packetTimeRTP - rtp.Timestamp + transit := packetTimeRTP - rtph.Timestamp if r.lastTransit != 0 { d := int32(transit - r.lastTransit) @@ -570,6 +810,12 @@ func (r *RTPStats) updateJitter(rtp *rtp.Packet, packetTime int64) { if r.jitter > r.maxJitter { r.maxJitter = r.jitter } + + for _, s := range r.snapshots { + if r.jitter > s.maxJitter { + r.maxJitter = r.jitter + } + } } r.lastTransit = transit @@ -588,6 +834,30 @@ func (r *RTPStats) updateGapHistogram(gap int) { } } +func (r *RTPStats) getAndResetSnapshot(snapshotId uint32) *Snapshot { + if !r.initialized { + return nil + } + + snapshot := r.snapshots[snapshotId] + if snapshot == nil { + snapshot = &Snapshot{ + extStartSN: r.extStartSN, + maxJitter: 0.0, + maxRtt: 0, + } + r.snapshots[snapshotId] = snapshot + } + + toReturn := *snapshot + + snapshot.extStartSN = r.getExtHighestSN() + 1 + snapshot.maxJitter = 0.0 + snapshot.maxRtt = 0 + + return &toReturn +} + // ---------------------------------- func AggregateRTPStats(statses []*livekit.RTPStats) *livekit.RTPStats { diff --git a/pkg/rtc/rtpstats_test.go b/pkg/sfu/buffer/rtpstats_test.go similarity index 63% rename from pkg/rtc/rtpstats_test.go rename to pkg/sfu/buffer/rtpstats_test.go index 260557553..335d1d1cc 100644 --- a/pkg/rtc/rtpstats_test.go +++ b/pkg/sfu/buffer/rtpstats_test.go @@ -1,4 +1,4 @@ -package rtc +package buffer import ( "fmt" @@ -42,7 +42,8 @@ func TestRTPStats(t *testing.T) { for now.Sub(startTime) < totalDuration { timestamp += uint32(now.Sub(lastFrameTime).Seconds() * float64(clockRate)) for i := 0; i < packetsPerFrame; i++ { - r.Update(getPacket(sequenceNumber, timestamp, packetSize), time.Now().UnixNano()) + packet := getPacket(sequenceNumber, timestamp, packetSize) + r.Update(&packet.Header, len(packet.Payload), 0, time.Now().UnixNano()) if (sequenceNumber % 100) == 0 { jump := uint16(rand.Float64() * 120.0) sequenceNumber += jump @@ -68,9 +69,10 @@ func TestRTPStats_Update(t *testing.T) { sequenceNumber := uint16(rand.Float64() * float64(1<<16)) timestamp := uint32(rand.Float64() * float64(1<<32)) - isHighest, hasLoss, _, _ := r.Update(getPacket(sequenceNumber, timestamp, 1000), time.Now().UnixNano()) - require.True(t, isHighest) - require.False(t, hasLoss) + packet := getPacket(sequenceNumber, timestamp, 1000) + flowState := r.Update(&packet.Header, len(packet.Payload), 0, time.Now().UnixNano()) + require.True(t, flowState.IsHighestSN) + require.False(t, flowState.HasLoss) require.True(t, r.initialized) require.Equal(t, sequenceNumber, r.highestSN) require.Equal(t, timestamp, r.highestTS) @@ -78,25 +80,28 @@ func TestRTPStats_Update(t *testing.T) { // in-order, no loss sequenceNumber++ timestamp += 3000 - isHighest, hasLoss, _, _ = r.Update(getPacket(sequenceNumber, timestamp, 1000), time.Now().UnixNano()) - require.True(t, isHighest) - require.False(t, hasLoss) + packet = getPacket(sequenceNumber, timestamp, 1000) + flowState = r.Update(&packet.Header, len(packet.Payload), 0, time.Now().UnixNano()) + require.True(t, flowState.IsHighestSN) + require.False(t, flowState.HasLoss) require.Equal(t, sequenceNumber, r.highestSN) require.Equal(t, timestamp, r.highestTS) // out-of-order - isHighest, hasLoss, _, _ = r.Update(getPacket(sequenceNumber-10, timestamp-30000, 1000), time.Now().UnixNano()) - require.False(t, isHighest) - require.False(t, hasLoss) + packet = getPacket(sequenceNumber-10, timestamp-30000, 1000) + flowState = r.Update(&packet.Header, len(packet.Payload), 0, time.Now().UnixNano()) + require.False(t, flowState.IsHighestSN) + require.False(t, flowState.HasLoss) require.Equal(t, sequenceNumber, r.highestSN) require.Equal(t, timestamp, r.highestTS) require.Equal(t, uint32(1), r.packetsOutOfOrder) require.Equal(t, uint32(0), r.packetsDuplicate) // duplicate - isHighest, hasLoss, _, _ = r.Update(getPacket(sequenceNumber-10, timestamp-30000, 1000), time.Now().UnixNano()) - require.False(t, isHighest) - require.False(t, hasLoss) + packet = getPacket(sequenceNumber-10, timestamp-30000, 1000) + flowState = r.Update(&packet.Header, len(packet.Payload), 0, time.Now().UnixNano()) + require.False(t, flowState.IsHighestSN) + require.False(t, flowState.HasLoss) require.Equal(t, sequenceNumber, r.highestSN) require.Equal(t, timestamp, r.highestTS) require.Equal(t, uint32(2), r.packetsOutOfOrder) @@ -105,17 +110,19 @@ func TestRTPStats_Update(t *testing.T) { // loss sequenceNumber += 10 timestamp += 30000 - isHighest, hasLoss, lossStartInclusive, lossEndExclusive := r.Update(getPacket(sequenceNumber, timestamp, 1000), time.Now().UnixNano()) - require.True(t, isHighest) - require.True(t, hasLoss) - require.Equal(t, lossStartInclusive, sequenceNumber-9) - require.Equal(t, lossEndExclusive, sequenceNumber) + packet = getPacket(sequenceNumber, timestamp, 1000) + flowState = r.Update(&packet.Header, len(packet.Payload), 0, time.Now().UnixNano()) + require.True(t, flowState.IsHighestSN) + require.True(t, flowState.HasLoss) + require.Equal(t, sequenceNumber-9, flowState.LossStartInclusive) + require.Equal(t, sequenceNumber, flowState.LossEndExclusive) require.Equal(t, uint32(17), r.packetsLost) // out-of-order should decrement number of lost packets - isHighest, hasLoss, _, _ = r.Update(getPacket(sequenceNumber-15, timestamp-45000, 1000), time.Now().UnixNano()) - require.False(t, isHighest) - require.False(t, hasLoss) + packet = getPacket(sequenceNumber-15, timestamp-45000, 1000) + flowState = r.Update(&packet.Header, len(packet.Payload), 0, time.Now().UnixNano()) + require.False(t, flowState.IsHighestSN) + require.False(t, flowState.HasLoss) require.Equal(t, sequenceNumber, r.highestSN) require.Equal(t, timestamp, r.highestTS) require.Equal(t, uint32(3), r.packetsOutOfOrder) diff --git a/pkg/sfu/buffer/streamstats.go b/pkg/sfu/buffer/streamstats.go index 6b3126243..16b6d9e5d 100644 --- a/pkg/sfu/buffer/streamstats.go +++ b/pkg/sfu/buffer/streamstats.go @@ -1,29 +1,20 @@ package buffer +import "github.com/livekit/protocol/livekit" + type LayerStats struct { TotalPackets uint32 TotalBytes uint64 TotalFrames uint32 } -type StreamStats struct { - TotalPrimaryPackets uint32 - TotalPrimaryBytes uint64 - TotalRetransmitPackets uint32 - TotalRetransmitBytes uint64 - TotalPaddingPackets uint32 - TotalPaddingBytes uint64 - TotalPacketsLost uint32 - TotalFrames uint32 - RTT uint32 - Jitter float64 - TotalNACKs uint32 - TotalPLIs uint32 - TotalFIRs uint32 - LostRate float32 +type StreamStatsWithLayers struct { + RTPStats *livekit.RTPStats + Layers map[int]LayerStats } -type StreamStatsWithLayers struct { - StreamStats StreamStats - Layers map[int]LayerStats +type ConnectionQualityParams struct { + LossPercentage float32 + Jitter float32 + Rtt uint32 } diff --git a/pkg/sfu/connectionquality/connectionstats.go b/pkg/sfu/connectionquality/connectionstats.go index 5945227aa..0f1b53f53 100644 --- a/pkg/sfu/connectionquality/connectionstats.go +++ b/pkg/sfu/connectionquality/connectionstats.go @@ -15,20 +15,11 @@ const ( connectionQualityUpdateInterval = 5 * time.Second ) -type qualityWindow struct { - startSeqNum uint32 - endSeqNum uint32 - startPacketsLost uint32 - endPacketsLost uint32 - maxRTT uint32 - maxJitter uint32 -} - type ConnectionStatsParams struct { UpdateInterval time.Duration CodecType webrtc.RTPCodecType - ClockRate uint32 GetTrackStats func() map[uint32]*buffer.StreamStatsWithLayers + GetQualityParams func() *buffer.ConnectionQualityParams GetIsReducedQuality func() bool Logger logger.Logger } @@ -38,9 +29,8 @@ type ConnectionStats struct { onStatsUpdate func(cs *ConnectionStats, stat *livekit.AnalyticsStat) - lock sync.RWMutex - score float32 - qualityWindows map[uint32]*qualityWindow + lock sync.RWMutex + score float32 done chan struct{} isClosed atomic.Bool @@ -48,10 +38,9 @@ type ConnectionStats struct { func NewConnectionStats(params ConnectionStatsParams) *ConnectionStats { return &ConnectionStats{ - params: params, - score: 4.0, - qualityWindows: make(map[uint32]*qualityWindow), - done: make(chan struct{}), + params: params, + score: 4.0, + done: make(chan struct{}), } } @@ -78,80 +67,23 @@ func (cs *ConnectionStats) GetScore() float32 { return cs.score } -func (cs *ConnectionStats) UpdateWindow(ssrc uint32, extHighestSeqNum uint32, packetsLost uint32, rtt uint32, jitter uint32) { - if cs.isClosed.Load() { - return - } - - cs.lock.Lock() - defer cs.lock.Unlock() - - qw := cs.qualityWindows[ssrc] - if qw == nil { - qw = &qualityWindow{} - cs.qualityWindows[ssrc] = qw - } - - if qw.startSeqNum == 0 { - qw.startSeqNum = extHighestSeqNum - qw.startPacketsLost = packetsLost - } - - if extHighestSeqNum > qw.endSeqNum { - qw.endSeqNum = extHighestSeqNum - qw.endPacketsLost = packetsLost - } - - if rtt > qw.maxRTT { - qw.maxRTT = rtt - } - - if jitter > qw.maxJitter { - qw.maxJitter = jitter - } -} - func (cs *ConnectionStats) updateScore() float32 { cs.lock.Lock() defer cs.lock.Unlock() - expectedPacketsInInterval := uint32(0) - lostPacketsInInterval := uint32(0) - maxRTT := uint32(0) - maxJitter := uint32(0) - for _, qw := range cs.qualityWindows { - expectedPacketsInInterval += qw.endSeqNum - qw.startSeqNum + 1 - lostPacketsInInterval += qw.endPacketsLost - qw.startPacketsLost - if qw.maxRTT > maxRTT { - maxRTT = qw.maxRTT - } - if qw.maxJitter > maxJitter { - maxJitter = qw.maxJitter - } - - qw.startSeqNum = qw.endSeqNum - qw.startPacketsLost = qw.endPacketsLost - qw.maxRTT = 0 - qw.maxJitter = 0 - } - - pctLoss := float32(0.0) - if int32(lostPacketsInInterval) < 0 { - lostPacketsInInterval = 0 - } - if expectedPacketsInInterval > 0 { - pctLoss = (float32(lostPacketsInInterval) / float32(expectedPacketsInInterval)) * 100.0 + s := cs.params.GetQualityParams() + if s == nil { + return cs.score } if cs.params.CodecType == webrtc.RTPCodecTypeAudio { - // covert jitter (in media samples units) to milliseconds - cs.score = AudioConnectionScore(pctLoss, maxRTT, float32(maxJitter)*1000.0/float32(cs.params.ClockRate)) + cs.score = AudioConnectionScore(s.LossPercentage, s.Rtt, s.Jitter) } else { isReducedQuality := false if cs.params.GetIsReducedQuality != nil { isReducedQuality = cs.params.GetIsReducedQuality() } - cs.score = VideoConnectionScore(pctLoss, isReducedQuality) + cs.score = VideoConnectionScore(s.LossPercentage, isReducedQuality) } return cs.score @@ -169,15 +101,7 @@ func (cs *ConnectionStats) getStat() *livekit.AnalyticsStat { analyticsStreams := make([]*livekit.AnalyticsStream, 0, len(streams)) for ssrc, stream := range streams { - maxRTT := stream.StreamStats.RTT - maxJitter := uint32(stream.StreamStats.Jitter) - - if qw := cs.qualityWindows[ssrc]; qw != nil { - maxRTT = qw.maxRTT - maxJitter = qw.maxJitter - } - - as := ToAnalyticsStream(ssrc, &stream.StreamStats, maxRTT, maxJitter, cs.params.ClockRate) + as := ToAnalyticsStream(ssrc, stream.RTPStats) // // add video layer if either @@ -225,24 +149,22 @@ func (cs *ConnectionStats) updateStats() { } } -func ToAnalyticsStream(ssrc uint32, streamStats *buffer.StreamStats, maxRTT uint32, maxJitter uint32, clockRate uint32) *livekit.AnalyticsStream { - // convert jitter (from number of media samples to microseconds - jitter := uint32((float32(maxJitter) * 1e6) / float32(clockRate)) +func ToAnalyticsStream(ssrc uint32, rtpStats *livekit.RTPStats) *livekit.AnalyticsStream { return &livekit.AnalyticsStream{ Ssrc: ssrc, - TotalPrimaryPackets: streamStats.TotalPrimaryPackets, - TotalPrimaryBytes: streamStats.TotalPrimaryBytes, - TotalRetransmitPackets: streamStats.TotalRetransmitPackets, - TotalRetransmitBytes: streamStats.TotalRetransmitBytes, - TotalPaddingPackets: streamStats.TotalPaddingPackets, - TotalPaddingBytes: streamStats.TotalPaddingBytes, - TotalPacketsLost: streamStats.TotalPacketsLost, - TotalFrames: streamStats.TotalFrames, - Rtt: maxRTT, - Jitter: jitter, - TotalNacks: streamStats.TotalNACKs, - TotalPlis: streamStats.TotalPLIs, - TotalFirs: streamStats.TotalFIRs, + TotalPrimaryPackets: rtpStats.Packets, + TotalPrimaryBytes: rtpStats.Bytes, + TotalRetransmitPackets: rtpStats.PacketsDuplicate, + TotalRetransmitBytes: rtpStats.BytesDuplicate, + TotalPaddingPackets: rtpStats.PacketsPadding, + TotalPaddingBytes: rtpStats.BytesPadding, + TotalPacketsLost: rtpStats.PacketsLost, + TotalFrames: rtpStats.Frames, + Rtt: rtpStats.RttMax, + Jitter: uint32(rtpStats.JitterMax), + TotalNacks: rtpStats.Nacks, + TotalPlis: rtpStats.Plis, + TotalFirs: rtpStats.Firs, } } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index eb30545c7..6f159c00c 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -42,6 +42,8 @@ const ( firstKeyFramePLIInterval = 500 * time.Millisecond FlagStopRTXOnPLI = true + + bitrateReportDelta = 1 ) var ( @@ -98,19 +100,18 @@ type DownTrack struct { listenerLock sync.RWMutex closeOnce sync.Once - statsLock sync.RWMutex - stats buffer.StreamStats + rtpStats *buffer.RTPStats totalRepeatedNACKs uint32 - connectionStats *connectionquality.ConnectionStats + connectionStats *connectionquality.ConnectionStats + connectionQualitySnapshotId uint32 + statsLock sync.RWMutex bitrateHelper uint64 bitrate uint64 lastBitrateReport time.Time // Debug info - lastPli atomic.Time - lastRTP atomic.Time pktsDropped atomic.Uint32 isNACKThrottled atomic.Bool @@ -183,9 +184,9 @@ func NewDownTrack( } d.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{ - CodecType: kind, - ClockRate: c.ClockRate, - GetTrackStats: d.GetTrackStats, + CodecType: kind, + GetTrackStats: d.GetTrackStats, + GetQualityParams: d.getQualityParams, GetIsReducedQuality: func() bool { return d.GetForwardingStatus() != ForwardingStatusOptimal }, @@ -199,6 +200,11 @@ func NewDownTrack( } }) + d.rtpStats = buffer.NewRTPStats(buffer.RTPStatsParams{ + ClockRate: d.codec.ClockRate, + }) + d.connectionQualitySnapshotId = d.rtpStats.NewSnapshotId() + return d, nil } @@ -322,12 +328,11 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { return nil } - d.lastRTP.Store(time.Now()) - tp, err := d.forwarder.GetTranslationParams(extPkt, layer) if tp.shouldSendPLI { - d.lastPli.Store(time.Now()) d.receiver.SendPLI(layer) + d.rtpStats.UpdatePli(1) + d.rtpStats.UpdatePliTime() } if tp.shouldDrop { if tp.isDroppingRelevant { @@ -378,10 +383,11 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { }) } - d.updatePrimaryStats(pktSize, hdr.Marker) if extPkt.KeyFrame { d.isNACKThrottled.Store(false) } + + d.rtpStats.Update(hdr, len(payload), 0, time.Now().UnixNano()) d.updateBitrate() } else { d.logger.Errorw("writing rtp packet err", err) @@ -394,12 +400,9 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { // WritePaddingRTP tries to write as many padding only RTP packets as necessary // to satisfy given size to the DownTrack func (d *DownTrack) WritePaddingRTP(bytesToSend int) int { - d.statsLock.RLock() - if d.stats.TotalPrimaryPackets == 0 { - d.statsLock.RUnlock() + if !d.rtpStats.IsActive() { return 0 } - d.statsLock.RUnlock() // LK-TODO-START // Ideally should look at header extensions negotiated for @@ -469,10 +472,10 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int) int { } size := hdr.MarshalSize() + len(payload) - d.updatePaddingStats(size) for _, f := range d.onPaddingSentUnsafe { f(d, size) } + d.rtpStats.Update(&hdr, 0, len(payload), time.Now().UnixNano()) d.updateBitrate() // @@ -491,22 +494,29 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int) int { } func (d *DownTrack) updateBitrate() { - lastRtp := d.lastRTP.Load() + now := time.Now() + d.statsLock.RLock() - timeDiff := lastRtp.Sub(d.lastBitrateReport).Seconds() - d.statsLock.RUnlock() - if timeDiff < 1 { + timeDiff := now.Sub(d.lastBitrateReport).Seconds() + if timeDiff < bitrateReportDelta { + d.statsLock.RUnlock() return } - octets, _ := d.getSRStats() + d.statsLock.RUnlock() + + totalBytes := d.rtpStats.GetTotalBytes() + d.statsLock.Lock() - d.bitrate = uint64(float64(octets*8-d.bitrateHelper) / timeDiff) - d.bitrateHelper = octets * 8 - d.lastBitrateReport = lastRtp + d.bitrate = uint64(float64(totalBytes*8-d.bitrateHelper) / timeDiff) + d.bitrateHelper = totalBytes * 8 + d.lastBitrateReport = now d.statsLock.Unlock() } func (d *DownTrack) Bitrate() uint64 { + d.statsLock.RLock() + defer d.statsLock.RUnlock() + return d.bitrate } @@ -568,6 +578,8 @@ func (d *DownTrack) CloseWithFlush(flush bool) { d.receiver.DeleteDownTrack(d.peerID) d.connectionStats.Close() + d.rtpStats.Stop() + d.logger.Debugw("rtp stats", "stats", d.rtpStats.ToString()) if d.onMaxLayerChanged != nil && d.kind == webrtc.RTPCodecTypeVideo { d.callbacksQueue.Enqueue(func() { @@ -793,66 +805,14 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport { return nil } - currentLayers := d.forwarder.CurrentLayers() - if currentLayers == InvalidLayers { - return nil - } - - srRTP, srNTP := d.receiver.GetSenderReportTime(currentLayers.spatial) - if srRTP == 0 { - return nil - } - - now := time.Now() - nowNTP := buffer.ToNtpTime(now) - - diff := (uint64(now.Sub(srNTP.Time())) * uint64(d.codec.ClockRate)) / uint64(time.Second) - octets, packets := d.getSRStats() - - return &rtcp.SenderReport{ - SSRC: d.ssrc, - NTPTime: uint64(nowNTP), - RTPTime: srRTP + uint32(diff), - PacketCount: packets, - OctetCount: uint32(octets), - } -} - -func (d *DownTrack) updatePrimaryStats(packetLen int, marker bool) { - d.statsLock.Lock() - defer d.statsLock.Unlock() - - d.stats.TotalPrimaryPackets++ - d.stats.TotalPrimaryBytes += uint64(packetLen) - if marker { - d.stats.TotalFrames++ - } -} - -func (d *DownTrack) updateRtxStats(packetLen int) { - d.statsLock.Lock() - defer d.statsLock.Unlock() - - d.stats.TotalRetransmitPackets++ - d.stats.TotalRetransmitBytes += uint64(packetLen) -} - -func (d *DownTrack) updatePaddingStats(packetLen int) { - d.statsLock.Lock() - defer d.statsLock.Unlock() - - d.stats.TotalPaddingPackets++ - d.stats.TotalPaddingBytes += uint64(packetLen) + return d.rtpStats.GetRtcpSenderReport(d.ssrc) } func (d *DownTrack) writeBlankFrameRTP() error { // don't send if nothing has been sent - d.statsLock.RLock() - if d.stats.TotalPrimaryPackets == 0 { - d.statsLock.RUnlock() + if !d.rtpStats.IsActive() { return nil } - d.statsLock.RUnlock() // LK-TODO: Support other video codecs if d.kind == webrtc.RTPCodecTypeAudio || (d.mime != "video/vp8" && d.mime != "video/h264") { @@ -901,8 +861,6 @@ func (d *DownTrack) writeBlankFrameRTP() error { f(d, pktSize) } - d.updatePrimaryStats(pktSize, hdr.Marker) - // only the first frame will need frameEndNeeded to close out the // previous picture, rest are small key frames frameEndNeeded = false @@ -928,6 +886,9 @@ func (d *DownTrack) writeVP8BlankFrame(hdr *rtp.Header, frameEndNeeded bool) (in copy(payload[blankVP8.HeaderSize:], VP8KeyFrame8x8) _, err = d.writeStream.WriteRTP(hdr, payload) + if err == nil { + d.rtpStats.Update(hdr, len(payload), 0, time.Now().UnixNano()) + } return hdr.MarshalSize() + len(payload), err } @@ -945,7 +906,11 @@ func (d *DownTrack) writeH264BlankFrame(hdr *rtp.Header, frameEndNeeded bool) (i copy(buf[offset:offset+len(payload)], payload) offset += len(payload) } - _, err := d.writeStream.WriteRTP(hdr, buf[:offset]) + payload := buf[:offset] + _, err := d.writeStream.WriteRTP(hdr, payload) + if err == nil { + d.rtpStats.Update(hdr, len(payload), 0, time.Now().UnixNano()) + } return hdr.MarshalSize() + offset, err } @@ -961,9 +926,9 @@ func (d *DownTrack) handleRTCP(bytes []byte) { if pliOnce { targetLayers := d.forwarder.TargetLayers() if targetLayers != InvalidLayers { - d.lastPli.Store(time.Now()) d.receiver.SendPLI(targetLayers.spatial) d.isNACKThrottled.Store(true) + d.rtpStats.UpdatePliTime() pliOnce = false } } @@ -1003,20 +968,15 @@ func (d *DownTrack) handleRTCP(bytes []byte) { } rr.Reports = append(rr.Reports, r) - d.statsLock.Lock() - d.stats.TotalPacketsLost = r.TotalLost + d.rtpStats.UpdatePacketsLost(r.TotalLost) rtt := getRttMs(&r) - if rtt != d.stats.RTT { + if rtt != d.rtpStats.GetRtt() { rttToReport = rtt } - d.stats.RTT = rtt + d.rtpStats.UpdateRtt(rtt) - d.stats.Jitter = float64(r.Jitter) - d.stats.LostRate = float32(r.FractionLost) / 256 - d.statsLock.Unlock() - - d.connectionStats.UpdateWindow(r.SSRC, r.LastSequenceNumber, r.TotalLost, rtt, r.Jitter) + d.rtpStats.UpdateJitter(float64(r.Jitter)) } if len(rr.Reports) > 0 { d.listenerLock.RLock() @@ -1044,11 +1004,9 @@ func (d *DownTrack) handleRTCP(bytes []byte) { } } - d.statsLock.Lock() - d.stats.TotalNACKs += numNACKs - d.stats.TotalPLIs += numPLIs - d.stats.TotalFIRs += numFIRs - d.statsLock.Unlock() + d.rtpStats.UpdateNack(numNACKs) + d.rtpStats.UpdatePli(numPLIs) + d.rtpStats.UpdateFir(numFIRs) if rttToReport != 0 { if d.sequencer != nil { @@ -1089,6 +1047,7 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) { defer PacketFactory.Put(src) numRepeatedNACKs := uint32(0) + nackMisses := uint32(0) for _, meta := range d.sequencer.getPacketsMeta(filtered) { if meta.layer == int8(InvalidLayerSpatial) { if meta.nacked > 1 { @@ -1115,9 +1074,11 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) { pktBuff := *src n, err := d.receiver.ReadRTP(pktBuff, uint8(meta.layer), meta.sourceSeqNo) if err != nil { + d.rtpStats.UpdateNackMiss(1) if err == io.EOF { break } + nackMisses++ continue } var pkt rtp.Packet @@ -1164,23 +1125,15 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) { f(d, pktSize) } - d.updateRtxStats(pktSize) + d.rtpStats.Update(&pkt.Header, len(payload), 0, time.Now().UnixNano()) } } d.statsLock.Lock() d.totalRepeatedNACKs += numRepeatedNACKs d.statsLock.Unlock() -} -func (d *DownTrack) getSRStats() (uint64, uint32) { - d.statsLock.RLock() - defer d.statsLock.RUnlock() - - packets := d.stats.TotalPrimaryPackets + d.stats.TotalRetransmitPackets + d.stats.TotalPaddingPackets - octets := d.stats.TotalPrimaryBytes + d.stats.TotalRetransmitBytes + d.stats.TotalPaddingBytes - - return octets, packets + d.rtpStats.UpdateNackMiss(nackMisses) } // writes RTP header extensions of track @@ -1251,8 +1204,7 @@ func (d *DownTrack) DebugInfo() map[string]interface{} { "LastTS": rtpMungerParams.lastTS, "TSOffset": rtpMungerParams.tsOffset, "LastMarker": rtpMungerParams.lastMarker, - "LastRTP": d.lastRTP.Load(), - "LastPli": d.lastPli.Load(), + "LastPli": d.rtpStats.LastPli(), "PacketsDropped": d.pktsDropped.Load(), } @@ -1281,39 +1233,53 @@ func (d *DownTrack) GetConnectionScore() float32 { } func (d *DownTrack) GetTrackStats() map[uint32]*buffer.StreamStatsWithLayers { - d.statsLock.RLock() - defer d.statsLock.RUnlock() + streamStats := make(map[uint32]*buffer.StreamStatsWithLayers, 1) - stats := make(map[uint32]*buffer.StreamStatsWithLayers, 1) + stats := d.rtpStats.ToProto() layers := make(map[int]buffer.LayerStats) layers[0] = buffer.LayerStats{ - TotalPackets: d.stats.TotalPrimaryPackets + d.stats.TotalRetransmitPackets + d.stats.TotalPaddingPackets, - TotalBytes: d.stats.TotalPrimaryBytes + d.stats.TotalRetransmitBytes + d.stats.TotalPaddingBytes, - TotalFrames: d.stats.TotalFrames, + TotalPackets: stats.Packets + stats.PacketsDuplicate + stats.PacketsPadding, + TotalBytes: stats.Bytes + stats.BytesDuplicate + stats.BytesPadding, + TotalFrames: stats.Frames, } - stats[d.ssrc] = &buffer.StreamStatsWithLayers{ - StreamStats: d.stats, - Layers: layers, + streamStats[d.ssrc] = &buffer.StreamStatsWithLayers{ + RTPStats: stats, + Layers: layers, } - return stats + return streamStats +} + +func (d *DownTrack) getQualityParams() *buffer.ConnectionQualityParams { + s := d.rtpStats.SnapshotInfo(d.connectionQualitySnapshotId) + if s == nil { + return nil + } + + lossPercentage := float32(0.0) + if s.PacketsExpected != 0 { + lossPercentage = float32(s.PacketsLost) * 100.0 / float32(s.PacketsExpected) + } + + return &buffer.ConnectionQualityParams{ + LossPercentage: lossPercentage, + Jitter: float32(s.MaxJitter / 1000.0), + Rtt: s.MaxRtt, + } } func (d *DownTrack) GetNackStats() (totalPackets uint32, totalRepeatedNACKs uint32) { - d.statsLock.RLock() - defer d.statsLock.RUnlock() + totalPackets = d.rtpStats.GetTotalPacketsSansDuplicate() - totalPackets = d.stats.TotalPrimaryPackets + d.stats.TotalPaddingPackets + d.statsLock.RLock() totalRepeatedNACKs = d.totalRepeatedNACKs + d.statsLock.RUnlock() + return } -func (d *DownTrack) LastPLI() int64 { - t := d.lastPli.Load() - if t.IsZero() { - return 0 - } - return t.UnixNano() +func (d *DownTrack) LastPLI() time.Time { + return d.rtpStats.LastPli() } diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index e783307bd..97c3943de 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -35,7 +35,6 @@ type TrackReceiver interface { Codec() webrtc.RTPCodecCapability ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) - GetSenderReportTime(layer int32) (rtpTS uint32, ntpTS buffer.NtpTime) GetBitrateTemporalCumulative() Bitrates SendPLI(layer int32) @@ -168,9 +167,9 @@ func NewWebRTCReceiver( } w.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{ - CodecType: w.kind, - ClockRate: w.codec.ClockRate, - GetTrackStats: w.GetTrackStats, + CodecType: w.kind, + GetTrackStats: w.GetTrackStats, + GetQualityParams: w.getQualityParams, GetIsReducedQuality: func() bool { return w.streamTrackerManager.IsReducedQuality() }, @@ -385,15 +384,6 @@ func (w *WebRTCReceiver) sendRTCP(packets []rtcp.Packet) { default: w.logger.Warnw("sendRTCP failed, rtcp channel full", nil) } - - for _, p := range packets { - switch pkt := p.(type) { - case *rtcp.ReceiverReport: - for _, r := range pkt.Reports { - w.connectionStats.UpdateWindow(r.SSRC, r.LastSequenceNumber, r.TotalLost, w.rtt, r.Jitter) - } - } - } } func (w *WebRTCReceiver) SendPLI(layer int32) { @@ -407,12 +397,17 @@ func (w *WebRTCReceiver) SendPLI(layer int32) { buff.SendPLI() } -func (w *WebRTCReceiver) LastPLI() int64 { - var lastPLI int64 +func (w *WebRTCReceiver) LastPLI() time.Time { + var lastPLI time.Time w.bufferMu.RLock() for _, b := range w.buffers { - if b != nil && b.LastPLI() > lastPLI { - lastPLI = b.LastPLI() + if b == nil { + continue + } + + layerLastPLI := b.LastPLI() + if lastPLI.IsZero() || layerLastPLI.After(lastPLI) { + lastPLI = layerLastPLI } } w.bufferMu.RUnlock() @@ -423,15 +418,6 @@ func (w *WebRTCReceiver) SetRTCPCh(ch chan []rtcp.Packet) { w.rtcpCh = ch } -func (w *WebRTCReceiver) GetSenderReportTime(layer int32) (rtpTS uint32, ntpTS buffer.NtpTime) { - w.bufferMu.RLock() - defer w.bufferMu.RUnlock() - if w.buffers[layer] != nil { - rtpTS, ntpTS, _ = w.buffers[layer].GetSenderReportData() - } - return -} - func (w *WebRTCReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) { w.bufferMu.RLock() buff := w.buffers[layer] @@ -465,6 +451,46 @@ func (w *WebRTCReceiver) GetTrackStats() map[uint32]*buffer.StreamStatsWithLayer return stats } +func (w *WebRTCReceiver) getQualityParams() *buffer.ConnectionQualityParams { + w.bufferMu.RLock() + defer w.bufferMu.RUnlock() + + packetsExpected := uint32(0) + packetsLost := uint32(0) + maxJitter := float64(0.0) + maxRtt := uint32(0) + for _, buff := range w.buffers { + if buff == nil { + continue + } + + q := buff.GetQualityInfo() + if q == nil { + continue + } + + packetsExpected += q.PacketsExpected + packetsLost += q.PacketsLost + if q.MaxJitter > maxJitter { + maxJitter = q.MaxJitter + } + if q.MaxRtt > maxRtt { + maxRtt = q.MaxRtt + } + } + + lossPercentage := float32(0.0) + if packetsExpected != 0 { + lossPercentage = float32(packetsLost) * 100.0 / float32(packetsExpected) + } + + return &buffer.ConnectionQualityParams{ + LossPercentage: lossPercentage, + Jitter: float32(maxJitter / 1000.0), + Rtt: maxRtt, + } +} + func (w *WebRTCReceiver) forwardRTP(layer int32) { tracker := w.streamTrackerManager.GetTracker(layer)