Hidden participants (#65)

Enables hidden recording and/or forwarding participants
This commit is contained in:
David Colburn
2021-07-19 18:33:28 -05:00
committed by GitHub
parent 87dfb2b117
commit 4bab395ac9
14 changed files with 165 additions and 16 deletions

View File

@@ -103,6 +103,11 @@ func createToken(c *cli.Context) error {
RoomJoin: true,
Room: room,
}
if c.Bool("recorder") {
grant.Hidden = true
grant.CanSubscribe = true
}
at := auth.NewAccessToken(apiKey, apiSecret).
AddGrant(grant).
SetIdentity(identity).

View File

@@ -118,6 +118,11 @@ func main() {
Usage: "identity of participant that holds the token",
Required: true,
},
&cli.BoolFlag{
Name: "recorder",
Usage: "creates a hidden participant that can only subscribe",
Required: false,
},
},
},
},

2
go.mod
View File

@@ -11,7 +11,7 @@ require (
github.com/google/wire v0.5.0
github.com/gorilla/websocket v1.4.2
github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b
github.com/livekit/protocol v0.5.6
github.com/livekit/protocol v0.5.8
github.com/magefile/mage v1.11.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0
github.com/mitchellh/go-homedir v1.1.0

4
go.sum
View File

@@ -237,8 +237,8 @@ github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY
github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/livekit/ion-sfu v1.20.3 h1:1xfWnbg3tgXHLQWDaRQvuD/DtIOp72hAHDaDutP/i6M=
github.com/livekit/ion-sfu v1.20.3/go.mod h1:dEdOG4KSqIftr5HxxqciNKBIdu0v3OD0ZYL7A3J09KA=
github.com/livekit/protocol v0.5.6 h1:vMUjYvJH2TD/WIjrXtG6aQl5fFrIj0/ZgIctQqmmwro=
github.com/livekit/protocol v0.5.6/go.mod h1:wo3CGfYB7XMF8GoVJAfTARrYSP/ombi+sbLl6AYdKP0=
github.com/livekit/protocol v0.5.8 h1:gcHZ1afxtmYrs8TvL1HqlRDFrF7SjnTbxCNWZGM5kik=
github.com/livekit/protocol v0.5.8/go.mod h1:wo3CGfYB7XMF8GoVJAfTARrYSP/ombi+sbLl6AYdKP0=
github.com/lucsky/cuid v1.0.2 h1:z4XlExeoderxoPj2/dxKOyPxe9RCOu7yNq9/XWxIUMQ=
github.com/lucsky/cuid v1.0.2/go.mod h1:QaaJqckboimOmhRSJXSx/+IT+VTfxfPGSo/6mfgUfmE=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=

View File

@@ -31,6 +31,7 @@ type ParticipantInit struct {
ProtocolVersion int32
UsePlanB bool
AutoSubscribe bool
Hidden bool
}
type NewParticipantCallback func(roomName string, pi ParticipantInit, requestSource MessageSource, responseSink MessageSink)

View File

@@ -1,20 +1,22 @@
package rtc_test
import (
"github.com/livekit/protocol/utils"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/rtc/types/typesfakes"
livekit "github.com/livekit/livekit-server/proto"
"github.com/livekit/protocol/utils"
)
func newMockParticipant(identity string, protocol types.ProtocolVersion) *typesfakes.FakeParticipant {
func newMockParticipant(identity string, protocol types.ProtocolVersion, hidden bool) *typesfakes.FakeParticipant {
p := &typesfakes.FakeParticipant{}
p.IDReturns(utils.NewGuid(utils.ParticipantPrefix))
p.IdentityReturns(identity)
p.StateReturns(livekit.ParticipantInfo_JOINED)
p.ProtocolVersionReturns(protocol)
p.CanSubscribeReturns(true)
p.CanPublishReturns(true)
p.CanPublishReturns(!hidden)
p.HiddenReturns(hidden)
p.SetMetadataStub = func(m string) {
var f func(participant types.Participant)

View File

@@ -39,6 +39,7 @@ type ParticipantParams struct {
Stats *RoomStatsReporter
ThrottleConfig config.PLIThrottleConfig
EnabledCodecs []*livekit.Codec
Hidden bool
}
type ParticipantImpl struct {
@@ -195,6 +196,7 @@ func (p *ParticipantImpl) ToProto() *livekit.ParticipantInfo {
Metadata: p.metadata,
State: p.State(),
JoinedAt: p.ConnectedAt().Unix(),
Hidden: p.Hidden(),
}
p.lock.RLock()
@@ -583,6 +585,10 @@ func (p *ParticipantImpl) CanSubscribe() bool {
return p.permission == nil || p.permission.CanSubscribe
}
func (p *ParticipantImpl) Hidden() bool {
return p.params.Hidden
}
func (p *ParticipantImpl) SubscriberPC() *webrtc.PeerConnection {
return p.subscriber.pc
}

View File

@@ -199,7 +199,7 @@ func (r *Room) Join(participant types.Participant, opts *ParticipantOptions) err
// gather other participants and send join response
otherParticipants := make([]types.Participant, 0, len(r.participants))
for _, p := range r.participants {
if p.ID() != participant.ID() {
if p.ID() != participant.ID() && !p.Hidden() {
otherParticipants = append(otherParticipants, p)
}
}
@@ -295,10 +295,15 @@ func (r *Room) CloseIfEmpty() {
}
r.lock.RLock()
numParticipants := len(r.participants)
visibleParticipants := 0
for _, p := range r.participants {
if !p.Hidden() {
visibleParticipants++
}
}
r.lock.RUnlock()
if numParticipants > 0 {
if visibleParticipants > 0 {
return
}
@@ -472,6 +477,19 @@ func (r *Room) subscribeToExistingTracks(p types.Participant) {
// broadcast an update about participant p
func (r *Room) broadcastParticipantState(p types.Participant, skipSource bool) {
if p.Hidden() {
if !skipSource {
// send update only to hidden participant
updates := ToProtoParticipants([]types.Participant{p})
err := p.SendParticipantUpdate(updates)
if err != nil {
logger.Errorw("could not send update to participant", err,
"participant", p.Identity(), "pID", p.ID())
}
}
return
}
updates := ToProtoParticipants([]types.Participant{p})
participants := r.GetParticipants()
for _, op := range participants {

View File

@@ -5,9 +5,10 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/testutils"
"github.com/stretchr/testify/require"
"github.com/livekit/livekit-server/pkg/logger"
"github.com/livekit/livekit-server/pkg/rtc"
@@ -59,7 +60,7 @@ func TestJoinedState(t *testing.T) {
func TestRoomJoin(t *testing.T) {
t.Run("joining returns existing participant data", func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: numParticipants})
pNew := newMockParticipant("new", types.DefaultProtocol)
pNew := newMockParticipant("new", types.DefaultProtocol, false)
rm.Join(pNew, nil)
@@ -74,7 +75,7 @@ func TestRoomJoin(t *testing.T) {
t.Run("subscribe to existing channels upon join", func(t *testing.T) {
numExisting := 3
rm := newRoomWithParticipants(t, testRoomOpts{num: numExisting})
p := newMockParticipant("new", types.DefaultProtocol)
p := newMockParticipant("new", types.DefaultProtocol, false)
err := rm.Join(p, &rtc.ParticipantOptions{AutoSubscribe: true})
require.NoError(t, err)
@@ -128,7 +129,7 @@ func TestRoomJoin(t *testing.T) {
t.Run("cannot exceed max participants", func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 1})
rm.Room.MaxParticipants = 1
p := newMockParticipant("second", types.ProtocolVersion(0))
p := newMockParticipant("second", types.ProtocolVersion(0), false)
err := rm.Join(p, nil)
require.Equal(t, rtc.ErrMaxParticipantsExceeded, err)
@@ -455,8 +456,50 @@ func TestDataChannel(t *testing.T) {
})
}
func TestHiddenParticipants(t *testing.T) {
t.Run("other participants don't receive hidden updates", func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 2, numHidden: 1})
defer rm.Close()
pNew := newMockParticipant("new", types.DefaultProtocol, false)
rm.Join(pNew, nil)
// expect new participant to get a JoinReply
info, participants, iceServers := pNew.SendJoinResponseArgsForCall(0)
require.Equal(t, info.Sid, rm.Room.Sid)
require.Len(t, participants, 2)
require.Len(t, rm.GetParticipants(), 4)
require.NotEmpty(t, iceServers)
})
t.Run("hidden participant subscribes to tracks", func(t *testing.T) {
rm := newRoomWithParticipants(t, testRoomOpts{num: 2, numHidden: 1})
p := newMockParticipant("new", types.DefaultProtocol, false)
err := rm.Join(p, &rtc.ParticipantOptions{AutoSubscribe: true})
require.NoError(t, err)
stateChangeCB := p.OnStateChangeArgsForCall(0)
require.NotNil(t, stateChangeCB)
p.StateReturns(livekit.ParticipantInfo_ACTIVE)
stateChangeCB(p, livekit.ParticipantInfo_JOINED)
// it should become a subscriber when connectivity changes
for _, op := range rm.GetParticipants() {
if p == op {
continue
}
mockP := op.(*typesfakes.FakeParticipant)
require.NotZero(t, mockP.AddSubscriberCallCount())
// last call should be to add the newest participant
require.Equal(t, p, mockP.AddSubscriberArgsForCall(mockP.AddSubscriberCallCount()-1))
}
})
}
type testRoomOpts struct {
num int
numHidden int
protocol types.ProtocolVersion
audioSmoothIntervals uint32
}
@@ -477,9 +520,9 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *rtc.Room {
SmoothIntervals: opts.audioSmoothIntervals,
},
)
for i := 0; i < opts.num; i++ {
for i := 0; i < opts.num+opts.numHidden; i++ {
identity := fmt.Sprintf("p%d", i)
participant := newMockParticipant(identity, opts.protocol)
participant := newMockParticipant(identity, opts.protocol, i >= opts.num)
err := rm.Join(participant, &rtc.ParticipantOptions{AutoSubscribe: true})
participant.StateReturns(livekit.ParticipantInfo_ACTIVE)
require.NoError(t, err)

View File

@@ -57,6 +57,7 @@ type Participant interface {
CanPublish() bool
CanSubscribe() bool
Hidden() bool
Start()
Close() error

View File

@@ -165,6 +165,16 @@ type FakeParticipant struct {
result1 webrtc.SessionDescription
result2 error
}
HiddenStub func() bool
hiddenMutex sync.RWMutex
hiddenArgsForCall []struct {
}
hiddenReturns struct {
result1 bool
}
hiddenReturnsOnCall map[int]struct {
result1 bool
}
ICERestartStub func() error
iCERestartMutex sync.RWMutex
iCERestartArgsForCall []struct {
@@ -1191,6 +1201,59 @@ func (fake *FakeParticipant) HandleOfferReturnsOnCall(i int, result1 webrtc.Sess
}{result1, result2}
}
func (fake *FakeParticipant) Hidden() bool {
fake.hiddenMutex.Lock()
ret, specificReturn := fake.hiddenReturnsOnCall[len(fake.hiddenArgsForCall)]
fake.hiddenArgsForCall = append(fake.hiddenArgsForCall, struct {
}{})
stub := fake.HiddenStub
fakeReturns := fake.hiddenReturns
fake.recordInvocation("Hidden", []interface{}{})
fake.hiddenMutex.Unlock()
if stub != nil {
return stub()
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeParticipant) HiddenCallCount() int {
fake.hiddenMutex.RLock()
defer fake.hiddenMutex.RUnlock()
return len(fake.hiddenArgsForCall)
}
func (fake *FakeParticipant) HiddenCalls(stub func() bool) {
fake.hiddenMutex.Lock()
defer fake.hiddenMutex.Unlock()
fake.HiddenStub = stub
}
func (fake *FakeParticipant) HiddenReturns(result1 bool) {
fake.hiddenMutex.Lock()
defer fake.hiddenMutex.Unlock()
fake.HiddenStub = nil
fake.hiddenReturns = struct {
result1 bool
}{result1}
}
func (fake *FakeParticipant) HiddenReturnsOnCall(i int, result1 bool) {
fake.hiddenMutex.Lock()
defer fake.hiddenMutex.Unlock()
fake.HiddenStub = nil
if fake.hiddenReturnsOnCall == nil {
fake.hiddenReturnsOnCall = make(map[int]struct {
result1 bool
})
}
fake.hiddenReturnsOnCall[i] = struct {
result1 bool
}{result1}
}
func (fake *FakeParticipant) ICERestart() error {
fake.iCERestartMutex.Lock()
ret, specificReturn := fake.iCERestartReturnsOnCall[len(fake.iCERestartArgsForCall)]
@@ -2507,6 +2570,8 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} {
defer fake.handleAnswerMutex.RUnlock()
fake.handleOfferMutex.RLock()
defer fake.handleOfferMutex.RUnlock()
fake.hiddenMutex.RLock()
defer fake.hiddenMutex.RUnlock()
fake.iCERestartMutex.RLock()
defer fake.iCERestartMutex.RUnlock()
fake.iDMutex.RLock()

View File

@@ -6,9 +6,10 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/livekit/livekit-server/pkg/service"
livekit "github.com/livekit/livekit-server/proto"
"github.com/stretchr/testify/require"
)
func TestParticipantPersistence(t *testing.T) {

View File

@@ -291,6 +291,7 @@ func (r *RoomManager) StartSession(roomName string, pi routing.ParticipantInit,
Stats: room.GetStatsReporter(),
ThrottleConfig: r.config.RTC.PLIThrottle,
EnabledCodecs: room.Room.EnabledCodecs,
Hidden: pi.Hidden,
})
if err != nil {
logger.Errorw("could not create participant", err)

View File

@@ -81,6 +81,7 @@ func (s *RTCService) validate(r *http.Request) (string, routing.ParticipantInit,
UsePlanB: boolValue(planBParam),
AutoSubscribe: true,
Metadata: claims.Metadata,
Hidden: claims.Video.Hidden,
}
if autoSubParam != "" {
pi.AutoSubscribe = boolValue(autoSubParam)