Make lite version of RTPStatsReceiver called RTPStatsReceiverLite. (#3065)

* Make lite version of RTPStatsReceiver called RTPStatsReceiverLite.

Refactor around that.

Will probably make some more flavors to have lighter versions still.

* update deps

* use MarshalLogArray

* use util
This commit is contained in:
Raja Subramanian
2024-10-05 10:50:25 +05:30
committed by GitHub
parent 737c85371b
commit 2491ee7c7c
12 changed files with 771 additions and 502 deletions

6
go.mod
View File

@@ -19,7 +19,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598
github.com/livekit/protocol v1.23.1-0.20241003084409-2406243b2f49
github.com/livekit/protocol v1.23.1-0.20241005050546-ec3b682c05d8
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9
github.com/mackerelio/go-osstat v0.2.5
github.com/magefile/mage v1.15.0
@@ -134,7 +134,7 @@ require (
golang.org/x/text v0.19.0 // indirect
golang.org/x/tools v0.26.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240924160255-9d4c2d233b61 // indirect
google.golang.org/grpc v1.67.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240930140551-af27646dc61f // indirect
google.golang.org/grpc v1.67.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

12
go.sum
View File

@@ -165,8 +165,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.23.1-0.20241003084409-2406243b2f49 h1:mk33tsjwZM8czksJbAj+xQfPfjnPo/RcGUYJLLfltOY=
github.com/livekit/protocol v1.23.1-0.20241003084409-2406243b2f49/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs=
github.com/livekit/protocol v1.23.1-0.20241005050546-ec3b682c05d8 h1:7cJS9u4VfOcvTeERxwk4yRTOIkwC44LWLSELI+OxWlc=
github.com/livekit/protocol v1.23.1-0.20241005050546-ec3b682c05d8/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs=
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 h1:33oBjGpVD9tYkDXQU42tnHl8eCX9G6PVUToBVuCUyOs=
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o=
@@ -482,10 +482,10 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 h1:wKguEg1hsxI2/L3hUYrpo1RVi48K+uTyzKqprwLXsb8=
google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142/go.mod h1:d6be+8HhtEtucleCbxpPW9PA9XwISACu8nvpPqF0BVo=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240924160255-9d4c2d233b61 h1:N9BgCIAUvn/M+p4NJccWPWb3BWh88+zyL0ll9HgbEeM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240924160255-9d4c2d233b61/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.67.0 h1:IdH9y6PF5MPSdAntIcpjQ+tXO41pcQsfZV2RxtQgVcw=
google.golang.org/grpc v1.67.0/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240930140551-af27646dc61f h1:cUMEy+8oS78BWIH9OWazBkzbr090Od9tWBNtZHkOhf0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240930140551-af27646dc61f/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E=
google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=

View File

@@ -72,8 +72,8 @@ func TestMetaDataLimits(t *testing.T) {
})
notExceedsLimitsSvc := map[string]*TestRoomService{
"metadata noe exceeds limits": newTestRoomService(config.LimitConfig{MaxMetadataSize: 5}),
"metadata no limits": newTestRoomService(config.LimitConfig{}), // no limits
"metadata exceeds limits": newTestRoomService(config.LimitConfig{MaxMetadataSize: 5}),
"metadata no limits": newTestRoomService(config.LimitConfig{}), // no limits
}
for n, s := range notExceedsLimitsSvc {

View File

@@ -731,10 +731,8 @@ func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime int64) rtpstats.RT
if b.nacker != nil {
b.nacker.Remove(p.SequenceNumber)
if flowState.HasLoss {
for lost := flowState.LossStartInclusive; lost != flowState.LossEndExclusive; lost++ {
b.nacker.Push(uint16(lost))
}
for lost := flowState.LossStartInclusive; lost != flowState.LossEndExclusive; lost++ {
b.nacker.Push(uint16(lost))
}
}

View File

@@ -142,11 +142,6 @@ type DownTrackState struct {
ForwarderState *livekit.RTPForwarderState
}
func (d DownTrackState) String() string {
return fmt.Sprintf("DownTrackState{rtpStats: %s, deltaSender: %d, forwarder: %s}",
d.RTPStats, d.DeltaStatsSenderSnapshotId, d.ForwarderState.String())
}
func (d DownTrackState) MarshalLogObject(e zapcore.ObjectEncoder) error {
e.AddObject("RTPStats", d.RTPStats)
e.AddUint32("DeltaStatsSenderSnapshotId", d.DeltaStatsSenderSnapshotId)
@@ -2087,11 +2082,11 @@ func (d *DownTrack) GetLastReceiverReportTime() time.Time {
}
func (d *DownTrack) GetTotalPacketsSent() uint64 {
return d.rtpStats.GetTotalPacketsPrimary()
return d.rtpStats.GetPacketsSeenMinusPadding()
}
func (d *DownTrack) GetNackStats() (totalPackets uint32, totalRepeatedNACKs uint32) {
totalPackets = uint32(d.rtpStats.GetTotalPacketsPrimary())
totalPackets = uint32(d.rtpStats.GetPacketsSeenMinusPadding())
totalRepeatedNACKs = d.totalRepeatedNACKs.Load()
return
}

View File

@@ -30,6 +30,7 @@ import (
"github.com/livekit/mediatransportutil"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/logger/zaputil"
"github.com/livekit/protocol/utils"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
@@ -207,20 +208,6 @@ func (r refInfo) MarshalLogObject(e zapcore.ObjectEncoder) error {
// -------------------------------------------------------------------
type wrappedRefInfosLogger struct {
*Forwarder
}
func (w wrappedRefInfosLogger) MarshalLogObject(e zapcore.ObjectEncoder) error {
for i, refInfo := range w.Forwarder.refInfos {
e.AddObject(fmt.Sprintf("%d", i), refInfo)
}
return nil
}
// -------------------------------------------------------------------
type Forwarder struct {
lock sync.RWMutex
codec webrtc.RTPCodecCapability
@@ -1667,7 +1654,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
"extRefTS", extRefTS,
"extLastTS", extLastTS,
"diffSeconds", math.Abs(diffSeconds),
"refInfos", wrappedRefInfosLogger{f},
"refInfos", zaputil.ObjectSlice(f.refInfos[:]),
)
}
// TODO-REMOVE-AFTER-DATA-COLLECTION
@@ -1682,7 +1669,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
"extRefTS", extRefTS,
"extLastTS", extLastTS,
"diffSeconds", math.Abs(diffSeconds),
"refInfos", wrappedRefInfosLogger{f},
"refInfos", zaputil.ObjectSlice(f.refInfos[:]),
)
}
@@ -1878,7 +1865,7 @@ func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer i
"could not switch feed",
"error", err,
"layer", layer,
"refInfos", wrappedRefInfosLogger{f},
"refInfos", zaputil.ObjectSlice(f.refInfos[:]),
"currentLayer", f.vls.GetCurrent(),
"targetLayer", f.vls.GetCurrent(),
"maxLayer", f.vls.GetMax(),
@@ -1891,7 +1878,7 @@ func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer i
"from", f.lastSSRC,
"to", extPkt.Packet.SSRC,
"layer", layer,
"refInfos", wrappedRefInfosLogger{f},
"refInfos", zaputil.ObjectSlice(f.refInfos[:]),
"currentLayer", f.vls.GetCurrent(),
"targetLayer", f.vls.GetCurrent(),
"maxLayer", f.vls.GetMax(),

View File

@@ -16,8 +16,6 @@ package rtpstats
import (
"errors"
"fmt"
"sync"
"time"
"go.uber.org/zap/zapcore"
@@ -25,15 +23,10 @@ import (
"github.com/livekit/mediatransportutil"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
)
const (
cGapHistogramNumBins = 101
cNumSequenceNumbers = 65536
cFirstSnapshotID = 1
cFirstPacketTimeAdjustWindow = 2 * time.Minute
cFirstPacketTimeAdjustThreshold = 15 * 1e9
@@ -42,20 +35,6 @@ const (
// -------------------------------------------------------
func RTPDriftToString(r *livekit.RTPDrift) string {
if r == nil {
return "-"
}
str := fmt.Sprintf("t: %+v|%+v|%.2fs", r.StartTime.AsTime().Format(time.UnixDate), r.EndTime.AsTime().Format(time.UnixDate), r.Duration)
str += fmt.Sprintf(", ts: %d|%d|%d", r.StartTimestamp, r.EndTimestamp, r.RtpClockTicks)
str += fmt.Sprintf(", d: %d|%.2fms", r.DriftSamples, r.DriftMs)
str += fmt.Sprintf(", cr: %.2f", r.ClockRate)
return str
}
// -------------------------------------------------------
type RTPDeltaInfo struct {
StartTime time.Time
EndTime time.Time
@@ -80,31 +59,22 @@ type RTPDeltaInfo struct {
}
type snapshot struct {
isValid bool
snapshotLite
startTime time.Time
extStartSN uint64
bytes uint64
headerBytes uint64
packetsPadding uint64
bytesPadding uint64
headerBytesPadding uint64
packetsDuplicate uint64
bytesDuplicate uint64
headerBytesDuplicate uint64
packetsOutOfOrder uint64
packetsLost uint64
packetsPadding uint64
bytesPadding uint64
headerBytesPadding uint64
frames uint32
nacks uint32
plis uint32
firs uint32
plis uint32
firs uint32
maxRtt uint32
maxJitter float64
@@ -166,21 +136,8 @@ func RTCPSenderReportPropagationDelay(rsrs *livekit.RTCPSenderReportState, passT
// ------------------------------------------------------------------
type RTPStatsParams struct {
ClockRate uint32
Logger logger.Logger
}
type rtpStatsBase struct {
params RTPStatsParams
logger logger.Logger
lock sync.RWMutex
initialized bool
startTime time.Time
endTime time.Time
*rtpStatsBaseLite
firstTime int64
firstTimeAdjustment time.Duration
@@ -189,27 +146,21 @@ type rtpStatsBase struct {
lastTransit uint64
lastJitterExtTimestamp uint64
bytes uint64
headerBytes uint64
headerBytes uint64
packetsDuplicate uint64
bytesDuplicate uint64
headerBytesDuplicate uint64
bytesPadding uint64
headerBytesPadding uint64
packetsDuplicate uint64
packetsPadding uint64
packetsOutOfOrder uint64
packetsLost uint64
packetsPadding uint64
bytesPadding uint64
headerBytesPadding uint64
frames uint32
jitter float64
maxJitter float64
gapHistogram [cGapHistogramNumBins]uint32
nacks uint32
nackAcks uint32
nackMisses uint32
nackRepeated uint32
@@ -238,10 +189,9 @@ type rtpStatsBase struct {
func newRTPStatsBase(params RTPStatsParams) *rtpStatsBase {
return &rtpStatsBase{
params: params,
logger: params.Logger,
nextSnapshotID: cFirstSnapshotID,
snapshots: make([]snapshot, 2),
rtpStatsBaseLite: newRTPStatsBaseLite(params),
nextSnapshotID: cFirstSnapshotID,
snapshots: make([]snapshot, 2),
}
}
@@ -250,10 +200,9 @@ func (r *rtpStatsBase) seed(from *rtpStatsBase) bool {
return false
}
r.initialized = from.initialized
r.startTime = from.startTime
// do not clone endTime as a non-zero endTime indicates an ended object
if !r.rtpStatsBaseLite.seed(from.rtpStatsBaseLite) {
return false
}
r.firstTime = from.firstTime
r.highestTime = from.highestTime
@@ -261,27 +210,19 @@ func (r *rtpStatsBase) seed(from *rtpStatsBase) bool {
r.lastTransit = from.lastTransit
r.lastJitterExtTimestamp = from.lastJitterExtTimestamp
r.bytes = from.bytes
r.headerBytes = from.headerBytes
r.bytesDuplicate = from.bytesDuplicate
r.headerBytesDuplicate = from.headerBytesDuplicate
r.packetsPadding = from.packetsPadding
r.bytesPadding = from.bytesPadding
r.headerBytesPadding = from.headerBytesPadding
r.packetsDuplicate = from.packetsDuplicate
r.packetsPadding = from.packetsPadding
r.packetsOutOfOrder = from.packetsOutOfOrder
r.packetsLost = from.packetsLost
r.frames = from.frames
r.jitter = from.jitter
r.maxJitter = from.maxJitter
r.gapHistogram = from.gapHistogram
r.nacks = from.nacks
r.nackAcks = from.nackAcks
r.nackMisses = from.nackMisses
r.nackRepeated = from.nackRepeated
@@ -310,17 +251,6 @@ func (r *rtpStatsBase) seed(from *rtpStatsBase) bool {
return true
}
func (r *rtpStatsBase) SetLogger(logger logger.Logger) {
r.logger = logger
}
func (r *rtpStatsBase) Stop() {
r.lock.Lock()
defer r.lock.Unlock()
r.endTime = time.Now()
}
func (r *rtpStatsBase) newSnapshotID(extStartSN uint64) uint32 {
id := r.nextSnapshotID
r.nextSnapshotID++
@@ -332,29 +262,11 @@ func (r *rtpStatsBase) newSnapshotID(extStartSN uint64) uint32 {
}
if r.initialized {
r.snapshots[id-cFirstSnapshotID] = r.initSnapshot(time.Now(), extStartSN)
r.snapshots[id-cFirstSnapshotID] = initSnapshot(time.Now(), extStartSN)
}
return id
}
func (r *rtpStatsBase) IsActive() bool {
r.lock.RLock()
defer r.lock.RUnlock()
return r.initialized && r.endTime.IsZero()
}
func (r *rtpStatsBase) UpdateNack(nackCount uint32) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
return
}
r.nacks += nackCount
}
func (r *rtpStatsBase) UpdateNackProcessed(nackAckCount uint32, nackMissCount uint32, nackRepeatedCount uint32) {
r.lock.Lock()
defer r.lock.Unlock()
@@ -558,14 +470,8 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *livekit.RTCPSenderRepo
return
}
func (r *rtpStatsBase) getTotalPacketsPrimary(extStartSN, extHighestSN uint64) uint64 {
packetsExpected := extHighestSN - extStartSN + 1
if r.packetsLost > packetsExpected {
// should not happen
return 0
}
packetsSeen := packetsExpected - r.packetsLost
func (r *rtpStatsBase) getPacketsSeenMinusPadding(extStartSN, extHighestSN uint64) uint64 {
packetsSeen := r.getPacketsSeen(extStartSN, extHighestSN)
if r.packetsPadding > packetsSeen {
return 0
}
@@ -573,7 +479,15 @@ func (r *rtpStatsBase) getTotalPacketsPrimary(extStartSN, extHighestSN uint64) u
return packetsSeen - r.packetsPadding
}
func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighestSN uint64) (deltaInfo *RTPDeltaInfo, err error, loggingFields []interface{}) {
func (r *rtpStatsBase) getPacketsSeenPlusDuplicates(extStartSN, extHighestSN uint64) uint64 {
return r.getPacketsSeen(extStartSN, extHighestSN) + r.packetsDuplicate
}
func (r *rtpStatsBase) deltaInfo(
snapshotID uint32,
extStartSN uint64,
extHighestSN uint64,
) (deltaInfo *RTPDeltaInfo, err error, loggingFields []interface{}) {
then, now := r.getAndResetSnapshot(snapshotID, extStartSN, extHighestSN)
if now == nil || then == nil {
return
@@ -651,59 +565,44 @@ func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighes
return
}
func (r *rtpStatsBase) MarshalLogObject(e zapcore.ObjectEncoder) error {
func (r *rtpStatsBase) marshalLogObject(
e zapcore.ObjectEncoder,
packetsExpected, packetsSeenMinusPadding uint64,
extStartTS, extHighestTS uint64,
) error {
if r == nil {
return nil
}
e.AddTime("startTime", r.startTime)
e.AddTime("endTime", r.endTime)
elapsedSeconds, err := r.rtpStatsBaseLite.marshalLogObject(e, packetsExpected, packetsSeenMinusPadding)
if err != nil {
return err
}
e.AddTime("firstTime", time.Unix(0, r.firstTime))
e.AddDuration("firstTimeAdjustment", r.firstTimeAdjustment)
e.AddTime("highestTime", time.Unix(0, r.highestTime))
e.AddUint64("bytes", r.bytes)
e.AddUint64("headerBytes", r.headerBytes)
e.AddUint64("packetsDuplicate", r.packetsDuplicate)
e.AddFloat64("packetsDuplicateRate", float64(r.packetsDuplicate)/elapsedSeconds)
e.AddUint64("bytesDuplicate", r.bytesDuplicate)
e.AddFloat64("bitrateDuplicate", float64(r.bytesDuplicate)*8.0/elapsedSeconds)
e.AddUint64("headerBytesDuplicate", r.headerBytesDuplicate)
e.AddUint64("packetsPadding", r.packetsPadding)
e.AddFloat64("packetsPaddingRate", float64(r.packetsPadding)/elapsedSeconds)
e.AddUint64("bytesPadding", r.bytesPadding)
e.AddFloat64("bitratePadding", float64(r.bytesPadding)*8.0/elapsedSeconds)
e.AddUint64("headerBytesPadding", r.headerBytesPadding)
e.AddUint64("packetsOutOfOrder", r.packetsOutOfOrder)
e.AddUint64("packetsLost", r.packetsLost)
e.AddUint32("frames", r.frames)
e.AddFloat64("frameRate", float64(r.frames)/elapsedSeconds)
e.AddFloat64("jitter", r.jitter)
e.AddFloat64("maxJitter", r.maxJitter)
hasLoss := false
first := true
str := "["
for burst, count := range r.gapHistogram {
if count == 0 {
continue
}
hasLoss = true
if !first {
str += ", "
}
first = false
str += fmt.Sprintf("%d:%d", burst+1, count)
}
str += "]"
if hasLoss {
e.AddString("gapHistogram", str)
}
e.AddUint32("nacks", r.nacks)
e.AddUint32("nackAcks", r.nackAcks)
e.AddUint32("nackMisses", r.nackMisses)
e.AddUint32("nackRepeated", r.nackRepeated)
@@ -725,190 +624,65 @@ func (r *rtpStatsBase) MarshalLogObject(e zapcore.ObjectEncoder) error {
e.AddObject("srFirst", WrappedRTCPSenderReportStateLogger{r.srFirst})
e.AddObject("srNewest", WrappedRTCPSenderReportStateLogger{r.srNewest})
packetDrift, ntpReportDrift, receivedReportDrift, rebasedReportDrift := r.getDrift(extStartTS, extHighestTS)
e.AddObject("packetDrift", wrappedRTPDriftLogger{packetDrift})
e.AddObject("ntpReportDrift", wrappedRTPDriftLogger{ntpReportDrift})
e.AddObject("receivedReportDrift", wrappedRTPDriftLogger{receivedReportDrift})
e.AddObject("rebasedReportDrift", wrappedRTPDriftLogger{rebasedReportDrift})
return nil
}
func (r *rtpStatsBase) toString(
extStartSN, extHighestSN, extStartTS, extHighestTS uint64,
packetsLost uint64,
jitter, maxJitter float64,
) string {
p := r.toProto(
extStartSN, extHighestSN, extStartTS, extHighestTS,
packetsLost,
jitter, maxJitter,
)
if p == nil {
return ""
}
expectedPackets := extHighestSN - extStartSN + 1
expectedPacketRate := float64(expectedPackets) / p.Duration
str := fmt.Sprintf("t: %+v|%+v|%.2fs", p.StartTime.AsTime().Format(time.UnixDate), p.EndTime.AsTime().Format(time.UnixDate), p.Duration)
str += fmt.Sprintf(", sn: %d|%d", extStartSN, extHighestSN)
str += fmt.Sprintf(", ep: %d|%.2f/s", expectedPackets, expectedPacketRate)
str += fmt.Sprintf(", p: %d|%.2f/s", p.Packets, p.PacketRate)
str += fmt.Sprintf(", l: %d|%.1f/s|%.2f%%", p.PacketsLost, p.PacketLossRate, p.PacketLossPercentage)
str += fmt.Sprintf(", b: %d|%.1fbps|%d", p.Bytes, p.Bitrate, p.HeaderBytes)
str += fmt.Sprintf(", f: %d|%.1f/s / %d|%+v", p.Frames, p.FrameRate, p.KeyFrames, p.LastKeyFrame.AsTime().Format(time.UnixDate))
str += fmt.Sprintf(", d: %d|%.2f/s", p.PacketsDuplicate, p.PacketDuplicateRate)
str += fmt.Sprintf(", bd: %d|%.1fbps|%d", p.BytesDuplicate, p.BitrateDuplicate, p.HeaderBytesDuplicate)
str += fmt.Sprintf(", pp: %d|%.2f/s", p.PacketsPadding, p.PacketPaddingRate)
str += fmt.Sprintf(", bp: %d|%.1fbps|%d", p.BytesPadding, p.BitratePadding, p.HeaderBytesPadding)
str += fmt.Sprintf(", o: %d", p.PacketsOutOfOrder)
str += fmt.Sprintf(", c: %d, j: %d(%.1fus)|%d(%.1fus)", r.params.ClockRate, uint32(jitter), p.JitterCurrent, uint32(maxJitter), p.JitterMax)
if len(p.GapHistogram) != 0 {
first := true
str += ", gh:["
for burst, count := range p.GapHistogram {
if !first {
str += ", "
}
first = false
str += fmt.Sprintf("%d:%d", burst, count)
}
str += "]"
}
str += ", n:"
str += fmt.Sprintf("%d|%d|%d|%d", p.Nacks, p.NackAcks, p.NackMisses, p.NackRepeated)
str += ", pli:"
str += fmt.Sprintf("%d|%+v / %d|%+v",
p.Plis, p.LastPli.AsTime().Format(time.UnixDate),
p.LayerLockPlis, p.LastLayerLockPli.AsTime().Format(time.UnixDate),
)
str += ", fir:"
str += fmt.Sprintf("%d|%+v", p.Firs, p.LastFir.AsTime().Format(time.UnixDate))
str += ", rtt(ms):"
str += fmt.Sprintf("%d|%d", p.RttCurrent, p.RttMax)
str += fmt.Sprintf(", pd: %s, nrd: %s, rxrd: %s, rbrd: %s",
RTPDriftToString(p.PacketDrift),
RTPDriftToString(p.NtpReportDrift),
RTPDriftToString(p.ReceivedReportDrift),
RTPDriftToString(p.RebasedReportDrift),
)
return str
}
func (r *rtpStatsBase) toProto(
extStartSN, extHighestSN, extStartTS, extHighestTS uint64,
packetsLost uint64,
packetsExpected, packetsSeenMinusPadding, packetsLost uint64,
extStartTS, extHighestTS uint64,
jitter, maxJitter float64,
) *livekit.RTPStats {
if r.startTime.IsZero() {
p := r.rtpStatsBaseLite.toProto(packetsExpected, packetsSeenMinusPadding, packetsLost)
if p == nil {
return nil
}
endTime := r.endTime
if endTime.IsZero() {
endTime = time.Now()
}
elapsed := endTime.Sub(r.startTime).Seconds()
if elapsed == 0.0 {
return nil
}
p.HeaderBytes = r.headerBytes
packets := r.getTotalPacketsPrimary(extStartSN, extHighestSN)
packetRate := float64(packets) / elapsed
bitrate := float64(r.bytes) * 8.0 / elapsed
p.PacketsDuplicate = uint32(r.packetsDuplicate)
p.PacketDuplicateRate = float64(r.packetsDuplicate) / p.Duration
p.BytesDuplicate = r.bytesDuplicate
p.BitrateDuplicate = float64(r.bytesDuplicate) * 8.0 / p.Duration
p.HeaderBytesDuplicate = r.headerBytesDuplicate
frameRate := float64(r.frames) / elapsed
p.PacketsPadding = uint32(r.packetsPadding)
p.PacketPaddingRate = float64(r.packetsPadding) / p.Duration
p.BytesPadding = r.bytesPadding
p.BitratePadding = float64(r.bytesPadding) * 8.0 / p.Duration
p.HeaderBytesPadding = r.headerBytesPadding
packetsExpected := extHighestSN - extStartSN + 1
packetLostRate := float64(packetsLost) / elapsed
packetLostPercentage := float32(packetsLost) / float32(packetsExpected) * 100.0
p.Frames = r.frames
p.FrameRate = float64(r.frames) / p.Duration
packetDuplicateRate := float64(r.packetsDuplicate) / elapsed
bitrateDuplicate := float64(r.bytesDuplicate) * 8.0 / elapsed
p.KeyFrames = r.keyFrames
p.LastKeyFrame = timestamppb.New(r.lastKeyFrame)
packetPaddingRate := float64(r.packetsPadding) / elapsed
bitratePadding := float64(r.bytesPadding) * 8.0 / elapsed
p.JitterCurrent = jitter / float64(r.params.ClockRate) * 1e6
p.JitterMax = maxJitter / float64(r.params.ClockRate) * 1e6
jitterTime := jitter / float64(r.params.ClockRate) * 1e6
maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6
p.NackAcks = r.nackAcks
p.NackMisses = r.nackMisses
p.NackRepeated = r.nackRepeated
packetDrift, ntpReportDrift, receivedReportDrift, rebasedReportDrift := r.getDrift(extStartTS, extHighestTS)
p.Plis = r.plis
p.LastPli = timestamppb.New(r.lastPli)
p := &livekit.RTPStats{
StartTime: timestamppb.New(r.startTime),
EndTime: timestamppb.New(endTime),
Duration: elapsed,
Packets: uint32(packets),
PacketRate: packetRate,
Bytes: r.bytes,
HeaderBytes: r.headerBytes,
Bitrate: bitrate,
PacketsLost: uint32(packetsLost),
PacketLossRate: packetLostRate,
PacketLossPercentage: packetLostPercentage,
PacketsDuplicate: uint32(r.packetsDuplicate),
PacketDuplicateRate: packetDuplicateRate,
BytesDuplicate: r.bytesDuplicate,
HeaderBytesDuplicate: r.headerBytesDuplicate,
BitrateDuplicate: bitrateDuplicate,
PacketsPadding: uint32(r.packetsPadding),
PacketPaddingRate: packetPaddingRate,
BytesPadding: r.bytesPadding,
HeaderBytesPadding: r.headerBytesPadding,
BitratePadding: bitratePadding,
PacketsOutOfOrder: uint32(r.packetsOutOfOrder),
Frames: r.frames,
FrameRate: frameRate,
KeyFrames: r.keyFrames,
LastKeyFrame: timestamppb.New(r.lastKeyFrame),
JitterCurrent: jitterTime,
JitterMax: maxJitterTime,
Nacks: r.nacks,
NackAcks: r.nackAcks,
NackMisses: r.nackMisses,
NackRepeated: r.nackRepeated,
Plis: r.plis,
LastPli: timestamppb.New(r.lastPli),
LayerLockPlis: r.layerLockPlis,
LastLayerLockPli: timestamppb.New(r.lastLayerLockPli),
Firs: r.firs,
LastFir: timestamppb.New(r.lastFir),
RttCurrent: r.rtt,
RttMax: r.maxRtt,
PacketDrift: packetDrift,
NtpReportDrift: ntpReportDrift,
RebasedReportDrift: rebasedReportDrift,
ReceivedReportDrift: receivedReportDrift,
}
p.LayerLockPlis = r.layerLockPlis
p.LastLayerLockPli = timestamppb.New(r.lastLayerLockPli)
gapsPresent := false
for i := 0; i < len(r.gapHistogram); i++ {
if r.gapHistogram[i] == 0 {
continue
}
p.Firs = r.firs
p.LastFir = timestamppb.New(r.lastFir)
gapsPresent = true
break
}
if gapsPresent {
p.GapHistogram = make(map[int32]uint32, len(r.gapHistogram))
for i := 0; i < len(r.gapHistogram); i++ {
if r.gapHistogram[i] == 0 {
continue
}
p.GapHistogram[int32(i+1)] = r.gapHistogram[i]
}
}
p.RttCurrent = r.rtt
p.RttMax = r.maxRtt
p.PacketDrift, p.NtpReportDrift, p.ReceivedReportDrift, p.RebasedReportDrift = r.getDrift(extStartTS, extHighestTS)
return p
}
@@ -957,7 +731,7 @@ func (r *rtpStatsBase) getAndResetSnapshot(snapshotID uint32, extStartSN uint64,
idx := snapshotID - cFirstSnapshotID
then := r.snapshots[idx]
if !then.isValid {
then = r.initSnapshot(r.startTime, extStartSN)
then = initSnapshot(r.startTime, extStartSN)
r.snapshots[idx] = then
}
@@ -967,7 +741,12 @@ func (r *rtpStatsBase) getAndResetSnapshot(snapshotID uint32, extStartSN uint64,
return &then, &now
}
func (r *rtpStatsBase) getDrift(extStartTS, extHighestTS uint64) (packetDrift *livekit.RTPDrift, ntpReportDrift *livekit.RTPDrift, receivedReportDrift *livekit.RTPDrift, rebasedReportDrift *livekit.RTPDrift) {
func (r *rtpStatsBase) getDrift(extStartTS, extHighestTS uint64) (
packetDrift *livekit.RTPDrift,
ntpReportDrift *livekit.RTPDrift,
receivedReportDrift *livekit.RTPDrift,
rebasedReportDrift *livekit.RTPDrift,
) {
if r.firstTime != 0 {
elapsed := r.highestTime - r.firstTime
rtpClockTicks := extHighestTS - extStartTS
@@ -1055,31 +834,17 @@ func (r *rtpStatsBase) updateGapHistogram(gap int) {
}
}
func (r *rtpStatsBase) initSnapshot(startTime time.Time, extStartSN uint64) snapshot {
return snapshot{
isValid: true,
startTime: startTime,
extStartSN: extStartSN,
}
}
func (r *rtpStatsBase) getSnapshot(startTime time.Time, extStartSN uint64) snapshot {
return snapshot{
isValid: true,
startTime: startTime,
extStartSN: extStartSN,
bytes: r.bytes,
snapshotLite: r.getSnapshotLite(startTime, extStartSN),
headerBytes: r.headerBytes,
packetsPadding: r.packetsPadding,
bytesPadding: r.bytesPadding,
headerBytesPadding: r.headerBytesPadding,
packetsDuplicate: r.packetsDuplicate,
bytesDuplicate: r.bytesDuplicate,
headerBytesDuplicate: r.headerBytesDuplicate,
packetsLost: r.packetsLost,
packetsOutOfOrder: r.packetsOutOfOrder,
packetsPadding: r.packetsPadding,
bytesPadding: r.bytesPadding,
headerBytesPadding: r.headerBytesPadding,
frames: r.frames,
nacks: r.nacks,
plis: r.plis,
firs: r.firs,
maxRtt: r.rtt,
@@ -1089,6 +854,12 @@ func (r *rtpStatsBase) getSnapshot(startTime time.Time, extStartSN uint64) snaps
// ----------------------------------
func initSnapshot(startTime time.Time, extStartSN uint64) snapshot {
return snapshot{
snapshotLite: initSnapshotLite(startTime, extStartSN),
}
}
func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats {
return utils.AggregateRTPStats(statsList, cGapHistogramNumBins)
}

View File

@@ -0,0 +1,411 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rtpstats
import (
"errors"
"fmt"
"sync"
"time"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"go.uber.org/zap/zapcore"
"google.golang.org/protobuf/types/known/timestamppb"
)
const (
cGapHistogramNumBins = 101
cNumSequenceNumbers = 65536
cFirstSnapshotID = 1
)
// -------------------------------------------------------
type RTPDeltaInfoLite struct {
StartTime time.Time
EndTime time.Time
Packets uint32
Bytes uint64
PacketsLost uint32
PacketsOutOfOrder uint32
Nacks uint32
}
type snapshotLite struct {
isValid bool
startTime time.Time
extStartSN uint64
bytes uint64
packetsOutOfOrder uint64
packetsLost uint64
nacks uint32
}
// ------------------------------------------------------------------
type RTPStatsParams struct {
ClockRate uint32
Logger logger.Logger
}
type rtpStatsBaseLite struct {
params RTPStatsParams
logger logger.Logger
lock sync.RWMutex
initialized bool
startTime time.Time
endTime time.Time
bytes uint64
packetsOutOfOrder uint64
packetsLost uint64
gapHistogram [cGapHistogramNumBins]uint32
nacks uint32
nextSnapshotLiteID uint32
snapshotLites []snapshotLite
}
func newRTPStatsBaseLite(params RTPStatsParams) *rtpStatsBaseLite {
return &rtpStatsBaseLite{
params: params,
logger: params.Logger,
nextSnapshotLiteID: cFirstSnapshotID,
snapshotLites: make([]snapshotLite, 2),
}
}
func (r *rtpStatsBaseLite) seed(from *rtpStatsBaseLite) bool {
if from == nil || !from.initialized {
return false
}
r.initialized = from.initialized
r.startTime = from.startTime
// do not clone endTime as a non-zero endTime indicates an ended object
r.bytes = from.bytes
r.packetsOutOfOrder = from.packetsOutOfOrder
r.packetsLost = from.packetsLost
r.gapHistogram = from.gapHistogram
r.nacks = from.nacks
r.nextSnapshotLiteID = from.nextSnapshotLiteID
r.snapshotLites = make([]snapshotLite, cap(from.snapshotLites))
copy(r.snapshotLites, from.snapshotLites)
return true
}
func (r *rtpStatsBaseLite) SetLogger(logger logger.Logger) {
r.logger = logger
}
func (r *rtpStatsBaseLite) Stop() {
r.lock.Lock()
defer r.lock.Unlock()
r.endTime = time.Now()
}
func (r *rtpStatsBaseLite) newSnapshotLiteID(extStartSN uint64) uint32 {
id := r.nextSnapshotLiteID
r.nextSnapshotLiteID++
if cap(r.snapshotLites) < int(r.nextSnapshotLiteID-cFirstSnapshotID) {
snapshotLites := make([]snapshotLite, r.nextSnapshotLiteID-cFirstSnapshotID)
copy(snapshotLites, r.snapshotLites)
r.snapshotLites = snapshotLites
}
if r.initialized {
r.snapshotLites[id-cFirstSnapshotID] = initSnapshotLite(time.Now(), extStartSN)
}
return id
}
func (r *rtpStatsBaseLite) IsActive() bool {
r.lock.RLock()
defer r.lock.RUnlock()
return r.initialized && r.endTime.IsZero()
}
func (r *rtpStatsBaseLite) UpdateNack(nackCount uint32) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
return
}
r.nacks += nackCount
}
func (r *rtpStatsBaseLite) getPacketsSeen(extStartSN, extHighestSN uint64) uint64 {
packetsExpected := getPacketsExpected(extStartSN, extHighestSN)
if r.packetsLost > packetsExpected {
// should not happen
return 0
}
return packetsExpected - r.packetsLost
}
func (r *rtpStatsBaseLite) deltaInfoLite(
snapshotLiteID uint32,
extStartSN uint64,
extHighestSN uint64,
) (deltaInfoLite *RTPDeltaInfoLite, err error, loggingFields []interface{}) {
then, now := r.getAndResetSnapshotLite(snapshotLiteID, extStartSN, extHighestSN)
if now == nil || then == nil {
return
}
startTime := then.startTime
endTime := now.startTime
packetsExpected := now.extStartSN - then.extStartSN
if then.extStartSN > extHighestSN {
packetsExpected = 0
}
if packetsExpected > cNumSequenceNumbers {
loggingFields = []interface{}{
"startSN", then.extStartSN,
"endSN", now.extStartSN,
"packetsExpected", packetsExpected,
"startTime", startTime,
"endTime", endTime,
"duration", endTime.Sub(startTime).String(),
}
err = errors.New("too many packets expected in delta lite")
return
}
if packetsExpected == 0 {
deltaInfoLite = &RTPDeltaInfoLite{
StartTime: startTime,
EndTime: endTime,
}
return
}
packetsLost := uint32(now.packetsLost - then.packetsLost)
if int32(packetsLost) < 0 {
packetsLost = 0
}
deltaInfoLite = &RTPDeltaInfoLite{
StartTime: startTime,
EndTime: endTime,
Packets: uint32(packetsExpected),
Bytes: now.bytes - then.bytes,
PacketsLost: packetsLost,
PacketsOutOfOrder: uint32(now.packetsOutOfOrder - then.packetsOutOfOrder),
Nacks: now.nacks - then.nacks,
}
return
}
func (r *rtpStatsBaseLite) marshalLogObject(e zapcore.ObjectEncoder, packetsExpected, packetsSeenMinusPadding uint64) (float64, error) {
if r == nil {
return 0, errors.New("no object")
}
endTime := r.endTime
if endTime.IsZero() {
endTime = time.Now()
}
elapsed := endTime.Sub(r.startTime)
if elapsed == 0 {
return 0, errors.New("no time elapsed")
}
elapsedSeconds := elapsed.Seconds()
e.AddTime("startTime", r.startTime)
e.AddTime("endTime", r.endTime)
e.AddDuration("elapsed", elapsed)
e.AddUint64("packetsExpected", packetsExpected)
e.AddFloat64("packetsExpectedRate", float64(packetsExpected)/elapsedSeconds)
e.AddUint64("packetsSeenPrimary", packetsSeenMinusPadding)
e.AddFloat64("packetsSeenPrimaryRate", float64(packetsSeenMinusPadding)/elapsedSeconds)
e.AddUint64("bytes", r.bytes)
e.AddFloat64("bitrate", float64(r.bytes)*8.0/elapsedSeconds)
e.AddUint64("packetsOutOfOrder", r.packetsOutOfOrder)
e.AddUint64("packetsLost", r.packetsLost)
e.AddFloat64("packetsLostRate", float64(r.packetsLost)/elapsedSeconds)
e.AddFloat32("packetLostPercentage", float32(r.packetsLost)/float32(packetsExpected)*100.0)
hasLoss := false
first := true
str := "["
for burst, count := range r.gapHistogram {
if count == 0 {
continue
}
hasLoss = true
if !first {
str += ", "
}
first = false
str += fmt.Sprintf("%d:%d", burst+1, count)
}
str += "]"
if hasLoss {
e.AddString("gapHistogram", str)
}
e.AddUint32("nacks", r.nacks)
return elapsedSeconds, nil
}
func (r *rtpStatsBaseLite) toProto(packetsExpected, packetsSeenMinusPadding, packetsLost uint64) *livekit.RTPStats {
if r.startTime.IsZero() {
return nil
}
endTime := r.endTime
if endTime.IsZero() {
endTime = time.Now()
}
elapsed := endTime.Sub(r.startTime).Seconds()
if elapsed == 0.0 {
return nil
}
packetRate := float64(packetsSeenMinusPadding) / elapsed
bitrate := float64(r.bytes) * 8.0 / elapsed
packetLostRate := float64(packetsLost) / elapsed
packetLostPercentage := float32(packetsLost) / float32(packetsExpected) * 100.0
p := &livekit.RTPStats{
StartTime: timestamppb.New(r.startTime),
EndTime: timestamppb.New(endTime),
Duration: elapsed,
Packets: uint32(packetsSeenMinusPadding),
PacketRate: packetRate,
Bytes: r.bytes,
Bitrate: bitrate,
PacketsLost: uint32(packetsLost),
PacketLossRate: packetLostRate,
PacketLossPercentage: packetLostPercentage,
PacketsOutOfOrder: uint32(r.packetsOutOfOrder),
Nacks: r.nacks,
}
gapsPresent := false
for i := 0; i < len(r.gapHistogram); i++ {
if r.gapHistogram[i] == 0 {
continue
}
gapsPresent = true
break
}
if gapsPresent {
p.GapHistogram = make(map[int32]uint32, len(r.gapHistogram))
for i := 0; i < len(r.gapHistogram); i++ {
if r.gapHistogram[i] == 0 {
continue
}
p.GapHistogram[int32(i+1)] = r.gapHistogram[i]
}
}
return p
}
func (r *rtpStatsBaseLite) getAndResetSnapshotLite(snapshotLiteID uint32, extStartSN uint64, extHighestSN uint64) (*snapshotLite, *snapshotLite) {
if !r.initialized {
return nil, nil
}
idx := snapshotLiteID - cFirstSnapshotID
then := r.snapshotLites[idx]
if !then.isValid {
then = initSnapshotLite(r.startTime, extStartSN)
r.snapshotLites[idx] = then
}
// snapshot now
now := r.getSnapshotLite(time.Now(), extHighestSN+1)
r.snapshotLites[idx] = now
return &then, &now
}
func (r *rtpStatsBaseLite) updateGapHistogram(gap int) {
if gap < 2 {
return
}
missing := gap - 1
if missing > len(r.gapHistogram) {
r.gapHistogram[len(r.gapHistogram)-1]++
} else {
r.gapHistogram[missing-1]++
}
}
func (r *rtpStatsBaseLite) getSnapshotLite(startTime time.Time, extStartSN uint64) snapshotLite {
return snapshotLite{
isValid: true,
startTime: startTime,
extStartSN: extStartSN,
bytes: r.bytes,
packetsOutOfOrder: r.packetsOutOfOrder,
packetsLost: r.packetsLost,
nacks: r.nacks,
}
}
// ----------------------------------
func initSnapshotLite(startTime time.Time, extStartSN uint64) snapshotLite {
return snapshotLite{
isValid: true,
startTime: startTime,
extStartSN: extStartSN,
}
}
func getPacketsExpected(extStartSN, extHighestSN uint64) uint64 {
return extHighestSN - extStartSN + 1
}
// ----------------------------------

View File

@@ -42,7 +42,6 @@ const (
type RTPFlowState struct {
IsNotHandled bool
HasLoss bool
LossStartInclusive uint64
LossEndExclusive uint64
@@ -59,7 +58,6 @@ func (r *RTPFlowState) MarshalLogObject(e zapcore.ObjectEncoder) error {
}
e.AddBool("IsNotHandled", r.IsNotHandled)
e.AddBool("HasLoss", r.HasLoss)
e.AddUint64("LossStartInclusive", r.LossStartInclusive)
e.AddUint64("LossEndExclusive", r.LossEndExclusive)
e.AddBool("IsDuplicate", r.IsDuplicate)
@@ -189,7 +187,7 @@ func (r *RTPStatsReceiver) Update(
// initialize snapshots if any
for i := uint32(0); i < r.nextSnapshotID-cFirstSnapshotID; i++ {
r.snapshots[i] = r.initSnapshot(r.startTime, r.sequenceNumber.GetExtendedStart())
r.snapshots[i] = initSnapshot(r.startTime, r.sequenceNumber.GetExtendedStart())
}
r.logger.Debugw(
@@ -331,11 +329,8 @@ func (r *RTPStatsReceiver) Update(
r.highestTime = packetTime
}
if gapSN > 1 {
flowState.HasLoss = true
flowState.LossStartInclusive = resSN.PreExtendedHighest + 1
flowState.LossEndExclusive = resSN.ExtendedVal
}
flowState.LossStartInclusive = resSN.PreExtendedHighest + 1
flowState.LossEndExclusive = resSN.ExtendedVal
}
flowState.ExtSequenceNumber = resSN.ExtendedVal
flowState.ExtTimestamp = resTS.ExtendedVal
@@ -652,25 +647,19 @@ func (r *RTPStatsReceiver) MarshalLogObject(e zapcore.ObjectEncoder) error {
return lockedRTPStatsReceiverLogEncoder{r}.MarshalLogObject(e)
}
func (r *RTPStatsReceiver) String() string {
r.lock.RLock()
defer r.lock.RUnlock()
return r.toString(
r.sequenceNumber.GetExtendedStart(), r.sequenceNumber.GetExtendedHighest(), r.timestamp.GetExtendedStart(), r.timestamp.GetExtendedHighest(),
r.packetsLost,
r.jitter, r.maxJitter,
)
}
func (r *RTPStatsReceiver) ToProto() *livekit.RTPStats {
r.lock.RLock()
defer r.lock.RUnlock()
extStartSN, extHighestSN := r.sequenceNumber.GetExtendedStart(), r.sequenceNumber.GetExtendedHighest()
return r.toProto(
r.sequenceNumber.GetExtendedStart(), r.sequenceNumber.GetExtendedHighest(), r.timestamp.GetExtendedStart(), r.timestamp.GetExtendedHighest(),
getPacketsExpected(extStartSN, extHighestSN),
r.getPacketsSeenMinusPadding(extStartSN, extHighestSN),
r.packetsLost,
r.jitter, r.maxJitter,
r.timestamp.GetExtendedStart(),
r.timestamp.GetExtendedHighest(),
r.jitter,
r.maxJitter,
)
}
@@ -713,21 +702,24 @@ func (r lockedRTPStatsReceiverLogEncoder) MarshalLogObject(e zapcore.ObjectEncod
return nil
}
e.AddObject("base", r.rtpStatsBase)
e.AddUint64("extStartSN", r.sequenceNumber.GetExtendedStart())
e.AddUint64("extHighestSN", r.sequenceNumber.GetExtendedHighest())
extStartSN, extHighestSN := r.sequenceNumber.GetExtendedStart(), r.sequenceNumber.GetExtendedHighest()
extStartTS, extHighestTS := r.timestamp.GetExtendedStart(), r.timestamp.GetExtendedHighest()
if err := r.rtpStatsBase.marshalLogObject(
e,
getPacketsExpected(extStartSN, extHighestSN),
r.getPacketsSeenMinusPadding(extStartSN, extHighestSN),
extStartTS,
extHighestTS,
); err != nil {
return err
}
e.AddUint64("extStartSN", extStartSN)
e.AddUint64("extHighestSN", extHighestSN)
e.AddUint64("extStartTS", extStartTS)
e.AddUint64("extHighestTS", extHighestTS)
e.AddObject("propagationDelayEstimator", r.propagationDelayEstimator)
packetDrift, ntpReportDrift, receivedReportDrift, rebasedReportDrift := r.getDrift(extStartTS, extHighestTS)
e.AddObject("packetDrift", wrappedRTPDriftLogger{packetDrift})
e.AddObject("ntpReportDrift", wrappedRTPDriftLogger{ntpReportDrift})
e.AddObject("receivedReportDrift", wrappedRTPDriftLogger{receivedReportDrift})
e.AddObject("rebasedReportDrift", wrappedRTPDriftLogger{rebasedReportDrift})
return nil
}

View File

@@ -0,0 +1,178 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rtpstats
import (
"time"
"go.uber.org/zap/zapcore"
"github.com/livekit/livekit-server/pkg/sfu/utils"
"github.com/livekit/protocol/livekit"
)
type RTPFlowStateLite struct {
IsNotHandled bool
LossStartInclusive uint64
LossEndExclusive uint64
ExtSequenceNumber uint64
}
func (r *RTPFlowStateLite) MarshalLogObject(e zapcore.ObjectEncoder) error {
if r == nil {
return nil
}
e.AddBool("IsNotHandled", r.IsNotHandled)
e.AddUint64("LossStartInclusive", r.LossStartInclusive)
e.AddUint64("LossEndExclusive", r.LossEndExclusive)
e.AddUint64("ExtSequenceNumber", r.ExtSequenceNumber)
return nil
}
// ---------------------------------------------------------------------
type RTPStatsReceiverLite struct {
*rtpStatsBaseLite
sequenceNumber *utils.WrapAround[uint16, uint64]
}
func NewRTPStatsReceiverLite(params RTPStatsParams) *RTPStatsReceiverLite {
return &RTPStatsReceiverLite{
rtpStatsBaseLite: newRTPStatsBaseLite(params),
sequenceNumber: utils.NewWrapAround[uint16, uint64](utils.WrapAroundParams{IsRestartAllowed: false}),
}
}
func (r *RTPStatsReceiverLite) NewSnapshotLiteId() uint32 {
r.lock.Lock()
defer r.lock.Unlock()
return r.newSnapshotLiteID(r.sequenceNumber.GetExtendedHighest())
}
func (r *RTPStatsReceiverLite) Update(packetTime int64, packetSize int, sequenceNumber uint16) (flowStateLite RTPFlowStateLite) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
flowStateLite.IsNotHandled = true
return
}
var resSN utils.WrapAroundUpdateResult[uint64]
if !r.initialized {
r.initialized = true
r.startTime = time.Now()
resSN = r.sequenceNumber.Update(sequenceNumber)
// initialize lite snapshots if any
for i := uint32(0); i < r.nextSnapshotLiteID-cFirstSnapshotID; i++ {
r.snapshotLites[i] = initSnapshotLite(r.startTime, r.sequenceNumber.GetExtendedStart())
}
r.logger.Debugw(
"rtp receiver lite stream start",
"rtpStats", lockedRTPStatsReceiverLiteLogEncoder{r},
)
} else {
resSN = r.sequenceNumber.Update(sequenceNumber)
if resSN.IsUnhandled {
flowStateLite.IsNotHandled = true
return
}
}
gapSN := int64(resSN.ExtendedVal - resSN.PreExtendedHighest)
if gapSN <= 0 { // duplicate OR out-of-order
r.packetsOutOfOrder++ // counting duplicate as out-of-order
r.packetsLost--
} else { // in-order
r.updateGapHistogram(int(gapSN))
r.packetsLost += uint64(gapSN - 1)
flowStateLite.LossStartInclusive = resSN.PreExtendedHighest + 1
flowStateLite.LossEndExclusive = resSN.ExtendedVal
}
flowStateLite.ExtSequenceNumber = resSN.ExtendedVal
r.bytes += uint64(packetSize)
return
}
func (r *RTPStatsReceiverLite) DeltaInfoLite(snapshotLiteID uint32) *RTPDeltaInfoLite {
r.lock.Lock()
defer r.lock.Unlock()
deltaInfoLite, err, loggingFields := r.deltaInfoLite(
snapshotLiteID,
r.sequenceNumber.GetExtendedStart(),
r.sequenceNumber.GetExtendedHighest(),
)
if err != nil {
r.logger.Infow(err.Error(), append(loggingFields, "rtpStats", lockedRTPStatsReceiverLiteLogEncoder{r})...)
}
return deltaInfoLite
}
func (r *RTPStatsReceiverLite) MarshalLogObject(e zapcore.ObjectEncoder) error {
if r == nil {
return nil
}
r.lock.RLock()
defer r.lock.RUnlock()
return lockedRTPStatsReceiverLiteLogEncoder{r}.MarshalLogObject(e)
}
func (r *RTPStatsReceiverLite) ToProto() *livekit.RTPStats {
r.lock.RLock()
defer r.lock.RUnlock()
return r.rtpStatsBaseLite.toProto(r.sequenceNumber.GetExtendedStart(), r.sequenceNumber.GetExtendedHighest(), r.packetsLost)
}
// ----------------------------------
type lockedRTPStatsReceiverLiteLogEncoder struct {
*RTPStatsReceiverLite
}
func (r lockedRTPStatsReceiverLiteLogEncoder) MarshalLogObject(e zapcore.ObjectEncoder) error {
if r.RTPStatsReceiverLite == nil {
return nil
}
extStartSN, extHighestSN := r.sequenceNumber.GetExtendedStart(), r.sequenceNumber.GetExtendedHighest()
if _, err := r.rtpStatsBaseLite.marshalLogObject(
e,
getPacketsExpected(extStartSN, extHighestSN),
getPacketsExpected(extStartSN, extHighestSN),
); err != nil {
return err
}
e.AddUint64("extStartSN", r.sequenceNumber.GetExtendedStart())
e.AddUint64("extHighestSN", r.sequenceNumber.GetExtendedHighest())
return nil
}
// ----------------------------------

View File

@@ -15,7 +15,6 @@
package rtpstats
import (
"fmt"
"math/rand"
"testing"
"time"
@@ -36,56 +35,6 @@ func getPacket(sn uint16, ts uint32, payloadSize int) *rtp.Packet {
}
}
func Test_RTPStatsReceiver(t *testing.T) {
clockRate := uint32(90000)
r := NewRTPStatsReceiver(RTPStatsParams{
ClockRate: clockRate,
Logger: logger.GetLogger(),
})
totalDuration := 5 * time.Second
bitrate := 1000000
packetSize := 1000
pps := (((bitrate + 7) / 8) + packetSize - 1) / packetSize
framerate := 30
sleep := 1000 / framerate
packetsPerFrame := (pps + framerate - 1) / framerate
sequenceNumber := uint16(rand.Float64() * float64(1<<16))
timestamp := uint32(rand.Float64() * float64(1<<32))
now := time.Now()
startTime := now
lastFrameTime := now
for now.Sub(startTime) < totalDuration {
timestamp += uint32(now.Sub(lastFrameTime).Seconds() * float64(clockRate))
for i := 0; i < packetsPerFrame; i++ {
packet := getPacket(sequenceNumber, timestamp, packetSize)
r.Update(
time.Now().UnixNano(),
packet.Header.SequenceNumber,
packet.Header.Timestamp,
packet.Header.Marker,
packet.Header.MarshalSize(),
len(packet.Payload),
0,
)
if (sequenceNumber % 100) == 0 {
jump := uint16(rand.Float64() * 120.0)
sequenceNumber += jump
} else {
sequenceNumber++
}
}
lastFrameTime = now
time.Sleep(time.Duration(sleep) * time.Millisecond)
now = time.Now()
}
r.Stop()
fmt.Printf("%s\n", r.String())
}
func Test_RTPStatsReceiver_Update(t *testing.T) {
clockRate := uint32(90000)
r := NewRTPStatsReceiver(RTPStatsParams{
@@ -105,7 +54,6 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
len(packet.Payload),
0,
)
require.False(t, flowState.HasLoss)
require.True(t, r.initialized)
require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest())
require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest()))
@@ -125,7 +73,6 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
len(packet.Payload),
0,
)
require.False(t, flowState.HasLoss)
require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest())
require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest()))
require.Equal(t, timestamp, r.timestamp.GetHighest())
@@ -142,7 +89,6 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
len(packet.Payload),
0,
)
require.False(t, flowState.HasLoss)
require.True(t, flowState.IsNotHandled)
require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest())
require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest()))
@@ -162,7 +108,6 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
len(packet.Payload),
0,
)
require.False(t, flowState.HasLoss)
require.True(t, flowState.IsNotHandled)
require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest())
require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest()))
@@ -184,7 +129,6 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
len(packet.Payload),
0,
)
require.True(t, flowState.HasLoss)
require.Equal(t, uint64(sequenceNumber-9), flowState.LossStartInclusive)
require.Equal(t, uint64(sequenceNumber), flowState.LossEndExclusive)
require.Equal(t, uint64(9), r.packetsLost)
@@ -200,7 +144,6 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
len(packet.Payload),
0,
)
require.False(t, flowState.HasLoss)
require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest())
require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest()))
require.Equal(t, timestamp, r.timestamp.GetHighest())
@@ -223,7 +166,6 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
len(packet.Payload),
0,
)
require.True(t, flowState.HasLoss)
require.Equal(t, uint64(sequenceNumber-1), flowState.LossStartInclusive)
require.Equal(t, uint64(sequenceNumber), flowState.LossEndExclusive)
require.Equal(t, uint64(9), r.packetsLost)
@@ -242,7 +184,6 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
len(packet.Payload),
0,
)
require.False(t, flowState.HasLoss)
require.Equal(t, uint64(8), r.packetsLost)
require.Equal(t, uint64(2), r.packetsOutOfOrder)
require.True(t, r.history.IsSet(uint64(sequenceNumber)))
@@ -260,7 +201,6 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
len(packet.Payload),
25,
)
require.False(t, flowState.HasLoss)
require.Equal(t, uint64(8), r.packetsLost)
require.Equal(t, uint64(2), r.packetsOutOfOrder)
require.True(t, r.history.IsSet(uint64(sequenceNumber)))

