Remove recording (#811)

* remove recorder service

* update protocol
This commit is contained in:
David Colburn
2022-07-05 18:39:32 -07:00
committed by GitHub
parent 4242205ede
commit fbbcbe77df
9 changed files with 4 additions and 333 deletions

2
go.mod
View File

@@ -13,7 +13,7 @@ require (
github.com/google/wire v0.5.0
github.com/gorilla/websocket v1.4.2
github.com/hashicorp/golang-lru v0.5.4
github.com/livekit/protocol v0.13.5-0.20220703041358-5f996eb446f3
github.com/livekit/protocol v0.13.5-0.20220706001059-137b876761cb
github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a
github.com/mackerelio/go-osstat v0.2.1
github.com/magefile/mage v1.13.0

4
go.sum
View File

@@ -133,8 +133,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA=
github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/protocol v0.13.5-0.20220703041358-5f996eb446f3 h1:aKvC6QqcR7HO1X0AQhUwLUVojbSd6PLMhjBIWTTg3Gw=
github.com/livekit/protocol v0.13.5-0.20220703041358-5f996eb446f3/go.mod h1:BLtSeVmn2rLP37xjzw7gHgaAmkWl3L/L9bPvgSbaOfo=
github.com/livekit/protocol v0.13.5-0.20220706001059-137b876761cb h1:/HdeKLSWdf+WTm/LELrI33sZwuYiJxnNUBsQxrP01rw=
github.com/livekit/protocol v0.13.5-0.20220706001059-137b876761cb/go.mod h1:BLtSeVmn2rLP37xjzw7gHgaAmkWl3L/L9bPvgSbaOfo=
github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a h1:cENjhGfslLSDV07gt8ASy47Wd12Q0kBS7hsdunyQ62I=
github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a/go.mod h1:116ych8UaEs9vfIE8n6iZCZ30iagUFTls0vRmC+Ix5U=
github.com/mackerelio/go-osstat v0.2.1 h1:5AeAcBEutEErAOlDz6WCkEvm6AKYgHTUQrfwm5RbeQc=

View File

@@ -1,178 +0,0 @@
package service
import (
"context"
"errors"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/recording"
"github.com/livekit/protocol/utils"
"github.com/livekit/livekit-server/pkg/telemetry"
)
type RecordingService struct {
bus utils.MessageBus
telemetry telemetry.TelemetryService
shutdown chan struct{}
}
func NewRecordingService(mb utils.MessageBus, telemetry telemetry.TelemetryService) *RecordingService {
return &RecordingService{
bus: mb,
telemetry: telemetry,
shutdown: make(chan struct{}, 1),
}
}
func (s *RecordingService) Start() {
if s.bus != nil {
go s.resultsWorker()
}
}
func (s *RecordingService) Stop() {
s.shutdown <- struct{}{}
}
// RecordingService is deprecated, use EgressService instead
func (s *RecordingService) StartRecording(ctx context.Context, req *livekit.StartRecordingRequest) (*livekit.StartRecordingResponse, error) {
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
}
if s.bus == nil {
return nil, errors.New("recording not configured (redis required)")
}
// reserve a recorder
recordingID, err := recording.ReserveRecorder(s.bus)
if err != nil {
return nil, err
}
// start the recording
err = recording.RPC(ctx, s.bus, recordingID, &livekit.RecordingRequest{
Request: &livekit.RecordingRequest_Start{
Start: req,
},
})
if err != nil {
return nil, err
}
ri := &livekit.RecordingInfo{
Id: recordingID,
Active: true,
}
switch template := req.Input.(type) {
case *livekit.StartRecordingRequest_Template:
ri.RoomName = template.Template.RoomName
}
logger.Debugw("recording started", "recordingID", recordingID)
s.telemetry.RecordingStarted(ctx, ri)
return &livekit.StartRecordingResponse{RecordingId: recordingID}, nil
}
// RecordingService is deprecated, use EgressService instead
func (s *RecordingService) AddOutput(ctx context.Context, req *livekit.AddOutputRequest) (*emptypb.Empty, error) {
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
}
if s.bus == nil {
return nil, errors.New("recording not configured (redis required)")
}
err := recording.RPC(ctx, s.bus, req.RecordingId, &livekit.RecordingRequest{
Request: &livekit.RecordingRequest_AddOutput{
AddOutput: req,
},
})
if err != nil {
return nil, err
}
return &emptypb.Empty{}, nil
}
// RecordingService is deprecated, use EgressService instead
func (s *RecordingService) RemoveOutput(ctx context.Context, req *livekit.RemoveOutputRequest) (*emptypb.Empty, error) {
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
}
if s.bus == nil {
return nil, errors.New("recording not configured (redis required)")
}
err := recording.RPC(ctx, s.bus, req.RecordingId, &livekit.RecordingRequest{
Request: &livekit.RecordingRequest_RemoveOutput{
RemoveOutput: req,
},
})
if err != nil {
return nil, err
}
return &emptypb.Empty{}, nil
}
// RecordingService is deprecated, use EgressService instead
func (s *RecordingService) EndRecording(ctx context.Context, req *livekit.EndRecordingRequest) (*emptypb.Empty, error) {
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
}
if s.bus == nil {
return nil, errors.New("recording not configured (redis required)")
}
err := recording.RPC(ctx, s.bus, req.RecordingId, &livekit.RecordingRequest{
Request: &livekit.RecordingRequest_End{
End: req,
},
})
if err != nil {
return nil, err
}
return &emptypb.Empty{}, nil
}
func (s *RecordingService) resultsWorker() {
sub, err := s.bus.SubscribeQueue(context.Background(), recording.ResultChannel)
if err != nil {
logger.Errorw("failed to subscribe to results channel", err)
return
}
resChan := sub.Channel()
for {
select {
case msg := <-resChan:
b := sub.Payload(msg)
res := &livekit.RecordingInfo{}
if err = proto.Unmarshal(b, res); err != nil {
logger.Errorw("failed to read results", err)
continue
}
// log results
values := []interface{}{"recordingID", res.Id}
if res.Error != "" {
values = append(values, "error", res.Error)
}
logger.Debugw("recording ended", values...)
s.telemetry.RecordingEnded(context.Background(), res)
case <-s.shutdown:
_ = sub.Close()
return
}
}
}

