diff --git a/go.mod b/go.mod index 94f4871fc..a7d3c4d40 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/gorilla/websocket v1.5.0 github.com/hashicorp/go-version v1.6.0 github.com/hashicorp/golang-lru v0.5.4 - github.com/livekit/protocol v1.0.1-0.20220810172733-df83c837695d + github.com/livekit/protocol v1.0.1-0.20220814074051-ac91aad4ad8c github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a github.com/mackerelio/go-osstat v0.2.2 github.com/magefile/mage v1.13.0 diff --git a/go.sum b/go.sum index 7d1c60980..5cc336afe 100644 --- a/go.sum +++ b/go.sum @@ -240,8 +240,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= -github.com/livekit/protocol v1.0.1-0.20220810172733-df83c837695d h1:e0esC1DzNhhH4r9GZUQQzuaZd5/lb9pLZqBTdBTVAhI= -github.com/livekit/protocol v1.0.1-0.20220810172733-df83c837695d/go.mod h1:hN0rI0/QsnGXp3oYnFktdquU3FPetAl8/naweFo6oPs= +github.com/livekit/protocol v1.0.1-0.20220814074051-ac91aad4ad8c h1:SX39A/GXStqvlDSAKIp8cxVkhdOEMAV+ufZrNpo2cPs= +github.com/livekit/protocol v1.0.1-0.20220814074051-ac91aad4ad8c/go.mod h1:hN0rI0/QsnGXp3oYnFktdquU3FPetAl8/naweFo6oPs= github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a h1:cENjhGfslLSDV07gt8ASy47Wd12Q0kBS7hsdunyQ62I= github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a/go.mod h1:116ych8UaEs9vfIE8n6iZCZ30iagUFTls0vRmC+Ix5U= github.com/mackerelio/go-osstat v0.2.2 h1:7jVyXGXTkQL3+6lDVUDBY+Fpo8VQPfyOkZeXxxsXX4c= diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index de8561cb2..ec8eae0cd 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -27,21 +27,35 @@ type RTPFlowState struct { LossEndExclusive uint16 } +type IntervalStats struct { + packets uint32 + bytes uint64 + headerBytes uint64 + packetsPadding uint32 + bytesPadding uint64 + headerBytesPadding uint64 + packetsLost uint32 + frames uint32 +} + type RTPDeltaInfo struct { - Duration time.Duration - Packets uint32 - Bytes uint64 - PacketsDuplicate uint32 - BytesDuplicate uint64 - PacketsPadding uint32 - BytesPadding uint64 - PacketsLost uint32 - Frames uint32 - RttMax uint32 - JitterMax float64 - Nacks uint32 - Plis uint32 - Firs uint32 + Duration time.Duration + Packets uint32 + Bytes uint64 + HeaderBytes uint64 + PacketsDuplicate uint32 + BytesDuplicate uint64 + HeaderBytesDuplicate uint64 + PacketsPadding uint32 + BytesPadding uint64 + HeaderBytesPadding uint64 + PacketsLost uint32 + Frames uint32 + RttMax uint32 + JitterMax float64 + Nacks uint32 + Plis uint32 + Firs uint32 } type Snapshot struct { @@ -49,6 +63,7 @@ type Snapshot struct { extStartSN uint32 packetsDuplicate uint32 bytesDuplicate uint64 + headerBytesDuplicate uint64 packetsLostOverridden uint32 nacks uint32 plis uint32 @@ -59,6 +74,7 @@ type Snapshot struct { } type SnInfo struct { + hdrSize uint16 pktSize uint16 isPaddingOnly bool marker bool @@ -93,11 +109,14 @@ type RTPStats struct { lastTransit uint32 - bytes uint64 - bytesDuplicate uint64 - bytesPadding uint64 - packetsDuplicate uint32 - packetsPadding uint32 + bytes uint64 + headerBytes uint64 + bytesDuplicate uint64 + headerBytesDuplicate uint64 + bytesPadding uint64 + headerBytesPadding uint64 + packetsDuplicate uint32 + packetsPadding uint32 packetsOutOfOrder uint32 @@ -214,7 +233,8 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa } } - pktSize := uint64(rtph.MarshalSize() + payloadSize + paddingSize) + hdrSize := uint64(rtph.MarshalSize()) + pktSize := hdrSize + uint64(payloadSize+paddingSize) isDuplicate := false diff := rtph.SequenceNumber - r.highestSN switch { @@ -225,14 +245,15 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa } // adjust start to account for out-of-order packets before a cycle completes - if !r.maybeAdjustStartSN(rtph, packetTime, pktSize, payloadSize) { + if !r.maybeAdjustStartSN(rtph, packetTime, pktSize, hdrSize, payloadSize) { if !r.isSnInfoLost(rtph.SequenceNumber) { r.bytesDuplicate += pktSize + r.headerBytesDuplicate += hdrSize r.packetsDuplicate++ isDuplicate = true } else { r.packetsLost-- - r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(payloadSize), rtph.Marker) + r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker) } } @@ -251,7 +272,7 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa r.clearSnInfos(r.highestSN+1, rtph.SequenceNumber) r.packetsLost += uint32(diff - 1) - r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(payloadSize), rtph.Marker) + r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker) if rtph.SequenceNumber < r.highestSN && !first { r.cycles++ @@ -265,8 +286,10 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa if payloadSize == 0 { r.packetsPadding++ r.bytesPadding += pktSize + r.headerBytesPadding += hdrSize } else { r.bytes += pktSize + r.headerBytes += hdrSize if rtph.Marker { r.frames++ @@ -288,7 +311,7 @@ func (r *RTPStats) ForceUpdateLastPacket(rtph *rtp.Header, packetTime int64) { r.highestTime = packetTime } -func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, packetTime int64, pktSize uint64, payloadSize int) bool { +func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, packetTime int64, pktSize uint64, hdrSize uint64, payloadSize int) bool { if (r.getExtHighestSN() - r.extStartSN + 1) >= (NumSequenceNumbers / 2) { return false } @@ -301,7 +324,7 @@ func (r *RTPStats) maybeAdjustStartSN(rtph *rtp.Header, packetTime int64, pktSiz beforeAdjust := r.extStartSN r.extStartSN = uint32(rtph.SequenceNumber) - r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(payloadSize), rtph.Marker) + r.setSnInfo(rtph.SequenceNumber, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), rtph.Marker) for _, s := range r.snapshots { if s.extStartSN == beforeAdjust { @@ -583,7 +606,8 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8, packetsLost = 0 } } else { - _, _, _, _, packetsLost, _ = r.getIntervalStats(uint16(then.extStartSN), uint16(now.extStartSN)) + intervalStats := r.getIntervalStats(uint16(then.extStartSN), uint16(now.extStartSN)) + packetsLost = intervalStats.packetsLost } lossRate := float32(packetsLost) / float32(packetsExpected) fracLost := uint8(lossRate * 256.0) @@ -639,12 +663,15 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo { return nil } - _, bytes, packetsPadding, bytesPadding, packetsLost, frames := r.getIntervalStats(uint16(then.extStartSN), uint16(now.extStartSN)) + packetsLost := uint32(0) + intervalStats := r.getIntervalStats(uint16(then.extStartSN), uint16(now.extStartSN)) if r.params.IsReceiverReportDriven { packetsLost = now.packetsLostOverridden - then.packetsLostOverridden if int32(packetsLost) < 0 { packetsLost = 0 } + } else { + packetsLost = intervalStats.packetsLost } maxJitter := then.maxJitter @@ -654,20 +681,23 @@ func (r *RTPStats) DeltaInfo(snapshotId uint32) *RTPDeltaInfo { maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6 return &RTPDeltaInfo{ - Duration: now.startTime.Sub(then.startTime), - Packets: packetsExpected - packetsPadding, - Bytes: bytes, - PacketsDuplicate: now.packetsDuplicate - then.packetsDuplicate, - BytesDuplicate: now.bytesDuplicate - then.bytesDuplicate, - PacketsPadding: packetsPadding, - BytesPadding: bytesPadding, - PacketsLost: packetsLost, - Frames: frames, - RttMax: then.maxRtt, - JitterMax: maxJitterTime, - Nacks: now.nacks - then.nacks, - Plis: now.plis - then.plis, - Firs: now.firs - then.firs, + Duration: now.startTime.Sub(then.startTime), + Packets: packetsExpected - intervalStats.packetsPadding, + Bytes: intervalStats.bytes, + HeaderBytes: intervalStats.headerBytes, + PacketsDuplicate: now.packetsDuplicate - then.packetsDuplicate, + BytesDuplicate: now.bytesDuplicate - then.bytesDuplicate, + HeaderBytesDuplicate: now.headerBytesDuplicate - then.headerBytesDuplicate, + PacketsPadding: intervalStats.packetsPadding, + BytesPadding: intervalStats.bytesPadding, + HeaderBytesPadding: intervalStats.headerBytesPadding, + PacketsLost: packetsLost, + Frames: intervalStats.frames, + RttMax: then.maxRtt, + JitterMax: maxJitterTime, + Nacks: now.nacks - then.nacks, + Plis: now.plis - then.plis, + Firs: now.firs - then.firs, } } @@ -690,14 +720,14 @@ func (r *RTPStats) ToString() string { str += fmt.Sprintf(", p: %d|%.2f/s", p.Packets, p.PacketRate) str += fmt.Sprintf(", l: %d|%.1f/s|%.2f%%", p.PacketsLost, p.PacketLossRate, p.PacketLossPercentage) - str += fmt.Sprintf(", b: %d|%.1fbps", p.Bytes, p.Bitrate) + str += fmt.Sprintf(", b: %d|%.1fbps|%d", p.Bytes, p.Bitrate, p.HeaderBytes) str += fmt.Sprintf(", f: %d|%.1f/s / %d|%+v", p.Frames, p.FrameRate, p.KeyFrames, p.LastKeyFrame.AsTime().Format(time.UnixDate)) str += fmt.Sprintf(", d: %d|%.2f/s", p.PacketsDuplicate, p.PacketDuplicateRate) - str += fmt.Sprintf(", bd: %d|%.1fbps", p.BytesDuplicate, p.BitrateDuplicate) + str += fmt.Sprintf(", bd: %d|%.1fbps|%d", p.BytesDuplicate, p.BitrateDuplicate, p.HeaderBytesDuplicate) str += fmt.Sprintf(", pp: %d|%.2f/s", p.PacketsPadding, p.PacketPaddingRate) - str += fmt.Sprintf(", bp: %d|%.1fbps", p.BytesPadding, p.BitratePadding) + str += fmt.Sprintf(", bp: %d|%.1fbps|%d", p.BytesPadding, p.BitratePadding, p.HeaderBytesPadding) str += fmt.Sprintf(", o: %d", p.PacketsOutOfOrder) @@ -790,6 +820,7 @@ func (r *RTPStats) ToProto() *livekit.RTPStats { Packets: packets, PacketRate: packetRate, Bytes: r.bytes, + HeaderBytes: r.headerBytes, Bitrate: bitrate, PacketsLost: packetsLost, PacketLossRate: packetLostRate, @@ -797,10 +828,12 @@ func (r *RTPStats) ToProto() *livekit.RTPStats { PacketsDuplicate: r.packetsDuplicate, PacketDuplicateRate: packetDuplicateRate, BytesDuplicate: r.bytesDuplicate, + HeaderBytesDuplicate: r.headerBytesDuplicate, BitrateDuplicate: bitrateDuplicate, PacketsPadding: r.packetsPadding, PacketPaddingRate: packetPaddingRate, BytesPadding: r.bytesPadding, + HeaderBytesPadding: r.headerBytesPadding, BitratePadding: bitratePadding, PacketsOutOfOrder: r.packetsOutOfOrder, Frames: r.frames, @@ -882,7 +915,7 @@ func (r *RTPStats) getSnInfoOutOfOrderPtr(sn uint16) int { return (r.snInfoWritePtr - int(offset) - 1) & SnInfoMask } -func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, payloadSize uint16, marker bool) { +func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, hdrSize uint16, payloadSize uint16, marker bool) { writePtr := 0 ooo := (sn - r.highestSN) > (1 << 15) if !ooo { @@ -897,6 +930,7 @@ func (r *RTPStats) setSnInfo(sn uint16, pktSize uint16, payloadSize uint16, mark snInfo := &r.snInfos[writePtr] snInfo.pktSize = pktSize + snInfo.hdrSize = hdrSize snInfo.isPaddingOnly = payloadSize == 0 snInfo.marker = marker } @@ -922,7 +956,7 @@ func (r *RTPStats) isSnInfoLost(sn uint16) bool { return snInfo.pktSize == 0 } -func (r *RTPStats) getIntervalStats(startInclusive uint16, endExclusive uint16) (packets uint32, bytes uint64, packetsPadding uint32, bytesPadding uint64, packetsLost uint32, frames uint32) { +func (r *RTPStats) getIntervalStats(startInclusive uint16, endExclusive uint16) (intervalStats IntervalStats) { packetsNotFound := uint32(0) processSN := func(sn uint16) { readPtr := r.getSnInfoOutOfOrderPtr(sn) @@ -933,19 +967,21 @@ func (r *RTPStats) getIntervalStats(startInclusive uint16, endExclusive uint16) snInfo := &r.snInfos[readPtr] switch { case snInfo.pktSize == 0: - packetsLost++ + intervalStats.packetsLost++ case snInfo.isPaddingOnly: - packetsPadding++ - bytesPadding += uint64(snInfo.pktSize) + intervalStats.packetsPadding++ + intervalStats.bytesPadding += uint64(snInfo.pktSize) + intervalStats.headerBytesPadding += uint64(snInfo.hdrSize) default: - packets++ - bytes += uint64(snInfo.pktSize) + intervalStats.packets++ + intervalStats.bytes += uint64(snInfo.pktSize) + intervalStats.headerBytes += uint64(snInfo.hdrSize) } if snInfo.marker { - frames++ + intervalStats.frames++ } } @@ -1027,6 +1063,7 @@ func (r *RTPStats) getAndResetSnapshot(snapshotId uint32) (*Snapshot, *Snapshot) extStartSN: r.getExtHighestSNAdjusted() + 1, packetsDuplicate: r.packetsDuplicate, bytesDuplicate: r.bytesDuplicate, + headerBytesDuplicate: r.headerBytesDuplicate, packetsLostOverridden: r.packetsLostOverridden, nacks: r.nacks, plis: r.plis, @@ -1049,11 +1086,14 @@ func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats { packets := uint32(0) bytes := uint64(0) + headerBytes := uint64(0) packetsLost := uint32(0) packetsDuplicate := uint32(0) bytesDuplicate := uint64(0) + headerBytesDuplicate := uint64(0) packetsPadding := uint32(0) bytesPadding := uint64(0) + headerBytesPadding := uint64(0) packetsOutOfOrder := uint32(0) frames := uint32(0) keyFrames := uint32(0) @@ -1085,14 +1125,17 @@ func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats { packets += stats.Packets bytes += stats.Bytes + headerBytes += stats.HeaderBytes packetsLost += stats.PacketsLost packetsDuplicate += stats.PacketsDuplicate bytesDuplicate += stats.BytesDuplicate + headerBytesDuplicate += stats.HeaderBytesDuplicate packetsPadding += stats.PacketsPadding bytesPadding += stats.BytesPadding + headerBytesPadding += stats.HeaderBytesPadding packetsOutOfOrder += stats.PacketsOutOfOrder @@ -1163,6 +1206,7 @@ func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats { Packets: packets, PacketRate: packetRate, Bytes: bytes, + HeaderBytes: headerBytes, Bitrate: bitrate, PacketsLost: packetsLost, PacketLossRate: packetLostRate, @@ -1170,10 +1214,12 @@ func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats { PacketsDuplicate: packetsDuplicate, PacketDuplicateRate: packetDuplicateRate, BytesDuplicate: bytesDuplicate, + HeaderBytesDuplicate: headerBytesDuplicate, BitrateDuplicate: bitrateDuplicate, PacketsPadding: packetsPadding, PacketPaddingRate: packetPaddingRate, BytesPadding: bytesPadding, + HeaderBytesPadding: headerBytesPadding, BitratePadding: bitratePadding, PacketsOutOfOrder: packetsOutOfOrder, Frames: frames, diff --git a/pkg/sfu/buffer/rtpstats_test.go b/pkg/sfu/buffer/rtpstats_test.go index 1fbbc7a4e..b1be40650 100644 --- a/pkg/sfu/buffer/rtpstats_test.go +++ b/pkg/sfu/buffer/rtpstats_test.go @@ -122,8 +122,8 @@ func TestRTPStats_Update(t *testing.T) { require.Equal(t, uint32(3), r.packetsOutOfOrder) require.Equal(t, uint32(1), r.packetsDuplicate) require.Equal(t, uint32(16), r.packetsLost) - _, _, _, _, packetsLost, _ := r.getIntervalStats(uint16(r.extStartSN), uint16(r.getExtHighestSN()+1)) - require.Equal(t, uint32(16), packetsLost) + intervalStats := r.getIntervalStats(uint16(r.extStartSN), uint16(r.getExtHighestSN()+1)) + require.Equal(t, uint32(16), intervalStats.packetsLost) r.Stop() } diff --git a/pkg/sfu/connectionquality/connectionstats.go b/pkg/sfu/connectionquality/connectionstats.go index 54e77b921..828e24509 100644 --- a/pkg/sfu/connectionquality/connectionstats.go +++ b/pkg/sfu/connectionquality/connectionstats.go @@ -108,7 +108,7 @@ func (cs *ConnectionStats) updateScore(streams map[uint32]*buffer.StreamStatsWit Codec: cs.params.CodecName, PacketsExpected: maxAvailableLayerStats.Packets + maxAvailableLayerStats.PacketsPadding, PacketsLost: maxAvailableLayerStats.PacketsLost, - Bytes: maxAvailableLayerStats.Bytes, + Bytes: maxAvailableLayerStats.Bytes - maxAvailableLayerStats.HeaderBytes, // only use media payload size Frames: maxAvailableLayerStats.Frames, Jitter: maxAvailableLayerStats.JitterMax, Rtt: maxAvailableLayerStats.RttMax,