diff --git a/modules/bridge_outbound.py b/modules/bridge_outbound.py new file mode 100644 index 0000000..51c4f4d --- /dev/null +++ b/modules/bridge_outbound.py @@ -0,0 +1,196 @@ +#!/usr/bin/env python3 +""" +Minimal Discord webhook and Telegram Bot API posting for service plugins. + +Independent of DiscordBridgeService / TelegramBridgeService queues and lifecycle. +""" + +from __future__ import annotations + +import asyncio +import logging +from typing import Any, Optional + +try: + import aiohttp + + AIOHTTP_AVAILABLE = True +except ImportError: + aiohttp = None # type: ignore[assignment] + AIOHTTP_AVAILABLE = False + +try: + import requests + + REQUESTS_AVAILABLE = True +except ImportError: + requests = None # type: ignore[assignment] + REQUESTS_AVAILABLE = False + +DISCORD_WEBHOOK_PREFIX = "https://discord.com/api/webhooks/" +DISCORD_CONTENT_MAX = 2000 +TELEGRAM_TEXT_MAX = 4096 +TELEGRAM_TRUNCATE_AT = 4000 +HTTP_TIMEOUT_SECONDS = 10.0 + + +def is_valid_discord_webhook_url(url: str) -> bool: + u = (url or "").strip() + return bool(u.startswith(DISCORD_WEBHOOK_PREFIX)) + + +def _truncate_discord_content(content: str) -> str: + if len(content) <= DISCORD_CONTENT_MAX: + return content + return content[: DISCORD_CONTENT_MAX - 1].rstrip() + "…" + + +def _truncate_telegram_text(text: str) -> str: + if len(text) <= TELEGRAM_TRUNCATE_AT: + return text + return text[: TELEGRAM_TRUNCATE_AT - 1].rstrip() + "…" + + +async def post_discord_webhook( + url: str, + content: str, + *, + username: str = "MeshCore", + session: Any = None, + logger: Optional[logging.Logger] = None, +) -> bool: + """POST plain text to a Discord webhook. Returns True on HTTP 204/200.""" + log = logger or logging.getLogger(__name__) + if not is_valid_discord_webhook_url(url): + log.warning("Invalid Discord webhook URL (skipped)") + return False + + payload = { + "content": _truncate_discord_content(content), + "username": username[:80] if username else "MeshCore", + } + + if AIOHTTP_AVAILABLE: + timeout = aiohttp.ClientTimeout(total=HTTP_TIMEOUT_SECONDS) + try: + if session is not None: + async with session.post(url, json=payload, timeout=timeout) as response: + return await _discord_response_ok(response, log) + async with aiohttp.ClientSession() as sess: + async with sess.post(url, json=payload, timeout=timeout) as response: + return await _discord_response_ok(response, log) + except asyncio.TimeoutError: + log.error("Timeout posting to Discord webhook") + return False + except Exception as e: + log.error("Error posting to Discord webhook: %s", e, exc_info=True) + return False + + if REQUESTS_AVAILABLE: + loop = asyncio.get_event_loop() + + def _sync_post() -> bool: + try: + r = requests.post(url, json=payload, timeout=HTTP_TIMEOUT_SECONDS) + if r.status_code in (200, 204): + return True + log.warning( + "Discord webhook returned %s: %s", + r.status_code, + (r.text or "")[:200], + ) + return False + except Exception as ex: + log.error("requests Discord webhook error: %s", ex, exc_info=True) + return False + + return await loop.run_in_executor(None, _sync_post) + + log.error("No aiohttp or requests; cannot post to Discord webhook") + return False + + +async def _discord_response_ok(response: Any, log: logging.Logger) -> bool: + if response.status in (200, 204): + return True + body = await response.text() + log.warning("Discord webhook returned %s: %s", response.status, body[:200]) + return False + + +async def post_telegram_message( + api_token: str, + chat_id: str, + text: str, + *, + session: Any = None, + logger: Optional[logging.Logger] = None, +) -> bool: + """POST sendMessage (plain text, no parse_mode).""" + log = logger or logging.getLogger(__name__) + token = (api_token or "").strip() + cid = (chat_id or "").strip() + if not token or not cid: + log.warning("Telegram post skipped: missing token or chat_id") + return False + + safe_text = _truncate_telegram_text(text) + url = f"https://api.telegram.org/bot{token}/sendMessage" + payload: dict[str, Any] = {"chat_id": cid, "text": safe_text} + + if AIOHTTP_AVAILABLE: + timeout = aiohttp.ClientTimeout(total=HTTP_TIMEOUT_SECONDS) + try: + if session is not None: + async with session.post(url, json=payload, timeout=timeout) as response: + return await _telegram_response_ok(response, log) + async with aiohttp.ClientSession() as sess: + async with sess.post(url, json=payload, timeout=timeout) as response: + return await _telegram_response_ok(response, log) + except asyncio.TimeoutError: + log.error("Timeout posting to Telegram") + return False + except Exception as e: + log.error("Error posting to Telegram: %s", e, exc_info=True) + return False + + if REQUESTS_AVAILABLE: + loop = asyncio.get_event_loop() + + def _sync_post() -> bool: + try: + r = requests.post(url, json=payload, timeout=HTTP_TIMEOUT_SECONDS) + if r.status_code == 200: + try: + data = r.json() + except Exception: + data = {} + if data.get("ok"): + return True + log.warning("Telegram API not ok: %s", data.get("description", r.text[:200])) + return False + log.warning("Telegram returned %s: %s", r.status_code, (r.text or "")[:200]) + return False + except Exception as ex: + log.error("requests Telegram error: %s", ex, exc_info=True) + return False + + return await loop.run_in_executor(None, _sync_post) + + log.error("No aiohttp or requests; cannot post to Telegram") + return False + + +async def _telegram_response_ok(response: Any, log: logging.Logger) -> bool: + try: + data = await response.json() if response.content else {} + except Exception: + data = {} + if response.status == 200 and data.get("ok"): + return True + log.warning( + "Telegram API returned %s: %s", + response.status, + (data.get("description") or "")[:200], + ) + return False diff --git a/modules/service_plugins/base_service.py b/modules/service_plugins/base_service.py index 0a4a104..6b716e8 100644 --- a/modules/service_plugins/base_service.py +++ b/modules/service_plugins/base_service.py @@ -3,16 +3,43 @@ Base service plugin class for background services """ +from __future__ import annotations + +import asyncio from abc import ABC, abstractmethod +from dataclasses import dataclass from typing import Any, Optional +@dataclass +class ExternalNotifySettings: + """Parsed outbound notification targets from the plugin config section.""" + + discord_urls: list[str] + telegram_chat_ids: list[str] + telegram_token: Optional[str] + + class BaseServicePlugin(ABC): """Base class for background service plugins. This class defines the interface for service plugins, which are long-running background tasks that can interact with the bot and mesh network. It manages service lifecycle (start/stop) and metadata. + + **Optional outbound notifications (Discord / Telegram)** — read from this + plugin's config section (``config_section``): + + - ``discord_webhook_urls`` — comma-separated Discord webhook URLs + (``https://discord.com/api/webhooks/...``). + - ``telegram_chat_ids`` — comma-separated chat IDs or ``@channel`` usernames. + - ``telegram_bot_token`` — optional; else ``TELEGRAM_BOT_TOKEN`` env, then + ``[TelegramBridge] api_token``. + + **Mesh silence convention:** ``silence_mesh_output`` (default false). Services + that both transmit mesh channel messages and call ``send_external_notifications`` + should skip ``send_channel_message`` when this is true so alerts go only to + webhook/Telegram. Subclasses implement the guard at their mesh send sites. """ # Optional: Config section name (if different from class name) @@ -35,6 +62,132 @@ class BaseServicePlugin(ABC): self.logger = bot.logger self.enabled = True self._running = False + self._external_notify_cache: Optional[ExternalNotifySettings] = None + + def _resolve_telegram_token_for_section(self, section: str) -> Optional[str]: + import os + + if self.bot.config.has_section(section): + t = (self.bot.config.get(section, "telegram_bot_token", fallback="") or "").strip() + if t: + return t + t = (os.environ.get("TELEGRAM_BOT_TOKEN") or "").strip() + if t: + return t + if self.bot.config.has_section("TelegramBridge"): + t = (self.bot.config.get("TelegramBridge", "api_token", fallback="") or "").strip() + if t: + return t + return None + + def _parse_external_notify_settings(self) -> ExternalNotifySettings: + from modules.bridge_outbound import is_valid_discord_webhook_url + + section = self.config_section or self._derive_config_section() + discord_raw = "" + telegram_raw = "" + if self.bot.config.has_section(section): + discord_raw = (self.bot.config.get(section, "discord_webhook_urls", fallback="") or "").strip() + telegram_raw = (self.bot.config.get(section, "telegram_chat_ids", fallback="") or "").strip() + + discord_urls = [ + u.strip() + for u in discord_raw.split(",") + if u.strip() and is_valid_discord_webhook_url(u.strip()) + ] + telegram_chat_ids = [t.strip() for t in telegram_raw.split(",") if t.strip()] + telegram_token = self._resolve_telegram_token_for_section(section) + + return ExternalNotifySettings( + discord_urls=discord_urls, + telegram_chat_ids=telegram_chat_ids, + telegram_token=telegram_token, + ) + + def _get_external_notify_settings(self) -> ExternalNotifySettings: + if self._external_notify_cache is None: + self._external_notify_cache = self._parse_external_notify_settings() + return self._external_notify_cache + + def has_external_notification_targets(self) -> bool: + """True if Discord URLs are set, or Telegram chats plus a resolved bot token.""" + s = self._get_external_notify_settings() + if s.discord_urls: + return True + return bool(s.telegram_chat_ids and s.telegram_token) + + def _external_notify_discord_username(self) -> str: + label = self.config_section or self._derive_config_section() + return (label or "MeshCore")[:80] + + async def send_external_notifications(self, text: str, *, discord_username: Optional[str] = None) -> None: + """Send text to configured Discord webhooks and Telegram chats. + + No-op when no URLs/chat IDs are configured. Logs per-target failures; + does not raise. Uses aiohttp with one shared session when available. + """ + from modules import bridge_outbound + + settings = self._get_external_notify_settings() + if not settings.discord_urls and not settings.telegram_chat_ids: + return + + user = discord_username if discord_username is not None else self._external_notify_discord_username() + + if bridge_outbound.AIOHTTP_AVAILABLE: + import aiohttp + + aws: list[Any] = [] + async with aiohttp.ClientSession() as session: + for url in settings.discord_urls: + aws.append( + bridge_outbound.post_discord_webhook( + url, text, username=user, session=session, logger=self.logger + ) + ) + tok = settings.telegram_token + if tok: + for cid in settings.telegram_chat_ids: + aws.append( + bridge_outbound.post_telegram_message( + tok, cid, text, session=session, logger=self.logger + ) + ) + elif settings.telegram_chat_ids: + self.logger.warning( + "telegram_chat_ids set but no Telegram bot token " + "(set telegram_bot_token, TELEGRAM_BOT_TOKEN, or [TelegramBridge] api_token)" + ) + + if aws: + results = await asyncio.gather(*aws, return_exceptions=True) + for r in results: + if isinstance(r, Exception): + self.logger.warning("External notification error: %s", r, exc_info=True) + return + + for url in settings.discord_urls: + try: + await bridge_outbound.post_discord_webhook( + url, text, username=user, session=None, logger=self.logger + ) + except Exception as e: + self.logger.warning("Discord webhook failed: %s", e, exc_info=True) + + tok = settings.telegram_token + if tok: + for cid in settings.telegram_chat_ids: + try: + await bridge_outbound.post_telegram_message( + tok, cid, text, session=None, logger=self.logger + ) + except Exception as e: + self.logger.warning("Telegram failed: %s", e, exc_info=True) + elif settings.telegram_chat_ids: + self.logger.warning( + "telegram_chat_ids set but no Telegram bot token " + "(set telegram_bot_token, TELEGRAM_BOT_TOKEN, or [TelegramBridge] api_token)" + ) @abstractmethod async def start(self) -> None: diff --git a/modules/service_plugins/repeater_prefix_collision_service.py b/modules/service_plugins/repeater_prefix_collision_service.py index 446477f..2f64162 100644 --- a/modules/service_plugins/repeater_prefix_collision_service.py +++ b/modules/service_plugins/repeater_prefix_collision_service.py @@ -4,6 +4,14 @@ Repeater Prefix Collision Service for MeshCore Bot Watches NEW_CONTACT events and notifies channels when a newly discovered repeater shares a prefix with an existing repeater (configurable: 1/2/3 bytes). + +Optional Discord/Telegram via ``send_external_notifications`` (see BaseServicePlugin): + +- ``notify_external_on_all_new_repeaters`` — when true, sends a discovery message to + webhook/Telegram for every qualified new repeater/roomserver; collision detail stays + on mesh only (unless ``silence_mesh_output``). +- ``silence_mesh_output`` — when true, collision alerts are not sent on mesh channels; + discovery/collision externals follow ``notify_external_on_all_new_repeaters``. """ import asyncio @@ -64,19 +72,30 @@ class RepeaterPrefixCollisionService(BaseServicePlugin): ) self._notified: dict[_NotifyKey, float] = {} # key -> last_sent_epoch_seconds self._prefix_cooldown: dict[tuple[int, str], float] = {} # (bytes, prefix_hex_lower) -> last_sent_epoch_seconds + self._discovery_notified: dict[str, float] = {} # public_key -> last discovery external epoch self._dedupe_prune_max_age_seconds = max( 3600.0, float(self.cooldown_minutes_per_prefix) * 120.0, ) + self.notify_external_on_all_new_repeaters = self.bot.config.getboolean( + section, "notify_external_on_all_new_repeaters", fallback=False + ) + self.silence_mesh_output = self.bot.config.getboolean( + section, "silence_mesh_output", fallback=False + ) + self._running = False self._handler_installed = False self._handler_lock = asyncio.Lock() self.logger.info( - "RepeaterPrefixCollision service initialized: channels=%s notify_on_prefix_bytes=%s", + "RepeaterPrefixCollision service initialized: channels=%s notify_on_prefix_bytes=%s " + "notify_external_on_all_new_repeaters=%s silence_mesh_output=%s", self.channels, self.notify_on_prefix_bytes, + self.notify_external_on_all_new_repeaters, + self.silence_mesh_output, ) async def start(self) -> None: @@ -184,6 +203,9 @@ class RepeaterPrefixCollisionService(BaseServicePlugin): self._prune_old_dedupe_state() + if self.notify_external_on_all_new_repeaters: + await self._maybe_send_discovery_external(public_key, name, location) + for nbytes in self.notify_on_prefix_bytes: await self._maybe_notify_for_prefix_bytes( public_key=public_key, @@ -260,6 +282,31 @@ class RepeaterPrefixCollisionService(BaseServicePlugin): stale_p = [k for k, ts in self._prefix_cooldown.items() if ts < cutoff] for prefix_key in stale_p: del self._prefix_cooldown[prefix_key] + if self._discovery_notified: + stale_d = [k for k, ts in self._discovery_notified.items() if ts < cutoff] + for dk in stale_d: + del self._discovery_notified[dk] + + async def _maybe_send_discovery_external(self, public_key: str, name: str, location: str) -> None: + """Webhook/Telegram discovery line; deduped per public_key (same cooldown window).""" + if not self.has_external_notification_targets(): + return + if self._is_discovery_recently_notified(public_key): + return + text = self._format_discovery_message(public_key, name, location) + await self.send_external_notifications(text, discord_username="New repeater") + self._discovery_notified[public_key] = time.time() + + def _format_discovery_message(self, public_key: str, name: str, location: str) -> str: + pk_short = f"{public_key[:16]}…" if len(public_key) > 16 else public_key + return f"New repeater heard: {name} near {location}. Key {pk_short}" + + def _is_discovery_recently_notified(self, public_key: str) -> bool: + ts = self._discovery_notified.get(public_key) + if not ts: + return False + cooldown_s = max(0, int(self.cooldown_minutes_per_prefix)) * 60 + return (time.time() - ts) < cooldown_s if cooldown_s else False async def _maybe_notify_for_prefix_bytes( self, @@ -294,7 +341,12 @@ class RepeaterPrefixCollisionService(BaseServicePlugin): prefix_bytes=prefix_bytes, ) - await self._send_to_channels(text) + if not self.notify_external_on_all_new_repeaters: + await self.send_external_notifications( + text, discord_username="Repeater prefix collision" + ) + if not self.silence_mesh_output: + await self._send_to_channels(text) pk_short = f"{public_key[:16]}…" if len(public_key) > 16 else public_key self.logger.info( diff --git a/tests/test_base_service_external_notifications.py b/tests/test_base_service_external_notifications.py new file mode 100644 index 0000000..d762fdb --- /dev/null +++ b/tests/test_base_service_external_notifications.py @@ -0,0 +1,92 @@ +"""Tests for BaseServicePlugin outbound helpers.""" + +import configparser +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from modules.service_plugins.base_service import BaseServicePlugin + + +class _StubPlugin(BaseServicePlugin): + config_section = "StubExternal_Service" + + async def start(self) -> None: + self._running = True + + async def stop(self) -> None: + self._running = False + + +def _bot_with_section(**section_kv): + bot = MagicMock() + bot.logger = MagicMock() + bot.logger.warning = MagicMock() + cfg = configparser.ConfigParser() + cfg.add_section("StubExternal_Service") + for k, v in section_kv.items(): + cfg.set("StubExternal_Service", k, str(v)) + bot.config = cfg + return bot + + +def test_parse_external_notify_filters_invalid_discord_urls(): + bot = _bot_with_section( + discord_webhook_urls="https://discord.com/api/webhooks/1/abc, https://evil.com/x", + telegram_chat_ids="", + ) + svc = _StubPlugin(bot) + s = svc._parse_external_notify_settings() + assert len(s.discord_urls) == 1 + assert s.discord_urls[0].startswith("https://discord.com/api/webhooks/1/") + + +def test_has_external_notification_targets_discord_only(): + bot = _bot_with_section( + discord_webhook_urls="https://discord.com/api/webhooks/1/abc", + ) + svc = _StubPlugin(bot) + assert svc.has_external_notification_targets() is True + + +def test_has_external_notification_targets_telegram_requires_token(): + bot = _bot_with_section( + telegram_chat_ids="-1001", + ) + svc = _StubPlugin(bot) + assert svc.has_external_notification_targets() is False + + bot2 = _bot_with_section( + telegram_chat_ids="-1001", + telegram_bot_token="secret", + ) + svc2 = _StubPlugin(bot2) + assert svc2.has_external_notification_targets() is True + + +@pytest.mark.asyncio +async def test_send_external_notifications_noop_when_empty(): + bot = _bot_with_section() + svc = _StubPlugin(bot) + await svc.send_external_notifications("hello") + # no crash + + +@pytest.mark.asyncio +async def test_send_external_notifications_calls_discord_and_telegram(): + bot = _bot_with_section( + discord_webhook_urls="https://discord.com/api/webhooks/1/tok", + telegram_chat_ids="-99", + telegram_bot_token="secret", + ) + svc = _StubPlugin(bot) + + with ( + patch("modules.bridge_outbound.post_discord_webhook", new_callable=AsyncMock, return_value=True) as pd, + patch("modules.bridge_outbound.post_telegram_message", new_callable=AsyncMock, return_value=True) as pt, + patch("modules.bridge_outbound.AIOHTTP_AVAILABLE", True), + ): + await svc.send_external_notifications("ping") + + assert pd.await_count == 1 + assert pt.await_count == 1 diff --git a/tests/test_bridge_outbound.py b/tests/test_bridge_outbound.py new file mode 100644 index 0000000..91baf2b --- /dev/null +++ b/tests/test_bridge_outbound.py @@ -0,0 +1,87 @@ +"""Tests for modules.bridge_outbound.""" + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from modules import bridge_outbound + + +def test_is_valid_discord_webhook_url(): + assert bridge_outbound.is_valid_discord_webhook_url( + "https://discord.com/api/webhooks/123/abc-token" + ) + assert not bridge_outbound.is_valid_discord_webhook_url("http://example.com/hook") + assert not bridge_outbound.is_valid_discord_webhook_url("") + + +@pytest.mark.asyncio +async def test_post_discord_webhook_async_success(): + mock_resp = AsyncMock() + mock_resp.status = 204 + + mock_session = MagicMock() + mock_session.post = MagicMock() + cm = MagicMock() + cm.__aenter__ = AsyncMock(return_value=mock_resp) + cm.__aexit__ = AsyncMock(return_value=None) + mock_session.post.return_value = cm + + ok = await bridge_outbound.post_discord_webhook( + "https://discord.com/api/webhooks/1/tok", + "hello world", + username="Bot", + session=mock_session, + logger=MagicMock(), + ) + assert ok is True + + +@pytest.mark.asyncio +async def test_post_telegram_message_async_ok_json(): + mock_resp = AsyncMock() + mock_resp.status = 200 + mock_resp.json = AsyncMock(return_value={"ok": True}) + + mock_session = MagicMock() + cm = MagicMock() + cm.__aenter__ = AsyncMock(return_value=mock_resp) + cm.__aexit__ = AsyncMock(return_value=None) + mock_session.post.return_value = cm + + ok = await bridge_outbound.post_telegram_message( + "tok123", + "-100123", + "alert text", + session=mock_session, + logger=MagicMock(), + ) + assert ok is True + + +@pytest.mark.asyncio +async def test_post_discord_invalid_url_returns_false(): + log = MagicMock() + ok = await bridge_outbound.post_discord_webhook( + "https://example.com/nope", + "x", + logger=log, + ) + assert ok is False + + +@pytest.mark.asyncio +@patch.object(bridge_outbound, "AIOHTTP_AVAILABLE", False) +@patch.object(bridge_outbound, "REQUESTS_AVAILABLE", True) +async def test_post_discord_requests_fallback(): + mock_resp = MagicMock() + mock_resp.status_code = 204 + + with patch("modules.bridge_outbound.requests.post", return_value=mock_resp) as p: + ok = await bridge_outbound.post_discord_webhook( + "https://discord.com/api/webhooks/9/x", + "hi", + logger=MagicMock(), + ) + assert ok is True + p.assert_called_once() diff --git a/tests/test_repeater_prefix_collision_service.py b/tests/test_repeater_prefix_collision_service.py index 9a3ff0c..5050e0f 100644 --- a/tests/test_repeater_prefix_collision_service.py +++ b/tests/test_repeater_prefix_collision_service.py @@ -1,7 +1,7 @@ import asyncio import configparser from datetime import date, timedelta -from unittest.mock import AsyncMock, MagicMock, Mock +from unittest.mock import AsyncMock, MagicMock, Mock, patch import pytest @@ -279,3 +279,107 @@ async def test_on_new_contact_schedules_task_that_sends(): await asyncio.sleep(0.05) assert bot.command_manager.send_channel_message.await_count == 2 + + +@pytest.mark.asyncio +async def test_discovery_external_when_notify_all_true_and_webhook(): + bot = _make_bot( + { + "notify_external_on_all_new_repeaters": "true", + "discord_webhook_urls": "https://discord.com/api/webhooks/123456789/abcdefghij", + } + ) + db: _FakeDB = bot.db_manager + db.contact_row = _base_contact_row() + db.duplicate_count = 0 + + svc = RepeaterPrefixCollisionService(bot) + await svc.start() + + with patch.object(svc, "send_external_notifications", new_callable=AsyncMock) as ext: + await svc._handle_new_contact_payload({"public_key": "01020304", "name": "Lonely"}) + + assert ext.await_count == 1 + args = ext.await_args_list[0][0] + assert "New repeater heard" in args[0] + assert "NewRepeater" in args[0] + assert bot.command_manager.send_channel_message.await_count == 0 + + +@pytest.mark.asyncio +async def test_discovery_skips_external_when_no_targets_configured(): + bot = _make_bot({"notify_external_on_all_new_repeaters": "true"}) + db: _FakeDB = bot.db_manager + db.contact_row = _base_contact_row() + db.duplicate_count = 0 + + svc = RepeaterPrefixCollisionService(bot) + await svc.start() + + with patch.object(svc, "send_external_notifications", new_callable=AsyncMock) as ext: + await svc._handle_new_contact_payload({"public_key": "01020304", "name": "Lonely"}) + + assert ext.await_count == 0 + + +@pytest.mark.asyncio +async def test_silence_mesh_skips_channel_but_sends_collision_external(): + bot = _make_bot( + { + "silence_mesh_output": "true", + "notify_external_on_all_new_repeaters": "false", + } + ) + db: _FakeDB = bot.db_manager + db.contact_row = _base_contact_row() + db.duplicate_count = 2 + + svc = RepeaterPrefixCollisionService(bot) + await svc.start() + + with patch.object(svc, "send_external_notifications", new_callable=AsyncMock) as ext: + await svc._handle_new_contact_payload({"public_key": "01020304", "name": "NewRepeater"}) + + assert bot.command_manager.send_channel_message.await_count == 0 + assert ext.await_count == 1 + assert "Heard new repeater" in ext.await_args_list[0][0][0] + + +@pytest.mark.asyncio +async def test_collision_external_default_username_when_notify_all_false(): + bot = _make_bot() + db: _FakeDB = bot.db_manager + db.contact_row = _base_contact_row() + db.duplicate_count = 2 + + svc = RepeaterPrefixCollisionService(bot) + await svc.start() + + with patch.object(svc, "send_external_notifications", new_callable=AsyncMock) as ext: + await svc._handle_new_contact_payload({"public_key": "01020304", "name": "NewRepeater"}) + + kw = ext.await_args_list[0][1] + assert kw.get("discord_username") == "Repeater prefix collision" + + +@pytest.mark.asyncio +async def test_notify_all_true_collision_only_discovery_external_not_collision_copy(): + bot = _make_bot( + { + "notify_external_on_all_new_repeaters": "true", + "discord_webhook_urls": "https://discord.com/api/webhooks/123456789/abcdefghij", + } + ) + db: _FakeDB = bot.db_manager + db.contact_row = _base_contact_row() + db.duplicate_count = 2 + + svc = RepeaterPrefixCollisionService(bot) + await svc.start() + + with patch.object(svc, "send_external_notifications", new_callable=AsyncMock) as ext: + await svc._handle_new_contact_payload({"public_key": "01020304", "name": "Dup"}) + + assert ext.await_count == 1 + assert "New repeater heard" in ext.await_args_list[0][0][0] + assert "free prefixes remain" not in ext.await_args_list[0][0][0]