diff --git a/go.mod b/go.mod index 82ff36230..2f415c8c7 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 - github.com/livekit/protocol v1.26.1-0.20241022031344-538889e5de0a + github.com/livekit/protocol v1.26.1-0.20241022053724-1eb56d424343 github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 github.com/mackerelio/go-osstat v0.2.5 github.com/magefile/mage v1.15.0 @@ -56,6 +56,8 @@ require ( gopkg.in/yaml.v3 v3.0.1 ) +// replace github.com/livekit/protocol => ../protocol + require ( buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.33.0-20240401165935-b983156c5e99.1 // indirect dario.cat/mergo v1.0.0 // indirect diff --git a/go.sum b/go.sum index 9a3c3bb36..aad7823e7 100644 --- a/go.sum +++ b/go.sum @@ -165,8 +165,6 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.26.1-0.20241022031344-538889e5de0a h1:31YXXJLEwCflp7KEe9rRAwmONyCwHFujTl4MdxegTxw= -github.com/livekit/protocol v1.26.1-0.20241022031344-538889e5de0a/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs= github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 h1:33oBjGpVD9tYkDXQU42tnHl8eCX9G6PVUToBVuCUyOs= github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= diff --git a/pkg/config/config.go b/pkg/config/config.go index 8225d50ac..1fceaebc9 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -253,8 +253,8 @@ type RoomConfig struct { // deprecated, moved to limits MaxRoomNameLength int `yaml:"max_room_name_length,omitempty"` // deprecated, moved to limits - MaxParticipantIdentityLength int `yaml:"max_participant_identity_length,omitempty"` - RoomConfigurations map[string]livekit.RoomConfiguration `yaml:"room_configurations,omitempty"` + MaxParticipantIdentityLength int `yaml:"max_participant_identity_length,omitempty"` + RoomConfigurations map[string]*livekit.RoomConfiguration `yaml:"room_configurations,omitempty"` } type CodecSpec struct { diff --git a/pkg/routing/interfaces.go b/pkg/routing/interfaces.go index 6fa66f70b..e4492bcc8 100644 --- a/pkg/routing/interfaces.go +++ b/pkg/routing/interfaces.go @@ -169,6 +169,7 @@ func ParticipantInitFromStartSession(ss *livekit.StartSession, region string) (* AdaptiveStream: ss.AdaptiveStream, ID: livekit.ParticipantID(ss.ParticipantId), DisableICELite: ss.DisableIceLite, + CreateRoom: ss.CreateRoom, } if ss.SubscriberAllowPause != nil { subscriberAllowPause := *ss.SubscriberAllowPause diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 1995a3b91..c3d643f7a 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -1728,9 +1728,7 @@ func (r *Room) createAgentDispatchesFromRoomAgent() { roomDisp := r.internal.AgentDispatches if len(roomDisp) == 0 { // Backward compatibility: by default, start any agent in the empty JobName - roomDisp = []*livekit.RoomAgentDispatch{ - &livekit.RoomAgentDispatch{}, - } + roomDisp = []*livekit.RoomAgentDispatch{{}} } for _, ag := range roomDisp { diff --git a/pkg/service/auth.go b/pkg/service/auth.go index 188759fab..8edcf91c5 100644 --- a/pkg/service/auth.go +++ b/pkg/service/auth.go @@ -179,16 +179,6 @@ func EnsureCreatePermission(ctx context.Context) error { return nil } -func GetRoomConfiguration(ctx context.Context) string { - - claims := GetGrants(ctx) - - if claims == nil || claims.Video == nil { - return "" - } - return claims.Video.RoomConfiguration -} - func EnsureListPermission(ctx context.Context) error { claims := GetGrants(ctx) if claims == nil || claims.Video == nil || !claims.Video.RoomList { diff --git a/pkg/service/roomallocator.go b/pkg/service/roomallocator.go index dce8f1d2f..e8edea3dd 100644 --- a/pkg/service/roomallocator.go +++ b/pkg/service/roomallocator.go @@ -108,8 +108,8 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre internal.TrackEgress = req.Egress.Tracks } } - if req.Agent != nil { - internal.AgentDispatches = req.Agent.Dispatches + if req.Agents != nil { + internal.AgentDispatches = req.Agents } if req.MinPlayoutDelay > 0 || req.MaxPlayoutDelay > 0 { internal.PlayoutDelay = &livekit.PlayoutDelay{ @@ -200,11 +200,11 @@ func applyDefaultRoomConfig(room *livekit.Room, internal *livekit.RoomInternal, } func (r *StandardRoomAllocator) applyNamedRoomConfiguration(req *livekit.CreateRoomRequest) (*livekit.CreateRoomRequest, error) { - if req.ConfigName == "" { + if req.RoomPreset == "" { return req, nil } - conf, ok := r.config.Room.RoomConfigurations[req.ConfigName] + conf, ok := r.config.Room.RoomConfigurations[req.RoomPreset] if !ok { return req, psrpc.NewErrorf(psrpc.InvalidArgument, "unknown room confguration in create room request") } @@ -224,8 +224,11 @@ func (r *StandardRoomAllocator) applyNamedRoomConfiguration(req *livekit.CreateR if clone.Egress == nil { clone.Egress = utils.CloneProto(conf.Egress) } - if clone.Agent == nil { - clone.Agent = utils.CloneProto(conf.Agent) + if clone.Agents == nil { + clone.Agents = make([]*livekit.RoomAgentDispatch, 0, len(conf.Agents)) + for _, agent := range conf.Agents { + clone.Agents = append(clone.Agents, utils.CloneProto(agent)) + } } if clone.MinPlayoutDelay == 0 { clone.MinPlayoutDelay = conf.MinPlayoutDelay diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 5fd83b9d3..19b2b5737 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -278,13 +278,13 @@ func (r *RoomManager) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq // StartSession starts WebRTC session when a new participant is connected, takes place on RTC node func (r *RoomManager) StartSession( ctx context.Context, - createRoom *livekit.CreateRoomRequest, pi routing.ParticipantInit, requestSource routing.MessageSource, responseSink routing.MessageSink, ) error { sessionStartTime := time.Now() + createRoom := pi.CreateRoom room, err := r.getOrCreateRoom(ctx, createRoom) if err != nil { return err @@ -990,7 +990,9 @@ func (r *RoomManager) refreshToken(participant types.LocalParticipant) error { SetValidFor(tokenDefaultTTL). SetMetadata(grants.Metadata). SetAttributes(grants.Attributes). - AddGrant(grants.Video) + SetVideoGrant(grants.Video). + SetRoomConfig(grants.GetRoomConfiguration()). + SetRoomPreset(grants.RoomPreset) jwt, err := token.ToJWT() if err == nil { err = participant.SendRefreshToken(jwt) diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index 0d5f1bebb..5a6dafc35 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -166,6 +166,12 @@ func (s *RTCService) validate(r *http.Request) (livekit.RoomName, routing.Partic } } + createRequest := &livekit.CreateRoomRequest{ + Name: string(roomName), + RoomPreset: claims.RoomPreset, + } + SetRoomConfiguration(createRequest, claims.GetRoomConfiguration()) + pi = routing.ParticipantInit{ Reconnect: boolValue(reconnectParam), ReconnectReason: livekit.ReconnectReason(reconnectReason), @@ -175,10 +181,7 @@ func (s *RTCService) validate(r *http.Request) (livekit.RoomName, routing.Partic Client: s.ParseClientInfo(r), Grants: claims, Region: region, - CreateRoom: &livekit.CreateRoomRequest{ - Name: string(roomName), - ConfigName: GetRoomConfiguration(r.Context()), - }, + CreateRoom: createRequest, } if pi.Reconnect { pi.ID = livekit.ParticipantID(participantID) diff --git a/pkg/service/servicefakes/fake_session_handler.go b/pkg/service/servicefakes/fake_session_handler.go index 15d5a00d5..a31c69dbe 100644 --- a/pkg/service/servicefakes/fake_session_handler.go +++ b/pkg/service/servicefakes/fake_session_handler.go @@ -12,15 +12,14 @@ import ( ) type FakeSessionHandler struct { - HandleSessionStub func(context.Context, *livekit.CreateRoomRequest, routing.ParticipantInit, livekit.ConnectionID, routing.MessageSource, routing.MessageSink) error + HandleSessionStub func(context.Context, routing.ParticipantInit, livekit.ConnectionID, routing.MessageSource, routing.MessageSink) error handleSessionMutex sync.RWMutex handleSessionArgsForCall []struct { arg1 context.Context - arg2 *livekit.CreateRoomRequest - arg3 routing.ParticipantInit - arg4 livekit.ConnectionID - arg5 routing.MessageSource - arg6 routing.MessageSink + arg2 routing.ParticipantInit + arg3 livekit.ConnectionID + arg4 routing.MessageSource + arg5 routing.MessageSink } handleSessionReturns struct { result1 error @@ -43,23 +42,22 @@ type FakeSessionHandler struct { invocationsMutex sync.RWMutex } -func (fake *FakeSessionHandler) HandleSession(arg1 context.Context, arg2 *livekit.CreateRoomRequest, arg3 routing.ParticipantInit, arg4 livekit.ConnectionID, arg5 routing.MessageSource, arg6 routing.MessageSink) error { +func (fake *FakeSessionHandler) HandleSession(arg1 context.Context, arg2 routing.ParticipantInit, arg3 livekit.ConnectionID, arg4 routing.MessageSource, arg5 routing.MessageSink) error { fake.handleSessionMutex.Lock() ret, specificReturn := fake.handleSessionReturnsOnCall[len(fake.handleSessionArgsForCall)] fake.handleSessionArgsForCall = append(fake.handleSessionArgsForCall, struct { arg1 context.Context - arg2 *livekit.CreateRoomRequest - arg3 routing.ParticipantInit - arg4 livekit.ConnectionID - arg5 routing.MessageSource - arg6 routing.MessageSink - }{arg1, arg2, arg3, arg4, arg5, arg6}) + arg2 routing.ParticipantInit + arg3 livekit.ConnectionID + arg4 routing.MessageSource + arg5 routing.MessageSink + }{arg1, arg2, arg3, arg4, arg5}) stub := fake.HandleSessionStub fakeReturns := fake.handleSessionReturns - fake.recordInvocation("HandleSession", []interface{}{arg1, arg2, arg3, arg4, arg5, arg6}) + fake.recordInvocation("HandleSession", []interface{}{arg1, arg2, arg3, arg4, arg5}) fake.handleSessionMutex.Unlock() if stub != nil { - return stub(arg1, arg2, arg3, arg4, arg5, arg6) + return stub(arg1, arg2, arg3, arg4, arg5) } if specificReturn { return ret.result1 @@ -73,17 +71,17 @@ func (fake *FakeSessionHandler) HandleSessionCallCount() int { return len(fake.handleSessionArgsForCall) } -func (fake *FakeSessionHandler) HandleSessionCalls(stub func(context.Context, *livekit.CreateRoomRequest, routing.ParticipantInit, livekit.ConnectionID, routing.MessageSource, routing.MessageSink) error) { +func (fake *FakeSessionHandler) HandleSessionCalls(stub func(context.Context, routing.ParticipantInit, livekit.ConnectionID, routing.MessageSource, routing.MessageSink) error) { fake.handleSessionMutex.Lock() defer fake.handleSessionMutex.Unlock() fake.HandleSessionStub = stub } -func (fake *FakeSessionHandler) HandleSessionArgsForCall(i int) (context.Context, *livekit.CreateRoomRequest, routing.ParticipantInit, livekit.ConnectionID, routing.MessageSource, routing.MessageSink) { +func (fake *FakeSessionHandler) HandleSessionArgsForCall(i int) (context.Context, routing.ParticipantInit, livekit.ConnectionID, routing.MessageSource, routing.MessageSink) { fake.handleSessionMutex.RLock() defer fake.handleSessionMutex.RUnlock() argsForCall := fake.handleSessionArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5, argsForCall.arg6 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5 } func (fake *FakeSessionHandler) HandleSessionReturns(result1 error) { diff --git a/pkg/service/signal.go b/pkg/service/signal.go index 6d5523944..08d5d0a78 100644 --- a/pkg/service/signal.go +++ b/pkg/service/signal.go @@ -39,7 +39,6 @@ type SessionHandler interface { HandleSession( ctx context.Context, - createRoom *livekit.CreateRoomRequest, pi routing.ParticipantInit, connectionID livekit.ConnectionID, requestSource routing.MessageSource, @@ -94,7 +93,6 @@ func (s *defaultSessionHandler) Logger(ctx context.Context) logger.Logger { func (s *defaultSessionHandler) HandleSession( ctx context.Context, - createRoom *livekit.CreateRoomRequest, pi routing.ParticipantInit, connectionID livekit.ConnectionID, requestSource routing.MessageSource, @@ -102,7 +100,7 @@ func (s *defaultSessionHandler) HandleSession( ) error { prometheus.IncrementParticipantRtcInit(1) - rtcNode, err := s.router.GetNodeForRoom(ctx, livekit.RoomName(createRoom.Name)) + rtcNode, err := s.router.GetNodeForRoom(ctx, livekit.RoomName(pi.CreateRoom.Name)) if err != nil { return err } @@ -115,7 +113,7 @@ func (s *defaultSessionHandler) HandleSession( return err } - return s.roomManager.StartSession(ctx, createRoom, pi, requestSource, responseSink) + return s.roomManager.StartSession(ctx, pi, requestSource, responseSink) } func (s *SignalServer) Start() error { @@ -181,19 +179,7 @@ func (r *signalService) RelaySignal(stream psrpc.ServerStream[*rpc.RelaySignalRe // and the delivery of any parting messages from the client. take care to // copy the incoming rpc headers to avoid dropping any session vars. ctx := metadata.NewContextWithIncomingHeader(context.Background(), metadata.IncomingHeader(stream.Context())) - - createRoom := ss.CreateRoom - if createRoom == nil { - createRoom = &livekit.CreateRoomRequest{ - Name: ss.RoomName, - } - - if pi.Grants != nil && pi.Grants.Video != nil { - createRoom.ConfigName = pi.Grants.Video.RoomConfiguration - } - } - - err = r.sessionHandler.HandleSession(ctx, createRoom, *pi, livekit.ConnectionID(ss.ConnectionId), reqChan, sink) + err = r.sessionHandler.HandleSession(ctx, *pi, livekit.ConnectionID(ss.ConnectionId), reqChan, sink) if err != nil { sink.Close() l.Errorw("could not handle new participant", err) diff --git a/pkg/service/signal_test.go b/pkg/service/signal_test.go index 1e3ac03f2..81755cdeb 100644 --- a/pkg/service/signal_test.go +++ b/pkg/service/signal_test.go @@ -67,7 +67,6 @@ func TestSignal(t *testing.T) { LoggerStub: func(context.Context) logger.Logger { return logger.GetLogger() }, HandleSessionStub: func( ctx context.Context, - createRoom *livekit.CreateRoomRequest, pi routing.ParticipantInit, connectionID livekit.ConnectionID, requestSource routing.MessageSource, @@ -124,7 +123,6 @@ func TestSignal(t *testing.T) { LoggerStub: func(context.Context) logger.Logger { return logger.GetLogger() }, HandleSessionStub: func( ctx context.Context, - createRoom *livekit.CreateRoomRequest, pi routing.ParticipantInit, connectionID livekit.ConnectionID, requestSource routing.MessageSource, diff --git a/pkg/service/utils.go b/pkg/service/utils.go index db76e008a..fd54197b2 100644 --- a/pkg/service/utils.go +++ b/pkg/service/utils.go @@ -22,6 +22,7 @@ import ( "regexp" "strings" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" ) @@ -67,3 +68,17 @@ func GetClientIP(r *http.Request) string { ip, _, _ := net.SplitHostPort(r.RemoteAddr) return ip } + +func SetRoomConfiguration(createRequest *livekit.CreateRoomRequest, conf *livekit.RoomConfiguration) { + if conf == nil { + return + } + createRequest.Agents = conf.Agents + createRequest.Egress = conf.Egress + createRequest.EmptyTimeout = conf.EmptyTimeout + createRequest.DepartureTimeout = conf.DepartureTimeout + createRequest.MaxParticipants = conf.MaxParticipants + createRequest.MinPlayoutDelay = conf.MinPlayoutDelay + createRequest.MaxPlayoutDelay = conf.MaxPlayoutDelay + createRequest.SyncStreams = conf.SyncStreams +} diff --git a/test/agent_test.go b/test/agent_test.go index ad50c53d9..554e36e33 100644 --- a/test/agent_test.go +++ b/test/agent_test.go @@ -120,12 +120,10 @@ func TestAgentNamespaces(t *testing.T) { _, err = roomClient.CreateRoom(contextWithToken(createRoomToken()), &livekit.CreateRoomRequest{ Name: testRoom, - Agent: &livekit.RoomAgent{ - Dispatches: []*livekit.RoomAgentDispatch{ - &livekit.RoomAgentDispatch{}, - &livekit.RoomAgentDispatch{ - AgentName: "ag", - }, + Agents: []*livekit.RoomAgentDispatch{ + {}, + { + AgentName: "ag", }, }, })