auto create rooms during create agent dispatch api request (#3158)

This commit is contained in:
Paul Wells
2024-11-05 16:15:05 -08:00
committed by GitHub
parent 365e63230d
commit 09f140afa8
7 changed files with 75 additions and 111 deletions

View File

@@ -17,6 +17,7 @@ package service
import (
"context"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/utils/guid"
@@ -25,12 +26,21 @@ import (
type AgentDispatchService struct {
agentDispatchClient rpc.TypedAgentDispatchInternalClient
topicFormatter rpc.TopicFormatter
roomAllocator RoomAllocator
router routing.MessageRouter
}
func NewAgentDispatchService(agentDispatchClient rpc.TypedAgentDispatchInternalClient, topicFormatter rpc.TopicFormatter) *AgentDispatchService {
func NewAgentDispatchService(
agentDispatchClient rpc.TypedAgentDispatchInternalClient,
topicFormatter rpc.TopicFormatter,
roomAllocator RoomAllocator,
router routing.MessageRouter,
) *AgentDispatchService {
return &AgentDispatchService{
agentDispatchClient: agentDispatchClient,
topicFormatter: topicFormatter,
roomAllocator: roomAllocator,
router: router,
}
}
@@ -40,6 +50,14 @@ func (ag *AgentDispatchService) CreateDispatch(ctx context.Context, req *livekit
return nil, twirpAuthError(err)
}
if ag.roomAllocator.AutoCreateEnabled(ctx) {
// ensure at least one node is available to handle the request
_, err = ag.router.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: req.Room})
if err != nil {
return nil, err
}
}
dispatch := &livekit.AgentDispatch{
Id: guid.New(guid.AgentDispatchPrefix),
AgentName: req.AgentName,

View File

@@ -73,7 +73,7 @@ type IngressStore interface {
//counterfeiter:generate . RoomAllocator
type RoomAllocator interface {
CreateRoomEnabled() bool
AutoCreateEnabled(ctx context.Context) bool
SelectRoomNode(ctx context.Context, roomName livekit.RoomName, nodeID livekit.NodeID) error
CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest, isExplicit bool) (*livekit.Room, *livekit.RoomInternal, bool, error)
ValidateCreateRoom(ctx context.Context, roomName livekit.RoomName) error

View File

@@ -51,8 +51,8 @@ func NewRoomAllocator(conf *config.Config, router routing.Router, rs ObjectStore
}, nil
}
func (r *StandardRoomAllocator) CreateRoomEnabled() bool {
return r.config.Room.CreateRoomEnabled
func (r *StandardRoomAllocator) AutoCreateEnabled(context.Context) bool {
return r.config.Room.AutoCreate
}
// CreateRoom creates a new room from a request and allocates it to a node to handle

View File

@@ -19,7 +19,6 @@ import (
"fmt"
"strconv"
"github.com/pkg/errors"
"github.com/twitchtv/twirp"
"github.com/livekit/livekit-server/pkg/config"
@@ -81,48 +80,12 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq
return nil, fmt.Errorf("%w: max length %d", ErrRoomNameExceedsLimits, s.limitConf.MaxRoomNameLength)
}
if s.roomAllocator.CreateRoomEnabled() {
err := s.roomAllocator.SelectRoomNode(ctx, livekit.RoomName(req.Name), livekit.NodeID(req.NodeId))
if err != nil {
return nil, err
}
return s.router.CreateRoom(ctx, req)
}
rm, _, created, err := s.roomAllocator.CreateRoom(ctx, req, true)
if err != nil {
err = errors.Wrap(err, "could not create room")
return nil, err
}
err = s.roomAllocator.SelectRoomNode(ctx, livekit.RoomName(req.Name), livekit.NodeID(req.NodeId))
err := s.roomAllocator.SelectRoomNode(ctx, livekit.RoomName(req.Name), livekit.NodeID(req.NodeId))
if err != nil {
return nil, err
}
done, err := s.startRoom(ctx, livekit.RoomName(req.Name))
if err != nil {
return nil, err
}
defer done()
if created {
if req.Egress != nil && req.Egress.Room != nil {
// ensure room name matches
req.Egress.Room.RoomName = req.Name
_, err = s.egressLauncher.StartEgress(ctx, &rpc.StartEgressRequest{
Request: &rpc.StartEgressRequest_RoomComposite{
RoomComposite: req.Egress.Room,
},
RoomId: rm.Sid,
})
if err != nil {
return nil, err
}
}
}
return rm, nil
return s.router.CreateRoom(ctx, req)
}
func (s *RoomService) ListRooms(ctx context.Context, req *livekit.ListRoomsRequest) (*livekit.ListRoomsResponse, error) {
@@ -159,17 +122,10 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq
return nil, err
}
if s.roomAllocator.CreateRoomEnabled() {
_, err := s.router.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: req.Room})
if err != nil {
return nil, err
}
} else {
done, err := s.startRoom(ctx, livekit.RoomName(req.Room))
if err != nil {
return nil, err
}
defer done()
// ensure at least one node is available to handle the request
_, err = s.router.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: req.Room})
if err != nil {
return nil, err
}
_, err = s.roomClient.DeleteRoom(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req)
@@ -305,18 +261,6 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat
return room, nil
}
// startRoom starts the room on an RTC node, to ensure metadata & empty timeout functionality
func (s *RoomService) startRoom(ctx context.Context, roomName livekit.RoomName) (func(), error) {
res, err := s.router.StartParticipantSignal(ctx, roomName, routing.ParticipantInit{})
if err != nil {
return nil, err
}
return func() {
res.RequestSink.Close()
res.ResponseSource.Close()
}, nil
}
func redactCreateRoomRequest(req *livekit.CreateRoomRequest) *livekit.CreateRoomRequest {
if req.Egress == nil {
// nothing to redact

View File

@@ -524,13 +524,6 @@ func (s *RTCService) startConnection(
var cr connectionResult
var err error
if !s.roomAllocator.CreateRoomEnabled() {
cr.Room, _, _, err = s.roomAllocator.CreateRoom(ctx, pi.CreateRoom, false)
if err != nil {
return cr, nil, err
}
}
if err := s.roomAllocator.SelectRoomNode(ctx, roomName, ""); err != nil {
return cr, nil, err
}

View File

@@ -29,14 +29,15 @@ type FakeRoomAllocator struct {
result3 bool
result4 error
}
CreateRoomEnabledStub func() bool
createRoomEnabledMutex sync.RWMutex
createRoomEnabledArgsForCall []struct {
AutoCreateEnabledStub func(context.Context) bool
roomAutoCreateEnabledMutex sync.RWMutex
roomAutoCreateEnabledArgsForCall []struct {
arg1 context.Context
}
createRoomEnabledReturns struct {
roomAutoCreateEnabledReturns struct {
result1 bool
}
createRoomEnabledReturnsOnCall map[int]struct {
roomAutoCreateEnabledReturnsOnCall map[int]struct {
result1 bool
}
SelectRoomNodeStub func(context.Context, livekit.RoomName, livekit.NodeID) error
@@ -140,17 +141,18 @@ func (fake *FakeRoomAllocator) CreateRoomReturnsOnCall(i int, result1 *livekit.R
}{result1, result2, result3, result4}
}
func (fake *FakeRoomAllocator) CreateRoomEnabled() bool {
fake.createRoomEnabledMutex.Lock()
ret, specificReturn := fake.createRoomEnabledReturnsOnCall[len(fake.createRoomEnabledArgsForCall)]
fake.createRoomEnabledArgsForCall = append(fake.createRoomEnabledArgsForCall, struct {
}{})
stub := fake.CreateRoomEnabledStub
fakeReturns := fake.createRoomEnabledReturns
fake.recordInvocation("CreateRoomEnabled", []interface{}{})
fake.createRoomEnabledMutex.Unlock()
func (fake *FakeRoomAllocator) AutoCreateEnabled(arg1 context.Context) bool {
fake.roomAutoCreateEnabledMutex.Lock()
ret, specificReturn := fake.roomAutoCreateEnabledReturnsOnCall[len(fake.roomAutoCreateEnabledArgsForCall)]
fake.roomAutoCreateEnabledArgsForCall = append(fake.roomAutoCreateEnabledArgsForCall, struct {
arg1 context.Context
}{arg1})
stub := fake.AutoCreateEnabledStub
fakeReturns := fake.roomAutoCreateEnabledReturns
fake.recordInvocation("AutoCreateEnabled", []interface{}{arg1})
fake.roomAutoCreateEnabledMutex.Unlock()
if stub != nil {
return stub()
return stub(arg1)
}
if specificReturn {
return ret.result1
@@ -158,37 +160,44 @@ func (fake *FakeRoomAllocator) CreateRoomEnabled() bool {
return fakeReturns.result1
}
func (fake *FakeRoomAllocator) CreateRoomEnabledCallCount() int {
fake.createRoomEnabledMutex.RLock()
defer fake.createRoomEnabledMutex.RUnlock()
return len(fake.createRoomEnabledArgsForCall)
func (fake *FakeRoomAllocator) AutoCreateEnabledCallCount() int {
fake.roomAutoCreateEnabledMutex.RLock()
defer fake.roomAutoCreateEnabledMutex.RUnlock()
return len(fake.roomAutoCreateEnabledArgsForCall)
}
func (fake *FakeRoomAllocator) CreateRoomEnabledCalls(stub func() bool) {
fake.createRoomEnabledMutex.Lock()
defer fake.createRoomEnabledMutex.Unlock()
fake.CreateRoomEnabledStub = stub
func (fake *FakeRoomAllocator) AutoCreateEnabledCalls(stub func(context.Context) bool) {
fake.roomAutoCreateEnabledMutex.Lock()
defer fake.roomAutoCreateEnabledMutex.Unlock()
fake.AutoCreateEnabledStub = stub
}
func (fake *FakeRoomAllocator) CreateRoomEnabledReturns(result1 bool) {
fake.createRoomEnabledMutex.Lock()
defer fake.createRoomEnabledMutex.Unlock()
fake.CreateRoomEnabledStub = nil
fake.createRoomEnabledReturns = struct {
func (fake *FakeRoomAllocator) AutoCreateEnabledArgsForCall(i int) context.Context {
fake.roomAutoCreateEnabledMutex.RLock()
defer fake.roomAutoCreateEnabledMutex.RUnlock()
argsForCall := fake.roomAutoCreateEnabledArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeRoomAllocator) AutoCreateEnabledReturns(result1 bool) {
fake.roomAutoCreateEnabledMutex.Lock()
defer fake.roomAutoCreateEnabledMutex.Unlock()
fake.AutoCreateEnabledStub = nil
fake.roomAutoCreateEnabledReturns = struct {
result1 bool
}{result1}
}
func (fake *FakeRoomAllocator) CreateRoomEnabledReturnsOnCall(i int, result1 bool) {
fake.createRoomEnabledMutex.Lock()
defer fake.createRoomEnabledMutex.Unlock()
fake.CreateRoomEnabledStub = nil
if fake.createRoomEnabledReturnsOnCall == nil {
fake.createRoomEnabledReturnsOnCall = make(map[int]struct {
func (fake *FakeRoomAllocator) AutoCreateEnabledReturnsOnCall(i int, result1 bool) {
fake.roomAutoCreateEnabledMutex.Lock()
defer fake.roomAutoCreateEnabledMutex.Unlock()
fake.AutoCreateEnabledStub = nil
if fake.roomAutoCreateEnabledReturnsOnCall == nil {
fake.roomAutoCreateEnabledReturnsOnCall = make(map[int]struct {
result1 bool
})
}
fake.createRoomEnabledReturnsOnCall[i] = struct {
fake.roomAutoCreateEnabledReturnsOnCall[i] = struct {
result1 bool
}{result1}
}
@@ -323,8 +332,8 @@ func (fake *FakeRoomAllocator) Invocations() map[string][][]interface{} {
defer fake.invocationsMutex.RUnlock()
fake.createRoomMutex.RLock()
defer fake.createRoomMutex.RUnlock()
fake.createRoomEnabledMutex.RLock()
defer fake.createRoomEnabledMutex.RUnlock()
fake.roomAutoCreateEnabledMutex.RLock()
defer fake.roomAutoCreateEnabledMutex.RUnlock()
fake.selectRoomNodeMutex.RLock()
defer fake.selectRoomNodeMutex.RUnlock()
fake.validateCreateRoomMutex.RLock()

View File

@@ -105,7 +105,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
if err != nil {
return nil, err
}
agentDispatchService := NewAgentDispatchService(agentDispatchInternalClient, topicFormatter)
agentDispatchService := NewAgentDispatchService(agentDispatchInternalClient, topicFormatter, roomAllocator, router)
egressService := NewEgressService(egressClient, rtcEgressLauncher, objectStore, ioInfoService, roomService)
ingressConfig := getIngressConfig(conf)
ingressClient, err := rpc.NewIngressClient(clientParams)