Update recording service (#137)

* update recording service

* return empty for EndRecording

* update protocol
This commit is contained in:
David Colburn
2021-10-08 11:44:41 -07:00
committed by GitHub
parent f6f5602bc4
commit 2c9ef2f6bb
3 changed files with 73 additions and 60 deletions

4
go.mod
View File

@@ -14,7 +14,7 @@ require (
github.com/google/wire v0.5.0
github.com/gorilla/websocket v1.4.2
github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b
github.com/livekit/protocol v0.9.4
github.com/livekit/protocol v0.9.7
github.com/magefile/mage v1.11.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0
github.com/mitchellh/go-homedir v1.1.0
@@ -47,5 +47,3 @@ require (
)
replace github.com/pion/ion-sfu => github.com/livekit/ion-sfu v1.20.11
replace github.com/livekit/protocol => github.com/livekit/protocol v0.9.6-0.20211006235141-c785be2288fd

4
go.sum
View File

@@ -246,8 +246,8 @@ github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY
github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/ion-sfu v1.20.11 h1:QhfksN1jcYn9VvVLKHSxrlaPNpSxx1j57tsuncF5DbI=
github.com/livekit/ion-sfu v1.20.11/go.mod h1:IVcCb8yMl5qHq+8InP5v9HgrjgnPZny8GhFS86YOtug=
github.com/livekit/protocol v0.9.6-0.20211006235141-c785be2288fd h1:Q7wRRzJGqK6i3hAhgYtD82h2TypwSrySTrtk6AKWtI4=
github.com/livekit/protocol v0.9.6-0.20211006235141-c785be2288fd/go.mod h1:MEKn847Iu/2U8ClZyUmEm2oHn8k9fnSHy81Wv8kkSDo=
github.com/livekit/protocol v0.9.7 h1:GrDvTJTD8wFeoPk6XsLUB/C6pz0RRwa6EVr1trWvPAY=
github.com/livekit/protocol v0.9.7/go.mod h1:MEKn847Iu/2U8ClZyUmEm2oHn8k9fnSHy81Wv8kkSDo=
github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ=
github.com/lucsky/cuid v1.0.2/go.mod h1:QaaJqckboimOmhRSJXSx/+IT+VTfxfPGSo/6mfgUfmE=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=

View File

@@ -3,34 +3,33 @@ package service
import (
"context"
"errors"
"fmt"
"time"
"github.com/livekit/protocol/logger"
livekit "github.com/livekit/protocol/proto"
"github.com/livekit/protocol/recording"
"github.com/livekit/protocol/utils"
"github.com/livekit/protocol/webhook"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"
)
const lockExpiration = time.Second * 5
type RecordingService struct {
mb utils.MessageBus
bus utils.MessageBus
notifier webhook.Notifier
shutdown chan struct{}
}
func NewRecordingService(mb utils.MessageBus, notifier webhook.Notifier) *RecordingService {
return &RecordingService{
mb: mb,
bus: mb,
notifier: notifier,
shutdown: make(chan struct{}, 1),
}
}
func (s *RecordingService) Start() {
if s.mb != nil {
if s.bus != nil {
go s.resultsWorker()
}
}
@@ -39,78 +38,93 @@ func (s *RecordingService) Stop() {
s.shutdown <- struct{}{}
}
func (s *RecordingService) StartRecording(ctx context.Context, req *livekit.StartRecordingRequest) (*livekit.RecordingResponse, error) {
func (s *RecordingService) StartRecording(ctx context.Context, req *livekit.StartRecordingRequest) (*livekit.StartRecordingResponse, error) {
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
}
if s.mb == nil {
if s.bus == nil {
return nil, errors.New("recording not configured (redis required)")
}
// reserve a recorder
recordingID, err := s.reserveRecorder(ctx, req)
// reserve a recorde
recordingId, err := recording.ReserveRecorder(s.bus)
if err != nil {
return nil, err
}
// start the recording
err = s.mb.Publish(ctx, utils.StartRecordingChannel(recordingID), nil)
err = recording.RPC(ctx, s.bus, recordingId, &livekit.RecordingRequest{
Request: &livekit.RecordingRequest_Start{
Start: req,
},
})
if err != nil {
return nil, err
}
return &livekit.RecordingResponse{RecordingId: recordingID}, nil
return &livekit.StartRecordingResponse{RecordingId: recordingId}, nil
}
func (s *RecordingService) reserveRecorder(ctx context.Context, req *livekit.StartRecordingRequest) (string, error) {
id := utils.NewGuid(utils.RecordingPrefix)
reservation := &livekit.RecordingReservation{
Id: id,
SubmittedAt: time.Now().UnixNano(),
Request: req,
}
b, err := proto.Marshal(reservation)
if err != nil {
return "", err
}
sub, err := s.mb.Subscribe(ctx, utils.ReservationResponseChannel(id))
if err != nil {
return "", err
}
defer sub.Close()
if err = s.mb.Publish(ctx, utils.ReservationChannel, string(b)); err != nil {
return "", err
}
select {
case <-sub.Channel():
return id, nil
case <-time.After(utils.RecorderTimeout):
return "", errors.New("recording request failed")
}
}
func (s *RecordingService) EndRecording(ctx context.Context, req *livekit.EndRecordingRequest) (*livekit.RecordingResponse, error) {
func (s *RecordingService) AddOutput(ctx context.Context, req *livekit.AddOutputRequest) (*emptypb.Empty, error) {
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
}
if s.mb == nil {
if s.bus == nil {
return nil, errors.New("recording not configured (redis required)")
}
if err := s.mb.Publish(ctx, utils.EndRecordingChannel(req.RecordingId), nil); err != nil {
err := recording.RPC(ctx, s.bus, req.RecordingId, &livekit.RecordingRequest{
Request: &livekit.RecordingRequest_AddOutput{
AddOutput: req,
},
})
if err != nil {
return nil, err
}
return &emptypb.Empty{}, nil
}
func (s *RecordingService) RemoveOutput(ctx context.Context, req *livekit.RemoveOutputRequest) (*emptypb.Empty, error) {
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
}
if s.bus == nil {
return nil, errors.New("recording not configured (redis required)")
}
err := recording.RPC(ctx, s.bus, req.RecordingId, &livekit.RecordingRequest{
Request: &livekit.RecordingRequest_RemoveOutput{
RemoveOutput: req,
},
})
if err != nil {
return nil, err
}
return &emptypb.Empty{}, nil
}
func (s *RecordingService) EndRecording(ctx context.Context, req *livekit.EndRecordingRequest) (*emptypb.Empty, error) {
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
}
if s.bus == nil {
return nil, errors.New("recording not configured (redis required)")
}
err := recording.RPC(ctx, s.bus, req.RecordingId, &livekit.RecordingRequest{
Request: &livekit.RecordingRequest_End{
End: req,
},
})
if err != nil {
return nil, err
}
return &livekit.RecordingResponse{RecordingId: req.RecordingId}, nil
return &emptypb.Empty{}, nil
}
func (s *RecordingService) resultsWorker() {
sub, err := s.mb.SubscribeQueue(context.Background(), utils.RecordingResultChannel)
sub, err := s.bus.SubscribeQueue(context.Background(), recording.ResultChannel)
if err != nil {
logger.Errorw("failed to subscribe to results channel", err)
return
@@ -137,15 +151,16 @@ func (s *RecordingService) resultsWorker() {
func (s *RecordingService) notify(res *livekit.RecordingResult) {
// log results
values := []interface{}{"id", res.Id}
if res.Error != "" {
logger.Errorw("recording failed", errors.New(res.Error), "id", res.Id)
values = append(values, "error", res.Error)
} else {
logger.Infow("recording complete",
"id", res.Id,
"duration", fmt.Sprint(time.Duration(res.Duration*1e6)),
"location", res.Location,
)
values = append(values, "duration", time.Duration(res.Duration*1e9))
if res.DownloadUrl != "" {
values = append(values, "url", res.DownloadUrl)
}
}
logger.Debugw("received recording result", values...)
// webhook
if s.notifier != nil {