diff --git a/go.mod b/go.mod index 9894dcb7d..42cd30c94 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20240206082112-9bf41dcbce76 - github.com/livekit/protocol v1.9.10-0.20240213074347-cee767e2d909 + github.com/livekit/protocol v1.9.10-0.20240214205753-2e75b8fb463f github.com/livekit/psrpc v0.5.3-0.20240209001357-380f59f00c58 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 @@ -47,7 +47,7 @@ require ( github.com/urfave/cli/v2 v2.27.1 github.com/urfave/negroni/v3 v3.0.0 go.uber.org/atomic v1.11.0 - golang.org/x/exp v0.0.0-20240205201215-2c58cdc269a3 + golang.org/x/exp v0.0.0-20240213143201-ec583247a57a golang.org/x/sync v0.6.0 google.golang.org/protobuf v1.32.0 gopkg.in/yaml.v3 v3.0.1 @@ -98,11 +98,11 @@ require ( go.uber.org/zap v1.26.0 // indirect golang.org/x/crypto v0.19.0 // indirect golang.org/x/mod v0.15.0 // indirect - golang.org/x/net v0.20.0 // indirect + golang.org/x/net v0.21.0 // indirect golang.org/x/sys v0.17.0 // indirect golang.org/x/text v0.14.0 // indirect - golang.org/x/tools v0.17.0 // indirect + golang.org/x/tools v0.18.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240205150955-31a09d347014 // indirect - google.golang.org/grpc v1.61.0 // indirect + google.golang.org/grpc v1.61.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 605b241d6..210a2b9ad 100644 --- a/go.sum +++ b/go.sum @@ -127,8 +127,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240206082112-9bf41dcbce76 h1:Zw88krOHni51OzDUlrduYb3m7VcsaKw06TnnDhsQpjg= github.com/livekit/mediatransportutil v0.0.0-20240206082112-9bf41dcbce76/go.mod h1:GBzn9xL+mivI1pW+tyExcKgbc0VOc29I9yJsNcAVaAc= -github.com/livekit/protocol v1.9.10-0.20240213074347-cee767e2d909 h1:36Jehw3Z0CHV3QDbDWUndky02lGtdpl0XTDs7u8UwPk= -github.com/livekit/protocol v1.9.10-0.20240213074347-cee767e2d909/go.mod h1:042ukZY29snaLjTVxVO+X89zuqqr9HDcpkq45c9Y27A= +github.com/livekit/protocol v1.9.10-0.20240214205753-2e75b8fb463f h1:f/KuQMFbWxKp43I2mftTLCWTwGnwtTE9FF7Btr78Uzc= +github.com/livekit/protocol v1.9.10-0.20240214205753-2e75b8fb463f/go.mod h1:/kviHT6yTNqHdZ9QsvRuxAHf7LaBROa7qe5naT1oVrU= github.com/livekit/psrpc v0.5.3-0.20240209001357-380f59f00c58 h1:yH55rBGLRO+ict2mu6bKZ5iPwTIrIwU1i0ydgThi4+k= github.com/livekit/psrpc v0.5.3-0.20240209001357-380f59f00c58/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= @@ -296,8 +296,8 @@ golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliY golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= -golang.org/x/exp v0.0.0-20240205201215-2c58cdc269a3 h1:/RIbNt/Zr7rVhIkQhooTxCxFcdWLGIKnZA4IXNFSrvo= -golang.org/x/exp v0.0.0-20240205201215-2c58cdc269a3/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08= +golang.org/x/exp v0.0.0-20240213143201-ec583247a57a h1:HinSgX1tJRX3KsL//Gxynpw5CTOAIPhgL4W8PNiIpVE= +golang.org/x/exp v0.0.0-20240213143201-ec583247a57a/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= @@ -332,8 +332,9 @@ golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.13.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= -golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= +golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -415,16 +416,17 @@ golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= -golang.org/x/tools v0.17.0 h1:FvmRgNOcs3kOa+T20R1uhfP9F6HgG2mfxDv1vrx1Htc= golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps= +golang.org/x/tools v0.18.0 h1:k8NLag8AGHnn+PHbl7g43CtqZAwG60vZkLqgyZgIHgQ= +golang.org/x/tools v0.18.0/go.mod h1:GL7B4CwcLLeo59yx/9UWWuNOW1n3VZ4f5axWfML7Lcg= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto/googleapis/rpc v0.0.0-20240205150955-31a09d347014 h1:FSL3lRCkhaPFxqi0s9o+V4UI2WTzAVOvkgbd4kVV4Wg= google.golang.org/genproto/googleapis/rpc v0.0.0-20240205150955-31a09d347014/go.mod h1:SaPjaZGWb0lPqs6Ittu0spdfrOArqji4ZdeP5IC/9N4= -google.golang.org/grpc v1.61.0 h1:TOvOcuXn30kRao+gfcvsebNEa5iZIiLkisYEkf7R7o0= -google.golang.org/grpc v1.61.0/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs= +google.golang.org/grpc v1.61.1 h1:kLAiWrZs7YeDM6MumDe7m3y4aM6wacLzM1Y/wiLP9XY= +google.golang.org/grpc v1.61.1/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/pkg/service/clients.go b/pkg/service/clients.go index 53c0e415b..89faa691a 100644 --- a/pkg/service/clients.go +++ b/pkg/service/clients.go @@ -26,10 +26,15 @@ import ( "github.com/livekit/protocol/utils" ) +//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate + +//counterfeiter:generate . IOClient type IOClient interface { CreateEgress(ctx context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error) GetEgress(ctx context.Context, req *rpc.GetEgressRequest) (*livekit.EgressInfo, error) ListEgress(ctx context.Context, req *livekit.ListEgressRequest) (*livekit.ListEgressResponse, error) + CreateIngress(ctx context.Context, req *livekit.IngressInfo) (*emptypb.Empty, error) + UpdateIngressState(ctx context.Context, req *rpc.UpdateIngressStateRequest) (*emptypb.Empty, error) } type egressLauncher struct { diff --git a/pkg/service/ingress.go b/pkg/service/ingress.go index 75d5ffc91..40b09aaf3 100644 --- a/pkg/service/ingress.go +++ b/pkg/service/ingress.go @@ -39,6 +39,7 @@ type IngressService struct { bus psrpc.MessageBus psrpcClient rpc.IngressClient store IngressStore + io IOClient roomService livekit.RoomService telemetry telemetry.TelemetryService launcher IngressLauncher @@ -50,6 +51,7 @@ func NewIngressServiceWithIngressLauncher( bus psrpc.MessageBus, psrpcClient rpc.IngressClient, store IngressStore, + io IOClient, rs livekit.RoomService, ts telemetry.TelemetryService, launcher IngressLauncher, @@ -61,6 +63,7 @@ func NewIngressServiceWithIngressLauncher( bus: bus, psrpcClient: psrpcClient, store: store, + io: io, roomService: rs, telemetry: ts, launcher: launcher, @@ -73,10 +76,11 @@ func NewIngressService( bus psrpc.MessageBus, psrpcClient rpc.IngressClient, store IngressStore, + io IOClient, rs livekit.RoomService, ts telemetry.TelemetryService, ) *IngressService { - s := NewIngressServiceWithIngressLauncher(conf, nodeID, bus, psrpcClient, store, rs, ts, nil) + s := NewIngressServiceWithIngressLauncher(conf, nodeID, bus, psrpcClient, store, io, rs, ts, nil) s.launcher = s @@ -192,11 +196,19 @@ func (s *IngressService) CreateIngressWithUrl(ctx context.Context, urlStr string } } - if err = s.store.StoreIngress(ctx, info); err != nil { - logger.Errorw("could not write ingress info", err) + // TODO Remove this store Ingress call for URL pull as it is redundant since + // the ingress service sends a CreateIngress RPC + _, err = s.io.CreateIngress(ctx, info) + switch err { + case nil: + break + case ingress.ErrIngressOutOfDate: + // Error returned if the ingress was already created by the ingress service + err = nil + default: + logger.Errorw("could not create ingress object", err) return nil, err } - s.telemetry.IngressCreated(ctx, info) return info, nil } @@ -274,7 +286,10 @@ func (s *IngressService) UpdateIngress(ctx context.Context, req *livekit.UpdateI switch info.State.Status { case livekit.IngressState_ENDPOINT_ERROR: info.State.Status = livekit.IngressState_ENDPOINT_INACTIVE - err = s.store.UpdateIngressState(ctx, req.IngressId, info.State) + _, err = s.io.UpdateIngressState(ctx, &rpc.UpdateIngressStateRequest{ + IngressId: req.IngressId, + State: info.State, + }) if err != nil { logger.Warnw("could not store ingress state", err) } diff --git a/pkg/service/ioservice.go b/pkg/service/ioservice.go index 67f995033..f3f2848c9 100644 --- a/pkg/service/ioservice.go +++ b/pkg/service/ioservice.go @@ -16,7 +16,6 @@ package service import ( "context" - "errors" "google.golang.org/protobuf/types/known/emptypb" @@ -78,6 +77,14 @@ func (s *IOInfoService) Start() error { return nil } +func (s *IOInfoService) Stop() { + close(s.shutdown) + + if s.ioServer != nil { + s.ioServer.Shutdown() + } +} + func (s *IOInfoService) CreateEgress(ctx context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error) { // check if egress already exists to avoid duplicate EgressStarted event if _, err := s.es.LoadEgress(ctx, info.EgressId); err == nil { @@ -156,79 +163,3 @@ func (s *IOInfoService) UpdateMetrics(ctx context.Context, req *rpc.UpdateMetric ) return &emptypb.Empty{}, nil } - -func (s *IOInfoService) GetIngressInfo(ctx context.Context, req *rpc.GetIngressInfoRequest) (*rpc.GetIngressInfoResponse, error) { - info, err := s.loadIngressFromInfoRequest(req) - if err != nil { - return nil, err - } - - return &rpc.GetIngressInfoResponse{Info: info}, nil -} - -func (s *IOInfoService) loadIngressFromInfoRequest(req *rpc.GetIngressInfoRequest) (info *livekit.IngressInfo, err error) { - if req.IngressId != "" { - info, err = s.is.LoadIngress(context.Background(), req.IngressId) - } else if req.StreamKey != "" { - info, err = s.is.LoadIngressFromStreamKey(context.Background(), req.StreamKey) - } else { - err = errors.New("request needs to specify either IngressId or StreamKey") - } - return info, err -} - -func (s *IOInfoService) UpdateIngressState(ctx context.Context, req *rpc.UpdateIngressStateRequest) (*emptypb.Empty, error) { - info, err := s.is.LoadIngress(ctx, req.IngressId) - if err != nil { - return nil, err - } - - if err = s.is.UpdateIngressState(ctx, req.IngressId, req.State); err != nil { - logger.Errorw("could not update ingress", err) - return nil, err - } - - if info.State.Status != req.State.Status { - info.State = req.State - - switch req.State.Status { - case livekit.IngressState_ENDPOINT_ERROR, - livekit.IngressState_ENDPOINT_INACTIVE, - livekit.IngressState_ENDPOINT_COMPLETE: - s.telemetry.IngressEnded(ctx, info) - - if req.State.Error != "" { - logger.Infow("ingress failed", "error", req.State.Error, "ingressID", req.IngressId) - } else { - logger.Infow("ingress ended", "ingressID", req.IngressId) - } - - case livekit.IngressState_ENDPOINT_PUBLISHING: - s.telemetry.IngressStarted(ctx, info) - - logger.Infow("ingress started", "ingressID", req.IngressId) - - case livekit.IngressState_ENDPOINT_BUFFERING: - s.telemetry.IngressUpdated(ctx, info) - - logger.Infow("ingress buffering", "ingressID", req.IngressId) - } - } else { - // Status didn't change, send Updated event - info.State = req.State - - s.telemetry.IngressUpdated(ctx, info) - - logger.Infow("ingress updated", "ingressID", req.IngressId, "status", info.State.Status) - } - - return &emptypb.Empty{}, nil -} - -func (s *IOInfoService) Stop() { - close(s.shutdown) - - if s.ioServer != nil { - s.ioServer.Shutdown() - } -} diff --git a/pkg/service/ioservice_ingress.go b/pkg/service/ioservice_ingress.go new file mode 100644 index 000000000..cba5ab390 --- /dev/null +++ b/pkg/service/ioservice_ingress.go @@ -0,0 +1,90 @@ +package service + +import ( + "context" + "errors" + + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/rpc" + "google.golang.org/protobuf/types/known/emptypb" +) + +func (s *IOInfoService) CreateIngress(ctx context.Context, info *livekit.IngressInfo) (*emptypb.Empty, error) { + err := s.is.StoreIngress(ctx, info) + if err != nil { + return nil, err + } + + s.telemetry.IngressCreated(ctx, info) + + return &emptypb.Empty{}, nil +} + +func (s *IOInfoService) GetIngressInfo(ctx context.Context, req *rpc.GetIngressInfoRequest) (*rpc.GetIngressInfoResponse, error) { + info, err := s.loadIngressFromInfoRequest(req) + if err != nil { + return nil, err + } + + return &rpc.GetIngressInfoResponse{Info: info}, nil +} + +func (s *IOInfoService) loadIngressFromInfoRequest(req *rpc.GetIngressInfoRequest) (info *livekit.IngressInfo, err error) { + if req.IngressId != "" { + info, err = s.is.LoadIngress(context.Background(), req.IngressId) + } else if req.StreamKey != "" { + info, err = s.is.LoadIngressFromStreamKey(context.Background(), req.StreamKey) + } else { + err = errors.New("request needs to specify either IngressId or StreamKey") + } + return info, err +} + +func (s *IOInfoService) UpdateIngressState(ctx context.Context, req *rpc.UpdateIngressStateRequest) (*emptypb.Empty, error) { + info, err := s.is.LoadIngress(ctx, req.IngressId) + if err != nil { + return nil, err + } + + if err = s.is.UpdateIngressState(ctx, req.IngressId, req.State); err != nil { + logger.Errorw("could not update ingress", err) + return nil, err + } + + if info.State.Status != req.State.Status { + info.State = req.State + + switch req.State.Status { + case livekit.IngressState_ENDPOINT_ERROR, + livekit.IngressState_ENDPOINT_INACTIVE, + livekit.IngressState_ENDPOINT_COMPLETE: + s.telemetry.IngressEnded(ctx, info) + + if req.State.Error != "" { + logger.Infow("ingress failed", "error", req.State.Error, "ingressID", req.IngressId) + } else { + logger.Infow("ingress ended", "ingressID", req.IngressId) + } + + case livekit.IngressState_ENDPOINT_PUBLISHING: + s.telemetry.IngressStarted(ctx, info) + + logger.Infow("ingress started", "ingressID", req.IngressId) + + case livekit.IngressState_ENDPOINT_BUFFERING: + s.telemetry.IngressUpdated(ctx, info) + + logger.Infow("ingress buffering", "ingressID", req.IngressId) + } + } else { + // Status didn't change, send Updated event + info.State = req.State + + s.telemetry.IngressUpdated(ctx, info) + + logger.Infow("ingress state updated", "ingressID", req.IngressId, "status", info.State.Status) + } + + return &emptypb.Empty{}, nil +} diff --git a/pkg/service/servicefakes/fake_ioclient.go b/pkg/service/servicefakes/fake_ioclient.go index 5d440cbb9..0e1e22ac5 100644 --- a/pkg/service/servicefakes/fake_ioclient.go +++ b/pkg/service/servicefakes/fake_ioclient.go @@ -26,6 +26,20 @@ type FakeIOClient struct { result1 *emptypb.Empty result2 error } + CreateIngressStub func(context.Context, *livekit.IngressInfo) (*emptypb.Empty, error) + createIngressMutex sync.RWMutex + createIngressArgsForCall []struct { + arg1 context.Context + arg2 *livekit.IngressInfo + } + createIngressReturns struct { + result1 *emptypb.Empty + result2 error + } + createIngressReturnsOnCall map[int]struct { + result1 *emptypb.Empty + result2 error + } GetEgressStub func(context.Context, *rpc.GetEgressRequest) (*livekit.EgressInfo, error) getEgressMutex sync.RWMutex getEgressArgsForCall []struct { @@ -54,6 +68,20 @@ type FakeIOClient struct { result1 *livekit.ListEgressResponse result2 error } + UpdateIngressStateStub func(context.Context, *rpc.UpdateIngressStateRequest) (*emptypb.Empty, error) + updateIngressStateMutex sync.RWMutex + updateIngressStateArgsForCall []struct { + arg1 context.Context + arg2 *rpc.UpdateIngressStateRequest + } + updateIngressStateReturns struct { + result1 *emptypb.Empty + result2 error + } + updateIngressStateReturnsOnCall map[int]struct { + result1 *emptypb.Empty + result2 error + } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } @@ -123,6 +151,71 @@ func (fake *FakeIOClient) CreateEgressReturnsOnCall(i int, result1 *emptypb.Empt }{result1, result2} } +func (fake *FakeIOClient) CreateIngress(arg1 context.Context, arg2 *livekit.IngressInfo) (*emptypb.Empty, error) { + fake.createIngressMutex.Lock() + ret, specificReturn := fake.createIngressReturnsOnCall[len(fake.createIngressArgsForCall)] + fake.createIngressArgsForCall = append(fake.createIngressArgsForCall, struct { + arg1 context.Context + arg2 *livekit.IngressInfo + }{arg1, arg2}) + stub := fake.CreateIngressStub + fakeReturns := fake.createIngressReturns + fake.recordInvocation("CreateIngress", []interface{}{arg1, arg2}) + fake.createIngressMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeIOClient) CreateIngressCallCount() int { + fake.createIngressMutex.RLock() + defer fake.createIngressMutex.RUnlock() + return len(fake.createIngressArgsForCall) +} + +func (fake *FakeIOClient) CreateIngressCalls(stub func(context.Context, *livekit.IngressInfo) (*emptypb.Empty, error)) { + fake.createIngressMutex.Lock() + defer fake.createIngressMutex.Unlock() + fake.CreateIngressStub = stub +} + +func (fake *FakeIOClient) CreateIngressArgsForCall(i int) (context.Context, *livekit.IngressInfo) { + fake.createIngressMutex.RLock() + defer fake.createIngressMutex.RUnlock() + argsForCall := fake.createIngressArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeIOClient) CreateIngressReturns(result1 *emptypb.Empty, result2 error) { + fake.createIngressMutex.Lock() + defer fake.createIngressMutex.Unlock() + fake.CreateIngressStub = nil + fake.createIngressReturns = struct { + result1 *emptypb.Empty + result2 error + }{result1, result2} +} + +func (fake *FakeIOClient) CreateIngressReturnsOnCall(i int, result1 *emptypb.Empty, result2 error) { + fake.createIngressMutex.Lock() + defer fake.createIngressMutex.Unlock() + fake.CreateIngressStub = nil + if fake.createIngressReturnsOnCall == nil { + fake.createIngressReturnsOnCall = make(map[int]struct { + result1 *emptypb.Empty + result2 error + }) + } + fake.createIngressReturnsOnCall[i] = struct { + result1 *emptypb.Empty + result2 error + }{result1, result2} +} + func (fake *FakeIOClient) GetEgress(arg1 context.Context, arg2 *rpc.GetEgressRequest) (*livekit.EgressInfo, error) { fake.getEgressMutex.Lock() ret, specificReturn := fake.getEgressReturnsOnCall[len(fake.getEgressArgsForCall)] @@ -253,15 +346,84 @@ func (fake *FakeIOClient) ListEgressReturnsOnCall(i int, result1 *livekit.ListEg }{result1, result2} } +func (fake *FakeIOClient) UpdateIngressState(arg1 context.Context, arg2 *rpc.UpdateIngressStateRequest) (*emptypb.Empty, error) { + fake.updateIngressStateMutex.Lock() + ret, specificReturn := fake.updateIngressStateReturnsOnCall[len(fake.updateIngressStateArgsForCall)] + fake.updateIngressStateArgsForCall = append(fake.updateIngressStateArgsForCall, struct { + arg1 context.Context + arg2 *rpc.UpdateIngressStateRequest + }{arg1, arg2}) + stub := fake.UpdateIngressStateStub + fakeReturns := fake.updateIngressStateReturns + fake.recordInvocation("UpdateIngressState", []interface{}{arg1, arg2}) + fake.updateIngressStateMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeIOClient) UpdateIngressStateCallCount() int { + fake.updateIngressStateMutex.RLock() + defer fake.updateIngressStateMutex.RUnlock() + return len(fake.updateIngressStateArgsForCall) +} + +func (fake *FakeIOClient) UpdateIngressStateCalls(stub func(context.Context, *rpc.UpdateIngressStateRequest) (*emptypb.Empty, error)) { + fake.updateIngressStateMutex.Lock() + defer fake.updateIngressStateMutex.Unlock() + fake.UpdateIngressStateStub = stub +} + +func (fake *FakeIOClient) UpdateIngressStateArgsForCall(i int) (context.Context, *rpc.UpdateIngressStateRequest) { + fake.updateIngressStateMutex.RLock() + defer fake.updateIngressStateMutex.RUnlock() + argsForCall := fake.updateIngressStateArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeIOClient) UpdateIngressStateReturns(result1 *emptypb.Empty, result2 error) { + fake.updateIngressStateMutex.Lock() + defer fake.updateIngressStateMutex.Unlock() + fake.UpdateIngressStateStub = nil + fake.updateIngressStateReturns = struct { + result1 *emptypb.Empty + result2 error + }{result1, result2} +} + +func (fake *FakeIOClient) UpdateIngressStateReturnsOnCall(i int, result1 *emptypb.Empty, result2 error) { + fake.updateIngressStateMutex.Lock() + defer fake.updateIngressStateMutex.Unlock() + fake.UpdateIngressStateStub = nil + if fake.updateIngressStateReturnsOnCall == nil { + fake.updateIngressStateReturnsOnCall = make(map[int]struct { + result1 *emptypb.Empty + result2 error + }) + } + fake.updateIngressStateReturnsOnCall[i] = struct { + result1 *emptypb.Empty + result2 error + }{result1, result2} +} + func (fake *FakeIOClient) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() fake.createEgressMutex.RLock() defer fake.createEgressMutex.RUnlock() + fake.createIngressMutex.RLock() + defer fake.createIngressMutex.RUnlock() fake.getEgressMutex.RLock() defer fake.getEgressMutex.RUnlock() fake.listEgressMutex.RLock() defer fake.listEgressMutex.RUnlock() + fake.updateIngressStateMutex.RLock() + defer fake.updateIngressStateMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index e76fa56c7..c8307a3bf 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -106,7 +106,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live if err != nil { return nil, err } - ingressService := NewIngressService(ingressConfig, nodeID, messageBus, ingressClient, ingressStore, roomService, telemetryService) + ingressService := NewIngressService(ingressConfig, nodeID, messageBus, ingressClient, ingressStore, ioInfoService, roomService, telemetryService) sipConfig := getSIPConfig(conf) sipClient, err := rpc.NewSIPClient(messageBus) if err != nil {