Files
livekit/pkg/telemetry/statsworker.go
David Colburn bf46e998b2 Sfu/buffer stats for telemetry (#173)
* more buffer stats for analytics

* update names

* fix jitter and lost rate

* don't return on participantLeft if they never published
2021-11-09 02:06:07 -06:00

90 lines
1.9 KiB
Go

package telemetry
import (
"sync"
"time"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
)
// updateFrequency is the interval at which the worker's background loop
// computes stats and passes them to the onUpdate callback.
const updateFrequency = time.Second * 10
// StatsWorker handles incoming RTP statistics instead of the stream interceptor.
//
// It keeps a set of buffers keyed by media SSRC, periodically sums their
// stats, and hands the result to the onUpdate callback. The embedded RWMutex
// guards the buffers map.
type StatsWorker struct {
	sync.RWMutex
	// buffers holds the tracked buffers, keyed by the value returned from
	// each buffer's GetMediaSSRC at the time it was added.
	buffers map[uint32]*buffer.Buffer
	// lastStats is written at the end of every Calc call and read by the
	// next one to compute the difference. It is accessed without the lock.
	lastStats *buffer.Stats
	// onUpdate is invoked from the background loop with the result of Calc.
	onUpdate func(diff *buffer.Stats)
	// close is closed by Close to stop the background loop.
	close chan struct{}
}
// NewStatsWorker builds a StatsWorker with an empty buffer set and starts its
// background loop in a new goroutine. onUpdate is the callback the loop
// invokes with the result of each Calc; call Close to stop the loop.
func NewStatsWorker(onUpdate func(*buffer.Stats)) *StatsWorker {
	worker := new(StatsWorker)
	worker.buffers = map[uint32]*buffer.Buffer{}
	worker.onUpdate = onUpdate
	worker.close = make(chan struct{}, 1)

	go worker.run()

	return worker
}
// run is the background loop started by NewStatsWorker. Every updateFrequency
// it calls Calc and passes the result to onUpdate; it returns once the close
// channel is closed.
//
// A single ticker is used for the lifetime of the loop (rather than a new
// timer per iteration) and is stopped on exit so no timer is left pending
// after the worker is closed.
func (s *StatsWorker) run() {
	ticker := time.NewTicker(updateFrequency)
	defer ticker.Stop()

	for {
		select {
		case <-s.close:
			return
		case <-ticker.C:
			s.onUpdate(s.Calc())
		}
	}
}
// AddBuffer registers buffer under its media SSRC so that it is included in
// subsequent Calc results. An existing entry with the same SSRC is replaced.
func (s *StatsWorker) AddBuffer(buffer *buffer.Buffer) {
	s.Lock()
	defer s.Unlock()

	key := buffer.GetMediaSSRC()
	s.buffers[key] = buffer
}
// RemoveBuffer drops the buffer registered under ssrc, if any, so that it no
// longer contributes to Calc results.
func (s *StatsWorker) RemoveBuffer(ssrc uint32) {
	s.Lock()
	defer s.Unlock()

	delete(s.buffers, ssrc)
}
// Calc aggregates the stats of every tracked buffer and returns the change
// since the previous call.
//
// PacketCount, TotalByte, LastExpected and LastReceived are summed across
// buffers and reported as the difference from the totals seen on the previous
// call (or as the totals themselves on the first call). Jitter is the largest
// jitter reported by any buffer and is not differenced. LostRate is
// (expected - received) / expected over the interval, and is left at zero when
// nothing was expected or when at least as many packets were received as
// expected.
//
// The returned value is a fresh struct that is not retained by the worker.
//
// NOTE(review): lastStats is read and written without holding the lock, so
// Calc is only safe when called from one goroutine at a time — confirm no
// caller invokes it concurrently with the background loop.
// NOTE(review): totals shrink when a buffer is removed; if the counter fields
// are unsigned the subtraction below would wrap in that interval — verify
// against the buffer.Stats field types.
func (s *StatsWorker) Calc() *buffer.Stats {
	total := &buffer.Stats{}

	s.RLock()
	for _, buff := range s.buffers {
		stats := buff.GetStats()
		total.PacketCount += stats.PacketCount
		total.TotalByte += stats.TotalByte
		total.LastExpected += stats.LastExpected
		total.LastReceived += stats.LastReceived
		if stats.Jitter > total.Jitter {
			total.Jitter = stats.Jitter
		}
	}
	s.RUnlock()

	// On the first call there is no baseline, so diff against zero totals.
	prev := s.lastStats
	if prev == nil {
		prev = &buffer.Stats{}
	}

	diff := &buffer.Stats{
		LastExpected: total.LastExpected - prev.LastExpected,
		LastReceived: total.LastReceived - prev.LastReceived,
		PacketCount:  total.PacketCount - prev.PacketCount,
		TotalByte:    total.TotalByte - prev.TotalByte,
		Jitter:       total.Jitter,
	}

	// Guard the division: with nothing expected the ratio would be NaN, and
	// with received >= expected the numerator would be zero, negative or
	// wrapped depending on the field type.
	if diff.LastExpected > 0 && diff.LastExpected > diff.LastReceived {
		diff.LostRate = float32(diff.LastExpected-diff.LastReceived) / float32(diff.LastExpected)
	}

	// Remember the running totals (not the delta) so the next call diffs
	// against the correct baseline.
	s.lastStats = total
	return diff
}
// Close stops the background loop by closing the worker's close channel.
// It must be called at most once: closing an already-closed channel panics.
func (s *StatsWorker) Close() {
	close(s.close)
}