From 9ee50c300c3504dc8797b68aa986da036b3f4c8e Mon Sep 17 00:00:00 2001 From: David Zhao Date: Fri, 27 May 2022 22:01:34 -0700 Subject: [PATCH] Fix duplicate subscriber connected (#729) * Fix duplicate participant not being cleared out * handle data race --- pkg/rtc/room.go | 49 +++++++++++------- pkg/rtc/room_test.go | 118 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 148 insertions(+), 19 deletions(-) diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 0cd2acd96..2aa416f9d 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -290,7 +290,7 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions } }) - if err := participant.SendJoinResponse(r.protoRoom, otherParticipants, iceServers, region); err != nil { + if err := participant.SendJoinResponse(proto.Clone(r.protoRoom).(*livekit.Room), otherParticipants, iceServers, region); err != nil { prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "send_response").Add(1) return err } @@ -741,12 +741,11 @@ func (r *Room) subscribeToExistingTracks(p types.LocalParticipant) int { // broadcast an update about participant p func (r *Room) broadcastParticipantState(p types.LocalParticipant, opts broadcastOptions) { pi := p.ToProto() - updates := []*livekit.ParticipantInfo{pi} if p.Hidden() { if !opts.skipSource { // send update only to hidden participant - err := p.SendParticipantUpdate(updates) + err := p.SendParticipantUpdate([]*livekit.ParticipantInfo{pi}) if err != nil { r.Logger.Errorw("could not send update to participant", err, "participant", p.Identity(), "pID", p.ID()) @@ -755,13 +754,7 @@ func (r *Room) broadcastParticipantState(p types.LocalParticipant, opts broadcas return } - if !pi.IsPublisher && !opts.immediate { - r.enqueueParticipantUpdate(pi) - return - } else { - r.clearParticipantUpdate(p.Identity()) - } - + updates := r.pushAndDequeueUpdates(pi, opts.immediate) r.sendParticipantUpdates(updates) } @@ -807,23 +800,41 @@ func (r *Room) sendSpeakerChanges(speakers []*livekit.SpeakerInfo) { } } -func (r *Room) enqueueParticipantUpdate(pi *livekit.ParticipantInfo) { +// push a participant update for batched broadcast, optionally returning immediate updates to broadcast. +// it handles the following scenarios +// * subscriber-only updates will be queued for batch updates +// * publisher & immediate updates will be returned without queuing +// * when the SID changes, it will return both updates, with the earlier participant set to disconnected +func (r *Room) pushAndDequeueUpdates(pi *livekit.ParticipantInfo, isImmediate bool) []*livekit.ParticipantInfo { r.batchedUpdatesMu.Lock() defer r.batchedUpdatesMu.Unlock() + var updates []*livekit.ParticipantInfo identity := livekit.ParticipantIdentity(pi.Identity) existing := r.batchedUpdates[identity] - if existing != nil && pi.Sid == existing.Sid && existing.Version > pi.Version { - return + + if existing != nil { + if pi.Sid != existing.Sid { + // session change, need to send immediately + isImmediate = true + existing.State = livekit.ParticipantInfo_DISCONNECTED + updates = append(updates, existing) + } else if pi.Version < existing.Version { + // out of order update + return nil + } } - r.batchedUpdates[identity] = pi -} + if isImmediate || pi.IsPublisher { + // include any queued update, and return + delete(r.batchedUpdates, identity) + updates = append(updates, pi) + } else { + // enqueue for batch + r.batchedUpdates[identity] = pi + } -func (r *Room) clearParticipantUpdate(identity livekit.ParticipantIdentity) { - r.batchedUpdatesMu.Lock() - delete(r.batchedUpdates, identity) - r.batchedUpdatesMu.Unlock() + return updates } func (r *Room) subscriberBroadcastWorker() { diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index 9e3fd0e16..ea8f48f34 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -204,6 +205,123 @@ func TestParticipantUpdate(t *testing.T) { } } +func TestPushAndDequeueUpdates(t *testing.T) { + identity := "test_user" + publisher1v1 := &livekit.ParticipantInfo{ + Identity: identity, + Sid: "1", + IsPublisher: true, + Version: 1, + } + publisher1v2 := &livekit.ParticipantInfo{ + Identity: identity, + Sid: "1", + IsPublisher: true, + Version: 2, + } + publisher2 := &livekit.ParticipantInfo{ + Identity: identity, + Sid: "2", + IsPublisher: true, + Version: 1, + } + subscriber1v1 := &livekit.ParticipantInfo{ + Identity: identity, + Sid: "1", + Version: 1, + } + subscriber1v2 := &livekit.ParticipantInfo{ + Identity: identity, + Sid: "1", + Version: 2, + } + + requirePIEquals := func(t *testing.T, a, b *livekit.ParticipantInfo) { + require.Equal(t, a.Sid, b.Sid) + require.Equal(t, a.Identity, b.Identity) + require.Equal(t, a.Version, b.Version) + } + testCases := []struct { + name string + pi *livekit.ParticipantInfo + immediate bool + existing *livekit.ParticipantInfo + expected []*livekit.ParticipantInfo + validate func(t *testing.T, rm *Room, updates []*livekit.ParticipantInfo) + }{ + { + name: "publisher updates are immediate", + pi: publisher1v1, + expected: []*livekit.ParticipantInfo{publisher1v1}, + }, + { + name: "subscriber updates are queued", + pi: subscriber1v1, + }, + { + name: "last version is enqueued", + pi: subscriber1v2, + existing: subscriber1v1, + validate: func(t *testing.T, rm *Room, _ []*livekit.ParticipantInfo) { + queued := rm.batchedUpdates[livekit.ParticipantIdentity(identity)] + require.NotNil(t, queued) + requirePIEquals(t, subscriber1v2, queued) + }, + }, + { + name: "out of order updates are rejected", + pi: subscriber1v1, + existing: subscriber1v2, + validate: func(t *testing.T, rm *Room, updates []*livekit.ParticipantInfo) { + queued := rm.batchedUpdates[livekit.ParticipantIdentity(identity)] + requirePIEquals(t, subscriber1v2, queued) + }, + }, + { + name: "sid change is broadcasted immediately", + pi: publisher2, + existing: subscriber1v2, + expected: []*livekit.ParticipantInfo{ + { + Identity: identity, + Sid: "1", + Version: 2, + State: livekit.ParticipantInfo_DISCONNECTED, + }, + publisher2, + }, + }, + { + name: "when switching to publisher, queue is cleared", + pi: publisher1v2, + existing: subscriber1v1, + expected: []*livekit.ParticipantInfo{publisher1v2}, + validate: func(t *testing.T, rm *Room, updates []*livekit.ParticipantInfo) { + require.Empty(t, rm.batchedUpdates) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + rm := newRoomWithParticipants(t, testRoomOpts{num: 1}) + if tc.existing != nil { + // clone the existing value since it can be modified when setting to disconnected + rm.batchedUpdates[livekit.ParticipantIdentity(tc.existing.Identity)] = proto.Clone(tc.existing).(*livekit.ParticipantInfo) + } + updates := rm.pushAndDequeueUpdates(tc.pi, tc.immediate) + require.Equal(t, len(tc.expected), len(updates)) + for i, item := range tc.expected { + requirePIEquals(t, item, updates[i]) + } + + if tc.validate != nil { + tc.validate(t, rm, updates) + } + }) + } +} + func TestRoomClosure(t *testing.T) { t.Run("room closes after participant leaves", func(t *testing.T) { rm := newRoomWithParticipants(t, testRoomOpts{num: 1})