consolidate room internal (#1030)

* consolidate room internal

* create room internal map

* pipelined room read

* check error

* fix pipelined reads

* clean up after test
This commit is contained in:
David Colburn
2022-09-22 15:59:27 -07:00
committed by GitHub
parent 20c565ca02
commit b97d59b8db
15 changed files with 168 additions and 372 deletions

View File

@@ -194,6 +194,10 @@ func (r *Room) LastLeftAt() int64 {
return r.leftAt.Load()
}
func (r *Room) Internal() *livekit.RoomInternal {
return r.internal
}
func (r *Room) Hold() bool {
r.lock.Lock()
defer r.lock.Unlock()

View File

@@ -110,7 +110,7 @@ func (s *EgressService) StartEgress(ctx context.Context, roomName livekit.RoomNa
return nil, ErrEgressNotConnected
}
room, err := s.store.LoadRoom(ctx, roomName)
room, _, err := s.store.LoadRoom(ctx, roomName, false)
if err != nil {
return nil, err
}

View File

@@ -20,8 +20,7 @@ type ObjectStore interface {
LockRoom(ctx context.Context, roomName livekit.RoomName, duration time.Duration) (string, error)
UnlockRoom(ctx context.Context, roomName livekit.RoomName, uid string) error
StoreRoom(ctx context.Context, room *livekit.Room) error
StoreRoomInternal(ctx context.Context, roomName livekit.RoomName, internal *livekit.RoomInternal) error
StoreRoom(ctx context.Context, room *livekit.Room, internal *livekit.RoomInternal) error
DeleteRoom(ctx context.Context, roomName livekit.RoomName) error
StoreParticipant(ctx context.Context, roomName livekit.RoomName, participant *livekit.ParticipantInfo) error
@@ -30,8 +29,7 @@ type ObjectStore interface {
//counterfeiter:generate . ServiceStore
type ServiceStore interface {
LoadRoom(ctx context.Context, roomName livekit.RoomName) (*livekit.Room, error)
LoadRoomInternal(ctx context.Context, roomName livekit.RoomName) (*livekit.RoomInternal, error)
LoadRoom(ctx context.Context, roomName livekit.RoomName, includeInternal bool) (*livekit.Room, *livekit.RoomInternal, error)
// ListRooms returns currently active rooms. if names is not nil, it'll filter and return
// only rooms that match

View File

@@ -25,44 +25,41 @@ type LocalStore struct {
func NewLocalStore() *LocalStore {
return &LocalStore{
rooms: make(map[livekit.RoomName]*livekit.Room),
roomInternal: make(map[livekit.RoomName]*livekit.RoomInternal),
participants: make(map[livekit.RoomName]map[livekit.ParticipantIdentity]*livekit.ParticipantInfo),
lock: sync.RWMutex{},
}
}
func (s *LocalStore) StoreRoom(_ context.Context, room *livekit.Room) error {
func (s *LocalStore) StoreRoom(_ context.Context, room *livekit.Room, internal *livekit.RoomInternal) error {
if room.CreationTime == 0 {
room.CreationTime = time.Now().Unix()
}
s.lock.Lock()
s.rooms[livekit.RoomName(room.Name)] = room
s.lock.Unlock()
return nil
}
roomName := livekit.RoomName(room.Name)
func (s *LocalStore) StoreRoomInternal(_ context.Context, roomName livekit.RoomName, internal *livekit.RoomInternal) error {
s.lock.Lock()
s.rooms[roomName] = room
s.roomInternal[roomName] = internal
s.lock.Unlock()
return nil
}
func (s *LocalStore) LoadRoom(_ context.Context, roomName livekit.RoomName) (*livekit.Room, error) {
func (s *LocalStore) LoadRoom(_ context.Context, roomName livekit.RoomName, includeInternal bool) (*livekit.Room, *livekit.RoomInternal, error) {
s.lock.RLock()
defer s.lock.RUnlock()
room := s.rooms[roomName]
if room == nil {
return nil, ErrRoomNotFound
return nil, nil, ErrRoomNotFound
}
return room, nil
}
func (s *LocalStore) LoadRoomInternal(_ context.Context, roomName livekit.RoomName) (*livekit.RoomInternal, error) {
s.lock.RLock()
defer s.lock.RUnlock()
var internal *livekit.RoomInternal
if includeInternal {
internal = s.roomInternal[roomName]
}
return s.roomInternal[roomName], nil
return room, internal, nil
}
func (s *LocalStore) ListRooms(_ context.Context, roomNames []livekit.RoomName) ([]*livekit.Room, error) {
@@ -78,7 +75,7 @@ func (s *LocalStore) ListRooms(_ context.Context, roomNames []livekit.RoomName)
}
func (s *LocalStore) DeleteRoom(ctx context.Context, roomName livekit.RoomName) error {
room, err := s.LoadRoom(ctx, roomName)
room, _, err := s.LoadRoom(ctx, roomName, false)
if err == ErrRoomNotFound {
return nil
} else if err != nil {
@@ -90,6 +87,7 @@ func (s *LocalStore) DeleteRoom(ctx context.Context, roomName livekit.RoomName)
delete(s.participants, livekit.RoomName(room.Name))
delete(s.rooms, livekit.RoomName(room.Name))
delete(s.roomInternal, livekit.RoomName(room.Name))
return nil
}

View File

@@ -97,18 +97,29 @@ func (s *RedisStore) Stop() {
}
}
func (s *RedisStore) StoreRoom(_ context.Context, room *livekit.Room) error {
func (s *RedisStore) StoreRoom(_ context.Context, room *livekit.Room, internal *livekit.RoomInternal) error {
if room.CreationTime == 0 {
room.CreationTime = time.Now().Unix()
}
data, err := proto.Marshal(room)
roomData, err := proto.Marshal(room)
if err != nil {
return err
}
pp := s.rc.Pipeline()
pp.HSet(s.ctx, RoomsKey, room.Name, data)
pp.HSet(s.ctx, RoomsKey, room.Name, roomData)
var internalData []byte
if internal != nil {
internalData, err = proto.Marshal(internal)
if err != nil {
return err
}
pp.HSet(s.ctx, RoomInternalKey, room.Name, internalData)
} else {
pp.HDel(s.ctx, RoomInternalKey, room.Name)
}
if _, err = pp.Exec(s.ctx); err != nil {
return errors.Wrap(err, "could not create room")
@@ -116,55 +127,45 @@ func (s *RedisStore) StoreRoom(_ context.Context, room *livekit.Room) error {
return nil
}
func (s *RedisStore) StoreRoomInternal(_ context.Context, roomName livekit.RoomName, internal *livekit.RoomInternal) error {
data, err := proto.Marshal(internal)
if err != nil {
return err
}
func (s *RedisStore) LoadRoom(_ context.Context, roomName livekit.RoomName, includeInternal bool) (*livekit.Room, *livekit.RoomInternal, error) {
pp := s.rc.Pipeline()
pp.HSet(s.ctx, RoomInternalKey, roomName, data)
if _, err = pp.Exec(s.ctx); err != nil {
return errors.Wrap(err, "could not create room")
pp.HGet(s.ctx, RoomsKey, string(roomName))
if includeInternal {
pp.HGet(s.ctx, RoomInternalKey, string(roomName))
}
return nil
}
func (s *RedisStore) LoadRoom(_ context.Context, roomName livekit.RoomName) (*livekit.Room, error) {
data, err := s.rc.HGet(s.ctx, RoomsKey, string(roomName)).Result()
res, err := pp.Exec(s.ctx)
if err != nil && err != redis.Nil {
// if the room exists but internal does not, the pipeline will still return redis.Nil
return nil, nil, err
}
room := &livekit.Room{}
roomData, err := res[0].(*redis.StringCmd).Result()
if err != nil {
if err == redis.Nil {
err = ErrRoomNotFound
}
return nil, err
return nil, nil, err
}
if err = proto.Unmarshal([]byte(roomData), room); err != nil {
return nil, nil, err
}
room := livekit.Room{}
err = proto.Unmarshal([]byte(data), &room)
if err != nil {
return nil, err
}
return &room, nil
}
func (s *RedisStore) LoadRoomInternal(_ context.Context, roomName livekit.RoomName) (*livekit.RoomInternal, error) {
data, err := s.rc.HGet(s.ctx, RoomInternalKey, string(roomName)).Result()
if err != nil {
if err == redis.Nil {
return nil, nil
var internal *livekit.RoomInternal
if includeInternal {
internalData, err := res[1].(*redis.StringCmd).Result()
if err == nil {
internal = &livekit.RoomInternal{}
if err = proto.Unmarshal([]byte(internalData), internal); err != nil {
return nil, nil, err
}
} else if err != redis.Nil {
return nil, nil, err
}
return nil, err
}
internal := &livekit.RoomInternal{}
err = proto.Unmarshal([]byte(data), internal)
if err != nil {
return nil, err
}
return internal, nil
return room, internal, nil
}
func (s *RedisStore) ListRooms(_ context.Context, roomNames []livekit.RoomName) ([]*livekit.Room, error) {
@@ -203,7 +204,7 @@ func (s *RedisStore) ListRooms(_ context.Context, roomNames []livekit.RoomName)
}
func (s *RedisStore) DeleteRoom(ctx context.Context, roomName livekit.RoomName) error {
_, err := s.LoadRoom(ctx, roomName)
_, _, err := s.LoadRoom(ctx, roomName, false)
if err == ErrRoomNotFound {
return nil
}

View File

@@ -17,6 +17,34 @@ import (
"github.com/livekit/livekit-server/pkg/service"
)
func TestRoomInternal(t *testing.T) {
ctx := context.Background()
rs := service.NewRedisStore(redisClient())
room := &livekit.Room{
Sid: "123",
Name: "test_room",
}
internal := &livekit.RoomInternal{
TrackEgress: &livekit.AutoTrackEgress{FilePrefix: "egress"},
}
require.NoError(t, rs.StoreRoom(ctx, room, internal))
actualRoom, actualInternal, err := rs.LoadRoom(ctx, livekit.RoomName(room.Name), true)
require.NoError(t, err)
require.Equal(t, room.Sid, actualRoom.Sid)
require.Equal(t, internal.TrackEgress.FilePrefix, actualInternal.TrackEgress.FilePrefix)
// remove internal
require.NoError(t, rs.StoreRoom(ctx, room, nil))
_, actualInternal, err = rs.LoadRoom(ctx, livekit.RoomName(room.Name), true)
require.NoError(t, err)
require.Nil(t, actualInternal)
// clean up
require.NoError(t, rs.DeleteRoom(ctx, "test_room"))
}
func TestParticipantPersistence(t *testing.T) {
ctx := context.Background()
rs := service.NewRedisStore(redisClient())

View File

@@ -46,7 +46,7 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre
}()
// find existing room and update it
rm, err := r.roomStore.LoadRoom(ctx, livekit.RoomName(req.Name))
rm, internal, err := r.roomStore.LoadRoom(ctx, livekit.RoomName(req.Name), true)
if err == ErrRoomNotFound {
rm = &livekit.Room{
Sid: utils.NewGuid(utils.RoomPrefix),
@@ -68,16 +68,12 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre
if req.Metadata != "" {
rm.Metadata = req.Metadata
}
if err = r.roomStore.StoreRoom(ctx, rm); err != nil {
return nil, err
if req.Egress != nil && req.Egress.Tracks != nil {
internal = &livekit.RoomInternal{TrackEgress: req.Egress.Tracks}
}
if req.Egress != nil && req.Egress.Tracks != nil {
internal := &livekit.RoomInternal{TrackEgress: req.Egress.Tracks}
if err = r.roomStore.StoreRoomInternal(ctx, livekit.RoomName(req.Name), internal); err != nil {
return nil, err
}
if err = r.roomStore.StoreRoom(ctx, rm, internal); err != nil {
return nil, err
}
// check if room already assigned

View File

@@ -66,7 +66,7 @@ func TestCreateRoom(t *testing.T) {
func newTestRoomAllocator(t *testing.T, conf *config.Config, node *livekit.Node) (service.RoomAllocator, *config.Config) {
store := &servicefakes.FakeObjectStore{}
store.LoadRoomReturns(nil, service.ErrRoomNotFound)
store.LoadRoomReturns(nil, nil, service.ErrRoomNotFound)
router := &routingfakes.FakeRouter{}
router.GetNodeForRoomReturns(node, nil)

View File

@@ -317,7 +317,7 @@ func (r *RoomManager) StartSession(
updateParticipantCount := func(proto *livekit.Room) {
if !participant.Hidden() {
err = r.roomStore.StoreRoom(ctx, proto)
err = r.roomStore.StoreRoom(ctx, proto, room.Internal())
if err != nil {
logger.Errorw("could not store room", err)
}
@@ -371,12 +371,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room
}
// create new room, get details first
ri, err := r.roomStore.LoadRoom(ctx, roomName)
if err != nil {
return nil, err
}
internal, err := r.roomStore.LoadRoomInternal(ctx, roomName)
ri, internal, err := r.roomStore.LoadRoom(ctx, roomName, true)
if err != nil {
return nil, err
}
@@ -410,7 +405,7 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room
})
newRoom.OnMetadataUpdate(func(metadata string) {
if err := r.roomStore.StoreRoom(ctx, newRoom.ToProto()); err != nil {
if err := r.roomStore.StoreRoom(ctx, newRoom.ToProto(), newRoom.Internal()); err != nil {
newRoom.Logger.Errorw("could not handle metadata update", err)
}
})

View File

@@ -115,7 +115,7 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq
// we should not return until when the room is confirmed deleted
err = confirmExecution(func() error {
_, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Room))
_, _, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Room), false)
if err == nil {
return ErrOperationFailed
} else if err != ErrRoomNotFound {
@@ -308,7 +308,7 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat
return nil, twirpAuthError(err)
}
room, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Room))
room, _, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Room), false)
if err != nil {
return nil, err
}
@@ -333,7 +333,7 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat
}
err = confirmExecution(func() error {
room, err = s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Room))
room, _, err = s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Room), false)
if err != nil {
return err
}

View File

@@ -25,7 +25,7 @@ func TestDeleteRoom(t *testing.T) {
},
}
ctx := service.WithGrants(context.Background(), grant)
svc.store.LoadRoomReturns(nil, service.ErrRoomNotFound)
svc.store.LoadRoomReturns(nil, nil, service.ErrRoomNotFound)
_, err := svc.DeleteRoom(ctx, &livekit.DeleteRoomRequest{
Room: "testroom",
})

View File

@@ -171,7 +171,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// when auto create is disabled, we'll check to ensure it's already created
if !s.config.Room.AutoCreate {
_, err := s.store.LoadRoom(context.Background(), roomName)
_, _, err := s.store.LoadRoom(context.Background(), roomName, false)
if err == ErrRoomNotFound {
handleError(w, 404, err, loggerFields...)
return

View File

@@ -79,33 +79,22 @@ type FakeObjectStore struct {
result1 *livekit.ParticipantInfo
result2 error
}
LoadRoomStub func(context.Context, livekit.RoomName) (*livekit.Room, error)
LoadRoomStub func(context.Context, livekit.RoomName, bool) (*livekit.Room, *livekit.RoomInternal, error)
loadRoomMutex sync.RWMutex
loadRoomArgsForCall []struct {
arg1 context.Context
arg2 livekit.RoomName
arg3 bool
}
loadRoomReturns struct {
result1 *livekit.Room
result2 error
result2 *livekit.RoomInternal
result3 error
}
loadRoomReturnsOnCall map[int]struct {
result1 *livekit.Room
result2 error
}
LoadRoomInternalStub func(context.Context, livekit.RoomName) (*livekit.RoomInternal, error)
loadRoomInternalMutex sync.RWMutex
loadRoomInternalArgsForCall []struct {
arg1 context.Context
arg2 livekit.RoomName
}
loadRoomInternalReturns struct {
result1 *livekit.RoomInternal
result2 error
}
loadRoomInternalReturnsOnCall map[int]struct {
result1 *livekit.RoomInternal
result2 error
result2 *livekit.RoomInternal
result3 error
}
LockRoomStub func(context.Context, livekit.RoomName, time.Duration) (string, error)
lockRoomMutex sync.RWMutex
@@ -135,11 +124,12 @@ type FakeObjectStore struct {
storeParticipantReturnsOnCall map[int]struct {
result1 error
}
StoreRoomStub func(context.Context, *livekit.Room) error
StoreRoomStub func(context.Context, *livekit.Room, *livekit.RoomInternal) error
storeRoomMutex sync.RWMutex
storeRoomArgsForCall []struct {
arg1 context.Context
arg2 *livekit.Room
arg3 *livekit.RoomInternal
}
storeRoomReturns struct {
result1 error
@@ -147,19 +137,6 @@ type FakeObjectStore struct {
storeRoomReturnsOnCall map[int]struct {
result1 error
}
StoreRoomInternalStub func(context.Context, livekit.RoomName, *livekit.RoomInternal) error
storeRoomInternalMutex sync.RWMutex
storeRoomInternalArgsForCall []struct {
arg1 context.Context
arg2 livekit.RoomName
arg3 *livekit.RoomInternal
}
storeRoomInternalReturns struct {
result1 error
}
storeRoomInternalReturnsOnCall map[int]struct {
result1 error
}
UnlockRoomStub func(context.Context, livekit.RoomName, string) error
unlockRoomMutex sync.RWMutex
unlockRoomArgsForCall []struct {
@@ -503,24 +480,25 @@ func (fake *FakeObjectStore) LoadParticipantReturnsOnCall(i int, result1 *liveki
}{result1, result2}
}
func (fake *FakeObjectStore) LoadRoom(arg1 context.Context, arg2 livekit.RoomName) (*livekit.Room, error) {
func (fake *FakeObjectStore) LoadRoom(arg1 context.Context, arg2 livekit.RoomName, arg3 bool) (*livekit.Room, *livekit.RoomInternal, error) {
fake.loadRoomMutex.Lock()
ret, specificReturn := fake.loadRoomReturnsOnCall[len(fake.loadRoomArgsForCall)]
fake.loadRoomArgsForCall = append(fake.loadRoomArgsForCall, struct {
arg1 context.Context
arg2 livekit.RoomName
}{arg1, arg2})
arg3 bool
}{arg1, arg2, arg3})
stub := fake.LoadRoomStub
fakeReturns := fake.loadRoomReturns
fake.recordInvocation("LoadRoom", []interface{}{arg1, arg2})
fake.recordInvocation("LoadRoom", []interface{}{arg1, arg2, arg3})
fake.loadRoomMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
return stub(arg1, arg2, arg3)
}
if specificReturn {
return ret.result1, ret.result2
return ret.result1, ret.result2, ret.result3
}
return fakeReturns.result1, fakeReturns.result2
return fakeReturns.result1, fakeReturns.result2, fakeReturns.result3
}
func (fake *FakeObjectStore) LoadRoomCallCount() int {
@@ -529,108 +507,46 @@ func (fake *FakeObjectStore) LoadRoomCallCount() int {
return len(fake.loadRoomArgsForCall)
}
func (fake *FakeObjectStore) LoadRoomCalls(stub func(context.Context, livekit.RoomName) (*livekit.Room, error)) {
func (fake *FakeObjectStore) LoadRoomCalls(stub func(context.Context, livekit.RoomName, bool) (*livekit.Room, *livekit.RoomInternal, error)) {
fake.loadRoomMutex.Lock()
defer fake.loadRoomMutex.Unlock()
fake.LoadRoomStub = stub
}
func (fake *FakeObjectStore) LoadRoomArgsForCall(i int) (context.Context, livekit.RoomName) {
func (fake *FakeObjectStore) LoadRoomArgsForCall(i int) (context.Context, livekit.RoomName, bool) {
fake.loadRoomMutex.RLock()
defer fake.loadRoomMutex.RUnlock()
argsForCall := fake.loadRoomArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeObjectStore) LoadRoomReturns(result1 *livekit.Room, result2 error) {
func (fake *FakeObjectStore) LoadRoomReturns(result1 *livekit.Room, result2 *livekit.RoomInternal, result3 error) {
fake.loadRoomMutex.Lock()
defer fake.loadRoomMutex.Unlock()
fake.LoadRoomStub = nil
fake.loadRoomReturns = struct {
result1 *livekit.Room
result2 error
}{result1, result2}
result2 *livekit.RoomInternal
result3 error
}{result1, result2, result3}
}
func (fake *FakeObjectStore) LoadRoomReturnsOnCall(i int, result1 *livekit.Room, result2 error) {
func (fake *FakeObjectStore) LoadRoomReturnsOnCall(i int, result1 *livekit.Room, result2 *livekit.RoomInternal, result3 error) {
fake.loadRoomMutex.Lock()
defer fake.loadRoomMutex.Unlock()
fake.LoadRoomStub = nil
if fake.loadRoomReturnsOnCall == nil {
fake.loadRoomReturnsOnCall = make(map[int]struct {
result1 *livekit.Room
result2 error
result2 *livekit.RoomInternal
result3 error
})
}
fake.loadRoomReturnsOnCall[i] = struct {
result1 *livekit.Room
result2 error
}{result1, result2}
}
func (fake *FakeObjectStore) LoadRoomInternal(arg1 context.Context, arg2 livekit.RoomName) (*livekit.RoomInternal, error) {
fake.loadRoomInternalMutex.Lock()
ret, specificReturn := fake.loadRoomInternalReturnsOnCall[len(fake.loadRoomInternalArgsForCall)]
fake.loadRoomInternalArgsForCall = append(fake.loadRoomInternalArgsForCall, struct {
arg1 context.Context
arg2 livekit.RoomName
}{arg1, arg2})
stub := fake.LoadRoomInternalStub
fakeReturns := fake.loadRoomInternalReturns
fake.recordInvocation("LoadRoomInternal", []interface{}{arg1, arg2})
fake.loadRoomInternalMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1, ret.result2
}
return fakeReturns.result1, fakeReturns.result2
}
func (fake *FakeObjectStore) LoadRoomInternalCallCount() int {
fake.loadRoomInternalMutex.RLock()
defer fake.loadRoomInternalMutex.RUnlock()
return len(fake.loadRoomInternalArgsForCall)
}
func (fake *FakeObjectStore) LoadRoomInternalCalls(stub func(context.Context, livekit.RoomName) (*livekit.RoomInternal, error)) {
fake.loadRoomInternalMutex.Lock()
defer fake.loadRoomInternalMutex.Unlock()
fake.LoadRoomInternalStub = stub
}
func (fake *FakeObjectStore) LoadRoomInternalArgsForCall(i int) (context.Context, livekit.RoomName) {
fake.loadRoomInternalMutex.RLock()
defer fake.loadRoomInternalMutex.RUnlock()
argsForCall := fake.loadRoomInternalArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeObjectStore) LoadRoomInternalReturns(result1 *livekit.RoomInternal, result2 error) {
fake.loadRoomInternalMutex.Lock()
defer fake.loadRoomInternalMutex.Unlock()
fake.LoadRoomInternalStub = nil
fake.loadRoomInternalReturns = struct {
result1 *livekit.RoomInternal
result2 error
}{result1, result2}
}
func (fake *FakeObjectStore) LoadRoomInternalReturnsOnCall(i int, result1 *livekit.RoomInternal, result2 error) {
fake.loadRoomInternalMutex.Lock()
defer fake.loadRoomInternalMutex.Unlock()
fake.LoadRoomInternalStub = nil
if fake.loadRoomInternalReturnsOnCall == nil {
fake.loadRoomInternalReturnsOnCall = make(map[int]struct {
result1 *livekit.RoomInternal
result2 error
})
}
fake.loadRoomInternalReturnsOnCall[i] = struct {
result1 *livekit.RoomInternal
result2 error
}{result1, result2}
result2 *livekit.RoomInternal
result3 error
}{result1, result2, result3}
}
func (fake *FakeObjectStore) LockRoom(arg1 context.Context, arg2 livekit.RoomName, arg3 time.Duration) (string, error) {
@@ -762,19 +678,20 @@ func (fake *FakeObjectStore) StoreParticipantReturnsOnCall(i int, result1 error)
}{result1}
}
func (fake *FakeObjectStore) StoreRoom(arg1 context.Context, arg2 *livekit.Room) error {
func (fake *FakeObjectStore) StoreRoom(arg1 context.Context, arg2 *livekit.Room, arg3 *livekit.RoomInternal) error {
fake.storeRoomMutex.Lock()
ret, specificReturn := fake.storeRoomReturnsOnCall[len(fake.storeRoomArgsForCall)]
fake.storeRoomArgsForCall = append(fake.storeRoomArgsForCall, struct {
arg1 context.Context
arg2 *livekit.Room
}{arg1, arg2})
arg3 *livekit.RoomInternal
}{arg1, arg2, arg3})
stub := fake.StoreRoomStub
fakeReturns := fake.storeRoomReturns
fake.recordInvocation("StoreRoom", []interface{}{arg1, arg2})
fake.recordInvocation("StoreRoom", []interface{}{arg1, arg2, arg3})
fake.storeRoomMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
return stub(arg1, arg2, arg3)
}
if specificReturn {
return ret.result1
@@ -788,17 +705,17 @@ func (fake *FakeObjectStore) StoreRoomCallCount() int {
return len(fake.storeRoomArgsForCall)
}
func (fake *FakeObjectStore) StoreRoomCalls(stub func(context.Context, *livekit.Room) error) {
func (fake *FakeObjectStore) StoreRoomCalls(stub func(context.Context, *livekit.Room, *livekit.RoomInternal) error) {
fake.storeRoomMutex.Lock()
defer fake.storeRoomMutex.Unlock()
fake.StoreRoomStub = stub
}
func (fake *FakeObjectStore) StoreRoomArgsForCall(i int) (context.Context, *livekit.Room) {
func (fake *FakeObjectStore) StoreRoomArgsForCall(i int) (context.Context, *livekit.Room, *livekit.RoomInternal) {
fake.storeRoomMutex.RLock()
defer fake.storeRoomMutex.RUnlock()
argsForCall := fake.storeRoomArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeObjectStore) StoreRoomReturns(result1 error) {
@@ -824,69 +741,6 @@ func (fake *FakeObjectStore) StoreRoomReturnsOnCall(i int, result1 error) {
}{result1}
}
func (fake *FakeObjectStore) StoreRoomInternal(arg1 context.Context, arg2 livekit.RoomName, arg3 *livekit.RoomInternal) error {
fake.storeRoomInternalMutex.Lock()
ret, specificReturn := fake.storeRoomInternalReturnsOnCall[len(fake.storeRoomInternalArgsForCall)]
fake.storeRoomInternalArgsForCall = append(fake.storeRoomInternalArgsForCall, struct {
arg1 context.Context
arg2 livekit.RoomName
arg3 *livekit.RoomInternal
}{arg1, arg2, arg3})
stub := fake.StoreRoomInternalStub
fakeReturns := fake.storeRoomInternalReturns
fake.recordInvocation("StoreRoomInternal", []interface{}{arg1, arg2, arg3})
fake.storeRoomInternalMutex.Unlock()
if stub != nil {
return stub(arg1, arg2, arg3)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeObjectStore) StoreRoomInternalCallCount() int {
fake.storeRoomInternalMutex.RLock()
defer fake.storeRoomInternalMutex.RUnlock()
return len(fake.storeRoomInternalArgsForCall)
}
func (fake *FakeObjectStore) StoreRoomInternalCalls(stub func(context.Context, livekit.RoomName, *livekit.RoomInternal) error) {
fake.storeRoomInternalMutex.Lock()
defer fake.storeRoomInternalMutex.Unlock()
fake.StoreRoomInternalStub = stub
}
func (fake *FakeObjectStore) StoreRoomInternalArgsForCall(i int) (context.Context, livekit.RoomName, *livekit.RoomInternal) {
fake.storeRoomInternalMutex.RLock()
defer fake.storeRoomInternalMutex.RUnlock()
argsForCall := fake.storeRoomInternalArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeObjectStore) StoreRoomInternalReturns(result1 error) {
fake.storeRoomInternalMutex.Lock()
defer fake.storeRoomInternalMutex.Unlock()
fake.StoreRoomInternalStub = nil
fake.storeRoomInternalReturns = struct {
result1 error
}{result1}
}
func (fake *FakeObjectStore) StoreRoomInternalReturnsOnCall(i int, result1 error) {
fake.storeRoomInternalMutex.Lock()
defer fake.storeRoomInternalMutex.Unlock()
fake.StoreRoomInternalStub = nil
if fake.storeRoomInternalReturnsOnCall == nil {
fake.storeRoomInternalReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.storeRoomInternalReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeObjectStore) UnlockRoom(arg1 context.Context, arg2 livekit.RoomName, arg3 string) error {
fake.unlockRoomMutex.Lock()
ret, specificReturn := fake.unlockRoomReturnsOnCall[len(fake.unlockRoomArgsForCall)]
@@ -965,16 +819,12 @@ func (fake *FakeObjectStore) Invocations() map[string][][]interface{} {
defer fake.loadParticipantMutex.RUnlock()
fake.loadRoomMutex.RLock()
defer fake.loadRoomMutex.RUnlock()
fake.loadRoomInternalMutex.RLock()
defer fake.loadRoomInternalMutex.RUnlock()
fake.lockRoomMutex.RLock()
defer fake.lockRoomMutex.RUnlock()
fake.storeParticipantMutex.RLock()
defer fake.storeParticipantMutex.RUnlock()
fake.storeRoomMutex.RLock()
defer fake.storeRoomMutex.RUnlock()
fake.storeRoomInternalMutex.RLock()
defer fake.storeRoomInternalMutex.RUnlock()
fake.unlockRoomMutex.RLock()
defer fake.unlockRoomMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}

View File

@@ -53,33 +53,22 @@ type FakeServiceStore struct {
result1 *livekit.ParticipantInfo
result2 error
}
LoadRoomStub func(context.Context, livekit.RoomName) (*livekit.Room, error)
LoadRoomStub func(context.Context, livekit.RoomName, bool) (*livekit.Room, *livekit.RoomInternal, error)
loadRoomMutex sync.RWMutex
loadRoomArgsForCall []struct {
arg1 context.Context
arg2 livekit.RoomName
arg3 bool
}
loadRoomReturns struct {
result1 *livekit.Room
result2 error
result2 *livekit.RoomInternal
result3 error
}
loadRoomReturnsOnCall map[int]struct {
result1 *livekit.Room
result2 error
}
LoadRoomInternalStub func(context.Context, livekit.RoomName) (*livekit.RoomInternal, error)
loadRoomInternalMutex sync.RWMutex
loadRoomInternalArgsForCall []struct {
arg1 context.Context
arg2 livekit.RoomName
}
loadRoomInternalReturns struct {
result1 *livekit.RoomInternal
result2 error
}
loadRoomInternalReturnsOnCall map[int]struct {
result1 *livekit.RoomInternal
result2 error
result2 *livekit.RoomInternal
result3 error
}
invocations map[string][][]interface{}
invocationsMutex sync.RWMutex
@@ -286,24 +275,25 @@ func (fake *FakeServiceStore) LoadParticipantReturnsOnCall(i int, result1 *livek
}{result1, result2}
}
func (fake *FakeServiceStore) LoadRoom(arg1 context.Context, arg2 livekit.RoomName) (*livekit.Room, error) {
func (fake *FakeServiceStore) LoadRoom(arg1 context.Context, arg2 livekit.RoomName, arg3 bool) (*livekit.Room, *livekit.RoomInternal, error) {
fake.loadRoomMutex.Lock()
ret, specificReturn := fake.loadRoomReturnsOnCall[len(fake.loadRoomArgsForCall)]
fake.loadRoomArgsForCall = append(fake.loadRoomArgsForCall, struct {
arg1 context.Context
arg2 livekit.RoomName
}{arg1, arg2})
arg3 bool
}{arg1, arg2, arg3})
stub := fake.LoadRoomStub
fakeReturns := fake.loadRoomReturns
fake.recordInvocation("LoadRoom", []interface{}{arg1, arg2})
fake.recordInvocation("LoadRoom", []interface{}{arg1, arg2, arg3})
fake.loadRoomMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
return stub(arg1, arg2, arg3)
}
if specificReturn {
return ret.result1, ret.result2
return ret.result1, ret.result2, ret.result3
}
return fakeReturns.result1, fakeReturns.result2
return fakeReturns.result1, fakeReturns.result2, fakeReturns.result3
}
func (fake *FakeServiceStore) LoadRoomCallCount() int {
@@ -312,108 +302,46 @@ func (fake *FakeServiceStore) LoadRoomCallCount() int {
return len(fake.loadRoomArgsForCall)
}
func (fake *FakeServiceStore) LoadRoomCalls(stub func(context.Context, livekit.RoomName) (*livekit.Room, error)) {
func (fake *FakeServiceStore) LoadRoomCalls(stub func(context.Context, livekit.RoomName, bool) (*livekit.Room, *livekit.RoomInternal, error)) {
fake.loadRoomMutex.Lock()
defer fake.loadRoomMutex.Unlock()
fake.LoadRoomStub = stub
}
func (fake *FakeServiceStore) LoadRoomArgsForCall(i int) (context.Context, livekit.RoomName) {
func (fake *FakeServiceStore) LoadRoomArgsForCall(i int) (context.Context, livekit.RoomName, bool) {
fake.loadRoomMutex.RLock()
defer fake.loadRoomMutex.RUnlock()
argsForCall := fake.loadRoomArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeServiceStore) LoadRoomReturns(result1 *livekit.Room, result2 error) {
func (fake *FakeServiceStore) LoadRoomReturns(result1 *livekit.Room, result2 *livekit.RoomInternal, result3 error) {
fake.loadRoomMutex.Lock()
defer fake.loadRoomMutex.Unlock()
fake.LoadRoomStub = nil
fake.loadRoomReturns = struct {
result1 *livekit.Room
result2 error
}{result1, result2}
result2 *livekit.RoomInternal
result3 error
}{result1, result2, result3}
}
func (fake *FakeServiceStore) LoadRoomReturnsOnCall(i int, result1 *livekit.Room, result2 error) {
func (fake *FakeServiceStore) LoadRoomReturnsOnCall(i int, result1 *livekit.Room, result2 *livekit.RoomInternal, result3 error) {
fake.loadRoomMutex.Lock()
defer fake.loadRoomMutex.Unlock()
fake.LoadRoomStub = nil
if fake.loadRoomReturnsOnCall == nil {
fake.loadRoomReturnsOnCall = make(map[int]struct {
result1 *livekit.Room
result2 error
result2 *livekit.RoomInternal
result3 error
})
}
fake.loadRoomReturnsOnCall[i] = struct {
result1 *livekit.Room
result2 error
}{result1, result2}
}
func (fake *FakeServiceStore) LoadRoomInternal(arg1 context.Context, arg2 livekit.RoomName) (*livekit.RoomInternal, error) {
fake.loadRoomInternalMutex.Lock()
ret, specificReturn := fake.loadRoomInternalReturnsOnCall[len(fake.loadRoomInternalArgsForCall)]
fake.loadRoomInternalArgsForCall = append(fake.loadRoomInternalArgsForCall, struct {
arg1 context.Context
arg2 livekit.RoomName
}{arg1, arg2})
stub := fake.LoadRoomInternalStub
fakeReturns := fake.loadRoomInternalReturns
fake.recordInvocation("LoadRoomInternal", []interface{}{arg1, arg2})
fake.loadRoomInternalMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1, ret.result2
}
return fakeReturns.result1, fakeReturns.result2
}
func (fake *FakeServiceStore) LoadRoomInternalCallCount() int {
fake.loadRoomInternalMutex.RLock()
defer fake.loadRoomInternalMutex.RUnlock()
return len(fake.loadRoomInternalArgsForCall)
}
func (fake *FakeServiceStore) LoadRoomInternalCalls(stub func(context.Context, livekit.RoomName) (*livekit.RoomInternal, error)) {
fake.loadRoomInternalMutex.Lock()
defer fake.loadRoomInternalMutex.Unlock()
fake.LoadRoomInternalStub = stub
}
func (fake *FakeServiceStore) LoadRoomInternalArgsForCall(i int) (context.Context, livekit.RoomName) {
fake.loadRoomInternalMutex.RLock()
defer fake.loadRoomInternalMutex.RUnlock()
argsForCall := fake.loadRoomInternalArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeServiceStore) LoadRoomInternalReturns(result1 *livekit.RoomInternal, result2 error) {
fake.loadRoomInternalMutex.Lock()
defer fake.loadRoomInternalMutex.Unlock()
fake.LoadRoomInternalStub = nil
fake.loadRoomInternalReturns = struct {
result1 *livekit.RoomInternal
result2 error
}{result1, result2}
}
func (fake *FakeServiceStore) LoadRoomInternalReturnsOnCall(i int, result1 *livekit.RoomInternal, result2 error) {
fake.loadRoomInternalMutex.Lock()
defer fake.loadRoomInternalMutex.Unlock()
fake.LoadRoomInternalStub = nil
if fake.loadRoomInternalReturnsOnCall == nil {
fake.loadRoomInternalReturnsOnCall = make(map[int]struct {
result1 *livekit.RoomInternal
result2 error
})
}
fake.loadRoomInternalReturnsOnCall[i] = struct {
result1 *livekit.RoomInternal
result2 error
}{result1, result2}
result2 *livekit.RoomInternal
result3 error
}{result1, result2, result3}
}
func (fake *FakeServiceStore) Invocations() map[string][][]interface{} {
@@ -427,8 +355,6 @@ func (fake *FakeServiceStore) Invocations() map[string][][]interface{} {
defer fake.loadParticipantMutex.RUnlock()
fake.loadRoomMutex.RLock()
defer fake.loadRoomMutex.RUnlock()
fake.loadRoomInternalMutex.RLock()
defer fake.loadRoomInternalMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}
for key, value := range fake.invocations {
copiedInvocations[key] = value

View File

@@ -131,7 +131,7 @@ func NewTurnServer(conf *config.Config, authHandler turn.AuthHandler, standalone
func newTurnAuthHandler(roomStore ObjectStore) turn.AuthHandler {
return func(username, realm string, srcAddr net.Addr) (key []byte, ok bool) {
// room id should be the username, create a hashed room id
rm, err := roomStore.LoadRoom(context.Background(), livekit.RoomName(username))
rm, _, err := roomStore.LoadRoom(context.Background(), livekit.RoomName(username), false)
if err != nil {
return nil, false
}