diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 8d2080322..d2fff6d91 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -87,7 +87,7 @@ func NewMediaTrack(track *webrtc.TrackRemote, params MediaTrackParams) *MediaTra Logger: params.Logger, }) t.MediaTrackReceiver.OnMediaLossUpdate(func(fractionalLoss uint8) { - if t.buffer != nil { + if t.buffer != nil && t.Kind() == livekit.TrackType_AUDIO { // ok to access buffer since receivers are added before subscribers t.buffer.SetLastFractionLostReport(fractionalLoss) } @@ -108,8 +108,6 @@ func NewMediaTrack(track *webrtc.TrackRemote, params MediaTrackParams) *MediaTra // on close signal via closing channel to workers t.AddOnClose(t.closeChan) - go t.updateStats() - return t } @@ -222,16 +220,15 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra t.params.Telemetry.TrackUnpublished(context.Background(), t.PublisherID(), t.ToProto(), uint32(track.SSRC())) }) t.params.Telemetry.TrackPublished(context.Background(), t.PublisherID(), t.ToProto()) - if t.Kind() == livekit.TrackType_AUDIO { - t.buffer = buff - } + + t.buffer = buff t.MediaTrackReceiver.SetupReceiver(wr) + go t.updateStats() } t.lock.Unlock() t.Receiver().(*sfu.WebRTCReceiver).AddUpTrack(track, buff) - t.params.Telemetry.AddUpTrack(t.PublisherID(), t.ID(), buff) atomic.AddUint32(&t.numUpTracks, 1) // LK-TODO: can remove this completely when VideoLayers protocol becomes the default as it has info from client or if we decide to use TrackInfo.Simulcast @@ -275,9 +272,9 @@ func (t *MediaTrack) handlePublisherFeedback(packets []rtcp.Packet) { var jitter uint32 var totalLost uint32 var maxSeqNum uint32 - - // forward to telemetry - t.params.Telemetry.HandleRTCP(livekit.StreamType_UPSTREAM, t.params.ParticipantID, t.ID(), packets) + var nackCount int32 + var pliCount int32 + var firCount int32 for _, p := range packets { switch pkt := p.(type) { @@ -287,7 +284,6 @@ func (t *MediaTrack) handlePublisherFeedback(packets []rtcp.Packet) { if rr.FractionLost > maxLost { maxLost = rr.FractionLost } - if rr.Delay > delay { delay = rr.Delay } @@ -297,11 +293,18 @@ func (t *MediaTrack) handlePublisherFeedback(packets []rtcp.Packet) { if rr.LastSequenceNumber > maxSeqNum { maxSeqNum = rr.LastSequenceNumber } - totalLost = rr.TotalLost - - hasReport = true } + hasReport = true + case *rtcp.TransportLayerNack: + nackCount++ + hasReport = true + case *rtcp.PictureLossIndication: + pliCount++ + hasReport = true + case *rtcp.FullIntraRequest: + firCount++ + hasReport = true } } @@ -317,8 +320,11 @@ func (t *MediaTrack) handlePublisherFeedback(packets []rtcp.Packet) { t.maxUpFracLost = 0 t.maxUpFracLostTs = now } + t.statsLock.Unlock() + // update feedback stats - current := t.connectionStats.Curr + t.connectionStats.Lock.Lock() + current := t.connectionStats if jitter > current.Jitter { current.Jitter = jitter } @@ -329,7 +335,10 @@ func (t *MediaTrack) handlePublisherFeedback(packets []rtcp.Packet) { current.LastSeqNum = maxSeqNum } current.PacketsLost = totalLost - t.statsLock.Unlock() + current.NackCount += nackCount + current.PliCount += pliCount + current.FirCount += firCount + t.connectionStats.Lock.Unlock() } // also look for sender reports @@ -338,8 +347,8 @@ func (t *MediaTrack) handlePublisherFeedback(packets []rtcp.Packet) { } func (t *MediaTrack) GetConnectionScore() float64 { - t.statsLock.Lock() - defer t.statsLock.Unlock() + t.connectionStats.Lock.Lock() + defer t.connectionStats.Lock.Unlock() return t.connectionStats.Score } @@ -348,23 +357,41 @@ func (t *MediaTrack) closeChan() { } func (t *MediaTrack) updateStats() { + for { select { case <-t.done: return case <-time.After(connectionQualityUpdateInterval): - t.statsLock.Lock() + t.connectionStats.Lock.Lock() + + //we are accessing stats after receiver has buffer assigned + stats := t.buffer.GetStats() + + delta := t.connectionStats.UpdateStats(stats.TotalByte) if t.Kind() == livekit.TrackType_AUDIO { - t.connectionStats.CalculateAudioScore() + t.connectionStats.Score = connectionquality.AudioConnectionScore(delta, t.connectionStats.Jitter) } else { - t.calculateVideoScore() + t.connectionStats.Score = t.calculateVideoScore() } - t.statsLock.Unlock() + stat := &livekit.AnalyticsStat{ + Jitter: float64(t.connectionStats.Jitter), + TotalPackets: uint64(t.connectionStats.TotalPackets), + PacketLost: uint64(t.connectionStats.PacketsLost), + Delay: uint64(t.connectionStats.Delay), + TotalBytes: t.connectionStats.TotalBytes, + NackCount: t.connectionStats.NackCount, + PliCount: t.connectionStats.PliCount, + FirCount: t.connectionStats.FirCount, + ConnectionScore: float32(t.connectionStats.Score), + } + t.params.Telemetry.TrackStats(livekit.StreamType_UPSTREAM, t.PublisherID(), t.ID(), stat) + t.connectionStats.Lock.Unlock() } } } -func (t *MediaTrack) calculateVideoScore() { +func (t *MediaTrack) calculateVideoScore() float64 { var reducedQuality bool publishing, expected := t.getNumUpTracks() if publishing < expected { @@ -375,5 +402,5 @@ func (t *MediaTrack) calculateVideoScore() { if expected == 0 { loss = 0 } - t.connectionStats.Score = connectionquality.Loss2Score(loss, reducedQuality) + return connectionquality.VideoConnectionScore(loss, reducedQuality) } diff --git a/pkg/rtc/mediatracksubscriptions.go b/pkg/rtc/mediatracksubscriptions.go index 75847ff2f..8b5b342fe 100644 --- a/pkg/rtc/mediatracksubscriptions.go +++ b/pkg/rtc/mediatracksubscriptions.go @@ -185,14 +185,9 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, code go t.sendDownTrackBindingReports(sub) }) trackID := t.params.MediaTrack.ID() - downTrack.OnPacketSent(func(_ *sfu.DownTrack, size int) { - t.params.Telemetry.OnDownstreamPacket(subscriberID, trackID, size) - }) - downTrack.OnPaddingSent(func(_ *sfu.DownTrack, size int) { - t.params.Telemetry.OnDownstreamPacket(subscriberID, trackID, size) - }) - downTrack.OnRTCP(func(pkts []rtcp.Packet) { - t.params.Telemetry.HandleRTCP(livekit.StreamType_DOWNSTREAM, subscriberID, trackID, pkts) + + downTrack.OnStatsUpdate(func(_ *sfu.DownTrack, stat *livekit.AnalyticsStat) { + t.params.Telemetry.TrackStats(livekit.StreamType_DOWNSTREAM, subscriberID, trackID, stat) }) downTrack.OnCloseHandler(func() { diff --git a/pkg/rtc/participant_internal_test.go b/pkg/rtc/participant_internal_test.go index aa1f91198..b10101ede 100644 --- a/pkg/rtc/participant_internal_test.go +++ b/pkg/rtc/participant_internal_test.go @@ -250,7 +250,7 @@ func TestConnectionQuality(t *testing.T) { if numRegistered > 0 && numPublishing != numRegistered { reducedQuality = true } - return connectionquality.Loss2Score(loss, reducedQuality) + return connectionquality.VideoConnectionScore(loss, reducedQuality) } testPublishedVideoTrack := func(loss, numPublishing, numRegistered uint32) *typesfakes.FakeLocalMediaTrack { @@ -264,16 +264,20 @@ func TestConnectionQuality(t *testing.T) { testPublishedAudioTrack := func(totalPackets, packetsLost uint32) *typesfakes.FakeLocalMediaTrack { tr := &typesfakes.FakeLocalMediaTrack{} - stat := &connectionquality.ConnectionStat{ + stat := connectionquality.ConnectionStat{ PacketsLost: packetsLost, TotalPackets: totalPackets, LastSeqNum: 0, } stats := &connectionquality.ConnectionStats{ - Curr: stat, - Prev: &connectionquality.ConnectionStat{}, + ConnectionStat: stat, + Prev: &connectionquality.ConnectionStat{}, } - stats.CalculateAudioScore() + stats.Score = connectionquality.AudioConnectionScore(connectionquality.ConnectionStat{ + TotalPackets: stat.TotalPackets, + PacketsLost: stat.PacketsLost, + TotalBytes: stat.TotalBytes, + }, 0) t.Log("audio score: ", stats.Score) tr.GetConnectionScoreReturns(stats.Score) return tr diff --git a/pkg/sfu/connectionquality/connectionstats.go b/pkg/sfu/connectionquality/connectionstats.go index e2e36fd8c..396c6bbac 100644 --- a/pkg/sfu/connectionquality/connectionstats.go +++ b/pkg/sfu/connectionquality/connectionstats.go @@ -1,25 +1,28 @@ package connectionquality -import ( - "github.com/livekit/protocol/livekit" -) +import "sync" type ConnectionStat struct { PacketsLost uint32 - Delay uint32 - Jitter uint32 TotalPackets uint32 LastSeqNum uint32 + TotalBytes uint64 } type ConnectionStats struct { - Curr *ConnectionStat - Prev *ConnectionStat - Score float64 + Lock sync.Mutex + ConnectionStat + Prev *ConnectionStat + Delay uint32 + Jitter uint32 + NackCount int32 + PliCount int32 + FirCount int32 + Score float64 } func NewConnectionStats() *ConnectionStats { - return &ConnectionStats{Curr: &ConnectionStat{}, Prev: &ConnectionStat{}, Score: 4.0} + return &ConnectionStats{Prev: &ConnectionStat{}, Score: 4.0} } func getTotalPackets(curSN, prevSN uint32) uint32 { @@ -33,18 +36,22 @@ func getTotalPackets(curSN, prevSN uint32) uint32 { return increment } -func (cs *ConnectionStats) CalculateAudioScore() float64 { +func (cs *ConnectionStats) UpdateStats(totalBytes uint64) ConnectionStat { // update feedback stats - current := cs.Curr previous := cs.Prev // Update TotalPackets from SeqNum here - current.TotalPackets += getTotalPackets(current.LastSeqNum, previous.LastSeqNum) - cs.Score = ConnectionScore(current, previous, livekit.TrackType_AUDIO) + cs.TotalPackets += getTotalPackets(cs.LastSeqNum, previous.LastSeqNum) + cs.TotalBytes = totalBytes + + var delta ConnectionStat + + delta.TotalPackets = cs.TotalPackets - previous.TotalPackets + delta.PacketsLost = cs.PacketsLost - previous.PacketsLost + delta.TotalBytes = cs.TotalBytes - previous.TotalBytes // store previous stats - cs.Prev = current - cs.Curr = &ConnectionStat{TotalPackets: cs.Prev.TotalPackets, PacketsLost: cs.Prev.PacketsLost} + cs.Prev = &ConnectionStat{TotalPackets: cs.TotalPackets, PacketsLost: cs.PacketsLost, TotalBytes: cs.TotalBytes, LastSeqNum: cs.LastSeqNum} - return cs.Score + return delta } diff --git a/pkg/sfu/connectionquality/mos.go b/pkg/sfu/connectionquality/mos.go index d7a35b701..a231d4817 100644 --- a/pkg/sfu/connectionquality/mos.go +++ b/pkg/sfu/connectionquality/mos.go @@ -22,26 +22,20 @@ func Score2Rating(score float64) livekit.ConnectionQuality { return livekit.ConnectionQuality_POOR } -func mosAudioEmodel(cur, prev *ConnectionStat) float64 { - - if cur == nil { - return 0.0 - } +func mosAudioEmodel(delta ConnectionStat, jitter uint32) float64 { // find percentage of lost packets in this window - deltaTotalPackets := cur.TotalPackets - prev.TotalPackets - if deltaTotalPackets == 0 { + if delta.TotalPackets == 0 { return 0.0 } - deltaTotalLostPackets := cur.PacketsLost - prev.PacketsLost - percentageLost := (float64(deltaTotalLostPackets) / float64(deltaTotalPackets)) * 100 + percentageLost := (float64(delta.PacketsLost) / float64(delta.TotalPackets)) * 100 rx := 93.2 - percentageLost ry := 0.18*rx*rx - 27.9*rx + 1126.62 // Jitter is in MicroSecs (1/1e6) units. Convert it to MilliSecs - d := float64(rtt + (cur.Jitter / 1000)) + d := float64(rtt + (jitter / 1000)) h := d - 177.3 if h < 0 { h = 0 @@ -61,7 +55,7 @@ func mosAudioEmodel(cur, prev *ConnectionStat) float64 { return score } -func Loss2Score(loss uint32, reducedQuality bool) float64 { +func loss2Score(loss uint32, reducedQuality bool) float64 { // No Loss, excellent if loss == 0 && !reducedQuality { return 5 @@ -78,9 +72,10 @@ func Loss2Score(loss uint32, reducedQuality bool) float64 { return score } -func ConnectionScore(cur, prev *ConnectionStat, kind livekit.TrackType) float64 { - if kind == livekit.TrackType_AUDIO { - return mosAudioEmodel(cur, prev) - } - return 0 +func AudioConnectionScore(delta ConnectionStat, jitter uint32) float64 { + return mosAudioEmodel(delta, jitter) +} + +func VideoConnectionScore(pctLoss uint32, reducedQuality bool) float64 { + return loss2Score(pctLoss, reducedQuality) } diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index da845ee6a..5769ac403 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -66,7 +66,7 @@ var ( type ReceiverReportListener func(dt *DownTrack, report *rtcp.ReceiverReport) type PacketStats struct { - octets uint32 + octets uint64 packets uint32 } @@ -103,7 +103,6 @@ type DownTrack struct { primaryStats atomic.Value // contains *PacketStats rtxStats atomic.Value // contains *PacketStats paddingStats atomic.Value // contains *PacketStats - statsLock sync.Mutex connectionStats *connectionquality.ConnectionStats done chan struct{} @@ -133,6 +132,9 @@ type DownTrack struct { // padding packet sent callback onPaddingSent []func(dt *DownTrack, size int) + + // update stats + onStatsUpdate func(dt *DownTrack, stat *livekit.AnalyticsStat) } // NewDownTrack returns a DownTrack. @@ -552,6 +554,10 @@ func (d *DownTrack) OnPaddingSent(fn func(dt *DownTrack, size int)) { d.onPaddingSent = append(d.onPaddingSent, fn) } +func (d *DownTrack) OnStatsUpdate(fn func(dt *DownTrack, stat *livekit.AnalyticsStat)) { + d.onStatsUpdate = fn +} + func (d *DownTrack) IsDeficient() bool { return d.forwarder.IsDeficient() } @@ -641,19 +647,20 @@ func (d *DownTrack) CreateSenderReport() *rtcp.SenderReport { diff := (uint64(now.Sub(ntpTime(srNTP).Time())) * uint64(d.codec.ClockRate)) / uint64(time.Second) octets, packets := d.getSRStats() + return &rtcp.SenderReport{ SSRC: d.ssrc, NTPTime: uint64(nowNTP), RTPTime: srRTP + uint32(diff), PacketCount: packets, - OctetCount: octets, + OctetCount: uint32(octets), } } func (d *DownTrack) UpdatePrimaryStats(packetLen uint32) { primaryStats, _ := d.primaryStats.Load().(*PacketStats) - primaryStats.octets += packetLen + primaryStats.octets += uint64(packetLen) primaryStats.packets += 1 d.primaryStats.Store(primaryStats) @@ -662,7 +669,7 @@ func (d *DownTrack) UpdatePrimaryStats(packetLen uint32) { func (d *DownTrack) UpdateRtxStats(packetLen uint32) { rtxStats, _ := d.rtxStats.Load().(*PacketStats) - rtxStats.octets += packetLen + rtxStats.octets += uint64(packetLen) rtxStats.packets += 1 d.rtxStats.Store(rtxStats) @@ -671,7 +678,7 @@ func (d *DownTrack) UpdateRtxStats(packetLen uint32) { func (d *DownTrack) UpdatePaddingStats(packetLen uint32) { paddingStats, _ := d.paddingStats.Load().(*PacketStats) - paddingStats.octets += packetLen + paddingStats.octets += uint64(packetLen) paddingStats.packets += 1 d.paddingStats.Store(paddingStats) @@ -807,14 +814,21 @@ func (d *DownTrack) handleRTCP(bytes []byte) { var jitter uint32 var totalLost uint32 var maxSeqNum uint32 + var nackCount int32 + var pliCount int32 + var firCount int32 maxRatePacketLoss := uint8(0) for _, pkt := range pkts { switch p := pkt.(type) { case *rtcp.PictureLossIndication: sendPliOnce() + pliCount++ + hasReport = true case *rtcp.FullIntraRequest: sendPliOnce() + firCount++ + hasReport = true case *rtcp.ReceiverEstimatedMaximumBitrate: if d.onREMB != nil { d.onREMB(d, p) @@ -854,34 +868,40 @@ func (d *DownTrack) handleRTCP(bytes []byte) { } d.listenerLock.RUnlock() } - if hasReport { - d.statsLock.Lock() - // update feedback stats - current := d.connectionStats.Curr - if jitter > current.Jitter { - current.Jitter = jitter - } - if delay > current.Delay { - current.Delay = delay - } - if maxSeqNum > current.LastSeqNum { - current.LastSeqNum = maxSeqNum - } - current.PacketsLost = totalLost - d.statsLock.Unlock() - } + case *rtcp.TransportLayerNack: var nackedPackets []packetMeta for _, pair := range p.Nacks { nackedPackets = append(nackedPackets, d.sequencer.getSeqNoPairs(pair.PacketList())...) } go d.retransmitPackets(nackedPackets) + nackCount += int32(len(nackedPackets)) + hasReport = true case *rtcp.TransportLayerCC: if p.MediaSSRC == d.ssrc && d.onTransportCCFeedback != nil { d.onTransportCCFeedback(d, p) } } } + if hasReport { + d.connectionStats.Lock.Lock() + current := d.connectionStats + // update feedback stats + if jitter > current.Jitter { + current.Jitter = jitter + } + if delay > current.Delay { + current.Delay = delay + } + if maxSeqNum > current.LastSeqNum { + current.LastSeqNum = maxSeqNum + } + current.PacketsLost = totalLost + current.NackCount += nackCount + current.PliCount += pliCount + current.FirCount += firCount + d.connectionStats.Lock.Unlock() + } } func (d *DownTrack) retransmitPackets(nackedPackets []packetMeta) { @@ -959,7 +979,7 @@ func (d *DownTrack) retransmitPackets(nackedPackets []packetMeta) { } } -func (d *DownTrack) getSRStats() (uint32, uint32) { +func (d *DownTrack) getSRStats() (uint64, uint32) { primary := d.primaryStats.Load().(*PacketStats) rtx := d.rtxStats.Load().(*PacketStats) padding := d.paddingStats.Load().(*PacketStats) @@ -1061,8 +1081,8 @@ func (d *DownTrack) DebugInfo() map[string]interface{} { } func (d *DownTrack) GetConnectionScore() float64 { - d.statsLock.Lock() - defer d.statsLock.Unlock() + d.connectionStats.Lock.Lock() + defer d.connectionStats.Lock.Unlock() return d.connectionStats.Score } @@ -1072,13 +1092,33 @@ func (d *DownTrack) updateStats() { case <-d.done: return case <-time.After(connectionQualityUpdateInterval): - d.statsLock.Lock() + d.connectionStats.Lock.Lock() + totalBytes, _ := d.getSRStats() + delta := d.connectionStats.UpdateStats(totalBytes) if d.Kind() == webrtc.RTPCodecTypeAudio { - d.connectionStats.CalculateAudioScore() + d.connectionStats.Score = connectionquality.AudioConnectionScore(delta, d.connectionStats.Jitter) } else { - d.calculateVideoScore() + var reducedQuality bool + if d.GetForwardingStatus() != ForwardingStatusOptimal { + reducedQuality = true + } + d.connectionStats.Score = connectionquality.VideoConnectionScore(FixedPointToPercent(d.CurrentMaxLossFraction()), reducedQuality) } - d.statsLock.Unlock() + stat := &livekit.AnalyticsStat{ + Jitter: float64(d.connectionStats.Jitter), + TotalPackets: uint64(d.connectionStats.TotalPackets), + PacketLost: uint64(d.connectionStats.PacketsLost), + Delay: uint64(d.connectionStats.Delay), + TotalBytes: d.connectionStats.TotalBytes, + NackCount: d.connectionStats.NackCount, + PliCount: d.connectionStats.PliCount, + FirCount: d.connectionStats.FirCount, + ConnectionScore: float32(d.connectionStats.Score), + } + if d.onStatsUpdate != nil { + d.onStatsUpdate(d, stat) + } + d.connectionStats.Lock.Unlock() } } } @@ -1087,12 +1127,3 @@ func (d *DownTrack) updateStats() { func FixedPointToPercent(frac uint8) uint32 { return (uint32(frac) * 100) >> 8 } - -func (d *DownTrack) calculateVideoScore() { - var reducedQuality bool - if d.GetForwardingStatus() != ForwardingStatusOptimal { - reducedQuality = true - } - d.connectionStats.Score = connectionquality.Loss2Score(FixedPointToPercent(d.CurrentMaxLossFraction()), reducedQuality) - return -} diff --git a/pkg/telemetry/statsworker.go b/pkg/telemetry/statsworker.go index 4d8b477f3..054ed7c95 100644 --- a/pkg/telemetry/statsworker.go +++ b/pkg/telemetry/statsworker.go @@ -5,8 +5,6 @@ import ( "github.com/livekit/protocol/livekit" "google.golang.org/protobuf/types/known/timestamppb" - - "github.com/livekit/livekit-server/pkg/sfu/buffer" ) // StatsWorker handles participant stats @@ -17,9 +15,7 @@ type StatsWorker struct { roomName livekit.RoomName participantID livekit.ParticipantID - upstreamBuffers map[livekit.TrackID][]*buffer.Buffer - drainUpstreamBuffers map[livekit.TrackID]bool - + drainStats map[livekit.TrackID]bool outgoingPerTrack map[livekit.TrackID]*Stats incomingPerTrack map[livekit.TrackID]*Stats } @@ -32,6 +28,7 @@ type Stats struct { prevBytes uint64 totalPacketsLost uint64 prevPacketsLost uint64 + connectionScore float32 } func newStatsWorker(ctx context.Context, t TelemetryReporter, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID) *StatsWorker { @@ -42,24 +39,13 @@ func newStatsWorker(ctx context.Context, t TelemetryReporter, roomID livekit.Roo roomName: roomName, participantID: participantID, - upstreamBuffers: make(map[livekit.TrackID][]*buffer.Buffer), - drainUpstreamBuffers: make(map[livekit.TrackID]bool), - outgoingPerTrack: make(map[livekit.TrackID]*Stats), incomingPerTrack: make(map[livekit.TrackID]*Stats), + drainStats: make(map[livekit.TrackID]bool), } return s } -func (s *StatsWorker) AddBuffer(trackID livekit.TrackID, buffer *buffer.Buffer) { - s.upstreamBuffers[trackID] = append(s.upstreamBuffers[trackID], buffer) -} - -func (s *StatsWorker) OnDownstreamPacket(trackID livekit.TrackID, bytes int) { - s.getOrCreateOutgoingStatsIfEmpty(trackID).totalBytes += uint64(bytes) - s.getOrCreateOutgoingStatsIfEmpty(trackID).totalPackets++ -} - func (s *StatsWorker) getOrCreateOutgoingStatsIfEmpty(trackID livekit.TrackID) *Stats { if s.outgoingPerTrack[trackID] == nil { s.outgoingPerTrack[trackID] = &Stats{next: &livekit.AnalyticsStat{ @@ -84,7 +70,7 @@ func (s *StatsWorker) getOrCreateIncomingStatsIfEmpty(trackID livekit.TrackID) * return s.incomingPerTrack[trackID] } -func (s *StatsWorker) OnRTCP(trackID livekit.TrackID, direction livekit.StreamType, stats *livekit.AnalyticsStat) { +func (s *StatsWorker) OnTrackStat(trackID livekit.TrackID, direction livekit.StreamType, stats *livekit.AnalyticsStat) { var ds *Stats if direction == livekit.StreamType_DOWNSTREAM { ds = s.getOrCreateOutgoingStatsIfEmpty(trackID) @@ -92,6 +78,8 @@ func (s *StatsWorker) OnRTCP(trackID livekit.TrackID, direction livekit.StreamTy ds = s.getOrCreateIncomingStatsIfEmpty(trackID) } ds.totalPacketsLost = stats.PacketLost + ds.totalPackets = uint32(stats.TotalPackets) + ds.totalBytes = stats.TotalBytes if stats.Rtt > ds.next.Rtt { ds.next.Rtt = stats.Rtt @@ -102,17 +90,12 @@ func (s *StatsWorker) OnRTCP(trackID livekit.TrackID, direction livekit.StreamTy ds.next.NackCount += stats.NackCount ds.next.PliCount += stats.PliCount ds.next.FirCount += stats.FirCount -} - -func (s *StatsWorker) calculateTotalBytesPackets(allBuffers []*buffer.Buffer) (totalBytes uint64, totalPackets uint32) { - totalBytes = 0 - totalPackets = 0 - - for _, buff := range allBuffers { - totalBytes += buff.GetStats().TotalByte - totalPackets += buff.GetStats().PacketCount + // average out scores received in this interval + if ds.connectionScore == 0 { + ds.connectionScore = stats.ConnectionScore + } else { + ds.connectionScore = (ds.connectionScore + stats.ConnectionScore) / 2 } - return totalBytes, totalPackets } func (s *StatsWorker) Update() { @@ -133,30 +116,25 @@ func (s *StatsWorker) collectDownstreamStats(ts *timestamppb.Timestamp, stats [] stats = append(stats, analyticsStat) } } + if len(s.drainStats) > 0 { + for trackID := range s.drainStats { + delete(s.outgoingPerTrack, trackID) + delete(s.incomingPerTrack, trackID) + } + s.drainStats = make(map[livekit.TrackID]bool) + } + return stats } func (s *StatsWorker) collectUpstreamStats(ts *timestamppb.Timestamp, stats []*livekit.AnalyticsStat) []*livekit.AnalyticsStat { - for trackID, buffers := range s.upstreamBuffers { - totalBytes, totalPackets := s.calculateTotalBytesPackets(buffers) - - s.getOrCreateIncomingStatsIfEmpty(trackID).totalBytes = totalBytes - s.getOrCreateIncomingStatsIfEmpty(trackID).totalPackets = totalPackets - - analyticsStats := s.update(s.incomingPerTrack[trackID], ts) - if analyticsStats != nil { - analyticsStats.TrackId = string(trackID) - stats = append(stats, analyticsStats) + for trackID, trackUpStreamStats := range s.incomingPerTrack { + analyticsStat := s.update(trackUpStreamStats, ts) + if analyticsStat != nil { + analyticsStat.TrackId = string(trackID) + stats = append(stats, analyticsStat) } } - - if len(s.drainUpstreamBuffers) > 0 { - for trackID := range s.drainUpstreamBuffers { - delete(s.upstreamBuffers, trackID) - delete(s.incomingPerTrack, trackID) - } - s.drainUpstreamBuffers = make(map[livekit.TrackID]bool) - } return stats } @@ -177,18 +155,18 @@ func (s *StatsWorker) update(stats *Stats, ts *timestamppb.Timestamp) *livekit.A next.TotalPackets = uint64(stats.totalPackets - stats.prevPackets) next.TotalBytes = stats.totalBytes - stats.prevBytes next.PacketLost = stats.totalPacketsLost - stats.prevPacketsLost + next.ConnectionScore = stats.connectionScore stats.prevPackets = stats.totalPackets stats.prevBytes = stats.totalBytes stats.prevPacketsLost = stats.totalPacketsLost - return next } -func (s *StatsWorker) RemoveBuffer(trackID livekit.TrackID) { - s.drainUpstreamBuffers[trackID] = true -} - func (s *StatsWorker) Close() { s.Update() } + +func (s *StatsWorker) RemoveStats(trackID livekit.TrackID) { + s.drainStats[trackID] = true +} diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index a399ff213..42c7bce15 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -6,18 +6,13 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/webhook" - "github.com/pion/rtcp" - - "github.com/livekit/livekit-server/pkg/sfu/buffer" ) const updateFrequency = time.Second * 10 type TelemetryService interface { // stats - AddUpTrack(participantID livekit.ParticipantID, trackID livekit.TrackID, buff *buffer.Buffer) - OnDownstreamPacket(participantID livekit.ParticipantID, trackID livekit.TrackID, bytes int) - HandleRTCP(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, pkts []rtcp.Packet) + TrackStats(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, stat *livekit.AnalyticsStat) // events RoomStarted(ctx context.Context, room *livekit.Room) @@ -68,21 +63,9 @@ func (t *telemetryService) run() { } } -func (t *telemetryService) AddUpTrack(participantID livekit.ParticipantID, trackID livekit.TrackID, buff *buffer.Buffer) { +func (t *telemetryService) TrackStats(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, stats *livekit.AnalyticsStat) { t.jobQueue <- func() { - t.internalService.AddUpTrack(participantID, trackID, buff) - } -} - -func (t *telemetryService) OnDownstreamPacket(participantID livekit.ParticipantID, trackID livekit.TrackID, bytes int) { - t.jobQueue <- func() { - t.internalService.OnDownstreamPacket(participantID, trackID, bytes) - } -} - -func (t *telemetryService) HandleRTCP(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, pkts []rtcp.Packet) { - t.jobQueue <- func() { - t.internalService.HandleRTCP(streamType, participantID, trackID, pkts) + t.internalService.TrackStats(streamType, participantID, trackID, stats) } } diff --git a/pkg/telemetry/telemetryserviceinternal.go b/pkg/telemetry/telemetryserviceinternal.go index 3658f8c34..db50f9588 100644 --- a/pkg/telemetry/telemetryserviceinternal.go +++ b/pkg/telemetry/telemetryserviceinternal.go @@ -4,12 +4,9 @@ import ( "context" "github.com/gammazero/workerpool" + "github.com/livekit/livekit-server/pkg/telemetry/prometheus" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/webhook" - "github.com/pion/rtcp" - - "github.com/livekit/livekit-server/pkg/sfu/buffer" - "github.com/livekit/livekit-server/pkg/telemetry/prometheus" ) type TelemetryServiceInternal interface { @@ -40,58 +37,18 @@ func NewTelemetryServiceInternal(notifier webhook.Notifier, analytics AnalyticsS } } -func (t *telemetryServiceInternal) AddUpTrack(participantID livekit.ParticipantID, trackID livekit.TrackID, buff *buffer.Buffer) { - w := t.workers[participantID] - if w != nil { - w.AddBuffer(trackID, buff) - } -} - -func (t *telemetryServiceInternal) OnDownstreamPacket(participantID livekit.ParticipantID, trackID livekit.TrackID, bytes int) { - w := t.workers[participantID] - if w != nil { - w.OnDownstreamPacket(trackID, bytes) - } -} - -func (t *telemetryServiceInternal) HandleRTCP(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, pkts []rtcp.Packet) { - stats := &livekit.AnalyticsStat{} - for _, pkt := range pkts { - switch pkt := pkt.(type) { - case *rtcp.TransportLayerNack: - stats.NackCount++ - case *rtcp.PictureLossIndication: - stats.PliCount++ - case *rtcp.FullIntraRequest: - stats.FirCount++ - case *rtcp.ReceiverReport: - for _, rr := range pkt.Reports { - if jitter := float64(rr.Jitter); jitter > stats.Jitter { - stats.Jitter = jitter - } - if totalLost := uint64(rr.TotalLost); totalLost > stats.PacketLost { - stats.PacketLost = totalLost - } - } - if streamType == livekit.StreamType_DOWNSTREAM { - rtt := GetRttMs(pkt) - if rtt >= 0 { - stats.Rtt = uint32(rtt) - } - } - } - } +func (t *telemetryServiceInternal) TrackStats(streamType livekit.StreamType, participantID livekit.ParticipantID, trackID livekit.TrackID, stat *livekit.AnalyticsStat) { direction := prometheus.Incoming if streamType == livekit.StreamType_DOWNSTREAM { direction = prometheus.Outgoing } - prometheus.IncrementRTCP(direction, stats.NackCount, stats.PliCount, stats.FirCount) + prometheus.IncrementRTCP(direction, stat.NackCount, stat.PliCount, stat.FirCount) w := t.workers[participantID] if w != nil { - w.OnRTCP(trackID, streamType, stats) + w.OnTrackStat(trackID, streamType, stat) } } diff --git a/pkg/telemetry/telemetryserviceinternalevents.go b/pkg/telemetry/telemetryserviceinternalevents.go index 990fcf963..c60cb93a7 100644 --- a/pkg/telemetry/telemetryserviceinternalevents.go +++ b/pkg/telemetry/telemetryserviceinternalevents.go @@ -124,8 +124,8 @@ func (t *telemetryServiceInternal) TrackUnpublished(ctx context.Context, partici w := t.workers[participantID] if w != nil { roomID = w.roomID - w.RemoveBuffer(livekit.TrackID(track.GetSid())) roomName = w.roomName + w.RemoveStats(livekit.TrackID(track.GetSid())) } prometheus.SubPublishedTrack(track.Type.String()) diff --git a/pkg/telemetry/test/telemetry_service_test.go b/pkg/telemetry/test/telemetry_service_test.go index 2ad50ee46..def671263 100644 --- a/pkg/telemetry/test/telemetry_service_test.go +++ b/pkg/telemetry/test/telemetry_service_test.go @@ -5,10 +5,8 @@ import ( "testing" "github.com/livekit/protocol/livekit" - "github.com/pion/rtcp" "github.com/stretchr/testify/require" - "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/livekit-server/pkg/telemetry/telemetryfakes" ) @@ -37,7 +35,8 @@ func Test_ParticipantAndRoomDataAreSentWithAnalytics(t *testing.T) { // do packet := 33 - fixture.sut.OnDownstreamPacket(partSID, "", packet) + stat := &livekit.AnalyticsStat{TotalBytes: uint64(packet)} + fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, "", stat) fixture.sut.SendAnalytics() // test @@ -65,8 +64,11 @@ func Test_OnDownstreamPackets(t *testing.T) { totalBytes := packets[0] + packets[1] totalPackets := len(packets) trackID := livekit.TrackID("trackID") + var bytes int for i := range packets { - fixture.sut.OnDownstreamPacket(partSID, trackID, packets[i]) + bytes += packets[i] + stat := &livekit.AnalyticsStat{TotalBytes: uint64(bytes), TotalPackets: uint64(i + 1)} + fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID, stat) } fixture.sut.SendAnalytics() @@ -93,10 +95,13 @@ func Test_OnDownstreamPackets_SeveralTracks(t *testing.T) { // do packet1 := 33 trackID1 := livekit.TrackID("trackID1") + stat1 := &livekit.AnalyticsStat{TotalBytes: uint64(packet1), TotalPackets: 1} + fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID1, stat1) + packet2 := 23 trackID2 := livekit.TrackID("trackID2") - fixture.sut.OnDownstreamPacket(partSID, trackID1, packet1) - fixture.sut.OnDownstreamPacket(partSID, trackID2, packet2) + stat2 := &livekit.AnalyticsStat{TotalBytes: uint64(packet2), TotalPackets: 1} + fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID2, stat2) fixture.sut.SendAnalytics() // test @@ -131,20 +136,12 @@ func Test_OnDownStreamRTCP(t *testing.T) { fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil) // do - pkts := []rtcp.Packet{ - &rtcp.TransportLayerNack{}, - &rtcp.PictureLossIndication{}, - &rtcp.FullIntraRequest{}, - &rtcp.ReceiverReport{ - Reports: []rtcp.ReceptionReport{ - {Jitter: 5, TotalLost: 3}, - {Jitter: 2, TotalLost: 4}, - }, - }, - } + stat1 := &livekit.AnalyticsStat{NackCount: 1, PliCount: 1, Jitter: 3, PacketLost: 3, TotalBytes: 1, TotalPackets: 1} trackID := livekit.TrackID("trackID1") - fixture.sut.OnDownstreamPacket(partSID, trackID, 1) // there should be bytes reported so that stats are sent - fixture.sut.HandleRTCP(livekit.StreamType_DOWNSTREAM, partSID, trackID, pkts) + fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID, stat1) + stat2 := &livekit.AnalyticsStat{FirCount: 1, Jitter: 5, PacketLost: 4, TotalBytes: 2, TotalPackets: 2} + fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID, stat2) + fixture.sut.SendAnalytics() // test @@ -171,25 +168,12 @@ func Test_PacketLostDiffShouldBeSentToTelemetry(t *testing.T) { fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil) // do - pkts1 := []rtcp.Packet{ - &rtcp.ReceiverReport{ - Reports: []rtcp.ReceptionReport{ - {Delay: 0, Jitter: 0, TotalLost: 1}, - }, - }, - } - pkts2 := []rtcp.Packet{ - &rtcp.ReceiverReport{ - Reports: []rtcp.ReceptionReport{ - {Delay: 0, Jitter: 0, TotalLost: 4}, // diff with previous is 3, so in second call to SendAnalytics, 3 should be sent - }, - }, - } trackID := livekit.TrackID("trackID1") - fixture.sut.OnDownstreamPacket(partSID, trackID, 1) // there should be bytes reported so that stats are sent - fixture.sut.HandleRTCP(livekit.StreamType_DOWNSTREAM, partSID, trackID, pkts1) + stat1 := &livekit.AnalyticsStat{PacketLost: 1, TotalPackets: 1, TotalBytes: 1} + fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID, stat1) // there should be bytes reported so that stats are sent fixture.sut.SendAnalytics() - fixture.sut.HandleRTCP(livekit.StreamType_DOWNSTREAM, partSID, trackID, pkts2) + stat2 := &livekit.AnalyticsStat{PacketLost: 4, TotalPackets: 2, TotalBytes: 2} + fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID, stat2) fixture.sut.SendAnalytics() // test @@ -215,18 +199,17 @@ func Test_OnDownStreamRTCP_SeveralTracks(t *testing.T) { fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil) // do - pkts1 := []rtcp.Packet{ - &rtcp.TransportLayerNack{}, - } - pkts2 := []rtcp.Packet{ - &rtcp.FullIntraRequest{}, - } + trackID1 := livekit.TrackID("trackID1") trackID2 := livekit.TrackID("trackID2") - fixture.sut.OnDownstreamPacket(partSID, trackID1, 1) // there should be bytes reported so that stats are sent - fixture.sut.HandleRTCP(livekit.StreamType_DOWNSTREAM, partSID, trackID1, pkts1) - fixture.sut.OnDownstreamPacket(partSID, trackID2, 1) // there should be bytes reported so that stats are sent - fixture.sut.HandleRTCP(livekit.StreamType_DOWNSTREAM, partSID, trackID2, pkts2) + stat1 := &livekit.AnalyticsStat{TotalBytes: 1, TotalPackets: 1} + fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID1, stat1) // there should be bytes reported so that stats are sent + + stat2 := &livekit.AnalyticsStat{NackCount: 1, TotalPackets: 2, TotalBytes: 2} + fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID1, stat2) + + stat3 := &livekit.AnalyticsStat{FirCount: 1, TotalPackets: 3, TotalBytes: 3} + fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, trackID2, stat3) fixture.sut.SendAnalytics() // test @@ -260,26 +243,14 @@ func Test_OnUpstreamRTCP(t *testing.T) { fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil) // do - pkts := []rtcp.Packet{ - &rtcp.TransportLayerNack{}, - &rtcp.PictureLossIndication{}, - &rtcp.FullIntraRequest{}, - &rtcp.ReceiverReport{ - Reports: []rtcp.ReceptionReport{ - {Jitter: 5, TotalLost: 3}, - {Jitter: 2, TotalLost: 4}, - }, - }, - } - // there should be bytes reported so that stats are sent - buf := &buffer.Buffer{} - buf.SetStatsTestOnly(buffer.Stats{ - PacketCount: 1, - TotalByte: 1, - }) + + stat1 := &livekit.AnalyticsStat{NackCount: 1, PliCount: 1, FirCount: 1, Jitter: 5, PacketLost: 3, TotalPackets: 1, TotalBytes: 1} trackID := livekit.TrackID("trackID") - fixture.sut.AddUpTrack(partSID, trackID, buf) - fixture.sut.HandleRTCP(livekit.StreamType_UPSTREAM, partSID, trackID, pkts) + + fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID, stat1) + + stat2 := &livekit.AnalyticsStat{Jitter: 2, PacketLost: 4, TotalPackets: 2, TotalBytes: 2} + fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID, stat2) fixture.sut.SendAnalytics() // test @@ -306,26 +277,22 @@ func Test_OnUpstreamRTCP_SeveralTracks(t *testing.T) { fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil) // there should be bytes reported so that stats are sent - buf := &buffer.Buffer{} totalBytes := 1 totalPackets := 1 - buf.SetStatsTestOnly(buffer.Stats{ - PacketCount: uint32(totalPackets), - TotalByte: uint64(totalBytes), - }) trackID1 := livekit.TrackID("trackID1") trackID2 := livekit.TrackID("trackID2") - fixture.sut.AddUpTrack(partSID, trackID1, buf) - fixture.sut.AddUpTrack(partSID, trackID2, buf) // using same buffer is not correct but for test it is fine - pkts1 := []rtcp.Packet{ - &rtcp.TransportLayerNack{}, - } - pkts2 := []rtcp.Packet{ - &rtcp.FullIntraRequest{}, - } + + stat1 := &livekit.AnalyticsStat{TotalBytes: uint64(totalBytes), TotalPackets: uint64(totalPackets)} + fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID1, stat1) + fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID2, stat1) // using same buffer is not correct but for test it is fine + // do - fixture.sut.HandleRTCP(livekit.StreamType_UPSTREAM, partSID, trackID1, pkts1) - fixture.sut.HandleRTCP(livekit.StreamType_UPSTREAM, partSID, trackID2, pkts2) + totalBytes++ + totalPackets++ + stat2 := &livekit.AnalyticsStat{NackCount: 1, TotalBytes: uint64(totalBytes), TotalPackets: uint64(totalPackets)} + fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID1, stat2) + stat3 := &livekit.AnalyticsStat{FirCount: 1, TotalBytes: uint64(totalBytes), TotalPackets: uint64(totalPackets)} + fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID2, stat3) fixture.sut.SendAnalytics() // test @@ -351,7 +318,7 @@ func Test_OnUpstreamRTCP_SeveralTracks(t *testing.T) { require.True(t, found1) require.True(t, found2) - // remove 1 buffer + // remove 1 track fixture.sut.TrackUnpublished(context.Background(), partSID, &livekit.TrackInfo{Sid: string(trackID2)}, 0) fixture.sut.SendAnalytics() require.Equal(t, 2, fixture.analytics.SendStatsCallCount()) @@ -393,14 +360,10 @@ func Test_AddUpTrack(t *testing.T) { // do var totalBytes uint64 = 3 var totalPackets uint32 = 3 - buf := &buffer.Buffer{} - bufferStats := buffer.Stats{ - PacketCount: totalPackets, - TotalByte: totalBytes, - } - buf.SetStatsTestOnly(bufferStats) + + stat := &livekit.AnalyticsStat{TotalPackets: uint64(totalPackets), TotalBytes: totalBytes} trackID := livekit.TrackID("trackID") - fixture.sut.AddUpTrack(partSID, trackID, buf) + fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID, stat) fixture.sut.SendAnalytics() // test @@ -423,30 +386,20 @@ func Test_AddUpTrack_SeveralBuffers_Simulcast(t *testing.T) { fixture.sut.ParticipantJoined(context.Background(), room, participantInfo, nil) // do trackID := livekit.TrackID("trackID") - // buffer 1 - buf1 := &buffer.Buffer{} - buf1.SetStatsTestOnly(buffer.Stats{ - PacketCount: 1, - TotalByte: 1, - }) - fixture.sut.AddUpTrack(partSID, trackID, buf1) - // buffer 2 - buf2 := &buffer.Buffer{} - buf2.SetStatsTestOnly(buffer.Stats{ - PacketCount: 2, - TotalByte: 2, - }) - fixture.sut.AddUpTrack(partSID, trackID, buf2) + + stat1 := &livekit.AnalyticsStat{TotalBytes: 1, TotalPackets: 1} + fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID, stat1) + + stat2 := &livekit.AnalyticsStat{TotalPackets: 2, TotalBytes: 2} + fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, trackID, stat2) fixture.sut.SendAnalytics() // test - totalBytes := buf1.GetStats().TotalByte + buf2.GetStats().TotalByte - totalPackets := buf1.GetStats().PacketCount + buf2.GetStats().PacketCount require.Equal(t, 1, fixture.analytics.SendStatsCallCount()) _, stats := fixture.analytics.SendStatsArgsForCall(0) require.Equal(t, 1, len(stats)) require.Equal(t, livekit.StreamType_UPSTREAM, stats[0].Kind) - require.Equal(t, totalBytes, stats[0].TotalBytes) - require.Equal(t, totalPackets, uint32(stats[0].TotalPackets)) + require.Equal(t, stat2.TotalBytes, stats[0].TotalBytes) + require.Equal(t, stat2.TotalPackets, stats[0].TotalPackets) require.Equal(t, string(trackID), stats[0].TrackId) } @@ -461,14 +414,11 @@ func Test_BothDownstreamAndUpstreamStatsAreSentTogether(t *testing.T) { // do // upstream bytes - buf := &buffer.Buffer{} - buf.SetStatsTestOnly(buffer.Stats{ - PacketCount: 3, - TotalByte: 3, - }) - fixture.sut.AddUpTrack(partSID, "trackID", buf) + stat1 := &livekit.AnalyticsStat{TotalPackets: 3, TotalBytes: 3} + fixture.sut.TrackStats(livekit.StreamType_UPSTREAM, partSID, "trackID", stat1) // downstream bytes - fixture.sut.OnDownstreamPacket(partSID, "trackID1", 1) + stat2 := &livekit.AnalyticsStat{TotalPackets: 1, TotalBytes: 1} + fixture.sut.TrackStats(livekit.StreamType_DOWNSTREAM, partSID, "trackID1", stat2) fixture.sut.SendAnalytics() // test