Change lock scope of access to RTCP sender report data. (#1473)

* Change lock scope of access to RTCP sender report data.

Forwarder calls back to get time stamp offset.
Holding buffer lock is a much bigger scoped lock.

Reduce lock scope and cache latest sender report under its own lock.
And use that cache when calculating time stamp offset.

* move sr cache to stream tracker manager for re-use in relay

* cache before spread
This commit is contained in:
Raja Subramanian
2023-02-27 12:28:25 +05:30
committed by GitHub
parent 207c97c4b9
commit 8e6bcdaffe
4 changed files with 77 additions and 56 deletions
+3 -3
View File
@@ -690,14 +690,14 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
return
}
// Low pass filter one-way-delay (owd) to normalize time stamp to local time base when sending RTCP sender report.
// Forwarding RTCP sender report would be ideal. But, there are a couple of issues with that
// Low pass filter one-way-delay (owd) to normalize time stamp to local time base when sending RTCP Sender Report.
// Forwarding RTCP Sender Report would be ideal. But, there are a couple of issues with that
// 1. Senders could have different clocks.
// 2. Adjusting to current time as required by RTCP spec.
// By normalizing to local clock, these issues can be addressed. However, normalization is not straightforward
// as it is not possible to know the propagation delay and processing delay at both ends (send side processing
// after time stamping the RTCP packet and receive side processing after reading packet off the wire).
// Smoothed version of OWD is used to
// Smoothed version of OWD is used to alleviate irregularities somewhat.
owd := srData.ArrivalTime.Sub(srData.NTPTimestamp.Time())
if r.srDataExt != nil {
prevOwd := r.srDataExt.SenderReportData.ArrivalTime.Sub(r.srDataExt.SenderReportData.NTPTimestamp.Time())
+2 -2
View File
@@ -1356,9 +1356,9 @@ func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer i
f.logger.Infow("reference timestamp out-of-order, using default", "lastTS", last.LastTS, "refTS", refTS, "td", int32(td))
td = 1
}
} else {
f.logger.Infow("reference timestamp get error, using default", "error", err)
}
} else {
f.logger.Infow("reference timestamp not available, using default")
}
f.rtpMunger.UpdateSnTsOffsets(extPkt, 1, td)
+4 -51
View File
@@ -2,7 +2,6 @@ package sfu
import (
"errors"
"fmt"
"io"
"strings"
"sync"
@@ -323,6 +322,8 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff
})
buff.OnRtcpFeedback(w.sendRTCP)
buff.OnRtcpSenderReport(func(srData *buffer.RTCPSenderReportData) {
w.streamTrackerManager.SetRTCPSenderReportDataExt(layer, buff.GetSenderReportDataExt())
w.downTrackSpreader.Broadcast(func(dt TrackSender) {
_ = dt.HandleRTCPSenderReportData(w.codec.PayloadType, layer, srData)
})
@@ -708,57 +709,9 @@ func (w *WebRTCReceiver) GetTemporalLayerFpsForSpatial(layer int32) []float32 {
}
func (w *WebRTCReceiver) GetRTCPSenderReportDataExt(layer int32) *buffer.RTCPSenderReportDataExt {
w.bufferMu.RLock()
defer w.bufferMu.RUnlock()
if layer == InvalidLayerSpatial {
return nil
}
buffer := w.getBufferLocked(layer)
if buffer == nil {
return nil
}
return buffer.GetSenderReportDataExt()
return w.streamTrackerManager.GetRTCPSenderReportDataExt(layer)
}
func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) {
w.bufferMu.RLock()
defer w.bufferMu.RUnlock()
if layer == referenceLayer {
return ts, nil
}
bLayer := w.getBufferLocked(layer)
if bLayer == nil {
return 0, fmt.Errorf("invalid layer: %d", layer)
}
srLayer := bLayer.GetSenderReportDataExt()
if srLayer == nil || srLayer.SenderReportData.NTPTimestamp == 0 {
return 0, fmt.Errorf("layer rtcp sender report not available: %d", layer)
}
bReferenceLayer := w.getBufferLocked(referenceLayer)
if bReferenceLayer == nil {
return 0, fmt.Errorf("invalid reference layer: %d", referenceLayer)
}
srRef := bReferenceLayer.GetSenderReportDataExt()
if srRef == nil || srRef.SenderReportData.NTPTimestamp == 0 {
return 0, fmt.Errorf("reference layer rtcp sender report not available: %d", referenceLayer)
}
// line up the RTP time stamps using NTP time of most recent sender report of layer and referenceLayer
// NOTE: It is possible that reference layer has stopped (due to dynacast/adaptive streaming OR publisher
// constraints). It should be okay even if the layer has stopped for a long time when using modulo arithmetic for
// RTP time stamp (uint32 arithmetic).
ntpDiff := float64(int64(srRef.SenderReportData.NTPTimestamp-srLayer.SenderReportData.NTPTimestamp)) / float64(1<<32)
normalizedTS := srLayer.SenderReportData.RTPTimestamp + uint32(ntpDiff*float64(w.codec.ClockRate))
// now that both RTP timestamps correspond to roughly the same NTP time,
// the diff between them is the offset in RTP timestamp units between layer and referenceLayer.
// Add the offset to layer's ts to map it to corresponding RTP timestamp in
// the reference layer.
return ts + (srRef.SenderReportData.RTPTimestamp - normalizedTS), nil
return w.streamTrackerManager.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer)
}
+68
View File
@@ -1,6 +1,7 @@
package sfu
import (
"fmt"
"sort"
"sync"
@@ -28,6 +29,9 @@ type StreamTrackerManager struct {
maxExpectedLayer int32
paused bool
senderReportMu sync.RWMutex
senderReports [DefaultMaxLayerSpatial + 1]*buffer.RTCPSenderReportDataExt
onAvailableLayersChanged func()
onBitrateAvailabilityChanged func()
onMaxPublishedLayerChanged func(maxPublishedLayer int32)
@@ -455,3 +459,67 @@ func (s *StreamTrackerManager) maxExpectedLayerFromTrackInfo() {
}
}
}
func (s *StreamTrackerManager) SetRTCPSenderReportDataExt(layer int32, senderReport *buffer.RTCPSenderReportDataExt) {
s.senderReportMu.Lock()
defer s.senderReportMu.Unlock()
if layer < 0 || int(layer) >= len(s.senderReports) {
return
}
s.senderReports[layer] = senderReport
}
func (s *StreamTrackerManager) GetRTCPSenderReportDataExt(layer int32) *buffer.RTCPSenderReportDataExt {
s.senderReportMu.RLock()
defer s.senderReportMu.RUnlock()
if layer < 0 || int(layer) >= len(s.senderReports) {
return nil
}
return s.senderReports[layer]
}
func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) {
s.senderReportMu.RLock()
defer s.senderReportMu.RUnlock()
if layer < 0 || referenceLayer < 0 {
return 0, fmt.Errorf("invalid layer, target: %d, reference: %d", layer, referenceLayer)
}
if layer == referenceLayer {
return ts, nil
}
var srLayer *buffer.RTCPSenderReportDataExt
if int(layer) < len(s.senderReports) {
srLayer = s.senderReports[layer]
}
if srLayer == nil || srLayer.SenderReportData.NTPTimestamp == 0 {
return 0, fmt.Errorf("layer rtcp sender report not available: %d", layer)
}
var srRef *buffer.RTCPSenderReportDataExt
if int(referenceLayer) < len(s.senderReports) {
srRef = s.senderReports[referenceLayer]
}
if srRef == nil || srRef.SenderReportData.NTPTimestamp == 0 {
return 0, fmt.Errorf("reference layer rtcp sender report not available: %d", referenceLayer)
}
// line up the RTP time stamps using NTP time of most recent sender report of layer and referenceLayer
// NOTE: It is possible that reference layer has stopped (due to dynacast/adaptive streaming OR publisher
// constraints). It should be okay even if the layer has stopped for a long time when using modulo arithmetic for
// RTP time stamp (uint32 arithmetic).
ntpDiff := float64(int64(srRef.SenderReportData.NTPTimestamp-srLayer.SenderReportData.NTPTimestamp)) / float64(1<<32)
normalizedTS := srLayer.SenderReportData.RTPTimestamp + uint32(ntpDiff*float64(s.clockRate))
// now that both RTP timestamps correspond to roughly the same NTP time,
// the diff between them is the offset in RTP timestamp units between layer and referenceLayer.
// Add the offset to layer's ts to map it to corresponding RTP timestamp in
// the reference layer.
return ts + (srRef.SenderReportData.RTPTimestamp - normalizedTS), nil
}