Split RTPStats into receiver and sender. (#2055)

* Split RTPStats into receiver and sender.

For receiver, short types are input and need to calculate extended type.

For sender (subscriber), it can operate only in extended type.
This makes the subscriber side a little simpler and should make it more
efficient as it can do simple comparisons in extended type space.

There was also an issue with subscriber using shorter type and
calculating extended type. When subscriber starts after the publisher
has already rolled over in sequence number OR timestamp, when
subsequent publisher side sender reports are used to adjust subscriber
time stamps, they were out of whack. Using extended type on subscriber
does not face that.

* fix test

* extended types from sequencer

* log
This commit is contained in:
Raja Subramanian
2023-09-11 07:33:39 +05:30
committed by GitHub
parent 820730a385
commit c09d8d0878
13 changed files with 2592 additions and 2122 deletions
+2 -2
View File
@@ -327,14 +327,14 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
t.MediaTrackSubscriptions.UpdateVideoLayers()
})
buff.OnFinalRtpStats(func(stats *buffer.RTPStats) {
buff.OnFinalRtpStats(func(stats *livekit.RTPStats) {
t.params.Telemetry.TrackPublishRTPStats(
context.Background(),
t.params.ParticipantID,
t.ID(),
mime,
int(layer),
stats.ToProto(),
stats,
)
})
return newCodec
+15 -7
View File
@@ -97,7 +97,7 @@ type Buffer struct {
pliThrottle int64
rtpStats *RTPStats
rtpStats *RTPStatsReceiver
rrSnapshotId uint32
deltaStatsSnapshotId uint32
@@ -108,7 +108,7 @@ type Buffer struct {
onRtcpFeedback func([]rtcp.Packet)
onRtcpSenderReport func()
onFpsChanged func()
onFinalRtpStats func(*RTPStats)
onFinalRtpStats func(*livekit.RTPStats)
// logger
logger logger.Logger
@@ -175,7 +175,7 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili
return
}
b.rtpStats = NewRTPStats(RTPStatsParams{
b.rtpStats = NewRTPStatsReceiver(RTPStatsParams{
ClockRate: codec.ClockRate,
Logger: b.logger,
})
@@ -350,7 +350,7 @@ func (b *Buffer) Close() error {
b.rtpStats.Stop()
b.logger.Infow("rtp stats", "direction", "upstream", "stats", b.rtpStats.ToString())
if b.onFinalRtpStats != nil {
b.onFinalRtpStats(b.rtpStats)
b.onFinalRtpStats(b.rtpStats.ToProto())
}
}
@@ -530,7 +530,15 @@ func (b *Buffer) doFpsCalc(ep *ExtPacket) {
}
func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime time.Time) RTPFlowState {
flowState := b.rtpStats.Update(&p.Header, len(p.Payload), int(p.PaddingSize), arrivalTime)
flowState := b.rtpStats.Update(
arrivalTime,
p.Header.SequenceNumber,
p.Header.Timestamp,
p.Header.Marker,
p.Header.MarshalSize(),
len(p.Payload),
int(p.PaddingSize),
)
if b.nacker != nil {
b.nacker.Remove(p.SequenceNumber)
@@ -693,7 +701,7 @@ func (b *Buffer) buildReceptionReport() *rtcp.ReceptionReport {
return nil
}
return b.rtpStats.SnapshotRtcpReceptionReport(b.mediaSSRC, b.lastFractionLostToReport, b.rrSnapshotId)
return b.rtpStats.GetRtcpReceptionReport(b.mediaSSRC, b.lastFractionLostToReport, b.rrSnapshotId)
}
func (b *Buffer) SetSenderReportData(rtpTime uint32, ntpTime uint64, packetCount uint32) {
@@ -770,7 +778,7 @@ func (b *Buffer) OnRtcpSenderReport(fn func()) {
b.onRtcpSenderReport = fn
}
func (b *Buffer) OnFinalRtpStats(fn func(*RTPStats)) {
func (b *Buffer) OnFinalRtpStats(fn func(*livekit.RTPStats)) {
b.onFinalRtpStats = fn
}
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
+471
View File
@@ -0,0 +1,471 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buffer
import (
"fmt"
"time"
"github.com/pion/rtcp"
"github.com/livekit/livekit-server/pkg/sfu/utils"
"github.com/livekit/protocol/livekit"
)
type RTPFlowState struct {
IsNotHandled bool
HasLoss bool
LossStartInclusive uint64
LossEndExclusive uint64
IsDuplicate bool
IsOutOfOrder bool
ExtSequenceNumber uint64
ExtTimestamp uint64
}
type RTPStatsReceiver struct {
*rtpStatsBase
resyncOnNextPacket bool
shouldDiscountPaddingOnlyDrops bool
sequenceNumber *utils.WrapAround[uint16, uint64]
timestamp *utils.WrapAround[uint32, uint64]
}
func NewRTPStatsReceiver(params RTPStatsParams) *RTPStatsReceiver {
return &RTPStatsReceiver{
rtpStatsBase: newRTPStatsBase(params),
sequenceNumber: utils.NewWrapAround[uint16, uint64](),
timestamp: utils.NewWrapAround[uint32, uint64](),
}
}
func (r *RTPStatsReceiver) NewSnapshotId() uint32 {
r.lock.Lock()
defer r.lock.Unlock()
return r.newSnapshotID(r.sequenceNumber.GetExtendedStart())
}
func (r *RTPStatsReceiver) Update(
packetTime time.Time,
sequenceNumber uint16,
timestamp uint32,
marker bool,
hdrSize int,
payloadSize int,
paddingSize int,
) (flowState RTPFlowState) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
flowState.IsNotHandled = true
return
}
if r.resyncOnNextPacket {
r.resyncOnNextPacket = false
r.resync(packetTime, sequenceNumber, timestamp)
}
var resSN utils.WrapAroundUpdateResult[uint64]
var resTS utils.WrapAroundUpdateResult[uint64]
if !r.initialized {
if payloadSize == 0 {
// do not start on a padding only packet
flowState.IsNotHandled = true
return
}
r.initialized = true
r.startTime = time.Now()
r.firstTime = packetTime
r.highestTime = packetTime
resSN = r.sequenceNumber.Update(sequenceNumber)
resTS = r.timestamp.Update(timestamp)
// initialize snapshots if any
for i := uint32(cFirstSnapshotID); i < r.nextSnapshotID; i++ {
r.snapshots[i] = &snapshot{
startTime: r.startTime,
extStartSN: r.sequenceNumber.GetExtendedStart(),
}
}
r.logger.Debugw(
"rtp receiver stream start",
"startTime", r.startTime.String(),
"firstTime", r.firstTime.String(),
"startSN", r.sequenceNumber.GetExtendedStart(),
"startTS", r.timestamp.GetExtendedStart(),
)
} else {
resSN = r.sequenceNumber.Update(sequenceNumber)
resTS = r.timestamp.Update(timestamp)
}
pktSize := uint64(hdrSize + payloadSize + paddingSize)
gapSN := int64(resSN.ExtendedVal - resSN.PreExtendedHighest)
if gapSN <= 0 { // duplicate OR out-of-order
if payloadSize == 0 {
// do not start on a padding only packet
if resTS.IsRestart {
r.logger.Infow("rolling back timestamp restart", "tsBefore", r.timestamp.GetExtendedStart(), "tsAfter", resTS.PreExtendedStart)
r.timestamp.RollbackRestart(resTS.PreExtendedStart)
}
if resSN.IsRestart {
r.logger.Infow("rolling back sequence number restart", "snBefore", r.sequenceNumber.GetExtendedStart(), "snAfter", resSN.PreExtendedStart)
r.sequenceNumber.RollbackRestart(resSN.PreExtendedStart)
return
}
}
if gapSN != 0 {
r.packetsOutOfOrder++
}
if resSN.IsRestart {
r.packetsLost += resSN.PreExtendedStart - resSN.ExtendedVal
extStartSN := r.sequenceNumber.GetExtendedStart()
for _, s := range r.snapshots {
if s.extStartSN == resSN.PreExtendedStart {
s.extStartSN = extStartSN
}
}
r.logger.Infow(
"adjusting start sequence number",
"snBefore", resSN.PreExtendedStart,
"snAfter", resSN.ExtendedVal,
)
}
if resTS.IsRestart {
r.logger.Infow(
"adjusting start timestamp",
"tsBefore", resTS.PreExtendedStart,
"tsAfter", resTS.ExtendedVal,
)
}
if !r.isSnInfoLost(resSN.ExtendedVal, resSN.PreExtendedHighest) {
r.bytesDuplicate += pktSize
r.headerBytesDuplicate += uint64(hdrSize)
r.packetsDuplicate++
flowState.IsDuplicate = true
} else {
r.packetsLost--
r.setSnInfo(resSN.ExtendedVal, resSN.PreExtendedHighest, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), marker, true)
}
flowState.IsOutOfOrder = true
flowState.ExtSequenceNumber = resSN.ExtendedVal
flowState.ExtTimestamp = resTS.ExtendedVal
} else { // in-order
// update gap histogram
r.updateGapHistogram(int(gapSN))
// update missing sequence numbers
r.clearSnInfos(resSN.PreExtendedHighest+1, resSN.ExtendedVal)
r.packetsLost += uint64(gapSN - 1)
r.setSnInfo(resSN.ExtendedVal, resSN.PreExtendedHighest, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), marker, false)
if timestamp != uint32(resTS.PreExtendedHighest) {
// update only on first packet as same timestamp could be in multiple packets.
// NOTE: this may not be the first packet with this time stamp if there is packet loss.
r.highestTime = packetTime
}
if gapSN > 1 {
flowState.HasLoss = true
flowState.LossStartInclusive = resSN.PreExtendedHighest + 1
flowState.LossEndExclusive = resSN.ExtendedVal
}
flowState.ExtSequenceNumber = resSN.ExtendedVal
flowState.ExtTimestamp = resTS.ExtendedVal
}
if !flowState.IsDuplicate {
if payloadSize == 0 {
r.packetsPadding++
r.bytesPadding += pktSize
r.headerBytesPadding += uint64(hdrSize)
} else {
r.bytes += pktSize
r.headerBytes += uint64(hdrSize)
if marker {
r.frames++
}
r.updateJitter(resTS.ExtendedVal, packetTime)
}
}
return
}
func (r *RTPStatsReceiver) ResyncOnNextPacket(shouldDiscountPaddingOnlyDrops bool) {
r.lock.Lock()
defer r.lock.Unlock()
r.resyncOnNextPacket = true
r.shouldDiscountPaddingOnlyDrops = shouldDiscountPaddingOnlyDrops
}
func (r *RTPStatsReceiver) resync(packetTime time.Time, sn uint16, ts uint32) {
if !r.initialized {
return
}
extHighestSN := r.sequenceNumber.GetExtendedHighest()
var newestPacketCount uint64
var paddingOnlyDrops uint64
var extExpectedHighestSN uint64
var expectedHighestSN uint16
var snCycles uint64
extHighestTS := r.timestamp.GetExtendedHighest()
var newestTS uint64
var extExpectedHighestTS uint64
var expectedHighestTS uint32
var tsCycles uint64
if r.srNewest != nil {
newestPacketCount = r.srNewest.PacketCountExt
paddingOnlyDrops = r.srNewest.PaddingOnlyDrops
if newestPacketCount != 0 {
extExpectedHighestSN = r.sequenceNumber.GetExtendedStart() + newestPacketCount
if r.shouldDiscountPaddingOnlyDrops {
extExpectedHighestSN -= paddingOnlyDrops
}
expectedHighestSN = uint16(extExpectedHighestSN & 0xFFFF)
snCycles = extExpectedHighestSN & 0xFFFF_FFFF_FFFF_0000
if sn-expectedHighestSN < (1<<15) && sn < expectedHighestSN {
snCycles += (1 << 16)
}
if snCycles != 0 && expectedHighestSN-sn < (1<<15) && expectedHighestSN < sn {
snCycles -= (1 << 16)
}
}
newestTS = r.srNewest.RTPTimestampExt
extExpectedHighestTS = newestTS
expectedHighestTS = uint32(extExpectedHighestTS & 0xFFFF_FFFF)
tsCycles = extExpectedHighestTS & 0xFFFF_FFFF_0000_0000
if ts-expectedHighestTS < (1<<31) && ts < expectedHighestTS {
tsCycles += (1 << 32)
}
if tsCycles != 0 && expectedHighestTS-ts < (1<<31) && expectedHighestTS < ts {
tsCycles -= (1 << 32)
}
}
r.sequenceNumber.ResetHighest(snCycles + uint64(sn) - 1)
r.timestamp.ResetHighest(tsCycles + uint64(ts))
r.highestTime = packetTime
r.logger.Debugw(
"resync",
"newestPacketCount", newestPacketCount,
"paddingOnlyDrops", paddingOnlyDrops,
"extExpectedHighestSN", extExpectedHighestSN,
"expectedHighestSN", expectedHighestSN,
"snCycles", snCycles,
"rtpSN", sn,
"beforeExtHighestSN", extHighestSN,
"afterExtHighestSN", r.sequenceNumber.GetExtendedHighest(),
"newestTS", newestTS,
"extExpectedHighestTS", extExpectedHighestTS,
"expectedHighestTS", expectedHighestTS,
"tsCycles", tsCycles,
"rtpTS", ts,
"beforeExtHighestTS", extHighestTS,
"afterExtHighestTS", r.timestamp.GetExtendedHighest(),
)
}
func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) {
r.lock.Lock()
defer r.lock.Unlock()
if srData == nil || !r.initialized {
return
}
// prevent against extreme case of anachronous sender reports
if r.srNewest != nil && r.srNewest.NTPTimestamp > srData.NTPTimestamp {
r.logger.Infow(
"received anachronous sender report",
"currentNTP", srData.NTPTimestamp.Time().String(),
"currentRTP", srData.RTPTimestamp,
"currentAt", srData.At.String(),
"lastNTP", r.srNewest.NTPTimestamp.Time().String(),
"lastRTP", r.srNewest.RTPTimestamp,
"lastAt", r.srNewest.At.String(),
)
return
}
tsCycles := uint64(0)
pcCycles := uint64(0)
if r.srNewest != nil {
tsCycles = r.srNewest.RTPTimestampExt & 0xFFFF_FFFF_0000_0000
if (srData.RTPTimestamp-r.srNewest.RTPTimestamp) < (1<<31) && srData.RTPTimestamp < r.srNewest.RTPTimestamp {
tsCycles += (1 << 32)
}
pcCycles = r.srNewest.PacketCountExt & 0xFFFF_FFFF_0000_0000
if (srData.PacketCount-r.srNewest.PacketCount) < (1<<31) && srData.PacketCount < r.srNewest.PacketCount {
pcCycles += (1 << 32)
}
}
srDataCopy := *srData
srDataCopy.RTPTimestampExt = uint64(srDataCopy.RTPTimestamp) + tsCycles
srDataCopy.PacketCountExt = uint64(srDataCopy.PacketCount) + pcCycles
r.maybeAdjustFirstPacketTime(srDataCopy.RTPTimestampExt, r.timestamp.GetExtendedStart())
if r.srNewest != nil && srDataCopy.RTPTimestampExt < r.srNewest.RTPTimestampExt {
// This can happen when a track is replaced with a null and then restored -
// i. e. muting replacing with null and unmute restoring the original track.
// Under such a condition reset the sender reports to start from this point.
// Resetting will ensure sample rate calculations do not go haywire due to negative time.
r.logger.Infow(
"received sender report, out-of-order, resetting",
"prevTSExt", r.srNewest.RTPTimestampExt,
"prevRTP", r.srNewest.RTPTimestamp,
"prevNTP", r.srNewest.NTPTimestamp.Time().String(),
"prevAt", r.srNewest.At.String(),
"currTSExt", srDataCopy.RTPTimestampExt,
"currRTP", srDataCopy.RTPTimestamp,
"currNTP", srDataCopy.NTPTimestamp.Time().String(),
"currentAt", srDataCopy.At.String(),
)
r.srFirst = nil
}
r.srNewest = &srDataCopy
if r.srFirst == nil {
r.srFirst = &srDataCopy
}
}
func (r *RTPStatsReceiver) GetRtcpSenderReportData() (srFirst *RTCPSenderReportData, srNewest *RTCPSenderReportData) {
r.lock.RLock()
defer r.lock.RUnlock()
if r.srFirst != nil {
srFirstCopy := *r.srFirst
srFirst = &srFirstCopy
}
if r.srNewest != nil {
srNewestCopy := *r.srNewest
srNewest = &srNewestCopy
}
return
}
func (r *RTPStatsReceiver) GetRtcpReceptionReport(ssrc uint32, proxyFracLost uint8, snapshotID uint32) *rtcp.ReceptionReport {
r.lock.Lock()
defer r.lock.Unlock()
extHighestSN := r.sequenceNumber.GetExtendedHighest()
then, now := r.getAndResetSnapshot(snapshotID, r.sequenceNumber.GetExtendedStart(), extHighestSN)
if now == nil || then == nil {
return nil
}
packetsExpected := now.extStartSN - then.extStartSN
if packetsExpected > cNumSequenceNumbers {
r.logger.Warnw(
"too many packets expected in receiver report",
fmt.Errorf("start: %d, end: %d, expected: %d", then.extStartSN, now.extStartSN, packetsExpected),
)
return nil
}
if packetsExpected == 0 {
return nil
}
intervalStats := r.getIntervalStats(then.extStartSN, now.extStartSN, extHighestSN)
packetsLost := intervalStats.packetsLost
lossRate := float32(packetsLost) / float32(packetsExpected)
fracLost := uint8(lossRate * 256.0)
if proxyFracLost > fracLost {
fracLost = proxyFracLost
}
lastSR := uint32(0)
dlsr := uint32(0)
if r.srNewest != nil {
lastSR = uint32(r.srNewest.NTPTimestamp >> 16)
if !r.srNewest.At.IsZero() {
delayMS := uint32(time.Since(r.srNewest.At).Milliseconds())
dlsr = (delayMS / 1e3) << 16
dlsr |= (delayMS % 1e3) * 65536 / 1000
}
}
return &rtcp.ReceptionReport{
SSRC: ssrc,
FractionLost: fracLost,
TotalLost: uint32(r.packetsLost),
LastSequenceNumber: uint32(now.extStartSN),
Jitter: uint32(r.jitter),
LastSenderReport: lastSR,
Delay: dlsr,
}
}
func (r *RTPStatsReceiver) DeltaInfo(snapshotID uint32) *RTPDeltaInfo {
r.lock.Lock()
defer r.lock.Unlock()
return r.deltaInfo(snapshotID, r.sequenceNumber.GetExtendedStart(), r.sequenceNumber.GetExtendedHighest())
}
func (r *RTPStatsReceiver) ToString() string {
r.lock.RLock()
defer r.lock.RUnlock()
return r.toString(
r.sequenceNumber.GetExtendedStart(), r.sequenceNumber.GetExtendedHighest(), r.timestamp.GetExtendedStart(), r.timestamp.GetExtendedHighest(),
r.packetsLost,
r.jitter, r.maxJitter,
)
}
func (r *RTPStatsReceiver) ToProto() *livekit.RTPStats {
r.lock.RLock()
defer r.lock.RUnlock()
return r.toProto(
r.sequenceNumber.GetExtendedStart(), r.sequenceNumber.GetExtendedHighest(), r.timestamp.GetExtendedStart(), r.timestamp.GetExtendedHighest(),
r.packetsLost,
r.jitter, r.maxJitter,
)
}
// ----------------------------------
@@ -35,9 +35,9 @@ func getPacket(sn uint16, ts uint32, payloadSize int) *rtp.Packet {
}
}
func TestRTPStats(t *testing.T) {
func Test_RTPStatsReceiver(t *testing.T) {
clockRate := uint32(90000)
r := NewRTPStats(RTPStatsParams{
r := NewRTPStatsReceiver(RTPStatsParams{
ClockRate: clockRate,
Logger: logger.GetLogger(),
})
@@ -59,7 +59,15 @@ func TestRTPStats(t *testing.T) {
timestamp += uint32(now.Sub(lastFrameTime).Seconds() * float64(clockRate))
for i := 0; i < packetsPerFrame; i++ {
packet := getPacket(sequenceNumber, timestamp, packetSize)
r.Update(&packet.Header, len(packet.Payload), 0, time.Now())
r.Update(
time.Now(),
packet.Header.SequenceNumber,
packet.Header.Timestamp,
packet.Header.Marker,
packet.Header.MarshalSize(),
len(packet.Payload),
0,
)
if (sequenceNumber % 100) == 0 {
jump := uint16(rand.Float64() * 120.0)
sequenceNumber += jump
@@ -77,9 +85,9 @@ func TestRTPStats(t *testing.T) {
fmt.Printf("%s\n", r.ToString())
}
func TestRTPStats_Update(t *testing.T) {
func Test_RTPStatsReceiver_Update(t *testing.T) {
clockRate := uint32(90000)
r := NewRTPStats(RTPStatsParams{
r := NewRTPStatsReceiver(RTPStatsParams{
ClockRate: clockRate,
Logger: logger.GetLogger(),
})
@@ -87,7 +95,15 @@ func TestRTPStats_Update(t *testing.T) {
sequenceNumber := uint16(rand.Float64() * float64(1<<16))
timestamp := uint32(rand.Float64() * float64(1<<32))
packet := getPacket(sequenceNumber, timestamp, 1000)
flowState := r.Update(&packet.Header, len(packet.Payload), 0, time.Now())
flowState := r.Update(
time.Now(),
packet.Header.SequenceNumber,
packet.Header.Timestamp,
packet.Header.Marker,
packet.Header.MarshalSize(),
len(packet.Payload),
0,
)
require.False(t, flowState.HasLoss)
require.True(t, r.initialized)
require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest())
@@ -99,7 +115,15 @@ func TestRTPStats_Update(t *testing.T) {
sequenceNumber++
timestamp += 3000
packet = getPacket(sequenceNumber, timestamp, 1000)
flowState = r.Update(&packet.Header, len(packet.Payload), 0, time.Now())
flowState = r.Update(
time.Now(),
packet.Header.SequenceNumber,
packet.Header.Timestamp,
packet.Header.Marker,
packet.Header.MarshalSize(),
len(packet.Payload),
0,
)
require.False(t, flowState.HasLoss)
require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest())
require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest()))
@@ -108,7 +132,15 @@ func TestRTPStats_Update(t *testing.T) {
// out-of-order
packet = getPacket(sequenceNumber-10, timestamp-30000, 1000)
flowState = r.Update(&packet.Header, len(packet.Payload), 0, time.Now())
flowState = r.Update(
time.Now(),
packet.Header.SequenceNumber,
packet.Header.Timestamp,
packet.Header.Marker,
packet.Header.MarshalSize(),
len(packet.Payload),
0,
)
require.False(t, flowState.HasLoss)
require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest())
require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest()))
@@ -119,7 +151,15 @@ func TestRTPStats_Update(t *testing.T) {
// duplicate
packet = getPacket(sequenceNumber-10, timestamp-30000, 1000)
flowState = r.Update(&packet.Header, len(packet.Payload), 0, time.Now())
flowState = r.Update(
time.Now(),
packet.Header.SequenceNumber,
packet.Header.Timestamp,
packet.Header.Marker,
packet.Header.MarshalSize(),
len(packet.Payload),
0,
)
require.False(t, flowState.HasLoss)
require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest())
require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest()))
@@ -132,7 +172,15 @@ func TestRTPStats_Update(t *testing.T) {
sequenceNumber += 10
timestamp += 30000
packet = getPacket(sequenceNumber, timestamp, 1000)
flowState = r.Update(&packet.Header, len(packet.Payload), 0, time.Now())
flowState = r.Update(
time.Now(),
packet.Header.SequenceNumber,
packet.Header.Timestamp,
packet.Header.Marker,
packet.Header.MarshalSize(),
len(packet.Payload),
0,
)
require.True(t, flowState.HasLoss)
require.Equal(t, uint64(sequenceNumber-9), flowState.LossStartInclusive)
require.Equal(t, uint64(sequenceNumber), flowState.LossEndExclusive)
@@ -140,7 +188,15 @@ func TestRTPStats_Update(t *testing.T) {
// out-of-order should decrement number of lost packets
packet = getPacket(sequenceNumber-15, timestamp-45000, 1000)
flowState = r.Update(&packet.Header, len(packet.Payload), 0, time.Now())
flowState = r.Update(
time.Now(),
packet.Header.SequenceNumber,
packet.Header.Timestamp,
packet.Header.Marker,
packet.Header.MarshalSize(),
len(packet.Payload),
0,
)
require.False(t, flowState.HasLoss)
require.Equal(t, sequenceNumber, r.sequenceNumber.GetHighest())
require.Equal(t, sequenceNumber, uint16(r.sequenceNumber.GetExtendedHighest()))
@@ -149,7 +205,11 @@ func TestRTPStats_Update(t *testing.T) {
require.Equal(t, uint64(3), r.packetsOutOfOrder)
require.Equal(t, uint64(1), r.packetsDuplicate)
require.Equal(t, uint64(16), r.packetsLost)
intervalStats := r.getIntervalStats(r.sequenceNumber.GetExtendedStart(), r.sequenceNumber.GetExtendedHighest()+1)
intervalStats := r.getIntervalStats(
r.sequenceNumber.GetExtendedStart(),
r.sequenceNumber.GetExtendedHighest()+1,
r.sequenceNumber.GetExtendedHighest(),
)
require.Equal(t, uint64(16), intervalStats.packetsLost)
// test sequence number cache
@@ -157,76 +217,100 @@ func TestRTPStats_Update(t *testing.T) {
sequenceNumber += 2
timestamp += 6000
packet = getPacket(sequenceNumber, timestamp, 1000)
flowState = r.Update(&packet.Header, len(packet.Payload), 0, time.Now())
flowState = r.Update(
time.Now(),
packet.Header.SequenceNumber,
packet.Header.Timestamp,
packet.Header.Marker,
packet.Header.MarshalSize(),
len(packet.Payload),
0,
)
require.True(t, flowState.HasLoss)
require.Equal(t, uint64(sequenceNumber-1), flowState.LossStartInclusive)
require.Equal(t, uint64(sequenceNumber), flowState.LossEndExclusive)
require.Equal(t, uint64(17), r.packetsLost)
expectedSnInfo := SnInfo{
expectedSnInfo := snInfo{
hdrSize: 12,
pktSize: 1012,
isPaddingOnly: false,
marker: false,
isOutOfOrder: false,
}
require.Equal(t, expectedSnInfo, r.snInfos[sequenceNumber&SnInfoMask])
require.Equal(t, expectedSnInfo, r.snInfos[sequenceNumber&cSnInfoMask])
// out-of-order
sequenceNumber--
timestamp -= 3000
packet = getPacket(sequenceNumber, timestamp, 999)
flowState = r.Update(&packet.Header, len(packet.Payload), 0, time.Now())
flowState = r.Update(
time.Now(),
packet.Header.SequenceNumber,
packet.Header.Timestamp,
packet.Header.Marker,
packet.Header.MarshalSize(),
len(packet.Payload),
0,
)
require.False(t, flowState.HasLoss)
require.Equal(t, uint64(16), r.packetsLost)
expectedSnInfo = SnInfo{
expectedSnInfo = snInfo{
hdrSize: 12,
pktSize: 1011,
isPaddingOnly: false,
marker: false,
isOutOfOrder: true,
}
require.Equal(t, expectedSnInfo, r.snInfos[sequenceNumber&SnInfoMask])
require.Equal(t, expectedSnInfo, r.snInfos[sequenceNumber&cSnInfoMask])
// check that last one is still fine
expectedSnInfo = SnInfo{
expectedSnInfo = snInfo{
hdrSize: 12,
pktSize: 1012,
isPaddingOnly: false,
marker: false,
isOutOfOrder: false,
}
require.Equal(t, expectedSnInfo, r.snInfos[(sequenceNumber+1)&SnInfoMask])
require.Equal(t, expectedSnInfo, r.snInfos[(sequenceNumber+1)&cSnInfoMask])
// padding only
sequenceNumber += 2
packet = getPacket(sequenceNumber, timestamp, 0)
flowState = r.Update(&packet.Header, len(packet.Payload), 25, time.Now())
flowState = r.Update(
time.Now(),
packet.Header.SequenceNumber,
packet.Header.Timestamp,
packet.Header.Marker,
packet.Header.MarshalSize(),
len(packet.Payload),
25,
)
require.False(t, flowState.HasLoss)
require.Equal(t, uint64(16), r.packetsLost)
expectedSnInfo = SnInfo{
expectedSnInfo = snInfo{
hdrSize: 12,
pktSize: 37,
isPaddingOnly: true,
marker: false,
isOutOfOrder: false,
}
require.Equal(t, expectedSnInfo, r.snInfos[sequenceNumber&SnInfoMask])
require.Equal(t, expectedSnInfo, r.snInfos[sequenceNumber&cSnInfoMask])
// check that last two are still fine
expectedSnInfo = SnInfo{
expectedSnInfo = snInfo{
hdrSize: 12,
pktSize: 1011,
isPaddingOnly: false,
marker: false,
isOutOfOrder: true,
}
require.Equal(t, expectedSnInfo, r.snInfos[(sequenceNumber-2)&SnInfoMask])
expectedSnInfo = SnInfo{
require.Equal(t, expectedSnInfo, r.snInfos[(sequenceNumber-2)&cSnInfoMask])
expectedSnInfo = snInfo{
hdrSize: 12,
pktSize: 1012,
isPaddingOnly: false,
marker: false,
isOutOfOrder: false,
}
require.Equal(t, expectedSnInfo, r.snInfos[(sequenceNumber-1)&SnInfoMask])
require.Equal(t, expectedSnInfo, r.snInfos[(sequenceNumber-1)&cSnInfoMask])
r.Stop()
}
+618
View File
@@ -0,0 +1,618 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buffer
import (
"errors"
"fmt"
"time"
"github.com/pion/rtcp"
"github.com/livekit/mediatransportutil"
"github.com/livekit/protocol/livekit"
)
type senderSnapshot struct {
snapshot
extStartSNFromRR uint64
packetsLostFromRR uint64
maxJitterFromRR float64
}
type RTPStatsSender struct {
*rtpStatsBase
extStartSN uint64
extHighestSN uint64
extHighestSNFromRR uint64
lastRRTime time.Time
lastRR rtcp.ReceptionReport
extStartTS uint64
extHighestTS uint64
packetsLostFromRR uint64
jitterFromRR float64
maxJitterFromRR float64
nextSenderSnapshotID uint32
senderSnapshots map[uint32]*senderSnapshot
}
func NewRTPStatsSender(params RTPStatsParams) *RTPStatsSender {
return &RTPStatsSender{
rtpStatsBase: newRTPStatsBase(params),
nextSenderSnapshotID: cFirstSnapshotID,
senderSnapshots: make(map[uint32]*senderSnapshot),
}
}
func (r *RTPStatsSender) Seed(from *RTPStatsSender) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.seed(from.rtpStatsBase) {
return
}
r.extStartSN = from.extStartSN
r.extHighestSN = from.extHighestSN
r.extHighestSNFromRR = from.extHighestSNFromRR
r.lastRRTime = from.lastRRTime
r.lastRR = from.lastRR
r.extStartTS = from.extStartTS
r.extHighestTS = from.extHighestTS
r.packetsLostFromRR = from.packetsLostFromRR
r.jitterFromRR = from.jitterFromRR
r.maxJitterFromRR = from.maxJitterFromRR
r.nextSenderSnapshotID = from.nextSenderSnapshotID
for id, ss := range from.senderSnapshots {
ssCopy := *ss
r.senderSnapshots[id] = &ssCopy
}
}
func (r *RTPStatsSender) NewSnapshotId() uint32 {
r.lock.Lock()
defer r.lock.Unlock()
return r.newSnapshotID(r.extStartSN)
}
func (r *RTPStatsSender) NewSenderSnapshotId() uint32 {
r.lock.Lock()
defer r.lock.Unlock()
id := r.nextSenderSnapshotID
if r.initialized {
r.senderSnapshots[id] = &senderSnapshot{
snapshot: snapshot{
startTime: time.Now(),
extStartSN: r.extStartSN,
},
extStartSNFromRR: r.extStartSN,
}
}
return id
}
func (r *RTPStatsSender) Update(
packetTime time.Time,
extSequenceNumber uint64,
extTimestamp uint64,
marker bool,
hdrSize int,
payloadSize int,
paddingSize int,
) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.endTime.IsZero() {
return
}
if !r.initialized {
if payloadSize == 0 {
// do not start on a padding only packet
return
}
r.initialized = true
r.startTime = time.Now()
r.firstTime = packetTime
r.highestTime = packetTime
r.extStartSN = extSequenceNumber
r.extHighestSN = extSequenceNumber
r.extStartTS = extTimestamp
r.extHighestTS = extTimestamp
// initialize snapshots if any
for i := uint32(cFirstSnapshotID); i < r.nextSnapshotID; i++ {
r.snapshots[i] = &snapshot{
startTime: r.startTime,
extStartSN: r.extStartSN,
}
}
for i := uint32(cFirstSnapshotID); i < r.nextSenderSnapshotID; i++ {
r.senderSnapshots[i] = &senderSnapshot{
snapshot: snapshot{
startTime: r.startTime,
extStartSN: r.extStartSN,
},
extStartSNFromRR: r.extStartSN,
}
}
r.logger.Debugw(
"rtp sender stream start",
"startTime", r.startTime.String(),
"firstTime", r.firstTime.String(),
"startSN", r.extStartSN,
"startTS", r.extStartTS,
)
}
pktSize := uint64(hdrSize + payloadSize + paddingSize)
isDuplicate := false
gapSN := int64(extSequenceNumber - r.extHighestSN)
if gapSN <= 0 { // duplicate OR out-of-order
if payloadSize == 0 && extSequenceNumber < r.extStartSN {
// do not start on a padding only packet
return
}
if extSequenceNumber < r.extStartSN {
r.packetsLost += r.extStartSN - extSequenceNumber
// adjust start of snapshots
for _, s := range r.snapshots {
if s.extStartSN == r.extStartSN {
s.extStartSN = extSequenceNumber
}
}
for _, s := range r.senderSnapshots {
if s.extStartSN == r.extStartSN {
s.extStartSN = extSequenceNumber
}
}
r.extStartSN = extSequenceNumber
}
if extTimestamp < r.extStartTS {
r.extStartTS = extTimestamp
}
if gapSN != 0 {
r.packetsOutOfOrder++
}
if !r.isSnInfoLost(extSequenceNumber, r.extHighestSN) {
r.bytesDuplicate += pktSize
r.headerBytesDuplicate += uint64(hdrSize)
r.packetsDuplicate++
isDuplicate = true
} else {
r.packetsLost--
r.setSnInfo(extSequenceNumber, r.extHighestSN, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), marker, true)
}
} else { // in-order
// update gap histogram
r.updateGapHistogram(int(gapSN))
// update missing sequence numbers
r.clearSnInfos(r.extHighestSN+1, extSequenceNumber)
r.packetsLost += uint64(gapSN - 1)
r.setSnInfo(extSequenceNumber, r.extHighestSN, uint16(pktSize), uint16(hdrSize), uint16(payloadSize), marker, false)
if extTimestamp != r.extHighestTS {
// update only on first packet as same timestamp could be in multiple packets.
// NOTE: this may not be the first packet with this time stamp if there is packet loss.
r.highestTime = packetTime
}
r.extHighestSN = extSequenceNumber
r.extHighestTS = extTimestamp
}
if !isDuplicate {
if payloadSize == 0 {
r.packetsPadding++
r.bytesPadding += pktSize
r.headerBytesPadding += uint64(hdrSize)
} else {
r.bytes += pktSize
r.headerBytes += uint64(hdrSize)
if marker {
r.frames++
}
jitter := r.updateJitter(extTimestamp, packetTime)
for _, s := range r.senderSnapshots {
if jitter > s.maxJitter {
s.maxJitter = jitter
}
}
}
}
}
func (r *RTPStatsSender) GetTotalPacketsPrimary() uint64 {
r.lock.RLock()
defer r.lock.RUnlock()
return r.getTotalPacketsPrimary(r.extStartSN, r.extHighestSN)
}
func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt uint32, isRttChanged bool) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.initialized || !r.endTime.IsZero() {
return
}
extHighestSNFromRR := r.extHighestSNFromRR&0xFFFF_FFFF_0000_0000 + uint64(rr.LastSequenceNumber)
if !r.lastRRTime.IsZero() {
if (rr.LastSequenceNumber-r.lastRR.LastSequenceNumber) < (1<<31) && rr.LastSequenceNumber < r.lastRR.LastSequenceNumber {
extHighestSNFromRR += (1 << 32)
}
}
if extHighestSNFromRR < r.extStartSN {
// it is possible that the `LastSequenceNumber` in the receiver report is before the starting
// sequence number when dummy packets are used to trigger Pion's OnTrack path.
return
}
var err error
if r.srNewest != nil {
rtt, err = mediatransportutil.GetRttMs(&rr, r.srNewest.NTPTimestamp, r.srNewest.At)
if err == nil {
isRttChanged = rtt != r.rtt
} else {
if !errors.Is(err, mediatransportutil.ErrRttNotLastSenderReport) && !errors.Is(err, mediatransportutil.ErrRttNoLastSenderReport) {
r.logger.Warnw("error getting rtt", err)
}
}
}
if r.lastRRTime.IsZero() || r.extHighestSNFromRR <= extHighestSNFromRR {
r.extHighestSNFromRR = extHighestSNFromRR
packetsLostFromRR := r.packetsLostFromRR&0xFFFF_FFFF_0000_0000 + uint64(rr.TotalLost)
if (rr.TotalLost-r.lastRR.TotalLost) < (1<<31) && rr.TotalLost < r.lastRR.TotalLost {
packetsLostFromRR += (1 << 32)
}
r.packetsLostFromRR = packetsLostFromRR
if isRttChanged {
r.rtt = rtt
if rtt > r.maxRtt {
r.maxRtt = rtt
}
}
r.jitterFromRR = float64(rr.Jitter)
if r.jitterFromRR > r.maxJitterFromRR {
r.maxJitterFromRR = r.jitterFromRR
}
// update snapshots
for _, s := range r.snapshots {
if isRttChanged && rtt > s.maxRtt {
s.maxRtt = rtt
}
}
for _, s := range r.senderSnapshots {
if isRttChanged && rtt > s.maxRtt {
s.maxRtt = rtt
}
if r.jitterFromRR > s.maxJitterFromRR {
s.maxJitterFromRR = r.jitterFromRR
}
}
r.lastRRTime = time.Now()
r.lastRR = rr
} else {
r.logger.Debugw(
fmt.Sprintf("receiver report potentially out of order, highestSN: existing: %d, received: %d", r.extHighestSNFromRR, rr.LastSequenceNumber),
"lastRRTime", r.lastRRTime,
"lastRR", r.lastRR,
"sinceLastRR", time.Since(r.lastRRTime),
"receivedRR", rr,
)
}
return
}
func (r *RTPStatsSender) LastReceiverReportTime() time.Time {
r.lock.RLock()
defer r.lock.RUnlock()
return r.lastRRTime
}
func (r *RTPStatsSender) MaybeAdjustFirstPacketTime(ets uint64) {
r.lock.Lock()
defer r.lock.Unlock()
r.maybeAdjustFirstPacketTime(ets, r.extStartTS)
}
func (r *RTPStatsSender) GetExpectedRTPTimestamp(at time.Time) (expectedTSExt uint64, err error) {
r.lock.RLock()
defer r.lock.RUnlock()
if !r.initialized {
err = errors.New("uninitilaized")
return
}
timeDiff := at.Sub(r.firstTime)
expectedRTPDiff := timeDiff.Nanoseconds() * int64(r.params.ClockRate) / 1e9
expectedTSExt = r.extStartTS + uint64(expectedRTPDiff)
return
}
func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, calculatedClockRate uint32) *rtcp.SenderReport {
r.lock.Lock()
defer r.lock.Unlock()
if !r.initialized {
return nil
}
// construct current time based on monotonic clock
timeSinceFirst := time.Since(r.firstTime)
now := r.firstTime.Add(timeSinceFirst)
nowNTP := mediatransportutil.ToNtpTime(now)
timeSinceHighest := now.Sub(r.highestTime)
nowRTPExt := r.extHighestTS + uint64(timeSinceHighest.Nanoseconds()*int64(r.params.ClockRate)/1e9)
nowRTPExtUsingTime := nowRTPExt
nowRTP := uint32(nowRTPExt)
// It is possible that publisher is pacing at a slower rate.
// That would make `highestTS` to be lagging the RTP time stamp in the RTCP Sender Report from publisher.
// Check for that using calculated clock rate and use the later time stamp if applicable.
var nowRTPExtUsingRate uint64
if calculatedClockRate != 0 {
nowRTPExtUsingRate = r.extStartTS + uint64(float64(calculatedClockRate)*timeSinceFirst.Seconds())
if nowRTPExtUsingRate > nowRTPExt {
nowRTPExt = nowRTPExtUsingRate
nowRTP = uint32(nowRTPExt)
}
}
if r.srNewest != nil && nowRTPExt < r.srNewest.RTPTimestampExt {
// If report being generated is behind, use the time difference and
// clock rate of codec to produce next report.
//
// Current report could be behind due to the following
// - Publisher pacing
// - Due to above, report from publisher side is ahead of packet timestamps.
// Note that report will map wall clock to timestamp at capture time and happens before the pacer.
// - Pause/Mute followed by resume, some combination of events that could
// result in this module not having calculated clock rate of publisher side.
// - When the above happens, current will be generated using highestTS which could be behind.
// That could end up behind the last report's timestamp in extreme cases
r.logger.Infow(
"sending sender report, out-of-order, repairing",
"prevTSExt", r.srNewest.RTPTimestampExt,
"prevRTP", r.srNewest.RTPTimestamp,
"prevNTP", r.srNewest.NTPTimestamp.Time().String(),
"currTSExt", nowRTPExt,
"currRTP", nowRTP,
"currNTP", nowNTP.Time().String(),
"timeNow", time.Now().String(),
"firstTime", r.firstTime.String(),
"timeSinceFirst", timeSinceFirst,
"highestTime", r.highestTime.String(),
"timeSinceHighest", timeSinceHighest,
"nowRTPExtUsingTime", nowRTPExtUsingTime,
"calculatedClockRate", calculatedClockRate,
"nowRTPExtUsingRate", nowRTPExtUsingRate,
)
ntpDiffSinceLast := nowNTP.Time().Sub(r.srNewest.NTPTimestamp.Time())
nowRTPExt = r.srNewest.RTPTimestampExt + uint64(ntpDiffSinceLast.Seconds()*float64(r.params.ClockRate))
nowRTP = uint32(nowRTPExt)
}
r.srNewest = &RTCPSenderReportData{
NTPTimestamp: nowNTP,
RTPTimestamp: nowRTP,
RTPTimestampExt: nowRTPExt,
At: now,
}
if r.srFirst == nil {
r.srFirst = r.srNewest
}
return &rtcp.SenderReport{
SSRC: ssrc,
NTPTime: uint64(nowNTP),
RTPTime: nowRTP,
PacketCount: uint32(r.getTotalPacketsPrimary(r.extStartSN, r.extHighestSN) + r.packetsDuplicate + r.packetsPadding),
OctetCount: uint32(r.bytes + r.bytesDuplicate + r.bytesPadding),
}
}
func (r *RTPStatsSender) DeltaInfo(snapshotID uint32) *RTPDeltaInfo {
r.lock.Lock()
defer r.lock.Unlock()
return r.deltaInfo(snapshotID, r.extStartSN, r.extHighestSN)
}
func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo {
r.lock.Lock()
defer r.lock.Unlock()
if r.lastRRTime.IsZero() {
return nil
}
then, now := r.getAndResetSenderSnapshot(senderSnapshotID)
if now == nil || then == nil {
return nil
}
startTime := then.startTime
endTime := now.startTime
packetsExpected := now.extStartSNFromRR - then.extStartSNFromRR
if packetsExpected > cNumSequenceNumbers {
r.logger.Warnw(
"too many packets expected in delta (sender)",
fmt.Errorf("start: %d, end: %d, expected: %d", then.extStartSNFromRR, now.extStartSNFromRR, packetsExpected),
)
return nil
}
if packetsExpected == 0 {
// not received RTCP RR (OR) publisher is not producing any data
return nil
}
intervalStats := r.getIntervalStats(then.extStartSNFromRR, now.extStartSNFromRR, r.extHighestSN)
packetsLost := now.packetsLostFromRR - then.packetsLostFromRR
if int32(packetsLost) < 0 {
packetsLost = 0
}
if packetsLost > packetsExpected {
r.logger.Warnw(
"unexpected number of packets lost",
fmt.Errorf(
"start: %d, end: %d, expected: %d, lost: report: %d, interval: %d",
then.extStartSNFromRR,
now.extStartSNFromRR,
packetsExpected,
now.packetsLostFromRR-then.packetsLostFromRR,
intervalStats.packetsLost,
),
)
packetsLost = packetsExpected
}
// discount jitter from publisher side + internal processing
maxJitter := then.maxJitterFromRR - then.maxJitter
if maxJitter < 0.0 {
maxJitter = 0.0
}
maxJitterTime := maxJitter / float64(r.params.ClockRate) * 1e6
return &RTPDeltaInfo{
StartTime: startTime,
Duration: endTime.Sub(startTime),
Packets: uint32(packetsExpected - intervalStats.packetsPadding),
Bytes: intervalStats.bytes,
HeaderBytes: intervalStats.headerBytes,
PacketsDuplicate: uint32(now.packetsDuplicate - then.packetsDuplicate),
BytesDuplicate: now.bytesDuplicate - then.bytesDuplicate,
HeaderBytesDuplicate: now.headerBytesDuplicate - then.headerBytesDuplicate,
PacketsPadding: uint32(intervalStats.packetsPadding),
BytesPadding: intervalStats.bytesPadding,
HeaderBytesPadding: intervalStats.headerBytesPadding,
PacketsLost: uint32(packetsLost),
PacketsMissing: uint32(intervalStats.packetsLost),
PacketsOutOfOrder: uint32(intervalStats.packetsOutOfOrder),
Frames: intervalStats.frames,
RttMax: then.maxRtt,
JitterMax: maxJitterTime,
Nacks: now.nacks - then.nacks,
Plis: now.plis - then.plis,
Firs: now.firs - then.firs,
}
}
func (r *RTPStatsSender) ToString() string {
r.lock.RLock()
defer r.lock.RUnlock()
return r.toString(
r.extStartSN, r.extHighestSN, r.extStartTS, r.extHighestTS,
r.packetsLostFromRR,
r.jitterFromRR, r.maxJitterFromRR,
)
}
func (r *RTPStatsSender) ToProto() *livekit.RTPStats {
r.lock.RLock()
defer r.lock.RUnlock()
return r.toProto(
r.extStartSN, r.extHighestSN, r.extStartTS, r.extHighestTS,
r.packetsLostFromRR,
r.jitterFromRR, r.maxJitterFromRR,
)
}
func (r *RTPStatsSender) getAndResetSenderSnapshot(senderSnapshotID uint32) (*senderSnapshot, *senderSnapshot) {
if !r.initialized || r.lastRRTime.IsZero() {
return nil, nil
}
then := r.senderSnapshots[senderSnapshotID]
if then == nil {
then = &senderSnapshot{
snapshot: snapshot{
startTime: r.startTime,
extStartSN: r.extStartSN,
},
extStartSNFromRR: r.extStartSN,
}
r.senderSnapshots[senderSnapshotID] = then
}
// snapshot now
r.senderSnapshots[senderSnapshotID] = &senderSnapshot{
snapshot: snapshot{
startTime: r.lastRRTime,
extStartSN: r.extHighestSN + 1,
packetsDuplicate: r.packetsDuplicate,
bytesDuplicate: r.bytesDuplicate,
headerBytesDuplicate: r.headerBytesDuplicate,
nacks: r.nacks,
plis: r.plis,
firs: r.firs,
maxJitter: r.jitter,
maxRtt: r.rtt,
},
extStartSNFromRR: r.extHighestSNFromRR + 1,
packetsLostFromRR: r.packetsLostFromRR,
maxJitterFromRR: r.jitterFromRR,
}
// make a copy so that it can be used independently
now := *r.senderSnapshots[senderSnapshotID]
return then, &now
}
// -------------------------------------------------------------------
+4 -4
View File
@@ -40,7 +40,7 @@ type ConnectionStatsParams struct {
IncludeRTT bool
IncludeJitter bool
GetDeltaStats func() map[uint32]*buffer.StreamStatsWithLayers
GetDeltaStatsOverridden func() map[uint32]*buffer.StreamStatsWithLayers
GetDeltaStatsSender func() map[uint32]*buffer.StreamStatsWithLayers
GetLastReceiverReportTime func() time.Time
GetTotalPacketsSent func() uint64
Logger logger.Logger
@@ -215,7 +215,7 @@ func (cs *ConnectionStats) updateScoreWithAggregate(agg *buffer.RTPDeltaInfo, at
}
func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) (float32, map[uint32]*buffer.StreamStatsWithLayers) {
if cs.params.GetDeltaStatsOverridden == nil || cs.params.GetLastReceiverReportTime == nil || cs.params.GetTotalPacketsSent == nil {
if cs.params.GetDeltaStatsSender == nil || cs.params.GetLastReceiverReportTime == nil || cs.params.GetTotalPacketsSent == nil {
return MinMOS, nil
}
@@ -226,7 +226,7 @@ func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) (float32,
return mos, nil
}
streams := cs.params.GetDeltaStatsOverridden()
streams := cs.params.GetDeltaStatsSender()
if len(streams) == 0 {
// check for receiver report not received for a while
marker := cs.params.GetLastReceiverReportTime()
@@ -260,7 +260,7 @@ func (cs *ConnectionStats) updateScoreFromReceiverReport(at time.Time) (float32,
}
func (cs *ConnectionStats) updateScoreAt(at time.Time) (float32, map[uint32]*buffer.StreamStatsWithLayers) {
if cs.params.GetDeltaStatsOverridden != nil {
if cs.params.GetDeltaStatsSender != nil {
// receiver report based quality scoring, use stats from receiver report for scoring
return cs.updateScoreFromReceiverReport(at)
}
+71 -58
View File
@@ -126,14 +126,14 @@ var (
// -------------------------------------------------------------------
type DownTrackState struct {
RTPStats *buffer.RTPStats
DeltaStatsOverriddenSnapshotId uint32
ForwarderState ForwarderState
RTPStats *buffer.RTPStatsSender
DeltaStatsSenderSnapshotId uint32
ForwarderState ForwarderState
}
func (d DownTrackState) String() string {
return fmt.Sprintf("DownTrackState{rtpStats: %s, deltaOverridden: %d, forwarder: %s}",
d.RTPStats.ToString(), d.DeltaStatsOverriddenSnapshotId, d.ForwarderState.String())
return fmt.Sprintf("DownTrackState{rtpStats: %s, deltaSender: %d, forwarder: %s}",
d.RTPStats.ToString(), d.DeltaStatsSenderSnapshotId, d.ForwarderState.String())
}
// -------------------------------------------------------------------
@@ -239,7 +239,7 @@ type DownTrack struct {
bindAndConnectedOnce atomic.Bool
writable atomic.Bool
rtpStats *buffer.RTPStats
rtpStats *buffer.RTPStatsSender
totalRepeatedNACKs atomic.Uint32
@@ -247,8 +247,8 @@ type DownTrack struct {
blankFramesGeneration atomic.Uint32
connectionStats *connectionquality.ConnectionStats
deltaStatsOverriddenSnapshotId uint32
connectionStats *connectionquality.ConnectionStats
deltaStatsSenderSnapshotId uint32
isNACKThrottled atomic.Bool
@@ -304,17 +304,16 @@ func NewDownTrack(params DowntrackParams) (*DownTrack, error) {
d.getExpectedRTPTimestamp,
)
d.rtpStats = buffer.NewRTPStats(buffer.RTPStatsParams{
ClockRate: d.codec.ClockRate,
IsReceiverReportDriven: true,
Logger: params.Logger,
d.rtpStats = buffer.NewRTPStatsSender(buffer.RTPStatsParams{
ClockRate: d.codec.ClockRate,
Logger: params.Logger,
})
d.deltaStatsOverriddenSnapshotId = d.rtpStats.NewSnapshotId()
d.deltaStatsSenderSnapshotId = d.rtpStats.NewSenderSnapshotId()
d.connectionStats = connectionquality.NewConnectionStats(connectionquality.ConnectionStatsParams{
MimeType: codecs[0].MimeType, // LK-TODO have to notify on codec change
IsFECEnabled: strings.EqualFold(codecs[0].MimeType, webrtc.MimeTypeOpus) && strings.Contains(strings.ToLower(codecs[0].SDPFmtpLine), "fec"),
GetDeltaStatsOverridden: d.getDeltaStatsOverridden,
GetDeltaStatsSender: d.getDeltaStatsSender,
GetLastReceiverReportTime: func() time.Time { return d.rtpStats.LastReceiverReportTime() },
GetTotalPacketsSent: func() uint64 { return d.rtpStats.GetTotalPacketsPrimary() },
Logger: params.Logger.WithValues("direction", "down"),
@@ -723,11 +722,13 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
TransportWideExtID: uint8(d.transportWideExtID),
WriteStream: d.writeStream,
Metadata: sendPacketMetadata{
layer: layer,
arrival: extPkt.Arrival,
isKeyFrame: extPkt.KeyFrame,
tp: tp,
pool: pool,
layer: layer,
arrival: extPkt.Arrival,
extSequenceNumber: tp.rtp.extSequenceNumber,
extTimestamp: tp.rtp.extTimestamp,
isKeyFrame: extPkt.KeyFrame,
tp: tp,
pool: pool,
},
OnSent: d.packetSent,
})
@@ -814,8 +815,10 @@ func (d *DownTrack) WritePaddingRTP(bytesToSend int, paddingOnMute bool, forceMa
TransportWideExtID: uint8(d.transportWideExtID),
WriteStream: d.writeStream,
Metadata: sendPacketMetadata{
isPadding: true,
disableCounter: true,
extSequenceNumber: snts[i].extSequenceNumber,
extTimestamp: snts[i].extTimestamp,
isPadding: true,
disableCounter: true,
},
OnSent: d.packetSent,
})
@@ -989,16 +992,16 @@ func (d *DownTrack) MaxLayer() buffer.VideoLayer {
func (d *DownTrack) GetState() DownTrackState {
dts := DownTrackState{
RTPStats: d.rtpStats,
DeltaStatsOverriddenSnapshotId: d.deltaStatsOverriddenSnapshotId,
ForwarderState: d.forwarder.GetState(),
RTPStats: d.rtpStats,
DeltaStatsSenderSnapshotId: d.deltaStatsSenderSnapshotId,
ForwarderState: d.forwarder.GetState(),
}
return dts
}
func (d *DownTrack) SeedState(state DownTrackState) {
d.rtpStats.Seed(state.RTPStats)
d.deltaStatsOverriddenSnapshotId = state.DeltaStatsOverriddenSnapshotId
d.deltaStatsSenderSnapshotId = state.DeltaStatsSenderSnapshotId
d.forwarder.SeedState(state.ForwarderState)
}
@@ -1312,8 +1315,11 @@ func (d *DownTrack) writeBlankFrameRTP(duration float32, generation uint32) chan
AbsSendTimeExtID: uint8(d.absSendTimeExtID),
TransportWideExtID: uint8(d.transportWideExtID),
WriteStream: d.writeStream,
Metadata: sendPacketMetadata{},
OnSent: d.packetSent,
Metadata: sendPacketMetadata{
extSequenceNumber: snts[i].extSequenceNumber,
extTimestamp: snts[i].extTimestamp,
},
OnSent: d.packetSent,
})
// only the first frame will need frameEndNeeded to close out the
@@ -1539,20 +1545,20 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) {
nackMisses := uint32(0)
numRepeatedNACKs := uint32(0)
nackInfos := make([]NackInfo, 0, len(filtered))
for _, meta := range d.sequencer.getPacketsMeta(filtered) {
if disallowedLayers[meta.layer] {
for _, epm := range d.sequencer.getExtPacketMetas(filtered) {
if disallowedLayers[epm.layer] {
continue
}
nackAcks++
nackInfos = append(nackInfos, NackInfo{
SequenceNumber: meta.targetSeqNo,
Timestamp: meta.timestamp,
Attempts: meta.nacked,
SequenceNumber: epm.targetSeqNo,
Timestamp: epm.timestamp,
Attempts: epm.nacked,
})
pktBuff := *src
n, err := d.params.Receiver.ReadRTP(pktBuff, uint8(meta.layer), meta.sourceSeqNo)
n, err := d.params.Receiver.ReadRTP(pktBuff, uint8(epm.layer), epm.sourceSeqNo)
if err != nil {
if err == io.EOF {
break
@@ -1561,7 +1567,7 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) {
continue
}
if meta.nacked > 1 {
if epm.nacked > 1 {
numRepeatedNACKs++
}
@@ -1570,15 +1576,15 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) {
d.params.Logger.Errorw("unmarshalling rtp packet failed in retransmit", err)
continue
}
pkt.Header.Marker = meta.marker
pkt.Header.SequenceNumber = meta.targetSeqNo
pkt.Header.Timestamp = meta.timestamp
pkt.Header.Marker = epm.marker
pkt.Header.SequenceNumber = epm.targetSeqNo
pkt.Header.Timestamp = epm.timestamp
pkt.Header.SSRC = d.ssrc
pkt.Header.PayloadType = d.payloadType
var payload []byte
pool := PacketFactory.Get().(*[]byte)
if d.mime == "video/vp8" && len(pkt.Payload) > 0 && len(meta.codecBytes) != 0 {
if d.mime == "video/vp8" && len(pkt.Payload) > 0 && len(epm.codecBytes) != 0 {
var incomingVP8 buffer.VP8
if err = incomingVP8.Unmarshal(pkt.Payload); err != nil {
d.params.Logger.Errorw("unmarshalling VP8 packet err", err)
@@ -1586,7 +1592,7 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) {
continue
}
payload = d.translateVP8PacketTo(&pkt, &incomingVP8, meta.codecBytes, pool)
payload = d.translateVP8PacketTo(&pkt, &incomingVP8, epm.codecBytes, pool)
}
if payload == nil {
payload = (*pool)[:len(pkt.Payload)]
@@ -1595,14 +1601,16 @@ func (d *DownTrack) retransmitPackets(nacks []uint16) {
d.pacer.Enqueue(pacer.Packet{
Header: &pkt.Header,
Extensions: []pacer.ExtensionData{{ID: uint8(d.dependencyDescriptorExtID), Payload: meta.ddBytes}},
Extensions: []pacer.ExtensionData{{ID: uint8(d.dependencyDescriptorExtID), Payload: epm.ddBytes}},
Payload: payload,
AbsSendTimeExtID: uint8(d.absSendTimeExtID),
TransportWideExtID: uint8(d.transportWideExtID),
WriteStream: d.writeStream,
Metadata: sendPacketMetadata{
isRTX: true,
pool: pool,
extSequenceNumber: epm.extSequenceNumber,
extTimestamp: epm.extTimestamp,
isRTX: true,
pool: pool,
},
OnSent: d.packetSent,
})
@@ -1707,8 +1715,9 @@ func (d *DownTrack) deltaStats(ds *buffer.RTPDeltaInfo) map[uint32]*buffer.Strea
return streamStats
}
func (d *DownTrack) getDeltaStatsOverridden() map[uint32]*buffer.StreamStatsWithLayers {
return d.deltaStats(d.rtpStats.DeltaInfoOverridden(d.deltaStatsOverriddenSnapshotId))
func (d *DownTrack) getDeltaStatsSender() map[uint32]*buffer.StreamStatsWithLayers {
return d.deltaStats(d.rtpStats.DeltaInfoSender(d.deltaStatsSenderSnapshotId))
return nil
}
func (d *DownTrack) GetNackStats() (totalPackets uint32, totalRepeatedNACKs uint32) {
@@ -1806,6 +1815,8 @@ func (d *DownTrack) sendSilentFrameOnMuteForOpus() {
TransportWideExtID: uint8(d.transportWideExtID),
WriteStream: d.writeStream,
Metadata: sendPacketMetadata{
extSequenceNumber: snts[i].extSequenceNumber,
extTimestamp: snts[i].extTimestamp,
// although this is using empty frames, mark as padding as these are used to trigger Pion OnTrack only
isPadding: true,
},
@@ -1826,17 +1837,19 @@ func (d *DownTrack) HandleRTCPSenderReportData(_payloadType webrtc.PayloadType,
}
type sendPacketMetadata struct {
layer int32
arrival time.Time
isKeyFrame bool
isRTX bool
isPadding bool
disableCounter bool
tp *TranslationParams
pool *[]byte
layer int32
arrival time.Time
extSequenceNumber uint64
extTimestamp uint64
isKeyFrame bool
isRTX bool
isPadding bool
disableCounter bool
tp *TranslationParams
pool *[]byte
}
func (d *DownTrack) packetSent(md interface{}, hdr *rtp.Header, payloadSize int, sendTime time.Time, sendError error) {
func (d *DownTrack) packetSent(md interface{}, marker bool, hdrSize int, payloadSize int, sendTime time.Time, sendError error) {
spmd, ok := md.(sendPacketMetadata)
if !ok {
d.params.Logger.Errorw("invalid send packet metadata", nil)
@@ -1853,7 +1866,7 @@ func (d *DownTrack) packetSent(md interface{}, hdr *rtp.Header, payloadSize int,
if !spmd.disableCounter {
// STREAM-ALLOCATOR-TODO: remove this stream allocator bytes counter once stream allocator changes fully to pull bytes counter
size := uint32(hdr.MarshalSize() + payloadSize)
size := uint32(hdrSize + payloadSize)
d.streamAllocatorBytesCounter.Add(size)
if spmd.isRTX {
d.bytesRetransmitted.Add(size)
@@ -1868,9 +1881,9 @@ func (d *DownTrack) packetSent(md interface{}, hdr *rtp.Header, payloadSize int,
packetTime = sendTime
}
if spmd.isPadding {
d.rtpStats.Update(hdr, 0, payloadSize, packetTime)
d.rtpStats.Update(packetTime, spmd.extSequenceNumber, spmd.extTimestamp, marker, hdrSize, 0, payloadSize)
} else {
d.rtpStats.Update(hdr, payloadSize, 0, packetTime)
d.rtpStats.Update(packetTime, spmd.extSequenceNumber, spmd.extTimestamp, marker, hdrSize, payloadSize, 0)
}
if spmd.isKeyFrame {
@@ -1879,8 +1892,8 @@ func (d *DownTrack) packetSent(md interface{}, hdr *rtp.Header, payloadSize int,
d.params.Logger.Debugw(
"forwarded key frame",
"layer", spmd.layer,
"rtpsn", hdr.SequenceNumber,
"rtpts", hdr.Timestamp,
"rtpsn", spmd.extSequenceNumber,
"rtpts", spmd.extTimestamp,
)
}
+1 -1
View File
@@ -47,7 +47,7 @@ func (b *Base) SendPacket(p *Packet) (int, error) {
var err error
defer func() {
if p.OnSent != nil {
p.OnSent(p.Metadata, p.Header, len(p.Payload), sendingAt, err)
p.OnSent(p.Metadata, p.Header.Marker, p.Header.MarshalSize(), len(p.Payload), sendingAt, err)
}
}()
+1 -1
View File
@@ -34,7 +34,7 @@ type Packet struct {
TransportWideExtID uint8
WriteStream webrtc.TrackLocalWriter
Metadata interface{}
OnSent func(md interface{}, sentHeader *rtp.Header, payloadSize int, sentTime time.Time, sendError error)
OnSent func(md interface{}, marker bool, hdrSize int, payloadSize int, sentTime time.Time, sendError error)
}
type Pacer interface {
+28 -7
View File
@@ -72,6 +72,12 @@ type packetMeta struct {
ddBytes []byte
}
type extPacketMeta struct {
packetMeta
extSequenceNumber uint64
extTimestamp uint64
}
// Sequencer stores the packet sequence received by the down track
type sequencer struct {
sync.Mutex
@@ -80,6 +86,7 @@ type sequencer struct {
initialized bool
extHighestSN uint64
snOffset uint64
extHighestTS uint64
meta []packetMeta
snRangeMap *utils.RangeMap[uint64, uint64]
rtt uint32
@@ -126,6 +133,7 @@ func (s *sequencer) push(
if !s.initialized {
s.extHighestSN = extModifiedSN - 1
s.extHighestTS = extModifiedTS
s.updateSNOffset()
}
@@ -149,6 +157,10 @@ func (s *sequencer) push(
}
}
if int64(extModifiedTS-s.extHighestTS) >= 0 {
s.extHighestTS = extModifiedTS
}
slot := (extModifiedSN - snOffset) % uint64(s.size)
s.meta[slot] = packetMeta{
sourceSeqNo: uint16(extIncomingSN),
@@ -213,15 +225,16 @@ func (s *sequencer) pushPadding(extStartSNInclusive uint64, extEndSNInclusive ui
s.updateSNOffset()
}
func (s *sequencer) getPacketsMeta(seqNo []uint16) []packetMeta {
func (s *sequencer) getExtPacketMetas(seqNo []uint16) []extPacketMeta {
s.Lock()
defer s.Unlock()
snOffset := uint64(0)
var err error
packetsMeta := make([]packetMeta, 0, len(seqNo))
extPacketMetas := make([]extPacketMeta, 0, len(seqNo))
refTime := s.getRefTime(time.Now())
highestSN := uint16(s.extHighestSN)
highestTS := uint32(s.extHighestTS)
for _, sn := range seqNo {
diff := highestSN - sn
if diff > (1 << 15) {
@@ -258,14 +271,22 @@ func (s *sequencer) getPacketsMeta(seqNo []uint16) []packetMeta {
meta.nacked++
meta.lastNack = refTime
pm := *meta
pm.codecBytes = append([]byte{}, meta.codecBytes...)
pm.ddBytes = append([]byte{}, meta.ddBytes...)
packetsMeta = append(packetsMeta, pm)
extTS := uint64(meta.timestamp) + (s.extHighestTS & 0xFFFF_FFFF_FFFF_0000)
if meta.timestamp > highestTS {
extTS -= (1 << 32)
}
epm := extPacketMeta{
packetMeta: *meta,
extSequenceNumber: extSN,
extTimestamp: extTS,
}
epm.codecBytes = append([]byte{}, meta.codecBytes...)
epm.ddBytes = append([]byte{}, meta.ddBytes...)
extPacketMetas = append(extPacketMetas, epm)
}
}
return packetsMeta
return extPacketMetas
}
func (s *sequencer) getRefTime(at time.Time) uint32 {
+16 -12
View File
@@ -36,41 +36,45 @@ func Test_sequencer(t *testing.T) {
seq.push(time.Now(), 518, 518+uint64(off), 123, true, 2, nil, nil)
req := []uint16{57, 58, 62, 63, 513, 514, 515, 516, 517}
res := seq.getPacketsMeta(req)
res := seq.getExtPacketMetas(req)
// nothing should be returned as not enough time has elapsed since sending packet
require.Equal(t, 0, len(res))
time.Sleep((ignoreRetransmission + 10) * time.Millisecond)
res = seq.getPacketsMeta(req)
res = seq.getExtPacketMetas(req)
require.Equal(t, len(req), len(res))
for i, val := range res {
require.Equal(t, val.targetSeqNo, req[i])
require.Equal(t, val.sourceSeqNo, req[i]-off)
require.Equal(t, val.layer, int8(2))
require.Equal(t, val.extSequenceNumber, uint64(req[i]))
require.Equal(t, val.extTimestamp, uint64(123))
}
res = seq.getPacketsMeta(req)
res = seq.getExtPacketMetas(req)
require.Equal(t, 0, len(res))
time.Sleep((ignoreRetransmission + 10) * time.Millisecond)
res = seq.getPacketsMeta(req)
res = seq.getExtPacketMetas(req)
require.Equal(t, len(req), len(res))
for i, val := range res {
require.Equal(t, val.targetSeqNo, req[i])
require.Equal(t, val.sourceSeqNo, req[i]-off)
require.Equal(t, val.layer, int8(2))
require.Equal(t, val.extSequenceNumber, uint64(req[i]))
require.Equal(t, val.extTimestamp, uint64(123))
}
seq.push(time.Now(), 521, 521+uint64(off), 123, true, 1, nil, nil)
m := seq.getPacketsMeta([]uint16{521 + off})
m := seq.getExtPacketMetas([]uint16{521 + off})
require.Equal(t, 0, len(m))
time.Sleep((ignoreRetransmission + 10) * time.Millisecond)
m = seq.getPacketsMeta([]uint16{521 + off})
m = seq.getExtPacketMetas([]uint16{521 + off})
require.Equal(t, 1, len(m))
seq.push(time.Now(), 505, 505+uint64(off), 123, false, 1, nil, nil)
m = seq.getPacketsMeta([]uint16{505 + off})
m = seq.getExtPacketMetas([]uint16{505 + off})
require.Equal(t, 0, len(m))
time.Sleep((ignoreRetransmission + 10) * time.Millisecond)
m = seq.getPacketsMeta([]uint16{505 + off})
m = seq.getExtPacketMetas([]uint16{505 + off})
require.Equal(t, 1, len(m))
}
@@ -148,7 +152,7 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) {
}
time.Sleep((ignoreRetransmission + 10) * time.Millisecond)
g := n.getPacketsMeta(tt.args.seqNo)
g := n.getExtPacketMetas(tt.args.seqNo)
var got []uint16
for _, sn := range g {
got = append(got, sn.sourceSeqNo)
@@ -163,7 +167,7 @@ func Test_sequencer_getNACKSeqNo_exclusion(t *testing.T) {
}
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("getPacketsMeta() = %v, want %v", got, tt.want)
t.Errorf("getExtPacketMetas() = %v, want %v", got, tt.want)
}
})
}
@@ -242,7 +246,7 @@ func Test_sequencer_getNACKSeqNo_no_exclusion(t *testing.T) {
}
time.Sleep((ignoreRetransmission + 10) * time.Millisecond)
g := n.getPacketsMeta(tt.args.seqNo)
g := n.getExtPacketMetas(tt.args.seqNo)
var got []uint16
for _, sn := range g {
got = append(got, sn.sourceSeqNo)
@@ -257,7 +261,7 @@ func Test_sequencer_getNACKSeqNo_no_exclusion(t *testing.T) {
}
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("getPacketsMeta() = %v, want %v", got, tt.want)
t.Errorf("getExtPacketMetas() = %v, want %v", got, tt.want)
}
})
}