diff --git a/pkg/rtc/wrappedreceiver.go b/pkg/rtc/wrappedreceiver.go index 593a62487..853d2c4f4 100644 --- a/pkg/rtc/wrappedreceiver.go +++ b/pkg/rtc/wrappedreceiver.go @@ -317,9 +317,9 @@ func (d *DummyReceiver) GetCalculatedClockRate(layer int32) uint32 { return 0 } -func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) { +func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error) { if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok { - return r.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer) + return r.GetReferenceLayerRTPTimestamp(ets, layer, referenceLayer) } return 0, errors.New("receiver not available") } diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 0550f9cf6..73704e883 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -55,6 +55,7 @@ type ExtPacket struct { VideoLayer Arrival time.Time ExtSequenceNumber uint32 + ExtTimestamp uint64 Packet *rtp.Packet Payload interface{} KeyFrame bool @@ -414,21 +415,21 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) { return } - extSeqNumber, isOutOfOrder := b.updateStreamState(&rtpPacket, arrivalTime) + flowState := b.updateStreamState(&rtpPacket, arrivalTime) b.processHeaderExtensions(&rtpPacket, arrivalTime) - if !isOutOfOrder && len(rtpPacket.Payload) == 0 { + if !flowState.IsOutOfOrder && len(rtpPacket.Payload) == 0 { // drop padding only in-order packet b.snRangeMap.IncValue(1) return } // add to RTX buffer using sequence number after accounting for dropped padding only packets - snAdjustment, err := b.snRangeMap.GetValue(extSeqNumber) + snAdjustment, err := b.snRangeMap.GetValue(flowState.ExtSequenceNumber) if err != nil { b.logger.Errorw("could not get sequence number adjustment", err) return } - rtpPacket.Header.SequenceNumber = uint16(extSeqNumber - snAdjustment) + rtpPacket.Header.SequenceNumber = uint16(flowState.ExtSequenceNumber - snAdjustment) _, err = b.bucket.AddPacketWithSequenceNumber(pkt, rtpPacket.Header.SequenceNumber) if err != nil { if err != bucket.ErrRTXPacket { @@ -441,7 +442,7 @@ func (b *Buffer) calc(pkt []byte, arrivalTime time.Time) { b.doReports(arrivalTime) - ep := b.getExtPacket(&rtpPacket, extSeqNumber, arrivalTime) + ep := b.getExtPacket(&rtpPacket, arrivalTime, flowState) if ep == nil { return } @@ -499,7 +500,7 @@ func (b *Buffer) doFpsCalc(ep *ExtPacket) { } } -func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) (uint32, bool) { +func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) RTPFlowState { flowState := b.rtpStats.Update(&p.Header, len(p.Payload), int(p.PaddingSize), arrivalTime) if b.nacker != nil { @@ -513,7 +514,7 @@ func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) (uint32 } } - return flowState.ExtSeqNumber, flowState.IsOutOfOrder + return flowState } func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime time.Time) { @@ -546,10 +547,11 @@ func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime time.Time) { } } -func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, extSeqNumber uint32, arrivalTime time.Time) *ExtPacket { +func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime time.Time, flowState RTPFlowState) *ExtPacket { ep := &ExtPacket{ Arrival: arrivalTime, - ExtSequenceNumber: extSeqNumber, + ExtSequenceNumber: flowState.ExtSequenceNumber, + ExtTimestamp: flowState.ExtTimestamp, Packet: rtpPacket, VideoLayer: VideoLayer{ Spatial: InvalidLayerSpatial, diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index a55c12a42..4c17c93e4 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -71,7 +71,8 @@ type RTPFlowState struct { IsOutOfOrder bool - ExtSeqNumber uint32 + ExtSequenceNumber uint32 + ExtTimestamp uint64 } type IntervalStats struct { @@ -152,8 +153,7 @@ type RTPStats struct { lock sync.RWMutex - initialized bool - resyncOnNextPacket bool + initialized bool startTime time.Time endTime time.Time @@ -245,7 +245,6 @@ func (r *RTPStats) Seed(from *RTPStats) { } r.initialized = from.initialized - r.resyncOnNextPacket = from.resyncOnNextPacket r.startTime = from.startTime // do not clone endTime as a non-zero endTime indicates an ended object @@ -375,16 +374,6 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa return } - if r.resyncOnNextPacket { - r.resyncOnNextPacket = false - - if r.initialized { - r.sequenceNumber.ResetHighest(rtph.SequenceNumber - 1) - r.timestamp.ResetHighest(rtph.Timestamp) - r.highestTime = packetTime - } - } - var resSN utils.WrapAroundUpdateResult[uint32] var resTS utils.WrapAroundUpdateResult[uint64] if !r.initialized { @@ -417,8 +406,8 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa "rtp stream start", "startTime", r.startTime.String(), "firstTime", r.firstTime.String(), - "startSN", r.sequenceNumber.GetExtendedHighest(), - "startTS", r.timestamp.GetExtendedHighest(), + "startSN", r.sequenceNumber.GetExtendedStart(), + "startTS", r.timestamp.GetExtendedStart(), ) } else { resSN = r.sequenceNumber.Update(rtph.SequenceNumber) @@ -483,7 +472,8 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa } flowState.IsOutOfOrder = true - flowState.ExtSeqNumber = resSN.ExtendedVal + flowState.ExtSequenceNumber = resSN.ExtendedVal + flowState.ExtTimestamp = resTS.ExtendedVal } else { // in-order // update gap histogram r.updateGapHistogram(int(gapSN)) @@ -505,7 +495,8 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa flowState.LossStartInclusive = resSN.PreExtendedHighest + 1 flowState.LossEndExclusive = resSN.ExtendedVal } - flowState.ExtSeqNumber = resSN.ExtendedVal + flowState.ExtSequenceNumber = resSN.ExtendedVal + flowState.ExtTimestamp = resTS.ExtendedVal } if !isDuplicate { @@ -527,11 +518,16 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa return } -func (r *RTPStats) ResyncOnNextPacket() { +func (r *RTPStats) Resync(esn uint32, ets uint64, at time.Time) { r.lock.Lock() defer r.lock.Unlock() - r.resyncOnNextPacket = true + if !r.initialized { + return + } + r.sequenceNumber.ResetHighest(esn - 1) + r.timestamp.ResetHighest(ets) + r.highestTime = at } func (r *RTPStats) getPacketsExpected() uint32 { @@ -788,11 +784,11 @@ func (r *RTPStats) MaybeAdjustFirstPacketTime(srData *RTCPSenderReportData) { defer r.lock.Unlock() if srData != nil { - r.maybeAdjustFirstPacketTime(srData.RTPTimestamp) + r.maybeAdjustFirstPacketTime(srData.RTPTimestampExt) } } -func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) { +func (r *RTPStats) maybeAdjustFirstPacketTime(ets uint64) { if time.Since(r.startTime) > firstPacketTimeAdjustWindow { return } @@ -803,7 +799,7 @@ func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) { // abnormal delay (maybe due to pacing or maybe due to queuing // in some network element along the way), push back first time // to an earlier instance. - samplesDiff := int32(ts - uint32(r.timestamp.GetExtendedStart())) + samplesDiff := int64(ets - r.timestamp.GetExtendedStart()) if samplesDiff < 0 { // out-of-order, skip return @@ -819,7 +815,7 @@ func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) { "before", r.firstTime.String(), "after", firstTime.String(), "adjustment", r.firstTime.Sub(firstTime), - "nowTS", ts, + "extNowTS", ets, "extStartTS", r.timestamp.GetExtendedStart(), ) if r.firstTime.Sub(firstTime) > firstPacketTimeAdjustThreshold { @@ -829,7 +825,7 @@ func (r *RTPStats) maybeAdjustFirstPacketTime(ts uint32) { "before", r.firstTime.String(), "after", firstTime.String(), "adjustment", r.firstTime.Sub(firstTime), - "nowTS", ts, + "extNowTS", ets, "extStartTS", r.timestamp.GetExtendedStart(), ) } else { @@ -864,12 +860,17 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) { if (srData.RTPTimestamp-r.srNewest.RTPTimestamp) < (1<<31) && srData.RTPTimestamp < r.srNewest.RTPTimestamp { cycles += (1 << 32) } + + ntpDiffSinceLast := srData.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time()) + rtpDiff := uint64(ntpDiffSinceLast.Seconds() * float64(r.params.ClockRate)) + goArounds := rtpDiff / (1 << 32) + cycles += goArounds * (1 << 32) } srDataCopy := *srData srDataCopy.RTPTimestampExt = uint64(srDataCopy.RTPTimestamp) + cycles - r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestamp) + r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestampExt) // monitor and log RTP timestamp anomalies var ntpDiffSinceLast time.Duration @@ -1018,13 +1019,13 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32) // monitor and log RTP timestamp anomalies var ntpDiffSinceLast time.Duration - var rtpDiffSinceLast uint32 + var rtpDiffSinceLast uint64 var departureDiffSinceLast time.Duration var expectedTimeDiffSinceLast float64 var isWarped bool if r.srNewest != nil { ntpDiffSinceLast = nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time()) - rtpDiffSinceLast = nowRTP - r.srNewest.RTPTimestamp + rtpDiffSinceLast = nowRTPExt - r.srNewest.RTPTimestampExt departureDiffSinceLast = now.Sub(r.srNewest.At) expectedTimeDiffSinceLast = float64(rtpDiffSinceLast) / float64(r.params.ClockRate) diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 69363e65a..eab995bb8 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -157,8 +157,8 @@ type ForwarderState struct { Started bool ReferenceLayerSpatial int32 PreStartTime time.Time - FirstTS uint32 - RefTSOffset uint32 + ExtFirstTS uint64 + RefTSOffset uint64 RTP RTPMungerState Codec interface{} } @@ -169,11 +169,11 @@ func (f ForwarderState) String() string { case codecmunger.VP8State: codecString = codecState.String() } - return fmt.Sprintf("ForwarderState{started: %v, referenceLayerSpatial: %d, preStartTime: %s, firstTS: %d, refTSOffset: %d, rtp: %s, codec: %s}", + return fmt.Sprintf("ForwarderState{started: %v, referenceLayerSpatial: %d, preStartTime: %s, extFirstTS: %d, refTSOffset: %d, rtp: %s, codec: %s}", f.Started, f.ReferenceLayerSpatial, f.PreStartTime.String(), - f.FirstTS, + f.ExtFirstTS, f.RefTSOffset, f.RTP.String(), codecString, @@ -187,7 +187,7 @@ type Forwarder struct { codec webrtc.RTPCodecCapability kind webrtc.RTPCodecType logger logger.Logger - getReferenceLayerRTPTimestamp func(ts uint32, layer int32, referenceLayer int32) (uint32, error) + getReferenceLayerRTPTimestamp func(ets uint64, layer int32, referenceLayer int32) (uint64, error) getExpectedRTPTimestamp func(at time.Time) (uint64, error) muted bool @@ -196,10 +196,10 @@ type Forwarder struct { started bool preStartTime time.Time - firstTS uint32 + extFirstTS uint64 lastSSRC uint32 referenceLayerSpatial int32 - refTSOffset uint32 + refTSOffset uint64 provisional *VideoAllocationProvisional @@ -215,7 +215,7 @@ type Forwarder struct { func NewForwarder( kind webrtc.RTPCodecType, logger logger.Logger, - getReferenceLayerRTPTimestamp func(ts uint32, layer int32, referenceLayer int32) (uint32, error), + getReferenceLayerRTPTimestamp func(ets uint64, layer int32, referenceLayer int32) (uint64, error), getExpectedRTPTimestamp func(at time.Time) (uint64, error), ) *Forwarder { f := &Forwarder{ @@ -335,7 +335,7 @@ func (f *Forwarder) GetState() ForwarderState { Started: f.started, ReferenceLayerSpatial: f.referenceLayerSpatial, PreStartTime: f.preStartTime, - FirstTS: f.firstTS, + ExtFirstTS: f.extFirstTS, RefTSOffset: f.refTSOffset, RTP: f.rtpMunger.GetLast(), Codec: f.codecMunger.GetState(), @@ -356,7 +356,7 @@ func (f *Forwarder) SeedState(state ForwarderState) { f.started = true f.referenceLayerSpatial = state.ReferenceLayerSpatial f.preStartTime = state.PreStartTime - f.firstTS = state.FirstTS + f.extFirstTS = state.ExtFirstTS f.refTSOffset = state.RefTSOffset } @@ -1444,13 +1444,13 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e return nil } - logTransition := func(message string, expectedTS, refTS, lastTS uint32, diffSeconds float64) { + logTransition := func(message string, extExpectedTS, extRefTS, extLastTS uint64, diffSeconds float64) { f.logger.Debugw( message, "layer", layer, - "expectedTS", expectedTS, - "refTS", refTS, - "lastTS", lastTS, + "extExpectedTS", extExpectedTS, + "extRefTS", extRefTS, + "extLastTS", extLastTS, "diffSeconds", math.Abs(diffSeconds), ) } @@ -1460,20 +1460,20 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e // timestamp offset on source change. // // There are three timestamps to consider here - // 1. lastTS -> timestamp of last sent packet - // 2. refTS -> timestamp of this packet (after munging) calculated using feed's RTCP sender report - // 3. expectedTS -> expected timestamp of this packet calculated based on elapsed time since first packet - // Ideally, refTS and expectedTS should be very close and lastTS should be before both of those. + // 1. extLastTS -> timestamp of last sent packet + // 2. extRefTS -> timestamp of this packet (after munging) calculated using feed's RTCP sender report + // 3. extExpectedTS -> expected timestamp of this packet calculated based on elapsed time since first packet + // Ideally, extRefTS and extExpectedTS should be very close and extLastTS should be before both of those. // But, cases like muting/unmuting, clock vagaries, pacing, etc. make them not satisfy those conditions always. rtpMungerState := f.rtpMunger.GetLast() - lastTS := rtpMungerState.LastTS - refTS := lastTS - expectedTS := lastTS + extLastTS := rtpMungerState.ExtLastTS + extRefTS := extLastTS + extExpectedTS := extLastTS switchingAt := time.Now() if f.getReferenceLayerRTPTimestamp != nil { - ts, err := f.getReferenceLayerRTPTimestamp(extPkt.Packet.Timestamp, layer, f.referenceLayerSpatial) + ets, err := f.getReferenceLayerRTPTimestamp(extPkt.ExtTimestamp, layer, f.referenceLayerSpatial) if err != nil { - // error out if refTS is not available. It can happen when there is no sender report + // error out if extRefTS is not available. It can happen when there is no sender report // for the layer being switched to. Can especially happen at the start of the track when layer switches are // potentially happening very quickly. Erroring out and waiting for a layer for which a sender report has been // received will calculate a better offset, but may result in initial adaptation to take a bit longer depending @@ -1481,35 +1481,35 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e return err } - refTS = ts + extRefTS = ets } if f.getExpectedRTPTimestamp != nil { tsExt, err := f.getExpectedRTPTimestamp(switchingAt) if err == nil { - expectedTS = uint32(tsExt) + extExpectedTS = tsExt } else { - rtpDiff := uint32(0) + rtpDiff := uint64(0) if !f.preStartTime.IsZero() && f.refTSOffset == 0 { timeSinceFirst := time.Since(f.preStartTime) - rtpDiff = uint32(timeSinceFirst.Nanoseconds() * int64(f.codec.ClockRate) / 1e9) - f.refTSOffset = f.firstTS + rtpDiff - refTS + rtpDiff = uint64(timeSinceFirst.Nanoseconds() * int64(f.codec.ClockRate) / 1e9) + f.refTSOffset = f.extFirstTS + rtpDiff - extRefTS f.logger.Infow( "calculating refTSOffset", "preStartTime", f.preStartTime.String(), - "firstTS", f.firstTS, + "extFirstTS", f.extFirstTS, "timeSinceFirst", timeSinceFirst, "rtpDiff", rtpDiff, - "refTS", refTS, + "extRefTS", extRefTS, "refTSOffset", f.refTSOffset, ) } - expectedTS += rtpDiff + extExpectedTS += rtpDiff } } - refTS += f.refTSOffset + extRefTS += f.refTSOffset - var nextTS uint32 + var extNextTS uint64 if f.lastSSRC == 0 { // If resuming (e. g. on unmute), keep next timestamp close to expected timestamp. // @@ -1524,71 +1524,71 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e // increases and pacer starts sending at faster rate. // // But, the challenege is distinguishing between the two cases. As a compromise, the difference - // between expectedTS and refTS is thresholded. Difference below the threshold is treated as Case 2 + // between extExpectedTS and extRefTS is thresholded. Difference below the threshold is treated as Case 2 // and above as Case 1. // - // In the event of refTS > expectedTS, use refTS. - // Ideally, refTS should not be ahead of expectedTS, but expectedTS uses the first packet's + // In the event of extRefTS > extExpectedTS, use extRefTS. + // Ideally, extRefTS should not be ahead of extExpectedTS, but extExpectedTS uses the first packet's // wall clock time. So, if the first packet experienced abmormal latency, it is possible - // for refTS > expectedTS - diffSeconds := float64(int32(expectedTS-refTS)) / float64(f.codec.ClockRate) + // for extRefTS > extExpectedTS + diffSeconds := float64(int64(extExpectedTS-extRefTS)) / float64(f.codec.ClockRate) if diffSeconds >= 0.0 { if f.resumeBehindThreshold > 0 && diffSeconds > f.resumeBehindThreshold { - logTransition("resume, reference too far behind", expectedTS, refTS, lastTS, diffSeconds) - nextTS = expectedTS + logTransition("resume, reference too far behind", extExpectedTS, extRefTS, extLastTS, diffSeconds) + extNextTS = extExpectedTS } else { - nextTS = refTS + extNextTS = extRefTS } } else { if math.Abs(diffSeconds) > SwitchAheadThresholdSeconds { - logTransition("resume, reference too far ahead", expectedTS, refTS, lastTS, diffSeconds) + logTransition("resume, reference too far ahead", extExpectedTS, extRefTS, extLastTS, diffSeconds) } - nextTS = refTS + extNextTS = extRefTS } f.resumeBehindThreshold = 0.0 } else { - // switching between layers, check if refTS is too far behind the last sent - diffSeconds := float64(int32(refTS-lastTS)) / float64(f.codec.ClockRate) + // switching between layers, check if extRefTS is too far behind the last sent + diffSeconds := float64(int64(extRefTS-extLastTS)) / float64(f.codec.ClockRate) if diffSeconds < 0.0 { if math.Abs(diffSeconds) > LayerSwitchBehindThresholdSeconds { // this could be due to pacer trickling out this layer. Error out and wait for a more opportune time. // AVSYNC-TODO: Consider some forcing function to do the switch // (like "have waited for too long for layer switch, nothing available, switch to whatever is available" kind of condition). - logTransition("layer switch, reference too far behind", expectedTS, refTS, lastTS, diffSeconds) + logTransition("layer switch, reference too far behind", extExpectedTS, extRefTS, extLastTS, diffSeconds) return errors.New("switch point too far behind") } // use a nominal increase to ensure that timestamp is always moving forward - logTransition("layer switch, reference is slightly behind", expectedTS, refTS, lastTS, diffSeconds) - nextTS = lastTS + 1 + logTransition("layer switch, reference is slightly behind", extExpectedTS, extRefTS, extLastTS, diffSeconds) + extNextTS = extLastTS + 1 } else { - diffSeconds = float64(int32(expectedTS-refTS)) / float64(f.codec.ClockRate) + diffSeconds = float64(int64(extExpectedTS-extRefTS)) / float64(f.codec.ClockRate) if diffSeconds < 0.0 && math.Abs(diffSeconds) > SwitchAheadThresholdSeconds { - logTransition("layer switch, reference too far ahead", expectedTS, refTS, lastTS, diffSeconds) + logTransition("layer switch, reference too far ahead", extExpectedTS, extRefTS, extLastTS, diffSeconds) } - nextTS = refTS + extNextTS = extRefTS } } - if nextTS-lastTS == 0 || nextTS-lastTS > (1<<31) { - f.logger.Debugw("next timestamp is before last, adjusting", "nextTS", nextTS, "lastTS", lastTS) + if int64(extNextTS-extLastTS) <= 0 { + f.logger.Debugw("next timestamp is before last, adjusting", "extNextTS", extNextTS, "extLastTS", extLastTS) // nominal increase - nextTS = lastTS + 1 + extNextTS = extLastTS + 1 } f.logger.Debugw( "next timestamp on switch", "switchingAt", switchingAt.String(), "layer", layer, - "lastTS", lastTS, - "refTS", refTS, + "extLastTS", extLastTS, + "extRefTS", extRefTS, "refTSOffset", f.refTSOffset, "referenceLayerSpatial", f.referenceLayerSpatial, - "expectedTS", expectedTS, - "nextTS", nextTS, - "tsJump", nextTS-lastTS, + "extExpectedTS", extExpectedTS, + "extNextTS", extNextTS, + "tsJump", extNextTS-extLastTS, "nextSN", rtpMungerState.ExtLastSN+1, ) - f.rtpMunger.UpdateSnTsOffsets(extPkt, 1, nextTS-lastTS) + f.rtpMunger.UpdateSnTsOffsets(extPkt, 1, extNextTS-extLastTS) f.codecMunger.UpdateOffsets(extPkt) return nil } @@ -1733,7 +1733,7 @@ func (f *Forwarder) maybeStart() { } f.rtpMunger.SetLastSnTs(extPkt) - f.firstTS = extPkt.Packet.Timestamp + f.extFirstTS = uint64(extPkt.Packet.Timestamp) f.logger.Debugw( "starting with dummy forwarding", "sequenceNumber", extPkt.Packet.SequenceNumber, @@ -1770,18 +1770,18 @@ func (f *Forwarder) GetSnTsForBlankFrames(frameRate uint32, numPackets int) ([]S numPackets++ } - lastTS := f.rtpMunger.GetLast().LastTS - expectedTS := lastTS + extLastTS := f.rtpMunger.GetLast().ExtLastTS + extExpectedTS := extLastTS if f.getExpectedRTPTimestamp != nil { tsExt, err := f.getExpectedRTPTimestamp(time.Now()) if err == nil { - expectedTS = uint32(tsExt) + extExpectedTS = tsExt } } - if expectedTS-lastTS == 0 || expectedTS-lastTS > (1<<31) { - expectedTS = lastTS + 1 + if int64(extExpectedTS-extLastTS) <= 0 { + extExpectedTS = extLastTS + 1 } - snts, err := f.rtpMunger.UpdateAndGetPaddingSnTs(numPackets, f.codec.ClockRate, frameRate, frameEndNeeded, expectedTS) + snts, err := f.rtpMunger.UpdateAndGetPaddingSnTs(numPackets, f.codec.ClockRate, frameRate, frameEndNeeded, extExpectedTS) return snts, frameEndNeeded, err } diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 8aa5dcac3..c725dbb69 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -81,7 +81,7 @@ type TrackReceiver interface { GetTemporalLayerFpsForSpatial(layer int32) []float32 GetCalculatedClockRate(layer int32) uint32 - GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) + GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error) } // WebRTCReceiver receives a media track @@ -777,8 +777,8 @@ func (w *WebRTCReceiver) GetCalculatedClockRate(layer int32) uint32 { return w.streamTrackerManager.GetCalculatedClockRate(layer) } -func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) { - return w.streamTrackerManager.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer) +func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error) { + return w.streamTrackerManager.GetReferenceLayerRTPTimestamp(ets, layer, referenceLayer) } // closes all track senders in parallel, returns when all are closed diff --git a/pkg/sfu/rtpmunger.go b/pkg/sfu/rtpmunger.go index 3a79cac93..3f26ddfe7 100644 --- a/pkg/sfu/rtpmunger.go +++ b/pkg/sfu/rtpmunger.go @@ -54,11 +54,11 @@ type SnTs struct { type RTPMungerState struct { ExtLastSN uint32 - LastTS uint32 + ExtLastTS uint64 } func (r RTPMungerState) String() string { - return fmt.Sprintf("RTPMungerState{extLastSN: %d, lastTS: %d)", r.ExtLastSN, r.LastTS) + return fmt.Sprintf("RTPMungerState{extLastSN: %d, extLastTS: %d)", r.ExtLastSN, r.ExtLastTS) } // ---------------------------------------------------------------------- @@ -70,8 +70,8 @@ type RTPMunger struct { snRangeMap *utils.RangeMap[uint32, uint32] extLastSN uint32 - lastTS uint32 - tsOffset uint32 + extLastTS uint64 + tsOffset uint64 lastMarker bool extRtxGateSn uint32 @@ -91,7 +91,7 @@ func (r *RTPMunger) DebugInfo() map[string]interface{} { "ExtHighestIncomingSN": r.extHighestIncomingSN, "ExtLastSN": r.extLastSN, "SNOffset": snOffset, - "LastTS": r.lastTS, + "ExtLastTS": r.extLastTS, "TSOffset": r.tsOffset, "LastMarker": r.lastMarker, } @@ -100,25 +100,25 @@ func (r *RTPMunger) DebugInfo() map[string]interface{} { func (r *RTPMunger) GetLast() RTPMungerState { return RTPMungerState{ ExtLastSN: r.extLastSN, - LastTS: r.lastTS, + ExtLastTS: r.extLastTS, } } func (r *RTPMunger) SeedLast(state RTPMungerState) { r.extLastSN = state.ExtLastSN - r.lastTS = state.LastTS + r.extLastTS = state.ExtLastTS } func (r *RTPMunger) SetLastSnTs(extPkt *buffer.ExtPacket) { r.extHighestIncomingSN = extPkt.ExtSequenceNumber - 1 r.extLastSN = extPkt.ExtSequenceNumber - r.lastTS = extPkt.Packet.Timestamp + r.extLastTS = extPkt.ExtTimestamp } -func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint32, tsAdjust uint32) { +func (r *RTPMunger) UpdateSnTsOffsets(extPkt *buffer.ExtPacket, snAdjust uint32, tsAdjust uint64) { r.extHighestIncomingSN = extPkt.ExtSequenceNumber - 1 r.snRangeMap.ClearAndResetValue(extPkt.ExtSequenceNumber - r.extLastSN - snAdjust) - r.tsOffset = extPkt.Packet.Timestamp - r.lastTS - tsAdjust + r.tsOffset = extPkt.ExtTimestamp - r.extLastTS - tsAdjust } func (r *RTPMunger) PacketDropped(extPkt *buffer.ExtPacket) { @@ -151,7 +151,7 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara return &TranslationParamsRTP{ snOrdering: SequenceNumberOrderingOutOfOrder, sequenceNumber: uint16(extPkt.ExtSequenceNumber - snOffset), - timestamp: extPkt.Packet.Timestamp - r.tsOffset, + timestamp: uint32(extPkt.ExtTimestamp - r.tsOffset), }, nil } @@ -186,10 +186,10 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara } extMungedSN := extPkt.ExtSequenceNumber - snOffset - mungedTS := extPkt.Packet.Timestamp - r.tsOffset + extMungedTS := extPkt.ExtTimestamp - r.tsOffset r.extLastSN = extMungedSN - r.lastTS = mungedTS + r.extLastTS = extMungedTS r.lastMarker = extPkt.Packet.Marker if extPkt.KeyFrame { @@ -204,7 +204,7 @@ func (r *RTPMunger) UpdateAndGetSnTs(extPkt *buffer.ExtPacket) (*TranslationPara return &TranslationParamsRTP{ snOrdering: ordering, sequenceNumber: uint16(extMungedSN), - timestamp: mungedTS, + timestamp: uint32(extMungedTS), }, nil } @@ -223,7 +223,7 @@ func (r *RTPMunger) FilterRTX(nacks []uint16) []uint16 { return filtered } -func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate uint32, forceMarker bool, rtpTimestamp uint32) ([]SnTs, error) { +func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate uint32, forceMarker bool, extRtpTimestamp uint64) ([]SnTs, error) { useLastTSForFirst := false tsOffset := 0 if !r.lastMarker { @@ -237,32 +237,33 @@ func (r *RTPMunger) UpdateAndGetPaddingSnTs(num int, clockRate uint32, frameRate } extLastSN := r.extLastSN - lastTS := r.lastTS + extLastTS := r.extLastTS vals := make([]SnTs, num) for i := 0; i < num; i++ { extLastSN++ vals[i].sequenceNumber = uint16(extLastSN) + if frameRate != 0 { if useLastTSForFirst && i == 0 { - vals[i].timestamp = r.lastTS + vals[i].timestamp = uint32(r.extLastTS) } else { - ts := rtpTimestamp + ((uint32(i+1-tsOffset)*clockRate)+frameRate-1)/frameRate - if (ts-lastTS) == 0 || (ts-lastTS) > (1<<31) { - ts = lastTS + 1 - lastTS = ts + ets := extRtpTimestamp + uint64(((uint32(i+1-tsOffset)*clockRate)+frameRate-1)/frameRate) + if int64(ets-extLastTS) <= 0 { + ets = extLastTS + 1 } - vals[i].timestamp = ts + extLastTS = ets + vals[i].timestamp = uint32(ets) } } else { - vals[i].timestamp = r.lastTS + vals[i].timestamp = uint32(r.extLastTS) } } r.extLastSN = extLastSN r.snRangeMap.DecValue(uint32(num)) - r.tsOffset -= vals[num-1].timestamp - r.lastTS - r.lastTS = vals[num-1].timestamp + r.tsOffset -= extLastTS - r.extLastTS + r.extLastTS = extLastTS if forceMarker { r.lastMarker = true diff --git a/pkg/sfu/rtpmunger_test.go b/pkg/sfu/rtpmunger_test.go index 9f4ea976d..4119fd2c5 100644 --- a/pkg/sfu/rtpmunger_test.go +++ b/pkg/sfu/rtpmunger_test.go @@ -43,11 +43,11 @@ func TestSetLastSnTs(t *testing.T) { r.SetLastSnTs(extPkt) require.Equal(t, uint32(23332), r.extHighestIncomingSN) require.Equal(t, uint32(23333), r.extLastSN) - require.Equal(t, uint32(0xabcdef), r.lastTS) + require.Equal(t, uint64(0xabcdef), r.extLastTS) snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN) require.NoError(t, err) require.Equal(t, uint32(0), snOffset) - require.Equal(t, uint32(0), r.tsOffset) + require.Equal(t, uint64(0), r.tsOffset) } func TestUpdateSnTsOffsets(t *testing.T) { @@ -70,11 +70,11 @@ func TestUpdateSnTsOffsets(t *testing.T) { r.UpdateSnTsOffsets(extPkt, 1, 1) require.Equal(t, uint32(33332), r.extHighestIncomingSN) require.Equal(t, uint32(23333), r.extLastSN) - require.Equal(t, uint32(0xabcdef), r.lastTS) + require.Equal(t, uint64(0xabcdef), r.extLastTS) snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN) require.NoError(t, err) require.Equal(t, uint32(9999), snOffset) - require.Equal(t, uint32(0xffffffff), r.tsOffset) + require.Equal(t, uint64(0xffff_ffff_ffff_ffff), r.tsOffset) } func TestPacketDropped(t *testing.T) { @@ -90,11 +90,11 @@ func TestPacketDropped(t *testing.T) { r.SetLastSnTs(extPkt) require.Equal(t, uint32(23332), r.extHighestIncomingSN) require.Equal(t, uint32(23333), r.extLastSN) - require.Equal(t, uint32(0xabcdef), r.lastTS) + require.Equal(t, uint64(0xabcdef), r.extLastTS) snOffset, err := r.snRangeMap.GetValue(r.extHighestIncomingSN) require.NoError(t, err) require.Equal(t, uint32(0), snOffset) - require.Equal(t, uint32(0), r.tsOffset) + require.Equal(t, uint64(0), r.tsOffset) r.UpdateAndGetSnTs(extPkt) // update sequence number offset @@ -493,7 +493,7 @@ func TestUpdateAndGetPaddingSnTs(t *testing.T) { timestamp: params.Timestamp + ((uint32(i)*clockRate)+frameRate-1)/frameRate, } } - snts, err := r.UpdateAndGetPaddingSnTs(numPadding, clockRate, frameRate, true, params.Timestamp) + snts, err := r.UpdateAndGetPaddingSnTs(numPadding, clockRate, frameRate, true, extPkt.ExtTimestamp) require.NoError(t, err) require.Equal(t, sntsExpected, snts) @@ -504,7 +504,7 @@ func TestUpdateAndGetPaddingSnTs(t *testing.T) { timestamp: snts[len(snts)-1].timestamp + ((uint32(i+1)*clockRate)+frameRate-1)/frameRate, } } - snts, err = r.UpdateAndGetPaddingSnTs(numPadding, clockRate, frameRate, false, snts[len(snts)-1].timestamp) + snts, err = r.UpdateAndGetPaddingSnTs(numPadding, clockRate, frameRate, false, uint64(snts[len(snts)-1].timestamp)) require.NoError(t, err) require.Equal(t, sntsExpected, snts) } diff --git a/pkg/sfu/streamtrackermanager.go b/pkg/sfu/streamtrackermanager.go index 9aa82f91e..19fd266e9 100644 --- a/pkg/sfu/streamtrackermanager.go +++ b/pkg/sfu/streamtrackermanager.go @@ -76,7 +76,7 @@ type StreamTrackerManager struct { senderReportMu sync.RWMutex senderReports [buffer.DefaultMaxLayerSpatial + 1]endsSenderReport - layerOffsets [buffer.DefaultMaxLayerSpatial + 1][buffer.DefaultMaxLayerSpatial + 1]uint32 + layerOffsets [buffer.DefaultMaxLayerSpatial + 1][buffer.DefaultMaxLayerSpatial + 1]uint64 closed core.Fuse @@ -563,10 +563,10 @@ func (s *StreamTrackerManager) updateLayerOffsetLocked(ref, other int32) { rtpDiff := ntpDiff.Nanoseconds() * int64(s.clockRate) / 1e9 // calculate other layer's time stamp at the same time as ref layer's NTP time - normalizedOtherTS := srOther.RTPTimestamp + uint32(rtpDiff) + normalizedOtherTS := srOther.RTPTimestampExt + uint64(rtpDiff) // now both layers' time stamp refer to the same NTP time and the diff is the offset between the layers - offset := srRef.RTPTimestamp - normalizedOtherTS + offset := srRef.RTPTimestampExt - normalizedOtherTS // use minimal offset to indicate value availability in the extremely unlikely case of // both layers using the same timestamp @@ -643,7 +643,7 @@ func (s *StreamTrackerManager) GetCalculatedClockRate(layer int32) uint32 { return uint32(float64(rdsf) / tsf.Seconds()) } -func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) { +func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ets uint64, layer int32, referenceLayer int32) (uint64, error) { s.senderReportMu.RLock() defer s.senderReportMu.RUnlock() @@ -655,7 +655,7 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer in return 0, fmt.Errorf("offset unavailable, target: %d, reference: %d", layer, referenceLayer) } - return ts + s.layerOffsets[referenceLayer][layer], nil + return ets + s.layerOffsets[referenceLayer][layer], nil } func (s *StreamTrackerManager) GetMaxTemporalLayerSeen() int32 { diff --git a/pkg/sfu/testutils/data.go b/pkg/sfu/testutils/data.go index c30757b3d..2f0096c63 100644 --- a/pkg/sfu/testutils/data.go +++ b/pkg/sfu/testutils/data.go @@ -32,6 +32,7 @@ type TestExtPacketParams struct { SequenceNumber uint16 SNCycles int Timestamp uint32 + TSCycles int SSRC uint32 PayloadSize int PaddingSize byte @@ -64,6 +65,7 @@ func GetTestExtPacket(params *TestExtPacketParams) (*buffer.ExtPacket, error) { ep := &buffer.ExtPacket{ VideoLayer: params.VideoLayer, ExtSequenceNumber: uint32(params.SNCycles<<16) + uint32(params.SequenceNumber), + ExtTimestamp: uint64(params.TSCycles<<32) + uint64(params.Timestamp), Arrival: params.ArrivalTime, Packet: &packet, KeyFrame: params.IsKeyFrame, diff --git a/pkg/sfu/utils/wraparound.go b/pkg/sfu/utils/wraparound.go index 8845f2c19..5dd2b1e93 100644 --- a/pkg/sfu/utils/wraparound.go +++ b/pkg/sfu/utils/wraparound.go @@ -32,7 +32,7 @@ type WrapAround[T number, ET extendedNumber] struct { initialized bool start T highest T - cycles int + cycles ET } func NewWrapAround[T number, ET extendedNumber]() *WrapAround[T, ET] { @@ -78,7 +78,7 @@ func (w *WrapAround[T, ET]) Update(val T) (result WrapAroundUpdateResult[ET]) { // in-order if val < w.highest { - w.cycles++ + w.cycles += w.fullRange } w.highest = val @@ -88,13 +88,14 @@ func (w *WrapAround[T, ET]) Update(val T) (result WrapAroundUpdateResult[ET]) { func (w *WrapAround[T, ET]) RollbackRestart(ev ET) { if w.isWrapBack(w.start, T(ev)) { - w.cycles-- + w.cycles -= w.fullRange } w.start = T(ev) } -func (w *WrapAround[T, ET]) ResetHighest(val T) { - w.highest = val +func (w *WrapAround[T, ET]) ResetHighest(ev ET) { + w.highest = T(ev) + w.cycles = ev & ^(w.fullRange - 1) } func (w *WrapAround[T, ET]) GetStart() T { @@ -122,7 +123,7 @@ func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (isRestart bool, preExtended totalNum := w.GetExtendedHighest() - w.GetExtendedStart() + 1 if totalNum > (w.fullRange >> 1) { if w.isWrapBack(val, w.highest) { - cycles-- + cycles -= w.fullRange } extendedVal = w.getExtendedHighest(cycles, val) return @@ -134,13 +135,13 @@ func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (isRestart bool, preExtended preExtendedStart = w.GetExtendedStart() if w.isWrapBack(val, w.highest) { - w.cycles = 1 + w.cycles = w.fullRange cycles = 0 } w.start = val } else { if w.isWrapBack(val, w.highest) { - cycles-- + cycles -= w.fullRange } } extendedVal = w.getExtendedHighest(cycles, val) @@ -151,6 +152,6 @@ func (w *WrapAround[T, ET]) isWrapBack(earlier T, later T) bool { return ET(later) < (w.fullRange>>1) && ET(earlier) >= (w.fullRange>>1) } -func (w *WrapAround[T, ET]) getExtendedHighest(cycles int, val T) ET { - return ET(cycles)*w.fullRange + ET(val) +func (w *WrapAround[T, ET]) getExtendedHighest(cycles ET, val T) ET { + return cycles + ET(val) } diff --git a/pkg/sfu/utils/wraparound_test.go b/pkg/sfu/utils/wraparound_test.go index c01729108..8f246d700 100644 --- a/pkg/sfu/utils/wraparound_test.go +++ b/pkg/sfu/utils/wraparound_test.go @@ -252,6 +252,19 @@ func TestWrapAroundUint16RollbackRestart(t *testing.T) { require.Equal(t, uint32(23), w.GetExtendedStart()) require.Equal(t, uint16(25), w.GetHighest()) require.Equal(t, uint32(25), w.GetExtendedHighest()) + + // reset highest + w.ResetHighest(0x1234) + require.Equal(t, uint16(23), w.GetStart()) + require.Equal(t, uint32(23), w.GetExtendedStart()) + require.Equal(t, uint16(0x1234), w.GetHighest()) + require.Equal(t, uint32(0x1234), w.GetExtendedHighest()) + + w.ResetHighest(0x7f1234) + require.Equal(t, uint16(23), w.GetStart()) + require.Equal(t, uint32(23), w.GetExtendedStart()) + require.Equal(t, uint16(0x1234), w.GetHighest()) + require.Equal(t, uint32(0x7f1234), w.GetExtendedHighest()) } func TestWrapAroundUint32(t *testing.T) {