Handle cases of long mute/rollover of time stamp. (#2842)

* Handle cases of long mute/rollover of time stamp.

There are cases where the track is muted for long enough for timestamp
roll over to happen. There are no packets in that window (typically
there should be black frames (for video) or silence (for audio)). But,
maybe the pause based implementation of mute is causing this.

Anyhow, use time since last packet to gauge how much roll over should
have happened and use that to update time stamp. There will be really
edge cases where this could also fail (for e. g. packet time is affected
by propagation delay, so it could theoretically happen that mute/unmute
+ packet reception could happen exactly around that rollover point and
  miscalculate, but should be rare).

As this happen per packet on receive side, changing time to `UnixNano()`
to make it more efficient to check this.

* spelling

* tests

* test util

* tests
This commit is contained in:
Raja Subramanian
2024-07-08 11:07:20 +05:30
committed by GitHub
parent 39c59d913d
commit acbd4ea104
16 changed files with 283 additions and 125 deletions

View File

@@ -17,7 +17,6 @@ package audio
import (
"math"
"sync"
"time"
)
const (
@@ -46,7 +45,7 @@ type AudioLevel struct {
loudestObservedLevel uint8
activeDuration uint32 // ms
observedDuration uint32 // ms
lastObservedAt time.Time
lastObservedAt int64
}
func NewAudioLevel(params AudioLevelParams) *AudioLevel {
@@ -67,7 +66,7 @@ func NewAudioLevel(params AudioLevelParams) *AudioLevel {
}
// Observes a new frame
func (l *AudioLevel) Observe(level uint8, durationMs uint32, arrivalTime time.Time) {
func (l *AudioLevel) Observe(level uint8, durationMs uint32, arrivalTime int64) {
l.lock.Lock()
defer l.lock.Unlock()
@@ -101,8 +100,8 @@ func (l *AudioLevel) Observe(level uint8, durationMs uint32, arrivalTime time.Ti
}
}
// returns current soothed audio level
func (l *AudioLevel) GetLevel(now time.Time) (float64, bool) {
// returns current smoothed audio level
func (l *AudioLevel) GetLevel(now int64) (float64, bool) {
l.lock.Lock()
defer l.lock.Unlock()
@@ -111,8 +110,8 @@ func (l *AudioLevel) GetLevel(now time.Time) (float64, bool) {
return l.smoothedLevel, l.smoothedLevel >= l.activeThreshold
}
func (l *AudioLevel) resetIfStaleLocked(arrivalTime time.Time) {
if arrivalTime.Sub(l.lastObservedAt).Milliseconds() < int64(2*l.params.ObserveDuration) {
func (l *AudioLevel) resetIfStaleLocked(arrivalTime int64) {
if (arrivalTime-l.lastObservedAt)/1e6 < int64(2*l.params.ObserveDuration) {
return
}

View File

@@ -34,13 +34,13 @@ func TestAudioLevel(t *testing.T) {
clock := time.Now()
a := createAudioLevel(defaultActiveLevel, defaultPercentile, defaultObserveDuration)
_, noisy := a.GetLevel(clock)
_, noisy := a.GetLevel(clock.UnixNano())
require.False(t, noisy)
observeSamples(a, 28, 5, clock)
clock = clock.Add(5 * 20 * time.Millisecond)
_, noisy = a.GetLevel(clock)
_, noisy = a.GetLevel(clock.UnixNano())
require.False(t, noisy)
})
@@ -51,7 +51,7 @@ func TestAudioLevel(t *testing.T) {
observeSamples(a, 35, 100, clock)
clock = clock.Add(100 * 20 * time.Millisecond)
_, noisy := a.GetLevel(clock)
_, noisy := a.GetLevel(clock.UnixNano())
require.False(t, noisy)
})
@@ -66,7 +66,7 @@ func TestAudioLevel(t *testing.T) {
observeSamples(a, 35, 1, clock)
clock = clock.Add(20 * time.Millisecond)
_, noisy := a.GetLevel(clock)
_, noisy := a.GetLevel(clock.UnixNano())
require.False(t, noisy)
})
@@ -81,7 +81,7 @@ func TestAudioLevel(t *testing.T) {
observeSamples(a, 29, 8, clock)
clock = clock.Add(8 * 20 * time.Millisecond)
level, noisy := a.GetLevel(clock)
level, noisy := a.GetLevel(clock.UnixNano())
require.True(t, noisy)
require.Greater(t, level, ConvertAudioLevel(float64(defaultActiveLevel)))
require.Less(t, level, ConvertAudioLevel(float64(25)))
@@ -93,14 +93,14 @@ func TestAudioLevel(t *testing.T) {
observeSamples(a, 25, 100, clock)
clock = clock.Add(100 * 20 * time.Millisecond)
level, noisy := a.GetLevel(clock)
level, noisy := a.GetLevel(clock.UnixNano())
require.True(t, noisy)
require.Greater(t, level, ConvertAudioLevel(float64(defaultActiveLevel)))
require.Less(t, level, ConvertAudioLevel(float64(20)))
// let enough time pass to make the samples stale
clock = clock.Add(1500 * time.Millisecond)
level, noisy = a.GetLevel(clock)
level, noisy = a.GetLevel(clock.UnixNano())
require.Equal(t, float64(0.0), level)
require.False(t, noisy)
})
@@ -116,6 +116,6 @@ func createAudioLevel(activeLevel uint8, minPercentile uint8, observeDuration ui
func observeSamples(a *AudioLevel, level uint8, count int, baseTime time.Time) {
for i := 0; i < count; i++ {
a.Observe(level, 20, baseTime.Add(+time.Duration(i*20)*time.Millisecond))
a.Observe(level, 20, baseTime.Add(+time.Duration(i*20)*time.Millisecond).UnixNano())
}
}

View File

@@ -45,20 +45,20 @@ import (
)
const (
ReportDelta = time.Second
ReportDelta = 1e9
InitPacketBufferSizeVideo = 300
InitPacketBufferSizeAudio = 70
)
type pendingPacket struct {
arrivalTime time.Time
arrivalTime int64
packet []byte
}
type ExtPacket struct {
VideoLayer
Arrival time.Time
Arrival int64
ExtSequenceNumber uint64
ExtTimestamp uint64
Packet *rtp.Packet
@@ -84,7 +84,7 @@ type Buffer struct {
closeOnce sync.Once
mediaSSRC uint32
clockRate uint32
lastReport time.Time
lastReport int64
twccExtID uint8
audioLevelExtID uint8
bound bool
@@ -212,7 +212,7 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili
b.ppsSnapshotId = b.rtpStats.NewSnapshotId()
b.clockRate = codec.ClockRate
b.lastReport = time.Now()
b.lastReport = time.Now().UnixNano()
b.mime = strings.ToLower(codec.MimeType)
for _, codecParameter := range params.Codecs {
if strings.EqualFold(codecParameter.MimeType, codec.MimeType) {
@@ -340,10 +340,10 @@ func (b *Buffer) Write(pkt []byte) (n int, err error) {
}
}
now := time.Now()
now := time.Now().UnixNano()
if b.twcc != nil && b.twccExtID != 0 && !b.closed.Load() {
if ext := rtpPacket.GetExtension(b.twccExtID); ext != nil {
b.twcc.Push(rtpPacket.SSRC, binary.BigEndian.Uint16(ext[0:2]), now.UnixNano(), rtpPacket.Marker)
b.twcc.Push(rtpPacket.SSRC, binary.BigEndian.Uint16(ext[0:2]), now, rtpPacket.Marker)
}
}
@@ -373,7 +373,7 @@ func (b *Buffer) Write(pkt []byte) (n int, err error) {
}
b.payloadType = rtpPacket.PayloadType
b.calc(pkt, &rtpPacket, time.Now(), false)
b.calc(pkt, &rtpPacket, now, false)
b.Unlock()
b.readCond.Broadcast()
return
@@ -398,7 +398,7 @@ func (b *Buffer) SetPrimaryBufferForRTX(primaryBuffer *Buffer) {
}
}
func (b *Buffer) writeRTX(rtxPkt *rtp.Packet, arrivalTime time.Time) (n int, err error) {
func (b *Buffer) writeRTX(rtxPkt *rtp.Packet, arrivalTime int64) (n int, err error) {
b.Lock()
defer b.Unlock()
if !b.bound {
@@ -541,7 +541,7 @@ func (b *Buffer) SetRTT(rtt uint32) {
}
}
func (b *Buffer) calc(rawPkt []byte, rtpPacket *rtp.Packet, arrivalTime time.Time, isRTX bool) {
func (b *Buffer) calc(rawPkt []byte, rtpPacket *rtp.Packet, arrivalTime int64, isRTX bool) {
defer func() {
b.doNACKs()
@@ -740,7 +740,7 @@ func (b *Buffer) doFpsCalc(ep *ExtPacket) {
}
}
func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) RTPFlowState {
func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime int64) RTPFlowState {
flowState := b.rtpStats.Update(
arrivalTime,
p.Header.SequenceNumber,
@@ -764,7 +764,7 @@ func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) RTPFlow
return flowState
}
func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime time.Time, isRTX bool) {
func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime int64, isRTX bool) {
if b.audioLevelExtID != 0 && !isRTX {
if !b.latestTSForAudioLevelInitialized {
b.latestTSForAudioLevelInitialized = true
@@ -786,7 +786,7 @@ func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime time.Time, i
}
}
func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime time.Time, flowState RTPFlowState) *ExtPacket {
func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime int64, flowState RTPFlowState) *ExtPacket {
ep := &ExtPacket{
Arrival: arrivalTime,
ExtSequenceNumber: flowState.ExtSequenceNumber,
@@ -887,8 +887,8 @@ func (b *Buffer) doNACKs() {
}
}
func (b *Buffer) doReports(arrivalTime time.Time) {
if time.Since(b.lastReport) < ReportDelta {
func (b *Buffer) doReports(arrivalTime int64) {
if arrivalTime-b.lastReport < ReportDelta {
return
}
@@ -1090,7 +1090,7 @@ func (b *Buffer) GetAudioLevel() (float64, bool) {
return 0, false
}
return b.audioLevel.GetLevel(time.Now())
return b.audioLevel.GetLevel(time.Now().UnixNano())
}
func (b *Buffer) OnFpsChanged(f func()) {

View File

@@ -35,7 +35,7 @@ const (
cFirstSnapshotID = 1
cFirstPacketTimeAdjustWindow = 2 * time.Minute
cFirstPacketTimeAdjustThreshold = 15 * time.Second
cFirstPacketTimeAdjustThreshold = 15 * 1e9
cPassthroughNTPTimestamp = true
@@ -175,9 +175,9 @@ type rtpStatsBase struct {
startTime time.Time
endTime time.Time
firstTime time.Time
firstTime int64
firstTimeAdjustment time.Duration
highestTime time.Time
highestTime int64
lastTransit uint64
lastJitterExtTimestamp uint64
@@ -506,7 +506,7 @@ func (r *rtpStatsBase) GetRtt() uint32 {
return r.rtt
}
func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *RTCPSenderReportData, tsOffset uint64, extStartTS uint64) {
func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *RTCPSenderReportData, tsOffset uint64, extStartTS uint64) (err error, loggingFields []interface{}) {
if time.Since(r.startTime) > cFirstPacketTimeAdjustWindow {
return
}
@@ -526,17 +526,17 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *RTCPSenderReportData,
}
samplesDuration := time.Duration(float64(samplesDiff) / float64(r.params.ClockRate) * float64(time.Second))
timeSinceFirst := time.Since(r.firstTime)
now := r.firstTime.Add(timeSinceFirst)
firstTime := now.Add(-samplesDuration)
timeSinceFirst := time.Since(time.Unix(0, r.firstTime))
now := r.firstTime + timeSinceFirst.Nanoseconds()
firstTime := now - samplesDuration.Nanoseconds()
getFields := func() []interface{} {
return []interface{}{
"startTime", r.startTime.String(),
"nowTime", now.String(),
"before", r.firstTime.String(),
"after", firstTime.String(),
"adjustment", r.firstTime.Sub(firstTime).String(),
"nowTime", time.Unix(0, now).String(),
"before", time.Unix(0, r.firstTime).String(),
"after", time.Unix(0, firstTime).String(),
"adjustment", time.Duration(r.firstTime - firstTime).String(),
"extNowTS", extNowTS,
"extStartTS", extStartTS,
"srData", srData,
@@ -548,15 +548,17 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *RTCPSenderReportData,
}
}
if firstTime.Before(r.firstTime) {
if r.firstTime.Sub(firstTime) > cFirstPacketTimeAdjustThreshold {
r.logger.Infow("adjusting first packet time, too big, ignoring", getFields()...)
if firstTime < r.firstTime {
if r.firstTime-firstTime > cFirstPacketTimeAdjustThreshold {
err = errors.New("adjusting first packet time, too big, ignoring")
loggingFields = getFields()
} else {
r.logger.Debugw("adjusting first packet time", getFields()...)
r.firstTimeAdjustment += r.firstTime.Sub(firstTime)
r.firstTimeAdjustment += time.Duration(r.firstTime - firstTime)
r.firstTime = firstTime
}
}
return
}
func (r *rtpStatsBase) getTotalPacketsPrimary(extStartSN, extHighestSN uint64) uint64 {
@@ -659,9 +661,9 @@ func (r *rtpStatsBase) MarshalLogObject(e zapcore.ObjectEncoder) error {
e.AddTime("startTime", r.startTime)
e.AddTime("endTime", r.endTime)
e.AddTime("firstTime", r.firstTime)
e.AddTime("firstTime", time.Unix(0, r.firstTime))
e.AddDuration("firstTimeAdjustment", r.firstTimeAdjustment)
e.AddTime("highestTime", r.highestTime)
e.AddTime("highestTime", time.Unix(0, r.highestTime))
e.AddUint64("bytes", r.bytes)
e.AddUint64("headerBytes", r.headerBytes)
@@ -907,7 +909,7 @@ func (r *rtpStatsBase) toProto(
return p
}
func (r *rtpStatsBase) updateJitter(ets uint64, packetTime time.Time) float64 {
func (r *rtpStatsBase) updateJitter(ets uint64, packetTime int64) float64 {
// Do not update jitter on multiple packets of same frame.
// All packets of a frame have the same time stamp.
// NOTE: This does not protect against using more than one packet of the same frame
@@ -916,8 +918,8 @@ func (r *rtpStatsBase) updateJitter(ets uint64, packetTime time.Time) float64 {
// In this case, p2f1 (packet 2, frame 1) will still be used in jitter calculation
// although it is the second packet of a frame because of out-of-order receival.
if r.lastJitterExtTimestamp != ets {
timeSinceFirst := packetTime.Sub(r.firstTime)
packetTimeRTP := uint64(timeSinceFirst.Nanoseconds() * int64(r.params.ClockRate) / 1e9)
timeSinceFirst := packetTime - r.firstTime
packetTimeRTP := uint64(timeSinceFirst * int64(r.params.ClockRate) / 1e9)
transit := packetTimeRTP - ets
if r.lastTransit != 0 {
@@ -963,21 +965,22 @@ func (r *rtpStatsBase) getAndResetSnapshot(snapshotID uint32, extStartSN uint64,
}
func (r *rtpStatsBase) getDrift(extStartTS, extHighestTS uint64) (packetDrift *livekit.RTPDrift, ntpReportDrift *livekit.RTPDrift, rebasedReportDrift *livekit.RTPDrift) {
if !r.firstTime.IsZero() {
elapsed := r.highestTime.Sub(r.firstTime)
if r.firstTime != 0 {
elapsed := r.highestTime - r.firstTime
rtpClockTicks := extHighestTS - extStartTS
driftSamples := int64(rtpClockTicks - uint64(elapsed.Nanoseconds()*int64(r.params.ClockRate)/1e9))
if elapsed.Seconds() > 0.0 {
driftSamples := int64(rtpClockTicks - uint64(elapsed*int64(r.params.ClockRate)/1e9))
if elapsed > 0 {
elapsedSeconds := time.Duration(elapsed).Seconds()
packetDrift = &livekit.RTPDrift{
StartTime: timestamppb.New(r.firstTime),
EndTime: timestamppb.New(r.highestTime),
Duration: elapsed.Seconds(),
StartTime: timestamppb.New(time.Unix(0, r.firstTime)),
EndTime: timestamppb.New(time.Unix(0, r.highestTime)),
Duration: elapsedSeconds,
StartTimestamp: extStartTS,
EndTimestamp: extHighestTS,
RtpClockTicks: rtpClockTicks,
DriftSamples: driftSamples,
DriftMs: (float64(driftSamples) * 1000) / float64(r.params.ClockRate),
ClockRate: float64(rtpClockTicks) / elapsed.Seconds(),
ClockRate: float64(rtpClockTicks) / elapsedSeconds,
}
}
}

View File

@@ -101,7 +101,8 @@ type RTPStatsReceiver struct {
sequenceNumber *utils.WrapAround[uint16, uint64]
timestamp *utils.WrapAround[uint32, uint64]
tsRolloverThreshold int64
timestamp *utils.WrapAround[uint32, uint64]
history *protoutils.Bitmap[uint64]
@@ -115,14 +116,16 @@ type RTPStatsReceiver struct {
outOfOrderSenderReportCount int
largeJumpCount int
largeJumpNegativeCount int
timeReversedCount int
}
func NewRTPStatsReceiver(params RTPStatsParams) *RTPStatsReceiver {
return &RTPStatsReceiver{
rtpStatsBase: newRTPStatsBase(params),
sequenceNumber: utils.NewWrapAround[uint16, uint64](utils.WrapAroundParams{IsRestartAllowed: false}),
timestamp: utils.NewWrapAround[uint32, uint64](utils.WrapAroundParams{IsRestartAllowed: false}),
history: protoutils.NewBitmap[uint64](cHistorySize),
rtpStatsBase: newRTPStatsBase(params),
sequenceNumber: utils.NewWrapAround[uint16, uint64](utils.WrapAroundParams{IsRestartAllowed: false}),
tsRolloverThreshold: (1 << 31) * 1e9 / int64(params.ClockRate),
timestamp: utils.NewWrapAround[uint32, uint64](utils.WrapAroundParams{IsRestartAllowed: false}),
history: protoutils.NewBitmap[uint64](cHistorySize),
}
}
@@ -133,8 +136,18 @@ func (r *RTPStatsReceiver) NewSnapshotId() uint32 {
return r.newSnapshotID(r.sequenceNumber.GetExtendedHighest())
}
func (r *RTPStatsReceiver) getTSRolloverCount(diffNano int64) int {
if diffNano < r.tsRolloverThreshold {
// time not more than rollover threshold
return 0
}
excess := int((diffNano - r.tsRolloverThreshold) * int64(r.params.ClockRate) / 1e9)
return excess/(1<<32) + 1
}
func (r *RTPStatsReceiver) Update(
packetTime time.Time,
packetTime int64,
sequenceNumber uint16,
timestamp uint32,
marker bool,
@@ -151,6 +164,7 @@ func (r *RTPStatsReceiver) Update(
}
var resSN utils.WrapAroundUpdateResult[uint64]
var gapSN int64
var resTS utils.WrapAroundUpdateResult[uint64]
if !r.initialized {
if payloadSize == 0 {
@@ -184,20 +198,23 @@ func (r *RTPStatsReceiver) Update(
flowState.IsNotHandled = true
return
}
resTS = r.timestamp.Update(timestamp)
gapSN = int64(resSN.ExtendedVal - resSN.PreExtendedHighest)
if gapSN <= 0 {
resTS = r.timestamp.Update(timestamp)
} else {
resTS = r.timestamp.Rollover(timestamp, r.getTSRolloverCount(packetTime-r.highestTime))
}
}
pktSize := uint64(hdrSize + payloadSize + paddingSize)
gapSN := int64(resSN.ExtendedVal - resSN.PreExtendedHighest)
getLoggingFields := func() []interface{} {
return []interface{}{
"prevSN", resSN.PreExtendedHighest,
"currSN", resSN.ExtendedVal,
"resSN", resSN,
"gapSN", gapSN,
"prevTS", resTS.PreExtendedHighest,
"currTS", resTS.ExtendedVal,
"resTS", resTS,
"gapTS", int64(resTS.ExtendedVal - resTS.PreExtendedHighest),
"packetTime", packetTime.String(),
"packetTime", time.Unix(0, packetTime).String(),
"sequenceNumber", sequenceNumber,
"timestamp", timestamp,
"marker", marker,
@@ -238,7 +255,7 @@ func (r *RTPStatsReceiver) Update(
}
}
} else { // in-order
if gapSN >= cSequenceNumberLargeJumpThreshold || resTS.ExtendedVal < resTS.PreExtendedHighest {
if gapSN >= cSequenceNumberLargeJumpThreshold {
r.largeJumpCount++
if (r.largeJumpCount-1)%100 == 0 {
r.logger.Warnw(
@@ -248,6 +265,16 @@ func (r *RTPStatsReceiver) Update(
}
}
if resTS.ExtendedVal < resTS.PreExtendedHighest {
r.timeReversedCount++
if (r.timeReversedCount-1)%100 == 0 {
r.logger.Warnw(
"time reversed", nil,
append(getLoggingFields(), "count", r.timeReversedCount)...,
)
}
}
// update gap histogram
r.updateGapHistogram(int(gapSN))
@@ -478,7 +505,9 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData)
srDataCopy.AtAdjusted = ntpTime.Add(r.propagationDelay)
r.srNewest = &srDataCopy
r.maybeAdjustFirstPacketTime(r.srNewest, 0, r.timestamp.GetExtendedStart())
if err, loggingFields := r.maybeAdjustFirstPacketTime(r.srNewest, 0, r.timestamp.GetExtendedStart()); err != nil {
r.logger.Infow(err.Error(), append(loggingFields, "rtpStats", lockedRTPStatsReceiverLogEncoder{r})...)
}
return true
}

View File

@@ -61,7 +61,7 @@ func Test_RTPStatsReceiver(t *testing.T) {
for i := 0; i < packetsPerFrame; i++ {
packet := getPacket(sequenceNumber, timestamp, packetSize)
r.Update(
time.Now(),
time.Now().UnixNano(),
packet.Header.SequenceNumber,
packet.Header.Timestamp,
packet.Header.Marker,
@@ -97,7 +97,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
timestamp := uint32(rand.Float64() * float64(1<<32))
packet := getPacket(sequenceNumber, timestamp, 1000)
flowState := r.Update(
time.Now(),
time.Now().UnixNano(),
packet.Header.SequenceNumber,
packet.Header.Timestamp,
packet.Header.Marker,
@@ -117,7 +117,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
timestamp += 3000
packet = getPacket(sequenceNumber, timestamp, 1000)
flowState = r.Update(
time.Now(),
time.Now().UnixNano(),
packet.Header.SequenceNumber,
packet.Header.Timestamp,
packet.Header.Marker,
@@ -134,7 +134,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
// out-of-order, would cause a restart which is disallowed
packet = getPacket(sequenceNumber-10, timestamp-30000, 1000)
flowState = r.Update(
time.Now(),
time.Now().UnixNano(),
packet.Header.SequenceNumber,
packet.Header.Timestamp,
packet.Header.Marker,
@@ -154,7 +154,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
// duplicate of the above out-of-order packet, but would not be handled as it causes a restart
packet = getPacket(sequenceNumber-10, timestamp-30000, 1000)
flowState = r.Update(
time.Now(),
time.Now().UnixNano(),
packet.Header.SequenceNumber,
packet.Header.Timestamp,
packet.Header.Marker,
@@ -176,7 +176,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
timestamp += 30000
packet = getPacket(sequenceNumber, timestamp, 1000)
flowState = r.Update(
time.Now(),
time.Now().UnixNano(),
packet.Header.SequenceNumber,
packet.Header.Timestamp,
packet.Header.Marker,
@@ -192,7 +192,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
// out-of-order should decrement number of lost packets
packet = getPacket(sequenceNumber-6, timestamp-45000, 1000)
flowState = r.Update(
time.Now(),
time.Now().UnixNano(),
packet.Header.SequenceNumber,
packet.Header.Timestamp,
packet.Header.Marker,
@@ -215,7 +215,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
timestamp += 6000
packet = getPacket(sequenceNumber, timestamp, 1000)
flowState = r.Update(
time.Now(),
time.Now().UnixNano(),
packet.Header.SequenceNumber,
packet.Header.Timestamp,
packet.Header.Marker,
@@ -234,7 +234,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
timestamp -= 3000
packet = getPacket(sequenceNumber, timestamp, 999)
flowState = r.Update(
time.Now(),
time.Now().UnixNano(),
packet.Header.SequenceNumber,
packet.Header.Timestamp,
packet.Header.Marker,
@@ -251,7 +251,7 @@ func Test_RTPStatsReceiver_Update(t *testing.T) {
sequenceNumber += 2
packet = getPacket(sequenceNumber, timestamp, 0)
flowState = r.Update(
time.Now(),
time.Now().UnixNano(),
packet.Header.SequenceNumber,
packet.Header.Timestamp,
packet.Header.Marker,

View File

@@ -166,6 +166,7 @@ type RTPStatsSender struct {
metadataCacheOverflowCount int
largeJumpNegativeCount int
largeJumpCount int
timeReversedCount int
}
func NewRTPStatsSender(params RTPStatsParams) *RTPStatsSender {
@@ -233,7 +234,7 @@ func (r *RTPStatsSender) NewSenderSnapshotId() uint32 {
}
func (r *RTPStatsSender) Update(
packetTime time.Time,
packetTime int64,
extSequenceNumber uint64,
extTimestamp uint64,
marker bool,
@@ -358,16 +359,26 @@ func (r *RTPStatsSender) Update(
}
}
} else { // in-order
if gapSN >= cSequenceNumberLargeJumpThreshold || extTimestamp < r.extHighestTS {
if gapSN >= cSequenceNumberLargeJumpThreshold {
r.largeJumpCount++
if (r.largeJumpCount-1)%100 == 0 {
r.logger.Warnw(
"large sequence number gap OR time reversed", nil,
"large sequence number gap", nil,
append(getLoggingFields(), "count", r.largeJumpCount)...,
)
}
}
if extTimestamp < r.extHighestTS {
r.timeReversedCount++
if (r.timeReversedCount-1)%100 == 0 {
r.logger.Warnw(
"time reversed", nil,
append(getLoggingFields(), "count", r.timeReversedCount)...,
)
}
}
// update gap histogram
r.updateGapHistogram(int(gapSN))
@@ -575,7 +586,9 @@ func (r *RTPStatsSender) MaybeAdjustFirstPacketTime(publisherSRData *RTCPSenderR
return
}
r.maybeAdjustFirstPacketTime(publisherSRData, tsOffset, r.extStartTS)
if err, loggingFields := r.maybeAdjustFirstPacketTime(publisherSRData, tsOffset, r.extStartTS); err != nil {
r.logger.Infow(err.Error(), append(loggingFields, "rtpStats", lockedRTPStatsSenderLogEncoder{r})...)
}
}
func (r *RTPStatsSender) GetExpectedRTPTimestamp(at time.Time) (expectedTSExt uint64, err error) {
@@ -587,7 +600,7 @@ func (r *RTPStatsSender) GetExpectedRTPTimestamp(at time.Time) (expectedTSExt ui
return
}
timeDiff := at.Sub(r.firstTime)
timeDiff := at.Sub(time.Unix(0, r.firstTime))
expectedRTPDiff := timeDiff.Nanoseconds() * int64(r.params.ClockRate) / 1e9
expectedTSExt = r.extStartTS + uint64(expectedRTPDiff)
return
@@ -630,8 +643,8 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *RTCPS
"tsOffset", tsOffset,
"timeNow", time.Now().String(),
"now", now.String(),
"timeSinceHighest", now.Sub(r.highestTime).String(),
"timeSinceFirst", now.Sub(r.firstTime).String(),
"timeSinceHighest", now.Sub(time.Unix(0, r.highestTime)).String(),
"timeSinceFirst", now.Sub(time.Unix(0, r.firstTime)).String(),
"timeSincePublisherSRAdjusted", timeSincePublisherSRAdjusted.String(),
"timeSincePublisherSR", time.Since(publisherSRData.At).String(),
"nowRTPExt", nowRTPExt,

View File

@@ -905,7 +905,7 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa
&hdr,
len(payload),
&sendPacketMetadata{
packetTime: time.Now(),
packetTime: time.Now().UnixNano(),
extSequenceNumber: snts[i].extSequenceNumber,
extTimestamp: snts[i].extTimestamp,
isPadding: true,
@@ -1438,7 +1438,7 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan
}
d.sendingPacket(&hdr, len(payload), &sendPacketMetadata{
packetTime: time.Now(),
packetTime: time.Now().UnixNano(),
extSequenceNumber: snts[i].extSequenceNumber,
extTimestamp: snts[i].extTimestamp,
})
@@ -1801,7 +1801,7 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) {
len(payload),
&sendPacketMetadata{
layer: int32(epm.layer),
packetTime: time.Now(),
packetTime: time.Now().UnixNano(),
extSequenceNumber: epm.extSequenceNumber,
extTimestamp: epm.extTimestamp,
isRTX: true,
@@ -2011,7 +2011,7 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() {
&hdr,
len(payload),
&sendPacketMetadata{
packetTime: time.Now(),
packetTime: time.Now().UnixNano(),
extSequenceNumber: snts[i].extSequenceNumber,
extTimestamp: snts[i].extTimestamp,
// although this is using empty frames, mark as padding as these are used to trigger Pion OnTrack only
@@ -2053,7 +2053,7 @@ func (d *DownTrack) handleRTCPSenderReportData(publisherSRData *buffer.RTCPSende
type sendPacketMetadata struct {
layer int32
packetTime time.Time
packetTime int64
extSequenceNumber uint64
extTimestamp uint64
isKeyFrame bool

View File

@@ -12,10 +12,10 @@ import (
)
type ForwardStats struct {
lock sync.Mutex
lastLeftMs atomic.Int64
latency *utils.LatencyAggregate
closeCh chan struct{}
lock sync.Mutex
lastLeftNano atomic.Int64
latency *utils.LatencyAggregate
closeCh chan struct{}
}
func NewForwardStats(latencyUpdateInterval, reportInterval, latencyWindowLength time.Duration) *ForwardStats {
@@ -28,22 +28,21 @@ func NewForwardStats(latencyUpdateInterval, reportInterval, latencyWindowLength
return s
}
func (s *ForwardStats) Update(arrival, left time.Time) {
transit := left.Sub(arrival)
func (s *ForwardStats) Update(arrival, left int64) {
transit := left - arrival
// ignore if transit is too large or negative, this could happen if system time is adjusted
if transit < 0 || transit > 5*time.Second {
if transit < 0 || time.Duration(transit) > 5*time.Second {
return
}
leftMs := left.UnixMilli()
lastMs := s.lastLeftMs.Load()
if leftMs < lastMs || !s.lastLeftMs.CompareAndSwap(lastMs, leftMs) {
lastLeftNano := s.lastLeftNano.Load()
if left < lastLeftNano || !s.lastLeftNano.CompareAndSwap(lastLeftNano, left) {
return
}
s.lock.Lock()
defer s.lock.Unlock()
s.latency.Update(time.Duration(arrival.UnixNano()), float64(transit))
s.latency.Update(time.Duration(arrival), float64(transit))
}
func (s *ForwardStats) GetStats() (latency, jitter time.Duration) {

View File

@@ -756,7 +756,7 @@ func (w *WebRTCReceiver) forwardRTP(layer int32) {
}
if writeCount > 0 && w.forwardStats != nil {
w.forwardStats.Update(pkt.Arrival, time.Now())
w.forwardStats.Update(pkt.Arrival, time.Now().UnixNano())
}
if spatialTracker != nil {

View File

@@ -73,7 +73,7 @@ func (r *RedPrimaryReceiver) ForwardRTP(pkt *buffer.ExtPacket, spatialLayer int3
for i, sendPkt := range pkts {
pPkt := *pkt
if i != len(pkts)-1 {
// patch extended sequence number and time stmap for all but the last packet,
// patch extended sequence number and time stamp for all but the last packet,
// last packet is the primary payload
pPkt.ExtSequenceNumber -= uint64(pkts[len(pkts)-1].SequenceNumber - pkts[i].SequenceNumber)
pPkt.ExtTimestamp -= uint64(pkts[len(pkts)-1].Timestamp - pkts[i].Timestamp)

View File

@@ -104,7 +104,7 @@ type sequencer struct {
func newSequencer(size int, maybeSparse bool, logger logger.Logger) *sequencer {
s := &sequencer{
size: size,
startTime: time.Now().UnixMilli(),
startTime: time.Now().UnixNano(),
meta: make([]packetMeta, size),
rtt: defaultRtt,
logger: logger,
@@ -128,7 +128,7 @@ func (s *sequencer) setRTT(rtt uint32) {
}
func (s *sequencer) push(
packetTime time.Time,
packetTime int64,
extIncomingSN, extModifiedSN uint64,
extModifiedTS uint64,
marker bool,
@@ -296,7 +296,7 @@ func (s *sequencer) getExtPacketMetas(seqNo []uint16) []extPacketMeta {
snOffset := uint64(0)
var err error
extPacketMetas := make([]extPacketMeta, 0, len(seqNo))
refTime := s.getRefTime(time.Now())
refTime := s.getRefTime(time.Now().UnixNano())
highestSN := uint16(s.extHighestSN)
highestTS := uint32(s.extHighestTS)
for _, sn := range seqNo {
@@ -357,8 +357,8 @@ func (s *sequencer) getExtPacketMetas(seqNo []uint16) []extPacketMeta {
return extPacketMetas
}
func (s *sequencer) getRefTime(at time.Time) uint32 {
return uint32(at.UnixMilli() - s.startTime)
func (s *sequencer) getRefTime(at int64) uint32 {
return uint32((at - s.startTime) / 1e6)
}
func (s *sequencer) updateSNOffset() {

View File

@@ -29,11 +29,11 @@ func Test_sequencer(t *testing.T) {
off := uint16(15)
for i := uint64(1); i < 518; i++ {
seq.push(time.Now(), i, i+uint64(off), 123, true, 2, nil, 0, nil, nil)
seq.push(time.Now().UnixNano(), i, i+uint64(off), 123, true, 2, nil, 0, nil, nil)
}
// send the last two out-of-order
seq.push(time.Now(), 519, 519+uint64(off), 123, false, 2, nil, 0, nil, nil)
seq.push(time.Now(), 518, 518+uint64(off), 123, true, 2, nil, 0, nil, nil)
seq.push(time.Now().UnixNano(), 519, 519+uint64(off), 123, false, 2, nil, 0, nil, nil)
seq.push(time.Now().UnixNano(), 518, 518+uint64(off), 123, true, 2, nil, 0, nil, nil)
req := []uint16{57, 58, 62, 63, 513, 514, 515, 516, 517}
res := seq.getExtPacketMetas(req)
@@ -63,14 +63,14 @@ func Test_sequencer(t *testing.T) {
require.Equal(t, val.extTimestamp, uint64(123))
}
seq.push(time.Now(), 521, 521+uint64(off), 123, true, 1, nil, 0, nil, nil)
seq.push(time.Now().UnixNano(), 521, 521+uint64(off), 123, true, 1, nil, 0, nil, nil)
m := seq.getExtPacketMetas([]uint16{521 + off})
require.Equal(t, 0, len(m))
time.Sleep((ignoreRetransmission + 10) * time.Millisecond)
m = seq.getExtPacketMetas([]uint16{521 + off})
require.Equal(t, 1, len(m))
seq.push(time.Now(), 505, 505+uint64(off), 123, false, 1, nil, 0, nil, nil)
seq.push(time.Now().UnixNano(), 505, 505+uint64(off), 123, false, 1, nil, 0, nil, nil)
m = seq.getExtPacketMetas([]uint16{505 + off})
require.Equal(t, 0, len(m))
time.Sleep((ignoreRetransmission + 10) * time.Millisecond)
@@ -157,7 +157,7 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) {
} else {
if i.seqNo%5 == 0 {
n.push(
time.Now(),
time.Now().UnixNano(),
i.seqNo,
i.seqNo+tt.fields.offset,
123,
@@ -171,7 +171,7 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) {
} else {
if i.seqNo%2 == 0 {
n.push(
time.Now(),
time.Now().UnixNano(),
i.seqNo,
i.seqNo+tt.fields.offset,
123,
@@ -184,7 +184,7 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) {
)
} else {
n.push(
time.Now(),
time.Now().UnixNano(),
i.seqNo,
i.seqNo+tt.fields.offset,
123,
@@ -311,7 +311,7 @@ func Test_sequencer_getNACKSeqNo_no_exclusion(t *testing.T) {
} else {
if i.seqNo%2 == 0 {
n.push(
time.Now(),
time.Now().UnixNano(),
i.seqNo,
i.seqNo+tt.fields.offset,
123,
@@ -324,7 +324,7 @@ func Test_sequencer_getNACKSeqNo_no_exclusion(t *testing.T) {
)
} else {
n.push(
time.Now(),
time.Now().UnixNano(),
i.seqNo,
i.seqNo+tt.fields.offset,
123,

View File

@@ -66,7 +66,7 @@ func GetTestExtPacket(params *TestExtPacketParams) (*buffer.ExtPacket, error) {
VideoLayer: params.VideoLayer,
ExtSequenceNumber: uint64(params.SNCycles<<16) + uint64(params.SequenceNumber),
ExtTimestamp: uint64(params.TSCycles<<32) + uint64(params.Timestamp),
Arrival: params.ArrivalTime,
Arrival: params.ArrivalTime.UnixNano(),
Packet: &packet,
KeyFrame: params.IsKeyFrame,
RawPacket: raw,

View File

@@ -16,6 +16,8 @@ package utils
import (
"unsafe"
"go.uber.org/zap/zapcore"
)
type number interface {
@@ -65,6 +67,19 @@ type WrapAroundUpdateResult[ET extendedNumber] struct {
ExtendedVal ET
}
func (w *WrapAroundUpdateResult[ET]) MarshalLogObject(e zapcore.ObjectEncoder) error {
if w == nil {
return nil
}
e.AddBool("IsUnhandled", w.IsUnhandled)
e.AddBool("IsRestart", w.IsRestart)
e.AddUint64("PreExtendedStart", uint64(w.PreExtendedStart))
e.AddUint64("PreExtendedHighest", uint64(w.PreExtendedHighest))
e.AddUint64("ExtendedVal", uint64(w.ExtendedVal))
return nil
}
func (w *WrapAround[T, ET]) Update(val T) (result WrapAroundUpdateResult[ET]) {
if !w.initialized {
result.PreExtendedHighest = ET(val) - 1
@@ -96,6 +111,21 @@ func (w *WrapAround[T, ET]) Update(val T) (result WrapAroundUpdateResult[ET]) {
return
}
func (w *WrapAround[T, ET]) Rollover(val T, numCycles int) (result WrapAroundUpdateResult[ET]) {
if !w.initialized || numCycles == 0 {
return w.Update(val)
}
result.PreExtendedHighest = w.extendedHighest
w.cycles += ET(numCycles) * w.fullRange
w.highest = val
w.updateExtendedHighest()
result.ExtendedVal = w.extendedHighest
return
}
func (w *WrapAround[T, ET]) RollbackRestart(ev ET) {
if w.isWrapBack(w.start, T(ev)) {
w.cycles -= w.fullRange

View File

@@ -448,6 +448,91 @@ func TestWrapAroundUint16WrapAroundRestartDuplicate(t *testing.T) {
require.Equal(t, uint64(65568), w.GetExtendedHighest())
}
func TestWrapAroundUint16Rollover(t *testing.T) {
w := NewWrapAround[uint16, uint32](WrapAroundParams{IsRestartAllowed: false})
testCases := []struct {
name string
input uint16
numCycles int
updated WrapAroundUpdateResult[uint32]
start uint16
extendedStart uint32
highest uint16
extendedHighest uint32
}{
// initialize - should initialize irrespective of numCycles
{
name: "initialize",
input: 10,
numCycles: 10,
updated: WrapAroundUpdateResult[uint32]{
IsRestart: false,
PreExtendedStart: 0,
PreExtendedHighest: 9,
ExtendedVal: 10,
},
start: 10,
extendedStart: 10,
highest: 10,
extendedHighest: 10,
},
// zero cycles - should just do an update
{
name: "zero",
input: 8,
updated: WrapAroundUpdateResult[uint32]{
IsUnhandled: true,
// the following fields are not valid when `IsUnhandled = true`, but code fills it in
// and they are filled in here for testing purposes
PreExtendedHighest: 10,
ExtendedVal: 8,
},
start: 10,
extendedStart: 10,
highest: 10,
extendedHighest: 10,
},
// one cycle
{
name: "one cycle",
input: (1 << 16) - 6,
numCycles: 1,
updated: WrapAroundUpdateResult[uint32]{
PreExtendedHighest: 10,
ExtendedVal: (1 << 16) - 6 + (1 << 16),
},
start: 10,
extendedStart: 10,
highest: (1 << 16) - 6,
extendedHighest: (1 << 16) - 6 + (1 << 16),
},
// two cycles
{
name: "two cycles",
input: (1 << 16) - 7,
numCycles: 2,
updated: WrapAroundUpdateResult[uint32]{
PreExtendedHighest: (1 << 16) - 6 + (1 << 16),
ExtendedVal: (1 << 16) - 7 + 3*(1<<16),
},
start: 10,
extendedStart: 10,
highest: (1 << 16) - 7,
extendedHighest: (1 << 16) - 7 + 3*(1<<16),
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.updated, w.Rollover(tc.input, tc.numCycles))
require.Equal(t, tc.start, w.GetStart())
require.Equal(t, tc.extendedStart, w.GetExtendedStart())
require.Equal(t, tc.highest, w.GetHighest())
require.Equal(t, tc.extendedHighest, w.GetExtendedHighest())
})
}
}
func TestWrapAroundUint32(t *testing.T) {
w := NewWrapAround[uint32, uint64](WrapAroundParams{IsRestartAllowed: true})
testCases := []struct {