mirror of
https://github.com/livekit/livekit.git
synced 2026-04-26 21:45:24 +00:00
clean up legacy rpc (#2384)
* clean up legacy rpc * cleanup * cleanup * cleanup * tidy * cleanup * cleanup
This commit is contained in:
@@ -183,8 +183,6 @@ keys:
|
||||
# since v1.4.0, a more reliable, psrpc based signal relay is available
|
||||
# this gives us the ability to reliably proxy messages between a signal server and RTC node
|
||||
# signal_relay:
|
||||
# # enabled by default as of v1.5.0, legacy signal proxy will be removed in v1.6
|
||||
# enabled: true
|
||||
# # amount of time a message delivery is tried before giving up
|
||||
# retry_timeout: 30s
|
||||
# # minimum amount of time to wait for RTC node to ack,
|
||||
@@ -199,8 +197,6 @@ keys:
|
||||
# PSRPC
|
||||
# since v1.5.1, a more reliable, psrpc based internal rpc
|
||||
# psrpc:
|
||||
# # enable the psrpc internal api client for roomservice calls
|
||||
# enabled: true
|
||||
# # maximum number of rpc attempts
|
||||
# max_attempts: 3
|
||||
# # initial time to wait for calls to complete
|
||||
|
||||
@@ -268,7 +268,6 @@ type NodeSelectorConfig struct {
|
||||
}
|
||||
|
||||
type SignalRelayConfig struct {
|
||||
Enabled bool `yaml:"enabled,omitempty"`
|
||||
RetryTimeout time.Duration `yaml:"retry_timeout,omitempty"`
|
||||
MinRetryInterval time.Duration `yaml:"min_retry_interval,omitempty"`
|
||||
MaxRetryInterval time.Duration `yaml:"max_retry_interval,omitempty"`
|
||||
@@ -487,7 +486,6 @@ var DefaultConfig = Config{
|
||||
CPULoadLimit: 0.9,
|
||||
},
|
||||
SignalRelay: SignalRelayConfig{
|
||||
Enabled: true,
|
||||
RetryTimeout: 7500 * time.Millisecond,
|
||||
MinRetryInterval: 500 * time.Millisecond,
|
||||
MaxRetryInterval: 4 * time.Second,
|
||||
|
||||
@@ -21,7 +21,6 @@ import (
|
||||
"github.com/redis/go-redis/v9"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/protocol/auth"
|
||||
"github.com/livekit/protocol/livekit"
|
||||
"github.com/livekit/protocol/logger"
|
||||
@@ -99,12 +98,6 @@ type Router interface {
|
||||
Start() error
|
||||
Drain()
|
||||
Stop()
|
||||
|
||||
// OnNewParticipantRTC is called to start a new participant's RTC connection
|
||||
OnNewParticipantRTC(callback NewParticipantCallback)
|
||||
|
||||
// OnRTCMessage is called to execute actions on the RTC node
|
||||
OnRTCMessage(callback RTCMessageCallback)
|
||||
}
|
||||
|
||||
type StartParticipantSignalResults struct {
|
||||
@@ -118,17 +111,13 @@ type StartParticipantSignalResults struct {
|
||||
type MessageRouter interface {
|
||||
// StartParticipantSignal participant signal connection is ready to start
|
||||
StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error)
|
||||
|
||||
// Write a message to a participant or room
|
||||
WriteParticipantRTC(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error
|
||||
WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, msg *livekit.RTCNodeMessage) error
|
||||
}
|
||||
|
||||
func CreateRouter(config *config.Config, rc redis.UniversalClient, node LocalNode, signalClient SignalClient) Router {
|
||||
func CreateRouter(rc redis.UniversalClient, node LocalNode, signalClient SignalClient) Router {
|
||||
lr := NewLocalRouter(node, signalClient)
|
||||
|
||||
if rc != nil {
|
||||
return NewRedisRouter(config, lr, rc)
|
||||
return NewRedisRouter(lr, rc)
|
||||
}
|
||||
|
||||
// local routing and store
|
||||
|
||||
+4
-125
@@ -26,8 +26,7 @@ import (
|
||||
"github.com/livekit/protocol/logger"
|
||||
)
|
||||
|
||||
// aggregated channel for all participants
|
||||
const localRTCChannelSize = 10000
|
||||
var _ Router = (*LocalRouter)(nil)
|
||||
|
||||
// a router of messages on the same node, basic implementation for local testing
|
||||
type LocalRouter struct {
|
||||
@@ -39,11 +38,6 @@ type LocalRouter struct {
|
||||
requestChannels map[string]*MessageChannel
|
||||
responseChannels map[string]*MessageChannel
|
||||
isStarted atomic.Bool
|
||||
|
||||
rtcMessageChan *MessageChannel
|
||||
|
||||
onNewParticipant NewParticipantCallback
|
||||
onRTCMessage RTCMessageCallback
|
||||
}
|
||||
|
||||
func NewLocalRouter(currentNode LocalNode, signalClient SignalClient) *LocalRouter {
|
||||
@@ -52,7 +46,6 @@ func NewLocalRouter(currentNode LocalNode, signalClient SignalClient) *LocalRout
|
||||
signalClient: signalClient,
|
||||
requestChannels: make(map[string]*MessageChannel),
|
||||
responseChannels: make(map[string]*MessageChannel),
|
||||
rtcMessageChan: NewMessageChannel(livekit.ConnectionID("local"), localRTCChannelSize),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,7 +61,6 @@ func (r *LocalRouter) SetNodeForRoom(_ context.Context, _ livekit.RoomName, _ li
|
||||
}
|
||||
|
||||
func (r *LocalRouter) ClearRoomState(_ context.Context, _ livekit.RoomName) error {
|
||||
// do nothing
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -120,65 +112,22 @@ func (r *LocalRouter) StartParticipantSignalWithNodeID(ctx context.Context, room
|
||||
return
|
||||
}
|
||||
|
||||
func (r *LocalRouter) WriteParticipantRTC(_ context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error {
|
||||
r.lock.Lock()
|
||||
if r.rtcMessageChan.IsClosed() {
|
||||
// create a new one
|
||||
r.rtcMessageChan = NewMessageChannel(livekit.ConnectionID("local"), localRTCChannelSize)
|
||||
}
|
||||
r.lock.Unlock()
|
||||
msg.ParticipantKey = string(ParticipantKeyLegacy(roomName, identity))
|
||||
msg.ParticipantKeyB62 = string(ParticipantKey(roomName, identity))
|
||||
return r.writeRTCMessage(r.rtcMessageChan, msg)
|
||||
}
|
||||
|
||||
func (r *LocalRouter) WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, msg *livekit.RTCNodeMessage) error {
|
||||
msg.ParticipantKey = string(ParticipantKeyLegacy(roomName, ""))
|
||||
msg.ParticipantKeyB62 = string(ParticipantKey(roomName, ""))
|
||||
return r.WriteNodeRTC(ctx, r.currentNode.Id, msg)
|
||||
}
|
||||
|
||||
func (r *LocalRouter) WriteNodeRTC(_ context.Context, _ string, msg *livekit.RTCNodeMessage) error {
|
||||
r.lock.Lock()
|
||||
if r.rtcMessageChan.IsClosed() {
|
||||
// create a new one
|
||||
r.rtcMessageChan = NewMessageChannel(livekit.ConnectionID("local"), localRTCChannelSize)
|
||||
}
|
||||
r.lock.Unlock()
|
||||
return r.writeRTCMessage(r.rtcMessageChan, msg)
|
||||
}
|
||||
|
||||
func (r *LocalRouter) writeRTCMessage(sink MessageSink, msg *livekit.RTCNodeMessage) error {
|
||||
msg.SenderTime = time.Now().Unix()
|
||||
return sink.WriteMessage(msg)
|
||||
}
|
||||
|
||||
func (r *LocalRouter) OnNewParticipantRTC(callback NewParticipantCallback) {
|
||||
r.onNewParticipant = callback
|
||||
}
|
||||
|
||||
func (r *LocalRouter) OnRTCMessage(callback RTCMessageCallback) {
|
||||
r.onRTCMessage = callback
|
||||
}
|
||||
|
||||
func (r *LocalRouter) Start() error {
|
||||
if r.isStarted.Swap(true) {
|
||||
return nil
|
||||
}
|
||||
go r.statsWorker()
|
||||
// go r.memStatsWorker()
|
||||
// on local routers, Start doesn't do anything, websocket connections initiate the connections
|
||||
go r.rtcMessageWorker()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *LocalRouter) Drain() {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
r.currentNode.State = livekit.NodeState_SHUTTING_DOWN
|
||||
}
|
||||
|
||||
func (r *LocalRouter) Stop() {
|
||||
r.rtcMessageChan.Close()
|
||||
}
|
||||
func (r *LocalRouter) Stop() {}
|
||||
|
||||
func (r *LocalRouter) GetRegion() string {
|
||||
return r.currentNode.Region
|
||||
@@ -214,73 +163,3 @@ func (r *LocalRouter) statsWorker() {
|
||||
}
|
||||
}
|
||||
*/
|
||||
func (r *LocalRouter) rtcMessageWorker() {
|
||||
// is a new channel available? if so swap to that one
|
||||
if !r.isStarted.Load() {
|
||||
return
|
||||
}
|
||||
|
||||
// start a new worker after this finished
|
||||
defer func() {
|
||||
go r.rtcMessageWorker()
|
||||
}()
|
||||
|
||||
r.lock.RLock()
|
||||
isClosed := r.rtcMessageChan.IsClosed()
|
||||
r.lock.RUnlock()
|
||||
if isClosed {
|
||||
// sleep and retry
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
r.lock.RLock()
|
||||
msgChan := r.rtcMessageChan.ReadChan()
|
||||
r.lock.RUnlock()
|
||||
// consume messages from
|
||||
for msg := range msgChan {
|
||||
if rtcMsg, ok := msg.(*livekit.RTCNodeMessage); ok {
|
||||
var room livekit.RoomName
|
||||
var identity livekit.ParticipantIdentity
|
||||
var err error
|
||||
if rtcMsg.ParticipantKeyB62 != "" {
|
||||
room, identity, err = parseParticipantKey(livekit.ParticipantKey(rtcMsg.ParticipantKeyB62))
|
||||
}
|
||||
if err != nil {
|
||||
room, identity, err = parseParticipantKeyLegacy(livekit.ParticipantKey(rtcMsg.ParticipantKey))
|
||||
}
|
||||
if err != nil {
|
||||
logger.Errorw("could not process RTC message", err)
|
||||
continue
|
||||
}
|
||||
if r.onRTCMessage != nil {
|
||||
r.onRTCMessage(context.Background(), room, identity, rtcMsg)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *LocalRouter) getMessageChannel(target map[string]*MessageChannel, key string) *MessageChannel {
|
||||
r.lock.RLock()
|
||||
defer r.lock.RUnlock()
|
||||
return target[key]
|
||||
}
|
||||
|
||||
func (r *LocalRouter) getOrCreateMessageChannel(target map[string]*MessageChannel, key string) *MessageChannel {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
mc := target[key]
|
||||
|
||||
if mc != nil {
|
||||
return mc
|
||||
}
|
||||
|
||||
mc = NewMessageChannel(livekit.ConnectionID(key), DefaultMessageChannelSize)
|
||||
mc.OnClose(func() {
|
||||
r.lock.Lock()
|
||||
delete(target, key)
|
||||
r.lock.Unlock()
|
||||
})
|
||||
target[key] = mc
|
||||
|
||||
return mc
|
||||
}
|
||||
|
||||
+16
-294
@@ -28,9 +28,7 @@ import (
|
||||
|
||||
"github.com/livekit/protocol/livekit"
|
||||
"github.com/livekit/protocol/logger"
|
||||
"github.com/livekit/protocol/utils"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/livekit-server/pkg/routing/selector"
|
||||
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
|
||||
)
|
||||
@@ -42,17 +40,18 @@ const (
|
||||
statsMaxDelaySeconds = 30
|
||||
)
|
||||
|
||||
var _ Router = (*RedisRouter)(nil)
|
||||
|
||||
// RedisRouter uses Redis pub/sub to route signaling messages across different nodes
|
||||
// It relies on the RTC node to be the primary driver of the participant connection.
|
||||
// Because
|
||||
type RedisRouter struct {
|
||||
*LocalRouter
|
||||
|
||||
rc redis.UniversalClient
|
||||
usePSRPCSignal bool
|
||||
ctx context.Context
|
||||
isStarted atomic.Bool
|
||||
nodeMu sync.RWMutex
|
||||
rc redis.UniversalClient
|
||||
ctx context.Context
|
||||
isStarted atomic.Bool
|
||||
nodeMu sync.RWMutex
|
||||
// previous stats for computing averages
|
||||
prevStats *livekit.NodeStats
|
||||
|
||||
@@ -60,11 +59,10 @@ type RedisRouter struct {
|
||||
cancel func()
|
||||
}
|
||||
|
||||
func NewRedisRouter(config *config.Config, lr *LocalRouter, rc redis.UniversalClient) *RedisRouter {
|
||||
func NewRedisRouter(lr *LocalRouter, rc redis.UniversalClient) *RedisRouter {
|
||||
rr := &RedisRouter{
|
||||
LocalRouter: lr,
|
||||
rc: rc,
|
||||
usePSRPCSignal: config.SignalRelay.Enabled,
|
||||
LocalRouter: lr,
|
||||
rc: rc,
|
||||
}
|
||||
rr.ctx, rr.cancel = context.WithCancel(context.Background())
|
||||
return rr
|
||||
@@ -163,72 +161,7 @@ func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName livek
|
||||
return
|
||||
}
|
||||
|
||||
if r.usePSRPCSignal {
|
||||
res, err = r.StartParticipantSignalWithNodeID(ctx, roomName, pi, livekit.NodeID(rtcNode.Id))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// map signal & rtc nodes
|
||||
err = r.setParticipantSignalNode(res.ConnectionID, r.currentNode.Id)
|
||||
return
|
||||
}
|
||||
|
||||
res.ConnectionID = livekit.ConnectionID(utils.NewGuid("CO_"))
|
||||
pKey := ParticipantKeyLegacy(roomName, pi.Identity)
|
||||
pKeyB62 := ParticipantKey(roomName, pi.Identity)
|
||||
|
||||
// map signal & rtc nodes
|
||||
if err = r.setParticipantSignalNode(res.ConnectionID, r.currentNode.Id); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// index by connectionID, since there may be multiple connections for the participant
|
||||
// set up response channel before sending StartSession and be ready to receive responses.
|
||||
resChan := r.getOrCreateMessageChannel(r.responseChannels, string(res.ConnectionID))
|
||||
|
||||
sink := NewRTCNodeSink(r.rc, livekit.NodeID(rtcNode.Id), res.ConnectionID, pKey, pKeyB62)
|
||||
|
||||
// serialize claims
|
||||
ss, err := pi.ToStartSession(roomName, res.ConnectionID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// sends a message to start session
|
||||
err = sink.WriteMessage(ss)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
res.RequestSink = sink
|
||||
res.ResponseSource = resChan
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (r *RedisRouter) WriteParticipantRTC(_ context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error {
|
||||
pkey := ParticipantKeyLegacy(roomName, identity)
|
||||
pkeyB62 := ParticipantKey(roomName, identity)
|
||||
rtcNode, err := r.getParticipantRTCNode(pkey, pkeyB62)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rtcSink := NewRTCNodeSink(r.rc, livekit.NodeID(rtcNode), "ephemeral", pkey, pkeyB62)
|
||||
msg.ParticipantKey = string(ParticipantKeyLegacy(roomName, identity))
|
||||
msg.ParticipantKeyB62 = string(ParticipantKey(roomName, identity))
|
||||
defer rtcSink.Close()
|
||||
return r.writeRTCMessage(rtcSink, msg)
|
||||
}
|
||||
|
||||
func (r *RedisRouter) WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, msg *livekit.RTCNodeMessage) error {
|
||||
node, err := r.GetNodeForRoom(ctx, roomName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
msg.ParticipantKey = string(ParticipantKeyLegacy(roomName, ""))
|
||||
msg.ParticipantKeyB62 = string(ParticipantKey(roomName, ""))
|
||||
return r.WriteNodeRTC(ctx, node.Id, msg)
|
||||
return r.StartParticipantSignalWithNodeID(ctx, roomName, pi, livekit.NodeID(rtcNode.Id))
|
||||
}
|
||||
|
||||
func (r *RedisRouter) WriteNodeRTC(_ context.Context, rtcNodeID string, msg *livekit.RTCNodeMessage) error {
|
||||
@@ -237,82 +170,9 @@ func (r *RedisRouter) WriteNodeRTC(_ context.Context, rtcNodeID string, msg *liv
|
||||
return r.writeRTCMessage(rtcSink, msg)
|
||||
}
|
||||
|
||||
func (r *RedisRouter) startParticipantRTC(ss *livekit.StartSession, participantKey livekit.ParticipantKey, participantKeyB62 livekit.ParticipantKey) error {
|
||||
prometheus.IncrementParticipantRtcInit(1)
|
||||
// find the node where the room is hosted at
|
||||
rtcNode, err := r.GetNodeForRoom(r.ctx, livekit.RoomName(ss.RoomName))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if rtcNode.Id != r.currentNode.Id {
|
||||
err = ErrIncorrectRTCNode
|
||||
logger.Errorw("called participant on incorrect node", err,
|
||||
"rtcNode", rtcNode,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
if err := r.SetParticipantRTCNode(participantKey, participantKeyB62, rtcNode.Id); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// find signal node to send responses back
|
||||
signalNode, err := r.getParticipantSignalNode(livekit.ConnectionID(ss.ConnectionId))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// treat it as a new participant connecting
|
||||
if r.onNewParticipant == nil {
|
||||
return ErrHandlerNotDefined
|
||||
}
|
||||
|
||||
// we do not want to re-use the same response sink
|
||||
// the previous rtc worker thread is still consuming off of it.
|
||||
// we'll want to sever the connection and switch to the new one
|
||||
r.lock.RLock()
|
||||
var requestChan *MessageChannel
|
||||
var ok bool
|
||||
var pkey livekit.ParticipantKey
|
||||
if participantKeyB62 != "" {
|
||||
requestChan, ok = r.requestChannels[string(participantKeyB62)]
|
||||
pkey = participantKeyB62
|
||||
} else {
|
||||
requestChan, ok = r.requestChannels[string(participantKey)]
|
||||
pkey = participantKey
|
||||
}
|
||||
r.lock.RUnlock()
|
||||
if ok {
|
||||
requestChan.Close()
|
||||
}
|
||||
|
||||
pi, err := ParticipantInitFromStartSession(ss, r.currentNode.Region)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
reqChan := r.getOrCreateMessageChannel(r.requestChannels, string(pkey))
|
||||
resSink := NewSignalNodeSink(r.rc, livekit.NodeID(signalNode), livekit.ConnectionID(ss.ConnectionId))
|
||||
go func() {
|
||||
err := r.onNewParticipant(
|
||||
r.ctx,
|
||||
livekit.RoomName(ss.RoomName),
|
||||
*pi,
|
||||
reqChan,
|
||||
resSink,
|
||||
)
|
||||
if err != nil {
|
||||
logger.Errorw("could not handle new participant", err,
|
||||
"room", ss.RoomName,
|
||||
"participant", ss.Identity,
|
||||
)
|
||||
// cleanup request channels
|
||||
reqChan.Close()
|
||||
resSink.Close()
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
func (r *LocalRouter) writeRTCMessage(sink MessageSink, msg *livekit.RTCNodeMessage) error {
|
||||
msg.SenderTime = time.Now().Unix()
|
||||
return sink.WriteMessage(msg)
|
||||
}
|
||||
|
||||
func (r *RedisRouter) Start() error {
|
||||
@@ -352,58 +212,6 @@ func (r *RedisRouter) Stop() {
|
||||
r.cancel()
|
||||
}
|
||||
|
||||
func (r *RedisRouter) SetParticipantRTCNode(participantKey livekit.ParticipantKey, participantKeyB62 livekit.ParticipantKey, nodeID string) error {
|
||||
var err error
|
||||
if participantKey != "" {
|
||||
err1 := r.rc.Set(r.ctx, participantRTCKey(participantKey), nodeID, participantMappingTTL).Err()
|
||||
if err1 != nil {
|
||||
err = errors.Wrap(err, "could not set rtc node")
|
||||
}
|
||||
}
|
||||
if participantKeyB62 != "" {
|
||||
err2 := r.rc.Set(r.ctx, participantRTCKey(participantKeyB62), nodeID, participantMappingTTL).Err()
|
||||
if err2 != nil {
|
||||
err = errors.Wrap(err, "could not set rtc node")
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *RedisRouter) setParticipantSignalNode(connectionID livekit.ConnectionID, nodeID string) error {
|
||||
if err := r.rc.Set(r.ctx, participantSignalKey(connectionID), nodeID, participantMappingTTL).Err(); err != nil {
|
||||
return errors.Wrap(err, "could not set signal node")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *RedisRouter) getParticipantRTCNode(participantKey livekit.ParticipantKey, participantKeyB62 livekit.ParticipantKey) (string, error) {
|
||||
var val string
|
||||
var err error
|
||||
if participantKeyB62 != "" {
|
||||
val, err = r.rc.Get(r.ctx, participantRTCKey(participantKeyB62)).Result()
|
||||
if err == redis.Nil {
|
||||
val, err = r.rc.Get(r.ctx, participantRTCKey(participantKey)).Result()
|
||||
if err == redis.Nil {
|
||||
err = ErrNodeNotFound
|
||||
}
|
||||
}
|
||||
} else {
|
||||
val, err = r.rc.Get(r.ctx, participantRTCKey(participantKey)).Result()
|
||||
if err == redis.Nil {
|
||||
err = ErrNodeNotFound
|
||||
}
|
||||
}
|
||||
return val, err
|
||||
}
|
||||
|
||||
func (r *RedisRouter) getParticipantSignalNode(connectionID livekit.ConnectionID) (nodeID string, err error) {
|
||||
val, err := r.rc.Get(r.ctx, participantSignalKey(connectionID)).Result()
|
||||
if err == redis.Nil {
|
||||
err = ErrNodeNotFound
|
||||
}
|
||||
return val, err
|
||||
}
|
||||
|
||||
// update node stats and cleanup
|
||||
func (r *RedisRouter) statsWorker() {
|
||||
goroutineDumped := false
|
||||
@@ -444,9 +252,8 @@ func (r *RedisRouter) redisWorker(startedChan chan struct{}) {
|
||||
}()
|
||||
logger.Debugw("starting redisWorker", "nodeID", r.currentNode.Id)
|
||||
|
||||
sigChannel := signalNodeChannel(livekit.NodeID(r.currentNode.Id))
|
||||
rtcChannel := rtcNodeChannel(livekit.NodeID(r.currentNode.Id))
|
||||
r.pubsub = r.rc.Subscribe(r.ctx, sigChannel, rtcChannel)
|
||||
r.pubsub = r.rc.Subscribe(r.ctx, rtcChannel)
|
||||
|
||||
close(startedChan)
|
||||
for msg := range r.pubsub.Channel() {
|
||||
@@ -454,20 +261,7 @@ func (r *RedisRouter) redisWorker(startedChan chan struct{}) {
|
||||
return
|
||||
}
|
||||
|
||||
if msg.Channel == sigChannel {
|
||||
sm := livekit.SignalNodeMessage{}
|
||||
if err := proto.Unmarshal([]byte(msg.Payload), &sm); err != nil {
|
||||
logger.Errorw("could not unmarshal signal message on sigchan", err)
|
||||
prometheus.MessageCounter.WithLabelValues("signal", "failure").Add(1)
|
||||
continue
|
||||
}
|
||||
if err := r.handleSignalMessage(&sm); err != nil {
|
||||
logger.Errorw("error processing signal message", err)
|
||||
prometheus.MessageCounter.WithLabelValues("signal", "failure").Add(1)
|
||||
continue
|
||||
}
|
||||
prometheus.MessageCounter.WithLabelValues("signal", "success").Add(1)
|
||||
} else if msg.Channel == rtcChannel {
|
||||
if msg.Channel == rtcChannel {
|
||||
rm := livekit.RTCNodeMessage{}
|
||||
if err := proto.Unmarshal([]byte(msg.Payload), &rm); err != nil {
|
||||
logger.Errorw("could not unmarshal RTC message on rtcchan", err)
|
||||
@@ -484,62 +278,8 @@ func (r *RedisRouter) redisWorker(startedChan chan struct{}) {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RedisRouter) handleSignalMessage(sm *livekit.SignalNodeMessage) error {
|
||||
connectionID := sm.ConnectionId
|
||||
|
||||
r.lock.RLock()
|
||||
resSink := r.responseChannels[connectionID]
|
||||
r.lock.RUnlock()
|
||||
|
||||
// if a client closed the channel, then sent more messages after that,
|
||||
if resSink == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
switch rmb := sm.Message.(type) {
|
||||
case *livekit.SignalNodeMessage_Response:
|
||||
// logger.Debugw("forwarding signal message",
|
||||
// "connID", connectionID,
|
||||
// "type", fmt.Sprintf("%T", rmb.Response.Message))
|
||||
if err := resSink.WriteMessage(rmb.Response); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
case *livekit.SignalNodeMessage_EndSession:
|
||||
// logger.Debugw("received EndSession, closing signal connection",
|
||||
// "connID", connectionID)
|
||||
resSink.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *RedisRouter) handleRTCMessage(rm *livekit.RTCNodeMessage) error {
|
||||
pKey := livekit.ParticipantKey(rm.ParticipantKey)
|
||||
pKeyB62 := livekit.ParticipantKey(rm.ParticipantKeyB62)
|
||||
|
||||
switch rmb := rm.Message.(type) {
|
||||
case *livekit.RTCNodeMessage_StartSession:
|
||||
// RTC session should start on this node
|
||||
if err := r.startParticipantRTC(rmb.StartSession, pKey, pKeyB62); err != nil {
|
||||
return errors.Wrap(err, "could not start participant")
|
||||
}
|
||||
|
||||
case *livekit.RTCNodeMessage_Request:
|
||||
r.lock.RLock()
|
||||
var requestChan *MessageChannel
|
||||
if pKeyB62 != "" {
|
||||
requestChan = r.requestChannels[string(pKeyB62)]
|
||||
} else {
|
||||
requestChan = r.requestChannels[string(pKey)]
|
||||
}
|
||||
r.lock.RUnlock()
|
||||
if requestChan == nil {
|
||||
return ErrChannelClosed
|
||||
}
|
||||
if err := requestChan.WriteMessage(rmb.Request); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
switch rm.Message.(type) {
|
||||
case *livekit.RTCNodeMessage_KeepAlive:
|
||||
if time.Since(time.Unix(rm.SenderTime, 0)) > statsUpdateInterval {
|
||||
logger.Infow("keep alive too old, skipping", "senderTime", rm.SenderTime)
|
||||
@@ -566,24 +306,6 @@ func (r *RedisRouter) handleRTCMessage(rm *livekit.RTCNodeMessage) error {
|
||||
if err := r.RegisterNode(); err != nil {
|
||||
logger.Errorw("could not update node", err)
|
||||
}
|
||||
|
||||
default:
|
||||
// route it to handler
|
||||
if r.onRTCMessage != nil {
|
||||
var roomName livekit.RoomName
|
||||
var identity livekit.ParticipantIdentity
|
||||
var err error
|
||||
if pKeyB62 != "" {
|
||||
roomName, identity, err = parseParticipantKey(pKeyB62)
|
||||
}
|
||||
if err != nil || pKeyB62 == "" {
|
||||
roomName, identity, err = parseParticipantKeyLegacy(pKey)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.onRTCMessage(r.ctx, roomName, identity, rm)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -62,16 +62,6 @@ type FakeRouter struct {
|
||||
result1 []*livekit.Node
|
||||
result2 error
|
||||
}
|
||||
OnNewParticipantRTCStub func(routing.NewParticipantCallback)
|
||||
onNewParticipantRTCMutex sync.RWMutex
|
||||
onNewParticipantRTCArgsForCall []struct {
|
||||
arg1 routing.NewParticipantCallback
|
||||
}
|
||||
OnRTCMessageStub func(routing.RTCMessageCallback)
|
||||
onRTCMessageMutex sync.RWMutex
|
||||
onRTCMessageArgsForCall []struct {
|
||||
arg1 routing.RTCMessageCallback
|
||||
}
|
||||
RegisterNodeStub func() error
|
||||
registerNodeMutex sync.RWMutex
|
||||
registerNodeArgsForCall []struct {
|
||||
@@ -144,33 +134,6 @@ type FakeRouter struct {
|
||||
unregisterNodeReturnsOnCall map[int]struct {
|
||||
result1 error
|
||||
}
|
||||
WriteParticipantRTCStub func(context.Context, livekit.RoomName, livekit.ParticipantIdentity, *livekit.RTCNodeMessage) error
|
||||
writeParticipantRTCMutex sync.RWMutex
|
||||
writeParticipantRTCArgsForCall []struct {
|
||||
arg1 context.Context
|
||||
arg2 livekit.RoomName
|
||||
arg3 livekit.ParticipantIdentity
|
||||
arg4 *livekit.RTCNodeMessage
|
||||
}
|
||||
writeParticipantRTCReturns struct {
|
||||
result1 error
|
||||
}
|
||||
writeParticipantRTCReturnsOnCall map[int]struct {
|
||||
result1 error
|
||||
}
|
||||
WriteRoomRTCStub func(context.Context, livekit.RoomName, *livekit.RTCNodeMessage) error
|
||||
writeRoomRTCMutex sync.RWMutex
|
||||
writeRoomRTCArgsForCall []struct {
|
||||
arg1 context.Context
|
||||
arg2 livekit.RoomName
|
||||
arg3 *livekit.RTCNodeMessage
|
||||
}
|
||||
writeRoomRTCReturns struct {
|
||||
result1 error
|
||||
}
|
||||
writeRoomRTCReturnsOnCall map[int]struct {
|
||||
result1 error
|
||||
}
|
||||
invocations map[string][][]interface{}
|
||||
invocationsMutex sync.RWMutex
|
||||
}
|
||||
@@ -435,70 +398,6 @@ func (fake *FakeRouter) ListNodesReturnsOnCall(i int, result1 []*livekit.Node, r
|
||||
}{result1, result2}
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) OnNewParticipantRTC(arg1 routing.NewParticipantCallback) {
|
||||
fake.onNewParticipantRTCMutex.Lock()
|
||||
fake.onNewParticipantRTCArgsForCall = append(fake.onNewParticipantRTCArgsForCall, struct {
|
||||
arg1 routing.NewParticipantCallback
|
||||
}{arg1})
|
||||
stub := fake.OnNewParticipantRTCStub
|
||||
fake.recordInvocation("OnNewParticipantRTC", []interface{}{arg1})
|
||||
fake.onNewParticipantRTCMutex.Unlock()
|
||||
if stub != nil {
|
||||
fake.OnNewParticipantRTCStub(arg1)
|
||||
}
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) OnNewParticipantRTCCallCount() int {
|
||||
fake.onNewParticipantRTCMutex.RLock()
|
||||
defer fake.onNewParticipantRTCMutex.RUnlock()
|
||||
return len(fake.onNewParticipantRTCArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) OnNewParticipantRTCCalls(stub func(routing.NewParticipantCallback)) {
|
||||
fake.onNewParticipantRTCMutex.Lock()
|
||||
defer fake.onNewParticipantRTCMutex.Unlock()
|
||||
fake.OnNewParticipantRTCStub = stub
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) OnNewParticipantRTCArgsForCall(i int) routing.NewParticipantCallback {
|
||||
fake.onNewParticipantRTCMutex.RLock()
|
||||
defer fake.onNewParticipantRTCMutex.RUnlock()
|
||||
argsForCall := fake.onNewParticipantRTCArgsForCall[i]
|
||||
return argsForCall.arg1
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) OnRTCMessage(arg1 routing.RTCMessageCallback) {
|
||||
fake.onRTCMessageMutex.Lock()
|
||||
fake.onRTCMessageArgsForCall = append(fake.onRTCMessageArgsForCall, struct {
|
||||
arg1 routing.RTCMessageCallback
|
||||
}{arg1})
|
||||
stub := fake.OnRTCMessageStub
|
||||
fake.recordInvocation("OnRTCMessage", []interface{}{arg1})
|
||||
fake.onRTCMessageMutex.Unlock()
|
||||
if stub != nil {
|
||||
fake.OnRTCMessageStub(arg1)
|
||||
}
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) OnRTCMessageCallCount() int {
|
||||
fake.onRTCMessageMutex.RLock()
|
||||
defer fake.onRTCMessageMutex.RUnlock()
|
||||
return len(fake.onRTCMessageArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) OnRTCMessageCalls(stub func(routing.RTCMessageCallback)) {
|
||||
fake.onRTCMessageMutex.Lock()
|
||||
defer fake.onRTCMessageMutex.Unlock()
|
||||
fake.OnRTCMessageStub = stub
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) OnRTCMessageArgsForCall(i int) routing.RTCMessageCallback {
|
||||
fake.onRTCMessageMutex.RLock()
|
||||
defer fake.onRTCMessageMutex.RUnlock()
|
||||
argsForCall := fake.onRTCMessageArgsForCall[i]
|
||||
return argsForCall.arg1
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) RegisterNode() error {
|
||||
fake.registerNodeMutex.Lock()
|
||||
ret, specificReturn := fake.registerNodeReturnsOnCall[len(fake.registerNodeArgsForCall)]
|
||||
@@ -864,133 +763,6 @@ func (fake *FakeRouter) UnregisterNodeReturnsOnCall(i int, result1 error) {
|
||||
}{result1}
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) WriteParticipantRTC(arg1 context.Context, arg2 livekit.RoomName, arg3 livekit.ParticipantIdentity, arg4 *livekit.RTCNodeMessage) error {
|
||||
fake.writeParticipantRTCMutex.Lock()
|
||||
ret, specificReturn := fake.writeParticipantRTCReturnsOnCall[len(fake.writeParticipantRTCArgsForCall)]
|
||||
fake.writeParticipantRTCArgsForCall = append(fake.writeParticipantRTCArgsForCall, struct {
|
||||
arg1 context.Context
|
||||
arg2 livekit.RoomName
|
||||
arg3 livekit.ParticipantIdentity
|
||||
arg4 *livekit.RTCNodeMessage
|
||||
}{arg1, arg2, arg3, arg4})
|
||||
stub := fake.WriteParticipantRTCStub
|
||||
fakeReturns := fake.writeParticipantRTCReturns
|
||||
fake.recordInvocation("WriteParticipantRTC", []interface{}{arg1, arg2, arg3, arg4})
|
||||
fake.writeParticipantRTCMutex.Unlock()
|
||||
if stub != nil {
|
||||
return stub(arg1, arg2, arg3, arg4)
|
||||
}
|
||||
if specificReturn {
|
||||
return ret.result1
|
||||
}
|
||||
return fakeReturns.result1
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) WriteParticipantRTCCallCount() int {
|
||||
fake.writeParticipantRTCMutex.RLock()
|
||||
defer fake.writeParticipantRTCMutex.RUnlock()
|
||||
return len(fake.writeParticipantRTCArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) WriteParticipantRTCCalls(stub func(context.Context, livekit.RoomName, livekit.ParticipantIdentity, *livekit.RTCNodeMessage) error) {
|
||||
fake.writeParticipantRTCMutex.Lock()
|
||||
defer fake.writeParticipantRTCMutex.Unlock()
|
||||
fake.WriteParticipantRTCStub = stub
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) WriteParticipantRTCArgsForCall(i int) (context.Context, livekit.RoomName, livekit.ParticipantIdentity, *livekit.RTCNodeMessage) {
|
||||
fake.writeParticipantRTCMutex.RLock()
|
||||
defer fake.writeParticipantRTCMutex.RUnlock()
|
||||
argsForCall := fake.writeParticipantRTCArgsForCall[i]
|
||||
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) WriteParticipantRTCReturns(result1 error) {
|
||||
fake.writeParticipantRTCMutex.Lock()
|
||||
defer fake.writeParticipantRTCMutex.Unlock()
|
||||
fake.WriteParticipantRTCStub = nil
|
||||
fake.writeParticipantRTCReturns = struct {
|
||||
result1 error
|
||||
}{result1}
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) WriteParticipantRTCReturnsOnCall(i int, result1 error) {
|
||||
fake.writeParticipantRTCMutex.Lock()
|
||||
defer fake.writeParticipantRTCMutex.Unlock()
|
||||
fake.WriteParticipantRTCStub = nil
|
||||
if fake.writeParticipantRTCReturnsOnCall == nil {
|
||||
fake.writeParticipantRTCReturnsOnCall = make(map[int]struct {
|
||||
result1 error
|
||||
})
|
||||
}
|
||||
fake.writeParticipantRTCReturnsOnCall[i] = struct {
|
||||
result1 error
|
||||
}{result1}
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) WriteRoomRTC(arg1 context.Context, arg2 livekit.RoomName, arg3 *livekit.RTCNodeMessage) error {
|
||||
fake.writeRoomRTCMutex.Lock()
|
||||
ret, specificReturn := fake.writeRoomRTCReturnsOnCall[len(fake.writeRoomRTCArgsForCall)]
|
||||
fake.writeRoomRTCArgsForCall = append(fake.writeRoomRTCArgsForCall, struct {
|
||||
arg1 context.Context
|
||||
arg2 livekit.RoomName
|
||||
arg3 *livekit.RTCNodeMessage
|
||||
}{arg1, arg2, arg3})
|
||||
stub := fake.WriteRoomRTCStub
|
||||
fakeReturns := fake.writeRoomRTCReturns
|
||||
fake.recordInvocation("WriteRoomRTC", []interface{}{arg1, arg2, arg3})
|
||||
fake.writeRoomRTCMutex.Unlock()
|
||||
if stub != nil {
|
||||
return stub(arg1, arg2, arg3)
|
||||
}
|
||||
if specificReturn {
|
||||
return ret.result1
|
||||
}
|
||||
return fakeReturns.result1
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) WriteRoomRTCCallCount() int {
|
||||
fake.writeRoomRTCMutex.RLock()
|
||||
defer fake.writeRoomRTCMutex.RUnlock()
|
||||
return len(fake.writeRoomRTCArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) WriteRoomRTCCalls(stub func(context.Context, livekit.RoomName, *livekit.RTCNodeMessage) error) {
|
||||
fake.writeRoomRTCMutex.Lock()
|
||||
defer fake.writeRoomRTCMutex.Unlock()
|
||||
fake.WriteRoomRTCStub = stub
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) WriteRoomRTCArgsForCall(i int) (context.Context, livekit.RoomName, *livekit.RTCNodeMessage) {
|
||||
fake.writeRoomRTCMutex.RLock()
|
||||
defer fake.writeRoomRTCMutex.RUnlock()
|
||||
argsForCall := fake.writeRoomRTCArgsForCall[i]
|
||||
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) WriteRoomRTCReturns(result1 error) {
|
||||
fake.writeRoomRTCMutex.Lock()
|
||||
defer fake.writeRoomRTCMutex.Unlock()
|
||||
fake.WriteRoomRTCStub = nil
|
||||
fake.writeRoomRTCReturns = struct {
|
||||
result1 error
|
||||
}{result1}
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) WriteRoomRTCReturnsOnCall(i int, result1 error) {
|
||||
fake.writeRoomRTCMutex.Lock()
|
||||
defer fake.writeRoomRTCMutex.Unlock()
|
||||
fake.WriteRoomRTCStub = nil
|
||||
if fake.writeRoomRTCReturnsOnCall == nil {
|
||||
fake.writeRoomRTCReturnsOnCall = make(map[int]struct {
|
||||
result1 error
|
||||
})
|
||||
}
|
||||
fake.writeRoomRTCReturnsOnCall[i] = struct {
|
||||
result1 error
|
||||
}{result1}
|
||||
}
|
||||
|
||||
func (fake *FakeRouter) Invocations() map[string][][]interface{} {
|
||||
fake.invocationsMutex.RLock()
|
||||
defer fake.invocationsMutex.RUnlock()
|
||||
@@ -1004,10 +776,6 @@ func (fake *FakeRouter) Invocations() map[string][][]interface{} {
|
||||
defer fake.getRegionMutex.RUnlock()
|
||||
fake.listNodesMutex.RLock()
|
||||
defer fake.listNodesMutex.RUnlock()
|
||||
fake.onNewParticipantRTCMutex.RLock()
|
||||
defer fake.onNewParticipantRTCMutex.RUnlock()
|
||||
fake.onRTCMessageMutex.RLock()
|
||||
defer fake.onRTCMessageMutex.RUnlock()
|
||||
fake.registerNodeMutex.RLock()
|
||||
defer fake.registerNodeMutex.RUnlock()
|
||||
fake.removeDeadNodesMutex.RLock()
|
||||
@@ -1022,10 +790,6 @@ func (fake *FakeRouter) Invocations() map[string][][]interface{} {
|
||||
defer fake.stopMutex.RUnlock()
|
||||
fake.unregisterNodeMutex.RLock()
|
||||
defer fake.unregisterNodeMutex.RUnlock()
|
||||
fake.writeParticipantRTCMutex.RLock()
|
||||
defer fake.writeParticipantRTCMutex.RUnlock()
|
||||
fake.writeRoomRTCMutex.RLock()
|
||||
defer fake.writeRoomRTCMutex.RUnlock()
|
||||
copiedInvocations := map[string][][]interface{}{}
|
||||
for key, value := range fake.invocations {
|
||||
copiedInvocations[key] = value
|
||||
|
||||
@@ -101,7 +101,7 @@ func NewLocalRoomManager(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
r := &RoomManager{
|
||||
return &RoomManager{
|
||||
config: conf,
|
||||
rtcConfig: rtcConf,
|
||||
currentNode: currentNode,
|
||||
@@ -126,12 +126,7 @@ func NewLocalRoomManager(
|
||||
Region: conf.Region,
|
||||
NodeId: currentNode.Id,
|
||||
},
|
||||
}
|
||||
|
||||
// hook up to router
|
||||
router.OnNewParticipantRTC(r.StartSession)
|
||||
router.OnRTCMessage(r.handleRTCMessage)
|
||||
return r, nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (r *RoomManager) GetRoom(_ context.Context, roomName livekit.RoomName) *rtc.Room {
|
||||
@@ -638,26 +633,6 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalPa
|
||||
}
|
||||
}
|
||||
|
||||
// handles RTC messages resulted from Room API calls
|
||||
func (r *RoomManager) handleRTCMessage(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) {
|
||||
switch rm := msg.Message.(type) {
|
||||
case *livekit.RTCNodeMessage_RemoveParticipant:
|
||||
r.RemoveParticipant(ctx, rm.RemoveParticipant)
|
||||
case *livekit.RTCNodeMessage_MuteTrack:
|
||||
r.MutePublishedTrack(ctx, rm.MuteTrack)
|
||||
case *livekit.RTCNodeMessage_UpdateParticipant:
|
||||
r.UpdateParticipant(ctx, rm.UpdateParticipant)
|
||||
case *livekit.RTCNodeMessage_DeleteRoom:
|
||||
r.DeleteRoom(ctx, rm.DeleteRoom)
|
||||
case *livekit.RTCNodeMessage_UpdateSubscriptions:
|
||||
r.UpdateSubscriptions(ctx, rm.UpdateSubscriptions)
|
||||
case *livekit.RTCNodeMessage_SendData:
|
||||
r.SendData(ctx, rm.SendData)
|
||||
case *livekit.RTCNodeMessage_UpdateRoomMetadata:
|
||||
r.UpdateRoomMetadata(ctx, rm.UpdateRoomMetadata)
|
||||
}
|
||||
}
|
||||
|
||||
type participantReq interface {
|
||||
GetRoom() string
|
||||
GetIdentity() string
|
||||
|
||||
+9
-192
@@ -16,20 +16,16 @@ package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/thoas/go-funk"
|
||||
"github.com/twitchtv/twirp"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/livekit-server/pkg/routing"
|
||||
"github.com/livekit/livekit-server/pkg/rtc"
|
||||
"github.com/livekit/protocol/livekit"
|
||||
"github.com/livekit/protocol/logger"
|
||||
"github.com/livekit/protocol/rpc"
|
||||
"github.com/livekit/protocol/utils"
|
||||
)
|
||||
@@ -170,39 +166,7 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq
|
||||
return nil, twirpAuthError(err)
|
||||
}
|
||||
|
||||
if s.psrpcConf.Enabled {
|
||||
return s.roomClient.DeleteRoom(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req)
|
||||
}
|
||||
|
||||
if _, _, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Room), false); err == ErrRoomNotFound {
|
||||
return nil, twirp.NotFoundError("room not found")
|
||||
}
|
||||
|
||||
err := s.router.WriteRoomRTC(ctx, livekit.RoomName(req.Room), &livekit.RTCNodeMessage{
|
||||
Message: &livekit.RTCNodeMessage_DeleteRoom{
|
||||
DeleteRoom: req,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// we should not return until when the room is confirmed deleted
|
||||
err = s.confirmExecution(func() error {
|
||||
_, _, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Room), false)
|
||||
if err == nil {
|
||||
return ErrOperationFailed
|
||||
} else if err != ErrRoomNotFound {
|
||||
return err
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &livekit.DeleteRoomResponse{}, nil
|
||||
return s.roomClient.DeleteRoom(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req)
|
||||
}
|
||||
|
||||
func (s *RoomService) ListParticipants(ctx context.Context, req *livekit.ListParticipantsRequest) (*livekit.ListParticipantsResponse, error) {
|
||||
@@ -247,34 +211,7 @@ func (s *RoomService) RemoveParticipant(ctx context.Context, req *livekit.RoomPa
|
||||
return nil, twirp.NotFoundError("participant not found")
|
||||
}
|
||||
|
||||
if s.psrpcConf.Enabled {
|
||||
return s.participantClient.RemoveParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
|
||||
}
|
||||
|
||||
err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{
|
||||
Message: &livekit.RTCNodeMessage_RemoveParticipant{
|
||||
RemoveParticipant: req,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = s.confirmExecution(func() error {
|
||||
_, err := s.roomStore.LoadParticipant(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity))
|
||||
if err == ErrParticipantNotFound {
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return err
|
||||
} else {
|
||||
return ErrOperationFailed
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &livekit.RemoveParticipantResponse{}, nil
|
||||
return s.participantClient.RemoveParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
|
||||
}
|
||||
|
||||
func (s *RoomService) MutePublishedTrack(ctx context.Context, req *livekit.MuteRoomTrackRequest) (*livekit.MuteRoomTrackResponse, error) {
|
||||
@@ -283,47 +220,7 @@ func (s *RoomService) MutePublishedTrack(ctx context.Context, req *livekit.MuteR
|
||||
return nil, twirpAuthError(err)
|
||||
}
|
||||
|
||||
if s.psrpcConf.Enabled {
|
||||
return s.participantClient.MutePublishedTrack(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
|
||||
}
|
||||
|
||||
err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{
|
||||
Message: &livekit.RTCNodeMessage_MuteTrack{
|
||||
MuteTrack: req,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var track *livekit.TrackInfo
|
||||
err = s.confirmExecution(func() error {
|
||||
p, err := s.roomStore.LoadParticipant(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// ensure track is muted
|
||||
t := funk.Find(p.Tracks, func(t *livekit.TrackInfo) bool {
|
||||
return t.Sid == req.TrackSid
|
||||
})
|
||||
var ok bool
|
||||
track, ok = t.(*livekit.TrackInfo)
|
||||
if !ok {
|
||||
return ErrTrackNotFound
|
||||
}
|
||||
if track.Muted != req.Muted {
|
||||
return ErrOperationFailed
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res := &livekit.MuteRoomTrackResponse{
|
||||
Track: track,
|
||||
}
|
||||
return res, nil
|
||||
return s.participantClient.MutePublishedTrack(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
|
||||
}
|
||||
|
||||
func (s *RoomService) UpdateParticipant(ctx context.Context, req *livekit.UpdateParticipantRequest) (*livekit.ParticipantInfo, error) {
|
||||
@@ -337,42 +234,7 @@ func (s *RoomService) UpdateParticipant(ctx context.Context, req *livekit.Update
|
||||
return nil, twirpAuthError(err)
|
||||
}
|
||||
|
||||
if s.psrpcConf.Enabled {
|
||||
return s.participantClient.UpdateParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
|
||||
}
|
||||
|
||||
err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{
|
||||
Message: &livekit.RTCNodeMessage_UpdateParticipant{
|
||||
UpdateParticipant: req,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var participant *livekit.ParticipantInfo
|
||||
var detailedError error
|
||||
err = s.confirmExecution(func() error {
|
||||
participant, err = s.roomStore.LoadParticipant(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if req.Metadata != "" && participant.Metadata != req.Metadata {
|
||||
detailedError = fmt.Errorf("metadata does not match")
|
||||
return ErrOperationFailed
|
||||
}
|
||||
if req.Permission != nil && !proto.Equal(req.Permission, participant.Permission) {
|
||||
detailedError = fmt.Errorf("permissions do not match, expected: %v, actual: %v", req.Permission, participant.Permission)
|
||||
return ErrOperationFailed
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
logger.Warnw("could not confirm participant update", detailedError)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return participant, nil
|
||||
return s.participantClient.UpdateParticipant(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
|
||||
}
|
||||
|
||||
func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.UpdateSubscriptionsRequest) (*livekit.UpdateSubscriptionsResponse, error) {
|
||||
@@ -386,20 +248,7 @@ func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.Upda
|
||||
return nil, twirpAuthError(err)
|
||||
}
|
||||
|
||||
if s.psrpcConf.Enabled {
|
||||
return s.participantClient.UpdateSubscriptions(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
|
||||
}
|
||||
|
||||
err := s.writeParticipantMessage(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity), &livekit.RTCNodeMessage{
|
||||
Message: &livekit.RTCNodeMessage_UpdateSubscriptions{
|
||||
UpdateSubscriptions: req,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &livekit.UpdateSubscriptionsResponse{}, nil
|
||||
return s.participantClient.UpdateSubscriptions(ctx, s.topicFormatter.ParticipantTopic(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)), req)
|
||||
}
|
||||
|
||||
func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest) (*livekit.SendDataResponse, error) {
|
||||
@@ -409,20 +258,7 @@ func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest
|
||||
return nil, twirpAuthError(err)
|
||||
}
|
||||
|
||||
if s.psrpcConf.Enabled {
|
||||
return s.roomClient.SendData(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req)
|
||||
}
|
||||
|
||||
err := s.router.WriteRoomRTC(ctx, roomName, &livekit.RTCNodeMessage{
|
||||
Message: &livekit.RTCNodeMessage_SendData{
|
||||
SendData: req,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &livekit.SendDataResponse{}, nil
|
||||
return s.roomClient.SendData(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req)
|
||||
}
|
||||
|
||||
func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.UpdateRoomMetadataRequest) (*livekit.Room, error) {
|
||||
@@ -451,20 +287,9 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if s.psrpcConf.Enabled {
|
||||
_, err := s.roomClient.UpdateRoomMetadata(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
err = s.router.WriteRoomRTC(ctx, livekit.RoomName(req.Room), &livekit.RTCNodeMessage{
|
||||
Message: &livekit.RTCNodeMessage_UpdateRoomMetadata{
|
||||
UpdateRoomMetadata: req,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, err = s.roomClient.UpdateRoomMetadata(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = s.confirmExecution(func() error {
|
||||
@@ -494,14 +319,6 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat
|
||||
return room, nil
|
||||
}
|
||||
|
||||
func (s *RoomService) writeParticipantMessage(ctx context.Context, room livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error {
|
||||
if err := EnsureAdminPermission(ctx, room); err != nil {
|
||||
return twirpAuthError(err)
|
||||
}
|
||||
|
||||
return s.router.WriteParticipantRTC(ctx, room, identity, msg)
|
||||
}
|
||||
|
||||
func (s *RoomService) confirmExecution(f func() error) error {
|
||||
expired := time.After(s.apiConf.ExecutionTimeout)
|
||||
var err error
|
||||
|
||||
@@ -33,26 +33,6 @@ import (
|
||||
)
|
||||
|
||||
func TestDeleteRoom(t *testing.T) {
|
||||
t.Run("delete non-existent", func(t *testing.T) {
|
||||
svc := newTestRoomService(config.RoomConfig{})
|
||||
grant := &auth.ClaimGrants{
|
||||
Video: &auth.VideoGrant{
|
||||
RoomCreate: true,
|
||||
},
|
||||
}
|
||||
ctx := service.WithGrants(context.Background(), grant)
|
||||
svc.store.LoadRoomReturns(nil, nil, service.ErrRoomNotFound)
|
||||
_, err := svc.DeleteRoom(ctx, &livekit.DeleteRoomRequest{
|
||||
Room: "testroom",
|
||||
})
|
||||
require.Error(t, err)
|
||||
if terr, ok := err.(twirp.Error); ok {
|
||||
require.Equal(t, twirp.NotFound, terr.Code())
|
||||
} else {
|
||||
require.Fail(t, "should be twirp error")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("missing permissions", func(t *testing.T) {
|
||||
svc := newTestRoomService(config.RoomConfig{})
|
||||
grant := &auth.ClaimGrants{
|
||||
|
||||
+10
-20
@@ -102,27 +102,17 @@ func (s *defaultSessionHandler) HandleSession(
|
||||
) error {
|
||||
prometheus.IncrementParticipantRtcInit(1)
|
||||
|
||||
if rr, ok := s.router.(*routing.RedisRouter); ok {
|
||||
rtcNode, err := s.router.GetNodeForRoom(ctx, roomName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rtcNode, err := s.router.GetNodeForRoom(ctx, roomName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if rtcNode.Id != s.currentNode.Id {
|
||||
err = routing.ErrIncorrectRTCNode
|
||||
logger.Errorw("called participant on incorrect node", err,
|
||||
"rtcNode", rtcNode,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
pKey := routing.ParticipantKeyLegacy(roomName, pi.Identity)
|
||||
pKeyB62 := routing.ParticipantKey(roomName, pi.Identity)
|
||||
|
||||
// RTC session should start on this node
|
||||
if err := rr.SetParticipantRTCNode(pKey, pKeyB62, s.currentNode.Id); err != nil {
|
||||
return err
|
||||
}
|
||||
if rtcNode.Id != s.currentNode.Id {
|
||||
err = routing.ErrIncorrectRTCNode
|
||||
logger.Errorw("called participant on incorrect node", err,
|
||||
"rtcNode", rtcNode,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
return s.roomManager.StartSession(ctx, roomName, pi, requestSource, responseSink)
|
||||
|
||||
@@ -40,7 +40,6 @@ func init() {
|
||||
|
||||
func TestSignal(t *testing.T) {
|
||||
cfg := config.SignalRelayConfig{
|
||||
Enabled: false,
|
||||
RetryTimeout: 30 * time.Second,
|
||||
MinRetryInterval: 500 * time.Millisecond,
|
||||
MaxRetryInterval: 5 * time.Second,
|
||||
|
||||
@@ -50,7 +50,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
router := routing.CreateRouter(conf, universalClient, currentNode, signalClient)
|
||||
router := routing.CreateRouter(universalClient, currentNode, signalClient)
|
||||
objectStore := createStore(universalClient)
|
||||
roomAllocator, err := NewRoomAllocator(conf, router, objectStore)
|
||||
if err != nil {
|
||||
@@ -149,7 +149,7 @@ func InitializeRouter(conf *config.Config, currentNode routing.LocalNode) (routi
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
router := routing.CreateRouter(conf, universalClient, currentNode, signalClient)
|
||||
router := routing.CreateRouter(universalClient, currentNode, signalClient)
|
||||
return router, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -183,7 +183,6 @@ func createMultiNodeServer(nodeID string, port uint32) *service.LivekitServer {
|
||||
conf.RTC.TCPPort = port + 2
|
||||
conf.Redis.Address = "localhost:6379"
|
||||
conf.Keys = map[string]string{testApiKey: testApiSecret}
|
||||
conf.SignalRelay.Enabled = true
|
||||
|
||||
currentNode, err := routing.NewLocalNode(conf)
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user