From 4be096cf740501f95dd2a6ef5f4959dc5bc0a202 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 13 Jun 2022 12:18:17 -0700 Subject: [PATCH] Update egress RPC handler (#759) * egress rpc handler * add egress statuses * nil redis client * update protocol --- go.mod | 2 +- go.sum | 10 ++------- pkg/service/egress.go | 48 +++++++++++++++++++++++++---------------- pkg/service/wire.go | 7 ++++++ pkg/service/wire_gen.go | 12 +++++++++-- 5 files changed, 50 insertions(+), 29 deletions(-) diff --git a/go.mod b/go.mod index b57ced2a8..f4c1572ea 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/google/wire v0.5.0 github.com/gorilla/websocket v1.4.2 github.com/hashicorp/golang-lru v0.5.4 - github.com/livekit/protocol v0.13.3-0.20220608063907-5e0777215366 + github.com/livekit/protocol v0.13.3-0.20220613185908-e66f32ebb7f5 github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a github.com/mackerelio/go-osstat v0.2.1 github.com/magefile/mage v1.13.0 diff --git a/go.sum b/go.sum index f26246ce5..de9464139 100644 --- a/go.sum +++ b/go.sum @@ -133,18 +133,12 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA= github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= -github.com/livekit/protocol v0.13.3-0.20220530102138-51a8116f88b2 h1:ioAZAmtoQnHELXBKOQr041ro8GwKcZ7lgNuDNbLG1vo= -github.com/livekit/protocol v0.13.3-0.20220530102138-51a8116f88b2/go.mod h1:BLtSeVmn2rLP37xjzw7gHgaAmkWl3L/L9bPvgSbaOfo= -github.com/livekit/protocol v0.13.3-0.20220607094330-e36e6426a094 h1:DwZfNfyg5LaX3N1Eh19rcycTcyzpgqH0DpEkhB1CnI8= -github.com/livekit/protocol v0.13.3-0.20220607094330-e36e6426a094/go.mod h1:BLtSeVmn2rLP37xjzw7gHgaAmkWl3L/L9bPvgSbaOfo= -github.com/livekit/protocol v0.13.3-0.20220608063907-5e0777215366 h1:wm0ftw/i8oZr4lxQLnV4iDpG32T3cUhy6K6wQ5OFqe8= -github.com/livekit/protocol v0.13.3-0.20220608063907-5e0777215366/go.mod h1:BLtSeVmn2rLP37xjzw7gHgaAmkWl3L/L9bPvgSbaOfo= +github.com/livekit/protocol v0.13.3-0.20220613185908-e66f32ebb7f5 h1:uSOLns5ijvm2z19ppei9iJOYs+1Qf1V6Qrp3M4GcQOc= +github.com/livekit/protocol v0.13.3-0.20220613185908-e66f32ebb7f5/go.mod h1:BLtSeVmn2rLP37xjzw7gHgaAmkWl3L/L9bPvgSbaOfo= github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a h1:cENjhGfslLSDV07gt8ASy47Wd12Q0kBS7hsdunyQ62I= github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a/go.mod h1:116ych8UaEs9vfIE8n6iZCZ30iagUFTls0vRmC+Ix5U= github.com/mackerelio/go-osstat v0.2.1 h1:5AeAcBEutEErAOlDz6WCkEvm6AKYgHTUQrfwm5RbeQc= github.com/mackerelio/go-osstat v0.2.1/go.mod h1:UzRL8dMCCTqG5WdRtsxbuljMpZt9PCAGXqxPst5QtaY= -github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls= -github.com/magefile/mage v1.11.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/magefile/mage v1.13.0 h1:XtLJl8bcCM7EFoO8FyH8XK3t7G5hQAeK+i4tq+veT9M= github.com/magefile/mage v1.13.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= diff --git a/pkg/service/egress.go b/pkg/service/egress.go index ea7b2880f..52f0d14af 100644 --- a/pkg/service/egress.go +++ b/pkg/service/egress.go @@ -10,22 +10,27 @@ import ( "github.com/livekit/protocol/egress" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" - "github.com/livekit/protocol/utils" "github.com/livekit/livekit-server/pkg/telemetry" ) type EgressService struct { - bus utils.MessageBus + rpcClient egress.RPCClient store ServiceStore roomService livekit.RoomService telemetry telemetry.TelemetryService shutdown chan struct{} } -func NewEgressService(bus utils.MessageBus, store ServiceStore, rs livekit.RoomService, ts telemetry.TelemetryService) *EgressService { +func NewEgressService( + rpcClient egress.RPCClient, + store ServiceStore, + rs livekit.RoomService, + ts telemetry.TelemetryService, +) *EgressService { + return &EgressService{ - bus: bus, + rpcClient: rpcClient, store: store, roomService: rs, telemetry: ts, @@ -34,8 +39,8 @@ func NewEgressService(bus utils.MessageBus, store ServiceStore, rs livekit.RoomS } func (s *EgressService) Start() { - if s.bus != nil { - go s.updateListener() + if s.rpcClient != nil { + go s.updateWorker() } } @@ -71,7 +76,7 @@ func (s *EgressService) StartEgress(ctx context.Context, roomName livekit.RoomNa if err := EnsureRecordPermission(ctx); err != nil { return nil, twirpAuthError(err) } - if s.bus == nil { + if s.rpcClient == nil { return nil, ErrEgressNotConnected } @@ -81,7 +86,7 @@ func (s *EgressService) StartEgress(ctx context.Context, roomName livekit.RoomNa } req.RoomId = room.Sid - info, err := egress.SendRequest(ctx, s.bus, req) + info, err := s.rpcClient.SendRequest(ctx, req) if err != nil { return nil, err } @@ -104,7 +109,7 @@ func (s *EgressService) UpdateLayout(ctx context.Context, req *livekit.UpdateLay if err := EnsureRecordPermission(ctx); err != nil { return nil, twirpAuthError(err) } - if s.bus == nil { + if s.rpcClient == nil { return nil, ErrEgressNotConnected } @@ -146,11 +151,11 @@ func (s *EgressService) UpdateStream(ctx context.Context, req *livekit.UpdateStr if err := EnsureRecordPermission(ctx); err != nil { return nil, twirpAuthError(err) } - if s.bus == nil { + if s.rpcClient == nil { return nil, ErrEgressNotConnected } - info, err := egress.SendRequest(ctx, s.bus, &livekit.EgressRequest{ + info, err := s.rpcClient.SendRequest(ctx, &livekit.EgressRequest{ EgressId: req.EgressId, Request: &livekit.EgressRequest_UpdateStream{ UpdateStream: req, @@ -173,7 +178,7 @@ func (s *EgressService) ListEgress(ctx context.Context, req *livekit.ListEgressR if err := EnsureRecordPermission(ctx); err != nil { return nil, twirpAuthError(err) } - if s.bus == nil { + if s.rpcClient == nil { return nil, ErrEgressNotConnected } @@ -201,11 +206,11 @@ func (s *EgressService) StopEgress(ctx context.Context, req *livekit.StopEgressR if err := EnsureRecordPermission(ctx); err != nil { return nil, twirpAuthError(err) } - if s.bus == nil { + if s.rpcClient == nil { return nil, ErrEgressNotConnected } - info, err := egress.SendRequest(ctx, s.bus, &livekit.EgressRequest{ + info, err := s.rpcClient.SendRequest(ctx, &livekit.EgressRequest{ EgressId: req.EgressId, Request: &livekit.EgressRequest_Stop{ Stop: req, @@ -224,8 +229,8 @@ func (s *EgressService) StopEgress(ctx context.Context, req *livekit.StopEgressR return info, nil } -func (s *EgressService) updateListener() { - sub, err := s.bus.SubscribeQueue(context.Background(), egress.ResultsChannel) +func (s *EgressService) updateWorker() { + sub, err := s.rpcClient.GetUpdateChannel(context.Background()) if err != nil { logger.Errorw("failed to subscribe to results channel", err) return @@ -244,13 +249,19 @@ func (s *EgressService) updateListener() { } switch res.Status { - case livekit.EgressStatus_EGRESS_ACTIVE, livekit.EgressStatus_EGRESS_ENDING: + case livekit.EgressStatus_EGRESS_ACTIVE, + livekit.EgressStatus_EGRESS_ENDING: + // save updated info to store err = s.store.UpdateEgress(context.Background(), res) if err != nil { logger.Errorw("could not update egress", err) } - case livekit.EgressStatus_EGRESS_COMPLETE: + + case livekit.EgressStatus_EGRESS_COMPLETE, + livekit.EgressStatus_EGRESS_FAILED, + livekit.EgressStatus_EGRESS_ABORTED: + // delete from store err = s.store.DeleteEgress(context.Background(), res) if err != nil { @@ -266,6 +277,7 @@ func (s *EgressService) updateListener() { s.telemetry.EgressEnded(context.Background(), res) } + case <-s.shutdown: _ = sub.Close() return diff --git a/pkg/service/wire.go b/pkg/service/wire.go index 8d0b7eeee..d2f3b7367 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -15,6 +15,7 @@ import ( "gopkg.in/yaml.v3" "github.com/livekit/protocol/auth" + "github.com/livekit/protocol/egress" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" @@ -28,6 +29,7 @@ import ( func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*LivekitServer, error) { wire.Build( + getNodeID, createRedisClient, createMessageBus, createStore, @@ -41,6 +43,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live wire.Bind(new(livekit.RoomService), new(*RoomService)), telemetry.NewAnalyticsService, telemetry.NewTelemetryService, + egress.NewRedisRPCClient, NewEgressService, NewRecordingService, NewRoomAllocator, @@ -63,6 +66,10 @@ func InitializeRouter(conf *config.Config, currentNode routing.LocalNode) (routi return nil, nil } +func getNodeID(currentNode routing.LocalNode) livekit.NodeID { + return livekit.NodeID(currentNode.Id) +} + func createKeyProvider(conf *config.Config) (auth.KeyProvider, error) { // prefer keyfile if set if conf.KeyFile != "" { diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index a7c7e8e49..11c64b474 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -16,6 +16,8 @@ import ( "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/protocol/auth" + "github.com/livekit/protocol/egress" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" "github.com/livekit/protocol/webhook" @@ -46,7 +48,8 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live if err != nil { return nil, err } - messageBus := createMessageBus(client) + nodeID := getNodeID(currentNode) + rpcClient := egress.NewRedisRPCClient(nodeID, client) keyProvider, err := createKeyProvider(conf) if err != nil { return nil, err @@ -57,7 +60,8 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } analyticsService := telemetry.NewAnalyticsService(conf, currentNode) telemetryService := telemetry.NewTelemetryService(notifier, analyticsService) - egressService := NewEgressService(messageBus, objectStore, roomService, telemetryService) + egressService := NewEgressService(rpcClient, objectStore, roomService, telemetryService) + messageBus := createMessageBus(client) recordingService := NewRecordingService(messageBus, telemetryService) rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode) clientConfigurationManager := createClientConfiguration() @@ -88,6 +92,10 @@ func InitializeRouter(conf *config.Config, currentNode routing.LocalNode) (routi // wire.go: +func getNodeID(currentNode routing.LocalNode) livekit.NodeID { + return livekit.NodeID(currentNode.Id) +} + func createKeyProvider(conf *config.Config) (auth.KeyProvider, error) { if conf.KeyFile != "" {