diff --git a/pkg/sfu/buffer/rtpstats_base.go b/pkg/sfu/buffer/rtpstats_base.go index 04028465d..0b5139b37 100644 --- a/pkg/sfu/buffer/rtpstats_base.go +++ b/pkg/sfu/buffer/rtpstats_base.go @@ -529,6 +529,11 @@ func (r *rtpStatsBase) maybeAdjustFirstPacketTime(srData *RTCPSenderReportData, "adjustment", r.firstTime.Sub(firstTime).String(), "extNowTS", extNowTS, "extStartTS", extStartTS, + "srData", srData, + "timeSinceReceive", timeSinceReceive.String(), + "timeSinceFirst", timeSinceFirst.String(), + "samplesDiff", samplesDiff, + "samplesDuration", samplesDuration, } } diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index bd1b16341..979e3a623 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -212,18 +212,21 @@ func (r *RTPStatsReceiver) Update( flowState.ExtSequenceNumber = resSN.ExtendedVal flowState.ExtTimestamp = resTS.ExtendedVal } else { // in-order - if gapSN >= cNumSequenceNumbers/2 { + if gapSN >= cNumSequenceNumbers/2 || resTS.ExtendedVal < resTS.PreExtendedHighest { r.logger.Warnw( - "large sequence number gap", nil, + "large sequence number gap OR time reversed", nil, "extStartSN", r.sequenceNumber.GetExtendedStart(), "extHighestSN", r.sequenceNumber.GetExtendedHighest(), "extStartTS", r.timestamp.GetExtendedStart(), "extHighestTS", r.timestamp.GetExtendedHighest(), "firstTime", r.firstTime.String(), "highestTime", r.highestTime.String(), - "prev", resSN.PreExtendedHighest, - "curr", resSN.ExtendedVal, - "gap", gapSN, + "prevSN", resSN.PreExtendedHighest, + "currSN", resSN.ExtendedVal, + "gapSN", gapSN, + "prevTS", resTS.PreExtendedHighest, + "currTS", resTS.ExtendedVal, + "gapTS", resTS.ExtendedVal-resTS.PreExtendedHighest, "packetTime", packetTime.String(), "sequenceNumber", sequenceNumber, "timestamp", timestamp, diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index be3f12990..6dafb1047 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -33,6 +33,8 @@ const ( cSenderReportInitialWait = time.Second ) +// ------------------------------------------------------------------- + type snInfoFlag byte const ( @@ -625,7 +627,7 @@ func (r *RTPStatsSender) GetExpectedRTPTimestamp(at time.Time) (expectedTSExt ui defer r.lock.RUnlock() if !r.initialized { - err = errors.New("uninitilaized") + err = errors.New("uninitialized") return } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 92c354c3b..6b8415972 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -191,7 +191,7 @@ type ForwarderState struct { ReferenceLayerSpatial int32 PreStartTime time.Time ExtFirstTS uint64 - RefTSOffset uint64 + DummyStartTSOffset uint64 RTP RTPMungerState Codec interface{} } @@ -202,12 +202,12 @@ func (f ForwarderState) String() string { case codecmunger.VP8State: codecString = codecState.String() } - return fmt.Sprintf("ForwarderState{started: %v, referenceLayerSpatial: %d, preStartTime: %s, extFirstTS: %d, refTSOffset: %d, rtp: %s, codec: %s}", + return fmt.Sprintf("ForwarderState{started: %v, referenceLayerSpatial: %d, preStartTime: %s, extFirstTS: %d, dummyStartTSOffset: %d, rtp: %s, codec: %s}", f.Started, f.ReferenceLayerSpatial, f.PreStartTime.String(), f.ExtFirstTS, - f.RefTSOffset, + f.DummyStartTSOffset, f.RTP.String(), codecString, ) @@ -232,7 +232,7 @@ type Forwarder struct { extFirstTS uint64 lastSSRC uint32 referenceLayerSpatial int32 - refTSOffset uint64 + dummyStartTSOffset uint64 refSenderReports [buffer.DefaultMaxLayerSpatial + 1]*buffer.RTCPSenderReportData refIsSVC bool @@ -383,7 +383,7 @@ func (f *Forwarder) GetState() ForwarderState { ReferenceLayerSpatial: f.referenceLayerSpatial, PreStartTime: f.preStartTime, ExtFirstTS: f.extFirstTS, - RefTSOffset: f.refTSOffset, + DummyStartTSOffset: f.dummyStartTSOffset, RTP: f.rtpMunger.GetLast(), Codec: f.codecMunger.GetState(), } @@ -404,7 +404,7 @@ func (f *Forwarder) SeedState(state ForwarderState) { f.referenceLayerSpatial = state.ReferenceLayerSpatial f.preStartTime = state.PreStartTime f.extFirstTS = state.ExtFirstTS - f.refTSOffset = state.RefTSOffset + f.dummyStartTSOffset = state.DummyStartTSOffset } func (f *Forwarder) Mute(muted bool, isSubscribeMutable bool) bool { @@ -578,7 +578,7 @@ func (f *Forwarder) clearRefSenderReportsLocked() { // This is done to prevent use of potentially stale publisher sender reports. // // It is possible to implement mute using pause/unpause - // which can implemented using a replaceTrack(null)/replaceTrack(track). + // which can be implemented using replaceTrack(null)/replaceTrack(track). // In those cases, the RTP time stamp may not jump across // the mute/pause valley (for the time it is replaced with null track). // So, relying on a report that happened before unmute/unpause @@ -591,7 +591,8 @@ func (f *Forwarder) clearRefSenderReportsLocked() { // 2. Publisher pauses: there are no more reports. // 3. When paused, subscriber can still use the publisher side sender // report to send reports. Although the time since last publisher - // sender report is increasing, the reports are correct though. + // sender report is increasing, the reports would still be correct + // as they referencing a previous (albeit older) correct report. // 4. Publisher unpauses after 20 seconds. But, it may not have advanced // RTP Timestamp by that much. Let us say, it advances only by 5 seconds. // 5. When subscriber starts forwarding packets, it will calculate @@ -1623,7 +1624,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e rtpMungerState := f.rtpMunger.GetLast() extLastTS := rtpMungerState.ExtLastTS extExpectedTS := extLastTS - extRefTS := extExpectedTS + extRefTS := extLastTS refTS := uint32(extRefTS) switchingAt := time.Now() if !f.skipReferenceTS { @@ -1639,13 +1640,13 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e } } + // adjust extRefTS to current packet's timestamp mapped to that of reference layer's extRefTS = (extRefTS & 0xFFFF_FFFF_0000_0000) + uint64(refTS) - - expectedTS := uint32(extExpectedTS) - if (refTS-expectedTS) < 1<<31 && refTS < expectedTS { + lastTS := uint32(extLastTS) + if (refTS-lastTS) < 1<<31 && refTS < lastTS { extRefTS += (1 << 32) } - if (expectedTS-refTS) < 1<<31 && expectedTS < refTS && extRefTS >= 1<<32 { + if (lastTS-refTS) < 1<<31 && lastTS < refTS && extRefTS >= 1<<32 { extRefTS -= (1 << 32) } @@ -1658,22 +1659,22 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e timeSinceFirst := time.Since(f.preStartTime) rtpDiff := uint64(timeSinceFirst.Nanoseconds() * int64(f.codec.ClockRate) / 1e9) extExpectedTS = f.extFirstTS + rtpDiff - if f.refTSOffset == 0 { - f.refTSOffset = extExpectedTS - extRefTS + if f.dummyStartTSOffset == 0 { + f.dummyStartTSOffset = extExpectedTS - extRefTS f.logger.Infow( - "calculating refTSOffset", + "calculating dummyStartTSOffset", "preStartTime", f.preStartTime.String(), "extFirstTS", f.extFirstTS, "timeSinceFirst", timeSinceFirst, "rtpDiff", rtpDiff, "extRefTS", extRefTS, - "refTSOffset", f.refTSOffset, + "dummyStartTSOffset", f.dummyStartTSOffset, ) } } } } - extRefTS += f.refTSOffset + extRefTS += f.dummyStartTSOffset var extNextTS uint64 if f.lastSSRC == 0 { @@ -1689,7 +1690,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e // timestamp should be used as things will catch up to real time when channel capacity // increases and pacer starts sending at faster rate. // - // But, the challenege is distinguishing between the two cases. As a compromise, the difference + // But, the challenge is distinguishing between the two cases. As a compromise, the difference // between extExpectedTS and extRefTS is thresholded. Difference below the threshold is treated as Case 2 // and above as Case 1. // @@ -1734,14 +1735,16 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e logTransition("layer switch, reference too far behind", extExpectedTS, extRefTS, extLastTS, diffSeconds) return errors.New("switch point too far behind") } + // use a nominal increase to ensure that timestamp is always moving forward logTransition("layer switch, reference is slightly behind", extExpectedTS, extRefTS, extLastTS, diffSeconds) extNextTS = extLastTS + 1 } else { - diffSeconds = float64(int64(extExpectedTS-extRefTS)) / float64(f.codec.ClockRate) - if diffSeconds < 0.0 && math.Abs(diffSeconds) > SwitchAheadThresholdSeconds { + diffSeconds = float64(int64(extRefTS-extExpectedTS)) / float64(f.codec.ClockRate) + if diffSeconds > SwitchAheadThresholdSeconds { logTransition("layer switch, reference too far ahead", extExpectedTS, extRefTS, extLastTS, diffSeconds) } + extNextTS = extRefTS } } @@ -1757,7 +1760,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e "layer", layer, "extLastTS", extLastTS, "extRefTS", extRefTS, - "refTSOffset", f.refTSOffset, + "dummyStartTSOffset", f.dummyStartTSOffset, "referenceLayerSpatial", f.referenceLayerSpatial, "extExpectedTS", extExpectedTS, "extNextTS", extNextTS,