diff --git a/go.mod b/go.mod index 03640dad1..f82b6bff0 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/google/wire v0.5.0 github.com/gorilla/websocket v1.4.2 github.com/hashicorp/golang-lru v0.5.4 - github.com/livekit/protocol v0.13.5-0.20220703041358-5f996eb446f3 + github.com/livekit/protocol v0.13.5-0.20220706001059-137b876761cb github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a github.com/mackerelio/go-osstat v0.2.1 github.com/magefile/mage v1.13.0 diff --git a/go.sum b/go.sum index 0494dd2fe..0307111be 100644 --- a/go.sum +++ b/go.sum @@ -133,8 +133,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA= github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= -github.com/livekit/protocol v0.13.5-0.20220703041358-5f996eb446f3 h1:aKvC6QqcR7HO1X0AQhUwLUVojbSd6PLMhjBIWTTg3Gw= -github.com/livekit/protocol v0.13.5-0.20220703041358-5f996eb446f3/go.mod h1:BLtSeVmn2rLP37xjzw7gHgaAmkWl3L/L9bPvgSbaOfo= +github.com/livekit/protocol v0.13.5-0.20220706001059-137b876761cb h1:/HdeKLSWdf+WTm/LELrI33sZwuYiJxnNUBsQxrP01rw= +github.com/livekit/protocol v0.13.5-0.20220706001059-137b876761cb/go.mod h1:BLtSeVmn2rLP37xjzw7gHgaAmkWl3L/L9bPvgSbaOfo= github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a h1:cENjhGfslLSDV07gt8ASy47Wd12Q0kBS7hsdunyQ62I= github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a/go.mod h1:116ych8UaEs9vfIE8n6iZCZ30iagUFTls0vRmC+Ix5U= github.com/mackerelio/go-osstat v0.2.1 h1:5AeAcBEutEErAOlDz6WCkEvm6AKYgHTUQrfwm5RbeQc= diff --git a/pkg/service/recordingservice.go b/pkg/service/recordingservice.go deleted file mode 100644 index dbd3759c0..000000000 --- a/pkg/service/recordingservice.go +++ /dev/null @@ -1,178 +0,0 @@ -package service - -import ( - "context" - "errors" - - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/emptypb" - - "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/logger" - "github.com/livekit/protocol/recording" - "github.com/livekit/protocol/utils" - - "github.com/livekit/livekit-server/pkg/telemetry" -) - -type RecordingService struct { - bus utils.MessageBus - telemetry telemetry.TelemetryService - shutdown chan struct{} -} - -func NewRecordingService(mb utils.MessageBus, telemetry telemetry.TelemetryService) *RecordingService { - return &RecordingService{ - bus: mb, - telemetry: telemetry, - shutdown: make(chan struct{}, 1), - } -} - -func (s *RecordingService) Start() { - if s.bus != nil { - go s.resultsWorker() - } -} - -func (s *RecordingService) Stop() { - s.shutdown <- struct{}{} -} - -// RecordingService is deprecated, use EgressService instead -func (s *RecordingService) StartRecording(ctx context.Context, req *livekit.StartRecordingRequest) (*livekit.StartRecordingResponse, error) { - if err := EnsureRecordPermission(ctx); err != nil { - return nil, twirpAuthError(err) - } - if s.bus == nil { - return nil, errors.New("recording not configured (redis required)") - } - - // reserve a recorder - recordingID, err := recording.ReserveRecorder(s.bus) - if err != nil { - return nil, err - } - - // start the recording - err = recording.RPC(ctx, s.bus, recordingID, &livekit.RecordingRequest{ - Request: &livekit.RecordingRequest_Start{ - Start: req, - }, - }) - if err != nil { - return nil, err - } - - ri := &livekit.RecordingInfo{ - Id: recordingID, - Active: true, - } - - switch template := req.Input.(type) { - case *livekit.StartRecordingRequest_Template: - ri.RoomName = template.Template.RoomName - } - - logger.Debugw("recording started", "recordingID", recordingID) - s.telemetry.RecordingStarted(ctx, ri) - - return &livekit.StartRecordingResponse{RecordingId: recordingID}, nil -} - -// RecordingService is deprecated, use EgressService instead -func (s *RecordingService) AddOutput(ctx context.Context, req *livekit.AddOutputRequest) (*emptypb.Empty, error) { - if err := EnsureRecordPermission(ctx); err != nil { - return nil, twirpAuthError(err) - } - if s.bus == nil { - return nil, errors.New("recording not configured (redis required)") - } - - err := recording.RPC(ctx, s.bus, req.RecordingId, &livekit.RecordingRequest{ - Request: &livekit.RecordingRequest_AddOutput{ - AddOutput: req, - }, - }) - if err != nil { - return nil, err - } - - return &emptypb.Empty{}, nil -} - -// RecordingService is deprecated, use EgressService instead -func (s *RecordingService) RemoveOutput(ctx context.Context, req *livekit.RemoveOutputRequest) (*emptypb.Empty, error) { - if err := EnsureRecordPermission(ctx); err != nil { - return nil, twirpAuthError(err) - } - if s.bus == nil { - return nil, errors.New("recording not configured (redis required)") - } - - err := recording.RPC(ctx, s.bus, req.RecordingId, &livekit.RecordingRequest{ - Request: &livekit.RecordingRequest_RemoveOutput{ - RemoveOutput: req, - }, - }) - if err != nil { - return nil, err - } - - return &emptypb.Empty{}, nil -} - -// RecordingService is deprecated, use EgressService instead -func (s *RecordingService) EndRecording(ctx context.Context, req *livekit.EndRecordingRequest) (*emptypb.Empty, error) { - if err := EnsureRecordPermission(ctx); err != nil { - return nil, twirpAuthError(err) - } - if s.bus == nil { - return nil, errors.New("recording not configured (redis required)") - } - - err := recording.RPC(ctx, s.bus, req.RecordingId, &livekit.RecordingRequest{ - Request: &livekit.RecordingRequest_End{ - End: req, - }, - }) - if err != nil { - return nil, err - } - - return &emptypb.Empty{}, nil -} - -func (s *RecordingService) resultsWorker() { - sub, err := s.bus.SubscribeQueue(context.Background(), recording.ResultChannel) - if err != nil { - logger.Errorw("failed to subscribe to results channel", err) - return - } - - resChan := sub.Channel() - for { - select { - case msg := <-resChan: - b := sub.Payload(msg) - - res := &livekit.RecordingInfo{} - if err = proto.Unmarshal(b, res); err != nil { - logger.Errorw("failed to read results", err) - continue - } - - // log results - values := []interface{}{"recordingID", res.Id} - if res.Error != "" { - values = append(values, "error", res.Error) - } - logger.Debugw("recording ended", values...) - - s.telemetry.RecordingEnded(context.Background(), res) - case <-s.shutdown: - _ = sub.Close() - return - } - } -} diff --git a/pkg/service/server.go b/pkg/service/server.go index aec93abd0..7f56999fb 100644 --- a/pkg/service/server.go +++ b/pkg/service/server.go @@ -28,7 +28,6 @@ import ( type LivekitServer struct { config *config.Config egressService *EgressService - recService *RecordingService rtcService *RTCService httpServer *http.Server promServer *http.Server @@ -44,7 +43,6 @@ type LivekitServer struct { func NewLivekitServer(conf *config.Config, roomService livekit.RoomService, egressService *EgressService, - recService *RecordingService, rtcService *RTCService, keyProvider auth.KeyProvider, router routing.Router, @@ -55,7 +53,6 @@ func NewLivekitServer(conf *config.Config, s = &LivekitServer{ config: conf, egressService: egressService, - recService: recService, rtcService: rtcService, router: router, roomManager: roomManager, @@ -82,7 +79,6 @@ func NewLivekitServer(conf *config.Config, roomServer := livekit.NewRoomServiceServer(roomService) egressServer := livekit.NewEgressServer(egressService) - recServer := livekit.NewRecordingServiceServer(recService) mux := http.NewServeMux() if conf.Development { @@ -93,7 +89,6 @@ func NewLivekitServer(conf *config.Config, } mux.Handle(roomServer.PathPrefix(), roomServer) mux.Handle(egressServer.PathPrefix(), egressServer) - mux.Handle(recServer.PathPrefix(), recServer) mux.Handle("/rtc", rtcService) mux.HandleFunc("/rtc/validate", rtcService.Validate) mux.HandleFunc("/", s.healthCheck) @@ -153,7 +148,6 @@ func (s *LivekitServer) Start() error { } s.egressService.Start() - s.recService.Start() // ensure we could listen ln, err := net.Listen("tcp", s.httpServer.Addr) @@ -221,7 +215,6 @@ func (s *LivekitServer) Start() error { s.roomManager.Stop() s.egressService.Stop() - s.recService.Stop() close(s.closedChan) return nil diff --git a/pkg/service/wire.go b/pkg/service/wire.go index d2f3b7367..a500b7a22 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -18,7 +18,6 @@ import ( "github.com/livekit/protocol/egress" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" - "github.com/livekit/protocol/utils" "github.com/livekit/protocol/webhook" "github.com/livekit/livekit-server/pkg/clientconfiguration" @@ -31,7 +30,6 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live wire.Build( getNodeID, createRedisClient, - createMessageBus, createStore, wire.Bind(new(ServiceStore), new(ObjectStore)), createKeyProvider, @@ -45,7 +43,6 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live telemetry.NewTelemetryService, egress.NewRedisRPCClient, NewEgressService, - NewRecordingService, NewRoomAllocator, NewRoomService, NewRTCService, @@ -158,13 +155,6 @@ func createRedisClient(conf *config.Config) (*redis.Client, error) { return rc, nil } -func createMessageBus(rc *redis.Client) utils.MessageBus { - if rc == nil { - return nil - } - return utils.NewRedisMessageBus(rc) -} - func createStore(rc *redis.Client) ObjectStore { if rc != nil { return NewRedisStore(rc) diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 11c64b474..51dab1245 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -19,7 +19,6 @@ import ( "github.com/livekit/protocol/egress" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" - "github.com/livekit/protocol/utils" "github.com/livekit/protocol/webhook" "github.com/pkg/errors" "gopkg.in/yaml.v3" @@ -61,8 +60,6 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live analyticsService := telemetry.NewAnalyticsService(conf, currentNode) telemetryService := telemetry.NewTelemetryService(notifier, analyticsService) egressService := NewEgressService(rpcClient, objectStore, roomService, telemetryService) - messageBus := createMessageBus(client) - recordingService := NewRecordingService(messageBus, telemetryService) rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode) clientConfigurationManager := createClientConfiguration() roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager) @@ -74,7 +71,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live if err != nil { return nil, err } - livekitServer, err := NewLivekitServer(conf, roomService, egressService, recordingService, rtcService, keyProvider, router, roomManager, server, currentNode) + livekitServer, err := NewLivekitServer(conf, roomService, egressService, rtcService, keyProvider, router, roomManager, server, currentNode) if err != nil { return nil, err } @@ -184,13 +181,6 @@ func createRedisClient(conf *config.Config) (*redis.Client, error) { return rc, nil } -func createMessageBus(rc *redis.Client) utils.MessageBus { - if rc == nil { - return nil - } - return utils.NewRedisMessageBus(rc) -} - func createStore(rc *redis.Client) ObjectStore { if rc != nil { return NewRedisStore(rc) diff --git a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go index abdbefd47..5f6785509 100644 --- a/pkg/telemetry/telemetryfakes/fake_telemetry_service.go +++ b/pkg/telemetry/telemetryfakes/fake_telemetry_service.go @@ -46,18 +46,6 @@ type FakeTelemetryService struct { arg2 *livekit.Room arg3 *livekit.ParticipantInfo } - RecordingEndedStub func(context.Context, *livekit.RecordingInfo) - recordingEndedMutex sync.RWMutex - recordingEndedArgsForCall []struct { - arg1 context.Context - arg2 *livekit.RecordingInfo - } - RecordingStartedStub func(context.Context, *livekit.RecordingInfo) - recordingStartedMutex sync.RWMutex - recordingStartedArgsForCall []struct { - arg1 context.Context - arg2 *livekit.RecordingInfo - } RoomEndedStub func(context.Context, *livekit.Room) roomEndedMutex sync.RWMutex roomEndedArgsForCall []struct { @@ -301,72 +289,6 @@ func (fake *FakeTelemetryService) ParticipantLeftArgsForCall(i int) (context.Con return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } -func (fake *FakeTelemetryService) RecordingEnded(arg1 context.Context, arg2 *livekit.RecordingInfo) { - fake.recordingEndedMutex.Lock() - fake.recordingEndedArgsForCall = append(fake.recordingEndedArgsForCall, struct { - arg1 context.Context - arg2 *livekit.RecordingInfo - }{arg1, arg2}) - stub := fake.RecordingEndedStub - fake.recordInvocation("RecordingEnded", []interface{}{arg1, arg2}) - fake.recordingEndedMutex.Unlock() - if stub != nil { - fake.RecordingEndedStub(arg1, arg2) - } -} - -func (fake *FakeTelemetryService) RecordingEndedCallCount() int { - fake.recordingEndedMutex.RLock() - defer fake.recordingEndedMutex.RUnlock() - return len(fake.recordingEndedArgsForCall) -} - -func (fake *FakeTelemetryService) RecordingEndedCalls(stub func(context.Context, *livekit.RecordingInfo)) { - fake.recordingEndedMutex.Lock() - defer fake.recordingEndedMutex.Unlock() - fake.RecordingEndedStub = stub -} - -func (fake *FakeTelemetryService) RecordingEndedArgsForCall(i int) (context.Context, *livekit.RecordingInfo) { - fake.recordingEndedMutex.RLock() - defer fake.recordingEndedMutex.RUnlock() - argsForCall := fake.recordingEndedArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 -} - -func (fake *FakeTelemetryService) RecordingStarted(arg1 context.Context, arg2 *livekit.RecordingInfo) { - fake.recordingStartedMutex.Lock() - fake.recordingStartedArgsForCall = append(fake.recordingStartedArgsForCall, struct { - arg1 context.Context - arg2 *livekit.RecordingInfo - }{arg1, arg2}) - stub := fake.RecordingStartedStub - fake.recordInvocation("RecordingStarted", []interface{}{arg1, arg2}) - fake.recordingStartedMutex.Unlock() - if stub != nil { - fake.RecordingStartedStub(arg1, arg2) - } -} - -func (fake *FakeTelemetryService) RecordingStartedCallCount() int { - fake.recordingStartedMutex.RLock() - defer fake.recordingStartedMutex.RUnlock() - return len(fake.recordingStartedArgsForCall) -} - -func (fake *FakeTelemetryService) RecordingStartedCalls(stub func(context.Context, *livekit.RecordingInfo)) { - fake.recordingStartedMutex.Lock() - defer fake.recordingStartedMutex.Unlock() - fake.RecordingStartedStub = stub -} - -func (fake *FakeTelemetryService) RecordingStartedArgsForCall(i int) (context.Context, *livekit.RecordingInfo) { - fake.recordingStartedMutex.RLock() - defer fake.recordingStartedMutex.RUnlock() - argsForCall := fake.recordingStartedArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 -} - func (fake *FakeTelemetryService) RoomEnded(arg1 context.Context, arg2 *livekit.Room) { fake.roomEndedMutex.Lock() fake.roomEndedArgsForCall = append(fake.roomEndedArgsForCall, struct { @@ -691,10 +613,6 @@ func (fake *FakeTelemetryService) Invocations() map[string][][]interface{} { defer fake.participantJoinedMutex.RUnlock() fake.participantLeftMutex.RLock() defer fake.participantLeftMutex.RUnlock() - fake.recordingEndedMutex.RLock() - defer fake.recordingEndedMutex.RUnlock() - fake.recordingStartedMutex.RLock() - defer fake.recordingStartedMutex.RUnlock() fake.roomEndedMutex.RLock() defer fake.roomEndedMutex.RUnlock() fake.roomStartedMutex.RLock() diff --git a/pkg/telemetry/telemetryservice.go b/pkg/telemetry/telemetryservice.go index 3996db003..85e3b307d 100644 --- a/pkg/telemetry/telemetryservice.go +++ b/pkg/telemetry/telemetryservice.go @@ -27,8 +27,6 @@ type TelemetryService interface { TrackUnsubscribed(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) TrackPublishedUpdate(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) TrackMaxSubscribedVideoQuality(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo, mime string, maxQuality livekit.VideoQuality) - RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo) - RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo) EgressStarted(ctx context.Context, info *livekit.EgressInfo) EgressEnded(ctx context.Context, info *livekit.EgressInfo) } @@ -135,18 +133,6 @@ func (t *telemetryService) TrackUnsubscribed(ctx context.Context, participantID }) } -func (t *telemetryService) RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo) { - t.enqueue(func() { - t.internalService.RecordingStarted(ctx, ri) - }) -} - -func (t *telemetryService) RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo) { - t.enqueue(func() { - t.internalService.RecordingEnded(ctx, ri) - }) -} - func (t *telemetryService) TrackPublishedUpdate(ctx context.Context, participantID livekit.ParticipantID, track *livekit.TrackInfo) { t.enqueue(func() { t.internalService.TrackPublishedUpdate(ctx, participantID, track) diff --git a/pkg/telemetry/telemetryserviceinternalevents.go b/pkg/telemetry/telemetryserviceinternalevents.go index 9f65b95ff..82e7a11cc 100644 --- a/pkg/telemetry/telemetryserviceinternalevents.go +++ b/pkg/telemetry/telemetryserviceinternalevents.go @@ -263,34 +263,6 @@ func (t *telemetryServiceInternal) TrackUnsubscribed(ctx context.Context, partic }) } -func (t *telemetryServiceInternal) RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo) { - t.notifyEvent(ctx, &livekit.WebhookEvent{ - Event: webhook.EventRecordingStarted, - RecordingInfo: ri, - }) - - t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ - Type: livekit.AnalyticsEventType_RECORDING_STARTED, - Timestamp: timestamppb.Now(), - RecordingId: ri.Id, - Room: &livekit.Room{Name: ri.RoomName}, - }) -} - -func (t *telemetryServiceInternal) RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo) { - t.notifyEvent(ctx, &livekit.WebhookEvent{ - Event: webhook.EventRecordingFinished, - RecordingInfo: ri, - }) - - t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ - Type: livekit.AnalyticsEventType_RECORDING_ENDED, - Timestamp: timestamppb.Now(), - RecordingId: ri.Id, - Room: &livekit.Room{Name: ri.RoomName}, - }) -} - func (t *telemetryServiceInternal) getRoomDetails(participantID livekit.ParticipantID) (livekit.RoomID, livekit.RoomName) { if w := t.getStatsWorker(participantID); w != nil { return w.roomID, w.roomName