Using RTPStats across the board (#515)

* WIP commit

* Clean up
This commit is contained in:
Raja Subramanian
2022-03-15 17:47:19 +05:30
committed by GitHub
parent 61ac44e5f7
commit ae85e55fd4
12 changed files with 692 additions and 579 deletions
+1 -1
View File
@@ -23,7 +23,7 @@ require (
github.com/pion/interceptor v0.1.7
github.com/pion/logging v0.2.2
github.com/pion/rtcp v1.2.9
github.com/pion/rtp v1.7.4
github.com/pion/rtp v1.7.7
github.com/pion/sdp/v3 v3.0.4
github.com/pion/stun v0.3.5
github.com/pion/transport v0.13.0
+2 -3
View File
@@ -134,8 +134,6 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA=
github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/protocol v0.11.14-0.20220311165704-a91198e400ac h1:86mRiCnqY/wEkKBUO+gYmon0WyCNt1ejVDmxBHteTV8=
github.com/livekit/protocol v0.11.14-0.20220311165704-a91198e400ac/go.mod h1:3pHsWUtQmWaH8mG0cXrQWpbf3Vo+kj0U+In77CEXu90=
github.com/livekit/protocol v0.11.14-0.20220314072422-5cea5d444f36 h1:D3lWDCvyMuHNVgP4cjscDdK8DBQQykIJFHnx3ragxhk=
github.com/livekit/protocol v0.11.14-0.20220314072422-5cea5d444f36/go.mod h1:3pHsWUtQmWaH8mG0cXrQWpbf3Vo+kj0U+In77CEXu90=
github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls=
@@ -191,8 +189,9 @@ github.com/pion/rtcp v1.2.6/go.mod h1:52rMNPWFsjr39z9B9MhnkqhPLoeHTv1aN63o/42bWE
github.com/pion/rtcp v1.2.9 h1:1ujStwg++IOLIEoOiIQ2s+qBuJ1VN81KW+9pMPsif+U=
github.com/pion/rtcp v1.2.9/go.mod h1:qVPhiCzAm4D/rxb6XzKeyZiQK69yJpbUDJSF7TgrqNo=
github.com/pion/rtp v1.7.0/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
github.com/pion/rtp v1.7.4 h1:4dMbjb1SuynU5OpA3kz1zHK+u+eOCQjW3MAeVHf1ODA=
github.com/pion/rtp v1.7.4/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
github.com/pion/rtp v1.7.7 h1:MzaAfCVicTVxiZpM2o99+YFrKxyRsQ38nnIi4vJPuUY=
github.com/pion/rtp v1.7.7/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
github.com/pion/sctp v1.8.0/go.mod h1:xFe9cLMZ5Vj6eOzpyiKjT9SwGM4KpK/8Jbw5//jc+0s=
github.com/pion/sctp v1.8.2 h1:yBBCIrUMJ4yFICL3RIvR4eh/H2BTTvlligmSTy+3kiA=
github.com/pion/sctp v1.8.2/go.mod h1:xFe9cLMZ5Vj6eOzpyiKjT9SwGM4KpK/8Jbw5//jc+0s=
+101 -167
View File
@@ -68,20 +68,13 @@ type Buffer struct {
lastPacketRead int
bitrate atomic.Value
bitrateHelper [4]int64
lastSRNTPTime NtpTime
lastSRRTPTime uint32
lastSRRecv int64 // Represents wall clock of the most recent sender report arrival
lastTransit uint32
pliThrottle int64
lastPli int64
started bool
stats StreamStats
rrSnapshot *receiverReportSnapshot
highestSN uint16
cycle uint16
rtpStats *RTPStats
rrSnapshotId uint32
rembSnapshotId uint32
connectionQualitySnapshotId uint32
lastFractionLostToReport uint8 // Last fraction lost from subscribers, should report to publisher; Audio only
@@ -97,12 +90,6 @@ type Buffer struct {
logger logger.Logger
}
type receiverReportSnapshot struct {
extHighestSeqNum uint32
packetsLost uint32
lastLossRate float32
}
// BufferOptions provides configuration options for the buffer
type Options struct {
MaxBitRate uint64
@@ -136,6 +123,13 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili
return
}
b.rtpStats = NewRTPStats(RTPStatsParams{
ClockRate: codec.ClockRate,
})
b.rrSnapshotId = b.rtpStats.NewSnapshotId()
b.rembSnapshotId = b.rtpStats.NewSnapshotId()
b.connectionQualitySnapshotId = b.rtpStats.NewSnapshotId()
b.callbacksQueue.Start()
b.clockRate = codec.ClockRate
@@ -269,7 +263,14 @@ func (b *Buffer) Close() error {
if b.bucket != nil && b.codecType == webrtc.RTPCodecTypeAudio {
b.audioPool.Put(b.bucket.src)
}
b.closed.Store(true)
if b.rtpStats != nil {
b.rtpStats.Stop()
b.logger.Debugw("rtp stats", "stats", b.rtpStats.ToString())
}
b.callbacksQueue.Enqueue(b.onClose)
b.callbacksQueue.Stop()
})
@@ -288,17 +289,14 @@ func (b *Buffer) SetPLIThrottle(duration int64) {
}
func (b *Buffer) SendPLI() {
now := time.Now().UnixNano()
b.Lock()
throttled := now-b.lastPli < b.pliThrottle
if throttled {
b.Unlock()
b.RLock()
if b.rtpStats == nil || b.rtpStats.TimeSinceLastPli() < b.pliThrottle {
b.RUnlock()
return
}
b.lastPli = now
b.stats.TotalPLIs++
b.Unlock()
b.rtpStats.UpdatePliAndTime(1)
b.RUnlock()
b.logger.Debugw("send pli", "ssrc", b.mediaSSRC)
pli := []rtcp.Packet{
@@ -314,15 +312,28 @@ func (b *Buffer) SetRTT(rtt uint32) {
b.Lock()
defer b.Unlock()
b.stats.RTT = rtt
if rtt == 0 {
return
}
if b.nacker != nil && rtt != 0 {
if b.nacker != nil {
b.nacker.SetRTT(rtt)
}
if b.rtpStats != nil {
b.rtpStats.UpdateRtt(rtt)
}
}
func (b *Buffer) LastPLI() int64 {
return b.lastPli
func (b *Buffer) LastPLI() time.Time {
b.RLock()
defer b.RUnlock()
if b.rtpStats == nil {
return time.Time{}
}
return b.rtpStats.LastPli()
}
func (b *Buffer) calc(pkt []byte, arrivalTime int64) {
@@ -349,7 +360,7 @@ func (b *Buffer) calc(pkt []byte, arrivalTime int64) {
return
}
b.updateStreamState(&p, len(pkt), arrivalTime, isRTX)
flowState := b.updateStreamState(&p, arrivalTime)
b.processHeaderExtensions(&p, arrivalTime)
@@ -364,7 +375,7 @@ func (b *Buffer) calc(pkt []byte, arrivalTime int64) {
return
}
ep, temporalLayer := b.getExtPacket(pb, &p, arrivalTime)
ep, temporalLayer := b.getExtPacket(pb, &p, arrivalTime, flowState.IsHighestSN)
if ep == nil {
return
}
@@ -379,77 +390,20 @@ func (b *Buffer) calc(pkt []byte, arrivalTime int64) {
b.doReports(arrivalTime)
}
func (b *Buffer) updateStreamState(p *rtp.Packet, pktSize int, arrivalTime int64, isRTX bool) {
sn := p.SequenceNumber
func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime int64) RTPFlowState {
flowState := b.rtpStats.Update(&p.Header, len(p.Payload), int(p.PaddingSize), arrivalTime)
if !b.started {
b.started = true
b.highestSN = sn
if b.nacker != nil {
b.nacker.Remove(p.SequenceNumber)
b.lastReport = arrivalTime
b.rrSnapshot = &receiverReportSnapshot{
extHighestSeqNum: uint32(sn) - 1,
packetsLost: 0,
lastLossRate: 0.0,
}
} else {
diff := sn - b.highestSN
if diff > (1 << 15) {
if !isRTX && b.stats.TotalPacketsLost != 0 {
b.stats.TotalPacketsLost--
if flowState.HasLoss {
for lost := flowState.LossStartInclusive; lost != flowState.LossEndExclusive; lost++ {
b.nacker.Push(lost)
}
// out-of-order, remove it from nack queue
if b.nacker != nil {
b.nacker.Remove(sn)
}
} else {
if diff > 1 {
b.stats.TotalPacketsLost += (uint32(diff) - 1)
if b.nacker != nil {
for lost := b.highestSN + 1; lost != sn; lost++ {
b.nacker.Push(lost)
}
}
}
if sn < b.highestSN {
b.cycle++
}
b.highestSN = sn
}
}
switch {
case isRTX:
b.stats.TotalRetransmitPackets++
b.stats.TotalRetransmitBytes += uint64(pktSize)
case len(p.Payload) == 0:
b.stats.TotalPaddingPackets++
b.stats.TotalPaddingBytes += uint64(pktSize)
default:
b.stats.TotalPrimaryPackets++
b.stats.TotalPrimaryBytes += uint64(pktSize)
if p.Marker {
b.stats.TotalFrames++
}
}
if !isRTX {
// jitter
arrival := uint32(arrivalTime / 1e6 * int64(b.clockRate/1e3))
transit := arrival - p.Timestamp
if b.lastTransit != 0 {
d := int32(transit - b.lastTransit)
if d < 0 {
d = -d
}
b.stats.Jitter += (float64(d) - b.stats.Jitter) / 16
}
b.lastTransit = transit
}
return flowState
}
func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime int64) {
@@ -488,9 +442,9 @@ func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime int64) {
}
}
func (b *Buffer) getExtPacket(rawPacket []byte, rtpPacket *rtp.Packet, arrivalTime int64) (*ExtPacket, int32) {
func (b *Buffer) getExtPacket(rawPacket []byte, rtpPacket *rtp.Packet, arrivalTime int64, isHighestSN bool) (*ExtPacket, int32) {
ep := &ExtPacket{
Head: rtpPacket.SequenceNumber == b.highestSN,
Head: isHighestSN,
Packet: rtpPacket,
Arrival: arrivalTime,
RawPacket: rawPacket,
@@ -531,7 +485,9 @@ func (b *Buffer) doNACKs() {
b.callbacksQueue.Enqueue(func() {
b.feedbackCB(r)
})
b.stats.TotalNACKs += uint32(numSeqNumsNacked)
if b.rtpStats != nil {
b.rtpStats.UpdateNack(uint32(numSeqNumsNacked))
}
}
}
@@ -585,17 +541,24 @@ func (b *Buffer) buildNACKPacket() ([]rtcp.Packet, int) {
}
func (b *Buffer) buildREMBPacket() *rtcp.ReceiverEstimatedMaximumBitrate {
br := b.Bitrate()
lostRate := float32(0)
if b.rrSnapshot != nil {
lostRate = b.rrSnapshot.lastLossRate
if b.rtpStats == nil {
return nil
}
br := b.Bitrate()
s := b.rtpStats.SnapshotInfo(b.rembSnapshotId)
if s == nil {
return nil
}
lostRate := float32(0.0)
if s.PacketsExpected != 0 {
lostRate = float32(s.PacketsLost) / float32(s.PacketsExpected)
}
if lostRate < 0.02 {
br = int64(float64(br)*1.09) + 2000
}
if lostRate > .1 {
if lostRate > 0.1 {
br = int64(float64(br) * float64(1-0.5*lostRate))
}
if br > b.maxBitrate {
@@ -612,60 +575,22 @@ func (b *Buffer) buildREMBPacket() *rtcp.ReceiverEstimatedMaximumBitrate {
}
func (b *Buffer) buildReceptionReport() *rtcp.ReceptionReport {
if b.rrSnapshot == nil {
if b.rtpStats == nil {
return nil
}
extHighestSeqNum := (uint32(b.cycle) << 16) | uint32(b.highestSN)
expectedInInterval := extHighestSeqNum - b.rrSnapshot.extHighestSeqNum
if expectedInInterval == 0 {
return nil
}
lostInInterval := b.stats.TotalPacketsLost - b.rrSnapshot.packetsLost
if int32(lostInInterval) < 0 {
// could happen if retransmitted packets arrive and make received greater than expected
lostInInterval = 0
}
lossRate := float32(lostInInterval) / float32(expectedInInterval)
fracLost := uint8(lossRate * 256.0)
if b.lastFractionLostToReport > fracLost {
// max of fraction lost from all subscribers is bigger than sfu received, use it.
fracLost = b.lastFractionLostToReport
}
var dlsr uint32
if b.lastSRRecv != 0 {
delayMS := uint32((time.Now().UnixNano() - b.lastSRRecv) / 1e6)
dlsr = (delayMS / 1e3) << 16
dlsr |= (delayMS % 1e3) * 65536 / 1000
}
b.rrSnapshot = &receiverReportSnapshot{
extHighestSeqNum: extHighestSeqNum,
packetsLost: b.stats.TotalPacketsLost,
lastLossRate: lossRate,
}
b.stats.LostRate = lossRate
return &rtcp.ReceptionReport{
SSRC: b.mediaSSRC,
FractionLost: fracLost,
TotalLost: b.stats.TotalPacketsLost,
LastSequenceNumber: extHighestSeqNum,
Jitter: uint32(b.stats.Jitter),
LastSenderReport: uint32(b.lastSRNTPTime >> 16),
Delay: dlsr,
}
return b.rtpStats.SnapshotRtcpReceptionReport(b.mediaSSRC, b.lastFractionLostToReport, b.rrSnapshotId)
}
func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) {
b.Lock()
b.lastSRRTPTime = rtpTime
b.lastSRNTPTime = NtpTime(ntpTime)
b.lastSRRecv = time.Now().UnixNano()
b.Unlock()
b.RLock()
defer b.RUnlock()
if b.rtpStats == nil {
return
}
b.rtpStats.SetRtcpSenderReportData(rtpTime, NtpTime(ntpTime), time.Now())
}
func (b *Buffer) SetLastFractionLostReport(lost uint8) {
@@ -754,27 +679,36 @@ func (b *Buffer) GetClockRate() uint32 {
return b.clockRate
}
// GetSenderReportData returns the rtp, ntp and nanos of the last sender report
func (b *Buffer) GetSenderReportData() (rtpTime uint32, ntpTime NtpTime, lastReceivedTimeInNanosSinceEpoch int64) {
b.RLock()
defer b.RUnlock()
return b.lastSRRTPTime, b.lastSRNTPTime, b.lastSRRecv
}
func (b *Buffer) GetStats() *StreamStatsWithLayers {
b.RLock()
defer b.RUnlock()
if b.rtpStats == nil {
return nil
}
stats := b.rtpStats.ToProto()
layers := make(map[int]LayerStats)
layers[0] = LayerStats{
TotalPackets: b.stats.TotalPrimaryPackets + b.stats.TotalRetransmitPackets + b.stats.TotalPaddingPackets,
TotalBytes: b.stats.TotalPrimaryBytes + b.stats.TotalRetransmitBytes + b.stats.TotalPaddingBytes,
TotalFrames: b.stats.TotalFrames,
TotalPackets: stats.Packets + stats.PacketsDuplicate + stats.PacketsPadding,
TotalBytes: stats.Bytes + stats.BytesDuplicate + stats.BytesPadding,
TotalFrames: stats.Frames,
}
return &StreamStatsWithLayers{
StreamStats: b.stats,
Layers: layers,
RTPStats: stats,
Layers: layers,
}
}
func (b *Buffer) GetQualityInfo() *RTPSnapshotInfo {
b.RLock()
defer b.RUnlock()
if b.rtpStats == nil {
return nil
}
return b.rtpStats.SnapshotInfo(b.connectionQualitySnapshotId)
}
+2 -4
View File
@@ -162,7 +162,6 @@ func TestNewBuffer(t *testing.T) {
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
var TestPackets = []*rtp.Packet{
{
@@ -195,7 +194,6 @@ func TestNewBuffer(t *testing.T) {
buff := NewBuffer(123, pool, pool)
buff.codecType = webrtc.RTPCodecTypeVideo
require.NotNil(t, buff)
require.NotNil(t, TestPackets)
buff.OnFeedback(func(_ []rtcp.Packet) {
})
buff.Bind(webrtc.RTPParameters{
@@ -207,8 +205,8 @@ func TestNewBuffer(t *testing.T) {
buf, _ := p.Marshal()
_, _ = buff.Write(buf)
}
require.Equal(t, uint16(1), buff.cycle)
require.Equal(t, uint16(2), buff.highestSN)
require.Equal(t, uint16(1), buff.rtpStats.cycles)
require.Equal(t, uint16(2), buff.rtpStats.highestSN)
})
}
}
@@ -1,4 +1,4 @@
package rtc
package buffer
import (
"sync"
@@ -1,4 +1,4 @@
package rtc
package buffer
import (
"testing"
+374 -104
View File
@@ -1,4 +1,4 @@
package rtc
package buffer
import (
"fmt"
@@ -6,7 +6,6 @@ import (
"sync"
"time"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/pion/rtcp"
@@ -25,6 +24,28 @@ func getPos(sn uint16) (uint16, uint16) {
return sn >> 6, sn & 0x3f
}
type RTPFlowState struct {
IsHighestSN bool
HasLoss bool
LossStartInclusive uint16
LossEndExclusive uint16
}
type RTPSnapshotInfo struct {
PacketsExpected uint32
PacketsLost uint32
MaxJitter float64
MaxRtt uint32
}
type Snapshot struct {
extStartSN uint32
maxJitter float64
isJitterOverridden bool
maxJitterOverridden float64
maxRtt uint32
}
type RTPStatsParams struct {
ClockRate uint32
}
@@ -48,17 +69,25 @@ type RTPStats struct {
lastTransit uint32
bytes uint64
bytesDuplicate uint64
bytesPadding uint64
packetsDuplicate uint32
packetsPadding uint32
packetsOutOfOrder uint32
packetsLost uint32
frames uint32
bytes uint64
bytesDuplicate uint64
bytesPadding uint64
packetsDuplicate uint32
packetsPadding uint32
jitter float64
maxJitter float64
packetsOutOfOrder uint32
packetsLost uint32
isPacketsLostOverridden bool
packetsLostOverridden uint32
frames uint32
jitter float64
maxJitter float64
isJitterOverridden bool
jitterOverridden float64
maxJitterOverridden float64
seenSNs [NumSequenceNumbers / 64]uint64
gapHistogram [GapHistogramNumBins]uint32
@@ -76,13 +105,18 @@ type RTPStats struct {
maxRtt uint32
rtpSR uint32
ntpSR buffer.NtpTime
ntpSR NtpTime
arrivalSR int64
nextSnapshotId uint32
snapshots map[uint32]*Snapshot
}
func NewRTPStats(params RTPStatsParams) *RTPStats {
return &RTPStats{
params: params,
params: params,
nextSnapshotId: 1,
snapshots: make(map[uint32]*Snapshot),
}
}
@@ -93,10 +127,23 @@ func (r *RTPStats) Stop() {
r.endTime = time.Now()
}
func (r *RTPStats) Update(rtp *rtp.Packet, packetTime int64) (isHighestSN bool, hasLoss bool, lossStartInclusive uint16, lossEndExclusive uint16) {
// RAJA-TODO-START
// 1. Padding packet stats
// RAJA-TODO-END
func (r *RTPStats) NewSnapshotId() uint32 {
r.lock.Lock()
defer r.lock.Unlock()
id := r.nextSnapshotId
r.nextSnapshotId++
return id
}
func (r *RTPStats) IsActive() bool {
r.lock.RLock()
defer r.lock.RUnlock()
return r.initialized && r.endTime.IsZero()
}
func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, packetTime int64) (flowState RTPFlowState) {
r.lock.Lock()
defer r.lock.Unlock()
@@ -109,20 +156,24 @@ func (r *RTPStats) Update(rtp *rtp.Packet, packetTime int64) (isHighestSN bool,
r.startTime = time.Now()
r.highestSN = rtp.SequenceNumber - 1
r.highestTS = rtp.Timestamp
r.highestSN = rtph.SequenceNumber - 1
r.highestTS = rtph.Timestamp
r.highestTime = packetTime
r.extStartSN = uint32(rtp.SequenceNumber)
r.extStartSN = uint32(rtph.SequenceNumber)
r.cycles = 0
}
pktSize := uint64(rtph.MarshalSize() + payloadSize + paddingSize)
if payloadSize == 0 {
r.packetsPadding++
r.bytesPadding += pktSize
} else {
r.bytes += pktSize
}
isDuplicate := false
pktSize := uint64(rtp.MarshalSize())
r.bytes += pktSize
diff := rtp.SequenceNumber - r.highestSN
diff := rtph.SequenceNumber - r.highestSN
switch {
// duplicate or out-of-order
case diff == 0 || diff > (1<<15):
@@ -131,13 +182,13 @@ func (r *RTPStats) Update(rtp *rtp.Packet, packetTime int64) (isHighestSN bool,
}
// adjust start to account for out-of-order packets before o cycle completes
if !r.isCycleCompleted() && (rtp.SequenceNumber-uint16(r.extStartSN) > (1 << 15)) {
if !r.isCycleCompleted() && (rtph.SequenceNumber-uint16(r.extStartSN) > (1 << 15)) {
// NOTE: current sequence number is counted as loss as it will be deducted in the duplicate check below
r.packetsLost += uint32(uint16(r.extStartSN) - rtp.SequenceNumber)
r.extStartSN = uint32(rtp.SequenceNumber)
r.packetsLost += uint32(uint16(r.extStartSN) - rtph.SequenceNumber)
r.extStartSN = uint32(rtph.SequenceNumber)
}
if r.isSeenSN(rtp.SequenceNumber) {
if r.isSeenSN(rtph.SequenceNumber) {
r.bytesDuplicate += pktSize
r.packetsDuplicate++
isDuplicate = true
@@ -147,57 +198,73 @@ func (r *RTPStats) Update(rtp *rtp.Packet, packetTime int64) (isHighestSN bool,
// in-order
default:
isHighestSN = true
flowState.IsHighestSN = true
if diff > 1 {
hasLoss = true
lossStartInclusive = r.highestSN + 1
lossEndExclusive = rtp.SequenceNumber
flowState.HasLoss = true
flowState.LossStartInclusive = r.highestSN + 1
flowState.LossEndExclusive = rtph.SequenceNumber
}
// update gap histogram
r.updateGapHistogram(int(diff))
// update missing sequence numbers
for lost := r.highestSN + 1; lost != rtp.SequenceNumber; lost++ {
for lost := r.highestSN + 1; lost != rtph.SequenceNumber; lost++ {
r.clearSeenSN(lost)
}
r.packetsLost += uint32(diff - 1)
if rtp.SequenceNumber < r.highestSN {
if rtph.SequenceNumber < r.highestSN {
r.cycles++
}
r.highestSN = rtp.SequenceNumber
r.highestTS = rtp.Timestamp
r.highestSN = rtph.SequenceNumber
r.highestTS = rtph.Timestamp
r.highestTime = packetTime
if rtp.Marker {
if rtph.Marker {
r.frames++
}
}
// set current sequence number in seen list
r.setSeenSN(rtp.SequenceNumber)
r.setSeenSN(rtph.SequenceNumber)
if !isDuplicate {
r.updateJitter(rtp, packetTime)
r.updateJitter(rtph, packetTime)
}
return
}
func (r *RTPStats) UpdateNack(nackCount int, nackMissCount int) {
r.lock.Lock()
defer r.lock.Unlock()
func (r *RTPStats) GetTotalPackets() uint32 {
r.lock.RLock()
defer r.lock.RUnlock()
if !r.endTime.IsZero() {
return
}
r.nacks += uint32(nackCount)
r.nackMisses += uint32(nackMissCount)
return r.getNumPacketsSeen() + r.packetsDuplicate + r.packetsPadding
}
func (r *RTPStats) UpdatePli() {
func (r *RTPStats) GetTotalPacketsSansDuplicate() uint32 {
r.lock.RLock()
defer r.lock.RUnlock()
return r.getNumPacketsSeen() + r.packetsPadding
}
func (r *RTPStats) GetTotalBytes() uint64 {
r.lock.RLock()
defer r.lock.RUnlock()
return r.bytes + r.bytesDuplicate + r.bytesPadding
}
func (r *RTPStats) GetTotalBytesSansDuplicate() uint64 {
r.lock.RLock()
defer r.lock.RUnlock()
return r.bytes + r.bytesPadding
}
func (r *RTPStats) UpdatePacketsLost(packetsLost uint32) {
r.lock.Lock()
defer r.lock.Unlock()
@@ -205,10 +272,123 @@ func (r *RTPStats) UpdatePli() {
return
}
r.plis++
r.isPacketsLostOverridden = true
r.packetsLostOverridden = packetsLost
}
func (r *RTPStats) UpdateJitter(jitter float64) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
return
}
r.isJitterOverridden = true
r.jitterOverridden = jitter
if jitter > r.maxJitterOverridden {
r.maxJitterOverridden = jitter
}
for _, s := range r.snapshots {
s.isJitterOverridden = true
if jitter > s.maxJitterOverridden {
s.maxJitterOverridden = jitter
}
}
}
func (r *RTPStats) UpdateNackAndMiss(nackCount uint32, nackMissCount uint32) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
return
}
r.updateNackLocked(nackCount)
r.updateNackMissLocked(nackMissCount)
}
func (r *RTPStats) UpdateNack(nackCount uint32) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
return
}
r.updateNackLocked(nackCount)
}
func (r *RTPStats) updateNackLocked(nackCount uint32) {
r.nacks += nackCount
}
func (r *RTPStats) UpdateNackMiss(nackMissCount uint32) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
return
}
r.updateNackMissLocked(nackMissCount)
}
func (r *RTPStats) updateNackMissLocked(nackMissCount uint32) {
r.nackMisses += nackMissCount
}
func (r *RTPStats) UpdatePliAndTime(pliCount uint32) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
return
}
r.updatePliLocked(pliCount)
r.updatePliTimeLocked()
}
func (r *RTPStats) UpdatePli(pliCount uint32) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
return
}
r.updatePliLocked(pliCount)
}
func (r *RTPStats) updatePliLocked(pliCount uint32) {
r.plis += pliCount
}
func (r *RTPStats) UpdatePliTime() {
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
return
}
r.updatePliTimeLocked()
}
func (r *RTPStats) updatePliTimeLocked() {
r.lastPli = time.Now()
}
func (r *RTPStats) LastPli() time.Time {
r.lock.RLock()
defer r.lock.RUnlock()
return r.lastPli
}
func (r *RTPStats) TimeSinceLastPli() int64 {
r.lock.RLock()
defer r.lock.RUnlock()
@@ -216,7 +396,18 @@ func (r *RTPStats) TimeSinceLastPli() int64 {
return time.Now().UnixNano() - r.lastPli.UnixNano()
}
func (r *RTPStats) UpdateFir() {
func (r *RTPStats) UpdateFir(firCount uint32) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
return
}
r.firs += firCount
}
func (r *RTPStats) UpdateFirTime() {
r.lock.Lock()
defer r.lock.Unlock()
@@ -224,7 +415,6 @@ func (r *RTPStats) UpdateFir() {
return
}
r.firs++
r.lastFir = time.Now()
}
@@ -240,9 +430,22 @@ func (r *RTPStats) UpdateRtt(rtt uint32) {
if r.rtt > r.maxRtt {
r.maxRtt = r.rtt
}
for _, s := range r.snapshots {
if rtt > s.maxRtt {
s.maxRtt = rtt
}
}
}
func (r *RTPStats) SetRtcpSenderReportData(rtpTS uint32, ntpTS buffer.NtpTime, arrival time.Time) {
func (r *RTPStats) GetRtt() uint32 {
r.lock.RLock()
defer r.lock.RUnlock()
return r.rtt
}
func (r *RTPStats) SetRtcpSenderReportData(rtpTS uint32, ntpTS NtpTime, arrival time.Time) {
r.lock.Lock()
defer r.lock.Unlock()
@@ -251,29 +454,6 @@ func (r *RTPStats) SetRtcpSenderReportData(rtpTS uint32, ntpTS buffer.NtpTime, a
r.arrivalSR = arrival.UnixNano()
}
// RAJA-REMOVE-START
func (r *RTPStats) GetRtcpSenderReportData() (rtpTS uint32, ntpTS buffer.NtpTime) {
r.lock.RLock()
defer r.lock.RUnlock()
now := time.Now()
ntpTS = buffer.ToNtpTime(now)
ntpSR := r.highestTime
if r.ntpSR != 0 {
ntpSR = r.ntpSR.Time().UnixNano()
}
rtpTS = r.highestTS
if r.rtpSR != 0 {
rtpTS = r.rtpSR
}
rtpTS += uint32((now.UnixNano() - ntpSR) * int64(r.params.ClockRate) / 1e9)
return
}
// RAJA-REMOVE-END
func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport {
r.lock.RLock()
defer r.lock.RUnlock()
@@ -283,7 +463,7 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport {
}
now := time.Now()
nowNTP := buffer.ToNtpTime(now)
nowNTP := ToNtpTime(now)
nowRTP := r.highestTS + uint32((now.UnixNano()-r.highestTime)*int64(r.params.ClockRate)/1e9)
return &rtcp.SenderReport{
@@ -295,24 +475,24 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport {
}
}
func (r *RTPStats) GetRtcpReceptionReport(ssrc uint32, proxyFracLost uint8) *rtcp.ReceptionReport {
// RAJA-TODO-START
// 1. Have to use snapshot or from beginning
// 2. Set up next snapshot
// RAJA-TODO-END
r.lock.RLock()
defer r.lock.RUnlock()
func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8, snapshotId uint32) *rtcp.ReceptionReport {
r.lock.Lock()
snapshot := r.getAndResetSnapshot(snapshotId)
r.lock.Unlock()
if !r.initialized {
if snapshot == nil {
return nil
}
r.lock.RLock()
defer r.lock.RUnlock()
extHighestSN := r.getExtHighestSN()
packetsExpected := extHighestSN - r.extStartSN + 1
packetsExpected := extHighestSN - snapshot.extStartSN + 1
if packetsExpected > NumSequenceNumbers {
logger.Warnw(
"too many packets expected in receiver report",
fmt.Errorf("start: %d, end: %d, expected: %d", r.extStartSN, extHighestSN, packetsExpected),
fmt.Errorf("start: %d, end: %d, expected: %d", snapshot.extStartSN, extHighestSN, packetsExpected),
)
return nil
}
@@ -320,7 +500,7 @@ func (r *RTPStats) GetRtcpReceptionReport(ssrc uint32, proxyFracLost uint8) *rtc
return nil
}
packetsLost := r.numMissingSNs(uint16(r.extStartSN), uint16(extHighestSN))
packetsLost := r.numMissingSNs(uint16(snapshot.extStartSN), uint16(extHighestSN))
lossRate := float32(packetsLost) / float32(packetsExpected)
fracLost := uint8(lossRate * 256.0)
if proxyFracLost > fracLost {
@@ -334,17 +514,63 @@ func (r *RTPStats) GetRtcpReceptionReport(ssrc uint32, proxyFracLost uint8) *rtc
dlsr |= (delayMS % 1e3) * 65536 / 1000
}
jitter := r.jitter
if r.isJitterOverridden {
jitter = r.jitterOverridden
}
return &rtcp.ReceptionReport{
SSRC: ssrc,
FractionLost: fracLost,
TotalLost: r.packetsLost,
LastSequenceNumber: extHighestSN,
Jitter: uint32(r.jitter),
Jitter: uint32(jitter),
LastSenderReport: uint32(r.ntpSR >> 16),
Delay: dlsr,
}
}
func (r *RTPStats) SnapshotInfo(snapshotId uint32) *RTPSnapshotInfo {
r.lock.Lock()
snapshot := r.getAndResetSnapshot(snapshotId)
r.lock.Unlock()
if snapshot == nil {
return nil
}
r.lock.RLock()
defer r.lock.RUnlock()
extHighestSN := r.getExtHighestSN()
packetsExpected := extHighestSN - snapshot.extStartSN + 1
if packetsExpected > NumSequenceNumbers {
logger.Warnw(
"too many packets expected in loss percentage",
fmt.Errorf("start: %d, end: %d, expected: %d", snapshot.extStartSN, extHighestSN, packetsExpected),
)
return nil
}
if packetsExpected == 0 {
return nil
}
packetsLost := r.numMissingSNs(uint16(snapshot.extStartSN), uint16(extHighestSN))
maxJitter := snapshot.maxJitter
if snapshot.isJitterOverridden {
maxJitter = snapshot.maxJitterOverridden
}
maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6
return &RTPSnapshotInfo{
PacketsExpected: packetsExpected,
PacketsLost: packetsLost,
MaxJitter: maxJitterTime,
MaxRtt: snapshot.maxRtt,
}
}
func (r *RTPStats) ToString() string {
p := r.ToProto()
if p == nil {
@@ -370,7 +596,13 @@ func (r *RTPStats) ToString() string {
str += fmt.Sprintf(", o: %d", p.PacketsOutOfOrder)
str += fmt.Sprintf(", c: %d, j: %d(%.1fus)|%d(%.1fus)", r.params.ClockRate, uint32(r.jitter), p.JitterCurrent, uint32(r.maxJitter), p.JitterMax)
jitter := r.jitter
maxJitter := r.maxJitter
if r.isJitterOverridden {
jitter = r.jitterOverridden
maxJitter = r.maxJitterOverridden
}
str += fmt.Sprintf(", c: %d, j: %d(%.1fus)|%d(%.1fus)", r.params.ClockRate, uint32(jitter), p.JitterCurrent, uint32(maxJitter), p.JitterMax)
if len(p.GapHistogram) != 0 {
first := true
@@ -395,7 +627,7 @@ func (r *RTPStats) ToString() string {
str += fmt.Sprintf("%d|%+v", p.Firs, p.LastFir.AsTime().Format(time.UnixDate))
str += ", rtt(ms):"
str += fmt.Sprintf("%d|%+d", p.RttCurrent, p.RttMax)
str += fmt.Sprintf("%d|%d", p.RttCurrent, p.RttMax)
return str
}
@@ -428,8 +660,12 @@ func (r *RTPStats) ToProto() *livekit.RTPStats {
frameRate := float64(r.frames) / elapsed
packetLostRate := float64(r.packetsLost) / elapsed
packetLostPercentage := float32(r.packetsLost) / float32(packetsExpected) * 100.0
packetsLost := r.packetsLost
if r.isPacketsLostOverridden {
packetsLost = r.packetsLostOverridden
}
packetLostRate := float64(packetsLost) / elapsed
packetLostPercentage := float32(packetsLost) / float32(packetsExpected) * 100.0
packetDuplicateRate := float64(r.packetsDuplicate) / elapsed
bitrateDuplicate := float64(r.bytesDuplicate) * 8.0 / elapsed
@@ -437,8 +673,14 @@ func (r *RTPStats) ToProto() *livekit.RTPStats {
packetPaddingRate := float64(r.packetsPadding) / elapsed
bitratePadding := float64(r.bytesPadding) * 8.0 / elapsed
jitterTime := r.jitter / float64(r.params.ClockRate) * 1e6
maxJitterTime := r.maxJitter / float64(r.params.ClockRate) * 1e6
jitter := r.jitter
maxJitter := r.maxJitter
if r.isJitterOverridden {
jitter = r.jitterOverridden
maxJitter = r.maxJitterOverridden
}
jitterTime := jitter / float64(r.params.ClockRate) * 1e6
maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6
p := &livekit.RTPStats{
StartTime: timestamppb.New(r.startTime),
@@ -537,7 +779,8 @@ func (r *RTPStats) numMissingSNs(startInclusive uint16, endInclusive uint16) uin
seen := uint32(0)
idx := startIdx
for idx != endIdx+1 {
loopEnd := (endIdx + 1) % uint16(len(r.seenSNs))
for idx != loopEnd {
mask := uint64((1 << 64) - 1)
if idx == startIdx {
mask &^= uint64((1 << startRem) - 1)
@@ -548,18 +791,15 @@ func (r *RTPStats) numMissingSNs(startInclusive uint16, endInclusive uint16) uin
seen += uint32(bits.OnesCount64(r.seenSNs[idx] & mask))
idx++
if idx == uint16(len(r.seenSNs)) {
idx = 0
}
idx = (idx + 1) % uint16(len(r.seenSNs))
}
return uint32(endInclusive-startInclusive+1) - seen
}
func (r *RTPStats) updateJitter(rtp *rtp.Packet, packetTime int64) {
func (r *RTPStats) updateJitter(rtph *rtp.Header, packetTime int64) {
packetTimeRTP := uint32(packetTime / 1e6 * int64(r.params.ClockRate/1e3))
transit := packetTimeRTP - rtp.Timestamp
transit := packetTimeRTP - rtph.Timestamp
if r.lastTransit != 0 {
d := int32(transit - r.lastTransit)
@@ -570,6 +810,12 @@ func (r *RTPStats) updateJitter(rtp *rtp.Packet, packetTime int64) {
if r.jitter > r.maxJitter {
r.maxJitter = r.jitter
}
for _, s := range r.snapshots {
if r.jitter > s.maxJitter {
r.maxJitter = r.jitter
}
}
}
r.lastTransit = transit
@@ -588,6 +834,30 @@ func (r *RTPStats) updateGapHistogram(gap int) {
}
}
func (r *RTPStats) getAndResetSnapshot(snapshotId uint32) *Snapshot {
if !r.initialized {
return nil
}
snapshot := r.snapshots[snapshotId]
if snapshot == nil {
snapshot = &Snapshot{
extStartSN: r.extStartSN,
maxJitter: 0.0,
maxRtt: 0,
}
r.snapshots[snapshotId] = snapshot
}
toReturn := *snapshot
snapshot.extStartSN = r.getExtHighestSN() + 1
snapshot.maxJitter = 0.0
snapshot.maxRtt = 0
return &toReturn
}
// ----------------------------------
func AggregateRTPStats(statses []*livekit.RTPStats) *livekit.RTPStats {
@@ -1,4 +1,4 @@
package rtc
package buffer
import (
"fmt"
@@ -42,7 +42,8 @@ func TestRTPStats(t *testing.T) {
for now.Sub(startTime) < totalDuration {
timestamp += uint32(now.Sub(lastFrameTime).Seconds() * float64(clockRate))
for i := 0; i < packetsPerFrame; i++ {
r.Update(getPacket(sequenceNumber, timestamp, packetSize), time.Now().UnixNano())
packet := getPacket(sequenceNumber, timestamp, packetSize)
r.Update(&packet.Header, len(packet.Payload), 0, time.Now().UnixNano())
if (sequenceNumber % 100) == 0 {
jump := uint16(rand.Float64() * 120.0)
sequenceNumber += jump
@@ -68,9 +69,10 @@ func TestRTPStats_Update(t *testing.T) {
sequenceNumber := uint16(rand.Float64() * float64(1<<16))
timestamp := uint32(rand.Float64() * float64(1<<32))
isHighest, hasLoss, _, _ := r.Update(getPacket(sequenceNumber, timestamp, 1000), time.Now().UnixNano())
require.True(t, isHighest)
require.False(t, hasLoss)
packet := getPacket(sequenceNumber, timestamp, 1000)
flowState := r.Update(&packet.Header, len(packet.Payload), 0, time.Now().UnixNano())
require.True(t, flowState.IsHighestSN)
require.False(t, flowState.HasLoss)
require.True(t, r.initialized)
require.Equal(t, sequenceNumber, r.highestSN)
require.Equal(t, timestamp, r.highestTS)
@@ -78,25 +80,28 @@ func TestRTPStats_Update(t *testing.T) {
// in-order, no loss
sequenceNumber++
timestamp += 3000
isHighest, hasLoss, _, _ = r.Update(getPacket(sequenceNumber, timestamp, 1000), time.Now().UnixNano())
require.True(t, isHighest)
require.False(t, hasLoss)
packet = getPacket(sequenceNumber, timestamp, 1000)
flowState = r.Update(&packet.Header, len(packet.Payload), 0, time.Now().UnixNano())
require.True(t, flowState.IsHighestSN)
require.False(t, flowState.HasLoss)
require.Equal(t, sequenceNumber, r.highestSN)
require.Equal(t, timestamp, r.highestTS)
// out-of-order
isHighest, hasLoss, _, _ = r.Update(getPacket(sequenceNumber-10, timestamp-30000, 1000), time.Now().UnixNano())
require.False(t, isHighest)
require.False(t, hasLoss)
packet = getPacket(sequenceNumber-10, timestamp-30000, 1000)
flowState = r.Update(&packet.Header, len(packet.Payload), 0, time.Now().UnixNano())
require.False(t, flowState.IsHighestSN)
require.False(t, flowState.HasLoss)
require.Equal(t, sequenceNumber, r.highestSN)
require.Equal(t, timestamp, r.highestTS)
require.Equal(t, uint32(1), r.packetsOutOfOrder)
require.Equal(t, uint32(0), r.packetsDuplicate)
// duplicate
isHighest, hasLoss, _, _ = r.Update(getPacket(sequenceNumber-10, timestamp-30000, 1000), time.Now().UnixNano())
require.False(t, isHighest)
require.False(t, hasLoss)
packet = getPacket(sequenceNumber-10, timestamp-30000, 1000)
flowState = r.Update(&packet.Header, len(packet.Payload), 0, time.Now().UnixNano())
require.False(t, flowState.IsHighestSN)
require.False(t, flowState.HasLoss)
require.Equal(t, sequenceNumber, r.highestSN)
require.Equal(t, timestamp, r.highestTS)
require.Equal(t, uint32(2), r.packetsOutOfOrder)
@@ -105,17 +110,19 @@ func TestRTPStats_Update(t *testing.T) {
// loss
sequenceNumber += 10
timestamp += 30000
isHighest, hasLoss, lossStartInclusive, lossEndExclusive := r.Update(getPacket(sequenceNumber, timestamp, 1000), time.Now().UnixNano())
require.True(t, isHighest)
require.True(t, hasLoss)
require.Equal(t, lossStartInclusive, sequenceNumber-9)
require.Equal(t, lossEndExclusive, sequenceNumber)
packet = getPacket(sequenceNumber, timestamp, 1000)
flowState = r.Update(&packet.Header, len(packet.Payload), 0, time.Now().UnixNano())
require.True(t, flowState.IsHighestSN)
require.True(t, flowState.HasLoss)
require.Equal(t, sequenceNumber-9, flowState.LossStartInclusive)
require.Equal(t, sequenceNumber, flowState.LossEndExclusive)
require.Equal(t, uint32(17), r.packetsLost)
// out-of-order should decrement number of lost packets
isHighest, hasLoss, _, _ = r.Update(getPacket(sequenceNumber-15, timestamp-45000, 1000), time.Now().UnixNano())
require.False(t, isHighest)
require.False(t, hasLoss)
packet = getPacket(sequenceNumber-15, timestamp-45000, 1000)
flowState = r.Update(&packet.Header, len(packet.Payload), 0, time.Now().UnixNano())
require.False(t, flowState.IsHighestSN)
require.False(t, flowState.HasLoss)
require.Equal(t, sequenceNumber, r.highestSN)
require.Equal(t, timestamp, r.highestTS)
require.Equal(t, uint32(3), r.packetsOutOfOrder)
+9 -18
View File
@@ -1,29 +1,20 @@
package buffer
import "github.com/livekit/protocol/livekit"
type LayerStats struct {
TotalPackets uint32
TotalBytes uint64
TotalFrames uint32
}
type StreamStats struct {
TotalPrimaryPackets uint32
TotalPrimaryBytes uint64
TotalRetransmitPackets uint32
TotalRetransmitBytes uint64
TotalPaddingPackets uint32
TotalPaddingBytes uint64
TotalPacketsLost uint32
TotalFrames uint32
RTT uint32
Jitter float64
TotalNACKs uint32
TotalPLIs uint32
TotalFIRs uint32
LostRate float32
type StreamStatsWithLayers struct {
RTPStats *livekit.RTPStats
Layers map[int]LayerStats
}
type StreamStatsWithLayers struct {
StreamStats StreamStats
Layers map[int]LayerStats
type ConnectionQualityParams struct {
LossPercentage float32
Jitter float32
Rtt uint32
}
+26 -104
View File
@@ -15,20 +15,11 @@ const (
connectionQualityUpdateInterval = 5 * time.Second
)
type qualityWindow struct {
startSeqNum uint32
endSeqNum uint32
startPacketsLost uint32
endPacketsLost uint32
maxRTT uint32
maxJitter uint32
}
type ConnectionStatsParams struct {
UpdateInterval time.Duration
CodecType webrtc.RTPCodecType
ClockRate uint32
GetTrackStats func() map[uint32]*buffer.StreamStatsWithLayers
GetQualityParams func() *buffer.ConnectionQualityParams
GetIsReducedQuality func() bool
Logger logger.Logger
}
@@ -38,9 +29,8 @@ type ConnectionStats struct {
onStatsUpdate func(cs *ConnectionStats, stat *livekit.AnalyticsStat)
lock sync.RWMutex
score float32
qualityWindows map[uint32]*qualityWindow
lock sync.RWMutex
score float32
done chan struct{}
isClosed atomic.Bool
@@ -48,10 +38,9 @@ type ConnectionStats struct {
func NewConnectionStats(params ConnectionStatsParams) *ConnectionStats {
return &ConnectionStats{
params: params,
score: 4.0,
qualityWindows: make(map[uint32]*qualityWindow),
done: make(chan struct{}),
params: params,
score: 4.0,
done: make(chan struct{}),
}
}
@@ -78,80 +67,23 @@ func (cs *ConnectionStats) GetScore() float32 {
return cs.score
}
func (cs *ConnectionStats) UpdateWindow(ssrc uint32, extHighestSeqNum uint32, packetsLost uint32, rtt uint32, jitter uint32) {
if cs.isClosed.Load() {
return
}
cs.lock.Lock()
defer cs.lock.Unlock()
qw := cs.qualityWindows[ssrc]
if qw == nil {
qw = &qualityWindow{}
cs.qualityWindows[ssrc] = qw
}
if qw.startSeqNum == 0 {
qw.startSeqNum = extHighestSeqNum
qw.startPacketsLost = packetsLost
}
if extHighestSeqNum > qw.endSeqNum {
qw.endSeqNum = extHighestSeqNum
qw.endPacketsLost = packetsLost
}
if rtt > qw.maxRTT {
qw.maxRTT = rtt
}
if jitter > qw.maxJitter {
qw.maxJitter = jitter
}
}
func (cs *ConnectionStats) updateScore() float32 {
cs.lock.Lock()
defer cs.lock.Unlock()
expectedPacketsInInterval := uint32(0)
lostPacketsInInterval := uint32(0)
maxRTT := uint32(0)
maxJitter := uint32(0)
for _, qw := range cs.qualityWindows {
expectedPacketsInInterval += qw.endSeqNum - qw.startSeqNum + 1
lostPacketsInInterval += qw.endPacketsLost - qw.startPacketsLost
if qw.maxRTT > maxRTT {
maxRTT = qw.maxRTT
}
if qw.maxJitter > maxJitter {
maxJitter = qw.maxJitter
}
qw.startSeqNum = qw.endSeqNum
qw.startPacketsLost = qw.endPacketsLost
qw.maxRTT = 0
qw.maxJitter = 0
}
pctLoss := float32(0.0)
if int32(lostPacketsInInterval) < 0 {
lostPacketsInInterval = 0
}
if expectedPacketsInInterval > 0 {
pctLoss = (float32(lostPacketsInInterval) / float32(expectedPacketsInInterval)) * 100.0
s := cs.params.GetQualityParams()
if s == nil {
return cs.score
}
if cs.params.CodecType == webrtc.RTPCodecTypeAudio {
// covert jitter (in media samples units) to milliseconds
cs.score = AudioConnectionScore(pctLoss, maxRTT, float32(maxJitter)*1000.0/float32(cs.params.ClockRate))
cs.score = AudioConnectionScore(s.LossPercentage, s.Rtt, s.Jitter)
} else {
isReducedQuality := false
if cs.params.GetIsReducedQuality != nil {
isReducedQuality = cs.params.GetIsReducedQuality()
}
cs.score = VideoConnectionScore(pctLoss, isReducedQuality)
cs.score = VideoConnectionScore(s.LossPercentage, isReducedQuality)
}
return cs.score
@@ -169,15 +101,7 @@ func (cs *ConnectionStats) getStat() *livekit.AnalyticsStat {
analyticsStreams := make([]*livekit.AnalyticsStream, 0, len(streams))
for ssrc, stream := range streams {
maxRTT := stream.StreamStats.RTT
maxJitter := uint32(stream.StreamStats.Jitter)
if qw := cs.qualityWindows[ssrc]; qw != nil {
maxRTT = qw.maxRTT
maxJitter = qw.maxJitter
}
as := ToAnalyticsStream(ssrc, &stream.StreamStats, maxRTT, maxJitter, cs.params.ClockRate)
as := ToAnalyticsStream(ssrc, stream.RTPStats)
//
// add video layer if either
@@ -225,24 +149,22 @@ func (cs *ConnectionStats) updateStats() {
}
}
func ToAnalyticsStream(ssrc uint32, streamStats *buffer.StreamStats, maxRTT uint32, maxJitter uint32, clockRate uint32) *livekit.AnalyticsStream {
// convert jitter (from number of media samples to microseconds
jitter := uint32((float32(maxJitter) * 1e6) / float32(clockRate))
func ToAnalyticsStream(ssrc uint32, rtpStats *livekit.RTPStats) *livekit.AnalyticsStream {
return &livekit.AnalyticsStream{
Ssrc: ssrc,
TotalPrimaryPackets: streamStats.TotalPrimaryPackets,
TotalPrimaryBytes: streamStats.TotalPrimaryBytes,
TotalRetransmitPackets: streamStats.TotalRetransmitPackets,
TotalRetransmitBytes: streamStats.TotalRetransmitBytes,
TotalPaddingPackets: streamStats.TotalPaddingPackets,
TotalPaddingBytes: streamStats.TotalPaddingBytes,
TotalPacketsLost: streamStats.TotalPacketsLost,
TotalFrames: streamStats.TotalFrames,
Rtt: maxRTT,
Jitter: jitter,
TotalNacks: streamStats.TotalNACKs,
TotalPlis: streamStats.TotalPLIs,
TotalFirs: streamStats.TotalFIRs,
TotalPrimaryPackets: rtpStats.Packets,
TotalPrimaryBytes: rtpStats.Bytes,
TotalRetransmitPackets: rtpStats.PacketsDuplicate,
TotalRetransmitBytes: rtpStats.BytesDuplicate,
TotalPaddingPackets: rtpStats.PacketsPadding,
TotalPaddingBytes: rtpStats.BytesPadding,
TotalPacketsLost: rtpStats.PacketsLost,
TotalFrames: rtpStats.Frames,
Rtt: rtpStats.RttMax,
Jitter: uint32(rtpStats.JitterMax),
TotalNacks: rtpStats.Nacks,
TotalPlis: rtpStats.Plis,
TotalFirs: rtpStats.Firs,
}
}
+94 -128
View File
@@ -42,6 +42,8 @@ const (
firstKeyFramePLIInterval = 500 * time.Millisecond
FlagStopRTXOnPLI = true
bitrateReportDelta = 1
)
var (
@@ -98,19 +100,18 @@ type DownTrack struct {
listenerLock sync.RWMutex
closeOnce sync.Once
statsLock sync.RWMutex
stats buffer.StreamStats
rtpStats *buffer.RTPStats
totalRepeatedNACKs uint32
connectionStats *connectionquality.ConnectionStats
connectionStats *connectionquality.ConnectionStats
connectionQualitySnapshotId uint32
statsLock sync.RWMutex
bitrateHelper uint64
bitrate uint64
lastBitrateReport time.Time
// Debug info
lastPli atomic.Time
lastRTP atomic.Time
pktsDropped atomic.Uint32
isNACKThrottled atomic.Bool
@@ -183,9 +184,9 @@ func NewDownTrack(
}
d.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{
CodecType: kind,
ClockRate: c.ClockRate,
GetTrackStats: d.GetTrackStats,
CodecType: kind,
GetTrackStats: d.GetTrackStats,
GetQualityParams: d.getQualityParams,
GetIsReducedQuality: func() bool {
return d.GetForwardingStatus() != ForwardingStatusOptimal
},
@@ -199,6 +200,11 @@ func NewDownTrack(
}
})
d.rtpStats = buffer.NewRTPStats(buffer.RTPStatsParams{
ClockRate: d.codec.ClockRate,
})
d.connectionQualitySnapshotId = d.rtpStats.NewSnapshotId()
return d, nil
}
@@ -322,12 +328,11 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
return nil
}
d.lastRTP.Store(time.Now())
tp, err := d.forwarder.GetTranslationParams(extPkt, layer)
if tp.shouldSendPLI {
d.lastPli.Store(time.Now())
d.receiver.SendPLI(layer)
d.rtpStats.UpdatePli(1)
d.rtpStats.UpdatePliTime()
}
if tp.shouldDrop {
if tp.isDroppingRelevant {
@@ -378,10 +383,11 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
})
}
d.updatePrimaryStats(pktSize, hdr.Marker)
if extPkt.KeyFrame {
d.isNACKThrottled.Store(false)
}
d.rtpStats.Update(hdr, len(payload), 0, time.Now().UnixNano())
d.updateBitrate()
} else {
d.logger.Errorw("writing rtp packet err", err)
@@ -394,12 +400,9 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
// WritePaddingRTP tries to write as many padding only RTP packets as necessary
// to satisfy given size to the DownTrack
func (d *DownTrack) WritePaddingRTP(bytesToSend int) int {
d.statsLock.RLock()
if d.stats.TotalPrimaryPackets == 0 {
d.statsLock.RUnlock()
if !d.rtpStats.IsActive() {
return 0
}
d.statsLock.RUnlock()
// LK-TODO-START
// Ideally should look at header extensions negotiated for
@@ -469,10 +472,10 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int) int {
}
size := hdr.MarshalSize() + len(payload)
d.updatePaddingStats(size)
for _, f := range d.onPaddingSentUnsafe {
f(d, size)
}
d.rtpStats.Update(&hdr, 0, len(payload), time.Now().UnixNano())
d.updateBitrate()
//
@@ -491,22 +494,29 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int) int {
}
func (d *DownTrack) updateBitrate() {
lastRtp := d.lastRTP.Load()
now := time.Now()
d.statsLock.RLock()
timeDiff := lastRtp.Sub(d.lastBitrateReport).Seconds()
d.statsLock.RUnlock()
if timeDiff < 1 {
timeDiff := now.Sub(d.lastBitrateReport).Seconds()
if timeDiff < bitrateReportDelta {
d.statsLock.RUnlock()
return
}
octets, _ := d.getSRStats()
d.statsLock.RUnlock()
totalBytes := d.rtpStats.GetTotalBytes()
d.statsLock.Lock()
d.bitrate = uint64(float64(octets*8-d.bitrateHelper) / timeDiff)
d.bitrateHelper = octets * 8
d.lastBitrateReport = lastRtp
d.bitrate = uint64(float64(totalBytes*8-d.bitrateHelper) / timeDiff)
d.bitrateHelper = totalBytes * 8
d.lastBitrateReport = now
d.statsLock.Unlock()
}
func (d *DownTrack) Bitrate() uint64 {
d.statsLock.RLock()
defer d.statsLock.RUnlock()
return d.bitrate
}
@@ -568,6 +578,8 @@ func (d *DownTrack) CloseWithFlush(flush bool) {
d.receiver.DeleteDownTrack(d.peerID)
d.connectionStats.Close()
d.rtpStats.Stop()
d.logger.Debugw("rtp stats", "stats", d.rtpStats.ToString())
if d.onMaxLayerChanged != nil && d.kind == webrtc.RTPCodecTypeVideo {
d.callbacksQueue.Enqueue(func() {
@@ -793,66 +805,14 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport {
return nil
}
currentLayers := d.forwarder.CurrentLayers()
if currentLayers == InvalidLayers {
return nil
}
srRTP, srNTP := d.receiver.GetSenderReportTime(currentLayers.spatial)
if srRTP == 0 {
return nil
}
now := time.Now()
nowNTP := buffer.ToNtpTime(now)
diff := (uint64(now.Sub(srNTP.Time())) * uint64(d.codec.ClockRate)) / uint64(time.Second)
octets, packets := d.getSRStats()
return &rtcp.SenderReport{
SSRC: d.ssrc,
NTPTime: uint64(nowNTP),
RTPTime: srRTP + uint32(diff),
PacketCount: packets,
OctetCount: uint32(octets),
}
}
func (d *DownTrack) updatePrimaryStats(packetLen int, marker bool) {
d.statsLock.Lock()
defer d.statsLock.Unlock()
d.stats.TotalPrimaryPackets++
d.stats.TotalPrimaryBytes += uint64(packetLen)
if marker {
d.stats.TotalFrames++
}
}
func (d *DownTrack) updateRtxStats(packetLen int) {
d.statsLock.Lock()
defer d.statsLock.Unlock()
d.stats.TotalRetransmitPackets++
d.stats.TotalRetransmitBytes += uint64(packetLen)
}
func (d *DownTrack) updatePaddingStats(packetLen int) {
d.statsLock.Lock()
defer d.statsLock.Unlock()
d.stats.TotalPaddingPackets++
d.stats.TotalPaddingBytes += uint64(packetLen)
return d.rtpStats.GetRtcpSenderReport(d.ssrc)
}
func (d *DownTrack) writeBlankFrameRTP() error {
// don't send if nothing has been sent
d.statsLock.RLock()
if d.stats.TotalPrimaryPackets == 0 {
d.statsLock.RUnlock()
if !d.rtpStats.IsActive() {
return nil
}
d.statsLock.RUnlock()
// LK-TODO: Support other video codecs
if d.kind == webrtc.RTPCodecTypeAudio || (d.mime != "video/vp8" && d.mime != "video/h264") {
@@ -901,8 +861,6 @@ func (d *DownTrack) writeBlankFrameRTP() error {
f(d, pktSize)
}
d.updatePrimaryStats(pktSize, hdr.Marker)
// only the first frame will need frameEndNeeded to close out the
// previous picture, rest are small key frames
frameEndNeeded = false
@@ -928,6 +886,9 @@ func (d *DownTrack) writeVP8BlankFrame(hdr *rtp.Header, frameEndNeeded bool) (in
copy(payload[blankVP8.HeaderSize:], VP8KeyFrame8x8)
_, err = d.writeStream.WriteRTP(hdr, payload)
if err == nil {
d.rtpStats.Update(hdr, len(payload), 0, time.Now().UnixNano())
}
return hdr.MarshalSize() + len(payload), err
}
@@ -945,7 +906,11 @@ func (d *DownTrack) writeH264BlankFrame(hdr *rtp.Header, frameEndNeeded bool) (i
copy(buf[offset:offset+len(payload)], payload)
offset += len(payload)
}
_, err := d.writeStream.WriteRTP(hdr, buf[:offset])
payload := buf[:offset]
_, err := d.writeStream.WriteRTP(hdr, payload)
if err == nil {
d.rtpStats.Update(hdr, len(payload), 0, time.Now().UnixNano())
}
return hdr.MarshalSize() + offset, err
}
@@ -961,9 +926,9 @@ func (d *DownTrack) handleRTCP(bytes []byte) {
if pliOnce {
targetLayers := d.forwarder.TargetLayers()
if targetLayers != InvalidLayers {
d.lastPli.Store(time.Now())
d.receiver.SendPLI(targetLayers.spatial)
d.isNACKThrottled.Store(true)
d.rtpStats.UpdatePliTime()
pliOnce = false
}
}
@@ -1003,20 +968,15 @@ func (d *DownTrack) handleRTCP(bytes []byte) {
}
rr.Reports = append(rr.Reports, r)
d.statsLock.Lock()
d.stats.TotalPacketsLost = r.TotalLost
d.rtpStats.UpdatePacketsLost(r.TotalLost)
rtt := getRttMs(&r)
if rtt != d.stats.RTT {
if rtt != d.rtpStats.GetRtt() {
rttToReport = rtt
}
d.stats.RTT = rtt
d.rtpStats.UpdateRtt(rtt)
d.stats.Jitter = float64(r.Jitter)
d.stats.LostRate = float32(r.FractionLost) / 256
d.statsLock.Unlock()
d.connectionStats.UpdateWindow(r.SSRC, r.LastSequenceNumber, r.TotalLost, rtt, r.Jitter)
d.rtpStats.UpdateJitter(float64(r.Jitter))
}
if len(rr.Reports) > 0 {
d.listenerLock.RLock()
@@ -1044,11 +1004,9 @@ func (d *DownTrack) handleRTCP(bytes []byte) {
}
}
d.statsLock.Lock()
d.stats.TotalNACKs += numNACKs
d.stats.TotalPLIs += numPLIs
d.stats.TotalFIRs += numFIRs
d.statsLock.Unlock()
d.rtpStats.UpdateNack(numNACKs)
d.rtpStats.UpdatePli(numPLIs)
d.rtpStats.UpdateFir(numFIRs)
if rttToReport != 0 {
if d.sequencer != nil {
@@ -1089,6 +1047,7 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) {
defer PacketFactory.Put(src)
numRepeatedNACKs := uint32(0)
nackMisses := uint32(0)
for _, meta := range d.sequencer.getPacketsMeta(filtered) {
if meta.layer == int8(InvalidLayerSpatial) {
if meta.nacked > 1 {
@@ -1115,9 +1074,11 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) {
pktBuff := *src
n, err := d.receiver.ReadRTP(pktBuff, uint8(meta.layer), meta.sourceSeqNo)
if err != nil {
d.rtpStats.UpdateNackMiss(1)
if err == io.EOF {
break
}
nackMisses++
continue
}
var pkt rtp.Packet
@@ -1164,23 +1125,15 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) {
f(d, pktSize)
}
d.updateRtxStats(pktSize)
d.rtpStats.Update(&pkt.Header, len(payload), 0, time.Now().UnixNano())
}
}
d.statsLock.Lock()
d.totalRepeatedNACKs += numRepeatedNACKs
d.statsLock.Unlock()
}
func (d *DownTrack) getSRStats() (uint64, uint32) {
d.statsLock.RLock()
defer d.statsLock.RUnlock()
packets := d.stats.TotalPrimaryPackets + d.stats.TotalRetransmitPackets + d.stats.TotalPaddingPackets
octets := d.stats.TotalPrimaryBytes + d.stats.TotalRetransmitBytes + d.stats.TotalPaddingBytes
return octets, packets
d.rtpStats.UpdateNackMiss(nackMisses)
}
// writes RTP header extensions of track
@@ -1251,8 +1204,7 @@ func (d *DownTrack) DebugInfo() map[string]interface{} {
"LastTS": rtpMungerParams.lastTS,
"TSOffset": rtpMungerParams.tsOffset,
"LastMarker": rtpMungerParams.lastMarker,
"LastRTP": d.lastRTP.Load(),
"LastPli": d.lastPli.Load(),
"LastPli": d.rtpStats.LastPli(),
"PacketsDropped": d.pktsDropped.Load(),
}
@@ -1281,39 +1233,53 @@ func (d *DownTrack) GetConnectionScore() float32 {
}
func (d *DownTrack) GetTrackStats() map[uint32]*buffer.StreamStatsWithLayers {
d.statsLock.RLock()
defer d.statsLock.RUnlock()
streamStats := make(map[uint32]*buffer.StreamStatsWithLayers, 1)
stats := make(map[uint32]*buffer.StreamStatsWithLayers, 1)
stats := d.rtpStats.ToProto()
layers := make(map[int]buffer.LayerStats)
layers[0] = buffer.LayerStats{
TotalPackets: d.stats.TotalPrimaryPackets + d.stats.TotalRetransmitPackets + d.stats.TotalPaddingPackets,
TotalBytes: d.stats.TotalPrimaryBytes + d.stats.TotalRetransmitBytes + d.stats.TotalPaddingBytes,
TotalFrames: d.stats.TotalFrames,
TotalPackets: stats.Packets + stats.PacketsDuplicate + stats.PacketsPadding,
TotalBytes: stats.Bytes + stats.BytesDuplicate + stats.BytesPadding,
TotalFrames: stats.Frames,
}
stats[d.ssrc] = &buffer.StreamStatsWithLayers{
StreamStats: d.stats,
Layers: layers,
streamStats[d.ssrc] = &buffer.StreamStatsWithLayers{
RTPStats: stats,
Layers: layers,
}
return stats
return streamStats
}
func (d *DownTrack) getQualityParams() *buffer.ConnectionQualityParams {
s := d.rtpStats.SnapshotInfo(d.connectionQualitySnapshotId)
if s == nil {
return nil
}
lossPercentage := float32(0.0)
if s.PacketsExpected != 0 {
lossPercentage = float32(s.PacketsLost) * 100.0 / float32(s.PacketsExpected)
}
return &buffer.ConnectionQualityParams{
LossPercentage: lossPercentage,
Jitter: float32(s.MaxJitter / 1000.0),
Rtt: s.MaxRtt,
}
}
func (d *DownTrack) GetNackStats() (totalPackets uint32, totalRepeatedNACKs uint32) {
d.statsLock.RLock()
defer d.statsLock.RUnlock()
totalPackets = d.rtpStats.GetTotalPacketsSansDuplicate()
totalPackets = d.stats.TotalPrimaryPackets + d.stats.TotalPaddingPackets
d.statsLock.RLock()
totalRepeatedNACKs = d.totalRepeatedNACKs
d.statsLock.RUnlock()
return
}
func (d *DownTrack) LastPLI() int64 {
t := d.lastPli.Load()
if t.IsZero() {
return 0
}
return t.UnixNano()
func (d *DownTrack) LastPLI() time.Time {
return d.rtpStats.LastPli()
}
+52 -26
View File
@@ -35,7 +35,6 @@ type TrackReceiver interface {
Codec() webrtc.RTPCodecCapability
ReadRTP(buf []byte, layer uint8, sn uint16) (int, error)
GetSenderReportTime(layer int32) (rtpTS uint32, ntpTS buffer.NtpTime)
GetBitrateTemporalCumulative() Bitrates
SendPLI(layer int32)
@@ -168,9 +167,9 @@ func NewWebRTCReceiver(
}
w.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{
CodecType: w.kind,
ClockRate: w.codec.ClockRate,
GetTrackStats: w.GetTrackStats,
CodecType: w.kind,
GetTrackStats: w.GetTrackStats,
GetQualityParams: w.getQualityParams,
GetIsReducedQuality: func() bool {
return w.streamTrackerManager.IsReducedQuality()
},
@@ -385,15 +384,6 @@ func (w *WebRTCReceiver) sendRTCP(packets []rtcp.Packet) {
default:
w.logger.Warnw("sendRTCP failed, rtcp channel full", nil)
}
for _, p := range packets {
switch pkt := p.(type) {
case *rtcp.ReceiverReport:
for _, r := range pkt.Reports {
w.connectionStats.UpdateWindow(r.SSRC, r.LastSequenceNumber, r.TotalLost, w.rtt, r.Jitter)
}
}
}
}
func (w *WebRTCReceiver) SendPLI(layer int32) {
@@ -407,12 +397,17 @@ func (w *WebRTCReceiver) SendPLI(layer int32) {
buff.SendPLI()
}
func (w *WebRTCReceiver) LastPLI() int64 {
var lastPLI int64
func (w *WebRTCReceiver) LastPLI() time.Time {
var lastPLI time.Time
w.bufferMu.RLock()
for _, b := range w.buffers {
if b != nil && b.LastPLI() > lastPLI {
lastPLI = b.LastPLI()
if b == nil {
continue
}
layerLastPLI := b.LastPLI()
if lastPLI.IsZero() || layerLastPLI.After(lastPLI) {
lastPLI = layerLastPLI
}
}
w.bufferMu.RUnlock()
@@ -423,15 +418,6 @@ func (w *WebRTCReceiver) SetRTCPCh(ch chan []rtcp.Packet) {
w.rtcpCh = ch
}
func (w *WebRTCReceiver) GetSenderReportTime(layer int32) (rtpTS uint32, ntpTS buffer.NtpTime) {
w.bufferMu.RLock()
defer w.bufferMu.RUnlock()
if w.buffers[layer] != nil {
rtpTS, ntpTS, _ = w.buffers[layer].GetSenderReportData()
}
return
}
func (w *WebRTCReceiver) ReadRTP(buf []byte, layer uint8, sn uint16) (int, error) {
w.bufferMu.RLock()
buff := w.buffers[layer]
@@ -465,6 +451,46 @@ func (w *WebRTCReceiver) GetTrackStats() map[uint32]*buffer.StreamStatsWithLayer
return stats
}
func (w *WebRTCReceiver) getQualityParams() *buffer.ConnectionQualityParams {
w.bufferMu.RLock()
defer w.bufferMu.RUnlock()
packetsExpected := uint32(0)
packetsLost := uint32(0)
maxJitter := float64(0.0)
maxRtt := uint32(0)
for _, buff := range w.buffers {
if buff == nil {
continue
}
q := buff.GetQualityInfo()
if q == nil {
continue
}
packetsExpected += q.PacketsExpected
packetsLost += q.PacketsLost
if q.MaxJitter > maxJitter {
maxJitter = q.MaxJitter
}
if q.MaxRtt > maxRtt {
maxRtt = q.MaxRtt
}
}
lossPercentage := float32(0.0)
if packetsExpected != 0 {
lossPercentage = float32(packetsLost) * 100.0 / float32(packetsExpected)
}
return &buffer.ConnectionQualityParams{
LossPercentage: lossPercentage,
Jitter: float32(maxJitter / 1000.0),
Rtt: maxRtt,
}
}
func (w *WebRTCReceiver) forwardRTP(layer int32) {
tracker := w.streamTrackerManager.GetTracker(layer)