Count request/response packets on both client and server side. (#4001)

Currently, the signal requests are counted on media side and signal
responses are counted on controller side. This does not provide the
granularity to check how many response messages each media node is
sending.

Seeing some cases where track subscriptions are slow under load. This
would be good to see if the media node is doing a lot of signal response
messages.
This commit is contained in:
Raja Subramanian
2025-10-14 16:58:36 +05:30
committed by GitHub
parent dd62eb0072
commit ca0d5ee972
8 changed files with 72 additions and 52 deletions
+1 -1
View File
@@ -95,7 +95,7 @@ type NullMessageSource struct {
func NewNullMessageSource(connID livekit.ConnectionID) *NullMessageSource {
return &NullMessageSource{
connID: connID,
msgChan: make(chan proto.Message, 0),
msgChan: make(chan proto.Message),
}
}
+9 -5
View File
@@ -102,14 +102,14 @@ func (r *signalClient) StartParticipantSignal(
stream, err := r.client.RelaySignal(ctx, nodeID)
if err != nil {
prometheus.MessageCounter.WithLabelValues("signal", "failure").Add(1)
prometheus.RecordSignalRequestFailure()
return
}
err = stream.Send(&rpc.RelaySignalRequest{StartSession: ss})
if err != nil {
stream.Close(err)
prometheus.MessageCounter.WithLabelValues("signal", "failure").Add(1)
prometheus.RecordSignalRequestFailure()
return
}
@@ -133,6 +133,8 @@ func (r *signalClient) StartParticipantSignal(
resChan,
signalResponseMessageReader{},
r.config,
prometheus.RecordSignalRequestSuccess,
prometheus.RecordSignalRequestFailure,
)
l.Debugw("signal stream closed", "error", err)
@@ -191,6 +193,8 @@ func CopySignalStreamToMessageChannel[SendType, RecvType RelaySignalMessage](
ch *MessageChannel,
reader SignalMessageReader[RecvType],
config config.SignalRelayConfig,
promSignalSuccess func(),
promSignalFailure func(),
) error {
r := &signalMessageReader[SendType, RecvType]{
reader: reader,
@@ -199,16 +203,16 @@ func CopySignalStreamToMessageChannel[SendType, RecvType RelaySignalMessage](
for msg := range stream.Channel() {
res, err := r.Read(msg)
if err != nil {
prometheus.MessageCounter.WithLabelValues("signal", "failure").Add(1)
promSignalFailure()
return err
}
for _, r := range res {
if err = ch.WriteMessage(r); err != nil {
prometheus.MessageCounter.WithLabelValues("signal", "failure").Add(1)
promSignalFailure()
return err
}
prometheus.MessageCounter.WithLabelValues("signal", "success").Add(1)
promSignalSuccess()
}
if msg.GetClose() {
+2 -2
View File
@@ -595,7 +595,7 @@ func (r *Room) Join(
joinResponse := r.createJoinResponseLocked(participant, iceServers)
if err := participant.SendJoinResponse(joinResponse); err != nil {
prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "send_response").Add(1)
prometheus.RecordServiceOperationError("participant_join", "send_response")
return err
}
@@ -617,7 +617,7 @@ func (r *Room) Join(
}
}
prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "success", "").Add(1)
prometheus.RecordServiceOperationSuccess("participant_join")
return nil
}
+13 -13
View File
@@ -756,7 +756,7 @@ func (t *PCTransport) setConnectedAt(at time.Time) bool {
}
t.firstConnectedAt = at
prometheus.ServiceOperationCounter.WithLabelValues("peer_connection", "success", "").Add(1)
prometheus.RecordServiceOperationSuccess("peer_connection")
t.lock.Unlock()
return true
}
@@ -2472,7 +2472,7 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error {
return nil
}
prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "create").Add(1)
prometheus.RecordServiceOperationError("offer", "create")
return errors.Wrap(err, "create offer failed")
}
@@ -2488,7 +2488,7 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error {
return nil
}
prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "local_description").Add(1)
prometheus.RecordServiceOperationError("offer", "local_description")
return errors.Wrap(err, "setting local description failed")
}
@@ -2518,10 +2518,10 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error {
}
if err := t.params.Handler.OnOffer(offer, t.localOfferId.Inc()); err != nil {
prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "write_message").Add(1)
prometheus.RecordServiceOperationError("offer", "write_message")
return errors.Wrap(err, "could not send offer")
}
prometheus.ServiceOperationCounter.WithLabelValues("offer", "success", "").Add(1)
prometheus.RecordServiceOperationSuccess("offer")
return t.localDescriptionSent()
}
@@ -2581,7 +2581,7 @@ func (t *PCTransport) setRemoteDescription(sd webrtc.SessionDescription) error {
if sd.Type == webrtc.SDPTypeAnswer {
sdpType = "answer"
}
prometheus.ServiceOperationCounter.WithLabelValues(sdpType, "error", "remote_description").Add(1)
prometheus.RecordServiceOperationError(sdpType, "remote_description")
return errors.Wrap(err, "setting remote description failed")
} else if sd.Type == webrtc.SDPTypeAnswer {
t.lock.Lock()
@@ -2619,7 +2619,7 @@ func (t *PCTransport) createAndSendAnswer() error {
return nil
}
prometheus.ServiceOperationCounter.WithLabelValues("answer", "error", "create").Add(1)
prometheus.RecordServiceOperationError("answer", "create")
return errors.Wrap(err, "create answer failed")
}
@@ -2629,7 +2629,7 @@ func (t *PCTransport) createAndSendAnswer() error {
}
if err = t.pc.SetLocalDescription(answer); err != nil {
prometheus.ServiceOperationCounter.WithLabelValues("answer", "error", "local_description").Add(1)
prometheus.RecordServiceOperationError("answer", "local_description")
return errors.Wrap(err, "setting local description failed")
}
@@ -2655,11 +2655,11 @@ func (t *PCTransport) createAndSendAnswer() error {
answerId := t.remoteOfferId.Load()
if err := t.params.Handler.OnAnswer(answer, answerId); err != nil {
prometheus.ServiceOperationCounter.WithLabelValues("answer", "error", "write_message").Add(1)
prometheus.RecordServiceOperationError("answer", "write_message")
return errors.Wrap(err, "could not send answer")
}
t.localAnswerId.Store(answerId)
prometheus.ServiceOperationCounter.WithLabelValues("answer", "success", "").Add(1)
prometheus.RecordServiceOperationSuccess("asnwer")
if err := t.sendUnmatchedMediaRequirement(false); err != nil {
return err
@@ -2838,9 +2838,9 @@ func (t *PCTransport) doICERestart() error {
err := t.params.Handler.OnOffer(*offer, t.localOfferId.Inc())
if err != nil {
prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "write_message").Add(1)
prometheus.RecordServiceOperationError("offer", "write_message")
} else {
prometheus.ServiceOperationCounter.WithLabelValues("offer", "success", "").Add(1)
prometheus.RecordServiceOperationSuccess("offer")
}
return err
}
@@ -2848,7 +2848,7 @@ func (t *PCTransport) doICERestart() error {
// recover by re-applying the last answer
t.params.Logger.Infow("recovering from client negotiation state on ICE restart")
if err := t.pc.SetRemoteDescription(*currentRemoteDescription); err != nil {
prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "remote_description").Add(1)
prometheus.RecordServiceOperationError("offer", "remote_description")
return errors.Wrap(err, "set remote description failed")
} else {
t.setNegotiationState(transport.NegotiationStateNone)
+2 -4
View File
@@ -229,10 +229,8 @@ func (s *LocalStore) ListAgentDispatches(ctx context.Context, roomName livekit.R
agentJobs := s.agentJobs[roomName]
var js []*livekit.Job
if agentJobs != nil {
for _, j := range agentJobs {
js = append(js, utils.CloneProto(j))
}
for _, j := range agentJobs {
js = append(js, utils.CloneProto(j))
}
var ds []*livekit.AgentDispatch
+2
View File
@@ -168,6 +168,8 @@ func (r *signalService) RelaySignal(stream psrpc.ServerStream[*rpc.RelaySignalRe
reqChan,
signalRequestMessageReader{},
r.config,
prometheus.RecordSignalResponseSuccess,
prometheus.RecordSignalResponseFailure,
)
l.Debugw("signal stream closed", "error", err)
+1 -1
View File
@@ -203,7 +203,7 @@ func statusReporterResponseSent(ctx context.Context) {
code = r.error.Code()
}
prometheus.TwirpRequestStatusCounter.WithLabelValues(r.service, r.method, statusFamily, string(code)).Add(1)
prometheus.RecordTwirpRequestStatus(r.service, r.method, statusFamily, code)
}
func statusReporterErrorReceived(ctx context.Context, e twirp.Error) context.Context {
+42 -26
View File
@@ -18,6 +18,7 @@ import (
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/twitchtv/twirp"
"go.uber.org/atomic"
"github.com/livekit/protocol/livekit"
@@ -33,15 +34,13 @@ const (
var (
initialized atomic.Bool
MessageCounter *prometheus.CounterVec
MessageBytes *prometheus.CounterVec
ServiceOperationCounter *prometheus.CounterVec
TwirpRequestStatusCounter *prometheus.CounterVec
promMessageCounter *prometheus.CounterVec
promServiceOperationCounter *prometheus.CounterVec
promTwirpRequestStatusCounter *prometheus.CounterVec
sysPacketsStart uint32
sysDroppedPacketsStart uint32
promSysPacketGauge *prometheus.GaugeVec
promSysDroppedPacketPctGauge prometheus.Gauge
sysPacketsStart uint32
sysDroppedPacketsStart uint32
promSysPacketGauge *prometheus.GaugeVec
cpuStats *hwstats.CPUStats
memoryStats *hwstats.MemoryStats
@@ -52,27 +51,17 @@ func Init(nodeID string, nodeType livekit.NodeType) error {
return nil
}
MessageCounter = prometheus.NewCounterVec(
promMessageCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "node",
Name: "messages",
ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()},
},
[]string{"type", "status"},
[]string{"type", "status", "direction"},
)
MessageBytes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "node",
Name: "message_bytes",
ConstLabels: prometheus.Labels{"node_id": nodeID, "node_type": nodeType.String()},
},
[]string{"type", "message_type"},
)
ServiceOperationCounter = prometheus.NewCounterVec(
promServiceOperationCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "node",
@@ -82,7 +71,7 @@ func Init(nodeID string, nodeType livekit.NodeType) error {
[]string{"type", "status", "error_type"},
)
TwirpRequestStatusCounter = prometheus.NewCounterVec(
promTwirpRequestStatusCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "node",
@@ -103,10 +92,9 @@ func Init(nodeID string, nodeType livekit.NodeType) error {
[]string{"type"},
)
prometheus.MustRegister(MessageCounter)
prometheus.MustRegister(MessageBytes)
prometheus.MustRegister(ServiceOperationCounter)
prometheus.MustRegister(TwirpRequestStatusCounter)
prometheus.MustRegister(promMessageCounter)
prometheus.MustRegister(promServiceOperationCounter)
prometheus.MustRegister(promTwirpRequestStatusCounter)
prometheus.MustRegister(promSysPacketGauge)
sysPacketsStart, sysDroppedPacketsStart, _ = getTCStats()
@@ -264,3 +252,31 @@ func getNodeStatsRate(statsHistory []*livekit.NodeStats) *livekit.NodeStatsRate
func perSec(prev, curr uint64, secs int64) float32 {
return float32(curr-prev) / float32(secs)
}
func RecordSignalRequestSuccess() {
promMessageCounter.WithLabelValues("signal", "success", "request").Add(1)
}
func RecordSignalRequestFailure() {
promMessageCounter.WithLabelValues("signal", "failure", "request").Add(1)
}
func RecordSignalResponseSuccess() {
promMessageCounter.WithLabelValues("signal", "success", "response").Add(1)
}
func RecordSignalResponseFailure() {
promMessageCounter.WithLabelValues("signal", "failure", "response").Add(1)
}
func RecordServiceOperationSuccess(op string) {
promServiceOperationCounter.WithLabelValues(op, "success", "").Add(1)
}
func RecordServiceOperationError(op string, error string) {
promServiceOperationCounter.WithLabelValues(op, "error", error).Add(1)
}
func RecordTwirpRequestStatus(service string, method string, statusFamily string, code twirp.ErrorCode) {
promTwirpRequestStatusCounter.WithLabelValues(service, method, statusFamily, string(code)).Add(1)
}