Merge remote-tracking branch 'origin/master' into raja_1833

This commit is contained in:
boks1971
2023-08-09 18:55:48 +05:30
6 changed files with 117 additions and 42 deletions
+3 -3
View File
@@ -101,7 +101,7 @@ type Buffer struct {
// callbacks
onClose func()
onRtcpFeedback func([]rtcp.Packet)
onRtcpSenderReport func(*RTCPSenderReportData)
onRtcpSenderReport func()
onFpsChanged func()
onFinalRtpStats func(*RTPStats)
@@ -675,7 +675,7 @@ func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64) {
b.RUnlock()
if b.onRtcpSenderReport != nil {
b.onRtcpSenderReport(srData)
b.onRtcpSenderReport()
}
}
@@ -729,7 +729,7 @@ func (b *Buffer) OnRtcpFeedback(fn func(fb []rtcp.Packet)) {
b.onRtcpFeedback = fn
}
func (b *Buffer) OnRtcpSenderReport(fn func(srData *RTCPSenderReportData)) {
func (b *Buffer) OnRtcpSenderReport(fn func()) {
b.onRtcpSenderReport = fn
}
+39
View File
@@ -36,6 +36,9 @@ const (
FirstSnapshotId = 1
SnInfoSize = 8192
SnInfoMask = SnInfoSize - 1
firstPacketTimeAdjustWindow = 2 * time.Minute
firstPacketTimeAdjustThreshold = 5 * time.Second
)
// -------------------------------------------------------
@@ -763,6 +766,40 @@ func (r *RTPStats) GetRtt() uint32 {
return r.rtt
}
func (r *RTPStats) MaybeAdjustFirstPacketTime(srData *RTCPSenderReportData) {
r.lock.Lock()
defer r.lock.Unlock()
r.maybeAdjustFirstPacketTime(srData.RTPTimestampExt)
}
func (r *RTPStats) maybeAdjustFirstPacketTime(extTS uint64) {
if time.Since(r.startTime) > firstPacketTimeAdjustWindow {
return
}
// for some time after the start, adjust time of first packet.
// Helps improve accuracy of expected timestamp calculation.
// Adjusting only one way, i. e. if the first sample experienced
// abnormal delay (maybe due to pacing or maybe due to queuing
// in some network element along the way), push back first time
// to an earlier instance.
samplesDuration := time.Duration(float64(extTS-r.extStartTS) / float64(r.params.ClockRate) * float64(time.Second))
firstTime := time.Now().Add(-samplesDuration)
if firstTime.Before(r.firstTime) {
r.logger.Infow(
"adjusting first packet time",
"before", r.firstTime.String(),
"after", firstTime.String(),
)
if r.firstTime.Sub(firstTime) > firstPacketTimeAdjustThreshold {
r.logger.Infow("first packet time adjustment too big, ignoring", "adjustment", r.firstTime.Sub(firstTime))
} else {
r.firstTime = firstTime
}
}
}
func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
r.lock.Lock()
defer r.lock.Unlock()
@@ -792,6 +829,8 @@ func (r *RTPStats) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
srDataCopy := *srData
srDataCopy.RTPTimestampExt = uint64(srDataCopy.RTPTimestamp) + cycles
r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestampExt)
// monitor and log RTP timestamp anomalies
var ntpDiffSinceLast time.Duration
var rtpDiffSinceLast uint32
+4 -1
View File
@@ -1842,7 +1842,10 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() {
}
}
func (d *DownTrack) HandleRTCPSenderReportData(_payloadType webrtc.PayloadType, _layer int32, _srData *buffer.RTCPSenderReportData) error {
func (d *DownTrack) HandleRTCPSenderReportData(_payloadType webrtc.PayloadType, layer int32, srData *buffer.RTCPSenderReportData) error {
if layer == d.forwarder.GetReferenceLayerSpatial() {
d.rtpStats.MaybeAdjustFirstPacketTime(srData)
}
return nil
}
+23 -20
View File
@@ -15,6 +15,7 @@
package sfu
import (
"errors"
"fmt"
"math"
"math/rand"
@@ -40,7 +41,7 @@ const (
FlagFilterRTX = true
TransitionCostSpatial = 10
ResumeBehindThresholdSeconds = float64(0.1) // 100ms
ResumeBehindThresholdSeconds = float64(0.2) // 200ms
LayerSwitchBehindThresholdSeconds = float64(0.05) // 50ms
SwitchAheadThresholdSeconds = float64(0.025) // 25ms
)
@@ -1425,7 +1426,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) (
f.referenceLayerSpatial = layer
f.rtpMunger.SetLastSnTs(extPkt)
f.codecMunger.SetLast(extPkt)
f.logger.Debugw(
f.logger.Infow(
"starting forwarding",
"sequenceNumber", extPkt.Packet.SequenceNumber,
"timestamp", extPkt.Packet.Timestamp,
@@ -1457,14 +1458,16 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) (
switchingAt := time.Now()
if f.getReferenceLayerRTPTimestamp != nil {
ts, err := f.getReferenceLayerRTPTimestamp(extPkt.Packet.Timestamp, layer, f.referenceLayerSpatial)
if err == nil {
refTS = ts
if err != nil {
// error out if refTS is not available. It can happen when there is no sender report
// for the layer being switched to. Can especially happen at the start of the track when layer switches are
// potentially happening very quickly. Erroring out and waiting for a layer for which a sender report has been
// received will calculate a better offset, but may result in initial adaptation to take a bit longer depending
// on how often publisher/remote side sends RTCP sender report.
return nil, err
}
// AVSYNC-TODO: can error out here if refTS is not available. It can happen when there is no sender report
// for the layer being switched to. Can especially happen at the start of the track when layer switches are
// potentially happening very quickly. Erroring out and waiting for a layer for which a sender report has been
// received will calculate a better offset, but may result in initial adaptation to take a bit longer depending
// on how often publisher/remote side sends RTCP sender report.
refTS = ts
}
if f.getExpectedRTPTimestamp != nil {
@@ -1510,11 +1513,11 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) (
// between expectedTS and refTS is thresholded. Difference below the threshold is treated as Case 2
// and above as Case 1.
//
// In the event of refTS > expectedTS, another threshold is used to pick the next timestamp.
// In the event of refTS > expectedTS, use refTS.
// Ideally, refTS should not be ahead of expectedTS, but expectedTS uses the first packet's
// wall clock time. So, if the first packet experienced abmormal latency, it is possible
// for refTS > expectedTS
diffSeconds := float64(expectedTS-refTS) / float64(f.codec.ClockRate)
diffSeconds := float64(int32(expectedTS-refTS)) / float64(f.codec.ClockRate)
if diffSeconds >= 0.0 {
if diffSeconds > ResumeBehindThresholdSeconds {
f.logger.Infow("resume, reference too far behind", "expectedTS", expectedTS, "refTS", refTS, "diffSeconds", diffSeconds)
@@ -1525,25 +1528,25 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) (
} else {
if math.Abs(diffSeconds) > SwitchAheadThresholdSeconds {
f.logger.Infow("resume, reference too far ahead", "expectedTS", expectedTS, "refTS", refTS, "diffSeconds", math.Abs(diffSeconds))
nextTS = expectedTS
} else {
nextTS = refTS
}
nextTS = refTS
}
} else {
// switching between layers, check if refTS is too far behind the last sent
diffSeconds := float64(refTS-lastTS) / float64(f.codec.ClockRate)
diffSeconds := float64(int32(refTS-lastTS)) / float64(f.codec.ClockRate)
if diffSeconds < 0.0 {
if math.Abs(diffSeconds) > LayerSwitchBehindThresholdSeconds {
// AVSYNC-TODO: This could be due to pacer trickling out this layer. Should potentially return error here and wait for a more opportune time
// or some forcing function (like "have waited for too long for layer switch, nothing available, switch to whatever is available" kind of condition)
// to do the switch. Just logging it for now.
// this could be due to pacer trickling out this layer. Error out and wait for a more opportune time.
// AVSYNC-TODO: Consider some forcing function to do the switch
// (like "have waited for too long for layer switch, nothing available, switch to whatever is available" kind of condition).
f.logger.Infow("layer switch, reference too far behind", "expectedTS", expectedTS, "refTS", refTS, "lastTS", lastTS, "diffSeconds", math.Abs(diffSeconds))
return nil, errors.New("switch point too far behind")
}
// use a nominal increase to ensure that timestamp is always moving forward
f.logger.Infow("layer switch, reference is slghtly behind", "expectedTS", expectedTS, "refTS", refTS, "lastTS", lastTS, "diffSeconds", math.Abs(diffSeconds))
nextTS = lastTS + 1
} else {
diffSeconds = float64(expectedTS-refTS) / float64(f.codec.ClockRate)
diffSeconds = float64(int32(expectedTS-refTS)) / float64(f.codec.ClockRate)
if diffSeconds < 0.0 && math.Abs(diffSeconds) > SwitchAheadThresholdSeconds {
f.logger.Infow("layer switch, reference too far ahead", "expectedTS", expectedTS, "refTS", refTS, "diffSeconds", math.Abs(diffSeconds))
nextTS = expectedTS
@@ -1573,7 +1576,7 @@ func (f *Forwarder) processSourceSwitch(extPkt *buffer.ExtPacket, layer int32) (
f.rtpMunger.UpdateSnTsOffsets(extPkt, snOffset, tsOffset)
f.codecMunger.UpdateOffsets(extPkt)
f.logger.Debugw(
f.logger.Infow(
"source switch",
"switchingAt", switchingAt.String(),
"layer", layer,
+2 -2
View File
@@ -331,12 +331,12 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff
SmoothIntervals: w.audioConfig.SmoothIntervals,
})
buff.OnRtcpFeedback(w.sendRTCP)
buff.OnRtcpSenderReport(func(srData *buffer.RTCPSenderReportData) {
buff.OnRtcpSenderReport(func() {
srFirst, srNewest := buff.GetSenderReportData()
w.streamTrackerManager.SetRTCPSenderReportData(layer, srFirst, srNewest)
w.downTrackSpreader.Broadcast(func(dt TrackSender) {
_ = dt.HandleRTCPSenderReportData(w.codec.PayloadType, layer, srData)
_ = dt.HandleRTCPSenderReportData(w.codec.PayloadType, layer, srNewest)
})
})
+46 -16
View File
@@ -56,6 +56,15 @@ var (
promRTT *prometheus.HistogramVec
promParticipantJoin *prometheus.CounterVec
promConnections *prometheus.GaugeVec
promPacketTotalIncomingInitial prometheus.Counter
promPacketTotalIncomingRetransmit prometheus.Counter
promPacketTotalOutgoingInitial prometheus.Counter
promPacketTotalOutgoingRetransmit prometheus.Counter
promPacketBytesIncomingInitial prometheus.Counter
promPacketBytesIncomingRetransmit prometheus.Counter
promPacketBytesOutgoingInitial prometheus.Counter
promPacketBytesOutgoingRetransmit prometheus.Counter
)
func initPacketStats(nodeID string, nodeType livekit.NodeType, env string) {
@@ -140,13 +149,32 @@ func initPacketStats(nodeID string, nodeType livekit.NodeType, env string) {
prometheus.MustRegister(promRTT)
prometheus.MustRegister(promParticipantJoin)
prometheus.MustRegister(promConnections)
promPacketTotalIncomingInitial = promPacketTotal.WithLabelValues(string(Incoming), transmissionInitial)
promPacketTotalIncomingRetransmit = promPacketTotal.WithLabelValues(string(Incoming), transmissionRetransmit)
promPacketTotalOutgoingInitial = promPacketTotal.WithLabelValues(string(Outgoing), transmissionInitial)
promPacketTotalOutgoingRetransmit = promPacketTotal.WithLabelValues(string(Outgoing), transmissionRetransmit)
promPacketBytesIncomingInitial = promPacketBytes.WithLabelValues(string(Incoming), transmissionInitial)
promPacketBytesIncomingRetransmit = promPacketBytes.WithLabelValues(string(Incoming), transmissionRetransmit)
promPacketBytesOutgoingInitial = promPacketBytes.WithLabelValues(string(Outgoing), transmissionInitial)
promPacketBytesOutgoingRetransmit = promPacketBytes.WithLabelValues(string(Outgoing), transmissionRetransmit)
}
func IncrementPackets(direction Direction, count uint64, retransmit bool) {
promPacketTotal.WithLabelValues(
string(direction),
transmissionLabel(retransmit),
).Add(float64(count))
if direction == Incoming {
if retransmit {
promPacketTotalIncomingRetransmit.Add(float64(count))
} else {
promPacketTotalIncomingInitial.Add(float64(count))
}
} else {
if retransmit {
promPacketTotalOutgoingRetransmit.Add(float64(count))
} else {
promPacketTotalOutgoingInitial.Add(float64(count))
}
}
if direction == Incoming {
packetsIn.Add(count)
} else {
@@ -158,10 +186,20 @@ func IncrementPackets(direction Direction, count uint64, retransmit bool) {
}
func IncrementBytes(direction Direction, count uint64, retransmit bool) {
promPacketBytes.WithLabelValues(
string(direction),
transmissionLabel(retransmit),
).Add(float64(count))
if direction == Incoming {
if retransmit {
promPacketBytesIncomingRetransmit.Add(float64(count))
} else {
promPacketBytesIncomingInitial.Add(float64(count))
}
} else {
if retransmit {
promPacketBytesOutgoingRetransmit.Add(float64(count))
} else {
promPacketBytesOutgoingInitial.Add(float64(count))
}
}
if direction == Incoming {
bytesIn.Add(count)
} else {
@@ -240,11 +278,3 @@ func AddConnection(direction Direction) {
func SubConnection(direction Direction) {
promConnections.WithLabelValues(string(direction)).Sub(1)
}
func transmissionLabel(retransmit bool) string {
if !retransmit {
return transmissionInitial
} else {
return transmissionRetransmit
}
}