mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 15:35:41 +00:00
Use the redis.UniversalClient interface instead of *redis.Client when interacting with go-redis (#1149)
* Use the redis.UniversalClient interface instead of *redis.Client when interacting with go-redis * Update protocol to v1.2.1
This commit is contained in:
2
go.mod
2
go.mod
@@ -18,7 +18,7 @@ require (
|
||||
github.com/hashicorp/golang-lru v0.5.4
|
||||
github.com/livekit/mageutil v0.0.0-20221002073820-d9198083cfdc
|
||||
github.com/livekit/mediatransportutil v0.0.0-20221007030528-7440725c362b
|
||||
github.com/livekit/protocol v1.1.3-0.20221026203734-d3635b12268c
|
||||
github.com/livekit/protocol v1.2.1
|
||||
github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995
|
||||
github.com/mackerelio/go-osstat v0.2.3
|
||||
github.com/magefile/mage v1.14.0
|
||||
|
||||
5
go.sum
5
go.sum
@@ -246,8 +246,8 @@ github.com/livekit/mageutil v0.0.0-20221002073820-d9198083cfdc h1:e3GIA9AL6h4a38
|
||||
github.com/livekit/mageutil v0.0.0-20221002073820-d9198083cfdc/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
|
||||
github.com/livekit/mediatransportutil v0.0.0-20221007030528-7440725c362b h1:RBNV8TckETSkIkKxcD12d8nZKVkB9GSY/sQlMoaruP4=
|
||||
github.com/livekit/mediatransportutil v0.0.0-20221007030528-7440725c362b/go.mod h1:1Dlx20JPoIKGP45eo+yuj0HjeE25zmyeX/EWHiPCjFw=
|
||||
github.com/livekit/protocol v1.1.3-0.20221026203734-d3635b12268c h1:5BciCRrrcYE8HyKACliG2RTwNhkT8dYtPu4rp2O8Sq4=
|
||||
github.com/livekit/protocol v1.1.3-0.20221026203734-d3635b12268c/go.mod h1:BIjSeLm8mZA7c91gKGwyXzenMFxVva0wjbxOftSGuEI=
|
||||
github.com/livekit/protocol v1.2.1 h1:om1yWCgFIvSrnBXvwe8vMsINN6gBwQOafGoqptoREKQ=
|
||||
github.com/livekit/protocol v1.2.1/go.mod h1:Xz4GHQXR8FGH6+mvAQ8GN7/vXjC0QinDHCJsSKxlGNc=
|
||||
github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 h1:vOaY2qvfLihDyeZtnGGN1Law9wRrw8BMGCr1TygTvMw=
|
||||
github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995/go.mod h1:116ych8UaEs9vfIE8n6iZCZ30iagUFTls0vRmC+Ix5U=
|
||||
github.com/mackerelio/go-osstat v0.2.3 h1:jAMXD5erlDE39kdX2CU7YwCGRcxIO33u/p8+Fhe5dJw=
|
||||
@@ -357,7 +357,6 @@ github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5Fsn
|
||||
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
|
||||
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
|
||||
github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY=
|
||||
github.com/prometheus/client_golang v1.13.0/go.mod h1:vTeo+zgvILHsnnj/39Ou/1fPN5nJFOEMgftOUOmlvYQ=
|
||||
github.com/prometheus/client_golang v1.13.1 h1:3gMjIY2+/hzmqhtUC/aQNYldJA6DtH3CgQvwS+02K1c=
|
||||
github.com/prometheus/client_golang v1.13.1/go.mod h1:vTeo+zgvILHsnnj/39Ou/1fPN5nJFOEMgftOUOmlvYQ=
|
||||
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
|
||||
|
||||
@@ -93,7 +93,7 @@ type MessageRouter interface {
|
||||
WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, msg *livekit.RTCNodeMessage) error
|
||||
}
|
||||
|
||||
func CreateRouter(rc *redis.Client, node LocalNode) Router {
|
||||
func CreateRouter(rc redis.UniversalClient, node LocalNode) Router {
|
||||
if rc != nil {
|
||||
return NewRedisRouter(node, rc)
|
||||
}
|
||||
|
||||
@@ -38,7 +38,7 @@ func signalNodeChannel(nodeID livekit.NodeID) string {
|
||||
return "signal_channel:" + string(nodeID)
|
||||
}
|
||||
|
||||
func publishRTCMessage(rc *redis.Client, nodeID livekit.NodeID, participantKey livekit.ParticipantKey, msg proto.Message) error {
|
||||
func publishRTCMessage(rc redis.UniversalClient, nodeID livekit.NodeID, participantKey livekit.ParticipantKey, msg proto.Message) error {
|
||||
rm := &livekit.RTCNodeMessage{
|
||||
ParticipantKey: string(participantKey),
|
||||
}
|
||||
@@ -67,7 +67,7 @@ func publishRTCMessage(rc *redis.Client, nodeID livekit.NodeID, participantKey l
|
||||
return rc.Publish(redisCtx, rtcNodeChannel(nodeID), data).Err()
|
||||
}
|
||||
|
||||
func publishSignalMessage(rc *redis.Client, nodeID livekit.NodeID, connectionID livekit.ConnectionID, msg proto.Message) error {
|
||||
func publishSignalMessage(rc redis.UniversalClient, nodeID livekit.NodeID, connectionID livekit.ConnectionID, msg proto.Message) error {
|
||||
rm := &livekit.SignalNodeMessage{
|
||||
ConnectionId: string(connectionID),
|
||||
}
|
||||
@@ -94,14 +94,14 @@ func publishSignalMessage(rc *redis.Client, nodeID livekit.NodeID, connectionID
|
||||
}
|
||||
|
||||
type RTCNodeSink struct {
|
||||
rc *redis.Client
|
||||
rc redis.UniversalClient
|
||||
nodeID livekit.NodeID
|
||||
participantKey livekit.ParticipantKey
|
||||
isClosed atomic.Bool
|
||||
onClose func()
|
||||
}
|
||||
|
||||
func NewRTCNodeSink(rc *redis.Client, nodeID livekit.NodeID, participantKey livekit.ParticipantKey) *RTCNodeSink {
|
||||
func NewRTCNodeSink(rc redis.UniversalClient, nodeID livekit.NodeID, participantKey livekit.ParticipantKey) *RTCNodeSink {
|
||||
return &RTCNodeSink{
|
||||
rc: rc,
|
||||
nodeID: nodeID,
|
||||
@@ -130,14 +130,14 @@ func (s *RTCNodeSink) OnClose(f func()) {
|
||||
}
|
||||
|
||||
type SignalNodeSink struct {
|
||||
rc *redis.Client
|
||||
rc redis.UniversalClient
|
||||
nodeID livekit.NodeID
|
||||
connectionID livekit.ConnectionID
|
||||
isClosed atomic.Bool
|
||||
onClose func()
|
||||
}
|
||||
|
||||
func NewSignalNodeSink(rc *redis.Client, nodeID livekit.NodeID, connectionID livekit.ConnectionID) *SignalNodeSink {
|
||||
func NewSignalNodeSink(rc redis.UniversalClient, nodeID livekit.NodeID, connectionID livekit.ConnectionID) *SignalNodeSink {
|
||||
return &SignalNodeSink{
|
||||
rc: rc,
|
||||
nodeID: nodeID,
|
||||
|
||||
@@ -33,7 +33,7 @@ const (
|
||||
type RedisRouter struct {
|
||||
LocalRouter
|
||||
|
||||
rc *redis.Client
|
||||
rc redis.UniversalClient
|
||||
ctx context.Context
|
||||
isStarted atomic.Bool
|
||||
nodeMu sync.RWMutex
|
||||
@@ -44,7 +44,7 @@ type RedisRouter struct {
|
||||
cancel func()
|
||||
}
|
||||
|
||||
func NewRedisRouter(currentNode LocalNode, rc *redis.Client) *RedisRouter {
|
||||
func NewRedisRouter(currentNode LocalNode, rc redis.UniversalClient) *RedisRouter {
|
||||
rr := &RedisRouter{
|
||||
LocalRouter: *NewLocalRouter(currentNode),
|
||||
rc: rc,
|
||||
|
||||
@@ -48,12 +48,12 @@ const (
|
||||
)
|
||||
|
||||
type RedisStore struct {
|
||||
rc *redis.Client
|
||||
rc redis.UniversalClient
|
||||
ctx context.Context
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
func NewRedisStore(rc *redis.Client) *RedisStore {
|
||||
func NewRedisStore(rc redis.UniversalClient) *RedisStore {
|
||||
return &RedisStore{
|
||||
ctx: context.Background(),
|
||||
rc: rc,
|
||||
|
||||
@@ -117,12 +117,12 @@ func createWebhookNotifier(conf *config.Config, provider auth.KeyProvider) (webh
|
||||
return webhook.NewNotifier(wc.APIKey, secret, wc.URLs), nil
|
||||
}
|
||||
|
||||
func createRedisClient(conf *config.Config) (*redis.Client, error) {
|
||||
func createRedisClient(conf *config.Config) (redis.UniversalClient, error) {
|
||||
if !conf.HasRedis() {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var rc *redis.Client
|
||||
var rc redis.UniversalClient
|
||||
var tlsConfig *tls.Config
|
||||
|
||||
if conf.Redis.UseTLS {
|
||||
@@ -167,7 +167,7 @@ func createRedisClient(conf *config.Config) (*redis.Client, error) {
|
||||
return rc, nil
|
||||
}
|
||||
|
||||
func createStore(rc *redis.Client) ObjectStore {
|
||||
func createStore(rc redis.UniversalClient) ObjectStore {
|
||||
if rc != nil {
|
||||
return NewRedisStore(rc)
|
||||
}
|
||||
|
||||
@@ -35,18 +35,18 @@ import (
|
||||
|
||||
func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*LivekitServer, error) {
|
||||
roomConfig := getRoomConf(conf)
|
||||
client, err := createRedisClient(conf)
|
||||
universalClient, err := createRedisClient(conf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
router := routing.CreateRouter(client, currentNode)
|
||||
objectStore := createStore(client)
|
||||
router := routing.CreateRouter(universalClient, currentNode)
|
||||
objectStore := createStore(universalClient)
|
||||
roomAllocator, err := NewRoomAllocator(conf, router, objectStore)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
nodeID := getNodeID(currentNode)
|
||||
rpcClient := egress.NewRedisRPCClient(nodeID, client)
|
||||
rpcClient := egress.NewRedisRPCClient(nodeID, universalClient)
|
||||
egressStore := getEgressStore(objectStore)
|
||||
keyProvider, err := createKeyProvider(conf)
|
||||
if err != nil {
|
||||
@@ -65,7 +65,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
|
||||
}
|
||||
egressService := NewEgressService(rpcClient, objectStore, egressStore, roomService, telemetryService, rtcEgressLauncher)
|
||||
ingressConfig := getIngressConfig(conf)
|
||||
rpc := ingress.NewRedisRPC(nodeID, client)
|
||||
rpc := ingress.NewRedisRPC(nodeID, universalClient)
|
||||
ingressRPCClient := getIngressRPCClient(rpc)
|
||||
ingressStore := getIngressStore(objectStore)
|
||||
ingressService := NewIngressService(ingressConfig, ingressRPCClient, ingressStore, roomService, telemetryService)
|
||||
@@ -88,11 +88,11 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
|
||||
}
|
||||
|
||||
func InitializeRouter(conf *config.Config, currentNode routing.LocalNode) (routing.Router, error) {
|
||||
client, err := createRedisClient(conf)
|
||||
universalClient, err := createRedisClient(conf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
router := routing.CreateRouter(client, currentNode)
|
||||
router := routing.CreateRouter(universalClient, currentNode)
|
||||
return router, nil
|
||||
}
|
||||
|
||||
@@ -143,12 +143,12 @@ func createWebhookNotifier(conf *config.Config, provider auth.KeyProvider) (webh
|
||||
return webhook.NewNotifier(wc.APIKey, secret, wc.URLs), nil
|
||||
}
|
||||
|
||||
func createRedisClient(conf *config.Config) (*redis.Client, error) {
|
||||
func createRedisClient(conf *config.Config) (redis.UniversalClient, error) {
|
||||
if !conf.HasRedis() {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var rc *redis.Client
|
||||
var rc redis.UniversalClient
|
||||
var tlsConfig *tls.Config
|
||||
|
||||
if conf.Redis.UseTLS {
|
||||
@@ -193,7 +193,7 @@ func createRedisClient(conf *config.Config) (*redis.Client, error) {
|
||||
return rc, nil
|
||||
}
|
||||
|
||||
func createStore(rc *redis.Client) ObjectStore {
|
||||
func createStore(rc redis.UniversalClient) ObjectStore {
|
||||
if rc != nil {
|
||||
return NewRedisStore(rc)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user