#!/usr/bin/env python3
"""
Mesh Graph Module

Tracks observed connections between repeaters to improve path guessing accuracy.
Persists graph state across bot restarts for development scenarios.

Multi-resolution storage and node identity:
Edges are stored at the resolution observed (2, 4, or 6 hex chars per node). Nodes
with the same logical identity (e.g. 01, 0101, 0101C1) are treated as one: on read,
get_edge uses prefix matching and returns the best-matching edge (longest prefix,
then observation_count, then last_seen). On write:
- 1-byte observations (2-char prefix): merge into an existing 2/3-byte edge only
  when the edge is unique (exactly one prefix-matching edge). If zero or multiple
  matches exist, create or update only the 1-byte edge to avoid false coalescing.
- 2/3-byte observations: merge into the best-matching edge when present; when the
  new observation is more specific than the existing edge, promote (remove old edge,
  add new at higher resolution with merged observation count; DB: delete old row,
  insert new). Distinct links (e.g. 7e42→8611 and 7e99→86ff) stay separate.
"""

import sqlite3
import sys
import time
import threading
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Set
from collections import defaultdict

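# Illustrative behaviour sketch of the rules above (comments only, not executed;
# assumes a constructed MeshGraph `g`, and pk_a/pk_b stand in for full public keys):
#
#   g.add_edge('01', '86', from_public_key=pk_a, to_public_key=pk_b)  # 1-byte edge
#   g.add_edge('0101', '8611')   # unique, more specific match -> promoted to ('0101', '8611')
#   g.get_edge('01', '86')       # prefix match returns the promoted edge
#   g.add_edge('0102', '86ff')   # distinct link -> separate edge, no false coalescing
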
class MeshGraph:
    """Graph structure tracking observed connections between mesh nodes."""

    def __init__(self, bot):
        """Initialize the mesh graph.

        Args:
            bot: Bot instance with db_manager and config access.
        """
        self.bot = bot
        self.logger = bot.logger
        self.db_manager = bot.db_manager

        # Capture/validation feature flags.
        # graph_capture_enabled: controls whether new edge data is collected from packets.
        # When False, no new edges are added and the batch writer thread is not started.
        self.capture_enabled = bot.config.getboolean('Path_Command', 'graph_capture_enabled', fallback=True)

        # In-memory graph storage: {(from_prefix, to_prefix): edge_data}
        self.edges: Dict[Tuple[str, str], Dict] = {}

        # Adjacency indexes for O(1) neighbour lookups (derived from self.edges)
        self._outgoing_index: Dict[str, Set[str]] = defaultdict(set)  # from_prefix -> set of to_prefixes
        self._incoming_index: Dict[str, Set[str]] = defaultdict(set)  # to_prefix -> set of from_prefixes

        # Per-edge last-notification timestamps for web viewer throttling (unix float)
        self._notification_timestamps: Dict[Tuple[str, str], float] = {}

        # Track pending updates for batched writes
        self.pending_updates: Set[Tuple[str, str]] = set()
        self.pending_lock = threading.Lock()

        # Write strategy configuration
        self.write_strategy = bot.config.get('Path_Command', 'graph_write_strategy', fallback='hybrid')
        self.batch_interval = bot.config.getint('Path_Command', 'graph_batch_interval_seconds', fallback=30)
        self.batch_max_pending = bot.config.getint('Path_Command', 'graph_batch_max_pending', fallback=100)
        # Default 14 days: edges older than this carry near-zero recency confidence anyway.
        # Set to 0 in config.ini to load all historical edges (e.g. on servers with ample RAM).
        self.startup_load_days = bot.config.getint('Path_Command', 'graph_startup_load_days', fallback=14)
        self.edge_expiration_days = bot.config.getint('Path_Command', 'graph_edge_expiration_days', fallback=7)

        # Background thread for batched writes
        self._batch_thread = None
        self._shutdown_event = threading.Event()

        # Load graph from database on startup
        self._load_from_database()

        # Start background batch writer only when capture is active
        if self.capture_enabled and self.write_strategy in ('batched', 'hybrid'):
            self._start_batch_writer()

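    # Illustrative [Path_Command] settings consumed by __init__ above (values shown
    # are the code defaults; key names come straight from the config getters):
    #
    #   graph_capture_enabled = true        ; collect new edge data from packets
    #   graph_write_strategy = hybrid       ; immediate | batched | hybrid
    #   graph_batch_interval_seconds = 30
    #   graph_batch_max_pending = 100
    #   graph_startup_load_days = 14        ; 0 = no startup cutoff
    #   graph_edge_expiration_days = 7
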
    def _prefix_len(self) -> int:
        """Return configured prefix length in hex chars (always an int for slicing)."""
        n = getattr(self.bot, 'prefix_hex_chars', 2)
        try:
            return max(2, int(n))
        except (TypeError, ValueError):
            return 2

    def _valid_prefix_length(self, prefix: str) -> bool:
        """Return True if prefix has 2, 4, or 6 hex chars (after stripping)."""
        if not prefix or not isinstance(prefix, str):
            return False
        s = prefix.strip().lower()
        if len(s) not in (2, 4, 6):
            return False
        return all(c in '0123456789abcdef' for c in s)

    def _prefix_match(self, a: str, b: str) -> bool:
        """Return True if a and b match: exact, or one is a prefix of the other (after lowercasing)."""
        if not a or not b:
            return False
        a, b = a.lower().strip(), b.lower().strip()
        return a == b or a.startswith(b) or b.startswith(a)

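    # For example (illustrative): _prefix_match('01', '0101c1') -> True, since both
    # name the same logical node at different resolutions; _prefix_match('0101', '0102')
    # -> False, so neighbouring nodes never alias each other.
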
    def _get_edge_by_prefix_match(self, from_q: str, to_q: str) -> Optional[Dict]:
        """Return the best matching edge for a prefix query. Single edge only; never merge counts.

        Tie-break: exact key if present, else longest combined prefix length, then max
        observation_count, then most recent last_seen.
        """
        matches = self._find_all_matching_edges(from_q, to_q)
        if not matches:
            return None
        # Return the edge data of the best match (first in list)
        return matches[0][1]

    def _find_all_matching_edges(
        self, from_prefix: str, to_prefix: str
    ) -> List[Tuple[Tuple[str, str], Dict]]:
        """Return all edges that prefix-match (from_prefix, to_prefix), ordered by best match first.

        Best = longest combined prefix length, then observation_count desc, then last_seen desc.
        Used for the 1-byte uniqueness check (merge only when len == 1) and for 2/3-byte merge/promote.
        """
        from_q = from_prefix.lower().strip() if from_prefix else ""
        to_q = to_prefix.lower().strip() if to_prefix else ""
        if not from_q or not to_q:
            return []

        candidates: List[Tuple[Tuple[str, str], Dict]] = []
        for edge_key, edge in self.edges.items():
            from_p, to_p = edge_key
            if self._prefix_match(from_p, from_q) and self._prefix_match(to_p, to_q):
                candidates.append((edge_key, edge))

        if not candidates:
            return []

        def sort_key(item):
            edge_key, edge = item
            from_p, to_p = edge_key
            spec = len(from_p) + len(to_p)
            obs = edge.get("observation_count", 0)
            last = edge.get("last_seen")
            if isinstance(last, str):
                try:
                    last = datetime.fromisoformat(last.replace("Z", "+00:00"))
                except ValueError:
                    last = datetime.min
            if isinstance(last, datetime) and last.tzinfo is not None:
                last = last.replace(tzinfo=None)
            last_ts = last if isinstance(last, datetime) else datetime.min
            # desc spec, desc obs, desc time -> most recent first
            return (-spec, -obs, -(last_ts - datetime.min).total_seconds())

        candidates.sort(key=sort_key)
        return candidates

    def _remove_edge_from_memory(self, edge_key: Tuple[str, str]) -> None:
        """Remove an edge from the in-memory graph and adjacency indexes.

        Does not touch the database. Used when promoting to a higher-resolution key.
        """
        if edge_key not in self.edges:
            return
        from_p, to_p = edge_key
        del self.edges[edge_key]
        if from_p in self._outgoing_index:
            self._outgoing_index[from_p].discard(to_p)
            if not self._outgoing_index[from_p]:
                del self._outgoing_index[from_p]
        if to_p in self._incoming_index:
            self._incoming_index[to_p].discard(from_p)
            if not self._incoming_index[to_p]:
                del self._incoming_index[to_p]
        self._notification_timestamps.pop(edge_key, None)

    def _delete_edge_from_db(
        self, edge_key: Tuple[str, str], conn: Optional[sqlite3.Connection] = None
    ) -> int:
        """Delete a single edge row from mesh_connections. Returns rows affected."""
        from_p, to_p = edge_key
        query = "DELETE FROM mesh_connections WHERE from_prefix = ? AND to_prefix = ?"
        params = (from_p, to_p)
        if conn is not None:
            return self.db_manager.execute_update_on_connection(conn, query, params)
        return self.db_manager.execute_update(query, params)

    def _update_edge_data(
        self,
        edge: Dict,
        now: datetime,
        hop_position: Optional[int] = None,
        from_public_key: Optional[str] = None,
        to_public_key: Optional[str] = None,
        geographic_distance: Optional[float] = None,
        prefix_bytes: int = 1,
    ) -> None:
        """Apply one observation to an edge dict (increment count, last_seen, optional fields)."""
        edge["observation_count"] = edge.get("observation_count", 0) + 1
        edge["last_seen"] = now
        if hop_position is not None:
            current_avg = edge.get("avg_hop_position")
            count = edge["observation_count"]
            if current_avg is not None:
                edge["avg_hop_position"] = ((current_avg * (count - 1)) + hop_position) / count
            else:
                edge["avg_hop_position"] = hop_position
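        # Worked example: prior avg 2.0 over 3 observations, new hop_position 4 ->
        # ((2.0 * 3) + 4) / 4 = 2.5 (observation_count was already incremented to 4).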
        if from_public_key:
            edge["from_public_key"] = from_public_key
        if to_public_key:
            edge["to_public_key"] = to_public_key
        if geographic_distance is not None:
            edge["geographic_distance"] = geographic_distance
        if prefix_bytes == 2:
            edge["confirmed_2byte"] = True

    def _load_from_database(self):
        """Load graph edges from database on startup."""
        try:
            query = '''
                SELECT from_prefix, to_prefix, from_public_key, to_public_key,
                       observation_count, first_seen, last_seen, avg_hop_position,
                       geographic_distance
                FROM mesh_connections
            '''

            # Build WHERE clause combining startup_load_days and edge_expiration_days.
            # startup_load_days: explicit cap on how far back to load (0 = no cap).
            # edge_expiration_days: always applied — never load edges we would immediately
            # evict as expired (this bounds memory even when startup_load_days=0).
            where_parts = []
            if self.startup_load_days > 0:
                cutoff_date = datetime.now() - timedelta(days=self.startup_load_days)
                where_parts.append(f"last_seen >= '{cutoff_date.isoformat()}'")
            if self.edge_expiration_days > 0:
                expiry_date = datetime.now() - timedelta(days=self.edge_expiration_days)
                where_parts.append(f"last_seen >= '{expiry_date.isoformat()}'")
            if where_parts:
                # ANDing both conditions applies the most restrictive (most recent) cutoff
                query += " WHERE " + " AND ".join(where_parts)

            query += " ORDER BY last_seen DESC"

            results = self.db_manager.execute_query(query)

            edge_count = 0
            for row in results:
                from_prefix = row['from_prefix']
                to_prefix = row['to_prefix']
                edge_key = (from_prefix, to_prefix)

                # Intern public key strings so identical keys across many edges share
                # a single string object in memory rather than duplicating bytes.
                from_pk = row.get('from_public_key')
                to_pk = row.get('to_public_key')
                if from_pk:
                    from_pk = sys.intern(from_pk)
                if to_pk:
                    to_pk = sys.intern(to_pk)

                self.edges[edge_key] = {
                    'from_prefix': from_prefix,
                    'to_prefix': to_prefix,
                    'from_public_key': from_pk,
                    'to_public_key': to_pk,
                    'observation_count': row.get('observation_count', 1),
                    'first_seen': row.get('first_seen'),
                    'last_seen': row.get('last_seen'),
                    'avg_hop_position': row.get('avg_hop_position'),
                    'geographic_distance': row.get('geographic_distance')
                }

                # Maintain adjacency indexes
                self._outgoing_index[from_prefix].add(to_prefix)
                self._incoming_index[to_prefix].add(from_prefix)

                edge_count += 1

            self.logger.info(f"Loaded {edge_count} graph edges from database")

            # Log statistics
            if edge_count > 0:
                total_observations = sum(e['observation_count'] for e in self.edges.values())
                self.logger.info(f"Graph statistics: {edge_count} edges, {total_observations} total observations")

            # Belt-and-suspenders: prune any edges that slipped through the SQL filter
            # (e.g. timezone edge cases or edges loaded before expiration_days was set)
            self.prune_expired_edges()

        except Exception as e:
            self.logger.warning(f"Error loading graph from database: {e}")
            # Continue with empty graph

    def add_edge(self, from_prefix: str, to_prefix: str,
                 from_public_key: Optional[str] = None,
                 to_public_key: Optional[str] = None,
                 hop_position: Optional[int] = None,
                 geographic_distance: Optional[float] = None,
                 prefix_bytes: int = 1):
        """Add or update an edge in the graph.

        Prefixes are stored at the resolution provided (2, 4, or 6 hex chars).
        No truncation; the same physical link can be recorded at different
        resolutions and will create/update the matching edge.

        Args:
            from_prefix: Source node prefix (2, 4, or 6 hex chars depending on path encoding).
            to_prefix: Destination node prefix (2, 4, or 6 hex chars depending on path encoding).
            from_public_key: Full public key of source node (optional).
            to_public_key: Full public key of destination node (optional).
            hop_position: Position in path where this edge was observed (optional).
            geographic_distance: Distance in km between nodes (optional).
            prefix_bytes: 1 = 1-byte (2-char) prefix (default); 2 = 2-byte confirmed
                (stored as an edge flag for weighting).
        """
        if not from_prefix or not to_prefix:
            return

        # Respect the capture kill-switch — allow reads but block writes
        if not self.capture_enabled:
            return

        # Normalize to lowercase; validate length (2, 4, or 6 hex chars). No truncation.
        from_prefix = from_prefix.lower().strip()
        to_prefix = to_prefix.lower().strip()
        if not self._valid_prefix_length(from_prefix) or not self._valid_prefix_length(to_prefix):
            return

        # Intern public key strings so repeated identical keys share one object in RAM
        if from_public_key:
            from_public_key = sys.intern(from_public_key)
        if to_public_key:
            to_public_key = sys.intern(to_public_key)

        edge_key = (from_prefix, to_prefix)
        now = datetime.now()

        matches = self._find_all_matching_edges(from_prefix, to_prefix)
        best = matches[0] if matches else None
        incoming_1byte = len(from_prefix) == 2 or len(to_prefix) == 2

        # 1-byte: merge only when exactly one matching edge (unique link)
        if incoming_1byte and len(matches) == 1:
            target_key, target_edge = matches[0]
            self._update_edge_data(
                target_edge, now, hop_position,
                from_public_key, to_public_key, geographic_distance, prefix_bytes,
            )
            self._persist_and_notify_edge(target_key, is_new_edge=False)
            return

        # 2/3-byte: merge into best match or promote
        if not incoming_1byte and best is not None:
            best_key, best_edge = best
            best_spec = len(best_key[0]) + len(best_key[1])
            new_spec = len(from_prefix) + len(to_prefix)

            if best_key == edge_key:
                # Update in place
                self._update_edge_data(
                    best_edge, now, hop_position,
                    from_public_key, to_public_key, geographic_distance, prefix_bytes,
                )
                self._persist_and_notify_edge(edge_key, is_new_edge=False)
                return

            if best_spec > new_spec:
                # Best match is more specific — update that edge and return
                self._update_edge_data(
                    best_edge, now, hop_position,
                    from_public_key, to_public_key, geographic_distance, prefix_bytes,
                )
                self._persist_and_notify_edge(best_key, is_new_edge=False)
                return

            if best_spec < new_spec:
                # Don't promote a 1-byte edge to higher resolution when it has no
                # public_key: keep the 1-byte edge so we don't attribute its
                # observations to one specific 2/3-byte node (other repeaters sharing
                # the prefix, e.g. e0, would otherwise have no edges and appear removed).
                best_is_1byte = len(best_key[0]) == 2 and len(best_key[1]) == 2
                if best_is_1byte and not best_edge.get("from_public_key") and not best_edge.get("to_public_key"):
                    self._update_edge_data(
                        best_edge, now, hop_position,
                        from_public_key, to_public_key, geographic_distance, prefix_bytes,
                    )
                    self._persist_and_notify_edge(best_key, is_new_edge=False)
                    return
                # Promote: remove old edge, add new at higher resolution with merged count
                merged_count = best_edge["observation_count"] + 1
                first_seen = best_edge.get("first_seen") or now
                self._remove_edge_from_memory(best_key)
                self._delete_edge_from_db(best_key)
                with self.pending_lock:
                    self.pending_updates.discard(best_key)

                self.edges[edge_key] = {
                    "from_prefix": from_prefix,
                    "to_prefix": to_prefix,
                    "from_public_key": from_public_key or best_edge.get("from_public_key"),
                    "to_public_key": to_public_key or best_edge.get("to_public_key"),
                    "observation_count": merged_count,
                    "first_seen": first_seen,
                    "last_seen": now,
                    "avg_hop_position": hop_position if hop_position is not None else best_edge.get("avg_hop_position"),
                    "geographic_distance": geographic_distance if geographic_distance is not None else best_edge.get("geographic_distance"),
                    "confirmed_2byte": True if prefix_bytes == 2 else best_edge.get("confirmed_2byte", False),
                }
                self._outgoing_index[from_prefix].add(to_prefix)
                self._incoming_index[to_prefix].add(from_prefix)

                self._persist_and_notify_edge(edge_key, is_new_edge=True)
                return

        # Exact-key create or update (1-byte with 0 or 2+ matches, or 2/3-byte with no match)
        if edge_key in self.edges:
            edge = self.edges[edge_key]
            edge['observation_count'] += 1
            edge['last_seen'] = now

            # Update average hop position
            if hop_position is not None:
                current_avg = edge.get('avg_hop_position')
                count = edge['observation_count']
                if current_avg is not None:
                    # Weighted average: (old_avg * (count-1) + new_pos) / count
                    edge['avg_hop_position'] = ((current_avg * (count - 1)) + hop_position) / count
                else:
                    # First time setting hop position
                    edge['avg_hop_position'] = hop_position

            # Update public keys if provided (always update if we have a better key).
            # This allows us to fill in missing keys on existing edges.
            if from_public_key:
                edge['from_public_key'] = from_public_key
            if to_public_key:
                edge['to_public_key'] = to_public_key

            # Update geographic distance if provided
            if geographic_distance is not None:
                edge['geographic_distance'] = geographic_distance

            # 2-byte trace confirmation (for path weighting when supported)
            if prefix_bytes == 2:
                edge['confirmed_2byte'] = True

            is_new_edge = False
        else:
            # New edge — also update adjacency indexes
            self.edges[edge_key] = {
                'from_prefix': from_prefix,
                'to_prefix': to_prefix,
                'from_public_key': from_public_key,
                'to_public_key': to_public_key,
                'observation_count': 1,
                'first_seen': now,
                'last_seen': now,
                'avg_hop_position': hop_position,
                'geographic_distance': geographic_distance,
                'confirmed_2byte': prefix_bytes == 2,
            }
            self._outgoing_index[from_prefix].add(to_prefix)
            self._incoming_index[to_prefix].add(from_prefix)
            is_new_edge = True

        self._persist_and_notify_edge(edge_key, is_new_edge)

    def _persist_and_notify_edge(self, edge_key: Tuple[str, str], is_new_edge: bool) -> None:
        """Persist edge to DB (according to write strategy) and notify web viewer."""
        self.logger.debug(f"Mesh graph: Edge {edge_key} - new={is_new_edge}, strategy={self.write_strategy}")
        if self.write_strategy == 'immediate':
            self._write_edge_to_db(edge_key, is_new_edge)
        elif self.write_strategy == 'batched':
            with self.pending_lock:
                self.pending_updates.add(edge_key)
            # Flush outside the lock: _flush_pending_updates_sync acquires it itself
            if len(self.pending_updates) >= self.batch_max_pending:
                self._flush_pending_updates_sync()
        elif self.write_strategy == 'hybrid':
            if is_new_edge:
                self._write_edge_to_db(edge_key, True)
            else:
                with self.pending_lock:
                    self.pending_updates.add(edge_key)
                if len(self.pending_updates) >= self.batch_max_pending:
                    self._flush_pending_updates_sync()
        self._notify_web_viewer_edge(edge_key, is_new_edge)

    def _notify_web_viewer_edge(self, edge_key: Tuple[str, str], is_new: bool):
        """Notify web viewer of edge update via bot integration.

        New edges always trigger an immediate notification. Updates to existing
        edges are throttled to at most once every 10 seconds to reduce HTTP
        traffic on busy meshes.
        """
        try:
            if not hasattr(self.bot, 'web_viewer_integration') or not self.bot.web_viewer_integration:
                return

            if not hasattr(self.bot.web_viewer_integration, 'bot_integration'):
                return

            edge = self.edges.get(edge_key)
            if not edge:
                return

            # Throttle repeated updates for the same edge (new edges always notify)
            now_ts = time.time()
            if not is_new:
                last_notified = self._notification_timestamps.get(edge_key, 0.0)
                if (now_ts - last_notified) < 10.0:
                    return  # Skip — notified recently enough
            self._notification_timestamps[edge_key] = now_ts

            # Prepare edge data for web viewer
            edge_data = {
                'from_prefix': edge['from_prefix'],
                'to_prefix': edge['to_prefix'],
                'from_public_key': edge.get('from_public_key'),
                'to_public_key': edge.get('to_public_key'),
                'observation_count': edge['observation_count'],
                'first_seen': edge['first_seen'].isoformat() if isinstance(edge['first_seen'], datetime) else str(edge['first_seen']),
                'last_seen': edge['last_seen'].isoformat() if isinstance(edge['last_seen'], datetime) else str(edge['last_seen']),
                'avg_hop_position': edge.get('avg_hop_position'),
                'geographic_distance': edge.get('geographic_distance'),
                'is_new': is_new
            }

            # Send update asynchronously
            self.bot.web_viewer_integration.bot_integration.send_mesh_edge_update(edge_data)
        except Exception as e:
            self.logger.debug(f"Error notifying web viewer of edge update: {e}")

    def _recalculate_distance_if_needed(
        self,
        edge: Dict,
        conn: Optional[sqlite3.Connection] = None,
        location_cache: Optional[Dict[str, Tuple[float, float]]] = None,
    ) -> Optional[float]:
        """Recalculate geographic distance using full public keys if available.

        This ensures we get the correct location when there are prefix collisions.

        Args:
            edge: Edge dictionary with prefix and optional public keys.
            conn: Optional existing DB connection for batch operations.
            location_cache: Optional cache for location lookups within a flush (keyed by pk: or prefix:).

        Returns:
            Optional[float]: Recalculated distance in km, or None if it can't be calculated.
        """
        from .utils import calculate_distance

        # Get location for 'from' node (conn optional for single-connection batch flush)
        if edge.get('from_public_key'):
            from_location = self._get_location_by_public_key(
                edge['from_public_key'], conn=conn, location_cache=location_cache
            )
        else:
            from_location = None
        if not from_location:
            to_location_temp = None
            if edge.get('to_public_key'):
                to_location_temp = self._get_location_by_public_key(
                    edge['to_public_key'], conn=conn, location_cache=location_cache
                )
            if not to_location_temp:
                to_location_temp = self._get_location_by_prefix(
                    edge['to_prefix'], conn=conn, location_cache=location_cache
                )
            from_location = self._get_location_by_prefix(
                edge['from_prefix'], to_location_temp, conn=conn, location_cache=location_cache
            )

        # Get location for 'to' node
        if edge.get('to_public_key'):
            to_location = self._get_location_by_public_key(
                edge['to_public_key'], conn=conn, location_cache=location_cache
            )
        else:
            to_location = None
        if not to_location:
            to_location = self._get_location_by_prefix(
                edge['to_prefix'], from_location, conn=conn, location_cache=location_cache
            )

        # Calculate distance if we have both locations
        if from_location and to_location:
            return calculate_distance(
                from_location[0], from_location[1],
                to_location[0], to_location[1]
            )

        return None

    def _get_location_by_public_key(
        self,
        public_key: str,
        conn: Optional[sqlite3.Connection] = None,
        location_cache: Optional[Dict[str, Tuple[float, float]]] = None,
    ) -> Optional[Tuple[float, float]]:
        """Get location for a full public key (more accurate than prefix lookup).

        Prefers starred repeaters if there are somehow multiple entries (shouldn't happen with a full key).
        """
        cache_key = f"pk:{public_key}" if location_cache is not None else None
        if cache_key is not None and cache_key in location_cache:
            return location_cache[cache_key]
        try:
            query = '''
                SELECT latitude, longitude
                FROM complete_contact_tracking
                WHERE public_key = ?
                  AND latitude IS NOT NULL AND longitude IS NOT NULL
                  AND latitude != 0 AND longitude != 0
                  AND role IN ('repeater', 'roomserver')
                ORDER BY is_starred DESC, COALESCE(last_advert_timestamp, last_heard) DESC
                LIMIT 1
            '''
            if conn is not None:
                results = self.db_manager.execute_query_on_connection(conn, query, (public_key,))
            else:
                results = self.db_manager.execute_query(query, (public_key,))
            if results:
                row = results[0]
                lat = row.get('latitude')
                lon = row.get('longitude')
                if lat is not None and lon is not None:
                    result = (float(lat), float(lon))
                    if cache_key is not None:
                        location_cache[cache_key] = result
                    return result
        except Exception as e:
            self.logger.debug(f"Error getting location by public key {public_key[:16]}...: {e}")
        return None

    def _get_location_by_prefix(
        self,
        prefix: str,
        reference_location: Optional[Tuple[float, float]] = None,
        conn: Optional[sqlite3.Connection] = None,
        location_cache: Optional[Dict[str, Tuple[float, float]]] = None,
    ) -> Optional[Tuple[float, float]]:
        """Get location for a prefix (fallback when the full public key is not available).

        For LoRa networks, prefers shorter distances when there are prefix collisions,
        as LoRa range is limited by the curve of the earth.

        Args:
            prefix: 2-character hex prefix.
            reference_location: Optional (lat, lon) to calculate distance from for LoRa preference.
            conn: Optional existing DB connection for batch operations.
            location_cache: Optional cache for location lookups within a flush.
        """
        if location_cache is not None:
            if reference_location is not None:
                cache_key = f"prefix:{prefix}:{reference_location[0]}:{reference_location[1]}"
            else:
                cache_key = f"prefix:{prefix}"
            if cache_key in location_cache:
                return location_cache[cache_key]
        try:
            prefix_pattern = f"{prefix}%"

            # Get all candidates with locations
            query = '''
                SELECT latitude, longitude, is_starred,
                       COALESCE(last_advert_timestamp, last_heard) as last_seen
                FROM complete_contact_tracking
                WHERE public_key LIKE ?
                  AND latitude IS NOT NULL AND longitude IS NOT NULL
                  AND latitude != 0 AND longitude != 0
                  AND role IN ('repeater', 'roomserver')
            '''
            if conn is not None:
                results = self.db_manager.execute_query_on_connection(conn, query, (prefix_pattern,))
            else:
                results = self.db_manager.execute_query(query, (prefix_pattern,))

            if not results:
                return None

            # If we have a reference location, prefer shorter distances (LoRa range limitation)
            if reference_location and len(results) > 1:
                from .utils import calculate_distance
                ref_lat, ref_lon = reference_location

                # Calculate distances for each candidate
                candidates_with_distance = []
                for row in results:
                    lat = row.get('latitude')
                    lon = row.get('longitude')
                    if lat is not None and lon is not None:
                        distance = calculate_distance(ref_lat, ref_lon, float(lat), float(lon))
                        is_starred = row.get('is_starred', False)
                        last_seen = row.get('last_seen', '')
                        candidates_with_distance.append((distance, is_starred, last_seen, row))

                if candidates_with_distance:
                    # Two-pass stable sort: recency desc first, then starred first and
                    # shorter distance first (better for LoRa). Ties on star/distance
                    # therefore break toward the most recently seen candidate
                    # (timestamps are assumed to compare consistently, e.g. ISO strings).
                    candidates_with_distance.sort(key=lambda x: x[2] or '', reverse=True)
                    candidates_with_distance.sort(key=lambda x: (not x[1], x[0]))

                    # Get the best candidate
                    best_row = candidates_with_distance[0][3]
                    lat = best_row.get('latitude')
                    lon = best_row.get('longitude')
                    if lat is not None and lon is not None:
                        result = (float(lat), float(lon))
                        if location_cache is not None and reference_location is not None:
                            cache_key = f"prefix:{prefix}:{reference_location[0]}:{reference_location[1]}"
                            location_cache[cache_key] = result
                        return result

            # No reference location or single result — use standard ordering:
            # prefer starred, then most recent (stable two-pass sort again).
            results.sort(key=lambda x: x.get('last_seen') or '', reverse=True)
            results.sort(key=lambda x: not x.get('is_starred', False))

            row = results[0]
            lat = row.get('latitude')
            lon = row.get('longitude')
            if lat is not None and lon is not None:
                result = (float(lat), float(lon))
                if location_cache is not None:
                    cache_key = f"prefix:{prefix}" if reference_location is None else f"prefix:{prefix}:{reference_location[0]}:{reference_location[1]}"
                    location_cache[cache_key] = result
                return result
        except Exception as e:
            self.logger.debug(f"Error getting location by prefix {prefix}: {e}")
        return None

    def _write_edge_to_db(
        self,
        edge_key: Tuple[str, str],
        is_new: bool,
        conn: Optional[sqlite3.Connection] = None,
        location_cache: Optional[Dict[str, Tuple[float, float]]] = None,
        skip_distance_recalc: bool = False,
    ):
        """Write a single edge to the database.

        Args:
            edge_key: (from_prefix, to_prefix) tuple.
            is_new: True if this is a new edge, False if updating an existing one.
            conn: Optional existing DB connection for batch operations (caller commits).
            location_cache: Optional cache for location lookups within a flush.
            skip_distance_recalc: If True, skip distance recalculation (used by
                _flush_pending_updates_sync, which already recalculates before calling here).
        """
        if edge_key not in self.edges:
            return

        edge = self.edges[edge_key]

        # Recalculate distance using full public keys if available (more accurate).
        # Skipped when called from the batch flush loop, which already recalculated.
        if not skip_distance_recalc and (edge.get('from_public_key') or edge.get('to_public_key')):
            recalculated_distance = self._recalculate_distance_if_needed(
                edge, conn=conn, location_cache=location_cache
            )
            if recalculated_distance is not None:
                edge['geographic_distance'] = recalculated_distance
                self.logger.debug(f"Mesh graph: Recalculated distance for {edge_key} using public keys: {recalculated_distance:.1f} km")

        try:
            if is_new:
                # Upsert new edge.
                # Use INSERT ... ON CONFLICT DO UPDATE so that if the row already exists
                # in the database (e.g. it was filtered out of the in-memory graph at
                # startup by startup_load_days / edge_expiration_days, or written by a
                # concurrent process), we merge rather than fail with a UNIQUE constraint.
                query = '''
                    INSERT INTO mesh_connections
                        (from_prefix, to_prefix, from_public_key, to_public_key,
                         observation_count, first_seen, last_seen, avg_hop_position,
                         geographic_distance)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                    ON CONFLICT(from_prefix, to_prefix) DO UPDATE SET
                        observation_count = MAX(observation_count, excluded.observation_count),
                        last_seen = MAX(last_seen, excluded.last_seen),
                        avg_hop_position = excluded.avg_hop_position,
                        geographic_distance = COALESCE(excluded.geographic_distance, geographic_distance),
                        from_public_key = COALESCE(excluded.from_public_key, from_public_key),
                        to_public_key = COALESCE(excluded.to_public_key, to_public_key)
                '''
                params = (
                    edge['from_prefix'],
                    edge['to_prefix'],
                    edge.get('from_public_key'),
                    edge.get('to_public_key'),
                    edge['observation_count'],
                    edge['first_seen'].isoformat() if isinstance(edge['first_seen'], datetime) else edge['first_seen'],
                    edge['last_seen'].isoformat() if isinstance(edge['last_seen'], datetime) else edge['last_seen'],
                    edge.get('avg_hop_position'),
                    edge.get('geographic_distance')
                )
            else:
                # Update existing edge. The distance was already recalculated above
                # when permitted, so no second recalculation is needed here.
                # Always update public keys if provided (allows filling in missing
                # keys on existing edges); the CASE WHEN in the query keeps the old
                # value when the new one is NULL.
                from_key = edge.get('from_public_key')
                to_key = edge.get('to_public_key')
                query = self._MESH_EDGE_UPDATE_QUERY
                params = (
                    edge['observation_count'],
                    edge['last_seen'].isoformat() if isinstance(edge['last_seen'], datetime) else edge['last_seen'],
                    edge.get('avg_hop_position'),
                    edge.get('geographic_distance'),
                    from_key,  # First occurrence for CASE WHEN check
                    from_key,  # Second occurrence for value assignment
                    to_key,    # First occurrence for CASE WHEN check
                    to_key,    # Second occurrence for value assignment
                    edge['from_prefix'],
                    edge['to_prefix']
                )

            if conn is not None:
                rows_affected = self.db_manager.execute_update_on_connection(conn, query, params)
            else:
                rows_affected = self.db_manager.execute_update(query, params)
            if rows_affected > 0:
                self.logger.debug(f"Mesh graph: Successfully wrote edge {edge_key} to database ({'INSERT' if is_new else 'UPDATE'}, {rows_affected} rows)")
            else:
                self.logger.warning(f"Mesh graph: Edge write returned 0 rows affected for {edge_key}")

        except Exception as e:
            self.logger.warning(f"Error writing edge to database: {e}")
            import traceback
            self.logger.debug(traceback.format_exc())

    # UPDATE statement used for both single-edge writes and batch executemany
    _MESH_EDGE_UPDATE_QUERY = '''
        UPDATE mesh_connections
        SET observation_count = ?, last_seen = ?,
            avg_hop_position = ?, geographic_distance = ?,
            from_public_key = CASE WHEN ? IS NOT NULL THEN ? ELSE from_public_key END,
            to_public_key = CASE WHEN ? IS NOT NULL THEN ? ELSE to_public_key END
        WHERE from_prefix = ? AND to_prefix = ?
    '''

    def _build_update_params_for_edge(
        self,
        edge_key: Tuple[str, str],
        conn: Optional[sqlite3.Connection],
        location_cache: Optional[Dict[str, Tuple[float, float]]],
    ) -> Optional[Tuple]:
        """Build UPDATE params for an edge (for batch executemany). Returns None to skip."""
        if edge_key not in self.edges:
            return None
        try:
            edge = self.edges[edge_key]
            # Recalculate distance if we have public keys (same logic as _write_edge_to_db)
            if edge.get('from_public_key') or edge.get('to_public_key'):
                recalculated_distance = self._recalculate_distance_if_needed(
                    edge, conn=conn, location_cache=location_cache
                )
                if recalculated_distance is not None:
                    edge['geographic_distance'] = recalculated_distance
            from_key = edge.get('from_public_key')
            to_key = edge.get('to_public_key')
            last_seen = edge['last_seen']
            if isinstance(last_seen, datetime):
                last_seen = last_seen.isoformat()
            return (
                edge['observation_count'],
                last_seen,
                edge.get('avg_hop_position'),
                edge.get('geographic_distance'),
                from_key,
                from_key,
                to_key,
                to_key,
                edge['from_prefix'],
                edge['to_prefix'],
            )
        except Exception as e:
            self.logger.debug(f"Error building update params for {edge_key}: {e}")
            return None

    def prune_expired_edges(self) -> int:
        """Remove edges from the in-memory graph that have exceeded graph_edge_expiration_days.

        Only evicts from RAM — the database rows are kept so that historical data is
        preserved and can be reloaded if the expiration window is later widened.

        Returns:
            int: Number of edges evicted.
        """
        if self.edge_expiration_days <= 0:
            return 0

        cutoff = datetime.now() - timedelta(days=self.edge_expiration_days)
        expired_keys = []
        for edge_key, edge in self.edges.items():
            last_seen = edge.get('last_seen')
            if last_seen is None:
                continue
            if isinstance(last_seen, str):
                try:
                    last_seen = datetime.fromisoformat(last_seen.replace('Z', '+00:00'))
                except ValueError:
                    continue
            if last_seen < cutoff:
                expired_keys.append(edge_key)

        for edge_key in expired_keys:
            # Removes the edge, cleans up both adjacency indexes, and drops any
            # stale notification timestamp.
            self._remove_edge_from_memory(edge_key)

        if expired_keys:
            self.logger.debug(f"Pruned {len(expired_keys)} expired graph edges (older than {self.edge_expiration_days} days)")
        return len(expired_keys)

    def delete_expired_edges_from_db(self, days: int) -> int:
        """Delete mesh_connections rows older than the given number of days.

        Keeps the on-disk table aligned with in-memory pruning and prevents unbounded growth.
        Called from the scheduler (e.g. daily). Use Data_Retention mesh_connections_retention_days
        or Path_Command graph_edge_expiration_days.

        Returns:
            int: Number of rows deleted.
        """
        if days <= 0:
            return 0
        try:
            deleted = self.db_manager.execute_update(
                "DELETE FROM mesh_connections WHERE last_seen < datetime('now', ?)",
                (f'-{days} days',)
            )
            if deleted > 0:
                self.logger.info(f"Cleaned up {deleted} old mesh_connections entries (older than {days} days)")
            return deleted
        except Exception as e:
            self.logger.error(f"Error cleaning up mesh_connections: {e}")
            return 0

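    # Illustrative scheduler hook (sketch only; `bot`, `bot.mesh_graph`, and the exact
    # job wiring are assumed, but both config keys are named in the docstring above):
    #
    #   days = bot.config.getint('Data_Retention', 'mesh_connections_retention_days',
    #                            fallback=bot.config.getint('Path_Command',
    #                                                       'graph_edge_expiration_days', fallback=7))
    #   bot.mesh_graph.delete_expired_edges_from_db(days)
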
    def _start_batch_writer(self):
        """Start the background thread for batched writes."""
        def batch_writer_loop():
            while not self._shutdown_event.is_set():
                self._shutdown_event.wait(self.batch_interval)
                if not self._shutdown_event.is_set():
                    # Flush synchronously (database operations are synchronous)
                    self._flush_pending_updates_sync()
                    # Periodically evict expired edges from RAM
                    self.prune_expired_edges()

        batch_thread = threading.Thread(target=batch_writer_loop, daemon=True)
        batch_thread.start()
        self._batch_thread = batch_thread

    def _flush_pending_updates_sync(self):
        """Flush all pending edge updates to the database (synchronous version).

        Uses a single connection for the entire batch to avoid 'unable to open database file'
        when many edges are written in quick succession.
        Handles both new edges (INSERT) and existing edges (UPDATE).
        """
        with self.pending_lock:
            if not self.pending_updates:
                return
            updates = list(self.pending_updates)
            self.pending_updates.clear()

        location_cache: Dict[str, Tuple[float, float]] = {}
        try:
            with self.db_manager.connection() as conn:
                cursor = conn.cursor()
                for edge_key in updates:
                    if edge_key not in self.edges:
                        continue
                    edge = self.edges[edge_key]
                    # Recalculate distance if we have public keys
                    if edge.get('from_public_key') or edge.get('to_public_key'):
                        recalculated = self._recalculate_distance_if_needed(
                            edge, conn=conn, location_cache=location_cache
                        )
                        if recalculated is not None:
                            edge['geographic_distance'] = recalculated
                    # Check if the edge exists in the DB
                    cursor.execute(
                        'SELECT 1 FROM mesh_connections WHERE from_prefix = ? AND to_prefix = ?',
                        (edge_key[0], edge_key[1]),
                    )
                    is_new = cursor.fetchone() is None
                    # Distance was already recalculated above — tell _write_edge_to_db to skip it
                    self._write_edge_to_db(edge_key, is_new, conn=conn, location_cache=location_cache,
                                           skip_distance_recalc=True)
                conn.commit()
        except Exception as e:
            self.logger.warning(f"Error flushing graph updates: {e}")
            # Connection already closed by context manager; rollback happened on exit if needed

        if updates:
            self.logger.debug(f"Flushed {len(updates)} pending graph edge updates")

    async def _flush_pending_updates(self):
        """Flush all pending edge updates to the database (async wrapper)."""
        self._flush_pending_updates_sync()

    def has_edge(self, from_prefix: str, to_prefix: str) -> bool:
        """Check if an edge exists in the graph (exact or prefix match).

        Args:
            from_prefix: Source node prefix.
            to_prefix: Destination node prefix.

        Returns:
            bool: True if the edge exists.
        """
        return self.get_edge(from_prefix, to_prefix) is not None

    def get_edge(self, from_prefix: str, to_prefix: str) -> Optional[Dict]:
        """Get edge data if it exists (exact key first, then prefix match).

        Args:
            from_prefix: Source node prefix.
            to_prefix: Destination node prefix.

        Returns:
            Dict with edge data, or None if not found.
        """
        from_norm = from_prefix.lower().strip() if from_prefix else ""
        to_norm = to_prefix.lower().strip() if to_prefix else ""
        if not from_norm or not to_norm:
            return None
        # Exact key first
        exact = self.edges.get((from_norm, to_norm))
        if exact is not None:
            return exact
        return self._get_edge_by_prefix_match(from_norm, to_norm)

    def get_outgoing_edges(self, prefix: str) -> List[Dict]:
        """Get all edges originating from a node (prefix match on from_prefix).

        Args:
            prefix: Node prefix (2, 4, or 6 hex chars).

        Returns:
            List of edge dictionaries.
        """
        prefix = prefix.lower().strip() if prefix else ""
        if not prefix:
            return []
        result = []
        for edge in self.edges.values():
            if self._prefix_match(edge['from_prefix'], prefix):
                result.append(edge)
        return result

    def get_incoming_edges(self, prefix: str) -> List[Dict]:
        """Get all edges ending at a node (prefix match on to_prefix).

        Args:
            prefix: Node prefix (2, 4, or 6 hex chars).

        Returns:
            List of edge dictionaries.
        """
        prefix = prefix.lower().strip() if prefix else ""
        if not prefix:
            return []
        result = []
        for edge in self.edges.values():
            if self._prefix_match(edge['to_prefix'], prefix):
                result.append(edge)
        return result

    def validate_path_segment(self, from_prefix: str, to_prefix: str,
                              min_observations: int = 1,
                              check_bidirectional: bool = False) -> Tuple[bool, float]:
        """Validate a path segment using graph data.

        Args:
            from_prefix: Source node prefix.
            to_prefix: Destination node prefix.
            min_observations: Minimum observations required for confidence.
            check_bidirectional: If True, check if the reverse edge exists and boost confidence.

        Returns:
            Tuple of (is_valid, confidence_score) where confidence is 0.0-1.0.
        """
        edge = self.get_edge(from_prefix, to_prefix)

        if not edge:
            return (False, 0.0)

        if edge['observation_count'] < min_observations:
            return (False, 0.0)

        # Confidence based on observation count and recency
        obs_count = edge['observation_count']
        last_seen = edge['last_seen']

        if isinstance(last_seen, str):
            last_seen = datetime.fromisoformat(last_seen.replace('Z', '+00:00'))

        hours_ago = (datetime.now() - last_seen).total_seconds() / 3600.0

        # Observation count confidence (logarithmic scale)
        obs_confidence = min(1.0, 0.3 + (0.7 * (1.0 - 1.0 / (1.0 + obs_count / 10.0))))

        # Recency confidence (exponential decay, 48 hour half-life for longer advert intervals)
        recency_confidence = 1.0 if hours_ago < 1 else max(0.0, 2.0 ** (-hours_ago / 48.0))

        # Combined confidence
        confidence = (obs_confidence * 0.6) + (recency_confidence * 0.4)
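        # Worked example: an edge with obs_count=10 last seen 48 hours ago gives
        #   obs_confidence     = 0.3 + 0.7 * (1 - 1/(1 + 10/10)) = 0.65
        #   recency_confidence = 2 ** (-48/48)                   = 0.50
        #   confidence         = 0.65*0.6 + 0.50*0.4             = 0.59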

        # Bidirectional edge bonus
        if check_bidirectional:
            reverse_edge = self.get_edge(to_prefix, from_prefix)
            if reverse_edge and reverse_edge['observation_count'] >= min_observations:
                # Bidirectional connection is more reliable
                confidence = min(1.0, confidence + 0.15)

        return (True, confidence)

    def validate_path(self, path_nodes: List[str], min_observations: int = 1) -> Tuple[bool, float]:
        """Validate an entire path using graph data.

        Args:
            path_nodes: List of node prefixes in path order.
            min_observations: Minimum observations required per edge.

        Returns:
            Tuple of (is_valid, average_confidence).
        """
        if len(path_nodes) < 2:
            return (True, 1.0)  # Single node or empty path is always valid

        validations = []
        for i in range(len(path_nodes) - 1):
            from_node = path_nodes[i]
            to_node = path_nodes[i + 1]
            is_valid, confidence = self.validate_path_segment(from_node, to_node, min_observations)

            if not is_valid:
                return (False, 0.0)

            validations.append(confidence)

        # Return average confidence
        avg_confidence = sum(validations) / len(validations) if validations else 0.0
        return (True, avg_confidence)

    def get_candidate_score(self, candidate_prefix: str, prev_prefix: Optional[str],
                            next_prefix: Optional[str], min_observations: int = 1,
                            hop_position: Optional[int] = None,
                            use_bidirectional: bool = True,
                            use_hop_position: bool = True) -> float:
        """Get graph-based score for a candidate node in a path.

        Args:
            candidate_prefix: The candidate node prefix.
            prev_prefix: Previous node in path (if available).
            next_prefix: Next node in path (if available).
            min_observations: Minimum observations required.
            hop_position: Current position in path (0-based index) for hop position validation.
            use_bidirectional: If True, check bidirectional edges for higher confidence.
            use_hop_position: If True, validate against avg_hop_position if available.

        Returns:
            Score from 0.0 to 1.0 based on graph evidence.
        """
        score = 0.0
        evidence_count = 0

        # Check edge from previous node
        if prev_prefix:
            is_valid, confidence = self.validate_path_segment(
                prev_prefix, candidate_prefix, min_observations,
                check_bidirectional=use_bidirectional
            )
            if is_valid:
                score += confidence
                evidence_count += 1

        # Check edge to next node
        if next_prefix:
            is_valid, confidence = self.validate_path_segment(
                candidate_prefix, next_prefix, min_observations,
                check_bidirectional=use_bidirectional
            )
            if is_valid:
                score += confidence
                evidence_count += 1

        if evidence_count == 0:
            return 0.0

        # Calculate base score as average
        base_score = score / evidence_count

        # Hop position validation bonus
        if use_hop_position and hop_position is not None:
            # Check if the candidate appears in the expected position based on
            # avg_hop_position; look at both incoming and outgoing edges.
            hop_position_match = False

            if prev_prefix:
                edge = self.get_edge(prev_prefix, candidate_prefix)
                if edge and edge.get('avg_hop_position') is not None:
                    # Allow some tolerance (within 0.5 of expected position)
                    expected_pos = edge['avg_hop_position']
                    if abs(hop_position - expected_pos) <= 0.5:
                        hop_position_match = True

            if not hop_position_match and next_prefix:
                edge = self.get_edge(candidate_prefix, next_prefix)
                if edge and edge.get('avg_hop_position') is not None:
                    # For an outgoing edge, the expected position is one less
                    # (since the candidate is the from node)
                    expected_pos = edge['avg_hop_position'] - 1
                    if abs(hop_position - expected_pos) <= 0.5:
                        hop_position_match = True

            if hop_position_match:
                base_score = min(1.0, base_score + 0.1)

        # Geographic distance validation (if available).
        # Use stored geographic_distance from edges when available (more accurate).
        if prev_prefix or next_prefix:
            # Geographic distance data is informational: we don't heavily penalize
            # based on distance alone, but it can serve as a tie-breaker.
            geographic_available = False
            if prev_prefix:
                edge = self.get_edge(prev_prefix, candidate_prefix)
                if edge and edge.get('geographic_distance') is not None:
                    geographic_available = True
            if not geographic_available and next_prefix:
                edge = self.get_edge(candidate_prefix, next_prefix)
                if edge and edge.get('geographic_distance') is not None:
                    geographic_available = True

            # Having geographic data increases confidence slightly (indicates a well-tracked edge)
            if geographic_available:
                base_score = min(1.0, base_score + 0.05)

        return base_score

    def find_intermediate_nodes(self, from_prefix: str, to_prefix: str,
                                min_observations: int = 1,
                                max_hops: int = 2) -> List[Tuple[str, float]]:
        """Find intermediate nodes that connect from_prefix to to_prefix.

        Uses multi-hop path inference to find nodes that connect two prefixes
        when a direct edge may not exist or has low confidence.

        Args:
            from_prefix: Source node prefix.
            to_prefix: Destination node prefix.
            min_observations: Minimum observations required per edge.
            max_hops: Maximum number of hops to search (default: 2, fallback to 3).

        Returns:
            List of (candidate_prefix, score) tuples sorted by score (highest first).
            Score is 0.0-1.0 based on path strength.
        """
        from_prefix = from_prefix.lower().strip() if from_prefix else ""
        to_prefix = to_prefix.lower().strip() if to_prefix else ""
        if not from_prefix or not to_prefix:
            return []

        candidates: Dict[str, float] = {}

        # Try 2-hop paths first: from_prefix -> intermediate -> to_prefix
        outgoing_edges = self.get_outgoing_edges(from_prefix)

        for edge in outgoing_edges:
            intermediate_prefix = edge['to_prefix']

            # Skip if this is the destination (direct edge case)
            if intermediate_prefix == to_prefix:
                continue

            # Check if the intermediate connects to the destination
            to_edge = self.get_edge(intermediate_prefix, to_prefix)
            if not to_edge or to_edge['observation_count'] < min_observations:
                continue

            # Validate both edges
            from_valid, from_confidence = self.validate_path_segment(
                from_prefix, intermediate_prefix, min_observations,
                check_bidirectional=True
            )
            to_valid, to_confidence = self.validate_path_segment(
                intermediate_prefix, to_prefix, min_observations,
                check_bidirectional=True
            )

            if from_valid and to_valid:
                # Score is the minimum of both edges (weakest link)
                path_score = min(from_confidence, to_confidence)

                # Bidirectional path bonus
                reverse_from = self.get_edge(intermediate_prefix, from_prefix)
                reverse_to = self.get_edge(to_prefix, intermediate_prefix)
                bidirectional_bonus = 1.0
                if reverse_from and reverse_from['observation_count'] >= min_observations:
                    if reverse_to and reverse_to['observation_count'] >= min_observations:
                        # Both edges are bidirectional — strong evidence
                        bidirectional_bonus = 1.2
                    else:
                        bidirectional_bonus = 1.1
                elif reverse_to and reverse_to['observation_count'] >= min_observations:
                    bidirectional_bonus = 1.1

                path_score = min(1.0, path_score * bidirectional_bonus)

                # Use the best score if we've seen this candidate before
                if intermediate_prefix not in candidates or path_score > candidates[intermediate_prefix]:
                    candidates[intermediate_prefix] = path_score

        # If no 2-hop paths were found and max_hops >= 3, try 3-hop paths
        if not candidates and max_hops >= 3:
            # Find 3-hop paths: from_prefix -> intermediate1 -> intermediate2 -> to_prefix
            for edge1 in outgoing_edges:
                intermediate1 = edge1['to_prefix']
                if intermediate1 == to_prefix:
                    continue

                # Get edges from intermediate1
                intermediate1_edges = self.get_outgoing_edges(intermediate1)

                for edge2 in intermediate1_edges:
                    intermediate2 = edge2['to_prefix']
                    if intermediate2 == from_prefix or intermediate2 == intermediate1:
                        continue

                    # Check if intermediate2 connects to the destination
                    to_edge = self.get_edge(intermediate2, to_prefix)
                    if not to_edge or to_edge['observation_count'] < min_observations:
                        continue

                    # Validate all three edges
                    valid1, conf1 = self.validate_path_segment(
                        from_prefix, intermediate1, min_observations
                    )
                    valid2, conf2 = self.validate_path_segment(
                        intermediate1, intermediate2, min_observations
                    )
                    valid3, conf3 = self.validate_path_segment(
                        intermediate2, to_prefix, min_observations
                    )

                    if valid1 and valid2 and valid3:
                        # Score is the minimum of all three edges
                        path_score = min(conf1, conf2, conf3)

                        # 3-hop paths are less reliable, so reduce the score
                        path_score *= 0.8

                        # Use intermediate2 as the candidate (the one before the destination)
                        if intermediate2 not in candidates or path_score > candidates[intermediate2]:
                            candidates[intermediate2] = path_score

        # Sort by score (highest first) and return
        sorted_candidates = sorted(candidates.items(), key=lambda x: x[1], reverse=True)
        return sorted_candidates

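    # Illustrative call (prefixes and scores below are hypothetical): if 'abcd' -> '10ef'
    # has no direct edge,
    #   graph.find_intermediate_nodes('abcd', '10ef', min_observations=2)
    # might return [('7e42', 0.81), ('86', 0.54)], best intermediate first.
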
    def shutdown(self):
        """Shutdown the graph, flushing all pending writes."""
        # Do not log here: atexit may run after the logger's stream is closed.
        # Signal shutdown
        self._shutdown_event.set()

        # Flush pending updates
        try:
            self._flush_pending_updates_sync()
        except Exception:
            pass  # Avoid logging; stream may be closed during atexit