diff --git a/pkg/config/config.go b/pkg/config/config.go index f97930057..09587d1c6 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -244,7 +244,6 @@ type EgressConfig struct { type IngressConfig struct { RTMPBaseURL string `yaml:"rtmp_base_url"` - UsePsRPC bool `yaml:"use_psrpc"` } // not exposed to YAML diff --git a/pkg/service/ingress.go b/pkg/service/ingress.go index 9a6960c66..6d66474a9 100644 --- a/pkg/service/ingress.go +++ b/pkg/service/ingress.go @@ -2,11 +2,9 @@ package service import ( "context" - "time" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/telemetry" - "github.com/livekit/protocol/ingress" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" @@ -14,17 +12,11 @@ import ( "github.com/livekit/psrpc" ) -var ( - initialTimeout = time.Second * 3 - retryTimeout = time.Minute * 1 -) - type IngressService struct { conf *config.IngressConfig nodeID livekit.NodeID bus psrpc.MessageBus psrpcClient rpc.IngressClient - rpcClient ingress.RPCClient store IngressStore roomService livekit.RoomService telemetry telemetry.TelemetryService @@ -35,7 +27,6 @@ func NewIngressService( nodeID livekit.NodeID, bus psrpc.MessageBus, psrpcClient rpc.IngressClient, - rpcClient ingress.RPCClient, store IngressStore, rs livekit.RoomService, ts telemetry.TelemetryService, @@ -46,7 +37,6 @@ func NewIngressService( nodeID: nodeID, bus: bus, psrpcClient: psrpcClient, - rpcClient: rpcClient, store: store, roomService: rs, telemetry: ts, @@ -100,7 +90,7 @@ func (s *IngressService) CreateIngressWithUrlPrefix(ctx context.Context, urlPref State: &livekit.IngressState{}, } - if err := s.store.StoreIngress(ctx, info); err != nil { + if err = s.store.StoreIngress(ctx, info); err != nil { logger.Errorw("could not write ingress info", err) return nil, err } @@ -108,41 +98,6 @@ func (s *IngressService) CreateIngressWithUrlPrefix(ctx context.Context, urlPref return info, nil } -func (s *IngressService) sendRPCWithRetry(ctx context.Context, req *livekit.IngressRequest) (*livekit.IngressState, error) { - type result struct { - state *livekit.IngressState - err error - } - - resChan := make(chan result, 1) - - go func() { - cctx, _ := context.WithTimeout(context.Background(), retryTimeout) - - for { - select { - case <-cctx.Done(): - resChan <- result{nil, ingress.ErrNoResponse} - return - default: - } - - s, err := s.rpcClient.SendRequest(cctx, req) - if err != ingress.ErrNoResponse { - resChan <- result{s, err} - return - } - } - }() - - select { - case res := <-resChan: - return res.state, res.err - case <-time.After(initialTimeout): - return nil, ingress.ErrNoResponse - } -} - func (s *IngressService) UpdateIngress(ctx context.Context, req *livekit.UpdateIngressRequest) (*livekit.IngressInfo, error) { fields := []interface{}{ "ingress", req.IngressId, @@ -157,7 +112,7 @@ func (s *IngressService) UpdateIngress(ctx context.Context, req *livekit.UpdateI return nil, twirpAuthError(err) } - if s.psrpcClient == nil && s.rpcClient == nil { + if s.psrpcClient == nil { return nil, ErrIngressNotConnected } @@ -199,21 +154,7 @@ func (s *IngressService) UpdateIngress(ctx context.Context, req *livekit.UpdateI case livekit.IngressState_ENDPOINT_BUFFERING, livekit.IngressState_ENDPOINT_PUBLISHING: // Do not update store the returned state as the ingress service will do it - race := rpc.NewRace[livekit.IngressState](ctx) - if s.rpcClient != nil { - race.Go(func(ctx context.Context) (*livekit.IngressState, error) { - return s.sendRPCWithRetry(ctx, &livekit.IngressRequest{ - IngressId: req.IngressId, - Request: &livekit.IngressRequest_Update{Update: req}, - }) - }) - } - if s.psrpcClient != nil { - race.Go(func(ctx context.Context) (*livekit.IngressState, error) { - return s.psrpcClient.UpdateIngress(ctx, req.IngressId, req) - }) - } - if _, _, err := race.Wait(); err != nil { + if _, err = s.psrpcClient.UpdateIngress(ctx, req.IngressId, req); err != nil { logger.Warnw("could not update active ingress", err) } } @@ -252,7 +193,7 @@ func (s *IngressService) DeleteIngress(ctx context.Context, req *livekit.DeleteI return nil, twirpAuthError(err) } - if s.psrpcClient == nil && s.rpcClient == nil { + if s.psrpcClient == nil { return nil, ErrIngressNotConnected } @@ -264,21 +205,7 @@ func (s *IngressService) DeleteIngress(ctx context.Context, req *livekit.DeleteI switch info.State.Status { case livekit.IngressState_ENDPOINT_BUFFERING, livekit.IngressState_ENDPOINT_PUBLISHING: - race := rpc.NewRace[livekit.IngressState](ctx) - if s.rpcClient != nil { - race.Go(func(ctx context.Context) (*livekit.IngressState, error) { - return s.sendRPCWithRetry(ctx, &livekit.IngressRequest{ - IngressId: req.IngressId, - Request: &livekit.IngressRequest_Delete{Delete: req}, - }) - }) - } - if s.psrpcClient != nil { - race.Go(func(ctx context.Context) (*livekit.IngressState, error) { - return s.psrpcClient.DeleteIngress(ctx, req.IngressId, req) - }) - } - if _, _, err := race.Wait(); err != nil { + if _, err = s.psrpcClient.DeleteIngress(ctx, req.IngressId, req); err != nil { logger.Warnw("could not stop active ingress", err) } } diff --git a/pkg/service/ioinfo.go b/pkg/service/ioinfo.go index 11f5381b3..345f41f7e 100644 --- a/pkg/service/ioinfo.go +++ b/pkg/service/ioinfo.go @@ -10,7 +10,6 @@ import ( "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/protocol/egress" - "github.com/livekit/protocol/ingress" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" @@ -23,7 +22,6 @@ type IOInfoService struct { is IngressStore telemetry telemetry.TelemetryService ecDeprecated egress.RPCClient - icDeprecated ingress.RPCClient shutdown chan struct{} } @@ -34,14 +32,12 @@ func NewIOInfoService( is IngressStore, ts telemetry.TelemetryService, ec egress.RPCClient, - ic ingress.RPCClient, ) (*IOInfoService, error) { s := &IOInfoService{ es: es, is: is, telemetry: ts, ecDeprecated: ec, - icDeprecated: ic, shutdown: make(chan struct{}), } @@ -66,7 +62,6 @@ func (s *IOInfoService) Start() error { } go s.egressWorkerDeprecated() - go s.ingressWorkerDeprecated() } return nil @@ -180,72 +175,3 @@ func (s *IOInfoService) egressWorkerDeprecated() error { return nil } - -// Deprecated -func (s *IOInfoService) ingressWorkerDeprecated() { - if s.icDeprecated == nil { - return - } - - updates, err := s.icDeprecated.GetUpdateChannel(context.Background()) - if err != nil { - logger.Errorw("failed to subscribe to results channel", err) - return - } - - entities, err := s.icDeprecated.GetEntityChannel(context.Background()) - if err != nil { - logger.Errorw("failed to subscribe to entities channel", err) - _ = updates.Close() - return - } - - updateChan := updates.Channel() - entityChan := entities.Channel() - for { - select { - case msg := <-updateChan: - b := updates.Payload(msg) - - res := &livekit.UpdateIngressStateRequest{} - if err = proto.Unmarshal(b, res); err != nil { - logger.Errorw("failed to read results", err) - continue - } - - // save updated info to store - err = s.is.UpdateIngressState(context.Background(), res.IngressId, res.State) - if err != nil { - logger.Errorw("could not update ingress", err) - } - - case msg := <-entityChan: - b := entities.Payload(msg) - - req := &livekit.GetIngressInfoRequest{} - if err = proto.Unmarshal(b, req); err != nil { - logger.Errorw("failed to read request", err) - continue - } - - info, err := s.loadIngressFromInfoRequest(&rpc.GetIngressInfoRequest{ - IngressId: req.IngressId, - StreamKey: req.StreamKey, - }) - if err != nil { - logger.Errorw("failed to load ingress info", err) - continue - } - - err = s.icDeprecated.SendGetIngressInfoResponse(context.Background(), req, &livekit.GetIngressInfoResponse{Info: info}, err) - if err != nil { - logger.Errorw("could not send response", err) - } - - case <-s.shutdown: - _ = updates.Close() - _ = entities.Close() - return - } - } -} diff --git a/pkg/service/wire.go b/pkg/service/wire.go index 24e718c63..9626a7f1a 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -20,7 +20,6 @@ import ( "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/protocol/auth" "github.com/livekit/protocol/egress" - "github.com/livekit/protocol/ingress" "github.com/livekit/protocol/livekit" redisLiveKit "github.com/livekit/protocol/redis" "github.com/livekit/protocol/rpc" @@ -51,11 +50,9 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live getEgressStore, NewEgressLauncher, NewEgressService, - ingress.NewRedisRPC, - getIngressClient, + rpc.NewIngressClient, getIngressStore, getIngressConfig, - getIngressRPCClient, NewIngressService, NewRoomAllocator, NewRoomService, @@ -161,14 +158,6 @@ func getEgressStore(s ObjectStore) EgressStore { } } -func getIngressClient(conf *config.Config, nodeID livekit.NodeID, bus psrpc.MessageBus) (rpc.IngressClient, error) { - if conf.Ingress.UsePsRPC { - return rpc.NewIngressClient(nodeID, bus) - } - - return nil, nil -} - func getIngressStore(s ObjectStore) IngressStore { switch store := s.(type) { case *RedisStore: @@ -182,10 +171,6 @@ func getIngressConfig(conf *config.Config) *config.IngressConfig { return &conf.Ingress } -func getIngressRPCClient(rpc ingress.RPC) ingress.RPCClient { - return rpc -} - func createClientConfiguration() clientconfiguration.ClientConfigurationManager { return clientconfiguration.NewStaticClientConfigurationManager(clientconfiguration.StaticConfigurations) } diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 986ce89c7..31e0f5fc7 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -15,7 +15,6 @@ import ( "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/protocol/auth" "github.com/livekit/protocol/egress" - "github.com/livekit/protocol/ingress" "github.com/livekit/protocol/livekit" redis2 "github.com/livekit/protocol/redis" "github.com/livekit/protocol/rpc" @@ -72,15 +71,13 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } egressService := NewEgressService(egressClient, rpcClient, objectStore, egressStore, roomService, telemetryService, rtcEgressLauncher) ingressConfig := getIngressConfig(conf) - ingressClient, err := getIngressClient(conf, nodeID, messageBus) + ingressClient, err := rpc.NewIngressClient(nodeID, messageBus) if err != nil { return nil, err } - rpc := ingress.NewRedisRPC(nodeID, universalClient) - ingressRPCClient := getIngressRPCClient(rpc) ingressStore := getIngressStore(objectStore) - ingressService := NewIngressService(ingressConfig, nodeID, messageBus, ingressClient, ingressRPCClient, ingressStore, roomService, telemetryService) - ioInfoService, err := NewIOInfoService(nodeID, messageBus, egressStore, ingressStore, telemetryService, rpcClient, ingressRPCClient) + ingressService := NewIngressService(ingressConfig, nodeID, messageBus, ingressClient, ingressStore, roomService, telemetryService) + ioInfoService, err := NewIOInfoService(nodeID, messageBus, egressStore, ingressStore, telemetryService, rpcClient) if err != nil { return nil, err } @@ -197,14 +194,6 @@ func getEgressStore(s ObjectStore) EgressStore { } } -func getIngressClient(conf *config.Config, nodeID livekit.NodeID, bus psrpc.MessageBus) (rpc.IngressClient, error) { - if conf.Ingress.UsePsRPC { - return rpc.NewIngressClient(nodeID, bus) - } - - return nil, nil -} - func getIngressStore(s ObjectStore) IngressStore { switch store := s.(type) { case *RedisStore: @@ -218,10 +207,6 @@ func getIngressConfig(conf *config.Config) *config.IngressConfig { return &conf.Ingress } -func getIngressRPCClient(rpc2 ingress.RPC) ingress.RPCClient { - return rpc2 -} - func createClientConfiguration() clientconfiguration.ClientConfigurationManager { return clientconfiguration.NewStaticClientConfigurationManager(clientconfiguration.StaticConfigurations) }