mirror of
https://github.com/agessaman/meshcore-bot.git
synced 2026-03-30 20:15:40 +00:00
- Updated `config.ini.example` to include a new option for additional hashtag channels to decode in the packet stream. - Modified `BotDataViewer` to retrieve and display additional decode-only channels from the configuration. - Improved packet handling in `message_handler.py` to capture full packet data for web viewer integration. - Enhanced the web viewer's JavaScript to support detailed packet analysis and display, including color-coded hex breakdowns and improved user interface elements. - Added new styles and scripts to the web viewer templates for better visual representation of packet data and improved user experience.
2115 lines
109 KiB
Python
2115 lines
109 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Message handling functionality for the MeshCore Bot
|
|
Processes incoming messages and routes them to appropriate command handlers
|
|
"""
|
|
|
|
import asyncio
|
|
import time
|
|
import json
|
|
import re
|
|
import copy
|
|
from typing import List, Optional, Dict, Any
|
|
from meshcore import EventType
|
|
|
|
from .models import MeshMessage
|
|
from .enums import PayloadType, PayloadVersion, RouteType, AdvertFlags, DeviceRole
|
|
from .utils import calculate_packet_hash
|
|
from .security_utils import sanitize_input
|
|
|
|
|
|
class MessageHandler:
|
|
"""Handles incoming messages and routes them to command processors.
|
|
|
|
This class is responsible for processing various types of MeshCore events,
|
|
including contact messages (DMs), raw data packets, advertisement packets,
|
|
and RF log data. It also maintains caches for SNR/RSSI data and correlates
|
|
messages with routing information.
|
|
"""
|
|
|
|
def __init__(self, bot):
|
|
self.bot = bot
|
|
self.logger = bot.logger
|
|
# Cache for storing SNR and RSSI data from RF log events
|
|
self.snr_cache = {}
|
|
self.rssi_cache = {}
|
|
|
|
# Load configuration for RF data correlation
|
|
self.rf_data_timeout = float(bot.config.get('Bot', 'rf_data_timeout', fallback='15.0'))
|
|
self.message_timeout = float(bot.config.get('Bot', 'message_correlation_timeout', fallback='10.0'))
|
|
self.enhanced_correlation = bot.config.getboolean('Bot', 'enable_enhanced_correlation', fallback=True)
|
|
|
|
# Time-based cache for recent RF log data
|
|
self.recent_rf_data = []
|
|
|
|
# Message correlation system to prevent race conditions
|
|
self.pending_messages = {} # Store messages waiting for RF data
|
|
|
|
# Enhanced RF data storage with better correlation
|
|
self.rf_data_by_timestamp = {} # Index by timestamp for faster lookup
|
|
self.rf_data_by_pubkey = {} # Index by pubkey for exact matches
|
|
|
|
# Cache memory management
|
|
self._max_rf_cache_size = 1000 # Maximum entries per cache
|
|
self._cache_cleanup_interval = 60 # Cleanup every 60 seconds
|
|
self._last_cache_cleanup = time.time()
|
|
|
|
# Multitest command listener (for collecting paths during listening window)
|
|
self.multitest_listener = None
|
|
|
|
self.logger.info(f"RF Data Correlation: timeout={self.rf_data_timeout}s, enhanced={self.enhanced_correlation}")
|
|
|
|
async def handle_contact_message(self, event, metadata=None):
|
|
"""Handle incoming contact message (DM).
|
|
|
|
Processes direct messages, extracts path information, correlates with
|
|
RF data for signal metrics (SNR/RSSI), and forwards to the command processor.
|
|
|
|
Args:
|
|
event: The MeshCore event object containing the message payload.
|
|
metadata: Optional metadata dictionary associated with the event.
|
|
"""
|
|
try:
|
|
# Copy payload immediately to avoid segfault if event is freed
|
|
import copy
|
|
payload = copy.deepcopy(event.payload) if hasattr(event, 'payload') else None
|
|
if payload is None:
|
|
self.logger.warning("Contact message event has no payload")
|
|
return
|
|
|
|
# Debug: Log the full payload structure
|
|
self.logger.debug(f"Contact message payload: {payload}")
|
|
self.logger.debug(f"Payload keys: {list(payload.keys())}")
|
|
self.logger.debug(f"Event metadata: {event.metadata if hasattr(event, 'metadata') else 'None'}")
|
|
|
|
self.logger.info(f"Received DM from {payload.get('pubkey_prefix', 'unknown')}: {payload.get('text', '')}")
|
|
|
|
# Extract path information from contacts using pubkey_prefix
|
|
path_info = "Unknown"
|
|
path_len = payload.get('path_len', 255)
|
|
|
|
if metadata and 'pubkey_prefix' in metadata:
|
|
pubkey_prefix = metadata.get('pubkey_prefix', '')
|
|
if pubkey_prefix:
|
|
self.logger.debug(f"Looking up path for pubkey_prefix: {pubkey_prefix}")
|
|
|
|
# Look up the contact to get path information
|
|
if hasattr(self.bot.meshcore, 'contacts') and self.bot.meshcore.contacts:
|
|
for contact_key, contact_data in self.bot.meshcore.contacts.items():
|
|
if contact_data.get('public_key', '').startswith(pubkey_prefix):
|
|
out_path = contact_data.get('out_path', '')
|
|
out_path_len = contact_data.get('out_path_len', -1)
|
|
|
|
if out_path and out_path_len > 0:
|
|
# Convert hex path to readable node IDs using first 2 chars of pubkey
|
|
try:
|
|
path_bytes = bytes.fromhex(out_path)
|
|
path_nodes = []
|
|
for i in range(0, len(path_bytes), 2):
|
|
if i + 1 < len(path_bytes):
|
|
node_id = int.from_bytes(path_bytes[i:i+2], byteorder='little')
|
|
# Convert to 2-character hex representation
|
|
path_nodes.append(f"{node_id:02x}")
|
|
|
|
path_info = f"{','.join(path_nodes)} ({out_path_len} hops)"
|
|
self.logger.debug(f"Found path info: {path_info}")
|
|
except Exception as e:
|
|
self.logger.debug(f"Error converting path: {e}")
|
|
path_info = f"Path: {out_path} ({out_path_len} hops)"
|
|
break
|
|
elif out_path_len == 0:
|
|
path_info = "Direct"
|
|
self.logger.debug(f"Direct connection: {path_info}")
|
|
break
|
|
else:
|
|
path_info = "Unknown path"
|
|
self.logger.debug(f"No path info available: {path_info}")
|
|
break
|
|
|
|
# Fallback to basic path logic if no detailed info found
|
|
if path_info == "Unknown":
|
|
if path_len == 255:
|
|
path_info = "Direct"
|
|
elif path_len > 0:
|
|
path_info = f"Routed ({path_len} hops)"
|
|
elif path_len == 0:
|
|
path_info = "Direct"
|
|
|
|
# Try to decode packet and extract routing information from stored raw data
|
|
decoded_packet = None
|
|
routing_info = None
|
|
# Look for raw packet data in recent RF data
|
|
# Extract packet prefix from message raw_hex for correlation
|
|
message_raw_hex = payload.get('raw_hex', '')
|
|
message_packet_prefix = message_raw_hex[:32] if message_raw_hex else None
|
|
message_pubkey = payload.get('pubkey_prefix', '') # Keep for contact lookup
|
|
|
|
if message_packet_prefix:
|
|
recent_rf_data = self.find_recent_rf_data(message_packet_prefix)
|
|
elif message_pubkey:
|
|
# Fallback to pubkey correlation if no raw_hex
|
|
recent_rf_data = self.find_recent_rf_data(message_pubkey)
|
|
if recent_rf_data and recent_rf_data.get('raw_hex'):
|
|
# Use payload field if available, otherwise fall back to raw_hex
|
|
payload_hex = recent_rf_data.get('payload')
|
|
decoded_packet = self.decode_meshcore_packet(recent_rf_data['raw_hex'], payload_hex)
|
|
if decoded_packet:
|
|
self.logger.debug(f"Decoded packet for routing from RF data: {decoded_packet}")
|
|
|
|
# Extract routing information
|
|
if recent_rf_data.get('routing_info'):
|
|
routing_info = recent_rf_data['routing_info']
|
|
self.logger.debug(f"Found routing info: {routing_info}")
|
|
|
|
# If we have routing info, use it for path information
|
|
if routing_info:
|
|
path_len = routing_info.get('path_length', 0)
|
|
if path_len > 0:
|
|
path_hex = routing_info.get('path_hex', '')
|
|
path_nodes = routing_info.get('path_nodes', [])
|
|
route_type = routing_info.get('route_type', 'Unknown')
|
|
|
|
# Convert path to readable format
|
|
if path_nodes:
|
|
path_info = f"{','.join(path_nodes)} ({path_len} hops via {route_type})"
|
|
else:
|
|
path_info = f"Path: {path_hex} ({path_len} hops via {route_type})"
|
|
|
|
self.logger.info(f"🛣️ MESSAGE ROUTING: {path_info}")
|
|
else:
|
|
path_info = f"Direct via {routing_info.get('route_type', 'Unknown')}"
|
|
self.logger.info(f"📡 DIRECT MESSAGE: {path_info}")
|
|
|
|
# Get additional metadata - try multiple sources for SNR and RSSI
|
|
snr = 'unknown'
|
|
rssi = 'unknown'
|
|
|
|
# Try to get SNR from payload first - check multiple possible field names
|
|
if 'SNR' in payload:
|
|
snr = payload.get('SNR')
|
|
elif 'snr' in payload:
|
|
snr = payload.get('snr')
|
|
elif 'signal_to_noise' in payload:
|
|
snr = payload.get('signal_to_noise')
|
|
elif 'signal_noise_ratio' in payload:
|
|
snr = payload.get('signal_noise_ratio')
|
|
# Try to get SNR from event metadata if available
|
|
elif metadata:
|
|
if 'snr' in metadata:
|
|
snr = metadata.get('snr')
|
|
elif 'SNR' in metadata:
|
|
snr = metadata.get('SNR')
|
|
|
|
# If still no SNR, try to get it from the cache using pubkey prefix from payload
|
|
if snr == 'unknown':
|
|
pubkey_prefix = payload.get('pubkey_prefix', '')
|
|
if pubkey_prefix and pubkey_prefix in self.snr_cache:
|
|
snr = self.snr_cache[pubkey_prefix]
|
|
self.logger.debug(f"Retrieved cached SNR {snr} for pubkey {pubkey_prefix}")
|
|
|
|
# Try to get RSSI from payload first
|
|
if 'RSSI' in payload:
|
|
rssi = payload.get('RSSI')
|
|
elif 'rssi' in payload:
|
|
rssi = payload.get('rssi')
|
|
elif 'signal_strength' in payload:
|
|
rssi = payload.get('signal_strength')
|
|
# Try to get RSSI from event metadata if available
|
|
elif metadata:
|
|
if 'rssi' in metadata:
|
|
rssi = metadata.get('rssi')
|
|
elif 'RSSI' in metadata:
|
|
rssi = metadata.get('RSSI')
|
|
|
|
# If still no RSSI, try to get it from the cache using pubkey prefix from payload
|
|
if rssi == 'unknown':
|
|
pubkey_prefix = payload.get('pubkey_prefix', '')
|
|
if pubkey_prefix and pubkey_prefix in self.rssi_cache:
|
|
rssi = self.rssi_cache[pubkey_prefix]
|
|
self.logger.debug(f"Retrieved cached RSSI {rssi} for pubkey {pubkey_prefix}")
|
|
|
|
# For DMs, we can't decode the encrypted packet, but we can get SNR/RSSI from the payload
|
|
# For channel messages, we can decode the packet since they use shared keys
|
|
self.logger.debug(f"Processing DM from packet prefix: {message_packet_prefix}, pubkey: {message_pubkey}")
|
|
|
|
# DMs are encrypted with recipient's public key, so we can't decode the raw packet
|
|
# But we can get SNR/RSSI from the message payload if available
|
|
if 'SNR' in payload:
|
|
snr = payload.get('SNR')
|
|
self.logger.debug(f"Using SNR from DM payload: {snr}")
|
|
elif 'snr' in payload:
|
|
snr = payload.get('snr')
|
|
self.logger.debug(f"Using SNR from DM payload: {snr}")
|
|
|
|
if 'RSSI' in payload:
|
|
rssi = payload.get('RSSI')
|
|
self.logger.debug(f"Using RSSI from DM payload: {rssi}")
|
|
elif 'rssi' in payload:
|
|
rssi = payload.get('rssi')
|
|
self.logger.debug(f"Using RSSI from DM payload: {rssi}")
|
|
|
|
# Since DMs don't include SNR/RSSI in payload, try to get it from recent RF data
|
|
# This is a fallback since RF data often comes right before/after the message
|
|
if snr == 'unknown' or rssi == 'unknown':
|
|
recent_rf_data = self.find_recent_rf_data()
|
|
if recent_rf_data:
|
|
self.logger.debug(f"Found recent RF data for DM: {recent_rf_data}")
|
|
|
|
if snr == 'unknown' and recent_rf_data.get('snr'):
|
|
snr = recent_rf_data['snr']
|
|
self.logger.debug(f"Using SNR from recent RF data: {snr}")
|
|
|
|
if rssi == 'unknown' and recent_rf_data.get('rssi'):
|
|
rssi = recent_rf_data['rssi']
|
|
self.logger.debug(f"Using RSSI from recent RF data: {rssi}")
|
|
|
|
# For DMs, we can't determine the actual routing path from encrypted data
|
|
# Use the path_len from the payload (255 means unknown/direct)
|
|
path_len = payload.get('path_len', 255)
|
|
if path_len == 255:
|
|
path_info = "Direct (0 hops)"
|
|
else:
|
|
path_info = f"Routed through {path_len} hops"
|
|
|
|
self.logger.debug(f"DM path info: {path_info}")
|
|
|
|
timestamp = payload.get('sender_timestamp', 'unknown')
|
|
|
|
# Look up contact name from pubkey prefix
|
|
sender_id = payload.get('pubkey_prefix', '')
|
|
sender_name = sender_id # Default to sender_id
|
|
if hasattr(self.bot.meshcore, 'contacts') and self.bot.meshcore.contacts:
|
|
for contact_key, contact_data in self.bot.meshcore.contacts.items():
|
|
if contact_data.get('public_key', '').startswith(sender_id):
|
|
# Use the contact name if available, otherwise use adv_name
|
|
contact_name = contact_data.get('name', contact_data.get('adv_name', sender_id))
|
|
sender_name = contact_name
|
|
break
|
|
|
|
# Get the full public key from contacts if available
|
|
sender_pubkey = payload.get('pubkey_prefix', '')
|
|
sender_pubkey = sender_id # Default to sender_id
|
|
if hasattr(self.bot.meshcore, 'contacts') and self.bot.meshcore.contacts:
|
|
for contact_key, contact_data in self.bot.meshcore.contacts.items():
|
|
if contact_data.get('public_key', '').startswith(sender_id):
|
|
# Use the full public key from the contact
|
|
sender_pubkey = contact_data.get('public_key', sender_id)
|
|
self.logger.debug(f"Found full public key for {sender_name}: {sender_pubkey[:16]}...")
|
|
break
|
|
|
|
# Sanitize message content to prevent injection attacks
|
|
# Note: Firmware enforces 150-char limit at hardware level, so we disable length check
|
|
# but still strip control characters for security
|
|
message_content = payload.get('text', '')
|
|
message_content = sanitize_input(message_content, max_length=None, strip_controls=True)
|
|
|
|
# Format timestamp
|
|
if timestamp and timestamp != 'unknown':
|
|
try:
|
|
from datetime import datetime,UTC
|
|
dt = datetime.fromtimestamp(message.timestamp)
|
|
elapsed_str = round((datetime.now(UTC).timestamp()-message.timestamp)*1000)
|
|
except:
|
|
elapsed_str = "Unknown"
|
|
else:
|
|
elapsed_str = "Unknown"
|
|
|
|
# Convert to our message format
|
|
message = MeshMessage(
|
|
content=message_content,
|
|
sender_id=sender_name,
|
|
sender_pubkey=sender_pubkey,
|
|
is_dm=True,
|
|
timestamp=timestamp,
|
|
snr=snr,
|
|
rssi=rssi,
|
|
elapsed=elapsed_str,
|
|
hops=path_len if path_len != 255 else 0,
|
|
path=path_info
|
|
)
|
|
|
|
# Always decode and log path information for debugging (regardless of keywords)
|
|
recent_rf_data = self.find_recent_rf_data()
|
|
|
|
# If we have RF data with routing information, update the path with that instead
|
|
if recent_rf_data and recent_rf_data.get('routing_info'):
|
|
rf_routing = recent_rf_data['routing_info']
|
|
if rf_routing.get('path_length', 0) > 0:
|
|
path_nodes = rf_routing.get('path_nodes', [])
|
|
route_type = rf_routing.get('route_type', 'Unknown')
|
|
if path_nodes:
|
|
message.path = f"{','.join(path_nodes)} ({len(path_nodes)} hops via {route_type})"
|
|
self.logger.info(f"🛣️ CONTACT USING RF ROUTING: {message.path}")
|
|
else:
|
|
message.path = f"{rf_routing.get('path_hex', 'Unknown')} ({rf_routing.get('path_length', 0)} hops via {route_type})"
|
|
self.logger.info(f"🛣️ CONTACT USING RF ROUTING: {message.path}")
|
|
else:
|
|
message.path = f"Direct via {rf_routing.get('route_type', 'Unknown')}"
|
|
self.logger.info(f"📡 CONTACT USING RF ROUTING: {message.path}")
|
|
|
|
await self._debug_decode_message_path(message, sender_id, recent_rf_data)
|
|
|
|
# Always attempt packet decoding and log the results for debugging
|
|
await self._debug_decode_packet_for_message(message, sender_id, recent_rf_data)
|
|
|
|
await self.process_message(message)
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error handling contact message: {e}")
|
|
|
|
async def handle_raw_data(self, event, metadata=None):
|
|
"""Handle raw data events (full packet data from debug mode).
|
|
|
|
Processes raw packet data, attempts to decode it, and if successful,
|
|
checking if it's an advertisement packet to track.
|
|
|
|
Args:
|
|
event: The MeshCore event object containing the raw data payload.
|
|
metadata: Optional metadata dictionary.
|
|
"""
|
|
try:
|
|
# Copy payload immediately to avoid segfault if event is freed
|
|
# Make a deep copy to ensure we have all the data we need
|
|
payload = copy.deepcopy(event.payload) if hasattr(event, 'payload') else None
|
|
if payload is None:
|
|
self.logger.warning("RAW_DATA event has no payload")
|
|
return
|
|
|
|
self.logger.info(f"📦 RAW_DATA EVENT RECEIVED: {payload}")
|
|
self.logger.info(f"📦 Event type: {type(event)}")
|
|
self.logger.info(f"📦 Metadata: {metadata}")
|
|
|
|
# This should contain the full packet data we need
|
|
if hasattr(payload, 'data') or 'data' in payload:
|
|
raw_data = payload.get('data', payload.data if hasattr(payload, 'data') else None)
|
|
if raw_data:
|
|
self.logger.info(f"🔍 FULL PACKET DATA: {raw_data}")
|
|
|
|
# Try to decode this as a MeshCore packet
|
|
if isinstance(raw_data, str):
|
|
# Convert to hex if it's not already
|
|
if not raw_data.startswith('0x'):
|
|
raw_hex = raw_data
|
|
else:
|
|
raw_hex = raw_data[2:] # Remove 0x prefix
|
|
|
|
# Decode the packet
|
|
packet_info = self.decode_meshcore_packet(raw_hex)
|
|
if packet_info:
|
|
self.logger.info(f"✅ SUCCESSFULLY DECODED RAW PACKET: {packet_info}")
|
|
|
|
# Check if this is an advertisement packet and track it
|
|
await self._process_advertisement_packet(packet_info, metadata)
|
|
else:
|
|
self.logger.warning("❌ Failed to decode raw packet data")
|
|
else:
|
|
self.logger.warning(f"❌ Unexpected raw data type: {type(raw_data)}")
|
|
else:
|
|
self.logger.warning("❌ No data field in RAW_DATA event")
|
|
else:
|
|
self.logger.warning(f"❌ Unexpected RAW_DATA payload structure: {payload}")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error handling raw data event: {e}")
|
|
import traceback
|
|
self.logger.error(traceback.format_exc())
|
|
|
|
async def _process_advertisement_packet(self, packet_info: Dict, metadata=None):
|
|
"""Process advertisement packets for complete repeater tracking.
|
|
|
|
Extracts node information, location data, and routing path from
|
|
advertisement packets and updates the repeater database.
|
|
|
|
Args:
|
|
packet_info: Dictionary containing decoded packet information.
|
|
metadata: Optional metadata dictionary with signal metrics.
|
|
"""
|
|
try:
|
|
# Check if this is an advertisement packet
|
|
if (packet_info.get('payload_type') == 'ADVERT' or
|
|
packet_info.get('payload_type_name') == 'ADVERT' or
|
|
packet_info.get('type') == 'advert'):
|
|
self.logger.debug(f"Processing advertisement packet: {packet_info}")
|
|
|
|
# Parse the advert payload if we have it
|
|
advert_data = {}
|
|
if 'payload_hex' in packet_info:
|
|
try:
|
|
payload_bytes = bytes.fromhex(packet_info['payload_hex'])
|
|
parsed_advert = self.parse_advert(payload_bytes)
|
|
if parsed_advert:
|
|
advert_data = parsed_advert
|
|
self.logger.info(f"✅ Parsed ADVERT: {advert_data.get('mode', 'Unknown')} - {advert_data.get('name', 'No name')}")
|
|
except Exception as e:
|
|
self.logger.warning(f"Failed to parse ADVERT payload: {e}")
|
|
|
|
# Fallback to basic information if parsing failed
|
|
if not advert_data:
|
|
advert_data = {
|
|
'public_key': packet_info.get('sender_id', ''),
|
|
'name': packet_info.get('name', packet_info.get('adv_name', 'Unknown')),
|
|
'mode': 'Unknown'
|
|
}
|
|
|
|
# Add advert data to packet_info for web viewer
|
|
if advert_data:
|
|
packet_info['advert_name'] = advert_data.get('name')
|
|
packet_info['advert_mode'] = advert_data.get('mode')
|
|
packet_info['advert_public_key'] = advert_data.get('public_key')
|
|
|
|
# Extract signal information from metadata
|
|
signal_info = {}
|
|
if metadata:
|
|
signal_info.update(metadata)
|
|
|
|
# Add hop count if available
|
|
if 'hops' in packet_info:
|
|
signal_info['hops'] = packet_info['hops']
|
|
|
|
# Extract packet_hash and path information if available (from routing_info or packet_info)
|
|
packet_hash = None
|
|
out_path = ''
|
|
out_path_len = -1
|
|
|
|
if 'routing_info' in packet_info and packet_info['routing_info']:
|
|
routing_info = packet_info['routing_info']
|
|
packet_hash = routing_info.get('packet_hash')
|
|
# Extract path information from routing_info
|
|
path_hex = routing_info.get('path_hex', '')
|
|
path_length = routing_info.get('path_length', 0)
|
|
if path_hex and path_length > 0:
|
|
out_path = path_hex
|
|
out_path_len = path_length
|
|
elif path_length == 0:
|
|
# Direct connection
|
|
out_path = ''
|
|
out_path_len = 0
|
|
elif 'packet_hash' in packet_info:
|
|
packet_hash = packet_info['packet_hash']
|
|
|
|
# Also check packet_info directly for path information (fallback)
|
|
if out_path_len == -1:
|
|
if 'path_hex' in packet_info:
|
|
out_path = packet_info.get('path_hex', '')
|
|
out_path_len = packet_info.get('path_len', -1)
|
|
elif 'path_len' in packet_info:
|
|
out_path_len = packet_info.get('path_len', -1)
|
|
if out_path_len == 0:
|
|
out_path = ''
|
|
|
|
# Add path information to advert_data so it gets saved to the database
|
|
if out_path_len >= 0:
|
|
advert_data['out_path'] = out_path
|
|
advert_data['out_path_len'] = out_path_len
|
|
|
|
# Track this advertisement in the complete database
|
|
if hasattr(self.bot, 'repeater_manager'):
|
|
# Track all advertisements regardless of type
|
|
success = await self.bot.repeater_manager.track_contact_advertisement(
|
|
advert_data, signal_info, packet_hash=packet_hash
|
|
)
|
|
if success:
|
|
# Log rich advert information
|
|
mode = advert_data.get('mode', 'Unknown')
|
|
name = advert_data.get('name', 'No name')
|
|
location = ""
|
|
if 'lat' in advert_data and 'lon' in advert_data:
|
|
# Try to get resolved location from database if available
|
|
try:
|
|
if hasattr(self.bot, 'repeater_manager'):
|
|
# Look up the contact to get resolved location
|
|
public_key = advert_data.get('public_key')
|
|
if public_key:
|
|
contact_query = self.bot.db_manager.execute_query(
|
|
'SELECT city, state, country FROM complete_contact_tracking WHERE public_key = ?',
|
|
(public_key,)
|
|
)
|
|
if contact_query:
|
|
contact = contact_query[0]
|
|
city = contact.get('city')
|
|
state = contact.get('state')
|
|
if city and state:
|
|
location = f" at {city}, {state}"
|
|
elif city:
|
|
location = f" at {city}"
|
|
else:
|
|
# Fallback to coordinates if no resolved location
|
|
location = f" at {advert_data['lat']:.4f},{advert_data['lon']:.4f}"
|
|
else:
|
|
# No contact found yet, use coordinates
|
|
location = f" at {advert_data['lat']:.4f},{advert_data['lon']:.4f}"
|
|
else:
|
|
# No public key, use coordinates
|
|
location = f" at {advert_data['lat']:.4f},{advert_data['lon']:.4f}"
|
|
else:
|
|
# No repeater manager, use coordinates
|
|
location = f" at {advert_data['lat']:.4f},{advert_data['lon']:.4f}"
|
|
except Exception as e:
|
|
# If lookup fails, fallback to coordinates
|
|
self.logger.debug(f"Could not get resolved location for logging: {e}")
|
|
location = f" at {advert_data['lat']:.4f},{advert_data['lon']:.4f}"
|
|
|
|
# Show hop count in log
|
|
hop_count = signal_info.get('hops', 0)
|
|
hop_info = f" ({hop_count} hop{'s' if hop_count != 1 else ''})" if hop_count is not None else ""
|
|
|
|
self.logger.info(f"📡 Tracked {mode}: {name}{location}{hop_info}")
|
|
else:
|
|
self.logger.warning(f"Failed to track contact advertisement: {advert_data.get('name', 'Unknown')}")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error processing advertisement packet: {e}")
|
|
|
|
async def handle_rf_log_data(self, event, metadata=None):
|
|
"""Handle RF log data events to cache SNR information and store raw packet data.
|
|
|
|
Captures low-level RF information (SNR, RSSI) and raw packet data to
|
|
correlate with higher-level messages for detailed signal reporting.
|
|
|
|
Args:
|
|
event: The MeshCore event object containing RF data.
|
|
metadata: Optional metadata dictionary.
|
|
"""
|
|
try:
|
|
# Copy payload immediately to avoid segfault if event is freed
|
|
import copy
|
|
payload = copy.deepcopy(event.payload) if hasattr(event, 'payload') else None
|
|
if payload is None:
|
|
self.logger.warning("RF log data event has no payload")
|
|
return
|
|
|
|
# Extract SNR from payload
|
|
if 'snr' in payload:
|
|
snr_value = payload.get('snr')
|
|
|
|
# Use raw_hex prefix for correlation instead of trying to extract pubkey
|
|
raw_hex = payload.get('raw_hex', '')
|
|
packet_prefix = None
|
|
|
|
if raw_hex:
|
|
# Use first 32 characters as correlation key (16 bytes)
|
|
# This provides unique identification while being consistent
|
|
packet_prefix = raw_hex[:32]
|
|
self.logger.debug(f"Using packet prefix for correlation: {packet_prefix}")
|
|
|
|
# Keep pubkey_prefix for contact lookup (from metadata if available)
|
|
pubkey_prefix = None
|
|
if metadata and 'pubkey_prefix' in metadata:
|
|
pubkey_prefix = metadata.get('pubkey_prefix')
|
|
self.logger.debug(f"Got pubkey_prefix from metadata: {pubkey_prefix[:16]}...")
|
|
|
|
if packet_prefix and snr_value is not None:
|
|
# Cache the SNR value for this packet prefix
|
|
self.snr_cache[packet_prefix] = snr_value
|
|
self.logger.debug(f"Cached SNR {snr_value} for packet prefix {packet_prefix}")
|
|
|
|
# Extract and cache RSSI if available
|
|
if 'rssi' in payload:
|
|
rssi_value = payload.get('rssi')
|
|
if packet_prefix and rssi_value is not None:
|
|
# Cache the RSSI value for this packet prefix
|
|
self.rssi_cache[packet_prefix] = rssi_value
|
|
self.logger.debug(f"Cached RSSI {rssi_value} for packet prefix {packet_prefix}")
|
|
|
|
# Store recent RF data with timestamp for SNR/RSSI matching only
|
|
if packet_prefix:
|
|
import time
|
|
current_time = time.time()
|
|
|
|
# Store both raw packet data and extracted payload for analysis
|
|
raw_hex = payload.get('raw_hex', '')
|
|
extracted_payload = payload.get('payload', '')
|
|
payload_length = payload.get('payload_length', 0)
|
|
|
|
# Extract routing information from raw packet if available
|
|
routing_info = None
|
|
packet_hash = None
|
|
if raw_hex:
|
|
# Use extracted payload if available, otherwise use raw_hex
|
|
decoded_packet = self.decode_meshcore_packet(raw_hex, extracted_payload)
|
|
if decoded_packet:
|
|
# Calculate packet hash for this packet (useful for tracking same message via different paths)
|
|
# Use extracted_payload if available (actual MeshCore packet), otherwise use raw_hex
|
|
# This matches the logic in decode_meshcore_packet which prefers extracted_payload
|
|
# extracted_payload is the actual MeshCore packet without RF wrapper, so use it if available
|
|
packet_hex_for_hash = extracted_payload if (extracted_payload and len(extracted_payload) > 0) else raw_hex
|
|
|
|
# Ensure we use the numeric payload_type value (not enum or string)
|
|
payload_type_value = decoded_packet.get('payload_type', None)
|
|
if payload_type_value is not None:
|
|
# Handle enum.value if it's an enum
|
|
if hasattr(payload_type_value, 'value'):
|
|
payload_type_value = payload_type_value.value
|
|
payload_type_value = int(payload_type_value)
|
|
packet_hash = calculate_packet_hash(packet_hex_for_hash, payload_type_value)
|
|
|
|
routing_info = {
|
|
'path_length': decoded_packet.get('path_len', 0),
|
|
'path_hex': decoded_packet.get('path_hex', ''),
|
|
'path_nodes': decoded_packet.get('path', []),
|
|
'route_type': decoded_packet.get('route_type_name', 'Unknown'),
|
|
'payload_length': payload_length, # Use the actual payload length
|
|
'payload_type': decoded_packet.get('payload_type_name', 'Unknown'),
|
|
'packet_hash': packet_hash # Store hash for packet tracking
|
|
}
|
|
|
|
# Log the routing information for analysis
|
|
if routing_info['path_length'] > 0:
|
|
# Format path with comma separation (every 2 characters)
|
|
path_hex = routing_info['path_hex']
|
|
formatted_path = ','.join([path_hex[i:i+2] for i in range(0, len(path_hex), 2)])
|
|
log_message = f"🛣️ ROUTING INFO: {routing_info['route_type']} | Path: {formatted_path} ({routing_info['path_length']} bytes) | Payload: {routing_info['payload_length']} bytes | Type: {routing_info['payload_type']}"
|
|
self.logger.info(log_message)
|
|
else:
|
|
log_message = f"📡 DIRECT MESSAGE: {routing_info['route_type']} | Type: {routing_info['payload_type']}"
|
|
self.logger.info(log_message)
|
|
|
|
# Capture full packet data for web viewer (for all packets)
|
|
if (hasattr(self.bot, 'web_viewer_integration') and
|
|
self.bot.web_viewer_integration and
|
|
self.bot.web_viewer_integration.bot_integration):
|
|
# Use extracted_payload which is the full MeshCore packet
|
|
# (header + path_len + path + payload, without RF wrapper)
|
|
decoded_packet['raw_packet_hex'] = extracted_payload if extracted_payload else raw_hex
|
|
decoded_packet['packet_hash'] = packet_hash
|
|
self.bot.web_viewer_integration.bot_integration.capture_full_packet_data(decoded_packet)
|
|
|
|
# Process ADVERT packets for contact tracking (regardless of path length)
|
|
if routing_info['payload_type'] == 'ADVERT':
|
|
# Add routing_info to decoded_packet so it's available in _process_advertisement_packet
|
|
decoded_packet['routing_info'] = routing_info
|
|
# Create signal info from available data
|
|
signal_info = {
|
|
'snr': snr_value,
|
|
'rssi': payload.get('rssi') if 'rssi' in payload else None,
|
|
'hops': routing_info['path_length']
|
|
}
|
|
await self._process_advertisement_packet(decoded_packet, signal_info)
|
|
|
|
rf_data = {
|
|
'timestamp': current_time,
|
|
'packet_prefix': packet_prefix, # Use packet prefix for correlation
|
|
'pubkey_prefix': pubkey_prefix, # Keep for contact lookup
|
|
'snr': snr_value,
|
|
'rssi': payload.get('rssi') if 'rssi' in payload else None,
|
|
'raw_hex': raw_hex, # Full packet data
|
|
'payload': extracted_payload, # Extracted payload
|
|
'payload_length': payload_length, # Payload length
|
|
'routing_info': routing_info, # Extracted routing information
|
|
'packet_hash': packet_hash # Packet hash for tracking same message via different paths
|
|
}
|
|
self.recent_rf_data.append(rf_data)
|
|
|
|
# Update correlation indexes
|
|
self.rf_data_by_timestamp[current_time] = rf_data
|
|
if packet_prefix:
|
|
if packet_prefix not in self.rf_data_by_pubkey:
|
|
self.rf_data_by_pubkey[packet_prefix] = []
|
|
self.rf_data_by_pubkey[packet_prefix].append(rf_data)
|
|
|
|
# Clean up old data from all indexes
|
|
self._cleanup_stale_cache_entries(current_time)
|
|
|
|
# Try to correlate with any pending messages
|
|
self.try_correlate_pending_messages(rf_data)
|
|
|
|
self.logger.debug(f"Stored recent RF data with routing info: {rf_data}")
|
|
|
|
# Clean up old pending messages
|
|
self.cleanup_old_messages()
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error handling RF log data: {e}")
|
|
|
|
def extract_path_from_raw_hex(self, raw_hex: str, expected_hops: int) -> Optional[str]:
|
|
"""Extract path information directly from raw hex data.
|
|
|
|
Attempts to find a sequence of node IDs in the raw packet data that matches
|
|
the expected number of hops.
|
|
|
|
Args:
|
|
raw_hex: Raw packet data as a hex string.
|
|
expected_hops: The expected number of hops in the path.
|
|
|
|
Returns:
|
|
Optional[str]: Comma-separated path string if found, None otherwise.
|
|
"""
|
|
try:
|
|
if not raw_hex or len(raw_hex) < 20:
|
|
return None
|
|
|
|
# For 0-hop (direct) messages, don't try to extract a path
|
|
if expected_hops == 0:
|
|
self.logger.debug("Direct message (0 hops) - no path to extract")
|
|
return "Direct"
|
|
|
|
# Skip the header area - don't look for paths in the first 6-8 bytes
|
|
# Header (1 byte) + transport codes (2-4 bytes) + path length (1 byte) = 4-6 bytes minimum
|
|
min_start = 8 # Start looking after header + transport + path length
|
|
|
|
# Look for path patterns in the hex data, but skip the header area
|
|
# Based on the example: ea9a1503777e5fd5658eea506990ad18...
|
|
# The path 77,7e,5f appears to be at positions 6-11 (3 bytes = 6 hex chars)
|
|
|
|
# Try different positions where path might be located, but avoid header area
|
|
path_positions = [
|
|
(8, 14), # Position 8-13 (3 bytes)
|
|
(10, 16), # Position 10-15 (3 bytes)
|
|
(12, 18), # Position 12-17 (3 bytes)
|
|
(14, 20), # Position 14-19 (3 bytes)
|
|
]
|
|
|
|
for start, end in path_positions:
|
|
if end <= len(raw_hex) and start >= min_start:
|
|
path_hex = raw_hex[start:end]
|
|
if len(path_hex) >= 6: # At least 3 bytes
|
|
# Convert hex to path nodes
|
|
path_nodes = []
|
|
for i in range(0, len(path_hex), 2):
|
|
if i + 1 < len(path_hex):
|
|
node_hex = path_hex[i:i+2]
|
|
path_nodes.append(node_hex)
|
|
|
|
if len(path_nodes) == expected_hops:
|
|
path_string = ','.join(path_nodes)
|
|
self.logger.debug(f"Found path at position {start}-{end}: {path_string}")
|
|
return path_string
|
|
|
|
# If no exact match, try to find any 3-byte pattern that looks like a path
|
|
# But skip the header area
|
|
for i in range(min_start, len(raw_hex) - 6, 2):
|
|
path_hex = raw_hex[i:i+6]
|
|
if len(path_hex) == 6:
|
|
# Check if this looks like a valid path (all hex chars)
|
|
if all(c in '0123456789abcdef' for c in path_hex.lower()):
|
|
path_nodes = [path_hex[j:j+2] for j in range(0, 6, 2)]
|
|
path_string = ','.join(path_nodes)
|
|
self.logger.debug(f"Found potential path at position {i}: {path_string}")
|
|
return path_string
|
|
|
|
return None
|
|
|
|
except Exception as e:
|
|
self.logger.debug(f"Error extracting path from raw hex: {e}")
|
|
return None
|
|
|
|
def _cleanup_stale_cache_entries(self, current_time: Optional[float] = None) -> None:
|
|
"""Remove stale entries from RF data caches and enforce maximum size limits.
|
|
|
|
Args:
|
|
current_time: Optional timestamp to use as "now". Defaults to time.time().
|
|
"""
|
|
if current_time is None:
|
|
current_time = time.time()
|
|
|
|
# Only run periodic cleanup if enough time has passed
|
|
if current_time - self._last_cache_cleanup < self._cache_cleanup_interval:
|
|
# Still do basic timeout cleanup, but skip size enforcement
|
|
cutoff_time = current_time - self.rf_data_timeout
|
|
|
|
# Clean timestamp-indexed cache (timeout only)
|
|
stale_timestamps = [ts for ts in self.rf_data_by_timestamp.keys()
|
|
if ts < cutoff_time]
|
|
for ts in stale_timestamps:
|
|
del self.rf_data_by_timestamp[ts]
|
|
|
|
# Clean pubkey-indexed cache (timeout only)
|
|
for pubkey in list(self.rf_data_by_pubkey.keys()):
|
|
self.rf_data_by_pubkey[pubkey] = [data for data in self.rf_data_by_pubkey[pubkey]
|
|
if current_time - data['timestamp'] < self.rf_data_timeout]
|
|
if not self.rf_data_by_pubkey[pubkey]:
|
|
del self.rf_data_by_pubkey[pubkey]
|
|
|
|
# Clean recent_rf_data list (timeout only)
|
|
self.recent_rf_data = [data for data in self.recent_rf_data
|
|
if current_time - data['timestamp'] < self.rf_data_timeout]
|
|
return
|
|
|
|
# Full cleanup with size enforcement
|
|
self._last_cache_cleanup = current_time
|
|
cutoff_time = current_time - self.rf_data_timeout
|
|
|
|
# Clean timestamp-indexed cache
|
|
stale_timestamps = [ts for ts in self.rf_data_by_timestamp.keys()
|
|
if ts < cutoff_time]
|
|
for ts in stale_timestamps:
|
|
del self.rf_data_by_timestamp[ts]
|
|
|
|
# Enforce maximum size on timestamp cache (keep most recent)
|
|
if len(self.rf_data_by_timestamp) > self._max_rf_cache_size:
|
|
sorted_items = sorted(self.rf_data_by_timestamp.items(),
|
|
key=lambda x: x[1].get('timestamp', 0),
|
|
reverse=True)
|
|
self.rf_data_by_timestamp = dict(sorted_items[:self._max_rf_cache_size])
|
|
|
|
# Clean pubkey-indexed cache
|
|
for pubkey in list(self.rf_data_by_pubkey.keys()):
|
|
self.rf_data_by_pubkey[pubkey] = [data for data in self.rf_data_by_pubkey[pubkey]
|
|
if current_time - data['timestamp'] < self.rf_data_timeout]
|
|
if not self.rf_data_by_pubkey[pubkey]:
|
|
del self.rf_data_by_pubkey[pubkey]
|
|
|
|
# Enforce maximum size on pubkey cache (keep most recent per pubkey)
|
|
total_pubkey_entries = sum(len(entries) for entries in self.rf_data_by_pubkey.values())
|
|
if total_pubkey_entries > self._max_rf_cache_size:
|
|
# Sort all entries by timestamp and keep most recent
|
|
all_pubkey_entries = []
|
|
for pubkey, entries in self.rf_data_by_pubkey.items():
|
|
for entry in entries:
|
|
all_pubkey_entries.append((pubkey, entry))
|
|
all_pubkey_entries.sort(key=lambda x: x[1].get('timestamp', 0), reverse=True)
|
|
|
|
# Rebuild pubkey cache with only the most recent entries
|
|
self.rf_data_by_pubkey = {}
|
|
for pubkey, entry in all_pubkey_entries[:self._max_rf_cache_size]:
|
|
if pubkey not in self.rf_data_by_pubkey:
|
|
self.rf_data_by_pubkey[pubkey] = []
|
|
self.rf_data_by_pubkey[pubkey].append(entry)
|
|
|
|
# Clean recent_rf_data list
|
|
self.recent_rf_data = [data for data in self.recent_rf_data
|
|
if current_time - data['timestamp'] < self.rf_data_timeout]
|
|
|
|
# Enforce maximum size on recent_rf_data (keep most recent)
|
|
if len(self.recent_rf_data) > self._max_rf_cache_size:
|
|
self.recent_rf_data.sort(key=lambda x: x.get('timestamp', 0), reverse=True)
|
|
self.recent_rf_data = self.recent_rf_data[:self._max_rf_cache_size]
|
|
|
|
def find_recent_rf_data(self, correlation_key=None, max_age_seconds=None):
|
|
"""Find recent RF data for SNR/RSSI and packet decoding with improved correlation
|
|
|
|
Args:
|
|
correlation_key: Can be either:
|
|
- packet_prefix (from raw_hex[:32]) for RF data correlation
|
|
- pubkey_prefix (from message payload) for message correlation
|
|
"""
|
|
import time
|
|
current_time = time.time()
|
|
|
|
# Use default timeout if not specified
|
|
if max_age_seconds is None:
|
|
max_age_seconds = self.rf_data_timeout
|
|
|
|
# Filter recent RF data by age
|
|
recent_data = [data for data in self.recent_rf_data
|
|
if current_time - data['timestamp'] < max_age_seconds]
|
|
|
|
if not recent_data:
|
|
self.logger.debug(f"No recent RF data found within {max_age_seconds}s window")
|
|
return None
|
|
|
|
# Strategy 1: Try exact packet prefix match first (for RF data correlation)
|
|
if correlation_key:
|
|
for data in recent_data:
|
|
rf_packet_prefix = data.get('packet_prefix', '') or ''
|
|
if rf_packet_prefix == correlation_key:
|
|
self.logger.debug(f"Found exact packet prefix match: {rf_packet_prefix}")
|
|
return data
|
|
|
|
# Strategy 2: Try pubkey prefix match (for message correlation)
|
|
if correlation_key:
|
|
for data in recent_data:
|
|
rf_pubkey_prefix = data.get('pubkey_prefix', '') or ''
|
|
if rf_pubkey_prefix == correlation_key:
|
|
self.logger.debug(f"Found exact pubkey prefix match: {rf_pubkey_prefix}")
|
|
return data
|
|
|
|
# Strategy 3: Try partial packet prefix matches
|
|
if correlation_key:
|
|
for data in recent_data:
|
|
rf_packet_prefix = data.get('packet_prefix', '') or ''
|
|
# Check for partial match (at least 16 characters)
|
|
min_length = min(len(rf_packet_prefix), len(correlation_key), 16)
|
|
if (rf_packet_prefix[:min_length] == correlation_key[:min_length] and min_length >= 16):
|
|
self.logger.debug(f"Found partial packet prefix match: {rf_packet_prefix[:16]}... matches {correlation_key[:16]}...")
|
|
return data
|
|
|
|
# Strategy 4: Use most recent data (fallback for timing issues)
|
|
if recent_data:
|
|
most_recent = max(recent_data, key=lambda x: x['timestamp'])
|
|
packet_prefix = most_recent.get('packet_prefix', 'unknown')
|
|
self.logger.debug(f"Using most recent RF data (fallback): {packet_prefix} at {most_recent['timestamp']}")
|
|
return most_recent
|
|
|
|
return None
|
|
|
|
def store_message_for_correlation(self, message_id, message_data):
|
|
"""Store a message temporarily to wait for RF data correlation"""
|
|
import time
|
|
self.pending_messages[message_id] = {
|
|
'data': message_data,
|
|
'timestamp': time.time(),
|
|
'processed': False
|
|
}
|
|
self.logger.debug(f"Stored message {message_id} for RF data correlation")
|
|
|
|
def correlate_message_with_rf_data(self, message_id):
|
|
"""Try to correlate a stored message with available RF data"""
|
|
if message_id not in self.pending_messages:
|
|
return None
|
|
|
|
message_info = self.pending_messages[message_id]
|
|
message_data = message_info['data']
|
|
|
|
# Try to find RF data for this message
|
|
pubkey_prefix = message_data.get('pubkey_prefix', '')
|
|
rf_data = self.find_recent_rf_data(pubkey_prefix)
|
|
|
|
if rf_data:
|
|
self.logger.debug(f"Successfully correlated message {message_id} with RF data")
|
|
message_info['processed'] = True
|
|
return rf_data
|
|
|
|
return None
|
|
|
|
def cleanup_old_messages(self):
|
|
"""Clean up old pending messages that couldn't be correlated"""
|
|
import time
|
|
current_time = time.time()
|
|
|
|
to_remove = []
|
|
for message_id, message_info in self.pending_messages.items():
|
|
if current_time - message_info['timestamp'] > self.message_timeout:
|
|
to_remove.append(message_id)
|
|
|
|
for message_id in to_remove:
|
|
del self.pending_messages[message_id]
|
|
self.logger.debug(f"Cleaned up old pending message {message_id}")
|
|
|
|
def try_correlate_pending_messages(self, rf_data):
|
|
"""Try to correlate new RF data with any pending messages"""
|
|
pubkey_prefix = rf_data.get('pubkey_prefix', '') or ''
|
|
|
|
for message_id, message_info in self.pending_messages.items():
|
|
if message_info['processed']:
|
|
continue
|
|
|
|
message_pubkey = message_info['data'].get('pubkey_prefix', '') or ''
|
|
|
|
# Check if this RF data matches the pending message
|
|
if (pubkey_prefix == message_pubkey or
|
|
(len(pubkey_prefix) >= 16 and len(message_pubkey) >= 16 and
|
|
pubkey_prefix[:16] == message_pubkey[:16])):
|
|
self.logger.debug(f"Correlated RF data with pending message {message_id}")
|
|
message_info['processed'] = True
|
|
break
|
|
|
|
|
|
|
|
def decode_meshcore_packet(self, raw_hex: str, payload_hex: str = None) -> Optional[dict]:
|
|
"""
|
|
Decode a MeshCore packet from raw hex data - matches Packet.cpp exactly
|
|
|
|
Args:
|
|
raw_hex: Raw packet data as hex string (may be RF data or direct MeshCore packet)
|
|
payload_hex: Optional extracted payload hex string (preferred over raw_hex)
|
|
|
|
Returns:
|
|
Decoded packet information or None if parsing fails
|
|
"""
|
|
try:
|
|
# Use payload_hex if provided (this is the actual MeshCore packet)
|
|
if payload_hex:
|
|
self.logger.debug("Using provided payload_hex for decoding")
|
|
hex_data = payload_hex
|
|
elif raw_hex:
|
|
self.logger.debug("Using raw_hex for decoding")
|
|
hex_data = raw_hex
|
|
else:
|
|
self.logger.debug("No packet data provided for decoding")
|
|
return None
|
|
|
|
# Remove 0x prefix if present (like in your other project)
|
|
if hex_data.startswith('0x'):
|
|
hex_data = hex_data[2:]
|
|
|
|
byte_data = bytes.fromhex(hex_data)
|
|
|
|
# Validate minimum packet size
|
|
if len(byte_data) < 2:
|
|
self.logger.error(f"Packet too short: {len(byte_data)} bytes")
|
|
return None
|
|
|
|
header = byte_data[0]
|
|
|
|
# Extract route type
|
|
route_type = RouteType(header & 0x03)
|
|
has_transport = route_type in [RouteType.TRANSPORT_FLOOD, RouteType.TRANSPORT_DIRECT]
|
|
|
|
# Calculate path length offset based on presence of transport codes
|
|
offset = 1
|
|
if has_transport:
|
|
offset += 4
|
|
|
|
# Check if we have enough data for path_len
|
|
if len(byte_data) <= offset:
|
|
self.logger.error(f"Packet too short for path_len at offset {offset}: {len(byte_data)} bytes")
|
|
return None
|
|
|
|
path_len = byte_data[offset]
|
|
offset += 1
|
|
|
|
# Check if we have enough data for the full path
|
|
if len(byte_data) < offset + path_len:
|
|
self.logger.error(f"Packet too short for path (need {offset + path_len}, have {len(byte_data)})")
|
|
return None
|
|
|
|
# Extract path
|
|
path_bytes = byte_data[offset:offset + path_len]
|
|
offset += path_len
|
|
|
|
# Remaining data is payload
|
|
payload = byte_data[offset:]
|
|
|
|
# Extract payload version (bits 6-7)
|
|
payload_version = PayloadVersion((header >> 6) & 0x03)
|
|
|
|
# Only accept VER_1 (version 0)
|
|
if payload_version != PayloadVersion.VER_1:
|
|
self.logger.warning(f"Encountered an unknown packet version. Version: {payload_version.value} RAW: {hex_data}")
|
|
return None
|
|
|
|
# Extract payload type (bits 2-5)
|
|
payload_type = PayloadType((header >> 2) & 0x0F)
|
|
|
|
# Convert path to list of hex values
|
|
path_hex = path_bytes.hex()
|
|
path_values = []
|
|
i = 0
|
|
while i < len(path_hex):
|
|
path_values.append(path_hex[i:i+2])
|
|
i += 2
|
|
|
|
# Process path based on packet type
|
|
path_info = self._process_packet_path(
|
|
path_bytes,
|
|
payload,
|
|
route_type,
|
|
payload_type
|
|
)
|
|
|
|
# Extract transport codes if present (only for TRANSPORT_FLOOD and TRANSPORT_DIRECT)
|
|
transport_codes = None
|
|
if has_transport and len(byte_data) >= 5: # header(1) + transport(4)
|
|
transport_bytes = byte_data[1:5]
|
|
transport_codes = {
|
|
'code1': int.from_bytes(transport_bytes[0:2], byteorder='little'),
|
|
'code2': int.from_bytes(transport_bytes[2:4], byteorder='little'),
|
|
'hex': transport_bytes.hex()
|
|
}
|
|
|
|
packet_info = {
|
|
'header': f"0x{header:02x}",
|
|
# Raw values for backward compatibility
|
|
'route_type': route_type.value,
|
|
'route_type_name': route_type.name,
|
|
'payload_type': payload_type.value,
|
|
'payload_type_name': payload_type.name,
|
|
'payload_version': payload_version.value,
|
|
# Enum objects for improved type safety
|
|
'route_type_enum': route_type,
|
|
'payload_type_enum': payload_type,
|
|
'payload_version_enum': payload_version,
|
|
# Transport and path information
|
|
'has_transport_codes': has_transport,
|
|
'transport_codes': transport_codes,
|
|
'transport_size': 4 if has_transport else 0,
|
|
'path_len': path_len,
|
|
'path_info': path_info,
|
|
'path': path_values, # For backward compatibility
|
|
'path_hex': path_hex,
|
|
'payload_hex': payload.hex(),
|
|
'payload_bytes': len(payload)
|
|
}
|
|
|
|
self.logger.debug(f"Successfully decoded: route={packet_info.get('route_type_name')}, type={packet_info.get('payload_type_name')}")
|
|
return packet_info
|
|
|
|
except Exception as e:
|
|
# Log as ERROR not DEBUG so we can see what's failing
|
|
self.logger.error(f"Error decoding packet (len={len(byte_data)}): {e}", exc_info=True)
|
|
self.logger.error(f"Failed packet hex: {hex_data}")
|
|
return None
|
|
|
|
def parse_advert(self, payload):
|
|
"""Parse advert payload - matches C++ AdvertDataHelpers.h implementation"""
|
|
try:
|
|
# Validate minimum payload size
|
|
if len(payload) < 101:
|
|
self.logger.error(f"ADVERT payload too short: {len(payload)} bytes")
|
|
return {}
|
|
|
|
# advert header
|
|
pub_key = payload[0:32]
|
|
timestamp = int.from_bytes(payload[32:32+4], "little")
|
|
signature = payload[36:36+64]
|
|
|
|
# appdata - parse according to C++ AdvertDataParser
|
|
app_data = payload[100:]
|
|
if len(app_data) == 0:
|
|
self.logger.error("ADVERT has no app data")
|
|
return {}
|
|
|
|
flags_byte = app_data[0]
|
|
|
|
# Log the full flag byte for debugging
|
|
if hasattr(self, 'debug') and self.debug:
|
|
self.logger.debug(f"ADVERT flags: 0x{flags_byte:02X} (binary: {flags_byte:08b})")
|
|
|
|
# Create flags object with the full byte value
|
|
flags = AdvertFlags(flags_byte)
|
|
|
|
advert = {
|
|
"public_key": pub_key.hex(),
|
|
"advert_time": timestamp,
|
|
"signature": signature.hex(),
|
|
}
|
|
|
|
# Extract type from lower 4 bits (matches C++ getType())
|
|
adv_type = flags_byte & 0x0F
|
|
if adv_type == AdvertFlags.ADV_TYPE_CHAT.value:
|
|
advert.update({"mode": DeviceRole.Companion.name})
|
|
elif adv_type == AdvertFlags.ADV_TYPE_REPEATER.value:
|
|
advert.update({"mode": DeviceRole.Repeater.name})
|
|
elif adv_type == AdvertFlags.ADV_TYPE_ROOM.value:
|
|
advert.update({"mode": DeviceRole.RoomServer.name})
|
|
elif adv_type == AdvertFlags.ADV_TYPE_SENSOR.value:
|
|
advert.update({"mode": "Sensor"})
|
|
else:
|
|
advert.update({"mode": f"Type{adv_type}"})
|
|
|
|
# Parse data according to C++ AdvertDataParser logic
|
|
i = 1 # Start after flags byte
|
|
|
|
# Parse location data if present (matches C++ hasLatLon())
|
|
if AdvertFlags.ADV_LATLON_MASK in flags:
|
|
if len(app_data) < i + 8:
|
|
self.logger.error(f"ADVERT with location flag too short: {len(app_data)} bytes")
|
|
return advert
|
|
|
|
lat = int.from_bytes(app_data[i:i+4], 'little', signed=True)
|
|
lon = int.from_bytes(app_data[i+4:i+8], 'little', signed=True)
|
|
advert.update({"lat": round(lat / 1000000.0, 6), "lon": round(lon / 1000000.0, 6)})
|
|
i += 8
|
|
|
|
# Parse feat1 data if present
|
|
if AdvertFlags.ADV_FEAT1_MASK in flags:
|
|
if len(app_data) < i + 2:
|
|
self.logger.error(f"ADVERT with feat1 flag too short: {len(app_data)} bytes")
|
|
return advert
|
|
feat1 = int.from_bytes(app_data[i:i+2], 'little')
|
|
advert.update({"feat1": feat1})
|
|
i += 2
|
|
|
|
# Parse feat2 data if present
|
|
if AdvertFlags.ADV_FEAT2_MASK in flags:
|
|
if len(app_data) < i + 2:
|
|
self.logger.error(f"ADVERT with feat2 flag too short: {len(app_data)} bytes")
|
|
return advert
|
|
feat2 = int.from_bytes(app_data[i:i+2], 'little')
|
|
advert.update({"feat2": feat2})
|
|
i += 2
|
|
|
|
# Parse name data if present (matches C++ hasName())
|
|
if AdvertFlags.ADV_NAME_MASK in flags:
|
|
if len(app_data) >= i:
|
|
name_len = len(app_data) - i
|
|
if name_len > 0:
|
|
try:
|
|
# Decode name and handle potential null terminators
|
|
name = app_data[i:].decode('utf-8', errors='ignore').rstrip('\x00')
|
|
advert.update({"name": name})
|
|
except Exception as e:
|
|
self.logger.warning(f"Failed to decode ADVERT name: {e}")
|
|
|
|
return advert
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error parsing ADVERT payload: {e}", exc_info=True)
|
|
return {}
|
|
|
|
def _process_packet_path(self, path_bytes: bytes, payload: bytes,
|
|
route_type: RouteType, payload_type: PayloadType) -> dict:
|
|
"""
|
|
Process the path field based on packet and route type
|
|
|
|
Args:
|
|
path_bytes: Raw path bytes
|
|
payload: Payload bytes (needed for TRACE packets)
|
|
route_type: Route type from header
|
|
payload_type: Payload type from header
|
|
|
|
Returns:
|
|
dict: Processed path information
|
|
"""
|
|
try:
|
|
# Convert path bytes to hex node IDs
|
|
path_nodes = [f"{b:02x}" for b in path_bytes]
|
|
|
|
# Special handling for TRACE packets
|
|
if payload_type == PayloadType.TRACE:
|
|
# In TRACE packets, path field contains SNR data
|
|
# Real path is in the payload after tag(4) + auth(4) + flags(1)
|
|
snr_values = []
|
|
for b in path_bytes:
|
|
# Convert SNR byte to dB (signed value)
|
|
snr_db = (b - 256) / 4 if b > 127 else b / 4
|
|
snr_values.append(snr_db)
|
|
|
|
# For TRACE packets, we don't have a clear "real path" in the payload
|
|
# The path field contains SNR data, and the payload contains the message
|
|
# We'll use the SNR path length as the hop count, but don't extract a "real path"
|
|
real_path = []
|
|
# Note: TRACE packets have SNR data in the path field, not routing path
|
|
# The actual routing information is embedded in the SNR readings
|
|
|
|
return {
|
|
'type': 'trace',
|
|
'snr_data': snr_values,
|
|
'snr_path': path_nodes, # SNR data as hex for reference
|
|
'path': [], # No actual routing path for TRACE packets
|
|
'description': f"TRACE packet with {len(snr_values)} SNR readings (path contains SNR data, not routing info)"
|
|
}
|
|
|
|
# Regular packets - determine path type based on route type
|
|
is_direct = route_type in [RouteType.DIRECT, RouteType.TRANSPORT_DIRECT]
|
|
|
|
if is_direct:
|
|
# Direct routing: path contains routing instructions
|
|
# Bytes are stripped at each hop
|
|
return {
|
|
'type': 'routing_instructions',
|
|
'path': path_nodes,
|
|
'meaning': 'bytes_stripped_at_each_hop',
|
|
'description': f"Direct route via {','.join(path_nodes)} ({len(path_nodes)} hops)"
|
|
}
|
|
else:
|
|
# Flood routing: path contains historical route
|
|
# Bytes are added as packet floods through network
|
|
return {
|
|
'type': 'historical_route',
|
|
'path': path_nodes,
|
|
'meaning': 'bytes_added_as_packet_floods',
|
|
'description': f"Flooded through {','.join(path_nodes)} ({len(path_nodes)} hops)"
|
|
}
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error processing packet path: {e}")
|
|
# Return basic path info as fallback
|
|
path_nodes = [f"{b:02x}" for b in path_bytes]
|
|
return {
|
|
'type': 'unknown',
|
|
'path': path_nodes,
|
|
'description': f"Path: {','.join(path_nodes)}"
|
|
}
|
|
|
|
def _get_route_type_name(self, route_type):
|
|
"""Get human-readable name for route type"""
|
|
route_types = {
|
|
0x00: "ROUTE_TYPE_TRANSPORT_FLOOD",
|
|
0x01: "ROUTE_TYPE_FLOOD",
|
|
0x02: "ROUTE_TYPE_DIRECT",
|
|
0x03: "ROUTE_TYPE_TRANSPORT_DIRECT"
|
|
}
|
|
return route_types.get(route_type, f"UNKNOWN_ROUTE_{route_type:02x}")
|
|
|
|
def get_payload_type_name(self, payload_type: int) -> str:
|
|
"""Get human-readable name for payload type"""
|
|
payload_types = {
|
|
0x00: "REQ",
|
|
0x01: "RESPONSE",
|
|
0x02: "TXT_MSG",
|
|
0x03: "ACK",
|
|
0x04: "ADVERT",
|
|
0x05: "GRP_TXT",
|
|
0x06: "GRP_DATA",
|
|
0x07: "ANON_REQ",
|
|
0x08: "PATH",
|
|
0x09: "TRACE",
|
|
0x0A: "MULTIPART",
|
|
# Additional payload types found in meshcore library (may not be in official spec)
|
|
0x0B: "UNKNOWN_0b", # Not defined in official spec
|
|
0x0C: "UNKNOWN_0c", # Not defined in official spec
|
|
0x0D: "UNKNOWN_0d", # Not defined in official spec
|
|
0x0E: "UNKNOWN_0e", # Not defined in official spec
|
|
0x0F: "RAW_CUSTOM"
|
|
}
|
|
return payload_types.get(payload_type, f"UNKNOWN_{payload_type:02x}")
|
|
|
|
async def handle_channel_message(self, event, metadata=None):
|
|
"""Handle incoming channel message"""
|
|
try:
|
|
# Copy payload immediately to avoid segfault if event is freed
|
|
import copy
|
|
payload = copy.deepcopy(event.payload) if hasattr(event, 'payload') else None
|
|
if payload is None:
|
|
self.logger.warning("Channel message event has no payload")
|
|
return
|
|
|
|
channel_idx = payload.get('channel_idx', 0)
|
|
|
|
# Debug: Log the full payload structure
|
|
self.logger.debug(f"Channel message payload: {payload}")
|
|
self.logger.debug(f"Payload keys: {list(payload.keys())}")
|
|
|
|
# Get sender information from text field if it's in "SENDER: message" format
|
|
text = payload.get('text', '')
|
|
sender_id = "Channel User" # Default fallback
|
|
|
|
# Try to extract sender from text field (e.g., "HOWL: Test" -> "HOWL")
|
|
message_content = text # Default to full text
|
|
if ':' in text and not text.startswith(':'):
|
|
parts = text.split(':', 1)
|
|
if len(parts) == 2 and parts[0].strip():
|
|
sender_id = parts[0].strip()
|
|
message_content = parts[1].strip() # Use the part after the colon for keyword processing
|
|
self.logger.debug(f"Extracted sender from text: {sender_id}")
|
|
self.logger.debug(f"Message content for processing: {message_content}")
|
|
|
|
# Always strip trailing whitespace/newlines from message content to handle cases like "Wx 98104\n"
|
|
message_content = message_content.strip()
|
|
|
|
# Get channel name from channel number
|
|
channel_name = self.bot.channel_manager.get_channel_name(channel_idx)
|
|
|
|
self.logger.info(f"Received channel message ({channel_name}) from {sender_id}: {text}")
|
|
|
|
# Get SNR and RSSI using the same logic as contact messages
|
|
snr = 'unknown'
|
|
rssi = 'unknown'
|
|
|
|
# Try to get SNR from payload first
|
|
if 'SNR' in payload:
|
|
snr = payload.get('SNR')
|
|
elif 'snr' in payload:
|
|
snr = payload.get('snr')
|
|
# Try to get SNR from event metadata if available
|
|
elif metadata:
|
|
if 'snr' in metadata:
|
|
snr = metadata.get('snr')
|
|
elif 'SNR' in metadata:
|
|
snr = metadata.get('SNR')
|
|
|
|
# If still no SNR, try to get it from the cache using pubkey prefix from payload
|
|
if snr == 'unknown':
|
|
pubkey_prefix = payload.get('pubkey_prefix', '')
|
|
if pubkey_prefix and pubkey_prefix in self.snr_cache:
|
|
snr = self.snr_cache[pubkey_prefix]
|
|
self.logger.debug(f"Retrieved cached SNR {snr} for pubkey {pubkey_prefix}")
|
|
|
|
# Try to get RSSI from payload first
|
|
if 'RSSI' in payload:
|
|
rssi = payload.get('RSSI')
|
|
elif 'rssi' in payload:
|
|
rssi = payload.get('rssi')
|
|
elif 'signal_strength' in payload:
|
|
rssi = payload.get('signal_strength')
|
|
# Try to get RSSI from event metadata if available
|
|
elif metadata:
|
|
if 'rssi' in metadata:
|
|
rssi = metadata.get('rssi')
|
|
elif 'RSSI' in metadata:
|
|
rssi = metadata.get('RSSI')
|
|
|
|
# If still no RSSI, try to get it from the cache using pubkey prefix from payload
|
|
if rssi == 'unknown':
|
|
pubkey_prefix = payload.get('pubkey_prefix', '')
|
|
if pubkey_prefix and pubkey_prefix in self.rssi_cache:
|
|
rssi = self.rssi_cache[pubkey_prefix]
|
|
self.logger.debug(f"Retrieved cached RSSI {rssi} for pubkey {pubkey_prefix}")
|
|
|
|
# For channel messages, we can decode the packet since they use shared channel keys
|
|
# This gives us access to the actual routing information
|
|
# Extract packet prefix from message raw_hex for correlation
|
|
message_raw_hex = payload.get('raw_hex', '')
|
|
message_packet_prefix = message_raw_hex[:32] if message_raw_hex else None
|
|
message_pubkey = payload.get('pubkey_prefix', '') # Keep for contact lookup
|
|
self.logger.debug(f"Processing channel message from packet prefix: {message_packet_prefix}, pubkey: {message_pubkey}")
|
|
|
|
# Enhanced RF data correlation with multiple strategies
|
|
recent_rf_data = None
|
|
|
|
# Strategy 1: Try immediate correlation using packet prefix
|
|
if message_packet_prefix:
|
|
recent_rf_data = self.find_recent_rf_data(message_packet_prefix)
|
|
elif message_pubkey:
|
|
# Fallback to pubkey correlation
|
|
recent_rf_data = self.find_recent_rf_data(message_pubkey)
|
|
|
|
# Strategy 2: If no immediate match and enhanced correlation is enabled, store message and wait briefly
|
|
if not recent_rf_data and self.enhanced_correlation:
|
|
import time
|
|
correlation_key = message_packet_prefix or message_pubkey
|
|
message_id = f"{correlation_key}_{int(time.time() * 1000)}"
|
|
self.store_message_for_correlation(message_id, payload)
|
|
|
|
# Wait a short time for RF data to arrive (non-blocking)
|
|
await asyncio.sleep(0.1) # 100ms wait
|
|
recent_rf_data = self.correlate_message_with_rf_data(message_id)
|
|
|
|
# Strategy 3: Try with extended timeout if still no match
|
|
if not recent_rf_data:
|
|
extended_timeout = self.rf_data_timeout * 2 # Double the normal timeout
|
|
if message_packet_prefix:
|
|
recent_rf_data = self.find_recent_rf_data(message_packet_prefix, max_age_seconds=extended_timeout)
|
|
elif message_pubkey:
|
|
recent_rf_data = self.find_recent_rf_data(message_pubkey, max_age_seconds=extended_timeout)
|
|
|
|
# Strategy 4: Use most recent RF data as last resort
|
|
if not recent_rf_data:
|
|
extended_timeout = self.rf_data_timeout * 2 # Double the normal timeout
|
|
recent_rf_data = self.find_recent_rf_data(max_age_seconds=extended_timeout)
|
|
|
|
if recent_rf_data and recent_rf_data.get('raw_hex'):
|
|
raw_hex = recent_rf_data['raw_hex']
|
|
self.logger.info(f"🔍 FOUND RF DATA: {len(raw_hex)} chars, starts with: {raw_hex[:32]}...")
|
|
self.logger.debug(f"Full RF data: {raw_hex}")
|
|
|
|
# Extract SNR/RSSI from the RF data
|
|
if recent_rf_data.get('snr'):
|
|
snr = recent_rf_data['snr']
|
|
self.logger.debug(f"Using SNR from RF data: {snr}")
|
|
|
|
if recent_rf_data.get('rssi'):
|
|
rssi = recent_rf_data['rssi']
|
|
self.logger.debug(f"Using RSSI from RF data: {rssi}")
|
|
|
|
# Try to extract path information from raw hex directly
|
|
path_string = None
|
|
hops = payload.get('path_len', 255)
|
|
|
|
# First try the packet decoder
|
|
# Use payload field if available, otherwise use raw_hex
|
|
payload_hex = recent_rf_data.get('payload')
|
|
packet_info = self.decode_meshcore_packet(raw_hex, payload_hex)
|
|
if packet_info and packet_info.get('path_len') is not None:
|
|
# Valid packet decoded - use the results even if path is empty (0 hops = direct)
|
|
hops = packet_info.get('path_len', 0)
|
|
|
|
# Check if this is a TRACE packet with SNR data
|
|
if packet_info.get('payload_type') == 9: # TRACE packet
|
|
# For TRACE packets, we need to extract the actual routing path differently
|
|
# The path_nodes field contains SNR data, not routing path
|
|
path_hex = packet_info.get('path_hex', '')
|
|
if path_hex and len(path_hex) >= 2:
|
|
# Convert hex string to node list (every 2 characters = 1 node)
|
|
path_nodes = [path_hex[i:i+2] for i in range(0, len(path_hex), 2)]
|
|
path_string = ','.join(path_nodes)
|
|
self.logger.info(f"🎯 EXTRACTED PATH FROM TRACE PACKET: {path_string} ({hops} hops)")
|
|
else:
|
|
path_string = "Direct" if hops == 0 else f"Unknown routing ({hops} hops)"
|
|
self.logger.info(f"🎯 EXTRACTED PATH FROM TRACE PACKET: {path_string}")
|
|
else:
|
|
# For all other packet types, try multiple methods to get the path
|
|
path_string = None
|
|
|
|
# Method 1: Try path_nodes field first
|
|
path_nodes = packet_info.get('path_nodes', [])
|
|
if path_nodes:
|
|
path_string = ','.join(path_nodes)
|
|
self.logger.info(f"🎯 EXTRACTED PATH FROM PACKET: {path_string} ({hops} hops)")
|
|
else:
|
|
# Method 2: Try path_hex field
|
|
path_hex = packet_info.get('path_hex', '')
|
|
if path_hex and len(path_hex) >= 2:
|
|
# Convert hex string to node list (every 2 characters = 1 node)
|
|
path_nodes = [path_hex[i:i+2] for i in range(0, len(path_hex), 2)]
|
|
path_string = ','.join(path_nodes)
|
|
self.logger.info(f"🎯 EXTRACTED PATH FROM PACKET HEX: {path_string} ({hops} hops)")
|
|
else:
|
|
# Method 3: Try path_info.path field
|
|
path_info = packet_info.get('path_info', {})
|
|
if path_info and path_info.get('path'):
|
|
path_string = ','.join(path_info['path'])
|
|
self.logger.info(f"🎯 EXTRACTED PATH FROM PATH_INFO: {path_string} ({hops} hops)")
|
|
else:
|
|
# No path found - this is truly unknown
|
|
path_string = "Direct" if hops == 0 else "Unknown routing"
|
|
self.logger.info(f"🎯 EXTRACTED PATH FROM PACKET: {path_string} ({hops} hops)")
|
|
else:
|
|
# Packet decoding failed - try to extract path directly from raw hex
|
|
self.logger.debug("Packet decoding failed, trying direct hex parsing")
|
|
path_string = self.extract_path_from_raw_hex(raw_hex, hops)
|
|
if path_string:
|
|
self.logger.info(f"🎯 EXTRACTED PATH FROM RAW HEX: {path_string} ({hops} hops)")
|
|
else:
|
|
# Try to use routing info from RF data as fallback
|
|
if recent_rf_data.get('routing_info') and recent_rf_data['routing_info'].get('path_nodes'):
|
|
routing_info = recent_rf_data['routing_info']
|
|
hops = len(routing_info['path_nodes'])
|
|
path_string = ','.join(routing_info['path_nodes'])
|
|
self.logger.info(f"🎯 EXTRACTED PATH FROM RF ROUTING INFO: {path_string} ({hops} hops)")
|
|
else:
|
|
# Final fallback to basic path info
|
|
self.logger.debug("No path info available, using basic path info")
|
|
path_string = None
|
|
else:
|
|
self.logger.warning("❌ NO RF DATA found for channel message after all correlation attempts")
|
|
hops = payload.get('path_len', 255)
|
|
path_string = None
|
|
|
|
# Get the full public key from contacts if available
|
|
sender_pubkey = payload.get('pubkey_prefix', '')
|
|
if hasattr(self.bot.meshcore, 'contacts') and self.bot.meshcore.contacts:
|
|
for contact_key, contact_data in self.bot.meshcore.contacts.items():
|
|
if contact_data.get('public_key', '').startswith(sender_pubkey):
|
|
# Use the full public key from the contact
|
|
sender_pubkey = contact_data.get('public_key', sender_pubkey)
|
|
self.logger.debug(f"Found full public key for {sender_id}: {sender_pubkey[:16]}...")
|
|
break
|
|
|
|
# Convert to our message format
|
|
message = MeshMessage(
|
|
content=message_content, # Use the extracted message content
|
|
sender_id=sender_id,
|
|
sender_pubkey=sender_pubkey,
|
|
channel=channel_name,
|
|
timestamp=payload.get('sender_timestamp', 0),
|
|
snr=snr,
|
|
rssi=rssi,
|
|
hops=hops,
|
|
path=path_string, # Use the path information extracted from RF data
|
|
is_dm=False
|
|
)
|
|
|
|
# Path information is now set directly in the MeshMessage constructor from RF data
|
|
# No need for additional path extraction since we're using the actual routing data
|
|
|
|
# Path information is now set directly in the MeshMessage constructor
|
|
# No need for additional path processing since we're using the actual routing data
|
|
self.logger.debug(f"Message routing info: hops={message.hops}, routing={message.path}")
|
|
|
|
# Always decode and log packet information for debugging (regardless of keywords)
|
|
await self._debug_decode_message_path(message, sender_id, recent_rf_data)
|
|
|
|
# Always attempt packet decoding and log the results for debugging
|
|
await self._debug_decode_packet_for_message(message, sender_id, recent_rf_data)
|
|
|
|
# Process the message
|
|
await self.process_message(message)
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error handling channel message: {e}")
|
|
import traceback
|
|
self.logger.error(traceback.format_exc())
|
|
|
|
async def discover_message_path(self, sender_id: str, rf_data: dict) -> tuple[int, str]:
|
|
"""
|
|
Discover the actual routing path for a message using CLI commands.
|
|
This is more reliable than trying to decode packet fragments.
|
|
|
|
Args:
|
|
sender_id: The name or ID of the sender
|
|
rf_data: The RF data containing pubkey information
|
|
|
|
Returns:
|
|
tuple[int, str]: (Number of hops, formatted path string)
|
|
"""
|
|
try:
|
|
# First try to find the contact by name
|
|
if hasattr(self.bot.meshcore, 'contacts') and self.bot.meshcore.contacts:
|
|
contact = None
|
|
pubkey_prefix = rf_data.get('pubkey_prefix', '')
|
|
|
|
# Look for contact by name first
|
|
for contact_key, contact_data in self.bot.meshcore.contacts.items():
|
|
if contact_data.get('adv_name') == sender_id:
|
|
contact = contact_data
|
|
break
|
|
|
|
# If not found by name, try by pubkey prefix
|
|
if not contact and pubkey_prefix:
|
|
for contact_key, contact_data in self.bot.meshcore.contacts.items():
|
|
if contact_data.get('public_key', '').startswith(pubkey_prefix):
|
|
contact = contact_data
|
|
break
|
|
|
|
if contact:
|
|
# Use the stored path information if available
|
|
out_path = contact.get('out_path', '')
|
|
out_path_len = contact.get('out_path_len', -1)
|
|
|
|
if out_path_len == 0:
|
|
self.logger.debug(f"Direct connection to {sender_id}")
|
|
return 0, "Direct"
|
|
elif out_path_len > 0:
|
|
# Format the path string with two-character node prefixes
|
|
path_string = self._format_path_string(out_path)
|
|
self.logger.debug(f"Stored path to {sender_id}: {out_path_len} hops via {path_string}")
|
|
return out_path_len, path_string
|
|
else:
|
|
# Path not set - use basic info
|
|
self.logger.debug(f"No stored path for {sender_id}, using basic info")
|
|
return 255, "No stored path"
|
|
else:
|
|
self.logger.debug(f"Contact {sender_id} not found in contacts")
|
|
return 255, "Unknown" # Unknown path
|
|
|
|
return 255, "Unknown" # Fallback to unknown
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error discovering message path: {e}")
|
|
return 255
|
|
|
|
# CLI path discovery removed - focusing only on packet decoding
|
|
|
|
async def _debug_decode_message_path(self, message: MeshMessage, sender_id: str, rf_data: dict):
|
|
"""
|
|
Debug method to decode and log path information for ALL incoming messages.
|
|
This runs regardless of whether the message matches keywords, helping with
|
|
network topology debugging.
|
|
|
|
Args:
|
|
message: The received message
|
|
sender_id: The name or ID of the sender
|
|
rf_data: The RF data containing pubkey information
|
|
"""
|
|
try:
|
|
if not rf_data:
|
|
self.logger.debug(f"No RF data for {sender_id}")
|
|
return
|
|
|
|
pubkey_prefix = rf_data.get('pubkey_prefix', '')
|
|
if not pubkey_prefix:
|
|
self.logger.debug(f"No pubkey prefix for {sender_id}")
|
|
return
|
|
|
|
# Try to find the contact to get stored path information
|
|
if hasattr(self.bot.meshcore, 'contacts') and self.bot.meshcore.contacts:
|
|
contact = None
|
|
|
|
# Look for contact by name first
|
|
for contact_key, contact_data in self.bot.meshcore.contacts.items():
|
|
if contact_data.get('adv_name') == sender_id:
|
|
contact = contact_data
|
|
break
|
|
|
|
# If not found by name, try by pubkey prefix
|
|
if not contact:
|
|
for contact_key, contact_data in self.bot.meshcore.contacts.items():
|
|
if contact_data.get('public_key', '').startswith(pubkey_prefix):
|
|
contact = contact_data
|
|
break
|
|
|
|
if contact:
|
|
out_path = contact.get('out_path', '')
|
|
out_path_len = contact.get('out_path_len', -1)
|
|
|
|
if out_path_len == 0:
|
|
self.logger.info(f"📡 {sender_id} → Direct connection")
|
|
elif out_path_len > 0:
|
|
path_string = self._format_path_string(out_path)
|
|
self.logger.info(f"📡 {sender_id} → {path_string} ({out_path_len} hops)")
|
|
else:
|
|
self.logger.info(f"📡 {sender_id} → Path not set")
|
|
else:
|
|
self.logger.info(f"📡 {sender_id} → Contact not found")
|
|
else:
|
|
self.logger.debug(f"No contacts available for {sender_id}")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error in debug path decoding: {e}")
|
|
|
|
async def _debug_decode_packet_for_message(self, message: MeshMessage, sender_id: str, rf_data: dict):
|
|
"""
|
|
Debug method to decode and log packet information for ALL incoming messages.
|
|
This provides comprehensive packet analysis for debugging purposes.
|
|
|
|
Args:
|
|
message: The received message
|
|
sender_id: The name or ID of the sender
|
|
rf_data: The RF data containing raw packet information
|
|
"""
|
|
try:
|
|
if not rf_data:
|
|
self.logger.debug(f"No RF data available for {sender_id}")
|
|
return
|
|
|
|
raw_hex = rf_data.get('raw_hex', '')
|
|
if not raw_hex:
|
|
self.logger.debug(f"No raw_hex in RF data for {sender_id}")
|
|
return
|
|
|
|
self.logger.debug(f"Decoding packet for {sender_id} ({len(raw_hex)} chars)")
|
|
|
|
# Log basic payload info if available
|
|
extracted_payload = rf_data.get('payload', '')
|
|
payload_length = rf_data.get('payload_length', 0)
|
|
|
|
if extracted_payload:
|
|
self.logger.debug(f"Payload: {payload_length} bytes")
|
|
else:
|
|
self.logger.debug("No payload data available")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error in debug packet decoding: {e}")
|
|
|
|
def _format_path_string(self, hex_path: str) -> str:
|
|
"""
|
|
Convert a hex path string to the two-character node prefix format.
|
|
|
|
Args:
|
|
hex_path: Hex string representing the path (e.g., "01025f7e")
|
|
|
|
Returns:
|
|
str: Formatted path string (e.g., "01,02,5f,7e")
|
|
"""
|
|
try:
|
|
if not hex_path:
|
|
return "Direct"
|
|
|
|
# Convert hex to bytes and extract one-byte chunks for two-character format
|
|
path_bytes = bytes.fromhex(hex_path)
|
|
path_nodes = []
|
|
|
|
for i in range(len(path_bytes)):
|
|
# Extract each byte and convert to two-character hex
|
|
node_id = path_bytes[i]
|
|
path_nodes.append(f"{node_id:02x}")
|
|
|
|
if path_nodes:
|
|
return ",".join(path_nodes)
|
|
else:
|
|
return "Direct"
|
|
|
|
except Exception as e:
|
|
self.logger.debug(f"Error formatting path string: {e}")
|
|
truncated = hex_path[:16] if len(hex_path) > 16 else hex_path
|
|
return f"Raw: {truncated}{'...' if len(hex_path) > 16 else ''}"
|
|
|
|
async def process_message(self, message: MeshMessage):
|
|
"""Process a received message"""
|
|
# Check if multitest is listening and notify it
|
|
if self.multitest_listener:
|
|
try:
|
|
self.multitest_listener.on_message_received(message)
|
|
except AttributeError as e:
|
|
self.logger.warning(f"Multitest listener missing method: {e}")
|
|
self.multitest_listener = None # Disable broken listener
|
|
except Exception as e:
|
|
self.logger.error(f"Error notifying multitest listener: {e}", exc_info=True)
|
|
|
|
# Record all messages in stats database FIRST (before any filtering)
|
|
# This ensures we collect stats for all channels, not just monitored ones
|
|
if 'stats' in self.bot.command_manager.commands:
|
|
stats_command = self.bot.command_manager.commands['stats']
|
|
if stats_command:
|
|
stats_command.record_message(message)
|
|
stats_command.record_path_stats(message)
|
|
|
|
# Check greeter command for public channel messages (BEFORE general message filtering)
|
|
# This allows greeter to work on its own configured channels even if not in monitor_channels
|
|
if 'greeter' in self.bot.command_manager.commands:
|
|
greeter_command = self.bot.command_manager.commands['greeter']
|
|
# First, check if this message should cancel a pending greeting (human greeting detection)
|
|
if greeter_command:
|
|
greeter_command.check_message_for_human_greeting(message)
|
|
# Then check if we should greet this user
|
|
if greeter_command and greeter_command.should_execute(message):
|
|
try:
|
|
success = await greeter_command.execute(message)
|
|
|
|
# Small delay to ensure send_response has completed
|
|
await asyncio.sleep(0.1)
|
|
|
|
# Determine if a response was sent
|
|
response_sent = False
|
|
if hasattr(greeter_command, 'last_response') and greeter_command.last_response:
|
|
response_sent = True
|
|
elif hasattr(self.bot.command_manager, '_last_response') and self.bot.command_manager._last_response:
|
|
response_sent = True
|
|
|
|
# Record command execution in stats database
|
|
if 'stats' in self.bot.command_manager.commands:
|
|
stats_command = self.bot.command_manager.commands['stats']
|
|
if stats_command:
|
|
stats_command.record_command(message, 'greeter', response_sent)
|
|
except Exception as e:
|
|
self.logger.error(f"Error executing greeter command: {e}")
|
|
|
|
# Now check if we should process this message for bot responses
|
|
if not self.should_process_message(message):
|
|
return
|
|
|
|
self.logger.info(f"Processing message: {message.content}")
|
|
|
|
# Check for advert command (DM only)
|
|
if message.is_dm and message.content.strip().lower() == "advert":
|
|
await self.bot.command_manager.handle_advert_command(message)
|
|
return
|
|
|
|
# Check for keywords and custom syntax
|
|
keyword_matches = self.bot.command_manager.check_keywords(message)
|
|
|
|
help_response_sent = False
|
|
plugin_command_with_response_matched = False
|
|
if keyword_matches:
|
|
for keyword, response in keyword_matches:
|
|
# Use translator if available for logging
|
|
if hasattr(self.bot, 'translator'):
|
|
log_msg = self.bot.translator.translate('messages.keyword_matched', keyword=keyword)
|
|
self.logger.info(log_msg)
|
|
else:
|
|
self.logger.info(f"Keyword '{keyword}' matched, responding")
|
|
|
|
# Track if this is a help response
|
|
if keyword == 'help':
|
|
help_response_sent = True
|
|
|
|
# Track if this is a plugin command that has a response format
|
|
if keyword in self.bot.command_manager.commands and response is not None:
|
|
plugin_command_with_response_matched = True
|
|
|
|
# Skip commands that handle their own responses (response is None)
|
|
# These will be recorded when they execute via execute_commands
|
|
if response is None:
|
|
continue
|
|
|
|
# Record command execution in stats database for keyword-matched commands with responses
|
|
# Commands without responses (response is None) are recorded in execute_commands to avoid double-counting
|
|
if 'stats' in self.bot.command_manager.commands:
|
|
stats_command = self.bot.command_manager.commands['stats']
|
|
if stats_command:
|
|
# response is not None here, so we know a response will be sent
|
|
stats_command.record_command(message, keyword, True)
|
|
|
|
# Send response
|
|
if message.is_dm:
|
|
success = await self.bot.command_manager.send_dm(message.sender_id, response)
|
|
else:
|
|
success = await self.bot.command_manager.send_channel_message(message.channel, response)
|
|
|
|
# Capture keyword command data for web viewer
|
|
if (hasattr(self.bot, 'web_viewer_integration') and
|
|
self.bot.web_viewer_integration and
|
|
self.bot.web_viewer_integration.bot_integration):
|
|
try:
|
|
self.bot.web_viewer_integration.bot_integration.capture_command(
|
|
message, keyword, response, success
|
|
)
|
|
except Exception as e:
|
|
self.logger.debug(f"Failed to capture keyword data for web viewer: {e}")
|
|
|
|
# Only execute commands if no help response was sent and no plugin command with response was matched
|
|
# Help responses and plugin commands with responses should be the final response for that message
|
|
# Plugin commands without responses (response is None) should still be executed
|
|
if not help_response_sent and not plugin_command_with_response_matched:
|
|
await self.bot.command_manager.execute_commands(message)
|
|
|
|
def should_process_message(self, message: MeshMessage) -> bool:
|
|
"""Check if message should be processed by the bot"""
|
|
# Check if bot is enabled
|
|
if not self.bot.config.getboolean('Bot', 'enabled'):
|
|
return False
|
|
|
|
# Check if sender is banned
|
|
if message.sender_id and message.sender_id in self.bot.command_manager.banned_users:
|
|
self.logger.debug(f"Ignoring message from banned user: {message.sender_id}")
|
|
return False
|
|
|
|
# Check if channel is monitored (with command override support)
|
|
if not message.is_dm and message.channel:
|
|
# Check if channel is in global monitor_channels
|
|
if message.channel in self.bot.command_manager.monitor_channels:
|
|
return True # Global allow - all commands can work
|
|
|
|
# Check if ANY command allows this channel (for selective access)
|
|
for command_name, command in self.bot.command_manager.commands.items():
|
|
if hasattr(command, 'is_channel_allowed') and callable(command.is_channel_allowed):
|
|
if command.is_channel_allowed(message):
|
|
# At least one command allows this channel
|
|
self.logger.debug(f"Channel {message.channel} allowed by command '{command_name}' override")
|
|
return True
|
|
|
|
# Channel not in global list and no command allows it
|
|
self.logger.debug(f"Channel {message.channel} not in monitored channels: {self.bot.command_manager.monitor_channels}")
|
|
return False
|
|
|
|
# Check if DMs are enabled
|
|
if message.is_dm and not self.bot.config.getboolean('Channels', 'respond_to_dms'):
|
|
self.logger.debug("DMs are disabled")
|
|
return False
|
|
|
|
return True
|
|
|
|
async def handle_new_contact(self, event, metadata=None):
|
|
"""Handle NEW_CONTACT events for automatic contact management"""
|
|
try:
|
|
# Copy payload immediately to avoid segfault if event is freed
|
|
# Make a deep copy to ensure we have all the data we need
|
|
if hasattr(event, 'payload'):
|
|
contact_data = copy.deepcopy(event.payload)
|
|
else:
|
|
# Fallback: try to copy the event itself if it's a dict-like object
|
|
contact_data = copy.deepcopy(event) if isinstance(event, dict) else None
|
|
|
|
if not contact_data:
|
|
self.logger.warning("NEW_CONTACT event has no payload data")
|
|
return
|
|
|
|
self.logger.info(f"🔍 NEW_CONTACT EVENT RECEIVED: {event}")
|
|
self.logger.info(f"📦 Event type: {type(event)}")
|
|
self.logger.info(f"📦 Event payload: {contact_data}")
|
|
|
|
# Get contact details
|
|
contact_name = contact_data.get('name', contact_data.get('adv_name', 'Unknown'))
|
|
public_key = contact_data.get('public_key', '')
|
|
|
|
self.logger.info(f"Processing new contact: {contact_name} (key: {public_key[:16]}...)")
|
|
|
|
# Extract additional signal information from the event
|
|
signal_info = {}
|
|
if metadata:
|
|
signal_info.update(metadata)
|
|
|
|
# Try to get signal data, packet_hash, and path information from recent RF data correlation
|
|
# Only collect RSSI/SNR for zero-hop (direct) advertisements
|
|
packet_hash = None
|
|
try:
|
|
# Look for recent RF data that might correlate with this contact
|
|
recent_rf_data = self.bot.message_handler.recent_rf_data
|
|
if recent_rf_data:
|
|
# Find RF data that might match this contact's public key
|
|
for rf_entry in recent_rf_data[-10:]: # Check last 10 RF entries
|
|
if 'routing_info' in rf_entry:
|
|
routing_info = rf_entry['routing_info']
|
|
|
|
# Extract packet_hash if available
|
|
packet_hash = routing_info.get('packet_hash') or rf_entry.get('packet_hash')
|
|
|
|
# Extract path information from routing_info
|
|
path_hex = routing_info.get('path_hex', '')
|
|
path_length = routing_info.get('path_length', 0)
|
|
|
|
# Add path information to contact_data if not already present
|
|
if 'out_path' not in contact_data or not contact_data.get('out_path'):
|
|
if path_hex and path_length > 0:
|
|
contact_data['out_path'] = path_hex
|
|
contact_data['out_path_len'] = path_length
|
|
elif path_length == 0:
|
|
contact_data['out_path'] = ''
|
|
contact_data['out_path_len'] = 0
|
|
|
|
# Only collect signal data for direct (zero-hop) advertisements
|
|
if path_length == 0:
|
|
# Direct advertisement - collect signal data
|
|
if 'snr' in rf_entry:
|
|
signal_info['snr'] = rf_entry['snr']
|
|
if 'rssi' in rf_entry:
|
|
signal_info['rssi'] = rf_entry['rssi']
|
|
signal_info['hops'] = 0
|
|
self.logger.debug(f"📡 Direct advertisement - collecting signal data: SNR={rf_entry.get('snr')}, RSSI={rf_entry.get('rssi')}")
|
|
else:
|
|
# Multi-hop advertisement - only collect hop count, not signal data
|
|
signal_info['hops'] = path_length
|
|
self.logger.debug(f"📡 Multi-hop advertisement ({path_length} hops) - skipping signal data collection")
|
|
break
|
|
except Exception as e:
|
|
self.logger.debug(f"Could not correlate RF data: {e}")
|
|
|
|
# Log captured signal information
|
|
if signal_info:
|
|
self.logger.info(f"📡 Signal data: {signal_info}")
|
|
else:
|
|
self.logger.info(f"📡 No signal data available")
|
|
|
|
# Check if this is a repeater or companion
|
|
if hasattr(self.bot, 'repeater_manager'):
|
|
is_repeater = self.bot.repeater_manager._is_repeater_device(contact_data)
|
|
|
|
if is_repeater:
|
|
# REPEATER: Track directly in SQLite database (no device contact list)
|
|
self.logger.info(f"📡 New repeater discovered: {contact_name} - tracking in database only")
|
|
|
|
# Track repeater in complete database with signal info
|
|
await self.bot.repeater_manager.track_contact_advertisement(contact_data, signal_info, packet_hash=packet_hash)
|
|
|
|
# Check if auto-purge is needed (run after tracking to ensure data is captured)
|
|
await self.bot.repeater_manager.check_and_auto_purge()
|
|
|
|
self.logger.info(f"✅ Repeater {contact_name} tracked in database - not added to device contacts")
|
|
return
|
|
else:
|
|
# COMPANION: Track in database AND add to device contact list
|
|
self.logger.info(f"👤 New companion discovered: {contact_name} - will be added to device contacts")
|
|
|
|
# Track companion in complete database with signal info
|
|
await self.bot.repeater_manager.track_contact_advertisement(contact_data, signal_info, packet_hash=packet_hash)
|
|
|
|
# Add companion to device contact list
|
|
try:
|
|
result = await self.bot.meshcore.commands.add_contact(contact_data)
|
|
if hasattr(result, 'type') and result.type.name == 'OK':
|
|
self.logger.info(f"✅ Companion {contact_name} added to device contacts")
|
|
else:
|
|
self.logger.warning(f"❌ Failed to add companion {contact_name} to device: {result}")
|
|
except Exception as e:
|
|
self.logger.error(f"❌ Error adding companion {contact_name} to device: {e}")
|
|
|
|
# Check if auto-purge is needed
|
|
await self.bot.repeater_manager.check_and_auto_purge()
|
|
return
|
|
|
|
# Fallback: Track in database for unknown contact types
|
|
if hasattr(self.bot, 'repeater_manager'):
|
|
await self.bot.repeater_manager.track_contact_advertisement(contact_data, packet_hash=packet_hash)
|
|
await self.bot.repeater_manager.check_and_auto_purge()
|
|
|
|
# For unknown contact types, handle based on auto_manage_contacts setting
|
|
if hasattr(self.bot, 'repeater_manager'):
|
|
auto_manage_setting = self.bot.config.get('Bot', 'auto_manage_contacts', fallback='false').lower()
|
|
|
|
if auto_manage_setting == 'device':
|
|
# Device mode: Let device handle auto-addition, bot manages capacity
|
|
self.logger.info(f"Device auto-addition mode - new contact '{contact_name}' will be handled by device")
|
|
|
|
# Check contact list capacity and manage if needed
|
|
status = await self.bot.repeater_manager.get_contact_list_status()
|
|
|
|
if status and status.get('is_near_limit', False):
|
|
self.logger.warning(f"Contact list near limit ({status['usage_percentage']:.1f}%) - managing capacity")
|
|
await self.bot.repeater_manager.manage_contact_list(auto_cleanup=True)
|
|
else:
|
|
self.logger.info(f"New contact '{contact_name}' - contact list has adequate space")
|
|
|
|
elif auto_manage_setting == 'bot':
|
|
# Bot mode: Bot automatically adds companion contacts to device and manages capacity
|
|
self.logger.info(f"Bot auto-addition mode - automatically adding new companion contact '{contact_name}' to device")
|
|
|
|
# Add the contact to the device's contact list
|
|
success = await self.bot.repeater_manager.add_discovered_contact(
|
|
contact_name,
|
|
public_key,
|
|
f"Auto-added companion contact discovered via NEW_CONTACT event"
|
|
)
|
|
|
|
if success:
|
|
self.logger.info(f"Successfully added companion contact '{contact_name}' to device")
|
|
else:
|
|
self.logger.warning(f"Failed to add companion contact '{contact_name}' to device")
|
|
|
|
# Check contact list capacity and manage if needed
|
|
status = await self.bot.repeater_manager.get_contact_list_status()
|
|
|
|
if status and status.get('is_near_limit', False):
|
|
self.logger.warning(f"Contact list near limit ({status['usage_percentage']:.1f}%) - managing capacity")
|
|
await self.bot.repeater_manager.manage_contact_list(auto_cleanup=True)
|
|
else:
|
|
self.logger.info(f"New contact '{contact_name}' - contact list has adequate space")
|
|
|
|
else: # false or any other value
|
|
# Manual mode: Just log the discovery, no automatic actions
|
|
self.logger.info(f"Manual mode - new companion contact '{contact_name}' discovered (not auto-added)")
|
|
|
|
# Log the new contact discovery
|
|
if hasattr(self.bot, 'repeater_manager'):
|
|
self.bot.repeater_manager.db_manager.execute_update(
|
|
'INSERT INTO purging_log (action, details) VALUES (?, ?)',
|
|
('new_contact_discovered', f'New contact discovered: {contact_name} (key: {public_key[:16]}...)')
|
|
)
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error handling new contact event: {e}")
|
|
import traceback
|
|
self.logger.error(traceback.format_exc())
|