move egress roomID load to launcher (#3748)

* move egress roomID load to launcher

* regenerate
This commit is contained in:
David Colburn
2025-06-20 16:22:19 -04:00
committed by GitHub
parent 3783ebb320
commit e467daa0d4
3 changed files with 83 additions and 67 deletions

View File

@@ -19,12 +19,8 @@ import (
"google.golang.org/protobuf/types/known/emptypb"
"github.com/livekit/livekit-server/pkg/rtc"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/utils"
"github.com/livekit/protocol/utils/guid"
)
//counterfeiter:generate . IOClient
@@ -35,41 +31,3 @@ type IOClient interface {
CreateIngress(ctx context.Context, req *livekit.IngressInfo) (*emptypb.Empty, error)
UpdateIngressState(ctx context.Context, req *rpc.UpdateIngressStateRequest) (*emptypb.Empty, error)
}
type egressLauncher struct {
client rpc.EgressClient
io IOClient
}
func NewEgressLauncher(client rpc.EgressClient, io IOClient) rtc.EgressLauncher {
if client == nil {
return nil
}
return &egressLauncher{
client: client,
io: io,
}
}
func (s *egressLauncher) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) {
if s.client == nil {
return nil, ErrEgressNotConnected
}
// Ensure we have an Egress ID
if req.EgressId == "" {
req.EgressId = guid.New(utils.EgressPrefix)
}
info, err := s.client.StartEgress(ctx, "", req)
if err != nil {
return nil, err
}
_, err = s.io.CreateEgress(ctx, info)
if err != nil {
logger.Errorw("failed to create egress", err)
}
return info, nil
}

View File

