Files
livekit/pkg/sfu/buffer/rtpstats_receiver.go

758 lines
24 KiB
Go

// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buffer
import (
"fmt"
"math"
"time"
"github.com/pion/rtcp"
"go.uber.org/zap/zapcore"
"github.com/livekit/livekit-server/pkg/sfu/utils"
"github.com/livekit/protocol/livekit"
protoutils "github.com/livekit/protocol/utils"
)
const (
cHistorySize = 8192
// RTCP Sender Reports are re-based to SFU time base so that all subscriber side
// can have the same time base (i. e. SFU time base). To convert publisher side
// RTCP Sender Reports to SFU timebase, a propagation delay is maintained.
// propagation_delay = time_of_report_reception - ntp_timestamp_in_report
//
// Propagation delay is adapted continuously. If it falls, adapt quickly to the
// lower value as that could be the real propagation delay. If it rises, adapt slowly
// as it might be a temporary change or slow drift. See below for handling of high deltas
// which could be a result of a path change.
cPropagationDelayFallFactor = float64(0.9)
cPropagationDelayRiseFactor = float64(0.1)
cPropagationDelaySpikeAdaptationFactor = float64(0.5)
// To account for path changes mid-stream, if the delta of the propagation delay is consistently higher, reset.
// Reset at whichever of the below happens later.
// 1. 10 seconds of persistent high delta.
// 2. at least 2 consecutive reports with high delta.
//
// A long term estimate of delta of propagation delay is maintained and delta propagation delay exceeding
// a factor of the long term estimate is considered a sharp increase. That will trigger the start of the
// path change condition and if it persists, propagation delay will be reset.
cPropagationDelayDeltaThresholdMin = 10 * time.Millisecond
cPropagationDelayDeltaThresholdMaxFactor = 2
cPropagationDelayDeltaHighResetNumReports = 2
cPropagationDelayDeltaHighResetWait = 10 * time.Second
cPropagationDelayDeltaLongTermAdaptationThreshold = 50 * time.Millisecond
// number of seconds the current report RTP timestamp can be off from expected RTP timestamp
cReportSlack = float64(60.0)
)
// ---------------------------------------------------------------------
type RTPFlowState struct {
IsNotHandled bool
HasLoss bool
LossStartInclusive uint64
LossEndExclusive uint64
IsDuplicate bool
IsOutOfOrder bool
ExtSequenceNumber uint64
ExtTimestamp uint64
}
func (r *RTPFlowState) MarshalLogObject(e zapcore.ObjectEncoder) error {
if r == nil {
return nil
}
e.AddBool("IsNotHandled", r.IsNotHandled)
e.AddBool("HasLoss", r.HasLoss)
e.AddUint64("LossStartInclusive", r.LossStartInclusive)
e.AddUint64("LossEndExclusive", r.LossEndExclusive)
e.AddBool("IsDuplicate", r.IsDuplicate)
e.AddBool("IsOutOfOrder", r.IsOutOfOrder)
e.AddUint64("ExtSequenceNumber", r.ExtSequenceNumber)
e.AddUint64("ExtTimestamp", r.ExtTimestamp)
return nil
}
// ---------------------------------------------------------------------
type RTPStatsReceiver struct {
*rtpStatsBase
sequenceNumber *utils.WrapAround[uint16, uint64]
tsRolloverThreshold int64
timestamp *utils.WrapAround[uint32, uint64]
history *protoutils.Bitmap[uint64]
propagationDelay time.Duration
longTermDeltaPropagationDelay time.Duration
propagationDelayDeltaHighCount int
propagationDelayDeltaHighStartTime time.Time
propagationDelaySpike time.Duration
clockSkewCount int
clockSkewMediaPathCount int
outOfOrderSenderReportCount int
largeJumpCount int
largeJumpNegativeCount int
timeReversedCount int
}
func NewRTPStatsReceiver(params RTPStatsParams) *RTPStatsReceiver {
return &RTPStatsReceiver{
rtpStatsBase: newRTPStatsBase(params),
sequenceNumber: utils.NewWrapAround[uint16, uint64](utils.WrapAroundParams{IsRestartAllowed: false}),
tsRolloverThreshold: (1 << 31) * 1e9 / int64(params.ClockRate),
timestamp: utils.NewWrapAround[uint32, uint64](utils.WrapAroundParams{IsRestartAllowed: false}),
history: protoutils.NewBitmap[uint64](cHistorySize),
}
}
func (r *RTPStatsReceiver) NewSnapshotId() uint32 {
r.lock.Lock()
defer r.lock.Unlock()
return r.newSnapshotID(r.sequenceNumber.GetExtendedHighest())
}
func (r *RTPStatsReceiver) getTSRolloverCount(diffNano int64) int {
if diffNano < r.tsRolloverThreshold {
// time not more than rollover threshold
return 0
}
excess := int((diffNano - r.tsRolloverThreshold) * int64(r.params.ClockRate) / 1e9)
return excess/(1<<32) + 1
}
func (r *RTPStatsReceiver) Update(
packetTime int64,
sequenceNumber uint16,
timestamp uint32,
marker bool,
hdrSize int,
payloadSize int,
paddingSize int,
) (flowState RTPFlowState) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
flowState.IsNotHandled = true
return
}
var resSN utils.WrapAroundUpdateResult[uint64]
var gapSN int64
var resTS utils.WrapAroundUpdateResult[uint64]
getLoggingFields := func() []interface{} {
return []interface{}{
"resSN", resSN,
"gapSN", gapSN,
"resTS", resTS,
"gapTS", int64(resTS.ExtendedVal - resTS.PreExtendedHighest),
"packetTime", time.Unix(0, packetTime).String(),
"sequenceNumber", sequenceNumber,
"timestamp", timestamp,
"marker", marker,
"hdrSize", hdrSize,
"payloadSize", payloadSize,
"paddingSize", paddingSize,
"rtpStats", lockedRTPStatsReceiverLogEncoder{r},
}
}
if !r.initialized {
if payloadSize == 0 {
// do not start on a padding only packet
flowState.IsNotHandled = true
return
}
r.initialized = true
r.startTime = time.Now()
r.firstTime = packetTime
r.highestTime = packetTime
resSN = r.sequenceNumber.Update(sequenceNumber)
resTS = r.timestamp.Update(timestamp)
// initialize snapshots if any
for i := uint32(0); i < r.nextSnapshotID-cFirstSnapshotID; i++ {
r.snapshots[i] = r.initSnapshot(r.startTime, r.sequenceNumber.GetExtendedStart())
}
r.logger.Debugw(
"rtp receiver stream start",
"rtpStats", lockedRTPStatsReceiverLogEncoder{r},
)
} else {
resSN = r.sequenceNumber.Update(sequenceNumber)
if resSN.IsUnhandled {
flowState.IsNotHandled = true
return
}
gapSN = int64(resSN.ExtendedVal - resSN.PreExtendedHighest)
resTS = r.timestamp.Rollover(timestamp, r.getTSRolloverCount(packetTime-r.highestTime))
if resTS.IsUnhandled {
flowState.IsNotHandled = true
return
}
gapTS := int64(resTS.ExtendedVal - resTS.PreExtendedHighest)
// it is possible that sequence number has rolled over too
if gapSN < 0 && gapTS > 0 && payloadSize > 0 {
// not possible to know how many cycles of sequence number roll over could have happened,
// use 1 to ensure that it at least does not go backwards
resSN = r.sequenceNumber.Rollover(sequenceNumber, 1)
if resSN.IsUnhandled {
flowState.IsNotHandled = true
return
}
r.logger.Warnw(
"forcing sequence number rollover", nil,
getLoggingFields()...,
)
gapSN = int64(resSN.ExtendedVal - resSN.PreExtendedHighest)
}
}
pktSize := uint64(hdrSize + payloadSize + paddingSize)
if gapSN <= 0 { // duplicate OR out-of-order
if gapSN != 0 {
r.packetsOutOfOrder++
}
if r.isInRange(resSN.ExtendedVal, resSN.PreExtendedHighest) {
if r.history.IsSet(resSN.ExtendedVal) {
r.bytesDuplicate += pktSize
r.headerBytesDuplicate += uint64(hdrSize)
r.packetsDuplicate++
flowState.IsDuplicate = true
} else {
r.packetsLost--
r.history.Set(resSN.ExtendedVal)
}
}
flowState.IsOutOfOrder = true
flowState.ExtSequenceNumber = resSN.ExtendedVal
flowState.ExtTimestamp = resTS.ExtendedVal
if !flowState.IsDuplicate && -gapSN >= cSequenceNumberLargeJumpThreshold {
r.largeJumpNegativeCount++
if (r.largeJumpNegativeCount-1)%100 == 0 {
r.logger.Warnw(
"large sequence number gap negative", nil,
append(getLoggingFields(), "count", r.largeJumpNegativeCount)...,
)
}
}
} else { // in-order
if gapSN >= cSequenceNumberLargeJumpThreshold {
r.largeJumpCount++
if (r.largeJumpCount-1)%100 == 0 {
r.logger.Warnw(
"large sequence number gap", nil,
append(getLoggingFields(), "count", r.largeJumpCount)...,
)
}
}
if resTS.ExtendedVal < resTS.PreExtendedHighest {
r.timeReversedCount++
if (r.timeReversedCount-1)%100 == 0 {
r.logger.Warnw(
"time reversed", nil,
append(getLoggingFields(), "count", r.timeReversedCount)...,
)
}
}
// update gap histogram
r.updateGapHistogram(int(gapSN))
// update missing sequence numbers
r.history.ClearRange(resSN.PreExtendedHighest+1, resSN.ExtendedVal-1)
r.packetsLost += uint64(gapSN - 1)
r.history.Set(resSN.ExtendedVal)
if timestamp != uint32(resTS.PreExtendedHighest) {
// update only on first packet as same timestamp could be in multiple packets.
// NOTE: this may not be the first packet with this time stamp if there is packet loss.
r.highestTime = packetTime
}
if gapSN > 1 {
flowState.HasLoss = true
flowState.LossStartInclusive = resSN.PreExtendedHighest + 1
flowState.LossEndExclusive = resSN.ExtendedVal
}
flowState.ExtSequenceNumber = resSN.ExtendedVal
flowState.ExtTimestamp = resTS.ExtendedVal
}
if !flowState.IsDuplicate {
if payloadSize == 0 {
r.packetsPadding++
r.bytesPadding += pktSize
r.headerBytesPadding += uint64(hdrSize)
} else {
r.bytes += pktSize
r.headerBytes += uint64(hdrSize)
if marker {
r.frames++
}
r.updateJitter(resTS.ExtendedVal, packetTime)
}
}
return
}
func (r *RTPStatsReceiver) getExtendedSenderReport(srData *RTCPSenderReportData) *RTCPSenderReportData {
tsCycles := uint64(0)
if r.srNewest != nil {
// use time since last sender report to ensure long gaps where the time stamp might
// jump more than half the range
timeSinceLastReport := srData.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time())
expectedRTPTimestampExt := r.srNewest.RTPTimestampExt + uint64(timeSinceLastReport.Nanoseconds()*int64(r.params.ClockRate)/1e9)
lbound := expectedRTPTimestampExt - uint64(cReportSlack*float64(r.params.ClockRate))
ubound := expectedRTPTimestampExt + uint64(cReportSlack*float64(r.params.ClockRate))
isInRange := (srData.RTPTimestamp-uint32(lbound) < (1 << 31)) && (uint32(ubound)-srData.RTPTimestamp < (1 << 31))
if isInRange {
lbTSCycles := lbound & 0xFFFF_FFFF_0000_0000
ubTSCycles := ubound & 0xFFFF_FFFF_0000_0000
if lbTSCycles == ubTSCycles {
tsCycles = lbTSCycles
} else {
if srData.RTPTimestamp < (1 << 31) {
// rolled over
tsCycles = ubTSCycles
} else {
tsCycles = lbTSCycles
}
}
} else {
// ideally this method should not be required, but there are clients
// negotiating one clock rate, but actually send media at a different rate.
tsCycles = r.srNewest.RTPTimestampExt & 0xFFFF_FFFF_0000_0000
if (srData.RTPTimestamp-r.srNewest.RTPTimestamp) < (1<<31) && srData.RTPTimestamp < r.srNewest.RTPTimestamp {
tsCycles += (1 << 32)
}
if tsCycles >= (1 << 32) {
if (srData.RTPTimestamp-r.srNewest.RTPTimestamp) >= (1<<31) && srData.RTPTimestamp > r.srNewest.RTPTimestamp {
tsCycles -= (1 << 32)
}
}
}
}
srDataExt := *srData
srDataExt.RTPTimestampExt = uint64(srDataExt.RTPTimestamp) + tsCycles
return &srDataExt
}
func (r *RTPStatsReceiver) checkOutOfOrderSenderReport(srData *RTCPSenderReportData) bool {
if r.srNewest != nil && srData.RTPTimestampExt < r.srNewest.RTPTimestampExt {
// This can happen when a track is replaced with a null and then restored -
// i. e. muting replacing with null and unmute restoring the original track.
// Or it could be due bad report generation.
// In any case, ignore out-of-order reports.
r.outOfOrderSenderReportCount++
if (r.outOfOrderSenderReportCount-1)%10 == 0 {
r.logger.Infow(
"received sender report, out-of-order, skipping",
"current", srData,
"count", r.outOfOrderSenderReportCount,
"rtpStats", lockedRTPStatsReceiverLogEncoder{r},
)
}
return true
}
return false
}
func (r *RTPStatsReceiver) checkRTPClockSkewForSenderReport(srData *RTCPSenderReportData) {
if r.srNewest == nil {
return
}
timeSinceLast := srData.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time()).Seconds()
rtpDiffSinceLast := srData.RTPTimestampExt - r.srNewest.RTPTimestampExt
calculatedClockRateFromLast := float64(rtpDiffSinceLast) / timeSinceLast
timeSinceFirst := srData.NTPTimestamp.Time().Sub(r.srFirst.NTPTimestamp.Time()).Seconds()
rtpDiffSinceFirst := srData.RTPTimestampExt - r.srFirst.RTPTimestampExt
calculatedClockRateFromFirst := float64(rtpDiffSinceFirst) / timeSinceFirst
if (timeSinceLast > 0.2 && math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromLast) > 0.2*float64(r.params.ClockRate)) ||
(timeSinceFirst > 0.2 && math.Abs(float64(r.params.ClockRate)-calculatedClockRateFromFirst) > 0.2*float64(r.params.ClockRate)) {
r.clockSkewCount++
if (r.clockSkewCount-1)%100 == 0 {
r.logger.Infow(
"received sender report, clock skew",
"current", srData,
"timeSinceFirst", timeSinceFirst,
"rtpDiffSinceFirst", rtpDiffSinceFirst,
"calculatedFirst", calculatedClockRateFromFirst,
"timeSinceLast", timeSinceLast,
"rtpDiffSinceLast", rtpDiffSinceLast,
"calculatedLast", calculatedClockRateFromLast,
"count", r.clockSkewCount,
"rtpStats", lockedRTPStatsReceiverLogEncoder{r},
)
}
}
}
func (r *RTPStatsReceiver) checkRTPClockSkewAgainstMediaPathForSenderReport(srData *RTCPSenderReportData) {
if r.highestTime == 0 {
return
}
timeSinceSR := time.Since(srData.AtAdjusted)
extNowTSSR := srData.RTPTimestampExt + uint64(timeSinceSR.Nanoseconds()*int64(r.params.ClockRate)/1e9)
timeSinceHighest := time.Since(time.Unix(0, r.highestTime))
extNowTSHighest := r.timestamp.GetExtendedHighest() + uint64(timeSinceHighest.Nanoseconds()*int64(r.params.ClockRate)/1e9)
diffHighest := extNowTSSR - extNowTSHighest
timeSinceFirst := time.Since(time.Unix(0, r.firstTime))
extNowTSFirst := r.timestamp.GetExtendedStart() + uint64(timeSinceFirst.Nanoseconds()*int64(r.params.ClockRate)/1e9)
diffFirst := extNowTSSR - extNowTSFirst
// is it more than 5 seconds off?
if uint32(math.Abs(float64(int64(diffHighest)))) > 5*r.params.ClockRate || uint32(math.Abs(float64(int64(diffFirst)))) > 5*r.params.ClockRate {
r.clockSkewMediaPathCount++
if (r.clockSkewMediaPathCount-1)%100 == 0 {
r.logger.Infow(
"received sender report, clock skew against media path",
"current", srData,
"timeSinceSR", timeSinceSR,
"extNowTSSR", extNowTSSR,
"timeSinceHighest", timeSinceHighest,
"extNowTSHighest", extNowTSHighest,
"diffHighest", int64(diffHighest),
"timeSinceFirst", timeSinceFirst,
"extNowTSFirst", extNowTSFirst,
"diffFirst", int64(diffFirst),
"count", r.clockSkewMediaPathCount,
"rtpStats", lockedRTPStatsReceiverLogEncoder{r},
)
}
}
}
func (r *RTPStatsReceiver) updatePropagationDelayAndRecordSenderReport(srData *RTCPSenderReportData) {
var propagationDelay time.Duration
var deltaPropagationDelay time.Duration
getPropagationFields := func() []interface{} {
return []interface{}{
"receivedPropagationDelay", propagationDelay.String(),
"receivedDeltaPropagationDelay", deltaPropagationDelay.String(),
"deltaHighCount", r.propagationDelayDeltaHighCount,
"sinceDeltaHighStart", time.Since(r.propagationDelayDeltaHighStartTime).String(),
"propagationDelaySpike", r.propagationDelaySpike.String(),
"current", srData,
"rtpStats", lockedRTPStatsReceiverLogEncoder{r},
}
}
resetDelta := func() {
r.propagationDelayDeltaHighCount = 0
r.propagationDelayDeltaHighStartTime = time.Time{}
r.propagationDelaySpike = 0
}
initPropagationDelay := func(pd time.Duration) {
r.propagationDelay = pd
r.longTermDeltaPropagationDelay = 0
resetDelta()
}
ntpTime := srData.NTPTimestamp.Time()
propagationDelay = srData.At.Sub(ntpTime)
if r.srFirst == nil {
r.srFirst = srData
initPropagationDelay(propagationDelay)
r.logger.Debugw("initializing propagation delay", getPropagationFields()...)
} else {
deltaPropagationDelay = propagationDelay - r.propagationDelay
if deltaPropagationDelay > cPropagationDelayDeltaThresholdMin { // ignore small changes for path change consideration
if r.longTermDeltaPropagationDelay != 0 &&
deltaPropagationDelay > r.longTermDeltaPropagationDelay*time.Duration(cPropagationDelayDeltaThresholdMaxFactor) {
r.logger.Debugw("sharp increase in propagation delay", getPropagationFields()...)
r.propagationDelayDeltaHighCount++
if r.propagationDelayDeltaHighStartTime.IsZero() {
r.propagationDelayDeltaHighStartTime = time.Now()
}
if r.propagationDelaySpike == 0 {
r.propagationDelaySpike = propagationDelay
} else {
r.propagationDelaySpike += time.Duration(cPropagationDelaySpikeAdaptationFactor * float64(propagationDelay-r.propagationDelaySpike))
}
if r.propagationDelayDeltaHighCount >= cPropagationDelayDeltaHighResetNumReports && time.Since(r.propagationDelayDeltaHighStartTime) >= cPropagationDelayDeltaHighResetWait {
r.logger.Debugw("re-initializing propagation delay", append(getPropagationFields(), "newPropagationDelay", r.propagationDelaySpike.String())...)
initPropagationDelay(r.propagationDelaySpike)
}
} else {
resetDelta()
}
} else {
resetDelta()
factor := cPropagationDelayFallFactor
if propagationDelay > r.propagationDelay {
factor = cPropagationDelayRiseFactor
}
r.propagationDelay += time.Duration(factor * float64(propagationDelay-r.propagationDelay))
}
if deltaPropagationDelay < cPropagationDelayDeltaLongTermAdaptationThreshold {
if r.longTermDeltaPropagationDelay == 0 {
r.longTermDeltaPropagationDelay = deltaPropagationDelay
} else {
// do not adapt to large +ve spikes, can happen when channel is congested and reports are delivered very late
// if the spike is in fact a path change, it will persist and handled by path change detection above
sinceLastReport := srData.NTPTimestamp.Time().Sub(r.srNewest.NTPTimestamp.Time())
adaptationFactor := min(1.0, float64(sinceLastReport)/float64(cPropagationDelayDeltaHighResetWait))
r.longTermDeltaPropagationDelay += time.Duration(adaptationFactor * float64(deltaPropagationDelay-r.longTermDeltaPropagationDelay))
}
}
if r.longTermDeltaPropagationDelay < 0 {
r.longTermDeltaPropagationDelay = 0
}
}
// adjust receive time to estimated propagation delay
srData.AtAdjusted = ntpTime.Add(r.propagationDelay)
r.srNewest = srData
}
func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) bool {
r.lock.Lock()
defer r.lock.Unlock()
if srData == nil || !r.initialized {
return false
}
// prevent against extreme case of anachronous sender reports
if r.srNewest != nil && r.srNewest.NTPTimestamp > srData.NTPTimestamp {
r.logger.Infow(
"received sender report, anachronous, dropping",
"current", srData,
"rtpStats", lockedRTPStatsReceiverLogEncoder{r},
)
return false
}
srDataExt := r.getExtendedSenderReport(srData)
if r.checkOutOfOrderSenderReport(srDataExt) {
return false
}
r.updatePropagationDelayAndRecordSenderReport(srDataExt)
r.checkRTPClockSkewForSenderReport(srDataExt)
r.checkRTPClockSkewAgainstMediaPathForSenderReport(srDataExt)
if err, loggingFields := r.maybeAdjustFirstPacketTime(r.srNewest, 0, r.timestamp.GetExtendedStart()); err != nil {
r.logger.Infow(err.Error(), append(loggingFields, "rtpStats", lockedRTPStatsReceiverLogEncoder{r})...)
}
return true
}
func (r *RTPStatsReceiver) GetRtcpSenderReportData() *RTCPSenderReportData {
r.lock.RLock()
defer r.lock.RUnlock()
if r.srNewest == nil {
return nil
}
srNewestCopy := *r.srNewest
return &srNewestCopy
}
func (r *RTPStatsReceiver) LastSenderReportTime() time.Time {
r.lock.RLock()
defer r.lock.RUnlock()
if r.srNewest != nil {
return r.srNewest.At
}
return time.Time{}
}
func (r *RTPStatsReceiver) GetRtcpReceptionReport(ssrc uint32, proxyFracLost uint8, snapshotID uint32) *rtcp.ReceptionReport {
r.lock.Lock()
defer r.lock.Unlock()
extHighestSN := r.sequenceNumber.GetExtendedHighest()
then, now := r.getAndResetSnapshot(snapshotID, r.sequenceNumber.GetExtendedStart(), extHighestSN)
if now == nil || then == nil {
return nil
}
packetsExpected := now.extStartSN - then.extStartSN
if packetsExpected > cNumSequenceNumbers {
r.logger.Warnw(
"too many packets expected in receiver report",
fmt.Errorf("start: %d, end: %d, expected: %d", then.extStartSN, now.extStartSN, packetsExpected),
"rtpStats", lockedRTPStatsReceiverLogEncoder{r},
)
return nil
}
if packetsExpected == 0 {
return nil
}
packetsLost := uint32(now.packetsLost - then.packetsLost)
if int32(packetsLost) < 0 {
packetsLost = 0
}
lossRate := float32(packetsLost) / float32(packetsExpected)
fracLost := uint8(lossRate * 256.0)
if proxyFracLost > fracLost {
fracLost = proxyFracLost
}
totalLost := r.packetsLost
if totalLost > 0xffffff { // 24-bits max
totalLost = 0xffffff
}
lastSR := uint32(0)
dlsr := uint32(0)
if r.srNewest != nil {
lastSR = uint32(r.srNewest.NTPTimestamp >> 16)
if !r.srNewest.At.IsZero() {
delayUS := time.Since(r.srNewest.At).Microseconds()
dlsr = uint32(delayUS * 65536 / 1e6)
}
}
return &rtcp.ReceptionReport{
SSRC: ssrc,
FractionLost: fracLost,
TotalLost: uint32(totalLost),
LastSequenceNumber: uint32(now.extStartSN),
Jitter: uint32(r.jitter),
LastSenderReport: lastSR,
Delay: dlsr,
}
}
func (r *RTPStatsReceiver) DeltaInfo(snapshotID uint32) *RTPDeltaInfo {
r.lock.Lock()
defer r.lock.Unlock()
deltaInfo, err, loggingFields := r.deltaInfo(
snapshotID,
r.sequenceNumber.GetExtendedStart(),
r.sequenceNumber.GetExtendedHighest(),
)
if err != nil {
r.logger.Infow(err.Error(), append(loggingFields, "rtpStats", lockedRTPStatsReceiverLogEncoder{r})...)
}
return deltaInfo
}
func (r *RTPStatsReceiver) MarshalLogObject(e zapcore.ObjectEncoder) error {
if r == nil {
return nil
}
r.lock.RLock()
defer r.lock.RUnlock()
return lockedRTPStatsReceiverLogEncoder{r}.MarshalLogObject(e)
}
func (r *RTPStatsReceiver) String() string {
r.lock.RLock()
defer r.lock.RUnlock()
return r.toString(
r.sequenceNumber.GetExtendedStart(), r.sequenceNumber.GetExtendedHighest(), r.timestamp.GetExtendedStart(), r.timestamp.GetExtendedHighest(),
r.packetsLost,
r.jitter, r.maxJitter,
)
}
func (r *RTPStatsReceiver) ToProto() *livekit.RTPStats {
r.lock.RLock()
defer r.lock.RUnlock()
return r.toProto(
r.sequenceNumber.GetExtendedStart(), r.sequenceNumber.GetExtendedHighest(), r.timestamp.GetExtendedStart(), r.timestamp.GetExtendedHighest(),
r.packetsLost,
r.jitter, r.maxJitter,
)
}
func (r *RTPStatsReceiver) isInRange(esn uint64, ehsn uint64) bool {
diff := int64(ehsn - esn)
return diff >= 0 && diff < cHistorySize
}
// ----------------------------------
type lockedRTPStatsReceiverLogEncoder struct {
*RTPStatsReceiver
}
func (r lockedRTPStatsReceiverLogEncoder) MarshalLogObject(e zapcore.ObjectEncoder) error {
if r.RTPStatsReceiver == nil {
return nil
}
e.AddObject("base", r.rtpStatsBase)
e.AddUint64("extStartSN", r.sequenceNumber.GetExtendedStart())
e.AddUint64("extHighestSN", r.sequenceNumber.GetExtendedHighest())
e.AddUint64("extStartTS", r.timestamp.GetExtendedStart())
e.AddUint64("extHighestTS", r.timestamp.GetExtendedHighest())
e.AddDuration("propagationDelay", r.propagationDelay)
e.AddDuration("longTermDeltaPropagationDelay", r.longTermDeltaPropagationDelay)
return nil
}
// ----------------------------------