From dbcc53f04e2ee43df17d030aca541a0ed8135191 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sun, 14 Aug 2022 13:22:58 +0530 Subject: [PATCH] Use media payload size in scoring. (#912) * Use media payload size in scoring. Subtract out header bytes when calculating score. This does not seem to affect the score (under perfect conditions), but, using header bytes will inflate the bit rate and will affect scoring. * Add header bytes to ToProto * protocol pointer * fix test --- go.mod | 2 +- go.sum | 4 +- pkg/sfu/buffer/rtpstats.go | 150 ++++++++++++------- pkg/sfu/buffer/rtpstats_test.go | 4 +- pkg/sfu/connectionquality/connectionstats.go | 2 +- 5 files changed, 104 insertions(+), 58 deletions(-) diff --git a/go.mod b/go.mod index 94f4871fc..a7d3c4d40 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/gorilla/websocket v1.5.0 github.com/hashicorp/go-version v1.6.0 github.com/hashicorp/golang-lru v0.5.4 - github.com/livekit/protocol v1.0.1-0.20220810172733-df83c837695d + github.com/livekit/protocol v1.0.1-0.20220814074051-ac91aad4ad8c github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a github.com/mackerelio/go-osstat v0.2.2 github.com/magefile/mage v1.13.0 diff --git a/go.sum b/go.sum index 7d1c60980..5cc336afe 100644 --- a/go.sum +++ b/go.sum @@ -240,8 +240,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= -github.com/livekit/protocol v1.0.1-0.20220810172733-df83c837695d h1:e0esC1DzNhhH4r9GZUQQzuaZd5/lb9pLZqBTdBTVAhI= -github.com/livekit/protocol v1.0.1-0.20220810172733-df83c837695d/go.mod h1:hN0rI0/QsnGXp3oYnFktdquU3FPetAl8/naweFo6oPs= +github.com/livekit/protocol v1.0.1-0.20220814074051-ac91aad4ad8c h1:SX39A/GXStqvlDSAKIp8cxVkhdOEMAV+ufZrNpo2cPs= +github.com/livekit/protocol v1.0.1-0.20220814074051-ac91aad4ad8c/go.mod h1:hN0rI0/QsnGXp3oYnFktdquU3FPetAl8/naweFo6oPs= github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a h1:cENjhGfslLSDV07gt8ASy47Wd12Q0kBS7hsdunyQ62I= github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a/go.mod h1:116ych8UaEs9vfIE8n6iZCZ30iagUFTls0vRmC+Ix5U= github.com/mackerelio/go-osstat v0.2.2 h1:7jVyXGXTkQL3+6lDVUDBY+Fpo8VQPfyOkZeXxxsXX4c= diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index de8561cb2..ec8eae0cd 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -27,21 +27,35 @@ type RTPFlowState struct { LossEndExclusive uint16 } +type IntervalStats struct { + packets uint32 + bytes uint64 + headerBytes uint64 + packetsPadding uint32 + bytesPadding uint64 + headerBytesPadding uint64 + packetsLost uint32 + frames uint32 +} + type RTPDeltaInfo struct { - Duration time.Duration - Packets uint32 - Bytes uint64 - PacketsDuplicate uint32 - BytesDuplicate uint64 - PacketsPadding uint32 - BytesPadding uint64 - PacketsLost uint32 - Frames uint32 - RttMax uint32 - JitterMax float64 - Nacks uint32 - Plis uint32 - Firs uint32 + Duration time.Duration + Packets uint32 + Bytes uint64 + HeaderBytes uint64 + PacketsDuplicate uint32 + BytesDuplicate uint64 + HeaderBytesDuplicate uint64 + PacketsPadding uint32 + BytesPadding uint64 + HeaderBytesPadding uint64 + PacketsLost uint32 + Frames uint32 + RttMax uint32 + JitterMax float64 + Nacks uint32 + Plis uint32 + Firs uint32 } type Snapshot struct { @@ -49,6 +63,7 @@ type Snapshot struct { extStartSN uint32 packetsDuplicate uint32 bytesDuplicate uint64 + headerBytesDuplicate uint64 packetsLostOverridden uint32 nacks uint32 plis uint32 @@ -59,6 +74,7 @@ type Snapshot struct { } type SnInfo struct { + hdrSize uint16 pktSize uint16 isPaddingOnly bool marker bool @@ -93,11 +109,14 @@ type RTPStats struct { lastTransit uint32 - bytes uint64 - bytesDuplicate uint64 - bytesPadding uint64 - packetsDuplicate uint32 - packetsPadding uint32 + bytes uint64 + headerBytes uint64 + bytesDuplicate uint64 + headerBytesDuplicate uint64 + bytesPadding uint64 + headerBytesPadding uint64 + packetsDuplicate uint32 + packetsPadding uint32 packetsOutOfOrder uint32 @@ -214,7 +233,8 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa } } - pktSize := uint64(rtph.MarshalSize() + payloadSize + paddingSize) + hdrSize := uint64(rtph.MarshalSize()) + pktSize := hdrSize + uint64(payloadSize+paddingSize) isDuplicate := false diff := rtph.SequenceNumber - r.highestSN switch { @@ -225,14 +245,15 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa } // adjust start to account for out-of-order packets before a cycle completes - if !r.maybeAdjustStartSN(rtph, packetTime, pktSize, payloadSize) { + if !r.maybeAdjustStartSN(rtph, packetTime, pktSize, hdrSize, payloadSize) { if !r.isSnInfoLost(rtph.SequenceNumber) { r.bytesDuplicate += pktSize + r.headerBytesDuplicate += hdrSize r.packetsDuplicate++ isDuplicate = true } else { r.packetsLost-- - r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(payloadSize), rtph.Marker) + r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker) } } @@ -251,7 +272,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa r.clearSnInfos(r.highestSN+1, rtph.SequenceNumber) r.packetsLost += uint32(diff - 1) - r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(payloadSize), rtph.Marker) + r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker) if rtph.SequenceNumber < r.highestSN && !first { r.cycles++ @@ -265,8 +286,10 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa if payloadSize == 0 { r.packetsPadding++ r.bytesPadding += pktSize + r.headerBytesPadding += hdrSize } else { r.bytes += pktSize + r.headerBytes += hdrSize if rtph.Marker { r.frames++ @@ -288,7 +311,7 @@ func (r *RTPStats) ForceUpdateLastPacket(rtph *rtp.Header, packetTime int64) { r.highestTime = packetTime } -func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, packetTime int64, pktSize uint64, payloadSize int) bool { +func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, packetTime int64, pktSize uint64, hdrSize uint64, payloadSize int) bool { if (r.getExtHighestSN() - r.extStartSN + 1) >= (NumSequenceNumbers / 2) { return false } @@ -301,7 +324,7 @@ func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, packetTime int64, pktSiz beforeAdjust := r.extStartSN r.extStartSN = uint32(rtph.SequenceNumber) - r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(payloadSize), rtph.Marker) + r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker) for _, s := range r.snapshots { if s.extStartSN == beforeAdjust { @@ -583,7 +606,8 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8, packetsLost = 0 } } else { - _, _, _, _, packetsLost, _ = r.getIntervalStats(uint16(then.extStartSN), uint16(now.extStartSN)) + intervalStats := r.getIntervalStats(uint16(then.extStartSN), uint16(now.extStartSN)) + packetsLost = intervalStats.packetsLost } lossRate := float32(packetsLost) / float32(packetsExpected) fracLost := uint8(lossRate * 256.0) @@ -639,12 +663,15 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo { return nil } - _, bytes, packetsPadding, bytesPadding, packetsLost, frames := r.getIntervalStats(uint16(then.extStartSN), uint16(now.extStartSN)) + packetsLost := uint32(0) + intervalStats := r.getIntervalStats(uint16(then.extStartSN), uint16(now.extStartSN)) if r.params.IsReceiverReportDriven { packetsLost = now.packetsLostOverridden - then.packetsLostOverridden if int32(packetsLost) < 0 { packetsLost = 0 } + } else { + packetsLost = intervalStats.packetsLost } maxJitter := then.maxJitter @@ -654,20 +681,23 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo { maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6 return &RTPDeltaInfo{ - Duration: now.startTime.Sub(then.startTime), - Packets: packetsExpected - packetsPadding, - Bytes: bytes, - PacketsDuplicate: now.packetsDuplicate - then.packetsDuplicate, - BytesDuplicate: now.bytesDuplicate - then.bytesDuplicate, - PacketsPadding: packetsPadding, - BytesPadding: bytesPadding, - PacketsLost: packetsLost, - Frames: frames, - RttMax: then.maxRtt, - JitterMax: maxJitterTime, - Nacks: now.nacks - then.nacks, - Plis: now.plis - then.plis, - Firs: now.firs - then.firs, + Duration: now.startTime.Sub(then.startTime), + Packets: packetsExpected - intervalStats.packetsPadding, + Bytes: intervalStats.bytes, + HeaderBytes: intervalStats.headerBytes, + PacketsDuplicate: now.packetsDuplicate - then.packetsDuplicate, + BytesDuplicate: now.bytesDuplicate - then.bytesDuplicate, + HeaderBytesDuplicate: now.headerBytesDuplicate - then.headerBytesDuplicate, + PacketsPadding: intervalStats.packetsPadding, + BytesPadding: intervalStats.bytesPadding, + HeaderBytesPadding: intervalStats.headerBytesPadding, + PacketsLost: packetsLost, + Frames: intervalStats.frames, + RttMax: then.maxRtt, + JitterMax: maxJitterTime, + Nacks: now.nacks - then.nacks, + Plis: now.plis - then.plis, + Firs: now.firs - then.firs, } } @@ -690,14 +720,14 @@ func (r *RTPStats) ToString() string { str += fmt.Sprintf(", p: %d|%.2f/s", p.Packets, p.PacketRate) str += fmt.Sprintf(", l: %d|%.1f/s|%.2f%%", p.PacketsLost, p.PacketLossRate, p.PacketLossPercentage) - str += fmt.Sprintf(", b: %d|%.1fbps", p.Bytes, p.Bitrate) + str += fmt.Sprintf(", b: %d|%.1fbps|%d", p.Bytes, p.Bitrate, p.HeaderBytes) str += fmt.Sprintf(", f: %d|%.1f/s / %d|%+v", p.Frames, p.FrameRate, p.KeyFrames, p.LastKeyFrame.AsTime().Format(time.UnixDate)) str += fmt.Sprintf(", d: %d|%.2f/s", p.PacketsDuplicate, p.PacketDuplicateRate) - str += fmt.Sprintf(", bd: %d|%.1fbps", p.BytesDuplicate, p.BitrateDuplicate) + str += fmt.Sprintf(", bd: %d|%.1fbps|%d", p.BytesDuplicate, p.BitrateDuplicate, p.HeaderBytesDuplicate) str += fmt.Sprintf(", pp: %d|%.2f/s", p.PacketsPadding, p.PacketPaddingRate) - str += fmt.Sprintf(", bp: %d|%.1fbps", p.BytesPadding, p.BitratePadding) + str += fmt.Sprintf(", bp: %d|%.1fbps|%d", p.BytesPadding, p.BitratePadding, p.HeaderBytesPadding) str += fmt.Sprintf(", o: %d", p.PacketsOutOfOrder) @@ -790,6 +820,7 @@ func (r *RTPStats) ToProto() *livekit.RTPStats { Packets: packets, PacketRate: packetRate, Bytes: r.bytes, + HeaderBytes: r.headerBytes, Bitrate: bitrate, PacketsLost: packetsLost, PacketLossRate: packetLostRate, @@ -797,10 +828,12 @@ func (r *RTPStats) ToProto() *livekit.RTPStats { PacketsDuplicate: r.packetsDuplicate, PacketDuplicateRate: packetDuplicateRate, BytesDuplicate: r.bytesDuplicate, + HeaderBytesDuplicate: r.headerBytesDuplicate, BitrateDuplicate: bitrateDuplicate, PacketsPadding: r.packetsPadding, PacketPaddingRate: packetPaddingRate, BytesPadding: r.bytesPadding, + HeaderBytesPadding: r.headerBytesPadding, BitratePadding: bitratePadding, PacketsOutOfOrder: r.packetsOutOfOrder, Frames: r.frames, @@ -882,7 +915,7 @@ func (r *RTPStats) getSnInfoOutOfOrderPtr(sn uint16) int { return (r.snInfoWritePtr - int(offset) - 1) & SnInfoMask } -func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, payloadSize uint16, marker bool) { +func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, hdrSize uint16, payloadSize uint16, marker bool) { writePtr := 0 ooo := (sn - r.highestSN) > (1 << 15) if !ooo { @@ -897,6 +930,7 @@ func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, payloadSize uint16, mark snInfo := &r.snInfos[writePtr] snInfo.pktSize = pktSize + snInfo.hdrSize = hdrSize snInfo.isPaddingOnly = payloadSize == 0 snInfo.marker = marker } @@ -922,7 +956,7 @@ func (r *RTPStats) isSnInfoLost(sn uint16) bool { return snInfo.pktSize == 0 } -func (r *RTPStats) getIntervalStats(startInclusive uint16, endExclusive uint16) (packets uint32, bytes uint64, packetsPadding uint32, bytesPadding uint64, packetsLost uint32, frames uint32) { +func (r *RTPStats) getIntervalStats(startInclusive uint16, endExclusive uint16) (intervalStats IntervalStats) { packetsNotFound := uint32(0) processSN := func(sn uint16) { readPtr := r.getSnInfoOutOfOrderPtr(sn) @@ -933,19 +967,21 @@ func (r *RTPStats) getIntervalStats(startInclusive uint16, endExclusive uint16) snInfo := &r.snInfos[readPtr] switch { case snInfo.pktSize == 0: - packetsLost++ + intervalStats.packetsLost++ case snInfo.isPaddingOnly: - packetsPadding++ - bytesPadding += uint64(snInfo.pktSize) + intervalStats.packetsPadding++ + intervalStats.bytesPadding += uint64(snInfo.pktSize) + intervalStats.headerBytesPadding += uint64(snInfo.hdrSize) default: - packets++ - bytes += uint64(snInfo.pktSize) + intervalStats.packets++ + intervalStats.bytes += uint64(snInfo.pktSize) + intervalStats.headerBytes += uint64(snInfo.hdrSize) } if snInfo.marker { - frames++ + intervalStats.frames++ } } @@ -1027,6 +1063,7 @@ func (r *RTPStats) getAndResetSnapshot(snapshotId uint32) (*Snapshot, *Snapshot) extStartSN: r.getExtHighestSNAdjusted() + 1, packetsDuplicate: r.packetsDuplicate, bytesDuplicate: r.bytesDuplicate, + headerBytesDuplicate: r.headerBytesDuplicate, packetsLostOverridden: r.packetsLostOverridden, nacks: r.nacks, plis: r.plis, @@ -1049,11 +1086,14 @@ func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats { packets := uint32(0) bytes := uint64(0) + headerBytes := uint64(0) packetsLost := uint32(0) packetsDuplicate := uint32(0) bytesDuplicate := uint64(0) + headerBytesDuplicate := uint64(0) packetsPadding := uint32(0) bytesPadding := uint64(0) + headerBytesPadding := uint64(0) packetsOutOfOrder := uint32(0) frames := uint32(0) keyFrames := uint32(0) @@ -1085,14 +1125,17 @@ func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats { packets += stats.Packets bytes += stats.Bytes + headerBytes += stats.HeaderBytes packetsLost += stats.PacketsLost packetsDuplicate += stats.PacketsDuplicate bytesDuplicate += stats.BytesDuplicate + headerBytesDuplicate += stats.HeaderBytesDuplicate packetsPadding += stats.PacketsPadding bytesPadding += stats.BytesPadding + headerBytesPadding += stats.HeaderBytesPadding packetsOutOfOrder += stats.PacketsOutOfOrder @@ -1163,6 +1206,7 @@ func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats { Packets: packets, PacketRate: packetRate, Bytes: bytes, + HeaderBytes: headerBytes, Bitrate: bitrate, PacketsLost: packetsLost, PacketLossRate: packetLostRate, @@ -1170,10 +1214,12 @@ func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats { PacketsDuplicate: packetsDuplicate, PacketDuplicateRate: packetDuplicateRate, BytesDuplicate: bytesDuplicate, + HeaderBytesDuplicate: headerBytesDuplicate, BitrateDuplicate: bitrateDuplicate, PacketsPadding: packetsPadding, PacketPaddingRate: packetPaddingRate, BytesPadding: bytesPadding, + HeaderBytesPadding: headerBytesPadding, BitratePadding: bitratePadding, PacketsOutOfOrder: packetsOutOfOrder, Frames: frames, diff --git a/pkg/sfu/buffer/rtpstats_test.go b/pkg/sfu/buffer/rtpstats_test.go index 1fbbc7a4e..b1be40650 100644 --- a/pkg/sfu/buffer/rtpstats_test.go +++ b/pkg/sfu/buffer/rtpstats_test.go @@ -122,8 +122,8 @@ func TestRTPStats_Update(t *testing.T) { require.Equal(t, uint32(3), r.packetsOutOfOrder) require.Equal(t, uint32(1), r.packetsDuplicate) require.Equal(t, uint32(16), r.packetsLost) - _, _, _, _, packetsLost, _ := r.getIntervalStats(uint16(r.extStartSN), uint16(r.getExtHighestSN()+1)) - require.Equal(t, uint32(16), packetsLost) + intervalStats := r.getIntervalStats(uint16(r.extStartSN), uint16(r.getExtHighestSN()+1)) + require.Equal(t, uint32(16), intervalStats.packetsLost) r.Stop() } diff --git a/pkg/sfu/connectionquality/connectionstats.go b/pkg/sfu/connectionquality/connectionstats.go index 54e77b921..828e24509 100644 --- a/pkg/sfu/connectionquality/connectionstats.go +++ b/pkg/sfu/connectionquality/connectionstats.go @@ -108,7 +108,7 @@ func (cs *ConnectionStats) updateScore(streams map[uint32]*buffer.StreamStatsWit Codec: cs.params.CodecName, PacketsExpected: maxAvailableLayerStats.Packets + maxAvailableLayerStats.PacketsPadding, PacketsLost: maxAvailableLayerStats.PacketsLost, - Bytes: maxAvailableLayerStats.Bytes, + Bytes: maxAvailableLayerStats.Bytes - maxAvailableLayerStats.HeaderBytes, // only use media payload size Frames: maxAvailableLayerStats.Frames, Jitter: maxAvailableLayerStats.JitterMax, Rtt: maxAvailableLayerStats.RttMax,