Implement Agents protocol addition (#2786)

This commit is contained in:
Benjamin Pracht
2024-06-27 19:20:52 -07:00
committed by GitHub
parent 6815d85c01
commit 32a4d03c9e
12 changed files with 325 additions and 171 deletions
+9 -2
View File
@@ -20,7 +20,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7
github.com/livekit/protocol v1.19.1-0.20240625064703-b2dc8deac6c9
github.com/livekit/protocol v1.19.1-0.20240627173058-82786f41fdb6
github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a
github.com/mackerelio/go-osstat v0.2.5
github.com/magefile/mage v1.15.0
@@ -58,12 +58,16 @@ require (
)
require (
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.33.0-20240401165935-b983156c5e99.1 // indirect
dario.cat/mergo v1.0.0 // indirect
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bufbuild/protovalidate-go v0.6.1 // indirect
github.com/bufbuild/protoyaml-go v0.1.9 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/containerd/continuity v0.4.3 // indirect
@@ -80,6 +84,7 @@ require (
github.com/go-jose/go-jose/v3 v3.0.3 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/cel-go v0.20.1 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/subcommands v1.2.0 // indirect
@@ -116,6 +121,7 @@ require (
github.com/puzpuzpuz/xsync/v3 v3.1.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/stoewer/go-strcase v1.3.0 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
@@ -128,7 +134,8 @@ require (
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/tools v0.22.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240401170217-c3f982113cda // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect
google.golang.org/grpc v1.64.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
+23 -4
View File
@@ -1,3 +1,5 @@
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.33.0-20240401165935-b983156c5e99.1 h1:2IGhRovxlsOIQgx2ekZWo4wTPAYpck41+18ICxs37is=
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.33.0-20240401165935-b983156c5e99.1/go.mod h1:Tgn5bgL220vkFOI0KPStlcClPeOJzAv4uT+V8JXGUnw=
dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0=
@@ -6,6 +8,8 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk=
github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI=
github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
github.com/avast/retry-go/v4 v4.6.0 h1:K9xNA+KeB8HHc2aWFuLb25Offp+0iVRXEvFx8IinRJA=
github.com/avast/retry-go/v4 v4.6.0/go.mod h1:gvWlPhBVsvBbLkVGDg/KwvBv0bEkCOLRRSHKIr2PyOE=
github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o=
@@ -18,6 +22,10 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/bufbuild/protovalidate-go v0.6.1 h1:uzW8r0CDvqApUChNj87VzZVoQSKhcVdw5UWOE605UIw=
github.com/bufbuild/protovalidate-go v0.6.1/go.mod h1:4BR3rKEJiUiTy+sqsusFn2ladOf0kYmA2Reo6BHSBgQ=
github.com/bufbuild/protoyaml-go v0.1.9 h1:anV5UtF1Mlvkkgp4NWA6U/zOnJFng8Orq4Vf3ZUQHBU=
github.com/bufbuild/protoyaml-go v0.1.9/go.mod h1:KCBItkvZOK/zwGueLdH1Wx1RLyFn5rCH7YjQrdty2Wc=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
@@ -56,6 +64,8 @@ github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/elliotchance/orderedmap/v2 v2.2.0 h1:7/2iwO98kYT4XkOjA9mBEIwvi4KpGB4cyHeOFOnj4Vk=
github.com/elliotchance/orderedmap/v2 v2.2.0/go.mod h1:85lZyVbpGaGvHvnKa7Qhx7zncAdBIBq6u56Hb1PRU5Q=
github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A=
github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew=
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
github.com/florianl/go-tc v0.4.3 h1:xpobG2gFNvEqbclU07zjddALSjqTQTWJkxg5/kRYDpw=
@@ -78,6 +88,9 @@ github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfC
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/cel-go v0.20.1 h1:nDx9r8S3L4pE61eDdt8igGj8rf5kjYR3ILxWIpWNi84=
github.com/google/cel-go v0.20.1/go.mod h1:kWcIzTsPX0zmQ+H3TirHstLLf9ep5QTsZBN9u4dOYLg=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
@@ -154,8 +167,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7 h1:F1L8inJoynwIAYpZENNYS+1xHJMF5RFRorsnAlcxfSY=
github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.19.1-0.20240625064703-b2dc8deac6c9 h1:dG/rhtgXL3Clh/ywRH2g2g5pnssPr98OP6VmbOCSB2A=
github.com/livekit/protocol v1.19.1-0.20240625064703-b2dc8deac6c9/go.mod h1:zUHHcMbhBRDe0+OBTHbQT1c7njLKEki+xe1qPpR7LhU=
github.com/livekit/protocol v1.19.1-0.20240627173058-82786f41fdb6 h1:XtyV+MqHqXTuNLXz5TUjYtNg0gvTVw9web/YuXD9+3c=
github.com/livekit/protocol v1.19.1-0.20240627173058-82786f41fdb6/go.mod h1:bNjJi+8frdvC84xG0CJ/7VfVvqerLg2MzjOks0ucyC4=
github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a h1:EQAHmcYEGlc6V517cQ3Iy0+jHgP6+tM/B4l2vGuLpQo=
github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o=
@@ -288,6 +301,8 @@ github.com/sclevine/spec v1.4.0 h1:z/Q9idDcay5m5irkZ28M7PtQM4aOISzOpj4bUPkDee8=
github.com/sclevine/spec v1.4.0/go.mod h1:LvpgJaFyvQzRvc1kaDs0bulYwzC70PbiYjC4QnFHkOM=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stoewer/go-strcase v1.3.0 h1:g0eASXYtp+yvN9fK8sH94oCIk0fau9uV1/ZdJ0AVEzs=
github.com/stoewer/go-strcase v1.3.0/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8wodgtPmh1xo=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
@@ -477,10 +492,14 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d h1:k3zyW3BYYR30e8v3x0bTDdE9vpYFjZHK+HcyqkrppWk=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY=
google.golang.org/genproto/googleapis/api v0.0.0-20240401170217-c3f982113cda h1:b6F6WIV4xHHD0FA4oIyzU6mHWg2WI2X1RBehwa5QN38=
google.golang.org/genproto/googleapis/api v0.0.0-20240401170217-c3f982113cda/go.mod h1:AHcE/gZH76Bk/ROZhQphlRoWo5xKDEtz3eVEO1LfA8c=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda h1:LI5DOvAxUPMv/50agcLLoo+AdWc1irS9Rzz4vPuD1V4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY=
google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+55 -32
View File
@@ -20,15 +20,13 @@ import (
"time"
"github.com/gammazero/workerpool"
"google.golang.org/protobuf/types/known/emptypb"
serverutils "github.com/livekit/livekit-server/pkg/utils"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/utils"
"github.com/livekit/protocol/utils/guid"
"github.com/livekit/psrpc"
"google.golang.org/protobuf/types/known/emptypb"
)
const (
@@ -43,15 +41,17 @@ const (
type Client interface {
// LaunchJob starts a room or participant job on an agent.
// it will launch a job once for each worker in each namespace
LaunchJob(ctx context.Context, desc *JobDescription)
LaunchJob(ctx context.Context, desc *JobRequest)
Stop() error
}
type JobDescription struct {
type JobRequest struct {
JobType livekit.JobType
Room *livekit.Room
// only set for participant jobs
Participant *livekit.ParticipantInfo
Metadata string
Namespace string
}
type agentClient struct {
@@ -105,52 +105,75 @@ func NewAgentClient(bus psrpc.MessageBus) (Client, error) {
return c, nil
}
func (c *agentClient) LaunchJob(ctx context.Context, desc *JobDescription) {
roomNamespaces, publisherNamespaces, needsRefresh := c.getOrCreateDispatchers()
if needsRefresh {
go c.checkEnabled(ctx, roomNamespaces, publisherNamespaces)
}
target := roomNamespaces
func (c *agentClient) LaunchJob(ctx context.Context, desc *JobRequest) {
jobTypeTopic := RoomAgentTopic
if desc.JobType == livekit.JobType_JT_PUBLISHER {
target = publisherNamespaces
jobTypeTopic = PublisherAgentTopic
}
target.ForEach(func(ns string) {
c.workers.Submit(func() {
_, err := c.client.JobRequest(ctx, ns, jobTypeTopic, &livekit.Job{
Id: guid.New(utils.AgentJobPrefix),
Type: desc.JobType,
Room: desc.Room,
Participant: desc.Participant,
Namespace: ns,
})
if err != nil {
logger.Errorw("failed to send job request", err, "namespace", ns, "jobType", jobTypeTopic)
}
if !c.isNamespaceActive(desc.Namespace, desc.JobType) {
logger.Infow("not dispatching agent job since no worker is available", "namespace", desc.Namespace, "jobType", desc.JobType)
return
}
c.workers.Submit(func() {
_, err := c.client.JobRequest(context.Background(), desc.Namespace, jobTypeTopic, &livekit.Job{
Id: utils.NewGuid(utils.AgentJobPrefix),
Type: desc.JobType,
Room: desc.Room,
Participant: desc.Participant,
Namespace: desc.Namespace,
Metadata: desc.Metadata,
})
if err != nil {
logger.Infow("failed to send job request", "error", err, "namespace", desc.Namespace, "jobType", desc.JobType)
}
})
}
func (c *agentClient) getOrCreateDispatchers() (*serverutils.IncrementalDispatcher[string], *serverutils.IncrementalDispatcher[string], bool) {
func (c *agentClient) isNamespaceActive(ns string, jobType livekit.JobType) bool {
c.mu.Lock()
defer c.mu.Unlock()
if time.Since(c.enabledExpiresAt) > EnabledCacheTTL || c.roomNamespaces == nil || c.publisherNamespaces == nil {
c.enabledExpiresAt = time.Now()
c.roomNamespaces = serverutils.NewIncrementalDispatcher[string]()
c.publisherNamespaces = serverutils.NewIncrementalDispatcher[string]()
return c.roomNamespaces, c.publisherNamespaces, true
go c.checkEnabled(c.roomNamespaces, c.publisherNamespaces)
}
return c.roomNamespaces, c.publisherNamespaces, false
target := c.roomNamespaces
if jobType == livekit.JobType_JT_PUBLISHER {
target = c.publisherNamespaces
}
c.mu.Unlock()
done := make(chan bool, 1)
go func() {
target.ForEach(func(curNs string) {
if curNs == ns {
select {
case done <- true:
default:
}
return
}
})
select {
case done <- false:
default:
}
}()
return <-done
}
func (c *agentClient) checkEnabled(ctx context.Context, roomNamespaces, publisherNamespaces *serverutils.IncrementalDispatcher[string]) {
func (c *agentClient) checkEnabled(roomNamespaces, publisherNamespaces *serverutils.IncrementalDispatcher[string]) {
defer roomNamespaces.Done()
defer publisherNamespaces.Done()
resChan, err := c.client.CheckEnabled(ctx, &rpc.CheckEnabledRequest{}, psrpc.WithRequestTimeout(CheckEnabledTimeout))
resChan, err := c.client.CheckEnabled(context.Background(), &rpc.CheckEnabledRequest{}, psrpc.WithRequestTimeout(CheckEnabledTimeout))
if err != nil {
logger.Errorw("failed to check enabled", err)
return
-86
View File
@@ -1,86 +0,0 @@
// Copyright 2024 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agent
import (
"sync"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
)
// Represents a job that is being executed by a worker
type Job struct {
id string
jobType livekit.JobType
status livekit.JobStatus
namespace string
mu sync.Mutex
load float32
Logger logger.Logger
}
func NewJob(id, namespace string, jobType livekit.JobType) *Job {
return &Job{
id: id,
status: livekit.JobStatus_JS_UNKNOWN,
jobType: jobType,
namespace: namespace,
}
}
func (j *Job) ID() string {
return j.id
}
func (j *Job) Namespace() string {
return j.namespace
}
func (j *Job) Type() livekit.JobType {
return j.jobType
}
func (j *Job) WorkerLoad() float32 {
// Current load that this job is taking on its worker
j.mu.Lock()
defer j.mu.Unlock()
return j.load
}
func (j *Job) UpdateStatus(req *livekit.UpdateJobStatus) {
j.mu.Lock()
if req.Status != nil {
j.status = *req.Status // End of the job, SUCCESS or FAILURE
if j.status == livekit.JobStatus_JS_FAILED {
j.Logger.Errorw("job failed", nil, "id", j.id, "type", j.jobType, "error", req.Error)
}
}
j.load = req.Load
j.mu.Unlock()
if req.Metadata != nil {
j.UpdateMetadata(req.GetMetadata())
}
}
func (j *Job) UpdateMetadata(metadata string) {
j.Logger.Debugw("job metadata", nil, "id", j.id, "metadata", metadata)
}
+37 -14
View File
@@ -67,7 +67,7 @@ type Worker struct {
protocolVersion WorkerProtocolVersion
registered atomic.Bool
status livekit.WorkerStatus
runningJobs map[string]*Job
runningJobs map[string]*livekit.Job
onWorkerRegistered func(w *Worker)
@@ -101,7 +101,7 @@ func NewWorker(
apiSecret: apiSecret,
serverInfo: serverInfo,
closed: make(chan struct{}),
runningJobs: make(map[string]*Job),
runningJobs: make(map[string]*livekit.Job),
availability: make(map[string]chan *livekit.AvailabilityResponse),
conn: conn,
sigConn: sigConn,
@@ -134,6 +134,7 @@ func (w *Worker) ID() string {
func (w *Worker) JobType() livekit.JobType {
w.mu.Lock()
defer w.mu.Unlock()
return w.jobType
}
@@ -165,8 +166,8 @@ func (w *Worker) Registered() bool {
return w.registered.Load()
}
func (w *Worker) RunningJobs() map[string]*Job {
jobs := make(map[string]*Job, len(w.runningJobs))
func (w *Worker) RunningJobs() map[string]*livekit.Job {
jobs := make(map[string]*livekit.Job, len(w.runningJobs))
w.mu.Lock()
defer w.mu.Unlock()
for k, v := range w.runningJobs {
@@ -182,6 +183,10 @@ func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) error {
w.availability[job.Id] = availCh
w.mu.Unlock()
if job.State == nil {
job.State = &livekit.JobState{}
}
w.sendRequest(&livekit.ServerMessage{Message: &livekit.ServerMessage_Availability{
Availability: &livekit.AvailabilityRequest{Job: job},
}})
@@ -204,7 +209,12 @@ func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) error {
Assignment: &livekit.JobAssignment{Job: job, Url: nil, Token: token},
}})
// TODO(theomonnom): Check if an agent was successfully connected to the room before returning
w.mu.Lock()
w.runningJobs[job.Id] = job
w.mu.Unlock()
// TODO sweep jobs that are never started. We can't do this until all SDKs actually update the the JOB state
return nil
case <-time.After(assignJobTimeout):
return ErrAvailabilityTimeout
@@ -222,10 +232,6 @@ func (w *Worker) UpdateStatus(status *livekit.UpdateWorkerStatus) {
}
w.load = status.GetLoad()
w.mu.Unlock()
if status.Metadata != nil {
w.UpdateMetadata(status.GetMetadata())
}
}
func (w *Worker) UpdateMetadata(metadata string) {
@@ -283,10 +289,10 @@ func (w *Worker) handleRegister(req *livekit.RegisterWorkerRequest) {
w.mu.Lock()
onWorkerRegistered := w.onWorkerRegistered
w.jobType = req.Type
w.version = req.Version
w.name = req.Name
w.namespace = req.GetNamespace()
w.jobType = req.GetType()
if req.AllowedPermissions != nil {
w.permissions = req.AllowedPermissions
@@ -336,15 +342,32 @@ func (w *Worker) handleAvailability(res *livekit.AvailabilityResponse) {
func (w *Worker) handleJobUpdate(update *livekit.UpdateJobStatus) {
w.mu.Lock()
job, ok := w.runningJobs[update.JobId]
w.mu.Unlock()
defer w.mu.Unlock()
job, ok := w.runningJobs[update.JobId]
if !ok {
w.Logger.Warnw("received job update for unknown job", nil, "jobId", update.JobId)
w.Logger.Infow("received job update for unknown job", "jobId", update.JobId)
return
}
job.UpdateStatus(update)
now := time.Now()
job.State.UpdatedAt = now.UnixNano()
if job.State.Status == livekit.JobStatus_JS_PENDING && update.Status >= livekit.JobStatus_JS_RUNNING {
job.State.StartedAt = now.UnixNano()
}
if job.State.Status < livekit.JobStatus_JS_SUCCESS && update.Status >= livekit.JobStatus_JS_SUCCESS {
job.State.EndedAt = now.UnixNano()
}
job.State.Status = update.Status
job.State.Error = update.Error
// TODO do not delete, leafve inside the JobDefinition
if job.State.Status >= livekit.JobStatus_JS_SUCCESS {
delete(w.runningJobs, job.Id)
}
}
func (w *Worker) handleSimulateJob(simulate *livekit.SimulateJobRequest) {
+23 -1
View File
@@ -28,6 +28,7 @@ import (
"gopkg.in/yaml.v3"
"github.com/livekit/mediatransportutil/pkg/rtcconfig"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
redisLiveKit "github.com/livekit/protocol/redis"
"github.com/livekit/protocol/rpc"
@@ -246,7 +247,28 @@ type RoomConfig struct {
// deprecated, moved to limits
MaxRoomNameLength int `yaml:"max_room_name_length,omitempty"`
// deprecated, moved to limits
MaxParticipantIdentityLength int `yaml:"max_participant_identity_length,omitempty"`
MaxParticipantIdentityLength int `yaml:"max_participant_identity_length,omitempty"`
RoomConfigurations map[string]RoomConfiguration `yaml:"room_configurations,omitempty"`
}
type RoomConfiguration struct {
Name string `yaml:"name,omitempty"` // Used as ID, must be unique
// number of seconds to keep the room open if no one joins
EmptyTimeout uint32 `yaml:"empty_timeout,omitempty"`
// number of seconds to keep the room open after everyone leaves
DepartureTimeout uint32 `yaml:"departure_timeout,omitempty"`
// limit number of participants that can be in a room
MaxParticipants uint32 `yaml:"max_participants,omitempty"`
// egress
Egress *livekit.RoomEgress `yaml:"egress,omitempty"`
// agent
Agent *livekit.RoomAgent `yaml:"agent,omitempty"`
// playout delay of subscriber
MinPlayoutDelay uint32 `yaml:"min_playout_delay,omitempty"`
MaxPlayoutDelay uint32 `yaml:"max_playout_delay,omitempty"`
// improves A/V sync when playout_delay set to a value larger than 200ms. It will disables transceiver re-use
// so not recommended for rooms with frequent subscription changes
SyncStreams bool `yaml:"sync_streams"`
}
type CodecSpec struct {
+35 -7
View File
@@ -1014,7 +1014,7 @@ func (r *Room) onTrackPublished(participant types.LocalParticipant, track types.
r.lock.Unlock()
if !hasPublished {
r.launchPublisherAgent(participant)
r.launchPublisherAgents(participant)
if r.internal != nil && r.internal.ParticipantEgress != nil {
go func() {
if err := StartParticipantEgress(
@@ -1423,16 +1423,44 @@ func (r *Room) simulationCleanupWorker() {
}
}
func (r *Room) launchPublisherAgent(p types.Participant) {
func (r *Room) launchPublisherAgents(p types.Participant) {
if p == nil || p.IsDependent() || r.agentClient == nil {
return
}
go r.agentClient.LaunchJob(context.Background(), &agent.JobDescription{
JobType: livekit.JobType_JT_PUBLISHER,
Room: r.ToProto(),
Participant: p.ToProto(),
})
if r.internal == nil {
return
}
for _, ag := range r.internal.Agents {
if ag.Type != livekit.JobType_JT_PUBLISHER {
continue
}
var startAgent bool
if len(ag.ParticipantIdentity) == 0 {
// If no participant given, start for all participants
startAgent = true
} else {
for _, pi := range ag.ParticipantIdentity {
if pi == string(p.Identity()) {
startAgent = true
break
}
}
}
if startAgent {
go r.agentClient.LaunchJob(context.Background(), &agent.JobRequest{
JobType: livekit.JobType_JT_PUBLISHER,
Room: r.ToProto(),
Participant: p.ToProto(),
Metadata: ag.Metadata,
Namespace: ag.Namespace,
})
}
}
}
func (r *Room) DebugInfo() map[string]interface{} {
+4 -2
View File
@@ -56,11 +56,13 @@ type AgentHandler struct {
workers map[string]*agent.Worker
keyProvider auth.KeyProvider
// TODO remove once deprecated CheckEnabled is removed
namespaces map[string]*namespaceInfo
publisherEnabled bool
roomEnabled bool
roomTopic string
publisherTopic string
roomTopic string
publisherTopic string
}
type namespaceInfo struct {
+67
View File
@@ -19,10 +19,13 @@ import (
"errors"
"time"
"google.golang.org/protobuf/proto"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
"github.com/livekit/protocol/utils/guid"
"github.com/livekit/psrpc"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/livekit-server/pkg/routing"
@@ -78,6 +81,11 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre
return nil, false, err
}
req, err = r.applyNamedRoomConfiguration(req)
if err != nil {
return nil, false, err
}
if req.EmptyTimeout > 0 {
rm.EmptyTimeout = req.EmptyTimeout
}
@@ -98,6 +106,24 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre
internal.TrackEgress = req.Egress.Tracks
}
}
if req.Agent == nil {
// Backward compatibility: by default, start any agent in the empty namespace
req.Agent = &livekit.RoomAgent{
Agents: []*livekit.CreateAgentJobDefinitionRequest{
&livekit.CreateAgentJobDefinitionRequest{
Type: livekit.JobType_JT_ROOM,
Room: req.Name,
Namespace: "default",
},
&livekit.CreateAgentJobDefinitionRequest{
Type: livekit.JobType_JT_PUBLISHER,
Room: req.Name,
Namespace: "default",
},
},
}
}
internal.Agents = req.Agent.Agents
if req.MinPlayoutDelay > 0 || req.MaxPlayoutDelay > 0 {
internal.PlayoutDelay = &livekit.PlayoutDelay{
Enabled: true,
@@ -182,3 +208,44 @@ func applyDefaultRoomConfig(room *livekit.Room, internal *livekit.RoomInternal,
}
internal.SyncStreams = conf.SyncStreams
}
func (r *StandardRoomAllocator) applyNamedRoomConfiguration(req *livekit.CreateRoomRequest) (*livekit.CreateRoomRequest, error) {
if req.ConfigName == "" {
return req, nil
}
conf, ok := r.config.Room.RoomConfigurations[req.ConfigName]
if !ok {
return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "unknown roomc confguration in create room request")
}
clone := proto.Clone(req).(*livekit.CreateRoomRequest)
// Request overwrites conf
if clone.EmptyTimeout == 0 {
clone.EmptyTimeout = conf.EmptyTimeout
}
if clone.DepartureTimeout == 0 {
clone.DepartureTimeout = req.DepartureTimeout
}
if clone.MaxParticipants == 0 {
clone.MaxParticipants = conf.MaxParticipants
}
if clone.Egress == nil {
clone.Egress = proto.Clone(conf.Egress).(*livekit.RoomEgress)
}
if clone.Agent == nil {
clone.Agent = proto.Clone(conf.Agent).(*livekit.RoomAgent)
}
if clone.MinPlayoutDelay == 0 {
clone.MinPlayoutDelay = conf.MinPlayoutDelay
}
if clone.MaxPlayoutDelay == 0 {
clone.MaxPlayoutDelay = conf.MaxPlayoutDelay
}
if !clone.SyncStreams {
clone.SyncStreams = conf.SyncStreams
}
return clone, nil
}
+30 -9
View File
@@ -103,10 +103,14 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq
defer done()
if created {
go s.agentClient.LaunchJob(context.WithoutCancel(ctx), &agent.JobDescription{
JobType: livekit.JobType_JT_ROOM,
Room: rm,
})
_, internal, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Name), true)
if internal.Agents != nil {
err = s.launchAgents(ctx, rm, internal.Agents)
if err != nil {
return nil, err
}
}
if req.Egress != nil && req.Egress.Room != nil {
// ensure room name matches
@@ -126,6 +130,23 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq
return rm, nil
}
func (s *RoomService) launchAgents(ctx context.Context, rm *livekit.Room, agents []*livekit.CreateAgentJobDefinitionRequest) error {
for _, ag := range agents {
if ag.Type != livekit.JobType_JT_ROOM {
continue
}
go s.agentClient.LaunchJob(ctx, &agent.JobRequest{
JobType: ag.Type,
Room: rm,
Metadata: ag.Metadata,
Namespace: ag.Namespace,
})
}
return nil
}
func (s *RoomService) ListRooms(ctx context.Context, req *livekit.ListRoomsRequest) (*livekit.ListRoomsResponse, error) {
AppendLogFields(ctx, "room", req.Names)
err := EnsureListPermission(ctx)
@@ -288,7 +309,7 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat
return nil, twirpAuthError(err)
}
room, _, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Room), false)
room, internal, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Room), false)
if err != nil {
return nil, err
}
@@ -323,10 +344,10 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat
}
if created {
go s.agentClient.LaunchJob(context.WithoutCancel(ctx), &agent.JobDescription{
JobType: livekit.JobType_JT_ROOM,
Room: room,
})
err = s.launchAgents(ctx, room, internal.Agents)
if err != nil {
return nil, err
}
}
return room, nil
+18 -4
View File
@@ -533,10 +533,24 @@ func (s *RTCService) startConnection(
}
if created && s.agentClient != nil {
go s.agentClient.LaunchJob(ctx, &agent.JobDescription{
JobType: livekit.JobType_JT_ROOM,
Room: cr.Room,
})
// TODO Have CreateRoom return the RoomInternal object?
_, internal, err := s.store.LoadRoom(ctx, livekit.RoomName(roomName), true)
if err != nil {
return connectionResult{}, nil, err
}
for _, ag := range internal.Agents {
if ag.Type != livekit.JobType_JT_ROOM {
continue
}
go s.agentClient.LaunchJob(ctx, &agent.JobRequest{
JobType: ag.Type,
Room: cr.Room,
Metadata: ag.Metadata,
Namespace: ag.Namespace,
})
}
}
// this needs to be started first *before* using router functions on this node
+24 -10
View File
@@ -35,7 +35,6 @@ func TestAgents(t *testing.T) {
_, finish := setupSingleNodeTest("TestAgents")
defer finish()
ac1, err := newAgentClient(agentToken(), defaultServerPort)
require.NoError(t, err)
ac2, err := newAgentClient(agentToken(), defaultServerPort)
@@ -48,10 +47,10 @@ func TestAgents(t *testing.T) {
defer ac2.close()
defer ac3.close()
defer ac4.close()
ac1.Run(livekit.JobType_JT_ROOM, "namespace")
ac2.Run(livekit.JobType_JT_ROOM, "namespace")
ac3.Run(livekit.JobType_JT_PUBLISHER, "namespace")
ac4.Run(livekit.JobType_JT_PUBLISHER, "namespace")
ac1.Run(livekit.JobType_JT_ROOM, "default")
ac2.Run(livekit.JobType_JT_ROOM, "default")
ac3.Run(livekit.JobType_JT_PUBLISHER, "default")
ac4.Run(livekit.JobType_JT_PUBLISHER, "default")
testutils.WithTimeout(t, func() string {
if ac1.registered.Load() != 1 || ac2.registered.Load() != 1 || ac3.registered.Load() != 1 || ac4.registered.Load() != 1 {
@@ -83,7 +82,7 @@ func TestAgents(t *testing.T) {
}
return ""
}, 6 * time.Second)
}, 6*time.Second)
// publish 2 tracks
t3, err := c2.AddStaticTrack("audio/opus", "audio", "micro")
@@ -116,9 +115,24 @@ func TestAgentNamespaces(t *testing.T) {
require.NoError(t, err)
defer ac1.close()
defer ac2.close()
ac1.Run(livekit.JobType_JT_ROOM, "namespace")
ac1.Run(livekit.JobType_JT_ROOM, "namespace1")
ac2.Run(livekit.JobType_JT_ROOM, "namespace2")
_, err = roomClient.CreateRoom(contextWithToken(createRoomToken()), &livekit.CreateRoomRequest{
Name: testRoom,
Agent: &livekit.RoomAgent{
Agents: []*livekit.CreateAgentJobDefinitionRequest{
&livekit.CreateAgentJobDefinitionRequest{
Namespace: "namespace1",
},
&livekit.CreateAgentJobDefinitionRequest{
Namespace: "namespace2",
},
},
},
})
require.NoError(t, err)
testutils.WithTimeout(t, func() string {
if ac1.registered.Load() != 1 || ac2.registered.Load() != 1 {
return "worker not registered"
@@ -137,7 +151,7 @@ func TestAgentNamespaces(t *testing.T) {
job1 := <-ac1.requestedJobs
job2 := <-ac2.requestedJobs
if job1.Namespace != "namespace" {
if job1.Namespace != "namespace1" {
return "namespace is not 'namespace'"
}
@@ -163,8 +177,8 @@ func TestAgentMultiNode(t *testing.T) {
ac2, err := newAgentClient(agentToken(), defaultServerPort)
defer ac1.close()
defer ac2.close()
ac1.Run(livekit.JobType_JT_ROOM, "namespace")
ac2.Run(livekit.JobType_JT_PUBLISHER, "namespace")
ac1.Run(livekit.JobType_JT_ROOM, "default")
ac2.Run(livekit.JobType_JT_PUBLISHER, "default")
testutils.WithTimeout(t, func() string {
if ac1.registered.Load() != 1 || ac2.registered.Load() != 1 {