Files
meshcore-bot/modules/config_validation.py
Adam Gessaman 4bf09291f5 Implement public channel guard and enhance configuration validation
- Updated `config.ini.example` to include a warning about running the bot on the Public channel and added an override key for intentional usage.
- Enhanced `config_validation.py` to implement a public channel guard that prevents the bot from starting if the Public channel is included in monitored channels without the override.
- Refactored `CommandManager` and `Core` to check for the Public channel key during channel loading and connection setup, ensuring compliance with the new guard.
- Improved documentation in `configuration.md` and `config-validation.md` to clarify the implications of using the Public channel and the necessary configuration changes.
2026-04-09 20:44:48 -07:00

309 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Configuration validation for MeshCore Bot config.ini.
Validates section names against canonical (standardized) names and flags
non-standard sections (e.g. WebViewer instead of Web_Viewer). Can be run
standalone via validate_config.py or at bot startup with --validate-config.
"""
import configparser
import os
from pathlib import Path
from typing import Optional
# Severity levels for validation results
SEVERITY_ERROR = "error"
SEVERITY_WARNING = "warning"
SEVERITY_INFO = "info"
# Public channel guard: first 16 bytes of SHA256("#public") as hex.
# The Public channel always has this key regardless of display name.
PUBLIC_CHANNEL_KEY_HEX = "8b3387e9c5cdea6ac9e5edbaa115cd72"
PUBLIC_CHANNEL_OVERRIDE_KEY = (
"i_understand_that_running_the_bot_on_the_public_channel_is_potentially_"
"disruptive_to_other_users_enjoyment_of_the_mesh_and_i_would_like_to_do_it_anyway"
)
def _channel_name_is_public(name: str) -> bool:
"""Return True if name matches the conventional 'Public' channel name.
Public is a special channel with a fixed key (not hashtag-derived).
This name check is used pre-connection as a heuristic; the authoritative
check is _check_public_channel_guard() which compares actual device keys.
"""
return name.strip().lstrip("#").lower() == "public"
# Canonical non-command section names (as used in config.ini.example and code)
CANONICAL_NON_COMMAND_SECTIONS = frozenset({
"Connection",
"Bot",
"Channels",
"Banned_Users",
"Localization",
"Admin_ACL",
"Plugin_Overrides",
"Companion_Purge",
"Keywords",
"Scheduled_Messages",
"Logging",
"Custom_Syntax",
"External_Data",
"Weather",
"Solar_Config",
"Channels_List",
"Web_Viewer",
"Feed_Manager",
"PacketCapture",
"MapUploader",
"Weather_Service",
"DiscordBridge",
})
# Sections required for the bot to start (accessed without has_section guards)
# Missing any of these causes ConfigParser.NoSectionError during startup
REQUIRED_SECTIONS = frozenset({
"Connection", # Serial/BLE/TCP connection params
"Bot", # db_path, rate limits, bot_name, etc.
"Channels", # monitor_channels, respond_to_dms
})
# Optional sections: when absent, use defaults or treat as empty/disabled
ADMIN_ACL_SECTION = "Admin_ACL" # admin commands disabled
BANNED_USERS_SECTION = "Banned_Users" # empty banned list
LOCALIZATION_SECTION = "Localization" # language=en, translation_path=translations/
def strip_optional_quotes(s: str) -> str:
"""Strip one layer of surrounding double or single quotes if present.
Allows config values like monitor_channels to be written as
"#bot,#bot-everett,#bots" so the list does not look like comments.
Backward compatible: unquoted values are returned unchanged.
"""
if not isinstance(s, str):
return s
s = s.strip()
if len(s) >= 2 and s[0] == s[-1] and s[0] in '"\'':
return s[1:-1]
return s
# Non-standard section name -> suggested canonical name (exact match)
SECTION_TYPO_MAP = {
"WebViewer": "Web_Viewer",
"FeedManager": "Feed_Manager",
"PrefixCommand": "Prefix_Command",
"Jokes": "Joke_Command / DadJoke_Command (deprecated; move options into those sections)",
}
def _get_command_prefix_to_section() -> dict[str, str]:
"""Build map from command prefix (lowercase) to canonical section for similarity suggestions.
Discovers command sections from config.ini.example in the project. Returns a dict
mapping prefix.lower() -> "Prefix_Command" (e.g. "stats" -> "Stats_Command").
"""
result: dict[str, str] = {}
base = Path(__file__).resolve().parent.parent # project root
example_paths = [base / "config.ini.example", base / "config.ini.minimal-example"]
for path in example_paths:
if not path.exists():
continue
try:
parser = configparser.ConfigParser()
parser.read(path)
for section in parser.sections():
if section.endswith("_Command"):
prefix = section[:-8] # remove "_Command"
result[prefix.lower()] = section
except configparser.Error:
pass
return result
def _suggest_similar_command(section: str, prefix_to_section: dict[str, str]) -> Optional[str]:
"""If section looks like a command name (e.g. Stats, Hacker), suggest the canonical section."""
return prefix_to_section.get(section.strip().lower())
def _resolve_path(file_path: str, base_dir: Path) -> Path:
"""Resolve a path relative to base_dir (or as absolute)."""
p = Path(file_path)
if p.is_absolute():
return p.resolve()
return (base_dir.resolve() / p).resolve()
def _check_path_writable(
file_path: str, base_dir: Path, description: str
) -> Optional[str]:
"""Check if a file path can be written. Returns warning message if not."""
if not file_path or not file_path.strip():
return None
try:
resolved = _resolve_path(file_path.strip(), base_dir)
except (OSError, RuntimeError):
return f"{description}: cannot resolve path '{file_path}'"
parent = resolved.parent
# Find first existing ancestor to check writability
check_dir = parent
while not check_dir.exists():
check_dir = check_dir.parent
if check_dir == check_dir.parent: # reached root
return f"{description} '{resolved}': parent directory does not exist"
if not os.access(str(check_dir), os.W_OK):
return f"{description} '{resolved}': directory {check_dir} is not writable"
# If file exists, verify it's writable
if resolved.exists() and not os.access(str(resolved), os.W_OK):
return f"{description} '{resolved}': file exists but is not writable"
return None
def validate_config(config_path: str) -> list[tuple[str, str]]:
"""
Validate config file section names. Returns a list of (severity, message).
Args:
config_path: Path to config.ini (or other config file).
Returns:
List of (severity, message). severity is one of SEVERITY_*.
"""
path = Path(config_path)
if not path.exists():
return [(SEVERITY_ERROR, f"Config file not found: {config_path}")]
config = configparser.ConfigParser()
try:
config.read(config_path)
except configparser.Error as e:
return [(SEVERITY_ERROR, f"Failed to parse config: {e}")]
results: list[tuple[str, str]] = []
# Check required sections (bot fails to start without these)
sections_present = frozenset(s.strip() for s in config.sections() if s.strip())
missing_required = REQUIRED_SECTIONS - sections_present
for section in sorted(missing_required):
results.append((
SEVERITY_ERROR,
f"Missing required section [{section}]; bot will not start without it.",
))
# Note when optional sections are absent
if ADMIN_ACL_SECTION not in sections_present:
results.append((
SEVERITY_INFO,
f"Section [{ADMIN_ACL_SECTION}] absent; admin commands (repeater, webviewer, reload, channelpause) disabled.",
))
if BANNED_USERS_SECTION not in sections_present:
results.append((
SEVERITY_INFO,
f"Section [{BANNED_USERS_SECTION}] absent; no users banned.",
))
if LOCALIZATION_SECTION not in sections_present:
results.append((
SEVERITY_INFO,
f"Section [{LOCALIZATION_SECTION}] absent; using defaults (language=en, translation_path=translations/).",
))
# Check writable paths (database, log file)
bot_root = Path(config_path).resolve().parent
if config.has_section("Bot"):
db_path = config.get("Bot", "db_path", fallback="").strip()
if db_path:
msg = _check_path_writable(db_path, bot_root, "Database path")
if msg:
results.append((SEVERITY_WARNING, msg))
# Optional: local_dir_path should exist and be readable when set
local_dir = config.get("Bot", "local_dir_path", fallback="").strip()
if local_dir:
try:
resolved = _resolve_path(local_dir, bot_root)
if not resolved.exists():
results.append((
SEVERITY_WARNING,
f"Local plugins path '{resolved}' does not exist; local commands/plugins will not load.",
))
elif not os.access(str(resolved), os.R_OK):
results.append((
SEVERITY_WARNING,
f"Local plugins path '{resolved}' is not readable.",
))
except (OSError, RuntimeError):
results.append((
SEVERITY_WARNING,
f"Local plugins path: cannot resolve '{local_dir}'.",
))
if config.has_section("Logging"):
log_file = config.get("Logging", "log_file", fallback="").strip()
if log_file:
msg = _check_path_writable(log_file, bot_root, "Log file path")
if msg:
results.append((SEVERITY_WARNING, msg))
if config.has_section("Web_Viewer"):
web_db = config.get("Web_Viewer", "db_path", fallback="").strip()
if web_db:
msg = _check_path_writable(web_db, bot_root, "Web viewer db_path")
if msg:
results.append((SEVERITY_WARNING, msg))
# Public channel guard: refuse to run on the shared Public channel without explicit override
if config.has_section("Channels") and config.has_option("Channels", "monitor_channels"):
raw = strip_optional_quotes(config.get("Channels", "monitor_channels", fallback=""))
entries = [e.strip() for e in raw.split(",") if e.strip()]
if any(_channel_name_is_public(e) for e in entries):
override = config.get("Bot", PUBLIC_CHANNEL_OVERRIDE_KEY, fallback="").strip().lower()
if override != "true":
results.append((
SEVERITY_ERROR,
"monitor_channels includes the Public channel. Running a bot on Public "
"is disruptive to other mesh users. To override, add to [Bot]:\n"
f" {PUBLIC_CHANNEL_OVERRIDE_KEY} = true",
))
prefix_to_section: Optional[dict[str, str]] = None
for section in config.sections():
section_stripped = section.strip()
if not section_stripped:
continue
# Valid: canonical non-command section
if section_stripped in CANONICAL_NON_COMMAND_SECTIONS:
continue
# Valid: command section (ends with _Command)
if section_stripped.endswith("_Command"):
continue
# Check typo map for known non-standard names
if section_stripped in SECTION_TYPO_MAP:
suggestion = SECTION_TYPO_MAP[section_stripped]
# Special case: [Jokes] + [Joke_Command]/[DadJoke_Command] overlap
if section_stripped == "Jokes" and (
"Joke_Command" in sections_present or "DadJoke_Command" in sections_present
):
results.append((
SEVERITY_WARNING,
"Both [Jokes] and [Joke_Command]/[DadJoke_Command] are present; "
"the *_Command sections take precedence. Consider removing [Jokes] to avoid confusion.",
))
else:
results.append((
SEVERITY_WARNING,
f"Non-standard section [{section_stripped}]; did you mean [{suggestion}]?",
))
else:
# Check if section looks like a command name (e.g. [Stats] -> [Stats_Command])
if prefix_to_section is None:
prefix_to_section = _get_command_prefix_to_section()
similar = _suggest_similar_command(section_stripped, prefix_to_section)
if similar:
msg = f"Unknown section [{section_stripped}]; did you mean [{similar}]?"
else:
msg = f"Unknown section [{section_stripped}] (not in canonical list and not a *_Command section)."
results.append((SEVERITY_INFO, msg))
return results