diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 1e261f1da..dfc6d5267 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -277,16 +277,19 @@ func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) (answer web ) if err = p.publisher.SetRemoteDescription(sdp); err != nil { + stats.PromServiceOperationCounter.WithLabelValues("answer", "error", "remote_description").Add(1) return } answer, err = p.publisher.pc.CreateAnswer(nil) if err != nil { + stats.PromServiceOperationCounter.WithLabelValues("answer", "error", "create").Add(1) err = errors.Wrap(err, "could not create answer") return } if err = p.publisher.pc.SetLocalDescription(answer); err != nil { + stats.PromServiceOperationCounter.WithLabelValues("answer", "error", "local_description").Add(1) err = errors.Wrap(err, "could not set local description") return } @@ -301,12 +304,15 @@ func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) (answer web }, }) if err != nil { + stats.PromServiceOperationCounter.WithLabelValues("answer", "error", "write_message").Add(1) return } if p.State() == livekit.ParticipantInfo_JOINING { p.updateState(livekit.ParticipantInfo_JOINED) } + stats.PromServiceOperationCounter.WithLabelValues("answer", "success", "").Add(1) + return } @@ -775,11 +781,16 @@ func (p *ParticipantImpl) onOffer(offer webrtc.SessionDescription) { //"sdp", offer.SDP, ) - _ = p.writeMessage(&livekit.SignalResponse{ + err := p.writeMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Offer{ Offer: ToProtoSessionDescription(offer), }, }) + if err != nil { + stats.PromServiceOperationCounter.WithLabelValues("offer", "error", "write_message").Add(1) + } else { + stats.PromServiceOperationCounter.WithLabelValues("offer", "success", "").Add(1) + } } // when a new remoteTrack is created, creates a Track and adds it to room @@ -948,6 +959,7 @@ func (p *ParticipantImpl) handlePrimaryICEStateChange(state webrtc.ICEConnection // logger.Debugw("ICE connection state changed", "state", state.String(), // "participant", p.identity, "pID", p.ID()) if state == webrtc.ICEConnectionStateConnected { + stats.PromServiceOperationCounter.WithLabelValues("ice_connection", "success", "").Add(1) p.updateState(livekit.ParticipantInfo_ACTIVE) } else if state == webrtc.ICEConnectionStateFailed { // only close when failed, to allow clients opportunity to reconnect diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index afac66dff..4e5dc9cac 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -141,6 +141,7 @@ func (r *Room) LastLeftAt() int64 { func (r *Room) Join(participant types.Participant, opts *ParticipantOptions) error { if r.isClosed.Get() { + stats.PromServiceOperationCounter.WithLabelValues("participant_join", "error", "room_closed").Add(1) return ErrRoomClosed } @@ -148,10 +149,12 @@ func (r *Room) Join(participant types.Participant, opts *ParticipantOptions) err defer r.lock.Unlock() if r.participants[participant.Identity()] != nil { + stats.PromServiceOperationCounter.WithLabelValues("participant_join", "error", "already_joined").Add(1) return ErrAlreadyJoined } if r.Room.MaxParticipants > 0 && int(r.Room.MaxParticipants) == len(r.participants) { + stats.PromServiceOperationCounter.WithLabelValues("participant_join", "error", "max_exceeded").Add(1) return ErrMaxParticipantsExceeded } @@ -221,6 +224,7 @@ func (r *Room) Join(participant types.Participant, opts *ParticipantOptions) err }) if err := participant.SendJoinResponse(r.Room, otherParticipants, r.iceServers); err != nil { + stats.PromServiceOperationCounter.WithLabelValues("participant_join", "error", "send_response").Add(1) return err } @@ -229,6 +233,8 @@ func (r *Room) Join(participant types.Participant, opts *ParticipantOptions) err participant.Negotiate() } + stats.PromServiceOperationCounter.WithLabelValues("participant_join", "success", "").Add(1) + return nil } diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 0abb6d6bc..a76a2e0e7 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -204,6 +204,7 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error { if iceRestart && currentSD != nil { logger.Debugw("recovering from client negotiation state") if err := t.pc.SetRemoteDescription(*currentSD); err != nil { + stats.PromServiceOperationCounter.WithLabelValues("offer", "error", "remote_description").Add(1) return err } } else { @@ -218,12 +219,14 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error { offer, err := t.pc.CreateOffer(options) if err != nil { + stats.PromServiceOperationCounter.WithLabelValues("offer", "error", "create").Add(1) logger.Errorw("could not create offer", err) return err } err = t.pc.SetLocalDescription(offer) if err != nil { + stats.PromServiceOperationCounter.WithLabelValues("offer", "error", "local_description").Add(1) logger.Errorw("could not set local description", err) return err } diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index 1a6d91519..00557ad38 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -16,6 +16,7 @@ import ( "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/rtc" "github.com/livekit/livekit-server/pkg/rtc/types" + "github.com/livekit/livekit-server/pkg/utils/stats" ) type RTCService struct { @@ -92,6 +93,7 @@ func (s *RTCService) validate(r *http.Request) (string, routing.ParticipantInit, func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { // reject non websocket requests if !websocket.IsWebSocketUpgrade(r) { + stats.PromServiceOperationCounter.WithLabelValues("signal_ws", "error", "reject").Add(1) w.WriteHeader(404) return } @@ -105,6 +107,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { // create room if it doesn't exist, also assigns an RTC node for the room rm, err := s.roomAllocator.CreateRoom(r.Context(), &livekit.CreateRoomRequest{Name: roomName}) if err != nil { + stats.PromServiceOperationCounter.WithLabelValues("signal_ws", "error", "create_room").Add(1) handleError(w, http.StatusInternalServerError, err.Error()) return } @@ -112,6 +115,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { // this needs to be started first *before* using router functions on this node connId, reqSink, resSource, err := s.router.StartParticipantSignal(r.Context(), roomName, pi) if err != nil { + stats.PromServiceOperationCounter.WithLabelValues("signal_ws", "error", "start_signal").Add(1) handleError(w, http.StatusInternalServerError, "could not start session: "+err.Error()) return } @@ -127,6 +131,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { // upgrade only once the basics are good to go conn, err := s.upgrader.Upgrade(w, r, nil) if err != nil { + stats.PromServiceOperationCounter.WithLabelValues("signal_ws", "error", "upgrade").Add(1) logger.Warnw("could not upgrade to WS", err) handleError(w, http.StatusInternalServerError, err.Error()) return @@ -136,6 +141,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { sigConn.useJSON = false } + stats.PromServiceOperationCounter.WithLabelValues("signal_ws", "success", "").Add(1) logger.Infow("new client WS connected", "connID", connId, "roomID", rm.Sid, diff --git a/pkg/utils/stats/stats.go b/pkg/utils/stats/stats.go index 75c18e918..284680c1d 100644 --- a/pkg/utils/stats/stats.go +++ b/pkg/utils/stats/stats.go @@ -23,10 +23,20 @@ var ( }, []string{"type", "status"}, ) + + PromServiceOperationCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: livekitNamespace, + Subsystem: "node", + Name: "service_operation", + }, + []string{"type", "status", "error_type"}, + ) ) func init() { prometheus.MustRegister(PromMessageCounter) + prometheus.MustRegister(PromServiceOperationCounter) initPacketStats() initRoomStatsReporter()