Stats collect v2 (#1291)

* initial commit

* add correct label

* clean up

* more cleanup on adding stats

* cleanup

* move things to pub and sub monitors, ensure stats are correctly updated

* fix merge conflict

* Fix panic on MacOS (#1296)

* fixing last feedback

Co-authored-by: Raja Subramanian <raja.gobi@tutanota.com>
This commit is contained in:
Dan McFaul
2023-01-11 14:49:50 -07:00
committed by GitHub
parent 0ca80a4fa7
commit 4d6f0cd0f7
14 changed files with 147 additions and 273 deletions

View File

@@ -510,208 +510,6 @@
],
"title": "Network Rate",
"type": "timeseries"
},
{
"datasource": null,
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 16
},
"id": 15,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom"
},
"tooltip": {
"mode": "single"
}
},
"targets": [
{
"exemplar": true,
"expr": "sum(rate(livekit_node_service_operation{status=\"success\"}[5m])) by (type)",
"interval": "",
"legendFormat": "Success - {{type}} ",
"refId": "Success"
},
{
"exemplar": true,
"expr": "sum(rate(livekit_node_service_operation{status!=\"success\"}[5m]))",
"hide": false,
"interval": "",
"legendFormat": "Error",
"refId": "Error"
},
{
"exemplar": true,
"expr": "sum(rate(livekit_node_service_operation{status=\"success\", type=\"signal_ws\"}[5m])) - sum(rate(livekit_node_service_operation{status=\"success\", type=\"ice_connection\"}[5m]))",
"hide": false,
"interval": "",
"legendFormat": "Missing",
"refId": "Missing"
}
],
"thresholds": [
{
"colorMode": "critical",
"op": "gt",
"value": 0,
"visible": true
}
],
"title": "Participant Connections",
"type": "timeseries"
},
{
"datasource": null,
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 24
},
"id": 17,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom"
},
"tooltip": {
"mode": "single"
}
},
"targets": [
{
"exemplar": true,
"expr": "sum(rate(livekit_node_service_operation{status=\"success\", type=\"ice_connection\"}[5m])) / sum(rate(livekit_node_service_operation{status=\"success\", type=\"signal_ws\"}[5m]))",
"interval": "",
"legendFormat": "Success Percent",
"refId": "A"
},
{
"exemplar": true,
"expr": "sum(rate(livekit_node_service_operation{status=\"success\", type=\"signal_ws\"}[5m]))",
"hide": true,
"interval": "",
"legendFormat": "",
"refId": "signal_ws"
}
],
"thresholds": [
{
"colorMode": "critical",
"op": "lt",
"value": 0.9,
"visible": true
}
],
"title": "Successful RTC Connection Flow",
"type": "timeseries"
}
],
"refresh": "5m",

2
go.mod
View File

@@ -18,7 +18,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20221221221243-f361fbe40290
github.com/livekit/mediatransportutil v0.0.0-20230111071722-904079e94a7c
github.com/livekit/protocol v1.3.2-0.20230111091921-f637ca8e675f
github.com/livekit/protocol v1.3.2-0.20230111195642-abfad31c5f93
github.com/livekit/psrpc v0.2.1
github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995
github.com/mackerelio/go-osstat v0.2.3

4
go.sum
View File

@@ -233,8 +233,8 @@ github.com/livekit/mageutil v0.0.0-20221221221243-f361fbe40290 h1:ZVsQUuUOM9G7O3
github.com/livekit/mageutil v0.0.0-20221221221243-f361fbe40290/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20230111071722-904079e94a7c h1:wdzwTJjCpzy2FDmwdyVVGVa4+U9iv3E4Jy9qUDe/ubw=
github.com/livekit/mediatransportutil v0.0.0-20230111071722-904079e94a7c/go.mod h1:1Dlx20JPoIKGP45eo+yuj0HjeE25zmyeX/EWHiPCjFw=
github.com/livekit/protocol v1.3.2-0.20230111091921-f637ca8e675f h1:5trxeV2GknxRya2EgbE3BZeB+a8ULLDBRZffLxVq1x0=
github.com/livekit/protocol v1.3.2-0.20230111091921-f637ca8e675f/go.mod h1:gwCG03nKlHlC9hTjL4pXQpn783ALhmbyhq65UZxqbb8=
github.com/livekit/protocol v1.3.2-0.20230111195642-abfad31c5f93 h1:KBW2Puv2tdZELD+zCtGLw/X9vLe9975jEFMFOOpSRGI=
github.com/livekit/protocol v1.3.2-0.20230111195642-abfad31c5f93/go.mod h1:gwCG03nKlHlC9hTjL4pXQpn783ALhmbyhq65UZxqbb8=
github.com/livekit/psrpc v0.2.1 h1:ph/4egUMueUPoh5PZ/Aw4v6SH3wAbA+2t/GyCbpPKTg=
github.com/livekit/psrpc v0.2.1/go.mod h1:MCe0xLdFPXmzogPiLrM94JIJbctb9+fAv5qYPkY2DXw=
github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 h1:vOaY2qvfLihDyeZtnGGN1Law9wRrw8BMGCr1TygTvMw=

