Splitting out rtp stats stuff into its own package. (#3060)

* Splitting out rtp stats stuff into its own package.

Going to be making some lighter versions of these.
Will be cleaner to have all of these grouped together.
So, as a first step, just making a package for it.

* tests
This commit is contained in:
Raja Subramanian
2024-10-03 15:51:24 +05:30
committed by GitHub
parent 0b4fd32905
commit 8ac33a868c
15 changed files with 88 additions and 63 deletions
+2 -1
View File
@@ -36,6 +36,7 @@ import (
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/sfu/rtpstats"
"github.com/livekit/livekit-server/pkg/telemetry"
)
@@ -893,5 +894,5 @@ func (t *MediaTrackReceiver) GetTrackStats() *livekit.RTPStats {
}
}
return buffer.AggregateRTPStats(stats)
return rtpstats.AggregateRTPStats(stats)
}
+6 -5
View File
@@ -33,6 +33,7 @@ import (
"github.com/livekit/livekit-server/pkg/sfu/audio"
act "github.com/livekit/livekit-server/pkg/sfu/rtpextension/abscapturetime"
dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor"
"github.com/livekit/livekit-server/pkg/sfu/rtpstats"
"github.com/livekit/livekit-server/pkg/sfu/utils"
sutils "github.com/livekit/livekit-server/pkg/utils"
"github.com/livekit/mediatransportutil/pkg/bucket"
@@ -105,7 +106,7 @@ type Buffer struct {
pliThrottle int64
rtpStats *RTPStatsReceiver
rtpStats *rtpstats.RTPStatsReceiver
rrSnapshotId uint32
deltaStatsSnapshotId uint32
ppsSnapshotId uint32
@@ -211,7 +212,7 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili
return
}
b.rtpStats = NewRTPStatsReceiver(RTPStatsParams{
b.rtpStats = rtpstats.NewRTPStatsReceiver(rtpstats.RTPStatsParams{
ClockRate: codec.ClockRate,
Logger: b.logger,
})
@@ -716,7 +717,7 @@ func (b *Buffer) doFpsCalc(ep *ExtPacket) {
}
}
func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime int64) RTPFlowState {
func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime int64) rtpstats.RTPFlowState {
flowState := b.rtpStats.Update(
arrivalTime,
p.Header.SequenceNumber,
@@ -762,7 +763,7 @@ func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime int64, isRTX
}
}
func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime int64, flowState RTPFlowState) *ExtPacket {
func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime int64, flowState rtpstats.RTPFlowState) *ExtPacket {
ep := &ExtPacket{
Arrival: arrivalTime,
ExtSequenceNumber: flowState.ExtSequenceNumber,
@@ -1076,7 +1077,7 @@ func (b *Buffer) GetDeltaStats() *StreamStatsWithLayers {
return &StreamStatsWithLayers{
RTPStats: deltaStats,
Layers: map[int32]*RTPDeltaInfo{
Layers: map[int32]*rtpstats.RTPDeltaInfo{
0: deltaStats,
},
}
+2 -2
View File
@@ -223,8 +223,8 @@ func TestNewBuffer(t *testing.T) {
buf, _ := p.Marshal()
_, _ = buff.Write(buf)
}
require.Equal(t, uint16(2), buff.rtpStats.sequenceNumber.GetHighest())
require.Equal(t, uint64(65536+2), buff.rtpStats.sequenceNumber.GetExtendedHighest())
require.Equal(t, uint16(2), buff.rtpStats.HighestSequenceNumber())
require.Equal(t, uint64(65536+2), buff.rtpStats.ExtendedHighestSequenceNumber())
})
}
}
+4 -2
View File
@@ -14,7 +14,9 @@
package buffer
import "github.com/livekit/livekit-server/pkg/sfu/rtpstats"
type StreamStatsWithLayers struct {
RTPStats *RTPDeltaInfo
Layers map[int32]*RTPDeltaInfo
RTPStats *rtpstats.RTPDeltaInfo
Layers map[int32]*rtpstats.RTPDeltaInfo
}
+9 -8
View File
@@ -28,6 +28,7 @@ import (
"github.com/livekit/protocol/logger"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/sfu/rtpstats"
)
const (
@@ -203,7 +204,7 @@ func (cs *ConnectionStats) GetScoreAndQuality() (float32, livekit.ConnectionQual
return cs.scorer.GetMOSAndQuality()
}
func (cs *ConnectionStats) updateScoreWithAggregate(agg *buffer.RTPDeltaInfo, lastRTCPAt time.Time, at time.Time) float32 {
func (cs *ConnectionStats) updateScoreWithAggregate(agg *rtpstats.RTPDeltaInfo, lastRTCPAt time.Time, at time.Time) float32 {
var stat windowStat
if agg != nil {
stat.startedAt = agg.StartTime
@@ -289,11 +290,11 @@ func (cs *ConnectionStats) updateScoreAt(at time.Time) (float32, map[uint32]*buf
return mos, nil
}
deltaInfoList := make([]*buffer.RTPDeltaInfo, 0, len(streams))
deltaInfoList := make([]*rtpstats.RTPDeltaInfo, 0, len(streams))
for _, s := range streams {
deltaInfoList = append(deltaInfoList, s.RTPStats)
}
agg := buffer.AggregateRTPDeltaInfo(deltaInfoList)
agg := rtpstats.AggregateRTPDeltaInfo(deltaInfoList)
return cs.updateScoreWithAggregate(agg, cs.params.ReceiverProvider.GetLastSenderReportTime(), at), streams
}
@@ -415,15 +416,15 @@ func getPacketLossWeight(mimeType string, isFecEnabled bool) float64 {
return plw
}
func toAggregateDeltaInfo(streams map[uint32]*buffer.StreamStatsWithLayers) *buffer.RTPDeltaInfo {
deltaInfoList := make([]*buffer.RTPDeltaInfo, 0, len(streams))
func toAggregateDeltaInfo(streams map[uint32]*buffer.StreamStatsWithLayers) *rtpstats.RTPDeltaInfo {
deltaInfoList := make([]*rtpstats.RTPDeltaInfo, 0, len(streams))
for _, s := range streams {
deltaInfoList = append(deltaInfoList, s.RTPStats)
}
return buffer.AggregateRTPDeltaInfo(deltaInfoList)
return rtpstats.AggregateRTPDeltaInfo(deltaInfoList)
}
func toAnalyticsStream(ssrc uint32, deltaStats *buffer.RTPDeltaInfo) *livekit.AnalyticsStream {
func toAnalyticsStream(ssrc uint32, deltaStats *rtpstats.RTPDeltaInfo) *livekit.AnalyticsStream {
// discount the feed side loss when reporting forwarded track stats
packetsLost := deltaStats.PacketsLost
if deltaStats.PacketsMissing > packetsLost {
@@ -452,7 +453,7 @@ func toAnalyticsStream(ssrc uint32, deltaStats *buffer.RTPDeltaInfo) *livekit.An
}
}
func toAnalyticsVideoLayer(layer int32, layerStats *buffer.RTPDeltaInfo) *livekit.AnalyticsVideoLayer {
func toAnalyticsVideoLayer(layer int32, layerStats *rtpstats.RTPDeltaInfo) *livekit.AnalyticsVideoLayer {
avl := &livekit.AnalyticsVideoLayer{
Layer: layer,
Packets: layerStats.Packets + layerStats.PacketsDuplicate + layerStats.PacketsPadding,
@@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/sfu/rtpstats"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
)
@@ -82,7 +83,7 @@ func TestConnectionQuality(t *testing.T) {
// best conditions (no loss, jitter/rtt = 0) - quality should stay EXCELLENT
trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{
1: {
RTPStats: &buffer.RTPDeltaInfo{
RTPStats: &rtpstats.RTPDeltaInfo{
StartTime: now,
EndTime: now.Add(duration),
Packets: 250,
@@ -98,7 +99,7 @@ func TestConnectionQuality(t *testing.T) {
now = now.Add(duration)
trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{
1: {
RTPStats: &buffer.RTPDeltaInfo{
RTPStats: &rtpstats.RTPDeltaInfo{
StartTime: now,
EndTime: now.Add(duration),
Packets: 120,
@@ -106,7 +107,7 @@ func TestConnectionQuality(t *testing.T) {
},
},
2: {
RTPStats: &buffer.RTPDeltaInfo{
RTPStats: &rtpstats.RTPDeltaInfo{
StartTime: now,
EndTime: now.Add(duration),
Packets: 130,
@@ -125,7 +126,7 @@ func TestConnectionQuality(t *testing.T) {
now = now.Add(duration)
trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{
1: {
RTPStats: &buffer.RTPDeltaInfo{
RTPStats: &rtpstats.RTPDeltaInfo{
StartTime: now,
EndTime: now.Add(duration),
Packets: 250,
@@ -141,7 +142,7 @@ func TestConnectionQuality(t *testing.T) {
now = now.Add(duration)
trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{
1: {
RTPStats: &buffer.RTPDeltaInfo{
RTPStats: &rtpstats.RTPDeltaInfo{
StartTime: now,
EndTime: now.Add(duration),
Packets: 250,
@@ -157,7 +158,7 @@ func TestConnectionQuality(t *testing.T) {
now = now.Add(duration)
trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{
1: {
RTPStats: &buffer.RTPDeltaInfo{
RTPStats: &rtpstats.RTPDeltaInfo{
StartTime: now,
EndTime: now.Add(duration),
Packets: 250,
@@ -173,7 +174,7 @@ func TestConnectionQuality(t *testing.T) {
now = now.Add(duration)
trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{
1: {
RTPStats: &buffer.RTPDeltaInfo{
RTPStats: &rtpstats.RTPDeltaInfo{
StartTime: now,
EndTime: now.Add(duration),
Packets: 250,
@@ -190,7 +191,7 @@ func TestConnectionQuality(t *testing.T) {
now = now.Add(duration)
trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{
1: {
RTPStats: &buffer.RTPDeltaInfo{
RTPStats: &rtpstats.RTPDeltaInfo{
StartTime: now,
EndTime: now.Add(duration),
Packets: 250,
@@ -206,7 +207,7 @@ func TestConnectionQuality(t *testing.T) {
now = now.Add(duration)
trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{
1: {
RTPStats: &buffer.RTPDeltaInfo{
RTPStats: &rtpstats.RTPDeltaInfo{
StartTime: now,
EndTime: now.Add(duration),
Packets: 250,
@@ -222,7 +223,7 @@ func TestConnectionQuality(t *testing.T) {
now = now.Add(duration)
trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{
1: {
RTPStats: &buffer.RTPDeltaInfo{
RTPStats: &rtpstats.RTPDeltaInfo{
StartTime: now,
EndTime: now.Add(duration),
Packets: 250,
@@ -247,7 +248,7 @@ func TestConnectionQuality(t *testing.T) {
trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{
1: {
RTPStats: &buffer.RTPDeltaInfo{
RTPStats: &rtpstats.RTPDeltaInfo{
StartTime: now,
EndTime: now.Add(duration),
Packets: 0,
@@ -264,7 +265,7 @@ func TestConnectionQuality(t *testing.T) {
now = now.Add(duration)
trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{
1: {
RTPStats: &buffer.RTPDeltaInfo{
RTPStats: &rtpstats.RTPDeltaInfo{
StartTime: now,
EndTime: now.Add(duration),
Packets: 0,
@@ -281,7 +282,7 @@ func TestConnectionQuality(t *testing.T) {
trp.setLastSenderReportTime(now.Add(time.Second))
trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{
1: {
RTPStats: &buffer.RTPDeltaInfo{
RTPStats: &rtpstats.RTPDeltaInfo{
StartTime: now,
EndTime: now.Add(duration),
Packets: 0,
@@ -297,7 +298,7 @@ func TestConnectionQuality(t *testing.T) {
now = now.Add(duration)
trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{
1: {
RTPStats: &buffer.RTPDeltaInfo{
RTPStats: &rtpstats.RTPDeltaInfo{
StartTime: now,
EndTime: now.Add(duration),
Packets: 0,
@@ -322,7 +323,7 @@ func TestConnectionQuality(t *testing.T) {
for i := 0; i < 3; i++ {
trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{
1: {
RTPStats: &buffer.RTPDeltaInfo{
RTPStats: &rtpstats.RTPDeltaInfo{
StartTime: now,
EndTime: now.Add(duration),
Packets: 250,
@@ -342,7 +343,7 @@ func TestConnectionQuality(t *testing.T) {
// even higher loss (like 10%) should not knock down quality due to quadratic weighting of packet loss ratio
trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{
1: {
RTPStats: &buffer.RTPDeltaInfo{
RTPStats: &rtpstats.RTPDeltaInfo{
StartTime: now,
EndTime: now.Add(duration),
Packets: 50,
@@ -364,7 +365,7 @@ func TestConnectionQuality(t *testing.T) {
// at 2% loss, quality should stay at EXCELLENT purely based on loss, but with added RTT/jitter, should drop to GOOD
trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{
1: {
RTPStats: &buffer.RTPDeltaInfo{
RTPStats: &rtpstats.RTPDeltaInfo{
StartTime: now,
EndTime: now.Add(duration),
Packets: 250,
@@ -390,7 +391,7 @@ func TestConnectionQuality(t *testing.T) {
trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{
1: {
RTPStats: &buffer.RTPDeltaInfo{
RTPStats: &rtpstats.RTPDeltaInfo{
StartTime: now,
EndTime: now.Add(duration),
Packets: 250,
@@ -409,7 +410,7 @@ func TestConnectionQuality(t *testing.T) {
trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{
1: {
RTPStats: &buffer.RTPDeltaInfo{
RTPStats: &rtpstats.RTPDeltaInfo{
StartTime: now,
EndTime: now.Add(duration),
Packets: 250,
@@ -433,7 +434,7 @@ func TestConnectionQuality(t *testing.T) {
trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{
1: {
RTPStats: &buffer.RTPDeltaInfo{
RTPStats: &rtpstats.RTPDeltaInfo{
StartTime: now,
EndTime: now.Add(duration),
Packets: 250,
@@ -460,7 +461,7 @@ func TestConnectionQuality(t *testing.T) {
// will only climb to GOOD.
trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{
1: {
RTPStats: &buffer.RTPDeltaInfo{
RTPStats: &rtpstats.RTPDeltaInfo{
StartTime: now,
EndTime: now.Add(duration),
Packets: 250,
@@ -494,7 +495,7 @@ func TestConnectionQuality(t *testing.T) {
// quality should drop to GOOD if RTT were taken into consideration
trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{
1: {
RTPStats: &buffer.RTPDeltaInfo{
RTPStats: &rtpstats.RTPDeltaInfo{
StartTime: now,
EndTime: now.Add(duration),
Packets: 250,
@@ -529,7 +530,7 @@ func TestConnectionQuality(t *testing.T) {
// quality should drop to GOOD if jitter were taken into consideration
trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{
1: {
RTPStats: &buffer.RTPDeltaInfo{
RTPStats: &rtpstats.RTPDeltaInfo{
StartTime: now,
EndTime: now.Add(duration),
Packets: 250,
@@ -698,7 +699,7 @@ func TestConnectionQuality(t *testing.T) {
for _, eq := range tc.expectedQualities {
trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{
123: {
RTPStats: &buffer.RTPDeltaInfo{
RTPStats: &rtpstats.RTPDeltaInfo{
StartTime: now,
EndTime: now.Add(duration),
Packets: tc.packetsExpected,
@@ -802,7 +803,7 @@ func TestConnectionQuality(t *testing.T) {
trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{
123: {
RTPStats: &buffer.RTPDeltaInfo{
RTPStats: &rtpstats.RTPDeltaInfo{
StartTime: now,
EndTime: now.Add(duration),
Packets: 100,
@@ -896,7 +897,7 @@ func TestConnectionQuality(t *testing.T) {
trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{
123: {
RTPStats: &buffer.RTPDeltaInfo{
RTPStats: &rtpstats.RTPDeltaInfo{
StartTime: now,
EndTime: now.Add(duration),
Packets: 200,
+7 -6
View File
@@ -41,6 +41,7 @@ import (
act "github.com/livekit/livekit-server/pkg/sfu/rtpextension/abscapturetime"
dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor"
pd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/playoutdelay"
"github.com/livekit/livekit-server/pkg/sfu/rtpstats"
"github.com/livekit/livekit-server/pkg/sfu/utils"
)
@@ -136,7 +137,7 @@ var (
// -------------------------------------------------------------------
type DownTrackState struct {
RTPStats *buffer.RTPStatsSender
RTPStats *rtpstats.RTPStatsSender
DeltaStatsSenderSnapshotId uint32
ForwarderState *livekit.RTPForwarderState
}
@@ -277,7 +278,7 @@ type DownTrack struct {
writeStopped atomic.Bool
isReceiverReady bool
rtpStats *buffer.RTPStatsSender
rtpStats *rtpstats.RTPStatsSender
totalRepeatedNACKs atomic.Uint32
@@ -379,7 +380,7 @@ func NewDownTrack(params DowntrackParams) (*DownTrack, error) {
d.getExpectedRTPTimestamp,
)
d.rtpStats = buffer.NewRTPStatsSender(buffer.RTPStatsParams{
d.rtpStats = rtpstats.NewRTPStatsSender(rtpstats.RTPStatsParams{
ClockRate: d.codec.ClockRate,
Logger: d.params.Logger,
})
@@ -918,7 +919,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error {
_, _, refSenderReport := d.forwarder.GetSenderReportParams()
if refSenderReport != nil {
actExtCopy := *extPkt.AbsCaptureTimeExt
if err = actExtCopy.Rewrite(buffer.RTCPSenderReportPropagationDelay(refSenderReport, !d.params.DisableSenderReportPassThrough)); err == nil {
if err = actExtCopy.Rewrite(rtpstats.RTCPSenderReportPropagationDelay(refSenderReport, !d.params.DisableSenderReportPassThrough)); err == nil {
actBytes, err = actExtCopy.Marshal()
if err == nil {
extensions = append(
@@ -2061,7 +2062,7 @@ func (d *DownTrack) GetTrackStats() *livekit.RTPStats {
return d.rtpStats.ToProto()
}
func (d *DownTrack) deltaStats(ds *buffer.RTPDeltaInfo) map[uint32]*buffer.StreamStatsWithLayers {
func (d *DownTrack) deltaStats(ds *rtpstats.RTPDeltaInfo) map[uint32]*buffer.StreamStatsWithLayers {
if ds == nil {
return nil
}
@@ -2069,7 +2070,7 @@ func (d *DownTrack) deltaStats(ds *buffer.RTPDeltaInfo) map[uint32]*buffer.Strea
streamStats := make(map[uint32]*buffer.StreamStatsWithLayers, 1)
streamStats[d.ssrc] = &buffer.StreamStatsWithLayers{
RTPStats: ds,
Layers: map[int32]*buffer.RTPDeltaInfo{
Layers: map[int32]*rtpstats.RTPDeltaInfo{
0: ds,
},
}
+2 -1
View File
@@ -35,6 +35,7 @@ import (
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/sfu/codecmunger"
dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor"
"github.com/livekit/livekit-server/pkg/sfu/rtpstats"
"github.com/livekit/livekit-server/pkg/sfu/videolayerselector"
"github.com/livekit/livekit-server/pkg/sfu/videolayerselector/temporallayerselector"
)
@@ -196,7 +197,7 @@ type refInfo struct {
}
func (r refInfo) MarshalLogObject(e zapcore.ObjectEncoder) error {
e.AddObject("senderReport", buffer.WrappedRTCPSenderReportStateLogger{
e.AddObject("senderReport", rtpstats.WrappedRTCPSenderReportStateLogger{
RTCPSenderReportState: r.senderReport,
})
e.AddUint64("tsOffset", r.tsOffset)
+3 -3
View File
@@ -19,8 +19,8 @@ import (
"sync/atomic"
"time"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
pd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/playoutdelay"
"github.com/livekit/livekit-server/pkg/sfu/rtpstats"
"github.com/livekit/protocol/logger"
)
@@ -59,13 +59,13 @@ type PlayoutDelayController struct {
sendingAtSeq uint16
sendingAtTime time.Time
logger logger.Logger
rtpStats *buffer.RTPStatsSender
rtpStats *rtpstats.RTPStatsSender
snapshotID uint32
highDelayCount atomic.Uint32
}
func NewPlayoutDelayController(minDelay, maxDelay uint32, logger logger.Logger, rtpStats *buffer.RTPStatsSender) (*PlayoutDelayController, error) {
func NewPlayoutDelayController(minDelay, maxDelay uint32, logger logger.Logger, rtpStats *rtpstats.RTPStatsSender) (*PlayoutDelayController, error) {
if maxDelay == 0 && minDelay > 0 {
maxDelay = pd.MaxPlayoutDelayDefault
}
+2 -2
View File
@@ -20,13 +20,13 @@ import (
"github.com/stretchr/testify/require"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
pd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/playoutdelay"
"github.com/livekit/livekit-server/pkg/sfu/rtpstats"
"github.com/livekit/protocol/logger"
)
func TestPlayoutDelay(t *testing.T) {
stats := buffer.NewRTPStatsSender(buffer.RTPStatsParams{ClockRate: 900000, Logger: logger.GetLogger()})
stats := rtpstats.NewRTPStatsSender(rtpstats.RTPStatsParams{ClockRate: 900000, Logger: logger.GetLogger()})
c, err := NewPlayoutDelayController(100, 120, logger.GetLogger(), stats)
require.NoError(t, err)
+3 -2
View File
@@ -35,6 +35,7 @@ import (
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/sfu/connectionquality"
dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor"
"github.com/livekit/livekit-server/pkg/sfu/rtpstats"
)
var (
@@ -597,7 +598,7 @@ func (w *WebRTCReceiver) GetTrackStats() *livekit.RTPStats {
stats = append(stats, sswl)
}
return buffer.AggregateRTPStats(stats)
return rtpstats.AggregateRTPStats(stats)
}
func (w *WebRTCReceiver) GetAudioLevel() (float64, bool) {
@@ -636,7 +637,7 @@ func (w *WebRTCReceiver) GetDeltaStats() map[uint32]*buffer.StreamStatsWithLayer
}
// patch buffer stats with correct layer
patched := make(map[int32]*buffer.RTPDeltaInfo, 1)
patched := make(map[int32]*rtpstats.RTPDeltaInfo, 1)
patched[int32(layer)] = sswl.Layers[0]
sswl.Layers = patched
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package buffer
package rtpstats
import (
"errors"
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package buffer
package rtpstats
import (
"fmt"
@@ -216,7 +216,7 @@ func (r *RTPStatsReceiver) Update(
}
gapTS := int64(resTS.ExtendedVal - resTS.PreExtendedHighest)
// it is possible to reecive old packets in two different scenarios
// it is possible to receive old packets in two different scenarios
// as it is not possible to detect how far to roll back, ignore old packets
//
// Case 1:
@@ -686,6 +686,22 @@ func (r *RTPStatsReceiver) HighestTimestamp() uint32 {
return r.timestamp.GetHighest()
}
// for testing only
func (r *RTPStatsReceiver) HighestSequenceNumber() uint16 {
r.lock.RLock()
defer r.lock.RUnlock()
return r.sequenceNumber.GetHighest()
}
// for testing only
func (r *RTPStatsReceiver) ExtendedHighestSequenceNumber() uint64 {
r.lock.RLock()
defer r.lock.RUnlock()
return r.sequenceNumber.GetExtendedHighest()
}
// ----------------------------------
type lockedRTPStatsReceiverLogEncoder struct {
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package buffer
package rtpstats
import (
"fmt"
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package buffer
package rtpstats
import (
"errors"