Handle room configuration that's set in the grant itself (#3120)

* Handle room configuration that's set in the grant itself

* ensure refresh token contains updates

* deps

* dep

---------

Co-authored-by: Paul Wells <paulwe@gmail.com>
This commit is contained in:
David Zhao
2024-10-21 23:31:12 -07:00
committed by GitHub
parent d751f209d5
commit dd7cd7eafc
14 changed files with 65 additions and 73 deletions

4
go.mod
View File

@@ -19,7 +19,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598
github.com/livekit/protocol v1.26.1-0.20241022031344-538889e5de0a
github.com/livekit/protocol v1.26.1-0.20241022053724-1eb56d424343
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9
github.com/mackerelio/go-osstat v0.2.5
github.com/magefile/mage v1.15.0
@@ -56,6 +56,8 @@ require (
gopkg.in/yaml.v3 v3.0.1
)
// replace github.com/livekit/protocol => ../protocol
require (
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.33.0-20240401165935-b983156c5e99.1 // indirect
dario.cat/mergo v1.0.0 // indirect

2
go.sum
View File

@@ -165,8 +165,6 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.26.1-0.20241022031344-538889e5de0a h1:31YXXJLEwCflp7KEe9rRAwmONyCwHFujTl4MdxegTxw=
github.com/livekit/protocol v1.26.1-0.20241022031344-538889e5de0a/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs=
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 h1:33oBjGpVD9tYkDXQU42tnHl8eCX9G6PVUToBVuCUyOs=
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o=

View File

@@ -253,8 +253,8 @@ type RoomConfig struct {
// deprecated, moved to limits
MaxRoomNameLength int `yaml:"max_room_name_length,omitempty"`
// deprecated, moved to limits
MaxParticipantIdentityLength int `yaml:"max_participant_identity_length,omitempty"`
RoomConfigurations map[string]livekit.RoomConfiguration `yaml:"room_configurations,omitempty"`
MaxParticipantIdentityLength int `yaml:"max_participant_identity_length,omitempty"`
RoomConfigurations map[string]*livekit.RoomConfiguration `yaml:"room_configurations,omitempty"`
}
type CodecSpec struct {

View File

@@ -169,6 +169,7 @@ func ParticipantInitFromStartSession(ss *livekit.StartSession, region string) (*
AdaptiveStream: ss.AdaptiveStream,
ID: livekit.ParticipantID(ss.ParticipantId),
DisableICELite: ss.DisableIceLite,
CreateRoom: ss.CreateRoom,
}
if ss.SubscriberAllowPause != nil {
subscriberAllowPause := *ss.SubscriberAllowPause

View File

@@ -1728,9 +1728,7 @@ func (r *Room) createAgentDispatchesFromRoomAgent() {
roomDisp := r.internal.AgentDispatches
if len(roomDisp) == 0 {
// Backward compatibility: by default, start any agent in the empty JobName
roomDisp = []*livekit.RoomAgentDispatch{
&livekit.RoomAgentDispatch{},
}
roomDisp = []*livekit.RoomAgentDispatch{{}}
}
for _, ag := range roomDisp {

View File

@@ -179,16 +179,6 @@ func EnsureCreatePermission(ctx context.Context) error {
return nil
}
func GetRoomConfiguration(ctx context.Context) string {
claims := GetGrants(ctx)
if claims == nil || claims.Video == nil {
return ""
}
return claims.Video.RoomConfiguration
}
func EnsureListPermission(ctx context.Context) error {
claims := GetGrants(ctx)
if claims == nil || claims.Video == nil || !claims.Video.RoomList {

View File

@@ -108,8 +108,8 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre
internal.TrackEgress = req.Egress.Tracks
}
}
if req.Agent != nil {
internal.AgentDispatches = req.Agent.Dispatches
if req.Agents != nil {
internal.AgentDispatches = req.Agents
}
if req.MinPlayoutDelay > 0 || req.MaxPlayoutDelay > 0 {
internal.PlayoutDelay = &livekit.PlayoutDelay{
@@ -200,11 +200,11 @@ func applyDefaultRoomConfig(room *livekit.Room, internal *livekit.RoomInternal,
}
func (r *StandardRoomAllocator) applyNamedRoomConfiguration(req *livekit.CreateRoomRequest) (*livekit.CreateRoomRequest, error) {
if req.ConfigName == "" {
if req.RoomPreset == "" {
return req, nil
}
conf, ok := r.config.Room.RoomConfigurations[req.ConfigName]
conf, ok := r.config.Room.RoomConfigurations[req.RoomPreset]
if !ok {
return req, psrpc.NewErrorf(psrpc.InvalidArgument, "unknown room confguration in create room request")
}
@@ -224,8 +224,11 @@ func (r *StandardRoomAllocator) applyNamedRoomConfiguration(req *livekit.CreateR
if clone.Egress == nil {
clone.Egress = utils.CloneProto(conf.Egress)
}
if clone.Agent == nil {
clone.Agent = utils.CloneProto(conf.Agent)
if clone.Agents == nil {
clone.Agents = make([]*livekit.RoomAgentDispatch, 0, len(conf.Agents))
for _, agent := range conf.Agents {
clone.Agents = append(clone.Agents, utils.CloneProto(agent))
}
}
if clone.MinPlayoutDelay == 0 {
clone.MinPlayoutDelay = conf.MinPlayoutDelay

View File

@@ -278,13 +278,13 @@ func (r *RoomManager) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq
// StartSession starts WebRTC session when a new participant is connected, takes place on RTC node
func (r *RoomManager) StartSession(
ctx context.Context,
createRoom *livekit.CreateRoomRequest,
pi routing.ParticipantInit,
requestSource routing.MessageSource,
responseSink routing.MessageSink,
) error {
sessionStartTime := time.Now()
createRoom := pi.CreateRoom
room, err := r.getOrCreateRoom(ctx, createRoom)
if err != nil {
return err
@@ -990,7 +990,9 @@ func (r *RoomManager) refreshToken(participant types.LocalParticipant) error {
SetValidFor(tokenDefaultTTL).
SetMetadata(grants.Metadata).
SetAttributes(grants.Attributes).
AddGrant(grants.Video)
SetVideoGrant(grants.Video).
SetRoomConfig(grants.GetRoomConfiguration()).
SetRoomPreset(grants.RoomPreset)
jwt, err := token.ToJWT()
if err == nil {
err = participant.SendRefreshToken(jwt)

View File

@@ -166,6 +166,12 @@ func (s *RTCService) validate(r *http.Request) (livekit.RoomName, routing.Partic
}
}
createRequest := &livekit.CreateRoomRequest{
Name: string(roomName),
RoomPreset: claims.RoomPreset,
}
SetRoomConfiguration(createRequest, claims.GetRoomConfiguration())
pi = routing.ParticipantInit{
Reconnect: boolValue(reconnectParam),
ReconnectReason: livekit.ReconnectReason(reconnectReason),
@@ -175,10 +181,7 @@ func (s *RTCService) validate(r *http.Request) (livekit.RoomName, routing.Partic
Client: s.ParseClientInfo(r),
Grants: claims,
Region: region,
CreateRoom: &livekit.CreateRoomRequest{
Name: string(roomName),
ConfigName: GetRoomConfiguration(r.Context()),
},
CreateRoom: createRequest,
}
if pi.Reconnect {
pi.ID = livekit.ParticipantID(participantID)

View File

@@ -12,15 +12,14 @@ import (
)
type FakeSessionHandler struct {
HandleSessionStub func(context.Context, *livekit.CreateRoomRequest, routing.ParticipantInit, livekit.ConnectionID, routing.MessageSource, routing.MessageSink) error
HandleSessionStub func(context.Context, routing.ParticipantInit, livekit.ConnectionID, routing.MessageSource, routing.MessageSink) error
handleSessionMutex sync.RWMutex
handleSessionArgsForCall []struct {
arg1 context.Context
arg2 *livekit.CreateRoomRequest
arg3 routing.ParticipantInit
arg4 livekit.ConnectionID
arg5 routing.MessageSource
arg6 routing.MessageSink
arg2 routing.ParticipantInit
arg3 livekit.ConnectionID
arg4 routing.MessageSource
arg5 routing.MessageSink
}
handleSessionReturns struct {
result1 error
@@ -43,23 +42,22 @@ type FakeSessionHandler struct {
invocationsMutex sync.RWMutex
}
func (fake *FakeSessionHandler) HandleSession(arg1 context.Context, arg2 *livekit.CreateRoomRequest, arg3 routing.ParticipantInit, arg4 livekit.ConnectionID, arg5 routing.MessageSource, arg6 routing.MessageSink) error {
func (fake *FakeSessionHandler) HandleSession(arg1 context.Context, arg2 routing.ParticipantInit, arg3 livekit.ConnectionID, arg4 routing.MessageSource, arg5 routing.MessageSink) error {
fake.handleSessionMutex.Lock()
ret, specificReturn := fake.handleSessionReturnsOnCall[len(fake.handleSessionArgsForCall)]
fake.handleSessionArgsForCall = append(fake.handleSessionArgsForCall, struct {
arg1 context.Context
arg2 *livekit.CreateRoomRequest
arg3 routing.ParticipantInit
arg4 livekit.ConnectionID
arg5 routing.MessageSource
arg6 routing.MessageSink
}{arg1, arg2, arg3, arg4, arg5, arg6})
arg2 routing.ParticipantInit
arg3 livekit.ConnectionID
arg4 routing.MessageSource
arg5 routing.MessageSink
}{arg1, arg2, arg3, arg4, arg5})
stub := fake.HandleSessionStub
fakeReturns := fake.handleSessionReturns
fake.recordInvocation("HandleSession", []interface{}{arg1, arg2, arg3, arg4, arg5, arg6})
fake.recordInvocation("HandleSession", []interface{}{arg1, arg2, arg3, arg4, arg5})
fake.handleSessionMutex.Unlock()
if stub != nil {
return stub(arg1, arg2, arg3, arg4, arg5, arg6)
return stub(arg1, arg2, arg3, arg4, arg5)
}
if specificReturn {
return ret.result1
@@ -73,17 +71,17 @@ func (fake *FakeSessionHandler) HandleSessionCallCount() int {
return len(fake.handleSessionArgsForCall)
}
func (fake *FakeSessionHandler) HandleSessionCalls(stub func(context.Context, *livekit.CreateRoomRequest, routing.ParticipantInit, livekit.ConnectionID, routing.MessageSource, routing.MessageSink) error) {
func (fake *FakeSessionHandler) HandleSessionCalls(stub func(context.Context, routing.ParticipantInit, livekit.ConnectionID, routing.MessageSource, routing.MessageSink) error) {
fake.handleSessionMutex.Lock()
defer fake.handleSessionMutex.Unlock()
fake.HandleSessionStub = stub
}
func (fake *FakeSessionHandler) HandleSessionArgsForCall(i int) (context.Context, *livekit.CreateRoomRequest, routing.ParticipantInit, livekit.ConnectionID, routing.MessageSource, routing.MessageSink) {
func (fake *FakeSessionHandler) HandleSessionArgsForCall(i int) (context.Context, routing.ParticipantInit, livekit.ConnectionID, routing.MessageSource, routing.MessageSink) {
fake.handleSessionMutex.RLock()
defer fake.handleSessionMutex.RUnlock()
argsForCall := fake.handleSessionArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5, argsForCall.arg6
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5
}
func (fake *FakeSessionHandler) HandleSessionReturns(result1 error) {

View File

@@ -39,7 +39,6 @@ type SessionHandler interface {
HandleSession(
ctx context.Context,
createRoom *livekit.CreateRoomRequest,
pi routing.ParticipantInit,
connectionID livekit.ConnectionID,
requestSource routing.MessageSource,
@@ -94,7 +93,6 @@ func (s *defaultSessionHandler) Logger(ctx context.Context) logger.Logger {
func (s *defaultSessionHandler) HandleSession(
ctx context.Context,
createRoom *livekit.CreateRoomRequest,
pi routing.ParticipantInit,
connectionID livekit.ConnectionID,
requestSource routing.MessageSource,
@@ -102,7 +100,7 @@ func (s *defaultSessionHandler) HandleSession(
) error {
prometheus.IncrementParticipantRtcInit(1)
rtcNode, err := s.router.GetNodeForRoom(ctx, livekit.RoomName(createRoom.Name))
rtcNode, err := s.router.GetNodeForRoom(ctx, livekit.RoomName(pi.CreateRoom.Name))
if err != nil {
return err
}
@@ -115,7 +113,7 @@ func (s *defaultSessionHandler) HandleSession(
return err
}
return s.roomManager.StartSession(ctx, createRoom, pi, requestSource, responseSink)
return s.roomManager.StartSession(ctx, pi, requestSource, responseSink)
}
func (s *SignalServer) Start() error {
@@ -181,19 +179,7 @@ func (r *signalService) RelaySignal(stream psrpc.ServerStream[*rpc.RelaySignalRe
// and the delivery of any parting messages from the client. take care to
// copy the incoming rpc headers to avoid dropping any session vars.
ctx := metadata.NewContextWithIncomingHeader(context.Background(), metadata.IncomingHeader(stream.Context()))
createRoom := ss.CreateRoom
if createRoom == nil {
createRoom = &livekit.CreateRoomRequest{
Name: ss.RoomName,
}
if pi.Grants != nil && pi.Grants.Video != nil {
createRoom.ConfigName = pi.Grants.Video.RoomConfiguration
}
}
err = r.sessionHandler.HandleSession(ctx, createRoom, *pi, livekit.ConnectionID(ss.ConnectionId), reqChan, sink)
err = r.sessionHandler.HandleSession(ctx, *pi, livekit.ConnectionID(ss.ConnectionId), reqChan, sink)
if err != nil {
sink.Close()
l.Errorw("could not handle new participant", err)

View File

@@ -67,7 +67,6 @@ func TestSignal(t *testing.T) {
LoggerStub: func(context.Context) logger.Logger { return logger.GetLogger() },
HandleSessionStub: func(
ctx context.Context,
createRoom *livekit.CreateRoomRequest,
pi routing.ParticipantInit,
connectionID livekit.ConnectionID,
requestSource routing.MessageSource,
@@ -124,7 +123,6 @@ func TestSignal(t *testing.T) {
LoggerStub: func(context.Context) logger.Logger { return logger.GetLogger() },
HandleSessionStub: func(
ctx context.Context,
createRoom *livekit.CreateRoomRequest,
pi routing.ParticipantInit,
connectionID livekit.ConnectionID,
requestSource routing.MessageSource,

View File

@@ -22,6 +22,7 @@ import (
"regexp"
"strings"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
)
@@ -67,3 +68,17 @@ func GetClientIP(r *http.Request) string {
ip, _, _ := net.SplitHostPort(r.RemoteAddr)
return ip
}
func SetRoomConfiguration(createRequest *livekit.CreateRoomRequest, conf *livekit.RoomConfiguration) {
if conf == nil {
return
}
createRequest.Agents = conf.Agents
createRequest.Egress = conf.Egress
createRequest.EmptyTimeout = conf.EmptyTimeout
createRequest.DepartureTimeout = conf.DepartureTimeout
createRequest.MaxParticipants = conf.MaxParticipants
createRequest.MinPlayoutDelay = conf.MinPlayoutDelay
createRequest.MaxPlayoutDelay = conf.MaxPlayoutDelay
createRequest.SyncStreams = conf.SyncStreams
}

View File

@@ -120,12 +120,10 @@ func TestAgentNamespaces(t *testing.T) {
_, err = roomClient.CreateRoom(contextWithToken(createRoomToken()), &livekit.CreateRoomRequest{
Name: testRoom,
Agent: &livekit.RoomAgent{
Dispatches: []*livekit.RoomAgentDispatch{
&livekit.RoomAgentDispatch{},
&livekit.RoomAgentDispatch{
AgentName: "ag",
},
Agents: []*livekit.RoomAgentDispatch{
{},
{
AgentName: "ag",
},
},
})