diff --git a/go.mod b/go.mod index 08f66b8d7..f0137af2c 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/google/wire v0.5.0 github.com/gorilla/websocket v1.4.2 github.com/hashicorp/golang-lru v0.5.4 - github.com/livekit/protocol v0.13.5-0.20220721030958-86da2252193b + github.com/livekit/protocol v0.13.5-0.20220726184153-ad9c55ddef52 github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a github.com/mackerelio/go-osstat v0.2.1 github.com/magefile/mage v1.13.0 diff --git a/go.sum b/go.sum index 1e4b1179c..376ffa255 100644 --- a/go.sum +++ b/go.sum @@ -237,6 +237,8 @@ github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJV github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= github.com/livekit/protocol v0.13.5-0.20220721030958-86da2252193b h1:7p5M5WoTFDyIvxcB+r2aqMYKU8tFYf8MA52vXY15naI= github.com/livekit/protocol v0.13.5-0.20220721030958-86da2252193b/go.mod h1:Qd/Dn4BkJfZQy/IjtEeUOGXARrR7l09WDkg5SY8thkw= +github.com/livekit/protocol v0.13.5-0.20220726184153-ad9c55ddef52 h1:E0trQ3RLu2b9hjSiJG1+1hyK/8v57NPJznA7/lKj0qY= +github.com/livekit/protocol v0.13.5-0.20220726184153-ad9c55ddef52/go.mod h1:Qd/Dn4BkJfZQy/IjtEeUOGXARrR7l09WDkg5SY8thkw= github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a h1:cENjhGfslLSDV07gt8ASy47Wd12Q0kBS7hsdunyQ62I= github.com/livekit/rtcscore-go v0.0.0-20220524203225-dfd1ba40744a/go.mod h1:116ych8UaEs9vfIE8n6iZCZ30iagUFTls0vRmC+Ix5U= github.com/mackerelio/go-osstat v0.2.1 h1:5AeAcBEutEErAOlDz6WCkEvm6AKYgHTUQrfwm5RbeQc= diff --git a/pkg/service/errors.go b/pkg/service/errors.go index 5f1695cb5..a464898a0 100644 --- a/pkg/service/errors.go +++ b/pkg/service/errors.go @@ -6,6 +6,7 @@ var ( ErrEgressNotFound = errors.New("egress does not exist") ErrEgressNotConnected = errors.New("egress not connected (redis required)") ErrIdentityEmpty = errors.New("identity cannot be empty") + ErrIngressNotFound = errors.New("ingress does not exist") ErrMetadataExceedsLimits = errors.New("metadata size exceeds limits") ErrOperationFailed = errors.New("operation cannot be completed") ErrParticipantNotFound = errors.New("participant does not exist") diff --git a/pkg/service/interfaces.go b/pkg/service/interfaces.go index b36e254dc..9beedf7fc 100644 --- a/pkg/service/interfaces.go +++ b/pkg/service/interfaces.go @@ -41,6 +41,13 @@ type ServiceStore interface { ListEgress(ctx context.Context, roomID livekit.RoomID) ([]*livekit.EgressInfo, error) UpdateEgress(ctx context.Context, info *livekit.EgressInfo) error DeleteEgress(ctx context.Context, info *livekit.EgressInfo) error + + StoreIngress(ctx context.Context, info *livekit.IngressInfo) error + LoadIngress(ctx context.Context, ingressID string) (*livekit.IngressInfo, error) + LoadIngressFromStreamKey(ctx context.Context, streamKey string) (*livekit.IngressInfo, error) + ListIngress(ctx context.Context, roomName livekit.RoomName) ([]*livekit.IngressInfo, error) + UpdateIngress(ctx context.Context, info *livekit.IngressInfo) error + DeleteIngress(ctx context.Context, info *livekit.IngressInfo) error } //counterfeiter:generate . RoomAllocator diff --git a/pkg/service/localstore.go b/pkg/service/localstore.go index 9f9e86e21..bb54f0d88 100644 --- a/pkg/service/localstore.go +++ b/pkg/service/localstore.go @@ -169,3 +169,39 @@ func (s *LocalStore) DeleteEgress(_ context.Context, _ *livekit.EgressInfo) erro // redis is required for egress return nil } + +func (s *LocalStore) StoreIngress(_ context.Context, _ *livekit.IngressInfo) error { + // redis is required for ingress + + return nil +} + +func (s *LocalStore) LoadIngress(_ context.Context, _ string) (*livekit.IngressInfo, error) { + // redis is required for ingress + + return nil, nil +} + +func (s *LocalStore) LoadIngressFromStreamKey(_ context.Context, _ string) (*livekit.IngressInfo, error) { + // redis is required for ingress + + return nil, nil +} + +func (s *LocalStore) ListIngress(_ context.Context, _ livekit.RoomName) ([]*livekit.IngressInfo, error) { + // redis is required for ingress + + return nil, nil +} + +func (s *LocalStore) UpdateIngress(_ context.Context, _ *livekit.IngressInfo) error { + // redis is required for ingress + + return nil +} + +func (s *LocalStore) DeleteIngress(_ context.Context, _ *livekit.IngressInfo) error { + // redis is required for ingress + + return nil +} diff --git a/pkg/service/redisstore.go b/pkg/service/redisstore.go index eab293abe..10fd41a8d 100644 --- a/pkg/service/redisstore.go +++ b/pkg/service/redisstore.go @@ -20,11 +20,18 @@ const ( EgressKey = "egress" RoomEgressPrefix = "room_egress:" + // IngressKey is a hash of ingressID => ingress info + IngressKey = "ingress" + StreamKeyKey = "stream_key" + RoomIngressPrefix = "room_ingress:" + // RoomParticipantsPrefix is hash of participant_name => ParticipantInfo RoomParticipantsPrefix = "room_participants:" // RoomLockPrefix is a simple key containing a provided lock uid RoomLockPrefix = "room_lock:" + + maxRetries = 5 ) type RedisStore struct { @@ -324,3 +331,181 @@ func (s *RedisStore) DeleteEgress(_ context.Context, info *livekit.EgressInfo) e return s.rc.HDel(s.ctx, EgressKey, info.EgressId).Err() } + +func (s *RedisStore) StoreIngress(_ context.Context, info *livekit.IngressInfo) error { + if info.IngressId == "" { + return errors.New("Missing IngressId") + } + if info.StreamKey == "" { + return errors.New("Missing StreamKey") + } + + data, err := proto.Marshal(info) + if err != nil { + return err + } + + // Use a "transaction" to remove the old room association if it changed + txf := func(tx *redis.Tx) error { + var oldRoom string + + oldInfo, err := s.loadIngress(tx, info.IngressId) + switch err { + case ErrIngressNotFound: + // Ingress doesn't exist yet + case nil: + oldRoom = oldInfo.RoomName + default: + return err + } + + results, err := tx.TxPipelined(s.ctx, func(pipe redis.Pipeliner) error { + pipe.HSet(s.ctx, IngressKey, info.IngressId, data) + pipe.HSet(s.ctx, StreamKeyKey, info.IngressId, info.StreamKey) + + if oldRoom != info.RoomName { + if oldRoom != "" { + pipe.SRem(s.ctx, RoomIngressPrefix+oldRoom, info.IngressId) + } + if info.RoomName != "" { + pipe.SAdd(s.ctx, RoomIngressPrefix+info.RoomName, info.IngressId) + } + } + + return nil + }) + + if err != nil { + return err + } + + for _, res := range results { + if err := res.Err(); err != nil { + return err + } + } + + return nil + } + + // Retry if the key has been changed. + for i := 0; i < maxRetries; i++ { + err := s.rc.Watch(s.ctx, txf, IngressKey, StreamKeyKey) + if err == nil { + // Success. + return nil + } + if err == redis.TxFailedErr { + // Optimistic lock lost. Retry. + continue + } + // Return any other error. + return err + } + + return nil +} + +func (s *RedisStore) loadIngress(c redis.Cmdable, ingressId string) (*livekit.IngressInfo, error) { + data, err := c.HGet(s.ctx, IngressKey, ingressId).Result() + if err != nil { + if err == redis.Nil { + return nil, ErrIngressNotFound + } + return nil, err + } + + info := &livekit.IngressInfo{} + err = proto.Unmarshal([]byte(data), info) + if err != nil { + return nil, err + } + + return info, nil +} + +func (s *RedisStore) LoadIngress(_ context.Context, ingressId string) (*livekit.IngressInfo, error) { + return s.loadIngress(s.rc, ingressId) +} + +func (s *RedisStore) LoadIngressFromStreamKey(_ context.Context, streamKey string) (*livekit.IngressInfo, error) { + ingressId, err := s.rc.HGet(s.ctx, StreamKeyKey, streamKey).Result() + switch err { + case nil: + case redis.Nil: + return nil, ErrIngressNotFound + default: + return nil, err + } + + return s.loadIngress(s.rc, ingressId) +} + +func (s *RedisStore) ListIngress(_ context.Context, roomName livekit.RoomName) ([]*livekit.IngressInfo, error) { + var infos []*livekit.IngressInfo + + if roomName == "" { + data, err := s.rc.HGetAll(s.ctx, IngressKey).Result() + if err != nil { + if err == redis.Nil { + return nil, nil + } + return nil, err + } + + for _, d := range data { + info := &livekit.IngressInfo{} + err = proto.Unmarshal([]byte(d), info) + if err != nil { + return nil, err + } + infos = append(infos, info) + } + } else { + ids, err := s.rc.SMembers(s.ctx, RoomIngressPrefix+string(roomName)).Result() + if err != nil { + if err == redis.Nil { + return nil, nil + } + return nil, err + } + + data, _ := s.rc.HMGet(s.ctx, IngressKey, ids...).Result() + for _, d := range data { + if d == nil { + continue + } + info := &livekit.IngressInfo{} + err = proto.Unmarshal([]byte(d.(string)), info) + if err != nil { + return nil, err + } + infos = append(infos, info) + } + } + + return infos, nil +} + +func (s *RedisStore) UpdateIngress(ctx context.Context, info *livekit.IngressInfo) error { + return s.StoreIngress(ctx, info) +} + +func (s *RedisStore) DeleteIngress(ctx context.Context, info *livekit.IngressInfo) error { + err := s.rc.SRem(s.ctx, RoomIngressPrefix+info.RoomName, info.IngressId).Err() + if err != nil { + return err + } + + err = s.rc.HDel(s.ctx, StreamKeyKey, info.IngressId).Err() + if err != nil { + return err + } + + err = s.rc.HDel(s.ctx, EgressKey, info.IngressId).Err() + if err != nil { + return err + } + + return nil +} diff --git a/pkg/service/redisstore_test.go b/pkg/service/redisstore_test.go index 1f6eb6868..951b54f02 100644 --- a/pkg/service/redisstore_test.go +++ b/pkg/service/redisstore_test.go @@ -110,3 +110,49 @@ func TestRoomLock(t *testing.T) { _ = rs.UnlockRoom(ctx, roomName, token2) }) } + +func TestStoreIngress(t *testing.T) { + ctx := context.Background() + rs := service.NewRedisStore(redisClient()) + + info := &livekit.IngressInfo{ + IngressId: "ingressId", + StreamKey: "streamKey", + } + + err := rs.StoreIngress(ctx, info) + require.NoError(t, err) + + pulledInfo, err := rs.LoadIngress(ctx, "ingressId") + require.NoError(t, err) + compareIngressInfo(t, pulledInfo, info) + + infos, err := rs.ListIngress(ctx, "room") + require.NoError(t, err) + require.Equal(t, 0, len(infos)) + + info.RoomName = "room" + err = rs.UpdateIngress(ctx, info) + require.NoError(t, err) + + infos, err = rs.ListIngress(ctx, "room") + require.NoError(t, err) + + require.NoError(t, err) + require.Equal(t, 1, len(infos)) + compareIngressInfo(t, infos[0], info) + + info.RoomName = "" + err = rs.UpdateIngress(ctx, info) + require.NoError(t, err) + + infos, err = rs.ListIngress(ctx, "room") + require.NoError(t, err) + require.Equal(t, 0, len(infos)) +} + +func compareIngressInfo(t *testing.T, expected, v *livekit.IngressInfo) { + require.Equal(t, expected.IngressId, v.IngressId) + require.Equal(t, expected.StreamKey, v.StreamKey) + require.Equal(t, expected.RoomName, v.RoomName) +} diff --git a/pkg/service/servicefakes/fake_object_store.go b/pkg/service/servicefakes/fake_object_store.go index 39ff2a54d..5ef01673b 100644 --- a/pkg/service/servicefakes/fake_object_store.go +++ b/pkg/service/servicefakes/fake_object_store.go @@ -23,6 +23,18 @@ type FakeObjectStore struct { deleteEgressReturnsOnCall map[int]struct { result1 error } + DeleteIngressStub func(context.Context, *livekit.IngressInfo) error + deleteIngressMutex sync.RWMutex + deleteIngressArgsForCall []struct { + arg1 context.Context + arg2 *livekit.IngressInfo + } + deleteIngressReturns struct { + result1 error + } + deleteIngressReturnsOnCall map[int]struct { + result1 error + } DeleteParticipantStub func(context.Context, livekit.RoomName, livekit.ParticipantIdentity) error deleteParticipantMutex sync.RWMutex deleteParticipantArgsForCall []struct { @@ -62,6 +74,20 @@ type FakeObjectStore struct { result1 []*livekit.EgressInfo result2 error } + ListIngressStub func(context.Context, livekit.RoomName) ([]*livekit.IngressInfo, error) + listIngressMutex sync.RWMutex + listIngressArgsForCall []struct { + arg1 context.Context + arg2 livekit.RoomName + } + listIngressReturns struct { + result1 []*livekit.IngressInfo + result2 error + } + listIngressReturnsOnCall map[int]struct { + result1 []*livekit.IngressInfo + result2 error + } ListParticipantsStub func(context.Context, livekit.RoomName) ([]*livekit.ParticipantInfo, error) listParticipantsMutex sync.RWMutex listParticipantsArgsForCall []struct { @@ -104,6 +130,34 @@ type FakeObjectStore struct { result1 *livekit.EgressInfo result2 error } + LoadIngressStub func(context.Context, string) (*livekit.IngressInfo, error) + loadIngressMutex sync.RWMutex + loadIngressArgsForCall []struct { + arg1 context.Context + arg2 string + } + loadIngressReturns struct { + result1 *livekit.IngressInfo + result2 error + } + loadIngressReturnsOnCall map[int]struct { + result1 *livekit.IngressInfo + result2 error + } + LoadIngressFromStreamKeyStub func(context.Context, string) (*livekit.IngressInfo, error) + loadIngressFromStreamKeyMutex sync.RWMutex + loadIngressFromStreamKeyArgsForCall []struct { + arg1 context.Context + arg2 string + } + loadIngressFromStreamKeyReturns struct { + result1 *livekit.IngressInfo + result2 error + } + loadIngressFromStreamKeyReturnsOnCall map[int]struct { + result1 *livekit.IngressInfo + result2 error + } LoadParticipantStub func(context.Context, livekit.RoomName, livekit.ParticipantIdentity) (*livekit.ParticipantInfo, error) loadParticipantMutex sync.RWMutex loadParticipantArgsForCall []struct { @@ -160,6 +214,18 @@ type FakeObjectStore struct { storeEgressReturnsOnCall map[int]struct { result1 error } + StoreIngressStub func(context.Context, *livekit.IngressInfo) error + storeIngressMutex sync.RWMutex + storeIngressArgsForCall []struct { + arg1 context.Context + arg2 *livekit.IngressInfo + } + storeIngressReturns struct { + result1 error + } + storeIngressReturnsOnCall map[int]struct { + result1 error + } StoreParticipantStub func(context.Context, livekit.RoomName, *livekit.ParticipantInfo) error storeParticipantMutex sync.RWMutex storeParticipantArgsForCall []struct { @@ -210,6 +276,18 @@ type FakeObjectStore struct { updateEgressReturnsOnCall map[int]struct { result1 error } + UpdateIngressStub func(context.Context, *livekit.IngressInfo) error + updateIngressMutex sync.RWMutex + updateIngressArgsForCall []struct { + arg1 context.Context + arg2 *livekit.IngressInfo + } + updateIngressReturns struct { + result1 error + } + updateIngressReturnsOnCall map[int]struct { + result1 error + } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } @@ -276,6 +354,68 @@ func (fake *FakeObjectStore) DeleteEgressReturnsOnCall(i int, result1 error) { }{result1} } +func (fake *FakeObjectStore) DeleteIngress(arg1 context.Context, arg2 *livekit.IngressInfo) error { + fake.deleteIngressMutex.Lock() + ret, specificReturn := fake.deleteIngressReturnsOnCall[len(fake.deleteIngressArgsForCall)] + fake.deleteIngressArgsForCall = append(fake.deleteIngressArgsForCall, struct { + arg1 context.Context + arg2 *livekit.IngressInfo + }{arg1, arg2}) + stub := fake.DeleteIngressStub + fakeReturns := fake.deleteIngressReturns + fake.recordInvocation("DeleteIngress", []interface{}{arg1, arg2}) + fake.deleteIngressMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeObjectStore) DeleteIngressCallCount() int { + fake.deleteIngressMutex.RLock() + defer fake.deleteIngressMutex.RUnlock() + return len(fake.deleteIngressArgsForCall) +} + +func (fake *FakeObjectStore) DeleteIngressCalls(stub func(context.Context, *livekit.IngressInfo) error) { + fake.deleteIngressMutex.Lock() + defer fake.deleteIngressMutex.Unlock() + fake.DeleteIngressStub = stub +} + +func (fake *FakeObjectStore) DeleteIngressArgsForCall(i int) (context.Context, *livekit.IngressInfo) { + fake.deleteIngressMutex.RLock() + defer fake.deleteIngressMutex.RUnlock() + argsForCall := fake.deleteIngressArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeObjectStore) DeleteIngressReturns(result1 error) { + fake.deleteIngressMutex.Lock() + defer fake.deleteIngressMutex.Unlock() + fake.DeleteIngressStub = nil + fake.deleteIngressReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeObjectStore) DeleteIngressReturnsOnCall(i int, result1 error) { + fake.deleteIngressMutex.Lock() + defer fake.deleteIngressMutex.Unlock() + fake.DeleteIngressStub = nil + if fake.deleteIngressReturnsOnCall == nil { + fake.deleteIngressReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.deleteIngressReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeObjectStore) DeleteParticipant(arg1 context.Context, arg2 livekit.RoomName, arg3 livekit.ParticipantIdentity) error { fake.deleteParticipantMutex.Lock() ret, specificReturn := fake.deleteParticipantReturnsOnCall[len(fake.deleteParticipantArgsForCall)] @@ -466,6 +606,71 @@ func (fake *FakeObjectStore) ListEgressReturnsOnCall(i int, result1 []*livekit.E }{result1, result2} } +func (fake *FakeObjectStore) ListIngress(arg1 context.Context, arg2 livekit.RoomName) ([]*livekit.IngressInfo, error) { + fake.listIngressMutex.Lock() + ret, specificReturn := fake.listIngressReturnsOnCall[len(fake.listIngressArgsForCall)] + fake.listIngressArgsForCall = append(fake.listIngressArgsForCall, struct { + arg1 context.Context + arg2 livekit.RoomName + }{arg1, arg2}) + stub := fake.ListIngressStub + fakeReturns := fake.listIngressReturns + fake.recordInvocation("ListIngress", []interface{}{arg1, arg2}) + fake.listIngressMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeObjectStore) ListIngressCallCount() int { + fake.listIngressMutex.RLock() + defer fake.listIngressMutex.RUnlock() + return len(fake.listIngressArgsForCall) +} + +func (fake *FakeObjectStore) ListIngressCalls(stub func(context.Context, livekit.RoomName) ([]*livekit.IngressInfo, error)) { + fake.listIngressMutex.Lock() + defer fake.listIngressMutex.Unlock() + fake.ListIngressStub = stub +} + +func (fake *FakeObjectStore) ListIngressArgsForCall(i int) (context.Context, livekit.RoomName) { + fake.listIngressMutex.RLock() + defer fake.listIngressMutex.RUnlock() + argsForCall := fake.listIngressArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeObjectStore) ListIngressReturns(result1 []*livekit.IngressInfo, result2 error) { + fake.listIngressMutex.Lock() + defer fake.listIngressMutex.Unlock() + fake.ListIngressStub = nil + fake.listIngressReturns = struct { + result1 []*livekit.IngressInfo + result2 error + }{result1, result2} +} + +func (fake *FakeObjectStore) ListIngressReturnsOnCall(i int, result1 []*livekit.IngressInfo, result2 error) { + fake.listIngressMutex.Lock() + defer fake.listIngressMutex.Unlock() + fake.ListIngressStub = nil + if fake.listIngressReturnsOnCall == nil { + fake.listIngressReturnsOnCall = make(map[int]struct { + result1 []*livekit.IngressInfo + result2 error + }) + } + fake.listIngressReturnsOnCall[i] = struct { + result1 []*livekit.IngressInfo + result2 error + }{result1, result2} +} + func (fake *FakeObjectStore) ListParticipants(arg1 context.Context, arg2 livekit.RoomName) ([]*livekit.ParticipantInfo, error) { fake.listParticipantsMutex.Lock() ret, specificReturn := fake.listParticipantsReturnsOnCall[len(fake.listParticipantsArgsForCall)] @@ -666,6 +871,136 @@ func (fake *FakeObjectStore) LoadEgressReturnsOnCall(i int, result1 *livekit.Egr }{result1, result2} } +func (fake *FakeObjectStore) LoadIngress(arg1 context.Context, arg2 string) (*livekit.IngressInfo, error) { + fake.loadIngressMutex.Lock() + ret, specificReturn := fake.loadIngressReturnsOnCall[len(fake.loadIngressArgsForCall)] + fake.loadIngressArgsForCall = append(fake.loadIngressArgsForCall, struct { + arg1 context.Context + arg2 string + }{arg1, arg2}) + stub := fake.LoadIngressStub + fakeReturns := fake.loadIngressReturns + fake.recordInvocation("LoadIngress", []interface{}{arg1, arg2}) + fake.loadIngressMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeObjectStore) LoadIngressCallCount() int { + fake.loadIngressMutex.RLock() + defer fake.loadIngressMutex.RUnlock() + return len(fake.loadIngressArgsForCall) +} + +func (fake *FakeObjectStore) LoadIngressCalls(stub func(context.Context, string) (*livekit.IngressInfo, error)) { + fake.loadIngressMutex.Lock() + defer fake.loadIngressMutex.Unlock() + fake.LoadIngressStub = stub +} + +func (fake *FakeObjectStore) LoadIngressArgsForCall(i int) (context.Context, string) { + fake.loadIngressMutex.RLock() + defer fake.loadIngressMutex.RUnlock() + argsForCall := fake.loadIngressArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeObjectStore) LoadIngressReturns(result1 *livekit.IngressInfo, result2 error) { + fake.loadIngressMutex.Lock() + defer fake.loadIngressMutex.Unlock() + fake.LoadIngressStub = nil + fake.loadIngressReturns = struct { + result1 *livekit.IngressInfo + result2 error + }{result1, result2} +} + +func (fake *FakeObjectStore) LoadIngressReturnsOnCall(i int, result1 *livekit.IngressInfo, result2 error) { + fake.loadIngressMutex.Lock() + defer fake.loadIngressMutex.Unlock() + fake.LoadIngressStub = nil + if fake.loadIngressReturnsOnCall == nil { + fake.loadIngressReturnsOnCall = make(map[int]struct { + result1 *livekit.IngressInfo + result2 error + }) + } + fake.loadIngressReturnsOnCall[i] = struct { + result1 *livekit.IngressInfo + result2 error + }{result1, result2} +} + +func (fake *FakeObjectStore) LoadIngressFromStreamKey(arg1 context.Context, arg2 string) (*livekit.IngressInfo, error) { + fake.loadIngressFromStreamKeyMutex.Lock() + ret, specificReturn := fake.loadIngressFromStreamKeyReturnsOnCall[len(fake.loadIngressFromStreamKeyArgsForCall)] + fake.loadIngressFromStreamKeyArgsForCall = append(fake.loadIngressFromStreamKeyArgsForCall, struct { + arg1 context.Context + arg2 string + }{arg1, arg2}) + stub := fake.LoadIngressFromStreamKeyStub + fakeReturns := fake.loadIngressFromStreamKeyReturns + fake.recordInvocation("LoadIngressFromStreamKey", []interface{}{arg1, arg2}) + fake.loadIngressFromStreamKeyMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeObjectStore) LoadIngressFromStreamKeyCallCount() int { + fake.loadIngressFromStreamKeyMutex.RLock() + defer fake.loadIngressFromStreamKeyMutex.RUnlock() + return len(fake.loadIngressFromStreamKeyArgsForCall) +} + +func (fake *FakeObjectStore) LoadIngressFromStreamKeyCalls(stub func(context.Context, string) (*livekit.IngressInfo, error)) { + fake.loadIngressFromStreamKeyMutex.Lock() + defer fake.loadIngressFromStreamKeyMutex.Unlock() + fake.LoadIngressFromStreamKeyStub = stub +} + +func (fake *FakeObjectStore) LoadIngressFromStreamKeyArgsForCall(i int) (context.Context, string) { + fake.loadIngressFromStreamKeyMutex.RLock() + defer fake.loadIngressFromStreamKeyMutex.RUnlock() + argsForCall := fake.loadIngressFromStreamKeyArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeObjectStore) LoadIngressFromStreamKeyReturns(result1 *livekit.IngressInfo, result2 error) { + fake.loadIngressFromStreamKeyMutex.Lock() + defer fake.loadIngressFromStreamKeyMutex.Unlock() + fake.LoadIngressFromStreamKeyStub = nil + fake.loadIngressFromStreamKeyReturns = struct { + result1 *livekit.IngressInfo + result2 error + }{result1, result2} +} + +func (fake *FakeObjectStore) LoadIngressFromStreamKeyReturnsOnCall(i int, result1 *livekit.IngressInfo, result2 error) { + fake.loadIngressFromStreamKeyMutex.Lock() + defer fake.loadIngressFromStreamKeyMutex.Unlock() + fake.LoadIngressFromStreamKeyStub = nil + if fake.loadIngressFromStreamKeyReturnsOnCall == nil { + fake.loadIngressFromStreamKeyReturnsOnCall = make(map[int]struct { + result1 *livekit.IngressInfo + result2 error + }) + } + fake.loadIngressFromStreamKeyReturnsOnCall[i] = struct { + result1 *livekit.IngressInfo + result2 error + }{result1, result2} +} + func (fake *FakeObjectStore) LoadParticipant(arg1 context.Context, arg2 livekit.RoomName, arg3 livekit.ParticipantIdentity) (*livekit.ParticipantInfo, error) { fake.loadParticipantMutex.Lock() ret, specificReturn := fake.loadParticipantReturnsOnCall[len(fake.loadParticipantArgsForCall)] @@ -925,6 +1260,68 @@ func (fake *FakeObjectStore) StoreEgressReturnsOnCall(i int, result1 error) { }{result1} } +func (fake *FakeObjectStore) StoreIngress(arg1 context.Context, arg2 *livekit.IngressInfo) error { + fake.storeIngressMutex.Lock() + ret, specificReturn := fake.storeIngressReturnsOnCall[len(fake.storeIngressArgsForCall)] + fake.storeIngressArgsForCall = append(fake.storeIngressArgsForCall, struct { + arg1 context.Context + arg2 *livekit.IngressInfo + }{arg1, arg2}) + stub := fake.StoreIngressStub + fakeReturns := fake.storeIngressReturns + fake.recordInvocation("StoreIngress", []interface{}{arg1, arg2}) + fake.storeIngressMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeObjectStore) StoreIngressCallCount() int { + fake.storeIngressMutex.RLock() + defer fake.storeIngressMutex.RUnlock() + return len(fake.storeIngressArgsForCall) +} + +func (fake *FakeObjectStore) StoreIngressCalls(stub func(context.Context, *livekit.IngressInfo) error) { + fake.storeIngressMutex.Lock() + defer fake.storeIngressMutex.Unlock() + fake.StoreIngressStub = stub +} + +func (fake *FakeObjectStore) StoreIngressArgsForCall(i int) (context.Context, *livekit.IngressInfo) { + fake.storeIngressMutex.RLock() + defer fake.storeIngressMutex.RUnlock() + argsForCall := fake.storeIngressArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeObjectStore) StoreIngressReturns(result1 error) { + fake.storeIngressMutex.Lock() + defer fake.storeIngressMutex.Unlock() + fake.StoreIngressStub = nil + fake.storeIngressReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeObjectStore) StoreIngressReturnsOnCall(i int, result1 error) { + fake.storeIngressMutex.Lock() + defer fake.storeIngressMutex.Unlock() + fake.StoreIngressStub = nil + if fake.storeIngressReturnsOnCall == nil { + fake.storeIngressReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.storeIngressReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeObjectStore) StoreParticipant(arg1 context.Context, arg2 livekit.RoomName, arg3 *livekit.ParticipantInfo) error { fake.storeParticipantMutex.Lock() ret, specificReturn := fake.storeParticipantReturnsOnCall[len(fake.storeParticipantArgsForCall)] @@ -1175,23 +1572,93 @@ func (fake *FakeObjectStore) UpdateEgressReturnsOnCall(i int, result1 error) { }{result1} } +func (fake *FakeObjectStore) UpdateIngress(arg1 context.Context, arg2 *livekit.IngressInfo) error { + fake.updateIngressMutex.Lock() + ret, specificReturn := fake.updateIngressReturnsOnCall[len(fake.updateIngressArgsForCall)] + fake.updateIngressArgsForCall = append(fake.updateIngressArgsForCall, struct { + arg1 context.Context + arg2 *livekit.IngressInfo + }{arg1, arg2}) + stub := fake.UpdateIngressStub + fakeReturns := fake.updateIngressReturns + fake.recordInvocation("UpdateIngress", []interface{}{arg1, arg2}) + fake.updateIngressMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeObjectStore) UpdateIngressCallCount() int { + fake.updateIngressMutex.RLock() + defer fake.updateIngressMutex.RUnlock() + return len(fake.updateIngressArgsForCall) +} + +func (fake *FakeObjectStore) UpdateIngressCalls(stub func(context.Context, *livekit.IngressInfo) error) { + fake.updateIngressMutex.Lock() + defer fake.updateIngressMutex.Unlock() + fake.UpdateIngressStub = stub +} + +func (fake *FakeObjectStore) UpdateIngressArgsForCall(i int) (context.Context, *livekit.IngressInfo) { + fake.updateIngressMutex.RLock() + defer fake.updateIngressMutex.RUnlock() + argsForCall := fake.updateIngressArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeObjectStore) UpdateIngressReturns(result1 error) { + fake.updateIngressMutex.Lock() + defer fake.updateIngressMutex.Unlock() + fake.UpdateIngressStub = nil + fake.updateIngressReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeObjectStore) UpdateIngressReturnsOnCall(i int, result1 error) { + fake.updateIngressMutex.Lock() + defer fake.updateIngressMutex.Unlock() + fake.UpdateIngressStub = nil + if fake.updateIngressReturnsOnCall == nil { + fake.updateIngressReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.updateIngressReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeObjectStore) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() fake.deleteEgressMutex.RLock() defer fake.deleteEgressMutex.RUnlock() + fake.deleteIngressMutex.RLock() + defer fake.deleteIngressMutex.RUnlock() fake.deleteParticipantMutex.RLock() defer fake.deleteParticipantMutex.RUnlock() fake.deleteRoomMutex.RLock() defer fake.deleteRoomMutex.RUnlock() fake.listEgressMutex.RLock() defer fake.listEgressMutex.RUnlock() + fake.listIngressMutex.RLock() + defer fake.listIngressMutex.RUnlock() fake.listParticipantsMutex.RLock() defer fake.listParticipantsMutex.RUnlock() fake.listRoomsMutex.RLock() defer fake.listRoomsMutex.RUnlock() fake.loadEgressMutex.RLock() defer fake.loadEgressMutex.RUnlock() + fake.loadIngressMutex.RLock() + defer fake.loadIngressMutex.RUnlock() + fake.loadIngressFromStreamKeyMutex.RLock() + defer fake.loadIngressFromStreamKeyMutex.RUnlock() fake.loadParticipantMutex.RLock() defer fake.loadParticipantMutex.RUnlock() fake.loadRoomMutex.RLock() @@ -1200,6 +1667,8 @@ func (fake *FakeObjectStore) Invocations() map[string][][]interface{} { defer fake.lockRoomMutex.RUnlock() fake.storeEgressMutex.RLock() defer fake.storeEgressMutex.RUnlock() + fake.storeIngressMutex.RLock() + defer fake.storeIngressMutex.RUnlock() fake.storeParticipantMutex.RLock() defer fake.storeParticipantMutex.RUnlock() fake.storeRoomMutex.RLock() @@ -1208,6 +1677,8 @@ func (fake *FakeObjectStore) Invocations() map[string][][]interface{} { defer fake.unlockRoomMutex.RUnlock() fake.updateEgressMutex.RLock() defer fake.updateEgressMutex.RUnlock() + fake.updateIngressMutex.RLock() + defer fake.updateIngressMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/pkg/service/servicefakes/fake_service_store.go b/pkg/service/servicefakes/fake_service_store.go index 5c0bd4544..0b5c743ab 100644 --- a/pkg/service/servicefakes/fake_service_store.go +++ b/pkg/service/servicefakes/fake_service_store.go @@ -22,6 +22,18 @@ type FakeServiceStore struct { deleteEgressReturnsOnCall map[int]struct { result1 error } + DeleteIngressStub func(context.Context, *livekit.IngressInfo) error + deleteIngressMutex sync.RWMutex + deleteIngressArgsForCall []struct { + arg1 context.Context + arg2 *livekit.IngressInfo + } + deleteIngressReturns struct { + result1 error + } + deleteIngressReturnsOnCall map[int]struct { + result1 error + } ListEgressStub func(context.Context, livekit.RoomID) ([]*livekit.EgressInfo, error) listEgressMutex sync.RWMutex listEgressArgsForCall []struct { @@ -36,6 +48,20 @@ type FakeServiceStore struct { result1 []*livekit.EgressInfo result2 error } + ListIngressStub func(context.Context, livekit.RoomName) ([]*livekit.IngressInfo, error) + listIngressMutex sync.RWMutex + listIngressArgsForCall []struct { + arg1 context.Context + arg2 livekit.RoomName + } + listIngressReturns struct { + result1 []*livekit.IngressInfo + result2 error + } + listIngressReturnsOnCall map[int]struct { + result1 []*livekit.IngressInfo + result2 error + } ListParticipantsStub func(context.Context, livekit.RoomName) ([]*livekit.ParticipantInfo, error) listParticipantsMutex sync.RWMutex listParticipantsArgsForCall []struct { @@ -78,6 +104,34 @@ type FakeServiceStore struct { result1 *livekit.EgressInfo result2 error } + LoadIngressStub func(context.Context, string) (*livekit.IngressInfo, error) + loadIngressMutex sync.RWMutex + loadIngressArgsForCall []struct { + arg1 context.Context + arg2 string + } + loadIngressReturns struct { + result1 *livekit.IngressInfo + result2 error + } + loadIngressReturnsOnCall map[int]struct { + result1 *livekit.IngressInfo + result2 error + } + LoadIngressFromStreamKeyStub func(context.Context, string) (*livekit.IngressInfo, error) + loadIngressFromStreamKeyMutex sync.RWMutex + loadIngressFromStreamKeyArgsForCall []struct { + arg1 context.Context + arg2 string + } + loadIngressFromStreamKeyReturns struct { + result1 *livekit.IngressInfo + result2 error + } + loadIngressFromStreamKeyReturnsOnCall map[int]struct { + result1 *livekit.IngressInfo + result2 error + } LoadParticipantStub func(context.Context, livekit.RoomName, livekit.ParticipantIdentity) (*livekit.ParticipantInfo, error) loadParticipantMutex sync.RWMutex loadParticipantArgsForCall []struct { @@ -119,6 +173,18 @@ type FakeServiceStore struct { storeEgressReturnsOnCall map[int]struct { result1 error } + StoreIngressStub func(context.Context, *livekit.IngressInfo) error + storeIngressMutex sync.RWMutex + storeIngressArgsForCall []struct { + arg1 context.Context + arg2 *livekit.IngressInfo + } + storeIngressReturns struct { + result1 error + } + storeIngressReturnsOnCall map[int]struct { + result1 error + } UpdateEgressStub func(context.Context, *livekit.EgressInfo) error updateEgressMutex sync.RWMutex updateEgressArgsForCall []struct { @@ -131,6 +197,18 @@ type FakeServiceStore struct { updateEgressReturnsOnCall map[int]struct { result1 error } + UpdateIngressStub func(context.Context, *livekit.IngressInfo) error + updateIngressMutex sync.RWMutex + updateIngressArgsForCall []struct { + arg1 context.Context + arg2 *livekit.IngressInfo + } + updateIngressReturns struct { + result1 error + } + updateIngressReturnsOnCall map[int]struct { + result1 error + } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } @@ -197,6 +275,68 @@ func (fake *FakeServiceStore) DeleteEgressReturnsOnCall(i int, result1 error) { }{result1} } +func (fake *FakeServiceStore) DeleteIngress(arg1 context.Context, arg2 *livekit.IngressInfo) error { + fake.deleteIngressMutex.Lock() + ret, specificReturn := fake.deleteIngressReturnsOnCall[len(fake.deleteIngressArgsForCall)] + fake.deleteIngressArgsForCall = append(fake.deleteIngressArgsForCall, struct { + arg1 context.Context + arg2 *livekit.IngressInfo + }{arg1, arg2}) + stub := fake.DeleteIngressStub + fakeReturns := fake.deleteIngressReturns + fake.recordInvocation("DeleteIngress", []interface{}{arg1, arg2}) + fake.deleteIngressMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeServiceStore) DeleteIngressCallCount() int { + fake.deleteIngressMutex.RLock() + defer fake.deleteIngressMutex.RUnlock() + return len(fake.deleteIngressArgsForCall) +} + +func (fake *FakeServiceStore) DeleteIngressCalls(stub func(context.Context, *livekit.IngressInfo) error) { + fake.deleteIngressMutex.Lock() + defer fake.deleteIngressMutex.Unlock() + fake.DeleteIngressStub = stub +} + +func (fake *FakeServiceStore) DeleteIngressArgsForCall(i int) (context.Context, *livekit.IngressInfo) { + fake.deleteIngressMutex.RLock() + defer fake.deleteIngressMutex.RUnlock() + argsForCall := fake.deleteIngressArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeServiceStore) DeleteIngressReturns(result1 error) { + fake.deleteIngressMutex.Lock() + defer fake.deleteIngressMutex.Unlock() + fake.DeleteIngressStub = nil + fake.deleteIngressReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeServiceStore) DeleteIngressReturnsOnCall(i int, result1 error) { + fake.deleteIngressMutex.Lock() + defer fake.deleteIngressMutex.Unlock() + fake.DeleteIngressStub = nil + if fake.deleteIngressReturnsOnCall == nil { + fake.deleteIngressReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.deleteIngressReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeServiceStore) ListEgress(arg1 context.Context, arg2 livekit.RoomID) ([]*livekit.EgressInfo, error) { fake.listEgressMutex.Lock() ret, specificReturn := fake.listEgressReturnsOnCall[len(fake.listEgressArgsForCall)] @@ -262,6 +402,71 @@ func (fake *FakeServiceStore) ListEgressReturnsOnCall(i int, result1 []*livekit. }{result1, result2} } +func (fake *FakeServiceStore) ListIngress(arg1 context.Context, arg2 livekit.RoomName) ([]*livekit.IngressInfo, error) { + fake.listIngressMutex.Lock() + ret, specificReturn := fake.listIngressReturnsOnCall[len(fake.listIngressArgsForCall)] + fake.listIngressArgsForCall = append(fake.listIngressArgsForCall, struct { + arg1 context.Context + arg2 livekit.RoomName + }{arg1, arg2}) + stub := fake.ListIngressStub + fakeReturns := fake.listIngressReturns + fake.recordInvocation("ListIngress", []interface{}{arg1, arg2}) + fake.listIngressMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeServiceStore) ListIngressCallCount() int { + fake.listIngressMutex.RLock() + defer fake.listIngressMutex.RUnlock() + return len(fake.listIngressArgsForCall) +} + +func (fake *FakeServiceStore) ListIngressCalls(stub func(context.Context, livekit.RoomName) ([]*livekit.IngressInfo, error)) { + fake.listIngressMutex.Lock() + defer fake.listIngressMutex.Unlock() + fake.ListIngressStub = stub +} + +func (fake *FakeServiceStore) ListIngressArgsForCall(i int) (context.Context, livekit.RoomName) { + fake.listIngressMutex.RLock() + defer fake.listIngressMutex.RUnlock() + argsForCall := fake.listIngressArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeServiceStore) ListIngressReturns(result1 []*livekit.IngressInfo, result2 error) { + fake.listIngressMutex.Lock() + defer fake.listIngressMutex.Unlock() + fake.ListIngressStub = nil + fake.listIngressReturns = struct { + result1 []*livekit.IngressInfo + result2 error + }{result1, result2} +} + +func (fake *FakeServiceStore) ListIngressReturnsOnCall(i int, result1 []*livekit.IngressInfo, result2 error) { + fake.listIngressMutex.Lock() + defer fake.listIngressMutex.Unlock() + fake.ListIngressStub = nil + if fake.listIngressReturnsOnCall == nil { + fake.listIngressReturnsOnCall = make(map[int]struct { + result1 []*livekit.IngressInfo + result2 error + }) + } + fake.listIngressReturnsOnCall[i] = struct { + result1 []*livekit.IngressInfo + result2 error + }{result1, result2} +} + func (fake *FakeServiceStore) ListParticipants(arg1 context.Context, arg2 livekit.RoomName) ([]*livekit.ParticipantInfo, error) { fake.listParticipantsMutex.Lock() ret, specificReturn := fake.listParticipantsReturnsOnCall[len(fake.listParticipantsArgsForCall)] @@ -462,6 +667,136 @@ func (fake *FakeServiceStore) LoadEgressReturnsOnCall(i int, result1 *livekit.Eg }{result1, result2} } +func (fake *FakeServiceStore) LoadIngress(arg1 context.Context, arg2 string) (*livekit.IngressInfo, error) { + fake.loadIngressMutex.Lock() + ret, specificReturn := fake.loadIngressReturnsOnCall[len(fake.loadIngressArgsForCall)] + fake.loadIngressArgsForCall = append(fake.loadIngressArgsForCall, struct { + arg1 context.Context + arg2 string + }{arg1, arg2}) + stub := fake.LoadIngressStub + fakeReturns := fake.loadIngressReturns + fake.recordInvocation("LoadIngress", []interface{}{arg1, arg2}) + fake.loadIngressMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeServiceStore) LoadIngressCallCount() int { + fake.loadIngressMutex.RLock() + defer fake.loadIngressMutex.RUnlock() + return len(fake.loadIngressArgsForCall) +} + +func (fake *FakeServiceStore) LoadIngressCalls(stub func(context.Context, string) (*livekit.IngressInfo, error)) { + fake.loadIngressMutex.Lock() + defer fake.loadIngressMutex.Unlock() + fake.LoadIngressStub = stub +} + +func (fake *FakeServiceStore) LoadIngressArgsForCall(i int) (context.Context, string) { + fake.loadIngressMutex.RLock() + defer fake.loadIngressMutex.RUnlock() + argsForCall := fake.loadIngressArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeServiceStore) LoadIngressReturns(result1 *livekit.IngressInfo, result2 error) { + fake.loadIngressMutex.Lock() + defer fake.loadIngressMutex.Unlock() + fake.LoadIngressStub = nil + fake.loadIngressReturns = struct { + result1 *livekit.IngressInfo + result2 error + }{result1, result2} +} + +func (fake *FakeServiceStore) LoadIngressReturnsOnCall(i int, result1 *livekit.IngressInfo, result2 error) { + fake.loadIngressMutex.Lock() + defer fake.loadIngressMutex.Unlock() + fake.LoadIngressStub = nil + if fake.loadIngressReturnsOnCall == nil { + fake.loadIngressReturnsOnCall = make(map[int]struct { + result1 *livekit.IngressInfo + result2 error + }) + } + fake.loadIngressReturnsOnCall[i] = struct { + result1 *livekit.IngressInfo + result2 error + }{result1, result2} +} + +func (fake *FakeServiceStore) LoadIngressFromStreamKey(arg1 context.Context, arg2 string) (*livekit.IngressInfo, error) { + fake.loadIngressFromStreamKeyMutex.Lock() + ret, specificReturn := fake.loadIngressFromStreamKeyReturnsOnCall[len(fake.loadIngressFromStreamKeyArgsForCall)] + fake.loadIngressFromStreamKeyArgsForCall = append(fake.loadIngressFromStreamKeyArgsForCall, struct { + arg1 context.Context + arg2 string + }{arg1, arg2}) + stub := fake.LoadIngressFromStreamKeyStub + fakeReturns := fake.loadIngressFromStreamKeyReturns + fake.recordInvocation("LoadIngressFromStreamKey", []interface{}{arg1, arg2}) + fake.loadIngressFromStreamKeyMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeServiceStore) LoadIngressFromStreamKeyCallCount() int { + fake.loadIngressFromStreamKeyMutex.RLock() + defer fake.loadIngressFromStreamKeyMutex.RUnlock() + return len(fake.loadIngressFromStreamKeyArgsForCall) +} + +func (fake *FakeServiceStore) LoadIngressFromStreamKeyCalls(stub func(context.Context, string) (*livekit.IngressInfo, error)) { + fake.loadIngressFromStreamKeyMutex.Lock() + defer fake.loadIngressFromStreamKeyMutex.Unlock() + fake.LoadIngressFromStreamKeyStub = stub +} + +func (fake *FakeServiceStore) LoadIngressFromStreamKeyArgsForCall(i int) (context.Context, string) { + fake.loadIngressFromStreamKeyMutex.RLock() + defer fake.loadIngressFromStreamKeyMutex.RUnlock() + argsForCall := fake.loadIngressFromStreamKeyArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeServiceStore) LoadIngressFromStreamKeyReturns(result1 *livekit.IngressInfo, result2 error) { + fake.loadIngressFromStreamKeyMutex.Lock() + defer fake.loadIngressFromStreamKeyMutex.Unlock() + fake.LoadIngressFromStreamKeyStub = nil + fake.loadIngressFromStreamKeyReturns = struct { + result1 *livekit.IngressInfo + result2 error + }{result1, result2} +} + +func (fake *FakeServiceStore) LoadIngressFromStreamKeyReturnsOnCall(i int, result1 *livekit.IngressInfo, result2 error) { + fake.loadIngressFromStreamKeyMutex.Lock() + defer fake.loadIngressFromStreamKeyMutex.Unlock() + fake.LoadIngressFromStreamKeyStub = nil + if fake.loadIngressFromStreamKeyReturnsOnCall == nil { + fake.loadIngressFromStreamKeyReturnsOnCall = make(map[int]struct { + result1 *livekit.IngressInfo + result2 error + }) + } + fake.loadIngressFromStreamKeyReturnsOnCall[i] = struct { + result1 *livekit.IngressInfo + result2 error + }{result1, result2} +} + func (fake *FakeServiceStore) LoadParticipant(arg1 context.Context, arg2 livekit.RoomName, arg3 livekit.ParticipantIdentity) (*livekit.ParticipantInfo, error) { fake.loadParticipantMutex.Lock() ret, specificReturn := fake.loadParticipantReturnsOnCall[len(fake.loadParticipantArgsForCall)] @@ -655,6 +990,68 @@ func (fake *FakeServiceStore) StoreEgressReturnsOnCall(i int, result1 error) { }{result1} } +func (fake *FakeServiceStore) StoreIngress(arg1 context.Context, arg2 *livekit.IngressInfo) error { + fake.storeIngressMutex.Lock() + ret, specificReturn := fake.storeIngressReturnsOnCall[len(fake.storeIngressArgsForCall)] + fake.storeIngressArgsForCall = append(fake.storeIngressArgsForCall, struct { + arg1 context.Context + arg2 *livekit.IngressInfo + }{arg1, arg2}) + stub := fake.StoreIngressStub + fakeReturns := fake.storeIngressReturns + fake.recordInvocation("StoreIngress", []interface{}{arg1, arg2}) + fake.storeIngressMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeServiceStore) StoreIngressCallCount() int { + fake.storeIngressMutex.RLock() + defer fake.storeIngressMutex.RUnlock() + return len(fake.storeIngressArgsForCall) +} + +func (fake *FakeServiceStore) StoreIngressCalls(stub func(context.Context, *livekit.IngressInfo) error) { + fake.storeIngressMutex.Lock() + defer fake.storeIngressMutex.Unlock() + fake.StoreIngressStub = stub +} + +func (fake *FakeServiceStore) StoreIngressArgsForCall(i int) (context.Context, *livekit.IngressInfo) { + fake.storeIngressMutex.RLock() + defer fake.storeIngressMutex.RUnlock() + argsForCall := fake.storeIngressArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeServiceStore) StoreIngressReturns(result1 error) { + fake.storeIngressMutex.Lock() + defer fake.storeIngressMutex.Unlock() + fake.StoreIngressStub = nil + fake.storeIngressReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeServiceStore) StoreIngressReturnsOnCall(i int, result1 error) { + fake.storeIngressMutex.Lock() + defer fake.storeIngressMutex.Unlock() + fake.StoreIngressStub = nil + if fake.storeIngressReturnsOnCall == nil { + fake.storeIngressReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.storeIngressReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeServiceStore) UpdateEgress(arg1 context.Context, arg2 *livekit.EgressInfo) error { fake.updateEgressMutex.Lock() ret, specificReturn := fake.updateEgressReturnsOnCall[len(fake.updateEgressArgsForCall)] @@ -717,27 +1114,101 @@ func (fake *FakeServiceStore) UpdateEgressReturnsOnCall(i int, result1 error) { }{result1} } +func (fake *FakeServiceStore) UpdateIngress(arg1 context.Context, arg2 *livekit.IngressInfo) error { + fake.updateIngressMutex.Lock() + ret, specificReturn := fake.updateIngressReturnsOnCall[len(fake.updateIngressArgsForCall)] + fake.updateIngressArgsForCall = append(fake.updateIngressArgsForCall, struct { + arg1 context.Context + arg2 *livekit.IngressInfo + }{arg1, arg2}) + stub := fake.UpdateIngressStub + fakeReturns := fake.updateIngressReturns + fake.recordInvocation("UpdateIngress", []interface{}{arg1, arg2}) + fake.updateIngressMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeServiceStore) UpdateIngressCallCount() int { + fake.updateIngressMutex.RLock() + defer fake.updateIngressMutex.RUnlock() + return len(fake.updateIngressArgsForCall) +} + +func (fake *FakeServiceStore) UpdateIngressCalls(stub func(context.Context, *livekit.IngressInfo) error) { + fake.updateIngressMutex.Lock() + defer fake.updateIngressMutex.Unlock() + fake.UpdateIngressStub = stub +} + +func (fake *FakeServiceStore) UpdateIngressArgsForCall(i int) (context.Context, *livekit.IngressInfo) { + fake.updateIngressMutex.RLock() + defer fake.updateIngressMutex.RUnlock() + argsForCall := fake.updateIngressArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeServiceStore) UpdateIngressReturns(result1 error) { + fake.updateIngressMutex.Lock() + defer fake.updateIngressMutex.Unlock() + fake.UpdateIngressStub = nil + fake.updateIngressReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeServiceStore) UpdateIngressReturnsOnCall(i int, result1 error) { + fake.updateIngressMutex.Lock() + defer fake.updateIngressMutex.Unlock() + fake.UpdateIngressStub = nil + if fake.updateIngressReturnsOnCall == nil { + fake.updateIngressReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.updateIngressReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeServiceStore) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() fake.deleteEgressMutex.RLock() defer fake.deleteEgressMutex.RUnlock() + fake.deleteIngressMutex.RLock() + defer fake.deleteIngressMutex.RUnlock() fake.listEgressMutex.RLock() defer fake.listEgressMutex.RUnlock() + fake.listIngressMutex.RLock() + defer fake.listIngressMutex.RUnlock() fake.listParticipantsMutex.RLock() defer fake.listParticipantsMutex.RUnlock() fake.listRoomsMutex.RLock() defer fake.listRoomsMutex.RUnlock() fake.loadEgressMutex.RLock() defer fake.loadEgressMutex.RUnlock() + fake.loadIngressMutex.RLock() + defer fake.loadIngressMutex.RUnlock() + fake.loadIngressFromStreamKeyMutex.RLock() + defer fake.loadIngressFromStreamKeyMutex.RUnlock() fake.loadParticipantMutex.RLock() defer fake.loadParticipantMutex.RUnlock() fake.loadRoomMutex.RLock() defer fake.loadRoomMutex.RUnlock() fake.storeEgressMutex.RLock() defer fake.storeEgressMutex.RUnlock() + fake.storeIngressMutex.RLock() + defer fake.storeIngressMutex.RUnlock() fake.updateEgressMutex.RLock() defer fake.updateEgressMutex.RUnlock() + fake.updateIngressMutex.RLock() + defer fake.updateIngressMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value