diff --git a/pkg/routing/interfaces.go b/pkg/routing/interfaces.go index 8355eed3d..04565b321 100644 --- a/pkg/routing/interfaces.go +++ b/pkg/routing/interfaces.go @@ -95,7 +95,7 @@ type NullMessageSource struct { func NewNullMessageSource(connID livekit.ConnectionID) *NullMessageSource { return &NullMessageSource{ connID: connID, - msgChan: make(chan proto.Message, 0), + msgChan: make(chan proto.Message), } } diff --git a/pkg/routing/signal.go b/pkg/routing/signal.go index 3f296b931..4aaa9a877 100644 --- a/pkg/routing/signal.go +++ b/pkg/routing/signal.go @@ -102,14 +102,14 @@ func (r *signalClient) StartParticipantSignal( stream, err := r.client.RelaySignal(ctx, nodeID) if err != nil { - prometheus.MessageCounter.WithLabelValues("signal", "failure").Add(1) + prometheus.RecordSignalRequestFailure() return } err = stream.Send(&rpc.RelaySignalRequest{StartSession: ss}) if err != nil { stream.Close(err) - prometheus.MessageCounter.WithLabelValues("signal", "failure").Add(1) + prometheus.RecordSignalRequestFailure() return } @@ -133,6 +133,8 @@ func (r *signalClient) StartParticipantSignal( resChan, signalResponseMessageReader{}, r.config, + prometheus.RecordSignalRequestSuccess, + prometheus.RecordSignalRequestFailure, ) l.Debugw("signal stream closed", "error", err) @@ -191,6 +193,8 @@ func CopySignalStreamToMessageChannel[SendType, RecvType RelaySignalMessage]( ch *MessageChannel, reader SignalMessageReader[RecvType], config config.SignalRelayConfig, + promSignalSuccess func(), + promSignalFailure func(), ) error { r := &signalMessageReader[SendType, RecvType]{ reader: reader, @@ -199,16 +203,16 @@ func CopySignalStreamToMessageChannel[SendType, RecvType RelaySignalMessage]( for msg := range stream.Channel() { res, err := r.Read(msg) if err != nil { - prometheus.MessageCounter.WithLabelValues("signal", "failure").Add(1) + promSignalFailure() return err } for _, r := range res { if err = ch.WriteMessage(r); err != nil { - prometheus.MessageCounter.WithLabelValues("signal", "failure").Add(1) + promSignalFailure() return err } - prometheus.MessageCounter.WithLabelValues("signal", "success").Add(1) + promSignalSuccess() } if msg.GetClose() { diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 0d834e409..ab8f7ebbd 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -595,7 +595,7 @@ func (r *Room) Join( joinResponse := r.createJoinResponseLocked(participant, iceServers) if err := participant.SendJoinResponse(joinResponse); err != nil { - prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "send_response").Add(1) + prometheus.RecordServiceOperationError("participant_join", "send_response") return err } @@ -617,7 +617,7 @@ func (r *Room) Join( } } - prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "success", "").Add(1) + prometheus.RecordServiceOperationSuccess("participant_join") return nil } diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index d585b8ade..3fdbe0f12 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -756,7 +756,7 @@ func (t *PCTransport) setConnectedAt(at time.Time) bool { } t.firstConnectedAt = at - prometheus.ServiceOperationCounter.WithLabelValues("peer_connection", "success", "").Add(1) + prometheus.RecordServiceOperationSuccess("peer_connection") t.lock.Unlock() return true } @@ -2472,7 +2472,7 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error { return nil } - prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "create").Add(1) + prometheus.RecordServiceOperationError("offer", "create") return errors.Wrap(err, "create offer failed") } @@ -2488,7 +2488,7 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error { return nil } - prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "local_description").Add(1) + prometheus.RecordServiceOperationError("offer", "local_description") return errors.Wrap(err, "setting local description failed") } @@ -2518,10 +2518,10 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error { } if err := t.params.Handler.OnOffer(offer, t.localOfferId.Inc()); err != nil { - prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "write_message").Add(1) + prometheus.RecordServiceOperationError("offer", "write_message") return errors.Wrap(err, "could not send offer") } - prometheus.ServiceOperationCounter.WithLabelValues("offer", "success", "").Add(1) + prometheus.RecordServiceOperationSuccess("offer") return t.localDescriptionSent() } @@ -2581,7 +2581,7 @@ func (t *PCTransport) setRemoteDescription(sd webrtc.SessionDescription) error { if sd.Type == webrtc.SDPTypeAnswer { sdpType = "answer" } - prometheus.ServiceOperationCounter.WithLabelValues(sdpType, "error", "remote_description").Add(1) + prometheus.RecordServiceOperationError(sdpType, "remote_description") return errors.Wrap(err, "setting remote description failed") } else if sd.Type == webrtc.SDPTypeAnswer { t.lock.Lock() @@ -2619,7 +2619,7 @@ func (t *PCTransport) createAndSendAnswer() error { return nil } - prometheus.ServiceOperationCounter.WithLabelValues("answer", "error", "create").Add(1) + prometheus.RecordServiceOperationError("answer", "create") return errors.Wrap(err, "create answer failed") } @@ -2629,7 +2629,7 @@ func (t *PCTransport) createAndSendAnswer() error { } if err = t.pc.SetLocalDescription(answer); err != nil { - prometheus.ServiceOperationCounter.WithLabelValues("answer", "error", "local_description").Add(1) + prometheus.RecordServiceOperationError("answer", "local_description") return errors.Wrap(err, "setting local description failed") } @@ -2655,11 +2655,11 @@ func (t *PCTransport) createAndSendAnswer() error { answerId := t.remoteOfferId.Load() if err := t.params.Handler.OnAnswer(answer, answerId); err != nil { - prometheus.ServiceOperationCounter.WithLabelValues("answer", "error", "write_message").Add(1) + prometheus.RecordServiceOperationError("answer", "write_message") return errors.Wrap(err, "could not send answer") } t.localAnswerId.Store(answerId) - prometheus.ServiceOperationCounter.WithLabelValues("answer", "success", "").Add(1) + prometheus.RecordServiceOperationSuccess("asnwer") if err := t.sendUnmatchedMediaRequirement(false); err != nil { return err @@ -2838,9 +2838,9 @@ func (t *PCTransport) doICERestart() error { err := t.params.Handler.OnOffer(*offer, t.localOfferId.Inc()) if err != nil { - prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "write_message").Add(1) + prometheus.RecordServiceOperationError("offer", "write_message") } else { - prometheus.ServiceOperationCounter.WithLabelValues("offer", "success", "").Add(1) + prometheus.RecordServiceOperationSuccess("offer") } return err } @@ -2848,7 +2848,7 @@ func (t *PCTransport) doICERestart() error { // recover by re-applying the last answer t.params.Logger.Infow("recovering from client negotiation state on ICE restart") if err := t.pc.SetRemoteDescription(*currentRemoteDescription); err != nil { - prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "remote_description").Add(1) + prometheus.RecordServiceOperationError("offer", "remote_description") return errors.Wrap(err, "set remote description failed") } else { t.setNegotiationState(transport.NegotiationStateNone) diff --git a/pkg/service/localstore.go b/pkg/service/localstore.go index b6de883bd..64bd508c6 100644 --- a/pkg/service/localstore.go +++ b/pkg/service/localstore.go @@ -229,10 +229,8 @@ func (s *LocalStore) ListAgentDispatches(ctx context.Context, roomName livekit.R agentJobs := s.agentJobs[roomName] var js []*livekit.Job - if agentJobs != nil { - for _, j := range agentJobs { - js = append(js, utils.CloneProto(j)) - } + for _, j := range agentJobs { + js = append(js, utils.CloneProto(j)) } var ds []*livekit.AgentDispatch diff --git a/pkg/service/signal.go b/pkg/service/signal.go index 6000b2c36..f6c9316eb 100644 --- a/pkg/service/signal.go +++ b/pkg/service/signal.go @@ -168,6 +168,8 @@ func (r *signalService) RelaySignal(stream psrpc.ServerStream[*rpc.RelaySignalRe reqChan, signalRequestMessageReader{}, r.config, + prometheus.RecordSignalResponseSuccess, + prometheus.RecordSignalResponseFailure, ) l.Debugw("signal stream closed", "error", err) diff --git a/pkg/service/twirp.go b/pkg/service/twirp.go index c7ff5b96c..2226c64d3 100644 --- a/pkg/service/twirp.go +++ b/pkg/service/twirp.go @@ -203,7 +203,7 @@ func statusReporterResponseSent(ctx context.Context) { code = r.error.Code() } - prometheus.TwirpRequestStatusCounter.WithLabelValues(r.service, r.method, statusFamily, string(code)).Add(1) + prometheus.RecordTwirpRequestStatus(r.service, r.method, statusFamily, code) } func statusReporterErrorReceived(ctx context.Context, e twirp.Error) context.Context { diff --git a/pkg/telemetry/prometheus/node.go b/pkg/telemetry/prometheus/node.go index e455322b0..5668db3c4 100644 --- a/pkg/telemetry/prometheus/node.go +++ b/pkg/telemetry/prometheus/node.go @@ -18,6 +18,7 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "github.com/twitchtv/twirp" "go.uber.org/atomic" "github.com/livekit/protocol/livekit" @@ -33,15 +34,13 @@ const ( var ( initialized atomic.Bool - MessageCounter *prometheus.CounterVec - MessageBytes *prometheus.CounterVec - ServiceOperationCounter *prometheus.CounterVec - TwirpRequestStatusCounter *prometheus.CounterVec + promMessageCounter *prometheus.CounterVec + promServiceOperationCounter *prometheus.CounterVec + promTwirpRequestStatusCounter *prometheus.CounterVec - sysPacketsStart uint32 - sysDroppedPacketsStart uint32 - promSysPacketGauge *prometheus.GaugeVec - promSysDroppedPacketPctGauge prometheus.Gauge + sysPacketsStart uint32 + sysDroppedPacketsStart uint32 + promSysPacketGauge *prometheus.GaugeVec cpuStats *hwstats.CPUStats memoryStats *hwstats.MemoryStats @@ -52,27 +51,17 @@ func Init(nodeID string, nodeType livekit.NodeType) error { return nil } - MessageCounter = prometheus.NewCounterVec( + promMessageCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: livekitNamespace, Subsystem: "node", Name: "messages", ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()}, }, - []string{"type", "status"}, + []string{"type", "status", "direction"}, ) - MessageBytes = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: livekitNamespace, - Subsystem: "node", - Name: "message_bytes", - ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()}, - }, - []string{"type", "message_type"}, - ) - - ServiceOperationCounter = prometheus.NewCounterVec( + promServiceOperationCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: livekitNamespace, Subsystem: "node", @@ -82,7 +71,7 @@ func Init(nodeID string, nodeType livekit.NodeType) error { []string{"type", "status", "error_type"}, ) - TwirpRequestStatusCounter = prometheus.NewCounterVec( + promTwirpRequestStatusCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: livekitNamespace, Subsystem: "node", @@ -103,10 +92,9 @@ func Init(nodeID string, nodeType livekit.NodeType) error { []string{"type"}, ) - prometheus.MustRegister(MessageCounter) - prometheus.MustRegister(MessageBytes) - prometheus.MustRegister(ServiceOperationCounter) - prometheus.MustRegister(TwirpRequestStatusCounter) + prometheus.MustRegister(promMessageCounter) + prometheus.MustRegister(promServiceOperationCounter) + prometheus.MustRegister(promTwirpRequestStatusCounter) prometheus.MustRegister(promSysPacketGauge) sysPacketsStart, sysDroppedPacketsStart, _ = getTCStats() @@ -264,3 +252,31 @@ func getNodeStatsRate(statsHistory []*livekit.NodeStats) *livekit.NodeStatsRate func perSec(prev, curr uint64, secs int64) float32 { return float32(curr-prev) / float32(secs) } + +func RecordSignalRequestSuccess() { + promMessageCounter.WithLabelValues("signal", "success", "request").Add(1) +} + +func RecordSignalRequestFailure() { + promMessageCounter.WithLabelValues("signal", "failure", "request").Add(1) +} + +func RecordSignalResponseSuccess() { + promMessageCounter.WithLabelValues("signal", "success", "response").Add(1) +} + +func RecordSignalResponseFailure() { + promMessageCounter.WithLabelValues("signal", "failure", "response").Add(1) +} + +func RecordServiceOperationSuccess(op string) { + promServiceOperationCounter.WithLabelValues(op, "success", "").Add(1) +} + +func RecordServiceOperationError(op string, error string) { + promServiceOperationCounter.WithLabelValues(op, "error", error).Add(1) +} + +func RecordTwirpRequestStatus(service string, method string, statusFamily string, code twirp.ErrorCode) { + promTwirpRequestStatusCounter.WithLabelValues(service, method, statusFamily, string(code)).Add(1) +}