ActiveRecording (#234)

* ActiveRecording

* regenerate

* update to 0.10.3

* 1.17
This commit is contained in:
David Colburn
2021-12-03 21:40:53 -08:00
committed by GitHub
parent c00d799ac6
commit c41384cd09
14 changed files with 136 additions and 33 deletions

View File

@@ -110,6 +110,7 @@ func createToken(c *cli.Context) error {
}
if c.Bool("recorder") {
grant.Hidden = true
grant.Recorder = true
grant.SetCanPublish(false)
grant.SetCanPublishData(false)
}

2
go.mod
View File

@@ -15,7 +15,7 @@ require (
github.com/gorilla/websocket v1.4.2
github.com/hashicorp/golang-lru v0.5.4
github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b
github.com/livekit/protocol v0.10.2
github.com/livekit/protocol v0.10.3
github.com/magefile/mage v1.11.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0
github.com/mitchellh/go-homedir v1.1.0

4
go.sum
View File

@@ -132,8 +132,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA=
github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/protocol v0.10.2 h1:ZnSM1MfiHr70mTsMqVbq9QnotGGyQNqUg32pWrSfXzg=
github.com/livekit/protocol v0.10.2/go.mod h1:YoHW9YbWbPnuVsgwBB4hAINKT+V68jmfh9zXBSSn6Wg=
github.com/livekit/protocol v0.10.3 h1:Jofe/vmPc9DB1o7T7nhMZxjIhuThlQBLU5F9Ju0WQjg=
github.com/livekit/protocol v0.10.3/go.mod h1:YoHW9YbWbPnuVsgwBB4hAINKT+V68jmfh9zXBSSn6Wg=
github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls=
github.com/magefile/mage v1.11.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0=

View File

@@ -35,6 +35,7 @@ type ParticipantInit struct {
Permission *livekit.ParticipantPermission
AutoSubscribe bool
Hidden bool
Recorder bool
Client *livekit.ClientInfo
}

View File

@@ -157,6 +157,7 @@ func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName strin
Permission: pi.Permission,
AutoSubscribe: pi.AutoSubscribe,
Hidden: pi.Hidden,
Recorder: pi.Recorder,
Client: pi.Client,
})
if err != nil {
@@ -244,6 +245,7 @@ func (r *RedisRouter) startParticipantRTC(ss *livekit.StartSession, participantK
Client: ss.Client,
AutoSubscribe: ss.AutoSubscribe,
Hidden: ss.Hidden,
Recorder: ss.Recorder,
}
reqChan := r.getOrCreateMessageChannel(r.requestChannels, participantKey)

View File

@@ -9,8 +9,6 @@ import (
"time"
lru "github.com/hashicorp/golang-lru"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/sfu/twcc"
"github.com/livekit/protocol/logger"
livekit "github.com/livekit/protocol/proto"
"github.com/livekit/protocol/utils"
@@ -19,6 +17,9 @@ import (
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"
"github.com/livekit/livekit-server/pkg/sfu"
"github.com/livekit/livekit-server/pkg/sfu/twcc"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc/types"
@@ -43,6 +44,7 @@ type ParticipantParams struct {
ThrottleConfig config.PLIThrottleConfig
EnabledCodecs []*livekit.Codec
Hidden bool
Recorder bool
Logger logger.Logger
}
@@ -235,6 +237,7 @@ func (p *ParticipantImpl) ToProto() *livekit.ParticipantInfo {
State: p.State(),
JoinedAt: p.ConnectedAt().Unix(),
Hidden: p.Hidden(),
Recorder: p.IsRecorder(),
}
p.lock.RLock()
@@ -758,6 +761,10 @@ func (p *ParticipantImpl) Hidden() bool {
return p.params.Hidden
}
func (p *ParticipantImpl) IsRecorder() bool {
return p.params.Recorder
}
func (p *ParticipantImpl) SubscriberAsPrimary() bool {
return p.ProtocolVersion().SubscriberAsPrimary() && p.CanSubscribe()
}

View File

@@ -198,6 +198,11 @@ func (r *Room) Join(participant types.Participant, opts *ParticipantOptions, ice
"room", r.Room.Name,
"roomID", r.Room.Sid)
if participant.IsRecorder() && !r.Room.ActiveRecording {
r.Room.ActiveRecording = true
r.sendRoomUpdateLocked()
}
r.participants[participant.Identity()] = participant
r.participantOpts[participant.Identity()] = opts
@@ -264,7 +269,22 @@ func (r *Room) RemoveParticipant(identity string) {
}
}
activeRecording := false
if (p != nil && p.IsRecorder()) || p == nil && r.Room.ActiveRecording {
for _, op := range r.participants {
if op.IsRecorder() {
activeRecording = true
break
}
}
}
if r.Room.ActiveRecording != activeRecording {
r.Room.ActiveRecording = activeRecording
r.sendRoomUpdateLocked()
}
r.lock.Unlock()
if !ok {
return
}
@@ -402,8 +422,18 @@ func (r *Room) SendDataPacket(up *livekit.UserPacket, kind livekit.DataPacket_Ki
func (r *Room) SetMetadata(metadata string) {
r.Room.Metadata = metadata
r.lock.RLock()
r.sendRoomUpdateLocked()
r.lock.RUnlock()
if r.onMetadataUpdate != nil {
r.onMetadataUpdate(metadata)
}
}
func (r *Room) sendRoomUpdateLocked() {
// Send update to participants
for _, p := range r.GetParticipants() {
for _, p := range r.participants {
if !p.IsReady() {
continue
}
@@ -413,10 +443,6 @@ func (r *Room) SetMetadata(metadata string) {
r.Logger.Warnw("failed to send room update", err, "room", r.Room.Name, "participant", p.Identity())
}
}
if r.onMetadataUpdate != nil {
r.onMetadataUpdate(metadata)
}
}
func (r *Room) OnMetadataUpdate(f func(metadata string)) {

View File

@@ -67,6 +67,7 @@ type Participant interface {
CanSubscribe() bool
CanPublishData() bool
Hidden() bool
IsRecorder() bool
SubscriberAsPrimary() bool
Start()

View File

@@ -266,6 +266,16 @@ type FakeParticipant struct {
isReadyReturnsOnCall map[int]struct {
result1 bool
}
IsRecorderStub func() bool
isRecorderMutex sync.RWMutex
isRecorderArgsForCall []struct {
}
isRecorderReturns struct {
result1 bool
}
isRecorderReturnsOnCall map[int]struct {
result1 bool
}
IsSubscribedToStub func(string) bool
isSubscribedToMutex sync.RWMutex
isSubscribedToArgsForCall []struct {
@@ -1831,6 +1841,59 @@ func (fake *FakeParticipant) IsReadyReturnsOnCall(i int, result1 bool) {
}{result1}
}
func (fake *FakeParticipant) IsRecorder() bool {
fake.isRecorderMutex.Lock()
ret, specificReturn := fake.isRecorderReturnsOnCall[len(fake.isRecorderArgsForCall)]
fake.isRecorderArgsForCall = append(fake.isRecorderArgsForCall, struct {
}{})
stub := fake.IsRecorderStub
fakeReturns := fake.isRecorderReturns
fake.recordInvocation("IsRecorder", []interface{}{})
fake.isRecorderMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeParticipant) IsRecorderCallCount() int {
fake.isRecorderMutex.RLock()
defer fake.isRecorderMutex.RUnlock()
return len(fake.isRecorderArgsForCall)
}
func (fake *FakeParticipant) IsRecorderCalls(stub func() bool) {
fake.isRecorderMutex.Lock()
defer fake.isRecorderMutex.Unlock()
fake.IsRecorderStub = stub
}
func (fake *FakeParticipant) IsRecorderReturns(result1 bool) {
fake.isRecorderMutex.Lock()
defer fake.isRecorderMutex.Unlock()
fake.IsRecorderStub = nil
fake.isRecorderReturns = struct {
result1 bool
}{result1}
}
func (fake *FakeParticipant) IsRecorderReturnsOnCall(i int, result1 bool) {
fake.isRecorderMutex.Lock()
defer fake.isRecorderMutex.Unlock()
fake.IsRecorderStub = nil
if fake.isRecorderReturnsOnCall == nil {
fake.isRecorderReturnsOnCall = make(map[int]struct {
result1 bool
})
}
fake.isRecorderReturnsOnCall[i] = struct {
result1 bool
}{result1}
}
func (fake *FakeParticipant) IsSubscribedTo(arg1 string) bool {
fake.isSubscribedToMutex.Lock()
ret, specificReturn := fake.isSubscribedToReturnsOnCall[len(fake.isSubscribedToArgsForCall)]
@@ -3139,6 +3202,8 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} {
defer fake.identityMutex.RUnlock()
fake.isReadyMutex.RLock()
defer fake.isReadyMutex.RUnlock()
fake.isRecorderMutex.RLock()
defer fake.isRecorderMutex.RUnlock()
fake.isSubscribedToMutex.RLock()
defer fake.isSubscribedToMutex.RUnlock()
fake.negotiateMutex.RLock()

View File

@@ -3,7 +3,6 @@ package service
import (
"context"
"errors"
"time"
"github.com/livekit/protocol/logger"
livekit "github.com/livekit/protocol/proto"
@@ -63,8 +62,15 @@ func (s *RecordingService) StartRecording(ctx context.Context, req *livekit.Star
return nil, err
}
ri := &livekit.RecordingInfo{
Id: recordingId,
Active: true,
}
if template := req.Input.(*livekit.StartRecordingRequest_Template); template != nil {
ri.RoomName = template.Template.RoomName
}
logger.Debugw("recording started", "recordingID", recordingId)
s.telemetry.RecordingStarted(ctx, recordingId, req)
s.telemetry.RecordingStarted(ctx, ri)
return &livekit.StartRecordingResponse{RecordingId: recordingId}, nil
}
@@ -85,6 +91,7 @@ func (s *RecordingService) AddOutput(ctx context.Context, req *livekit.AddOutput
if err != nil {
return nil, err
}
return &emptypb.Empty{}, nil
}
@@ -104,6 +111,7 @@ func (s *RecordingService) RemoveOutput(ctx context.Context, req *livekit.Remove
if err != nil {
return nil, err
}
return &emptypb.Empty{}, nil
}
@@ -140,7 +148,7 @@ func (s *RecordingService) resultsWorker() {
case msg := <-resChan:
b := sub.Payload(msg)
res := &livekit.RecordingResult{}
res := &livekit.RecordingInfo{}
if err = proto.Unmarshal(b, res); err != nil {
logger.Errorw("failed to read results", err)
continue
@@ -150,11 +158,6 @@ func (s *RecordingService) resultsWorker() {
values := []interface{}{"recordingID", res.Id}
if res.Error != "" {
values = append(values, "error", res.Error)
} else {
values = append(values, "duration", time.Duration(res.Duration*1e9))
if res.DownloadUrl != "" {
values = append(values, "url", res.DownloadUrl)
}
}
logger.Debugw("recording ended", values...)

View File

@@ -91,6 +91,7 @@ func (s *RTCService) validate(r *http.Request) (string, routing.ParticipantInit,
AutoSubscribe: true,
Metadata: claims.Metadata,
Hidden: claims.Video.Hidden,
Recorder: claims.Video.Recorder,
Client: s.parseClientInfo(r.Form),
}
if autoSubParam != "" {

View File

@@ -1,8 +1,7 @@
// Code generated by Wire. DO NOT EDIT.
//go:generate go run github.com/google/wire/cmd/wire
//go:build !wireinject
// +build !wireinject
//+build !wireinject
package service

View File

@@ -144,32 +144,29 @@ func (t *telemetryService) TrackUnsubscribed(ctx context.Context, participantID
})
}
func (t *telemetryService) RecordingStarted(ctx context.Context, recordingID string, req *livekit.StartRecordingRequest) {
func (t *telemetryService) RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo) {
t.notifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventRecordingStarted,
RecordingInfo: &livekit.RecordingInfo{
Id: recordingID,
Request: req,
},
Event: webhook.EventRecordingStarted,
RecordingInfo: ri,
})
t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_RECORDING_STARTED,
Timestamp: timestamppb.Now(),
RecordingId: recordingID,
RecordingId: ri.Id,
})
}
func (t *telemetryService) RecordingEnded(ctx context.Context, res *livekit.RecordingResult) {
func (t *telemetryService) RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo) {
t.notifyEvent(ctx, &livekit.WebhookEvent{
Event: webhook.EventRecordingFinished,
RecordingResult: res,
Event: webhook.EventRecordingFinished,
RecordingInfo: ri,
})
t.analytics.SendEvent(ctx, &livekit.AnalyticsEvent{
Type: livekit.AnalyticsEventType_RECORDING_ENDED,
Timestamp: timestamppb.Now(),
RecordingId: res.Id,
RecordingId: ri.Id,
})
}

View File

@@ -30,8 +30,8 @@ type TelemetryService interface {
TrackUnpublished(ctx context.Context, participantID string, track *livekit.TrackInfo, ssrc uint32)
TrackSubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo)
TrackUnsubscribed(ctx context.Context, participantID string, track *livekit.TrackInfo)
RecordingStarted(ctx context.Context, recordingID string, req *livekit.StartRecordingRequest)
RecordingEnded(ctx context.Context, res *livekit.RecordingResult)
RecordingStarted(ctx context.Context, ri *livekit.RecordingInfo)
RecordingEnded(ctx context.Context, ri *livekit.RecordingInfo)
}
type telemetryService struct {