diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 2cfe68567..baae76b46 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -101,7 +101,7 @@ type Buffer struct { // callbacks onClose func() onRtcpFeedback func([]rtcp.Packet) - onRtcpSenderReport func(*RTCPSenderReportData) + onRtcpSenderReport func() onFpsChanged func() onFinalRtpStats func(*RTPStats) @@ -675,7 +675,7 @@ func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) { b.RUnlock() if b.onRtcpSenderReport != nil { - b.onRtcpSenderReport(srData) + b.onRtcpSenderReport() } } @@ -729,7 +729,7 @@ func (b *Buffer) OnRtcpFeedback(fn func(fb []rtcp.Packet)) { b.onRtcpFeedback = fn } -func (b *Buffer) OnRtcpSenderReport(fn func(srData *RTCPSenderReportData)) { +func (b *Buffer) OnRtcpSenderReport(fn func()) { b.onRtcpSenderReport = fn } diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index d1069c00a..938f1e4bd 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -36,6 +36,9 @@ const ( FirstSnapshotId = 1 SnInfoSize = 8192 SnInfoMask = SnInfoSize - 1 + + firstPacketTimeAdjustWindow = 2 * time.Minute + firstPacketTimeAdjustThreshold = 5 * time.Second ) // ------------------------------------------------------- @@ -763,6 +766,40 @@ func (r *RTPStats) GetRtt() uint32 { return r.rtt } +func (r *RTPStats) MaybeAdjustFirstPacketTime(srData *RTCPSenderReportData) { + r.lock.Lock() + defer r.lock.Unlock() + + r.maybeAdjustFirstPacketTime(srData.RTPTimestampExt) +} + +func (r *RTPStats) maybeAdjustFirstPacketTime(extTS uint64) { + if time.Since(r.startTime) > firstPacketTimeAdjustWindow { + return + } + + // for some time after the start, adjust time of first packet. + // Helps improve accuracy of expected timestamp calculation. + // Adjusting only one way, i. e. if the first sample experienced + // abnormal delay (maybe due to pacing or maybe due to queuing + // in some network element along the way), push back first time + // to an earlier instance. + samplesDuration := time.Duration(float64(extTS-r.extStartTS) / float64(r.params.ClockRate) * float64(time.Second)) + firstTime := time.Now().Add(-samplesDuration) + if firstTime.Before(r.firstTime) { + r.logger.Infow( + "adjusting first packet time", + "before", r.firstTime.String(), + "after", firstTime.String(), + ) + if r.firstTime.Sub(firstTime) > firstPacketTimeAdjustThreshold { + r.logger.Infow("first packet time adjustment too big, ignoring", "adjustment", r.firstTime.Sub(firstTime)) + } else { + r.firstTime = firstTime + } + } +} + func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) { r.lock.Lock() defer r.lock.Unlock() @@ -792,6 +829,8 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) { srDataCopy := *srData srDataCopy.RTPTimestampExt = uint64(srDataCopy.RTPTimestamp) + cycles + r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestampExt) + // monitor and log RTP timestamp anomalies var ntpDiffSinceLast time.Duration var rtpDiffSinceLast uint32 diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index d204b8c34..ff287ae20 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -1802,7 +1802,10 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() { } } -func (d *DownTrack) HandleRTCPSenderReportData(_payloadType webrtc.PayloadType, _layer int32, _srData *buffer.RTCPSenderReportData) error { +func (d *DownTrack) HandleRTCPSenderReportData(_payloadType webrtc.PayloadType, layer int32, srData *buffer.RTCPSenderReportData) error { + if layer == d.forwarder.GetReferenceLayerSpatial() { + d.rtpStats.MaybeAdjustFirstPacketTime(srData) + } return nil } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 33131bd71..abfd9a4e3 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -15,6 +15,7 @@ package sfu import ( + "errors" "fmt" "math" "math/rand" @@ -40,7 +41,7 @@ const ( FlagFilterRTX = true TransitionCostSpatial = 10 - ResumeBehindThresholdSeconds = float64(0.1) // 100ms + ResumeBehindThresholdSeconds = float64(0.2) // 200ms LayerSwitchBehindThresholdSeconds = float64(0.05) // 50ms SwitchAheadThresholdSeconds = float64(0.025) // 25ms ) @@ -1455,14 +1456,16 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e switchingAt := time.Now() if f.getReferenceLayerRTPTimestamp != nil { ts, err := f.getReferenceLayerRTPTimestamp(extPkt.Packet.Timestamp, layer, f.referenceLayerSpatial) - if err == nil { - refTS = ts + if err != nil { + // error out if refTS is not available. It can happen when there is no sender report + // for the layer being switched to. Can especially happen at the start of the track when layer switches are + // potentially happening very quickly. Erroring out and waiting for a layer for which a sender report has been + // received will calculate a better offset, but may result in initial adaptation to take a bit longer depending + // on how often publisher/remote side sends RTCP sender report. + return err } - // AVSYNC-TODO: can error out here if refTS is not available. It can happen when there is no sender report - // for the layer being switched to. Can especially happen at the start of the track when layer switches are - // potentially happening very quickly. Erroring out and waiting for a layer for which a sender report has been - // received will calculate a better offset, but may result in initial adaptation to take a bit longer depending - // on how often publisher/remote side sends RTCP sender report. + + refTS = ts } if f.getExpectedRTPTimestamp != nil { @@ -1508,7 +1511,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e // between expectedTS and refTS is thresholded. Difference below the threshold is treated as Case 2 // and above as Case 1. // - // In the event of refTS > expectedTS, another threshold is used to pick the next timestamp. + // In the event of refTS > expectedTS, use refTS. // Ideally, refTS should not be ahead of expectedTS, but expectedTS uses the first packet's // wall clock time. So, if the first packet experienced abmormal latency, it is possible // for refTS > expectedTS @@ -1523,22 +1526,22 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e } else { if math.Abs(diffSeconds) > SwitchAheadThresholdSeconds { f.logger.Infow("resume, reference too far ahead", "expectedTS", expectedTS, "refTS", refTS, "diffSeconds", math.Abs(diffSeconds)) - nextTS = expectedTS - } else { - nextTS = refTS } + nextTS = refTS } } else { // switching between layers, check if refTS is too far behind the last sent diffSeconds := float64(int32(refTS-lastTS)) / float64(f.codec.ClockRate) if diffSeconds < 0.0 { if math.Abs(diffSeconds) > LayerSwitchBehindThresholdSeconds { - // AVSYNC-TODO: This could be due to pacer trickling out this layer. Should potentially return error here and wait for a more opportune time - // or some forcing function (like "have waited for too long for layer switch, nothing available, switch to whatever is available" kind of condition) - // to do the switch. Just logging it for now. + // this could be due to pacer trickling out this layer. Error out and wait for a more opportune time. + // AVSYNC-TODO: Consider some forcing function to do the switch + // (like "have waited for too long for layer switch, nothing available, switch to whatever is available" kind of condition). f.logger.Infow("layer switch, reference too far behind", "expectedTS", expectedTS, "refTS", refTS, "lastTS", lastTS, "diffSeconds", math.Abs(diffSeconds)) + return errors.New("switch point too far behind") } // use a nominal increase to ensure that timestamp is always moving forward + f.logger.Infow("layer switch, reference is slghtly behind", "expectedTS", expectedTS, "refTS", refTS, "lastTS", lastTS, "diffSeconds", math.Abs(diffSeconds)) nextTS = lastTS + 1 } else { diffSeconds = float64(int32(expectedTS-refTS)) / float64(f.codec.ClockRate) diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 1b36811d5..e24d8b1b0 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -331,12 +331,12 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff SmoothIntervals: w.audioConfig.SmoothIntervals, }) buff.OnRtcpFeedback(w.sendRTCP) - buff.OnRtcpSenderReport(func(srData *buffer.RTCPSenderReportData) { + buff.OnRtcpSenderReport(func() { srFirst, srNewest := buff.GetSenderReportData() w.streamTrackerManager.SetRTCPSenderReportData(layer, srFirst, srNewest) w.downTrackSpreader.Broadcast(func(dt TrackSender) { - _ = dt.HandleRTCPSenderReportData(w.codec.PayloadType, layer, srData) + _ = dt.HandleRTCPSenderReportData(w.codec.PayloadType, layer, srNewest) }) })