From ca0d5ee972c659c5dc460f9d3669b5aae35c75fb Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Tue, 14 Oct 2025 16:58:36 +0530 Subject: [PATCH] Count request/response packets on both client and server side. (#4001) Currently, the signal requests are counted on media side and signal responses are counted on controller side. This does not provide the granularity to check how many response messages each media node is sending. Seeing some cases where track subscriptions are slow under load. This would be good to see if the media node is doing a lot of signal response messages. --- pkg/routing/interfaces.go | 2 +- pkg/routing/signal.go | 14 ++++--- pkg/rtc/room.go | 4 +- pkg/rtc/transport.go | 26 ++++++------ pkg/service/localstore.go | 6 +-- pkg/service/signal.go | 2 + pkg/service/twirp.go | 2 +- pkg/telemetry/prometheus/node.go | 68 ++++++++++++++++++++------------ 8 files changed, 72 insertions(+), 52 deletions(-) diff --git a/pkg/routing/interfaces.go b/pkg/routing/interfaces.go index 8355eed3d..04565b321 100644 --- a/pkg/routing/interfaces.go +++ b/pkg/routing/interfaces.go @@ -95,7 +95,7 @@ type NullMessageSource struct { func NewNullMessageSource(connID livekit.ConnectionID) *NullMessageSource { return &NullMessageSource{ connID: connID, - msgChan: make(chan proto.Message, 0), + msgChan: make(chan proto.Message), } } diff --git a/pkg/routing/signal.go b/pkg/routing/signal.go index 3f296b931..4aaa9a877 100644 --- a/pkg/routing/signal.go +++ b/pkg/routing/signal.go @@ -102,14 +102,14 @@ func (r *signalClient) StartParticipantSignal( stream, err := r.client.RelaySignal(ctx, nodeID) if err != nil { - prometheus.MessageCounter.WithLabelValues("signal", "failure").Add(1) + prometheus.RecordSignalRequestFailure() return } err = stream.Send(&rpc.RelaySignalRequest{StartSession: ss}) if err != nil { stream.Close(err) - prometheus.MessageCounter.WithLabelValues("signal", "failure").Add(1) + prometheus.RecordSignalRequestFailure() return } @@ -133,6 +133,8 @@ func (r *signalClient) StartParticipantSignal( resChan, signalResponseMessageReader{}, r.config, + prometheus.RecordSignalRequestSuccess, + prometheus.RecordSignalRequestFailure, ) l.Debugw("signal stream closed", "error", err) @@ -191,6 +193,8 @@ func CopySignalStreamToMessageChannel[SendType, RecvType RelaySignalMessage]( ch *MessageChannel, reader SignalMessageReader[RecvType], config config.SignalRelayConfig, + promSignalSuccess func(), + promSignalFailure func(), ) error { r := &signalMessageReader[SendType, RecvType]{ reader: reader, @@ -199,16 +203,16 @@ func CopySignalStreamToMessageChannel[SendType, RecvType RelaySignalMessage]( for msg := range stream.Channel() { res, err := r.Read(msg) if err != nil { - prometheus.MessageCounter.WithLabelValues("signal", "failure").Add(1) + promSignalFailure() return err } for _, r := range res { if err = ch.WriteMessage(r); err != nil { - prometheus.MessageCounter.WithLabelValues("signal", "failure").Add(1) + promSignalFailure() return err } - prometheus.MessageCounter.WithLabelValues("signal", "success").Add(1) + promSignalSuccess() } if msg.GetClose() { diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 0d834e409..ab8f7ebbd 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -595,7 +595,7 @@ func (r *Room) Join( joinResponse := r.createJoinResponseLocked(participant, iceServers) if err := participant.SendJoinResponse(joinResponse); err != nil { - prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "send_response").Add(1) + prometheus.RecordServiceOperationError("participant_join", "send_response") return err } @@ -617,7 +617,7 @@ func (r *Room) Join( } } - prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "success", "").Add(1) + prometheus.RecordServiceOperationSuccess("participant_join") return nil } diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index d585b8ade..3fdbe0f12 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -756,7 +756,7 @@ func (t *PCTransport) setConnectedAt(at time.Time) bool { } t.firstConnectedAt = at - prometheus.ServiceOperationCounter.WithLabelValues("peer_connection", "success", "").Add(1) + prometheus.RecordServiceOperationSuccess("peer_connection") t.lock.Unlock() return true } @@ -2472,7 +2472,7 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error { return nil } - prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "create").Add(1) + prometheus.RecordServiceOperationError("offer", "create") return errors.Wrap(err, "create offer failed") } @@ -2488,7 +2488,7 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error { return nil } - prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "local_description").Add(1) + prometheus.RecordServiceOperationError("offer", "local_description") return errors.Wrap(err, "setting local description failed") } @@ -2518,10 +2518,10 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error { } if err := t.params.Handler.OnOffer(offer, t.localOfferId.Inc()); err != nil { - prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "write_message").Add(1) + prometheus.RecordServiceOperationError("offer", "write_message") return errors.Wrap(err, "could not send offer") } - prometheus.ServiceOperationCounter.WithLabelValues("offer", "success", "").Add(1) + prometheus.RecordServiceOperationSuccess("offer") return t.localDescriptionSent() } @@ -2581,7 +2581,7 @@ func (t *PCTransport) setRemoteDescription(sd webrtc.SessionDescription) error { if sd.Type == webrtc.SDPTypeAnswer { sdpType = "answer" } - prometheus.ServiceOperationCounter.WithLabelValues(sdpType, "error", "remote_description").Add(1) + prometheus.RecordServiceOperationError(sdpType, "remote_description") return errors.Wrap(err, "setting remote description failed") } else if sd.Type == webrtc.SDPTypeAnswer { t.lock.Lock() @@ -2619,7 +2619,7 @@ func (t *PCTransport) createAndSendAnswer() error { return nil } - prometheus.ServiceOperationCounter.WithLabelValues("answer", "error", "create").Add(1) + prometheus.RecordServiceOperationError("answer", "create") return errors.Wrap(err, "create answer failed") } @@ -2629,7 +2629,7 @@ func (t *PCTransport) createAndSendAnswer() error { } if err = t.pc.SetLocalDescription(answer); err != nil { - prometheus.ServiceOperationCounter.WithLabelValues("answer", "error", "local_description").Add(1) + prometheus.RecordServiceOperationError("answer", "local_description") return errors.Wrap(err, "setting local description failed") } @@ -2655,11 +2655,11 @@ func (t *PCTransport) createAndSendAnswer() error { answerId := t.remoteOfferId.Load() if err := t.params.Handler.OnAnswer(answer, answerId); err != nil { - prometheus.ServiceOperationCounter.WithLabelValues("answer", "error", "write_message").Add(1) + prometheus.RecordServiceOperationError("answer", "write_message") return errors.Wrap(err, "could not send answer") } t.localAnswerId.Store(answerId) - prometheus.ServiceOperationCounter.WithLabelValues("answer", "success", "").Add(1) + prometheus.RecordServiceOperationSuccess("asnwer") if err := t.sendUnmatchedMediaRequirement(false); err != nil { return err @@ -2838,9 +2838,9 @@ func (t *PCTransport) doICERestart() error { err := t.params.Handler.OnOffer(*offer, t.localOfferId.Inc()) if err != nil { - prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "write_message").Add(1) + prometheus.RecordServiceOperationError("offer", "write_message") } else { - prometheus.ServiceOperationCounter.WithLabelValues("offer", "success", "").Add(1) + prometheus.RecordServiceOperationSuccess("offer") } return err } @@ -2848,7 +2848,7 @@ func (t *PCTransport) doICERestart() error { // recover by re-applying the last answer t.params.Logger.Infow("recovering from client negotiation state on ICE restart") if err := t.pc.SetRemoteDescription(*currentRemoteDescription); err != nil { - prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "remote_description").Add(1) + prometheus.RecordServiceOperationError("offer", "remote_description") return errors.Wrap(err, "set remote description failed") } else { t.setNegotiationState(transport.NegotiationStateNone) diff --git a/pkg/service/localstore.go b/pkg/service/localstore.go index b6de883bd..64bd508c6 100644 --- a/pkg/service/localstore.go +++ b/pkg/service/localstore.go @@ -229,10 +229,8 @@ func (s *LocalStore) ListAgentDispatches(ctx context.Context, roomName livekit.R agentJobs := s.agentJobs[roomName] var js []*livekit.Job - if agentJobs != nil { - for _, j := range agentJobs { - js = append(js, utils.CloneProto(j)) - } + for _, j := range agentJobs { + js = append(js, utils.CloneProto(j)) } var ds []*livekit.AgentDispatch diff --git a/pkg/service/signal.go b/pkg/service/signal.go index 6000b2c36..f6c9316eb 100644 --- a/pkg/service/signal.go +++ b/pkg/service/signal.go @@ -168,6 +168,8 @@ func (r *signalService) RelaySignal(stream psrpc.ServerStream[*rpc.RelaySignalRe reqChan, signalRequestMessageReader{}, r.config, + prometheus.RecordSignalResponseSuccess, + prometheus.RecordSignalResponseFailure, ) l.Debugw("signal stream closed", "error", err) diff --git a/pkg/service/twirp.go b/pkg/service/twirp.go index c7ff5b96c..2226c64d3 100644 --- a/pkg/service/twirp.go +++ b/pkg/service/twirp.go @@ -203,7 +203,7 @@ func statusReporterResponseSent(ctx context.Context) { code = r.error.Code() } - prometheus.TwirpRequestStatusCounter.WithLabelValues(r.service, r.method, statusFamily, string(code)).Add(1) + prometheus.RecordTwirpRequestStatus(r.service, r.method, statusFamily, code) } func statusReporterErrorReceived(ctx context.Context, e twirp.Error) context.Context { diff --git a/pkg/telemetry/prometheus/node.go b/pkg/telemetry/prometheus/node.go index e455322b0..5668db3c4 100644 --- a/pkg/telemetry/prometheus/node.go +++ b/pkg/telemetry/prometheus/node.go @@ -18,6 +18,7 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "github.com/twitchtv/twirp" "go.uber.org/atomic" "github.com/livekit/protocol/livekit" @@ -33,15 +34,13 @@ const ( var ( initialized atomic.Bool - MessageCounter *prometheus.CounterVec - MessageBytes *prometheus.CounterVec - ServiceOperationCounter *prometheus.CounterVec - TwirpRequestStatusCounter *prometheus.CounterVec + promMessageCounter *prometheus.CounterVec + promServiceOperationCounter *prometheus.CounterVec + promTwirpRequestStatusCounter *prometheus.CounterVec - sysPacketsStart uint32 - sysDroppedPacketsStart uint32 - promSysPacketGauge *prometheus.GaugeVec - promSysDroppedPacketPctGauge prometheus.Gauge + sysPacketsStart uint32 + sysDroppedPacketsStart uint32 + promSysPacketGauge *prometheus.GaugeVec cpuStats *hwstats.CPUStats memoryStats *hwstats.MemoryStats @@ -52,27 +51,17 @@ func Init(nodeID string, nodeType livekit.NodeType) error { return nil } - MessageCounter = prometheus.NewCounterVec( + promMessageCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: livekitNamespace, Subsystem: "node", Name: "messages", ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()}, }, - []string{"type", "status"}, + []string{"type", "status", "direction"}, ) - MessageBytes = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: livekitNamespace, - Subsystem: "node", - Name: "message_bytes", - ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()}, - }, - []string{"type", "message_type"}, - ) - - ServiceOperationCounter = prometheus.NewCounterVec( + promServiceOperationCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: livekitNamespace, Subsystem: "node", @@ -82,7 +71,7 @@ func Init(nodeID string, nodeType livekit.NodeType) error { []string{"type", "status", "error_type"}, ) - TwirpRequestStatusCounter = prometheus.NewCounterVec( + promTwirpRequestStatusCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: livekitNamespace, Subsystem: "node", @@ -103,10 +92,9 @@ func Init(nodeID string, nodeType livekit.NodeType) error { []string{"type"}, ) - prometheus.MustRegister(MessageCounter) - prometheus.MustRegister(MessageBytes) - prometheus.MustRegister(ServiceOperationCounter) - prometheus.MustRegister(TwirpRequestStatusCounter) + prometheus.MustRegister(promMessageCounter) + prometheus.MustRegister(promServiceOperationCounter) + prometheus.MustRegister(promTwirpRequestStatusCounter) prometheus.MustRegister(promSysPacketGauge) sysPacketsStart, sysDroppedPacketsStart, _ = getTCStats() @@ -264,3 +252,31 @@ func getNodeStatsRate(statsHistory []*livekit.NodeStats) *livekit.NodeStatsRate func perSec(prev, curr uint64, secs int64) float32 { return float32(curr-prev) / float32(secs) } + +func RecordSignalRequestSuccess() { + promMessageCounter.WithLabelValues("signal", "success", "request").Add(1) +} + +func RecordSignalRequestFailure() { + promMessageCounter.WithLabelValues("signal", "failure", "request").Add(1) +} + +func RecordSignalResponseSuccess() { + promMessageCounter.WithLabelValues("signal", "success", "response").Add(1) +} + +func RecordSignalResponseFailure() { + promMessageCounter.WithLabelValues("signal", "failure", "response").Add(1) +} + +func RecordServiceOperationSuccess(op string) { + promServiceOperationCounter.WithLabelValues(op, "success", "").Add(1) +} + +func RecordServiceOperationError(op string, error string) { + promServiceOperationCounter.WithLabelValues(op, "error", error).Add(1) +} + +func RecordTwirpRequestStatus(service string, method string, statusFamily string, code twirp.ErrorCode) { + promTwirpRequestStatusCounter.WithLabelValues(service, method, statusFamily, string(code)).Add(1) +}