From 4d6f0cd0f77e010c9d1a3ca4372bde5dcc78a8f4 Mon Sep 17 00:00:00 2001 From: Dan McFaul <55854809+real-danm@users.noreply.github.com> Date: Wed, 11 Jan 2023 14:49:50 -0700 Subject: [PATCH] Stats collect v2 (#1291) * initial commit * add correct label * clean up * more cleanup on adding stats * cleanup * move things to pub and sub monitors, ensure stats are correctly updated * fix merge conflict * Fix panic on MacOS (#1296) * fixing last feedback Co-authored-by: Raja Subramanian --- deploy/grafana/livekit-server-overview.json | 202 ------------------- go.mod | 2 +- go.sum | 4 +- pkg/rtc/participant.go | 6 +- pkg/rtc/room.go | 3 - pkg/rtc/supervisor/participant_supervisor.go | 7 +- pkg/rtc/supervisor/publication_monitor.go | 12 +- pkg/rtc/supervisor/subscription_monitor.go | 10 + pkg/rtc/transport.go | 1 - pkg/service/rtcservice.go | 13 +- pkg/telemetry/events.go | 2 +- pkg/telemetry/prometheus/node.go | 86 +++++--- pkg/telemetry/prometheus/packets.go | 14 +- pkg/telemetry/prometheus/rooms.go | 58 +++++- 14 files changed, 147 insertions(+), 273 deletions(-) diff --git a/deploy/grafana/livekit-server-overview.json b/deploy/grafana/livekit-server-overview.json index 33aa0fbb1..24f375720 100644 --- a/deploy/grafana/livekit-server-overview.json +++ b/deploy/grafana/livekit-server-overview.json @@ -510,208 +510,6 @@ ], "title": "Network Rate", "type": "timeseries" - }, - { - "datasource": null, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "drawStyle": "line", - "fillOpacity": 0, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "red", - "value": 80 - } - ] - } - }, - "overrides": [] - }, - "gridPos": { - "h": 8, - "w": 12, - "x": 12, - "y": 16 - }, - "id": 15, - "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom" - }, - "tooltip": { - "mode": "single" - } - }, - "targets": [ - { - "exemplar": true, - "expr": "sum(rate(livekit_node_service_operation{status=\"success\"}[5m])) by (type)", - "interval": "", - "legendFormat": "Success - {{type}} ", - "refId": "Success" - }, - { - "exemplar": true, - "expr": "sum(rate(livekit_node_service_operation{status!=\"success\"}[5m]))", - "hide": false, - "interval": "", - "legendFormat": "Error", - "refId": "Error" - }, - { - "exemplar": true, - "expr": "sum(rate(livekit_node_service_operation{status=\"success\", type=\"signal_ws\"}[5m])) - sum(rate(livekit_node_service_operation{status=\"success\", type=\"ice_connection\"}[5m]))", - "hide": false, - "interval": "", - "legendFormat": "Missing", - "refId": "Missing" - } - ], - "thresholds": [ - { - "colorMode": "critical", - "op": "gt", - "value": 0, - "visible": true - } - ], - "title": "Participant Connections", - "type": "timeseries" - }, - { - "datasource": null, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "drawStyle": "line", - "fillOpacity": 0, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "red", - "value": 80 - } - ] - } - }, - "overrides": [] - }, - "gridPos": { - "h": 8, - "w": 12, - "x": 12, - "y": 24 - }, - "id": 17, - "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom" - }, - "tooltip": { - "mode": "single" - } - }, - "targets": [ - { - "exemplar": true, - "expr": "sum(rate(livekit_node_service_operation{status=\"success\", type=\"ice_connection\"}[5m])) / sum(rate(livekit_node_service_operation{status=\"success\", type=\"signal_ws\"}[5m]))", - "interval": "", - "legendFormat": "Success Percent", - "refId": "A" - }, - { - "exemplar": true, - "expr": "sum(rate(livekit_node_service_operation{status=\"success\", type=\"signal_ws\"}[5m]))", - "hide": true, - "interval": "", - "legendFormat": "", - "refId": "signal_ws" - } - ], - "thresholds": [ - { - "colorMode": "critical", - "op": "lt", - "value": 0.9, - "visible": true - } - ], - "title": "Successful RTC Connection Flow", - "type": "timeseries" } ], "refresh": "5m", diff --git a/go.mod b/go.mod index bb68f54c6..3cae14ae1 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20221221221243-f361fbe40290 github.com/livekit/mediatransportutil v0.0.0-20230111071722-904079e94a7c - github.com/livekit/protocol v1.3.2-0.20230111091921-f637ca8e675f + github.com/livekit/protocol v1.3.2-0.20230111195642-abfad31c5f93 github.com/livekit/psrpc v0.2.1 github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 github.com/mackerelio/go-osstat v0.2.3 diff --git a/go.sum b/go.sum index ec27bf49a..8b0cf0e7f 100644 --- a/go.sum +++ b/go.sum @@ -233,8 +233,8 @@ github.com/livekit/mageutil v0.0.0-20221221221243-f361fbe40290 h1:ZVsQUuUOM9G7O3 github.com/livekit/mageutil v0.0.0-20221221221243-f361fbe40290/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20230111071722-904079e94a7c h1:wdzwTJjCpzy2FDmwdyVVGVa4+U9iv3E4Jy9qUDe/ubw= github.com/livekit/mediatransportutil v0.0.0-20230111071722-904079e94a7c/go.mod h1:1Dlx20JPoIKGP45eo+yuj0HjeE25zmyeX/EWHiPCjFw= -github.com/livekit/protocol v1.3.2-0.20230111091921-f637ca8e675f h1:5trxeV2GknxRya2EgbE3BZeB+a8ULLDBRZffLxVq1x0= -github.com/livekit/protocol v1.3.2-0.20230111091921-f637ca8e675f/go.mod h1:gwCG03nKlHlC9hTjL4pXQpn783ALhmbyhq65UZxqbb8= +github.com/livekit/protocol v1.3.2-0.20230111195642-abfad31c5f93 h1:KBW2Puv2tdZELD+zCtGLw/X9vLe9975jEFMFOOpSRGI= +github.com/livekit/protocol v1.3.2-0.20230111195642-abfad31c5f93/go.mod h1:gwCG03nKlHlC9hTjL4pXQpn783ALhmbyhq65UZxqbb8= github.com/livekit/psrpc v0.2.1 h1:ph/4egUMueUPoh5PZ/Aw4v6SH3wAbA+2t/GyCbpPKTg= github.com/livekit/psrpc v0.2.1/go.mod h1:MCe0xLdFPXmzogPiLrM94JIJbctb9+fAv5qYPkY2DXw= github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 h1:vOaY2qvfLihDyeZtnGGN1Law9wRrw8BMGCr1TygTvMw= diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index c3110f103..2287a87f1 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -589,7 +589,7 @@ func (p *ParticipantImpl) SetMigrateInfo( for _, t := range mediaTracks { ti := t.GetTrack() - p.supervisor.AddPublication(livekit.TrackID(ti.Sid)) + p.supervisor.AddPublication(livekit.TrackID(ti.Sid), ti.Type) p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted) p.pendingTracks[t.GetCid()] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}, migrated: true} @@ -1548,7 +1548,7 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l } if p.getPublishedTrackBySignalCid(req.Cid) != nil || p.getPublishedTrackBySdpCid(req.Cid) != nil || p.pendingTracks[req.Cid] != nil { - p.supervisor.AddPublication(livekit.TrackID(ti.Sid)) + p.supervisor.AddPublication(livekit.TrackID(ti.Sid), ti.Type) p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted) if p.pendingTracks[req.Cid] == nil { @@ -1560,7 +1560,7 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l return nil } - p.supervisor.AddPublication(livekit.TrackID(ti.Sid)) + p.supervisor.AddPublication(livekit.TrackID(ti.Sid), ti.Type) p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted) p.pendingTracks[req.Cid] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}} diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index c1b97837c..09a8d4858 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -221,12 +221,10 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions defer r.lock.Unlock() if r.IsClosed() { - prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "room_closed").Add(1) return ErrRoomClosed } if r.participants[participant.Identity()] != nil { - prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "already_joined").Add(1) return ErrAlreadyJoined } @@ -239,7 +237,6 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions } if participantCount >= int(r.protoRoom.MaxParticipants) { - prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "max_exceeded").Add(1) return ErrMaxParticipantsExceeded } } diff --git a/pkg/rtc/supervisor/participant_supervisor.go b/pkg/rtc/supervisor/participant_supervisor.go index 9276a66d4..d912342db 100644 --- a/pkg/rtc/supervisor/participant_supervisor.go +++ b/pkg/rtc/supervisor/participant_supervisor.go @@ -4,10 +4,11 @@ import ( "sync" "time" + "go.uber.org/atomic" + "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" - "go.uber.org/atomic" ) const ( @@ -91,7 +92,7 @@ func (p *ParticipantSupervisor) SetPublisherPeerConnectionConnected(isConnected p.lock.Unlock() } -func (p *ParticipantSupervisor) AddPublication(trackID livekit.TrackID) { +func (p *ParticipantSupervisor) AddPublication(trackID livekit.TrackID, trackType livekit.TrackType) { p.lock.Lock() pm, ok := p.publications[trackID] if !ok { @@ -106,7 +107,7 @@ func (p *ParticipantSupervisor) AddPublication(trackID livekit.TrackID) { } p.publications[trackID] = pm } - pm.opMon.PostEvent(types.OperationMonitorEventAddPendingPublication, nil) + pm.opMon.PostEvent(types.OperationMonitorEventAddPendingPublication, trackType.String()) p.lock.Unlock() } diff --git a/pkg/rtc/supervisor/publication_monitor.go b/pkg/rtc/supervisor/publication_monitor.go index c9050c48c..be483d665 100644 --- a/pkg/rtc/supervisor/publication_monitor.go +++ b/pkg/rtc/supervisor/publication_monitor.go @@ -6,7 +6,9 @@ import ( "time" "github.com/gammazero/deque" + "github.com/livekit/livekit-server/pkg/rtc/types" + "github.com/livekit/livekit-server/pkg/telemetry/prometheus" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" ) @@ -56,7 +58,7 @@ func (p *PublicationMonitor) PostEvent(ome types.OperationMonitorEvent, omd type case types.OperationMonitorEventPublisherPeerConnectionConnected: p.setConnected(omd.(bool)) case types.OperationMonitorEventAddPendingPublication: - p.addPending() + p.addPending(omd.(string)) case types.OperationMonitorEventSetPublicationMute: p.setMute(omd.(bool)) case types.OperationMonitorEventSetPublishedTrack: @@ -66,7 +68,9 @@ func (p *PublicationMonitor) PostEvent(ome types.OperationMonitorEvent, omd type } } -func (p *PublicationMonitor) addPending() { +func (p *PublicationMonitor) addPending(trackType string) { + prometheus.AddPublishAttempt(trackType) + p.lock.Lock() p.desiredPublishes.PushBack( &publish{ @@ -166,6 +170,10 @@ func (p *PublicationMonitor) update() { return } + if pub.isStart && p.publishedTrack != nil { + prometheus.AddPublishSuccess(p.publishedTrack.Kind().String()) + } + if (pub.isStart && p.publishedTrack == nil) || (!pub.isStart && p.publishedTrack != nil) { // put it back as the condition is not satisfied p.desiredPublishes.PushFront(pub) diff --git a/pkg/rtc/supervisor/subscription_monitor.go b/pkg/rtc/supervisor/subscription_monitor.go index 866cf5b14..10c9d9983 100644 --- a/pkg/rtc/supervisor/subscription_monitor.go +++ b/pkg/rtc/supervisor/subscription_monitor.go @@ -6,7 +6,9 @@ import ( "time" "github.com/gammazero/deque" + "github.com/livekit/livekit-server/pkg/rtc/types" + "github.com/livekit/livekit-server/pkg/telemetry/prometheus" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" ) @@ -73,6 +75,10 @@ func (s *SubscriptionMonitor) PostEvent(ome types.OperationMonitorEvent, omd typ } func (s *SubscriptionMonitor) updateSubscription(params SubscriptionOpParams) { + if params.IsSubscribe { + prometheus.AddSubscribeAttempt(params.SourceTrack.Kind().String()) + } + s.lock.Lock() so := s.getOrCreateSubscriptionOpsForSource(params.SourceTrack) @@ -171,6 +177,10 @@ func (s *SubscriptionMonitor) update() { break } + if tx.isSubscribe && so.subscribedTrack != nil { + prometheus.AddSubscribeSuccess(so.subscribedTrack.MediaTrack().Kind().String()) + } + if so.desiredTransitions.Len() == 0 && so.subscribedTrack == nil { delete(s.subscriptionOpsBySource, sourceTrack) } diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index e608a5876..14e22f1ba 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -409,7 +409,6 @@ func (t *PCTransport) setICEConnectedAt(at time.Time) { // This prevents reset of connected at time if ICE goes `Connected` -> `Disconnected` -> `Connected`. // t.iceConnectedAt = at - prometheus.ServiceOperationCounter.WithLabelValues("ice_connection", "success", "").Add(1) } t.lock.Unlock() } diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index 1019fc28f..ee295b4a5 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -155,7 +155,6 @@ func (s *RTCService) validate(r *http.Request) (livekit.RoomName, routing.Partic func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { // reject non websocket requests if !websocket.IsWebSocketUpgrade(r) { - prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "error", "reject").Add(1) w.WriteHeader(404) return } @@ -177,10 +176,10 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !s.config.Room.AutoCreate { _, _, err := s.store.LoadRoom(context.Background(), roomName, false) if err == ErrRoomNotFound { - handleError(w, 404, err, loggerFields...) + handleError(w, http.StatusNotFound, err, loggerFields...) return } else if err != nil { - handleError(w, 500, err, loggerFields...) + handleError(w, http.StatusInternalServerError, err, loggerFields...) return } } @@ -188,7 +187,6 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { // create room if it doesn't exist, also assigns an RTC node for the room rm, err := s.roomAllocator.CreateRoom(r.Context(), &livekit.CreateRoomRequest{Name: string(roomName)}) if err != nil { - prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "error", "create_room").Add(1) handleError(w, http.StatusInternalServerError, err, loggerFields...) return } @@ -196,7 +194,6 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { // this needs to be started first *before* using router functions on this node connId, reqSink, resSource, err := s.router.StartParticipantSignal(r.Context(), roomName, pi) if err != nil { - prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "error", "start_signal").Add(1) handleError(w, http.StatusInternalServerError, err, loggerFields...) return } @@ -206,11 +203,12 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { // instead of waiting forever on the WebSocket initialResponse, err := readInitialResponse(resSource, maxInitialResponseWait) if err != nil { - prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "error", "initial_response").Add(1) handleError(w, http.StatusInternalServerError, err, loggerFields...) return } + prometheus.IncrementParticipantJoin(1) + if !pi.Reconnect && initialResponse.GetJoin() != nil { pi.ID = livekit.ParticipantID(initialResponse.GetJoin().GetParticipant().GetSid()) } @@ -246,7 +244,6 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { // upgrade only once the basics are good to go conn, err := s.upgrader.Upgrade(w, r, nil) if err != nil { - prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "error", "upgrade").Add(1) handleError(w, http.StatusInternalServerError, err, loggerFields...) return } @@ -261,8 +258,6 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { signalStats.AddBytes(uint64(count), true) } } - - prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "success", "").Add(1) pLogger.Infow("new client WS connected", "connID", connId) // handle responses diff --git a/pkg/telemetry/events.go b/pkg/telemetry/events.go index 472aaa424..289c60130 100644 --- a/pkg/telemetry/events.go +++ b/pkg/telemetry/events.go @@ -68,7 +68,7 @@ func (t *telemetryService) ParticipantJoined( shouldSendEvent bool, ) { t.enqueue(func() { - prometheus.IncrementParticipantJoin(1) + prometheus.IncrementParticipantJoin(1, true) prometheus.AddParticipant() t.createWorker( diff --git a/pkg/telemetry/prometheus/node.go b/pkg/telemetry/prometheus/node.go index 2889cad7b..f9ad5e166 100644 --- a/pkg/telemetry/prometheus/node.go +++ b/pkg/telemetry/prometheus/node.go @@ -121,6 +121,11 @@ func GetUpdatedNodeStats(prev *livekit.NodeStats, prevAverage *livekit.NodeStats retransmitBytesNow := retransmitBytes.Load() retransmitPacketsNow := retransmitPackets.Load() participantJoinNow := participantJoin.Load() + participantRTCNow := participantRTC.Load() + trackPublishAttemptsNow := trackPublishAttempts.Load() + trackPublishSuccessNow := trackPublishSuccess.Load() + trackSubscribeAttemptsNow := trackSubscribeAttempts.Load() + trackSubscribeSuccessNow := trackSubscribeSuccess.Load() updatedAt := time.Now().Unix() elapsed := updatedAt - prevAverage.UpdatedAt @@ -136,37 +141,47 @@ func GetUpdatedNodeStats(prev *livekit.NodeStats, prevAverage *livekit.NodeStats } stats := &livekit.NodeStats{ - StartedAt: prev.StartedAt, - UpdatedAt: updatedAt, - NumRooms: roomTotal.Load(), - NumClients: participantTotal.Load(), - NumTracksIn: trackPublishedTotal.Load(), - NumTracksOut: trackSubscribedTotal.Load(), - BytesIn: bytesInNow, - BytesOut: bytesOutNow, - PacketsIn: packetsInNow, - PacketsOut: packetsOutNow, - RetransmitBytesOut: retransmitBytesNow, - RetransmitPacketsOut: retransmitPacketsNow, - NackTotal: nackTotalNow, - ParticipantJoin: participantJoinNow, - BytesInPerSec: prevAverage.BytesInPerSec, - BytesOutPerSec: prevAverage.BytesOutPerSec, - PacketsInPerSec: prevAverage.PacketsInPerSec, - PacketsOutPerSec: prevAverage.PacketsOutPerSec, - RetransmitBytesOutPerSec: prevAverage.RetransmitBytesOutPerSec, - RetransmitPacketsOutPerSec: prevAverage.RetransmitPacketsOutPerSec, - NackPerSec: prevAverage.NackPerSec, - ParticipantJoinPerSec: prevAverage.ParticipantJoinPerSec, - NumCpus: numCPUs, - CpuLoad: cpuLoad, - MemoryTotal: memTotal, - MemoryUsed: memUsed, - LoadAvgLast1Min: float32(loadAvg.Loadavg1), - LoadAvgLast5Min: float32(loadAvg.Loadavg5), - LoadAvgLast15Min: float32(loadAvg.Loadavg15), - SysPacketsOut: sysPackets, - SysPacketsDropped: sysDroppedPackets, + StartedAt: prev.StartedAt, + UpdatedAt: updatedAt, + NumRooms: roomTotal.Load(), + NumClients: participantTotal.Load(), + NumTracksIn: trackPublishedTotal.Load(), + NumTracksOut: trackSubscribedTotal.Load(), + NumTrackPublishAttempts: trackPublishAttemptsNow, + NumTrackPublishSuccess: trackPublishSuccessNow, + NumTrackSubscribeAttempts: trackSubscribeAttemptsNow, + NumTrackSubscribeSuccess: trackSubscribeSuccessNow, + BytesIn: bytesInNow, + BytesOut: bytesOutNow, + PacketsIn: packetsInNow, + PacketsOut: packetsOutNow, + RetransmitBytesOut: retransmitBytesNow, + RetransmitPacketsOut: retransmitPacketsNow, + NackTotal: nackTotalNow, + ParticipantJoin: participantJoinNow, + ParticipantRtc: participantRTCNow, + BytesInPerSec: prevAverage.BytesInPerSec, + BytesOutPerSec: prevAverage.BytesOutPerSec, + PacketsInPerSec: prevAverage.PacketsInPerSec, + PacketsOutPerSec: prevAverage.PacketsOutPerSec, + RetransmitBytesOutPerSec: prevAverage.RetransmitBytesOutPerSec, + RetransmitPacketsOutPerSec: prevAverage.RetransmitPacketsOutPerSec, + NackPerSec: prevAverage.NackPerSec, + ParticipantJoinPerSec: prevAverage.ParticipantJoinPerSec, + ParticipantRtcPerSec: prevAverage.ParticipantRtcPerSec, + NumCpus: numCPUs, + CpuLoad: cpuLoad, + MemoryTotal: memTotal, + MemoryUsed: memUsed, + LoadAvgLast1Min: float32(loadAvg.Loadavg1), + LoadAvgLast5Min: float32(loadAvg.Loadavg5), + LoadAvgLast15Min: float32(loadAvg.Loadavg15), + SysPacketsOut: sysPackets, + SysPacketsDropped: sysDroppedPackets, + TrackPublishAttemptsPerSec: prevAverage.TrackPublishAttemptsPerSec, + TrackPublishSuccessPerSec: prevAverage.TrackPublishSuccessPerSec, + TrackSubscribeAttemptsPerSec: prevAverage.TrackSubscribeAttemptsPerSec, + TrackSubscribeSuccessPerSec: prevAverage.TrackSubscribeSuccessPerSec, } // update stats @@ -179,8 +194,13 @@ func GetUpdatedNodeStats(prev *livekit.NodeStats, prevAverage *livekit.NodeStats stats.RetransmitPacketsOutPerSec = perSec(prevAverage.RetransmitPacketsOut, retransmitPacketsNow, elapsed) stats.NackPerSec = perSec(prevAverage.NackTotal, nackTotalNow, elapsed) stats.ParticipantJoinPerSec = perSec(prevAverage.ParticipantJoin, participantJoinNow, elapsed) - stats.SysPacketsOutPerSec = perSec(uint64(prev.SysPacketsOut), uint64(sysPackets), elapsed) - stats.SysPacketsDroppedPerSec = perSec(uint64(prev.SysPacketsDropped), uint64(sysDroppedPackets), elapsed) + stats.ParticipantRtcPerSec = perSec(prevAverage.ParticipantRtc, participantRTCNow, elapsed) + stats.SysPacketsOutPerSec = perSec(uint64(prevAverage.SysPacketsOut), uint64(sysPackets), elapsed) + stats.SysPacketsDroppedPerSec = perSec(uint64(prevAverage.SysPacketsDropped), uint64(sysDroppedPackets), elapsed) + stats.TrackPublishAttemptsPerSec = perSec(uint64(prevAverage.NumTrackPublishAttempts), uint64(trackPublishAttemptsNow), elapsed) + stats.TrackPublishSuccessPerSec = perSec(uint64(prevAverage.NumTrackPublishSuccess), uint64(trackPublishSuccessNow), elapsed) + stats.TrackSubscribeAttemptsPerSec = perSec(uint64(prevAverage.NumTrackSubscribeAttempts), uint64(trackSubscribeAttemptsNow), elapsed) + stats.TrackSubscribeSuccessPerSec = perSec(uint64(prevAverage.NumTrackSubscribeSuccess), uint64(trackSubscribeSuccessNow), elapsed) packetTotal := stats.SysPacketsOutPerSec + stats.SysPacketsDroppedPerSec if packetTotal == 0 { diff --git a/pkg/telemetry/prometheus/packets.go b/pkg/telemetry/prometheus/packets.go index b195950b8..5a6996345 100644 --- a/pkg/telemetry/prometheus/packets.go +++ b/pkg/telemetry/prometheus/packets.go @@ -25,6 +25,7 @@ var ( retransmitBytes atomic.Uint64 retransmitPackets atomic.Uint64 participantJoin atomic.Uint64 + participantRTC atomic.Uint64 promPacketLabels = []string{"direction", "transmission"} promPacketTotal *prometheus.CounterVec @@ -73,7 +74,7 @@ func initPacketStats(nodeID string, nodeType livekit.NodeType) { Subsystem: "participant_join", Name: "total", ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()}, - }, nil) + }, []string{"state"}) promConnections = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: livekitNamespace, Subsystem: "connection", @@ -133,10 +134,15 @@ func IncrementRTCP(direction Direction, nack, pli, fir uint32) { } } -func IncrementParticipantJoin(join uint32) { +func IncrementParticipantJoin(join uint32, rtcConnected ...bool) { if join > 0 { - promParticipantJoin.WithLabelValues().Add(float64(join)) - participantJoin.Add(uint64(join)) + if len(rtcConnected) > 0 && rtcConnected[0] { + participantRTC.Add(uint64(join)) + promParticipantJoin.WithLabelValues("rtc_connected").Add(float64(join)) + } else { + participantJoin.Add(uint64(join)) + promParticipantJoin.WithLabelValues("signal_connected").Add(float64(join)) + } } } diff --git a/pkg/telemetry/prometheus/rooms.go b/pkg/telemetry/prometheus/rooms.go index 2bbca6476..de31aa0de 100644 --- a/pkg/telemetry/prometheus/rooms.go +++ b/pkg/telemetry/prometheus/rooms.go @@ -10,16 +10,22 @@ import ( ) var ( - roomTotal atomic.Int32 - participantTotal atomic.Int32 - trackPublishedTotal atomic.Int32 - trackSubscribedTotal atomic.Int32 + roomTotal atomic.Int32 + participantTotal atomic.Int32 + trackPublishedTotal atomic.Int32 + trackSubscribedTotal atomic.Int32 + trackPublishAttempts atomic.Int32 + trackPublishSuccess atomic.Int32 + trackSubscribeAttempts atomic.Int32 + trackSubscribeSuccess atomic.Int32 - promRoomTotal prometheus.Gauge - promRoomDuration prometheus.Histogram - promParticipantTotal prometheus.Gauge - promTrackPublishedTotal *prometheus.GaugeVec - promTrackSubscribedTotal *prometheus.GaugeVec + promRoomTotal prometheus.Gauge + promRoomDuration prometheus.Histogram + promParticipantTotal prometheus.Gauge + promTrackPublishedTotal *prometheus.GaugeVec + promTrackSubscribedTotal *prometheus.GaugeVec + promTrackPublishCounter *prometheus.CounterVec + promTrackSubscribeCounter *prometheus.CounterVec ) func initRoomStats(nodeID string, nodeType livekit.NodeType) { @@ -56,12 +62,26 @@ func initRoomStats(nodeID string, nodeType livekit.NodeType) { Name: "subscribed_total", ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()}, }, []string{"kind"}) + promTrackPublishCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: livekitNamespace, + Subsystem: "track", + Name: "publish_counter", + ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()}, + }, []string{"kind", "state"}) + promTrackSubscribeCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: livekitNamespace, + Subsystem: "track", + Name: "subscribe_counter", + ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()}, + }, []string{"kind", "state"}) prometheus.MustRegister(promRoomTotal) prometheus.MustRegister(promRoomDuration) prometheus.MustRegister(promParticipantTotal) prometheus.MustRegister(promTrackPublishedTotal) prometheus.MustRegister(promTrackSubscribedTotal) + prometheus.MustRegister(promTrackPublishCounter) + prometheus.MustRegister(promTrackSubscribeCounter) } func RoomStarted() { @@ -97,6 +117,16 @@ func SubPublishedTrack(kind string) { trackPublishedTotal.Dec() } +func AddPublishAttempt(kind string) { + trackPublishAttempts.Inc() + promTrackPublishCounter.WithLabelValues(kind, "attempt").Inc() +} + +func AddPublishSuccess(kind string) { + trackPublishSuccess.Inc() + promTrackPublishCounter.WithLabelValues(kind, "success").Inc() +} + func AddSubscribedTrack(kind string) { promTrackSubscribedTotal.WithLabelValues(kind).Add(1) trackSubscribedTotal.Inc() @@ -106,3 +136,13 @@ func SubSubscribedTrack(kind string) { promTrackSubscribedTotal.WithLabelValues(kind).Sub(1) trackSubscribedTotal.Dec() } + +func AddSubscribeAttempt(kind string) { + trackSubscribeAttempts.Inc() + promTrackSubscribeCounter.WithLabelValues(kind, "attempt").Inc() +} + +func AddSubscribeSuccess(kind string) { + trackSubscribeSuccess.Inc() + promTrackSubscribeCounter.WithLabelValues(kind, "success").Inc() +}