diff --git a/.idea/livekit-server.iml b/.idea/livekit-server.iml index f5d5063d9..a16151c89 100644 --- a/.idea/livekit-server.iml +++ b/.idea/livekit-server.iml @@ -4,8 +4,6 @@ - - diff --git a/pkg/routing/interfaces.go b/pkg/routing/interfaces.go index 34c4beb38..1f32b050c 100644 --- a/pkg/routing/interfaces.go +++ b/pkg/routing/interfaces.go @@ -1,6 +1,8 @@ package routing import ( + "context" + livekit "github.com/livekit/protocol/proto" "google.golang.org/protobuf/proto" ) @@ -33,15 +35,15 @@ type ParticipantInit struct { Hidden bool } -type NewParticipantCallback func(roomName string, pi ParticipantInit, requestSource MessageSource, responseSink MessageSink) -type RTCMessageCallback func(roomName, identity string, msg *livekit.RTCNodeMessage) +type NewParticipantCallback func(ctx context.Context, roomName string, pi ParticipantInit, requestSource MessageSource, responseSink MessageSink) +type RTCMessageCallback func(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) // Router allows multiple nodes to coordinate the participant session //counterfeiter:generate . Router type Router interface { - GetNodeForRoom(roomName string) (*livekit.Node, error) - SetNodeForRoom(roomName string, nodeId string) error - ClearRoomState(roomName string) error + GetNodeForRoom(ctx context.Context, roomName string) (*livekit.Node, error) + SetNodeForRoom(ctx context.Context, roomName string, nodeId string) error + ClearRoomState(ctx context.Context, roomName string) error RegisterNode() error UnregisterNode() error RemoveDeadNodes() error @@ -49,10 +51,10 @@ type Router interface { ListNodes() ([]*livekit.Node, error) // StartParticipantSignal participant signal connection is ready to start - StartParticipantSignal(roomName string, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error) + StartParticipantSignal(ctx context.Context, roomName string, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error) // CreateRTCSink sends a message to RTC node - CreateRTCSink(roomName, identity string) (MessageSink, error) + CreateRTCSink(ctx context.Context, roomName, identity string) (MessageSink, error) // OnNewParticipantRTC is called to start a new participant's RTC connection OnNewParticipantRTC(callback NewParticipantCallback) diff --git a/pkg/routing/localrouter.go b/pkg/routing/localrouter.go index 45bd637af..bdd18f9d3 100644 --- a/pkg/routing/localrouter.go +++ b/pkg/routing/localrouter.go @@ -1,6 +1,7 @@ package routing import ( + "context" "sync" "time" @@ -35,18 +36,18 @@ func NewLocalRouter(currentNode LocalNode) *LocalRouter { } } -func (r *LocalRouter) GetNodeForRoom(roomName string) (*livekit.Node, error) { +func (r *LocalRouter) GetNodeForRoom(ctx context.Context, roomName string) (*livekit.Node, error) { r.lock.Lock() defer r.lock.Unlock() node := proto.Clone((*livekit.Node)(r.currentNode)).(*livekit.Node) return node, nil } -func (r *LocalRouter) SetNodeForRoom(roomName string, nodeId string) error { +func (r *LocalRouter) SetNodeForRoom(ctx context.Context, roomName string, nodeId string) error { return nil } -func (r *LocalRouter) ClearRoomState(roomName string) error { +func (r *LocalRouter) ClearRoomState(ctx context.Context, roomName string) error { // do nothing return nil } @@ -76,7 +77,7 @@ func (r *LocalRouter) ListNodes() ([]*livekit.Node, error) { }, nil } -func (r *LocalRouter) StartParticipantSignal(roomName string, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error) { +func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName string, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error) { // treat it as a new participant connecting if r.onNewParticipant == nil { err = ErrHandlerNotDefined @@ -89,6 +90,7 @@ func (r *LocalRouter) StartParticipantSignal(roomName string, pi ParticipantInit resChan := r.getOrCreateMessageChannel(r.responseChannels, key) r.onNewParticipant( + ctx, roomName, pi, // request source @@ -99,7 +101,7 @@ func (r *LocalRouter) StartParticipantSignal(roomName string, pi ParticipantInit return pi.Identity, reqChan, resChan, nil } -func (r *LocalRouter) CreateRTCSink(roomName, identity string) (MessageSink, error) { +func (r *LocalRouter) CreateRTCSink(ctx context.Context, roomName, identity string) (MessageSink, error) { if r.rtcMessageChan.isClosed.Get() { // create a new one r.rtcMessageChan = NewMessageChannel() @@ -167,7 +169,7 @@ func (r *LocalRouter) rtcMessageWorker() { continue } if r.onRTCMessage != nil { - r.onRTCMessage(room, identity, rtcMsg) + r.onRTCMessage(context.Background(), room, identity, rtcMsg) } } } diff --git a/pkg/routing/redisrouter.go b/pkg/routing/redisrouter.go index 077e35c60..d1c78f980 100644 --- a/pkg/routing/redisrouter.go +++ b/pkg/routing/redisrouter.go @@ -77,7 +77,7 @@ func (r *RedisRouter) RemoveDeadNodes() error { return nil } -func (r *RedisRouter) GetNodeForRoom(roomName string) (*livekit.Node, error) { +func (r *RedisRouter) GetNodeForRoom(ctx context.Context, roomName string) (*livekit.Node, error) { nodeId, err := r.rc.HGet(r.ctx, NodeRoomKey, roomName).Result() if err == redis.Nil { return nil, ErrNotFound @@ -88,11 +88,11 @@ func (r *RedisRouter) GetNodeForRoom(roomName string) (*livekit.Node, error) { return r.GetNode(nodeId) } -func (r *RedisRouter) SetNodeForRoom(roomName string, nodeId string) error { +func (r *RedisRouter) SetNodeForRoom(ctx context.Context, roomName string, nodeId string) error { return r.rc.HSet(r.ctx, NodeRoomKey, roomName, nodeId).Err() } -func (r *RedisRouter) ClearRoomState(roomName string) error { +func (r *RedisRouter) ClearRoomState(ctx context.Context, roomName string) error { if err := r.rc.HDel(r.ctx, NodeRoomKey, roomName).Err(); err != nil { return errors.Wrap(err, "could not clear room state") } @@ -130,9 +130,9 @@ func (r *RedisRouter) ListNodes() ([]*livekit.Node, error) { } // StartParticipantSignal signal connection sets up paths to the RTC node, and starts to route messages to that message queue -func (r *RedisRouter) StartParticipantSignal(roomName string, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error) { +func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName string, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error) { // find the node where the room is hosted at - rtcNode, err := r.GetNodeForRoom(roomName) + rtcNode, err := r.GetNodeForRoom(ctx, roomName) if err != nil { return } @@ -171,7 +171,7 @@ func (r *RedisRouter) StartParticipantSignal(roomName string, pi ParticipantInit return connectionId, sink, resChan, nil } -func (r *RedisRouter) CreateRTCSink(roomName, identity string) (MessageSink, error) { +func (r *RedisRouter) CreateRTCSink(ctx context.Context, roomName, identity string) (MessageSink, error) { pkey := ParticipantKey(roomName, identity) rtcNode, err := r.getParticipantRTCNode(pkey) if err != nil { @@ -183,7 +183,7 @@ func (r *RedisRouter) CreateRTCSink(roomName, identity string) (MessageSink, err func (r *RedisRouter) startParticipantRTC(ss *livekit.StartSession, participantKey string) error { // find the node where the room is hosted at - rtcNode, err := r.GetNodeForRoom(ss.RoomName) + rtcNode, err := r.GetNodeForRoom(r.ctx, ss.RoomName) if err != nil { return err } @@ -236,6 +236,7 @@ func (r *RedisRouter) startParticipantRTC(ss *livekit.StartSession, participantK reqChan := r.getOrCreateMessageChannel(r.requestChannels, participantKey) resSink := NewSignalNodeSink(r.rc, signalNode, ss.ConnectionId) r.onNewParticipant( + r.ctx, ss.RoomName, pi, reqChan, @@ -416,7 +417,7 @@ func (r *RedisRouter) handleRTCMessage(rm *livekit.RTCNodeMessage) error { if err != nil { return err } - r.onRTCMessage(roomName, identity, rm) + r.onRTCMessage(r.ctx, roomName, identity, rm) } } return nil diff --git a/pkg/routing/routingfakes/fake_router.go b/pkg/routing/routingfakes/fake_router.go index 91a809c3a..d43cea8e8 100644 --- a/pkg/routing/routingfakes/fake_router.go +++ b/pkg/routing/routingfakes/fake_router.go @@ -2,6 +2,7 @@ package routingfakes import ( + "context" "sync" "github.com/livekit/livekit-server/pkg/routing" @@ -9,10 +10,11 @@ import ( ) type FakeRouter struct { - ClearRoomStateStub func(string) error + ClearRoomStateStub func(context.Context, string) error clearRoomStateMutex sync.RWMutex clearRoomStateArgsForCall []struct { - arg1 string + arg1 context.Context + arg2 string } clearRoomStateReturns struct { result1 error @@ -20,11 +22,12 @@ type FakeRouter struct { clearRoomStateReturnsOnCall map[int]struct { result1 error } - CreateRTCSinkStub func(string, string) (routing.MessageSink, error) + CreateRTCSinkStub func(context.Context, string, string) (routing.MessageSink, error) createRTCSinkMutex sync.RWMutex createRTCSinkArgsForCall []struct { - arg1 string + arg1 context.Context arg2 string + arg3 string } createRTCSinkReturns struct { result1 routing.MessageSink @@ -47,10 +50,11 @@ type FakeRouter struct { result1 *livekit.Node result2 error } - GetNodeForRoomStub func(string) (*livekit.Node, error) + GetNodeForRoomStub func(context.Context, string) (*livekit.Node, error) getNodeForRoomMutex sync.RWMutex getNodeForRoomArgsForCall []struct { - arg1 string + arg1 context.Context + arg2 string } getNodeForRoomReturns struct { result1 *livekit.Node @@ -102,11 +106,12 @@ type FakeRouter struct { removeDeadNodesReturnsOnCall map[int]struct { result1 error } - SetNodeForRoomStub func(string, string) error + SetNodeForRoomStub func(context.Context, string, string) error setNodeForRoomMutex sync.RWMutex setNodeForRoomArgsForCall []struct { - arg1 string + arg1 context.Context arg2 string + arg3 string } setNodeForRoomReturns struct { result1 error @@ -124,11 +129,12 @@ type FakeRouter struct { startReturnsOnCall map[int]struct { result1 error } - StartParticipantSignalStub func(string, routing.ParticipantInit) (string, routing.MessageSink, routing.MessageSource, error) + StartParticipantSignalStub func(context.Context, string, routing.ParticipantInit) (string, routing.MessageSink, routing.MessageSource, error) startParticipantSignalMutex sync.RWMutex startParticipantSignalArgsForCall []struct { - arg1 string - arg2 routing.ParticipantInit + arg1 context.Context + arg2 string + arg3 routing.ParticipantInit } startParticipantSignalReturns struct { result1 string @@ -160,18 +166,19 @@ type FakeRouter struct { invocationsMutex sync.RWMutex } -func (fake *FakeRouter) ClearRoomState(arg1 string) error { +func (fake *FakeRouter) ClearRoomState(arg1 context.Context, arg2 string) error { fake.clearRoomStateMutex.Lock() ret, specificReturn := fake.clearRoomStateReturnsOnCall[len(fake.clearRoomStateArgsForCall)] fake.clearRoomStateArgsForCall = append(fake.clearRoomStateArgsForCall, struct { - arg1 string - }{arg1}) + arg1 context.Context + arg2 string + }{arg1, arg2}) stub := fake.ClearRoomStateStub fakeReturns := fake.clearRoomStateReturns - fake.recordInvocation("ClearRoomState", []interface{}{arg1}) + fake.recordInvocation("ClearRoomState", []interface{}{arg1, arg2}) fake.clearRoomStateMutex.Unlock() if stub != nil { - return stub(arg1) + return stub(arg1, arg2) } if specificReturn { return ret.result1 @@ -185,17 +192,17 @@ func (fake *FakeRouter) ClearRoomStateCallCount() int { return len(fake.clearRoomStateArgsForCall) } -func (fake *FakeRouter) ClearRoomStateCalls(stub func(string) error) { +func (fake *FakeRouter) ClearRoomStateCalls(stub func(context.Context, string) error) { fake.clearRoomStateMutex.Lock() defer fake.clearRoomStateMutex.Unlock() fake.ClearRoomStateStub = stub } -func (fake *FakeRouter) ClearRoomStateArgsForCall(i int) string { +func (fake *FakeRouter) ClearRoomStateArgsForCall(i int) (context.Context, string) { fake.clearRoomStateMutex.RLock() defer fake.clearRoomStateMutex.RUnlock() argsForCall := fake.clearRoomStateArgsForCall[i] - return argsForCall.arg1 + return argsForCall.arg1, argsForCall.arg2 } func (fake *FakeRouter) ClearRoomStateReturns(result1 error) { @@ -221,19 +228,20 @@ func (fake *FakeRouter) ClearRoomStateReturnsOnCall(i int, result1 error) { }{result1} } -func (fake *FakeRouter) CreateRTCSink(arg1 string, arg2 string) (routing.MessageSink, error) { +func (fake *FakeRouter) CreateRTCSink(arg1 context.Context, arg2 string, arg3 string) (routing.MessageSink, error) { fake.createRTCSinkMutex.Lock() ret, specificReturn := fake.createRTCSinkReturnsOnCall[len(fake.createRTCSinkArgsForCall)] fake.createRTCSinkArgsForCall = append(fake.createRTCSinkArgsForCall, struct { - arg1 string + arg1 context.Context arg2 string - }{arg1, arg2}) + arg3 string + }{arg1, arg2, arg3}) stub := fake.CreateRTCSinkStub fakeReturns := fake.createRTCSinkReturns - fake.recordInvocation("CreateRTCSink", []interface{}{arg1, arg2}) + fake.recordInvocation("CreateRTCSink", []interface{}{arg1, arg2, arg3}) fake.createRTCSinkMutex.Unlock() if stub != nil { - return stub(arg1, arg2) + return stub(arg1, arg2, arg3) } if specificReturn { return ret.result1, ret.result2 @@ -247,17 +255,17 @@ func (fake *FakeRouter) CreateRTCSinkCallCount() int { return len(fake.createRTCSinkArgsForCall) } -func (fake *FakeRouter) CreateRTCSinkCalls(stub func(string, string) (routing.MessageSink, error)) { +func (fake *FakeRouter) CreateRTCSinkCalls(stub func(context.Context, string, string) (routing.MessageSink, error)) { fake.createRTCSinkMutex.Lock() defer fake.createRTCSinkMutex.Unlock() fake.CreateRTCSinkStub = stub } -func (fake *FakeRouter) CreateRTCSinkArgsForCall(i int) (string, string) { +func (fake *FakeRouter) CreateRTCSinkArgsForCall(i int) (context.Context, string, string) { fake.createRTCSinkMutex.RLock() defer fake.createRTCSinkMutex.RUnlock() argsForCall := fake.createRTCSinkArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } func (fake *FakeRouter) CreateRTCSinkReturns(result1 routing.MessageSink, result2 error) { @@ -350,18 +358,19 @@ func (fake *FakeRouter) GetNodeReturnsOnCall(i int, result1 *livekit.Node, resul }{result1, result2} } -func (fake *FakeRouter) GetNodeForRoom(arg1 string) (*livekit.Node, error) { +func (fake *FakeRouter) GetNodeForRoom(arg1 context.Context, arg2 string) (*livekit.Node, error) { fake.getNodeForRoomMutex.Lock() ret, specificReturn := fake.getNodeForRoomReturnsOnCall[len(fake.getNodeForRoomArgsForCall)] fake.getNodeForRoomArgsForCall = append(fake.getNodeForRoomArgsForCall, struct { - arg1 string - }{arg1}) + arg1 context.Context + arg2 string + }{arg1, arg2}) stub := fake.GetNodeForRoomStub fakeReturns := fake.getNodeForRoomReturns - fake.recordInvocation("GetNodeForRoom", []interface{}{arg1}) + fake.recordInvocation("GetNodeForRoom", []interface{}{arg1, arg2}) fake.getNodeForRoomMutex.Unlock() if stub != nil { - return stub(arg1) + return stub(arg1, arg2) } if specificReturn { return ret.result1, ret.result2 @@ -375,17 +384,17 @@ func (fake *FakeRouter) GetNodeForRoomCallCount() int { return len(fake.getNodeForRoomArgsForCall) } -func (fake *FakeRouter) GetNodeForRoomCalls(stub func(string) (*livekit.Node, error)) { +func (fake *FakeRouter) GetNodeForRoomCalls(stub func(context.Context, string) (*livekit.Node, error)) { fake.getNodeForRoomMutex.Lock() defer fake.getNodeForRoomMutex.Unlock() fake.GetNodeForRoomStub = stub } -func (fake *FakeRouter) GetNodeForRoomArgsForCall(i int) string { +func (fake *FakeRouter) GetNodeForRoomArgsForCall(i int) (context.Context, string) { fake.getNodeForRoomMutex.RLock() defer fake.getNodeForRoomMutex.RUnlock() argsForCall := fake.getNodeForRoomArgsForCall[i] - return argsForCall.arg1 + return argsForCall.arg1, argsForCall.arg2 } func (fake *FakeRouter) GetNodeForRoomReturns(result1 *livekit.Node, result2 error) { @@ -640,19 +649,20 @@ func (fake *FakeRouter) RemoveDeadNodesReturnsOnCall(i int, result1 error) { }{result1} } -func (fake *FakeRouter) SetNodeForRoom(arg1 string, arg2 string) error { +func (fake *FakeRouter) SetNodeForRoom(arg1 context.Context, arg2 string, arg3 string) error { fake.setNodeForRoomMutex.Lock() ret, specificReturn := fake.setNodeForRoomReturnsOnCall[len(fake.setNodeForRoomArgsForCall)] fake.setNodeForRoomArgsForCall = append(fake.setNodeForRoomArgsForCall, struct { - arg1 string + arg1 context.Context arg2 string - }{arg1, arg2}) + arg3 string + }{arg1, arg2, arg3}) stub := fake.SetNodeForRoomStub fakeReturns := fake.setNodeForRoomReturns - fake.recordInvocation("SetNodeForRoom", []interface{}{arg1, arg2}) + fake.recordInvocation("SetNodeForRoom", []interface{}{arg1, arg2, arg3}) fake.setNodeForRoomMutex.Unlock() if stub != nil { - return stub(arg1, arg2) + return stub(arg1, arg2, arg3) } if specificReturn { return ret.result1 @@ -666,17 +676,17 @@ func (fake *FakeRouter) SetNodeForRoomCallCount() int { return len(fake.setNodeForRoomArgsForCall) } -func (fake *FakeRouter) SetNodeForRoomCalls(stub func(string, string) error) { +func (fake *FakeRouter) SetNodeForRoomCalls(stub func(context.Context, string, string) error) { fake.setNodeForRoomMutex.Lock() defer fake.setNodeForRoomMutex.Unlock() fake.SetNodeForRoomStub = stub } -func (fake *FakeRouter) SetNodeForRoomArgsForCall(i int) (string, string) { +func (fake *FakeRouter) SetNodeForRoomArgsForCall(i int) (context.Context, string, string) { fake.setNodeForRoomMutex.RLock() defer fake.setNodeForRoomMutex.RUnlock() argsForCall := fake.setNodeForRoomArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } func (fake *FakeRouter) SetNodeForRoomReturns(result1 error) { @@ -755,19 +765,20 @@ func (fake *FakeRouter) StartReturnsOnCall(i int, result1 error) { }{result1} } -func (fake *FakeRouter) StartParticipantSignal(arg1 string, arg2 routing.ParticipantInit) (string, routing.MessageSink, routing.MessageSource, error) { +func (fake *FakeRouter) StartParticipantSignal(arg1 context.Context, arg2 string, arg3 routing.ParticipantInit) (string, routing.MessageSink, routing.MessageSource, error) { fake.startParticipantSignalMutex.Lock() ret, specificReturn := fake.startParticipantSignalReturnsOnCall[len(fake.startParticipantSignalArgsForCall)] fake.startParticipantSignalArgsForCall = append(fake.startParticipantSignalArgsForCall, struct { - arg1 string - arg2 routing.ParticipantInit - }{arg1, arg2}) + arg1 context.Context + arg2 string + arg3 routing.ParticipantInit + }{arg1, arg2, arg3}) stub := fake.StartParticipantSignalStub fakeReturns := fake.startParticipantSignalReturns - fake.recordInvocation("StartParticipantSignal", []interface{}{arg1, arg2}) + fake.recordInvocation("StartParticipantSignal", []interface{}{arg1, arg2, arg3}) fake.startParticipantSignalMutex.Unlock() if stub != nil { - return stub(arg1, arg2) + return stub(arg1, arg2, arg3) } if specificReturn { return ret.result1, ret.result2, ret.result3, ret.result4 @@ -781,17 +792,17 @@ func (fake *FakeRouter) StartParticipantSignalCallCount() int { return len(fake.startParticipantSignalArgsForCall) } -func (fake *FakeRouter) StartParticipantSignalCalls(stub func(string, routing.ParticipantInit) (string, routing.MessageSink, routing.MessageSource, error)) { +func (fake *FakeRouter) StartParticipantSignalCalls(stub func(context.Context, string, routing.ParticipantInit) (string, routing.MessageSink, routing.MessageSource, error)) { fake.startParticipantSignalMutex.Lock() defer fake.startParticipantSignalMutex.Unlock() fake.StartParticipantSignalStub = stub } -func (fake *FakeRouter) StartParticipantSignalArgsForCall(i int) (string, routing.ParticipantInit) { +func (fake *FakeRouter) StartParticipantSignalArgsForCall(i int) (context.Context, string, routing.ParticipantInit) { fake.startParticipantSignalMutex.RLock() defer fake.startParticipantSignalMutex.RUnlock() argsForCall := fake.startParticipantSignalArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } func (fake *FakeRouter) StartParticipantSignalReturns(result1 string, result2 routing.MessageSink, result3 routing.MessageSource, result4 error) { diff --git a/pkg/service/interfaces.go b/pkg/service/interfaces.go index 59426f034..57b1a9570 100644 --- a/pkg/service/interfaces.go +++ b/pkg/service/interfaces.go @@ -1,6 +1,7 @@ package service import ( + "context" "time" livekit "github.com/livekit/protocol/proto" @@ -15,30 +16,30 @@ import ( // look up participant //counterfeiter:generate . RoomStore type RoomStore interface { - StoreRoom(room *livekit.Room) error - LoadRoom(idOrName string) (*livekit.Room, error) - ListRooms() ([]*livekit.Room, error) - DeleteRoom(idOrName string) error + StoreRoom(ctx context.Context, room *livekit.Room) error + LoadRoom(ctx context.Context, idOrName string) (*livekit.Room, error) + ListRooms(ctx context.Context) ([]*livekit.Room, error) + DeleteRoom(ctx context.Context, idOrName string) error // enable locking on a specific room to prevent race // returns a (lock uuid, error) - LockRoom(name string, duration time.Duration) (string, error) - UnlockRoom(name string, uid string) error + LockRoom(ctx context.Context, name string, duration time.Duration) (string, error) + UnlockRoom(ctx context.Context, name string, uid string) error - StoreParticipant(roomName string, participant *livekit.ParticipantInfo) error - LoadParticipant(roomName, identity string) (*livekit.ParticipantInfo, error) - ListParticipants(roomName string) ([]*livekit.ParticipantInfo, error) - DeleteParticipant(roomName, identity string) error + StoreParticipant(ctx context.Context, roomName string, participant *livekit.ParticipantInfo) error + LoadParticipant(ctx context.Context, roomName, identity string) (*livekit.ParticipantInfo, error) + ListParticipants(ctx context.Context, roomName string) ([]*livekit.ParticipantInfo, error) + DeleteParticipant(ctx context.Context, roomName, identity string) error } type RoomManager interface { RoomStore - CreateRoom(req *livekit.CreateRoomRequest) (*livekit.Room, error) - GetRoom(roomName string) *rtc.Room - DeleteRoom(roomName string) error + CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error) + GetRoom(ctx context.Context, roomName string) *rtc.Room + DeleteRoom(ctx context.Context, roomName string) error + StartSession(ctx context.Context, roomName string, pi routing.ParticipantInit, requestSource routing.MessageSource, responseSink routing.MessageSink) CleanupRooms() error CloseIdleRooms() Stop() - StartSession(roomName string, pi routing.ParticipantInit, requestSource routing.MessageSource, responseSink routing.MessageSink) } diff --git a/pkg/service/localroomstore.go b/pkg/service/localroomstore.go index 566a444ca..c0ee06d06 100644 --- a/pkg/service/localroomstore.go +++ b/pkg/service/localroomstore.go @@ -1,6 +1,7 @@ package service import ( + "context" "sync" "time" @@ -28,7 +29,7 @@ func NewLocalRoomStore() *LocalRoomStore { } } -func (p *LocalRoomStore) StoreRoom(room *livekit.Room) error { +func (p *LocalRoomStore) StoreRoom(ctx context.Context, room *livekit.Room) error { if room.CreationTime == 0 { room.CreationTime = time.Now().Unix() } @@ -39,7 +40,7 @@ func (p *LocalRoomStore) StoreRoom(room *livekit.Room) error { return nil } -func (p *LocalRoomStore) LoadRoom(idOrName string) (*livekit.Room, error) { +func (p *LocalRoomStore) LoadRoom(ctx context.Context, idOrName string) (*livekit.Room, error) { p.lock.RLock() defer p.lock.RUnlock() // see if it's an id or name @@ -54,7 +55,7 @@ func (p *LocalRoomStore) LoadRoom(idOrName string) (*livekit.Room, error) { return room, nil } -func (p *LocalRoomStore) ListRooms() ([]*livekit.Room, error) { +func (p *LocalRoomStore) ListRooms(ctx context.Context) ([]*livekit.Room, error) { p.lock.RLock() defer p.lock.RUnlock() rooms := make([]*livekit.Room, 0, len(p.rooms)) @@ -64,8 +65,8 @@ func (p *LocalRoomStore) ListRooms() ([]*livekit.Room, error) { return rooms, nil } -func (p *LocalRoomStore) DeleteRoom(idOrName string) error { - room, err := p.LoadRoom(idOrName) +func (p *LocalRoomStore) DeleteRoom(ctx context.Context, idOrName string) error { + room, err := p.LoadRoom(ctx, idOrName) if err == ErrRoomNotFound { return nil } else if err != nil { @@ -81,18 +82,18 @@ func (p *LocalRoomStore) DeleteRoom(idOrName string) error { return nil } -func (p *LocalRoomStore) LockRoom(name string, duration time.Duration) (string, error) { +func (p *LocalRoomStore) LockRoom(ctx context.Context, name string, duration time.Duration) (string, error) { // local rooms lock & unlock globally p.globalLock.Lock() return "", nil } -func (p *LocalRoomStore) UnlockRoom(name string, uid string) error { +func (p *LocalRoomStore) UnlockRoom(ctx context.Context, name string, uid string) error { p.globalLock.Unlock() return nil } -func (p *LocalRoomStore) StoreParticipant(roomName string, participant *livekit.ParticipantInfo) error { +func (p *LocalRoomStore) StoreParticipant(ctx context.Context, roomName string, participant *livekit.ParticipantInfo) error { p.lock.Lock() defer p.lock.Unlock() roomParticipants := p.participants[roomName] @@ -104,7 +105,7 @@ func (p *LocalRoomStore) StoreParticipant(roomName string, participant *livekit. return nil } -func (p *LocalRoomStore) LoadParticipant(roomName, identity string) (*livekit.ParticipantInfo, error) { +func (p *LocalRoomStore) LoadParticipant(ctx context.Context, roomName, identity string) (*livekit.ParticipantInfo, error) { p.lock.RLock() defer p.lock.RUnlock() @@ -119,7 +120,7 @@ func (p *LocalRoomStore) LoadParticipant(roomName, identity string) (*livekit.Pa return participant, nil } -func (p *LocalRoomStore) ListParticipants(roomName string) ([]*livekit.ParticipantInfo, error) { +func (p *LocalRoomStore) ListParticipants(ctx context.Context, roomName string) ([]*livekit.ParticipantInfo, error) { p.lock.RLock() defer p.lock.RUnlock() @@ -137,7 +138,7 @@ func (p *LocalRoomStore) ListParticipants(roomName string) ([]*livekit.Participa return items, nil } -func (p *LocalRoomStore) DeleteParticipant(roomName, identity string) error { +func (p *LocalRoomStore) DeleteParticipant(ctx context.Context, roomName, identity string) error { p.lock.Lock() defer p.lock.Unlock() diff --git a/pkg/service/redisroomstore.go b/pkg/service/redisroomstore.go index 177d9e522..2794a8992 100644 --- a/pkg/service/redisroomstore.go +++ b/pkg/service/redisroomstore.go @@ -38,7 +38,7 @@ func NewRedisRoomStore(rc *redis.Client) *RedisRoomStore { } } -func (p *RedisRoomStore) StoreRoom(room *livekit.Room) error { +func (p *RedisRoomStore) StoreRoom(ctx context.Context, room *livekit.Room) error { if room.CreationTime == 0 { room.CreationTime = time.Now().Unix() } @@ -58,7 +58,7 @@ func (p *RedisRoomStore) StoreRoom(room *livekit.Room) error { return nil } -func (p *RedisRoomStore) LoadRoom(idOrName string) (*livekit.Room, error) { +func (p *RedisRoomStore) LoadRoom(ctx context.Context, idOrName string) (*livekit.Room, error) { // see if matches any ids name, err := p.rc.HGet(p.ctx, RoomIdMap, idOrName).Result() if err != nil { @@ -82,7 +82,7 @@ func (p *RedisRoomStore) LoadRoom(idOrName string) (*livekit.Room, error) { return &room, nil } -func (p *RedisRoomStore) ListRooms() ([]*livekit.Room, error) { +func (p *RedisRoomStore) ListRooms(ctx context.Context) ([]*livekit.Room, error) { items, err := p.rc.HVals(p.ctx, RoomsKey).Result() if err != nil && err != redis.Nil { return nil, errors.Wrap(err, "could not get rooms") @@ -101,8 +101,8 @@ func (p *RedisRoomStore) ListRooms() ([]*livekit.Room, error) { return rooms, nil } -func (p *RedisRoomStore) DeleteRoom(idOrName string) error { - room, err := p.LoadRoom(idOrName) +func (p *RedisRoomStore) DeleteRoom(ctx context.Context, idOrName string) error { + room, err := p.LoadRoom(ctx, idOrName) var sid, name string if err == ErrRoomNotFound { @@ -125,7 +125,7 @@ func (p *RedisRoomStore) DeleteRoom(idOrName string) error { return err } -func (p *RedisRoomStore) LockRoom(name string, duration time.Duration) (string, error) { +func (p *RedisRoomStore) LockRoom(ctx context.Context, name string, duration time.Duration) (string, error) { token := utils.NewGuid("LOCK") key := RoomLockPrefix + name @@ -150,7 +150,7 @@ func (p *RedisRoomStore) LockRoom(name string, duration time.Duration) (string, return "", ErrRoomLockFailed } -func (p *RedisRoomStore) UnlockRoom(name string, uid string) error { +func (p *RedisRoomStore) UnlockRoom(ctx context.Context, name string, uid string) error { key := RoomLockPrefix + name val, err := p.rc.Get(p.ctx, key).Result() @@ -167,7 +167,7 @@ func (p *RedisRoomStore) UnlockRoom(name string, uid string) error { return p.rc.Del(p.ctx, key).Err() } -func (p *RedisRoomStore) StoreParticipant(roomName string, participant *livekit.ParticipantInfo) error { +func (p *RedisRoomStore) StoreParticipant(ctx context.Context, roomName string, participant *livekit.ParticipantInfo) error { key := RoomParticipantsPrefix + roomName data, err := proto.Marshal(participant) @@ -178,7 +178,7 @@ func (p *RedisRoomStore) StoreParticipant(roomName string, participant *livekit. return p.rc.HSet(p.ctx, key, participant.Identity, data).Err() } -func (p *RedisRoomStore) LoadParticipant(roomName, identity string) (*livekit.ParticipantInfo, error) { +func (p *RedisRoomStore) LoadParticipant(ctx context.Context, roomName, identity string) (*livekit.ParticipantInfo, error) { key := RoomParticipantsPrefix + roomName data, err := p.rc.HGet(p.ctx, key, identity).Result() if err == redis.Nil { @@ -194,7 +194,7 @@ func (p *RedisRoomStore) LoadParticipant(roomName, identity string) (*livekit.Pa return &pi, nil } -func (p *RedisRoomStore) ListParticipants(roomName string) ([]*livekit.ParticipantInfo, error) { +func (p *RedisRoomStore) ListParticipants(ctx context.Context, roomName string) ([]*livekit.ParticipantInfo, error) { key := RoomParticipantsPrefix + roomName items, err := p.rc.HVals(p.ctx, key).Result() if err == redis.Nil { @@ -214,7 +214,7 @@ func (p *RedisRoomStore) ListParticipants(roomName string) ([]*livekit.Participa return participants, nil } -func (p *RedisRoomStore) DeleteParticipant(roomName, identity string) error { +func (p *RedisRoomStore) DeleteParticipant(ctx context.Context, roomName, identity string) error { key := RoomParticipantsPrefix + roomName return p.rc.HDel(p.ctx, key, identity).Err() diff --git a/pkg/service/redisroomstore_test.go b/pkg/service/redisroomstore_test.go index 344ee8b94..2f67fdee6 100644 --- a/pkg/service/redisroomstore_test.go +++ b/pkg/service/redisroomstore_test.go @@ -1,6 +1,7 @@ package service_test import ( + "context" "sync" "sync/atomic" "testing" @@ -13,10 +14,11 @@ import ( ) func TestParticipantPersistence(t *testing.T) { + ctx := context.Background() rs := service.NewRedisRoomStore(redisClient()) roomName := "room1" - rs.DeleteRoom(roomName) + rs.DeleteRoom(ctx, roomName) p := &livekit.ParticipantInfo{ Sid: "PA_test", @@ -32,46 +34,47 @@ func TestParticipantPersistence(t *testing.T) { } // create the participant - require.NoError(t, rs.StoreParticipant(roomName, p)) + require.NoError(t, rs.StoreParticipant(ctx, roomName, p)) // result should match - pGet, err := rs.LoadParticipant(roomName, p.Identity) + pGet, err := rs.LoadParticipant(ctx, roomName, p.Identity) require.NoError(t, err) require.Equal(t, p.Identity, pGet.Identity) require.Equal(t, len(p.Tracks), len(pGet.Tracks)) require.Equal(t, p.Tracks[0].Sid, pGet.Tracks[0].Sid) // list should return one participant - participants, err := rs.ListParticipants(roomName) + participants, err := rs.ListParticipants(ctx, roomName) require.NoError(t, err) require.Len(t, participants, 1) // deleting participant should return back to normal - require.NoError(t, rs.DeleteParticipant(roomName, p.Identity)) + require.NoError(t, rs.DeleteParticipant(ctx, roomName, p.Identity)) - participants, err = rs.ListParticipants(roomName) + participants, err = rs.ListParticipants(ctx, roomName) require.NoError(t, err) require.Len(t, participants, 0) // shouldn't be able to get it - _, err = rs.LoadParticipant(roomName, p.Identity) + _, err = rs.LoadParticipant(ctx, roomName, p.Identity) require.Equal(t, err, service.ErrParticipantNotFound) } func TestRoomLock(t *testing.T) { + ctx := context.Background() rs := service.NewRedisRoomStore(redisClient()) lockInterval := 5 * time.Millisecond roomName := "myroom" t.Run("normal locking", func(t *testing.T) { - token, err := rs.LockRoom(roomName, lockInterval) + token, err := rs.LockRoom(ctx, roomName, lockInterval) require.NoError(t, err) require.NotEmpty(t, token) - require.NoError(t, rs.UnlockRoom(roomName, token)) + require.NoError(t, rs.UnlockRoom(ctx, roomName, token)) }) t.Run("waits before acquiring lock", func(t *testing.T) { - token, err := rs.LockRoom(roomName, lockInterval) + token, err := rs.LockRoom(ctx, roomName, lockInterval) require.NoError(t, err) require.NotEmpty(t, token) unlocked := uint32(0) @@ -81,28 +84,28 @@ func TestRoomLock(t *testing.T) { go func() { // attempt to lock again defer wg.Done() - token2, err := rs.LockRoom(roomName, lockInterval) + token2, err := rs.LockRoom(ctx, roomName, lockInterval) require.NoError(t, err) - defer rs.UnlockRoom(roomName, token2) + defer rs.UnlockRoom(ctx, roomName, token2) require.Equal(t, uint32(1), atomic.LoadUint32(&unlocked)) }() // release after 2 ms time.Sleep(2 * time.Millisecond) atomic.StoreUint32(&unlocked, 1) - rs.UnlockRoom(roomName, token) + rs.UnlockRoom(ctx, roomName, token) wg.Wait() }) t.Run("lock expires", func(t *testing.T) { - token, err := rs.LockRoom(roomName, lockInterval) + token, err := rs.LockRoom(ctx, roomName, lockInterval) require.NoError(t, err) - defer rs.UnlockRoom(roomName, token) + defer rs.UnlockRoom(ctx, roomName, token) time.Sleep(lockInterval + time.Millisecond) - token2, err := rs.LockRoom(roomName, lockInterval) + token2, err := rs.LockRoom(ctx, roomName, lockInterval) require.NoError(t, err) - rs.UnlockRoom(roomName, token2) + rs.UnlockRoom(ctx, roomName, token2) }) } diff --git a/pkg/service/roommanager.go b/pkg/service/roommanager.go index 3f37d5991..70250eba9 100644 --- a/pkg/service/roommanager.go +++ b/pkg/service/roommanager.go @@ -1,6 +1,7 @@ package service import ( + "context" "fmt" "sync" "time" @@ -66,17 +67,17 @@ func NewLocalRoomManager(rp RoomStore, router routing.Router, currentNode routin // CreateRoom creates a new room from a request and allocates it to a node to handle // it'll also monitor fits state, and cleans it up when appropriate -func (r *LocalRoomManager) CreateRoom(req *livekit.CreateRoomRequest) (*livekit.Room, error) { - token, err := r.LockRoom(req.Name, 5*time.Second) +func (r *LocalRoomManager) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error) { + token, err := r.LockRoom(ctx, req.Name, 5*time.Second) if err != nil { return nil, err } defer func() { - _ = r.UnlockRoom(req.Name, token) + _ = r.UnlockRoom(ctx, req.Name, token) }() // find existing room and update it - rm, err := r.LoadRoom(req.Name) + rm, err := r.LoadRoom(ctx, req.Name) if err == ErrRoomNotFound { rm = &livekit.Room{ Sid: utils.NewGuid(utils.RoomPrefix), @@ -95,12 +96,12 @@ func (r *LocalRoomManager) CreateRoom(req *livekit.CreateRoomRequest) (*livekit. if req.MaxParticipants > 0 { rm.MaxParticipants = req.MaxParticipants } - if err := r.StoreRoom(rm); err != nil { + if err := r.StoreRoom(ctx, rm); err != nil { return nil, err } // Is that node still available? - node, err := r.router.GetNodeForRoom(rm.Name) + node, err := r.router.GetNodeForRoom(ctx, rm.Name) if err != routing.ErrNotFound && err != nil { return nil, err } @@ -127,21 +128,21 @@ func (r *LocalRoomManager) CreateRoom(req *livekit.CreateRoomRequest) (*livekit. } logger.Debugw("selected node for room", "room", rm.Name, "roomID", rm.Sid, "nodeID", nodeId) - if err := r.router.SetNodeForRoom(req.Name, nodeId); err != nil { + if err := r.router.SetNodeForRoom(ctx, req.Name, nodeId); err != nil { return nil, err } return rm, nil } -func (r *LocalRoomManager) GetRoom(roomName string) *rtc.Room { +func (r *LocalRoomManager) GetRoom(ctx context.Context, roomName string) *rtc.Room { r.lock.RLock() defer r.lock.RUnlock() return r.rooms[roomName] } // DeleteRoom completely deletes all room information, including active sessions, room store, and routing info -func (r *LocalRoomManager) DeleteRoom(roomName string) error { +func (r *LocalRoomManager) DeleteRoom(ctx context.Context, roomName string) error { logger.Infow("deleting room state", "room", roomName) r.lock.Lock() delete(r.rooms, roomName) @@ -153,12 +154,12 @@ func (r *LocalRoomManager) DeleteRoom(roomName string) error { // clear routing information go func() { defer wg.Done() - err = r.router.ClearRoomState(roomName) + err = r.router.ClearRoomState(ctx, roomName) }() // also delete room from db go func() { defer wg.Done() - err2 = r.RoomStore.DeleteRoom(roomName) + err2 = r.RoomStore.DeleteRoom(ctx, roomName) }() wg.Wait() @@ -172,7 +173,8 @@ func (r *LocalRoomManager) DeleteRoom(roomName string) error { // CleanupRooms cleans up after old rooms that have been around for awhile func (r *LocalRoomManager) CleanupRooms() error { // cleanup rooms that have been left for over a day - rooms, err := r.ListRooms() + ctx := context.Background() + rooms, err := r.ListRooms(ctx) if err != nil { return err } @@ -180,7 +182,7 @@ func (r *LocalRoomManager) CleanupRooms() error { now := time.Now().Unix() for _, room := range rooms { if (now - room.CreationTime) > roomPurgeSeconds { - if err := r.DeleteRoom(room.Name); err != nil { + if err := r.DeleteRoom(ctx, room.Name); err != nil { return err } } @@ -228,8 +230,8 @@ func (r *LocalRoomManager) Stop() { } // StartSession starts WebRTC session when a new participant is connected, takes place on RTC node -func (r *LocalRoomManager) StartSession(roomName string, pi routing.ParticipantInit, requestSource routing.MessageSource, responseSink routing.MessageSink) { - room, err := r.getOrCreateRoom(roomName) +func (r *LocalRoomManager) StartSession(ctx context.Context, roomName string, pi routing.ParticipantInit, requestSource routing.MessageSource, responseSink routing.MessageSink) { + room, err := r.getOrCreateRoom(ctx, roomName) if err != nil { logger.Errorw("could not create room", err, "room", roomName) return @@ -331,7 +333,7 @@ func (r *LocalRoomManager) StartSession(roomName string, pi routing.ParticipantI } // create the actual room object, to be used on RTC node -func (r *LocalRoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) { +func (r *LocalRoomManager) getOrCreateRoom(ctx context.Context, roomName string) (*rtc.Room, error) { r.lock.RLock() room := r.rooms[roomName] r.lock.RUnlock() @@ -341,7 +343,7 @@ func (r *LocalRoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) { } // create new room, get details first - ri, err := r.LoadRoom(roomName) + ri, err := r.LoadRoom(ctx, roomName) if err != nil { return nil, err } @@ -349,7 +351,7 @@ func (r *LocalRoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) { // construct ice servers room = rtc.NewRoom(ri, *r.rtcConfig, r.iceServersForRoom(ri), &r.config.Audio) room.OnClose(func() { - if err := r.DeleteRoom(roomName); err != nil { + if err := r.DeleteRoom(ctx, roomName); err != nil { logger.Errorw("could not delete room", err) } @@ -367,9 +369,9 @@ func (r *LocalRoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) { room.OnParticipantChanged(func(p types.Participant) { var err error if p.State() == livekit.ParticipantInfo_DISCONNECTED { - err = r.DeleteParticipant(roomName, p.Identity()) + err = r.DeleteParticipant(ctx, roomName, p.Identity()) } else { - err = r.StoreParticipant(roomName, p.ToProto()) + err = r.StoreParticipant(ctx, roomName, p.ToProto()) } if err != nil { logger.Errorw("could not handle participant change", err) @@ -488,7 +490,7 @@ func (r *LocalRoomManager) rtcSessionWorker(room *rtc.Room, participant types.Pa } } -func (r *LocalRoomManager) handleRTCMessage(roomName, identity string, msg *livekit.RTCNodeMessage) { +func (r *LocalRoomManager) handleRTCMessage(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) { r.lock.RLock() room := r.rooms[roomName] r.lock.RUnlock() diff --git a/pkg/service/roommanager_test.go b/pkg/service/roommanager_test.go index a6f49cebc..e0ece4ea2 100644 --- a/pkg/service/roommanager_test.go +++ b/pkg/service/roommanager_test.go @@ -1,6 +1,7 @@ package service_test import ( + "context" "testing" livekit "github.com/livekit/protocol/proto" @@ -17,7 +18,7 @@ func TestCreateRoom(t *testing.T) { manager, conf := newTestRoomManager(t) t.Run("ensure default room settings are applied", func(t *testing.T) { - room, err := manager.CreateRoom(&livekit.CreateRoomRequest{Name: "myroom"}) + room, err := manager.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "myroom"}) require.NoError(t, err) require.Equal(t, conf.Room.EmptyTimeout, room.EmptyTimeout) require.NotEmpty(t, room.EnabledCodecs) diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index 37eff1b29..70e134f54 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -30,7 +30,7 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq return nil, twirpAuthError(err) } - rm, err = s.roomManager.CreateRoom(req) + rm, err = s.roomManager.CreateRoom(ctx, req) if err != nil { err = errors.Wrap(err, "could not create room") } @@ -44,7 +44,7 @@ func (s *RoomService) ListRooms(ctx context.Context, req *livekit.ListRoomsReque return nil, twirpAuthError(err) } - rooms, err := s.roomManager.ListRooms() + rooms, err := s.roomManager.ListRooms(ctx) if err != nil { // TODO: translate error codes to twirp return @@ -62,7 +62,7 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq } // if the room is currently active, RTC node needs to disconnect clients // here we are using any user's identity, due to how it works with routing - participants, err := s.roomManager.ListParticipants(req.Room) + participants, err := s.roomManager.ListParticipants(ctx, req.Room) if err != nil { return nil, err } @@ -78,7 +78,7 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq } } else { // if a room hasn't started, delete locally - if err = s.roomManager.DeleteRoom(req.Room); err != nil { + if err = s.roomManager.DeleteRoom(ctx, req.Room); err != nil { err = twirp.WrapError(twirp.InternalError("could not delete room"), err) return nil, err } @@ -92,7 +92,7 @@ func (s *RoomService) ListParticipants(ctx context.Context, req *livekit.ListPar return nil, twirpAuthError(err) } - participants, err := s.roomManager.ListParticipants(req.Room) + participants, err := s.roomManager.ListParticipants(ctx, req.Room) if err != nil { return } @@ -108,7 +108,7 @@ func (s *RoomService) GetParticipant(ctx context.Context, req *livekit.RoomParti return nil, twirpAuthError(err) } - participant, err := s.roomManager.LoadParticipant(req.Room, req.Identity) + participant, err := s.roomManager.LoadParticipant(ctx, req.Room, req.Identity) if err != nil { return } @@ -136,7 +136,7 @@ func (s *RoomService) MutePublishedTrack(ctx context.Context, req *livekit.MuteR return nil, twirpAuthError(err) } - participant, err := s.roomManager.LoadParticipant(req.Room, req.Identity) + participant, err := s.roomManager.LoadParticipant(ctx, req.Room, req.Identity) if err != nil { return nil, err } @@ -176,7 +176,7 @@ func (s *RoomService) UpdateParticipant(ctx context.Context, req *livekit.Update return nil, err } - participant, err := s.roomManager.LoadParticipant(req.Room, req.Identity) + participant, err := s.roomManager.LoadParticipant(ctx, req.Room, req.Identity) if err != nil { return nil, err } @@ -200,7 +200,7 @@ func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.Upda func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest) (*livekit.SendDataResponse, error) { // here we are using any user's identity, due to how it works with routing - participants, err := s.roomManager.ListParticipants(req.Room) + participants, err := s.roomManager.ListParticipants(ctx, req.Room) if err != nil { return nil, err } @@ -224,12 +224,12 @@ func (s *RoomService) createRTCSink(ctx context.Context, room, identity string) return nil, twirpAuthError(err) } - _, err := s.roomManager.LoadParticipant(room, identity) + _, err := s.roomManager.LoadParticipant(ctx, room, identity) if err != nil { return nil, err } - return s.router.CreateRTCSink(room, identity) + return s.router.CreateRTCSink(ctx, room, identity) } func (s *RoomService) writeMessage(ctx context.Context, room, identity string, msg *livekit.RTCNodeMessage) error { diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index ed1bdbd34..08cfd6a01 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -1,6 +1,7 @@ package service import ( + "context" "fmt" "io" "net/http" @@ -108,14 +109,14 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // create room if it doesn't exist, also assigns an RTC node for the room - rm, err := s.roomManager.CreateRoom(&livekit.CreateRoomRequest{Name: roomName}) + rm, err := s.roomManager.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: roomName}) if err != nil { handleError(w, http.StatusInternalServerError, err.Error()) return } // this needs to be started first *before* using router functions on this node - connId, reqSink, resSource, err := s.router.StartParticipantSignal(roomName, pi) + connId, reqSink, resSource, err := s.router.StartParticipantSignal(context.Background(), roomName, pi) if err != nil { handleError(w, http.StatusInternalServerError, "could not start session: "+err.Error()) return diff --git a/pkg/service/servicefakes/fake_room_store.go b/pkg/service/servicefakes/fake_room_store.go index 5df7b0a9b..3ed10bf1b 100644 --- a/pkg/service/servicefakes/fake_room_store.go +++ b/pkg/service/servicefakes/fake_room_store.go @@ -2,6 +2,7 @@ package servicefakes import ( + "context" "sync" "time" @@ -10,11 +11,12 @@ import ( ) type FakeRoomStore struct { - DeleteParticipantStub func(string, string) error + DeleteParticipantStub func(context.Context, string, string) error deleteParticipantMutex sync.RWMutex deleteParticipantArgsForCall []struct { - arg1 string + arg1 context.Context arg2 string + arg3 string } deleteParticipantReturns struct { result1 error @@ -22,10 +24,11 @@ type FakeRoomStore struct { deleteParticipantReturnsOnCall map[int]struct { result1 error } - DeleteRoomStub func(string) error + DeleteRoomStub func(context.Context, string) error deleteRoomMutex sync.RWMutex deleteRoomArgsForCall []struct { - arg1 string + arg1 context.Context + arg2 string } deleteRoomReturns struct { result1 error @@ -33,10 +36,11 @@ type FakeRoomStore struct { deleteRoomReturnsOnCall map[int]struct { result1 error } - ListParticipantsStub func(string) ([]*livekit.ParticipantInfo, error) + ListParticipantsStub func(context.Context, string) ([]*livekit.ParticipantInfo, error) listParticipantsMutex sync.RWMutex listParticipantsArgsForCall []struct { - arg1 string + arg1 context.Context + arg2 string } listParticipantsReturns struct { result1 []*livekit.ParticipantInfo @@ -46,9 +50,10 @@ type FakeRoomStore struct { result1 []*livekit.ParticipantInfo result2 error } - ListRoomsStub func() ([]*livekit.Room, error) + ListRoomsStub func(context.Context) ([]*livekit.Room, error) listRoomsMutex sync.RWMutex listRoomsArgsForCall []struct { + arg1 context.Context } listRoomsReturns struct { result1 []*livekit.Room @@ -58,11 +63,12 @@ type FakeRoomStore struct { result1 []*livekit.Room result2 error } - LoadParticipantStub func(string, string) (*livekit.ParticipantInfo, error) + LoadParticipantStub func(context.Context, string, string) (*livekit.ParticipantInfo, error) loadParticipantMutex sync.RWMutex loadParticipantArgsForCall []struct { - arg1 string + arg1 context.Context arg2 string + arg3 string } loadParticipantReturns struct { result1 *livekit.ParticipantInfo @@ -72,10 +78,11 @@ type FakeRoomStore struct { result1 *livekit.ParticipantInfo result2 error } - LoadRoomStub func(string) (*livekit.Room, error) + LoadRoomStub func(context.Context, string) (*livekit.Room, error) loadRoomMutex sync.RWMutex loadRoomArgsForCall []struct { - arg1 string + arg1 context.Context + arg2 string } loadRoomReturns struct { result1 *livekit.Room @@ -85,11 +92,12 @@ type FakeRoomStore struct { result1 *livekit.Room result2 error } - LockRoomStub func(string, time.Duration) (string, error) + LockRoomStub func(context.Context, string, time.Duration) (string, error) lockRoomMutex sync.RWMutex lockRoomArgsForCall []struct { - arg1 string - arg2 time.Duration + arg1 context.Context + arg2 string + arg3 time.Duration } lockRoomReturns struct { result1 string @@ -99,11 +107,12 @@ type FakeRoomStore struct { result1 string result2 error } - StoreParticipantStub func(string, *livekit.ParticipantInfo) error + StoreParticipantStub func(context.Context, string, *livekit.ParticipantInfo) error storeParticipantMutex sync.RWMutex storeParticipantArgsForCall []struct { - arg1 string - arg2 *livekit.ParticipantInfo + arg1 context.Context + arg2 string + arg3 *livekit.ParticipantInfo } storeParticipantReturns struct { result1 error @@ -111,10 +120,11 @@ type FakeRoomStore struct { storeParticipantReturnsOnCall map[int]struct { result1 error } - StoreRoomStub func(*livekit.Room) error + StoreRoomStub func(context.Context, *livekit.Room) error storeRoomMutex sync.RWMutex storeRoomArgsForCall []struct { - arg1 *livekit.Room + arg1 context.Context + arg2 *livekit.Room } storeRoomReturns struct { result1 error @@ -122,11 +132,12 @@ type FakeRoomStore struct { storeRoomReturnsOnCall map[int]struct { result1 error } - UnlockRoomStub func(string, string) error + UnlockRoomStub func(context.Context, string, string) error unlockRoomMutex sync.RWMutex unlockRoomArgsForCall []struct { - arg1 string + arg1 context.Context arg2 string + arg3 string } unlockRoomReturns struct { result1 error @@ -138,19 +149,20 @@ type FakeRoomStore struct { invocationsMutex sync.RWMutex } -func (fake *FakeRoomStore) DeleteParticipant(arg1 string, arg2 string) error { +func (fake *FakeRoomStore) DeleteParticipant(arg1 context.Context, arg2 string, arg3 string) error { fake.deleteParticipantMutex.Lock() ret, specificReturn := fake.deleteParticipantReturnsOnCall[len(fake.deleteParticipantArgsForCall)] fake.deleteParticipantArgsForCall = append(fake.deleteParticipantArgsForCall, struct { - arg1 string + arg1 context.Context arg2 string - }{arg1, arg2}) + arg3 string + }{arg1, arg2, arg3}) stub := fake.DeleteParticipantStub fakeReturns := fake.deleteParticipantReturns - fake.recordInvocation("DeleteParticipant", []interface{}{arg1, arg2}) + fake.recordInvocation("DeleteParticipant", []interface{}{arg1, arg2, arg3}) fake.deleteParticipantMutex.Unlock() if stub != nil { - return stub(arg1, arg2) + return stub(arg1, arg2, arg3) } if specificReturn { return ret.result1 @@ -164,17 +176,17 @@ func (fake *FakeRoomStore) DeleteParticipantCallCount() int { return len(fake.deleteParticipantArgsForCall) } -func (fake *FakeRoomStore) DeleteParticipantCalls(stub func(string, string) error) { +func (fake *FakeRoomStore) DeleteParticipantCalls(stub func(context.Context, string, string) error) { fake.deleteParticipantMutex.Lock() defer fake.deleteParticipantMutex.Unlock() fake.DeleteParticipantStub = stub } -func (fake *FakeRoomStore) DeleteParticipantArgsForCall(i int) (string, string) { +func (fake *FakeRoomStore) DeleteParticipantArgsForCall(i int) (context.Context, string, string) { fake.deleteParticipantMutex.RLock() defer fake.deleteParticipantMutex.RUnlock() argsForCall := fake.deleteParticipantArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } func (fake *FakeRoomStore) DeleteParticipantReturns(result1 error) { @@ -200,18 +212,19 @@ func (fake *FakeRoomStore) DeleteParticipantReturnsOnCall(i int, result1 error) }{result1} } -func (fake *FakeRoomStore) DeleteRoom(arg1 string) error { +func (fake *FakeRoomStore) DeleteRoom(arg1 context.Context, arg2 string) error { fake.deleteRoomMutex.Lock() ret, specificReturn := fake.deleteRoomReturnsOnCall[len(fake.deleteRoomArgsForCall)] fake.deleteRoomArgsForCall = append(fake.deleteRoomArgsForCall, struct { - arg1 string - }{arg1}) + arg1 context.Context + arg2 string + }{arg1, arg2}) stub := fake.DeleteRoomStub fakeReturns := fake.deleteRoomReturns - fake.recordInvocation("DeleteRoom", []interface{}{arg1}) + fake.recordInvocation("DeleteRoom", []interface{}{arg1, arg2}) fake.deleteRoomMutex.Unlock() if stub != nil { - return stub(arg1) + return stub(arg1, arg2) } if specificReturn { return ret.result1 @@ -225,17 +238,17 @@ func (fake *FakeRoomStore) DeleteRoomCallCount() int { return len(fake.deleteRoomArgsForCall) } -func (fake *FakeRoomStore) DeleteRoomCalls(stub func(string) error) { +func (fake *FakeRoomStore) DeleteRoomCalls(stub func(context.Context, string) error) { fake.deleteRoomMutex.Lock() defer fake.deleteRoomMutex.Unlock() fake.DeleteRoomStub = stub } -func (fake *FakeRoomStore) DeleteRoomArgsForCall(i int) string { +func (fake *FakeRoomStore) DeleteRoomArgsForCall(i int) (context.Context, string) { fake.deleteRoomMutex.RLock() defer fake.deleteRoomMutex.RUnlock() argsForCall := fake.deleteRoomArgsForCall[i] - return argsForCall.arg1 + return argsForCall.arg1, argsForCall.arg2 } func (fake *FakeRoomStore) DeleteRoomReturns(result1 error) { @@ -261,18 +274,19 @@ func (fake *FakeRoomStore) DeleteRoomReturnsOnCall(i int, result1 error) { }{result1} } -func (fake *FakeRoomStore) ListParticipants(arg1 string) ([]*livekit.ParticipantInfo, error) { +func (fake *FakeRoomStore) ListParticipants(arg1 context.Context, arg2 string) ([]*livekit.ParticipantInfo, error) { fake.listParticipantsMutex.Lock() ret, specificReturn := fake.listParticipantsReturnsOnCall[len(fake.listParticipantsArgsForCall)] fake.listParticipantsArgsForCall = append(fake.listParticipantsArgsForCall, struct { - arg1 string - }{arg1}) + arg1 context.Context + arg2 string + }{arg1, arg2}) stub := fake.ListParticipantsStub fakeReturns := fake.listParticipantsReturns - fake.recordInvocation("ListParticipants", []interface{}{arg1}) + fake.recordInvocation("ListParticipants", []interface{}{arg1, arg2}) fake.listParticipantsMutex.Unlock() if stub != nil { - return stub(arg1) + return stub(arg1, arg2) } if specificReturn { return ret.result1, ret.result2 @@ -286,17 +300,17 @@ func (fake *FakeRoomStore) ListParticipantsCallCount() int { return len(fake.listParticipantsArgsForCall) } -func (fake *FakeRoomStore) ListParticipantsCalls(stub func(string) ([]*livekit.ParticipantInfo, error)) { +func (fake *FakeRoomStore) ListParticipantsCalls(stub func(context.Context, string) ([]*livekit.ParticipantInfo, error)) { fake.listParticipantsMutex.Lock() defer fake.listParticipantsMutex.Unlock() fake.ListParticipantsStub = stub } -func (fake *FakeRoomStore) ListParticipantsArgsForCall(i int) string { +func (fake *FakeRoomStore) ListParticipantsArgsForCall(i int) (context.Context, string) { fake.listParticipantsMutex.RLock() defer fake.listParticipantsMutex.RUnlock() argsForCall := fake.listParticipantsArgsForCall[i] - return argsForCall.arg1 + return argsForCall.arg1, argsForCall.arg2 } func (fake *FakeRoomStore) ListParticipantsReturns(result1 []*livekit.ParticipantInfo, result2 error) { @@ -325,17 +339,18 @@ func (fake *FakeRoomStore) ListParticipantsReturnsOnCall(i int, result1 []*livek }{result1, result2} } -func (fake *FakeRoomStore) ListRooms() ([]*livekit.Room, error) { +func (fake *FakeRoomStore) ListRooms(arg1 context.Context) ([]*livekit.Room, error) { fake.listRoomsMutex.Lock() ret, specificReturn := fake.listRoomsReturnsOnCall[len(fake.listRoomsArgsForCall)] fake.listRoomsArgsForCall = append(fake.listRoomsArgsForCall, struct { - }{}) + arg1 context.Context + }{arg1}) stub := fake.ListRoomsStub fakeReturns := fake.listRoomsReturns - fake.recordInvocation("ListRooms", []interface{}{}) + fake.recordInvocation("ListRooms", []interface{}{arg1}) fake.listRoomsMutex.Unlock() if stub != nil { - return stub() + return stub(arg1) } if specificReturn { return ret.result1, ret.result2 @@ -349,12 +364,19 @@ func (fake *FakeRoomStore) ListRoomsCallCount() int { return len(fake.listRoomsArgsForCall) } -func (fake *FakeRoomStore) ListRoomsCalls(stub func() ([]*livekit.Room, error)) { +func (fake *FakeRoomStore) ListRoomsCalls(stub func(context.Context) ([]*livekit.Room, error)) { fake.listRoomsMutex.Lock() defer fake.listRoomsMutex.Unlock() fake.ListRoomsStub = stub } +func (fake *FakeRoomStore) ListRoomsArgsForCall(i int) context.Context { + fake.listRoomsMutex.RLock() + defer fake.listRoomsMutex.RUnlock() + argsForCall := fake.listRoomsArgsForCall[i] + return argsForCall.arg1 +} + func (fake *FakeRoomStore) ListRoomsReturns(result1 []*livekit.Room, result2 error) { fake.listRoomsMutex.Lock() defer fake.listRoomsMutex.Unlock() @@ -381,19 +403,20 @@ func (fake *FakeRoomStore) ListRoomsReturnsOnCall(i int, result1 []*livekit.Room }{result1, result2} } -func (fake *FakeRoomStore) LoadParticipant(arg1 string, arg2 string) (*livekit.ParticipantInfo, error) { +func (fake *FakeRoomStore) LoadParticipant(arg1 context.Context, arg2 string, arg3 string) (*livekit.ParticipantInfo, error) { fake.loadParticipantMutex.Lock() ret, specificReturn := fake.loadParticipantReturnsOnCall[len(fake.loadParticipantArgsForCall)] fake.loadParticipantArgsForCall = append(fake.loadParticipantArgsForCall, struct { - arg1 string + arg1 context.Context arg2 string - }{arg1, arg2}) + arg3 string + }{arg1, arg2, arg3}) stub := fake.LoadParticipantStub fakeReturns := fake.loadParticipantReturns - fake.recordInvocation("LoadParticipant", []interface{}{arg1, arg2}) + fake.recordInvocation("LoadParticipant", []interface{}{arg1, arg2, arg3}) fake.loadParticipantMutex.Unlock() if stub != nil { - return stub(arg1, arg2) + return stub(arg1, arg2, arg3) } if specificReturn { return ret.result1, ret.result2 @@ -407,17 +430,17 @@ func (fake *FakeRoomStore) LoadParticipantCallCount() int { return len(fake.loadParticipantArgsForCall) } -func (fake *FakeRoomStore) LoadParticipantCalls(stub func(string, string) (*livekit.ParticipantInfo, error)) { +func (fake *FakeRoomStore) LoadParticipantCalls(stub func(context.Context, string, string) (*livekit.ParticipantInfo, error)) { fake.loadParticipantMutex.Lock() defer fake.loadParticipantMutex.Unlock() fake.LoadParticipantStub = stub } -func (fake *FakeRoomStore) LoadParticipantArgsForCall(i int) (string, string) { +func (fake *FakeRoomStore) LoadParticipantArgsForCall(i int) (context.Context, string, string) { fake.loadParticipantMutex.RLock() defer fake.loadParticipantMutex.RUnlock() argsForCall := fake.loadParticipantArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } func (fake *FakeRoomStore) LoadParticipantReturns(result1 *livekit.ParticipantInfo, result2 error) { @@ -446,18 +469,19 @@ func (fake *FakeRoomStore) LoadParticipantReturnsOnCall(i int, result1 *livekit. }{result1, result2} } -func (fake *FakeRoomStore) LoadRoom(arg1 string) (*livekit.Room, error) { +func (fake *FakeRoomStore) LoadRoom(arg1 context.Context, arg2 string) (*livekit.Room, error) { fake.loadRoomMutex.Lock() ret, specificReturn := fake.loadRoomReturnsOnCall[len(fake.loadRoomArgsForCall)] fake.loadRoomArgsForCall = append(fake.loadRoomArgsForCall, struct { - arg1 string - }{arg1}) + arg1 context.Context + arg2 string + }{arg1, arg2}) stub := fake.LoadRoomStub fakeReturns := fake.loadRoomReturns - fake.recordInvocation("LoadRoom", []interface{}{arg1}) + fake.recordInvocation("LoadRoom", []interface{}{arg1, arg2}) fake.loadRoomMutex.Unlock() if stub != nil { - return stub(arg1) + return stub(arg1, arg2) } if specificReturn { return ret.result1, ret.result2 @@ -471,17 +495,17 @@ func (fake *FakeRoomStore) LoadRoomCallCount() int { return len(fake.loadRoomArgsForCall) } -func (fake *FakeRoomStore) LoadRoomCalls(stub func(string) (*livekit.Room, error)) { +func (fake *FakeRoomStore) LoadRoomCalls(stub func(context.Context, string) (*livekit.Room, error)) { fake.loadRoomMutex.Lock() defer fake.loadRoomMutex.Unlock() fake.LoadRoomStub = stub } -func (fake *FakeRoomStore) LoadRoomArgsForCall(i int) string { +func (fake *FakeRoomStore) LoadRoomArgsForCall(i int) (context.Context, string) { fake.loadRoomMutex.RLock() defer fake.loadRoomMutex.RUnlock() argsForCall := fake.loadRoomArgsForCall[i] - return argsForCall.arg1 + return argsForCall.arg1, argsForCall.arg2 } func (fake *FakeRoomStore) LoadRoomReturns(result1 *livekit.Room, result2 error) { @@ -510,19 +534,20 @@ func (fake *FakeRoomStore) LoadRoomReturnsOnCall(i int, result1 *livekit.Room, r }{result1, result2} } -func (fake *FakeRoomStore) LockRoom(arg1 string, arg2 time.Duration) (string, error) { +func (fake *FakeRoomStore) LockRoom(arg1 context.Context, arg2 string, arg3 time.Duration) (string, error) { fake.lockRoomMutex.Lock() ret, specificReturn := fake.lockRoomReturnsOnCall[len(fake.lockRoomArgsForCall)] fake.lockRoomArgsForCall = append(fake.lockRoomArgsForCall, struct { - arg1 string - arg2 time.Duration - }{arg1, arg2}) + arg1 context.Context + arg2 string + arg3 time.Duration + }{arg1, arg2, arg3}) stub := fake.LockRoomStub fakeReturns := fake.lockRoomReturns - fake.recordInvocation("LockRoom", []interface{}{arg1, arg2}) + fake.recordInvocation("LockRoom", []interface{}{arg1, arg2, arg3}) fake.lockRoomMutex.Unlock() if stub != nil { - return stub(arg1, arg2) + return stub(arg1, arg2, arg3) } if specificReturn { return ret.result1, ret.result2 @@ -536,17 +561,17 @@ func (fake *FakeRoomStore) LockRoomCallCount() int { return len(fake.lockRoomArgsForCall) } -func (fake *FakeRoomStore) LockRoomCalls(stub func(string, time.Duration) (string, error)) { +func (fake *FakeRoomStore) LockRoomCalls(stub func(context.Context, string, time.Duration) (string, error)) { fake.lockRoomMutex.Lock() defer fake.lockRoomMutex.Unlock() fake.LockRoomStub = stub } -func (fake *FakeRoomStore) LockRoomArgsForCall(i int) (string, time.Duration) { +func (fake *FakeRoomStore) LockRoomArgsForCall(i int) (context.Context, string, time.Duration) { fake.lockRoomMutex.RLock() defer fake.lockRoomMutex.RUnlock() argsForCall := fake.lockRoomArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } func (fake *FakeRoomStore) LockRoomReturns(result1 string, result2 error) { @@ -575,19 +600,20 @@ func (fake *FakeRoomStore) LockRoomReturnsOnCall(i int, result1 string, result2 }{result1, result2} } -func (fake *FakeRoomStore) StoreParticipant(arg1 string, arg2 *livekit.ParticipantInfo) error { +func (fake *FakeRoomStore) StoreParticipant(arg1 context.Context, arg2 string, arg3 *livekit.ParticipantInfo) error { fake.storeParticipantMutex.Lock() ret, specificReturn := fake.storeParticipantReturnsOnCall[len(fake.storeParticipantArgsForCall)] fake.storeParticipantArgsForCall = append(fake.storeParticipantArgsForCall, struct { - arg1 string - arg2 *livekit.ParticipantInfo - }{arg1, arg2}) + arg1 context.Context + arg2 string + arg3 *livekit.ParticipantInfo + }{arg1, arg2, arg3}) stub := fake.StoreParticipantStub fakeReturns := fake.storeParticipantReturns - fake.recordInvocation("StoreParticipant", []interface{}{arg1, arg2}) + fake.recordInvocation("StoreParticipant", []interface{}{arg1, arg2, arg3}) fake.storeParticipantMutex.Unlock() if stub != nil { - return stub(arg1, arg2) + return stub(arg1, arg2, arg3) } if specificReturn { return ret.result1 @@ -601,17 +627,17 @@ func (fake *FakeRoomStore) StoreParticipantCallCount() int { return len(fake.storeParticipantArgsForCall) } -func (fake *FakeRoomStore) StoreParticipantCalls(stub func(string, *livekit.ParticipantInfo) error) { +func (fake *FakeRoomStore) StoreParticipantCalls(stub func(context.Context, string, *livekit.ParticipantInfo) error) { fake.storeParticipantMutex.Lock() defer fake.storeParticipantMutex.Unlock() fake.StoreParticipantStub = stub } -func (fake *FakeRoomStore) StoreParticipantArgsForCall(i int) (string, *livekit.ParticipantInfo) { +func (fake *FakeRoomStore) StoreParticipantArgsForCall(i int) (context.Context, string, *livekit.ParticipantInfo) { fake.storeParticipantMutex.RLock() defer fake.storeParticipantMutex.RUnlock() argsForCall := fake.storeParticipantArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } func (fake *FakeRoomStore) StoreParticipantReturns(result1 error) { @@ -637,18 +663,19 @@ func (fake *FakeRoomStore) StoreParticipantReturnsOnCall(i int, result1 error) { }{result1} } -func (fake *FakeRoomStore) StoreRoom(arg1 *livekit.Room) error { +func (fake *FakeRoomStore) StoreRoom(arg1 context.Context, arg2 *livekit.Room) error { fake.storeRoomMutex.Lock() ret, specificReturn := fake.storeRoomReturnsOnCall[len(fake.storeRoomArgsForCall)] fake.storeRoomArgsForCall = append(fake.storeRoomArgsForCall, struct { - arg1 *livekit.Room - }{arg1}) + arg1 context.Context + arg2 *livekit.Room + }{arg1, arg2}) stub := fake.StoreRoomStub fakeReturns := fake.storeRoomReturns - fake.recordInvocation("StoreRoom", []interface{}{arg1}) + fake.recordInvocation("StoreRoom", []interface{}{arg1, arg2}) fake.storeRoomMutex.Unlock() if stub != nil { - return stub(arg1) + return stub(arg1, arg2) } if specificReturn { return ret.result1 @@ -662,17 +689,17 @@ func (fake *FakeRoomStore) StoreRoomCallCount() int { return len(fake.storeRoomArgsForCall) } -func (fake *FakeRoomStore) StoreRoomCalls(stub func(*livekit.Room) error) { +func (fake *FakeRoomStore) StoreRoomCalls(stub func(context.Context, *livekit.Room) error) { fake.storeRoomMutex.Lock() defer fake.storeRoomMutex.Unlock() fake.StoreRoomStub = stub } -func (fake *FakeRoomStore) StoreRoomArgsForCall(i int) *livekit.Room { +func (fake *FakeRoomStore) StoreRoomArgsForCall(i int) (context.Context, *livekit.Room) { fake.storeRoomMutex.RLock() defer fake.storeRoomMutex.RUnlock() argsForCall := fake.storeRoomArgsForCall[i] - return argsForCall.arg1 + return argsForCall.arg1, argsForCall.arg2 } func (fake *FakeRoomStore) StoreRoomReturns(result1 error) { @@ -698,19 +725,20 @@ func (fake *FakeRoomStore) StoreRoomReturnsOnCall(i int, result1 error) { }{result1} } -func (fake *FakeRoomStore) UnlockRoom(arg1 string, arg2 string) error { +func (fake *FakeRoomStore) UnlockRoom(arg1 context.Context, arg2 string, arg3 string) error { fake.unlockRoomMutex.Lock() ret, specificReturn := fake.unlockRoomReturnsOnCall[len(fake.unlockRoomArgsForCall)] fake.unlockRoomArgsForCall = append(fake.unlockRoomArgsForCall, struct { - arg1 string + arg1 context.Context arg2 string - }{arg1, arg2}) + arg3 string + }{arg1, arg2, arg3}) stub := fake.UnlockRoomStub fakeReturns := fake.unlockRoomReturns - fake.recordInvocation("UnlockRoom", []interface{}{arg1, arg2}) + fake.recordInvocation("UnlockRoom", []interface{}{arg1, arg2, arg3}) fake.unlockRoomMutex.Unlock() if stub != nil { - return stub(arg1, arg2) + return stub(arg1, arg2, arg3) } if specificReturn { return ret.result1 @@ -724,17 +752,17 @@ func (fake *FakeRoomStore) UnlockRoomCallCount() int { return len(fake.unlockRoomArgsForCall) } -func (fake *FakeRoomStore) UnlockRoomCalls(stub func(string, string) error) { +func (fake *FakeRoomStore) UnlockRoomCalls(stub func(context.Context, string, string) error) { fake.unlockRoomMutex.Lock() defer fake.unlockRoomMutex.Unlock() fake.UnlockRoomStub = stub } -func (fake *FakeRoomStore) UnlockRoomArgsForCall(i int) (string, string) { +func (fake *FakeRoomStore) UnlockRoomArgsForCall(i int) (context.Context, string, string) { fake.unlockRoomMutex.RLock() defer fake.unlockRoomMutex.RUnlock() argsForCall := fake.unlockRoomArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 } func (fake *FakeRoomStore) UnlockRoomReturns(result1 error) { diff --git a/pkg/service/turn.go b/pkg/service/turn.go index 2767dc609..9e7384d5b 100644 --- a/pkg/service/turn.go +++ b/pkg/service/turn.go @@ -1,6 +1,7 @@ package service import ( + "context" "crypto/tls" "net" "strconv" @@ -96,7 +97,7 @@ func NewTurnServer(conf *config.Config, roomStore RoomStore, node routing.LocalN func newTurnAuthHandler(roomStore RoomStore) turn.AuthHandler { return func(username, realm string, srcAddr net.Addr) (key []byte, ok bool) { // room id should be the username, create a hashed room id - rm, err := roomStore.LoadRoom(username) + rm, err := roomStore.LoadRoom(context.Background(), username) if err != nil { return nil, false } diff --git a/test/singlenode_test.go b/test/singlenode_test.go index 0e127a396..ea45cae4e 100644 --- a/test/singlenode_test.go +++ b/test/singlenode_test.go @@ -1,13 +1,15 @@ package test import ( + "context" "strings" "testing" "time" + "github.com/stretchr/testify/require" + "github.com/livekit/livekit-server/pkg/testutils" testclient "github.com/livekit/livekit-server/test/client" - "github.com/stretchr/testify/require" ) func TestClientCouldConnect(t *testing.T) { @@ -100,7 +102,7 @@ func TestSinglePublisher(t *testing.T) { c3.Stop() success = testutils.WithTimeout(t, "c3 is cleaned up as a subscriber", func() bool { - room := s.RoomManager().GetRoom(testRoom) + room := s.RoomManager().GetRoom(context.Background(), testRoom) require.NotNil(t, room) p := room.GetParticipant("c1") diff --git a/test/webhook_test.go b/test/webhook_test.go index e7617367f..17489f00e 100644 --- a/test/webhook_test.go +++ b/test/webhook_test.go @@ -73,7 +73,7 @@ func TestWebhooks(t *testing.T) { ts.ClearEvents() // room closed - rm := server.RoomManager().GetRoom(testRoom) + rm := server.RoomManager().GetRoom(context.Background(), testRoom) rm.Close() testutils.WithTimeout(t, "webhook events room_finished", func() bool { if ts.GetEvent(webhook.EventRoomFinished) == nil {