View File

@@ -589,7 +589,7 @@ func (p *ParticipantImpl) SetMigrateInfo(
for _, t := range mediaTracks {
ti := t.GetTrack()
p.supervisor.AddPublication(livekit.TrackID(ti.Sid))
p.supervisor.AddPublication(livekit.TrackID(ti.Sid), ti.Type)
p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted)
p.pendingTracks[t.GetCid()] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}, migrated: true}
@@ -1548,7 +1548,7 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l
}
if p.getPublishedTrackBySignalCid(req.Cid) != nil || p.getPublishedTrackBySdpCid(req.Cid) != nil || p.pendingTracks[req.Cid] != nil {
p.supervisor.AddPublication(livekit.TrackID(ti.Sid))
p.supervisor.AddPublication(livekit.TrackID(ti.Sid), ti.Type)
p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted)
if p.pendingTracks[req.Cid] == nil {
@@ -1560,7 +1560,7 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l
return nil
}
p.supervisor.AddPublication(livekit.TrackID(ti.Sid))
p.supervisor.AddPublication(livekit.TrackID(ti.Sid), ti.Type)
p.supervisor.SetPublicationMute(livekit.TrackID(ti.Sid), ti.Muted)
p.pendingTracks[req.Cid] = &pendingTrackInfo{trackInfos: []*livekit.TrackInfo{ti}}

View File

@@ -221,12 +221,10 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions
defer r.lock.Unlock()
if r.IsClosed() {
prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "room_closed").Add(1)
return ErrRoomClosed
}
if r.participants[participant.Identity()] != nil {
prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "already_joined").Add(1)
return ErrAlreadyJoined
}
@@ -239,7 +237,6 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions
}
if participantCount >= int(r.protoRoom.MaxParticipants) {
prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "max_exceeded").Add(1)
return ErrMaxParticipantsExceeded
}
}

View File

