Prometheus counter for Signal/RTC messages (#139)

This commit is contained in:
Mathew Kamkar
2021-10-08 12:02:08 -07:00
committed by GitHub
parent 2c9ef2f6bb
commit b212fb9a9e
5 changed files with 27 additions and 8 deletions
+6
View File
@@ -344,22 +344,28 @@ func (r *RedisRouter) redisWorker(startedChan chan struct{}) {
sm := livekit.SignalNodeMessage{}
if err := proto.Unmarshal([]byte(msg.Payload), &sm); err != nil {
logger.Errorw("could not unmarshal signal message on sigchan", err)
stats.PromMessageCounter.WithLabelValues("signal", "failure").Add(1)
continue
}
if err := r.handleSignalMessage(&sm); err != nil {
logger.Errorw("error processing signal message", err)
stats.PromMessageCounter.WithLabelValues("signal", "failure").Add(1)
continue
}
stats.PromMessageCounter.WithLabelValues("signal", "success").Add(1)
} else if msg.Channel == rtcChannel {
rm := livekit.RTCNodeMessage{}
if err := proto.Unmarshal([]byte(msg.Payload), &rm); err != nil {
logger.Errorw("could not unmarshal RTC message on rtcchan", err)
stats.PromMessageCounter.WithLabelValues("rtc", "failure").Add(1)
continue
}
if err := r.handleRTCMessage(&rm); err != nil {
logger.Errorw("error processing RTC message", err)
stats.PromMessageCounter.WithLabelValues("rtc", "failure").Add(1)
continue
}
stats.PromMessageCounter.WithLabelValues("rtc", "success").Add(1)
}
}
}
+1 -1
View File
@@ -119,7 +119,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
done := make(chan struct{})
// function exits when websocket terminates, it'll close the event reading off of response sink as well
defer func() {
logger.Infow("WS connection closed", "participant", pi.Identity, "connID", connId)
logger.Infow("server closing WS connection", "participant", pi.Identity, "connID", connId)
reqSink.Close()
close(done)
}()
+2 -1
View File
@@ -1,7 +1,8 @@
// Code generated by Wire. DO NOT EDIT.
//go:generate go run github.com/google/wire/cmd/wire
//+build !wireinject
//go:build !wireinject
// +build !wireinject
package service
+7 -5
View File
@@ -15,31 +15,33 @@ var (
atomicPacketsOut uint64
atomicNackTotal uint64
promPacketLabels = []string{"direction"}
promPacketTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "packet",
Name: "total",
}, promLabels)
}, promPacketLabels)
promPacketBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "packet",
Name: "bytes",
}, promLabels)
}, promPacketLabels)
promNackTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "nack",
Name: "total",
}, promLabels)
}, promPacketLabels)
promPliTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "pli",
Name: "total",
}, promLabels)
}, promPacketLabels)
promFirTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "fir",
Name: "total",
}, promLabels)
}, promPacketLabels)
)
func initPacketStats() {
+11 -1
View File
@@ -9,15 +9,25 @@ import (
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/transport/packetio"
"github.com/prometheus/client_golang/prometheus"
)
const livekitNamespace = "livekit"
var (
promLabels = []string{"direction"}
PromMessageCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "node",
Name: "messages",
},
[]string{"type", "status"},
)
)
func init() {
prometheus.MustRegister(PromMessageCounter)
initPacketStats()
initRoomStatsReporter()
}