Prometheus counters for RTC connection steps (#143)

* signal ws connection, participant join, ice connection

* must register

* offer negotiation

* dz review: offer and offer_response

* dz review: answer
This commit is contained in:
Mathew Kamkar
2021-10-12 15:22:17 -07:00
committed by GitHub
parent 4149c4a314
commit 84ab0f82af
5 changed files with 38 additions and 1 deletions
+13 -1
View File
@@ -277,16 +277,19 @@ func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) (answer web
)
if err = p.publisher.SetRemoteDescription(sdp); err != nil {
stats.PromServiceOperationCounter.WithLabelValues("answer", "error", "remote_description").Add(1)
return
}
answer, err = p.publisher.pc.CreateAnswer(nil)
if err != nil {
stats.PromServiceOperationCounter.WithLabelValues("answer", "error", "create").Add(1)
err = errors.Wrap(err, "could not create answer")
return
}
if err = p.publisher.pc.SetLocalDescription(answer); err != nil {
stats.PromServiceOperationCounter.WithLabelValues("answer", "error", "local_description").Add(1)
err = errors.Wrap(err, "could not set local description")
return
}
@@ -301,12 +304,15 @@ func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) (answer web
},
})
if err != nil {
stats.PromServiceOperationCounter.WithLabelValues("answer", "error", "write_message").Add(1)
return
}
if p.State() == livekit.ParticipantInfo_JOINING {
p.updateState(livekit.ParticipantInfo_JOINED)
}
stats.PromServiceOperationCounter.WithLabelValues("answer", "success", "").Add(1)
return
}
@@ -775,11 +781,16 @@ func (p *ParticipantImpl) onOffer(offer webrtc.SessionDescription) {
//"sdp", offer.SDP,
)
_ = p.writeMessage(&livekit.SignalResponse{
err := p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Offer{
Offer: ToProtoSessionDescription(offer),
},
})
if err != nil {
stats.PromServiceOperationCounter.WithLabelValues("offer", "error", "write_message").Add(1)
} else {
stats.PromServiceOperationCounter.WithLabelValues("offer", "success", "").Add(1)
}
}
// when a new remoteTrack is created, creates a Track and adds it to room
@@ -948,6 +959,7 @@ func (p *ParticipantImpl) handlePrimaryICEStateChange(state webrtc.ICEConnection
// logger.Debugw("ICE connection state changed", "state", state.String(),
// "participant", p.identity, "pID", p.ID())
if state == webrtc.ICEConnectionStateConnected {
stats.PromServiceOperationCounter.WithLabelValues("ice_connection", "success", "").Add(1)
p.updateState(livekit.ParticipantInfo_ACTIVE)
} else if state == webrtc.ICEConnectionStateFailed {
// only close when failed, to allow clients opportunity to reconnect
+6
View File
@@ -141,6 +141,7 @@ func (r *Room) LastLeftAt() int64 {
func (r *Room) Join(participant types.Participant, opts *ParticipantOptions) error {
if r.isClosed.Get() {
stats.PromServiceOperationCounter.WithLabelValues("participant_join", "error", "room_closed").Add(1)
return ErrRoomClosed
}
@@ -148,10 +149,12 @@ func (r *Room) Join(participant types.Participant, opts *ParticipantOptions) err
defer r.lock.Unlock()
if r.participants[participant.Identity()] != nil {
stats.PromServiceOperationCounter.WithLabelValues("participant_join", "error", "already_joined").Add(1)
return ErrAlreadyJoined
}
if r.Room.MaxParticipants > 0 && int(r.Room.MaxParticipants) == len(r.participants) {
stats.PromServiceOperationCounter.WithLabelValues("participant_join", "error", "max_exceeded").Add(1)
return ErrMaxParticipantsExceeded
}
@@ -221,6 +224,7 @@ func (r *Room) Join(participant types.Participant, opts *ParticipantOptions) err
})
if err := participant.SendJoinResponse(r.Room, otherParticipants, r.iceServers); err != nil {
stats.PromServiceOperationCounter.WithLabelValues("participant_join", "error", "send_response").Add(1)
return err
}
@@ -229,6 +233,8 @@ func (r *Room) Join(participant types.Participant, opts *ParticipantOptions) err
participant.Negotiate()
}
stats.PromServiceOperationCounter.WithLabelValues("participant_join", "success", "").Add(1)
return nil
}
+3
View File
@@ -204,6 +204,7 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error {
if iceRestart && currentSD != nil {
logger.Debugw("recovering from client negotiation state")
if err := t.pc.SetRemoteDescription(*currentSD); err != nil {
stats.PromServiceOperationCounter.WithLabelValues("offer", "error", "remote_description").Add(1)
return err
}
} else {
@@ -218,12 +219,14 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error {
offer, err := t.pc.CreateOffer(options)
if err != nil {
stats.PromServiceOperationCounter.WithLabelValues("offer", "error", "create").Add(1)
logger.Errorw("could not create offer", err)
return err
}
err = t.pc.SetLocalDescription(offer)
if err != nil {
stats.PromServiceOperationCounter.WithLabelValues("offer", "error", "local_description").Add(1)
logger.Errorw("could not set local description", err)
return err
}
+6
View File
@@ -16,6 +16,7 @@ import (
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/utils/stats"
)
type RTCService struct {
@@ -92,6 +93,7 @@ func (s *RTCService) validate(r *http.Request) (string, routing.ParticipantInit,
func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// reject non websocket requests
if !websocket.IsWebSocketUpgrade(r) {
stats.PromServiceOperationCounter.WithLabelValues("signal_ws", "error", "reject").Add(1)
w.WriteHeader(404)
return
}
@@ -105,6 +107,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// create room if it doesn't exist, also assigns an RTC node for the room
rm, err := s.roomAllocator.CreateRoom(r.Context(), &livekit.CreateRoomRequest{Name: roomName})
if err != nil {
stats.PromServiceOperationCounter.WithLabelValues("signal_ws", "error", "create_room").Add(1)
handleError(w, http.StatusInternalServerError, err.Error())
return
}
@@ -112,6 +115,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// this needs to be started first *before* using router functions on this node
connId, reqSink, resSource, err := s.router.StartParticipantSignal(r.Context(), roomName, pi)
if err != nil {
stats.PromServiceOperationCounter.WithLabelValues("signal_ws", "error", "start_signal").Add(1)
handleError(w, http.StatusInternalServerError, "could not start session: "+err.Error())
return
}
@@ -127,6 +131,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// upgrade only once the basics are good to go
conn, err := s.upgrader.Upgrade(w, r, nil)
if err != nil {
stats.PromServiceOperationCounter.WithLabelValues("signal_ws", "error", "upgrade").Add(1)
logger.Warnw("could not upgrade to WS", err)
handleError(w, http.StatusInternalServerError, err.Error())
return
@@ -136,6 +141,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
sigConn.useJSON = false
}
stats.PromServiceOperationCounter.WithLabelValues("signal_ws", "success", "").Add(1)
logger.Infow("new client WS connected",
"connID", connId,
"roomID", rm.Sid,
+10
View File
@@ -23,10 +23,20 @@ var (
},
[]string{"type", "status"},
)
PromServiceOperationCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "node",
Name: "service_operation",
},
[]string{"type", "status", "error_type"},
)
)
func init() {
prometheus.MustRegister(PromMessageCounter)
prometheus.MustRegister(PromServiceOperationCounter)
initPacketStats()
initRoomStatsReporter()