diff --git a/go.mod b/go.mod index 195418769..ff9d23379 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 - github.com/livekit/protocol v1.23.1-0.20241003084409-2406243b2f49 + github.com/livekit/protocol v1.23.1-0.20241005050546-ec3b682c05d8 github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 github.com/mackerelio/go-osstat v0.2.5 github.com/magefile/mage v1.15.0 @@ -134,7 +134,7 @@ require ( golang.org/x/text v0.19.0 // indirect golang.org/x/tools v0.26.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240924160255-9d4c2d233b61 // indirect - google.golang.org/grpc v1.67.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240930140551-af27646dc61f // indirect + google.golang.org/grpc v1.67.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 2f76414fb..131be5be2 100644 --- a/go.sum +++ b/go.sum @@ -165,8 +165,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.23.1-0.20241003084409-2406243b2f49 h1:mk33tsjwZM8czksJbAj+xQfPfjnPo/RcGUYJLLfltOY= -github.com/livekit/protocol v1.23.1-0.20241003084409-2406243b2f49/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs= +github.com/livekit/protocol v1.23.1-0.20241005050546-ec3b682c05d8 h1:7cJS9u4VfOcvTeERxwk4yRTOIkwC44LWLSELI+OxWlc= +github.com/livekit/protocol v1.23.1-0.20241005050546-ec3b682c05d8/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs= github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 h1:33oBjGpVD9tYkDXQU42tnHl8eCX9G6PVUToBVuCUyOs= github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= @@ -482,10 +482,10 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 h1:wKguEg1hsxI2/L3hUYrpo1RVi48K+uTyzKqprwLXsb8= google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142/go.mod h1:d6be+8HhtEtucleCbxpPW9PA9XwISACu8nvpPqF0BVo= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240924160255-9d4c2d233b61 h1:N9BgCIAUvn/M+p4NJccWPWb3BWh88+zyL0ll9HgbEeM= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240924160255-9d4c2d233b61/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= -google.golang.org/grpc v1.67.0 h1:IdH9y6PF5MPSdAntIcpjQ+tXO41pcQsfZV2RxtQgVcw= -google.golang.org/grpc v1.67.0/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240930140551-af27646dc61f h1:cUMEy+8oS78BWIH9OWazBkzbr090Od9tWBNtZHkOhf0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240930140551-af27646dc61f/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= +google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= +google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= diff --git a/pkg/service/roomservice_test.go b/pkg/service/roomservice_test.go index 8969ee84c..d34d5103e 100644 --- a/pkg/service/roomservice_test.go +++ b/pkg/service/roomservice_test.go @@ -72,8 +72,8 @@ func TestMetaDataLimits(t *testing.T) { }) notExceedsLimitsSvc := map[string]*TestRoomService{ - "metadata noe exceeds limits": newTestRoomService(config.LimitConfig{MaxMetadataSize: 5}), - "metadata no limits": newTestRoomService(config.LimitConfig{}), // no limits + "metadata exceeds limits": newTestRoomService(config.LimitConfig{MaxMetadataSize: 5}), + "metadata no limits": newTestRoomService(config.LimitConfig{}), // no limits } for n, s := range notExceedsLimitsSvc { diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 00baf1f84..3a7d8287e 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -731,10 +731,8 @@ func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime int64) rtpstats.RT if b.nacker != nil { b.nacker.Remove(p.SequenceNumber) - if flowState.HasLoss { - for lost := flowState.LossStartInclusive; lost != flowState.LossEndExclusive; lost++ { - b.nacker.Push(uint16(lost)) - } + for lost := flowState.LossStartInclusive; lost != flowState.LossEndExclusive; lost++ { + b.nacker.Push(uint16(lost)) } } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index c6c7f9b30..b2b3a5095 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -142,11 +142,6 @@ type DownTrackState struct { ForwarderState *livekit.RTPForwarderState } -func (d DownTrackState) String() string { - return fmt.Sprintf("DownTrackState{rtpStats: %s, deltaSender: %d, forwarder: %s}", - d.RTPStats, d.DeltaStatsSenderSnapshotId, d.ForwarderState.String()) -} - func (d DownTrackState) MarshalLogObject(e zapcore.ObjectEncoder) error { e.AddObject("RTPStats", d.RTPStats) e.AddUint32("DeltaStatsSenderSnapshotId", d.DeltaStatsSenderSnapshotId) @@ -2087,11 +2082,11 @@ func (d *DownTrack) GetLastReceiverReportTime() time.Time { } func (d *DownTrack) GetTotalPacketsSent() uint64 { - return d.rtpStats.GetTotalPacketsPrimary() + return d.rtpStats.GetPacketsSeenMinusPadding() } func (d *DownTrack) GetNackStats() (totalPackets uint32, totalRepeatedNACKs uint32) { - totalPackets = uint32(d.rtpStats.GetTotalPacketsPrimary()) + totalPackets = uint32(d.rtpStats.GetPacketsSeenMinusPadding()) totalRepeatedNACKs = d.totalRepeatedNACKs.Load() return } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index c1c3e5c2a..d92ddb50e 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -30,6 +30,7 @@ import ( "github.com/livekit/mediatransportutil" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/logger/zaputil" "github.com/livekit/protocol/utils" "github.com/livekit/livekit-server/pkg/sfu/buffer" @@ -207,20 +208,6 @@ func (r refInfo) MarshalLogObject(e zapcore.ObjectEncoder) error { // ------------------------------------------------------------------- -type wrappedRefInfosLogger struct { - *Forwarder -} - -func (w wrappedRefInfosLogger) MarshalLogObject(e zapcore.ObjectEncoder) error { - for i, refInfo := range w.Forwarder.refInfos { - e.AddObject(fmt.Sprintf("%d", i), refInfo) - } - - return nil -} - -// ------------------------------------------------------------------- - type Forwarder struct { lock sync.RWMutex codec webrtc.RTPCodecCapability @@ -1667,7 +1654,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e "extRefTS", extRefTS, "extLastTS", extLastTS, "diffSeconds", math.Abs(diffSeconds), - "refInfos", wrappedRefInfosLogger{f}, + "refInfos", zaputil.ObjectSlice(f.refInfos[:]), ) } // TODO-REMOVE-AFTER-DATA-COLLECTION @@ -1682,7 +1669,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e "extRefTS", extRefTS, "extLastTS", extLastTS, "diffSeconds", math.Abs(diffSeconds), - "refInfos", wrappedRefInfosLogger{f}, + "refInfos", zaputil.ObjectSlice(f.refInfos[:]), ) } @@ -1878,7 +1865,7 @@ func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer i "could not switch feed", "error", err, "layer", layer, - "refInfos", wrappedRefInfosLogger{f}, + "refInfos", zaputil.ObjectSlice(f.refInfos[:]), "currentLayer", f.vls.GetCurrent(), "targetLayer", f.vls.GetCurrent(), "maxLayer", f.vls.GetMax(), @@ -1891,7 +1878,7 @@ func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer i "from", f.lastSSRC, "to", extPkt.Packet.SSRC, "layer", layer, - "refInfos", wrappedRefInfosLogger{f}, + "refInfos", zaputil.ObjectSlice(f.refInfos[:]), "currentLayer", f.vls.GetCurrent(), "targetLayer", f.vls.GetCurrent(), "maxLayer", f.vls.GetMax(), diff --git a/pkg/sfu/rtpstats/rtpstats_base.go b/pkg/sfu/rtpstats/rtpstats_base.go index 38881facd..21bfe8b9d 100644 --- a/pkg/sfu/rtpstats/rtpstats_base.go +++ b/pkg/sfu/rtpstats/rtpstats_base.go @@ -16,8 +16,6 @@ package rtpstats import ( "errors" - "fmt" - "sync" "time" "go.uber.org/zap/zapcore" @@ -25,15 +23,10 @@ import ( "github.com/livekit/mediatransportutil" "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" ) const ( - cGapHistogramNumBins = 101 - cNumSequenceNumbers = 65536 - cFirstSnapshotID = 1 - cFirstPacketTimeAdjustWindow = 2 * time.Minute cFirstPacketTimeAdjustThreshold = 15 * 1e9 @@ -42,20 +35,6 @@ const ( // ------------------------------------------------------- -func RTPDriftToString(r *livekit.RTPDrift) string { - if r == nil { - return "-" - } - - str := fmt.Sprintf("t: %+v|%+v|%.2fs", r.StartTime.AsTime().Format(time.UnixDate), r.EndTime.AsTime().Format(time.UnixDate), r.Duration) - str += fmt.Sprintf(", ts: %d|%d|%d", r.StartTimestamp, r.EndTimestamp, r.RtpClockTicks) - str += fmt.Sprintf(", d: %d|%.2fms", r.DriftSamples, r.DriftMs) - str += fmt.Sprintf(", cr: %.2f", r.ClockRate) - return str -} - -// ------------------------------------------------------- - type RTPDeltaInfo struct { StartTime time.Time EndTime time.Time @@ -80,31 +59,22 @@ type RTPDeltaInfo struct { } type snapshot struct { - isValid bool + snapshotLite - startTime time.Time - - extStartSN uint64 - bytes uint64 headerBytes uint64 - packetsPadding uint64 - bytesPadding uint64 - headerBytesPadding uint64 - packetsDuplicate uint64 bytesDuplicate uint64 headerBytesDuplicate uint64 - packetsOutOfOrder uint64 - - packetsLost uint64 + packetsPadding uint64 + bytesPadding uint64 + headerBytesPadding uint64 frames uint32 - nacks uint32 - plis uint32 - firs uint32 + plis uint32 + firs uint32 maxRtt uint32 maxJitter float64 @@ -166,21 +136,8 @@ func RTCPSenderReportPropagationDelay(rsrs *livekit.RTCPSenderReportState, passT // ------------------------------------------------------------------ -type RTPStatsParams struct { - ClockRate uint32 - Logger logger.Logger -} - type rtpStatsBase struct { - params RTPStatsParams - logger logger.Logger - - lock sync.RWMutex - - initialized bool - - startTime time.Time - endTime time.Time + *rtpStatsBaseLite firstTime int64 firstTimeAdjustment time.Duration @@ -189,27 +146,21 @@ type rtpStatsBase struct { lastTransit uint64 lastJitterExtTimestamp uint64 - bytes uint64 - headerBytes uint64 + headerBytes uint64 + + packetsDuplicate uint64 bytesDuplicate uint64 headerBytesDuplicate uint64 - bytesPadding uint64 - headerBytesPadding uint64 - packetsDuplicate uint64 - packetsPadding uint64 - packetsOutOfOrder uint64 - - packetsLost uint64 + packetsPadding uint64 + bytesPadding uint64 + headerBytesPadding uint64 frames uint32 jitter float64 maxJitter float64 - gapHistogram [cGapHistogramNumBins]uint32 - - nacks uint32 nackAcks uint32 nackMisses uint32 nackRepeated uint32 @@ -238,10 +189,9 @@ type rtpStatsBase struct { func newRTPStatsBase(params RTPStatsParams) *rtpStatsBase { return &rtpStatsBase{ - params: params, - logger: params.Logger, - nextSnapshotID: cFirstSnapshotID, - snapshots: make([]snapshot, 2), + rtpStatsBaseLite: newRTPStatsBaseLite(params), + nextSnapshotID: cFirstSnapshotID, + snapshots: make([]snapshot, 2), } } @@ -250,10 +200,9 @@ func (r *rtpStatsBase) seed(from *rtpStatsBase) bool { return false } - r.initialized = from.initialized - - r.startTime = from.startTime - // do not clone endTime as a non-zero endTime indicates an ended object + if !r.rtpStatsBaseLite.seed(from.rtpStatsBaseLite) { + return false + } r.firstTime = from.firstTime r.highestTime = from.highestTime @@ -261,27 +210,19 @@ func (r *rtpStatsBase) seed(from *rtpStatsBase) bool { r.lastTransit = from.lastTransit r.lastJitterExtTimestamp = from.lastJitterExtTimestamp - r.bytes = from.bytes r.headerBytes = from.headerBytes - r.bytesDuplicate = from.bytesDuplicate + r.headerBytesDuplicate = from.headerBytesDuplicate + + r.packetsPadding = from.packetsPadding r.bytesPadding = from.bytesPadding r.headerBytesPadding = from.headerBytesPadding - r.packetsDuplicate = from.packetsDuplicate - r.packetsPadding = from.packetsPadding - - r.packetsOutOfOrder = from.packetsOutOfOrder - - r.packetsLost = from.packetsLost r.frames = from.frames r.jitter = from.jitter r.maxJitter = from.maxJitter - r.gapHistogram = from.gapHistogram - - r.nacks = from.nacks r.nackAcks = from.nackAcks r.nackMisses = from.nackMisses r.nackRepeated = from.nackRepeated @@ -310,17 +251,6 @@ func (r *rtpStatsBase) seed(from *rtpStatsBase) bool { return true } -func (r *rtpStatsBase) SetLogger(logger logger.Logger) { - r.logger = logger -} - -func (r *rtpStatsBase) Stop() { - r.lock.Lock() - defer r.lock.Unlock() - - r.endTime = time.Now() -} - func (r *rtpStatsBase) newSnapshotID(extStartSN uint64) uint32 { id := r.nextSnapshotID r.nextSnapshotID++ @@ -332,29 +262,11 @@ func (r *rtpStatsBase) newSnapshotID(extStartSN uint64) uint32 { } if r.initialized { - r.snapshots[id-cFirstSnapshotID] = r.initSnapshot(time.Now(), extStartSN) + r.snapshots[id-cFirstSnapshotID] = initSnapshot(time.Now(), extStartSN) } return id } -func (r *rtpStatsBase) IsActive() bool { - r.lock.RLock() - defer r.lock.RUnlock() - - return r.initialized && r.endTime.IsZero() -} - -func (r *rtpStatsBase) UpdateNack(nackCount uint32) { - r.lock.Lock() - defer r.lock.Unlock() - - if !r.endTime.IsZero() { - return - } - - r.nacks += nackCount -} - func (r *rtpStatsBase) UpdateNackProcessed(nackAckCount uint32, nackMissCount uint32, nackRepeatedCount uint32) { r.lock.Lock() defer r.lock.Unlock() @@ -558,14 +470,8 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *livekit.RTCPSenderRepo return } -func (r *rtpStatsBase) getTotalPacketsPrimary(extStartSN, extHighestSN uint64) uint64 { - packetsExpected := extHighestSN - extStartSN + 1 - if r.packetsLost > packetsExpected { - // should not happen - return 0 - } - - packetsSeen := packetsExpected - r.packetsLost +func (r *rtpStatsBase) getPacketsSeenMinusPadding(extStartSN, extHighestSN uint64) uint64 { + packetsSeen := r.getPacketsSeen(extStartSN, extHighestSN) if r.packetsPadding > packetsSeen { return 0 } @@ -573,7 +479,15 @@ func (r *rtpStatsBase) getTotalPacketsPrimary(extStartSN, extHighestSN uint64) u return packetsSeen - r.packetsPadding } -func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighestSN uint64) (deltaInfo *RTPDeltaInfo, err error, loggingFields []interface{}) { +func (r *rtpStatsBase) getPacketsSeenPlusDuplicates(extStartSN, extHighestSN uint64) uint64 { + return r.getPacketsSeen(extStartSN, extHighestSN) + r.packetsDuplicate +} + +func (r *rtpStatsBase) deltaInfo( + snapshotID uint32, + extStartSN uint64, + extHighestSN uint64, +) (deltaInfo *RTPDeltaInfo, err error, loggingFields []interface{}) { then, now := r.getAndResetSnapshot(snapshotID, extStartSN, extHighestSN) if now == nil || then == nil { return @@ -651,59 +565,44 @@ func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighes return } -func (r *rtpStatsBase) MarshalLogObject(e zapcore.ObjectEncoder) error { +func (r *rtpStatsBase) marshalLogObject( + e zapcore.ObjectEncoder, + packetsExpected, packetsSeenMinusPadding uint64, + extStartTS, extHighestTS uint64, +) error { if r == nil { return nil } - e.AddTime("startTime", r.startTime) - e.AddTime("endTime", r.endTime) + elapsedSeconds, err := r.rtpStatsBaseLite.marshalLogObject(e, packetsExpected, packetsSeenMinusPadding) + if err != nil { + return err + } + e.AddTime("firstTime", time.Unix(0, r.firstTime)) e.AddDuration("firstTimeAdjustment", r.firstTimeAdjustment) e.AddTime("highestTime", time.Unix(0, r.highestTime)) - e.AddUint64("bytes", r.bytes) e.AddUint64("headerBytes", r.headerBytes) e.AddUint64("packetsDuplicate", r.packetsDuplicate) + e.AddFloat64("packetsDuplicateRate", float64(r.packetsDuplicate)/elapsedSeconds) e.AddUint64("bytesDuplicate", r.bytesDuplicate) + e.AddFloat64("bitrateDuplicate", float64(r.bytesDuplicate)*8.0/elapsedSeconds) e.AddUint64("headerBytesDuplicate", r.headerBytesDuplicate) e.AddUint64("packetsPadding", r.packetsPadding) + e.AddFloat64("packetsPaddingRate", float64(r.packetsPadding)/elapsedSeconds) e.AddUint64("bytesPadding", r.bytesPadding) + e.AddFloat64("bitratePadding", float64(r.bytesPadding)*8.0/elapsedSeconds) e.AddUint64("headerBytesPadding", r.headerBytesPadding) - e.AddUint64("packetsOutOfOrder", r.packetsOutOfOrder) - - e.AddUint64("packetsLost", r.packetsLost) - e.AddUint32("frames", r.frames) + e.AddFloat64("frameRate", float64(r.frames)/elapsedSeconds) e.AddFloat64("jitter", r.jitter) e.AddFloat64("maxJitter", r.maxJitter) - hasLoss := false - first := true - str := "[" - for burst, count := range r.gapHistogram { - if count == 0 { - continue - } - - hasLoss = true - - if !first { - str += ", " - } - first = false - str += fmt.Sprintf("%d:%d", burst+1, count) - } - str += "]" - if hasLoss { - e.AddString("gapHistogram", str) - } - - e.AddUint32("nacks", r.nacks) e.AddUint32("nackAcks", r.nackAcks) e.AddUint32("nackMisses", r.nackMisses) e.AddUint32("nackRepeated", r.nackRepeated) @@ -725,190 +624,65 @@ func (r *rtpStatsBase) MarshalLogObject(e zapcore.ObjectEncoder) error { e.AddObject("srFirst", WrappedRTCPSenderReportStateLogger{r.srFirst}) e.AddObject("srNewest", WrappedRTCPSenderReportStateLogger{r.srNewest}) + + packetDrift, ntpReportDrift, receivedReportDrift, rebasedReportDrift := r.getDrift(extStartTS, extHighestTS) + e.AddObject("packetDrift", wrappedRTPDriftLogger{packetDrift}) + e.AddObject("ntpReportDrift", wrappedRTPDriftLogger{ntpReportDrift}) + e.AddObject("receivedReportDrift", wrappedRTPDriftLogger{receivedReportDrift}) + e.AddObject("rebasedReportDrift", wrappedRTPDriftLogger{rebasedReportDrift}) return nil } -func (r *rtpStatsBase) toString( - extStartSN, extHighestSN, extStartTS, extHighestTS uint64, - packetsLost uint64, - jitter, maxJitter float64, -) string { - p := r.toProto( - extStartSN, extHighestSN, extStartTS, extHighestTS, - packetsLost, - jitter, maxJitter, - ) - if p == nil { - return "" - } - - expectedPackets := extHighestSN - extStartSN + 1 - expectedPacketRate := float64(expectedPackets) / p.Duration - - str := fmt.Sprintf("t: %+v|%+v|%.2fs", p.StartTime.AsTime().Format(time.UnixDate), p.EndTime.AsTime().Format(time.UnixDate), p.Duration) - - str += fmt.Sprintf(", sn: %d|%d", extStartSN, extHighestSN) - str += fmt.Sprintf(", ep: %d|%.2f/s", expectedPackets, expectedPacketRate) - - str += fmt.Sprintf(", p: %d|%.2f/s", p.Packets, p.PacketRate) - str += fmt.Sprintf(", l: %d|%.1f/s|%.2f%%", p.PacketsLost, p.PacketLossRate, p.PacketLossPercentage) - str += fmt.Sprintf(", b: %d|%.1fbps|%d", p.Bytes, p.Bitrate, p.HeaderBytes) - str += fmt.Sprintf(", f: %d|%.1f/s / %d|%+v", p.Frames, p.FrameRate, p.KeyFrames, p.LastKeyFrame.AsTime().Format(time.UnixDate)) - - str += fmt.Sprintf(", d: %d|%.2f/s", p.PacketsDuplicate, p.PacketDuplicateRate) - str += fmt.Sprintf(", bd: %d|%.1fbps|%d", p.BytesDuplicate, p.BitrateDuplicate, p.HeaderBytesDuplicate) - - str += fmt.Sprintf(", pp: %d|%.2f/s", p.PacketsPadding, p.PacketPaddingRate) - str += fmt.Sprintf(", bp: %d|%.1fbps|%d", p.BytesPadding, p.BitratePadding, p.HeaderBytesPadding) - - str += fmt.Sprintf(", o: %d", p.PacketsOutOfOrder) - - str += fmt.Sprintf(", c: %d, j: %d(%.1fus)|%d(%.1fus)", r.params.ClockRate, uint32(jitter), p.JitterCurrent, uint32(maxJitter), p.JitterMax) - - if len(p.GapHistogram) != 0 { - first := true - str += ", gh:[" - for burst, count := range p.GapHistogram { - if !first { - str += ", " - } - first = false - str += fmt.Sprintf("%d:%d", burst, count) - } - str += "]" - } - - str += ", n:" - str += fmt.Sprintf("%d|%d|%d|%d", p.Nacks, p.NackAcks, p.NackMisses, p.NackRepeated) - - str += ", pli:" - str += fmt.Sprintf("%d|%+v / %d|%+v", - p.Plis, p.LastPli.AsTime().Format(time.UnixDate), - p.LayerLockPlis, p.LastLayerLockPli.AsTime().Format(time.UnixDate), - ) - - str += ", fir:" - str += fmt.Sprintf("%d|%+v", p.Firs, p.LastFir.AsTime().Format(time.UnixDate)) - - str += ", rtt(ms):" - str += fmt.Sprintf("%d|%d", p.RttCurrent, p.RttMax) - - str += fmt.Sprintf(", pd: %s, nrd: %s, rxrd: %s, rbrd: %s", - RTPDriftToString(p.PacketDrift), - RTPDriftToString(p.NtpReportDrift), - RTPDriftToString(p.ReceivedReportDrift), - RTPDriftToString(p.RebasedReportDrift), - ) - return str -} - func (r *rtpStatsBase) toProto( - extStartSN, extHighestSN, extStartTS, extHighestTS uint64, - packetsLost uint64, + packetsExpected, packetsSeenMinusPadding, packetsLost uint64, + extStartTS, extHighestTS uint64, jitter, maxJitter float64, ) *livekit.RTPStats { - if r.startTime.IsZero() { + p := r.rtpStatsBaseLite.toProto(packetsExpected, packetsSeenMinusPadding, packetsLost) + if p == nil { return nil } - endTime := r.endTime - if endTime.IsZero() { - endTime = time.Now() - } - elapsed := endTime.Sub(r.startTime).Seconds() - if elapsed == 0.0 { - return nil - } + p.HeaderBytes = r.headerBytes - packets := r.getTotalPacketsPrimary(extStartSN, extHighestSN) - packetRate := float64(packets) / elapsed - bitrate := float64(r.bytes) * 8.0 / elapsed + p.PacketsDuplicate = uint32(r.packetsDuplicate) + p.PacketDuplicateRate = float64(r.packetsDuplicate) / p.Duration + p.BytesDuplicate = r.bytesDuplicate + p.BitrateDuplicate = float64(r.bytesDuplicate) * 8.0 / p.Duration + p.HeaderBytesDuplicate = r.headerBytesDuplicate - frameRate := float64(r.frames) / elapsed + p.PacketsPadding = uint32(r.packetsPadding) + p.PacketPaddingRate = float64(r.packetsPadding) / p.Duration + p.BytesPadding = r.bytesPadding + p.BitratePadding = float64(r.bytesPadding) * 8.0 / p.Duration + p.HeaderBytesPadding = r.headerBytesPadding - packetsExpected := extHighestSN - extStartSN + 1 - packetLostRate := float64(packetsLost) / elapsed - packetLostPercentage := float32(packetsLost) / float32(packetsExpected) * 100.0 + p.Frames = r.frames + p.FrameRate = float64(r.frames) / p.Duration - packetDuplicateRate := float64(r.packetsDuplicate) / elapsed - bitrateDuplicate := float64(r.bytesDuplicate) * 8.0 / elapsed + p.KeyFrames = r.keyFrames + p.LastKeyFrame = timestamppb.New(r.lastKeyFrame) - packetPaddingRate := float64(r.packetsPadding) / elapsed - bitratePadding := float64(r.bytesPadding) * 8.0 / elapsed + p.JitterCurrent = jitter / float64(r.params.ClockRate) * 1e6 + p.JitterMax = maxJitter / float64(r.params.ClockRate) * 1e6 - jitterTime := jitter / float64(r.params.ClockRate) * 1e6 - maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6 + p.NackAcks = r.nackAcks + p.NackMisses = r.nackMisses + p.NackRepeated = r.nackRepeated - packetDrift, ntpReportDrift, receivedReportDrift, rebasedReportDrift := r.getDrift(extStartTS, extHighestTS) + p.Plis = r.plis + p.LastPli = timestamppb.New(r.lastPli) - p := &livekit.RTPStats{ - StartTime: timestamppb.New(r.startTime), - EndTime: timestamppb.New(endTime), - Duration: elapsed, - Packets: uint32(packets), - PacketRate: packetRate, - Bytes: r.bytes, - HeaderBytes: r.headerBytes, - Bitrate: bitrate, - PacketsLost: uint32(packetsLost), - PacketLossRate: packetLostRate, - PacketLossPercentage: packetLostPercentage, - PacketsDuplicate: uint32(r.packetsDuplicate), - PacketDuplicateRate: packetDuplicateRate, - BytesDuplicate: r.bytesDuplicate, - HeaderBytesDuplicate: r.headerBytesDuplicate, - BitrateDuplicate: bitrateDuplicate, - PacketsPadding: uint32(r.packetsPadding), - PacketPaddingRate: packetPaddingRate, - BytesPadding: r.bytesPadding, - HeaderBytesPadding: r.headerBytesPadding, - BitratePadding: bitratePadding, - PacketsOutOfOrder: uint32(r.packetsOutOfOrder), - Frames: r.frames, - FrameRate: frameRate, - KeyFrames: r.keyFrames, - LastKeyFrame: timestamppb.New(r.lastKeyFrame), - JitterCurrent: jitterTime, - JitterMax: maxJitterTime, - Nacks: r.nacks, - NackAcks: r.nackAcks, - NackMisses: r.nackMisses, - NackRepeated: r.nackRepeated, - Plis: r.plis, - LastPli: timestamppb.New(r.lastPli), - LayerLockPlis: r.layerLockPlis, - LastLayerLockPli: timestamppb.New(r.lastLayerLockPli), - Firs: r.firs, - LastFir: timestamppb.New(r.lastFir), - RttCurrent: r.rtt, - RttMax: r.maxRtt, - PacketDrift: packetDrift, - NtpReportDrift: ntpReportDrift, - RebasedReportDrift: rebasedReportDrift, - ReceivedReportDrift: receivedReportDrift, - } + p.LayerLockPlis = r.layerLockPlis + p.LastLayerLockPli = timestamppb.New(r.lastLayerLockPli) - gapsPresent := false - for i := 0; i < len(r.gapHistogram); i++ { - if r.gapHistogram[i] == 0 { - continue - } + p.Firs = r.firs + p.LastFir = timestamppb.New(r.lastFir) - gapsPresent = true - break - } - - if gapsPresent { - p.GapHistogram = make(map[int32]uint32, len(r.gapHistogram)) - for i := 0; i < len(r.gapHistogram); i++ { - if r.gapHistogram[i] == 0 { - continue - } - - p.GapHistogram[int32(i+1)] = r.gapHistogram[i] - } - } + p.RttCurrent = r.rtt + p.RttMax = r.maxRtt + p.PacketDrift, p.NtpReportDrift, p.ReceivedReportDrift, p.RebasedReportDrift = r.getDrift(extStartTS, extHighestTS) return p } @@ -957,7 +731,7 @@ func (r *rtpStatsBase) getAndResetSnapshot(snapshotID uint32, extStartSN uint64, idx := snapshotID - cFirstSnapshotID then := r.snapshots[idx] if !then.isValid { - then = r.initSnapshot(r.startTime, extStartSN) + then = initSnapshot(r.startTime, extStartSN) r.snapshots[idx] = then } @@ -967,7 +741,12 @@ func (r *rtpStatsBase) getAndResetSnapshot(snapshotID uint32, extStartSN uint64, return &then, &now } -func (r *rtpStatsBase) getDrift(extStartTS, extHighestTS uint64) (packetDrift *livekit.RTPDrift, ntpReportDrift *livekit.RTPDrift, receivedReportDrift *livekit.RTPDrift, rebasedReportDrift *livekit.RTPDrift) { +func (r *rtpStatsBase) getDrift(extStartTS, extHighestTS uint64) ( + packetDrift *livekit.RTPDrift, + ntpReportDrift *livekit.RTPDrift, + receivedReportDrift *livekit.RTPDrift, + rebasedReportDrift *livekit.RTPDrift, +) { if r.firstTime != 0 { elapsed := r.highestTime - r.firstTime rtpClockTicks := extHighestTS - extStartTS @@ -1055,31 +834,17 @@ func (r *rtpStatsBase) updateGapHistogram(gap int) { } } -func (r *rtpStatsBase) initSnapshot(startTime time.Time, extStartSN uint64) snapshot { - return snapshot{ - isValid: true, - startTime: startTime, - extStartSN: extStartSN, - } -} - func (r *rtpStatsBase) getSnapshot(startTime time.Time, extStartSN uint64) snapshot { return snapshot{ - isValid: true, - startTime: startTime, - extStartSN: extStartSN, - bytes: r.bytes, + snapshotLite: r.getSnapshotLite(startTime, extStartSN), headerBytes: r.headerBytes, - packetsPadding: r.packetsPadding, - bytesPadding: r.bytesPadding, - headerBytesPadding: r.headerBytesPadding, packetsDuplicate: r.packetsDuplicate, bytesDuplicate: r.bytesDuplicate, headerBytesDuplicate: r.headerBytesDuplicate, - packetsLost: r.packetsLost, - packetsOutOfOrder: r.packetsOutOfOrder, + packetsPadding: r.packetsPadding, + bytesPadding: r.bytesPadding, + headerBytesPadding: r.headerBytesPadding, frames: r.frames, - nacks: r.nacks, plis: r.plis, firs: r.firs, maxRtt: r.rtt, @@ -1089,6 +854,12 @@ func (r *rtpStatsBase) getSnapshot(startTime time.Time, extStartSN uint64) snaps // ---------------------------------- +func initSnapshot(startTime time.Time, extStartSN uint64) snapshot { + return snapshot{ + snapshotLite: initSnapshotLite(startTime, extStartSN), + } +} + func AggregateRTPStats(statsList []*livekit.RTPStats) *livekit.RTPStats { return utils.AggregateRTPStats(statsList, cGapHistogramNumBins) } diff --git a/pkg/sfu/rtpstats/rtpstats_base_lite.go b/pkg/sfu/rtpstats/rtpstats_base_lite.go new file mode 100644 index 000000000..2bb9197dc --- /dev/null +++ b/pkg/sfu/rtpstats/rtpstats_base_lite.go @@ -0,0 +1,411 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rtpstats + +import ( + "errors" + "fmt" + "sync" + "time" + + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" + "go.uber.org/zap/zapcore" + "google.golang.org/protobuf/types/known/timestamppb" +) + +const ( + cGapHistogramNumBins = 101 + cNumSequenceNumbers = 65536 + cFirstSnapshotID = 1 +) + +// ------------------------------------------------------- + +type RTPDeltaInfoLite struct { + StartTime time.Time + EndTime time.Time + Packets uint32 + Bytes uint64 + PacketsLost uint32 + PacketsOutOfOrder uint32 + Nacks uint32 +} + +type snapshotLite struct { + isValid bool + + startTime time.Time + + extStartSN uint64 + bytes uint64 + + packetsOutOfOrder uint64 + + packetsLost uint64 + + nacks uint32 +} + +// ------------------------------------------------------------------ + +type RTPStatsParams struct { + ClockRate uint32 + Logger logger.Logger +} + +type rtpStatsBaseLite struct { + params RTPStatsParams + logger logger.Logger + + lock sync.RWMutex + + initialized bool + + startTime time.Time + endTime time.Time + + bytes uint64 + + packetsOutOfOrder uint64 + + packetsLost uint64 + + gapHistogram [cGapHistogramNumBins]uint32 + + nacks uint32 + + nextSnapshotLiteID uint32 + snapshotLites []snapshotLite +} + +func newRTPStatsBaseLite(params RTPStatsParams) *rtpStatsBaseLite { + return &rtpStatsBaseLite{ + params: params, + logger: params.Logger, + nextSnapshotLiteID: cFirstSnapshotID, + snapshotLites: make([]snapshotLite, 2), + } +} + +func (r *rtpStatsBaseLite) seed(from *rtpStatsBaseLite) bool { + if from == nil || !from.initialized { + return false + } + + r.initialized = from.initialized + + r.startTime = from.startTime + // do not clone endTime as a non-zero endTime indicates an ended object + + r.bytes = from.bytes + + r.packetsOutOfOrder = from.packetsOutOfOrder + + r.packetsLost = from.packetsLost + + r.gapHistogram = from.gapHistogram + + r.nacks = from.nacks + + r.nextSnapshotLiteID = from.nextSnapshotLiteID + r.snapshotLites = make([]snapshotLite, cap(from.snapshotLites)) + copy(r.snapshotLites, from.snapshotLites) + return true +} + +func (r *rtpStatsBaseLite) SetLogger(logger logger.Logger) { + r.logger = logger +} + +func (r *rtpStatsBaseLite) Stop() { + r.lock.Lock() + defer r.lock.Unlock() + + r.endTime = time.Now() +} + +func (r *rtpStatsBaseLite) newSnapshotLiteID(extStartSN uint64) uint32 { + id := r.nextSnapshotLiteID + r.nextSnapshotLiteID++ + + if cap(r.snapshotLites) < int(r.nextSnapshotLiteID-cFirstSnapshotID) { + snapshotLites := make([]snapshotLite, r.nextSnapshotLiteID-cFirstSnapshotID) + copy(snapshotLites, r.snapshotLites) + r.snapshotLites = snapshotLites + } + + if r.initialized { + r.snapshotLites[id-cFirstSnapshotID] = initSnapshotLite(time.Now(), extStartSN) + } + return id +} + +func (r *rtpStatsBaseLite) IsActive() bool { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.initialized && r.endTime.IsZero() +} + +func (r *rtpStatsBaseLite) UpdateNack(nackCount uint32) { + r.lock.Lock() + defer r.lock.Unlock() + + if !r.endTime.IsZero() { + return + } + + r.nacks += nackCount +} + +func (r *rtpStatsBaseLite) getPacketsSeen(extStartSN, extHighestSN uint64) uint64 { + packetsExpected := getPacketsExpected(extStartSN, extHighestSN) + if r.packetsLost > packetsExpected { + // should not happen + return 0 + } + + return packetsExpected - r.packetsLost +} + +func (r *rtpStatsBaseLite) deltaInfoLite( + snapshotLiteID uint32, + extStartSN uint64, + extHighestSN uint64, +) (deltaInfoLite *RTPDeltaInfoLite, err error, loggingFields []interface{}) { + then, now := r.getAndResetSnapshotLite(snapshotLiteID, extStartSN, extHighestSN) + if now == nil || then == nil { + return + } + + startTime := then.startTime + endTime := now.startTime + + packetsExpected := now.extStartSN - then.extStartSN + if then.extStartSN > extHighestSN { + packetsExpected = 0 + } + if packetsExpected > cNumSequenceNumbers { + loggingFields = []interface{}{ + "startSN", then.extStartSN, + "endSN", now.extStartSN, + "packetsExpected", packetsExpected, + "startTime", startTime, + "endTime", endTime, + "duration", endTime.Sub(startTime).String(), + } + err = errors.New("too many packets expected in delta lite") + return + } + if packetsExpected == 0 { + deltaInfoLite = &RTPDeltaInfoLite{ + StartTime: startTime, + EndTime: endTime, + } + return + } + + packetsLost := uint32(now.packetsLost - then.packetsLost) + if int32(packetsLost) < 0 { + packetsLost = 0 + } + + deltaInfoLite = &RTPDeltaInfoLite{ + StartTime: startTime, + EndTime: endTime, + Packets: uint32(packetsExpected), + Bytes: now.bytes - then.bytes, + PacketsLost: packetsLost, + PacketsOutOfOrder: uint32(now.packetsOutOfOrder - then.packetsOutOfOrder), + Nacks: now.nacks - then.nacks, + } + return +} + +func (r *rtpStatsBaseLite) marshalLogObject(e zapcore.ObjectEncoder, packetsExpected, packetsSeenMinusPadding uint64) (float64, error) { + if r == nil { + return 0, errors.New("no object") + } + + endTime := r.endTime + if endTime.IsZero() { + endTime = time.Now() + } + elapsed := endTime.Sub(r.startTime) + if elapsed == 0 { + return 0, errors.New("no time elapsed") + } + elapsedSeconds := elapsed.Seconds() + + e.AddTime("startTime", r.startTime) + e.AddTime("endTime", r.endTime) + e.AddDuration("elapsed", elapsed) + + e.AddUint64("packetsExpected", packetsExpected) + e.AddFloat64("packetsExpectedRate", float64(packetsExpected)/elapsedSeconds) + e.AddUint64("packetsSeenPrimary", packetsSeenMinusPadding) + e.AddFloat64("packetsSeenPrimaryRate", float64(packetsSeenMinusPadding)/elapsedSeconds) + e.AddUint64("bytes", r.bytes) + e.AddFloat64("bitrate", float64(r.bytes)*8.0/elapsedSeconds) + + e.AddUint64("packetsOutOfOrder", r.packetsOutOfOrder) + + e.AddUint64("packetsLost", r.packetsLost) + e.AddFloat64("packetsLostRate", float64(r.packetsLost)/elapsedSeconds) + e.AddFloat32("packetLostPercentage", float32(r.packetsLost)/float32(packetsExpected)*100.0) + + hasLoss := false + first := true + str := "[" + for burst, count := range r.gapHistogram { + if count == 0 { + continue + } + + hasLoss = true + + if !first { + str += ", " + } + first = false + str += fmt.Sprintf("%d:%d", burst+1, count) + } + str += "]" + if hasLoss { + e.AddString("gapHistogram", str) + } + + e.AddUint32("nacks", r.nacks) + return elapsedSeconds, nil +} + +func (r *rtpStatsBaseLite) toProto(packetsExpected, packetsSeenMinusPadding, packetsLost uint64) *livekit.RTPStats { + if r.startTime.IsZero() { + return nil + } + + endTime := r.endTime + if endTime.IsZero() { + endTime = time.Now() + } + elapsed := endTime.Sub(r.startTime).Seconds() + if elapsed == 0.0 { + return nil + } + + packetRate := float64(packetsSeenMinusPadding) / elapsed + bitrate := float64(r.bytes) * 8.0 / elapsed + + packetLostRate := float64(packetsLost) / elapsed + packetLostPercentage := float32(packetsLost) / float32(packetsExpected) * 100.0 + + p := &livekit.RTPStats{ + StartTime: timestamppb.New(r.startTime), + EndTime: timestamppb.New(endTime), + Duration: elapsed, + Packets: uint32(packetsSeenMinusPadding), + PacketRate: packetRate, + Bytes: r.bytes, + Bitrate: bitrate, + PacketsLost: uint32(packetsLost), + PacketLossRate: packetLostRate, + PacketLossPercentage: packetLostPercentage, + PacketsOutOfOrder: uint32(r.packetsOutOfOrder), + Nacks: r.nacks, + } + + gapsPresent := false + for i := 0; i < len(r.gapHistogram); i++ { + if r.gapHistogram[i] == 0 { + continue + } + + gapsPresent = true + break + } + + if gapsPresent { + p.GapHistogram = make(map[int32]uint32, len(r.gapHistogram)) + for i := 0; i < len(r.gapHistogram); i++ { + if r.gapHistogram[i] == 0 { + continue + } + + p.GapHistogram[int32(i+1)] = r.gapHistogram[i] + } + } + + return p +} + +func (r *rtpStatsBaseLite) getAndResetSnapshotLite(snapshotLiteID uint32, extStartSN uint64, extHighestSN uint64) (*snapshotLite, *snapshotLite) { + if !r.initialized { + return nil, nil + } + + idx := snapshotLiteID - cFirstSnapshotID + then := r.snapshotLites[idx] + if !then.isValid { + then = initSnapshotLite(r.startTime, extStartSN) + r.snapshotLites[idx] = then + } + + // snapshot now + now := r.getSnapshotLite(time.Now(), extHighestSN+1) + r.snapshotLites[idx] = now + return &then, &now +} + +func (r *rtpStatsBaseLite) updateGapHistogram(gap int) { + if gap < 2 { + return + } + + missing := gap - 1 + if missing > len(r.gapHistogram) { + r.gapHistogram[len(r.gapHistogram)-1]++ + } else { + r.gapHistogram[missing-1]++ + } +} + +func (r *rtpStatsBaseLite) getSnapshotLite(startTime time.Time, extStartSN uint64) snapshotLite { + return snapshotLite{ + isValid: true, + startTime: startTime, + extStartSN: extStartSN, + bytes: r.bytes, + packetsOutOfOrder: r.packetsOutOfOrder, + packetsLost: r.packetsLost, + nacks: r.nacks, + } +} + +// ---------------------------------- + +func initSnapshotLite(startTime time.Time, extStartSN uint64) snapshotLite { + return snapshotLite{ + isValid: true, + startTime: startTime, + extStartSN: extStartSN, + } +} + +func getPacketsExpected(extStartSN, extHighestSN uint64) uint64 { + return extHighestSN - extStartSN + 1 +} + +// ---------------------------------- diff --git a/pkg/sfu/rtpstats/rtpstats_receiver.go b/pkg/sfu/rtpstats/rtpstats_receiver.go index bd1d93d21..0d5d67b49 100644 --- a/pkg/sfu/rtpstats/rtpstats_receiver.go +++ b/pkg/sfu/rtpstats/rtpstats_receiver.go @@ -42,7 +42,6 @@ const ( type RTPFlowState struct { IsNotHandled bool - HasLoss bool LossStartInclusive uint64 LossEndExclusive uint64 @@ -59,7 +58,6 @@ func (r *RTPFlowState) MarshalLogObject(e zapcore.ObjectEncoder) error { } e.AddBool("IsNotHandled", r.IsNotHandled) - e.AddBool("HasLoss", r.HasLoss) e.AddUint64("LossStartInclusive", r.LossStartInclusive) e.AddUint64("LossEndExclusive", r.LossEndExclusive) e.AddBool("IsDuplicate", r.IsDuplicate) @@ -189,7 +187,7 @@ func (r *RTPStatsReceiver) Update( // initialize snapshots if any for i := uint32(0); i < r.nextSnapshotID-cFirstSnapshotID; i++ { - r.snapshots[i] = r.initSnapshot(r.startTime, r.sequenceNumber.GetExtendedStart()) + r.snapshots[i] = initSnapshot(r.startTime, r.sequenceNumber.GetExtendedStart()) } r.logger.Debugw( @@ -331,11 +329,8 @@ func (r *RTPStatsReceiver) Update( r.highestTime = packetTime } - if gapSN > 1 { - flowState.HasLoss = true - flowState.LossStartInclusive = resSN.PreExtendedHighest + 1 - flowState.LossEndExclusive = resSN.ExtendedVal - } + flowState.LossStartInclusive = resSN.PreExtendedHighest + 1 + flowState.LossEndExclusive = resSN.ExtendedVal } flowState.ExtSequenceNumber = resSN.ExtendedVal flowState.ExtTimestamp = resTS.ExtendedVal @@ -652,25 +647,19 @@ func (r *RTPStatsReceiver) MarshalLogObject(e zapcore.ObjectEncoder) error { return lockedRTPStatsReceiverLogEncoder{r}.MarshalLogObject(e) } -func (r *RTPStatsReceiver) String() string { - r.lock.RLock() - defer r.lock.RUnlock() - - return r.toString( - r.sequenceNumber.GetExtendedStart(), r.sequenceNumber.GetExtendedHighest(), r.timestamp.GetExtendedStart(), r.timestamp.GetExtendedHighest(), - r.packetsLost, - r.jitter, r.maxJitter, - ) -} - func (r *RTPStatsReceiver) ToProto() *livekit.RTPStats { r.lock.RLock() defer r.lock.RUnlock() + extStartSN, extHighestSN := r.sequenceNumber.GetExtendedStart(), r.sequenceNumber.GetExtendedHighest() return r.toProto( - r.sequenceNumber.GetExtendedStart(), r.sequenceNumber.GetExtendedHighest(), r.timestamp.GetExtendedStart(), r.timestamp.GetExtendedHighest(), + getPacketsExpected(extStartSN, extHighestSN), + r.getPacketsSeenMinusPadding(extStartSN, extHighestSN), r.packetsLost, - r.jitter, r.maxJitter, + r.timestamp.GetExtendedStart(), + r.timestamp.GetExtendedHighest(), + r.jitter, + r.maxJitter, ) } @@ -713,21 +702,24 @@ func (r lockedRTPStatsReceiverLogEncoder) MarshalLogObject(e zapcore.ObjectEncod return nil } - e.AddObject("base", r.rtpStatsBase) - - e.AddUint64("extStartSN", r.sequenceNumber.GetExtendedStart()) - e.AddUint64("extHighestSN", r.sequenceNumber.GetExtendedHighest()) + extStartSN, extHighestSN := r.sequenceNumber.GetExtendedStart(), r.sequenceNumber.GetExtendedHighest() extStartTS, extHighestTS := r.timestamp.GetExtendedStart(), r.timestamp.GetExtendedHighest() + if err := r.rtpStatsBase.marshalLogObject( + e, + getPacketsExpected(extStartSN, extHighestSN), + r.getPacketsSeenMinusPadding(extStartSN, extHighestSN), + extStartTS, + extHighestTS, + ); err != nil { + return err + } + + e.AddUint64("extStartSN", extStartSN) + e.AddUint64("extHighestSN", extHighestSN) e.AddUint64("extStartTS", extStartTS) e.AddUint64("extHighestTS", extHighestTS) e.AddObject("propagationDelayEstimator", r.propagationDelayEstimator) - - packetDrift, ntpReportDrift, receivedReportDrift, rebasedReportDrift := r.getDrift(extStartTS, extHighestTS) - e.AddObject("packetDrift", wrappedRTPDriftLogger{packetDrift}) - e.AddObject("ntpReportDrift", wrappedRTPDriftLogger{ntpReportDrift}) - e.AddObject("receivedReportDrift", wrappedRTPDriftLogger{receivedReportDrift}) - e.AddObject("rebasedReportDrift", wrappedRTPDriftLogger{rebasedReportDrift}) return nil } diff --git a/pkg/sfu/rtpstats/rtpstats_receiver_lite.go b/pkg/sfu/rtpstats/rtpstats_receiver_lite.go new file mode 100644 index 000000000..93dc222ab --- /dev/null +++ b/pkg/sfu/rtpstats/rtpstats_receiver_lite.go @@ -0,0 +1,178 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rtpstats + +import ( + "time" + + "go.uber.org/zap/zapcore" + + "github.com/livekit/livekit-server/pkg/sfu/utils" + "github.com/livekit/protocol/livekit" +) + +type RTPFlowStateLite struct { + IsNotHandled bool + + LossStartInclusive uint64 + LossEndExclusive uint64 + + ExtSequenceNumber uint64 +} + +func (r *RTPFlowStateLite) MarshalLogObject(e zapcore.ObjectEncoder) error { + if r == nil { + return nil + } + + e.AddBool("IsNotHandled", r.IsNotHandled) + e.AddUint64("LossStartInclusive", r.LossStartInclusive) + e.AddUint64("LossEndExclusive", r.LossEndExclusive) + e.AddUint64("ExtSequenceNumber", r.ExtSequenceNumber) + return nil +} + +// --------------------------------------------------------------------- + +type RTPStatsReceiverLite struct { + *rtpStatsBaseLite + + sequenceNumber *utils.WrapAround[uint16, uint64] +} + +func NewRTPStatsReceiverLite(params RTPStatsParams) *RTPStatsReceiverLite { + return &RTPStatsReceiverLite{ + rtpStatsBaseLite: newRTPStatsBaseLite(params), + sequenceNumber: utils.NewWrapAround[uint16, uint64](utils.WrapAroundParams{IsRestartAllowed: false}), + } +} + +func (r *RTPStatsReceiverLite) NewSnapshotLiteId() uint32 { + r.lock.Lock() + defer r.lock.Unlock() + + return r.newSnapshotLiteID(r.sequenceNumber.GetExtendedHighest()) +} + +func (r *RTPStatsReceiverLite) Update(packetTime int64, packetSize int, sequenceNumber uint16) (flowStateLite RTPFlowStateLite) { + r.lock.Lock() + defer r.lock.Unlock() + + if !r.endTime.IsZero() { + flowStateLite.IsNotHandled = true + return + } + + var resSN utils.WrapAroundUpdateResult[uint64] + if !r.initialized { + r.initialized = true + + r.startTime = time.Now() + + resSN = r.sequenceNumber.Update(sequenceNumber) + + // initialize lite snapshots if any + for i := uint32(0); i < r.nextSnapshotLiteID-cFirstSnapshotID; i++ { + r.snapshotLites[i] = initSnapshotLite(r.startTime, r.sequenceNumber.GetExtendedStart()) + } + + r.logger.Debugw( + "rtp receiver lite stream start", + "rtpStats", lockedRTPStatsReceiverLiteLogEncoder{r}, + ) + } else { + resSN = r.sequenceNumber.Update(sequenceNumber) + if resSN.IsUnhandled { + flowStateLite.IsNotHandled = true + return + } + } + gapSN := int64(resSN.ExtendedVal - resSN.PreExtendedHighest) + + if gapSN <= 0 { // duplicate OR out-of-order + r.packetsOutOfOrder++ // counting duplicate as out-of-order + r.packetsLost-- + } else { // in-order + r.updateGapHistogram(int(gapSN)) + r.packetsLost += uint64(gapSN - 1) + + flowStateLite.LossStartInclusive = resSN.PreExtendedHighest + 1 + flowStateLite.LossEndExclusive = resSN.ExtendedVal + } + flowStateLite.ExtSequenceNumber = resSN.ExtendedVal + r.bytes += uint64(packetSize) + return +} + +func (r *RTPStatsReceiverLite) DeltaInfoLite(snapshotLiteID uint32) *RTPDeltaInfoLite { + r.lock.Lock() + defer r.lock.Unlock() + + deltaInfoLite, err, loggingFields := r.deltaInfoLite( + snapshotLiteID, + r.sequenceNumber.GetExtendedStart(), + r.sequenceNumber.GetExtendedHighest(), + ) + if err != nil { + r.logger.Infow(err.Error(), append(loggingFields, "rtpStats", lockedRTPStatsReceiverLiteLogEncoder{r})...) + } + + return deltaInfoLite +} + +func (r *RTPStatsReceiverLite) MarshalLogObject(e zapcore.ObjectEncoder) error { + if r == nil { + return nil + } + + r.lock.RLock() + defer r.lock.RUnlock() + + return lockedRTPStatsReceiverLiteLogEncoder{r}.MarshalLogObject(e) +} + +func (r *RTPStatsReceiverLite) ToProto() *livekit.RTPStats { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.rtpStatsBaseLite.toProto(r.sequenceNumber.GetExtendedStart(), r.sequenceNumber.GetExtendedHighest(), r.packetsLost) +} + +// ---------------------------------- + +type lockedRTPStatsReceiverLiteLogEncoder struct { + *RTPStatsReceiverLite +} + +func (r lockedRTPStatsReceiverLiteLogEncoder) MarshalLogObject(e zapcore.ObjectEncoder) error { + if r.RTPStatsReceiverLite == nil { + return nil + } + + extStartSN, extHighestSN := r.sequenceNumber.GetExtendedStart(), r.sequenceNumber.GetExtendedHighest() + if _, err := r.rtpStatsBaseLite.marshalLogObject( + e, + getPacketsExpected(extStartSN, extHighestSN), + getPacketsExpected(extStartSN, extHighestSN), + ); err != nil { + return err + } + + e.AddUint64("extStartSN", r.sequenceNumber.GetExtendedStart()) + e.AddUint64("extHighestSN", r.sequenceNumber.GetExtendedHighest()) + return nil +} + +// ---------------------------------- diff --git a/pkg/sfu/rtpstats/rtpstats_receiver_test.go b/pkg/sfu/rtpstats/rtpstats_receiver_test.go index 80e3f6937..d895b57c4 100644 --- a/pkg/sfu/rtpstats/rtpstats_receiver_test.go +++ b/pkg/sfu/rtpstats/rtpstats_receiver_test.go @@ -15,7 +15,6 @@ package rtpstats import ( - "fmt" "math/rand" "testing" "time" @@ -36,56 +35,6 @@ func getPacket(sn uint16, ts uint32, payloadSize int) *rtp.Packet { } } -func Test_RTPStatsReceiver(t *testing.T) { - clockRate := uint32(90000) - r := NewRTPStatsReceiver(RTPStatsParams{ - ClockRate: clockRate, - Logger: logger.GetLogger(), - }) - - totalDuration := 5 * time.Second - bitrate := 1000000 - packetSize := 1000 - pps := (((bitrate + 7) / 8) + packetSize - 1) / packetSize - framerate := 30 - sleep := 1000 / framerate - packetsPerFrame := (pps + framerate - 1) / framerate - - sequenceNumber := uint16(rand.Float64() * float64(1<<16)) - timestamp := uint32(rand.Float64() * float64(1<<32)) - now := time.Now() - startTime := now - lastFrameTime := now - for now.Sub(startTime) < totalDuration { - timestamp += uint32(now.Sub(lastFrameTime).Seconds() * float64(clockRate)) - for i := 0; i < packetsPerFrame; i++ { - packet := getPacket(sequenceNumber, timestamp, packetSize) - r.Update( - time.Now().UnixNano(), - packet.Header.SequenceNumber, - packet.Header.Timestamp, - packet.Header.Marker, - packet.Header.MarshalSize(), - len(packet.Payload), - 0, - ) - if (sequenceNumber % 100) == 0 { - jump := uint16(rand.Float64() * 120.0) - sequenceNumber += jump - } else { - sequenceNumber++ - } - } - - lastFrameTime = now - time.Sleep(time.Duration(sleep) * time.Millisecond) - now = time.Now() - } - - r.Stop() - fmt.Printf("%s\n", r.String()) -} - func Test_RTPStatsReceiver_Update(t *testing.T) { clockRate := uint32(90000) r := NewRTPStatsReceiver(RTPStatsParams{ @@ -105,7 +54,6 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { len(packet.Payload), 0, ) - require.False(t, flowState.HasLoss) require.True(t, r.initialized) require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest()) require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest())) @@ -125,7 +73,6 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { len(packet.Payload), 0, ) - require.False(t, flowState.HasLoss) require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest()) require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest())) require.Equal(t, timestamp, r.timestamp.GetHighest()) @@ -142,7 +89,6 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { len(packet.Payload), 0, ) - require.False(t, flowState.HasLoss) require.True(t, flowState.IsNotHandled) require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest()) require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest())) @@ -162,7 +108,6 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { len(packet.Payload), 0, ) - require.False(t, flowState.HasLoss) require.True(t, flowState.IsNotHandled) require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest()) require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest())) @@ -184,7 +129,6 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { len(packet.Payload), 0, ) - require.True(t, flowState.HasLoss) require.Equal(t, uint64(sequenceNumber-9), flowState.LossStartInclusive) require.Equal(t, uint64(sequenceNumber), flowState.LossEndExclusive) require.Equal(t, uint64(9), r.packetsLost) @@ -200,7 +144,6 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { len(packet.Payload), 0, ) - require.False(t, flowState.HasLoss) require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest()) require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest())) require.Equal(t, timestamp, r.timestamp.GetHighest()) @@ -223,7 +166,6 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { len(packet.Payload), 0, ) - require.True(t, flowState.HasLoss) require.Equal(t, uint64(sequenceNumber-1), flowState.LossStartInclusive) require.Equal(t, uint64(sequenceNumber), flowState.LossEndExclusive) require.Equal(t, uint64(9), r.packetsLost) @@ -242,7 +184,6 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { len(packet.Payload), 0, ) - require.False(t, flowState.HasLoss) require.Equal(t, uint64(8), r.packetsLost) require.Equal(t, uint64(2), r.packetsOutOfOrder) require.True(t, r.history.IsSet(uint64(sequenceNumber))) @@ -260,7 +201,6 @@ func Test_RTPStatsReceiver_Update(t *testing.T) { len(packet.Payload), 25, ) - require.False(t, flowState.HasLoss) require.Equal(t, uint64(8), r.packetsLost) require.Equal(t, uint64(2), r.packetsOutOfOrder) require.True(t, r.history.IsSet(uint64(sequenceNumber))) diff --git a/pkg/sfu/rtpstats/rtpstats_sender.go b/pkg/sfu/rtpstats/rtpstats_sender.go index 9b2e45493..2fa706a13 100644 --- a/pkg/sfu/rtpstats/rtpstats_sender.go +++ b/pkg/sfu/rtpstats/rtpstats_sender.go @@ -25,7 +25,6 @@ import ( "github.com/livekit/mediatransportutil" "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/logger" ) const ( @@ -228,7 +227,7 @@ func (r *RTPStatsSender) NewSenderSnapshotId() uint32 { } if r.initialized { - r.senderSnapshots[id-cFirstSnapshotID] = r.initSenderSnapshot(time.Now(), r.extHighestSN) + r.senderSnapshots[id-cFirstSnapshotID] = initSenderSnapshot(time.Now(), r.extHighestSN) } return id } @@ -270,10 +269,10 @@ func (r *RTPStatsSender) Update( // initialize snapshots if any for i := uint32(0); i < r.nextSnapshotID-cFirstSnapshotID; i++ { - r.snapshots[i] = r.initSnapshot(r.startTime, r.extStartSN) + r.snapshots[i] = initSnapshot(r.startTime, r.extStartSN) } for i := uint32(0); i < r.nextSenderSnapshotID-cFirstSnapshotID; i++ { - r.senderSnapshots[i] = r.initSenderSnapshot(r.startTime, r.extStartSN) + r.senderSnapshots[i] = initSenderSnapshot(r.startTime, r.extStartSN) } r.logger.Debugw( @@ -436,11 +435,11 @@ func (r *RTPStatsSender) Update( } } -func (r *RTPStatsSender) GetTotalPacketsPrimary() uint64 { +func (r *RTPStatsSender) GetPacketsSeenMinusPadding() uint64 { r.lock.RLock() defer r.lock.RUnlock() - return r.getTotalPacketsPrimary(r.extStartSN, r.extHighestSN) + return r.getPacketsSeenMinusPadding(r.extStartSN, r.extHighestSN) } func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt uint32, isRttChanged bool) { @@ -628,7 +627,7 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *livek nowRTPExt = publisherSRData.RtpTimestampExt - tsOffset + uint64(timeSincePublisherSRAdjusted.Nanoseconds()*int64(r.params.ClockRate)/1e9) } - packetCount := uint32(r.getTotalPacketsPrimary(r.extStartSN, r.extHighestSN) + r.packetsDuplicate + r.packetsPadding) + packetCount := uint32(r.getPacketsSeenPlusDuplicates(r.extStartSN, r.extHighestSN)) octetCount := r.bytes + r.bytesDuplicate + r.bytesPadding srData := &livekit.RTCPSenderReportState{ NtpTimestamp: uint64(nowNTP), @@ -809,25 +808,18 @@ func (r *RTPStatsSender) MarshalLogObject(e zapcore.ObjectEncoder) error { return lockedRTPStatsSenderLogEncoder{r}.MarshalLogObject(e) } -func (r *RTPStatsSender) String() string { - r.lock.RLock() - defer r.lock.RUnlock() - - return r.toString( - r.extStartSN, r.extHighestSN, r.extStartTS, r.extHighestTS, - r.packetsLostFromRR, - r.jitterFromRR, r.maxJitterFromRR, - ) -} - func (r *RTPStatsSender) ToProto() *livekit.RTPStats { r.lock.RLock() defer r.lock.RUnlock() return r.toProto( - r.extStartSN, r.extHighestSN, r.extStartTS, r.extHighestTS, + getPacketsExpected(r.extStartSN, r.extHighestSN), + r.getPacketsSeenMinusPadding(r.extStartSN, r.extHighestSN), r.packetsLostFromRR, - r.jitterFromRR, r.maxJitterFromRR, + r.extStartTS, + r.extHighestTS, + r.jitterFromRR, + r.maxJitterFromRR, ) } @@ -839,7 +831,7 @@ func (r *RTPStatsSender) getAndResetSenderSnapshot(senderSnapshotID uint32) (*se idx := senderSnapshotID - cFirstSnapshotID then := r.senderSnapshots[idx] if !then.isValid { - then = r.initSenderSnapshot(r.startTime, r.extStartSN) + then = initSenderSnapshot(r.startTime, r.extStartSN) r.senderSnapshots[idx] = then } @@ -849,15 +841,6 @@ func (r *RTPStatsSender) getAndResetSenderSnapshot(senderSnapshotID uint32) (*se return &then, &now } -func (r *RTPStatsSender) initSenderSnapshot(startTime time.Time, extStartSN uint64) senderSnapshot { - return senderSnapshot{ - isValid: true, - startTime: startTime, - extStartSN: extStartSN, - extLastRRSN: extStartSN - 1, - } -} - func (r *RTPStatsSender) getSenderSnapshot(startTime time.Time, s *senderSnapshot) senderSnapshot { if s == nil { return senderSnapshot{} @@ -1006,7 +989,16 @@ func (r lockedRTPStatsSenderLogEncoder) MarshalLogObject(e zapcore.ObjectEncoder return nil } - e.AddObject("base", r.rtpStatsBase) + if err := r.rtpStatsBase.marshalLogObject( + e, + getPacketsExpected(r.extStartSN, r.extHighestSN), + r.getPacketsSeenMinusPadding(r.extStartSN, r.extHighestSN), + r.extStartTS, + r.extHighestTS, + ); err != nil { + return err + } + e.AddUint64("extStartSN", r.extStartSN) e.AddUint64("extHighestSN", r.extHighestSN) e.AddUint64("extStartTS", r.extStartTS) @@ -1017,11 +1009,16 @@ func (r lockedRTPStatsSenderLogEncoder) MarshalLogObject(e zapcore.ObjectEncoder e.AddUint64("packetsLostFromRR", r.packetsLostFromRR) e.AddFloat64("jitterFromRR", r.jitterFromRR) e.AddFloat64("maxJitterFromRR", r.maxJitterFromRR) - - packetDrift, ntpReportDrift, receivedReportDrift, rebasedReportDrift := r.getDrift(r.extStartTS, r.extHighestTS) - e.AddObject("packetDrift", logger.Proto(packetDrift)) - e.AddObject("ntpReportDrift", logger.Proto(ntpReportDrift)) - e.AddObject("receivedReportDrift", logger.Proto(receivedReportDrift)) - e.AddObject("rebasedReportDrift", logger.Proto(rebasedReportDrift)) return nil } + +// ------------------------------------------------------------------- + +func initSenderSnapshot(startTime time.Time, extStartSN uint64) senderSnapshot { + return senderSnapshot{ + isValid: true, + startTime: startTime, + extStartSN: extStartSN, + extLastRRSN: extStartSN - 1, + } +}