Use local time base for NTP in RTCP Sender Report for downtracks. (#1321)

* Use local time base for NTP in RTCP Sender Report for downtracks.

More details in comments in code.

* Remove debug

* RTCPSenderReportInfo -> RTCPSenderReportDataExt

* Get rid of sender report data pointer checks

(cherry picked from commit c696626fe8)
This commit is contained in:
Raja Subramanian
2023-01-25 11:00:15 +05:30
committed by Mathew Kamkar
parent 3ec48f37aa
commit a609b915c3
5 changed files with 89 additions and 82 deletions
+2 -2
View File
@@ -289,9 +289,9 @@ func (d *DummyReceiver) GetRedReceiver() sfu.TrackReceiver {
return d
}
func (d *DummyReceiver) GetRTCPSenderReportData(layer int32) *buffer.RTCPSenderReportData {
func (d *DummyReceiver) GetRTCPSenderReportDataExt(layer int32) *buffer.RTCPSenderReportDataExt {
if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok {
return r.GetRTCPSenderReportData(layer)
return r.GetRTCPSenderReportDataExt(layer)
}
return nil
}
+2 -2
View File
@@ -633,12 +633,12 @@ func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) {
}
}
func (b *Buffer) GetSenderReportData() *RTCPSenderReportData {
func (b *Buffer) GetSenderReportDataExt() *RTCPSenderReportDataExt {
b.RLock()
defer b.RUnlock()
if b.rtpStats != nil {
return b.rtpStats.GetRtcpSenderReportData()
return b.rtpStats.GetRtcpSenderReportDataExt()
}
return nil
+73 -66
View File
@@ -2,6 +2,7 @@ package buffer
import (
"fmt"
"math"
"sync"
"time"
@@ -20,6 +21,7 @@ const (
FirstSnapshotId = 1
SnInfoSize = 2048
SnInfoMask = SnInfoSize - 1
TooLargeOWD = 400 * time.Millisecond
)
type RTPFlowState struct {
@@ -87,6 +89,11 @@ type RTCPSenderReportData struct {
ArrivalTime time.Time
}
type RTCPSenderReportDataExt struct {
SenderReportData RTCPSenderReportData
SmoothedOWD time.Duration
}
type RTPStatsParams struct {
ClockRate uint32
IsReceiverReportDriven bool
@@ -164,11 +171,9 @@ type RTPStats struct {
rtt uint32
maxRtt uint32
srData *RTCPSenderReportData
lastSRNTP mediatransportutil.NtpTime
lastSRRTP uint32
lastSRAt time.Time
lastSRPackets uint32
srDataExt *RTCPSenderReportDataExt
lastSRNTP mediatransportutil.NtpTime
lastSRAt time.Time
nextSnapshotId uint32
snapshots map[uint32]*Snapshot
@@ -256,19 +261,16 @@ func (r *RTPStats) Seed(from *RTPStats) {
r.rtt = from.rtt
r.maxRtt = from.maxRtt
if from.srData != nil {
r.srData = &RTCPSenderReportData{
RTPTimestamp: from.srData.RTPTimestamp,
NTPTimestamp: from.srData.NTPTimestamp,
ArrivalTime: from.srData.ArrivalTime,
if from.srDataExt != nil {
r.srDataExt = &RTCPSenderReportDataExt{
SenderReportData: from.srDataExt.SenderReportData,
SmoothedOWD: from.srDataExt.SmoothedOWD,
}
} else {
r.srData = nil
r.srDataExt = nil
}
r.lastSRNTP = from.lastSRNTP
r.lastSRRTP = from.lastSRRTP
r.lastSRAt = from.lastSRAt
r.lastSRPackets = from.lastSRPackets
r.nextSnapshotId = from.nextSnapshotId
for id, ss := range from.snapshots {
@@ -479,6 +481,8 @@ func (r *RTPStats) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt uint32
rtt, err := mediatransportutil.GetRttMs(&rr, r.lastSRNTP, r.lastSRAt)
if err == nil {
isRttChanged = rtt != r.rtt
} else {
r.logger.Warnw("error getting rtt", err)
}
if r.lastRRTime.IsZero() || r.extHighestSNOverridden <= rr.LastSequenceNumber {
@@ -680,33 +684,56 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
defer r.lock.Unlock()
if srData == nil {
r.srData = nil
r.srDataExt = nil
return
}
r.srData = &RTCPSenderReportData{
RTPTimestamp: srData.RTPTimestamp,
NTPTimestamp: srData.NTPTimestamp,
ArrivalTime: srData.ArrivalTime,
// guard against the extreme case of anachronous (out-of-order) sender reports
if r.srDataExt != nil && r.srDataExt.SenderReportData.NTPTimestamp > srData.NTPTimestamp {
return
}
// Low-pass filter the one-way delay (OWD) to normalize the time stamp to the local time base when sending RTCP sender reports.
// Forwarding the RTCP sender report would be ideal. But, there are a couple of issues with that:
// 1. Senders could have different clocks.
// 2. The NTP time stamp has to be adjusted to the current time, as required by the RTCP spec.
// By normalizing to the local clock, these issues can be addressed. However, normalization is not straightforward
// as it is not possible to know the propagation delay and processing delay at both ends (send side processing
// after time stamping the RTCP packet and receive side processing after reading the packet off the wire).
// A smoothed version of OWD is used to map the NTP time stamp of the received sender report to the local time base.
owd := srData.ArrivalTime.Sub(srData.NTPTimestamp.Time())
if r.srDataExt != nil {
prevOwd := r.srDataExt.SenderReportData.ArrivalTime.Sub(r.srDataExt.SenderReportData.NTPTimestamp.Time())
if time.Duration(math.Abs(float64(owd)-float64(prevOwd))) > TooLargeOWD {
r.logger.Infow("large one-way-delay", "owd", owd, "prevOwd", prevOwd)
}
}
smoothedOwd := owd
if r.srDataExt != nil {
smoothedOwd = r.srDataExt.SmoothedOWD
}
r.srDataExt = &RTCPSenderReportDataExt{
SenderReportData: *srData,
SmoothedOWD: (owd + smoothedOwd) / 2,
}
}
func (r *RTPStats) GetRtcpSenderReportData() *RTCPSenderReportData {
func (r *RTPStats) GetRtcpSenderReportDataExt() *RTCPSenderReportDataExt {
r.lock.Lock()
defer r.lock.Unlock()
if r.srData == nil {
if r.srDataExt == nil {
return nil
}
return &RTCPSenderReportData{
RTPTimestamp: r.srData.RTPTimestamp,
NTPTimestamp: r.srData.NTPTimestamp,
ArrivalTime: r.srData.ArrivalTime,
return &RTCPSenderReportDataExt{
SenderReportData: r.srDataExt.SenderReportData,
SmoothedOWD: r.srDataExt.SmoothedOWD,
}
}
func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, srData *RTCPSenderReportData) *rtcp.SenderReport {
func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, srDataExt *RTCPSenderReportDataExt) *rtcp.SenderReport {
r.lock.RLock()
defer r.lock.RUnlock()
@@ -714,59 +741,39 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, srData *RTCPSenderReportData
return nil
}
packetCount := r.getTotalPacketsPrimary() + r.packetsDuplicate + r.packetsPadding
if packetCount == r.lastSRPackets {
// no packets sent since last report
return nil
}
if srData == nil || srData.NTPTimestamp == 0 || srData.ArrivalTime.IsZero() {
if srDataExt == nil || srDataExt.SenderReportData.NTPTimestamp == 0 || srDataExt.SenderReportData.ArrivalTime.IsZero() {
// no sender report from publisher
return nil
}
// NTP timestamp in sender report from publisher side could have a different base,
// i. e. it may not be wall clock time at the time of send.
// i. e. although it should be wall clock time at the time of send, instances of an older time have been observed.
// It is not possible to accurately calculate current time in the NTP time base of the publisher side.
// Time of arrival of sender report from publisher side can be stored and time since that arrival
// can be calculated using local time base and that can be used to adjust the NTP time stamp to current time.
// However, that does not account for the variable time of network propagation of sender report.
// So, it is not possible to get accurate NTP timestamp of current time in publisher's time base.
//
// As a compromise, the NTP timestamp corresponding to the last sent RTP packet is calculated and used.
// That does mean it will not be very accurate, i. e. a bit of time could have elapsed since the last packet transmit.
// But, it is okay as NTP time stamp is used referentially to calculate RTT.
//
// NOTE: Large amounts of time without packets sent will cause error in this calculation,
// i. e. RTP time stamp rolling over will cause incorrect calculations (approx 13h for video and 25h for 48 KHz audio)
var nowNTP mediatransportutil.NtpTime
var nowRTP uint32
if r.lastSRNTP != 0 {
sinceLastSR := time.Duration(float64(r.highestTS-r.lastSRRTP) / float64(r.params.ClockRate) * float64(time.Second))
nowNTP = mediatransportutil.ToNtpTime(r.lastSRNTP.Time().Add(sinceLastSR))
nowRTP = r.highestTS
// So, a smoothed version of the one-way delay is used in sender reports.
now := time.Now()
nowNTP := mediatransportutil.ToNtpTime(now)
nowRTP := r.highestTS
smoothedLocalTimeOfLatestSenderReportNTP := srDataExt.SenderReportData.NTPTimestamp.Time().Add(srDataExt.SmoothedOWD)
if smoothedLocalTimeOfLatestSenderReportNTP.After(now) {
r.logger.Infow("smoothed time of NTP is ahead",
"now", now,
"smoothed", smoothedLocalTimeOfLatestSenderReportNTP,
"diff", smoothedLocalTimeOfLatestSenderReportNTP.Sub(now),
)
nowRTP += uint32(now.Sub(time.Unix(0, r.highestTime)).Milliseconds() * int64(r.params.ClockRate) / 1000)
} else {
if (r.highestTS - srData.RTPTimestamp) > (1 << 31) {
// sender report is newer than last packet sent, use it
nowNTP = srData.NTPTimestamp
nowRTP = srData.RTPTimestamp
} else {
sinceLastSR := time.Duration(float64(r.highestTS-srData.RTPTimestamp) / float64(r.params.ClockRate) * float64(time.Second))
nowNTP = mediatransportutil.ToNtpTime(srData.NTPTimestamp.Time().Add(sinceLastSR))
nowRTP = r.highestTS
}
nowRTP = srDataExt.SenderReportData.RTPTimestamp + uint32(now.Sub(smoothedLocalTimeOfLatestSenderReportNTP).Milliseconds()*int64(r.params.ClockRate)/1000)
}
r.lastSRNTP = nowNTP
r.lastSRRTP = nowRTP
r.lastSRAt = time.Now()
r.lastSRPackets = packetCount
return &rtcp.SenderReport{
SSRC: ssrc,
NTPTime: uint64(nowNTP),
RTPTime: nowRTP,
PacketCount: packetCount,
PacketCount: r.getTotalPacketsPrimary() + r.packetsDuplicate + r.packetsPadding,
OctetCount: uint32(r.bytes + r.bytesDuplicate + r.bytesPadding),
}
}
@@ -813,8 +820,8 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8,
}
var dlsr uint32
if r.srData != nil && !r.srData.ArrivalTime.IsZero() {
delayMS := uint32(time.Since(r.srData.ArrivalTime).Milliseconds())
if r.srDataExt != nil && !r.srDataExt.SenderReportData.ArrivalTime.IsZero() {
delayMS := uint32(time.Since(r.srDataExt.SenderReportData.ArrivalTime).Milliseconds())
dlsr = (delayMS / 1e3) << 16
dlsr |= (delayMS % 1e3) * 65536 / 1000
}
@@ -826,8 +833,8 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8,
}
lastSR := uint32(0)
if r.srData != nil {
lastSR = uint32(r.srData.NTPTimestamp >> 16)
if r.srDataExt != nil {
lastSR = uint32(r.srDataExt.SenderReportData.NTPTimestamp >> 16)
}
return &rtcp.ReceptionReport{
SSRC: ssrc,
+2 -2
View File
@@ -740,7 +740,7 @@ func (d *DownTrack) CloseWithFlush(flush bool) {
d.bindLock.Unlock()
d.connectionStats.Close()
d.rtpStats.Stop()
d.logger.Infow("rtp stats", "direction", "downstream", "stats", d.rtpStats.ToString())
d.logger.Infow("rtp stats", "direction", "downstream", "mime", d.mime, "stats", d.rtpStats.ToString())
if d.onMaxLayerChanged != nil && d.kind == webrtc.RTPCodecTypeVideo {
d.onMaxLayerChanged(d, InvalidLayerSpatial)
@@ -974,7 +974,7 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport {
return nil
}
return d.rtpStats.GetRtcpSenderReport(d.ssrc, d.receiver.GetRTCPSenderReportData(d.forwarder.GetReferenceLayerSpatial()))
return d.rtpStats.GetRtcpSenderReport(d.ssrc, d.receiver.GetRTCPSenderReportDataExt(d.forwarder.GetReferenceLayerSpatial()))
}
func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan struct{} {
+10 -10
View File
@@ -65,7 +65,7 @@ type TrackReceiver interface {
GetTemporalLayerFpsForSpatial(layer int32) []float32
GetRTCPSenderReportData(layer int32) *buffer.RTCPSenderReportData
GetRTCPSenderReportDataExt(layer int32) *buffer.RTCPSenderReportDataExt
GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error)
}
@@ -698,7 +698,7 @@ func (w *WebRTCReceiver) GetTemporalLayerFpsForSpatial(layer int32) []float32 {
return b.GetTemporalLayerFpsForSpatial(layer)
}
func (w *WebRTCReceiver) GetRTCPSenderReportData(layer int32) *buffer.RTCPSenderReportData {
func (w *WebRTCReceiver) GetRTCPSenderReportDataExt(layer int32) *buffer.RTCPSenderReportDataExt {
w.bufferMu.RLock()
defer w.bufferMu.RUnlock()
@@ -706,7 +706,7 @@ func (w *WebRTCReceiver) GetRTCPSenderReportData(layer int32) *buffer.RTCPSender
return nil
}
return w.buffers[layer].GetSenderReportData()
return w.buffers[layer].GetSenderReportDataExt()
}
func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) {
@@ -721,8 +721,8 @@ func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, r
if bLayer == nil {
return 0, fmt.Errorf("invalid layer: %d", layer)
}
srLayer := bLayer.GetSenderReportData()
if srLayer == nil || srLayer.NTPTimestamp == 0 {
srLayer := bLayer.GetSenderReportDataExt()
if srLayer == nil || srLayer.SenderReportData.NTPTimestamp == 0 {
return 0, fmt.Errorf("layer rtcp sender report not available: %d", layer)
}
@@ -730,8 +730,8 @@ func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, r
if bReferenceLayer == nil {
return 0, fmt.Errorf("invalid reference layer: %d", referenceLayer)
}
srRef := bReferenceLayer.GetSenderReportData()
if srRef == nil || srRef.NTPTimestamp == 0 {
srRef := bReferenceLayer.GetSenderReportDataExt()
if srRef == nil || srRef.SenderReportData.NTPTimestamp == 0 {
return 0, fmt.Errorf("reference layer rtcp sender report not available: %d", referenceLayer)
}
@@ -739,12 +739,12 @@ func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, r
// NOTE: It is possible that reference layer has stopped (due to dynacast/adaptive streaming OR publisher
// constraints). It should be okay even if the layer has stopped for a long time when using modulo arithmetic for
// RTP time stamp (uint32 arithmetic).
ntpDiff := float64(int64(srRef.NTPTimestamp-srLayer.NTPTimestamp)) / float64(1<<32)
normalizedTS := srLayer.RTPTimestamp + uint32(ntpDiff*float64(w.codec.ClockRate))
ntpDiff := float64(int64(srRef.SenderReportData.NTPTimestamp-srLayer.SenderReportData.NTPTimestamp)) / float64(1<<32)
normalizedTS := srLayer.SenderReportData.RTPTimestamp + uint32(ntpDiff*float64(w.codec.ClockRate))
// now that both RTP timestamps correspond to roughly the same NTP time,
// the diff between them is the offset in RTP timestamp units between layer and referenceLayer.
// Add the offset to layer's ts to map it to corresponding RTP timestamp in
// the reference layer.
return ts + (srRef.RTPTimestamp - normalizedTS), nil
return ts + (srRef.SenderReportData.RTPTimestamp - normalizedTS), nil
}