From 57643a42ed80be56187f48e3d09f769ff2fd723a Mon Sep 17 00:00:00 2001 From: David Colburn Date: Tue, 7 Nov 2023 19:19:07 -0800 Subject: [PATCH] Agents enabled check (#2227) * agents enabled check * participant -> publisher * nil check client * add NumConnections * add lock around agent check * do not launch agents against other agents * regen * don't need atomic anymore * update protocol --- go.mod | 2 +- go.sum | 4 +- pkg/rtc/agentclient.go | 92 +++++++++++++++ pkg/rtc/{clients.go => egress.go} | 4 - pkg/rtc/participant.go | 7 ++ pkg/rtc/room.go | 60 +++++++--- pkg/rtc/types/interfaces.go | 1 + .../typesfakes/fake_local_participant.go | 65 +++++++++++ pkg/rtc/types/typesfakes/fake_participant.go | 65 +++++++++++ pkg/service/agentservice.go | 106 +++++++++++------- pkg/service/clients.go | 12 -- pkg/service/roomservice.go | 27 +++-- pkg/service/wire.go | 3 +- pkg/service/wire_gen.go | 14 ++- test/agent_test.go | 14 +++ 15 files changed, 385 insertions(+), 91 deletions(-) create mode 100644 pkg/rtc/agentclient.go rename pkg/rtc/{clients.go => egress.go} (98%) diff --git a/go.mod b/go.mod index 6202fccc7..df947357d 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e - github.com/livekit/protocol v1.9.1-0.20231103182211-6d382559cf42 + github.com/livekit/protocol v1.9.1-0.20231107185101-e230ee2d840e github.com/livekit/psrpc v0.5.0 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 diff --git a/go.sum b/go.sum index 1c09de005..c59b38f48 100644 --- a/go.sum +++ b/go.sum @@ -125,8 +125,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= -github.com/livekit/protocol v1.9.1-0.20231103182211-6d382559cf42 h1:uDziAK5uhQPOj0fCKl+YyJx51tdFORLjC+rHgNNBCmY= -github.com/livekit/protocol v1.9.1-0.20231103182211-6d382559cf42/go.mod h1:l2WjlZWErS6vBlQaQyCGwWLt1aOx10XfQTsmvLjJWFQ= +github.com/livekit/protocol v1.9.1-0.20231107185101-e230ee2d840e h1:YShBpEjkEBY7yil2gjMWlkVkxs3OI58LIIYsBdb8aBU= +github.com/livekit/protocol v1.9.1-0.20231107185101-e230ee2d840e/go.mod h1:l2WjlZWErS6vBlQaQyCGwWLt1aOx10XfQTsmvLjJWFQ= github.com/livekit/psrpc v0.5.0 h1:g+yYNSs6Y1/vM7UlFkB2s/ARe2y3RKWZhX8ata5j+eo= github.com/livekit/psrpc v0.5.0/go.mod h1:1XYH1LLoD/YbvBvt6xg2KQ/J3InLXSJK6PL/+DKmuAU= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= diff --git a/pkg/rtc/agentclient.go b/pkg/rtc/agentclient.go new file mode 100644 index 000000000..65be3ba7c --- /dev/null +++ b/pkg/rtc/agentclient.go @@ -0,0 +1,92 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rtc + +import ( + "context" + "time" + + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/rpc" + "github.com/livekit/psrpc" +) + +const ( + RoomAgentTopic = "room" + PublisherAgentTopic = "publisher" +) + +type AgentClient interface { + CheckEnabled(ctx context.Context, req *rpc.CheckEnabledRequest) *rpc.CheckEnabledResponse + JobRequest(ctx context.Context, job *livekit.Job) +} + +type agentClient struct { + client rpc.AgentInternalClient +} + +func NewAgentClient(bus psrpc.MessageBus) (AgentClient, error) { + client, err := rpc.NewAgentInternalClient(bus) + if err != nil { + return nil, err + } + return &agentClient{client: client}, nil +} + +func (c *agentClient) CheckEnabled(ctx context.Context, req *rpc.CheckEnabledRequest) *rpc.CheckEnabledResponse { + res := &rpc.CheckEnabledResponse{} + resChan, err := c.client.CheckEnabled(ctx, req, psrpc.WithRequestTimeout(time.Second)) + if err != nil { + return res + } + + for r := range resChan { + if r.Err != nil { + continue + } + if r.Result.RoomEnabled { + res.RoomEnabled = true + if res.PublisherEnabled { + return res + } + } + if r.Result.PublisherEnabled { + res.PublisherEnabled = true + if res.RoomEnabled { + return res + } + } + } + + return res +} + +func (c *agentClient) JobRequest(ctx context.Context, job *livekit.Job) { + var topic string + var logError bool + switch job.Type { + case livekit.JobType_JT_ROOM: + topic = RoomAgentTopic + case livekit.JobType_JT_PUBLISHER: + topic = PublisherAgentTopic + logError = true + } + + _, err := c.client.JobRequest(ctx, topic, job) + if err != nil && logError { + logger.Warnw("agent job request failed", err) + } +} diff --git a/pkg/rtc/clients.go b/pkg/rtc/egress.go similarity index 98% rename from pkg/rtc/clients.go rename to pkg/rtc/egress.go index eeaf22869..db6d12813 100644 --- a/pkg/rtc/clients.go +++ b/pkg/rtc/egress.go @@ -27,10 +27,6 @@ import ( "github.com/livekit/protocol/webhook" ) -type AgentClient interface { - JobRequest(ctx context.Context, job *livekit.Job) -} - type EgressLauncher interface { StartEgress(context.Context, *rpc.StartEgressRequest) (*livekit.EgressInfo, error) StartEgressWithClusterId(ctx context.Context, clusterId string, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error) diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 21134b0f1..404cbf8a2 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -1034,6 +1034,13 @@ func (p *ParticipantImpl) IsRecorder() bool { return p.grants.Video.Recorder } +func (p *ParticipantImpl) IsAgent() bool { + p.lock.RLock() + defer p.lock.RUnlock() + + return p.grants.Video.Agent +} + func (p *ParticipantImpl) VerifySubscribeParticipantInfo(pID livekit.ParticipantID, version uint32) { if !p.IsReady() { // we have not sent a JoinResponse yet. metadata would be covered in JoinResponse diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 7ac27ca4a..e35fbc738 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -30,6 +30,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/utils" "github.com/livekit/livekit-server/pkg/config" @@ -74,15 +75,18 @@ type Room struct { audioConfig *config.AudioConfig serverInfo *livekit.ServerInfo telemetry telemetry.TelemetryService - agentClient AgentClient egressLauncher EgressLauncher trackManager *RoomTrackManager + // agents + agentClient AgentClient + publisherAgentsEnabled bool + // map of identity -> Participant participants map[livekit.ParticipantIdentity]types.LocalParticipant participantOpts map[livekit.ParticipantIdentity]*ParticipantOptions participantRequestSources map[livekit.ParticipantIdentity]routing.MessageSource - hasPublished sync.Map // map of identity -> bool + hasPublished map[livekit.ParticipantIdentity]bool bufferFactory *buffer.FactoryOfBufferFactory // batch update participant info for non-publishers @@ -135,11 +139,13 @@ func NewRoom( participants: make(map[livekit.ParticipantIdentity]types.LocalParticipant), participantOpts: make(map[livekit.ParticipantIdentity]*ParticipantOptions), participantRequestSources: make(map[livekit.ParticipantIdentity]routing.MessageSource), + hasPublished: make(map[livekit.ParticipantIdentity]bool), bufferFactory: buffer.NewFactoryOfBufferFactory(config.Receiver.PacketBufferSize), batchedUpdates: make(map[livekit.ParticipantIdentity]*livekit.ParticipantInfo), closed: make(chan struct{}), trailer: []byte(utils.RandomSecret()), } + r.protoProxy = utils.NewProtoProxy[*livekit.Room](roomUpdateInterval, r.updateProto) if r.protoRoom.EmptyTimeout == 0 { r.protoRoom.EmptyTimeout = DefaultEmptyTimeout @@ -148,6 +154,21 @@ func NewRoom( r.protoRoom.CreationTime = time.Now().Unix() } + if agentClient != nil { + go func() { + res := r.agentClient.CheckEnabled(context.Background(), &rpc.CheckEnabledRequest{}) + if res.PublisherEnabled { + r.lock.Lock() + r.publisherAgentsEnabled = true + // if there are already published tracks, start the agents + for identity := range r.hasPublished { + r.launchPublisherAgent(r.participants[identity]) + } + r.lock.Unlock() + } + }() + } + go r.audioUpdateWorker() go r.connectionQualityWorker() go r.changeUpdateWorker() @@ -477,6 +498,7 @@ func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity, pID livek delete(r.participants, identity) delete(r.participantOpts, identity) delete(r.participantRequestSources, identity) + delete(r.hasPublished, identity) if !p.Hidden() { r.protoRoom.NumParticipants-- } @@ -512,7 +534,6 @@ func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity, pID livek for _, t := range p.GetPublishedTracks() { r.trackManager.RemoveTrack(t) } - r.hasPublished.Delete(p.Identity()) p.OnTrackUpdated(nil) p.OnTrackPublished(nil) @@ -902,17 +923,15 @@ func (r *Room) onTrackPublished(participant types.LocalParticipant, track types. r.trackManager.AddTrack(track, participant.Identity(), participant.ID()) // launch jobs - _, hasPublished := r.hasPublished.Swap(participant.Identity(), true) + r.lock.Lock() + hasPublished := r.hasPublished[participant.Identity()] + r.hasPublished[participant.Identity()] = true + publisherAgentsEnabled := r.publisherAgentsEnabled + r.lock.Unlock() + if !hasPublished { - if r.agentClient != nil { - go func() { - r.agentClient.JobRequest(context.Background(), &livekit.Job{ - Id: utils.NewGuid("JP_"), - Type: livekit.JobType_JT_PUBLISHER, - Room: r.protoRoom, - Participant: participant.ToProto(), - }) - }() + if publisherAgentsEnabled { + r.launchPublisherAgent(participant) } if r.internal != nil && r.internal.ParticipantEgress != nil { go func() { @@ -1302,6 +1321,21 @@ func (r *Room) connectionQualityWorker() { } } +func (r *Room) launchPublisherAgent(p types.Participant) { + if p == nil || p.IsRecorder() || p.IsAgent() { + return + } + + go func() { + r.agentClient.JobRequest(context.Background(), &livekit.Job{ + Id: utils.NewGuid("JP_"), + Type: livekit.JobType_JT_PUBLISHER, + Room: r.ToProto(), + Participant: p.ToProto(), + }) + }() +} + func (r *Room) DebugInfo() map[string]interface{} { info := map[string]interface{}{ "Name": r.protoRoom.Name, diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index 6c2210613..bef8abb6e 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -264,6 +264,7 @@ type Participant interface { // permissions Hidden() bool IsRecorder() bool + IsAgent() bool Start() Close(sendLeave bool, reason ParticipantCloseReason, isExpectedToResume bool) error diff --git a/pkg/rtc/types/typesfakes/fake_local_participant.go b/pkg/rtc/types/typesfakes/fake_local_participant.go index 4020502be..9e11470c3 100644 --- a/pkg/rtc/types/typesfakes/fake_local_participant.go +++ b/pkg/rtc/types/typesfakes/fake_local_participant.go @@ -408,6 +408,16 @@ type FakeLocalParticipant struct { identityReturnsOnCall map[int]struct { result1 livekit.ParticipantIdentity } + IsAgentStub func() bool + isAgentMutex sync.RWMutex + isAgentArgsForCall []struct { + } + isAgentReturns struct { + result1 bool + } + isAgentReturnsOnCall map[int]struct { + result1 bool + } IsClosedStub func() bool isClosedMutex sync.RWMutex isClosedArgsForCall []struct { @@ -2986,6 +2996,59 @@ func (fake *FakeLocalParticipant) IdentityReturnsOnCall(i int, result1 livekit.P }{result1} } +func (fake *FakeLocalParticipant) IsAgent() bool { + fake.isAgentMutex.Lock() + ret, specificReturn := fake.isAgentReturnsOnCall[len(fake.isAgentArgsForCall)] + fake.isAgentArgsForCall = append(fake.isAgentArgsForCall, struct { + }{}) + stub := fake.IsAgentStub + fakeReturns := fake.isAgentReturns + fake.recordInvocation("IsAgent", []interface{}{}) + fake.isAgentMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeLocalParticipant) IsAgentCallCount() int { + fake.isAgentMutex.RLock() + defer fake.isAgentMutex.RUnlock() + return len(fake.isAgentArgsForCall) +} + +func (fake *FakeLocalParticipant) IsAgentCalls(stub func() bool) { + fake.isAgentMutex.Lock() + defer fake.isAgentMutex.Unlock() + fake.IsAgentStub = stub +} + +func (fake *FakeLocalParticipant) IsAgentReturns(result1 bool) { + fake.isAgentMutex.Lock() + defer fake.isAgentMutex.Unlock() + fake.IsAgentStub = nil + fake.isAgentReturns = struct { + result1 bool + }{result1} +} + +func (fake *FakeLocalParticipant) IsAgentReturnsOnCall(i int, result1 bool) { + fake.isAgentMutex.Lock() + defer fake.isAgentMutex.Unlock() + fake.IsAgentStub = nil + if fake.isAgentReturnsOnCall == nil { + fake.isAgentReturnsOnCall = make(map[int]struct { + result1 bool + }) + } + fake.isAgentReturnsOnCall[i] = struct { + result1 bool + }{result1} +} + func (fake *FakeLocalParticipant) IsClosed() bool { fake.isClosedMutex.Lock() ret, specificReturn := fake.isClosedReturnsOnCall[len(fake.isClosedArgsForCall)] @@ -6031,6 +6094,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} { defer fake.iDMutex.RUnlock() fake.identityMutex.RLock() defer fake.identityMutex.RUnlock() + fake.isAgentMutex.RLock() + defer fake.isAgentMutex.RUnlock() fake.isClosedMutex.RLock() defer fake.isClosedMutex.RUnlock() fake.isDisconnectedMutex.RLock() diff --git a/pkg/rtc/types/typesfakes/fake_participant.go b/pkg/rtc/types/typesfakes/fake_participant.go index fa92204fe..3f46bdbba 100644 --- a/pkg/rtc/types/typesfakes/fake_participant.go +++ b/pkg/rtc/types/typesfakes/fake_participant.go @@ -118,6 +118,16 @@ type FakeParticipant struct { identityReturnsOnCall map[int]struct { result1 livekit.ParticipantIdentity } + IsAgentStub func() bool + isAgentMutex sync.RWMutex + isAgentArgsForCall []struct { + } + isAgentReturns struct { + result1 bool + } + isAgentReturnsOnCall map[int]struct { + result1 bool + } IsPublisherStub func() bool isPublisherMutex sync.RWMutex isPublisherArgsForCall []struct { @@ -780,6 +790,59 @@ func (fake *FakeParticipant) IdentityReturnsOnCall(i int, result1 livekit.Partic }{result1} } +func (fake *FakeParticipant) IsAgent() bool { + fake.isAgentMutex.Lock() + ret, specificReturn := fake.isAgentReturnsOnCall[len(fake.isAgentArgsForCall)] + fake.isAgentArgsForCall = append(fake.isAgentArgsForCall, struct { + }{}) + stub := fake.IsAgentStub + fakeReturns := fake.isAgentReturns + fake.recordInvocation("IsAgent", []interface{}{}) + fake.isAgentMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeParticipant) IsAgentCallCount() int { + fake.isAgentMutex.RLock() + defer fake.isAgentMutex.RUnlock() + return len(fake.isAgentArgsForCall) +} + +func (fake *FakeParticipant) IsAgentCalls(stub func() bool) { + fake.isAgentMutex.Lock() + defer fake.isAgentMutex.Unlock() + fake.IsAgentStub = stub +} + +func (fake *FakeParticipant) IsAgentReturns(result1 bool) { + fake.isAgentMutex.Lock() + defer fake.isAgentMutex.Unlock() + fake.IsAgentStub = nil + fake.isAgentReturns = struct { + result1 bool + }{result1} +} + +func (fake *FakeParticipant) IsAgentReturnsOnCall(i int, result1 bool) { + fake.isAgentMutex.Lock() + defer fake.isAgentMutex.Unlock() + fake.IsAgentStub = nil + if fake.isAgentReturnsOnCall == nil { + fake.isAgentReturnsOnCall = make(map[int]struct { + result1 bool + }) + } + fake.isAgentReturnsOnCall[i] = struct { + result1 bool + }{result1} +} + func (fake *FakeParticipant) IsPublisher() bool { fake.isPublisherMutex.Lock() ret, specificReturn := fake.isPublisherReturnsOnCall[len(fake.isPublisherArgsForCall)] @@ -1318,6 +1381,8 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} { defer fake.iDMutex.RUnlock() fake.identityMutex.RLock() defer fake.identityMutex.RUnlock() + fake.isAgentMutex.RLock() + defer fake.isAgentMutex.RUnlock() fake.isPublisherMutex.RLock() defer fake.isPublisherMutex.RUnlock() fake.isRecorderMutex.RLock() diff --git a/pkg/service/agentservice.go b/pkg/service/agentservice.go index bb7a9afb0..baaf40da4 100644 --- a/pkg/service/agentservice.go +++ b/pkg/service/agentservice.go @@ -34,6 +34,8 @@ import ( "github.com/livekit/psrpc" ) +const AgentServiceVersion = "0.1.0" + type AgentService struct { upgrader websocket.Upgrader @@ -41,17 +43,17 @@ type AgentService struct { } type AgentHandler struct { - agentServer rpc.AgentInternalServer - roomTopic string - participantTopic string + agentServer rpc.AgentInternalServer + roomTopic string + publisherTopic string - mu sync.Mutex - availability map[string]chan *availability - unregistered map[*websocket.Conn]*worker - roomRegistered bool - roomWorkers map[string]*worker - participantRegistered bool - participantWorkers map[string]*worker + mu sync.Mutex + availability map[string]chan *availability + unregistered map[*websocket.Conn]*worker + roomRegistered bool + roomWorkers map[string]*worker + publisherRegistered bool + publisherWorkers map[string]*worker } type worker struct { @@ -85,7 +87,7 @@ func NewAgentService(bus psrpc.MessageBus) (*AgentService, error) { if err != nil { return nil, err } - s.AgentHandler = NewAgentHandler(agentServer, "room", "participant") + s.AgentHandler = NewAgentHandler(agentServer, rtc.RoomAgentTopic, rtc.PublisherAgentTopic) return s, nil } @@ -114,15 +116,15 @@ func (s *AgentService) ServeHTTP(writer http.ResponseWriter, r *http.Request) { s.HandleConnection(conn) } -func NewAgentHandler(agentServer rpc.AgentInternalServer, roomTopic, participantTopic string) *AgentHandler { +func NewAgentHandler(agentServer rpc.AgentInternalServer, roomTopic, publisherTopic string) *AgentHandler { return &AgentHandler{ - agentServer: agentServer, - roomTopic: roomTopic, - participantTopic: participantTopic, - availability: make(map[string]chan *availability), - unregistered: make(map[*websocket.Conn]*worker), - roomWorkers: make(map[string]*worker), - participantWorkers: make(map[string]*worker), + agentServer: agentServer, + roomTopic: roomTopic, + publisherTopic: publisherTopic, + availability: make(map[string]chan *availability), + unregistered: make(map[*websocket.Conn]*worker), + roomWorkers: make(map[string]*worker), + publisherWorkers: make(map[string]*worker), } } @@ -150,10 +152,10 @@ func (s *AgentHandler) HandleConnection(conn *websocket.Conn) { s.agentServer.DeregisterJobRequestTopic(s.roomTopic) } case livekit.JobType_JT_PUBLISHER: - delete(s.participantWorkers, w.id) - if s.participantRegistered && !s.participantAvailableLocked() { - s.participantRegistered = false - s.agentServer.DeregisterJobRequestTopic(s.participantTopic) + delete(s.publisherWorkers, w.id) + if s.publisherRegistered && !s.publisherAvailableLocked() { + s.publisherRegistered = false + s.agentServer.DeregisterJobRequestTopic(s.publisherTopic) } } } @@ -217,26 +219,29 @@ func (s *AgentHandler) handleRegister(worker *worker, msg *livekit.RegisterWorke case livekit.JobType_JT_PUBLISHER: worker.id = msg.WorkerId delete(s.unregistered, worker.conn) - s.participantWorkers[worker.id] = worker + s.publisherWorkers[worker.id] = worker - if !s.participantRegistered { - err := s.agentServer.RegisterJobRequestTopic(s.participantTopic) + if !s.publisherRegistered { + err := s.agentServer.RegisterJobRequestTopic(s.publisherTopic) if err != nil { - logger.Errorw("failed to register participant agents", err) + logger.Errorw("failed to register publisher agents", err) } else { - s.participantRegistered = true + s.publisherRegistered = true } } } - worker.sigConn.WriteServerMessage(&livekit.ServerMessage{ + _, err := worker.sigConn.WriteServerMessage(&livekit.ServerMessage{ Message: &livekit.ServerMessage_Register{ Register: &livekit.RegisterWorkerResponse{ WorkerId: worker.id, - ServerVersion: "version", + ServerVersion: AgentServiceVersion, }, }, }) + if err != nil { + logger.Errorw("failed to write server message", err) + } } func (s *AgentHandler) handleAvailability(w *worker, msg *livekit.AvailabilityResponse) { @@ -286,19 +291,29 @@ func (s *AgentHandler) handleStatus(w *worker, msg *livekit.UpdateWorkerStatus) } } case livekit.JobType_JT_PUBLISHER: - if s.participantRegistered && !s.participantAvailableLocked() { - s.participantRegistered = false - s.agentServer.DeregisterJobRequestTopic(s.participantTopic) - } else if !s.participantRegistered && s.participantAvailableLocked() { - if err := s.agentServer.RegisterJobRequestTopic(s.participantTopic); err != nil { - logger.Errorw("failed to register participant agents", err) + if s.publisherRegistered && !s.publisherAvailableLocked() { + s.publisherRegistered = false + s.agentServer.DeregisterJobRequestTopic(s.publisherTopic) + } else if !s.publisherRegistered && s.publisherAvailableLocked() { + if err := s.agentServer.RegisterJobRequestTopic(s.publisherTopic); err != nil { + logger.Errorw("failed to register publisher agents", err) } else { - s.participantRegistered = true + s.publisherRegistered = true } } } } +func (s *AgentHandler) CheckEnabled(_ context.Context, _ *rpc.CheckEnabledRequest) (*rpc.CheckEnabledResponse, error) { + s.mu.Lock() + res := &rpc.CheckEnabledResponse{ + RoomEnabled: len(s.roomWorkers) > 0, + PublisherEnabled: len(s.publisherWorkers) > 0, + } + s.mu.Unlock() + return res, nil +} + func (s *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*emptypb.Empty, error) { s.mu.Lock() ac := make(chan *availability, 100) @@ -316,7 +331,7 @@ func (s *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*empty case livekit.JobType_JT_ROOM: pool = s.roomWorkers case livekit.JobType_JT_PUBLISHER: - pool = s.participantWorkers + pool = s.publisherWorkers } attempted := make(map[string]bool) @@ -386,7 +401,7 @@ func (s *AgentHandler) JobRequestAffinity(ctx context.Context, job *livekit.Job) case livekit.JobType_JT_ROOM: pool = s.roomWorkers case livekit.JobType_JT_PUBLISHER: - pool = s.participantWorkers + pool = s.publisherWorkers } var affinity float32 @@ -403,6 +418,13 @@ func (s *AgentHandler) JobRequestAffinity(ctx context.Context, job *livekit.Job) return affinity } +func (s *AgentHandler) NumConnections() int { + s.mu.Lock() + defer s.mu.Unlock() + + return len(s.unregistered) + len(s.roomWorkers) + len(s.publisherWorkers) +} + func (s *AgentHandler) DrainConnections(interval time.Duration) { // jitter drain start time.Sleep(time.Duration(rand.Int63n(int64(interval)))) @@ -421,7 +443,7 @@ func (s *AgentHandler) DrainConnections(interval time.Duration) { _ = w.conn.Close() <-t.C } - for _, w := range s.participantWorkers { + for _, w := range s.publisherWorkers { _ = w.conn.Close() <-t.C } @@ -436,8 +458,8 @@ func (s *AgentHandler) roomAvailableLocked() bool { return false } -func (s *AgentHandler) participantAvailableLocked() bool { - for _, w := range s.participantWorkers { +func (s *AgentHandler) publisherAvailableLocked() bool { + for _, w := range s.publisherWorkers { if w.status == livekit.WorkerStatus_WS_AVAILABLE { return true } diff --git a/pkg/service/clients.go b/pkg/service/clients.go index cad477a93..53c0e415b 100644 --- a/pkg/service/clients.go +++ b/pkg/service/clients.go @@ -26,18 +26,6 @@ import ( "github.com/livekit/protocol/utils" ) -type agentClient struct { - s *AgentService -} - -func NewAgentClient(s *AgentService) rtc.AgentClient { - return &agentClient{s} -} - -func (c *agentClient) JobRequest(ctx context.Context, job *livekit.Job) { - _, _ = c.s.JobRequest(ctx, job) -} - type IOClient interface { CreateEgress(ctx context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error) GetEgress(ctx context.Context, req *rpc.GetEgressRequest) (*livekit.EgressInfo, error) diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index 0576bd9c2..174ea3391 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -117,25 +117,28 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq } if created { - if s.agentClient != nil { + go func() { s.agentClient.JobRequest(ctx, &livekit.Job{ Id: utils.NewGuid("JR_"), Type: livekit.JobType_JT_ROOM, Room: rm, }) - } + }() + if req.Egress != nil && req.Egress.Room != nil { - egress := &rpc.StartEgressRequest{ + _, err = s.egressLauncher.StartEgress(ctx, &rpc.StartEgressRequest{ Request: &rpc.StartEgressRequest_RoomComposite{ RoomComposite: req.Egress.Room, }, RoomId: rm.Sid, + }) + if err != nil { + return nil, err } - _, err = s.egressLauncher.StartEgress(ctx, egress) } } - return rm, err + return rm, nil } func (s *RoomService) ListRooms(ctx context.Context, req *livekit.ListRoomsRequest) (*livekit.ListRoomsResponse, error) { @@ -478,12 +481,14 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat return nil, err } - if created && s.agentClient != nil { - s.agentClient.JobRequest(ctx, &livekit.Job{ - Id: utils.NewGuid("JR_"), - Type: livekit.JobType_JT_ROOM, - Room: room, - }) + if created { + go func() { + s.agentClient.JobRequest(ctx, &livekit.Job{ + Id: utils.NewGuid("JR_"), + Type: livekit.JobType_JT_ROOM, + Room: room, + }) + }() } return room, nil diff --git a/pkg/service/wire.go b/pkg/service/wire.go index f53297d8f..afca2631e 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -30,6 +30,7 @@ import ( "github.com/livekit/livekit-server/pkg/clientconfiguration" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" + "github.com/livekit/livekit-server/pkg/rtc" "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" "github.com/livekit/protocol/auth" @@ -73,7 +74,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live NewRoomService, NewRTCService, NewAgentService, - NewAgentClient, + rtc.NewAgentClient, getSignalRelayConfig, NewDefaultSignalServer, routing.NewSignalClient, diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 30a61b66c..3a6470b88 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -11,6 +11,7 @@ import ( "github.com/livekit/livekit-server/pkg/clientconfiguration" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" + "github.com/livekit/livekit-server/pkg/rtc" "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/livekit-server/pkg/telemetry/prometheus" "github.com/livekit/protocol/auth" @@ -55,11 +56,10 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live if err != nil { return nil, err } - agentService, err := NewAgentService(messageBus) + agentClient, err := rtc.NewAgentClient(messageBus) if err != nil { return nil, err } - rtcAgentClient := NewAgentClient(agentService) egressClient, err := rpc.NewEgressClient(messageBus) if err != nil { return nil, err @@ -91,7 +91,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live if err != nil { return nil, err } - roomService, err := NewRoomService(roomConfig, apiConfig, psrpcConfig, router, roomAllocator, objectStore, rtcAgentClient, rtcEgressLauncher, topicFormatter, roomClient, participantClient) + roomService, err := NewRoomService(roomConfig, apiConfig, psrpcConfig, router, roomAllocator, objectStore, agentClient, rtcEgressLauncher, topicFormatter, roomClient, participantClient) if err != nil { return nil, err } @@ -102,11 +102,15 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live return nil, err } ingressService := NewIngressService(ingressConfig, nodeID, messageBus, ingressClient, ingressStore, roomService, telemetryService) - rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode, rtcAgentClient, telemetryService) + rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode, agentClient, telemetryService) + agentService, err := NewAgentService(messageBus) + if err != nil { + return nil, err + } clientConfigurationManager := createClientConfiguration() timedVersionGenerator := utils.NewDefaultTimedVersionGenerator() turnAuthHandler := NewTURNAuthHandler(keyProvider) - roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, rtcAgentClient, rtcEgressLauncher, timedVersionGenerator, turnAuthHandler, messageBus) + roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, agentClient, rtcEgressLauncher, timedVersionGenerator, turnAuthHandler, messageBus) if err != nil { return nil, err } diff --git a/test/agent_test.go b/test/agent_test.go index 172dec62f..0a45fa9ae 100644 --- a/test/agent_test.go +++ b/test/agent_test.go @@ -1,3 +1,17 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package test import (