View File

@@ -28,7 +28,6 @@ import (
type LivekitServer struct {
config *config.Config
egressService *EgressService
recService *RecordingService
rtcService *RTCService
httpServer *http.Server
promServer *http.Server
@@ -44,7 +43,6 @@ type LivekitServer struct {
func NewLivekitServer(conf *config.Config,
roomService livekit.RoomService,
egressService *EgressService,
recService *RecordingService,
rtcService *RTCService,
keyProvider auth.KeyProvider,
router routing.Router,
@@ -55,7 +53,6 @@ func NewLivekitServer(conf *config.Config,
s = &LivekitServer{
config: conf,
egressService: egressService,
recService: recService,
rtcService: rtcService,
router: router,
roomManager: roomManager,
@@ -82,7 +79,6 @@ func NewLivekitServer(conf *config.Config,
roomServer := livekit.NewRoomServiceServer(roomService)
egressServer := livekit.NewEgressServer(egressService)
recServer := livekit.NewRecordingServiceServer(recService)
mux := http.NewServeMux()
if conf.Development {
@@ -93,7 +89,6 @@ func NewLivekitServer(conf *config.Config,
}
mux.Handle(roomServer.PathPrefix(), roomServer)
mux.Handle(egressServer.PathPrefix(), egressServer)
mux.Handle(recServer.PathPrefix(), recServer)
mux.Handle("/rtc", rtcService)
mux.HandleFunc("/rtc/validate", rtcService.Validate)
mux.HandleFunc("/", s.healthCheck)
@@ -153,7 +148,6 @@ func (s *LivekitServer) Start() error {
}
s.egressService.Start()
s.recService.Start()
// ensure we could listen
ln, err := net.Listen("tcp", s.httpServer.Addr)
@@ -221,7 +215,6 @@ func (s *LivekitServer) Start() error {
s.roomManager.Stop()
s.egressService.Stop()
s.recService.Stop()
close(s.closedChan)
return nil

View File

@@ -18,7 +18,6 @@ import (
"github.com/livekit/protocol/egress"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
"github.com/livekit/protocol/webhook"
"github.com/livekit/livekit-server/pkg/clientconfiguration"
@@ -31,7 +30,6 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
wire.Build(
getNodeID,
createRedisClient,
createMessageBus,
createStore,
wire.Bind(new(ServiceStore), new(ObjectStore)),
createKeyProvider,
@@ -45,7 +43,6 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
telemetry.NewTelemetryService,
egress.NewRedisRPCClient,
NewEgressService,
NewRecordingService,
NewRoomAllocator,
NewRoomService,
NewRTCService,
@@ -158,13 +155,6 @@ func createRedisClient(conf *config.Config) (*redis.Client, error) {
return rc, nil
}
func createMessageBus(rc *redis.Client) utils.MessageBus {
if rc == nil {
return nil
}
return utils.NewRedisMessageBus(rc)
}
func createStore(rc *redis.Client) ObjectStore {
if rc != nil {
return NewRedisStore(rc)

View File

@@ -19,7 +19,6 @@ import (
"github.com/livekit/protocol/egress"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
"github.com/livekit/protocol/webhook"
"github.com/pkg/errors"
"gopkg.in/yaml.v3"
@@ -61,8 +60,6 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
analyticsService := telemetry.NewAnalyticsService(conf, currentNode)
telemetryService := telemetry.NewTelemetryService(notifier, analyticsService)
egressService := NewEgressService(rpcClient, objectStore, roomService, telemetryService)
messageBus := createMessageBus(client)
recordingService := NewRecordingService(messageBus, telemetryService)
rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode)
clientConfigurationManager := createClientConfiguration()
roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager)
@@ -74,7 +71,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
if err != nil {
return nil, err
}
livekitServer, err := NewLivekitServer(conf, roomService, egressService, recordingService, rtcService, keyProvider, router, roomManager, server, currentNode)
livekitServer, err := NewLivekitServer(conf, roomService, egressService, rtcService, keyProvider, router, roomManager, server, currentNode)
if err != nil {
return nil, err
}
@@ -184,13 +181,6 @@ func createRedisClient(conf *config.Config) (*redis.Client, error) {
return rc, nil
}
func createMessageBus(rc *redis.Client) utils.MessageBus {
if rc == nil {
return nil
}
return utils.NewRedisMessageBus(rc)
}
func createStore(rc *redis.Client) ObjectStore {
if rc != nil {
return NewRedisStore(rc)

View File

@@ -46,18 +46,6 @@ type FakeTelemetryService struct {
arg2 *livekit.Room
arg3 *livekit.ParticipantInfo
}
RecordingEndedStub func(context.Context, *livekit.RecordingInfo)
recordingEndedMutex sync.RWMutex
recordingEndedArgsForCall []struct {
arg1 context.Context
arg2 *livekit.RecordingInfo
}
RecordingStartedStub func(context.Context, *livekit.RecordingInfo)
recordingStartedMutex sync.RWMutex
recordingStartedArgsForCall []struct {
arg1 context.Context
arg2 *livekit.RecordingInfo
}
RoomEndedStub func(context.Context, *livekit.Room)
roomEndedMutex sync.RWMutex
roomEndedArgsForCall []struct {
@@ -301,72 +289,6 @@ func (fake *FakeTelemetryService) ParticipantLeftArgsForCall(i int) (context.Con
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeTelemetryService) RecordingEnded(arg1 context.Context, arg2 *livekit.RecordingInfo) {
fake.recordingEndedMutex.Lock()
fake.recordingEndedArgsForCall = append(fake.recordingEndedArgsForCall, struct {
arg1 context.Context
arg2 *livekit.RecordingInfo
}{arg1, arg2})
stub := fake.RecordingEndedStub
fake.recordInvocation("RecordingEnded", []interface{}{arg1, arg2})
fake.recordingEndedMutex.Unlock()
if stub != nil {
fake.RecordingEndedStub(arg1, arg2)
}
}
func (fake *FakeTelemetryService) RecordingEndedCallCount() int {
fake.recordingEndedMutex.RLock()
defer fake.recordingEndedMutex.RUnlock()
return len(fake.recordingEndedArgsForCall)
}
func (fake *FakeTelemetryService) RecordingEndedCalls(stub func(context.Context, *livekit.RecordingInfo)) {
fake.recordingEndedMutex.Lock()
defer fake.recordingEndedMutex.Unlock()
fake.RecordingEndedStub = stub
}
func (fake *FakeTelemetryService) RecordingEndedArgsForCall(i int) (context.Context, *livekit.RecordingInfo) {
fake.recordingEndedMutex.RLock()
defer fake.recordingEndedMutex.RUnlock()
argsForCall := fake.recordingEndedArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeTelemetryService) RecordingStarted(arg1 context.Context, arg2 *livekit.RecordingInfo) {
fake.recordingStartedMutex.Lock()
fake.recordingStartedArgsForCall = append(fake.recordingStartedArgsForCall, struct {
arg1 context.Context
arg2 *livekit.RecordingInfo
}{arg1, arg2})
stub := fake.RecordingStartedStub
fake.recordInvocation("RecordingStarted", []interface{}{arg1, arg2})
fake.recordingStartedMutex.Unlock()
if stub != nil {
fake.RecordingStartedStub(arg1, arg2)
}
}
func (fake *FakeTelemetryService) RecordingStartedCallCount() int {
fake.recordingStartedMutex.RLock()
defer fake.recordingStartedMutex.RUnlock()
return len(fake.recordingStartedArgsForCall)
}
func (fake *FakeTelemetryService) RecordingStartedCalls(stub func(context.Context, *livekit.RecordingInfo)) {
fake.recordingStartedMutex.Lock()
defer fake.recordingStartedMutex.Unlock()
fake.RecordingStartedStub = stub
}
func (fake *FakeTelemetryService) RecordingStartedArgsForCall(i int) (context.Context, *livekit.RecordingInfo) {
fake.recordingStartedMutex.RLock()
defer fake.recordingStartedMutex.RUnlock()
argsForCall := fake.recordingStartedArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeTelemetryService) RoomEnded(arg1 context.Context, arg2 *livekit.Room) {
fake.roomEndedMutex.Lock()
fake.roomEndedArgsForCall = append(fake.roomEndedArgsForCall, struct {
@@ -691,10 +613,6 @@ func (fake *FakeTelemetryService) Invocations() map[string][][]interface{} {
defer fake.participantJoinedMutex.RUnlock()
fake.participantLeftMutex.RLock()
defer fake.participantLeftMutex.RUnlock()
fake.recordingEndedMutex.RLock()
defer fake.recordingEndedMutex.RUnlock()
fake.recordingStartedMutex.RLock()
defer fake.recordingStartedMutex.RUnlock()
fake.roomEndedMutex.RLock()
defer fake.roomEndedMutex.RUnlock()
fake.roomStartedMutex.RLock()

View File

@@ -27,8 +27,6 @@ type TelemetryService interface {
TrackUnsubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo)
TrackPublishedUpdate(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo)
TrackMaxSubscribedVideoQuality(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, mime string, maxQuality livekit.VideoQuality)
RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo)
RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo)
EgressStarted(ctx context.Context, info *livekit.EgressInfo)
EgressEnded(ctx context.Context, info *livekit.EgressInfo)
}
@@ -135,18 +133,6 @@ func (t *telemetryService) TrackUnsubscribed(ctx context.Context, participantID
})
}
func (t *telemetryService) RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo) {
t.enqueue(func() {
t.internalService.RecordingStarted(ctx, ri)
})
}
func (t *telemetryService) RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo) {
t.enqueue(func() {
t.internalService.RecordingEnded(ctx, ri)
})
}
func (t *telemetryService) TrackPublishedUpdate(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) {
t.enqueue(func() {
t.internalService.TrackPublishedUpdate(ctx, participantID, track)

View File

@@ -263,34 +263,6 @@ func (t *telemetryServiceInternal) TrackUnsubscribed(ctx context.Context, partic
})
}
func (t *telemetryServiceInternal) RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo) {
t.notifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventRecordingStarted,
RecordingInfo: ri,
})
t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_RECORDING_STARTED,
Timestamp: timestamppb.Now(),
RecordingId: ri.Id,
Room: &livekit.Room{Name: ri.RoomName},
})
}
func (t *telemetryServiceInternal) RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo) {
t.notifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventRecordingFinished,
RecordingInfo: ri,
})
t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_RECORDING_ENDED,
Timestamp: timestamppb.Now(),
RecordingId: ri.Id,
Room: &livekit.Room{Name: ri.RoomName},
})
}
func (t *telemetryServiceInternal) getRoomDetails(participantID livekit.ParticipantID) (livekit.RoomID, livekit.RoomName) {
if w := t.getStatsWorker(participantID); w != nil {
return w.roomID, w.roomName