@@ -25,7 +25,10 @@ import (
"github.com/livekit/livekit-server/pkg/rtc"
"github.com/livekit/protocol/egress"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/utils"
"github.com/livekit/protocol/utils/guid"
)
type EgressService struct {
@@ -33,25 +36,39 @@ type EgressService struct {
client rpc.EgressClient
io IOClient
roomService livekit.RoomService
store ServiceStore
}
type egressLauncher struct {
client rpc.EgressClient
io IOClient
store ServiceStore
}
func NewEgressService(
client rpc.EgressClient,
launcher rtc.EgressLauncher,
store ServiceStore,
io IOClient,
rs livekit.RoomService,
) *EgressService {
return &EgressService{
client: client,
store: store,
io: io,
roomService: rs,
launcher: launcher,
}
}
func NewEgressLauncher(client rpc.EgressClient, io IOClient, store ServiceStore) rtc.EgressLauncher {
if client == nil {
return nil
}
return &egressLauncher{
client: client,
io: io,
store: store,
}
}
func (s *EgressService) StartRoomCompositeEgress(ctx context.Context, req *livekit.RoomCompositeEgressRequest) (*livekit.EgressInfo, error) {
fields := []interface{}{
"room", req.RoomName,
@@ -61,7 +78,7 @@ func (s *EgressService) StartRoomCompositeEgress(ctx context.Context, req *livek
defer func() {
AppendLogFields(ctx, fields...)
}()
ei, err := s.startEgress(ctx, livekit.RoomName(req.RoomName), &rpc.StartEgressRequest{
ei, err := s.startEgress(ctx, &rpc.StartEgressRequest{
Request: &rpc.StartEgressRequest_RoomComposite{
RoomComposite: req,
},
@@ -81,7 +98,7 @@ func (s *EgressService) StartWebEgress(ctx context.Context, req *livekit.WebEgre
defer func() {
AppendLogFields(ctx, fields...)
}()
ei, err := s.startEgress(ctx, "", &rpc.StartEgressRequest{
ei, err := s.startEgress(ctx, &rpc.StartEgressRequest{
Request: &rpc.StartEgressRequest_Web{
Web: req,
},
@@ -102,7 +119,7 @@ func (s *EgressService) StartParticipantEgress(ctx context.Context, req *livekit
defer func() {
AppendLogFields(ctx, fields...)
}()
ei, err := s.startEgress(ctx, livekit.RoomName(req.RoomName), &rpc.StartEgressRequest{
ei, err := s.startEgress(ctx, &rpc.StartEgressRequest{
Request: &rpc.StartEgressRequest_Participant{
Participant: req,
},
@@ -124,7 +141,7 @@ func (s *EgressService) StartTrackCompositeEgress(ctx context.Context, req *live
defer func() {
AppendLogFields(ctx, fields...)
}()
ei, err := s.startEgress(ctx, livekit.RoomName(req.RoomName), &rpc.StartEgressRequest{
ei, err := s.startEgress(ctx, &rpc.StartEgressRequest{
Request: &rpc.StartEgressRequest_TrackComposite{
TrackComposite: req,
},
@@ -144,7 +161,7 @@ func (s *EgressService) StartTrackEgress(ctx context.Context, req *livekit.Track
defer func() {
AppendLogFields(ctx, fields...)
}()
ei, err := s.startEgress(ctx, livekit.RoomName(req.RoomName), &rpc.StartEgressRequest{
ei, err := s.startEgress(ctx, &rpc.StartEgressRequest{
Request: &rpc.StartEgressRequest_Track{
Track: req,
},
@@ -156,22 +173,63 @@ func (s *EgressService) StartTrackEgress(ctx context.Context, req *livekit.Track
return ei, err
}
func (s *EgressService) startEgress(ctx context.Context, roomName livekit.RoomName, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) {
func (s *EgressService) startEgress(ctx context.Context, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) {
if err := EnsureRecordPermission(ctx); err != nil {
return nil, twirpAuthError(err)
} else if s.launcher == nil {
return nil, ErrEgressNotConnected
}
if roomName != "" {
room, _, err := s.store.LoadRoom(ctx, roomName, false)
if err != nil {
return nil, err
}
req.RoomId = room.Sid
}
return s.launcher.StartEgress(ctx, req)
}
func (s *egressLauncher) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) {
if s.client == nil {
return nil, ErrEgressNotConnected
}
// Ensure we have an Egress ID
if req.EgressId == "" {
req.EgressId = guid.New(utils.EgressPrefix)
}
if req.RoomId == "" {
var roomName string
switch v := req.Request.(type) {
case *rpc.StartEgressRequest_RoomComposite:
roomName = v.RoomComposite.RoomName
case *rpc.StartEgressRequest_Web:
// no room name
case *rpc.StartEgressRequest_Participant:
roomName = v.Participant.RoomName
case *rpc.StartEgressRequest_TrackComposite:
roomName = v.TrackComposite.RoomName
case *rpc.StartEgressRequest_Track:
roomName = v.Track.RoomName
}
if roomName != "" {
room, _, err := s.store.LoadRoom(ctx, livekit.RoomName(roomName), false)
if err != nil {
return nil, err
}
req.RoomId = room.Sid
}
}
info, err := s.client.StartEgress(ctx, "", req)
if err != nil {
return nil, err
}
_, err = s.io.CreateEgress(ctx, info)
if err != nil {
logger.Errorw("failed to create egress", err)
}
return info, nil
}
type LayoutMetadata struct {
Layout string `json:"layout"`
}

View File

@@ -87,26 +87,26 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
if err != nil {
return nil, err
}
rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService)
rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService, objectStore)
topicFormatter := rpc.NewTopicFormatter()
roomClient, err := rpc.NewTypedRoomClient(clientParams)
v, err := rpc.NewTypedRoomClient(clientParams)
if err != nil {
return nil, err
}
participantClient, err := rpc.NewTypedParticipantClient(clientParams)
v2, err := rpc.NewTypedParticipantClient(clientParams)
if err != nil {
return nil, err
}
roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, roomClient, participantClient)
roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, v, v2)
if err != nil {
return nil, err
}
agentDispatchInternalClient, err := rpc.NewTypedAgentDispatchInternalClient(clientParams)
v3, err := rpc.NewTypedAgentDispatchInternalClient(clientParams)
if err != nil {
return nil, err
}
agentDispatchService := NewAgentDispatchService(agentDispatchInternalClient, topicFormatter, roomAllocator, router)
egressService := NewEgressService(egressClient, rtcEgressLauncher, objectStore, ioInfoService, roomService)
agentDispatchService := NewAgentDispatchService(v3, topicFormatter, roomAllocator, router)
egressService := NewEgressService(egressClient, rtcEgressLauncher, ioInfoService, roomService)
ingressConfig := getIngressConfig(conf)
ingressClient, err := rpc.NewIngressClient(clientParams)
if err != nil {
@@ -120,11 +120,11 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
}
sipService := NewSIPService(sipConfig, nodeID, messageBus, sipClient, sipStore, roomService, telemetryService)
rtcService := NewRTCService(conf, roomAllocator, router, telemetryService)
rtcRestParticipantClient, err := rpc.NewTypedRTCRestParticipantClient(clientParams)
v4, err := rpc.NewTypedRTCRestParticipantClient(clientParams)
if err != nil {
return nil, err
}
serviceRTCRestService, err := NewRTCRestService(conf, router, roomAllocator, clientParams, topicFormatter, rtcRestParticipantClient)
serviceRTCRestService, err := NewRTCRestService(conf, router, roomAllocator, clientParams, topicFormatter, v4)
if err != nil {
return nil, err
}