mirror of
https://github.com/agessaman/meshcore-bot.git
synced 2026-03-29 03:19:51 +00:00
- Updated the _cleanup_web_viewer and _cleanup_mesh_graph methods to avoid logging errors during shutdown, as the logger's stream may be closed at that time. - Modified the shutdown method in MeshGraph to prevent logging of flushing errors, enhancing stability during the atexit process. - Adjusted test configurations to use Path objects for bot_root and local_root, improving path handling in tests.
452 lines
18 KiB
Python
452 lines
18 KiB
Python
"""Tests for modules.command_manager."""
|
|
|
|
import time
|
|
|
|
import pytest
|
|
from configparser import ConfigParser
|
|
from pathlib import Path
|
|
from unittest.mock import Mock, MagicMock, patch, AsyncMock
|
|
|
|
from modules.command_manager import CommandManager, InternetStatusCache
|
|
from modules.models import MeshMessage
|
|
from tests.conftest import mock_message
|
|
|
|
|
|
@pytest.fixture
def cm_bot(mock_logger):
    """Build the mocked bot object used by the CommandManager tests.

    The attached config names the bot ``TestBot``, monitors the ``general``
    and ``test`` channels with DM replies enabled, and defines two keyword
    responses (``ping`` and ``test``).
    """
    config = ConfigParser()
    config.read_dict(
        {
            "Bot": {"bot_name": "TestBot"},
            "Channels": {
                "monitor_channels": "general,test",
                "respond_to_dms": "true",
            },
            "Keywords": {"ping": "Pong!", "test": "ack"},
        }
    )

    def echo_translation(key, **kw):
        # Translator returns "key: kwarg_values" so assertions can check content
        return f"{key}: {' '.join(str(v) for v in kw.values())}"

    bot = Mock()
    bot.logger = mock_logger
    bot.bot_root = Path("/tmp")
    bot._local_root = None  # Use bot_root / local / commands in CommandManager
    bot.config = config

    bot.translator = Mock()
    bot.translator.translate = Mock(side_effect=echo_translation)

    bot.meshcore = None

    bot.rate_limiter = Mock()
    bot.rate_limiter.can_send = Mock(return_value=True)

    bot.bot_tx_rate_limiter = Mock()
    bot.bot_tx_rate_limiter.wait_for_tx = Mock()

    bot.tx_delay_ms = 0
    return bot
|
|
|
|
|
|
def make_manager(bot, commands=None):
    """Create a CommandManager for *bot* with PluginLoader patched out.

    The stand-in loader's ``load_all_plugins`` returns *commands*, or an
    empty dict when *commands* is falsy.
    """
    loaded_plugins = commands or {}
    with patch("modules.command_manager.PluginLoader") as loader_cls:
        loader_cls.return_value = Mock(
            load_all_plugins=Mock(return_value=loaded_plugins)
        )
        return CommandManager(bot)
|
|
|
|
|
|
class TestLoadKeywords:
    """Tests for keyword loading from config."""

    def test_load_keywords_from_config(self, cm_bot):
        """Keywords defined in the fixture config are exposed on the manager."""
        loaded = make_manager(cm_bot).keywords
        assert loaded["ping"] == "Pong!"
        assert loaded["test"] == "ack"

    def test_load_keywords_strips_quotes(self, cm_bot):
        """Surrounding double quotes are removed from a keyword response."""
        cm_bot.config.set("Keywords", "quoted", '"Hello World"')
        loaded = make_manager(cm_bot).keywords
        assert loaded["quoted"] == "Hello World"

    def test_load_keywords_decodes_escapes(self, cm_bot):
        """A literal backslash-n in the config becomes a real newline."""
        cm_bot.config.set("Keywords", "multiline", r"Line1\nLine2")
        loaded = make_manager(cm_bot).keywords
        assert "\n" in loaded["multiline"]

    def test_load_keywords_empty_section(self, cm_bot):
        """An empty Keywords section yields an empty keyword mapping."""
        # Recreate the section so it exists but holds no options.
        cm_bot.config.remove_section("Keywords")
        cm_bot.config.add_section("Keywords")
        assert make_manager(cm_bot).keywords == {}
