diff --git a/cmd/server/commands.go b/cmd/server/commands.go index 73ff60f61..aa22f5e0a 100644 --- a/cmd/server/commands.go +++ b/cmd/server/commands.go @@ -103,6 +103,11 @@ func createToken(c *cli.Context) error { RoomJoin: true, Room: room, } + if c.Bool("recorder") { + grant.Hidden = true + grant.CanSubscribe = true + } + at := auth.NewAccessToken(apiKey, apiSecret). AddGrant(grant). SetIdentity(identity). diff --git a/cmd/server/main.go b/cmd/server/main.go index 86296bae7..1e6ae002b 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -118,6 +118,11 @@ func main() { Usage: "identity of participant that holds the token", Required: true, }, + &cli.BoolFlag{ + Name: "recorder", + Usage: "creates a hidden participant that can only subscribe", + Required: false, + }, }, }, }, diff --git a/go.mod b/go.mod index fc5335e05..fe27936d1 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/google/wire v0.5.0 github.com/gorilla/websocket v1.4.2 github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b - github.com/livekit/protocol v0.5.6 + github.com/livekit/protocol v0.5.8 github.com/magefile/mage v1.11.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0 github.com/mitchellh/go-homedir v1.1.0 diff --git a/go.sum b/go.sum index 0302304bb..ff0775104 100644 --- a/go.sum +++ b/go.sum @@ -237,8 +237,8 @@ github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= github.com/livekit/ion-sfu v1.20.3 h1:1xfWnbg3tgXHLQWDaRQvuD/DtIOp72hAHDaDutP/i6M= github.com/livekit/ion-sfu v1.20.3/go.mod h1:dEdOG4KSqIftr5HxxqciNKBIdu0v3OD0ZYL7A3J09KA= -github.com/livekit/protocol v0.5.6 h1:vMUjYvJH2TD/WIjrXtG6aQl5fFrIj0/ZgIctQqmmwro= -github.com/livekit/protocol v0.5.6/go.mod h1:wo3CGfYB7XMF8GoVJAfTARrYSP/ombi+sbLl6AYdKP0= +github.com/livekit/protocol v0.5.8 h1:gcHZ1afxtmYrs8TvL1HqlRDFrF7SjnTbxCNWZGM5kik= +github.com/livekit/protocol v0.5.8/go.mod h1:wo3CGfYB7XMF8GoVJAfTARrYSP/ombi+sbLl6AYdKP0= github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ= github.com/lucsky/cuid v1.0.2/go.mod h1:QaaJqckboimOmhRSJXSx/+IT+VTfxfPGSo/6mfgUfmE= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= diff --git a/pkg/routing/interfaces.go b/pkg/routing/interfaces.go index 2a5ba26da..fdae4d994 100644 --- a/pkg/routing/interfaces.go +++ b/pkg/routing/interfaces.go @@ -31,6 +31,7 @@ type ParticipantInit struct { ProtocolVersion int32 UsePlanB bool AutoSubscribe bool + Hidden bool } type NewParticipantCallback func(roomName string, pi ParticipantInit, requestSource MessageSource, responseSink MessageSink) diff --git a/pkg/rtc/helper_test.go b/pkg/rtc/helper_test.go index fcea38a34..4d281b220 100644 --- a/pkg/rtc/helper_test.go +++ b/pkg/rtc/helper_test.go @@ -1,20 +1,22 @@ package rtc_test import ( + "github.com/livekit/protocol/utils" + "github.com/livekit/livekit-server/pkg/rtc/types" "github.com/livekit/livekit-server/pkg/rtc/types/typesfakes" livekit "github.com/livekit/livekit-server/proto" - "github.com/livekit/protocol/utils" ) -func newMockParticipant(identity string, protocol types.ProtocolVersion) *typesfakes.FakeParticipant { +func newMockParticipant(identity string, protocol types.ProtocolVersion, hidden bool) *typesfakes.FakeParticipant { p := &typesfakes.FakeParticipant{} p.IDReturns(utils.NewGuid(utils.ParticipantPrefix)) p.IdentityReturns(identity) p.StateReturns(livekit.ParticipantInfo_JOINED) p.ProtocolVersionReturns(protocol) p.CanSubscribeReturns(true) - p.CanPublishReturns(true) + p.CanPublishReturns(!hidden) + p.HiddenReturns(hidden) p.SetMetadataStub = func(m string) { var f func(participant types.Participant) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index a01ddc280..e750a0147 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -39,6 +39,7 @@ type ParticipantParams struct { Stats *RoomStatsReporter ThrottleConfig config.PLIThrottleConfig EnabledCodecs []*livekit.Codec + Hidden bool } type ParticipantImpl struct { @@ -195,6 +196,7 @@ func (p *ParticipantImpl) ToProto() *livekit.ParticipantInfo { Metadata: p.metadata, State: p.State(), JoinedAt: p.ConnectedAt().Unix(), + Hidden: p.Hidden(), } p.lock.RLock() @@ -583,6 +585,10 @@ func (p *ParticipantImpl) CanSubscribe() bool { return p.permission == nil || p.permission.CanSubscribe } +func (p *ParticipantImpl) Hidden() bool { + return p.params.Hidden +} + func (p *ParticipantImpl) SubscriberPC() *webrtc.PeerConnection { return p.subscriber.pc } diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index bf75f34e4..81ce27314 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -199,7 +199,7 @@ func (r *Room) Join(participant types.Participant, opts *ParticipantOptions) err // gather other participants and send join response otherParticipants := make([]types.Participant, 0, len(r.participants)) for _, p := range r.participants { - if p.ID() != participant.ID() { + if p.ID() != participant.ID() && !p.Hidden() { otherParticipants = append(otherParticipants, p) } } @@ -295,10 +295,15 @@ func (r *Room) CloseIfEmpty() { } r.lock.RLock() - numParticipants := len(r.participants) + visibleParticipants := 0 + for _, p := range r.participants { + if !p.Hidden() { + visibleParticipants++ + } + } r.lock.RUnlock() - if numParticipants > 0 { + if visibleParticipants > 0 { return } @@ -472,6 +477,19 @@ func (r *Room) subscribeToExistingTracks(p types.Participant) { // broadcast an update about participant p func (r *Room) broadcastParticipantState(p types.Participant, skipSource bool) { + if p.Hidden() { + if !skipSource { + // send update only to hidden participant + updates := ToProtoParticipants([]types.Participant{p}) + err := p.SendParticipantUpdate(updates) + if err != nil { + logger.Errorw("could not send update to participant", err, + "participant", p.Identity(), "pID", p.ID()) + } + } + return + } + updates := ToProtoParticipants([]types.Participant{p}) participants := r.GetParticipants() for _, op := range participants { diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index 5cd3448fb..2b2c86742 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -5,9 +5,10 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/testutils" - "github.com/stretchr/testify/require" "github.com/livekit/livekit-server/pkg/logger" "github.com/livekit/livekit-server/pkg/rtc" @@ -59,7 +60,7 @@ func TestJoinedState(t *testing.T) { func TestRoomJoin(t *testing.T) { t.Run("joining returns existing participant data", func(t *testing.T) { rm := newRoomWithParticipants(t, testRoomOpts{num: numParticipants}) - pNew := newMockParticipant("new", types.DefaultProtocol) + pNew := newMockParticipant("new", types.DefaultProtocol, false) rm.Join(pNew, nil) @@ -74,7 +75,7 @@ func TestRoomJoin(t *testing.T) { t.Run("subscribe to existing channels upon join", func(t *testing.T) { numExisting := 3 rm := newRoomWithParticipants(t, testRoomOpts{num: numExisting}) - p := newMockParticipant("new", types.DefaultProtocol) + p := newMockParticipant("new", types.DefaultProtocol, false) err := rm.Join(p, &rtc.ParticipantOptions{AutoSubscribe: true}) require.NoError(t, err) @@ -128,7 +129,7 @@ func TestRoomJoin(t *testing.T) { t.Run("cannot exceed max participants", func(t *testing.T) { rm := newRoomWithParticipants(t, testRoomOpts{num: 1}) rm.Room.MaxParticipants = 1 - p := newMockParticipant("second", types.ProtocolVersion(0)) + p := newMockParticipant("second", types.ProtocolVersion(0), false) err := rm.Join(p, nil) require.Equal(t, rtc.ErrMaxParticipantsExceeded, err) @@ -455,8 +456,50 @@ func TestDataChannel(t *testing.T) { }) } +func TestHiddenParticipants(t *testing.T) { + t.Run("other participants don't receive hidden updates", func(t *testing.T) { + rm := newRoomWithParticipants(t, testRoomOpts{num: 2, numHidden: 1}) + defer rm.Close() + + pNew := newMockParticipant("new", types.DefaultProtocol, false) + rm.Join(pNew, nil) + + // expect new participant to get a JoinReply + info, participants, iceServers := pNew.SendJoinResponseArgsForCall(0) + require.Equal(t, info.Sid, rm.Room.Sid) + require.Len(t, participants, 2) + require.Len(t, rm.GetParticipants(), 4) + require.NotEmpty(t, iceServers) + }) + + t.Run("hidden participant subscribes to tracks", func(t *testing.T) { + rm := newRoomWithParticipants(t, testRoomOpts{num: 2, numHidden: 1}) + p := newMockParticipant("new", types.DefaultProtocol, false) + + err := rm.Join(p, &rtc.ParticipantOptions{AutoSubscribe: true}) + require.NoError(t, err) + + stateChangeCB := p.OnStateChangeArgsForCall(0) + require.NotNil(t, stateChangeCB) + p.StateReturns(livekit.ParticipantInfo_ACTIVE) + stateChangeCB(p, livekit.ParticipantInfo_JOINED) + + // it should become a subscriber when connectivity changes + for _, op := range rm.GetParticipants() { + if p == op { + continue + } + mockP := op.(*typesfakes.FakeParticipant) + require.NotZero(t, mockP.AddSubscriberCallCount()) + // last call should be to add the newest participant + require.Equal(t, p, mockP.AddSubscriberArgsForCall(mockP.AddSubscriberCallCount()-1)) + } + }) +} + type testRoomOpts struct { num int + numHidden int protocol types.ProtocolVersion audioSmoothIntervals uint32 } @@ -477,9 +520,9 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *rtc.Room { SmoothIntervals: opts.audioSmoothIntervals, }, ) - for i := 0; i < opts.num; i++ { + for i := 0; i < opts.num+opts.numHidden; i++ { identity := fmt.Sprintf("p%d", i) - participant := newMockParticipant(identity, opts.protocol) + participant := newMockParticipant(identity, opts.protocol, i >= opts.num) err := rm.Join(participant, &rtc.ParticipantOptions{AutoSubscribe: true}) participant.StateReturns(livekit.ParticipantInfo_ACTIVE) require.NoError(t, err) diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 226e732c6..ba08fb63c 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -57,6 +57,7 @@ type Participant interface { CanPublish() bool CanSubscribe() bool + Hidden() bool Start() Close() error diff --git a/pkg/rtc/types/typesfakes/fake_participant.go b/pkg/rtc/types/typesfakes/fake_participant.go index 6b64d29c0..22e54235d 100644 --- a/pkg/rtc/types/typesfakes/fake_participant.go +++ b/pkg/rtc/types/typesfakes/fake_participant.go @@ -165,6 +165,16 @@ type FakeParticipant struct { result1 webrtc.SessionDescription result2 error } + HiddenStub func() bool + hiddenMutex sync.RWMutex + hiddenArgsForCall []struct { + } + hiddenReturns struct { + result1 bool + } + hiddenReturnsOnCall map[int]struct { + result1 bool + } ICERestartStub func() error iCERestartMutex sync.RWMutex iCERestartArgsForCall []struct { @@ -1191,6 +1201,59 @@ func (fake *FakeParticipant) HandleOfferReturnsOnCall(i int, result1 webrtc.Sess }{result1, result2} } +func (fake *FakeParticipant) Hidden() bool { + fake.hiddenMutex.Lock() + ret, specificReturn := fake.hiddenReturnsOnCall[len(fake.hiddenArgsForCall)] + fake.hiddenArgsForCall = append(fake.hiddenArgsForCall, struct { + }{}) + stub := fake.HiddenStub + fakeReturns := fake.hiddenReturns + fake.recordInvocation("Hidden", []interface{}{}) + fake.hiddenMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeParticipant) HiddenCallCount() int { + fake.hiddenMutex.RLock() + defer fake.hiddenMutex.RUnlock() + return len(fake.hiddenArgsForCall) +} + +func (fake *FakeParticipant) HiddenCalls(stub func() bool) { + fake.hiddenMutex.Lock() + defer fake.hiddenMutex.Unlock() + fake.HiddenStub = stub +} + +func (fake *FakeParticipant) HiddenReturns(result1 bool) { + fake.hiddenMutex.Lock() + defer fake.hiddenMutex.Unlock() + fake.HiddenStub = nil + fake.hiddenReturns = struct { + result1 bool + }{result1} +} + +func (fake *FakeParticipant) HiddenReturnsOnCall(i int, result1 bool) { + fake.hiddenMutex.Lock() + defer fake.hiddenMutex.Unlock() + fake.HiddenStub = nil + if fake.hiddenReturnsOnCall == nil { + fake.hiddenReturnsOnCall = make(map[int]struct { + result1 bool + }) + } + fake.hiddenReturnsOnCall[i] = struct { + result1 bool + }{result1} +} + func (fake *FakeParticipant) ICERestart() error { fake.iCERestartMutex.Lock() ret, specificReturn := fake.iCERestartReturnsOnCall[len(fake.iCERestartArgsForCall)] @@ -2507,6 +2570,8 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} { defer fake.handleAnswerMutex.RUnlock() fake.handleOfferMutex.RLock() defer fake.handleOfferMutex.RUnlock() + fake.hiddenMutex.RLock() + defer fake.hiddenMutex.RUnlock() fake.iCERestartMutex.RLock() defer fake.iCERestartMutex.RUnlock() fake.iDMutex.RLock() diff --git a/pkg/service/redisroomstore_test.go b/pkg/service/redisroomstore_test.go index a76321de4..233686ced 100644 --- a/pkg/service/redisroomstore_test.go +++ b/pkg/service/redisroomstore_test.go @@ -6,9 +6,10 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/livekit/livekit-server/pkg/service" livekit "github.com/livekit/livekit-server/proto" - "github.com/stretchr/testify/require" ) func TestParticipantPersistence(t *testing.T) { diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 45cf94d5e..4826a2db8 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -291,6 +291,7 @@ func (r *RoomManager) StartSession(roomName string, pi routing.ParticipantInit, Stats: room.GetStatsReporter(), ThrottleConfig: r.config.RTC.PLIThrottle, EnabledCodecs: room.Room.EnabledCodecs, + Hidden: pi.Hidden, }) if err != nil { logger.Errorw("could not create participant", err) diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index ce834574b..93f20c3c0 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -81,6 +81,7 @@ func (s *RTCService) validate(r *http.Request) (string, routing.ParticipantInit, UsePlanB: boolValue(planBParam), AutoSubscribe: true, Metadata: claims.Metadata, + Hidden: claims.Video.Hidden, } if autoSubParam != "" { pi.AutoSubscribe = boolValue(autoSubParam)