Use media payload size in scoring. (#912)

* Use media payload size in scoring.

Subtract out header bytes when calculating score.
This does not seem to affect the score (under perfect conditions),
but, using header bytes will inflate the bit rate and
will affect scoring.

* Add header bytes to ToProto

* protocol pointer

* fix test
This commit is contained in:
Raja Subramanian
2022-08-14 13:22:58 +05:30
committed by GitHub
parent d41f5587e6
commit dbcc53f04e
5 changed files with 104 additions and 58 deletions

2
go.mod
View File

@@ -16,7 +16,7 @@ require (
github.com/gorilla/websocket v1.5.0
github.com/hashicorp/go-version v1.6.0
github.com/hashicorp/golang-lru v0.5.4
github.com/livekit/protocol v1.0.1-0.20220810172733-df83c837695d
github.com/livekit/protocol v1.0.1-0.20220814074051-ac91aad4ad8c
github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a
github.com/mackerelio/go-osstat v0.2.2
github.com/magefile/mage v1.13.0

4
go.sum
View File

@@ -240,8 +240,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8=
github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/protocol v1.0.1-0.20220810172733-df83c837695d h1:e0esC1DzNhhH4r9GZUQQzuaZd5/lb9pLZqBTdBTVAhI=
github.com/livekit/protocol v1.0.1-0.20220810172733-df83c837695d/go.mod h1:hN0rI0/QsnGXp3oYnFktdquU3FPetAl8/naweFo6oPs=
github.com/livekit/protocol v1.0.1-0.20220814074051-ac91aad4ad8c h1:SX39A/GXStqvlDSAKIp8cxVkhdOEMAV+ufZrNpo2cPs=
github.com/livekit/protocol v1.0.1-0.20220814074051-ac91aad4ad8c/go.mod h1:hN0rI0/QsnGXp3oYnFktdquU3FPetAl8/naweFo6oPs=
github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a h1:cENjhGfslLSDV07gt8ASy47Wd12Q0kBS7hsdunyQ62I=
github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a/go.mod h1:116ych8UaEs9vfIE8n6iZCZ30iagUFTls0vRmC+Ix5U=
github.com/mackerelio/go-osstat v0.2.2 h1:7jVyXGXTkQL3+6lDVUDBY+Fpo8VQPfyOkZeXxxsXX4c=

View File

@@ -27,21 +27,35 @@ type RTPFlowState struct {
LossEndExclusive uint16
}
type IntervalStats struct {
packets uint32
bytes uint64
headerBytes uint64
packetsPadding uint32
bytesPadding uint64
headerBytesPadding uint64
packetsLost uint32
frames uint32
}
type RTPDeltaInfo struct {
Duration time.Duration
Packets uint32
Bytes uint64
PacketsDuplicate uint32
BytesDuplicate uint64
PacketsPadding uint32
BytesPadding uint64
PacketsLost uint32
Frames uint32
RttMax uint32
JitterMax float64
Nacks uint32
Plis uint32
Firs uint32
Duration time.Duration
Packets uint32
Bytes uint64
HeaderBytes uint64
PacketsDuplicate uint32
BytesDuplicate uint64
HeaderBytesDuplicate uint64
PacketsPadding uint32
BytesPadding uint64
HeaderBytesPadding uint64
PacketsLost uint32
Frames uint32
RttMax uint32
JitterMax float64
Nacks uint32
Plis uint32
Firs uint32
}
type Snapshot struct {
@@ -49,6 +63,7 @@ type Snapshot struct {
extStartSN uint32
packetsDuplicate uint32
bytesDuplicate uint64
headerBytesDuplicate uint64
packetsLostOverridden uint32
nacks uint32
plis uint32
@@ -59,6 +74,7 @@ type Snapshot struct {
}
type SnInfo struct {
hdrSize uint16
pktSize uint16
isPaddingOnly bool
marker bool
@@ -93,11 +109,14 @@ type RTPStats struct {
lastTransit uint32
bytes uint64
bytesDuplicate uint64
bytesPadding uint64
packetsDuplicate uint32
packetsPadding uint32
bytes uint64
headerBytes uint64
bytesDuplicate uint64
headerBytesDuplicate uint64
bytesPadding uint64
headerBytesPadding uint64
packetsDuplicate uint32
packetsPadding uint32
packetsOutOfOrder uint32
@@ -214,7 +233,8 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
}
}
pktSize := uint64(rtph.MarshalSize() + payloadSize + paddingSize)
hdrSize := uint64(rtph.MarshalSize())
pktSize := hdrSize + uint64(payloadSize+paddingSize)
isDuplicate := false
diff := rtph.SequenceNumber - r.highestSN
switch {
@@ -225,14 +245,15 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
}
// adjust start to account for out-of-order packets before a cycle completes
if !r.maybeAdjustStartSN(rtph, packetTime, pktSize, payloadSize) {
if !r.maybeAdjustStartSN(rtph, packetTime, pktSize, hdrSize, payloadSize) {
if !r.isSnInfoLost(rtph.SequenceNumber) {
r.bytesDuplicate += pktSize
r.headerBytesDuplicate += hdrSize
r.packetsDuplicate++
isDuplicate = true
} else {
r.packetsLost--
r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(payloadSize), rtph.Marker)
r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker)
}
}
@@ -251,7 +272,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
r.clearSnInfos(r.highestSN+1, rtph.SequenceNumber)
r.packetsLost += uint32(diff - 1)
r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(payloadSize), rtph.Marker)
r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker)
if rtph.SequenceNumber < r.highestSN && !first {
r.cycles++
@@ -265,8 +286,10 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
if payloadSize == 0 {
r.packetsPadding++
r.bytesPadding += pktSize
r.headerBytesPadding += hdrSize
} else {
r.bytes += pktSize
r.headerBytes += hdrSize
if rtph.Marker {
r.frames++
@@ -288,7 +311,7 @@ func (r *RTPStats) ForceUpdateLastPacket(rtph *rtp.Header, packetTime int64) {
r.highestTime = packetTime
}
func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, packetTime int64, pktSize uint64, payloadSize int) bool {
func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, packetTime int64, pktSize uint64, hdrSize uint64, payloadSize int) bool {
if (r.getExtHighestSN() - r.extStartSN + 1) >= (NumSequenceNumbers / 2) {
return false
}
@@ -301,7 +324,7 @@ func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, packetTime int64, pktSiz
beforeAdjust := r.extStartSN
r.extStartSN = uint32(rtph.SequenceNumber)
r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(payloadSize), rtph.Marker)
r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker)
for _, s := range r.snapshots {
if s.extStartSN == beforeAdjust {
@@ -583,7 +606,8 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8,
packetsLost = 0
}
} else {
_, _, _, _, packetsLost, _ = r.getIntervalStats(uint16(then.extStartSN), uint16(now.extStartSN))
intervalStats := r.getIntervalStats(uint16(then.extStartSN), uint16(now.extStartSN))
packetsLost = intervalStats.packetsLost
}
lossRate := float32(packetsLost) / float32(packetsExpected)
fracLost := uint8(lossRate * 256.0)
@@ -639,12 +663,15 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo {
return nil
}
_, bytes, packetsPadding, bytesPadding, packetsLost, frames := r.getIntervalStats(uint16(then.extStartSN), uint16(now.extStartSN))
packetsLost := uint32(0)
intervalStats := r.getIntervalStats(uint16(then.extStartSN), uint16(now.extStartSN))
if r.params.IsReceiverReportDriven {
packetsLost = now.packetsLostOverridden - then.packetsLostOverridden
if int32(packetsLost) < 0 {
packetsLost = 0
}
} else {
packetsLost = intervalStats.packetsLost
}
maxJitter := then.maxJitter
@@ -654,20 +681,23 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo {
maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6
return &RTPDeltaInfo{
Duration: now.startTime.Sub(then.startTime),
Packets: packetsExpected - packetsPadding,
Bytes: bytes,
PacketsDuplicate: now.packetsDuplicate - then.packetsDuplicate,
BytesDuplicate: now.bytesDuplicate - then.bytesDuplicate,
PacketsPadding: packetsPadding,
BytesPadding: bytesPadding,
PacketsLost: packetsLost,
Frames: frames,
RttMax: then.maxRtt,
JitterMax: maxJitterTime,
Nacks: now.nacks - then.nacks,
Plis: now.plis - then.plis,
Firs: now.firs - then.firs,
Duration: now.startTime.Sub(then.startTime),
Packets: packetsExpected - intervalStats.packetsPadding,
Bytes: intervalStats.bytes,
HeaderBytes: intervalStats.headerBytes,
PacketsDuplicate: now.packetsDuplicate - then.packetsDuplicate,
BytesDuplicate: now.bytesDuplicate - then.bytesDuplicate,
HeaderBytesDuplicate: now.headerBytesDuplicate - then.headerBytesDuplicate,
PacketsPadding: intervalStats.packetsPadding,
BytesPadding: intervalStats.bytesPadding,
HeaderBytesPadding: intervalStats.headerBytesPadding,
PacketsLost: packetsLost,
Frames: intervalStats.frames,
RttMax: then.maxRtt,
JitterMax: maxJitterTime,
Nacks: now.nacks - then.nacks,
Plis: now.plis - then.plis,
Firs: now.firs - then.firs,
}
}
@@ -690,14 +720,14 @@ func (r *RTPStats) ToString() string {
str += fmt.Sprintf(", p: %d|%.2f/s", p.Packets, p.PacketRate)
str += fmt.Sprintf(", l: %d|%.1f/s|%.2f%%", p.PacketsLost, p.PacketLossRate, p.PacketLossPercentage)
str += fmt.Sprintf(", b: %d|%.1fbps", p.Bytes, p.Bitrate)
str += fmt.Sprintf(", b: %d|%.1fbps|%d", p.Bytes, p.Bitrate, p.HeaderBytes)
str += fmt.Sprintf(", f: %d|%.1f/s / %d|%+v", p.Frames, p.FrameRate, p.KeyFrames, p.LastKeyFrame.AsTime().Format(time.UnixDate))
str += fmt.Sprintf(", d: %d|%.2f/s", p.PacketsDuplicate, p.PacketDuplicateRate)
str += fmt.Sprintf(", bd: %d|%.1fbps", p.BytesDuplicate, p.BitrateDuplicate)
str += fmt.Sprintf(", bd: %d|%.1fbps|%d", p.BytesDuplicate, p.BitrateDuplicate, p.HeaderBytesDuplicate)
str += fmt.Sprintf(", pp: %d|%.2f/s", p.PacketsPadding, p.PacketPaddingRate)
str += fmt.Sprintf(", bp: %d|%.1fbps", p.BytesPadding, p.BitratePadding)
str += fmt.Sprintf(", bp: %d|%.1fbps|%d", p.BytesPadding, p.BitratePadding, p.HeaderBytesPadding)
str += fmt.Sprintf(", o: %d", p.PacketsOutOfOrder)
@@ -790,6 +820,7 @@ func (r *RTPStats) ToProto() *livekit.RTPStats {
Packets: packets,
PacketRate: packetRate,
Bytes: r.bytes,
HeaderBytes: r.headerBytes,
Bitrate: bitrate,
PacketsLost: packetsLost,
PacketLossRate: packetLostRate,
@@ -797,10 +828,12 @@ func (r *RTPStats) ToProto() *livekit.RTPStats {
PacketsDuplicate: r.packetsDuplicate,
PacketDuplicateRate: packetDuplicateRate,
BytesDuplicate: r.bytesDuplicate,
HeaderBytesDuplicate: r.headerBytesDuplicate,
BitrateDuplicate: bitrateDuplicate,
PacketsPadding: r.packetsPadding,
PacketPaddingRate: packetPaddingRate,
BytesPadding: r.bytesPadding,
HeaderBytesPadding: r.headerBytesPadding,
BitratePadding: bitratePadding,
PacketsOutOfOrder: r.packetsOutOfOrder,
Frames: r.frames,
@@ -882,7 +915,7 @@ func (r *RTPStats) getSnInfoOutOfOrderPtr(sn uint16) int {
return (r.snInfoWritePtr - int(offset) - 1) & SnInfoMask
}
func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, payloadSize uint16, marker bool) {
func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, hdrSize uint16, payloadSize uint16, marker bool) {
writePtr := 0
ooo := (sn - r.highestSN) > (1 << 15)
if !ooo {
@@ -897,6 +930,7 @@ func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, payloadSize uint16, mark
snInfo := &r.snInfos[writePtr]
snInfo.pktSize = pktSize
snInfo.hdrSize = hdrSize
snInfo.isPaddingOnly = payloadSize == 0
snInfo.marker = marker
}
@@ -922,7 +956,7 @@ func (r *RTPStats) isSnInfoLost(sn uint16) bool {
return snInfo.pktSize == 0
}
func (r *RTPStats) getIntervalStats(startInclusive uint16, endExclusive uint16) (packets uint32, bytes uint64, packetsPadding uint32, bytesPadding uint64, packetsLost uint32, frames uint32) {
func (r *RTPStats) getIntervalStats(startInclusive uint16, endExclusive uint16) (intervalStats IntervalStats) {
packetsNotFound := uint32(0)
processSN := func(sn uint16) {
readPtr := r.getSnInfoOutOfOrderPtr(sn)
@@ -933,19 +967,21 @@ func (r *RTPStats) getIntervalStats(startInclusive uint16, endExclusive uint16)
snInfo := &r.snInfos[readPtr]
switch {
case snInfo.pktSize == 0:
packetsLost++
intervalStats.packetsLost++
case snInfo.isPaddingOnly:
packetsPadding++
bytesPadding += uint64(snInfo.pktSize)
intervalStats.packetsPadding++
intervalStats.bytesPadding += uint64(snInfo.pktSize)
intervalStats.headerBytesPadding += uint64(snInfo.hdrSize)
default:
packets++
bytes += uint64(snInfo.pktSize)
intervalStats.packets++
intervalStats.bytes += uint64(snInfo.pktSize)
intervalStats.headerBytes += uint64(snInfo.hdrSize)
}
if snInfo.marker {
frames++
intervalStats.frames++
}
}
@@ -1027,6 +1063,7 @@ func (r *RTPStats) getAndResetSnapshot(snapshotId uint32) (*Snapshot, *Snapshot)
extStartSN: r.getExtHighestSNAdjusted() + 1,
packetsDuplicate: r.packetsDuplicate,
bytesDuplicate: r.bytesDuplicate,
headerBytesDuplicate: r.headerBytesDuplicate,
packetsLostOverridden: r.packetsLostOverridden,
nacks: r.nacks,
plis: r.plis,
@@ -1049,11 +1086,14 @@ func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats {
packets := uint32(0)
bytes := uint64(0)
headerBytes := uint64(0)
packetsLost := uint32(0)
packetsDuplicate := uint32(0)
bytesDuplicate := uint64(0)
headerBytesDuplicate := uint64(0)
packetsPadding := uint32(0)
bytesPadding := uint64(0)
headerBytesPadding := uint64(0)
packetsOutOfOrder := uint32(0)
frames := uint32(0)
keyFrames := uint32(0)
@@ -1085,14 +1125,17 @@ func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats {
packets += stats.Packets
bytes += stats.Bytes
headerBytes += stats.HeaderBytes
packetsLost += stats.PacketsLost
packetsDuplicate += stats.PacketsDuplicate
bytesDuplicate += stats.BytesDuplicate
headerBytesDuplicate += stats.HeaderBytesDuplicate
packetsPadding += stats.PacketsPadding
bytesPadding += stats.BytesPadding
headerBytesPadding += stats.HeaderBytesPadding
packetsOutOfOrder += stats.PacketsOutOfOrder
@@ -1163,6 +1206,7 @@ func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats {
Packets: packets,
PacketRate: packetRate,
Bytes: bytes,
HeaderBytes: headerBytes,
Bitrate: bitrate,
PacketsLost: packetsLost,
PacketLossRate: packetLostRate,
@@ -1170,10 +1214,12 @@ func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats {
PacketsDuplicate: packetsDuplicate,
PacketDuplicateRate: packetDuplicateRate,
BytesDuplicate: bytesDuplicate,
HeaderBytesDuplicate: headerBytesDuplicate,
BitrateDuplicate: bitrateDuplicate,
PacketsPadding: packetsPadding,
PacketPaddingRate: packetPaddingRate,
BytesPadding: bytesPadding,
HeaderBytesPadding: headerBytesPadding,
BitratePadding: bitratePadding,
PacketsOutOfOrder: packetsOutOfOrder,
Frames: frames,

View File

@@ -122,8 +122,8 @@ func TestRTPStats_Update(t *testing.T) {
require.Equal(t, uint32(3), r.packetsOutOfOrder)
require.Equal(t, uint32(1), r.packetsDuplicate)
require.Equal(t, uint32(16), r.packetsLost)
_, _, _, _, packetsLost, _ := r.getIntervalStats(uint16(r.extStartSN), uint16(r.getExtHighestSN()+1))
require.Equal(t, uint32(16), packetsLost)
intervalStats := r.getIntervalStats(uint16(r.extStartSN), uint16(r.getExtHighestSN()+1))
require.Equal(t, uint32(16), intervalStats.packetsLost)
r.Stop()
}

View File

@@ -108,7 +108,7 @@ func (cs *ConnectionStats) updateScore(streams map[uint32]*buffer.StreamStatsWit
Codec: cs.params.CodecName,
PacketsExpected: maxAvailableLayerStats.Packets + maxAvailableLayerStats.PacketsPadding,
PacketsLost: maxAvailableLayerStats.PacketsLost,
Bytes: maxAvailableLayerStats.Bytes,
Bytes: maxAvailableLayerStats.Bytes - maxAvailableLayerStats.HeaderBytes, // only use media payload size
Frames: maxAvailableLayerStats.Frames,
Jitter: maxAvailableLayerStats.JitterMax,
Rtt: maxAvailableLayerStats.RttMax,