diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index d19e2d793..c58a6b1f4 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -359,7 +359,7 @@ func (r *RoomManager) StartSession( ), pi.ReconnectReason, ); err != nil { - logger.Warnw("could not resume participant", err, "participant", pi.Identity) + participant.GetLogger().Warnw("could not resume participant", err) return err } r.telemetry.ParticipantResumed(ctx, room.ToProto(), participant.ToProto(), livekit.NodeID(r.currentNode.Id), pi.ReconnectReason) @@ -394,10 +394,16 @@ func (r *RoomManager) StartSession( return errors.New("could not restart participant") } - logger.Debugw("starting RTC session", + sid := livekit.ParticipantID(guid.New(utils.ParticipantPrefix)) + pLogger := rtc.LoggerWithParticipant( + rtc.LoggerWithRoom(logger.GetLogger(), room.Name(), room.ID()), + pi.Identity, + sid, + false, + ) + pLogger.Infow("starting RTC session", "room", room.Name(), "nodeID", r.currentNode.Id, - "participant", pi.Identity, "clientInfo", logger.Proto(pi.Client), "reconnect", pi.Reconnect, "reconnectReason", pi.ReconnectReason, @@ -413,12 +419,6 @@ func (r *RoomManager) StartSession( if pi.DisableICELite { rtcConf.SettingEngine.SetLite(false) } - sid := livekit.ParticipantID(guid.New(utils.ParticipantPrefix)) - pLogger := rtc.LoggerWithParticipant( - rtc.LoggerWithRoom(logger.GetLogger(), room.Name(), room.ID()), - pi.Identity, - sid, - false) // default allow forceTCP allowFallback := true if r.config.RTC.AllowTCPFallback != nil { @@ -546,7 +546,7 @@ func (r *RoomManager) StartSession( participant.OnClaimsChanged(func(participant types.LocalParticipant) { pLogger.Debugw("refreshing client token after claims change") if err := r.refreshToken(participant); err != nil { - logger.Errorw("could not refresh token", err) + pLogger.Errorw("could not refresh token", err) } }) participant.OnICEConfigChanged(func(participant types.LocalParticipant, iceConfig *livekit.ICEConfig) { @@ -696,8 +696,6 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalPa pLogger.Errorw("could not refresh token", err, "connID", requestSource.ConnectionID()) } case obj := <-requestSource.ReadChan(): - // In single node mode, the request source is directly tied to the signal message channel - // this means ICE restart isn't possible in single node mode if obj == nil { if room.GetParticipantRequestSource(participant.Identity()) == requestSource { participant.HandleSignalSourceClose() diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index a2feb19ef..851816907 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -276,6 +276,12 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { signalStats.ResolveRoom(join.GetRoom()) signalStats.ResolveParticipant(join.GetParticipant()) } + if pi.Reconnect && pi.ID != "" { + signalStats.ResolveParticipant(&livekit.ParticipantInfo{ + Sid: string(pi.ID), + Identity: string(pi.Identity), + }) + } closedByClient := atomic.NewBool(false) done := make(chan struct{}) diff --git a/pkg/telemetry/events.go b/pkg/telemetry/events.go index a43a086a4..d3488a273 100644 --- a/pkg/telemetry/events.go +++ b/pkg/telemetry/events.go @@ -86,6 +86,7 @@ func (t *telemetryService) ParticipantJoined( livekit.RoomName(room.Name), livekit.ParticipantID(participant.Sid), livekit.ParticipantIdentity(participant.Identity), + false, ) if !found { prometheus.IncrementParticipantRtcConnected(1) @@ -124,6 +125,7 @@ func (t *telemetryService) ParticipantActive( livekit.RoomName(room.Name), livekit.ParticipantID(participant.Sid), livekit.ParticipantIdentity(participant.Identity), + false, ) if !found { // need to also account for participant count @@ -145,6 +147,27 @@ func (t *telemetryService) ParticipantResumed( reason livekit.ReconnectReason, ) { t.enqueue(func() { + // create a worker if needed. + // + // Signalling channel stats collector and media channel stats collector could both call + // ParticipantJoined and ParticipantLeft. + // + // On a resume, the signalling channel collector would call `ParticipantLeft` which would close + // the corresponding participant's stats worker. + // + // So, on a successful resume, create the worker if needed. + _, found := t.getOrCreateWorker( + ctx, + livekit.RoomID(room.Sid), + livekit.RoomName(room.Name), + livekit.ParticipantID(participant.Sid), + livekit.ParticipantIdentity(participant.Identity), + true, + ) + if !found { + prometheus.AddParticipant() + } + ev := newParticipantEvent(livekit.AnalyticsEventType_PARTICIPANT_RESUMED, room, participant) ev.ClientMeta = &livekit.AnalyticsClientMeta{ Node: string(nodeID), diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index 151f7e814..0079c5d5a 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -193,15 +193,22 @@ func (t *telemetryService) getOrCreateWorker( roomName livekit.RoomName, participantID livekit.ParticipantID, participantIdentity livekit.ParticipantIdentity, + transferConnectedState bool, ) (*StatsWorker, bool) { t.workersMu.Lock() defer t.workersMu.Unlock() - if worker, ok := t.workers[participantID]; ok && !worker.Closed() { + worker, ok := t.workers[participantID] + if ok && !worker.Closed() { return worker, true } - worker := newStatsWorker( + existingIsConnected := false + if ok && transferConnectedState { + existingIsConnected = worker.IsConnected() + } + + worker = newStatsWorker( ctx, t, roomID, @@ -209,6 +216,9 @@ func (t *telemetryService) getOrCreateWorker( participantID, participantIdentity, ) + if existingIsConnected { + worker.SetConnected() + } t.workers[participantID] = worker