Recording apis (#78)

* apis with auth

* recording service

* record permission

* protocol 0.6.5

* bump version
This commit is contained in:
David Colburn
2021-08-04 22:44:06 -05:00
committed by GitHub
parent a53deecb82
commit 6111b2d4f8
11 changed files with 115 additions and 99 deletions
+1 -1
View File
@@ -11,7 +11,7 @@ require (
github.com/google/wire v0.5.0
github.com/gorilla/websocket v1.4.2
github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b
github.com/livekit/protocol v0.6.4
github.com/livekit/protocol v0.6.5
github.com/magefile/mage v1.11.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0
github.com/mitchellh/go-homedir v1.1.0
+2 -2
View File
@@ -237,8 +237,8 @@ github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY
github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/ion-sfu v1.20.6 h1:vA98RfuW3sSidV1rfK+/szGWgHFgki4Q4pomxsJS0i0=
github.com/livekit/ion-sfu v1.20.6/go.mod h1:dEdOG4KSqIftr5HxxqciNKBIdu0v3OD0ZYL7A3J09KA=
github.com/livekit/protocol v0.6.4 h1:0xJQz/NzAKIYBuJPjgerooMWQc40xg4KhJ0abLamULA=
github.com/livekit/protocol v0.6.4/go.mod h1:wo3CGfYB7XMF8GoVJAfTARrYSP/ombi+sbLl6AYdKP0=
github.com/livekit/protocol v0.6.5 h1:YROy1ripRbDvZ1KtV/xl24kHKXEFfCye1gwtYB63WeE=
github.com/livekit/protocol v0.6.5/go.mod h1:wo3CGfYB7XMF8GoVJAfTARrYSP/ombi+sbLl6AYdKP0=
github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ=
github.com/lucsky/cuid v1.0.2/go.mod h1:QaaJqckboimOmhRSJXSx/+IT+VTfxfPGSo/6mfgUfmE=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
+3
View File
@@ -68,6 +68,7 @@ func Proto() error {
protoDir := info.Dir
updated, err := target.Path("proto/livekit_models.pb.go",
protoDir+"/livekit_models.proto",
protoDir+"/livekit_recording.proto",
protoDir+"/livekit_room.proto",
protoDir+"/livekit_rtc.proto",
protoDir+"/livekit_internal.proto",
@@ -107,6 +108,7 @@ func Proto() error {
"--plugin=go="+protocGoPath,
"--plugin=twirp="+twirpPath,
"-I="+protoDir,
protoDir+"/livekit_recording.proto",
protoDir+"/livekit_room.proto",
)
connectStd(cmd)
@@ -120,6 +122,7 @@ func Proto() error {
"--go_opt=paths=source_relative",
"--plugin=go="+protocGoPath,
"-I="+protoDir,
protoDir+"/livekit_recording.proto",
protoDir+"/livekit_rtc.proto",
protoDir+"/livekit_internal.proto",
protoDir+"/livekit_models.proto",
-61
View File
@@ -1,61 +0,0 @@
package recording
import (
"context"
"time"
"github.com/go-redis/redis/v8"
"github.com/livekit/protocol/utils"
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"
livekit "github.com/livekit/livekit-server/proto"
)
type RoomRecorder struct {
rc *redis.Client
}
func NewRoomRecorder(rc *redis.Client) *RoomRecorder {
if rc == nil {
return nil
}
return &RoomRecorder{rc: rc}
}
func (s *RoomRecorder) ReserveRecorder(ctx context.Context, req *livekit.RecordRoomRequest) (string, error) {
id := utils.NewGuid(utils.RecordingPrefix)
reservation := &livekit.RecordingReservation{
Id: id,
SubmittedAt: time.Now().UnixNano(),
Input: req.Input,
Output: req.Output,
}
b, err := proto.Marshal(reservation)
if err != nil {
return "", err
}
sub := s.rc.Subscribe(ctx, utils.ReservationResponseChannel(id))
defer sub.Close()
err = s.rc.Publish(ctx, utils.ReservationChannel, string(b)).Err()
if err != nil {
return "", err
}
select {
case <-sub.Channel():
return id, nil
case <-time.After(utils.RecorderTimeout):
return "", errors.New("no recorders available")
}
}
func (s *RoomRecorder) StartRecording(ctx context.Context, recordingID string) error {
return s.rc.Publish(ctx, utils.StartRecordingChannel(recordingID), nil).Err()
}
func (s *RoomRecorder) EndRecording(ctx context.Context, recordingID string) error {
return s.rc.Publish(ctx, utils.EndRecordingChannel(recordingID), nil).Err()
}
+8
View File
@@ -144,6 +144,14 @@ func EnsureListPermission(ctx context.Context) error {
return ErrPermissionDenied
}
func EnsureRecordPermission(ctx context.Context) error {
claims := GetGrants(ctx)
if claims == nil || !claims.Video.RoomRecord {
return ErrPermissionDenied
}
return nil
}
// wraps authentication errors around Twirp
func twirpAuthError(err error) error {
return twirp.NewError(twirp.Unauthenticated, err.Error())
+89
View File
@@ -0,0 +1,89 @@
package service
import (
"context"
"errors"
"time"
"github.com/go-redis/redis/v8"
"github.com/livekit/protocol/utils"
"google.golang.org/protobuf/proto"
livekit "github.com/livekit/livekit-server/proto"
)
type RecordingService struct {
rc *redis.Client
}
func NewRecordingService(rc *redis.Client) *RecordingService {
return &RecordingService{rc: rc}
}
func (s *RecordingService) StartRecording(ctx context.Context, req *livekit.StartRecordingRequest) (*livekit.RecordingResponse, error) {
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
}
if s.rc == nil {
return nil, errors.New("recording not configured (redis required)")
}
// reserve a recorder
recordingID, err := s.reserveRecorder(ctx, req)
if err != nil {
return nil, err
}
// start the recording
err = s.rc.Publish(ctx, utils.StartRecordingChannel(recordingID), nil).Err()
if err != nil {
return nil, err
}
return &livekit.RecordingResponse{RecordingId: recordingID}, nil
}
func (s *RecordingService) reserveRecorder(ctx context.Context, req *livekit.StartRecordingRequest) (string, error) {
id := utils.NewGuid(utils.RecordingPrefix)
reservation := &livekit.RecordingReservation{
Id: id,
SubmittedAt: time.Now().UnixNano(),
Input: req.Input,
Output: req.Output,
}
b, err := proto.Marshal(reservation)
if err != nil {
return "", err
}
sub := s.rc.Subscribe(ctx, utils.ReservationResponseChannel(id))
defer sub.Close()
if err = s.rc.Publish(ctx, utils.ReservationChannel, string(b)).Err(); err != nil {
return "", err
}
select {
case <-sub.Channel():
return id, nil
case <-time.After(utils.RecorderTimeout):
return "", errors.New("no recorders available")
}
}
func (s *RecordingService) EndRecording(ctx context.Context, req *livekit.EndRecordingRequest) (*livekit.RecordingResponse, error) {
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
}
if s.rc == nil {
return nil, errors.New("recording not configured (redis required)")
}
if err := s.rc.Publish(ctx, utils.EndRecordingChannel(req.RecordingId), nil).Err(); err != nil {
return nil, err
}
return &livekit.RecordingResponse{RecordingId: req.RecordingId}, nil
}
+2 -28
View File
@@ -7,7 +7,6 @@ import (
"github.com/thoas/go-funk"
"github.com/twitchtv/twirp"
"github.com/livekit/livekit-server/pkg/recording"
"github.com/livekit/livekit-server/pkg/routing"
livekit "github.com/livekit/livekit-server/proto"
)
@@ -15,15 +14,10 @@ import (
// A rooms service that supports a single node
type RoomService struct {
roomManager *RoomManager
recorder *recording.RoomRecorder
}
func NewRoomService(roomManager *RoomManager, rs *recording.RoomRecorder) (svc *RoomService, err error) {
svc = &RoomService{
roomManager: roomManager,
recorder: rs,
}
func NewRoomService(roomManager *RoomManager) (svc *RoomService, err error) {
svc = &RoomService{roomManager: roomManager}
return
}
@@ -32,31 +26,11 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq
return nil, twirpAuthError(err)
}
var recordingID string
if req.Recording != nil {
if s.recorder == nil {
return nil, errors.New("recording not configured (redis required)")
}
recordingID, err = s.recorder.ReserveRecorder(ctx, req.Recording)
if err != nil {
err = errors.Wrap(err, "could not reserve recorder")
return
}
}
rm, err = s.roomManager.CreateRoom(req)
if err != nil {
err = errors.Wrap(err, "could not create room")
}
if recordingID != "" {
err = s.recorder.StartRecording(ctx, recordingID)
if err != nil {
err = errors.Wrap(err, "could not start recording")
}
}
return
}
+4
View File
@@ -27,6 +27,7 @@ import (
type LivekitServer struct {
config *config.Config
roomServer livekit.TwirpServer
recServer livekit.TwirpServer
rtcService *RTCService
httpServer *http.Server
promServer *http.Server
@@ -41,6 +42,7 @@ type LivekitServer struct {
func NewLivekitServer(conf *config.Config,
roomService livekit.RoomService,
recService livekit.RecordingService,
rtcService *RTCService,
keyProvider auth.KeyProvider,
router routing.Router,
@@ -51,6 +53,7 @@ func NewLivekitServer(conf *config.Config,
s = &LivekitServer{
config: conf,
roomServer: livekit.NewRoomServiceServer(roomService),
recServer: livekit.NewRecordingServiceServer(recService),
rtcService: rtcService,
router: router,
roomManager: roomManager,
@@ -70,6 +73,7 @@ func NewLivekitServer(conf *config.Config,
mux := http.NewServeMux()
mux.Handle(s.roomServer.PathPrefix(), s.roomServer)
mux.Handle(s.recServer.PathPrefix(), s.recServer)
mux.Handle("/rtc", rtcService)
mux.HandleFunc("/rtc/validate", rtcService.Validate)
mux.HandleFunc("/", s.healthCheck)
+2 -2
View File
@@ -10,7 +10,6 @@ import (
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/logger"
"github.com/livekit/livekit-server/pkg/recording"
"github.com/livekit/livekit-server/pkg/routing"
livekit "github.com/livekit/livekit-server/proto"
)
@@ -19,13 +18,14 @@ var ServiceSet = wire.NewSet(
createRedisClient,
createRouter,
createStore,
recording.NewRoomRecorder,
NewRecordingService,
NewRoomService,
NewRTCService,
NewLivekitServer,
NewRoomManager,
NewTurnServer,
config.GetAudioConfig,
wire.Bind(new(livekit.RecordingService), new(*RecordingService)),
wire.Bind(new(livekit.RoomService), new(*RoomService)),
)
+3 -4
View File
@@ -7,7 +7,6 @@ package service
import (
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/recording"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/protocol/auth"
)
@@ -25,17 +24,17 @@ func InitializeServer(conf *config.Config, keyProvider auth.KeyProvider, current
if err != nil {
return nil, err
}
roomRecorder := recording.NewRoomRecorder(client)
roomService, err := NewRoomService(roomManager, roomRecorder)
roomService, err := NewRoomService(roomManager)
if err != nil {
return nil, err
}
recordingService := NewRecordingService(client)
rtcService := NewRTCService(conf, roomManager, router, currentNode)
server, err := NewTurnServer(conf, roomStore, currentNode)
if err != nil {
return nil, err
}
livekitServer, err := NewLivekitServer(conf, roomService, rtcService, keyProvider, router, roomManager, server, currentNode)
livekitServer, err := NewLivekitServer(conf, roomService, recordingService, rtcService, keyProvider, router, roomManager, server, currentNode)
if err != nil {
return nil, err
}
+1 -1
View File
@@ -1,3 +1,3 @@
package version
const Version = "0.11.4"
const Version = "0.11.5"