From b97d59b8dbef99c95e1e95ec7c5ac2b7d818ee92 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Thu, 22 Sep 2022 15:59:27 -0700 Subject: [PATCH] consolidate room internal (#1030) * consolidate room internal * create room internal map * pipelined room read * check error * fix pipelined reads * clean up after test --- pkg/rtc/room.go | 4 + pkg/service/egress.go | 2 +- pkg/service/interfaces.go | 6 +- pkg/service/localstore.go | 30 ++- pkg/service/redisstore.go | 83 +++---- pkg/service/redisstore_test.go | 28 +++ pkg/service/roomallocator.go | 14 +- pkg/service/roomallocator_test.go | 2 +- pkg/service/roommanager.go | 11 +- pkg/service/roomservice.go | 6 +- pkg/service/roomservice_test.go | 2 +- pkg/service/rtcservice.go | 2 +- pkg/service/servicefakes/fake_object_store.go | 222 +++--------------- .../servicefakes/fake_service_store.go | 126 ++-------- pkg/service/turn.go | 2 +- 15 files changed, 168 insertions(+), 372 deletions(-) diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index d6dd88fba..b9334caed 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -194,6 +194,10 @@ func (r *Room) LastLeftAt() int64 { return r.leftAt.Load() } +func (r *Room) Internal() *livekit.RoomInternal { + return r.internal +} + func (r *Room) Hold() bool { r.lock.Lock() defer r.lock.Unlock() diff --git a/pkg/service/egress.go b/pkg/service/egress.go index 7249b425d..d88e83bc9 100644 --- a/pkg/service/egress.go +++ b/pkg/service/egress.go @@ -110,7 +110,7 @@ func (s *EgressService) StartEgress(ctx context.Context, roomName livekit.RoomNa return nil, ErrEgressNotConnected } - room, err := s.store.LoadRoom(ctx, roomName) + room, _, err := s.store.LoadRoom(ctx, roomName, false) if err != nil { return nil, err } diff --git a/pkg/service/interfaces.go b/pkg/service/interfaces.go index d31e2d492..88e051770 100644 --- a/pkg/service/interfaces.go +++ b/pkg/service/interfaces.go @@ -20,8 +20,7 @@ type ObjectStore interface { LockRoom(ctx context.Context, roomName livekit.RoomName, duration time.Duration) (string, error) UnlockRoom(ctx context.Context, roomName livekit.RoomName, uid string) error - StoreRoom(ctx context.Context, room *livekit.Room) error - StoreRoomInternal(ctx context.Context, roomName livekit.RoomName, internal *livekit.RoomInternal) error + StoreRoom(ctx context.Context, room *livekit.Room, internal *livekit.RoomInternal) error DeleteRoom(ctx context.Context, roomName livekit.RoomName) error StoreParticipant(ctx context.Context, roomName livekit.RoomName, participant *livekit.ParticipantInfo) error @@ -30,8 +29,7 @@ type ObjectStore interface { //counterfeiter:generate . ServiceStore type ServiceStore interface { - LoadRoom(ctx context.Context, roomName livekit.RoomName) (*livekit.Room, error) - LoadRoomInternal(ctx context.Context, roomName livekit.RoomName) (*livekit.RoomInternal, error) + LoadRoom(ctx context.Context, roomName livekit.RoomName, includeInternal bool) (*livekit.Room, *livekit.RoomInternal, error) // ListRooms returns currently active rooms. if names is not nil, it'll filter and return // only rooms that match diff --git a/pkg/service/localstore.go b/pkg/service/localstore.go index b980ed3d9..53518022b 100644 --- a/pkg/service/localstore.go +++ b/pkg/service/localstore.go @@ -25,44 +25,41 @@ type LocalStore struct { func NewLocalStore() *LocalStore { return &LocalStore{ rooms: make(map[livekit.RoomName]*livekit.Room), + roomInternal: make(map[livekit.RoomName]*livekit.RoomInternal), participants: make(map[livekit.RoomName]map[livekit.ParticipantIdentity]*livekit.ParticipantInfo), lock: sync.RWMutex{}, } } -func (s *LocalStore) StoreRoom(_ context.Context, room *livekit.Room) error { +func (s *LocalStore) StoreRoom(_ context.Context, room *livekit.Room, internal *livekit.RoomInternal) error { if room.CreationTime == 0 { room.CreationTime = time.Now().Unix() } - s.lock.Lock() - s.rooms[livekit.RoomName(room.Name)] = room - s.lock.Unlock() - return nil -} + roomName := livekit.RoomName(room.Name) -func (s *LocalStore) StoreRoomInternal(_ context.Context, roomName livekit.RoomName, internal *livekit.RoomInternal) error { s.lock.Lock() + s.rooms[roomName] = room s.roomInternal[roomName] = internal s.lock.Unlock() + return nil } -func (s *LocalStore) LoadRoom(_ context.Context, roomName livekit.RoomName) (*livekit.Room, error) { +func (s *LocalStore) LoadRoom(_ context.Context, roomName livekit.RoomName, includeInternal bool) (*livekit.Room, *livekit.RoomInternal, error) { s.lock.RLock() defer s.lock.RUnlock() room := s.rooms[roomName] if room == nil { - return nil, ErrRoomNotFound + return nil, nil, ErrRoomNotFound } - return room, nil -} -func (s *LocalStore) LoadRoomInternal(_ context.Context, roomName livekit.RoomName) (*livekit.RoomInternal, error) { - s.lock.RLock() - defer s.lock.RUnlock() + var internal *livekit.RoomInternal + if includeInternal { + internal = s.roomInternal[roomName] + } - return s.roomInternal[roomName], nil + return room, internal, nil } func (s *LocalStore) ListRooms(_ context.Context, roomNames []livekit.RoomName) ([]*livekit.Room, error) { @@ -78,7 +75,7 @@ func (s *LocalStore) ListRooms(_ context.Context, roomNames []livekit.RoomName) } func (s *LocalStore) DeleteRoom(ctx context.Context, roomName livekit.RoomName) error { - room, err := s.LoadRoom(ctx, roomName) + room, _, err := s.LoadRoom(ctx, roomName, false) if err == ErrRoomNotFound { return nil } else if err != nil { @@ -90,6 +87,7 @@ func (s *LocalStore) DeleteRoom(ctx context.Context, roomName livekit.RoomName) delete(s.participants, livekit.RoomName(room.Name)) delete(s.rooms, livekit.RoomName(room.Name)) + delete(s.roomInternal, livekit.RoomName(room.Name)) return nil } diff --git a/pkg/service/redisstore.go b/pkg/service/redisstore.go index 121272966..1db9bbb68 100644 --- a/pkg/service/redisstore.go +++ b/pkg/service/redisstore.go @@ -97,18 +97,29 @@ func (s *RedisStore) Stop() { } } -func (s *RedisStore) StoreRoom(_ context.Context, room *livekit.Room) error { +func (s *RedisStore) StoreRoom(_ context.Context, room *livekit.Room, internal *livekit.RoomInternal) error { if room.CreationTime == 0 { room.CreationTime = time.Now().Unix() } - data, err := proto.Marshal(room) + roomData, err := proto.Marshal(room) if err != nil { return err } pp := s.rc.Pipeline() - pp.HSet(s.ctx, RoomsKey, room.Name, data) + pp.HSet(s.ctx, RoomsKey, room.Name, roomData) + + var internalData []byte + if internal != nil { + internalData, err = proto.Marshal(internal) + if err != nil { + return err + } + pp.HSet(s.ctx, RoomInternalKey, room.Name, internalData) + } else { + pp.HDel(s.ctx, RoomInternalKey, room.Name) + } if _, err = pp.Exec(s.ctx); err != nil { return errors.Wrap(err, "could not create room") @@ -116,55 +127,45 @@ func (s *RedisStore) StoreRoom(_ context.Context, room *livekit.Room) error { return nil } -func (s *RedisStore) StoreRoomInternal(_ context.Context, roomName livekit.RoomName, internal *livekit.RoomInternal) error { - data, err := proto.Marshal(internal) - if err != nil { - return err - } - +func (s *RedisStore) LoadRoom(_ context.Context, roomName livekit.RoomName, includeInternal bool) (*livekit.Room, *livekit.RoomInternal, error) { pp := s.rc.Pipeline() - pp.HSet(s.ctx, RoomInternalKey, roomName, data) - - if _, err = pp.Exec(s.ctx); err != nil { - return errors.Wrap(err, "could not create room") + pp.HGet(s.ctx, RoomsKey, string(roomName)) + if includeInternal { + pp.HGet(s.ctx, RoomInternalKey, string(roomName)) } - return nil -} -func (s *RedisStore) LoadRoom(_ context.Context, roomName livekit.RoomName) (*livekit.Room, error) { - data, err := s.rc.HGet(s.ctx, RoomsKey, string(roomName)).Result() + res, err := pp.Exec(s.ctx) + if err != nil && err != redis.Nil { + // if the room exists but internal does not, the pipeline will still return redis.Nil + return nil, nil, err + } + + room := &livekit.Room{} + roomData, err := res[0].(*redis.StringCmd).Result() if err != nil { if err == redis.Nil { err = ErrRoomNotFound } - return nil, err + return nil, nil, err + } + if err = proto.Unmarshal([]byte(roomData), room); err != nil { + return nil, nil, err } - room := livekit.Room{} - err = proto.Unmarshal([]byte(data), &room) - if err != nil { - return nil, err - } - - return &room, nil -} - -func (s *RedisStore) LoadRoomInternal(_ context.Context, roomName livekit.RoomName) (*livekit.RoomInternal, error) { - data, err := s.rc.HGet(s.ctx, RoomInternalKey, string(roomName)).Result() - if err != nil { - if err == redis.Nil { - return nil, nil + var internal *livekit.RoomInternal + if includeInternal { + internalData, err := res[1].(*redis.StringCmd).Result() + if err == nil { + internal = &livekit.RoomInternal{} + if err = proto.Unmarshal([]byte(internalData), internal); err != nil { + return nil, nil, err + } + } else if err != redis.Nil { + return nil, nil, err } - return nil, err } - internal := &livekit.RoomInternal{} - err = proto.Unmarshal([]byte(data), internal) - if err != nil { - return nil, err - } - - return internal, nil + return room, internal, nil } func (s *RedisStore) ListRooms(_ context.Context, roomNames []livekit.RoomName) ([]*livekit.Room, error) { @@ -203,7 +204,7 @@ func (s *RedisStore) ListRooms(_ context.Context, roomNames []livekit.RoomName) } func (s *RedisStore) DeleteRoom(ctx context.Context, roomName livekit.RoomName) error { - _, err := s.LoadRoom(ctx, roomName) + _, _, err := s.LoadRoom(ctx, roomName, false) if err == ErrRoomNotFound { return nil } diff --git a/pkg/service/redisstore_test.go b/pkg/service/redisstore_test.go index 72d3d831c..07fae901b 100644 --- a/pkg/service/redisstore_test.go +++ b/pkg/service/redisstore_test.go @@ -17,6 +17,34 @@ import ( "github.com/livekit/livekit-server/pkg/service" ) +func TestRoomInternal(t *testing.T) { + ctx := context.Background() + rs := service.NewRedisStore(redisClient()) + + room := &livekit.Room{ + Sid: "123", + Name: "test_room", + } + internal := &livekit.RoomInternal{ + TrackEgress: &livekit.AutoTrackEgress{FilePrefix: "egress"}, + } + + require.NoError(t, rs.StoreRoom(ctx, room, internal)) + actualRoom, actualInternal, err := rs.LoadRoom(ctx, livekit.RoomName(room.Name), true) + require.NoError(t, err) + require.Equal(t, room.Sid, actualRoom.Sid) + require.Equal(t, internal.TrackEgress.FilePrefix, actualInternal.TrackEgress.FilePrefix) + + // remove internal + require.NoError(t, rs.StoreRoom(ctx, room, nil)) + _, actualInternal, err = rs.LoadRoom(ctx, livekit.RoomName(room.Name), true) + require.NoError(t, err) + require.Nil(t, actualInternal) + + // clean up + require.NoError(t, rs.DeleteRoom(ctx, "test_room")) +} + func TestParticipantPersistence(t *testing.T) { ctx := context.Background() rs := service.NewRedisStore(redisClient()) diff --git a/pkg/service/roomallocator.go b/pkg/service/roomallocator.go index 3f5c0207e..cde4b42ef 100644 --- a/pkg/service/roomallocator.go +++ b/pkg/service/roomallocator.go @@ -46,7 +46,7 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre }() // find existing room and update it - rm, err := r.roomStore.LoadRoom(ctx, livekit.RoomName(req.Name)) + rm, internal, err := r.roomStore.LoadRoom(ctx, livekit.RoomName(req.Name), true) if err == ErrRoomNotFound { rm = &livekit.Room{ Sid: utils.NewGuid(utils.RoomPrefix), @@ -68,16 +68,12 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre if req.Metadata != "" { rm.Metadata = req.Metadata } - - if err = r.roomStore.StoreRoom(ctx, rm); err != nil { - return nil, err + if req.Egress != nil && req.Egress.Tracks != nil { + internal = &livekit.RoomInternal{TrackEgress: req.Egress.Tracks} } - if req.Egress != nil && req.Egress.Tracks != nil { - internal := &livekit.RoomInternal{TrackEgress: req.Egress.Tracks} - if err = r.roomStore.StoreRoomInternal(ctx, livekit.RoomName(req.Name), internal); err != nil { - return nil, err - } + if err = r.roomStore.StoreRoom(ctx, rm, internal); err != nil { + return nil, err } // check if room already assigned diff --git a/pkg/service/roomallocator_test.go b/pkg/service/roomallocator_test.go index 952f41972..66b0eab1d 100644 --- a/pkg/service/roomallocator_test.go +++ b/pkg/service/roomallocator_test.go @@ -66,7 +66,7 @@ func TestCreateRoom(t *testing.T) { func newTestRoomAllocator(t *testing.T, conf *config.Config, node *livekit.Node) (service.RoomAllocator, *config.Config) { store := &servicefakes.FakeObjectStore{} - store.LoadRoomReturns(nil, service.ErrRoomNotFound) + store.LoadRoomReturns(nil, nil, service.ErrRoomNotFound) router := &routingfakes.FakeRouter{} router.GetNodeForRoomReturns(node, nil) diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 9751ab998..386c9cffa 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -317,7 +317,7 @@ func (r *RoomManager) StartSession( updateParticipantCount := func(proto *livekit.Room) { if !participant.Hidden() { - err = r.roomStore.StoreRoom(ctx, proto) + err = r.roomStore.StoreRoom(ctx, proto, room.Internal()) if err != nil { logger.Errorw("could not store room", err) } @@ -371,12 +371,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room } // create new room, get details first - ri, err := r.roomStore.LoadRoom(ctx, roomName) - if err != nil { - return nil, err - } - - internal, err := r.roomStore.LoadRoomInternal(ctx, roomName) + ri, internal, err := r.roomStore.LoadRoom(ctx, roomName, true) if err != nil { return nil, err } @@ -410,7 +405,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room }) newRoom.OnMetadataUpdate(func(metadata string) { - if err := r.roomStore.StoreRoom(ctx, newRoom.ToProto()); err != nil { + if err := r.roomStore.StoreRoom(ctx, newRoom.ToProto(), newRoom.Internal()); err != nil { newRoom.Logger.Errorw("could not handle metadata update", err) } }) diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index 462b4f81d..43e2b39b2 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -115,7 +115,7 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq // we should not return until when the room is confirmed deleted err = confirmExecution(func() error { - _, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Room)) + _, _, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Room), false) if err == nil { return ErrOperationFailed } else if err != ErrRoomNotFound { @@ -308,7 +308,7 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat return nil, twirpAuthError(err) } - room, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Room)) + room, _, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Room), false) if err != nil { return nil, err } @@ -333,7 +333,7 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat } err = confirmExecution(func() error { - room, err = s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Room)) + room, _, err = s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Room), false) if err != nil { return err } diff --git a/pkg/service/roomservice_test.go b/pkg/service/roomservice_test.go index 946a8cf29..0eb6bded7 100644 --- a/pkg/service/roomservice_test.go +++ b/pkg/service/roomservice_test.go @@ -25,7 +25,7 @@ func TestDeleteRoom(t *testing.T) { }, } ctx := service.WithGrants(context.Background(), grant) - svc.store.LoadRoomReturns(nil, service.ErrRoomNotFound) + svc.store.LoadRoomReturns(nil, nil, service.ErrRoomNotFound) _, err := svc.DeleteRoom(ctx, &livekit.DeleteRoomRequest{ Room: "testroom", }) diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index 6c5df7188..4a1e41753 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -171,7 +171,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { // when auto create is disabled, we'll check to ensure it's already created if !s.config.Room.AutoCreate { - _, err := s.store.LoadRoom(context.Background(), roomName) + _, _, err := s.store.LoadRoom(context.Background(), roomName, false) if err == ErrRoomNotFound { handleError(w, 404, err, loggerFields...) return diff --git a/pkg/service/servicefakes/fake_object_store.go b/pkg/service/servicefakes/fake_object_store.go index 9574b2c3c..1c5259bf9 100644 --- a/pkg/service/servicefakes/fake_object_store.go +++ b/pkg/service/servicefakes/fake_object_store.go @@ -79,33 +79,22 @@ type FakeObjectStore struct { result1 *livekit.ParticipantInfo result2 error } - LoadRoomStub func(context.Context, livekit.RoomName) (*livekit.Room, error) + LoadRoomStub func(context.Context, livekit.RoomName, bool) (*livekit.Room, *livekit.RoomInternal, error) loadRoomMutex sync.RWMutex loadRoomArgsForCall []struct { arg1 context.Context arg2 livekit.RoomName + arg3 bool } loadRoomReturns struct { result1 *livekit.Room - result2 error + result2 *livekit.RoomInternal + result3 error } loadRoomReturnsOnCall map[int]struct { result1 *livekit.Room - result2 error - } - LoadRoomInternalStub func(context.Context, livekit.RoomName) (*livekit.RoomInternal, error) - loadRoomInternalMutex sync.RWMutex - loadRoomInternalArgsForCall []struct { - arg1 context.Context - arg2 livekit.RoomName - } - loadRoomInternalReturns struct { - result1 *livekit.RoomInternal - result2 error - } - loadRoomInternalReturnsOnCall map[int]struct { - result1 *livekit.RoomInternal - result2 error + result2 *livekit.RoomInternal + result3 error } LockRoomStub func(context.Context, livekit.RoomName, time.Duration) (string, error) lockRoomMutex sync.RWMutex @@ -135,11 +124,12 @@ type FakeObjectStore struct { storeParticipantReturnsOnCall map[int]struct { result1 error } - StoreRoomStub func(context.Context, *livekit.Room) error + StoreRoomStub func(context.Context, *livekit.Room, *livekit.RoomInternal) error storeRoomMutex sync.RWMutex storeRoomArgsForCall []struct { arg1 context.Context arg2 *livekit.Room + arg3 *livekit.RoomInternal } storeRoomReturns struct { result1 error @@ -147,19 +137,6 @@ type FakeObjectStore struct { storeRoomReturnsOnCall map[int]struct { result1 error } - StoreRoomInternalStub func(context.Context, livekit.RoomName, *livekit.RoomInternal) error - storeRoomInternalMutex sync.RWMutex - storeRoomInternalArgsForCall []struct { - arg1 context.Context - arg2 livekit.RoomName - arg3 *livekit.RoomInternal - } - storeRoomInternalReturns struct { - result1 error - } - storeRoomInternalReturnsOnCall map[int]struct { - result1 error - } UnlockRoomStub func(context.Context, livekit.RoomName, string) error unlockRoomMutex sync.RWMutex unlockRoomArgsForCall []struct { @@ -503,24 +480,25 @@ func (fake *FakeObjectStore) LoadParticipantReturnsOnCall(i int, result1 *liveki }{result1, result2} } -func (fake *FakeObjectStore) LoadRoom(arg1 context.Context, arg2 livekit.RoomName) (*livekit.Room, error) { +func (fake *FakeObjectStore) LoadRoom(arg1 context.Context, arg2 livekit.RoomName, arg3 bool) (*livekit.Room, *livekit.RoomInternal, error) { fake.loadRoomMutex.Lock() ret, specificReturn := fake.loadRoomReturnsOnCall[len(fake.loadRoomArgsForCall)] fake.loadRoomArgsForCall = append(fake.loadRoomArgsForCall, struct { arg1 context.Context arg2 livekit.RoomName - }{arg1, arg2}) + arg3 bool + }{arg1, arg2, arg3}) stub := fake.LoadRoomStub fakeReturns := fake.loadRoomReturns - fake.recordInvocation("LoadRoom", []interface{}{arg1, arg2}) + fake.recordInvocation("LoadRoom", []interface{}{arg1, arg2, arg3}) fake.loadRoomMutex.Unlock() if stub != nil { - return stub(arg1, arg2) + return stub(arg1, arg2, arg3) } if specificReturn { - return ret.result1, ret.result2 + return ret.result1, ret.result2, ret.result3 } - return fakeReturns.result1, fakeReturns.result2 + return fakeReturns.result1, fakeReturns.result2, fakeReturns.result3 } func (fake *FakeObjectStore) LoadRoomCallCount() int { @@ -529,108 +507,46 @@ func (fake *FakeObjectStore) LoadRoomCallCount() int { return len(fake.loadRoomArgsForCall) } -func (fake *FakeObjectStore) LoadRoomCalls(stub func(context.Context, livekit.RoomName) (*livekit.Room, error)) { +func (fake *FakeObjectStore) LoadRoomCalls(stub func(context.Context, livekit.RoomName, bool) (*livekit.Room, *livekit.RoomInternal, error)) { fake.loadRoomMutex.Lock() defer fake.loadRoomMutex.Unlock() fake.LoadRoomStub = stub } -func (fake *FakeObjectStore) LoadRoomArgsForCall(i int) (context.Context, livekit.RoomName) { +func (fake *FakeObjectStore) LoadRoomArgsForCall(i int) (context.Context, livekit.RoomName, bool) { fake.loadRoomMutex.RLock() defer fake.loadRoomMutex.RUnlock() argsForCall := fake.loadRoomArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } -func (fake *FakeObjectStore) LoadRoomReturns(result1 *livekit.Room, result2 error) { +func (fake *FakeObjectStore) LoadRoomReturns(result1 *livekit.Room, result2 *livekit.RoomInternal, result3 error) { fake.loadRoomMutex.Lock() defer fake.loadRoomMutex.Unlock() fake.LoadRoomStub = nil fake.loadRoomReturns = struct { result1 *livekit.Room - result2 error - }{result1, result2} + result2 *livekit.RoomInternal + result3 error + }{result1, result2, result3} } -func (fake *FakeObjectStore) LoadRoomReturnsOnCall(i int, result1 *livekit.Room, result2 error) { +func (fake *FakeObjectStore) LoadRoomReturnsOnCall(i int, result1 *livekit.Room, result2 *livekit.RoomInternal, result3 error) { fake.loadRoomMutex.Lock() defer fake.loadRoomMutex.Unlock() fake.LoadRoomStub = nil if fake.loadRoomReturnsOnCall == nil { fake.loadRoomReturnsOnCall = make(map[int]struct { result1 *livekit.Room - result2 error + result2 *livekit.RoomInternal + result3 error }) } fake.loadRoomReturnsOnCall[i] = struct { result1 *livekit.Room - result2 error - }{result1, result2} -} - -func (fake *FakeObjectStore) LoadRoomInternal(arg1 context.Context, arg2 livekit.RoomName) (*livekit.RoomInternal, error) { - fake.loadRoomInternalMutex.Lock() - ret, specificReturn := fake.loadRoomInternalReturnsOnCall[len(fake.loadRoomInternalArgsForCall)] - fake.loadRoomInternalArgsForCall = append(fake.loadRoomInternalArgsForCall, struct { - arg1 context.Context - arg2 livekit.RoomName - }{arg1, arg2}) - stub := fake.LoadRoomInternalStub - fakeReturns := fake.loadRoomInternalReturns - fake.recordInvocation("LoadRoomInternal", []interface{}{arg1, arg2}) - fake.loadRoomInternalMutex.Unlock() - if stub != nil { - return stub(arg1, arg2) - } - if specificReturn { - return ret.result1, ret.result2 - } - return fakeReturns.result1, fakeReturns.result2 -} - -func (fake *FakeObjectStore) LoadRoomInternalCallCount() int { - fake.loadRoomInternalMutex.RLock() - defer fake.loadRoomInternalMutex.RUnlock() - return len(fake.loadRoomInternalArgsForCall) -} - -func (fake *FakeObjectStore) LoadRoomInternalCalls(stub func(context.Context, livekit.RoomName) (*livekit.RoomInternal, error)) { - fake.loadRoomInternalMutex.Lock() - defer fake.loadRoomInternalMutex.Unlock() - fake.LoadRoomInternalStub = stub -} - -func (fake *FakeObjectStore) LoadRoomInternalArgsForCall(i int) (context.Context, livekit.RoomName) { - fake.loadRoomInternalMutex.RLock() - defer fake.loadRoomInternalMutex.RUnlock() - argsForCall := fake.loadRoomInternalArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 -} - -func (fake *FakeObjectStore) LoadRoomInternalReturns(result1 *livekit.RoomInternal, result2 error) { - fake.loadRoomInternalMutex.Lock() - defer fake.loadRoomInternalMutex.Unlock() - fake.LoadRoomInternalStub = nil - fake.loadRoomInternalReturns = struct { - result1 *livekit.RoomInternal - result2 error - }{result1, result2} -} - -func (fake *FakeObjectStore) LoadRoomInternalReturnsOnCall(i int, result1 *livekit.RoomInternal, result2 error) { - fake.loadRoomInternalMutex.Lock() - defer fake.loadRoomInternalMutex.Unlock() - fake.LoadRoomInternalStub = nil - if fake.loadRoomInternalReturnsOnCall == nil { - fake.loadRoomInternalReturnsOnCall = make(map[int]struct { - result1 *livekit.RoomInternal - result2 error - }) - } - fake.loadRoomInternalReturnsOnCall[i] = struct { - result1 *livekit.RoomInternal - result2 error - }{result1, result2} + result2 *livekit.RoomInternal + result3 error + }{result1, result2, result3} } func (fake *FakeObjectStore) LockRoom(arg1 context.Context, arg2 livekit.RoomName, arg3 time.Duration) (string, error) { @@ -762,19 +678,20 @@ func (fake *FakeObjectStore) StoreParticipantReturnsOnCall(i int, result1 error) }{result1} } -func (fake *FakeObjectStore) StoreRoom(arg1 context.Context, arg2 *livekit.Room) error { +func (fake *FakeObjectStore) StoreRoom(arg1 context.Context, arg2 *livekit.Room, arg3 *livekit.RoomInternal) error { fake.storeRoomMutex.Lock() ret, specificReturn := fake.storeRoomReturnsOnCall[len(fake.storeRoomArgsForCall)] fake.storeRoomArgsForCall = append(fake.storeRoomArgsForCall, struct { arg1 context.Context arg2 *livekit.Room - }{arg1, arg2}) + arg3 *livekit.RoomInternal + }{arg1, arg2, arg3}) stub := fake.StoreRoomStub fakeReturns := fake.storeRoomReturns - fake.recordInvocation("StoreRoom", []interface{}{arg1, arg2}) + fake.recordInvocation("StoreRoom", []interface{}{arg1, arg2, arg3}) fake.storeRoomMutex.Unlock() if stub != nil { - return stub(arg1, arg2) + return stub(arg1, arg2, arg3) } if specificReturn { return ret.result1 @@ -788,17 +705,17 @@ func (fake *FakeObjectStore) StoreRoomCallCount() int { return len(fake.storeRoomArgsForCall) } -func (fake *FakeObjectStore) StoreRoomCalls(stub func(context.Context, *livekit.Room) error) { +func (fake *FakeObjectStore) StoreRoomCalls(stub func(context.Context, *livekit.Room, *livekit.RoomInternal) error) { fake.storeRoomMutex.Lock() defer fake.storeRoomMutex.Unlock() fake.StoreRoomStub = stub } -func (fake *FakeObjectStore) StoreRoomArgsForCall(i int) (context.Context, *livekit.Room) { +func (fake *FakeObjectStore) StoreRoomArgsForCall(i int) (context.Context, *livekit.Room, *livekit.RoomInternal) { fake.storeRoomMutex.RLock() defer fake.storeRoomMutex.RUnlock() argsForCall := fake.storeRoomArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } func (fake *FakeObjectStore) StoreRoomReturns(result1 error) { @@ -824,69 +741,6 @@ func (fake *FakeObjectStore) StoreRoomReturnsOnCall(i int, result1 error) { }{result1} } -func (fake *FakeObjectStore) StoreRoomInternal(arg1 context.Context, arg2 livekit.RoomName, arg3 *livekit.RoomInternal) error { - fake.storeRoomInternalMutex.Lock() - ret, specificReturn := fake.storeRoomInternalReturnsOnCall[len(fake.storeRoomInternalArgsForCall)] - fake.storeRoomInternalArgsForCall = append(fake.storeRoomInternalArgsForCall, struct { - arg1 context.Context - arg2 livekit.RoomName - arg3 *livekit.RoomInternal - }{arg1, arg2, arg3}) - stub := fake.StoreRoomInternalStub - fakeReturns := fake.storeRoomInternalReturns - fake.recordInvocation("StoreRoomInternal", []interface{}{arg1, arg2, arg3}) - fake.storeRoomInternalMutex.Unlock() - if stub != nil { - return stub(arg1, arg2, arg3) - } - if specificReturn { - return ret.result1 - } - return fakeReturns.result1 -} - -func (fake *FakeObjectStore) StoreRoomInternalCallCount() int { - fake.storeRoomInternalMutex.RLock() - defer fake.storeRoomInternalMutex.RUnlock() - return len(fake.storeRoomInternalArgsForCall) -} - -func (fake *FakeObjectStore) StoreRoomInternalCalls(stub func(context.Context, livekit.RoomName, *livekit.RoomInternal) error) { - fake.storeRoomInternalMutex.Lock() - defer fake.storeRoomInternalMutex.Unlock() - fake.StoreRoomInternalStub = stub -} - -func (fake *FakeObjectStore) StoreRoomInternalArgsForCall(i int) (context.Context, livekit.RoomName, *livekit.RoomInternal) { - fake.storeRoomInternalMutex.RLock() - defer fake.storeRoomInternalMutex.RUnlock() - argsForCall := fake.storeRoomInternalArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 -} - -func (fake *FakeObjectStore) StoreRoomInternalReturns(result1 error) { - fake.storeRoomInternalMutex.Lock() - defer fake.storeRoomInternalMutex.Unlock() - fake.StoreRoomInternalStub = nil - fake.storeRoomInternalReturns = struct { - result1 error - }{result1} -} - -func (fake *FakeObjectStore) StoreRoomInternalReturnsOnCall(i int, result1 error) { - fake.storeRoomInternalMutex.Lock() - defer fake.storeRoomInternalMutex.Unlock() - fake.StoreRoomInternalStub = nil - if fake.storeRoomInternalReturnsOnCall == nil { - fake.storeRoomInternalReturnsOnCall = make(map[int]struct { - result1 error - }) - } - fake.storeRoomInternalReturnsOnCall[i] = struct { - result1 error - }{result1} -} - func (fake *FakeObjectStore) UnlockRoom(arg1 context.Context, arg2 livekit.RoomName, arg3 string) error { fake.unlockRoomMutex.Lock() ret, specificReturn := fake.unlockRoomReturnsOnCall[len(fake.unlockRoomArgsForCall)] @@ -965,16 +819,12 @@ func (fake *FakeObjectStore) Invocations() map[string][][]interface{} { defer fake.loadParticipantMutex.RUnlock() fake.loadRoomMutex.RLock() defer fake.loadRoomMutex.RUnlock() - fake.loadRoomInternalMutex.RLock() - defer fake.loadRoomInternalMutex.RUnlock() fake.lockRoomMutex.RLock() defer fake.lockRoomMutex.RUnlock() fake.storeParticipantMutex.RLock() defer fake.storeParticipantMutex.RUnlock() fake.storeRoomMutex.RLock() defer fake.storeRoomMutex.RUnlock() - fake.storeRoomInternalMutex.RLock() - defer fake.storeRoomInternalMutex.RUnlock() fake.unlockRoomMutex.RLock() defer fake.unlockRoomMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} diff --git a/pkg/service/servicefakes/fake_service_store.go b/pkg/service/servicefakes/fake_service_store.go index b37b493d2..0ac442161 100644 --- a/pkg/service/servicefakes/fake_service_store.go +++ b/pkg/service/servicefakes/fake_service_store.go @@ -53,33 +53,22 @@ type FakeServiceStore struct { result1 *livekit.ParticipantInfo result2 error } - LoadRoomStub func(context.Context, livekit.RoomName) (*livekit.Room, error) + LoadRoomStub func(context.Context, livekit.RoomName, bool) (*livekit.Room, *livekit.RoomInternal, error) loadRoomMutex sync.RWMutex loadRoomArgsForCall []struct { arg1 context.Context arg2 livekit.RoomName + arg3 bool } loadRoomReturns struct { result1 *livekit.Room - result2 error + result2 *livekit.RoomInternal + result3 error } loadRoomReturnsOnCall map[int]struct { result1 *livekit.Room - result2 error - } - LoadRoomInternalStub func(context.Context, livekit.RoomName) (*livekit.RoomInternal, error) - loadRoomInternalMutex sync.RWMutex - loadRoomInternalArgsForCall []struct { - arg1 context.Context - arg2 livekit.RoomName - } - loadRoomInternalReturns struct { - result1 *livekit.RoomInternal - result2 error - } - loadRoomInternalReturnsOnCall map[int]struct { - result1 *livekit.RoomInternal - result2 error + result2 *livekit.RoomInternal + result3 error } invocations map[string][][]interface{} invocationsMutex sync.RWMutex @@ -286,24 +275,25 @@ func (fake *FakeServiceStore) LoadParticipantReturnsOnCall(i int, result1 *livek }{result1, result2} } -func (fake *FakeServiceStore) LoadRoom(arg1 context.Context, arg2 livekit.RoomName) (*livekit.Room, error) { +func (fake *FakeServiceStore) LoadRoom(arg1 context.Context, arg2 livekit.RoomName, arg3 bool) (*livekit.Room, *livekit.RoomInternal, error) { fake.loadRoomMutex.Lock() ret, specificReturn := fake.loadRoomReturnsOnCall[len(fake.loadRoomArgsForCall)] fake.loadRoomArgsForCall = append(fake.loadRoomArgsForCall, struct { arg1 context.Context arg2 livekit.RoomName - }{arg1, arg2}) + arg3 bool + }{arg1, arg2, arg3}) stub := fake.LoadRoomStub fakeReturns := fake.loadRoomReturns - fake.recordInvocation("LoadRoom", []interface{}{arg1, arg2}) + fake.recordInvocation("LoadRoom", []interface{}{arg1, arg2, arg3}) fake.loadRoomMutex.Unlock() if stub != nil { - return stub(arg1, arg2) + return stub(arg1, arg2, arg3) } if specificReturn { - return ret.result1, ret.result2 + return ret.result1, ret.result2, ret.result3 } - return fakeReturns.result1, fakeReturns.result2 + return fakeReturns.result1, fakeReturns.result2, fakeReturns.result3 } func (fake *FakeServiceStore) LoadRoomCallCount() int { @@ -312,108 +302,46 @@ func (fake *FakeServiceStore) LoadRoomCallCount() int { return len(fake.loadRoomArgsForCall) } -func (fake *FakeServiceStore) LoadRoomCalls(stub func(context.Context, livekit.RoomName) (*livekit.Room, error)) { +func (fake *FakeServiceStore) LoadRoomCalls(stub func(context.Context, livekit.RoomName, bool) (*livekit.Room, *livekit.RoomInternal, error)) { fake.loadRoomMutex.Lock() defer fake.loadRoomMutex.Unlock() fake.LoadRoomStub = stub } -func (fake *FakeServiceStore) LoadRoomArgsForCall(i int) (context.Context, livekit.RoomName) { +func (fake *FakeServiceStore) LoadRoomArgsForCall(i int) (context.Context, livekit.RoomName, bool) { fake.loadRoomMutex.RLock() defer fake.loadRoomMutex.RUnlock() argsForCall := fake.loadRoomArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } -func (fake *FakeServiceStore) LoadRoomReturns(result1 *livekit.Room, result2 error) { +func (fake *FakeServiceStore) LoadRoomReturns(result1 *livekit.Room, result2 *livekit.RoomInternal, result3 error) { fake.loadRoomMutex.Lock() defer fake.loadRoomMutex.Unlock() fake.LoadRoomStub = nil fake.loadRoomReturns = struct { result1 *livekit.Room - result2 error - }{result1, result2} + result2 *livekit.RoomInternal + result3 error + }{result1, result2, result3} } -func (fake *FakeServiceStore) LoadRoomReturnsOnCall(i int, result1 *livekit.Room, result2 error) { +func (fake *FakeServiceStore) LoadRoomReturnsOnCall(i int, result1 *livekit.Room, result2 *livekit.RoomInternal, result3 error) { fake.loadRoomMutex.Lock() defer fake.loadRoomMutex.Unlock() fake.LoadRoomStub = nil if fake.loadRoomReturnsOnCall == nil { fake.loadRoomReturnsOnCall = make(map[int]struct { result1 *livekit.Room - result2 error + result2 *livekit.RoomInternal + result3 error }) } fake.loadRoomReturnsOnCall[i] = struct { result1 *livekit.Room - result2 error - }{result1, result2} -} - -func (fake *FakeServiceStore) LoadRoomInternal(arg1 context.Context, arg2 livekit.RoomName) (*livekit.RoomInternal, error) { - fake.loadRoomInternalMutex.Lock() - ret, specificReturn := fake.loadRoomInternalReturnsOnCall[len(fake.loadRoomInternalArgsForCall)] - fake.loadRoomInternalArgsForCall = append(fake.loadRoomInternalArgsForCall, struct { - arg1 context.Context - arg2 livekit.RoomName - }{arg1, arg2}) - stub := fake.LoadRoomInternalStub - fakeReturns := fake.loadRoomInternalReturns - fake.recordInvocation("LoadRoomInternal", []interface{}{arg1, arg2}) - fake.loadRoomInternalMutex.Unlock() - if stub != nil { - return stub(arg1, arg2) - } - if specificReturn { - return ret.result1, ret.result2 - } - return fakeReturns.result1, fakeReturns.result2 -} - -func (fake *FakeServiceStore) LoadRoomInternalCallCount() int { - fake.loadRoomInternalMutex.RLock() - defer fake.loadRoomInternalMutex.RUnlock() - return len(fake.loadRoomInternalArgsForCall) -} - -func (fake *FakeServiceStore) LoadRoomInternalCalls(stub func(context.Context, livekit.RoomName) (*livekit.RoomInternal, error)) { - fake.loadRoomInternalMutex.Lock() - defer fake.loadRoomInternalMutex.Unlock() - fake.LoadRoomInternalStub = stub -} - -func (fake *FakeServiceStore) LoadRoomInternalArgsForCall(i int) (context.Context, livekit.RoomName) { - fake.loadRoomInternalMutex.RLock() - defer fake.loadRoomInternalMutex.RUnlock() - argsForCall := fake.loadRoomInternalArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 -} - -func (fake *FakeServiceStore) LoadRoomInternalReturns(result1 *livekit.RoomInternal, result2 error) { - fake.loadRoomInternalMutex.Lock() - defer fake.loadRoomInternalMutex.Unlock() - fake.LoadRoomInternalStub = nil - fake.loadRoomInternalReturns = struct { - result1 *livekit.RoomInternal - result2 error - }{result1, result2} -} - -func (fake *FakeServiceStore) LoadRoomInternalReturnsOnCall(i int, result1 *livekit.RoomInternal, result2 error) { - fake.loadRoomInternalMutex.Lock() - defer fake.loadRoomInternalMutex.Unlock() - fake.LoadRoomInternalStub = nil - if fake.loadRoomInternalReturnsOnCall == nil { - fake.loadRoomInternalReturnsOnCall = make(map[int]struct { - result1 *livekit.RoomInternal - result2 error - }) - } - fake.loadRoomInternalReturnsOnCall[i] = struct { - result1 *livekit.RoomInternal - result2 error - }{result1, result2} + result2 *livekit.RoomInternal + result3 error + }{result1, result2, result3} } func (fake *FakeServiceStore) Invocations() map[string][][]interface{} { @@ -427,8 +355,6 @@ func (fake *FakeServiceStore) Invocations() map[string][][]interface{} { defer fake.loadParticipantMutex.RUnlock() fake.loadRoomMutex.RLock() defer fake.loadRoomMutex.RUnlock() - fake.loadRoomInternalMutex.RLock() - defer fake.loadRoomInternalMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/pkg/service/turn.go b/pkg/service/turn.go index 26a4efc44..3cdd39072 100644 --- a/pkg/service/turn.go +++ b/pkg/service/turn.go @@ -131,7 +131,7 @@ func NewTurnServer(conf *config.Config, authHandler turn.AuthHandler, standalone func newTurnAuthHandler(roomStore ObjectStore) turn.AuthHandler { return func(username, realm string, srcAddr net.Addr) (key []byte, ok bool) { // room id should be the username, create a hashed room id - rm, err := roomStore.LoadRoom(context.Background(), livekit.RoomName(username)) + rm, _, err := roomStore.LoadRoom(context.Background(), livekit.RoomName(username), false) if err != nil { return nil, false }