diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index ff6758d80..547f65965 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -179,7 +179,7 @@ type ParticipantImpl struct { requireBroadcast bool // queued participant updates before join response is sent // guarded by updateLock - queuedUpdates []*livekit.ParticipantInfo + queuedUpdates []types.PendingParticipantUpdate // cache of recently sent updates, to ensuring ordering by version // guarded by updateLock updateCache *lru.Cache[livekit.ParticipantID, participantUpdateInfo] @@ -1077,7 +1077,9 @@ func (p *ParticipantImpl) VerifySubscribeParticipantInfo(pID livekit.Participant if f := p.params.GetParticipantInfo; f != nil { if info := f(pID); info != nil { - _ = p.SendParticipantUpdate([]*livekit.ParticipantInfo{info}) + _ = p.SendParticipantUpdate([]types.PendingParticipantUpdate{ + {Info: info}, + }) } } } diff --git a/pkg/rtc/participant_internal_test.go b/pkg/rtc/participant_internal_test.go index a1b758ff8..b01ef7ef2 100644 --- a/pkg/rtc/participant_internal_test.go +++ b/pkg/rtc/participant_internal_test.go @@ -230,8 +230,8 @@ func TestOutOfOrderUpdates(t *testing.T) { require.Greater(t, pi2.Version, pi1.Version) // send the second update first - require.NoError(t, p.SendParticipantUpdate([]*livekit.ParticipantInfo{pi2})) - require.NoError(t, p.SendParticipantUpdate([]*livekit.ParticipantInfo{pi1})) + require.NoError(t, p.SendParticipantUpdate([]types.PendingParticipantUpdate{{Info: pi2}})) + require.NoError(t, p.SendParticipantUpdate([]types.PendingParticipantUpdate{{Info: pi1}})) // only sent once, and it's the earlier message require.Equal(t, 1, sink.WriteMessageCallCount()) diff --git a/pkg/rtc/participant_signal.go b/pkg/rtc/participant_signal.go index 0a1bb31f1..65022a8e9 100644 --- a/pkg/rtc/participant_signal.go +++ b/pkg/rtc/participant_signal.go @@ -79,7 +79,7 @@ func (p *ParticipantImpl) SendJoinResponse(joinResponse *livekit.JoinResponse) e return nil } -func (p *ParticipantImpl) SendParticipantUpdate(participantsToUpdate []*livekit.ParticipantInfo) error { +func (p *ParticipantImpl) SendParticipantUpdate(participantsToUpdate []types.PendingParticipantUpdate) error { p.updateLock.Lock() if p.IsDisconnected() { p.updateLock.Unlock() @@ -93,30 +93,34 @@ func (p *ParticipantImpl) SendParticipantUpdate(participantsToUpdate []*livekit. return nil } validUpdates := make([]*livekit.ParticipantInfo, 0, len(participantsToUpdate)) - for _, pi := range participantsToUpdate { - isValid := true + for _, pu := range participantsToUpdate { + pi := pu.Info pID := livekit.ParticipantID(pi.Sid) + if pu.MaxProtocolVersion > 0 && p.ProtocolVersion() > pu.MaxProtocolVersion { + p.params.Logger.Infow("skipping participant update due to protocol version", + "otherParticipant", pi.Identity, "otherPID", pi.Sid, "version", pi.Version, "maxVersion", pu.MaxProtocolVersion) + continue + } if lastVersion, ok := p.updateCache.Get(pID); ok { // this is a message delivered out of order, a more recent version of the message had already been // sent. if pi.Version < lastVersion.version { p.params.Logger.Debugw("skipping outdated participant update", "otherParticipant", pi.Identity, "otherPID", pi.Sid, "version", pi.Version, "lastVersion", lastVersion) - isValid = false + continue } } if pi.Permission != nil && pi.Permission.Hidden && pi.Sid != string(p.params.SID) { p.params.Logger.Debugw("skipping hidden participant update", "otherParticipant", pi.Identity) - isValid = false - } - if isValid { - p.updateCache.Add(pID, participantUpdateInfo{ - identity: livekit.ParticipantIdentity(pi.Identity), - version: pi.Version, - state: pi.State, - updatedAt: time.Now(), - }) - validUpdates = append(validUpdates, pi) + continue } + p.params.Logger.Infow("queuing valid participant update", "otherParticipant", pi.Identity, "sid", pi.Sid, "state", pi.State, "maxVersion", pu.MaxProtocolVersion) + p.updateCache.Add(pID, participantUpdateInfo{ + identity: livekit.ParticipantIdentity(pi.Identity), + version: pi.Version, + state: pi.State, + updatedAt: time.Now(), + }) + validUpdates = append(validUpdates, pi) } p.updateLock.Unlock() diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 1bb1f671c..adf86c987 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -65,7 +65,6 @@ var ( type broadcastOptions struct { skipSource bool - immediate bool } type disconnectSignalOnResumeNoMessages struct { @@ -694,7 +693,7 @@ func (r *Room) SyncState(participant types.LocalParticipant, state *livekit.Sync } func (r *Room) UpdateSubscriptionPermission(participant types.LocalParticipant, subscriptionPermission *livekit.SubscriptionPermission) error { - if err := participant.UpdateSubscriptionPermission(subscriptionPermission, utils.TimedVersion{}, r.GetParticipant, r.GetParticipantByID); err != nil { + if err := participant.UpdateSubscriptionPermission(subscriptionPermission, utils.TimedVersion{}, r.GetParticipantByID); err != nil { return err } for _, track := range participant.GetPublishedTracks() { @@ -910,16 +909,16 @@ func (r *Room) SimulateScenario(participant types.LocalParticipant, simulateScen return nil } -func (r *Room) getOtherParticipantInfo(identity livekit.ParticipantIdentity) []*livekit.ParticipantInfo { +func (r *Room) getOtherParticipantInfo(identity livekit.ParticipantIdentity) []types.PendingParticipantUpdate { participants := r.GetParticipants() - pi := make([]*livekit.ParticipantInfo, 0, len(participants)) + pus := make([]types.PendingParticipantUpdate, 0, len(participants)) for _, p := range participants { if !p.Hidden() && p.Identity() != identity { - pi = append(pi, p.ToProto()) + pus = append(pus, types.PendingParticipantUpdate{Info: p.ToProto()}) } } - return pi + return pus } // checks if participant should be autosubscribed to new tracks, assumes lock is already acquired @@ -1061,7 +1060,7 @@ func (r *Room) onTrackUnpublished(p types.LocalParticipant, track types.MediaTra func (r *Room) onParticipantUpdate(p types.LocalParticipant) { r.protoProxy.MarkDirty(false) // immediately notify when permissions or metadata changed - r.broadcastParticipantState(p, broadcastOptions{immediate: true}) + r.broadcastParticipantState(p, broadcastOptions{}) if r.onParticipantChanged != nil { r.onParticipantChanged(p) } @@ -1104,7 +1103,7 @@ func (r *Room) broadcastParticipantState(p types.LocalParticipant, opts broadcas if p.Hidden() { if !opts.skipSource { // send update only to hidden participant - err := p.SendParticipantUpdate([]*livekit.ParticipantInfo{pi}) + err := p.SendParticipantUpdate([]types.PendingParticipantUpdate{{Info: pi}}) if err != nil { r.Logger.Errorw("could not send update to participant", err, "participant", p.Identity(), "pID", p.ID()) @@ -1113,11 +1112,11 @@ func (r *Room) broadcastParticipantState(p types.LocalParticipant, opts broadcas return } - updates := r.pushAndDequeueUpdates(pi, opts.immediate) + updates := r.pushAndDequeueUpdates(pi) r.sendParticipantUpdates(updates) } -func (r *Room) sendParticipantUpdates(updates []*livekit.ParticipantInfo) { +func (r *Room) sendParticipantUpdates(updates []types.PendingParticipantUpdate) { if len(updates) == 0 { return } @@ -1172,56 +1171,16 @@ func (r *Room) sendSpeakerChanges(speakers []*livekit.SpeakerInfo) { // * subscriber-only updates will be queued for batch updates // * publisher & immediate updates will be returned without queuing // * when the SID changes, it will return both updates, with the earlier participant set to disconnected -func (r *Room) pushAndDequeueUpdates(pi *livekit.ParticipantInfo, isImmediate bool) []*livekit.ParticipantInfo { +func (r *Room) pushAndDequeueUpdates(pi *livekit.ParticipantInfo) []types.PendingParticipantUpdate { r.batchedUpdatesMu.Lock() defer r.batchedUpdatesMu.Unlock() - var updates []*livekit.ParticipantInfo - identity := livekit.ParticipantIdentity(pi.Identity) - existing := r.batchedUpdates[identity] - shouldSend := isImmediate || pi.IsPublisher - - if existing != nil { - if pi.Sid == existing.Sid { - // same participant session - if pi.Version < existing.Version { - // out of order update - return nil - } - } else { - // different participant sessions - if existing.JoinedAt < pi.JoinedAt { - // existing is older, synthesize a DISCONNECT for older and - // send immediately along with newer session to signal switch - shouldSend = true - existing.State = livekit.ParticipantInfo_DISCONNECTED - updates = append(updates, existing) - } else { - // older session update, newer session has already become active, so nothing to do - return nil - } - } - } else { - ep := r.GetParticipant(identity) - if ep != nil { - epi := ep.ToProto() - if epi.JoinedAt > pi.JoinedAt { - // older session update, newer session has already become active, so nothing to do - return nil - } - } + getParticipant := func(identity livekit.ParticipantIdentity) types.Participant { + p := r.GetParticipant(identity) + return p } - if shouldSend { - // include any queued update, and return - delete(r.batchedUpdates, identity) - updates = append(updates, pi) - } else { - // enqueue for batch - r.batchedUpdates[identity] = pi - } - - return updates + return PushAndDequeueUpdates(r.batchedUpdates, getParticipant, pi) } func (r *Room) updateProto() *livekit.Room { @@ -1266,9 +1225,9 @@ func (r *Room) changeUpdateWorker() { continue } - updates := make([]*livekit.ParticipantInfo, 0, len(updatesMap)) + updates := make([]types.PendingParticipantUpdate, 0, len(updatesMap)) for _, pi := range updatesMap { - updates = append(updates, pi) + updates = append(updates, types.PendingParticipantUpdate{Info: pi}) } r.sendParticipantUpdates(updates) } @@ -1452,6 +1411,63 @@ func (r *Room) DebugInfo() map[string]interface{} { return info } +func PushAndDequeueUpdates( + updatesMap map[livekit.ParticipantIdentity]*livekit.ParticipantInfo, + participantGetter func(livekit.ParticipantIdentity) types.Participant, + pu types.PendingParticipantUpdate, +) []types.PendingParticipantUpdate { + var updates []types.PendingParticipantUpdate + pi := pu.Info + identity := livekit.ParticipantIdentity(pi.Identity) + existing := updatesMap[identity] + shouldSend := pi.IsPublisher + + if existing != nil { + if pi.Sid == existing.Sid { + // same participant session + if pi.Version < existing.Version { + // out of order update + return nil + } + } else { + // different participant sessions + if existing.JoinedAt < pi.JoinedAt { + // existing is older, synthesize a DISCONNECT for older and + // send immediately along with newer session to signal switch + shouldSend = false + existing.State = livekit.ParticipantInfo_DISCONNECTED + updates = append(updates, types.PendingParticipantUpdate{ + Info: existing, + PossibleSIDChange: true, + }) + } else { + // older session update, newer session has already become active, so nothing to do + return nil + } + } + } else { + ep := participantGetter(identity) + if ep != nil { + epi := ep.ToProto() + if epi.JoinedAt > pi.JoinedAt { + // older session update, newer session has already become active, so nothing to do + return nil + } + } + } + + if shouldSend { + // include any queued update, and return + delete(updatesMap, identity) + updates = append(updates, types.PendingParticipantUpdate{Info: pi}) + } else { + // enqueue for batch + updatesMap[identity] = pi + } + + return updates +} + func BroadcastDataPacketForRoom(r types.Room, source types.LocalParticipant, dp *livekit.DataPacket, logger logger.Logger) { dest := dp.GetUser().GetDestinationSids() var dpData []byte diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index 81dfe90c8..456cc237a 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -271,7 +271,7 @@ func TestPushAndDequeueUpdates(t *testing.T) { immediate bool existing *livekit.ParticipantInfo expected []*livekit.ParticipantInfo - validate func(t *testing.T, rm *Room, updates []*livekit.ParticipantInfo) + validate func(t *testing.T, rm *Room, updates []types.PendingParticipantUpdate) }{ { name: "publisher updates are immediate", @@ -286,7 +286,7 @@ func TestPushAndDequeueUpdates(t *testing.T) { name: "last version is enqueued", pi: subscriber1v2, existing: subscriber1v1, - validate: func(t *testing.T, rm *Room, _ []*livekit.ParticipantInfo) { + validate: func(t *testing.T, rm *Room, _ []types.PendingParticipantUpdate) { queued := rm.batchedUpdates[livekit.ParticipantIdentity(identity)] require.NotNil(t, queued) requirePIEquals(t, subscriber1v2, queued) @@ -298,7 +298,7 @@ func TestPushAndDequeueUpdates(t *testing.T) { existing: subscriber1v1, immediate: true, expected: []*livekit.ParticipantInfo{subscriber1v2}, - validate: func(t *testing.T, rm *Room, _ []*livekit.ParticipantInfo) { + validate: func(t *testing.T, rm *Room, _ []types.PendingParticipantUpdate) { queued := rm.batchedUpdates[livekit.ParticipantIdentity(identity)] require.Nil(t, queued) }, @@ -307,7 +307,7 @@ func TestPushAndDequeueUpdates(t *testing.T) { name: "out of order updates are rejected", pi: subscriber1v1, existing: subscriber1v2, - validate: func(t *testing.T, rm *Room, updates []*livekit.ParticipantInfo) { + validate: func(t *testing.T, rm *Room, _ []types.PendingParticipantUpdate) { queued := rm.batchedUpdates[livekit.ParticipantIdentity(identity)] requirePIEquals(t, subscriber1v2, queued) }, @@ -331,7 +331,7 @@ func TestPushAndDequeueUpdates(t *testing.T) { pi: publisher1v2, existing: subscriber1v1, expected: []*livekit.ParticipantInfo{publisher1v2}, - validate: func(t *testing.T, rm *Room, updates []*livekit.ParticipantInfo) { + validate: func(t *testing.T, rm *Room, _ []types.PendingParticipantUpdate) { require.Empty(t, rm.batchedUpdates) }, }, @@ -347,7 +347,7 @@ func TestPushAndDequeueUpdates(t *testing.T) { updates := rm.pushAndDequeueUpdates(tc.pi, tc.immediate) require.Equal(t, len(tc.expected), len(updates)) for i, item := range tc.expected { - requirePIEquals(t, item, updates[i]) + requirePIEquals(t, item, updates[i].Info) } if tc.validate != nil { diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 466615c8a..a383184b7 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -282,7 +282,6 @@ type Participant interface { UpdateSubscriptionPermission( subscriptionPermission *livekit.SubscriptionPermission, timedVersion utils.TimedVersion, - resolverByIdentity func(participantIdentity livekit.ParticipantIdentity) LocalParticipant, resolverBySid func(participantID livekit.ParticipantID) LocalParticipant, ) error UpdateVideoLayers(updateVideoLayers *livekit.UpdateVideoLayers) error @@ -368,7 +367,7 @@ type LocalParticipant interface { // server sent messages SendJoinResponse(joinResponse *livekit.JoinResponse) error - SendParticipantUpdate(participants []*livekit.ParticipantInfo) error + SendParticipantUpdate(participants []PendingParticipantUpdate) error SendSpeakerUpdate(speakers []*livekit.SpeakerInfo, force bool) error SendDataPacket(packet *livekit.DataPacket, data []byte) error SendRoomUpdate(room *livekit.Room) error @@ -426,6 +425,13 @@ type LocalParticipant interface { SetRegionSettings(regionSettings *livekit.RegionSettings) } +// PendingParticipantUpdate holds a pending ParticipantInfo to be sent to clients +type PendingParticipantUpdate struct { + Info *livekit.ParticipantInfo + DisconnectReason livekit.DisconnectReason + PreviousID livekit.ParticipantID +} + // Room is a container of participants, and can provide room-level actions // //counterfeiter:generate . Room diff --git a/pkg/rtc/types/protocol_version.go b/pkg/rtc/types/protocol_version.go index 08075184d..a6ccbbfb5 100644 --- a/pkg/rtc/types/protocol_version.go +++ b/pkg/rtc/types/protocol_version.go @@ -84,6 +84,10 @@ func (v ProtocolVersion) SupportsAsyncRoomID() bool { return v > 11 } +func (v ProtocolVersion) SupportsSIDUpdates() bool { + return v > 11 +} + func (v ProtocolVersion) SupportsRegionsInLeaveRequest() bool { return v > 12 } diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 756099d2c..bd67d9b4d 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -669,10 +669,10 @@ type FakeLocalParticipant struct { sendJoinResponseReturnsOnCall map[int]struct { result1 error } - SendParticipantUpdateStub func([]*livekit.ParticipantInfo) error + SendParticipantUpdateStub func([]types.PendingParticipantUpdate) error sendParticipantUpdateMutex sync.RWMutex sendParticipantUpdateArgsForCall []struct { - arg1 []*livekit.ParticipantInfo + arg1 []types.PendingParticipantUpdate } sendParticipantUpdateReturns struct { result1 error @@ -937,13 +937,12 @@ type FakeLocalParticipant struct { arg1 livekit.TrackID arg2 *livekit.UpdateTrackSettings } - UpdateSubscriptionPermissionStub func(*livekit.SubscriptionPermission, utils.TimedVersion, func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant, func(participantID livekit.ParticipantID) types.LocalParticipant) error + UpdateSubscriptionPermissionStub func(*livekit.SubscriptionPermission, utils.TimedVersion, func(participantID livekit.ParticipantID) types.LocalParticipant) error updateSubscriptionPermissionMutex sync.RWMutex updateSubscriptionPermissionArgsForCall []struct { arg1 *livekit.SubscriptionPermission arg2 utils.TimedVersion - arg3 func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant - arg4 func(participantID livekit.ParticipantID) types.LocalParticipant + arg3 func(participantID livekit.ParticipantID) types.LocalParticipant } updateSubscriptionPermissionReturns struct { result1 error @@ -4497,16 +4496,16 @@ func (fake *FakeLocalParticipant) SendJoinResponseReturnsOnCall(i int, result1 e }{result1} } -func (fake *FakeLocalParticipant) SendParticipantUpdate(arg1 []*livekit.ParticipantInfo) error { - var arg1Copy []*livekit.ParticipantInfo +func (fake *FakeLocalParticipant) SendParticipantUpdate(arg1 []types.PendingParticipantUpdate) error { + var arg1Copy []types.PendingParticipantUpdate if arg1 != nil { - arg1Copy = make([]*livekit.ParticipantInfo, len(arg1)) + arg1Copy = make([]types.PendingParticipantUpdate, len(arg1)) copy(arg1Copy, arg1) } fake.sendParticipantUpdateMutex.Lock() ret, specificReturn := fake.sendParticipantUpdateReturnsOnCall[len(fake.sendParticipantUpdateArgsForCall)] fake.sendParticipantUpdateArgsForCall = append(fake.sendParticipantUpdateArgsForCall, struct { - arg1 []*livekit.ParticipantInfo + arg1 []types.PendingParticipantUpdate }{arg1Copy}) stub := fake.SendParticipantUpdateStub fakeReturns := fake.sendParticipantUpdateReturns @@ -4527,13 +4526,13 @@ func (fake *FakeLocalParticipant) SendParticipantUpdateCallCount() int { return len(fake.sendParticipantUpdateArgsForCall) } -func (fake *FakeLocalParticipant) SendParticipantUpdateCalls(stub func([]*livekit.ParticipantInfo) error) { +func (fake *FakeLocalParticipant) SendParticipantUpdateCalls(stub func([]types.PendingParticipantUpdate) error) { fake.sendParticipantUpdateMutex.Lock() defer fake.sendParticipantUpdateMutex.Unlock() fake.SendParticipantUpdateStub = stub } -func (fake *FakeLocalParticipant) SendParticipantUpdateArgsForCall(i int) []*livekit.ParticipantInfo { +func (fake *FakeLocalParticipant) SendParticipantUpdateArgsForCall(i int) []types.PendingParticipantUpdate { fake.sendParticipantUpdateMutex.RLock() defer fake.sendParticipantUpdateMutex.RUnlock() argsForCall := fake.sendParticipantUpdateArgsForCall[i] @@ -5992,21 +5991,20 @@ func (fake *FakeLocalParticipant) UpdateSubscribedTrackSettingsArgsForCall(i int return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeLocalParticipant) UpdateSubscriptionPermission(arg1 *livekit.SubscriptionPermission, arg2 utils.TimedVersion, arg3 func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant, arg4 func(participantID livekit.ParticipantID) types.LocalParticipant) error { +func (fake *FakeLocalParticipant) UpdateSubscriptionPermission(arg1 *livekit.SubscriptionPermission, arg2 utils.TimedVersion, arg3 func(participantID livekit.ParticipantID) types.LocalParticipant) error { fake.updateSubscriptionPermissionMutex.Lock() ret, specificReturn := fake.updateSubscriptionPermissionReturnsOnCall[len(fake.updateSubscriptionPermissionArgsForCall)] fake.updateSubscriptionPermissionArgsForCall = append(fake.updateSubscriptionPermissionArgsForCall, struct { arg1 *livekit.SubscriptionPermission arg2 utils.TimedVersion - arg3 func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant - arg4 func(participantID livekit.ParticipantID) types.LocalParticipant - }{arg1, arg2, arg3, arg4}) + arg3 func(participantID livekit.ParticipantID) types.LocalParticipant + }{arg1, arg2, arg3}) stub := fake.UpdateSubscriptionPermissionStub fakeReturns := fake.updateSubscriptionPermissionReturns - fake.recordInvocation("UpdateSubscriptionPermission", []interface{}{arg1, arg2, arg3, arg4}) + fake.recordInvocation("UpdateSubscriptionPermission", []interface{}{arg1, arg2, arg3}) fake.updateSubscriptionPermissionMutex.Unlock() if stub != nil { - return stub(arg1, arg2, arg3, arg4) + return stub(arg1, arg2, arg3) } if specificReturn { return ret.result1 @@ -6020,17 +6018,17 @@ func (fake *FakeLocalParticipant) UpdateSubscriptionPermissionCallCount() int { return len(fake.updateSubscriptionPermissionArgsForCall) } -func (fake *FakeLocalParticipant) UpdateSubscriptionPermissionCalls(stub func(*livekit.SubscriptionPermission, utils.TimedVersion, func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant, func(participantID livekit.ParticipantID) types.LocalParticipant) error) { +func (fake *FakeLocalParticipant) UpdateSubscriptionPermissionCalls(stub func(*livekit.SubscriptionPermission, utils.TimedVersion, func(participantID livekit.ParticipantID) types.LocalParticipant) error) { fake.updateSubscriptionPermissionMutex.Lock() defer fake.updateSubscriptionPermissionMutex.Unlock() fake.UpdateSubscriptionPermissionStub = stub } -func (fake *FakeLocalParticipant) UpdateSubscriptionPermissionArgsForCall(i int) (*livekit.SubscriptionPermission, utils.TimedVersion, func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant, func(participantID livekit.ParticipantID) types.LocalParticipant) { +func (fake *FakeLocalParticipant) UpdateSubscriptionPermissionArgsForCall(i int) (*livekit.SubscriptionPermission, utils.TimedVersion, func(participantID livekit.ParticipantID) types.LocalParticipant) { fake.updateSubscriptionPermissionMutex.RLock() defer fake.updateSubscriptionPermissionMutex.RUnlock() argsForCall := fake.updateSubscriptionPermissionArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } func (fake *FakeLocalParticipant) UpdateSubscriptionPermissionReturns(result1 error) { diff --git a/pkg/rtc/types/typesfakes/fake_participant.go b/pkg/rtc/types/typesfakes/fake_participant.go index 3f46bdbba..2da46fe6d 100644 --- a/pkg/rtc/types/typesfakes/fake_participant.go +++ b/pkg/rtc/types/typesfakes/fake_participant.go @@ -201,13 +201,12 @@ type FakeParticipant struct { toProtoReturnsOnCall map[int]struct { result1 *livekit.ParticipantInfo } - UpdateSubscriptionPermissionStub func(*livekit.SubscriptionPermission, utils.TimedVersion, func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant, func(participantID livekit.ParticipantID) types.LocalParticipant) error + UpdateSubscriptionPermissionStub func(*livekit.SubscriptionPermission, utils.TimedVersion, func(participantID livekit.ParticipantID) types.LocalParticipant) error updateSubscriptionPermissionMutex sync.RWMutex updateSubscriptionPermissionArgsForCall []struct { arg1 *livekit.SubscriptionPermission arg2 utils.TimedVersion - arg3 func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant - arg4 func(participantID livekit.ParticipantID) types.LocalParticipant + arg3 func(participantID livekit.ParticipantID) types.LocalParticipant } updateSubscriptionPermissionReturns struct { result1 error @@ -1233,21 +1232,20 @@ func (fake *FakeParticipant) ToProtoReturnsOnCall(i int, result1 *livekit.Partic }{result1} } -func (fake *FakeParticipant) UpdateSubscriptionPermission(arg1 *livekit.SubscriptionPermission, arg2 utils.TimedVersion, arg3 func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant, arg4 func(participantID livekit.ParticipantID) types.LocalParticipant) error { +func (fake *FakeParticipant) UpdateSubscriptionPermission(arg1 *livekit.SubscriptionPermission, arg2 utils.TimedVersion, arg3 func(participantID livekit.ParticipantID) types.LocalParticipant) error { fake.updateSubscriptionPermissionMutex.Lock() ret, specificReturn := fake.updateSubscriptionPermissionReturnsOnCall[len(fake.updateSubscriptionPermissionArgsForCall)] fake.updateSubscriptionPermissionArgsForCall = append(fake.updateSubscriptionPermissionArgsForCall, struct { arg1 *livekit.SubscriptionPermission arg2 utils.TimedVersion - arg3 func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant - arg4 func(participantID livekit.ParticipantID) types.LocalParticipant - }{arg1, arg2, arg3, arg4}) + arg3 func(participantID livekit.ParticipantID) types.LocalParticipant + }{arg1, arg2, arg3}) stub := fake.UpdateSubscriptionPermissionStub fakeReturns := fake.updateSubscriptionPermissionReturns - fake.recordInvocation("UpdateSubscriptionPermission", []interface{}{arg1, arg2, arg3, arg4}) + fake.recordInvocation("UpdateSubscriptionPermission", []interface{}{arg1, arg2, arg3}) fake.updateSubscriptionPermissionMutex.Unlock() if stub != nil { - return stub(arg1, arg2, arg3, arg4) + return stub(arg1, arg2, arg3) } if specificReturn { return ret.result1 @@ -1261,17 +1259,17 @@ func (fake *FakeParticipant) UpdateSubscriptionPermissionCallCount() int { return len(fake.updateSubscriptionPermissionArgsForCall) } -func (fake *FakeParticipant) UpdateSubscriptionPermissionCalls(stub func(*livekit.SubscriptionPermission, utils.TimedVersion, func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant, func(participantID livekit.ParticipantID) types.LocalParticipant) error) { +func (fake *FakeParticipant) UpdateSubscriptionPermissionCalls(stub func(*livekit.SubscriptionPermission, utils.TimedVersion, func(participantID livekit.ParticipantID) types.LocalParticipant) error) { fake.updateSubscriptionPermissionMutex.Lock() defer fake.updateSubscriptionPermissionMutex.Unlock() fake.UpdateSubscriptionPermissionStub = stub } -func (fake *FakeParticipant) UpdateSubscriptionPermissionArgsForCall(i int) (*livekit.SubscriptionPermission, utils.TimedVersion, func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant, func(participantID livekit.ParticipantID) types.LocalParticipant) { +func (fake *FakeParticipant) UpdateSubscriptionPermissionArgsForCall(i int) (*livekit.SubscriptionPermission, utils.TimedVersion, func(participantID livekit.ParticipantID) types.LocalParticipant) { fake.updateSubscriptionPermissionMutex.RLock() defer fake.updateSubscriptionPermissionMutex.RUnlock() argsForCall := fake.updateSubscriptionPermissionArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } func (fake *FakeParticipant) UpdateSubscriptionPermissionReturns(result1 error) { diff --git a/pkg/rtc/uptrackmanager.go b/pkg/rtc/uptrackmanager.go index ce5177661..3bf95bbf3 100644 --- a/pkg/rtc/uptrackmanager.go +++ b/pkg/rtc/uptrackmanager.go @@ -142,7 +142,6 @@ func (u *UpTrackManager) GetPublishedTracks() []types.MediaTrack { func (u *UpTrackManager) UpdateSubscriptionPermission( subscriptionPermission *livekit.SubscriptionPermission, timedVersion utils.TimedVersion, - _ func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant, // TODO: separate PR to remove this argument resolverBySid func(participantID livekit.ParticipantID) types.LocalParticipant, ) error { u.lock.Lock() diff --git a/pkg/rtc/uptrackmanager_test.go b/pkg/rtc/uptrackmanager_test.go index 46a8c96cf..5556bc023 100644 --- a/pkg/rtc/uptrackmanager_test.go +++ b/pkg/rtc/uptrackmanager_test.go @@ -49,14 +49,14 @@ func TestUpdateSubscriptionPermission(t *testing.T) { subscriptionPermission := &livekit.SubscriptionPermission{ AllParticipants: true, } - um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil, nil) + um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil) require.Nil(t, um.subscriberPermissions) // nobody is allowed to subscribe subscriptionPermission = &livekit.SubscriptionPermission{ TrackPermissions: []*livekit.TrackPermission{}, } - um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil, nil) + um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil) require.NotNil(t, um.subscriberPermissions) require.Equal(t, 0, len(um.subscriberPermissions)) @@ -92,7 +92,7 @@ func TestUpdateSubscriptionPermission(t *testing.T) { perms2, }, } - um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil, sidResolver) + um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), sidResolver) require.Equal(t, 2, len(um.subscriberPermissions)) require.EqualValues(t, perms1, um.subscriberPermissions["p1"]) require.EqualValues(t, perms2, um.subscriberPermissions["p2"]) @@ -117,7 +117,7 @@ func TestUpdateSubscriptionPermission(t *testing.T) { perms3, }, } - um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil, nil) + um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil) require.Equal(t, 3, len(um.subscriberPermissions)) require.EqualValues(t, perms1, um.subscriberPermissions["p1"]) require.EqualValues(t, perms2, um.subscriberPermissions["p2"]) @@ -170,7 +170,7 @@ func TestUpdateSubscriptionPermission(t *testing.T) { perms2, }, } - err := um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil, sidResolver) + err := um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), sidResolver) require.NoError(t, err) require.Equal(t, 2, len(um.subscriberPermissions)) require.EqualValues(t, perms1, um.subscriberPermissions["p1"]) @@ -189,7 +189,7 @@ func TestUpdateSubscriptionPermission(t *testing.T) { return nil } - err = um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil, badSidResolver) + err = um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), badSidResolver) require.NoError(t, err) require.Equal(t, 2, len(um.subscriberPermissions)) require.EqualValues(t, perms1, um.subscriberPermissions["p1"]) @@ -202,16 +202,16 @@ func TestUpdateSubscriptionPermission(t *testing.T) { v0, v1, v2 := vg.Next(), vg.Next(), vg.Next() - um.UpdateSubscriptionPermission(&livekit.SubscriptionPermission{}, v1, nil, nil) + um.UpdateSubscriptionPermission(&livekit.SubscriptionPermission{}, v1, nil) require.Equal(t, v1.Load(), um.subscriptionPermissionVersion.Load(), "first update should be applied") - um.UpdateSubscriptionPermission(&livekit.SubscriptionPermission{}, v2, nil, nil) + um.UpdateSubscriptionPermission(&livekit.SubscriptionPermission{}, v2, nil) require.Equal(t, v2.Load(), um.subscriptionPermissionVersion.Load(), "ordered updates should be applied") - um.UpdateSubscriptionPermission(&livekit.SubscriptionPermission{}, v0, nil, nil) + um.UpdateSubscriptionPermission(&livekit.SubscriptionPermission{}, v0, nil) require.Equal(t, v2.Load(), um.subscriptionPermissionVersion.Load(), "out of order updates should be ignored") - um.UpdateSubscriptionPermission(&livekit.SubscriptionPermission{}, utils.TimedVersion{}, nil, nil) + um.UpdateSubscriptionPermission(&livekit.SubscriptionPermission{}, utils.TimedVersion{}, nil) require.True(t, um.subscriptionPermissionVersion.After(&v2), "zero version in updates should use next local version") }) } @@ -233,7 +233,7 @@ func TestSubscriptionPermission(t *testing.T) { subscriptionPermission := &livekit.SubscriptionPermission{ AllParticipants: true, } - um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil, nil) + um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil) require.True(t, um.hasPermissionLocked("audio", "p1")) require.True(t, um.hasPermissionLocked("audio", "p2")) @@ -241,7 +241,7 @@ func TestSubscriptionPermission(t *testing.T) { subscriptionPermission = &livekit.SubscriptionPermission{ TrackPermissions: []*livekit.TrackPermission{}, } - um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil, nil) + um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil) require.False(t, um.hasPermissionLocked("audio", "p1")) require.False(t, um.hasPermissionLocked("audio", "p2")) @@ -258,7 +258,7 @@ func TestSubscriptionPermission(t *testing.T) { }, }, } - um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil, nil) + um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil) require.True(t, um.hasPermissionLocked("audio", "p1")) require.True(t, um.hasPermissionLocked("video", "p1")) require.True(t, um.hasPermissionLocked("audio", "p2")) @@ -293,7 +293,7 @@ func TestSubscriptionPermission(t *testing.T) { }, }, } - um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil, nil) + um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil) require.True(t, um.hasPermissionLocked("audio", "p1")) require.True(t, um.hasPermissionLocked("video", "p1")) require.True(t, um.hasPermissionLocked("screen", "p1"))