diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index 62163322c..f0749f5c7 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -690,14 +690,14 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) { return } - // Low pass filter one-way-delay (owd) to normalize time stamp to local time base when sending RTCP sender report. - // Forwarding RTCP sender report would be ideal. But, there are a couple of issues with that + // Low pass filter one-way-delay (owd) to normalize time stamp to local time base when sending RTCP Sender Report. + // Forwarding RTCP Sender Report would be ideal. But, there are a couple of issues with that // 1. Senders could have different clocks. // 2. Adjusting to current time as required by RTCP spec. // By normalizing to local clock, these issues can be addressed. However, normalization is not straightforward // as it is not possible to know the propagation delay and processing delay at both ends (send side processing // after time stamping the RTCP packet and receive side processing after reading packet off the wire). - // Smoothed version of OWD is used to + // Smoothed version of OWD is used to alleviate irregularities somewhat. owd := srData.ArrivalTime.Sub(srData.NTPTimestamp.Time()) if r.srDataExt != nil { prevOwd := r.srDataExt.SenderReportData.ArrivalTime.Sub(r.srDataExt.SenderReportData.NTPTimestamp.Time()) diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 99bcb2848..dfc33f25b 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -1356,9 +1356,9 @@ func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer i f.logger.Infow("reference timestamp out-of-order, using default", "lastTS", last.LastTS, "refTS", refTS, "td", int32(td)) td = 1 } + } else { + f.logger.Infow("reference timestamp get error, using default", "error", err) } - } else { - f.logger.Infow("reference timestamp not available, using default") } f.rtpMunger.UpdateSnTsOffsets(extPkt, 1, td) diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index bd9b417cf..1a5f0b2b0 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -2,7 +2,6 @@ package sfu import ( "errors" - "fmt" "io" "strings" "sync" @@ -323,6 +322,8 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff }) buff.OnRtcpFeedback(w.sendRTCP) buff.OnRtcpSenderReport(func(srData *buffer.RTCPSenderReportData) { + w.streamTrackerManager.SetRTCPSenderReportDataExt(layer, buff.GetSenderReportDataExt()) + w.downTrackSpreader.Broadcast(func(dt TrackSender) { _ = dt.HandleRTCPSenderReportData(w.codec.PayloadType, layer, srData) }) @@ -708,57 +709,9 @@ func (w *WebRTCReceiver) GetTemporalLayerFpsForSpatial(layer int32) []float32 { } func (w *WebRTCReceiver) GetRTCPSenderReportDataExt(layer int32) *buffer.RTCPSenderReportDataExt { - w.bufferMu.RLock() - defer w.bufferMu.RUnlock() - - if layer == InvalidLayerSpatial { - return nil - } - - buffer := w.getBufferLocked(layer) - if buffer == nil { - return nil - } - - return buffer.GetSenderReportDataExt() + return w.streamTrackerManager.GetRTCPSenderReportDataExt(layer) } func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) { - w.bufferMu.RLock() - defer w.bufferMu.RUnlock() - - if layer == referenceLayer { - return ts, nil - } - - bLayer := w.getBufferLocked(layer) - if bLayer == nil { - return 0, fmt.Errorf("invalid layer: %d", layer) - } - srLayer := bLayer.GetSenderReportDataExt() - if srLayer == nil || srLayer.SenderReportData.NTPTimestamp == 0 { - return 0, fmt.Errorf("layer rtcp sender report not available: %d", layer) - } - - bReferenceLayer := w.getBufferLocked(referenceLayer) - if bReferenceLayer == nil { - return 0, fmt.Errorf("invalid reference layer: %d", referenceLayer) - } - srRef := bReferenceLayer.GetSenderReportDataExt() - if srRef == nil || srRef.SenderReportData.NTPTimestamp == 0 { - return 0, fmt.Errorf("reference layer rtcp sender report not available: %d", referenceLayer) - } - - // line up the RTP time stamps using NTP time of most recent sender report of layer and referenceLayer - // NOTE: It is possible that reference layer has stopped (due to dynacast/adaptive streaming OR publisher - // constraints). It should be okay even if the layer has stopped for a long time when using modulo arithmetic for - // RTP time stamp (uint32 arithmetic). - ntpDiff := float64(int64(srRef.SenderReportData.NTPTimestamp-srLayer.SenderReportData.NTPTimestamp)) / float64(1<<32) - normalizedTS := srLayer.SenderReportData.RTPTimestamp + uint32(ntpDiff*float64(w.codec.ClockRate)) - - // now that both RTP timestamps correspond to roughly the same NTP time, - // the diff between them is the offset in RTP timestamp units between layer and referenceLayer. - // Add the offset to layer's ts to map it to corresponding RTP timestamp in - // the reference layer. - return ts + (srRef.SenderReportData.RTPTimestamp - normalizedTS), nil + return w.streamTrackerManager.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer) } diff --git a/pkg/sfu/streamtrackermanager.go b/pkg/sfu/streamtrackermanager.go index aff6a0411..af7741d49 100644 --- a/pkg/sfu/streamtrackermanager.go +++ b/pkg/sfu/streamtrackermanager.go @@ -1,6 +1,7 @@ package sfu import ( + "fmt" "sort" "sync" @@ -28,6 +29,9 @@ type StreamTrackerManager struct { maxExpectedLayer int32 paused bool + senderReportMu sync.RWMutex + senderReports [DefaultMaxLayerSpatial + 1]*buffer.RTCPSenderReportDataExt + onAvailableLayersChanged func() onBitrateAvailabilityChanged func() onMaxPublishedLayerChanged func(maxPublishedLayer int32) @@ -455,3 +459,67 @@ func (s *StreamTrackerManager) maxExpectedLayerFromTrackInfo() { } } } + +func (s *StreamTrackerManager) SetRTCPSenderReportDataExt(layer int32, senderReport *buffer.RTCPSenderReportDataExt) { + s.senderReportMu.Lock() + defer s.senderReportMu.Unlock() + + if layer < 0 || int(layer) >= len(s.senderReports) { + return + } + + s.senderReports[layer] = senderReport +} + +func (s *StreamTrackerManager) GetRTCPSenderReportDataExt(layer int32) *buffer.RTCPSenderReportDataExt { + s.senderReportMu.RLock() + defer s.senderReportMu.RUnlock() + + if layer < 0 || int(layer) >= len(s.senderReports) { + return nil + } + + return s.senderReports[layer] +} + +func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) { + s.senderReportMu.RLock() + defer s.senderReportMu.RUnlock() + + if layer < 0 || referenceLayer < 0 { + return 0, fmt.Errorf("invalid layer, target: %d, reference: %d", layer, referenceLayer) + } + + if layer == referenceLayer { + return ts, nil + } + + var srLayer *buffer.RTCPSenderReportDataExt + if int(layer) < len(s.senderReports) { + srLayer = s.senderReports[layer] + } + if srLayer == nil || srLayer.SenderReportData.NTPTimestamp == 0 { + return 0, fmt.Errorf("layer rtcp sender report not available: %d", layer) + } + + var srRef *buffer.RTCPSenderReportDataExt + if int(referenceLayer) < len(s.senderReports) { + srRef = s.senderReports[referenceLayer] + } + if srRef == nil || srRef.SenderReportData.NTPTimestamp == 0 { + return 0, fmt.Errorf("reference layer rtcp sender report not available: %d", referenceLayer) + } + + // line up the RTP time stamps using NTP time of most recent sender report of layer and referenceLayer + // NOTE: It is possible that reference layer has stopped (due to dynacast/adaptive streaming OR publisher + // constraints). It should be okay even if the layer has stopped for a long time when using modulo arithmetic for + // RTP time stamp (uint32 arithmetic). + ntpDiff := float64(int64(srRef.SenderReportData.NTPTimestamp-srLayer.SenderReportData.NTPTimestamp)) / float64(1<<32) + normalizedTS := srLayer.SenderReportData.RTPTimestamp + uint32(ntpDiff*float64(s.clockRate)) + + // now that both RTP timestamps correspond to roughly the same NTP time, + // the diff between them is the offset in RTP timestamp units between layer and referenceLayer. + // Add the offset to layer's ts to map it to corresponding RTP timestamp in + // the reference layer. + return ts + (srRef.SenderReportData.RTPTimestamp - normalizedTS), nil +}