Remove deprecated ingress rpc (#1439)

* remove legacy ingress rpcs

* remove from io service
This commit is contained in:
David Colburn
2023-02-17 11:40:38 -08:00
committed by GitHub
parent 67b259bda4
commit 6da9e85636
5 changed files with 9 additions and 187 deletions
-1
View File
@@ -244,7 +244,6 @@ type EgressConfig struct {
type IngressConfig struct {
RTMPBaseURL string `yaml:"rtmp_base_url"`
UsePsRPC bool `yaml:"use_psrpc"`
}
// not exposed to YAML
+5 -78
View File
@@ -2,11 +2,9 @@ package service
import (
"context"
"time"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/protocol/ingress"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
@@ -14,17 +12,11 @@ import (
"github.com/livekit/psrpc"
)
var (
initialTimeout = time.Second * 3
retryTimeout = time.Minute * 1
)
type IngressService struct {
conf *config.IngressConfig
nodeID livekit.NodeID
bus psrpc.MessageBus
psrpcClient rpc.IngressClient
rpcClient ingress.RPCClient
store IngressStore
roomService livekit.RoomService
telemetry telemetry.TelemetryService
@@ -35,7 +27,6 @@ func NewIngressService(
nodeID livekit.NodeID,
bus psrpc.MessageBus,
psrpcClient rpc.IngressClient,
rpcClient ingress.RPCClient,
store IngressStore,
rs livekit.RoomService,
ts telemetry.TelemetryService,
@@ -46,7 +37,6 @@ func NewIngressService(
nodeID: nodeID,
bus: bus,
psrpcClient: psrpcClient,
rpcClient: rpcClient,
store: store,
roomService: rs,
telemetry: ts,
@@ -100,7 +90,7 @@ func (s *IngressService) CreateIngressWithUrlPrefix(ctx context.Context, urlPref
State: &livekit.IngressState{},
}
if err := s.store.StoreIngress(ctx, info); err != nil {
if err = s.store.StoreIngress(ctx, info); err != nil {
logger.Errorw("could not write ingress info", err)
return nil, err
}
@@ -108,41 +98,6 @@ func (s *IngressService) CreateIngressWithUrlPrefix(ctx context.Context, urlPref
return info, nil
}
func (s *IngressService) sendRPCWithRetry(ctx context.Context, req *livekit.IngressRequest) (*livekit.IngressState, error) {
type result struct {
state *livekit.IngressState
err error
}
resChan := make(chan result, 1)
go func() {
cctx, _ := context.WithTimeout(context.Background(), retryTimeout)
for {
select {
case <-cctx.Done():
resChan <- result{nil, ingress.ErrNoResponse}
return
default:
}
s, err := s.rpcClient.SendRequest(cctx, req)
if err != ingress.ErrNoResponse {
resChan <- result{s, err}
return
}
}
}()
select {
case res := <-resChan:
return res.state, res.err
case <-time.After(initialTimeout):
return nil, ingress.ErrNoResponse
}
}
func (s *IngressService) UpdateIngress(ctx context.Context, req *livekit.UpdateIngressRequest) (*livekit.IngressInfo, error) {
fields := []interface{}{
"ingress", req.IngressId,
@@ -157,7 +112,7 @@ func (s *IngressService) UpdateIngress(ctx context.Context, req *livekit.UpdateI
return nil, twirpAuthError(err)
}
if s.psrpcClient == nil && s.rpcClient == nil {
if s.psrpcClient == nil {
return nil, ErrIngressNotConnected
}
@@ -199,21 +154,7 @@ func (s *IngressService) UpdateIngress(ctx context.Context, req *livekit.UpdateI
case livekit.IngressState_ENDPOINT_BUFFERING,
livekit.IngressState_ENDPOINT_PUBLISHING:
// Do not update store the returned state as the ingress service will do it
race := rpc.NewRace[livekit.IngressState](ctx)
if s.rpcClient != nil {
race.Go(func(ctx context.Context) (*livekit.IngressState, error) {
return s.sendRPCWithRetry(ctx, &livekit.IngressRequest{
IngressId: req.IngressId,
Request: &livekit.IngressRequest_Update{Update: req},
})
})
}
if s.psrpcClient != nil {
race.Go(func(ctx context.Context) (*livekit.IngressState, error) {
return s.psrpcClient.UpdateIngress(ctx, req.IngressId, req)
})
}
if _, _, err := race.Wait(); err != nil {
if _, err = s.psrpcClient.UpdateIngress(ctx, req.IngressId, req); err != nil {
logger.Warnw("could not update active ingress", err)
}
}
@@ -252,7 +193,7 @@ func (s *IngressService) DeleteIngress(ctx context.Context, req *livekit.DeleteI
return nil, twirpAuthError(err)
}
if s.psrpcClient == nil && s.rpcClient == nil {
if s.psrpcClient == nil {
return nil, ErrIngressNotConnected
}
@@ -264,21 +205,7 @@ func (s *IngressService) DeleteIngress(ctx context.Context, req *livekit.DeleteI
switch info.State.Status {
case livekit.IngressState_ENDPOINT_BUFFERING,
livekit.IngressState_ENDPOINT_PUBLISHING:
race := rpc.NewRace[livekit.IngressState](ctx)
if s.rpcClient != nil {
race.Go(func(ctx context.Context) (*livekit.IngressState, error) {
return s.sendRPCWithRetry(ctx, &livekit.IngressRequest{
IngressId: req.IngressId,
Request: &livekit.IngressRequest_Delete{Delete: req},
})
})
}
if s.psrpcClient != nil {
race.Go(func(ctx context.Context) (*livekit.IngressState, error) {
return s.psrpcClient.DeleteIngress(ctx, req.IngressId, req)
})
}
if _, _, err := race.Wait(); err != nil {
if _, err = s.psrpcClient.DeleteIngress(ctx, req.IngressId, req); err != nil {
logger.Warnw("could not stop active ingress", err)
}
}
-74
View File
@@ -10,7 +10,6 @@ import (
"github.com/livekit/livekit-server/pkg/telemetry"
"github.com/livekit/protocol/egress"
"github.com/livekit/protocol/ingress"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
@@ -23,7 +22,6 @@ type IOInfoService struct {
is IngressStore
telemetry telemetry.TelemetryService
ecDeprecated egress.RPCClient
icDeprecated ingress.RPCClient
shutdown chan struct{}
}
@@ -34,14 +32,12 @@ func NewIOInfoService(
is IngressStore,
ts telemetry.TelemetryService,
ec egress.RPCClient,
ic ingress.RPCClient,
) (*IOInfoService, error) {
s := &IOInfoService{
es: es,
is: is,
telemetry: ts,
ecDeprecated: ec,
icDeprecated: ic,
shutdown: make(chan struct{}),
}
@@ -66,7 +62,6 @@ func (s *IOInfoService) Start() error {
}
go s.egressWorkerDeprecated()
go s.ingressWorkerDeprecated()
}
return nil
@@ -180,72 +175,3 @@ func (s *IOInfoService) egressWorkerDeprecated() error {
return nil
}
// Deprecated
func (s *IOInfoService) ingressWorkerDeprecated() {
if s.icDeprecated == nil {
return
}
updates, err := s.icDeprecated.GetUpdateChannel(context.Background())
if err != nil {
logger.Errorw("failed to subscribe to results channel", err)
return
}
entities, err := s.icDeprecated.GetEntityChannel(context.Background())
if err != nil {
logger.Errorw("failed to subscribe to entities channel", err)
_ = updates.Close()
return
}
updateChan := updates.Channel()
entityChan := entities.Channel()
for {
select {
case msg := <-updateChan:
b := updates.Payload(msg)
res := &livekit.UpdateIngressStateRequest{}
if err = proto.Unmarshal(b, res); err != nil {
logger.Errorw("failed to read results", err)
continue
}
// save updated info to store
err = s.is.UpdateIngressState(context.Background(), res.IngressId, res.State)
if err != nil {
logger.Errorw("could not update ingress", err)
}
case msg := <-entityChan:
b := entities.Payload(msg)
req := &livekit.GetIngressInfoRequest{}
if err = proto.Unmarshal(b, req); err != nil {
logger.Errorw("failed to read request", err)
continue
}
info, err := s.loadIngressFromInfoRequest(&rpc.GetIngressInfoRequest{
IngressId: req.IngressId,
StreamKey: req.StreamKey,
})
if err != nil {
logger.Errorw("failed to load ingress info", err)
continue
}
err = s.icDeprecated.SendGetIngressInfoResponse(context.Background(), req, &livekit.GetIngressInfoResponse{Info: info}, err)
if err != nil {
logger.Errorw("could not send response", err)
}
case <-s.shutdown:
_ = updates.Close()
_ = entities.Close()
return
}
}
}
+1 -16
View File
@@ -20,7 +20,6 @@ import (
"github.com/livekit/livekit-server/pkg/utils"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/egress"
"github.com/livekit/protocol/ingress"
"github.com/livekit/protocol/livekit"
redisLiveKit "github.com/livekit/protocol/redis"
"github.com/livekit/protocol/rpc"
@@ -51,11 +50,9 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
getEgressStore,
NewEgressLauncher,
NewEgressService,
ingress.NewRedisRPC,
getIngressClient,
rpc.NewIngressClient,
getIngressStore,
getIngressConfig,
getIngressRPCClient,
NewIngressService,
NewRoomAllocator,
NewRoomService,
@@ -161,14 +158,6 @@ func getEgressStore(s ObjectStore) EgressStore {
}
}
func getIngressClient(conf *config.Config, nodeID livekit.NodeID, bus psrpc.MessageBus) (rpc.IngressClient, error) {
if conf.Ingress.UsePsRPC {
return rpc.NewIngressClient(nodeID, bus)
}
return nil, nil
}
func getIngressStore(s ObjectStore) IngressStore {
switch store := s.(type) {
case *RedisStore:
@@ -182,10 +171,6 @@ func getIngressConfig(conf *config.Config) *config.IngressConfig {
return &conf.Ingress
}
func getIngressRPCClient(rpc ingress.RPC) ingress.RPCClient {
return rpc
}
func createClientConfiguration() clientconfiguration.ClientConfigurationManager {
return clientconfiguration.NewStaticClientConfigurationManager(clientconfiguration.StaticConfigurations)
}
+3 -18
View File
@@ -15,7 +15,6 @@ import (
"github.com/livekit/livekit-server/pkg/utils"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/egress"
"github.com/livekit/protocol/ingress"
"github.com/livekit/protocol/livekit"
redis2 "github.com/livekit/protocol/redis"
"github.com/livekit/protocol/rpc"
@@ -72,15 +71,13 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
}
egressService := NewEgressService(egressClient, rpcClient, objectStore, egressStore, roomService, telemetryService, rtcEgressLauncher)
ingressConfig := getIngressConfig(conf)
ingressClient, err := getIngressClient(conf, nodeID, messageBus)
ingressClient, err := rpc.NewIngressClient(nodeID, messageBus)
if err != nil {
return nil, err
}
rpc := ingress.NewRedisRPC(nodeID, universalClient)
ingressRPCClient := getIngressRPCClient(rpc)
ingressStore := getIngressStore(objectStore)
ingressService := NewIngressService(ingressConfig, nodeID, messageBus, ingressClient, ingressRPCClient, ingressStore, roomService, telemetryService)
ioInfoService, err := NewIOInfoService(nodeID, messageBus, egressStore, ingressStore, telemetryService, rpcClient, ingressRPCClient)
ingressService := NewIngressService(ingressConfig, nodeID, messageBus, ingressClient, ingressStore, roomService, telemetryService)
ioInfoService, err := NewIOInfoService(nodeID, messageBus, egressStore, ingressStore, telemetryService, rpcClient)
if err != nil {
return nil, err
}
@@ -197,14 +194,6 @@ func getEgressStore(s ObjectStore) EgressStore {
}
}
func getIngressClient(conf *config.Config, nodeID livekit.NodeID, bus psrpc.MessageBus) (rpc.IngressClient, error) {
if conf.Ingress.UsePsRPC {
return rpc.NewIngressClient(nodeID, bus)
}
return nil, nil
}
func getIngressStore(s ObjectStore) IngressStore {
switch store := s.(type) {
case *RedisStore:
@@ -218,10 +207,6 @@ func getIngressConfig(conf *config.Config) *config.IngressConfig {
return &conf.Ingress
}
func getIngressRPCClient(rpc2 ingress.RPC) ingress.RPCClient {
return rpc2
}
func createClientConfiguration() clientconfiguration.ClientConfigurationManager {
return clientconfiguration.NewStaticClientConfigurationManager(clientconfiguration.StaticConfigurations)
}