diff --git a/CHANGELOG b/CHANGELOG index 3a9278aaa..52061a8fa 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -2,6 +2,27 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.7.0] - 2024-06-23 + +This version includes a breaking change for SIP service. SIP service now requires `sip.admin` in the token's permission grant +to interact with trunks and dispatch rules; and `sip.call` to dial out to phone numbers. +The latest versions of server SDKs will include the permission grants automatically. + +### Added + +- Support new SIP Trunk API. (#2799) +- Add participant session duration metric (#2801) +- Support for key/value attributes on Participants (#2806) +- Breaking: SIP service requires sip.admin or sip.call grants. (#2808) + +### Fixed + +- Fixed agent jobs not launching when using the CreateRoom API (#2796) + +### Changed + +- Indicate if track is expected to be resumed in `onClose` callback. (#2800) + ## [1.6.2] - 2024-06-15 ### Added diff --git a/README.md b/README.md index a653de3ba..896cf9d4c 100644 --- a/README.md +++ b/README.md @@ -280,7 +280,7 @@ Read our [deployment docs](https://docs.livekit.io/deploy/) for more information Pre-requisites: -- Go 1.20+ is installed +- Go 1.22+ is installed - GOPATH/bin is in your PATH Then run diff --git a/go.mod b/go.mod index ef8dc0491..f923bfef9 100644 --- a/go.mod +++ b/go.mod @@ -19,9 +19,9 @@ require ( github.com/jellydator/ttlcache/v3 v3.2.0 github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/mediatransportutil v0.0.0-20240613015318-84b69facfb75 - github.com/livekit/protocol v1.18.0 - github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5 + github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7 + github.com/livekit/protocol v1.19.2-0.20240705134535-94a2cfe2f1ee + github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a github.com/mackerelio/go-osstat v0.2.5 github.com/magefile/mage 
v1.15.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.8.1 @@ -33,7 +33,7 @@ require ( github.com/pion/interceptor v0.1.29 github.com/pion/rtcp v1.2.14 github.com/pion/rtp v1.8.6 - github.com/pion/sctp v1.8.16 + github.com/pion/sctp v1.8.17 github.com/pion/sdp/v3 v3.0.9 github.com/pion/transport/v2 v2.2.5 github.com/pion/turn/v2 v2.1.6 @@ -58,12 +58,16 @@ require ( ) require ( + buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.33.0-20240401165935-b983156c5e99.1 // indirect dario.cat/mergo v1.0.0 // indirect github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect + github.com/antlr4-go/antlr/v4 v4.13.0 // indirect github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/bufbuild/protovalidate-go v0.6.1 // indirect + github.com/bufbuild/protoyaml-go v0.1.9 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/containerd/continuity v0.4.3 // indirect @@ -80,15 +84,16 @@ require ( github.com/go-jose/go-jose/v3 v3.0.3 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/google/cel-go v0.20.1 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect github.com/google/subcommands v1.2.0 // indirect github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect - github.com/hashicorp/go-retryablehttp v0.7.5 // indirect + github.com/hashicorp/go-retryablehttp v0.7.7 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/josharian/native v1.1.0 // indirect - github.com/klauspost/compress v1.17.8 // indirect + github.com/klauspost/compress v1.17.9 // indirect github.com/klauspost/cpuid/v2 v2.2.6 // indirect github.com/lithammer/shortuuid/v4 
v4.0.0 // indirect github.com/mattn/go-runewidth v0.0.9 // indirect @@ -97,7 +102,7 @@ require ( github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect github.com/moby/term v0.5.0 // indirect - github.com/nats-io/nats.go v1.35.0 // indirect + github.com/nats-io/nats.go v1.36.0 // indirect github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect @@ -116,6 +121,7 @@ require ( github.com/puzpuzpuz/xsync/v3 v3.1.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect + github.com/stoewer/go-strcase v1.3.0 // indirect github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect github.com/xeipuuv/gojsonschema v1.2.0 // indirect @@ -128,7 +134,8 @@ require ( golang.org/x/sys v0.21.0 // indirect golang.org/x/text v0.16.0 // indirect golang.org/x/tools v0.22.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240521202816-d264139d666e // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240401170217-c3f982113cda // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect google.golang.org/grpc v1.64.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index f7d3af548..964384eec 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.33.0-20240401165935-b983156c5e99.1 h1:2IGhRovxlsOIQgx2ekZWo4wTPAYpck41+18ICxs37is= +buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.33.0-20240401165935-b983156c5e99.1/go.mod h1:Tgn5bgL220vkFOI0KPStlcClPeOJzAv4uT+V8JXGUnw= dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk= dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= 
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0= @@ -6,6 +8,8 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= +github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI= +github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g= github.com/avast/retry-go/v4 v4.6.0 h1:K9xNA+KeB8HHc2aWFuLb25Offp+0iVRXEvFx8IinRJA= github.com/avast/retry-go/v4 v4.6.0/go.mod h1:gvWlPhBVsvBbLkVGDg/KwvBv0bEkCOLRRSHKIr2PyOE= github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= @@ -18,6 +22,10 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/bufbuild/protovalidate-go v0.6.1 h1:uzW8r0CDvqApUChNj87VzZVoQSKhcVdw5UWOE605UIw= +github.com/bufbuild/protovalidate-go v0.6.1/go.mod h1:4BR3rKEJiUiTy+sqsusFn2ladOf0kYmA2Reo6BHSBgQ= +github.com/bufbuild/protoyaml-go v0.1.9 h1:anV5UtF1Mlvkkgp4NWA6U/zOnJFng8Orq4Vf3ZUQHBU= +github.com/bufbuild/protoyaml-go v0.1.9/go.mod h1:KCBItkvZOK/zwGueLdH1Wx1RLyFn5rCH7YjQrdty2Wc= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -56,6 +64,10 @@ github.com/eapache/queue 
v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/elliotchance/orderedmap/v2 v2.2.0 h1:7/2iwO98kYT4XkOjA9mBEIwvi4KpGB4cyHeOFOnj4Vk= github.com/elliotchance/orderedmap/v2 v2.2.0/go.mod h1:85lZyVbpGaGvHvnKa7Qhx7zncAdBIBq6u56Hb1PRU5Q= +github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= +github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= +github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= +github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/florianl/go-tc v0.4.3 h1:xpobG2gFNvEqbclU07zjddALSjqTQTWJkxg5/kRYDpw= github.com/florianl/go-tc v0.4.3/go.mod h1:uvp6pIlOw7Z8hhfnT5M4+V1hHVgZWRZwwMS8Z0JsRxc= github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= @@ -76,6 +88,9 @@ github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfC github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/google/cel-go v0.20.1 h1:nDx9r8S3L4pE61eDdt8igGj8rf5kjYR3ILxWIpWNi84= +github.com/google/cel-go v0.20.1/go.mod h1:kWcIzTsPX0zmQ+H3TirHstLLf9ep5QTsZBN9u4dOYLg= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -101,10 +116,10 @@ github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aN github.com/gorilla/websocket v1.5.3/go.mod 
h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= -github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI= -github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= -github.com/hashicorp/go-retryablehttp v0.7.5 h1:bJj+Pj19UZMIweq/iie+1u5YCdGrnxCT9yvm0e+Nd5M= -github.com/hashicorp/go-retryablehttp v0.7.5/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8= +github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k= +github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= +github.com/hashicorp/go-retryablehttp v0.7.7 h1:C8hUCYzor8PIfXHa4UrZkU4VvK8o9ISHxT2Q8+VepXU= +github.com/hashicorp/go-retryablehttp v0.7.7/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk= github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= @@ -131,8 +146,8 @@ github.com/jxskiss/base62 v1.1.0 h1:A5zbF8v8WXx2xixnAKD2w+abC+sIzYJX+nxmhA6HWFw= github.com/jxskiss/base62 v1.1.0/go.mod h1:HhWAlUXvxKThfOlZbcuFzsqwtF5TcqS9ru3y5GfjWAc= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= -github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= 
github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc= github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -150,16 +165,20 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= -github.com/livekit/mediatransportutil v0.0.0-20240613015318-84b69facfb75 h1:p60OjeixzXnhGFQL8wmdUwWPxijEDe9ZJFMosq+byec= -github.com/livekit/mediatransportutil v0.0.0-20240613015318-84b69facfb75/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.18.0 h1:LLOjKBA8rtnGpVGjAmKUROy7bv/l9q1wyn9hNmj8Sdg= -github.com/livekit/protocol v1.18.0/go.mod h1:cN8WmGQR+kWz1+UWcAQdFFUcbW76PnfZDdkLAbYIqd4= -github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5 h1:mTZyrjk5WEWMsvaYtJ42pG7DuxysKj21DKPINpGSIto= -github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= +github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7 h1:F1L8inJoynwIAYpZENNYS+1xHJMF5RFRorsnAlcxfSY= +github.com/livekit/mediatransportutil v0.0.0-20240625074155-301bb4a816b7/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= +github.com/livekit/protocol v1.19.2-0.20240705134535-94a2cfe2f1ee h1:J1U5fqAB5wJ4+Dl/DAf43Eiw+syyLTKAJoGuUj3rjQI= +github.com/livekit/protocol v1.19.2-0.20240705134535-94a2cfe2f1ee/go.mod h1:bNjJi+8frdvC84xG0CJ/7VfVvqerLg2MzjOks0ucyC4= +github.com/livekit/psrpc v0.5.3-0.20240616012458-ac39c8549a0a h1:EQAHmcYEGlc6V517cQ3Iy0+jHgP6+tM/B4l2vGuLpQo= +github.com/livekit/psrpc 
v0.5.3-0.20240616012458-ac39c8549a0a/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2GXh+o= github.com/mackerelio/go-osstat v0.2.5/go.mod h1:atxwWF+POUZcdtR1wnsUcQxTytoHG4uhl2AKKzrOajY= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/maxbrunsfeld/counterfeiter/v6 v6.8.1 h1:NicmruxkeqHjDv03SfSxqmaLuisddudfP3h5wdXFbhM= @@ -191,8 +210,8 @@ github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3N github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= -github.com/nats-io/nats.go v1.35.0 h1:XFNqNM7v5B+MQMKqVGAyHwYhyKb48jrenXNxIU20ULk= -github.com/nats-io/nats.go v1.35.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU= +github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 
h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= @@ -231,8 +250,8 @@ github.com/pion/rtp v1.8.3/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU github.com/pion/rtp v1.8.6 h1:MTmn/b0aWWsAzux2AmP8WGllusBVw4NPYPVFFd7jUPw= github.com/pion/rtp v1.8.6/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU= github.com/pion/sctp v1.8.5/go.mod h1:SUFFfDpViyKejTAdwD1d/HQsCu+V/40cCs2nZIvC3s0= -github.com/pion/sctp v1.8.16 h1:PKrMs+o9EMLRvFfXq59WFsC+V8mN1wnKzqrv+3D/gYY= -github.com/pion/sctp v1.8.16/go.mod h1:P6PbDVA++OJMrVNg2AL3XtYHV4uD6dvfyOovCgMs0PE= +github.com/pion/sctp v1.8.17 h1:uK1VChU9C/J3Sj/Cq6cwEGtx2Gludgoi5zwZBZzWNT0= +github.com/pion/sctp v1.8.17/go.mod h1:P6PbDVA++OJMrVNg2AL3XtYHV4uD6dvfyOovCgMs0PE= github.com/pion/sdp/v3 v3.0.9 h1:pX++dCHoHUwq43kuwf3PyJfHlwIj4hXA7Vrifiq0IJY= github.com/pion/sdp/v3 v3.0.9/go.mod h1:B5xmvENq5IXJimIO4zfp6LAe1fD9N+kFv+V/1lOdz8M= github.com/pion/srtp/v2 v2.0.18 h1:vKpAXfawO9RtTRKZJbG4y0v1b11NZxQnxRl85kGuUlo= @@ -282,11 +301,12 @@ github.com/sclevine/spec v1.4.0 h1:z/Q9idDcay5m5irkZ28M7PtQM4aOISzOpj4bUPkDee8= github.com/sclevine/spec v1.4.0/go.mod h1:LvpgJaFyvQzRvc1kaDs0bulYwzC70PbiYjC4QnFHkOM= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stoewer/go-strcase v1.3.0 h1:g0eASXYtp+yvN9fK8sH94oCIk0fau9uV1/ZdJ0AVEzs= +github.com/stoewer/go-strcase v1.3.0/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8wodgtPmh1xo= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= -github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod 
h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -472,10 +492,14 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240521202816-d264139d666e h1:Elxv5MwEkCI9f5SkoL6afed6NTdxaGoAo39eANBwHL8= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240521202816-d264139d666e/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= +google.golang.org/genproto/googleapis/api v0.0.0-20240401170217-c3f982113cda h1:b6F6WIV4xHHD0FA4oIyzU6mHWg2WI2X1RBehwa5QN38= +google.golang.org/genproto/googleapis/api v0.0.0-20240401170217-c3f982113cda/go.mod h1:AHcE/gZH76Bk/ROZhQphlRoWo5xKDEtz3eVEO1LfA8c= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda h1:LI5DOvAxUPMv/50agcLLoo+AdWc1irS9Rzz4vPuD1V4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY= google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 
v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/agent/client.go b/pkg/agent/client.go index d2fdabf91..c82d6c3f5 100644 --- a/pkg/agent/client.go +++ b/pkg/agent/client.go @@ -20,15 +20,13 @@ import ( "time" "github.com/gammazero/workerpool" - "google.golang.org/protobuf/types/known/emptypb" - serverutils "github.com/livekit/livekit-server/pkg/utils" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" "github.com/livekit/protocol/utils" - "github.com/livekit/protocol/utils/guid" "github.com/livekit/psrpc" + "google.golang.org/protobuf/types/known/emptypb" ) const ( @@ -43,15 +41,17 @@ const ( type Client interface { // LaunchJob starts a room or participant job on an agent. // it will launch a job once for each worker in each namespace - LaunchJob(ctx context.Context, desc *JobDescription) + LaunchJob(ctx context.Context, desc *JobRequest) Stop() error } -type JobDescription struct { +type JobRequest struct { JobType livekit.JobType Room *livekit.Room // only set for participant jobs Participant *livekit.ParticipantInfo + Metadata string + Namespace string } type agentClient struct { @@ -105,52 +105,75 @@ func NewAgentClient(bus psrpc.MessageBus) (Client, error) { return c, nil } -func (c *agentClient) LaunchJob(ctx context.Context, desc *JobDescription) { - roomNamespaces, publisherNamespaces, needsRefresh := c.getOrCreateDispatchers() - - if needsRefresh { - go c.checkEnabled(ctx, roomNamespaces, publisherNamespaces) - } - - target := roomNamespaces +func (c *agentClient) LaunchJob(ctx context.Context, desc *JobRequest) { jobTypeTopic := RoomAgentTopic if desc.JobType == livekit.JobType_JT_PUBLISHER { - target = publisherNamespaces jobTypeTopic = PublisherAgentTopic } - target.ForEach(func(ns string) { - c.workers.Submit(func() { - _, err := c.client.JobRequest(ctx, ns, jobTypeTopic, &livekit.Job{ - Id: guid.New(utils.AgentJobPrefix), - Type: 
desc.JobType, - Room: desc.Room, - Participant: desc.Participant, - Namespace: ns, - }) - if err != nil { - logger.Errorw("failed to send job request", err, "namespace", ns, "jobType", jobTypeTopic) - } + if !c.isNamespaceActive(desc.Namespace, desc.JobType) { + logger.Infow("not dispatching agent job since no worker is available", "namespace", desc.Namespace, "jobType", desc.JobType) + return + } + + c.workers.Submit(func() { + _, err := c.client.JobRequest(context.Background(), desc.Namespace, jobTypeTopic, &livekit.Job{ + Id: utils.NewGuid(utils.AgentJobPrefix), + Type: desc.JobType, + Room: desc.Room, + Participant: desc.Participant, + Namespace: desc.Namespace, + Metadata: desc.Metadata, }) + if err != nil { + logger.Infow("failed to send job request", "error", err, "namespace", desc.Namespace, "jobType", desc.JobType) + } }) } -func (c *agentClient) getOrCreateDispatchers() (*serverutils.IncrementalDispatcher[string], *serverutils.IncrementalDispatcher[string], bool) { +func (c *agentClient) isNamespaceActive(ns string, jobType livekit.JobType) bool { c.mu.Lock() - defer c.mu.Unlock() if time.Since(c.enabledExpiresAt) > EnabledCacheTTL || c.roomNamespaces == nil || c.publisherNamespaces == nil { + c.enabledExpiresAt = time.Now() c.roomNamespaces = serverutils.NewIncrementalDispatcher[string]() c.publisherNamespaces = serverutils.NewIncrementalDispatcher[string]() - return c.roomNamespaces, c.publisherNamespaces, true + go c.checkEnabled(c.roomNamespaces, c.publisherNamespaces) } - return c.roomNamespaces, c.publisherNamespaces, false + + target := c.roomNamespaces + if jobType == livekit.JobType_JT_PUBLISHER { + target = c.publisherNamespaces + } + + c.mu.Unlock() + + done := make(chan bool, 1) + + go func() { + target.ForEach(func(curNs string) { + if curNs == ns { + select { + case done <- true: + default: + } + + return + } + }) + select { + case done <- false: + default: + } + }() + + return <-done } -func (c *agentClient) checkEnabled(ctx 
context.Context, roomNamespaces, publisherNamespaces *serverutils.IncrementalDispatcher[string]) { +func (c *agentClient) checkEnabled(roomNamespaces, publisherNamespaces *serverutils.IncrementalDispatcher[string]) { defer roomNamespaces.Done() defer publisherNamespaces.Done() - resChan, err := c.client.CheckEnabled(ctx, &rpc.CheckEnabledRequest{}, psrpc.WithRequestTimeout(CheckEnabledTimeout)) + resChan, err := c.client.CheckEnabled(context.Background(), &rpc.CheckEnabledRequest{}, psrpc.WithRequestTimeout(CheckEnabledTimeout)) if err != nil { logger.Errorw("failed to check enabled", err) return diff --git a/pkg/agent/job.go b/pkg/agent/job.go deleted file mode 100644 index 6edfe4a27..000000000 --- a/pkg/agent/job.go +++ /dev/null @@ -1,86 +0,0 @@ -// Copyright 2024 LiveKit, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package agent - -import ( - "sync" - - "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/logger" -) - -// Represents a job that is being executed by a worker -type Job struct { - id string - jobType livekit.JobType - status livekit.JobStatus - namespace string - - mu sync.Mutex - load float32 - - Logger logger.Logger -} - -func NewJob(id, namespace string, jobType livekit.JobType) *Job { - return &Job{ - id: id, - status: livekit.JobStatus_JS_UNKNOWN, - jobType: jobType, - namespace: namespace, - } -} - -func (j *Job) ID() string { - return j.id -} - -func (j *Job) Namespace() string { - return j.namespace -} - -func (j *Job) Type() livekit.JobType { - return j.jobType -} - -func (j *Job) WorkerLoad() float32 { - // Current load that this job is taking on its worker - j.mu.Lock() - defer j.mu.Unlock() - return j.load -} - -func (j *Job) UpdateStatus(req *livekit.UpdateJobStatus) { - j.mu.Lock() - - if req.Status != nil { - j.status = *req.Status // End of the job, SUCCESS or FAILURE - - if j.status == livekit.JobStatus_JS_FAILED { - j.Logger.Errorw("job failed", nil, "id", j.id, "type", j.jobType, "error", req.Error) - } - } - - j.load = req.Load - j.mu.Unlock() - - if req.Metadata != nil { - j.UpdateMetadata(req.GetMetadata()) - } -} - -func (j *Job) UpdateMetadata(metadata string) { - j.Logger.Debugw("job metadata", nil, "id", j.id, "metadata", metadata) -} diff --git a/pkg/agent/worker.go b/pkg/agent/worker.go index bff334988..a7d2a4bad 100644 --- a/pkg/agent/worker.go +++ b/pkg/agent/worker.go @@ -21,13 +21,9 @@ import ( "sync/atomic" "time" - "github.com/gorilla/websocket" - pagent "github.com/livekit/protocol/agent" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" - "github.com/livekit/protocol/utils" - putil "github.com/livekit/protocol/utils" "github.com/livekit/protocol/utils/guid" ) @@ -42,13 +38,40 @@ const ( ) var ( - ErrWorkerClosed = errors.New("worker closed") - ErrWorkerNotAvailable = 
errors.New("worker not available") - ErrAvailabilityTimeout = errors.New("agent worker availability timeout") + ErrWorkerClosed = errors.New("worker closed") + ErrWorkerNotAvailable = errors.New("worker not available") + ErrAvailabilityTimeout = errors.New("agent worker availability timeout") + ErrDuplicateJobAssignment = errors.New("duplicate job assignment") ) -type sigConn interface { +type SignalConn interface { WriteServerMessage(msg *livekit.ServerMessage) (int, error) + ReadWorkerMessage() (*livekit.WorkerMessage, int, error) + Close() error +} + +type WorkerHandler interface { + HandleWorkerRegister(w *Worker) + HandleWorkerDeregister(w *Worker) + HandleWorkerStatus(w *Worker, status *livekit.UpdateWorkerStatus) + HandleWorkerJobStatus(w *Worker, status *livekit.UpdateJobStatus) + HandleWorkerSimulateJob(w *Worker, job *livekit.Job) + HandleWorkerMigrateJob(w *Worker, request *livekit.MigrateJobRequest) +} + +var _ WorkerHandler = UnimplementedWorkerHandler{} + +type UnimplementedWorkerHandler struct{} + +func (UnimplementedWorkerHandler) HandleWorkerRegister(*Worker) {} +func (UnimplementedWorkerHandler) HandleWorkerDeregister(*Worker) {} +func (UnimplementedWorkerHandler) HandleWorkerStatus(*Worker, *livekit.UpdateWorkerStatus) {} +func (UnimplementedWorkerHandler) HandleWorkerJobStatus(*Worker, *livekit.UpdateJobStatus) {} +func (UnimplementedWorkerHandler) HandleWorkerSimulateJob(*Worker, *livekit.Job) {} +func (UnimplementedWorkerHandler) HandleWorkerMigrateJob(*Worker, *livekit.MigrateJobRequest) {} + +func JobStatusIsEnded(s livekit.JobStatus) bool { + return s == livekit.JobStatus_JS_SUCCESS || s == livekit.JobStatus_JS_FAILED } type Worker struct { @@ -67,20 +90,19 @@ type Worker struct { protocolVersion WorkerProtocolVersion registered atomic.Bool status livekit.WorkerStatus - runningJobs map[string]*Job + runningJobs map[string]*livekit.Job - onWorkerRegistered func(w *Worker) + handler WorkerHandler - conn *websocket.Conn - sigConn sigConn - 
closed chan struct{} + conn SignalConn + closed chan struct{} availability map[string]chan *livekit.AvailabilityResponse ctx context.Context cancel context.CancelFunc - Logger logger.Logger + logger logger.Logger } func NewWorker( @@ -88,42 +110,42 @@ func NewWorker( apiKey string, apiSecret string, serverInfo *livekit.ServerInfo, - conn *websocket.Conn, - sigConn sigConn, + conn SignalConn, logger logger.Logger, + handler WorkerHandler, ) *Worker { ctx, cancel := context.WithCancel(context.Background()) + id := guid.New(guid.AgentWorkerPrefix) w := &Worker{ - id: putil.NewGuid(utils.AgentWorkerPrefix), + id: id, protocolVersion: protocolVersion, apiKey: apiKey, apiSecret: apiSecret, serverInfo: serverInfo, closed: make(chan struct{}), - runningJobs: make(map[string]*Job), + runningJobs: make(map[string]*livekit.Job), availability: make(map[string]chan *livekit.AvailabilityResponse), conn: conn, - sigConn: sigConn, ctx: ctx, cancel: cancel, - Logger: logger, + logger: logger.WithValues("workerID", id), + handler: handler, } - go func() { - <-time.After(registerTimeout) + time.AfterFunc(registerTimeout, func() { if !w.registered.Load() && !w.IsClosed() { - w.Logger.Warnw("worker did not register in time", nil, "id", w.id) + w.logger.Warnw("worker did not register in time", nil, "id", w.id) w.Close() } - }() + }) return w } func (w *Worker) sendRequest(req *livekit.ServerMessage) { - if _, err := w.sigConn.WriteServerMessage(req); err != nil { - w.Logger.Errorw("error writing to websocket", err) + if _, err := w.conn.WriteServerMessage(req); err != nil { + w.logger.Errorw("error writing to websocket", err) } } @@ -132,14 +154,10 @@ func (w *Worker) ID() string { } func (w *Worker) JobType() livekit.JobType { - w.mu.Lock() - defer w.mu.Unlock() return w.jobType } func (w *Worker) Namespace() string { - w.mu.Lock() - defer w.mu.Unlock() return w.namespace } @@ -155,20 +173,14 @@ func (w *Worker) Load() float32 { return w.load } -func (w *Worker) OnWorkerRegistered(f 
func(w *Worker)) { - w.mu.Lock() - defer w.mu.Unlock() - w.onWorkerRegistered = f +func (w *Worker) Logger() logger.Logger { + return w.logger } -func (w *Worker) Registered() bool { - return w.registered.Load() -} - -func (w *Worker) RunningJobs() map[string]*Job { - jobs := make(map[string]*Job, len(w.runningJobs)) +func (w *Worker) RunningJobs() map[string]*livekit.Job { w.mu.Lock() defer w.mu.Unlock() + jobs := make(map[string]*livekit.Job, len(w.runningJobs)) for k, v := range w.runningJobs { jobs[k] = v } @@ -179,13 +191,31 @@ func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) error { availCh := make(chan *livekit.AvailabilityResponse, 1) w.mu.Lock() + if _, ok := w.availability[job.Id]; ok { + w.mu.Unlock() + return ErrDuplicateJobAssignment + } + w.availability[job.Id] = availCh w.mu.Unlock() + defer func() { + w.mu.Lock() + delete(w.availability, job.Id) + w.mu.Unlock() + }() + + if job.State == nil { + job.State = &livekit.JobState{} + } + w.sendRequest(&livekit.ServerMessage{Message: &livekit.ServerMessage_Availability{ Availability: &livekit.AvailabilityRequest{Job: job}, }}) + timeout := time.NewTimer(assignJobTimeout) + defer timeout.Stop() + // See handleAvailability for the response select { case res := <-availCh: @@ -195,7 +225,7 @@ func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) error { token, err := pagent.BuildAgentToken(w.apiKey, w.apiSecret, job.Room.Name, res.ParticipantIdentity, res.ParticipantName, res.ParticipantMetadata, w.permissions) if err != nil { - w.Logger.Errorw("failed to build agent token", err) + w.logger.Errorw("failed to build agent token", err) return err } @@ -204,9 +234,14 @@ func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) error { Assignment: &livekit.JobAssignment{Job: job, Url: nil, Token: token}, }}) - // TODO(theomonnom): Check if an agent was successfully connected to the room before returning + w.mu.Lock() + w.runningJobs[job.Id] = job + w.mu.Unlock() + + // TODO 
sweep jobs that are never started. We can't do this until all SDKs actually update the JOB state + return nil - case <-time.After(assignJobTimeout): + case <-timeout.C: return ErrAvailabilityTimeout case <-w.ctx.Done(): return ErrWorkerClosed @@ -215,21 +250,8 @@ func (w *Worker) AssignJob(ctx context.Context, job *livekit.Job) error { } } -func (w *Worker) UpdateStatus(status *livekit.UpdateWorkerStatus) { - w.mu.Lock() - if status.Status != nil { - w.status = status.GetStatus() - } - w.load = status.GetLoad() - w.mu.Unlock() - - if status.Metadata != nil { - w.UpdateMetadata(status.GetMetadata()) - } -} - func (w *Worker) UpdateMetadata(metadata string) { - w.Logger.Debugw("worker metadata updated", nil, "metadata", metadata) + w.logger.Debugw("worker metadata updated", nil, "metadata", metadata) } func (w *Worker) IsClosed() bool { @@ -248,45 +270,56 @@ func (w *Worker) Close() { return } - w.Logger.Infow("closing worker") + w.logger.Infow("closing worker") close(w.closed) w.cancel() _ = w.conn.Close() w.mu.Unlock() + + if w.registered.Load() { + w.handler.HandleWorkerDeregister(w) + } } func (w *Worker) HandleMessage(req *livekit.WorkerMessage) { switch m := req.Message.(type) { case *livekit.WorkerMessage_Register: - go w.handleRegister(m.Register) + w.handleRegister(m.Register) case *livekit.WorkerMessage_Availability: - go w.handleAvailability(m.Availability) + w.handleAvailability(m.Availability) case *livekit.WorkerMessage_UpdateJob: - go w.handleJobUpdate(m.UpdateJob) + w.handleJobUpdate(m.UpdateJob) case *livekit.WorkerMessage_SimulateJob: - go w.handleSimulateJob(m.SimulateJob) + w.handleSimulateJob(m.SimulateJob) case *livekit.WorkerMessage_Ping: - go w.handleWorkerPing(m.Ping) + w.handleWorkerPing(m.Ping) case *livekit.WorkerMessage_UpdateWorker: - go w.handleWorkerStatus(m.UpdateWorker) + w.handleWorkerStatus(m.UpdateWorker) case *livekit.WorkerMessage_MigrateJob: - go w.handleMigrateJob(m.MigrateJob) + w.handleMigrateJob(m.MigrateJob) } } func 
(w *Worker) handleRegister(req *livekit.RegisterWorkerRequest) { - if w.registered.Load() { - w.Logger.Warnw("worker already registered", nil, "id", w.id) + w.mu.Lock() + var err error + if w.IsClosed() { + err = errors.New("worker closed") + } + if w.registered.Swap(true) { + err = errors.New("worker already registered") + } + if err != nil { + w.mu.Unlock() + w.logger.Warnw("unable to register worker", err, "id", w.id) return } - w.mu.Lock() - onWorkerRegistered := w.onWorkerRegistered - w.jobType = req.Type w.version = req.Version w.name = req.Name w.namespace = req.GetNamespace() + w.jobType = req.GetType() if req.AllowedPermissions != nil { w.permissions = req.AllowedPermissions @@ -301,10 +334,9 @@ func (w *Worker) handleRegister(req *livekit.RegisterWorkerRequest) { } w.status = livekit.WorkerStatus_WS_AVAILABLE - w.registered.Store(true) w.mu.Unlock() - w.Logger.Debugw("worker registered", "request", req) + w.logger.Debugw("worker registered", "request", logger.Proto(req)) w.sendRequest(&livekit.ServerMessage{ Message: &livekit.ServerMessage_Register{ @@ -315,9 +347,7 @@ func (w *Worker) handleRegister(req *livekit.RegisterWorkerRequest) { }, }) - if onWorkerRegistered != nil { - onWorkerRegistered(w) - } + w.handler.HandleWorkerRegister(w) } func (w *Worker) handleAvailability(res *livekit.AvailabilityResponse) { @@ -326,7 +356,7 @@ func (w *Worker) handleAvailability(res *livekit.AvailabilityResponse) { availCh, ok := w.availability[res.JobId] if !ok { - w.Logger.Warnw("received availability response for unknown job", nil, "jobId", res.JobId) + w.logger.Warnw("received availability response for unknown job", nil, "jobId", res.JobId) return } @@ -336,15 +366,34 @@ func (w *Worker) handleAvailability(res *livekit.AvailabilityResponse) { func (w *Worker) handleJobUpdate(update *livekit.UpdateJobStatus) { w.mu.Lock() - job, ok := w.runningJobs[update.JobId] - w.mu.Unlock() + job, ok := w.runningJobs[update.JobId] if !ok { - w.Logger.Warnw("received job update 
for unknown job", nil, "jobId", update.JobId) + w.logger.Infow("received job update for unknown job", "jobId", update.JobId) return } - job.UpdateStatus(update) + now := time.Now() + job.State.UpdatedAt = now.UnixNano() + + if job.State.Status == livekit.JobStatus_JS_PENDING && JobStatusIsEnded(update.Status) { + job.State.StartedAt = now.UnixNano() + } + + if job.State.Status < livekit.JobStatus_JS_SUCCESS && JobStatusIsEnded(update.Status) { + job.State.EndedAt = now.UnixNano() + } + + job.State.Status = update.Status + job.State.Error = update.Error + + // TODO do not delete, leave inside the JobDefinition + if JobStatusIsEnded(job.State.Status) { + delete(w.runningJobs, job.Id) + } + w.mu.Unlock() + + w.handler.HandleWorkerJobStatus(w, update) } func (w *Worker) handleSimulateJob(simulate *livekit.SimulateJobRequest) { @@ -354,19 +403,21 @@ func (w *Worker) handleSimulateJob(simulate *livekit.SimulateJobRequest) { } job := &livekit.Job{ - Id: guid.New(utils.AgentJobPrefix), + Id: guid.New(guid.AgentJobPrefix), Type: jobType, Room: simulate.Room, Participant: simulate.Participant, Namespace: w.Namespace(), } - ctx := context.Background() - err := w.AssignJob(ctx, job) - if err != nil { - w.Logger.Errorw("failed to simulate job, assignment failed", err, "jobId", job.Id) - } - + go func() { + err := w.AssignJob(w.ctx, job) + if err != nil { + w.logger.Errorw("failed to simulate job, assignment failed", err, "jobId", job.Id) + } else { + w.handler.HandleWorkerSimulateJob(w, job) + } + }() } func (w *Worker) handleWorkerPing(ping *livekit.WorkerPing) { @@ -379,11 +430,20 @@ func (w *Worker) handleWorkerPing(ping *livekit.WorkerPing) { } func (w *Worker) handleWorkerStatus(update *livekit.UpdateWorkerStatus) { - w.Logger.Debugw("worker status update", "status", update.Status, "load", update.Load) - w.UpdateStatus(update) + w.logger.Debugw("worker status update", "update", logger.Proto(update)) + + w.mu.Lock() + if update.Status != nil { + w.status = 
update.GetStatus() + } + w.load = update.GetLoad() + w.mu.Unlock() + + w.handler.HandleWorkerStatus(w, update) } func (w *Worker) handleMigrateJob(migrate *livekit.MigrateJobRequest) { // TODO(theomonnom): On OSS this is not implemented // We could maybe just move a specific job to another worker + w.handler.HandleWorkerMigrateJob(w, migrate) } diff --git a/pkg/config/config.go b/pkg/config/config.go index 306580414..3f9dd31aa 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -28,6 +28,7 @@ import ( "gopkg.in/yaml.v3" "github.com/livekit/mediatransportutil/pkg/rtcconfig" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" redisLiveKit "github.com/livekit/protocol/redis" "github.com/livekit/protocol/rpc" @@ -58,9 +59,11 @@ var ( ) type Config struct { - Port uint32 `yaml:"port,omitempty"` - BindAddresses []string `yaml:"bind_addresses,omitempty"` + Port uint32 `yaml:"port,omitempty"` + BindAddresses []string `yaml:"bind_addresses,omitempty"` + // PrometheusPort is deprecated PrometheusPort uint32 `yaml:"prometheus_port,omitempty"` + Prometheus PrometheusConfig `yaml:"prometheus,omitempty"` RTC RTCConfig `yaml:"rtc,omitempty"` Redis redisLiveKit.RedisConfig `yaml:"redis,omitempty"` Audio AudioConfig `yaml:"audio,omitempty"` @@ -244,7 +247,28 @@ type RoomConfig struct { // deprecated, moved to limits MaxRoomNameLength int `yaml:"max_room_name_length,omitempty"` // deprecated, moved to limits - MaxParticipantIdentityLength int `yaml:"max_participant_identity_length,omitempty"` + MaxParticipantIdentityLength int `yaml:"max_participant_identity_length,omitempty"` + RoomConfigurations map[string]RoomConfiguration `yaml:"room_configurations,omitempty"` +} + +type RoomConfiguration struct { + Name string `yaml:"name,omitempty"` // Used as ID, must be unique + // number of seconds to keep the room open if no one joins + EmptyTimeout uint32 `yaml:"empty_timeout,omitempty"` + // number of seconds to keep the room open after everyone 
leaves + DepartureTimeout uint32 `yaml:"departure_timeout,omitempty"` + // limit number of participants that can be in a room + MaxParticipants uint32 `yaml:"max_participants,omitempty"` + // egress + Egress *livekit.RoomEgress `yaml:"egress,omitempty"` + // agent + Agent *livekit.RoomAgent `yaml:"agent,omitempty"` + // playout delay of subscriber + MinPlayoutDelay uint32 `yaml:"min_playout_delay,omitempty"` + MaxPlayoutDelay uint32 `yaml:"max_playout_delay,omitempty"` + // improves A/V sync when playout_delay set to a value larger than 200ms. It will disable transceiver re-use + // so not recommended for rooms with frequent subscription changes + SyncStreams bool `yaml:"sync_streams"` } type CodecSpec struct { @@ -328,6 +352,12 @@ type APIConfig struct { MaxCheckInterval time.Duration `yaml:"max_check_interval,omitempty"` } +type PrometheusConfig struct { + Port uint32 `yaml:"port,omitempty"` + Username string `yaml:"username,omitempty"` + Password string `yaml:"password,omitempty"` +} + type ForwardStatsConfig struct { SummaryInterval time.Duration `yaml:"summary_interval,omitempty"` ReportInterval time.Duration `yaml:"report_interval,omitempty"` diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 833266de6..cf5fb23fc 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -60,7 +60,7 @@ func TestGeneratedFlags(t *testing.T) { set := flag.NewFlagSet("test", 0) set.Bool("rtc.use_ice_lite", true, "") // bool set.String("redis.address", "localhost:6379", "") // string - set.Uint("prometheus_port", 9999, "") // uint32 + set.Uint("prometheus.port", 9999, "") // uint32 set.Bool("rtc.allow_tcp_fallback", true, "") // pointer set.Bool("rtc.reconnect_on_publication_error", true, "") // pointer set.Bool("rtc.reconnect_on_subscription_error", false, "") // pointer @@ -71,7 +71,7 @@ func TestGeneratedFlags(t *testing.T) { require.True(t, conf.RTC.UseICELite) require.Equal(t, "localhost:6379", conf.Redis.Address) - require.Equal(t, 
uint32(9999), conf.PrometheusPort) + require.Equal(t, uint32(9999), conf.Prometheus.Port) require.NotNil(t, conf.RTC.AllowTCPFallback) require.True(t, *conf.RTC.AllowTCPFallback) diff --git a/pkg/rtc/mediatrack.go b/pkg/rtc/mediatrack.go index 85368df95..ec5030ca3 100644 --- a/pkg/rtc/mediatrack.go +++ b/pkg/rtc/mediatrack.go @@ -40,9 +40,10 @@ import ( // MediaTrack represents a WebRTC track that needs to be forwarded // Implements MediaTrack and PublishedTrack interface type MediaTrack struct { - params MediaTrackParams - numUpTracks atomic.Uint32 - buffer *buffer.Buffer + params MediaTrackParams + numUpTracks atomic.Uint32 + buffer *buffer.Buffer + everSubscribed atomic.Bool *MediaTrackReceiver *MediaLossProxy @@ -55,22 +56,23 @@ type MediaTrack struct { } type MediaTrackParams struct { - SignalCid string - SdpCid string - ParticipantID livekit.ParticipantID - ParticipantIdentity livekit.ParticipantIdentity - ParticipantVersion uint32 - BufferFactory *buffer.Factory - ReceiverConfig ReceiverConfig - SubscriberConfig DirectionConfig - PLIThrottleConfig config.PLIThrottleConfig - AudioConfig config.AudioConfig - VideoConfig config.VideoConfig - Telemetry telemetry.TelemetryService - Logger logger.Logger - SimTracks map[uint32]SimulcastTrackInfo - OnRTCP func([]rtcp.Packet) - ForwardStats *sfu.ForwardStats + SignalCid string + SdpCid string + ParticipantID livekit.ParticipantID + ParticipantIdentity livekit.ParticipantIdentity + ParticipantVersion uint32 + BufferFactory *buffer.Factory + ReceiverConfig ReceiverConfig + SubscriberConfig DirectionConfig + PLIThrottleConfig config.PLIThrottleConfig + AudioConfig config.AudioConfig + VideoConfig config.VideoConfig + Telemetry telemetry.TelemetryService + Logger logger.Logger + SimTracks map[uint32]SimulcastTrackInfo + OnRTCP func([]rtcp.Packet) + ForwardStats *sfu.ForwardStats + OnTrackEverSubscribed func(livekit.TrackID) } func NewMediaTrack(params MediaTrackParams, ti *livekit.TrackInfo) *MediaTrack { @@ -283,6 
+285,7 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra sfu.WithLoadBalanceThreshold(20), sfu.WithStreamTrackers(), sfu.WithForwardStats(t.params.ForwardStats), + sfu.WithEverHasDowntrackAdded(t.handleReceiverEverAddDowntrack), ) newWR.OnCloseHandler(func() { t.MediaTrackReceiver.SetClosing() @@ -430,3 +433,9 @@ func (t *MediaTrack) SetMuted(muted bool) { t.MediaTrackReceiver.SetMuted(muted) } + +func (t *MediaTrack) handleReceiverEverAddDowntrack() { + if !t.everSubscribed.Swap(true) && t.params.OnTrackEverSubscribed != nil { + go t.params.OnTrackEverSubscribed(t.ID()) + } +} diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 95636b1d9..1d3824033 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -461,6 +461,9 @@ func (p *ParticipantImpl) SetMetadata(metadata string) { } func (p *ParticipantImpl) SetAttributes(attrs map[string]string) error { + if len(attrs) == 0 { + return nil + } p.lock.Lock() grants := p.grants.Load().Clone() if grants.Attributes == nil { @@ -491,6 +494,7 @@ func (p *ParticipantImpl) SetAttributes(attrs map[string]string) error { } p.grants.Store(grants) + p.requireBroadcast = true // already checked above p.dirty.Store(true) onParticipantUpdate := p.onParticipantUpdate @@ -2136,22 +2140,23 @@ func (p *ParticipantImpl) addMigratedTrack(cid string, ti *livekit.TrackInfo) *M func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *livekit.TrackInfo) *MediaTrack { mt := NewMediaTrack(MediaTrackParams{ - SignalCid: signalCid, - SdpCid: sdpCid, - ParticipantID: p.params.SID, - ParticipantIdentity: p.params.Identity, - ParticipantVersion: p.version.Load(), - BufferFactory: p.params.Config.BufferFactory, - ReceiverConfig: p.params.Config.Receiver, - AudioConfig: p.params.AudioConfig, - VideoConfig: p.params.VideoConfig, - Telemetry: p.params.Telemetry, - Logger: LoggerWithTrack(p.pubLogger, livekit.TrackID(ti.Sid), false), - SubscriberConfig: 
p.params.Config.Subscriber, - PLIThrottleConfig: p.params.PLIThrottleConfig, - SimTracks: p.params.SimTracks, - OnRTCP: p.postRtcp, - ForwardStats: p.params.ForwardStats, + SignalCid: signalCid, + SdpCid: sdpCid, + ParticipantID: p.params.SID, + ParticipantIdentity: p.params.Identity, + ParticipantVersion: p.version.Load(), + BufferFactory: p.params.Config.BufferFactory, + ReceiverConfig: p.params.Config.Receiver, + AudioConfig: p.params.AudioConfig, + VideoConfig: p.params.VideoConfig, + Telemetry: p.params.Telemetry, + Logger: LoggerWithTrack(p.pubLogger, livekit.TrackID(ti.Sid), false), + SubscriberConfig: p.params.Config.Subscriber, + PLIThrottleConfig: p.params.PLIThrottleConfig, + SimTracks: p.params.SimTracks, + OnRTCP: p.postRtcp, + ForwardStats: p.params.ForwardStats, + OnTrackEverSubscribed: p.sendTrackHasBeenSubscribed, }, ti) mt.OnSubscribedMaxQualityChange(p.onSubscribedMaxQualityChange) diff --git a/pkg/rtc/participant_signal.go b/pkg/rtc/participant_signal.go index 6f84eadd9..35dc7148d 100644 --- a/pkg/rtc/participant_signal.go +++ b/pkg/rtc/participant_signal.go @@ -286,6 +286,17 @@ func (p *ParticipantImpl) sendTrackUnpublished(trackID livekit.TrackID) { }) } +func (p *ParticipantImpl) sendTrackHasBeenSubscribed(trackID livekit.TrackID) { + _ = p.writeMessage(&livekit.SignalResponse{ + Message: &livekit.SignalResponse_TrackSubscribed{ + TrackSubscribed: &livekit.TrackSubscribed{ + TrackSid: string(trackID), + }, + }, + }) + p.params.Logger.Debugw("track has been subscribed", "trackID", trackID) +} + func (p *ParticipantImpl) writeMessage(msg *livekit.SignalResponse) error { if p.IsDisconnected() || (!p.IsReady() && msg.GetJoin() == nil) { return nil diff --git a/pkg/rtc/room.go b/pkg/rtc/room.go index 9a5ecfc7b..6d1f28b69 100644 --- a/pkg/rtc/room.go +++ b/pkg/rtc/room.go @@ -1014,7 +1014,7 @@ func (r *Room) onTrackPublished(participant types.LocalParticipant, track types. 
r.lock.Unlock() if !hasPublished { - r.launchPublisherAgent(participant) + r.launchPublisherAgents(participant) if r.internal != nil && r.internal.ParticipantEgress != nil { go func() { if err := StartParticipantEgress( @@ -1423,16 +1423,44 @@ func (r *Room) simulationCleanupWorker() { } } -func (r *Room) launchPublisherAgent(p types.Participant) { +func (r *Room) launchPublisherAgents(p types.Participant) { if p == nil || p.IsDependent() || r.agentClient == nil { return } - go r.agentClient.LaunchJob(context.Background(), &agent.JobDescription{ - JobType: livekit.JobType_JT_PUBLISHER, - Room: r.ToProto(), - Participant: p.ToProto(), - }) + if r.internal == nil { + return + } + + for _, ag := range r.internal.Agents { + if ag.Type != livekit.JobType_JT_PUBLISHER { + continue + } + + var startAgent bool + + if len(ag.ParticipantIdentity) == 0 { + // If no participant given, start for all participants + startAgent = true + } else { + for _, pi := range ag.ParticipantIdentity { + if pi == string(p.Identity()) { + startAgent = true + break + } + } + } + + if startAgent { + go r.agentClient.LaunchJob(context.Background(), &agent.JobRequest{ + JobType: livekit.JobType_JT_PUBLISHER, + Room: r.ToProto(), + Participant: p.ToProto(), + Metadata: ag.Metadata, + Namespace: ag.Namespace, + }) + } + } } func (r *Room) DebugInfo() map[string]interface{} { @@ -1530,6 +1558,9 @@ func connectionDetailsFields(cds []*types.ICEConnectionDetails) []interface{} { } else if c.Filtered { cStr += "[filtered]" } + if c.Trickle { + cStr += "[trickle]" + } cStr += " " + c.Local.String() candidates = append(candidates, cStr) } @@ -1540,6 +1571,9 @@ func connectionDetailsFields(cds []*types.ICEConnectionDetails) []interface{} { } else if c.Filtered { cStr += "[filtered]" } + if c.Trickle { + cStr += "[trickle]" + } cStr += " " + c.Remote.String() candidates = append(candidates, cStr) } diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index 8e062f6d9..146861937 100644 --- 
a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -1366,7 +1366,7 @@ func (t *PCTransport) handleLocalICECandidate(e event) error { t.params.Logger.Debugw("filtering out local candidate", "candidate", c.String()) filtered = true } - t.connectionDetails.AddLocalCandidate(c, filtered) + t.connectionDetails.AddLocalCandidate(c, filtered, true) } if filtered { @@ -1395,7 +1395,7 @@ func (t *PCTransport) handleRemoteICECandidate(e event) error { filtered = true } - t.connectionDetails.AddRemoteCandidate(*c, filtered) + t.connectionDetails.AddRemoteCandidate(*c, filtered, true) if filtered { return nil } @@ -1422,7 +1422,7 @@ func (t *PCTransport) setNegotiationState(state transport.NegotiationState) { } } -func (t *PCTransport) filterCandidates(sd webrtc.SessionDescription, preferTCP bool) webrtc.SessionDescription { +func (t *PCTransport) filterCandidates(sd webrtc.SessionDescription, preferTCP, isLocal bool) webrtc.SessionDescription { parsed, err := sd.Unmarshal() if err != nil { t.params.Logger.Warnw("could not unmarshal SDP to filter candidates", err) @@ -1432,13 +1432,22 @@ func (t *PCTransport) filterCandidates(sd webrtc.SessionDescription, preferTCP b filterAttributes := func(attrs []sdp.Attribute) []sdp.Attribute { filteredAttrs := make([]sdp.Attribute, 0, len(attrs)) for _, a := range attrs { - if a.Key == sdp.AttrKeyCandidate { - if preferTCP { - if strings.Contains(a.Value, "tcp") { - filteredAttrs = append(filteredAttrs, a) - } - } else { + if a.IsICECandidate() { + c, err := ice.UnmarshalCandidate(a.Value) + if err != nil { + t.params.Logger.Errorw("failed to unmarshal candidate in sdp", err, "isLocal", isLocal, "sdp", sd.SDP) filteredAttrs = append(filteredAttrs, a) + continue + } + excluded := preferTCP && !c.NetworkType().IsTCP() + if !excluded { + filteredAttrs = append(filteredAttrs, a) + } + + if isLocal { + t.connectionDetails.AddLocalICECandidate(c, excluded, false) + } else { + t.connectionDetails.AddRemoteICECandidate(c, excluded, false) } } 
else { filteredAttrs = append(filteredAttrs, a) @@ -1566,7 +1575,7 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error { // Filtered offer is sent to remote so that remote does not // see filtered candidates. // - offer = t.filterCandidates(offer, preferTCP) + offer = t.filterCandidates(offer, preferTCP, true) if preferTCP { t.params.Logger.Debugw("local offer (filtered)", "sdp", offer.SDP) } @@ -1616,7 +1625,7 @@ func (t *PCTransport) setRemoteDescription(sd webrtc.SessionDescription) error { if preferTCP { t.params.Logger.Debugw("remote description (unfiltered)", "type", sd.Type, "sdp", sd.SDP) } - sd = t.filterCandidates(sd, preferTCP) + sd = t.filterCandidates(sd, preferTCP, false) if preferTCP { t.params.Logger.Debugw("remote description (filtered)", "type", sd.Type, "sdp", sd.SDP) } @@ -1683,7 +1692,7 @@ func (t *PCTransport) createAndSendAnswer() error { // Filtered answer is sent to remote so that remote does not // see filtered candidates. // - answer = t.filterCandidates(answer, preferTCP) + answer = t.filterCandidates(answer, preferTCP, true) if preferTCP { t.params.Logger.Debugw("local answer (filtered)", "sdp", answer.SDP) } diff --git a/pkg/rtc/transport_test.go b/pkg/rtc/transport_test.go index 7bd1067b3..ab5cd5a88 100644 --- a/pkg/rtc/transport_test.go +++ b/pkg/rtc/transport_test.go @@ -416,7 +416,7 @@ func TestFilteringCandidates(t *testing.T) { // should not filter out UDP candidates if TCP is not preferred offer = *transport.pc.LocalDescription() - filteredOffer := transport.filterCandidates(offer, false) + filteredOffer := transport.filterCandidates(offer, false, true) require.EqualValues(t, offer.SDP, filteredOffer.SDP) parsed, err := offer.Unmarshal() @@ -494,7 +494,7 @@ func TestFilteringCandidates(t *testing.T) { require.Equal(t, 2, tcp) transport.SetPreferTCP(true) - filteredOffer = transport.filterCandidates(offer, true) + filteredOffer = transport.filterCandidates(offer, true, true) parsed, err = 
filteredOffer.Unmarshal() require.NoError(t, err) udp, tcp = getNumTransportTypeCandidates(parsed) diff --git a/pkg/rtc/types/ice.go b/pkg/rtc/types/ice.go index f628c4a30..a659e6d03 100644 --- a/pkg/rtc/types/ice.go +++ b/pkg/rtc/types/ice.go @@ -17,6 +17,7 @@ package types import ( + "fmt" "strings" "sync" @@ -43,6 +44,7 @@ type ICECandidateExtended struct { Remote ice.Candidate Selected bool Filtered bool + Trickle bool } type ICEConnectionDetails struct { @@ -85,6 +87,7 @@ func (d *ICEConnectionDetails) Clone() *ICEConnectionDetails { Local: c.Local, Filtered: c.Filtered, Selected: c.Selected, + Trickle: c.Trickle, }) } for _, c := range d.Remote { @@ -92,32 +95,48 @@ func (d *ICEConnectionDetails) Clone() *ICEConnectionDetails { Remote: c.Remote, Filtered: c.Filtered, Selected: c.Selected, + Trickle: c.Trickle, }) } return clone } -func (d *ICEConnectionDetails) AddLocalCandidate(c *webrtc.ICECandidate, filtered bool) { +func (d *ICEConnectionDetails) AddLocalCandidate(c *webrtc.ICECandidate, filtered, trickle bool) { d.lock.Lock() defer d.lock.Unlock() compFn := func(e *ICECandidateExtended) bool { return isCandidateEqualTo(e.Local, c) } - if slices.ContainsFunc[[]*ICECandidateExtended, *ICECandidateExtended](d.Local, compFn) { + if slices.ContainsFunc(d.Local, compFn) { return } d.Local = append(d.Local, &ICECandidateExtended{ Local: c, Filtered: filtered, + Trickle: trickle, }) } -func (d *ICEConnectionDetails) AddRemoteCandidate(c webrtc.ICECandidateInit, filtered bool) { +func (d *ICEConnectionDetails) AddLocalICECandidate(c ice.Candidate, filtered, trickle bool) { + candidate, err := unmarshalCandidate(c) + if err != nil { + d.logger.Errorw("could not unmarshal ice candidate", err, "candidate", c) + return + } + + d.AddLocalCandidate(candidate, filtered, trickle) +} + +func (d *ICEConnectionDetails) AddRemoteCandidate(c webrtc.ICECandidateInit, filtered, trickle bool) { candidate, err := unmarshalICECandidate(c) if err != nil { d.logger.Errorw("could not 
unmarshal candidate", err, "candidate", c) return } + d.AddRemoteICECandidate(candidate, filtered, trickle) +} + +func (d *ICEConnectionDetails) AddRemoteICECandidate(candidate ice.Candidate, filtered, trickle bool) { if candidate == nil { // end-of-candidates candidate return @@ -126,14 +145,15 @@ func (d *ICEConnectionDetails) AddRemoteCandidate(c webrtc.ICECandidateInit, fil d.lock.Lock() defer d.lock.Unlock() compFn := func(e *ICECandidateExtended) bool { - return isICECandidateEqualTo(e.Remote, *candidate) + return isICECandidateEqualTo(e.Remote, candidate) } - if slices.ContainsFunc[[]*ICECandidateExtended, *ICECandidateExtended](d.Remote, compFn) { + if slices.ContainsFunc(d.Remote, compFn) { return } d.Remote = append(d.Remote, &ICECandidateExtended{ - Remote: *candidate, + Remote: candidate, Filtered: filtered, + Trickle: trickle, }) } @@ -148,7 +168,7 @@ func (d *ICEConnectionDetails) Clear() { func (d *ICEConnectionDetails) SetSelectedPair(pair *webrtc.ICECandidatePair) { d.lock.Lock() defer d.lock.Unlock() - remoteIdx := slices.IndexFunc[[]*ICECandidateExtended, *ICECandidateExtended](d.Remote, func(e *ICECandidateExtended) bool { + remoteIdx := slices.IndexFunc(d.Remote, func(e *ICECandidateExtended) bool { return isICECandidateEqualToCandidate(e.Remote, pair.Remote) }) if remoteIdx < 0 { @@ -162,15 +182,16 @@ func (d *ICEConnectionDetails) SetSelectedPair(pair *webrtc.ICECandidatePair) { return } d.Remote = append(d.Remote, &ICECandidateExtended{ - Remote: *candidate, + Remote: candidate, Filtered: false, + Trickle: true, }) remoteIdx = len(d.Remote) - 1 } remote := d.Remote[remoteIdx] remote.Selected = true - localIdx := slices.IndexFunc[[]*ICECandidateExtended, *ICECandidateExtended](d.Local, func(e *ICECandidateExtended) bool { + localIdx := slices.IndexFunc(d.Local, func(e *ICECandidateExtended) bool { return isCandidateEqualTo(e.Local, pair.Local) }) if localIdx < 0 { @@ -214,7 +235,6 @@ func isCandidateEqualTo(c1, c2 *webrtc.ICECandidate) bool { 
c1.Protocol == c2.Protocol && c1.Address == c2.Address && c1.Port == c2.Port && - c1.Component == c2.Component && c1.Foundation == c2.Foundation && c1.Priority == c2.Priority && c1.RelatedAddress == c2.RelatedAddress && @@ -233,7 +253,6 @@ func isICECandidateEqualTo(c1, c2 ice.Candidate) bool { c1.NetworkType() == c2.NetworkType() && c1.Address() == c2.Address() && c1.Port() == c2.Port() && - c1.Component() == c2.Component() && c1.Foundation() == c2.Foundation() && c1.Priority() == c2.Priority() && c1.RelatedAddress().Equal(c2.RelatedAddress()) && @@ -251,13 +270,12 @@ func isICECandidateEqualToCandidate(c1 ice.Candidate, c2 *webrtc.ICECandidate) b c1.NetworkType().NetworkShort() == c2.Protocol.String() && c1.Address() == c2.Address && c1.Port() == int(c2.Port) && - c1.Component() == c2.Component && c1.Foundation() == c2.Foundation && c1.Priority() == c2.Priority && c1.TCPType().String() == c2.TCPType } -func unmarshalICECandidate(c webrtc.ICECandidateInit) (*ice.Candidate, error) { +func unmarshalICECandidate(c webrtc.ICECandidateInit) (ice.Candidate, error) { candidateValue := strings.TrimPrefix(c.Candidate, "candidate:") if candidateValue == "" { return nil, nil @@ -268,5 +286,49 @@ func unmarshalICECandidate(c webrtc.ICECandidateInit) (*ice.Candidate, error) { return nil, err } - return &candidate, nil + return candidate, nil +} + +func unmarshalCandidate(i ice.Candidate) (*webrtc.ICECandidate, error) { + var typ webrtc.ICECandidateType + switch i.Type() { + case ice.CandidateTypeHost: + typ = webrtc.ICECandidateTypeHost + case ice.CandidateTypeServerReflexive: + typ = webrtc.ICECandidateTypeSrflx + case ice.CandidateTypePeerReflexive: + typ = webrtc.ICECandidateTypePrflx + case ice.CandidateTypeRelay: + typ = webrtc.ICECandidateTypeRelay + default: + return nil, fmt.Errorf("unknown candidate type: %s", i.Type()) + } + + var protocol webrtc.ICEProtocol + switch strings.ToLower(i.NetworkType().NetworkShort()) { + case "udp": + protocol = webrtc.ICEProtocolUDP + 
case "tcp": + protocol = webrtc.ICEProtocolTCP + default: + return nil, fmt.Errorf("unknown network type: %s", i.NetworkType()) + } + + c := webrtc.ICECandidate{ + Foundation: i.Foundation(), + Priority: i.Priority(), + Address: i.Address(), + Protocol: protocol, + Port: uint16(i.Port()), + Component: i.Component(), + Typ: typ, + TCPType: i.TCPType().String(), + } + + if i.RelatedAddress() != nil { + c.RelatedAddress = i.RelatedAddress().Address + c.RelatedPort = uint16(i.RelatedAddress().Port) + } + + return &c, nil } diff --git a/pkg/rtc/types/interfaces.go b/pkg/rtc/types/interfaces.go index e548900e8..e877677c7 100644 --- a/pkg/rtc/types/interfaces.go +++ b/pkg/rtc/types/interfaces.go @@ -39,6 +39,7 @@ type WebsocketClient interface { ReadMessage() (messageType int, p []byte, err error) WriteMessage(messageType int, data []byte) error WriteControl(messageType int, data []byte, deadline time.Time) error + Close() error } type AddSubscriberParams struct { diff --git a/pkg/rtc/types/typesfakes/fake_websocket_client.go b/pkg/rtc/types/typesfakes/fake_websocket_client.go index 8cb00b9d9..0a9c14b7f 100644 --- a/pkg/rtc/types/typesfakes/fake_websocket_client.go +++ b/pkg/rtc/types/typesfakes/fake_websocket_client.go @@ -9,6 +9,16 @@ import ( ) type FakeWebsocketClient struct { + CloseStub func() error + closeMutex sync.RWMutex + closeArgsForCall []struct { + } + closeReturns struct { + result1 error + } + closeReturnsOnCall map[int]struct { + result1 error + } ReadMessageStub func() (int, []byte, error) readMessageMutex sync.RWMutex readMessageArgsForCall []struct { @@ -52,6 +62,59 @@ type FakeWebsocketClient struct { invocationsMutex sync.RWMutex } +func (fake *FakeWebsocketClient) Close() error { + fake.closeMutex.Lock() + ret, specificReturn := fake.closeReturnsOnCall[len(fake.closeArgsForCall)] + fake.closeArgsForCall = append(fake.closeArgsForCall, struct { + }{}) + stub := fake.CloseStub + fakeReturns := fake.closeReturns + fake.recordInvocation("Close", 
[]interface{}{}) + fake.closeMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeWebsocketClient) CloseCallCount() int { + fake.closeMutex.RLock() + defer fake.closeMutex.RUnlock() + return len(fake.closeArgsForCall) +} + +func (fake *FakeWebsocketClient) CloseCalls(stub func() error) { + fake.closeMutex.Lock() + defer fake.closeMutex.Unlock() + fake.CloseStub = stub +} + +func (fake *FakeWebsocketClient) CloseReturns(result1 error) { + fake.closeMutex.Lock() + defer fake.closeMutex.Unlock() + fake.CloseStub = nil + fake.closeReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeWebsocketClient) CloseReturnsOnCall(i int, result1 error) { + fake.closeMutex.Lock() + defer fake.closeMutex.Unlock() + fake.CloseStub = nil + if fake.closeReturnsOnCall == nil { + fake.closeReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.closeReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeWebsocketClient) ReadMessage() (int, []byte, error) { fake.readMessageMutex.Lock() ret, specificReturn := fake.readMessageReturnsOnCall[len(fake.readMessageArgsForCall)] @@ -249,6 +312,8 @@ func (fake *FakeWebsocketClient) WriteMessageReturnsOnCall(i int, result1 error) func (fake *FakeWebsocketClient) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() + fake.closeMutex.RLock() + defer fake.closeMutex.RUnlock() fake.readMessageMutex.RLock() defer fake.readMessageMutex.RUnlock() fake.writeControlMutex.RLock() diff --git a/pkg/service/agentservice.go b/pkg/service/agentservice.go index 9a7106438..dd3cbd6dc 100644 --- a/pkg/service/agentservice.go +++ b/pkg/service/agentservice.go @@ -17,11 +17,11 @@ package service import ( "context" "errors" - "io" "math/rand" "net/http" + "slices" + "sort" "strconv" - "strings" "sync" "time" @@ -41,13 +41,48 @@ import ( "github.com/livekit/psrpc" 
) +type AgentSocketUpgrader struct { + websocket.Upgrader +} + +func (u AgentSocketUpgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (*websocket.Conn, agent.WorkerProtocolVersion, bool) { + // reject non websocket requests + if !websocket.IsWebSocketUpgrade(r) { + w.WriteHeader(404) + return nil, 0, false + } + + // require a claim + claims := GetGrants(r.Context()) + if claims == nil || claims.Video == nil || !claims.Video.Agent { + handleError(w, r, http.StatusUnauthorized, rtc.ErrPermissionDenied) + return nil, 0, false + } + + // upgrade + conn, err := u.Upgrader.Upgrade(w, r, responseHeader) + if err != nil { + handleError(w, r, http.StatusInternalServerError, err) + return nil, 0, false + } + + var protocol agent.WorkerProtocolVersion = agent.CurrentProtocol + if pv, err := strconv.Atoi(r.FormValue("protocol")); err == nil { + protocol = agent.WorkerProtocolVersion(pv) + } + + return conn, protocol, true +} + type AgentService struct { - upgrader websocket.Upgrader + upgrader AgentSocketUpgrader *AgentHandler } type AgentHandler struct { + agent.UnimplementedWorkerHandler + agentServer rpc.AgentInternalServer mu sync.Mutex logger logger.Logger @@ -56,16 +91,19 @@ type AgentHandler struct { workers map[string]*agent.Worker keyProvider auth.KeyProvider - namespaces map[string]*namespaceInfo - publisherEnabled bool - roomEnabled bool - roomTopic string - publisherTopic string + namespaceWorkers map[workerKey][]*agent.Worker + roomKeyCount int + publisherKeyCount int + // TODO remove once deprecated CheckEnabled is removed + namespaces []string + + roomTopic string + publisherTopic string } -type namespaceInfo struct { - numPublishers int32 - numRooms int32 +type workerKey struct { + namespace string + jobType livekit.JobType } func NewAgentService(conf *config.Config, @@ -73,9 +111,7 @@ func NewAgentService(conf *config.Config, bus psrpc.MessageBus, keyProvider auth.KeyProvider, ) (*AgentService, error) { - s := &AgentService{ 
- upgrader: websocket.Upgrader{}, - } + s := &AgentService{} // allow connections from any origin, since script may be hosted anywhere // security is enforced by access tokens @@ -108,27 +144,9 @@ func NewAgentService(conf *config.Config, } func (s *AgentService) ServeHTTP(writer http.ResponseWriter, r *http.Request) { - // reject non websocket requests - if !websocket.IsWebSocketUpgrade(r) { - writer.WriteHeader(404) - return + if conn, protocol, ok := s.upgrader.Upgrade(writer, r, nil); ok { + s.HandleConnection(r.Context(), NewWSSignalConnection(conn), protocol) } - - // require a claim - claims := GetGrants(r.Context()) - if claims == nil || claims.Video == nil || !claims.Video.Agent { - handleError(writer, r, http.StatusUnauthorized, rtc.ErrPermissionDenied) - return - } - - // upgrade - conn, err := s.upgrader.Upgrade(writer, r, nil) - if err != nil { - handleError(writer, r, http.StatusInternalServerError, err) - return - } - - s.HandleConnection(r, conn, nil) } func NewAgentHandler( @@ -140,193 +158,136 @@ func NewAgentHandler( publisherTopic string, ) *AgentHandler { return &AgentHandler{ - agentServer: agentServer, - logger: logger, - workers: make(map[string]*agent.Worker), - namespaces: make(map[string]*namespaceInfo), - serverInfo: serverInfo, - keyProvider: keyProvider, - roomTopic: roomTopic, - publisherTopic: publisherTopic, + agentServer: agentServer, + logger: logger, + workers: make(map[string]*agent.Worker), + namespaceWorkers: make(map[workerKey][]*agent.Worker), + serverInfo: serverInfo, + keyProvider: keyProvider, + roomTopic: roomTopic, + publisherTopic: publisherTopic, } } -func (h *AgentHandler) HandleConnection(r *http.Request, conn *websocket.Conn, onIdle func()) { - var protocol agent.WorkerProtocolVersion - if pv, err := strconv.Atoi(r.FormValue("protocol")); err == nil { - protocol = agent.WorkerProtocolVersion(pv) - } - - sigConn := NewWSSignalConnection(conn) - - apiKey := GetAPIKey(r.Context()) +func (h *AgentHandler) 
HandleConnection(ctx context.Context, conn agent.SignalConn, protocol agent.WorkerProtocolVersion) { + apiKey := GetAPIKey(ctx) apiSecret := h.keyProvider.GetSecret(apiKey) - worker := agent.NewWorker(protocol, apiKey, apiSecret, h.serverInfo, conn, sigConn, h.logger) - worker.OnWorkerRegistered(h.handleWorkerRegister) + worker := agent.NewWorker(protocol, apiKey, apiSecret, h.serverInfo, conn, h.logger, h) h.mu.Lock() h.workers[worker.ID()] = worker h.mu.Unlock() - defer func() { - worker.Close() - - h.mu.Lock() - delete(h.workers, worker.ID()) - numWorkers := len(h.workers) - h.mu.Unlock() - - if worker.Registered() { - h.handleWorkerDeregister(worker) - } - - if numWorkers == 0 && onIdle != nil { - onIdle() - } - }() - for { - req, _, err := sigConn.ReadWorkerMessage() + req, _, err := conn.ReadWorkerMessage() if err != nil { - // normal/expected closure - if err == io.EOF || - strings.HasSuffix(err.Error(), "use of closed network connection") || - strings.HasSuffix(err.Error(), "connection reset by peer") || - websocket.IsCloseError( - err, - websocket.CloseAbnormalClosure, - websocket.CloseGoingAway, - websocket.CloseNormalClosure, - websocket.CloseNoStatusReceived, - ) { - worker.Logger.Infow("worker closed WS connection", "wsError", err) + if IsWebSocketCloseError(err) { + worker.Logger().Infow("worker closed WS connection", "wsError", err) } else { - worker.Logger.Errorw("error reading from websocket", err) + worker.Logger().Errorw("error reading from websocket", err) } - return + break } worker.HandleMessage(req) } -} -func (h *AgentHandler) handleWorkerRegister(w *agent.Worker) { h.mu.Lock() - - info, ok := h.namespaces[w.Namespace()] - numPublishers := int32(0) - numRooms := int32(0) - if ok { - numPublishers = info.numPublishers - numRooms = info.numRooms - } - - shouldNotify := false - var err error - if w.JobType() == livekit.JobType_JT_PUBLISHER { - numPublishers++ - if numPublishers == 1 { - shouldNotify = true - err = 
h.agentServer.RegisterJobRequestTopic(w.Namespace(), h.publisherTopic) - } - - } else if w.JobType() == livekit.JobType_JT_ROOM { - numRooms++ - if numRooms == 1 { - shouldNotify = true - err = h.agentServer.RegisterJobRequestTopic(w.Namespace(), h.roomTopic) - } - } - - if err != nil { - w.Logger.Errorw("failed to register job request topic", err) - h.mu.Unlock() - w.Close() // Close the worker - return - } - - h.namespaces[w.Namespace()] = &namespaceInfo{ - numPublishers: numPublishers, - numRooms: numRooms, - } - - h.roomEnabled = h.roomAvailableLocked() - h.publisherEnabled = h.publisherAvailableLocked() + delete(h.workers, worker.ID()) h.mu.Unlock() - if shouldNotify { - h.logger.Infow("initial worker registered", "namespace", w.Namespace(), "jobType", w.JobType()) - err = h.agentServer.PublishWorkerRegistered(context.Background(), agent.DefaultHandlerNamespace, &emptypb.Empty{}) + worker.Close() +} + +func (h *AgentHandler) HandleWorkerRegister(w *agent.Worker) { + h.mu.Lock() + + key := workerKey{w.Namespace(), w.JobType()} + + workers := h.namespaceWorkers[key] + created := len(workers) == 0 + + if created { + topic := h.roomTopic + if w.JobType() == livekit.JobType_JT_PUBLISHER { + topic = h.publisherTopic + } + err := h.agentServer.RegisterJobRequestTopic(w.Namespace(), topic) if err != nil { - w.Logger.Errorw("failed to publish worker registered", err) + h.mu.Unlock() + + w.Logger().Errorw("failed to register job request topic", err) + w.Close() + return + } + + if w.JobType() == livekit.JobType_JT_ROOM { + h.roomKeyCount++ + } else { + h.publisherKeyCount++ + } + + h.namespaces = append(h.namespaces, w.Namespace()) + sort.Strings(h.namespaces) + } + + h.namespaceWorkers[key] = append(workers, w) + h.mu.Unlock() + + if created { + h.logger.Infow("initial worker registered", "namespace", w.Namespace(), "jobType", w.JobType()) + err := h.agentServer.PublishWorkerRegistered(context.Background(), agent.DefaultHandlerNamespace, &emptypb.Empty{}) + if err != 
nil { + w.Logger().Errorw("failed to publish worker registered", err) } } } -func (h *AgentHandler) handleWorkerDeregister(worker *agent.Worker) { +func (h *AgentHandler) HandleWorkerDeregister(w *agent.Worker) { h.mu.Lock() defer h.mu.Unlock() - info, ok := h.namespaces[worker.Namespace()] + key := workerKey{w.Namespace(), w.JobType()} + + workers, ok := h.namespaceWorkers[key] if !ok { return } - - if worker.JobType() == livekit.JobType_JT_PUBLISHER { - info.numPublishers-- - if info.numPublishers == 0 { - h.agentServer.DeregisterJobRequestTopic(worker.Namespace(), h.publisherTopic) - } - } else if worker.JobType() == livekit.JobType_JT_ROOM { - info.numRooms-- - if info.numRooms == 0 { - h.agentServer.DeregisterJobRequestTopic(worker.Namespace(), h.roomTopic) - } + index := slices.Index(workers, w) + if index == -1 { + return } - if info.numPublishers == 0 && info.numRooms == 0 { + if len(workers) > 1 { + h.namespaceWorkers[key] = slices.Delete(workers, index, index+1) + } else { h.logger.Debugw("last worker deregistered") - delete(h.namespaces, worker.Namespace()) - } + delete(h.namespaceWorkers, key) - h.roomEnabled = h.roomAvailableLocked() - h.publisherEnabled = h.publisherAvailableLocked() -} - -func (h *AgentHandler) roomAvailableLocked() bool { - for _, w := range h.workers { if w.JobType() == livekit.JobType_JT_ROOM { - return true + h.roomKeyCount-- + h.agentServer.DeregisterJobRequestTopic(w.Namespace(), h.roomTopic) + } else { + h.publisherKeyCount-- + h.agentServer.DeregisterJobRequestTopic(w.Namespace(), h.publisherTopic) + } + + if i := slices.Index(h.namespaces, w.Namespace()); i != -1 { + h.namespaces = slices.Delete(h.namespaces, i, i+1) } } - return false - -} - -func (h *AgentHandler) publisherAvailableLocked() bool { - for _, w := range h.workers { - if w.JobType() == livekit.JobType_JT_PUBLISHER { - return true - } - } - - return false } func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*emptypb.Empty, error) { - 
attempted := make(map[string]bool) + key := workerKey{job.Namespace, job.Type} + attempted := make(map[*agent.Worker]struct{}) for { h.mu.Lock() var selected *agent.Worker var maxLoad float32 - for _, w := range h.workers { - if w.Namespace() != job.Namespace || w.JobType() != job.Type { - continue - } - - _, ok := attempted[w.ID()] - if ok { + for _, w := range h.namespaceWorkers[key] { + if _, ok := attempted[w]; ok { continue } @@ -339,7 +300,6 @@ func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*empty selected = w } } - } h.mu.Unlock() @@ -347,7 +307,7 @@ func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*empty return nil, psrpc.NewErrorf(psrpc.DeadlineExceeded, "no workers available") } - attempted[selected.ID()] = true + attempted[selected] = struct{}{} values := []interface{}{ "jobID", job.Id, @@ -371,7 +331,6 @@ func (h *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*empty return &emptypb.Empty{}, nil } - } func (h *AgentHandler) JobRequestAffinity(ctx context.Context, job *livekit.Job) float32 { @@ -394,7 +353,6 @@ func (h *AgentHandler) JobRequestAffinity(ctx context.Context, job *livekit.Job) affinity = 0.5 } } - } return affinity @@ -403,24 +361,14 @@ func (h *AgentHandler) JobRequestAffinity(ctx context.Context, job *livekit.Job) func (h *AgentHandler) CheckEnabled(ctx context.Context, req *rpc.CheckEnabledRequest) (*rpc.CheckEnabledResponse, error) { h.mu.Lock() defer h.mu.Unlock() - namespaces := make([]string, 0, len(h.namespaces)) - for ns := range h.namespaces { - namespaces = append(namespaces, ns) - } return &rpc.CheckEnabledResponse{ - Namespaces: namespaces, - RoomEnabled: h.roomEnabled, - PublisherEnabled: h.publisherEnabled, + Namespaces: slices.Compact(slices.Clone(h.namespaces)), + RoomEnabled: h.roomKeyCount != 0, + PublisherEnabled: h.publisherKeyCount != 0, }, nil } -func (h *AgentHandler) NumConnections() int { - h.mu.Lock() - defer h.mu.Unlock() - return 
len(h.workers) -} - func (h *AgentHandler) DrainConnections(interval time.Duration) { // jitter drain start time.Sleep(time.Duration(rand.Int63n(int64(interval)))) diff --git a/pkg/service/auth.go b/pkg/service/auth.go index c319eceff..8ea3a3617 100644 --- a/pkg/service/auth.go +++ b/pkg/service/auth.go @@ -196,6 +196,22 @@ func EnsureIngressAdminPermission(ctx context.Context) error { return nil } +func EnsureSIPAdminPermission(ctx context.Context) error { + claims := GetGrants(ctx) + if claims == nil || claims.SIP == nil || !claims.SIP.Admin { + return ErrPermissionDenied + } + return nil +} + +func EnsureSIPCallPermission(ctx context.Context) error { + claims := GetGrants(ctx) + if claims == nil || claims.SIP == nil || !claims.SIP.Call { + return ErrPermissionDenied + } + return nil +} + // wraps authentication errors around Twirp func twirpAuthError(err error) error { return twirp.NewError(twirp.Unauthenticated, err.Error()) diff --git a/pkg/service/basic_auth.go b/pkg/service/basic_auth.go new file mode 100644 index 000000000..40173c6fa --- /dev/null +++ b/pkg/service/basic_auth.go @@ -0,0 +1,31 @@ +package service + +import ( + "net/http" +) + +func GenBasicAuthMiddleware(username string, password string) (func(http.ResponseWriter, *http.Request, http.HandlerFunc) ) { + return func(rw http.ResponseWriter, r *http.Request, next http.HandlerFunc) { + given_username, given_password, ok := r.BasicAuth() + unauthorized := func() { + rw.Header().Set("WWW-Authenticate", "Basic realm=\"Protected Area\"") + rw.WriteHeader(http.StatusUnauthorized) + } + if !ok { + unauthorized() + return + } + + if given_username != username { + unauthorized() + return + } + + if given_password != password { + unauthorized() + return + } + + next(rw, r) + } +} \ No newline at end of file diff --git a/pkg/service/roomallocator.go b/pkg/service/roomallocator.go index c8556e645..eecb2bb30 100644 --- a/pkg/service/roomallocator.go +++ b/pkg/service/roomallocator.go @@ -19,10 +19,13 @@ 
import ( "errors" "time" + "google.golang.org/protobuf/proto" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" "github.com/livekit/protocol/utils/guid" + "github.com/livekit/psrpc" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/livekit-server/pkg/routing" @@ -78,6 +81,11 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre return nil, false, err } + req, err = r.applyNamedRoomConfiguration(req) + if err != nil { + return nil, false, err + } + if req.EmptyTimeout > 0 { rm.EmptyTimeout = req.EmptyTimeout } @@ -98,6 +106,24 @@ func (r *StandardRoomAllocator) CreateRoom(ctx context.Context, req *livekit.Cre internal.TrackEgress = req.Egress.Tracks } } + if req.Agent == nil { + // Backward compatibility: by default, start any agent in the empty namespace + req.Agent = &livekit.RoomAgent{ + Agents: []*livekit.CreateAgentJobDefinitionRequest{ + &livekit.CreateAgentJobDefinitionRequest{ + Type: livekit.JobType_JT_ROOM, + Room: req.Name, + Namespace: "default", + }, + &livekit.CreateAgentJobDefinitionRequest{ + Type: livekit.JobType_JT_PUBLISHER, + Room: req.Name, + Namespace: "default", + }, + }, + } + } + internal.Agents = req.Agent.Agents if req.MinPlayoutDelay > 0 || req.MaxPlayoutDelay > 0 { internal.PlayoutDelay = &livekit.PlayoutDelay{ Enabled: true, @@ -182,3 +208,44 @@ func applyDefaultRoomConfig(room *livekit.Room, internal *livekit.RoomInternal, } internal.SyncStreams = conf.SyncStreams } + +func (r *StandardRoomAllocator) applyNamedRoomConfiguration(req *livekit.CreateRoomRequest) (*livekit.CreateRoomRequest, error) { + if req.ConfigName == "" { + return req, nil + } + + conf, ok := r.config.Room.RoomConfigurations[req.ConfigName] + if !ok { + return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "unknown roomc confguration in create room request") + } + + clone := proto.Clone(req).(*livekit.CreateRoomRequest) + + // Request overwrites conf + if 
clone.EmptyTimeout == 0 { + clone.EmptyTimeout = conf.EmptyTimeout + } + if clone.DepartureTimeout == 0 { + clone.DepartureTimeout = req.DepartureTimeout + } + if clone.MaxParticipants == 0 { + clone.MaxParticipants = conf.MaxParticipants + } + if clone.Egress == nil { + clone.Egress = proto.Clone(conf.Egress).(*livekit.RoomEgress) + } + if clone.Agent == nil { + clone.Agent = proto.Clone(conf.Agent).(*livekit.RoomAgent) + } + if clone.MinPlayoutDelay == 0 { + clone.MinPlayoutDelay = conf.MinPlayoutDelay + } + if clone.MaxPlayoutDelay == 0 { + clone.MaxPlayoutDelay = conf.MaxPlayoutDelay + } + if !clone.SyncStreams { + clone.SyncStreams = conf.SyncStreams + } + + return clone, nil +} diff --git a/pkg/service/roomservice.go b/pkg/service/roomservice.go index 19657b09a..2cc379bd4 100644 --- a/pkg/service/roomservice.go +++ b/pkg/service/roomservice.go @@ -103,10 +103,14 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq defer done() if created { - go s.agentClient.LaunchJob(context.WithoutCancel(ctx), &agent.JobDescription{ - JobType: livekit.JobType_JT_ROOM, - Room: rm, - }) + _, internal, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Name), true) + + if internal.Agents != nil { + err = s.launchAgents(ctx, rm, internal.Agents) + if err != nil { + return nil, err + } + } if req.Egress != nil && req.Egress.Room != nil { // ensure room name matches @@ -126,6 +130,23 @@ func (s *RoomService) CreateRoom(ctx context.Context, req *livekit.CreateRoomReq return rm, nil } +func (s *RoomService) launchAgents(ctx context.Context, rm *livekit.Room, agents []*livekit.CreateAgentJobDefinitionRequest) error { + for _, ag := range agents { + if ag.Type != livekit.JobType_JT_ROOM { + continue + } + + go s.agentClient.LaunchJob(ctx, &agent.JobRequest{ + JobType: ag.Type, + Room: rm, + Metadata: ag.Metadata, + Namespace: ag.Namespace, + }) + } + + return nil +} + func (s *RoomService) ListRooms(ctx context.Context, req *livekit.ListRoomsRequest) 
(*livekit.ListRoomsResponse, error) { AppendLogFields(ctx, "room", req.Names) err := EnsureListPermission(ctx) @@ -288,7 +309,7 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat return nil, twirpAuthError(err) } - room, _, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Room), false) + room, internal, err := s.roomStore.LoadRoom(ctx, livekit.RoomName(req.Room), false) if err != nil { return nil, err } @@ -323,10 +344,10 @@ func (s *RoomService) UpdateRoomMetadata(ctx context.Context, req *livekit.Updat } if created { - go s.agentClient.LaunchJob(context.WithoutCancel(ctx), &agent.JobDescription{ - JobType: livekit.JobType_JT_ROOM, - Room: room, - }) + err = s.launchAgents(ctx, room, internal.Agents) + if err != nil { + return nil, err + } } return room, nil diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index 5843231e2..86952b767 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -18,12 +18,10 @@ import ( "context" "errors" "fmt" - "io" "math/rand" "net/http" "os" "strconv" - "strings" "sync" "time" @@ -369,17 +367,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { for { req, count, err := sigConn.ReadRequest() if err != nil { - // normal/expected closure - if errors.Is(err, io.EOF) || - strings.HasSuffix(err.Error(), "use of closed network connection") || - strings.HasSuffix(err.Error(), "connection reset by peer") || - websocket.IsCloseError( - err, - websocket.CloseAbnormalClosure, - websocket.CloseGoingAway, - websocket.CloseNormalClosure, - websocket.CloseNoStatusReceived, - ) { + if IsWebSocketCloseError(err) { closedByClient.Store(true) } else { pLogger.Errorw("error reading from websocket", err, "connID", cr.ConnectionID) @@ -533,10 +521,24 @@ func (s *RTCService) startConnection( } if created && s.agentClient != nil { - go s.agentClient.LaunchJob(ctx, &agent.JobDescription{ - JobType: livekit.JobType_JT_ROOM, - Room: cr.Room, - }) + // TODO Have 
CreateRoom return the RoomInternal object? + _, internal, err := s.store.LoadRoom(ctx, livekit.RoomName(roomName), true) + if err != nil { + return connectionResult{}, nil, err + } + + for _, ag := range internal.Agents { + if ag.Type != livekit.JobType_JT_ROOM { + continue + } + + go s.agentClient.LaunchJob(ctx, &agent.JobRequest{ + JobType: ag.Type, + Room: cr.Room, + Metadata: ag.Metadata, + Namespace: ag.Namespace, + }) + } } // this needs to be started first *before* using router functions on this node diff --git a/pkg/service/server.go b/pkg/service/server.go index 5dc123fc0..88498518e 100644 --- a/pkg/service/server.go +++ b/pkg/service/server.go @@ -140,8 +140,20 @@ func NewLivekitServer(conf *config.Config, } if conf.PrometheusPort > 0 { + logger.Warnw("prometheus_port is deprecated, please switch prometheus.port instead", nil) + conf.Prometheus.Port = conf.PrometheusPort + } + + if conf.Prometheus.Port > 0 { + promHandler := promhttp.Handler() + if conf.Prometheus.Username != "" && conf.Prometheus.Password != "" { + protectedHandler := negroni.New() + protectedHandler.Use(negroni.HandlerFunc(GenBasicAuthMiddleware(conf.Prometheus.Username, conf.Prometheus.Password))) + protectedHandler.UseHandler(promHandler) + promHandler = protectedHandler + } s.promServer = &http.Server{ - Handler: promhttp.Handler(), + Handler: promHandler, } } @@ -207,7 +219,7 @@ func (s *LivekitServer) Start() error { listeners = append(listeners, ln) if s.promServer != nil { - ln, err = net.Listen("tcp", net.JoinHostPort(addr, strconv.Itoa(int(s.config.PrometheusPort)))) + ln, err = net.Listen("tcp", net.JoinHostPort(addr, strconv.Itoa(int(s.config.Prometheus.Port)))) if err != nil { return err } @@ -234,8 +246,8 @@ func (s *LivekitServer) Start() error { "rtc.portICERange", []uint32{s.config.RTC.ICEPortRangeStart, s.config.RTC.ICEPortRangeEnd}, ) } - if s.config.PrometheusPort != 0 { - values = append(values, "portPrometheus", s.config.PrometheusPort) + if s.config.Prometheus.Port 
!= 0 { + values = append(values, "portPrometheus", s.config.Prometheus.Port) } if s.config.Region != "" { values = append(values, "region", s.config.Region) diff --git a/pkg/service/sip.go b/pkg/service/sip.go index c70522560..c32dbc7e1 100644 --- a/pkg/service/sip.go +++ b/pkg/service/sip.go @@ -61,6 +61,9 @@ func NewSIPService( } func (s *SIPService) CreateSIPTrunk(ctx context.Context, req *livekit.CreateSIPTrunkRequest) (*livekit.SIPTrunkInfo, error) { + if err := EnsureSIPAdminPermission(ctx); err != nil { + return nil, twirpAuthError(err) + } if s.store == nil { return nil, ErrSIPNotConnected } @@ -101,6 +104,9 @@ func (s *SIPService) CreateSIPTrunk(ctx context.Context, req *livekit.CreateSIPT } func (s *SIPService) CreateSIPInboundTrunk(ctx context.Context, req *livekit.CreateSIPInboundTrunkRequest) (*livekit.SIPInboundTrunkInfo, error) { + if err := EnsureSIPAdminPermission(ctx); err != nil { + return nil, twirpAuthError(err) + } if s.store == nil { return nil, ErrSIPNotConnected } @@ -132,6 +138,9 @@ func (s *SIPService) CreateSIPInboundTrunk(ctx context.Context, req *livekit.Cre } func (s *SIPService) CreateSIPOutboundTrunk(ctx context.Context, req *livekit.CreateSIPOutboundTrunkRequest) (*livekit.SIPOutboundTrunkInfo, error) { + if err := EnsureSIPAdminPermission(ctx); err != nil { + return nil, twirpAuthError(err) + } if s.store == nil { return nil, ErrSIPNotConnected } @@ -151,6 +160,9 @@ func (s *SIPService) CreateSIPOutboundTrunk(ctx context.Context, req *livekit.Cr } func (s *SIPService) ListSIPTrunk(ctx context.Context, req *livekit.ListSIPTrunkRequest) (*livekit.ListSIPTrunkResponse, error) { + if err := EnsureSIPAdminPermission(ctx); err != nil { + return nil, twirpAuthError(err) + } if s.store == nil { return nil, ErrSIPNotConnected } @@ -164,6 +176,9 @@ func (s *SIPService) ListSIPTrunk(ctx context.Context, req *livekit.ListSIPTrunk } func (s *SIPService) ListSIPInboundTrunk(ctx context.Context, req *livekit.ListSIPInboundTrunkRequest) 
(*livekit.ListSIPInboundTrunkResponse, error) { + if err := EnsureSIPAdminPermission(ctx); err != nil { + return nil, twirpAuthError(err) + } if s.store == nil { return nil, ErrSIPNotConnected } @@ -177,6 +192,9 @@ func (s *SIPService) ListSIPInboundTrunk(ctx context.Context, req *livekit.ListS } func (s *SIPService) ListSIPOutboundTrunk(ctx context.Context, req *livekit.ListSIPOutboundTrunkRequest) (*livekit.ListSIPOutboundTrunkResponse, error) { + if err := EnsureSIPAdminPermission(ctx); err != nil { + return nil, twirpAuthError(err) + } if s.store == nil { return nil, ErrSIPNotConnected } @@ -190,6 +208,9 @@ func (s *SIPService) ListSIPOutboundTrunk(ctx context.Context, req *livekit.List } func (s *SIPService) DeleteSIPTrunk(ctx context.Context, req *livekit.DeleteSIPTrunkRequest) (*livekit.SIPTrunkInfo, error) { + if err := EnsureSIPAdminPermission(ctx); err != nil { + return nil, twirpAuthError(err) + } if s.store == nil { return nil, ErrSIPNotConnected } @@ -207,6 +228,9 @@ func (s *SIPService) DeleteSIPTrunk(ctx context.Context, req *livekit.DeleteSIPT } func (s *SIPService) CreateSIPDispatchRule(ctx context.Context, req *livekit.CreateSIPDispatchRuleRequest) (*livekit.SIPDispatchRuleInfo, error) { + if err := EnsureSIPAdminPermission(ctx); err != nil { + return nil, twirpAuthError(err) + } if s.store == nil { return nil, ErrSIPNotConnected } @@ -239,6 +263,9 @@ func (s *SIPService) CreateSIPDispatchRule(ctx context.Context, req *livekit.Cre } func (s *SIPService) ListSIPDispatchRule(ctx context.Context, req *livekit.ListSIPDispatchRuleRequest) (*livekit.ListSIPDispatchRuleResponse, error) { + if err := EnsureSIPAdminPermission(ctx); err != nil { + return nil, twirpAuthError(err) + } if s.store == nil { return nil, ErrSIPNotConnected } @@ -252,6 +279,9 @@ func (s *SIPService) ListSIPDispatchRule(ctx context.Context, req *livekit.ListS } func (s *SIPService) DeleteSIPDispatchRule(ctx context.Context, req *livekit.DeleteSIPDispatchRuleRequest) 
(*livekit.SIPDispatchRuleInfo, error) { + if err := EnsureSIPAdminPermission(ctx); err != nil { + return nil, twirpAuthError(err) + } if s.store == nil { return nil, ErrSIPNotConnected } @@ -269,6 +299,9 @@ func (s *SIPService) DeleteSIPDispatchRule(ctx context.Context, req *livekit.Del } func (s *SIPService) CreateSIPParticipantWithToken(ctx context.Context, req *livekit.CreateSIPParticipantRequest, wsUrl, token string) (*livekit.SIPParticipantInfo, error) { + if err := EnsureSIPCallPermission(ctx); err != nil { + return nil, twirpAuthError(err) + } if s.store == nil { return nil, ErrSIPNotConnected } diff --git a/pkg/service/wsprotocol.go b/pkg/service/wsprotocol.go index d94ce3f90..cdfb72a8a 100644 --- a/pkg/service/wsprotocol.go +++ b/pkg/service/wsprotocol.go @@ -15,6 +15,9 @@ package service import ( + "errors" + "io" + "strings" "sync" "time" @@ -49,6 +52,10 @@ func NewWSSignalConnection(conn types.WebsocketClient) *WSSignalConnection { return wsc } +func (c *WSSignalConnection) Close() error { + return c.conn.Close() +} + func (c *WSSignalConnection) ReadRequest() (*livekit.SignalRequest, int, error) { for { // handle special messages and pass on the rest @@ -172,3 +179,17 @@ func (c *WSSignalConnection) pingWorker() { } } } + +// IsWebSocketCloseError checks that error is normal/expected closure +func IsWebSocketCloseError(err error) bool { + return errors.Is(err, io.EOF) || + strings.HasSuffix(err.Error(), "use of closed network connection") || + strings.HasSuffix(err.Error(), "connection reset by peer") || + websocket.IsCloseError( + err, + websocket.CloseAbnormalClosure, + websocket.CloseGoingAway, + websocket.CloseNormalClosure, + websocket.CloseNoStatusReceived, + ) +} diff --git a/pkg/sfu/buffer/buffer.go b/pkg/sfu/buffer/buffer.go index debc82eda..0480d719d 100644 --- a/pkg/sfu/buffer/buffer.go +++ b/pkg/sfu/buffer/buffer.go @@ -17,6 +17,7 @@ package buffer import ( "encoding/binary" "errors" + "fmt" "io" "strings" "sync" @@ -680,6 +681,29 @@ func 
(b *Buffer) patchExtPacket(ep *ExtPacket, buf []byte) *ExtPacket { b.logger.Warnw("unexpected marshal size", nil, "max", n, "need", payloadEnd) return nil } + // TODO-REMOVE-AFTER-DEBUG START + if payloadEnd != n { + b.logger.Warnw("unexpected marshal size", nil, "max", n, "need", payloadEnd) + } + // check a few fields for validity + checkVersion := (buf[0] & 0xc0) >> 6 + checkPayloadType := buf[1] & 0x7f + checkSequenceNumber := binary.BigEndian.Uint16(buf[2:]) + checkSSRC := binary.BigEndian.Uint32(buf[8:]) + if checkVersion != pkt.Version || checkPayloadType != pkt.PayloadType || checkSequenceNumber != pkt.SequenceNumber || checkSSRC != pkt.SSRC { + b.logger.Warnw( + "rtp packet mismatch", nil, + "version", fmt.Sprintf("%d != %d", checkVersion, pkt.Version), + "payloadType", fmt.Sprintf("%d != %d", checkPayloadType, pkt.PayloadType), + "sequenceNumber", fmt.Sprintf("%d != %d", checkSequenceNumber, pkt.SequenceNumber), + "SSRC", fmt.Sprintf("%d != %d", checkSSRC, pkt.SSRC), + "bytes", buf[0:16], + "len", n, + "headerSize", payloadStart, + "payloadSize", payloadEnd-payloadStart, + ) + } + // TODO-REMOVE-AFTER-DEBUG END pkt.Payload = buf[payloadStart:payloadEnd] ep.Packet = &pkt diff --git a/pkg/sfu/buffer/rtpstats_base.go b/pkg/sfu/buffer/rtpstats_base.go index e588f1dcb..07c906e70 100644 --- a/pkg/sfu/buffer/rtpstats_base.go +++ b/pkg/sfu/buffer/rtpstats_base.go @@ -15,6 +15,7 @@ package buffer import ( + "errors" "fmt" "sync" "time" @@ -38,7 +39,7 @@ const ( cPassthroughNTPTimestamp = true - cSequenceNumberLargeJumpThreshold = 1000 + cSequenceNumberLargeJumpThreshold = 100 ) // ------------------------------------------------------- @@ -573,33 +574,37 @@ func (r *rtpStatsBase) getTotalPacketsPrimary(extStartSN, extHighestSN uint64) u return packetsSeen - r.packetsPadding } -func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighestSN uint64) *RTPDeltaInfo { +func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, 
extHighestSN uint64) (deltaInfo *RTPDeltaInfo, err error, loggingFields []interface{}) { then, now := r.getAndResetSnapshot(snapshotID, extStartSN, extHighestSN) if now == nil || then == nil { - return nil + return } startTime := then.startTime endTime := now.startTime packetsExpected := now.extStartSN - then.extStartSN + if then.extStartSN > extHighestSN { + packetsExpected = 0 + } if packetsExpected > cNumSequenceNumbers { - r.logger.Infow( - "too many packets expected in delta", + loggingFields = []interface{}{ "startSN", then.extStartSN, "endSN", now.extStartSN, "packetsExpected", packetsExpected, "startTime", startTime, "endTime", endTime, "duration", endTime.Sub(startTime).String(), - ) - return nil + } + err = errors.New("too many packets expected in delta") + return } if packetsExpected == 0 { - return &RTPDeltaInfo{ + deltaInfo = &RTPDeltaInfo{ StartTime: startTime, EndTime: endTime, } + return } packetsLost := uint32(now.packetsLost - then.packetsLost) @@ -610,19 +615,20 @@ func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighes // padding packets delta could be higher than expected due to out-of-order padding packets packetsPadding := now.packetsPadding - then.packetsPadding if packetsExpected < packetsPadding { - r.logger.Infow( - "padding packets more than expected", + loggingFields = []interface{}{ "packetsExpected", packetsExpected, "packetsPadding", packetsPadding, + "packetsLost", packetsLost, "startSequenceNumber", then.extStartSN, - "endSequenceNumber", now.extStartSN-1, - ) + "endSequenceNumber", now.extStartSN - 1, + } + err = errors.New("padding packets more than expected") packetsExpected = 0 } else { packetsExpected -= packetsPadding } - return &RTPDeltaInfo{ + deltaInfo = &RTPDeltaInfo{ StartTime: startTime, EndTime: endTime, Packets: uint32(packetsExpected), @@ -643,6 +649,7 @@ func (r *rtpStatsBase) deltaInfo(snapshotID uint32, extStartSN uint64, extHighes Plis: now.plis - then.plis, Firs: now.firs - then.firs, } 
+ return } func (r *rtpStatsBase) MarshalLogObject(e zapcore.ObjectEncoder) error { diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index 6cbc9fd4c..1fe7c269e 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -176,10 +176,7 @@ func (r *RTPStatsReceiver) Update( r.logger.Debugw( "rtp receiver stream start", - "startTime", r.startTime.String(), - "firstTime", r.firstTime.String(), - "startSN", r.sequenceNumber.GetExtendedStart(), - "startTS", r.timestamp.GetExtendedStart(), + "rtpStats", lockedRTPStatsReceiverLogEncoder{r}, ) } else { resSN = r.sequenceNumber.Update(sequenceNumber) @@ -194,19 +191,12 @@ func (r *RTPStatsReceiver) Update( gapSN := int64(resSN.ExtendedVal - resSN.PreExtendedHighest) getLoggingFields := func() []interface{} { return []interface{}{ - "extStartSN", r.sequenceNumber.GetExtendedStart(), - "extHighestSN", r.sequenceNumber.GetExtendedHighest(), - "extStartTS", r.timestamp.GetExtendedStart(), - "extHighestTS", r.timestamp.GetExtendedHighest(), - "startTime", r.startTime.String(), - "firstTime", r.firstTime.String(), - "highestTime", r.highestTime.String(), "prevSN", resSN.PreExtendedHighest, "currSN", resSN.ExtendedVal, "gapSN", gapSN, "prevTS", resTS.PreExtendedHighest, "currTS", resTS.ExtendedVal, - "gapTS", resTS.ExtendedVal - resTS.PreExtendedHighest, + "gapTS", int64(resTS.ExtendedVal - resTS.PreExtendedHighest), "packetTime", packetTime.String(), "sequenceNumber", sequenceNumber, "timestamp", timestamp, @@ -214,8 +204,7 @@ func (r *RTPStatsReceiver) Update( "hdrSize", hdrSize, "payloadSize", payloadSize, "paddingSize", paddingSize, - "first", r.srFirst, - "last", r.srNewest, + "rtpStats", lockedRTPStatsReceiverLogEncoder{r}, } } if gapSN <= 0 { // duplicate OR out-of-order @@ -240,23 +229,23 @@ func (r *RTPStatsReceiver) Update( flowState.ExtTimestamp = resTS.ExtendedVal if !flowState.IsDuplicate && -gapSN >= cSequenceNumberLargeJumpThreshold { - if 
r.largeJumpNegativeCount%100 == 0 { + r.largeJumpNegativeCount++ + if (r.largeJumpNegativeCount-1)%100 == 0 { r.logger.Warnw( "large sequence number gap negative", nil, append(getLoggingFields(), "count", r.largeJumpNegativeCount)..., ) } - r.largeJumpNegativeCount++ } } else { // in-order if gapSN >= cSequenceNumberLargeJumpThreshold || resTS.ExtendedVal < resTS.PreExtendedHighest { - if r.largeJumpCount%100 == 0 { + r.largeJumpCount++ + if (r.largeJumpCount-1)%100 == 0 { r.logger.Warnw( "large sequence number gap OR time reversed", nil, append(getLoggingFields(), "count", r.largeJumpCount)..., ) } - r.largeJumpCount++ } // update gap histogram @@ -314,9 +303,8 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) if r.srNewest != nil && r.srNewest.NTPTimestamp > srData.NTPTimestamp { r.logger.Infow( "received sender report, anachronous, dropping", - "first", r.srFirst, - "last", r.srNewest, "current", srData, + "rtpStats", lockedRTPStatsReceiverLogEncoder{r}, ) return false } @@ -370,10 +358,9 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) if r.outOfOrderSenderReportCount%10 == 0 { r.logger.Infow( "received sender report, out-of-order, skipping", - "first", r.srFirst, - "last", r.srNewest, "current", &srDataCopy, "count", r.outOfOrderSenderReportCount, + "rtpStats", lockedRTPStatsReceiverLogEncoder{r}, ) } r.outOfOrderSenderReportCount++ @@ -394,8 +381,6 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) if r.clockSkewCount%100 == 0 { r.logger.Infow( "received sender report, clock skew", - "first", r.srFirst, - "last", r.srNewest, "current", &srDataCopy, "timeSinceFirst", timeSinceFirst, "rtpDiffSinceFirst", rtpDiffSinceFirst, @@ -404,6 +389,7 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) "rtpDiffSinceLast", rtpDiffSinceLast, "calculatedLast", calculatedClockRateFromLast, "count", r.clockSkewCount, + "rtpStats", 
lockedRTPStatsReceiverLogEncoder{r}, ) } r.clockSkewCount++ @@ -414,16 +400,13 @@ func (r *RTPStatsReceiver) SetRtcpSenderReportData(srData *RTCPSenderReportData) var deltaPropagationDelay time.Duration getPropagationFields := func() []interface{} { return []interface{}{ - "propagationDelay", r.propagationDelay.String(), "receivedPropagationDelay", propagationDelay.String(), - "longTermDeltaPropagationDelay", r.longTermDeltaPropagationDelay.String(), "receivedDeltaPropagationDelay", deltaPropagationDelay.String(), "deltaHighCount", r.propagationDelayDeltaHighCount, "sinceDeltaHighStart", time.Since(r.propagationDelayDeltaHighStartTime).String(), "propagationDelaySpike", r.propagationDelaySpike.String(), - "first", r.srFirst, - "last", r.srNewest, "current", &srDataCopy, + "rtpStats", lockedRTPStatsReceiverLogEncoder{r}, } } resetDelta := func() { @@ -537,6 +520,7 @@ func (r *RTPStatsReceiver) GetRtcpReceptionReport(ssrc uint32, proxyFracLost uin r.logger.Warnw( "too many packets expected in receiver report", fmt.Errorf("start: %d, end: %d, expected: %d", then.extStartSN, now.extStartSN, packetsExpected), + "rtpStats", lockedRTPStatsReceiverLogEncoder{r}, ) return nil } @@ -584,7 +568,16 @@ func (r *RTPStatsReceiver) DeltaInfo(snapshotID uint32) *RTPDeltaInfo { r.lock.Lock() defer r.lock.Unlock() - return r.deltaInfo(snapshotID, r.sequenceNumber.GetExtendedStart(), r.sequenceNumber.GetExtendedHighest()) + deltaInfo, err, loggingFields := r.deltaInfo( + snapshotID, + r.sequenceNumber.GetExtendedStart(), + r.sequenceNumber.GetExtendedHighest(), + ) + if err != nil { + r.logger.Infow(err.Error(), append(loggingFields, "rtpStats", lockedRTPStatsReceiverLogEncoder{r})...) 
+ } + + return deltaInfo } func (r *RTPStatsReceiver) MarshalLogObject(e zapcore.ObjectEncoder) error { @@ -595,16 +588,7 @@ func (r *RTPStatsReceiver) MarshalLogObject(e zapcore.ObjectEncoder) error { r.lock.RLock() defer r.lock.RUnlock() - e.AddObject("base", r.rtpStatsBase) - - e.AddUint64("extStartSN", r.sequenceNumber.GetExtendedStart()) - e.AddUint64("extHighestSN", r.sequenceNumber.GetExtendedHighest()) - e.AddUint64("extStartTS", r.timestamp.GetExtendedStart()) - e.AddUint64("extHighestTS", r.timestamp.GetExtendedHighest()) - - e.AddDuration("propagationDelay", r.propagationDelay) - e.AddDuration("longTermDeltaPropagationDelay", r.longTermDeltaPropagationDelay) - return nil + return lockedRTPStatsReceiverLogEncoder{r}.MarshalLogObject(e) } func (r *RTPStatsReceiver) String() string { @@ -635,3 +619,26 @@ func (r *RTPStatsReceiver) isInRange(esn uint64, ehsn uint64) bool { } // ---------------------------------- + +type lockedRTPStatsReceiverLogEncoder struct { + *RTPStatsReceiver +} + +func (r lockedRTPStatsReceiverLogEncoder) MarshalLogObject(e zapcore.ObjectEncoder) error { + if r.RTPStatsReceiver == nil { + return nil + } + + e.AddObject("base", r.rtpStatsBase) + + e.AddUint64("extStartSN", r.sequenceNumber.GetExtendedStart()) + e.AddUint64("extHighestSN", r.sequenceNumber.GetExtendedHighest()) + e.AddUint64("extStartTS", r.timestamp.GetExtendedStart()) + e.AddUint64("extHighestTS", r.timestamp.GetExtendedHighest()) + + e.AddDuration("propagationDelay", r.propagationDelay) + e.AddDuration("longTermDeltaPropagationDelay", r.longTermDeltaPropagationDelay) + return nil +} + +// ---------------------------------- diff --git a/pkg/sfu/buffer/rtpstats_sender.go b/pkg/sfu/buffer/rtpstats_sender.go index 215520f45..ffb420229 100644 --- a/pkg/sfu/buffer/rtpstats_sender.go +++ b/pkg/sfu/buffer/rtpstats_sender.go @@ -277,10 +277,7 @@ func (r *RTPStatsSender) Update( r.logger.Debugw( "rtp sender stream start", - "startTime", r.startTime.String(), - "firstTime", 
r.firstTime.String(), - "startSN", r.extStartSN, - "startTS", r.extStartTS, + "rtpStats", lockedRTPStatsSenderLogEncoder{r}, ) } @@ -289,26 +286,16 @@ func (r *RTPStatsSender) Update( gapSN := int64(extSequenceNumber - r.extHighestSN) getLoggingFields := func() []interface{} { return []interface{}{ - "extStartSN", r.extStartSN, - "extHighestSN", r.extHighestSN, - "extStartTS", r.extStartTS, - "extHighestTS", r.extHighestTS, - "startTime", r.startTime.String(), - "firstTime", r.firstTime.String(), - "highestTime", r.highestTime.String(), - "highestSN", r.extHighestSN, "currSN", extSequenceNumber, "gapSN", gapSN, - "highestTS", r.extHighestTS, "currTS", extTimestamp, "gapTS", int64(extTimestamp - r.extHighestTS), - "packetTime", packetTime.String(), + "packetTime", packetTime, "marker", marker, "hdrSize", hdrSize, "payloadSize", payloadSize, "paddingSize", paddingSize, - "firstSR", r.srFirst, - "lastSR", r.srNewest, + "rtpStats", lockedRTPStatsSenderLogEncoder{r}, } } if gapSN <= 0 { // duplicate OR out-of-order @@ -316,15 +303,6 @@ func (r *RTPStatsSender) Update( // do not start on a padding only packet return } - if -gapSN >= cSequenceNumberLargeJumpThreshold { - if r.largeJumpNegativeCount%100 == 0 { - r.logger.Warnw( - "large sequence number gap negative", nil, - append(getLoggingFields(), "count", r.largeJumpNegativeCount)..., - ) - } - r.largeJumpNegativeCount++ - } if extSequenceNumber < r.extStartSN { r.packetsLost += r.extStartSN - extSequenceNumber @@ -349,9 +327,7 @@ func (r *RTPStatsSender) Update( r.logger.Infow( "adjusting start sequence number", append(getLoggingFields(), - "snBefore", r.extStartSN, "snAfter", extSequenceNumber, - "tsBefore", r.extStartTS, "tsAfter", extTimestamp, )..., ) @@ -371,15 +347,25 @@ func (r *RTPStatsSender) Update( r.packetsLost-- r.setSnInfo(extSequenceNumber, r.extHighestSN, uint16(pktSize), uint8(hdrSize), uint16(payloadSize), marker, true) } + + if !isDuplicate && -gapSN >= cSequenceNumberLargeJumpThreshold { + 
r.largeJumpNegativeCount++ + if (r.largeJumpNegativeCount-1)%100 == 0 { + r.logger.Warnw( + "large sequence number gap negative", nil, + append(getLoggingFields(), "count", r.largeJumpNegativeCount)..., + ) + } + } } else { // in-order if gapSN >= cSequenceNumberLargeJumpThreshold || extTimestamp < r.extHighestTS { - if r.largeJumpCount%100 == 0 { + r.largeJumpCount++ + if (r.largeJumpCount-1)%100 == 0 { r.logger.Warnw( "large sequence number gap OR time reversed", nil, append(getLoggingFields(), "count", r.largeJumpCount)..., ) } - r.largeJumpCount++ } // update gap histogram @@ -398,9 +384,7 @@ func (r *RTPStatsSender) Update( r.logger.Infow( "adjusting start timestamp", append(getLoggingFields(), - "snBefore", r.extStartSN, "snAfter", extSequenceNumber, - "tsBefore", r.extStartTS, "tsAfter", extTimestamp, )..., ) @@ -471,12 +455,9 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt if !r.lastRRTime.IsZero() && r.extHighestSNFromRR > extHighestSNFromRR { r.logger.Debugw( fmt.Sprintf("receiver report potentially out of order, highestSN: existing: %d, received: %d", r.extHighestSNFromRR, extHighestSNFromRR), - "lastRRTime", r.lastRRTime.String(), - "lastRR", r.lastRR, "sinceLastRR", time.Since(r.lastRRTime).String(), "receivedRR", rr, - "firstSR", r.srFirst, - "lastSR", r.srNewest, + "rtpStats", lockedRTPStatsSenderLogEncoder{r}, ) return } @@ -538,24 +519,11 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt } r.logger.Infow( "rr interval too big, skipping", - "lastRRTime", r.lastRRTime.String(), - "lastRR", r.lastRR, "timeSinceLastRR", timeSinceLastRR.String(), "receivedRR", rr, - "extStartSN", r.extStartSN, - "extHighestSN", r.extHighestSN, - "extStartTS", r.extStartTS, - "extHighestTS", r.extHighestTS, - "extLastRRSN", s.extLastRRSN, - "firstTime", r.firstTime.String(), - "startTime", r.startTime.String(), - "highestTime", r.highestTime.String(), "extReceivedRRSN", extReceivedRRSN, 
"packetsInInterval", extReceivedRRSN-s.extLastRRSN, - "extHighestSNFromRR", r.extHighestSNFromRR, - "packetsLostFromRR", r.packetsLostFromRR, - "firstSR", r.srFirst, - "lastSR", r.srNewest, + "rtpStats", lockedRTPStatsSenderLogEncoder{r}, ) continue } @@ -572,27 +540,14 @@ func (r *RTPStatsSender) UpdateFromReceiverReport(rr rtcp.ReceptionReport) (rtt if r.metadataCacheOverflowCount%10 == 0 { r.logger.Infow( "metadata cache overflow", - "lastRRTime", r.lastRRTime.String(), - "lastRR", r.lastRR, "timeSinceLastRR", timeSinceLastRR.String(), "receivedRR", rr, - "extStartSN", r.extStartSN, - "extHighestSN", r.extHighestSN, - "extStartTS", r.extStartTS, - "extHighestTS", r.extHighestTS, - "extLastRRSN", s.extLastRRSN, - "firstTime", r.firstTime.String(), - "startTime", r.startTime.String(), - "highestTime", r.highestTime.String(), "extReceivedRRSN", extReceivedRRSN, "packetsInInterval", extReceivedRRSN-s.extLastRRSN, "intervalStats", is.ToString(), "aggregateIntervalStats", eis.ToString(), - "extHighestSNFromRR", r.extHighestSNFromRR, - "packetsLostFromRR", r.packetsLostFromRR, "count", r.metadataCacheOverflowCount, - "firstSR", r.srFirst, - "lastSR", r.srNewest, + "rtpStats", lockedRTPStatsSenderLogEncoder{r}, ) } r.metadataCacheOverflowCount++ @@ -670,22 +625,17 @@ func (r *RTPStatsSender) GetRtcpSenderReport(ssrc uint32, publisherSRData *RTCPS getFields := func() []interface{} { return []interface{}{ - "first", r.srFirst, - "last", r.srNewest, "curr", srData, "feed", publisherSRData, "tsOffset", tsOffset, "timeNow", time.Now().String(), "now", now.String(), - "extStartTS", r.extStartTS, - "extHighestTS", r.extHighestTS, - "highestTime", r.highestTime.String(), "timeSinceHighest", now.Sub(r.highestTime).String(), - "firstTime", r.firstTime.String(), "timeSinceFirst", now.Sub(r.firstTime).String(), "timeSincePublisherSRAdjusted", timeSincePublisherSRAdjusted.String(), "timeSincePublisherSR", time.Since(publisherSRData.At).String(), "nowRTPExt", nowRTPExt, + "rtpStats", 
lockedRTPStatsSenderLogEncoder{r}, } } if r.srNewest != nil && nowRTPExt >= r.srNewest.RTPTimestampExt { @@ -732,7 +682,16 @@ func (r *RTPStatsSender) DeltaInfo(snapshotID uint32) *RTPDeltaInfo { r.lock.Lock() defer r.lock.Unlock() - return r.deltaInfo(snapshotID, r.extStartSN, r.extHighestSN) + deltaInfo, err, loggingFields := r.deltaInfo( + snapshotID, + r.extStartSN, + r.extHighestSN, + ) + if err != nil { + r.logger.Infow(err.Error(), append(loggingFields, "rtpStats", lockedRTPStatsSenderLogEncoder{r})...) + } + + return deltaInfo } func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo { @@ -760,8 +719,7 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo "startTime", startTime.String(), "endTime", endTime.String(), "duration", endTime.Sub(startTime).String(), - "firstSR", r.srFirst, - "lastSR", r.srNewest, + "rtpStats", lockedRTPStatsSenderLogEncoder{r}, ) return nil } @@ -789,6 +747,7 @@ func (r *RTPStatsSender) DeltaInfoSender(senderSnapshotID uint32) *RTPDeltaInfo packetsLost, packetsLostFeed, ), + "rtpStats", lockedRTPStatsSenderLogEncoder{r}, ) packetsLost = packetsExpected } @@ -832,18 +791,7 @@ func (r *RTPStatsSender) MarshalLogObject(e zapcore.ObjectEncoder) error { r.lock.RLock() defer r.lock.RUnlock() - e.AddObject("base", r.rtpStatsBase) - e.AddUint64("extStartSN", r.extStartSN) - e.AddUint64("extHighestSN", r.extHighestSN) - e.AddUint64("extStartTS", r.extStartTS) - e.AddUint64("extHighestTS", r.extHighestTS) - e.AddTime("lastRRTime", r.lastRRTime) - e.AddReflected("lastRR", r.lastRR) - e.AddUint64("extHighestSNFromRR", r.extHighestSNFromRR) - e.AddUint64("packetsLostFromRR", r.packetsLostFromRR) - e.AddFloat64("jitterFromRR", r.jitterFromRR) - e.AddFloat64("maxJitterFromRR", r.maxJitterFromRR) - return nil + return lockedRTPStatsSenderLogEncoder{r}.MarshalLogObject(e) } func (r *RTPStatsSender) String() string { @@ -1026,3 +974,26 @@ func (r *RTPStatsSender) getIntervalStats( } // 
------------------------------------------------------------------- + +type lockedRTPStatsSenderLogEncoder struct { + *RTPStatsSender +} + +func (r lockedRTPStatsSenderLogEncoder) MarshalLogObject(e zapcore.ObjectEncoder) error { + if r.RTPStatsSender == nil { + return nil + } + + e.AddObject("base", r.rtpStatsBase) + e.AddUint64("extStartSN", r.extStartSN) + e.AddUint64("extHighestSN", r.extHighestSN) + e.AddUint64("extStartTS", r.extStartTS) + e.AddUint64("extHighestTS", r.extHighestTS) + e.AddTime("lastRRTime", r.lastRRTime) + e.AddReflected("lastRR", r.lastRR) + e.AddUint64("extHighestSNFromRR", r.extHighestSNFromRR) + e.AddUint64("packetsLostFromRR", r.packetsLostFromRR) + e.AddFloat64("jitterFromRR", r.jitterFromRR) + e.AddFloat64("maxJitterFromRR", r.maxJitterFromRR) + return nil +} diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 434deb980..a04b19b5d 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -121,8 +121,10 @@ type WebRTCReceiver struct { connectionStats *connectionquality.ConnectionStats - onStatsUpdate func(w *WebRTCReceiver, stat *livekit.AnalyticsStat) - onMaxLayerChange func(maxLayer int32) + onStatsUpdate func(w *WebRTCReceiver, stat *livekit.AnalyticsStat) + onMaxLayerChange func(maxLayer int32) + downtrackEverAdded atomic.Bool + onDowntrackEverAdded func() primaryReceiver atomic.Pointer[RedPrimaryReceiver] redReceiver atomic.Pointer[RedReceiver] @@ -194,6 +196,13 @@ func WithForwardStats(forwardStats *ForwardStats) ReceiverOpts { } } +func WithEverHasDowntrackAdded(f func()) ReceiverOpts { + return func(w *WebRTCReceiver) *WebRTCReceiver { + w.onDowntrackEverAdded = f + return w + } +} + // NewWebRTCReceiver creates a new webrtc track receiver func NewWebRTCReceiver( receiver *webrtc.RTPReceiver, @@ -430,9 +439,16 @@ func (w *WebRTCReceiver) AddDownTrack(track TrackSender) error { w.downTrackSpreader.Store(track) w.logger.Debugw("downtrack added", "subscriberID", track.SubscriberID()) + w.handleDowntrackAdded() return 
nil } +func (w *WebRTCReceiver) handleDowntrackAdded() { + if !w.downtrackEverAdded.Swap(true) && w.onDowntrackEverAdded != nil { + w.onDowntrackEverAdded() + } +} + func (w *WebRTCReceiver) notifyMaxExpectedLayer(layer int32) { ti := w.TrackInfo() if ti == nil { @@ -811,6 +827,7 @@ func (w *WebRTCReceiver) GetPrimaryReceiverForRed() TrackReceiver { w.bufferMu.Lock() w.redPktWriter = pr.ForwardRTP w.bufferMu.Unlock() + w.handleDowntrackAdded() } } return w.primaryReceiver.Load() @@ -830,6 +847,7 @@ func (w *WebRTCReceiver) GetRedReceiver() TrackReceiver { w.bufferMu.Lock() w.redPktWriter = pr.ForwardRTP w.bufferMu.Unlock() + w.handleDowntrackAdded() } } return w.redReceiver.Load() diff --git a/test/agent_test.go b/test/agent_test.go index 88e44b19c..7db5b4b6f 100644 --- a/test/agent_test.go +++ b/test/agent_test.go @@ -35,7 +35,6 @@ func TestAgents(t *testing.T) { _, finish := setupSingleNodeTest("TestAgents") defer finish() - ac1, err := newAgentClient(agentToken(), defaultServerPort) require.NoError(t, err) ac2, err := newAgentClient(agentToken(), defaultServerPort) @@ -48,10 +47,10 @@ func TestAgents(t *testing.T) { defer ac2.close() defer ac3.close() defer ac4.close() - ac1.Run(livekit.JobType_JT_ROOM, "namespace") - ac2.Run(livekit.JobType_JT_ROOM, "namespace") - ac3.Run(livekit.JobType_JT_PUBLISHER, "namespace") - ac4.Run(livekit.JobType_JT_PUBLISHER, "namespace") + ac1.Run(livekit.JobType_JT_ROOM, "default") + ac2.Run(livekit.JobType_JT_ROOM, "default") + ac3.Run(livekit.JobType_JT_PUBLISHER, "default") + ac4.Run(livekit.JobType_JT_PUBLISHER, "default") testutils.WithTimeout(t, func() string { if ac1.registered.Load() != 1 || ac2.registered.Load() != 1 || ac3.registered.Load() != 1 || ac4.registered.Load() != 1 { @@ -83,7 +82,7 @@ func TestAgents(t *testing.T) { } return "" - }, 6 * time.Second) + }, 6*time.Second) // publish 2 tracks t3, err := c2.AddStaticTrack("audio/opus", "audio", "micro") @@ -116,9 +115,24 @@ func TestAgentNamespaces(t *testing.T) { 
require.NoError(t, err) defer ac1.close() defer ac2.close() - ac1.Run(livekit.JobType_JT_ROOM, "namespace") + ac1.Run(livekit.JobType_JT_ROOM, "namespace1") ac2.Run(livekit.JobType_JT_ROOM, "namespace2") + _, err = roomClient.CreateRoom(contextWithToken(createRoomToken()), &livekit.CreateRoomRequest{ + Name: testRoom, + Agent: &livekit.RoomAgent{ + Agents: []*livekit.CreateAgentJobDefinitionRequest{ + &livekit.CreateAgentJobDefinitionRequest{ + Namespace: "namespace1", + }, + &livekit.CreateAgentJobDefinitionRequest{ + Namespace: "namespace2", + }, + }, + }, + }) + require.NoError(t, err) + testutils.WithTimeout(t, func() string { if ac1.registered.Load() != 1 || ac2.registered.Load() != 1 { return "worker not registered" @@ -137,7 +151,7 @@ func TestAgentNamespaces(t *testing.T) { job1 := <-ac1.requestedJobs job2 := <-ac2.requestedJobs - if job1.Namespace != "namespace" { + if job1.Namespace != "namespace1" { return "namespace is not 'namespace'" } @@ -163,8 +177,8 @@ func TestAgentMultiNode(t *testing.T) { ac2, err := newAgentClient(agentToken(), defaultServerPort) defer ac1.close() defer ac2.close() - ac1.Run(livekit.JobType_JT_ROOM, "namespace") - ac2.Run(livekit.JobType_JT_PUBLISHER, "namespace") + ac1.Run(livekit.JobType_JT_ROOM, "default") + ac2.Run(livekit.JobType_JT_PUBLISHER, "default") testutils.WithTimeout(t, func() string { if ac1.registered.Load() != 1 || ac2.registered.Load() != 1 { diff --git a/version/version.go b/version/version.go index 35b5cfe1f..4420678ab 100644 --- a/version/version.go +++ b/version/version.go @@ -14,4 +14,4 @@ package version -const Version = "1.6.2" +const Version = "1.7.0"