Defer setting clock rate in RTPStats module till codec is bound. (#4250)

With audio simulcast codecs, it is possible that the clock rate of the
primary codec is different from the secondary codec. If a subscriber
binds to the secondary codec, the clock rate should be set correctly. Do
it at bind time.
This commit is contained in:
Raja Subramanian
2026-01-15 22:08:50 +05:30
committed by GitHub
parent d9f716c14a
commit aea044c5cb
8 changed files with 105 additions and 65 deletions
+6 -8
View File
@@ -444,10 +444,9 @@ func (b *BufferBase) SetStreamRestartDetection(enable bool) {
}
func (b *BufferBase) setupRTPStats(clockRate uint32) {
b.rtpStats = rtpstats.NewRTPStatsReceiver(rtpstats.RTPStatsParams{
ClockRate: clockRate,
Logger: b.logger,
})
b.rtpStats = rtpstats.NewRTPStatsReceiver(rtpstats.RTPStatsParams{})
b.rtpStats.SetClockRate(clockRate)
b.ppsSnapshotId = b.rtpStats.NewSnapshotId()
if b.params.IsReportingEnabled {
b.rrSnapshotId = b.rtpStats.NewSnapshotId()
@@ -455,10 +454,9 @@ func (b *BufferBase) setupRTPStats(clockRate uint32) {
}
if b.params.IsOOBSequenceNumber {
b.rtpStatsLite = rtpstats.NewRTPStatsReceiverLite(rtpstats.RTPStatsParams{
ClockRate: clockRate,
Logger: b.logger,
})
b.rtpStatsLite = rtpstats.NewRTPStatsReceiverLite(rtpstats.RTPStatsParams{})
b.rtpStatsLite.SetClockRate(clockRate)
b.liteStatsSnapshotId = b.rtpStatsLite.NewSnapshotLiteId()
}
}
+13 -13
View File
@@ -394,21 +394,14 @@ func NewDownTrack(params DownTrackParams) (*DownTrack, error) {
} else {
mdCacheSize, mdCacheSizeRTX = 8192, 1024
}
d.rtpStats = rtpstats.NewRTPStatsSender(rtpstats.RTPStatsParams{
ClockRate: codec.ClockRate,
Logger: d.params.Logger.WithValues(
"stream", "primary",
),
}, mdCacheSize)
d.rtpStats = rtpstats.NewRTPStatsSender(rtpstats.RTPStatsParams{}, mdCacheSize)
// clock rate will be set on bind or codec change with matching codec's clock rate
d.rtpStats.SetLogger(d.params.Logger.WithValues("stream", "primary"))
d.deltaStatsSenderSnapshotId = d.rtpStats.NewSenderSnapshotId()
d.rtpStatsRTX = rtpstats.NewRTPStatsSender(rtpstats.RTPStatsParams{
ClockRate: codec.ClockRate,
IsRTX: true,
Logger: d.params.Logger.WithValues(
"stream", "rtx",
),
}, mdCacheSizeRTX)
d.rtpStatsRTX = rtpstats.NewRTPStatsSender(rtpstats.RTPStatsParams{IsRTX: true}, mdCacheSizeRTX)
// clock rate will be set on bind or codec change with matching codec's clock rate
d.rtpStatsRTX.SetLogger(d.params.Logger.WithValues("stream", "rtx"))
d.deltaStatsRTXSenderSnapshotId = d.rtpStatsRTX.NewSenderSnapshotId()
d.forwarder = NewForwarder(
@@ -599,6 +592,9 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters,
d.sequencer = newSequencer(d.params.MaxTrack, d.kind == webrtc.RTPCodecTypeVideo, d.params.Logger)
d.codec.Store(codec.RTPCodecCapability)
d.rtpStats.SetClockRate(codec.RTPCodecCapability.ClockRate)
d.rtpStatsRTX.SetClockRate(codec.RTPCodecCapability.ClockRate)
if d.onBinding != nil {
d.onBinding(nil)
}
@@ -699,7 +695,11 @@ func (d *DownTrack) handleUpstreamCodecChange(mimeType string) {
d.payloadType.Store(uint32(codec.PayloadType))
d.payloadTypeRTX.Store(uint32(utils.FindRTXPayloadType(codec.PayloadType, d.negotiatedCodecParameters)))
d.codec.Store(codec.RTPCodecCapability)
d.rtpStats.SetClockRate(codec.RTPCodecCapability.ClockRate)
d.rtpStatsRTX.SetClockRate(codec.RTPCodecCapability.ClockRate)
isFECEnabled := strings.Contains(strings.ToLower(codec.SDPFmtpLine), "fec")
d.bindLock.Unlock()
+3 -1
View File
@@ -26,7 +26,9 @@ import (
)
func TestPlayoutDelay(t *testing.T) {
stats := rtpstats.NewRTPStatsSender(rtpstats.RTPStatsParams{ClockRate: 900000, Logger: logger.GetLogger()}, 128)
stats := rtpstats.NewRTPStatsSender(rtpstats.RTPStatsParams{}, 128)
stats.SetClockRate(90000)
c, err := NewPlayoutDelayController(100, 120, logger.GetLogger(), stats)
require.NoError(t, err)
+24 -9
View File
@@ -251,12 +251,23 @@ type rtpStatsBase struct {
func newRTPStatsBase(params RTPStatsParams) *rtpStatsBase {
return &rtpStatsBase{
rtpStatsBaseLite: newRTPStatsBaseLite(params),
rtpConverter: rtputil.NewRTPConverter(int64(params.ClockRate)),
nextSnapshotID: cFirstSnapshotID,
snapshots: make([]snapshot, 2),
}
}
func (r *rtpStatsBase) SetClockRate(clockRate uint32) {
r.lock.Lock()
defer r.lock.Unlock()
r.setClockRateLocked(clockRate)
}
func (r *rtpStatsBase) setClockRateLocked(clockRate uint32) {
r.rtpConverter = rtputil.NewRTPConverter(int64(clockRate))
r.rtpStatsBaseLite.setClockRateLocked(clockRate)
}
func (r *rtpStatsBase) seed(from *rtpStatsBase) bool {
if !r.rtpStatsBaseLite.seed(from.rtpStatsBaseLite) {
return false
@@ -468,6 +479,10 @@ func (r *rtpStatsBase) deltaInfo(
extStartSN uint64,
extHighestSN uint64,
) (deltaInfo *RTPDeltaInfo, err error, loggingFields []any) {
if r.clockRate == 0 {
return
}
then, now := r.getAndResetSnapshot(snapshotID, extStartSN, extHighestSN)
if now == nil || then == nil {
return
@@ -538,7 +553,7 @@ func (r *rtpStatsBase) deltaInfo(
PacketsOutOfOrder: uint32(now.packetsOutOfOrder - then.packetsOutOfOrder),
Frames: now.frames - then.frames,
RttMax: then.maxRtt,
JitterMax: then.maxJitter / float64(r.params.ClockRate) * 1e6,
JitterMax: then.maxJitter / float64(r.clockRate) * 1e6,
Nacks: now.nacks - then.nacks,
Plis: now.plis - then.plis,
Firs: now.firs - then.firs,
@@ -634,8 +649,8 @@ func (r *rtpStatsBase) toProto(
p.KeyFrames = r.keyFrames
p.LastKeyFrame = timestamppb.New(r.lastKeyFrame)
p.JitterCurrent = jitter / float64(r.params.ClockRate) * 1e6
p.JitterMax = maxJitter / float64(r.params.ClockRate) * 1e6
p.JitterCurrent = jitter / float64(r.clockRate) * 1e6
p.JitterMax = maxJitter / float64(r.clockRate) * 1e6
p.Firs = r.firs
p.LastFir = timestamppb.New(r.lastFir)
@@ -655,7 +670,7 @@ func (r *rtpStatsBase) updateJitter(ets uint64, packetTime int64) float64 {
// p1f1 -> p1f2 -> p2f1
// In this case, p2f1 (packet 2, frame 1) will still be used in jitter calculation
// although it is the second packet of a frame because of out-of-order receival.
if r.lastJitterExtTimestamp != ets {
if r.lastJitterExtTimestamp != ets && r.rtpConverter != nil {
timeSinceFirst := packetTime - r.firstTime
packetTimeRTP := r.rtpConverter.ToRTPExt(time.Duration(timeSinceFirst))
transit := packetTimeRTP - ets
@@ -718,7 +733,7 @@ func (r *rtpStatsBase) getDrift(extStartTS, extHighestTS uint64) (
EndTimestamp: extHighestTS,
RtpClockTicks: rtpClockTicks,
DriftSamples: driftSamples,
DriftMs: (float64(driftSamples) * 1000) / float64(r.params.ClockRate),
DriftMs: (float64(driftSamples) * 1000) / float64(r.clockRate),
ClockRate: float64(rtpClockTicks) / elapsed.Seconds(),
}
}
@@ -738,7 +753,7 @@ func (r *rtpStatsBase) getDrift(extStartTS, extHighestTS uint64) (
EndTimestamp: r.srNewest.RtpTimestampExt,
RtpClockTicks: rtpClockTicks,
DriftSamples: driftSamples,
DriftMs: (float64(driftSamples) * 1000) / float64(r.params.ClockRate),
DriftMs: (float64(driftSamples) * 1000) / float64(r.clockRate),
ClockRate: float64(rtpClockTicks) / elapsed.Seconds(),
}
}
@@ -754,7 +769,7 @@ func (r *rtpStatsBase) getDrift(extStartTS, extHighestTS uint64) (
EndTimestamp: r.srNewest.RtpTimestampExt,
RtpClockTicks: rtpClockTicks,
DriftSamples: driftSamples,
DriftMs: (float64(driftSamples) * 1000) / float64(r.params.ClockRate),
DriftMs: (float64(driftSamples) * 1000) / float64(r.clockRate),
ClockRate: float64(rtpClockTicks) / elapsed.Seconds(),
}
}
@@ -770,7 +785,7 @@ func (r *rtpStatsBase) getDrift(extStartTS, extHighestTS uint64) (
EndTimestamp: r.srNewest.RtpTimestampExt,
RtpClockTicks: rtpClockTicks,
DriftSamples: driftSamples,
DriftMs: (float64(driftSamples) * 1000) / float64(r.params.ClockRate),
DriftMs: (float64(driftSamples) * 1000) / float64(r.clockRate),
ClockRate: float64(rtpClockTicks) / elapsed.Seconds(),
}
}
+21 -8
View File
@@ -95,14 +95,13 @@ func (s *snapshotLite) MarshalLogObject(e zapcore.ObjectEncoder) error {
// ------------------------------------------------------------------
type RTPStatsParams struct {
ClockRate uint32
IsRTX bool
Logger logger.Logger
IsRTX bool
}
type rtpStatsBaseLite struct {
params RTPStatsParams
logger logger.Logger
params RTPStatsParams
clockRate uint32
logger logger.Logger
lock sync.RWMutex
@@ -134,7 +133,7 @@ type rtpStatsBaseLite struct {
func newRTPStatsBaseLite(params RTPStatsParams) *rtpStatsBaseLite {
return &rtpStatsBaseLite{
params: params,
logger: params.Logger,
logger: logger.GetLogger(),
nextSnapshotLiteID: cFirstSnapshotID,
snapshotLites: make([]snapshotLite, 2),
}
@@ -172,7 +171,21 @@ func (r *rtpStatsBaseLite) seed(from *rtpStatsBaseLite) bool {
return true
}
func (r *rtpStatsBaseLite) SetClockRate(clockRate uint32) {
r.lock.Lock()
defer r.lock.Unlock()
r.setClockRateLocked(clockRate)
}
func (r *rtpStatsBaseLite) setClockRateLocked(clockRate uint32) {
r.clockRate = clockRate
}
func (r *rtpStatsBaseLite) SetLogger(logger logger.Logger) {
r.lock.Lock()
defer r.lock.Unlock()
r.logger = logger
}
@@ -366,7 +379,7 @@ func (r *rtpStatsBaseLite) deltaInfoLite(
}
func (r *rtpStatsBaseLite) marshalLogObject(e zapcore.ObjectEncoder, packetsExpected, packetsSeenMinusPadding uint64) (float64, error) {
if r == nil || !r.initialized {
if r == nil || !r.initialized || r.clockRate == 0 {
return 0, errors.New("not initialized")
}
@@ -431,7 +444,7 @@ func (r *rtpStatsBaseLite) marshalLogObject(e zapcore.ObjectEncoder, packetsExpe
}
func (r *rtpStatsBaseLite) toProto(packetsExpected, packetsSeenMinusPadding, packetsLost uint64) *livekit.RTPStats {
if r.startTime == 0 {
if r.startTime == 0 || r.clockRate == 0 {
return nil
}
+22 -8
View File
@@ -49,6 +49,7 @@ type RTPFlowUnhandledReason int
const (
RTPFlowUnhandledReasonNone RTPFlowUnhandledReason = iota
RTPFlowUnhandledReasonEnded
RTPFlowUnhandledReasonUnconfigured
RTPFlowUnhandledReasonPaddingOnly
RTPFlowUnhandledReasonPreStartTimestamp
RTPFlowUnhandledReasonOldTimestamp
@@ -63,6 +64,8 @@ func (r RTPFlowUnhandledReason) String() string {
return "NONE"
case RTPFlowUnhandledReasonEnded:
return "ENDED"
case RTPFlowUnhandledReasonUnconfigured:
return "UNCONFIGURED"
case RTPFlowUnhandledReasonPaddingOnly:
return "PADDING_ONLY"
case RTPFlowUnhandledReasonPreStartTimestamp:
@@ -154,13 +157,20 @@ func NewRTPStatsReceiver(params RTPStatsParams) *RTPStatsReceiver {
return &RTPStatsReceiver{
rtpStatsBase: newRTPStatsBase(params),
sequenceNumber: utils.NewWrapAround[uint16, uint64](utils.WrapAroundParams{IsRestartAllowed: false}),
tsRolloverThreshold: (1 << 31) * 1e9 / int64(params.ClockRate),
timestamp: utils.NewWrapAround[uint32, uint64](utils.WrapAroundParams{IsRestartAllowed: false}),
history: protoutils.NewBitmap[uint64](cHistorySize),
propagationDelayEstimator: latency.NewOWDEstimator(latency.OWDEstimatorParamsDefault),
}
}
func (r *RTPStatsReceiver) SetClockRate(clockRate uint32) {
r.lock.Lock()
defer r.lock.Unlock()
r.tsRolloverThreshold = (1 << 31) * 1e9 / int64(clockRate)
r.rtpStatsBase.setClockRateLocked(clockRate)
}
func (r *RTPStatsReceiver) NewSnapshotId() uint32 {
r.lock.Lock()
defer r.lock.Unlock()
@@ -174,7 +184,7 @@ func (r *RTPStatsReceiver) getTSRolloverCount(diffNano int64, ts uint32) int {
return -1
}
excess := (diffNano - r.tsRolloverThreshold*2) * int64(r.params.ClockRate) / 1e9
excess := (diffNano - r.tsRolloverThreshold*2) * int64(r.clockRate) / 1e9
roc := max(excess/(1<<32), 0)
if r.timestamp.GetHighest() > ts {
roc++
@@ -198,6 +208,10 @@ func (r *RTPStatsReceiver) Update(
flowState.UnhandledReason = RTPFlowUnhandledReasonEnded
return
}
if r.clockRate == 0 {
flowState.UnhandledReason = RTPFlowUnhandledReasonUnconfigured
return
}
var resSN utils.WrapAroundUpdateResult[uint64]
var gapSN int64
@@ -467,8 +481,8 @@ func (r *RTPStatsReceiver) getExtendedSenderReport(srData *livekit.RTCPSenderRep
// jump more than half the range
timeSinceLastReport := mediatransportutil.NtpTime(srData.NtpTimestamp).Time().Sub(mediatransportutil.NtpTime(r.srNewest.NtpTimestamp).Time())
expectedRTPTimestampExt := r.srNewest.RtpTimestampExt + r.rtpConverter.ToRTPExt(timeSinceLastReport)
lbound := expectedRTPTimestampExt - uint64(cReportSlack*float64(r.params.ClockRate))
ubound := expectedRTPTimestampExt + uint64(cReportSlack*float64(r.params.ClockRate))
lbound := expectedRTPTimestampExt - uint64(cReportSlack*float64(r.clockRate))
ubound := expectedRTPTimestampExt + uint64(cReportSlack*float64(r.clockRate))
isInRange := (srData.RtpTimestamp-uint32(lbound) < (1 << 31)) && (uint32(ubound)-srData.RtpTimestamp < (1 << 31))
if isInRange {
lbTSCycles := lbound & 0xFFFF_FFFF_0000_0000
@@ -538,8 +552,8 @@ func (r *RTPStatsReceiver) checkRTPClockSkewForSenderReport(srData *livekit.RTCP
rtpDiffSinceFirst := srData.RtpTimestampExt - r.srFirst.RtpTimestampExt
calculatedClockRateFromFirst := float64(rtpDiffSinceFirst) / timeSinceFirst
if (timeSinceLast > 0.2 && math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromLast) > 0.2*float64(r.params.ClockRate)) ||
(timeSinceFirst > 0.2 && math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromFirst) > 0.2*float64(r.params.ClockRate)) {
if (timeSinceLast > 0.2 && math.Abs(float64(r.clockRate)-calculatedClockRateFromLast) > 0.2*float64(r.clockRate)) ||
(timeSinceFirst > 0.2 && math.Abs(float64(r.clockRate)-calculatedClockRateFromFirst) > 0.2*float64(r.clockRate)) {
r.clockSkewCount++
if (r.clockSkewCount-1)%100 == 0 {
r.logger.Infow(
@@ -576,7 +590,7 @@ func (r *RTPStatsReceiver) checkRTPClockSkewAgainstMediaPathForSenderReport(srDa
diffFirst := extNowTSSR - extNowTSFirst
// is it more than 5 seconds off?
if uint32(math.Abs(float64(int64(diffHighest)))) > 5*r.params.ClockRate || uint32(math.Abs(float64(int64(diffFirst)))) > 5*r.params.ClockRate {
if uint32(math.Abs(float64(int64(diffHighest)))) > 5*r.clockRate || uint32(math.Abs(float64(int64(diffFirst)))) > 5*r.clockRate {
r.clockSkewMediaPathCount++
if (r.clockSkewMediaPathCount-1)%100 == 0 {
r.logger.Infow(
@@ -620,7 +634,7 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *livekit.RTCPSenderRep
r.lock.Lock()
defer r.lock.Unlock()
if srData == nil || !r.initialized {
if srData == nil || !r.initialized || r.clockRate == 0 {
return false
}
+12 -8
View File
@@ -399,7 +399,7 @@ func (r *RTPStatsSender) Update(
r.lock.Lock()
defer r.lock.Unlock()
if r.endTime != 0 {
if r.endTime != 0 || r.clockRate == 0 {
return
}
@@ -808,7 +808,7 @@ func (r *RTPStatsSender) MaybeAdjustFirstPacketTime(publisherSRData *livekit.RTC
r.lock.Lock()
defer r.lock.Unlock()
if !r.initialized || publisherSRData == nil {
if !r.initialized || publisherSRData == nil || r.clockRate == 0 {
return
}
@@ -821,7 +821,7 @@ func (r *RTPStatsSender) GetExpectedRTPTimestamp(at time.Time) (expectedTSExt ui
r.lock.RLock()
defer r.lock.RUnlock()
if r.firstTime == 0 {
if r.firstTime == 0 || r.clockRate == 0 {
err = errors.New("uninitialized")
return
}
@@ -835,7 +835,7 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *livek
r.lock.Lock()
defer r.lock.Unlock()
if !r.initialized || publisherSRData == nil {
if !r.initialized || publisherSRData == nil || r.clockRate == 0 {
return nil
}
@@ -891,11 +891,11 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *livek
)
}
if r.srNewest != nil && nowRTPExt >= r.srNewest.RtpTimestampExt {
if r.srNewest != nil && nowRTPExt >= r.srNewest.RtpTimestampExt && r.clockRate != 0 {
timeSinceLastReport := nowNTP.Time().Sub(mediatransportutil.NtpTime(r.srNewest.NtpTimestamp).Time())
rtpDiffSinceLastReport := nowRTPExt - r.srNewest.RtpTimestampExt
windowClockRate := float64(rtpDiffSinceLastReport) / timeSinceLastReport.Seconds()
if timeSinceLastReport.Seconds() > 0.2 && math.Abs(float64(r.params.ClockRate)-windowClockRate) > 0.2*float64(r.params.ClockRate) {
if timeSinceLastReport.Seconds() > 0.2 && math.Abs(float64(r.clockRate)-windowClockRate) > 0.2*float64(r.clockRate) {
r.clockSkewCount++
if (r.clockSkewCount-1)%100 == 0 {
ulgr().Infow(
@@ -955,6 +955,10 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) (*RTPDeltaInfo
r.lock.Lock()
defer r.lock.Unlock()
if r.clockRate == 0 {
return nil, nil
}
var deltaStatsSenderView *RTPDeltaInfo
thenSenderView, nowSenderView := r.getAndResetSenderSnapshotWindow(senderSnapshotID)
if thenSenderView != nil && nowSenderView != nil {
@@ -991,7 +995,7 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) (*RTPDeltaInfo
packetsLostFeed = packetsExpected
}
maxJitterTime := thenSenderView.maxJitterFeed / float64(r.params.ClockRate) * 1e6
maxJitterTime := thenSenderView.maxJitterFeed / float64(r.clockRate) * 1e6
deltaStatsSenderView = &RTPDeltaInfo{
StartTime: time.Unix(0, startTime),
@@ -1060,7 +1064,7 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) (*RTPDeltaInfo
packetsLost = packetsExpected
}
maxJitterTime := thenReceiverView.maxJitter / float64(r.params.ClockRate) * 1e6
maxJitterTime := thenReceiverView.maxJitter / float64(r.clockRate) * 1e6
deltaStatsReceiverView = &RTPDeltaInfo{
StartTime: time.Unix(0, startTime),
+4 -10
View File
@@ -21,8 +21,6 @@ import (
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
"github.com/livekit/protocol/logger"
)
func getPacket(sn uint16, ts uint32, payloadSize int) *rtp.Packet {
@@ -37,10 +35,8 @@ func getPacket(sn uint16, ts uint32, payloadSize int) *rtp.Packet {
func Test_RTPStatsReceiver_Update(t *testing.T) {
clockRate := uint32(90000)
r := NewRTPStatsReceiver(RTPStatsParams{
ClockRate: clockRate,
Logger: logger.GetLogger(),
})
r := NewRTPStatsReceiver(RTPStatsParams{})
r.SetClockRate(clockRate)
sequenceNumber := uint16(rand.Float64() * float64(1<<16))
timestamp := uint32(rand.Float64() * float64(1<<32))
@@ -229,10 +225,8 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
func Test_RTPStatsReceiver_Restart(t *testing.T) {
clockRate := uint32(90000)
r := NewRTPStatsReceiver(RTPStatsParams{
ClockRate: clockRate,
Logger: logger.GetLogger(),
})
r := NewRTPStatsReceiver(RTPStatsParams{})
r.SetClockRate(clockRate)
// should not restart till there are at least threshold packets
require.False(t, r.maybeRestart(10, 20, 1000))