|
|
|
|
|
|
class TestLoadBannedUsers:
    """Tests for banned users loading."""

    def test_load_banned_users_from_config(self, cm_bot):
        """Each comma-separated name in the config ends up in banned_users."""
        cm_bot.config.add_section("Banned_Users")
        cm_bot.config.set("Banned_Users", "banned_users", "BadUser1, BadUser2")
        banned = make_manager(cm_bot).banned_users
        assert "BadUser1" in banned
        assert "BadUser2" in banned

    def test_load_banned_users_empty(self, cm_bot):
        """With no Banned_Users section the list is empty."""
        assert make_manager(cm_bot).banned_users == []

    def test_load_banned_users_whitespace_handling(self, cm_bot):
        """Padding around the names is not kept in the loaded entries."""
        cm_bot.config.add_section("Banned_Users")
        cm_bot.config.set("Banned_Users", "banned_users", " user1 , user2 ")
        banned = make_manager(cm_bot).banned_users
        assert "user1" in banned
        assert "user2" in banned
|
|
|
|
|
|
class TestIsUserBanned:
    """Tests for ban checking logic."""

    @staticmethod
    def _manager_with_ban(bot, banned_value):
        """Return a manager whose config bans the names in *banned_value*."""
        bot.config.add_section("Banned_Users")
        bot.config.set("Banned_Users", "banned_users", banned_value)
        return make_manager(bot)

    def test_exact_match(self, cm_bot):
        """A sender equal to a banned entry is banned."""
        mgr = self._manager_with_ban(cm_bot, "BadUser")
        assert mgr.is_user_banned("BadUser") is True

    def test_prefix_match(self, cm_bot):
        """A sender that starts with a banned entry is banned."""
        mgr = self._manager_with_ban(cm_bot, "BadUser")
        assert mgr.is_user_banned("BadUser 123") is True

    def test_no_match(self, cm_bot):
        """A sender unrelated to any banned entry is not banned."""
        mgr = self._manager_with_ban(cm_bot, "BadUser")
        assert mgr.is_user_banned("GoodUser") is False

    def test_none_sender(self, cm_bot):
        """A None sender is reported as not banned."""
        mgr = make_manager(cm_bot)
        assert mgr.is_user_banned(None) is False
|
|
|
|
|
|
class TestChannelTriggerAllowed:
    """Tests for _is_channel_trigger_allowed."""

    def test_dm_always_allowed(self, cm_bot):
        """A DM passes even when its trigger is not in the whitelist."""
        cm_bot.config.set("Channels", "channel_keywords", "ping")
        mgr = make_manager(cm_bot)
        dm = mock_message(content="wx", is_dm=True)
        assert mgr._is_channel_trigger_allowed("wx", dm) is True

    def test_none_whitelist_allows_all(self, cm_bot):
        """With no whitelist configured, any channel trigger is allowed."""
        mgr = make_manager(cm_bot)
        assert mgr.channel_keywords is None
        channel_msg = mock_message(content="anything", channel="general", is_dm=False)
        assert mgr._is_channel_trigger_allowed("anything", channel_msg) is True

    def test_whitelist_allows_listed(self, cm_bot):
        """A whitelisted trigger is allowed on a channel."""
        cm_bot.config.set("Channels", "channel_keywords", "ping, help")
        mgr = make_manager(cm_bot)
        channel_msg = mock_message(content="ping", channel="general", is_dm=False)
        assert mgr._is_channel_trigger_allowed("ping", channel_msg) is True

    def test_whitelist_blocks_unlisted(self, cm_bot):
        """A trigger missing from the whitelist is rejected on a channel."""
        cm_bot.config.set("Channels", "channel_keywords", "ping, help")
        mgr = make_manager(cm_bot)
        channel_msg = mock_message(content="wx", channel="general", is_dm=False)
        assert mgr._is_channel_trigger_allowed("wx", channel_msg) is False
|
|
|
|
|
|
class TestLoadMonitorChannels:
    """Tests for monitor channels loading."""

    def test_load_monitor_channels(self, cm_bot):
        """Both channels from the fixture config are loaded, and nothing else."""
        mgr = make_manager(cm_bot)
        assert "general" in mgr.monitor_channels
        assert "test" in mgr.monitor_channels
        assert len(mgr.monitor_channels) == 2

    def test_load_monitor_channels_empty(self, cm_bot):
        """An empty monitor_channels value yields an empty list."""
        cm_bot.config.set("Channels", "monitor_channels", "")
        mgr = make_manager(cm_bot)
        assert mgr.monitor_channels == []

    def test_load_monitor_channels_quoted(self, cm_bot):
        """A monitor_channels value wrapped in double quotes is still split into channels."""
        cm_bot.config.set("Channels", "monitor_channels", '"#bot,#bot-everett,#bots"')
        mgr = make_manager(cm_bot)
        assert mgr.monitor_channels == ["#bot", "#bot-everett", "#bots"]
