From 0354626bfc1b6aaacbf60f8aecdf719dce1507f2 Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Thu, 25 May 2023 21:55:54 +0530 Subject: [PATCH] Adjust sender report time stamp for slow publishers. (#1740) It is possible that publisher paces the media. So, RTCP sender report from publisher could be ahead of what is being fowarded by a good amount (have seen up to 2 seconds ahead). Using the forwarded time stamp for RTCP sender report in the down stream leads to jumps back and forth in the down track RTCP sender report. So, look at the publisher's RTCP sender report to check for it being ahead and use the publisher rate as a guide. --- pkg/rtc/wrappedreceiver.go | 8 + pkg/sfu/buffer/buffer.go | 6 +- pkg/sfu/buffer/rtpstats.go | 313 ++++++++++++++++++++------------ pkg/sfu/downtrack.go | 5 +- pkg/sfu/forwarder.go | 21 ++- pkg/sfu/receiver.go | 8 +- pkg/sfu/streamtrackermanager.go | 27 ++- 7 files changed, 253 insertions(+), 135 deletions(-) diff --git a/pkg/rtc/wrappedreceiver.go b/pkg/rtc/wrappedreceiver.go index 068f8b69d..84d4d9910 100644 --- a/pkg/rtc/wrappedreceiver.go +++ b/pkg/rtc/wrappedreceiver.go @@ -12,6 +12,7 @@ import ( "github.com/livekit/protocol/logger" "github.com/livekit/livekit-server/pkg/sfu" + "github.com/livekit/livekit-server/pkg/sfu/buffer" ) // wrapper around WebRTC receiver, overriding its ID @@ -288,6 +289,13 @@ func (d *DummyReceiver) GetRedReceiver() sfu.TrackReceiver { return d } +func (d *DummyReceiver) GetRTCPSenderReportData(layer int32) (*buffer.RTCPSenderReportData, *buffer.RTCPSenderReportData) { + if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok { + return r.GetRTCPSenderReportData(layer) + } + return nil, nil +} + func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) { if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok { return r.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer) diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 63949a52c..575f2d433 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -654,7 +654,7 @@ func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) { srData := &RTCPSenderReportData{ RTPTimestamp: rtpTime, NTPTimestamp: mediatransportutil.NtpTime(ntpTime), - ArrivalTime: time.Now(), + At: time.Now(), } b.RLock() @@ -668,7 +668,7 @@ func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) { } } -func (b *Buffer) GetSenderReportData() *RTCPSenderReportData { +func (b *Buffer) GetSenderReportData() (*RTCPSenderReportData, *RTCPSenderReportData) { b.RLock() defer b.RUnlock() @@ -676,7 +676,7 @@ func (b *Buffer) GetSenderReportData() *RTCPSenderReportData { return b.rtpStats.GetRtcpSenderReportData() } - return nil + return nil, nil } func (b *Buffer) SetLastFractionLostReport(lost uint8) { diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index 0aab6e16a..d650f33a0 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -25,6 +25,28 @@ const ( TooLargeOWDDelta = 400 * time.Millisecond ) +// ------------------------------------------------------- + +type driftResult struct { + timeSinceFirst time.Duration + rtpDiffSinceFirst uint64 + driftSamples int64 + driftMs float64 + sampleRate float64 +} + +func (d driftResult) String() string { + return fmt.Sprintf("time: %+v, rtp: %d, driftSamples: %d, driftMs: %.02f, sampleRate: %.02f", + d.timeSinceFirst, + d.rtpDiffSinceFirst, + d.driftSamples, + d.driftMs, + d.sampleRate, + ) +} + +// ------------------------------------------------------- + type RTPFlowState struct { HasLoss bool LossStartInclusive uint16 @@ -88,9 +110,10 @@ type SnInfo struct { } type RTCPSenderReportData struct { - RTPTimestamp uint32 - NTPTimestamp mediatransportutil.NtpTime - ArrivalTime time.Time + RTPTimestamp uint32 + RTPTimestampExt uint64 + NTPTimestamp mediatransportutil.NtpTime + At time.Time } type RTPStatsParams struct { @@ -175,10 +198,9 @@ type RTPStats struct { rtt uint32 maxRtt uint32 - srData *RTCPSenderReportData - lastSRTime time.Time - lastSRNTP mediatransportutil.NtpTime - lastSRRTP uint32 + srFirst *RTCPSenderReportData + srNewest *RTCPSenderReportData + pidController *PIDController nextSnapshotId uint32 @@ -280,15 +302,18 @@ func (r *RTPStats) Seed(from *RTPStats) { r.rtt = from.rtt r.maxRtt = from.maxRtt - if from.srData != nil { - srData := *from.srData - r.srData = &srData + if from.srFirst != nil { + srFirst := *from.srFirst + r.srFirst = &srFirst } else { - r.srData = nil + r.srFirst = nil + } + if from.srNewest != nil { + srNewest := *from.srNewest + r.srNewest = &srNewest + } else { + r.srNewest = nil } - r.lastSRTime = from.lastSRTime - r.lastSRNTP = from.lastSRNTP - r.lastSRRTP = from.lastSRRTP r.nextSnapshotId = from.nextSnapshotId for id, ss := range from.snapshots { @@ -425,12 +450,16 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa } r.highestSN = rtph.SequenceNumber - if rtph.Timestamp < r.highestTS && !first { - r.tsCycles++ - } - r.highestTS = rtph.Timestamp + if rtph.Timestamp != r.highestTS { + if rtph.Timestamp < r.highestTS && !first { + r.tsCycles++ + } + r.highestTS = rtph.Timestamp - r.highestTime = packetTime + // update only on first packet as same timestamp could be in multiple packets. + // NOTE: this may not be the first packet with this time stamp if there is packet loss. + r.highestTime = packetTime + } } if !isDuplicate { @@ -514,12 +543,15 @@ func (r *RTPStats) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt uint32 return } - rtt, err := mediatransportutil.GetRttMs(&rr, r.lastSRNTP, r.lastSRTime) - if err == nil { - isRttChanged = rtt != r.rtt - } else { - if !errors.Is(err, mediatransportutil.ErrRttNotLastSenderReport) && !errors.Is(err, mediatransportutil.ErrRttNoLastSenderReport) { - r.logger.Warnw("error getting rtt", err) + var err error + if r.srNewest != nil { + rtt, err = mediatransportutil.GetRttMs(&rr, r.srNewest.NTPTimestamp, r.srNewest.At) + if err == nil { + isRttChanged = rtt != r.rtt + } else { + if !errors.Is(err, mediatransportutil.ErrRttNotLastSenderReport) && !errors.Is(err, mediatransportutil.ErrRttNoLastSenderReport) { + r.logger.Warnw("error getting rtt", err) + } } } @@ -725,79 +757,97 @@ func (r *RTPStats) GetRtt() uint32 { } func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) { - r.lock.Lock() - defer r.lock.Unlock() - if srData == nil { - r.srData = nil return } + r.lock.Lock() + defer r.lock.Unlock() + // prevent against extreme case of anachronous sender reports - if r.srData != nil && r.srData.NTPTimestamp > srData.NTPTimestamp { + if r.srNewest != nil && r.srNewest.NTPTimestamp > srData.NTPTimestamp { r.logger.Infow( "received anachronous sender report", "current", srData.NTPTimestamp.Time(), - "last", r.srData.NTPTimestamp.Time(), + "last", r.srNewest.NTPTimestamp.Time(), ) return } // monitor and log RTP timestamp anomalies - if r.srData != nil { - ntpDiffSinceLast := srData.NTPTimestamp.Time().Sub(r.srData.NTPTimestamp.Time()) - rtpDiffSinceLast := srData.RTPTimestamp - r.srData.RTPTimestamp - arrivalDiffSinceLast := srData.ArrivalTime.Sub(r.srData.ArrivalTime) + var ntpDiffSinceLast time.Duration + var rtpDiffSinceLast uint32 + var arrivalDiffSinceLast time.Duration + var expectedTimeDiffSinceLast float64 + var reason string + if r.srNewest != nil { + ntpDiffSinceLast = srData.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time()) + rtpDiffSinceLast = srData.RTPTimestamp - r.srNewest.RTPTimestamp + arrivalDiffSinceLast = srData.At.Sub(r.srNewest.At) - expectedTimeDiffSinceLast := float64(rtpDiffSinceLast) / float64(r.params.ClockRate) + expectedTimeDiffSinceLast = float64(rtpDiffSinceLast) / float64(r.params.ClockRate) - var reason string - if (srData.RTPTimestamp - r.srData.RTPTimestamp) > (1 << 31) { - reason = "received sender report, out-of-order" + if (srData.RTPTimestamp - r.srNewest.RTPTimestamp) > (1 << 31) { + reason = "received sender report, out-of-order" // should not happen, just a sanity check } else { if math.Abs(expectedTimeDiffSinceLast-ntpDiffSinceLast.Seconds()) > 0.2 { // more than 200 ms away from expected delta reason = "received sender report, time warp" } } + } - if reason != "" { - timeSinceFirst, rtpDiffSinceFirst, drift, driftMs, sampleRate := r.getDrift() - r.logger.Infow( - reason, - "ntp", srData.NTPTimestamp.Time().String(), - "rtp", srData.RTPTimestamp, - "arrival", srData.ArrivalTime.String(), - "ntpDiffSinceLast", ntpDiffSinceLast.Seconds(), - "rtpDiffSinceLast", int32(rtpDiffSinceLast), - "arrivalDiffSinceLast", arrivalDiffSinceLast.Seconds(), - "expectedTimeDiffSinceLast", expectedTimeDiffSinceLast, - "timeSinceFirst", timeSinceFirst.Seconds(), - "rtpDiffSinceFirst", rtpDiffSinceFirst, - "drift", drift, - "driftMs", driftMs, - "sampleRate", sampleRate, - ) + cycles := uint64(0) + if r.srNewest != nil { + cycles = r.srNewest.RTPTimestampExt & 0xFF_FF_FF_FF_00_00_00_00 + if (srData.RTPTimestamp-r.srNewest.RTPTimestamp) < (1<<31) && srData.RTPTimestamp < r.srNewest.RTPTimestamp { + cycles += (1 << 32) } } srDataCopy := *srData - r.srData = &srDataCopy + srDataCopy.RTPTimestampExt = uint64(srDataCopy.RTPTimestamp) + cycles + r.srNewest = &srDataCopy + if r.srFirst == nil { + r.srFirst = &srDataCopy + } + + if reason != "" { + packetDriftResult, reportDriftResult := r.getDrift() + r.logger.Infow( + reason, + "ntp", srData.NTPTimestamp.Time().String(), + "rtp", srData.RTPTimestamp, + "arrival", srData.At.String(), + "ntpDiffSinceLast", ntpDiffSinceLast.Seconds(), + "rtpDiffSinceLast", int32(rtpDiffSinceLast), + "arrivalDiffSinceLast", arrivalDiffSinceLast.Seconds(), + "expectedTimeDiffSinceLast", expectedTimeDiffSinceLast, + "packetDrift", packetDriftResult.String(), + "reportDrift", reportDriftResult.String(), + "highestTS", r.highestTS, + "highestTime", r.highestTime.String(), + ) + } } -func (r *RTPStats) GetRtcpSenderReportData() *RTCPSenderReportData { +func (r *RTPStats) GetRtcpSenderReportData() (srFirst *RTCPSenderReportData, srNewest *RTCPSenderReportData) { r.lock.RLock() defer r.lock.RUnlock() - if r.srData == nil { - return nil + if r.srFirst != nil { + srFirstCopy := *r.srFirst + srFirst = &srFirstCopy } - srDataCopy := *r.srData - return &srDataCopy + if r.srNewest != nil { + srNewestCopy := *r.srNewest + srNewest = &srNewestCopy + } + return } -func (r *RTPStats) GetExpectedRTPTimestamp(at time.Time) (uint32, uint32, error) { +func (r *RTPStats) GetExpectedRTPTimestamp(at time.Time) (uint32, uint64, error) { r.lock.RLock() defer r.lock.RUnlock() @@ -809,9 +859,9 @@ func (r *RTPStats) GetExpectedRTPTimestamp(at time.Time) (uint32, uint32, error) expectedRTPDiff := timeDiff.Nanoseconds() * int64(r.params.ClockRate) / 1e9 expectedExtRTP := r.extStartTS + uint64(expectedRTPDiff) - minTS := r.lastSRRTP - if r.lastSRNTP == 0 { - minTS = uint32(expectedExtRTP) + minTS := ^uint64(0) + if r.srNewest != nil { + minTS = r.srNewest.RTPTimestampExt } r.logger.Debugw( "expected RTP timestamp", @@ -829,7 +879,7 @@ func (r *RTPStats) GetExpectedRTPTimestamp(at time.Time) (uint32, uint32, error) return uint32(expectedExtRTP), minTS, nil } -func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) (*rtcp.SenderReport, float64) { +func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, srFirst *RTCPSenderReportData, srNewest *RTCPSenderReportData) (*rtcp.SenderReport, float64) { r.lock.Lock() defer r.lock.Unlock() @@ -845,7 +895,27 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) (*rtcp.SenderReport, float64 timeSinceHighest := now.Sub(r.highestTime) nowRTP := r.highestTS + uint32(timeSinceHighest.Nanoseconds()*int64(r.params.ClockRate)/1e9) - rtpDiffSinceFirst := getExtTS(nowRTP, r.tsCycles) - r.extStartTS + // It is possible that publisher is pacing at a slower rate. + // That would make `highestTS` to be lagging the RTP time stamp in the RTCP Sender Report from publisher. + // Check for that and use the later time stamp if applicable. + tsCycles := r.tsCycles + if nowRTP < r.highestTS { + tsCycles++ + } + nowRTPExt := getExtTS(nowRTP, tsCycles) + if srFirst != nil && srNewest != nil && srFirst.RTPTimestamp != srNewest.RTPTimestamp { + // use incoming rate as a guide + tsf := srNewest.NTPTimestamp.Time().Sub(srFirst.NTPTimestamp.Time()) + rdsf := srNewest.RTPTimestampExt - srFirst.RTPTimestampExt + sr := float64(rdsf) / tsf.Seconds() + nowRTPExtUsingRate := r.extStartTS + uint64(sr*timeSinceFirst.Seconds()) + if nowRTPExtUsingRate > nowRTPExt { + nowRTPExt = nowRTPExtUsingRate + nowRTP = uint32(nowRTPExtUsingRate) + } + } + + rtpDiffSinceFirst := nowRTPExt - r.extStartTS rate := float64(rtpDiffSinceFirst) / timeSinceFirst.Seconds() pidOutput := r.pidController.Update( float64(r.params.ClockRate), @@ -854,49 +924,50 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) (*rtcp.SenderReport, float64 ) // monitor and log RTP timestamp anomalies - isWarped := false - if r.lastSRNTP != 0 { - ntpDiffSinceLast := nowNTP.Time().Sub(r.lastSRNTP.Time()) - rtpDiffSinceLast := nowRTP - r.lastSRRTP - departureDiffSinceLast := now.Sub(r.lastSRTime) + var ntpDiffSinceLast time.Duration + var rtpDiffSinceLast uint32 + var departureDiffSinceLast time.Duration + var expectedTimeDiffSinceLast float64 + var isWarped bool + if r.srNewest != nil { + ntpDiffSinceLast = nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time()) + rtpDiffSinceLast = nowRTP - r.srNewest.RTPTimestamp + departureDiffSinceLast = now.Sub(r.srNewest.At) - expectedTimeDiffSinceLast := float64(rtpDiffSinceLast) / float64(r.params.ClockRate) + expectedTimeDiffSinceLast = float64(rtpDiffSinceLast) / float64(r.params.ClockRate) if math.Abs(expectedTimeDiffSinceLast-ntpDiffSinceLast.Seconds()) > 0.2 { // more than 200 ms away from expected delta isWarped = true } - - if isWarped { - expectedExtRTP := r.extStartTS + uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9) - ntpDiffLocal := nowNTP.Time().Sub(r.highestTime) - rtpDiffLocal := int32(nowRTP - r.highestTS) - timeSinceFirst, rtpDiffSinceFirst, drift, driftMs, sampleRate := r.getDrift() - r.logger.Infow( - "sending sender report, time warp", - "ntp", nowNTP.Time().String(), - "rtp", nowRTP, - "expectedRTP", uint32(expectedExtRTP), - "departure", now.String(), - "ntpDiffSinceLast", ntpDiffSinceLast.Seconds(), - "rtpDiffSinceLast", rtpDiffSinceLast, - "departureDiffSinceLast", departureDiffSinceLast.Seconds(), - "expectedTimeDiffSinceLast", expectedTimeDiffSinceLast, - "timeSinceFirst", timeSinceFirst.Seconds(), - "rtpDiffSinceFirst", rtpDiffSinceFirst, - "drift", drift, - "driftMs", driftMs, - "sampleRate", sampleRate, - "highestTS", r.highestTS, - "highestTime", r.highestTime.String(), - "rtpDiffLocal", rtpDiffLocal, - "ntpDiffLocal", ntpDiffLocal, - ) - } } - r.lastSRTime = now - r.lastSRNTP = nowNTP - r.lastSRRTP = nowRTP + r.srNewest = &RTCPSenderReportData{ + NTPTimestamp: nowNTP, + RTPTimestamp: nowRTP, + RTPTimestampExt: nowRTPExt, + At: now, + } + if r.srFirst == nil { + r.srFirst = r.srNewest + } + + if isWarped { + packetDriftResult, reportDriftResult := r.getDrift() + r.logger.Infow( + "sending sender report, time warp", + "ntp", nowNTP.Time().String(), + "rtp", nowRTP, + "departure", now.String(), + "ntpDiffSinceLast", ntpDiffSinceLast.Seconds(), + "rtpDiffSinceLast", int32(rtpDiffSinceLast), + "departureDiffSinceLast", departureDiffSinceLast.Seconds(), + "expectedTimeDiffSinceLast", expectedTimeDiffSinceLast, + "packetDrift", packetDriftResult.String(), + "reportDrift", reportDriftResult.String(), + "highestTS", r.highestTS, + "highestTime", r.highestTime.String(), + ) + } return &rtcp.SenderReport{ SSRC: ssrc, @@ -940,15 +1011,15 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8, } var dlsr uint32 - if r.srData != nil && !r.srData.ArrivalTime.IsZero() { - delayMS := uint32(time.Since(r.srData.ArrivalTime).Milliseconds()) + if r.srNewest != nil && !r.srNewest.At.IsZero() { + delayMS := uint32(time.Since(r.srNewest.At).Milliseconds()) dlsr = (delayMS / 1e3) << 16 dlsr |= (delayMS % 1e3) * 65536 / 1000 } lastSR := uint32(0) - if r.srData != nil { - lastSR = uint32(r.srData.NTPTimestamp >> 16) + if r.srNewest != nil { + lastSR = uint32(r.srNewest.NTPTimestamp >> 16) } return &rtcp.ReceptionReport{ SSRC: ssrc, @@ -1218,7 +1289,7 @@ func (r *RTPStats) ToProto() *livekit.RTPStats { jitterTime := jitter / float64(r.params.ClockRate) * 1e6 maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6 - _, _, _, driftMs, sampleRate := r.getDrift() + packetDrift, _ := r.getDrift() p := &livekit.RTPStats{ StartTime: timestamppb.New(r.startTime), @@ -1261,8 +1332,8 @@ func (r *RTPStats) ToProto() *livekit.RTPStats { LastFir: timestamppb.New(r.lastFir), RttCurrent: r.rtt, RttMax: r.maxRtt, - DriftMs: driftMs, - SampleRate: sampleRate, + DriftMs: packetDrift.driftMs, + SampleRate: packetDrift.sampleRate, } gapsPresent := false @@ -1456,12 +1527,20 @@ func (r *RTPStats) updateJitter(rtph *rtp.Header, packetTime time.Time) { r.lastJitterRTP = rtph.Timestamp } -func (r *RTPStats) getDrift() (timeSinceFirst time.Duration, rtpDiffSinceFirst uint64, drift int64, driftMs float64, sampleRate float64) { - timeSinceFirst = r.highestTime.Sub(r.firstTime) - rtpDiffSinceFirst = getExtTS(r.highestTS, r.tsCycles) - r.extStartTS - drift = int64(rtpDiffSinceFirst - uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9)) - driftMs = (float64(drift) * 1000) / float64(r.params.ClockRate) - sampleRate = float64(rtpDiffSinceFirst) / timeSinceFirst.Seconds() +func (r *RTPStats) getDrift() (packetDrift driftResult, reportDrift driftResult) { + packetDrift.timeSinceFirst = r.highestTime.Sub(r.firstTime) + packetDrift.rtpDiffSinceFirst = getExtTS(r.highestTS, r.tsCycles) - r.extStartTS + packetDrift.driftSamples = int64(packetDrift.rtpDiffSinceFirst - uint64(packetDrift.timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9)) + packetDrift.driftMs = (float64(packetDrift.driftSamples) * 1000) / float64(r.params.ClockRate) + packetDrift.sampleRate = float64(packetDrift.rtpDiffSinceFirst) / packetDrift.timeSinceFirst.Seconds() + + if r.srFirst != nil && r.srNewest != nil && r.srFirst.RTPTimestamp != r.srNewest.RTPTimestamp { + reportDrift.timeSinceFirst = r.srNewest.NTPTimestamp.Time().Sub(r.srFirst.NTPTimestamp.Time()) + reportDrift.rtpDiffSinceFirst = r.srNewest.RTPTimestampExt - r.srFirst.RTPTimestampExt + reportDrift.driftSamples = int64(reportDrift.rtpDiffSinceFirst - uint64(reportDrift.timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9)) + reportDrift.driftMs = (float64(reportDrift.driftSamples) * 1000) / float64(r.params.ClockRate) + reportDrift.sampleRate = float64(reportDrift.rtpDiffSinceFirst) / reportDrift.timeSinceFirst.Seconds() + } return } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index c072645c6..edd0f9149 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -1116,7 +1116,8 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport { return nil } - sr, tsAdjust := d.rtpStats.GetRtcpSenderReport(d.ssrc) + srFirst, srNewest := d.receiver.GetRTCPSenderReportData(d.forwarder.GetReferenceLayerSpatial()) + sr, tsAdjust := d.rtpStats.GetRtcpSenderReport(d.ssrc, srFirst, srNewest) if d.allowTimestampAdjustment { d.forwarder.AdjustTimestamp(tsAdjust) } @@ -1636,7 +1637,7 @@ func (d *DownTrack) DebugInfo() map[string]interface{} { } } -func (d *DownTrack) getExpectedRTPTimestamp(at time.Time) (uint32, uint32, error) { +func (d *DownTrack) getExpectedRTPTimestamp(at time.Time) (uint32, uint64, error) { return d.rtpStats.GetExpectedRTPTimestamp(at) } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index ebede6e0e..b4f373c39 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -170,7 +170,7 @@ type Forwarder struct { kind webrtc.RTPCodecType logger logger.Logger getReferenceLayerRTPTimestamp func(ts uint32, layer int32, referenceLayer int32) (uint32, error) - getExpectedRTPTimestamp func(at time.Time) (uint32, uint32, error) + getExpectedRTPTimestamp func(at time.Time) (uint32, uint64, error) muted bool pubMuted bool @@ -201,7 +201,7 @@ func NewForwarder( kind webrtc.RTPCodecType, logger logger.Logger, getReferenceLayerRTPTimestamp func(ts uint32, layer int32, referenceLayer int32) (uint32, error), - getExpectedRTPTimestamp func(at time.Time) (uint32, uint32, error), + getExpectedRTPTimestamp func(at time.Time) (uint32, uint64, error), ) *Forwarder { f := &Forwarder{ kind: kind, @@ -488,6 +488,13 @@ func (f *Forwarder) TargetLayer() buffer.VideoLayer { return f.vls.GetTarget() } +func (f *Forwarder) GetReferenceLayerSpatial() int32 { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.referenceLayerSpatial +} + func (f *Forwarder) isDeficientLocked() bool { return f.lastAllocation.IsDeficient } @@ -1475,7 +1482,7 @@ func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer i lastTS := f.rtpMunger.GetLast().LastTS refTS := lastTS expectedTS := lastTS - minTS := lastTS + minTS := uint64(lastTS) switchingAt := time.Now() if f.getReferenceLayerRTPTimestamp != nil { ts, err := f.getReferenceLayerRTPTimestamp(extPkt.Packet.Timestamp, layer, f.referenceLayerSpatial) @@ -1671,7 +1678,7 @@ func (f *Forwarder) GetSnTsForBlankFrames(frameRate uint32, numPackets int) ([]S lastTS := f.rtpMunger.GetLast().LastTS expectedTS := lastTS - minTS := lastTS + minTS := uint64(lastTS) if f.getExpectedRTPTimestamp != nil { ts, min, err := f.getExpectedRTPTimestamp(time.Now()) if err == nil { @@ -1826,7 +1833,7 @@ done: return float64(distance) / float64(maxSeenLayer.Temporal+1) } -func getNextTimestamp(lastTS uint32, refTS uint32, expectedTS uint32, minTS uint32) (uint32, string) { +func getNextTimestamp(lastTS uint32, refTS uint32, expectedTS uint32, minTS uint64) (uint32, string) { isInOrder := func(val1, val2 uint32) bool { diff := val1 - val2 return diff != 0 && diff < (1<<31) @@ -1860,8 +1867,8 @@ func getNextTimestamp(lastTS uint32, refTS uint32, expectedTS uint32, minTS uint explain = fmt.Sprintf("e < r < l, %d, %d", refTS-expectedTS, lastTS-refTS) } - if !isInOrder(nextTS, minTS) { - nextTS = minTS + 1 + if minTS != ^uint64(0) && !isInOrder(nextTS, uint32(minTS)) { + nextTS = uint32(minTS) + 1 } return nextTS, explain diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 496e90ad0..eda4f7412 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -65,6 +65,7 @@ type TrackReceiver interface { GetTemporalLayerFpsForSpatial(layer int32) []float32 + GetRTCPSenderReportData(layer int32) (*buffer.RTCPSenderReportData, *buffer.RTCPSenderReportData) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) } @@ -309,7 +310,8 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff }) buff.OnRtcpFeedback(w.sendRTCP) buff.OnRtcpSenderReport(func(srData *buffer.RTCPSenderReportData) { - w.streamTrackerManager.SetRTCPSenderReportData(layer, buff.GetSenderReportData()) + srFirst, srNewest := buff.GetSenderReportData() + w.streamTrackerManager.SetRTCPSenderReportData(layer, srFirst, srNewest) w.downTrackSpreader.Broadcast(func(dt TrackSender) { _ = dt.HandleRTCPSenderReportData(w.codec.PayloadType, layer, srData) @@ -744,6 +746,10 @@ func (w *WebRTCReceiver) GetTemporalLayerFpsForSpatial(layer int32) []float32 { return b.GetTemporalLayerFpsForSpatial(layer) } +func (w *WebRTCReceiver) GetRTCPSenderReportData(layer int32) (*buffer.RTCPSenderReportData, *buffer.RTCPSenderReportData) { + return w.streamTrackerManager.GetRTCPSenderReportData(layer) +} + func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) { return w.streamTrackerManager.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer) } diff --git a/pkg/sfu/streamtrackermanager.go b/pkg/sfu/streamtrackermanager.go index 6b79914f9..cdb42f4ab 100644 --- a/pkg/sfu/streamtrackermanager.go +++ b/pkg/sfu/streamtrackermanager.go @@ -24,6 +24,11 @@ type StreamTrackerManagerListener interface { OnBitrateReport(availableLayers []int32, bitrates Bitrates) } +type endsSenderReport struct { + first *buffer.RTCPSenderReportData + newest *buffer.RTCPSenderReportData +} + type StreamTrackerManager struct { logger logger.Logger trackInfo *livekit.TrackInfo @@ -43,7 +48,7 @@ type StreamTrackerManager struct { paused bool senderReportMu sync.RWMutex - senderReports [buffer.DefaultMaxLayerSpatial + 1]*buffer.RTCPSenderReportData + senderReports [buffer.DefaultMaxLayerSpatial + 1]endsSenderReport closed core.Fuse @@ -475,7 +480,7 @@ func (s *StreamTrackerManager) maxExpectedLayerFromTrackInfo() { } } -func (s *StreamTrackerManager) SetRTCPSenderReportData(layer int32, senderReport *buffer.RTCPSenderReportData) { +func (s *StreamTrackerManager) SetRTCPSenderReportData(layer int32, srFirst *buffer.RTCPSenderReportData, srNewest *buffer.RTCPSenderReportData) { s.senderReportMu.Lock() defer s.senderReportMu.Unlock() @@ -483,7 +488,19 @@ func (s *StreamTrackerManager) SetRTCPSenderReportData(layer int32, senderReport return } - s.senderReports[layer] = senderReport + s.senderReports[layer].first = srFirst + s.senderReports[layer].newest = srNewest +} + +func (s *StreamTrackerManager) GetRTCPSenderReportData(layer int32) (*buffer.RTCPSenderReportData, *buffer.RTCPSenderReportData) { + s.senderReportMu.RLock() + defer s.senderReportMu.RUnlock() + + if layer < 0 || int(layer) >= len(s.senderReports) { + return nil, nil + } + + return s.senderReports[layer].first, s.senderReports[layer].newest } func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) { @@ -502,7 +519,7 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer in var srLayer *buffer.RTCPSenderReportData if int(layer) < len(s.senderReports) { - srLayer = s.senderReports[layer] + srLayer = s.senderReports[layer].newest } if srLayer == nil || srLayer.NTPTimestamp == 0 { return 0, fmt.Errorf("layer rtcp sender report not available: %d", layer) @@ -510,7 +527,7 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer in var srRef *buffer.RTCPSenderReportData if int(referenceLayer) < len(s.senderReports) { - srRef = s.senderReports[referenceLayer] + srRef = s.senderReports[referenceLayer].newest } if srRef == nil || srRef.NTPTimestamp == 0 { return 0, fmt.Errorf("reference layer rtcp sender report not available: %d", referenceLayer)