diff --git a/pkg/routing/localrouter.go b/pkg/routing/localrouter.go index a70cec005..2c5c39a98 100644 --- a/pkg/routing/localrouter.go +++ b/pkg/routing/localrouter.go @@ -106,14 +106,14 @@ func (r *LocalRouter) WriteParticipantRTC(_ context.Context, roomName livekit.Ro r.rtcMessageChan = NewMessageChannel(localRTCChannelSize) } r.lock.Unlock() - msg.ParticipantKey = string(participantKeyLegacy(roomName, identity)) - msg.ParticipantKeyB62 = string(participantKey(roomName, identity)) + msg.ParticipantKey = string(ParticipantKeyLegacy(roomName, identity)) + msg.ParticipantKeyB62 = string(ParticipantKey(roomName, identity)) return r.writeRTCMessage(r.rtcMessageChan, msg) } func (r *LocalRouter) WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, msg *livekit.RTCNodeMessage) error { - msg.ParticipantKey = string(participantKeyLegacy(roomName, "")) - msg.ParticipantKeyB62 = string(participantKey(roomName, "")) + msg.ParticipantKey = string(ParticipantKeyLegacy(roomName, "")) + msg.ParticipantKeyB62 = string(ParticipantKey(roomName, "")) return r.WriteNodeRTC(ctx, r.currentNode.Id, msg) } diff --git a/pkg/routing/redisrouter.go b/pkg/routing/redisrouter.go index d53c52997..c5da2a54b 100644 --- a/pkg/routing/redisrouter.go +++ b/pkg/routing/redisrouter.go @@ -149,20 +149,27 @@ func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName livek return } + if r.usePSRPCSignal { + connectionID, reqSink, resSource, err = r.StartParticipantSignalWithNodeID(ctx, roomName, pi, livekit.NodeID(rtcNode.Id)) + if err != nil { + return + } + + // map signal & rtc nodes + err = r.setParticipantSignalNode(connectionID, r.currentNode.Id) + return + } + // create a new connection id connectionID = livekit.ConnectionID(utils.NewGuid("CO_")) - pKey := participantKeyLegacy(roomName, pi.Identity) - pKeyB62 := participantKey(roomName, pi.Identity) + pKey := ParticipantKeyLegacy(roomName, pi.Identity) + pKeyB62 := ParticipantKey(roomName, pi.Identity) // map signal & rtc nodes if err = r.setParticipantSignalNode(connectionID, r.currentNode.Id); err != nil { return } - if r.usePSRPCSignal { - return r.StartParticipantSignalWithNodeID(ctx, roomName, pi, livekit.NodeID(rtcNode.Id)) - } - // index by connectionID, since there may be multiple connections for the participant // set up response channel before sending StartSession and be ready to receive responses. resChan := r.getOrCreateMessageChannel(r.responseChannels, string(connectionID)) @@ -185,16 +192,16 @@ func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName livek } func (r *RedisRouter) WriteParticipantRTC(_ context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error { - pkey := participantKeyLegacy(roomName, identity) - pkeyB62 := participantKey(roomName, identity) + pkey := ParticipantKeyLegacy(roomName, identity) + pkeyB62 := ParticipantKey(roomName, identity) rtcNode, err := r.getParticipantRTCNode(pkey, pkeyB62) if err != nil { return err } rtcSink := NewRTCNodeSink(r.rc, livekit.NodeID(rtcNode), pkey, pkeyB62) - msg.ParticipantKey = string(participantKeyLegacy(roomName, identity)) - msg.ParticipantKeyB62 = string(participantKey(roomName, identity)) + msg.ParticipantKey = string(ParticipantKeyLegacy(roomName, identity)) + msg.ParticipantKeyB62 = string(ParticipantKey(roomName, identity)) return r.writeRTCMessage(rtcSink, msg) } @@ -203,8 +210,8 @@ func (r *RedisRouter) WriteRoomRTC(ctx context.Context, roomName livekit.RoomNam if err != nil { return err } - msg.ParticipantKey = string(participantKeyLegacy(roomName, "")) - msg.ParticipantKeyB62 = string(participantKey(roomName, "")) + msg.ParticipantKey = string(ParticipantKeyLegacy(roomName, "")) + msg.ParticipantKeyB62 = string(ParticipantKey(roomName, "")) return r.WriteNodeRTC(ctx, node.Id, msg) } @@ -229,7 +236,7 @@ func (r *RedisRouter) startParticipantRTC(ss *livekit.StartSession, participantK return err } - if err := r.setParticipantRTCNode(participantKey, participantKeyB62, rtcNode.Id); err != nil { + if err := r.SetParticipantRTCNode(participantKey, participantKeyB62, rtcNode.Id); err != nil { return err } @@ -328,7 +335,7 @@ func (r *RedisRouter) Stop() { r.cancel() } -func (r *RedisRouter) setParticipantRTCNode(participantKey livekit.ParticipantKey, participantKeyB62 livekit.ParticipantKey, nodeID string) error { +func (r *RedisRouter) SetParticipantRTCNode(participantKey livekit.ParticipantKey, participantKeyB62 livekit.ParticipantKey, nodeID string) error { var err error if participantKey != "" { err1 := r.rc.Set(r.ctx, participantRTCKey(participantKey), nodeID, participantMappingTTL).Err() diff --git a/pkg/routing/utils.go b/pkg/routing/utils.go index 8db4f649c..38a458b63 100644 --- a/pkg/routing/utils.go +++ b/pkg/routing/utils.go @@ -9,7 +9,7 @@ import ( "github.com/livekit/protocol/livekit" ) -func participantKeyLegacy(roomName livekit.RoomName, identity livekit.ParticipantIdentity) livekit.ParticipantKey { +func ParticipantKeyLegacy(roomName livekit.RoomName, identity livekit.ParticipantIdentity) livekit.ParticipantKey { return livekit.ParticipantKey(string(roomName) + "|" + string(identity)) } @@ -25,7 +25,7 @@ func parseParticipantKeyLegacy(pkey livekit.ParticipantKey) (roomName livekit.Ro return } -func participantKey(roomName livekit.RoomName, identity livekit.ParticipantIdentity) livekit.ParticipantKey { +func ParticipantKey(roomName livekit.RoomName, identity livekit.ParticipantIdentity) livekit.ParticipantKey { return livekit.ParticipantKey(encode(string(roomName), string(identity))) } diff --git a/pkg/routing/utils_test.go b/pkg/routing/utils_test.go index ae40b27e8..10a21a60f 100644 --- a/pkg/routing/utils_test.go +++ b/pkg/routing/utils_test.go @@ -10,7 +10,7 @@ import ( func TestUtils_ParticipantKey(t *testing.T) { // encode/decode empty - encoded := participantKey("", "") + encoded := ParticipantKey("", "") roomName, identity, err := parseParticipantKey(encoded) require.NoError(t, err) require.Equal(t, livekit.RoomName(""), roomName) @@ -21,28 +21,28 @@ func TestUtils_ParticipantKey(t *testing.T) { require.Error(t, err) // encode/decode without delimiter - encoded = participantKey("room1", "identity1") + encoded = ParticipantKey("room1", "identity1") roomName, identity, err = parseParticipantKey(encoded) require.NoError(t, err) require.Equal(t, livekit.RoomName("room1"), roomName) require.Equal(t, livekit.ParticipantIdentity("identity1"), identity) // encode/decode with delimiter in roomName - encoded = participantKey("room1|alter_room1", "identity1") + encoded = ParticipantKey("room1|alter_room1", "identity1") roomName, identity, err = parseParticipantKey(encoded) require.NoError(t, err) require.Equal(t, livekit.RoomName("room1|alter_room1"), roomName) require.Equal(t, livekit.ParticipantIdentity("identity1"), identity) // encode/decode with delimiter in identity - encoded = participantKey("room1", "identity1|alter-identity1") + encoded = ParticipantKey("room1", "identity1|alter-identity1") roomName, identity, err = parseParticipantKey(encoded) require.NoError(t, err) require.Equal(t, livekit.RoomName("room1"), roomName) require.Equal(t, livekit.ParticipantIdentity("identity1|alter-identity1"), identity) // encode/decode with delimiter in both and multiple delimiters in both - encoded = participantKey("room1|alter_room1|again_room1", "identity1|alter-identity1|again-identity1") + encoded = ParticipantKey("room1|alter_room1|again_room1", "identity1|alter-identity1|again-identity1") roomName, identity, err = parseParticipantKey(encoded) require.NoError(t, err) require.Equal(t, livekit.RoomName("room1|alter_room1|again_room1"), roomName) diff --git a/pkg/service/signal.go b/pkg/service/signal.go index 2423260d6..407c173e1 100644 --- a/pkg/service/signal.go +++ b/pkg/service/signal.go @@ -77,6 +77,17 @@ func NewDefaultSignalServer( responseSink routing.MessageSink, ) error { prometheus.IncrementParticipantRtcInit(1) + + if rr, ok := router.(*routing.RedisRouter); ok { + pKey := routing.ParticipantKeyLegacy(roomName, pi.Identity) + pKeyB62 := routing.ParticipantKey(roomName, pi.Identity) + + // RTC session should start on this node + if err := rr.SetParticipantRTCNode(pKey, pKeyB62, currentNode.Id); err != nil { + return err + } + } + return roomManager.StartSession(ctx, roomName, pi, requestSource, responseSink) }