|
|
|
|
|
|
class TestLoadChannelKeywords:
    """Tests for channel keyword whitelist loading."""

    def test_load_channel_keywords_returns_list(self, cm_bot):
        """A comma-separated whitelist is loaded as a list of its entries."""
        cm_bot.config.set("Channels", "channel_keywords", "ping, wx, help")
        mgr = make_manager(cm_bot)
        assert isinstance(mgr.channel_keywords, list)
        assert "ping" in mgr.channel_keywords
        assert "wx" in mgr.channel_keywords
        assert "help" in mgr.channel_keywords

    def test_load_channel_keywords_empty_returns_none(self, cm_bot):
        """An empty channel_keywords value loads as None."""
        cm_bot.config.set("Channels", "channel_keywords", "")
        mgr = make_manager(cm_bot)
        assert mgr.channel_keywords is None

    def test_load_channel_keywords_not_set_returns_none(self, cm_bot):
        """A missing channel_keywords option loads as None."""
        mgr = make_manager(cm_bot)
        assert mgr.channel_keywords is None
|
|
|
|
|
|
class TestCheckKeywords:
    """Tests for check_keywords() message matching."""

    def test_exact_keyword_match(self, cm_bot):
        """A bare keyword on a monitored channel produces a match."""
        mgr = make_manager(cm_bot)
        incoming = mock_message(content="ping", channel="general", is_dm=False)
        found = mgr.check_keywords(incoming)
        assert any(name == "ping" for name, _ in found)

    def test_prefix_required_blocks_bare_keyword(self, cm_bot):
        """With a command prefix configured, the bare keyword does not match."""
        cm_bot.config.set("Bot", "command_prefix", "!")
        mgr = make_manager(cm_bot)
        incoming = mock_message(content="ping", channel="general", is_dm=False)
        found = mgr.check_keywords(incoming)
        assert len(found) == 0

    def test_prefix_matches(self, cm_bot):
        """With a command prefix configured, the prefixed keyword matches."""
        cm_bot.config.set("Bot", "command_prefix", "!")
        mgr = make_manager(cm_bot)
        incoming = mock_message(content="!ping", channel="general", is_dm=False)
        found = mgr.check_keywords(incoming)
        assert any(name == "ping" for name, _ in found)

    def test_wrong_channel_no_match(self, cm_bot):
        """A keyword on a channel that is not monitored produces no match."""
        mgr = make_manager(cm_bot)
        incoming = mock_message(content="ping", channel="other", is_dm=False)
        found = mgr.check_keywords(incoming)
        assert len(found) == 0

    def test_dm_allowed(self, cm_bot):
        """A keyword sent by DM produces a match."""
        mgr = make_manager(cm_bot)
        incoming = mock_message(content="ping", is_dm=True)
        found = mgr.check_keywords(incoming)
        assert any(name == "ping" for name, _ in found)

    def test_help_routing(self, cm_bot):
        """A DM containing "help" is matched with the "help" trigger."""
        mgr = make_manager(cm_bot)
        incoming = mock_message(content="help", is_dm=True)
        found = mgr.check_keywords(incoming)
        assert any(name == "help" for name, _ in found)
|
|
|
|
|
|
class TestGetHelpForCommand:
    """Tests for command-specific help."""

    def test_known_command_returns_help(self, cm_bot):
        """Help for a loaded command is routed through the translator with its help text."""
        mock_cmd = MagicMock()
        mock_cmd.keywords = ["wx"]
        mock_cmd.get_help_text = Mock(return_value="Weather forecast info")
        mock_cmd.dm_only = False
        mock_cmd.requires_internet = False
        manager = make_manager(cm_bot, commands={"wx": mock_cmd})

        result = manager.get_help_for_command("wx")

        # Translator receives help_text as kwarg, so it appears in the output
        assert "Weather forecast info" in result
        # Verify translator was called with the right key
        cm_bot.translator.translate.assert_called_with(
            "commands.help.specific", command="wx", help_text="Weather forecast info"
        )

    def test_unknown_command_returns_error(self, cm_bot):
        """Help for an unknown command asks the translator for the 'unknown' message."""
        manager = make_manager(cm_bot)

        # Only the translator call is checked here, so the return value is
        # deliberately not bound to a name.
        manager.get_help_for_command("nonexistent")

        # Translator receives 'commands.help.unknown' key with command name
        cm_bot.translator.translate.assert_called()
        last_call = cm_bot.translator.translate.call_args
        assert last_call.args[0] == "commands.help.unknown"
        assert last_call.kwargs["command"] == "nonexistent"
