diff --git a/pkg/rtc/transportmanager.go b/pkg/rtc/transportmanager.go index 1df76e823..131255046 100644 --- a/pkg/rtc/transportmanager.go +++ b/pkg/rtc/transportmanager.go @@ -83,6 +83,7 @@ func NewTransportManager(params TransportManagerParams) (*TransportManager, erro t := &TransportManager{ params: params, mediaLossProxy: NewMediaLossProxy(MediaLossProxyParams{Logger: params.Logger}), + iceConfig: &livekit.ICEConfig{}, } t.mediaLossProxy.OnMediaLossUpdate(t.onMediaLossUpdate) diff --git a/pkg/telemetry/events.go b/pkg/telemetry/events.go index e68dfb3c1..bb39b372d 100644 --- a/pkg/telemetry/events.go +++ b/pkg/telemetry/events.go @@ -110,6 +110,8 @@ func (t *telemetryService) ParticipantActive( worker, ok := t.getWorker(livekit.ParticipantID(participant.Sid)) if !ok { + // in case of session migration, we may not have seen a Join event take place. + // we'd need to create the worker here before being able to process events worker = t.createWorker( ctx, livekit.RoomID(room.Sid), @@ -117,6 +119,9 @@ func (t *telemetryService) ParticipantActive( livekit.ParticipantID(participant.Sid), livekit.ParticipantIdentity(participant.Identity), ) + + // need to also account for participant count + prometheus.AddParticipant() } worker.SetConnected() @@ -138,12 +143,17 @@ func (t *telemetryService) ParticipantLeft(ctx context.Context, ) { t.enqueue(func() { isConnected := false + hasWorker := false if worker, ok := t.getWorker(livekit.ParticipantID(participant.Sid)); ok { + hasWorker = true isConnected = worker.IsConnected() worker.Close() } - prometheus.SubParticipant() + if hasWorker { + // signifies we had incremented participant count + prometheus.SubParticipant() + } if isConnected && shouldSendEvent { t.NotifyEvent(ctx, &livekit.WebhookEvent{