Interface updates (#194)

* update interfaces, a bit of cleaning

* regenerate

* return interface for RoomService

* export packetBufferSize

* update router interface

* move participant key into router

* change locks back

* read only room store

* fix server rm locks

* update SendJoinResponse

* clean up imports

* update room messaging

* regenerate
This commit is contained in:
David Colburn
2021-11-15 13:25:50 -08:00
committed by GitHub
parent ffb2c50a70
commit 95e29d3766
17 changed files with 340 additions and 333 deletions
+12 -8
View File
@@ -44,33 +44,37 @@ type RTCMessageCallback func(ctx context.Context, roomName, identity string, msg
// Router allows multiple nodes to coordinate the participant session
//counterfeiter:generate . Router
type Router interface {
MessageRouter
RegisterNode() error
UnregisterNode() error
RemoveDeadNodes() error
GetNode(nodeId string) (*livekit.Node, error)
ListNodes() ([]*livekit.Node, error)
GetNodeForRoom(ctx context.Context, roomName string) (*livekit.Node, error)
SetNodeForRoom(ctx context.Context, roomName, nodeId string) error
ClearRoomState(ctx context.Context, roomName string) error
Start() error
Drain()
Stop()
}
type MessageRouter interface {
// StartParticipantSignal participant signal connection is ready to start
StartParticipantSignal(ctx context.Context, roomName string, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error)
// WriteRTCMessage sends a message to the RTC node
WriteRTCMessage(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error
WriteRTCNodeMessage(ctx context.Context, nodeID string, msg *livekit.RTCNodeMessage) error
// Write a message to a participant, room, or node
WriteParticipantRTC(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error
WriteRoomRTC(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error
WriteNodeRTC(ctx context.Context, nodeID string, msg *livekit.RTCNodeMessage) error
// OnNewParticipantRTC is called to start a new participant's RTC connection
OnNewParticipantRTC(callback NewParticipantCallback)
// OnRTCMessage is called to execute actions on the RTC node
OnRTCMessage(callback RTCMessageCallback)
Start() error
Drain()
Stop()
}
func CreateRouter(conf *config.Config, rc *redis.Client, node LocalNode) Router {
+7 -2
View File
@@ -110,7 +110,7 @@ func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName strin
return pi.Identity, reqChan, resChan, nil
}
func (r *LocalRouter) WriteRTCMessage(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error {
func (r *LocalRouter) WriteParticipantRTC(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error {
if r.rtcMessageChan.IsClosed() {
// create a new one
r.rtcMessageChan = NewMessageChannel()
@@ -119,7 +119,12 @@ func (r *LocalRouter) WriteRTCMessage(ctx context.Context, roomName, identity st
return r.writeRTCMessage(r.rtcMessageChan, msg)
}
func (r *LocalRouter) WriteRTCNodeMessage(ctx context.Context, nodeID string, msg *livekit.RTCNodeMessage) error {
func (r *LocalRouter) WriteRoomRTC(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error {
msg.ParticipantKey = participantKey(roomName, identity)
return r.WriteNodeRTC(ctx, r.currentNode.Id, msg)
}
func (r *LocalRouter) WriteNodeRTC(ctx context.Context, nodeID string, msg *livekit.RTCNodeMessage) error {
if r.rtcMessageChan.IsClosed() {
// create a new one
r.rtcMessageChan = NewMessageChannel()
+12 -3
View File
@@ -168,7 +168,7 @@ func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName strin
return connectionId, sink, resChan, nil
}
func (r *RedisRouter) WriteRTCMessage(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error {
func (r *RedisRouter) WriteParticipantRTC(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error {
pkey := participantKey(roomName, identity)
rtcNode, err := r.getParticipantRTCNode(pkey)
if err != nil {
@@ -180,7 +180,16 @@ func (r *RedisRouter) WriteRTCMessage(ctx context.Context, roomName, identity st
return r.writeRTCMessage(rtcSink, msg)
}
func (r *RedisRouter) WriteRTCNodeMessage(ctx context.Context, rtcNodeID string, msg *livekit.RTCNodeMessage) error {
func (r *RedisRouter) WriteRoomRTC(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error {
node, err := r.GetNodeForRoom(ctx, roomName)
if err != nil {
return err
}
msg.ParticipantKey = participantKey(roomName, identity)
return r.WriteNodeRTC(ctx, node.Id, msg)
}
func (r *RedisRouter) WriteNodeRTC(ctx context.Context, rtcNodeID string, msg *livekit.RTCNodeMessage) error {
rtcSink := NewRTCNodeSink(r.rc, rtcNodeID, msg.ParticipantKey)
return r.writeRTCMessage(rtcSink, msg)
}
@@ -319,7 +328,7 @@ func (r *RedisRouter) statsWorker() {
// update periodically seconds
select {
case <-time.After(statsUpdateInterval):
r.WriteRTCNodeMessage(context.Background(), r.currentNode.Id, &livekit.RTCNodeMessage{
_ = r.WriteNodeRTC(context.Background(), r.currentNode.Id, &livekit.RTCNodeMessage{
Message: &livekit.RTCNodeMessage_KeepAlive{},
})
case <-r.ctx.Done():
+191 -190
View File
@@ -26,19 +26,6 @@ type FakeRouter struct {
drainMutex sync.RWMutex
drainArgsForCall []struct {
}
GetNodeStub func(string) (*livekit.Node, error)
getNodeMutex sync.RWMutex
getNodeArgsForCall []struct {
arg1 string
}
getNodeReturns struct {
result1 *livekit.Node
result2 error
}
getNodeReturnsOnCall map[int]struct {
result1 *livekit.Node
result2 error
}
GetNodeForRoomStub func(context.Context, string) (*livekit.Node, error)
getNodeForRoomMutex sync.RWMutex
getNodeForRoomArgsForCall []struct {
@@ -151,31 +138,45 @@ type FakeRouter struct {
unregisterNodeReturnsOnCall map[int]struct {
result1 error
}
WriteRTCMessageStub func(context.Context, string, string, *livekit.RTCNodeMessage) error
writeRTCMessageMutex sync.RWMutex
writeRTCMessageArgsForCall []struct {
WriteNodeRTCStub func(context.Context, string, *livekit.RTCNodeMessage) error
writeNodeRTCMutex sync.RWMutex
writeNodeRTCArgsForCall []struct {
arg1 context.Context
arg2 string
arg3 *livekit.RTCNodeMessage
}
writeNodeRTCReturns struct {
result1 error
}
writeNodeRTCReturnsOnCall map[int]struct {
result1 error
}
WriteParticipantRTCStub func(context.Context, string, string, *livekit.RTCNodeMessage) error
writeParticipantRTCMutex sync.RWMutex
writeParticipantRTCArgsForCall []struct {
arg1 context.Context
arg2 string
arg3 string
arg4 *livekit.RTCNodeMessage
}
writeRTCMessageReturns struct {
writeParticipantRTCReturns struct {
result1 error
}
writeRTCMessageReturnsOnCall map[int]struct {
writeParticipantRTCReturnsOnCall map[int]struct {
result1 error
}
WriteRTCNodeMessageStub func(context.Context, string, *livekit.RTCNodeMessage) error
writeRTCNodeMessageMutex sync.RWMutex
writeRTCNodeMessageArgsForCall []struct {
WriteRoomRTCStub func(context.Context, string, string, *livekit.RTCNodeMessage) error
writeRoomRTCMutex sync.RWMutex
writeRoomRTCArgsForCall []struct {
arg1 context.Context
arg2 string
arg3 *livekit.RTCNodeMessage
arg3 string
arg4 *livekit.RTCNodeMessage
}
writeRTCNodeMessageReturns struct {
writeRoomRTCReturns struct {
result1 error
}
writeRTCNodeMessageReturnsOnCall map[int]struct {
writeRoomRTCReturnsOnCall map[int]struct {
result1 error
}
invocations map[string][][]interface{}
@@ -268,70 +269,6 @@ func (fake *FakeRouter) DrainCalls(stub func()) {
fake.DrainStub = stub
}
func (fake *FakeRouter) GetNode(arg1 string) (*livekit.Node, error) {
fake.getNodeMutex.Lock()
ret, specificReturn := fake.getNodeReturnsOnCall[len(fake.getNodeArgsForCall)]
fake.getNodeArgsForCall = append(fake.getNodeArgsForCall, struct {
arg1 string
}{arg1})
stub := fake.GetNodeStub
fakeReturns := fake.getNodeReturns
fake.recordInvocation("GetNode", []interface{}{arg1})
fake.getNodeMutex.Unlock()
if stub != nil {
return stub(arg1)
}
if specificReturn {
return ret.result1, ret.result2
}
return fakeReturns.result1, fakeReturns.result2
}
func (fake *FakeRouter) GetNodeCallCount() int {
fake.getNodeMutex.RLock()
defer fake.getNodeMutex.RUnlock()
return len(fake.getNodeArgsForCall)
}
func (fake *FakeRouter) GetNodeCalls(stub func(string) (*livekit.Node, error)) {
fake.getNodeMutex.Lock()
defer fake.getNodeMutex.Unlock()
fake.GetNodeStub = stub
}
func (fake *FakeRouter) GetNodeArgsForCall(i int) string {
fake.getNodeMutex.RLock()
defer fake.getNodeMutex.RUnlock()
argsForCall := fake.getNodeArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeRouter) GetNodeReturns(result1 *livekit.Node, result2 error) {
fake.getNodeMutex.Lock()
defer fake.getNodeMutex.Unlock()
fake.GetNodeStub = nil
fake.getNodeReturns = struct {
result1 *livekit.Node
result2 error
}{result1, result2}
}
func (fake *FakeRouter) GetNodeReturnsOnCall(i int, result1 *livekit.Node, result2 error) {
fake.getNodeMutex.Lock()
defer fake.getNodeMutex.Unlock()
fake.GetNodeStub = nil
if fake.getNodeReturnsOnCall == nil {
fake.getNodeReturnsOnCall = make(map[int]struct {
result1 *livekit.Node
result2 error
})
}
fake.getNodeReturnsOnCall[i] = struct {
result1 *livekit.Node
result2 error
}{result1, result2}
}
func (fake *FakeRouter) GetNodeForRoom(arg1 context.Context, arg2 string) (*livekit.Node, error) {
fake.getNodeForRoomMutex.Lock()
ret, specificReturn := fake.getNodeForRoomReturnsOnCall[len(fake.getNodeForRoomArgsForCall)]
@@ -888,82 +825,18 @@ func (fake *FakeRouter) UnregisterNodeReturnsOnCall(i int, result1 error) {
}{result1}
}
func (fake *FakeRouter) WriteRTCMessage(arg1 context.Context, arg2 string, arg3 string, arg4 *livekit.RTCNodeMessage) error {
fake.writeRTCMessageMutex.Lock()
ret, specificReturn := fake.writeRTCMessageReturnsOnCall[len(fake.writeRTCMessageArgsForCall)]
fake.writeRTCMessageArgsForCall = append(fake.writeRTCMessageArgsForCall, struct {
arg1 context.Context
arg2 string
arg3 string
arg4 *livekit.RTCNodeMessage
}{arg1, arg2, arg3, arg4})
stub := fake.WriteRTCMessageStub
fakeReturns := fake.writeRTCMessageReturns
fake.recordInvocation("WriteRTCMessage", []interface{}{arg1, arg2, arg3, arg4})
fake.writeRTCMessageMutex.Unlock()
if stub != nil {
return stub(arg1, arg2, arg3, arg4)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeRouter) WriteRTCMessageCallCount() int {
fake.writeRTCMessageMutex.RLock()
defer fake.writeRTCMessageMutex.RUnlock()
return len(fake.writeRTCMessageArgsForCall)
}
func (fake *FakeRouter) WriteRTCMessageCalls(stub func(context.Context, string, string, *livekit.RTCNodeMessage) error) {
fake.writeRTCMessageMutex.Lock()
defer fake.writeRTCMessageMutex.Unlock()
fake.WriteRTCMessageStub = stub
}
func (fake *FakeRouter) WriteRTCMessageArgsForCall(i int) (context.Context, string, string, *livekit.RTCNodeMessage) {
fake.writeRTCMessageMutex.RLock()
defer fake.writeRTCMessageMutex.RUnlock()
argsForCall := fake.writeRTCMessageArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4
}
func (fake *FakeRouter) WriteRTCMessageReturns(result1 error) {
fake.writeRTCMessageMutex.Lock()
defer fake.writeRTCMessageMutex.Unlock()
fake.WriteRTCMessageStub = nil
fake.writeRTCMessageReturns = struct {
result1 error
}{result1}
}
func (fake *FakeRouter) WriteRTCMessageReturnsOnCall(i int, result1 error) {
fake.writeRTCMessageMutex.Lock()
defer fake.writeRTCMessageMutex.Unlock()
fake.WriteRTCMessageStub = nil
if fake.writeRTCMessageReturnsOnCall == nil {
fake.writeRTCMessageReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.writeRTCMessageReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeRouter) WriteRTCNodeMessage(arg1 context.Context, arg2 string, arg3 *livekit.RTCNodeMessage) error {
fake.writeRTCNodeMessageMutex.Lock()
ret, specificReturn := fake.writeRTCNodeMessageReturnsOnCall[len(fake.writeRTCNodeMessageArgsForCall)]
fake.writeRTCNodeMessageArgsForCall = append(fake.writeRTCNodeMessageArgsForCall, struct {
func (fake *FakeRouter) WriteNodeRTC(arg1 context.Context, arg2 string, arg3 *livekit.RTCNodeMessage) error {
fake.writeNodeRTCMutex.Lock()
ret, specificReturn := fake.writeNodeRTCReturnsOnCall[len(fake.writeNodeRTCArgsForCall)]
fake.writeNodeRTCArgsForCall = append(fake.writeNodeRTCArgsForCall, struct {
arg1 context.Context
arg2 string
arg3 *livekit.RTCNodeMessage
}{arg1, arg2, arg3})
stub := fake.WriteRTCNodeMessageStub
fakeReturns := fake.writeRTCNodeMessageReturns
fake.recordInvocation("WriteRTCNodeMessage", []interface{}{arg1, arg2, arg3})
fake.writeRTCNodeMessageMutex.Unlock()
stub := fake.WriteNodeRTCStub
fakeReturns := fake.writeNodeRTCReturns
fake.recordInvocation("WriteNodeRTC", []interface{}{arg1, arg2, arg3})
fake.writeNodeRTCMutex.Unlock()
if stub != nil {
return stub(arg1, arg2, arg3)
}
@@ -973,44 +846,172 @@ func (fake *FakeRouter) WriteRTCNodeMessage(arg1 context.Context, arg2 string, a
return fakeReturns.result1
}
func (fake *FakeRouter) WriteRTCNodeMessageCallCount() int {
fake.writeRTCNodeMessageMutex.RLock()
defer fake.writeRTCNodeMessageMutex.RUnlock()
return len(fake.writeRTCNodeMessageArgsForCall)
func (fake *FakeRouter) WriteNodeRTCCallCount() int {
fake.writeNodeRTCMutex.RLock()
defer fake.writeNodeRTCMutex.RUnlock()
return len(fake.writeNodeRTCArgsForCall)
}
func (fake *FakeRouter) WriteRTCNodeMessageCalls(stub func(context.Context, string, *livekit.RTCNodeMessage) error) {
fake.writeRTCNodeMessageMutex.Lock()
defer fake.writeRTCNodeMessageMutex.Unlock()
fake.WriteRTCNodeMessageStub = stub
func (fake *FakeRouter) WriteNodeRTCCalls(stub func(context.Context, string, *livekit.RTCNodeMessage) error) {
fake.writeNodeRTCMutex.Lock()
defer fake.writeNodeRTCMutex.Unlock()
fake.WriteNodeRTCStub = stub
}
func (fake *FakeRouter) WriteRTCNodeMessageArgsForCall(i int) (context.Context, string, *livekit.RTCNodeMessage) {
fake.writeRTCNodeMessageMutex.RLock()
defer fake.writeRTCNodeMessageMutex.RUnlock()
argsForCall := fake.writeRTCNodeMessageArgsForCall[i]
func (fake *FakeRouter) WriteNodeRTCArgsForCall(i int) (context.Context, string, *livekit.RTCNodeMessage) {
fake.writeNodeRTCMutex.RLock()
defer fake.writeNodeRTCMutex.RUnlock()
argsForCall := fake.writeNodeRTCArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeRouter) WriteRTCNodeMessageReturns(result1 error) {
fake.writeRTCNodeMessageMutex.Lock()
defer fake.writeRTCNodeMessageMutex.Unlock()
fake.WriteRTCNodeMessageStub = nil
fake.writeRTCNodeMessageReturns = struct {
func (fake *FakeRouter) WriteNodeRTCReturns(result1 error) {
fake.writeNodeRTCMutex.Lock()
defer fake.writeNodeRTCMutex.Unlock()
fake.WriteNodeRTCStub = nil
fake.writeNodeRTCReturns = struct {
result1 error
}{result1}
}
func (fake *FakeRouter) WriteRTCNodeMessageReturnsOnCall(i int, result1 error) {
fake.writeRTCNodeMessageMutex.Lock()
defer fake.writeRTCNodeMessageMutex.Unlock()
fake.WriteRTCNodeMessageStub = nil
if fake.writeRTCNodeMessageReturnsOnCall == nil {
fake.writeRTCNodeMessageReturnsOnCall = make(map[int]struct {
func (fake *FakeRouter) WriteNodeRTCReturnsOnCall(i int, result1 error) {
fake.writeNodeRTCMutex.Lock()
defer fake.writeNodeRTCMutex.Unlock()
fake.WriteNodeRTCStub = nil
if fake.writeNodeRTCReturnsOnCall == nil {
fake.writeNodeRTCReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.writeRTCNodeMessageReturnsOnCall[i] = struct {
fake.writeNodeRTCReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeRouter) WriteParticipantRTC(arg1 context.Context, arg2 string, arg3 string, arg4 *livekit.RTCNodeMessage) error {
fake.writeParticipantRTCMutex.Lock()
ret, specificReturn := fake.writeParticipantRTCReturnsOnCall[len(fake.writeParticipantRTCArgsForCall)]
fake.writeParticipantRTCArgsForCall = append(fake.writeParticipantRTCArgsForCall, struct {
arg1 context.Context
arg2 string
arg3 string
arg4 *livekit.RTCNodeMessage
}{arg1, arg2, arg3, arg4})
stub := fake.WriteParticipantRTCStub
fakeReturns := fake.writeParticipantRTCReturns
fake.recordInvocation("WriteParticipantRTC", []interface{}{arg1, arg2, arg3, arg4})
fake.writeParticipantRTCMutex.Unlock()
if stub != nil {
return stub(arg1, arg2, arg3, arg4)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeRouter) WriteParticipantRTCCallCount() int {
fake.writeParticipantRTCMutex.RLock()
defer fake.writeParticipantRTCMutex.RUnlock()
return len(fake.writeParticipantRTCArgsForCall)
}
func (fake *FakeRouter) WriteParticipantRTCCalls(stub func(context.Context, string, string, *livekit.RTCNodeMessage) error) {
fake.writeParticipantRTCMutex.Lock()
defer fake.writeParticipantRTCMutex.Unlock()
fake.WriteParticipantRTCStub = stub
}
func (fake *FakeRouter) WriteParticipantRTCArgsForCall(i int) (context.Context, string, string, *livekit.RTCNodeMessage) {
fake.writeParticipantRTCMutex.RLock()
defer fake.writeParticipantRTCMutex.RUnlock()
argsForCall := fake.writeParticipantRTCArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4
}
func (fake *FakeRouter) WriteParticipantRTCReturns(result1 error) {
fake.writeParticipantRTCMutex.Lock()
defer fake.writeParticipantRTCMutex.Unlock()
fake.WriteParticipantRTCStub = nil
fake.writeParticipantRTCReturns = struct {
result1 error
}{result1}
}
func (fake *FakeRouter) WriteParticipantRTCReturnsOnCall(i int, result1 error) {
fake.writeParticipantRTCMutex.Lock()
defer fake.writeParticipantRTCMutex.Unlock()
fake.WriteParticipantRTCStub = nil
if fake.writeParticipantRTCReturnsOnCall == nil {
fake.writeParticipantRTCReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.writeParticipantRTCReturnsOnCall[i] = struct {
result1 error
}{result1}
}
func (fake *FakeRouter) WriteRoomRTC(arg1 context.Context, arg2 string, arg3 string, arg4 *livekit.RTCNodeMessage) error {
fake.writeRoomRTCMutex.Lock()
ret, specificReturn := fake.writeRoomRTCReturnsOnCall[len(fake.writeRoomRTCArgsForCall)]
fake.writeRoomRTCArgsForCall = append(fake.writeRoomRTCArgsForCall, struct {
arg1 context.Context
arg2 string
arg3 string
arg4 *livekit.RTCNodeMessage
}{arg1, arg2, arg3, arg4})
stub := fake.WriteRoomRTCStub
fakeReturns := fake.writeRoomRTCReturns
fake.recordInvocation("WriteRoomRTC", []interface{}{arg1, arg2, arg3, arg4})
fake.writeRoomRTCMutex.Unlock()
if stub != nil {
return stub(arg1, arg2, arg3, arg4)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeRouter) WriteRoomRTCCallCount() int {
fake.writeRoomRTCMutex.RLock()
defer fake.writeRoomRTCMutex.RUnlock()
return len(fake.writeRoomRTCArgsForCall)
}
func (fake *FakeRouter) WriteRoomRTCCalls(stub func(context.Context, string, string, *livekit.RTCNodeMessage) error) {
fake.writeRoomRTCMutex.Lock()
defer fake.writeRoomRTCMutex.Unlock()
fake.WriteRoomRTCStub = stub
}
func (fake *FakeRouter) WriteRoomRTCArgsForCall(i int) (context.Context, string, string, *livekit.RTCNodeMessage) {
fake.writeRoomRTCMutex.RLock()
defer fake.writeRoomRTCMutex.RUnlock()
argsForCall := fake.writeRoomRTCArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4
}
func (fake *FakeRouter) WriteRoomRTCReturns(result1 error) {
fake.writeRoomRTCMutex.Lock()
defer fake.writeRoomRTCMutex.Unlock()
fake.WriteRoomRTCStub = nil
fake.writeRoomRTCReturns = struct {
result1 error
}{result1}
}
func (fake *FakeRouter) WriteRoomRTCReturnsOnCall(i int, result1 error) {
fake.writeRoomRTCMutex.Lock()
defer fake.writeRoomRTCMutex.Unlock()
fake.WriteRoomRTCStub = nil
if fake.writeRoomRTCReturnsOnCall == nil {
fake.writeRoomRTCReturnsOnCall = make(map[int]struct {
result1 error
})
}
fake.writeRoomRTCReturnsOnCall[i] = struct {
result1 error
}{result1}
}
@@ -1022,8 +1023,6 @@ func (fake *FakeRouter) Invocations() map[string][][]interface{} {
defer fake.clearRoomStateMutex.RUnlock()
fake.drainMutex.RLock()
defer fake.drainMutex.RUnlock()
fake.getNodeMutex.RLock()
defer fake.getNodeMutex.RUnlock()
fake.getNodeForRoomMutex.RLock()
defer fake.getNodeForRoomMutex.RUnlock()
fake.listNodesMutex.RLock()
@@ -1046,10 +1045,12 @@ func (fake *FakeRouter) Invocations() map[string][][]interface{} {
defer fake.stopMutex.RUnlock()
fake.unregisterNodeMutex.RLock()
defer fake.unregisterNodeMutex.RUnlock()
fake.writeRTCMessageMutex.RLock()
defer fake.writeRTCMessageMutex.RUnlock()
fake.writeRTCNodeMessageMutex.RLock()
defer fake.writeRTCNodeMessageMutex.RUnlock()
fake.writeNodeRTCMutex.RLock()
defer fake.writeNodeRTCMutex.RUnlock()
fake.writeParticipantRTCMutex.RLock()
defer fake.writeParticipantRTCMutex.RUnlock()
fake.writeRoomRTCMutex.RLock()
defer fake.writeRoomRTCMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}
for key, value := range fake.invocations {
copiedInvocations[key] = value
+3 -3
View File
@@ -4,12 +4,12 @@ import (
"errors"
"net"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/pion/ice/v2"
"github.com/pion/webrtc/v3"
"github.com/livekit/livekit-server/pkg/config"
serverlogger "github.com/livekit/livekit-server/pkg/logger"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
)
const (
@@ -28,7 +28,7 @@ type WebRTCConfig struct {
}
type ReceiverConfig struct {
packetBufferSize int
PacketBufferSize int
maxBitrate uint64
}
@@ -116,7 +116,7 @@ func NewWebRTCConfig(conf *config.Config, externalIP string) (*WebRTCConfig, err
Configuration: c,
SettingEngine: s,
Receiver: ReceiverConfig{
packetBufferSize: rtcConf.PacketBufferSize,
PacketBufferSize: rtcConf.PacketBufferSize,
maxBitrate: rtcConf.MaxBitrate,
},
UDPMux: udpMux,
+1 -1
View File
@@ -187,7 +187,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error {
Channels: codec.Channels,
SDPFmtpLine: codec.SDPFmtpLine,
RTCPFeedback: feedbackTypes,
}, receiver, t.params.BufferFactory, sub.ID(), t.params.ReceiverConfig.packetBufferSize)
}, receiver, t.params.BufferFactory, sub.ID(), t.params.ReceiverConfig.PacketBufferSize)
if err != nil {
return err
}
+2 -2
View File
@@ -504,14 +504,14 @@ func (p *ParticipantImpl) RemoveSubscriber(participantId string) {
// signal connection methods
func (p *ParticipantImpl) SendJoinResponse(roomInfo *livekit.Room, otherParticipants []types.Participant, iceServers []*livekit.ICEServer) error {
func (p *ParticipantImpl) SendJoinResponse(roomInfo *livekit.Room, otherParticipants []*livekit.ParticipantInfo, iceServers []*livekit.ICEServer) error {
// send Join response
return p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Join{
Join: &livekit.JoinResponse{
Room: roomInfo,
Participant: p.ToProto(),
OtherParticipants: ToProtoParticipants(otherParticipants),
OtherParticipants: otherParticipants,
ServerVersion: version.Version,
IceServers: iceServers,
// indicates both server and client support subscriber as primary
+11 -11
View File
@@ -8,7 +8,6 @@ import (
"time"
"github.com/go-logr/logr"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/protocol/logger"
livekit "github.com/livekit/protocol/proto"
"google.golang.org/protobuf/proto"
@@ -16,6 +15,7 @@ import (
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/sfu/buffer"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
)
@@ -27,10 +27,15 @@ const (
)
type Room struct {
lock sync.RWMutex
Room *livekit.Room
Logger logger.Logger
config WebRTCConfig
lock sync.RWMutex
config WebRTCConfig
audioConfig *config.AudioConfig
telemetry *telemetry.TelemetryService
// map of identity -> Participant
participants map[string]types.Participant
participantOpts map[string]*ParticipantOptions
@@ -43,11 +48,6 @@ type Room struct {
closed chan struct{}
closeOnce sync.Once
// for active speaker updates
audioConfig *config.AudioConfig
telemetry *telemetry.TelemetryService
onParticipantChanged func(p types.Participant)
onMetadataUpdate func(metadata string)
onClose func()
@@ -66,7 +66,7 @@ func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioC
telemetry: telemetry,
participants: make(map[string]types.Participant),
participantOpts: make(map[string]*ParticipantOptions),
bufferFactory: buffer.NewBufferFactory(config.Receiver.packetBufferSize, logr.Logger{}),
bufferFactory: buffer.NewBufferFactory(config.Receiver.PacketBufferSize, logr.Logger{}),
closed: make(chan struct{}),
}
if r.Room.EmptyTimeout == 0 {
@@ -199,10 +199,10 @@ func (r *Room) Join(participant types.Participant, opts *ParticipantOptions, ice
r.participantOpts[participant.Identity()] = opts
// gather other participants and send join response
otherParticipants := make([]types.Participant, 0, len(r.participants))
otherParticipants := make([]*livekit.ParticipantInfo, 0, len(r.participants))
for _, p := range r.participants {
if p.ID() != participant.ID() && !p.Hidden() {
otherParticipants = append(otherParticipants, p)
otherParticipants = append(otherParticipants, p.ToProto())
}
}
+3 -3
View File
@@ -3,12 +3,12 @@ package types
import (
"time"
"github.com/livekit/livekit-server/pkg/sfu"
livekit "github.com/livekit/protocol/proto"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
"github.com/livekit/livekit-server/pkg/routing"
livekit "github.com/livekit/protocol/proto"
"github.com/livekit/livekit-server/pkg/sfu"
)
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate
@@ -48,7 +48,7 @@ type Participant interface {
AddICECandidate(candidate webrtc.ICECandidateInit, target livekit.SignalTarget) error
AddSubscriber(op Participant) (int, error)
RemoveSubscriber(peerId string)
SendJoinResponse(info *livekit.Room, otherParticipants []Participant, iceServers []*livekit.ICEServer) error
SendJoinResponse(info *livekit.Room, otherParticipants []*livekit.ParticipantInfo, iceServers []*livekit.ICEServer) error
SendParticipantUpdate(participants []*livekit.ParticipantInfo, updatedAt time.Time) error
SendSpeakerUpdate(speakers []*livekit.SpeakerInfo) error
SendDataPacket(packet *livekit.DataPacket) error
+8 -8
View File
@@ -363,11 +363,11 @@ type FakeParticipant struct {
sendDataPacketReturnsOnCall map[int]struct {
result1 error
}
SendJoinResponseStub func(*livekit.Room, []types.Participant, []*livekit.ICEServer) error
SendJoinResponseStub func(*livekit.Room, []*livekit.ParticipantInfo, []*livekit.ICEServer) error
sendJoinResponseMutex sync.RWMutex
sendJoinResponseArgsForCall []struct {
arg1 *livekit.Room
arg2 []types.Participant
arg2 []*livekit.ParticipantInfo
arg3 []*livekit.ICEServer
}
sendJoinResponseReturns struct {
@@ -2390,10 +2390,10 @@ func (fake *FakeParticipant) SendDataPacketReturnsOnCall(i int, result1 error) {
}{result1}
}
func (fake *FakeParticipant) SendJoinResponse(arg1 *livekit.Room, arg2 []types.Participant, arg3 []*livekit.ICEServer) error {
var arg2Copy []types.Participant
func (fake *FakeParticipant) SendJoinResponse(arg1 *livekit.Room, arg2 []*livekit.ParticipantInfo, arg3 []*livekit.ICEServer) error {
var arg2Copy []*livekit.ParticipantInfo
if arg2 != nil {
arg2Copy = make([]types.Participant, len(arg2))
arg2Copy = make([]*livekit.ParticipantInfo, len(arg2))
copy(arg2Copy, arg2)
}
var arg3Copy []*livekit.ICEServer
@@ -2405,7 +2405,7 @@ func (fake *FakeParticipant) SendJoinResponse(arg1 *livekit.Room, arg2 []types.P
ret, specificReturn := fake.sendJoinResponseReturnsOnCall[len(fake.sendJoinResponseArgsForCall)]
fake.sendJoinResponseArgsForCall = append(fake.sendJoinResponseArgsForCall, struct {
arg1 *livekit.Room
arg2 []types.Participant
arg2 []*livekit.ParticipantInfo
arg3 []*livekit.ICEServer
}{arg1, arg2Copy, arg3Copy})
stub := fake.SendJoinResponseStub
@@ -2427,13 +2427,13 @@ func (fake *FakeParticipant) SendJoinResponseCallCount() int {
return len(fake.sendJoinResponseArgsForCall)
}
func (fake *FakeParticipant) SendJoinResponseCalls(stub func(*livekit.Room, []types.Participant, []*livekit.ICEServer) error) {
func (fake *FakeParticipant) SendJoinResponseCalls(stub func(*livekit.Room, []*livekit.ParticipantInfo, []*livekit.ICEServer) error) {
fake.sendJoinResponseMutex.Lock()
defer fake.sendJoinResponseMutex.Unlock()
fake.SendJoinResponseStub = stub
}
func (fake *FakeParticipant) SendJoinResponseArgsForCall(i int) (*livekit.Room, []types.Participant, []*livekit.ICEServer) {
func (fake *FakeParticipant) SendJoinResponseArgsForCall(i int) (*livekit.Room, []*livekit.ParticipantInfo, []*livekit.ICEServer) {
fake.sendJoinResponseMutex.RLock()
defer fake.sendJoinResponseMutex.RUnlock()
argsForCall := fake.sendJoinResponseArgsForCall[i]
+9 -17
View File
@@ -5,9 +5,6 @@ import (
"time"
livekit "github.com/livekit/protocol/proto"
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/rtc"
)
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate
@@ -15,31 +12,26 @@ import (
// encapsulates CRUD operations for room settings
//counterfeiter:generate . RoomStore
type RoomStore interface {
StoreRoom(ctx context.Context, room *livekit.Room) error
LoadRoom(ctx context.Context, name string) (*livekit.Room, error)
ListRooms(ctx context.Context) ([]*livekit.Room, error)
DeleteRoom(ctx context.Context, name string) error
RORoomStore
// enable locking on a specific room to prevent race
// returns a (lock uuid, error)
LockRoom(ctx context.Context, name string, duration time.Duration) (string, error)
UnlockRoom(ctx context.Context, name string, uid string) error
StoreRoom(ctx context.Context, room *livekit.Room) error
DeleteRoom(ctx context.Context, name string) error
StoreParticipant(ctx context.Context, roomName string, participant *livekit.ParticipantInfo) error
LoadParticipant(ctx context.Context, roomName, identity string) (*livekit.ParticipantInfo, error)
ListParticipants(ctx context.Context, roomName string) ([]*livekit.ParticipantInfo, error)
DeleteParticipant(ctx context.Context, roomName, identity string) error
}
type RoomManager interface {
RoomStore
type RORoomStore interface {
LoadRoom(ctx context.Context, name string) (*livekit.Room, error)
ListRooms(ctx context.Context) ([]*livekit.Room, error)
GetRoom(ctx context.Context, roomName string) *rtc.Room
StartSession(ctx context.Context, roomName string, pi routing.ParticipantInit, requestSource routing.MessageSource, responseSink routing.MessageSink)
CleanupRooms() error
CloseIdleRooms()
HasParticipants() bool
Stop()
LoadParticipant(ctx context.Context, roomName, identity string) (*livekit.ParticipantInfo, error)
ListParticipants(ctx context.Context, roomName string) ([]*livekit.ParticipantInfo, error)
}
type RoomAllocator interface {
+39 -33
View File
@@ -20,37 +20,43 @@ const (
roomPurgeSeconds = 24 * 60 * 60
)
// LocalRoomManager manages rooms and its interaction with participants.
// RoomManager manages rooms and its interaction with participants.
// It's responsible for creating, deleting rooms, as well as running sessions for participants
type LocalRoomManager struct {
RoomStore
type RoomManager struct {
lock sync.RWMutex
lock sync.RWMutex
router routing.Router
currentNode routing.LocalNode
rtcConfig *rtc.WebRTCConfig
config *config.Config
rtcConfig *rtc.WebRTCConfig
currentNode routing.LocalNode
router routing.Router
roomStore RoomStore
telemetry *telemetry.TelemetryService
rooms map[string]*rtc.Room
rooms map[string]*rtc.Room
}
func NewLocalRoomManager(conf *config.Config, rs RoomStore, router routing.Router, currentNode routing.LocalNode,
telemetry *telemetry.TelemetryService) (*LocalRoomManager, error) {
func NewLocalRoomManager(
conf *config.Config,
roomStore RoomStore,
currentNode routing.LocalNode,
router routing.Router,
telemetry *telemetry.TelemetryService,
) (*RoomManager, error) {
rtcConf, err := rtc.NewWebRTCConfig(conf, currentNode.Ip)
if err != nil {
return nil, err
}
r := &LocalRoomManager{
RoomStore: rs,
lock: sync.RWMutex{},
rtcConfig: rtcConf,
r := &RoomManager{
config: conf,
router: router,
rtcConfig: rtcConf,
currentNode: currentNode,
router: router,
roomStore: roomStore,
telemetry: telemetry,
rooms: make(map[string]*rtc.Room),
rooms: make(map[string]*rtc.Room),
}
// hook up to router
@@ -59,14 +65,14 @@ func NewLocalRoomManager(conf *config.Config, rs RoomStore, router routing.Route
return r, nil
}
func (r *LocalRoomManager) GetRoom(ctx context.Context, roomName string) *rtc.Room {
func (r *RoomManager) GetRoom(ctx context.Context, roomName string) *rtc.Room {
r.lock.RLock()
defer r.lock.RUnlock()
return r.rooms[roomName]
}
// DeleteRoom completely deletes all room information, including active sessions, room store, and routing info
func (r *LocalRoomManager) DeleteRoom(ctx context.Context, roomName string) error {
func (r *RoomManager) DeleteRoom(ctx context.Context, roomName string) error {
logger.Infow("deleting room state", "room", roomName)
r.lock.Lock()
delete(r.rooms, roomName)
@@ -83,7 +89,7 @@ func (r *LocalRoomManager) DeleteRoom(ctx context.Context, roomName string) erro
// also delete room from db
go func() {
defer wg.Done()
err2 = r.RoomStore.DeleteRoom(ctx, roomName)
err2 = r.roomStore.DeleteRoom(ctx, roomName)
}()
wg.Wait()
@@ -95,10 +101,10 @@ func (r *LocalRoomManager) DeleteRoom(ctx context.Context, roomName string) erro
}
// CleanupRooms cleans up after old rooms that have been around for awhile
func (r *LocalRoomManager) CleanupRooms() error {
func (r *RoomManager) CleanupRooms() error {
// cleanup rooms that have been left for over a day
ctx := context.Background()
rooms, err := r.ListRooms(ctx)
rooms, err := r.roomStore.ListRooms(ctx)
if err != nil {
return err
}
@@ -114,7 +120,7 @@ func (r *LocalRoomManager) CleanupRooms() error {
return nil
}
func (r *LocalRoomManager) CloseIdleRooms() {
func (r *RoomManager) CloseIdleRooms() {
r.lock.RLock()
rooms := make([]*rtc.Room, 0, len(r.rooms))
for _, rm := range r.rooms {
@@ -127,7 +133,7 @@ func (r *LocalRoomManager) CloseIdleRooms() {
}
}
func (r *LocalRoomManager) HasParticipants() bool {
func (r *RoomManager) HasParticipants() bool {
r.lock.RLock()
defer r.lock.RUnlock()
@@ -139,7 +145,7 @@ func (r *LocalRoomManager) HasParticipants() bool {
return false
}
func (r *LocalRoomManager) Stop() {
func (r *RoomManager) Stop() {
// disconnect all clients
r.lock.RLock()
rooms := make([]*rtc.Room, 0, len(r.rooms))
@@ -166,7 +172,7 @@ func (r *LocalRoomManager) Stop() {
}
// StartSession starts WebRTC session when a new participant is connected, takes place on RTC node
func (r *LocalRoomManager) StartSession(ctx context.Context, roomName string, pi routing.ParticipantInit, requestSource routing.MessageSource, responseSink routing.MessageSink) {
func (r *RoomManager) StartSession(ctx context.Context, roomName string, pi routing.ParticipantInit, requestSource routing.MessageSource, responseSink routing.MessageSink) {
room, err := r.getOrCreateRoom(ctx, roomName)
if err != nil {
logger.Errorw("could not create room", err, "room", roomName)
@@ -261,7 +267,7 @@ func (r *LocalRoomManager) StartSession(ctx context.Context, roomName string, pi
}
// create the actual room object, to be used on RTC node
func (r *LocalRoomManager) getOrCreateRoom(ctx context.Context, roomName string) (*rtc.Room, error) {
func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName string) (*rtc.Room, error) {
r.lock.RLock()
room := r.rooms[roomName]
r.lock.RUnlock()
@@ -271,7 +277,7 @@ func (r *LocalRoomManager) getOrCreateRoom(ctx context.Context, roomName string)
}
// create new room, get details first
ri, err := r.LoadRoom(ctx, roomName)
ri, err := r.roomStore.LoadRoom(ctx, roomName)
if err != nil {
return nil, err
}
@@ -289,7 +295,7 @@ func (r *LocalRoomManager) getOrCreateRoom(ctx context.Context, roomName string)
logger.Infow("room closed")
})
room.OnMetadataUpdate(func(metadata string) {
err := r.StoreRoom(ctx, room.Room)
err := r.roomStore.StoreRoom(ctx, room.Room)
if err != nil {
logger.Errorw("could not handle metadata update", err)
}
@@ -297,9 +303,9 @@ func (r *LocalRoomManager) getOrCreateRoom(ctx context.Context, roomName string)
room.OnParticipantChanged(func(p types.Participant) {
var err error
if p.State() == livekit.ParticipantInfo_DISCONNECTED {
err = r.DeleteParticipant(ctx, roomName, p.Identity())
err = r.roomStore.DeleteParticipant(ctx, roomName, p.Identity())
} else {
err = r.StoreParticipant(ctx, roomName, p.ToProto())
err = r.roomStore.StoreParticipant(ctx, roomName, p.ToProto())
}
if err != nil {
logger.Errorw("could not handle participant change", err)
@@ -313,7 +319,7 @@ func (r *LocalRoomManager) getOrCreateRoom(ctx context.Context, roomName string)
}
// manages an RTC session for a participant, runs on the RTC node
func (r *LocalRoomManager) rtcSessionWorker(room *rtc.Room, participant types.Participant, requestSource routing.MessageSource) {
func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Participant, requestSource routing.MessageSource) {
defer func() {
logger.Debugw("RTC session finishing",
"participant", participant.Identity(),
@@ -453,7 +459,7 @@ func (r *LocalRoomManager) rtcSessionWorker(room *rtc.Room, participant types.Pa
}
// handles RTC messages resulted from Room API calls
func (r *LocalRoomManager) handleRTCMessage(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) {
func (r *RoomManager) handleRTCMessage(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) {
r.lock.RLock()
room := r.rooms[roomName]
r.lock.RUnlock()
@@ -524,7 +530,7 @@ func (r *LocalRoomManager) handleRTCMessage(ctx context.Context, roomName, ident
}
}
func (r *LocalRoomManager) iceServersForRoom(ri *livekit.Room) []*livekit.ICEServer {
func (r *RoomManager) iceServersForRoom(ri *livekit.Room) []*livekit.ICEServer {
var iceServers []*livekit.ICEServer
hasSTUN := false
+26 -35
View File
@@ -13,12 +13,12 @@ import (
// A rooms service that supports a single node
type RoomService struct {
router routing.Router
router routing.MessageRouter
roomAllocator RoomAllocator
roomStore RoomStore
roomStore RORoomStore
}
func NewRoomService(ra RoomAllocator, rs RoomStore, router routing.Router) (svc *RoomService, err error) {
func NewRoomService(ra RoomAllocator, rs RORoomStore, router routing.MessageRouter) (svc livekit.RoomService, err error) {
svc = &RoomService{
router: router,
roomAllocator: ra,
@@ -62,19 +62,12 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq
if err := EnsureCreatePermission(ctx); err != nil {
return nil, twirpAuthError(err)
}
// if the room is currently active, RTC node needs to disconnect clients
node, err := s.router.GetNodeForRoom(ctx, req.Room)
if err != nil {
return nil, err
}
err = s.router.WriteRTCNodeMessage(ctx, node.Id, &livekit.RTCNodeMessage{
ParticipantKey: s.roomParticipantKey(req.Room),
if err := s.writeRoomMessage(ctx, req.Room, "", &livekit.RTCNodeMessage{
Message: &livekit.RTCNodeMessage_DeleteRoom{
DeleteRoom: req,
},
})
if err != nil {
}); err != nil {
return nil, err
}
@@ -112,7 +105,7 @@ func (s *RoomService) GetParticipant(ctx context.Context, req *livekit.RoomParti
}
func (s *RoomService) RemoveParticipant(ctx context.Context, req *livekit.RoomParticipantIdentity) (res *livekit.RemoveParticipantResponse, err error) {
err = s.writeMessage(ctx, req.Room, req.Identity, &livekit.RTCNodeMessage{
err = s.writeRoomMessage(ctx, req.Room, req.Identity, &livekit.RTCNodeMessage{
Message: &livekit.RTCNodeMessage_RemoveParticipant{
RemoveParticipant: req,
},
@@ -142,7 +135,7 @@ func (s *RoomService) MutePublishedTrack(ctx context.Context, req *livekit.MuteR
return nil, twirp.NotFoundError(ErrTrackNotFound.Error())
}
err = s.writeMessage(ctx, req.Room, req.Identity, &livekit.RTCNodeMessage{
err = s.writeParticipantMessage(ctx, req.Room, req.Identity, &livekit.RTCNodeMessage{
Message: &livekit.RTCNodeMessage_MuteTrack{
MuteTrack: req,
},
@@ -160,7 +153,7 @@ func (s *RoomService) MutePublishedTrack(ctx context.Context, req *livekit.MuteR
}
func (s *RoomService) UpdateParticipant(ctx context.Context, req *livekit.UpdateParticipantRequest) (*livekit.ParticipantInfo, error) {
err := s.writeMessage(ctx, req.Room, req.Identity, &livekit.RTCNodeMessage{
err := s.writeRoomMessage(ctx, req.Room, req.Identity, &livekit.RTCNodeMessage{
Message: &livekit.RTCNodeMessage_UpdateParticipant{
UpdateParticipant: req,
},
@@ -179,7 +172,7 @@ func (s *RoomService) UpdateParticipant(ctx context.Context, req *livekit.Update
}
func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.UpdateSubscriptionsRequest) (*livekit.UpdateSubscriptionsResponse, error) {
err := s.writeMessage(ctx, req.Room, req.Identity, &livekit.RTCNodeMessage{
err := s.writeRoomMessage(ctx, req.Room, req.Identity, &livekit.RTCNodeMessage{
Message: &livekit.RTCNodeMessage_UpdateSubscriptions{
UpdateSubscriptions: req,
},
@@ -192,14 +185,7 @@ func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.Upda
}
func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest) (*livekit.SendDataResponse, error) {
// here we are using any user's identity, due to how it works with routing
node, err := s.router.GetNodeForRoom(ctx, req.Room)
if err != nil {
return nil, err
}
err = s.router.WriteRTCNodeMessage(ctx, node.Id, &livekit.RTCNodeMessage{
ParticipantKey: s.roomParticipantKey(req.Room),
err := s.writeRoomMessage(ctx, req.Room, "", &livekit.RTCNodeMessage{
Message: &livekit.RTCNodeMessage_SendData{
SendData: req,
},
@@ -223,13 +209,7 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat
room.Metadata = req.Metadata
node, err := s.router.GetNodeForRoom(ctx, req.Room)
if err != nil {
return nil, err
}
err = s.router.WriteRTCNodeMessage(ctx, node.Id, &livekit.RTCNodeMessage{
ParticipantKey: s.roomParticipantKey(req.Room),
err = s.writeRoomMessage(ctx, req.Room, "", &livekit.RTCNodeMessage{
Message: &livekit.RTCNodeMessage_UpdateRoomMetadata{
UpdateRoomMetadata: req,
},
@@ -241,7 +221,7 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat
return room, nil
}
func (s *RoomService) writeMessage(ctx context.Context, room, identity string, msg *livekit.RTCNodeMessage) error {
func (s *RoomService) writeParticipantMessage(ctx context.Context, room, identity string, msg *livekit.RTCNodeMessage) error {
if err := EnsureAdminPermission(ctx, room); err != nil {
return twirpAuthError(err)
}
@@ -251,9 +231,20 @@ func (s *RoomService) writeMessage(ctx context.Context, room, identity string, m
return err
}
return s.router.WriteRTCMessage(ctx, room, identity, msg)
return s.router.WriteParticipantRTC(ctx, room, identity, msg)
}
func (s *RoomService) roomParticipantKey(room string) string {
return room + "|"
func (s *RoomService) writeRoomMessage(ctx context.Context, room, identity string, msg *livekit.RTCNodeMessage) error {
if err := EnsureAdminPermission(ctx, room); err != nil {
return twirpAuthError(err)
}
if identity != "" {
_, err := s.roomStore.LoadParticipant(ctx, room, identity)
if err != nil {
return err
}
}
return s.router.WriteRoomRTC(ctx, room, identity, msg)
}
+2 -2
View File
@@ -20,14 +20,14 @@ import (
)
type RTCService struct {
router routing.Router
router routing.MessageRouter
roomAllocator RoomAllocator
upgrader websocket.Upgrader
currentNode routing.LocalNode
isDev bool
}
func NewRTCService(conf *config.Config, ra RoomAllocator, router routing.Router, currentNode routing.LocalNode) *RTCService {
func NewRTCService(conf *config.Config, ra RoomAllocator, router routing.MessageRouter, currentNode routing.LocalNode) *RTCService {
s := &RTCService{
router: router,
roomAllocator: ra,
+9 -9
View File
@@ -30,7 +30,7 @@ type LivekitServer struct {
httpServer *http.Server
promServer *http.Server
router routing.Router
roomManager *LocalRoomManager
roomManager *RoomManager
turnServer *turn.Server
currentNode routing.LocalNode
running utils.AtomicFlag
@@ -44,7 +44,7 @@ func NewLivekitServer(conf *config.Config,
rtcService *RTCService,
keyProvider auth.KeyProvider,
router routing.Router,
roomManager *LocalRoomManager,
roomManager *RoomManager,
turnServer *turn.Server,
currentNode routing.LocalNode,
) (s *LivekitServer, err error) {
@@ -61,7 +61,7 @@ func NewLivekitServer(conf *config.Config,
}
middlewares := []negroni.Handler{
// always the first
// always first
negroni.NewRecovery(),
}
if keyProvider != nil {
@@ -230,15 +230,15 @@ func (s *LivekitServer) Stop(force bool) {
<-s.closedChan
}
func (s *LivekitServer) RoomManager() RoomManager {
func (s *LivekitServer) RoomManager() *RoomManager {
return s.roomManager
}
func (s *LivekitServer) debugGoroutines(w http.ResponseWriter, r *http.Request) {
func (s *LivekitServer) debugGoroutines(w http.ResponseWriter, _ *http.Request) {
_ = pprof.Lookup("goroutine").WriteTo(w, 2)
}
func (s *LivekitServer) debugInfo(w http.ResponseWriter, r *http.Request) {
func (s *LivekitServer) debugInfo(w http.ResponseWriter, _ *http.Request) {
s.roomManager.lock.RLock()
info := make([]map[string]interface{}, 0, len(s.roomManager.rooms))
for _, room := range s.roomManager.rooms {
@@ -255,19 +255,19 @@ func (s *LivekitServer) debugInfo(w http.ResponseWriter, r *http.Request) {
}
}
func (s *LivekitServer) healthCheck(w http.ResponseWriter, r *http.Request) {
func (s *LivekitServer) healthCheck(w http.ResponseWriter, _ *http.Request) {
var updatedAt time.Time
if s.Node().Stats != nil {
updatedAt = time.Unix(s.Node().Stats.UpdatedAt, 0)
}
if time.Now().Sub(updatedAt) > 4*time.Second {
w.WriteHeader(http.StatusNotAcceptable)
w.Write([]byte(fmt.Sprintf("Not Ready\nNode Updated At %s", updatedAt)))
_, _ = w.Write([]byte(fmt.Sprintf("Not Ready\nNode Updated At %s", updatedAt)))
return
}
w.WriteHeader(http.StatusOK)
w.Write([]byte([]byte("OK")))
_, _ = w.Write([]byte("OK"))
}
// worker to perform periodic tasks per node
+2 -2
View File
@@ -14,7 +14,6 @@ import (
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/logger"
livekit "github.com/livekit/protocol/proto"
"github.com/livekit/protocol/utils"
"github.com/livekit/protocol/webhook"
@@ -28,9 +27,11 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
createRedisClient,
createMessageBus,
createStore,
wire.Bind(new(RORoomStore), new(RoomStore)),
createKeyProvider,
createWebhookNotifier,
routing.CreateRouter,
wire.Bind(new(routing.MessageRouter), new(routing.Router)),
telemetry.NewTelemetryService,
NewRecordingService,
NewRoomAllocator,
@@ -39,7 +40,6 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
NewLocalRoomManager,
newTurnAuthHandler,
NewTurnServer,
wire.Bind(new(livekit.RoomService), new(*RoomService)),
NewLivekitServer,
)
return &LivekitServer{}, nil
+3 -4
View File
@@ -1,8 +1,7 @@
// Code generated by Wire. DO NOT EDIT.
//go:generate go run github.com/google/wire/cmd/wire
//go:build !wireinject
// +build !wireinject
//+build !wireinject
package service
@@ -50,7 +49,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
telemetryService := telemetry.NewTelemetryService(notifier)
recordingService := NewRecordingService(messageBus, telemetryService)
rtcService := NewRTCService(conf, roomAllocator, router, currentNode)
localRoomManager, err := NewLocalRoomManager(conf, roomStore, router, currentNode, telemetryService)
roomManager, err := NewLocalRoomManager(conf, roomStore, currentNode, router, telemetryService)
if err != nil {
return nil, err
}
@@ -59,7 +58,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
if err != nil {
return nil, err
}
livekitServer, err := NewLivekitServer(conf, roomService, recordingService, rtcService, keyProvider, router, localRoomManager, server, currentNode)
livekitServer, err := NewLivekitServer(conf, roomService, recordingService, rtcService, keyProvider, router, roomManager, server, currentNode)
if err != nil {
return nil, err
}