add ctx to interfaces (#105)

* add ctx to interfaces

* use existing context
This commit is contained in:
David Colburn
2021-08-30 20:31:24 -05:00
committed by GitHub
parent 1e10d440c8
commit 1f1eea383f
17 changed files with 321 additions and 267 deletions
-2
View File
@@ -4,8 +4,6 @@
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/bin" />
<excludeFolder url="file://$MODULE_DIR$/pkg/routing/routingfakes" />
<excludeFolder url="file://$MODULE_DIR$/pkg/service/servicefakes" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
+9 -7
View File
@@ -1,6 +1,8 @@
package routing
import (
"context"
livekit "github.com/livekit/protocol/proto"
"google.golang.org/protobuf/proto"
)
@@ -33,15 +35,15 @@ type ParticipantInit struct {
Hidden bool
}
type NewParticipantCallback func(roomName string, pi ParticipantInit, requestSource MessageSource, responseSink MessageSink)
type RTCMessageCallback func(roomName, identity string, msg *livekit.RTCNodeMessage)
type NewParticipantCallback func(ctx context.Context, roomName string, pi ParticipantInit, requestSource MessageSource, responseSink MessageSink)
type RTCMessageCallback func(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage)
// Router allows multiple nodes to coordinate the participant session
//counterfeiter:generate . Router
type Router interface {
GetNodeForRoom(roomName string) (*livekit.Node, error)
SetNodeForRoom(roomName string, nodeId string) error
ClearRoomState(roomName string) error
GetNodeForRoom(ctx context.Context, roomName string) (*livekit.Node, error)
SetNodeForRoom(ctx context.Context, roomName string, nodeId string) error
ClearRoomState(ctx context.Context, roomName string) error
RegisterNode() error
UnregisterNode() error
RemoveDeadNodes() error
@@ -49,10 +51,10 @@ type Router interface {
ListNodes() ([]*livekit.Node, error)
// StartParticipantSignal participant signal connection is ready to start
StartParticipantSignal(roomName string, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error)
StartParticipantSignal(ctx context.Context, roomName string, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error)
// CreateRTCSink sends a message to RTC node
CreateRTCSink(roomName, identity string) (MessageSink, error)
CreateRTCSink(ctx context.Context, roomName, identity string) (MessageSink, error)
// OnNewParticipantRTC is called to start a new participant's RTC connection
OnNewParticipantRTC(callback NewParticipantCallback)
+8 -6
View File
@@ -1,6 +1,7 @@
package routing
import (
"context"
"sync"
"time"
@@ -35,18 +36,18 @@ func NewLocalRouter(currentNode LocalNode) *LocalRouter {
}
}
func (r *LocalRouter) GetNodeForRoom(roomName string) (*livekit.Node, error) {
func (r *LocalRouter) GetNodeForRoom(ctx context.Context, roomName string) (*livekit.Node, error) {
r.lock.Lock()
defer r.lock.Unlock()
node := proto.Clone((*livekit.Node)(r.currentNode)).(*livekit.Node)
return node, nil
}
func (r *LocalRouter) SetNodeForRoom(roomName string, nodeId string) error {
func (r *LocalRouter) SetNodeForRoom(ctx context.Context, roomName string, nodeId string) error {
return nil
}
func (r *LocalRouter) ClearRoomState(roomName string) error {
func (r *LocalRouter) ClearRoomState(ctx context.Context, roomName string) error {
// do nothing
return nil
}
@@ -76,7 +77,7 @@ func (r *LocalRouter) ListNodes() ([]*livekit.Node, error) {
}, nil
}
func (r *LocalRouter) StartParticipantSignal(roomName string, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error) {
func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName string, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error) {
// treat it as a new participant connecting
if r.onNewParticipant == nil {
err = ErrHandlerNotDefined
@@ -89,6 +90,7 @@ func (r *LocalRouter) StartParticipantSignal(roomName string, pi ParticipantInit
resChan := r.getOrCreateMessageChannel(r.responseChannels, key)
r.onNewParticipant(
ctx,
roomName,
pi,
// request source
@@ -99,7 +101,7 @@ func (r *LocalRouter) StartParticipantSignal(roomName string, pi ParticipantInit
return pi.Identity, reqChan, resChan, nil
}
func (r *LocalRouter) CreateRTCSink(roomName, identity string) (MessageSink, error) {
func (r *LocalRouter) CreateRTCSink(ctx context.Context, roomName, identity string) (MessageSink, error) {
if r.rtcMessageChan.isClosed.Get() {
// create a new one
r.rtcMessageChan = NewMessageChannel()
@@ -167,7 +169,7 @@ func (r *LocalRouter) rtcMessageWorker() {
continue
}
if r.onRTCMessage != nil {
r.onRTCMessage(room, identity, rtcMsg)
r.onRTCMessage(context.Background(), room, identity, rtcMsg)
}
}
}
+9 -8
View File
@@ -77,7 +77,7 @@ func (r *RedisRouter) RemoveDeadNodes() error {
return nil
}
func (r *RedisRouter) GetNodeForRoom(roomName string) (*livekit.Node, error) {
func (r *RedisRouter) GetNodeForRoom(ctx context.Context, roomName string) (*livekit.Node, error) {
nodeId, err := r.rc.HGet(r.ctx, NodeRoomKey, roomName).Result()
if err == redis.Nil {
return nil, ErrNotFound
@@ -88,11 +88,11 @@ func (r *RedisRouter) GetNodeForRoom(roomName string) (*livekit.Node, error) {
return r.GetNode(nodeId)
}
func (r *RedisRouter) SetNodeForRoom(roomName string, nodeId string) error {
func (r *RedisRouter) SetNodeForRoom(ctx context.Context, roomName string, nodeId string) error {
return r.rc.HSet(r.ctx, NodeRoomKey, roomName, nodeId).Err()
}
func (r *RedisRouter) ClearRoomState(roomName string) error {
func (r *RedisRouter) ClearRoomState(ctx context.Context, roomName string) error {
if err := r.rc.HDel(r.ctx, NodeRoomKey, roomName).Err(); err != nil {
return errors.Wrap(err, "could not clear room state")
}
@@ -130,9 +130,9 @@ func (r *RedisRouter) ListNodes() ([]*livekit.Node, error) {
}
// StartParticipantSignal signal connection sets up paths to the RTC node, and starts to route messages to that message queue
func (r *RedisRouter) StartParticipantSignal(roomName string, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error) {
func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName string, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error) {
// find the node where the room is hosted at
rtcNode, err := r.GetNodeForRoom(roomName)
rtcNode, err := r.GetNodeForRoom(ctx, roomName)
if err != nil {
return
}
@@ -171,7 +171,7 @@ func (r *RedisRouter) StartParticipantSignal(roomName string, pi ParticipantInit
return connectionId, sink, resChan, nil
}
func (r *RedisRouter) CreateRTCSink(roomName, identity string) (MessageSink, error) {
func (r *RedisRouter) CreateRTCSink(ctx context.Context, roomName, identity string) (MessageSink, error) {
pkey := ParticipantKey(roomName, identity)
rtcNode, err := r.getParticipantRTCNode(pkey)
if err != nil {
@@ -183,7 +183,7 @@ func (r *RedisRouter) CreateRTCSink(roomName, identity string) (MessageSink, err
func (r *RedisRouter) startParticipantRTC(ss *livekit.StartSession, participantKey string) error {
// find the node where the room is hosted at
rtcNode, err := r.GetNodeForRoom(ss.RoomName)
rtcNode, err := r.GetNodeForRoom(r.ctx, ss.RoomName)
if err != nil {
return err
}
@@ -236,6 +236,7 @@ func (r *RedisRouter) startParticipantRTC(ss *livekit.StartSession, participantK
reqChan := r.getOrCreateMessageChannel(r.requestChannels, participantKey)
resSink := NewSignalNodeSink(r.rc, signalNode, ss.ConnectionId)
r.onNewParticipant(
r.ctx,
ss.RoomName,
pi,
reqChan,
@@ -416,7 +417,7 @@ func (r *RedisRouter) handleRTCMessage(rm *livekit.RTCNodeMessage) error {
if err != nil {
return err
}
r.onRTCMessage(roomName, identity, rm)
r.onRTCMessage(r.ctx, roomName, identity, rm)
}
}
return nil
+63 -52
View File
@@ -2,6 +2,7 @@
package routingfakes
import (
"context"
"sync"
"github.com/livekit/livekit-server/pkg/routing"
@@ -9,10 +10,11 @@ import (
)
type FakeRouter struct {
ClearRoomStateStub func(string) error
ClearRoomStateStub func(context.Context, string) error
clearRoomStateMutex sync.RWMutex
clearRoomStateArgsForCall []struct {
arg1 string
arg1 context.Context
arg2 string
}
clearRoomStateReturns struct {
result1 error
@@ -20,11 +22,12 @@ type FakeRouter struct {
clearRoomStateReturnsOnCall map[int]struct {
result1 error
}
CreateRTCSinkStub func(string, string) (routing.MessageSink, error)
CreateRTCSinkStub func(context.Context, string, string) (routing.MessageSink, error)
createRTCSinkMutex sync.RWMutex
createRTCSinkArgsForCall []struct {
arg1 string
arg1 context.Context
arg2 string
arg3 string
}
createRTCSinkReturns struct {
result1 routing.MessageSink
@@ -47,10 +50,11 @@ type FakeRouter struct {
result1 *livekit.Node
result2 error
}
GetNodeForRoomStub func(string) (*livekit.Node, error)
GetNodeForRoomStub func(context.Context, string) (*livekit.Node, error)
getNodeForRoomMutex sync.RWMutex
getNodeForRoomArgsForCall []struct {
arg1 string
arg1 context.Context
arg2 string
}
getNodeForRoomReturns struct {
result1 *livekit.Node
@@ -102,11 +106,12 @@ type FakeRouter struct {
removeDeadNodesReturnsOnCall map[int]struct {
result1 error
}
SetNodeForRoomStub func(string, string) error
SetNodeForRoomStub func(context.Context, string, string) error
setNodeForRoomMutex sync.RWMutex
setNodeForRoomArgsForCall []struct {
arg1 string
arg1 context.Context
arg2 string
arg3 string
}
setNodeForRoomReturns struct {
result1 error
@@ -124,11 +129,12 @@ type FakeRouter struct {
startReturnsOnCall map[int]struct {
result1 error
}
StartParticipantSignalStub func(string, routing.ParticipantInit) (string, routing.MessageSink, routing.MessageSource, error)
StartParticipantSignalStub func(context.Context, string, routing.ParticipantInit) (string, routing.MessageSink, routing.MessageSource, error)
startParticipantSignalMutex sync.RWMutex
startParticipantSignalArgsForCall []struct {
arg1 string
arg2 routing.ParticipantInit
arg1 context.Context
arg2 string
arg3 routing.ParticipantInit
}
startParticipantSignalReturns struct {
result1 string
@@ -160,18 +166,19 @@ type FakeRouter struct {
invocationsMutex sync.RWMutex
}
func (fake *FakeRouter) ClearRoomState(arg1 string) error {
func (fake *FakeRouter) ClearRoomState(arg1 context.Context, arg2 string) error {
fake.clearRoomStateMutex.Lock()
ret, specificReturn := fake.clearRoomStateReturnsOnCall[len(fake.clearRoomStateArgsForCall)]
fake.clearRoomStateArgsForCall = append(fake.clearRoomStateArgsForCall, struct {
arg1 string
}{arg1})
arg1 context.Context
arg2 string
}{arg1, arg2})
stub := fake.ClearRoomStateStub
fakeReturns := fake.clearRoomStateReturns
fake.recordInvocation("ClearRoomState", []interface{}{arg1})
fake.recordInvocation("ClearRoomState", []interface{}{arg1, arg2})
fake.clearRoomStateMutex.Unlock()
if stub != nil {
return stub(arg1)
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1
@@ -185,17 +192,17 @@ func (fake *FakeRouter) ClearRoomStateCallCount() int {
return len(fake.clearRoomStateArgsForCall)
}
func (fake *FakeRouter) ClearRoomStateCalls(stub func(string) error) {
func (fake *FakeRouter) ClearRoomStateCalls(stub func(context.Context, string) error) {
fake.clearRoomStateMutex.Lock()
defer fake.clearRoomStateMutex.Unlock()
fake.ClearRoomStateStub = stub
}
func (fake *FakeRouter) ClearRoomStateArgsForCall(i int) string {
func (fake *FakeRouter) ClearRoomStateArgsForCall(i int) (context.Context, string) {
fake.clearRoomStateMutex.RLock()
defer fake.clearRoomStateMutex.RUnlock()
argsForCall := fake.clearRoomStateArgsForCall[i]
return argsForCall.arg1
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeRouter) ClearRoomStateReturns(result1 error) {
@@ -221,19 +228,20 @@ func (fake *FakeRouter) ClearRoomStateReturnsOnCall(i int, result1 error) {
}{result1}
}
func (fake *FakeRouter) CreateRTCSink(arg1 string, arg2 string) (routing.MessageSink, error) {
func (fake *FakeRouter) CreateRTCSink(arg1 context.Context, arg2 string, arg3 string) (routing.MessageSink, error) {
fake.createRTCSinkMutex.Lock()
ret, specificReturn := fake.createRTCSinkReturnsOnCall[len(fake.createRTCSinkArgsForCall)]
fake.createRTCSinkArgsForCall = append(fake.createRTCSinkArgsForCall, struct {
arg1 string
arg1 context.Context
arg2 string
}{arg1, arg2})
arg3 string
}{arg1, arg2, arg3})
stub := fake.CreateRTCSinkStub
fakeReturns := fake.createRTCSinkReturns
fake.recordInvocation("CreateRTCSink", []interface{}{arg1, arg2})
fake.recordInvocation("CreateRTCSink", []interface{}{arg1, arg2, arg3})
fake.createRTCSinkMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
return stub(arg1, arg2, arg3)
}
if specificReturn {
return ret.result1, ret.result2
@@ -247,17 +255,17 @@ func (fake *FakeRouter) CreateRTCSinkCallCount() int {
return len(fake.createRTCSinkArgsForCall)
}
func (fake *FakeRouter) CreateRTCSinkCalls(stub func(string, string) (routing.MessageSink, error)) {
func (fake *FakeRouter) CreateRTCSinkCalls(stub func(context.Context, string, string) (routing.MessageSink, error)) {
fake.createRTCSinkMutex.Lock()
defer fake.createRTCSinkMutex.Unlock()
fake.CreateRTCSinkStub = stub
}
func (fake *FakeRouter) CreateRTCSinkArgsForCall(i int) (string, string) {
func (fake *FakeRouter) CreateRTCSinkArgsForCall(i int) (context.Context, string, string) {
fake.createRTCSinkMutex.RLock()
defer fake.createRTCSinkMutex.RUnlock()
argsForCall := fake.createRTCSinkArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeRouter) CreateRTCSinkReturns(result1 routing.MessageSink, result2 error) {
@@ -350,18 +358,19 @@ func (fake *FakeRouter) GetNodeReturnsOnCall(i int, result1 *livekit.Node, resul
}{result1, result2}
}
func (fake *FakeRouter) GetNodeForRoom(arg1 string) (*livekit.Node, error) {
func (fake *FakeRouter) GetNodeForRoom(arg1 context.Context, arg2 string) (*livekit.Node, error) {
fake.getNodeForRoomMutex.Lock()
ret, specificReturn := fake.getNodeForRoomReturnsOnCall[len(fake.getNodeForRoomArgsForCall)]
fake.getNodeForRoomArgsForCall = append(fake.getNodeForRoomArgsForCall, struct {
arg1 string
}{arg1})
arg1 context.Context
arg2 string
}{arg1, arg2})
stub := fake.GetNodeForRoomStub
fakeReturns := fake.getNodeForRoomReturns
fake.recordInvocation("GetNodeForRoom", []interface{}{arg1})
fake.recordInvocation("GetNodeForRoom", []interface{}{arg1, arg2})
fake.getNodeForRoomMutex.Unlock()
if stub != nil {
return stub(arg1)
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1, ret.result2
@@ -375,17 +384,17 @@ func (fake *FakeRouter) GetNodeForRoomCallCount() int {
return len(fake.getNodeForRoomArgsForCall)
}
func (fake *FakeRouter) GetNodeForRoomCalls(stub func(string) (*livekit.Node, error)) {
func (fake *FakeRouter) GetNodeForRoomCalls(stub func(context.Context, string) (*livekit.Node, error)) {
fake.getNodeForRoomMutex.Lock()
defer fake.getNodeForRoomMutex.Unlock()
fake.GetNodeForRoomStub = stub
}
func (fake *FakeRouter) GetNodeForRoomArgsForCall(i int) string {
func (fake *FakeRouter) GetNodeForRoomArgsForCall(i int) (context.Context, string) {
fake.getNodeForRoomMutex.RLock()
defer fake.getNodeForRoomMutex.RUnlock()
argsForCall := fake.getNodeForRoomArgsForCall[i]
return argsForCall.arg1
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeRouter) GetNodeForRoomReturns(result1 *livekit.Node, result2 error) {
@@ -640,19 +649,20 @@ func (fake *FakeRouter) RemoveDeadNodesReturnsOnCall(i int, result1 error) {
}{result1}
}
func (fake *FakeRouter) SetNodeForRoom(arg1 string, arg2 string) error {
func (fake *FakeRouter) SetNodeForRoom(arg1 context.Context, arg2 string, arg3 string) error {
fake.setNodeForRoomMutex.Lock()
ret, specificReturn := fake.setNodeForRoomReturnsOnCall[len(fake.setNodeForRoomArgsForCall)]
fake.setNodeForRoomArgsForCall = append(fake.setNodeForRoomArgsForCall, struct {
arg1 string
arg1 context.Context
arg2 string
}{arg1, arg2})
arg3 string
}{arg1, arg2, arg3})
stub := fake.SetNodeForRoomStub
fakeReturns := fake.setNodeForRoomReturns
fake.recordInvocation("SetNodeForRoom", []interface{}{arg1, arg2})
fake.recordInvocation("SetNodeForRoom", []interface{}{arg1, arg2, arg3})
fake.setNodeForRoomMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
return stub(arg1, arg2, arg3)
}
if specificReturn {
return ret.result1
@@ -666,17 +676,17 @@ func (fake *FakeRouter) SetNodeForRoomCallCount() int {
return len(fake.setNodeForRoomArgsForCall)
}
func (fake *FakeRouter) SetNodeForRoomCalls(stub func(string, string) error) {
func (fake *FakeRouter) SetNodeForRoomCalls(stub func(context.Context, string, string) error) {
fake.setNodeForRoomMutex.Lock()
defer fake.setNodeForRoomMutex.Unlock()
fake.SetNodeForRoomStub = stub
}
func (fake *FakeRouter) SetNodeForRoomArgsForCall(i int) (string, string) {
func (fake *FakeRouter) SetNodeForRoomArgsForCall(i int) (context.Context, string, string) {
fake.setNodeForRoomMutex.RLock()
defer fake.setNodeForRoomMutex.RUnlock()
argsForCall := fake.setNodeForRoomArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeRouter) SetNodeForRoomReturns(result1 error) {
@@ -755,19 +765,20 @@ func (fake *FakeRouter) StartReturnsOnCall(i int, result1 error) {
}{result1}
}
func (fake *FakeRouter) StartParticipantSignal(arg1 string, arg2 routing.ParticipantInit) (string, routing.MessageSink, routing.MessageSource, error) {
func (fake *FakeRouter) StartParticipantSignal(arg1 context.Context, arg2 string, arg3 routing.ParticipantInit) (string, routing.MessageSink, routing.MessageSource, error) {
fake.startParticipantSignalMutex.Lock()
ret, specificReturn := fake.startParticipantSignalReturnsOnCall[len(fake.startParticipantSignalArgsForCall)]
fake.startParticipantSignalArgsForCall = append(fake.startParticipantSignalArgsForCall, struct {
arg1 string
arg2 routing.ParticipantInit
}{arg1, arg2})
arg1 context.Context
arg2 string
arg3 routing.ParticipantInit
}{arg1, arg2, arg3})
stub := fake.StartParticipantSignalStub
fakeReturns := fake.startParticipantSignalReturns
fake.recordInvocation("StartParticipantSignal", []interface{}{arg1, arg2})
fake.recordInvocation("StartParticipantSignal", []interface{}{arg1, arg2, arg3})
fake.startParticipantSignalMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
return stub(arg1, arg2, arg3)
}
if specificReturn {
return ret.result1, ret.result2, ret.result3, ret.result4
@@ -781,17 +792,17 @@ func (fake *FakeRouter) StartParticipantSignalCallCount() int {
return len(fake.startParticipantSignalArgsForCall)
}
func (fake *FakeRouter) StartParticipantSignalCalls(stub func(string, routing.ParticipantInit) (string, routing.MessageSink, routing.MessageSource, error)) {
func (fake *FakeRouter) StartParticipantSignalCalls(stub func(context.Context, string, routing.ParticipantInit) (string, routing.MessageSink, routing.MessageSource, error)) {
fake.startParticipantSignalMutex.Lock()
defer fake.startParticipantSignalMutex.Unlock()
fake.StartParticipantSignalStub = stub
}
func (fake *FakeRouter) StartParticipantSignalArgsForCall(i int) (string, routing.ParticipantInit) {
func (fake *FakeRouter) StartParticipantSignalArgsForCall(i int) (context.Context, string, routing.ParticipantInit) {
fake.startParticipantSignalMutex.RLock()
defer fake.startParticipantSignalMutex.RUnlock()
argsForCall := fake.startParticipantSignalArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeRouter) StartParticipantSignalReturns(result1 string, result2 routing.MessageSink, result3 routing.MessageSource, result4 error) {
+15 -14
View File
@@ -1,6 +1,7 @@
package service
import (
"context"
"time"
livekit "github.com/livekit/protocol/proto"
@@ -15,30 +16,30 @@ import (
// look up participant
//counterfeiter:generate . RoomStore
type RoomStore interface {
StoreRoom(room *livekit.Room) error
LoadRoom(idOrName string) (*livekit.Room, error)
ListRooms() ([]*livekit.Room, error)
DeleteRoom(idOrName string) error
StoreRoom(ctx context.Context, room *livekit.Room) error
LoadRoom(ctx context.Context, idOrName string) (*livekit.Room, error)
ListRooms(ctx context.Context) ([]*livekit.Room, error)
DeleteRoom(ctx context.Context, idOrName string) error
// enable locking on a specific room to prevent race
// returns a (lock uuid, error)
LockRoom(name string, duration time.Duration) (string, error)
UnlockRoom(name string, uid string) error
LockRoom(ctx context.Context, name string, duration time.Duration) (string, error)
UnlockRoom(ctx context.Context, name string, uid string) error
StoreParticipant(roomName string, participant *livekit.ParticipantInfo) error
LoadParticipant(roomName, identity string) (*livekit.ParticipantInfo, error)
ListParticipants(roomName string) ([]*livekit.ParticipantInfo, error)
DeleteParticipant(roomName, identity string) error
StoreParticipant(ctx context.Context, roomName string, participant *livekit.ParticipantInfo) error
LoadParticipant(ctx context.Context, roomName, identity string) (*livekit.ParticipantInfo, error)
ListParticipants(ctx context.Context, roomName string) ([]*livekit.ParticipantInfo, error)
DeleteParticipant(ctx context.Context, roomName, identity string) error
}
type RoomManager interface {
RoomStore
CreateRoom(req *livekit.CreateRoomRequest) (*livekit.Room, error)
GetRoom(roomName string) *rtc.Room
DeleteRoom(roomName string) error
CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error)
GetRoom(ctx context.Context, roomName string) *rtc.Room
DeleteRoom(ctx context.Context, roomName string) error
StartSession(ctx context.Context, roomName string, pi routing.ParticipantInit, requestSource routing.MessageSource, responseSink routing.MessageSink)
CleanupRooms() error
CloseIdleRooms()
Stop()
StartSession(roomName string, pi routing.ParticipantInit, requestSource routing.MessageSource, responseSink routing.MessageSink)
}
+12 -11
View File
@@ -1,6 +1,7 @@
package service
import (
"context"
"sync"
"time"
@@ -28,7 +29,7 @@ func NewLocalRoomStore() *LocalRoomStore {
}
}
func (p *LocalRoomStore) StoreRoom(room *livekit.Room) error {
func (p *LocalRoomStore) StoreRoom(ctx context.Context, room *livekit.Room) error {
if room.CreationTime == 0 {
room.CreationTime = time.Now().Unix()
}
@@ -39,7 +40,7 @@ func (p *LocalRoomStore) StoreRoom(room *livekit.Room) error {
return nil
}
func (p *LocalRoomStore) LoadRoom(idOrName string) (*livekit.Room, error) {
func (p *LocalRoomStore) LoadRoom(ctx context.Context, idOrName string) (*livekit.Room, error) {
p.lock.RLock()
defer p.lock.RUnlock()
// see if it's an id or name
@@ -54,7 +55,7 @@ func (p *LocalRoomStore) LoadRoom(idOrName string) (*livekit.Room, error) {
return room, nil
}
func (p *LocalRoomStore) ListRooms() ([]*livekit.Room, error) {
func (p *LocalRoomStore) ListRooms(ctx context.Context) ([]*livekit.Room, error) {
p.lock.RLock()
defer p.lock.RUnlock()
rooms := make([]*livekit.Room, 0, len(p.rooms))
@@ -64,8 +65,8 @@ func (p *LocalRoomStore) ListRooms() ([]*livekit.Room, error) {
return rooms, nil
}
func (p *LocalRoomStore) DeleteRoom(idOrName string) error {
room, err := p.LoadRoom(idOrName)
func (p *LocalRoomStore) DeleteRoom(ctx context.Context, idOrName string) error {
room, err := p.LoadRoom(ctx, idOrName)
if err == ErrRoomNotFound {
return nil
} else if err != nil {
@@ -81,18 +82,18 @@ func (p *LocalRoomStore) DeleteRoom(idOrName string) error {
return nil
}
func (p *LocalRoomStore) LockRoom(name string, duration time.Duration) (string, error) {
func (p *LocalRoomStore) LockRoom(ctx context.Context, name string, duration time.Duration) (string, error) {
// local rooms lock & unlock globally
p.globalLock.Lock()
return "", nil
}
func (p *LocalRoomStore) UnlockRoom(name string, uid string) error {
func (p *LocalRoomStore) UnlockRoom(ctx context.Context, name string, uid string) error {
p.globalLock.Unlock()
return nil
}
func (p *LocalRoomStore) StoreParticipant(roomName string, participant *livekit.ParticipantInfo) error {
func (p *LocalRoomStore) StoreParticipant(ctx context.Context, roomName string, participant *livekit.ParticipantInfo) error {
p.lock.Lock()
defer p.lock.Unlock()
roomParticipants := p.participants[roomName]
@@ -104,7 +105,7 @@ func (p *LocalRoomStore) StoreParticipant(roomName string, participant *livekit.
return nil
}
func (p *LocalRoomStore) LoadParticipant(roomName, identity string) (*livekit.ParticipantInfo, error) {
func (p *LocalRoomStore) LoadParticipant(ctx context.Context, roomName, identity string) (*livekit.ParticipantInfo, error) {
p.lock.RLock()
defer p.lock.RUnlock()
@@ -119,7 +120,7 @@ func (p *LocalRoomStore) LoadParticipant(roomName, identity string) (*livekit.Pa
return participant, nil
}
func (p *LocalRoomStore) ListParticipants(roomName string) ([]*livekit.ParticipantInfo, error) {
func (p *LocalRoomStore) ListParticipants(ctx context.Context, roomName string) ([]*livekit.ParticipantInfo, error) {
p.lock.RLock()
defer p.lock.RUnlock()
@@ -137,7 +138,7 @@ func (p *LocalRoomStore) ListParticipants(roomName string) ([]*livekit.Participa
return items, nil
}
func (p *LocalRoomStore) DeleteParticipant(roomName, identity string) error {
func (p *LocalRoomStore) DeleteParticipant(ctx context.Context, roomName, identity string) error {
p.lock.Lock()
defer p.lock.Unlock()
+11 -11
View File
@@ -38,7 +38,7 @@ func NewRedisRoomStore(rc *redis.Client) *RedisRoomStore {
}
}
func (p *RedisRoomStore) StoreRoom(room *livekit.Room) error {
func (p *RedisRoomStore) StoreRoom(ctx context.Context, room *livekit.Room) error {
if room.CreationTime == 0 {
room.CreationTime = time.Now().Unix()
}
@@ -58,7 +58,7 @@ func (p *RedisRoomStore) StoreRoom(room *livekit.Room) error {
return nil
}
func (p *RedisRoomStore) LoadRoom(idOrName string) (*livekit.Room, error) {
func (p *RedisRoomStore) LoadRoom(ctx context.Context, idOrName string) (*livekit.Room, error) {
// see if matches any ids
name, err := p.rc.HGet(p.ctx, RoomIdMap, idOrName).Result()
if err != nil {
@@ -82,7 +82,7 @@ func (p *RedisRoomStore) LoadRoom(idOrName string) (*livekit.Room, error) {
return &room, nil
}
func (p *RedisRoomStore) ListRooms() ([]*livekit.Room, error) {
func (p *RedisRoomStore) ListRooms(ctx context.Context) ([]*livekit.Room, error) {
items, err := p.rc.HVals(p.ctx, RoomsKey).Result()
if err != nil && err != redis.Nil {
return nil, errors.Wrap(err, "could not get rooms")
@@ -101,8 +101,8 @@ func (p *RedisRoomStore) ListRooms() ([]*livekit.Room, error) {
return rooms, nil
}
func (p *RedisRoomStore) DeleteRoom(idOrName string) error {
room, err := p.LoadRoom(idOrName)
func (p *RedisRoomStore) DeleteRoom(ctx context.Context, idOrName string) error {
room, err := p.LoadRoom(ctx, idOrName)
var sid, name string
if err == ErrRoomNotFound {
@@ -125,7 +125,7 @@ func (p *RedisRoomStore) DeleteRoom(idOrName string) error {
return err
}
func (p *RedisRoomStore) LockRoom(name string, duration time.Duration) (string, error) {
func (p *RedisRoomStore) LockRoom(ctx context.Context, name string, duration time.Duration) (string, error) {
token := utils.NewGuid("LOCK")
key := RoomLockPrefix + name
@@ -150,7 +150,7 @@ func (p *RedisRoomStore) LockRoom(name string, duration time.Duration) (string,
return "", ErrRoomLockFailed
}
func (p *RedisRoomStore) UnlockRoom(name string, uid string) error {
func (p *RedisRoomStore) UnlockRoom(ctx context.Context, name string, uid string) error {
key := RoomLockPrefix + name
val, err := p.rc.Get(p.ctx, key).Result()
@@ -167,7 +167,7 @@ func (p *RedisRoomStore) UnlockRoom(name string, uid string) error {
return p.rc.Del(p.ctx, key).Err()
}
func (p *RedisRoomStore) StoreParticipant(roomName string, participant *livekit.ParticipantInfo) error {
func (p *RedisRoomStore) StoreParticipant(ctx context.Context, roomName string, participant *livekit.ParticipantInfo) error {
key := RoomParticipantsPrefix + roomName
data, err := proto.Marshal(participant)
@@ -178,7 +178,7 @@ func (p *RedisRoomStore) StoreParticipant(roomName string, participant *livekit.
return p.rc.HSet(p.ctx, key, participant.Identity, data).Err()
}
func (p *RedisRoomStore) LoadParticipant(roomName, identity string) (*livekit.ParticipantInfo, error) {
func (p *RedisRoomStore) LoadParticipant(ctx context.Context, roomName, identity string) (*livekit.ParticipantInfo, error) {
key := RoomParticipantsPrefix + roomName
data, err := p.rc.HGet(p.ctx, key, identity).Result()
if err == redis.Nil {
@@ -194,7 +194,7 @@ func (p *RedisRoomStore) LoadParticipant(roomName, identity string) (*livekit.Pa
return &pi, nil
}
func (p *RedisRoomStore) ListParticipants(roomName string) ([]*livekit.ParticipantInfo, error) {
func (p *RedisRoomStore) ListParticipants(ctx context.Context, roomName string) ([]*livekit.ParticipantInfo, error) {
key := RoomParticipantsPrefix + roomName
items, err := p.rc.HVals(p.ctx, key).Result()
if err == redis.Nil {
@@ -214,7 +214,7 @@ func (p *RedisRoomStore) ListParticipants(roomName string) ([]*livekit.Participa
return participants, nil
}
func (p *RedisRoomStore) DeleteParticipant(roomName, identity string) error {
func (p *RedisRoomStore) DeleteParticipant(ctx context.Context, roomName, identity string) error {
key := RoomParticipantsPrefix + roomName
return p.rc.HDel(p.ctx, key, identity).Err()
+20 -17
View File
@@ -1,6 +1,7 @@
package service_test
import (
"context"
"sync"
"sync/atomic"
"testing"
@@ -13,10 +14,11 @@ import (
)
func TestParticipantPersistence(t *testing.T) {
ctx := context.Background()
rs := service.NewRedisRoomStore(redisClient())
roomName := "room1"
rs.DeleteRoom(roomName)
rs.DeleteRoom(ctx, roomName)
p := &livekit.ParticipantInfo{
Sid: "PA_test",
@@ -32,46 +34,47 @@ func TestParticipantPersistence(t *testing.T) {
}
// create the participant
require.NoError(t, rs.StoreParticipant(roomName, p))
require.NoError(t, rs.StoreParticipant(ctx, roomName, p))
// result should match
pGet, err := rs.LoadParticipant(roomName, p.Identity)
pGet, err := rs.LoadParticipant(ctx, roomName, p.Identity)
require.NoError(t, err)
require.Equal(t, p.Identity, pGet.Identity)
require.Equal(t, len(p.Tracks), len(pGet.Tracks))
require.Equal(t, p.Tracks[0].Sid, pGet.Tracks[0].Sid)
// list should return one participant
participants, err := rs.ListParticipants(roomName)
participants, err := rs.ListParticipants(ctx, roomName)
require.NoError(t, err)
require.Len(t, participants, 1)
// deleting participant should return back to normal
require.NoError(t, rs.DeleteParticipant(roomName, p.Identity))
require.NoError(t, rs.DeleteParticipant(ctx, roomName, p.Identity))
participants, err = rs.ListParticipants(roomName)
participants, err = rs.ListParticipants(ctx, roomName)
require.NoError(t, err)
require.Len(t, participants, 0)
// shouldn't be able to get it
_, err = rs.LoadParticipant(roomName, p.Identity)
_, err = rs.LoadParticipant(ctx, roomName, p.Identity)
require.Equal(t, err, service.ErrParticipantNotFound)
}
func TestRoomLock(t *testing.T) {
ctx := context.Background()
rs := service.NewRedisRoomStore(redisClient())
lockInterval := 5 * time.Millisecond
roomName := "myroom"
t.Run("normal locking", func(t *testing.T) {
token, err := rs.LockRoom(roomName, lockInterval)
token, err := rs.LockRoom(ctx, roomName, lockInterval)
require.NoError(t, err)
require.NotEmpty(t, token)
require.NoError(t, rs.UnlockRoom(roomName, token))
require.NoError(t, rs.UnlockRoom(ctx, roomName, token))
})
t.Run("waits before acquiring lock", func(t *testing.T) {
token, err := rs.LockRoom(roomName, lockInterval)
token, err := rs.LockRoom(ctx, roomName, lockInterval)
require.NoError(t, err)
require.NotEmpty(t, token)
unlocked := uint32(0)
@@ -81,28 +84,28 @@ func TestRoomLock(t *testing.T) {
go func() {
// attempt to lock again
defer wg.Done()
token2, err := rs.LockRoom(roomName, lockInterval)
token2, err := rs.LockRoom(ctx, roomName, lockInterval)
require.NoError(t, err)
defer rs.UnlockRoom(roomName, token2)
defer rs.UnlockRoom(ctx, roomName, token2)
require.Equal(t, uint32(1), atomic.LoadUint32(&unlocked))
}()
// release after 2 ms
time.Sleep(2 * time.Millisecond)
atomic.StoreUint32(&unlocked, 1)
rs.UnlockRoom(roomName, token)
rs.UnlockRoom(ctx, roomName, token)
wg.Wait()
})
t.Run("lock expires", func(t *testing.T) {
token, err := rs.LockRoom(roomName, lockInterval)
token, err := rs.LockRoom(ctx, roomName, lockInterval)
require.NoError(t, err)
defer rs.UnlockRoom(roomName, token)
defer rs.UnlockRoom(ctx, roomName, token)
time.Sleep(lockInterval + time.Millisecond)
token2, err := rs.LockRoom(roomName, lockInterval)
token2, err := rs.LockRoom(ctx, roomName, lockInterval)
require.NoError(t, err)
rs.UnlockRoom(roomName, token2)
rs.UnlockRoom(ctx, roomName, token2)
})
}
+23 -21
View File
@@ -1,6 +1,7 @@
package service
import (
"context"
"fmt"
"sync"
"time"
@@ -66,17 +67,17 @@ func NewLocalRoomManager(rp RoomStore, router routing.Router, currentNode routin
// CreateRoom creates a new room from a request and allocates it to a node to handle
// it'll also monitor fits state, and cleans it up when appropriate
func (r *LocalRoomManager) CreateRoom(req *livekit.CreateRoomRequest) (*livekit.Room, error) {
token, err := r.LockRoom(req.Name, 5*time.Second)
func (r *LocalRoomManager) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, error) {
token, err := r.LockRoom(ctx, req.Name, 5*time.Second)
if err != nil {
return nil, err
}
defer func() {
_ = r.UnlockRoom(req.Name, token)
_ = r.UnlockRoom(ctx, req.Name, token)
}()
// find existing room and update it
rm, err := r.LoadRoom(req.Name)
rm, err := r.LoadRoom(ctx, req.Name)
if err == ErrRoomNotFound {
rm = &livekit.Room{
Sid: utils.NewGuid(utils.RoomPrefix),
@@ -95,12 +96,12 @@ func (r *LocalRoomManager) CreateRoom(req *livekit.CreateRoomRequest) (*livekit.
if req.MaxParticipants > 0 {
rm.MaxParticipants = req.MaxParticipants
}
if err := r.StoreRoom(rm); err != nil {
if err := r.StoreRoom(ctx, rm); err != nil {
return nil, err
}
// Is that node still available?
node, err := r.router.GetNodeForRoom(rm.Name)
node, err := r.router.GetNodeForRoom(ctx, rm.Name)
if err != routing.ErrNotFound && err != nil {
return nil, err
}
@@ -127,21 +128,21 @@ func (r *LocalRoomManager) CreateRoom(req *livekit.CreateRoomRequest) (*livekit.
}
logger.Debugw("selected node for room", "room", rm.Name, "roomID", rm.Sid, "nodeID", nodeId)
if err := r.router.SetNodeForRoom(req.Name, nodeId); err != nil {
if err := r.router.SetNodeForRoom(ctx, req.Name, nodeId); err != nil {
return nil, err
}
return rm, nil
}
func (r *LocalRoomManager) GetRoom(roomName string) *rtc.Room {
func (r *LocalRoomManager) GetRoom(ctx context.Context, roomName string) *rtc.Room {
r.lock.RLock()
defer r.lock.RUnlock()
return r.rooms[roomName]
}
// DeleteRoom completely deletes all room information, including active sessions, room store, and routing info
func (r *LocalRoomManager) DeleteRoom(roomName string) error {
func (r *LocalRoomManager) DeleteRoom(ctx context.Context, roomName string) error {
logger.Infow("deleting room state", "room", roomName)
r.lock.Lock()
delete(r.rooms, roomName)
@@ -153,12 +154,12 @@ func (r *LocalRoomManager) DeleteRoom(roomName string) error {
// clear routing information
go func() {
defer wg.Done()
err = r.router.ClearRoomState(roomName)
err = r.router.ClearRoomState(ctx, roomName)
}()
// also delete room from db
go func() {
defer wg.Done()
err2 = r.RoomStore.DeleteRoom(roomName)
err2 = r.RoomStore.DeleteRoom(ctx, roomName)
}()
wg.Wait()
@@ -172,7 +173,8 @@ func (r *LocalRoomManager) DeleteRoom(roomName string) error {
// CleanupRooms cleans up after old rooms that have been around for awhile
func (r *LocalRoomManager) CleanupRooms() error {
// cleanup rooms that have been left for over a day
rooms, err := r.ListRooms()
ctx := context.Background()
rooms, err := r.ListRooms(ctx)
if err != nil {
return err
}
@@ -180,7 +182,7 @@ func (r *LocalRoomManager) CleanupRooms() error {
now := time.Now().Unix()
for _, room := range rooms {
if (now - room.CreationTime) > roomPurgeSeconds {
if err := r.DeleteRoom(room.Name); err != nil {
if err := r.DeleteRoom(ctx, room.Name); err != nil {
return err
}
}
@@ -228,8 +230,8 @@ func (r *LocalRoomManager) Stop() {
}
// StartSession starts WebRTC session when a new participant is connected, takes place on RTC node
func (r *LocalRoomManager) StartSession(roomName string, pi routing.ParticipantInit, requestSource routing.MessageSource, responseSink routing.MessageSink) {
room, err := r.getOrCreateRoom(roomName)
func (r *LocalRoomManager) StartSession(ctx context.Context, roomName string, pi routing.ParticipantInit, requestSource routing.MessageSource, responseSink routing.MessageSink) {
room, err := r.getOrCreateRoom(ctx, roomName)
if err != nil {
logger.Errorw("could not create room", err, "room", roomName)
return
@@ -331,7 +333,7 @@ func (r *LocalRoomManager) StartSession(roomName string, pi routing.ParticipantI
}
// create the actual room object, to be used on RTC node
func (r *LocalRoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) {
func (r *LocalRoomManager) getOrCreateRoom(ctx context.Context, roomName string) (*rtc.Room, error) {
r.lock.RLock()
room := r.rooms[roomName]
r.lock.RUnlock()
@@ -341,7 +343,7 @@ func (r *LocalRoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) {
}
// create new room, get details first
ri, err := r.LoadRoom(roomName)
ri, err := r.LoadRoom(ctx, roomName)
if err != nil {
return nil, err
}
@@ -349,7 +351,7 @@ func (r *LocalRoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) {
// construct ice servers
room = rtc.NewRoom(ri, *r.rtcConfig, r.iceServersForRoom(ri), &r.config.Audio)
room.OnClose(func() {
if err := r.DeleteRoom(roomName); err != nil {
if err := r.DeleteRoom(ctx, roomName); err != nil {
logger.Errorw("could not delete room", err)
}
@@ -367,9 +369,9 @@ func (r *LocalRoomManager) getOrCreateRoom(roomName string) (*rtc.Room, error) {
room.OnParticipantChanged(func(p types.Participant) {
var err error
if p.State() == livekit.ParticipantInfo_DISCONNECTED {
err = r.DeleteParticipant(roomName, p.Identity())
err = r.DeleteParticipant(ctx, roomName, p.Identity())
} else {
err = r.StoreParticipant(roomName, p.ToProto())
err = r.StoreParticipant(ctx, roomName, p.ToProto())
}
if err != nil {
logger.Errorw("could not handle participant change", err)
@@ -488,7 +490,7 @@ func (r *LocalRoomManager) rtcSessionWorker(room *rtc.Room, participant types.Pa
}
}
func (r *LocalRoomManager) handleRTCMessage(roomName, identity string, msg *livekit.RTCNodeMessage) {
func (r *LocalRoomManager) handleRTCMessage(ctx context.Context, roomName, identity string, msg *livekit.RTCNodeMessage) {
r.lock.RLock()
room := r.rooms[roomName]
r.lock.RUnlock()
+2 -1
View File
@@ -1,6 +1,7 @@
package service_test
import (
"context"
"testing"
livekit "github.com/livekit/protocol/proto"
@@ -17,7 +18,7 @@ func TestCreateRoom(t *testing.T) {
manager, conf := newTestRoomManager(t)
t.Run("ensure default room settings are applied", func(t *testing.T) {
room, err := manager.CreateRoom(&livekit.CreateRoomRequest{Name: "myroom"})
room, err := manager.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: "myroom"})
require.NoError(t, err)
require.Equal(t, conf.Room.EmptyTimeout, room.EmptyTimeout)
require.NotEmpty(t, room.EnabledCodecs)
+11 -11
View File
@@ -30,7 +30,7 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq
return nil, twirpAuthError(err)
}
rm, err = s.roomManager.CreateRoom(req)
rm, err = s.roomManager.CreateRoom(ctx, req)
if err != nil {
err = errors.Wrap(err, "could not create room")
}
@@ -44,7 +44,7 @@ func (s *RoomService) ListRooms(ctx context.Context, req *livekit.ListRoomsReque
return nil, twirpAuthError(err)
}
rooms, err := s.roomManager.ListRooms()
rooms, err := s.roomManager.ListRooms(ctx)
if err != nil {
// TODO: translate error codes to twirp
return
@@ -62,7 +62,7 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq
}
// if the room is currently active, RTC node needs to disconnect clients
// here we are using any user's identity, due to how it works with routing
participants, err := s.roomManager.ListParticipants(req.Room)
participants, err := s.roomManager.ListParticipants(ctx, req.Room)
if err != nil {
return nil, err
}
@@ -78,7 +78,7 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq
}
} else {
// if a room hasn't started, delete locally
if err = s.roomManager.DeleteRoom(req.Room); err != nil {
if err = s.roomManager.DeleteRoom(ctx, req.Room); err != nil {
err = twirp.WrapError(twirp.InternalError("could not delete room"), err)
return nil, err
}
@@ -92,7 +92,7 @@ func (s *RoomService) ListParticipants(ctx context.Context, req *livekit.ListPar
return nil, twirpAuthError(err)
}
participants, err := s.roomManager.ListParticipants(req.Room)
participants, err := s.roomManager.ListParticipants(ctx, req.Room)
if err != nil {
return
}
@@ -108,7 +108,7 @@ func (s *RoomService) GetParticipant(ctx context.Context, req *livekit.RoomParti
return nil, twirpAuthError(err)
}
participant, err := s.roomManager.LoadParticipant(req.Room, req.Identity)
participant, err := s.roomManager.LoadParticipant(ctx, req.Room, req.Identity)
if err != nil {
return
}
@@ -136,7 +136,7 @@ func (s *RoomService) MutePublishedTrack(ctx context.Context, req *livekit.MuteR
return nil, twirpAuthError(err)
}
participant, err := s.roomManager.LoadParticipant(req.Room, req.Identity)
participant, err := s.roomManager.LoadParticipant(ctx, req.Room, req.Identity)
if err != nil {
return nil, err
}
@@ -176,7 +176,7 @@ func (s *RoomService) UpdateParticipant(ctx context.Context, req *livekit.Update
return nil, err
}
participant, err := s.roomManager.LoadParticipant(req.Room, req.Identity)
participant, err := s.roomManager.LoadParticipant(ctx, req.Room, req.Identity)
if err != nil {
return nil, err
}
@@ -200,7 +200,7 @@ func (s *RoomService) UpdateSubscriptions(ctx context.Context, req *livekit.Upda
func (s *RoomService) SendData(ctx context.Context, req *livekit.SendDataRequest) (*livekit.SendDataResponse, error) {
// here we are using any user's identity, due to how it works with routing
participants, err := s.roomManager.ListParticipants(req.Room)
participants, err := s.roomManager.ListParticipants(ctx, req.Room)
if err != nil {
return nil, err
}
@@ -224,12 +224,12 @@ func (s *RoomService) createRTCSink(ctx context.Context, room, identity string)
return nil, twirpAuthError(err)
}
_, err := s.roomManager.LoadParticipant(room, identity)
_, err := s.roomManager.LoadParticipant(ctx, room, identity)
if err != nil {
return nil, err
}
return s.router.CreateRTCSink(room, identity)
return s.router.CreateRTCSink(ctx, room, identity)
}
func (s *RoomService) writeMessage(ctx context.Context, room, identity string, msg *livekit.RTCNodeMessage) error {
+3 -2
View File
@@ -1,6 +1,7 @@
package service
import (
"context"
"fmt"
"io"
"net/http"
@@ -108,14 +109,14 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
// create room if it doesn't exist, also assigns an RTC node for the room
rm, err := s.roomManager.CreateRoom(&livekit.CreateRoomRequest{Name: roomName})
rm, err := s.roomManager.CreateRoom(context.Background(), &livekit.CreateRoomRequest{Name: roomName})
if err != nil {
handleError(w, http.StatusInternalServerError, err.Error())
return
}
// this needs to be started first *before* using router functions on this node
connId, reqSink, resSource, err := s.router.StartParticipantSignal(roomName, pi)
connId, reqSink, resSource, err := s.router.StartParticipantSignal(context.Background(), roomName, pi)
if err != nil {
handleError(w, http.StatusInternalServerError, "could not start session: "+err.Error())
return
+128 -100
View File
@@ -2,6 +2,7 @@
package servicefakes
import (
"context"
"sync"
"time"
@@ -10,11 +11,12 @@ import (
)
type FakeRoomStore struct {
DeleteParticipantStub func(string, string) error
DeleteParticipantStub func(context.Context, string, string) error
deleteParticipantMutex sync.RWMutex
deleteParticipantArgsForCall []struct {
arg1 string
arg1 context.Context
arg2 string
arg3 string
}
deleteParticipantReturns struct {
result1 error
@@ -22,10 +24,11 @@ type FakeRoomStore struct {
deleteParticipantReturnsOnCall map[int]struct {
result1 error
}
DeleteRoomStub func(string) error
DeleteRoomStub func(context.Context, string) error
deleteRoomMutex sync.RWMutex
deleteRoomArgsForCall []struct {
arg1 string
arg1 context.Context
arg2 string
}
deleteRoomReturns struct {
result1 error
@@ -33,10 +36,11 @@ type FakeRoomStore struct {
deleteRoomReturnsOnCall map[int]struct {
result1 error
}
ListParticipantsStub func(string) ([]*livekit.ParticipantInfo, error)
ListParticipantsStub func(context.Context, string) ([]*livekit.ParticipantInfo, error)
listParticipantsMutex sync.RWMutex
listParticipantsArgsForCall []struct {
arg1 string
arg1 context.Context
arg2 string
}
listParticipantsReturns struct {
result1 []*livekit.ParticipantInfo
@@ -46,9 +50,10 @@ type FakeRoomStore struct {
result1 []*livekit.ParticipantInfo
result2 error
}
ListRoomsStub func() ([]*livekit.Room, error)
ListRoomsStub func(context.Context) ([]*livekit.Room, error)
listRoomsMutex sync.RWMutex
listRoomsArgsForCall []struct {
arg1 context.Context
}
listRoomsReturns struct {
result1 []*livekit.Room
@@ -58,11 +63,12 @@ type FakeRoomStore struct {
result1 []*livekit.Room
result2 error
}
LoadParticipantStub func(string, string) (*livekit.ParticipantInfo, error)
LoadParticipantStub func(context.Context, string, string) (*livekit.ParticipantInfo, error)
loadParticipantMutex sync.RWMutex
loadParticipantArgsForCall []struct {
arg1 string
arg1 context.Context
arg2 string
arg3 string
}
loadParticipantReturns struct {
result1 *livekit.ParticipantInfo
@@ -72,10 +78,11 @@ type FakeRoomStore struct {
result1 *livekit.ParticipantInfo
result2 error
}
LoadRoomStub func(string) (*livekit.Room, error)
LoadRoomStub func(context.Context, string) (*livekit.Room, error)
loadRoomMutex sync.RWMutex
loadRoomArgsForCall []struct {
arg1 string
arg1 context.Context
arg2 string
}
loadRoomReturns struct {
result1 *livekit.Room
@@ -85,11 +92,12 @@ type FakeRoomStore struct {
result1 *livekit.Room
result2 error
}
LockRoomStub func(string, time.Duration) (string, error)
LockRoomStub func(context.Context, string, time.Duration) (string, error)
lockRoomMutex sync.RWMutex
lockRoomArgsForCall []struct {
arg1 string
arg2 time.Duration
arg1 context.Context
arg2 string
arg3 time.Duration
}
lockRoomReturns struct {
result1 string
@@ -99,11 +107,12 @@ type FakeRoomStore struct {
result1 string
result2 error
}
StoreParticipantStub func(string, *livekit.ParticipantInfo) error
StoreParticipantStub func(context.Context, string, *livekit.ParticipantInfo) error
storeParticipantMutex sync.RWMutex
storeParticipantArgsForCall []struct {
arg1 string
arg2 *livekit.ParticipantInfo
arg1 context.Context
arg2 string
arg3 *livekit.ParticipantInfo
}
storeParticipantReturns struct {
result1 error
@@ -111,10 +120,11 @@ type FakeRoomStore struct {
storeParticipantReturnsOnCall map[int]struct {
result1 error
}
StoreRoomStub func(*livekit.Room) error
StoreRoomStub func(context.Context, *livekit.Room) error
storeRoomMutex sync.RWMutex
storeRoomArgsForCall []struct {
arg1 *livekit.Room
arg1 context.Context
arg2 *livekit.Room
}
storeRoomReturns struct {
result1 error
@@ -122,11 +132,12 @@ type FakeRoomStore struct {
storeRoomReturnsOnCall map[int]struct {
result1 error
}
UnlockRoomStub func(string, string) error
UnlockRoomStub func(context.Context, string, string) error
unlockRoomMutex sync.RWMutex
unlockRoomArgsForCall []struct {
arg1 string
arg1 context.Context
arg2 string
arg3 string
}
unlockRoomReturns struct {
result1 error
@@ -138,19 +149,20 @@ type FakeRoomStore struct {
invocationsMutex sync.RWMutex
}
func (fake *FakeRoomStore) DeleteParticipant(arg1 string, arg2 string) error {
func (fake *FakeRoomStore) DeleteParticipant(arg1 context.Context, arg2 string, arg3 string) error {
fake.deleteParticipantMutex.Lock()
ret, specificReturn := fake.deleteParticipantReturnsOnCall[len(fake.deleteParticipantArgsForCall)]
fake.deleteParticipantArgsForCall = append(fake.deleteParticipantArgsForCall, struct {
arg1 string
arg1 context.Context
arg2 string
}{arg1, arg2})
arg3 string
}{arg1, arg2, arg3})
stub := fake.DeleteParticipantStub
fakeReturns := fake.deleteParticipantReturns
fake.recordInvocation("DeleteParticipant", []interface{}{arg1, arg2})
fake.recordInvocation("DeleteParticipant", []interface{}{arg1, arg2, arg3})
fake.deleteParticipantMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
return stub(arg1, arg2, arg3)
}
if specificReturn {
return ret.result1
@@ -164,17 +176,17 @@ func (fake *FakeRoomStore) DeleteParticipantCallCount() int {
return len(fake.deleteParticipantArgsForCall)
}
func (fake *FakeRoomStore) DeleteParticipantCalls(stub func(string, string) error) {
func (fake *FakeRoomStore) DeleteParticipantCalls(stub func(context.Context, string, string) error) {
fake.deleteParticipantMutex.Lock()
defer fake.deleteParticipantMutex.Unlock()
fake.DeleteParticipantStub = stub
}
func (fake *FakeRoomStore) DeleteParticipantArgsForCall(i int) (string, string) {
func (fake *FakeRoomStore) DeleteParticipantArgsForCall(i int) (context.Context, string, string) {
fake.deleteParticipantMutex.RLock()
defer fake.deleteParticipantMutex.RUnlock()
argsForCall := fake.deleteParticipantArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeRoomStore) DeleteParticipantReturns(result1 error) {
@@ -200,18 +212,19 @@ func (fake *FakeRoomStore) DeleteParticipantReturnsOnCall(i int, result1 error)
}{result1}
}
func (fake *FakeRoomStore) DeleteRoom(arg1 string) error {
func (fake *FakeRoomStore) DeleteRoom(arg1 context.Context, arg2 string) error {
fake.deleteRoomMutex.Lock()
ret, specificReturn := fake.deleteRoomReturnsOnCall[len(fake.deleteRoomArgsForCall)]
fake.deleteRoomArgsForCall = append(fake.deleteRoomArgsForCall, struct {
arg1 string
}{arg1})
arg1 context.Context
arg2 string
}{arg1, arg2})
stub := fake.DeleteRoomStub
fakeReturns := fake.deleteRoomReturns
fake.recordInvocation("DeleteRoom", []interface{}{arg1})
fake.recordInvocation("DeleteRoom", []interface{}{arg1, arg2})
fake.deleteRoomMutex.Unlock()
if stub != nil {
return stub(arg1)
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1
@@ -225,17 +238,17 @@ func (fake *FakeRoomStore) DeleteRoomCallCount() int {
return len(fake.deleteRoomArgsForCall)
}
func (fake *FakeRoomStore) DeleteRoomCalls(stub func(string) error) {
func (fake *FakeRoomStore) DeleteRoomCalls(stub func(context.Context, string) error) {
fake.deleteRoomMutex.Lock()
defer fake.deleteRoomMutex.Unlock()
fake.DeleteRoomStub = stub
}
func (fake *FakeRoomStore) DeleteRoomArgsForCall(i int) string {
func (fake *FakeRoomStore) DeleteRoomArgsForCall(i int) (context.Context, string) {
fake.deleteRoomMutex.RLock()
defer fake.deleteRoomMutex.RUnlock()
argsForCall := fake.deleteRoomArgsForCall[i]
return argsForCall.arg1
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeRoomStore) DeleteRoomReturns(result1 error) {
@@ -261,18 +274,19 @@ func (fake *FakeRoomStore) DeleteRoomReturnsOnCall(i int, result1 error) {
}{result1}
}
func (fake *FakeRoomStore) ListParticipants(arg1 string) ([]*livekit.ParticipantInfo, error) {
func (fake *FakeRoomStore) ListParticipants(arg1 context.Context, arg2 string) ([]*livekit.ParticipantInfo, error) {
fake.listParticipantsMutex.Lock()
ret, specificReturn := fake.listParticipantsReturnsOnCall[len(fake.listParticipantsArgsForCall)]
fake.listParticipantsArgsForCall = append(fake.listParticipantsArgsForCall, struct {
arg1 string
}{arg1})
arg1 context.Context
arg2 string
}{arg1, arg2})
stub := fake.ListParticipantsStub
fakeReturns := fake.listParticipantsReturns
fake.recordInvocation("ListParticipants", []interface{}{arg1})
fake.recordInvocation("ListParticipants", []interface{}{arg1, arg2})
fake.listParticipantsMutex.Unlock()
if stub != nil {
return stub(arg1)
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1, ret.result2
@@ -286,17 +300,17 @@ func (fake *FakeRoomStore) ListParticipantsCallCount() int {
return len(fake.listParticipantsArgsForCall)
}
func (fake *FakeRoomStore) ListParticipantsCalls(stub func(string) ([]*livekit.ParticipantInfo, error)) {
func (fake *FakeRoomStore) ListParticipantsCalls(stub func(context.Context, string) ([]*livekit.ParticipantInfo, error)) {
fake.listParticipantsMutex.Lock()
defer fake.listParticipantsMutex.Unlock()
fake.ListParticipantsStub = stub
}
func (fake *FakeRoomStore) ListParticipantsArgsForCall(i int) string {
func (fake *FakeRoomStore) ListParticipantsArgsForCall(i int) (context.Context, string) {
fake.listParticipantsMutex.RLock()
defer fake.listParticipantsMutex.RUnlock()
argsForCall := fake.listParticipantsArgsForCall[i]
return argsForCall.arg1
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeRoomStore) ListParticipantsReturns(result1 []*livekit.ParticipantInfo, result2 error) {
@@ -325,17 +339,18 @@ func (fake *FakeRoomStore) ListParticipantsReturnsOnCall(i int, result1 []*livek
}{result1, result2}
}
func (fake *FakeRoomStore) ListRooms() ([]*livekit.Room, error) {
func (fake *FakeRoomStore) ListRooms(arg1 context.Context) ([]*livekit.Room, error) {
fake.listRoomsMutex.Lock()
ret, specificReturn := fake.listRoomsReturnsOnCall[len(fake.listRoomsArgsForCall)]
fake.listRoomsArgsForCall = append(fake.listRoomsArgsForCall, struct {
}{})
arg1 context.Context
}{arg1})
stub := fake.ListRoomsStub
fakeReturns := fake.listRoomsReturns
fake.recordInvocation("ListRooms", []interface{}{})
fake.recordInvocation("ListRooms", []interface{}{arg1})
fake.listRoomsMutex.Unlock()
if stub != nil {
return stub()
return stub(arg1)
}
if specificReturn {
return ret.result1, ret.result2
@@ -349,12 +364,19 @@ func (fake *FakeRoomStore) ListRoomsCallCount() int {
return len(fake.listRoomsArgsForCall)
}
func (fake *FakeRoomStore) ListRoomsCalls(stub func() ([]*livekit.Room, error)) {
func (fake *FakeRoomStore) ListRoomsCalls(stub func(context.Context) ([]*livekit.Room, error)) {
fake.listRoomsMutex.Lock()
defer fake.listRoomsMutex.Unlock()
fake.ListRoomsStub = stub
}
func (fake *FakeRoomStore) ListRoomsArgsForCall(i int) context.Context {
fake.listRoomsMutex.RLock()
defer fake.listRoomsMutex.RUnlock()
argsForCall := fake.listRoomsArgsForCall[i]
return argsForCall.arg1
}
func (fake *FakeRoomStore) ListRoomsReturns(result1 []*livekit.Room, result2 error) {
fake.listRoomsMutex.Lock()
defer fake.listRoomsMutex.Unlock()
@@ -381,19 +403,20 @@ func (fake *FakeRoomStore) ListRoomsReturnsOnCall(i int, result1 []*livekit.Room
}{result1, result2}
}
func (fake *FakeRoomStore) LoadParticipant(arg1 string, arg2 string) (*livekit.ParticipantInfo, error) {
func (fake *FakeRoomStore) LoadParticipant(arg1 context.Context, arg2 string, arg3 string) (*livekit.ParticipantInfo, error) {
fake.loadParticipantMutex.Lock()
ret, specificReturn := fake.loadParticipantReturnsOnCall[len(fake.loadParticipantArgsForCall)]
fake.loadParticipantArgsForCall = append(fake.loadParticipantArgsForCall, struct {
arg1 string
arg1 context.Context
arg2 string
}{arg1, arg2})
arg3 string
}{arg1, arg2, arg3})
stub := fake.LoadParticipantStub
fakeReturns := fake.loadParticipantReturns
fake.recordInvocation("LoadParticipant", []interface{}{arg1, arg2})
fake.recordInvocation("LoadParticipant", []interface{}{arg1, arg2, arg3})
fake.loadParticipantMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
return stub(arg1, arg2, arg3)
}
if specificReturn {
return ret.result1, ret.result2
@@ -407,17 +430,17 @@ func (fake *FakeRoomStore) LoadParticipantCallCount() int {
return len(fake.loadParticipantArgsForCall)
}
func (fake *FakeRoomStore) LoadParticipantCalls(stub func(string, string) (*livekit.ParticipantInfo, error)) {
func (fake *FakeRoomStore) LoadParticipantCalls(stub func(context.Context, string, string) (*livekit.ParticipantInfo, error)) {
fake.loadParticipantMutex.Lock()
defer fake.loadParticipantMutex.Unlock()
fake.LoadParticipantStub = stub
}
func (fake *FakeRoomStore) LoadParticipantArgsForCall(i int) (string, string) {
func (fake *FakeRoomStore) LoadParticipantArgsForCall(i int) (context.Context, string, string) {
fake.loadParticipantMutex.RLock()
defer fake.loadParticipantMutex.RUnlock()
argsForCall := fake.loadParticipantArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeRoomStore) LoadParticipantReturns(result1 *livekit.ParticipantInfo, result2 error) {
@@ -446,18 +469,19 @@ func (fake *FakeRoomStore) LoadParticipantReturnsOnCall(i int, result1 *livekit.
}{result1, result2}
}
func (fake *FakeRoomStore) LoadRoom(arg1 string) (*livekit.Room, error) {
func (fake *FakeRoomStore) LoadRoom(arg1 context.Context, arg2 string) (*livekit.Room, error) {
fake.loadRoomMutex.Lock()
ret, specificReturn := fake.loadRoomReturnsOnCall[len(fake.loadRoomArgsForCall)]
fake.loadRoomArgsForCall = append(fake.loadRoomArgsForCall, struct {
arg1 string
}{arg1})
arg1 context.Context
arg2 string
}{arg1, arg2})
stub := fake.LoadRoomStub
fakeReturns := fake.loadRoomReturns
fake.recordInvocation("LoadRoom", []interface{}{arg1})
fake.recordInvocation("LoadRoom", []interface{}{arg1, arg2})
fake.loadRoomMutex.Unlock()
if stub != nil {
return stub(arg1)
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1, ret.result2
@@ -471,17 +495,17 @@ func (fake *FakeRoomStore) LoadRoomCallCount() int {
return len(fake.loadRoomArgsForCall)
}
func (fake *FakeRoomStore) LoadRoomCalls(stub func(string) (*livekit.Room, error)) {
func (fake *FakeRoomStore) LoadRoomCalls(stub func(context.Context, string) (*livekit.Room, error)) {
fake.loadRoomMutex.Lock()
defer fake.loadRoomMutex.Unlock()
fake.LoadRoomStub = stub
}
func (fake *FakeRoomStore) LoadRoomArgsForCall(i int) string {
func (fake *FakeRoomStore) LoadRoomArgsForCall(i int) (context.Context, string) {
fake.loadRoomMutex.RLock()
defer fake.loadRoomMutex.RUnlock()
argsForCall := fake.loadRoomArgsForCall[i]
return argsForCall.arg1
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeRoomStore) LoadRoomReturns(result1 *livekit.Room, result2 error) {
@@ -510,19 +534,20 @@ func (fake *FakeRoomStore) LoadRoomReturnsOnCall(i int, result1 *livekit.Room, r
}{result1, result2}
}
func (fake *FakeRoomStore) LockRoom(arg1 string, arg2 time.Duration) (string, error) {
func (fake *FakeRoomStore) LockRoom(arg1 context.Context, arg2 string, arg3 time.Duration) (string, error) {
fake.lockRoomMutex.Lock()
ret, specificReturn := fake.lockRoomReturnsOnCall[len(fake.lockRoomArgsForCall)]
fake.lockRoomArgsForCall = append(fake.lockRoomArgsForCall, struct {
arg1 string
arg2 time.Duration
}{arg1, arg2})
arg1 context.Context
arg2 string
arg3 time.Duration
}{arg1, arg2, arg3})
stub := fake.LockRoomStub
fakeReturns := fake.lockRoomReturns
fake.recordInvocation("LockRoom", []interface{}{arg1, arg2})
fake.recordInvocation("LockRoom", []interface{}{arg1, arg2, arg3})
fake.lockRoomMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
return stub(arg1, arg2, arg3)
}
if specificReturn {
return ret.result1, ret.result2
@@ -536,17 +561,17 @@ func (fake *FakeRoomStore) LockRoomCallCount() int {
return len(fake.lockRoomArgsForCall)
}
func (fake *FakeRoomStore) LockRoomCalls(stub func(string, time.Duration) (string, error)) {
func (fake *FakeRoomStore) LockRoomCalls(stub func(context.Context, string, time.Duration) (string, error)) {
fake.lockRoomMutex.Lock()
defer fake.lockRoomMutex.Unlock()
fake.LockRoomStub = stub
}
func (fake *FakeRoomStore) LockRoomArgsForCall(i int) (string, time.Duration) {
func (fake *FakeRoomStore) LockRoomArgsForCall(i int) (context.Context, string, time.Duration) {
fake.lockRoomMutex.RLock()
defer fake.lockRoomMutex.RUnlock()
argsForCall := fake.lockRoomArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeRoomStore) LockRoomReturns(result1 string, result2 error) {
@@ -575,19 +600,20 @@ func (fake *FakeRoomStore) LockRoomReturnsOnCall(i int, result1 string, result2
}{result1, result2}
}
func (fake *FakeRoomStore) StoreParticipant(arg1 string, arg2 *livekit.ParticipantInfo) error {
func (fake *FakeRoomStore) StoreParticipant(arg1 context.Context, arg2 string, arg3 *livekit.ParticipantInfo) error {
fake.storeParticipantMutex.Lock()
ret, specificReturn := fake.storeParticipantReturnsOnCall[len(fake.storeParticipantArgsForCall)]
fake.storeParticipantArgsForCall = append(fake.storeParticipantArgsForCall, struct {
arg1 string
arg2 *livekit.ParticipantInfo
}{arg1, arg2})
arg1 context.Context
arg2 string
arg3 *livekit.ParticipantInfo
}{arg1, arg2, arg3})
stub := fake.StoreParticipantStub
fakeReturns := fake.storeParticipantReturns
fake.recordInvocation("StoreParticipant", []interface{}{arg1, arg2})
fake.recordInvocation("StoreParticipant", []interface{}{arg1, arg2, arg3})
fake.storeParticipantMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
return stub(arg1, arg2, arg3)
}
if specificReturn {
return ret.result1
@@ -601,17 +627,17 @@ func (fake *FakeRoomStore) StoreParticipantCallCount() int {
return len(fake.storeParticipantArgsForCall)
}
func (fake *FakeRoomStore) StoreParticipantCalls(stub func(string, *livekit.ParticipantInfo) error) {
func (fake *FakeRoomStore) StoreParticipantCalls(stub func(context.Context, string, *livekit.ParticipantInfo) error) {
fake.storeParticipantMutex.Lock()
defer fake.storeParticipantMutex.Unlock()
fake.StoreParticipantStub = stub
}
func (fake *FakeRoomStore) StoreParticipantArgsForCall(i int) (string, *livekit.ParticipantInfo) {
func (fake *FakeRoomStore) StoreParticipantArgsForCall(i int) (context.Context, string, *livekit.ParticipantInfo) {
fake.storeParticipantMutex.RLock()
defer fake.storeParticipantMutex.RUnlock()
argsForCall := fake.storeParticipantArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeRoomStore) StoreParticipantReturns(result1 error) {
@@ -637,18 +663,19 @@ func (fake *FakeRoomStore) StoreParticipantReturnsOnCall(i int, result1 error) {
}{result1}
}
func (fake *FakeRoomStore) StoreRoom(arg1 *livekit.Room) error {
func (fake *FakeRoomStore) StoreRoom(arg1 context.Context, arg2 *livekit.Room) error {
fake.storeRoomMutex.Lock()
ret, specificReturn := fake.storeRoomReturnsOnCall[len(fake.storeRoomArgsForCall)]
fake.storeRoomArgsForCall = append(fake.storeRoomArgsForCall, struct {
arg1 *livekit.Room
}{arg1})
arg1 context.Context
arg2 *livekit.Room
}{arg1, arg2})
stub := fake.StoreRoomStub
fakeReturns := fake.storeRoomReturns
fake.recordInvocation("StoreRoom", []interface{}{arg1})
fake.recordInvocation("StoreRoom", []interface{}{arg1, arg2})
fake.storeRoomMutex.Unlock()
if stub != nil {
return stub(arg1)
return stub(arg1, arg2)
}
if specificReturn {
return ret.result1
@@ -662,17 +689,17 @@ func (fake *FakeRoomStore) StoreRoomCallCount() int {
return len(fake.storeRoomArgsForCall)
}
func (fake *FakeRoomStore) StoreRoomCalls(stub func(*livekit.Room) error) {
func (fake *FakeRoomStore) StoreRoomCalls(stub func(context.Context, *livekit.Room) error) {
fake.storeRoomMutex.Lock()
defer fake.storeRoomMutex.Unlock()
fake.StoreRoomStub = stub
}
func (fake *FakeRoomStore) StoreRoomArgsForCall(i int) *livekit.Room {
func (fake *FakeRoomStore) StoreRoomArgsForCall(i int) (context.Context, *livekit.Room) {
fake.storeRoomMutex.RLock()
defer fake.storeRoomMutex.RUnlock()
argsForCall := fake.storeRoomArgsForCall[i]
return argsForCall.arg1
return argsForCall.arg1, argsForCall.arg2
}
func (fake *FakeRoomStore) StoreRoomReturns(result1 error) {
@@ -698,19 +725,20 @@ func (fake *FakeRoomStore) StoreRoomReturnsOnCall(i int, result1 error) {
}{result1}
}
func (fake *FakeRoomStore) UnlockRoom(arg1 string, arg2 string) error {
func (fake *FakeRoomStore) UnlockRoom(arg1 context.Context, arg2 string, arg3 string) error {
fake.unlockRoomMutex.Lock()
ret, specificReturn := fake.unlockRoomReturnsOnCall[len(fake.unlockRoomArgsForCall)]
fake.unlockRoomArgsForCall = append(fake.unlockRoomArgsForCall, struct {
arg1 string
arg1 context.Context
arg2 string
}{arg1, arg2})
arg3 string
}{arg1, arg2, arg3})
stub := fake.UnlockRoomStub
fakeReturns := fake.unlockRoomReturns
fake.recordInvocation("UnlockRoom", []interface{}{arg1, arg2})
fake.recordInvocation("UnlockRoom", []interface{}{arg1, arg2, arg3})
fake.unlockRoomMutex.Unlock()
if stub != nil {
return stub(arg1, arg2)
return stub(arg1, arg2, arg3)
}
if specificReturn {
return ret.result1
@@ -724,17 +752,17 @@ func (fake *FakeRoomStore) UnlockRoomCallCount() int {
return len(fake.unlockRoomArgsForCall)
}
func (fake *FakeRoomStore) UnlockRoomCalls(stub func(string, string) error) {
func (fake *FakeRoomStore) UnlockRoomCalls(stub func(context.Context, string, string) error) {
fake.unlockRoomMutex.Lock()
defer fake.unlockRoomMutex.Unlock()
fake.UnlockRoomStub = stub
}
func (fake *FakeRoomStore) UnlockRoomArgsForCall(i int) (string, string) {
func (fake *FakeRoomStore) UnlockRoomArgsForCall(i int) (context.Context, string, string) {
fake.unlockRoomMutex.RLock()
defer fake.unlockRoomMutex.RUnlock()
argsForCall := fake.unlockRoomArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeRoomStore) UnlockRoomReturns(result1 error) {
+2 -1
View File
@@ -1,6 +1,7 @@
package service
import (
"context"
"crypto/tls"
"net"
"strconv"
@@ -96,7 +97,7 @@ func NewTurnServer(conf *config.Config, roomStore RoomStore, node routing.LocalN
func newTurnAuthHandler(roomStore RoomStore) turn.AuthHandler {
return func(username, realm string, srcAddr net.Addr) (key []byte, ok bool) {
// room id should be the username, create a hashed room id
rm, err := roomStore.LoadRoom(username)
rm, err := roomStore.LoadRoom(context.Background(), username)
if err != nil {
return nil, false
}
+4 -2
View File
@@ -1,13 +1,15 @@
package test
import (
"context"
"strings"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/livekit/livekit-server/pkg/testutils"
testclient "github.com/livekit/livekit-server/test/client"
"github.com/stretchr/testify/require"
)
func TestClientCouldConnect(t *testing.T) {
@@ -100,7 +102,7 @@ func TestSinglePublisher(t *testing.T) {
c3.Stop()
success = testutils.WithTimeout(t, "c3 is cleaned up as a subscriber", func() bool {
room := s.RoomManager().GetRoom(testRoom)
room := s.RoomManager().GetRoom(context.Background(), testRoom)
require.NotNil(t, room)
p := room.GetParticipant("c1")
+1 -1
View File
@@ -73,7 +73,7 @@ func TestWebhooks(t *testing.T) {
ts.ClearEvents()
// room closed
rm := server.RoomManager().GetRoom(testRoom)
rm := server.RoomManager().GetRoom(context.Background(), testRoom)
rm.Close()
testutils.WithTimeout(t, "webhook events room_finished", func() bool {
if ts.GetEvent(webhook.EventRoomFinished) == nil {