(Attempted) Simplify time stamp calculation on switches. (#2688)

* Simplify time stamp calculation on switches.

Trying to simplify time stamp calculation on restarts.
The additional checks take effect rarely and it not worth the extra
complication.

Also, doing the reference time stamp in extended range.
The challenge with that is when publisher migrates the extended
timestamp could change post migration (i. e. post migration would not
know about rollovers). To address that, maintain an offset that is
updated on resync.

* WIP

* Revert to resume threshold

* typo

* clean up
This commit is contained in:
Raja Subramanian
2024-04-28 12:13:52 +05:30
committed by GitHub
parent 18b3b7b421
commit c8b289daa5
4 changed files with 41 additions and 28 deletions

View File

@@ -529,6 +529,11 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *RTCPSenderReportData,
"adjustment", r.firstTime.Sub(firstTime).String(),
"extNowTS", extNowTS,
"extStartTS", extStartTS,
"srData", srData,
"timeSinceReceive", timeSinceReceive.String(),
"timeSinceFirst", timeSinceFirst.String(),
"samplesDiff", samplesDiff,
"samplesDuration", samplesDuration,
}
}

View File

@@ -212,18 +212,21 @@ func (r *RTPStatsReceiver) Update(
flowState.ExtSequenceNumber = resSN.ExtendedVal
flowState.ExtTimestamp = resTS.ExtendedVal
} else { // in-order
if gapSN >= cNumSequenceNumbers/2 {
if gapSN >= cNumSequenceNumbers/2 || resTS.ExtendedVal < resTS.PreExtendedHighest {
r.logger.Warnw(
"large sequence number gap", nil,
"large sequence number gap OR time reversed", nil,
"extStartSN", r.sequenceNumber.GetExtendedStart(),
"extHighestSN", r.sequenceNumber.GetExtendedHighest(),
"extStartTS", r.timestamp.GetExtendedStart(),
"extHighestTS", r.timestamp.GetExtendedHighest(),
"firstTime", r.firstTime.String(),
"highestTime", r.highestTime.String(),
"prev", resSN.PreExtendedHighest,
"curr", resSN.ExtendedVal,
"gap", gapSN,
"prevSN", resSN.PreExtendedHighest,
"currSN", resSN.ExtendedVal,
"gapSN", gapSN,
"prevTS", resTS.PreExtendedHighest,
"currTS", resTS.ExtendedVal,
"gapTS", resTS.ExtendedVal-resTS.PreExtendedHighest,
"packetTime", packetTime.String(),
"sequenceNumber", sequenceNumber,
"timestamp", timestamp,

View File

@@ -33,6 +33,8 @@ const (
cSenderReportInitialWait = time.Second
)
// -------------------------------------------------------------------
type snInfoFlag byte
const (
@@ -625,7 +627,7 @@ func (r *RTPStatsSender) GetExpectedRTPTimestamp(at time.Time) (expectedTSExt ui
defer r.lock.RUnlock()
if !r.initialized {
err = errors.New("uninitilaized")
err = errors.New("uninitialized")
return
}

View File

@@ -191,7 +191,7 @@ type ForwarderState struct {
ReferenceLayerSpatial int32
PreStartTime time.Time
ExtFirstTS uint64
RefTSOffset uint64
DummyStartTSOffset uint64
RTP RTPMungerState
Codec interface{}
}
@@ -202,12 +202,12 @@ func (f ForwarderState) String() string {
case codecmunger.VP8State:
codecString = codecState.String()
}
return fmt.Sprintf("ForwarderState{started: %v, referenceLayerSpatial: %d, preStartTime: %s, extFirstTS: %d, refTSOffset: %d, rtp: %s, codec: %s}",
return fmt.Sprintf("ForwarderState{started: %v, referenceLayerSpatial: %d, preStartTime: %s, extFirstTS: %d, dummyStartTSOffset: %d, rtp: %s, codec: %s}",
f.Started,
f.ReferenceLayerSpatial,
f.PreStartTime.String(),
f.ExtFirstTS,
f.RefTSOffset,
f.DummyStartTSOffset,
f.RTP.String(),
codecString,
)
@@ -232,7 +232,7 @@ type Forwarder struct {
extFirstTS uint64
lastSSRC uint32
referenceLayerSpatial int32
refTSOffset uint64
dummyStartTSOffset uint64
refSenderReports [buffer.DefaultMaxLayerSpatial + 1]*buffer.RTCPSenderReportData
refIsSVC bool
@@ -383,7 +383,7 @@ func (f *Forwarder) GetState() ForwarderState {
ReferenceLayerSpatial: f.referenceLayerSpatial,
PreStartTime: f.preStartTime,
ExtFirstTS: f.extFirstTS,
RefTSOffset: f.refTSOffset,
DummyStartTSOffset: f.dummyStartTSOffset,
RTP: f.rtpMunger.GetLast(),
Codec: f.codecMunger.GetState(),
}
@@ -404,7 +404,7 @@ func (f *Forwarder) SeedState(state ForwarderState) {
f.referenceLayerSpatial = state.ReferenceLayerSpatial
f.preStartTime = state.PreStartTime
f.extFirstTS = state.ExtFirstTS
f.refTSOffset = state.RefTSOffset
f.dummyStartTSOffset = state.DummyStartTSOffset
}
func (f *Forwarder) Mute(muted bool, isSubscribeMutable bool) bool {
@@ -578,7 +578,7 @@ func (f *Forwarder) clearRefSenderReportsLocked() {
// This is done to prevent use of potentially stale publisher sender reports.
//
// It is possible to implement mute using pause/unpause
// which can implemented using a replaceTrack(null)/replaceTrack(track).
// which can be implemented using replaceTrack(null)/replaceTrack(track).
// In those cases, the RTP time stamp may not jump across
// the mute/pause valley (for the time it is replaced with null track).
// So, relying on a report that happened before unmute/unpause
@@ -591,7 +591,8 @@ func (f *Forwarder) clearRefSenderReportsLocked() {
// 2. Publisher pauses: there are no more reports.
// 3. When paused, subscriber can still use the publisher side sender
// report to send reports. Although the time since last publisher
// sender report is increasing, the reports are correct though.
// sender report is increasing, the reports would still be correct
// as they referencing a previous (albeit older) correct report.
// 4. Publisher unpauses after 20 seconds. But, it may not have advanced
// RTP Timestamp by that much. Let us say, it advances only by 5 seconds.
// 5. When subscriber starts forwarding packets, it will calculate
@@ -1623,7 +1624,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
rtpMungerState := f.rtpMunger.GetLast()
extLastTS := rtpMungerState.ExtLastTS
extExpectedTS := extLastTS
extRefTS := extExpectedTS
extRefTS := extLastTS
refTS := uint32(extRefTS)
switchingAt := time.Now()
if !f.skipReferenceTS {
@@ -1639,13 +1640,13 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
}
}
// adjust extRefTS to current packet's timestamp mapped to that of reference layer's
extRefTS = (extRefTS & 0xFFFF_FFFF_0000_0000) + uint64(refTS)
expectedTS := uint32(extExpectedTS)
if (refTS-expectedTS) < 1<<31 && refTS < expectedTS {
lastTS := uint32(extLastTS)
if (refTS-lastTS) < 1<<31 && refTS < lastTS {
extRefTS += (1 << 32)
}
if (expectedTS-refTS) < 1<<31 && expectedTS < refTS && extRefTS >= 1<<32 {
if (lastTS-refTS) < 1<<31 && lastTS < refTS && extRefTS >= 1<<32 {
extRefTS -= (1 << 32)
}
@@ -1658,22 +1659,22 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
timeSinceFirst := time.Since(f.preStartTime)
rtpDiff := uint64(timeSinceFirst.Nanoseconds() * int64(f.codec.ClockRate) / 1e9)
extExpectedTS = f.extFirstTS + rtpDiff
if f.refTSOffset == 0 {
f.refTSOffset = extExpectedTS - extRefTS
if f.dummyStartTSOffset == 0 {
f.dummyStartTSOffset = extExpectedTS - extRefTS
f.logger.Infow(
"calculating refTSOffset",
"calculating dummyStartTSOffset",
"preStartTime", f.preStartTime.String(),
"extFirstTS", f.extFirstTS,
"timeSinceFirst", timeSinceFirst,
"rtpDiff", rtpDiff,
"extRefTS", extRefTS,
"refTSOffset", f.refTSOffset,
"dummyStartTSOffset", f.dummyStartTSOffset,
)
}
}
}
}
extRefTS += f.refTSOffset
extRefTS += f.dummyStartTSOffset
var extNextTS uint64
if f.lastSSRC == 0 {
@@ -1689,7 +1690,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
// timestamp should be used as things will catch up to real time when channel capacity
// increases and pacer starts sending at faster rate.
//
// But, the challenege is distinguishing between the two cases. As a compromise, the difference
// But, the challenge is distinguishing between the two cases. As a compromise, the difference
// between extExpectedTS and extRefTS is thresholded. Difference below the threshold is treated as Case 2
// and above as Case 1.
//
@@ -1734,14 +1735,16 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
logTransition("layer switch, reference too far behind", extExpectedTS, extRefTS, extLastTS, diffSeconds)
return errors.New("switch point too far behind")
}
// use a nominal increase to ensure that timestamp is always moving forward
logTransition("layer switch, reference is slightly behind", extExpectedTS, extRefTS, extLastTS, diffSeconds)
extNextTS = extLastTS + 1
} else {
diffSeconds = float64(int64(extExpectedTS-extRefTS)) / float64(f.codec.ClockRate)
if diffSeconds < 0.0 && math.Abs(diffSeconds) > SwitchAheadThresholdSeconds {
diffSeconds = float64(int64(extRefTS-extExpectedTS)) / float64(f.codec.ClockRate)
if diffSeconds > SwitchAheadThresholdSeconds {
logTransition("layer switch, reference too far ahead", extExpectedTS, extRefTS, extLastTS, diffSeconds)
}
extNextTS = extRefTS
}
}
@@ -1757,7 +1760,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
"layer", layer,
"extLastTS", extLastTS,
"extRefTS", extRefTS,
"refTSOffset", f.refTSOffset,
"dummyStartTSOffset", f.dummyStartTSOffset,
"referenceLayerSpatial", f.referenceLayerSpatial,
"extExpectedTS", extExpectedTS,
"extNextTS", extNextTS,