From 1f1eeb6832d710d9944ed7e6cc7eac288da9662d Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Wed, 25 Mar 2026 11:09:52 +0800 Subject: [PATCH] Fallback to servicestore if rpc is unavailable (#4391) * Fallback to servicestore if rpc is unavailable compatibility mode for #4387 * conf --- pkg/config/config.go | 6 ++++++ pkg/service/interfaces.go | 2 ++ pkg/service/localstore.go | 2 ++ pkg/service/redisstore.go | 2 ++ pkg/service/roomservice.go | 29 +++++++++++++++++++++++++---- pkg/service/wire.go | 6 +++++- pkg/service/wire_gen.go | 6 +++++- 7 files changed, 47 insertions(+), 6 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 12411b727..cf7814303 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -88,6 +88,8 @@ type Config struct { NodeStats NodeStatsConfig `yaml:"node_stats,omitempty"` EnableDataTracks bool `yaml:"enable_data_tracks,omitempty"` + + API APIConfig `yaml:"api,omitempty"` } type RTCConfig struct { @@ -308,6 +310,9 @@ type APIConfig struct { // max amount of time to wait before checking for operation complete MaxCheckInterval time.Duration `yaml:"max_check_interval,omitempty"` + + // Backwards compatibility for room service api calls, will enable by default and remove in a future release + EnablePsrpcForGetListParticpants bool `yaml:"enable_psrpc_for_get_list_participants,omitempty"` } type PrometheusConfig struct { @@ -434,6 +439,7 @@ var DefaultConfig = Config{ Metric: metric.DefaultMetricConfig, WebHook: webhook.DefaultWebHookConfig, NodeStats: DefaultNodeStatsConfig, + API: DefaultAPIConfig(), } func NewConfig(confString string, strictMode bool, c *cli.Command, baseFlags []cli.Flag) (*Config, error) { diff --git a/pkg/service/interfaces.go b/pkg/service/interfaces.go index 73bba2b6c..22a8fddb4 100644 --- a/pkg/service/interfaces.go +++ b/pkg/service/interfaces.go @@ -54,6 +54,8 @@ type ServiceStore interface { type OSSServiceStore interface { DeleteRoom(ctx context.Context, roomName livekit.RoomName) error HasParticipant(context.Context, livekit.RoomName, livekit.ParticipantIdentity) (bool, error) + LoadParticipant(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity) (*livekit.ParticipantInfo, error) + ListParticipants(ctx context.Context, roomName livekit.RoomName) ([]*livekit.ParticipantInfo, error) } //counterfeiter:generate . EgressStore diff --git a/pkg/service/localstore.go b/pkg/service/localstore.go index 447985756..1cf747f94 100644 --- a/pkg/service/localstore.go +++ b/pkg/service/localstore.go @@ -25,6 +25,8 @@ import ( "github.com/livekit/protocol/utils" ) +var _ OSSServiceStore = (*LocalStore)(nil) + // encapsulates CRUD operations for room settings type LocalStore struct { // map of roomName => room diff --git a/pkg/service/redisstore.go b/pkg/service/redisstore.go index 8d9387f80..292ed9524 100644 --- a/pkg/service/redisstore.go +++ b/pkg/service/redisstore.go @@ -68,6 +68,8 @@ const ( maxRetries = 5 ) +var _ OSSServiceStore = (*RedisStore)(nil) + type RedisStore struct { rc redis.UniversalClient unlockScript *redis.Script diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index d1f317163..3579eecc2 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -27,6 +27,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" + "github.com/livekit/psrpc" ) type RoomService struct { @@ -153,7 +154,7 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq return res, err } -func (s *RoomService) ListParticipants(ctx context.Context, req *livekit.ListParticipantsRequest) (*livekit.ListParticipantsResponse, error) { +func (s *RoomService) ListParticipants(ctx context.Context, req *livekit.ListParticipantsRequest) (res *livekit.ListParticipantsResponse, err error) { RecordRequest(ctx, req) AppendLogFields(ctx, "room", req.Room) @@ -161,7 +162,20 @@ func (s *RoomService) ListParticipants(ctx context.Context, req *livekit.ListPar return nil, twirpAuthError(err) } - res, err := s.roomClient.ListParticipants(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req) + if s.apiConf.EnablePsrpcForGetListParticpants { + res, err = s.roomClient.ListParticipants(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req) + } else if store, ok := s.roomStore.(OSSServiceStore); ok { + var participants []*livekit.ParticipantInfo + participants, err = store.ListParticipants(ctx, livekit.RoomName(req.Room)) + if err == nil { + res = &livekit.ListParticipantsResponse{ + Participants: participants, + } + } + } else { + err = psrpc.ErrUnimplemented + } + if err != nil { return nil, err } @@ -170,7 +184,7 @@ func (s *RoomService) ListParticipants(ctx context.Context, req *livekit.ListPar return res, nil } -func (s *RoomService) GetParticipant(ctx context.Context, req *livekit.RoomParticipantIdentity) (*livekit.ParticipantInfo, error) { +func (s *RoomService) GetParticipant(ctx context.Context, req *livekit.RoomParticipantIdentity) (participant *livekit.ParticipantInfo, err error) { RecordRequest(ctx, req) AppendLogFields(ctx, "room", req.Room, "participant", req.Identity) @@ -178,7 +192,14 @@ func (s *RoomService) GetParticipant(ctx context.Context, req *livekit.RoomParti return nil, twirpAuthError(err) } - participant, err := s.roomClient.GetParticipant(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req) + if s.apiConf.EnablePsrpcForGetListParticpants { + participant, err = s.roomClient.GetParticipant(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req) + } else if store, ok := s.roomStore.(OSSServiceStore); ok { + participant, err = store.LoadParticipant(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity)) + } else { + err = psrpc.ErrUnimplemented + } + if err != nil { return nil, err } diff --git a/pkg/service/wire.go b/pkg/service/wire.go index 556f203a6..efcaa2bcc 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -56,7 +56,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live getNodeStatsConfig, routing.CreateRouter, getLimitConf, - config.DefaultAPIConfig, + getAPIConf, wire.Bind(new(routing.MessageRouter), new(routing.Router)), wire.Bind(new(livekit.RoomService), new(*RoomService)), telemetry.NewAnalyticsService, @@ -291,3 +291,7 @@ func getNodeStatsConfig(config *config.Config) config.NodeStatsConfig { func getAgentConfig(config *config.Config) agent.Config { return config.Agents } + +func getAPIConf(config *config.Config) config.APIConfig { + return config.API +} diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index f130ab902..2a3a7590c 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -37,7 +37,7 @@ import ( func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*LivekitServer, error) { limitConfig := getLimitConf(conf) - apiConfig := config.DefaultAPIConfig() + apiConfig := getAPIConf(conf) universalClient, err := createRedisClient(conf) if err != nil { return nil, err @@ -349,3 +349,7 @@ func getNodeStatsConfig(config2 *config.Config) config.NodeStatsConfig { func getAgentConfig(config2 *config.Config) agent.Config { return config2.Agents } + +func getAPIConf(config2 *config.Config) config.APIConfig { + return config2.API +}