|
|
|
|
|
|
class TestInternetStatusCache:
    """Tests for InternetStatusCache."""

    def test_is_valid_fresh(self):
        """An entry stamped just now is valid for a 30 second window."""
        now = time.time()
        entry = InternetStatusCache(has_internet=True, timestamp=now)
        assert entry.is_valid(30) is True

    def test_is_valid_stale(self):
        """An entry stamped 60 seconds ago is not valid for a 30 second window."""
        a_minute_ago = time.time() - 60
        entry = InternetStatusCache(has_internet=True, timestamp=a_minute_ago)
        assert entry.is_valid(30) is False

    def test_get_lock_lazy_creation(self):
        """_lock starts as None and _get_lock() returns the same object on repeat calls."""
        entry = InternetStatusCache(has_internet=True, timestamp=0)
        assert entry._lock is None
        first = entry._get_lock()
        second = entry._get_lock()
        assert first is second
|
|
|
|
|
|
class TestSendChannelMessageListeners:
    """Tests for channel_sent_listeners invocation when bot sends a channel message."""

    @pytest.mark.asyncio
    async def test_successful_send_invokes_listeners_with_synthetic_event(self, cm_bot, mock_logger):
        """When send_channel_message succeeds, each channel_sent_listener is called with event.payload shape (channel_idx, text)."""
        import asyncio
        from meshcore import EventType

        cm_bot.connected = True
        cm_bot.channel_manager = Mock()
        cm_bot.channel_manager.get_channel_number = Mock(return_value=3)
        cm_bot.meshcore = Mock()
        cm_bot.meshcore.commands = Mock()
        cm_bot.meshcore.commands.send_chan_msg = AsyncMock(
            return_value=Mock(type=EventType.MSG_SENT, payload=None)
        )
        cm_bot.bot_tx_rate_limiter.wait_for_tx = AsyncMock(return_value=None)
        cm_bot.channel_sent_listeners = []
        received = []

        async def capture_listener(event, metadata=None):
            received.append(getattr(event, 'payload', None))

        cm_bot.channel_sent_listeners.append(capture_listener)

        created_tasks = []

        with patch("modules.command_manager.asyncio.create_task") as mock_create_task:
            def capture_and_run(coro):
                # create_task is patched inside this context, so schedule the
                # coroutine on the running loop directly and keep the task so
                # the test can await it below.
                t = asyncio.get_running_loop().create_task(coro)
                created_tasks.append(t)
                return t

            mock_create_task.side_effect = capture_and_run

            manager = make_manager(cm_bot)
            result = await manager.send_channel_message("general", "Hello mesh")

        # Let every listener task finish before inspecting what it recorded.
        for t in created_tasks:
            await t

        assert result is True
        assert len(received) == 1
        assert received[0] == {"channel_idx": 3, "text": "TestBot: Hello mesh"}

    @pytest.mark.asyncio
    async def test_failed_send_does_not_invoke_listeners(self, cm_bot):
        """When send_channel_message fails (e.g. channel not found), listeners are not called."""
        cm_bot.connected = True
        cm_bot.channel_manager = Mock()
        # No channel number is available for the requested channel name.
        cm_bot.channel_manager.get_channel_number = Mock(return_value=None)
        cm_bot.channel_sent_listeners = []
        received = []

        async def capture_listener(event, metadata=None):
            received.append(getattr(event, 'payload', None))

        cm_bot.channel_sent_listeners.append(capture_listener)

        manager = make_manager(cm_bot)
        result = await manager.send_channel_message("nonexistent", "Hi")

        assert result is False
        assert len(received) == 0

    @pytest.mark.asyncio
    async def test_no_listeners_no_error(self, cm_bot):
        """When channel_sent_listeners is empty, send_channel_message still returns success."""
        from meshcore import EventType

        cm_bot.connected = True
        cm_bot.channel_manager = Mock()
        cm_bot.channel_manager.get_channel_number = Mock(return_value=1)
        cm_bot.meshcore = Mock()
        cm_bot.meshcore.commands = Mock()
        cm_bot.meshcore.commands.send_chan_msg = AsyncMock(
            return_value=Mock(type=EventType.MSG_SENT, payload=None)
        )
        cm_bot.bot_tx_rate_limiter.wait_for_tx = AsyncMock(return_value=None)
        cm_bot.channel_sent_listeners = []

        manager = make_manager(cm_bot)
        result = await manager.send_channel_message("general", "Hi")

        assert result is True
