Cleaning up smoothed OWD calculation for sender report. (#1684)

* Keep track of expected RTP time stamp and control drift.

- Use monotonic clock in RTCP Sender Report and packet times
- Keep the time stamp close to expected time stamp on layer/SSRC
  switches

* clean up

* fix test compile

* more test compile failures

* anticipatory clean up

* further clean up

* add received sender report logging
This commit is contained in:
Raja Subramanian
2023-05-05 13:14:12 +05:30
committed by GitHub
parent e00ff50cd6
commit 25d6fd751f
7 changed files with 80 additions and 167 deletions
-8
View File
@@ -12,7 +12,6 @@ import (
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
)
// wrapper around WebRTC receiver, overriding its ID
@@ -289,13 +288,6 @@ func (d *DummyReceiver) GetRedReceiver() sfu.TrackReceiver {
return d
}
func (d *DummyReceiver) GetRTCPSenderReportDataExt(layer int32) *buffer.RTCPSenderReportDataExt {
if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok {
return r.GetRTCPSenderReportDataExt(layer)
}
return nil
}
func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) {
if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok {
return r.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer)
+2 -2
View File
@@ -668,12 +668,12 @@ func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) {
}
}
func (b *Buffer) GetSenderReportDataExt() *RTCPSenderReportDataExt {
func (b *Buffer) GetSenderReportData() *RTCPSenderReportData {
b.RLock()
defer b.RUnlock()
if b.rtpStats != nil {
return b.rtpStats.GetRtcpSenderReportDataExt()
return b.rtpStats.GetRtcpSenderReportData()
}
return nil
+67 -123
View File
@@ -3,7 +3,6 @@ package buffer
import (
"errors"
"fmt"
"math"
"sync"
"time"
@@ -93,11 +92,6 @@ type RTCPSenderReportData struct {
ArrivalTime time.Time
}
type RTCPSenderReportDataExt struct {
SenderReportData RTCPSenderReportData
SmoothedOWD time.Duration
}
type RTPStatsParams struct {
ClockRate uint32
IsReceiverReportDriven bool
@@ -180,13 +174,9 @@ type RTPStats struct {
rtt uint32
maxRtt uint32
srDataExt *RTCPSenderReportDataExt
firstSenderReportNTP mediatransportutil.NtpTime
firstSenderReportRTP uint32
firstFeedSenderReportNTP mediatransportutil.NtpTime
firstFeedSenderReportRTP uint32
lastSRTime time.Time
lastSRNTP mediatransportutil.NtpTime
srData *RTCPSenderReportData
lastSRTime time.Time
lastSRNTP mediatransportutil.NtpTime
nextSnapshotId uint32
snapshots map[uint32]*Snapshot
@@ -279,18 +269,12 @@ func (r *RTPStats) Seed(from *RTPStats) {
r.rtt = from.rtt
r.maxRtt = from.maxRtt
if from.srDataExt != nil {
r.srDataExt = &RTCPSenderReportDataExt{
SenderReportData: from.srDataExt.SenderReportData,
SmoothedOWD: from.srDataExt.SmoothedOWD,
}
if from.srData != nil {
srData := *from.srData
r.srData = &srData
} else {
r.srDataExt = nil
r.srData = nil
}
r.firstSenderReportNTP = from.firstSenderReportNTP
r.firstSenderReportRTP = from.firstSenderReportRTP
r.firstFeedSenderReportNTP = from.firstFeedSenderReportNTP
r.firstFeedSenderReportRTP = from.firstFeedSenderReportRTP
r.lastSRTime = from.lastSRTime
r.lastSRNTP = from.lastSRNTP
@@ -732,52 +716,37 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
defer r.lock.Unlock()
if srData == nil {
r.srDataExt = nil
r.srData = nil
return
}
// prevent against extreme case of anachronous sender reports
if r.srDataExt != nil && r.srDataExt.SenderReportData.NTPTimestamp > srData.NTPTimestamp {
if r.srData != nil && r.srData.NTPTimestamp > srData.NTPTimestamp {
r.logger.Debugw(
"anachronous RTCP sender report",
"current", srData.NTPTimestamp.Time(),
"last", r.srDataExt.SenderReportData.NTPTimestamp.Time(),
"last", r.srData.NTPTimestamp.Time(),
)
return
}
// Low pass filter one-way-delay (owd) to normalize time stamp to local time base when sending RTCP Sender Report.
// Forwarding RTCP Sender Report would be ideal. But, there are a couple of issues with that
// 1. Senders could have different clocks.
// 2. Adjusting to current time as required by RTCP spec.
// By normalizing to local clock, these issues can be addressed. However, normalization is not straightforward
// as it is not possible to know the propagation delay and processing delay at both ends (send side processing
// after time stamping the RTCP packet and receive side processing after reading packet off the wire).
// Smoothed version of OWD is used to alleviate irregularities somewhat.
owd := srData.ArrivalTime.Sub(srData.NTPTimestamp.Time())
if r.srDataExt != nil {
prevOwd := r.srDataExt.SenderReportData.ArrivalTime.Sub(r.srDataExt.SenderReportData.NTPTimestamp.Time())
if time.Duration(math.Abs(float64(owd)-float64(prevOwd))) > TooLargeOWDDelta {
r.logger.Debugw("large delta in one-way-delay", "owd", owd, "prevOwd", prevOwd)
}
}
smoothedOwd := owd
if r.srDataExt != nil {
smoothedOwd = r.srDataExt.SmoothedOWD
}
smoothedOwd = (owd + smoothedOwd) / 2
// TODO-REMOVE-AFTER-DEBUG
// TODO-REMOVE-AFTER-DEBUG-START
if r.params.ClockRate != 90000 { // log only for audio as it is less frequent
ntpTime := srData.NTPTimestamp.Time()
var ntpDiffSinceLast, arrivalDiffSinceLast time.Duration
var rtpDiffSinceLast uint32
if r.srDataExt != nil {
ntpDiffSinceLast = ntpTime.Sub(r.srDataExt.SenderReportData.NTPTimestamp.Time())
rtpDiffSinceLast = srData.RTPTimestamp - r.srDataExt.SenderReportData.RTPTimestamp
arrivalDiffSinceLast = srData.ArrivalTime.Sub(r.srDataExt.SenderReportData.ArrivalTime)
if r.srData != nil {
ntpDiffSinceLast = ntpTime.Sub(r.srData.NTPTimestamp.Time())
rtpDiffSinceLast = srData.RTPTimestamp - r.srData.RTPTimestamp
arrivalDiffSinceLast = srData.ArrivalTime.Sub(r.srData.ArrivalTime)
}
timeSinceFirst := srData.NTPTimestamp.Time().Sub(r.firstTime)
rtpDiffSinceFirst := getExtTS(srData.RTPTimestamp, r.tsCycles) - r.extStartTS
drift := int64(uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9) - rtpDiffSinceFirst)
driftMs := (float64(drift) * 1000) / float64(r.params.ClockRate)
r.logger.Debugw(
"received sender report",
"ntp", ntpTime,
@@ -787,28 +756,28 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
"rtpDiff", rtpDiffSinceLast,
"arrivalDiff", arrivalDiffSinceLast,
"expectedTimeDiff", float64(rtpDiffSinceLast)/float64(r.params.ClockRate),
"owd", owd,
"smoothedOwd", smoothedOwd,
"timeSinceFirst", timeSinceFirst,
"rtpDiffSinceFirst", rtpDiffSinceFirst,
"drift", drift,
"driftMs", driftMs,
)
}
r.srDataExt = &RTCPSenderReportDataExt{
SenderReportData: *srData,
SmoothedOWD: smoothedOwd,
}
// TODO-REMOVE-AFTER-DEBUG-END
srDataCopy := *srData
r.srData = &srDataCopy
}
func (r *RTPStats) GetRtcpSenderReportDataExt() *RTCPSenderReportDataExt {
func (r *RTPStats) GetRtcpSenderReportData() *RTCPSenderReportData {
r.lock.RLock()
defer r.lock.RUnlock()
if r.srDataExt == nil {
if r.srData == nil {
return nil
}
return &RTCPSenderReportDataExt{
SenderReportData: r.srDataExt.SenderReportData,
SmoothedOWD: r.srDataExt.SmoothedOWD,
}
srDataCopy := *r.srData
return &srDataCopy
}
func (r *RTPStats) GetExpectedRTPTimestamp(at time.Time) (uint32, error) {
@@ -820,15 +789,15 @@ func (r *RTPStats) GetExpectedRTPTimestamp(at time.Time) (uint32, error) {
}
timeDiff := at.Sub(r.firstTime)
rtpDiff := timeDiff.Nanoseconds() * int64(r.params.ClockRate) / 1e9
expectedExtRTP := r.extStartTS + uint64(rtpDiff)
expectedRTPDiff := timeDiff.Nanoseconds() * int64(r.params.ClockRate) / 1e9
expectedExtRTP := r.extStartTS + uint64(expectedRTPDiff)
r.logger.Debugw(
"expected RTP timestamp",
"firstTime", r.firstTime.String(),
"checkAt", at.String(),
"timeDiff", timeDiff,
"firstRTP", r.extStartTS,
"rtpDiff", rtpDiff,
"expectedRTPDiff", expectedRTPDiff,
"expectedExtRTP", expectedExtRTP,
"expectedRTP", uint32(expectedExtRTP),
"highestTS", r.highestTS,
@@ -837,7 +806,7 @@ func (r *RTPStats) GetExpectedRTPTimestamp(at time.Time) (uint32, error) {
return uint32(expectedExtRTP), nil
}
func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, srDataExt *RTCPSenderReportDataExt) *rtcp.SenderReport {
func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport {
r.lock.Lock()
defer r.lock.Unlock()
@@ -845,11 +814,6 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, srDataExt *RTCPSenderReportD
return nil
}
if srDataExt == nil || srDataExt.SenderReportData.NTPTimestamp == 0 || srDataExt.SenderReportData.ArrivalTime.IsZero() {
// no sender report from publisher
return nil
}
// construct current time based on monotonic clock
timeSinceFirst := time.Since(r.firstTime)
now := r.firstTime.Add(timeSinceFirst)
@@ -861,7 +825,8 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, srDataExt *RTCPSenderReportD
"anachronous sender report",
"firstTime", r.firstTime.String(),
"currentTime", now.String(),
"timSinceFirst", timeSinceFirst,
"highestTime", r.highestTime.String(),
"timeSinceFirst", timeSinceFirst,
"extStartTS", r.extStartTS,
"highestExtRTP", getExtTS(r.highestTS, r.tsCycles),
"expectedExtRTP", expectedExtRTP,
@@ -871,54 +836,33 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, srDataExt *RTCPSenderReportD
timeSinceHighest := time.Since(r.highestTime)
nowRTP := r.highestTS + uint32(timeSinceHighest.Nanoseconds()*int64(r.params.ClockRate)/1e9)
// TODO-REMOVE-AFTER-DEBUG
if r.firstSenderReportNTP == 0 {
r.firstSenderReportNTP = nowNTP
r.firstSenderReportRTP = nowRTP
// TODO-REMOVE-AFTER-DEBUG-START
ntpTime := nowNTP.Time()
r.firstFeedSenderReportNTP = srDataExt.SenderReportData.NTPTimestamp
r.firstFeedSenderReportRTP = srDataExt.SenderReportData.RTPTimestamp
} else {
ntpTime := nowNTP.Time()
ntpDiffLocal := ntpTime.Sub(r.highestTime)
rtpDiffLocal := int32(nowRTP - r.highestTS)
rtpOffsetLocal := int32(nowRTP - r.highestTS - uint32(ntpDiffLocal.Nanoseconds()*int64(r.params.ClockRate)/1e9))
ntpDiffLocal := ntpTime.Sub(r.highestTime)
rtpDiffLocal := int32(nowRTP - r.highestTS)
rtpOffsetLocal := int32(nowRTP - r.highestTS - uint32(ntpDiffLocal.Nanoseconds()*int64(r.params.ClockRate)/1e9))
timeSinceFirst = nowNTP.Time().Sub(r.firstTime)
rtpDiffSinceFirst := getExtTS(nowRTP, r.tsCycles) - r.extStartTS
drift := int64(uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9) - rtpDiffSinceFirst)
driftMs := (float64(drift) * 1000) / float64(r.params.ClockRate)
timeSinceFirst := nowNTP.Time().Sub(r.firstSenderReportNTP.Time())
rtpDiffSinceFirst := getExtTS(nowRTP, r.tsCycles) - getExtTS(r.firstSenderReportRTP, 0)
drift := int64(uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9) - rtpDiffSinceFirst)
driftMs := (float64(drift) * 1000) / float64(r.params.ClockRate)
feedTimeSinceFirst := srDataExt.SenderReportData.NTPTimestamp.Time().Sub(r.firstFeedSenderReportNTP.Time())
// using tsCycles for extending feed time stamp too
feedRtpDiffSinceFirst := getExtTS(srDataExt.SenderReportData.RTPTimestamp, r.tsCycles) - getExtTS(r.firstFeedSenderReportRTP, 0)
feedDrift := int64(uint64(feedTimeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9) - feedRtpDiffSinceFirst)
feedDriftMs := (float64(feedDrift) * 1000) / float64(r.params.ClockRate)
r.logger.Debugw(
"sending sender report",
"highestTS", r.highestTS,
"highestTime", r.highestTime.String(),
"reportTS", nowRTP,
"reportTime", ntpTime.String(),
"rtpDiffLocal", rtpDiffLocal,
"ntpDiffLocal", ntpDiffLocal,
"rtpOffsetLocal", rtpOffsetLocal,
"timeSinceFirst", timeSinceFirst,
"rtpDiffSinceFirst", rtpDiffSinceFirst,
"drift", drift,
"driftMs", driftMs,
"feedRTP", srDataExt.SenderReportData.RTPTimestamp,
"feedNTP", srDataExt.SenderReportData.NTPTimestamp.Time().String(),
"feedArrival", srDataExt.SenderReportData.ArrivalTime.String(),
"smoothedOWD", srDataExt.SmoothedOWD,
"feedTimeSinceFirst", feedTimeSinceFirst,
"feedRtpDiffSinceFirst", feedRtpDiffSinceFirst,
"feedDrift", feedDrift,
"feedDriftMs", feedDriftMs,
)
}
r.logger.Debugw(
"sending sender report",
"highestTS", r.highestTS,
"highestTime", r.highestTime.String(),
"reportTS", nowRTP,
"reportTime", ntpTime.String(),
"rtpDiffLocal", rtpDiffLocal,
"ntpDiffLocal", ntpDiffLocal,
"rtpOffsetLocal", rtpOffsetLocal,
"timeSinceFirst", timeSinceFirst,
"rtpDiffSinceFirst", rtpDiffSinceFirst,
"drift", drift,
"driftMs", driftMs,
)
// TODO-REMOVE-AFTER-DEBUG-END
r.lastSRTime = now
r.lastSRNTP = nowNTP
@@ -965,15 +909,15 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8,
}
var dlsr uint32
if r.srDataExt != nil && !r.srDataExt.SenderReportData.ArrivalTime.IsZero() {
delayMS := uint32(time.Since(r.srDataExt.SenderReportData.ArrivalTime).Milliseconds())
if r.srData != nil && !r.srData.ArrivalTime.IsZero() {
delayMS := uint32(time.Since(r.srData.ArrivalTime).Milliseconds())
dlsr = (delayMS / 1e3) << 16
dlsr |= (delayMS % 1e3) * 65536 / 1000
}
lastSR := uint32(0)
if r.srDataExt != nil {
lastSR = uint32(r.srDataExt.SenderReportData.NTPTimestamp >> 16)
if r.srData != nil {
lastSR = uint32(r.srData.NTPTimestamp >> 16)
}
return &rtcp.ReceptionReport{
SSRC: ssrc,
+1 -1
View File
@@ -1113,7 +1113,7 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport {
return nil
}
return d.rtpStats.GetRtcpSenderReport(d.ssrc, d.receiver.GetRTCPSenderReportDataExt(d.forwarder.GetReferenceLayerSpatial()))
return d.rtpStats.GetRtcpSenderReport(d.ssrc)
}
func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan struct{} {
-7
View File
@@ -469,13 +469,6 @@ func (f *Forwarder) TargetLayer() buffer.VideoLayer {
return f.vls.GetTarget()
}
func (f *Forwarder) GetReferenceLayerSpatial() int32 {
f.lock.RLock()
defer f.lock.RUnlock()
return f.referenceLayerSpatial
}
func (f *Forwarder) isDeficientLocked() bool {
return f.lastAllocation.IsDeficient
}
+1 -6
View File
@@ -65,7 +65,6 @@ type TrackReceiver interface {
GetTemporalLayerFpsForSpatial(layer int32) []float32
GetRTCPSenderReportDataExt(layer int32) *buffer.RTCPSenderReportDataExt
GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error)
}
@@ -311,7 +310,7 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff
})
buff.OnRtcpFeedback(w.sendRTCP)
buff.OnRtcpSenderReport(func(srData *buffer.RTCPSenderReportData) {
w.streamTrackerManager.SetRTCPSenderReportDataExt(layer, buff.GetSenderReportDataExt())
w.streamTrackerManager.SetRTCPSenderReportData(layer, buff.GetSenderReportData())
w.downTrackSpreader.Broadcast(func(dt TrackSender) {
_ = dt.HandleRTCPSenderReportData(w.codec.PayloadType, layer, srData)
@@ -746,10 +745,6 @@ func (w *WebRTCReceiver) GetTemporalLayerFpsForSpatial(layer int32) []float32 {
return b.GetTemporalLayerFpsForSpatial(layer)
}
func (w *WebRTCReceiver) GetRTCPSenderReportDataExt(layer int32) *buffer.RTCPSenderReportDataExt {
return w.streamTrackerManager.GetRTCPSenderReportDataExt(layer)
}
func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) {
return w.streamTrackerManager.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer)
}
+9 -20
View File
@@ -43,7 +43,7 @@ type StreamTrackerManager struct {
paused bool
senderReportMu sync.RWMutex
senderReports [buffer.DefaultMaxLayerSpatial + 1]*buffer.RTCPSenderReportDataExt
senderReports [buffer.DefaultMaxLayerSpatial + 1]*buffer.RTCPSenderReportData
closed core.Fuse
@@ -475,7 +475,7 @@ func (s *StreamTrackerManager) maxExpectedLayerFromTrackInfo() {
}
}
func (s *StreamTrackerManager) SetRTCPSenderReportDataExt(layer int32, senderReport *buffer.RTCPSenderReportDataExt) {
func (s *StreamTrackerManager) SetRTCPSenderReportData(layer int32, senderReport *buffer.RTCPSenderReportData) {
s.senderReportMu.Lock()
defer s.senderReportMu.Unlock()
@@ -486,17 +486,6 @@ func (s *StreamTrackerManager) SetRTCPSenderReportDataExt(layer int32, senderRep
s.senderReports[layer] = senderReport
}
func (s *StreamTrackerManager) GetRTCPSenderReportDataExt(layer int32) *buffer.RTCPSenderReportDataExt {
s.senderReportMu.RLock()
defer s.senderReportMu.RUnlock()
if layer < 0 || int(layer) >= len(s.senderReports) {
return nil
}
return s.senderReports[layer]
}
func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) {
s.senderReportMu.RLock()
defer s.senderReportMu.RUnlock()
@@ -509,19 +498,19 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer in
return ts, nil
}
var srLayer *buffer.RTCPSenderReportDataExt
var srLayer *buffer.RTCPSenderReportData
if int(layer) < len(s.senderReports) {
srLayer = s.senderReports[layer]
}
if srLayer == nil || srLayer.SenderReportData.NTPTimestamp == 0 {
if srLayer == nil || srLayer.NTPTimestamp == 0 {
return 0, fmt.Errorf("layer rtcp sender report not available: %d", layer)
}
var srRef *buffer.RTCPSenderReportDataExt
var srRef *buffer.RTCPSenderReportData
if int(referenceLayer) < len(s.senderReports) {
srRef = s.senderReports[referenceLayer]
}
if srRef == nil || srRef.SenderReportData.NTPTimestamp == 0 {
if srRef == nil || srRef.NTPTimestamp == 0 {
return 0, fmt.Errorf("reference layer rtcp sender report not available: %d", referenceLayer)
}
@@ -529,15 +518,15 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer in
// NOTE: It is possible that reference layer has stopped (due to dynacast/adaptive streaming OR publisher
// constraints). It should be okay even if the layer has stopped for a long time when using modulo arithmetic for
// RTP time stamp (uint32 arithmetic).
ntpDiff := srRef.SenderReportData.NTPTimestamp.Time().Sub(srLayer.SenderReportData.NTPTimestamp.Time())
ntpDiff := srRef.NTPTimestamp.Time().Sub(srLayer.NTPTimestamp.Time())
rtpDiff := ntpDiff.Nanoseconds() * int64(s.clockRate) / 1e9
normalizedTS := srLayer.SenderReportData.RTPTimestamp + uint32(rtpDiff)
normalizedTS := srLayer.RTPTimestamp + uint32(rtpDiff)
// now that both RTP timestamps correspond to roughly the same NTP time,
// the diff between them is the offset in RTP timestamp units between layer and referenceLayer.
// Add the offset to layer's ts to map it to corresponding RTP timestamp in
// the reference layer.
return ts + (srRef.SenderReportData.RTPTimestamp - normalizedTS), nil
return ts + (srRef.RTPTimestamp - normalizedTS), nil
}
func (s *StreamTrackerManager) GetMaxTemporalLayerSeen() int32 {