From efbc985c82c4c8eec74200a7a1c5b36a57d5053c Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Mon, 29 Jan 2024 10:57:41 +0530 Subject: [PATCH] Cache data synchronously for processing in worker. (#2424) It is possible that state of underlying object has changed between event posting and event processing. So, cache data synchronously and use it during event processing. This is still not perfect as things like `hidden` and `IsClosed` is accessed in worker. Ideally, it can be a snapshot of current state of all required values that can be posted to the worker and the worker just operates with data. --- pkg/rtc/room.go | 84 +++++++++++++++++++++++--------------- pkg/rtc/room_test.go | 2 +- pkg/service/roommanager.go | 8 ++-- 3 files changed, 57 insertions(+), 37 deletions(-) diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 745ae9f4f..d95044ed9 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -118,7 +118,7 @@ type Room struct { trailer []byte - onParticipantChanged func(p types.LocalParticipant) + onParticipantChanged func(p types.LocalParticipant, pi *livekit.ParticipantInfo) onRoomUpdated func() onClose func() @@ -347,11 +347,13 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me pw := r.addParticipantWorkerLocked(participant) participant.OnStateChange(func(p types.LocalParticipant, state livekit.ParticipantInfo_State) { + ri := r.ToProto() + pi := p.ToProto() pw.eventsQueue.Enqueue(func() { if r.onParticipantChanged != nil { - r.onParticipantChanged(p) + r.onParticipantChanged(p, pi) } - r.broadcastParticipantState(p, broadcastOptions{skipSource: true}) + r.broadcastParticipantState(p, pi, broadcastOptions{skipSource: true}) if state == livekit.ParticipantInfo_ACTIVE { // subscribe participant to existing published tracks @@ -368,8 +370,8 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me } } r.telemetry.ParticipantActive(context.Background(), - r.ToProto(), - p.ToProto(), + ri, + pi, meta, false, ) @@ -383,23 +385,29 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me }) // it's important to set this before connection, we don't want to miss out on any published tracks participant.OnTrackPublished(func(p types.LocalParticipant, t types.MediaTrack) { + pi := p.ToProto() + ti := t.ToProto() pw.eventsQueue.Enqueue(func() { - r.onTrackPublished(p, t) + r.onTrackPublished(p, pi, t, ti) }) }) participant.OnTrackUpdated(func(p types.LocalParticipant, t types.MediaTrack) { + pi := p.ToProto() pw.eventsQueue.Enqueue(func() { - r.onTrackUpdated(p, t) + r.onTrackUpdated(p, pi, t) }) }) participant.OnTrackUnpublished(func(p types.LocalParticipant, t types.MediaTrack) { + pi := p.ToProto() + ti := t.ToProto() pw.eventsQueue.Enqueue(func() { - r.onTrackUnpublished(p, t) + r.onTrackUnpublished(p, pi, t, ti) }) }) participant.OnParticipantUpdate(func(p types.LocalParticipant) { + pi := p.ToProto() pw.eventsQueue.Enqueue(func() { - r.onParticipantUpdate(p) + r.onParticipantUpdate(p, pi) }) }) participant.OnDataPacket(r.onDataPacket) @@ -460,7 +468,7 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me r.participantRequestSources[participant.Identity()] = requestSource if r.onParticipantChanged != nil { - r.onParticipantChanged(participant) + r.onParticipantChanged(participant, participant.ToProto()) } time.AfterFunc(time.Minute, func() { @@ -637,10 +645,11 @@ func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity, pID livek r.leftAt.Store(time.Now().Unix()) if sendUpdates { + pi := p.ToProto() if r.onParticipantChanged != nil { - r.onParticipantChanged(p) + r.onParticipantChanged(p, pi) } - r.broadcastParticipantState(p, broadcastOptions{skipSource: true}) + r.broadcastParticipantState(p, pi, broadcastOptions{skipSource: true}) } } @@ -826,7 +835,7 @@ func (r *Room) OnClose(f func()) { r.onClose = f } -func (r *Room) OnParticipantChanged(f func(participant types.LocalParticipant)) { +func (r *Room) OnParticipantChanged(f func(p types.LocalParticipant, pi *livekit.ParticipantInfo)) { r.onParticipantChanged = f } @@ -988,9 +997,14 @@ func (r *Room) createJoinResponseLocked(participant types.LocalParticipant, iceS } // a ParticipantImpl in the room added a new track, subscribe other participants to it -func (r *Room) onTrackPublished(participant types.LocalParticipant, track types.MediaTrack) { +func (r *Room) onTrackPublished( + participant types.LocalParticipant, + pi *livekit.ParticipantInfo, + track types.MediaTrack, + ti *livekit.TrackInfo, +) { // publish participant update, since track state is changed - r.broadcastParticipantState(participant, broadcastOptions{skipSource: true}) + r.broadcastParticipantState(participant, pi, broadcastOptions{skipSource: true}) r.lock.RLock() // subscribe all existing participants to this MediaTrack @@ -1019,7 +1033,7 @@ func (r *Room) onTrackPublished(participant types.LocalParticipant, track types. r.lock.RUnlock() if onParticipantChanged != nil { - onParticipantChanged(participant) + onParticipantChanged(participant, pi) } r.trackManager.AddTrack(track, participant.Identity(), participant.ID()) @@ -1072,42 +1086,51 @@ func (r *Room) onTrackPublished(participant types.LocalParticipant, track types. context.Background(), participant.ID(), participant.Identity(), - track.ToProto(), + ti, ) } -func (r *Room) onTrackUpdated(p types.LocalParticipant, _ types.MediaTrack) { +func (r *Room) onTrackUpdated( + p types.LocalParticipant, + pi *livekit.ParticipantInfo, + _ types.MediaTrack, +) { // send track updates to everyone, especially if track was updated by admin - r.broadcastParticipantState(p, broadcastOptions{}) + r.broadcastParticipantState(p, pi, broadcastOptions{}) if r.onParticipantChanged != nil { - r.onParticipantChanged(p) + r.onParticipantChanged(p, pi) } } -func (r *Room) onTrackUnpublished(p types.LocalParticipant, track types.MediaTrack) { +func (r *Room) onTrackUnpublished( + p types.LocalParticipant, + pi *livekit.ParticipantInfo, + track types.MediaTrack, + ti *livekit.TrackInfo, +) { r.telemetry.TrackUnpublished( context.Background(), p.ID(), p.Identity(), - track.ToProto(), + ti, !p.IsClosed(), ) r.trackManager.RemoveTrack(track) if !p.IsClosed() { - r.broadcastParticipantState(p, broadcastOptions{skipSource: true}) + r.broadcastParticipantState(p, pi, broadcastOptions{skipSource: true}) } if r.onParticipantChanged != nil { - r.onParticipantChanged(p) + r.onParticipantChanged(p, pi) } } -func (r *Room) onParticipantUpdate(p types.LocalParticipant) { +func (r *Room) onParticipantUpdate(p types.LocalParticipant, pi *livekit.ParticipantInfo) { r.protoProxy.MarkDirty(false) // immediately notify when permissions or metadata changed - r.broadcastParticipantState(p, broadcastOptions{immediate: true}) + r.broadcastParticipantState(p, pi, broadcastOptions{immediate: true}) if r.onParticipantChanged != nil { - r.onParticipantChanged(p) + r.onParticipantChanged(p, pi) } } @@ -1142,16 +1165,13 @@ func (r *Room) subscribeToExistingTracks(p types.LocalParticipant) { } // broadcast an update about participant p -func (r *Room) broadcastParticipantState(p types.LocalParticipant, opts broadcastOptions) { - pi := p.ToProto() - +func (r *Room) broadcastParticipantState(p types.LocalParticipant, pi *livekit.ParticipantInfo, opts broadcastOptions) { if p.Hidden() { if !opts.skipSource { // send update only to hidden participant err := p.SendParticipantUpdate([]*livekit.ParticipantInfo{pi}) if err != nil { - r.Logger.Errorw("could not send update to participant", err, - "participant", p.Identity(), "pID", p.ID()) + p.GetLogger().Errorw("could not send update to participant", err) } } return diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index 4990b64d4..ec564c4af 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -127,7 +127,7 @@ func TestRoomJoin(t *testing.T) { t.Run("participant state change is broadcasted to others", func(t *testing.T) { rm := newRoomWithParticipants(t, testRoomOpts{num: numParticipants}) var changedParticipant types.Participant - rm.OnParticipantChanged(func(participant types.LocalParticipant) { + rm.OnParticipantChanged(func(participant types.LocalParticipant, _pi *livekit.ParticipantInfo) { changedParticipant = participant }) participants := rm.GetParticipants() diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index abfd29331..1c010acbb 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -577,10 +577,10 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room } }) - newRoom.OnParticipantChanged(func(p types.LocalParticipant) { - if !p.IsDisconnected() { - if err := r.roomStore.StoreParticipant(ctx, roomName, p.ToProto()); err != nil { - newRoom.Logger.Errorw("could not handle participant change", err) + newRoom.OnParticipantChanged(func(p types.LocalParticipant, pi *livekit.ParticipantInfo) { + if pi.State != livekit.ParticipantInfo_DISCONNECTED { + if err := r.roomStore.StoreParticipant(ctx, roomName, pi); err != nil { + p.GetLogger().Errorw("could not handle participant change", err) } } })