|
|
|
|
|
|
class TestSendChannelMessagesChunked:
    """Tests for send_channel_messages_chunked."""

    @staticmethod
    def _rate_limited_manager(bot, send_mock):
        """Return a manager with a 1.0s bot TX rate limit and stubbed send / wait_for_tx."""
        bot.config.set("Bot", "bot_tx_rate_limit_seconds", "1.0")
        mgr = make_manager(bot)
        mgr.send_channel_message = send_mock
        bot.bot_tx_rate_limiter.wait_for_tx = AsyncMock(return_value=None)
        return mgr

    @pytest.mark.asyncio
    async def test_empty_chunks_returns_true_without_send(self, cm_bot):
        """An empty chunk list returns True and never calls send_channel_message."""
        mgr = make_manager(cm_bot)
        mgr.send_channel_message = AsyncMock(return_value=True)

        outcome = await mgr.send_channel_messages_chunked("general", [])

        assert outcome is True
        mgr.send_channel_message.assert_not_called()

    @pytest.mark.asyncio
    async def test_single_chunk_calls_send_once_no_wait(self, cm_bot):
        """One chunk means one send, with no wait_for_tx and no sleep."""
        mgr = self._rate_limited_manager(cm_bot, AsyncMock(return_value=True))

        with patch("modules.command_manager.asyncio.sleep", new_callable=AsyncMock) as fake_sleep:
            outcome = await mgr.send_channel_messages_chunked("general", ["only one"])

        assert outcome is True
        assert mgr.send_channel_message.call_count == 1
        cm_bot.bot_tx_rate_limiter.wait_for_tx.assert_not_called()
        fake_sleep.assert_not_called()

    @pytest.mark.asyncio
    async def test_multiple_chunks_waits_and_sleeps_between(self, cm_bot):
        """Three chunks mean three sends, with two wait_for_tx calls and two sleeps."""
        mgr = self._rate_limited_manager(cm_bot, AsyncMock(return_value=True))

        with patch("modules.command_manager.asyncio.sleep", new_callable=AsyncMock) as fake_sleep:
            outcome = await mgr.send_channel_messages_chunked("general", ["a", "b", "c"])

        assert outcome is True
        assert mgr.send_channel_message.call_count == 3
        assert cm_bot.bot_tx_rate_limiter.wait_for_tx.call_count == 2
        assert fake_sleep.await_count == 2

    @pytest.mark.asyncio
    async def test_chunked_first_uses_provided_rate_limit_args_subsequent_skip(self, cm_bot):
        """The first chunk gets the caller's rate-limit args; the second gets True/None."""
        mgr = self._rate_limited_manager(cm_bot, AsyncMock(return_value=True))

        with patch("modules.command_manager.asyncio.sleep", new_callable=AsyncMock):
            await mgr.send_channel_messages_chunked(
                "general",
                ["first", "second"],
                skip_user_rate_limit=False,
                rate_limit_key="user123",
            )

        sends = mgr.send_channel_message.call_args_list
        assert len(sends) == 2

        # First call: skip_user_rate_limit=False, rate_limit_key="user123"
        first_kwargs = sends[0].kwargs
        assert first_kwargs["skip_user_rate_limit"] is False
        assert first_kwargs["rate_limit_key"] == "user123"

        # Second call: skip_user_rate_limit=True, rate_limit_key=None
        second_kwargs = sends[1].kwargs
        assert second_kwargs["skip_user_rate_limit"] is True
        assert second_kwargs["rate_limit_key"] is None

    @pytest.mark.asyncio
    async def test_chunked_returns_false_on_first_send_failure(self, cm_bot):
        """A failed first send makes the chunked call return False without sending the rest."""
        failing_then_ok = AsyncMock(side_effect=[False, True])  # first fails
        mgr = self._rate_limited_manager(cm_bot, failing_then_ok)

        with patch("modules.command_manager.asyncio.sleep", new_callable=AsyncMock):
            outcome = await mgr.send_channel_messages_chunked("general", ["a", "b"])

        assert outcome is False
        mgr.send_channel_message.assert_called_once()
|