From c735668f67b1303bb5b5182efd470a032ffb812e Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Mon, 7 Nov 2022 17:27:28 -0800 Subject: [PATCH] Use the redis.UniversalClient interface instead of *redis.Client when interacting with go-redis (#1149) * Use the redis.UniversalClient interface instead of *redis.Client when interacting with go-redis * Update protocol to v1.2.1 --- go.mod | 2 +- go.sum | 5 ++--- pkg/routing/interfaces.go | 2 +- pkg/routing/redis.go | 12 ++++++------ pkg/routing/redisrouter.go | 4 ++-- pkg/service/redisstore.go | 4 ++-- pkg/service/wire.go | 6 +++--- pkg/service/wire_gen.go | 20 ++++++++++---------- 8 files changed, 27 insertions(+), 28 deletions(-) diff --git a/go.mod b/go.mod index de4b0078a..6761af35b 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/hashicorp/golang-lru v0.5.4 github.com/livekit/mageutil v0.0.0-20221002073820-d9198083cfdc github.com/livekit/mediatransportutil v0.0.0-20221007030528-7440725c362b - github.com/livekit/protocol v1.1.3-0.20221026203734-d3635b12268c + github.com/livekit/protocol v1.2.1 github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 github.com/mackerelio/go-osstat v0.2.3 github.com/magefile/mage v1.14.0 diff --git a/go.sum b/go.sum index b08b9aa13..ae61400ad 100644 --- a/go.sum +++ b/go.sum @@ -246,8 +246,8 @@ github.com/livekit/mageutil v0.0.0-20221002073820-d9198083cfdc h1:e3GIA9AL6h4a38 github.com/livekit/mageutil v0.0.0-20221002073820-d9198083cfdc/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20221007030528-7440725c362b h1:RBNV8TckETSkIkKxcD12d8nZKVkB9GSY/sQlMoaruP4= github.com/livekit/mediatransportutil v0.0.0-20221007030528-7440725c362b/go.mod h1:1Dlx20JPoIKGP45eo+yuj0HjeE25zmyeX/EWHiPCjFw= -github.com/livekit/protocol v1.1.3-0.20221026203734-d3635b12268c h1:5BciCRrrcYE8HyKACliG2RTwNhkT8dYtPu4rp2O8Sq4= -github.com/livekit/protocol v1.1.3-0.20221026203734-d3635b12268c/go.mod h1:BIjSeLm8mZA7c91gKGwyXzenMFxVva0wjbxOftSGuEI= +github.com/livekit/protocol v1.2.1 h1:om1yWCgFIvSrnBXvwe8vMsINN6gBwQOafGoqptoREKQ= +github.com/livekit/protocol v1.2.1/go.mod h1:Xz4GHQXR8FGH6+mvAQ8GN7/vXjC0QinDHCJsSKxlGNc= github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995 h1:vOaY2qvfLihDyeZtnGGN1Law9wRrw8BMGCr1TygTvMw= github.com/livekit/rtcscore-go v0.0.0-20220815072451-20ee10ae1995/go.mod h1:116ych8UaEs9vfIE8n6iZCZ30iagUFTls0vRmC+Ix5U= github.com/mackerelio/go-osstat v0.2.3 h1:jAMXD5erlDE39kdX2CU7YwCGRcxIO33u/p8+Fhe5dJw= @@ -357,7 +357,6 @@ github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5Fsn github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY= -github.com/prometheus/client_golang v1.13.0/go.mod h1:vTeo+zgvILHsnnj/39Ou/1fPN5nJFOEMgftOUOmlvYQ= github.com/prometheus/client_golang v1.13.1 h1:3gMjIY2+/hzmqhtUC/aQNYldJA6DtH3CgQvwS+02K1c= github.com/prometheus/client_golang v1.13.1/go.mod h1:vTeo+zgvILHsnnj/39Ou/1fPN5nJFOEMgftOUOmlvYQ= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= diff --git a/pkg/routing/interfaces.go b/pkg/routing/interfaces.go index cb1728bc2..53e835a0a 100644 --- a/pkg/routing/interfaces.go +++ b/pkg/routing/interfaces.go @@ -93,7 +93,7 @@ type MessageRouter interface { WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, msg *livekit.RTCNodeMessage) error } -func CreateRouter(rc *redis.Client, node LocalNode) Router { +func CreateRouter(rc redis.UniversalClient, node LocalNode) Router { if rc != nil { return NewRedisRouter(node, rc) } diff --git a/pkg/routing/redis.go b/pkg/routing/redis.go index c1f091737..afa0163fa 100644 --- a/pkg/routing/redis.go +++ b/pkg/routing/redis.go @@ -38,7 +38,7 @@ func signalNodeChannel(nodeID livekit.NodeID) string { return "signal_channel:" + string(nodeID) } -func publishRTCMessage(rc *redis.Client, nodeID livekit.NodeID, participantKey livekit.ParticipantKey, msg proto.Message) error { +func publishRTCMessage(rc redis.UniversalClient, nodeID livekit.NodeID, participantKey livekit.ParticipantKey, msg proto.Message) error { rm := &livekit.RTCNodeMessage{ ParticipantKey: string(participantKey), } @@ -67,7 +67,7 @@ func publishRTCMessage(rc *redis.Client, nodeID livekit.NodeID, participantKey l return rc.Publish(redisCtx, rtcNodeChannel(nodeID), data).Err() } -func publishSignalMessage(rc *redis.Client, nodeID livekit.NodeID, connectionID livekit.ConnectionID, msg proto.Message) error { +func publishSignalMessage(rc redis.UniversalClient, nodeID livekit.NodeID, connectionID livekit.ConnectionID, msg proto.Message) error { rm := &livekit.SignalNodeMessage{ ConnectionId: string(connectionID), } @@ -94,14 +94,14 @@ func publishSignalMessage(rc *redis.Client, nodeID livekit.NodeID, connectionID } type RTCNodeSink struct { - rc *redis.Client + rc redis.UniversalClient nodeID livekit.NodeID participantKey livekit.ParticipantKey isClosed atomic.Bool onClose func() } -func NewRTCNodeSink(rc *redis.Client, nodeID livekit.NodeID, participantKey livekit.ParticipantKey) *RTCNodeSink { +func NewRTCNodeSink(rc redis.UniversalClient, nodeID livekit.NodeID, participantKey livekit.ParticipantKey) *RTCNodeSink { return &RTCNodeSink{ rc: rc, nodeID: nodeID, @@ -130,14 +130,14 @@ func (s *RTCNodeSink) OnClose(f func()) { } type SignalNodeSink struct { - rc *redis.Client + rc redis.UniversalClient nodeID livekit.NodeID connectionID livekit.ConnectionID isClosed atomic.Bool onClose func() } -func NewSignalNodeSink(rc *redis.Client, nodeID livekit.NodeID, connectionID livekit.ConnectionID) *SignalNodeSink { +func NewSignalNodeSink(rc redis.UniversalClient, nodeID livekit.NodeID, connectionID livekit.ConnectionID) *SignalNodeSink { return &SignalNodeSink{ rc: rc, nodeID: nodeID, diff --git a/pkg/routing/redisrouter.go b/pkg/routing/redisrouter.go index 86190a4e1..15b1ce78a 100644 --- a/pkg/routing/redisrouter.go +++ b/pkg/routing/redisrouter.go @@ -33,7 +33,7 @@ const ( type RedisRouter struct { LocalRouter - rc *redis.Client + rc redis.UniversalClient ctx context.Context isStarted atomic.Bool nodeMu sync.RWMutex @@ -44,7 +44,7 @@ type RedisRouter struct { cancel func() } -func NewRedisRouter(currentNode LocalNode, rc *redis.Client) *RedisRouter { +func NewRedisRouter(currentNode LocalNode, rc redis.UniversalClient) *RedisRouter { rr := &RedisRouter{ LocalRouter: *NewLocalRouter(currentNode), rc: rc, diff --git a/pkg/service/redisstore.go b/pkg/service/redisstore.go index 8835ad64a..cb846cb4c 100644 --- a/pkg/service/redisstore.go +++ b/pkg/service/redisstore.go @@ -48,12 +48,12 @@ const ( ) type RedisStore struct { - rc *redis.Client + rc redis.UniversalClient ctx context.Context done chan struct{} } -func NewRedisStore(rc *redis.Client) *RedisStore { +func NewRedisStore(rc redis.UniversalClient) *RedisStore { return &RedisStore{ ctx: context.Background(), rc: rc, diff --git a/pkg/service/wire.go b/pkg/service/wire.go index be6f8cf04..7d44e79a0 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -117,12 +117,12 @@ func createWebhookNotifier(conf *config.Config, provider auth.KeyProvider) (webh return webhook.NewNotifier(wc.APIKey, secret, wc.URLs), nil } -func createRedisClient(conf *config.Config) (*redis.Client, error) { +func createRedisClient(conf *config.Config) (redis.UniversalClient, error) { if !conf.HasRedis() { return nil, nil } - var rc *redis.Client + var rc redis.UniversalClient var tlsConfig *tls.Config if conf.Redis.UseTLS { @@ -167,7 +167,7 @@ func createRedisClient(conf *config.Config) (*redis.Client, error) { return rc, nil } -func createStore(rc *redis.Client) ObjectStore { +func createStore(rc redis.UniversalClient) ObjectStore { if rc != nil { return NewRedisStore(rc) } diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index b6da2453c..7f356d329 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -35,18 +35,18 @@ import ( func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*LivekitServer, error) { roomConfig := getRoomConf(conf) - client, err := createRedisClient(conf) + universalClient, err := createRedisClient(conf) if err != nil { return nil, err } - router := routing.CreateRouter(client, currentNode) - objectStore := createStore(client) + router := routing.CreateRouter(universalClient, currentNode) + objectStore := createStore(universalClient) roomAllocator, err := NewRoomAllocator(conf, router, objectStore) if err != nil { return nil, err } nodeID := getNodeID(currentNode) - rpcClient := egress.NewRedisRPCClient(nodeID, client) + rpcClient := egress.NewRedisRPCClient(nodeID, universalClient) egressStore := getEgressStore(objectStore) keyProvider, err := createKeyProvider(conf) if err != nil { @@ -65,7 +65,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } egressService := NewEgressService(rpcClient, objectStore, egressStore, roomService, telemetryService, rtcEgressLauncher) ingressConfig := getIngressConfig(conf) - rpc := ingress.NewRedisRPC(nodeID, client) + rpc := ingress.NewRedisRPC(nodeID, universalClient) ingressRPCClient := getIngressRPCClient(rpc) ingressStore := getIngressStore(objectStore) ingressService := NewIngressService(ingressConfig, ingressRPCClient, ingressStore, roomService, telemetryService) @@ -88,11 +88,11 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } func InitializeRouter(conf *config.Config, currentNode routing.LocalNode) (routing.Router, error) { - client, err := createRedisClient(conf) + universalClient, err := createRedisClient(conf) if err != nil { return nil, err } - router := routing.CreateRouter(client, currentNode) + router := routing.CreateRouter(universalClient, currentNode) return router, nil } @@ -143,12 +143,12 @@ func createWebhookNotifier(conf *config.Config, provider auth.KeyProvider) (webh return webhook.NewNotifier(wc.APIKey, secret, wc.URLs), nil } -func createRedisClient(conf *config.Config) (*redis.Client, error) { +func createRedisClient(conf *config.Config) (redis.UniversalClient, error) { if !conf.HasRedis() { return nil, nil } - var rc *redis.Client + var rc redis.UniversalClient var tlsConfig *tls.Config if conf.Redis.UseTLS { @@ -193,7 +193,7 @@ func createRedisClient(conf *config.Config) (*redis.Client, error) { return rc, nil } -func createStore(rc *redis.Client) ObjectStore { +func createStore(rc redis.UniversalClient) ObjectStore { if rc != nil { return NewRedisStore(rc) }