diff --git a/pkg/service/agent_dispatch_service.go b/pkg/service/agent_dispatch_service.go index c5cfe0bbf..17c321ae9 100644 --- a/pkg/service/agent_dispatch_service.go +++ b/pkg/service/agent_dispatch_service.go @@ -17,6 +17,7 @@ package service import ( "context" + "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/utils/guid" @@ -25,12 +26,21 @@ import ( type AgentDispatchService struct { agentDispatchClient rpc.TypedAgentDispatchInternalClient topicFormatter rpc.TopicFormatter + roomAllocator RoomAllocator + router routing.MessageRouter } -func NewAgentDispatchService(agentDispatchClient rpc.TypedAgentDispatchInternalClient, topicFormatter rpc.TopicFormatter) *AgentDispatchService { +func NewAgentDispatchService( + agentDispatchClient rpc.TypedAgentDispatchInternalClient, + topicFormatter rpc.TopicFormatter, + roomAllocator RoomAllocator, + router routing.MessageRouter, +) *AgentDispatchService { return &AgentDispatchService{ agentDispatchClient: agentDispatchClient, topicFormatter: topicFormatter, + roomAllocator: roomAllocator, + router: router, } } @@ -40,6 +50,14 @@ func (ag *AgentDispatchService) CreateDispatch(ctx context.Context, req *livekit return nil, twirpAuthError(err) } + if ag.roomAllocator.AutoCreateEnabled(ctx) { + // ensure at least one node is available to handle the request + _, err = ag.router.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: req.Room}) + if err != nil { + return nil, err + } + } + dispatch := &livekit.AgentDispatch{ Id: guid.New(guid.AgentDispatchPrefix), AgentName: req.AgentName, diff --git a/pkg/service/interfaces.go b/pkg/service/interfaces.go index 5b0ee26d7..890820c56 100644 --- a/pkg/service/interfaces.go +++ b/pkg/service/interfaces.go @@ -73,7 +73,7 @@ type IngressStore interface { //counterfeiter:generate . RoomAllocator type RoomAllocator interface { - CreateRoomEnabled() bool + AutoCreateEnabled(ctx context.Context) bool SelectRoomNode(ctx context.Context, roomName livekit.RoomName, nodeID livekit.NodeID) error CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest, isExplicit bool) (*livekit.Room, *livekit.RoomInternal, bool, error) ValidateCreateRoom(ctx context.Context, roomName livekit.RoomName) error diff --git a/pkg/service/roomallocator.go b/pkg/service/roomallocator.go index e8edea3dd..a5b0e5118 100644 --- a/pkg/service/roomallocator.go +++ b/pkg/service/roomallocator.go @@ -51,8 +51,8 @@ func NewRoomAllocator(conf *config.Config, router routing.Router, rs ObjectStore }, nil } -func (r *StandardRoomAllocator) CreateRoomEnabled() bool { - return r.config.Room.CreateRoomEnabled +func (r *StandardRoomAllocator) AutoCreateEnabled(context.Context) bool { + return r.config.Room.AutoCreate } // CreateRoom creates a new room from a request and allocates it to a node to handle diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index bd76e5ca3..6745a333d 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -19,7 +19,6 @@ import ( "fmt" "strconv" - "github.com/pkg/errors" "github.com/twitchtv/twirp" "github.com/livekit/livekit-server/pkg/config" @@ -81,48 +80,12 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq return nil, fmt.Errorf("%w: max length %d", ErrRoomNameExceedsLimits, s.limitConf.MaxRoomNameLength) } - if s.roomAllocator.CreateRoomEnabled() { - err := s.roomAllocator.SelectRoomNode(ctx, livekit.RoomName(req.Name), livekit.NodeID(req.NodeId)) - if err != nil { - return nil, err - } - - return s.router.CreateRoom(ctx, req) - } - - rm, _, created, err := s.roomAllocator.CreateRoom(ctx, req, true) - if err != nil { - err = errors.Wrap(err, "could not create room") - return nil, err - } - err = s.roomAllocator.SelectRoomNode(ctx, livekit.RoomName(req.Name), livekit.NodeID(req.NodeId)) + err := s.roomAllocator.SelectRoomNode(ctx, livekit.RoomName(req.Name), livekit.NodeID(req.NodeId)) if err != nil { return nil, err } - done, err := s.startRoom(ctx, livekit.RoomName(req.Name)) - if err != nil { - return nil, err - } - defer done() - - if created { - if req.Egress != nil && req.Egress.Room != nil { - // ensure room name matches - req.Egress.Room.RoomName = req.Name - _, err = s.egressLauncher.StartEgress(ctx, &rpc.StartEgressRequest{ - Request: &rpc.StartEgressRequest_RoomComposite{ - RoomComposite: req.Egress.Room, - }, - RoomId: rm.Sid, - }) - if err != nil { - return nil, err - } - } - } - - return rm, nil + return s.router.CreateRoom(ctx, req) } func (s *RoomService) ListRooms(ctx context.Context, req *livekit.ListRoomsRequest) (*livekit.ListRoomsResponse, error) { @@ -159,17 +122,10 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq return nil, err } - if s.roomAllocator.CreateRoomEnabled() { - _, err := s.router.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: req.Room}) - if err != nil { - return nil, err - } - } else { - done, err := s.startRoom(ctx, livekit.RoomName(req.Room)) - if err != nil { - return nil, err - } - defer done() + // ensure at least one node is available to handle the request + _, err = s.router.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: req.Room}) + if err != nil { + return nil, err } _, err = s.roomClient.DeleteRoom(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req) @@ -305,18 +261,6 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat return room, nil } -// startRoom starts the room on an RTC node, to ensure metadata & empty timeout functionality -func (s *RoomService) startRoom(ctx context.Context, roomName livekit.RoomName) (func(), error) { - res, err := s.router.StartParticipantSignal(ctx, roomName, routing.ParticipantInit{}) - if err != nil { - return nil, err - } - return func() { - res.RequestSink.Close() - res.ResponseSource.Close() - }, nil -} - func redactCreateRoomRequest(req *livekit.CreateRoomRequest) *livekit.CreateRoomRequest { if req.Egress == nil { // nothing to redact diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index 5a6dafc35..a532fb71c 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -524,13 +524,6 @@ func (s *RTCService) startConnection( var cr connectionResult var err error - if !s.roomAllocator.CreateRoomEnabled() { - cr.Room, _, _, err = s.roomAllocator.CreateRoom(ctx, pi.CreateRoom, false) - if err != nil { - return cr, nil, err - } - } - if err := s.roomAllocator.SelectRoomNode(ctx, roomName, ""); err != nil { return cr, nil, err } diff --git a/pkg/service/servicefakes/fake_room_allocator.go b/pkg/service/servicefakes/fake_room_allocator.go index 9d37110d2..9a60a47ee 100644 --- a/pkg/service/servicefakes/fake_room_allocator.go +++ b/pkg/service/servicefakes/fake_room_allocator.go @@ -29,14 +29,15 @@ type FakeRoomAllocator struct { result3 bool result4 error } - CreateRoomEnabledStub func() bool - createRoomEnabledMutex sync.RWMutex - createRoomEnabledArgsForCall []struct { + AutoCreateEnabledStub func(context.Context) bool + roomAutoCreateEnabledMutex sync.RWMutex + roomAutoCreateEnabledArgsForCall []struct { + arg1 context.Context } - createRoomEnabledReturns struct { + roomAutoCreateEnabledReturns struct { result1 bool } - createRoomEnabledReturnsOnCall map[int]struct { + roomAutoCreateEnabledReturnsOnCall map[int]struct { result1 bool } SelectRoomNodeStub func(context.Context, livekit.RoomName, livekit.NodeID) error @@ -140,17 +141,18 @@ func (fake *FakeRoomAllocator) CreateRoomReturnsOnCall(i int, result1 *livekit.R }{result1, result2, result3, result4} } -func (fake *FakeRoomAllocator) CreateRoomEnabled() bool { - fake.createRoomEnabledMutex.Lock() - ret, specificReturn := fake.createRoomEnabledReturnsOnCall[len(fake.createRoomEnabledArgsForCall)] - fake.createRoomEnabledArgsForCall = append(fake.createRoomEnabledArgsForCall, struct { - }{}) - stub := fake.CreateRoomEnabledStub - fakeReturns := fake.createRoomEnabledReturns - fake.recordInvocation("CreateRoomEnabled", []interface{}{}) - fake.createRoomEnabledMutex.Unlock() +func (fake *FakeRoomAllocator) AutoCreateEnabled(arg1 context.Context) bool { + fake.roomAutoCreateEnabledMutex.Lock() + ret, specificReturn := fake.roomAutoCreateEnabledReturnsOnCall[len(fake.roomAutoCreateEnabledArgsForCall)] + fake.roomAutoCreateEnabledArgsForCall = append(fake.roomAutoCreateEnabledArgsForCall, struct { + arg1 context.Context + }{arg1}) + stub := fake.AutoCreateEnabledStub + fakeReturns := fake.roomAutoCreateEnabledReturns + fake.recordInvocation("AutoCreateEnabled", []interface{}{arg1}) + fake.roomAutoCreateEnabledMutex.Unlock() if stub != nil { - return stub() + return stub(arg1) } if specificReturn { return ret.result1 @@ -158,37 +160,44 @@ func (fake *FakeRoomAllocator) CreateRoomEnabled() bool { return fakeReturns.result1 } -func (fake *FakeRoomAllocator) CreateRoomEnabledCallCount() int { - fake.createRoomEnabledMutex.RLock() - defer fake.createRoomEnabledMutex.RUnlock() - return len(fake.createRoomEnabledArgsForCall) +func (fake *FakeRoomAllocator) AutoCreateEnabledCallCount() int { + fake.roomAutoCreateEnabledMutex.RLock() + defer fake.roomAutoCreateEnabledMutex.RUnlock() + return len(fake.roomAutoCreateEnabledArgsForCall) } -func (fake *FakeRoomAllocator) CreateRoomEnabledCalls(stub func() bool) { - fake.createRoomEnabledMutex.Lock() - defer fake.createRoomEnabledMutex.Unlock() - fake.CreateRoomEnabledStub = stub +func (fake *FakeRoomAllocator) AutoCreateEnabledCalls(stub func(context.Context) bool) { + fake.roomAutoCreateEnabledMutex.Lock() + defer fake.roomAutoCreateEnabledMutex.Unlock() + fake.AutoCreateEnabledStub = stub } -func (fake *FakeRoomAllocator) CreateRoomEnabledReturns(result1 bool) { - fake.createRoomEnabledMutex.Lock() - defer fake.createRoomEnabledMutex.Unlock() - fake.CreateRoomEnabledStub = nil - fake.createRoomEnabledReturns = struct { +func (fake *FakeRoomAllocator) AutoCreateEnabledArgsForCall(i int) context.Context { + fake.roomAutoCreateEnabledMutex.RLock() + defer fake.roomAutoCreateEnabledMutex.RUnlock() + argsForCall := fake.roomAutoCreateEnabledArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeRoomAllocator) AutoCreateEnabledReturns(result1 bool) { + fake.roomAutoCreateEnabledMutex.Lock() + defer fake.roomAutoCreateEnabledMutex.Unlock() + fake.AutoCreateEnabledStub = nil + fake.roomAutoCreateEnabledReturns = struct { result1 bool }{result1} } -func (fake *FakeRoomAllocator) CreateRoomEnabledReturnsOnCall(i int, result1 bool) { - fake.createRoomEnabledMutex.Lock() - defer fake.createRoomEnabledMutex.Unlock() - fake.CreateRoomEnabledStub = nil - if fake.createRoomEnabledReturnsOnCall == nil { - fake.createRoomEnabledReturnsOnCall = make(map[int]struct { +func (fake *FakeRoomAllocator) AutoCreateEnabledReturnsOnCall(i int, result1 bool) { + fake.roomAutoCreateEnabledMutex.Lock() + defer fake.roomAutoCreateEnabledMutex.Unlock() + fake.AutoCreateEnabledStub = nil + if fake.roomAutoCreateEnabledReturnsOnCall == nil { + fake.roomAutoCreateEnabledReturnsOnCall = make(map[int]struct { result1 bool }) } - fake.createRoomEnabledReturnsOnCall[i] = struct { + fake.roomAutoCreateEnabledReturnsOnCall[i] = struct { result1 bool }{result1} } @@ -323,8 +332,8 @@ func (fake *FakeRoomAllocator) Invocations() map[string][][]interface{} { defer fake.invocationsMutex.RUnlock() fake.createRoomMutex.RLock() defer fake.createRoomMutex.RUnlock() - fake.createRoomEnabledMutex.RLock() - defer fake.createRoomEnabledMutex.RUnlock() + fake.roomAutoCreateEnabledMutex.RLock() + defer fake.roomAutoCreateEnabledMutex.RUnlock() fake.selectRoomNodeMutex.RLock() defer fake.selectRoomNodeMutex.RUnlock() fake.validateCreateRoomMutex.RLock() diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 55b8db9ea..19c33c76a 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -105,7 +105,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live if err != nil { return nil, err } - agentDispatchService := NewAgentDispatchService(agentDispatchInternalClient, topicFormatter) + agentDispatchService := NewAgentDispatchService(agentDispatchInternalClient, topicFormatter, roomAllocator, router) egressService := NewEgressService(egressClient, rtcEgressLauncher, objectStore, ioInfoService, roomService) ingressConfig := getIngressConfig(conf) ingressClient, err := rpc.NewIngressClient(clientParams)