From c41384cd09d45de3707531414b6cd5bdd5018a85 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Fri, 3 Dec 2021 21:40:53 -0800 Subject: [PATCH] ActiveRecording (#234) * ActiveRecording * regenerate * update to 0.10.3 * 1.17 --- cmd/server/commands.go | 1 + go.mod | 2 +- go.sum | 4 +- pkg/routing/interfaces.go | 1 + pkg/routing/redisrouter.go | 2 + pkg/rtc/participant.go | 11 +++- pkg/rtc/room.go | 36 +++++++++-- pkg/rtc/types/interfaces.go | 1 + pkg/rtc/types/typesfakes/fake_participant.go | 65 ++++++++++++++++++++ pkg/service/recordingservice.go | 19 +++--- pkg/service/rtcservice.go | 1 + pkg/service/wire_gen.go | 3 +- pkg/telemetry/events.go | 19 +++--- pkg/telemetry/service.go | 4 +- 14 files changed, 136 insertions(+), 33 deletions(-) diff --git a/cmd/server/commands.go b/cmd/server/commands.go index f5abf442c..59dbe8a57 100644 --- a/cmd/server/commands.go +++ b/cmd/server/commands.go @@ -110,6 +110,7 @@ func createToken(c *cli.Context) error { } if c.Bool("recorder") { grant.Hidden = true + grant.Recorder = true grant.SetCanPublish(false) grant.SetCanPublishData(false) } diff --git a/go.mod b/go.mod index 212aa2068..09861c979 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/gorilla/websocket v1.4.2 github.com/hashicorp/golang-lru v0.5.4 github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b - github.com/livekit/protocol v0.10.2 + github.com/livekit/protocol v0.10.3 github.com/magefile/mage v1.11.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0 github.com/mitchellh/go-homedir v1.1.0 diff --git a/go.sum b/go.sum index 35f95cdd4..5d12996f3 100644 --- a/go.sum +++ b/go.sum @@ -132,8 +132,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA= github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= -github.com/livekit/protocol v0.10.2 h1:ZnSM1MfiHr70mTsMqVbq9QnotGGyQNqUg32pWrSfXzg= -github.com/livekit/protocol v0.10.2/go.mod h1:YoHW9YbWbPnuVsgwBB4hAINKT+V68jmfh9zXBSSn6Wg= +github.com/livekit/protocol v0.10.3 h1:Jofe/vmPc9DB1o7T7nhMZxjIhuThlQBLU5F9Ju0WQjg= +github.com/livekit/protocol v0.10.3/go.mod h1:YoHW9YbWbPnuVsgwBB4hAINKT+V68jmfh9zXBSSn6Wg= github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls= github.com/magefile/mage v1.11.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= diff --git a/pkg/routing/interfaces.go b/pkg/routing/interfaces.go index 0ed4cd950..5894fe399 100644 --- a/pkg/routing/interfaces.go +++ b/pkg/routing/interfaces.go @@ -35,6 +35,7 @@ type ParticipantInit struct { Permission *livekit.ParticipantPermission AutoSubscribe bool Hidden bool + Recorder bool Client *livekit.ClientInfo } diff --git a/pkg/routing/redisrouter.go b/pkg/routing/redisrouter.go index e275e7872..39c985768 100644 --- a/pkg/routing/redisrouter.go +++ b/pkg/routing/redisrouter.go @@ -157,6 +157,7 @@ func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName strin Permission: pi.Permission, AutoSubscribe: pi.AutoSubscribe, Hidden: pi.Hidden, + Recorder: pi.Recorder, Client: pi.Client, }) if err != nil { @@ -244,6 +245,7 @@ func (r *RedisRouter) startParticipantRTC(ss *livekit.StartSession, participantK Client: ss.Client, AutoSubscribe: ss.AutoSubscribe, Hidden: ss.Hidden, + Recorder: ss.Recorder, } reqChan := r.getOrCreateMessageChannel(r.requestChannels, participantKey) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 7a2d24bec..6c747439e 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -9,8 +9,6 @@ import ( "time" lru "github.com/hashicorp/golang-lru" - "github.com/livekit/livekit-server/pkg/sfu" - "github.com/livekit/livekit-server/pkg/sfu/twcc" "github.com/livekit/protocol/logger" livekit "github.com/livekit/protocol/proto" "github.com/livekit/protocol/utils" @@ -19,6 +17,9 @@ import ( "github.com/pkg/errors" "google.golang.org/protobuf/proto" + "github.com/livekit/livekit-server/pkg/sfu" + "github.com/livekit/livekit-server/pkg/sfu/twcc" + "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/rtc/types" @@ -43,6 +44,7 @@ type ParticipantParams struct { ThrottleConfig config.PLIThrottleConfig EnabledCodecs []*livekit.Codec Hidden bool + Recorder bool Logger logger.Logger } @@ -235,6 +237,7 @@ func (p *ParticipantImpl) ToProto() *livekit.ParticipantInfo { State: p.State(), JoinedAt: p.ConnectedAt().Unix(), Hidden: p.Hidden(), + Recorder: p.IsRecorder(), } p.lock.RLock() @@ -758,6 +761,10 @@ func (p *ParticipantImpl) Hidden() bool { return p.params.Hidden } +func (p *ParticipantImpl) IsRecorder() bool { + return p.params.Recorder +} + func (p *ParticipantImpl) SubscriberAsPrimary() bool { return p.ProtocolVersion().SubscriberAsPrimary() && p.CanSubscribe() } diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 5346ebbff..86298ced2 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -198,6 +198,11 @@ func (r *Room) Join(participant types.Participant, opts *ParticipantOptions, ice "room", r.Room.Name, "roomID", r.Room.Sid) + if participant.IsRecorder() && !r.Room.ActiveRecording { + r.Room.ActiveRecording = true + r.sendRoomUpdateLocked() + } + r.participants[participant.Identity()] = participant r.participantOpts[participant.Identity()] = opts @@ -264,7 +269,22 @@ func (r *Room) RemoveParticipant(identity string) { } } + activeRecording := false + if (p != nil && p.IsRecorder()) || p == nil && r.Room.ActiveRecording { + for _, op := range r.participants { + if op.IsRecorder() { + activeRecording = true + break + } + } + } + + if r.Room.ActiveRecording != activeRecording { + r.Room.ActiveRecording = activeRecording + r.sendRoomUpdateLocked() + } r.lock.Unlock() + if !ok { return } @@ -402,8 +422,18 @@ func (r *Room) SendDataPacket(up *livekit.UserPacket, kind livekit.DataPacket_Ki func (r *Room) SetMetadata(metadata string) { r.Room.Metadata = metadata + r.lock.RLock() + r.sendRoomUpdateLocked() + r.lock.RUnlock() + + if r.onMetadataUpdate != nil { + r.onMetadataUpdate(metadata) + } +} + +func (r *Room) sendRoomUpdateLocked() { // Send update to participants - for _, p := range r.GetParticipants() { + for _, p := range r.participants { if !p.IsReady() { continue } @@ -413,10 +443,6 @@ func (r *Room) SetMetadata(metadata string) { r.Logger.Warnw("failed to send room update", err, "room", r.Room.Name, "participant", p.Identity()) } } - - if r.onMetadataUpdate != nil { - r.onMetadataUpdate(metadata) - } } func (r *Room) OnMetadataUpdate(f func(metadata string)) { diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index b0fa6022d..f2f6d8435 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -67,6 +67,7 @@ type Participant interface { CanSubscribe() bool CanPublishData() bool Hidden() bool + IsRecorder() bool SubscriberAsPrimary() bool Start() diff --git a/pkg/rtc/types/typesfakes/fake_participant.go b/pkg/rtc/types/typesfakes/fake_participant.go index 43c4f0b4a..cfc2373d3 100644 --- a/pkg/rtc/types/typesfakes/fake_participant.go +++ b/pkg/rtc/types/typesfakes/fake_participant.go @@ -266,6 +266,16 @@ type FakeParticipant struct { isReadyReturnsOnCall map[int]struct { result1 bool } + IsRecorderStub func() bool + isRecorderMutex sync.RWMutex + isRecorderArgsForCall []struct { + } + isRecorderReturns struct { + result1 bool + } + isRecorderReturnsOnCall map[int]struct { + result1 bool + } IsSubscribedToStub func(string) bool isSubscribedToMutex sync.RWMutex isSubscribedToArgsForCall []struct { @@ -1831,6 +1841,59 @@ func (fake *FakeParticipant) IsReadyReturnsOnCall(i int, result1 bool) { }{result1} } +func (fake *FakeParticipant) IsRecorder() bool { + fake.isRecorderMutex.Lock() + ret, specificReturn := fake.isRecorderReturnsOnCall[len(fake.isRecorderArgsForCall)] + fake.isRecorderArgsForCall = append(fake.isRecorderArgsForCall, struct { + }{}) + stub := fake.IsRecorderStub + fakeReturns := fake.isRecorderReturns + fake.recordInvocation("IsRecorder", []interface{}{}) + fake.isRecorderMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeParticipant) IsRecorderCallCount() int { + fake.isRecorderMutex.RLock() + defer fake.isRecorderMutex.RUnlock() + return len(fake.isRecorderArgsForCall) +} + +func (fake *FakeParticipant) IsRecorderCalls(stub func() bool) { + fake.isRecorderMutex.Lock() + defer fake.isRecorderMutex.Unlock() + fake.IsRecorderStub = stub +} + +func (fake *FakeParticipant) IsRecorderReturns(result1 bool) { + fake.isRecorderMutex.Lock() + defer fake.isRecorderMutex.Unlock() + fake.IsRecorderStub = nil + fake.isRecorderReturns = struct { + result1 bool + }{result1} +} + +func (fake *FakeParticipant) IsRecorderReturnsOnCall(i int, result1 bool) { + fake.isRecorderMutex.Lock() + defer fake.isRecorderMutex.Unlock() + fake.IsRecorderStub = nil + if fake.isRecorderReturnsOnCall == nil { + fake.isRecorderReturnsOnCall = make(map[int]struct { + result1 bool + }) + } + fake.isRecorderReturnsOnCall[i] = struct { + result1 bool + }{result1} +} + func (fake *FakeParticipant) IsSubscribedTo(arg1 string) bool { fake.isSubscribedToMutex.Lock() ret, specificReturn := fake.isSubscribedToReturnsOnCall[len(fake.isSubscribedToArgsForCall)] @@ -3139,6 +3202,8 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} { defer fake.identityMutex.RUnlock() fake.isReadyMutex.RLock() defer fake.isReadyMutex.RUnlock() + fake.isRecorderMutex.RLock() + defer fake.isRecorderMutex.RUnlock() fake.isSubscribedToMutex.RLock() defer fake.isSubscribedToMutex.RUnlock() fake.negotiateMutex.RLock() diff --git a/pkg/service/recordingservice.go b/pkg/service/recordingservice.go index a39bbf162..c1c531922 100644 --- a/pkg/service/recordingservice.go +++ b/pkg/service/recordingservice.go @@ -3,7 +3,6 @@ package service import ( "context" "errors" - "time" "github.com/livekit/protocol/logger" livekit "github.com/livekit/protocol/proto" @@ -63,8 +62,15 @@ func (s *RecordingService) StartRecording(ctx context.Context, req *livekit.Star return nil, err } + ri := &livekit.RecordingInfo{ + Id: recordingId, + Active: true, + } + if template := req.Input.(*livekit.StartRecordingRequest_Template); template != nil { + ri.RoomName = template.Template.RoomName + } logger.Debugw("recording started", "recordingID", recordingId) - s.telemetry.RecordingStarted(ctx, recordingId, req) + s.telemetry.RecordingStarted(ctx, ri) return &livekit.StartRecordingResponse{RecordingId: recordingId}, nil } @@ -85,6 +91,7 @@ func (s *RecordingService) AddOutput(ctx context.Context, req *livekit.AddOutput if err != nil { return nil, err } + return &emptypb.Empty{}, nil } @@ -104,6 +111,7 @@ func (s *RecordingService) RemoveOutput(ctx context.Context, req *livekit.Remove if err != nil { return nil, err } + return &emptypb.Empty{}, nil } @@ -140,7 +148,7 @@ func (s *RecordingService) resultsWorker() { case msg := <-resChan: b := sub.Payload(msg) - res := &livekit.RecordingResult{} + res := &livekit.RecordingInfo{} if err = proto.Unmarshal(b, res); err != nil { logger.Errorw("failed to read results", err) continue @@ -150,11 +158,6 @@ func (s *RecordingService) resultsWorker() { values := []interface{}{"recordingID", res.Id} if res.Error != "" { values = append(values, "error", res.Error) - } else { - values = append(values, "duration", time.Duration(res.Duration*1e9)) - if res.DownloadUrl != "" { - values = append(values, "url", res.DownloadUrl) - } } logger.Debugw("recording ended", values...) diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index 5aa74819d..e4280942b 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -91,6 +91,7 @@ func (s *RTCService) validate(r *http.Request) (string, routing.ParticipantInit, AutoSubscribe: true, Metadata: claims.Metadata, Hidden: claims.Video.Hidden, + Recorder: claims.Video.Recorder, Client: s.parseClientInfo(r.Form), } if autoSubParam != "" { diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 29265178c..fafb4ac46 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -1,8 +1,7 @@ // Code generated by Wire. DO NOT EDIT. //go:generate go run github.com/google/wire/cmd/wire -//go:build !wireinject -// +build !wireinject +//+build !wireinject package service diff --git a/pkg/telemetry/events.go b/pkg/telemetry/events.go index 127c009c9..3ebbac09c 100644 --- a/pkg/telemetry/events.go +++ b/pkg/telemetry/events.go @@ -144,32 +144,29 @@ func (t *telemetryService) TrackUnsubscribed(ctx context.Context, participantID }) } -func (t *telemetryService) RecordingStarted(ctx context.Context, recordingID string, req *livekit.StartRecordingRequest) { +func (t *telemetryService) RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo) { t.notifyEvent(ctx, &livekit.WebhookEvent{ - Event: webhook.EventRecordingStarted, - RecordingInfo: &livekit.RecordingInfo{ - Id: recordingID, - Request: req, - }, + Event: webhook.EventRecordingStarted, + RecordingInfo: ri, }) t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ Type: livekit.AnalyticsEventType_RECORDING_STARTED, Timestamp: timestamppb.Now(), - RecordingId: recordingID, + RecordingId: ri.Id, }) } -func (t *telemetryService) RecordingEnded(ctx context.Context, res *livekit.RecordingResult) { +func (t *telemetryService) RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo) { t.notifyEvent(ctx, &livekit.WebhookEvent{ - Event: webhook.EventRecordingFinished, - RecordingResult: res, + Event: webhook.EventRecordingFinished, + RecordingInfo: ri, }) t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{ Type: livekit.AnalyticsEventType_RECORDING_ENDED, Timestamp: timestamppb.Now(), - RecordingId: res.Id, + RecordingId: ri.Id, }) } diff --git a/pkg/telemetry/service.go b/pkg/telemetry/service.go index 213c8a1b5..8176ebc8a 100644 --- a/pkg/telemetry/service.go +++ b/pkg/telemetry/service.go @@ -30,8 +30,8 @@ type TelemetryService interface { TrackUnpublished(ctx context.Context, participantID string, track *livekit.TrackInfo, ssrc uint32) TrackSubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) TrackUnsubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo) - RecordingStarted(ctx context.Context, recordingID string, req *livekit.StartRecordingRequest) - RecordingEnded(ctx context.Context, res *livekit.RecordingResult) + RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo) + RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo) } type telemetryService struct {