From 289ebd32ff41577f5480e69acd5f1b2303009e85 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 8 Nov 2021 18:00:34 -0800 Subject: [PATCH] Telemetry refactor (#172) * telemetry refactor * fix imports * update protocol --- go.mod | 3 +- go.sum | 4 +- magefile.go | 2 +- pkg/routing/redisrouter.go | 16 +- pkg/rtc/mediatrack.go | 15 +- pkg/rtc/participant.go | 45 ++--- pkg/rtc/room.go | 40 ++--- pkg/rtc/room_test.go | 2 + pkg/rtc/transport.go | 33 ++-- pkg/rtc/transport_test.go | 14 +- pkg/service/recordingservice.go | 46 ++--- pkg/service/roommanager.go | 62 ++----- pkg/service/rtcservice.go | 12 +- pkg/service/wire.go | 2 + pkg/service/wire_gen.go | 9 +- pkg/telemetry/events.go | 139 ++++++++++++++++ pkg/telemetry/prometheus/node.go | 86 ++++++++++ .../prometheus}/node_linux.go | 3 +- .../prometheus}/node_nonlinux.go | 3 +- pkg/telemetry/prometheus/packets.go | 83 +++++++++ .../prometheus/rooms.go} | 48 ++---- pkg/telemetry/service.go | 91 ++++++++++ pkg/telemetry/stats.go | 64 +++++++ pkg/utils/stats/packetstats.go | 157 ------------------ pkg/utils/stats/stats.go | 129 -------------- 25 files changed, 597 insertions(+), 511 deletions(-) create mode 100644 pkg/telemetry/events.go create mode 100644 pkg/telemetry/prometheus/node.go rename pkg/{utils/stats => telemetry/prometheus}/node_linux.go (94%) rename pkg/{utils/stats => telemetry/prometheus}/node_nonlinux.go (80%) create mode 100644 pkg/telemetry/prometheus/packets.go rename pkg/{utils/stats/roomstatsreporter.go => telemetry/prometheus/rooms.go} (61%) create mode 100644 pkg/telemetry/service.go create mode 100644 pkg/telemetry/stats.go delete mode 100644 pkg/utils/stats/packetstats.go delete mode 100644 pkg/utils/stats/stats.go diff --git a/go.mod b/go.mod index bfa324f7b..096bd4c10 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/gorilla/websocket v1.4.2 github.com/hashicorp/golang-lru v0.5.4 github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b - github.com/livekit/protocol v0.10.0 + github.com/livekit/protocol v0.10.1-0.20211109000312-b3847c8d35ff github.com/magefile/mage v1.11.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0 github.com/mitchellh/go-homedir v1.1.0 @@ -28,7 +28,6 @@ require ( github.com/pion/rtp v1.7.2 github.com/pion/sdp/v3 v3.0.4 github.com/pion/stun v0.3.5 - github.com/pion/transport v0.12.3 github.com/pion/turn/v2 v2.0.5 github.com/pion/webrtc/v3 v3.1.5 github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index a18406e21..061b5b9bd 100644 --- a/go.sum +++ b/go.sum @@ -252,8 +252,8 @@ github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= github.com/livekit/ion-sfu v1.20.16 h1:B4+z0sf4t3zZSXFIwHive8malNn6Vje+7b1OW4ETDOM= github.com/livekit/ion-sfu v1.20.16/go.mod h1:sUjL3tZRROs3NjCm6ZLT+IsisdYVRtxfq4OhVFHVd/A= -github.com/livekit/protocol v0.10.0 h1:s2zf1+G1Tcx6UKIf8mbRzbQ4ELdyS0mlLGsFkTVT5Aw= -github.com/livekit/protocol v0.10.0/go.mod h1:7ir9zSlgnrPQoGGNv4f8U/c9QrWh+ogC9B5xVbJNedM= +github.com/livekit/protocol v0.10.1-0.20211109000312-b3847c8d35ff h1:21SZ2sh5e7ELCVdXT01hlpdSZyNlwDv6KTOlcplBrQ8= +github.com/livekit/protocol v0.10.1-0.20211109000312-b3847c8d35ff/go.mod h1:7ir9zSlgnrPQoGGNv4f8U/c9QrWh+ogC9B5xVbJNedM= github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ= github.com/lucsky/cuid v1.0.2/go.mod h1:QaaJqckboimOmhRSJXSx/+IT+VTfxfPGSo/6mfgUfmE= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= diff --git a/magefile.go b/magefile.go index 311681e6e..32ea73a3d 100644 --- a/magefile.go +++ b/magefile.go @@ -149,7 +149,7 @@ func Clean() { // regenerate code func Generate() error { - mg.Deps(installDeps) + mg.Deps(installDeps, generateWire) fmt.Println("generating...") diff --git a/pkg/routing/redisrouter.go b/pkg/routing/redisrouter.go index 0e50ef17f..3ad0e1816 100644 --- a/pkg/routing/redisrouter.go +++ b/pkg/routing/redisrouter.go @@ -12,7 +12,7 @@ import ( "google.golang.org/protobuf/proto" "github.com/livekit/livekit-server/pkg/routing/selector" - "github.com/livekit/livekit-server/pkg/utils/stats" + "github.com/livekit/livekit-server/pkg/telemetry/prometheus" ) const ( @@ -312,7 +312,7 @@ func (r *RedisRouter) statsWorker() { // update periodically seconds select { case <-time.After(statsUpdateInterval): - if err := stats.UpdateCurrentNodeStats(r.currentNode.Stats); err != nil { + if err := prometheus.UpdateCurrentNodeStats(r.currentNode.Stats); err != nil { logger.Errorw("could not update node stats", err, "nodeID", r.currentNode.Id) } if err := r.RegisterNode(); err != nil { @@ -345,28 +345,28 @@ func (r *RedisRouter) redisWorker(startedChan chan struct{}) { sm := livekit.SignalNodeMessage{} if err := proto.Unmarshal([]byte(msg.Payload), &sm); err != nil { logger.Errorw("could not unmarshal signal message on sigchan", err) - stats.PromMessageCounter.WithLabelValues("signal", "failure").Add(1) + prometheus.MessageCounter.WithLabelValues("signal", "failure").Add(1) continue } if err := r.handleSignalMessage(&sm); err != nil { logger.Errorw("error processing signal message", err) - stats.PromMessageCounter.WithLabelValues("signal", "failure").Add(1) + prometheus.MessageCounter.WithLabelValues("signal", "failure").Add(1) continue } - stats.PromMessageCounter.WithLabelValues("signal", "success").Add(1) + prometheus.MessageCounter.WithLabelValues("signal", "success").Add(1) } else if msg.Channel == rtcChannel { rm := livekit.RTCNodeMessage{} if err := proto.Unmarshal([]byte(msg.Payload), &rm); err != nil { logger.Errorw("could not unmarshal RTC message on rtcchan", err) - stats.PromMessageCounter.WithLabelValues("rtc", "failure").Add(1) + prometheus.MessageCounter.WithLabelValues("rtc", "failure").Add(1) continue } if err := r.handleRTCMessage(&rm); err != nil { logger.Errorw("error processing RTC message", err) - stats.PromMessageCounter.WithLabelValues("rtc", "failure").Add(1) + prometheus.MessageCounter.WithLabelValues("rtc", "failure").Add(1) continue } - stats.PromMessageCounter.WithLabelValues("rtc", "success").Add(1) + prometheus.MessageCounter.WithLabelValues("rtc", "success").Add(1) } } } diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 51df002d3..8404da146 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -18,7 +18,7 @@ import ( "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/rtc/types" - "github.com/livekit/livekit-server/pkg/utils/stats" + "github.com/livekit/livekit-server/pkg/telemetry" ) var ( @@ -74,7 +74,7 @@ type MediaTrackParams struct { BufferFactory *buffer.Factory ReceiverConfig ReceiverConfig AudioConfig config.AudioConfig - Stats *stats.RoomStatsReporter + Telemetry *telemetry.TelemetryService Logger logger.Logger } @@ -247,7 +247,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { delete(t.subscribedTracks, sub.ID()) t.lock.Unlock() - t.params.Stats.SubSubscribedTrack(t.Kind().String()) + t.params.Telemetry.UnsubscribedTrack(sub.ID(), sub.Identity(), t.ToProto()) // ignore if the subscribing sub is not connected if sub.SubscriberPC().ConnectionState() == webrtc.PeerConnectionStateClosed { @@ -293,7 +293,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { sub.Negotiate() }() - t.params.Stats.AddSubscribedTrack(t.Kind().String()) + t.params.Telemetry.SubscribedTrack(sub.ID(), sub.Identity(), t.ToProto()) return nil } @@ -367,12 +367,12 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra onclose := t.onClose t.lock.Unlock() t.RemoveAllSubscribers() - t.params.Stats.SubPublishedTrack(t.Kind().String()) + t.params.Telemetry.UnpublishedTrack(t.params.ParticipantID, t.params.ParticipantIdentity, t.ToProto()) if onclose != nil { onclose() } }) - t.params.Stats.AddPublishedTrack(t.Kind().String()) + t.params.Telemetry.PublishedTrack(t.params.ParticipantID, t.params.ParticipantIdentity, t.ToProto()) if t.Kind() == livekit.TrackType_AUDIO { t.buffer = buff @@ -523,9 +523,6 @@ func (t *MediaTrack) handlePublisherFeedback(packets []rtcp.Packet) { t.fracLostLock.Unlock() } - if t.params.Stats != nil { - t.params.Stats.Incoming.HandleRTCP(packets) - } // also look for sender reports // feedback for the source RTCP t.params.RTCPChan <- packets diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 3f83a22b8..475e09b48 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -22,7 +22,8 @@ import ( "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/rtc/types" - "github.com/livekit/livekit-server/pkg/utils/stats" + "github.com/livekit/livekit-server/pkg/telemetry" + "github.com/livekit/livekit-server/pkg/telemetry/prometheus" "github.com/livekit/livekit-server/version" ) @@ -38,7 +39,7 @@ type ParticipantParams struct { Sink routing.MessageSink AudioConfig config.AudioConfig ProtocolVersion types.ProtocolVersion - Stats *stats.RoomStatsReporter + Telemetry *telemetry.TelemetryService ThrottleConfig config.PLIThrottleConfig EnabledCodecs []*livekit.Codec Hidden bool @@ -115,20 +116,24 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) { return nil, err } p.publisher, err = NewPCTransport(TransportParams{ - Target: livekit.SignalTarget_PUBLISHER, - Config: params.Config, - Stats: p.params.Stats, - EnabledCodecs: p.params.EnabledCodecs, - Logger: params.Logger, + ParticipantID: p.id, + ParticipantIdentity: p.params.Identity, + Target: livekit.SignalTarget_PUBLISHER, + Config: params.Config, + Telemetry: p.params.Telemetry, + EnabledCodecs: p.params.EnabledCodecs, + Logger: params.Logger, }) if err != nil { return nil, err } p.subscriber, err = NewPCTransport(TransportParams{ - Target: livekit.SignalTarget_SUBSCRIBER, - Config: params.Config, - Stats: p.params.Stats, - Logger: params.Logger, + ParticipantID: p.id, + ParticipantIdentity: p.params.Identity, + Target: livekit.SignalTarget_SUBSCRIBER, + Config: params.Config, + Telemetry: p.params.Telemetry, + Logger: params.Logger, }) if err != nil { return nil, err @@ -283,7 +288,7 @@ func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) (answer web ) if err = p.publisher.SetRemoteDescription(sdp); err != nil { - stats.PromServiceOperationCounter.WithLabelValues("answer", "error", "remote_description").Add(1) + prometheus.ServiceOperationCounter.WithLabelValues("answer", "error", "remote_description").Add(1) return } @@ -291,13 +296,13 @@ func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) (answer web answer, err = p.publisher.pc.CreateAnswer(nil) if err != nil { - stats.PromServiceOperationCounter.WithLabelValues("answer", "error", "create").Add(1) + prometheus.ServiceOperationCounter.WithLabelValues("answer", "error", "create").Add(1) err = errors.Wrap(err, "could not create answer") return } if err = p.publisher.pc.SetLocalDescription(answer); err != nil { - stats.PromServiceOperationCounter.WithLabelValues("answer", "error", "local_description").Add(1) + prometheus.ServiceOperationCounter.WithLabelValues("answer", "error", "local_description").Add(1) err = errors.Wrap(err, "could not set local description") return } @@ -312,14 +317,14 @@ func (p *ParticipantImpl) HandleOffer(sdp webrtc.SessionDescription) (answer web }, }) if err != nil { - stats.PromServiceOperationCounter.WithLabelValues("answer", "error", "write_message").Add(1) + prometheus.ServiceOperationCounter.WithLabelValues("answer", "error", "write_message").Add(1) return } if p.State() == livekit.ParticipantInfo_JOINING { p.updateState(livekit.ParticipantInfo_JOINED) } - stats.PromServiceOperationCounter.WithLabelValues("answer", "success", "").Add(1) + prometheus.ServiceOperationCounter.WithLabelValues("answer", "success", "").Add(1) return } @@ -890,9 +895,9 @@ func (p *ParticipantImpl) onOffer(offer webrtc.SessionDescription) { }, }) if err != nil { - stats.PromServiceOperationCounter.WithLabelValues("offer", "error", "write_message").Add(1) + prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "write_message").Add(1) } else { - stats.PromServiceOperationCounter.WithLabelValues("offer", "success", "").Add(1) + prometheus.ServiceOperationCounter.WithLabelValues("offer", "success", "").Add(1) } } @@ -938,7 +943,7 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w BufferFactory: p.params.Config.BufferFactory, ReceiverConfig: p.params.Config.Receiver, AudioConfig: p.params.AudioConfig, - Stats: p.params.Stats, + Telemetry: p.params.Telemetry, Logger: p.params.Logger, }) @@ -1084,7 +1089,7 @@ func (p *ParticipantImpl) handlePrimaryICEStateChange(state webrtc.ICEConnection // p.params.Logger.Debugw("ICE connection state changed", "state", state.String(), // "participant", p.identity, "pID", p.ID()) if state == webrtc.ICEConnectionStateConnected { - stats.PromServiceOperationCounter.WithLabelValues("ice_connection", "success", "").Add(1) + prometheus.ServiceOperationCounter.WithLabelValues("ice_connection", "success", "").Add(1) p.updateState(livekit.ParticipantInfo_ACTIVE) } else if state == webrtc.ICEConnectionStateFailed { // only close when failed, to allow clients opportunity to reconnect diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index d9b7086e9..b72e96bf1 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -8,15 +8,16 @@ import ( "time" "github.com/go-logr/logr" - "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/protocol/logger" livekit "github.com/livekit/protocol/proto" "github.com/pion/ion-sfu/pkg/buffer" "google.golang.org/protobuf/proto" "github.com/livekit/livekit-server/pkg/config" + "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/rtc/types" - "github.com/livekit/livekit-server/pkg/utils/stats" + "github.com/livekit/livekit-server/pkg/telemetry" + "github.com/livekit/livekit-server/pkg/telemetry/prometheus" ) const ( @@ -45,7 +46,7 @@ type Room struct { // for active speaker updates audioConfig *config.AudioConfig - statsReporter *stats.RoomStatsReporter + telemetry *telemetry.TelemetryService onParticipantChanged func(p types.Participant) onMetadataUpdate func(metadata string) @@ -56,13 +57,13 @@ type ParticipantOptions struct { AutoSubscribe bool } -func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioConfig) *Room { +func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioConfig, telemetry *telemetry.TelemetryService) *Room { r := &Room{ Room: proto.Clone(room).(*livekit.Room), Logger: logger.Logger(logger.GetLogger().WithValues("room", room.Name)), config: config, audioConfig: audioConfig, - statsReporter: stats.NewRoomStatsReporter(), + telemetry: telemetry, participants: make(map[string]types.Participant), participantOpts: make(map[string]*ParticipantOptions), bufferFactory: buffer.NewBufferFactory(config.Receiver.packetBufferSize, logr.Logger{}), @@ -74,7 +75,7 @@ func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioC if r.Room.CreationTime == 0 { r.Room.CreationTime = time.Now().Unix() } - r.statsReporter.RoomStarted() + go r.audioUpdateWorker() go r.connectionQualityWorker() @@ -118,10 +119,6 @@ func (r *Room) GetActiveSpeakers() []*livekit.SpeakerInfo { return speakers } -func (r *Room) GetStatsReporter() *stats.RoomStatsReporter { - return r.statsReporter -} - func (r *Room) GetBufferFactor() *buffer.Factory { return r.bufferFactory } @@ -144,7 +141,7 @@ func (r *Room) LastLeftAt() int64 { func (r *Room) Join(participant types.Participant, opts *ParticipantOptions, iceServers []*livekit.ICEServer) error { if r.IsClosed() { - stats.PromServiceOperationCounter.WithLabelValues("participant_join", "error", "room_closed").Add(1) + prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "room_closed").Add(1) return ErrRoomClosed } @@ -152,12 +149,12 @@ func (r *Room) Join(participant types.Participant, opts *ParticipantOptions, ice defer r.lock.Unlock() if r.participants[participant.Identity()] != nil { - stats.PromServiceOperationCounter.WithLabelValues("participant_join", "error", "already_joined").Add(1) + prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "already_joined").Add(1) return ErrAlreadyJoined } if r.Room.MaxParticipants > 0 && int(r.Room.MaxParticipants) == len(r.participants) { - stats.PromServiceOperationCounter.WithLabelValues("participant_join", "error", "max_exceeded").Add(1) + prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "max_exceeded").Add(1) return ErrMaxParticipantsExceeded } @@ -165,8 +162,6 @@ func (r *Room) Join(participant types.Participant, opts *ParticipantOptions, ice r.joinedAt.Store(time.Now().Unix()) } - r.statsReporter.AddParticipant() - // it's important to set this before connection, we don't want to miss out on any publishedTracks participant.OnTrackPublished(r.onTrackPublished) participant.OnStateChange(func(p types.Participant, oldState livekit.ParticipantInfo_State) { @@ -223,7 +218,7 @@ func (r *Room) Join(participant types.Participant, opts *ParticipantOptions, ice }) if err := participant.SendJoinResponse(r.Room, otherParticipants, iceServers); err != nil { - stats.PromServiceOperationCounter.WithLabelValues("participant_join", "error", "send_response").Add(1) + prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "send_response").Add(1) return err } @@ -232,7 +227,7 @@ func (r *Room) Join(participant types.Participant, opts *ParticipantOptions, ice participant.Negotiate() } - stats.PromServiceOperationCounter.WithLabelValues("participant_join", "success", "").Add(1) + prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "success", "").Add(1) return nil } @@ -266,7 +261,6 @@ func (r *Room) RemoveParticipant(identity string) { if !ok { return } - r.statsReporter.SubParticipant() // send broadcast only if it's not already closed sendUpdates := p.State() != livekit.ParticipantInfo_DISCONNECTED @@ -374,22 +368,12 @@ func (r *Room) Close() { r.closeOnce.Do(func() { close(r.closed) r.Logger.Infow("closing room", "roomID", r.Room.Sid, "room", r.Room.Name) - - r.statsReporter.RoomEnded() if r.onClose != nil { r.onClose() } }) } -func (r *Room) GetIncomingStats() stats.PacketStats { - return *r.statsReporter.Incoming -} - -func (r *Room) GetOutgoingStats() stats.PacketStats { - return *r.statsReporter.Outgoing -} - func (r *Room) OnClose(f func()) { r.onClose = f } diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index fda14f600..098be3cbb 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -13,6 +13,7 @@ import ( "github.com/livekit/livekit-server/pkg/rtc" "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/rtc/types/typesfakes" + "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/livekit-server/pkg/testutils" ) @@ -551,6 +552,7 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *rtc.Room { UpdateInterval: audioUpdateInterval, SmoothIntervals: opts.audioSmoothIntervals, }, + telemetry.NewTelemetryService(nil), ) for i := 0; i < opts.num+opts.numHidden; i++ { identity := fmt.Sprintf("p%d", i) diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 2bc4b0593..773adc1f4 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -10,7 +10,8 @@ import ( "github.com/pion/interceptor" "github.com/pion/webrtc/v3" - "github.com/livekit/livekit-server/pkg/utils/stats" + "github.com/livekit/livekit-server/pkg/telemetry" + "github.com/livekit/livekit-server/pkg/telemetry/prometheus" ) const ( @@ -40,11 +41,13 @@ type PCTransport struct { } type TransportParams struct { - Target livekit.SignalTarget - Config *WebRTCConfig - Stats *stats.RoomStatsReporter - EnabledCodecs []*livekit.Codec - Logger logger.Logger + ParticipantID string + ParticipantIdentity string + Target livekit.SignalTarget + Config *WebRTCConfig + Telemetry *telemetry.TelemetryService + EnabledCodecs []*livekit.Codec + Logger logger.Logger } func newPeerConnection(params TransportParams) (*webrtc.PeerConnection, *webrtc.MediaEngine, error) { @@ -60,18 +63,10 @@ func newPeerConnection(params TransportParams) (*webrtc.PeerConnection, *webrtc. } se := params.Config.SettingEngine se.DisableMediaEngineCopy(true) - if params.Stats != nil && se.BufferFactory != nil { - wrapper := &stats.StatsBufferWrapper{ - CreateBufferFunc: se.BufferFactory, - Stats: params.Stats.Incoming, - } - se.BufferFactory = wrapper.CreateBuffer - } ir := &interceptor.Registry{} - if params.Stats != nil && params.Target == livekit.SignalTarget_SUBSCRIBER { - // only capture subscriber for outbound streams - f := stats.NewStatsInterceptorFactory(params.Stats) + if params.Telemetry != nil { + f := params.Telemetry.NewStatsInterceptorFactory(params.ParticipantID, params.ParticipantIdentity) ir.Add(f) } api := webrtc.NewAPI( @@ -208,7 +203,7 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error { if iceRestart && currentSD != nil { t.logger.Debugw("recovering from client negotiation state") if err := t.pc.SetRemoteDescription(*currentSD); err != nil { - stats.PromServiceOperationCounter.WithLabelValues("offer", "error", "remote_description").Add(1) + prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "remote_description").Add(1) return err } } else { @@ -223,14 +218,14 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error { offer, err := t.pc.CreateOffer(options) if err != nil { - stats.PromServiceOperationCounter.WithLabelValues("offer", "error", "create").Add(1) + prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "create").Add(1) t.logger.Errorw("could not create offer", err) return err } err = t.pc.SetLocalDescription(offer) if err != nil { - stats.PromServiceOperationCounter.WithLabelValues("offer", "error", "local_description").Add(1) + prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "local_description").Add(1) t.logger.Errorw("could not set local description", err) return err } diff --git a/pkg/rtc/transport_test.go b/pkg/rtc/transport_test.go index e5c5821b8..46a1a854b 100644 --- a/pkg/rtc/transport_test.go +++ b/pkg/rtc/transport_test.go @@ -14,9 +14,10 @@ import ( func TestMissingAnswerDuringICERestart(t *testing.T) { params := TransportParams{ - Target: livekit.SignalTarget_PUBLISHER, - Config: &WebRTCConfig{}, - Stats: nil, + ParticipantID: "id", + ParticipantIdentity: "identity", + Target: livekit.SignalTarget_PUBLISHER, + Config: &WebRTCConfig{}, } transportA, err := NewPCTransport(params) require.NoError(t, err) @@ -64,9 +65,10 @@ func TestMissingAnswerDuringICERestart(t *testing.T) { func TestNegotiationTiming(t *testing.T) { params := TransportParams{ - Target: livekit.SignalTarget_SUBSCRIBER, - Config: &WebRTCConfig{}, - Stats: nil, + ParticipantID: "id", + ParticipantIdentity: "identity", + Target: livekit.SignalTarget_SUBSCRIBER, + Config: &WebRTCConfig{}, } transportA, err := NewPCTransport(params) require.NoError(t, err) diff --git a/pkg/service/recordingservice.go b/pkg/service/recordingservice.go index 60c3413d9..600662d8e 100644 --- a/pkg/service/recordingservice.go +++ b/pkg/service/recordingservice.go @@ -3,28 +3,28 @@ package service import ( "context" "errors" - "time" "github.com/livekit/protocol/logger" livekit "github.com/livekit/protocol/proto" "github.com/livekit/protocol/recording" "github.com/livekit/protocol/utils" - "github.com/livekit/protocol/webhook" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/emptypb" + + "github.com/livekit/livekit-server/pkg/telemetry" ) type RecordingService struct { - bus utils.MessageBus - notifier webhook.Notifier - shutdown chan struct{} + bus utils.MessageBus + telemetry *telemetry.TelemetryService + shutdown chan struct{} } -func NewRecordingService(mb utils.MessageBus, notifier webhook.Notifier) *RecordingService { +func NewRecordingService(mb utils.MessageBus, telemetry *telemetry.TelemetryService) *RecordingService { return &RecordingService{ - bus: mb, - notifier: notifier, - shutdown: make(chan struct{}, 1), + bus: mb, + telemetry: telemetry, + shutdown: make(chan struct{}, 1), } } @@ -62,6 +62,7 @@ func (s *RecordingService) StartRecording(ctx context.Context, req *livekit.Star return nil, err } + s.telemetry.RecordingStarted(ctx, recordingId, req) return &livekit.StartRecordingResponse{RecordingId: recordingId}, nil } @@ -141,35 +142,10 @@ func (s *RecordingService) resultsWorker() { logger.Errorw("failed to read results", err) continue } - s.notify(res) + s.telemetry.RecordingEnded(res) case <-s.shutdown: _ = sub.Close() return } } } - -func (s *RecordingService) notify(res *livekit.RecordingResult) { - // log results - values := []interface{}{"id", res.Id} - if res.Error != "" { - values = append(values, "error", res.Error) - } else { - values = append(values, "duration", time.Duration(res.Duration*1e9)) - if res.DownloadUrl != "" { - values = append(values, "url", res.DownloadUrl) - } - } - logger.Debugw("received recording result", values...) - - // webhook - if s.notifier != nil { - event := webhook.EventRecordingFinished - if err := s.notifier.Notify(&livekit.WebhookEvent{ - Event: event, - RecordingResult: res, - }); err != nil { - logger.Warnw("could not notify webhook", err, "event", event) - } - } -} diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index edf261f7c..eda2a649c 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -6,15 +6,14 @@ import ( "sync" "time" - "github.com/gammazero/workerpool" "github.com/livekit/protocol/logger" livekit "github.com/livekit/protocol/proto" - "github.com/livekit/protocol/webhook" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/rtc" "github.com/livekit/livekit-server/pkg/rtc/types" + "github.com/livekit/livekit-server/pkg/telemetry" ) const ( @@ -29,15 +28,14 @@ type LocalRoomManager struct { lock sync.RWMutex router routing.Router currentNode routing.LocalNode - notifier webhook.Notifier rtcConfig *rtc.WebRTCConfig config *config.Config - webhookPool *workerpool.WorkerPool + telemetry *telemetry.TelemetryService rooms map[string]*rtc.Room } func NewLocalRoomManager(conf *config.Config, rs RoomStore, router routing.Router, currentNode routing.LocalNode, - notifier webhook.Notifier) (*LocalRoomManager, error) { + telemetry *telemetry.TelemetryService) (*LocalRoomManager, error) { rtcConf, err := rtc.NewWebRTCConfig(conf, currentNode.Ip) if err != nil { @@ -50,9 +48,8 @@ func NewLocalRoomManager(conf *config.Config, rs RoomStore, router routing.Route rtcConfig: rtcConf, config: conf, router: router, - notifier: notifier, currentNode: currentNode, - webhookPool: workerpool.New(1), + telemetry: telemetry, rooms: make(map[string]*rtc.Room), } @@ -228,7 +225,7 @@ func (r *LocalRoomManager) StartSession(ctx context.Context, roomName string, pi Sink: responseSink, AudioConfig: r.config.Audio, ProtocolVersion: pv, - Stats: room.GetStatsReporter(), + Telemetry: r.telemetry, ThrottleConfig: r.config.RTC.PLIThrottle, EnabledCodecs: room.Room.EnabledCodecs, Hidden: pi.Hidden, @@ -255,6 +252,11 @@ func (r *LocalRoomManager) StartSession(ctx context.Context, roomName string, pi return } + r.telemetry.ParticipantJoined(ctx, room.Room, participant.ToProto()) + participant.OnClose(func(p types.Participant) { + r.telemetry.ParticipantLeft(ctx, room.Room, p.ToProto()) + }) + go r.rtcSessionWorker(room, participant, requestSource) } @@ -275,22 +277,16 @@ func (r *LocalRoomManager) getOrCreateRoom(ctx context.Context, roomName string) } // construct ice servers - room = rtc.NewRoom(ri, *r.rtcConfig, &r.config.Audio) + room = rtc.NewRoom(ri, *r.rtcConfig, &r.config.Audio, r.telemetry) + r.telemetry.RoomStarted(ctx, room.Room) + room.OnClose(func() { + r.telemetry.RoomEnded(ctx, room.Room) if err := r.DeleteRoom(ctx, roomName); err != nil { logger.Errorw("could not delete room", err) } - r.notifyEvent(&livekit.WebhookEvent{ - Event: webhook.EventRoomFinished, - Room: room.Room, - }) - - // print stats - logger.Infow("room closed", - "incomingStats", room.GetIncomingStats().Copy(), - "outgoingStats", room.GetOutgoingStats().Copy(), - ) + logger.Infow("room closed") }) room.OnMetadataUpdate(func(metadata string) { err := r.StoreRoom(ctx, room.Room) @@ -313,11 +309,6 @@ func (r *LocalRoomManager) getOrCreateRoom(ctx context.Context, roomName string) r.rooms[roomName] = room r.lock.Unlock() - r.notifyEvent(&livekit.WebhookEvent{ - Event: webhook.EventRoomStarted, - Room: room.Room, - }) - return room, nil } @@ -331,20 +322,9 @@ func (r *LocalRoomManager) rtcSessionWorker(room *rtc.Room, participant types.Pa "roomID", room.Room.Sid, ) _ = participant.Close() - - r.notifyEvent(&livekit.WebhookEvent{ - Event: webhook.EventParticipantLeft, - Room: room.Room, - Participant: participant.ToProto(), - }) }() defer rtc.Recover() - r.notifyEvent(&livekit.WebhookEvent{ - Event: webhook.EventParticipantJoined, - Room: room.Room, - Participant: participant.ToProto(), - }) for { select { case <-time.After(time.Millisecond * 50): @@ -569,18 +549,6 @@ func (r *LocalRoomManager) iceServersForRoom(ri *livekit.Room) []*livekit.ICESer return iceServers } -func (r *LocalRoomManager) notifyEvent(event *livekit.WebhookEvent) { - if r.notifier == nil { - return - } - - r.webhookPool.Submit(func() { - if err := r.notifier.Notify(event); err != nil { - logger.Warnw("could not notify webhook", err, "event", event.Event) - } - }) -} - func iceServerForStunServers(servers []string) *livekit.ICEServer { iceServer := &livekit.ICEServer{} for _, stunServer := range servers { diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index b0144de62..cb6a1db26 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -16,7 +16,7 @@ import ( "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/rtc" "github.com/livekit/livekit-server/pkg/rtc/types" - "github.com/livekit/livekit-server/pkg/utils/stats" + "github.com/livekit/livekit-server/pkg/telemetry/prometheus" ) type RTCService struct { @@ -93,7 +93,7 @@ func (s *RTCService) validate(r *http.Request) (string, routing.ParticipantInit, func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { // reject non websocket requests if !websocket.IsWebSocketUpgrade(r) { - stats.PromServiceOperationCounter.WithLabelValues("signal_ws", "error", "reject").Add(1) + prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "error", "reject").Add(1) w.WriteHeader(404) return } @@ -107,7 +107,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { // create room if it doesn't exist, also assigns an RTC node for the room rm, err := s.roomAllocator.CreateRoom(r.Context(), &livekit.CreateRoomRequest{Name: roomName}) if err != nil { - stats.PromServiceOperationCounter.WithLabelValues("signal_ws", "error", "create_room").Add(1) + prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "error", "create_room").Add(1) handleError(w, http.StatusInternalServerError, err.Error()) return } @@ -115,7 +115,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { // this needs to be started first *before* using router functions on this node connId, reqSink, resSource, err := s.router.StartParticipantSignal(r.Context(), roomName, pi) if err != nil { - stats.PromServiceOperationCounter.WithLabelValues("signal_ws", "error", "start_signal").Add(1) + prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "error", "start_signal").Add(1) handleError(w, http.StatusInternalServerError, "could not start session: "+err.Error()) return } @@ -131,7 +131,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { // upgrade only once the basics are good to go conn, err := s.upgrader.Upgrade(w, r, nil) if err != nil { - stats.PromServiceOperationCounter.WithLabelValues("signal_ws", "error", "upgrade").Add(1) + prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "error", "upgrade").Add(1) logger.Warnw("could not upgrade to WS", err) handleError(w, http.StatusInternalServerError, err.Error()) return @@ -141,7 +141,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { sigConn.useJSON = false } - stats.PromServiceOperationCounter.WithLabelValues("signal_ws", "success", "").Add(1) + prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "success", "").Add(1) logger.Infow("new client WS connected", "connID", connId, "roomID", rm.Sid, diff --git a/pkg/service/wire.go b/pkg/service/wire.go index 78c0a67f7..786d0070f 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -20,6 +20,7 @@ import ( "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" + "github.com/livekit/livekit-server/pkg/telemetry" ) func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*LivekitServer, error) { @@ -30,6 +31,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live createKeyProvider, createWebhookNotifier, routing.CreateRouter, + telemetry.NewTelemetryService, NewRecordingService, NewRoomAllocator, NewRoomService, diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 48a1b1cd4..6384229dc 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -1,8 +1,7 @@ // Code generated by Wire. DO NOT EDIT. //go:generate go run github.com/google/wire/cmd/wire -//go:build !wireinject -// +build !wireinject +//+build !wireinject package service @@ -12,6 +11,7 @@ import ( "github.com/go-redis/redis/v8" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" + "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/protocol/auth" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" @@ -46,9 +46,10 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live if err != nil { return nil, err } - recordingService := NewRecordingService(messageBus, notifier) + telemetryService := telemetry.NewTelemetryService(notifier) + recordingService := NewRecordingService(messageBus, telemetryService) rtcService := NewRTCService(conf, roomAllocator, router, currentNode) - localRoomManager, err := NewLocalRoomManager(conf, roomStore, router, currentNode, notifier) + localRoomManager, err := NewLocalRoomManager(conf, roomStore, router, currentNode, telemetryService) if err != nil { return nil, err } diff --git a/pkg/telemetry/events.go b/pkg/telemetry/events.go new file mode 100644 index 000000000..d5f39b572 --- /dev/null +++ b/pkg/telemetry/events.go @@ -0,0 +1,139 @@ +package telemetry + +import ( + "context" + "time" + + "github.com/livekit/protocol/logger" + livekit "github.com/livekit/protocol/proto" + "github.com/livekit/protocol/webhook" + + "github.com/livekit/livekit-server/pkg/telemetry/prometheus" +) + +func (s *TelemetryService) RoomStarted(ctx context.Context, room *livekit.Room) { + s.pool.Submit(prometheus.RoomStarted) + + s.notifyEvent(ctx, &livekit.WebhookEvent{ + Event: webhook.EventRoomStarted, + Room: room, + }) + + // TODO: analytics service +} + +func (s *TelemetryService) RoomEnded(ctx context.Context, room *livekit.Room) { + s.pool.Submit(func() { + prometheus.RoomEnded(time.Unix(room.CreationTime, 0)) + }) + + s.notifyEvent(ctx, &livekit.WebhookEvent{ + Event: webhook.EventRoomFinished, + Room: room, + }) + + // TODO: analytics service +} + +func (s *TelemetryService) ParticipantJoined(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) { + s.pool.Submit(prometheus.AddParticipant) + + s.notifyEvent(ctx, &livekit.WebhookEvent{ + Event: webhook.EventParticipantJoined, + Room: room, + Participant: participant, + }) + + // TODO: analytics service +} + +func (s *TelemetryService) ParticipantLeft(ctx context.Context, room *livekit.Room, participant *livekit.ParticipantInfo) { + s.pool.Submit(prometheus.SubParticipant) + + s.notifyEvent(ctx, &livekit.WebhookEvent{ + Event: webhook.EventParticipantLeft, + Room: room, + Participant: participant, + }) + + // TODO: analytics service +} + +func (s *TelemetryService) PublishedTrack(SID, identity string, track *livekit.TrackInfo) { + s.pool.Submit(func() { + prometheus.AddPublishedTrack(track.Type.String()) + }) + + // TODO: analytics service +} + +func (s *TelemetryService) UnpublishedTrack(SID, identity string, track *livekit.TrackInfo) { + s.pool.Submit(func() { + prometheus.SubPublishedTrack(track.Type.String()) + }) + + // TODO: analytics service +} + +func (s *TelemetryService) SubscribedTrack(SID, identity string, track *livekit.TrackInfo) { + s.pool.Submit(func() { + prometheus.AddSubscribedTrack(track.Type.String()) + }) + + // TODO: analytics service +} + +func (s *TelemetryService) UnsubscribedTrack(SID, identity string, track *livekit.TrackInfo) { + s.pool.Submit(func() { + prometheus.SubSubscribedTrack(track.Type.String()) + }) + + // TODO: analytics service +} + +func (s *TelemetryService) RecordingStarted(ctx context.Context, recordingID string, req *livekit.StartRecordingRequest) { + logger.Debugw("recording started", "recordingID", recordingID) + + s.notifyEvent(ctx, &livekit.WebhookEvent{ + Event: webhook.EventRecordingStarted, + RecordingInfo: &livekit.RecordingInfo{ + Id: recordingID, + Request: req, + }, + }) + + // TODO: analytics service +} + +func (s *TelemetryService) RecordingEnded(res *livekit.RecordingResult) { + // log results + values := []interface{}{"recordingID", res.Id} + if res.Error != "" { + values = append(values, "error", res.Error) + } else { + values = append(values, "duration", time.Duration(res.Duration*1e9)) + if res.DownloadUrl != "" { + values = append(values, "url", res.DownloadUrl) + } + } + logger.Debugw("recording ended", values...) + + s.notifyEvent(context.Background(), &livekit.WebhookEvent{ + Event: webhook.EventRecordingFinished, + RecordingResult: res, + }) + + // TODO: analytics service +} + +func (s *TelemetryService) notifyEvent(ctx context.Context, event *livekit.WebhookEvent) { + if s.notifier == nil { + return + } + + s.pool.Submit(func() { + if err := s.notifier.Notify(ctx, event); err != nil { + logger.Warnw("failed to notify webhook", err, "event", event.Event) + } + }) +} diff --git a/pkg/telemetry/prometheus/node.go b/pkg/telemetry/prometheus/node.go new file mode 100644 index 000000000..c66ddf702 --- /dev/null +++ b/pkg/telemetry/prometheus/node.go @@ -0,0 +1,86 @@ +package prometheus + +import ( + "sync/atomic" + "time" + + livekit "github.com/livekit/protocol/proto" + "github.com/prometheus/client_golang/prometheus" +) + +const livekitNamespace = "livekit" + +var ( + MessageCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: livekitNamespace, + Subsystem: "node", + Name: "messages", + }, + []string{"type", "status"}, + ) + + ServiceOperationCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: livekitNamespace, + Subsystem: "node", + Name: "service_operation", + }, + []string{"type", "status", "error_type"}, + ) +) + +func init() { + prometheus.MustRegister(MessageCounter) + prometheus.MustRegister(ServiceOperationCounter) + + initPacketStats() + initRoomStats() +} + +func UpdateCurrentNodeStats(nodeStats *livekit.NodeStats) error { + updatedAtPrevious := nodeStats.UpdatedAt + nodeStats.UpdatedAt = time.Now().Unix() + secondsSinceLastUpdate := nodeStats.UpdatedAt - updatedAtPrevious + + err := updateCurrentNodeSystemStats(nodeStats) + updateCurrentNodeRoomStats(nodeStats) + updateCurrentNodePacketStats(nodeStats, secondsSinceLastUpdate) + + return err +} + +func updateCurrentNodeRoomStats(nodeStats *livekit.NodeStats) { + nodeStats.NumClients = atomic.LoadInt32(&atomicParticipantTotal) + nodeStats.NumRooms = atomic.LoadInt32(&atomicRoomTotal) + nodeStats.NumTracksIn = atomic.LoadInt32(&atomicTrackPublishedTotal) + nodeStats.NumTracksOut = atomic.LoadInt32(&atomicTrackSubscribedTotal) +} + +func updateCurrentNodePacketStats(nodeStats *livekit.NodeStats, secondsSinceLastUpdate int64) { + if secondsSinceLastUpdate == 0 { + return + } + + bytesInPrevious := nodeStats.BytesIn + bytesOutPrevious := nodeStats.BytesOut + packetsInPrevious := nodeStats.PacketsIn + packetsOutPrevious := nodeStats.PacketsOut + nackTotalPrevious := nodeStats.NackTotal + + nodeStats.BytesIn = atomic.LoadUint64(&atomicBytesIn) + nodeStats.BytesOut = atomic.LoadUint64(&atomicBytesOut) + nodeStats.PacketsIn = atomic.LoadUint64(&atomicPacketsIn) + nodeStats.PacketsOut = atomic.LoadUint64(&atomicPacketsOut) + nodeStats.NackTotal = atomic.LoadUint64(&atomicNackTotal) + + nodeStats.BytesInPerSec = perSec(bytesInPrevious, nodeStats.BytesIn, secondsSinceLastUpdate) + nodeStats.BytesOutPerSec = perSec(bytesOutPrevious, nodeStats.BytesOut, secondsSinceLastUpdate) + nodeStats.PacketsInPerSec = perSec(packetsInPrevious, nodeStats.PacketsIn, secondsSinceLastUpdate) + nodeStats.PacketsOutPerSec = perSec(packetsOutPrevious, nodeStats.PacketsOut, secondsSinceLastUpdate) + nodeStats.NackPerSec = perSec(nackTotalPrevious, nodeStats.NackTotal, secondsSinceLastUpdate) +} + +func perSec(prev, curr uint64, secs int64) float32 { + return float32(curr-prev) / float32(secs) +} diff --git a/pkg/utils/stats/node_linux.go b/pkg/telemetry/prometheus/node_linux.go similarity index 94% rename from pkg/utils/stats/node_linux.go rename to pkg/telemetry/prometheus/node_linux.go index 28e59308f..27863c343 100644 --- a/pkg/utils/stats/node_linux.go +++ b/pkg/telemetry/prometheus/node_linux.go @@ -1,6 +1,7 @@ +//go:build linux // +build linux -package stats +package prometheus import ( linuxproc "github.com/c9s/goprocinfo/linux" diff --git a/pkg/utils/stats/node_nonlinux.go b/pkg/telemetry/prometheus/node_nonlinux.go similarity index 80% rename from pkg/utils/stats/node_nonlinux.go rename to pkg/telemetry/prometheus/node_nonlinux.go index 9d35d95b3..e713fd050 100644 --- a/pkg/utils/stats/node_nonlinux.go +++ b/pkg/telemetry/prometheus/node_nonlinux.go @@ -1,6 +1,7 @@ +//go:build !linux // +build !linux -package stats +package prometheus import livekit "github.com/livekit/protocol/proto" diff --git a/pkg/telemetry/prometheus/packets.go b/pkg/telemetry/prometheus/packets.go new file mode 100644 index 000000000..78e66be70 --- /dev/null +++ b/pkg/telemetry/prometheus/packets.go @@ -0,0 +1,83 @@ +package prometheus + +import ( + "sync/atomic" + + "github.com/prometheus/client_golang/prometheus" +) + +type Direction string + +const ( + Incoming Direction = "incoming" + Outgoing Direction = "outgoing" +) + +var ( + atomicBytesIn uint64 + atomicBytesOut uint64 + atomicPacketsIn uint64 + atomicPacketsOut uint64 + atomicNackTotal uint64 + + promPacketLabels = []string{"direction"} + + promPacketTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: livekitNamespace, + Subsystem: "packet", + Name: "total", + }, promPacketLabels) + promPacketBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: livekitNamespace, + Subsystem: "packet", + Name: "bytes", + }, promPacketLabels) + promNackTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: livekitNamespace, + Subsystem: "nack", + Name: "total", + }, promPacketLabels) + promPliTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: livekitNamespace, + Subsystem: "pli", + Name: "total", + }, promPacketLabels) + promFirTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: livekitNamespace, + Subsystem: "fir", + Name: "total", + }, promPacketLabels) +) + +func initPacketStats() { + prometheus.MustRegister(promPacketTotal) + prometheus.MustRegister(promPacketBytes) + prometheus.MustRegister(promNackTotal) + prometheus.MustRegister(promPliTotal) + prometheus.MustRegister(promFirTotal) +} + +func IncrementPackets(direction Direction, pktLen uint64) { + promPacketTotal.WithLabelValues(string(direction)).Add(1) + promPacketBytes.WithLabelValues(string(direction)).Add(float64(pktLen)) + if direction == Incoming { + atomic.AddUint64(&atomicPacketsIn, 1) + atomic.AddUint64(&atomicBytesIn, pktLen) + } else { + atomic.AddUint64(&atomicPacketsOut, 1) + atomic.AddUint64(&atomicBytesOut, pktLen) + } +} + +func IncrementNack(direction Direction) { + promNackTotal.WithLabelValues(string(direction)).Add(1) + atomic.AddUint64(&atomicNackTotal, 1) +} + +func IncrementPLI(direction Direction) { + promPliTotal.WithLabelValues(string(direction)).Add(1) +} + +func IncrementFIR(direction Direction) { + promFirTotal.WithLabelValues(string(direction)).Add(1) +} diff --git a/pkg/utils/stats/roomstatsreporter.go b/pkg/telemetry/prometheus/rooms.go similarity index 61% rename from pkg/utils/stats/roomstatsreporter.go rename to pkg/telemetry/prometheus/rooms.go index 7bf944d41..15afd4934 100644 --- a/pkg/utils/stats/roomstatsreporter.go +++ b/pkg/telemetry/prometheus/rooms.go @@ -1,10 +1,9 @@ -package stats +package prometheus import ( "sync/atomic" "time" - livekit "github.com/livekit/protocol/proto" "github.com/prometheus/client_golang/prometheus" ) @@ -44,7 +43,7 @@ var ( }, []string{"kind"}) ) -func initRoomStatsReporter() { +func initRoomStats() { prometheus.MustRegister(promRoomTotal) prometheus.MustRegister(promRoomDuration) prometheus.MustRegister(promParticipantTotal) @@ -52,68 +51,45 @@ func initRoomStatsReporter() { prometheus.MustRegister(promTrackSubscribedTotal) } -// RoomStatsReporter is created for each room -type RoomStatsReporter struct { - roomName string - startedAt time.Time - Incoming *PacketStats - Outgoing *PacketStats -} - -func NewRoomStatsReporter() *RoomStatsReporter { - return &RoomStatsReporter{ - Incoming: newPacketStats("incoming"), - Outgoing: newPacketStats("outgoing"), - } -} - -func (r *RoomStatsReporter) RoomStarted() { - r.startedAt = time.Now() +func RoomStarted() { promRoomTotal.Add(1) atomic.AddInt32(&atomicRoomTotal, 1) } -func (r *RoomStatsReporter) RoomEnded() { - if !r.startedAt.IsZero() { - promRoomDuration.Observe(float64(time.Now().Sub(r.startedAt)) / float64(time.Second)) +func RoomEnded(startedAt time.Time) { + if !startedAt.IsZero() { + promRoomDuration.Observe(float64(time.Now().Sub(startedAt)) / float64(time.Second)) } promRoomTotal.Sub(1) atomic.AddInt32(&atomicRoomTotal, -1) } -func (r *RoomStatsReporter) AddParticipant() { +func AddParticipant() { promParticipantTotal.Add(1) atomic.AddInt32(&atomicParticipantTotal, 1) } -func (r *RoomStatsReporter) SubParticipant() { +func SubParticipant() { promParticipantTotal.Sub(1) atomic.AddInt32(&atomicParticipantTotal, -1) } -func (r *RoomStatsReporter) AddPublishedTrack(kind string) { +func AddPublishedTrack(kind string) { promTrackPublishedTotal.WithLabelValues(kind).Add(1) atomic.AddInt32(&atomicTrackPublishedTotal, 1) } -func (r *RoomStatsReporter) SubPublishedTrack(kind string) { +func SubPublishedTrack(kind string) { promTrackPublishedTotal.WithLabelValues(kind).Sub(1) atomic.AddInt32(&atomicTrackPublishedTotal, -1) } -func (r *RoomStatsReporter) AddSubscribedTrack(kind string) { +func AddSubscribedTrack(kind string) { promTrackSubscribedTotal.WithLabelValues(kind).Add(1) atomic.AddInt32(&atomicTrackSubscribedTotal, 1) } -func (r *RoomStatsReporter) SubSubscribedTrack(kind string) { +func SubSubscribedTrack(kind string) { promTrackSubscribedTotal.WithLabelValues(kind).Sub(1) atomic.AddInt32(&atomicTrackSubscribedTotal, -1) } - -func updateCurrentNodeRoomStats(nodeStats *livekit.NodeStats) { - nodeStats.NumClients = atomic.LoadInt32(&atomicParticipantTotal) - nodeStats.NumRooms = atomic.LoadInt32(&atomicRoomTotal) - nodeStats.NumTracksIn = atomic.LoadInt32(&atomicTrackPublishedTotal) - nodeStats.NumTracksOut = atomic.LoadInt32(&atomicTrackSubscribedTotal) -} diff --git a/pkg/telemetry/service.go b/pkg/telemetry/service.go new file mode 100644 index 000000000..3d57a2cea --- /dev/null +++ b/pkg/telemetry/service.go @@ -0,0 +1,91 @@ +package telemetry + +import ( + "github.com/gammazero/workerpool" + "github.com/livekit/protocol/webhook" + "github.com/pion/interceptor" + "github.com/pion/rtcp" + "github.com/pion/rtp" +) + +type TelemetryService struct { + notifier webhook.Notifier + pool *workerpool.WorkerPool +} + +func NewTelemetryService(notifier webhook.Notifier) *TelemetryService { + return &TelemetryService{ + notifier: notifier, + pool: workerpool.New(10), + } +} + +type StatsInterceptorFactory struct { + t *TelemetryService + participantID string + identity string +} + +func (s *TelemetryService) NewStatsInterceptorFactory(participantID, identity string) *StatsInterceptorFactory { + return &StatsInterceptorFactory{ + t: s, + participantID: participantID, + identity: identity, + } +} + +func (f *StatsInterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, error) { + return &StatsInterceptor{ + t: f.t, + participantID: f.participantID, + identity: f.identity, + }, nil +} + +type StatsInterceptor struct { + interceptor.NoOp + + t *TelemetryService + participantID string + identity string +} + +// --- Incoming --- + +// BindRTCPReader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might +// change in the future. The returned method will be called once per packet batch. +func (s *StatsInterceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader { + return interceptor.RTCPReaderFunc(func(bytes []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + s.t.HandleIncomingRTCP(s.participantID, s.identity, bytes) + return reader.Read(bytes, attributes) + }) +} + +// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method +// will be called once per rtp packet. +func (s *StatsInterceptor) BindRemoteStream(_ *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader { + return interceptor.RTPReaderFunc(func(payload []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) { + s.t.HandleIncomingRTP(s.participantID, s.identity, uint64(len(payload))) + return reader.Read(payload, attributes) + }) +} + +// --- Outgoing --- + +// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method +// will be called once per packet batch. +func (s *StatsInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter { + return interceptor.RTCPWriterFunc(func(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) { + s.t.HandleOutgoingRTCP(s.participantID, s.identity, pkts) + return writer.Write(pkts, attributes) + }) +} + +// BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method +// will be called once per rtp packet. +func (s *StatsInterceptor) BindLocalStream(_ *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter { + return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { + s.t.HandleOutgoingRTP(s.participantID, s.identity, uint64(len(payload))) + return writer.Write(header, payload, attributes) + }) +} diff --git a/pkg/telemetry/stats.go b/pkg/telemetry/stats.go new file mode 100644 index 000000000..74f0ae495 --- /dev/null +++ b/pkg/telemetry/stats.go @@ -0,0 +1,64 @@ +package telemetry + +import ( + "github.com/livekit/protocol/logger" + "github.com/pion/rtcp" + + "github.com/livekit/livekit-server/pkg/telemetry/prometheus" +) + +func (s *TelemetryService) HandleIncomingRTCP(participantID, identity string, bytes []byte) { + pkts, err := rtcp.Unmarshal(bytes) + if err != nil { + logger.Errorw("Interceptor failed to unmarshal rtcp packets", err) + return + } + + s.pool.Submit(func() { + for _, pkt := range pkts { + switch pkt.(type) { + case *rtcp.TransportLayerNack: + prometheus.IncrementNack(prometheus.Incoming) + case *rtcp.PictureLossIndication: + prometheus.IncrementPLI(prometheus.Incoming) + case *rtcp.FullIntraRequest: + prometheus.IncrementFIR(prometheus.Incoming) + } + } + }) + + // TODO: analytics service +} + +func (s *TelemetryService) HandleIncomingRTP(participantID, identity string, pktLen uint64) { + s.pool.Submit(func() { + prometheus.IncrementPackets(prometheus.Incoming, pktLen) + }) + + // TODO: analytics service +} + +func (s *TelemetryService) HandleOutgoingRTCP(participantID, identity string, pkts []rtcp.Packet) { + s.pool.Submit(func() { + for _, pkt := range pkts { + switch pkt.(type) { + case *rtcp.TransportLayerNack: + prometheus.IncrementNack(prometheus.Outgoing) + case *rtcp.PictureLossIndication: + prometheus.IncrementPLI(prometheus.Outgoing) + case *rtcp.FullIntraRequest: + prometheus.IncrementFIR(prometheus.Outgoing) + } + } + }) + + // TODO: analytics service +} + +func (s *TelemetryService) HandleOutgoingRTP(participantID, identity string, pktLen uint64) { + s.pool.Submit(func() { + prometheus.IncrementPackets(prometheus.Outgoing, pktLen) + }) + + // TODO: analytics service +} diff --git a/pkg/utils/stats/packetstats.go b/pkg/utils/stats/packetstats.go deleted file mode 100644 index cdb45d5f8..000000000 --- a/pkg/utils/stats/packetstats.go +++ /dev/null @@ -1,157 +0,0 @@ -package stats - -import ( - "sync/atomic" - - livekit "github.com/livekit/protocol/proto" - "github.com/pion/rtcp" - "github.com/prometheus/client_golang/prometheus" -) - -var ( - atomicBytesIn uint64 - atomicBytesOut uint64 - atomicPacketsIn uint64 - atomicPacketsOut uint64 - atomicNackTotal uint64 - - promPacketLabels = []string{"direction"} - - promPacketTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: livekitNamespace, - Subsystem: "packet", - Name: "total", - }, promPacketLabels) - promPacketBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: livekitNamespace, - Subsystem: "packet", - Name: "bytes", - }, promPacketLabels) - promNackTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: livekitNamespace, - Subsystem: "nack", - Name: "total", - }, promPacketLabels) - promPliTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: livekitNamespace, - Subsystem: "pli", - Name: "total", - }, promPacketLabels) - promFirTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: livekitNamespace, - Subsystem: "fir", - Name: "total", - }, promPacketLabels) -) - -func initPacketStats() { - prometheus.MustRegister(promPacketTotal) - prometheus.MustRegister(promPacketBytes) - prometheus.MustRegister(promNackTotal) - prometheus.MustRegister(promPliTotal) - prometheus.MustRegister(promFirTotal) -} - -type PacketStats struct { - direction string // incoming or outgoing - - PacketBytes uint64 `json:"packetBytes"` - PacketTotal uint64 `json:"packetTotal"` - NackTotal uint64 `json:"nackTotal"` - PLITotal uint64 `json:"pliTotal"` - FIRTotal uint64 `json:"firTotal"` -} - -func newPacketStats(direction string) *PacketStats { - return &PacketStats{ - direction: direction, - } -} - -func (s *PacketStats) IncrementBytes(bytes uint64) { - promPacketBytes.WithLabelValues(s.direction).Add(float64(bytes)) - atomic.AddUint64(&s.PacketBytes, bytes) - if s.direction == "incoming" { - atomic.AddUint64(&atomicBytesIn, bytes) - } else { - atomic.AddUint64(&atomicBytesOut, bytes) - } -} - -func (s *PacketStats) IncrementPackets(count uint64) { - promPacketTotal.WithLabelValues(s.direction).Add(float64(count)) - atomic.AddUint64(&s.PacketTotal, count) - if s.direction == "incoming" { - atomic.AddUint64(&atomicPacketsIn, count) - } else { - atomic.AddUint64(&atomicPacketsOut, count) - } -} - -func (s *PacketStats) IncrementNack(count uint64) { - promNackTotal.WithLabelValues(s.direction).Add(float64(count)) - atomic.AddUint64(&s.NackTotal, count) - atomic.AddUint64(&atomicNackTotal, count) -} - -func (s *PacketStats) IncrementPLI(count uint64) { - promPliTotal.WithLabelValues(s.direction).Add(float64(count)) - atomic.AddUint64(&s.PLITotal, count) -} - -func (s *PacketStats) IncrementFIR(count uint64) { - promFirTotal.WithLabelValues(s.direction).Add(float64(count)) - atomic.AddUint64(&s.FIRTotal, count) -} - -func (s *PacketStats) HandleRTCP(pkts []rtcp.Packet) { - for _, rtcpPacket := range pkts { - switch rtcpPacket.(type) { - case *rtcp.TransportLayerNack: - s.IncrementNack(1) - case *rtcp.PictureLossIndication: - s.IncrementPLI(1) - case *rtcp.FullIntraRequest: - s.IncrementFIR(1) - } - } -} - -func (s PacketStats) Copy() *PacketStats { - return &PacketStats{ - direction: s.direction, - PacketBytes: atomic.LoadUint64(&s.PacketBytes), - PacketTotal: atomic.LoadUint64(&s.PacketTotal), - NackTotal: atomic.LoadUint64(&s.NackTotal), - PLITotal: atomic.LoadUint64(&s.PLITotal), - FIRTotal: atomic.LoadUint64(&s.FIRTotal), - } -} - -func updateCurrentNodePacketStats(nodeStats *livekit.NodeStats, secondsSinceLastUpdate int64) { - if secondsSinceLastUpdate == 0 { - return - } - - bytesInPrevious := nodeStats.BytesIn - bytesOutPrevious := nodeStats.BytesOut - packetsInPrevious := nodeStats.PacketsIn - packetsOutPrevious := nodeStats.PacketsOut - nackTotalPrevious := nodeStats.NackTotal - - nodeStats.BytesIn = atomic.LoadUint64(&atomicBytesIn) - nodeStats.BytesOut = atomic.LoadUint64(&atomicBytesOut) - nodeStats.PacketsIn = atomic.LoadUint64(&atomicPacketsIn) - nodeStats.PacketsOut = atomic.LoadUint64(&atomicPacketsOut) - nodeStats.NackTotal = atomic.LoadUint64(&atomicNackTotal) - - nodeStats.BytesInPerSec = perSec(bytesInPrevious, nodeStats.BytesIn, secondsSinceLastUpdate) - nodeStats.BytesOutPerSec = perSec(bytesOutPrevious, nodeStats.BytesOut, secondsSinceLastUpdate) - nodeStats.PacketsInPerSec = perSec(packetsInPrevious, nodeStats.PacketsIn, secondsSinceLastUpdate) - nodeStats.PacketsOutPerSec = perSec(packetsOutPrevious, nodeStats.PacketsOut, secondsSinceLastUpdate) - nodeStats.NackPerSec = perSec(nackTotalPrevious, nodeStats.NackTotal, secondsSinceLastUpdate) -} - -func perSec(prev, curr uint64, secs int64) float32 { - return float32(curr-prev) / float32(secs) -} diff --git a/pkg/utils/stats/stats.go b/pkg/utils/stats/stats.go deleted file mode 100644 index 284680c1d..000000000 --- a/pkg/utils/stats/stats.go +++ /dev/null @@ -1,129 +0,0 @@ -package stats - -import ( - "io" - "time" - - livekit "github.com/livekit/protocol/proto" - "github.com/pion/interceptor" - "github.com/pion/rtcp" - "github.com/pion/rtp" - "github.com/pion/transport/packetio" - "github.com/prometheus/client_golang/prometheus" -) - -const livekitNamespace = "livekit" - -var ( - PromMessageCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: livekitNamespace, - Subsystem: "node", - Name: "messages", - }, - []string{"type", "status"}, - ) - - PromServiceOperationCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: livekitNamespace, - Subsystem: "node", - Name: "service_operation", - }, - []string{"type", "status", "error_type"}, - ) -) - -func init() { - prometheus.MustRegister(PromMessageCounter) - prometheus.MustRegister(PromServiceOperationCounter) - - initPacketStats() - initRoomStatsReporter() -} - -// StatsBufferWrapper wraps a buffer factory so we could get information on -// incoming packets -type StatsBufferWrapper struct { - CreateBufferFunc func(packetType packetio.BufferPacketType, ssrc uint32) io.ReadWriteCloser - Stats *PacketStats -} - -func (w *StatsBufferWrapper) CreateBuffer(packetType packetio.BufferPacketType, ssrc uint32) io.ReadWriteCloser { - writer := w.CreateBufferFunc(packetType, ssrc) - if packetType == packetio.RTPBufferPacket { - // wrap this in a counter class - return &rtpReporterWriter{ - ReadWriteCloser: writer, - stats: w.Stats, - } - } - return writer -} - -type rtpReporterWriter struct { - io.ReadWriteCloser - stats *PacketStats -} - -func (w *rtpReporterWriter) Write(p []byte) (n int, err error) { - w.stats.IncrementPackets(1) - w.stats.IncrementBytes(uint64(len(p))) - return w.ReadWriteCloser.Write(p) -} - -// StatsInterceptorFactory is created for each participant to keep of track of outgoing stats -// it adheres to Pion interceptor.Factory interface -type StatsInterceptorFactory struct { - reporter *RoomStatsReporter -} - -func NewStatsInterceptorFactory(reporter *RoomStatsReporter) *StatsInterceptorFactory { - return &StatsInterceptorFactory{ - reporter: reporter, - } -} - -func (f *StatsInterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, error) { - return &StatsInterceptor{ - reporter: f.reporter, - }, nil -} - -// StatsInterceptor is created for each participant to keep of track of outgoing stats -// it adheres to Pion interceptor interface -type StatsInterceptor struct { - interceptor.NoOp - reporter *RoomStatsReporter -} - -// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method -// will be called once per packet batch. -func (s *StatsInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter { - return interceptor.RTCPWriterFunc(func(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) { - s.reporter.Outgoing.HandleRTCP(pkts) - return writer.Write(pkts, attributes) - }) -} - -// BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method -// will be called once per rtp packet. -func (s *StatsInterceptor) BindLocalStream(_ *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter { - return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { - s.reporter.Outgoing.IncrementPackets(1) - s.reporter.Outgoing.IncrementBytes(uint64(len(payload))) - return writer.Write(header, payload, attributes) - }) -} - -func UpdateCurrentNodeStats(nodeStats *livekit.NodeStats) error { - updatedAtPrevious := nodeStats.UpdatedAt - nodeStats.UpdatedAt = time.Now().Unix() - secondsSinceLastUpdate := nodeStats.UpdatedAt - updatedAtPrevious - - err := updateCurrentNodeSystemStats(nodeStats) - updateCurrentNodeRoomStats(nodeStats) - updateCurrentNodePacketStats(nodeStats, secondsSinceLastUpdate) - - return err -}