From 5f87a35b7ede63e9c0cf3f4eb5bbd0250466e57a Mon Sep 17 00:00:00 2001 From: Raja Subramanian Date: Sat, 17 May 2025 17:02:41 +0530 Subject: [PATCH] Prevent operating on swapped out map. (#3670) * Prevent operating on swapped out map. * test --- pkg/rtc/room.go | 15 ++++++--------- pkg/rtc/room_test.go | 3 ++- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index f372c3135..dae8bac3b 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -1338,14 +1338,15 @@ func (r *Room) broadcastParticipantState(p types.LocalParticipant, opts broadcas return } + r.batchedUpdatesMu.Lock() updates := PushAndDequeueUpdates( pi, p.CloseReason(), opts.immediate, r.GetParticipant(livekit.ParticipantIdentity(pi.Identity)), - &r.batchedUpdatesMu, r.batchedUpdates, ) + r.batchedUpdatesMu.Unlock() if len(updates) != 0 { selfSent = true SendParticipantUpdates(updates, r.GetParticipants()) @@ -1395,14 +1396,14 @@ func (r *Room) changeUpdateWorker() { r.sendRoomUpdate() case <-subTicker.C: r.batchedUpdatesMu.Lock() + if len(r.batchedUpdates) == 0 { + r.batchedUpdatesMu.Unlock() + continue + } updatesMap := r.batchedUpdates r.batchedUpdates = make(map[livekit.ParticipantIdentity]*ParticipantUpdate) r.batchedUpdatesMu.Unlock() - if len(updatesMap) == 0 { - continue - } - SendParticipantUpdates(maps.Values(updatesMap), r.GetParticipants()) } } @@ -1819,12 +1820,8 @@ func PushAndDequeueUpdates( closeReason types.ParticipantCloseReason, isImmediate bool, existingParticipant types.Participant, - lock *sync.Mutex, cache map[livekit.ParticipantIdentity]*ParticipantUpdate, ) []*ParticipantUpdate { - lock.Lock() - defer lock.Unlock() - var updates []*ParticipantUpdate identity := livekit.ParticipantIdentity(pi.Identity) existing := cache[identity] diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index 07dccc002..ff552f599 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -351,14 +351,15 @@ func TestPushAndDequeueUpdates(t *testing.T) { if tc.existing != nil { rm.batchedUpdates[livekit.ParticipantIdentity(tc.existing.ParticipantInfo.Identity)] = tc.existing } + rm.batchedUpdatesMu.Lock() updates := PushAndDequeueUpdates( tc.pi, tc.closeReason, tc.immediate, rm.GetParticipant(livekit.ParticipantIdentity(tc.pi.Identity)), - &rm.batchedUpdatesMu, rm.batchedUpdates, ) + rm.batchedUpdatesMu.Unlock() require.Equal(t, len(tc.expected), len(updates)) for i, item := range tc.expected { requirePIEquals(t, item.ParticipantInfo, updates[i].ParticipantInfo)