From e467daa0d4bcae81d6b17e4a241a36fdbe8e6fb1 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Fri, 20 Jun 2025 16:22:19 -0400 Subject: [PATCH] move egress roomID load to launcher (#3748) * move egress roomID load to launcher * regenerate --- pkg/service/clients.go | 42 ------------------- pkg/service/egress.go | 90 +++++++++++++++++++++++++++++++++-------- pkg/service/wire_gen.go | 18 ++++----- 3 files changed, 83 insertions(+), 67 deletions(-) diff --git a/pkg/service/clients.go b/pkg/service/clients.go index 8a0eb39c1..d8fb33789 100644 --- a/pkg/service/clients.go +++ b/pkg/service/clients.go @@ -19,12 +19,8 @@ import ( "google.golang.org/protobuf/types/known/emptypb" - "github.com/livekit/livekit-server/pkg/rtc" "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" - "github.com/livekit/protocol/utils" - "github.com/livekit/protocol/utils/guid" ) //counterfeiter:generate . IOClient @@ -35,41 +31,3 @@ type IOClient interface { CreateIngress(ctx context.Context, req *livekit.IngressInfo) (*emptypb.Empty, error) UpdateIngressState(ctx context.Context, req *rpc.UpdateIngressStateRequest) (*emptypb.Empty, error) } - -type egressLauncher struct { - client rpc.EgressClient - io IOClient -} - -func NewEgressLauncher(client rpc.EgressClient, io IOClient) rtc.EgressLauncher { - if client == nil { - return nil - } - return &egressLauncher{ - client: client, - io: io, - } -} - -func (s *egressLauncher) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) { - if s.client == nil { - return nil, ErrEgressNotConnected - } - - // Ensure we have an Egress ID - if req.EgressId == "" { - req.EgressId = guid.New(utils.EgressPrefix) - } - - info, err := s.client.StartEgress(ctx, "", req) - if err != nil { - return nil, err - } - - _, err = s.io.CreateEgress(ctx, info) - if err != nil { - logger.Errorw("failed to create egress", err) - } - - return info, nil -} diff --git a/pkg/service/egress.go b/pkg/service/egress.go index a745caaf0..41dd73cd4 100644 --- a/pkg/service/egress.go +++ b/pkg/service/egress.go @@ -25,7 +25,10 @@ import ( "github.com/livekit/livekit-server/pkg/rtc" "github.com/livekit/protocol/egress" "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" + "github.com/livekit/protocol/utils" + "github.com/livekit/protocol/utils/guid" ) type EgressService struct { @@ -33,25 +36,39 @@ type EgressService struct { client rpc.EgressClient io IOClient roomService livekit.RoomService - store ServiceStore +} + +type egressLauncher struct { + client rpc.EgressClient + io IOClient + store ServiceStore } func NewEgressService( client rpc.EgressClient, launcher rtc.EgressLauncher, - store ServiceStore, io IOClient, rs livekit.RoomService, ) *EgressService { return &EgressService{ client: client, - store: store, io: io, roomService: rs, launcher: launcher, } } +func NewEgressLauncher(client rpc.EgressClient, io IOClient, store ServiceStore) rtc.EgressLauncher { + if client == nil { + return nil + } + return &egressLauncher{ + client: client, + io: io, + store: store, + } +} + func (s *EgressService) StartRoomCompositeEgress(ctx context.Context, req *livekit.RoomCompositeEgressRequest) (*livekit.EgressInfo, error) { fields := []interface{}{ "room", req.RoomName, @@ -61,7 +78,7 @@ func (s *EgressService) StartRoomCompositeEgress(ctx context.Context, req *livek defer func() { AppendLogFields(ctx, fields...) }() - ei, err := s.startEgress(ctx, livekit.RoomName(req.RoomName), &rpc.StartEgressRequest{ + ei, err := s.startEgress(ctx, &rpc.StartEgressRequest{ Request: &rpc.StartEgressRequest_RoomComposite{ RoomComposite: req, }, @@ -81,7 +98,7 @@ func (s *EgressService) StartWebEgress(ctx context.Context, req *livekit.WebEgre defer func() { AppendLogFields(ctx, fields...) }() - ei, err := s.startEgress(ctx, "", &rpc.StartEgressRequest{ + ei, err := s.startEgress(ctx, &rpc.StartEgressRequest{ Request: &rpc.StartEgressRequest_Web{ Web: req, }, @@ -102,7 +119,7 @@ func (s *EgressService) StartParticipantEgress(ctx context.Context, req *livekit defer func() { AppendLogFields(ctx, fields...) }() - ei, err := s.startEgress(ctx, livekit.RoomName(req.RoomName), &rpc.StartEgressRequest{ + ei, err := s.startEgress(ctx, &rpc.StartEgressRequest{ Request: &rpc.StartEgressRequest_Participant{ Participant: req, }, @@ -124,7 +141,7 @@ func (s *EgressService) StartTrackCompositeEgress(ctx context.Context, req *live defer func() { AppendLogFields(ctx, fields...) }() - ei, err := s.startEgress(ctx, livekit.RoomName(req.RoomName), &rpc.StartEgressRequest{ + ei, err := s.startEgress(ctx, &rpc.StartEgressRequest{ Request: &rpc.StartEgressRequest_TrackComposite{ TrackComposite: req, }, @@ -144,7 +161,7 @@ func (s *EgressService) StartTrackEgress(ctx context.Context, req *livekit.Track defer func() { AppendLogFields(ctx, fields...) }() - ei, err := s.startEgress(ctx, livekit.RoomName(req.RoomName), &rpc.StartEgressRequest{ + ei, err := s.startEgress(ctx, &rpc.StartEgressRequest{ Request: &rpc.StartEgressRequest_Track{ Track: req, }, @@ -156,22 +173,63 @@ func (s *EgressService) StartTrackEgress(ctx context.Context, req *livekit.Track return ei, err } -func (s *EgressService) startEgress(ctx context.Context, roomName livekit.RoomName, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) { +func (s *EgressService) startEgress(ctx context.Context, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) { if err := EnsureRecordPermission(ctx); err != nil { return nil, twirpAuthError(err) } else if s.launcher == nil { return nil, ErrEgressNotConnected } - if roomName != "" { - room, _, err := s.store.LoadRoom(ctx, roomName, false) - if err != nil { - return nil, err - } - req.RoomId = room.Sid - } + return s.launcher.StartEgress(ctx, req) } +func (s *egressLauncher) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) { + if s.client == nil { + return nil, ErrEgressNotConnected + } + + // Ensure we have an Egress ID + if req.EgressId == "" { + req.EgressId = guid.New(utils.EgressPrefix) + } + + if req.RoomId == "" { + var roomName string + switch v := req.Request.(type) { + case *rpc.StartEgressRequest_RoomComposite: + roomName = v.RoomComposite.RoomName + case *rpc.StartEgressRequest_Web: + // no room name + case *rpc.StartEgressRequest_Participant: + roomName = v.Participant.RoomName + case *rpc.StartEgressRequest_TrackComposite: + roomName = v.TrackComposite.RoomName + case *rpc.StartEgressRequest_Track: + roomName = v.Track.RoomName + } + + if roomName != "" { + room, _, err := s.store.LoadRoom(ctx, livekit.RoomName(roomName), false) + if err != nil { + return nil, err + } + req.RoomId = room.Sid + } + } + + info, err := s.client.StartEgress(ctx, "", req) + if err != nil { + return nil, err + } + + _, err = s.io.CreateEgress(ctx, info) + if err != nil { + logger.Errorw("failed to create egress", err) + } + + return info, nil +} + type LayoutMetadata struct { Layout string `json:"layout"` } diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 5a08beb08..e95eb875d 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -87,26 +87,26 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live if err != nil { return nil, err } - rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService) + rtcEgressLauncher := NewEgressLauncher(egressClient, ioInfoService, objectStore) topicFormatter := rpc.NewTopicFormatter() - roomClient, err := rpc.NewTypedRoomClient(clientParams) + v, err := rpc.NewTypedRoomClient(clientParams) if err != nil { return nil, err } - participantClient, err := rpc.NewTypedParticipantClient(clientParams) + v2, err := rpc.NewTypedParticipantClient(clientParams) if err != nil { return nil, err } - roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, roomClient, participantClient) + roomService, err := NewRoomService(limitConfig, apiConfig, router, roomAllocator, objectStore, rtcEgressLauncher, topicFormatter, v, v2) if err != nil { return nil, err } - agentDispatchInternalClient, err := rpc.NewTypedAgentDispatchInternalClient(clientParams) + v3, err := rpc.NewTypedAgentDispatchInternalClient(clientParams) if err != nil { return nil, err } - agentDispatchService := NewAgentDispatchService(agentDispatchInternalClient, topicFormatter, roomAllocator, router) - egressService := NewEgressService(egressClient, rtcEgressLauncher, objectStore, ioInfoService, roomService) + agentDispatchService := NewAgentDispatchService(v3, topicFormatter, roomAllocator, router) + egressService := NewEgressService(egressClient, rtcEgressLauncher, ioInfoService, roomService) ingressConfig := getIngressConfig(conf) ingressClient, err := rpc.NewIngressClient(clientParams) if err != nil { @@ -120,11 +120,11 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } sipService := NewSIPService(sipConfig, nodeID, messageBus, sipClient, sipStore, roomService, telemetryService) rtcService := NewRTCService(conf, roomAllocator, router, telemetryService) - rtcRestParticipantClient, err := rpc.NewTypedRTCRestParticipantClient(clientParams) + v4, err := rpc.NewTypedRTCRestParticipantClient(clientParams) if err != nil { return nil, err } - serviceRTCRestService, err := NewRTCRestService(conf, router, roomAllocator, clientParams, topicFormatter, rtcRestParticipantClient) + serviceRTCRestService, err := NewRTCRestService(conf, router, roomAllocator, clientParams, topicFormatter, v4) if err != nil { return nil, err }