From 64c651431ea8853e55e4b4f8f7b84e10fbade680 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Fri, 28 Nov 2025 21:51:53 +0530 Subject: [PATCH] Update mediatransportutil (#4115) - New bucket API to pass in max packet size and sequence number offset and seequence number size generic type - Move OWD estimator to mediatransportutil. --- go.mod | 2 +- go.sum | 4 +- pkg/metric/metric_timestamper.go | 6 +- pkg/sfu/buffer/buffer.go | 10 +- pkg/sfu/receiver.go | 2 +- pkg/sfu/rtpstats/rtpstats_receiver.go | 5 +- pkg/sfu/utils/owd_estimator.go | 196 -------------------------- 7 files changed, 15 insertions(+), 210 deletions(-) delete mode 100644 pkg/sfu/utils/owd_estimator.go diff --git a/go.mod b/go.mod index c4ffa0831..d553893a6 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/jellydator/ttlcache/v3 v3.4.0 github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 - github.com/livekit/mediatransportutil v0.0.0-20250922175932-f537f0880397 + github.com/livekit/mediatransportutil v0.0.0-20251128155121-3d31455b228f github.com/livekit/protocol v1.43.3-0.20251125082437-3bc7a74d3d5e github.com/livekit/psrpc v0.7.1 github.com/mackerelio/go-osstat v0.2.6 diff --git a/go.sum b/go.sum index 740b1b57d..e367ce13b 100644 --- a/go.sum +++ b/go.sum @@ -171,8 +171,8 @@ github.com/lithammer/shortuuid/v4 v4.2.0 h1:LMFOzVB3996a7b8aBuEXxqOBflbfPQAiVzkI github.com/lithammer/shortuuid/v4 v4.2.0/go.mod h1:D5noHZ2oFw/YaKCfGy0YxyE7M0wMbezmMjPdhyEFe6Y= github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 h1:9x+U2HGLrSw5ATTo469PQPkqzdoU7be46ryiCDO3boc= github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= -github.com/livekit/mediatransportutil v0.0.0-20250922175932-f537f0880397 h1:Z7j2mY+bvG05UC80MpnJkitlJju8sSDWsr0Bb4dPceo= -github.com/livekit/mediatransportutil v0.0.0-20250922175932-f537f0880397/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A= +github.com/livekit/mediatransportutil v0.0.0-20251128155121-3d31455b228f h1:jh+1m+70GnYV7Eryuc4N8CJtctRhbx6xYH4DlpcEiO4= +github.com/livekit/mediatransportutil v0.0.0-20251128155121-3d31455b228f/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A= github.com/livekit/protocol v1.43.3-0.20251125082437-3bc7a74d3d5e h1:Fsdj1LoPI5LrzGwOxTGfP7SbGsK+/4SgFCbt83NAiws= github.com/livekit/protocol v1.43.3-0.20251125082437-3bc7a74d3d5e/go.mod h1:yjkL2/HcaCRyHykP9rLgKST2099AGd8laaU8EuHMnfw= github.com/livekit/psrpc v0.7.1 h1:ms37az0QTD3UXIWuUC5D/SkmKOlRMVRsI261eBWu/Vw= diff --git a/pkg/metric/metric_timestamper.go b/pkg/metric/metric_timestamper.go index c14f935f1..0ee095930 100644 --- a/pkg/metric/metric_timestamper.go +++ b/pkg/metric/metric_timestamper.go @@ -18,7 +18,7 @@ import ( "sync" "time" - "github.com/livekit/livekit-server/pkg/sfu/utils" + "github.com/livekit/mediatransportutil/pkg/latency" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils/mono" @@ -49,7 +49,7 @@ type MetricTimestamperParams struct { type MetricTimestamper struct { params MetricTimestamperParams lock sync.Mutex - owdEstimator *utils.OWDEstimator + owdEstimator *latency.OWDEstimator lastOWDEstimatorRunAt time.Time batchesSinceLastOWDEstimatorRun int } @@ -57,7 +57,7 @@ type MetricTimestamper struct { func NewMetricTimestamper(params MetricTimestamperParams) *MetricTimestamper { return &MetricTimestamper{ params: params, - owdEstimator: utils.NewOWDEstimator(utils.OWDEstimatorParamsDefault), + owdEstimator: latency.NewOWDEstimator(latency.OWDEstimatorParamsDefault), lastOWDEstimatorRunAt: time.Now().Add(-params.Config.OneWayDelayEstimatorMinInterval), } } diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 02874b179..34a2cd818 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -78,7 +78,7 @@ type ExtPacket struct { ExtSequenceNumber uint64 ExtTimestamp uint64 Packet *rtp.Packet - Payload interface{} + Payload any KeyFrame bool RawPacket []byte DependencyDescriptor *ExtDependencyDescriptor @@ -97,7 +97,7 @@ type VideoSize struct { type Buffer struct { sync.RWMutex readCond *sync.Cond - bucket *bucket.Bucket[uint64] + bucket *bucket.Bucket[uint64, uint16] nacker *nack.NackQueue maxVideoPkts int maxAudioPkts int @@ -296,11 +296,11 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili switch { case mime.IsMimeTypeAudio(b.mime): b.codecType = webrtc.RTPCodecTypeAudio - b.bucket = bucket.NewBucket[uint64](InitPacketBufferSizeAudio) + b.bucket = bucket.NewBucket[uint64, uint16](InitPacketBufferSizeAudio, bucket.RTPMaxPktSize, bucket.RTPSeqNumOffset) case mime.IsMimeTypeVideo(b.mime): b.codecType = webrtc.RTPCodecTypeVideo - b.bucket = bucket.NewBucket[uint64](InitPacketBufferSizeVideo) + b.bucket = bucket.NewBucket[uint64, uint16](InitPacketBufferSizeVideo, bucket.RTPMaxPktSize, bucket.RTPSeqNumOffset) if b.frameRateCalculator[0] == nil { b.createFrameRateCalculator() } @@ -487,7 +487,7 @@ func (b *Buffer) writeRTX(rtxPkt *rtp.Packet, arrivalTime int64) { } if b.rtxPktBuf == nil { - b.rtxPktBuf = make([]byte, bucket.MaxPktSize) + b.rtxPktBuf = make([]byte, bucket.RTPMaxPktSize) } if len(rtxPkt.Payload) < 2 { diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 8f52e113f..afea73327 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -785,7 +785,7 @@ func (w *WebRTCReceiver) forwardRTP(layer int32, buff *buffer.Buffer) { return } - pktBuf := make([]byte, bucket.MaxPktSize) + pktBuf := make([]byte, bucket.RTPMaxPktSize) w.logger.Debugw("starting forwarding", "layer", layer) for { pkt, err := buff.ReadExtended(pktBuf) diff --git a/pkg/sfu/rtpstats/rtpstats_receiver.go b/pkg/sfu/rtpstats/rtpstats_receiver.go index a4a29dc08..6fd323e90 100644 --- a/pkg/sfu/rtpstats/rtpstats_receiver.go +++ b/pkg/sfu/rtpstats/rtpstats_receiver.go @@ -24,6 +24,7 @@ import ( "github.com/livekit/livekit-server/pkg/sfu/utils" "github.com/livekit/mediatransportutil" + "github.com/livekit/mediatransportutil/pkg/latency" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" protoutils "github.com/livekit/protocol/utils" @@ -81,7 +82,7 @@ type RTPStatsReceiver struct { history *protoutils.Bitmap[uint64] - propagationDelayEstimator *utils.OWDEstimator + propagationDelayEstimator *latency.OWDEstimator clockSkewCount int clockSkewMediaPathCount int @@ -98,7 +99,7 @@ func NewRTPStatsReceiver(params RTPStatsParams) *RTPStatsReceiver { tsRolloverThreshold: (1 << 31) * 1e9 / int64(params.ClockRate), timestamp: utils.NewWrapAround[uint32, uint64](utils.WrapAroundParams{IsRestartAllowed: false}), history: protoutils.NewBitmap[uint64](cHistorySize), - propagationDelayEstimator: utils.NewOWDEstimator(utils.OWDEstimatorParamsDefault), + propagationDelayEstimator: latency.NewOWDEstimator(latency.OWDEstimatorParamsDefault), } } diff --git a/pkg/sfu/utils/owd_estimator.go b/pkg/sfu/utils/owd_estimator.go deleted file mode 100644 index 7267262c9..000000000 --- a/pkg/sfu/utils/owd_estimator.go +++ /dev/null @@ -1,196 +0,0 @@ -// Copyright 2023 LiveKit, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package utils - -import ( - "time" - - "go.uber.org/zap/zapcore" -) - -type OWDEstimatorParams struct { - PropagationDelayFallFactor float64 - PropagationDelayRiseFactor float64 - - PropagationDelaySpikeAdaptationFactor float64 - - PropagationDelayDeltaThresholdMin time.Duration - PropagationDelayDeltaThresholdMaxFactor int64 - PropagationDelayDeltaHighResetNumReports int - PropagationDelayDeltaHighResetWait time.Duration - PropagationDelayDeltaLongTermAdaptationThreshold time.Duration -} - -var OWDEstimatorParamsDefault = OWDEstimatorParams{ - // OWD (One-Way-Delay) Estimator is used to estimate propagation delay between sender and receiver. - // As they operate on different clock domains, it is not possible to get exact propagation delay easily. - // So, this module is an estimator using a simple approach explained below. It should not be used for - // things that require high accuracy. - // - // One example is RTCP Sender Reports getting re-based to SFU time base so that all subscriber side - // can have the same time base (i. e. SFU time base). To convert publisher side - // RTCP Sender Reports to SFU timebase, a propagation delay is maintained. - // propagation_delay = time_of_report_reception - ntp_timestamp_in_report - // - // Propagation delay is adapted continuously. If it falls, adapt quickly to the - // lower value as that could be the real propagation delay. If it rises, adapt slowly - // as it might be a temporary change or slow drift. See below for handling of high deltas - // which could be a result of a path change. - PropagationDelayFallFactor: 0.9, - PropagationDelayRiseFactor: 0.1, - - PropagationDelaySpikeAdaptationFactor: 0.5, - - // To account for path changes mid-stream, if the delta of the propagation delay is consistently higher, reset. - // Reset at whichever of the below happens later. - // 1. 10 seconds of persistent high delta. - // 2. at least 2 consecutive reports with high delta. - // - // A long term estimate of delta of propagation delay is maintained and delta propagation delay exceeding - // a factor of the long term estimate is considered a sharp increase. That will trigger the start of the - // path change condition and if it persists, propagation delay will be reset. - PropagationDelayDeltaThresholdMin: 10 * time.Millisecond, - PropagationDelayDeltaThresholdMaxFactor: 2, - PropagationDelayDeltaHighResetNumReports: 2, - PropagationDelayDeltaHighResetWait: 10 * time.Second, - PropagationDelayDeltaLongTermAdaptationThreshold: 50 * time.Millisecond, -} - -type OWDEstimator struct { - params OWDEstimatorParams - - initialized bool - initialAdjustmentDone bool - lastSenderClockTimeNs int64 - lastPropagationDelayNs int64 - lastDeltaPropagationDelayNs int64 - estimatedPropagationDelayNs int64 - longTermDeltaPropagationDelayNs int64 - propagationDelayDeltaHighCount int - propagationDelayDeltaHighStartTime time.Time - propagationDelaySpikeNs int64 -} - -func NewOWDEstimator(params OWDEstimatorParams) *OWDEstimator { - return &OWDEstimator{ - params: params, - } -} - -func (o *OWDEstimator) MarshalLogObject(e zapcore.ObjectEncoder) error { - if o != nil { - e.AddTime("lastSenderClockTimeNs", time.Unix(0, o.lastSenderClockTimeNs)) - e.AddDuration("lastPropagationDelayNs", time.Duration(o.lastPropagationDelayNs)) - e.AddDuration("lastDeltaPropagationDelayNs", time.Duration(o.lastDeltaPropagationDelayNs)) - e.AddDuration("estimatedPropagationDelayNs", time.Duration(o.estimatedPropagationDelayNs)) - e.AddDuration("longTermDeltaPropagationDelayNs", time.Duration(o.longTermDeltaPropagationDelayNs)) - e.AddInt("propagationDelayDeltaHighCount", o.propagationDelayDeltaHighCount) - e.AddTime("propagationDelayDeltaHighStartTime", o.propagationDelayDeltaHighStartTime) - e.AddDuration("propagationDelaySpikeNs", time.Duration(o.propagationDelaySpikeNs)) - } - return nil -} - -func (o *OWDEstimator) Update(senderClockTimeNs int64, receiverClockTimeNs int64) (int64, bool) { - resetDelta := func() { - o.propagationDelayDeltaHighCount = 0 - o.propagationDelayDeltaHighStartTime = time.Time{} - o.propagationDelaySpikeNs = 0 - } - - initPropagationDelay := func(pd int64) { - o.estimatedPropagationDelayNs = pd - o.longTermDeltaPropagationDelayNs = 0 - resetDelta() - } - - o.lastPropagationDelayNs = receiverClockTimeNs - senderClockTimeNs - if !o.initialized { - o.initialized = true - o.lastSenderClockTimeNs = senderClockTimeNs - initPropagationDelay(o.lastPropagationDelayNs) - return o.estimatedPropagationDelayNs, true - } - - stepChange := false - o.lastDeltaPropagationDelayNs = o.lastPropagationDelayNs - o.estimatedPropagationDelayNs - // check for path changes, i. e. a step jump increase in propagation delay observed over time - if o.lastDeltaPropagationDelayNs > o.params.PropagationDelayDeltaThresholdMin.Nanoseconds() { // ignore small changes for path change consideration - if o.longTermDeltaPropagationDelayNs != 0 && - o.lastDeltaPropagationDelayNs > o.longTermDeltaPropagationDelayNs*o.params.PropagationDelayDeltaThresholdMaxFactor { - o.propagationDelayDeltaHighCount++ - if o.propagationDelayDeltaHighStartTime.IsZero() { - o.propagationDelayDeltaHighStartTime = time.Now() - } - if o.propagationDelaySpikeNs == 0 { - o.propagationDelaySpikeNs = o.lastPropagationDelayNs - } else { - o.propagationDelaySpikeNs += int64(o.params.PropagationDelaySpikeAdaptationFactor * float64(o.lastPropagationDelayNs-o.propagationDelaySpikeNs)) - } - - if o.propagationDelayDeltaHighCount >= o.params.PropagationDelayDeltaHighResetNumReports && time.Since(o.propagationDelayDeltaHighStartTime) >= o.params.PropagationDelayDeltaHighResetWait { - stepChange = true - initPropagationDelay(o.propagationDelaySpikeNs) - } - } else { - resetDelta() - } - } else { - resetDelta() - - factor := o.params.PropagationDelayFallFactor - if o.lastPropagationDelayNs > o.estimatedPropagationDelayNs { - factor = o.params.PropagationDelayRiseFactor - } - o.estimatedPropagationDelayNs += int64(factor * float64(o.lastPropagationDelayNs-o.estimatedPropagationDelayNs)) - } - - if o.lastDeltaPropagationDelayNs < o.params.PropagationDelayDeltaLongTermAdaptationThreshold.Nanoseconds() { - if o.longTermDeltaPropagationDelayNs == 0 { - o.longTermDeltaPropagationDelayNs = o.lastDeltaPropagationDelayNs - } else { - // do not adapt to large +ve spikes, can happen when channel is congested and reports are delivered very late - // if the spike is in fact a path change, it will persist and handled by path change detection above - sinceLast := senderClockTimeNs - o.lastSenderClockTimeNs - adaptationFactor := min(1.0, float64(sinceLast)/float64(o.params.PropagationDelayDeltaHighResetWait)) - o.longTermDeltaPropagationDelayNs += int64(adaptationFactor * float64(o.lastDeltaPropagationDelayNs-o.longTermDeltaPropagationDelayNs)) - } - } - if o.longTermDeltaPropagationDelayNs < 0 { - o.longTermDeltaPropagationDelayNs = 0 - } - o.lastSenderClockTimeNs = senderClockTimeNs - return o.estimatedPropagationDelayNs, stepChange -} - -func (o *OWDEstimator) InitialAdjustment(adjustmentNs int64) int64 { - if o.initialAdjustmentDone { - return o.estimatedPropagationDelayNs - } - - o.initialAdjustmentDone = true - // one time adjustment at init - // example: when this is used to measure one-way-delay of RTCP sender reports, - // it is possible that the first sender report is delayed and experiences more - // than existing propagation delay. This allows adjustment of initial estimate. - if adjustmentNs < 0 && -adjustmentNs < o.estimatedPropagationDelayNs { - o.estimatedPropagationDelayNs += adjustmentNs - } - return o.estimatedPropagationDelayNs -} - -func (o *OWDEstimator) EstimatedPropagationDelay() int64 { - return o.estimatedPropagationDelayNs -}