mirror of
https://github.com/livekit/livekit.git
synced 2026-03-30 15:35:41 +00:00
Agents enabled check (#2227)
* agents enabled check * participant -> publisher * nil check client * add NumConnections * add lock around agent check * do not launch agents against other agents * regen * don't need atomic anymore * update protocol
This commit is contained in:
2
go.mod
2
go.mod
@@ -18,7 +18,7 @@ require (
|
||||
github.com/jxskiss/base62 v1.1.0
|
||||
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
|
||||
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e
|
||||
github.com/livekit/protocol v1.9.1-0.20231103182211-6d382559cf42
|
||||
github.com/livekit/protocol v1.9.1-0.20231107185101-e230ee2d840e
|
||||
github.com/livekit/psrpc v0.5.0
|
||||
github.com/mackerelio/go-osstat v0.2.4
|
||||
github.com/magefile/mage v1.15.0
|
||||
|
||||
4
go.sum
4
go.sum
@@ -125,8 +125,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
|
||||
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
|
||||
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M=
|
||||
github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY=
|
||||
github.com/livekit/protocol v1.9.1-0.20231103182211-6d382559cf42 h1:uDziAK5uhQPOj0fCKl+YyJx51tdFORLjC+rHgNNBCmY=
|
||||
github.com/livekit/protocol v1.9.1-0.20231103182211-6d382559cf42/go.mod h1:l2WjlZWErS6vBlQaQyCGwWLt1aOx10XfQTsmvLjJWFQ=
|
||||
github.com/livekit/protocol v1.9.1-0.20231107185101-e230ee2d840e h1:YShBpEjkEBY7yil2gjMWlkVkxs3OI58LIIYsBdb8aBU=
|
||||
github.com/livekit/protocol v1.9.1-0.20231107185101-e230ee2d840e/go.mod h1:l2WjlZWErS6vBlQaQyCGwWLt1aOx10XfQTsmvLjJWFQ=
|
||||
github.com/livekit/psrpc v0.5.0 h1:g+yYNSs6Y1/vM7UlFkB2s/ARe2y3RKWZhX8ata5j+eo=
|
||||
github.com/livekit/psrpc v0.5.0/go.mod h1:1XYH1LLoD/YbvBvt6xg2KQ/J3InLXSJK6PL/+DKmuAU=
|
||||
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
|
||||
|
||||
92
pkg/rtc/agentclient.go
Normal file
92
pkg/rtc/agentclient.go
Normal file
@@ -0,0 +1,92 @@
|
||||
// Copyright 2023 LiveKit, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package rtc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/livekit/protocol/livekit"
|
||||
"github.com/livekit/protocol/logger"
|
||||
"github.com/livekit/protocol/rpc"
|
||||
"github.com/livekit/psrpc"
|
||||
)
|
||||
|
||||
const (
|
||||
RoomAgentTopic = "room"
|
||||
PublisherAgentTopic = "publisher"
|
||||
)
|
||||
|
||||
type AgentClient interface {
|
||||
CheckEnabled(ctx context.Context, req *rpc.CheckEnabledRequest) *rpc.CheckEnabledResponse
|
||||
JobRequest(ctx context.Context, job *livekit.Job)
|
||||
}
|
||||
|
||||
type agentClient struct {
|
||||
client rpc.AgentInternalClient
|
||||
}
|
||||
|
||||
func NewAgentClient(bus psrpc.MessageBus) (AgentClient, error) {
|
||||
client, err := rpc.NewAgentInternalClient(bus)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &agentClient{client: client}, nil
|
||||
}
|
||||
|
||||
func (c *agentClient) CheckEnabled(ctx context.Context, req *rpc.CheckEnabledRequest) *rpc.CheckEnabledResponse {
|
||||
res := &rpc.CheckEnabledResponse{}
|
||||
resChan, err := c.client.CheckEnabled(ctx, req, psrpc.WithRequestTimeout(time.Second))
|
||||
if err != nil {
|
||||
return res
|
||||
}
|
||||
|
||||
for r := range resChan {
|
||||
if r.Err != nil {
|
||||
continue
|
||||
}
|
||||
if r.Result.RoomEnabled {
|
||||
res.RoomEnabled = true
|
||||
if res.PublisherEnabled {
|
||||
return res
|
||||
}
|
||||
}
|
||||
if r.Result.PublisherEnabled {
|
||||
res.PublisherEnabled = true
|
||||
if res.RoomEnabled {
|
||||
return res
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
func (c *agentClient) JobRequest(ctx context.Context, job *livekit.Job) {
|
||||
var topic string
|
||||
var logError bool
|
||||
switch job.Type {
|
||||
case livekit.JobType_JT_ROOM:
|
||||
topic = RoomAgentTopic
|
||||
case livekit.JobType_JT_PUBLISHER:
|
||||
topic = PublisherAgentTopic
|
||||
logError = true
|
||||
}
|
||||
|
||||
_, err := c.client.JobRequest(ctx, topic, job)
|
||||
if err != nil && logError {
|
||||
logger.Warnw("agent job request failed", err)
|
||||
}
|
||||
}
|
||||
@@ -27,10 +27,6 @@ import (
|
||||
"github.com/livekit/protocol/webhook"
|
||||
)
|
||||
|
||||
type AgentClient interface {
|
||||
JobRequest(ctx context.Context, job *livekit.Job)
|
||||
}
|
||||
|
||||
type EgressLauncher interface {
|
||||
StartEgress(context.Context, *rpc.StartEgressRequest) (*livekit.EgressInfo, error)
|
||||
StartEgressWithClusterId(ctx context.Context, clusterId string, req *rpc.StartEgressRequest) (*livekit.EgressInfo, error)
|
||||
@@ -1034,6 +1034,13 @@ func (p *ParticipantImpl) IsRecorder() bool {
|
||||
return p.grants.Video.Recorder
|
||||
}
|
||||
|
||||
func (p *ParticipantImpl) IsAgent() bool {
|
||||
p.lock.RLock()
|
||||
defer p.lock.RUnlock()
|
||||
|
||||
return p.grants.Video.Agent
|
||||
}
|
||||
|
||||
func (p *ParticipantImpl) VerifySubscribeParticipantInfo(pID livekit.ParticipantID, version uint32) {
|
||||
if !p.IsReady() {
|
||||
// we have not sent a JoinResponse yet. metadata would be covered in JoinResponse
|
||||
|
||||
@@ -30,6 +30,7 @@ import (
|
||||
|
||||
"github.com/livekit/protocol/livekit"
|
||||
"github.com/livekit/protocol/logger"
|
||||
"github.com/livekit/protocol/rpc"
|
||||
"github.com/livekit/protocol/utils"
|
||||
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
@@ -74,15 +75,18 @@ type Room struct {
|
||||
audioConfig *config.AudioConfig
|
||||
serverInfo *livekit.ServerInfo
|
||||
telemetry telemetry.TelemetryService
|
||||
agentClient AgentClient
|
||||
egressLauncher EgressLauncher
|
||||
trackManager *RoomTrackManager
|
||||
|
||||
// agents
|
||||
agentClient AgentClient
|
||||
publisherAgentsEnabled bool
|
||||
|
||||
// map of identity -> Participant
|
||||
participants map[livekit.ParticipantIdentity]types.LocalParticipant
|
||||
participantOpts map[livekit.ParticipantIdentity]*ParticipantOptions
|
||||
participantRequestSources map[livekit.ParticipantIdentity]routing.MessageSource
|
||||
hasPublished sync.Map // map of identity -> bool
|
||||
hasPublished map[livekit.ParticipantIdentity]bool
|
||||
bufferFactory *buffer.FactoryOfBufferFactory
|
||||
|
||||
// batch update participant info for non-publishers
|
||||
@@ -135,11 +139,13 @@ func NewRoom(
|
||||
participants: make(map[livekit.ParticipantIdentity]types.LocalParticipant),
|
||||
participantOpts: make(map[livekit.ParticipantIdentity]*ParticipantOptions),
|
||||
participantRequestSources: make(map[livekit.ParticipantIdentity]routing.MessageSource),
|
||||
hasPublished: make(map[livekit.ParticipantIdentity]bool),
|
||||
bufferFactory: buffer.NewFactoryOfBufferFactory(config.Receiver.PacketBufferSize),
|
||||
batchedUpdates: make(map[livekit.ParticipantIdentity]*livekit.ParticipantInfo),
|
||||
closed: make(chan struct{}),
|
||||
trailer: []byte(utils.RandomSecret()),
|
||||
}
|
||||
|
||||
r.protoProxy = utils.NewProtoProxy[*livekit.Room](roomUpdateInterval, r.updateProto)
|
||||
if r.protoRoom.EmptyTimeout == 0 {
|
||||
r.protoRoom.EmptyTimeout = DefaultEmptyTimeout
|
||||
@@ -148,6 +154,21 @@ func NewRoom(
|
||||
r.protoRoom.CreationTime = time.Now().Unix()
|
||||
}
|
||||
|
||||
if agentClient != nil {
|
||||
go func() {
|
||||
res := r.agentClient.CheckEnabled(context.Background(), &rpc.CheckEnabledRequest{})
|
||||
if res.PublisherEnabled {
|
||||
r.lock.Lock()
|
||||
r.publisherAgentsEnabled = true
|
||||
// if there are already published tracks, start the agents
|
||||
for identity := range r.hasPublished {
|
||||
r.launchPublisherAgent(r.participants[identity])
|
||||
}
|
||||
r.lock.Unlock()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
go r.audioUpdateWorker()
|
||||
go r.connectionQualityWorker()
|
||||
go r.changeUpdateWorker()
|
||||
@@ -477,6 +498,7 @@ func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity, pID livek
|
||||
delete(r.participants, identity)
|
||||
delete(r.participantOpts, identity)
|
||||
delete(r.participantRequestSources, identity)
|
||||
delete(r.hasPublished, identity)
|
||||
if !p.Hidden() {
|
||||
r.protoRoom.NumParticipants--
|
||||
}
|
||||
@@ -512,7 +534,6 @@ func (r *Room) RemoveParticipant(identity livekit.ParticipantIdentity, pID livek
|
||||
for _, t := range p.GetPublishedTracks() {
|
||||
r.trackManager.RemoveTrack(t)
|
||||
}
|
||||
r.hasPublished.Delete(p.Identity())
|
||||
|
||||
p.OnTrackUpdated(nil)
|
||||
p.OnTrackPublished(nil)
|
||||
@@ -902,17 +923,15 @@ func (r *Room) onTrackPublished(participant types.LocalParticipant, track types.
|
||||
r.trackManager.AddTrack(track, participant.Identity(), participant.ID())
|
||||
|
||||
// launch jobs
|
||||
_, hasPublished := r.hasPublished.Swap(participant.Identity(), true)
|
||||
r.lock.Lock()
|
||||
hasPublished := r.hasPublished[participant.Identity()]
|
||||
r.hasPublished[participant.Identity()] = true
|
||||
publisherAgentsEnabled := r.publisherAgentsEnabled
|
||||
r.lock.Unlock()
|
||||
|
||||
if !hasPublished {
|
||||
if r.agentClient != nil {
|
||||
go func() {
|
||||
r.agentClient.JobRequest(context.Background(), &livekit.Job{
|
||||
Id: utils.NewGuid("JP_"),
|
||||
Type: livekit.JobType_JT_PUBLISHER,
|
||||
Room: r.protoRoom,
|
||||
Participant: participant.ToProto(),
|
||||
})
|
||||
}()
|
||||
if publisherAgentsEnabled {
|
||||
r.launchPublisherAgent(participant)
|
||||
}
|
||||
if r.internal != nil && r.internal.ParticipantEgress != nil {
|
||||
go func() {
|
||||
@@ -1302,6 +1321,21 @@ func (r *Room) connectionQualityWorker() {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Room) launchPublisherAgent(p types.Participant) {
|
||||
if p == nil || p.IsRecorder() || p.IsAgent() {
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
r.agentClient.JobRequest(context.Background(), &livekit.Job{
|
||||
Id: utils.NewGuid("JP_"),
|
||||
Type: livekit.JobType_JT_PUBLISHER,
|
||||
Room: r.ToProto(),
|
||||
Participant: p.ToProto(),
|
||||
})
|
||||
}()
|
||||
}
|
||||
|
||||
func (r *Room) DebugInfo() map[string]interface{} {
|
||||
info := map[string]interface{}{
|
||||
"Name": r.protoRoom.Name,
|
||||
|
||||
@@ -264,6 +264,7 @@ type Participant interface {
|
||||
// permissions
|
||||
Hidden() bool
|
||||
IsRecorder() bool
|
||||
IsAgent() bool
|
||||
|
||||
Start()
|
||||
Close(sendLeave bool, reason ParticipantCloseReason, isExpectedToResume bool) error
|
||||
|
||||
@@ -408,6 +408,16 @@ type FakeLocalParticipant struct {
|
||||
identityReturnsOnCall map[int]struct {
|
||||
result1 livekit.ParticipantIdentity
|
||||
}
|
||||
IsAgentStub func() bool
|
||||
isAgentMutex sync.RWMutex
|
||||
isAgentArgsForCall []struct {
|
||||
}
|
||||
isAgentReturns struct {
|
||||
result1 bool
|
||||
}
|
||||
isAgentReturnsOnCall map[int]struct {
|
||||
result1 bool
|
||||
}
|
||||
IsClosedStub func() bool
|
||||
isClosedMutex sync.RWMutex
|
||||
isClosedArgsForCall []struct {
|
||||
@@ -2986,6 +2996,59 @@ func (fake *FakeLocalParticipant) IdentityReturnsOnCall(i int, result1 livekit.P
|
||||
}{result1}
|
||||
}
|
||||
|
||||
func (fake *FakeLocalParticipant) IsAgent() bool {
|
||||
fake.isAgentMutex.Lock()
|
||||
ret, specificReturn := fake.isAgentReturnsOnCall[len(fake.isAgentArgsForCall)]
|
||||
fake.isAgentArgsForCall = append(fake.isAgentArgsForCall, struct {
|
||||
}{})
|
||||
stub := fake.IsAgentStub
|
||||
fakeReturns := fake.isAgentReturns
|
||||
fake.recordInvocation("IsAgent", []interface{}{})
|
||||
fake.isAgentMutex.Unlock()
|
||||
if stub != nil {
|
||||
return stub()
|
||||
}
|
||||
if specificReturn {
|
||||
return ret.result1
|
||||
}
|
||||
return fakeReturns.result1
|
||||
}
|
||||
|
||||
func (fake *FakeLocalParticipant) IsAgentCallCount() int {
|
||||
fake.isAgentMutex.RLock()
|
||||
defer fake.isAgentMutex.RUnlock()
|
||||
return len(fake.isAgentArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *FakeLocalParticipant) IsAgentCalls(stub func() bool) {
|
||||
fake.isAgentMutex.Lock()
|
||||
defer fake.isAgentMutex.Unlock()
|
||||
fake.IsAgentStub = stub
|
||||
}
|
||||
|
||||
func (fake *FakeLocalParticipant) IsAgentReturns(result1 bool) {
|
||||
fake.isAgentMutex.Lock()
|
||||
defer fake.isAgentMutex.Unlock()
|
||||
fake.IsAgentStub = nil
|
||||
fake.isAgentReturns = struct {
|
||||
result1 bool
|
||||
}{result1}
|
||||
}
|
||||
|
||||
func (fake *FakeLocalParticipant) IsAgentReturnsOnCall(i int, result1 bool) {
|
||||
fake.isAgentMutex.Lock()
|
||||
defer fake.isAgentMutex.Unlock()
|
||||
fake.IsAgentStub = nil
|
||||
if fake.isAgentReturnsOnCall == nil {
|
||||
fake.isAgentReturnsOnCall = make(map[int]struct {
|
||||
result1 bool
|
||||
})
|
||||
}
|
||||
fake.isAgentReturnsOnCall[i] = struct {
|
||||
result1 bool
|
||||
}{result1}
|
||||
}
|
||||
|
||||
func (fake *FakeLocalParticipant) IsClosed() bool {
|
||||
fake.isClosedMutex.Lock()
|
||||
ret, specificReturn := fake.isClosedReturnsOnCall[len(fake.isClosedArgsForCall)]
|
||||
@@ -6031,6 +6094,8 @@ func (fake *FakeLocalParticipant) Invocations() map[string][][]interface{} {
|
||||
defer fake.iDMutex.RUnlock()
|
||||
fake.identityMutex.RLock()
|
||||
defer fake.identityMutex.RUnlock()
|
||||
fake.isAgentMutex.RLock()
|
||||
defer fake.isAgentMutex.RUnlock()
|
||||
fake.isClosedMutex.RLock()
|
||||
defer fake.isClosedMutex.RUnlock()
|
||||
fake.isDisconnectedMutex.RLock()
|
||||
|
||||
@@ -118,6 +118,16 @@ type FakeParticipant struct {
|
||||
identityReturnsOnCall map[int]struct {
|
||||
result1 livekit.ParticipantIdentity
|
||||
}
|
||||
IsAgentStub func() bool
|
||||
isAgentMutex sync.RWMutex
|
||||
isAgentArgsForCall []struct {
|
||||
}
|
||||
isAgentReturns struct {
|
||||
result1 bool
|
||||
}
|
||||
isAgentReturnsOnCall map[int]struct {
|
||||
result1 bool
|
||||
}
|
||||
IsPublisherStub func() bool
|
||||
isPublisherMutex sync.RWMutex
|
||||
isPublisherArgsForCall []struct {
|
||||
@@ -780,6 +790,59 @@ func (fake *FakeParticipant) IdentityReturnsOnCall(i int, result1 livekit.Partic
|
||||
}{result1}
|
||||
}
|
||||
|
||||
func (fake *FakeParticipant) IsAgent() bool {
|
||||
fake.isAgentMutex.Lock()
|
||||
ret, specificReturn := fake.isAgentReturnsOnCall[len(fake.isAgentArgsForCall)]
|
||||
fake.isAgentArgsForCall = append(fake.isAgentArgsForCall, struct {
|
||||
}{})
|
||||
stub := fake.IsAgentStub
|
||||
fakeReturns := fake.isAgentReturns
|
||||
fake.recordInvocation("IsAgent", []interface{}{})
|
||||
fake.isAgentMutex.Unlock()
|
||||
if stub != nil {
|
||||
return stub()
|
||||
}
|
||||
if specificReturn {
|
||||
return ret.result1
|
||||
}
|
||||
return fakeReturns.result1
|
||||
}
|
||||
|
||||
func (fake *FakeParticipant) IsAgentCallCount() int {
|
||||
fake.isAgentMutex.RLock()
|
||||
defer fake.isAgentMutex.RUnlock()
|
||||
return len(fake.isAgentArgsForCall)
|
||||
}
|
||||
|
||||
func (fake *FakeParticipant) IsAgentCalls(stub func() bool) {
|
||||
fake.isAgentMutex.Lock()
|
||||
defer fake.isAgentMutex.Unlock()
|
||||
fake.IsAgentStub = stub
|
||||
}
|
||||
|
||||
func (fake *FakeParticipant) IsAgentReturns(result1 bool) {
|
||||
fake.isAgentMutex.Lock()
|
||||
defer fake.isAgentMutex.Unlock()
|
||||
fake.IsAgentStub = nil
|
||||
fake.isAgentReturns = struct {
|
||||
result1 bool
|
||||
}{result1}
|
||||
}
|
||||
|
||||
func (fake *FakeParticipant) IsAgentReturnsOnCall(i int, result1 bool) {
|
||||
fake.isAgentMutex.Lock()
|
||||
defer fake.isAgentMutex.Unlock()
|
||||
fake.IsAgentStub = nil
|
||||
if fake.isAgentReturnsOnCall == nil {
|
||||
fake.isAgentReturnsOnCall = make(map[int]struct {
|
||||
result1 bool
|
||||
})
|
||||
}
|
||||
fake.isAgentReturnsOnCall[i] = struct {
|
||||
result1 bool
|
||||
}{result1}
|
||||
}
|
||||
|
||||
func (fake *FakeParticipant) IsPublisher() bool {
|
||||
fake.isPublisherMutex.Lock()
|
||||
ret, specificReturn := fake.isPublisherReturnsOnCall[len(fake.isPublisherArgsForCall)]
|
||||
@@ -1318,6 +1381,8 @@ func (fake *FakeParticipant) Invocations() map[string][][]interface{} {
|
||||
defer fake.iDMutex.RUnlock()
|
||||
fake.identityMutex.RLock()
|
||||
defer fake.identityMutex.RUnlock()
|
||||
fake.isAgentMutex.RLock()
|
||||
defer fake.isAgentMutex.RUnlock()
|
||||
fake.isPublisherMutex.RLock()
|
||||
defer fake.isPublisherMutex.RUnlock()
|
||||
fake.isRecorderMutex.RLock()
|
||||
|
||||
@@ -34,6 +34,8 @@ import (
|
||||
"github.com/livekit/psrpc"
|
||||
)
|
||||
|
||||
const AgentServiceVersion = "0.1.0"
|
||||
|
||||
type AgentService struct {
|
||||
upgrader websocket.Upgrader
|
||||
|
||||
@@ -41,17 +43,17 @@ type AgentService struct {
|
||||
}
|
||||
|
||||
type AgentHandler struct {
|
||||
agentServer rpc.AgentInternalServer
|
||||
roomTopic string
|
||||
participantTopic string
|
||||
agentServer rpc.AgentInternalServer
|
||||
roomTopic string
|
||||
publisherTopic string
|
||||
|
||||
mu sync.Mutex
|
||||
availability map[string]chan *availability
|
||||
unregistered map[*websocket.Conn]*worker
|
||||
roomRegistered bool
|
||||
roomWorkers map[string]*worker
|
||||
participantRegistered bool
|
||||
participantWorkers map[string]*worker
|
||||
mu sync.Mutex
|
||||
availability map[string]chan *availability
|
||||
unregistered map[*websocket.Conn]*worker
|
||||
roomRegistered bool
|
||||
roomWorkers map[string]*worker
|
||||
publisherRegistered bool
|
||||
publisherWorkers map[string]*worker
|
||||
}
|
||||
|
||||
type worker struct {
|
||||
@@ -85,7 +87,7 @@ func NewAgentService(bus psrpc.MessageBus) (*AgentService, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.AgentHandler = NewAgentHandler(agentServer, "room", "participant")
|
||||
s.AgentHandler = NewAgentHandler(agentServer, rtc.RoomAgentTopic, rtc.PublisherAgentTopic)
|
||||
|
||||
return s, nil
|
||||
}
|
||||
@@ -114,15 +116,15 @@ func (s *AgentService) ServeHTTP(writer http.ResponseWriter, r *http.Request) {
|
||||
s.HandleConnection(conn)
|
||||
}
|
||||
|
||||
func NewAgentHandler(agentServer rpc.AgentInternalServer, roomTopic, participantTopic string) *AgentHandler {
|
||||
func NewAgentHandler(agentServer rpc.AgentInternalServer, roomTopic, publisherTopic string) *AgentHandler {
|
||||
return &AgentHandler{
|
||||
agentServer: agentServer,
|
||||
roomTopic: roomTopic,
|
||||
participantTopic: participantTopic,
|
||||
availability: make(map[string]chan *availability),
|
||||
unregistered: make(map[*websocket.Conn]*worker),
|
||||
roomWorkers: make(map[string]*worker),
|
||||
participantWorkers: make(map[string]*worker),
|
||||
agentServer: agentServer,
|
||||
roomTopic: roomTopic,
|
||||
publisherTopic: publisherTopic,
|
||||
availability: make(map[string]chan *availability),
|
||||
unregistered: make(map[*websocket.Conn]*worker),
|
||||
roomWorkers: make(map[string]*worker),
|
||||
publisherWorkers: make(map[string]*worker),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -150,10 +152,10 @@ func (s *AgentHandler) HandleConnection(conn *websocket.Conn) {
|
||||
s.agentServer.DeregisterJobRequestTopic(s.roomTopic)
|
||||
}
|
||||
case livekit.JobType_JT_PUBLISHER:
|
||||
delete(s.participantWorkers, w.id)
|
||||
if s.participantRegistered && !s.participantAvailableLocked() {
|
||||
s.participantRegistered = false
|
||||
s.agentServer.DeregisterJobRequestTopic(s.participantTopic)
|
||||
delete(s.publisherWorkers, w.id)
|
||||
if s.publisherRegistered && !s.publisherAvailableLocked() {
|
||||
s.publisherRegistered = false
|
||||
s.agentServer.DeregisterJobRequestTopic(s.publisherTopic)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -217,26 +219,29 @@ func (s *AgentHandler) handleRegister(worker *worker, msg *livekit.RegisterWorke
|
||||
case livekit.JobType_JT_PUBLISHER:
|
||||
worker.id = msg.WorkerId
|
||||
delete(s.unregistered, worker.conn)
|
||||
s.participantWorkers[worker.id] = worker
|
||||
s.publisherWorkers[worker.id] = worker
|
||||
|
||||
if !s.participantRegistered {
|
||||
err := s.agentServer.RegisterJobRequestTopic(s.participantTopic)
|
||||
if !s.publisherRegistered {
|
||||
err := s.agentServer.RegisterJobRequestTopic(s.publisherTopic)
|
||||
if err != nil {
|
||||
logger.Errorw("failed to register participant agents", err)
|
||||
logger.Errorw("failed to register publisher agents", err)
|
||||
} else {
|
||||
s.participantRegistered = true
|
||||
s.publisherRegistered = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
worker.sigConn.WriteServerMessage(&livekit.ServerMessage{
|
||||
_, err := worker.sigConn.WriteServerMessage(&livekit.ServerMessage{
|
||||
Message: &livekit.ServerMessage_Register{
|
||||
Register: &livekit.RegisterWorkerResponse{
|
||||
WorkerId: worker.id,
|
||||
ServerVersion: "version",
|
||||
ServerVersion: AgentServiceVersion,
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
logger.Errorw("failed to write server message", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *AgentHandler) handleAvailability(w *worker, msg *livekit.AvailabilityResponse) {
|
||||
@@ -286,19 +291,29 @@ func (s *AgentHandler) handleStatus(w *worker, msg *livekit.UpdateWorkerStatus)
|
||||
}
|
||||
}
|
||||
case livekit.JobType_JT_PUBLISHER:
|
||||
if s.participantRegistered && !s.participantAvailableLocked() {
|
||||
s.participantRegistered = false
|
||||
s.agentServer.DeregisterJobRequestTopic(s.participantTopic)
|
||||
} else if !s.participantRegistered && s.participantAvailableLocked() {
|
||||
if err := s.agentServer.RegisterJobRequestTopic(s.participantTopic); err != nil {
|
||||
logger.Errorw("failed to register participant agents", err)
|
||||
if s.publisherRegistered && !s.publisherAvailableLocked() {
|
||||
s.publisherRegistered = false
|
||||
s.agentServer.DeregisterJobRequestTopic(s.publisherTopic)
|
||||
} else if !s.publisherRegistered && s.publisherAvailableLocked() {
|
||||
if err := s.agentServer.RegisterJobRequestTopic(s.publisherTopic); err != nil {
|
||||
logger.Errorw("failed to register publisher agents", err)
|
||||
} else {
|
||||
s.participantRegistered = true
|
||||
s.publisherRegistered = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *AgentHandler) CheckEnabled(_ context.Context, _ *rpc.CheckEnabledRequest) (*rpc.CheckEnabledResponse, error) {
|
||||
s.mu.Lock()
|
||||
res := &rpc.CheckEnabledResponse{
|
||||
RoomEnabled: len(s.roomWorkers) > 0,
|
||||
PublisherEnabled: len(s.publisherWorkers) > 0,
|
||||
}
|
||||
s.mu.Unlock()
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (s *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*emptypb.Empty, error) {
|
||||
s.mu.Lock()
|
||||
ac := make(chan *availability, 100)
|
||||
@@ -316,7 +331,7 @@ func (s *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*empty
|
||||
case livekit.JobType_JT_ROOM:
|
||||
pool = s.roomWorkers
|
||||
case livekit.JobType_JT_PUBLISHER:
|
||||
pool = s.participantWorkers
|
||||
pool = s.publisherWorkers
|
||||
}
|
||||
|
||||
attempted := make(map[string]bool)
|
||||
@@ -386,7 +401,7 @@ func (s *AgentHandler) JobRequestAffinity(ctx context.Context, job *livekit.Job)
|
||||
case livekit.JobType_JT_ROOM:
|
||||
pool = s.roomWorkers
|
||||
case livekit.JobType_JT_PUBLISHER:
|
||||
pool = s.participantWorkers
|
||||
pool = s.publisherWorkers
|
||||
}
|
||||
|
||||
var affinity float32
|
||||
@@ -403,6 +418,13 @@ func (s *AgentHandler) JobRequestAffinity(ctx context.Context, job *livekit.Job)
|
||||
return affinity
|
||||
}
|
||||
|
||||
func (s *AgentHandler) NumConnections() int {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
return len(s.unregistered) + len(s.roomWorkers) + len(s.publisherWorkers)
|
||||
}
|
||||
|
||||
func (s *AgentHandler) DrainConnections(interval time.Duration) {
|
||||
// jitter drain start
|
||||
time.Sleep(time.Duration(rand.Int63n(int64(interval))))
|
||||
@@ -421,7 +443,7 @@ func (s *AgentHandler) DrainConnections(interval time.Duration) {
|
||||
_ = w.conn.Close()
|
||||
<-t.C
|
||||
}
|
||||
for _, w := range s.participantWorkers {
|
||||
for _, w := range s.publisherWorkers {
|
||||
_ = w.conn.Close()
|
||||
<-t.C
|
||||
}
|
||||
@@ -436,8 +458,8 @@ func (s *AgentHandler) roomAvailableLocked() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *AgentHandler) participantAvailableLocked() bool {
|
||||
for _, w := range s.participantWorkers {
|
||||
func (s *AgentHandler) publisherAvailableLocked() bool {
|
||||
for _, w := range s.publisherWorkers {
|
||||
if w.status == livekit.WorkerStatus_WS_AVAILABLE {
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -26,18 +26,6 @@ import (
|
||||
"github.com/livekit/protocol/utils"
|
||||
)
|
||||
|
||||
type agentClient struct {
|
||||
s *AgentService
|
||||
}
|
||||
|
||||
func NewAgentClient(s *AgentService) rtc.AgentClient {
|
||||
return &agentClient{s}
|
||||
}
|
||||
|
||||
func (c *agentClient) JobRequest(ctx context.Context, job *livekit.Job) {
|
||||
_, _ = c.s.JobRequest(ctx, job)
|
||||
}
|
||||
|
||||
type IOClient interface {
|
||||
CreateEgress(ctx context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error)
|
||||
GetEgress(ctx context.Context, req *rpc.GetEgressRequest) (*livekit.EgressInfo, error)
|
||||
|
||||
@@ -117,25 +117,28 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq
|
||||
}
|
||||
|
||||
if created {
|
||||
if s.agentClient != nil {
|
||||
go func() {
|
||||
s.agentClient.JobRequest(ctx, &livekit.Job{
|
||||
Id: utils.NewGuid("JR_"),
|
||||
Type: livekit.JobType_JT_ROOM,
|
||||
Room: rm,
|
||||
})
|
||||
}
|
||||
}()
|
||||
|
||||
if req.Egress != nil && req.Egress.Room != nil {
|
||||
egress := &rpc.StartEgressRequest{
|
||||
_, err = s.egressLauncher.StartEgress(ctx, &rpc.StartEgressRequest{
|
||||
Request: &rpc.StartEgressRequest_RoomComposite{
|
||||
RoomComposite: req.Egress.Room,
|
||||
},
|
||||
RoomId: rm.Sid,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, err = s.egressLauncher.StartEgress(ctx, egress)
|
||||
}
|
||||
}
|
||||
|
||||
return rm, err
|
||||
return rm, nil
|
||||
}
|
||||
|
||||
func (s *RoomService) ListRooms(ctx context.Context, req *livekit.ListRoomsRequest) (*livekit.ListRoomsResponse, error) {
|
||||
@@ -478,12 +481,14 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if created && s.agentClient != nil {
|
||||
s.agentClient.JobRequest(ctx, &livekit.Job{
|
||||
Id: utils.NewGuid("JR_"),
|
||||
Type: livekit.JobType_JT_ROOM,
|
||||
Room: room,
|
||||
})
|
||||
if created {
|
||||
go func() {
|
||||
s.agentClient.JobRequest(ctx, &livekit.Job{
|
||||
Id: utils.NewGuid("JR_"),
|
||||
Type: livekit.JobType_JT_ROOM,
|
||||
Room: room,
|
||||
})
|
||||
}()
|
||||
}
|
||||
|
||||
return room, nil
|
||||
|
||||
@@ -30,6 +30,7 @@ import (
|
||||
"github.com/livekit/livekit-server/pkg/clientconfiguration"
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/livekit-server/pkg/routing"
|
||||
"github.com/livekit/livekit-server/pkg/rtc"
|
||||
"github.com/livekit/livekit-server/pkg/telemetry"
|
||||
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
|
||||
"github.com/livekit/protocol/auth"
|
||||
@@ -73,7 +74,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
|
||||
NewRoomService,
|
||||
NewRTCService,
|
||||
NewAgentService,
|
||||
NewAgentClient,
|
||||
rtc.NewAgentClient,
|
||||
getSignalRelayConfig,
|
||||
NewDefaultSignalServer,
|
||||
routing.NewSignalClient,
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"github.com/livekit/livekit-server/pkg/clientconfiguration"
|
||||
"github.com/livekit/livekit-server/pkg/config"
|
||||
"github.com/livekit/livekit-server/pkg/routing"
|
||||
"github.com/livekit/livekit-server/pkg/rtc"
|
||||
"github.com/livekit/livekit-server/pkg/telemetry"
|
||||
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
|
||||
"github.com/livekit/protocol/auth"
|
||||
@@ -55,11 +56,10 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
agentService, err := NewAgentService(messageBus)
|
||||
agentClient, err := rtc.NewAgentClient(messageBus)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rtcAgentClient := NewAgentClient(agentService)
|
||||
egressClient, err := rpc.NewEgressClient(messageBus)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -91,7 +91,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
roomService, err := NewRoomService(roomConfig, apiConfig, psrpcConfig, router, roomAllocator, objectStore, rtcAgentClient, rtcEgressLauncher, topicFormatter, roomClient, participantClient)
|
||||
roomService, err := NewRoomService(roomConfig, apiConfig, psrpcConfig, router, roomAllocator, objectStore, agentClient, rtcEgressLauncher, topicFormatter, roomClient, participantClient)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -102,11 +102,15 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live
|
||||
return nil, err
|
||||
}
|
||||
ingressService := NewIngressService(ingressConfig, nodeID, messageBus, ingressClient, ingressStore, roomService, telemetryService)
|
||||
rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode, rtcAgentClient, telemetryService)
|
||||
rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode, agentClient, telemetryService)
|
||||
agentService, err := NewAgentService(messageBus)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
clientConfigurationManager := createClientConfiguration()
|
||||
timedVersionGenerator := utils.NewDefaultTimedVersionGenerator()
|
||||
turnAuthHandler := NewTURNAuthHandler(keyProvider)
|
||||
roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, rtcAgentClient, rtcEgressLauncher, timedVersionGenerator, turnAuthHandler, messageBus)
|
||||
roomManager, err := NewLocalRoomManager(conf, objectStore, currentNode, router, telemetryService, clientConfigurationManager, agentClient, rtcEgressLauncher, timedVersionGenerator, turnAuthHandler, messageBus)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -1,3 +1,17 @@
|
||||
// Copyright 2023 LiveKit, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package test
|
||||
|
||||
import (
|
||||
|
||||
Reference in New Issue
Block a user