diff --git a/go.mod b/go.mod index a0f0370e2..998eb0b23 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7 - github.com/livekit/protocol v1.19.1-0.20240625064703-b2dc8deac6c9 + github.com/livekit/protocol v1.19.1-0.20240627173058-82786f41fdb6 github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a github.com/mackerelio/go-osstat v0.2.5 github.com/magefile/mage v1.15.0 @@ -58,12 +58,16 @@ require ( ) require ( + buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.33.0-20240401165935-b983156c5e99.1 // indirect dario.cat/mergo v1.0.0 // indirect github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect + github.com/antlr4-go/antlr/v4 v4.13.0 // indirect github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/bufbuild/protovalidate-go v0.6.1 // indirect + github.com/bufbuild/protoyaml-go v0.1.9 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/containerd/continuity v0.4.3 // indirect @@ -80,6 +84,7 @@ require ( github.com/go-jose/go-jose/v3 v3.0.3 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/google/cel-go v0.20.1 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect github.com/google/subcommands v1.2.0 // indirect @@ -116,6 +121,7 @@ require ( github.com/puzpuzpuz/xsync/v3 v3.1.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect + github.com/stoewer/go-strcase v1.3.0 // indirect github.com/xeipuuv/gojsonpointer 
v0.0.0-20190905194746-02993c407bfb // indirect github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect github.com/xeipuuv/gojsonschema v1.2.0 // indirect @@ -128,7 +134,8 @@ require ( golang.org/x/sys v0.21.0 // indirect golang.org/x/text v0.16.0 // indirect golang.org/x/tools v0.22.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240401170217-c3f982113cda // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect google.golang.org/grpc v1.64.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 18e1c4eb7..5dc79cfd9 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.33.0-20240401165935-b983156c5e99.1 h1:2IGhRovxlsOIQgx2ekZWo4wTPAYpck41+18ICxs37is= +buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.33.0-20240401165935-b983156c5e99.1/go.mod h1:Tgn5bgL220vkFOI0KPStlcClPeOJzAv4uT+V8JXGUnw= dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk= dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0= @@ -6,6 +8,8 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= +github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI= +github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g= github.com/avast/retry-go/v4 v4.6.0 
h1:K9xNA+KeB8HHc2aWFuLb25Offp+0iVRXEvFx8IinRJA= github.com/avast/retry-go/v4 v4.6.0/go.mod h1:gvWlPhBVsvBbLkVGDg/KwvBv0bEkCOLRRSHKIr2PyOE= github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= @@ -18,6 +22,10 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/bufbuild/protovalidate-go v0.6.1 h1:uzW8r0CDvqApUChNj87VzZVoQSKhcVdw5UWOE605UIw= +github.com/bufbuild/protovalidate-go v0.6.1/go.mod h1:4BR3rKEJiUiTy+sqsusFn2ladOf0kYmA2Reo6BHSBgQ= +github.com/bufbuild/protoyaml-go v0.1.9 h1:anV5UtF1Mlvkkgp4NWA6U/zOnJFng8Orq4Vf3ZUQHBU= +github.com/bufbuild/protoyaml-go v0.1.9/go.mod h1:KCBItkvZOK/zwGueLdH1Wx1RLyFn5rCH7YjQrdty2Wc= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -56,6 +64,8 @@ github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/elliotchance/orderedmap/v2 v2.2.0 h1:7/2iwO98kYT4XkOjA9mBEIwvi4KpGB4cyHeOFOnj4Vk= github.com/elliotchance/orderedmap/v2 v2.2.0/go.mod h1:85lZyVbpGaGvHvnKa7Qhx7zncAdBIBq6u56Hb1PRU5Q= +github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= +github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/florianl/go-tc v0.4.3 
h1:xpobG2gFNvEqbclU07zjddALSjqTQTWJkxg5/kRYDpw= @@ -78,6 +88,9 @@ github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfC github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/google/cel-go v0.20.1 h1:nDx9r8S3L4pE61eDdt8igGj8rf5kjYR3ILxWIpWNi84= +github.com/google/cel-go v0.20.1/go.mod h1:kWcIzTsPX0zmQ+H3TirHstLLf9ep5QTsZBN9u4dOYLg= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -154,8 +167,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7 h1:F1L8inJoynwIAYpZENNYS+1xHJMF5RFRorsnAlcxfSY= github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.19.1-0.20240625064703-b2dc8deac6c9 h1:dG/rhtgXL3Clh/ywRH2g2g5pnssPr98OP6VmbOCSB2A= -github.com/livekit/protocol v1.19.1-0.20240625064703-b2dc8deac6c9/go.mod h1:zUHHcMbhBRDe0+OBTHbQT1c7njLKEki+xe1qPpR7LhU= +github.com/livekit/protocol v1.19.1-0.20240627173058-82786f41fdb6 h1:XtyV+MqHqXTuNLXz5TUjYtNg0gvTVw9web/YuXD9+3c= +github.com/livekit/protocol v1.19.1-0.20240627173058-82786f41fdb6/go.mod h1:bNjJi+8frdvC84xG0CJ/7VfVvqerLg2MzjOks0ucyC4= github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a h1:EQAHmcYEGlc6V517cQ3Iy0+jHgP6+tM/B4l2vGuLpQo= 
github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= @@ -288,6 +301,8 @@ github.com/sclevine/spec v1.4.0 h1:z/Q9idDcay5m5irkZ28M7PtQM4aOISzOpj4bUPkDee8= github.com/sclevine/spec v1.4.0/go.mod h1:LvpgJaFyvQzRvc1kaDs0bulYwzC70PbiYjC4QnFHkOM= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stoewer/go-strcase v1.3.0 h1:g0eASXYtp+yvN9fK8sH94oCIk0fau9uV1/ZdJ0AVEzs= +github.com/stoewer/go-strcase v1.3.0/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8wodgtPmh1xo= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= @@ -477,10 +492,14 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d h1:k3zyW3BYYR30e8v3x0bTDdE9vpYFjZHK+HcyqkrppWk= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= +google.golang.org/genproto/googleapis/api v0.0.0-20240401170217-c3f982113cda h1:b6F6WIV4xHHD0FA4oIyzU6mHWg2WI2X1RBehwa5QN38= +google.golang.org/genproto/googleapis/api v0.0.0-20240401170217-c3f982113cda/go.mod h1:AHcE/gZH76Bk/ROZhQphlRoWo5xKDEtz3eVEO1LfA8c= +google.golang.org/genproto/googleapis/rpc 
v0.0.0-20240401170217-c3f982113cda h1:LI5DOvAxUPMv/50agcLLoo+AdWc1irS9Rzz4vPuD1V4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY= google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/agent/client.go b/pkg/agent/client.go index d2fdabf91..c82d6c3f5 100644 --- a/pkg/agent/client.go +++ b/pkg/agent/client.go @@ -20,15 +20,13 @@ import ( "time" "github.com/gammazero/workerpool" - "google.golang.org/protobuf/types/known/emptypb" - serverutils "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/utils" - "github.com/livekit/protocol/utils/guid" "github.com/livekit/psrpc" + "google.golang.org/protobuf/types/known/emptypb" ) const ( @@ -43,15 +41,17 @@ const ( type Client interface { // LaunchJob starts a room or participant job on an agent. 
// it will launch a job once for each worker in each namespace - LaunchJob(ctx context.Context, desc *JobDescription) + LaunchJob(ctx context.Context, desc *JobRequest) Stop() error } -type JobDescription struct { +type JobRequest struct { JobType livekit.JobType Room *livekit.Room // only set for participant jobs Participant *livekit.ParticipantInfo + Metadata string + Namespace string } type agentClient struct { @@ -105,52 +105,75 @@ func NewAgentClient(bus psrpc.MessageBus) (Client, error) { return c, nil } -func (c *agentClient) LaunchJob(ctx context.Context, desc *JobDescription) { - roomNamespaces, publisherNamespaces, needsRefresh := c.getOrCreateDispatchers() - - if needsRefresh { - go c.checkEnabled(ctx, roomNamespaces, publisherNamespaces) - } - - target := roomNamespaces +func (c *agentClient) LaunchJob(ctx context.Context, desc *JobRequest) { jobTypeTopic := RoomAgentTopic if desc.JobType == livekit.JobType_JT_PUBLISHER { - target = publisherNamespaces jobTypeTopic = PublisherAgentTopic } - target.ForEach(func(ns string) { - c.workers.Submit(func() { - _, err := c.client.JobRequest(ctx, ns, jobTypeTopic, &livekit.Job{ - Id: guid.New(utils.AgentJobPrefix), - Type: desc.JobType, - Room: desc.Room, - Participant: desc.Participant, - Namespace: ns, - }) - if err != nil { - logger.Errorw("failed to send job request", err, "namespace", ns, "jobType", jobTypeTopic) - } + if !c.isNamespaceActive(desc.Namespace, desc.JobType) { + logger.Infow("not dispatching agent job since no worker is available", "namespace", desc.Namespace, "jobType", desc.JobType) + return + } + + c.workers.Submit(func() { + _, err := c.client.JobRequest(context.Background(), desc.Namespace, jobTypeTopic, &livekit.Job{ + Id: utils.NewGuid(utils.AgentJobPrefix), + Type: desc.JobType, + Room: desc.Room, + Participant: desc.Participant, + Namespace: desc.Namespace, + Metadata: desc.Metadata, }) + if err != nil { + logger.Infow("failed to send job request", "error", err, "namespace", 
desc.Namespace, "jobType", desc.JobType) + } }) } -func (c *agentClient) getOrCreateDispatchers() (*serverutils.IncrementalDispatcher[string], *serverutils.IncrementalDispatcher[string], bool) { +func (c *agentClient) isNamespaceActive(ns string, jobType livekit.JobType) bool { c.mu.Lock() - defer c.mu.Unlock() if time.Since(c.enabledExpiresAt) > EnabledCacheTTL || c.roomNamespaces == nil || c.publisherNamespaces == nil { + c.enabledExpiresAt = time.Now() c.roomNamespaces = serverutils.NewIncrementalDispatcher[string]() c.publisherNamespaces = serverutils.NewIncrementalDispatcher[string]() - return c.roomNamespaces, c.publisherNamespaces, true + go c.checkEnabled(c.roomNamespaces, c.publisherNamespaces) } - return c.roomNamespaces, c.publisherNamespaces, false + + target := c.roomNamespaces + if jobType == livekit.JobType_JT_PUBLISHER { + target = c.publisherNamespaces + } + + c.mu.Unlock() + + done := make(chan bool, 1) + + go func() { + target.ForEach(func(curNs string) { + if curNs == ns { + select { + case done <- true: + default: + } + + return + } + }) + select { + case done <- false: + default: + } + }() + + return <-done } -func (c *agentClient) checkEnabled(ctx context.Context, roomNamespaces, publisherNamespaces *serverutils.IncrementalDispatcher[string]) { +func (c *agentClient) checkEnabled(roomNamespaces, publisherNamespaces *serverutils.IncrementalDispatcher[string]) { defer roomNamespaces.Done() defer publisherNamespaces.Done() - resChan, err := c.client.CheckEnabled(ctx, &rpc.CheckEnabledRequest{}, psrpc.WithRequestTimeout(CheckEnabledTimeout)) + resChan, err := c.client.CheckEnabled(context.Background(), &rpc.CheckEnabledRequest{}, psrpc.WithRequestTimeout(CheckEnabledTimeout)) if err != nil { logger.Errorw("failed to check enabled", err) return diff --git a/pkg/agent/job.go b/pkg/agent/job.go deleted file mode 100644 index 6edfe4a27..000000000 --- a/pkg/agent/job.go +++ /dev/null @@ -1,86 +0,0 @@ -// Copyright 2024 LiveKit, Inc. 
-// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package agent - -import ( - "sync" - - "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/logger" -) - -// Represents a job that is being executed by a worker -type Job struct { - id string - jobType livekit.JobType - status livekit.JobStatus - namespace string - - mu sync.Mutex - load float32 - - Logger logger.Logger -} - -func NewJob(id, namespace string, jobType livekit.JobType) *Job { - return &Job{ - id: id, - status: livekit.JobStatus_JS_UNKNOWN, - jobType: jobType, - namespace: namespace, - } -} - -func (j *Job) ID() string { - return j.id -} - -func (j *Job) Namespace() string { - return j.namespace -} - -func (j *Job) Type() livekit.JobType { - return j.jobType -} - -func (j *Job) WorkerLoad() float32 { - // Current load that this job is taking on its worker - j.mu.Lock() - defer j.mu.Unlock() - return j.load -} - -func (j *Job) UpdateStatus(req *livekit.UpdateJobStatus) { - j.mu.Lock() - - if req.Status != nil { - j.status = *req.Status // End of the job, SUCCESS or FAILURE - - if j.status == livekit.JobStatus_JS_FAILED { - j.Logger.Errorw("job failed", nil, "id", j.id, "type", j.jobType, "error", req.Error) - } - } - - j.load = req.Load - j.mu.Unlock() - - if req.Metadata != nil { - j.UpdateMetadata(req.GetMetadata()) - } -} - -func (j *Job) UpdateMetadata(metadata string) { - j.Logger.Debugw("job metadata", nil, "id", j.id, "metadata", metadata) -} diff --git 
a/pkg/agent/worker.go b/pkg/agent/worker.go index bff334988..daed90b85 100644 --- a/pkg/agent/worker.go +++ b/pkg/agent/worker.go @@ -67,7 +67,7 @@ type Worker struct { protocolVersion WorkerProtocolVersion registered atomic.Bool status livekit.WorkerStatus - runningJobs map[string]*Job + runningJobs map[string]*livekit.Job onWorkerRegistered func(w *Worker) @@ -101,7 +101,7 @@ func NewWorker( apiSecret: apiSecret, serverInfo: serverInfo, closed: make(chan struct{}), - runningJobs: make(map[string]*Job), + runningJobs: make(map[string]*livekit.Job), availability: make(map[string]chan *livekit.AvailabilityResponse), conn: conn, sigConn: sigConn, @@ -134,6 +134,7 @@ func (w *Worker) ID() string { func (w *Worker) JobType() livekit.JobType { w.mu.Lock() defer w.mu.Unlock() + return w.jobType } @@ -165,8 +166,8 @@ func (w *Worker) Registered() bool { return w.registered.Load() } -func (w *Worker) RunningJobs() map[string]*Job { - jobs := make(map[string]*Job, len(w.runningJobs)) +func (w *Worker) RunningJobs() map[string]*livekit.Job { + jobs := make(map[string]*livekit.Job, len(w.runningJobs)) w.mu.Lock() defer w.mu.Unlock() for k, v := range w.runningJobs { @@ -182,6 +183,10 @@ func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) error { w.availability[job.Id] = availCh w.mu.Unlock() + if job.State == nil { + job.State = &livekit.JobState{} + } + w.sendRequest(&livekit.ServerMessage{Message: &livekit.ServerMessage_Availability{ Availability: &livekit.AvailabilityRequest{Job: job}, }}) @@ -204,7 +209,12 @@ func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) error { Assignment: &livekit.JobAssignment{Job: job, Url: nil, Token: token}, }}) - // TODO(theomonnom): Check if an agent was successfully connected to the room before returning + w.mu.Lock() + w.runningJobs[job.Id] = job + w.mu.Unlock() + + // TODO sweep jobs that are never started. 
We can't do this until all SDKs actually update the JOB state + return nil case <-time.After(assignJobTimeout): return ErrAvailabilityTimeout @@ -222,10 +232,6 @@ func (w *Worker) UpdateStatus(status *livekit.UpdateWorkerStatus) { } w.load = status.GetLoad() w.mu.Unlock() - - if status.Metadata != nil { - w.UpdateMetadata(status.GetMetadata()) - } } func (w *Worker) UpdateMetadata(metadata string) { @@ -283,10 +289,10 @@ func (w *Worker) handleRegister(req *livekit.RegisterWorkerRequest) { w.mu.Lock() onWorkerRegistered := w.onWorkerRegistered - w.jobType = req.Type w.version = req.Version w.name = req.Name w.namespace = req.GetNamespace() + w.jobType = req.GetType() if req.AllowedPermissions != nil { w.permissions = req.AllowedPermissions @@ -336,15 +342,32 @@ func (w *Worker) handleAvailability(res *livekit.AvailabilityResponse) { func (w *Worker) handleJobUpdate(update *livekit.UpdateJobStatus) { w.mu.Lock() - job, ok := w.runningJobs[update.JobId] - w.mu.Unlock() + defer w.mu.Unlock() + job, ok := w.runningJobs[update.JobId] if !ok { - w.Logger.Warnw("received job update for unknown job", nil, "jobId", update.JobId) + w.Logger.Infow("received job update for unknown job", "jobId", update.JobId) return } - job.UpdateStatus(update) + now := time.Now() + job.State.UpdatedAt = now.UnixNano() + + if job.State.Status == livekit.JobStatus_JS_PENDING && update.Status >= livekit.JobStatus_JS_RUNNING { + job.State.StartedAt = now.UnixNano() + } + + if job.State.Status < livekit.JobStatus_JS_SUCCESS && update.Status >= livekit.JobStatus_JS_SUCCESS { + job.State.EndedAt = now.UnixNano() + } + + job.State.Status = update.Status + job.State.Error = update.Error + + // TODO do not delete, leave inside the JobDefinition + if job.State.Status >= livekit.JobStatus_JS_SUCCESS { + delete(w.runningJobs, job.Id) + } } func (w *Worker) handleSimulateJob(simulate *livekit.SimulateJobRequest) { diff --git a/pkg/config/config.go b/pkg/config/config.go index ca3d64d43..3f9dd31aa
100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -28,6 +28,7 @@ import ( "gopkg.in/yaml.v3" "github.com/livekit/mediatransportutil/pkg/rtcconfig" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" redisLiveKit "github.com/livekit/protocol/redis" "github.com/livekit/protocol/rpc" @@ -246,7 +247,28 @@ type RoomConfig struct { // deprecated, moved to limits MaxRoomNameLength int `yaml:"max_room_name_length,omitempty"` // deprecated, moved to limits - MaxParticipantIdentityLength int `yaml:"max_participant_identity_length,omitempty"` + MaxParticipantIdentityLength int `yaml:"max_participant_identity_length,omitempty"` + RoomConfigurations map[string]RoomConfiguration `yaml:"room_configurations,omitempty"` +} + +type RoomConfiguration struct { + Name string `yaml:"name,omitempty"` // Used as ID, must be unique + // number of seconds to keep the room open if no one joins + EmptyTimeout uint32 `yaml:"empty_timeout,omitempty"` + // number of seconds to keep the room open after everyone leaves + DepartureTimeout uint32 `yaml:"departure_timeout,omitempty"` + // limit number of participants that can be in a room + MaxParticipants uint32 `yaml:"max_participants,omitempty"` + // egress + Egress *livekit.RoomEgress `yaml:"egress,omitempty"` + // agent + Agent *livekit.RoomAgent `yaml:"agent,omitempty"` + // playout delay of subscriber + MinPlayoutDelay uint32 `yaml:"min_playout_delay,omitempty"` + MaxPlayoutDelay uint32 `yaml:"max_playout_delay,omitempty"` + // improves A/V sync when playout_delay is set to a value larger than 200ms. It disables transceiver re-use, + // so not recommended for rooms with frequent subscription changes + SyncStreams bool `yaml:"sync_streams"` } type CodecSpec struct { diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 9a5ecfc7b..c5daee74f 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -1014,7 +1014,7 @@ func (r *Room) onTrackPublished(participant types.LocalParticipant, track types.
r.lock.Unlock() if !hasPublished { - r.launchPublisherAgent(participant) + r.launchPublisherAgents(participant) if r.internal != nil && r.internal.ParticipantEgress != nil { go func() { if err := StartParticipantEgress( @@ -1423,16 +1423,44 @@ func (r *Room) simulationCleanupWorker() { } } -func (r *Room) launchPublisherAgent(p types.Participant) { +func (r *Room) launchPublisherAgents(p types.Participant) { if p == nil || p.IsDependent() || r.agentClient == nil { return } - go r.agentClient.LaunchJob(context.Background(), &agent.JobDescription{ - JobType: livekit.JobType_JT_PUBLISHER, - Room: r.ToProto(), - Participant: p.ToProto(), - }) + if r.internal == nil { + return + } + + for _, ag := range r.internal.Agents { + if ag.Type != livekit.JobType_JT_PUBLISHER { + continue + } + + var startAgent bool + + if len(ag.ParticipantIdentity) == 0 { + // If no participant given, start for all participants + startAgent = true + } else { + for _, pi := range ag.ParticipantIdentity { + if pi == string(p.Identity()) { + startAgent = true + break + } + } + } + + if startAgent { + go r.agentClient.LaunchJob(context.Background(), &agent.JobRequest{ + JobType: livekit.JobType_JT_PUBLISHER, + Room: r.ToProto(), + Participant: p.ToProto(), + Metadata: ag.Metadata, + Namespace: ag.Namespace, + }) + } + } } func (r *Room) DebugInfo() map[string]interface{} { diff --git a/pkg/service/agentservice.go b/pkg/service/agentservice.go index 9a7106438..82f6b3af7 100644 --- a/pkg/service/agentservice.go +++ b/pkg/service/agentservice.go @@ -56,11 +56,13 @@ type AgentHandler struct { workers map[string]*agent.Worker keyProvider auth.KeyProvider + // TODO remove once deprecated CheckEnabled is removed namespaces map[string]*namespaceInfo publisherEnabled bool roomEnabled bool - roomTopic string - publisherTopic string + + roomTopic string + publisherTopic string } type namespaceInfo struct { diff --git a/pkg/service/roomallocator.go b/pkg/service/roomallocator.go index c8556e645..eecb2bb30 
100644 --- a/pkg/service/roomallocator.go +++ b/pkg/service/roomallocator.go @@ -19,10 +19,13 @@ import ( "errors" "time" + "google.golang.org/protobuf/proto" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" "github.com/livekit/protocol/utils/guid" + "github.com/livekit/psrpc" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" @@ -78,6 +81,11 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre return nil, false, err } + req, err = r.applyNamedRoomConfiguration(req) + if err != nil { + return nil, false, err + } + if req.EmptyTimeout > 0 { rm.EmptyTimeout = req.EmptyTimeout } @@ -98,6 +106,24 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre internal.TrackEgress = req.Egress.Tracks } } + if req.Agent == nil { + // Backward compatibility: by default, start room and publisher agents in the "default" namespace + req.Agent = &livekit.RoomAgent{ + Agents: []*livekit.CreateAgentJobDefinitionRequest{ + &livekit.CreateAgentJobDefinitionRequest{ + Type: livekit.JobType_JT_ROOM, + Room: req.Name, + Namespace: "default", + }, + &livekit.CreateAgentJobDefinitionRequest{ + Type: livekit.JobType_JT_PUBLISHER, + Room: req.Name, + Namespace: "default", + }, + }, + } + } + internal.Agents = req.Agent.Agents if req.MinPlayoutDelay > 0 || req.MaxPlayoutDelay > 0 { internal.PlayoutDelay = &livekit.PlayoutDelay{ Enabled: true, @@ -182,3 +208,44 @@ func applyDefaultRoomConfig(room *livekit.Room, internal *livekit.RoomInternal, } internal.SyncStreams = conf.SyncStreams } + +func (r *StandardRoomAllocator) applyNamedRoomConfiguration(req *livekit.CreateRoomRequest) (*livekit.CreateRoomRequest, error) { + if req.ConfigName == "" { + return req, nil + } + + conf, ok := r.config.Room.RoomConfigurations[req.ConfigName] + if !ok { + return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "unknown room configuration in create room request") + } + +
clone := proto.Clone(req).(*livekit.CreateRoomRequest) + + // Explicit request values take precedence; the named configuration only fills in zero-valued fields + if clone.EmptyTimeout == 0 { + clone.EmptyTimeout = conf.EmptyTimeout + } + if clone.DepartureTimeout == 0 { + clone.DepartureTimeout = conf.DepartureTimeout + } + if clone.MaxParticipants == 0 { + clone.MaxParticipants = conf.MaxParticipants + } + if clone.Egress == nil { + clone.Egress = proto.Clone(conf.Egress).(*livekit.RoomEgress) + } + if clone.Agent == nil { + clone.Agent = proto.Clone(conf.Agent).(*livekit.RoomAgent) + } + if clone.MinPlayoutDelay == 0 { + clone.MinPlayoutDelay = conf.MinPlayoutDelay + } + if clone.MaxPlayoutDelay == 0 { + clone.MaxPlayoutDelay = conf.MaxPlayoutDelay + } + if !clone.SyncStreams { + clone.SyncStreams = conf.SyncStreams + } + + return clone, nil +} diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index 19657b09a..2cc379bd4 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -103,10 +103,14 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq defer done() if created { - go s.agentClient.LaunchJob(context.WithoutCancel(ctx), &agent.JobDescription{ - JobType: livekit.JobType_JT_ROOM, - Room: rm, - }) + _, internal, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Name), true) + if err != nil { + return nil, err + } + + if err = s.launchAgents(ctx, rm, internal.GetAgents()); err != nil { + return nil, err + } if req.Egress != nil && req.Egress.Room != nil { // ensure room name matches @@ -126,6 +130,23 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq return rm, nil } +func (s *RoomService) launchAgents(ctx context.Context, rm *livekit.Room, agents []*livekit.CreateAgentJobDefinitionRequest) error { + for _, ag := range agents { + if ag.Type != livekit.JobType_JT_ROOM { + continue + } + + go s.agentClient.LaunchJob(ctx, &agent.JobRequest{ + JobType: ag.Type, + Room: rm, + Metadata: ag.Metadata, + Namespace: ag.Namespace, + }) + } + + return
nil +} + func (s *RoomService) ListRooms(ctx context.Context, req *livekit.ListRoomsRequest) (*livekit.ListRoomsResponse, error) { AppendLogFields(ctx, "room", req.Names) err := EnsureListPermission(ctx) @@ -288,7 +309,7 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat return nil, twirpAuthError(err) } - room, _, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Room), false) + room, internal, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Room), true) if err != nil { return nil, err } @@ -323,10 +344,10 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat } if created { - go s.agentClient.LaunchJob(context.WithoutCancel(ctx), &agent.JobDescription{ - JobType: livekit.JobType_JT_ROOM, - Room: room, - }) + err = s.launchAgents(ctx, room, internal.GetAgents()) + if err != nil { + return nil, err + } } return room, nil diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index 5843231e2..74ce2f564 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -533,10 +533,24 @@ func (s *RTCService) startConnection( } if created && s.agentClient != nil { - go s.agentClient.LaunchJob(ctx, &agent.JobDescription{ - JobType: livekit.JobType_JT_ROOM, - Room: cr.Room, - }) + // TODO Have CreateRoom return the RoomInternal object?
+ _, internal, err := s.store.LoadRoom(ctx, livekit.RoomName(roomName), true) + if err != nil { + return connectionResult{}, nil, err + } + + for _, ag := range internal.Agents { + if ag.Type != livekit.JobType_JT_ROOM { + continue + } + + go s.agentClient.LaunchJob(ctx, &agent.JobRequest{ + JobType: ag.Type, + Room: cr.Room, + Metadata: ag.Metadata, + Namespace: ag.Namespace, + }) + } } // this needs to be started first *before* using router functions on this node diff --git a/test/agent_test.go b/test/agent_test.go index 88e44b19c..7db5b4b6f 100644 --- a/test/agent_test.go +++ b/test/agent_test.go @@ -35,7 +35,6 @@ func TestAgents(t *testing.T) { _, finish := setupSingleNodeTest("TestAgents") defer finish() - ac1, err := newAgentClient(agentToken(), defaultServerPort) require.NoError(t, err) ac2, err := newAgentClient(agentToken(), defaultServerPort) @@ -48,10 +47,10 @@ func TestAgents(t *testing.T) { defer ac2.close() defer ac3.close() defer ac4.close() - ac1.Run(livekit.JobType_JT_ROOM, "namespace") - ac2.Run(livekit.JobType_JT_ROOM, "namespace") - ac3.Run(livekit.JobType_JT_PUBLISHER, "namespace") - ac4.Run(livekit.JobType_JT_PUBLISHER, "namespace") + ac1.Run(livekit.JobType_JT_ROOM, "default") + ac2.Run(livekit.JobType_JT_ROOM, "default") + ac3.Run(livekit.JobType_JT_PUBLISHER, "default") + ac4.Run(livekit.JobType_JT_PUBLISHER, "default") testutils.WithTimeout(t, func() string { if ac1.registered.Load() != 1 || ac2.registered.Load() != 1 || ac3.registered.Load() != 1 || ac4.registered.Load() != 1 { @@ -83,7 +82,7 @@ func TestAgents(t *testing.T) { } return "" - }, 6 * time.Second) + }, 6*time.Second) // publish 2 tracks t3, err := c2.AddStaticTrack("audio/opus", "audio", "micro") @@ -116,9 +115,24 @@ func TestAgentNamespaces(t *testing.T) { require.NoError(t, err) defer ac1.close() defer ac2.close() - ac1.Run(livekit.JobType_JT_ROOM, "namespace") + ac1.Run(livekit.JobType_JT_ROOM, "namespace1") ac2.Run(livekit.JobType_JT_ROOM, "namespace2") + _, err = 
roomClient.CreateRoom(contextWithToken(createRoomToken()), &livekit.CreateRoomRequest{ + Name: testRoom, + Agent: &livekit.RoomAgent{ + Agents: []*livekit.CreateAgentJobDefinitionRequest{ + &livekit.CreateAgentJobDefinitionRequest{ + Namespace: "namespace1", + }, + &livekit.CreateAgentJobDefinitionRequest{ + Namespace: "namespace2", + }, + }, + }, + }) + require.NoError(t, err) + testutils.WithTimeout(t, func() string { if ac1.registered.Load() != 1 || ac2.registered.Load() != 1 { return "worker not registered" @@ -137,7 +151,7 @@ func TestAgentNamespaces(t *testing.T) { job1 := <-ac1.requestedJobs job2 := <-ac2.requestedJobs - if job1.Namespace != "namespace" { - return "namespace is not 'namespace'" + if job1.Namespace != "namespace1" { + return "namespace is not 'namespace1'" } @@ -163,8 +177,8 @@ func TestAgentMultiNode(t *testing.T) { ac2, err := newAgentClient(agentToken(), defaultServerPort) defer ac1.close() defer ac2.close() - ac1.Run(livekit.JobType_JT_ROOM, "namespace") - ac2.Run(livekit.JobType_JT_PUBLISHER, "namespace") + ac1.Run(livekit.JobType_JT_ROOM, "default") + ac2.Run(livekit.JobType_JT_PUBLISHER, "default") testutils.WithTimeout(t, func() string { if ac1.registered.Load() != 1 || ac2.registered.Load() != 1 {