mirror of
https://github.com/livekit/livekit.git
synced 2026-04-25 15:32:09 +00:00
* Use delta stats throughout and avoid calculating deltas in telemetry * Fix a few things after testing * Remove debug * Fix tests * delete instead of setting to nil * Point to the latest protocol
184 lines
4.1 KiB
Go
184 lines
4.1 KiB
Go
package connectionquality
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/pion/webrtc/v3"
|
|
"go.uber.org/atomic"
|
|
|
|
"github.com/livekit/protocol/livekit"
|
|
"github.com/livekit/protocol/logger"
|
|
|
|
"github.com/livekit/livekit-server/pkg/sfu/buffer"
|
|
)
|
|
|
|
const (
|
|
connectionQualityUpdateInterval = 5 * time.Second
|
|
)
|
|
|
|
type ConnectionStatsParams struct {
|
|
UpdateInterval time.Duration
|
|
CodecType webrtc.RTPCodecType
|
|
GetDeltaStats func() map[uint32]*buffer.StreamStatsWithLayers
|
|
GetQualityParams func() *buffer.ConnectionQualityParams
|
|
GetIsReducedQuality func() bool
|
|
Logger logger.Logger
|
|
}
|
|
|
|
type ConnectionStats struct {
|
|
params ConnectionStatsParams
|
|
|
|
onStatsUpdate func(cs *ConnectionStats, stat *livekit.AnalyticsStat)
|
|
|
|
lock sync.RWMutex
|
|
score float32
|
|
|
|
done chan struct{}
|
|
isClosed atomic.Bool
|
|
}
|
|
|
|
func NewConnectionStats(params ConnectionStatsParams) *ConnectionStats {
|
|
return &ConnectionStats{
|
|
params: params,
|
|
score: 4.0,
|
|
done: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
func (cs *ConnectionStats) Start() {
|
|
go cs.updateStats()
|
|
}
|
|
|
|
func (cs *ConnectionStats) Close() {
|
|
if cs.isClosed.Swap(true) {
|
|
return
|
|
}
|
|
|
|
close(cs.done)
|
|
}
|
|
|
|
func (cs *ConnectionStats) OnStatsUpdate(fn func(cs *ConnectionStats, stat *livekit.AnalyticsStat)) {
|
|
cs.onStatsUpdate = fn
|
|
}
|
|
|
|
func (cs *ConnectionStats) GetScore() float32 {
|
|
cs.lock.RLock()
|
|
defer cs.lock.RUnlock()
|
|
|
|
return cs.score
|
|
}
|
|
|
|
func (cs *ConnectionStats) updateScore() float32 {
|
|
cs.lock.Lock()
|
|
defer cs.lock.Unlock()
|
|
|
|
s := cs.params.GetQualityParams()
|
|
if s == nil {
|
|
return cs.score
|
|
}
|
|
|
|
if cs.params.CodecType == webrtc.RTPCodecTypeAudio {
|
|
cs.score = AudioConnectionScore(s.LossPercentage, s.Rtt, s.Jitter)
|
|
} else {
|
|
isReducedQuality := false
|
|
if cs.params.GetIsReducedQuality != nil {
|
|
isReducedQuality = cs.params.GetIsReducedQuality()
|
|
}
|
|
cs.score = VideoConnectionScore(s.LossPercentage, isReducedQuality)
|
|
}
|
|
|
|
return cs.score
|
|
}
|
|
|
|
func (cs *ConnectionStats) getStat() *livekit.AnalyticsStat {
|
|
if cs.params.GetDeltaStats == nil {
|
|
return nil
|
|
}
|
|
|
|
streams := cs.params.GetDeltaStats()
|
|
if len(streams) == 0 {
|
|
return nil
|
|
}
|
|
|
|
analyticsStreams := make([]*livekit.AnalyticsStream, 0, len(streams))
|
|
for ssrc, stream := range streams {
|
|
as := ToAnalyticsStream(ssrc, stream.RTPStats)
|
|
|
|
//
|
|
// add video layer if either
|
|
// 1. Simulcast - even if there is only one layer per stream as it provides layer id
|
|
// 2. A stream has multiple layers
|
|
//
|
|
if cs.params.CodecType == webrtc.RTPCodecTypeVideo && (len(streams) > 1 || len(stream.Layers) > 1) {
|
|
for layer, layerStats := range stream.Layers {
|
|
as.VideoLayers = append(as.VideoLayers, ToAnalyticsVideoLayer(layer, &layerStats))
|
|
}
|
|
}
|
|
|
|
analyticsStreams = append(analyticsStreams, as)
|
|
}
|
|
|
|
score := cs.updateScore()
|
|
|
|
return &livekit.AnalyticsStat{
|
|
Score: score,
|
|
Streams: analyticsStreams,
|
|
}
|
|
}
|
|
|
|
func (cs *ConnectionStats) updateStats() {
|
|
interval := cs.params.UpdateInterval
|
|
if interval == 0 {
|
|
interval = connectionQualityUpdateInterval
|
|
}
|
|
|
|
tk := time.NewTicker(interval)
|
|
defer tk.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-cs.done:
|
|
return
|
|
|
|
case <-tk.C:
|
|
stat := cs.getStat()
|
|
if stat == nil {
|
|
continue
|
|
}
|
|
|
|
if cs.onStatsUpdate != nil {
|
|
cs.onStatsUpdate(cs, stat)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func ToAnalyticsStream(ssrc uint32, deltaStats *buffer.RTPDeltaInfo) *livekit.AnalyticsStream {
|
|
return &livekit.AnalyticsStream{
|
|
Ssrc: ssrc,
|
|
PrimaryPackets: deltaStats.Packets,
|
|
PrimaryBytes: deltaStats.Bytes,
|
|
RetransmitPackets: deltaStats.PacketsDuplicate,
|
|
RetransmitBytes: deltaStats.BytesDuplicate,
|
|
PaddingPackets: deltaStats.PacketsPadding,
|
|
PaddingBytes: deltaStats.BytesPadding,
|
|
PacketsLost: deltaStats.PacketsLost,
|
|
Frames: deltaStats.Frames,
|
|
Rtt: deltaStats.RttMax,
|
|
Jitter: uint32(deltaStats.JitterMax),
|
|
Nacks: deltaStats.Nacks,
|
|
Plis: deltaStats.Plis,
|
|
Firs: deltaStats.Firs,
|
|
}
|
|
}
|
|
|
|
func ToAnalyticsVideoLayer(layer int, layerStats *buffer.LayerStats) *livekit.AnalyticsVideoLayer {
|
|
return &livekit.AnalyticsVideoLayer{
|
|
Layer: int32(layer),
|
|
Packets: layerStats.Packets,
|
|
Bytes: layerStats.Bytes,
|
|
Frames: layerStats.Frames,
|
|
}
|
|
}
|