Fallback to servicestore if rpc is unavailable (#4391)

* Fallback to servicestore if rpc is unavailable

compatibility mode for #4387

* conf
This commit is contained in:
cnderrauber
2026-03-25 11:09:52 +08:00
committed by GitHub
parent 59e9bb41b9
commit 1f1eeb6832
7 changed files with 47 additions and 6 deletions

View File

@@ -88,6 +88,8 @@ type Config struct {
NodeStats NodeStatsConfig `yaml:"node_stats,omitempty"`
EnableDataTracks bool `yaml:"enable_data_tracks,omitempty"`
API APIConfig `yaml:"api,omitempty"`
}
type RTCConfig struct {
@@ -308,6 +310,9 @@ type APIConfig struct {
// max amount of time to wait before checking for operation complete
MaxCheckInterval time.Duration `yaml:"max_check_interval,omitempty"`
// Backwards compatibility for room service api calls, will enable by default and remove in a future release
EnablePsrpcForGetListParticpants bool `yaml:"enable_psrpc_for_get_list_participants,omitempty"`
}
type PrometheusConfig struct {
@@ -434,6 +439,7 @@ var DefaultConfig = Config{
Metric: metric.DefaultMetricConfig,
WebHook: webhook.DefaultWebHookConfig,
NodeStats: DefaultNodeStatsConfig,
API: DefaultAPIConfig(),
}
func NewConfig(confString string, strictMode bool, c *cli.Command, baseFlags []cli.Flag) (*Config, error) {

View File

@@ -54,6 +54,8 @@ type ServiceStore interface {
type OSSServiceStore interface {
DeleteRoom(ctx context.Context, roomName livekit.RoomName) error
HasParticipant(context.Context, livekit.RoomName, livekit.ParticipantIdentity) (bool, error)
LoadParticipant(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity) (*livekit.ParticipantInfo, error)
ListParticipants(ctx context.Context, roomName livekit.RoomName) ([]*livekit.ParticipantInfo, error)
}
//counterfeiter:generate . EgressStore

View File

@@ -25,6 +25,8 @@ import (
"github.com/livekit/protocol/utils"
)
var _ OSSServiceStore = (*LocalStore)(nil)
// encapsulates CRUD operations for room settings
type LocalStore struct {
// map of roomName => room

View File

@@ -68,6 +68,8 @@ const (
maxRetries = 5
)
var _ OSSServiceStore = (*RedisStore)(nil)
type RedisStore struct {
rc redis.UniversalClient
unlockScript *redis.Script

View File

@@ -27,6 +27,7 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/psrpc"
)
type RoomService struct {
@@ -153,7 +154,7 @@ func (s *RoomService) DeleteRoom(ctx context.Context, req *livekit.DeleteRoomReq
return res, err
}
func (s *RoomService) ListParticipants(ctx context.Context, req *livekit.ListParticipantsRequest) (*livekit.ListParticipantsResponse, error) {
func (s *RoomService) ListParticipants(ctx context.Context, req *livekit.ListParticipantsRequest) (res *livekit.ListParticipantsResponse, err error) {
RecordRequest(ctx, req)
AppendLogFields(ctx, "room", req.Room)
@@ -161,7 +162,20 @@ func (s *RoomService) ListParticipants(ctx context.Context, req *livekit.ListPar
return nil, twirpAuthError(err)
}
res, err := s.roomClient.ListParticipants(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req)
if s.apiConf.EnablePsrpcForGetListParticpants {
res, err = s.roomClient.ListParticipants(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req)
} else if store, ok := s.roomStore.(OSSServiceStore); ok {
var participants []*livekit.ParticipantInfo
participants, err = store.ListParticipants(ctx, livekit.RoomName(req.Room))
if err == nil {
res = &livekit.ListParticipantsResponse{
Participants: participants,
}
}
} else {
err = psrpc.ErrUnimplemented
}
if err != nil {
return nil, err
}
@@ -170,7 +184,7 @@ func (s *RoomService) ListParticipants(ctx context.Context, req *livekit.ListPar
return res, nil
}
func (s *RoomService) GetParticipant(ctx context.Context, req *livekit.RoomParticipantIdentity) (*livekit.ParticipantInfo, error) {
func (s *RoomService) GetParticipant(ctx context.Context, req *livekit.RoomParticipantIdentity) (participant *livekit.ParticipantInfo, err error) {
RecordRequest(ctx, req)
AppendLogFields(ctx, "room", req.Room, "participant", req.Identity)
@@ -178,7 +192,14 @@ func (s *RoomService) GetParticipant(ctx context.Context, req *livekit.RoomParti
return nil, twirpAuthError(err)
}
participant, err := s.roomClient.GetParticipant(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req)
if s.apiConf.EnablePsrpcForGetListParticpants {
participant, err = s.roomClient.GetParticipant(ctx, s.topicFormatter.RoomTopic(ctx, livekit.RoomName(req.Room)), req)
} else if store, ok := s.roomStore.(OSSServiceStore); ok {
participant, err = store.LoadParticipant(ctx, livekit.RoomName(req.Room), livekit.ParticipantIdentity(req.Identity))
} else {
err = psrpc.ErrUnimplemented
}
if err != nil {
return nil, err
}

View File

@@ -56,7 +56,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
getNodeStatsConfig,
routing.CreateRouter,
getLimitConf,
config.DefaultAPIConfig,
getAPIConf,
wire.Bind(new(routing.MessageRouter), new(routing.Router)),
wire.Bind(new(livekit.RoomService), new(*RoomService)),
telemetry.NewAnalyticsService,
@@ -291,3 +291,7 @@ func getNodeStatsConfig(config *config.Config) config.NodeStatsConfig {
func getAgentConfig(config *config.Config) agent.Config {
return config.Agents
}
func getAPIConf(config *config.Config) config.APIConfig {
return config.API
}

View File

@@ -37,7 +37,7 @@ import (
func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*LivekitServer, error) {
limitConfig := getLimitConf(conf)
apiConfig := config.DefaultAPIConfig()
apiConfig := getAPIConf(conf)
universalClient, err := createRedisClient(conf)
if err != nil {
return nil, err
@@ -349,3 +349,7 @@ func getNodeStatsConfig(config2 *config.Config) config.NodeStatsConfig {
func getAgentConfig(config2 *config.Config) agent.Config {
return config2.Agents
}
func getAPIConf(config2 *config.Config) config.APIConfig {
return config2.API
}