// Mirror of https://github.com/livekit/livekit.git (synced 2026-03-30 19:55:41 +00:00).
// 249 lines, 6.7 KiB, Go.
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
|
|
|
|
package service
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"time"
|
|
|
|
"github.com/livekit/protocol/livekit"
|
|
"github.com/livekit/protocol/logger"
|
|
"github.com/livekit/protocol/utils"
|
|
"github.com/livekit/protocol/utils/guid"
|
|
"github.com/livekit/psrpc"
|
|
|
|
"github.com/livekit/livekit-server/pkg/config"
|
|
"github.com/livekit/livekit-server/pkg/routing"
|
|
"github.com/livekit/livekit-server/pkg/routing/selector"
|
|
)
|
|
|
|
type StandardRoomAllocator struct {
|
|
config *config.Config
|
|
router routing.Router
|
|
selector selector.NodeSelector
|
|
roomStore ObjectStore
|
|
}
|
|
|
|
func NewRoomAllocator(conf *config.Config, router routing.Router, rs ObjectStore) (RoomAllocator, error) {
|
|
ns, err := selector.CreateNodeSelector(conf)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &StandardRoomAllocator{
|
|
config: conf,
|
|
router: router,
|
|
selector: ns,
|
|
roomStore: rs,
|
|
}, nil
|
|
}
|
|
|
|
func (r *StandardRoomAllocator) AutoCreateEnabled(context.Context) bool {
|
|
return r.config.Room.AutoCreate
|
|
}
|
|
|
|
// CreateRoom creates a new room from a request and allocates it to a node to handle
|
|
// it'll also monitor its state, and cleans it up when appropriate
|
|
func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest, isExplicit bool) (*livekit.Room, *livekit.RoomInternal, bool, error) {
|
|
token, err := r.roomStore.LockRoom(ctx, livekit.RoomName(req.Name), 5*time.Second)
|
|
if err != nil {
|
|
return nil, nil, false, err
|
|
}
|
|
defer func() {
|
|
_ = r.roomStore.UnlockRoom(ctx, livekit.RoomName(req.Name), token)
|
|
}()
|
|
|
|
// find existing room and update it
|
|
var created bool
|
|
rm, internal, err := r.roomStore.LoadRoom(ctx, livekit.RoomName(req.Name), true)
|
|
if errors.Is(err, ErrRoomNotFound) {
|
|
created = true
|
|
now := time.Now()
|
|
rm = &livekit.Room{
|
|
Sid: guid.New(utils.RoomPrefix),
|
|
Name: req.Name,
|
|
CreationTime: now.Unix(),
|
|
CreationTimeMs: now.UnixMilli(),
|
|
TurnPassword: utils.RandomSecret(),
|
|
}
|
|
internal = &livekit.RoomInternal{}
|
|
applyDefaultRoomConfig(rm, internal, &r.config.Room)
|
|
} else if err != nil {
|
|
return nil, nil, false, err
|
|
}
|
|
|
|
req, err = r.applyNamedRoomConfiguration(req)
|
|
if err != nil {
|
|
return nil, nil, false, err
|
|
}
|
|
|
|
if req.EmptyTimeout > 0 {
|
|
rm.EmptyTimeout = req.EmptyTimeout
|
|
}
|
|
if req.DepartureTimeout > 0 {
|
|
rm.DepartureTimeout = req.DepartureTimeout
|
|
}
|
|
if req.MaxParticipants > 0 {
|
|
rm.MaxParticipants = req.MaxParticipants
|
|
}
|
|
if req.Metadata != "" {
|
|
rm.Metadata = req.Metadata
|
|
}
|
|
if req.Egress != nil {
|
|
if req.Egress.Participant != nil {
|
|
internal.ParticipantEgress = req.Egress.Participant
|
|
}
|
|
if req.Egress.Tracks != nil {
|
|
internal.TrackEgress = req.Egress.Tracks
|
|
}
|
|
}
|
|
if req.Agents != nil {
|
|
internal.AgentDispatches = req.Agents
|
|
}
|
|
if req.MinPlayoutDelay > 0 || req.MaxPlayoutDelay > 0 {
|
|
internal.PlayoutDelay = &livekit.PlayoutDelay{
|
|
Enabled: true,
|
|
Min: req.MinPlayoutDelay,
|
|
Max: req.MaxPlayoutDelay,
|
|
}
|
|
}
|
|
if req.SyncStreams {
|
|
internal.SyncStreams = true
|
|
}
|
|
|
|
if err = r.roomStore.StoreRoom(ctx, rm, internal); err != nil {
|
|
return nil, nil, false, err
|
|
}
|
|
|
|
return rm, internal, created, nil
|
|
}
|
|
|
|
func (r *StandardRoomAllocator) SelectRoomNode(ctx context.Context, roomName livekit.RoomName, nodeID livekit.NodeID) error {
|
|
// check if room already assigned
|
|
existing, err := r.router.GetNodeForRoom(ctx, roomName)
|
|
if !errors.Is(err, routing.ErrNotFound) && err != nil {
|
|
return err
|
|
}
|
|
|
|
// if already assigned and still available, keep it on that node
|
|
if err == nil && selector.IsAvailable(existing) {
|
|
// if node hosting the room is full, deny entry
|
|
if selector.LimitsReached(r.config.Limit, existing.Stats) {
|
|
return routing.ErrNodeLimitReached
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// select a new node
|
|
if nodeID == "" {
|
|
nodes, err := r.router.ListNodes()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
node, err := r.selector.SelectNode(nodes)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
nodeID = livekit.NodeID(node.Id)
|
|
}
|
|
|
|
logger.Infow("selected node for room", "room", roomName, "selectedNodeID", nodeID)
|
|
err = r.router.SetNodeForRoom(ctx, roomName, nodeID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *StandardRoomAllocator) ValidateCreateRoom(ctx context.Context, roomName livekit.RoomName) error {
|
|
// when auto create is disabled, we'll check to ensure it's already created
|
|
if !r.config.Room.AutoCreate {
|
|
_, _, err := r.roomStore.LoadRoom(ctx, roomName, false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func applyDefaultRoomConfig(room *livekit.Room, internal *livekit.RoomInternal, conf *config.RoomConfig) {
|
|
room.EmptyTimeout = conf.EmptyTimeout
|
|
room.DepartureTimeout = conf.DepartureTimeout
|
|
room.MaxParticipants = conf.MaxParticipants
|
|
for _, codec := range conf.EnabledCodecs {
|
|
room.EnabledCodecs = append(room.EnabledCodecs, &livekit.Codec{
|
|
Mime: codec.Mime,
|
|
FmtpLine: codec.FmtpLine,
|
|
})
|
|
}
|
|
internal.PlayoutDelay = &livekit.PlayoutDelay{
|
|
Enabled: conf.PlayoutDelay.Enabled,
|
|
Min: uint32(conf.PlayoutDelay.Min),
|
|
Max: uint32(conf.PlayoutDelay.Max),
|
|
}
|
|
internal.SyncStreams = conf.SyncStreams
|
|
}
|
|
|
|
func (r *StandardRoomAllocator) applyNamedRoomConfiguration(req *livekit.CreateRoomRequest) (*livekit.CreateRoomRequest, error) {
|
|
if req.RoomPreset == "" {
|
|
return req, nil
|
|
}
|
|
|
|
conf, ok := r.config.Room.RoomConfigurations[req.RoomPreset]
|
|
if !ok {
|
|
return req, psrpc.NewErrorf(psrpc.InvalidArgument, "unknown room configuration in create room request")
|
|
}
|
|
|
|
clone := utils.CloneProto(req)
|
|
|
|
if clone.EmptyTimeout == 0 {
|
|
clone.EmptyTimeout = conf.EmptyTimeout
|
|
}
|
|
if clone.DepartureTimeout == 0 {
|
|
clone.DepartureTimeout = conf.DepartureTimeout
|
|
}
|
|
if clone.MaxParticipants == 0 {
|
|
clone.MaxParticipants = conf.MaxParticipants
|
|
}
|
|
if clone.Egress == nil {
|
|
clone.Egress = utils.CloneProto(conf.Egress)
|
|
}
|
|
if clone.Agents == nil {
|
|
clone.Agents = make([]*livekit.RoomAgentDispatch, 0, len(conf.Agents))
|
|
for _, agent := range conf.Agents {
|
|
clone.Agents = append(clone.Agents, utils.CloneProto(agent))
|
|
}
|
|
}
|
|
if clone.MinPlayoutDelay == 0 {
|
|
clone.MinPlayoutDelay = conf.MinPlayoutDelay
|
|
}
|
|
if clone.MaxPlayoutDelay == 0 {
|
|
clone.MaxPlayoutDelay = conf.MaxPlayoutDelay
|
|
}
|
|
if !clone.SyncStreams {
|
|
clone.SyncStreams = conf.SyncStreams
|
|
}
|
|
if clone.Metadata == "" {
|
|
clone.Metadata = conf.Metadata
|
|
}
|
|
|
|
return clone, nil
|
|
}
|