Files
meshcore-bot/modules/mesh_graph.py
agessaman 2178a80dca Refactor cleanup methods in core.py and mesh_graph.py to suppress logging during shutdown
- Updated the _cleanup_web_viewer and _cleanup_mesh_graph methods to avoid logging errors during shutdown, as the logger's stream may be closed at that time.
- Modified the shutdown method in MeshGraph to prevent logging of flushing errors, enhancing stability during the atexit process.
- Adjusted test configurations to use Path objects for bot_root and local_root, improving path handling in tests.
2026-03-11 20:41:44 -07:00

1405 lines
64 KiB
Python

#!/usr/bin/env python3
"""
Mesh Graph Module
Tracks observed connections between repeaters to improve path guessing accuracy.
Persists graph state across bot restarts for development scenarios.
Multi-resolution storage and node identity:
Edges are stored at the resolution observed (2, 4, or 6 hex chars per node). Nodes
with the same logical identity (e.g. 01, 0101, 0101C1) are treated as one: on read,
get_edge uses prefix matching and returns the best-matching edge (longest prefix,
then observation_count, then last_seen). On write:
- 1-byte observations (2-char prefix): merge into an existing 2/3-byte edge only
when the edge is unique (exactly one prefix-matching edge). If zero or multiple
matches exist, create or update only the 1-byte edge to avoid false coalescing.
- 2/3-byte observations: merge into the best-matching edge when present; when the
new observation is more specific than the existing edge, promote (remove old edge,
add new at higher resolution with merged observation count; DB: delete old row,
insert new). Distinct links (e.g. 7e42→8611 and 7e99→86ff) stay separate.
"""
import sqlite3
import sys
import time
import threading
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Set
from collections import defaultdict
class MeshGraph:
"""Graph structure tracking observed connections between mesh nodes."""
def __init__(self, bot):
"""Initialize the mesh graph.
Args:
bot: Bot instance with db_manager and config access.
"""
self.bot = bot
self.logger = bot.logger
self.db_manager = bot.db_manager
# Capture/validation feature flags
# graph_capture_enabled: controls whether new edge data is collected from packets
# When False, no new edges are added and the batch writer thread is not started.
self.capture_enabled = bot.config.getboolean('Path_Command', 'graph_capture_enabled', fallback=True)
# In-memory graph storage: {(from_prefix, to_prefix): edge_data}
self.edges: Dict[Tuple[str, str], Dict] = {}
# Adjacency indexes for O(1) neighbour lookups (derived from self.edges)
self._outgoing_index: Dict[str, Set[str]] = defaultdict(set) # from_prefix -> set of to_prefixes
self._incoming_index: Dict[str, Set[str]] = defaultdict(set) # to_prefix -> set of from_prefixes
# Per-edge last-notification timestamps for web viewer throttling (unix float)
self._notification_timestamps: Dict[Tuple[str, str], float] = {}
# Track pending updates for batched writes
self.pending_updates: Set[Tuple[str, str]] = set()
self.pending_lock = threading.Lock()
# Write strategy configuration
self.write_strategy = bot.config.get('Path_Command', 'graph_write_strategy', fallback='hybrid')
self.batch_interval = bot.config.getint('Path_Command', 'graph_batch_interval_seconds', fallback=30)
self.batch_max_pending = bot.config.getint('Path_Command', 'graph_batch_max_pending', fallback=100)
# Default 14 days: edges older than this carry near-zero recency confidence anyway.
# Set to 0 in config.ini to load all historical edges (e.g. on servers with ample RAM).
self.startup_load_days = bot.config.getint('Path_Command', 'graph_startup_load_days', fallback=14)
self.edge_expiration_days = bot.config.getint('Path_Command', 'graph_edge_expiration_days', fallback=7)
# Background task for batched writes
self._batch_task = None
self._shutdown_event = threading.Event()
# Load graph from database on startup
self._load_from_database()
# Start background batch writer only when capture is active
if self.capture_enabled and self.write_strategy in ('batched', 'hybrid'):
self._start_batch_writer()
def _prefix_len(self) -> int:
"""Return configured prefix length in hex chars (always an int for slicing)."""
n = getattr(self.bot, 'prefix_hex_chars', 2)
try:
return max(2, int(n))
except (TypeError, ValueError):
return 2
def _valid_prefix_length(self, prefix: str) -> bool:
"""Return True if prefix has 2, 4, or 6 hex chars (after stripping)."""
if not prefix or not isinstance(prefix, str):
return False
s = prefix.strip().lower()
if len(s) not in (2, 4, 6):
return False
return all(c in '0123456789abcdef' for c in s)
def _prefix_match(self, a: str, b: str) -> bool:
"""Return True if a and b match: exact or one is a prefix of the other (after lowercasing)."""
if not a or not b:
return False
a, b = a.lower().strip(), b.lower().strip()
return a == b or a.startswith(b) or b.startswith(a)
def _get_edge_by_prefix_match(self, from_q: str, to_q: str) -> Optional[Dict]:
"""Return the best matching edge for a prefix query. Single edge only; never merge counts.
Tie-break: exact key if present, else longest combined prefix length, then max
observation_count, then most recent last_seen.
"""
matches = self._find_all_matching_edges(from_q, to_q)
if not matches:
return None
# Return the edge data of the best match (first in list)
return matches[0][1]
def _find_all_matching_edges(
self, from_prefix: str, to_prefix: str
) -> List[Tuple[Tuple[str, str], Dict]]:
"""Return all edges that prefix-match (from_prefix, to_prefix), ordered by best match first.
Best = longest combined prefix length, then observation_count desc, then last_seen desc.
Used for 1-byte uniqueness check (merge only when len==1) and for 2/3-byte merge/promote.
"""
from_q = from_prefix.lower().strip() if from_prefix else ""
to_q = to_prefix.lower().strip() if to_prefix else ""
if not from_q or not to_q:
return []
candidates: List[Tuple[Tuple[str, str], Dict]] = []
for edge_key, edge in self.edges.items():
from_p, to_p = edge_key
if self._prefix_match(from_p, from_q) and self._prefix_match(to_p, to_q):
candidates.append((edge_key, edge))
if not candidates:
return []
def sort_key(item):
edge_key, edge = item
from_p, to_p = edge_key
spec = len(from_p) + len(to_p)
obs = edge.get("observation_count", 0)
last = edge.get("last_seen")
if isinstance(last, str):
try:
last = datetime.fromisoformat(last.replace("Z", "+00:00"))
except ValueError:
last = datetime.min
last_ts = last if isinstance(last, datetime) else datetime.min
return (-spec, -obs, last_ts) # desc spec, desc obs, asc time -> most recent last
candidates.sort(key=sort_key)
return candidates
def _remove_edge_from_memory(self, edge_key: Tuple[str, str]) -> None:
"""Remove an edge from in-memory graph and adjacency indexes.
Does not touch the database. Used when promoting to a higher-resolution key.
"""
if edge_key not in self.edges:
return
from_p, to_p = edge_key
del self.edges[edge_key]
if from_p in self._outgoing_index:
self._outgoing_index[from_p].discard(to_p)
if not self._outgoing_index[from_p]:
del self._outgoing_index[from_p]
if to_p in self._incoming_index:
self._incoming_index[to_p].discard(from_p)
if not self._incoming_index[to_p]:
del self._incoming_index[to_p]
self._notification_timestamps.pop(edge_key, None)
def _delete_edge_from_db(
self, edge_key: Tuple[str, str], conn: Optional[sqlite3.Connection] = None
) -> int:
"""Delete a single edge row from mesh_connections. Returns rows affected."""
from_p, to_p = edge_key
query = "DELETE FROM mesh_connections WHERE from_prefix = ? AND to_prefix = ?"
params = (from_p, to_p)
if conn is not None:
return self.db_manager.execute_update_on_connection(conn, query, params)
return self.db_manager.execute_update(query, params)
def _update_edge_data(
self,
edge: Dict,
now: datetime,
hop_position: Optional[int] = None,
from_public_key: Optional[str] = None,
to_public_key: Optional[str] = None,
geographic_distance: Optional[float] = None,
prefix_bytes: int = 1,
) -> None:
"""Apply one observation to an edge dict (increment count, last_seen, optional fields)."""
edge["observation_count"] = edge.get("observation_count", 0) + 1
edge["last_seen"] = now
if hop_position is not None:
current_avg = edge.get("avg_hop_position")
count = edge["observation_count"]
if current_avg is not None:
edge["avg_hop_position"] = ((current_avg * (count - 1)) + hop_position) / count
else:
edge["avg_hop_position"] = hop_position
if from_public_key:
edge["from_public_key"] = from_public_key
if to_public_key:
edge["to_public_key"] = to_public_key
if geographic_distance is not None:
edge["geographic_distance"] = geographic_distance
if prefix_bytes == 2:
edge["confirmed_2byte"] = True
def _load_from_database(self):
"""Load graph edges from database on startup."""
try:
query = '''
SELECT from_prefix, to_prefix, from_public_key, to_public_key,
observation_count, first_seen, last_seen, avg_hop_position,
geographic_distance
FROM mesh_connections
'''
# Build WHERE clause combining startup_load_days and edge_expiration_days.
# startup_load_days: explicit cap on how far back to load (0 = no cap).
# edge_expiration_days: always applied — never load edges we would immediately
# evict as expired (this bounds memory even when startup_load_days=0).
where_parts = []
if self.startup_load_days > 0:
cutoff_date = datetime.now() - timedelta(days=self.startup_load_days)
where_parts.append(f"last_seen >= '{cutoff_date.isoformat()}'")
if self.edge_expiration_days > 0:
expiry_date = datetime.now() - timedelta(days=self.edge_expiration_days)
where_parts.append(f"last_seen >= '{expiry_date.isoformat()}'")
if where_parts:
# Use the most restrictive (most recent) cutoff
query += " WHERE " + " AND ".join(where_parts)
query += " ORDER BY last_seen DESC"
results = self.db_manager.execute_query(query)
edge_count = 0
for row in results:
from_prefix = row['from_prefix']
to_prefix = row['to_prefix']
edge_key = (from_prefix, to_prefix)
# Intern public key strings so identical keys across many edges share
# a single string object in memory rather than duplicating bytes.
from_pk = row.get('from_public_key')
to_pk = row.get('to_public_key')
if from_pk:
from_pk = sys.intern(from_pk)
if to_pk:
to_pk = sys.intern(to_pk)
self.edges[edge_key] = {
'from_prefix': from_prefix,
'to_prefix': to_prefix,
'from_public_key': from_pk,
'to_public_key': to_pk,
'observation_count': row.get('observation_count', 1),
'first_seen': row.get('first_seen'),
'last_seen': row.get('last_seen'),
'avg_hop_position': row.get('avg_hop_position'),
'geographic_distance': row.get('geographic_distance')
}
# Maintain adjacency indexes
self._outgoing_index[from_prefix].add(to_prefix)
self._incoming_index[to_prefix].add(from_prefix)
edge_count += 1
self.logger.info(f"Loaded {edge_count} graph edges from database")
# Log statistics
if edge_count > 0:
total_observations = sum(e['observation_count'] for e in self.edges.values())
self.logger.info(f"Graph statistics: {edge_count} edges, {total_observations} total observations")
# Belt-and-suspenders: prune any edges that slipped through the SQL filter
# (e.g. timezone edge cases or edges loaded before expiration_days was set)
self.prune_expired_edges()
except Exception as e:
self.logger.warning(f"Error loading graph from database: {e}")
# Continue with empty graph
def add_edge(self, from_prefix: str, to_prefix: str,
from_public_key: Optional[str] = None,
to_public_key: Optional[str] = None,
hop_position: Optional[int] = None,
geographic_distance: Optional[float] = None,
prefix_bytes: int = 1):
"""Add or update an edge in the graph.
Prefixes are stored at the resolution provided (2, 4, or 6 hex chars).
No truncation; the same physical link can be recorded at different
resolutions and will create/update the matching edge.
Args:
from_prefix: Source node prefix (2, 4, or 6 hex chars depending on path encoding).
to_prefix: Destination node prefix (2, 4, or 6 hex chars depending on path encoding).
from_public_key: Full public key of source node (optional).
to_public_key: Full public key of destination node (optional).
hop_position: Position in path where this edge was observed (optional).
geographic_distance: Distance in km between nodes (optional).
prefix_bytes: 1 = 1-byte (2-char) prefix (default); 2 = 2-byte confirmed (stored as edge flag for weighting).
"""
if not from_prefix or not to_prefix:
return
# Respect the capture kill-switch — allow reads but block writes
if not self.capture_enabled:
return
# Normalize to lowercase; validate length (2, 4, or 6 hex chars). No truncation.
from_prefix = from_prefix.lower().strip()
to_prefix = to_prefix.lower().strip()
if not self._valid_prefix_length(from_prefix) or not self._valid_prefix_length(to_prefix):
return
# Intern public key strings so repeated identical keys share one object in RAM
if from_public_key:
from_public_key = sys.intern(from_public_key)
if to_public_key:
to_public_key = sys.intern(to_public_key)
edge_key = (from_prefix, to_prefix)
now = datetime.now()
matches = self._find_all_matching_edges(from_prefix, to_prefix)
best = matches[0] if matches else None
incoming_1byte = len(from_prefix) == 2 or len(to_prefix) == 2
# 1-byte: merge only when exactly one matching edge (unique link)
if incoming_1byte and len(matches) == 1:
target_key, target_edge = matches[0]
self._update_edge_data(
target_edge, now, hop_position,
from_public_key, to_public_key, geographic_distance, prefix_bytes,
)
self._persist_and_notify_edge(target_key, is_new_edge=False)
return
# 2/3-byte: merge into best match or promote
if not incoming_1byte and best is not None:
best_key, best_edge = best
best_spec = len(best_key[0]) + len(best_key[1])
new_spec = len(from_prefix) + len(to_prefix)
if best_key == edge_key:
# Update in place
self._update_edge_data(
best_edge, now, hop_position,
from_public_key, to_public_key, geographic_distance, prefix_bytes,
)
self._persist_and_notify_edge(edge_key, is_new_edge=False)
return
if best_spec > new_spec:
# Best match is more specific — update that edge and return
self._update_edge_data(
best_edge, now, hop_position,
from_public_key, to_public_key, geographic_distance, prefix_bytes,
)
self._persist_and_notify_edge(best_key, is_new_edge=False)
return
if best_spec < new_spec:
# Don't promote 1-byte to 3-byte when the existing 1-byte edge has no public_key:
# keep the 1-byte edge so we don't attribute its observations to one specific 3-byte node
# (other e0 repeaters would otherwise have no edges and appear removed).
best_is_1byte = len(best_key[0]) == 2 and len(best_key[1]) == 2
if best_is_1byte and not best_edge.get("from_public_key") and not best_edge.get("to_public_key"):
self._update_edge_data(
best_edge, now, hop_position,
from_public_key, to_public_key, geographic_distance, prefix_bytes,
)
self._persist_and_notify_edge(best_key, is_new_edge=False)
return
# Promote: remove old edge, add new at higher resolution with merged count
merged_count = best_edge["observation_count"] + 1
first_seen = best_edge.get("first_seen") or now
self._remove_edge_from_memory(best_key)
self._delete_edge_from_db(best_key)
with self.pending_lock:
self.pending_updates.discard(best_key)
self.edges[edge_key] = {
"from_prefix": from_prefix,
"to_prefix": to_prefix,
"from_public_key": from_public_key or best_edge.get("from_public_key"),
"to_public_key": to_public_key or best_edge.get("to_public_key"),
"observation_count": merged_count,
"first_seen": first_seen,
"last_seen": now,
"avg_hop_position": hop_position if hop_position is not None else best_edge.get("avg_hop_position"),
"geographic_distance": geographic_distance if geographic_distance is not None else best_edge.get("geographic_distance"),
"confirmed_2byte": True if prefix_bytes == 2 else best_edge.get("confirmed_2byte", False),
}
self._outgoing_index[from_prefix].add(to_prefix)
self._incoming_index[to_prefix].add(from_prefix)
self._persist_and_notify_edge(edge_key, is_new_edge=True)
return
# Exact-key create or update (1-byte with 0 or 2+ matches, or 2/3-byte with no match)
if edge_key in self.edges:
edge = self.edges[edge_key]
edge['observation_count'] += 1
edge['last_seen'] = now
# Update average hop position
if hop_position is not None:
current_avg = edge.get('avg_hop_position')
count = edge['observation_count']
if current_avg is not None:
# Weighted average: (old_avg * (count-1) + new_pos) / count
edge['avg_hop_position'] = ((current_avg * (count - 1)) + hop_position) / count
else:
# First time setting hop position
edge['avg_hop_position'] = hop_position
# Update public keys if provided (always update if we have a better key)
# This allows us to fill in missing keys on existing edges
if from_public_key:
edge['from_public_key'] = from_public_key
if to_public_key:
edge['to_public_key'] = to_public_key
# Update geographic distance if provided
if geographic_distance is not None:
edge['geographic_distance'] = geographic_distance
# 2-byte trace confirmation (for path weighting when supported)
if prefix_bytes == 2:
edge['confirmed_2byte'] = True
is_new_edge = False
else:
# New edge — also update adjacency indexes
self.edges[edge_key] = {
'from_prefix': from_prefix,
'to_prefix': to_prefix,
'from_public_key': from_public_key,
'to_public_key': to_public_key,
'observation_count': 1,
'first_seen': now,
'last_seen': now,
'avg_hop_position': hop_position if hop_position is not None else None,
'geographic_distance': geographic_distance,
'confirmed_2byte': True if prefix_bytes == 2 else False,
}
self._outgoing_index[from_prefix].add(to_prefix)
self._incoming_index[to_prefix].add(from_prefix)
is_new_edge = True
self._persist_and_notify_edge(edge_key, is_new_edge)
def _persist_and_notify_edge(self, edge_key: Tuple[str, str], is_new_edge: bool) -> None:
"""Persist edge to DB (according to write strategy) and notify web viewer."""
self.logger.debug(f"Mesh graph: Edge {edge_key} - new={is_new_edge}, strategy={self.write_strategy}")
if self.write_strategy == 'immediate':
self._write_edge_to_db(edge_key, is_new_edge)
elif self.write_strategy == 'batched':
with self.pending_lock:
self.pending_updates.add(edge_key)
if len(self.pending_updates) >= self.batch_max_pending:
self._flush_pending_updates_sync()
elif self.write_strategy == 'hybrid':
if is_new_edge:
self._write_edge_to_db(edge_key, True)
else:
with self.pending_lock:
self.pending_updates.add(edge_key)
if len(self.pending_updates) >= self.batch_max_pending:
self._flush_pending_updates_sync()
self._notify_web_viewer_edge(edge_key, is_new_edge)
def _notify_web_viewer_edge(self, edge_key: Tuple[str, str], is_new: bool):
"""Notify web viewer of edge update via bot integration.
New edges always trigger an immediate notification. Updates to existing
edges are throttled to at most once every 10 seconds to reduce HTTP
traffic on busy meshes.
"""
try:
if not hasattr(self.bot, 'web_viewer_integration') or not self.bot.web_viewer_integration:
return
if not hasattr(self.bot.web_viewer_integration, 'bot_integration'):
return
edge = self.edges.get(edge_key)
if not edge:
return
# Throttle repeated updates for the same edge (new edges always notify)
now_ts = time.time()
if not is_new:
last_notified = self._notification_timestamps.get(edge_key, 0.0)
if (now_ts - last_notified) < 10.0:
return # Skip — notified recently enough
self._notification_timestamps[edge_key] = now_ts
# Prepare edge data for web viewer
edge_data = {
'from_prefix': edge['from_prefix'],
'to_prefix': edge['to_prefix'],
'from_public_key': edge.get('from_public_key'),
'to_public_key': edge.get('to_public_key'),
'observation_count': edge['observation_count'],
'first_seen': edge['first_seen'].isoformat() if isinstance(edge['first_seen'], datetime) else str(edge['first_seen']),
'last_seen': edge['last_seen'].isoformat() if isinstance(edge['last_seen'], datetime) else str(edge['last_seen']),
'avg_hop_position': edge.get('avg_hop_position'),
'geographic_distance': edge.get('geographic_distance'),
'is_new': is_new
}
# Send update asynchronously
self.bot.web_viewer_integration.bot_integration.send_mesh_edge_update(edge_data)
except Exception as e:
self.logger.debug(f"Error notifying web viewer of edge update: {e}")
def _recalculate_distance_if_needed(
self,
edge: Dict,
conn: Optional[sqlite3.Connection] = None,
location_cache: Optional[Dict[str, Tuple[float, float]]] = None,
) -> Optional[float]:
"""Recalculate geographic distance using full public keys if available.
This ensures we get the correct location when there are prefix collisions.
Args:
edge: Edge dictionary with prefix and optional public keys.
conn: Optional existing DB connection for batch operations.
location_cache: Optional cache for location lookups within a flush (keyed by pk: or prefix:).
Returns:
Optional[float]: Recalculated distance in km, or None if can't calculate.
"""
from .utils import calculate_distance
# Get location for 'from' node (conn optional for single-connection batch flush)
if edge.get('from_public_key'):
from_location = self._get_location_by_public_key(
edge['from_public_key'], conn=conn, location_cache=location_cache
)
else:
from_location = None
if not from_location:
to_location_temp = None
if edge.get('to_public_key'):
to_location_temp = self._get_location_by_public_key(
edge['to_public_key'], conn=conn, location_cache=location_cache
)
if not to_location_temp:
to_location_temp = self._get_location_by_prefix(
edge['to_prefix'], conn=conn, location_cache=location_cache
)
from_location = self._get_location_by_prefix(
edge['from_prefix'], to_location_temp, conn=conn, location_cache=location_cache
)
# Get location for 'to' node
if edge.get('to_public_key'):
to_location = self._get_location_by_public_key(
edge['to_public_key'], conn=conn, location_cache=location_cache
)
else:
to_location = None
if not to_location:
to_location = self._get_location_by_prefix(
edge['to_prefix'], from_location, conn=conn, location_cache=location_cache
)
# Calculate distance if we have both locations
if from_location and to_location:
return calculate_distance(
from_location[0], from_location[1],
to_location[0], to_location[1]
)
return None
def _get_location_by_public_key(
self,
public_key: str,
conn: Optional[sqlite3.Connection] = None,
location_cache: Optional[Dict[str, Tuple[float, float]]] = None,
) -> Optional[Tuple[float, float]]:
"""Get location for a full public key (more accurate than prefix lookup).
Prefers starred repeaters if there are somehow multiple entries (shouldn't happen with full key).
"""
cache_key = f"pk:{public_key}" if location_cache is not None else None
if cache_key is not None and cache_key in location_cache:
return location_cache[cache_key]
try:
query = '''
SELECT latitude, longitude
FROM complete_contact_tracking
WHERE public_key = ?
AND latitude IS NOT NULL AND longitude IS NOT NULL
AND latitude != 0 AND longitude != 0
AND role IN ('repeater', 'roomserver')
ORDER BY is_starred DESC, COALESCE(last_advert_timestamp, last_heard) DESC
LIMIT 1
'''
if conn is not None:
results = self.db_manager.execute_query_on_connection(conn, query, (public_key,))
else:
results = self.db_manager.execute_query(query, (public_key,))
if results:
row = results[0]
lat = row.get('latitude')
lon = row.get('longitude')
if lat is not None and lon is not None:
result = (float(lat), float(lon))
if cache_key is not None:
location_cache[cache_key] = result
return result
except Exception as e:
self.logger.debug(f"Error getting location by public key {public_key[:16]}...: {e}")
return None
def _get_location_by_prefix(
self,
prefix: str,
reference_location: Optional[Tuple[float, float]] = None,
conn: Optional[sqlite3.Connection] = None,
location_cache: Optional[Dict[str, Tuple[float, float]]] = None,
) -> Optional[Tuple[float, float]]:
"""Get location for a prefix (fallback when full public key not available).
For LoRa networks, prefers shorter distances when there are prefix collisions,
as LoRa range is limited by the curve of the earth.
Args:
prefix: 2-character hex prefix.
reference_location: Optional (lat, lon) to calculate distance from for LoRa preference.
conn: Optional existing DB connection for batch operations.
location_cache: Optional cache for location lookups within a flush.
"""
if location_cache is not None:
if reference_location is not None:
cache_key = f"prefix:{prefix}:{reference_location[0]}:{reference_location[1]}"
else:
cache_key = f"prefix:{prefix}"
if cache_key in location_cache:
return location_cache[cache_key]
try:
prefix_pattern = f"{prefix}%"
# Get all candidates with locations
query = '''
SELECT latitude, longitude, is_starred,
COALESCE(last_advert_timestamp, last_heard) as last_seen
FROM complete_contact_tracking
WHERE public_key LIKE ?
AND latitude IS NOT NULL AND longitude IS NOT NULL
AND latitude != 0 AND longitude != 0
AND role IN ('repeater', 'roomserver')
'''
if conn is not None:
results = self.db_manager.execute_query_on_connection(conn, query, (prefix_pattern,))
else:
results = self.db_manager.execute_query(query, (prefix_pattern,))
if not results:
return None
# If we have a reference location, prefer shorter distances (LoRa range limitation)
if reference_location and len(results) > 1:
from .utils import calculate_distance
ref_lat, ref_lon = reference_location
# Calculate distances and sort by distance (shorter first)
candidates_with_distance = []
for row in results:
lat = row.get('latitude')
lon = row.get('longitude')
if lat is not None and lon is not None:
distance = calculate_distance(ref_lat, ref_lon, float(lat), float(lon))
is_starred = row.get('is_starred', False)
last_seen = row.get('last_seen', '')
candidates_with_distance.append((distance, is_starred, last_seen, row))
if candidates_with_distance:
# Sort by: starred first (False < True), then distance (shorter = better for LoRa), then recency
candidates_with_distance.sort(key=lambda x: (
not x[1], # Starred first (False < True, so starred=True comes before starred=False)
x[0], # Distance (shorter first)
x[2] if x[2] else '' # More recent first (newer timestamps sort later in string comparison)
))
# Get the best candidate
best_row = candidates_with_distance[0][3]
lat = best_row.get('latitude')
lon = best_row.get('longitude')
if lat is not None and lon is not None:
result = (float(lat), float(lon))
if location_cache is not None and reference_location is not None:
cache_key = f"prefix:{prefix}:{reference_location[0]}:{reference_location[1]}"
location_cache[cache_key] = result
return result
# No reference location or single result - use standard ordering
# Prefer starred, then most recent
results.sort(key=lambda x: (
not x.get('is_starred', False), # Starred first (False < True)
x.get('last_seen', '') if x.get('last_seen') else '' # More recent first
))
row = results[0]
lat = row.get('latitude')
lon = row.get('longitude')
if lat is not None and lon is not None:
result = (float(lat), float(lon))
if location_cache is not None:
cache_key = f"prefix:{prefix}" if reference_location is None else f"prefix:{prefix}:{reference_location[0]}:{reference_location[1]}"
location_cache[cache_key] = result
return result
except Exception as e:
self.logger.debug(f"Error getting location by prefix {prefix}: {e}")
return None
def _write_edge_to_db(
self,
edge_key: Tuple[str, str],
is_new: bool,
conn: Optional[sqlite3.Connection] = None,
location_cache: Optional[Dict[str, Tuple[float, float]]] = None,
skip_distance_recalc: bool = False,
):
"""Write a single edge to the database.
Args:
edge_key: (from_prefix, to_prefix) tuple.
is_new: True if this is a new edge, False if updating existing.
conn: Optional existing DB connection for batch operations (caller commits).
location_cache: Optional cache for location lookups within a flush.
skip_distance_recalc: If True, skip distance recalculation (used by
_flush_pending_updates_sync which already recalculates before calling here).
"""
if edge_key not in self.edges:
return
edge = self.edges[edge_key]
# Recalculate distance using full public keys if available (more accurate).
# Skipped when called from the batch flush loop, which already recalculated.
if not skip_distance_recalc and (edge.get('from_public_key') or edge.get('to_public_key')):
recalculated_distance = self._recalculate_distance_if_needed(
edge, conn=conn, location_cache=location_cache
)
if recalculated_distance is not None:
edge['geographic_distance'] = recalculated_distance
self.logger.debug(f"Mesh graph: Recalculated distance for {edge_key} using public keys: {recalculated_distance:.1f} km")
try:
if is_new:
# Upsert new edge.
# Use INSERT ... ON CONFLICT DO UPDATE so that if the row already exists
# in the database (e.g. it was filtered out of the in-memory graph at
# startup by startup_load_days / edge_expiration_days, or written by a
# concurrent process), we merge rather than fail with UNIQUE constraint.
query = '''
INSERT INTO mesh_connections
(from_prefix, to_prefix, from_public_key, to_public_key,
observation_count, first_seen, last_seen, avg_hop_position,
geographic_distance)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(from_prefix, to_prefix) DO UPDATE SET
observation_count = MAX(observation_count, excluded.observation_count),
last_seen = MAX(last_seen, excluded.last_seen),
avg_hop_position = excluded.avg_hop_position,
geographic_distance = COALESCE(excluded.geographic_distance, geographic_distance),
from_public_key = COALESCE(excluded.from_public_key, from_public_key),
to_public_key = COALESCE(excluded.to_public_key, to_public_key)
'''
params = (
edge['from_prefix'],
edge['to_prefix'],
edge.get('from_public_key'),
edge.get('to_public_key'),
edge['observation_count'],
edge['first_seen'].isoformat() if isinstance(edge['first_seen'], datetime) else edge['first_seen'],
edge['last_seen'].isoformat() if isinstance(edge['last_seen'], datetime) else edge['last_seen'],
edge.get('avg_hop_position'),
edge.get('geographic_distance')
)
else:
# Update existing edge — recalculate distance if we now have public keys,
# but only when not already done by the caller (skip_distance_recalc=False).
current_distance = edge.get('geographic_distance')
if not skip_distance_recalc and (edge.get('from_public_key') or edge.get('to_public_key')) and current_distance:
recalculated = self._recalculate_distance_if_needed(
edge, conn=conn, location_cache=location_cache
)
if recalculated is not None:
# Update if recalculated distance is significantly different (more than 20% difference)
if abs(recalculated - current_distance) / max(current_distance, 1.0) > 0.2:
edge['geographic_distance'] = recalculated
self.logger.info(f"Mesh graph: Corrected distance for {edge_key}: {current_distance:.1f} -> {recalculated:.1f} km")
# Update existing edge
# Always update public keys if provided (allows filling in missing keys on existing edges)
from_key = edge.get('from_public_key')
to_key = edge.get('to_public_key')
query = self._MESH_EDGE_UPDATE_QUERY
params = (
edge['observation_count'],
edge['last_seen'].isoformat() if isinstance(edge['last_seen'], datetime) else edge['last_seen'],
edge.get('avg_hop_position'),
edge.get('geographic_distance'),
from_key, # First occurrence for CASE WHEN check
from_key, # Second occurrence for value assignment
to_key, # First occurrence for CASE WHEN check
to_key, # Second occurrence for value assignment
edge['from_prefix'],
edge['to_prefix']
)
if conn is not None:
rows_affected = self.db_manager.execute_update_on_connection(conn, query, params)
else:
rows_affected = self.db_manager.execute_update(query, params)
if rows_affected > 0:
self.logger.debug(f"Mesh graph: Successfully wrote edge {edge_key} to database ({'INSERT' if is_new else 'UPDATE'}, {rows_affected} rows)")
else:
self.logger.warning(f"Mesh graph: Edge write returned 0 rows affected for {edge_key}")
except Exception as e:
self.logger.warning(f"Error writing edge to database: {e}")
import traceback
self.logger.debug(traceback.format_exc())
# UPDATE statement used for both single-edge writes and batch executemany
_MESH_EDGE_UPDATE_QUERY = '''
UPDATE mesh_connections
SET observation_count = ?, last_seen = ?,
avg_hop_position = ?, geographic_distance = ?,
from_public_key = CASE WHEN ? IS NOT NULL THEN ? ELSE from_public_key END,
to_public_key = CASE WHEN ? IS NOT NULL THEN ? ELSE to_public_key END
WHERE from_prefix = ? AND to_prefix = ?
'''
def _build_update_params_for_edge(
self,
edge_key: Tuple[str, str],
conn: Optional[sqlite3.Connection],
location_cache: Optional[Dict[str, Tuple[float, float]]],
) -> Optional[Tuple]:
"""Build UPDATE params for an edge (for batch executemany). Returns None to skip."""
if edge_key not in self.edges:
return None
try:
edge = self.edges[edge_key]
# Recalculate distance if we have public keys (same logic as _write_edge_to_db)
if edge.get('from_public_key') or edge.get('to_public_key'):
recalculated_distance = self._recalculate_distance_if_needed(
edge, conn=conn, location_cache=location_cache
)
if recalculated_distance is not None:
edge['geographic_distance'] = recalculated_distance
current_distance = edge.get('geographic_distance')
if (edge.get('from_public_key') or edge.get('to_public_key')) and current_distance:
recalculated = self._recalculate_distance_if_needed(
edge, conn=conn, location_cache=location_cache
)
if recalculated is not None and abs(recalculated - current_distance) / max(current_distance, 1.0) > 0.2:
edge['geographic_distance'] = recalculated
from_key = edge.get('from_public_key')
to_key = edge.get('to_public_key')
last_seen = edge['last_seen']
if isinstance(last_seen, datetime):
last_seen = last_seen.isoformat()
return (
edge['observation_count'],
last_seen,
edge.get('avg_hop_position'),
edge.get('geographic_distance'),
from_key,
from_key,
to_key,
to_key,
edge['from_prefix'],
edge['to_prefix'],
)
except Exception as e:
self.logger.debug(f"Error building update params for {edge_key}: {e}")
return None
def prune_expired_edges(self) -> int:
"""Remove edges from the in-memory graph that have exceeded graph_edge_expiration_days.
Only evicts from RAM — the database rows are kept so that historical data is
preserved and can be reloaded if the expiration window is later widened.
Returns:
int: Number of edges evicted.
"""
if self.edge_expiration_days <= 0:
return 0
cutoff = datetime.now() - timedelta(days=self.edge_expiration_days)
expired_keys = []
for edge_key, edge in self.edges.items():
last_seen = edge.get('last_seen')
if last_seen is None:
continue
if isinstance(last_seen, str):
try:
last_seen = datetime.fromisoformat(last_seen.replace('Z', '+00:00'))
except ValueError:
continue
if last_seen < cutoff:
expired_keys.append(edge_key)
for edge_key in expired_keys:
from_prefix, to_prefix = edge_key
del self.edges[edge_key]
# Clean up adjacency indexes
if from_prefix in self._outgoing_index:
self._outgoing_index[from_prefix].discard(to_prefix)
if not self._outgoing_index[from_prefix]:
del self._outgoing_index[from_prefix]
if to_prefix in self._incoming_index:
self._incoming_index[to_prefix].discard(from_prefix)
if not self._incoming_index[to_prefix]:
del self._incoming_index[to_prefix]
# Drop stale notification timestamp if present
self._notification_timestamps.pop(edge_key, None)
if expired_keys:
self.logger.debug(f"Pruned {len(expired_keys)} expired graph edges (older than {self.edge_expiration_days} days)")
return len(expired_keys)
def delete_expired_edges_from_db(self, days: int) -> int:
"""Delete mesh_connections rows older than the given days.
Keeps the on-disk table aligned with in-memory pruning and prevents unbounded growth.
Called from the scheduler (e.g. daily). Use Data_Retention mesh_connections_retention_days
or Path_Command graph_edge_expiration_days.
Returns:
int: Number of rows deleted.
"""
if days <= 0:
return 0
try:
deleted = self.db_manager.execute_update(
"DELETE FROM mesh_connections WHERE last_seen < datetime('now', ?)",
(f'-{days} days',)
)
if deleted > 0:
self.logger.info(f"Cleaned up {deleted} old mesh_connections entries (older than {days} days)")
return deleted
except Exception as e:
self.logger.error(f"Error cleaning up mesh_connections: {e}")
return 0
def _start_batch_writer(self):
"""Start background task for batched writes."""
def batch_writer_loop():
while not self._shutdown_event.is_set():
self._shutdown_event.wait(self.batch_interval)
if not self._shutdown_event.is_set():
# Flush synchronously (database operations are synchronous)
self._flush_pending_updates_sync()
# Periodically evict expired edges from RAM
self.prune_expired_edges()
import threading
batch_thread = threading.Thread(target=batch_writer_loop, daemon=True)
batch_thread.start()
self._batch_thread = batch_thread
def _flush_pending_updates_sync(self):
"""Flush all pending edge updates to database (synchronous version).
Uses a single connection for the entire batch to avoid 'unable to open database file'
when many edges are written in quick succession.
Handles both new edges (INSERT) and existing edges (UPDATE).
"""
with self.pending_lock:
if not self.pending_updates:
return
updates = list(self.pending_updates)
self.pending_updates.clear()
location_cache: Dict[str, Tuple[float, float]] = {}
try:
with self.db_manager.connection() as conn:
cursor = conn.cursor()
for edge_key in updates:
if edge_key not in self.edges:
continue
edge = self.edges[edge_key]
# Recalculate distance if we have public keys
if edge.get('from_public_key') or edge.get('to_public_key'):
recalculated = self._recalculate_distance_if_needed(
edge, conn=conn, location_cache=location_cache
)
if recalculated is not None:
edge['geographic_distance'] = recalculated
# Check if edge exists in DB
cursor.execute(
'SELECT 1 FROM mesh_connections WHERE from_prefix = ? AND to_prefix = ?',
(edge_key[0], edge_key[1]),
)
is_new = cursor.fetchone() is None
# Distance was already recalculated above — tell _write_edge_to_db to skip it
self._write_edge_to_db(edge_key, is_new, conn=conn, location_cache=location_cache,
skip_distance_recalc=True)
conn.commit()
except Exception as e:
self.logger.warning(f"Error flushing graph updates: {e}")
# Connection already closed by context manager; rollback happened on exit if needed
if updates:
self.logger.debug(f"Flushed {len(updates)} pending graph edge updates")
async def _flush_pending_updates(self):
"""Flush all pending edge updates to database (async wrapper)."""
self._flush_pending_updates_sync()
def has_edge(self, from_prefix: str, to_prefix: str) -> bool:
"""Check if an edge exists in the graph (exact or prefix match).
Args:
from_prefix: Source node prefix.
to_prefix: Destination node prefix.
Returns:
bool: True if edge exists.
"""
return self.get_edge(from_prefix, to_prefix) is not None
def get_edge(self, from_prefix: str, to_prefix: str) -> Optional[Dict]:
"""Get edge data if it exists (exact key first, then prefix match).
Args:
from_prefix: Source node prefix.
to_prefix: Destination node prefix.
Returns:
Dict with edge data or None if not found.
"""
from_norm = from_prefix.lower().strip() if from_prefix else ""
to_norm = to_prefix.lower().strip() if to_prefix else ""
if not from_norm or not to_norm:
return None
# Exact key first
exact = self.edges.get((from_norm, to_norm))
if exact is not None:
return exact
return self._get_edge_by_prefix_match(from_norm, to_norm)
def get_outgoing_edges(self, prefix: str) -> List[Dict]:
"""Get all edges originating from a node (prefix match: returns edges where from_prefix matches prefix).
Args:
prefix: Node prefix (2, 4, or 6 hex chars).
Returns:
List of edge dictionaries.
"""
prefix = prefix.lower().strip() if prefix else ""
if not prefix:
return []
result = []
for edge in self.edges.values():
if self._prefix_match(edge['from_prefix'], prefix):
result.append(edge)
return result
def get_incoming_edges(self, prefix: str) -> List[Dict]:
"""Get all edges ending at a node (prefix match: returns edges where to_prefix matches prefix).
Args:
prefix: Node prefix (2, 4, or 6 hex chars).
Returns:
List of edge dictionaries.
"""
prefix = prefix.lower().strip() if prefix else ""
if not prefix:
return []
result = []
for edge in self.edges.values():
if self._prefix_match(edge['to_prefix'], prefix):
result.append(edge)
return result
def validate_path_segment(self, from_prefix: str, to_prefix: str,
min_observations: int = 1,
check_bidirectional: bool = False) -> Tuple[bool, float]:
"""Validate a path segment using graph data.
Args:
from_prefix: Source node prefix.
to_prefix: Destination node prefix.
min_observations: Minimum observations required for confidence.
check_bidirectional: If True, check if reverse edge exists and boost confidence.
Returns:
Tuple of (is_valid, confidence_score) where confidence is 0.0-1.0.
"""
edge = self.get_edge(from_prefix, to_prefix)
if not edge:
return (False, 0.0)
if edge['observation_count'] < min_observations:
return (False, 0.0)
# Confidence based on observation count and recency
obs_count = edge['observation_count']
last_seen = edge['last_seen']
if isinstance(last_seen, str):
last_seen = datetime.fromisoformat(last_seen.replace('Z', '+00:00'))
hours_ago = (datetime.now() - last_seen).total_seconds() / 3600.0
# Observation count confidence (logarithmic scale)
obs_confidence = min(1.0, 0.3 + (0.7 * (1.0 - 1.0 / (1.0 + obs_count / 10.0))))
# Recency confidence (exponential decay, 48 hour half-life for longer advert intervals)
recency_confidence = 1.0 if hours_ago < 1 else max(0.0, 2.0 ** (-hours_ago / 48.0))
# Combined confidence
confidence = (obs_confidence * 0.6) + (recency_confidence * 0.4)
# Bidirectional edge bonus
if check_bidirectional:
reverse_edge = self.get_edge(to_prefix, from_prefix)
if reverse_edge and reverse_edge['observation_count'] >= min_observations:
# Bidirectional connection is more reliable
confidence = min(1.0, confidence + 0.15)
return (True, confidence)
def validate_path(self, path_nodes: List[str], min_observations: int = 1) -> Tuple[bool, float]:
"""Validate an entire path using graph data.
Args:
path_nodes: List of node prefixes in path order.
min_observations: Minimum observations required per edge.
Returns:
Tuple of (is_valid, average_confidence).
"""
if len(path_nodes) < 2:
return (True, 1.0) # Single node or empty path is always valid
validations = []
for i in range(len(path_nodes) - 1):
from_node = path_nodes[i]
to_node = path_nodes[i + 1]
is_valid, confidence = self.validate_path_segment(from_node, to_node, min_observations)
if not is_valid:
return (False, 0.0)
validations.append(confidence)
# Return average confidence
avg_confidence = sum(validations) / len(validations) if validations else 0.0
return (True, avg_confidence)
def get_candidate_score(self, candidate_prefix: str, prev_prefix: Optional[str],
next_prefix: Optional[str], min_observations: int = 1,
hop_position: Optional[int] = None,
use_bidirectional: bool = True,
use_hop_position: bool = True) -> float:
"""Get graph-based score for a candidate node in a path.
Args:
candidate_prefix: The candidate node prefix.
prev_prefix: Previous node in path (if available).
next_prefix: Next node in path (if available).
min_observations: Minimum observations required.
hop_position: Current position in path (0-based index) for hop position validation.
use_bidirectional: If True, check bidirectional edges for higher confidence.
use_hop_position: If True, validate against avg_hop_position if available.
Returns:
Score from 0.0 to 1.0 based on graph evidence.
"""
score = 0.0
evidence_count = 0
scores = []
# Check edge from previous node
if prev_prefix:
is_valid, confidence = self.validate_path_segment(
prev_prefix, candidate_prefix, min_observations,
check_bidirectional=use_bidirectional
)
if is_valid:
scores.append(confidence)
score += confidence
evidence_count += 1
# Check edge to next node
if next_prefix:
is_valid, confidence = self.validate_path_segment(
candidate_prefix, next_prefix, min_observations,
check_bidirectional=use_bidirectional
)
if is_valid:
scores.append(confidence)
score += confidence
evidence_count += 1
if evidence_count == 0:
return 0.0
# Calculate base score as average
base_score = score / evidence_count
# Hop position validation bonus
if use_hop_position and hop_position is not None:
# Check if candidate appears in expected position based on avg_hop_position
# Check both incoming and outgoing edges for hop position data
hop_position_match = False
if prev_prefix:
edge = self.get_edge(prev_prefix, candidate_prefix)
if edge and edge.get('avg_hop_position') is not None:
# Allow some tolerance (within 0.5 of expected position)
expected_pos = edge['avg_hop_position']
if abs(hop_position - expected_pos) <= 0.5:
hop_position_match = True
if not hop_position_match and next_prefix:
edge = self.get_edge(candidate_prefix, next_prefix)
if edge and edge.get('avg_hop_position') is not None:
# For outgoing edge, expected position is one less (since it's the from node)
expected_pos = edge['avg_hop_position'] - 1
if abs(hop_position - expected_pos) <= 0.5:
hop_position_match = True
if hop_position_match:
base_score = min(1.0, base_score + 0.1)
# Geographic distance validation (if available)
# Use stored geographic_distance from edges when available (more accurate)
if prev_prefix or next_prefix:
# Check if we have geographic distance data that suggests reasonable routing
# This is informational - we don't heavily penalize based on distance alone
# but can use it as a tie-breaker
geographic_available = False
if prev_prefix:
edge = self.get_edge(prev_prefix, candidate_prefix)
if edge and edge.get('geographic_distance') is not None:
geographic_available = True
if not geographic_available and next_prefix:
edge = self.get_edge(candidate_prefix, next_prefix)
if edge and edge.get('geographic_distance') is not None:
geographic_available = True
# Having geographic data increases confidence slightly (indicates well-tracked edge)
if geographic_available:
base_score = min(1.0, base_score + 0.05)
return base_score
def find_intermediate_nodes(self, from_prefix: str, to_prefix: str,
min_observations: int = 1,
max_hops: int = 2) -> List[Tuple[str, float]]:
"""Find intermediate nodes that connect from_prefix to to_prefix.
Uses multi-hop path inference to find nodes that connect two prefixes
when a direct edge may not exist or have low confidence.
Args:
from_prefix: Source node prefix.
to_prefix: Destination node prefix.
min_observations: Minimum observations required per edge.
max_hops: Maximum number of hops to search (default: 2, fallback to 3).
Returns:
List of (candidate_prefix, score) tuples sorted by score (highest first).
Score is 0.0-1.0 based on path strength.
"""
from_prefix = from_prefix.lower().strip() if from_prefix else ""
to_prefix = to_prefix.lower().strip() if to_prefix else ""
if not from_prefix or not to_prefix:
return []
candidates: Dict[str, float] = {}
# Try 2-hop paths first: from_prefix -> intermediate -> to_prefix
outgoing_edges = self.get_outgoing_edges(from_prefix)
for edge in outgoing_edges:
intermediate_prefix = edge['to_prefix']
# Skip if this is the destination (direct edge case)
if intermediate_prefix == to_prefix:
continue
# Check if intermediate connects to destination
to_edge = self.get_edge(intermediate_prefix, to_prefix)
if not to_edge or to_edge['observation_count'] < min_observations:
continue
# Validate both edges
from_valid, from_confidence = self.validate_path_segment(
from_prefix, intermediate_prefix, min_observations,
check_bidirectional=True
)
to_valid, to_confidence = self.validate_path_segment(
intermediate_prefix, to_prefix, min_observations,
check_bidirectional=True
)
if from_valid and to_valid:
# Score is minimum of both edges (weakest link)
path_score = min(from_confidence, to_confidence)
# Bidirectional path bonus
reverse_from = self.get_edge(intermediate_prefix, from_prefix)
reverse_to = self.get_edge(to_prefix, intermediate_prefix)
bidirectional_bonus = 1.0
if reverse_from and reverse_from['observation_count'] >= min_observations:
if reverse_to and reverse_to['observation_count'] >= min_observations:
# Both edges are bidirectional - strong evidence
bidirectional_bonus = 1.2
else:
bidirectional_bonus = 1.1
elif reverse_to and reverse_to['observation_count'] >= min_observations:
bidirectional_bonus = 1.1
path_score = min(1.0, path_score * bidirectional_bonus)
# Use best score if we've seen this candidate before
if intermediate_prefix not in candidates or path_score > candidates[intermediate_prefix]:
candidates[intermediate_prefix] = path_score
# If no 2-hop paths found and max_hops >= 3, try 3-hop paths
if not candidates and max_hops >= 3:
# Find 3-hop paths: from_prefix -> intermediate1 -> intermediate2 -> to_prefix
for edge1 in outgoing_edges:
intermediate1 = edge1['to_prefix']
if intermediate1 == to_prefix:
continue
# Get edges from intermediate1
intermediate1_edges = self.get_outgoing_edges(intermediate1)
for edge2 in intermediate1_edges:
intermediate2 = edge2['to_prefix']
if intermediate2 == from_prefix or intermediate2 == intermediate1:
continue
# Check if intermediate2 connects to destination
to_edge = self.get_edge(intermediate2, to_prefix)
if not to_edge or to_edge['observation_count'] < min_observations:
continue
# Validate all three edges
valid1, conf1 = self.validate_path_segment(
from_prefix, intermediate1, min_observations
)
valid2, conf2 = self.validate_path_segment(
intermediate1, intermediate2, min_observations
)
valid3, conf3 = self.validate_path_segment(
intermediate2, to_prefix, min_observations
)
if valid1 and valid2 and valid3:
# Score is minimum of all three edges
path_score = min(conf1, conf2, conf3)
# 3-hop paths are less reliable, so reduce score
path_score *= 0.8
# Use intermediate2 as candidate (the one before destination)
if intermediate2 not in candidates or path_score > candidates[intermediate2]:
candidates[intermediate2] = path_score
# Sort by score (highest first) and return
sorted_candidates = sorted(candidates.items(), key=lambda x: x[1], reverse=True)
return sorted_candidates
def shutdown(self):
"""Shutdown graph, flushing all pending writes."""
# Do not log here: atexit may run after the logger's stream is closed.
# Signal shutdown
self._shutdown_event.set()
# Flush pending updates
try:
self._flush_pending_updates_sync()
except Exception:
pass # Avoid logging; stream may be closed during atexit