WIP - to show the idea of passing along DisconnnectedReasonn

This commit is contained in:
David Zhao
2024-01-28 22:54:27 -08:00
parent 995fddbaf9
commit 42d0d30c3e
11 changed files with 157 additions and 130 deletions
+4 -2
View File
@@ -179,7 +179,7 @@ type ParticipantImpl struct {
requireBroadcast bool
// queued participant updates before join response is sent
// guarded by updateLock
queuedUpdates []*livekit.ParticipantInfo
queuedUpdates []types.PendingParticipantUpdate
// cache of recently sent updates, to ensuring ordering by version
// guarded by updateLock
updateCache *lru.Cache[livekit.ParticipantID, participantUpdateInfo]
@@ -1077,7 +1077,9 @@ func (p *ParticipantImpl) VerifySubscribeParticipantInfo(pID livekit.Participant
if f := p.params.GetParticipantInfo; f != nil {
if info := f(pID); info != nil {
_ = p.SendParticipantUpdate([]*livekit.ParticipantInfo{info})
_ = p.SendParticipantUpdate([]types.PendingParticipantUpdate{
{Info: info},
})
}
}
}
+2 -2
View File
@@ -230,8 +230,8 @@ func TestOutOfOrderUpdates(t *testing.T) {
require.Greater(t, pi2.Version, pi1.Version)
// send the second update first
require.NoError(t, p.SendParticipantUpdate([]*livekit.ParticipantInfo{pi2}))
require.NoError(t, p.SendParticipantUpdate([]*livekit.ParticipantInfo{pi1}))
require.NoError(t, p.SendParticipantUpdate([]types.PendingParticipantUpdate{{Info: pi2}}))
require.NoError(t, p.SendParticipantUpdate([]types.PendingParticipantUpdate{{Info: pi1}}))
// only sent once, and it's the earlier message
require.Equal(t, 1, sink.WriteMessageCallCount())
+18 -14
View File
@@ -79,7 +79,7 @@ func (p *ParticipantImpl) SendJoinResponse(joinResponse *livekit.JoinResponse) e
return nil
}
func (p *ParticipantImpl) SendParticipantUpdate(participantsToUpdate []*livekit.ParticipantInfo) error {
func (p *ParticipantImpl) SendParticipantUpdate(participantsToUpdate []types.PendingParticipantUpdate) error {
p.updateLock.Lock()
if p.IsDisconnected() {
p.updateLock.Unlock()
@@ -93,30 +93,34 @@ func (p *ParticipantImpl) SendParticipantUpdate(participantsToUpdate []*livekit.
return nil
}
validUpdates := make([]*livekit.ParticipantInfo, 0, len(participantsToUpdate))
for _, pi := range participantsToUpdate {
isValid := true
for _, pu := range participantsToUpdate {
pi := pu.Info
pID := livekit.ParticipantID(pi.Sid)
if pu.MaxProtocolVersion > 0 && p.ProtocolVersion() > pu.MaxProtocolVersion {
p.params.Logger.Infow("skipping participant update due to protocol version",
"otherParticipant", pi.Identity, "otherPID", pi.Sid, "version", pi.Version, "maxVersion", pu.MaxProtocolVersion)
continue
}
if lastVersion, ok := p.updateCache.Get(pID); ok {
// this is a message delivered out of order, a more recent version of the message had already been
// sent.
if pi.Version < lastVersion.version {
p.params.Logger.Debugw("skipping outdated participant update", "otherParticipant", pi.Identity, "otherPID", pi.Sid, "version", pi.Version, "lastVersion", lastVersion)
isValid = false
continue
}
}
if pi.Permission != nil && pi.Permission.Hidden && pi.Sid != string(p.params.SID) {
p.params.Logger.Debugw("skipping hidden participant update", "otherParticipant", pi.Identity)
isValid = false
}
if isValid {
p.updateCache.Add(pID, participantUpdateInfo{
identity: livekit.ParticipantIdentity(pi.Identity),
version: pi.Version,
state: pi.State,
updatedAt: time.Now(),
})
validUpdates = append(validUpdates, pi)
continue
}
p.params.Logger.Infow("queuing valid participant update", "otherParticipant", pi.Identity, "sid", pi.Sid, "state", pi.State, "maxVersion", pu.MaxProtocolVersion)
p.updateCache.Add(pID, participantUpdateInfo{
identity: livekit.ParticipantIdentity(pi.Identity),
version: pi.Version,
state: pi.State,
updatedAt: time.Now(),
})
validUpdates = append(validUpdates, pi)
}
p.updateLock.Unlock()
+73 -57
View File
@@ -65,7 +65,6 @@ var (
type broadcastOptions struct {
skipSource bool
immediate bool
}
type disconnectSignalOnResumeNoMessages struct {
@@ -694,7 +693,7 @@ func (r *Room) SyncState(participant types.LocalParticipant, state *livekit.Sync
}
func (r *Room) UpdateSubscriptionPermission(participant types.LocalParticipant, subscriptionPermission *livekit.SubscriptionPermission) error {
if err := participant.UpdateSubscriptionPermission(subscriptionPermission, utils.TimedVersion{}, r.GetParticipant, r.GetParticipantByID); err != nil {
if err := participant.UpdateSubscriptionPermission(subscriptionPermission, utils.TimedVersion{}, r.GetParticipantByID); err != nil {
return err
}
for _, track := range participant.GetPublishedTracks() {
@@ -910,16 +909,16 @@ func (r *Room) SimulateScenario(participant types.LocalParticipant, simulateScen
return nil
}
func (r *Room) getOtherParticipantInfo(identity livekit.ParticipantIdentity) []*livekit.ParticipantInfo {
func (r *Room) getOtherParticipantInfo(identity livekit.ParticipantIdentity) []types.PendingParticipantUpdate {
participants := r.GetParticipants()
pi := make([]*livekit.ParticipantInfo, 0, len(participants))
pus := make([]types.PendingParticipantUpdate, 0, len(participants))
for _, p := range participants {
if !p.Hidden() && p.Identity() != identity {
pi = append(pi, p.ToProto())
pus = append(pus, types.PendingParticipantUpdate{Info: p.ToProto()})
}
}
return pi
return pus
}
// checks if participant should be autosubscribed to new tracks, assumes lock is already acquired
@@ -1061,7 +1060,7 @@ func (r *Room) onTrackUnpublished(p types.LocalParticipant, track types.MediaTra
func (r *Room) onParticipantUpdate(p types.LocalParticipant) {
r.protoProxy.MarkDirty(false)
// immediately notify when permissions or metadata changed
r.broadcastParticipantState(p, broadcastOptions{immediate: true})
r.broadcastParticipantState(p, broadcastOptions{})
if r.onParticipantChanged != nil {
r.onParticipantChanged(p)
}
@@ -1104,7 +1103,7 @@ func (r *Room) broadcastParticipantState(p types.LocalParticipant, opts broadcas
if p.Hidden() {
if !opts.skipSource {
// send update only to hidden participant
err := p.SendParticipantUpdate([]*livekit.ParticipantInfo{pi})
err := p.SendParticipantUpdate([]types.PendingParticipantUpdate{{Info: pi}})
if err != nil {
r.Logger.Errorw("could not send update to participant", err,
"participant", p.Identity(), "pID", p.ID())
@@ -1113,11 +1112,11 @@ func (r *Room) broadcastParticipantState(p types.LocalParticipant, opts broadcas
return
}
updates := r.pushAndDequeueUpdates(pi, opts.immediate)
updates := r.pushAndDequeueUpdates(pi)
r.sendParticipantUpdates(updates)
}
func (r *Room) sendParticipantUpdates(updates []*livekit.ParticipantInfo) {
func (r *Room) sendParticipantUpdates(updates []types.PendingParticipantUpdate) {
if len(updates) == 0 {
return
}
@@ -1172,56 +1171,16 @@ func (r *Room) sendSpeakerChanges(speakers []*livekit.SpeakerInfo) {
// * subscriber-only updates will be queued for batch updates
// * publisher & immediate updates will be returned without queuing
// * when the SID changes, it will return both updates, with the earlier participant set to disconnected
func (r *Room) pushAndDequeueUpdates(pi *livekit.ParticipantInfo, isImmediate bool) []*livekit.ParticipantInfo {
func (r *Room) pushAndDequeueUpdates(pi *livekit.ParticipantInfo) []types.PendingParticipantUpdate {
r.batchedUpdatesMu.Lock()
defer r.batchedUpdatesMu.Unlock()
var updates []*livekit.ParticipantInfo
identity := livekit.ParticipantIdentity(pi.Identity)
existing := r.batchedUpdates[identity]
shouldSend := isImmediate || pi.IsPublisher
if existing != nil {
if pi.Sid == existing.Sid {
// same participant session
if pi.Version < existing.Version {
// out of order update
return nil
}
} else {
// different participant sessions
if existing.JoinedAt < pi.JoinedAt {
// existing is older, synthesize a DISCONNECT for older and
// send immediately along with newer session to signal switch
shouldSend = true
existing.State = livekit.ParticipantInfo_DISCONNECTED
updates = append(updates, existing)
} else {
// older session update, newer session has already become active, so nothing to do
return nil
}
}
} else {
ep := r.GetParticipant(identity)
if ep != nil {
epi := ep.ToProto()
if epi.JoinedAt > pi.JoinedAt {
// older session update, newer session has already become active, so nothing to do
return nil
}
}
getParticipant := func(identity livekit.ParticipantIdentity) types.Participant {
p := r.GetParticipant(identity)
return p
}
if shouldSend {
// include any queued update, and return
delete(r.batchedUpdates, identity)
updates = append(updates, pi)
} else {
// enqueue for batch
r.batchedUpdates[identity] = pi
}
return updates
return PushAndDequeueUpdates(r.batchedUpdates, getParticipant, pi)
}
func (r *Room) updateProto() *livekit.Room {
@@ -1266,9 +1225,9 @@ func (r *Room) changeUpdateWorker() {
continue
}
updates := make([]*livekit.ParticipantInfo, 0, len(updatesMap))
updates := make([]types.PendingParticipantUpdate, 0, len(updatesMap))
for _, pi := range updatesMap {
updates = append(updates, pi)
updates = append(updates, types.PendingParticipantUpdate{Info: pi})
}
r.sendParticipantUpdates(updates)
}
@@ -1452,6 +1411,63 @@ func (r *Room) DebugInfo() map[string]interface{} {
return info
}
func PushAndDequeueUpdates(
updatesMap map[livekit.ParticipantIdentity]*livekit.ParticipantInfo,
participantGetter func(livekit.ParticipantIdentity) types.Participant,
pu types.PendingParticipantUpdate,
) []types.PendingParticipantUpdate {
var updates []types.PendingParticipantUpdate
pi := pu.Info
identity := livekit.ParticipantIdentity(pi.Identity)
existing := updatesMap[identity]
shouldSend := pi.IsPublisher
if existing != nil {
if pi.Sid == existing.Sid {
// same participant session
if pi.Version < existing.Version {
// out of order update
return nil
}
} else {
// different participant sessions
if existing.JoinedAt < pi.JoinedAt {
// existing is older, synthesize a DISCONNECT for older and
// send immediately along with newer session to signal switch
shouldSend = false
existing.State = livekit.ParticipantInfo_DISCONNECTED
updates = append(updates, types.PendingParticipantUpdate{
Info: existing,
PossibleSIDChange: true,
})
} else {
// older session update, newer session has already become active, so nothing to do
return nil
}
}
} else {
ep := participantGetter(identity)
if ep != nil {
epi := ep.ToProto()
if epi.JoinedAt > pi.JoinedAt {
// older session update, newer session has already become active, so nothing to do
return nil
}
}
}
if shouldSend {
// include any queued update, and return
delete(updatesMap, identity)
updates = append(updates, types.PendingParticipantUpdate{Info: pi})
} else {
// enqueue for batch
updatesMap[identity] = pi
}
return updates
}
func BroadcastDataPacketForRoom(r types.Room, source types.LocalParticipant, dp *livekit.DataPacket, logger logger.Logger) {
dest := dp.GetUser().GetDestinationSids()
var dpData []byte
+6 -6
View File
@@ -271,7 +271,7 @@ func TestPushAndDequeueUpdates(t *testing.T) {
immediate bool
existing *livekit.ParticipantInfo
expected []*livekit.ParticipantInfo
validate func(t *testing.T, rm *Room, updates []*livekit.ParticipantInfo)
validate func(t *testing.T, rm *Room, updates []types.PendingParticipantUpdate)
}{
{
name: "publisher updates are immediate",
@@ -286,7 +286,7 @@ func TestPushAndDequeueUpdates(t *testing.T) {
name: "last version is enqueued",
pi: subscriber1v2,
existing: subscriber1v1,
validate: func(t *testing.T, rm *Room, _ []*livekit.ParticipantInfo) {
validate: func(t *testing.T, rm *Room, _ []types.PendingParticipantUpdate) {
queued := rm.batchedUpdates[livekit.ParticipantIdentity(identity)]
require.NotNil(t, queued)
requirePIEquals(t, subscriber1v2, queued)
@@ -298,7 +298,7 @@ func TestPushAndDequeueUpdates(t *testing.T) {
existing: subscriber1v1,
immediate: true,
expected: []*livekit.ParticipantInfo{subscriber1v2},
validate: func(t *testing.T, rm *Room, _ []*livekit.ParticipantInfo) {
validate: func(t *testing.T, rm *Room, _ []types.PendingParticipantUpdate) {
queued := rm.batchedUpdates[livekit.ParticipantIdentity(identity)]
require.Nil(t, queued)
},
@@ -307,7 +307,7 @@ func TestPushAndDequeueUpdates(t *testing.T) {
name: "out of order updates are rejected",
pi: subscriber1v1,
existing: subscriber1v2,
validate: func(t *testing.T, rm *Room, updates []*livekit.ParticipantInfo) {
validate: func(t *testing.T, rm *Room, _ []types.PendingParticipantUpdate) {
queued := rm.batchedUpdates[livekit.ParticipantIdentity(identity)]
requirePIEquals(t, subscriber1v2, queued)
},
@@ -331,7 +331,7 @@ func TestPushAndDequeueUpdates(t *testing.T) {
pi: publisher1v2,
existing: subscriber1v1,
expected: []*livekit.ParticipantInfo{publisher1v2},
validate: func(t *testing.T, rm *Room, updates []*livekit.ParticipantInfo) {
validate: func(t *testing.T, rm *Room, _ []types.PendingParticipantUpdate) {
require.Empty(t, rm.batchedUpdates)
},
},
@@ -347,7 +347,7 @@ func TestPushAndDequeueUpdates(t *testing.T) {
updates := rm.pushAndDequeueUpdates(tc.pi, tc.immediate)
require.Equal(t, len(tc.expected), len(updates))
for i, item := range tc.expected {
requirePIEquals(t, item, updates[i])
requirePIEquals(t, item, updates[i].Info)
}
if tc.validate != nil {
+8 -2
View File
@@ -282,7 +282,6 @@ type Participant interface {
UpdateSubscriptionPermission(
subscriptionPermission *livekit.SubscriptionPermission,
timedVersion utils.TimedVersion,
resolverByIdentity func(participantIdentity livekit.ParticipantIdentity) LocalParticipant,
resolverBySid func(participantID livekit.ParticipantID) LocalParticipant,
) error
UpdateVideoLayers(updateVideoLayers *livekit.UpdateVideoLayers) error
@@ -368,7 +367,7 @@ type LocalParticipant interface {
// server sent messages
SendJoinResponse(joinResponse *livekit.JoinResponse) error
SendParticipantUpdate(participants []*livekit.ParticipantInfo) error
SendParticipantUpdate(participants []PendingParticipantUpdate) error
SendSpeakerUpdate(speakers []*livekit.SpeakerInfo, force bool) error
SendDataPacket(packet *livekit.DataPacket, data []byte) error
SendRoomUpdate(room *livekit.Room) error
@@ -426,6 +425,13 @@ type LocalParticipant interface {
SetRegionSettings(regionSettings *livekit.RegionSettings)
}
// PendingParticipantUpdate holds a pending ParticipantInfo to be sent to clients
type PendingParticipantUpdate struct {
Info *livekit.ParticipantInfo
DisconnectReason livekit.DisconnectReason
PreviousID livekit.ParticipantID
}
// Room is a container of participants, and can provide room-level actions
//
//counterfeiter:generate . Room
+4
View File
@@ -84,6 +84,10 @@ func (v ProtocolVersion) SupportsAsyncRoomID() bool {
return v > 11
}
func (v ProtocolVersion) SupportsSIDUpdates() bool {
return v > 11
}
func (v ProtocolVersion) SupportsRegionsInLeaveRequest() bool {
return v > 12
}
@@ -669,10 +669,10 @@ type FakeLocalParticipant struct {
sendJoinResponseReturnsOnCall map[int]struct {
result1 error
}
SendParticipantUpdateStub func([]*livekit.ParticipantInfo) error
SendParticipantUpdateStub func([]types.PendingParticipantUpdate) error
sendParticipantUpdateMutex sync.RWMutex
sendParticipantUpdateArgsForCall []struct {
arg1 []*livekit.ParticipantInfo
arg1 []types.PendingParticipantUpdate
}
sendParticipantUpdateReturns struct {
result1 error
@@ -937,13 +937,12 @@ type FakeLocalParticipant struct {
arg1 livekit.TrackID
arg2 *livekit.UpdateTrackSettings
}
UpdateSubscriptionPermissionStub func(*livekit.SubscriptionPermission, utils.TimedVersion, func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant, func(participantID livekit.ParticipantID) types.LocalParticipant) error
UpdateSubscriptionPermissionStub func(*livekit.SubscriptionPermission, utils.TimedVersion, func(participantID livekit.ParticipantID) types.LocalParticipant) error
updateSubscriptionPermissionMutex sync.RWMutex
updateSubscriptionPermissionArgsForCall []struct {
arg1 *livekit.SubscriptionPermission
arg2 utils.TimedVersion
arg3 func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant
arg4 func(participantID livekit.ParticipantID) types.LocalParticipant
arg3 func(participantID livekit.ParticipantID) types.LocalParticipant
}
updateSubscriptionPermissionReturns struct {
result1 error
@@ -4497,16 +4496,16 @@ func (fake *FakeLocalParticipant) SendJoinResponseReturnsOnCall(i int, result1 e
}{result1}
}
func (fake *FakeLocalParticipant) SendParticipantUpdate(arg1 []*livekit.ParticipantInfo) error {
var arg1Copy []*livekit.ParticipantInfo
func (fake *FakeLocalParticipant) SendParticipantUpdate(arg1 []types.PendingParticipantUpdate) error {
var arg1Copy []types.PendingParticipantUpdate
if arg1 != nil {
arg1Copy = make([]*livekit.ParticipantInfo, len(arg1))
arg1Copy = make([]types.PendingParticipantUpdate, len(arg1))
copy(arg1Copy, arg1)
}
fake.sendParticipantUpdateMutex.Lock()
ret, specificReturn := fake.sendParticipantUpdateReturnsOnCall[len(fake.sendParticipantUpdateArgsForCall)]
fake.sendParticipantUpdateArgsForCall = append(fake.sendParticipantUpdateArgsForCall, struct {
arg1 []*livekit.ParticipantInfo
arg1 []types.PendingParticipantUpdate
}{arg1Copy})
stub := fake.SendParticipantUpdateStub
fakeReturns := fake.sendParticipantUpdateReturns
@@ -4527,13 +4526,13 @@ func (fake *FakeLocalParticipant) SendParticipantUpdateCallCount() int {
return len(fake.sendParticipantUpdateArgsForCall)
}
func (fake *FakeLocalParticipant) SendParticipantUpdateCalls(stub func([]*livekit.ParticipantInfo) error) {
func (fake *FakeLocalParticipant) SendParticipantUpdateCalls(stub func([]types.PendingParticipantUpdate) error) {
fake.sendParticipantUpdateMutex.Lock()
defer fake.sendParticipantUpdateMutex.Unlock()
fake.SendParticipantUpdateStub = stub
}
func (fake *FakeLocalParticipant) SendParticipantUpdateArgsForCall(i int) []*livekit.ParticipantInfo {
func (fake *FakeLocalParticipant) SendParticipantUpdateArgsForCall(i int) []types.PendingParticipantUpdate {
fake.sendParticipantUpdateMutex.RLock()
defer fake.sendParticipantUpdateMutex.RUnlock()
argsForCall := fake.sendParticipantUpdateArgsForCall[i]
@@ -5992,21 +5991,20 @@ func (fake *FakeLocalParticipant) UpdateSubscribedTrackSettingsArgsForCall(i int
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeLocalParticipant) UpdateSubscriptionPermission(arg1 *livekit.SubscriptionPermission, arg2 utils.TimedVersion, arg3 func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant, arg4 func(participantID livekit.ParticipantID) types.LocalParticipant) error {
func (fake *FakeLocalParticipant) UpdateSubscriptionPermission(arg1 *livekit.SubscriptionPermission, arg2 utils.TimedVersion, arg3 func(participantID livekit.ParticipantID) types.LocalParticipant) error {
fake.updateSubscriptionPermissionMutex.Lock()
ret, specificReturn := fake.updateSubscriptionPermissionReturnsOnCall[len(fake.updateSubscriptionPermissionArgsForCall)]
fake.updateSubscriptionPermissionArgsForCall = append(fake.updateSubscriptionPermissionArgsForCall, struct {
arg1 *livekit.SubscriptionPermission
arg2 utils.TimedVersion
arg3 func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant
arg4 func(participantID livekit.ParticipantID) types.LocalParticipant
}{arg1, arg2, arg3, arg4})
arg3 func(participantID livekit.ParticipantID) types.LocalParticipant
}{arg1, arg2, arg3})
stub := fake.UpdateSubscriptionPermissionStub
fakeReturns := fake.updateSubscriptionPermissionReturns
fake.recordInvocation("UpdateSubscriptionPermission", []interface{}{arg1, arg2, arg3, arg4})
fake.recordInvocation("UpdateSubscriptionPermission", []interface{}{arg1, arg2, arg3})
fake.updateSubscriptionPermissionMutex.Unlock()
if stub != nil {
return stub(arg1, arg2, arg3, arg4)
return stub(arg1, arg2, arg3)
}
if specificReturn {
return ret.result1
@@ -6020,17 +6018,17 @@ func (fake *FakeLocalParticipant) UpdateSubscriptionPermissionCallCount() int {
return len(fake.updateSubscriptionPermissionArgsForCall)
}
func (fake *FakeLocalParticipant) UpdateSubscriptionPermissionCalls(stub func(*livekit.SubscriptionPermission, utils.TimedVersion, func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant, func(participantID livekit.ParticipantID) types.LocalParticipant) error) {
func (fake *FakeLocalParticipant) UpdateSubscriptionPermissionCalls(stub func(*livekit.SubscriptionPermission, utils.TimedVersion, func(participantID livekit.ParticipantID) types.LocalParticipant) error) {
fake.updateSubscriptionPermissionMutex.Lock()
defer fake.updateSubscriptionPermissionMutex.Unlock()
fake.UpdateSubscriptionPermissionStub = stub
}
func (fake *FakeLocalParticipant) UpdateSubscriptionPermissionArgsForCall(i int) (*livekit.SubscriptionPermission, utils.TimedVersion, func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant, func(participantID livekit.ParticipantID) types.LocalParticipant) {
func (fake *FakeLocalParticipant) UpdateSubscriptionPermissionArgsForCall(i int) (*livekit.SubscriptionPermission, utils.TimedVersion, func(participantID livekit.ParticipantID) types.LocalParticipant) {
fake.updateSubscriptionPermissionMutex.RLock()
defer fake.updateSubscriptionPermissionMutex.RUnlock()
argsForCall := fake.updateSubscriptionPermissionArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeLocalParticipant) UpdateSubscriptionPermissionReturns(result1 error) {
+10 -12
View File
@@ -201,13 +201,12 @@ type FakeParticipant struct {
toProtoReturnsOnCall map[int]struct {
result1 *livekit.ParticipantInfo
}
UpdateSubscriptionPermissionStub func(*livekit.SubscriptionPermission, utils.TimedVersion, func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant, func(participantID livekit.ParticipantID) types.LocalParticipant) error
UpdateSubscriptionPermissionStub func(*livekit.SubscriptionPermission, utils.TimedVersion, func(participantID livekit.ParticipantID) types.LocalParticipant) error
updateSubscriptionPermissionMutex sync.RWMutex
updateSubscriptionPermissionArgsForCall []struct {
arg1 *livekit.SubscriptionPermission
arg2 utils.TimedVersion
arg3 func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant
arg4 func(participantID livekit.ParticipantID) types.LocalParticipant
arg3 func(participantID livekit.ParticipantID) types.LocalParticipant
}
updateSubscriptionPermissionReturns struct {
result1 error
@@ -1233,21 +1232,20 @@ func (fake *FakeParticipant) ToProtoReturnsOnCall(i int, result1 *livekit.Partic
}{result1}
}
func (fake *FakeParticipant) UpdateSubscriptionPermission(arg1 *livekit.SubscriptionPermission, arg2 utils.TimedVersion, arg3 func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant, arg4 func(participantID livekit.ParticipantID) types.LocalParticipant) error {
func (fake *FakeParticipant) UpdateSubscriptionPermission(arg1 *livekit.SubscriptionPermission, arg2 utils.TimedVersion, arg3 func(participantID livekit.ParticipantID) types.LocalParticipant) error {
fake.updateSubscriptionPermissionMutex.Lock()
ret, specificReturn := fake.updateSubscriptionPermissionReturnsOnCall[len(fake.updateSubscriptionPermissionArgsForCall)]
fake.updateSubscriptionPermissionArgsForCall = append(fake.updateSubscriptionPermissionArgsForCall, struct {
arg1 *livekit.SubscriptionPermission
arg2 utils.TimedVersion
arg3 func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant
arg4 func(participantID livekit.ParticipantID) types.LocalParticipant
}{arg1, arg2, arg3, arg4})
arg3 func(participantID livekit.ParticipantID) types.LocalParticipant
}{arg1, arg2, arg3})
stub := fake.UpdateSubscriptionPermissionStub
fakeReturns := fake.updateSubscriptionPermissionReturns
fake.recordInvocation("UpdateSubscriptionPermission", []interface{}{arg1, arg2, arg3, arg4})
fake.recordInvocation("UpdateSubscriptionPermission", []interface{}{arg1, arg2, arg3})
fake.updateSubscriptionPermissionMutex.Unlock()
if stub != nil {
return stub(arg1, arg2, arg3, arg4)
return stub(arg1, arg2, arg3)
}
if specificReturn {
return ret.result1
@@ -1261,17 +1259,17 @@ func (fake *FakeParticipant) UpdateSubscriptionPermissionCallCount() int {
return len(fake.updateSubscriptionPermissionArgsForCall)
}
func (fake *FakeParticipant) UpdateSubscriptionPermissionCalls(stub func(*livekit.SubscriptionPermission, utils.TimedVersion, func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant, func(participantID livekit.ParticipantID) types.LocalParticipant) error) {
func (fake *FakeParticipant) UpdateSubscriptionPermissionCalls(stub func(*livekit.SubscriptionPermission, utils.TimedVersion, func(participantID livekit.ParticipantID) types.LocalParticipant) error) {
fake.updateSubscriptionPermissionMutex.Lock()
defer fake.updateSubscriptionPermissionMutex.Unlock()
fake.UpdateSubscriptionPermissionStub = stub
}
func (fake *FakeParticipant) UpdateSubscriptionPermissionArgsForCall(i int) (*livekit.SubscriptionPermission, utils.TimedVersion, func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant, func(participantID livekit.ParticipantID) types.LocalParticipant) {
func (fake *FakeParticipant) UpdateSubscriptionPermissionArgsForCall(i int) (*livekit.SubscriptionPermission, utils.TimedVersion, func(participantID livekit.ParticipantID) types.LocalParticipant) {
fake.updateSubscriptionPermissionMutex.RLock()
defer fake.updateSubscriptionPermissionMutex.RUnlock()
argsForCall := fake.updateSubscriptionPermissionArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeParticipant) UpdateSubscriptionPermissionReturns(result1 error) {
-1
View File
@@ -142,7 +142,6 @@ func (u *UpTrackManager) GetPublishedTracks() []types.MediaTrack {
func (u *UpTrackManager) UpdateSubscriptionPermission(
subscriptionPermission *livekit.SubscriptionPermission,
timedVersion utils.TimedVersion,
_ func(participantIdentity livekit.ParticipantIdentity) types.LocalParticipant, // TODO: separate PR to remove this argument
resolverBySid func(participantID livekit.ParticipantID) types.LocalParticipant,
) error {
u.lock.Lock()
+14 -14
View File
@@ -49,14 +49,14 @@ func TestUpdateSubscriptionPermission(t *testing.T) {
subscriptionPermission := &livekit.SubscriptionPermission{
AllParticipants: true,
}
um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil, nil)
um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil)
require.Nil(t, um.subscriberPermissions)
// nobody is allowed to subscribe
subscriptionPermission = &livekit.SubscriptionPermission{
TrackPermissions: []*livekit.TrackPermission{},
}
um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil, nil)
um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil)
require.NotNil(t, um.subscriberPermissions)
require.Equal(t, 0, len(um.subscriberPermissions))
@@ -92,7 +92,7 @@ func TestUpdateSubscriptionPermission(t *testing.T) {
perms2,
},
}
um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil, sidResolver)
um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), sidResolver)
require.Equal(t, 2, len(um.subscriberPermissions))
require.EqualValues(t, perms1, um.subscriberPermissions["p1"])
require.EqualValues(t, perms2, um.subscriberPermissions["p2"])
@@ -117,7 +117,7 @@ func TestUpdateSubscriptionPermission(t *testing.T) {
perms3,
},
}
um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil, nil)
um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil)
require.Equal(t, 3, len(um.subscriberPermissions))
require.EqualValues(t, perms1, um.subscriberPermissions["p1"])
require.EqualValues(t, perms2, um.subscriberPermissions["p2"])
@@ -170,7 +170,7 @@ func TestUpdateSubscriptionPermission(t *testing.T) {
perms2,
},
}
err := um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil, sidResolver)
err := um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), sidResolver)
require.NoError(t, err)
require.Equal(t, 2, len(um.subscriberPermissions))
require.EqualValues(t, perms1, um.subscriberPermissions["p1"])
@@ -189,7 +189,7 @@ func TestUpdateSubscriptionPermission(t *testing.T) {
return nil
}
err = um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil, badSidResolver)
err = um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), badSidResolver)
require.NoError(t, err)
require.Equal(t, 2, len(um.subscriberPermissions))
require.EqualValues(t, perms1, um.subscriberPermissions["p1"])
@@ -202,16 +202,16 @@ func TestUpdateSubscriptionPermission(t *testing.T) {
v0, v1, v2 := vg.Next(), vg.Next(), vg.Next()
um.UpdateSubscriptionPermission(&livekit.SubscriptionPermission{}, v1, nil, nil)
um.UpdateSubscriptionPermission(&livekit.SubscriptionPermission{}, v1, nil)
require.Equal(t, v1.Load(), um.subscriptionPermissionVersion.Load(), "first update should be applied")
um.UpdateSubscriptionPermission(&livekit.SubscriptionPermission{}, v2, nil, nil)
um.UpdateSubscriptionPermission(&livekit.SubscriptionPermission{}, v2, nil)
require.Equal(t, v2.Load(), um.subscriptionPermissionVersion.Load(), "ordered updates should be applied")
um.UpdateSubscriptionPermission(&livekit.SubscriptionPermission{}, v0, nil, nil)
um.UpdateSubscriptionPermission(&livekit.SubscriptionPermission{}, v0, nil)
require.Equal(t, v2.Load(), um.subscriptionPermissionVersion.Load(), "out of order updates should be ignored")
um.UpdateSubscriptionPermission(&livekit.SubscriptionPermission{}, utils.TimedVersion{}, nil, nil)
um.UpdateSubscriptionPermission(&livekit.SubscriptionPermission{}, utils.TimedVersion{}, nil)
require.True(t, um.subscriptionPermissionVersion.After(&v2), "zero version in updates should use next local version")
})
}
@@ -233,7 +233,7 @@ func TestSubscriptionPermission(t *testing.T) {
subscriptionPermission := &livekit.SubscriptionPermission{
AllParticipants: true,
}
um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil, nil)
um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil)
require.True(t, um.hasPermissionLocked("audio", "p1"))
require.True(t, um.hasPermissionLocked("audio", "p2"))
@@ -241,7 +241,7 @@ func TestSubscriptionPermission(t *testing.T) {
subscriptionPermission = &livekit.SubscriptionPermission{
TrackPermissions: []*livekit.TrackPermission{},
}
um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil, nil)
um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil)
require.False(t, um.hasPermissionLocked("audio", "p1"))
require.False(t, um.hasPermissionLocked("audio", "p2"))
@@ -258,7 +258,7 @@ func TestSubscriptionPermission(t *testing.T) {
},
},
}
um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil, nil)
um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil)
require.True(t, um.hasPermissionLocked("audio", "p1"))
require.True(t, um.hasPermissionLocked("video", "p1"))
require.True(t, um.hasPermissionLocked("audio", "p2"))
@@ -293,7 +293,7 @@ func TestSubscriptionPermission(t *testing.T) {
},
},
}
um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil, nil)
um.UpdateSubscriptionPermission(subscriptionPermission, vg.Next(), nil)
require.True(t, um.hasPermissionLocked("audio", "p1"))
require.True(t, um.hasPermissionLocked("video", "p1"))
require.True(t, um.hasPermissionLocked("screen", "p1"))