Feature: Adaptive JitterBuffer for Brew Originated Calls (#16)

* brew: replay adaptive jitter buffer and bump v0.4.3
This commit is contained in:
Elliott Cooper
2026-02-21 11:16:44 +00:00
committed by GitHub
parent 2853908a6b
commit ca97b70648
7 changed files with 238 additions and 22 deletions

18
Cargo.lock generated
View File

@@ -161,7 +161,7 @@ dependencies = [
[[package]]
name = "bluestation-bs"
version = "0.4.1"
version = "0.4.3"
dependencies = [
"clap",
"ctrlc",
@@ -733,7 +733,7 @@ dependencies = [
[[package]]
name = "net-tnmm-test"
version = "0.4.1"
version = "0.4.3"
dependencies = [
"clap",
"tetra-config",
@@ -748,7 +748,7 @@ dependencies = [
[[package]]
name = "net-tnmm-test-quic"
version = "0.4.1"
version = "0.4.3"
dependencies = [
"quinn",
"rcgen",
@@ -910,7 +910,7 @@ checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe"
[[package]]
name = "pdu-tool"
version = "0.4.1"
version = "0.4.3"
dependencies = [
"clap",
"tetra-config",
@@ -1527,7 +1527,7 @@ dependencies = [
[[package]]
name = "tetra-config"
version = "0.4.1"
version = "0.4.3"
dependencies = [
"serde",
"tetra-core",
@@ -1536,7 +1536,7 @@ dependencies = [
[[package]]
name = "tetra-core"
version = "0.4.1"
version = "0.4.3"
dependencies = [
"const_format",
"git-version",
@@ -1548,7 +1548,7 @@ dependencies = [
[[package]]
name = "tetra-entities"
version = "0.4.1"
version = "0.4.3"
dependencies = [
"as-any",
"bitcode",
@@ -1576,7 +1576,7 @@ dependencies = [
[[package]]
name = "tetra-pdus"
version = "0.4.1"
version = "0.4.3"
dependencies = [
"tetra-config",
"tetra-core",
@@ -1586,7 +1586,7 @@ dependencies = [
[[package]]
name = "tetra-saps"
version = "0.4.1"
version = "0.4.3"
dependencies = [
"tetra-core",
"tracing",

View File

@@ -17,7 +17,7 @@ members = [
]
[workspace.package]
version = "0.4.1"
version = "0.4.3"
edition = "2024"
authors = ["Wouter Bokslag / Midnight Blue"]
license = "MIT"

View File

@@ -74,6 +74,7 @@ fn build_bs_stack(cfg: &mut SharedConfig) -> MessageRouter {
issi: brew_cfg.issi,
groups: brew_cfg.groups,
reconnect_delay: Duration::from_secs(brew_cfg.reconnect_delay_secs),
jitter_initial_latency_frames: brew_cfg.jitter_initial_latency_frames,
};
let brew_entity = BrewEntity::new(cfg.clone(), brew_config);
router.register_entity(Box::new(brew_entity));

View File

@@ -22,6 +22,8 @@ pub struct CfgBrew {
pub groups: Vec<u32>,
/// Reconnection delay in seconds
pub reconnect_delay_secs: u64,
/// Extra initial jitter playout delay in frames (added on top of adaptive baseline)
pub jitter_initial_latency_frames: u8,
}
#[derive(Default, Deserialize)]
@@ -44,6 +46,9 @@ pub struct CfgBrewDto {
/// Reconnection delay in seconds
#[serde(default = "default_brew_reconnect_delay")]
pub reconnect_delay_secs: u64,
/// Extra initial jitter playout delay in frames (added on top of adaptive baseline)
#[serde(default)]
pub jitter_initial_latency_frames: u8,
#[serde(flatten)]
pub extra: HashMap<String, Value>,
@@ -68,5 +73,6 @@ pub fn apply_brew_patch(src: CfgBrewDto) -> CfgBrew {
issi: src.issi,
groups: src.groups,
reconnect_delay_secs: src.reconnect_delay_secs,
jitter_initial_latency_frames: src.jitter_initial_latency_frames,
}
}

View File

@@ -1,6 +1,6 @@
//! Brew protocol entity bridging TetraPack WebSocket to UMAC/MLE with hangtime-based circuit reuse
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::thread;
use std::time::{Duration, Instant};
@@ -17,6 +17,20 @@ use super::worker::{BrewCommand, BrewConfig, BrewEvent, BrewWorker};
/// Hangtime before releasing group call circuit to allow reuse without re-signaling.
const GROUP_CALL_HANGTIME: Duration = Duration::from_secs(5);
/// Minimum playout buffer depth in frames.
const BREW_JITTER_MIN_FRAMES: usize = 2;
/// Default playout buffer depth in frames.
const BREW_JITTER_BASE_FRAMES: usize = 4;
/// Maximum adaptive playout target depth in frames.
const BREW_JITTER_TARGET_MAX_FRAMES: usize = 12;
/// Maximum queued frames kept per call before oldest frames are dropped.
const BREW_JITTER_MAX_FRAMES: usize = 24;
/// Expected receive interval for one TCH/S frame in microseconds (~56.67 ms).
const BREW_EXPECTED_FRAME_INTERVAL_US: f64 = 56_667.0;
/// Warn threshold for excessive adaptive playout depth.
const BREW_JITTER_WARN_TARGET_FRAMES: usize = 8;
/// Rate-limit warning logs per call.
const BREW_JITTER_WARN_INTERVAL: Duration = Duration::from_secs(5);
// ─── Active call tracking ─────────────────────────────────────────
@@ -75,6 +89,141 @@ struct UlForwardedCall {
frame_count: u64,
}
#[derive(Debug)]
struct JitterFrame {
rx_seq: u64,
rx_at: Instant,
acelp_data: Vec<u8>,
}
#[derive(Debug, Default)]
struct VoiceJitterBuffer {
frames: VecDeque<JitterFrame>,
next_rx_seq: u64,
started: bool,
target_frames: usize,
prev_rx_at: Option<Instant>,
jitter_us_ewma: f64,
underrun_boost: usize,
stable_pops: u32,
dropped_overflow: u64,
underruns: u64,
last_warn_at: Option<Instant>,
initial_latency_frames: usize,
}
impl VoiceJitterBuffer {
fn with_initial_latency(initial_latency_frames: usize) -> Self {
let initial = initial_latency_frames.min(BREW_JITTER_TARGET_MAX_FRAMES - BREW_JITTER_MIN_FRAMES);
Self {
target_frames: BREW_JITTER_BASE_FRAMES + initial,
initial_latency_frames: initial,
..Default::default()
}
}
fn push(&mut self, acelp_data: Vec<u8>) {
if self.target_frames == 0 {
self.target_frames = BREW_JITTER_BASE_FRAMES + self.initial_latency_frames;
}
let now = Instant::now();
if let Some(prev) = self.prev_rx_at {
let delta_us = now.duration_since(prev).as_micros() as f64;
let deviation_us = (delta_us - BREW_EXPECTED_FRAME_INTERVAL_US).abs();
self.jitter_us_ewma += (deviation_us - self.jitter_us_ewma) / 16.0;
}
self.prev_rx_at = Some(now);
let frame = JitterFrame {
rx_seq: self.next_rx_seq,
rx_at: now,
acelp_data,
};
self.next_rx_seq = self.next_rx_seq.wrapping_add(1);
self.frames.push_back(frame);
while self.frames.len() > BREW_JITTER_MAX_FRAMES {
self.frames.pop_front();
self.dropped_overflow += 1;
}
self.recompute_target();
}
fn pop_ready(&mut self) -> Option<JitterFrame> {
if self.target_frames == 0 {
self.target_frames = BREW_JITTER_BASE_FRAMES + self.initial_latency_frames;
}
if !self.started {
if self.frames.len() < self.target_frames {
return None;
}
self.started = true;
}
match self.frames.pop_front() {
Some(frame) => {
if self.frames.len() >= self.target_frames {
self.stable_pops = self.stable_pops.saturating_add(1);
if self.stable_pops >= 80 {
self.stable_pops = 0;
if self.underrun_boost > 0 {
self.underrun_boost -= 1;
self.recompute_target();
}
}
} else {
self.stable_pops = 0;
}
Some(frame)
}
None => {
self.started = false;
self.underruns += 1;
self.underrun_boost = (self.underrun_boost + 1).min(4);
self.stable_pops = 0;
self.recompute_target();
None
}
}
}
fn target_frames(&self) -> usize {
self.target_frames.max(BREW_JITTER_MIN_FRAMES)
}
fn recompute_target(&mut self) {
let jitter_component =
((self.jitter_us_ewma * 2.0) / BREW_EXPECTED_FRAME_INTERVAL_US).ceil() as usize;
let target =
BREW_JITTER_BASE_FRAMES + self.initial_latency_frames + jitter_component + self.underrun_boost;
self.target_frames = target.clamp(BREW_JITTER_MIN_FRAMES, BREW_JITTER_TARGET_MAX_FRAMES);
}
fn maybe_warn_unhealthy(&mut self, uuid: Uuid) {
let now = Instant::now();
if let Some(last_warn) = self.last_warn_at {
if now.duration_since(last_warn) < BREW_JITTER_WARN_INTERVAL {
return;
}
}
if self.target_frames() < BREW_JITTER_WARN_TARGET_FRAMES && self.underruns == 0 {
return;
}
self.last_warn_at = Some(now);
tracing::warn!(
"BrewEntity: high jitter on uuid={} target_frames={} queue={} underruns={} overflow_drops={} jitter_ms={:.1}",
uuid,
self.target_frames(),
self.frames.len(),
self.underruns,
self.dropped_overflow,
self.jitter_us_ewma / 1000.0
);
}
}
// ─── BrewEntity ───────────────────────────────────────────────────
pub struct BrewEntity {
@@ -91,6 +240,8 @@ pub struct BrewEntity {
/// Active DL calls from Brew keyed by session UUID (currently transmitting)
active_calls: HashMap<Uuid, ActiveCall>,
/// Per-call jitter/playout buffer for downlink voice from Brew.
dl_jitter: HashMap<Uuid, VoiceJitterBuffer>,
/// DL calls in hangtime keyed by dest_gssi — circuit stays open, waiting for
/// new speaker or timeout. Only one hanging call per GSSI.
@@ -129,6 +280,7 @@ impl BrewEntity {
event_receiver,
command_sender,
active_calls: HashMap::new(),
dl_jitter: HashMap::new(),
hanging_calls: HashMap::new(),
ul_forwarded: HashMap::new(),
connected: false,
@@ -164,7 +316,7 @@ impl BrewEntity {
self.handle_group_call_end(queue, uuid, cause);
}
BrewEvent::VoiceFrame { uuid, length_bits, data } => {
self.handle_voice_frame(queue, uuid, length_bits, data);
self.handle_voice_frame(uuid, length_bits, data);
}
BrewEvent::SubscriberEvent { msg_type, issi, groups } => {
tracing::debug!("BrewEntity: subscriber event type={} issi={} groups={:?}", msg_type, issi, groups);
@@ -230,6 +382,9 @@ impl BrewEntity {
frame_count: hanging.frame_count,
};
self.active_calls.insert(uuid, call);
self.dl_jitter
.entry(uuid)
.or_insert_with(|| VoiceJitterBuffer::with_initial_latency(self.brew_config.jitter_initial_latency_frames as usize));
// Forward to CMCE (will reuse circuit automatically)
queue.push_back(SapMsg {
@@ -266,6 +421,9 @@ impl BrewEntity {
frame_count: 0,
};
self.active_calls.insert(uuid, call);
self.dl_jitter
.entry(uuid)
.or_insert_with(|| VoiceJitterBuffer::with_initial_latency(self.brew_config.jitter_initial_latency_frames as usize));
queue.push_back(SapMsg {
sap: Sap::Control,
@@ -287,6 +445,7 @@ impl BrewEntity {
tracing::debug!("BrewEntity: GROUP_IDLE for unknown uuid={}", uuid);
return;
};
self.dl_jitter.remove(&uuid);
tracing::info!(
"BrewEntity: group call ended uuid={} call_id={:?} gssi={} frames={}",
@@ -343,7 +502,7 @@ impl BrewEntity {
}
/// Handle a voice frame from Brew — inject into the downlink
fn handle_voice_frame(&mut self, queue: &mut MessageQueue, uuid: Uuid, _length_bits: u16, data: Vec<u8>) {
fn handle_voice_frame(&mut self, uuid: Uuid, _length_bits: u16, data: Vec<u8>) {
let Some(call) = self.active_calls.get_mut(&uuid) else {
// Voice frame for unknown call — might arrive before GROUP_TX or after GROUP_IDLE
tracing::trace!("BrewEntity: voice frame for unknown uuid={} ({} bytes)", uuid, data.len());
@@ -383,15 +542,55 @@ impl BrewEntity {
}
let acelp_data = data[1..].to_vec(); // 35 bytes = 280 bits, of which 274 are ACELP
// Inject ACELP frame into the downlink via TMD SAP
let tmd_msg = SapMsg {
sap: Sap::TmdSap,
src: TetraEntity::Brew,
dest: TetraEntity::Umac,
dltime: self.dltime,
msg: SapMsgInner::TmdCircuitDataReq(TmdCircuitDataReq { ts, data: acelp_data }),
};
queue.push_back(tmd_msg);
self.dl_jitter
.entry(uuid)
.or_insert_with(|| VoiceJitterBuffer::with_initial_latency(self.brew_config.jitter_initial_latency_frames as usize))
.push(acelp_data);
}
fn drain_jitter_playout(&mut self, queue: &mut MessageQueue) {
if self.dltime.f == 18 {
return;
}
let mut to_send: Vec<(u8, Uuid, usize, JitterFrame)> = Vec::new();
for (uuid, call) in &self.active_calls {
let Some(ts) = call.ts else {
continue;
};
if ts != self.dltime.t {
continue;
}
let Some(jitter) = self.dl_jitter.get_mut(uuid) else {
continue;
};
jitter.maybe_warn_unhealthy(*uuid);
if let Some(frame) = jitter.pop_ready() {
to_send.push((ts, *uuid, jitter.target_frames(), frame));
}
}
for (ts, uuid, target_frames, frame) in to_send {
tracing::trace!(
"BrewEntity: playout uuid={} ts={} rx_seq={} age_ms={} target_frames={}",
uuid,
ts,
frame.rx_seq,
frame.rx_at.elapsed().as_millis(),
target_frames
);
queue.push_back(SapMsg {
sap: Sap::TmdSap,
src: TetraEntity::Brew,
dest: TetraEntity::Umac,
dltime: self.dltime,
msg: SapMsgInner::TmdCircuitDataReq(TmdCircuitDataReq {
ts,
data: frame.acelp_data,
}),
});
}
}
/// Release all active calls (on disconnect)
@@ -399,6 +598,7 @@ impl BrewEntity {
// Request CMCE to end all active network calls
let calls: Vec<(Uuid, ActiveCall)> = self.active_calls.drain().collect();
for (uuid, _) in calls {
self.dl_jitter.remove(&uuid);
queue.push_back(SapMsg {
sap: Sap::Control,
src: TetraEntity::Brew,
@@ -410,6 +610,7 @@ impl BrewEntity {
// Clear hanging call tracking
self.hanging_calls.clear();
self.dl_jitter.clear();
}
/// Handle NetworkCallReady response from CMCE
@@ -448,6 +649,8 @@ impl TetraEntityTrait for BrewEntity {
self.dltime = ts;
// Process all pending events from the worker thread
self.process_events(queue);
// Feed one buffered frame at each traffic playout opportunity.
self.drain_jitter_playout(queue);
// Expire hanging calls that have exceeded hangtime
self.expire_hanging_calls(queue);
}

View File

@@ -92,6 +92,8 @@ pub struct BrewConfig {
pub groups: Vec<u32>,
/// Reconnection delay
pub reconnect_delay: Duration,
/// Extra initial jitter playout delay in frames (added on top of adaptive baseline)
pub jitter_initial_latency_frames: u8,
}
// ─── TLS helper ──────────────────────────────────────────────────

View File

@@ -173,3 +173,7 @@ voice_service = true # Not yet implemented but may be needed to keep certain MS
# Reconnection delay (seconds)
# reconnect_delay_secs = 15
# Optional: additional initial latency compensation in frames for inbound Brew jitter playout.
# Adaptive jitter buffering is always enabled; this adds fixed startup delay if needed.
# jitter_initial_latency_frames = 0