Fix duplicate subscriber connected (#729)

* Fix duplicate participant not being cleared out

* handle data race
This commit is contained in:
David Zhao
2022-05-27 22:01:34 -07:00
committed by GitHub
parent 8048a89d5e
commit 9ee50c300c
2 changed files with 148 additions and 19 deletions
+30 -19
View File
@@ -290,7 +290,7 @@ func (r *Room) Join(participant types.LocalParticipant, opts *ParticipantOptions
}
})
if err := participant.SendJoinResponse(r.protoRoom, otherParticipants, iceServers, region); err != nil {
if err := participant.SendJoinResponse(proto.Clone(r.protoRoom).(*livekit.Room), otherParticipants, iceServers, region); err != nil {
prometheus.ServiceOperationCounter.WithLabelValues("participant_join", "error", "send_response").Add(1)
return err
}
@@ -741,12 +741,11 @@ func (r *Room) subscribeToExistingTracks(p types.LocalParticipant) int {
// broadcast an update about participant p
func (r *Room) broadcastParticipantState(p types.LocalParticipant, opts broadcastOptions) {
pi := p.ToProto()
updates := []*livekit.ParticipantInfo{pi}
if p.Hidden() {
if !opts.skipSource {
// send update only to hidden participant
err := p.SendParticipantUpdate(updates)
err := p.SendParticipantUpdate([]*livekit.ParticipantInfo{pi})
if err != nil {
r.Logger.Errorw("could not send update to participant", err,
"participant", p.Identity(), "pID", p.ID())
@@ -755,13 +754,7 @@ func (r *Room) broadcastParticipantState(p types.LocalParticipant, opts broadcas
return
}
if !pi.IsPublisher && !opts.immediate {
r.enqueueParticipantUpdate(pi)
return
} else {
r.clearParticipantUpdate(p.Identity())
}
updates := r.pushAndDequeueUpdates(pi, opts.immediate)
r.sendParticipantUpdates(updates)
}
@@ -807,23 +800,41 @@ func (r *Room) sendSpeakerChanges(speakers []*livekit.SpeakerInfo) {
}
}
func (r *Room) enqueueParticipantUpdate(pi *livekit.ParticipantInfo) {
// push a participant update for batched broadcast, optionally returning immediate updates to broadcast.
// it handles the following scenarios
// * subscriber-only updates will be queued for batch updates
// * publisher & immediate updates will be returned without queuing
// * when the SID changes, it will return both updates, with the earlier participant set to disconnected
func (r *Room) pushAndDequeueUpdates(pi *livekit.ParticipantInfo, isImmediate bool) []*livekit.ParticipantInfo {
r.batchedUpdatesMu.Lock()
defer r.batchedUpdatesMu.Unlock()
var updates []*livekit.ParticipantInfo
identity := livekit.ParticipantIdentity(pi.Identity)
existing := r.batchedUpdates[identity]
if existing != nil && pi.Sid == existing.Sid && existing.Version > pi.Version {
return
if existing != nil {
if pi.Sid != existing.Sid {
// session change, need to send immediately
isImmediate = true
existing.State = livekit.ParticipantInfo_DISCONNECTED
updates = append(updates, existing)
} else if pi.Version < existing.Version {
// out of order update
return nil
}
}
r.batchedUpdates[identity] = pi
}
if isImmediate || pi.IsPublisher {
// include any queued update, and return
delete(r.batchedUpdates, identity)
updates = append(updates, pi)
} else {
// enqueue for batch
r.batchedUpdates[identity] = pi
}
func (r *Room) clearParticipantUpdate(identity livekit.ParticipantIdentity) {
r.batchedUpdatesMu.Lock()
delete(r.batchedUpdates, identity)
r.batchedUpdatesMu.Unlock()
return updates
}
func (r *Room) subscriberBroadcastWorker() {
+118
View File
@@ -6,6 +6,7 @@ import (
"time"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
@@ -204,6 +205,123 @@ func TestParticipantUpdate(t *testing.T) {
}
}
func TestPushAndDequeueUpdates(t *testing.T) {
identity := "test_user"
publisher1v1 := &livekit.ParticipantInfo{
Identity: identity,
Sid: "1",
IsPublisher: true,
Version: 1,
}
publisher1v2 := &livekit.ParticipantInfo{
Identity: identity,
Sid: "1",
IsPublisher: true,
Version: 2,
}
publisher2 := &livekit.ParticipantInfo{
Identity: identity,
Sid: "2",
IsPublisher: true,
Version: 1,
}
subscriber1v1 := &livekit.ParticipantInfo{
Identity: identity,
Sid: "1",
Version: 1,
}
subscriber1v2 := &livekit.ParticipantInfo{
Identity: identity,
Sid: "1",
Version: 2,
}
requirePIEquals := func(t *testing.T, a, b *livekit.ParticipantInfo) {
require.Equal(t, a.Sid, b.Sid)
require.Equal(t, a.Identity, b.Identity)
require.Equal(t, a.Version, b.Version)
}
testCases := []struct {
name string
pi *livekit.ParticipantInfo
immediate bool
existing *livekit.ParticipantInfo
expected []*livekit.ParticipantInfo
validate func(t *testing.T, rm *Room, updates []*livekit.ParticipantInfo)
}{
{
name: "publisher updates are immediate",
pi: publisher1v1,
expected: []*livekit.ParticipantInfo{publisher1v1},
},
{
name: "subscriber updates are queued",
pi: subscriber1v1,
},
{
name: "last version is enqueued",
pi: subscriber1v2,
existing: subscriber1v1,
validate: func(t *testing.T, rm *Room, _ []*livekit.ParticipantInfo) {
queued := rm.batchedUpdates[livekit.ParticipantIdentity(identity)]
require.NotNil(t, queued)
requirePIEquals(t, subscriber1v2, queued)
},
},
{
name: "out of order updates are rejected",
pi: subscriber1v1,
existing: subscriber1v2,
validate: func(t *testing.T, rm *Room, updates []*livekit.ParticipantInfo) {
queued := rm.batchedUpdates[livekit.ParticipantIdentity(identity)]
requirePIEquals(t, subscriber1v2, queued)
},
},
{
name: "sid change is broadcasted immediately",
pi: publisher2,
existing: subscriber1v2,
expected: []*livekit.ParticipantInfo{
{
Identity: identity,
Sid: "1",
Version: 2,
State: livekit.ParticipantInfo_DISCONNECTED,
},
publisher2,
},
},
{
name: "when switching to publisher, queue is cleared",
pi: publisher1v2,
existing: subscriber1v1,
expected: []*livekit.ParticipantInfo{publisher1v2},
validate: func(t *testing.T, rm *Room, updates []*livekit.ParticipantInfo) {
require.Empty(t, rm.batchedUpdates)
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 1})
if tc.existing != nil {
// clone the existing value since it can be modified when setting to disconnected
rm.batchedUpdates[livekit.ParticipantIdentity(tc.existing.Identity)] = proto.Clone(tc.existing).(*livekit.ParticipantInfo)
}
updates := rm.pushAndDequeueUpdates(tc.pi, tc.immediate)
require.Equal(t, len(tc.expected), len(updates))
for i, item := range tc.expected {
requirePIEquals(t, item, updates[i])
}
if tc.validate != nil {
tc.validate(t, rm, updates)
}
})
}
}
func TestRoomClosure(t *testing.T) {
t.Run("room closes after participant leaves", func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 1})