set participant node for redis router in signal service (#1584)

This commit is contained in:
Paul Wells
2023-04-05 16:08:25 -07:00
committed by GitHub
parent 234f7ea5cb
commit 6b0cb33c53
5 changed files with 43 additions and 25 deletions
+4 -4
View File
@@ -106,14 +106,14 @@ func (r *LocalRouter) WriteParticipantRTC(_ context.Context, roomName livekit.Ro
r.rtcMessageChan = NewMessageChannel(localRTCChannelSize)
}
r.lock.Unlock()
msg.ParticipantKey = string(participantKeyLegacy(roomName, identity))
msg.ParticipantKeyB62 = string(participantKey(roomName, identity))
msg.ParticipantKey = string(ParticipantKeyLegacy(roomName, identity))
msg.ParticipantKeyB62 = string(ParticipantKey(roomName, identity))
return r.writeRTCMessage(r.rtcMessageChan, msg)
}
func (r *LocalRouter) WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, msg *livekit.RTCNodeMessage) error {
msg.ParticipantKey = string(participantKeyLegacy(roomName, ""))
msg.ParticipantKeyB62 = string(participantKey(roomName, ""))
msg.ParticipantKey = string(ParticipantKeyLegacy(roomName, ""))
msg.ParticipantKeyB62 = string(ParticipantKey(roomName, ""))
return r.WriteNodeRTC(ctx, r.currentNode.Id, msg)
}
+21 -14
View File
@@ -149,20 +149,27 @@ func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName livek
return
}
if r.usePSRPCSignal {
connectionID, reqSink, resSource, err = r.StartParticipantSignalWithNodeID(ctx, roomName, pi, livekit.NodeID(rtcNode.Id))
if err != nil {
return
}
// map signal & rtc nodes
err = r.setParticipantSignalNode(connectionID, r.currentNode.Id)
return
}
// create a new connection id
connectionID = livekit.ConnectionID(utils.NewGuid("CO_"))
pKey := participantKeyLegacy(roomName, pi.Identity)
pKeyB62 := participantKey(roomName, pi.Identity)
pKey := ParticipantKeyLegacy(roomName, pi.Identity)
pKeyB62 := ParticipantKey(roomName, pi.Identity)
// map signal & rtc nodes
if err = r.setParticipantSignalNode(connectionID, r.currentNode.Id); err != nil {
return
}
if r.usePSRPCSignal {
return r.StartParticipantSignalWithNodeID(ctx, roomName, pi, livekit.NodeID(rtcNode.Id))
}
// index by connectionID, since there may be multiple connections for the participant
// set up response channel before sending StartSession and be ready to receive responses.
resChan := r.getOrCreateMessageChannel(r.responseChannels, string(connectionID))
@@ -185,16 +192,16 @@ func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName livek
}
func (r *RedisRouter) WriteParticipantRTC(_ context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error {
pkey := participantKeyLegacy(roomName, identity)
pkeyB62 := participantKey(roomName, identity)
pkey := ParticipantKeyLegacy(roomName, identity)
pkeyB62 := ParticipantKey(roomName, identity)
rtcNode, err := r.getParticipantRTCNode(pkey, pkeyB62)
if err != nil {
return err
}
rtcSink := NewRTCNodeSink(r.rc, livekit.NodeID(rtcNode), pkey, pkeyB62)
msg.ParticipantKey = string(participantKeyLegacy(roomName, identity))
msg.ParticipantKeyB62 = string(participantKey(roomName, identity))
msg.ParticipantKey = string(ParticipantKeyLegacy(roomName, identity))
msg.ParticipantKeyB62 = string(ParticipantKey(roomName, identity))
return r.writeRTCMessage(rtcSink, msg)
}
@@ -203,8 +210,8 @@ func (r *RedisRouter) WriteRoomRTC(ctx context.Context, roomName livekit.RoomNam
if err != nil {
return err
}
msg.ParticipantKey = string(participantKeyLegacy(roomName, ""))
msg.ParticipantKeyB62 = string(participantKey(roomName, ""))
msg.ParticipantKey = string(ParticipantKeyLegacy(roomName, ""))
msg.ParticipantKeyB62 = string(ParticipantKey(roomName, ""))
return r.WriteNodeRTC(ctx, node.Id, msg)
}
@@ -229,7 +236,7 @@ func (r *RedisRouter) startParticipantRTC(ss *livekit.StartSession, participantK
return err
}
if err := r.setParticipantRTCNode(participantKey, participantKeyB62, rtcNode.Id); err != nil {
if err := r.SetParticipantRTCNode(participantKey, participantKeyB62, rtcNode.Id); err != nil {
return err
}
@@ -328,7 +335,7 @@ func (r *RedisRouter) Stop() {
r.cancel()
}
func (r *RedisRouter) setParticipantRTCNode(participantKey livekit.ParticipantKey, participantKeyB62 livekit.ParticipantKey, nodeID string) error {
func (r *RedisRouter) SetParticipantRTCNode(participantKey livekit.ParticipantKey, participantKeyB62 livekit.ParticipantKey, nodeID string) error {
var err error
if participantKey != "" {
err1 := r.rc.Set(r.ctx, participantRTCKey(participantKey), nodeID, participantMappingTTL).Err()
+2 -2
View File
@@ -9,7 +9,7 @@ import (
"github.com/livekit/protocol/livekit"
)
func participantKeyLegacy(roomName livekit.RoomName, identity livekit.ParticipantIdentity) livekit.ParticipantKey {
func ParticipantKeyLegacy(roomName livekit.RoomName, identity livekit.ParticipantIdentity) livekit.ParticipantKey {
return livekit.ParticipantKey(string(roomName) + "|" + string(identity))
}
@@ -25,7 +25,7 @@ func parseParticipantKeyLegacy(pkey livekit.ParticipantKey) (roomName livekit.Ro
return
}
func participantKey(roomName livekit.RoomName, identity livekit.ParticipantIdentity) livekit.ParticipantKey {
func ParticipantKey(roomName livekit.RoomName, identity livekit.ParticipantIdentity) livekit.ParticipantKey {
return livekit.ParticipantKey(encode(string(roomName), string(identity)))
}
+5 -5
View File
@@ -10,7 +10,7 @@ import (
func TestUtils_ParticipantKey(t *testing.T) {
// encode/decode empty
encoded := participantKey("", "")
encoded := ParticipantKey("", "")
roomName, identity, err := parseParticipantKey(encoded)
require.NoError(t, err)
require.Equal(t, livekit.RoomName(""), roomName)
@@ -21,28 +21,28 @@ func TestUtils_ParticipantKey(t *testing.T) {
require.Error(t, err)
// encode/decode without delimiter
encoded = participantKey("room1", "identity1")
encoded = ParticipantKey("room1", "identity1")
roomName, identity, err = parseParticipantKey(encoded)
require.NoError(t, err)
require.Equal(t, livekit.RoomName("room1"), roomName)
require.Equal(t, livekit.ParticipantIdentity("identity1"), identity)
// encode/decode with delimiter in roomName
encoded = participantKey("room1|alter_room1", "identity1")
encoded = ParticipantKey("room1|alter_room1", "identity1")
roomName, identity, err = parseParticipantKey(encoded)
require.NoError(t, err)
require.Equal(t, livekit.RoomName("room1|alter_room1"), roomName)
require.Equal(t, livekit.ParticipantIdentity("identity1"), identity)
// encode/decode with delimiter in identity
encoded = participantKey("room1", "identity1|alter-identity1")
encoded = ParticipantKey("room1", "identity1|alter-identity1")
roomName, identity, err = parseParticipantKey(encoded)
require.NoError(t, err)
require.Equal(t, livekit.RoomName("room1"), roomName)
require.Equal(t, livekit.ParticipantIdentity("identity1|alter-identity1"), identity)
// encode/decode with delimiter in both and multiple delimiters in both
encoded = participantKey("room1|alter_room1|again_room1", "identity1|alter-identity1|again-identity1")
encoded = ParticipantKey("room1|alter_room1|again_room1", "identity1|alter-identity1|again-identity1")
roomName, identity, err = parseParticipantKey(encoded)
require.NoError(t, err)
require.Equal(t, livekit.RoomName("room1|alter_room1|again_room1"), roomName)
+11
View File
@@ -77,6 +77,17 @@ func NewDefaultSignalServer(
responseSink routing.MessageSink,
) error {
prometheus.IncrementParticipantRtcInit(1)
if rr, ok := router.(*routing.RedisRouter); ok {
pKey := routing.ParticipantKeyLegacy(roomName, pi.Identity)
pKeyB62 := routing.ParticipantKey(roomName, pi.Identity)
// RTC session should start on this node
if err := rr.SetParticipantRTCNode(pKey, pKeyB62, currentNode.Id); err != nil {
return err
}
}
return roomManager.StartSession(ctx, roomName, pi, requestSource, responseSink)
}