diff --git a/pkg/rtc/wrappedreceiver.go b/pkg/rtc/wrappedreceiver.go index 068f8b69d..84d4d9910 100644 --- a/pkg/rtc/wrappedreceiver.go +++ b/pkg/rtc/wrappedreceiver.go @@ -12,6 +12,7 @@ import ( "github.com/livekit/protocol/logger" "github.com/livekit/livekit-server/pkg/sfu" + "github.com/livekit/livekit-server/pkg/sfu/buffer" ) // wrapper around WebRTC receiver, overriding its ID @@ -288,6 +289,13 @@ func (d *DummyReceiver) GetRedReceiver() sfu.TrackReceiver { return d } +func (d *DummyReceiver) GetRTCPSenderReportData(layer int32) (*buffer.RTCPSenderReportData, *buffer.RTCPSenderReportData) { + if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok { + return r.GetRTCPSenderReportData(layer) + } + return nil, nil +} + func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) { if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok { return r.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer) diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index 63949a52c..575f2d433 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -654,7 +654,7 @@ func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) { srData := &RTCPSenderReportData{ RTPTimestamp: rtpTime, NTPTimestamp: mediatransportutil.NtpTime(ntpTime), - ArrivalTime: time.Now(), + At: time.Now(), } b.RLock() @@ -668,7 +668,7 @@ func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) { } } -func (b *Buffer) GetSenderReportData() *RTCPSenderReportData { +func (b *Buffer) GetSenderReportData() (*RTCPSenderReportData, *RTCPSenderReportData) { b.RLock() defer b.RUnlock() @@ -676,7 +676,7 @@ func (b *Buffer) GetSenderReportData() *RTCPSenderReportData { return b.rtpStats.GetRtcpSenderReportData() } - return nil + return nil, nil } func (b *Buffer) SetLastFractionLostReport(lost uint8) { diff --git a/pkg/sfu/buffer/rtpstats.go b/pkg/sfu/buffer/rtpstats.go index 0aab6e16a..d650f33a0 100644 --- a/pkg/sfu/buffer/rtpstats.go +++ b/pkg/sfu/buffer/rtpstats.go @@ -25,6 +25,28 @@ const ( TooLargeOWDDelta = 400 * time.Millisecond ) +// ------------------------------------------------------- + +type driftResult struct { + timeSinceFirst time.Duration + rtpDiffSinceFirst uint64 + driftSamples int64 + driftMs float64 + sampleRate float64 +} + +func (d driftResult) String() string { + return fmt.Sprintf("time: %+v, rtp: %d, driftSamples: %d, driftMs: %.02f, sampleRate: %.02f", + d.timeSinceFirst, + d.rtpDiffSinceFirst, + d.driftSamples, + d.driftMs, + d.sampleRate, + ) +} + +// ------------------------------------------------------- + type RTPFlowState struct { HasLoss bool LossStartInclusive uint16 @@ -88,9 +110,10 @@ type SnInfo struct { } type RTCPSenderReportData struct { - RTPTimestamp uint32 - NTPTimestamp mediatransportutil.NtpTime - ArrivalTime time.Time + RTPTimestamp uint32 + RTPTimestampExt uint64 + NTPTimestamp mediatransportutil.NtpTime + At time.Time } type RTPStatsParams struct { @@ -175,10 +198,9 @@ type RTPStats struct { rtt uint32 maxRtt uint32 - srData *RTCPSenderReportData - lastSRTime time.Time - lastSRNTP mediatransportutil.NtpTime - lastSRRTP uint32 + srFirst *RTCPSenderReportData + srNewest *RTCPSenderReportData + pidController *PIDController nextSnapshotId uint32 @@ -280,15 +302,18 @@ func (r *RTPStats) Seed(from *RTPStats) { r.rtt = from.rtt r.maxRtt = from.maxRtt - if from.srData != nil { - srData := *from.srData - r.srData = &srData + if from.srFirst != nil { + srFirst := *from.srFirst + r.srFirst = &srFirst } else { - r.srData = nil + r.srFirst = nil + } + if from.srNewest != nil { + srNewest := *from.srNewest + r.srNewest = &srNewest + } else { + r.srNewest = nil } - r.lastSRTime = from.lastSRTime - r.lastSRNTP = from.lastSRNTP - r.lastSRRTP = from.lastSRRTP r.nextSnapshotId = from.nextSnapshotId for id, ss := range from.snapshots { @@ -425,12 +450,16 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa } r.highestSN = rtph.SequenceNumber - if rtph.Timestamp < r.highestTS && !first { - r.tsCycles++ - } - r.highestTS = rtph.Timestamp + if rtph.Timestamp != r.highestTS { + if rtph.Timestamp < r.highestTS && !first { + r.tsCycles++ + } + r.highestTS = rtph.Timestamp - r.highestTime = packetTime + // update only on first packet as same timestamp could be in multiple packets. + // NOTE: this may not be the first packet with this time stamp if there is packet loss. + r.highestTime = packetTime + } } if !isDuplicate { @@ -514,12 +543,15 @@ func (r *RTPStats) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt uint32 return } - rtt, err := mediatransportutil.GetRttMs(&rr, r.lastSRNTP, r.lastSRTime) - if err == nil { - isRttChanged = rtt != r.rtt - } else { - if !errors.Is(err, mediatransportutil.ErrRttNotLastSenderReport) && !errors.Is(err, mediatransportutil.ErrRttNoLastSenderReport) { - r.logger.Warnw("error getting rtt", err) + var err error + if r.srNewest != nil { + rtt, err = mediatransportutil.GetRttMs(&rr, r.srNewest.NTPTimestamp, r.srNewest.At) + if err == nil { + isRttChanged = rtt != r.rtt + } else { + if !errors.Is(err, mediatransportutil.ErrRttNotLastSenderReport) && !errors.Is(err, mediatransportutil.ErrRttNoLastSenderReport) { + r.logger.Warnw("error getting rtt", err) + } } } @@ -725,79 +757,97 @@ func (r *RTPStats) GetRtt() uint32 { } func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) { - r.lock.Lock() - defer r.lock.Unlock() - if srData == nil { - r.srData = nil return } + r.lock.Lock() + defer r.lock.Unlock() + // prevent against extreme case of anachronous sender reports - if r.srData != nil && r.srData.NTPTimestamp > srData.NTPTimestamp { + if r.srNewest != nil && r.srNewest.NTPTimestamp > srData.NTPTimestamp { r.logger.Infow( "received anachronous sender report", "current", srData.NTPTimestamp.Time(), - "last", r.srData.NTPTimestamp.Time(), + "last", r.srNewest.NTPTimestamp.Time(), ) return } // monitor and log RTP timestamp anomalies - if r.srData != nil { - ntpDiffSinceLast := srData.NTPTimestamp.Time().Sub(r.srData.NTPTimestamp.Time()) - rtpDiffSinceLast := srData.RTPTimestamp - r.srData.RTPTimestamp - arrivalDiffSinceLast := srData.ArrivalTime.Sub(r.srData.ArrivalTime) + var ntpDiffSinceLast time.Duration + var rtpDiffSinceLast uint32 + var arrivalDiffSinceLast time.Duration + var expectedTimeDiffSinceLast float64 + var reason string + if r.srNewest != nil { + ntpDiffSinceLast = srData.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time()) + rtpDiffSinceLast = srData.RTPTimestamp - r.srNewest.RTPTimestamp + arrivalDiffSinceLast = srData.At.Sub(r.srNewest.At) - expectedTimeDiffSinceLast := float64(rtpDiffSinceLast) / float64(r.params.ClockRate) + expectedTimeDiffSinceLast = float64(rtpDiffSinceLast) / float64(r.params.ClockRate) - var reason string - if (srData.RTPTimestamp - r.srData.RTPTimestamp) > (1 << 31) { - reason = "received sender report, out-of-order" + if (srData.RTPTimestamp - r.srNewest.RTPTimestamp) > (1 << 31) { + reason = "received sender report, out-of-order" // should not happen, just a sanity check } else { if math.Abs(expectedTimeDiffSinceLast-ntpDiffSinceLast.Seconds()) > 0.2 { // more than 200 ms away from expected delta reason = "received sender report, time warp" } } + } - if reason != "" { - timeSinceFirst, rtpDiffSinceFirst, drift, driftMs, sampleRate := r.getDrift() - r.logger.Infow( - reason, - "ntp", srData.NTPTimestamp.Time().String(), - "rtp", srData.RTPTimestamp, - "arrival", srData.ArrivalTime.String(), - "ntpDiffSinceLast", ntpDiffSinceLast.Seconds(), - "rtpDiffSinceLast", int32(rtpDiffSinceLast), - "arrivalDiffSinceLast", arrivalDiffSinceLast.Seconds(), - "expectedTimeDiffSinceLast", expectedTimeDiffSinceLast, - "timeSinceFirst", timeSinceFirst.Seconds(), - "rtpDiffSinceFirst", rtpDiffSinceFirst, - "drift", drift, - "driftMs", driftMs, - "sampleRate", sampleRate, - ) + cycles := uint64(0) + if r.srNewest != nil { + cycles = r.srNewest.RTPTimestampExt & 0xFF_FF_FF_FF_00_00_00_00 + if (srData.RTPTimestamp-r.srNewest.RTPTimestamp) < (1<<31) && srData.RTPTimestamp < r.srNewest.RTPTimestamp { + cycles += (1 << 32) } } srDataCopy := *srData - r.srData = &srDataCopy + srDataCopy.RTPTimestampExt = uint64(srDataCopy.RTPTimestamp) + cycles + r.srNewest = &srDataCopy + if r.srFirst == nil { + r.srFirst = &srDataCopy + } + + if reason != "" { + packetDriftResult, reportDriftResult := r.getDrift() + r.logger.Infow( + reason, + "ntp", srData.NTPTimestamp.Time().String(), + "rtp", srData.RTPTimestamp, + "arrival", srData.At.String(), + "ntpDiffSinceLast", ntpDiffSinceLast.Seconds(), + "rtpDiffSinceLast", int32(rtpDiffSinceLast), + "arrivalDiffSinceLast", arrivalDiffSinceLast.Seconds(), + "expectedTimeDiffSinceLast", expectedTimeDiffSinceLast, + "packetDrift", packetDriftResult.String(), + "reportDrift", reportDriftResult.String(), + "highestTS", r.highestTS, + "highestTime", r.highestTime.String(), + ) + } } -func (r *RTPStats) GetRtcpSenderReportData() *RTCPSenderReportData { +func (r *RTPStats) GetRtcpSenderReportData() (srFirst *RTCPSenderReportData, srNewest *RTCPSenderReportData) { r.lock.RLock() defer r.lock.RUnlock() - if r.srData == nil { - return nil + if r.srFirst != nil { + srFirstCopy := *r.srFirst + srFirst = &srFirstCopy } - srDataCopy := *r.srData - return &srDataCopy + if r.srNewest != nil { + srNewestCopy := *r.srNewest + srNewest = &srNewestCopy + } + return } -func (r *RTPStats) GetExpectedRTPTimestamp(at time.Time) (uint32, uint32, error) { +func (r *RTPStats) GetExpectedRTPTimestamp(at time.Time) (uint32, uint64, error) { r.lock.RLock() defer r.lock.RUnlock() @@ -809,9 +859,9 @@ func (r *RTPStats) GetExpectedRTPTimestamp(at time.Time) (uint32, uint32, error) expectedRTPDiff := timeDiff.Nanoseconds() * int64(r.params.ClockRate) / 1e9 expectedExtRTP := r.extStartTS + uint64(expectedRTPDiff) - minTS := r.lastSRRTP - if r.lastSRNTP == 0 { - minTS = uint32(expectedExtRTP) + minTS := ^uint64(0) + if r.srNewest != nil { + minTS = r.srNewest.RTPTimestampExt } r.logger.Debugw( "expected RTP timestamp", @@ -829,7 +879,7 @@ func (r *RTPStats) GetExpectedRTPTimestamp(at time.Time) (uint32, uint32, error) return uint32(expectedExtRTP), minTS, nil } -func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) (*rtcp.SenderReport, float64) { +func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, srFirst *RTCPSenderReportData, srNewest *RTCPSenderReportData) (*rtcp.SenderReport, float64) { r.lock.Lock() defer r.lock.Unlock() @@ -845,7 +895,27 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) (*rtcp.SenderReport, float64 timeSinceHighest := now.Sub(r.highestTime) nowRTP := r.highestTS + uint32(timeSinceHighest.Nanoseconds()*int64(r.params.ClockRate)/1e9) - rtpDiffSinceFirst := getExtTS(nowRTP, r.tsCycles) - r.extStartTS + // It is possible that publisher is pacing at a slower rate. + // That would make `highestTS` to be lagging the RTP time stamp in the RTCP Sender Report from publisher. + // Check for that and use the later time stamp if applicable. + tsCycles := r.tsCycles + if nowRTP < r.highestTS { + tsCycles++ + } + nowRTPExt := getExtTS(nowRTP, tsCycles) + if srFirst != nil && srNewest != nil && srFirst.RTPTimestamp != srNewest.RTPTimestamp { + // use incoming rate as a guide + tsf := srNewest.NTPTimestamp.Time().Sub(srFirst.NTPTimestamp.Time()) + rdsf := srNewest.RTPTimestampExt - srFirst.RTPTimestampExt + sr := float64(rdsf) / tsf.Seconds() + nowRTPExtUsingRate := r.extStartTS + uint64(sr*timeSinceFirst.Seconds()) + if nowRTPExtUsingRate > nowRTPExt { + nowRTPExt = nowRTPExtUsingRate + nowRTP = uint32(nowRTPExtUsingRate) + } + } + + rtpDiffSinceFirst := nowRTPExt - r.extStartTS rate := float64(rtpDiffSinceFirst) / timeSinceFirst.Seconds() pidOutput := r.pidController.Update( float64(r.params.ClockRate), @@ -854,49 +924,50 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) (*rtcp.SenderReport, float64 ) // monitor and log RTP timestamp anomalies - isWarped := false - if r.lastSRNTP != 0 { - ntpDiffSinceLast := nowNTP.Time().Sub(r.lastSRNTP.Time()) - rtpDiffSinceLast := nowRTP - r.lastSRRTP - departureDiffSinceLast := now.Sub(r.lastSRTime) + var ntpDiffSinceLast time.Duration + var rtpDiffSinceLast uint32 + var departureDiffSinceLast time.Duration + var expectedTimeDiffSinceLast float64 + var isWarped bool + if r.srNewest != nil { + ntpDiffSinceLast = nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time()) + rtpDiffSinceLast = nowRTP - r.srNewest.RTPTimestamp + departureDiffSinceLast = now.Sub(r.srNewest.At) - expectedTimeDiffSinceLast := float64(rtpDiffSinceLast) / float64(r.params.ClockRate) + expectedTimeDiffSinceLast = float64(rtpDiffSinceLast) / float64(r.params.ClockRate) if math.Abs(expectedTimeDiffSinceLast-ntpDiffSinceLast.Seconds()) > 0.2 { // more than 200 ms away from expected delta isWarped = true } - - if isWarped { - expectedExtRTP := r.extStartTS + uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9) - ntpDiffLocal := nowNTP.Time().Sub(r.highestTime) - rtpDiffLocal := int32(nowRTP - r.highestTS) - timeSinceFirst, rtpDiffSinceFirst, drift, driftMs, sampleRate := r.getDrift() - r.logger.Infow( - "sending sender report, time warp", - "ntp", nowNTP.Time().String(), - "rtp", nowRTP, - "expectedRTP", uint32(expectedExtRTP), - "departure", now.String(), - "ntpDiffSinceLast", ntpDiffSinceLast.Seconds(), - "rtpDiffSinceLast", rtpDiffSinceLast, - "departureDiffSinceLast", departureDiffSinceLast.Seconds(), - "expectedTimeDiffSinceLast", expectedTimeDiffSinceLast, - "timeSinceFirst", timeSinceFirst.Seconds(), - "rtpDiffSinceFirst", rtpDiffSinceFirst, - "drift", drift, - "driftMs", driftMs, - "sampleRate", sampleRate, - "highestTS", r.highestTS, - "highestTime", r.highestTime.String(), - "rtpDiffLocal", rtpDiffLocal, - "ntpDiffLocal", ntpDiffLocal, - ) - } } - r.lastSRTime = now - r.lastSRNTP = nowNTP - r.lastSRRTP = nowRTP + r.srNewest = &RTCPSenderReportData{ + NTPTimestamp: nowNTP, + RTPTimestamp: nowRTP, + RTPTimestampExt: nowRTPExt, + At: now, + } + if r.srFirst == nil { + r.srFirst = r.srNewest + } + + if isWarped { + packetDriftResult, reportDriftResult := r.getDrift() + r.logger.Infow( + "sending sender report, time warp", + "ntp", nowNTP.Time().String(), + "rtp", nowRTP, + "departure", now.String(), + "ntpDiffSinceLast", ntpDiffSinceLast.Seconds(), + "rtpDiffSinceLast", int32(rtpDiffSinceLast), + "departureDiffSinceLast", departureDiffSinceLast.Seconds(), + "expectedTimeDiffSinceLast", expectedTimeDiffSinceLast, + "packetDrift", packetDriftResult.String(), + "reportDrift", reportDriftResult.String(), + "highestTS", r.highestTS, + "highestTime", r.highestTime.String(), + ) + } return &rtcp.SenderReport{ SSRC: ssrc, @@ -940,15 +1011,15 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8, } var dlsr uint32 - if r.srData != nil && !r.srData.ArrivalTime.IsZero() { - delayMS := uint32(time.Since(r.srData.ArrivalTime).Milliseconds()) + if r.srNewest != nil && !r.srNewest.At.IsZero() { + delayMS := uint32(time.Since(r.srNewest.At).Milliseconds()) dlsr = (delayMS / 1e3) << 16 dlsr |= (delayMS % 1e3) * 65536 / 1000 } lastSR := uint32(0) - if r.srData != nil { - lastSR = uint32(r.srData.NTPTimestamp >> 16) + if r.srNewest != nil { + lastSR = uint32(r.srNewest.NTPTimestamp >> 16) } return &rtcp.ReceptionReport{ SSRC: ssrc, @@ -1218,7 +1289,7 @@ func (r *RTPStats) ToProto() *livekit.RTPStats { jitterTime := jitter / float64(r.params.ClockRate) * 1e6 maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6 - _, _, _, driftMs, sampleRate := r.getDrift() + packetDrift, _ := r.getDrift() p := &livekit.RTPStats{ StartTime: timestamppb.New(r.startTime), @@ -1261,8 +1332,8 @@ func (r *RTPStats) ToProto() *livekit.RTPStats { LastFir: timestamppb.New(r.lastFir), RttCurrent: r.rtt, RttMax: r.maxRtt, - DriftMs: driftMs, - SampleRate: sampleRate, + DriftMs: packetDrift.driftMs, + SampleRate: packetDrift.sampleRate, } gapsPresent := false @@ -1456,12 +1527,20 @@ func (r *RTPStats) updateJitter(rtph *rtp.Header, packetTime time.Time) { r.lastJitterRTP = rtph.Timestamp } -func (r *RTPStats) getDrift() (timeSinceFirst time.Duration, rtpDiffSinceFirst uint64, drift int64, driftMs float64, sampleRate float64) { - timeSinceFirst = r.highestTime.Sub(r.firstTime) - rtpDiffSinceFirst = getExtTS(r.highestTS, r.tsCycles) - r.extStartTS - drift = int64(rtpDiffSinceFirst - uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9)) - driftMs = (float64(drift) * 1000) / float64(r.params.ClockRate) - sampleRate = float64(rtpDiffSinceFirst) / timeSinceFirst.Seconds() +func (r *RTPStats) getDrift() (packetDrift driftResult, reportDrift driftResult) { + packetDrift.timeSinceFirst = r.highestTime.Sub(r.firstTime) + packetDrift.rtpDiffSinceFirst = getExtTS(r.highestTS, r.tsCycles) - r.extStartTS + packetDrift.driftSamples = int64(packetDrift.rtpDiffSinceFirst - uint64(packetDrift.timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9)) + packetDrift.driftMs = (float64(packetDrift.driftSamples) * 1000) / float64(r.params.ClockRate) + packetDrift.sampleRate = float64(packetDrift.rtpDiffSinceFirst) / packetDrift.timeSinceFirst.Seconds() + + if r.srFirst != nil && r.srNewest != nil && r.srFirst.RTPTimestamp != r.srNewest.RTPTimestamp { + reportDrift.timeSinceFirst = r.srNewest.NTPTimestamp.Time().Sub(r.srFirst.NTPTimestamp.Time()) + reportDrift.rtpDiffSinceFirst = r.srNewest.RTPTimestampExt - r.srFirst.RTPTimestampExt + reportDrift.driftSamples = int64(reportDrift.rtpDiffSinceFirst - uint64(reportDrift.timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9)) + reportDrift.driftMs = (float64(reportDrift.driftSamples) * 1000) / float64(r.params.ClockRate) + reportDrift.sampleRate = float64(reportDrift.rtpDiffSinceFirst) / reportDrift.timeSinceFirst.Seconds() + } return } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index c072645c6..edd0f9149 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -1116,7 +1116,8 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport { return nil } - sr, tsAdjust := d.rtpStats.GetRtcpSenderReport(d.ssrc) + srFirst, srNewest := d.receiver.GetRTCPSenderReportData(d.forwarder.GetReferenceLayerSpatial()) + sr, tsAdjust := d.rtpStats.GetRtcpSenderReport(d.ssrc, srFirst, srNewest) if d.allowTimestampAdjustment { d.forwarder.AdjustTimestamp(tsAdjust) } @@ -1636,7 +1637,7 @@ func (d *DownTrack) DebugInfo() map[string]interface{} { } } -func (d *DownTrack) getExpectedRTPTimestamp(at time.Time) (uint32, uint32, error) { +func (d *DownTrack) getExpectedRTPTimestamp(at time.Time) (uint32, uint64, error) { return d.rtpStats.GetExpectedRTPTimestamp(at) } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index ebede6e0e..b4f373c39 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -170,7 +170,7 @@ type Forwarder struct { kind webrtc.RTPCodecType logger logger.Logger getReferenceLayerRTPTimestamp func(ts uint32, layer int32, referenceLayer int32) (uint32, error) - getExpectedRTPTimestamp func(at time.Time) (uint32, uint32, error) + getExpectedRTPTimestamp func(at time.Time) (uint32, uint64, error) muted bool pubMuted bool @@ -201,7 +201,7 @@ func NewForwarder( kind webrtc.RTPCodecType, logger logger.Logger, getReferenceLayerRTPTimestamp func(ts uint32, layer int32, referenceLayer int32) (uint32, error), - getExpectedRTPTimestamp func(at time.Time) (uint32, uint32, error), + getExpectedRTPTimestamp func(at time.Time) (uint32, uint64, error), ) *Forwarder { f := &Forwarder{ kind: kind, @@ -488,6 +488,13 @@ func (f *Forwarder) TargetLayer() buffer.VideoLayer { return f.vls.GetTarget() } +func (f *Forwarder) GetReferenceLayerSpatial() int32 { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.referenceLayerSpatial +} + func (f *Forwarder) isDeficientLocked() bool { return f.lastAllocation.IsDeficient } @@ -1475,7 +1482,7 @@ func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer i lastTS := f.rtpMunger.GetLast().LastTS refTS := lastTS expectedTS := lastTS - minTS := lastTS + minTS := uint64(lastTS) switchingAt := time.Now() if f.getReferenceLayerRTPTimestamp != nil { ts, err := f.getReferenceLayerRTPTimestamp(extPkt.Packet.Timestamp, layer, f.referenceLayerSpatial) @@ -1671,7 +1678,7 @@ func (f *Forwarder) GetSnTsForBlankFrames(frameRate uint32, numPackets int) ([]S lastTS := f.rtpMunger.GetLast().LastTS expectedTS := lastTS - minTS := lastTS + minTS := uint64(lastTS) if f.getExpectedRTPTimestamp != nil { ts, min, err := f.getExpectedRTPTimestamp(time.Now()) if err == nil { @@ -1826,7 +1833,7 @@ done: return float64(distance) / float64(maxSeenLayer.Temporal+1) } -func getNextTimestamp(lastTS uint32, refTS uint32, expectedTS uint32, minTS uint32) (uint32, string) { +func getNextTimestamp(lastTS uint32, refTS uint32, expectedTS uint32, minTS uint64) (uint32, string) { isInOrder := func(val1, val2 uint32) bool { diff := val1 - val2 return diff != 0 && diff < (1<<31) @@ -1860,8 +1867,8 @@ func getNextTimestamp(lastTS uint32, refTS uint32, expectedTS uint32, minTS uint explain = fmt.Sprintf("e < r < l, %d, %d", refTS-expectedTS, lastTS-refTS) } - if !isInOrder(nextTS, minTS) { - nextTS = minTS + 1 + if minTS != ^uint64(0) && !isInOrder(nextTS, uint32(minTS)) { + nextTS = uint32(minTS) + 1 } return nextTS, explain diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 496e90ad0..eda4f7412 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -65,6 +65,7 @@ type TrackReceiver interface { GetTemporalLayerFpsForSpatial(layer int32) []float32 + GetRTCPSenderReportData(layer int32) (*buffer.RTCPSenderReportData, *buffer.RTCPSenderReportData) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) } @@ -309,7 +310,8 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff }) buff.OnRtcpFeedback(w.sendRTCP) buff.OnRtcpSenderReport(func(srData *buffer.RTCPSenderReportData) { - w.streamTrackerManager.SetRTCPSenderReportData(layer, buff.GetSenderReportData()) + srFirst, srNewest := buff.GetSenderReportData() + w.streamTrackerManager.SetRTCPSenderReportData(layer, srFirst, srNewest) w.downTrackSpreader.Broadcast(func(dt TrackSender) { _ = dt.HandleRTCPSenderReportData(w.codec.PayloadType, layer, srData) @@ -744,6 +746,10 @@ func (w *WebRTCReceiver) GetTemporalLayerFpsForSpatial(layer int32) []float32 { return b.GetTemporalLayerFpsForSpatial(layer) } +func (w *WebRTCReceiver) GetRTCPSenderReportData(layer int32) (*buffer.RTCPSenderReportData, *buffer.RTCPSenderReportData) { + return w.streamTrackerManager.GetRTCPSenderReportData(layer) +} + func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) { return w.streamTrackerManager.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer) } diff --git a/pkg/sfu/streamtrackermanager.go b/pkg/sfu/streamtrackermanager.go index 6b79914f9..cdb42f4ab 100644 --- a/pkg/sfu/streamtrackermanager.go +++ b/pkg/sfu/streamtrackermanager.go @@ -24,6 +24,11 @@ type StreamTrackerManagerListener interface { OnBitrateReport(availableLayers []int32, bitrates Bitrates) } +type endsSenderReport struct { + first *buffer.RTCPSenderReportData + newest *buffer.RTCPSenderReportData +} + type StreamTrackerManager struct { logger logger.Logger trackInfo *livekit.TrackInfo @@ -43,7 +48,7 @@ type StreamTrackerManager struct { paused bool senderReportMu sync.RWMutex - senderReports [buffer.DefaultMaxLayerSpatial + 1]*buffer.RTCPSenderReportData + senderReports [buffer.DefaultMaxLayerSpatial + 1]endsSenderReport closed core.Fuse @@ -475,7 +480,7 @@ func (s *StreamTrackerManager) maxExpectedLayerFromTrackInfo() { } } -func (s *StreamTrackerManager) SetRTCPSenderReportData(layer int32, senderReport *buffer.RTCPSenderReportData) { +func (s *StreamTrackerManager) SetRTCPSenderReportData(layer int32, srFirst *buffer.RTCPSenderReportData, srNewest *buffer.RTCPSenderReportData) { s.senderReportMu.Lock() defer s.senderReportMu.Unlock() @@ -483,7 +488,19 @@ func (s *StreamTrackerManager) SetRTCPSenderReportData(layer int32, senderReport return } - s.senderReports[layer] = senderReport + s.senderReports[layer].first = srFirst + s.senderReports[layer].newest = srNewest +} + +func (s *StreamTrackerManager) GetRTCPSenderReportData(layer int32) (*buffer.RTCPSenderReportData, *buffer.RTCPSenderReportData) { + s.senderReportMu.RLock() + defer s.senderReportMu.RUnlock() + + if layer < 0 || int(layer) >= len(s.senderReports) { + return nil, nil + } + + return s.senderReports[layer].first, s.senderReports[layer].newest } func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) { @@ -502,7 +519,7 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer in var srLayer *buffer.RTCPSenderReportData if int(layer) < len(s.senderReports) { - srLayer = s.senderReports[layer] + srLayer = s.senderReports[layer].newest } if srLayer == nil || srLayer.NTPTimestamp == 0 { return 0, fmt.Errorf("layer rtcp sender report not available: %d", layer) @@ -510,7 +527,7 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer in var srRef *buffer.RTCPSenderReportData if int(referenceLayer) < len(s.senderReports) { - srRef = s.senderReports[referenceLayer] + srRef = s.senderReports[referenceLayer].newest } if srRef == nil || srRef.NTPTimestamp == 0 { return 0, fmt.Errorf("reference layer rtcp sender report not available: %d", referenceLayer)