diff --git a/pkg/rtc/wrappedreceiver.go b/pkg/rtc/wrappedreceiver.go index 366eb9274..b02708d12 100644 --- a/pkg/rtc/wrappedreceiver.go +++ b/pkg/rtc/wrappedreceiver.go @@ -12,6 +12,7 @@ import ( "github.com/livekit/protocol/logger" "github.com/livekit/livekit-server/pkg/sfu" + "github.com/livekit/livekit-server/pkg/sfu/buffer" ) // wrapper around WebRTC receiver, overriding its ID @@ -287,3 +288,17 @@ func (d *DummyReceiver) GetPrimaryReceiverForRed() sfu.TrackReceiver { func (d *DummyReceiver) GetRedReceiver() sfu.TrackReceiver { return d } + +func (d *DummyReceiver) GetRTCPSenderReportData(layer int32) *buffer.RTCPSenderReportData { + if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok { + return r.GetRTCPSenderReportData(layer) + } + return nil +} + +func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) { + if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok { + return r.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer) + } + return 0, errors.New("receiver not available") +} diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 2e30a242d..02b1b059b 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -83,9 +83,10 @@ type Buffer struct { lastFractionLostToReport uint8 // Last fraction lost from subscribers, should report to publisher; Audio only // callbacks - onClose func() - onRtcpFeedback func([]rtcp.Packet) - onFpsChanged func() + onClose func() + onRtcpFeedback func([]rtcp.Packet) + onRtcpSenderReport func(*RTCPSenderReportData) + onFpsChanged func() // logger logger logger.Logger @@ -615,14 +616,32 @@ func (b *Buffer) buildReceptionReport() *rtcp.ReceptionReport { } func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) { + srData := &RTCPSenderReportData{ + RTPTimestamp: rtpTime, + NTPTimestamp: mediatransportutil.NtpTime(ntpTime), + ArrivalTime: time.Now(), + } + + b.RLock() + if b.rtpStats != nil { + 
b.rtpStats.SetRtcpSenderReportData(srData) + } + b.RUnlock() + + if b.onRtcpSenderReport != nil { + b.onRtcpSenderReport(srData) + } +} + +func (b *Buffer) GetSenderReportData() *RTCPSenderReportData { b.RLock() defer b.RUnlock() - if b.rtpStats == nil { - return + if b.rtpStats != nil { + return b.rtpStats.GetRtcpSenderReportData() } - b.rtpStats.SetRtcpSenderReportData(rtpTime, mediatransportutil.NtpTime(ntpTime), time.Now()) + return nil } func (b *Buffer) SetLastFractionLostReport(lost uint8) { @@ -664,6 +683,10 @@ func (b *Buffer) OnRtcpFeedback(fn func(fb []rtcp.Packet)) { b.onRtcpFeedback = fn } +func (b *Buffer) OnRtcpSenderReport(fn func(srData *RTCPSenderReportData)) { + b.onRtcpSenderReport = fn +} + // GetMediaSSRC returns the associated SSRC of the RTP stream func (b *Buffer) GetMediaSSRC() uint32 { return b.mediaSSRC diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index 10995f9cc..0fe81096a 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -81,6 +81,12 @@ type SnInfo struct { marker bool } +type RTCPSenderReportData struct { + RTPTimestamp uint32 + NTPTimestamp mediatransportutil.NtpTime + ArrivalTime time.Time +} + type RTPStatsParams struct { ClockRate uint32 IsReceiverReportDriven bool @@ -158,9 +164,7 @@ type RTPStats struct { rtt uint32 maxRtt uint32 - rtpSR uint32 - ntpSR mediatransportutil.NtpTime - arrivalSR int64 + srData *RTCPSenderReportData nextSnapshotId uint32 snapshots map[uint32]*Snapshot @@ -248,9 +252,15 @@ func (r *RTPStats) Seed(from *RTPStats) { r.rtt = from.rtt r.maxRtt = from.maxRtt - r.rtpSR = from.rtpSR - r.ntpSR = from.ntpSR - r.arrivalSR = from.arrivalSR + if from.srData != nil { + r.srData = &RTCPSenderReportData{ + RTPTimestamp: from.srData.RTPTimestamp, + NTPTimestamp: from.srData.NTPTimestamp, + ArrivalTime: from.srData.ArrivalTime, + } + } else { + r.srData = nil + } r.nextSnapshotId = from.nextSnapshotId for id, ss := range from.snapshots { @@ -649,16 +659,38 @@ func (r 
*RTPStats) GetRtt() uint32 { return r.rtt } -func (r *RTPStats) SetRtcpSenderReportData(rtpTS uint32, ntpTS mediatransportutil.NtpTime, arrival time.Time) { +func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) { r.lock.Lock() defer r.lock.Unlock() - r.rtpSR = rtpTS - r.ntpSR = ntpTS - r.arrivalSR = arrival.UnixNano() + if srData == nil { + r.srData = nil + return + } + + r.srData = &RTCPSenderReportData{ + RTPTimestamp: srData.RTPTimestamp, + NTPTimestamp: srData.NTPTimestamp, + ArrivalTime: srData.ArrivalTime, + } } -func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport { +func (r *RTPStats) GetRtcpSenderReportData() *RTCPSenderReportData { + r.lock.RLock() + defer r.lock.RUnlock() + + if r.srData == nil { + return nil + } + + return &RTCPSenderReportData{ + RTPTimestamp: r.srData.RTPTimestamp, + NTPTimestamp: r.srData.NTPTimestamp, + ArrivalTime: r.srData.ArrivalTime, + } +} + +func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, srData *RTCPSenderReportData) *rtcp.SenderReport { r.lock.RLock() defer r.lock.RUnlock() @@ -666,9 +698,33 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) *rtcp.SenderReport { return nil } - now := time.Now() - nowNTP := mediatransportutil.ToNtpTime(now) - nowRTP := r.highestTS + uint32((now.UnixNano()-r.highestTime)*int64(r.params.ClockRate)/1e9) + var nowNTP mediatransportutil.NtpTime + var nowRTP uint32 + if srData == nil || srData.NTPTimestamp == 0 || srData.ArrivalTime.IsZero() { + r.params.Logger.Infow("reference layer sender report not available") + } else { + // NTP timestamp in sender report could have a different base, i.e. it may not be wall clock time at the time of send. + // So, do not compare local NTP to what is received from remote side. Record receive time locally and do a difference + // using local time now (i.e. same time base) and add the difference to remote NTP to get the current time in remote + // NTP time base. 
+ timeSinceLastSR := time.Since(srData.ArrivalTime) + nowNTP = mediatransportutil.ToNtpTime(srData.NTPTimestamp.Time().Add(timeSinceLastSR)) + nowRTP = srData.RTPTimestamp + uint32(timeSinceLastSR.Milliseconds()*int64(r.params.ClockRate)/1000) + if nowRTP-r.highestTS > (1 << 31) { + r.params.Logger.Infow( + "reference layer sender report could not be used", + "nowRTP", nowRTP, + "highestTS", r.highestTS, + "timeSinceLastSR", timeSinceLastSR, + ) + nowNTP = 0 // reset to force calculation using highest send time + } + } + if nowNTP == 0 { + now := time.Now() + nowNTP = mediatransportutil.ToNtpTime(now) + nowRTP = r.highestTS + uint32((now.UnixNano()-r.highestTime)*int64(r.params.ClockRate)/1e9) + } return &rtcp.SenderReport{ SSRC: ssrc, @@ -721,8 +777,8 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8, } var dlsr uint32 - if r.arrivalSR != 0 { - delayMS := uint32((time.Now().UnixNano() - r.arrivalSR) / 1e6) + if r.srData != nil && !r.srData.ArrivalTime.IsZero() { + delayMS := uint32(time.Since(r.srData.ArrivalTime).Milliseconds()) dlsr = (delayMS / 1e3) << 16 dlsr |= (delayMS % 1e3) * 65536 / 1000 } @@ -733,13 +789,17 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8, jitter = r.jitterOverridden } + lastSR := uint32(0) + if r.srData != nil { + lastSR = uint32(r.srData.NTPTimestamp >> 16) + } return &rtcp.ReceptionReport{ SSRC: ssrc, FractionLost: fracLost, TotalLost: r.packetsLost, LastSequenceNumber: now.extStartSN, Jitter: uint32(jitter), - LastSenderReport: uint32(r.ntpSR >> 16), + LastSenderReport: lastSR, Delay: dlsr, } } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 5aabf2e69..e9f34c508 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -36,6 +36,7 @@ type TrackSender interface { ID() string SubscriberID() livekit.ParticipantID TrackInfoAvailable() + HandleRTCPSenderReportData(payloadType webrtc.PayloadType, layer int32, srData *buffer.RTCPSenderReportData) error } 
const ( @@ -248,7 +249,7 @@ func NewDownTrack( kind: kind, codec: codecs[0].RTPCodecCapability, } - d.forwarder = NewForwarder(d.kind, d.logger) + d.forwarder = NewForwarder(d.kind, d.logger, d.receiver.GetReferenceLayerRTPTimestamp) d.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{ MimeType: codecs[0].MimeType, // LK-TODO have to notify on codec change @@ -974,7 +975,7 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport { return nil } - return d.rtpStats.GetRtcpSenderReport(d.ssrc) + return d.rtpStats.GetRtcpSenderReport(d.ssrc, d.receiver.GetRTCPSenderReportData(d.forwarder.GetReferenceLayerSpatial())) } func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan struct{} { @@ -1523,3 +1524,7 @@ func (d *DownTrack) sendPaddingOnMute() { time.Sleep(paddingOnMuteInterval) } } + +func (d *DownTrack) HandleRTCPSenderReportData(_payloadType webrtc.PayloadType, _layer int32, _srData *buffer.RTCPSenderReportData) error { + return nil +} diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index cc1b136e9..b7f7c0391 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -169,30 +169,33 @@ var ( // ------------------------------------------------------------------- type ForwarderState struct { - Started bool - LastTSCalc int64 - RTP RTPMungerState - VP8 VP8MungerState + Started bool + ReferenceLayerSpatial int32 + LastTSCalc int64 + RTP RTPMungerState + VP8 VP8MungerState } func (f ForwarderState) String() string { - return fmt.Sprintf("ForwarderState{started: %v, lTSCalc: %d, rtp: %s, vp8: %s}", - f.Started, f.LastTSCalc, f.RTP.String(), f.VP8.String()) + return fmt.Sprintf("ForwarderState{started: %v, ref: %d, lTSCalc: %d, rtp: %s, vp8: %s}", + f.Started, f.ReferenceLayerSpatial, f.LastTSCalc, f.RTP.String(), f.VP8.String()) } // ------------------------------------------------------------------- type Forwarder struct { - lock sync.RWMutex - codec webrtc.RTPCodecCapability - 
kind webrtc.RTPCodecType - logger logger.Logger + lock sync.RWMutex + codec webrtc.RTPCodecCapability + kind webrtc.RTPCodecType + logger logger.Logger + getReferenceLayerRTPTimestamp func(ts uint32, layer int32, referenceLayer int32) (uint32, error) muted bool - started bool - lastSSRC uint32 - lTSCalc int64 + started bool + lastSSRC uint32 + lTSCalc int64 + referenceLayerSpatial int32 maxLayers VideoLayers currentLayers VideoLayers @@ -213,10 +216,17 @@ type Forwarder struct { ddLayerSelector *DDVideoLayerSelector } -func NewForwarder(kind webrtc.RTPCodecType, logger logger.Logger) *Forwarder { +func NewForwarder( + kind webrtc.RTPCodecType, + logger logger.Logger, + getReferenceLayerRTPTimestamp func(ts uint32, layer int32, referenceLayer int32) (uint32, error), +) *Forwarder { f := &Forwarder{ - kind: kind, - logger: logger, + kind: kind, + logger: logger, + getReferenceLayerRTPTimestamp: getReferenceLayerRTPTimestamp, + + referenceLayerSpatial: InvalidLayerSpatial, // start off with nothing, let streamallocator set things currentLayers: InvalidLayers, @@ -265,9 +275,10 @@ func (f *Forwarder) GetState() ForwarderState { } state := ForwarderState{ - Started: f.started, - LastTSCalc: f.lTSCalc, - RTP: f.rtpMunger.GetLast(), + Started: f.started, + ReferenceLayerSpatial: f.referenceLayerSpatial, + LastTSCalc: f.lTSCalc, + RTP: f.rtpMunger.GetLast(), } if f.vp8Munger != nil { @@ -292,6 +303,7 @@ func (f *Forwarder) SeedState(state ForwarderState) { } f.started = true + f.referenceLayerSpatial = state.ReferenceLayerSpatial } func (f *Forwarder) Mute(muted bool) (bool, VideoLayers) { @@ -369,6 +381,13 @@ func (f *Forwarder) TargetLayers() VideoLayers { return f.targetLayers } +func (f *Forwarder) GetReferenceLayerSpatial() int32 { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.referenceLayerSpatial +} + func (f *Forwarder) GetForwardingStatus() ForwardingStatus { f.lock.RLock() defer f.lock.RUnlock() @@ -1362,7 +1381,7 @@ func (f *Forwarder) 
GetTranslationParams(extPkt *buffer.ExtPacket, layer int32) switch f.kind { case webrtc.RTPCodecTypeAudio: - return f.getTranslationParamsAudio(extPkt) + return f.getTranslationParamsAudio(extPkt, layer) case webrtc.RTPCodecTypeVideo: return f.getTranslationParamsVideo(extPkt, layer) } @@ -1371,37 +1390,42 @@ func (f *Forwarder) GetTranslationParams(extPkt *buffer.ExtPacket, layer int32) } // should be called with lock held -func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, tp *TranslationParams) (*TranslationParams, error) { +func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer int32, tp *TranslationParams) (*TranslationParams, error) { if f.lastSSRC != extPkt.Packet.SSRC { if !f.started { f.started = true + f.referenceLayerSpatial = layer f.rtpMunger.SetLastSnTs(extPkt) if f.vp8Munger != nil { f.vp8Munger.SetLast(extPkt) } } else { - // LK-TODO-START - // The below offset calculation is not technically correct. - // Timestamps based on the system time of an intermediate box like - // SFU is not going to be accurate. Packets arrival/processing - // are subject to vagaries of network delays, SFU processing etc. - // But, the correct way is a lot harder. Will have to - // look at RTCP SR to get timestamps and align (and figure out alignment - // of layers and use that during layer switch in simulcast case). - // That can get tricky. Given the complexity of that approach, maybe - // this is just fine till it is not :-). 
- // LK-TODO-END - // Compute how much time passed between the old RTP extPkt // and the current packet, and fix timestamp on source change - tDiffMs := (extPkt.Arrival - f.lTSCalc) / 1e6 - if tDiffMs < 0 { - tDiffMs = 0 + var td uint32 + if f.getReferenceLayerRTPTimestamp != nil { + refTS, err := f.getReferenceLayerRTPTimestamp(extPkt.Packet.Timestamp, layer, f.referenceLayerSpatial) + if err == nil { + last := f.rtpMunger.GetLast() + td = refTS - last.LastTS + if td > (1 << 31) { + f.logger.Infow("reference timestamp out-of-order", "lastTS", last.LastTS, "refTS", refTS, "td", td) + td = 0 // reset to force arrival time based calculation + } + } } - td := uint32(tDiffMs * int64(f.codec.ClockRate) / 1000) + if td == 0 { - td = 1 + tDiffMs := (extPkt.Arrival - f.lTSCalc) / 1e6 + if tDiffMs < 0 { + tDiffMs = 0 + } + td = uint32(tDiffMs * int64(f.codec.ClockRate) / 1000) + if td == 0 { + td = 1 + } } + f.rtpMunger.UpdateSnTsOffsets(extPkt, 1, td) if f.vp8Munger != nil { f.vp8Munger.UpdateOffsets(extPkt) @@ -1435,8 +1459,8 @@ func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, tp *Tra } // should be called with lock held -func (f *Forwarder) getTranslationParamsAudio(extPkt *buffer.ExtPacket) (*TranslationParams, error) { - return f.getTranslationParamsCommon(extPkt, nil) +func (f *Forwarder) getTranslationParamsAudio(extPkt *buffer.ExtPacket, layer int32) (*TranslationParams, error) { + return f.getTranslationParamsCommon(extPkt, layer, nil) } // should be called with lock held @@ -1508,7 +1532,7 @@ func (f *Forwarder) getTranslationParamsVideo(extPkt *buffer.ExtPacket, layer in return tp, nil } - _, err := f.getTranslationParamsCommon(extPkt, tp) + _, err := f.getTranslationParamsCommon(extPkt, layer, tp) if tp.shouldDrop || f.vp8Munger == nil || len(extPkt.Packet.Payload) == 0 { return tp, err } diff --git a/pkg/sfu/forwarder_test.go b/pkg/sfu/forwarder_test.go index 62dedd1dc..ae4acfe53 100644 --- a/pkg/sfu/forwarder_test.go +++ 
b/pkg/sfu/forwarder_test.go @@ -18,7 +18,7 @@ func disable(f *Forwarder) { } func newForwarder(codec webrtc.RTPCodecCapability, kind webrtc.RTPCodecType) *Forwarder { - f := NewForwarder(kind, logger.GetLogger()) + f := NewForwarder(kind, logger.GetLogger(), nil) f.DetermineCodec(codec) return f } diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 772af1ca9..be2301bc1 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -2,6 +2,7 @@ package sfu import ( "errors" + "fmt" "io" "strings" "sync" @@ -62,6 +63,9 @@ type TrackReceiver interface { GetRedReceiver() TrackReceiver GetTemporalLayerFpsForSpatial(layer int32) []float32 + + GetRTCPSenderReportData(layer int32) *buffer.RTCPSenderReportData + GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) } // WebRTCReceiver receives a media track @@ -310,6 +314,11 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff SmoothIntervals: w.audioConfig.SmoothIntervals, }) buff.OnRtcpFeedback(w.sendRTCP) + buff.OnRtcpSenderReport(func(srData *buffer.RTCPSenderReportData) { + w.downTrackSpreader.Broadcast(func(dt TrackSender) { + _ = dt.HandleRTCPSenderReportData(w.codec.PayloadType, layer, srData) + }) + }) var duration time.Duration switch layer { @@ -670,3 +679,59 @@ func (w *WebRTCReceiver) GetTemporalLayerFpsForSpatial(layer int32) []float32 { } return w.getBuffer(layer).GetTemporalLayerFpsForSpatial(layer) } + +// GetRTCPSenderReportData returns the sender report data held by the buffer of the given spatial layer, +// or nil when the layer is invalid, out of range, or has no buffer. +func (w *WebRTCReceiver) GetRTCPSenderReportData(layer int32) *buffer.RTCPSenderReportData { + w.bufferMu.RLock() + defer w.bufferMu.RUnlock() + + if layer == InvalidLayerSpatial || layer < 0 || int(layer) >= len(w.buffers) || w.buffers[layer] == nil { + return nil + } + + return w.buffers[layer].GetSenderReportData() +} + +// GetReferenceLayerRTPTimestamp maps ts from layer's RTP timeline to referenceLayer's RTP timeline using the +// latest sender report of both layers. It returns ts unchanged when the layers match and an error when either +// layer is invalid, has no buffer, or has no sender report yet. +func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) { + w.bufferMu.RLock() + defer w.bufferMu.RUnlock() + + if layer == referenceLayer { + return ts, nil + } + + if layer == 
InvalidLayerSpatial || layer < 0 || int(layer) >= len(w.buffers) || w.buffers[layer] == nil { + return 0, fmt.Errorf("invalid layer: %d", layer) + } + srLayer := w.buffers[layer].GetSenderReportData() + if srLayer == nil || srLayer.NTPTimestamp == 0 { + return 0, fmt.Errorf("layer rtcp sender report not available: %d", layer) + } + + if referenceLayer == InvalidLayerSpatial || referenceLayer < 0 || int(referenceLayer) >= len(w.buffers) || w.buffers[referenceLayer] == nil { + return 0, fmt.Errorf("invalid reference layer: %d", referenceLayer) + } + srRef := w.buffers[referenceLayer].GetSenderReportData() + if srRef == nil || srRef.NTPTimestamp == 0 { + return 0, fmt.Errorf("reference layer rtcp sender report not available: %d", referenceLayer) + } + + // line up the RTP time stamps using NTP time of most recent sender report of layer and referenceLayer + // NOTE: It is possible that reference layer has stopped (due to dynacast/adaptive streaming OR publisher + // constraints). It should be okay even if the layer has stopped for a long time when using modulo arithmetic for + // RTP time stamp (uint32 arithmetic). + ntpDiff := float64(int64(srRef.NTPTimestamp-srLayer.NTPTimestamp)) / float64(1<<32) + // go through int64: converting a negative float64 directly to uint32 is implementation-dependent in Go, + // while int64 to uint32 wraps modulo 2^32 as this RTP arithmetic expects. + normalizedTS := srLayer.RTPTimestamp + uint32(int64(ntpDiff*float64(w.codec.ClockRate))) + + // now that both RTP timestamps correspond to roughly the same NTP time, + // the diff between them is the offset in RTP timestamp units between layer and referenceLayer. + // Add the offset to layer's ts to map it to corresponding RTP timestamp in + // the reference layer. + return ts + (srRef.RTPTimestamp - normalizedTS), nil +}