From a609b915c39c26117ee574bd9bd7b00d4bae3e15 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Wed, 25 Jan 2023 11:00:15 +0530 Subject: [PATCH] Use local time base for NTP in RTCP Sender Report for downtracks. (#1321) * Use local time base for NTP in RTCP Sender Report for downtracks. More details in comments in code. * Remove debug * RTCPSenderReportInfo -> RTCPSenderReportDataExt * Get rid of sender report data pointer checks (cherry picked from commit c696626fe8e8d7f24805ae7e2948159f0154e0d9) --- pkg/rtc/wrappedreceiver.go | 4 +- pkg/sfu/buffer/buffer.go | 4 +- pkg/sfu/buffer/rtpstats.go | 139 +++++++++++++++++++------------------ pkg/sfu/downtrack.go | 4 +- pkg/sfu/receiver.go | 20 +++--- 5 files changed, 89 insertions(+), 82 deletions(-) diff --git a/pkg/rtc/wrappedreceiver.go b/pkg/rtc/wrappedreceiver.go index b02708d12..484b78792 100644 --- a/pkg/rtc/wrappedreceiver.go +++ b/pkg/rtc/wrappedreceiver.go @@ -289,9 +289,9 @@ func (d *DummyReceiver) GetRedReceiver() sfu.TrackReceiver { return d } -func (d *DummyReceiver) GetRTCPSenderReportData(layer int32) *buffer.RTCPSenderReportData { +func (d *DummyReceiver) GetRTCPSenderReportDataExt(layer int32) *buffer.RTCPSenderReportDataExt { if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok { - return r.GetRTCPSenderReportData(layer) + return r.GetRTCPSenderReportDataExt(layer) } return nil } diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 02b1b059b..37701e260 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -633,12 +633,12 @@ func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) { } } -func (b *Buffer) GetSenderReportData() *RTCPSenderReportData { +func (b *Buffer) GetSenderReportDataExt() *RTCPSenderReportDataExt { b.RLock() defer b.RUnlock() if b.rtpStats != nil { - return b.rtpStats.GetRtcpSenderReportData() + return b.rtpStats.GetRtcpSenderReportDataExt() } return nil diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index 
8ff1a2ddf..131d98a9d 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -2,6 +2,7 @@ package buffer import ( "fmt" + "math" "sync" "time" @@ -20,6 +21,7 @@ const ( FirstSnapshotId = 1 SnInfoSize = 2048 SnInfoMask = SnInfoSize - 1 + TooLargeOWD = 400 * time.Millisecond ) type RTPFlowState struct { @@ -87,6 +89,11 @@ type RTCPSenderReportData struct { ArrivalTime time.Time } +type RTCPSenderReportDataExt struct { + SenderReportData RTCPSenderReportData + SmoothedOWD time.Duration +} + type RTPStatsParams struct { ClockRate uint32 IsReceiverReportDriven bool @@ -164,11 +171,9 @@ type RTPStats struct { rtt uint32 maxRtt uint32 - srData *RTCPSenderReportData - lastSRNTP mediatransportutil.NtpTime - lastSRRTP uint32 - lastSRAt time.Time - lastSRPackets uint32 + srDataExt *RTCPSenderReportDataExt + lastSRNTP mediatransportutil.NtpTime + lastSRAt time.Time nextSnapshotId uint32 snapshots map[uint32]*Snapshot @@ -256,19 +261,16 @@ func (r *RTPStats) Seed(from *RTPStats) { r.rtt = from.rtt r.maxRtt = from.maxRtt - if from.srData != nil { - r.srData = &RTCPSenderReportData{ - RTPTimestamp: from.srData.RTPTimestamp, - NTPTimestamp: from.srData.NTPTimestamp, - ArrivalTime: from.srData.ArrivalTime, + if from.srDataExt != nil { + r.srDataExt = &RTCPSenderReportDataExt{ + SenderReportData: from.srDataExt.SenderReportData, + SmoothedOWD: from.srDataExt.SmoothedOWD, } } else { - r.srData = nil + r.srDataExt = nil } r.lastSRNTP = from.lastSRNTP - r.lastSRRTP = from.lastSRRTP r.lastSRAt = from.lastSRAt - r.lastSRPackets = from.lastSRPackets r.nextSnapshotId = from.nextSnapshotId for id, ss := range from.snapshots { @@ -479,6 +481,8 @@ func (r *RTPStats) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt uint32 rtt, err := mediatransportutil.GetRttMs(&rr, r.lastSRNTP, r.lastSRAt) if err == nil { isRttChanged = rtt != r.rtt + } else { + r.logger.Warnw("error getting rtt", err) } if r.lastRRTime.IsZero() || r.extHighestSNOverridden <= rr.LastSequenceNumber { 
@@ -680,33 +684,56 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) { defer r.lock.Unlock() if srData == nil { - r.srData = nil + r.srDataExt = nil return } - r.srData = &RTCPSenderReportData{ - RTPTimestamp: srData.RTPTimestamp, - NTPTimestamp: srData.NTPTimestamp, - ArrivalTime: srData.ArrivalTime, + // prevent against extreme case of anachronous sender reports + if r.srDataExt != nil && r.srDataExt.SenderReportData.NTPTimestamp > srData.NTPTimestamp { + return + } + + // Low pass filter one-way-delay (owd) to normalize time stamp to local time base when sending RTCP sender report. + // Forwarding RTCP sender report would be ideal. But, there are a couple of issues with that + // 1. Senders could have different clocks. + // 2. Adjusting to current time as required by RTCP spec. + // By normalizing to local clock, these issues can be addressed. However, normalization is not straightforward + // as it is not possible to know the propagation delay and processing delay at both ends (send side processing + // after time stamping the RTCP packet and receive side processing after reading packet off the wire). 
+ // Smoothed version of OWD is used to map the sender's NTP time stamp to the local time base. + owd := srData.ArrivalTime.Sub(srData.NTPTimestamp.Time()) + if r.srDataExt != nil { + prevOwd := r.srDataExt.SenderReportData.ArrivalTime.Sub(r.srDataExt.SenderReportData.NTPTimestamp.Time()) + if time.Duration(math.Abs(float64(owd)-float64(prevOwd))) > TooLargeOWD { + r.logger.Infow("large one-way-delay", "owd", owd, "prevOwd", prevOwd) + } + } + + smoothedOwd := owd + if r.srDataExt != nil { + smoothedOwd = r.srDataExt.SmoothedOWD + } + r.srDataExt = &RTCPSenderReportDataExt{ + SenderReportData: *srData, + SmoothedOWD: (owd + smoothedOwd) / 2, } } -func (r *RTPStats) GetRtcpSenderReportData() *RTCPSenderReportData { +func (r *RTPStats) GetRtcpSenderReportDataExt() *RTCPSenderReportDataExt { r.lock.Lock() defer r.lock.Unlock() - if r.srData == nil { + if r.srDataExt == nil { return nil } - return &RTCPSenderReportData{ - RTPTimestamp: r.srData.RTPTimestamp, - NTPTimestamp: r.srData.NTPTimestamp, - ArrivalTime: r.srData.ArrivalTime, + return &RTCPSenderReportDataExt{ + SenderReportData: r.srDataExt.SenderReportData, + SmoothedOWD: r.srDataExt.SmoothedOWD, } } -func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, srData *RTCPSenderReportData) *rtcp.SenderReport { +func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, srDataExt *RTCPSenderReportDataExt) *rtcp.SenderReport { r.lock.RLock() defer r.lock.RUnlock() @@ -714,59 +741,39 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, srData *RTCPSenderReportData return nil } - packetCount := r.getTotalPacketsPrimary() + r.packetsDuplicate + r.packetsPadding - if packetCount == r.lastSRPackets { - // no packets sent since last report - return nil - } - - if srData == nil || srData.NTPTimestamp == 0 || srData.ArrivalTime.IsZero() { + if srDataExt == nil || srDataExt.SenderReportData.NTPTimestamp == 0 || srDataExt.SenderReportData.ArrivalTime.IsZero() { // no sender report from publisher return nil } // NTP timestamp in sender report from publisher side could have a different base, 
- // i. e. it may not be wall clock time at the time of send. + // i. e. although it should be wall clock at time of send, have observed instances of older timer. // It is not possible to accurately calculate current time in the NTP time base of the publisher side. - // Time of arrival of sender report from publisher side can be stored and time since that arrival - // can be calculated using local time base and that can be used to adjust the NTP time stamp to current time. - // However, that does not account for the variable time of network propagation of sender report. - // So, it is not possible to get accurate NTP timestamp of current time in publisher's time base. - // - // As a compromise, the NTP timestamp corresponding to the last sent RTP packet is calculated and used. - // That does mean it will not be very accurate, i. e. a bit of time could have elapsed since the last packet transmit. - // But, it is okay as NTP time stamp is used referentially to calculate RTT. - // - // NOTE: Large amounts of time without packets sent will cause error in this calculation, - // i. e. RTP time stamp rolling over will cause incorrect calculations (approx 13h for video and 25h for 48 KHz audio) - var nowNTP mediatransportutil.NtpTime - var nowRTP uint32 - if r.lastSRNTP != 0 { - sinceLastSR := time.Duration(float64(r.highestTS-r.lastSRRTP) / float64(r.params.ClockRate) * float64(time.Second)) - nowNTP = mediatransportutil.ToNtpTime(r.lastSRNTP.Time().Add(sinceLastSR)) - nowRTP = r.highestTS + // So, using a smoothed version of one way delay for use in sender reports. 
+ now := time.Now() + nowNTP := mediatransportutil.ToNtpTime(now) + nowRTP := r.highestTS + + smoothedLocalTimeOfLatestSenderReportNTP := srDataExt.SenderReportData.NTPTimestamp.Time().Add(srDataExt.SmoothedOWD) + if smoothedLocalTimeOfLatestSenderReportNTP.After(now) { + r.logger.Infow("smoothed time of NTP is ahead", + "now", now, + "smoothed", smoothedLocalTimeOfLatestSenderReportNTP, + "diff", smoothedLocalTimeOfLatestSenderReportNTP.Sub(now), + ) + nowRTP += uint32(now.Sub(time.Unix(0, r.highestTime)).Milliseconds() * int64(r.params.ClockRate) / 1000) } else { - if (r.highestTS - srData.RTPTimestamp) > (1 << 31) { - // sender report is newer than last packet sent, use it - nowNTP = srData.NTPTimestamp - nowRTP = srData.RTPTimestamp - } else { - sinceLastSR := time.Duration(float64(r.highestTS-srData.RTPTimestamp) / float64(r.params.ClockRate) * float64(time.Second)) - nowNTP = mediatransportutil.ToNtpTime(srData.NTPTimestamp.Time().Add(sinceLastSR)) - nowRTP = r.highestTS - } + nowRTP = srDataExt.SenderReportData.RTPTimestamp + uint32(now.Sub(smoothedLocalTimeOfLatestSenderReportNTP).Milliseconds()*int64(r.params.ClockRate)/1000) } r.lastSRNTP = nowNTP - r.lastSRRTP = nowRTP r.lastSRAt = time.Now() - r.lastSRPackets = packetCount return &rtcp.SenderReport{ SSRC: ssrc, NTPTime: uint64(nowNTP), RTPTime: nowRTP, - PacketCount: packetCount, + PacketCount: r.getTotalPacketsPrimary() + r.packetsDuplicate + r.packetsPadding, OctetCount: uint32(r.bytes + r.bytesDuplicate + r.bytesPadding), } } @@ -813,8 +820,8 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8, } var dlsr uint32 - if r.srData != nil && !r.srData.ArrivalTime.IsZero() { - delayMS := uint32(time.Since(r.srData.ArrivalTime).Milliseconds()) + if r.srDataExt != nil && !r.srDataExt.SenderReportData.ArrivalTime.IsZero() { + delayMS := uint32(time.Since(r.srDataExt.SenderReportData.ArrivalTime).Milliseconds()) dlsr = (delayMS / 1e3) << 16 dlsr |= (delayMS % 1e3) * 65536 / 1000 } 
@@ -826,8 +833,8 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8, } lastSR := uint32(0) - if r.srData != nil { - lastSR = uint32(r.srData.NTPTimestamp >> 16) + if r.srDataExt != nil { + lastSR = uint32(r.srDataExt.SenderReportData.NTPTimestamp >> 16) } return &rtcp.ReceptionReport{ SSRC: ssrc, diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index bdf100646..1a0b00003 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -740,7 +740,7 @@ func (d *DownTrack) CloseWithFlush(flush bool) { d.bindLock.Unlock() d.connectionStats.Close() d.rtpStats.Stop() - d.logger.Infow("rtp stats", "direction", "downstream", "stats", d.rtpStats.ToString()) + d.logger.Infow("rtp stats", "direction", "downstream", "mime", d.mime, "stats", d.rtpStats.ToString()) if d.onMaxLayerChanged != nil && d.kind == webrtc.RTPCodecTypeVideo { d.onMaxLayerChanged(d, InvalidLayerSpatial) @@ -974,7 +974,7 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport { return nil } - return d.rtpStats.GetRtcpSenderReport(d.ssrc, d.receiver.GetRTCPSenderReportData(d.forwarder.GetReferenceLayerSpatial())) + return d.rtpStats.GetRtcpSenderReport(d.ssrc, d.receiver.GetRTCPSenderReportDataExt(d.forwarder.GetReferenceLayerSpatial())) } func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan struct{} { diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 207165fbb..3f3f85b27 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -65,7 +65,7 @@ type TrackReceiver interface { GetTemporalLayerFpsForSpatial(layer int32) []float32 - GetRTCPSenderReportData(layer int32) *buffer.RTCPSenderReportData + GetRTCPSenderReportDataExt(layer int32) *buffer.RTCPSenderReportDataExt GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) } @@ -698,7 +698,7 @@ func (w *WebRTCReceiver) GetTemporalLayerFpsForSpatial(layer int32) []float32 { return b.GetTemporalLayerFpsForSpatial(layer) } -func (w 
*WebRTCReceiver) GetRTCPSenderReportData(layer int32) *buffer.RTCPSenderReportData { +func (w *WebRTCReceiver) GetRTCPSenderReportDataExt(layer int32) *buffer.RTCPSenderReportDataExt { w.bufferMu.RLock() defer w.bufferMu.RUnlock() @@ -706,7 +706,7 @@ func (w *WebRTCReceiver) GetRTCPSenderReportData(layer int32) *buffer.RTCPSender return nil } - return w.buffers[layer].GetSenderReportData() + return w.buffers[layer].GetSenderReportDataExt() } func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) { @@ -721,8 +721,8 @@ func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, r if bLayer == nil { return 0, fmt.Errorf("invalid layer: %d", layer) } - srLayer := bLayer.GetSenderReportData() - if srLayer == nil || srLayer.NTPTimestamp == 0 { + srLayer := bLayer.GetSenderReportDataExt() + if srLayer == nil || srLayer.SenderReportData.NTPTimestamp == 0 { return 0, fmt.Errorf("layer rtcp sender report not available: %d", layer) } @@ -730,8 +730,8 @@ func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, r if bReferenceLayer == nil { return 0, fmt.Errorf("invalid reference layer: %d", referenceLayer) } - srRef := bReferenceLayer.GetSenderReportData() - if srRef == nil || srRef.NTPTimestamp == 0 { + srRef := bReferenceLayer.GetSenderReportDataExt() + if srRef == nil || srRef.SenderReportData.NTPTimestamp == 0 { return 0, fmt.Errorf("reference layer rtcp sender report not available: %d", referenceLayer) } @@ -739,12 +739,12 @@ func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, r // NOTE: It is possible that reference layer has stopped (due to dynacast/adaptive streaming OR publisher // constraints). It should be okay even if the layer has stopped for a long time when using modulo arithmetic for // RTP time stamp (uint32 arithmetic). 
- ntpDiff := float64(int64(srRef.NTPTimestamp-srLayer.NTPTimestamp)) / float64(1<<32) - normalizedTS := srLayer.RTPTimestamp + uint32(ntpDiff*float64(w.codec.ClockRate)) + ntpDiff := float64(int64(srRef.SenderReportData.NTPTimestamp-srLayer.SenderReportData.NTPTimestamp)) / float64(1<<32) + normalizedTS := srLayer.SenderReportData.RTPTimestamp + uint32(ntpDiff*float64(w.codec.ClockRate)) // now that both RTP timestamps correspond to roughly the same NTP time, // the diff between them is the offset in RTP timestamp units between layer and referenceLayer. // Add the offset to layer's ts to map it to corresponding RTP timestamp in // the reference layer. - return ts + (srRef.RTPTimestamp - normalizedTS), nil + return ts + (srRef.SenderReportData.RTPTimestamp - normalizedTS), nil }