diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 9e37fe99c..96db0b3d0 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -26,6 +26,7 @@ import ( "github.com/livekit/livekit-server/pkg/sfu/connectionquality" "github.com/livekit/livekit-server/pkg/sfu/streamallocator" "github.com/livekit/livekit-server/pkg/telemetry" + "github.com/livekit/livekit-server/pkg/telemetry/prometheus" "github.com/livekit/mediatransportutil/pkg/twcc" "github.com/livekit/protocol/auth" "github.com/livekit/protocol/livekit" @@ -169,6 +170,8 @@ type ParticipantImpl struct { cachedDownTracks map[livekit.TrackID]*downTrackState supervisor *supervisor.ParticipantSupervisor + + tracksQuality map[livekit.TrackID]livekit.ConnectionQuality } func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { @@ -193,7 +196,8 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { telemetry.BytesTrackIDForParticipantID(telemetry.BytesTrackTypeData, params.SID), params.SID, params.Telemetry), - supervisor: supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger}), + supervisor: supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger}), + tracksQuality: make(map[livekit.TrackID]livekit.ConnectionQuality), } p.version.Store(params.InitialVersion) p.timedVersion.Update(params.VersionGenerator.New()) @@ -857,6 +861,10 @@ func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo numTracks := 0 minQuality := livekit.ConnectionQuality_EXCELLENT minScore := float32(0.0) + numUpDrops := 0 + numDownDrops := 0 + + availableTracks := make(map[livekit.TrackID]bool) for _, pt := range p.GetPublishedTracks() { numTracks++ @@ -869,6 +877,19 @@ func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo } else if quality == minQuality && score < minScore { minScore = score } + + p.lock.Lock() + trackID := pt.ID() + if prevQuality, ok := p.tracksQuality[trackID]; ok { + // WARNING NOTE: comparing protobuf enums directly + if prevQuality > quality { + numUpDrops++ + } + } + p.tracksQuality[trackID] = quality + p.lock.Unlock() + + availableTracks[trackID] = true } subscribedTracks := p.SubscriptionManager.GetSubscribedTracks() @@ -883,6 +904,19 @@ func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo } else if quality == minQuality && score < minScore { minScore = score } + + p.lock.Lock() + trackID := subTrack.ID() + if prevQuality, ok := p.tracksQuality[trackID]; ok { + // WARNING NOTE: comparing protobuf enums directly + if prevQuality > quality { + numDownDrops++ + } + } + p.tracksQuality[trackID] = quality + p.lock.Unlock() + + availableTracks[trackID] = true } if numTracks == 0 { @@ -890,6 +924,17 @@ func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo minScore = connectionquality.MaxMOS } + prometheus.RecordQuality(minQuality, minScore, numUpDrops, numDownDrops) + + // remove unavailable tracks from track quality cache + p.lock.Lock() + for trackID := range p.tracksQuality { + if !availableTracks[trackID] { + delete(p.tracksQuality, trackID) + } + } + p.lock.Unlock() + return &livekit.ConnectionQualityInfo{ ParticipantSid: string(p.ID()), Quality: minQuality, diff --git a/pkg/telemetry/prometheus/node.go b/pkg/telemetry/prometheus/node.go index 54e7f9d33..48b438e65 100644 --- a/pkg/telemetry/prometheus/node.go +++ b/pkg/telemetry/prometheus/node.go @@ -96,6 +96,7 @@ func Init(nodeID string, nodeType livekit.NodeType, env string) { initPacketStats(nodeID, nodeType, env) initRoomStats(nodeID, nodeType, env) initPSRPCStats(nodeID, nodeType, env) + initQualityStats(nodeID, nodeType, env) } func GetUpdatedNodeStats(prev *livekit.NodeStats, prevAverage *livekit.NodeStats) (*livekit.NodeStats, bool, error) { diff --git a/pkg/telemetry/prometheus/quality.go b/pkg/telemetry/prometheus/quality.go new file mode 100644 index 000000000..708c654f0 --- /dev/null +++ b/pkg/telemetry/prometheus/quality.go @@ -0,0 +1,47 @@ +package prometheus + +import ( + "github.com/prometheus/client_golang/prometheus" + + "github.com/livekit/protocol/livekit" +) + +var ( + qualityRating prometheus.Histogram + qualityScore prometheus.Histogram + qualityDrop *prometheus.CounterVec +) + +func initQualityStats(nodeID string, nodeType livekit.NodeType, env string) { + qualityRating = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: livekitNamespace, + Subsystem: "quality", + Name: "rating", + ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String(), "env": env}, + Buckets: []float64{0, 1, 2}, + }) + qualityScore = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: livekitNamespace, + Subsystem: "quality", + Name: "score", + ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String(), "env": env}, + Buckets: []float64{1.0, 2.0, 2.5, 3.0, 3.25, 3.5, 3.75, 4.0, 4.25, 4.5}, + }) + qualityDrop = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: livekitNamespace, + Subsystem: "quality", + Name: "drop", + ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String(), "env": env}, + }, []string{"direction"}) + + prometheus.MustRegister(qualityRating) + prometheus.MustRegister(qualityScore) + prometheus.MustRegister(qualityDrop) +} + +func RecordQuality(rating livekit.ConnectionQuality, score float32, numUpDrops int, numDownDrops int) { + qualityRating.Observe(float64(rating)) + qualityScore.Observe(float64(score)) + qualityDrop.WithLabelValues("up").Add(float64(numUpDrops)) + qualityDrop.WithLabelValues("down").Add(float64(numDownDrops)) +}