add participant job type (#3443)

* add participant job type

* cleanup

* deps
This commit is contained in:
Paul Wells
2025-02-18 00:40:56 -08:00
committed by GitHub
parent b2a54729f5
commit f49103a003
8 changed files with 152 additions and 50 deletions
+1 -1
View File
@@ -22,7 +22,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564
github.com/livekit/protocol v1.33.1-0.20250213045117-9b6e5d703ff8
github.com/livekit/protocol v1.33.1-0.20250218083317-1a019aab5b83
github.com/livekit/psrpc v0.6.1-0.20250205181828-a0beed2e4126
github.com/mackerelio/go-osstat v0.2.5
github.com/magefile/mage v1.15.0
+2 -2
View File
@@ -170,8 +170,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564 h1:GX7KF/V9ExmcfT/2Bdia8aROjkxrgx7WpyH7w9MB4J4=
github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564/go.mod h1:36s+wwmU3O40IAhE+MjBWP3W71QRiEE9SfooSBvtBqY=
github.com/livekit/protocol v1.33.1-0.20250213045117-9b6e5d703ff8 h1:TwYfrw9nnuhpTYsgY/T+OQi8z9s1itgdzNMDAOmq3BA=
github.com/livekit/protocol v1.33.1-0.20250213045117-9b6e5d703ff8/go.mod h1:yXuQ7ucrLj91nbxL6/AHgtxdha1DGzLj1LkgvnT90So=
github.com/livekit/protocol v1.33.1-0.20250218083317-1a019aab5b83 h1:1HFZ41AaFE+disN7Md9g0MQNpnw9Y2p3QKbjYNtjQjA=
github.com/livekit/protocol v1.33.1-0.20250218083317-1a019aab5b83/go.mod h1:yXuQ7ucrLj91nbxL6/AHgtxdha1DGzLj1LkgvnT90So=
github.com/livekit/psrpc v0.6.1-0.20250205181828-a0beed2e4126 h1:fzuYpAQbCid7ySPpQWWePfQOWUrs8x6dJ0T3Wl07n+Y=
github.com/livekit/psrpc v0.6.1-0.20250205181828-a0beed2e4126/go.mod h1:X5WtEZ7OnEs72Fi5/J+i0on3964F1aynQpCalcgMqRo=
github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o=
+55 -16
View File
@@ -21,24 +21,32 @@ import (
"time"
"github.com/gammazero/workerpool"
"google.golang.org/protobuf/types/known/emptypb"
serverutils "github.com/livekit/livekit-server/pkg/utils"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/utils"
"github.com/livekit/psrpc"
"google.golang.org/protobuf/types/known/emptypb"
)
const (
// EnabledCacheTTL bounds how long the cached CheckEnabled results are reused:
// getDispatcher starts a fresh check once more than this much time has passed
// since the cache was last refreshed.
EnabledCacheTTL = 1 * time.Minute
// Topic names, one per job type (see jobTypeTopics). The same values are
// handed to the agent handler as its room/publisher/participant topics.
RoomAgentTopic = "room"
PublisherAgentTopic = "publisher"
ParticipantAgentTopic = "participant"
// DefaultHandlerNamespace is the empty namespace.
// NOTE(review): its consumers are outside this view — confirm usage before changing.
DefaultHandlerNamespace = ""
// CheckEnabledTimeout is the request timeout applied to the CheckEnabled RPC.
CheckEnabledTimeout = 5 * time.Second
)
// jobTypeTopics maps each supported job type to its topic name.
// LaunchJob looks up desc.JobType here and returns early, without
// dispatching, when the job type has no entry.
var jobTypeTopics = map[livekit.JobType]string{
livekit.JobType_JT_ROOM: RoomAgentTopic,
livekit.JobType_JT_PUBLISHER: PublisherAgentTopic,
livekit.JobType_JT_PARTICIPANT: ParticipantAgentTopic,
}
type Client interface {
// LaunchJob starts a room, publisher, or participant job on an agent.
// it will launch a job once for each worker in each namespace
@@ -64,10 +72,12 @@ type agentClient struct {
// cache response to avoid constantly checking with controllers
// cache is invalidated with AgentRegistered updates
roomNamespaces *serverutils.IncrementalDispatcher[string] // deprecated
publisherNamespaces *serverutils.IncrementalDispatcher[string] // deprecated
roomAgentNames *serverutils.IncrementalDispatcher[string]
publisherAgentNames *serverutils.IncrementalDispatcher[string]
roomNamespaces *serverutils.IncrementalDispatcher[string] // deprecated
publisherNamespaces *serverutils.IncrementalDispatcher[string] // deprecated
participantNamespaces *serverutils.IncrementalDispatcher[string] // deprecated
roomAgentNames *serverutils.IncrementalDispatcher[string]
publisherAgentNames *serverutils.IncrementalDispatcher[string]
participantAgentNames *serverutils.IncrementalDispatcher[string]
enabledExpiresAt time.Time
@@ -102,8 +112,10 @@ func NewAgentClient(bus psrpc.MessageBus) (Client, error) {
c.mu.Lock()
c.roomNamespaces = nil
c.publisherNamespaces = nil
c.participantNamespaces = nil
c.roomAgentNames = nil
c.publisherAgentNames = nil
c.participantAgentNames = nil
c.mu.Unlock()
}
@@ -114,11 +126,6 @@ func NewAgentClient(bus psrpc.MessageBus) (Client, error) {
}
func (c *agentClient) LaunchJob(ctx context.Context, desc *JobRequest) *serverutils.IncrementalDispatcher[*livekit.Job] {
jobTypeTopic := RoomAgentTopic
if desc.JobType == livekit.JobType_JT_PUBLISHER {
jobTypeTopic = PublisherAgentTopic
}
var wg sync.WaitGroup
ret := serverutils.NewIncrementalDispatcher[*livekit.Job]()
defer func() {
@@ -128,6 +135,11 @@ func (c *agentClient) LaunchJob(ctx context.Context, desc *JobRequest) *serverut
})
}()
jobTypeTopic, ok := jobTypeTopics[desc.JobType]
if !ok {
return ret
}
dispatcher := c.getDispatcher(desc.AgentName, desc.JobType)
if dispatcher == nil {
@@ -186,21 +198,30 @@ func (c *agentClient) getDispatcher(agName string, jobType livekit.JobType) *ser
c.mu.Lock()
if time.Since(c.enabledExpiresAt) > EnabledCacheTTL || c.roomNamespaces == nil ||
c.publisherNamespaces == nil || c.roomAgentNames == nil || c.publisherAgentNames == nil {
c.publisherNamespaces == nil || c.participantNamespaces == nil || c.roomAgentNames == nil || c.publisherAgentNames == nil || c.participantAgentNames == nil {
c.enabledExpiresAt = time.Now()
c.roomNamespaces = serverutils.NewIncrementalDispatcher[string]()
c.publisherNamespaces = serverutils.NewIncrementalDispatcher[string]()
c.participantNamespaces = serverutils.NewIncrementalDispatcher[string]()
c.roomAgentNames = serverutils.NewIncrementalDispatcher[string]()
c.publisherAgentNames = serverutils.NewIncrementalDispatcher[string]()
c.participantAgentNames = serverutils.NewIncrementalDispatcher[string]()
go c.checkEnabled(c.roomNamespaces, c.publisherNamespaces, c.roomAgentNames, c.publisherAgentNames)
go c.checkEnabled(c.roomNamespaces, c.publisherNamespaces, c.participantNamespaces, c.roomAgentNames, c.publisherAgentNames, c.participantAgentNames)
}
target := c.roomNamespaces
agentNames := c.roomAgentNames
if jobType == livekit.JobType_JT_PUBLISHER {
var target *serverutils.IncrementalDispatcher[string]
var agentNames *serverutils.IncrementalDispatcher[string]
switch jobType {
case livekit.JobType_JT_ROOM:
target = c.roomNamespaces
agentNames = c.roomAgentNames
case livekit.JobType_JT_PUBLISHER:
target = c.publisherNamespaces
agentNames = c.publisherAgentNames
case livekit.JobType_JT_PARTICIPANT:
target = c.participantNamespaces
agentNames = c.participantAgentNames
}
c.mu.Unlock()
@@ -229,11 +250,13 @@ func (c *agentClient) getDispatcher(agName string, jobType livekit.JobType) *ser
return <-done
}
func (c *agentClient) checkEnabled(roomNamespaces, publisherNamespaces, roomAgentNames, publisherAgentNames *serverutils.IncrementalDispatcher[string]) {
func (c *agentClient) checkEnabled(roomNamespaces, publisherNamespaces, participantNamespaces, roomAgentNames, publisherAgentNames, participantAgentNames *serverutils.IncrementalDispatcher[string]) {
defer roomNamespaces.Done()
defer publisherNamespaces.Done()
defer participantNamespaces.Done()
defer roomAgentNames.Done()
defer publisherAgentNames.Done()
defer participantAgentNames.Done()
resChan, err := c.client.CheckEnabled(context.Background(), &rpc.CheckEnabledRequest{}, psrpc.WithRequestTimeout(CheckEnabledTimeout))
if err != nil {
@@ -243,8 +266,10 @@ func (c *agentClient) checkEnabled(roomNamespaces, publisherNamespaces, roomAgen
roomNSMap := make(map[string]bool)
publisherNSMap := make(map[string]bool)
participantNSMap := make(map[string]bool)
roomAgMap := make(map[string]bool)
publisherAgMap := make(map[string]bool)
participantAgMap := make(map[string]bool)
for r := range resChan {
if r.Result.GetRoomEnabled() {
@@ -275,6 +300,20 @@ func (c *agentClient) checkEnabled(roomNamespaces, publisherNamespaces, roomAgen
}
}
}
if r.Result.GetParticipantEnabled() {
for _, ns := range r.Result.GetNamespaces() {
if _, ok := participantNSMap[ns]; !ok {
participantNamespaces.Add(ns)
participantNSMap[ns] = true
}
}
for _, ag := range r.Result.GetAgentNames() {
if _, ok := participantAgMap[ag]; !ok {
participantAgentNames.Add(ag)
participantAgMap[ag] = true
}
}
}
}
}
+9 -4
View File
@@ -520,6 +520,8 @@ func (r *Room) Join(participant types.LocalParticipant, requestSource routing.Me
}
})
r.launchTargetAgents(maps.Values(r.agentDispatches), participant, livekit.JobType_JT_PARTICIPANT)
r.Logger.Debugw("new participant joined",
"pID", participant.ID(),
"participant", participant.Identity(),
@@ -985,7 +987,10 @@ func (r *Room) AddAgentDispatch(dispatch *livekit.AgentDispatch) (*livekit.Agent
r.lock.RLock()
// launchTargetAgents starts a goroutine to send requests, so it is safe to call while locked
for _, p := range r.participants {
r.launchPublisherAgents([]*agentDispatch{ad}, p)
if p.IsPublisher() {
r.launchTargetAgents([]*agentDispatch{ad}, p, livekit.JobType_JT_PUBLISHER)
}
r.launchTargetAgents([]*agentDispatch{ad}, p, livekit.JobType_JT_PARTICIPANT)
}
r.lock.RUnlock()
@@ -1209,7 +1214,7 @@ func (r *Room) onTrackPublished(participant types.LocalParticipant, track types.
if !hasPublished {
r.lock.RLock()
r.launchPublisherAgents(maps.Values(r.agentDispatches), participant)
r.launchTargetAgents(maps.Values(r.agentDispatches), participant, livekit.JobType_JT_PUBLISHER)
r.lock.RUnlock()
if r.internal != nil && r.internal.ParticipantEgress != nil {
go func() {
@@ -1645,7 +1650,7 @@ func (r *Room) launchRoomAgents(ads []*agentDispatch) {
}
}
func (r *Room) launchPublisherAgents(ads []*agentDispatch, p types.Participant) {
func (r *Room) launchTargetAgents(ads []*agentDispatch, p types.Participant, jobType livekit.JobType) {
if p == nil || p.IsDependent() || r.agentClient == nil {
return
}
@@ -1655,7 +1660,7 @@ func (r *Room) launchPublisherAgents(ads []*agentDispatch, p types.Participant)
go func() {
inc := r.agentClient.LaunchJob(context.Background(), &agent.JobRequest{
JobType: livekit.JobType_JT_PUBLISHER,
JobType: jobType,
Room: r.ToProto(),
Participant: p.ToProto(),
Metadata: ad.Metadata,
+41 -17
View File
@@ -17,6 +17,7 @@ package service
import (
"context"
"errors"
"fmt"
"math/rand"
"net/http"
"slices"
@@ -136,14 +137,16 @@ type AgentHandler struct {
jobToWorker map[livekit.JobID]*agent.Worker
keyProvider auth.KeyProvider
namespaceWorkers map[workerKey][]*agent.Worker
roomKeyCount int
publisherKeyCount int
namespaces []string // namespaces deprecated
agentNames []string
namespaceWorkers map[workerKey][]*agent.Worker
roomKeyCount int
publisherKeyCount int
participantKeyCount int
namespaces []string // namespaces deprecated
agentNames []string
roomTopic string
publisherTopic string
roomTopic string
publisherTopic string
participantTopic string
}
type workerKey struct {
@@ -179,6 +182,7 @@ func NewAgentService(conf *config.Config,
serverInfo,
agent.RoomAgentTopic,
agent.PublisherAgentTopic,
agent.ParticipantAgentTopic,
)
return s, nil
}
@@ -197,6 +201,7 @@ func NewAgentHandler(
serverInfo *livekit.ServerInfo,
roomTopic string,
publisherTopic string,
participantTopic string,
) *AgentHandler {
return &AgentHandler{
agentServer: agentServer,
@@ -208,6 +213,7 @@ func NewAgentHandler(
keyProvider: keyProvider,
roomTopic: roomTopic,
publisherTopic: publisherTopic,
participantTopic: participantTopic,
}
}
@@ -244,10 +250,18 @@ func (h *AgentHandler) registerWorker(w *agent.Worker) {
if created {
nameTopic := agent.GetAgentTopic(w.AgentName, w.Namespace)
typeTopic := h.roomTopic
if w.JobType == livekit.JobType_JT_PUBLISHER {
var typeTopic string
switch w.JobType {
case livekit.JobType_JT_ROOM:
typeTopic = h.roomTopic
case livekit.JobType_JT_PUBLISHER:
typeTopic = h.publisherTopic
case livekit.JobType_JT_PARTICIPANT:
typeTopic = h.participantTopic
}
fmt.Println(">>> register worker", typeTopic)
err := h.agentServer.RegisterJobRequestTopic(nameTopic, typeTopic)
if err != nil {
h.mu.Unlock()
@@ -257,10 +271,13 @@ func (h *AgentHandler) registerWorker(w *agent.Worker) {
return
}
if w.JobType == livekit.JobType_JT_ROOM {
switch w.JobType {
case livekit.JobType_JT_ROOM:
h.roomKeyCount++
} else {
case livekit.JobType_JT_PUBLISHER:
h.publisherKeyCount++
case livekit.JobType_JT_PARTICIPANT:
h.participantKeyCount++
}
h.namespaces = append(h.namespaces, w.Namespace)
@@ -316,12 +333,17 @@ func (h *AgentHandler) deregisterWorker(w *agent.Worker) {
delete(h.namespaceWorkers, key)
topic := agent.GetAgentTopic(w.AgentName, w.Namespace)
if w.JobType == livekit.JobType_JT_ROOM {
switch w.JobType {
case livekit.JobType_JT_ROOM:
h.roomKeyCount--
h.agentServer.DeregisterJobRequestTopic(topic, h.roomTopic)
} else {
case livekit.JobType_JT_PUBLISHER:
h.publisherKeyCount--
h.agentServer.DeregisterJobRequestTopic(topic, h.publisherTopic)
case livekit.JobType_JT_PARTICIPANT:
h.participantKeyCount--
h.agentServer.DeregisterJobRequestTopic(topic, h.participantTopic)
}
// agentNames and namespaces contain repeated entries, one for each agentName/namespace combination
@@ -352,6 +374,7 @@ func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*rpc.J
"jobID", job.Id,
"namespace", job.Namespace,
"agentName", job.AgentName,
"jobType", job.Type.String(),
)
if job.Room != nil {
logger = logger.WithValues("room", job.Room.Name, "roomID", job.Room.Sid)
@@ -441,10 +464,11 @@ func (h *AgentHandler) CheckEnabled(ctx context.Context, req *rpc.CheckEnabledRe
// This doesn't return the full agentName -> namespace mapping, which can cause some unnecessary RPC.
// namespaces are however deprecated.
return &rpc.CheckEnabledResponse{
Namespaces: slices.Compact(slices.Clone(h.namespaces)),
AgentNames: slices.Compact(slices.Clone(h.agentNames)),
RoomEnabled: h.roomKeyCount != 0,
PublisherEnabled: h.publisherKeyCount != 0,
Namespaces: slices.Compact(slices.Clone(h.namespaces)),
AgentNames: slices.Compact(slices.Clone(h.agentNames)),
RoomEnabled: h.roomKeyCount != 0,
PublisherEnabled: h.publisherKeyCount != 0,
ParticipantEnabled: h.participantKeyCount != 0,
}, nil
}
+23 -4
View File
@@ -34,6 +34,8 @@ type agentClient struct {
registered atomic.Int32
roomAvailability atomic.Int32
roomJobs atomic.Int32
publisherAvailability atomic.Int32
publisherJobs atomic.Int32
participantAvailability atomic.Int32
participantJobs atomic.Int32
@@ -89,6 +91,17 @@ func (c *agentClient) Run(jobType livekit.JobType, namespace string) (err error)
},
},
})
case livekit.JobType_JT_PARTICIPANT:
err = c.write(&livekit.WorkerMessage{
Message: &livekit.WorkerMessage_Register{
Register: &livekit.RegisterWorkerRequest{
Type: livekit.JobType_JT_PARTICIPANT,
Version: "version",
Namespace: &namespace,
},
},
})
}
return err
@@ -123,17 +136,23 @@ func (c *agentClient) read() {
}
func (c *agentClient) handleAssignment(req *livekit.JobAssignment) {
if req.Job.Type == livekit.JobType_JT_ROOM {
switch req.Job.Type {
case livekit.JobType_JT_ROOM:
c.roomJobs.Inc()
} else {
case livekit.JobType_JT_PUBLISHER:
c.publisherJobs.Inc()
case livekit.JobType_JT_PARTICIPANT:
c.participantJobs.Inc()
}
}
func (c *agentClient) handleAvailability(req *livekit.AvailabilityRequest) {
if req.Job.Type == livekit.JobType_JT_ROOM {
switch req.Job.Type {
case livekit.JobType_JT_ROOM:
c.roomAvailability.Inc()
} else {
case livekit.JobType_JT_PUBLISHER:
c.publisherAvailability.Inc()
case livekit.JobType_JT_PARTICIPANT:
c.participantAvailability.Inc()
}
+21 -5
View File
@@ -43,17 +43,25 @@ func TestAgents(t *testing.T) {
require.NoError(t, err)
ac4, err := newAgentClient(agentToken(), defaultServerPort)
require.NoError(t, err)
ac5, err := newAgentClient(agentToken(), defaultServerPort)
require.NoError(t, err)
ac6, err := newAgentClient(agentToken(), defaultServerPort)
require.NoError(t, err)
defer ac1.close()
defer ac2.close()
defer ac3.close()
defer ac4.close()
defer ac5.close()
defer ac6.close()
ac1.Run(livekit.JobType_JT_ROOM, "default")
ac2.Run(livekit.JobType_JT_ROOM, "default")
ac3.Run(livekit.JobType_JT_PUBLISHER, "default")
ac4.Run(livekit.JobType_JT_PUBLISHER, "default")
ac5.Run(livekit.JobType_JT_PARTICIPANT, "default")
ac6.Run(livekit.JobType_JT_PARTICIPANT, "default")
testutils.WithTimeout(t, func() string {
if ac1.registered.Load() != 1 || ac2.registered.Load() != 1 || ac3.registered.Load() != 1 || ac4.registered.Load() != 1 {
if ac1.registered.Load() != 1 || ac2.registered.Load() != 1 || ac3.registered.Load() != 1 || ac4.registered.Load() != 1 || ac5.registered.Load() != 1 || ac6.registered.Load() != 1 {
return "worker not registered"
}
@@ -77,8 +85,12 @@ func TestAgents(t *testing.T) {
return "room job not assigned"
}
if ac3.participantJobs.Load()+ac4.participantJobs.Load() != 1 {
return fmt.Sprintf("participant jobs not assigned, ac3: %d, ac4: %d", ac3.participantJobs.Load(), ac4.participantJobs.Load())
if ac3.publisherJobs.Load()+ac4.publisherJobs.Load() != 1 {
return fmt.Sprintf("publisher jobs not assigned, ac3: %d, ac4: %d", ac3.publisherJobs.Load(), ac4.publisherJobs.Load())
}
if ac5.participantJobs.Load()+ac6.participantJobs.Load() != 2 {
return fmt.Sprintf("participant jobs not assigned, ac5: %d, ac6: %d", ac5.participantJobs.Load(), ac6.participantJobs.Load())
}
return ""
@@ -97,10 +109,14 @@ func TestAgents(t *testing.T) {
return "room job must be assigned 1 time"
}
if ac3.participantJobs.Load()+ac4.participantJobs.Load() != 2 {
if ac3.publisherJobs.Load()+ac4.publisherJobs.Load() != 2 {
return "2 publisher jobs must assigned"
}
if ac5.participantJobs.Load()+ac6.participantJobs.Load() != 2 {
return "2 participant jobs must assigned"
}
return ""
}, AssignJobTimeout)
}
@@ -198,7 +214,7 @@ func TestAgentMultiNode(t *testing.T) {
return "room job not assigned"
}
if ac2.participantJobs.Load() != 1 {
if ac2.publisherJobs.Load() != 1 {
return "participant job not assigned"
}
-1
View File
@@ -269,7 +269,6 @@ func NewRTCClient(conn *websocket.Conn, opts *Options) (*RTCClient, error) {
return c.SendIceCandidate(ic, livekit.SignalTarget_SUBSCRIBER)
})
subscriberHandler.OnTrackCalls(func(track *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) {
fmt.Println("ontrack", track.Codec(), track.PayloadType())
go c.processTrack(track)
})
subscriberHandler.OnDataPacketCalls(c.handleDataMessage)