mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 19:55:41 +00:00
Send quality stats to prometheus. (#1708)
This commit is contained in:
@@ -26,6 +26,7 @@ import (
|
||||
"github.com/livekit/livekit-server/pkg/sfu/connectionquality"
|
||||
"github.com/livekit/livekit-server/pkg/sfu/streamallocator"
|
||||
"github.com/livekit/livekit-server/pkg/telemetry"
|
||||
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
|
||||
"github.com/livekit/mediatransportutil/pkg/twcc"
|
||||
"github.com/livekit/protocol/auth"
|
||||
"github.com/livekit/protocol/livekit"
|
||||
@@ -169,6 +170,8 @@ type ParticipantImpl struct {
|
||||
cachedDownTracks map[livekit.TrackID]*downTrackState
|
||||
|
||||
supervisor *supervisor.ParticipantSupervisor
|
||||
|
||||
tracksQuality map[livekit.TrackID]livekit.ConnectionQuality
|
||||
}
|
||||
|
||||
func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
|
||||
@@ -193,7 +196,8 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
|
||||
telemetry.BytesTrackIDForParticipantID(telemetry.BytesTrackTypeData, params.SID),
|
||||
params.SID,
|
||||
params.Telemetry),
|
||||
supervisor: supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger}),
|
||||
supervisor: supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger}),
|
||||
tracksQuality: make(map[livekit.TrackID]livekit.ConnectionQuality),
|
||||
}
|
||||
p.version.Store(params.InitialVersion)
|
||||
p.timedVersion.Update(params.VersionGenerator.New())
|
||||
@@ -857,6 +861,10 @@ func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo
|
||||
numTracks := 0
|
||||
minQuality := livekit.ConnectionQuality_EXCELLENT
|
||||
minScore := float32(0.0)
|
||||
numUpDrops := 0
|
||||
numDownDrops := 0
|
||||
|
||||
availableTracks := make(map[livekit.TrackID]bool)
|
||||
|
||||
for _, pt := range p.GetPublishedTracks() {
|
||||
numTracks++
|
||||
@@ -869,6 +877,19 @@ func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo
|
||||
} else if quality == minQuality && score < minScore {
|
||||
minScore = score
|
||||
}
|
||||
|
||||
p.lock.Lock()
|
||||
trackID := pt.ID()
|
||||
if prevQuality, ok := p.tracksQuality[trackID]; ok {
|
||||
// WARNING NOTE: comparing protobuf enums directly
|
||||
if prevQuality > quality {
|
||||
numUpDrops++
|
||||
}
|
||||
}
|
||||
p.tracksQuality[trackID] = quality
|
||||
p.lock.Unlock()
|
||||
|
||||
availableTracks[trackID] = true
|
||||
}
|
||||
|
||||
subscribedTracks := p.SubscriptionManager.GetSubscribedTracks()
|
||||
@@ -883,6 +904,19 @@ func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo
|
||||
} else if quality == minQuality && score < minScore {
|
||||
minScore = score
|
||||
}
|
||||
|
||||
p.lock.Lock()
|
||||
trackID := subTrack.ID()
|
||||
if prevQuality, ok := p.tracksQuality[trackID]; ok {
|
||||
// WARNING NOTE: comparing protobuf enums directly
|
||||
if prevQuality > quality {
|
||||
numDownDrops++
|
||||
}
|
||||
}
|
||||
p.tracksQuality[trackID] = quality
|
||||
p.lock.Unlock()
|
||||
|
||||
availableTracks[trackID] = true
|
||||
}
|
||||
|
||||
if numTracks == 0 {
|
||||
@@ -890,6 +924,17 @@ func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo
|
||||
minScore = connectionquality.MaxMOS
|
||||
}
|
||||
|
||||
prometheus.RecordQuality(minQuality, minScore, numUpDrops, numDownDrops)
|
||||
|
||||
// remove unavailable tracks from track quality cache
|
||||
p.lock.Lock()
|
||||
for trackID := range p.tracksQuality {
|
||||
if !availableTracks[trackID] {
|
||||
delete(p.tracksQuality, trackID)
|
||||
}
|
||||
}
|
||||
p.lock.Unlock()
|
||||
|
||||
return &livekit.ConnectionQualityInfo{
|
||||
ParticipantSid: string(p.ID()),
|
||||
Quality: minQuality,
|
||||
|
||||
@@ -96,6 +96,7 @@ func Init(nodeID string, nodeType livekit.NodeType, env string) {
|
||||
initPacketStats(nodeID, nodeType, env)
|
||||
initRoomStats(nodeID, nodeType, env)
|
||||
initPSRPCStats(nodeID, nodeType, env)
|
||||
initQualityStats(nodeID, nodeType, env)
|
||||
}
|
||||
|
||||
func GetUpdatedNodeStats(prev *livekit.NodeStats, prevAverage *livekit.NodeStats) (*livekit.NodeStats, bool, error) {
|
||||
|
||||
47
pkg/telemetry/prometheus/quality.go
Normal file
47
pkg/telemetry/prometheus/quality.go
Normal file
@@ -0,0 +1,47 @@
|
||||
package prometheus
|
||||
|
||||
import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
"github.com/livekit/protocol/livekit"
|
||||
)
|
||||
|
||||
var (
|
||||
qualityRating prometheus.Histogram
|
||||
qualityScore prometheus.Histogram
|
||||
qualityDrop *prometheus.CounterVec
|
||||
)
|
||||
|
||||
func initQualityStats(nodeID string, nodeType livekit.NodeType, env string) {
|
||||
qualityRating = prometheus.NewHistogram(prometheus.HistogramOpts{
|
||||
Namespace: livekitNamespace,
|
||||
Subsystem: "quality",
|
||||
Name: "rating",
|
||||
ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String(), "env": env},
|
||||
Buckets: []float64{0, 1, 2},
|
||||
})
|
||||
qualityScore = prometheus.NewHistogram(prometheus.HistogramOpts{
|
||||
Namespace: livekitNamespace,
|
||||
Subsystem: "quality",
|
||||
Name: "score",
|
||||
ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String(), "env": env},
|
||||
Buckets: []float64{1.0, 2.0, 2.5, 3.0, 3.25, 3.5, 3.75, 4.0, 4.25, 4.5},
|
||||
})
|
||||
qualityDrop = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: livekitNamespace,
|
||||
Subsystem: "quality",
|
||||
Name: "drop",
|
||||
ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String(), "env": env},
|
||||
}, []string{"direction"})
|
||||
|
||||
prometheus.MustRegister(qualityRating)
|
||||
prometheus.MustRegister(qualityScore)
|
||||
prometheus.MustRegister(qualityDrop)
|
||||
}
|
||||
|
||||
func RecordQuality(rating livekit.ConnectionQuality, score float32, numUpDrops int, numDownDrops int) {
|
||||
qualityRating.Observe(float64(rating))
|
||||
qualityScore.Observe(float64(score))
|
||||
qualityDrop.WithLabelValues("up").Add(float64(numUpDrops))
|
||||
qualityDrop.WithLabelValues("down").Add(float64(numDownDrops))
|
||||
}
|
||||
Reference in New Issue
Block a user