Dynamic group attachment for Brew subscribers (#19)

This commit is contained in:
misadeks
2026-02-23 17:03:21 +01:00
committed by GitHub
parent 18738a5946
commit cbfd50b79d
14 changed files with 615 additions and 337 deletions

315
Cargo.lock generated
View File

@@ -61,12 +61,6 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "anyhow"
version = "1.0.101"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f0e0fee31ef5ed1ba1316088939cea399010ed7731dba877ed44aeb407a75ea"
[[package]]
name = "arrayvec"
version = "0.7.6"
@@ -454,12 +448,6 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582"
[[package]]
name = "foldhash"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
[[package]]
name = "generic-array"
version = "0.14.7"
@@ -497,19 +485,6 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "getrandom"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "139ef39800118c7683f2fd3c98c1b23c09ae076556b435f8e9064ae108aaeeec"
dependencies = [
"cfg-if",
"libc",
"r-efi",
"wasip2",
"wasip3",
]
[[package]]
name = "git-version"
version = "0.3.9"
@@ -532,9 +507,9 @@ dependencies = [
[[package]]
name = "glam"
version = "0.32.0"
version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34627c5158214743a374170fed714833fdf4e4b0cbcc1ea98417866a4c5d4441"
checksum = "74a4d85559e2637d3d839438b5b3d75c31e655276f9544d72475c36b92fabbed"
[[package]]
name = "glob"
@@ -542,15 +517,6 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280"
[[package]]
name = "hashbrown"
version = "0.15.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
dependencies = [
"foldhash",
]
[[package]]
name = "hashbrown"
version = "0.16.1"
@@ -579,22 +545,14 @@ version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87"
[[package]]
name = "id-arena"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954"
[[package]]
name = "indexmap"
version = "2.13.0"
version = "2.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017"
checksum = "0ad4bb2b565bca0645f4d68c5c9af97fba094e9791da685bf83cb5f3ce74acf2"
dependencies = [
"equivalent",
"hashbrown 0.16.1",
"serde",
"serde_core",
"hashbrown",
]
[[package]]
@@ -653,17 +611,11 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "leb128fmt"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]]
name = "libc"
version = "0.2.182"
version = "0.2.179"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6800badb6cb2082ffd7b6a67e6125bb39f18782f793520caee8cb8846be06112"
checksum = "c5a2d376baa530d1238d133232d15e239abad80d05838b4b59354e5268af431f"
[[package]]
name = "libloading"
@@ -710,9 +662,9 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]]
name = "memchr"
version = "2.8.0"
version = "2.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79"
checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273"
[[package]]
name = "minimal-lexical"
@@ -825,9 +777,9 @@ dependencies = [
[[package]]
name = "num-conv"
version = "0.2.0"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050"
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
[[package]]
name = "num-integer"
@@ -965,16 +917,6 @@ dependencies = [
"zerocopy",
]
[[package]]
name = "prettyplease"
version = "0.2.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b"
dependencies = [
"proc-macro2",
"syn",
]
[[package]]
name = "primal-check"
version = "0.3.4"
@@ -986,9 +928,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.106"
version = "1.0.105"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934"
checksum = "535d180e0ecab6268a3e718bb9fd44db66bbbc256257165fc699dadf70d16fe7"
dependencies = [
"unicode-ident",
]
@@ -1007,7 +949,7 @@ dependencies = [
"rustc-hash 2.1.1",
"rustls",
"socket2",
"thiserror 2.0.18",
"thiserror 2.0.17",
"tokio",
"tracing",
"web-time",
@@ -1030,7 +972,7 @@ dependencies = [
"rustls-pki-types",
"rustls-platform-verifier",
"slab",
"thiserror 2.0.18",
"thiserror 2.0.17",
"tinyvec",
"tracing",
"web-time",
@@ -1052,9 +994,9 @@ dependencies = [
[[package]]
name = "quote"
version = "1.0.44"
version = "1.0.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21b2ebcf727b7760c461f091f9f0f539b77b8e87f2fd88131e7f1b433b3cece4"
checksum = "dc74d9a594b72ae6656596548f56f667211f8a97b3d4c3d467150794690dc40a"
dependencies = [
"proc-macro2",
]
@@ -1083,7 +1025,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1"
dependencies = [
"rand_chacha 0.9.0",
"rand_core 0.9.5",
"rand_core 0.9.3",
]
[[package]]
@@ -1103,7 +1045,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
dependencies = [
"ppv-lite86",
"rand_core 0.9.5",
"rand_core 0.9.3",
]
[[package]]
@@ -1117,9 +1059,9 @@ dependencies = [
[[package]]
name = "rand_core"
version = "0.9.5"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76afc826de14238e6e8c374ddcc1fa19e374fd8dd986b0d2af0d02377261d83c"
checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38"
dependencies = [
"getrandom 0.3.4",
]
@@ -1139,9 +1081,9 @@ dependencies = [
[[package]]
name = "regex"
version = "1.12.3"
version = "1.12.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276"
checksum = "843bc0191f75f3e22651ae5f1e72939ab2f72a4bc30fa80a066bd66edefc24d4"
dependencies = [
"aho-corasick",
"memchr",
@@ -1151,9 +1093,9 @@ dependencies = [
[[package]]
name = "regex-automata"
version = "0.4.14"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f"
checksum = "5276caf25ac86c8d810222b3dbb938e512c55c6831a10f3e6ed1c93b84041f1c"
dependencies = [
"aho-corasick",
"memchr",
@@ -1162,9 +1104,9 @@ dependencies = [
[[package]]
name = "regex-syntax"
version = "0.8.9"
version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a96887878f22d7bad8a3b6dc5b7440e0ada9a245242924394987b21cf2210a4c"
checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58"
[[package]]
name = "ring"
@@ -1242,7 +1184,7 @@ dependencies = [
"openssl-probe 0.2.1",
"rustls-pki-types",
"schannel",
"security-framework 3.6.0",
"security-framework 3.5.1",
]
[[package]]
@@ -1279,7 +1221,7 @@ dependencies = [
"rustls-native-certs 0.8.3",
"rustls-platform-verifier-android",
"rustls-webpki",
"security-framework 3.6.0",
"security-framework 3.5.1",
"security-framework-sys",
"webpki-root-certs",
"windows-sys 0.61.2",
@@ -1341,9 +1283,9 @@ dependencies = [
[[package]]
name = "security-framework"
version = "3.6.0"
version = "3.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d17b898a6d6948c3a8ee4372c17cb384f90d2e6e912ef00895b14fd7ab54ec38"
checksum = "b3297343eaf830f66ede390ea39da1d462b6b0c1b000f420d0a83f898bbbe6ef"
dependencies = [
"bitflags",
"core-foundation 0.10.1",
@@ -1354,20 +1296,14 @@ dependencies = [
[[package]]
name = "security-framework-sys"
version = "2.16.0"
version = "2.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "321c8673b092a9a42605034a9879d73cb79101ed5fd117bc9a597b89b4e9e61a"
checksum = "cc1f0cbffaac4852523ce30d8bd3c5cdc873501d96ff467ca09b6767bb8cd5c0"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "semver"
version = "1.0.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2"
[[package]]
name = "serde"
version = "1.0.228"
@@ -1516,9 +1452,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]]
name = "syn"
version = "2.0.116"
version = "2.0.114"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3df424c70518695237746f84cede799c9c58fcb37450d7b23716568cc8bc69cb"
checksum = "d4d107df263a3013ef9b1879b0df87d706ff80f65a86ea879bd9c31f9b307c2a"
dependencies = [
"proc-macro2",
"quote",
@@ -1604,11 +1540,11 @@ dependencies = [
[[package]]
name = "thiserror"
version = "2.0.18"
version = "2.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4"
checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8"
dependencies = [
"thiserror-impl 2.0.18",
"thiserror-impl 2.0.17",
]
[[package]]
@@ -1624,9 +1560,9 @@ dependencies = [
[[package]]
name = "thiserror-impl"
version = "2.0.18"
version = "2.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5"
checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913"
dependencies = [
"proc-macro2",
"quote",
@@ -1644,30 +1580,30 @@ dependencies = [
[[package]]
name = "time"
version = "0.3.47"
version = "0.3.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c"
checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d"
dependencies = [
"deranged",
"itoa",
"num-conv",
"powerfmt",
"serde_core",
"serde",
"time-core",
"time-macros",
]
[[package]]
name = "time-core"
version = "0.1.8"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca"
checksum = "40868e7c1d2f0b8d73e4a8c7f0ff63af4f6d19be117e90bd73eb1d62cf831c6b"
[[package]]
name = "time-macros"
version = "0.2.27"
version = "0.2.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e70e4c5a0e0a8a4823ad65dfe1a6930e4f4d756dcd9dd7939022b5e8c501215"
checksum = "30cfb0125f12d9c277f35663a0a33f8c30190f4e4574868a330595412d34ebf3"
dependencies = [
"num-conv",
"time-core",
@@ -1773,7 +1709,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "786d480bce6247ab75f005b14ae1624ad978d3029d9113f0a22fa1ac773faeaf"
dependencies = [
"crossbeam-channel",
"thiserror 2.0.18",
"thiserror 2.0.17",
"time",
"tracing-subscriber",
]
@@ -1868,9 +1804,9 @@ checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb"
[[package]]
name = "unicode-ident"
version = "1.0.24"
version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75"
checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5"
[[package]]
name = "unicode-xid"
@@ -1898,11 +1834,11 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "uuid"
version = "1.21.0"
version = "1.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b672338555252d43fd2240c714dc444b8c6fb0a5c5335e65a07bba7742735ddb"
checksum = "ee48d38b119b0cd71fe4141b30f5ba9c7c5d9f4e7a3a8b4a674e4b6ef789976f"
dependencies = [
"getrandom 0.4.1",
"getrandom 0.3.4",
"js-sys",
"wasm-bindgen",
]
@@ -1937,18 +1873,9 @@ checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b"
[[package]]
name = "wasip2"
version = "1.0.2+wasi-0.2.9"
version = "1.0.1+wasi-0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9517f9239f02c069db75e65f174b3da828fe5f5b945c4dd26bd25d89c03ebcf5"
dependencies = [
"wit-bindgen",
]
[[package]]
name = "wasip3"
version = "0.4.0+wasi-0.3.0-rc-2026-01-06"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5"
checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7"
dependencies = [
"wit-bindgen",
]
@@ -1998,40 +1925,6 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "wasm-encoder"
version = "0.244.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319"
dependencies = [
"leb128fmt",
"wasmparser",
]
[[package]]
name = "wasm-metadata"
version = "0.244.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909"
dependencies = [
"anyhow",
"indexmap",
"wasm-encoder",
"wasmparser",
]
[[package]]
name = "wasmparser"
version = "0.244.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe"
dependencies = [
"bitflags",
"hashbrown 0.15.5",
"indexmap",
"semver",
]
[[package]]
name = "web-time"
version = "1.1.0"
@@ -2299,91 +2192,9 @@ dependencies = [
[[package]]
name = "wit-bindgen"
version = "0.51.0"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5"
dependencies = [
"wit-bindgen-rust-macro",
]
[[package]]
name = "wit-bindgen-core"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc"
dependencies = [
"anyhow",
"heck",
"wit-parser",
]
[[package]]
name = "wit-bindgen-rust"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21"
dependencies = [
"anyhow",
"heck",
"indexmap",
"prettyplease",
"syn",
"wasm-metadata",
"wit-bindgen-core",
"wit-component",
]
[[package]]
name = "wit-bindgen-rust-macro"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a"
dependencies = [
"anyhow",
"prettyplease",
"proc-macro2",
"quote",
"syn",
"wit-bindgen-core",
"wit-bindgen-rust",
]
[[package]]
name = "wit-component"
version = "0.244.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2"
dependencies = [
"anyhow",
"bitflags",
"indexmap",
"log",
"serde",
"serde_derive",
"serde_json",
"wasm-encoder",
"wasm-metadata",
"wasmparser",
"wit-parser",
]
[[package]]
name = "wit-parser"
version = "0.244.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736"
dependencies = [
"anyhow",
"id-arena",
"indexmap",
"log",
"semver",
"serde",
"serde_derive",
"serde_json",
"unicode-xid",
"wasmparser",
]
checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59"
[[package]]
name = "yasna"
@@ -2396,18 +2207,18 @@ dependencies = [
[[package]]
name = "zerocopy"
version = "0.8.39"
version = "0.8.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db6d35d663eadb6c932438e763b262fe1a70987f9ae936e60158176d710cae4a"
checksum = "1fabae64378cb18147bb18bca364e63bdbe72a0ffe4adf0addfec8aa166b2c56"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.8.39"
version = "0.8.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4122cd3169e94605190e77839c9a40d40ed048d305bfdc146e7df40ab0f3e517"
checksum = "c9c2d862265a8bb4471d87e033e730f536e2a285cc7cb05dbce09a2a97075f90"
dependencies = [
"proc-macro2",
"quote",
@@ -2422,6 +2233,6 @@ checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0"
[[package]]
name = "zmij"
version = "1.0.21"
version = "1.0.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa"
checksum = "3ff05f8caa9038894637571ae6b9e29466c1f4f829d26c9b28f869a29cbe3445"

View File

@@ -72,7 +72,6 @@ fn build_bs_stack(cfg: &mut SharedConfig) -> MessageRouter {
username: brew_cfg.username,
password: brew_cfg.password,
issi: brew_cfg.issi,
groups: brew_cfg.groups,
reconnect_delay: Duration::from_secs(brew_cfg.reconnect_delay_secs),
jitter_initial_latency_frames: brew_cfg.jitter_initial_latency_frames,
};

View File

@@ -18,8 +18,6 @@ pub struct CfgBrew {
pub password: Option<String>,
/// ISSI to register with the TetraPack server
pub issi: u32,
/// GSSIs (group IDs) to affiliate to
pub groups: Vec<u32>,
/// Reconnection delay in seconds
pub reconnect_delay_secs: u64,
/// Extra initial jitter playout delay in frames (added on top of adaptive baseline)
@@ -41,8 +39,6 @@ pub struct CfgBrewDto {
pub password: String,
/// ISSI to register with the TetraPack server
pub issi: u32,
/// GSSIs (group IDs) to affiliate to
pub groups: Vec<u32>,
/// Reconnection delay in seconds
#[serde(default = "default_brew_reconnect_delay")]
pub reconnect_delay_secs: u64,
@@ -71,7 +67,6 @@ pub fn apply_brew_patch(src: CfgBrewDto) -> CfgBrew {
username: Some(src.username.to_string()),
password: Some(src.password),
issi: src.issi,
groups: src.groups,
reconnect_delay_secs: src.reconnect_delay_secs,
jitter_initial_latency_frames: src.jitter_initial_latency_frames,
}

View File

@@ -1,22 +1,22 @@
//! Brew protocol entity bridging TetraPack WebSocket to UMAC/MLE with hangtime-based circuit reuse
use std::collections::{HashMap, VecDeque};
use std::collections::{HashMap, HashSet, VecDeque};
use std::thread;
use std::time::{Duration, Instant};
use crossbeam_channel::{Receiver, Sender, unbounded};
use uuid::Uuid;
use crate::{MessageQueue, TetraEntityTrait};
use tetra_config::SharedConfig;
use tetra_core::{Sap, TdmaTime, tetra_entities::TetraEntity};
use tetra_saps::control::brew::{BrewSubscriberAction, BrewSubscriberUpdate};
use tetra_saps::{SapMsg, SapMsgInner, control::call_control::CallControl, tmd::TmdCircuitDataReq};
use crate::{MessageQueue, TetraEntityTrait};
use super::worker::{BrewCommand, BrewConfig, BrewEvent, BrewWorker};
/// Hangtime before releasing group call circuit to allow reuse without re-signaling.
const GROUP_CALL_HANGTIME: Duration = Duration::from_secs(5);
const GROUP_CALL_HANGTIME: Duration = Duration::from_secs(1);
/// Minimum playout buffer depth in frames.
const BREW_JITTER_MIN_FRAMES: usize = 2;
/// Default playout buffer depth in frames.
@@ -248,6 +248,9 @@ pub struct BrewEntity {
/// UL calls being forwarded to TetraPack, keyed by timeslot
ul_forwarded: HashMap<u8, UlForwardedCall>,
/// Registered subscriber groups (ISSI -> set of GSSIs)
subscriber_groups: HashMap<u32, HashSet<u32>>,
/// Whether the worker is connected
connected: bool,
@@ -286,6 +289,9 @@ impl BrewEntity {
dl_jitter: HashMap::new(),
hanging_calls: HashMap::new(),
ul_forwarded: HashMap::new(),
subscriber_groups: HashMap::new(),
// next_call_id: 100, // Start at 100 to avoid collision with CMCE
// next_usage: 10, // Start at 10 to avoid collision
connected: false,
worker_handle: Some(handle),
}
@@ -297,6 +303,8 @@ impl BrewEntity {
match event {
BrewEvent::Connected => {
tracing::info!("BrewEntity: connected to TetraPack server");
self.connected = true;
self.resync_subscribers();
self.set_network_connected(true);
}
BrewEvent::Disconnected(reason) => {
@@ -331,6 +339,107 @@ impl BrewEntity {
}
}
fn handle_subscriber_update(&mut self, update: BrewSubscriberUpdate) {
let issi = update.issi;
let groups = update.groups;
match update.action {
BrewSubscriberAction::Register => {
let known = self.subscriber_groups.contains_key(&issi);
self.subscriber_groups.entry(issi).or_insert_with(HashSet::new);
tracing::info!("BrewEntity: subscriber register issi={} known={}", issi, known);
let _ = self.command_sender.send(BrewCommand::RegisterSubscriber { issi });
}
BrewSubscriberAction::Deregister => {
let mut removed_groups = Vec::new();
if let Some(existing) = self.subscriber_groups.remove(&issi) {
for gssi in existing {
removed_groups.push(gssi);
}
}
if !removed_groups.is_empty() {
tracing::info!(
"BrewEntity: subscriber deaffiliate on deregister issi={} groups={:?}",
issi,
removed_groups
);
let _ = self.command_sender.send(BrewCommand::DeaffiliateGroups {
issi,
groups: removed_groups,
});
}
tracing::info!("BrewEntity: subscriber deregister issi={}", issi);
let _ = self.command_sender.send(BrewCommand::DeregisterSubscriber { issi });
}
BrewSubscriberAction::Affiliate => {
let is_new = !self.subscriber_groups.contains_key(&issi);
let mut new_groups = Vec::new();
{
let entry = self.subscriber_groups.entry(issi).or_insert_with(HashSet::new);
for gssi in groups {
if entry.insert(gssi) {
new_groups.push(gssi);
}
}
}
if is_new {
tracing::info!("BrewEntity: affiliate from unknown issi={}, sending register", issi);
let _ = self.command_sender.send(BrewCommand::RegisterSubscriber { issi });
}
if new_groups.is_empty() {
tracing::debug!("BrewEntity: affiliate ignored (no new groups) issi={}", issi);
} else {
tracing::info!("BrewEntity: subscriber affiliate issi={} groups={:?}", issi, new_groups);
let _ = self.command_sender.send(BrewCommand::AffiliateGroups { issi, groups: new_groups });
}
}
BrewSubscriberAction::Deaffiliate => {
let mut removed_groups = Vec::new();
if let Some(entry) = self.subscriber_groups.get_mut(&issi) {
for gssi in groups {
if entry.remove(&gssi) {
removed_groups.push(gssi);
}
}
} else {
removed_groups = groups;
}
if removed_groups.is_empty() {
tracing::debug!("BrewEntity: deaffiliate ignored (no matching groups) issi={}", issi);
} else {
tracing::info!("BrewEntity: subscriber deaffiliate issi={} groups={:?}", issi, removed_groups);
let _ = self.command_sender.send(BrewCommand::DeaffiliateGroups {
issi,
groups: removed_groups,
});
}
}
}
}
fn resync_subscribers(&self) {
if self.subscriber_groups.is_empty() {
tracing::debug!("BrewEntity: no subscribers to resync");
return;
}
tracing::info!("BrewEntity: resyncing {} subscribers to TetraPack", self.subscriber_groups.len());
for (issi, groups) in &self.subscriber_groups {
let _ = self.command_sender.send(BrewCommand::RegisterSubscriber { issi: *issi });
if !groups.is_empty() {
let gssi_list: Vec<u32> = groups.iter().copied().collect();
let _ = self.command_sender.send(BrewCommand::AffiliateGroups {
issi: *issi,
groups: gssi_list,
});
}
}
}
fn set_network_connected(&mut self, connected: bool) {
self.connected = connected;
let mut state = self.config.state_write();
@@ -496,12 +605,10 @@ impl BrewEntity {
/// Clean up expired hanging call tracking hints (CMCE already released circuits)
fn expire_hanging_calls(&mut self, _queue: &mut MessageQueue) {
const HANGTIME_SECS: u64 = 5;
let expired: Vec<u32> = self
.hanging_calls
.iter()
.filter(|(_, h)| h.since.elapsed().as_secs() > HANGTIME_SECS)
.filter(|(_, h)| h.since.elapsed() >= GROUP_CALL_HANGTIME)
.map(|(gssi, _)| *gssi)
.collect();
@@ -644,6 +751,30 @@ impl BrewEntity {
tracing::warn!("BrewEntity: NetworkCallReady for unknown uuid={}", brew_uuid);
}
}
fn drop_network_call(&mut self, brew_uuid: Uuid) {
if let Some(call) = self.active_calls.remove(&brew_uuid) {
tracing::info!(
"BrewEntity: dropping network call uuid={} gssi={} (CMCE request)",
brew_uuid,
call.dest_gssi
);
self.dl_jitter.remove(&brew_uuid);
self.hanging_calls.remove(&call.dest_gssi);
return;
}
let hanging_gssi = self
.hanging_calls
.iter()
.find_map(|(gssi, hanging)| if hanging.uuid == brew_uuid { Some(*gssi) } else { None });
if let Some(gssi) = hanging_gssi {
tracing::info!("BrewEntity: dropping hanging call uuid={} gssi={} (CMCE request)", brew_uuid, gssi);
self.hanging_calls.remove(&gssi);
} else {
tracing::debug!("BrewEntity: drop requested for unknown uuid={}", brew_uuid);
}
}
}
// ─── TetraEntityTrait implementation ──────────────────────────────
@@ -688,6 +819,9 @@ impl TetraEntityTrait for BrewEntity {
SapMsgInner::CmceCallControl(CallControl::CallEnded { call_id, ts }) => {
self.handle_local_call_end(call_id, ts);
}
SapMsgInner::CmceCallControl(CallControl::NetworkCallEnd { brew_uuid }) => {
self.drop_network_call(brew_uuid);
}
SapMsgInner::CmceCallControl(CallControl::NetworkCallReady {
brew_uuid,
call_id,
@@ -696,6 +830,9 @@ impl TetraEntityTrait for BrewEntity {
}) => {
self.rx_network_call_ready(brew_uuid, call_id, ts, usage);
}
SapMsgInner::BrewSubscriberUpdate(update) => {
self.handle_subscriber_update(update);
}
_ => {
tracing::debug!("BrewEntity: unexpected rx_prim from {:?} on {:?}", message.src, message.sap);
}
@@ -713,16 +850,43 @@ impl BrewEntity {
tracing::trace!("BrewEntity: not connected, ignoring local call start");
return;
}
// TODO: Check if local
// if dest_gssi == 9 {
// tracing::debug!(
// "BrewEntity: suppressing local call forwarding for TG 9 (call_id={} src={} ts={})",
// call_id,
// source_issi,
// ts
// );
// return;
// }
// Check if this group is subscribed in Brew config
// let groups = &self.config.config().brew.groups;
let groups = &self.brew_config.groups;
if !groups.contains(&dest_gssi) {
tracing::debug!(
"BrewEntity: local call on GSSI {} not subscribed (subscribed: {:?}), not forwarding",
// If we're already forwarding on this timeslot, treat as a talker change/update
if let Some(fwd) = self.ul_forwarded.get_mut(&ts) {
if fwd.call_id != call_id || fwd.dest_gssi != dest_gssi {
tracing::warn!(
"BrewEntity: updating forwarded call on ts={} (was call_id={} gssi={}) -> (call_id={} gssi={})",
ts,
fwd.call_id,
fwd.dest_gssi,
call_id,
dest_gssi
);
}
fwd.call_id = call_id;
fwd.source_issi = source_issi;
fwd.dest_gssi = dest_gssi;
fwd.frame_count = 0;
// Send GROUP_TX update for the new talker
let _ = self.command_sender.send(BrewCommand::SendGroupTx {
uuid: fwd.uuid,
source_issi,
dest_gssi,
groups
);
priority: 0,
service: 0, // TETRA encoded speech
});
return;
}

View File

@@ -1,5 +1,6 @@
//! Brew WebSocket worker thread handling HTTP Digest Auth, TLS, and bidirectional Brew message exchange
use std::collections::{HashMap, HashSet};
use std::io::{Read, Write};
use std::net::{TcpStream, ToSocketAddrs};
use std::sync::Arc;
@@ -53,9 +54,15 @@ pub enum BrewCommand {
/// Register a subscriber (ISSI)
RegisterSubscriber { issi: u32 },
/// Deregister a subscriber (ISSI)
DeregisterSubscriber { issi: u32 },
/// Affiliate subscriber to groups
AffiliateGroups { issi: u32, groups: Vec<u32> },
/// Deaffiliate subscriber from groups
DeaffiliateGroups { issi: u32, groups: Vec<u32> },
/// Send GROUP_TX to TetraPack (local radio started transmitting on subscribed group)
SendGroupTx {
uuid: Uuid,
@@ -91,8 +98,6 @@ pub struct BrewConfig {
pub password: Option<String>,
/// ISSI to register with the server
pub issi: u32,
/// GSSIs (group IDs) to affiliate to
pub groups: Vec<u32>,
/// Reconnection delay
pub reconnect_delay: Duration,
/// Extra initial jitter playout delay in frames (added on top of adaptive baseline)
@@ -250,6 +255,8 @@ pub struct BrewWorker {
event_sender: Sender<BrewEvent>,
/// Receive commands from the BrewEntity
command_receiver: Receiver<BrewCommand>,
/// Registered subscribers and their affiliated groups (tracked from commands)
subscriber_groups: HashMap<u32, HashSet<u32>>,
}
impl BrewWorker {
@@ -258,6 +265,7 @@ impl BrewWorker {
config,
event_sender,
command_receiver,
subscriber_groups: HashMap::new(),
}
}
@@ -463,7 +471,7 @@ impl BrewWorker {
_ => {}
}
// Step 3: Register subscriber and affiliate to groups
// Step 3: Register subscriber
self.send_registration(&mut ws)?;
// Step 4: Main message loop
@@ -471,39 +479,37 @@ impl BrewWorker {
}
/// Send initial registration and group affiliation
fn send_registration(&self, ws: &mut WebSocket<MaybeTlsStream<TcpStream>>) -> Result<(), String> {
fn send_registration(&mut self, ws: &mut WebSocket<MaybeTlsStream<TcpStream>>) -> Result<(), String> {
// Register ISSI
let reg_msg = build_subscriber_register(self.config.issi, &self.config.groups);
let reg_msg = build_subscriber_register(self.config.issi, &[]);
ws.send(Message::Binary(reg_msg.into()))
.map_err(|e| format!("failed to send registration: {}", e))?;
tracing::info!("BrewWorker: registered ISSI {}", self.config.issi);
// Affiliate to groups
if !self.config.groups.is_empty() {
let aff_msg = build_subscriber_affiliate(self.config.issi, &self.config.groups);
ws.send(Message::Binary(aff_msg.into()))
.map_err(|e| format!("failed to send affiliation: {}", e))?;
tracing::info!("BrewWorker: affiliated to groups {:?}", self.config.groups);
}
self.subscriber_groups.entry(self.config.issi).or_insert_with(HashSet::new);
Ok(())
}
/// Graceful teardown: DEAFFILIATE → DEREGISTER → WS close
fn graceful_teardown(&self, ws: &mut WebSocket<MaybeTlsStream<TcpStream>>) {
if !self.config.groups.is_empty() {
let deaff_msg = build_subscriber_deaffiliate(self.config.issi, &self.config.groups);
if let Err(e) = ws.send(Message::Binary(deaff_msg.into())) {
tracing::error!("BrewWorker: failed to send deaffiliation: {}", e);
} else {
tracing::info!("BrewWorker: deaffiliated from groups {:?}", self.config.groups);
for (issi, groups) in &self.subscriber_groups {
if !groups.is_empty() {
let mut group_list: Vec<u32> = groups.iter().copied().collect();
group_list.sort_unstable();
let deaff_msg = build_subscriber_deaffiliate(*issi, &group_list);
if let Err(e) = ws.send(Message::Binary(deaff_msg.into())) {
tracing::error!("BrewWorker: failed to send deaffiliation: {}", e);
} else {
tracing::info!("BrewWorker: deaffiliated issi={} groups={:?}", issi, group_list);
}
}
let dereg_msg = build_subscriber_deregister(*issi);
if let Err(e) = ws.send(Message::Binary(dereg_msg.into())) {
tracing::error!("BrewWorker: failed to send deregistration: {}", e);
} else {
tracing::info!("BrewWorker: deregistered ISSI {}", issi);
}
}
let dereg_msg = build_subscriber_deregister(self.config.issi);
if let Err(e) = ws.send(Message::Binary(dereg_msg.into())) {
tracing::error!("BrewWorker: failed to send deregistration: {}", e);
} else {
tracing::info!("BrewWorker: deregistered ISSI {}", self.config.issi);
}
let _ = ws.close(None);
}
@@ -595,15 +601,46 @@ impl BrewWorker {
};
match cmd {
BrewCommand::RegisterSubscriber { issi } => {
self.subscriber_groups.entry(issi).or_insert_with(HashSet::new);
let msg = build_subscriber_register(issi, &[]);
if let Err(e) = ws.send(Message::Binary(msg.into())) {
tracing::error!("BrewWorker: failed to send registration: {}", e);
} else {
tracing::debug!("BrewWorker: sent REGISTER issi={}", issi);
}
}
BrewCommand::DeregisterSubscriber { issi } => {
self.subscriber_groups.remove(&issi);
let msg = build_subscriber_deregister(issi);
if let Err(e) = ws.send(Message::Binary(msg.into())) {
tracing::error!("BrewWorker: failed to send deregistration: {}", e);
} else {
tracing::debug!("BrewWorker: sent DEREGISTER issi={}", issi);
}
}
BrewCommand::AffiliateGroups { issi, groups } => {
let entry = self.subscriber_groups.entry(issi).or_insert_with(HashSet::new);
for gssi in &groups {
entry.insert(*gssi);
}
let msg = build_subscriber_affiliate(issi, &groups);
if let Err(e) = ws.send(Message::Binary(msg.into())) {
tracing::error!("BrewWorker: failed to send affiliation: {}", e);
} else {
tracing::debug!("BrewWorker: sent AFFILIATE issi={} groups={:?}", issi, groups);
}
}
BrewCommand::DeaffiliateGroups { issi, groups } => {
if let Some(entry) = self.subscriber_groups.get_mut(&issi) {
for gssi in &groups {
entry.remove(gssi);
}
}
let msg = build_subscriber_deaffiliate(issi, &groups);
if let Err(e) = ws.send(Message::Binary(msg.into())) {
tracing::error!("BrewWorker: failed to send deaffiliation: {}", e);
} else {
tracing::debug!("BrewWorker: sent DEAFFILIATE issi={} groups={:?}", issi, groups);
}
}
BrewCommand::SendGroupTx {

View File

@@ -109,6 +109,9 @@ impl TetraEntityTrait for CmceBs {
SapMsgInner::CmceCallControl(_) => {
self.cc.rx_call_control(queue, message);
}
SapMsgInner::BrewSubscriberUpdate(update) => {
self.cc.handle_subscriber_update(queue, update);
}
_ => {
panic!("Unexpected control message: {:?}", message.msg);
}

View File

@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use tetra_config::SharedConfig;
use tetra_core::TimeslotOwner;
@@ -18,6 +18,7 @@ use tetra_pdus::cmce::{
use tetra_saps::{
SapMsg, SapMsgInner,
control::{
brew::{BrewSubscriberAction, BrewSubscriberUpdate},
call_control::{CallControl, Circuit},
enums::{circuit_mode_type::CircuitModeType, communication_type::CommunicationType},
},
@@ -42,6 +43,10 @@ pub struct CcBsSubentity {
circuits: CircuitMgr,
/// Active group calls: call_id -> call info
active_calls: HashMap<u16, ActiveCall>,
/// Registered subscriber groups (ISSI -> set of GSSIs)
subscriber_groups: HashMap<u32, HashSet<u32>>,
/// Listener counts per GSSI
group_listeners: HashMap<u32, usize>,
}
/// Origin of a group call
@@ -82,6 +87,8 @@ impl CcBsSubentity {
cached_setups: HashMap::new(),
circuits: CircuitMgr::new(),
active_calls: HashMap::new(),
subscriber_groups: HashMap::new(),
group_listeners: HashMap::new(),
}
}
@@ -242,6 +249,124 @@ impl CcBsSubentity {
sdu
}
fn has_listener(&self, gssi: u32) -> bool {
self.group_listeners.get(&gssi).copied().unwrap_or(0) > 0
}
fn inc_group_listener(&mut self, gssi: u32) {
let entry = self.group_listeners.entry(gssi).or_insert(0);
*entry += 1;
}
fn dec_group_listener(&mut self, gssi: u32) {
if let Some(entry) = self.group_listeners.get_mut(&gssi) {
if *entry <= 1 {
self.group_listeners.remove(&gssi);
} else {
*entry -= 1;
}
}
}
fn drop_group_calls_if_unlistened(&mut self, queue: &mut MessageQueue, gssi: u32) {
if self.has_listener(gssi) {
return;
}
let to_drop: Vec<(u16, CallOrigin)> = self
.active_calls
.iter()
.filter(|(_, call)| call.dest_gssi == gssi)
.map(|(call_id, call)| (*call_id, call.origin.clone()))
.collect();
for (call_id, origin) in to_drop {
tracing::info!("CMCE: dropping call_id={} gssi={} (no listeners)", call_id, gssi);
if let CallOrigin::Network { brew_uuid } = origin {
if self.config.config().brew.is_some() {
queue.push_back(SapMsg {
sap: Sap::Control,
src: TetraEntity::Cmce,
dest: TetraEntity::Brew,
dltime: self.dltime,
msg: SapMsgInner::CmceCallControl(CallControl::NetworkCallEnd { brew_uuid }),
});
}
}
self.release_call(queue, call_id);
}
}
pub fn handle_subscriber_update(&mut self, queue: &mut MessageQueue, update: BrewSubscriberUpdate) {
let issi = update.issi;
let groups = update.groups;
match update.action {
BrewSubscriberAction::Register => {
let known = self.subscriber_groups.contains_key(&issi);
self.subscriber_groups.entry(issi).or_insert_with(HashSet::new);
tracing::info!("CMCE: subscriber register issi={} known={}", issi, known);
}
BrewSubscriberAction::Deregister => {
if let Some(existing) = self.subscriber_groups.remove(&issi) {
for gssi in existing {
self.dec_group_listener(gssi);
self.drop_group_calls_if_unlistened(queue, gssi);
}
}
tracing::info!("CMCE: subscriber deregister issi={}", issi);
}
BrewSubscriberAction::Affiliate => {
let mut new_groups = Vec::new();
{
let entry = self.subscriber_groups.entry(issi).or_insert_with(HashSet::new);
for gssi in groups {
if entry.insert(gssi) {
new_groups.push(gssi);
}
}
}
for gssi in &new_groups {
self.inc_group_listener(*gssi);
}
if new_groups.is_empty() {
tracing::debug!("CMCE: affiliate ignored (no new groups) issi={}", issi);
} else {
tracing::info!("CMCE: subscriber affiliate issi={} groups={:?}", issi, new_groups);
}
}
BrewSubscriberAction::Deaffiliate => {
let mut removed_groups = Vec::new();
let mut known_issi = false;
if let Some(entry) = self.subscriber_groups.get_mut(&issi) {
known_issi = true;
for gssi in groups {
if entry.remove(&gssi) {
removed_groups.push(gssi);
}
}
} else {
removed_groups = groups;
}
if known_issi {
for gssi in &removed_groups {
self.dec_group_listener(*gssi);
}
}
if removed_groups.is_empty() {
tracing::debug!("CMCE: deaffiliate ignored (no matching groups) issi={}", issi);
} else {
tracing::info!("CMCE: subscriber deaffiliate issi={} groups={:?}", issi, removed_groups);
for gssi in &removed_groups {
self.drop_group_calls_if_unlistened(queue, *gssi);
}
}
}
}
}
fn send_d_call_proceeding(&mut self, queue: &mut MessageQueue, message: &SapMsg, pdu_request: &USetup, call_id: u16) {
tracing::trace!("send_d_call_proceeding");
@@ -469,6 +594,15 @@ impl CcBsSubentity {
let dest_gssi = dest_gssi as u32;
let dest_addr = TetraAddress::new(dest_gssi, SsiType::Gssi);
if !self.has_listener(dest_gssi) {
tracing::info!(
"CMCE: rejecting U-SETUP from issi={} to gssi={} (no listeners)",
calling_party.ssi,
dest_gssi
);
return;
}
// Allocate circuit (DL+UL for group call)
let circuit = match {
let mut state = self.config.state_write();
@@ -577,7 +711,7 @@ impl CcBsSubentity {
simplex_duplex_selection: pdu.simplex_duplex_selection,
basic_service_information: pdu.basic_service_information.clone(),
transmission_grant: TransmissionGrant::GrantedToOtherUser,
transmission_request_permission: false,
transmission_request_permission: true,
call_priority: pdu.call_priority,
notification_indicator: None,
temporary_address: None,
@@ -727,8 +861,8 @@ impl CcBsSubentity {
/// Check if any active calls in hangtime have expired, and if so, release them
fn check_hangtime_expiry(&mut self, queue: &mut MessageQueue) {
// Hangtime: ~5 seconds = 5 * 18 * 4 = 360 frames (approximately)
const HANGTIME_FRAMES: i32 = 5 * 18 * 4;
// Hangtime: ~1 second = 1 * 18 * 4 = 72 frames (approximately)
const HANGTIME_FRAMES: i32 = 1 * 18 * 4;
let expired: Vec<u16> = self
.active_calls
@@ -1096,6 +1230,25 @@ impl CcBsSubentity {
/// Handle network-initiated group call start
fn rx_network_call_start(&mut self, queue: &mut MessageQueue, brew_uuid: uuid::Uuid, source_issi: u32, dest_gssi: u32, _priority: u8) {
if !self.has_listener(dest_gssi) {
tracing::info!(
"CMCE: ignoring network call start uuid={} gssi={} (no listeners)",
brew_uuid,
dest_gssi
);
self.drop_group_calls_if_unlistened(queue, dest_gssi);
if self.config.config().brew.is_some() {
queue.push_back(SapMsg {
sap: Sap::Control,
src: TetraEntity::Cmce,
dest: TetraEntity::Brew,
dltime: self.dltime,
msg: SapMsgInner::CmceCallControl(CallControl::NetworkCallEnd { brew_uuid }),
});
}
return;
}
// Check if there's an active call for this GSSI (speaker change scenario)
if let Some((call_id, call)) = self.active_calls.iter_mut().find(|(_, c)| c.dest_gssi == dest_gssi) {
// Speaker change during active or hangtime

View File

@@ -67,6 +67,15 @@ impl MmClientMgr {
self.clients.contains_key(&issi)
}
pub fn set_client_state(&mut self, issi: u32, state: MmClientState) -> Result<(), ClientMgrErr> {
if let Some(client) = self.clients.get_mut(&issi) {
client.state = state;
Ok(())
} else {
Err(ClientMgrErr::ClientNotFound { issi })
}
}
/// Registers a fresh state for a client, based on ssi
/// If client is already registered, previous state is discarded.
pub fn try_register_client(&mut self, issi: u32, attached: bool) -> Result<bool, ClientMgrErr> {

View File

@@ -1,11 +1,12 @@
use crate::{MessageQueue, TetraEntityTrait};
use tetra_config::SharedConfig;
use tetra_core::tetra_entities::TetraEntity;
use tetra_core::{BitBuffer, Sap, SsiType, TetraAddress, assert_warn, unimplemented_log};
use tetra_core::{BitBuffer, Sap, SsiType, TdmaTime, TetraAddress, assert_warn, unimplemented_log};
use tetra_saps::control::brew::{BrewSubscriberAction, BrewSubscriberUpdate};
use tetra_saps::lmm::LmmMleUnitdataReq;
use tetra_saps::{SapMsg, SapMsgInner};
use crate::mm::components::client_state::MmClientMgr;
use crate::mm::components::client_state::{MmClientMgr, MmClientState};
use crate::mm::components::not_supported::make_ul_mm_pdu_function_not_supported;
use tetra_pdus::mm::enums::location_update_type::LocationUpdateType;
use tetra_pdus::mm::enums::mm_pdu_type_ul::MmPduTypeUl;
@@ -34,6 +35,27 @@ impl MmBs {
}
}
fn emit_brew_update(&self, queue: &mut MessageQueue, dltime: TdmaTime, issi: u32, groups: Vec<u32>, action: BrewSubscriberAction) {
let update = BrewSubscriberUpdate { issi, groups, action };
let msg = SapMsg {
sap: Sap::Control,
src: TetraEntity::Mm,
dest: TetraEntity::Brew,
dltime,
msg: SapMsgInner::BrewSubscriberUpdate(update.clone()),
};
queue.push_back(msg);
let msg = SapMsg {
sap: Sap::Control,
src: TetraEntity::Mm,
dest: TetraEntity::Cmce,
dltime,
msg: SapMsgInner::BrewSubscriberUpdate(update),
};
queue.push_back(msg);
}
fn rx_u_itsi_detach(&mut self, _queue: &mut MessageQueue, mut message: SapMsg) {
tracing::trace!("rx_u_itsi_detach");
let SapMsgInner::LmmMleUnitdataInd(prim) = &mut message.msg else {
@@ -59,7 +81,13 @@ impl MmBs {
let ssi = prim.received_address.ssi;
let detached_client = self.client_mgr.remove_client(ssi);
if detached_client.is_none() {
if let Some(client) = detached_client {
if !client.groups.is_empty() {
let groups: Vec<u32> = client.groups.iter().copied().collect();
self.emit_brew_update(_queue, message.dltime, ssi, groups, BrewSubscriberAction::Deaffiliate);
}
self.emit_brew_update(_queue, message.dltime, ssi, Vec::new(), BrewSubscriberAction::Deregister);
} else {
tracing::warn!("Received UItsiDetach for unknown client with SSI: {}", ssi);
// return;
};
@@ -107,20 +135,28 @@ impl MmBs {
// Try to register the client
let issi = prim.received_address.ssi;
match self.client_mgr.try_register_client(issi, true) {
Ok(_) => {}
Err(e) => {
tracing::warn!("Failed registering roaming MS {}: {:?}", issi, e);
// unimplemented_log!("Handle failed registration of roaming MS");
return;
let is_new = !self.client_mgr.client_is_known(issi);
if is_new {
match self.client_mgr.try_register_client(issi, true) {
Ok(_) => {
self.emit_brew_update(queue, message.dltime, issi, Vec::new(), BrewSubscriberAction::Register);
}
Err(e) => {
tracing::warn!("Failed registering roaming MS {}: {:?}", issi, e);
// unimplemented_log!("Handle failed registration of roaming MS");
return;
}
}
} else if let Err(e) = self.client_mgr.set_client_state(issi, MmClientState::Attached) {
tracing::warn!("Failed updating roaming MS {}: {:?}", issi, e);
return;
}
// Process optional GroupIdentityLocationDemand field
let gila = if let Some(gild) = pdu.group_identity_location_demand {
// Try to attach to requested groups, then build GroupIdentityLocationAccept element
let accepted_groups = if let Some(giu) = &gild.group_identity_uplink {
Some(self.try_attach_detach_groups(issi, &giu))
Some(self.try_attach_detach_groups(queue, message.dltime, issi, &giu))
} else {
None
};
@@ -268,8 +304,17 @@ impl MmBs {
// If group_identity_attach_detach_mode == 1, we first detach all groups
if pdu.group_identity_attach_detach_mode == true {
let prior_groups: Vec<u32> = self
.client_mgr
.get_client_by_issi(issi)
.map(|client| client.groups.iter().copied().collect())
.unwrap_or_default();
match self.client_mgr.client_detach_all_groups(issi) {
Ok(_) => {}
Ok(_) => {
if !prior_groups.is_empty() {
self.emit_brew_update(queue, message.dltime, issi, prior_groups, BrewSubscriberAction::Deaffiliate);
}
}
Err(e) => {
tracing::warn!("Failed detaching all groups for MS {}: {:?}", issi, e);
return;
@@ -279,7 +324,7 @@ impl MmBs {
// Try to attach to requested groups, and retrieve list of accepted GroupIdentityDownlink elements
// We can unwrap since we did compat check earlier
let accepted_gid = self.try_attach_detach_groups(issi, &pdu.group_identity_uplink.unwrap());
let accepted_gid = self.try_attach_detach_groups(queue, message.dltime, issi, &pdu.group_identity_uplink.unwrap());
// Build reply PDU
let pdu_response = DAttachDetachGroupIdentityAcknowledgement {
@@ -352,8 +397,17 @@ impl MmBs {
};
}
fn try_attach_detach_groups(&mut self, issi: u32, giu_vec: &Vec<GroupIdentityUplink>) -> Vec<GroupIdentityDownlink> {
fn try_attach_detach_groups(
&mut self,
queue: &mut MessageQueue,
dltime: TdmaTime,
issi: u32,
giu_vec: &Vec<GroupIdentityUplink>,
) -> Vec<GroupIdentityDownlink> {
let mut accepted_groups = Vec::new();
let mut aff_groups = Vec::new();
let mut deaff_groups = Vec::new();
for giu in giu_vec.iter() {
if giu.gssi.is_none() || giu.vgssi.is_some() || giu.address_extension.is_some() {
unimplemented_log!("Only support GroupIdentityUplink with address_type 0");
@@ -361,26 +415,60 @@ impl MmBs {
}
let gssi = giu.gssi.unwrap(); // can't fail
match self.client_mgr.client_group_attach(issi, gssi, true) {
Ok(_) => {
// We have added the client to this group. Add an entry to the downlink response
let gid = GroupIdentityDownlink {
group_identity_attachment: Some(GroupIdentityAttachment {
group_identity_attachment_lifetime: 3, // re-attach after location update
class_of_usage: giu.class_of_usage.unwrap_or(0),
}),
group_identity_detachment_uplink: None,
gssi: Some(giu.gssi.unwrap()),
address_extension: None,
vgssi: None,
};
accepted_groups.push(gid);
let is_detach = giu.group_identity_detachment_uplink.is_some();
if is_detach {
match self.client_mgr.client_group_attach(issi, gssi, false) {
Ok(changed) => {
if changed {
deaff_groups.push(gssi);
}
let gid = GroupIdentityDownlink {
group_identity_attachment: None,
group_identity_detachment_uplink: giu.group_identity_detachment_uplink,
gssi: Some(gssi),
address_extension: None,
vgssi: None,
};
accepted_groups.push(gid);
}
Err(e) => {
tracing::warn!("Failed detaching MS {} from group {}: {:?}", issi, gssi, e);
}
}
Err(e) => {
tracing::warn!("Failed attaching MS {} to group {}: {:?}", issi, gssi, e);
} else {
match self.client_mgr.client_group_attach(issi, gssi, true) {
Ok(changed) => {
if changed {
aff_groups.push(gssi);
}
// We have added the client to this group. Add an entry to the downlink response
let gid = GroupIdentityDownlink {
group_identity_attachment: Some(GroupIdentityAttachment {
group_identity_attachment_lifetime: 3, // re-attach after location update
class_of_usage: giu.class_of_usage.unwrap_or(0),
}),
group_identity_detachment_uplink: None,
gssi: Some(gssi),
address_extension: None,
vgssi: None,
};
accepted_groups.push(gid);
}
Err(e) => {
tracing::warn!("Failed attaching MS {} to group {}: {:?}", issi, gssi, e);
}
}
}
}
if !aff_groups.is_empty() {
self.emit_brew_update(queue, dltime, issi, aff_groups, BrewSubscriberAction::Affiliate);
}
if !deaff_groups.is_empty() {
self.emit_brew_update(queue, dltime, issi, deaff_groups, BrewSubscriberAction::Deaffiliate);
}
accepted_groups
}

View File

@@ -0,0 +1,14 @@
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BrewSubscriberAction {
Register,
Deregister,
Affiliate,
Deaffiliate,
}
#[derive(Debug, Clone)]
pub struct BrewSubscriberUpdate {
pub issi: u32,
pub groups: Vec<u32>,
pub action: BrewSubscriberAction,
}

View File

@@ -68,8 +68,8 @@ pub enum CallControl {
ts: u8, // Allocated timeslot
usage: u8, // Usage number
},
/// Request CMCE to end a network call
/// Sent by Brew when TetraPack sends GROUP_IDLE
/// Request ending a network call
/// Sent by Brew when TetraPack sends GROUP_IDLE, or by CMCE to make Brew drop a call
NetworkCallEnd {
brew_uuid: uuid::Uuid, // Identifies the call to end
},

View File

@@ -1,2 +1,3 @@
pub mod brew;
pub mod call_control;
pub mod enums;

View File

@@ -4,6 +4,7 @@ use tetra_core::Sap;
use tetra_core::TdmaTime;
use tetra_core::tetra_entities::TetraEntity;
use crate::control::brew::BrewSubscriberUpdate;
use crate::control::call_control::CallControl;
use crate::tmd::TmdCircuitDataInd;
use crate::tmd::TmdCircuitDataReq;
@@ -76,6 +77,9 @@ pub enum SapMsgInner {
// CMCE -> UMAC control
CmceCallControl(CallControl),
// MM -> Brew/CMCE subscriber update
BrewSubscriberUpdate(BrewSubscriberUpdate),
// LTPD-SAP (MLE-LTPD)
LtpdMleUnitdataInd(LtpdMleUnitdataInd),
@@ -104,6 +108,9 @@ impl Display for SapMsgInner {
SapMsgInner::TlmbSyncInd(_) => write!(f, "TmbSyncInd"),
SapMsgInner::TlmbSysinfoInd(_) => write!(f, "TmbSysinfoInd"),
// Control/Brew
SapMsgInner::BrewSubscriberUpdate(_) => write!(f, "BrewSubscriberUpdate"),
// TLB-SAP
// SapMsgInner::TlbTlSyncInd(_) => write!(f, "TlbTlSyncInd"),
// SapMsgInner::TlbTlSysinfoInd(_) => write!(f, "TlbTlSysinfoInd"),

View File

@@ -147,6 +147,7 @@ voice_service = true
###############################################################################
# Brew protocol: Connect to TetraPack/BrandMeister server via TETRA Homebrew Protocol.
# All groups that radios attach to are forwarded to Brew as affiliations.
# Uncomment this section to automatically load and use Brew entity
# See: https://wiki.tetrapack.online/books/tetra/page/brew
@@ -167,10 +168,6 @@ voice_service = true
# ISSI to register with server. Usually, your DMR ID.
# issi = 012345
# Comma separated list of Group IDs (GSSIs) to affiliate with
# BlueStation will receive those groups from TetraPack/Brandmeister
# groups = [91]
# Reconnection delay (seconds)
# reconnect_delay_secs = 15