mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 13:25:42 +00:00
290 lines
11 KiB
Go
290 lines
11 KiB
Go
// Copyright 2023 LiveKit, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package prometheus
|
|
|
|
import (
|
|
"time"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/twitchtv/twirp"
|
|
"go.uber.org/atomic"
|
|
|
|
"github.com/livekit/protocol/livekit"
|
|
"github.com/livekit/protocol/rpc"
|
|
"github.com/livekit/protocol/utils/hwstats"
|
|
"github.com/livekit/protocol/webhook"
|
|
)
|
|
|
|
const (
|
|
livekitNamespace string = "livekit"
|
|
)
|
|
|
|
var (
|
|
initialized atomic.Bool
|
|
|
|
promMessageCounter *prometheus.CounterVec
|
|
promServiceOperationCounter *prometheus.CounterVec
|
|
promTwirpRequestStatusCounter *prometheus.CounterVec
|
|
|
|
sysPacketsStart uint32
|
|
sysDroppedPacketsStart uint32
|
|
promSysPacketGauge *prometheus.GaugeVec
|
|
|
|
cpuStats *hwstats.CPUStats
|
|
memoryStats *hwstats.MemoryStats
|
|
)
|
|
|
|
func Init(nodeID string, nodeType livekit.NodeType) error {
|
|
if initialized.Swap(true) {
|
|
return nil
|
|
}
|
|
|
|
promMessageCounter = prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Namespace: livekitNamespace,
|
|
Subsystem: "node",
|
|
Name: "messages",
|
|
ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()},
|
|
},
|
|
[]string{"type", "status", "direction"},
|
|
)
|
|
|
|
promServiceOperationCounter = prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Namespace: livekitNamespace,
|
|
Subsystem: "node",
|
|
Name: "service_operation",
|
|
ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()},
|
|
},
|
|
[]string{"type", "status", "error_type"},
|
|
)
|
|
|
|
promTwirpRequestStatusCounter = prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Namespace: livekitNamespace,
|
|
Subsystem: "node",
|
|
Name: "twirp_request_status",
|
|
ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()},
|
|
},
|
|
[]string{"service", "method", "status", "code"},
|
|
)
|
|
|
|
promSysPacketGauge = prometheus.NewGaugeVec(
|
|
prometheus.GaugeOpts{
|
|
Namespace: livekitNamespace,
|
|
Subsystem: "node",
|
|
Name: "packet_total",
|
|
ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()},
|
|
Help: "System level packet count. Count starts at 0 when service is first started.",
|
|
},
|
|
[]string{"type"},
|
|
)
|
|
|
|
prometheus.MustRegister(promMessageCounter)
|
|
prometheus.MustRegister(promServiceOperationCounter)
|
|
prometheus.MustRegister(promTwirpRequestStatusCounter)
|
|
prometheus.MustRegister(promSysPacketGauge)
|
|
|
|
sysPacketsStart, sysDroppedPacketsStart, _ = getTCStats()
|
|
|
|
initPacketStats(nodeID, nodeType)
|
|
initRoomStats(nodeID, nodeType)
|
|
rpc.InitPSRPCStats(prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()})
|
|
webhook.InitWebhookStats(prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()})
|
|
initQualityStats(nodeID, nodeType)
|
|
initDataPacketStats(nodeID, nodeType)
|
|
initDebugStats(nodeID, nodeType)
|
|
|
|
var err error
|
|
cpuStats, err = hwstats.NewCPUStats(nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
memoryStats, err = hwstats.NewMemoryStats()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func GetNodeStats(nodeStartedAt int64, prevStats []*livekit.NodeStats, rateIntervals []time.Duration) (*livekit.NodeStats, error) {
|
|
loadAvg, err := getLoadAvg()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// On MacOS, get "\"vm_stat\": executable file not found in $PATH" although it is in /usr/bin
|
|
// So, do not error out. Use the information if it is available.
|
|
memUsed, memTotal, _ := memoryStats.GetMemory()
|
|
|
|
sysPackets, sysDroppedPackets, _ := getTCStats()
|
|
promSysPacketGauge.WithLabelValues("out").Set(float64(sysPackets - sysPacketsStart))
|
|
promSysPacketGauge.WithLabelValues("dropped").Set(float64(sysDroppedPackets - sysDroppedPacketsStart))
|
|
|
|
stats := &livekit.NodeStats{
|
|
StartedAt: nodeStartedAt,
|
|
UpdatedAt: time.Now().Unix(),
|
|
NumRooms: roomCurrent.Load(),
|
|
NumClients: participantCurrent.Load(),
|
|
NumTracksIn: trackPublishedCurrent.Load(),
|
|
NumTracksOut: trackSubscribedCurrent.Load(),
|
|
NumTrackPublishAttempts: trackPublishAttempts.Load(),
|
|
NumTrackPublishSuccess: trackPublishSuccess.Load(),
|
|
NumTrackPublishCancels: trackPublishCancels.Load(),
|
|
NumTrackSubscribeAttempts: trackSubscribeAttempts.Load(),
|
|
NumTrackSubscribeSuccess: trackSubscribeSuccess.Load(),
|
|
NumTrackSubscribeCancels: trackSubscribeCancels.Load(),
|
|
BytesIn: bytesIn.Load(),
|
|
BytesOut: bytesOut.Load(),
|
|
PacketsIn: packetsIn.Load(),
|
|
PacketsOut: packetsOut.Load(),
|
|
RetransmitBytesOut: retransmitBytes.Load(),
|
|
RetransmitPacketsOut: retransmitPackets.Load(),
|
|
NackTotal: nackTotal.Load(),
|
|
ParticipantSignalConnected: participantSignalConnected.Load(),
|
|
ParticipantRtcInit: participantRTCInit.Load(),
|
|
ParticipantRtcConnected: participantRTCConnected.Load(),
|
|
ParticipantRtcCanceled: participantRTCCanceled.Load(),
|
|
ForwardLatency: forwardLatency.Load(),
|
|
ForwardJitter: forwardJitter.Load(),
|
|
NumCpus: uint32(cpuStats.NumCPU()), // this will round down to the nearest integer
|
|
CpuLoad: float32(cpuStats.GetCPULoad()),
|
|
MemoryTotal: memTotal,
|
|
MemoryUsed: memUsed,
|
|
LoadAvgLast1Min: float32(loadAvg.Loadavg1),
|
|
LoadAvgLast5Min: float32(loadAvg.Loadavg5),
|
|
LoadAvgLast15Min: float32(loadAvg.Loadavg15),
|
|
SysPacketsOut: sysPackets,
|
|
SysPacketsDropped: sysDroppedPackets,
|
|
}
|
|
|
|
for _, rateInterval := range rateIntervals {
|
|
for idx := len(prevStats) - 1; idx >= 0; idx-- {
|
|
prev := prevStats[idx]
|
|
if prev == nil {
|
|
continue
|
|
}
|
|
|
|
if stats.UpdatedAt-prev.UpdatedAt >= int64(rateInterval.Seconds()) {
|
|
if rate := getNodeStatsRate(append(prevStats[idx:], stats)); rate != nil {
|
|
stats.Rates = append(stats.Rates, rate)
|
|
}
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
return stats, nil
|
|
}
|
|
|
|
func getNodeStatsRate(statsHistory []*livekit.NodeStats) *livekit.NodeStatsRate {
|
|
if len(statsHistory) == 0 {
|
|
return nil
|
|
}
|
|
|
|
elapsed := statsHistory[len(statsHistory)-1].UpdatedAt - statsHistory[0].UpdatedAt
|
|
if elapsed <= 0 {
|
|
return nil
|
|
}
|
|
|
|
// time weighted averages
|
|
var cpuLoad, memoryUsed, memoryTotal, memoryLoad float32
|
|
for idx := len(statsHistory) - 1; idx > 0; idx-- {
|
|
stats := statsHistory[idx]
|
|
prevStats := statsHistory[idx-1]
|
|
if stats == nil || prevStats == nil {
|
|
continue
|
|
}
|
|
|
|
spanElapsed := stats.UpdatedAt - prevStats.UpdatedAt
|
|
if spanElapsed <= 0 {
|
|
continue
|
|
}
|
|
|
|
cpuLoad += stats.CpuLoad * float32(spanElapsed)
|
|
memoryUsed += float32(stats.MemoryUsed) * float32(spanElapsed)
|
|
memoryTotal += float32(stats.MemoryTotal) * float32(spanElapsed)
|
|
if stats.MemoryTotal > 0 {
|
|
memoryLoad += float32(stats.MemoryUsed) / float32(stats.MemoryTotal) * float32(spanElapsed)
|
|
}
|
|
}
|
|
|
|
earlier := statsHistory[0]
|
|
later := statsHistory[len(statsHistory)-1]
|
|
rate := &livekit.NodeStatsRate{
|
|
StartedAt: earlier.UpdatedAt,
|
|
EndedAt: later.UpdatedAt,
|
|
Duration: elapsed,
|
|
BytesIn: perSec(earlier.BytesIn, later.BytesIn, elapsed),
|
|
BytesOut: perSec(earlier.BytesOut, later.BytesOut, elapsed),
|
|
PacketsIn: perSec(earlier.PacketsIn, later.PacketsIn, elapsed),
|
|
PacketsOut: perSec(earlier.PacketsOut, later.PacketsOut, elapsed),
|
|
RetransmitBytesOut: perSec(earlier.RetransmitBytesOut, later.RetransmitBytesOut, elapsed),
|
|
RetransmitPacketsOut: perSec(earlier.RetransmitPacketsOut, later.RetransmitPacketsOut, elapsed),
|
|
NackTotal: perSec(earlier.NackTotal, later.NackTotal, elapsed),
|
|
ParticipantSignalConnected: perSec(earlier.ParticipantSignalConnected, later.ParticipantSignalConnected, elapsed),
|
|
ParticipantRtcInit: perSec(earlier.ParticipantRtcInit, later.ParticipantRtcInit, elapsed),
|
|
ParticipantRtcConnected: perSec(earlier.ParticipantRtcConnected, later.ParticipantRtcConnected, elapsed),
|
|
ParticipantRtcCanceled: perSec(earlier.ParticipantRtcCanceled, later.ParticipantRtcCanceled, elapsed),
|
|
SysPacketsOut: perSec(uint64(earlier.SysPacketsOut), uint64(later.SysPacketsOut), elapsed),
|
|
SysPacketsDropped: perSec(uint64(earlier.SysPacketsDropped), uint64(later.SysPacketsDropped), elapsed),
|
|
TrackPublishAttempts: perSec(uint64(earlier.NumTrackPublishAttempts), uint64(later.NumTrackPublishAttempts), elapsed),
|
|
TrackPublishSuccess: perSec(uint64(earlier.NumTrackPublishSuccess), uint64(later.NumTrackPublishSuccess), elapsed),
|
|
TrackPublishCancels: perSec(uint64(earlier.NumTrackPublishCancels), uint64(later.NumTrackPublishCancels), elapsed),
|
|
TrackSubscribeAttempts: perSec(uint64(earlier.NumTrackSubscribeAttempts), uint64(later.NumTrackSubscribeAttempts), elapsed),
|
|
TrackSubscribeSuccess: perSec(uint64(earlier.NumTrackSubscribeSuccess), uint64(later.NumTrackSubscribeSuccess), elapsed),
|
|
TrackSubscribeCancels: perSec(uint64(earlier.NumTrackSubscribeCancels), uint64(later.NumTrackSubscribeCancels), elapsed),
|
|
CpuLoad: cpuLoad / float32(elapsed),
|
|
MemoryLoad: memoryLoad / float32(elapsed),
|
|
MemoryUsed: memoryUsed / float32(elapsed),
|
|
MemoryTotal: memoryTotal / float32(elapsed),
|
|
}
|
|
return rate
|
|
}
|
|
|
|
func perSec(prev, curr uint64, secs int64) float32 {
|
|
return float32(curr-prev) / float32(secs)
|
|
}
|
|
|
|
func RecordSignalRequestSuccess() {
|
|
promMessageCounter.WithLabelValues("signal", "success", "request").Add(1)
|
|
}
|
|
|
|
func RecordSignalRequestFailure() {
|
|
promMessageCounter.WithLabelValues("signal", "failure", "request").Add(1)
|
|
}
|
|
|
|
func RecordSignalResponseSuccess() {
|
|
promMessageCounter.WithLabelValues("signal", "success", "response").Add(1)
|
|
}
|
|
|
|
func RecordSignalResponseFailure() {
|
|
promMessageCounter.WithLabelValues("signal", "failure", "response").Add(1)
|
|
}
|
|
|
|
func RecordServiceOperationSuccess(op string) {
|
|
promServiceOperationCounter.WithLabelValues(op, "success", "").Add(1)
|
|
}
|
|
|
|
func RecordServiceOperationError(op string, error string) {
|
|
promServiceOperationCounter.WithLabelValues(op, "error", error).Add(1)
|
|
}
|
|
|
|
func RecordTwirpRequestStatus(service string, method string, statusFamily string, code twirp.ErrorCode) {
|
|
promTwirpRequestStatusCounter.WithLabelValues(service, method, statusFamily, string(code)).Add(1)
|
|
}
|