Files
agessaman 21676f0792 feat(worldcup): add support for announcing disallowed goals and enhance goal tracking
- Introduced a new configuration option `announce_disallowed` to allow notifications for goals overturned by VAR.
- Updated `WorldCupLiveService` to handle disallowed goals, including formatting for previously announced scorers.
- Enhanced `ESPNClient` to ensure accurate goal tracking and state management.
- Added unit tests to validate the new disallowed goal functionality and ensure correct behavior during matches.
2026-06-17 22:11:24 -07:00

1178 lines
58 KiB
Python

"""Shared path-inference engine.
Decodes a hex path string (e.g. from an observed advert path) into a list of repeater nodes,
using recency-weighted scoring, geographic proximity, and the mesh graph for collision resolution.
This is the single implementation shared by the web viewer (POST /api/decode-path and
/api/mesh/resolve-path) and the bot `path` command (modules/commands/path_command.py). The
per-node *selection* logic (recency filtering, graph-based disambiguation, geographic proximity,
and the combination of the two) lives here once. Each caller keeps its own candidate gathering
and output shaping, because those genuinely differ (the web returns a list of minimal node dicts;
the bot returns a richer ``repeater_info`` mapping and has a device-contacts fallback).
Behavioral parity is preserved via :class:`PathInferenceConfig.bot_command`:
* ``bot_command=False`` (the web/default path) reproduces the web viewer's previous decode logic
byte-for-byte. The web's ``decode_path_nodes`` output must not change.
* ``bot_command=True`` (built by ``PathCommand``) enables the bot command's extra behaviors:
SNR bonuses, zero-hop/SNR gating on graph evidence, ``proximity_method='path'`` selection with
sender/previous/next-node proximity, simple-proximity tie-breakers, and the final-hop
no-location penalty.
"""
import configparser
import math
import sqlite3
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Optional
from modules.utils import calculate_distance
# Minimum recency score for a repeater to survive collision filtering (~55h with the default
# 12h half-life). Shared by both callers.
MIN_RECENCY_THRESHOLD = 0.01
@dataclass
class PathInferenceConfig:
"""Snapshot of the selection knobs read from ``[Bot]``/``[Path_Command]``.
``decode_path_nodes`` builds one via :meth:`from_config` (``bot_command=False``); the bot
``PathCommand`` builds one from its own ``self.*`` attributes (``bot_command=True``) so that
tests which mutate those attributes after construction are honored.
"""
# Geographic
geographic_guessing_enabled: bool = False
bot_latitude: Optional[float] = None
bot_longitude: Optional[float] = None
geographic_scoring_enabled: bool = True
proximity_method: str = 'simple'
path_proximity_fallback: bool = True
max_proximity_range: float = 200.0
# Recency / weighting
max_repeater_age_days: int = 14
recency_weight: float = 0.4
proximity_weight: float = 0.6
recency_decay_half_life_hours: float = 12.0
# Graph validation
graph_based_validation: bool = True
min_edge_observations: int = 3
graph_use_bidirectional: bool = True
graph_use_hop_position: bool = True
graph_multi_hop_enabled: bool = True
graph_multi_hop_max_hops: int = 2
graph_geographic_combined: bool = False
graph_geographic_weight: float = 0.7
graph_confidence_override_threshold: float = 0.7
graph_distance_penalty_enabled: bool = True
graph_max_reasonable_hop_distance_km: float = 30.0
graph_distance_penalty_strength: float = 0.3
graph_zero_hop_bonus: float = 0.4
graph_prefer_stored_keys: bool = True
graph_final_hop_proximity_enabled: bool = True
graph_final_hop_proximity_weight: float = 0.25
graph_final_hop_max_distance: float = 0.0
graph_final_hop_proximity_normalization_km: float = 200.0
graph_final_hop_very_close_threshold_km: float = 10.0
graph_final_hop_close_threshold_km: float = 30.0
graph_final_hop_max_proximity_weight: float = 0.6
graph_path_validation_max_bonus: float = 0.3
graph_path_validation_obs_divisor: float = 50.0
star_bias_multiplier: float = 2.5
# Master switch enabling the bot `path` command's extra behaviors (see module docstring).
bot_command: bool = False
@classmethod
def from_config(cls, config) -> "PathInferenceConfig":
"""Read the selection knobs from a ConfigParser the way the web viewer always has.
``bot_command`` is False here; the web path never enables the bot-only behaviors.
"""
geographic_guessing_enabled = False
bot_latitude = None
bot_longitude = None
try:
if config.has_section('Bot'):
lat = config.getfloat('Bot', 'bot_latitude', fallback=None)
lon = config.getfloat('Bot', 'bot_longitude', fallback=None)
if lat is not None and lon is not None and -90 <= lat <= 90 and -180 <= lon <= 180:
bot_latitude = lat
bot_longitude = lon
geographic_guessing_enabled = True
except (ValueError, configparser.Error): # malformed float or missing section
pass
preset = config.get('Path_Command', 'path_selection_preset', fallback='balanced').lower()
if preset == 'geographic':
preset_graph_confidence_threshold = 0.5
preset_distance_threshold = 30.0
preset_distance_penalty = 0.5
preset_final_hop_weight = 0.4
elif preset == 'graph':
preset_graph_confidence_threshold = 0.9
preset_distance_threshold = 50.0
preset_distance_penalty = 0.2
preset_final_hop_weight = 0.15
else: # 'balanced' (default)
preset_graph_confidence_threshold = 0.7
preset_distance_threshold = 30.0
preset_distance_penalty = 0.3
preset_final_hop_weight = 0.25
recency_weight = max(0.0, min(1.0, config.getfloat('Path_Command', 'recency_weight', fallback=0.4)))
graph_geographic_weight = max(0.0, min(1.0, config.getfloat('Path_Command', 'graph_geographic_weight', fallback=0.7)))
graph_confidence_override_threshold = max(0.0, min(1.0, config.getfloat('Path_Command', 'graph_confidence_override_threshold', fallback=preset_graph_confidence_threshold)))
graph_distance_penalty_strength = max(0.0, min(1.0, config.getfloat('Path_Command', 'graph_distance_penalty_strength', fallback=preset_distance_penalty)))
graph_zero_hop_bonus = max(0.0, min(1.0, config.getfloat('Path_Command', 'graph_zero_hop_bonus', fallback=0.4)))
graph_final_hop_proximity_weight = max(0.0, min(1.0, config.getfloat('Path_Command', 'graph_final_hop_proximity_weight', fallback=preset_final_hop_weight)))
graph_final_hop_max_proximity_weight = max(0.0, min(1.0, config.getfloat('Path_Command', 'graph_final_hop_max_proximity_weight', fallback=0.6)))
graph_path_validation_max_bonus = max(0.0, min(1.0, config.getfloat('Path_Command', 'graph_path_validation_max_bonus', fallback=0.3)))
star_bias_multiplier = max(1.0, config.getfloat('Path_Command', 'star_bias_multiplier', fallback=2.5))
return cls(
geographic_guessing_enabled=geographic_guessing_enabled,
bot_latitude=bot_latitude,
bot_longitude=bot_longitude,
geographic_scoring_enabled=config.getboolean('Path_Command', 'geographic_scoring_enabled', fallback=True),
proximity_method=config.get('Path_Command', 'proximity_method', fallback='simple'),
path_proximity_fallback=config.getboolean('Path_Command', 'path_proximity_fallback', fallback=True),
max_proximity_range=config.getfloat('Path_Command', 'max_proximity_range', fallback=200.0),
max_repeater_age_days=config.getint('Path_Command', 'max_repeater_age_days', fallback=14),
recency_weight=recency_weight,
proximity_weight=1.0 - recency_weight,
recency_decay_half_life_hours=config.getfloat('Path_Command', 'recency_decay_half_life_hours', fallback=12.0),
graph_based_validation=config.getboolean('Path_Command', 'graph_based_validation', fallback=True),
min_edge_observations=config.getint('Path_Command', 'min_edge_observations', fallback=3),
graph_use_bidirectional=config.getboolean('Path_Command', 'graph_use_bidirectional', fallback=True),
graph_use_hop_position=config.getboolean('Path_Command', 'graph_use_hop_position', fallback=True),
graph_multi_hop_enabled=config.getboolean('Path_Command', 'graph_multi_hop_enabled', fallback=True),
graph_multi_hop_max_hops=config.getint('Path_Command', 'graph_multi_hop_max_hops', fallback=2),
graph_geographic_combined=config.getboolean('Path_Command', 'graph_geographic_combined', fallback=False),
graph_geographic_weight=graph_geographic_weight,
graph_confidence_override_threshold=graph_confidence_override_threshold,
graph_distance_penalty_enabled=config.getboolean('Path_Command', 'graph_distance_penalty_enabled', fallback=True),
graph_max_reasonable_hop_distance_km=config.getfloat('Path_Command', 'graph_max_reasonable_hop_distance_km', fallback=preset_distance_threshold),
graph_distance_penalty_strength=graph_distance_penalty_strength,
graph_zero_hop_bonus=graph_zero_hop_bonus,
graph_prefer_stored_keys=config.getboolean('Path_Command', 'graph_prefer_stored_keys', fallback=True),
graph_final_hop_proximity_enabled=config.getboolean('Path_Command', 'graph_final_hop_proximity_enabled', fallback=True),
graph_final_hop_proximity_weight=graph_final_hop_proximity_weight,
graph_final_hop_max_distance=config.getfloat('Path_Command', 'graph_final_hop_max_distance', fallback=0.0),
graph_final_hop_proximity_normalization_km=config.getfloat('Path_Command', 'graph_final_hop_proximity_normalization_km', fallback=200.0),
graph_final_hop_very_close_threshold_km=config.getfloat('Path_Command', 'graph_final_hop_very_close_threshold_km', fallback=10.0),
graph_final_hop_close_threshold_km=config.getfloat('Path_Command', 'graph_final_hop_close_threshold_km', fallback=30.0),
graph_final_hop_max_proximity_weight=graph_final_hop_max_proximity_weight,
graph_path_validation_max_bonus=graph_path_validation_max_bonus,
graph_path_validation_obs_divisor=config.getfloat('Path_Command', 'graph_path_validation_obs_divisor', fallback=50.0),
star_bias_multiplier=star_bias_multiplier,
bot_command=False,
)
@dataclass
class NodeSelection:
"""Result of resolving a single path node to a repeater.
``status`` is one of:
* ``'single'`` — exactly one (recent) candidate; ``repeater`` is it.
* ``'resolved'`` — a colliding prefix disambiguated with confidence >= 0.5; ``repeater``,
``confidence`` and ``method`` describe the winner.
* ``'collision'`` — a colliding prefix that could not be confidently resolved; ``repeater``
is None and ``recent_repeaters`` holds the candidates for display/fallback.
* ``'not_found'`` — no candidate survived recency filtering.
"""
status: str
repeater: Optional[dict[str, Any]] = None
confidence: float = 0.0
method: Optional[str] = None
matches: int = 0
recent_repeaters: list[dict[str, Any]] = None # type: ignore[assignment]
# ---------------------------------------------------------------------------
# Recency scoring
# ---------------------------------------------------------------------------
def calculate_recency_weighted_scores(repeaters, cfg: PathInferenceConfig):
"""Score repeaters 0.0-1.0 by recency (higher = more recently heard), sorted descending."""
scored_repeaters = []
now = datetime.now()
for repeater in repeaters:
most_recent_time = None
for field in ('last_heard', 'last_advert_timestamp', 'last_seen'):
value = repeater.get(field)
if value:
try:
if isinstance(value, str):
dt = datetime.fromisoformat(value.replace('Z', '+00:00'))
else:
dt = value
if most_recent_time is None or dt > most_recent_time:
most_recent_time = dt
except Exception:
pass
if most_recent_time is None:
recency_score = 0.1
else:
hours_ago = (now - most_recent_time).total_seconds() / 3600.0
recency_score = math.exp(-hours_ago / cfg.recency_decay_half_life_hours)
recency_score = max(0.0, min(1.0, recency_score))
scored_repeaters.append((repeater, recency_score))
scored_repeaters.sort(key=lambda x: x[1], reverse=True)
return scored_repeaters
def apply_tie_breakers(distances: list[tuple[float, dict[str, Any]]]) -> dict[str, Any]:
"""Break ties between equidistant repeaters deterministically (bot `path` command only)."""
import contextlib
min_distance = distances[0][0]
tied_repeaters = [repeater for distance, repeater in distances if distance == min_distance]
# Tie-breaker 1: prefer active repeaters
active_repeaters = [r for r in tied_repeaters if r.get('is_active', True)]
if len(active_repeaters) == 1:
return active_repeaters[0]
elif len(active_repeaters) > 1:
tied_repeaters = active_repeaters
# Tie-breaker 2: prefer most recent activity
def get_recent_timestamp(repeater):
timestamps = []
for field in ('last_heard', 'last_advert_timestamp', 'last_seen'):
value = repeater.get(field)
if value:
try:
if isinstance(value, str):
dt = datetime.fromisoformat(value.replace('Z', '+00:00'))
else:
dt = value
timestamps.append(dt)
except Exception:
pass
return max(timestamps) if timestamps else datetime.min
try:
tied_repeaters.sort(key=get_recent_timestamp, reverse=True)
except Exception:
pass
# Tie-breaker 3: higher advertisement count
with contextlib.suppress(BaseException):
tied_repeaters.sort(key=lambda r: r.get('advert_count', 0), reverse=True)
# Tie-breaker 4: alphabetical (deterministic)
tied_repeaters.sort(key=lambda r: r.get('name', ''))
return tied_repeaters[0]
# ---------------------------------------------------------------------------
# Geographic proximity
# ---------------------------------------------------------------------------
def get_node_location(node_id: str, cfg: PathInferenceConfig, *, db_manager, logger) -> Optional[tuple[float, float]]:
"""Look up a node's coordinates from complete_contact_tracking (bot `path` proximity only)."""
try:
if cfg.max_repeater_age_days > 0:
query = f'''
SELECT latitude, longitude, is_starred FROM complete_contact_tracking
WHERE public_key LIKE ? AND latitude IS NOT NULL AND longitude IS NOT NULL
AND latitude != 0 AND longitude != 0 AND role IN ('repeater', 'roomserver')
AND (
(last_advert_timestamp IS NOT NULL AND last_advert_timestamp >= datetime('now', '-{cfg.max_repeater_age_days} days'))
OR (last_advert_timestamp IS NULL AND last_heard >= datetime('now', '-{cfg.max_repeater_age_days} days'))
)
ORDER BY is_starred DESC, COALESCE(last_advert_timestamp, last_heard) DESC
LIMIT 1
'''
else:
query = '''
SELECT latitude, longitude, is_starred FROM complete_contact_tracking
WHERE public_key LIKE ? AND latitude IS NOT NULL AND longitude IS NOT NULL
AND latitude != 0 AND longitude != 0 AND role IN ('repeater', 'roomserver')
ORDER BY is_starred DESC, COALESCE(last_advert_timestamp, last_heard) DESC
LIMIT 1
'''
results = db_manager.execute_query(query, (f"{node_id}%",))
if results:
row = results[0]
return (row['latitude'], row['longitude'])
return None
except Exception as e:
logger.warning(f"Error getting location for node {node_id}: {e}")
return None
def select_by_simple_proximity(repeaters_with_location, cfg: PathInferenceConfig, *, logger):
"""Select the closest repeater to the bot, with a strong recency bias."""
scored_repeaters = calculate_recency_weighted_scores(repeaters_with_location, cfg)
scored_repeaters = [(r, score) for r, score in scored_repeaters if score >= MIN_RECENCY_THRESHOLD]
if not scored_repeaters:
return None, 0.0
if len(scored_repeaters) == 1:
repeater, recency_score = scored_repeaters[0]
distance = calculate_distance(cfg.bot_latitude, cfg.bot_longitude, repeater['latitude'], repeater['longitude'])
if cfg.max_proximity_range > 0 and distance > cfg.max_proximity_range:
return None, 0.0
base_confidence = 0.4 + (recency_score * 0.5)
return repeater, base_confidence
combined_scores = []
for repeater, recency_score in scored_repeaters:
distance = calculate_distance(cfg.bot_latitude, cfg.bot_longitude, repeater['latitude'], repeater['longitude'])
if cfg.max_proximity_range > 0 and distance > cfg.max_proximity_range:
continue
normalized_distance = min(distance / 1000.0, 1.0)
proximity_score = 1.0 - normalized_distance
combined_score = (recency_score * cfg.recency_weight) + (proximity_score * cfg.proximity_weight)
if repeater.get('is_starred', False):
combined_score *= cfg.star_bias_multiplier
# SNR presence means a confirmed zero-hop neighbor; only the bot command carries SNR.
if cfg.bot_command and repeater.get('snr') is not None:
combined_score += combined_score * 0.2
combined_scores.append((combined_score, distance, repeater))
if not combined_scores:
return None, 0.0
combined_scores.sort(key=lambda x: x[0], reverse=True)
best_score, best_distance, best_repeater = combined_scores[0]
if len(combined_scores) == 1:
confidence = 0.4 + (best_score * 0.5)
else:
second_best_score = combined_scores[1][0]
score_ratio = best_score / second_best_score if second_best_score > 0 else 1.0
if score_ratio > 1.5:
confidence = 0.9
elif score_ratio > 1.2:
confidence = 0.8
elif score_ratio > 1.1:
confidence = 0.7
elif cfg.bot_command:
# Scores too similar — fall back to a deterministic tie-breaker.
distances_for_tiebreaker = [(d, r) for _, d, r in combined_scores]
return apply_tie_breakers(distances_for_tiebreaker), 0.5
else:
confidence = 0.5
return best_repeater, confidence
def select_by_dual_proximity(repeaters, prev_location, next_location, cfg: PathInferenceConfig, *, logger):
"""Select a repeater near both its previous and next path nodes (bot `path` proximity)."""
scored_repeaters = calculate_recency_weighted_scores(repeaters, cfg)
scored_repeaters = [(r, score) for r, score in scored_repeaters if score >= MIN_RECENCY_THRESHOLD]
if not scored_repeaters:
return None, 0.0
best_repeater = None
best_combined_score = 0.0
for repeater, recency_score in scored_repeaters:
prev_distance = calculate_distance(prev_location[0], prev_location[1], repeater['latitude'], repeater['longitude'])
next_distance = calculate_distance(next_location[0], next_location[1], repeater['latitude'], repeater['longitude'])
avg_distance = (prev_distance + next_distance) / 2
normalized_distance = min(avg_distance / 1000.0, 1.0)
proximity_score = 1.0 - normalized_distance
combined_score = (recency_score * cfg.recency_weight) + (proximity_score * cfg.proximity_weight)
if repeater.get('is_starred', False):
combined_score *= cfg.star_bias_multiplier
if repeater.get('snr') is not None:
combined_score += combined_score * 0.2
if combined_score > best_combined_score:
best_combined_score = combined_score
best_repeater = repeater
if best_repeater:
if cfg.max_proximity_range > 0:
prev_dist = calculate_distance(prev_location[0], prev_location[1], best_repeater['latitude'], best_repeater['longitude'])
next_dist = calculate_distance(next_location[0], next_location[1], best_repeater['latitude'], best_repeater['longitude'])
if prev_dist > cfg.max_proximity_range or next_dist > cfg.max_proximity_range:
return None, 0.0
confidence = 0.4 + (best_combined_score * 0.5)
return best_repeater, confidence
return None, 0.0
def select_by_single_proximity(repeaters, reference_location, direction, cfg: PathInferenceConfig, *, logger):
"""Select a repeater near a single reference point (sender/bot/prev/next)."""
scored_repeaters = calculate_recency_weighted_scores(repeaters, cfg)
scored_repeaters = [(r, score) for r, score in scored_repeaters if score >= MIN_RECENCY_THRESHOLD]
if not scored_repeaters:
return None, 0.0
# The first hop (from sender) and last hop (to bot) prioritize distance above all else.
if direction in ("bot", "sender"):
proximity_weight = 1.0
recency_weight = 0.0
else:
proximity_weight = cfg.proximity_weight
recency_weight = cfg.recency_weight
best_repeater = None
best_combined_score = 0.0
for repeater, recency_score in scored_repeaters:
distance = calculate_distance(reference_location[0], reference_location[1], repeater['latitude'], repeater['longitude'])
if cfg.max_proximity_range > 0 and distance > cfg.max_proximity_range:
continue
normalized_distance = min(distance / 1000.0, 1.0)
proximity_score = 1.0 - normalized_distance
combined_score = (recency_score * recency_weight) + (proximity_score * proximity_weight)
if repeater.get('is_starred', False):
combined_score *= cfg.star_bias_multiplier
if repeater.get('snr') is not None:
combined_score += combined_score * 0.2
if combined_score > best_combined_score:
best_combined_score = combined_score
best_repeater = repeater
if best_repeater:
confidence = 0.4 + (best_combined_score * 0.5)
return best_repeater, confidence
return None, 0.0
def select_by_path_proximity(repeaters_with_location, node_id, path_context, sender_location, cfg: PathInferenceConfig, *, db_manager, logger):
"""Select a repeater based on proximity to its previous/next nodes in the path."""
try:
scored_repeaters = calculate_recency_weighted_scores(repeaters_with_location, cfg)
recent_repeaters = [r for r, score in scored_repeaters if score >= MIN_RECENCY_THRESHOLD]
if not recent_repeaters:
return None, 0.0
current_index = path_context.index(node_id) if node_id in path_context else -1
if current_index == -1:
return None, 0.0
prev_location = None
next_location = None
if current_index > 0:
prev_location = get_node_location(path_context[current_index - 1], cfg, db_manager=db_manager, logger=logger)
if current_index < len(path_context) - 1:
next_location = get_node_location(path_context[current_index + 1], cfg, db_manager=db_manager, logger=logger)
# First repeater: prioritize the sender as the source.
is_first_repeater = (current_index == 0)
if is_first_repeater and sender_location:
return select_by_single_proximity(recent_repeaters, sender_location, "sender", cfg, logger=logger)
# Last repeater: prioritize the bot as the destination.
is_last_repeater = (current_index == len(path_context) - 1)
if is_last_repeater and cfg.geographic_guessing_enabled:
if cfg.bot_latitude is not None and cfg.bot_longitude is not None:
bot_location = (cfg.bot_latitude, cfg.bot_longitude)
return select_by_single_proximity(recent_repeaters, bot_location, "bot", cfg, logger=logger)
if prev_location and next_location:
return select_by_dual_proximity(recent_repeaters, prev_location, next_location, cfg, logger=logger)
elif prev_location:
return select_by_single_proximity(recent_repeaters, prev_location, "previous", cfg, logger=logger)
elif next_location:
return select_by_single_proximity(recent_repeaters, next_location, "next", cfg, logger=logger)
else:
return None, 0.0
except Exception as e:
logger.warning(f"Error in path proximity calculation: {e}")
return None, 0.0
def select_geographic(repeaters, node_id, path_context, cfg: PathInferenceConfig, *, db_manager, logger, sender_location=None):
"""Pick the best repeater geographically, dispatching on ``proximity_method``.
For the web/default path (``bot_command=False``) this is always simple proximity, matching the
web viewer's previous behavior (which ignored ``proximity_method``).
"""
if not cfg.geographic_guessing_enabled:
return None, 0.0
if cfg.bot_command:
repeaters_with_location = []
for repeater in repeaters:
lat = repeater.get('latitude')
lon = repeater.get('longitude')
if lat is not None and lon is not None and not (lat == 0.0 and lon == 0.0):
repeaters_with_location.append(repeater)
if not repeaters_with_location:
return None, 0.0
if cfg.proximity_method == 'path' and path_context and node_id:
result = select_by_path_proximity(repeaters_with_location, node_id, path_context, sender_location, cfg, db_manager=db_manager, logger=logger)
if result[0] is not None:
return result
elif cfg.path_proximity_fallback:
return select_by_simple_proximity(repeaters_with_location, cfg, logger=logger)
else:
return None, 0.0
return select_by_simple_proximity(repeaters_with_location, cfg, logger=logger)
# Web/default: truthiness filter (excludes None and 0.0) + simple proximity.
repeaters_with_location = [r for r in repeaters if r.get('latitude') and r.get('longitude')]
if not repeaters_with_location:
return None, 0.0
return select_by_simple_proximity(repeaters_with_location, cfg, logger=logger)
# ---------------------------------------------------------------------------
# Graph-based selection
# ---------------------------------------------------------------------------
def _graph_path_validation_bonus_web(candidate_public_key, path_context, current_index, cfg, *, db_manager, logger):
"""Web variant of the path-validation bonus (prefix-match aware)."""
path_validation_bonus = 0.0
if not (candidate_public_key and len(path_context) > 1):
return 0.0
try:
query = '''
SELECT path_hex, observation_count, last_seen, from_prefix, to_prefix, bytes_per_hop
FROM observed_paths
WHERE public_key = ? AND packet_type = 'advert'
ORDER BY observation_count DESC, last_seen DESC
LIMIT 10
'''
stored_paths = db_manager.execute_query(query, (candidate_public_key,))
if stored_paths:
decoded_path_hex = ''.join([node.lower() for node in path_context])
path_prefix_up_to_current = ''.join([node.lower() for node in path_context[:current_index]])
for stored_path in stored_paths:
stored_hex = stored_path.get('path_hex', '').lower()
obs_count = stored_path.get('observation_count', 1)
if stored_hex:
n = (stored_path.get('bytes_per_hop') or 1) * 2
if n <= 0:
n = 2
stored_nodes = [stored_hex[i:i+n] for i in range(0, len(stored_hex), n)]
if (len(stored_hex) % n) != 0:
stored_nodes = [stored_hex[i:i+2] for i in range(0, len(stored_hex), 2)]
decoded_nodes = path_context if path_context else [decoded_path_hex[i:i+n] for i in range(0, len(decoded_path_hex), n)]
common_segments = 0
min_len = min(len(stored_nodes), len(decoded_nodes))
for i in range(min_len):
if stored_nodes[i] == decoded_nodes[i]:
common_segments += 1
else:
break
prefix_match = False
if path_prefix_up_to_current and len(stored_hex) >= len(path_prefix_up_to_current):
if stored_hex.startswith(path_prefix_up_to_current):
prefix_match = True
if common_segments >= 2 or prefix_match:
if prefix_match and common_segments >= current_index:
segment_bonus = min(cfg.graph_path_validation_max_bonus, 0.1 * (current_index + 1))
else:
segment_bonus = min(0.2, 0.05 * common_segments)
obs_bonus = min(0.15, obs_count / cfg.graph_path_validation_obs_divisor)
path_validation_bonus = max(path_validation_bonus, segment_bonus + obs_bonus)
path_validation_bonus = min(cfg.graph_path_validation_max_bonus, path_validation_bonus)
if path_validation_bonus >= cfg.graph_path_validation_max_bonus * 0.9:
break
except (sqlite3.Error, OSError, KeyError, ValueError) as _score_err:
logger.debug("Path-scoring graph query failed: %s", _score_err)
return path_validation_bonus
def _graph_path_validation_bonus_bot(candidate_public_key, candidate_prefix, path_context, cfg, *, db_manager, logger):
"""Bot `path` command variant of the path-validation bonus (preserved verbatim)."""
path_validation_bonus = 0.0
if not (candidate_public_key and len(path_context) > 1):
return 0.0
try:
query = '''
SELECT path_hex, observation_count, last_seen, from_prefix, to_prefix, bytes_per_hop
FROM observed_paths
WHERE public_key = ? AND packet_type = 'advert'
ORDER BY observation_count DESC, last_seen DESC
LIMIT 10
'''
stored_paths = db_manager.execute_query(query, (candidate_public_key,))
if stored_paths:
decoded_path_hex = ''.join([node.lower() for node in path_context])
path_n = len(path_context[0]) if path_context else 0
for stored_path in stored_paths:
stored_hex = stored_path.get('path_hex', '').lower()
obs_count = stored_path.get('observation_count', 1)
if not stored_hex:
continue
stored_n = (stored_path.get('bytes_per_hop') or 1) * 2
if stored_n <= 0:
stored_n = 2
if path_n != stored_n:
continue
stored_nodes = [stored_hex[i:i+stored_n] for i in range(0, len(stored_hex), stored_n)]
if (len(stored_hex) % stored_n) != 0:
stored_nodes = [stored_hex[i:i+2] for i in range(0, len(stored_hex), 2)]
decoded_nodes = [decoded_path_hex[i:i+path_n] for i in range(0, len(decoded_path_hex), path_n)]
if (len(decoded_path_hex) % path_n) != 0:
decoded_nodes = [decoded_path_hex[i:i+2] for i in range(0, len(decoded_path_hex), 2)]
common_segments = 0
min_len = min(len(stored_nodes), len(decoded_nodes))
for i in range(min_len):
if stored_nodes[i] == decoded_nodes[i]:
common_segments += 1
else:
break
if common_segments >= 2:
segment_bonus = min(0.2, 0.05 * common_segments)
obs_bonus = min(0.15, obs_count / cfg.graph_path_validation_obs_divisor)
path_validation_bonus = max(path_validation_bonus, segment_bonus + obs_bonus)
path_validation_bonus = min(cfg.graph_path_validation_max_bonus, path_validation_bonus)
if path_validation_bonus >= cfg.graph_path_validation_max_bonus * 0.9:
break
except Exception as e:
logger.debug(f"Error checking path validation for {candidate_prefix}: {e}")
return path_validation_bonus
def select_repeater_by_graph(repeaters, node_id, path_context, cfg: PathInferenceConfig, *,
mesh_graph, db_manager, logger, graph_n, path_prefix_hex_chars=None):
"""Select the best repeater for a colliding prefix using mesh-graph evidence.
``graph_n`` is the configured prefix width in hex chars (edge keys use it). When the path was
decoded with wider (multi-byte) node IDs, ``path_prefix_hex_chars`` is the node-ID width used
for candidate public-key matching; graph lookups are normalized to ``graph_n``.
"""
if not cfg.graph_based_validation or not mesh_graph:
return None, 0.0, None
if graph_n <= 0:
graph_n = 2
if path_prefix_hex_chars is None:
path_prefix_hex_chars = graph_n
prefix_n = path_prefix_hex_chars if path_prefix_hex_chars >= 2 else graph_n
try:
current_index = path_context.index(node_id) if node_id in path_context else -1
except Exception:
current_index = -1
if current_index == -1:
return None, 0.0, None
prev_node_id = path_context[current_index - 1] if current_index > 0 else None
next_node_id = path_context[current_index + 1] if current_index < len(path_context) - 1 else None
prev_norm = (prev_node_id[:graph_n].lower() if prev_node_id and len(prev_node_id) > graph_n else (prev_node_id.lower() if prev_node_id else None))
next_norm = (next_node_id[:graph_n].lower() if next_node_id and len(next_node_id) > graph_n else (next_node_id.lower() if next_node_id else None))
best_repeater = None
best_score = 0.0
best_method = None
# find_intermediate_nodes() depends only on (prev_norm, next_norm), so compute it at most once.
multi_hop_map: Optional[dict[str, float]] = None
for repeater in repeaters:
pk = repeater.get('public_key') or ''
candidate_prefix = pk[:prefix_n].lower() if pk else None
candidate_public_key = pk.lower() if pk else None
if not candidate_prefix:
continue
candidate_norm = candidate_prefix[:graph_n].lower() if len(candidate_prefix) > graph_n else candidate_prefix
graph_score = mesh_graph.get_candidate_score(
candidate_norm, prev_norm, next_norm, cfg.min_edge_observations,
hop_position=current_index if cfg.graph_use_hop_position else None,
use_bidirectional=cfg.graph_use_bidirectional,
use_hop_position=cfg.graph_use_hop_position,
)
stored_key_bonus = 0.0
if cfg.graph_prefer_stored_keys and candidate_public_key:
if prev_norm:
prev_to_candidate_edge = mesh_graph.get_edge(prev_norm, candidate_norm)
if prev_to_candidate_edge:
stored_to_key = prev_to_candidate_edge.get('to_public_key', '').lower() if prev_to_candidate_edge.get('to_public_key') else None
if stored_to_key and stored_to_key == candidate_public_key:
stored_key_bonus = max(stored_key_bonus, 0.4)
if next_norm:
candidate_to_next_edge = mesh_graph.get_edge(candidate_norm, next_norm)
if candidate_to_next_edge:
stored_from_key = candidate_to_next_edge.get('from_public_key', '').lower() if candidate_to_next_edge.get('from_public_key') else None
if stored_from_key and stored_from_key == candidate_public_key:
stored_key_bonus = max(stored_key_bonus, 0.4)
# Zero-hop / SNR bonus. The bot command requires graph evidence (graph_score > 0) before
# trusting a direct-heard signal, and adds an SNR-confirmed bonus; the web path applies the
# zero-hop bonus unconditionally and has no SNR data.
zero_hop_bonus = 0.0
snr_bonus = 0.0
hop_count = repeater.get('hop_count')
snr = repeater.get('snr')
if cfg.bot_command:
if hop_count is not None and hop_count == 0 and graph_score > 0:
zero_hop_bonus = cfg.graph_zero_hop_bonus
if snr is not None and graph_score > 0:
snr_bonus = cfg.graph_zero_hop_bonus * 1.2
else:
if hop_count is not None and hop_count == 0:
zero_hop_bonus = cfg.graph_zero_hop_bonus
graph_score_with_bonus = min(1.0, graph_score + stored_key_bonus + zero_hop_bonus + snr_bonus)
# The bot adds path-validation before multi-hop; the web adds it at the end (see below).
if cfg.bot_command:
path_validation_bonus = _graph_path_validation_bonus_bot(
candidate_public_key, candidate_prefix, path_context, cfg, db_manager=db_manager, logger=logger
)
graph_score_with_bonus = min(1.0, graph_score_with_bonus + path_validation_bonus)
multi_hop_score = 0.0
if cfg.graph_multi_hop_enabled and graph_score_with_bonus < 0.6 and prev_norm and next_norm:
if multi_hop_map is None:
multi_hop_map = dict(mesh_graph.find_intermediate_nodes(
prev_norm, next_norm, cfg.min_edge_observations, max_hops=cfg.graph_multi_hop_max_hops
))
multi_hop_score = multi_hop_map.get(candidate_norm, 0.0)
candidate_score = max(graph_score_with_bonus, multi_hop_score)
method = 'graph_multihop' if multi_hop_score > graph_score_with_bonus else 'graph'
# Distance penalty for intermediate hops (identical for both callers).
if cfg.graph_distance_penalty_enabled and next_norm is not None:
repeater_lat = repeater.get('latitude')
repeater_lon = repeater.get('longitude')
if repeater_lat is not None and repeater_lon is not None:
max_distance = 0.0
if prev_norm:
prev_to_candidate_edge = mesh_graph.get_edge(prev_norm, candidate_norm)
if prev_to_candidate_edge and prev_to_candidate_edge.get('geographic_distance'):
max_distance = max(max_distance, prev_to_candidate_edge.get('geographic_distance'))
if next_norm:
candidate_to_next_edge = mesh_graph.get_edge(candidate_norm, next_norm)
if candidate_to_next_edge and candidate_to_next_edge.get('geographic_distance'):
max_distance = max(max_distance, candidate_to_next_edge.get('geographic_distance'))
if max_distance > cfg.graph_max_reasonable_hop_distance_km:
excess_distance = max_distance - cfg.graph_max_reasonable_hop_distance_km
normalized_excess = min(excess_distance / cfg.graph_max_reasonable_hop_distance_km, 1.0)
penalty = normalized_excess * cfg.graph_distance_penalty_strength
candidate_score = candidate_score * (1.0 - penalty)
elif max_distance > 0:
if max_distance > cfg.graph_max_reasonable_hop_distance_km * 0.8:
small_penalty = (max_distance - cfg.graph_max_reasonable_hop_distance_km * 0.8) / (cfg.graph_max_reasonable_hop_distance_km * 0.2) * cfg.graph_distance_penalty_strength * 0.5
candidate_score = candidate_score * (1.0 - small_penalty)
# Final-hop bot-proximity bonus.
if next_norm is None and cfg.graph_final_hop_proximity_enabled:
if cfg.bot_latitude is not None and cfg.bot_longitude is not None:
repeater_lat = repeater.get('latitude')
repeater_lon = repeater.get('longitude')
if cfg.bot_command:
has_valid_location = (repeater_lat is not None and repeater_lon is not None and not (repeater_lat == 0.0 and repeater_lon == 0.0))
if has_valid_location:
distance = calculate_distance(cfg.bot_latitude, cfg.bot_longitude, repeater_lat, repeater_lon)
if cfg.graph_final_hop_max_distance > 0 and distance > cfg.graph_final_hop_max_distance:
pass # beyond max distance — skip proximity bonus
else:
candidate_score = _final_hop_blend(candidate_score, distance, cfg)
else:
# No usable location — penalize so located neighbors win the final hop.
candidate_score = candidate_score * 0.5
else:
if repeater_lat is not None and repeater_lon is not None:
distance = calculate_distance(cfg.bot_latitude, cfg.bot_longitude, repeater_lat, repeater_lon)
if cfg.graph_final_hop_max_distance > 0 and distance > cfg.graph_final_hop_max_distance:
candidate_score *= 0.3
else:
candidate_score = _final_hop_blend(candidate_score, distance, cfg)
if not cfg.bot_command:
path_validation_bonus = _graph_path_validation_bonus_web(
candidate_public_key, path_context, current_index, cfg, db_manager=db_manager, logger=logger
)
candidate_score = min(1.0, candidate_score + path_validation_bonus)
if repeater.get('is_starred', False):
candidate_score *= cfg.star_bias_multiplier
if candidate_score > best_score:
best_score = candidate_score
best_repeater = repeater
best_method = method
if best_repeater and best_score > 0.0:
confidence = min(1.0, best_score) if best_score <= 1.0 else 0.95 + (min(0.05, (best_score - 1.0) / cfg.star_bias_multiplier))
return best_repeater, confidence, best_method or 'graph'
return None, 0.0, None
def _final_hop_blend(candidate_score, distance, cfg: PathInferenceConfig):
"""Blend the graph score with bot-proximity for the final hop (shared formula)."""
normalized_distance = min(distance / cfg.graph_final_hop_proximity_normalization_km, 1.0)
proximity_score = 1.0 - normalized_distance
effective_weight = cfg.graph_final_hop_proximity_weight
if distance < cfg.graph_final_hop_very_close_threshold_km:
effective_weight = min(cfg.graph_final_hop_max_proximity_weight, cfg.graph_final_hop_proximity_weight * 2.0)
elif distance < cfg.graph_final_hop_close_threshold_km:
effective_weight = min(0.5, cfg.graph_final_hop_proximity_weight * 1.5)
return candidate_score * (1.0 - effective_weight) + proximity_score * effective_weight
# ---------------------------------------------------------------------------
# Per-node orchestration (shared by both callers)
# ---------------------------------------------------------------------------
def _has_valid_location(repeater) -> bool:
lat = repeater.get('latitude')
lon = repeater.get('longitude')
return lat is not None and lon is not None and not (lat == 0.0 and lon == 0.0)
def select_node_repeater(node_id, candidates, node_ids, cfg: PathInferenceConfig, *,
mesh_graph, db_manager, logger, graph_n, sender_location=None) -> NodeSelection:
"""Resolve one path node from its candidate repeaters (recency + graph + geographic).
Returns a :class:`NodeSelection`; each caller maps it to its own output shape.
"""
if not candidates:
return NodeSelection('not_found', matches=0, recent_repeaters=[])
if len(candidates) == 1:
recent_repeaters = candidates
else:
scored_repeaters = calculate_recency_weighted_scores(candidates, cfg)
recent_repeaters = [r for r, score in scored_repeaters if score >= MIN_RECENCY_THRESHOLD]
if len(recent_repeaters) > 1:
graph_repeater = None
graph_confidence = 0.0
method = None
geo_repeater = None
geo_confidence = 0.0
if cfg.graph_based_validation and mesh_graph:
graph_repeater, graph_confidence, method = select_repeater_by_graph(
recent_repeaters, node_id, node_ids, cfg,
mesh_graph=mesh_graph, db_manager=db_manager, logger=logger,
graph_n=graph_n, path_prefix_hex_chars=len(node_id) if node_id else graph_n,
)
if cfg.geographic_guessing_enabled:
geo_repeater, geo_confidence = select_geographic(
recent_repeaters, node_id, node_ids, cfg,
db_manager=db_manager, logger=logger, sender_location=sender_location,
)
selected_repeater, confidence, method = _combine_selection(
node_id, node_ids, cfg, graph_repeater, graph_confidence, geo_repeater, geo_confidence, method
)
if selected_repeater and confidence >= 0.5:
return NodeSelection('resolved', selected_repeater, confidence, method, len(recent_repeaters), recent_repeaters)
return NodeSelection('collision', None, confidence, method, len(recent_repeaters), recent_repeaters)
elif len(recent_repeaters) == 1:
return NodeSelection('single', recent_repeaters[0], 1.0, None, 1, recent_repeaters)
return NodeSelection('not_found', matches=0, recent_repeaters=[])
def _combine_selection(node_id, node_ids, cfg, graph_repeater, graph_confidence, geo_repeater, geo_confidence, method):
"""Choose between graph and geographic selections (web vs bot semantics)."""
selected_repeater = None
confidence = 0.0
is_final_hop = (node_id == node_ids[-1] if node_ids else False)
if cfg.bot_command:
if cfg.graph_geographic_combined and graph_repeater and geo_repeater:
graph_pubkey = graph_repeater.get('public_key', '')
geo_pubkey = geo_repeater.get('public_key', '')
if graph_pubkey and geo_pubkey and graph_pubkey == geo_pubkey:
confidence = graph_confidence * cfg.graph_geographic_weight + geo_confidence * (1.0 - cfg.graph_geographic_weight)
selected_repeater = graph_repeater
method = 'graph_geographic_combined'
else:
if is_final_hop and graph_repeater and not _has_valid_location(graph_repeater) and geo_repeater:
selected_repeater = geo_repeater
confidence = geo_confidence
method = 'geographic'
elif graph_confidence > geo_confidence:
selected_repeater = graph_repeater
confidence = graph_confidence
method = method or 'graph'
else:
selected_repeater = geo_repeater
confidence = geo_confidence
method = 'geographic'
else:
if graph_repeater and graph_confidence >= cfg.graph_confidence_override_threshold:
if is_final_hop and not _has_valid_location(graph_repeater) and geo_repeater:
selected_repeater = geo_repeater
confidence = geo_confidence
method = 'geographic'
else:
selected_repeater = graph_repeater
confidence = graph_confidence
method = method or 'graph'
elif not graph_repeater or graph_confidence < cfg.graph_confidence_override_threshold:
if geo_repeater and (not graph_repeater or geo_confidence > graph_confidence):
selected_repeater = geo_repeater
confidence = geo_confidence
method = 'geographic'
elif graph_repeater:
if is_final_hop and not _has_valid_location(graph_repeater) and geo_repeater:
selected_repeater = geo_repeater
confidence = geo_confidence
method = 'geographic'
else:
selected_repeater = graph_repeater
confidence = graph_confidence
method = method or 'graph'
return selected_repeater, confidence, method
# Web/default semantics.
if cfg.graph_geographic_combined and graph_repeater and geo_repeater:
graph_pubkey = graph_repeater.get('public_key', '')
geo_pubkey = geo_repeater.get('public_key', '')
if graph_pubkey and geo_pubkey and graph_pubkey == geo_pubkey:
confidence = graph_confidence * cfg.graph_geographic_weight + geo_confidence * (1.0 - cfg.graph_geographic_weight)
selected_repeater = graph_repeater
else:
if graph_confidence > geo_confidence:
selected_repeater = graph_repeater
confidence = graph_confidence
else:
selected_repeater = geo_repeater
confidence = geo_confidence
else:
if is_final_hop and geo_repeater and geo_confidence >= 0.6:
if not graph_repeater or geo_confidence >= graph_confidence * 0.9:
selected_repeater = geo_repeater
confidence = geo_confidence
elif graph_repeater:
selected_repeater = graph_repeater
confidence = graph_confidence
elif graph_repeater and graph_confidence >= cfg.graph_confidence_override_threshold:
selected_repeater = graph_repeater
confidence = graph_confidence
elif not graph_repeater or graph_confidence < cfg.graph_confidence_override_threshold:
if geo_repeater and (not graph_repeater or geo_confidence > graph_confidence):
selected_repeater = geo_repeater
confidence = geo_confidence
elif graph_repeater:
selected_repeater = graph_repeater
confidence = graph_confidence
return selected_repeater, confidence, method
# ---------------------------------------------------------------------------
# Public entrypoint (web viewer)
# ---------------------------------------------------------------------------
def decode_path_nodes(
path_hex: str,
bytes_per_hop: int | None = None,
*,
config,
db_manager,
logger,
mesh_graph=None,
include_location: bool = False,
lookup_func: Optional[Callable[[str], list[dict[str, Any]]]] = None,
) -> list[dict[str, Any]]:
"""Decode a hex path string to repeater nodes (see module docstring).
Args:
path_hex: hex path string (continuous or separated).
bytes_per_hop: 1/2/3 when known from a packet/contact; else derived from config.
config: ConfigParser-like accessor for [Bot]/[Path_Command] settings.
db_manager: object exposing execute_query(sql, params) -> list[dict].
logger: logging.Logger.
mesh_graph: MeshGraph instance, or None to disable graph-based selection.
include_location: when True, found nodes also carry latitude/longitude/last_seen
(used by the mesh-map /api/mesh/resolve-path caller).
lookup_func: optional candidate provider, Callable(node_id) -> list[repeater dict]. When
given, candidates come from it instead of the database query (used by callers/tests
that inject candidates).
"""
import re
cfg = PathInferenceConfig.from_config(config)
# Parse the path input - use bytes_per_hop when provided (e.g. from packet/contact)
if bytes_per_hop is not None and bytes_per_hop in (1, 2, 3):
prefix_hex_chars = bytes_per_hop * 2
else:
prefix_hex_chars = config.getint('Bot', 'prefix_bytes', fallback=1) * 2
if prefix_hex_chars <= 0:
prefix_hex_chars = 2
path_input_clean = path_hex.replace(' ', '').replace(',', '').replace(':', '')
if re.match(r'^[0-9a-fA-F]{4,}$', path_input_clean):
hex_matches = [path_input_clean[i:i+prefix_hex_chars] for i in range(0, len(path_input_clean), prefix_hex_chars)]
if (len(path_input_clean) % prefix_hex_chars) != 0 and prefix_hex_chars > 2:
hex_matches = [path_input_clean[i:i+2] for i in range(0, len(path_input_clean), 2)]
else:
path_input = path_hex.replace(',', ' ').replace(':', ' ')
hex_pattern = rf'[0-9a-fA-F]{{{prefix_hex_chars}}}'
hex_matches = re.findall(hex_pattern, path_input)
if not hex_matches and prefix_hex_chars > 2:
hex_pattern = r'[0-9a-fA-F]{2}'
hex_matches = re.findall(hex_pattern, path_input)
if not hex_matches:
return []
node_ids = [match.upper() for match in hex_matches]
def _loc(rep):
if not include_location:
return {}
return {
'latitude': rep.get('latitude'),
'longitude': rep.get('longitude'),
'last_seen': rep.get('last_seen'),
}
decoded_path = []
try:
for node_id in node_ids:
if lookup_func is not None:
results = lookup_func(node_id)
elif cfg.max_repeater_age_days > 0:
query = f'''
SELECT name, public_key, device_type, last_heard, last_heard as last_seen,
last_advert_timestamp, latitude, longitude, city, state, country,
advert_count, signal_strength, hop_count, role, is_starred
FROM complete_contact_tracking
WHERE public_key LIKE ? AND role IN ('repeater', 'roomserver')
AND (
(last_advert_timestamp IS NOT NULL AND last_advert_timestamp >= datetime('now', 'localtime', '-{cfg.max_repeater_age_days} days'))
OR (last_advert_timestamp IS NULL AND last_heard >= datetime('now', 'localtime', '-{cfg.max_repeater_age_days} days'))
)
ORDER BY COALESCE(last_advert_timestamp, last_heard) DESC
'''
results = db_manager.execute_query(query, (f"{node_id}%",))
else:
query = '''
SELECT name, public_key, device_type, last_heard, last_heard as last_seen,
last_advert_timestamp, latitude, longitude, city, state, country,
advert_count, signal_strength, hop_count, role, is_starred
FROM complete_contact_tracking
WHERE public_key LIKE ? AND role IN ('repeater', 'roomserver')
ORDER BY COALESCE(last_advert_timestamp, last_heard) DESC
'''
results = db_manager.execute_query(query, (f"{node_id}%",))
if results:
repeaters_data = [
{
'name': row['name'],
'public_key': row['public_key'],
'device_type': row['device_type'],
'last_seen': row['last_seen'],
'last_heard': row.get('last_heard', row['last_seen']),
'last_advert_timestamp': row.get('last_advert_timestamp'),
'is_active': True,
'latitude': row['latitude'],
'longitude': row['longitude'],
'city': row['city'],
'state': row['state'],
'country': row['country'],
'hop_count': row.get('hop_count'),
'is_starred': bool(row.get('is_starred', 0)),
} for row in results
]
selection = select_node_repeater(
node_id, repeaters_data, node_ids, cfg,
mesh_graph=mesh_graph, db_manager=db_manager, logger=logger,
graph_n=prefix_hex_chars,
)
if selection.status == 'resolved':
selected_repeater = selection.repeater
decoded_path.append({
'node_id': node_id,
'name': selected_repeater['name'],
'public_key': selected_repeater['public_key'],
'device_type': selected_repeater['device_type'],
'role': selected_repeater.get('role', 'repeater'),
'found': True,
'geographic_guess': selection.confidence < 0.8,
'collision': True,
'matches': selection.matches,
**_loc(selected_repeater),
})
elif selection.status == 'collision':
fallback = selection.recent_repeaters[0]
decoded_path.append({
'node_id': node_id,
'name': fallback['name'],
'public_key': fallback['public_key'],
'device_type': fallback['device_type'],
'role': fallback.get('role', 'repeater'),
'found': True,
'geographic_guess': True,
'collision': True,
'matches': selection.matches,
**_loc(fallback),
})
elif selection.status == 'single':
repeater = selection.repeater
decoded_path.append({
'node_id': node_id,
'name': repeater['name'],
'public_key': repeater['public_key'],
'device_type': repeater['device_type'],
'role': repeater.get('role', 'repeater'),
'found': True,
'geographic_guess': False,
'collision': False,
'matches': 1,
**_loc(repeater),
})
else:
decoded_path.append({
'node_id': node_id,
'name': None,
'found': False,
})
else:
decoded_path.append({
'node_id': node_id,
'name': None,
'found': False,
})
except Exception as e:
logger.error(f"Error decoding path: {e}", exc_info=True)
return []
return decoded_path