diff --git a/CHANGELOG b/CHANGELOG index cac84e150..1947140a6 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -2,6 +2,40 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.5.1] - 2023-11-09 + +Support for the Agent framework. + +### Added +- PSRPC based room and participant service. disabled by default (#2171 #2205) +- Add configuration to limit MaxBufferedAmount for data channel (#2170) +- Agent framework worker support (#2203 #2227 #2230 #2231 #2232) + +### Fixed +- Fixed panic in StreamTracker when SVC is used (#2147) +- fix CreateEgress not completing (#2156) +- Do not update highest time on padding packet. (#2157) +- Clear flags in packet metadata cache before setting them. (#2160) +- Drop not relevant packet only if contiguous. (#2167) +- Fixed edge cases in SVC codec support (#2176 #2185 #2191 #2196 #2197 #2214 #2215 #2216 #2218 #2219) +- Do not post to closed channels. (#2179) +- Only launch room egress once (#2175) +- Remove un-preferred codecs for android firefox (#2183) +- Fix pre-extended value on wrap back restart. (#2202) +- Declare audio inactive if stale. (#2229) + +### Changed +- Defer close of source and sink to prevent error logs. (#2149) +- Continued AV Sync improvements (#2150 #2153) +- Egress store/IO cleanup (required for Egress 1.8.0) (#2152) +- More fine grained filtering NACKs after a key frame. (#2159) +- Don't filter out ipv6 address for client don't support prflx over relay (#2193) +- Disable h264 for android firefox (#2190) +- Do not block on down track close with flush. (#2201) +- Separate publish and subscribe enabled codecs for finer grained control. (#2217) +- improve participant hidden (#2220) +- Reject migration if codec mismatch with published tracks (#2225) + ## [1.5.0] - 2023-10-15 ### Added diff --git a/go.mod b/go.mod index df947357d..c93bc5d8c 100644 --- a/go.mod +++ b/go.mod @@ -12,29 +12,29 @@ require ( github.com/gammazero/deque v0.2.1 github.com/gammazero/workerpool v1.1.3 github.com/google/wire v0.5.0 - github.com/gorilla/websocket v1.5.0 + github.com/gorilla/websocket v1.5.1 github.com/hashicorp/go-version v1.6.0 github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/jxskiss/base62 v1.1.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e - github.com/livekit/protocol v1.9.1-0.20231107185101-e230ee2d840e - github.com/livekit/psrpc v0.5.0 + github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7 + github.com/livekit/psrpc v0.5.2 github.com/mackerelio/go-osstat v0.2.4 github.com/magefile/mage v1.15.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.7.0 github.com/mitchellh/go-homedir v1.1.0 github.com/olekukonko/tablewriter v0.0.5 - github.com/pion/dtls/v2 v2.2.7 + github.com/pion/dtls/v2 v2.2.8 github.com/pion/ice/v2 v2.3.11 github.com/pion/interceptor v0.1.25 - github.com/pion/rtcp v1.2.10 - github.com/pion/rtp v1.8.2 + github.com/pion/rtcp v1.2.12 + github.com/pion/rtp v1.8.3 github.com/pion/sctp v1.8.9 github.com/pion/sdp/v3 v3.0.6 github.com/pion/transport/v2 v2.2.4 github.com/pion/turn/v2 v2.1.4 - github.com/pion/webrtc/v3 v3.2.21 + github.com/pion/webrtc/v3 v3.2.23 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.17.0 github.com/redis/go-redis/v9 v9.3.0 @@ -46,7 +46,7 @@ require ( github.com/urfave/cli/v2 v2.25.7 github.com/urfave/negroni/v3 v3.0.0 go.uber.org/atomic v1.11.0 - golang.org/x/exp v0.0.0-20231006140011-7918f672742d + golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa golang.org/x/sync v0.5.0 google.golang.org/protobuf v1.31.0 gopkg.in/yaml.v3 v3.0.1 @@ -60,18 +60,18 @@ require ( github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/eapache/channels v1.1.0 // indirect github.com/eapache/queue v1.1.0 // indirect - github.com/go-jose/go-jose/v3 v3.0.0 // indirect + github.com/go-jose/go-jose/v3 v3.0.1 // indirect github.com/go-logr/logr v1.3.0 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/go-cmp v0.5.9 // indirect github.com/google/subcommands v1.2.0 // indirect github.com/google/uuid v1.3.1 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect - github.com/hashicorp/go-retryablehttp v0.7.4 // indirect + github.com/hashicorp/go-retryablehttp v0.7.5 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/josharian/native v1.1.0 // indirect github.com/klauspost/compress v1.17.2 // indirect - github.com/klauspost/cpuid/v2 v2.2.5 // indirect + github.com/klauspost/cpuid/v2 v2.2.6 // indirect github.com/lithammer/shortuuid/v4 v4.0.0 // indirect github.com/mattn/go-runewidth v0.0.9 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect @@ -84,7 +84,7 @@ require ( github.com/pion/logging v0.2.2 // indirect github.com/pion/mdns v0.0.8 // indirect github.com/pion/randutil v0.1.0 // indirect - github.com/pion/srtp/v2 v2.0.17 // indirect + github.com/pion/srtp/v2 v2.0.18 // indirect github.com/pion/stun v0.6.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect @@ -95,13 +95,13 @@ require ( github.com/zeebo/xxh3 v1.0.2 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect - golang.org/x/crypto v0.14.0 // indirect - golang.org/x/mod v0.13.0 // indirect - golang.org/x/net v0.17.0 // indirect - golang.org/x/sys v0.13.0 // indirect - golang.org/x/text v0.13.0 // indirect - golang.org/x/tools v0.14.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 // indirect + golang.org/x/crypto v0.15.0 // indirect + golang.org/x/mod v0.14.0 // indirect + golang.org/x/net v0.18.0 // indirect + golang.org/x/sys v0.14.0 // indirect + golang.org/x/text v0.14.0 // indirect + golang.org/x/tools v0.15.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect google.golang.org/grpc v1.59.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index c59b38f48..909a503d1 100644 --- a/go.sum +++ b/go.sum @@ -40,8 +40,8 @@ github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0 github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU= github.com/gammazero/workerpool v1.1.3 h1:WixN4xzukFoN0XSeXF6puqEqFTl2mECI9S6W44HWy9Q= github.com/gammazero/workerpool v1.1.3/go.mod h1:wPjyBLDbyKnUn2XwwyD3EEwo9dHutia9/fwNmSHWACc= -github.com/go-jose/go-jose/v3 v3.0.0 h1:s6rrhirfEP/CGIoc6p+PZAeogN2SxKav6Wp7+dyMWVo= -github.com/go-jose/go-jose/v3 v3.0.0/go.mod h1:RNkWWRld676jZEYoV3+XK8L2ZnNSvIsxFMht0mSX+u8= +github.com/go-jose/go-jose/v3 v3.0.1 h1:pWmKFVtt+Jl0vBZTIpz/eAKwsm6LkIxDVVbFHKkchhA= +github.com/go-jose/go-jose/v3 v3.0.1/go.mod h1:RNkWWRld676jZEYoV3+XK8L2ZnNSvIsxFMht0mSX+u8= github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= @@ -76,14 +76,14 @@ github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/wire v0.5.0 h1:I7ELFeVBr3yfPIcc8+MWvrjk+3VjbcSzoXm3JVa+jD8= github.com/google/wire v0.5.0/go.mod h1:ngWDr9Qvq3yZA10YrxfyGELY/AFWGVpy9c1LTRi1EoU= -github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= -github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= +github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI= github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= -github.com/hashicorp/go-retryablehttp v0.7.4 h1:ZQgVdpTdAL7WpMIwLzCfbalOcSUdkDZnpUv3/+BxzFA= -github.com/hashicorp/go-retryablehttp v0.7.4/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8= +github.com/hashicorp/go-retryablehttp v0.7.5 h1:bJj+Pj19UZMIweq/iie+1u5YCdGrnxCT9yvm0e+Nd5M= +github.com/hashicorp/go-retryablehttp v0.7.5/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8= github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= @@ -109,8 +109,8 @@ github.com/jxskiss/base62 v1.1.0 h1:A5zbF8v8WXx2xixnAKD2w+abC+sIzYJX+nxmhA6HWFw= github.com/jxskiss/base62 v1.1.0/go.mod h1:HhWAlUXvxKThfOlZbcuFzsqwtF5TcqS9ru3y5GfjWAc= github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= -github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc= +github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= @@ -125,10 +125,10 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= -github.com/livekit/protocol v1.9.1-0.20231107185101-e230ee2d840e h1:YShBpEjkEBY7yil2gjMWlkVkxs3OI58LIIYsBdb8aBU= -github.com/livekit/protocol v1.9.1-0.20231107185101-e230ee2d840e/go.mod h1:l2WjlZWErS6vBlQaQyCGwWLt1aOx10XfQTsmvLjJWFQ= -github.com/livekit/psrpc v0.5.0 h1:g+yYNSs6Y1/vM7UlFkB2s/ARe2y3RKWZhX8ata5j+eo= -github.com/livekit/psrpc v0.5.0/go.mod h1:1XYH1LLoD/YbvBvt6xg2KQ/J3InLXSJK6PL/+DKmuAU= +github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7 h1:M/ljEz6MCH5lovoTT0t6hyaaZJEn4hvXs9J9OtQ+gS4= +github.com/livekit/protocol v1.9.2-0.20231116141704-aa43aa7482d7/go.mod h1:JgFHHd99wgEp4smATlJupOdA7iJHFoj2g3RFeM/Hk8M= +github.com/livekit/psrpc v0.5.2 h1:+MvG8Otm/J6MTg2MP/uuMbrkxOWsrj2hDhu/I1VIU1U= +github.com/livekit/psrpc v0.5.2/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= github.com/mackerelio/go-osstat v0.2.4/go.mod h1:Zy+qzGdZs3A9cuIqmgbJvwbmLQH9dJvtio5ZjJTbdlQ= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= @@ -180,11 +180,11 @@ github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAl github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI= github.com/pion/datachannel v1.5.5 h1:10ef4kwdjije+M9d7Xm9im2Y3O6A6ccQb0zcqZcJew8= github.com/pion/datachannel v1.5.5/go.mod h1:iMz+lECmfdCMqFRhXhcA/219B0SQlbpoR2V118yimL0= -github.com/pion/dtls/v2 v2.2.7 h1:cSUBsETxepsCSFSxC3mc/aDo14qQLMSL+O6IjG28yV8= github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= +github.com/pion/dtls/v2 v2.2.8 h1:BUroldfiIbV9jSnC6cKOMnyiORRWrWWpV11JUyEu5OA= +github.com/pion/dtls/v2 v2.2.8/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= github.com/pion/ice/v2 v2.3.11 h1:rZjVmUwyT55cmN8ySMpL7rsS8KYsJERsrxJLLxpKhdw= github.com/pion/ice/v2 v2.3.11/go.mod h1:hPcLC3kxMa+JGRzMHqQzjoSj3xtE9F+eoncmXLlCL4E= -github.com/pion/interceptor v0.1.18/go.mod h1:tpvvF4cPM6NGxFA1DUMbhabzQBxdWMATDGEUYOR9x6I= github.com/pion/interceptor v0.1.25 h1:pwY9r7P6ToQ3+IF0bajN0xmk/fNw/suTgaTdlwTDmhc= github.com/pion/interceptor v0.1.25/go.mod h1:wkbPYAak5zKsfpVDYMtEfWEy8D4zL+rpxCxPImLOg3Y= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= @@ -193,19 +193,20 @@ github.com/pion/mdns v0.0.8 h1:HhicWIg7OX5PVilyBO6plhMetInbzkVJAhbdJiAeVaI= github.com/pion/mdns v0.0.8/go.mod h1:hYE72WX8WDveIhg7fmXgMKivD3Puklk0Ymzog0lSyaI= github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= -github.com/pion/rtcp v1.2.10 h1:nkr3uj+8Sp97zyItdN60tE/S6vk4al5CPRR6Gejsdjc= github.com/pion/rtcp v1.2.10/go.mod h1:ztfEwXZNLGyF1oQDttz/ZKIBaeeg/oWbRYqzBM9TL1I= -github.com/pion/rtp v1.8.1/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU= -github.com/pion/rtp v1.8.2 h1:oKMM0K1/QYQ5b5qH+ikqDSZRipP5mIxPJcgcvw5sH0w= +github.com/pion/rtcp v1.2.12 h1:bKWiX93XKgDZENEXCijvHRU/wRifm6JV5DGcH6twtSM= +github.com/pion/rtcp v1.2.12/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4= github.com/pion/rtp v1.8.2/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU= +github.com/pion/rtp v1.8.3 h1:VEHxqzSVQxCkKDSHro5/4IUUG1ea+MFdqR2R3xSpNU8= +github.com/pion/rtp v1.8.3/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU= github.com/pion/sctp v1.8.5/go.mod h1:SUFFfDpViyKejTAdwD1d/HQsCu+V/40cCs2nZIvC3s0= github.com/pion/sctp v1.8.8/go.mod h1:igF9nZBrjh5AtmKc7U30jXltsFHicFCXSmWA2GWRaWs= github.com/pion/sctp v1.8.9 h1:TP5ZVxV5J7rz7uZmbyvnUvsn7EJ2x/5q9uhsTtXbI3g= github.com/pion/sctp v1.8.9/go.mod h1:cMLT45jqw3+jiJCrtHVwfQLnfR0MGZ4rgOJwUOIqLkI= github.com/pion/sdp/v3 v3.0.6 h1:WuDLhtuFUUVpTfus9ILC4HRyHsW6TdugjEX/QY9OiUw= github.com/pion/sdp/v3 v3.0.6/go.mod h1:iiFWFpQO8Fy3S5ldclBkpXqmWy02ns78NOKoLLL0YQw= -github.com/pion/srtp/v2 v2.0.17 h1:ECuOk+7uIpY6HUlTb0nXhfvu4REG2hjtC4ronYFCZE4= -github.com/pion/srtp/v2 v2.0.17/go.mod h1:y5WSHcJY4YfNB/5r7ca5YjHeIr1H3LM1rKArGGs8jMc= +github.com/pion/srtp/v2 v2.0.18 h1:vKpAXfawO9RtTRKZJbG4y0v1b11NZxQnxRl85kGuUlo= +github.com/pion/srtp/v2 v2.0.18/go.mod h1:0KJQjA99A6/a0DOVTu1PhDSw0CXF2jTkqOoMg3ODqdA= github.com/pion/stun v0.6.1 h1:8lp6YejULeHBF8NmV8e2787BogQhduZugh5PdhDyyN4= github.com/pion/stun v0.6.1/go.mod h1:/hO7APkX4hZKu/D0f2lHzNyvdkTGtIy3NDmLR7kSz/8= github.com/pion/transport v0.14.1 h1:XSM6olwW+o8J4SCmOBb/BpwZypkHeyM0PGFCxNQBr40= @@ -220,8 +221,8 @@ github.com/pion/transport/v3 v3.0.1/go.mod h1:UY7kiITrlMv7/IKgd5eTUcaahZx5oUN3l9 github.com/pion/turn/v2 v2.1.3/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY= github.com/pion/turn/v2 v2.1.4 h1:2xn8rduI5W6sCZQkEnIUDAkrBQNl2eYIBCHMZ3QMmP8= github.com/pion/turn/v2 v2.1.4/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY= -github.com/pion/webrtc/v3 v3.2.21 h1:c8fy5JcqJkAQBwwy3Sk9huQLTBUSqaggyRlv9Lnh2zY= -github.com/pion/webrtc/v3 v3.2.21/go.mod h1:vVURQTBOG5BpWKOJz3nlr23NfTDeyKVmubRNqzQp+Tg= +github.com/pion/webrtc/v3 v3.2.23 h1:GbqEuxBbVLFhXk0GwxKAoaIJYiEa9TyoZPEZC+2HZxM= +github.com/pion/webrtc/v3 v3.2.23/go.mod h1:1CaT2fcZzZ6VZA+O1i9yK2DU4EOcXVvSbWG9pr5jefs= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -290,15 +291,15 @@ golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I= golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= -golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= -golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= -golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= -golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= +golang.org/x/crypto v0.15.0 h1:frVn1TEaCEaZcn3Tmd7Y2b5KKPaZ+I32Q2OA3kYp5TA= +golang.org/x/crypto v0.15.0/go.mod h1:4ChreQoLWfG3xLDer1WdlH5NdlQ3+mwnQq1YTKY+72g= +golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa h1:FRnLl4eNAQl8hwxVVC17teOw8kdjVDVAiFMtgUdTSRQ= +golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.13.0 h1:I/DsJXRlw/8l/0c24sM9yb0T4z9liZTduXvdAWYiysY= -golang.org/x/mod v0.13.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0= +golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -326,8 +327,8 @@ golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ= golang.org/x/net v0.13.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg= +golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -377,8 +378,8 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= +golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -398,22 +399,22 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190422233926-fe54fb35175b/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.14.0 h1:jvNa2pY0M4r62jkRQ6RwEZZyPcymeL9XZMLBbV7U2nc= -golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg= +golang.org/x/tools v0.15.0 h1:zdAyfUGbYmuVokhzVmghFl2ZJh5QhcfebBgmVPFYA+8= +golang.org/x/tools v0.15.0/go.mod h1:hpksKq4dtpQWS1uQ61JkdqWM3LscIS6Slf+VVkm+wQk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 h1:AB/lmRny7e2pLhFEYIbl5qkDAUt2h0ZRO4wGPhZf+ik= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405/go.mod h1:67X1fPuzjcrkymZzZV1vvkFeTn2Rvc6lYF9MYFGCcwE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 h1:Jyp0Hsi0bmHXG6k9eATXoYtjd6e2UzZ1SCn/wIupY14= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:oQ5rr10WTTMvP4A36n8JpR1OrO1BEiV4f78CneXZxkA= google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= diff --git a/pkg/config/config.go b/pkg/config/config.go index a87caf55b..a0c882f14 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -66,6 +66,7 @@ type Config struct { Room RoomConfig `yaml:"room,omitempty"` TURN TURNConfig `yaml:"turn,omitempty"` Ingress IngressConfig `yaml:"ingress,omitempty"` + SIP SIPConfig `yaml:"sip,omitempty"` WebHook WebHookConfig `yaml:"webhook,omitempty"` NodeSelector NodeSelectorConfig `yaml:"node_selector,omitempty"` KeyFile string `yaml:"key_file,omitempty"` @@ -294,6 +295,9 @@ type IngressConfig struct { WHIPBaseURL string `yaml:"whip_base_url,omitempty"` } +type SIPConfig struct { +} + // not exposed to YAML type APIConfig struct { // amount of time to wait for API to execute, default 2s diff --git a/pkg/rtc/participant.go b/pkg/rtc/participant.go index 404cbf8a2..09f3c77b9 100644 --- a/pkg/rtc/participant.go +++ b/pkg/rtc/participant.go @@ -786,7 +786,7 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea p.TransportManager.Close() }() - p.dataChannelStats.Report() + p.dataChannelStats.Stop() return nil } diff --git a/pkg/rtc/room_test.go b/pkg/rtc/room_test.go index b86527583..36322cbc1 100644 --- a/pkg/rtc/room_test.go +++ b/pkg/rtc/room_test.go @@ -83,7 +83,7 @@ func TestJoinedState(t *testing.T) { func TestRoomJoin(t *testing.T) { t.Run("joining returns existing participant data", func(t *testing.T) { rm := newRoomWithParticipants(t, testRoomOpts{num: numParticipants}) - pNew := newMockParticipant("new", types.CurrentProtocol, false, false) + pNew := NewMockParticipant("new", types.CurrentProtocol, false, false) _ = rm.Join(pNew, nil, nil, iceServersForRoom) @@ -98,7 +98,7 @@ func TestRoomJoin(t *testing.T) { t.Run("subscribe to existing channels upon join", func(t *testing.T) { numExisting := 3 rm := newRoomWithParticipants(t, testRoomOpts{num: numExisting}) - p := newMockParticipant("new", types.CurrentProtocol, false, false) + p := NewMockParticipant("new", types.CurrentProtocol, false, false) err := rm.Join(p, nil, &ParticipantOptions{AutoSubscribe: true}, iceServersForRoom) require.NoError(t, err) @@ -154,7 +154,7 @@ func TestRoomJoin(t *testing.T) { rm.lock.Lock() rm.protoRoom.MaxParticipants = 1 rm.lock.Unlock() - p := newMockParticipant("second", types.ProtocolVersion(0), false, false) + p := NewMockParticipant("second", types.ProtocolVersion(0), false, false) err := rm.Join(p, nil, nil, iceServersForRoom) require.Equal(t, ErrMaxParticipantsExceeded, err) @@ -414,7 +414,7 @@ func TestNewTrack(t *testing.T) { pub := participants[2].(*typesfakes.FakeLocalParticipant) // pub adds track - track := newMockTrack(livekit.TrackType_VIDEO, "webcam") + track := NewMockTrack(livekit.TrackType_VIDEO, "webcam") trackCB := pub.OnTrackPublishedArgsForCall(0) require.NotNil(t, trackCB) trackCB(pub, track) @@ -653,7 +653,7 @@ func TestHiddenParticipants(t *testing.T) { rm := newRoomWithParticipants(t, testRoomOpts{num: 2, numHidden: 1}) defer rm.Close() - pNew := newMockParticipant("new", types.CurrentProtocol, false, false) + pNew := NewMockParticipant("new", types.CurrentProtocol, false, false) rm.Join(pNew, nil, nil, iceServersForRoom) // expect new participant to get a JoinReply @@ -667,7 +667,7 @@ func TestHiddenParticipants(t *testing.T) { t.Run("hidden participant subscribes to tracks", func(t *testing.T) { rm := newRoomWithParticipants(t, testRoomOpts{num: 2}) - hidden := newMockParticipant("hidden", types.CurrentProtocol, true, false) + hidden := NewMockParticipant("hidden", types.CurrentProtocol, true, false) err := rm.Join(hidden, nil, &ParticipantOptions{AutoSubscribe: true}, iceServersForRoom) require.NoError(t, err) @@ -689,7 +689,7 @@ func TestRoomUpdate(t *testing.T) { p1 := rm.GetParticipants()[0].(*typesfakes.FakeLocalParticipant) require.Equal(t, 0, p1.SendRoomUpdateCallCount()) - p2 := newMockParticipant("p2", types.CurrentProtocol, false, false) + p2 := NewMockParticipant("p2", types.CurrentProtocol, false, false) require.NoError(t, rm.Join(p2, nil, nil, iceServersForRoom)) // p1 should have received an update @@ -743,7 +743,7 @@ func newRoomWithParticipants(t *testing.T, opts testRoomOpts) *Room { ) for i := 0; i < opts.num+opts.numHidden; i++ { identity := livekit.ParticipantIdentity(fmt.Sprintf("p%d", i)) - participant := newMockParticipant(identity, opts.protocol, i >= opts.num, true) + participant := NewMockParticipant(identity, opts.protocol, i >= opts.num, true) err := rm.Join(participant, nil, &ParticipantOptions{AutoSubscribe: true}, iceServersForRoom) require.NoError(t, err) participant.StateReturns(livekit.ParticipantInfo_ACTIVE) diff --git a/pkg/rtc/helper_test.go b/pkg/rtc/testutils.go similarity index 85% rename from pkg/rtc/helper_test.go rename to pkg/rtc/testutils.go index 4a0087ea1..fe5c10ad6 100644 --- a/pkg/rtc/helper_test.go +++ b/pkg/rtc/testutils.go @@ -16,6 +16,7 @@ package rtc import ( "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" "github.com/livekit/protocol/utils" "github.com/livekit/livekit-server/pkg/rtc/types" @@ -27,7 +28,7 @@ func init() { prometheus.Init("test", livekit.NodeType_SERVER, "test") } -func newMockParticipant(identity livekit.ParticipantIdentity, protocol types.ProtocolVersion, hidden bool, publisher bool) *typesfakes.FakeLocalParticipant { +func NewMockParticipant(identity livekit.ParticipantIdentity, protocol types.ProtocolVersion, hidden bool, publisher bool) *typesfakes.FakeLocalParticipant { p := &typesfakes.FakeLocalParticipant{} sid := utils.NewGuid(utils.ParticipantPrefix) p.IDReturns(livekit.ParticipantID(sid)) @@ -44,6 +45,12 @@ func newMockParticipant(identity livekit.ParticipantIdentity, protocol types.Pro State: livekit.ParticipantInfo_JOINED, IsPublisher: publisher, }) + p.ToProtoWithVersionReturns(&livekit.ParticipantInfo{ + Sid: sid, + Identity: string(identity), + State: livekit.ParticipantInfo_JOINED, + IsPublisher: publisher, + }, utils.TimedVersion{}) p.SetMetadataCalls(func(m string) { var f func(participant types.LocalParticipant) @@ -71,11 +78,12 @@ func newMockParticipant(identity livekit.ParticipantIdentity, protocol types.Pro p.AddTrackCalls(func(req *livekit.AddTrackRequest) { updateTrack() }) + p.GetLoggerReturns(logger.GetLogger()) return p } -func newMockTrack(kind livekit.TrackType, name string) *typesfakes.FakeMediaTrack { +func NewMockTrack(kind livekit.TrackType, name string) *typesfakes.FakeMediaTrack { t := &typesfakes.FakeMediaTrack{} t.IDReturns(livekit.TrackID(utils.NewGuid(utils.TrackPrefix))) t.KindReturns(kind) diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport.go index ca22ffeec..16e454e78 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport.go @@ -657,12 +657,10 @@ func (t *PCTransport) onPeerConnectionStateChange(state webrtc.PeerConnectionSta } t.maybeNotifyFullyEstablished() - t.logICECandidates() } case webrtc.PeerConnectionStateFailed: t.params.Logger.Infow("peer connection failed") t.clearConnTimer() - t.logICECandidates() t.handleConnectionFailed(false) } } @@ -1606,13 +1604,17 @@ func (t *PCTransport) handleRemoteICECandidate(e *event) error { } func (t *PCTransport) handleLogICECandidates(e *event) error { - t.params.Logger.Infow( - "ice candidates", - "lc", t.allowedLocalCandidates.Get(), - "rc", t.allowedRemoteCandidates.Get(), - "lc (filtered)", t.filteredLocalCandidates.Get(), - "rc (filtered)", t.filteredRemoteCandidates.Get(), - ) + lc := t.allowedLocalCandidates.Get() + rc := t.allowedRemoteCandidates.Get() + if len(lc) != 0 || len(rc) != 0 { + t.params.Logger.Infow( + "ice candidates", + "lc", lc, + "rc", rc, + "lc (filtered)", t.filteredLocalCandidates.Get(), + "rc (filtered)", t.filteredRemoteCandidates.Get(), + ) + } return nil } diff --git a/pkg/service/agentservice.go b/pkg/service/agentservice.go index baaf40da4..f2ba672b2 100644 --- a/pkg/service/agentservice.go +++ b/pkg/service/agentservice.go @@ -198,12 +198,27 @@ func (s *AgentHandler) HandleConnection(conn *websocket.Conn) { } func (s *AgentHandler) handleRegister(worker *worker, msg *livekit.RegisterWorkerRequest) { + if err := s.doHandleRegister(worker, msg); err != nil { + logger.Errorw("failed to register worker", err, "workerID", msg.WorkerId, "jobType", msg.Type) + worker.conn.Close() + } +} + +func (s *AgentHandler) doHandleRegister(worker *worker, msg *livekit.RegisterWorkerRequest) error { + if msg.WorkerId == "" { + return errors.New("invalid worker id") + } + s.mu.Lock() - defer s.mu.Unlock() + if worker.id != "" { + s.mu.Unlock() + return errors.New("worker already registered") + } switch msg.Type { case livekit.JobType_JT_ROOM: worker.id = msg.WorkerId + worker.jobType = msg.Type delete(s.unregistered, worker.conn) s.roomWorkers[worker.id] = worker @@ -218,6 +233,7 @@ func (s *AgentHandler) handleRegister(worker *worker, msg *livekit.RegisterWorke case livekit.JobType_JT_PUBLISHER: worker.id = msg.WorkerId + worker.jobType = msg.Type delete(s.unregistered, worker.conn) s.publisherWorkers[worker.id] = worker @@ -229,7 +245,11 @@ func (s *AgentHandler) handleRegister(worker *worker, msg *livekit.RegisterWorke s.publisherRegistered = true } } + default: + s.mu.Unlock() + return errors.New("invalid job type") } + s.mu.Unlock() _, err := worker.sigConn.WriteServerMessage(&livekit.ServerMessage{ Message: &livekit.ServerMessage_Register{ @@ -242,6 +262,8 @@ func (s *AgentHandler) handleRegister(worker *worker, msg *livekit.RegisterWorke if err != nil { logger.Errorw("failed to write server message", err) } + + return nil } func (s *AgentHandler) handleAvailability(w *worker, msg *livekit.AvailabilityResponse) { @@ -366,8 +388,7 @@ func (s *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*empty Availability: &livekit.AvailabilityRequest{Job: job}, }}) if err != nil { - logger.Errorw("failed to send availability request", err) - return nil, err + logger.Errorw("failed to send availability request", err, "workerID", selected.id) } select { @@ -379,7 +400,7 @@ func (s *AgentHandler) JobRequest(ctx context.Context, job *livekit.Job) (*empty Assignment: &livekit.JobAssignment{Job: job}, }}) if err != nil { - logger.Errorw("failed to assign job", err) + logger.Errorw("failed to assign job", err, "workerID", selected.id) } else { selected.mu.Lock() selected.activeJobs++ diff --git a/pkg/service/errors.go b/pkg/service/errors.go index 7c27d0dac..9a77f5fde 100644 --- a/pkg/service/errors.go +++ b/pkg/service/errors.go @@ -34,4 +34,8 @@ var ( ErrRemoteUnmuteNoteEnabled = psrpc.NewErrorf(psrpc.FailedPrecondition, "remote unmute not enabled") ErrTrackNotFound = psrpc.NewErrorf(psrpc.NotFound, "track is not found") ErrWebHookMissingAPIKey = psrpc.NewErrorf(psrpc.InvalidArgument, "api_key is required to use webhooks") + ErrSIPNotConnected = psrpc.NewErrorf(psrpc.Internal, "sip not connected (redis required)") + ErrSIPTrunkNotFound = psrpc.NewErrorf(psrpc.NotFound, "requested sip trunk does not exist") + ErrSIPDispatchRuleNotFound = psrpc.NewErrorf(psrpc.NotFound, "requested sip dispatch rule does not exist") + ErrSIPParticipantNotFound = psrpc.NewErrorf(psrpc.NotFound, "requested sip participant does not exist") ) diff --git a/pkg/service/interfaces.go b/pkg/service/interfaces.go index 6c32d7c5c..5dd0cb20b 100644 --- a/pkg/service/interfaces.go +++ b/pkg/service/interfaces.go @@ -76,3 +76,21 @@ type RoomAllocator interface { CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (*livekit.Room, bool, error) ValidateCreateRoom(ctx context.Context, roomName livekit.RoomName) error } + +//counterfeiter:generate . SIPStore +type SIPStore interface { + StoreSIPTrunk(ctx context.Context, info *livekit.SIPTrunkInfo) error + LoadSIPTrunk(ctx context.Context, sipTrunkID string) (*livekit.SIPTrunkInfo, error) + ListSIPTrunk(ctx context.Context) ([]*livekit.SIPTrunkInfo, error) + DeleteSIPTrunk(ctx context.Context, info *livekit.SIPTrunkInfo) error + + StoreSIPDispatchRule(ctx context.Context, info *livekit.SIPDispatchRuleInfo) error + LoadSIPDispatchRule(ctx context.Context, sipDispatchRuleID string) (*livekit.SIPDispatchRuleInfo, error) + ListSIPDispatchRule(ctx context.Context) ([]*livekit.SIPDispatchRuleInfo, error) + DeleteSIPDispatchRule(ctx context.Context, info *livekit.SIPDispatchRuleInfo) error + + StoreSIPParticipant(ctx context.Context, info *livekit.SIPParticipantInfo) error + LoadSIPParticipant(ctx context.Context, sipParticipantID string) (*livekit.SIPParticipantInfo, error) + ListSIPParticipant(ctx context.Context) ([]*livekit.SIPParticipantInfo, error) + DeleteSIPParticipant(ctx context.Context, info *livekit.SIPParticipantInfo) error +} diff --git a/pkg/service/ioservice.go b/pkg/service/ioservice.go index b957e8b15..70158d5cc 100644 --- a/pkg/service/ioservice.go +++ b/pkg/service/ioservice.go @@ -20,11 +20,12 @@ import ( "google.golang.org/protobuf/types/known/emptypb" - "github.com/livekit/livekit-server/pkg/telemetry" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" "github.com/livekit/psrpc" + + "github.com/livekit/livekit-server/pkg/telemetry" ) type IOInfoService struct { @@ -32,6 +33,7 @@ type IOInfoService struct { es EgressStore is IngressStore + ss SIPStore telemetry telemetry.TelemetryService shutdown chan struct{} @@ -41,11 +43,13 @@ func NewIOInfoService( bus psrpc.MessageBus, es EgressStore, is IngressStore, + ss SIPStore, ts telemetry.TelemetryService, ) (*IOInfoService, error) { s := &IOInfoService{ es: es, is: is, + ss: ss, telemetry: ts, shutdown: make(chan struct{}), } diff --git a/pkg/service/ioservice_sip.go b/pkg/service/ioservice_sip.go new file mode 100644 index 000000000..d427c5e2b --- /dev/null +++ b/pkg/service/ioservice_sip.go @@ -0,0 +1,328 @@ +package service + +import ( + "context" + "fmt" + "math" + "regexp" + "sort" + + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/rpc" +) + +// sipRulePriority returns sorting priority for dispatch rules. Lower value means higher priority. +func sipRulePriority(info *livekit.SIPDispatchRuleInfo) int32 { + // In all these cases, prefer pin-protected rules. + // Thus, the order will be the following: + // - 0: Direct or Pin (both pin-protected) + // - 1: Individual (pin-protected) + // - 100: Direct (open) + // - 101: Individual (open) + const ( + last = math.MaxInt32 + ) + // TODO: Maybe allow setting specific priorities for dispatch rules? + switch rule := info.GetRule().GetRule().(type) { + default: + return last + case *livekit.SIPDispatchRule_DispatchRuleDirect: + if rule.DispatchRuleDirect.GetPin() != "" { + return 0 + } + return 100 + case *livekit.SIPDispatchRule_DispatchRuleIndividual: + if rule.DispatchRuleIndividual.GetPin() != "" { + return 1 + } + return 101 + } +} + +// sipSortRules predictably sorts dispatch rules by priority (first one is highest). +func sipSortRules(rules []*livekit.SIPDispatchRuleInfo) { + sort.Slice(rules, func(i, j int) bool { + p1, p2 := sipRulePriority(rules[i]), sipRulePriority(rules[j]) + if p1 < p2 { + return true + } else if p1 > p2 { + return false + } + // For predictable sorting order. + room1, _, _ := sipGetPinAndRoom(rules[i]) + room2, _, _ := sipGetPinAndRoom(rules[j]) + return room1 < room2 + }) +} + +// sipSelectDispatch takes a list of dispatch rules, and takes the decision which one should be selected. +// It returns an error if there are conflicting rules. Returns nil if no rules match. +func sipSelectDispatch(rules []*livekit.SIPDispatchRuleInfo, req *rpc.EvaluateSIPDispatchRulesRequest) (*livekit.SIPDispatchRuleInfo, error) { + if len(rules) == 0 { + return nil, nil + } + // Sorting will do the selection for us. We already filtered out irrelevant ones in matchSIPDispatchRule. + sipSortRules(rules) + byPin := make(map[string]*livekit.SIPDispatchRuleInfo) + var ( + pinRule *livekit.SIPDispatchRuleInfo + openRule *livekit.SIPDispatchRuleInfo + ) + openCnt := 0 + for _, r := range rules { + _, pin, err := sipGetPinAndRoom(r) + if err != nil { + return nil, err + } + if pin == "" { + openRule = r // last one + openCnt++ + } else if r2 := byPin[pin]; r2 != nil { + return nil, fmt.Errorf("Conflicting SIP Dispatch Rules: Same PIN for %q and %q", + r.SipDispatchRuleId, r2.SipDispatchRuleId) + } else { + byPin[pin] = r + // Pick the first one with a Pin. If Pin was provided in the request, we already filtered the right rules. + // If not, this rule will just be used to send RequestPin=true flag. + if pinRule == nil { + pinRule = r + } + } + } + if req.GetPin() != "" { + // If it's still nil that's fine. We will report "no rules matched" later. + return pinRule, nil + } + if pinRule != nil { + return pinRule, nil + } + if openCnt > 1 { + return nil, fmt.Errorf("Conflicting SIP Dispatch Rules: Matched %d open rules for %q", openCnt, req.CallingNumber) + } + return openRule, nil +} + +// sipGetPinAndRoom returns a room name/prefix and the pin for a dispatch rule. Just a convenience wrapper. +func sipGetPinAndRoom(info *livekit.SIPDispatchRuleInfo) (room, pin string, err error) { + // TODO: Could probably add methods on SIPDispatchRuleInfo struct instead. + switch rule := info.GetRule().GetRule().(type) { + default: + return "", "", fmt.Errorf("Unsupported SIP Dispatch Rule: %T", rule) + case *livekit.SIPDispatchRule_DispatchRuleDirect: + pin = rule.DispatchRuleDirect.GetPin() + room = rule.DispatchRuleDirect.GetRoomName() + case *livekit.SIPDispatchRule_DispatchRuleIndividual: + pin = rule.DispatchRuleIndividual.GetPin() + room = rule.DispatchRuleIndividual.GetRoomPrefix() + } + return room, pin, nil +} + +// sipMatchTrunk finds a SIP Trunk definition matching the request. +// Returns nil if no rules matched or an error if there are conflicting definitions. +func sipMatchTrunk(trunks []*livekit.SIPTrunkInfo, calling, called string) (*livekit.SIPTrunkInfo, error) { + var ( + selectedTrunk *livekit.SIPTrunkInfo + defaultTrunk *livekit.SIPTrunkInfo + defaultTrunkCnt int // to error in case there are multiple ones + ) + for _, tr := range trunks { + // Do not consider it if regexp doesn't match. + matches := len(tr.InboundNumbersRegex) == 0 + for _, reStr := range tr.InboundNumbersRegex { + // TODO: we should cache it + re, err := regexp.Compile(reStr) + if err != nil { + logger.Errorw("cannot parse SIP trunk regexp", err, "trunkID", tr.SipTrunkId) + continue + } + if re.MatchString(calling) { + matches = true + break + } + } + if !matches { + continue + } + if tr.OutboundNumber == "" { + // Default/wildcard trunk. + defaultTrunk = tr + defaultTrunkCnt++ + } else if tr.OutboundNumber == called { + // Trunk specific to the number. + if selectedTrunk != nil { + return nil, fmt.Errorf("Multiple SIP Trunks matched for %q", called) + } + selectedTrunk = tr + // Keep searching! We want to know if there are any conflicting Trunk definitions. + } + } + if selectedTrunk != nil { + return selectedTrunk, nil + } + if defaultTrunkCnt > 1 { + return nil, fmt.Errorf("Multiple default SIP Trunks matched for %q", called) + } + // Could still be nil here. + return defaultTrunk, nil +} + +// sipMatchDispatchRule finds the best dispatch rule matching the request parameters. Returns an error if no rule matched. +// Trunk parameter can be nil, in which case only wildcard dispatch rules will be effective (ones without Trunk IDs). +func sipMatchDispatchRule(trunk *livekit.SIPTrunkInfo, rules []*livekit.SIPDispatchRuleInfo, req *rpc.EvaluateSIPDispatchRulesRequest) (*livekit.SIPDispatchRuleInfo, error) { + // Trunk can still be nil here in case none matched or were defined. + // This is still fine, but only in case we'll match exactly one wildcard dispatch rule. + if len(rules) == 0 { + return nil, fmt.Errorf("No SIP Dispatch Rules defined") + } + // We split the matched dispatch rules into two sets: specific and default (aka wildcard). + // First, attempt to match any of the specific rules, where we did match the Trunk ID. + // If nothing matches there - fallback to default/wildcard rules, where no Trunk IDs were mentioned. + var ( + specificRules []*livekit.SIPDispatchRuleInfo + defaultRules []*livekit.SIPDispatchRuleInfo + ) + noPin := req.NoPin + sentPin := req.GetPin() + for _, info := range rules { + _, rulePin, err := sipGetPinAndRoom(info) + if err != nil { + logger.Errorw("Invalid SIP Dispatch Rule", err, "dispatchRuleID", info.SipDispatchRuleId) + continue + } + // Filter heavily on the Pin, so that only relevant rules remain. + if noPin { + if rulePin != "" { + // Skip pin-protected rules if no pin mode requested. + continue + } + } else if sentPin != "" { + if rulePin == "" { + // Pin already sent, skip non-pin-protected rules. + continue + } + if sentPin != rulePin { + // Pin doesn't match. Don't return an error here, just wait for other rule to match (or none at all). + // Note that we will NOT match non-pin-protected rules, thus it will not fallback to open rules. + continue + } + } + if len(info.TrunkIds) == 0 { + // Default/wildcard dispatch rule. + defaultRules = append(defaultRules, info) + continue + } + // Specific dispatch rules. Require a Trunk associated with the number. + if trunk == nil { + continue + } + matches := false + for _, id := range info.TrunkIds { + if id == trunk.SipTrunkId { + matches = true + break + } + } + if !matches { + continue + } + specificRules = append(specificRules, info) + } + best, err := sipSelectDispatch(specificRules, req) + if err != nil { + return nil, err + } else if best != nil { + return best, nil + } + best, err = sipSelectDispatch(defaultRules, req) + if err != nil { + return nil, err + } else if best != nil { + return best, nil + } + if trunk == nil { + return nil, fmt.Errorf("No SIP Trunk or Dispatch Rules matched for %q", req.CalledNumber) + } + return nil, fmt.Errorf("No SIP Dispatch Rules matched for %q", req.CalledNumber) +} + +// matchSIPTrunk finds a SIP Trunk definition matching the request. +// Returns nil if no rules matched or an error if there are conflicting definitions. +func (s *IOInfoService) matchSIPTrunk(ctx context.Context, calling, called string) (*livekit.SIPTrunkInfo, error) { + trunks, err := s.ss.ListSIPTrunk(ctx) + if err != nil { + return nil, err + } + return sipMatchTrunk(trunks, calling, called) +} + +// matchSIPDispatchRule finds the best dispatch rule matching the request parameters. Returns an error if no rule matched. +// Trunk parameter can be nil, in which case only wildcard dispatch rules will be effective (ones without Trunk IDs). +func (s *IOInfoService) matchSIPDispatchRule(ctx context.Context, trunk *livekit.SIPTrunkInfo, req *rpc.EvaluateSIPDispatchRulesRequest) (*livekit.SIPDispatchRuleInfo, error) { + // Trunk can still be nil here in case none matched or were defined. + // This is still fine, but only in case we'll match exactly one wildcard dispatch rule. + rules, err := s.ss.ListSIPDispatchRule(ctx) + if err != nil { + return nil, err + } + return sipMatchDispatchRule(trunk, rules, req) +} + +func (s *IOInfoService) EvaluateSIPDispatchRules(ctx context.Context, req *rpc.EvaluateSIPDispatchRulesRequest) (*rpc.EvaluateSIPDispatchRulesResponse, error) { + trunk, err := s.matchSIPTrunk(ctx, req.CallingNumber, req.CalledNumber) + if err != nil { + return nil, err + } + best, err := s.matchSIPDispatchRule(ctx, trunk, req) + if err != nil { + return nil, err + } + sentPin := req.GetPin() + + from := req.CallingNumber + if best.HidePhoneNumber { + // TODO: Decide on the phone masking format. + // Maybe keep regional code, but mask all but 4 last digits? + from = from[len(from)-4:] + } + fromName := "Phone " + from + + room, rulePin, err := sipGetPinAndRoom(best) + if err != nil { + return nil, err + } + if rulePin != "" { + if sentPin == "" { + return &rpc.EvaluateSIPDispatchRulesResponse{ + RequestPin: true, + }, nil + } + if rulePin != sentPin { + // This should never happen in practice, because matchSIPDispatchRule should remove rules with the wrong pin. + return nil, fmt.Errorf("Incorrect PIN for SIP room") + } + } else { + // Pin was sent, but room doesn't require one. Assume user accidentally pressed phone button. + } + switch rule := best.GetRule().GetRule().(type) { + case *livekit.SIPDispatchRule_DispatchRuleIndividual: + // TODO: Decide on the suffix. Do we need to escape specific characters? + room = rule.DispatchRuleIndividual.GetRoomPrefix() + from + } + return &rpc.EvaluateSIPDispatchRulesResponse{ + RoomName: room, + ParticipantIdentity: fromName, + }, nil +} + +func (s *IOInfoService) GetSIPTrunkAuthentication(ctx context.Context, req *rpc.GetSIPTrunkAuthenticationRequest) (*rpc.GetSIPTrunkAuthenticationResponse, error) { + trunk, err := s.matchSIPTrunk(ctx, req.From, req.To) + if err != nil { + return nil, err + } + return &rpc.GetSIPTrunkAuthenticationResponse{ + Username: trunk.Username, + Password: trunk.Password, + }, nil +} diff --git a/pkg/service/ioservice_sip_test.go b/pkg/service/ioservice_sip_test.go new file mode 100644 index 000000000..b8b57fd07 --- /dev/null +++ b/pkg/service/ioservice_sip_test.go @@ -0,0 +1,390 @@ +package service + +import ( + "fmt" + "testing" + + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/rpc" + "github.com/stretchr/testify/require" +) + +const ( + sipNumber1 = "1111 1111" + sipNumber2 = "2222 2222" + sipNumber3 = "3333 3333" + sipTrunkID1 = "aaa" + sipTrunkID2 = "bbb" +) + +func TestSIPMatchTrunk(t *testing.T) { + cases := []struct { + name string + trunks []*livekit.SIPTrunkInfo + exp int + expErr bool + }{ + { + name: "empty", + trunks: nil, + exp: -1, // no error; nil result + }, + { + name: "one wildcard", + trunks: []*livekit.SIPTrunkInfo{ + {SipTrunkId: "aaa"}, + }, + exp: 0, + }, + { + name: "matching", + trunks: []*livekit.SIPTrunkInfo{ + {SipTrunkId: "aaa", OutboundNumber: sipNumber2}, + }, + exp: 0, + }, + { + name: "matching regexp", + trunks: []*livekit.SIPTrunkInfo{ + {SipTrunkId: "aaa", OutboundNumber: sipNumber2, InboundNumbersRegex: []string{`^\d+ \d+$`}}, + }, + exp: 0, + }, + { + name: "not matching", + trunks: []*livekit.SIPTrunkInfo{ + {SipTrunkId: "aaa", OutboundNumber: sipNumber3}, + }, + exp: -1, + }, + { + name: "not matching regexp", + trunks: []*livekit.SIPTrunkInfo{ + {SipTrunkId: "aaa", OutboundNumber: sipNumber2, InboundNumbersRegex: []string{`^\d+$`}}, + }, + exp: -1, + }, + { + name: "one match", + trunks: []*livekit.SIPTrunkInfo{ + {SipTrunkId: "aaa", OutboundNumber: sipNumber3}, + {SipTrunkId: "bbb", OutboundNumber: sipNumber2}, + }, + exp: 1, + }, + { + name: "many matches", + trunks: []*livekit.SIPTrunkInfo{ + {SipTrunkId: "aaa", OutboundNumber: sipNumber3}, + {SipTrunkId: "bbb", OutboundNumber: sipNumber2}, + {SipTrunkId: "ccc", OutboundNumber: sipNumber2}, + }, + expErr: true, + }, + { + name: "many matches default", + trunks: []*livekit.SIPTrunkInfo{ + {SipTrunkId: "aaa", OutboundNumber: sipNumber3}, + {SipTrunkId: "bbb"}, + {SipTrunkId: "ccc", OutboundNumber: sipNumber2}, + {SipTrunkId: "ddd"}, + }, + exp: 2, + }, + { + name: "regexp", + trunks: []*livekit.SIPTrunkInfo{ + {SipTrunkId: "aaa", OutboundNumber: sipNumber3}, + {SipTrunkId: "bbb", OutboundNumber: sipNumber2}, + {SipTrunkId: "ccc", OutboundNumber: sipNumber2, InboundNumbersRegex: []string{`^\d+$`}}, + }, + exp: 1, + }, + { + name: "multiple defaults", + trunks: []*livekit.SIPTrunkInfo{ + {SipTrunkId: "aaa", OutboundNumber: sipNumber3}, + {SipTrunkId: "bbb"}, + {SipTrunkId: "ccc"}, + }, + expErr: true, + }, + } + for _, c := range cases { + c := c + t.Run(c.name, func(t *testing.T) { + got, err := sipMatchTrunk(c.trunks, sipNumber1, sipNumber2) + if c.expErr { + require.Error(t, err) + require.Nil(t, got) + t.Log(err) + } else { + var exp *livekit.SIPTrunkInfo + if c.exp >= 0 { + exp = c.trunks[c.exp] + } + require.NoError(t, err) + require.Equal(t, exp, got) + } + }) + } +} + +func newSIPTrunkDispatch() *livekit.SIPTrunkInfo { + return &livekit.SIPTrunkInfo{ + SipTrunkId: sipTrunkID1, + OutboundNumber: sipNumber2, + } +} + +func newSIPReqDispatch(pin string, noPin bool) *rpc.EvaluateSIPDispatchRulesRequest { + return &rpc.EvaluateSIPDispatchRulesRequest{ + CallingNumber: sipNumber1, + CalledNumber: sipNumber2, + Pin: pin, + //NoPin: noPin, // TODO + } +} + +func newDirectDispatch(room, pin string) *livekit.SIPDispatchRule { + return &livekit.SIPDispatchRule{ + Rule: &livekit.SIPDispatchRule_DispatchRuleDirect{ + DispatchRuleDirect: &livekit.SIPDispatchRuleDirect{ + RoomName: room, Pin: pin, + }, + }, + } +} + +func newIndividualDispatch(roomPref, pin string) *livekit.SIPDispatchRule { + return &livekit.SIPDispatchRule{ + Rule: &livekit.SIPDispatchRule_DispatchRuleIndividual{ + DispatchRuleIndividual: &livekit.SIPDispatchRuleIndividual{ + RoomPrefix: roomPref, Pin: pin, + }, + }, + } +} + +func TestSIPMatchDispatchRule(t *testing.T) { + cases := []struct { + name string + trunk *livekit.SIPTrunkInfo + rules []*livekit.SIPDispatchRuleInfo + reqPin string + noPin bool + exp int + expErr bool + }{ + // These cases just validate that no rules produce an error. + { + name: "empty", + trunk: nil, + rules: nil, + expErr: true, + }, + { + name: "only trunk", + trunk: newSIPTrunkDispatch(), + rules: nil, + expErr: true, + }, + // Default rules should work even if no trunk is defined. + { + name: "one rule/no trunk", + trunk: nil, + rules: []*livekit.SIPDispatchRuleInfo{ + {TrunkIds: nil, Rule: newDirectDispatch("sip", "")}, + }, + exp: 0, + }, + // Default rule should work with a trunk too. + { + name: "one rule/default trunk", + trunk: newSIPTrunkDispatch(), + rules: []*livekit.SIPDispatchRuleInfo{ + {TrunkIds: nil, Rule: newDirectDispatch("sip", "")}, + }, + exp: 0, + }, + // Rule matching the trunk should be selected. + { + name: "one rule/specific trunk", + trunk: newSIPTrunkDispatch(), + rules: []*livekit.SIPDispatchRuleInfo{ + {TrunkIds: []string{sipTrunkID1, sipTrunkID2}, Rule: newDirectDispatch("sip", "")}, + }, + exp: 0, + }, + // Rule NOT matching the trunk should NOT be selected. + { + name: "one rule/wrong trunk", + trunk: newSIPTrunkDispatch(), + rules: []*livekit.SIPDispatchRuleInfo{ + {TrunkIds: []string{"zzz"}, Rule: newDirectDispatch("sip", "")}, + }, + expErr: true, + }, + // Direct rule with a pin should be selected, even if no pin is provided. + { + name: "direct pin/correct", + trunk: newSIPTrunkDispatch(), + rules: []*livekit.SIPDispatchRuleInfo{ + {TrunkIds: []string{sipTrunkID1}, Rule: newDirectDispatch("sip", "123")}, + {TrunkIds: []string{sipTrunkID2}, Rule: newDirectDispatch("sip", "456")}, + }, + reqPin: "123", + exp: 0, + }, + // Direct rule with a pin should reject wrong pin. + { + name: "direct pin/wrong", + trunk: newSIPTrunkDispatch(), + rules: []*livekit.SIPDispatchRuleInfo{ + {TrunkIds: []string{sipTrunkID1}, Rule: newDirectDispatch("sip", "123")}, + {TrunkIds: []string{sipTrunkID2}, Rule: newDirectDispatch("sip", "456")}, + }, + reqPin: "zzz", + expErr: true, + }, + // Multiple direct rules with the same pin should result in an error. + { + name: "direct pin/conflict", + trunk: newSIPTrunkDispatch(), + rules: []*livekit.SIPDispatchRuleInfo{ + {TrunkIds: []string{sipTrunkID1}, Rule: newDirectDispatch("sip1", "123")}, + {TrunkIds: []string{sipTrunkID1, sipTrunkID2}, Rule: newDirectDispatch("sip2", "123")}, + }, + reqPin: "123", + expErr: true, + }, + // Multiple direct rules with the same pin on different trunks are ok. + { + name: "direct pin/no conflict on different trunk", + trunk: newSIPTrunkDispatch(), + rules: []*livekit.SIPDispatchRuleInfo{ + {TrunkIds: []string{sipTrunkID1}, Rule: newDirectDispatch("sip1", "123")}, + {TrunkIds: []string{sipTrunkID2}, Rule: newDirectDispatch("sip2", "123")}, + }, + reqPin: "123", + exp: 0, + }, + // Specific direct rules should take priority over default direct rules. + { + name: "direct pin/default and specific", + trunk: newSIPTrunkDispatch(), + rules: []*livekit.SIPDispatchRuleInfo{ + {TrunkIds: nil, Rule: newDirectDispatch("sip1", "123")}, + {TrunkIds: []string{sipTrunkID1}, Rule: newDirectDispatch("sip2", "123")}, + }, + reqPin: "123", + exp: 1, + }, + // Specific direct rules should take priority over default direct rules. No pin. + { + name: "direct/default and specific", + trunk: newSIPTrunkDispatch(), + rules: []*livekit.SIPDispatchRuleInfo{ + {TrunkIds: nil, Rule: newDirectDispatch("sip1", "")}, + {TrunkIds: []string{sipTrunkID1}, Rule: newDirectDispatch("sip2", "")}, + }, + exp: 1, + }, + // Specific direct rules should take priority over default direct rules. One with pin, other without. + { + name: "direct/default and specific/mixed 1", + trunk: newSIPTrunkDispatch(), + rules: []*livekit.SIPDispatchRuleInfo{ + {TrunkIds: nil, Rule: newDirectDispatch("sip1", "123")}, + {TrunkIds: []string{sipTrunkID1}, Rule: newDirectDispatch("sip2", "")}, + }, + exp: 1, + }, + { + name: "direct/default and specific/mixed 2", + trunk: newSIPTrunkDispatch(), + rules: []*livekit.SIPDispatchRuleInfo{ + {TrunkIds: nil, Rule: newDirectDispatch("sip1", "")}, + {TrunkIds: []string{sipTrunkID1}, Rule: newDirectDispatch("sip2", "123")}, + }, + exp: 1, + }, + // Multiple default direct rules are not allowed. + { + name: "direct/multiple defaults", + trunk: newSIPTrunkDispatch(), + rules: []*livekit.SIPDispatchRuleInfo{ + {TrunkIds: nil, Rule: newDirectDispatch("sip1", "")}, + {TrunkIds: nil, Rule: newDirectDispatch("sip2", "")}, + }, + expErr: true, + }, + // Cannot use both direct and individual rules with the same pin setup. + { + name: "direct vs individual/private", + trunk: newSIPTrunkDispatch(), + rules: []*livekit.SIPDispatchRuleInfo{ + {TrunkIds: nil, Rule: newIndividualDispatch("pref_", "123")}, + {TrunkIds: nil, Rule: newDirectDispatch("sip", "123")}, + }, + expErr: true, + }, + { + name: "direct vs individual/open", + trunk: newSIPTrunkDispatch(), + rules: []*livekit.SIPDispatchRuleInfo{ + {TrunkIds: nil, Rule: newIndividualDispatch("pref_", "")}, + {TrunkIds: nil, Rule: newDirectDispatch("sip", "")}, + }, + expErr: true, + }, + // Direct rules take priority over individual rules. + { + name: "direct vs individual/priority", + trunk: newSIPTrunkDispatch(), + rules: []*livekit.SIPDispatchRuleInfo{ + {TrunkIds: nil, Rule: newIndividualDispatch("pref_", "123")}, + {TrunkIds: nil, Rule: newDirectDispatch("sip", "456")}, + }, + reqPin: "456", + exp: 1, + }, + } + for _, c := range cases { + c := c + t.Run(c.name, func(t *testing.T) { + pins := []string{c.reqPin} + if !c.expErr && c.reqPin != "" { + // Should match the same rule, even if no pin is set (so that it can be requested). + pins = append(pins, "") + } + for i, r := range c.rules { + if r.SipDispatchRuleId == "" { + r.SipDispatchRuleId = fmt.Sprintf("rule_%d", i) + } + } + for _, pin := range pins { + pin := pin + name := pin + if name == "" { + name = "no pin" + } + t.Run(name, func(t *testing.T) { + got, err := sipMatchDispatchRule(c.trunk, c.rules, newSIPReqDispatch(pin, c.noPin)) + if c.expErr { + require.Error(t, err) + require.Nil(t, got) + t.Log(err) + } else { + var exp *livekit.SIPDispatchRuleInfo + if c.exp >= 0 { + exp = c.rules[c.exp] + } + require.NoError(t, err) + require.Equal(t, exp, got) + } + }) + } + }) + } +} diff --git a/pkg/service/redisstore.go b/pkg/service/redisstore.go index cc8c2e146..dfb4c3626 100644 --- a/pkg/service/redisstore.go +++ b/pkg/service/redisstore.go @@ -51,6 +51,10 @@ const ( IngressStatePrefix = "{ingress}_state:" RoomIngressPrefix = "room_{ingress}:" + SIPTrunkKey = "sip_trunk" + SIPDispatchRuleKey = "sip_dispatch_rule" + SIPParticipantKey = "sip_participant" + // RoomParticipantsPrefix is hash of participant_name => ParticipantInfo RoomParticipantsPrefix = "room_participants:" @@ -812,3 +816,129 @@ func (s *RedisStore) DeleteIngress(_ context.Context, info *livekit.IngressInfo) return nil } + +func (s *RedisStore) loadOne(ctx context.Context, key, id string, info proto.Message, notFoundErr error) error { + data, err := s.rc.HGet(s.ctx, key, id).Result() + switch err { + case nil: + return proto.Unmarshal([]byte(data), info) + case redis.Nil: + return notFoundErr + default: + return err + } +} + +func (s *RedisStore) loadMany(ctx context.Context, key string, onResult func() proto.Message) error { + data, err := s.rc.HGetAll(s.ctx, key).Result() + if err != nil { + if err == redis.Nil { + return nil + } + return err + } + + for _, d := range data { + if err = proto.Unmarshal([]byte(d), onResult()); err != nil { + return err + } + } + + return nil +} + +func (s *RedisStore) StoreSIPTrunk(ctx context.Context, info *livekit.SIPTrunkInfo) error { + data, err := proto.Marshal(info) + if err != nil { + return err + } + + return s.rc.HSet(s.ctx, SIPTrunkKey, info.SipTrunkId, data).Err() +} + +func (s *RedisStore) LoadSIPTrunk(ctx context.Context, sipTrunkId string) (*livekit.SIPTrunkInfo, error) { + info := &livekit.SIPTrunkInfo{} + if err := s.loadOne(ctx, SIPTrunkKey, sipTrunkId, info, ErrSIPTrunkNotFound); err != nil { + return nil, err + } + + return info, nil +} + +func (s *RedisStore) DeleteSIPTrunk(ctx context.Context, info *livekit.SIPTrunkInfo) error { + return s.rc.HDel(s.ctx, SIPTrunkKey, info.SipTrunkId).Err() +} + +func (s *RedisStore) ListSIPTrunk(ctx context.Context) (infos []*livekit.SIPTrunkInfo, err error) { + err = s.loadMany(ctx, SIPTrunkKey, func() proto.Message { + infos = append(infos, &livekit.SIPTrunkInfo{}) + return infos[len(infos)-1] + }) + + return infos, err +} + +func (s *RedisStore) StoreSIPDispatchRule(ctx context.Context, info *livekit.SIPDispatchRuleInfo) error { + data, err := proto.Marshal(info) + if err != nil { + return err + } + + return s.rc.HSet(s.ctx, SIPDispatchRuleKey, info.SipDispatchRuleId, data).Err() +} + +func (s *RedisStore) LoadSIPDispatchRule(ctx context.Context, sipDispatchRuleId string) (*livekit.SIPDispatchRuleInfo, error) { + info := &livekit.SIPDispatchRuleInfo{} + if err := s.loadOne(ctx, SIPDispatchRuleKey, sipDispatchRuleId, info, ErrSIPDispatchRuleNotFound); err != nil { + return nil, err + } + + return info, nil +} + +func (s *RedisStore) DeleteSIPDispatchRule(ctx context.Context, info *livekit.SIPDispatchRuleInfo) error { + return s.rc.HDel(s.ctx, SIPDispatchRuleKey, info.SipDispatchRuleId).Err() +} + +func (s *RedisStore) ListSIPDispatchRule(ctx context.Context) (infos []*livekit.SIPDispatchRuleInfo, err error) { + err = s.loadMany(ctx, SIPDispatchRuleKey, func() proto.Message { + infos = append(infos, &livekit.SIPDispatchRuleInfo{}) + return infos[len(infos)-1] + }) + + return infos, err +} + +func (s *RedisStore) StoreSIPParticipant(ctx context.Context, info *livekit.SIPParticipantInfo) error { + data, err := proto.Marshal(info) + if err != nil { + return err + } + + return s.rc.HSet(s.ctx, SIPParticipantKey, info.SipParticipantId, data).Err() +} +func (s *RedisStore) LoadSIPParticipant(ctx context.Context, sipParticipantId string) (*livekit.SIPParticipantInfo, error) { + info := &livekit.SIPParticipantInfo{} + if err := s.loadOne(ctx, SIPParticipantKey, sipParticipantId, info, ErrSIPParticipantNotFound); err != nil { + return nil, err + } + + return info, nil +} + +func (s *RedisStore) DeleteSIPParticipant(ctx context.Context, info *livekit.SIPParticipantInfo) error { + return s.rc.HDel(s.ctx, SIPParticipantKey, info.SipParticipantId).Err() +} + +func (s *RedisStore) ListSIPParticipant(ctx context.Context) (infos []*livekit.SIPParticipantInfo, err error) { + err = s.loadMany(ctx, SIPParticipantKey, func() proto.Message { + infos = append(infos, &livekit.SIPParticipantInfo{}) + return infos[len(infos)-1] + }) + + return infos, err +} + +func (s *RedisStore) SendSIPParticipantDTMF(ctx context.Context, info *livekit.SendSIPParticipantDTMFRequest) (*livekit.SIPParticipantDTMFInfo, error) { + return nil, fmt.Errorf("TODO") +} diff --git a/pkg/service/rtcservice.go b/pkg/service/rtcservice.go index 884a89656..86ff5f630 100644 --- a/pkg/service/rtcservice.go +++ b/pkg/service/rtcservice.go @@ -269,7 +269,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) { close(done) if signalStats != nil { - signalStats.Report() + signalStats.Stop() } }() diff --git a/pkg/service/server.go b/pkg/service/server.go index f7ade50d2..f66d848b5 100644 --- a/pkg/service/server.go +++ b/pkg/service/server.go @@ -65,6 +65,7 @@ func NewLivekitServer(conf *config.Config, roomService livekit.RoomService, egressService *EgressService, ingressService *IngressService, + sipService *SIPService, ioService *IOInfoService, rtcService *RTCService, agentService *AgentService, @@ -116,6 +117,7 @@ func NewLivekitServer(conf *config.Config, ), )) ingressServer := livekit.NewIngressServer(ingressService, twirpLoggingHook) + sipServer := livekit.NewSIPServer(sipService, twirpLoggingHook) mux := http.NewServeMux() if conf.Development { @@ -127,6 +129,7 @@ func NewLivekitServer(conf *config.Config, mux.Handle(roomServer.PathPrefix(), roomServer) mux.Handle(egressServer.PathPrefix(), egressServer) mux.Handle(ingressServer.PathPrefix(), ingressServer) + mux.Handle(sipServer.PathPrefix(), sipServer) mux.Handle("/rtc", rtcService) mux.Handle("/agent", agentService) mux.HandleFunc("/rtc/validate", rtcService.Validate) diff --git a/pkg/service/sip.go b/pkg/service/sip.go new file mode 100644 index 000000000..4a7fe411a --- /dev/null +++ b/pkg/service/sip.go @@ -0,0 +1,207 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package service + +import ( + "context" + "fmt" + + "github.com/livekit/livekit-server/pkg/config" + "github.com/livekit/livekit-server/pkg/telemetry" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/rpc" + "github.com/livekit/protocol/utils" + "github.com/livekit/psrpc" +) + +type SIPService struct { + conf *config.SIPConfig + nodeID livekit.NodeID + bus psrpc.MessageBus + psrpcClient rpc.SIPClient + store SIPStore + roomService livekit.RoomService +} + +func NewSIPService( + conf *config.SIPConfig, + nodeID livekit.NodeID, + bus psrpc.MessageBus, + psrpcClient rpc.SIPClient, + store SIPStore, + rs livekit.RoomService, + ts telemetry.TelemetryService, +) *SIPService { + return &SIPService{ + conf: conf, + nodeID: nodeID, + bus: bus, + psrpcClient: psrpcClient, + store: store, + roomService: rs, + } +} + +func (s *SIPService) CreateSIPTrunk(ctx context.Context, req *livekit.CreateSIPTrunkRequest) (*livekit.SIPTrunkInfo, error) { + if s.store == nil { + return nil, ErrSIPNotConnected + } + + info := &livekit.SIPTrunkInfo{ + SipTrunkId: utils.NewGuid(utils.SIPTrunkPrefix), + InboundAddresses: req.InboundAddresses, + OutboundAddress: req.OutboundAddress, + OutboundNumber: req.OutboundNumber, + InboundNumbersRegex: req.InboundNumbersRegex, + Username: req.Username, + Password: req.Password, + } + + if err := s.store.StoreSIPTrunk(ctx, info); err != nil { + return nil, err + } + return info, nil +} + +func (s *SIPService) ListSIPTrunk(ctx context.Context, req *livekit.ListSIPTrunkRequest) (*livekit.ListSIPTrunkResponse, error) { + if s.store == nil { + return nil, ErrSIPNotConnected + } + + trunks, err := s.store.ListSIPTrunk(ctx) + if err != nil { + return nil, err + } + + return &livekit.ListSIPTrunkResponse{Items: trunks}, nil +} + +func (s *SIPService) DeleteSIPTrunk(ctx context.Context, req *livekit.DeleteSIPTrunkRequest) (*livekit.SIPTrunkInfo, error) { + if s.store == nil { + return nil, ErrSIPNotConnected + } + + info, err := s.store.LoadSIPTrunk(ctx, req.SipTrunkId) + if err != nil { + return nil, err + } + + if err = s.store.DeleteSIPTrunk(ctx, info); err != nil { + return nil, err + } + + return info, nil +} + +func (s *SIPService) CreateSIPDispatchRule(ctx context.Context, req *livekit.CreateSIPDispatchRuleRequest) (*livekit.SIPDispatchRuleInfo, error) { + if s.store == nil { + return nil, ErrSIPNotConnected + } + + info := &livekit.SIPDispatchRuleInfo{ + SipDispatchRuleId: utils.NewGuid(utils.SIPDispatchRulePrefix), + Rule: req.Rule, + TrunkIds: req.TrunkIds, + HidePhoneNumber: req.HidePhoneNumber, + } + + if err := s.store.StoreSIPDispatchRule(ctx, info); err != nil { + return nil, err + } + return info, nil +} + +func (s *SIPService) ListSIPDispatchRule(ctx context.Context, req *livekit.ListSIPDispatchRuleRequest) (*livekit.ListSIPDispatchRuleResponse, error) { + if s.store == nil { + return nil, ErrSIPNotConnected + } + + rules, err := s.store.ListSIPDispatchRule(ctx) + if err != nil { + return nil, err + } + + return &livekit.ListSIPDispatchRuleResponse{Items: rules}, nil +} + +func (s *SIPService) DeleteSIPDispatchRule(ctx context.Context, req *livekit.DeleteSIPDispatchRuleRequest) (*livekit.SIPDispatchRuleInfo, error) { + if s.store == nil { + return nil, ErrSIPNotConnected + } + + info, err := s.store.LoadSIPDispatchRule(ctx, req.SipDispatchRuleId) + if err != nil { + return nil, err + } + + if err = s.store.DeleteSIPDispatchRule(ctx, info); err != nil { + return nil, err + } + + return info, nil +} + +func (s *SIPService) CreateSIPParticipant(ctx context.Context, req *livekit.CreateSIPParticipantRequest) (*livekit.SIPParticipantInfo, error) { + if s.store == nil { + return nil, ErrSIPNotConnected + } + + info := &livekit.SIPParticipantInfo{ + SipParticipantId: utils.NewGuid(utils.SIPParticipantPrefix), + } + + if err := s.store.StoreSIPParticipant(ctx, info); err != nil { + return nil, err + } + return info, nil +} + +func (s *SIPService) ListSIPParticipant(ctx context.Context, req *livekit.ListSIPParticipantRequest) (*livekit.ListSIPParticipantResponse, error) { + if s.store == nil { + return nil, ErrSIPNotConnected + } + + participants, err := s.store.ListSIPParticipant(ctx) + if err != nil { + return nil, err + } + + return &livekit.ListSIPParticipantResponse{Items: participants}, nil +} + +func (s *SIPService) DeleteSIPParticipant(ctx context.Context, req *livekit.DeleteSIPParticipantRequest) (*livekit.SIPParticipantInfo, error) { + if s.store == nil { + return nil, ErrSIPNotConnected + } + + info, err := s.store.LoadSIPParticipant(ctx, req.SipParticipantId) + if err != nil { + return nil, err + } + + if err = s.store.DeleteSIPParticipant(ctx, info); err != nil { + return nil, err + } + + return info, nil +} + +func (s *SIPService) SendSIPParticipantDTMF(ctx context.Context, req *livekit.SendSIPParticipantDTMFRequest) (*livekit.SIPParticipantDTMFInfo, error) { + if s.store == nil { + return nil, ErrSIPNotConnected + } + + return nil, fmt.Errorf("TODO") +} diff --git a/pkg/service/wire.go b/pkg/service/wire.go index afca2631e..93e3784b0 100644 --- a/pkg/service/wire.go +++ b/pkg/service/wire.go @@ -70,6 +70,10 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live getIngressStore, getIngressConfig, NewIngressService, + rpc.NewSIPClient, + getSIPStore, + getSIPConfig, + NewSIPService, NewRoomAllocator, NewRoomService, NewRTCService, @@ -195,6 +199,19 @@ func getIngressConfig(conf *config.Config) *config.IngressConfig { return &conf.Ingress } +func getSIPStore(s ObjectStore) SIPStore { + switch store := s.(type) { + case *RedisStore: + return store + default: + return nil + } +} + +func getSIPConfig(conf *config.Config) *config.SIPConfig { + return &conf.SIP +} + func createClientConfiguration() clientconfiguration.ClientConfigurationManager { return clientconfiguration.NewStaticClientConfigurationManager(clientconfiguration.StaticConfigurations) } diff --git a/pkg/service/wire_gen.go b/pkg/service/wire_gen.go index 3a6470b88..0f0eb6e87 100644 --- a/pkg/service/wire_gen.go +++ b/pkg/service/wire_gen.go @@ -66,6 +66,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } egressStore := getEgressStore(objectStore) ingressStore := getIngressStore(objectStore) + sipStore := getSIPStore(objectStore) keyProvider, err := createKeyProvider(conf) if err != nil { return nil, err @@ -76,7 +77,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live } analyticsService := telemetry.NewAnalyticsService(conf, currentNode) telemetryService := telemetry.NewTelemetryService(queuedNotifier, analyticsService) - ioInfoService, err := NewIOInfoService(messageBus, egressStore, ingressStore, telemetryService) + ioInfoService, err := NewIOInfoService(messageBus, egressStore, ingressStore, sipStore, telemetryService) if err != nil { return nil, err } @@ -102,6 +103,12 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live return nil, err } ingressService := NewIngressService(ingressConfig, nodeID, messageBus, ingressClient, ingressStore, roomService, telemetryService) + sipConfig := getSIPConfig(conf) + sipClient, err := rpc.NewSIPClient(messageBus) + if err != nil { + return nil, err + } + sipService := NewSIPService(sipConfig, nodeID, messageBus, sipClient, sipStore, roomService, telemetryService) rtcService := NewRTCService(conf, roomAllocator, objectStore, router, currentNode, agentClient, telemetryService) agentService, err := NewAgentService(messageBus) if err != nil { @@ -123,7 +130,7 @@ func InitializeServer(conf *config.Config, currentNode routing.LocalNode) (*Live if err != nil { return nil, err } - livekitServer, err := NewLivekitServer(conf, roomService, egressService, ingressService, ioInfoService, rtcService, agentService, keyProvider, router, roomManager, signalServer, server, currentNode) + livekitServer, err := NewLivekitServer(conf, roomService, egressService, ingressService, sipService, ioInfoService, rtcService, agentService, keyProvider, router, roomManager, signalServer, server, currentNode) if err != nil { return nil, err } @@ -237,6 +244,19 @@ func getIngressConfig(conf *config.Config) *config.IngressConfig { return &conf.Ingress } +func getSIPStore(s ObjectStore) SIPStore { + switch store := s.(type) { + case *RedisStore: + return store + default: + return nil + } +} + +func getSIPConfig(conf *config.Config) *config.SIPConfig { + return &conf.SIP +} + func createClientConfiguration() clientconfiguration.ClientConfigurationManager { return clientconfiguration.NewStaticClientConfigurationManager(clientconfiguration.StaticConfigurations) } diff --git a/pkg/sfu/buffer/datastats.go b/pkg/sfu/buffer/datastats.go index 515341cc0..795f8a0f2 100644 --- a/pkg/sfu/buffer/datastats.go +++ b/pkg/sfu/buffer/datastats.go @@ -71,7 +71,7 @@ func (s *DataStats) ToProtoActive() *livekit.RTPStats { return &livekit.RTPStats{ StartTime: timestamppb.New(time.Unix(s.windowStart/1e9, s.windowStart%1e9)), - EndTime: timestamppb.New(time.Now()), + EndTime: timestamppb.New(time.Unix(0, now)), Duration: float64(duration / 1e9), Bytes: uint64(s.windowBytes), Bitrate: float64(s.windowBytes) * 8 / float64(duration) / 1e9, diff --git a/pkg/sfu/buffer/dependencydescriptorparser.go b/pkg/sfu/buffer/dependencydescriptorparser.go index 6c91af260..8cc648b7f 100644 --- a/pkg/sfu/buffer/dependencydescriptorparser.go +++ b/pkg/sfu/buffer/dependencydescriptorparser.go @@ -52,8 +52,8 @@ func NewDependencyDescriptorParser(ddExtID uint8, logger logger.Logger, onMaxLay ddExtID: ddExtID, logger: logger, onMaxLayerChanged: onMaxLayerChanged, - seqWrapAround: utils.NewWrapAround[uint16, uint64](), - frameWrapAround: utils.NewWrapAround[uint16, uint64](), + seqWrapAround: utils.NewWrapAround[uint16, uint64](utils.WrapAroundParams{IsRestartAllowed: false}), + frameWrapAround: utils.NewWrapAround[uint16, uint64](utils.WrapAroundParams{IsRestartAllowed: false}), frameChecker: NewFrameIntegrityChecker(180, 1024), // 2seconds for L3T3 30fps video } } diff --git a/pkg/sfu/buffer/rtpstats_receiver.go b/pkg/sfu/buffer/rtpstats_receiver.go index dad7c66ef..b621f42b1 100644 --- a/pkg/sfu/buffer/rtpstats_receiver.go +++ b/pkg/sfu/buffer/rtpstats_receiver.go @@ -60,8 +60,8 @@ type RTPStatsReceiver struct { func NewRTPStatsReceiver(params RTPStatsParams) *RTPStatsReceiver { return &RTPStatsReceiver{ rtpStatsBase: newRTPStatsBase(params), - sequenceNumber: utils.NewWrapAround[uint16, uint64](), - timestamp: utils.NewWrapAround[uint32, uint64](), + sequenceNumber: utils.NewWrapAround[uint16, uint64](utils.WrapAroundParams{IsRestartAllowed: false}), + timestamp: utils.NewWrapAround[uint32, uint64](utils.WrapAroundParams{IsRestartAllowed: false}), history: protoutils.NewBitmap[uint64](cHistorySize), } } @@ -123,36 +123,16 @@ func (r *RTPStatsReceiver) Update( ) } else { resSN = r.sequenceNumber.Update(sequenceNumber) + if resSN.IsUnhandled { + flowState.IsNotHandled = true + return + } resTS = r.timestamp.Update(timestamp) } pktSize := uint64(hdrSize + payloadSize + paddingSize) gapSN := int64(resSN.ExtendedVal - resSN.PreExtendedHighest) if gapSN <= 0 { // duplicate OR out-of-order - // before start, don't restart - if resTS.IsRestart { - r.logger.Infow( - "rolling back timestamp restart", - "tsBefore", resTS.PreExtendedStart, - "tsAfter", r.timestamp.GetExtendedStart(), - "snBefore", resSN.PreExtendedStart, - "snAfter", r.sequenceNumber.GetExtendedStart(), - ) - r.timestamp.RollbackRestart(resTS.PreExtendedStart) - } - if resSN.IsRestart { - r.logger.Infow( - "rolling back sequence number restart", - "snBefore", resSN.PreExtendedStart, - "snAfter", r.sequenceNumber.GetExtendedStart(), - "tsBefore", resTS.PreExtendedStart, - "tsAfter", r.timestamp.GetExtendedStart(), - ) - r.sequenceNumber.RollbackRestart(resSN.PreExtendedStart) - flowState.IsNotHandled = true - return - } - if -gapSN >= cNumSequenceNumbers/2 { r.logger.Warnw( "large sequence number gap negative", nil, @@ -179,36 +159,6 @@ func (r *RTPStatsReceiver) Update( r.packetsOutOfOrder++ } - if resSN.IsRestart { - r.packetsLost += resSN.PreExtendedStart - resSN.ExtendedVal - - extStartSN := r.sequenceNumber.GetExtendedStart() - for i := uint32(0); i < r.nextSnapshotID-cFirstSnapshotID; i++ { - s := &r.snapshots[i] - if s.extStartSN == resSN.PreExtendedStart { - s.extStartSN = extStartSN - } - } - - r.logger.Infow( - "adjusting start sequence number", - "snBefore", resSN.PreExtendedStart, - "snAfter", resSN.ExtendedVal, - "tsBefore", resTS.PreExtendedStart, - "tsAfter", resTS.ExtendedVal, - ) - } - - if resTS.IsRestart { - r.logger.Infow( - "adjusting start timestamp", - "tsBefore", resTS.PreExtendedStart, - "tsAfter", resTS.ExtendedVal, - "snBefore", resSN.PreExtendedStart, - "snAfter", resSN.ExtendedVal, - ) - } - if r.isInRange(resSN.ExtendedVal, resSN.PreExtendedHighest) { if r.history.IsSet(resSN.ExtendedVal) { r.bytesDuplicate += pktSize diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index f995a90dd..87b05b7e3 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -1033,7 +1033,10 @@ func (d *DownTrack) CloseWithFlush(flush bool) { d.bindLock.Unlock() d.connectionStats.Close() d.rtpStats.Stop() - d.params.Logger.Infow("rtp stats", "direction", "downstream", "mime", d.mime, "ssrc", d.ssrc, "stats", d.rtpStats.ToString()) + rtpStats := d.rtpStats.ToString() + if rtpStats != "" { + d.params.Logger.Infow("rtp stats", "direction", "downstream", "mime", d.mime, "ssrc", d.ssrc, "stats", rtpStats) + } d.maxLayerNotifierChMu.Lock() d.maxLayerNotifierChClosed = true diff --git a/pkg/sfu/utils/wraparound.go b/pkg/sfu/utils/wraparound.go index b04b331c2..e37d5ca27 100644 --- a/pkg/sfu/utils/wraparound.go +++ b/pkg/sfu/utils/wraparound.go @@ -26,7 +26,12 @@ type extendedNumber interface { uint32 | uint64 } +type WrapAroundParams struct { + IsRestartAllowed bool +} + type WrapAround[T number, ET extendedNumber] struct { + params WrapAroundParams fullRange ET initialized bool @@ -36,9 +41,10 @@ type WrapAround[T number, ET extendedNumber] struct { extendedHighest ET } -func NewWrapAround[T number, ET extendedNumber]() *WrapAround[T, ET] { +func NewWrapAround[T number, ET extendedNumber](params WrapAroundParams) *WrapAround[T, ET] { var t T return &WrapAround[T, ET]{ + params: params, fullRange: 1 << (unsafe.Sizeof(t) * 8), } } @@ -52,6 +58,7 @@ func (w *WrapAround[T, ET]) Seed(from *WrapAround[T, ET]) { } type WrapAroundUpdateResult[ET extendedNumber] struct { + IsUnhandled bool // when set, other fields are invalid IsRestart bool PreExtendedStart ET // valid only if IsRestart = true PreExtendedHighest ET @@ -140,20 +147,24 @@ func (w *WrapAround[T, ET]) maybeAdjustStart(val T) (result WrapAroundUpdateResu } if val-w.start > T(w.fullRange>>1) { - // out-of-order with existing start => a new start - result.IsRestart = true - if val > w.start { - result.PreExtendedStart = w.fullRange + ET(w.start) - } else { - result.PreExtendedStart = ET(w.start) - } + if w.params.IsRestartAllowed { + // out-of-order with existing start => a new start + result.IsRestart = true + if val > w.start { + result.PreExtendedStart = w.fullRange + ET(w.start) + } else { + result.PreExtendedStart = ET(w.start) + } - if w.isWrapBack(val, w.highest) { - w.cycles = w.fullRange - w.updateExtendedHighest() - cycles = 0 + if w.isWrapBack(val, w.highest) { + w.cycles = w.fullRange + w.updateExtendedHighest() + cycles = 0 + } + w.start = val + } else { + result.IsUnhandled = true } - w.start = val } else { if w.isWrapBack(val, w.highest) { cycles -= w.fullRange diff --git a/pkg/sfu/utils/wraparound_test.go b/pkg/sfu/utils/wraparound_test.go index 55d700650..018e227fc 100644 --- a/pkg/sfu/utils/wraparound_test.go +++ b/pkg/sfu/utils/wraparound_test.go @@ -21,7 +21,7 @@ import ( ) func TestWrapAroundUint16(t *testing.T) { - w := NewWrapAround[uint16, uint32]() + w := NewWrapAround[uint16, uint32](WrapAroundParams{IsRestartAllowed: true}) testCases := []struct { name string input uint16 @@ -194,8 +194,143 @@ func TestWrapAroundUint16(t *testing.T) { } } +func TestWrapAroundUint16NoRestart(t *testing.T) { + w := NewWrapAround[uint16, uint32](WrapAroundParams{IsRestartAllowed: false}) + testCases := []struct { + name string + input uint16 + updated WrapAroundUpdateResult[uint32] + start uint16 + extendedStart uint32 + highest uint16 + extendedHighest uint32 + }{ + // initialize + { + name: "initialize", + input: 10, + updated: WrapAroundUpdateResult[uint32]{ + IsRestart: false, + PreExtendedStart: 0, + PreExtendedHighest: 9, + ExtendedVal: 10, + }, + start: 10, + extendedStart: 10, + highest: 10, + extendedHighest: 10, + }, + // an older number without wrap around should not reset start point + { + name: "no reset start no wrap around", + input: 8, + updated: WrapAroundUpdateResult[uint32]{ + IsUnhandled: true, + // the following fields are not valid when `IsUnhandled = true`, but code fills it in + // and they are filled in here for testing purposes + PreExtendedHighest: 10, + ExtendedVal: 8, + }, + start: 10, + extendedStart: 10, + highest: 10, + extendedHighest: 10, + }, + // an older number with wrap around should not reset start point + { + name: "no reset start wrap around", + input: (1 << 16) - 6, + updated: WrapAroundUpdateResult[uint32]{ + IsUnhandled: true, + PreExtendedHighest: 10, + ExtendedVal: (1 << 16) - 6, + }, + start: 10, + extendedStart: 10, + highest: 10, + extendedHighest: 10, + }, + // yet another older number with wrap around should not reset start point + { + name: "no reset start again", + input: (1 << 16) - 12, + updated: WrapAroundUpdateResult[uint32]{ + IsUnhandled: true, + PreExtendedHighest: 10, + ExtendedVal: (1 << 16) - 12, + }, + start: 10, + extendedStart: 10, + highest: 10, + extendedHighest: 10, + }, + // duplicate should return same as highest + { + name: "duplicate", + input: 10, + updated: WrapAroundUpdateResult[uint32]{ + PreExtendedHighest: 10, + ExtendedVal: 10, + }, + start: 10, + extendedStart: 10, + highest: 10, + extendedHighest: 10, + }, + // a significant jump in order should move highest to that + { + name: "big in-order jump", + input: (1 << 15) - 10, + updated: WrapAroundUpdateResult[uint32]{ + PreExtendedHighest: 10, + ExtendedVal: (1 << 15) - 10, + }, + start: 10, + extendedStart: 10, + highest: (1 << 15) - 10, + extendedHighest: (1 << 15) - 10, + }, + // in-order, should update highest + { + name: "in-order", + input: (1 << 15) + 13, + updated: WrapAroundUpdateResult[uint32]{ + PreExtendedHighest: (1 << 15) - 10, + ExtendedVal: (1 << 15) + 13, + }, + start: 10, + extendedStart: 10, + highest: (1 << 15) + 13, + extendedHighest: (1 << 15) + 13, + }, + // now out-of-order should not reset start as half the range has been seen + { + name: "out-of-order after half range", + input: (1 << 15) - 11, + updated: WrapAroundUpdateResult[uint32]{ + PreExtendedHighest: (1 << 15) + 13, + ExtendedVal: (1 << 15) - 11, + }, + start: 10, + extendedStart: 10, + highest: (1 << 15) + 13, + extendedHighest: (1 << 15) + 13, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.updated, w.Update(tc.input)) + require.Equal(t, tc.start, w.GetStart()) + require.Equal(t, tc.extendedStart, w.GetExtendedStart()) + require.Equal(t, tc.highest, w.GetHighest()) + require.Equal(t, tc.extendedHighest, w.GetExtendedHighest()) + }) + } +} + func TestWrapAroundUint16RollbackRestartAndResetHighest(t *testing.T) { - w := NewWrapAround[uint16, uint64]() + w := NewWrapAround[uint16, uint64](WrapAroundParams{IsRestartAllowed: true}) // initialize w.Update(23) @@ -268,7 +403,7 @@ func TestWrapAroundUint16RollbackRestartAndResetHighest(t *testing.T) { } func TestWrapAroundUint16WrapAroundRestartDuplicate(t *testing.T) { - w := NewWrapAround[uint16, uint64]() + w := NewWrapAround[uint16, uint64](WrapAroundParams{IsRestartAllowed: true}) // initialize w.Update(65534) @@ -314,7 +449,7 @@ func TestWrapAroundUint16WrapAroundRestartDuplicate(t *testing.T) { } func TestWrapAroundUint32(t *testing.T) { - w := NewWrapAround[uint32, uint64]() + w := NewWrapAround[uint32, uint64](WrapAroundParams{IsRestartAllowed: true}) testCases := []struct { name string input uint32 diff --git a/pkg/telemetry/signalanddatastats.go b/pkg/telemetry/signalanddatastats.go index 840f126c3..fb17e61e7 100644 --- a/pkg/telemetry/signalanddatastats.go +++ b/pkg/telemetry/signalanddatastats.go @@ -20,12 +20,11 @@ import ( "go.uber.org/atomic" + "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/utils" ) -const statsReportInterval = 10 * time.Second - type BytesTrackType string const ( @@ -35,11 +34,11 @@ const ( // stats for signal and data channel type BytesTrackStats struct { - trackID livekit.TrackID - pID livekit.ParticipantID - send, recv atomic.Uint64 - lastStatsReport atomic.Value // *time.Time - telemetry TelemetryService + trackID livekit.TrackID + pID livekit.ParticipantID + send, recv atomic.Uint64 + telemetry TelemetryService + isStopped atomic.Bool } func NewBytesTrackStats(trackID livekit.TrackID, pID livekit.ParticipantID, telemetry TelemetryService) *BytesTrackStats { @@ -48,8 +47,7 @@ func NewBytesTrackStats(trackID livekit.TrackID, pID livekit.ParticipantID, tele pID: pID, telemetry: telemetry, } - now := time.Now() - s.lastStatsReport.Store(&now) + go s.reporter() return s } @@ -59,29 +57,13 @@ func (s *BytesTrackStats) AddBytes(bytes uint64, isSend bool) { } else { s.recv.Add(bytes) } - - s.report(false) } -func (s *BytesTrackStats) Report() { - s.report(true) +func (s *BytesTrackStats) Stop() { + s.isStopped.Store(true) } -func (s *BytesTrackStats) report(force bool) { - now := time.Now() - if !force { - lr := s.lastStatsReport.Load().(*time.Time) - if time.Since(*lr) < statsReportInterval { - return - } - - if !s.lastStatsReport.CompareAndSwap(lr, &now) { - return - } - } else { - s.lastStatsReport.Store(&now) - } - +func (s *BytesTrackStats) report() { if recv := s.recv.Swap(0); recv > 0 { s.telemetry.TrackStats(StatsKeyForData(livekit.StreamType_UPSTREAM, s.pID, s.trackID), &livekit.AnalyticsStat{ Streams: []*livekit.AnalyticsStream{ @@ -99,6 +81,20 @@ func (s *BytesTrackStats) report(force bool) { } } +func (s *BytesTrackStats) reporter() { + ticker := time.NewTicker(config.TelemetryStatsUpdateInterval) + defer ticker.Stop() + + for !s.isStopped.Load() { + <-ticker.C + s.report() + } + + s.report() +} + +// ----------------------------------------------------------------------- + func BytesTrackIDForParticipantID(typ BytesTrackType, participantID livekit.ParticipantID) livekit.TrackID { return livekit.TrackID(fmt.Sprintf("%s_%s%s", utils.TrackPrefix, string(typ), participantID)) } diff --git a/pkg/telemetry/statsworker.go b/pkg/telemetry/statsworker.go index b86175368..d48a6d6d5 100644 --- a/pkg/telemetry/statsworker.go +++ b/pkg/telemetry/statsworker.go @@ -126,8 +126,6 @@ func (s *StatsWorker) ClosedAt() time.Time { return s.closedAt } -// ------------------------------------------------------------------------- - func (s *StatsWorker) collectStats( ts *timestamppb.Timestamp, streamType livekit.StreamType, @@ -151,6 +149,8 @@ func (s *StatsWorker) collectStats( return stats } +// ------------------------------------------------------------------------- + // create a single stream and single video layer post aggregation func coalesce(stats []*livekit.AnalyticsStat) *livekit.AnalyticsStat { if len(stats) == 0 { diff --git a/test/agent.go b/test/agent.go index 76f608314..65b4952b6 100644 --- a/test/agent.go +++ b/test/agent.go @@ -48,38 +48,38 @@ func newAgentClient(token string) (*agentClient, error) { }, nil } -func (c *agentClient) Run() error { +func (c *agentClient) Run(jobType livekit.JobType) (err error) { go c.read() workerID := utils.NewGuid("W_") - if err := c.write(&livekit.WorkerMessage{ - Message: &livekit.WorkerMessage_Register{ - Register: &livekit.RegisterWorkerRequest{ - Type: livekit.JobType_JT_ROOM, - WorkerId: workerID, - Version: "version", - Name: "name", + switch jobType { + case livekit.JobType_JT_ROOM: + err = c.write(&livekit.WorkerMessage{ + Message: &livekit.WorkerMessage_Register{ + Register: &livekit.RegisterWorkerRequest{ + Type: livekit.JobType_JT_ROOM, + WorkerId: workerID, + Version: "version", + Name: "name", + }, }, - }, - }); err != nil { - return err + }) + + case livekit.JobType_JT_PUBLISHER: + err = c.write(&livekit.WorkerMessage{ + Message: &livekit.WorkerMessage_Register{ + Register: &livekit.RegisterWorkerRequest{ + Type: livekit.JobType_JT_PUBLISHER, + WorkerId: workerID, + Version: "version", + Name: "name", + }, + }, + }) } - if err := c.write(&livekit.WorkerMessage{ - Message: &livekit.WorkerMessage_Register{ - Register: &livekit.RegisterWorkerRequest{ - Type: livekit.JobType_JT_PUBLISHER, - WorkerId: workerID, - Version: "version", - Name: "name", - }, - }, - }); err != nil { - return err - } - - return nil + return err } func (c *agentClient) read() { diff --git a/test/agent_test.go b/test/agent_test.go index 0a45fa9ae..a8825577b 100644 --- a/test/agent_test.go +++ b/test/agent_test.go @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/require" "github.com/livekit/protocol/auth" + "github.com/livekit/protocol/livekit" ) func TestAgents(t *testing.T) { @@ -31,15 +32,25 @@ func TestAgents(t *testing.T) { require.NoError(t, err) ac2, err := newAgentClient(agentToken()) require.NoError(t, err) + ac3, err := newAgentClient(agentToken()) + require.NoError(t, err) + ac4, err := newAgentClient(agentToken()) + require.NoError(t, err) defer ac1.close() defer ac2.close() - ac1.Run() - ac2.Run() + defer ac3.close() + defer ac4.close() + ac1.Run(livekit.JobType_JT_ROOM) + ac2.Run(livekit.JobType_JT_ROOM) + ac3.Run(livekit.JobType_JT_PUBLISHER) + ac4.Run(livekit.JobType_JT_PUBLISHER) time.Sleep(time.Second * 3) - require.Equal(t, int32(2), ac1.registered.Load()) - require.Equal(t, int32(2), ac2.registered.Load()) + require.Equal(t, int32(1), ac1.registered.Load()) + require.Equal(t, int32(1), ac2.registered.Load()) + require.Equal(t, int32(1), ac3.registered.Load()) + require.Equal(t, int32(1), ac4.registered.Load()) c1 := createRTCClient("c1", defaultServerPort, nil) c2 := createRTCClient("c2", defaultServerPort, nil) @@ -56,7 +67,7 @@ func TestAgents(t *testing.T) { time.Sleep(time.Second * 3) require.Equal(t, int32(1), ac1.roomJobs.Load()+ac2.roomJobs.Load()) - require.Equal(t, int32(1), ac1.participantJobs.Load()+ac2.participantJobs.Load()) + require.Equal(t, int32(1), ac3.participantJobs.Load()+ac4.participantJobs.Load()) // publish 2 tracks t3, err := c2.AddStaticTrack("audio/opus", "audio", "webcam") @@ -69,7 +80,7 @@ func TestAgents(t *testing.T) { time.Sleep(time.Second * 3) require.Equal(t, int32(1), ac1.roomJobs.Load()+ac2.roomJobs.Load()) - require.Equal(t, int32(2), ac1.participantJobs.Load()+ac2.participantJobs.Load()) + require.Equal(t, int32(2), ac3.participantJobs.Load()+ac4.participantJobs.Load()) } func agentToken() string { diff --git a/version/version.go b/version/version.go index 41403002b..cbd85195e 100644 --- a/version/version.go +++ b/version/version.go @@ -14,4 +14,4 @@ package version -const Version = "1.5.0" +const Version = "1.5.1"