mirror of
https://github.com/livekit/livekit.git
synced 2026-06-04 23:52:06 +00:00
Move CreateIngress to IOInfoService. Adopt UpdateIngressStare from IOInfoService instead of IngressService (#2485)
When paired with an updated ingress service, this will cause CreateIngress to be called twice for URL pull ingress, with the 2nd call failing.
This commit is contained in:
@@ -19,7 +19,7 @@ require (
|
||||
github.com/jxskiss/base62 v1.1.0
|
||||
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
|
||||
github.com/livekit/mediatransportutil v0.0.0-20240206082112-9bf41dcbce76
|
||||
github.com/livekit/protocol v1.9.10-0.20240213074347-cee767e2d909
|
||||
github.com/livekit/protocol v1.9.10-0.20240214205753-2e75b8fb463f
|
||||
github.com/livekit/psrpc v0.5.3-0.20240209001357-380f59f00c58
|
||||
github.com/mackerelio/go-osstat v0.2.4
|
||||
github.com/magefile/mage v1.15.0
|
||||
@@ -47,7 +47,7 @@ require (
|
||||
github.com/urfave/cli/v2 v2.27.1
|
||||
github.com/urfave/negroni/v3 v3.0.0
|
||||
go.uber.org/atomic v1.11.0
|
||||
golang.org/x/exp v0.0.0-20240205201215-2c58cdc269a3
|
||||
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a
|
||||
golang.org/x/sync v0.6.0
|
||||
google.golang.org/protobuf v1.32.0
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
@@ -98,11 +98,11 @@ require (
|
||||
go.uber.org/zap v1.26.0 // indirect
|
||||
golang.org/x/crypto v0.19.0 // indirect
|
||||
golang.org/x/mod v0.15.0 // indirect
|
||||
golang.org/x/net v0.20.0 // indirect
|
||||
golang.org/x/net v0.21.0 // indirect
|
||||
golang.org/x/sys v0.17.0 // indirect
|
||||
golang.org/x/text v0.14.0 // indirect
|
||||
golang.org/x/tools v0.17.0 // indirect
|
||||
golang.org/x/tools v0.18.0 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240205150955-31a09d347014 // indirect
|
||||
google.golang.org/grpc v1.61.0 // indirect
|
||||
google.golang.org/grpc v1.61.1 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
)
|
||||
|
||||
@@ -127,8 +127,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
|
||||
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
|
||||
github.com/livekit/mediatransportutil v0.0.0-20240206082112-9bf41dcbce76 h1:Zw88krOHni51OzDUlrduYb3m7VcsaKw06TnnDhsQpjg=
|
||||
github.com/livekit/mediatransportutil v0.0.0-20240206082112-9bf41dcbce76/go.mod h1:GBzn9xL+mivI1pW+tyExcKgbc0VOc29I9yJsNcAVaAc=
|
||||
github.com/livekit/protocol v1.9.10-0.20240213074347-cee767e2d909 h1:36Jehw3Z0CHV3QDbDWUndky02lGtdpl0XTDs7u8UwPk=
|
||||
github.com/livekit/protocol v1.9.10-0.20240213074347-cee767e2d909/go.mod h1:042ukZY29snaLjTVxVO+X89zuqqr9HDcpkq45c9Y27A=
|
||||
github.com/livekit/protocol v1.9.10-0.20240214205753-2e75b8fb463f h1:f/KuQMFbWxKp43I2mftTLCWTwGnwtTE9FF7Btr78Uzc=
|
||||
github.com/livekit/protocol v1.9.10-0.20240214205753-2e75b8fb463f/go.mod h1:/kviHT6yTNqHdZ9QsvRuxAHf7LaBROa7qe5naT1oVrU=
|
||||
github.com/livekit/psrpc v0.5.3-0.20240209001357-380f59f00c58 h1:yH55rBGLRO+ict2mu6bKZ5iPwTIrIwU1i0ydgThi4+k=
|
||||
github.com/livekit/psrpc v0.5.3-0.20240209001357-380f59f00c58/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g=
|
||||
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
|
||||
@@ -296,8 +296,8 @@ golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliY
|
||||
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
|
||||
golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo=
|
||||
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
|
||||
golang.org/x/exp v0.0.0-20240205201215-2c58cdc269a3 h1:/RIbNt/Zr7rVhIkQhooTxCxFcdWLGIKnZA4IXNFSrvo=
|
||||
golang.org/x/exp v0.0.0-20240205201215-2c58cdc269a3/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08=
|
||||
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a h1:HinSgX1tJRX3KsL//Gxynpw5CTOAIPhgL4W8PNiIpVE=
|
||||
golang.org/x/exp v0.0.0-20240213143201-ec583247a57a/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc=
|
||||
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
|
||||
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
|
||||
@@ -332,8 +332,9 @@ golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
|
||||
golang.org/x/net v0.13.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA=
|
||||
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
|
||||
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
|
||||
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
|
||||
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
|
||||
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
|
||||
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
@@ -415,16 +416,17 @@ golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4f
|
||||
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
|
||||
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
|
||||
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
|
||||
golang.org/x/tools v0.17.0 h1:FvmRgNOcs3kOa+T20R1uhfP9F6HgG2mfxDv1vrx1Htc=
|
||||
golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps=
|
||||
golang.org/x/tools v0.18.0 h1:k8NLag8AGHnn+PHbl7g43CtqZAwG60vZkLqgyZgIHgQ=
|
||||
golang.org/x/tools v0.18.0/go.mod h1:GL7B4CwcLLeo59yx/9UWWuNOW1n3VZ4f5axWfML7Lcg=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240205150955-31a09d347014 h1:FSL3lRCkhaPFxqi0s9o+V4UI2WTzAVOvkgbd4kVV4Wg=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240205150955-31a09d347014/go.mod h1:SaPjaZGWb0lPqs6Ittu0spdfrOArqji4ZdeP5IC/9N4=
|
||||
google.golang.org/grpc v1.61.0 h1:TOvOcuXn30kRao+gfcvsebNEa5iZIiLkisYEkf7R7o0=
|
||||
google.golang.org/grpc v1.61.0/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs=
|
||||
google.golang.org/grpc v1.61.1 h1:kLAiWrZs7YeDM6MumDe7m3y4aM6wacLzM1Y/wiLP9XY=
|
||||
google.golang.org/grpc v1.61.1/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs=
|
||||
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
|
||||
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
|
||||
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
|
||||
|
||||
@@ -26,10 +26,15 @@ import (
|
||||
"github.com/livekit/protocol/utils"
|
||||
)
|
||||
|
||||
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate
|
||||
|
||||
//counterfeiter:generate . IOClient
|
||||
type IOClient interface {
|
||||
CreateEgress(ctx context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error)
|
||||
GetEgress(ctx context.Context, req *rpc.GetEgressRequest) (*livekit.EgressInfo, error)
|
||||
ListEgress(ctx context.Context, req *livekit.ListEgressRequest) (*livekit.ListEgressResponse, error)
|
||||
CreateIngress(ctx context.Context, req *livekit.IngressInfo) (*emptypb.Empty, error)
|
||||
UpdateIngressState(ctx context.Context, req *rpc.UpdateIngressStateRequest) (*emptypb.Empty, error)
|
||||
}
|
||||
|
||||
type egressLauncher struct {
|
||||
|
||||
+20
-5
@@ -39,6 +39,7 @@ type IngressService struct {
|
||||
bus psrpc.MessageBus
|
||||
psrpcClient rpc.IngressClient
|
||||
store IngressStore
|
||||
io IOClient
|
||||
roomService livekit.RoomService
|
||||
telemetry telemetry.TelemetryService
|
||||
launcher IngressLauncher
|
||||
@@ -50,6 +51,7 @@ func NewIngressServiceWithIngressLauncher(
|
||||
bus psrpc.MessageBus,
|
||||
psrpcClient rpc.IngressClient,
|
||||
store IngressStore,
|
||||
io IOClient,
|
||||
rs livekit.RoomService,
|
||||
ts telemetry.TelemetryService,
|
||||
launcher IngressLauncher,
|
||||
@@ -61,6 +63,7 @@ func NewIngressServiceWithIngressLauncher(
|
||||
bus: bus,
|
||||
psrpcClient: psrpcClient,
|
||||
store: store,
|
||||
io: io,
|
||||
roomService: rs,
|
||||
telemetry: ts,
|
||||
launcher: launcher,
|
||||
@@ -73,10 +76,11 @@ func NewIngressService(
|
||||
bus psrpc.MessageBus,
|
||||
psrpcClient rpc.IngressClient,
|
||||
store IngressStore,
|
||||
io IOClient,
|
||||
rs livekit.RoomService,
|
||||
ts telemetry.TelemetryService,
|
||||
) *IngressService {
|
||||
s := NewIngressServiceWithIngressLauncher(conf, nodeID, bus, psrpcClient, store, rs, ts, nil)
|
||||
s := NewIngressServiceWithIngressLauncher(conf, nodeID, bus, psrpcClient, store, io, rs, ts, nil)
|
||||
|
||||
s.launcher = s
|
||||
|
||||
@@ -192,11 +196,19 @@ func (s *IngressService) CreateIngressWithUrl(ctx context.Context, urlStr string
|
||||
}
|
||||
}
|
||||
|
||||
if err = s.store.StoreIngress(ctx, info); err != nil {
|
||||
logger.Errorw("could not write ingress info", err)
|
||||
// TODO Remove this store Ingress call for URL pull as it is redundant since
|
||||
// the ingress service sends a CreateIngress RPC
|
||||
_, err = s.io.CreateIngress(ctx, info)
|
||||
switch err {
|
||||
case nil:
|
||||
break
|
||||
case ingress.ErrIngressOutOfDate:
|
||||
// Error returned if the ingress was already created by the ingress service
|
||||
err = nil
|
||||
default:
|
||||
logger.Errorw("could not create ingress object", err)
|
||||
return nil, err
|
||||
}
|
||||
s.telemetry.IngressCreated(ctx, info)
|
||||
|
||||
return info, nil
|
||||
}
|
||||
@@ -274,7 +286,10 @@ func (s *IngressService) UpdateIngress(ctx context.Context, req *livekit.UpdateI
|
||||
switch info.State.Status {
|
||||
case livekit.IngressState_ENDPOINT_ERROR:
|
||||
info.State.Status = livekit.IngressState_ENDPOINT_INACTIVE
|
||||
err = s.store.UpdateIngressState(ctx, req.IngressId, info.State)
|
||||
_, err = s.io.UpdateIngressState(ctx, &rpc.UpdateIngressStateRequest{
|
||||
IngressId: req.IngressId,
|
||||
State: info.State,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Warnw("could not store ingress state", err)
|
||||
}
|
||||
|
||||
@@ -16,7 +16,6 @@ package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"google.golang.org/protobuf/types/known/emptypb"
|
||||
|
||||
@@ -78,6 +77,14 @@ func (s *IOInfoService) Start() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *IOInfoService) Stop() {
|
||||
close(s.shutdown)
|
||||
|
||||
if s.ioServer != nil {
|
||||
s.ioServer.Shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *IOInfoService) CreateEgress(ctx context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error) {
|
||||
// check if egress already exists to avoid duplicate EgressStarted event
|
||||
if _, err := s.es.LoadEgress(ctx, info.EgressId); err == nil {
|
||||
@@ -156,79 +163,3 @@ func (s *IOInfoService) UpdateMetrics(ctx context.Context, req *rpc.UpdateMetric
|
||||
)
|
||||
return &emptypb.Empty{}, nil
|
||||
}
|
||||
|
||||
func (s *IOInfoService) GetIngressInfo(ctx context.Context, req *rpc.GetIngressInfoRequest) (*rpc.GetIngressInfoResponse, error) {
|
||||
info, err := s.loadIngressFromInfoRequest(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &rpc.GetIngressInfoResponse{Info: info}, nil
|
||||
}
|
||||
|
||||
func (s *IOInfoService) loadIngressFromInfoRequest(req *rpc.GetIngressInfoRequest) (info *livekit.IngressInfo, err error) {
|
||||
if req.IngressId != "" {
|
||||
info, err = s.is.LoadIngress(context.Background(), req.IngressId)
|
||||
} else if req.StreamKey != "" {
|
||||
info, err = s.is.LoadIngressFromStreamKey(context.Background(), req.StreamKey)
|
||||
} else {
|
||||
err = errors.New("request needs to specify either IngressId or StreamKey")
|
||||
}
|
||||
return info, err
|
||||
}
|
||||
|
||||
func (s *IOInfoService) UpdateIngressState(ctx context.Context, req *rpc.UpdateIngressStateRequest) (*emptypb.Empty, error) {
|
||||
info, err := s.is.LoadIngress(ctx, req.IngressId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = s.is.UpdateIngressState(ctx, req.IngressId, req.State); err != nil {
|
||||
logger.Errorw("could not update ingress", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if info.State.Status != req.State.Status {
|
||||
info.State = req.State
|
||||
|
||||
switch req.State.Status {
|
||||
case livekit.IngressState_ENDPOINT_ERROR,
|
||||
livekit.IngressState_ENDPOINT_INACTIVE,
|
||||
livekit.IngressState_ENDPOINT_COMPLETE:
|
||||
s.telemetry.IngressEnded(ctx, info)
|
||||
|
||||
if req.State.Error != "" {
|
||||
logger.Infow("ingress failed", "error", req.State.Error, "ingressID", req.IngressId)
|
||||
} else {
|
||||
logger.Infow("ingress ended", "ingressID", req.IngressId)
|
||||
}
|
||||
|
||||
case livekit.IngressState_ENDPOINT_PUBLISHING:
|
||||
s.telemetry.IngressStarted(ctx, info)
|
||||
|
||||
logger.Infow("ingress started", "ingressID", req.IngressId)
|
||||
|
||||
case livekit.IngressState_ENDPOINT_BUFFERING:
|
||||
s.telemetry.IngressUpdated(ctx, info)
|
||||
|
||||
logger.Infow("ingress buffering", "ingressID", req.IngressId)
|
||||
}
|
||||
} else {
|
||||
// Status didn't change, send Updated event
|
||||
info.State = req.State
|
||||
|
||||
s.telemetry.IngressUpdated(ctx, info)
|
||||
|
||||
logger.Infow("ingress updated", "ingressID", req.IngressId, "status", info.State.Status)
|
||||
}
|
||||
|
||||
return &emptypb.Empty{}, nil
|
||||
}
|
||||
|
||||
func (s *IOInfoService) Stop() {
|
||||
close(s.shutdown)
|
||||
|
||||
if s.ioServer != nil {
|
||||
s.ioServer.Shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,90 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"github.com/livekit/protocol/livekit"
|
||||
"github.com/livekit/protocol/logger"
|
||||
"github.com/livekit/protocol/rpc"
|
||||
"google.golang.org/protobuf/types/known/emptypb"
|
||||
)
|
||||
|
||||
func (s *IOInfoService) CreateIngress(ctx context.Context, info *livekit.IngressInfo) (*emptypb.Empty, error) {
|
||||
err := s.is.StoreIngress(ctx, info)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s.telemetry.IngressCreated(ctx, info)
|
||||
|
||||
return &emptypb.Empty{}, nil
|
||||
}
|
||||
|
||||
func (s *IOInfoService) GetIngressInfo(ctx context.Context, req *rpc.GetIngressInfoRequest) (*rpc.GetIngressInfoResponse, error) {
|
||||
info, err := s.loadIngressFromInfoRequest(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &rpc.GetIngressInfoResponse{Info: info}, nil
|
||||
}
|
||||
|
||||
func (s *IOInfoService) loadIngressFromInfoRequest(req *rpc.GetIngressInfoRequest) (info *livekit.IngressInfo, err error) {
|
||||
if req.IngressId != "" {
|
||||
info, err = s.is.LoadIngress(context.Background(), req.IngressId)
|
||||
} else if req.StreamKey != "" {
|
||||
info, err = s.is.LoadIngressFromStreamKey(context.Background(), req.StreamKey)
|
||||
} else {
|
||||
err = errors.New("request needs to specify either IngressId or StreamKey")
|
||||
}
|
||||
return info, err
|
||||
}
|
||||
|
||||
func (s *IOInfoService) UpdateIngressState(ctx context.Context, req *rpc.UpdateIngressStateRequest) (*emptypb.Empty, error) {
|
||||
info, err := s.is.LoadIngress(ctx, req.IngressId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = s.is.UpdateIngressState(ctx, req.IngressId, req.State); err != nil {
|
||||
logger.Errorw("could not update ingress", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if info.State.Status != req.State.Status {
|
||||
info.State = req.State
|
||||
|
||||
switch req.State.Status {
|
||||
case livekit.IngressState_ENDPOINT_ERROR,
|
||||
livekit.IngressState_ENDPOINT_INACTIVE,
|
||||
livekit.IngressState_ENDPOINT_COMPLETE:
|
||||
s.telemetry.IngressEnded(ctx, info)
|
||||
|
||||
if req.State.Error != "" {
|
||||
logger.Infow("ingress failed", "error", req.State.Error, "ingressID", req.IngressId)
|
||||
} else {
|
||||
logger.Infow("ingress ended", "ingressID", req.IngressId)
|
||||
}
|
||||
|
||||
case livekit.IngressState_ENDPOINT_PUBLISHING:
|
||||
s.telemetry.IngressStarted(ctx, info)
|
||||
|
||||
logger.Infow("ingress started", "ingressID", req.IngressId)
|
||||
|
||||
case livekit.IngressState_ENDPOINT_BUFFERING:
|
||||
s.telemetry.IngressUpdated(ctx, info)
|
||||
|
||||
logger.Infow("ingress buffering", "ingressID", req.IngressId)
|
||||
}
|
||||
} else {
|
||||
// Status didn't change, send Updated event
|
||||
info.State = req.State
|
||||
|
||||
s.telemetry.IngressUpdated(ctx, info)
|
||||
|
||||
logger.Infow("ingress state updated", "ingressID", req.IngressId, "status", info.State.Status)
|
||||
}
|
||||
|
||||
return &emptypb.Empty{}, nil
|
||||
}
|
||||
@@ -26,6 +26,20 @@ type FakeIOClient struct {
|
||||
result1 *emptypb.Empty
|
||||
result2 error
|
||||
}
|
||||
CreateIngressStub func(context.Context, *livekit.IngressInfo) (*emptypb.Empty, error)
|
||||
createIngressMutex sync.RWMutex
|
||||
createIngressArgsForCall []struct {
|
||||
arg1 context.Context
|
||||
arg2 *livekit.IngressInfo
|
||||
}
|
||||
createIngressReturns struct {
|
||||
result1 *emptypb.Empty
|
||||
result2 error
|
||||
}
|
||||
createIngressReturnsOnCall map[int]struct {
|
||||
result1 *emptypb.Empty
|
||||
result2 error
|
||||
}
|
||||
GetEgressStub func(context.Context, *rpc.GetEgressRequest) (*livekit.EgressInfo, error)
|
||||
getEgressMutex sync.RWMutex
|
||||
getEgressArgsForCall []struct {
|
||||
@@ -54,6 +68,20 @@ type FakeIOClient struct {
|
||||
result1 *livekit.ListEgressResponse
|
||||
result2 error
|
||||
}
|
||||
UpdateIngressStateStub func(context.Context, *rpc.UpdateIngressStateRequest) (*emptypb.Empty, error)
|
||||
updateIngressStateMutex sync.RWMutex
|
||||
updateIngressStateArgsForCall []struct {
|
||||
arg1 context.Context
|
||||
arg2 *rpc.UpdateIngressStateRequest
|
||||
}
|
||||
updateIngressStateReturns struct {
|
||||
result1 *emptypb.Empty
|
||||
result2 error
|
||||
}
|
||||
updateIngressStateReturnsOnCall map[int]struct {
|
||||
result1 *emptypb.Empty
|
||||
result2 error
|
||||
}
|
||||
invocations map[string][][]interface{}
|
||||
invocationsMutex sync.RWMutex
|
||||
}
|
||||
@@ -123,6 +151,71 @@ func (fake *FakeIOClient) CreateEgressReturnsOnCall(i int, result1 *emptypb.Empt
|
||||
}{result1, result2}
|
||||
}
|
||||
|
||||
func (fake *FakeIOClient) CreateIngress(arg1 context.Context, arg2 *livekit.IngressInfo) (*emptypb.Empty, error) {
|
||||
fake.createIngressMutex.Lock()
|
||||
ret, specificReturn := fake.createIngressReturnsOnCall[len(fake.createIngressArgsForCall)]
|
||||
fake.createIngressArgsForCall = append(fake.createIngressArgsForCall, struct {
|
||||
arg1 context.Context
|
||||
arg2 *livekit.IngressInfo
|
||||
}{arg1, arg2})
|
||||
stub := fake.CreateIngressStub
|
||||
fakeReturns := fake.createIngressReturns
|
||||
fake.recordInvocation("CreateIngress", []interface{}{arg1, arg2})
|
||||
fake.createIngressMutex.Unlock()
|
||||
if stub != nil {
|
||||
return stub(arg1, arg2)
|
||||
}
|
||||
if specificReturn {
|
||||
return ret.result1, ret.result2
|
||||
}
|
||||
return fakeReturns.result1, fakeReturns.result2
|
||||
}
|
||||
|
||||
func (fake *FakeIOClient) CreateIngressCallCount() int {
|
||||
fake.createIngressMutex.RLock()
|
||||
defer fake.createIngressMutex.RUnlock()
|
||||
return len(fake.createIngressArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *FakeIOClient) CreateIngressCalls(stub func(context.Context, *livekit.IngressInfo) (*emptypb.Empty, error)) {
|
||||
fake.createIngressMutex.Lock()
|
||||
defer fake.createIngressMutex.Unlock()
|
||||
fake.CreateIngressStub = stub
|
||||
}
|
||||
|
||||
func (fake *FakeIOClient) CreateIngressArgsForCall(i int) (context.Context, *livekit.IngressInfo) {
|
||||
fake.createIngressMutex.RLock()
|
||||
defer fake.createIngressMutex.RUnlock()
|
||||
argsForCall := fake.createIngressArgsForCall[i]
|
||||
return argsForCall.arg1, argsForCall.arg2
|
||||
}
|
||||
|
||||
func (fake *FakeIOClient) CreateIngressReturns(result1 *emptypb.Empty, result2 error) {
|
||||
fake.createIngressMutex.Lock()
|
||||
defer fake.createIngressMutex.Unlock()
|
||||
fake.CreateIngressStub = nil
|
||||
fake.createIngressReturns = struct {
|
||||
result1 *emptypb.Empty
|
||||
result2 error
|
||||
}{result1, result2}
|
||||
}
|
||||
|
||||
func (fake *FakeIOClient) CreateIngressReturnsOnCall(i int, result1 *emptypb.Empty, result2 error) {
|
||||
fake.createIngressMutex.Lock()
|
||||
defer fake.createIngressMutex.Unlock()
|
||||
fake.CreateIngressStub = nil
|
||||
if fake.createIngressReturnsOnCall == nil {
|
||||
fake.createIngressReturnsOnCall = make(map[int]struct {
|
||||
result1 *emptypb.Empty
|
||||
result2 error
|
||||
})
|
||||
}
|
||||
fake.createIngressReturnsOnCall[i] = struct {
|
||||
result1 *emptypb.Empty
|
||||
result2 error
|
||||
}{result1, result2}
|
||||
}
|
||||
|
||||
func (fake *FakeIOClient) GetEgress(arg1 context.Context, arg2 *rpc.GetEgressRequest) (*livekit.EgressInfo, error) {
|
||||
fake.getEgressMutex.Lock()
|
||||
ret, specificReturn := fake.getEgressReturnsOnCall[len(fake.getEgressArgsForCall)]
|
||||
@@ -253,15 +346,84 @@ func (fake *FakeIOClient) ListEgressReturnsOnCall(i int, result1 *livekit.ListEg
|
||||
}{result1, result2}
|
||||
}
|
||||
|
||||
func (fake *FakeIOClient) UpdateIngressState(arg1 context.Context, arg2 *rpc.UpdateIngressStateRequest) (*emptypb.Empty, error) {
|
||||
fake.updateIngressStateMutex.Lock()
|
||||
ret, specificReturn := fake.updateIngressStateReturnsOnCall[len(fake.updateIngressStateArgsForCall)]
|
||||
fake.updateIngressStateArgsForCall = append(fake.updateIngressStateArgsForCall, struct {
|
||||
arg1 context.Context
|
||||
arg2 *rpc.UpdateIngressStateRequest
|
||||
}{arg1, arg2})
|
||||
stub := fake.UpdateIngressStateStub
|
||||
fakeReturns := fake.updateIngressStateReturns
|
||||
fake.recordInvocation("UpdateIngressState", []interface{}{arg1, arg2})
|
||||
fake.updateIngressStateMutex.Unlock()
|
||||
if stub != nil {
|
||||
return stub(arg1, arg2)
|
||||
}
|
||||
if specificReturn {
|
||||
return ret.result1, ret.result2
|
||||
}
|
||||
return fakeReturns.result1, fakeReturns.result2
|
||||
}
|
||||
|
||||
func (fake *FakeIOClient) UpdateIngressStateCallCount() int {
|
||||
fake.updateIngressStateMutex.RLock()
|
||||
defer fake.updateIngressStateMutex.RUnlock()
|
||||
return len(fake.updateIngressStateArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *FakeIOClient) UpdateIngressStateCalls(stub func(context.Context, *rpc.UpdateIngressStateRequest) (*emptypb.Empty, error)) {
|
||||
fake.updateIngressStateMutex.Lock()
|
||||
defer fake.updateIngressStateMutex.Unlock()
|
||||
fake.UpdateIngressStateStub = stub
|
||||
}
|
||||
|
||||
func (fake *FakeIOClient) UpdateIngressStateArgsForCall(i int) (context.Context, *rpc.UpdateIngressStateRequest) {
|
||||
fake.updateIngressStateMutex.RLock()
|
||||
defer fake.updateIngressStateMutex.RUnlock()
|
||||
argsForCall := fake.updateIngressStateArgsForCall[i]
|
||||
return argsForCall.arg1, argsForCall.arg2
|
||||
}
|
||||
|
||||
func (fake *FakeIOClient) UpdateIngressStateReturns(result1 *emptypb.Empty, result2 error) {
|
||||
fake.updateIngressStateMutex.Lock()
|
||||
defer fake.updateIngressStateMutex.Unlock()
|
||||
fake.UpdateIngressStateStub = nil
|
||||
fake.updateIngressStateReturns = struct {
|
||||
result1 *emptypb.Empty
|
||||
result2 error
|
||||
}{result1, result2}
|
||||
}
|
||||
|
||||
func (fake *FakeIOClient) UpdateIngressStateReturnsOnCall(i int, result1 *emptypb.Empty, result2 error) {
|
||||
fake.updateIngressStateMutex.Lock()
|
||||
defer fake.updateIngressStateMutex.Unlock()
|
||||
fake.UpdateIngressStateStub = nil
|
||||
if fake.updateIngressStateReturnsOnCall == nil {
|
||||
fake.updateIngressStateReturnsOnCall = make(map[int]struct {
|
||||
result1 *emptypb.Empty
|
||||
result2 error
|
||||
})
|
||||
}
|
||||
fake.updateIngressStateReturnsOnCall[i] = struct {
|
||||
result1 *emptypb.Empty
|
||||
result2 error
|
||||
}{result1, result2}
|
||||
}
|
||||
|
||||
func (fake *FakeIOClient) Invocations() map[string][][]interface{} {
|
||||
fake.invocationsMutex.RLock()
|
||||
defer fake.invocationsMutex.RUnlock()
|
||||
fake.createEgressMutex.RLock()
|
||||
defer fake.createEgressMutex.RUnlock()
|
||||
fake.createIngressMutex.RLock()
|
||||
defer fake.createIngressMutex.RUnlock()
|
||||
fake.getEgressMutex.RLock()
|
||||
defer fake.getEgressMutex.RUnlock()
|
||||
fake.listEgressMutex.RLock()
|
||||
defer fake.listEgressMutex.RUnlock()
|
||||
fake.updateIngressStateMutex.RLock()
|
||||
defer fake.updateIngressStateMutex.RUnlock()
|
||||
copiedInvocations := map[string][][]interface{}{}
|
||||
for key, value := range fake.invocations {
|
||||
copiedInvocations[key] = value
|
||||
|
||||
@@ -106,7 +106,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ingressService := NewIngressService(ingressConfig, nodeID, messageBus, ingressClient, ingressStore, roomService, telemetryService)
|
||||
ingressService := NewIngressService(ingressConfig, nodeID, messageBus, ingressClient, ingressStore, ioInfoService, roomService, telemetryService)
|
||||
sipConfig := getSIPConfig(conf)
|
||||
sipClient, err := rpc.NewSIPClient(messageBus)
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user