Adjust sender report time stamp for slow publishers. (#1740)

It is possible that publisher paces the media.
So, RTCP sender report from publisher could be ahead of
what is being fowarded by a good amount (have seen up to 2 seconds
ahead). Using the forwarded time stamp for RTCP sender report
in the down stream leads to jumps back and forth in the down track
RTCP sender report.

So, look at the publisher's RTCP sender report to check for it being
ahead and use the publisher rate as a guide.
This commit is contained in:
Raja Subramanian
2023-05-25 21:55:54 +05:30
committed by GitHub
parent 11c5737e04
commit 0354626bfc
7 changed files with 253 additions and 135 deletions

View File

@@ -12,6 +12,7 @@ import (
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
)
// wrapper around WebRTC receiver, overriding its ID
@@ -288,6 +289,13 @@ func (d *DummyReceiver) GetRedReceiver() sfu.TrackReceiver {
return d
}
func (d *DummyReceiver) GetRTCPSenderReportData(layer int32) (*buffer.RTCPSenderReportData, *buffer.RTCPSenderReportData) {
if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok {
return r.GetRTCPSenderReportData(layer)
}
return nil, nil
}
func (d *DummyReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) {
if r, ok := d.receiver.Load().(sfu.TrackReceiver); ok {
return r.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer)

View File

@@ -654,7 +654,7 @@ func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) {
srData := &RTCPSenderReportData{
RTPTimestamp: rtpTime,
NTPTimestamp: mediatransportutil.NtpTime(ntpTime),
ArrivalTime: time.Now(),
At: time.Now(),
}
b.RLock()
@@ -668,7 +668,7 @@ func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) {
}
}
func (b *Buffer) GetSenderReportData() *RTCPSenderReportData {
func (b *Buffer) GetSenderReportData() (*RTCPSenderReportData, *RTCPSenderReportData) {
b.RLock()
defer b.RUnlock()
@@ -676,7 +676,7 @@ func (b *Buffer) GetSenderReportData() *RTCPSenderReportData {
return b.rtpStats.GetRtcpSenderReportData()
}
return nil
return nil, nil
}
func (b *Buffer) SetLastFractionLostReport(lost uint8) {

View File

@@ -25,6 +25,28 @@ const (
TooLargeOWDDelta = 400 * time.Millisecond
)
// -------------------------------------------------------
type driftResult struct {
timeSinceFirst time.Duration
rtpDiffSinceFirst uint64
driftSamples int64
driftMs float64
sampleRate float64
}
func (d driftResult) String() string {
return fmt.Sprintf("time: %+v, rtp: %d, driftSamples: %d, driftMs: %.02f, sampleRate: %.02f",
d.timeSinceFirst,
d.rtpDiffSinceFirst,
d.driftSamples,
d.driftMs,
d.sampleRate,
)
}
// -------------------------------------------------------
type RTPFlowState struct {
HasLoss bool
LossStartInclusive uint16
@@ -88,9 +110,10 @@ type SnInfo struct {
}
type RTCPSenderReportData struct {
RTPTimestamp uint32
NTPTimestamp mediatransportutil.NtpTime
ArrivalTime time.Time
RTPTimestamp uint32
RTPTimestampExt uint64
NTPTimestamp mediatransportutil.NtpTime
At time.Time
}
type RTPStatsParams struct {
@@ -175,10 +198,9 @@ type RTPStats struct {
rtt uint32
maxRtt uint32
srData *RTCPSenderReportData
lastSRTime time.Time
lastSRNTP mediatransportutil.NtpTime
lastSRRTP uint32
srFirst *RTCPSenderReportData
srNewest *RTCPSenderReportData
pidController *PIDController
nextSnapshotId uint32
@@ -280,15 +302,18 @@ func (r *RTPStats) Seed(from *RTPStats) {
r.rtt = from.rtt
r.maxRtt = from.maxRtt
if from.srData != nil {
srData := *from.srData
r.srData = &srData
if from.srFirst != nil {
srFirst := *from.srFirst
r.srFirst = &srFirst
} else {
r.srData = nil
r.srFirst = nil
}
if from.srNewest != nil {
srNewest := *from.srNewest
r.srNewest = &srNewest
} else {
r.srNewest = nil
}
r.lastSRTime = from.lastSRTime
r.lastSRNTP = from.lastSRNTP
r.lastSRRTP = from.lastSRRTP
r.nextSnapshotId = from.nextSnapshotId
for id, ss := range from.snapshots {
@@ -425,12 +450,16 @@ func (r *RTPStats) Update(rtph *rtp.Header, payloadSize int, paddingSize int, pa
}
r.highestSN = rtph.SequenceNumber
if rtph.Timestamp < r.highestTS && !first {
r.tsCycles++
}
r.highestTS = rtph.Timestamp
if rtph.Timestamp != r.highestTS {
if rtph.Timestamp < r.highestTS && !first {
r.tsCycles++
}
r.highestTS = rtph.Timestamp
r.highestTime = packetTime
// update only on first packet as same timestamp could be in multiple packets.
// NOTE: this may not be the first packet with this time stamp if there is packet loss.
r.highestTime = packetTime
}
}
if !isDuplicate {
@@ -514,12 +543,15 @@ func (r *RTPStats) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt uint32
return
}
rtt, err := mediatransportutil.GetRttMs(&rr, r.lastSRNTP, r.lastSRTime)
if err == nil {
isRttChanged = rtt != r.rtt
} else {
if !errors.Is(err, mediatransportutil.ErrRttNotLastSenderReport) && !errors.Is(err, mediatransportutil.ErrRttNoLastSenderReport) {
r.logger.Warnw("error getting rtt", err)
var err error
if r.srNewest != nil {
rtt, err = mediatransportutil.GetRttMs(&rr, r.srNewest.NTPTimestamp, r.srNewest.At)
if err == nil {
isRttChanged = rtt != r.rtt
} else {
if !errors.Is(err, mediatransportutil.ErrRttNotLastSenderReport) && !errors.Is(err, mediatransportutil.ErrRttNoLastSenderReport) {
r.logger.Warnw("error getting rtt", err)
}
}
}
@@ -725,79 +757,97 @@ func (r *RTPStats) GetRtt() uint32 {
}
func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
r.lock.Lock()
defer r.lock.Unlock()
if srData == nil {
r.srData = nil
return
}
r.lock.Lock()
defer r.lock.Unlock()
// prevent against extreme case of anachronous sender reports
if r.srData != nil && r.srData.NTPTimestamp > srData.NTPTimestamp {
if r.srNewest != nil && r.srNewest.NTPTimestamp > srData.NTPTimestamp {
r.logger.Infow(
"received anachronous sender report",
"current", srData.NTPTimestamp.Time(),
"last", r.srData.NTPTimestamp.Time(),
"last", r.srNewest.NTPTimestamp.Time(),
)
return
}
// monitor and log RTP timestamp anomalies
if r.srData != nil {
ntpDiffSinceLast := srData.NTPTimestamp.Time().Sub(r.srData.NTPTimestamp.Time())
rtpDiffSinceLast := srData.RTPTimestamp - r.srData.RTPTimestamp
arrivalDiffSinceLast := srData.ArrivalTime.Sub(r.srData.ArrivalTime)
var ntpDiffSinceLast time.Duration
var rtpDiffSinceLast uint32
var arrivalDiffSinceLast time.Duration
var expectedTimeDiffSinceLast float64
var reason string
if r.srNewest != nil {
ntpDiffSinceLast = srData.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time())
rtpDiffSinceLast = srData.RTPTimestamp - r.srNewest.RTPTimestamp
arrivalDiffSinceLast = srData.At.Sub(r.srNewest.At)
expectedTimeDiffSinceLast := float64(rtpDiffSinceLast) / float64(r.params.ClockRate)
expectedTimeDiffSinceLast = float64(rtpDiffSinceLast) / float64(r.params.ClockRate)
var reason string
if (srData.RTPTimestamp - r.srData.RTPTimestamp) > (1 << 31) {
reason = "received sender report, out-of-order"
if (srData.RTPTimestamp - r.srNewest.RTPTimestamp) > (1 << 31) {
reason = "received sender report, out-of-order" // should not happen, just a sanity check
} else {
if math.Abs(expectedTimeDiffSinceLast-ntpDiffSinceLast.Seconds()) > 0.2 {
// more than 200 ms away from expected delta
reason = "received sender report, time warp"
}
}
}
if reason != "" {
timeSinceFirst, rtpDiffSinceFirst, drift, driftMs, sampleRate := r.getDrift()
r.logger.Infow(
reason,
"ntp", srData.NTPTimestamp.Time().String(),
"rtp", srData.RTPTimestamp,
"arrival", srData.ArrivalTime.String(),
"ntpDiffSinceLast", ntpDiffSinceLast.Seconds(),
"rtpDiffSinceLast", int32(rtpDiffSinceLast),
"arrivalDiffSinceLast", arrivalDiffSinceLast.Seconds(),
"expectedTimeDiffSinceLast", expectedTimeDiffSinceLast,
"timeSinceFirst", timeSinceFirst.Seconds(),
"rtpDiffSinceFirst", rtpDiffSinceFirst,
"drift", drift,
"driftMs", driftMs,
"sampleRate", sampleRate,
)
cycles := uint64(0)
if r.srNewest != nil {
cycles = r.srNewest.RTPTimestampExt & 0xFF_FF_FF_FF_00_00_00_00
if (srData.RTPTimestamp-r.srNewest.RTPTimestamp) < (1<<31) && srData.RTPTimestamp < r.srNewest.RTPTimestamp {
cycles += (1 << 32)
}
}
srDataCopy := *srData
r.srData = &srDataCopy
srDataCopy.RTPTimestampExt = uint64(srDataCopy.RTPTimestamp) + cycles
r.srNewest = &srDataCopy
if r.srFirst == nil {
r.srFirst = &srDataCopy
}
if reason != "" {
packetDriftResult, reportDriftResult := r.getDrift()
r.logger.Infow(
reason,
"ntp", srData.NTPTimestamp.Time().String(),
"rtp", srData.RTPTimestamp,
"arrival", srData.At.String(),
"ntpDiffSinceLast", ntpDiffSinceLast.Seconds(),
"rtpDiffSinceLast", int32(rtpDiffSinceLast),
"arrivalDiffSinceLast", arrivalDiffSinceLast.Seconds(),
"expectedTimeDiffSinceLast", expectedTimeDiffSinceLast,
"packetDrift", packetDriftResult.String(),
"reportDrift", reportDriftResult.String(),
"highestTS", r.highestTS,
"highestTime", r.highestTime.String(),
)
}
}
func (r *RTPStats) GetRtcpSenderReportData() *RTCPSenderReportData {
func (r *RTPStats) GetRtcpSenderReportData() (srFirst *RTCPSenderReportData, srNewest *RTCPSenderReportData) {
r.lock.RLock()
defer r.lock.RUnlock()
if r.srData == nil {
return nil
if r.srFirst != nil {
srFirstCopy := *r.srFirst
srFirst = &srFirstCopy
}
srDataCopy := *r.srData
return &srDataCopy
if r.srNewest != nil {
srNewestCopy := *r.srNewest
srNewest = &srNewestCopy
}
return
}
func (r *RTPStats) GetExpectedRTPTimestamp(at time.Time) (uint32, uint32, error) {
func (r *RTPStats) GetExpectedRTPTimestamp(at time.Time) (uint32, uint64, error) {
r.lock.RLock()
defer r.lock.RUnlock()
@@ -809,9 +859,9 @@ func (r *RTPStats) GetExpectedRTPTimestamp(at time.Time) (uint32, uint32, error)
expectedRTPDiff := timeDiff.Nanoseconds() * int64(r.params.ClockRate) / 1e9
expectedExtRTP := r.extStartTS + uint64(expectedRTPDiff)
minTS := r.lastSRRTP
if r.lastSRNTP == 0 {
minTS = uint32(expectedExtRTP)
minTS := ^uint64(0)
if r.srNewest != nil {
minTS = r.srNewest.RTPTimestampExt
}
r.logger.Debugw(
"expected RTP timestamp",
@@ -829,7 +879,7 @@ func (r *RTPStats) GetExpectedRTPTimestamp(at time.Time) (uint32, uint32, error)
return uint32(expectedExtRTP), minTS, nil
}
func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) (*rtcp.SenderReport, float64) {
func (r *RTPStats) GetRtcpSenderReport(ssrc uint32, srFirst *RTCPSenderReportData, srNewest *RTCPSenderReportData) (*rtcp.SenderReport, float64) {
r.lock.Lock()
defer r.lock.Unlock()
@@ -845,7 +895,27 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) (*rtcp.SenderReport, float64
timeSinceHighest := now.Sub(r.highestTime)
nowRTP := r.highestTS + uint32(timeSinceHighest.Nanoseconds()*int64(r.params.ClockRate)/1e9)
rtpDiffSinceFirst := getExtTS(nowRTP, r.tsCycles) - r.extStartTS
// It is possible that publisher is pacing at a slower rate.
// That would make `highestTS` to be lagging the RTP time stamp in the RTCP Sender Report from publisher.
// Check for that and use the later time stamp if applicable.
tsCycles := r.tsCycles
if nowRTP < r.highestTS {
tsCycles++
}
nowRTPExt := getExtTS(nowRTP, tsCycles)
if srFirst != nil && srNewest != nil && srFirst.RTPTimestamp != srNewest.RTPTimestamp {
// use incoming rate as a guide
tsf := srNewest.NTPTimestamp.Time().Sub(srFirst.NTPTimestamp.Time())
rdsf := srNewest.RTPTimestampExt - srFirst.RTPTimestampExt
sr := float64(rdsf) / tsf.Seconds()
nowRTPExtUsingRate := r.extStartTS + uint64(sr*timeSinceFirst.Seconds())
if nowRTPExtUsingRate > nowRTPExt {
nowRTPExt = nowRTPExtUsingRate
nowRTP = uint32(nowRTPExtUsingRate)
}
}
rtpDiffSinceFirst := nowRTPExt - r.extStartTS
rate := float64(rtpDiffSinceFirst) / timeSinceFirst.Seconds()
pidOutput := r.pidController.Update(
float64(r.params.ClockRate),
@@ -854,49 +924,50 @@ func (r *RTPStats) GetRtcpSenderReport(ssrc uint32) (*rtcp.SenderReport, float64
)
// monitor and log RTP timestamp anomalies
isWarped := false
if r.lastSRNTP != 0 {
ntpDiffSinceLast := nowNTP.Time().Sub(r.lastSRNTP.Time())
rtpDiffSinceLast := nowRTP - r.lastSRRTP
departureDiffSinceLast := now.Sub(r.lastSRTime)
var ntpDiffSinceLast time.Duration
var rtpDiffSinceLast uint32
var departureDiffSinceLast time.Duration
var expectedTimeDiffSinceLast float64
var isWarped bool
if r.srNewest != nil {
ntpDiffSinceLast = nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time())
rtpDiffSinceLast = nowRTP - r.srNewest.RTPTimestamp
departureDiffSinceLast = now.Sub(r.srNewest.At)
expectedTimeDiffSinceLast := float64(rtpDiffSinceLast) / float64(r.params.ClockRate)
expectedTimeDiffSinceLast = float64(rtpDiffSinceLast) / float64(r.params.ClockRate)
if math.Abs(expectedTimeDiffSinceLast-ntpDiffSinceLast.Seconds()) > 0.2 {
// more than 200 ms away from expected delta
isWarped = true
}
if isWarped {
expectedExtRTP := r.extStartTS + uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9)
ntpDiffLocal := nowNTP.Time().Sub(r.highestTime)
rtpDiffLocal := int32(nowRTP - r.highestTS)
timeSinceFirst, rtpDiffSinceFirst, drift, driftMs, sampleRate := r.getDrift()
r.logger.Infow(
"sending sender report, time warp",
"ntp", nowNTP.Time().String(),
"rtp", nowRTP,
"expectedRTP", uint32(expectedExtRTP),
"departure", now.String(),
"ntpDiffSinceLast", ntpDiffSinceLast.Seconds(),
"rtpDiffSinceLast", rtpDiffSinceLast,
"departureDiffSinceLast", departureDiffSinceLast.Seconds(),
"expectedTimeDiffSinceLast", expectedTimeDiffSinceLast,
"timeSinceFirst", timeSinceFirst.Seconds(),
"rtpDiffSinceFirst", rtpDiffSinceFirst,
"drift", drift,
"driftMs", driftMs,
"sampleRate", sampleRate,
"highestTS", r.highestTS,
"highestTime", r.highestTime.String(),
"rtpDiffLocal", rtpDiffLocal,
"ntpDiffLocal", ntpDiffLocal,
)
}
}
r.lastSRTime = now
r.lastSRNTP = nowNTP
r.lastSRRTP = nowRTP
r.srNewest = &RTCPSenderReportData{
NTPTimestamp: nowNTP,
RTPTimestamp: nowRTP,
RTPTimestampExt: nowRTPExt,
At: now,
}
if r.srFirst == nil {
r.srFirst = r.srNewest
}
if isWarped {
packetDriftResult, reportDriftResult := r.getDrift()
r.logger.Infow(
"sending sender report, time warp",
"ntp", nowNTP.Time().String(),
"rtp", nowRTP,
"departure", now.String(),
"ntpDiffSinceLast", ntpDiffSinceLast.Seconds(),
"rtpDiffSinceLast", int32(rtpDiffSinceLast),
"departureDiffSinceLast", departureDiffSinceLast.Seconds(),
"expectedTimeDiffSinceLast", expectedTimeDiffSinceLast,
"packetDrift", packetDriftResult.String(),
"reportDrift", reportDriftResult.String(),
"highestTS", r.highestTS,
"highestTime", r.highestTime.String(),
)
}
return &rtcp.SenderReport{
SSRC: ssrc,
@@ -940,15 +1011,15 @@ func (r *RTPStats) SnapshotRtcpReceptionReport(ssrc uint32, proxyFracLost uint8,
}
var dlsr uint32
if r.srData != nil && !r.srData.ArrivalTime.IsZero() {
delayMS := uint32(time.Since(r.srData.ArrivalTime).Milliseconds())
if r.srNewest != nil && !r.srNewest.At.IsZero() {
delayMS := uint32(time.Since(r.srNewest.At).Milliseconds())
dlsr = (delayMS / 1e3) << 16
dlsr |= (delayMS % 1e3) * 65536 / 1000
}
lastSR := uint32(0)
if r.srData != nil {
lastSR = uint32(r.srData.NTPTimestamp >> 16)
if r.srNewest != nil {
lastSR = uint32(r.srNewest.NTPTimestamp >> 16)
}
return &rtcp.ReceptionReport{
SSRC: ssrc,
@@ -1218,7 +1289,7 @@ func (r *RTPStats) ToProto() *livekit.RTPStats {
jitterTime := jitter / float64(r.params.ClockRate) * 1e6
maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6
_, _, _, driftMs, sampleRate := r.getDrift()
packetDrift, _ := r.getDrift()
p := &livekit.RTPStats{
StartTime: timestamppb.New(r.startTime),
@@ -1261,8 +1332,8 @@ func (r *RTPStats) ToProto() *livekit.RTPStats {
LastFir: timestamppb.New(r.lastFir),
RttCurrent: r.rtt,
RttMax: r.maxRtt,
DriftMs: driftMs,
SampleRate: sampleRate,
DriftMs: packetDrift.driftMs,
SampleRate: packetDrift.sampleRate,
}
gapsPresent := false
@@ -1456,12 +1527,20 @@ func (r *RTPStats) updateJitter(rtph *rtp.Header, packetTime time.Time) {
r.lastJitterRTP = rtph.Timestamp
}
func (r *RTPStats) getDrift() (timeSinceFirst time.Duration, rtpDiffSinceFirst uint64, drift int64, driftMs float64, sampleRate float64) {
timeSinceFirst = r.highestTime.Sub(r.firstTime)
rtpDiffSinceFirst = getExtTS(r.highestTS, r.tsCycles) - r.extStartTS
drift = int64(rtpDiffSinceFirst - uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9))
driftMs = (float64(drift) * 1000) / float64(r.params.ClockRate)
sampleRate = float64(rtpDiffSinceFirst) / timeSinceFirst.Seconds()
func (r *RTPStats) getDrift() (packetDrift driftResult, reportDrift driftResult) {
packetDrift.timeSinceFirst = r.highestTime.Sub(r.firstTime)
packetDrift.rtpDiffSinceFirst = getExtTS(r.highestTS, r.tsCycles) - r.extStartTS
packetDrift.driftSamples = int64(packetDrift.rtpDiffSinceFirst - uint64(packetDrift.timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9))
packetDrift.driftMs = (float64(packetDrift.driftSamples) * 1000) / float64(r.params.ClockRate)
packetDrift.sampleRate = float64(packetDrift.rtpDiffSinceFirst) / packetDrift.timeSinceFirst.Seconds()
if r.srFirst != nil && r.srNewest != nil && r.srFirst.RTPTimestamp != r.srNewest.RTPTimestamp {
reportDrift.timeSinceFirst = r.srNewest.NTPTimestamp.Time().Sub(r.srFirst.NTPTimestamp.Time())
reportDrift.rtpDiffSinceFirst = r.srNewest.RTPTimestampExt - r.srFirst.RTPTimestampExt
reportDrift.driftSamples = int64(reportDrift.rtpDiffSinceFirst - uint64(reportDrift.timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9))
reportDrift.driftMs = (float64(reportDrift.driftSamples) * 1000) / float64(r.params.ClockRate)
reportDrift.sampleRate = float64(reportDrift.rtpDiffSinceFirst) / reportDrift.timeSinceFirst.Seconds()
}
return
}

View File

@@ -1116,7 +1116,8 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport {
return nil
}
sr, tsAdjust := d.rtpStats.GetRtcpSenderReport(d.ssrc)
srFirst, srNewest := d.receiver.GetRTCPSenderReportData(d.forwarder.GetReferenceLayerSpatial())
sr, tsAdjust := d.rtpStats.GetRtcpSenderReport(d.ssrc, srFirst, srNewest)
if d.allowTimestampAdjustment {
d.forwarder.AdjustTimestamp(tsAdjust)
}
@@ -1636,7 +1637,7 @@ func (d *DownTrack) DebugInfo() map[string]interface{} {
}
}
func (d *DownTrack) getExpectedRTPTimestamp(at time.Time) (uint32, uint32, error) {
func (d *DownTrack) getExpectedRTPTimestamp(at time.Time) (uint32, uint64, error) {
return d.rtpStats.GetExpectedRTPTimestamp(at)
}

View File

@@ -170,7 +170,7 @@ type Forwarder struct {
kind webrtc.RTPCodecType
logger logger.Logger
getReferenceLayerRTPTimestamp func(ts uint32, layer int32, referenceLayer int32) (uint32, error)
getExpectedRTPTimestamp func(at time.Time) (uint32, uint32, error)
getExpectedRTPTimestamp func(at time.Time) (uint32, uint64, error)
muted bool
pubMuted bool
@@ -201,7 +201,7 @@ func NewForwarder(
kind webrtc.RTPCodecType,
logger logger.Logger,
getReferenceLayerRTPTimestamp func(ts uint32, layer int32, referenceLayer int32) (uint32, error),
getExpectedRTPTimestamp func(at time.Time) (uint32, uint32, error),
getExpectedRTPTimestamp func(at time.Time) (uint32, uint64, error),
) *Forwarder {
f := &Forwarder{
kind: kind,
@@ -488,6 +488,13 @@ func (f *Forwarder) TargetLayer() buffer.VideoLayer {
return f.vls.GetTarget()
}
func (f *Forwarder) GetReferenceLayerSpatial() int32 {
f.lock.RLock()
defer f.lock.RUnlock()
return f.referenceLayerSpatial
}
func (f *Forwarder) isDeficientLocked() bool {
return f.lastAllocation.IsDeficient
}
@@ -1475,7 +1482,7 @@ func (f *Forwarder) getTranslationParamsCommon(extPkt *buffer.ExtPacket, layer i
lastTS := f.rtpMunger.GetLast().LastTS
refTS := lastTS
expectedTS := lastTS
minTS := lastTS
minTS := uint64(lastTS)
switchingAt := time.Now()
if f.getReferenceLayerRTPTimestamp != nil {
ts, err := f.getReferenceLayerRTPTimestamp(extPkt.Packet.Timestamp, layer, f.referenceLayerSpatial)
@@ -1671,7 +1678,7 @@ func (f *Forwarder) GetSnTsForBlankFrames(frameRate uint32, numPackets int) ([]S
lastTS := f.rtpMunger.GetLast().LastTS
expectedTS := lastTS
minTS := lastTS
minTS := uint64(lastTS)
if f.getExpectedRTPTimestamp != nil {
ts, min, err := f.getExpectedRTPTimestamp(time.Now())
if err == nil {
@@ -1826,7 +1833,7 @@ done:
return float64(distance) / float64(maxSeenLayer.Temporal+1)
}
func getNextTimestamp(lastTS uint32, refTS uint32, expectedTS uint32, minTS uint32) (uint32, string) {
func getNextTimestamp(lastTS uint32, refTS uint32, expectedTS uint32, minTS uint64) (uint32, string) {
isInOrder := func(val1, val2 uint32) bool {
diff := val1 - val2
return diff != 0 && diff < (1<<31)
@@ -1860,8 +1867,8 @@ func getNextTimestamp(lastTS uint32, refTS uint32, expectedTS uint32, minTS uint
explain = fmt.Sprintf("e < r < l, %d, %d", refTS-expectedTS, lastTS-refTS)
}
if !isInOrder(nextTS, minTS) {
nextTS = minTS + 1
if minTS != ^uint64(0) && !isInOrder(nextTS, uint32(minTS)) {
nextTS = uint32(minTS) + 1
}
return nextTS, explain

View File

@@ -65,6 +65,7 @@ type TrackReceiver interface {
GetTemporalLayerFpsForSpatial(layer int32) []float32
GetRTCPSenderReportData(layer int32) (*buffer.RTCPSenderReportData, *buffer.RTCPSenderReportData)
GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error)
}
@@ -309,7 +310,8 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff
})
buff.OnRtcpFeedback(w.sendRTCP)
buff.OnRtcpSenderReport(func(srData *buffer.RTCPSenderReportData) {
w.streamTrackerManager.SetRTCPSenderReportData(layer, buff.GetSenderReportData())
srFirst, srNewest := buff.GetSenderReportData()
w.streamTrackerManager.SetRTCPSenderReportData(layer, srFirst, srNewest)
w.downTrackSpreader.Broadcast(func(dt TrackSender) {
_ = dt.HandleRTCPSenderReportData(w.codec.PayloadType, layer, srData)
@@ -744,6 +746,10 @@ func (w *WebRTCReceiver) GetTemporalLayerFpsForSpatial(layer int32) []float32 {
return b.GetTemporalLayerFpsForSpatial(layer)
}
func (w *WebRTCReceiver) GetRTCPSenderReportData(layer int32) (*buffer.RTCPSenderReportData, *buffer.RTCPSenderReportData) {
return w.streamTrackerManager.GetRTCPSenderReportData(layer)
}
func (w *WebRTCReceiver) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) {
return w.streamTrackerManager.GetReferenceLayerRTPTimestamp(ts, layer, referenceLayer)
}

View File

@@ -24,6 +24,11 @@ type StreamTrackerManagerListener interface {
OnBitrateReport(availableLayers []int32, bitrates Bitrates)
}
type endsSenderReport struct {
first *buffer.RTCPSenderReportData
newest *buffer.RTCPSenderReportData
}
type StreamTrackerManager struct {
logger logger.Logger
trackInfo *livekit.TrackInfo
@@ -43,7 +48,7 @@ type StreamTrackerManager struct {
paused bool
senderReportMu sync.RWMutex
senderReports [buffer.DefaultMaxLayerSpatial + 1]*buffer.RTCPSenderReportData
senderReports [buffer.DefaultMaxLayerSpatial + 1]endsSenderReport
closed core.Fuse
@@ -475,7 +480,7 @@ func (s *StreamTrackerManager) maxExpectedLayerFromTrackInfo() {
}
}
func (s *StreamTrackerManager) SetRTCPSenderReportData(layer int32, senderReport *buffer.RTCPSenderReportData) {
func (s *StreamTrackerManager) SetRTCPSenderReportData(layer int32, srFirst *buffer.RTCPSenderReportData, srNewest *buffer.RTCPSenderReportData) {
s.senderReportMu.Lock()
defer s.senderReportMu.Unlock()
@@ -483,7 +488,19 @@ func (s *StreamTrackerManager) SetRTCPSenderReportData(layer int32, senderReport
return
}
s.senderReports[layer] = senderReport
s.senderReports[layer].first = srFirst
s.senderReports[layer].newest = srNewest
}
func (s *StreamTrackerManager) GetRTCPSenderReportData(layer int32) (*buffer.RTCPSenderReportData, *buffer.RTCPSenderReportData) {
s.senderReportMu.RLock()
defer s.senderReportMu.RUnlock()
if layer < 0 || int(layer) >= len(s.senderReports) {
return nil, nil
}
return s.senderReports[layer].first, s.senderReports[layer].newest
}
func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer int32, referenceLayer int32) (uint32, error) {
@@ -502,7 +519,7 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer in
var srLayer *buffer.RTCPSenderReportData
if int(layer) < len(s.senderReports) {
srLayer = s.senderReports[layer]
srLayer = s.senderReports[layer].newest
}
if srLayer == nil || srLayer.NTPTimestamp == 0 {
return 0, fmt.Errorf("layer rtcp sender report not available: %d", layer)
@@ -510,7 +527,7 @@ func (s *StreamTrackerManager) GetReferenceLayerRTPTimestamp(ts uint32, layer in
var srRef *buffer.RTCPSenderReportData
if int(referenceLayer) < len(s.senderReports) {
srRef = s.senderReports[referenceLayer]
srRef = s.senderReports[referenceLayer].newest
}
if srRef == nil || srRef.NTPTimestamp == 0 {
return 0, fmt.Errorf("reference layer rtcp sender report not available: %d", referenceLayer)