Update egress RPC handler (#759)

* egress rpc handler

* add egress statuses

* nil redis client

* update protocol
This commit is contained in:
David Colburn
2022-06-13 12:18:17 -07:00
committed by GitHub
parent 15da445fd7
commit 4be096cf74
5 changed files with 50 additions and 29 deletions
+1 -1
View File
@@ -13,7 +13,7 @@ require (
github.com/google/wire v0.5.0
github.com/gorilla/websocket v1.4.2
github.com/hashicorp/golang-lru v0.5.4
github.com/livekit/protocol v0.13.3-0.20220608063907-5e0777215366
github.com/livekit/protocol v0.13.3-0.20220613185908-e66f32ebb7f5
github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a
github.com/mackerelio/go-osstat v0.2.1
github.com/magefile/mage v1.13.0
+2 -8
View File
@@ -133,18 +133,12 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA=
github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/protocol v0.13.3-0.20220530102138-51a8116f88b2 h1:ioAZAmtoQnHELXBKOQr041ro8GwKcZ7lgNuDNbLG1vo=
github.com/livekit/protocol v0.13.3-0.20220530102138-51a8116f88b2/go.mod h1:BLtSeVmn2rLP37xjzw7gHgaAmkWl3L/L9bPvgSbaOfo=
github.com/livekit/protocol v0.13.3-0.20220607094330-e36e6426a094 h1:DwZfNfyg5LaX3N1Eh19rcycTcyzpgqH0DpEkhB1CnI8=
github.com/livekit/protocol v0.13.3-0.20220607094330-e36e6426a094/go.mod h1:BLtSeVmn2rLP37xjzw7gHgaAmkWl3L/L9bPvgSbaOfo=
github.com/livekit/protocol v0.13.3-0.20220608063907-5e0777215366 h1:wm0ftw/i8oZr4lxQLnV4iDpG32T3cUhy6K6wQ5OFqe8=
github.com/livekit/protocol v0.13.3-0.20220608063907-5e0777215366/go.mod h1:BLtSeVmn2rLP37xjzw7gHgaAmkWl3L/L9bPvgSbaOfo=
github.com/livekit/protocol v0.13.3-0.20220613185908-e66f32ebb7f5 h1:uSOLns5ijvm2z19ppei9iJOYs+1Qf1V6Qrp3M4GcQOc=
github.com/livekit/protocol v0.13.3-0.20220613185908-e66f32ebb7f5/go.mod h1:BLtSeVmn2rLP37xjzw7gHgaAmkWl3L/L9bPvgSbaOfo=
github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a h1:cENjhGfslLSDV07gt8ASy47Wd12Q0kBS7hsdunyQ62I=
github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a/go.mod h1:116ych8UaEs9vfIE8n6iZCZ30iagUFTls0vRmC+Ix5U=
github.com/mackerelio/go-osstat v0.2.1 h1:5AeAcBEutEErAOlDz6WCkEvm6AKYgHTUQrfwm5RbeQc=
github.com/mackerelio/go-osstat v0.2.1/go.mod h1:UzRL8dMCCTqG5WdRtsxbuljMpZt9PCAGXqxPst5QtaY=
github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls=
github.com/magefile/mage v1.11.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/magefile/mage v1.13.0 h1:XtLJl8bcCM7EFoO8FyH8XK3t7G5hQAeK+i4tq+veT9M=
github.com/magefile/mage v1.13.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0=
+30 -18
View File
@@ -10,22 +10,27 @@ import (
"github.com/livekit/protocol/egress"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
"github.com/livekit/livekit-server/pkg/telemetry"
)
type EgressService struct {
bus utils.MessageBus
rpcClient egress.RPCClient
store ServiceStore
roomService livekit.RoomService
telemetry telemetry.TelemetryService
shutdown chan struct{}
}
func NewEgressService(bus utils.MessageBus, store ServiceStore, rs livekit.RoomService, ts telemetry.TelemetryService) *EgressService {
func NewEgressService(
rpcClient egress.RPCClient,
store ServiceStore,
rs livekit.RoomService,
ts telemetry.TelemetryService,
) *EgressService {
return &EgressService{
bus: bus,
rpcClient: rpcClient,
store: store,
roomService: rs,
telemetry: ts,
@@ -34,8 +39,8 @@ func NewEgressService(bus utils.MessageBus, store ServiceStore, rs livekit.RoomS
}
func (s *EgressService) Start() {
if s.bus != nil {
go s.updateListener()
if s.rpcClient != nil {
go s.updateWorker()
}
}
@@ -71,7 +76,7 @@ func (s *EgressService) StartEgress(ctx context.Context, roomName livekit.RoomNa
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
}
if s.bus == nil {
if s.rpcClient == nil {
return nil, ErrEgressNotConnected
}
@@ -81,7 +86,7 @@ func (s *EgressService) StartEgress(ctx context.Context, roomName livekit.RoomNa
}
req.RoomId = room.Sid
info, err := egress.SendRequest(ctx, s.bus, req)
info, err := s.rpcClient.SendRequest(ctx, req)
if err != nil {
return nil, err
}
@@ -104,7 +109,7 @@ func (s *EgressService) UpdateLayout(ctx context.Context, req *livekit.UpdateLay
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
}
if s.bus == nil {
if s.rpcClient == nil {
return nil, ErrEgressNotConnected
}
@@ -146,11 +151,11 @@ func (s *EgressService) UpdateStream(ctx context.Context, req *livekit.UpdateStr
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
}
if s.bus == nil {
if s.rpcClient == nil {
return nil, ErrEgressNotConnected
}
info, err := egress.SendRequest(ctx, s.bus, &livekit.EgressRequest{
info, err := s.rpcClient.SendRequest(ctx, &livekit.EgressRequest{
EgressId: req.EgressId,
Request: &livekit.EgressRequest_UpdateStream{
UpdateStream: req,
@@ -173,7 +178,7 @@ func (s *EgressService) ListEgress(ctx context.Context, req *livekit.ListEgressR
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
}
if s.bus == nil {
if s.rpcClient == nil {
return nil, ErrEgressNotConnected
}
@@ -201,11 +206,11 @@ func (s *EgressService) StopEgress(ctx context.Context, req *livekit.StopEgressR
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
}
if s.bus == nil {
if s.rpcClient == nil {
return nil, ErrEgressNotConnected
}
info, err := egress.SendRequest(ctx, s.bus, &livekit.EgressRequest{
info, err := s.rpcClient.SendRequest(ctx, &livekit.EgressRequest{
EgressId: req.EgressId,
Request: &livekit.EgressRequest_Stop{
Stop: req,
@@ -224,8 +229,8 @@ func (s *EgressService) StopEgress(ctx context.Context, req *livekit.StopEgressR
return info, nil
}
func (s *EgressService) updateListener() {
sub, err := s.bus.SubscribeQueue(context.Background(), egress.ResultsChannel)
func (s *EgressService) updateWorker() {
sub, err := s.rpcClient.GetUpdateChannel(context.Background())
if err != nil {
logger.Errorw("failed to subscribe to results channel", err)
return
@@ -244,13 +249,19 @@ func (s *EgressService) updateListener() {
}
switch res.Status {
case livekit.EgressStatus_EGRESS_ACTIVE, livekit.EgressStatus_EGRESS_ENDING:
case livekit.EgressStatus_EGRESS_ACTIVE,
livekit.EgressStatus_EGRESS_ENDING:
// save updated info to store
err = s.store.UpdateEgress(context.Background(), res)
if err != nil {
logger.Errorw("could not update egress", err)
}
case livekit.EgressStatus_EGRESS_COMPLETE:
case livekit.EgressStatus_EGRESS_COMPLETE,
livekit.EgressStatus_EGRESS_FAILED,
livekit.EgressStatus_EGRESS_ABORTED:
// delete from store
err = s.store.DeleteEgress(context.Background(), res)
if err != nil {
@@ -266,6 +277,7 @@ func (s *EgressService) updateListener() {
s.telemetry.EgressEnded(context.Background(), res)
}
case <-s.shutdown:
_ = sub.Close()
return
+7
View File
@@ -15,6 +15,7 @@ import (
"gopkg.in/yaml.v3"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/egress"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
@@ -28,6 +29,7 @@ import (
func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*LivekitServer, error) {
wire.Build(
getNodeID,
createRedisClient,
createMessageBus,
createStore,
@@ -41,6 +43,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
wire.Bind(new(livekit.RoomService), new(*RoomService)),
telemetry.NewAnalyticsService,
telemetry.NewTelemetryService,
egress.NewRedisRPCClient,
NewEgressService,
NewRecordingService,
NewRoomAllocator,
@@ -63,6 +66,10 @@ func InitializeRouter(conf *config.Config, currentNode routing.LocalNode) (routi
return nil, nil
}
func getNodeID(currentNode routing.LocalNode) livekit.NodeID {
return livekit.NodeID(currentNode.Id)
}
func createKeyProvider(conf *config.Config) (auth.KeyProvider, error) {
// prefer keyfile if set
if conf.KeyFile != "" {
+10 -2
View File
@@ -16,6 +16,8 @@ import (
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/egress"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
"github.com/livekit/protocol/webhook"
@@ -46,7 +48,8 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
if err != nil {
return nil, err
}
messageBus := createMessageBus(client)
nodeID := getNodeID(currentNode)
rpcClient := egress.NewRedisRPCClient(nodeID, client)
keyProvider, err := createKeyProvider(conf)
if err != nil {
return nil, err
@@ -57,7 +60,8 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
}
analyticsService := telemetry.NewAnalyticsService(conf, currentNode)
telemetryService := telemetry.NewTelemetryService(notifier, analyticsService)
egressService := NewEgressService(messageBus, objectStore, roomService, telemetryService)
egressService := NewEgressService(rpcClient, objectStore, roomService, telemetryService)
messageBus := createMessageBus(client)
recordingService := NewRecordingService(messageBus, telemetryService)
rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode)
clientConfigurationManager := createClientConfiguration()
@@ -88,6 +92,10 @@ func InitializeRouter(conf *config.Config, currentNode routing.LocalNode) (routi
// wire.go:
func getNodeID(currentNode routing.LocalNode) livekit.NodeID {
return livekit.NodeID(currentNode.Id)
}
func createKeyProvider(conf *config.Config) (auth.KeyProvider, error) {
if conf.KeyFile != "" {