@@ -4,10 +4,11 @@ import (
"sync"
"time"
"go.uber.org/atomic"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"go.uber.org/atomic"
)
const (
@@ -91,7 +92,7 @@ func (p *ParticipantSupervisor) SetPublisherPeerConnectionConnected(isConnected
p.lock.Unlock()
}
func (p *ParticipantSupervisor) AddPublication(trackID livekit.TrackID) {
func (p *ParticipantSupervisor) AddPublication(trackID livekit.TrackID, trackType livekit.TrackType) {
p.lock.Lock()
pm, ok := p.publications[trackID]
if !ok {
@@ -106,7 +107,7 @@ func (p *ParticipantSupervisor) AddPublication(trackID livekit.TrackID) {
}
p.publications[trackID] = pm
}
pm.opMon.PostEvent(types.OperationMonitorEventAddPendingPublication, nil)
pm.opMon.PostEvent(types.OperationMonitorEventAddPendingPublication, trackType.String())
p.lock.Unlock()
}

View File

@@ -6,7 +6,9 @@ import (
"time"
"github.com/gammazero/deque"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
)
@@ -56,7 +58,7 @@ func (p *PublicationMonitor) PostEvent(ome types.OperationMonitorEvent, omd type
case types.OperationMonitorEventPublisherPeerConnectionConnected:
p.setConnected(omd.(bool))
case types.OperationMonitorEventAddPendingPublication:
p.addPending()
p.addPending(omd.(string))
case types.OperationMonitorEventSetPublicationMute:
p.setMute(omd.(bool))
case types.OperationMonitorEventSetPublishedTrack:
@@ -66,7 +68,9 @@ func (p *PublicationMonitor) PostEvent(ome types.OperationMonitorEvent, omd type
}
}
func (p *PublicationMonitor) addPending() {
func (p *PublicationMonitor) addPending(trackType string) {
prometheus.AddPublishAttempt(trackType)
p.lock.Lock()
p.desiredPublishes.PushBack(
&publish{
@@ -166,6 +170,10 @@ func (p *PublicationMonitor) update() {
return
}
if pub.isStart && p.publishedTrack != nil {
prometheus.AddPublishSuccess(p.publishedTrack.Kind().String())
}
if (pub.isStart && p.publishedTrack == nil) || (!pub.isStart && p.publishedTrack != nil) {
// put it back as the condition is not satisfied
p.desiredPublishes.PushFront(pub)

View File

@@ -6,7 +6,9 @@ import (
"time"
"github.com/gammazero/deque"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
)
@@ -73,6 +75,10 @@ func (s *SubscriptionMonitor) PostEvent(ome types.OperationMonitorEvent, omd typ
}
func (s *SubscriptionMonitor) updateSubscription(params SubscriptionOpParams) {
if params.IsSubscribe {
prometheus.AddSubscribeAttempt(params.SourceTrack.Kind().String())
}
s.lock.Lock()
so := s.getOrCreateSubscriptionOpsForSource(params.SourceTrack)
@@ -171,6 +177,10 @@ func (s *SubscriptionMonitor) update() {
break
}
if tx.isSubscribe && so.subscribedTrack != nil {
prometheus.AddSubscribeSuccess(so.subscribedTrack.MediaTrack().Kind().String())
}
if so.desiredTransitions.Len() == 0 && so.subscribedTrack == nil {
delete(s.subscriptionOpsBySource, sourceTrack)
}

View File

@@ -409,7 +409,6 @@ func (t *PCTransport) setICEConnectedAt(at time.Time) {
// This prevents reset of connected at time if ICE goes `Connected` -> `Disconnected` -> `Connected`.
//
t.iceConnectedAt = at
prometheus.ServiceOperationCounter.WithLabelValues("ice_connection", "success", "").Add(1)
}
t.lock.Unlock()
}

View File

@@ -155,7 +155,6 @@ func (s *RTCService) validate(r *http.Request) (livekit.RoomName, routing.Partic
func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// reject non websocket requests
if !websocket.IsWebSocketUpgrade(r) {
prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "error", "reject").Add(1)
w.WriteHeader(404)
return
}
@@ -177,10 +176,10 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !s.config.Room.AutoCreate {
_, _, err := s.store.LoadRoom(context.Background(), roomName, false)
if err == ErrRoomNotFound {
handleError(w, 404, err, loggerFields...)
handleError(w, http.StatusNotFound, err, loggerFields...)
return
} else if err != nil {
handleError(w, 500, err, loggerFields...)
handleError(w, http.StatusInternalServerError, err, loggerFields...)
return
}
}
@@ -188,7 +187,6 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// create room if it doesn't exist, also assigns an RTC node for the room
rm, err := s.roomAllocator.CreateRoom(r.Context(), &livekit.CreateRoomRequest{Name: string(roomName)})
if err != nil {
prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "error", "create_room").Add(1)
handleError(w, http.StatusInternalServerError, err, loggerFields...)
return
}
@@ -196,7 +194,6 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// this needs to be started first *before* using router functions on this node
connId, reqSink, resSource, err := s.router.StartParticipantSignal(r.Context(), roomName, pi)
if err != nil {
prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "error", "start_signal").Add(1)
handleError(w, http.StatusInternalServerError, err, loggerFields...)
return
}
@@ -206,11 +203,12 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// instead of waiting forever on the WebSocket
initialResponse, err := readInitialResponse(resSource, maxInitialResponseWait)
if err != nil {
prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "error", "initial_response").Add(1)
handleError(w, http.StatusInternalServerError, err, loggerFields...)
return
}
prometheus.IncrementParticipantJoin(1)
if !pi.Reconnect && initialResponse.GetJoin() != nil {
pi.ID = livekit.ParticipantID(initialResponse.GetJoin().GetParticipant().GetSid())
}
@@ -246,7 +244,6 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// upgrade only once the basics are good to go
conn, err := s.upgrader.Upgrade(w, r, nil)
if err != nil {
prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "error", "upgrade").Add(1)
handleError(w, http.StatusInternalServerError, err, loggerFields...)
return
}
@@ -261,8 +258,6 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
signalStats.AddBytes(uint64(count), true)
}
}
prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "success", "").Add(1)
pLogger.Infow("new client WS connected", "connID", connId)
// handle responses

