From 69c1d4c2958eec4dcfa667be8a19a4c7a3270abc Mon Sep 17 00:00:00 2001 From: David Colburn Date: Sat, 28 Aug 2021 18:28:08 -0500 Subject: [PATCH] Interfaces (#97) * create interface * move room manager to interface * interfaces * updates * fix loop * fix fakes * remove node types --- go.mod | 2 +- go.sum | 4 +- pkg/service/interfaces.go | 44 ++ pkg/service/localroomstore.go | 10 +- pkg/service/redisroomstore.go | 10 +- pkg/service/redisroomstore_test.go | 6 +- pkg/service/roommanager.go | 57 +- pkg/service/roommanager_test.go | 6 +- pkg/service/roomservice.go | 28 +- pkg/service/roomstore.go | 29 -- pkg/service/rtcservice.go | 4 +- pkg/service/server.go | 6 +- pkg/service/servicefakes/fake_room_store.go | 546 ++++++++++---------- pkg/service/turn.go | 4 +- pkg/service/utils.go | 3 +- pkg/service/wire_gen.go | 8 +- 16 files changed, 394 insertions(+), 373 deletions(-) create mode 100644 pkg/service/interfaces.go delete mode 100644 pkg/service/roomstore.go diff --git a/go.mod b/go.mod index ed8d1b5e5..507e524c2 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/google/wire v0.5.0 github.com/gorilla/websocket v1.4.2 github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b - github.com/livekit/protocol v0.8.0 + github.com/livekit/protocol v0.8.1 github.com/magefile/mage v1.11.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0 github.com/mitchellh/go-homedir v1.1.0 diff --git a/go.sum b/go.sum index ba792bf4d..530dd0e2e 100644 --- a/go.sum +++ b/go.sum @@ -241,8 +241,8 @@ github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= github.com/livekit/ion-sfu v1.20.8 h1:nNyih1cq82dmuBv41XKH3qTDFtRvTnHwCKmUBFGz/Vc= github.com/livekit/ion-sfu v1.20.8/go.mod h1:g8hwobZI5fvX1RXvayf4ZXkgP7spV5YGE4yTSsumpB4= -github.com/livekit/protocol v0.8.0 h1:cfiSy12WUozFBLi0dE6LEoignJOOPra0Fvf/5MJOI1I= -github.com/livekit/protocol v0.8.0/go.mod h1:OczgiKz3Uo6g35oav5g/m3fAFrxd1sROWKmOj3wsVx0= +github.com/livekit/protocol v0.8.1 h1:J4mz1C2rUM6696xc1UQJ+qpi7DJXLWl9oTR68xYTGX8= +github.com/livekit/protocol v0.8.1/go.mod h1:YKcyBbqH0WmNL35i7c5jxr1L2Az13oT10oGNDOijI20= github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ= github.com/lucsky/cuid v1.0.2/go.mod h1:QaaJqckboimOmhRSJXSx/+IT+VTfxfPGSo/6mfgUfmE= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= diff --git a/pkg/service/interfaces.go b/pkg/service/interfaces.go new file mode 100644 index 000000000..59426f034 --- /dev/null +++ b/pkg/service/interfaces.go @@ -0,0 +1,44 @@ +package service + +import ( + "time" + + livekit "github.com/livekit/protocol/proto" + + "github.com/livekit/livekit-server/pkg/routing" + "github.com/livekit/livekit-server/pkg/rtc" +) + +//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate + +// encapsulates CRUD operations for room settings +// look up participant +//counterfeiter:generate . RoomStore +type RoomStore interface { + StoreRoom(room *livekit.Room) error + LoadRoom(idOrName string) (*livekit.Room, error) + ListRooms() ([]*livekit.Room, error) + DeleteRoom(idOrName string) error + + // enable locking on a specific room to prevent race + // returns a (lock uuid, error) + LockRoom(name string, duration time.Duration) (string, error) + UnlockRoom(name string, uid string) error + + StoreParticipant(roomName string, participant *livekit.ParticipantInfo) error + LoadParticipant(roomName, identity string) (*livekit.ParticipantInfo, error) + ListParticipants(roomName string) ([]*livekit.ParticipantInfo, error) + DeleteParticipant(roomName, identity string) error +} + +type RoomManager interface { + RoomStore + + CreateRoom(req *livekit.CreateRoomRequest) (*livekit.Room, error) + GetRoom(roomName string) *rtc.Room + DeleteRoom(roomName string) error + CleanupRooms() error + CloseIdleRooms() + Stop() + StartSession(roomName string, pi routing.ParticipantInit, requestSource routing.MessageSource, responseSink routing.MessageSink) +} diff --git a/pkg/service/localroomstore.go b/pkg/service/localroomstore.go index 7ccc19cc8..566a444ca 100644 --- a/pkg/service/localroomstore.go +++ b/pkg/service/localroomstore.go @@ -28,7 +28,7 @@ func NewLocalRoomStore() *LocalRoomStore { } } -func (p *LocalRoomStore) CreateRoom(room *livekit.Room) error { +func (p *LocalRoomStore) StoreRoom(room *livekit.Room) error { if room.CreationTime == 0 { room.CreationTime = time.Now().Unix() } @@ -39,7 +39,7 @@ func (p *LocalRoomStore) CreateRoom(room *livekit.Room) error { return nil } -func (p *LocalRoomStore) GetRoom(idOrName string) (*livekit.Room, error) { +func (p *LocalRoomStore) LoadRoom(idOrName string) (*livekit.Room, error) { p.lock.RLock() defer p.lock.RUnlock() // see if it's an id or name @@ -65,7 +65,7 @@ func (p *LocalRoomStore) ListRooms() ([]*livekit.Room, error) { } func (p *LocalRoomStore) DeleteRoom(idOrName string) error { - room, err := p.GetRoom(idOrName) + room, err := p.LoadRoom(idOrName) if err == ErrRoomNotFound { return nil } else if err != nil { @@ -92,7 +92,7 @@ func (p *LocalRoomStore) UnlockRoom(name string, uid string) error { return nil } -func (p *LocalRoomStore) PersistParticipant(roomName string, participant *livekit.ParticipantInfo) error { +func (p *LocalRoomStore) StoreParticipant(roomName string, participant *livekit.ParticipantInfo) error { p.lock.Lock() defer p.lock.Unlock() roomParticipants := p.participants[roomName] @@ -104,7 +104,7 @@ func (p *LocalRoomStore) PersistParticipant(roomName string, participant *liveki return nil } -func (p *LocalRoomStore) GetParticipant(roomName, identity string) (*livekit.ParticipantInfo, error) { +func (p *LocalRoomStore) LoadParticipant(roomName, identity string) (*livekit.ParticipantInfo, error) { p.lock.RLock() defer p.lock.RUnlock() diff --git a/pkg/service/redisroomstore.go b/pkg/service/redisroomstore.go index d8c0f2846..177d9e522 100644 --- a/pkg/service/redisroomstore.go +++ b/pkg/service/redisroomstore.go @@ -38,7 +38,7 @@ func NewRedisRoomStore(rc *redis.Client) *RedisRoomStore { } } -func (p *RedisRoomStore) CreateRoom(room *livekit.Room) error { +func (p *RedisRoomStore) StoreRoom(room *livekit.Room) error { if room.CreationTime == 0 { room.CreationTime = time.Now().Unix() } @@ -58,7 +58,7 @@ func (p *RedisRoomStore) CreateRoom(room *livekit.Room) error { return nil } -func (p *RedisRoomStore) GetRoom(idOrName string) (*livekit.Room, error) { +func (p *RedisRoomStore) LoadRoom(idOrName string) (*livekit.Room, error) { // see if matches any ids name, err := p.rc.HGet(p.ctx, RoomIdMap, idOrName).Result() if err != nil { @@ -102,7 +102,7 @@ func (p *RedisRoomStore) ListRooms() ([]*livekit.Room, error) { } func (p *RedisRoomStore) DeleteRoom(idOrName string) error { - room, err := p.GetRoom(idOrName) + room, err := p.LoadRoom(idOrName) var sid, name string if err == ErrRoomNotFound { @@ -167,7 +167,7 @@ func (p *RedisRoomStore) UnlockRoom(name string, uid string) error { return p.rc.Del(p.ctx, key).Err() } -func (p *RedisRoomStore) PersistParticipant(roomName string, participant *livekit.ParticipantInfo) error { +func (p *RedisRoomStore) StoreParticipant(roomName string, participant *livekit.ParticipantInfo) error { key := RoomParticipantsPrefix + roomName data, err := proto.Marshal(participant) @@ -178,7 +178,7 @@ func (p *RedisRoomStore) PersistParticipant(roomName string, participant *liveki return p.rc.HSet(p.ctx, key, participant.Identity, data).Err() } -func (p *RedisRoomStore) GetParticipant(roomName, identity string) (*livekit.ParticipantInfo, error) { +func (p *RedisRoomStore) LoadParticipant(roomName, identity string) (*livekit.ParticipantInfo, error) { key := RoomParticipantsPrefix + roomName data, err := p.rc.HGet(p.ctx, key, identity).Result() if err == redis.Nil { diff --git a/pkg/service/redisroomstore_test.go b/pkg/service/redisroomstore_test.go index 0c02678b9..344ee8b94 100644 --- a/pkg/service/redisroomstore_test.go +++ b/pkg/service/redisroomstore_test.go @@ -32,10 +32,10 @@ func TestParticipantPersistence(t *testing.T) { } // create the participant - require.NoError(t, rs.PersistParticipant(roomName, p)) + require.NoError(t, rs.StoreParticipant(roomName, p)) // result should match - pGet, err := rs.GetParticipant(roomName, p.Identity) + pGet, err := rs.LoadParticipant(roomName, p.Identity) require.NoError(t, err) require.Equal(t, p.Identity, pGet.Identity) require.Equal(t, len(p.Tracks), len(pGet.Tracks)) @@ -54,7 +54,7 @@ func TestParticipantPersistence(t *testing.T) { require.Len(t, participants, 0) // shouldn't be able to get it - _, err = rs.GetParticipant(roomName, p.Identity) + _, err = rs.LoadParticipant(roomName, p.Identity) require.Equal(t, err, service.ErrParticipantNotFound) } diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 8f90537ed..a03e74371 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -22,11 +22,12 @@ const ( roomPurgeSeconds = 24 * 60 * 60 ) -// RoomManager manages rooms and its interaction with participants. +// LocalRoomManager manages rooms and its interaction with participants. // It's responsible for creating, deleting rooms, as well as running sessions for participants -type RoomManager struct { +type LocalRoomManager struct { + RoomStore + lock sync.RWMutex - roomStore RoomStore selector routing.NodeSelector router routing.Router currentNode routing.LocalNode @@ -37,16 +38,16 @@ type RoomManager struct { rooms map[string]*rtc.Room } -func NewRoomManager(rp RoomStore, router routing.Router, currentNode routing.LocalNode, selector routing.NodeSelector, - notifier *webhook.Notifier, conf *config.Config) (*RoomManager, error) { +func NewLocalRoomManager(rp RoomStore, router routing.Router, currentNode routing.LocalNode, selector routing.NodeSelector, + notifier *webhook.Notifier, conf *config.Config) (*LocalRoomManager, error) { rtcConf, err := rtc.NewWebRTCConfig(conf, currentNode.Ip) if err != nil { return nil, err } - return &RoomManager{ + return &LocalRoomManager{ + RoomStore: rp, lock: sync.RWMutex{}, - roomStore: rp, rtcConfig: rtcConf, config: conf, router: router, @@ -60,17 +61,17 @@ func NewRoomManager(rp RoomStore, router routing.Router, currentNode routing.Loc // CreateRoom creates a new room from a request and allocates it to a node to handle // it'll also monitor fits state, and cleans it up when appropriate -func (r *RoomManager) CreateRoom(req *livekit.CreateRoomRequest) (*livekit.Room, error) { - token, err := r.roomStore.LockRoom(req.Name, 5*time.Second) +func (r *LocalRoomManager) CreateRoom(req *livekit.CreateRoomRequest) (*livekit.Room, error) { + token, err := r.LockRoom(req.Name, 5*time.Second) if err != nil { return nil, err } defer func() { - _ = r.roomStore.UnlockRoom(req.Name, token) + _ = r.UnlockRoom(req.Name, token) }() // find existing room and update it - rm, err := r.roomStore.GetRoom(req.Name) + rm, err := r.LoadRoom(req.Name) if err == ErrRoomNotFound { rm = &livekit.Room{ Sid: utils.NewGuid(utils.RoomPrefix), @@ -89,7 +90,7 @@ func (r *RoomManager) CreateRoom(req *livekit.CreateRoomRequest) (*livekit.Room, if req.MaxParticipants > 0 { rm.MaxParticipants = req.MaxParticipants } - if err := r.roomStore.CreateRoom(rm); err != nil { + if err := r.StoreRoom(rm); err != nil { return nil, err } @@ -128,14 +129,14 @@ func (r *RoomManager) CreateRoom(req *livekit.CreateRoomRequest) (*livekit.Room, return rm, nil } -func (r *RoomManager) GetRoom(roomName string) *rtc.Room { +func (r *LocalRoomManager) GetRoom(roomName string) *rtc.Room { r.lock.RLock() defer r.lock.RUnlock() return r.rooms[roomName] } // DeleteRoom completely deletes all room information, including active sessions, room store, and routing info -func (r *RoomManager) DeleteRoom(roomName string) error { +func (r *LocalRoomManager) DeleteRoom(roomName string) error { logger.Infow("deleting room state", "room", roomName) r.lock.Lock() delete(r.rooms, roomName) @@ -152,7 +153,7 @@ func (r *RoomManager) DeleteRoom(roomName string) error { // also delete room from db go func() { defer wg.Done() - err2 = r.roomStore.DeleteRoom(roomName) + err2 = r.RoomStore.DeleteRoom(roomName) }() wg.Wait() @@ -164,9 +165,9 @@ func (r *RoomManager) DeleteRoom(roomName string) error { } // CleanupRooms cleans up after old rooms that have been around for awhile -func (r *RoomManager) CleanupRooms() error { +func (r *LocalRoomManager) CleanupRooms() error { // cleanup rooms that have been left for over a day - rooms, err := r.roomStore.ListRooms() + rooms, err := r.ListRooms() if err != nil { return err } @@ -182,7 +183,7 @@ func (r *RoomManager) CleanupRooms() error { return nil } -func (r *RoomManager) CloseIdleRooms() { +func (r *LocalRoomManager) CloseIdleRooms() { r.lock.RLock() rooms := make([]*rtc.Room, 0, len(r.rooms)) for _, rm := range r.rooms { @@ -195,7 +196,7 @@ func (r *RoomManager) CloseIdleRooms() { } } -func (r *RoomManager) Stop() { +func (r *LocalRoomManager) Stop() { // disconnect all clients r.lock.RLock() rooms := make([]*rtc.Room, 0, len(r.rooms)) @@ -222,7 +223,7 @@ func (r *RoomManager) Stop() { } // StartSession starts WebRTC session when a new participant is connected, takes place on RTC node -func (r *RoomManager) StartSession(roomName string, pi routing.ParticipantInit, requestSource routing.MessageSource, responseSink routing.MessageSink) { +func (r *LocalRoomManager) StartSession(roomName string, pi routing.ParticipantInit, requestSource routing.MessageSource, responseSink routing.MessageSink) { room, err := r.getOrCreateRoom(roomName) if err != nil { logger.Errorw("could not create room", err, "room", roomName) @@ -325,7 +326,7 @@ func (r *RoomManager) StartSession(roomName string, pi routing.ParticipantInit, } // create the actual room object, to be used on RTC node -func (r *RoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) { +func (r *LocalRoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) { r.lock.RLock() room := r.rooms[roomName] r.lock.RUnlock() @@ -335,7 +336,7 @@ func (r *RoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) { } // create new room, get details first - ri, err := r.roomStore.GetRoom(roomName) + ri, err := r.LoadRoom(roomName) if err != nil { return nil, err } @@ -361,9 +362,9 @@ func (r *RoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) { room.OnParticipantChanged(func(p types.Participant) { var err error if p.State() == livekit.ParticipantInfo_DISCONNECTED { - err = r.roomStore.DeleteParticipant(roomName, p.Identity()) + err = r.DeleteParticipant(roomName, p.Identity()) } else { - err = r.roomStore.PersistParticipant(roomName, p.ToProto()) + err = r.StoreParticipant(roomName, p.ToProto()) } if err != nil { logger.Errorw("could not handle participant change", err) @@ -382,7 +383,7 @@ func (r *RoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) { } // manages an RTC session for a participant, runs on the RTC node -func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Participant, requestSource routing.MessageSource) { +func (r *LocalRoomManager) rtcSessionWorker(room *rtc.Room, participant types.Participant, requestSource routing.MessageSource) { defer func() { logger.Debugw("RTC session finishing", "participant", participant.Identity(), @@ -482,7 +483,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Partici } } -func (r *RoomManager) handleRTCMessage(roomName, identity string, msg *livekit.RTCNodeMessage) { +func (r *LocalRoomManager) handleRTCMessage(roomName, identity string, msg *livekit.RTCNodeMessage) { r.lock.RLock() room := r.rooms[roomName] r.lock.RUnlock() @@ -537,7 +538,7 @@ func (r *RoomManager) handleRTCMessage(roomName, identity string, msg *livekit.R } } -func (r *RoomManager) iceServersForRoom(ri *livekit.Room) []*livekit.ICEServer { +func (r *LocalRoomManager) iceServersForRoom(ri *livekit.Room) []*livekit.ICEServer { var iceServers []*livekit.ICEServer hasSTUN := false @@ -571,7 +572,7 @@ func (r *RoomManager) iceServersForRoom(ri *livekit.Room) []*livekit.ICEServer { return iceServers } -func (r *RoomManager) notifyEvent(event *livekit.WebhookEvent) { +func (r *LocalRoomManager) notifyEvent(event *livekit.WebhookEvent) { if r.notifier == nil { return } diff --git a/pkg/service/roommanager_test.go b/pkg/service/roommanager_test.go index 951144067..a6f49cebc 100644 --- a/pkg/service/roommanager_test.go +++ b/pkg/service/roommanager_test.go @@ -24,9 +24,9 @@ func TestCreateRoom(t *testing.T) { }) } -func newTestRoomManager(t *testing.T) (*service.RoomManager, *config.Config) { +func newTestRoomManager(t *testing.T) (*service.LocalRoomManager, *config.Config) { store := &servicefakes.FakeRoomStore{} - store.GetRoomReturns(nil, service.ErrRoomNotFound) + store.LoadRoomReturns(nil, service.ErrRoomNotFound) router := &routingfakes.FakeRouter{} conf, err := config.NewConfig("", nil) require.NoError(t, err) @@ -36,7 +36,7 @@ func newTestRoomManager(t *testing.T) (*service.RoomManager, *config.Config) { router.GetNodeForRoomReturns(node, nil) - rm, err := service.NewRoomManager(store, router, node, selector, nil, conf) + rm, err := service.NewLocalRoomManager(store, router, node, selector, nil, conf) require.NoError(t, err) return rm, conf diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index 64997e094..37eff1b29 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -13,11 +13,15 @@ import ( // A rooms service that supports a single node type RoomService struct { - roomManager *RoomManager + router routing.Router + roomManager RoomManager } -func NewRoomService(roomManager *RoomManager) (svc *RoomService, err error) { - svc = &RoomService{roomManager: roomManager} +func NewRoomService(roomManager RoomManager, router routing.Router) (svc *RoomService, err error) { + svc = &RoomService{ + router: router, + roomManager: roomManager, + } return } @@ -40,7 +44,7 @@ func (s *RoomService) ListRooms(ctx context.Context, req *livekit.ListRoomsReque return nil, twirpAuthError(err) } - rooms, err := s.roomManager.roomStore.ListRooms() + rooms, err := s.roomManager.ListRooms() if err != nil { // TODO: translate error codes to twirp return @@ -58,7 +62,7 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq } // if the room is currently active, RTC node needs to disconnect clients // here we are using any user's identity, due to how it works with routing - participants, err := s.roomManager.roomStore.ListParticipants(req.Room) + participants, err := s.roomManager.ListParticipants(req.Room) if err != nil { return nil, err } @@ -88,7 +92,7 @@ func (s *RoomService) ListParticipants(ctx context.Context, req *livekit.ListPar return nil, twirpAuthError(err) } - participants, err := s.roomManager.roomStore.ListParticipants(req.Room) + participants, err := s.roomManager.ListParticipants(req.Room) if err != nil { return } @@ -104,7 +108,7 @@ func (s *RoomService) GetParticipant(ctx context.Context, req *livekit.RoomParti return nil, twirpAuthError(err) } - participant, err := s.roomManager.roomStore.GetParticipant(req.Room, req.Identity) + participant, err := s.roomManager.LoadParticipant(req.Room, req.Identity) if err != nil { return } @@ -132,7 +136,7 @@ func (s *RoomService) MutePublishedTrack(ctx context.Context, req *livekit.MuteR return nil, twirpAuthError(err) } - participant, err := s.roomManager.roomStore.GetParticipant(req.Room, req.Identity) + participant, err := s.roomManager.LoadParticipant(req.Room, req.Identity) if err != nil { return nil, err } @@ -172,7 +176,7 @@ func (s *RoomService) UpdateParticipant(ctx context.Context, req *livekit.Update return nil, err } - participant, err := s.roomManager.roomStore.GetParticipant(req.Room, req.Identity) + participant, err := s.roomManager.LoadParticipant(req.Room, req.Identity) if err != nil { return nil, err } @@ -196,7 +200,7 @@ func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.Upda func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest) (*livekit.SendDataResponse, error) { // here we are using any user's identity, due to how it works with routing - participants, err := s.roomManager.roomStore.ListParticipants(req.Room) + participants, err := s.roomManager.ListParticipants(req.Room) if err != nil { return nil, err } @@ -220,12 +224,12 @@ func (s *RoomService) createRTCSink(ctx context.Context, room, identity string) return nil, twirpAuthError(err) } - _, err := s.roomManager.roomStore.GetParticipant(room, identity) + _, err := s.roomManager.LoadParticipant(room, identity) if err != nil { return nil, err } - return s.roomManager.router.CreateRTCSink(room, identity) + return s.router.CreateRTCSink(room, identity) } func (s *RoomService) writeMessage(ctx context.Context, room, identity string, msg *livekit.RTCNodeMessage) error { diff --git a/pkg/service/roomstore.go b/pkg/service/roomstore.go deleted file mode 100644 index 80838fe77..000000000 --- a/pkg/service/roomstore.go +++ /dev/null @@ -1,29 +0,0 @@ -package service - -import ( - "time" - - livekit "github.com/livekit/protocol/proto" -) - -//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate - -// encapsulates CRUD operations for room settings -// look up participant -//counterfeiter:generate . RoomStore -type RoomStore interface { - CreateRoom(room *livekit.Room) error - GetRoom(idOrName string) (*livekit.Room, error) - ListRooms() ([]*livekit.Room, error) - DeleteRoom(idOrName string) error - - // enable locking on a specific room to prevent race - // returns a (lock uuid, error) - LockRoom(name string, duration time.Duration) (string, error) - UnlockRoom(name string, uid string) error - - PersistParticipant(roomName string, participant *livekit.ParticipantInfo) error - GetParticipant(roomName, identity string) (*livekit.ParticipantInfo, error) - ListParticipants(roomName string) ([]*livekit.ParticipantInfo, error) - DeleteParticipant(roomName, identity string) error -} diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index 9fa393057..ed1bdbd34 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -19,13 +19,13 @@ import ( type RTCService struct { router routing.Router - roomManager *RoomManager + roomManager RoomManager upgrader websocket.Upgrader currentNode routing.LocalNode isDev bool } -func NewRTCService(conf *config.Config, roomManager *RoomManager, router routing.Router, currentNode routing.LocalNode) *RTCService { +func NewRTCService(conf *config.Config, roomManager RoomManager, router routing.Router, currentNode routing.LocalNode) *RTCService { s := &RTCService{ router: router, roomManager: roomManager, diff --git a/pkg/service/server.go b/pkg/service/server.go index b98e9f60f..591647ae9 100644 --- a/pkg/service/server.go +++ b/pkg/service/server.go @@ -31,7 +31,7 @@ type LivekitServer struct { httpServer *http.Server promServer *http.Server router routing.Router - roomManager *RoomManager + roomManager *LocalRoomManager turnServer *turn.Server currentNode routing.LocalNode running utils.AtomicFlag @@ -45,7 +45,7 @@ func NewLivekitServer(conf *config.Config, rtcService *RTCService, keyProvider auth.KeyProvider, router routing.Router, - roomManager *RoomManager, + roomManager *LocalRoomManager, turnServer *turn.Server, currentNode routing.LocalNode, ) (s *LivekitServer, err error) { @@ -219,7 +219,7 @@ func (s *LivekitServer) Stop() { <-s.closedChan } -func (s *LivekitServer) RoomManager() *RoomManager { +func (s *LivekitServer) RoomManager() RoomManager { return s.roomManager } diff --git a/pkg/service/servicefakes/fake_room_store.go b/pkg/service/servicefakes/fake_room_store.go index 33fb7c4a5..5df7b0a9b 100644 --- a/pkg/service/servicefakes/fake_room_store.go +++ b/pkg/service/servicefakes/fake_room_store.go @@ -10,17 +10,6 @@ import ( ) type FakeRoomStore struct { - CreateRoomStub func(*livekit.Room) error - createRoomMutex sync.RWMutex - createRoomArgsForCall []struct { - arg1 *livekit.Room - } - createRoomReturns struct { - result1 error - } - createRoomReturnsOnCall map[int]struct { - result1 error - } DeleteParticipantStub func(string, string) error deleteParticipantMutex sync.RWMutex deleteParticipantArgsForCall []struct { @@ -44,33 +33,6 @@ type FakeRoomStore struct { deleteRoomReturnsOnCall map[int]struct { result1 error } - GetParticipantStub func(string, string) (*livekit.ParticipantInfo, error) - getParticipantMutex sync.RWMutex - getParticipantArgsForCall []struct { - arg1 string - arg2 string - } - getParticipantReturns struct { - result1 *livekit.ParticipantInfo - result2 error - } - getParticipantReturnsOnCall map[int]struct { - result1 *livekit.ParticipantInfo - result2 error - } - GetRoomStub func(string) (*livekit.Room, error) - getRoomMutex sync.RWMutex - getRoomArgsForCall []struct { - arg1 string - } - getRoomReturns struct { - result1 *livekit.Room - result2 error - } - getRoomReturnsOnCall map[int]struct { - result1 *livekit.Room - result2 error - } ListParticipantsStub func(string) ([]*livekit.ParticipantInfo, error) listParticipantsMutex sync.RWMutex listParticipantsArgsForCall []struct { @@ -96,6 +58,33 @@ type FakeRoomStore struct { result1 []*livekit.Room result2 error } + LoadParticipantStub func(string, string) (*livekit.ParticipantInfo, error) + loadParticipantMutex sync.RWMutex + loadParticipantArgsForCall []struct { + arg1 string + arg2 string + } + loadParticipantReturns struct { + result1 *livekit.ParticipantInfo + result2 error + } + loadParticipantReturnsOnCall map[int]struct { + result1 *livekit.ParticipantInfo + result2 error + } + LoadRoomStub func(string) (*livekit.Room, error) + loadRoomMutex sync.RWMutex + loadRoomArgsForCall []struct { + arg1 string + } + loadRoomReturns struct { + result1 *livekit.Room + result2 error + } + loadRoomReturnsOnCall map[int]struct { + result1 *livekit.Room + result2 error + } LockRoomStub func(string, time.Duration) (string, error) lockRoomMutex sync.RWMutex lockRoomArgsForCall []struct { @@ -110,16 +99,27 @@ type FakeRoomStore struct { result1 string result2 error } - PersistParticipantStub func(string, *livekit.ParticipantInfo) error - persistParticipantMutex sync.RWMutex - persistParticipantArgsForCall []struct { + StoreParticipantStub func(string, *livekit.ParticipantInfo) error + storeParticipantMutex sync.RWMutex + storeParticipantArgsForCall []struct { arg1 string arg2 *livekit.ParticipantInfo } - persistParticipantReturns struct { + storeParticipantReturns struct { result1 error } - persistParticipantReturnsOnCall map[int]struct { + storeParticipantReturnsOnCall map[int]struct { + result1 error + } + StoreRoomStub func(*livekit.Room) error + storeRoomMutex sync.RWMutex + storeRoomArgsForCall []struct { + arg1 *livekit.Room + } + storeRoomReturns struct { + result1 error + } + storeRoomReturnsOnCall map[int]struct { result1 error } UnlockRoomStub func(string, string) error @@ -138,67 +138,6 @@ type FakeRoomStore struct { invocationsMutex sync.RWMutex } -func (fake *FakeRoomStore) CreateRoom(arg1 *livekit.Room) error { - fake.createRoomMutex.Lock() - ret, specificReturn := fake.createRoomReturnsOnCall[len(fake.createRoomArgsForCall)] - fake.createRoomArgsForCall = append(fake.createRoomArgsForCall, struct { - arg1 *livekit.Room - }{arg1}) - stub := fake.CreateRoomStub - fakeReturns := fake.createRoomReturns - fake.recordInvocation("CreateRoom", []interface{}{arg1}) - fake.createRoomMutex.Unlock() - if stub != nil { - return stub(arg1) - } - if specificReturn { - return ret.result1 - } - return fakeReturns.result1 -} - -func (fake *FakeRoomStore) CreateRoomCallCount() int { - fake.createRoomMutex.RLock() - defer fake.createRoomMutex.RUnlock() - return len(fake.createRoomArgsForCall) -} - -func (fake *FakeRoomStore) CreateRoomCalls(stub func(*livekit.Room) error) { - fake.createRoomMutex.Lock() - defer fake.createRoomMutex.Unlock() - fake.CreateRoomStub = stub -} - -func (fake *FakeRoomStore) CreateRoomArgsForCall(i int) *livekit.Room { - fake.createRoomMutex.RLock() - defer fake.createRoomMutex.RUnlock() - argsForCall := fake.createRoomArgsForCall[i] - return argsForCall.arg1 -} - -func (fake *FakeRoomStore) CreateRoomReturns(result1 error) { - fake.createRoomMutex.Lock() - defer fake.createRoomMutex.Unlock() - fake.CreateRoomStub = nil - fake.createRoomReturns = struct { - result1 error - }{result1} -} - -func (fake *FakeRoomStore) CreateRoomReturnsOnCall(i int, result1 error) { - fake.createRoomMutex.Lock() - defer fake.createRoomMutex.Unlock() - fake.CreateRoomStub = nil - if fake.createRoomReturnsOnCall == nil { - fake.createRoomReturnsOnCall = make(map[int]struct { - result1 error - }) - } - fake.createRoomReturnsOnCall[i] = struct { - result1 error - }{result1} -} - func (fake *FakeRoomStore) DeleteParticipant(arg1 string, arg2 string) error { fake.deleteParticipantMutex.Lock() ret, specificReturn := fake.deleteParticipantReturnsOnCall[len(fake.deleteParticipantArgsForCall)] @@ -322,135 +261,6 @@ func (fake *FakeRoomStore) DeleteRoomReturnsOnCall(i int, result1 error) { }{result1} } -func (fake *FakeRoomStore) GetParticipant(arg1 string, arg2 string) (*livekit.ParticipantInfo, error) { - fake.getParticipantMutex.Lock() - ret, specificReturn := fake.getParticipantReturnsOnCall[len(fake.getParticipantArgsForCall)] - fake.getParticipantArgsForCall = append(fake.getParticipantArgsForCall, struct { - arg1 string - arg2 string - }{arg1, arg2}) - stub := fake.GetParticipantStub - fakeReturns := fake.getParticipantReturns - fake.recordInvocation("GetParticipant", []interface{}{arg1, arg2}) - fake.getParticipantMutex.Unlock() - if stub != nil { - return stub(arg1, arg2) - } - if specificReturn { - return ret.result1, ret.result2 - } - return fakeReturns.result1, fakeReturns.result2 -} - -func (fake *FakeRoomStore) GetParticipantCallCount() int { - fake.getParticipantMutex.RLock() - defer fake.getParticipantMutex.RUnlock() - return len(fake.getParticipantArgsForCall) -} - -func (fake *FakeRoomStore) GetParticipantCalls(stub func(string, string) (*livekit.ParticipantInfo, error)) { - fake.getParticipantMutex.Lock() - defer fake.getParticipantMutex.Unlock() - fake.GetParticipantStub = stub -} - -func (fake *FakeRoomStore) GetParticipantArgsForCall(i int) (string, string) { - fake.getParticipantMutex.RLock() - defer fake.getParticipantMutex.RUnlock() - argsForCall := fake.getParticipantArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 -} - -func (fake *FakeRoomStore) GetParticipantReturns(result1 *livekit.ParticipantInfo, result2 error) { - fake.getParticipantMutex.Lock() - defer fake.getParticipantMutex.Unlock() - fake.GetParticipantStub = nil - fake.getParticipantReturns = struct { - result1 *livekit.ParticipantInfo - result2 error - }{result1, result2} -} - -func (fake *FakeRoomStore) GetParticipantReturnsOnCall(i int, result1 *livekit.ParticipantInfo, result2 error) { - fake.getParticipantMutex.Lock() - defer fake.getParticipantMutex.Unlock() - fake.GetParticipantStub = nil - if fake.getParticipantReturnsOnCall == nil { - fake.getParticipantReturnsOnCall = make(map[int]struct { - result1 *livekit.ParticipantInfo - result2 error - }) - } - fake.getParticipantReturnsOnCall[i] = struct { - result1 *livekit.ParticipantInfo - result2 error - }{result1, result2} -} - -func (fake *FakeRoomStore) GetRoom(arg1 string) (*livekit.Room, error) { - fake.getRoomMutex.Lock() - ret, specificReturn := fake.getRoomReturnsOnCall[len(fake.getRoomArgsForCall)] - fake.getRoomArgsForCall = append(fake.getRoomArgsForCall, struct { - arg1 string - }{arg1}) - stub := fake.GetRoomStub - fakeReturns := fake.getRoomReturns - fake.recordInvocation("GetRoom", []interface{}{arg1}) - fake.getRoomMutex.Unlock() - if stub != nil { - return stub(arg1) - } - if specificReturn { - return ret.result1, ret.result2 - } - return fakeReturns.result1, fakeReturns.result2 -} - -func (fake *FakeRoomStore) GetRoomCallCount() int { - fake.getRoomMutex.RLock() - defer fake.getRoomMutex.RUnlock() - return len(fake.getRoomArgsForCall) -} - -func (fake *FakeRoomStore) GetRoomCalls(stub func(string) (*livekit.Room, error)) { - fake.getRoomMutex.Lock() - defer fake.getRoomMutex.Unlock() - fake.GetRoomStub = stub -} - -func (fake *FakeRoomStore) GetRoomArgsForCall(i int) string { - fake.getRoomMutex.RLock() - defer fake.getRoomMutex.RUnlock() - argsForCall := fake.getRoomArgsForCall[i] - return argsForCall.arg1 -} - -func (fake *FakeRoomStore) GetRoomReturns(result1 *livekit.Room, result2 error) { - fake.getRoomMutex.Lock() - defer fake.getRoomMutex.Unlock() - fake.GetRoomStub = nil - fake.getRoomReturns = struct { - result1 *livekit.Room - result2 error - }{result1, result2} -} - -func (fake *FakeRoomStore) GetRoomReturnsOnCall(i int, result1 *livekit.Room, result2 error) { - fake.getRoomMutex.Lock() - defer fake.getRoomMutex.Unlock() - fake.GetRoomStub = nil - if fake.getRoomReturnsOnCall == nil { - fake.getRoomReturnsOnCall = make(map[int]struct { - result1 *livekit.Room - result2 error - }) - } - fake.getRoomReturnsOnCall[i] = struct { - result1 *livekit.Room - result2 error - }{result1, result2} -} - func (fake *FakeRoomStore) ListParticipants(arg1 string) ([]*livekit.ParticipantInfo, error) { fake.listParticipantsMutex.Lock() ret, specificReturn := fake.listParticipantsReturnsOnCall[len(fake.listParticipantsArgsForCall)] @@ -571,6 +381,135 @@ func (fake *FakeRoomStore) ListRoomsReturnsOnCall(i int, result1 []*livekit.Room }{result1, result2} } +func (fake *FakeRoomStore) LoadParticipant(arg1 string, arg2 string) (*livekit.ParticipantInfo, error) { + fake.loadParticipantMutex.Lock() + ret, specificReturn := fake.loadParticipantReturnsOnCall[len(fake.loadParticipantArgsForCall)] + fake.loadParticipantArgsForCall = append(fake.loadParticipantArgsForCall, struct { + arg1 string + arg2 string + }{arg1, arg2}) + stub := fake.LoadParticipantStub + fakeReturns := fake.loadParticipantReturns + fake.recordInvocation("LoadParticipant", []interface{}{arg1, arg2}) + fake.loadParticipantMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeRoomStore) LoadParticipantCallCount() int { + fake.loadParticipantMutex.RLock() + defer fake.loadParticipantMutex.RUnlock() + return len(fake.loadParticipantArgsForCall) +} + +func (fake *FakeRoomStore) LoadParticipantCalls(stub func(string, string) (*livekit.ParticipantInfo, error)) { + fake.loadParticipantMutex.Lock() + defer fake.loadParticipantMutex.Unlock() + fake.LoadParticipantStub = stub +} + +func (fake *FakeRoomStore) LoadParticipantArgsForCall(i int) (string, string) { + fake.loadParticipantMutex.RLock() + defer fake.loadParticipantMutex.RUnlock() + argsForCall := fake.loadParticipantArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeRoomStore) LoadParticipantReturns(result1 *livekit.ParticipantInfo, result2 error) { + fake.loadParticipantMutex.Lock() + defer fake.loadParticipantMutex.Unlock() + fake.LoadParticipantStub = nil + fake.loadParticipantReturns = struct { + result1 *livekit.ParticipantInfo + result2 error + }{result1, result2} +} + +func (fake *FakeRoomStore) LoadParticipantReturnsOnCall(i int, result1 *livekit.ParticipantInfo, result2 error) { + fake.loadParticipantMutex.Lock() + defer fake.loadParticipantMutex.Unlock() + fake.LoadParticipantStub = nil + if fake.loadParticipantReturnsOnCall == nil { + fake.loadParticipantReturnsOnCall = make(map[int]struct { + result1 *livekit.ParticipantInfo + result2 error + }) + } + fake.loadParticipantReturnsOnCall[i] = struct { + result1 *livekit.ParticipantInfo + result2 error + }{result1, result2} +} + +func (fake *FakeRoomStore) LoadRoom(arg1 string) (*livekit.Room, error) { + fake.loadRoomMutex.Lock() + ret, specificReturn := fake.loadRoomReturnsOnCall[len(fake.loadRoomArgsForCall)] + fake.loadRoomArgsForCall = append(fake.loadRoomArgsForCall, struct { + arg1 string + }{arg1}) + stub := fake.LoadRoomStub + fakeReturns := fake.loadRoomReturns + fake.recordInvocation("LoadRoom", []interface{}{arg1}) + fake.loadRoomMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeRoomStore) LoadRoomCallCount() int { + fake.loadRoomMutex.RLock() + defer fake.loadRoomMutex.RUnlock() + return len(fake.loadRoomArgsForCall) +} + +func (fake *FakeRoomStore) LoadRoomCalls(stub func(string) (*livekit.Room, error)) { + fake.loadRoomMutex.Lock() + defer fake.loadRoomMutex.Unlock() + fake.LoadRoomStub = stub +} + +func (fake *FakeRoomStore) LoadRoomArgsForCall(i int) string { + fake.loadRoomMutex.RLock() + defer fake.loadRoomMutex.RUnlock() + argsForCall := fake.loadRoomArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeRoomStore) LoadRoomReturns(result1 *livekit.Room, result2 error) { + fake.loadRoomMutex.Lock() + defer fake.loadRoomMutex.Unlock() + fake.LoadRoomStub = nil + fake.loadRoomReturns = struct { + result1 *livekit.Room + result2 error + }{result1, result2} +} + +func (fake *FakeRoomStore) LoadRoomReturnsOnCall(i int, result1 *livekit.Room, result2 error) { + fake.loadRoomMutex.Lock() + defer fake.loadRoomMutex.Unlock() + fake.LoadRoomStub = nil + if fake.loadRoomReturnsOnCall == nil { + fake.loadRoomReturnsOnCall = make(map[int]struct { + result1 *livekit.Room + result2 error + }) + } + fake.loadRoomReturnsOnCall[i] = struct { + result1 *livekit.Room + result2 error + }{result1, result2} +} + func (fake *FakeRoomStore) LockRoom(arg1 string, arg2 time.Duration) (string, error) { fake.lockRoomMutex.Lock() ret, specificReturn := fake.lockRoomReturnsOnCall[len(fake.lockRoomArgsForCall)] @@ -636,17 +575,17 @@ func (fake *FakeRoomStore) LockRoomReturnsOnCall(i int, result1 string, result2 }{result1, result2} } -func (fake *FakeRoomStore) PersistParticipant(arg1 string, arg2 *livekit.ParticipantInfo) error { - fake.persistParticipantMutex.Lock() - ret, specificReturn := fake.persistParticipantReturnsOnCall[len(fake.persistParticipantArgsForCall)] - fake.persistParticipantArgsForCall = append(fake.persistParticipantArgsForCall, struct { +func (fake *FakeRoomStore) StoreParticipant(arg1 string, arg2 *livekit.ParticipantInfo) error { + fake.storeParticipantMutex.Lock() + ret, specificReturn := fake.storeParticipantReturnsOnCall[len(fake.storeParticipantArgsForCall)] + fake.storeParticipantArgsForCall = append(fake.storeParticipantArgsForCall, struct { arg1 string arg2 *livekit.ParticipantInfo }{arg1, arg2}) - stub := fake.PersistParticipantStub - fakeReturns := fake.persistParticipantReturns - fake.recordInvocation("PersistParticipant", []interface{}{arg1, arg2}) - fake.persistParticipantMutex.Unlock() + stub := fake.StoreParticipantStub + fakeReturns := fake.storeParticipantReturns + fake.recordInvocation("StoreParticipant", []interface{}{arg1, arg2}) + fake.storeParticipantMutex.Unlock() if stub != nil { return stub(arg1, arg2) } @@ -656,44 +595,105 @@ func (fake *FakeRoomStore) PersistParticipant(arg1 string, arg2 *livekit.Partici return fakeReturns.result1 } -func (fake *FakeRoomStore) PersistParticipantCallCount() int { - fake.persistParticipantMutex.RLock() - defer fake.persistParticipantMutex.RUnlock() - return len(fake.persistParticipantArgsForCall) +func (fake *FakeRoomStore) StoreParticipantCallCount() int { + fake.storeParticipantMutex.RLock() + defer fake.storeParticipantMutex.RUnlock() + return len(fake.storeParticipantArgsForCall) } -func (fake *FakeRoomStore) PersistParticipantCalls(stub func(string, *livekit.ParticipantInfo) error) { - fake.persistParticipantMutex.Lock() - defer fake.persistParticipantMutex.Unlock() - fake.PersistParticipantStub = stub +func (fake *FakeRoomStore) StoreParticipantCalls(stub func(string, *livekit.ParticipantInfo) error) { + fake.storeParticipantMutex.Lock() + defer fake.storeParticipantMutex.Unlock() + fake.StoreParticipantStub = stub } -func (fake *FakeRoomStore) PersistParticipantArgsForCall(i int) (string, *livekit.ParticipantInfo) { - fake.persistParticipantMutex.RLock() - defer fake.persistParticipantMutex.RUnlock() - argsForCall := fake.persistParticipantArgsForCall[i] +func (fake *FakeRoomStore) StoreParticipantArgsForCall(i int) (string, *livekit.ParticipantInfo) { + fake.storeParticipantMutex.RLock() + defer fake.storeParticipantMutex.RUnlock() + argsForCall := fake.storeParticipantArgsForCall[i] return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeRoomStore) PersistParticipantReturns(result1 error) { - fake.persistParticipantMutex.Lock() - defer fake.persistParticipantMutex.Unlock() - fake.PersistParticipantStub = nil - fake.persistParticipantReturns = struct { +func (fake *FakeRoomStore) StoreParticipantReturns(result1 error) { + fake.storeParticipantMutex.Lock() + defer fake.storeParticipantMutex.Unlock() + fake.StoreParticipantStub = nil + fake.storeParticipantReturns = struct { result1 error }{result1} } -func (fake *FakeRoomStore) PersistParticipantReturnsOnCall(i int, result1 error) { - fake.persistParticipantMutex.Lock() - defer fake.persistParticipantMutex.Unlock() - fake.PersistParticipantStub = nil - if fake.persistParticipantReturnsOnCall == nil { - fake.persistParticipantReturnsOnCall = make(map[int]struct { +func (fake *FakeRoomStore) StoreParticipantReturnsOnCall(i int, result1 error) { + fake.storeParticipantMutex.Lock() + defer fake.storeParticipantMutex.Unlock() + fake.StoreParticipantStub = nil + if fake.storeParticipantReturnsOnCall == nil { + fake.storeParticipantReturnsOnCall = make(map[int]struct { result1 error }) } - fake.persistParticipantReturnsOnCall[i] = struct { + fake.storeParticipantReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeRoomStore) StoreRoom(arg1 *livekit.Room) error { + fake.storeRoomMutex.Lock() + ret, specificReturn := fake.storeRoomReturnsOnCall[len(fake.storeRoomArgsForCall)] + fake.storeRoomArgsForCall = append(fake.storeRoomArgsForCall, struct { + arg1 *livekit.Room + }{arg1}) + stub := fake.StoreRoomStub + fakeReturns := fake.storeRoomReturns + fake.recordInvocation("StoreRoom", []interface{}{arg1}) + fake.storeRoomMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeRoomStore) StoreRoomCallCount() int { + fake.storeRoomMutex.RLock() + defer fake.storeRoomMutex.RUnlock() + return len(fake.storeRoomArgsForCall) +} + +func (fake *FakeRoomStore) StoreRoomCalls(stub func(*livekit.Room) error) { + fake.storeRoomMutex.Lock() + defer fake.storeRoomMutex.Unlock() + fake.StoreRoomStub = stub +} + +func (fake *FakeRoomStore) StoreRoomArgsForCall(i int) *livekit.Room { + fake.storeRoomMutex.RLock() + defer fake.storeRoomMutex.RUnlock() + argsForCall := fake.storeRoomArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeRoomStore) StoreRoomReturns(result1 error) { + fake.storeRoomMutex.Lock() + defer fake.storeRoomMutex.Unlock() + fake.StoreRoomStub = nil + fake.storeRoomReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeRoomStore) StoreRoomReturnsOnCall(i int, result1 error) { + fake.storeRoomMutex.Lock() + defer fake.storeRoomMutex.Unlock() + fake.StoreRoomStub = nil + if fake.storeRoomReturnsOnCall == nil { + fake.storeRoomReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.storeRoomReturnsOnCall[i] = struct { result1 error }{result1} } @@ -763,24 +763,24 @@ func (fake *FakeRoomStore) UnlockRoomReturnsOnCall(i int, result1 error) { func (fake *FakeRoomStore) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() - fake.createRoomMutex.RLock() - defer fake.createRoomMutex.RUnlock() fake.deleteParticipantMutex.RLock() defer fake.deleteParticipantMutex.RUnlock() fake.deleteRoomMutex.RLock() defer fake.deleteRoomMutex.RUnlock() - fake.getParticipantMutex.RLock() - defer fake.getParticipantMutex.RUnlock() - fake.getRoomMutex.RLock() - defer fake.getRoomMutex.RUnlock() fake.listParticipantsMutex.RLock() defer fake.listParticipantsMutex.RUnlock() fake.listRoomsMutex.RLock() defer fake.listRoomsMutex.RUnlock() + fake.loadParticipantMutex.RLock() + defer fake.loadParticipantMutex.RUnlock() + fake.loadRoomMutex.RLock() + defer fake.loadRoomMutex.RUnlock() fake.lockRoomMutex.RLock() defer fake.lockRoomMutex.RUnlock() - fake.persistParticipantMutex.RLock() - defer fake.persistParticipantMutex.RUnlock() + fake.storeParticipantMutex.RLock() + defer fake.storeParticipantMutex.RUnlock() + fake.storeRoomMutex.RLock() + defer fake.storeRoomMutex.RUnlock() fake.unlockRoomMutex.RLock() defer fake.unlockRoomMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} diff --git a/pkg/service/turn.go b/pkg/service/turn.go index 1cffd7537..2767dc609 100644 --- a/pkg/service/turn.go +++ b/pkg/service/turn.go @@ -96,11 +96,11 @@ func NewTurnServer(conf *config.Config, roomStore RoomStore, node routing.LocalN func newTurnAuthHandler(roomStore RoomStore) turn.AuthHandler { return func(username, realm string, srcAddr net.Addr) (key []byte, ok bool) { // room id should be the username, create a hashed room id - rm, err := roomStore.GetRoom(username) + rm, err := roomStore.LoadRoom(username) if err != nil { return nil, false } return turn.GenerateAuthKey(username, livekitRealm, rm.TurnPassword), true } -} \ No newline at end of file +} diff --git a/pkg/service/utils.go b/pkg/service/utils.go index 7c821471f..aff1b9d5c 100644 --- a/pkg/service/utils.go +++ b/pkg/service/utils.go @@ -27,9 +27,10 @@ var ServiceSet = wire.NewSet( NewRoomService, NewRTCService, NewLivekitServer, - NewRoomManager, + NewLocalRoomManager, NewTurnServer, config.GetAudioConfig, + wire.Bind(new(RoomManager), new(*LocalRoomManager)), wire.Bind(new(livekit.RecordingService), new(*RecordingService)), wire.Bind(new(livekit.RoomService), new(*RoomService)), ) diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index d5d3122f1..f4f166b86 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -25,21 +25,21 @@ func InitializeServer(conf *config.Config, keyProvider auth.KeyProvider, current if err != nil { return nil, err } - roomManager, err := NewRoomManager(roomStore, router, currentNode, nodeSelector, notifier, conf) + localRoomManager, err := NewLocalRoomManager(roomStore, router, currentNode, nodeSelector, notifier, conf) if err != nil { return nil, err } - roomService, err := NewRoomService(roomManager) + roomService, err := NewRoomService(localRoomManager, router) if err != nil { return nil, err } recordingService := NewRecordingService(client) - rtcService := NewRTCService(conf, roomManager, router, currentNode) + rtcService := NewRTCService(conf, localRoomManager, router, currentNode) server, err := NewTurnServer(conf, roomStore, currentNode) if err != nil { return nil, err } - livekitServer, err := NewLivekitServer(conf, roomService, recordingService, rtcService, keyProvider, router, roomManager, server, currentNode) + livekitServer, err := NewLivekitServer(conf, roomService, recordingService, rtcService, keyProvider, router, localRoomManager, server, currentNode) if err != nil { return nil, err }