diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index baef4aa30..dba8a1896 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -247,12 +247,10 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { pi.ID = livekit.ParticipantID(initialResponse.GetJoin().GetParticipant().GetSid()) } - var signalStats *telemetry.BytesTrackStats - if pi.ID != "" { - signalStats = telemetry.NewBytesTrackStats( - telemetry.BytesTrackIDForParticipantID(telemetry.BytesTrackTypeSignal, pi.ID), - pi.ID, - s.telemetry) + signalStats := telemetry.NewBytesSignalStats(r.Context(), s.telemetry) + if join := initialResponse.GetJoin(); join != nil { + signalStats.ResolveRoom(join.GetRoom()) + signalStats.ResolveParticipant(join.GetParticipant()) } pLogger := rtc.LoggerWithParticipant( @@ -274,9 +272,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { cr.RequestSink.Close() close(done) - if signalStats != nil { - signalStats.Stop() - } + signalStats.Stop() }() // upgrade only once the basics are good to go @@ -303,9 +299,8 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { pLogger.Warnw("could not write initial response", err) return } - if signalStats != nil { - signalStats.AddBytes(uint64(count), true) - } + signalStats.AddBytes(uint64(count), true) + pLogger.Debugw("new client WS connected", "connID", cr.ConnectionID, "reconnect", pi.Reconnect, @@ -349,20 +344,17 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { pLogger.Debugw("sending offer", "offer", m) case *livekit.SignalResponse_Answer: pLogger.Debugw("sending answer", "answer", m) - } - - if pi.ID == "" && res.GetJoin() != nil { - pi.ID = livekit.ParticipantID(res.GetJoin().GetParticipant().GetSid()) - signalStats = telemetry.NewBytesTrackStats( - telemetry.BytesTrackIDForParticipantID(telemetry.BytesTrackTypeSignal, pi.ID), - pi.ID, - s.telemetry) + case *livekit.SignalResponse_Join: + signalStats.ResolveRoom(m.Join.GetRoom()) + signalStats.ResolveParticipant(m.Join.GetParticipant()) + case *livekit.SignalResponse_RoomUpdate: + signalStats.ResolveRoom(m.RoomUpdate.GetRoom()) } if count, err := sigConn.WriteResponse(res); err != nil { pLogger.Warnw("error writing to websocket", err) return - } else if signalStats != nil { + } else { signalStats.AddBytes(uint64(count), true) } } @@ -390,9 +382,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { } return } - if signalStats != nil { - signalStats.AddBytes(uint64(count), false) - } + signalStats.AddBytes(uint64(count), false) switch m := req.Message.(type) { case *livekit.SignalRequest_Ping: @@ -405,7 +395,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { Pong: time.Now().UnixMilli(), }, }) - if perr == nil && signalStats != nil { + if perr == nil { signalStats.AddBytes(uint64(count), true) } case *livekit.SignalRequest_PingReq: @@ -417,7 +407,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { }, }, }) - if perr == nil && signalStats != nil { + if perr == nil { signalStats.AddBytes(uint64(count), true) } } diff --git a/pkg/telemetry/events.go b/pkg/telemetry/events.go index 35eaf668e..1d0a9c546 100644 --- a/pkg/telemetry/events.go +++ b/pkg/telemetry/events.go @@ -80,16 +80,17 @@ func (t *telemetryService) ParticipantJoined( shouldSendEvent bool, ) { t.enqueue(func() { - prometheus.IncrementParticipantRtcConnected(1) - prometheus.AddParticipant() - - t.createWorker( + _, found := t.getOrCreateWorker( ctx, livekit.RoomID(room.Sid), livekit.RoomName(room.Name), livekit.ParticipantID(participant.Sid), livekit.ParticipantIdentity(participant.Identity), ) + if !found { + prometheus.IncrementParticipantRtcConnected(1) + prometheus.AddParticipant() + } if shouldSendEvent { ev := newParticipantEvent(livekit.AnalyticsEventType_PARTICIPANT_JOINED, room, participant) @@ -117,18 +118,14 @@ func (t *telemetryService) ParticipantActive( }) } - worker, ok := t.getWorker(livekit.ParticipantID(participant.Sid)) - if !ok { - // in case of session migration, we may not have seen a Join event take place. - // we'd need to create the worker here before being able to process events - worker = t.createWorker( - ctx, - livekit.RoomID(room.Sid), - livekit.RoomName(room.Name), - livekit.ParticipantID(participant.Sid), - livekit.ParticipantIdentity(participant.Identity), - ) - + worker, found := t.getOrCreateWorker( + ctx, + livekit.RoomID(room.Sid), + livekit.RoomName(room.Name), + livekit.ParticipantID(participant.Sid), + livekit.ParticipantIdentity(participant.Identity), + ) + if !found { // need to also account for participant count prometheus.AddParticipant() } diff --git a/pkg/telemetry/signalanddatastats.go b/pkg/telemetry/signalanddatastats.go index f30f8bfe0..086192532 100644 --- a/pkg/telemetry/signalanddatastats.go +++ b/pkg/telemetry/signalanddatastats.go @@ -15,16 +15,17 @@ package telemetry import ( + "context" "fmt" + "sync" "time" + "github.com/frostbyte73/core" "go.uber.org/atomic" - "github.com/frostbyte73/core" + "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/utils" - - "github.com/livekit/livekit-server/pkg/config" ) type BytesTrackType string @@ -139,6 +140,61 @@ func (s *BytesTrackStats) reporter() { // ----------------------------------------------------------------------- +type BytesSignalStats struct { + BytesTrackStats + ctx context.Context + + mu sync.Mutex + ri *livekit.Room + pi *livekit.ParticipantInfo +} + +func NewBytesSignalStats(ctx context.Context, telemetry TelemetryService) *BytesSignalStats { + return &BytesSignalStats{ + BytesTrackStats: BytesTrackStats{ + telemetry: telemetry, + }, + ctx: ctx, + } +} + +func (s *BytesSignalStats) ResolveRoom(ri *livekit.Room) { + s.mu.Lock() + defer s.mu.Unlock() + if s.ri == nil && ri.GetSid() != "" { + s.ri = ri + s.maybeStart() + } +} + +func (s *BytesSignalStats) ResolveParticipant(pi *livekit.ParticipantInfo) { + s.mu.Lock() + defer s.mu.Unlock() + if s.pi == nil { + s.pi = pi + s.maybeStart() + } +} + +func (s *BytesSignalStats) maybeStart() { + if s.ri == nil || s.pi == nil { + return + } + + s.pID = livekit.ParticipantID(s.pi.Sid) + s.trackID = BytesTrackIDForParticipantID(BytesTrackTypeSignal, s.pID) + + s.telemetry.ParticipantJoined(s.ctx, s.ri, s.pi, nil, nil, false) + go s.reporter() +} + +func (s *BytesSignalStats) reporter() { + s.BytesTrackStats.reporter() + s.telemetry.ParticipantLeft(s.ctx, s.ri, s.pi, false) +} + +// ----------------------------------------------------------------------- + func BytesTrackIDForParticipantID(typ BytesTrackType, participantID livekit.ParticipantID) livekit.TrackID { return livekit.TrackID(fmt.Sprintf("%s_%s%s", utils.TrackPrefix, string(typ), participantID)) } diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index c684d5842..7bf544167 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -19,12 +19,13 @@ import ( "sync" "time" + "golang.org/x/exp/maps" + "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/webhook" - "golang.org/x/exp/maps" ) //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . TelemetryService @@ -153,12 +154,19 @@ func (t *telemetryService) getWorker(participantID livekit.ParticipantID) (worke return } -func (t *telemetryService) createWorker(ctx context.Context, +func (t *telemetryService) getOrCreateWorker(ctx context.Context, roomID livekit.RoomID, roomName livekit.RoomName, participantID livekit.ParticipantID, participantIdentity livekit.ParticipantIdentity, -) *StatsWorker { +) (*StatsWorker, bool) { + t.lock.Lock() + defer t.lock.Unlock() + + if worker, ok := t.workers[participantID]; ok { + return worker, true + } + worker := newStatsWorker( ctx, t, @@ -168,11 +176,10 @@ func (t *telemetryService) createWorker(ctx context.Context, participantIdentity, ) - t.lock.Lock() t.workers[participantID] = worker t.workersShadow = maps.Values(t.workers) - t.lock.Unlock() - return worker + + return worker, false } func (t *telemetryService) cleanupWorkers() {