RoomService.updateParticipantMetadata, participant permissions

This commit is contained in:
David Zhao
2021-03-16 01:22:21 -07:00
parent e04eaeb480
commit f7ed2cee60
19 changed files with 1113 additions and 325 deletions
+9 -2
View File
@@ -23,7 +23,14 @@ type MessageSource interface {
ReadChan() <-chan proto.Message
}
type NewParticipantCallback func(roomName, identity, metadata string, reconnect bool, requestSource MessageSource, responseSink MessageSink)
type ParticipantInit struct {
Identity string
Metadata string
Reconnect bool
Permission *livekit.ParticipantPermission
}
type NewParticipantCallback func(roomName string, pi ParticipantInit, requestSource MessageSource, responseSink MessageSink)
type RTCMessageCallback func(roomName, identity string, msg *livekit.RTCNodeMessage)
// Router allows multiple nodes to coordinate the participant session
@@ -39,7 +46,7 @@ type Router interface {
ListNodes() ([]*livekit.Node, error)
// participant signal connection is ready to start
StartParticipantSignal(roomName, identity, metadata string, reconnect bool) (connectionId string, reqSink MessageSink, resSource MessageSource, err error)
StartParticipantSignal(roomName string, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error)
// sends a message to RTC node
CreateRTCSink(roomName, identity string) (MessageSink, error)
+4 -6
View File
@@ -71,7 +71,7 @@ func (r *LocalRouter) ListNodes() ([]*livekit.Node, error) {
}, nil
}
func (r *LocalRouter) StartParticipantSignal(roomName, identity, metadata string, reconnect bool) (connectionId string, reqSink MessageSink, resSource MessageSource, err error) {
func (r *LocalRouter) StartParticipantSignal(roomName string, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error) {
// treat it as a new participant connecting
if r.onNewParticipant == nil {
err = ErrHandlerNotDefined
@@ -79,21 +79,19 @@ func (r *LocalRouter) StartParticipantSignal(roomName, identity, metadata string
}
// index channels by roomName | identity
key := participantKey(roomName, identity)
key := participantKey(roomName, pi.Identity)
reqChan := r.getOrCreateMessageChannel(r.requestChannels, key)
resChan := r.getOrCreateMessageChannel(r.responseChannels, key)
r.onNewParticipant(
roomName,
identity,
metadata,
reconnect,
pi,
// request source
reqChan,
// response sink
resChan,
)
return identity, reqChan, resChan, nil
return pi.Identity, reqChan, resChan, nil
}
func (r *LocalRouter) CreateRTCSink(roomName, identity string) (MessageSink, error) {
+14 -8
View File
@@ -129,7 +129,7 @@ func (r *RedisRouter) ListNodes() ([]*livekit.Node, error) {
}
// signal connection sets up paths to the RTC node, and starts to route messages to that message queue
func (r *RedisRouter) StartParticipantSignal(roomName, identity, metadata string, reconnect bool) (connectionId string, reqSink MessageSink, resSource MessageSource, err error) {
func (r *RedisRouter) StartParticipantSignal(roomName string, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error) {
// find the node where the room is hosted at
rtcNode, err := r.GetNodeForRoom(roomName)
if err != nil {
@@ -138,7 +138,7 @@ func (r *RedisRouter) StartParticipantSignal(roomName, identity, metadata string
// create a new connection id
connectionId = utils.NewGuid("CO_")
pKey := participantKey(roomName, identity)
pKey := participantKey(roomName, pi.Identity)
// map signal & rtc nodes
if err = r.setParticipantSignalNode(connectionId, r.currentNode.Id); err != nil {
@@ -150,11 +150,12 @@ func (r *RedisRouter) StartParticipantSignal(roomName, identity, metadata string
// sends a message to start session
err = sink.WriteMessage(&livekit.StartSession{
RoomName: roomName,
Identity: identity,
Metadata: metadata,
Identity: pi.Identity,
Metadata: pi.Metadata,
// connection id is to allow the RTC node to identify where to route the message back to
ConnectionId: connectionId,
Reconnect: reconnect,
Reconnect: pi.Reconnect,
Permission: pi.Permission,
})
if err != nil {
return
@@ -215,13 +216,18 @@ func (r *RedisRouter) startParticipantRTC(ss *livekit.StartSession, participantK
}
}
pi := ParticipantInit{
Identity: ss.Identity,
Metadata: ss.Metadata,
Reconnect: ss.Reconnect,
Permission: ss.Permission,
}
reqChan := r.getOrCreateMessageChannel(r.requestChannels, participantKey)
resSink := NewSignalNodeSink(r.rc, signalNode, ss.ConnectionId)
r.onNewParticipant(
ss.RoomName,
ss.Identity,
ss.Metadata,
ss.Reconnect,
pi,
reqChan,
resSink,
)
+10 -14
View File
@@ -124,13 +124,11 @@ type FakeRouter struct {
startReturnsOnCall map[int]struct {
result1 error
}
StartParticipantSignalStub func(string, string, string, bool) (string, routing.MessageSink, routing.MessageSource, error)
StartParticipantSignalStub func(string, routing.ParticipantInit) (string, routing.MessageSink, routing.MessageSource, error)
startParticipantSignalMutex sync.RWMutex
startParticipantSignalArgsForCall []struct {
arg1 string
arg2 string
arg3 string
arg4 bool
arg2 routing.ParticipantInit
}
startParticipantSignalReturns struct {
result1 string
@@ -757,21 +755,19 @@ func (fake *FakeRouter) StartReturnsOnCall(i int, result1 error) {
}{result1}
}
func (fake *FakeRouter) StartParticipantSignal(arg1 string, arg2 string, arg3 string, arg4 bool) (string, routing.MessageSink, routing.MessageSource, error) {
func (fake *FakeRouter) StartParticipantSignal(arg1 string, arg2 routing.ParticipantInit) (string, routing.MessageSink, routing.MessageSource, error) {
fake.startParticipantSignalMutex.Lock()
ret, specificReturn := fake.startParticipantSignalReturnsOnCall[len(fake.startParticipantSignalArgsForCall)]
fake.startParticipantSignalArgsForCall = append(fake.startParticipantSignalArgsForCall, struct {
arg1 string
arg2 string
arg3 string
arg4 bool
}{arg1, arg2, arg3, arg4})
arg2 routing.ParticipantInit
}{arg1, arg2})
stub := fake.StartParticipantSignalStub
fakeReturns := fake.startParticipantSignalReturns
fake.recordInvocation("StartParticipantSignal", []interface{}{arg1, arg2, arg3, arg4})
fake.recordInvocation("StartParticipantSignal", []interface{}{arg1, arg2})
fake.startParticipantSignalMutex.Unlock()
if stub != nil {
return stub(arg1, arg2, arg3, arg4)
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1, ret.result2, ret.result3, ret.result4
@@ -785,17 +781,17 @@ func (fake *FakeRouter) StartParticipantSignalCallCount() int {
return len(fake.startParticipantSignalArgsForCall)
}
func (fake *FakeRouter) StartParticipantSignalCalls(stub func(string, string, string, bool) (string, routing.MessageSink, routing.MessageSource, error)) {
func (fake *FakeRouter) StartParticipantSignalCalls(stub func(string, routing.ParticipantInit) (string, routing.MessageSink, routing.MessageSource, error)) {
fake.startParticipantSignalMutex.Lock()
defer fake.startParticipantSignalMutex.Unlock()
fake.StartParticipantSignalStub = stub
}
func (fake *FakeRouter) StartParticipantSignalArgsForCall(i int) (string, string, string, bool) {
func (fake *FakeRouter) StartParticipantSignalArgsForCall(i int) (string, routing.ParticipantInit) {
fake.startParticipantSignalMutex.RLock()
defer fake.startParticipantSignalMutex.RUnlock()
argsForCall := fake.startParticipantSignalArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeRouter) StartParticipantSignalReturns(result1 string, result2 routing.MessageSink, result3 routing.MessageSource, result4 error) {