Files
meshcore-bot/modules/topology_engine.py
agessaman 0beb592cc9 Enhance topology engine with advert-origin anchor prior settings
- Introduced new configuration options in `config.ini.example` for advert-origin anchor scoring, including `topology_advert_anchor_enabled`, `topology_advert_anchor_weight`, `topology_advert_anchor_max_adjustment`, and `topology_advert_anchor_freshness_hours`.
- Updated documentation to reflect the new settings and their usage in topology evaluation paths.
- Modified `TopologyEngine` to incorporate anchor prior logic, allowing for soft nudging of topology scoring based on known origin public keys.
- Enhanced `PathCommand` and telemetry recording to support the new anchor prior adjustments.
- Added API endpoints and web viewer features for resetting backfill comparisons and displaying anchor diagnostics.
2026-03-16 20:37:16 -07:00

1417 lines
58 KiB
Python

#!/usr/bin/env python3
"""
Probabilistic topology engine for shadow/cutover path resolution.
This engine supplements the legacy mesh graph heuristics with a lightweight
Viterbi-style decode across candidate repeater states. It is intentionally
additive: legacy tables and APIs remain authoritative unless mode='new'.
"""
import json
import math
import random
import sqlite3
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Tuple
class TopologyEngine:
"""Shadow-capable probabilistic resolver for prefix-collision paths."""
def __init__(self, bot):
self.bot = bot
self.logger = bot.logger
self.db = bot.db_manager
self.prefix_hex_chars = max(2, getattr(bot, "prefix_hex_chars", 2))
cfg = bot.config
self.mode = cfg.get("Path_Command", "topology_engine_mode", fallback="legacy").lower()
self.shadow_sample_rate = max(
0.0, min(1.0, cfg.getfloat("Path_Command", "topology_shadow_sample_rate", fallback=1.0))
)
self.ghost_enabled = cfg.getboolean("Path_Command", "topology_ghost_enabled", fallback=True)
self.ghost_threshold = max(
0.0, min(1.0, cfg.getfloat("Path_Command", "topology_ghost_min_confidence", fallback=0.35))
)
self.max_candidates_per_prefix = max(
1, cfg.getint("Path_Command", "topology_max_candidates_per_prefix", fallback=12)
)
self.min_edge_observations = max(
1, cfg.getint("Path_Command", "min_edge_observations", fallback=3)
)
self.advert_anchor_enabled = cfg.getboolean(
"Path_Command", "topology_advert_anchor_enabled", fallback=False
)
self.advert_anchor_weight = max(
0.0, min(1.0, cfg.getfloat("Path_Command", "topology_advert_anchor_weight", fallback=0.2))
)
self.advert_anchor_max_adjustment = max(
0.0,
min(
0.5,
cfg.getfloat("Path_Command", "topology_advert_anchor_max_adjustment", fallback=0.08),
),
)
self.advert_anchor_freshness_hours = max(
1,
cfg.getint("Path_Command", "topology_advert_anchor_freshness_hours", fallback=168),
)
self._origin_location_cache: Dict[str, Optional[Tuple[float, float]]] = {}
self._last_anchor_debug: Dict[str, Any] = {"applied": False, "adjustment_total": 0.0}
def ingest_path_observation(
self,
path_nodes: List[str],
packet_hash: Optional[str] = None,
bytes_per_hop: Optional[int] = None,
) -> None:
"""Capture normalized path observations for offline/topology analysis."""
if not path_nodes:
return
try:
path_hex = "".join(path_nodes).lower()
self.db.execute_update(
"""
INSERT INTO topology_inference_shadow
(path_hex, path_nodes_json, resolved_path_json, method, model_confidence, packet_hash, bytes_per_hop)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
path_hex,
json.dumps(path_nodes),
json.dumps([]),
"shadow_ingest",
0.0,
packet_hash,
bytes_per_hop,
),
)
except Exception as e:
self.logger.debug(f"Topology engine ingest_path_observation failed: {e}")
    def resolve_path_candidates(
        self,
        node_id: str,
        path_context: List[str],
        repeaters: List[Dict[str, Any]],
        current_index: int,
        resolution_context: Optional[Dict[str, Any]] = None,
    ) -> Tuple[Optional[Dict[str, Any]], float, Optional[str]]:
        """Resolve the best repeater candidate for a colliding node via Viterbi.

        Returns (repeater, confidence, method); repeater is None when decoding
        fails, yields a ghost placeholder, or falls below the ghost threshold.
        """
        # Guard: nothing to resolve without candidates or a valid hop index.
        if not repeaters or current_index < 0 or current_index >= len(path_context):
            return None, 0.0, None
        # Build one candidate-state set per hop of the observed path.
        state_sets: List[List[Dict[str, Any]]] = []
        for i, path_node in enumerate(path_context):
            if i == current_index:
                # The colliding hop uses the caller-supplied candidate list.
                candidates = self._normalize_repeaters(repeaters, path_node)
            else:
                candidates = self._query_candidates_for_prefix(path_node)
            if not candidates:
                # No known contact for this prefix: fall back to a ghost placeholder.
                candidates = [self._ghost_state(path_node)]
            state_sets.append(candidates)
        best_sequence, best_score = self._viterbi_decode(
            state_sets,
            path_context,
            resolution_context=resolution_context,
        )
        if not best_sequence:
            return None, 0.0, None
        selected = best_sequence[current_index]
        confidence = self._score_to_confidence(best_score, path_length=len(path_context))
        if selected.get("is_ghost"):
            if self.ghost_enabled:
                # Persist the ghost hypothesis so it can accumulate evidence.
                self._record_ghost_node(selected, path_context)
                return None, confidence, "topology_viterbi_ghost"
            return None, 0.0, None
        # Map the decoded state back to the caller's original repeater record.
        selected_pk = selected.get("public_key")
        resolved = None
        for candidate in repeaters:
            if candidate.get("public_key") == selected_pk:
                resolved = candidate
                break
        if not resolved and repeaters:
            # Best-effort fallback if candidate mapping fails.
            resolved = max(repeaters, key=lambda r: self._recency_score(r))
        if confidence < self.ghost_threshold and self.ghost_enabled:
            # Too uncertain: treat as ghost hypothesis in shadow mode.
            self._record_ghost_node(self._ghost_state(node_id), path_context)
            return None, confidence, "topology_viterbi_ghost"
        return resolved, confidence, "topology_viterbi"
def select_for_hop(
self,
repeaters: List[Dict[str, Any]],
node_id: str,
path_context: List[str],
topology_mode: Optional[str] = None,
packet_hash: Optional[str] = None,
) -> Dict[str, Any]:
"""Command-facing topology selection wrapper with stable return shape."""
if not repeaters or not path_context:
return {
"repeater": None,
"confidence": 0.0,
"method": None,
"is_topology_guess": False,
"anchor_prior_applied": False,
"anchor_prior_adjustment": 0.0,
}
try:
resolution_context = None
origin_public_key = None
origin_packet_type = None
if self._should_apply_anchor_prior(topology_mode=topology_mode):
origin_public_key, origin_packet_type = self._lookup_observed_path_origin(
packet_hash=packet_hash,
path_nodes=path_context,
)
resolution_context = self._build_resolution_context(
origin_public_key=origin_public_key,
origin_packet_type=origin_packet_type,
)
current_index = path_context.index(node_id) if node_id in path_context else -1
if current_index < 0:
return {
"repeater": None,
"confidence": 0.0,
"method": None,
"is_topology_guess": False,
"anchor_prior_applied": False,
"anchor_prior_adjustment": 0.0,
}
repeater, confidence, method = self.resolve_path_candidates(
node_id=node_id,
path_context=path_context,
repeaters=repeaters,
current_index=current_index,
resolution_context=resolution_context,
)
anchor_debug = self._consume_last_anchor_debug()
return {
"repeater": repeater,
"confidence": confidence,
"method": method,
"is_topology_guess": bool(method and method.startswith("topology")),
"anchor_prior_applied": bool(anchor_debug.get("applied")),
"anchor_prior_adjustment": float(anchor_debug.get("adjustment_total") or 0.0),
"origin_public_key": origin_public_key,
"origin_packet_type": origin_packet_type,
}
except Exception as e:
self.logger.debug(f"Topology engine select_for_hop failed: {e}")
return {
"repeater": None,
"confidence": 0.0,
"method": None,
"is_topology_guess": False,
"anchor_prior_applied": False,
"anchor_prior_adjustment": 0.0,
}
def maybe_record_shadow_comparison(
self,
topology_mode: str,
path_nodes: List[str],
model_result: Optional[Dict[str, Any]],
legacy_choice: Optional[Dict[str, Any]],
legacy_confidence: float,
legacy_method: Optional[str],
non_collision: bool = False,
packet_hash: Optional[str] = None,
bytes_per_hop: Optional[int] = None,
origin_public_key: Optional[str] = None,
origin_packet_type: Optional[str] = None,
anchor_prior_applied: bool = False,
anchor_prior_adjustment: float = 0.0,
) -> bool:
"""Record shadow telemetry only when the engine is in shadow mode."""
if (topology_mode or "").lower() != "shadow":
return False
model_result = model_result or {}
if not origin_public_key and packet_hash and path_nodes:
origin_public_key, origin_packet_type = self._lookup_observed_path_origin(
packet_hash=packet_hash,
path_nodes=path_nodes,
)
return self.record_shadow_comparison(
path_nodes=path_nodes,
model_choice=model_result.get("repeater"),
model_confidence=float(model_result.get("confidence") or 0.0),
model_method=model_result.get("method"),
legacy_choice=legacy_choice,
legacy_confidence=float(legacy_confidence or 0.0),
legacy_method=legacy_method,
packet_hash=packet_hash,
bytes_per_hop=bytes_per_hop,
non_collision=non_collision,
origin_public_key=origin_public_key,
origin_packet_type=origin_packet_type,
anchor_prior_applied=bool(model_result.get("anchor_prior_applied", anchor_prior_applied)),
anchor_prior_adjustment=float(model_result.get("anchor_prior_adjustment", anchor_prior_adjustment) or 0.0),
)
    def record_shadow_comparison(
        self,
        path_nodes: List[str],
        model_choice: Optional[Dict[str, Any]],
        model_confidence: float,
        model_method: Optional[str],
        legacy_choice: Optional[Dict[str, Any]],
        legacy_confidence: float,
        legacy_method: Optional[str],
        packet_hash: Optional[str] = None,
        bytes_per_hop: Optional[int] = None,
        non_collision: bool = False,
        backfill_key: Optional[str] = None,
        origin_public_key: Optional[str] = None,
        origin_packet_type: Optional[str] = None,
        anchor_prior_applied: bool = False,
        anchor_prior_adjustment: float = 0.0,
    ) -> bool:
        """Persist legacy-vs-new comparison rows and update daily metrics.

        Returns True only when a new comparison row was inserted; sampling,
        backfill deduplication, and write failures all return False.
        """
        # Probabilistic sampling keeps always-on shadow writes bounded.
        if random.random() > self.shadow_sample_rate:
            return False
        model_pk = (model_choice or {}).get("public_key")
        legacy_pk = (legacy_choice or {}).get("public_key")
        # Agreement counts only when both resolvers picked the same concrete key.
        agreement = 1 if model_pk and legacy_pk and model_pk == legacy_pk else 0
        metric_date = datetime.now().strftime("%Y-%m-%d")
        # Side-by-side choice details plus anchor diagnostics, stored as JSON.
        resolved_path_json = json.dumps(
            {
                "model_public_key": model_pk,
                "legacy_public_key": legacy_pk,
                "model_name": (model_choice or {}).get("name"),
                "legacy_name": (legacy_choice or {}).get("name"),
                "origin_public_key": (origin_public_key or "").lower() or None,
                "origin_packet_type": (origin_packet_type or "").lower() or None,
                "anchor_prior_applied": bool(anchor_prior_applied),
                "anchor_prior_adjustment": float(anchor_prior_adjustment or 0.0),
            }
        )
        path_hex = "".join(path_nodes).lower() if path_nodes else ""
        try:
            if backfill_key:
                existing_backfill = self.db.execute_query(
                    "SELECT id FROM topology_inference_shadow WHERE backfill_key = ? LIMIT 1",
                    (backfill_key,),
                )
                if existing_backfill:
                    # Idempotent backfill: do not duplicate rows or re-roll metrics.
                    return False
            self.db.execute_update(
                """
                INSERT INTO topology_inference_shadow
                (path_hex, path_nodes_json, resolved_path_json, method, model_confidence, legacy_method, legacy_confidence, agreement, packet_hash, bytes_per_hop, backfill_key)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    path_hex,
                    json.dumps(path_nodes),
                    resolved_path_json,
                    model_method or "topology_viterbi",
                    float(model_confidence or 0.0),
                    legacy_method,
                    float(legacy_confidence or 0.0),
                    agreement,
                    packet_hash,
                    bytes_per_hop,
                    backfill_key,
                ),
            )
            # Update (or create) the per-day aggregate metrics row.
            existing = self.db.execute_query(
                "SELECT * FROM topology_model_metrics WHERE metric_date = ? LIMIT 1",
                (metric_date,),
            )
            if existing:
                row = existing[0]
                total = int(row.get("total_comparisons", 0)) + 1
                agree_count = int(row.get("agreement_count", 0)) + agreement
                disagree_count = int(row.get("disagreement_count", 0)) + (1 - agreement)
                nc_total = int(row.get("non_collision_comparisons", 0)) + (1 if non_collision else 0)
                nc_agree = int(row.get("non_collision_agreement_count", 0)) + (1 if non_collision and agreement else 0)
                self.db.execute_update(
                    """
                    UPDATE topology_model_metrics
                    SET total_comparisons = ?,
                        agreement_count = ?,
                        disagreement_count = ?,
                        non_collision_comparisons = ?,
                        non_collision_agreement_count = ?,
                        avg_legacy_confidence = ?,
                        avg_model_confidence = ?,
                        updated_at = CURRENT_TIMESTAMP
                    WHERE metric_date = ?
                    """,
                    (
                        total,
                        agree_count,
                        disagree_count,
                        nc_total,
                        nc_agree,
                        # Fold the new samples into the stored running averages
                        # (_rolling_avg is defined elsewhere in this class).
                        self._rolling_avg(float(row.get("avg_legacy_confidence") or 0.0), float(legacy_confidence or 0.0), total),
                        self._rolling_avg(float(row.get("avg_model_confidence") or 0.0), float(model_confidence or 0.0), total),
                        metric_date,
                    ),
                )
            else:
                # First comparison for this date: seed the metrics row.
                self.db.execute_update(
                    """
                    INSERT INTO topology_model_metrics
                    (metric_date, total_comparisons, agreement_count, disagreement_count,
                     non_collision_comparisons, non_collision_agreement_count,
                     avg_legacy_confidence, avg_model_confidence, metadata_json)
                    VALUES (?, 1, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    (
                        metric_date,
                        agreement,
                        1 - agreement,
                        1 if non_collision else 0,
                        1 if non_collision and agreement else 0,
                        float(legacy_confidence or 0.0),
                        float(model_confidence or 0.0),
                        json.dumps({}),
                    ),
                )
            return True
        except Exception as e:
            # Telemetry must never break resolution: log and report not-written.
            self.logger.debug(f"Topology shadow comparison write failed: {e}")
            return False
def get_shadow_diagnostics(self, days: int = 7) -> Dict[str, Any]:
"""Return summarized shadow diagnostics for optional API/debug use."""
days = max(1, min(90, int(days or 7)))
metrics = self.db.execute_query(
"""
SELECT metric_date, total_comparisons, agreement_count, disagreement_count,
non_collision_comparisons, non_collision_agreement_count,
avg_legacy_confidence, avg_model_confidence
FROM topology_model_metrics
WHERE metric_date >= date('now', ?)
ORDER BY metric_date DESC
""",
(f"-{days} days",),
)
recent = self.db.execute_query(
"""
SELECT path_hex, method, model_confidence, legacy_method, legacy_confidence, agreement, created_at
FROM topology_inference_shadow
WHERE created_at >= datetime('now', ?)
ORDER BY created_at DESC
LIMIT 100
""",
(f"-{days} days",),
)
ghosts = self.db.execute_query(
"""
SELECT ghost_id, prefix, evidence_count, confidence_tier, model_confidence, first_seen, last_seen
FROM topology_ghost_nodes
ORDER BY last_seen DESC
LIMIT 100
"""
)
anchor_rows = self.db.execute_query(
"""
SELECT method, agreement, resolved_path_json
FROM topology_inference_shadow
WHERE created_at >= datetime('now', ?)
AND method != 'shadow_ingest'
""",
(f"-{days} days",),
)
anchor_stats = self._compute_anchor_diagnostics(anchor_rows)
return {
"metrics": metrics,
"recent_comparisons": recent,
"ghost_nodes": ghosts,
"anchor_diagnostics": anchor_stats,
}
def get_model_graph(
self,
days: int = 7,
min_confidence: float = 0.0,
include_ghost: bool = False,
) -> Dict[str, Any]:
"""Build a model-derived mesh graph payload from shadow comparison rows."""
days = max(1, min(90, int(days or 7)))
min_confidence = max(0.0, min(1.0, float(min_confidence or 0.0)))
model_rows = self.db.execute_query(
"""
SELECT path_nodes_json, method, model_confidence, created_at
FROM topology_inference_shadow
WHERE created_at >= datetime('now', ?)
AND method != 'shadow_ingest'
AND model_confidence >= ?
ORDER BY created_at DESC
LIMIT 5000
""",
(f"-{days} days", min_confidence),
)
edge_acc: Dict[str, Dict[str, Any]] = {}
node_prefixes: set = set()
prefix_hex_chars = self.prefix_hex_chars
for row in model_rows:
raw_nodes = row.get("path_nodes_json")
created_at = row.get("created_at")
method = (row.get("method") or "topology_viterbi").strip() or "topology_viterbi"
confidence = float(row.get("model_confidence") or 0.0)
if confidence < min_confidence:
continue
path_nodes: List[str] = []
if raw_nodes:
try:
parsed = json.loads(raw_nodes)
if isinstance(parsed, list):
path_nodes = [str(v).lower() for v in parsed if v is not None]
except (TypeError, ValueError, json.JSONDecodeError):
path_nodes = []
if len(path_nodes) < 2:
continue
# Track prefix length seen in model rows so frontend node/edge prefixes stay aligned.
for node in path_nodes:
if not node:
continue
prefix_hex_chars = max(prefix_hex_chars, len(node))
node_prefixes.add(node)
for i in range(len(path_nodes) - 1):
from_prefix = (path_nodes[i] or "").lower()
to_prefix = (path_nodes[i + 1] or "").lower()
if not from_prefix or not to_prefix:
continue
if not include_ghost and (from_prefix.startswith("ghost:") or to_prefix.startswith("ghost:")):
continue
edge_key = f"{from_prefix}->{to_prefix}"
current = edge_acc.get(edge_key)
if not current:
current = {
"from_prefix": from_prefix,
"to_prefix": to_prefix,
"observation_count": 0,
"first_seen": created_at,
"last_seen": created_at,
"model_confidence_sum": 0.0,
"evidence_count": 0,
"source_method_counts": {},
}
edge_acc[edge_key] = current
current["observation_count"] += 1
current["evidence_count"] += 1
current["model_confidence_sum"] += confidence
if created_at:
if not current.get("first_seen") or str(created_at) < str(current.get("first_seen")):
current["first_seen"] = created_at
if not current.get("last_seen") or str(created_at) > str(current.get("last_seen")):
current["last_seen"] = created_at
method_counts = current["source_method_counts"]
method_counts[method] = int(method_counts.get(method, 0)) + 1
edges: List[Dict[str, Any]] = []
for edge in edge_acc.values():
evidence_count = int(edge.get("evidence_count") or 0)
avg_conf = (float(edge.get("model_confidence_sum") or 0.0) / evidence_count) if evidence_count > 0 else 0.0
method_counts = edge.get("source_method_counts") or {}
dominant_method = max(method_counts.items(), key=lambda kv: kv[1])[0] if method_counts else "topology_viterbi"
edges.append(
{
"from_prefix": edge["from_prefix"],
"to_prefix": edge["to_prefix"],
"from_public_key": None,
"to_public_key": None,
"observation_count": int(edge.get("observation_count") or 0),
"first_seen": edge.get("first_seen"),
"last_seen": edge.get("last_seen"),
"avg_hop_position": None,
"geographic_distance": None,
"model_confidence": round(avg_conf, 4),
"evidence_count": evidence_count,
"source_method": dominant_method,
}
)
# Resolve node metadata (name/location/last seen) from contact tracking when possible.
nodes: List[Dict[str, Any]] = []
if node_prefixes:
try:
conn = sqlite3.connect(self.db.db_path, timeout=60)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
for prefix in sorted(node_prefixes):
if not prefix:
continue
cursor.execute(
"""
SELECT public_key, name, latitude, longitude, role, is_starred, last_heard, last_advert_timestamp
FROM complete_contact_tracking
WHERE public_key LIKE ?
AND role IN ('repeater', 'roomserver')
ORDER BY COALESCE(last_advert_timestamp, last_heard) DESC, is_starred DESC
LIMIT 1
""",
(f"{prefix}%",),
)
row = cursor.fetchone()
if not row:
continue
if row["latitude"] in (None, 0) or row["longitude"] in (None, 0):
continue
nodes.append(
{
"public_key": row["public_key"],
"prefix": prefix.lower(),
"name": row["name"] or f"Node {prefix.upper()}",
"latitude": float(row["latitude"]),
"longitude": float(row["longitude"]),
"role": row["role"] or "repeater",
"is_starred": bool(row["is_starred"]),
"last_heard": row["last_heard"],
"last_advert_timestamp": row["last_advert_timestamp"],
}
)
conn.close()
except Exception as e:
self.logger.debug(f"Topology model graph node hydration failed: {e}")
return {
"nodes": nodes,
"edges": edges,
"prefix_hex_chars": max(2, prefix_hex_chars),
"days": days,
"min_confidence": min_confidence,
"include_ghost": include_ghost,
}
    def run_confidence_backfill(
        self,
        days: int = 7,
        limit: Optional[int] = None,
        progress_callback: Optional[Callable[[Dict[str, Any]], None]] = None,
    ) -> Dict[str, Any]:
        """Replay observed paths and persist model-vs-legacy confidence rows.

        Args:
            days: Lookback window over observed_paths (clamped to 1..90).
            limit: Optional cap on replayed rows (clamped to 1..500000).
            progress_callback: Invoked with a status dict at start, every 100
                processed rows, and once more with the final result.

        Returns:
            Final status dict with processed/skipped/error/written counters.
        """
        days = max(1, min(90, int(days or 7)))
        limit_clause = ""
        params: Tuple[Any, ...]
        if limit is not None:
            safe_limit = max(1, min(500000, int(limit)))
            limit_clause = " LIMIT ?"
            params = (f"-{days} days", safe_limit)
        else:
            params = (f"-{days} days",)
        count_query = """
            SELECT COUNT(*) AS total_rows
            FROM observed_paths
            WHERE last_seen >= datetime('now', ?)
        """
        total_rows = int((self.db.execute_query(count_query, (f"-{days} days",)) or [{"total_rows": 0}])[0].get("total_rows", 0))
        if limit is not None:
            # The reported total never exceeds the requested cap.
            total_rows = min(total_rows, max(1, min(500000, int(limit))))
        select_query = f"""
            SELECT path_hex, path_length, bytes_per_hop, packet_hash, observation_count, last_seen, packet_type, public_key
            FROM observed_paths
            WHERE last_seen >= datetime('now', ?)
            ORDER BY last_seen ASC
            {limit_clause}
        """
        rows = self.db.execute_query(select_query, params)
        processed = 0
        skipped = 0
        errors = 0
        comparisons_written = 0
        start_ts = datetime.now().isoformat()
        if progress_callback:
            # Initial progress ping so callers can render a zeroed progress bar.
            progress_callback(
                {
                    "status": "running",
                    "days": days,
                    "total": total_rows,
                    "processed": 0,
                    "rows_skipped": 0,
                    "errors": 0,
                    "comparisons_written": 0,
                    "started_at": start_ts,
                }
            )
        # Force full replay coverage independent of runtime shadow sampling.
        original_sample_rate = self.shadow_sample_rate
        self.shadow_sample_rate = 1.0
        try:
            for row in rows:
                processed += 1
                try:
                    path_nodes = self._parse_observed_path_nodes(row)
                    if len(path_nodes) < 2:
                        skipped += 1
                        continue
                    row_resolution_context = self._build_resolution_context(
                        origin_public_key=row.get("public_key"),
                        origin_packet_type=row.get("packet_type"),
                    )
                    wrote_for_row = False
                    # Compare legacy vs model resolution at every hop of the path.
                    for current_index, node_id in enumerate(path_nodes):
                        repeaters = self._query_candidates_for_prefix(node_id)
                        if not repeaters:
                            continue
                        legacy_choice, legacy_confidence, legacy_method, non_collision = self._legacy_select_candidate(
                            path_context=path_nodes,
                            repeaters=repeaters,
                            current_index=current_index,
                        )
                        model_choice, model_confidence, model_method = self.resolve_path_candidates(
                            node_id=node_id,
                            path_context=path_nodes,
                            repeaters=repeaters,
                            current_index=current_index,
                            resolution_context=row_resolution_context,
                        )
                        anchor_debug = self._consume_last_anchor_debug()
                        # If both methods fail to pick anything, skip this node.
                        if not legacy_choice and not model_choice:
                            continue
                        backfill_key = self._build_backfill_key(
                            path_nodes=path_nodes,
                            packet_hash=row.get("packet_hash"),
                            current_index=current_index,
                        )
                        wrote = self.record_shadow_comparison(
                            path_nodes=path_nodes,
                            model_choice=model_choice,
                            model_confidence=model_confidence,
                            model_method=model_method,
                            legacy_choice=legacy_choice,
                            legacy_confidence=legacy_confidence,
                            legacy_method=legacy_method,
                            packet_hash=row.get("packet_hash"),
                            bytes_per_hop=row.get("bytes_per_hop"),
                            non_collision=non_collision,
                            backfill_key=backfill_key,
                            origin_public_key=row.get("public_key"),
                            origin_packet_type=row.get("packet_type"),
                            anchor_prior_applied=bool(anchor_debug.get("applied")),
                            anchor_prior_adjustment=float(anchor_debug.get("adjustment_total") or 0.0),
                        )
                        if wrote:
                            comparisons_written += 1
                            wrote_for_row = True
                    if not wrote_for_row:
                        skipped += 1
                except Exception:
                    # Per-row fault isolation: one bad row must not stop the replay.
                    errors += 1
                if progress_callback and processed % 100 == 0:
                    progress_callback(
                        {
                            "status": "running",
                            "days": days,
                            "total": total_rows,
                            "processed": processed,
                            "rows_skipped": skipped,
                            "errors": errors,
                            "comparisons_written": comparisons_written,
                            "started_at": start_ts,
                        }
                    )
        finally:
            # Always restore the configured sampling rate.
            self.shadow_sample_rate = original_sample_rate
        result = {
            "status": "completed",
            "days": days,
            "total": total_rows,
            "processed": processed,
            "rows_skipped": skipped,
            "errors": errors,
            "comparisons_written": comparisons_written,
            "started_at": start_ts,
            "completed_at": datetime.now().isoformat(),
        }
        if progress_callback:
            progress_callback(result)
        return result
def _build_backfill_key(
self,
path_nodes: List[str],
packet_hash: Optional[str],
current_index: int,
) -> str:
path_hex = "".join(path_nodes).lower()
packet_component = (packet_hash or "nohash").lower()
return f"obs_replay:{packet_component}:{path_hex}:{current_index}"
def _parse_observed_path_nodes(self, row: Dict[str, Any]) -> List[str]:
path_hex = str(row.get("path_hex") or "").strip().lower()
if not path_hex:
return []
bytes_per_hop = row.get("bytes_per_hop")
path_length = row.get("path_length")
hop_hex_chars = 0
if bytes_per_hop:
try:
hop_hex_chars = int(bytes_per_hop) * 2
except (TypeError, ValueError):
hop_hex_chars = 0
if hop_hex_chars <= 0 and path_length:
try:
path_length = int(path_length)
if path_length > 0 and len(path_hex) % path_length == 0:
hop_hex_chars = len(path_hex) // path_length
except (TypeError, ValueError):
hop_hex_chars = 0
if hop_hex_chars <= 0:
hop_hex_chars = self.prefix_hex_chars
hop_hex_chars = max(2, hop_hex_chars)
if len(path_hex) < hop_hex_chars:
return []
if len(path_hex) % hop_hex_chars != 0:
# Fallback for malformed rows: parse by default configured prefix width.
hop_hex_chars = self.prefix_hex_chars
if hop_hex_chars <= 0 or len(path_hex) % hop_hex_chars != 0:
return []
return [path_hex[i : i + hop_hex_chars] for i in range(0, len(path_hex), hop_hex_chars)]
def _legacy_select_candidate(
self,
path_context: List[str],
repeaters: List[Dict[str, Any]],
current_index: int,
) -> Tuple[Optional[Dict[str, Any]], float, str, bool]:
if not repeaters:
return None, 0.0, "legacy_none", False
if len(repeaters) == 1:
return repeaters[0], 1.0, "legacy_single", True
# Legacy fallback prioritizes recency for colliding prefixes.
selected = max(repeaters, key=lambda r: self._recency_score(r))
confidence = max(0.0, min(1.0, self._recency_score(selected)))
# If graph context exists, nudge confidence upward when adjacent edges are observed.
mesh_graph = getattr(self.bot, "mesh_graph", None)
if mesh_graph and current_index > 0:
prev_prefix = (path_context[current_index - 1] or "").lower()[: self.prefix_hex_chars]
cur_prefix = (selected.get("public_key") or selected.get("prefix") or "").lower()[: self.prefix_hex_chars]
graph_score = mesh_graph.get_candidate_score(
candidate_prefix=cur_prefix,
prev_prefix=prev_prefix,
next_prefix=None,
min_observations=self.min_edge_observations,
hop_position=None,
use_bidirectional=True,
use_hop_position=False,
)
confidence = max(confidence, max(0.0, min(1.0, float(graph_score or 0.0))))
return selected, confidence, "legacy_recency_graph", False
def _normalize_repeaters(self, repeaters: List[Dict[str, Any]], node_id: str) -> List[Dict[str, Any]]:
node_prefix = (node_id or "").lower()
states: List[Dict[str, Any]] = []
for r in repeaters:
pk = (r.get("public_key") or "").lower()
if not pk:
continue
if node_prefix and not pk.startswith(node_prefix):
continue
states.append(
{
"public_key": pk,
"prefix": pk[: max(len(node_prefix), self.prefix_hex_chars)],
"name": r.get("name"),
"last_heard": r.get("last_heard") or r.get("last_seen"),
"last_advert_timestamp": r.get("last_advert_timestamp"),
"hop_count": r.get("hop_count"),
"is_starred": bool(r.get("is_starred", False)),
"snr": r.get("snr"),
"latitude": r.get("latitude"),
"longitude": r.get("longitude"),
}
)
return states[: self.max_candidates_per_prefix]
def _query_candidates_for_prefix(self, prefix: str) -> List[Dict[str, Any]]:
prefix = (prefix or "").lower()
if not prefix:
return []
results = self.db.execute_query(
"""
SELECT public_key, name, last_heard, last_advert_timestamp, hop_count, is_starred, snr, latitude, longitude
FROM complete_contact_tracking
WHERE public_key LIKE ?
AND role IN ('repeater', 'roomserver')
ORDER BY is_starred DESC, COALESCE(last_advert_timestamp, last_heard) DESC
LIMIT ?
""",
(f"{prefix}%", self.max_candidates_per_prefix),
)
states: List[Dict[str, Any]] = []
for row in results:
pk = (row.get("public_key") or "").lower()
if not pk:
continue
states.append(
{
"public_key": pk,
"prefix": pk[: max(len(prefix), self.prefix_hex_chars)],
"name": row.get("name"),
"last_heard": row.get("last_heard"),
"last_advert_timestamp": row.get("last_advert_timestamp"),
"hop_count": row.get("hop_count"),
"is_starred": bool(row.get("is_starred", 0)),
"snr": row.get("snr"),
"latitude": row.get("latitude"),
"longitude": row.get("longitude"),
}
)
return states
    def _viterbi_decode(
        self,
        state_sets: List[List[Dict[str, Any]]],
        path_context: List[str],
        resolution_context: Optional[Dict[str, Any]] = None,
    ) -> Tuple[List[Dict[str, Any]], float]:
        """Compute best state path with log-space Viterbi.

        Returns (sequence, log_score); an empty sequence signals failure
        (no states, empty final column, or a broken backtrack chain).
        """
        if not state_sets:
            return [], 0.0
        epsilon = 1e-6  # floor so log() never sees a zero probability
        dp: List[List[float]] = []      # dp[i][j]: best log-score ending at state j of hop i
        parent: List[List[int]] = []    # parent[i][j]: argmax predecessor index (for backtrack)
        first_scores = []
        first_parent = []
        # Scratch record shared with _transition_score via self._last_anchor_debug;
        # accumulates every anchor-prior adjustment applied during this decode.
        anchor_debug = {"applied": False, "adjustment_total": 0.0}
        self._last_anchor_debug = anchor_debug
        path_length = len(path_context)
        # Initialize hop 0 from emission scores (plus optional anchor adjustment).
        for st in state_sets[0]:
            emission = max(epsilon, self._emission_score(st))
            adjust = self._origin_emission_adjustment(
                state=st,
                node_index=0,
                path_length=path_length,
                resolution_context=resolution_context,
            )
            if adjust:
                anchor_debug["applied"] = True
                anchor_debug["adjustment_total"] += adjust
                emission = max(epsilon, min(0.99, emission + adjust))
            first_scores.append(math.log(emission))
            first_parent.append(-1)
        dp.append(first_scores)
        parent.append(first_parent)
        # Forward pass: for each later hop, pick the best predecessor per state.
        for i in range(1, len(state_sets)):
            cur_scores = []
            cur_parent = []
            prev_states = state_sets[i - 1]
            cur_states = state_sets[i]
            for cur_idx, cur_st in enumerate(cur_states):
                emission = max(epsilon, self._emission_score(cur_st))
                best_prev_idx = -1
                best_val = -1e18
                for prev_idx, prev_st in enumerate(prev_states):
                    transition = max(
                        epsilon,
                        self._transition_score(
                            prev_st,
                            cur_st,
                            path_context[i - 1],
                            path_context[i],
                            transition_index=i - 1,
                            path_length=path_length,
                            resolution_context=resolution_context,
                        ),
                    )
                    val = dp[i - 1][prev_idx] + math.log(transition) + math.log(emission)
                    if val > best_val:
                        best_val = val
                        best_prev_idx = prev_idx
                cur_scores.append(best_val)
                cur_parent.append(best_prev_idx)
            dp.append(cur_scores)
            parent.append(cur_parent)
        if not dp[-1]:
            return [], 0.0
        # Backtrack from the best-scoring final state.
        best_last_idx = max(range(len(dp[-1])), key=lambda idx: dp[-1][idx])
        best_log_score = dp[-1][best_last_idx]
        sequence: List[Dict[str, Any]] = [state_sets[-1][best_last_idx]]
        cursor = best_last_idx
        for i in range(len(state_sets) - 1, 0, -1):
            cursor = parent[i][cursor]
            if cursor < 0:
                break
            sequence.append(state_sets[i - 1][cursor])
        sequence.reverse()
        if len(sequence) != len(state_sets):
            # Backtrack chain broke (dangling -1 parent): report failure
            # instead of returning a partial path.
            return [], 0.0
        self._last_anchor_debug = anchor_debug
        return sequence, best_log_score
def _emission_score(self, state: Dict[str, Any]) -> float:
if state.get("is_ghost"):
return 0.25
recency = self._recency_score(state)
zero_hop = 0.12 if state.get("hop_count") == 0 else 0.0
snr_bonus = 0.08 if state.get("snr") is not None else 0.0
starred = 0.06 if state.get("is_starred") else 0.0
return max(0.01, min(0.99, 0.35 + 0.45 * recency + zero_hop + snr_bonus + starred))
    def _transition_score(
        self,
        prev_state: Dict[str, Any],
        cur_state: Dict[str, Any],
        prev_node: str,
        cur_node: str,
        transition_index: Optional[int] = None,
        path_length: Optional[int] = None,
        resolution_context: Optional[Dict[str, Any]] = None,
    ) -> float:
        """Score the prev->cur hop transition from graph evidence plus anchor prior.

        Returns a probability-like value clamped to [0.01, 0.99]; fixed constants
        are used when either endpoint is a ghost placeholder or no mesh graph
        is available.
        """
        prev_is_ghost = prev_state.get("is_ghost")
        cur_is_ghost = cur_state.get("is_ghost")
        if prev_is_ghost and cur_is_ghost:
            # Ghost-to-ghost: weakest, flat transition.
            return 0.25
        if prev_is_ghost or cur_is_ghost:
            # Mixed ghost/real transition scores a bit higher.
            return 0.4
        mesh_graph = getattr(self.bot, "mesh_graph", None)
        if not mesh_graph:
            # No graph evidence available: neutral transition.
            return 0.5
        # Compare by configured prefix width; fall back to the raw path node
        # when a state carries no public key.
        prev_prefix = (prev_state.get("public_key") or prev_node or "").lower()[: self.prefix_hex_chars]
        cur_prefix = (cur_state.get("public_key") or cur_node or "").lower()[: self.prefix_hex_chars]
        graph_score = mesh_graph.get_candidate_score(
            candidate_prefix=cur_prefix,
            prev_prefix=prev_prefix,
            next_prefix=None,
            min_observations=self.min_edge_observations,
            hop_position=None,
            use_bidirectional=True,
            use_hop_position=False,
        )
        # Map graph evidence into a 0.2 + 0.75*score band before clamping.
        base_score = max(0.01, min(0.99, 0.2 + 0.75 * graph_score))
        # Optional advert-origin anchor prior: a soft nudge, accumulated into
        # self._last_anchor_debug for telemetry (see _viterbi_decode).
        adjust = self._origin_transition_adjustment(
            prev_state=prev_state,
            cur_state=cur_state,
            prev_node=prev_node,
            cur_node=cur_node,
            transition_index=transition_index,
            path_length=path_length,
            resolution_context=resolution_context,
        )
        if adjust:
            self._last_anchor_debug["applied"] = True
            self._last_anchor_debug["adjustment_total"] = float(
                self._last_anchor_debug.get("adjustment_total", 0.0) + adjust
            )
        return max(0.01, min(0.99, base_score + adjust))
def _should_apply_anchor_prior(
self,
topology_mode: Optional[str] = None,
origin_packet_type: Optional[str] = None,
) -> bool:
if not self.advert_anchor_enabled:
return False
mode = (topology_mode or "").strip().lower()
# Phase-1 rollout: evaluation paths only.
if mode and mode != "shadow":
return False
packet_type = (origin_packet_type or "").strip().lower()
if packet_type and packet_type != "advert":
return False
return True
def _build_resolution_context(
self,
origin_public_key: Optional[str] = None,
origin_packet_type: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
packet_type = (origin_packet_type or "").strip().lower()
origin_pk = (origin_public_key or "").strip().lower()
if not self._should_apply_anchor_prior(
topology_mode="shadow",
origin_packet_type=packet_type,
):
return None
if packet_type != "advert" or not origin_pk:
return None
if not self._is_contact_fresh(origin_pk):
return None
return {
"origin_public_key": origin_pk,
"origin_packet_type": packet_type,
"origin_location": self._get_contact_location(origin_pk),
}
def _lookup_observed_path_origin(
self,
packet_hash: Optional[str],
path_nodes: Optional[List[str]] = None,
) -> Tuple[Optional[str], Optional[str]]:
safe_hash = (packet_hash or "").strip()
if not safe_hash or safe_hash == "0000000000000000":
return None, None
try:
rows = self.db.execute_query(
"""
SELECT public_key, packet_type, path_hex, path_length, bytes_per_hop, last_seen
FROM observed_paths
WHERE packet_hash = ?
ORDER BY last_seen DESC
LIMIT 10
""",
(safe_hash,),
)
if not rows:
return None, None
path_hex = "".join(path_nodes or []).lower()
if path_hex:
for row in rows:
candidate_nodes = self._parse_observed_path_nodes(row)
if candidate_nodes and "".join(candidate_nodes).lower() == path_hex:
return row.get("public_key"), row.get("packet_type")
top = rows[0]
return top.get("public_key"), top.get("packet_type")
except Exception:
return None, None
def _get_contact_location(self, public_key: str) -> Optional[Tuple[float, float]]:
pk = (public_key or "").strip().lower()
if not pk:
return None
if pk in self._origin_location_cache:
return self._origin_location_cache[pk]
location: Optional[Tuple[float, float]] = None
try:
rows = self.db.execute_query(
"""
SELECT latitude, longitude
FROM complete_contact_tracking
WHERE public_key = ?
LIMIT 1
""",
(pk,),
)
if rows:
lat = rows[0].get("latitude")
lon = rows[0].get("longitude")
if lat not in (None, 0, 0.0) and lon not in (None, 0, 0.0):
location = (float(lat), float(lon))
except Exception:
location = None
self._origin_location_cache[pk] = location
return location
def _is_contact_fresh(self, public_key: str) -> bool:
pk = (public_key or "").strip().lower()
if not pk:
return False
try:
rows = self.db.execute_query(
"""
SELECT COALESCE(last_advert_timestamp, last_heard) AS last_seen
FROM complete_contact_tracking
WHERE public_key = ?
LIMIT 1
""",
(pk,),
)
if not rows:
return False
last_seen = rows[0].get("last_seen")
if not last_seen:
return False
if isinstance(last_seen, str):
dt = datetime.fromisoformat(last_seen.replace("Z", "+00:00"))
else:
dt = last_seen
if getattr(dt, "tzinfo", None):
age_hours = max(0.0, (datetime.now(dt.tzinfo) - dt).total_seconds() / 3600.0)
else:
age_hours = max(0.0, (datetime.now() - dt).total_seconds() / 3600.0)
return age_hours <= float(self.advert_anchor_freshness_hours)
except Exception:
return False
def _origin_emission_adjustment(
self,
state: Dict[str, Any],
node_index: int,
path_length: int,
resolution_context: Optional[Dict[str, Any]],
) -> float:
if node_index != 0 or not resolution_context:
return 0.0
origin_pk = (resolution_context.get("origin_public_key") or "").lower()
if not origin_pk or state.get("is_ghost"):
return 0.0
state_pk = (state.get("public_key") or "").lower()
if not state_pk:
return 0.0
raw = 0.0
if state_pk == origin_pk:
raw += 1.0
elif origin_pk.startswith(state_pk) or state_pk.startswith(origin_pk):
raw += 0.7
else:
raw -= 0.25
return self._bounded_anchor_adjustment(raw)
def _origin_transition_adjustment(
self,
prev_state: Dict[str, Any],
cur_state: Dict[str, Any],
prev_node: str,
cur_node: str,
transition_index: Optional[int],
path_length: Optional[int],
resolution_context: Optional[Dict[str, Any]],
) -> float:
if not resolution_context or transition_index != 0:
return 0.0
if prev_state.get("is_ghost") or cur_state.get("is_ghost"):
return 0.0
origin_pk = (resolution_context.get("origin_public_key") or "").lower()
if not origin_pk:
return 0.0
prev_pk = (prev_state.get("public_key") or "").lower()
cur_pk = (cur_state.get("public_key") or "").lower()
prev_prefix = (prev_pk or prev_node or "").lower()[: self.prefix_hex_chars]
cur_prefix = (cur_pk or cur_node or "").lower()[: self.prefix_hex_chars]
if not prev_prefix or not cur_prefix:
return 0.0
raw = 0.0
if prev_pk == origin_pk:
raw += 0.8
elif prev_pk and not origin_pk.startswith(prev_pk):
raw -= 0.2
mesh_graph = getattr(self.bot, "mesh_graph", None)
if mesh_graph:
edge = mesh_graph.get_edge(prev_prefix, cur_prefix)
if edge:
edge_from = (edge.get("from_public_key") or "").lower()
edge_to = (edge.get("to_public_key") or "").lower()
if edge_from:
raw += 0.6 if edge_from == origin_pk else -0.2
if edge_to and cur_pk:
raw += 0.2 if edge_to == cur_pk else -0.05
origin_loc = resolution_context.get("origin_location")
cur_lat = cur_state.get("latitude")
cur_lon = cur_state.get("longitude")
if origin_loc and cur_lat not in (None, 0, 0.0) and cur_lon not in (None, 0, 0.0):
try:
km = self._haversine_km(origin_loc[0], origin_loc[1], float(cur_lat), float(cur_lon))
if km <= 200.0:
raw += 0.15
elif km > 800.0:
raw -= 0.12
except Exception:
pass
return self._bounded_anchor_adjustment(raw)
def _bounded_anchor_adjustment(self, raw_score: float) -> float:
if raw_score == 0.0:
return 0.0
weighted = float(raw_score) * self.advert_anchor_weight
capped = max(-self.advert_anchor_max_adjustment, min(self.advert_anchor_max_adjustment, weighted))
return capped
def _consume_last_anchor_debug(self) -> Dict[str, Any]:
debug = dict(self._last_anchor_debug or {})
self._last_anchor_debug = {"applied": False, "adjustment_total": 0.0}
return {
"applied": bool(debug.get("applied")),
"adjustment_total": float(debug.get("adjustment_total") or 0.0),
}
@staticmethod
def _haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
radius_km = 6371.0
d_lat = math.radians(lat2 - lat1)
d_lon = math.radians(lon2 - lon1)
a = (
math.sin(d_lat / 2) ** 2
+ math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(d_lon / 2) ** 2
)
c = 2 * math.atan2(math.sqrt(a), math.sqrt(max(0.0, 1 - a)))
return radius_km * c
def _compute_anchor_diagnostics(self, rows: List[Dict[str, Any]]) -> Dict[str, Any]:
total_rows = 0
anchored_rows = 0
anchored_agree = 0
anchored_ghost = 0
unanchored_rows = 0
unanchored_agree = 0
unanchored_ghost = 0
prior_applied_count = 0
prior_adjustment_sum = 0.0
for row in rows:
total_rows += 1
method = (row.get("method") or "").lower()
agreement = int(row.get("agreement") or 0)
payload = {}
try:
payload = json.loads(row.get("resolved_path_json") or "{}")
except (TypeError, ValueError, json.JSONDecodeError):
payload = {}
origin_packet_type = (payload.get("origin_packet_type") or "").lower()
origin_public_key = (payload.get("origin_public_key") or "").lower()
is_anchored = origin_packet_type == "advert" and bool(origin_public_key)
prior_applied = bool(payload.get("anchor_prior_applied"))
prior_adjust = float(payload.get("anchor_prior_adjustment") or 0.0)
if prior_applied:
prior_applied_count += 1
prior_adjustment_sum += prior_adjust
if is_anchored:
anchored_rows += 1
anchored_agree += agreement
if method == "topology_viterbi_ghost":
anchored_ghost += 1
else:
unanchored_rows += 1
unanchored_agree += agreement
if method == "topology_viterbi_ghost":
unanchored_ghost += 1
anchored_agreement_rate = (anchored_agree / anchored_rows) if anchored_rows else 0.0
unanchored_agreement_rate = (unanchored_agree / unanchored_rows) if unanchored_rows else 0.0
anchored_ghost_rate = (anchored_ghost / anchored_rows) if anchored_rows else 0.0
unanchored_ghost_rate = (unanchored_ghost / unanchored_rows) if unanchored_rows else 0.0
return {
"total_rows": total_rows,
"anchored_rows": anchored_rows,
"unanchored_rows": unanchored_rows,
"anchor_prior_applied_count": prior_applied_count,
"average_anchor_adjustment": (prior_adjustment_sum / prior_applied_count) if prior_applied_count else 0.0,
"anchored_agreement_rate": anchored_agreement_rate,
"unanchored_agreement_rate": unanchored_agreement_rate,
"agreement_delta_anchored_vs_unanchored": anchored_agreement_rate - unanchored_agreement_rate,
"anchored_ghost_rate": anchored_ghost_rate,
"unanchored_ghost_rate": unanchored_ghost_rate,
"ghost_rate_delta_anchored_vs_unanchored": anchored_ghost_rate - unanchored_ghost_rate,
}
def _recency_score(self, state: Dict[str, Any]) -> float:
recent = state.get("last_advert_timestamp") or state.get("last_heard")
if not recent:
return 0.0
try:
if isinstance(recent, str):
dt = datetime.fromisoformat(recent.replace("Z", "+00:00"))
else:
dt = recent
age_hours = max(0.0, (datetime.now(dt.tzinfo) - dt).total_seconds() / 3600.0) if getattr(dt, "tzinfo", None) else max(0.0, (datetime.now() - dt).total_seconds() / 3600.0)
return math.exp(-age_hours / 12.0)
except Exception:
return 0.0
def _score_to_confidence(self, log_score: float, path_length: Optional[int] = None) -> float:
"""Convert log probability into bounded confidence with path-length normalization."""
steps = max(1, int(path_length or 1))
# Normalize by number of transitions so confidence remains comparable across path lengths.
normalized_score = log_score / float(steps)
return max(0.0, min(1.0, 1.0 / (1.0 + math.exp(-2.0 * normalized_score))))
def _ghost_state(self, prefix: str) -> Dict[str, Any]:
return {
"is_ghost": True,
"public_key": f"ghost:{prefix.lower()}",
"prefix": prefix.lower(),
"name": "Ghost Node",
}
def _record_ghost_node(self, ghost_state: Dict[str, Any], path_context: List[str]) -> None:
if not self.ghost_enabled:
return
prefix = (ghost_state.get("prefix") or "").lower()
if not prefix:
return
ghost_id = f"ghost_{prefix}"
neighbors = {
"path_context": path_context,
"previous": path_context[:-1],
"next": path_context[1:],
}
try:
existing = self.db.execute_query(
"SELECT id, evidence_count FROM topology_ghost_nodes WHERE ghost_id = ? LIMIT 1",
(ghost_id,),
)
if existing:
evidence_count = int(existing[0].get("evidence_count", 1)) + 1
tier = self._ghost_tier(evidence_count)
self.db.execute_update(
"""
UPDATE topology_ghost_nodes
SET inferred_neighbors_json = ?, evidence_count = ?, confidence_tier = ?, model_confidence = ?, last_seen = CURRENT_TIMESTAMP
WHERE ghost_id = ?
""",
(json.dumps(neighbors), evidence_count, tier, min(0.99, 0.2 + evidence_count * 0.05), ghost_id),
)
else:
self.db.execute_update(
"""
INSERT INTO topology_ghost_nodes
(ghost_id, prefix, inferred_neighbors_json, evidence_count, confidence_tier, model_confidence)
VALUES (?, ?, ?, 1, ?, ?)
""",
(ghost_id, prefix, json.dumps(neighbors), self._ghost_tier(1), 0.25),
)
except Exception as e:
self.logger.debug(f"Topology ghost write failed: {e}")
def _ghost_tier(self, evidence_count: int) -> str:
if evidence_count >= 40:
return "confirmed"
if evidence_count >= 20:
return "likely"
if evidence_count >= 8:
return "possible"
return "noise"
@staticmethod
def _rolling_avg(previous_avg: float, new_value: float, n: int) -> float:
if n <= 1:
return float(new_value)
return ((previous_avg * (n - 1)) + new_value) / float(n)