From 2c9ef2f6bbdffe63a02406fbda4730bca25466e5 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Fri, 8 Oct 2021 11:44:41 -0700 Subject: [PATCH] Update recording service (#137) * update recording service * return empty for EndRecording * update protocol --- go.mod | 4 +- go.sum | 4 +- pkg/service/recordingservice.go | 125 ++++++++++++++++++-------------- 3 files changed, 73 insertions(+), 60 deletions(-) diff --git a/go.mod b/go.mod index bec821c1d..c8373b142 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/google/wire v0.5.0 github.com/gorilla/websocket v1.4.2 github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b - github.com/livekit/protocol v0.9.4 + github.com/livekit/protocol v0.9.7 github.com/magefile/mage v1.11.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0 github.com/mitchellh/go-homedir v1.1.0 @@ -47,5 +47,3 @@ require ( ) replace github.com/pion/ion-sfu => github.com/livekit/ion-sfu v1.20.11 - -replace github.com/livekit/protocol => github.com/livekit/protocol v0.9.6-0.20211006235141-c785be2288fd diff --git a/go.sum b/go.sum index 6c0373c38..c74e64076 100644 --- a/go.sum +++ b/go.sum @@ -246,8 +246,8 @@ github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= github.com/livekit/ion-sfu v1.20.11 h1:QhfksN1jcYn9VvVLKHSxrlaPNpSxx1j57tsuncF5DbI= github.com/livekit/ion-sfu v1.20.11/go.mod h1:IVcCb8yMl5qHq+8InP5v9HgrjgnPZny8GhFS86YOtug= -github.com/livekit/protocol v0.9.6-0.20211006235141-c785be2288fd h1:Q7wRRzJGqK6i3hAhgYtD82h2TypwSrySTrtk6AKWtI4= -github.com/livekit/protocol v0.9.6-0.20211006235141-c785be2288fd/go.mod h1:MEKn847Iu/2U8ClZyUmEm2oHn8k9fnSHy81Wv8kkSDo= +github.com/livekit/protocol v0.9.7 h1:GrDvTJTD8wFeoPk6XsLUB/C6pz0RRwa6EVr1trWvPAY= +github.com/livekit/protocol v0.9.7/go.mod h1:MEKn847Iu/2U8ClZyUmEm2oHn8k9fnSHy81Wv8kkSDo= github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ= github.com/lucsky/cuid v1.0.2/go.mod h1:QaaJqckboimOmhRSJXSx/+IT+VTfxfPGSo/6mfgUfmE= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= diff --git a/pkg/service/recordingservice.go b/pkg/service/recordingservice.go index b3faa17a8..60c3413d9 100644 --- a/pkg/service/recordingservice.go +++ b/pkg/service/recordingservice.go @@ -3,34 +3,33 @@ package service import ( "context" "errors" - "fmt" "time" "github.com/livekit/protocol/logger" livekit "github.com/livekit/protocol/proto" + "github.com/livekit/protocol/recording" "github.com/livekit/protocol/utils" "github.com/livekit/protocol/webhook" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/emptypb" ) -const lockExpiration = time.Second * 5 - type RecordingService struct { - mb utils.MessageBus + bus utils.MessageBus notifier webhook.Notifier shutdown chan struct{} } func NewRecordingService(mb utils.MessageBus, notifier webhook.Notifier) *RecordingService { return &RecordingService{ - mb: mb, + bus: mb, notifier: notifier, shutdown: make(chan struct{}, 1), } } func (s *RecordingService) Start() { - if s.mb != nil { + if s.bus != nil { go s.resultsWorker() } } @@ -39,78 +38,93 @@ func (s *RecordingService) Stop() { s.shutdown <- struct{}{} } -func (s *RecordingService) StartRecording(ctx context.Context, req *livekit.StartRecordingRequest) (*livekit.RecordingResponse, error) { +func (s *RecordingService) StartRecording(ctx context.Context, req *livekit.StartRecordingRequest) (*livekit.StartRecordingResponse, error) { if err := EnsureRecordPermission(ctx); err != nil { return nil, twirpAuthError(err) } - - if s.mb == nil { + if s.bus == nil { return nil, errors.New("recording not configured (redis required)") } - // reserve a recorder - recordingID, err := s.reserveRecorder(ctx, req) + // reserve a recorde + recordingId, err := recording.ReserveRecorder(s.bus) if err != nil { return nil, err } // start the recording - err = s.mb.Publish(ctx, utils.StartRecordingChannel(recordingID), nil) + err = recording.RPC(ctx, s.bus, recordingId, &livekit.RecordingRequest{ + Request: &livekit.RecordingRequest_Start{ + Start: req, + }, + }) if err != nil { return nil, err } - return &livekit.RecordingResponse{RecordingId: recordingID}, nil + return &livekit.StartRecordingResponse{RecordingId: recordingId}, nil } -func (s *RecordingService) reserveRecorder(ctx context.Context, req *livekit.StartRecordingRequest) (string, error) { - id := utils.NewGuid(utils.RecordingPrefix) - reservation := &livekit.RecordingReservation{ - Id: id, - SubmittedAt: time.Now().UnixNano(), - Request: req, - } - b, err := proto.Marshal(reservation) - if err != nil { - return "", err - } - - sub, err := s.mb.Subscribe(ctx, utils.ReservationResponseChannel(id)) - if err != nil { - return "", err - } - defer sub.Close() - - if err = s.mb.Publish(ctx, utils.ReservationChannel, string(b)); err != nil { - return "", err - } - - select { - case <-sub.Channel(): - return id, nil - case <-time.After(utils.RecorderTimeout): - return "", errors.New("recording request failed") - } -} - -func (s *RecordingService) EndRecording(ctx context.Context, req *livekit.EndRecordingRequest) (*livekit.RecordingResponse, error) { +func (s *RecordingService) AddOutput(ctx context.Context, req *livekit.AddOutputRequest) (*emptypb.Empty, error) { if err := EnsureRecordPermission(ctx); err != nil { return nil, twirpAuthError(err) } - - if s.mb == nil { + if s.bus == nil { return nil, errors.New("recording not configured (redis required)") } - if err := s.mb.Publish(ctx, utils.EndRecordingChannel(req.RecordingId), nil); err != nil { + err := recording.RPC(ctx, s.bus, req.RecordingId, &livekit.RecordingRequest{ + Request: &livekit.RecordingRequest_AddOutput{ + AddOutput: req, + }, + }) + if err != nil { + return nil, err + } + return &emptypb.Empty{}, nil +} + +func (s *RecordingService) RemoveOutput(ctx context.Context, req *livekit.RemoveOutputRequest) (*emptypb.Empty, error) { + if err := EnsureRecordPermission(ctx); err != nil { + return nil, twirpAuthError(err) + } + if s.bus == nil { + return nil, errors.New("recording not configured (redis required)") + } + + err := recording.RPC(ctx, s.bus, req.RecordingId, &livekit.RecordingRequest{ + Request: &livekit.RecordingRequest_RemoveOutput{ + RemoveOutput: req, + }, + }) + if err != nil { + return nil, err + } + return &emptypb.Empty{}, nil +} + +func (s *RecordingService) EndRecording(ctx context.Context, req *livekit.EndRecordingRequest) (*emptypb.Empty, error) { + if err := EnsureRecordPermission(ctx); err != nil { + return nil, twirpAuthError(err) + } + if s.bus == nil { + return nil, errors.New("recording not configured (redis required)") + } + + err := recording.RPC(ctx, s.bus, req.RecordingId, &livekit.RecordingRequest{ + Request: &livekit.RecordingRequest_End{ + End: req, + }, + }) + if err != nil { return nil, err } - return &livekit.RecordingResponse{RecordingId: req.RecordingId}, nil + return &emptypb.Empty{}, nil } func (s *RecordingService) resultsWorker() { - sub, err := s.mb.SubscribeQueue(context.Background(), utils.RecordingResultChannel) + sub, err := s.bus.SubscribeQueue(context.Background(), recording.ResultChannel) if err != nil { logger.Errorw("failed to subscribe to results channel", err) return @@ -137,15 +151,16 @@ func (s *RecordingService) resultsWorker() { func (s *RecordingService) notify(res *livekit.RecordingResult) { // log results + values := []interface{}{"id", res.Id} if res.Error != "" { - logger.Errorw("recording failed", errors.New(res.Error), "id", res.Id) + values = append(values, "error", res.Error) } else { - logger.Infow("recording complete", - "id", res.Id, - "duration", fmt.Sprint(time.Duration(res.Duration*1e6)), - "location", res.Location, - ) + values = append(values, "duration", time.Duration(res.Duration*1e9)) + if res.DownloadUrl != "" { + values = append(values, "url", res.DownloadUrl) + } } + logger.Debugw("received recording result", values...) // webhook if s.notifier != nil {