diff --git a/pkg/rtc/mediatrackreceiver.go b/pkg/rtc/mediatrackreceiver.go index 433f64aad..a404c6852 100644 --- a/pkg/rtc/mediatrackreceiver.go +++ b/pkg/rtc/mediatrackreceiver.go @@ -36,6 +36,7 @@ import ( "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/sfu" "github.com/livekit/livekit-server/pkg/sfu/buffer" + "github.com/livekit/livekit-server/pkg/sfu/rtpstats" "github.com/livekit/livekit-server/pkg/telemetry" ) @@ -893,5 +894,5 @@ func (t *MediaTrackReceiver) GetTrackStats() *livekit.RTPStats { } } - return buffer.AggregateRTPStats(stats) + return rtpstats.AggregateRTPStats(stats) } diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index f467ac627..00baf1f84 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -33,6 +33,7 @@ import ( "github.com/livekit/livekit-server/pkg/sfu/audio" act "github.com/livekit/livekit-server/pkg/sfu/rtpextension/abscapturetime" dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor" + "github.com/livekit/livekit-server/pkg/sfu/rtpstats" "github.com/livekit/livekit-server/pkg/sfu/utils" sutils "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/mediatransportutil/pkg/bucket" @@ -105,7 +106,7 @@ type Buffer struct { pliThrottle int64 - rtpStats *RTPStatsReceiver + rtpStats *rtpstats.RTPStatsReceiver rrSnapshotId uint32 deltaStatsSnapshotId uint32 ppsSnapshotId uint32 @@ -211,7 +212,7 @@ func (b *Buffer) Bind(params webrtc.RTPParameters, codec webrtc.RTPCodecCapabili return } - b.rtpStats = NewRTPStatsReceiver(RTPStatsParams{ + b.rtpStats = rtpstats.NewRTPStatsReceiver(rtpstats.RTPStatsParams{ ClockRate: codec.ClockRate, Logger: b.logger, }) @@ -716,7 +717,7 @@ func (b *Buffer) doFpsCalc(ep *ExtPacket) { } } -func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime int64) RTPFlowState { +func (b *Buffer) updateStreamState(p *rtp.Packet, arrivalTime int64) rtpstats.RTPFlowState { flowState := b.rtpStats.Update( arrivalTime, p.Header.SequenceNumber, @@ -762,7 +763,7 @@ func (b *Buffer) processHeaderExtensions(p *rtp.Packet, arrivalTime int64, isRTX } } -func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime int64, flowState RTPFlowState) *ExtPacket { +func (b *Buffer) getExtPacket(rtpPacket *rtp.Packet, arrivalTime int64, flowState rtpstats.RTPFlowState) *ExtPacket { ep := &ExtPacket{ Arrival: arrivalTime, ExtSequenceNumber: flowState.ExtSequenceNumber, @@ -1076,7 +1077,7 @@ func (b *Buffer) GetDeltaStats() *StreamStatsWithLayers { return &StreamStatsWithLayers{ RTPStats: deltaStats, - Layers: map[int32]*RTPDeltaInfo{ + Layers: map[int32]*rtpstats.RTPDeltaInfo{ 0: deltaStats, }, } diff --git a/pkg/sfu/buffer/buffer_test.go b/pkg/sfu/buffer/buffer_test.go index e5ce7fdc4..012c36545 100644 --- a/pkg/sfu/buffer/buffer_test.go +++ b/pkg/sfu/buffer/buffer_test.go @@ -223,8 +223,8 @@ func TestNewBuffer(t *testing.T) { buf, _ := p.Marshal() _, _ = buff.Write(buf) } - require.Equal(t, uint16(2), buff.rtpStats.sequenceNumber.GetHighest()) - require.Equal(t, uint64(65536+2), buff.rtpStats.sequenceNumber.GetExtendedHighest()) + require.Equal(t, uint16(2), buff.rtpStats.HighestSequenceNumber()) + require.Equal(t, uint64(65536+2), buff.rtpStats.ExtendedHighestSequenceNumber()) }) } } diff --git a/pkg/sfu/buffer/streamstats.go b/pkg/sfu/buffer/streamstats.go index cdd8e1333..e38cd1980 100644 --- a/pkg/sfu/buffer/streamstats.go +++ b/pkg/sfu/buffer/streamstats.go @@ -14,7 +14,9 @@ package buffer +import "github.com/livekit/livekit-server/pkg/sfu/rtpstats" + type StreamStatsWithLayers struct { - RTPStats *RTPDeltaInfo - Layers map[int32]*RTPDeltaInfo + RTPStats *rtpstats.RTPDeltaInfo + Layers map[int32]*rtpstats.RTPDeltaInfo } diff --git a/pkg/sfu/connectionquality/connectionstats.go b/pkg/sfu/connectionquality/connectionstats.go index da509db16..29fa49fc8 100644 --- a/pkg/sfu/connectionquality/connectionstats.go +++ b/pkg/sfu/connectionquality/connectionstats.go @@ -28,6 +28,7 @@ import ( "github.com/livekit/protocol/logger" "github.com/livekit/livekit-server/pkg/sfu/buffer" + "github.com/livekit/livekit-server/pkg/sfu/rtpstats" ) const ( @@ -203,7 +204,7 @@ func (cs *ConnectionStats) GetScoreAndQuality() (float32, livekit.ConnectionQual return cs.scorer.GetMOSAndQuality() } -func (cs *ConnectionStats) updateScoreWithAggregate(agg *buffer.RTPDeltaInfo, lastRTCPAt time.Time, at time.Time) float32 { +func (cs *ConnectionStats) updateScoreWithAggregate(agg *rtpstats.RTPDeltaInfo, lastRTCPAt time.Time, at time.Time) float32 { var stat windowStat if agg != nil { stat.startedAt = agg.StartTime @@ -289,11 +290,11 @@ func (cs *ConnectionStats) updateScoreAt(at time.Time) (float32, map[uint32]*buf return mos, nil } - deltaInfoList := make([]*buffer.RTPDeltaInfo, 0, len(streams)) + deltaInfoList := make([]*rtpstats.RTPDeltaInfo, 0, len(streams)) for _, s := range streams { deltaInfoList = append(deltaInfoList, s.RTPStats) } - agg := buffer.AggregateRTPDeltaInfo(deltaInfoList) + agg := rtpstats.AggregateRTPDeltaInfo(deltaInfoList) return cs.updateScoreWithAggregate(agg, cs.params.ReceiverProvider.GetLastSenderReportTime(), at), streams } @@ -415,15 +416,15 @@ func getPacketLossWeight(mimeType string, isFecEnabled bool) float64 { return plw } -func toAggregateDeltaInfo(streams map[uint32]*buffer.StreamStatsWithLayers) *buffer.RTPDeltaInfo { - deltaInfoList := make([]*buffer.RTPDeltaInfo, 0, len(streams)) +func toAggregateDeltaInfo(streams map[uint32]*buffer.StreamStatsWithLayers) *rtpstats.RTPDeltaInfo { + deltaInfoList := make([]*rtpstats.RTPDeltaInfo, 0, len(streams)) for _, s := range streams { deltaInfoList = append(deltaInfoList, s.RTPStats) } - return buffer.AggregateRTPDeltaInfo(deltaInfoList) + return rtpstats.AggregateRTPDeltaInfo(deltaInfoList) } -func toAnalyticsStream(ssrc uint32, deltaStats *buffer.RTPDeltaInfo) *livekit.AnalyticsStream { +func toAnalyticsStream(ssrc uint32, deltaStats *rtpstats.RTPDeltaInfo) *livekit.AnalyticsStream { // discount the feed side loss when reporting forwarded track stats packetsLost := deltaStats.PacketsLost if deltaStats.PacketsMissing > packetsLost { @@ -452,7 +453,7 @@ func toAnalyticsStream(ssrc uint32, deltaStats *buffer.RTPDeltaInfo) *livekit.An } } -func toAnalyticsVideoLayer(layer int32, layerStats *buffer.RTPDeltaInfo) *livekit.AnalyticsVideoLayer { +func toAnalyticsVideoLayer(layer int32, layerStats *rtpstats.RTPDeltaInfo) *livekit.AnalyticsVideoLayer { avl := &livekit.AnalyticsVideoLayer{ Layer: layer, Packets: layerStats.Packets + layerStats.PacketsDuplicate + layerStats.PacketsPadding, diff --git a/pkg/sfu/connectionquality/connectionstats_test.go b/pkg/sfu/connectionquality/connectionstats_test.go index 78e217130..90a4ac015 100644 --- a/pkg/sfu/connectionquality/connectionstats_test.go +++ b/pkg/sfu/connectionquality/connectionstats_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/require" "github.com/livekit/livekit-server/pkg/sfu/buffer" + "github.com/livekit/livekit-server/pkg/sfu/rtpstats" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" ) @@ -82,7 +83,7 @@ func TestConnectionQuality(t *testing.T) { // best conditions (no loss, jitter/rtt = 0) - quality should stay EXCELLENT trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ 1: { - RTPStats: &buffer.RTPDeltaInfo{ + RTPStats: &rtpstats.RTPDeltaInfo{ StartTime: now, EndTime: now.Add(duration), Packets: 250, @@ -98,7 +99,7 @@ func TestConnectionQuality(t *testing.T) { now = now.Add(duration) trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ 1: { - RTPStats: &buffer.RTPDeltaInfo{ + RTPStats: &rtpstats.RTPDeltaInfo{ StartTime: now, EndTime: now.Add(duration), Packets: 120, @@ -106,7 +107,7 @@ func TestConnectionQuality(t *testing.T) { }, }, 2: { - RTPStats: &buffer.RTPDeltaInfo{ + RTPStats: &rtpstats.RTPDeltaInfo{ StartTime: now, EndTime: now.Add(duration), Packets: 130, @@ -125,7 +126,7 @@ func TestConnectionQuality(t *testing.T) { now = now.Add(duration) trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ 1: { - RTPStats: &buffer.RTPDeltaInfo{ + RTPStats: &rtpstats.RTPDeltaInfo{ StartTime: now, EndTime: now.Add(duration), Packets: 250, @@ -141,7 +142,7 @@ func TestConnectionQuality(t *testing.T) { now = now.Add(duration) trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ 1: { - RTPStats: &buffer.RTPDeltaInfo{ + RTPStats: &rtpstats.RTPDeltaInfo{ StartTime: now, EndTime: now.Add(duration), Packets: 250, @@ -157,7 +158,7 @@ func TestConnectionQuality(t *testing.T) { now = now.Add(duration) trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ 1: { - RTPStats: &buffer.RTPDeltaInfo{ + RTPStats: &rtpstats.RTPDeltaInfo{ StartTime: now, EndTime: now.Add(duration), Packets: 250, @@ -173,7 +174,7 @@ func TestConnectionQuality(t *testing.T) { now = now.Add(duration) trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ 1: { - RTPStats: &buffer.RTPDeltaInfo{ + RTPStats: &rtpstats.RTPDeltaInfo{ StartTime: now, EndTime: now.Add(duration), Packets: 250, @@ -190,7 +191,7 @@ func TestConnectionQuality(t *testing.T) { now = now.Add(duration) trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ 1: { - RTPStats: &buffer.RTPDeltaInfo{ + RTPStats: &rtpstats.RTPDeltaInfo{ StartTime: now, EndTime: now.Add(duration), Packets: 250, @@ -206,7 +207,7 @@ func TestConnectionQuality(t *testing.T) { now = now.Add(duration) trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ 1: { - RTPStats: &buffer.RTPDeltaInfo{ + RTPStats: &rtpstats.RTPDeltaInfo{ StartTime: now, EndTime: now.Add(duration), Packets: 250, @@ -222,7 +223,7 @@ func TestConnectionQuality(t *testing.T) { now = now.Add(duration) trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ 1: { - RTPStats: &buffer.RTPDeltaInfo{ + RTPStats: &rtpstats.RTPDeltaInfo{ StartTime: now, EndTime: now.Add(duration), Packets: 250, @@ -247,7 +248,7 @@ func TestConnectionQuality(t *testing.T) { trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ 1: { - RTPStats: &buffer.RTPDeltaInfo{ + RTPStats: &rtpstats.RTPDeltaInfo{ StartTime: now, EndTime: now.Add(duration), Packets: 0, @@ -264,7 +265,7 @@ func TestConnectionQuality(t *testing.T) { now = now.Add(duration) trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ 1: { - RTPStats: &buffer.RTPDeltaInfo{ + RTPStats: &rtpstats.RTPDeltaInfo{ StartTime: now, EndTime: now.Add(duration), Packets: 0, @@ -281,7 +282,7 @@ func TestConnectionQuality(t *testing.T) { trp.setLastSenderReportTime(now.Add(time.Second)) trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ 1: { - RTPStats: &buffer.RTPDeltaInfo{ + RTPStats: &rtpstats.RTPDeltaInfo{ StartTime: now, EndTime: now.Add(duration), Packets: 0, @@ -297,7 +298,7 @@ func TestConnectionQuality(t *testing.T) { now = now.Add(duration) trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ 1: { - RTPStats: &buffer.RTPDeltaInfo{ + RTPStats: &rtpstats.RTPDeltaInfo{ StartTime: now, EndTime: now.Add(duration), Packets: 0, @@ -322,7 +323,7 @@ func TestConnectionQuality(t *testing.T) { for i := 0; i < 3; i++ { trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ 1: { - RTPStats: &buffer.RTPDeltaInfo{ + RTPStats: &rtpstats.RTPDeltaInfo{ StartTime: now, EndTime: now.Add(duration), Packets: 250, @@ -342,7 +343,7 @@ func TestConnectionQuality(t *testing.T) { // even higher loss (like 10%) should not knock down quality due to quadratic weighting of packet loss ratio trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ 1: { - RTPStats: &buffer.RTPDeltaInfo{ + RTPStats: &rtpstats.RTPDeltaInfo{ StartTime: now, EndTime: now.Add(duration), Packets: 50, @@ -364,7 +365,7 @@ func TestConnectionQuality(t *testing.T) { // at 2% loss, quality should stay at EXCELLENT purely based on loss, but with added RTT/jitter, should drop to GOOD trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ 1: { - RTPStats: &buffer.RTPDeltaInfo{ + RTPStats: &rtpstats.RTPDeltaInfo{ StartTime: now, EndTime: now.Add(duration), Packets: 250, @@ -390,7 +391,7 @@ func TestConnectionQuality(t *testing.T) { trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ 1: { - RTPStats: &buffer.RTPDeltaInfo{ + RTPStats: &rtpstats.RTPDeltaInfo{ StartTime: now, EndTime: now.Add(duration), Packets: 250, @@ -409,7 +410,7 @@ func TestConnectionQuality(t *testing.T) { trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ 1: { - RTPStats: &buffer.RTPDeltaInfo{ + RTPStats: &rtpstats.RTPDeltaInfo{ StartTime: now, EndTime: now.Add(duration), Packets: 250, @@ -433,7 +434,7 @@ func TestConnectionQuality(t *testing.T) { trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ 1: { - RTPStats: &buffer.RTPDeltaInfo{ + RTPStats: &rtpstats.RTPDeltaInfo{ StartTime: now, EndTime: now.Add(duration), Packets: 250, @@ -460,7 +461,7 @@ func TestConnectionQuality(t *testing.T) { // will only climb to GOOD. trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ 1: { - RTPStats: &buffer.RTPDeltaInfo{ + RTPStats: &rtpstats.RTPDeltaInfo{ StartTime: now, EndTime: now.Add(duration), Packets: 250, @@ -494,7 +495,7 @@ func TestConnectionQuality(t *testing.T) { // quality should drop to GOOD if RTT were taken into consideration trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ 1: { - RTPStats: &buffer.RTPDeltaInfo{ + RTPStats: &rtpstats.RTPDeltaInfo{ StartTime: now, EndTime: now.Add(duration), Packets: 250, @@ -529,7 +530,7 @@ func TestConnectionQuality(t *testing.T) { // quality should drop to GOOD if jitter were taken into consideration trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ 1: { - RTPStats: &buffer.RTPDeltaInfo{ + RTPStats: &rtpstats.RTPDeltaInfo{ StartTime: now, EndTime: now.Add(duration), Packets: 250, @@ -698,7 +699,7 @@ func TestConnectionQuality(t *testing.T) { for _, eq := range tc.expectedQualities { trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ 123: { - RTPStats: &buffer.RTPDeltaInfo{ + RTPStats: &rtpstats.RTPDeltaInfo{ StartTime: now, EndTime: now.Add(duration), Packets: tc.packetsExpected, @@ -802,7 +803,7 @@ func TestConnectionQuality(t *testing.T) { trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ 123: { - RTPStats: &buffer.RTPDeltaInfo{ + RTPStats: &rtpstats.RTPDeltaInfo{ StartTime: now, EndTime: now.Add(duration), Packets: 100, @@ -896,7 +897,7 @@ func TestConnectionQuality(t *testing.T) { trp.setStreams(map[uint32]*buffer.StreamStatsWithLayers{ 123: { - RTPStats: &buffer.RTPDeltaInfo{ + RTPStats: &rtpstats.RTPDeltaInfo{ StartTime: now, EndTime: now.Add(duration), Packets: 200, diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 6df5f4842..c6c7f9b30 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -41,6 +41,7 @@ import ( act "github.com/livekit/livekit-server/pkg/sfu/rtpextension/abscapturetime" dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor" pd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/playoutdelay" + "github.com/livekit/livekit-server/pkg/sfu/rtpstats" "github.com/livekit/livekit-server/pkg/sfu/utils" ) @@ -136,7 +137,7 @@ var ( // ------------------------------------------------------------------- type DownTrackState struct { - RTPStats *buffer.RTPStatsSender + RTPStats *rtpstats.RTPStatsSender DeltaStatsSenderSnapshotId uint32 ForwarderState *livekit.RTPForwarderState } @@ -277,7 +278,7 @@ type DownTrack struct { writeStopped atomic.Bool isReceiverReady bool - rtpStats *buffer.RTPStatsSender + rtpStats *rtpstats.RTPStatsSender totalRepeatedNACKs atomic.Uint32 @@ -379,7 +380,7 @@ func NewDownTrack(params DowntrackParams) (*DownTrack, error) { d.getExpectedRTPTimestamp, ) - d.rtpStats = buffer.NewRTPStatsSender(buffer.RTPStatsParams{ + d.rtpStats = rtpstats.NewRTPStatsSender(rtpstats.RTPStatsParams{ ClockRate: d.codec.ClockRate, Logger: d.params.Logger, }) @@ -918,7 +919,7 @@ func (d *DownTrack) WriteRTP(extPkt *buffer.ExtPacket, layer int32) error { _, _, refSenderReport := d.forwarder.GetSenderReportParams() if refSenderReport != nil { actExtCopy := *extPkt.AbsCaptureTimeExt - if err = actExtCopy.Rewrite(buffer.RTCPSenderReportPropagationDelay(refSenderReport, !d.params.DisableSenderReportPassThrough)); err == nil { + if err = actExtCopy.Rewrite(rtpstats.RTCPSenderReportPropagationDelay(refSenderReport, !d.params.DisableSenderReportPassThrough)); err == nil { actBytes, err = actExtCopy.Marshal() if err == nil { extensions = append( @@ -2061,7 +2062,7 @@ func (d *DownTrack) GetTrackStats() *livekit.RTPStats { return d.rtpStats.ToProto() } -func (d *DownTrack) deltaStats(ds *buffer.RTPDeltaInfo) map[uint32]*buffer.StreamStatsWithLayers { +func (d *DownTrack) deltaStats(ds *rtpstats.RTPDeltaInfo) map[uint32]*buffer.StreamStatsWithLayers { if ds == nil { return nil } @@ -2069,7 +2070,7 @@ func (d *DownTrack) deltaStats(ds *buffer.RTPDeltaInfo) map[uint32]*buffer.Strea streamStats := make(map[uint32]*buffer.StreamStatsWithLayers, 1) streamStats[d.ssrc] = &buffer.StreamStatsWithLayers{ RTPStats: ds, - Layers: map[int32]*buffer.RTPDeltaInfo{ + Layers: map[int32]*rtpstats.RTPDeltaInfo{ 0: ds, }, } diff --git a/pkg/sfu/forwarder.go b/pkg/sfu/forwarder.go index 43c5d1641..fd5bff07a 100644 --- a/pkg/sfu/forwarder.go +++ b/pkg/sfu/forwarder.go @@ -35,6 +35,7 @@ import ( "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/sfu/codecmunger" dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor" + "github.com/livekit/livekit-server/pkg/sfu/rtpstats" "github.com/livekit/livekit-server/pkg/sfu/videolayerselector" "github.com/livekit/livekit-server/pkg/sfu/videolayerselector/temporallayerselector" ) @@ -196,7 +197,7 @@ type refInfo struct { } func (r refInfo) MarshalLogObject(e zapcore.ObjectEncoder) error { - e.AddObject("senderReport", buffer.WrappedRTCPSenderReportStateLogger{ + e.AddObject("senderReport", rtpstats.WrappedRTCPSenderReportStateLogger{ RTCPSenderReportState: r.senderReport, }) e.AddUint64("tsOffset", r.tsOffset) diff --git a/pkg/sfu/playoutdelay.go b/pkg/sfu/playoutdelay.go index c0ed985b0..573c16a6a 100644 --- a/pkg/sfu/playoutdelay.go +++ b/pkg/sfu/playoutdelay.go @@ -19,8 +19,8 @@ import ( "sync/atomic" "time" - "github.com/livekit/livekit-server/pkg/sfu/buffer" pd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/playoutdelay" + "github.com/livekit/livekit-server/pkg/sfu/rtpstats" "github.com/livekit/protocol/logger" ) @@ -59,13 +59,13 @@ type PlayoutDelayController struct { sendingAtSeq uint16 sendingAtTime time.Time logger logger.Logger - rtpStats *buffer.RTPStatsSender + rtpStats *rtpstats.RTPStatsSender snapshotID uint32 highDelayCount atomic.Uint32 } -func NewPlayoutDelayController(minDelay, maxDelay uint32, logger logger.Logger, rtpStats *buffer.RTPStatsSender) (*PlayoutDelayController, error) { +func NewPlayoutDelayController(minDelay, maxDelay uint32, logger logger.Logger, rtpStats *rtpstats.RTPStatsSender) (*PlayoutDelayController, error) { if maxDelay == 0 && minDelay > 0 { maxDelay = pd.MaxPlayoutDelayDefault } diff --git a/pkg/sfu/playoutdelay_test.go b/pkg/sfu/playoutdelay_test.go index 338b4d64b..a39674d1f 100644 --- a/pkg/sfu/playoutdelay_test.go +++ b/pkg/sfu/playoutdelay_test.go @@ -20,13 +20,13 @@ import ( "github.com/stretchr/testify/require" - "github.com/livekit/livekit-server/pkg/sfu/buffer" pd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/playoutdelay" + "github.com/livekit/livekit-server/pkg/sfu/rtpstats" "github.com/livekit/protocol/logger" ) func TestPlayoutDelay(t *testing.T) { - stats := buffer.NewRTPStatsSender(buffer.RTPStatsParams{ClockRate: 900000, Logger: logger.GetLogger()}) + stats := rtpstats.NewRTPStatsSender(rtpstats.RTPStatsParams{ClockRate: 900000, Logger: logger.GetLogger()}) c, err := NewPlayoutDelayController(100, 120, logger.GetLogger(), stats) require.NoError(t, err) diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 7bc8f236d..4c410ca22 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -35,6 +35,7 @@ import ( "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/sfu/connectionquality" dd "github.com/livekit/livekit-server/pkg/sfu/rtpextension/dependencydescriptor" + "github.com/livekit/livekit-server/pkg/sfu/rtpstats" ) var ( @@ -597,7 +598,7 @@ func (w *WebRTCReceiver) GetTrackStats() *livekit.RTPStats { stats = append(stats, sswl) } - return buffer.AggregateRTPStats(stats) + return rtpstats.AggregateRTPStats(stats) } func (w *WebRTCReceiver) GetAudioLevel() (float64, bool) { @@ -636,7 +637,7 @@ func (w *WebRTCReceiver) GetDeltaStats() map[uint32]*buffer.StreamStatsWithLayer } // patch buffer stats with correct layer - patched := make(map[int32]*buffer.RTPDeltaInfo, 1) + patched := make(map[int32]*rtpstats.RTPDeltaInfo, 1) patched[int32(layer)] = sswl.Layers[0] sswl.Layers = patched diff --git a/pkg/sfu/buffer/rtpstats_base.go b/pkg/sfu/rtpstats/rtpstats_base.go similarity index 99% rename from pkg/sfu/buffer/rtpstats_base.go rename to pkg/sfu/rtpstats/rtpstats_base.go index 812fcacc3..768cb4464 100644 --- a/pkg/sfu/buffer/rtpstats_base.go +++ b/pkg/sfu/rtpstats/rtpstats_base.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package buffer +package rtpstats import ( "errors" diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/rtpstats/rtpstats_receiver.go similarity index 98% rename from pkg/sfu/buffer/rtpstats_receiver.go rename to pkg/sfu/rtpstats/rtpstats_receiver.go index 669f609b6..257c8d168 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/rtpstats/rtpstats_receiver.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package buffer +package rtpstats import ( "fmt" @@ -216,7 +216,7 @@ func (r *RTPStatsReceiver) Update( } gapTS := int64(resTS.ExtendedVal - resTS.PreExtendedHighest) - // it is possible to reecive old packets in two different scenarios + // it is possible to receive old packets in two different scenarios // as it is not possible to detect how far to roll back, ignore old packets // // Case 1: @@ -686,6 +686,22 @@ func (r *RTPStatsReceiver) HighestTimestamp() uint32 { return r.timestamp.GetHighest() } +// for testing only +func (r *RTPStatsReceiver) HighestSequenceNumber() uint16 { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.sequenceNumber.GetHighest() +} + +// for testing only +func (r *RTPStatsReceiver) ExtendedHighestSequenceNumber() uint64 { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.sequenceNumber.GetExtendedHighest() +} + // ---------------------------------- type lockedRTPStatsReceiverLogEncoder struct { diff --git a/pkg/sfu/buffer/rtpstats_receiver_test.go b/pkg/sfu/rtpstats/rtpstats_receiver_test.go similarity index 99% rename from pkg/sfu/buffer/rtpstats_receiver_test.go rename to pkg/sfu/rtpstats/rtpstats_receiver_test.go index b9005faf7..80e3f6937 100644 --- a/pkg/sfu/buffer/rtpstats_receiver_test.go +++ b/pkg/sfu/rtpstats/rtpstats_receiver_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package buffer +package rtpstats import ( "fmt" diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/rtpstats/rtpstats_sender.go similarity index 99% rename from pkg/sfu/buffer/rtpstats_sender.go rename to pkg/sfu/rtpstats/rtpstats_sender.go index 1f9715e96..d8c3edb5e 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/rtpstats/rtpstats_sender.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package buffer +package rtpstats import ( "errors"