View File

@@ -25,7 +25,6 @@ import (
"github.com/livekit/mediatransportutil"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
)
const (
@@ -228,7 +227,7 @@ func (r *RTPStatsSender) NewSenderSnapshotId() uint32 {
}
if r.initialized {
r.senderSnapshots[id-cFirstSnapshotID] = r.initSenderSnapshot(time.Now(), r.extHighestSN)
r.senderSnapshots[id-cFirstSnapshotID] = initSenderSnapshot(time.Now(), r.extHighestSN)
}
return id
}
@@ -270,10 +269,10 @@ func (r *RTPStatsSender) Update(
// initialize snapshots if any
for i := uint32(0); i < r.nextSnapshotID-cFirstSnapshotID; i++ {
r.snapshots[i] = r.initSnapshot(r.startTime, r.extStartSN)
r.snapshots[i] = initSnapshot(r.startTime, r.extStartSN)
}
for i := uint32(0); i < r.nextSenderSnapshotID-cFirstSnapshotID; i++ {
r.senderSnapshots[i] = r.initSenderSnapshot(r.startTime, r.extStartSN)
r.senderSnapshots[i] = initSenderSnapshot(r.startTime, r.extStartSN)
}
r.logger.Debugw(
@@ -436,11 +435,11 @@ func (r *RTPStatsSender) Update(
}
}
func (r *RTPStatsSender) GetTotalPacketsPrimary() uint64 {
func (r *RTPStatsSender) GetPacketsSeenMinusPadding() uint64 {
r.lock.RLock()
defer r.lock.RUnlock()
return r.getTotalPacketsPrimary(r.extStartSN, r.extHighestSN)
return r.getPacketsSeenMinusPadding(r.extStartSN, r.extHighestSN)
}
func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt uint32, isRttChanged bool) {
@@ -628,7 +627,7 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *livek
nowRTPExt = publisherSRData.RtpTimestampExt - tsOffset + uint64(timeSincePublisherSRAdjusted.Nanoseconds()*int64(r.params.ClockRate)/1e9)
}
packetCount := uint32(r.getTotalPacketsPrimary(r.extStartSN, r.extHighestSN) + r.packetsDuplicate + r.packetsPadding)
packetCount := uint32(r.getPacketsSeenPlusDuplicates(r.extStartSN, r.extHighestSN))
octetCount := r.bytes + r.bytesDuplicate + r.bytesPadding
srData := &livekit.RTCPSenderReportState{
NtpTimestamp: uint64(nowNTP),
@@ -809,25 +808,18 @@ func (r *RTPStatsSender) MarshalLogObject(e zapcore.ObjectEncoder) error {
return lockedRTPStatsSenderLogEncoder{r}.MarshalLogObject(e)
}
func (r *RTPStatsSender) String() string {
r.lock.RLock()
defer r.lock.RUnlock()
return r.toString(
r.extStartSN, r.extHighestSN, r.extStartTS, r.extHighestTS,
r.packetsLostFromRR,
r.jitterFromRR, r.maxJitterFromRR,
)
}
func (r *RTPStatsSender) ToProto() *livekit.RTPStats {
r.lock.RLock()
defer r.lock.RUnlock()
return r.toProto(
r.extStartSN, r.extHighestSN, r.extStartTS, r.extHighestTS,
getPacketsExpected(r.extStartSN, r.extHighestSN),
r.getPacketsSeenMinusPadding(r.extStartSN, r.extHighestSN),
r.packetsLostFromRR,
r.jitterFromRR, r.maxJitterFromRR,
r.extStartTS,
r.extHighestTS,
r.jitterFromRR,
r.maxJitterFromRR,
)
}
@@ -839,7 +831,7 @@ func (r *RTPStatsSender) getAndResetSenderSnapshot(senderSnapshotID uint32) (*se
idx := senderSnapshotID - cFirstSnapshotID
then := r.senderSnapshots[idx]
if !then.isValid {
then = r.initSenderSnapshot(r.startTime, r.extStartSN)
then = initSenderSnapshot(r.startTime, r.extStartSN)
r.senderSnapshots[idx] = then
}
@@ -849,15 +841,6 @@ func (r *RTPStatsSender) getAndResetSenderSnapshot(senderSnapshotID uint32) (*se
return &then, &now
}
func (r *RTPStatsSender) initSenderSnapshot(startTime time.Time, extStartSN uint64) senderSnapshot {
return senderSnapshot{
isValid: true,
startTime: startTime,
extStartSN: extStartSN,
extLastRRSN: extStartSN - 1,
}
}
func (r *RTPStatsSender) getSenderSnapshot(startTime time.Time, s *senderSnapshot) senderSnapshot {
if s == nil {
return senderSnapshot{}
@@ -1006,7 +989,16 @@ func (r lockedRTPStatsSenderLogEncoder) MarshalLogObject(e zapcore.ObjectEncoder
return nil
}
e.AddObject("base", r.rtpStatsBase)
if err := r.rtpStatsBase.marshalLogObject(
e,
getPacketsExpected(r.extStartSN, r.extHighestSN),
r.getPacketsSeenMinusPadding(r.extStartSN, r.extHighestSN),
r.extStartTS,
r.extHighestTS,
); err != nil {
return err
}
e.AddUint64("extStartSN", r.extStartSN)
e.AddUint64("extHighestSN", r.extHighestSN)
e.AddUint64("extStartTS", r.extStartTS)
@@ -1017,11 +1009,16 @@ func (r lockedRTPStatsSenderLogEncoder) MarshalLogObject(e zapcore.ObjectEncoder
e.AddUint64("packetsLostFromRR", r.packetsLostFromRR)
e.AddFloat64("jitterFromRR", r.jitterFromRR)
e.AddFloat64("maxJitterFromRR", r.maxJitterFromRR)
packetDrift, ntpReportDrift, receivedReportDrift, rebasedReportDrift := r.getDrift(r.extStartTS, r.extHighestTS)
e.AddObject("packetDrift", logger.Proto(packetDrift))
e.AddObject("ntpReportDrift", logger.Proto(ntpReportDrift))
e.AddObject("receivedReportDrift", logger.Proto(receivedReportDrift))
e.AddObject("rebasedReportDrift", logger.Proto(rebasedReportDrift))
return nil
}
// -------------------------------------------------------------------
func initSenderSnapshot(startTime time.Time, extStartSN uint64) senderSnapshot {
return senderSnapshot{
isValid: true,
startTime: startTime,
extStartSN: extStartSN,
extLastRRSN: extStartSN - 1,
}
}