mirror of
https://github.com/livekit/livekit.git
synced 2026-06-06 12:41:37 +00:00
Ignore lagging layer switches. (#1948)
This commit is contained in:
@@ -101,7 +101,7 @@ type Buffer struct {
|
||||
// callbacks
|
||||
onClose func()
|
||||
onRtcpFeedback func([]rtcp.Packet)
|
||||
onRtcpSenderReport func(*RTCPSenderReportData)
|
||||
onRtcpSenderReport func()
|
||||
onFpsChanged func()
|
||||
onFinalRtpStats func(*RTPStats)
|
||||
|
||||
@@ -675,7 +675,7 @@ func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) {
|
||||
b.RUnlock()
|
||||
|
||||
if b.onRtcpSenderReport != nil {
|
||||
b.onRtcpSenderReport(srData)
|
||||
b.onRtcpSenderReport()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -729,7 +729,7 @@ func (b *Buffer) OnRtcpFeedback(fn func(fb []rtcp.Packet)) {
|
||||
b.onRtcpFeedback = fn
|
||||
}
|
||||
|
||||
func (b *Buffer) OnRtcpSenderReport(fn func(srData *RTCPSenderReportData)) {
|
||||
func (b *Buffer) OnRtcpSenderReport(fn func()) {
|
||||
b.onRtcpSenderReport = fn
|
||||
}
|
||||
|
||||
|
||||
@@ -36,6 +36,9 @@ const (
|
||||
FirstSnapshotId = 1
|
||||
SnInfoSize = 8192
|
||||
SnInfoMask = SnInfoSize - 1
|
||||
|
||||
firstPacketTimeAdjustWindow = 2 * time.Minute
|
||||
firstPacketTimeAdjustThreshold = 5 * time.Second
|
||||
)
|
||||
|
||||
// -------------------------------------------------------
|
||||
@@ -763,6 +766,40 @@ func (r *RTPStats) GetRtt() uint32 {
|
||||
return r.rtt
|
||||
}
|
||||
|
||||
func (r *RTPStats) MaybeAdjustFirstPacketTime(srData *RTCPSenderReportData) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
|
||||
r.maybeAdjustFirstPacketTime(srData.RTPTimestampExt)
|
||||
}
|
||||
|
||||
func (r *RTPStats) maybeAdjustFirstPacketTime(extTS uint64) {
|
||||
if time.Since(r.startTime) > firstPacketTimeAdjustWindow {
|
||||
return
|
||||
}
|
||||
|
||||
// for some time after the start, adjust time of first packet.
|
||||
// Helps improve accuracy of expected timestamp calculation.
|
||||
// Adjusting only one way, i. e. if the first sample experienced
|
||||
// abnormal delay (maybe due to pacing or maybe due to queuing
|
||||
// in some network element along the way), push back first time
|
||||
// to an earlier instance.
|
||||
samplesDuration := time.Duration(float64(extTS-r.extStartTS) / float64(r.params.ClockRate) * float64(time.Second))
|
||||
firstTime := time.Now().Add(-samplesDuration)
|
||||
if firstTime.Before(r.firstTime) {
|
||||
r.logger.Infow(
|
||||
"adjusting first packet time",
|
||||
"before", r.firstTime.String(),
|
||||
"after", firstTime.String(),
|
||||
)
|
||||
if r.firstTime.Sub(firstTime) > firstPacketTimeAdjustThreshold {
|
||||
r.logger.Infow("first packet time adjustment too big, ignoring", "adjustment", r.firstTime.Sub(firstTime))
|
||||
} else {
|
||||
r.firstTime = firstTime
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
@@ -792,6 +829,8 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
|
||||
srDataCopy := *srData
|
||||
srDataCopy.RTPTimestampExt = uint64(srDataCopy.RTPTimestamp) + cycles
|
||||
|
||||
r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestampExt)
|
||||
|
||||
// monitor and log RTP timestamp anomalies
|
||||
var ntpDiffSinceLast time.Duration
|
||||
var rtpDiffSinceLast uint32
|
||||
|
||||
@@ -1802,7 +1802,10 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() {
|
||||
}
|
||||
}
|
||||
|
||||
func (d *DownTrack) HandleRTCPSenderReportData(_payloadType webrtc.PayloadType, _layer int32, _srData *buffer.RTCPSenderReportData) error {
|
||||
func (d *DownTrack) HandleRTCPSenderReportData(_payloadType webrtc.PayloadType, layer int32, srData *buffer.RTCPSenderReportData) error {
|
||||
if layer == d.forwarder.GetReferenceLayerSpatial() {
|
||||
d.rtpStats.MaybeAdjustFirstPacketTime(srData)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
+18
-15
@@ -15,6 +15,7 @@
|
||||
package sfu
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"math/rand"
|
||||
@@ -40,7 +41,7 @@ const (
|
||||
FlagFilterRTX = true
|
||||
TransitionCostSpatial = 10
|
||||
|
||||
ResumeBehindThresholdSeconds = float64(0.1) // 100ms
|
||||
ResumeBehindThresholdSeconds = float64(0.2) // 200ms
|
||||
LayerSwitchBehindThresholdSeconds = float64(0.05) // 50ms
|
||||
SwitchAheadThresholdSeconds = float64(0.025) // 25ms
|
||||
)
|
||||
@@ -1455,14 +1456,16 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
|
||||
switchingAt := time.Now()
|
||||
if f.getReferenceLayerRTPTimestamp != nil {
|
||||
ts, err := f.getReferenceLayerRTPTimestamp(extPkt.Packet.Timestamp, layer, f.referenceLayerSpatial)
|
||||
if err == nil {
|
||||
refTS = ts
|
||||
if err != nil {
|
||||
// error out if refTS is not available. It can happen when there is no sender report
|
||||
// for the layer being switched to. Can especially happen at the start of the track when layer switches are
|
||||
// potentially happening very quickly. Erroring out and waiting for a layer for which a sender report has been
|
||||
// received will calculate a better offset, but may result in initial adaptation to take a bit longer depending
|
||||
// on how often publisher/remote side sends RTCP sender report.
|
||||
return err
|
||||
}
|
||||
// AVSYNC-TODO: can error out here if refTS is not available. It can happen when there is no sender report
|
||||
// for the layer being switched to. Can especially happen at the start of the track when layer switches are
|
||||
// potentially happening very quickly. Erroring out and waiting for a layer for which a sender report has been
|
||||
// received will calculate a better offset, but may result in initial adaptation to take a bit longer depending
|
||||
// on how often publisher/remote side sends RTCP sender report.
|
||||
|
||||
refTS = ts
|
||||
}
|
||||
|
||||
if f.getExpectedRTPTimestamp != nil {
|
||||
@@ -1508,7 +1511,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
|
||||
// between expectedTS and refTS is thresholded. Difference below the threshold is treated as Case 2
|
||||
// and above as Case 1.
|
||||
//
|
||||
// In the event of refTS > expectedTS, another threshold is used to pick the next timestamp.
|
||||
// In the event of refTS > expectedTS, use refTS.
|
||||
// Ideally, refTS should not be ahead of expectedTS, but expectedTS uses the first packet's
|
||||
// wall clock time. So, if the first packet experienced abmormal latency, it is possible
|
||||
// for refTS > expectedTS
|
||||
@@ -1523,22 +1526,22 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) e
|
||||
} else {
|
||||
if math.Abs(diffSeconds) > SwitchAheadThresholdSeconds {
|
||||
f.logger.Infow("resume, reference too far ahead", "expectedTS", expectedTS, "refTS", refTS, "diffSeconds", math.Abs(diffSeconds))
|
||||
nextTS = expectedTS
|
||||
} else {
|
||||
nextTS = refTS
|
||||
}
|
||||
nextTS = refTS
|
||||
}
|
||||
} else {
|
||||
// switching between layers, check if refTS is too far behind the last sent
|
||||
diffSeconds := float64(int32(refTS-lastTS)) / float64(f.codec.ClockRate)
|
||||
if diffSeconds < 0.0 {
|
||||
if math.Abs(diffSeconds) > LayerSwitchBehindThresholdSeconds {
|
||||
// AVSYNC-TODO: This could be due to pacer trickling out this layer. Should potentially return error here and wait for a more opportune time
|
||||
// or some forcing function (like "have waited for too long for layer switch, nothing available, switch to whatever is available" kind of condition)
|
||||
// to do the switch. Just logging it for now.
|
||||
// this could be due to pacer trickling out this layer. Error out and wait for a more opportune time.
|
||||
// AVSYNC-TODO: Consider some forcing function to do the switch
|
||||
// (like "have waited for too long for layer switch, nothing available, switch to whatever is available" kind of condition).
|
||||
f.logger.Infow("layer switch, reference too far behind", "expectedTS", expectedTS, "refTS", refTS, "lastTS", lastTS, "diffSeconds", math.Abs(diffSeconds))
|
||||
return errors.New("switch point too far behind")
|
||||
}
|
||||
// use a nominal increase to ensure that timestamp is always moving forward
|
||||
f.logger.Infow("layer switch, reference is slghtly behind", "expectedTS", expectedTS, "refTS", refTS, "lastTS", lastTS, "diffSeconds", math.Abs(diffSeconds))
|
||||
nextTS = lastTS + 1
|
||||
} else {
|
||||
diffSeconds = float64(int32(expectedTS-refTS)) / float64(f.codec.ClockRate)
|
||||
|
||||
+2
-2
@@ -331,12 +331,12 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff
|
||||
SmoothIntervals: w.audioConfig.SmoothIntervals,
|
||||
})
|
||||
buff.OnRtcpFeedback(w.sendRTCP)
|
||||
buff.OnRtcpSenderReport(func(srData *buffer.RTCPSenderReportData) {
|
||||
buff.OnRtcpSenderReport(func() {
|
||||
srFirst, srNewest := buff.GetSenderReportData()
|
||||
w.streamTrackerManager.SetRTCPSenderReportData(layer, srFirst, srNewest)
|
||||
|
||||
w.downTrackSpreader.Broadcast(func(dt TrackSender) {
|
||||
_ = dt.HandleRTCPSenderReportData(w.codec.PayloadType, layer, srData)
|
||||
_ = dt.HandleRTCPSenderReportData(w.codec.PayloadType, layer, srNewest)
|
||||
})
|
||||
})
|
||||
|
||||
|
||||
Reference in New Issue
Block a user