diff --git a/pkg/routing/redisrouter.go b/pkg/routing/redisrouter.go index 51486e1eb..def478c35 100644 --- a/pkg/routing/redisrouter.go +++ b/pkg/routing/redisrouter.go @@ -344,22 +344,28 @@ func (r *RedisRouter) redisWorker(startedChan chan struct{}) { sm := livekit.SignalNodeMessage{} if err := proto.Unmarshal([]byte(msg.Payload), &sm); err != nil { logger.Errorw("could not unmarshal signal message on sigchan", err) + stats.PromMessageCounter.WithLabelValues("signal", "failure").Add(1) continue } if err := r.handleSignalMessage(&sm); err != nil { logger.Errorw("error processing signal message", err) + stats.PromMessageCounter.WithLabelValues("signal", "failure").Add(1) continue } + stats.PromMessageCounter.WithLabelValues("signal", "success").Add(1) } else if msg.Channel == rtcChannel { rm := livekit.RTCNodeMessage{} if err := proto.Unmarshal([]byte(msg.Payload), &rm); err != nil { logger.Errorw("could not unmarshal RTC message on rtcchan", err) + stats.PromMessageCounter.WithLabelValues("rtc", "failure").Add(1) continue } if err := r.handleRTCMessage(&rm); err != nil { logger.Errorw("error processing RTC message", err) + stats.PromMessageCounter.WithLabelValues("rtc", "failure").Add(1) continue } + stats.PromMessageCounter.WithLabelValues("rtc", "success").Add(1) } } } diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index 90a9e9404..1a6d91519 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -119,7 +119,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { done := make(chan struct{}) // function exits when websocket terminates, it'll close the event reading off of response sink as well defer func() { - logger.Infow("WS connection closed", "participant", pi.Identity, "connID", connId) + logger.Infow("server closing WS connection", "participant", pi.Identity, "connID", connId) reqSink.Close() close(done) }() diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index ff419d8ce..06ea1b929 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -1,7 +1,8 @@ // Code generated by Wire. DO NOT EDIT. //go:generate go run github.com/google/wire/cmd/wire -//+build !wireinject +//go:build !wireinject +// +build !wireinject package service diff --git a/pkg/utils/stats/packetstats.go b/pkg/utils/stats/packetstats.go index 3dd3106e9..ae4971c84 100644 --- a/pkg/utils/stats/packetstats.go +++ b/pkg/utils/stats/packetstats.go @@ -15,31 +15,33 @@ var ( atomicPacketsOut uint64 atomicNackTotal uint64 + promPacketLabels = []string{"direction"} + promPacketTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: livekitNamespace, Subsystem: "packet", Name: "total", - }, promLabels) + }, promPacketLabels) promPacketBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: livekitNamespace, Subsystem: "packet", Name: "bytes", - }, promLabels) + }, promPacketLabels) promNackTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: livekitNamespace, Subsystem: "nack", Name: "total", - }, promLabels) + }, promPacketLabels) promPliTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: livekitNamespace, Subsystem: "pli", Name: "total", - }, promLabels) + }, promPacketLabels) promFirTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: livekitNamespace, Subsystem: "fir", Name: "total", - }, promLabels) + }, promPacketLabels) ) func initPacketStats() { diff --git a/pkg/utils/stats/stats.go b/pkg/utils/stats/stats.go index 1d96af2b5..75c18e918 100644 --- a/pkg/utils/stats/stats.go +++ b/pkg/utils/stats/stats.go @@ -9,15 +9,25 @@ import ( "github.com/pion/rtcp" "github.com/pion/rtp" "github.com/pion/transport/packetio" + "github.com/prometheus/client_golang/prometheus" ) const livekitNamespace = "livekit" var ( - promLabels = []string{"direction"} + PromMessageCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: livekitNamespace, + Subsystem: "node", + Name: "messages", + }, + []string{"type", "status"}, + ) ) func init() { + prometheus.MustRegister(PromMessageCounter) + initPacketStats() initRoomStatsReporter() }