mirror of
https://github.com/livekit/livekit.git
synced 2026-05-15 07:25:30 +00:00
Interfaces (#97)
* create interface * move room manager to interface * interfaces * updates * fix loop * fix fakes * remove node types
This commit is contained in:
@@ -14,7 +14,7 @@ require (
|
||||
github.com/google/wire v0.5.0
|
||||
github.com/gorilla/websocket v1.4.2
|
||||
github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b
|
||||
github.com/livekit/protocol v0.8.0
|
||||
github.com/livekit/protocol v0.8.1
|
||||
github.com/magefile/mage v1.11.0
|
||||
github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0
|
||||
github.com/mitchellh/go-homedir v1.1.0
|
||||
|
||||
@@ -241,8 +241,8 @@ github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY
|
||||
github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
|
||||
github.com/livekit/ion-sfu v1.20.8 h1:nNyih1cq82dmuBv41XKH3qTDFtRvTnHwCKmUBFGz/Vc=
|
||||
github.com/livekit/ion-sfu v1.20.8/go.mod h1:g8hwobZI5fvX1RXvayf4ZXkgP7spV5YGE4yTSsumpB4=
|
||||
github.com/livekit/protocol v0.8.0 h1:cfiSy12WUozFBLi0dE6LEoignJOOPra0Fvf/5MJOI1I=
|
||||
github.com/livekit/protocol v0.8.0/go.mod h1:OczgiKz3Uo6g35oav5g/m3fAFrxd1sROWKmOj3wsVx0=
|
||||
github.com/livekit/protocol v0.8.1 h1:J4mz1C2rUM6696xc1UQJ+qpi7DJXLWl9oTR68xYTGX8=
|
||||
github.com/livekit/protocol v0.8.1/go.mod h1:YKcyBbqH0WmNL35i7c5jxr1L2Az13oT10oGNDOijI20=
|
||||
github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ=
|
||||
github.com/lucsky/cuid v1.0.2/go.mod h1:QaaJqckboimOmhRSJXSx/+IT+VTfxfPGSo/6mfgUfmE=
|
||||
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
|
||||
|
||||
@@ -0,0 +1,44 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
livekit "github.com/livekit/protocol/proto"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/routing"
|
||||
"github.com/livekit/livekit-server/pkg/rtc"
|
||||
)
|
||||
|
||||
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate
|
||||
|
||||
// encapsulates CRUD operations for room settings
|
||||
// look up participant
|
||||
//counterfeiter:generate . RoomStore
|
||||
type RoomStore interface {
|
||||
StoreRoom(room *livekit.Room) error
|
||||
LoadRoom(idOrName string) (*livekit.Room, error)
|
||||
ListRooms() ([]*livekit.Room, error)
|
||||
DeleteRoom(idOrName string) error
|
||||
|
||||
// enable locking on a specific room to prevent race
|
||||
// returns a (lock uuid, error)
|
||||
LockRoom(name string, duration time.Duration) (string, error)
|
||||
UnlockRoom(name string, uid string) error
|
||||
|
||||
StoreParticipant(roomName string, participant *livekit.ParticipantInfo) error
|
||||
LoadParticipant(roomName, identity string) (*livekit.ParticipantInfo, error)
|
||||
ListParticipants(roomName string) ([]*livekit.ParticipantInfo, error)
|
||||
DeleteParticipant(roomName, identity string) error
|
||||
}
|
||||
|
||||
type RoomManager interface {
|
||||
RoomStore
|
||||
|
||||
CreateRoom(req *livekit.CreateRoomRequest) (*livekit.Room, error)
|
||||
GetRoom(roomName string) *rtc.Room
|
||||
DeleteRoom(roomName string) error
|
||||
CleanupRooms() error
|
||||
CloseIdleRooms()
|
||||
Stop()
|
||||
StartSession(roomName string, pi routing.ParticipantInit, requestSource routing.MessageSource, responseSink routing.MessageSink)
|
||||
}
|
||||
@@ -28,7 +28,7 @@ func NewLocalRoomStore() *LocalRoomStore {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *LocalRoomStore) CreateRoom(room *livekit.Room) error {
|
||||
func (p *LocalRoomStore) StoreRoom(room *livekit.Room) error {
|
||||
if room.CreationTime == 0 {
|
||||
room.CreationTime = time.Now().Unix()
|
||||
}
|
||||
@@ -39,7 +39,7 @@ func (p *LocalRoomStore) CreateRoom(room *livekit.Room) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *LocalRoomStore) GetRoom(idOrName string) (*livekit.Room, error) {
|
||||
func (p *LocalRoomStore) LoadRoom(idOrName string) (*livekit.Room, error) {
|
||||
p.lock.RLock()
|
||||
defer p.lock.RUnlock()
|
||||
// see if it's an id or name
|
||||
@@ -65,7 +65,7 @@ func (p *LocalRoomStore) ListRooms() ([]*livekit.Room, error) {
|
||||
}
|
||||
|
||||
func (p *LocalRoomStore) DeleteRoom(idOrName string) error {
|
||||
room, err := p.GetRoom(idOrName)
|
||||
room, err := p.LoadRoom(idOrName)
|
||||
if err == ErrRoomNotFound {
|
||||
return nil
|
||||
} else if err != nil {
|
||||
@@ -92,7 +92,7 @@ func (p *LocalRoomStore) UnlockRoom(name string, uid string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *LocalRoomStore) PersistParticipant(roomName string, participant *livekit.ParticipantInfo) error {
|
||||
func (p *LocalRoomStore) StoreParticipant(roomName string, participant *livekit.ParticipantInfo) error {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
roomParticipants := p.participants[roomName]
|
||||
@@ -104,7 +104,7 @@ func (p *LocalRoomStore) PersistParticipant(roomName string, participant *liveki
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *LocalRoomStore) GetParticipant(roomName, identity string) (*livekit.ParticipantInfo, error) {
|
||||
func (p *LocalRoomStore) LoadParticipant(roomName, identity string) (*livekit.ParticipantInfo, error) {
|
||||
p.lock.RLock()
|
||||
defer p.lock.RUnlock()
|
||||
|
||||
|
||||
@@ -38,7 +38,7 @@ func NewRedisRoomStore(rc *redis.Client) *RedisRoomStore {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *RedisRoomStore) CreateRoom(room *livekit.Room) error {
|
||||
func (p *RedisRoomStore) StoreRoom(room *livekit.Room) error {
|
||||
if room.CreationTime == 0 {
|
||||
room.CreationTime = time.Now().Unix()
|
||||
}
|
||||
@@ -58,7 +58,7 @@ func (p *RedisRoomStore) CreateRoom(room *livekit.Room) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *RedisRoomStore) GetRoom(idOrName string) (*livekit.Room, error) {
|
||||
func (p *RedisRoomStore) LoadRoom(idOrName string) (*livekit.Room, error) {
|
||||
// see if matches any ids
|
||||
name, err := p.rc.HGet(p.ctx, RoomIdMap, idOrName).Result()
|
||||
if err != nil {
|
||||
@@ -102,7 +102,7 @@ func (p *RedisRoomStore) ListRooms() ([]*livekit.Room, error) {
|
||||
}
|
||||
|
||||
func (p *RedisRoomStore) DeleteRoom(idOrName string) error {
|
||||
room, err := p.GetRoom(idOrName)
|
||||
room, err := p.LoadRoom(idOrName)
|
||||
var sid, name string
|
||||
|
||||
if err == ErrRoomNotFound {
|
||||
@@ -167,7 +167,7 @@ func (p *RedisRoomStore) UnlockRoom(name string, uid string) error {
|
||||
return p.rc.Del(p.ctx, key).Err()
|
||||
}
|
||||
|
||||
func (p *RedisRoomStore) PersistParticipant(roomName string, participant *livekit.ParticipantInfo) error {
|
||||
func (p *RedisRoomStore) StoreParticipant(roomName string, participant *livekit.ParticipantInfo) error {
|
||||
key := RoomParticipantsPrefix + roomName
|
||||
|
||||
data, err := proto.Marshal(participant)
|
||||
@@ -178,7 +178,7 @@ func (p *RedisRoomStore) PersistParticipant(roomName string, participant *liveki
|
||||
return p.rc.HSet(p.ctx, key, participant.Identity, data).Err()
|
||||
}
|
||||
|
||||
func (p *RedisRoomStore) GetParticipant(roomName, identity string) (*livekit.ParticipantInfo, error) {
|
||||
func (p *RedisRoomStore) LoadParticipant(roomName, identity string) (*livekit.ParticipantInfo, error) {
|
||||
key := RoomParticipantsPrefix + roomName
|
||||
data, err := p.rc.HGet(p.ctx, key, identity).Result()
|
||||
if err == redis.Nil {
|
||||
|
||||
@@ -32,10 +32,10 @@ func TestParticipantPersistence(t *testing.T) {
|
||||
}
|
||||
|
||||
// create the participant
|
||||
require.NoError(t, rs.PersistParticipant(roomName, p))
|
||||
require.NoError(t, rs.StoreParticipant(roomName, p))
|
||||
|
||||
// result should match
|
||||
pGet, err := rs.GetParticipant(roomName, p.Identity)
|
||||
pGet, err := rs.LoadParticipant(roomName, p.Identity)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, p.Identity, pGet.Identity)
|
||||
require.Equal(t, len(p.Tracks), len(pGet.Tracks))
|
||||
@@ -54,7 +54,7 @@ func TestParticipantPersistence(t *testing.T) {
|
||||
require.Len(t, participants, 0)
|
||||
|
||||
// shouldn't be able to get it
|
||||
_, err = rs.GetParticipant(roomName, p.Identity)
|
||||
_, err = rs.LoadParticipant(roomName, p.Identity)
|
||||
require.Equal(t, err, service.ErrParticipantNotFound)
|
||||
}
|
||||
|
||||
|
||||
+29
-28
@@ -22,11 +22,12 @@ const (
|
||||
roomPurgeSeconds = 24 * 60 * 60
|
||||
)
|
||||
|
||||
// RoomManager manages rooms and its interaction with participants.
|
||||
// LocalRoomManager manages rooms and its interaction with participants.
|
||||
// It's responsible for creating, deleting rooms, as well as running sessions for participants
|
||||
type RoomManager struct {
|
||||
type LocalRoomManager struct {
|
||||
RoomStore
|
||||
|
||||
lock sync.RWMutex
|
||||
roomStore RoomStore
|
||||
selector routing.NodeSelector
|
||||
router routing.Router
|
||||
currentNode routing.LocalNode
|
||||
@@ -37,16 +38,16 @@ type RoomManager struct {
|
||||
rooms map[string]*rtc.Room
|
||||
}
|
||||
|
||||
func NewRoomManager(rp RoomStore, router routing.Router, currentNode routing.LocalNode, selector routing.NodeSelector,
|
||||
notifier *webhook.Notifier, conf *config.Config) (*RoomManager, error) {
|
||||
func NewLocalRoomManager(rp RoomStore, router routing.Router, currentNode routing.LocalNode, selector routing.NodeSelector,
|
||||
notifier *webhook.Notifier, conf *config.Config) (*LocalRoomManager, error) {
|
||||
rtcConf, err := rtc.NewWebRTCConfig(conf, currentNode.Ip)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &RoomManager{
|
||||
return &LocalRoomManager{
|
||||
RoomStore: rp,
|
||||
lock: sync.RWMutex{},
|
||||
roomStore: rp,
|
||||
rtcConfig: rtcConf,
|
||||
config: conf,
|
||||
router: router,
|
||||
@@ -60,17 +61,17 @@ func NewRoomManager(rp RoomStore, router routing.Router, currentNode routing.Loc
|
||||
|
||||
// CreateRoom creates a new room from a request and allocates it to a node to handle
|
||||
// it'll also monitor fits state, and cleans it up when appropriate
|
||||
func (r *RoomManager) CreateRoom(req *livekit.CreateRoomRequest) (*livekit.Room, error) {
|
||||
token, err := r.roomStore.LockRoom(req.Name, 5*time.Second)
|
||||
func (r *LocalRoomManager) CreateRoom(req *livekit.CreateRoomRequest) (*livekit.Room, error) {
|
||||
token, err := r.LockRoom(req.Name, 5*time.Second)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
_ = r.roomStore.UnlockRoom(req.Name, token)
|
||||
_ = r.UnlockRoom(req.Name, token)
|
||||
}()
|
||||
|
||||
// find existing room and update it
|
||||
rm, err := r.roomStore.GetRoom(req.Name)
|
||||
rm, err := r.LoadRoom(req.Name)
|
||||
if err == ErrRoomNotFound {
|
||||
rm = &livekit.Room{
|
||||
Sid: utils.NewGuid(utils.RoomPrefix),
|
||||
@@ -89,7 +90,7 @@ func (r *RoomManager) CreateRoom(req *livekit.CreateRoomRequest) (*livekit.Room,
|
||||
if req.MaxParticipants > 0 {
|
||||
rm.MaxParticipants = req.MaxParticipants
|
||||
}
|
||||
if err := r.roomStore.CreateRoom(rm); err != nil {
|
||||
if err := r.StoreRoom(rm); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -128,14 +129,14 @@ func (r *RoomManager) CreateRoom(req *livekit.CreateRoomRequest) (*livekit.Room,
|
||||
return rm, nil
|
||||
}
|
||||
|
||||
func (r *RoomManager) GetRoom(roomName string) *rtc.Room {
|
||||
func (r *LocalRoomManager) GetRoom(roomName string) *rtc.Room {
|
||||
r.lock.RLock()
|
||||
defer r.lock.RUnlock()
|
||||
return r.rooms[roomName]
|
||||
}
|
||||
|
||||
// DeleteRoom completely deletes all room information, including active sessions, room store, and routing info
|
||||
func (r *RoomManager) DeleteRoom(roomName string) error {
|
||||
func (r *LocalRoomManager) DeleteRoom(roomName string) error {
|
||||
logger.Infow("deleting room state", "room", roomName)
|
||||
r.lock.Lock()
|
||||
delete(r.rooms, roomName)
|
||||
@@ -152,7 +153,7 @@ func (r *RoomManager) DeleteRoom(roomName string) error {
|
||||
// also delete room from db
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
err2 = r.roomStore.DeleteRoom(roomName)
|
||||
err2 = r.RoomStore.DeleteRoom(roomName)
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
@@ -164,9 +165,9 @@ func (r *RoomManager) DeleteRoom(roomName string) error {
|
||||
}
|
||||
|
||||
// CleanupRooms cleans up after old rooms that have been around for awhile
|
||||
func (r *RoomManager) CleanupRooms() error {
|
||||
func (r *LocalRoomManager) CleanupRooms() error {
|
||||
// cleanup rooms that have been left for over a day
|
||||
rooms, err := r.roomStore.ListRooms()
|
||||
rooms, err := r.ListRooms()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -182,7 +183,7 @@ func (r *RoomManager) CleanupRooms() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *RoomManager) CloseIdleRooms() {
|
||||
func (r *LocalRoomManager) CloseIdleRooms() {
|
||||
r.lock.RLock()
|
||||
rooms := make([]*rtc.Room, 0, len(r.rooms))
|
||||
for _, rm := range r.rooms {
|
||||
@@ -195,7 +196,7 @@ func (r *RoomManager) CloseIdleRooms() {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RoomManager) Stop() {
|
||||
func (r *LocalRoomManager) Stop() {
|
||||
// disconnect all clients
|
||||
r.lock.RLock()
|
||||
rooms := make([]*rtc.Room, 0, len(r.rooms))
|
||||
@@ -222,7 +223,7 @@ func (r *RoomManager) Stop() {
|
||||
}
|
||||
|
||||
// StartSession starts WebRTC session when a new participant is connected, takes place on RTC node
|
||||
func (r *RoomManager) StartSession(roomName string, pi routing.ParticipantInit, requestSource routing.MessageSource, responseSink routing.MessageSink) {
|
||||
func (r *LocalRoomManager) StartSession(roomName string, pi routing.ParticipantInit, requestSource routing.MessageSource, responseSink routing.MessageSink) {
|
||||
room, err := r.getOrCreateRoom(roomName)
|
||||
if err != nil {
|
||||
logger.Errorw("could not create room", err, "room", roomName)
|
||||
@@ -325,7 +326,7 @@ func (r *RoomManager) StartSession(roomName string, pi routing.ParticipantInit,
|
||||
}
|
||||
|
||||
// create the actual room object, to be used on RTC node
|
||||
func (r *RoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) {
|
||||
func (r *LocalRoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) {
|
||||
r.lock.RLock()
|
||||
room := r.rooms[roomName]
|
||||
r.lock.RUnlock()
|
||||
@@ -335,7 +336,7 @@ func (r *RoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) {
|
||||
}
|
||||
|
||||
// create new room, get details first
|
||||
ri, err := r.roomStore.GetRoom(roomName)
|
||||
ri, err := r.LoadRoom(roomName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -361,9 +362,9 @@ func (r *RoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) {
|
||||
room.OnParticipantChanged(func(p types.Participant) {
|
||||
var err error
|
||||
if p.State() == livekit.ParticipantInfo_DISCONNECTED {
|
||||
err = r.roomStore.DeleteParticipant(roomName, p.Identity())
|
||||
err = r.DeleteParticipant(roomName, p.Identity())
|
||||
} else {
|
||||
err = r.roomStore.PersistParticipant(roomName, p.ToProto())
|
||||
err = r.StoreParticipant(roomName, p.ToProto())
|
||||
}
|
||||
if err != nil {
|
||||
logger.Errorw("could not handle participant change", err)
|
||||
@@ -382,7 +383,7 @@ func (r *RoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) {
|
||||
}
|
||||
|
||||
// manages an RTC session for a participant, runs on the RTC node
|
||||
func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Participant, requestSource routing.MessageSource) {
|
||||
func (r *LocalRoomManager) rtcSessionWorker(room *rtc.Room, participant types.Participant, requestSource routing.MessageSource) {
|
||||
defer func() {
|
||||
logger.Debugw("RTC session finishing",
|
||||
"participant", participant.Identity(),
|
||||
@@ -482,7 +483,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Partici
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RoomManager) handleRTCMessage(roomName, identity string, msg *livekit.RTCNodeMessage) {
|
||||
func (r *LocalRoomManager) handleRTCMessage(roomName, identity string, msg *livekit.RTCNodeMessage) {
|
||||
r.lock.RLock()
|
||||
room := r.rooms[roomName]
|
||||
r.lock.RUnlock()
|
||||
@@ -537,7 +538,7 @@ func (r *RoomManager) handleRTCMessage(roomName, identity string, msg *livekit.R
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RoomManager) iceServersForRoom(ri *livekit.Room) []*livekit.ICEServer {
|
||||
func (r *LocalRoomManager) iceServersForRoom(ri *livekit.Room) []*livekit.ICEServer {
|
||||
var iceServers []*livekit.ICEServer
|
||||
|
||||
hasSTUN := false
|
||||
@@ -571,7 +572,7 @@ func (r *RoomManager) iceServersForRoom(ri *livekit.Room) []*livekit.ICEServer {
|
||||
return iceServers
|
||||
}
|
||||
|
||||
func (r *RoomManager) notifyEvent(event *livekit.WebhookEvent) {
|
||||
func (r *LocalRoomManager) notifyEvent(event *livekit.WebhookEvent) {
|
||||
if r.notifier == nil {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -24,9 +24,9 @@ func TestCreateRoom(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func newTestRoomManager(t *testing.T) (*service.RoomManager, *config.Config) {
|
||||
func newTestRoomManager(t *testing.T) (*service.LocalRoomManager, *config.Config) {
|
||||
store := &servicefakes.FakeRoomStore{}
|
||||
store.GetRoomReturns(nil, service.ErrRoomNotFound)
|
||||
store.LoadRoomReturns(nil, service.ErrRoomNotFound)
|
||||
router := &routingfakes.FakeRouter{}
|
||||
conf, err := config.NewConfig("", nil)
|
||||
require.NoError(t, err)
|
||||
@@ -36,7 +36,7 @@ func newTestRoomManager(t *testing.T) (*service.RoomManager, *config.Config) {
|
||||
|
||||
router.GetNodeForRoomReturns(node, nil)
|
||||
|
||||
rm, err := service.NewRoomManager(store, router, node, selector, nil, conf)
|
||||
rm, err := service.NewLocalRoomManager(store, router, node, selector, nil, conf)
|
||||
require.NoError(t, err)
|
||||
|
||||
return rm, conf
|
||||
|
||||
+16
-12
@@ -13,11 +13,15 @@ import (
|
||||
|
||||
// A rooms service that supports a single node
|
||||
type RoomService struct {
|
||||
roomManager *RoomManager
|
||||
router routing.Router
|
||||
roomManager RoomManager
|
||||
}
|
||||
|
||||
func NewRoomService(roomManager *RoomManager) (svc *RoomService, err error) {
|
||||
svc = &RoomService{roomManager: roomManager}
|
||||
func NewRoomService(roomManager RoomManager, router routing.Router) (svc *RoomService, err error) {
|
||||
svc = &RoomService{
|
||||
router: router,
|
||||
roomManager: roomManager,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -40,7 +44,7 @@ func (s *RoomService) ListRooms(ctx context.Context, req *livekit.ListRoomsReque
|
||||
return nil, twirpAuthError(err)
|
||||
}
|
||||
|
||||
rooms, err := s.roomManager.roomStore.ListRooms()
|
||||
rooms, err := s.roomManager.ListRooms()
|
||||
if err != nil {
|
||||
// TODO: translate error codes to twirp
|
||||
return
|
||||
@@ -58,7 +62,7 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq
|
||||
}
|
||||
// if the room is currently active, RTC node needs to disconnect clients
|
||||
// here we are using any user's identity, due to how it works with routing
|
||||
participants, err := s.roomManager.roomStore.ListParticipants(req.Room)
|
||||
participants, err := s.roomManager.ListParticipants(req.Room)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -88,7 +92,7 @@ func (s *RoomService) ListParticipants(ctx context.Context, req *livekit.ListPar
|
||||
return nil, twirpAuthError(err)
|
||||
}
|
||||
|
||||
participants, err := s.roomManager.roomStore.ListParticipants(req.Room)
|
||||
participants, err := s.roomManager.ListParticipants(req.Room)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -104,7 +108,7 @@ func (s *RoomService) GetParticipant(ctx context.Context, req *livekit.RoomParti
|
||||
return nil, twirpAuthError(err)
|
||||
}
|
||||
|
||||
participant, err := s.roomManager.roomStore.GetParticipant(req.Room, req.Identity)
|
||||
participant, err := s.roomManager.LoadParticipant(req.Room, req.Identity)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -132,7 +136,7 @@ func (s *RoomService) MutePublishedTrack(ctx context.Context, req *livekit.MuteR
|
||||
return nil, twirpAuthError(err)
|
||||
}
|
||||
|
||||
participant, err := s.roomManager.roomStore.GetParticipant(req.Room, req.Identity)
|
||||
participant, err := s.roomManager.LoadParticipant(req.Room, req.Identity)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -172,7 +176,7 @@ func (s *RoomService) UpdateParticipant(ctx context.Context, req *livekit.Update
|
||||
return nil, err
|
||||
}
|
||||
|
||||
participant, err := s.roomManager.roomStore.GetParticipant(req.Room, req.Identity)
|
||||
participant, err := s.roomManager.LoadParticipant(req.Room, req.Identity)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -196,7 +200,7 @@ func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.Upda
|
||||
|
||||
func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest) (*livekit.SendDataResponse, error) {
|
||||
// here we are using any user's identity, due to how it works with routing
|
||||
participants, err := s.roomManager.roomStore.ListParticipants(req.Room)
|
||||
participants, err := s.roomManager.ListParticipants(req.Room)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -220,12 +224,12 @@ func (s *RoomService) createRTCSink(ctx context.Context, room, identity string)
|
||||
return nil, twirpAuthError(err)
|
||||
}
|
||||
|
||||
_, err := s.roomManager.roomStore.GetParticipant(room, identity)
|
||||
_, err := s.roomManager.LoadParticipant(room, identity)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return s.roomManager.router.CreateRTCSink(room, identity)
|
||||
return s.router.CreateRTCSink(room, identity)
|
||||
}
|
||||
|
||||
func (s *RoomService) writeMessage(ctx context.Context, room, identity string, msg *livekit.RTCNodeMessage) error {
|
||||
|
||||
@@ -1,29 +0,0 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
livekit "github.com/livekit/protocol/proto"
|
||||
)
|
||||
|
||||
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate
|
||||
|
||||
// encapsulates CRUD operations for room settings
|
||||
// look up participant
|
||||
//counterfeiter:generate . RoomStore
|
||||
type RoomStore interface {
|
||||
CreateRoom(room *livekit.Room) error
|
||||
GetRoom(idOrName string) (*livekit.Room, error)
|
||||
ListRooms() ([]*livekit.Room, error)
|
||||
DeleteRoom(idOrName string) error
|
||||
|
||||
// enable locking on a specific room to prevent race
|
||||
// returns a (lock uuid, error)
|
||||
LockRoom(name string, duration time.Duration) (string, error)
|
||||
UnlockRoom(name string, uid string) error
|
||||
|
||||
PersistParticipant(roomName string, participant *livekit.ParticipantInfo) error
|
||||
GetParticipant(roomName, identity string) (*livekit.ParticipantInfo, error)
|
||||
ListParticipants(roomName string) ([]*livekit.ParticipantInfo, error)
|
||||
DeleteParticipant(roomName, identity string) error
|
||||
}
|
||||
@@ -19,13 +19,13 @@ import (
|
||||
|
||||
type RTCService struct {
|
||||
router routing.Router
|
||||
roomManager *RoomManager
|
||||
roomManager RoomManager
|
||||
upgrader websocket.Upgrader
|
||||
currentNode routing.LocalNode
|
||||
isDev bool
|
||||
}
|
||||
|
||||
func NewRTCService(conf *config.Config, roomManager *RoomManager, router routing.Router, currentNode routing.LocalNode) *RTCService {
|
||||
func NewRTCService(conf *config.Config, roomManager RoomManager, router routing.Router, currentNode routing.LocalNode) *RTCService {
|
||||
s := &RTCService{
|
||||
router: router,
|
||||
roomManager: roomManager,
|
||||
|
||||
@@ -31,7 +31,7 @@ type LivekitServer struct {
|
||||
httpServer *http.Server
|
||||
promServer *http.Server
|
||||
router routing.Router
|
||||
roomManager *RoomManager
|
||||
roomManager *LocalRoomManager
|
||||
turnServer *turn.Server
|
||||
currentNode routing.LocalNode
|
||||
running utils.AtomicFlag
|
||||
@@ -45,7 +45,7 @@ func NewLivekitServer(conf *config.Config,
|
||||
rtcService *RTCService,
|
||||
keyProvider auth.KeyProvider,
|
||||
router routing.Router,
|
||||
roomManager *RoomManager,
|
||||
roomManager *LocalRoomManager,
|
||||
turnServer *turn.Server,
|
||||
currentNode routing.LocalNode,
|
||||
) (s *LivekitServer, err error) {
|
||||
@@ -219,7 +219,7 @@ func (s *LivekitServer) Stop() {
|
||||
<-s.closedChan
|
||||
}
|
||||
|
||||
func (s *LivekitServer) RoomManager() *RoomManager {
|
||||
func (s *LivekitServer) RoomManager() RoomManager {
|
||||
return s.roomManager
|
||||
}
|
||||
|
||||
|
||||
@@ -10,17 +10,6 @@ import (
|
||||
)
|
||||
|
||||
type FakeRoomStore struct {
|
||||
CreateRoomStub func(*livekit.Room) error
|
||||
createRoomMutex sync.RWMutex
|
||||
createRoomArgsForCall []struct {
|
||||
arg1 *livekit.Room
|
||||
}
|
||||
createRoomReturns struct {
|
||||
result1 error
|
||||
}
|
||||
createRoomReturnsOnCall map[int]struct {
|
||||
result1 error
|
||||
}
|
||||
DeleteParticipantStub func(string, string) error
|
||||
deleteParticipantMutex sync.RWMutex
|
||||
deleteParticipantArgsForCall []struct {
|
||||
@@ -44,33 +33,6 @@ type FakeRoomStore struct {
|
||||
deleteRoomReturnsOnCall map[int]struct {
|
||||
result1 error
|
||||
}
|
||||
GetParticipantStub func(string, string) (*livekit.ParticipantInfo, error)
|
||||
getParticipantMutex sync.RWMutex
|
||||
getParticipantArgsForCall []struct {
|
||||
arg1 string
|
||||
arg2 string
|
||||
}
|
||||
getParticipantReturns struct {
|
||||
result1 *livekit.ParticipantInfo
|
||||
result2 error
|
||||
}
|
||||
getParticipantReturnsOnCall map[int]struct {
|
||||
result1 *livekit.ParticipantInfo
|
||||
result2 error
|
||||
}
|
||||
GetRoomStub func(string) (*livekit.Room, error)
|
||||
getRoomMutex sync.RWMutex
|
||||
getRoomArgsForCall []struct {
|
||||
arg1 string
|
||||
}
|
||||
getRoomReturns struct {
|
||||
result1 *livekit.Room
|
||||
result2 error
|
||||
}
|
||||
getRoomReturnsOnCall map[int]struct {
|
||||
result1 *livekit.Room
|
||||
result2 error
|
||||
}
|
||||
ListParticipantsStub func(string) ([]*livekit.ParticipantInfo, error)
|
||||
listParticipantsMutex sync.RWMutex
|
||||
listParticipantsArgsForCall []struct {
|
||||
@@ -96,6 +58,33 @@ type FakeRoomStore struct {
|
||||
result1 []*livekit.Room
|
||||
result2 error
|
||||
}
|
||||
LoadParticipantStub func(string, string) (*livekit.ParticipantInfo, error)
|
||||
loadParticipantMutex sync.RWMutex
|
||||
loadParticipantArgsForCall []struct {
|
||||
arg1 string
|
||||
arg2 string
|
||||
}
|
||||
loadParticipantReturns struct {
|
||||
result1 *livekit.ParticipantInfo
|
||||
result2 error
|
||||
}
|
||||
loadParticipantReturnsOnCall map[int]struct {
|
||||
result1 *livekit.ParticipantInfo
|
||||
result2 error
|
||||
}
|
||||
LoadRoomStub func(string) (*livekit.Room, error)
|
||||
loadRoomMutex sync.RWMutex
|
||||
loadRoomArgsForCall []struct {
|
||||
arg1 string
|
||||
}
|
||||
loadRoomReturns struct {
|
||||
result1 *livekit.Room
|
||||
result2 error
|
||||
}
|
||||
loadRoomReturnsOnCall map[int]struct {
|
||||
result1 *livekit.Room
|
||||
result2 error
|
||||
}
|
||||
LockRoomStub func(string, time.Duration) (string, error)
|
||||
lockRoomMutex sync.RWMutex
|
||||
lockRoomArgsForCall []struct {
|
||||
@@ -110,16 +99,27 @@ type FakeRoomStore struct {
|
||||
result1 string
|
||||
result2 error
|
||||
}
|
||||
PersistParticipantStub func(string, *livekit.ParticipantInfo) error
|
||||
persistParticipantMutex sync.RWMutex
|
||||
persistParticipantArgsForCall []struct {
|
||||
StoreParticipantStub func(string, *livekit.ParticipantInfo) error
|
||||
storeParticipantMutex sync.RWMutex
|
||||
storeParticipantArgsForCall []struct {
|
||||
arg1 string
|
||||
arg2 *livekit.ParticipantInfo
|
||||
}
|
||||
persistParticipantReturns struct {
|
||||
storeParticipantReturns struct {
|
||||
result1 error
|
||||
}
|
||||
persistParticipantReturnsOnCall map[int]struct {
|
||||
storeParticipantReturnsOnCall map[int]struct {
|
||||
result1 error
|
||||
}
|
||||
StoreRoomStub func(*livekit.Room) error
|
||||
storeRoomMutex sync.RWMutex
|
||||
storeRoomArgsForCall []struct {
|
||||
arg1 *livekit.Room
|
||||
}
|
||||
storeRoomReturns struct {
|
||||
result1 error
|
||||
}
|
||||
storeRoomReturnsOnCall map[int]struct {
|
||||
result1 error
|
||||
}
|
||||
UnlockRoomStub func(string, string) error
|
||||
@@ -138,67 +138,6 @@ type FakeRoomStore struct {
|
||||
invocationsMutex sync.RWMutex
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) CreateRoom(arg1 *livekit.Room) error {
|
||||
fake.createRoomMutex.Lock()
|
||||
ret, specificReturn := fake.createRoomReturnsOnCall[len(fake.createRoomArgsForCall)]
|
||||
fake.createRoomArgsForCall = append(fake.createRoomArgsForCall, struct {
|
||||
arg1 *livekit.Room
|
||||
}{arg1})
|
||||
stub := fake.CreateRoomStub
|
||||
fakeReturns := fake.createRoomReturns
|
||||
fake.recordInvocation("CreateRoom", []interface{}{arg1})
|
||||
fake.createRoomMutex.Unlock()
|
||||
if stub != nil {
|
||||
return stub(arg1)
|
||||
}
|
||||
if specificReturn {
|
||||
return ret.result1
|
||||
}
|
||||
return fakeReturns.result1
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) CreateRoomCallCount() int {
|
||||
fake.createRoomMutex.RLock()
|
||||
defer fake.createRoomMutex.RUnlock()
|
||||
return len(fake.createRoomArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) CreateRoomCalls(stub func(*livekit.Room) error) {
|
||||
fake.createRoomMutex.Lock()
|
||||
defer fake.createRoomMutex.Unlock()
|
||||
fake.CreateRoomStub = stub
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) CreateRoomArgsForCall(i int) *livekit.Room {
|
||||
fake.createRoomMutex.RLock()
|
||||
defer fake.createRoomMutex.RUnlock()
|
||||
argsForCall := fake.createRoomArgsForCall[i]
|
||||
return argsForCall.arg1
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) CreateRoomReturns(result1 error) {
|
||||
fake.createRoomMutex.Lock()
|
||||
defer fake.createRoomMutex.Unlock()
|
||||
fake.CreateRoomStub = nil
|
||||
fake.createRoomReturns = struct {
|
||||
result1 error
|
||||
}{result1}
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) CreateRoomReturnsOnCall(i int, result1 error) {
|
||||
fake.createRoomMutex.Lock()
|
||||
defer fake.createRoomMutex.Unlock()
|
||||
fake.CreateRoomStub = nil
|
||||
if fake.createRoomReturnsOnCall == nil {
|
||||
fake.createRoomReturnsOnCall = make(map[int]struct {
|
||||
result1 error
|
||||
})
|
||||
}
|
||||
fake.createRoomReturnsOnCall[i] = struct {
|
||||
result1 error
|
||||
}{result1}
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) DeleteParticipant(arg1 string, arg2 string) error {
|
||||
fake.deleteParticipantMutex.Lock()
|
||||
ret, specificReturn := fake.deleteParticipantReturnsOnCall[len(fake.deleteParticipantArgsForCall)]
|
||||
@@ -322,135 +261,6 @@ func (fake *FakeRoomStore) DeleteRoomReturnsOnCall(i int, result1 error) {
|
||||
}{result1}
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) GetParticipant(arg1 string, arg2 string) (*livekit.ParticipantInfo, error) {
|
||||
fake.getParticipantMutex.Lock()
|
||||
ret, specificReturn := fake.getParticipantReturnsOnCall[len(fake.getParticipantArgsForCall)]
|
||||
fake.getParticipantArgsForCall = append(fake.getParticipantArgsForCall, struct {
|
||||
arg1 string
|
||||
arg2 string
|
||||
}{arg1, arg2})
|
||||
stub := fake.GetParticipantStub
|
||||
fakeReturns := fake.getParticipantReturns
|
||||
fake.recordInvocation("GetParticipant", []interface{}{arg1, arg2})
|
||||
fake.getParticipantMutex.Unlock()
|
||||
if stub != nil {
|
||||
return stub(arg1, arg2)
|
||||
}
|
||||
if specificReturn {
|
||||
return ret.result1, ret.result2
|
||||
}
|
||||
return fakeReturns.result1, fakeReturns.result2
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) GetParticipantCallCount() int {
|
||||
fake.getParticipantMutex.RLock()
|
||||
defer fake.getParticipantMutex.RUnlock()
|
||||
return len(fake.getParticipantArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) GetParticipantCalls(stub func(string, string) (*livekit.ParticipantInfo, error)) {
|
||||
fake.getParticipantMutex.Lock()
|
||||
defer fake.getParticipantMutex.Unlock()
|
||||
fake.GetParticipantStub = stub
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) GetParticipantArgsForCall(i int) (string, string) {
|
||||
fake.getParticipantMutex.RLock()
|
||||
defer fake.getParticipantMutex.RUnlock()
|
||||
argsForCall := fake.getParticipantArgsForCall[i]
|
||||
return argsForCall.arg1, argsForCall.arg2
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) GetParticipantReturns(result1 *livekit.ParticipantInfo, result2 error) {
|
||||
fake.getParticipantMutex.Lock()
|
||||
defer fake.getParticipantMutex.Unlock()
|
||||
fake.GetParticipantStub = nil
|
||||
fake.getParticipantReturns = struct {
|
||||
result1 *livekit.ParticipantInfo
|
||||
result2 error
|
||||
}{result1, result2}
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) GetParticipantReturnsOnCall(i int, result1 *livekit.ParticipantInfo, result2 error) {
|
||||
fake.getParticipantMutex.Lock()
|
||||
defer fake.getParticipantMutex.Unlock()
|
||||
fake.GetParticipantStub = nil
|
||||
if fake.getParticipantReturnsOnCall == nil {
|
||||
fake.getParticipantReturnsOnCall = make(map[int]struct {
|
||||
result1 *livekit.ParticipantInfo
|
||||
result2 error
|
||||
})
|
||||
}
|
||||
fake.getParticipantReturnsOnCall[i] = struct {
|
||||
result1 *livekit.ParticipantInfo
|
||||
result2 error
|
||||
}{result1, result2}
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) GetRoom(arg1 string) (*livekit.Room, error) {
|
||||
fake.getRoomMutex.Lock()
|
||||
ret, specificReturn := fake.getRoomReturnsOnCall[len(fake.getRoomArgsForCall)]
|
||||
fake.getRoomArgsForCall = append(fake.getRoomArgsForCall, struct {
|
||||
arg1 string
|
||||
}{arg1})
|
||||
stub := fake.GetRoomStub
|
||||
fakeReturns := fake.getRoomReturns
|
||||
fake.recordInvocation("GetRoom", []interface{}{arg1})
|
||||
fake.getRoomMutex.Unlock()
|
||||
if stub != nil {
|
||||
return stub(arg1)
|
||||
}
|
||||
if specificReturn {
|
||||
return ret.result1, ret.result2
|
||||
}
|
||||
return fakeReturns.result1, fakeReturns.result2
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) GetRoomCallCount() int {
|
||||
fake.getRoomMutex.RLock()
|
||||
defer fake.getRoomMutex.RUnlock()
|
||||
return len(fake.getRoomArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) GetRoomCalls(stub func(string) (*livekit.Room, error)) {
|
||||
fake.getRoomMutex.Lock()
|
||||
defer fake.getRoomMutex.Unlock()
|
||||
fake.GetRoomStub = stub
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) GetRoomArgsForCall(i int) string {
|
||||
fake.getRoomMutex.RLock()
|
||||
defer fake.getRoomMutex.RUnlock()
|
||||
argsForCall := fake.getRoomArgsForCall[i]
|
||||
return argsForCall.arg1
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) GetRoomReturns(result1 *livekit.Room, result2 error) {
|
||||
fake.getRoomMutex.Lock()
|
||||
defer fake.getRoomMutex.Unlock()
|
||||
fake.GetRoomStub = nil
|
||||
fake.getRoomReturns = struct {
|
||||
result1 *livekit.Room
|
||||
result2 error
|
||||
}{result1, result2}
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) GetRoomReturnsOnCall(i int, result1 *livekit.Room, result2 error) {
|
||||
fake.getRoomMutex.Lock()
|
||||
defer fake.getRoomMutex.Unlock()
|
||||
fake.GetRoomStub = nil
|
||||
if fake.getRoomReturnsOnCall == nil {
|
||||
fake.getRoomReturnsOnCall = make(map[int]struct {
|
||||
result1 *livekit.Room
|
||||
result2 error
|
||||
})
|
||||
}
|
||||
fake.getRoomReturnsOnCall[i] = struct {
|
||||
result1 *livekit.Room
|
||||
result2 error
|
||||
}{result1, result2}
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) ListParticipants(arg1 string) ([]*livekit.ParticipantInfo, error) {
|
||||
fake.listParticipantsMutex.Lock()
|
||||
ret, specificReturn := fake.listParticipantsReturnsOnCall[len(fake.listParticipantsArgsForCall)]
|
||||
@@ -571,6 +381,135 @@ func (fake *FakeRoomStore) ListRoomsReturnsOnCall(i int, result1 []*livekit.Room
|
||||
}{result1, result2}
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) LoadParticipant(arg1 string, arg2 string) (*livekit.ParticipantInfo, error) {
|
||||
fake.loadParticipantMutex.Lock()
|
||||
ret, specificReturn := fake.loadParticipantReturnsOnCall[len(fake.loadParticipantArgsForCall)]
|
||||
fake.loadParticipantArgsForCall = append(fake.loadParticipantArgsForCall, struct {
|
||||
arg1 string
|
||||
arg2 string
|
||||
}{arg1, arg2})
|
||||
stub := fake.LoadParticipantStub
|
||||
fakeReturns := fake.loadParticipantReturns
|
||||
fake.recordInvocation("LoadParticipant", []interface{}{arg1, arg2})
|
||||
fake.loadParticipantMutex.Unlock()
|
||||
if stub != nil {
|
||||
return stub(arg1, arg2)
|
||||
}
|
||||
if specificReturn {
|
||||
return ret.result1, ret.result2
|
||||
}
|
||||
return fakeReturns.result1, fakeReturns.result2
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) LoadParticipantCallCount() int {
|
||||
fake.loadParticipantMutex.RLock()
|
||||
defer fake.loadParticipantMutex.RUnlock()
|
||||
return len(fake.loadParticipantArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) LoadParticipantCalls(stub func(string, string) (*livekit.ParticipantInfo, error)) {
|
||||
fake.loadParticipantMutex.Lock()
|
||||
defer fake.loadParticipantMutex.Unlock()
|
||||
fake.LoadParticipantStub = stub
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) LoadParticipantArgsForCall(i int) (string, string) {
|
||||
fake.loadParticipantMutex.RLock()
|
||||
defer fake.loadParticipantMutex.RUnlock()
|
||||
argsForCall := fake.loadParticipantArgsForCall[i]
|
||||
return argsForCall.arg1, argsForCall.arg2
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) LoadParticipantReturns(result1 *livekit.ParticipantInfo, result2 error) {
|
||||
fake.loadParticipantMutex.Lock()
|
||||
defer fake.loadParticipantMutex.Unlock()
|
||||
fake.LoadParticipantStub = nil
|
||||
fake.loadParticipantReturns = struct {
|
||||
result1 *livekit.ParticipantInfo
|
||||
result2 error
|
||||
}{result1, result2}
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) LoadParticipantReturnsOnCall(i int, result1 *livekit.ParticipantInfo, result2 error) {
|
||||
fake.loadParticipantMutex.Lock()
|
||||
defer fake.loadParticipantMutex.Unlock()
|
||||
fake.LoadParticipantStub = nil
|
||||
if fake.loadParticipantReturnsOnCall == nil {
|
||||
fake.loadParticipantReturnsOnCall = make(map[int]struct {
|
||||
result1 *livekit.ParticipantInfo
|
||||
result2 error
|
||||
})
|
||||
}
|
||||
fake.loadParticipantReturnsOnCall[i] = struct {
|
||||
result1 *livekit.ParticipantInfo
|
||||
result2 error
|
||||
}{result1, result2}
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) LoadRoom(arg1 string) (*livekit.Room, error) {
|
||||
fake.loadRoomMutex.Lock()
|
||||
ret, specificReturn := fake.loadRoomReturnsOnCall[len(fake.loadRoomArgsForCall)]
|
||||
fake.loadRoomArgsForCall = append(fake.loadRoomArgsForCall, struct {
|
||||
arg1 string
|
||||
}{arg1})
|
||||
stub := fake.LoadRoomStub
|
||||
fakeReturns := fake.loadRoomReturns
|
||||
fake.recordInvocation("LoadRoom", []interface{}{arg1})
|
||||
fake.loadRoomMutex.Unlock()
|
||||
if stub != nil {
|
||||
return stub(arg1)
|
||||
}
|
||||
if specificReturn {
|
||||
return ret.result1, ret.result2
|
||||
}
|
||||
return fakeReturns.result1, fakeReturns.result2
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) LoadRoomCallCount() int {
|
||||
fake.loadRoomMutex.RLock()
|
||||
defer fake.loadRoomMutex.RUnlock()
|
||||
return len(fake.loadRoomArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) LoadRoomCalls(stub func(string) (*livekit.Room, error)) {
|
||||
fake.loadRoomMutex.Lock()
|
||||
defer fake.loadRoomMutex.Unlock()
|
||||
fake.LoadRoomStub = stub
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) LoadRoomArgsForCall(i int) string {
|
||||
fake.loadRoomMutex.RLock()
|
||||
defer fake.loadRoomMutex.RUnlock()
|
||||
argsForCall := fake.loadRoomArgsForCall[i]
|
||||
return argsForCall.arg1
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) LoadRoomReturns(result1 *livekit.Room, result2 error) {
|
||||
fake.loadRoomMutex.Lock()
|
||||
defer fake.loadRoomMutex.Unlock()
|
||||
fake.LoadRoomStub = nil
|
||||
fake.loadRoomReturns = struct {
|
||||
result1 *livekit.Room
|
||||
result2 error
|
||||
}{result1, result2}
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) LoadRoomReturnsOnCall(i int, result1 *livekit.Room, result2 error) {
|
||||
fake.loadRoomMutex.Lock()
|
||||
defer fake.loadRoomMutex.Unlock()
|
||||
fake.LoadRoomStub = nil
|
||||
if fake.loadRoomReturnsOnCall == nil {
|
||||
fake.loadRoomReturnsOnCall = make(map[int]struct {
|
||||
result1 *livekit.Room
|
||||
result2 error
|
||||
})
|
||||
}
|
||||
fake.loadRoomReturnsOnCall[i] = struct {
|
||||
result1 *livekit.Room
|
||||
result2 error
|
||||
}{result1, result2}
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) LockRoom(arg1 string, arg2 time.Duration) (string, error) {
|
||||
fake.lockRoomMutex.Lock()
|
||||
ret, specificReturn := fake.lockRoomReturnsOnCall[len(fake.lockRoomArgsForCall)]
|
||||
@@ -636,17 +575,17 @@ func (fake *FakeRoomStore) LockRoomReturnsOnCall(i int, result1 string, result2
|
||||
}{result1, result2}
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) PersistParticipant(arg1 string, arg2 *livekit.ParticipantInfo) error {
|
||||
fake.persistParticipantMutex.Lock()
|
||||
ret, specificReturn := fake.persistParticipantReturnsOnCall[len(fake.persistParticipantArgsForCall)]
|
||||
fake.persistParticipantArgsForCall = append(fake.persistParticipantArgsForCall, struct {
|
||||
func (fake *FakeRoomStore) StoreParticipant(arg1 string, arg2 *livekit.ParticipantInfo) error {
|
||||
fake.storeParticipantMutex.Lock()
|
||||
ret, specificReturn := fake.storeParticipantReturnsOnCall[len(fake.storeParticipantArgsForCall)]
|
||||
fake.storeParticipantArgsForCall = append(fake.storeParticipantArgsForCall, struct {
|
||||
arg1 string
|
||||
arg2 *livekit.ParticipantInfo
|
||||
}{arg1, arg2})
|
||||
stub := fake.PersistParticipantStub
|
||||
fakeReturns := fake.persistParticipantReturns
|
||||
fake.recordInvocation("PersistParticipant", []interface{}{arg1, arg2})
|
||||
fake.persistParticipantMutex.Unlock()
|
||||
stub := fake.StoreParticipantStub
|
||||
fakeReturns := fake.storeParticipantReturns
|
||||
fake.recordInvocation("StoreParticipant", []interface{}{arg1, arg2})
|
||||
fake.storeParticipantMutex.Unlock()
|
||||
if stub != nil {
|
||||
return stub(arg1, arg2)
|
||||
}
|
||||
@@ -656,44 +595,105 @@ func (fake *FakeRoomStore) PersistParticipant(arg1 string, arg2 *livekit.Partici
|
||||
return fakeReturns.result1
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) PersistParticipantCallCount() int {
|
||||
fake.persistParticipantMutex.RLock()
|
||||
defer fake.persistParticipantMutex.RUnlock()
|
||||
return len(fake.persistParticipantArgsForCall)
|
||||
func (fake *FakeRoomStore) StoreParticipantCallCount() int {
|
||||
fake.storeParticipantMutex.RLock()
|
||||
defer fake.storeParticipantMutex.RUnlock()
|
||||
return len(fake.storeParticipantArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) PersistParticipantCalls(stub func(string, *livekit.ParticipantInfo) error) {
|
||||
fake.persistParticipantMutex.Lock()
|
||||
defer fake.persistParticipantMutex.Unlock()
|
||||
fake.PersistParticipantStub = stub
|
||||
func (fake *FakeRoomStore) StoreParticipantCalls(stub func(string, *livekit.ParticipantInfo) error) {
|
||||
fake.storeParticipantMutex.Lock()
|
||||
defer fake.storeParticipantMutex.Unlock()
|
||||
fake.StoreParticipantStub = stub
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) PersistParticipantArgsForCall(i int) (string, *livekit.ParticipantInfo) {
|
||||
fake.persistParticipantMutex.RLock()
|
||||
defer fake.persistParticipantMutex.RUnlock()
|
||||
argsForCall := fake.persistParticipantArgsForCall[i]
|
||||
func (fake *FakeRoomStore) StoreParticipantArgsForCall(i int) (string, *livekit.ParticipantInfo) {
|
||||
fake.storeParticipantMutex.RLock()
|
||||
defer fake.storeParticipantMutex.RUnlock()
|
||||
argsForCall := fake.storeParticipantArgsForCall[i]
|
||||
return argsForCall.arg1, argsForCall.arg2
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) PersistParticipantReturns(result1 error) {
|
||||
fake.persistParticipantMutex.Lock()
|
||||
defer fake.persistParticipantMutex.Unlock()
|
||||
fake.PersistParticipantStub = nil
|
||||
fake.persistParticipantReturns = struct {
|
||||
func (fake *FakeRoomStore) StoreParticipantReturns(result1 error) {
|
||||
fake.storeParticipantMutex.Lock()
|
||||
defer fake.storeParticipantMutex.Unlock()
|
||||
fake.StoreParticipantStub = nil
|
||||
fake.storeParticipantReturns = struct {
|
||||
result1 error
|
||||
}{result1}
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) PersistParticipantReturnsOnCall(i int, result1 error) {
|
||||
fake.persistParticipantMutex.Lock()
|
||||
defer fake.persistParticipantMutex.Unlock()
|
||||
fake.PersistParticipantStub = nil
|
||||
if fake.persistParticipantReturnsOnCall == nil {
|
||||
fake.persistParticipantReturnsOnCall = make(map[int]struct {
|
||||
func (fake *FakeRoomStore) StoreParticipantReturnsOnCall(i int, result1 error) {
|
||||
fake.storeParticipantMutex.Lock()
|
||||
defer fake.storeParticipantMutex.Unlock()
|
||||
fake.StoreParticipantStub = nil
|
||||
if fake.storeParticipantReturnsOnCall == nil {
|
||||
fake.storeParticipantReturnsOnCall = make(map[int]struct {
|
||||
result1 error
|
||||
})
|
||||
}
|
||||
fake.persistParticipantReturnsOnCall[i] = struct {
|
||||
fake.storeParticipantReturnsOnCall[i] = struct {
|
||||
result1 error
|
||||
}{result1}
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) StoreRoom(arg1 *livekit.Room) error {
|
||||
fake.storeRoomMutex.Lock()
|
||||
ret, specificReturn := fake.storeRoomReturnsOnCall[len(fake.storeRoomArgsForCall)]
|
||||
fake.storeRoomArgsForCall = append(fake.storeRoomArgsForCall, struct {
|
||||
arg1 *livekit.Room
|
||||
}{arg1})
|
||||
stub := fake.StoreRoomStub
|
||||
fakeReturns := fake.storeRoomReturns
|
||||
fake.recordInvocation("StoreRoom", []interface{}{arg1})
|
||||
fake.storeRoomMutex.Unlock()
|
||||
if stub != nil {
|
||||
return stub(arg1)
|
||||
}
|
||||
if specificReturn {
|
||||
return ret.result1
|
||||
}
|
||||
return fakeReturns.result1
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) StoreRoomCallCount() int {
|
||||
fake.storeRoomMutex.RLock()
|
||||
defer fake.storeRoomMutex.RUnlock()
|
||||
return len(fake.storeRoomArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) StoreRoomCalls(stub func(*livekit.Room) error) {
|
||||
fake.storeRoomMutex.Lock()
|
||||
defer fake.storeRoomMutex.Unlock()
|
||||
fake.StoreRoomStub = stub
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) StoreRoomArgsForCall(i int) *livekit.Room {
|
||||
fake.storeRoomMutex.RLock()
|
||||
defer fake.storeRoomMutex.RUnlock()
|
||||
argsForCall := fake.storeRoomArgsForCall[i]
|
||||
return argsForCall.arg1
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) StoreRoomReturns(result1 error) {
|
||||
fake.storeRoomMutex.Lock()
|
||||
defer fake.storeRoomMutex.Unlock()
|
||||
fake.StoreRoomStub = nil
|
||||
fake.storeRoomReturns = struct {
|
||||
result1 error
|
||||
}{result1}
|
||||
}
|
||||
|
||||
func (fake *FakeRoomStore) StoreRoomReturnsOnCall(i int, result1 error) {
|
||||
fake.storeRoomMutex.Lock()
|
||||
defer fake.storeRoomMutex.Unlock()
|
||||
fake.StoreRoomStub = nil
|
||||
if fake.storeRoomReturnsOnCall == nil {
|
||||
fake.storeRoomReturnsOnCall = make(map[int]struct {
|
||||
result1 error
|
||||
})
|
||||
}
|
||||
fake.storeRoomReturnsOnCall[i] = struct {
|
||||
result1 error
|
||||
}{result1}
|
||||
}
|
||||
@@ -763,24 +763,24 @@ func (fake *FakeRoomStore) UnlockRoomReturnsOnCall(i int, result1 error) {
|
||||
func (fake *FakeRoomStore) Invocations() map[string][][]interface{} {
|
||||
fake.invocationsMutex.RLock()
|
||||
defer fake.invocationsMutex.RUnlock()
|
||||
fake.createRoomMutex.RLock()
|
||||
defer fake.createRoomMutex.RUnlock()
|
||||
fake.deleteParticipantMutex.RLock()
|
||||
defer fake.deleteParticipantMutex.RUnlock()
|
||||
fake.deleteRoomMutex.RLock()
|
||||
defer fake.deleteRoomMutex.RUnlock()
|
||||
fake.getParticipantMutex.RLock()
|
||||
defer fake.getParticipantMutex.RUnlock()
|
||||
fake.getRoomMutex.RLock()
|
||||
defer fake.getRoomMutex.RUnlock()
|
||||
fake.listParticipantsMutex.RLock()
|
||||
defer fake.listParticipantsMutex.RUnlock()
|
||||
fake.listRoomsMutex.RLock()
|
||||
defer fake.listRoomsMutex.RUnlock()
|
||||
fake.loadParticipantMutex.RLock()
|
||||
defer fake.loadParticipantMutex.RUnlock()
|
||||
fake.loadRoomMutex.RLock()
|
||||
defer fake.loadRoomMutex.RUnlock()
|
||||
fake.lockRoomMutex.RLock()
|
||||
defer fake.lockRoomMutex.RUnlock()
|
||||
fake.persistParticipantMutex.RLock()
|
||||
defer fake.persistParticipantMutex.RUnlock()
|
||||
fake.storeParticipantMutex.RLock()
|
||||
defer fake.storeParticipantMutex.RUnlock()
|
||||
fake.storeRoomMutex.RLock()
|
||||
defer fake.storeRoomMutex.RUnlock()
|
||||
fake.unlockRoomMutex.RLock()
|
||||
defer fake.unlockRoomMutex.RUnlock()
|
||||
copiedInvocations := map[string][][]interface{}{}
|
||||
|
||||
+2
-2
@@ -96,11 +96,11 @@ func NewTurnServer(conf *config.Config, roomStore RoomStore, node routing.LocalN
|
||||
func newTurnAuthHandler(roomStore RoomStore) turn.AuthHandler {
|
||||
return func(username, realm string, srcAddr net.Addr) (key []byte, ok bool) {
|
||||
// room id should be the username, create a hashed room id
|
||||
rm, err := roomStore.GetRoom(username)
|
||||
rm, err := roomStore.LoadRoom(username)
|
||||
if err != nil {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
return turn.GenerateAuthKey(username, livekitRealm, rm.TurnPassword), true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,9 +27,10 @@ var ServiceSet = wire.NewSet(
|
||||
NewRoomService,
|
||||
NewRTCService,
|
||||
NewLivekitServer,
|
||||
NewRoomManager,
|
||||
NewLocalRoomManager,
|
||||
NewTurnServer,
|
||||
config.GetAudioConfig,
|
||||
wire.Bind(new(RoomManager), new(*LocalRoomManager)),
|
||||
wire.Bind(new(livekit.RecordingService), new(*RecordingService)),
|
||||
wire.Bind(new(livekit.RoomService), new(*RoomService)),
|
||||
)
|
||||
|
||||
@@ -25,21 +25,21 @@ func InitializeServer(conf *config.Config, keyProvider auth.KeyProvider, current
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
roomManager, err := NewRoomManager(roomStore, router, currentNode, nodeSelector, notifier, conf)
|
||||
localRoomManager, err := NewLocalRoomManager(roomStore, router, currentNode, nodeSelector, notifier, conf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
roomService, err := NewRoomService(roomManager)
|
||||
roomService, err := NewRoomService(localRoomManager, router)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
recordingService := NewRecordingService(client)
|
||||
rtcService := NewRTCService(conf, roomManager, router, currentNode)
|
||||
rtcService := NewRTCService(conf, localRoomManager, router, currentNode)
|
||||
server, err := NewTurnServer(conf, roomStore, currentNode)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
livekitServer, err := NewLivekitServer(conf, roomService, recordingService, rtcService, keyProvider, router, roomManager, server, currentNode)
|
||||
livekitServer, err := NewLivekitServer(conf, roomService, recordingService, rtcService, keyProvider, router, localRoomManager, server, currentNode)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user