View File

@@ -68,7 +68,7 @@ func (t *telemetryService) ParticipantJoined(
shouldSendEvent bool,
) {
t.enqueue(func() {
prometheus.IncrementParticipantJoin(1)
prometheus.IncrementParticipantJoin(1, true)
prometheus.AddParticipant()
t.createWorker(

View File

@@ -121,6 +121,11 @@ func GetUpdatedNodeStats(prev *livekit.NodeStats, prevAverage *livekit.NodeStats
retransmitBytesNow := retransmitBytes.Load()
retransmitPacketsNow := retransmitPackets.Load()
participantJoinNow := participantJoin.Load()
participantRTCNow := participantRTC.Load()
trackPublishAttemptsNow := trackPublishAttempts.Load()
trackPublishSuccessNow := trackPublishSuccess.Load()
trackSubscribeAttemptsNow := trackSubscribeAttempts.Load()
trackSubscribeSuccessNow := trackSubscribeSuccess.Load()
updatedAt := time.Now().Unix()
elapsed := updatedAt - prevAverage.UpdatedAt
@@ -136,37 +141,47 @@ func GetUpdatedNodeStats(prev *livekit.NodeStats, prevAverage *livekit.NodeStats
}
stats := &livekit.NodeStats{
StartedAt: prev.StartedAt,
UpdatedAt: updatedAt,
NumRooms: roomTotal.Load(),
NumClients: participantTotal.Load(),
NumTracksIn: trackPublishedTotal.Load(),
NumTracksOut: trackSubscribedTotal.Load(),
BytesIn: bytesInNow,
BytesOut: bytesOutNow,
PacketsIn: packetsInNow,
PacketsOut: packetsOutNow,
RetransmitBytesOut: retransmitBytesNow,
RetransmitPacketsOut: retransmitPacketsNow,
NackTotal: nackTotalNow,
ParticipantJoin: participantJoinNow,
BytesInPerSec: prevAverage.BytesInPerSec,
BytesOutPerSec: prevAverage.BytesOutPerSec,
PacketsInPerSec: prevAverage.PacketsInPerSec,
PacketsOutPerSec: prevAverage.PacketsOutPerSec,
RetransmitBytesOutPerSec: prevAverage.RetransmitBytesOutPerSec,
RetransmitPacketsOutPerSec: prevAverage.RetransmitPacketsOutPerSec,
NackPerSec: prevAverage.NackPerSec,
ParticipantJoinPerSec: prevAverage.ParticipantJoinPerSec,
NumCpus: numCPUs,
CpuLoad: cpuLoad,
MemoryTotal: memTotal,
MemoryUsed: memUsed,
LoadAvgLast1Min: float32(loadAvg.Loadavg1),
LoadAvgLast5Min: float32(loadAvg.Loadavg5),
LoadAvgLast15Min: float32(loadAvg.Loadavg15),
SysPacketsOut: sysPackets,
SysPacketsDropped: sysDroppedPackets,
StartedAt: prev.StartedAt,
UpdatedAt: updatedAt,
NumRooms: roomTotal.Load(),
NumClients: participantTotal.Load(),
NumTracksIn: trackPublishedTotal.Load(),
NumTracksOut: trackSubscribedTotal.Load(),
NumTrackPublishAttempts: trackPublishAttemptsNow,
NumTrackPublishSuccess: trackPublishSuccessNow,
NumTrackSubscribeAttempts: trackSubscribeAttemptsNow,
NumTrackSubscribeSuccess: trackSubscribeSuccessNow,
BytesIn: bytesInNow,
BytesOut: bytesOutNow,
PacketsIn: packetsInNow,
PacketsOut: packetsOutNow,
RetransmitBytesOut: retransmitBytesNow,
RetransmitPacketsOut: retransmitPacketsNow,
NackTotal: nackTotalNow,
ParticipantJoin: participantJoinNow,
ParticipantRtc: participantRTCNow,
BytesInPerSec: prevAverage.BytesInPerSec,
BytesOutPerSec: prevAverage.BytesOutPerSec,
PacketsInPerSec: prevAverage.PacketsInPerSec,
PacketsOutPerSec: prevAverage.PacketsOutPerSec,
RetransmitBytesOutPerSec: prevAverage.RetransmitBytesOutPerSec,
RetransmitPacketsOutPerSec: prevAverage.RetransmitPacketsOutPerSec,
NackPerSec: prevAverage.NackPerSec,
ParticipantJoinPerSec: prevAverage.ParticipantJoinPerSec,
ParticipantRtcPerSec: prevAverage.ParticipantRtcPerSec,
NumCpus: numCPUs,
CpuLoad: cpuLoad,
MemoryTotal: memTotal,
MemoryUsed: memUsed,
LoadAvgLast1Min: float32(loadAvg.Loadavg1),
LoadAvgLast5Min: float32(loadAvg.Loadavg5),
LoadAvgLast15Min: float32(loadAvg.Loadavg15),
SysPacketsOut: sysPackets,
SysPacketsDropped: sysDroppedPackets,
TrackPublishAttemptsPerSec: prevAverage.TrackPublishAttemptsPerSec,
TrackPublishSuccessPerSec: prevAverage.TrackPublishSuccessPerSec,
TrackSubscribeAttemptsPerSec: prevAverage.TrackSubscribeAttemptsPerSec,
TrackSubscribeSuccessPerSec: prevAverage.TrackSubscribeSuccessPerSec,
}
// update stats
@@ -179,8 +194,13 @@ func GetUpdatedNodeStats(prev *livekit.NodeStats, prevAverage *livekit.NodeStats
stats.RetransmitPacketsOutPerSec = perSec(prevAverage.RetransmitPacketsOut, retransmitPacketsNow, elapsed)
stats.NackPerSec = perSec(prevAverage.NackTotal, nackTotalNow, elapsed)
stats.ParticipantJoinPerSec = perSec(prevAverage.ParticipantJoin, participantJoinNow, elapsed)
stats.SysPacketsOutPerSec = perSec(uint64(prev.SysPacketsOut), uint64(sysPackets), elapsed)
stats.SysPacketsDroppedPerSec = perSec(uint64(prev.SysPacketsDropped), uint64(sysDroppedPackets), elapsed)
stats.ParticipantRtcPerSec = perSec(prevAverage.ParticipantRtc, participantRTCNow, elapsed)
stats.SysPacketsOutPerSec = perSec(uint64(prevAverage.SysPacketsOut), uint64(sysPackets), elapsed)
stats.SysPacketsDroppedPerSec = perSec(uint64(prevAverage.SysPacketsDropped), uint64(sysDroppedPackets), elapsed)
stats.TrackPublishAttemptsPerSec = perSec(uint64(prevAverage.NumTrackPublishAttempts), uint64(trackPublishAttemptsNow), elapsed)
stats.TrackPublishSuccessPerSec = perSec(uint64(prevAverage.NumTrackPublishSuccess), uint64(trackPublishSuccessNow), elapsed)
stats.TrackSubscribeAttemptsPerSec = perSec(uint64(prevAverage.NumTrackSubscribeAttempts), uint64(trackSubscribeAttemptsNow), elapsed)
stats.TrackSubscribeSuccessPerSec = perSec(uint64(prevAverage.NumTrackSubscribeSuccess), uint64(trackSubscribeSuccessNow), elapsed)
packetTotal := stats.SysPacketsOutPerSec + stats.SysPacketsDroppedPerSec
if packetTotal == 0 {

View File

@@ -25,6 +25,7 @@ var (
retransmitBytes atomic.Uint64
retransmitPackets atomic.Uint64
participantJoin atomic.Uint64
participantRTC atomic.Uint64
promPacketLabels = []string{"direction", "transmission"}
promPacketTotal *prometheus.CounterVec
@@ -73,7 +74,7 @@ func initPacketStats(nodeID string, nodeType livekit.NodeType) {
Subsystem: "participant_join",
Name: "total",
ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()},
}, nil)
}, []string{"state"})
promConnections = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: livekitNamespace,
Subsystem: "connection",
@@ -133,10 +134,15 @@ func IncrementRTCP(direction Direction, nack, pli, fir uint32) {
}
}
func IncrementParticipantJoin(join uint32) {
func IncrementParticipantJoin(join uint32, rtcConnected ...bool) {
if join > 0 {
promParticipantJoin.WithLabelValues().Add(float64(join))
participantJoin.Add(uint64(join))
if len(rtcConnected) > 0 && rtcConnected[0] {
participantRTC.Add(uint64(join))
promParticipantJoin.WithLabelValues("rtc_connected").Add(float64(join))
} else {
participantJoin.Add(uint64(join))
promParticipantJoin.WithLabelValues("signal_connected").Add(float64(join))
}
}
}

View File

@@ -10,16 +10,22 @@ import (
)
var (
roomTotal atomic.Int32
participantTotal atomic.Int32
trackPublishedTotal atomic.Int32
trackSubscribedTotal atomic.Int32
roomTotal atomic.Int32
participantTotal atomic.Int32
trackPublishedTotal atomic.Int32
trackSubscribedTotal atomic.Int32
trackPublishAttempts atomic.Int32
trackPublishSuccess atomic.Int32
trackSubscribeAttempts atomic.Int32
trackSubscribeSuccess atomic.Int32
promRoomTotal prometheus.Gauge
promRoomDuration prometheus.Histogram
promParticipantTotal prometheus.Gauge
promTrackPublishedTotal *prometheus.GaugeVec
promTrackSubscribedTotal *prometheus.GaugeVec
promRoomTotal prometheus.Gauge
promRoomDuration prometheus.Histogram
promParticipantTotal prometheus.Gauge
promTrackPublishedTotal *prometheus.GaugeVec
promTrackSubscribedTotal *prometheus.GaugeVec
promTrackPublishCounter *prometheus.CounterVec
promTrackSubscribeCounter *prometheus.CounterVec
)
func initRoomStats(nodeID string, nodeType livekit.NodeType) {
@@ -56,12 +62,26 @@ func initRoomStats(nodeID string, nodeType livekit.NodeType) {
Name: "subscribed_total",
ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()},
}, []string{"kind"})
promTrackPublishCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "track",
Name: "publish_counter",
ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()},
}, []string{"kind", "state"})
promTrackSubscribeCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "track",
Name: "subscribe_counter",
ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()},
}, []string{"kind", "state"})
prometheus.MustRegister(promRoomTotal)
prometheus.MustRegister(promRoomDuration)
prometheus.MustRegister(promParticipantTotal)
prometheus.MustRegister(promTrackPublishedTotal)
prometheus.MustRegister(promTrackSubscribedTotal)
prometheus.MustRegister(promTrackPublishCounter)
prometheus.MustRegister(promTrackSubscribeCounter)
}
func RoomStarted() {
@@ -97,6 +117,16 @@ func SubPublishedTrack(kind string) {
trackPublishedTotal.Dec()
}
// AddPublishAttempt records a single track publish attempt for the given
// track kind. It bumps the package-level attempt tally and then the
// "attempt" series of the track publish counter vector.
func AddPublishAttempt(kind string) {
	trackPublishAttempts.Add(1)
	attemptSeries := promTrackPublishCounter.WithLabelValues(kind, "attempt")
	attemptSeries.Add(1)
}
// AddPublishSuccess records a single successful track publish for the given
// track kind. It bumps the package-level success tally and then the
// "success" series of the track publish counter vector.
func AddPublishSuccess(kind string) {
	trackPublishSuccess.Add(1)
	successSeries := promTrackPublishCounter.WithLabelValues(kind, "success")
	successSeries.Add(1)
}
func AddSubscribedTrack(kind string) {
promTrackSubscribedTotal.WithLabelValues(kind).Add(1)
trackSubscribedTotal.Inc()
@@ -106,3 +136,13 @@ func SubSubscribedTrack(kind string) {
promTrackSubscribedTotal.WithLabelValues(kind).Sub(1)
trackSubscribedTotal.Dec()
}
// AddSubscribeAttempt records a single track subscribe attempt for the given
// track kind. It bumps the package-level attempt tally and then the
// "attempt" series of the track subscribe counter vector.
func AddSubscribeAttempt(kind string) {
	trackSubscribeAttempts.Add(1)
	attemptSeries := promTrackSubscribeCounter.WithLabelValues(kind, "attempt")
	attemptSeries.Add(1)
}
// AddSubscribeSuccess records a single successful track subscription for the
// given track kind. It bumps the package-level success tally and then the
// "success" series of the track subscribe counter vector.
func AddSubscribeSuccess(kind string) {
	trackSubscribeSuccess.Add(1)
	successSeries := promTrackSubscribeCounter.WithLabelValues(kind, "success")
	successSeries.Add(1)
}