Recreate stats worker on resume if needed. (#2982)

* Ref count the stats worker.

NOTE: Don't like this much, but wanted to open this to get some 👀 on
this and get feedback.

There are two entities, one for counting signal bytes and another for
media stats. They both send `ParticipantJoined` and `ParticipantLeft`
events.

In the case of a participant resume, as the old web socket
connection is closed, that triggers a signal stats counter close. That
would call `ParticipantLeft` and that would close the stats worker.

The closed stats worker got reaped in `FlushStats` after three minutes.

So, all events after that did not have a worker and hence went
unreported. This included a missing participant_left webhook, because
sending it relied on checking if a participant was ever connected, and
that check needed the worker state.

Use a ref count to keep track of joins/leaves, and do not close the
worker until the ref count goes down to 0.

* create a stats worker on resume

* revert incorrect changes

* transfer connected state

* transfer connected state when creating worker

* resolve participant on a resume
This commit is contained in:
Raja Subramanian
2024-09-06 23:58:03 +05:30
committed by GitHub
parent 95fa55eb5d
commit bec7453a1f
4 changed files with 51 additions and 14 deletions
+10 -12
View File
@@ -359,7 +359,7 @@ func (r *RoomManager) StartSession(
),
pi.ReconnectReason,
); err != nil {
logger.Warnw("could not resume participant", err, "participant", pi.Identity)
participant.GetLogger().Warnw("could not resume participant", err)
return err
}
r.telemetry.ParticipantResumed(ctx, room.ToProto(), participant.ToProto(), livekit.NodeID(r.currentNode.Id), pi.ReconnectReason)
@@ -394,10 +394,16 @@ func (r *RoomManager) StartSession(
return errors.New("could not restart participant")
}
logger.Debugw("starting RTC session",
sid := livekit.ParticipantID(guid.New(utils.ParticipantPrefix))
pLogger := rtc.LoggerWithParticipant(
rtc.LoggerWithRoom(logger.GetLogger(), room.Name(), room.ID()),
pi.Identity,
sid,
false,
)
pLogger.Infow("starting RTC session",
"room", room.Name(),
"nodeID", r.currentNode.Id,
"participant", pi.Identity,
"clientInfo", logger.Proto(pi.Client),
"reconnect", pi.Reconnect,
"reconnectReason", pi.ReconnectReason,
@@ -413,12 +419,6 @@ func (r *RoomManager) StartSession(
if pi.DisableICELite {
rtcConf.SettingEngine.SetLite(false)
}
sid := livekit.ParticipantID(guid.New(utils.ParticipantPrefix))
pLogger := rtc.LoggerWithParticipant(
rtc.LoggerWithRoom(logger.GetLogger(), room.Name(), room.ID()),
pi.Identity,
sid,
false)
// default allow forceTCP
allowFallback := true
if r.config.RTC.AllowTCPFallback != nil {
@@ -546,7 +546,7 @@ func (r *RoomManager) StartSession(
participant.OnClaimsChanged(func(participant types.LocalParticipant) {
pLogger.Debugw("refreshing client token after claims change")
if err := r.refreshToken(participant); err != nil {
logger.Errorw("could not refresh token", err)
pLogger.Errorw("could not refresh token", err)
}
})
participant.OnICEConfigChanged(func(participant types.LocalParticipant, iceConfig *livekit.ICEConfig) {
@@ -696,8 +696,6 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalPa
pLogger.Errorw("could not refresh token", err, "connID", requestSource.ConnectionID())
}
case obj := <-requestSource.ReadChan():
// In single node mode, the request source is directly tied to the signal message channel
// this means ICE restart isn't possible in single node mode
if obj == nil {
if room.GetParticipantRequestSource(participant.Identity()) == requestSource {
participant.HandleSignalSourceClose()
+6
View File
@@ -276,6 +276,12 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
signalStats.ResolveRoom(join.GetRoom())
signalStats.ResolveParticipant(join.GetParticipant())
}
if pi.Reconnect && pi.ID != "" {
signalStats.ResolveParticipant(&livekit.ParticipantInfo{
Sid: string(pi.ID),
Identity: string(pi.Identity),
})
}
closedByClient := atomic.NewBool(false)
done := make(chan struct{})
+23
View File
@@ -86,6 +86,7 @@ func (t *telemetryService) ParticipantJoined(
livekit.RoomName(room.Name),
livekit.ParticipantID(participant.Sid),
livekit.ParticipantIdentity(participant.Identity),
false,
)
if !found {
prometheus.IncrementParticipantRtcConnected(1)
@@ -124,6 +125,7 @@ func (t *telemetryService) ParticipantActive(
livekit.RoomName(room.Name),
livekit.ParticipantID(participant.Sid),
livekit.ParticipantIdentity(participant.Identity),
false,
)
if !found {
// need to also account for participant count
@@ -145,6 +147,27 @@ func (t *telemetryService) ParticipantResumed(
reason livekit.ReconnectReason,
) {
t.enqueue(func() {
// create a worker if needed.
//
// Signalling channel stats collector and media channel stats collector could both call
// ParticipantJoined and ParticipantLeft.
//
// On a resume, the signalling channel collector would call `ParticipantLeft` which would close
// the corresponding participant's stats worker.
//
// So, on a successful resume, create the worker if needed.
_, found := t.getOrCreateWorker(
ctx,
livekit.RoomID(room.Sid),
livekit.RoomName(room.Name),
livekit.ParticipantID(participant.Sid),
livekit.ParticipantIdentity(participant.Identity),
true,
)
if !found {
prometheus.AddParticipant()
}
ev := newParticipantEvent(livekit.AnalyticsEventType_PARTICIPANT_RESUMED, room, participant)
ev.ClientMeta = &livekit.AnalyticsClientMeta{
Node: string(nodeID),
+12 -2
View File
@@ -193,15 +193,22 @@ func (t *telemetryService) getOrCreateWorker(
roomName livekit.RoomName,
participantID livekit.ParticipantID,
participantIdentity livekit.ParticipantIdentity,
transferConnectedState bool,
) (*StatsWorker, bool) {
t.workersMu.Lock()
defer t.workersMu.Unlock()
if worker, ok := t.workers[participantID]; ok && !worker.Closed() {
worker, ok := t.workers[participantID]
if ok && !worker.Closed() {
return worker, true
}
worker := newStatsWorker(
existingIsConnected := false
if ok && transferConnectedState {
existingIsConnected = worker.IsConnected()
}
worker = newStatsWorker(
ctx,
t,
roomID,
@@ -209,6 +216,9 @@ func (t *telemetryService) getOrCreateWorker(
participantID,
participantIdentity,
)
if existingIsConnected {
worker.SetConnected()
}
t.workers[participantID] = worker