From f49103a0039ebe96dea5f978039dbff0b0e4953d Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Tue, 18 Feb 2025 00:40:56 -0800 Subject: [PATCH] add participant job type (#3443) * add participant job type * cleanup * deps --- go.mod | 2 +- go.sum | 4 +-- pkg/agent/client.go | 71 ++++++++++++++++++++++++++++--------- pkg/rtc/room.go | 13 ++++--- pkg/service/agentservice.go | 58 +++++++++++++++++++++--------- test/agent.go | 27 +++++++++++--- test/agent_test.go | 26 +++++++++++--- test/client/client.go | 1 - 8 files changed, 152 insertions(+), 50 deletions(-) diff --git a/go.mod b/go.mod index 2e44b8c1a..40ebb02f6 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564 - github.com/livekit/protocol v1.33.1-0.20250213045117-9b6e5d703ff8 + github.com/livekit/protocol v1.33.1-0.20250218083317-1a019aab5b83 github.com/livekit/psrpc v0.6.1-0.20250205181828-a0beed2e4126 github.com/mackerelio/go-osstat v0.2.5 github.com/magefile/mage v1.15.0 diff --git a/go.sum b/go.sum index 416ca99fb..55cff33b0 100644 --- a/go.sum +++ b/go.sum @@ -170,8 +170,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564 h1:GX7KF/V9ExmcfT/2Bdia8aROjkxrgx7WpyH7w9MB4J4= github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564/go.mod h1:36s+wwmU3O40IAhE+MjBWP3W71QRiEE9SfooSBvtBqY= -github.com/livekit/protocol v1.33.1-0.20250213045117-9b6e5d703ff8 h1:TwYfrw9nnuhpTYsgY/T+OQi8z9s1itgdzNMDAOmq3BA= -github.com/livekit/protocol v1.33.1-0.20250213045117-9b6e5d703ff8/go.mod h1:yXuQ7ucrLj91nbxL6/AHgtxdha1DGzLj1LkgvnT90So= +github.com/livekit/protocol v1.33.1-0.20250218083317-1a019aab5b83 h1:1HFZ41AaFE+disN7Md9g0MQNpnw9Y2p3QKbjYNtjQjA= +github.com/livekit/protocol v1.33.1-0.20250218083317-1a019aab5b83/go.mod h1:yXuQ7ucrLj91nbxL6/AHgtxdha1DGzLj1LkgvnT90So= github.com/livekit/psrpc v0.6.1-0.20250205181828-a0beed2e4126 h1:fzuYpAQbCid7ySPpQWWePfQOWUrs8x6dJ0T3Wl07n+Y= github.com/livekit/psrpc v0.6.1-0.20250205181828-a0beed2e4126/go.mod h1:X5WtEZ7OnEs72Fi5/J+i0on3964F1aynQpCalcgMqRo= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= diff --git a/pkg/agent/client.go b/pkg/agent/client.go index 95acedb5f..925ca105f 100644 --- a/pkg/agent/client.go +++ b/pkg/agent/client.go @@ -21,24 +21,32 @@ import ( "time" "github.com/gammazero/workerpool" + "google.golang.org/protobuf/types/known/emptypb" + serverutils "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/utils" "github.com/livekit/psrpc" - "google.golang.org/protobuf/types/known/emptypb" ) const ( EnabledCacheTTL = 1 * time.Minute RoomAgentTopic = "room" PublisherAgentTopic = "publisher" + ParticipantAgentTopic = "participant" DefaultHandlerNamespace = "" CheckEnabledTimeout = 5 * time.Second ) +var jobTypeTopics = map[livekit.JobType]string{ + livekit.JobType_JT_ROOM: RoomAgentTopic, + livekit.JobType_JT_PUBLISHER: PublisherAgentTopic, + livekit.JobType_JT_PARTICIPANT: ParticipantAgentTopic, +} + type Client interface { // LaunchJob starts a room or participant job on an agent. // it will launch a job once for each worker in each namespace @@ -64,10 +72,12 @@ type agentClient struct { // cache response to avoid constantly checking with controllers // cache is invalidated with AgentRegistered updates - roomNamespaces *serverutils.IncrementalDispatcher[string] // deprecated - publisherNamespaces *serverutils.IncrementalDispatcher[string] // deprecated - roomAgentNames *serverutils.IncrementalDispatcher[string] - publisherAgentNames *serverutils.IncrementalDispatcher[string] + roomNamespaces *serverutils.IncrementalDispatcher[string] // deprecated + publisherNamespaces *serverutils.IncrementalDispatcher[string] // deprecated + participantNamespaces *serverutils.IncrementalDispatcher[string] // deprecated + roomAgentNames *serverutils.IncrementalDispatcher[string] + publisherAgentNames *serverutils.IncrementalDispatcher[string] + participantAgentNames *serverutils.IncrementalDispatcher[string] enabledExpiresAt time.Time @@ -102,8 +112,10 @@ func NewAgentClient(bus psrpc.MessageBus) (Client, error) { c.mu.Lock() c.roomNamespaces = nil c.publisherNamespaces = nil + c.participantNamespaces = nil c.roomAgentNames = nil c.publisherAgentNames = nil + c.participantAgentNames = nil c.mu.Unlock() } @@ -114,11 +126,6 @@ func NewAgentClient(bus psrpc.MessageBus) (Client, error) { } func (c *agentClient) LaunchJob(ctx context.Context, desc *JobRequest) *serverutils.IncrementalDispatcher[*livekit.Job] { - jobTypeTopic := RoomAgentTopic - if desc.JobType == livekit.JobType_JT_PUBLISHER { - jobTypeTopic = PublisherAgentTopic - } - var wg sync.WaitGroup ret := serverutils.NewIncrementalDispatcher[*livekit.Job]() defer func() { @@ -128,6 +135,11 @@ func (c *agentClient) LaunchJob(ctx context.Context, desc *JobRequest) *serverut }) }() + jobTypeTopic, ok := jobTypeTopics[desc.JobType] + if !ok { + return ret + } + dispatcher := c.getDispatcher(desc.AgentName, desc.JobType) if dispatcher == nil { @@ -186,21 +198,30 @@ func (c *agentClient) getDispatcher(agName string, jobType livekit.JobType) *ser c.mu.Lock() if time.Since(c.enabledExpiresAt) > EnabledCacheTTL || c.roomNamespaces == nil || - c.publisherNamespaces == nil || c.roomAgentNames == nil || c.publisherAgentNames == nil { + c.publisherNamespaces == nil || c.participantNamespaces == nil || c.roomAgentNames == nil || c.publisherAgentNames == nil || c.participantAgentNames == nil { c.enabledExpiresAt = time.Now() c.roomNamespaces = serverutils.NewIncrementalDispatcher[string]() c.publisherNamespaces = serverutils.NewIncrementalDispatcher[string]() + c.participantNamespaces = serverutils.NewIncrementalDispatcher[string]() c.roomAgentNames = serverutils.NewIncrementalDispatcher[string]() c.publisherAgentNames = serverutils.NewIncrementalDispatcher[string]() + c.participantAgentNames = serverutils.NewIncrementalDispatcher[string]() - go c.checkEnabled(c.roomNamespaces, c.publisherNamespaces, c.roomAgentNames, c.publisherAgentNames) + go c.checkEnabled(c.roomNamespaces, c.publisherNamespaces, c.participantNamespaces, c.roomAgentNames, c.publisherAgentNames, c.participantAgentNames) } - target := c.roomNamespaces - agentNames := c.roomAgentNames - if jobType == livekit.JobType_JT_PUBLISHER { + var target *serverutils.IncrementalDispatcher[string] + var agentNames *serverutils.IncrementalDispatcher[string] + switch jobType { + case livekit.JobType_JT_ROOM: + target = c.roomNamespaces + agentNames = c.roomAgentNames + case livekit.JobType_JT_PUBLISHER: target = c.publisherNamespaces agentNames = c.publisherAgentNames + case livekit.JobType_JT_PARTICIPANT: + target = c.participantNamespaces + agentNames = c.participantAgentNames } c.mu.Unlock() @@ -229,11 +250,13 @@ func (c *agentClient) getDispatcher(agName string, jobType livekit.JobType) *ser return <-done } -func (c *agentClient) checkEnabled(roomNamespaces, publisherNamespaces, roomAgentNames, publisherAgentNames *serverutils.IncrementalDispatcher[string]) { +func (c *agentClient) checkEnabled(roomNamespaces, publisherNamespaces, participantNamespaces, roomAgentNames, publisherAgentNames, participantAgentNames *serverutils.IncrementalDispatcher[string]) { defer roomNamespaces.Done() defer publisherNamespaces.Done() + defer participantNamespaces.Done() defer roomAgentNames.Done() defer publisherAgentNames.Done() + defer participantAgentNames.Done() resChan, err := c.client.CheckEnabled(context.Background(), &rpc.CheckEnabledRequest{}, psrpc.WithRequestTimeout(CheckEnabledTimeout)) if err != nil { @@ -243,8 +266,10 @@ func (c *agentClient) checkEnabled(roomNamespaces, publisherNamespaces, roomAgen roomNSMap := make(map[string]bool) publisherNSMap := make(map[string]bool) + participantNSMap := make(map[string]bool) roomAgMap := make(map[string]bool) publisherAgMap := make(map[string]bool) + participantAgMap := make(map[string]bool) for r := range resChan { if r.Result.GetRoomEnabled() { @@ -275,6 +300,20 @@ func (c *agentClient) checkEnabled(roomNamespaces, publisherNamespaces, roomAgen } } } + if r.Result.GetParticipantEnabled() { + for _, ns := range r.Result.GetNamespaces() { + if _, ok := participantNSMap[ns]; !ok { + participantNamespaces.Add(ns) + participantNSMap[ns] = true + } + } + for _, ag := range r.Result.GetAgentNames() { + if _, ok := participantAgMap[ag]; !ok { + participantAgentNames.Add(ag) + participantAgMap[ag] = true + } + } + } } } diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 99ef9dbcd..a2fa0381d 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -520,6 +520,8 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me } }) + r.launchTargetAgents(maps.Values(r.agentDispatches), participant, livekit.JobType_JT_PARTICIPANT) + r.Logger.Debugw("new participant joined", "pID", participant.ID(), "participant", participant.Identity(), @@ -985,7 +987,10 @@ func (r *Room) AddAgentDispatch(dispatch *livekit.AgentDispatch) (*livekit.Agent r.lock.RLock() // launchPublisherAgents starts a goroutine to send requests, so is safe to call locked for _, p := range r.participants { - r.launchPublisherAgents([]*agentDispatch{ad}, p) + if p.IsPublisher() { + r.launchTargetAgents([]*agentDispatch{ad}, p, livekit.JobType_JT_PUBLISHER) + } + r.launchTargetAgents([]*agentDispatch{ad}, p, livekit.JobType_JT_PARTICIPANT) } r.lock.RUnlock() @@ -1209,7 +1214,7 @@ func (r *Room) onTrackPublished(participant types.LocalParticipant, track types. if !hasPublished { r.lock.RLock() - r.launchPublisherAgents(maps.Values(r.agentDispatches), participant) + r.launchTargetAgents(maps.Values(r.agentDispatches), participant, livekit.JobType_JT_PUBLISHER) r.lock.RUnlock() if r.internal != nil && r.internal.ParticipantEgress != nil { go func() { @@ -1645,7 +1650,7 @@ func (r *Room) launchRoomAgents(ads []*agentDispatch) { } } -func (r *Room) launchPublisherAgents(ads []*agentDispatch, p types.Participant) { +func (r *Room) launchTargetAgents(ads []*agentDispatch, p types.Participant, jobType livekit.JobType) { if p == nil || p.IsDependent() || r.agentClient == nil { return } @@ -1655,7 +1660,7 @@ func (r *Room) launchPublisherAgents(ads []*agentDispatch, p types.Participant) go func() { inc := r.agentClient.LaunchJob(context.Background(), &agent.JobRequest{ - JobType: livekit.JobType_JT_PUBLISHER, + JobType: jobType, Room: r.ToProto(), Participant: p.ToProto(), Metadata: ad.Metadata, diff --git a/pkg/service/agentservice.go b/pkg/service/agentservice.go index ba25ff96d..0ee3604de 100644 --- a/pkg/service/agentservice.go +++ b/pkg/service/agentservice.go @@ -17,6 +17,7 @@ package service import ( "context" "errors" + "fmt" "math/rand" "net/http" "slices" @@ -136,14 +137,16 @@ type AgentHandler struct { jobToWorker map[livekit.JobID]*agent.Worker keyProvider auth.KeyProvider - namespaceWorkers map[workerKey][]*agent.Worker - roomKeyCount int - publisherKeyCount int - namespaces []string // namespaces deprecated - agentNames []string + namespaceWorkers map[workerKey][]*agent.Worker + roomKeyCount int + publisherKeyCount int + participantKeyCount int + namespaces []string // namespaces deprecated + agentNames []string - roomTopic string - publisherTopic string + roomTopic string + publisherTopic string + participantTopic string } type workerKey struct { @@ -179,6 +182,7 @@ func NewAgentService(conf *config.Config, serverInfo, agent.RoomAgentTopic, agent.PublisherAgentTopic, + agent.ParticipantAgentTopic, ) return s, nil } @@ -197,6 +201,7 @@ func NewAgentHandler( serverInfo *livekit.ServerInfo, roomTopic string, publisherTopic string, + participantTopic string, ) *AgentHandler { return &AgentHandler{ agentServer: agentServer, @@ -208,6 +213,7 @@ func NewAgentHandler( keyProvider: keyProvider, roomTopic: roomTopic, publisherTopic: publisherTopic, + participantTopic: participantTopic, } } @@ -244,10 +250,18 @@ func (h *AgentHandler) registerWorker(w *agent.Worker) { if created { nameTopic := agent.GetAgentTopic(w.AgentName, w.Namespace) - typeTopic := h.roomTopic - if w.JobType == livekit.JobType_JT_PUBLISHER { + var typeTopic string + switch w.JobType { + case livekit.JobType_JT_ROOM: + typeTopic = h.roomTopic + case livekit.JobType_JT_PUBLISHER: typeTopic = h.publisherTopic + case livekit.JobType_JT_PARTICIPANT: + typeTopic = h.participantTopic } + + fmt.Println(">>> register worker", typeTopic) + err := h.agentServer.RegisterJobRequestTopic(nameTopic, typeTopic) if err != nil { h.mu.Unlock() @@ -257,10 +271,13 @@ func (h *AgentHandler) registerWorker(w *agent.Worker) { return } - if w.JobType == livekit.JobType_JT_ROOM { + switch w.JobType { + case livekit.JobType_JT_ROOM: h.roomKeyCount++ - } else { + case livekit.JobType_JT_PUBLISHER: h.publisherKeyCount++ + case livekit.JobType_JT_PARTICIPANT: + h.participantKeyCount++ } h.namespaces = append(h.namespaces, w.Namespace) @@ -316,12 +333,17 @@ func (h *AgentHandler) deregisterWorker(w *agent.Worker) { delete(h.namespaceWorkers, key) topic := agent.GetAgentTopic(w.AgentName, w.Namespace) - if w.JobType == livekit.JobType_JT_ROOM { + + switch w.JobType { + case livekit.JobType_JT_ROOM: h.roomKeyCount-- h.agentServer.DeregisterJobRequestTopic(topic, h.roomTopic) - } else { + case livekit.JobType_JT_PUBLISHER: h.publisherKeyCount-- h.agentServer.DeregisterJobRequestTopic(topic, h.publisherTopic) + case livekit.JobType_JT_PARTICIPANT: + h.participantKeyCount-- + h.agentServer.DeregisterJobRequestTopic(topic, h.participantTopic) } // agentNames and namespaces contains repeated entries for each agentNames/namespaces combinations @@ -352,6 +374,7 @@ func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*rpc.J "jobID", job.Id, "namespace", job.Namespace, "agentName", job.AgentName, + "jobType", job.Type.String(), ) if job.Room != nil { logger = logger.WithValues("room", job.Room.Name, "roomID", job.Room.Sid) @@ -441,10 +464,11 @@ func (h *AgentHandler) CheckEnabled(ctx context.Context, req *rpc.CheckEnabledRe // This doesn't return the full agentName -> namespace mapping, which can cause some unnecessary RPC. // namespaces are however deprecated. return &rpc.CheckEnabledResponse{ - Namespaces: slices.Compact(slices.Clone(h.namespaces)), - AgentNames: slices.Compact(slices.Clone(h.agentNames)), - RoomEnabled: h.roomKeyCount != 0, - PublisherEnabled: h.publisherKeyCount != 0, + Namespaces: slices.Compact(slices.Clone(h.namespaces)), + AgentNames: slices.Compact(slices.Clone(h.agentNames)), + RoomEnabled: h.roomKeyCount != 0, + PublisherEnabled: h.publisherKeyCount != 0, + ParticipantEnabled: h.participantKeyCount != 0, }, nil } diff --git a/test/agent.go b/test/agent.go index ef34d594b..364052415 100644 --- a/test/agent.go +++ b/test/agent.go @@ -34,6 +34,8 @@ type agentClient struct { registered atomic.Int32 roomAvailability atomic.Int32 roomJobs atomic.Int32 + publisherAvailability atomic.Int32 + publisherJobs atomic.Int32 participantAvailability atomic.Int32 participantJobs atomic.Int32 @@ -89,6 +91,17 @@ func (c *agentClient) Run(jobType livekit.JobType, namespace string) (err error) }, }, }) + + case livekit.JobType_JT_PARTICIPANT: + err = c.write(&livekit.WorkerMessage{ + Message: &livekit.WorkerMessage_Register{ + Register: &livekit.RegisterWorkerRequest{ + Type: livekit.JobType_JT_PARTICIPANT, + Version: "version", + Namespace: &namespace, + }, + }, + }) } return err @@ -123,17 +136,23 @@ func (c *agentClient) read() { } func (c *agentClient) handleAssignment(req *livekit.JobAssignment) { - if req.Job.Type == livekit.JobType_JT_ROOM { + switch req.Job.Type { + case livekit.JobType_JT_ROOM: c.roomJobs.Inc() - } else { + case livekit.JobType_JT_PUBLISHER: + c.publisherJobs.Inc() + case livekit.JobType_JT_PARTICIPANT: c.participantJobs.Inc() } } func (c *agentClient) handleAvailability(req *livekit.AvailabilityRequest) { - if req.Job.Type == livekit.JobType_JT_ROOM { + switch req.Job.Type { + case livekit.JobType_JT_ROOM: c.roomAvailability.Inc() - } else { + case livekit.JobType_JT_PUBLISHER: + c.publisherAvailability.Inc() + case livekit.JobType_JT_PARTICIPANT: c.participantAvailability.Inc() } diff --git a/test/agent_test.go b/test/agent_test.go index 554e36e33..795fa1111 100644 --- a/test/agent_test.go +++ b/test/agent_test.go @@ -43,17 +43,25 @@ func TestAgents(t *testing.T) { require.NoError(t, err) ac4, err := newAgentClient(agentToken(), defaultServerPort) require.NoError(t, err) + ac5, err := newAgentClient(agentToken(), defaultServerPort) + require.NoError(t, err) + ac6, err := newAgentClient(agentToken(), defaultServerPort) + require.NoError(t, err) defer ac1.close() defer ac2.close() defer ac3.close() defer ac4.close() + defer ac5.close() + defer ac6.close() ac1.Run(livekit.JobType_JT_ROOM, "default") ac2.Run(livekit.JobType_JT_ROOM, "default") ac3.Run(livekit.JobType_JT_PUBLISHER, "default") ac4.Run(livekit.JobType_JT_PUBLISHER, "default") + ac5.Run(livekit.JobType_JT_PARTICIPANT, "default") + ac6.Run(livekit.JobType_JT_PARTICIPANT, "default") testutils.WithTimeout(t, func() string { - if ac1.registered.Load() != 1 || ac2.registered.Load() != 1 || ac3.registered.Load() != 1 || ac4.registered.Load() != 1 { + if ac1.registered.Load() != 1 || ac2.registered.Load() != 1 || ac3.registered.Load() != 1 || ac4.registered.Load() != 1 || ac5.registered.Load() != 1 || ac6.registered.Load() != 1 { return "worker not registered" } @@ -77,8 +85,12 @@ func TestAgents(t *testing.T) { return "room job not assigned" } - if ac3.participantJobs.Load()+ac4.participantJobs.Load() != 1 { - return fmt.Sprintf("participant jobs not assigned, ac3: %d, ac4: %d", ac3.participantJobs.Load(), ac4.participantJobs.Load()) + if ac3.publisherJobs.Load()+ac4.publisherJobs.Load() != 1 { + return fmt.Sprintf("publisher jobs not assigned, ac3: %d, ac4: %d", ac3.publisherJobs.Load(), ac4.publisherJobs.Load()) + } + + if ac5.participantJobs.Load()+ac6.participantJobs.Load() != 2 { + return fmt.Sprintf("participant jobs not assigned, ac5: %d, ac6: %d", ac5.participantJobs.Load(), ac6.participantJobs.Load()) } return "" @@ -97,10 +109,14 @@ func TestAgents(t *testing.T) { return "room job must be assigned 1 time" } - if ac3.participantJobs.Load()+ac4.participantJobs.Load() != 2 { + if ac3.publisherJobs.Load()+ac4.publisherJobs.Load() != 2 { return "2 publisher jobs must assigned" } + if ac5.participantJobs.Load()+ac6.participantJobs.Load() != 2 { + return "2 participant jobs must assigned" + } + return "" }, AssignJobTimeout) } @@ -198,7 +214,7 @@ func TestAgentMultiNode(t *testing.T) { return "room job not assigned" } - if ac2.participantJobs.Load() != 1 { + if ac2.publisherJobs.Load() != 1 { return "participant job not assigned" } diff --git a/test/client/client.go b/test/client/client.go index b54fedb1e..777c5a1e0 100644 --- a/test/client/client.go +++ b/test/client/client.go @@ -269,7 +269,6 @@ func NewRTCClient(conn *websocket.Conn, opts *Options) (*RTCClient, error) { return c.SendIceCandidate(ic, livekit.SignalTarget_SUBSCRIBER) }) subscriberHandler.OnTrackCalls(func(track *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) { - fmt.Println("ontrack", track.Codec(), track.PayloadType()) go c.processTrack(track) }) subscriberHandler.OnDataPacketCalls(c.handleDataMessage)