mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 17:45:40 +00:00
1197 lines
37 KiB
Go
1197 lines
37 KiB
Go
// Copyright 2023 LiveKit, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package service
|
|
|
|
import (
|
|
"context"
|
|
"crypto/hmac"
|
|
"crypto/sha1"
|
|
"encoding/base64"
|
|
"fmt"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
"golang.org/x/exp/maps"
|
|
|
|
"github.com/livekit/mediatransportutil/pkg/rtcconfig"
|
|
"github.com/livekit/protocol/auth"
|
|
"github.com/livekit/protocol/livekit"
|
|
"github.com/livekit/protocol/logger"
|
|
"github.com/livekit/protocol/observability/roomobs"
|
|
"github.com/livekit/protocol/rpc"
|
|
"github.com/livekit/protocol/utils"
|
|
"github.com/livekit/protocol/utils/guid"
|
|
"github.com/livekit/protocol/utils/must"
|
|
"github.com/livekit/psrpc"
|
|
"github.com/livekit/psrpc/pkg/middleware"
|
|
|
|
"github.com/livekit/livekit-server/pkg/agent"
|
|
"github.com/livekit/livekit-server/pkg/sfu"
|
|
sutils "github.com/livekit/livekit-server/pkg/utils"
|
|
|
|
"github.com/livekit/livekit-server/pkg/clientconfiguration"
|
|
"github.com/livekit/livekit-server/pkg/config"
|
|
"github.com/livekit/livekit-server/pkg/routing"
|
|
"github.com/livekit/livekit-server/pkg/rtc"
|
|
"github.com/livekit/livekit-server/pkg/rtc/types"
|
|
"github.com/livekit/livekit-server/pkg/telemetry"
|
|
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
|
|
"github.com/livekit/livekit-server/version"
|
|
)
|
|
|
|
const (
|
|
tokenRefreshInterval = 5 * time.Minute
|
|
tokenDefaultTTL = 10 * time.Minute
|
|
)
|
|
|
|
type iceConfigCacheKey struct {
|
|
roomName livekit.RoomName
|
|
participantIdentity livekit.ParticipantIdentity
|
|
}
|
|
|
|
// RoomManager manages rooms and its interaction with participants.
|
|
// It's responsible for creating, deleting rooms, as well as running sessions for participants
|
|
type RoomManager struct {
|
|
lock sync.RWMutex
|
|
|
|
config *config.Config
|
|
rtcConfig *rtc.WebRTCConfig
|
|
serverInfo *livekit.ServerInfo
|
|
currentNode routing.LocalNode
|
|
router routing.Router
|
|
roomAllocator RoomAllocator
|
|
roomManagerServer rpc.TypedRoomManagerServer
|
|
whipServer rpc.WHIPServer[livekit.NodeID]
|
|
roomStore ObjectStore
|
|
telemetry telemetry.TelemetryService
|
|
clientConfManager clientconfiguration.ClientConfigurationManager
|
|
agentClient agent.Client
|
|
agentStore AgentStore
|
|
egressLauncher rtc.EgressLauncher
|
|
versionGenerator utils.TimedVersionGenerator
|
|
turnAuthHandler *TURNAuthHandler
|
|
bus psrpc.MessageBus
|
|
|
|
rooms map[livekit.RoomName]*rtc.Room
|
|
|
|
roomServers utils.MultitonService[rpc.RoomTopic]
|
|
agentDispatchServers utils.MultitonService[rpc.RoomTopic]
|
|
participantServers utils.MultitonService[rpc.ParticipantTopic]
|
|
httpSignalParticipantServers utils.MultitonService[rpc.ParticipantTopic]
|
|
whipParticipantServers utils.MultitonService[rpc.ParticipantTopic]
|
|
|
|
iceConfigCache *sutils.IceConfigCache[iceConfigCacheKey]
|
|
|
|
forwardStats *sfu.ForwardStats
|
|
|
|
rpc.UnimplementedParticipantServer
|
|
rpc.UnimplementedRoomServer
|
|
rpc.UnimplementedRoomManagerServer
|
|
}
|
|
|
|
func NewLocalRoomManager(
|
|
conf *config.Config,
|
|
roomStore ObjectStore,
|
|
currentNode routing.LocalNode,
|
|
router routing.Router,
|
|
roomAllocator RoomAllocator,
|
|
telemetry telemetry.TelemetryService,
|
|
agentClient agent.Client,
|
|
agentStore AgentStore,
|
|
egressLauncher rtc.EgressLauncher,
|
|
versionGenerator utils.TimedVersionGenerator,
|
|
turnAuthHandler *TURNAuthHandler,
|
|
bus psrpc.MessageBus,
|
|
forwardStats *sfu.ForwardStats,
|
|
) (*RoomManager, error) {
|
|
rtcConf, err := rtc.NewWebRTCConfig(conf)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
r := &RoomManager{
|
|
config: conf,
|
|
rtcConfig: rtcConf,
|
|
currentNode: currentNode,
|
|
router: router,
|
|
roomAllocator: roomAllocator,
|
|
roomStore: roomStore,
|
|
telemetry: telemetry,
|
|
clientConfManager: clientconfiguration.NewStaticClientConfigurationManager(clientconfiguration.StaticConfigurations),
|
|
egressLauncher: egressLauncher,
|
|
agentClient: agentClient,
|
|
agentStore: agentStore,
|
|
versionGenerator: versionGenerator,
|
|
turnAuthHandler: turnAuthHandler,
|
|
bus: bus,
|
|
forwardStats: forwardStats,
|
|
|
|
rooms: make(map[livekit.RoomName]*rtc.Room),
|
|
|
|
iceConfigCache: sutils.NewIceConfigCache[iceConfigCacheKey](0),
|
|
|
|
serverInfo: &livekit.ServerInfo{
|
|
Edition: livekit.ServerInfo_Standard,
|
|
Version: version.Version,
|
|
Protocol: types.CurrentProtocol,
|
|
AgentProtocol: agent.CurrentProtocol,
|
|
Region: conf.Region,
|
|
NodeId: string(currentNode.NodeID()),
|
|
},
|
|
}
|
|
|
|
r.roomManagerServer, err = rpc.NewTypedRoomManagerServer(r, bus, rpc.WithServerLogger(logger.GetLogger()), middleware.WithServerMetrics(rpc.PSRPCMetricsObserver{}), psrpc.WithServerChannelSize(conf.PSRPC.BufferSize))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if err := r.roomManagerServer.RegisterAllNodeTopics(currentNode.NodeID()); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
whipService, err := newWhipService(r)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
r.whipServer, err = rpc.NewWHIPServer[livekit.NodeID](whipService, bus, rpc.WithDefaultServerOptions(conf.PSRPC, logger.GetLogger()))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if err := r.whipServer.RegisterAllCommonTopics(currentNode.NodeID()); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return r, nil
|
|
}
|
|
|
|
func (r *RoomManager) GetRoom(_ context.Context, roomName livekit.RoomName) *rtc.Room {
|
|
r.lock.RLock()
|
|
defer r.lock.RUnlock()
|
|
return r.rooms[roomName]
|
|
}
|
|
|
|
// deleteRoom completely deletes all room information, including active sessions, room store, and routing info
|
|
func (r *RoomManager) deleteRoom(ctx context.Context, roomName livekit.RoomName) error {
|
|
logger.Infow("deleting room state", "room", roomName)
|
|
r.lock.Lock()
|
|
delete(r.rooms, roomName)
|
|
r.lock.Unlock()
|
|
|
|
var err, err2 error
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(2)
|
|
// clear routing information
|
|
go func() {
|
|
defer wg.Done()
|
|
err = r.router.ClearRoomState(ctx, roomName)
|
|
}()
|
|
// also delete room from db
|
|
go func() {
|
|
defer wg.Done()
|
|
err2 = r.roomStore.DeleteRoom(ctx, roomName)
|
|
}()
|
|
|
|
wg.Wait()
|
|
if err2 != nil {
|
|
err = err2
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (r *RoomManager) CloseIdleRooms() {
|
|
r.lock.RLock()
|
|
rooms := maps.Values(r.rooms)
|
|
r.lock.RUnlock()
|
|
|
|
for _, room := range rooms {
|
|
room.CloseIfEmpty()
|
|
}
|
|
}
|
|
|
|
func (r *RoomManager) HasParticipants() bool {
|
|
r.lock.RLock()
|
|
defer r.lock.RUnlock()
|
|
|
|
for _, room := range r.rooms {
|
|
if len(room.GetParticipants()) != 0 {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (r *RoomManager) Stop() {
|
|
// disconnect all clients
|
|
r.lock.RLock()
|
|
rooms := maps.Values(r.rooms)
|
|
r.lock.RUnlock()
|
|
|
|
for _, room := range rooms {
|
|
room.Close(types.ParticipantCloseReasonRoomManagerStop)
|
|
}
|
|
|
|
r.roomManagerServer.Kill()
|
|
r.whipServer.Kill()
|
|
r.roomServers.Kill()
|
|
r.agentDispatchServers.Kill()
|
|
r.participantServers.Kill()
|
|
r.httpSignalParticipantServers.Kill()
|
|
r.whipParticipantServers.Kill()
|
|
|
|
if r.rtcConfig != nil {
|
|
if r.rtcConfig.UDPMux != nil {
|
|
_ = r.rtcConfig.UDPMux.Close()
|
|
}
|
|
if r.rtcConfig.TCPMuxListener != nil {
|
|
_ = r.rtcConfig.TCPMuxListener.Close()
|
|
}
|
|
}
|
|
|
|
r.iceConfigCache.Stop()
|
|
|
|
if r.forwardStats != nil {
|
|
r.forwardStats.Stop()
|
|
}
|
|
}
|
|
|
|
func (r *RoomManager) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error) {
|
|
room, err := r.getOrCreateRoom(ctx, req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer room.Release()
|
|
|
|
return room.ToProto(), nil
|
|
}
|
|
|
|
// StartSession starts WebRTC session when a new participant is connected, takes place on RTC node
|
|
func (r *RoomManager) StartSession(
|
|
ctx context.Context,
|
|
pi routing.ParticipantInit,
|
|
requestSource routing.MessageSource,
|
|
responseSink routing.MessageSink,
|
|
useOneShotSignallingMode bool,
|
|
) error {
|
|
sessionStartTime := time.Now()
|
|
|
|
createRoom := pi.CreateRoom
|
|
room, err := r.getOrCreateRoom(ctx, createRoom)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer room.Release()
|
|
|
|
protoRoom, roomInternal := room.ToProto(), room.Internal()
|
|
|
|
// only create the room, but don't start a participant session
|
|
if pi.Identity == "" {
|
|
return nil
|
|
}
|
|
|
|
// should not error out, error is logged in iceServersForParticipant even if it fails
|
|
// since this is used for TURN server credentials, we don't want to fail the request even if there's no TURN for the session
|
|
apiKey, _, _ := r.getFirstKeyPair()
|
|
|
|
participant := room.GetParticipant(pi.Identity)
|
|
if participant != nil {
|
|
// When reconnecting, it means WS has interrupted but underlying peer connection is still ok in this state,
|
|
// we'll keep the participant SID, and just swap the sink for the underlying connection
|
|
if pi.Reconnect {
|
|
if participant.IsClosed() {
|
|
// Send leave request if participant is closed, i. e. handle the case of client trying to resume crossing wires with
|
|
// server closing the participant due to some irrecoverable condition. Such a condition would have triggered
|
|
// a full reconnect when that condition occurred.
|
|
//
|
|
// It is possible that the client did not get that send request. So, send it again.
|
|
logger.Infow("cannot restart a closed participant",
|
|
"room", room.Name(),
|
|
"nodeID", r.currentNode.NodeID(),
|
|
"participant", pi.Identity,
|
|
"reason", pi.ReconnectReason,
|
|
)
|
|
|
|
var leave *livekit.LeaveRequest
|
|
pv := types.ProtocolVersion(pi.Client.Protocol)
|
|
if pv.SupportsRegionsInLeaveRequest() {
|
|
leave = &livekit.LeaveRequest{
|
|
Reason: livekit.DisconnectReason_STATE_MISMATCH,
|
|
Action: livekit.LeaveRequest_RECONNECT,
|
|
}
|
|
} else {
|
|
leave = &livekit.LeaveRequest{
|
|
CanReconnect: true,
|
|
Reason: livekit.DisconnectReason_STATE_MISMATCH,
|
|
}
|
|
}
|
|
_ = responseSink.WriteMessage(&livekit.SignalResponse{
|
|
Message: &livekit.SignalResponse_Leave{
|
|
Leave: leave,
|
|
},
|
|
})
|
|
prometheus.IncrementParticipantRtcCanceled(1)
|
|
return errors.New("could not restart closed participant")
|
|
}
|
|
|
|
participant.GetLogger().Infow(
|
|
"resuming RTC session",
|
|
"nodeID", r.currentNode.NodeID(),
|
|
"participantInit", &pi,
|
|
"numParticipants", room.GetParticipantCount(),
|
|
)
|
|
iceConfig := r.getIceConfig(room.Name(), participant)
|
|
if err = room.ResumeParticipant(
|
|
participant,
|
|
requestSource,
|
|
responseSink,
|
|
iceConfig,
|
|
r.iceServersForParticipant(
|
|
apiKey,
|
|
participant,
|
|
iceConfig.PreferenceSubscriber == livekit.ICECandidateType_ICT_TLS,
|
|
),
|
|
pi.ReconnectReason,
|
|
); err != nil {
|
|
participant.GetLogger().Warnw("could not resume participant", err)
|
|
return err
|
|
}
|
|
r.telemetry.ParticipantResumed(ctx, room.ToProto(), participant.ToProto(), r.currentNode.NodeID(), pi.ReconnectReason)
|
|
|
|
go room.HandleSyncState(participant, pi.SyncState)
|
|
|
|
go r.rtcSessionWorker(room, participant, requestSource)
|
|
return nil
|
|
}
|
|
|
|
// we need to clean up the existing participant, so a new one can join
|
|
participant.GetLogger().Infow("removing duplicate participant")
|
|
room.RemoveParticipant(participant.Identity(), participant.ID(), types.ParticipantCloseReasonDuplicateIdentity)
|
|
} else if pi.Reconnect {
|
|
// send leave request if participant is trying to reconnect without keep subscribe state
|
|
// but missing from the room
|
|
var leave *livekit.LeaveRequest
|
|
pv := types.ProtocolVersion(pi.Client.Protocol)
|
|
if pv.SupportsRegionsInLeaveRequest() {
|
|
leave = &livekit.LeaveRequest{
|
|
Reason: livekit.DisconnectReason_STATE_MISMATCH,
|
|
Action: livekit.LeaveRequest_RECONNECT,
|
|
}
|
|
} else {
|
|
leave = &livekit.LeaveRequest{
|
|
CanReconnect: true,
|
|
Reason: livekit.DisconnectReason_STATE_MISMATCH,
|
|
}
|
|
}
|
|
_ = responseSink.WriteMessage(&livekit.SignalResponse{
|
|
Message: &livekit.SignalResponse_Leave{
|
|
Leave: leave,
|
|
},
|
|
})
|
|
prometheus.IncrementParticipantRtcCanceled(1)
|
|
return errors.New("could not restart participant")
|
|
}
|
|
|
|
sid := livekit.ParticipantID(guid.New(utils.ParticipantPrefix))
|
|
pLogger := rtc.LoggerWithParticipant(
|
|
rtc.LoggerWithRoom(logger.GetLogger(), room.Name(), room.ID()),
|
|
pi.Identity,
|
|
sid,
|
|
false,
|
|
)
|
|
pLogger.Infow(
|
|
"starting RTC session",
|
|
"room", room.Name(),
|
|
"nodeID", r.currentNode.NodeID(),
|
|
"numParticipants", room.GetParticipantCount(),
|
|
"participantInit", &pi,
|
|
)
|
|
|
|
clientConf := r.clientConfManager.GetConfiguration(pi.Client)
|
|
|
|
pv := types.ProtocolVersion(pi.Client.Protocol)
|
|
rtcConf := *r.rtcConfig
|
|
rtcConf.SetBufferFactory(room.GetBufferFactory())
|
|
if pi.DisableICELite {
|
|
rtcConf.SettingEngine.SetLite(false)
|
|
}
|
|
rtcConf.UpdatePublisherConfig(pi.UseSinglePeerConnection)
|
|
|
|
// default allow forceTCP
|
|
allowFallback := true
|
|
if r.config.RTC.AllowTCPFallback != nil {
|
|
allowFallback = *r.config.RTC.AllowTCPFallback
|
|
}
|
|
|
|
// default do not force full reconnect on a publication error
|
|
reconnectOnPublicationError := false
|
|
if r.config.RTC.ReconnectOnPublicationError != nil {
|
|
reconnectOnPublicationError = *r.config.RTC.ReconnectOnPublicationError
|
|
}
|
|
|
|
// default do not force full reconnect on a subscription error
|
|
reconnectOnSubscriptionError := false
|
|
if r.config.RTC.ReconnectOnSubscriptionError != nil {
|
|
reconnectOnSubscriptionError = *r.config.RTC.ReconnectOnSubscriptionError
|
|
}
|
|
|
|
// default do not force full reconnect on a data channel error
|
|
reconnectOnDataChannelError := false
|
|
if r.config.RTC.ReconnectOnDataChannelError != nil {
|
|
reconnectOnDataChannelError = *r.config.RTC.ReconnectOnDataChannelError
|
|
}
|
|
|
|
subscriberAllowPause := r.config.RTC.CongestionControl.AllowPause
|
|
if pi.SubscriberAllowPause != nil {
|
|
subscriberAllowPause = *pi.SubscriberAllowPause
|
|
}
|
|
|
|
participant, err = rtc.NewParticipant(rtc.ParticipantParams{
|
|
Identity: pi.Identity,
|
|
Name: pi.Name,
|
|
SID: sid,
|
|
Config: &rtcConf,
|
|
Sink: responseSink,
|
|
AudioConfig: r.config.Audio,
|
|
VideoConfig: r.config.Video,
|
|
LimitConfig: r.config.Limit,
|
|
ProtocolVersion: pv,
|
|
SessionStartTime: sessionStartTime,
|
|
TelemetryListener: room.ParticipantTelemetryListener(),
|
|
Trailer: room.Trailer(),
|
|
PLIThrottleConfig: r.config.RTC.PLIThrottle,
|
|
CongestionControlConfig: r.config.RTC.CongestionControl,
|
|
PublishEnabledCodecs: protoRoom.EnabledCodecs,
|
|
SubscribeEnabledCodecs: protoRoom.EnabledCodecs,
|
|
Grants: pi.Grants,
|
|
Reconnect: pi.Reconnect,
|
|
Logger: pLogger,
|
|
Reporter: roomobs.NewNoopParticipantSessionReporter(),
|
|
ClientConf: clientConf,
|
|
ClientInfo: rtc.ClientInfo{ClientInfo: pi.Client},
|
|
Region: pi.Region,
|
|
AdaptiveStream: pi.AdaptiveStream,
|
|
AllowTCPFallback: allowFallback,
|
|
TURNSEnabled: r.config.IsTURNSEnabled(),
|
|
ParticipantListener: room.LocalParticipantListener(),
|
|
ParticipantHelper: &roomManagerParticipantHelper{
|
|
room: room,
|
|
codecRegressionThreshold: r.config.Video.CodecRegressionThreshold,
|
|
},
|
|
ReconnectOnPublicationError: reconnectOnPublicationError,
|
|
ReconnectOnSubscriptionError: reconnectOnSubscriptionError,
|
|
ReconnectOnDataChannelError: reconnectOnDataChannelError,
|
|
VersionGenerator: r.versionGenerator,
|
|
SubscriberAllowPause: subscriberAllowPause,
|
|
SubscriptionLimitAudio: r.config.Limit.SubscriptionLimitAudio,
|
|
SubscriptionLimitVideo: r.config.Limit.SubscriptionLimitVideo,
|
|
PlayoutDelay: roomInternal.GetPlayoutDelay(),
|
|
SyncStreams: roomInternal.GetSyncStreams(),
|
|
ForwardStats: r.forwardStats,
|
|
MetricConfig: r.config.Metric,
|
|
UseOneShotSignallingMode: useOneShotSignallingMode,
|
|
DataChannelMaxBufferedAmount: r.config.RTC.DataChannelMaxBufferedAmount,
|
|
DatachannelSlowThreshold: r.config.RTC.DatachannelSlowThreshold,
|
|
DatachannelLossyTargetLatency: r.config.RTC.DatachannelLossyTargetLatency,
|
|
FireOnTrackBySdp: true,
|
|
UseSinglePeerConnection: pi.UseSinglePeerConnection,
|
|
EnableDataTracks: r.config.EnableDataTracks,
|
|
EnableRTPStreamRestartDetection: r.config.RTC.EnableRTPStreamRestartDetection,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
iceConfig := r.setIceConfig(room.Name(), participant)
|
|
|
|
// join room
|
|
opts := rtc.ParticipantOptions{
|
|
AutoSubscribe: pi.AutoSubscribe,
|
|
}
|
|
if pi.AutoSubscribeDataTrack != nil {
|
|
opts.AutoSubscribeDataTrack = *pi.AutoSubscribeDataTrack
|
|
}
|
|
iceServers := r.iceServersForParticipant(apiKey, participant, iceConfig.PreferenceSubscriber == livekit.ICECandidateType_ICT_TLS)
|
|
if err = room.Join(participant, requestSource, &opts, iceServers); err != nil {
|
|
pLogger.Errorw("could not join room", err)
|
|
_ = participant.Close(true, types.ParticipantCloseReasonJoinFailed, false)
|
|
return err
|
|
}
|
|
|
|
var participantServerClosers utils.Closers
|
|
participantTopic := rpc.FormatParticipantTopic(room.Name(), participant.Identity())
|
|
participantServer := must.Get(rpc.NewTypedParticipantServer(r, r.bus))
|
|
participantServerClosers = append(participantServerClosers, utils.CloseFunc(r.participantServers.Replace(participantTopic, participantServer)))
|
|
if err := participantServer.RegisterAllParticipantTopics(participantTopic); err != nil {
|
|
participantServerClosers.Close()
|
|
pLogger.Errorw("could not join register participant topic", err)
|
|
_ = participant.Close(true, types.ParticipantCloseReasonMessageBusFailed, false)
|
|
return err
|
|
}
|
|
|
|
if useOneShotSignallingMode {
|
|
whipParticipantServer := must.Get(rpc.NewTypedWHIPParticipantServer(whipParticipantService{r}, r.bus))
|
|
participantServerClosers = append(participantServerClosers, utils.CloseFunc(r.whipParticipantServers.Replace(participantTopic, whipParticipantServer)))
|
|
if err := whipParticipantServer.RegisterAllCommonTopics(participantTopic); err != nil {
|
|
participantServerClosers.Close()
|
|
pLogger.Errorw("could not join register participant topic for rtc rest participant server", err)
|
|
_ = participant.Close(true, types.ParticipantCloseReasonMessageBusFailed, false)
|
|
return err
|
|
}
|
|
}
|
|
|
|
if err = r.roomStore.StoreParticipant(ctx, room.Name(), participant.ToProto()); err != nil {
|
|
pLogger.Errorw("could not store participant", err)
|
|
}
|
|
|
|
persistRoomForParticipantCount := func(proto *livekit.Room) {
|
|
if !participant.Hidden() && !room.IsClosed() {
|
|
err = r.roomStore.StoreRoom(ctx, proto, room.Internal())
|
|
if err != nil {
|
|
logger.Errorw("could not store room", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// update room store with new numParticipants
|
|
persistRoomForParticipantCount(room.ToProto())
|
|
|
|
clientMeta := &livekit.AnalyticsClientMeta{Region: r.currentNode.Region(), Node: string(r.currentNode.NodeID())}
|
|
r.telemetry.ParticipantJoined(ctx, protoRoom, participant.ToProto(), pi.Client, clientMeta, true, participant.TelemetryGuard())
|
|
participant.AddOnClose(types.ParticipantCloseKeyNormal, func(p types.LocalParticipant) {
|
|
participantServerClosers.Close()
|
|
|
|
if err := r.roomStore.DeleteParticipant(ctx, room.Name(), p.Identity()); err != nil {
|
|
pLogger.Errorw("could not delete participant", err)
|
|
}
|
|
|
|
// update room store with new numParticipants
|
|
proto := room.ToProto()
|
|
persistRoomForParticipantCount(proto)
|
|
r.telemetry.ParticipantLeft(ctx, proto, p.ToProto(), true, participant.TelemetryGuard())
|
|
})
|
|
participant.OnClaimsChanged(func(participant types.LocalParticipant) {
|
|
pLogger.Debugw("refreshing client token after claims change")
|
|
if err := r.refreshToken(participant); err != nil {
|
|
pLogger.Errorw("could not refresh token", err)
|
|
}
|
|
})
|
|
participant.OnICEConfigChanged(func(participant types.LocalParticipant, iceConfig *livekit.ICEConfig) {
|
|
r.iceConfigCache.Put(iceConfigCacheKey{room.Name(), participant.Identity()}, iceConfig)
|
|
})
|
|
|
|
for _, addTrackRequest := range pi.AddTrackRequests {
|
|
participant.AddTrack(addTrackRequest)
|
|
}
|
|
if pi.PublisherOffer != nil {
|
|
participant.HandleOffer(pi.PublisherOffer)
|
|
}
|
|
|
|
go r.rtcSessionWorker(room, participant, requestSource)
|
|
return nil
|
|
}
|
|
|
|
// create the actual room object, to be used on RTC node
|
|
func (r *RoomManager) getOrCreateRoom(ctx context.Context, createRoom *livekit.CreateRoomRequest) (*rtc.Room, error) {
|
|
roomName := livekit.RoomName(createRoom.Name)
|
|
|
|
r.lock.RLock()
|
|
lastSeenRoom := r.rooms[roomName]
|
|
r.lock.RUnlock()
|
|
|
|
if lastSeenRoom != nil && lastSeenRoom.Hold() {
|
|
return lastSeenRoom, nil
|
|
}
|
|
|
|
// create new room, get details first
|
|
ri, internal, created, err := r.roomAllocator.CreateRoom(ctx, createRoom, true)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
r.lock.Lock()
|
|
|
|
currentRoom := r.rooms[roomName]
|
|
for currentRoom != lastSeenRoom {
|
|
r.lock.Unlock()
|
|
if currentRoom != nil && currentRoom.Hold() {
|
|
return currentRoom, nil
|
|
}
|
|
|
|
lastSeenRoom = currentRoom
|
|
r.lock.Lock()
|
|
currentRoom = r.rooms[roomName]
|
|
}
|
|
|
|
// construct ice servers
|
|
newRoom := rtc.NewRoom(ri, internal, *r.rtcConfig, r.config.Room, &r.config.Audio, r.serverInfo, r.telemetry, r.agentClient, r.agentStore, r.egressLauncher)
|
|
|
|
roomTopic := rpc.FormatRoomTopic(roomName)
|
|
roomServer := must.Get(rpc.NewTypedRoomServer(r, r.bus))
|
|
killRoomServer := r.roomServers.Replace(roomTopic, roomServer)
|
|
if err := roomServer.RegisterAllRoomTopics(roomTopic); err != nil {
|
|
killRoomServer()
|
|
r.lock.Unlock()
|
|
return nil, err
|
|
}
|
|
agentDispatchServer := must.Get(rpc.NewTypedAgentDispatchInternalServer(r, r.bus))
|
|
killDispServer := r.agentDispatchServers.Replace(roomTopic, agentDispatchServer)
|
|
if err := agentDispatchServer.RegisterAllRoomTopics(roomTopic); err != nil {
|
|
killRoomServer()
|
|
killDispServer()
|
|
r.lock.Unlock()
|
|
return nil, err
|
|
}
|
|
|
|
newRoom.OnClose(func() {
|
|
killRoomServer()
|
|
killDispServer()
|
|
|
|
roomInfo := newRoom.ToProto()
|
|
r.telemetry.RoomEnded(ctx, roomInfo)
|
|
prometheus.RoomEnded(time.Unix(roomInfo.CreationTime, 0))
|
|
if err := r.deleteRoom(ctx, roomName); err != nil {
|
|
newRoom.Logger().Errorw("could not delete room", err)
|
|
}
|
|
|
|
newRoom.Logger().Infow("room closed")
|
|
})
|
|
|
|
newRoom.OnRoomUpdated(func() {
|
|
if err := r.roomStore.StoreRoom(ctx, newRoom.ToProto(), newRoom.Internal()); err != nil {
|
|
newRoom.Logger().Errorw("could not handle metadata update", err)
|
|
}
|
|
})
|
|
|
|
newRoom.OnParticipantChanged(func(p types.Participant) {
|
|
if !p.IsDisconnected() {
|
|
if err := r.roomStore.StoreParticipant(ctx, roomName, p.ToProto()); err != nil {
|
|
newRoom.Logger().Errorw("could not handle participant change", err)
|
|
}
|
|
}
|
|
})
|
|
|
|
r.rooms[roomName] = newRoom
|
|
|
|
r.lock.Unlock()
|
|
|
|
newRoom.Hold()
|
|
|
|
r.telemetry.RoomStarted(ctx, newRoom.ToProto())
|
|
prometheus.RoomStarted()
|
|
|
|
if created && createRoom.GetEgress().GetRoom() != nil {
|
|
// ensure room name matches
|
|
createRoom.Egress.Room.RoomName = createRoom.Name
|
|
_, err = r.egressLauncher.StartEgress(ctx, &rpc.StartEgressRequest{
|
|
Request: &rpc.StartEgressRequest_RoomComposite{
|
|
RoomComposite: createRoom.Egress.Room,
|
|
},
|
|
RoomId: ri.Sid,
|
|
})
|
|
if err != nil {
|
|
newRoom.Release()
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return newRoom, nil
|
|
}
|
|
|
|
// manages an RTC session for a participant, runs on the RTC node
|
|
func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalParticipant, requestSource routing.MessageSource) {
|
|
pLogger := participant.GetLogger()
|
|
defer func() {
|
|
pLogger.Debugw("RTC session finishing", "connID", requestSource.ConnectionID())
|
|
requestSource.Close()
|
|
}()
|
|
|
|
defer func() {
|
|
if r := rtc.Recover(pLogger); r != nil {
|
|
os.Exit(1)
|
|
}
|
|
}()
|
|
|
|
// send first refresh for cases when client token is close to expiring
|
|
_ = r.refreshToken(participant)
|
|
tokenTicker := time.NewTicker(tokenRefreshInterval)
|
|
defer tokenTicker.Stop()
|
|
for {
|
|
select {
|
|
case <-participant.Disconnected():
|
|
return
|
|
|
|
case <-tokenTicker.C:
|
|
// refresh token with the first API Key/secret pair
|
|
if err := r.refreshToken(participant); err != nil {
|
|
pLogger.Errorw("could not refresh token", err, "connID", requestSource.ConnectionID())
|
|
}
|
|
|
|
case obj := <-requestSource.ReadChan():
|
|
if obj == nil {
|
|
if room.GetParticipantRequestSource(participant.Identity()) == requestSource {
|
|
participant.HandleSignalSourceClose()
|
|
}
|
|
return
|
|
}
|
|
|
|
if err := participant.HandleSignalMessage(obj); err != nil {
|
|
// more specific errors are already logged
|
|
// treat errors returned as fatal
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
type participantReq interface {
|
|
GetRoom() string
|
|
GetIdentity() string
|
|
}
|
|
|
|
func (r *RoomManager) roomAndParticipantForReq(ctx context.Context, req participantReq) (*rtc.Room, types.LocalParticipant, error) {
|
|
room := r.GetRoom(ctx, livekit.RoomName(req.GetRoom()))
|
|
if room == nil {
|
|
return nil, nil, ErrRoomNotFound
|
|
}
|
|
|
|
participant := room.GetParticipant(livekit.ParticipantIdentity(req.GetIdentity()))
|
|
if participant == nil {
|
|
return nil, nil, ErrParticipantNotFound
|
|
}
|
|
|
|
return room, participant, nil
|
|
}
|
|
|
|
func (r *RoomManager) ListParticipants(ctx context.Context, req *livekit.ListParticipantsRequest) (*livekit.ListParticipantsResponse, error) {
|
|
room := r.GetRoom(ctx, livekit.RoomName(req.Room))
|
|
if room == nil {
|
|
return nil, ErrRoomNotFound
|
|
}
|
|
|
|
participants := room.GetParticipants()
|
|
items := make([]*livekit.ParticipantInfo, 0, len(participants))
|
|
for _, p := range participants {
|
|
items = append(items, p.ToProto())
|
|
}
|
|
|
|
return &livekit.ListParticipantsResponse{
|
|
Participants: items,
|
|
}, nil
|
|
}
|
|
|
|
func (r *RoomManager) GetParticipant(ctx context.Context, req *livekit.RoomParticipantIdentity) (*livekit.ParticipantInfo, error) {
|
|
room := r.GetRoom(ctx, livekit.RoomName(req.Room))
|
|
if room == nil {
|
|
return nil, ErrRoomNotFound
|
|
}
|
|
|
|
participant := room.GetParticipant(livekit.ParticipantIdentity(req.Identity))
|
|
if participant == nil {
|
|
return nil, ErrParticipantNotFound
|
|
}
|
|
|
|
return participant.ToProto(), nil
|
|
}
|
|
|
|
func (r *RoomManager) RemoveParticipant(ctx context.Context, req *livekit.RoomParticipantIdentity) (*livekit.RemoveParticipantResponse, error) {
|
|
room, participant, err := r.roomAndParticipantForReq(ctx, req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
participant.GetLogger().Infow("removing participant")
|
|
room.RemoveParticipant(livekit.ParticipantIdentity(req.Identity), "", types.ParticipantCloseReasonServiceRequestRemoveParticipant)
|
|
return &livekit.RemoveParticipantResponse{}, nil
|
|
}
|
|
|
|
func (r *RoomManager) MutePublishedTrack(ctx context.Context, req *livekit.MuteRoomTrackRequest) (*livekit.MuteRoomTrackResponse, error) {
|
|
_, participant, err := r.roomAndParticipantForReq(ctx, req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
participant.GetLogger().Debugw("setting track muted",
|
|
"trackID", req.TrackSid, "muted", req.Muted)
|
|
if !req.Muted && !r.config.Room.EnableRemoteUnmute {
|
|
participant.GetLogger().Errorw("cannot unmute track, remote unmute is disabled", nil)
|
|
return nil, ErrRemoteUnmuteNoteEnabled
|
|
}
|
|
track := participant.SetTrackMuted(&livekit.MuteTrackRequest{
|
|
Sid: req.TrackSid,
|
|
Muted: req.Muted,
|
|
}, true)
|
|
return &livekit.MuteRoomTrackResponse{Track: track}, nil
|
|
}
|
|
|
|
func (r *RoomManager) UpdateParticipant(ctx context.Context, req *livekit.UpdateParticipantRequest) (*livekit.ParticipantInfo, error) {
|
|
_, participant, err := r.roomAndParticipantForReq(ctx, req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err = participant.UpdateMetadata(&livekit.UpdateParticipantMetadata{
|
|
Name: req.Name,
|
|
Metadata: req.Metadata,
|
|
Attributes: req.Attributes,
|
|
}, true); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if req.Permission != nil {
|
|
participant.GetLogger().Debugw(
|
|
"updating participant permission",
|
|
"permission", req.Permission,
|
|
)
|
|
|
|
participant.SetPermission(req.Permission)
|
|
}
|
|
|
|
return participant.ToProto(), nil
|
|
}
|
|
|
|
func (r *RoomManager) ForwardParticipant(ctx context.Context, req *livekit.ForwardParticipantRequest) (*livekit.ForwardParticipantResponse, error) {
|
|
return nil, errors.New("not implemented")
|
|
}
|
|
|
|
func (r *RoomManager) MoveParticipant(ctx context.Context, req *livekit.MoveParticipantRequest) (*livekit.MoveParticipantResponse, error) {
|
|
return nil, errors.New("not implemented")
|
|
}
|
|
|
|
func (r *RoomManager) PerformRpc(ctx context.Context, req *livekit.PerformRpcRequest) (*livekit.PerformRpcResponse, error) {
|
|
room := r.GetRoom(ctx, livekit.RoomName(req.GetRoom()))
|
|
if room == nil {
|
|
return nil, ErrRoomNotFound
|
|
}
|
|
|
|
participant := room.GetParticipant(livekit.ParticipantIdentity(req.GetDestinationIdentity()))
|
|
if participant == nil {
|
|
return nil, ErrParticipantNotFound
|
|
}
|
|
|
|
resultChan := make(chan string, 1)
|
|
errorChan := make(chan error, 1)
|
|
|
|
participant.PerformRpc(req, resultChan, errorChan)
|
|
|
|
select {
|
|
case result := <-resultChan:
|
|
return &livekit.PerformRpcResponse{Payload: result}, nil
|
|
case err := <-errorChan:
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
func (r *RoomManager) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomRequest) (*livekit.DeleteRoomResponse, error) {
|
|
room := r.GetRoom(ctx, livekit.RoomName(req.Room))
|
|
if room == nil {
|
|
// special case of a non-RTC room e.g. room created but no participants joined
|
|
logger.Debugw("Deleting non-rtc room, loading from roomstore")
|
|
err := r.roomStore.DeleteRoom(ctx, livekit.RoomName(req.Room))
|
|
if err != nil {
|
|
logger.Debugw("Error deleting non-rtc room", "err", err)
|
|
return nil, err
|
|
}
|
|
} else {
|
|
room.Logger().Infow("deleting room")
|
|
room.Close(types.ParticipantCloseReasonServiceRequestDeleteRoom)
|
|
}
|
|
return &livekit.DeleteRoomResponse{}, nil
|
|
}
|
|
|
|
func (r *RoomManager) UpdateSubscriptions(ctx context.Context, req *livekit.UpdateSubscriptionsRequest) (*livekit.UpdateSubscriptionsResponse, error) {
|
|
room, participant, err := r.roomAndParticipantForReq(ctx, req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
participant.GetLogger().Debugw("updating participant subscriptions")
|
|
room.UpdateSubscriptions(
|
|
participant,
|
|
livekit.StringsAsIDs[livekit.TrackID](req.TrackSids),
|
|
req.ParticipantTracks,
|
|
req.Subscribe,
|
|
)
|
|
return &livekit.UpdateSubscriptionsResponse{}, nil
|
|
}
|
|
|
|
func (r *RoomManager) SendData(ctx context.Context, req *livekit.SendDataRequest) (*livekit.SendDataResponse, error) {
|
|
room := r.GetRoom(ctx, livekit.RoomName(req.Room))
|
|
if room == nil {
|
|
return nil, ErrRoomNotFound
|
|
}
|
|
|
|
room.Logger().Debugw("api send data", "size", len(req.Data))
|
|
room.SendDataPacket(&livekit.DataPacket{
|
|
Kind: req.Kind,
|
|
DestinationIdentities: req.DestinationIdentities,
|
|
Value: &livekit.DataPacket_User{
|
|
User: &livekit.UserPacket{
|
|
Payload: req.Data,
|
|
DestinationSids: req.DestinationSids,
|
|
DestinationIdentities: req.DestinationIdentities,
|
|
Topic: req.Topic,
|
|
Nonce: req.Nonce,
|
|
},
|
|
},
|
|
}, req.Kind)
|
|
return &livekit.SendDataResponse{}, nil
|
|
}
|
|
|
|
func (r *RoomManager) UpdateRoomMetadata(ctx context.Context, req *livekit.UpdateRoomMetadataRequest) (*livekit.Room, error) {
|
|
room := r.GetRoom(ctx, livekit.RoomName(req.Room))
|
|
if room == nil {
|
|
return nil, ErrRoomNotFound
|
|
}
|
|
|
|
room.Logger().Debugw("updating room")
|
|
done := room.SetMetadata(req.Metadata)
|
|
// wait till the update is applied
|
|
<-done
|
|
return room.ToProto(), nil
|
|
}
|
|
|
|
func (r *RoomManager) ListDispatch(ctx context.Context, req *livekit.ListAgentDispatchRequest) (*livekit.ListAgentDispatchResponse, error) {
|
|
room := r.GetRoom(ctx, livekit.RoomName(req.Room))
|
|
if room == nil {
|
|
return nil, ErrRoomNotFound
|
|
}
|
|
|
|
disp, err := room.GetAgentDispatches(req.DispatchId)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ret := &livekit.ListAgentDispatchResponse{
|
|
AgentDispatches: disp,
|
|
}
|
|
|
|
return ret, nil
|
|
}
|
|
|
|
func (r *RoomManager) CreateDispatch(ctx context.Context, req *livekit.AgentDispatch) (*livekit.AgentDispatch, error) {
|
|
room := r.GetRoom(ctx, livekit.RoomName(req.Room))
|
|
if room == nil {
|
|
return nil, ErrRoomNotFound
|
|
}
|
|
|
|
disp, err := room.AddAgentDispatch(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return disp, nil
|
|
}
|
|
|
|
func (r *RoomManager) DeleteDispatch(ctx context.Context, req *livekit.DeleteAgentDispatchRequest) (*livekit.AgentDispatch, error) {
|
|
room := r.GetRoom(ctx, livekit.RoomName(req.Room))
|
|
if room == nil {
|
|
return nil, ErrRoomNotFound
|
|
}
|
|
|
|
disp, err := room.DeleteAgentDispatch(req.DispatchId)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return disp, nil
|
|
}
|
|
|
|
func (r *RoomManager) iceServersForParticipant(apiKey string, participant types.LocalParticipant, tlsOnly bool) []*livekit.ICEServer {
|
|
var iceServers []*livekit.ICEServer
|
|
rtcConf := r.config.RTC
|
|
|
|
if tlsOnly && r.config.TURN.TLSPort == 0 {
|
|
logger.Warnw("tls only enabled but no turn tls config", nil)
|
|
tlsOnly = false
|
|
}
|
|
|
|
hasSTUN := false
|
|
if r.config.TURN.Enabled {
|
|
var urls []string
|
|
if r.config.TURN.UDPPort > 0 && !tlsOnly {
|
|
// UDP TURN is used as STUN
|
|
hasSTUN = true
|
|
for _, ip := range r.config.RTC.NodeIP.ToStringSlice() {
|
|
urls = append(urls, fmt.Sprintf("turn:%s:%d?transport=udp", ip, r.config.TURN.UDPPort))
|
|
}
|
|
}
|
|
if r.config.TURN.TLSPort > 0 {
|
|
urls = append(urls, fmt.Sprintf("turns:%s:443?transport=tcp", r.config.TURN.Domain))
|
|
}
|
|
if len(urls) > 0 {
|
|
username := r.turnAuthHandler.CreateUsername(apiKey, participant.ID())
|
|
password, err := r.turnAuthHandler.CreatePassword(apiKey, participant.ID())
|
|
if err != nil {
|
|
participant.GetLogger().Warnw("could not create turn password", err)
|
|
hasSTUN = false
|
|
} else {
|
|
iceServers = append(iceServers, &livekit.ICEServer{
|
|
Urls: urls,
|
|
Username: username,
|
|
Credential: password,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(rtcConf.TURNServers) > 0 {
|
|
hasSTUN = true
|
|
for _, s := range r.config.RTC.TURNServers {
|
|
scheme := "turn"
|
|
transport := "tcp"
|
|
switch s.Protocol {
|
|
case "tls":
|
|
scheme = "turns"
|
|
case "udp":
|
|
transport = "udp"
|
|
}
|
|
|
|
var username, credential string
|
|
if s.Secret != "" {
|
|
// Generate dynamic credentials using TURN static auth secrets
|
|
ttl := s.TTL
|
|
if ttl == 0 {
|
|
ttl = 14400 // Default 4 hours
|
|
}
|
|
|
|
expiry := time.Now().Add(time.Duration(ttl) * time.Second).Unix()
|
|
participantID := string(participant.ID())
|
|
username = fmt.Sprintf("%d:%s", expiry, participantID)
|
|
|
|
// HMAC-SHA1 signature
|
|
h := hmac.New(sha1.New, []byte(s.Secret))
|
|
h.Write([]byte(username))
|
|
credential = base64.StdEncoding.EncodeToString(h.Sum(nil))
|
|
} else {
|
|
// Use static credentials
|
|
username = s.Username
|
|
credential = s.Credential
|
|
}
|
|
|
|
is := &livekit.ICEServer{
|
|
Urls: []string{
|
|
fmt.Sprintf("%s:%s:%d?transport=%s", scheme, s.Host, s.Port, transport),
|
|
},
|
|
Username: username,
|
|
Credential: credential,
|
|
}
|
|
iceServers = append(iceServers, is)
|
|
}
|
|
}
|
|
|
|
if len(rtcConf.STUNServers) > 0 {
|
|
hasSTUN = true
|
|
iceServers = append(iceServers, iceServerForStunServers(r.config.RTC.STUNServers))
|
|
}
|
|
|
|
if !hasSTUN {
|
|
iceServers = append(iceServers, iceServerForStunServers(rtcconfig.DefaultStunServers))
|
|
}
|
|
return iceServers
|
|
}
|
|
|
|
func (r *RoomManager) refreshToken(participant types.LocalParticipant) error {
|
|
key, secret, err := r.getFirstKeyPair()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
grants := participant.ClaimGrants()
|
|
token := auth.NewAccessToken(key, secret)
|
|
token.SetName(grants.Name).
|
|
SetIdentity(string(participant.Identity())).
|
|
SetKind(grants.GetParticipantKind()).
|
|
SetValidFor(tokenDefaultTTL).
|
|
SetMetadata(grants.Metadata).
|
|
SetAttributes(grants.Attributes).
|
|
SetVideoGrant(grants.Video).
|
|
SetRoomConfig(grants.GetRoomConfiguration()).
|
|
SetRoomPreset(grants.RoomPreset)
|
|
jwt, err := token.ToJWT()
|
|
if err == nil {
|
|
err = participant.SendRefreshToken(jwt)
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *RoomManager) setIceConfig(roomName livekit.RoomName, participant types.LocalParticipant) *livekit.ICEConfig {
|
|
iceConfig := r.getIceConfig(roomName, participant)
|
|
participant.SetICEConfig(iceConfig)
|
|
return iceConfig
|
|
}
|
|
|
|
func (r *RoomManager) getIceConfig(roomName livekit.RoomName, participant types.LocalParticipant) *livekit.ICEConfig {
|
|
return r.iceConfigCache.Get(iceConfigCacheKey{roomName, participant.Identity()})
|
|
}
|
|
|
|
func (r *RoomManager) getFirstKeyPair() (string, string, error) {
|
|
for key, secret := range r.config.Keys {
|
|
return key, secret, nil
|
|
}
|
|
return "", "", errors.New("no API keys configured")
|
|
}
|
|
|
|
// ------------------------------------
|
|
|
|
func iceServerForStunServers(servers []string) *livekit.ICEServer {
|
|
iceServer := &livekit.ICEServer{}
|
|
for _, stunServer := range servers {
|
|
iceServer.Urls = append(iceServer.Urls, fmt.Sprintf("stun:%s", stunServer))
|
|
}
|
|
return iceServer
|
|
}
|
|
|
|
// ------------------------------------
|
|
|
|
type roomManagerParticipantHelper struct {
|
|
room *rtc.Room
|
|
codecRegressionThreshold int
|
|
}
|
|
|
|
func (h *roomManagerParticipantHelper) GetParticipantInfo(pID livekit.ParticipantID) *livekit.ParticipantInfo {
|
|
if p := h.room.GetParticipantByID(pID); p != nil {
|
|
return p.ToProto()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (h *roomManagerParticipantHelper) GetRegionSettings(ip string) *livekit.RegionSettings {
|
|
return nil
|
|
}
|
|
|
|
func (h *roomManagerParticipantHelper) GetSubscriberForwarderState(lp types.LocalParticipant) (map[livekit.TrackID]*livekit.RTPForwarderState, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
func (h *roomManagerParticipantHelper) ResolveMediaTrack(lp types.LocalParticipant, trackID livekit.TrackID) types.MediaResolverResult {
|
|
return h.room.ResolveMediaTrackForSubscriber(lp, trackID)
|
|
}
|
|
|
|
func (h *roomManagerParticipantHelper) ResolveDataTrack(lp types.LocalParticipant, trackID livekit.TrackID) types.DataResolverResult {
|
|
return h.room.ResolveDataTrackForSubscriber(lp, trackID)
|
|
}
|
|
|
|
func (h *roomManagerParticipantHelper) ShouldRegressCodec() bool {
|
|
return h.codecRegressionThreshold == 0 || h.room.GetParticipantCount() < h.codecRegressionThreshold
|
|
}
|
|
|
|
func (h *roomManagerParticipantHelper) GetCachedReliableDataMessage(seqs map[livekit.ParticipantID]uint32) []*types.DataMessageCache {
|
|
return h.room.GetCachedReliableDataMessage(seqs)
|
|
}
|