Update mediatransportutil (#4115)

- New bucket API to pass in max packet size and sequence number offset
  and seequence number size generic type
- Move OWD estimator to mediatransportutil.
This commit is contained in:
Raja Subramanian
2025-11-28 21:51:53 +05:30
committed by GitHub
parent 0a2943bbc5
commit 64c651431e
7 changed files with 15 additions and 210 deletions

2
go.mod
View File

@@ -22,7 +22,7 @@ require (
github.com/jellydator/ttlcache/v3 v3.4.0
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731
github.com/livekit/mediatransportutil v0.0.0-20250922175932-f537f0880397
github.com/livekit/mediatransportutil v0.0.0-20251128155121-3d31455b228f
github.com/livekit/protocol v1.43.3-0.20251125082437-3bc7a74d3d5e
github.com/livekit/psrpc v0.7.1
github.com/mackerelio/go-osstat v0.2.6

4
go.sum
View File

@@ -171,8 +171,8 @@ github.com/lithammer/shortuuid/v4 v4.2.0 h1:LMFOzVB3996a7b8aBuEXxqOBflbfPQAiVzkI
github.com/lithammer/shortuuid/v4 v4.2.0/go.mod h1:D5noHZ2oFw/YaKCfGy0YxyE7M0wMbezmMjPdhyEFe6Y=
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 h1:9x+U2HGLrSw5ATTo469PQPkqzdoU7be46ryiCDO3boc=
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20250922175932-f537f0880397 h1:Z7j2mY+bvG05UC80MpnJkitlJju8sSDWsr0Bb4dPceo=
github.com/livekit/mediatransportutil v0.0.0-20250922175932-f537f0880397/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A=
github.com/livekit/mediatransportutil v0.0.0-20251128155121-3d31455b228f h1:jh+1m+70GnYV7Eryuc4N8CJtctRhbx6xYH4DlpcEiO4=
github.com/livekit/mediatransportutil v0.0.0-20251128155121-3d31455b228f/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A=
github.com/livekit/protocol v1.43.3-0.20251125082437-3bc7a74d3d5e h1:Fsdj1LoPI5LrzGwOxTGfP7SbGsK+/4SgFCbt83NAiws=
github.com/livekit/protocol v1.43.3-0.20251125082437-3bc7a74d3d5e/go.mod h1:yjkL2/HcaCRyHykP9rLgKST2099AGd8laaU8EuHMnfw=
github.com/livekit/psrpc v0.7.1 h1:ms37az0QTD3UXIWuUC5D/SkmKOlRMVRsI261eBWu/Vw=

View File

@@ -18,7 +18,7 @@ import (
"sync"
"time"
"github.com/livekit/livekit-server/pkg/sfu/utils"
"github.com/livekit/mediatransportutil/pkg/latency"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils/mono"
@@ -49,7 +49,7 @@ type MetricTimestamperParams struct {
type MetricTimestamper struct {
params MetricTimestamperParams
lock sync.Mutex
owdEstimator *utils.OWDEstimator
owdEstimator *latency.OWDEstimator
lastOWDEstimatorRunAt time.Time
batchesSinceLastOWDEstimatorRun int
}
@@ -57,7 +57,7 @@ type MetricTimestamper struct {
func NewMetricTimestamper(params MetricTimestamperParams) *MetricTimestamper {
return &MetricTimestamper{
params: params,
owdEstimator: utils.NewOWDEstimator(utils.OWDEstimatorParamsDefault),
owdEstimator: latency.NewOWDEstimator(latency.OWDEstimatorParamsDefault),
lastOWDEstimatorRunAt: time.Now().Add(-params.Config.OneWayDelayEstimatorMinInterval),
}
}

View File

@@ -78,7 +78,7 @@ type ExtPacket struct {
ExtSequenceNumber uint64
ExtTimestamp uint64
Packet *rtp.Packet
Payload interface{}
Payload any
KeyFrame bool
RawPacket []byte
DependencyDescriptor *ExtDependencyDescriptor
@@ -97,7 +97,7 @@ type VideoSize struct {
type Buffer struct {
sync.RWMutex
readCond *sync.Cond
bucket *bucket.Bucket[uint64]
bucket *bucket.Bucket[uint64, uint16]
nacker *nack.NackQueue
maxVideoPkts int
maxAudioPkts int
@@ -296,11 +296,11 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili
switch {
case mime.IsMimeTypeAudio(b.mime):
b.codecType = webrtc.RTPCodecTypeAudio
b.bucket = bucket.NewBucket[uint64](InitPacketBufferSizeAudio)
b.bucket = bucket.NewBucket[uint64, uint16](InitPacketBufferSizeAudio, bucket.RTPMaxPktSize, bucket.RTPSeqNumOffset)
case mime.IsMimeTypeVideo(b.mime):
b.codecType = webrtc.RTPCodecTypeVideo
b.bucket = bucket.NewBucket[uint64](InitPacketBufferSizeVideo)
b.bucket = bucket.NewBucket[uint64, uint16](InitPacketBufferSizeVideo, bucket.RTPMaxPktSize, bucket.RTPSeqNumOffset)
if b.frameRateCalculator[0] == nil {
b.createFrameRateCalculator()
}
@@ -487,7 +487,7 @@ func (b *Buffer) writeRTX(rtxPkt *rtp.Packet, arrivalTime int64) {
}
if b.rtxPktBuf == nil {
b.rtxPktBuf = make([]byte, bucket.MaxPktSize)
b.rtxPktBuf = make([]byte, bucket.RTPMaxPktSize)
}
if len(rtxPkt.Payload) < 2 {

View File

@@ -785,7 +785,7 @@ func (w *WebRTCReceiver) forwardRTP(layer int32, buff *buffer.Buffer) {
return
}
pktBuf := make([]byte, bucket.MaxPktSize)
pktBuf := make([]byte, bucket.RTPMaxPktSize)
w.logger.Debugw("starting forwarding", "layer", layer)
for {
pkt, err := buff.ReadExtended(pktBuf)

View File

@@ -24,6 +24,7 @@ import (
"github.com/livekit/livekit-server/pkg/sfu/utils"
"github.com/livekit/mediatransportutil"
"github.com/livekit/mediatransportutil/pkg/latency"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
protoutils "github.com/livekit/protocol/utils"
@@ -81,7 +82,7 @@ type RTPStatsReceiver struct {
history *protoutils.Bitmap[uint64]
propagationDelayEstimator *utils.OWDEstimator
propagationDelayEstimator *latency.OWDEstimator
clockSkewCount int
clockSkewMediaPathCount int
@@ -98,7 +99,7 @@ func NewRTPStatsReceiver(params RTPStatsParams) *RTPStatsReceiver {
tsRolloverThreshold: (1 << 31) * 1e9 / int64(params.ClockRate),
timestamp: utils.NewWrapAround[uint32, uint64](utils.WrapAroundParams{IsRestartAllowed: false}),
history: protoutils.NewBitmap[uint64](cHistorySize),
propagationDelayEstimator: utils.NewOWDEstimator(utils.OWDEstimatorParamsDefault),
propagationDelayEstimator: latency.NewOWDEstimator(latency.OWDEstimatorParamsDefault),
}
}

View File

@@ -1,196 +0,0 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utils
import (
"time"
"go.uber.org/zap/zapcore"
)
type OWDEstimatorParams struct {
PropagationDelayFallFactor float64
PropagationDelayRiseFactor float64
PropagationDelaySpikeAdaptationFactor float64
PropagationDelayDeltaThresholdMin time.Duration
PropagationDelayDeltaThresholdMaxFactor int64
PropagationDelayDeltaHighResetNumReports int
PropagationDelayDeltaHighResetWait time.Duration
PropagationDelayDeltaLongTermAdaptationThreshold time.Duration
}
var OWDEstimatorParamsDefault = OWDEstimatorParams{
// OWD (One-Way-Delay) Estimator is used to estimate propagation delay between sender and receiver.
// As they operate on different clock domains, it is not possible to get exact propagation delay easily.
// So, this module is an estimator using a simple approach explained below. It should not be used for
// things that require high accuracy.
//
// One example is RTCP Sender Reports getting re-based to SFU time base so that all subscriber side
// can have the same time base (i. e. SFU time base). To convert publisher side
// RTCP Sender Reports to SFU timebase, a propagation delay is maintained.
// propagation_delay = time_of_report_reception - ntp_timestamp_in_report
//
// Propagation delay is adapted continuously. If it falls, adapt quickly to the
// lower value as that could be the real propagation delay. If it rises, adapt slowly
// as it might be a temporary change or slow drift. See below for handling of high deltas
// which could be a result of a path change.
PropagationDelayFallFactor: 0.9,
PropagationDelayRiseFactor: 0.1,
PropagationDelaySpikeAdaptationFactor: 0.5,
// To account for path changes mid-stream, if the delta of the propagation delay is consistently higher, reset.
// Reset at whichever of the below happens later.
// 1. 10 seconds of persistent high delta.
// 2. at least 2 consecutive reports with high delta.
//
// A long term estimate of delta of propagation delay is maintained and delta propagation delay exceeding
// a factor of the long term estimate is considered a sharp increase. That will trigger the start of the
// path change condition and if it persists, propagation delay will be reset.
PropagationDelayDeltaThresholdMin: 10 * time.Millisecond,
PropagationDelayDeltaThresholdMaxFactor: 2,
PropagationDelayDeltaHighResetNumReports: 2,
PropagationDelayDeltaHighResetWait: 10 * time.Second,
PropagationDelayDeltaLongTermAdaptationThreshold: 50 * time.Millisecond,
}
type OWDEstimator struct {
params OWDEstimatorParams
initialized bool
initialAdjustmentDone bool
lastSenderClockTimeNs int64
lastPropagationDelayNs int64
lastDeltaPropagationDelayNs int64
estimatedPropagationDelayNs int64
longTermDeltaPropagationDelayNs int64
propagationDelayDeltaHighCount int
propagationDelayDeltaHighStartTime time.Time
propagationDelaySpikeNs int64
}
func NewOWDEstimator(params OWDEstimatorParams) *OWDEstimator {
return &OWDEstimator{
params: params,
}
}
func (o *OWDEstimator) MarshalLogObject(e zapcore.ObjectEncoder) error {
if o != nil {
e.AddTime("lastSenderClockTimeNs", time.Unix(0, o.lastSenderClockTimeNs))
e.AddDuration("lastPropagationDelayNs", time.Duration(o.lastPropagationDelayNs))
e.AddDuration("lastDeltaPropagationDelayNs", time.Duration(o.lastDeltaPropagationDelayNs))
e.AddDuration("estimatedPropagationDelayNs", time.Duration(o.estimatedPropagationDelayNs))
e.AddDuration("longTermDeltaPropagationDelayNs", time.Duration(o.longTermDeltaPropagationDelayNs))
e.AddInt("propagationDelayDeltaHighCount", o.propagationDelayDeltaHighCount)
e.AddTime("propagationDelayDeltaHighStartTime", o.propagationDelayDeltaHighStartTime)
e.AddDuration("propagationDelaySpikeNs", time.Duration(o.propagationDelaySpikeNs))
}
return nil
}
func (o *OWDEstimator) Update(senderClockTimeNs int64, receiverClockTimeNs int64) (int64, bool) {
resetDelta := func() {
o.propagationDelayDeltaHighCount = 0
o.propagationDelayDeltaHighStartTime = time.Time{}
o.propagationDelaySpikeNs = 0
}
initPropagationDelay := func(pd int64) {
o.estimatedPropagationDelayNs = pd
o.longTermDeltaPropagationDelayNs = 0
resetDelta()
}
o.lastPropagationDelayNs = receiverClockTimeNs - senderClockTimeNs
if !o.initialized {
o.initialized = true
o.lastSenderClockTimeNs = senderClockTimeNs
initPropagationDelay(o.lastPropagationDelayNs)
return o.estimatedPropagationDelayNs, true
}
stepChange := false
o.lastDeltaPropagationDelayNs = o.lastPropagationDelayNs - o.estimatedPropagationDelayNs
// check for path changes, i. e. a step jump increase in propagation delay observed over time
if o.lastDeltaPropagationDelayNs > o.params.PropagationDelayDeltaThresholdMin.Nanoseconds() { // ignore small changes for path change consideration
if o.longTermDeltaPropagationDelayNs != 0 &&
o.lastDeltaPropagationDelayNs > o.longTermDeltaPropagationDelayNs*o.params.PropagationDelayDeltaThresholdMaxFactor {
o.propagationDelayDeltaHighCount++
if o.propagationDelayDeltaHighStartTime.IsZero() {
o.propagationDelayDeltaHighStartTime = time.Now()
}
if o.propagationDelaySpikeNs == 0 {
o.propagationDelaySpikeNs = o.lastPropagationDelayNs
} else {
o.propagationDelaySpikeNs += int64(o.params.PropagationDelaySpikeAdaptationFactor * float64(o.lastPropagationDelayNs-o.propagationDelaySpikeNs))
}
if o.propagationDelayDeltaHighCount >= o.params.PropagationDelayDeltaHighResetNumReports && time.Since(o.propagationDelayDeltaHighStartTime) >= o.params.PropagationDelayDeltaHighResetWait {
stepChange = true
initPropagationDelay(o.propagationDelaySpikeNs)
}
} else {
resetDelta()
}
} else {
resetDelta()
factor := o.params.PropagationDelayFallFactor
if o.lastPropagationDelayNs > o.estimatedPropagationDelayNs {
factor = o.params.PropagationDelayRiseFactor
}
o.estimatedPropagationDelayNs += int64(factor * float64(o.lastPropagationDelayNs-o.estimatedPropagationDelayNs))
}
if o.lastDeltaPropagationDelayNs < o.params.PropagationDelayDeltaLongTermAdaptationThreshold.Nanoseconds() {
if o.longTermDeltaPropagationDelayNs == 0 {
o.longTermDeltaPropagationDelayNs = o.lastDeltaPropagationDelayNs
} else {
// do not adapt to large +ve spikes, can happen when channel is congested and reports are delivered very late
// if the spike is in fact a path change, it will persist and handled by path change detection above
sinceLast := senderClockTimeNs - o.lastSenderClockTimeNs
adaptationFactor := min(1.0, float64(sinceLast)/float64(o.params.PropagationDelayDeltaHighResetWait))
o.longTermDeltaPropagationDelayNs += int64(adaptationFactor * float64(o.lastDeltaPropagationDelayNs-o.longTermDeltaPropagationDelayNs))
}
}
if o.longTermDeltaPropagationDelayNs < 0 {
o.longTermDeltaPropagationDelayNs = 0
}
o.lastSenderClockTimeNs = senderClockTimeNs
return o.estimatedPropagationDelayNs, stepChange
}
func (o *OWDEstimator) InitialAdjustment(adjustmentNs int64) int64 {
if o.initialAdjustmentDone {
return o.estimatedPropagationDelayNs
}
o.initialAdjustmentDone = true
// one time adjustment at init
// example: when this is used to measure one-way-delay of RTCP sender reports,
// it is possible that the first sender report is delayed and experiences more
// than existing propagation delay. This allows adjustment of initial estimate.
if adjustmentNs < 0 && -adjustmentNs < o.estimatedPropagationDelayNs {
o.estimatedPropagationDelayNs += adjustmentNs
}
return o.estimatedPropagationDelayNs
}
func (o *OWDEstimator) EstimatedPropagationDelay() int64 {
return o.estimatedPropagationDelayNs
}