From 95e29d3766a098aaa3c6be3ead75fc57b64133e4 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Mon, 15 Nov 2021 13:25:50 -0800 Subject: [PATCH] Interface updates (#194) * update interfaces, a bit of cleaning * regenerate * return interface for RoomService * export packetBufferSize * update router interface * move participant key into router * change locks back * read only room store * fix server rm locks * update SendJoinResponse * clean up imports * update room messaging * regenerate --- pkg/routing/interfaces.go | 20 +- pkg/routing/localrouter.go | 9 +- pkg/routing/redisrouter.go | 15 +- pkg/routing/routingfakes/fake_router.go | 381 ++++++++++--------- pkg/rtc/config.go | 6 +- pkg/rtc/mediatrack.go | 2 +- pkg/rtc/participant.go | 4 +- pkg/rtc/room.go | 22 +- pkg/rtc/types/interfaces.go | 6 +- pkg/rtc/types/typesfakes/fake_participant.go | 16 +- pkg/service/interfaces.go | 26 +- pkg/service/roommanager.go | 72 ++-- pkg/service/roomservice.go | 61 ++- pkg/service/rtcservice.go | 4 +- pkg/service/server.go | 18 +- pkg/service/wire.go | 4 +- pkg/service/wire_gen.go | 7 +- 17 files changed, 340 insertions(+), 333 deletions(-) diff --git a/pkg/routing/interfaces.go b/pkg/routing/interfaces.go index 550384e7f..0ed4cd950 100644 --- a/pkg/routing/interfaces.go +++ b/pkg/routing/interfaces.go @@ -44,33 +44,37 @@ type RTCMessageCallback func(ctx context.Context, roomName, identity string, msg // Router allows multiple nodes to coordinate the participant session //counterfeiter:generate . Router type Router interface { + MessageRouter + RegisterNode() error UnregisterNode() error RemoveDeadNodes() error - GetNode(nodeId string) (*livekit.Node, error) ListNodes() ([]*livekit.Node, error) GetNodeForRoom(ctx context.Context, roomName string) (*livekit.Node, error) SetNodeForRoom(ctx context.Context, roomName, nodeId string) error ClearRoomState(ctx context.Context, roomName string) error + Start() error + Drain() + Stop() +} + +type MessageRouter interface { // StartParticipantSignal participant signal connection is ready to start StartParticipantSignal(ctx context.Context, roomName string, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error) - // WriteRTCMessage sends a message to the RTC node - WriteRTCMessage(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error - WriteRTCNodeMessage(ctx context.Context, nodeID string, msg *livekit.RTCNodeMessage) error + // Write a message to a participant, room, or node + WriteParticipantRTC(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error + WriteRoomRTC(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error + WriteNodeRTC(ctx context.Context, nodeID string, msg *livekit.RTCNodeMessage) error // OnNewParticipantRTC is called to start a new participant's RTC connection OnNewParticipantRTC(callback NewParticipantCallback) // OnRTCMessage is called to execute actions on the RTC node OnRTCMessage(callback RTCMessageCallback) - - Start() error - Drain() - Stop() } func CreateRouter(conf *config.Config, rc *redis.Client, node LocalNode) Router { diff --git a/pkg/routing/localrouter.go b/pkg/routing/localrouter.go index 20de1e2c5..46b91716c 100644 --- a/pkg/routing/localrouter.go +++ b/pkg/routing/localrouter.go @@ -110,7 +110,7 @@ func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName strin return pi.Identity, reqChan, resChan, nil } -func (r *LocalRouter) WriteRTCMessage(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error { +func (r *LocalRouter) WriteParticipantRTC(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error { if r.rtcMessageChan.IsClosed() { // create a new one r.rtcMessageChan = NewMessageChannel() @@ -119,7 +119,12 @@ func (r *LocalRouter) WriteRTCMessage(ctx context.Context, roomName, identity st return r.writeRTCMessage(r.rtcMessageChan, msg) } -func (r *LocalRouter) WriteRTCNodeMessage(ctx context.Context, nodeID string, msg *livekit.RTCNodeMessage) error { +func (r *LocalRouter) WriteRoomRTC(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error { + msg.ParticipantKey = participantKey(roomName, identity) + return r.WriteNodeRTC(ctx, r.currentNode.Id, msg) +} + +func (r *LocalRouter) WriteNodeRTC(ctx context.Context, nodeID string, msg *livekit.RTCNodeMessage) error { if r.rtcMessageChan.IsClosed() { // create a new one r.rtcMessageChan = NewMessageChannel() diff --git a/pkg/routing/redisrouter.go b/pkg/routing/redisrouter.go index b786efdbb..151ff6b9a 100644 --- a/pkg/routing/redisrouter.go +++ b/pkg/routing/redisrouter.go @@ -168,7 +168,7 @@ func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName strin return connectionId, sink, resChan, nil } -func (r *RedisRouter) WriteRTCMessage(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error { +func (r *RedisRouter) WriteParticipantRTC(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error { pkey := participantKey(roomName, identity) rtcNode, err := r.getParticipantRTCNode(pkey) if err != nil { @@ -180,7 +180,16 @@ func (r *RedisRouter) WriteRTCMessage(ctx context.Context, roomName, identity st return r.writeRTCMessage(rtcSink, msg) } -func (r *RedisRouter) WriteRTCNodeMessage(ctx context.Context, rtcNodeID string, msg *livekit.RTCNodeMessage) error { +func (r *RedisRouter) WriteRoomRTC(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) error { + node, err := r.GetNodeForRoom(ctx, roomName) + if err != nil { + return err + } + msg.ParticipantKey = participantKey(roomName, identity) + return r.WriteNodeRTC(ctx, node.Id, msg) +} + +func (r *RedisRouter) WriteNodeRTC(ctx context.Context, rtcNodeID string, msg *livekit.RTCNodeMessage) error { rtcSink := NewRTCNodeSink(r.rc, rtcNodeID, msg.ParticipantKey) return r.writeRTCMessage(rtcSink, msg) } @@ -319,7 +328,7 @@ func (r *RedisRouter) statsWorker() { // update periodically seconds select { case <-time.After(statsUpdateInterval): - r.WriteRTCNodeMessage(context.Background(), r.currentNode.Id, &livekit.RTCNodeMessage{ + _ = r.WriteNodeRTC(context.Background(), r.currentNode.Id, &livekit.RTCNodeMessage{ Message: &livekit.RTCNodeMessage_KeepAlive{}, }) case <-r.ctx.Done(): diff --git a/pkg/routing/routingfakes/fake_router.go b/pkg/routing/routingfakes/fake_router.go index 699e2d907..14cee43fa 100644 --- a/pkg/routing/routingfakes/fake_router.go +++ b/pkg/routing/routingfakes/fake_router.go @@ -26,19 +26,6 @@ type FakeRouter struct { drainMutex sync.RWMutex drainArgsForCall []struct { } - GetNodeStub func(string) (*livekit.Node, error) - getNodeMutex sync.RWMutex - getNodeArgsForCall []struct { - arg1 string - } - getNodeReturns struct { - result1 *livekit.Node - result2 error - } - getNodeReturnsOnCall map[int]struct { - result1 *livekit.Node - result2 error - } GetNodeForRoomStub func(context.Context, string) (*livekit.Node, error) getNodeForRoomMutex sync.RWMutex getNodeForRoomArgsForCall []struct { @@ -151,31 +138,45 @@ type FakeRouter struct { unregisterNodeReturnsOnCall map[int]struct { result1 error } - WriteRTCMessageStub func(context.Context, string, string, *livekit.RTCNodeMessage) error - writeRTCMessageMutex sync.RWMutex - writeRTCMessageArgsForCall []struct { + WriteNodeRTCStub func(context.Context, string, *livekit.RTCNodeMessage) error + writeNodeRTCMutex sync.RWMutex + writeNodeRTCArgsForCall []struct { + arg1 context.Context + arg2 string + arg3 *livekit.RTCNodeMessage + } + writeNodeRTCReturns struct { + result1 error + } + writeNodeRTCReturnsOnCall map[int]struct { + result1 error + } + WriteParticipantRTCStub func(context.Context, string, string, *livekit.RTCNodeMessage) error + writeParticipantRTCMutex sync.RWMutex + writeParticipantRTCArgsForCall []struct { arg1 context.Context arg2 string arg3 string arg4 *livekit.RTCNodeMessage } - writeRTCMessageReturns struct { + writeParticipantRTCReturns struct { result1 error } - writeRTCMessageReturnsOnCall map[int]struct { + writeParticipantRTCReturnsOnCall map[int]struct { result1 error } - WriteRTCNodeMessageStub func(context.Context, string, *livekit.RTCNodeMessage) error - writeRTCNodeMessageMutex sync.RWMutex - writeRTCNodeMessageArgsForCall []struct { + WriteRoomRTCStub func(context.Context, string, string, *livekit.RTCNodeMessage) error + writeRoomRTCMutex sync.RWMutex + writeRoomRTCArgsForCall []struct { arg1 context.Context arg2 string - arg3 *livekit.RTCNodeMessage + arg3 string + arg4 *livekit.RTCNodeMessage } - writeRTCNodeMessageReturns struct { + writeRoomRTCReturns struct { result1 error } - writeRTCNodeMessageReturnsOnCall map[int]struct { + writeRoomRTCReturnsOnCall map[int]struct { result1 error } invocations map[string][][]interface{} @@ -268,70 +269,6 @@ func (fake *FakeRouter) DrainCalls(stub func()) { fake.DrainStub = stub } -func (fake *FakeRouter) GetNode(arg1 string) (*livekit.Node, error) { - fake.getNodeMutex.Lock() - ret, specificReturn := fake.getNodeReturnsOnCall[len(fake.getNodeArgsForCall)] - fake.getNodeArgsForCall = append(fake.getNodeArgsForCall, struct { - arg1 string - }{arg1}) - stub := fake.GetNodeStub - fakeReturns := fake.getNodeReturns - fake.recordInvocation("GetNode", []interface{}{arg1}) - fake.getNodeMutex.Unlock() - if stub != nil { - return stub(arg1) - } - if specificReturn { - return ret.result1, ret.result2 - } - return fakeReturns.result1, fakeReturns.result2 -} - -func (fake *FakeRouter) GetNodeCallCount() int { - fake.getNodeMutex.RLock() - defer fake.getNodeMutex.RUnlock() - return len(fake.getNodeArgsForCall) -} - -func (fake *FakeRouter) GetNodeCalls(stub func(string) (*livekit.Node, error)) { - fake.getNodeMutex.Lock() - defer fake.getNodeMutex.Unlock() - fake.GetNodeStub = stub -} - -func (fake *FakeRouter) GetNodeArgsForCall(i int) string { - fake.getNodeMutex.RLock() - defer fake.getNodeMutex.RUnlock() - argsForCall := fake.getNodeArgsForCall[i] - return argsForCall.arg1 -} - -func (fake *FakeRouter) GetNodeReturns(result1 *livekit.Node, result2 error) { - fake.getNodeMutex.Lock() - defer fake.getNodeMutex.Unlock() - fake.GetNodeStub = nil - fake.getNodeReturns = struct { - result1 *livekit.Node - result2 error - }{result1, result2} -} - -func (fake *FakeRouter) GetNodeReturnsOnCall(i int, result1 *livekit.Node, result2 error) { - fake.getNodeMutex.Lock() - defer fake.getNodeMutex.Unlock() - fake.GetNodeStub = nil - if fake.getNodeReturnsOnCall == nil { - fake.getNodeReturnsOnCall = make(map[int]struct { - result1 *livekit.Node - result2 error - }) - } - fake.getNodeReturnsOnCall[i] = struct { - result1 *livekit.Node - result2 error - }{result1, result2} -} - func (fake *FakeRouter) GetNodeForRoom(arg1 context.Context, arg2 string) (*livekit.Node, error) { fake.getNodeForRoomMutex.Lock() ret, specificReturn := fake.getNodeForRoomReturnsOnCall[len(fake.getNodeForRoomArgsForCall)] @@ -888,82 +825,18 @@ func (fake *FakeRouter) UnregisterNodeReturnsOnCall(i int, result1 error) { }{result1} } -func (fake *FakeRouter) WriteRTCMessage(arg1 context.Context, arg2 string, arg3 string, arg4 *livekit.RTCNodeMessage) error { - fake.writeRTCMessageMutex.Lock() - ret, specificReturn := fake.writeRTCMessageReturnsOnCall[len(fake.writeRTCMessageArgsForCall)] - fake.writeRTCMessageArgsForCall = append(fake.writeRTCMessageArgsForCall, struct { - arg1 context.Context - arg2 string - arg3 string - arg4 *livekit.RTCNodeMessage - }{arg1, arg2, arg3, arg4}) - stub := fake.WriteRTCMessageStub - fakeReturns := fake.writeRTCMessageReturns - fake.recordInvocation("WriteRTCMessage", []interface{}{arg1, arg2, arg3, arg4}) - fake.writeRTCMessageMutex.Unlock() - if stub != nil { - return stub(arg1, arg2, arg3, arg4) - } - if specificReturn { - return ret.result1 - } - return fakeReturns.result1 -} - -func (fake *FakeRouter) WriteRTCMessageCallCount() int { - fake.writeRTCMessageMutex.RLock() - defer fake.writeRTCMessageMutex.RUnlock() - return len(fake.writeRTCMessageArgsForCall) -} - -func (fake *FakeRouter) WriteRTCMessageCalls(stub func(context.Context, string, string, *livekit.RTCNodeMessage) error) { - fake.writeRTCMessageMutex.Lock() - defer fake.writeRTCMessageMutex.Unlock() - fake.WriteRTCMessageStub = stub -} - -func (fake *FakeRouter) WriteRTCMessageArgsForCall(i int) (context.Context, string, string, *livekit.RTCNodeMessage) { - fake.writeRTCMessageMutex.RLock() - defer fake.writeRTCMessageMutex.RUnlock() - argsForCall := fake.writeRTCMessageArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 -} - -func (fake *FakeRouter) WriteRTCMessageReturns(result1 error) { - fake.writeRTCMessageMutex.Lock() - defer fake.writeRTCMessageMutex.Unlock() - fake.WriteRTCMessageStub = nil - fake.writeRTCMessageReturns = struct { - result1 error - }{result1} -} - -func (fake *FakeRouter) WriteRTCMessageReturnsOnCall(i int, result1 error) { - fake.writeRTCMessageMutex.Lock() - defer fake.writeRTCMessageMutex.Unlock() - fake.WriteRTCMessageStub = nil - if fake.writeRTCMessageReturnsOnCall == nil { - fake.writeRTCMessageReturnsOnCall = make(map[int]struct { - result1 error - }) - } - fake.writeRTCMessageReturnsOnCall[i] = struct { - result1 error - }{result1} -} - -func (fake *FakeRouter) WriteRTCNodeMessage(arg1 context.Context, arg2 string, arg3 *livekit.RTCNodeMessage) error { - fake.writeRTCNodeMessageMutex.Lock() - ret, specificReturn := fake.writeRTCNodeMessageReturnsOnCall[len(fake.writeRTCNodeMessageArgsForCall)] - fake.writeRTCNodeMessageArgsForCall = append(fake.writeRTCNodeMessageArgsForCall, struct { +func (fake *FakeRouter) WriteNodeRTC(arg1 context.Context, arg2 string, arg3 *livekit.RTCNodeMessage) error { + fake.writeNodeRTCMutex.Lock() + ret, specificReturn := fake.writeNodeRTCReturnsOnCall[len(fake.writeNodeRTCArgsForCall)] + fake.writeNodeRTCArgsForCall = append(fake.writeNodeRTCArgsForCall, struct { arg1 context.Context arg2 string arg3 *livekit.RTCNodeMessage }{arg1, arg2, arg3}) - stub := fake.WriteRTCNodeMessageStub - fakeReturns := fake.writeRTCNodeMessageReturns - fake.recordInvocation("WriteRTCNodeMessage", []interface{}{arg1, arg2, arg3}) - fake.writeRTCNodeMessageMutex.Unlock() + stub := fake.WriteNodeRTCStub + fakeReturns := fake.writeNodeRTCReturns + fake.recordInvocation("WriteNodeRTC", []interface{}{arg1, arg2, arg3}) + fake.writeNodeRTCMutex.Unlock() if stub != nil { return stub(arg1, arg2, arg3) } @@ -973,44 +846,172 @@ func (fake *FakeRouter) WriteRTCNodeMessage(arg1 context.Context, arg2 string, a return fakeReturns.result1 } -func (fake *FakeRouter) WriteRTCNodeMessageCallCount() int { - fake.writeRTCNodeMessageMutex.RLock() - defer fake.writeRTCNodeMessageMutex.RUnlock() - return len(fake.writeRTCNodeMessageArgsForCall) +func (fake *FakeRouter) WriteNodeRTCCallCount() int { + fake.writeNodeRTCMutex.RLock() + defer fake.writeNodeRTCMutex.RUnlock() + return len(fake.writeNodeRTCArgsForCall) } -func (fake *FakeRouter) WriteRTCNodeMessageCalls(stub func(context.Context, string, *livekit.RTCNodeMessage) error) { - fake.writeRTCNodeMessageMutex.Lock() - defer fake.writeRTCNodeMessageMutex.Unlock() - fake.WriteRTCNodeMessageStub = stub +func (fake *FakeRouter) WriteNodeRTCCalls(stub func(context.Context, string, *livekit.RTCNodeMessage) error) { + fake.writeNodeRTCMutex.Lock() + defer fake.writeNodeRTCMutex.Unlock() + fake.WriteNodeRTCStub = stub } -func (fake *FakeRouter) WriteRTCNodeMessageArgsForCall(i int) (context.Context, string, *livekit.RTCNodeMessage) { - fake.writeRTCNodeMessageMutex.RLock() - defer fake.writeRTCNodeMessageMutex.RUnlock() - argsForCall := fake.writeRTCNodeMessageArgsForCall[i] +func (fake *FakeRouter) WriteNodeRTCArgsForCall(i int) (context.Context, string, *livekit.RTCNodeMessage) { + fake.writeNodeRTCMutex.RLock() + defer fake.writeNodeRTCMutex.RUnlock() + argsForCall := fake.writeNodeRTCArgsForCall[i] return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } -func (fake *FakeRouter) WriteRTCNodeMessageReturns(result1 error) { - fake.writeRTCNodeMessageMutex.Lock() - defer fake.writeRTCNodeMessageMutex.Unlock() - fake.WriteRTCNodeMessageStub = nil - fake.writeRTCNodeMessageReturns = struct { +func (fake *FakeRouter) WriteNodeRTCReturns(result1 error) { + fake.writeNodeRTCMutex.Lock() + defer fake.writeNodeRTCMutex.Unlock() + fake.WriteNodeRTCStub = nil + fake.writeNodeRTCReturns = struct { result1 error }{result1} } -func (fake *FakeRouter) WriteRTCNodeMessageReturnsOnCall(i int, result1 error) { - fake.writeRTCNodeMessageMutex.Lock() - defer fake.writeRTCNodeMessageMutex.Unlock() - fake.WriteRTCNodeMessageStub = nil - if fake.writeRTCNodeMessageReturnsOnCall == nil { - fake.writeRTCNodeMessageReturnsOnCall = make(map[int]struct { +func (fake *FakeRouter) WriteNodeRTCReturnsOnCall(i int, result1 error) { + fake.writeNodeRTCMutex.Lock() + defer fake.writeNodeRTCMutex.Unlock() + fake.WriteNodeRTCStub = nil + if fake.writeNodeRTCReturnsOnCall == nil { + fake.writeNodeRTCReturnsOnCall = make(map[int]struct { result1 error }) } - fake.writeRTCNodeMessageReturnsOnCall[i] = struct { + fake.writeNodeRTCReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeRouter) WriteParticipantRTC(arg1 context.Context, arg2 string, arg3 string, arg4 *livekit.RTCNodeMessage) error { + fake.writeParticipantRTCMutex.Lock() + ret, specificReturn := fake.writeParticipantRTCReturnsOnCall[len(fake.writeParticipantRTCArgsForCall)] + fake.writeParticipantRTCArgsForCall = append(fake.writeParticipantRTCArgsForCall, struct { + arg1 context.Context + arg2 string + arg3 string + arg4 *livekit.RTCNodeMessage + }{arg1, arg2, arg3, arg4}) + stub := fake.WriteParticipantRTCStub + fakeReturns := fake.writeParticipantRTCReturns + fake.recordInvocation("WriteParticipantRTC", []interface{}{arg1, arg2, arg3, arg4}) + fake.writeParticipantRTCMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3, arg4) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeRouter) WriteParticipantRTCCallCount() int { + fake.writeParticipantRTCMutex.RLock() + defer fake.writeParticipantRTCMutex.RUnlock() + return len(fake.writeParticipantRTCArgsForCall) +} + +func (fake *FakeRouter) WriteParticipantRTCCalls(stub func(context.Context, string, string, *livekit.RTCNodeMessage) error) { + fake.writeParticipantRTCMutex.Lock() + defer fake.writeParticipantRTCMutex.Unlock() + fake.WriteParticipantRTCStub = stub +} + +func (fake *FakeRouter) WriteParticipantRTCArgsForCall(i int) (context.Context, string, string, *livekit.RTCNodeMessage) { + fake.writeParticipantRTCMutex.RLock() + defer fake.writeParticipantRTCMutex.RUnlock() + argsForCall := fake.writeParticipantRTCArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 +} + +func (fake *FakeRouter) WriteParticipantRTCReturns(result1 error) { + fake.writeParticipantRTCMutex.Lock() + defer fake.writeParticipantRTCMutex.Unlock() + fake.WriteParticipantRTCStub = nil + fake.writeParticipantRTCReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeRouter) WriteParticipantRTCReturnsOnCall(i int, result1 error) { + fake.writeParticipantRTCMutex.Lock() + defer fake.writeParticipantRTCMutex.Unlock() + fake.WriteParticipantRTCStub = nil + if fake.writeParticipantRTCReturnsOnCall == nil { + fake.writeParticipantRTCReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.writeParticipantRTCReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeRouter) WriteRoomRTC(arg1 context.Context, arg2 string, arg3 string, arg4 *livekit.RTCNodeMessage) error { + fake.writeRoomRTCMutex.Lock() + ret, specificReturn := fake.writeRoomRTCReturnsOnCall[len(fake.writeRoomRTCArgsForCall)] + fake.writeRoomRTCArgsForCall = append(fake.writeRoomRTCArgsForCall, struct { + arg1 context.Context + arg2 string + arg3 string + arg4 *livekit.RTCNodeMessage + }{arg1, arg2, arg3, arg4}) + stub := fake.WriteRoomRTCStub + fakeReturns := fake.writeRoomRTCReturns + fake.recordInvocation("WriteRoomRTC", []interface{}{arg1, arg2, arg3, arg4}) + fake.writeRoomRTCMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3, arg4) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeRouter) WriteRoomRTCCallCount() int { + fake.writeRoomRTCMutex.RLock() + defer fake.writeRoomRTCMutex.RUnlock() + return len(fake.writeRoomRTCArgsForCall) +} + +func (fake *FakeRouter) WriteRoomRTCCalls(stub func(context.Context, string, string, *livekit.RTCNodeMessage) error) { + fake.writeRoomRTCMutex.Lock() + defer fake.writeRoomRTCMutex.Unlock() + fake.WriteRoomRTCStub = stub +} + +func (fake *FakeRouter) WriteRoomRTCArgsForCall(i int) (context.Context, string, string, *livekit.RTCNodeMessage) { + fake.writeRoomRTCMutex.RLock() + defer fake.writeRoomRTCMutex.RUnlock() + argsForCall := fake.writeRoomRTCArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 +} + +func (fake *FakeRouter) WriteRoomRTCReturns(result1 error) { + fake.writeRoomRTCMutex.Lock() + defer fake.writeRoomRTCMutex.Unlock() + fake.WriteRoomRTCStub = nil + fake.writeRoomRTCReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeRouter) WriteRoomRTCReturnsOnCall(i int, result1 error) { + fake.writeRoomRTCMutex.Lock() + defer fake.writeRoomRTCMutex.Unlock() + fake.WriteRoomRTCStub = nil + if fake.writeRoomRTCReturnsOnCall == nil { + fake.writeRoomRTCReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.writeRoomRTCReturnsOnCall[i] = struct { result1 error }{result1} } @@ -1022,8 +1023,6 @@ func (fake *FakeRouter) Invocations() map[string][][]interface{} { defer fake.clearRoomStateMutex.RUnlock() fake.drainMutex.RLock() defer fake.drainMutex.RUnlock() - fake.getNodeMutex.RLock() - defer fake.getNodeMutex.RUnlock() fake.getNodeForRoomMutex.RLock() defer fake.getNodeForRoomMutex.RUnlock() fake.listNodesMutex.RLock() @@ -1046,10 +1045,12 @@ func (fake *FakeRouter) Invocations() map[string][][]interface{} { defer fake.stopMutex.RUnlock() fake.unregisterNodeMutex.RLock() defer fake.unregisterNodeMutex.RUnlock() - fake.writeRTCMessageMutex.RLock() - defer fake.writeRTCMessageMutex.RUnlock() - fake.writeRTCNodeMessageMutex.RLock() - defer fake.writeRTCNodeMessageMutex.RUnlock() + fake.writeNodeRTCMutex.RLock() + defer fake.writeNodeRTCMutex.RUnlock() + fake.writeParticipantRTCMutex.RLock() + defer fake.writeParticipantRTCMutex.RUnlock() + fake.writeRoomRTCMutex.RLock() + defer fake.writeRoomRTCMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/pkg/rtc/config.go b/pkg/rtc/config.go index ab93b99ae..c781287ab 100644 --- a/pkg/rtc/config.go +++ b/pkg/rtc/config.go @@ -4,12 +4,12 @@ import ( "errors" "net" - "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/pion/ice/v2" "github.com/pion/webrtc/v3" "github.com/livekit/livekit-server/pkg/config" serverlogger "github.com/livekit/livekit-server/pkg/logger" + "github.com/livekit/livekit-server/pkg/sfu/buffer" ) const ( @@ -28,7 +28,7 @@ type WebRTCConfig struct { } type ReceiverConfig struct { - packetBufferSize int + PacketBufferSize int maxBitrate uint64 } @@ -116,7 +116,7 @@ func NewWebRTCConfig(conf *config.Config, externalIP string) (*WebRTCConfig, err Configuration: c, SettingEngine: s, Receiver: ReceiverConfig{ - packetBufferSize: rtcConf.PacketBufferSize, + PacketBufferSize: rtcConf.PacketBufferSize, maxBitrate: rtcConf.MaxBitrate, }, UDPMux: udpMux, diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 66a7ec613..67c9af147 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -187,7 +187,7 @@ func (t *MediaTrack) AddSubscriber(sub types.Participant) error { Channels: codec.Channels, SDPFmtpLine: codec.SDPFmtpLine, RTCPFeedback: feedbackTypes, - }, receiver, t.params.BufferFactory, sub.ID(), t.params.ReceiverConfig.packetBufferSize) + }, receiver, t.params.BufferFactory, sub.ID(), t.params.ReceiverConfig.PacketBufferSize) if err != nil { return err } diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 5d68beef8..f37a954f7 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -504,14 +504,14 @@ func (p *ParticipantImpl) RemoveSubscriber(participantId string) { // signal connection methods -func (p *ParticipantImpl) SendJoinResponse(roomInfo *livekit.Room, otherParticipants []types.Participant, iceServers []*livekit.ICEServer) error { +func (p *ParticipantImpl) SendJoinResponse(roomInfo *livekit.Room, otherParticipants []*livekit.ParticipantInfo, iceServers []*livekit.ICEServer) error { // send Join response return p.writeMessage(&livekit.SignalResponse{ Message: &livekit.SignalResponse_Join{ Join: &livekit.JoinResponse{ Room: roomInfo, Participant: p.ToProto(), - OtherParticipants: ToProtoParticipants(otherParticipants), + OtherParticipants: otherParticipants, ServerVersion: version.Version, IceServers: iceServers, // indicates both server and client support subscriber as primary diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index f179442c0..fddf804a8 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -8,7 +8,6 @@ import ( "time" "github.com/go-logr/logr" - "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/protocol/logger" livekit "github.com/livekit/protocol/proto" "google.golang.org/protobuf/proto" @@ -16,6 +15,7 @@ import ( "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" "github.com/livekit/livekit-server/pkg/rtc/types" + "github.com/livekit/livekit-server/pkg/sfu/buffer" "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" ) @@ -27,10 +27,15 @@ const ( ) type Room struct { + lock sync.RWMutex + Room *livekit.Room Logger logger.Logger - config WebRTCConfig - lock sync.RWMutex + + config WebRTCConfig + audioConfig *config.AudioConfig + telemetry *telemetry.TelemetryService + // map of identity -> Participant participants map[string]types.Participant participantOpts map[string]*ParticipantOptions @@ -43,11 +48,6 @@ type Room struct { closed chan struct{} closeOnce sync.Once - // for active speaker updates - audioConfig *config.AudioConfig - - telemetry *telemetry.TelemetryService - onParticipantChanged func(p types.Participant) onMetadataUpdate func(metadata string) onClose func() @@ -66,7 +66,7 @@ func NewRoom(room *livekit.Room, config WebRTCConfig, audioConfig *config.AudioC telemetry: telemetry, participants: make(map[string]types.Participant), participantOpts: make(map[string]*ParticipantOptions), - bufferFactory: buffer.NewBufferFactory(config.Receiver.packetBufferSize, logr.Logger{}), + bufferFactory: buffer.NewBufferFactory(config.Receiver.PacketBufferSize, logr.Logger{}), closed: make(chan struct{}), } if r.Room.EmptyTimeout == 0 { @@ -199,10 +199,10 @@ func (r *Room) Join(participant types.Participant, opts *ParticipantOptions, ice r.participantOpts[participant.Identity()] = opts // gather other participants and send join response - otherParticipants := make([]types.Participant, 0, len(r.participants)) + otherParticipants := make([]*livekit.ParticipantInfo, 0, len(r.participants)) for _, p := range r.participants { if p.ID() != participant.ID() && !p.Hidden() { - otherParticipants = append(otherParticipants, p) + otherParticipants = append(otherParticipants, p.ToProto()) } } diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index caa8d6ec9..e04fb3499 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -3,12 +3,12 @@ package types import ( "time" - "github.com/livekit/livekit-server/pkg/sfu" + livekit "github.com/livekit/protocol/proto" "github.com/pion/rtcp" "github.com/pion/webrtc/v3" "github.com/livekit/livekit-server/pkg/routing" - livekit "github.com/livekit/protocol/proto" + "github.com/livekit/livekit-server/pkg/sfu" ) //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate @@ -48,7 +48,7 @@ type Participant interface { AddICECandidate(candidate webrtc.ICECandidateInit, target livekit.SignalTarget) error AddSubscriber(op Participant) (int, error) RemoveSubscriber(peerId string) - SendJoinResponse(info *livekit.Room, otherParticipants []Participant, iceServers []*livekit.ICEServer) error + SendJoinResponse(info *livekit.Room, otherParticipants []*livekit.ParticipantInfo, iceServers []*livekit.ICEServer) error SendParticipantUpdate(participants []*livekit.ParticipantInfo, updatedAt time.Time) error SendSpeakerUpdate(speakers []*livekit.SpeakerInfo) error SendDataPacket(packet *livekit.DataPacket) error diff --git a/pkg/rtc/types/typesfakes/fake_participant.go b/pkg/rtc/types/typesfakes/fake_participant.go index 61e66614c..6a3803a55 100644 --- a/pkg/rtc/types/typesfakes/fake_participant.go +++ b/pkg/rtc/types/typesfakes/fake_participant.go @@ -363,11 +363,11 @@ type FakeParticipant struct { sendDataPacketReturnsOnCall map[int]struct { result1 error } - SendJoinResponseStub func(*livekit.Room, []types.Participant, []*livekit.ICEServer) error + SendJoinResponseStub func(*livekit.Room, []*livekit.ParticipantInfo, []*livekit.ICEServer) error sendJoinResponseMutex sync.RWMutex sendJoinResponseArgsForCall []struct { arg1 *livekit.Room - arg2 []types.Participant + arg2 []*livekit.ParticipantInfo arg3 []*livekit.ICEServer } sendJoinResponseReturns struct { @@ -2390,10 +2390,10 @@ func (fake *FakeParticipant) SendDataPacketReturnsOnCall(i int, result1 error) { }{result1} } -func (fake *FakeParticipant) SendJoinResponse(arg1 *livekit.Room, arg2 []types.Participant, arg3 []*livekit.ICEServer) error { - var arg2Copy []types.Participant +func (fake *FakeParticipant) SendJoinResponse(arg1 *livekit.Room, arg2 []*livekit.ParticipantInfo, arg3 []*livekit.ICEServer) error { + var arg2Copy []*livekit.ParticipantInfo if arg2 != nil { - arg2Copy = make([]types.Participant, len(arg2)) + arg2Copy = make([]*livekit.ParticipantInfo, len(arg2)) copy(arg2Copy, arg2) } var arg3Copy []*livekit.ICEServer @@ -2405,7 +2405,7 @@ func (fake *FakeParticipant) SendJoinResponse(arg1 *livekit.Room, arg2 []types.P ret, specificReturn := fake.sendJoinResponseReturnsOnCall[len(fake.sendJoinResponseArgsForCall)] fake.sendJoinResponseArgsForCall = append(fake.sendJoinResponseArgsForCall, struct { arg1 *livekit.Room - arg2 []types.Participant + arg2 []*livekit.ParticipantInfo arg3 []*livekit.ICEServer }{arg1, arg2Copy, arg3Copy}) stub := fake.SendJoinResponseStub @@ -2427,13 +2427,13 @@ func (fake *FakeParticipant) SendJoinResponseCallCount() int { return len(fake.sendJoinResponseArgsForCall) } -func (fake *FakeParticipant) SendJoinResponseCalls(stub func(*livekit.Room, []types.Participant, []*livekit.ICEServer) error) { +func (fake *FakeParticipant) SendJoinResponseCalls(stub func(*livekit.Room, []*livekit.ParticipantInfo, []*livekit.ICEServer) error) { fake.sendJoinResponseMutex.Lock() defer fake.sendJoinResponseMutex.Unlock() fake.SendJoinResponseStub = stub } -func (fake *FakeParticipant) SendJoinResponseArgsForCall(i int) (*livekit.Room, []types.Participant, []*livekit.ICEServer) { +func (fake *FakeParticipant) SendJoinResponseArgsForCall(i int) (*livekit.Room, []*livekit.ParticipantInfo, []*livekit.ICEServer) { fake.sendJoinResponseMutex.RLock() defer fake.sendJoinResponseMutex.RUnlock() argsForCall := fake.sendJoinResponseArgsForCall[i] diff --git a/pkg/service/interfaces.go b/pkg/service/interfaces.go index 06e873a0f..51260ffa3 100644 --- a/pkg/service/interfaces.go +++ b/pkg/service/interfaces.go @@ -5,9 +5,6 @@ import ( "time" livekit "github.com/livekit/protocol/proto" - - "github.com/livekit/livekit-server/pkg/routing" - "github.com/livekit/livekit-server/pkg/rtc" ) //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate @@ -15,31 +12,26 @@ import ( // encapsulates CRUD operations for room settings //counterfeiter:generate . RoomStore type RoomStore interface { - StoreRoom(ctx context.Context, room *livekit.Room) error - LoadRoom(ctx context.Context, name string) (*livekit.Room, error) - ListRooms(ctx context.Context) ([]*livekit.Room, error) - DeleteRoom(ctx context.Context, name string) error + RORoomStore // enable locking on a specific room to prevent race // returns a (lock uuid, error) LockRoom(ctx context.Context, name string, duration time.Duration) (string, error) UnlockRoom(ctx context.Context, name string, uid string) error + StoreRoom(ctx context.Context, room *livekit.Room) error + DeleteRoom(ctx context.Context, name string) error + StoreParticipant(ctx context.Context, roomName string, participant *livekit.ParticipantInfo) error - LoadParticipant(ctx context.Context, roomName, identity string) (*livekit.ParticipantInfo, error) - ListParticipants(ctx context.Context, roomName string) ([]*livekit.ParticipantInfo, error) DeleteParticipant(ctx context.Context, roomName, identity string) error } -type RoomManager interface { - RoomStore +type RORoomStore interface { + LoadRoom(ctx context.Context, name string) (*livekit.Room, error) + ListRooms(ctx context.Context) ([]*livekit.Room, error) - GetRoom(ctx context.Context, roomName string) *rtc.Room - StartSession(ctx context.Context, roomName string, pi routing.ParticipantInit, requestSource routing.MessageSource, responseSink routing.MessageSink) - CleanupRooms() error - CloseIdleRooms() - HasParticipants() bool - Stop() + LoadParticipant(ctx context.Context, roomName, identity string) (*livekit.ParticipantInfo, error) + ListParticipants(ctx context.Context, roomName string) ([]*livekit.ParticipantInfo, error) } type RoomAllocator interface { diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index eeabc725c..d4fa1d468 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -20,37 +20,43 @@ const ( roomPurgeSeconds = 24 * 60 * 60 ) -// LocalRoomManager manages rooms and its interaction with participants. +// RoomManager manages rooms and its interaction with participants. // It's responsible for creating, deleting rooms, as well as running sessions for participants -type LocalRoomManager struct { - RoomStore +type RoomManager struct { + lock sync.RWMutex - lock sync.RWMutex - router routing.Router - currentNode routing.LocalNode - rtcConfig *rtc.WebRTCConfig config *config.Config + rtcConfig *rtc.WebRTCConfig + currentNode routing.LocalNode + router routing.Router + roomStore RoomStore telemetry *telemetry.TelemetryService - rooms map[string]*rtc.Room + + rooms map[string]*rtc.Room } -func NewLocalRoomManager(conf *config.Config, rs RoomStore, router routing.Router, currentNode routing.LocalNode, - telemetry *telemetry.TelemetryService) (*LocalRoomManager, error) { +func NewLocalRoomManager( + conf *config.Config, + roomStore RoomStore, + currentNode routing.LocalNode, + router routing.Router, + telemetry *telemetry.TelemetryService, +) (*RoomManager, error) { rtcConf, err := rtc.NewWebRTCConfig(conf, currentNode.Ip) if err != nil { return nil, err } - r := &LocalRoomManager{ - RoomStore: rs, - lock: sync.RWMutex{}, - rtcConfig: rtcConf, + r := &RoomManager{ config: conf, - router: router, + rtcConfig: rtcConf, currentNode: currentNode, + router: router, + roomStore: roomStore, telemetry: telemetry, - rooms: make(map[string]*rtc.Room), + + rooms: make(map[string]*rtc.Room), } // hook up to router @@ -59,14 +65,14 @@ func NewLocalRoomManager(conf *config.Config, rs RoomStore, router routing.Route return r, nil } -func (r *LocalRoomManager) GetRoom(ctx context.Context, roomName string) *rtc.Room { +func (r *RoomManager) GetRoom(ctx context.Context, roomName string) *rtc.Room { r.lock.RLock() defer r.lock.RUnlock() return r.rooms[roomName] } // DeleteRoom completely deletes all room information, including active sessions, room store, and routing info -func (r *LocalRoomManager) DeleteRoom(ctx context.Context, roomName string) error { +func (r *RoomManager) DeleteRoom(ctx context.Context, roomName string) error { logger.Infow("deleting room state", "room", roomName) r.lock.Lock() delete(r.rooms, roomName) @@ -83,7 +89,7 @@ func (r *LocalRoomManager) DeleteRoom(ctx context.Context, roomName string) erro // also delete room from db go func() { defer wg.Done() - err2 = r.RoomStore.DeleteRoom(ctx, roomName) + err2 = r.roomStore.DeleteRoom(ctx, roomName) }() wg.Wait() @@ -95,10 +101,10 @@ func (r *LocalRoomManager) DeleteRoom(ctx context.Context, roomName string) erro } // CleanupRooms cleans up after old rooms that have been around for awhile -func (r *LocalRoomManager) CleanupRooms() error { +func (r *RoomManager) CleanupRooms() error { // cleanup rooms that have been left for over a day ctx := context.Background() - rooms, err := r.ListRooms(ctx) + rooms, err := r.roomStore.ListRooms(ctx) if err != nil { return err } @@ -114,7 +120,7 @@ func (r *LocalRoomManager) CleanupRooms() error { return nil } -func (r *LocalRoomManager) CloseIdleRooms() { +func (r *RoomManager) CloseIdleRooms() { r.lock.RLock() rooms := make([]*rtc.Room, 0, len(r.rooms)) for _, rm := range r.rooms { @@ -127,7 +133,7 @@ func (r *LocalRoomManager) CloseIdleRooms() { } } -func (r *LocalRoomManager) HasParticipants() bool { +func (r *RoomManager) HasParticipants() bool { r.lock.RLock() defer r.lock.RUnlock() @@ -139,7 +145,7 @@ func (r *LocalRoomManager) HasParticipants() bool { return false } -func (r *LocalRoomManager) Stop() { +func (r *RoomManager) Stop() { // disconnect all clients r.lock.RLock() rooms := make([]*rtc.Room, 0, len(r.rooms)) @@ -166,7 +172,7 @@ func (r *LocalRoomManager) Stop() { } // StartSession starts WebRTC session when a new participant is connected, takes place on RTC node -func (r *LocalRoomManager) StartSession(ctx context.Context, roomName string, pi routing.ParticipantInit, requestSource routing.MessageSource, responseSink routing.MessageSink) { +func (r *RoomManager) StartSession(ctx context.Context, roomName string, pi routing.ParticipantInit, requestSource routing.MessageSource, responseSink routing.MessageSink) { room, err := r.getOrCreateRoom(ctx, roomName) if err != nil { logger.Errorw("could not create room", err, "room", roomName) @@ -261,7 +267,7 @@ func (r *LocalRoomManager) StartSession(ctx context.Context, roomName string, pi } // create the actual room object, to be used on RTC node -func (r *LocalRoomManager) getOrCreateRoom(ctx context.Context, roomName string) (*rtc.Room, error) { +func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName string) (*rtc.Room, error) { r.lock.RLock() room := r.rooms[roomName] r.lock.RUnlock() @@ -271,7 +277,7 @@ func (r *LocalRoomManager) getOrCreateRoom(ctx context.Context, roomName string) } // create new room, get details first - ri, err := r.LoadRoom(ctx, roomName) + ri, err := r.roomStore.LoadRoom(ctx, roomName) if err != nil { return nil, err } @@ -289,7 +295,7 @@ func (r *LocalRoomManager) getOrCreateRoom(ctx context.Context, roomName string) logger.Infow("room closed") }) room.OnMetadataUpdate(func(metadata string) { - err := r.StoreRoom(ctx, room.Room) + err := r.roomStore.StoreRoom(ctx, room.Room) if err != nil { logger.Errorw("could not handle metadata update", err) } @@ -297,9 +303,9 @@ func (r *LocalRoomManager) getOrCreateRoom(ctx context.Context, roomName string) room.OnParticipantChanged(func(p types.Participant) { var err error if p.State() == livekit.ParticipantInfo_DISCONNECTED { - err = r.DeleteParticipant(ctx, roomName, p.Identity()) + err = r.roomStore.DeleteParticipant(ctx, roomName, p.Identity()) } else { - err = r.StoreParticipant(ctx, roomName, p.ToProto()) + err = r.roomStore.StoreParticipant(ctx, roomName, p.ToProto()) } if err != nil { logger.Errorw("could not handle participant change", err) @@ -313,7 +319,7 @@ func (r *LocalRoomManager) getOrCreateRoom(ctx context.Context, roomName string) } // manages an RTC session for a participant, runs on the RTC node -func (r *LocalRoomManager) rtcSessionWorker(room *rtc.Room, participant types.Participant, requestSource routing.MessageSource) { +func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.Participant, requestSource routing.MessageSource) { defer func() { logger.Debugw("RTC session finishing", "participant", participant.Identity(), @@ -453,7 +459,7 @@ func (r *LocalRoomManager) rtcSessionWorker(room *rtc.Room, participant types.Pa } // handles RTC messages resulted from Room API calls -func (r *LocalRoomManager) handleRTCMessage(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) { +func (r *RoomManager) handleRTCMessage(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) { r.lock.RLock() room := r.rooms[roomName] r.lock.RUnlock() @@ -524,7 +530,7 @@ func (r *LocalRoomManager) handleRTCMessage(ctx context.Context, roomName, ident } } -func (r *LocalRoomManager) iceServersForRoom(ri *livekit.Room) []*livekit.ICEServer { +func (r *RoomManager) iceServersForRoom(ri *livekit.Room) []*livekit.ICEServer { var iceServers []*livekit.ICEServer hasSTUN := false diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index 0330e9bc6..3f468c1dd 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -13,12 +13,12 @@ import ( // A rooms service that supports a single node type RoomService struct { - router routing.Router + router routing.MessageRouter roomAllocator RoomAllocator - roomStore RoomStore + roomStore RORoomStore } -func NewRoomService(ra RoomAllocator, rs RoomStore, router routing.Router) (svc *RoomService, err error) { +func NewRoomService(ra RoomAllocator, rs RORoomStore, router routing.MessageRouter) (svc livekit.RoomService, err error) { svc = &RoomService{ router: router, roomAllocator: ra, @@ -62,19 +62,12 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq if err := EnsureCreatePermission(ctx); err != nil { return nil, twirpAuthError(err) } - // if the room is currently active, RTC node needs to disconnect clients - node, err := s.router.GetNodeForRoom(ctx, req.Room) - if err != nil { - return nil, err - } - err = s.router.WriteRTCNodeMessage(ctx, node.Id, &livekit.RTCNodeMessage{ - ParticipantKey: s.roomParticipantKey(req.Room), + if err := s.writeRoomMessage(ctx, req.Room, "", &livekit.RTCNodeMessage{ Message: &livekit.RTCNodeMessage_DeleteRoom{ DeleteRoom: req, }, - }) - if err != nil { + }); err != nil { return nil, err } @@ -112,7 +105,7 @@ func (s *RoomService) GetParticipant(ctx context.Context, req *livekit.RoomParti } func (s *RoomService) RemoveParticipant(ctx context.Context, req *livekit.RoomParticipantIdentity) (res *livekit.RemoveParticipantResponse, err error) { - err = s.writeMessage(ctx, req.Room, req.Identity, &livekit.RTCNodeMessage{ + err = s.writeRoomMessage(ctx, req.Room, req.Identity, &livekit.RTCNodeMessage{ Message: &livekit.RTCNodeMessage_RemoveParticipant{ RemoveParticipant: req, }, @@ -142,7 +135,7 @@ func (s *RoomService) MutePublishedTrack(ctx context.Context, req *livekit.MuteR return nil, twirp.NotFoundError(ErrTrackNotFound.Error()) } - err = s.writeMessage(ctx, req.Room, req.Identity, &livekit.RTCNodeMessage{ + err = s.writeParticipantMessage(ctx, req.Room, req.Identity, &livekit.RTCNodeMessage{ Message: &livekit.RTCNodeMessage_MuteTrack{ MuteTrack: req, }, @@ -160,7 +153,7 @@ func (s *RoomService) MutePublishedTrack(ctx context.Context, req *livekit.MuteR } func (s *RoomService) UpdateParticipant(ctx context.Context, req *livekit.UpdateParticipantRequest) (*livekit.ParticipantInfo, error) { - err := s.writeMessage(ctx, req.Room, req.Identity, &livekit.RTCNodeMessage{ + err := s.writeRoomMessage(ctx, req.Room, req.Identity, &livekit.RTCNodeMessage{ Message: &livekit.RTCNodeMessage_UpdateParticipant{ UpdateParticipant: req, }, @@ -179,7 +172,7 @@ func (s *RoomService) UpdateParticipant(ctx context.Context, req *livekit.Update } func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.UpdateSubscriptionsRequest) (*livekit.UpdateSubscriptionsResponse, error) { - err := s.writeMessage(ctx, req.Room, req.Identity, &livekit.RTCNodeMessage{ + err := s.writeRoomMessage(ctx, req.Room, req.Identity, &livekit.RTCNodeMessage{ Message: &livekit.RTCNodeMessage_UpdateSubscriptions{ UpdateSubscriptions: req, }, @@ -192,14 +185,7 @@ func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.Upda } func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest) (*livekit.SendDataResponse, error) { - // here we are using any user's identity, due to how it works with routing - node, err := s.router.GetNodeForRoom(ctx, req.Room) - if err != nil { - return nil, err - } - - err = s.router.WriteRTCNodeMessage(ctx, node.Id, &livekit.RTCNodeMessage{ - ParticipantKey: s.roomParticipantKey(req.Room), + err := s.writeRoomMessage(ctx, req.Room, "", &livekit.RTCNodeMessage{ Message: &livekit.RTCNodeMessage_SendData{ SendData: req, }, @@ -223,13 +209,7 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat room.Metadata = req.Metadata - node, err := s.router.GetNodeForRoom(ctx, req.Room) - if err != nil { - return nil, err - } - - err = s.router.WriteRTCNodeMessage(ctx, node.Id, &livekit.RTCNodeMessage{ - ParticipantKey: s.roomParticipantKey(req.Room), + err = s.writeRoomMessage(ctx, req.Room, "", &livekit.RTCNodeMessage{ Message: &livekit.RTCNodeMessage_UpdateRoomMetadata{ UpdateRoomMetadata: req, }, @@ -241,7 +221,7 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat return room, nil } -func (s *RoomService) writeMessage(ctx context.Context, room, identity string, msg *livekit.RTCNodeMessage) error { +func (s *RoomService) writeParticipantMessage(ctx context.Context, room, identity string, msg *livekit.RTCNodeMessage) error { if err := EnsureAdminPermission(ctx, room); err != nil { return twirpAuthError(err) } @@ -251,9 +231,20 @@ func (s *RoomService) writeMessage(ctx context.Context, room, identity string, m return err } - return s.router.WriteRTCMessage(ctx, room, identity, msg) + return s.router.WriteParticipantRTC(ctx, room, identity, msg) } -func (s *RoomService) roomParticipantKey(room string) string { - return room + "|" +func (s *RoomService) writeRoomMessage(ctx context.Context, room, identity string, msg *livekit.RTCNodeMessage) error { + if err := EnsureAdminPermission(ctx, room); err != nil { + return twirpAuthError(err) + } + + if identity != "" { + _, err := s.roomStore.LoadParticipant(ctx, room, identity) + if err != nil { + return err + } + } + + return s.router.WriteRoomRTC(ctx, room, identity, msg) } diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index cb6a1db26..f847c7ec6 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -20,14 +20,14 @@ import ( ) type RTCService struct { - router routing.Router + router routing.MessageRouter roomAllocator RoomAllocator upgrader websocket.Upgrader currentNode routing.LocalNode isDev bool } -func NewRTCService(conf *config.Config, ra RoomAllocator, router routing.Router, currentNode routing.LocalNode) *RTCService { +func NewRTCService(conf *config.Config, ra RoomAllocator, router routing.MessageRouter, currentNode routing.LocalNode) *RTCService { s := &RTCService{ router: router, roomAllocator: ra, diff --git a/pkg/service/server.go b/pkg/service/server.go index caa40facf..83e31fd46 100644 --- a/pkg/service/server.go +++ b/pkg/service/server.go @@ -30,7 +30,7 @@ type LivekitServer struct { httpServer *http.Server promServer *http.Server router routing.Router - roomManager *LocalRoomManager + roomManager *RoomManager turnServer *turn.Server currentNode routing.LocalNode running utils.AtomicFlag @@ -44,7 +44,7 @@ func NewLivekitServer(conf *config.Config, rtcService *RTCService, keyProvider auth.KeyProvider, router routing.Router, - roomManager *LocalRoomManager, + roomManager *RoomManager, turnServer *turn.Server, currentNode routing.LocalNode, ) (s *LivekitServer, err error) { @@ -61,7 +61,7 @@ func NewLivekitServer(conf *config.Config, } middlewares := []negroni.Handler{ - // always the first + // always first negroni.NewRecovery(), } if keyProvider != nil { @@ -230,15 +230,15 @@ func (s *LivekitServer) Stop(force bool) { <-s.closedChan } -func (s *LivekitServer) RoomManager() RoomManager { +func (s *LivekitServer) RoomManager() *RoomManager { return s.roomManager } -func (s *LivekitServer) debugGoroutines(w http.ResponseWriter, r *http.Request) { +func (s *LivekitServer) debugGoroutines(w http.ResponseWriter, _ *http.Request) { _ = pprof.Lookup("goroutine").WriteTo(w, 2) } -func (s *LivekitServer) debugInfo(w http.ResponseWriter, r *http.Request) { +func (s *LivekitServer) debugInfo(w http.ResponseWriter, _ *http.Request) { s.roomManager.lock.RLock() info := make([]map[string]interface{}, 0, len(s.roomManager.rooms)) for _, room := range s.roomManager.rooms { @@ -255,19 +255,19 @@ func (s *LivekitServer) debugInfo(w http.ResponseWriter, r *http.Request) { } } -func (s *LivekitServer) healthCheck(w http.ResponseWriter, r *http.Request) { +func (s *LivekitServer) healthCheck(w http.ResponseWriter, _ *http.Request) { var updatedAt time.Time if s.Node().Stats != nil { updatedAt = time.Unix(s.Node().Stats.UpdatedAt, 0) } if time.Now().Sub(updatedAt) > 4*time.Second { w.WriteHeader(http.StatusNotAcceptable) - w.Write([]byte(fmt.Sprintf("Not Ready\nNode Updated At %s", updatedAt))) + _, _ = w.Write([]byte(fmt.Sprintf("Not Ready\nNode Updated At %s", updatedAt))) return } w.WriteHeader(http.StatusOK) - w.Write([]byte([]byte("OK"))) + _, _ = w.Write([]byte("OK")) } // worker to perform periodic tasks per node diff --git a/pkg/service/wire.go b/pkg/service/wire.go index 786d0070f..df0bbc4be 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -14,7 +14,6 @@ import ( "github.com/livekit/protocol/auth" "github.com/livekit/protocol/logger" - livekit "github.com/livekit/protocol/proto" "github.com/livekit/protocol/utils" "github.com/livekit/protocol/webhook" @@ -28,9 +27,11 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live createRedisClient, createMessageBus, createStore, + wire.Bind(new(RORoomStore), new(RoomStore)), createKeyProvider, createWebhookNotifier, routing.CreateRouter, + wire.Bind(new(routing.MessageRouter), new(routing.Router)), telemetry.NewTelemetryService, NewRecordingService, NewRoomAllocator, @@ -39,7 +40,6 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live NewLocalRoomManager, newTurnAuthHandler, NewTurnServer, - wire.Bind(new(livekit.RoomService), new(*RoomService)), NewLivekitServer, ) return &LivekitServer{}, nil diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 21ccc3e39..bde9a342f 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -1,8 +1,7 @@ // Code generated by Wire. DO NOT EDIT. //go:generate go run github.com/google/wire/cmd/wire -//go:build !wireinject -// +build !wireinject +//+build !wireinject package service @@ -50,7 +49,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live telemetryService := telemetry.NewTelemetryService(notifier) recordingService := NewRecordingService(messageBus, telemetryService) rtcService := NewRTCService(conf, roomAllocator, router, currentNode) - localRoomManager, err := NewLocalRoomManager(conf, roomStore, router, currentNode, telemetryService) + roomManager, err := NewLocalRoomManager(conf, roomStore, currentNode, router, telemetryService) if err != nil { return nil, err } @@ -59,7 +58,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live if err != nil { return nil, err } - livekitServer, err := NewLivekitServer(conf, roomService, recordingService, rtcService, keyProvider, router, localRoomManager, server, currentNode) + livekitServer, err := NewLivekitServer(conf, roomService, recordingService, rtcService, keyProvider, router, roomManager, server, currentNode) if err != nil { return nil, err }