diff --git a/go.mod b/go.mod index 1e1747d4d..82caebb48 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/google/wire v0.5.0 github.com/gorilla/websocket v1.4.2 github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b - github.com/livekit/protocol v0.6.4 + github.com/livekit/protocol v0.6.5 github.com/magefile/mage v1.11.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0 github.com/mitchellh/go-homedir v1.1.0 diff --git a/go.sum b/go.sum index 1a7c91f6c..7b815d550 100644 --- a/go.sum +++ b/go.sum @@ -237,8 +237,8 @@ github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= github.com/livekit/ion-sfu v1.20.6 h1:vA98RfuW3sSidV1rfK+/szGWgHFgki4Q4pomxsJS0i0= github.com/livekit/ion-sfu v1.20.6/go.mod h1:dEdOG4KSqIftr5HxxqciNKBIdu0v3OD0ZYL7A3J09KA= -github.com/livekit/protocol v0.6.4 h1:0xJQz/NzAKIYBuJPjgerooMWQc40xg4KhJ0abLamULA= -github.com/livekit/protocol v0.6.4/go.mod h1:wo3CGfYB7XMF8GoVJAfTARrYSP/ombi+sbLl6AYdKP0= +github.com/livekit/protocol v0.6.5 h1:YROy1ripRbDvZ1KtV/xl24kHKXEFfCye1gwtYB63WeE= +github.com/livekit/protocol v0.6.5/go.mod h1:wo3CGfYB7XMF8GoVJAfTARrYSP/ombi+sbLl6AYdKP0= github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ= github.com/lucsky/cuid v1.0.2/go.mod h1:QaaJqckboimOmhRSJXSx/+IT+VTfxfPGSo/6mfgUfmE= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= diff --git a/magefile.go b/magefile.go index 797df0a70..4ec376d83 100644 --- a/magefile.go +++ b/magefile.go @@ -68,6 +68,7 @@ func Proto() error { protoDir := info.Dir updated, err := target.Path("proto/livekit_models.pb.go", protoDir+"/livekit_models.proto", + protoDir+"/livekit_recording.proto", protoDir+"/livekit_room.proto", protoDir+"/livekit_rtc.proto", protoDir+"/livekit_internal.proto", @@ -107,6 +108,7 @@ func Proto() error { "--plugin=go="+protocGoPath, "--plugin=twirp="+twirpPath, "-I="+protoDir, + protoDir+"/livekit_recording.proto", protoDir+"/livekit_room.proto", ) connectStd(cmd) @@ -120,6 +122,7 @@ func Proto() error { "--go_opt=paths=source_relative", "--plugin=go="+protocGoPath, "-I="+protoDir, + protoDir+"/livekit_recording.proto", protoDir+"/livekit_rtc.proto", protoDir+"/livekit_internal.proto", protoDir+"/livekit_models.proto", diff --git a/pkg/recording/recording.go b/pkg/recording/recording.go deleted file mode 100644 index 3aa396052..000000000 --- a/pkg/recording/recording.go +++ /dev/null @@ -1,61 +0,0 @@ -package recording - -import ( - "context" - "time" - - "github.com/go-redis/redis/v8" - "github.com/livekit/protocol/utils" - "github.com/pkg/errors" - "google.golang.org/protobuf/proto" - - livekit "github.com/livekit/livekit-server/proto" -) - -type RoomRecorder struct { - rc *redis.Client -} - -func NewRoomRecorder(rc *redis.Client) *RoomRecorder { - if rc == nil { - return nil - } - return &RoomRecorder{rc: rc} -} - -func (s *RoomRecorder) ReserveRecorder(ctx context.Context, req *livekit.RecordRoomRequest) (string, error) { - id := utils.NewGuid(utils.RecordingPrefix) - reservation := &livekit.RecordingReservation{ - Id: id, - SubmittedAt: time.Now().UnixNano(), - Input: req.Input, - Output: req.Output, - } - b, err := proto.Marshal(reservation) - if err != nil { - return "", err - } - - sub := s.rc.Subscribe(ctx, utils.ReservationResponseChannel(id)) - defer sub.Close() - - err = s.rc.Publish(ctx, utils.ReservationChannel, string(b)).Err() - if err != nil { - return "", err - } - - select { - case <-sub.Channel(): - return id, nil - case <-time.After(utils.RecorderTimeout): - return "", errors.New("no recorders available") - } -} - -func (s *RoomRecorder) StartRecording(ctx context.Context, recordingID string) error { - return s.rc.Publish(ctx, utils.StartRecordingChannel(recordingID), nil).Err() -} - -func (s *RoomRecorder) EndRecording(ctx context.Context, recordingID string) error { - return s.rc.Publish(ctx, utils.EndRecordingChannel(recordingID), nil).Err() -} diff --git a/pkg/service/auth.go b/pkg/service/auth.go index 0f671762e..255dc609d 100644 --- a/pkg/service/auth.go +++ b/pkg/service/auth.go @@ -144,6 +144,14 @@ func EnsureListPermission(ctx context.Context) error { return ErrPermissionDenied } +func EnsureRecordPermission(ctx context.Context) error { + claims := GetGrants(ctx) + if claims == nil || !claims.Video.RoomRecord { + return ErrPermissionDenied + } + return nil +} + // wraps authentication errors around Twirp func twirpAuthError(err error) error { return twirp.NewError(twirp.Unauthenticated, err.Error()) diff --git a/pkg/service/recordingservice.go b/pkg/service/recordingservice.go new file mode 100644 index 000000000..578e0a9aa --- /dev/null +++ b/pkg/service/recordingservice.go @@ -0,0 +1,89 @@ +package service + +import ( + "context" + "errors" + "time" + + "github.com/go-redis/redis/v8" + "github.com/livekit/protocol/utils" + "google.golang.org/protobuf/proto" + + livekit "github.com/livekit/livekit-server/proto" +) + +type RecordingService struct { + rc *redis.Client +} + +func NewRecordingService(rc *redis.Client) *RecordingService { + return &RecordingService{rc: rc} +} + +func (s *RecordingService) StartRecording(ctx context.Context, req *livekit.StartRecordingRequest) (*livekit.RecordingResponse, error) { + if err := EnsureRecordPermission(ctx); err != nil { + return nil, twirpAuthError(err) + } + + if s.rc == nil { + return nil, errors.New("recording not configured (redis required)") + } + + // reserve a recorder + recordingID, err := s.reserveRecorder(ctx, req) + if err != nil { + return nil, err + } + + // start the recording + err = s.rc.Publish(ctx, utils.StartRecordingChannel(recordingID), nil).Err() + if err != nil { + return nil, err + } + + return &livekit.RecordingResponse{RecordingId: recordingID}, nil +} + +func (s *RecordingService) reserveRecorder(ctx context.Context, req *livekit.StartRecordingRequest) (string, error) { + id := utils.NewGuid(utils.RecordingPrefix) + reservation := &livekit.RecordingReservation{ + Id: id, + SubmittedAt: time.Now().UnixNano(), + Input: req.Input, + Output: req.Output, + } + b, err := proto.Marshal(reservation) + if err != nil { + return "", err + } + + sub := s.rc.Subscribe(ctx, utils.ReservationResponseChannel(id)) + defer sub.Close() + + if err = s.rc.Publish(ctx, utils.ReservationChannel, string(b)).Err(); err != nil { + return "", err + } + + select { + case <-sub.Channel(): + return id, nil + case <-time.After(utils.RecorderTimeout): + return "", errors.New("no recorders available") + } +} + +func (s *RecordingService) EndRecording(ctx context.Context, req *livekit.EndRecordingRequest) (*livekit.RecordingResponse, error) { + if err := EnsureRecordPermission(ctx); err != nil { + return nil, twirpAuthError(err) + } + + if s.rc == nil { + return nil, errors.New("recording not configured (redis required)") + } + + if err := s.rc.Publish(ctx, utils.EndRecordingChannel(req.RecordingId), nil).Err(); err != nil { + return nil, err + } + + return &livekit.RecordingResponse{RecordingId: req.RecordingId}, nil +} diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index bf8f4fb49..b0b9ffa48 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -7,7 +7,6 @@ import ( "github.com/thoas/go-funk" "github.com/twitchtv/twirp" - "github.com/livekit/livekit-server/pkg/recording" "github.com/livekit/livekit-server/pkg/routing" livekit "github.com/livekit/livekit-server/proto" ) @@ -15,15 +14,10 @@ import ( // A rooms service that supports a single node type RoomService struct { roomManager *RoomManager - recorder *recording.RoomRecorder } -func NewRoomService(roomManager *RoomManager, rs *recording.RoomRecorder) (svc *RoomService, err error) { - svc = &RoomService{ - roomManager: roomManager, - recorder: rs, - } - +func NewRoomService(roomManager *RoomManager) (svc *RoomService, err error) { + svc = &RoomService{roomManager: roomManager} return } @@ -32,31 +26,11 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq return nil, twirpAuthError(err) } - var recordingID string - if req.Recording != nil { - if s.recorder == nil { - return nil, errors.New("recording not configured (redis required)") - } - - recordingID, err = s.recorder.ReserveRecorder(ctx, req.Recording) - if err != nil { - err = errors.Wrap(err, "could not reserve recorder") - return - } - } - rm, err = s.roomManager.CreateRoom(req) if err != nil { err = errors.Wrap(err, "could not create room") } - if recordingID != "" { - err = s.recorder.StartRecording(ctx, recordingID) - if err != nil { - err = errors.Wrap(err, "could not start recording") - } - } - return } diff --git a/pkg/service/server.go b/pkg/service/server.go index d9b0812e4..2738e9665 100644 --- a/pkg/service/server.go +++ b/pkg/service/server.go @@ -27,6 +27,7 @@ import ( type LivekitServer struct { config *config.Config roomServer livekit.TwirpServer + recServer livekit.TwirpServer rtcService *RTCService httpServer *http.Server promServer *http.Server @@ -41,6 +42,7 @@ type LivekitServer struct { func NewLivekitServer(conf *config.Config, roomService livekit.RoomService, + recService livekit.RecordingService, rtcService *RTCService, keyProvider auth.KeyProvider, router routing.Router, @@ -51,6 +53,7 @@ func NewLivekitServer(conf *config.Config, s = &LivekitServer{ config: conf, roomServer: livekit.NewRoomServiceServer(roomService), + recServer: livekit.NewRecordingServiceServer(recService), rtcService: rtcService, router: router, roomManager: roomManager, @@ -70,6 +73,7 @@ func NewLivekitServer(conf *config.Config, mux := http.NewServeMux() mux.Handle(s.roomServer.PathPrefix(), s.roomServer) + mux.Handle(s.recServer.PathPrefix(), s.recServer) mux.Handle("/rtc", rtcService) mux.HandleFunc("/rtc/validate", rtcService.Validate) mux.HandleFunc("/", s.healthCheck) diff --git a/pkg/service/utils.go b/pkg/service/utils.go index eec0e47c9..dbfc8fb81 100644 --- a/pkg/service/utils.go +++ b/pkg/service/utils.go @@ -10,7 +10,6 @@ import ( "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/logger" - "github.com/livekit/livekit-server/pkg/recording" "github.com/livekit/livekit-server/pkg/routing" livekit "github.com/livekit/livekit-server/proto" ) @@ -19,13 +18,14 @@ var ServiceSet = wire.NewSet( createRedisClient, createRouter, createStore, - recording.NewRoomRecorder, + NewRecordingService, NewRoomService, NewRTCService, NewLivekitServer, NewRoomManager, NewTurnServer, config.GetAudioConfig, + wire.Bind(new(livekit.RecordingService), new(*RecordingService)), wire.Bind(new(livekit.RoomService), new(*RoomService)), ) diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 1b2e9e10b..d1d2e8060 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -7,7 +7,6 @@ package service import ( "github.com/livekit/livekit-server/pkg/config" - "github.com/livekit/livekit-server/pkg/recording" "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/protocol/auth" ) @@ -25,17 +24,17 @@ func InitializeServer(conf *config.Config, keyProvider auth.KeyProvider, current if err != nil { return nil, err } - roomRecorder := recording.NewRoomRecorder(client) - roomService, err := NewRoomService(roomManager, roomRecorder) + roomService, err := NewRoomService(roomManager) if err != nil { return nil, err } + recordingService := NewRecordingService(client) rtcService := NewRTCService(conf, roomManager, router, currentNode) server, err := NewTurnServer(conf, roomStore, currentNode) if err != nil { return nil, err } - livekitServer, err := NewLivekitServer(conf, roomService, rtcService, keyProvider, router, roomManager, server, currentNode) + livekitServer, err := NewLivekitServer(conf, roomService, recordingService, rtcService, keyProvider, router, roomManager, server, currentNode) if err != nil { return nil, err } diff --git a/version/version.go b/version/version.go index 84adbfd9f..6fe26c630 100644 --- a/version/version.go +++ b/version/version.go @@ -1,3 +1,3 @@ package version -const Version = "0.11.4" +const Version = "0.11.5"