Files
meshcore-bot/modules/service_plugins/telegram_bridge_service.py
agessaman f789b72b92 Implement bot response bridging for Discord and Telegram
- Added configuration options to bridge the bot's own channel responses to Discord and Telegram, allowing command replies to be sent to respective channels.
- Updated `DiscordBridgeService` and `TelegramBridgeService` to register listeners for bot-sent messages, ensuring they are included in the bridging process.
- Enhanced `CommandManager` to invoke listeners with a synthetic event when a message is successfully sent, providing context about the message.
- Updated `config.ini.example` to reflect the new `bridge_bot_responses` option for both services.
- Added tests to verify the correct invocation of listeners upon successful message sending.
2026-03-02 16:39:30 -08:00

479 lines
19 KiB
Python

#!/usr/bin/env python3
"""
Telegram Bridge Service for MeshCore Bot
Posts MeshCore channel messages to Telegram via the Bot API (one-way, read-only)
"""
import asyncio
import html
import os
import re
import time
import copy
from collections import deque
from dataclasses import dataclass
from typing import Dict, Optional, Any, List
from meshcore import EventType
try:
import aiohttp
AIOHTTP_AVAILABLE = True
except ImportError:
aiohttp = None
AIOHTTP_AVAILABLE = False
try:
import requests
REQUESTS_AVAILABLE = True
except ImportError:
requests = None
REQUESTS_AVAILABLE = False
from .base_service import BaseServicePlugin
from ..profanity_filter import censor, contains_profanity
# Telegram API
TELEGRAM_API_BASE = "https://api.telegram.org/bot"
TELEGRAM_MAX_MESSAGE_LENGTH = 4096
TELEGRAM_TRUNCATE_AT = 4000
@dataclass
class QueuedMessage:
"""Represents a message queued for Telegram posting."""
chat_id: str
payload: Dict[str, Any]
channel_name: str
retry_count: int = 0
first_queued: float = 0.0
next_retry_at: float = 0.0
def __post_init__(self):
if self.first_queued == 0.0:
self.first_queued = time.time()
if self.next_retry_at == 0.0:
self.next_retry_at = time.time()
class TelegramBridgeService(BaseServicePlugin):
"""Telegram bridge service.
Posts MeshCore channel messages to Telegram channels/groups via the Bot API.
One-way bridge - messages only flow from MeshCore to Telegram.
Direct messages are NEVER bridged for privacy.
"""
config_section = 'TelegramBridge'
description = "Posts MeshCore channel messages to Telegram (one-way, read-only)"
def __init__(self, bot: Any):
super().__init__(bot)
if not AIOHTTP_AVAILABLE and not REQUESTS_AVAILABLE:
self.logger.error(
"Neither aiohttp nor requests available. Telegram bridge requires one of these."
)
self.enabled = False
return
# API token: config or env (env takes precedence for security)
self.api_token = (
os.environ.get('TELEGRAM_BOT_TOKEN') or
self.bot.config.get('TelegramBridge', 'api_token', fallback='').strip()
)
if not self.api_token:
self.logger.error("Telegram bridge: api_token not set. Set in config or TELEGRAM_BOT_TOKEN env.")
self.enabled = False
return
self.channel_chat_ids: Dict[str, str] = {}
self._load_channel_mappings()
# Optional settings
self.parse_mode = self.bot.config.get('TelegramBridge', 'parse_mode', fallback='HTML')
if self.parse_mode and self.parse_mode.upper() not in ('HTML', 'MARKDOWN', 'MARKDOWNV2'):
self.parse_mode = 'HTML'
self.disable_web_page_preview = self.bot.config.getboolean(
'TelegramBridge', 'disable_web_page_preview', fallback=False
)
self.max_message_length = self.bot.config.getint(
'TelegramBridge', 'max_message_length', fallback=TELEGRAM_MAX_MESSAGE_LENGTH
)
self.max_message_length = min(self.max_message_length, TELEGRAM_MAX_MESSAGE_LENGTH)
# Profanity filter: drop (default), censor, or off
raw_filter = self.bot.config.get('TelegramBridge', 'filter_profanity', fallback='drop').strip().lower()
if raw_filter not in ('drop', 'censor', 'off'):
raw_filter = 'drop'
self.filter_profanity = raw_filter
# Bridge bot's own channel responses to Telegram (default: true)
self.bridge_bot_responses = self.bot.config.getboolean(
'TelegramBridge', 'bridge_bot_responses', fallback=True
)
# Rate limiting: ~1 message per second per chat
self.message_queues: Dict[str, List[QueuedMessage]] = {}
self.send_times: Dict[str, deque] = {}
self.rate_limit_min_interval = 1.0
self.max_retries = 5
self.retry_delay_base = 1.0
self.max_queue_age = 300
self.http_session: Optional[aiohttp.ClientSession] = None
self._queue_processor_task: Optional[asyncio.Task] = None
if not self.channel_chat_ids:
self.logger.warning(
"No Telegram channel mappings configured. "
"Add bridge.<channelname> = <chat_id> in [TelegramBridge]"
)
def _load_channel_mappings(self) -> None:
"""Load bridge.<channel> = chat_id from config."""
if not self.bot.config.has_section('TelegramBridge'):
return
for key, value in self.bot.config.items('TelegramBridge'):
if key.startswith('bridge.'):
channel_name = key[7:].strip()
chat_id = value.strip()
if not chat_id:
continue
self.channel_chat_ids[channel_name] = chat_id
# Log chat_id safely (mask numeric IDs partially)
if chat_id.startswith('@'):
safe_id = chat_id
else:
safe_id = chat_id[:4] + '...' + chat_id[-4:] if len(chat_id) > 10 else '***'
self.logger.info(f"Configured Telegram bridge: {channel_name}{safe_id}")
self.logger.info(f"Loaded {len(self.channel_chat_ids)} Telegram channel mapping(s)")
def _mask_token(self, token: str) -> str:
if not token or len(token) < 8:
return "***"
return token[:4] + "..." + token[-4:]
def _format_mentions_html(self, text: str) -> str:
"""Convert @[username] to <code>@username</code> for Telegram HTML."""
pattern = r'@\[([^\]]+)\]'
return re.sub(pattern, r'<code>@\1</code>', text)
def _escape_html(self, s: str) -> str:
return html.escape(s, quote=True)
def _build_message_text(
self,
sender_name: str,
message_text: str,
channel_name: str,
use_channel_tag: bool = True,
) -> str:
"""Build HTML message: [Channel] **Sender**: text (with escaping)."""
safe_sender = self._escape_html(sender_name)
formatted_body = self._format_mentions_html(message_text)
# Escape HTML in the rest of the body (mentions already wrapped in <code>)
parts = re.split(r'(<code>[^<]*</code>)', formatted_body)
escaped_parts = [
self._escape_html(p) if not p.startswith('<code>') else p
for p in parts
]
safe_body = ''.join(escaped_parts)
if use_channel_tag:
prefix = f"<i>[{self._escape_html(channel_name)}]</i> "
else:
prefix = ""
return f"{prefix}<b>{safe_sender}</b>: {safe_body}"
def _truncate_text(self, text: str) -> str:
if len(text) <= self.max_message_length:
return text
self.logger.debug(f"Truncating message from {len(text)} to {self.max_message_length} chars")
return text[: self.max_message_length - 1].rstrip() + ""
async def start(self) -> None:
if not self.enabled:
self.logger.info("Telegram bridge service is disabled")
return
if not self.channel_chat_ids:
self.logger.warning("Telegram bridge enabled but no channels configured")
return
self.logger.info("Starting Telegram bridge service...")
if AIOHTTP_AVAILABLE:
self.http_session = aiohttp.ClientSession()
else:
self.logger.debug("Using requests for HTTP (fallback)")
if hasattr(self.bot, 'meshcore') and self.bot.meshcore:
self.bot.meshcore.subscribe(EventType.CHANNEL_MSG_RECV, self._on_mesh_channel_message)
self.logger.info("Subscribed to CHANNEL_MSG_RECV events")
else:
self.logger.error("Cannot subscribe to events - meshcore not available")
return
# Register for bot-sent channel messages so bot responses are bridged too
if self.bridge_bot_responses and getattr(self.bot, 'channel_sent_listeners', None) is not None:
self.bot.channel_sent_listeners.append(self._on_mesh_channel_message)
self.logger.info("Registered for bot channel-sent events (bridge_bot_responses=true)")
for chat_id in self.channel_chat_ids.values():
self.message_queues[chat_id] = []
self.send_times[chat_id] = deque()
self._queue_processor_task = asyncio.create_task(self._process_message_queues())
self._running = True
self.logger.info(
f"Telegram bridge service started (bridging {len(self.channel_chat_ids)} channels)"
)
async def stop(self) -> None:
self.logger.info("Stopping Telegram bridge service...")
self._running = False
# Unregister bot channel-sent listener
if getattr(self.bot, 'channel_sent_listeners', None) is not None:
try:
self.bot.channel_sent_listeners.remove(self._on_mesh_channel_message)
except ValueError:
pass
if self._queue_processor_task:
self._queue_processor_task.cancel()
try:
await self._queue_processor_task
except asyncio.CancelledError:
pass
if self.http_session:
await self.http_session.close()
self.http_session = None
self.logger.info("Telegram bridge service stopped")
async def _on_mesh_channel_message(self, event, metadata=None) -> None:
try:
payload = copy.deepcopy(event.payload) if hasattr(event, 'payload') else None
if payload is None:
self.logger.warning("Channel message event has no payload")
return
channel_idx = payload.get('channel_idx', 0)
channel_name = self.bot.channel_manager.get_channel_name(channel_idx)
text = payload.get('text', '')
sender = 'Unknown'
if ':' in text and not text.startswith('http'):
parts = text.split(':', 1)
sender = parts[0].strip()
if not channel_name or channel_name.lower() in ('dm', 'direct', 'private'):
self.logger.debug("Ignoring DM (DMs are never bridged)")
return
chat_id = None
# Normalize: strip leading # and compare case-insensitively so bridge.HowlTest matches #howltest
channel_key = channel_name.lstrip('#').lower()
for config_channel, cid in self.channel_chat_ids.items():
if config_channel.lstrip('#').lower() == channel_key:
chat_id = cid
break
if not chat_id:
self.logger.debug(f"Channel '{channel_name}' not configured for Telegram bridge")
return
if ':' in text and not text.startswith('http'):
parts = text.split(':', 1)
sender_name = parts[0].strip()
message_text = parts[1].strip() if len(parts) > 1 else text
else:
sender_name = sender
message_text = text
# Profanity filter: drop (don't bridge), censor (replace with ****), or off
if self.filter_profanity == 'drop':
if contains_profanity(sender_name, self.logger) or contains_profanity(message_text, self.logger):
self.logger.debug(f"Telegram bridge: dropping message with profanity from [{channel_name}]")
return
elif self.filter_profanity == 'censor':
sender_name = censor(sender_name, self.logger)
message_text = censor(message_text, self.logger)
full_text = self._build_message_text(sender_name, message_text, channel_name)
full_text = self._truncate_text(full_text)
await self._queue_message(chat_id, full_text, channel_name)
except Exception as e:
self.logger.error(f"Error handling mesh channel message: {e}", exc_info=True)
async def _queue_message(self, chat_id: str, text: str, channel_name: str) -> None:
payload: Dict[str, Any] = {
"chat_id": chat_id,
"text": text,
}
if self.parse_mode:
payload["parse_mode"] = self.parse_mode
if self.disable_web_page_preview:
payload["disable_web_page_preview"] = True
queued = QueuedMessage(
chat_id=chat_id,
payload=payload,
channel_name=channel_name,
)
if chat_id not in self.message_queues:
self.message_queues[chat_id] = []
self.message_queues[chat_id].append(queued)
self.logger.debug(f"Queued message for Telegram [{channel_name}]: {text[:50]}...")
async def _process_message_queues(self) -> None:
while self._running:
try:
current_time = time.time()
for chat_id, queue in list(self.message_queues.items()):
if not queue:
continue
# Enforce min interval per chat
if chat_id in self.send_times:
st = self.send_times[chat_id]
while st and (current_time - st[0]) > self.rate_limit_min_interval:
st.popleft()
if st and (current_time - st[-1]) < self.rate_limit_min_interval:
continue
queued_msg = None
for msg in queue:
if current_time >= msg.next_retry_at:
queued_msg = msg
break
if queued_msg is None:
continue
age = current_time - queued_msg.first_queued
if age > self.max_queue_age:
queue.remove(queued_msg)
self.logger.warning(
f"Dropping old message from queue [{queued_msg.channel_name}]: "
f"age {age:.1f}s > {self.max_queue_age}s"
)
continue
success = await self._send_to_telegram(
queued_msg.chat_id,
queued_msg.payload,
queued_msg.channel_name,
queued_msg,
)
if success:
queue.remove(queued_msg)
if chat_id not in self.send_times:
self.send_times[chat_id] = deque()
self.send_times[chat_id].append(current_time)
else:
queued_msg.retry_count += 1
if queued_msg.retry_count > self.max_retries:
queue.remove(queued_msg)
self.logger.error(
f"Dropping message after {self.max_retries} retries "
f"[{queued_msg.channel_name}]: {queued_msg.payload['text'][:50]}..."
)
else:
delay = self.retry_delay_base * (2 ** (queued_msg.retry_count - 1))
queued_msg.next_retry_at = current_time + delay
self.logger.debug(
f"Message failed, retry in {delay:.1f}s "
f"({queued_msg.retry_count}/{self.max_retries}) [{queued_msg.channel_name}]"
)
await asyncio.sleep(0.1)
except asyncio.CancelledError:
break
except Exception as e:
self.logger.error(f"Error in Telegram queue processor: {e}", exc_info=True)
await asyncio.sleep(1.0)
async def _send_to_telegram(
self,
chat_id: str,
payload: Dict[str, Any],
channel_name: str,
queued_msg: Optional[QueuedMessage] = None,
) -> bool:
url = f"{TELEGRAM_API_BASE}{self.api_token}/sendMessage"
if AIOHTTP_AVAILABLE and self.http_session:
return await self._send_async(url, payload, channel_name, queued_msg)
elif REQUESTS_AVAILABLE:
return await self._send_sync(url, payload, channel_name, queued_msg)
self.logger.error("No HTTP library available for Telegram")
return False
async def _send_async(
self,
url: str,
payload: Dict[str, Any],
channel_name: str,
queued_msg: Optional[QueuedMessage] = None,
) -> bool:
try:
async with self.http_session.post(
url, json=payload, timeout=aiohttp.ClientTimeout(total=10)
) as response:
data = await response.json() if response.content else {}
if response.status == 200 and data.get('ok'):
self.logger.debug(f"Posted to Telegram [{channel_name}]: {payload['text'][:50]}...")
return True
if response.status == 429:
retry_after = (data.get('parameters') or {}).get('retry_after', 60)
self.logger.warning(
f"Telegram rate limit for [{channel_name}]. Retry after: {retry_after}s"
)
if queued_msg:
queued_msg.next_retry_at = time.time() + retry_after
queued_msg.retry_count = max(0, queued_msg.retry_count - 1)
return False
self.logger.warning(
f"Telegram API returned {response.status} for [{channel_name}]: {data.get('description', '')}"
)
return False
except asyncio.TimeoutError:
self.logger.error(f"Timeout posting to Telegram [{channel_name}]")
return False
except Exception as e:
self.logger.error(f"Error posting to Telegram [{channel_name}]: {e}")
return False
async def _send_sync(
self,
url: str,
payload: Dict[str, Any],
channel_name: str,
queued_msg: Optional[QueuedMessage] = None,
) -> bool:
try:
loop = asyncio.get_event_loop()
response = await loop.run_in_executor(
None,
lambda: requests.post(url, json=payload, timeout=10),
)
data = response.json() if response.content else {}
if response.status_code == 200 and data.get('ok'):
self.logger.debug(f"Posted to Telegram [{channel_name}]: {payload['text'][:50]}...")
return True
if response.status_code == 429:
retry_after = (data.get('parameters') or {}).get('retry_after', 60)
self.logger.warning(
f"Telegram rate limit for [{channel_name}]. Retry after: {retry_after}s"
)
if queued_msg:
queued_msg.next_retry_at = time.time() + retry_after
queued_msg.retry_count = max(0, queued_msg.retry_count - 1)
return False
self.logger.warning(
f"Telegram API returned {response.status_code} for [{channel_name}]: "
f"{data.get('description', '')}"
)
return False
except Exception as e:
self.logger.error(f"Error posting to Telegram [{channel_name}]: {e}")
return False