mirror of
https://github.com/livekit/livekit.git
synced 2026-05-19 15:25:40 +00:00
Prevent operating on swapped out map. (#3670)
* Prevent operating on swapped out map. * test
This commit is contained in:
+6
-9
@@ -1338,14 +1338,15 @@ func (r *Room) broadcastParticipantState(p types.LocalParticipant, opts broadcas
|
||||
return
|
||||
}
|
||||
|
||||
r.batchedUpdatesMu.Lock()
|
||||
updates := PushAndDequeueUpdates(
|
||||
pi,
|
||||
p.CloseReason(),
|
||||
opts.immediate,
|
||||
r.GetParticipant(livekit.ParticipantIdentity(pi.Identity)),
|
||||
&r.batchedUpdatesMu,
|
||||
r.batchedUpdates,
|
||||
)
|
||||
r.batchedUpdatesMu.Unlock()
|
||||
if len(updates) != 0 {
|
||||
selfSent = true
|
||||
SendParticipantUpdates(updates, r.GetParticipants())
|
||||
@@ -1395,14 +1396,14 @@ func (r *Room) changeUpdateWorker() {
|
||||
r.sendRoomUpdate()
|
||||
case <-subTicker.C:
|
||||
r.batchedUpdatesMu.Lock()
|
||||
if len(r.batchedUpdates) == 0 {
|
||||
r.batchedUpdatesMu.Unlock()
|
||||
continue
|
||||
}
|
||||
updatesMap := r.batchedUpdates
|
||||
r.batchedUpdates = make(map[livekit.ParticipantIdentity]*ParticipantUpdate)
|
||||
r.batchedUpdatesMu.Unlock()
|
||||
|
||||
if len(updatesMap) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
SendParticipantUpdates(maps.Values(updatesMap), r.GetParticipants())
|
||||
}
|
||||
}
|
||||
@@ -1819,12 +1820,8 @@ func PushAndDequeueUpdates(
|
||||
closeReason types.ParticipantCloseReason,
|
||||
isImmediate bool,
|
||||
existingParticipant types.Participant,
|
||||
lock *sync.Mutex,
|
||||
cache map[livekit.ParticipantIdentity]*ParticipantUpdate,
|
||||
) []*ParticipantUpdate {
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
|
||||
var updates []*ParticipantUpdate
|
||||
identity := livekit.ParticipantIdentity(pi.Identity)
|
||||
existing := cache[identity]
|
||||
|
||||
@@ -351,14 +351,15 @@ func TestPushAndDequeueUpdates(t *testing.T) {
|
||||
if tc.existing != nil {
|
||||
rm.batchedUpdates[livekit.ParticipantIdentity(tc.existing.ParticipantInfo.Identity)] = tc.existing
|
||||
}
|
||||
rm.batchedUpdatesMu.Lock()
|
||||
updates := PushAndDequeueUpdates(
|
||||
tc.pi,
|
||||
tc.closeReason,
|
||||
tc.immediate,
|
||||
rm.GetParticipant(livekit.ParticipantIdentity(tc.pi.Identity)),
|
||||
&rm.batchedUpdatesMu,
|
||||
rm.batchedUpdates,
|
||||
)
|
||||
rm.batchedUpdatesMu.Unlock()
|
||||
require.Equal(t, len(tc.expected), len(updates))
|
||||
for i, item := range tc.expected {
|
||||
requirePIEquals(t, item.ParticipantInfo, updates[i].ParticipantInfo)
|
||||
|
||||
Reference in New Issue
Block a user