diff --git a/meshchatx/meshchat.py b/meshchatx/meshchat.py index 8fcf5ed..bf772fb 100644 --- a/meshchatx/meshchat.py +++ b/meshchatx/meshchat.py @@ -24,6 +24,7 @@ import threading import time import traceback import webbrowser +import fnmatch from datetime import UTC, datetime, timedelta from logging.handlers import RotatingFileHandler from urllib.parse import urlparse @@ -1425,6 +1426,115 @@ class ReticulumMeshChat: except Exception: return default + @staticmethod + def parse_discovery_patterns(value): + if value is None: + return [] + if isinstance(value, str): + value = value.replace("\n", ",") + return [part.strip() for part in value.split(",") if part.strip()] + if isinstance(value, (list, tuple)): + return [str(part).strip() for part in value if str(part).strip()] + text_value = str(value).strip() + return [text_value] if text_value else [] + + @staticmethod + def sanitize_discovery_patterns( + value, + max_patterns: int = 128, + max_pattern_length: int = 128, + ): + sanitized = [] + seen = set() + for pattern in ReticulumMeshChat.parse_discovery_patterns(value): + cleaned = pattern.replace("\r", "").replace("\n", "").replace(",", "").strip() + if not cleaned: + continue + cleaned = "".join(ch for ch in cleaned if ch.isprintable()) + if not cleaned: + continue + if len(cleaned) > max_pattern_length: + cleaned = cleaned[:max_pattern_length] + lowered = cleaned.lower() + if lowered in seen: + continue + seen.add(lowered) + sanitized.append(cleaned) + if len(sanitized) >= max_patterns: + break + return sanitized + + @staticmethod + def discovery_filter_candidates(interface): + if not isinstance(interface, dict): + return [str(interface)] + candidates = [] + for key in ( + "name", + "type", + "reachable_on", + "target_host", + "remote", + "listen_ip", + "port", + "target_port", + "listen_port", + "discovery_hash", + "transport_id", + "network_id", + ): + value = interface.get(key) + if value is not None and value != "": + candidates.append(str(value)) + + host = ( + interface.get("reachable_on") + or interface.get("target_host") + or interface.get("remote") + or interface.get("listen_ip") + ) + port = ( + interface.get("port") + or interface.get("target_port") + or interface.get("listen_port") + ) + if host and port: + candidates.append(f"{host}:{port}") + return candidates + + @staticmethod + def matches_discovery_pattern(patterns, interface): + if not patterns: + return False + candidates = [ + value.lower() + for value in ReticulumMeshChat.discovery_filter_candidates(interface) + ] + for pattern in patterns: + normalized_pattern = str(pattern).lower() + for candidate in candidates: + if fnmatch.fnmatchcase(candidate, normalized_pattern): + return True + return False + + @staticmethod + def filter_discovered_interfaces(interfaces, whitelist_patterns, blacklist_patterns): + if not isinstance(interfaces, list): + return interfaces + whitelist = ReticulumMeshChat.sanitize_discovery_patterns(whitelist_patterns) + blacklist = ReticulumMeshChat.sanitize_discovery_patterns(blacklist_patterns) + return [ + interface + for interface in interfaces + if ( + ( + not whitelist + or ReticulumMeshChat.matches_discovery_pattern(whitelist, interface) + ) + and not ReticulumMeshChat.matches_discovery_pattern(blacklist, interface) + ) + ] + def get_lxst_version(self) -> str: return self.get_package_version("lxst", getattr(LXST, "__version__", "unknown")) @@ -3850,6 +3960,9 @@ class ReticulumMeshChat: }, "emergency": getattr(self, "emergency", False), "integrity_issues": getattr(self, "integrity_issues", []), + "database_health_issues": getattr( + self, "database_health_issues", [] + ), "user_guidance": self.build_user_guidance_messages(), "tutorial_seen": self.config.get("tutorial_seen", "false") == "true", @@ -4496,6 +4609,12 @@ class ReticulumMeshChat: "interface_discovery_sources": reticulum_config.get( "interface_discovery_sources", ), + "interface_discovery_whitelist": reticulum_config.get( + "interface_discovery_whitelist", + ), + "interface_discovery_blacklist": reticulum_config.get( + "interface_discovery_blacklist", + ), "required_discovery_value": reticulum_config.get( "required_discovery_value", ), @@ -4526,11 +4645,23 @@ class ReticulumMeshChat: if value is None or value == "": reticulum_config.pop(key, None) else: + if key in ( + "interface_discovery_whitelist", + "interface_discovery_blacklist", + ): + sanitized = ReticulumMeshChat.sanitize_discovery_patterns(value) + if sanitized: + reticulum_config[key] = ",".join(sanitized) + else: + reticulum_config.pop(key, None) + return reticulum_config[key] = value for key in ( "discover_interfaces", "interface_discovery_sources", + "interface_discovery_whitelist", + "interface_discovery_blacklist", "required_discovery_value", "autoconnect_discovered_interfaces", "network_identity", @@ -4548,6 +4679,12 @@ class ReticulumMeshChat: "interface_discovery_sources": reticulum_config.get( "interface_discovery_sources", ), + "interface_discovery_whitelist": reticulum_config.get( + "interface_discovery_whitelist", + ), + "interface_discovery_blacklist": reticulum_config.get( + "interface_discovery_blacklist", + ), "required_discovery_value": reticulum_config.get( "required_discovery_value", ), @@ -4564,6 +4701,14 @@ class ReticulumMeshChat: try: discovery = InterfaceDiscovery(discover_interfaces=False) interfaces = discovery.list_discovered_interfaces() + reticulum_config = self._get_reticulum_section() + whitelist_patterns = reticulum_config.get("interface_discovery_whitelist") + blacklist_patterns = reticulum_config.get("interface_discovery_blacklist") + interfaces = ReticulumMeshChat.filter_discovered_interfaces( + interfaces, + whitelist_patterns, + blacklist_patterns, + ) active = [] try: if hasattr(self, "reticulum") and self.reticulum: