diff --git a/pkg/rtc/wrappedreceiver.go b/pkg/rtc/wrappedreceiver.go index b962ba809..068f8b69d 100644 --- a/pkg/rtc/wrappedreceiver.go +++ b/pkg/rtc/wrappedreceiver.go @@ -12,7 +12,6 @@ import ( "github.com/livekit/protocol/logger" "github.com/livekit/livekit-server/pkg/sfu" - "github.com/livekit/livekit-server/pkg/sfu/buffer" ) // wrapper around WebRTC receiver, overriding its ID @@ -289,13 +288,6 @@ func (d *DummyReceiver) GetRedReceiver() sfu.TrackReceiver { return d } -func (d *DummyReceiver) GetRTCPSenderReportDataExt(layer int32) *buffer.RTCPSenderReportDataExt { - if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok { - return r.GetRTCPSenderReportDataExt(layer) - } - return nil -} - func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) { if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok { return r.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer) diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 0faea1fe8..63949a52c 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -668,12 +668,12 @@ func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) { } } -func (b *Buffer) GetSenderReportDataExt() *RTCPSenderReportDataExt { +func (b *Buffer) GetSenderReportData() *RTCPSenderReportData { b.RLock() defer b.RUnlock() if b.rtpStats != nil { - return b.rtpStats.GetRtcpSenderReportDataExt() + return b.rtpStats.GetRtcpSenderReportData() } return nil diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index b2cca9f5d..339959e6d 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -3,7 +3,6 @@ package buffer import ( "errors" "fmt" - "math" "sync" "time" @@ -93,11 +92,6 @@ type RTCPSenderReportData struct { ArrivalTime time.Time } -type RTCPSenderReportDataExt struct { - SenderReportData RTCPSenderReportData - SmoothedOWD time.Duration -} - type RTPStatsParams struct { ClockRate uint32 IsReceiverReportDriven bool @@ -180,13 +174,9 @@ type RTPStats struct { rtt uint32 maxRtt uint32 - srDataExt *RTCPSenderReportDataExt - firstSenderReportNTP mediatransportutil.NtpTime - firstSenderReportRTP uint32 - firstFeedSenderReportNTP mediatransportutil.NtpTime - firstFeedSenderReportRTP uint32 - lastSRTime time.Time - lastSRNTP mediatransportutil.NtpTime + srData *RTCPSenderReportData + lastSRTime time.Time + lastSRNTP mediatransportutil.NtpTime nextSnapshotId uint32 snapshots map[uint32]*Snapshot @@ -279,18 +269,12 @@ func (r *RTPStats) Seed(from *RTPStats) { r.rtt = from.rtt r.maxRtt = from.maxRtt - if from.srDataExt != nil { - r.srDataExt = &RTCPSenderReportDataExt{ - SenderReportData: from.srDataExt.SenderReportData, - SmoothedOWD: from.srDataExt.SmoothedOWD, - } + if from.srData != nil { + srData := *from.srData + r.srData = &srData } else { - r.srDataExt = nil + r.srData = nil } - r.firstSenderReportNTP = from.firstSenderReportNTP - r.firstSenderReportRTP = from.firstSenderReportRTP - r.firstFeedSenderReportNTP = from.firstFeedSenderReportNTP - r.firstFeedSenderReportRTP = from.firstFeedSenderReportRTP r.lastSRTime = from.lastSRTime r.lastSRNTP = from.lastSRNTP @@ -732,52 +716,37 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) { defer r.lock.Unlock() if srData == nil { - r.srDataExt = nil + r.srData = nil return } // prevent against extreme case of anachronous sender reports - if r.srDataExt != nil && r.srDataExt.SenderReportData.NTPTimestamp > srData.NTPTimestamp { + if r.srData != nil && r.srData.NTPTimestamp > srData.NTPTimestamp { r.logger.Debugw( "anachronous RTCP sender report", "current", srData.NTPTimestamp.Time(), - "last", r.srDataExt.SenderReportData.NTPTimestamp.Time(), + "last", r.srData.NTPTimestamp.Time(), ) return } - // Low pass filter one-way-delay (owd) to normalize time stamp to local time base when sending RTCP Sender Report. - // Forwarding RTCP Sender Report would be ideal. But, there are a couple of issues with that - // 1. Senders could have different clocks. - // 2. Adjusting to current time as required by RTCP spec. - // By normalizing to local clock, these issues can be addressed. However, normalization is not straightforward - // as it is not possible to know the propagation delay and processing delay at both ends (send side processing - // after time stamping the RTCP packet and receive side processing after reading packet off the wire). - // Smoothed version of OWD is used to alleviate irregularities somewhat. - owd := srData.ArrivalTime.Sub(srData.NTPTimestamp.Time()) - if r.srDataExt != nil { - prevOwd := r.srDataExt.SenderReportData.ArrivalTime.Sub(r.srDataExt.SenderReportData.NTPTimestamp.Time()) - if time.Duration(math.Abs(float64(owd)-float64(prevOwd))) > TooLargeOWDDelta { - r.logger.Debugw("large delta in one-way-delay", "owd", owd, "prevOwd", prevOwd) - } - } - - smoothedOwd := owd - if r.srDataExt != nil { - smoothedOwd = r.srDataExt.SmoothedOWD - } - smoothedOwd = (owd + smoothedOwd) / 2 - // TODO-REMOVE-AFTER-DEBUG + // TODO-REMOVE-AFTER-DEBUG-START if r.params.ClockRate != 90000 { // log only for audio as it is less frequent ntpTime := srData.NTPTimestamp.Time() var ntpDiffSinceLast, arrivalDiffSinceLast time.Duration var rtpDiffSinceLast uint32 - if r.srDataExt != nil { - ntpDiffSinceLast = ntpTime.Sub(r.srDataExt.SenderReportData.NTPTimestamp.Time()) - rtpDiffSinceLast = srData.RTPTimestamp - r.srDataExt.SenderReportData.RTPTimestamp - arrivalDiffSinceLast = srData.ArrivalTime.Sub(r.srDataExt.SenderReportData.ArrivalTime) + if r.srData != nil { + ntpDiffSinceLast = ntpTime.Sub(r.srData.NTPTimestamp.Time()) + rtpDiffSinceLast = srData.RTPTimestamp - r.srData.RTPTimestamp + arrivalDiffSinceLast = srData.ArrivalTime.Sub(r.srData.ArrivalTime) } + + timeSinceFirst := srData.NTPTimestamp.Time().Sub(r.firstTime) + rtpDiffSinceFirst := getExtTS(srData.RTPTimestamp, r.tsCycles) - r.extStartTS + drift := int64(uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9) - rtpDiffSinceFirst) + driftMs := (float64(drift) * 1000) / float64(r.params.ClockRate) + r.logger.Debugw( "received sender report", "ntp", ntpTime, @@ -787,28 +756,28 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) { "rtpDiff", rtpDiffSinceLast, "arrivalDiff", arrivalDiffSinceLast, "expectedTimeDiff", float64(rtpDiffSinceLast)/float64(r.params.ClockRate), - "owd", owd, - "smoothedOwd", smoothedOwd, + "timeSinceFirst", timeSinceFirst, + "rtpDiffSinceFirst", rtpDiffSinceFirst, + "drift", drift, + "driftMs", driftMs, ) } - r.srDataExt = &RTCPSenderReportDataExt{ - SenderReportData: *srData, - SmoothedOWD: smoothedOwd, - } + // TODO-REMOVE-AFTER-DEBUG-END + + srDataCopy := *srData + r.srData = &srDataCopy } -func (r *RTPStats) GetRtcpSenderReportDataExt() *RTCPSenderReportDataExt { +func (r *RTPStats) GetRtcpSenderReportData() *RTCPSenderReportData { r.lock.RLock() defer r.lock.RUnlock() - if r.srDataExt == nil { + if r.srData == nil { return nil } - return &RTCPSenderReportDataExt{ - SenderReportData: r.srDataExt.SenderReportData, - SmoothedOWD: r.srDataExt.SmoothedOWD, - } + srDataCopy := *r.srData + return &srDataCopy } func (r *RTPStats) GetExpectedRTPTimestamp(at time.Time) (uint32, error) { @@ -820,15 +789,15 @@ func (r *RTPStats) GetExpectedRTPTimestamp(at time.Time) (uint32, error) { } timeDiff := at.Sub(r.firstTime) - rtpDiff := timeDiff.Nanoseconds() * int64(r.params.ClockRate) / 1e9 - expectedExtRTP := r.extStartTS + uint64(rtpDiff) + expectedRTPDiff := timeDiff.Nanoseconds() * int64(r.params.ClockRate) / 1e9 + expectedExtRTP := r.extStartTS + uint64(expectedRTPDiff) r.logger.Debugw( "expected RTP timestamp", "firstTime", r.firstTime.String(), "checkAt", at.String(), "timeDiff", timeDiff, "firstRTP", r.extStartTS, - "rtpDiff", rtpDiff, + "expectedRTPDiff", expectedRTPDiff, "expectedExtRTP", expectedExtRTP, "expectedRTP", uint32(expectedExtRTP), "highestTS", r.highestTS, @@ -837,7 +806,7 @@ func (r *RTPStats) GetExpectedRTPTimestamp(at time.Time) (uint32, error) { return uint32(expectedExtRTP), nil } -func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, srDataExt *RTCPSenderReportDataExt) *rtcp.SenderReport { +func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport { r.lock.Lock() defer r.lock.Unlock() @@ -845,11 +814,6 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, srDataExt *RTCPSenderReportD return nil } - if srDataExt == nil || srDataExt.SenderReportData.NTPTimestamp == 0 || srDataExt.SenderReportData.ArrivalTime.IsZero() { - // no sender report from publisher - return nil - } - // construct current time based on monotonic clock timeSinceFirst := time.Since(r.firstTime) now := r.firstTime.Add(timeSinceFirst) @@ -861,7 +825,8 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, srDataExt *RTCPSenderReportD "anachronous sender report", "firstTime", r.firstTime.String(), "currentTime", now.String(), - "timSinceFirst", timeSinceFirst, + "highestTime", r.highestTime.String(), + "timeSinceFirst", timeSinceFirst, "extStartTS", r.extStartTS, "highestExtRTP", getExtTS(r.highestTS, r.tsCycles), "expectedExtRTP", expectedExtRTP, @@ -871,54 +836,33 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, srDataExt *RTCPSenderReportD timeSinceHighest := time.Since(r.highestTime) nowRTP := r.highestTS + uint32(timeSinceHighest.Nanoseconds()*int64(r.params.ClockRate)/1e9) - // TODO-REMOVE-AFTER-DEBUG - if r.firstSenderReportNTP == 0 { - r.firstSenderReportNTP = nowNTP - r.firstSenderReportRTP = nowRTP + // TODO-REMOVE-AFTER-DEBUG-START + ntpTime := nowNTP.Time() - r.firstFeedSenderReportNTP = srDataExt.SenderReportData.NTPTimestamp - r.firstFeedSenderReportRTP = srDataExt.SenderReportData.RTPTimestamp - } else { - ntpTime := nowNTP.Time() + ntpDiffLocal := ntpTime.Sub(r.highestTime) + rtpDiffLocal := int32(nowRTP - r.highestTS) + rtpOffsetLocal := int32(nowRTP - r.highestTS - uint32(ntpDiffLocal.Nanoseconds()*int64(r.params.ClockRate)/1e9)) - ntpDiffLocal := ntpTime.Sub(r.highestTime) - rtpDiffLocal := int32(nowRTP - r.highestTS) - rtpOffsetLocal := int32(nowRTP - r.highestTS - uint32(ntpDiffLocal.Nanoseconds()*int64(r.params.ClockRate)/1e9)) + timeSinceFirst = nowNTP.Time().Sub(r.firstTime) + rtpDiffSinceFirst := getExtTS(nowRTP, r.tsCycles) - r.extStartTS + drift := int64(uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9) - rtpDiffSinceFirst) + driftMs := (float64(drift) * 1000) / float64(r.params.ClockRate) - timeSinceFirst := nowNTP.Time().Sub(r.firstSenderReportNTP.Time()) - rtpDiffSinceFirst := getExtTS(nowRTP, r.tsCycles) - getExtTS(r.firstSenderReportRTP, 0) - drift := int64(uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9) - rtpDiffSinceFirst) - driftMs := (float64(drift) * 1000) / float64(r.params.ClockRate) - - feedTimeSinceFirst := srDataExt.SenderReportData.NTPTimestamp.Time().Sub(r.firstFeedSenderReportNTP.Time()) - // using tsCycles for extending feed time stamp too - feedRtpDiffSinceFirst := getExtTS(srDataExt.SenderReportData.RTPTimestamp, r.tsCycles) - getExtTS(r.firstFeedSenderReportRTP, 0) - feedDrift := int64(uint64(feedTimeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9) - feedRtpDiffSinceFirst) - feedDriftMs := (float64(feedDrift) * 1000) / float64(r.params.ClockRate) - - r.logger.Debugw( - "sending sender report", - "highestTS", r.highestTS, - "highestTime", r.highestTime.String(), - "reportTS", nowRTP, - "reportTime", ntpTime.String(), - "rtpDiffLocal", rtpDiffLocal, - "ntpDiffLocal", ntpDiffLocal, - "rtpOffsetLocal", rtpOffsetLocal, - "timeSinceFirst", timeSinceFirst, - "rtpDiffSinceFirst", rtpDiffSinceFirst, - "drift", drift, - "driftMs", driftMs, - "feedRTP", srDataExt.SenderReportData.RTPTimestamp, - "feedNTP", srDataExt.SenderReportData.NTPTimestamp.Time().String(), - "feedArrival", srDataExt.SenderReportData.ArrivalTime.String(), - "smoothedOWD", srDataExt.SmoothedOWD, - "feedTimeSinceFirst", feedTimeSinceFirst, - "feedRtpDiffSinceFirst", feedRtpDiffSinceFirst, - "feedDrift", feedDrift, - "feedDriftMs", feedDriftMs, - ) - } + r.logger.Debugw( + "sending sender report", + "highestTS", r.highestTS, + "highestTime", r.highestTime.String(), + "reportTS", nowRTP, + "reportTime", ntpTime.String(), + "rtpDiffLocal", rtpDiffLocal, + "ntpDiffLocal", ntpDiffLocal, + "rtpOffsetLocal", rtpOffsetLocal, + "timeSinceFirst", timeSinceFirst, + "rtpDiffSinceFirst", rtpDiffSinceFirst, + "drift", drift, + "driftMs", driftMs, + ) + // TODO-REMOVE-AFTER-DEBUG-END r.lastSRTime = now r.lastSRNTP = nowNTP @@ -965,15 +909,15 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8, } var dlsr uint32 - if r.srDataExt != nil && !r.srDataExt.SenderReportData.ArrivalTime.IsZero() { - delayMS := uint32(time.Since(r.srDataExt.SenderReportData.ArrivalTime).Milliseconds()) + if r.srData != nil && !r.srData.ArrivalTime.IsZero() { + delayMS := uint32(time.Since(r.srData.ArrivalTime).Milliseconds()) dlsr = (delayMS / 1e3) << 16 dlsr |= (delayMS % 1e3) * 65536 / 1000 } lastSR := uint32(0) - if r.srDataExt != nil { - lastSR = uint32(r.srDataExt.SenderReportData.NTPTimestamp >> 16) + if r.srData != nil { + lastSR = uint32(r.srData.NTPTimestamp >> 16) } return &rtcp.ReceptionReport{ SSRC: ssrc, diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 0e6638b0e..b69f4dccb 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -1113,7 +1113,7 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport { return nil } - return d.rtpStats.GetRtcpSenderReport(d.ssrc, d.receiver.GetRTCPSenderReportDataExt(d.forwarder.GetReferenceLayerSpatial())) + return d.rtpStats.GetRtcpSenderReport(d.ssrc) } func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan struct{} { diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index e6da06859..e8e8167b0 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -469,13 +469,6 @@ func (f *Forwarder) TargetLayer() buffer.VideoLayer { return f.vls.GetTarget() } -func (f *Forwarder) GetReferenceLayerSpatial() int32 { - f.lock.RLock() - defer f.lock.RUnlock() - - return f.referenceLayerSpatial -} - func (f *Forwarder) isDeficientLocked() bool { return f.lastAllocation.IsDeficient } diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 2e38540d3..5fd083bed 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -65,7 +65,6 @@ type TrackReceiver interface { GetTemporalLayerFpsForSpatial(layer int32) []float32 - GetRTCPSenderReportDataExt(layer int32) *buffer.RTCPSenderReportDataExt GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) } @@ -311,7 +310,7 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff }) buff.OnRtcpFeedback(w.sendRTCP) buff.OnRtcpSenderReport(func(srData *buffer.RTCPSenderReportData) { - w.streamTrackerManager.SetRTCPSenderReportDataExt(layer, buff.GetSenderReportDataExt()) + w.streamTrackerManager.SetRTCPSenderReportData(layer, buff.GetSenderReportData()) w.downTrackSpreader.Broadcast(func(dt TrackSender) { _ = dt.HandleRTCPSenderReportData(w.codec.PayloadType, layer, srData) @@ -746,10 +745,6 @@ func (w *WebRTCReceiver) GetTemporalLayerFpsForSpatial(layer int32) []float32 { return b.GetTemporalLayerFpsForSpatial(layer) } -func (w *WebRTCReceiver) GetRTCPSenderReportDataExt(layer int32) *buffer.RTCPSenderReportDataExt { - return w.streamTrackerManager.GetRTCPSenderReportDataExt(layer) -} - func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) { return w.streamTrackerManager.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer) } diff --git a/pkg/sfu/streamtrackermanager.go b/pkg/sfu/streamtrackermanager.go index 8e324e6a3..256b5c4ce 100644 --- a/pkg/sfu/streamtrackermanager.go +++ b/pkg/sfu/streamtrackermanager.go @@ -43,7 +43,7 @@ type StreamTrackerManager struct { paused bool senderReportMu sync.RWMutex - senderReports [buffer.DefaultMaxLayerSpatial + 1]*buffer.RTCPSenderReportDataExt + senderReports [buffer.DefaultMaxLayerSpatial + 1]*buffer.RTCPSenderReportData closed core.Fuse @@ -475,7 +475,7 @@ func (s *StreamTrackerManager) maxExpectedLayerFromTrackInfo() { } } -func (s *StreamTrackerManager) SetRTCPSenderReportDataExt(layer int32, senderReport *buffer.RTCPSenderReportDataExt) { +func (s *StreamTrackerManager) SetRTCPSenderReportData(layer int32, senderReport *buffer.RTCPSenderReportData) { s.senderReportMu.Lock() defer s.senderReportMu.Unlock() @@ -486,17 +486,6 @@ func (s *StreamTrackerManager) SetRTCPSenderReportDataExt(layer int32, senderRep s.senderReports[layer] = senderReport } -func (s *StreamTrackerManager) GetRTCPSenderReportDataExt(layer int32) *buffer.RTCPSenderReportDataExt { - s.senderReportMu.RLock() - defer s.senderReportMu.RUnlock() - - if layer < 0 || int(layer) >= len(s.senderReports) { - return nil - } - - return s.senderReports[layer] -} - func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) { s.senderReportMu.RLock() defer s.senderReportMu.RUnlock() @@ -509,19 +498,19 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer in return ts, nil } - var srLayer *buffer.RTCPSenderReportDataExt + var srLayer *buffer.RTCPSenderReportData if int(layer) < len(s.senderReports) { srLayer = s.senderReports[layer] } - if srLayer == nil || srLayer.SenderReportData.NTPTimestamp == 0 { + if srLayer == nil || srLayer.NTPTimestamp == 0 { return 0, fmt.Errorf("layer rtcp sender report not available: %d", layer) } - var srRef *buffer.RTCPSenderReportDataExt + var srRef *buffer.RTCPSenderReportData if int(referenceLayer) < len(s.senderReports) { srRef = s.senderReports[referenceLayer] } - if srRef == nil || srRef.SenderReportData.NTPTimestamp == 0 { + if srRef == nil || srRef.NTPTimestamp == 0 { return 0, fmt.Errorf("reference layer rtcp sender report not available: %d", referenceLayer) } @@ -529,15 +518,15 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer in // NOTE: It is possible that reference layer has stopped (due to dynacast/adaptive streaming OR publisher // constraints). It should be okay even if the layer has stopped for a long time when using modulo arithmetic for // RTP time stamp (uint32 arithmetic). - ntpDiff := srRef.SenderReportData.NTPTimestamp.Time().Sub(srLayer.SenderReportData.NTPTimestamp.Time()) + ntpDiff := srRef.NTPTimestamp.Time().Sub(srLayer.NTPTimestamp.Time()) rtpDiff := ntpDiff.Nanoseconds() * int64(s.clockRate) / 1e9 - normalizedTS := srLayer.SenderReportData.RTPTimestamp + uint32(rtpDiff) + normalizedTS := srLayer.RTPTimestamp + uint32(rtpDiff) // now that both RTP timestamps correspond to roughly the same NTP time, // the diff between them is the offset in RTP timestamp units between layer and referenceLayer. // Add the offset to layer's ts to map it to corresponding RTP timestamp in // the reference layer. - return ts + (srRef.SenderReportData.RTPTimestamp - normalizedTS), nil + return ts + (srRef.RTPTimestamp - normalizedTS), nil } func (s *StreamTrackerManager) GetMaxTemporalLayerSeen() int32 {