diff --git a/tests/commands/test_cmd_command.py b/tests/commands/test_cmd_command.py index 28565c2..4a7c4a4 100644 --- a/tests/commands/test_cmd_command.py +++ b/tests/commands/test_cmd_command.py @@ -37,5 +37,31 @@ class TestCmdCommand: result = await cmd.execute(msg) assert result is True call_args = command_mock_bot.command_manager.send_response.call_args + assert call_args is not None response = call_args[0][1] assert "ping" in response or "help" in response or "cmd" in response + + def test_get_commands_list_truncation(self, command_mock_bot): + """Test that _get_commands_list truncates long lists with '(N more)' suffix.""" + import re + command_mock_bot.config.add_section("Cmd_Command") + command_mock_bot.config.set("Cmd_Command", "enabled", "true") + command_mock_bot.command_manager.keywords = {} + # Create 25 mock commands with long names to force truncation + commands = {} + for i in range(25): + name = f"longcommandname{i:02d}" + mock_cmd = type("MockCmd", (), {"keywords": [name]})() + commands[name] = mock_cmd + command_mock_bot.command_manager.commands = commands + cmd = CmdCommand(command_mock_bot) + # "Available commands: " = 20 chars; "longcommandnameNN" = 17 chars; ", " = 2 chars + # 3 commands fit in 75 chars; suffix " (22 more)" = 11 chars; total = 86 + # max_length=90 allows 3 commands + suffix, but not a 4th command + result = cmd._get_commands_list(max_length=90) + # Should contain truncation indicator + assert "more)" in result + # Should start with prefix + assert result.startswith("Available commands: ") + # Should NOT contain doubled numbers like "(5 5 more)" + assert not re.search(r'\(\d+ \d+ more\)', result) diff --git a/tests/commands/test_hello_command.py b/tests/commands/test_hello_command.py index a0cb1ac..43caf77 100644 --- a/tests/commands/test_hello_command.py +++ b/tests/commands/test_hello_command.py @@ -43,8 +43,7 @@ class TestHelloCommand: cmd = HelloCommand(command_mock_bot) with patch("modules.commands.hello_command.random.choice", side_effect=lambda x: x[0]): result = cmd.get_random_greeting() - assert result - assert isinstance(result, str) + assert isinstance(result, str) and len(result) > 0 def test_get_emoji_response_vulcan(self, command_mock_bot): command_mock_bot.config.add_section("Hello_Command") @@ -78,5 +77,6 @@ class TestHelloCommand: result = await cmd.execute(msg) assert result is True call_args = command_mock_bot.command_manager.send_response.call_args + assert call_args is not None response = call_args[0][1] - assert response + assert isinstance(response, str) and len(response) > 0 diff --git a/tests/commands/test_help_command.py b/tests/commands/test_help_command.py index 3611cff..4ac7d98 100644 --- a/tests/commands/test_help_command.py +++ b/tests/commands/test_help_command.py @@ -31,3 +31,5 @@ class TestHelpCommand: msg = mock_message(content="help", is_dm=True) result = await cmd.execute(msg) assert result is True + # Note: HelpCommand.execute() is a placeholder; actual help logic is in + # CommandManager.check_keywords(). Response sending is tested there. diff --git a/tests/commands/test_magic8_command.py b/tests/commands/test_magic8_command.py index 4470272..be31706 100644 --- a/tests/commands/test_magic8_command.py +++ b/tests/commands/test_magic8_command.py @@ -32,6 +32,7 @@ class TestMagic8Command: result = await cmd.execute(msg) assert result is True call_args = command_mock_bot.command_manager.send_response.call_args + assert call_args is not None response = call_args[0][1] assert "🎱" in response assert any(r in response for r in magic8_responses) @@ -45,5 +46,6 @@ class TestMagic8Command: result = await cmd.execute(msg) assert result is True call_args = command_mock_bot.command_manager.send_response.call_args + assert call_args is not None response = call_args[0][1] assert "Alice" in response or "@[" in response diff --git a/tests/commands/test_roll_command.py b/tests/commands/test_roll_command.py index 057285f..d66cad1 100644 --- a/tests/commands/test_roll_command.py +++ b/tests/commands/test_roll_command.py @@ -54,6 +54,7 @@ class TestRollCommandExecute: result = await cmd.execute(msg) assert result is True call_args = command_mock_bot.command_manager.send_response.call_args + assert call_args is not None response = call_args[0][1] # Should be a number between 1 and 100 nums = re.findall(r'\d+', response) @@ -70,6 +71,7 @@ class TestRollCommandExecute: result = await cmd.execute(msg) assert result is True call_args = command_mock_bot.command_manager.send_response.call_args + assert call_args is not None response = call_args[0][1] nums = re.findall(r'\d+', response) assert len(nums) >= 1 diff --git a/tests/test_channel_manager_logic.py b/tests/test_channel_manager_logic.py new file mode 100644 index 0000000..f21a6d1 --- /dev/null +++ b/tests/test_channel_manager_logic.py @@ -0,0 +1,80 @@ +"""Tests for ChannelManager pure logic (no meshcore device calls).""" + +import hashlib + +import pytest +from unittest.mock import Mock + +from modules.channel_manager import ChannelManager + + +@pytest.fixture +def cm(mock_logger): + """ChannelManager with mock bot for pure logic tests.""" + bot = Mock() + bot.logger = mock_logger + bot.db_manager = Mock() + bot.db_manager.db_path = "/dev/null" + bot.connected = False + bot.meshcore = Mock() + bot.meshcore.channels = {} + return ChannelManager(bot, max_channels=8) + + +class TestGenerateHashtagKey: + """Tests for generate_hashtag_key() static method.""" + + def test_deterministic(self): + key1 = ChannelManager.generate_hashtag_key("general") + key2 = ChannelManager.generate_hashtag_key("general") + assert key1 == key2 + assert len(key1) == 16 + + def test_prepends_hash_if_missing(self): + key_without = ChannelManager.generate_hashtag_key("general") + key_with = ChannelManager.generate_hashtag_key("#general") + assert key_without == key_with + + def test_known_value(self): + """Verify against independently computed SHA256.""" + expected = hashlib.sha256(b"#longfast").digest()[:16] + result = ChannelManager.generate_hashtag_key("#LongFast") + assert result == expected + + +class TestChannelNameLookup: + """Tests for get_channel_name().""" + + def test_cached_channel_name(self, cm): + cm._channels_cache = {0: {"channel_name": "general"}} + cm._cache_valid = True + assert cm.get_channel_name(0) == "general" + + def test_not_cached_returns_fallback(self, cm): + cm._channels_cache = {} + cm._cache_valid = True + result = cm.get_channel_name(99) + assert "99" in result + + +class TestChannelNumberLookup: + """Tests for get_channel_number().""" + + def test_found_by_name(self, cm): + cm._channels_cache = {0: {"channel_name": "general"}, 1: {"channel_name": "test"}} + cm._cache_valid = True + assert cm.get_channel_number("test") == 1 + + def test_case_insensitive(self, cm): + cm._channels_cache = {0: {"channel_name": "General"}} + cm._cache_valid = True + assert cm.get_channel_number("general") == 0 + + +class TestCacheManagement: + """Tests for cache invalidation.""" + + def test_invalidate_cache(self, cm): + cm._cache_valid = True + cm.invalidate_cache() + assert cm._cache_valid is False diff --git a/tests/test_command_manager.py b/tests/test_command_manager.py new file mode 100644 index 0000000..d9200c5 --- /dev/null +++ b/tests/test_command_manager.py @@ -0,0 +1,274 @@ +"""Tests for modules.command_manager.""" + +import time + +import pytest +from configparser import ConfigParser +from unittest.mock import Mock, MagicMock, patch, AsyncMock + +from modules.command_manager import CommandManager, InternetStatusCache +from modules.models import MeshMessage +from tests.conftest import mock_message + + +@pytest.fixture +def cm_bot(mock_logger): + """Mock bot for CommandManager tests.""" + bot = Mock() + bot.logger = mock_logger + bot.config = ConfigParser() + bot.config.add_section("Bot") + bot.config.set("Bot", "bot_name", "TestBot") + bot.config.add_section("Channels") + bot.config.set("Channels", "monitor_channels", "general,test") + bot.config.set("Channels", "respond_to_dms", "true") + bot.config.add_section("Keywords") + bot.config.set("Keywords", "ping", "Pong!") + bot.config.set("Keywords", "test", "ack") + bot.translator = Mock() + # Translator returns "key: kwarg_values" so assertions can check content + bot.translator.translate = Mock( + side_effect=lambda key, **kw: f"{key}: {' '.join(str(v) for v in kw.values())}" + ) + bot.meshcore = None + bot.rate_limiter = Mock() + bot.rate_limiter.can_send = Mock(return_value=True) + bot.bot_tx_rate_limiter = Mock() + bot.bot_tx_rate_limiter.wait_for_tx = Mock() + bot.tx_delay_ms = 0 + return bot + + +def make_manager(bot, commands=None): + """Create CommandManager with mocked PluginLoader.""" + with patch("modules.command_manager.PluginLoader") as mock_loader_class: + mock_loader = Mock() + mock_loader.load_all_plugins = Mock(return_value=commands or {}) + mock_loader_class.return_value = mock_loader + return CommandManager(bot) + + +class TestLoadKeywords: + """Tests for keyword loading from config.""" + + def test_load_keywords_from_config(self, cm_bot): + manager = make_manager(cm_bot) + assert manager.keywords["ping"] == "Pong!" + assert manager.keywords["test"] == "ack" + + def test_load_keywords_strips_quotes(self, cm_bot): + cm_bot.config.set("Keywords", "quoted", '"Hello World"') + manager = make_manager(cm_bot) + assert manager.keywords["quoted"] == "Hello World" + + def test_load_keywords_decodes_escapes(self, cm_bot): + cm_bot.config.set("Keywords", "multiline", r"Line1\nLine2") + manager = make_manager(cm_bot) + assert "\n" in manager.keywords["multiline"] + + def test_load_keywords_empty_section(self, cm_bot): + cm_bot.config.remove_section("Keywords") + cm_bot.config.add_section("Keywords") + manager = make_manager(cm_bot) + assert manager.keywords == {} + + +class TestLoadBannedUsers: + """Tests for banned users loading.""" + + def test_load_banned_users_from_config(self, cm_bot): + cm_bot.config.add_section("Banned_Users") + cm_bot.config.set("Banned_Users", "banned_users", "BadUser1, BadUser2") + manager = make_manager(cm_bot) + assert "BadUser1" in manager.banned_users + assert "BadUser2" in manager.banned_users + + def test_load_banned_users_empty(self, cm_bot): + manager = make_manager(cm_bot) + assert manager.banned_users == [] + + def test_load_banned_users_whitespace_handling(self, cm_bot): + cm_bot.config.add_section("Banned_Users") + cm_bot.config.set("Banned_Users", "banned_users", " user1 , user2 ") + manager = make_manager(cm_bot) + assert "user1" in manager.banned_users + assert "user2" in manager.banned_users + + +class TestIsUserBanned: + """Tests for ban checking logic.""" + + def test_exact_match(self, cm_bot): + cm_bot.config.add_section("Banned_Users") + cm_bot.config.set("Banned_Users", "banned_users", "BadUser") + manager = make_manager(cm_bot) + assert manager.is_user_banned("BadUser") is True + + def test_prefix_match(self, cm_bot): + cm_bot.config.add_section("Banned_Users") + cm_bot.config.set("Banned_Users", "banned_users", "BadUser") + manager = make_manager(cm_bot) + assert manager.is_user_banned("BadUser 123") is True + + def test_no_match(self, cm_bot): + cm_bot.config.add_section("Banned_Users") + cm_bot.config.set("Banned_Users", "banned_users", "BadUser") + manager = make_manager(cm_bot) + assert manager.is_user_banned("GoodUser") is False + + def test_none_sender(self, cm_bot): + manager = make_manager(cm_bot) + assert manager.is_user_banned(None) is False + + +class TestChannelTriggerAllowed: + """Tests for _is_channel_trigger_allowed.""" + + def test_dm_always_allowed(self, cm_bot): + cm_bot.config.set("Channels", "channel_keywords", "ping") + manager = make_manager(cm_bot) + msg = mock_message(content="wx", is_dm=True) + assert manager._is_channel_trigger_allowed("wx", msg) is True + + def test_none_whitelist_allows_all(self, cm_bot): + manager = make_manager(cm_bot) + assert manager.channel_keywords is None + msg = mock_message(content="anything", channel="general", is_dm=False) + assert manager._is_channel_trigger_allowed("anything", msg) is True + + def test_whitelist_allows_listed(self, cm_bot): + cm_bot.config.set("Channels", "channel_keywords", "ping, help") + manager = make_manager(cm_bot) + msg = mock_message(content="ping", channel="general", is_dm=False) + assert manager._is_channel_trigger_allowed("ping", msg) is True + + def test_whitelist_blocks_unlisted(self, cm_bot): + cm_bot.config.set("Channels", "channel_keywords", "ping, help") + manager = make_manager(cm_bot) + msg = mock_message(content="wx", channel="general", is_dm=False) + assert manager._is_channel_trigger_allowed("wx", msg) is False + + +class TestLoadMonitorChannels: + """Tests for monitor channels loading.""" + + def test_load_monitor_channels(self, cm_bot): + manager = make_manager(cm_bot) + assert "general" in manager.monitor_channels + assert "test" in manager.monitor_channels + assert len(manager.monitor_channels) == 2 + + def test_load_monitor_channels_empty(self, cm_bot): + cm_bot.config.set("Channels", "monitor_channels", "") + manager = make_manager(cm_bot) + assert manager.monitor_channels == [] + + +class TestLoadChannelKeywords: + """Tests for channel keyword whitelist loading.""" + + def test_load_channel_keywords_returns_list(self, cm_bot): + cm_bot.config.set("Channels", "channel_keywords", "ping, wx, help") + manager = make_manager(cm_bot) + assert isinstance(manager.channel_keywords, list) + assert "ping" in manager.channel_keywords + assert "wx" in manager.channel_keywords + assert "help" in manager.channel_keywords + + def test_load_channel_keywords_empty_returns_none(self, cm_bot): + cm_bot.config.set("Channels", "channel_keywords", "") + manager = make_manager(cm_bot) + assert manager.channel_keywords is None + + def test_load_channel_keywords_not_set_returns_none(self, cm_bot): + manager = make_manager(cm_bot) + assert manager.channel_keywords is None + + +class TestCheckKeywords: + """Tests for check_keywords() message matching.""" + + def test_exact_keyword_match(self, cm_bot): + manager = make_manager(cm_bot) + msg = mock_message(content="ping", channel="general", is_dm=False) + matches = manager.check_keywords(msg) + assert any(trigger == "ping" for trigger, _ in matches) + + def test_prefix_required_blocks_bare_keyword(self, cm_bot): + cm_bot.config.set("Bot", "command_prefix", "!") + manager = make_manager(cm_bot) + msg = mock_message(content="ping", channel="general", is_dm=False) + matches = manager.check_keywords(msg) + assert len(matches) == 0 + + def test_prefix_matches(self, cm_bot): + cm_bot.config.set("Bot", "command_prefix", "!") + manager = make_manager(cm_bot) + msg = mock_message(content="!ping", channel="general", is_dm=False) + matches = manager.check_keywords(msg) + assert any(trigger == "ping" for trigger, _ in matches) + + def test_wrong_channel_no_match(self, cm_bot): + manager = make_manager(cm_bot) + msg = mock_message(content="ping", channel="other", is_dm=False) + matches = manager.check_keywords(msg) + assert len(matches) == 0 + + def test_dm_allowed(self, cm_bot): + manager = make_manager(cm_bot) + msg = mock_message(content="ping", is_dm=True) + matches = manager.check_keywords(msg) + assert any(trigger == "ping" for trigger, _ in matches) + + def test_help_routing(self, cm_bot): + manager = make_manager(cm_bot) + msg = mock_message(content="help", is_dm=True) + matches = manager.check_keywords(msg) + assert any(trigger == "help" for trigger, _ in matches) + + +class TestGetHelpForCommand: + """Tests for command-specific help.""" + + def test_known_command_returns_help(self, cm_bot): + mock_cmd = MagicMock() + mock_cmd.keywords = ["wx"] + mock_cmd.get_help_text = Mock(return_value="Weather forecast info") + mock_cmd.dm_only = False + mock_cmd.requires_internet = False + manager = make_manager(cm_bot, commands={"wx": mock_cmd}) + result = manager.get_help_for_command("wx") + # Translator receives help_text as kwarg, so it appears in the output + assert "Weather forecast info" in result + # Verify translator was called with the right key + cm_bot.translator.translate.assert_called_with( + "commands.help.specific", command="wx", help_text="Weather forecast info" + ) + + def test_unknown_command_returns_error(self, cm_bot): + manager = make_manager(cm_bot) + result = manager.get_help_for_command("nonexistent") + # Translator receives 'commands.help.unknown' key with command name + cm_bot.translator.translate.assert_called() + call_args = cm_bot.translator.translate.call_args + assert call_args[0][0] == "commands.help.unknown" + assert call_args[1]["command"] == "nonexistent" + + +class TestInternetStatusCache: + """Tests for InternetStatusCache.""" + + def test_is_valid_fresh(self): + cache = InternetStatusCache(has_internet=True, timestamp=time.time()) + assert cache.is_valid(30) is True + + def test_is_valid_stale(self): + cache = InternetStatusCache(has_internet=True, timestamp=time.time() - 60) + assert cache.is_valid(30) is False + + def test_get_lock_lazy_creation(self): + cache = InternetStatusCache(has_internet=True, timestamp=0) + assert cache._lock is None + lock1 = cache._get_lock() + lock2 = cache._get_lock() + assert lock1 is lock2 diff --git a/tests/test_db_manager.py b/tests/test_db_manager.py new file mode 100644 index 0000000..582f172 --- /dev/null +++ b/tests/test_db_manager.py @@ -0,0 +1,182 @@ +"""Tests for modules.db_manager.""" + +import sqlite3 +import json + +import pytest +from unittest.mock import Mock + +from modules.db_manager import DBManager + + +@pytest.fixture +def db(mock_logger, tmp_path): + """File-based DBManager for testing. _init_database() auto-creates core tables.""" + bot = Mock() + bot.logger = mock_logger + return DBManager(bot, str(tmp_path / "test.db")) + + +class TestGeocoding: + """Tests for geocoding cache.""" + + def test_cache_and_retrieve_geocoding(self, db): + db.cache_geocoding("Seattle, WA", 47.6062, -122.3321) + lat, lon = db.get_cached_geocoding("Seattle, WA") + assert abs(lat - 47.6062) < 0.001 + assert abs(lon - (-122.3321)) < 0.001 + + def test_get_cached_geocoding_miss(self, db): + lat, lon = db.get_cached_geocoding("Nonexistent City") + assert lat is None + assert lon is None + + def test_cache_geocoding_overwrites_existing(self, db): + db.cache_geocoding("Test", 10.0, 20.0) + db.cache_geocoding("Test", 30.0, 40.0) + lat, lon = db.get_cached_geocoding("Test") + assert abs(lat - 30.0) < 0.001 + assert abs(lon - 40.0) < 0.001 + + def test_cache_geocoding_invalid_hours_logged(self, db): + """Invalid cache_hours is caught and logged, not raised.""" + db.cache_geocoding("Test", 10.0, 20.0, cache_hours=0) + db.bot.logger.error.assert_called() + # Verify it did not store anything + lat, lon = db.get_cached_geocoding("Test") + assert lat is None + + +class TestGenericCache: + """Tests for generic cache.""" + + def test_cache_and_retrieve_value(self, db): + db.cache_value("weather_key", "sunny", "weather") + result = db.get_cached_value("weather_key", "weather") + assert result == "sunny" + + def test_get_cached_value_miss(self, db): + assert db.get_cached_value("nonexistent", "any") is None + + def test_different_keys_stored_independently(self, db): + db.cache_value("key_a", "value_a", "weather") + db.cache_value("key_b", "value_b", "weather") + assert db.get_cached_value("key_a", "weather") == "value_a" + assert db.get_cached_value("key_b", "weather") == "value_b" + + def test_cache_json_round_trip(self, db): + data = {"temp": 72, "conditions": "clear", "nested": {"wind": 5}} + db.cache_json("forecast", data, "weather") + result = db.get_cached_json("forecast", "weather") + assert result == data + + def test_get_cached_json_invalid_json(self, db): + """Manually insert invalid JSON; get_cached_json returns None.""" + with sqlite3.connect(str(db.db_path)) as conn: + conn.execute( + "INSERT INTO generic_cache (cache_key, cache_value, cache_type, expires_at) " + "VALUES (?, ?, ?, datetime('now', '+24 hours'))", + ("bad_json", "not{valid}json", "test"), + ) + conn.commit() + assert db.get_cached_json("bad_json", "test") is None + + +class TestCacheCleanup: + """Tests for cache expiry cleanup.""" + + def test_cleanup_expired_deletes_old(self, db): + db.cache_value("old_key", "old_val", "test") + # Manually set expires_at to the past + with sqlite3.connect(str(db.db_path)) as conn: + conn.execute( + "UPDATE generic_cache SET expires_at = datetime('now', '-1 hours') " + "WHERE cache_key = 'old_key'" + ) + conn.commit() + db.cleanup_expired_cache() + assert db.get_cached_value("old_key", "test") is None + + def test_cleanup_expired_preserves_valid(self, db): + db.cache_value("fresh_key", "fresh_val", "test", cache_hours=720) + db.cleanup_expired_cache() + assert db.get_cached_value("fresh_key", "test") == "fresh_val" + + +class TestTableManagement: + """Tests for table creation whitelist.""" + + def test_create_table_allowed(self, db): + db.create_table( + "greeted_users", + "id INTEGER PRIMARY KEY, name TEXT NOT NULL", + ) + with sqlite3.connect(str(db.db_path)) as conn: + cursor = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='greeted_users'" + ) + assert cursor.fetchone() is not None + + def test_create_table_disallowed_raises(self, db): + with pytest.raises(ValueError, match="not in allowed tables"): + db.create_table("not_allowed", "id INTEGER PRIMARY KEY") + + def test_create_table_sql_injection_name_raises(self, db): + with pytest.raises(ValueError): + db.create_table("DROP TABLE users; --", "id INTEGER PRIMARY KEY") + + +class TestExecuteQuery: + """Tests for raw query execution.""" + + def test_execute_query_returns_dicts(self, db): + db.set_metadata("test_key", "test_value") + rows = db.execute_query("SELECT * FROM bot_metadata WHERE key = ?", ("test_key",)) + assert len(rows) == 1 + assert rows[0]["key"] == "test_key" + assert rows[0]["value"] == "test_value" + + def test_execute_update_returns_rowcount(self, db): + db.set_metadata("del_key", "del_value") + count = db.execute_update( + "DELETE FROM bot_metadata WHERE key = ?", ("del_key",) + ) + assert count == 1 + + +class TestMetadata: + """Tests for bot metadata storage.""" + + def test_set_and_get_metadata(self, db): + db.set_metadata("version", "1.2.3") + assert db.get_metadata("version") == "1.2.3" + + def test_get_metadata_miss(self, db): + assert db.get_metadata("nonexistent") is None + + def test_bot_start_time_round_trip(self, db): + ts = 1234567890.5 + db.set_bot_start_time(ts) + assert db.get_bot_start_time() == ts + + +class TestCacheHoursValidation: + """Tests for cache_hours boundary validation.""" + + def test_boundary_values(self, db): + # Valid boundaries + db.cache_value("k1", "v1", "t", cache_hours=1) + assert db.get_cached_value("k1", "t") == "v1" + + db.cache_value("k2", "v2", "t", cache_hours=87600) + assert db.get_cached_value("k2", "t") == "v2" + + # Invalid boundaries — caught and logged, not stored + db.cache_value("k3", "v3", "t", cache_hours=0) + db.bot.logger.error.assert_called() + assert db.get_cached_value("k3", "t") is None + + db.bot.logger.error.reset_mock() + db.cache_value("k4", "v4", "t", cache_hours=87601) + db.bot.logger.error.assert_called() + assert db.get_cached_value("k4", "t") is None diff --git a/tests/test_feed_manager_formatting.py b/tests/test_feed_manager_formatting.py new file mode 100644 index 0000000..a6c4bc2 --- /dev/null +++ b/tests/test_feed_manager_formatting.py @@ -0,0 +1,150 @@ +"""Tests for FeedManager pure formatting and filtering logic.""" + +import json +from datetime import datetime, timezone, timedelta + +import pytest +from configparser import ConfigParser +from unittest.mock import Mock + +from modules.feed_manager import FeedManager + + +@pytest.fixture +def fm(mock_logger): + """FeedManager with disabled networking for pure-logic tests.""" + bot = Mock() + bot.logger = mock_logger + bot.config = ConfigParser() + bot.config.add_section("Feed_Manager") + bot.config.set("Feed_Manager", "feed_manager_enabled", "false") + bot.config.set("Feed_Manager", "max_message_length", "200") + bot.db_manager = Mock() + bot.db_manager.db_path = "/dev/null" + return FeedManager(bot) + + +class TestApplyShortening: + """Tests for _apply_shortening().""" + + def test_truncate_short_text_unchanged(self, fm): + assert fm._apply_shortening("hello", "truncate:20") == "hello" + + def test_truncate_long_text_adds_ellipsis(self, fm): + result = fm._apply_shortening("Hello World", "truncate:5") + assert result == "Hello..." + + def test_word_wrap_breaks_at_boundary(self, fm): + result = fm._apply_shortening("Hello beautiful world", "word_wrap:15") + # word_wrap truncates at a word boundary and appends "..." + # "Hello beautiful world"[:15] = "Hello beautiful", last space at 5 (too early), + # so result is "Hello beautiful..." (truncated at 15 chars + ellipsis) + assert result.endswith("...") + # The base text (without ellipsis) should be <= the wrap limit + assert len(result.rstrip(".")) <= 15 or result == "Hello beautiful..." + + def test_first_words_limits_count(self, fm): + result = fm._apply_shortening("one two three four", "first_words:2") + assert result.startswith("one two") + + def test_regex_extracts_group(self, fm): + result = fm._apply_shortening("Price: $42.99 today", r"regex:Price: \$(\d+\.\d+)") + assert result == "42.99" + + def test_if_regex_returns_then_on_match(self, fm): + result = fm._apply_shortening("open", "if_regex:open:YES:NO") + assert result == "YES" + + def test_if_regex_returns_else_on_no_match(self, fm): + result = fm._apply_shortening("closed", "if_regex:open:YES:NO") + assert result == "NO" + + def test_empty_text_returns_empty(self, fm): + assert fm._apply_shortening("", "truncate:10") == "" + + +class TestGetNestedValue: + """Tests for _get_nested_value().""" + + def test_simple_field_access(self, fm): + assert fm._get_nested_value({"name": "test"}, "name") == "test" + + def test_nested_field_access(self, fm): + data = {"raw": {"Priority": "high"}} + assert fm._get_nested_value(data, "raw.Priority") == "high" + + def test_missing_field_returns_default(self, fm): + assert fm._get_nested_value({}, "missing") == "" + assert fm._get_nested_value({}, "missing", "N/A") == "N/A" + + +class TestShouldSendItem: + """Tests for _should_send_item() filter evaluation.""" + + def test_no_filter_sends_all(self, fm): + feed = {"id": 1} + item = {"raw": {"Priority": "low"}} + assert fm._should_send_item(feed, item) is True + + def test_equals_filter_matches(self, fm): + feed = { + "id": 1, + "filter_config": json.dumps({ + "conditions": [ + {"field": "Priority", "operator": "equals", "value": "high"} + ] + }), + } + item = {"raw": {"Priority": "high"}} + assert fm._should_send_item(feed, item) is True + + def test_equals_filter_rejects(self, fm): + feed = { + "id": 1, + "filter_config": json.dumps({ + "conditions": [ + {"field": "Priority", "operator": "equals", "value": "high"} + ] + }), + } + item = {"raw": {"Priority": "low"}} + assert fm._should_send_item(feed, item) is False + + def test_in_filter_matches(self, fm): + feed = { + "id": 1, + "filter_config": json.dumps({ + "conditions": [ + {"field": "Priority", "operator": "in", "values": ["high", "highest"]} + ] + }), + } + item = {"raw": {"Priority": "highest"}} + assert fm._should_send_item(feed, item) is True + + def test_and_logic_all_must_pass(self, fm): + feed = { + "id": 1, + "filter_config": json.dumps({ + "conditions": [ + {"field": "Priority", "operator": "equals", "value": "high"}, + {"field": "Status", "operator": "equals", "value": "open"}, + ], + "logic": "AND", + }), + } + # First condition passes, second fails + item = {"raw": {"Priority": "high", "Status": "closed"}} + assert fm._should_send_item(feed, item) is False + + +class TestFormatTimestamp: + """Tests for _format_timestamp().""" + + def test_recent_timestamp(self, fm): + five_min_ago = datetime.now(timezone.utc) - timedelta(minutes=5) + result = fm._format_timestamp(five_min_ago) + assert "5m ago" in result + + def test_none_returns_empty(self, fm): + assert fm._format_timestamp(None) == "" diff --git a/tests/test_plugin_loader.py b/tests/test_plugin_loader.py new file mode 100644 index 0000000..4c8de4d --- /dev/null +++ b/tests/test_plugin_loader.py @@ -0,0 +1,161 @@ +"""Tests for modules.plugin_loader.""" + +import pytest +from unittest.mock import Mock, MagicMock, AsyncMock + +from modules.plugin_loader import PluginLoader +from modules.commands.base_command import BaseCommand + + +@pytest.fixture +def loader_bot(mock_logger, minimal_config): + """Mock bot for PluginLoader tests.""" + bot = MagicMock() + bot.logger = mock_logger + bot.config = minimal_config + bot.translator = MagicMock() + bot.translator.translate = Mock(side_effect=lambda k, **kw: k) + bot.translator.get_value = Mock(return_value=None) + bot.command_manager = MagicMock() + bot.command_manager.monitor_channels = ["general"] + bot.command_manager.send_response = AsyncMock(return_value=True) + bot.meshcore = None + return bot + + +def _load_and_register(loader, plugin_file): + """Load a plugin and register it in the loader's internal state (like load_all_plugins does).""" + instance = loader.load_plugin(plugin_file) + if instance: + metadata = instance.get_metadata() + name = metadata["name"] + loader.loaded_plugins[name] = instance + loader.plugin_metadata[name] = metadata + loader._build_keyword_mappings(name, metadata) + return instance + + +class TestDiscover: + """Tests for plugin discovery.""" + + def test_discover_plugins_finds_command_files(self, loader_bot): + loader = PluginLoader(loader_bot) + plugins = loader.discover_plugins() + assert isinstance(plugins, list) + assert len(plugins) > 0 + # Should find well-known commands + assert "ping_command" in plugins + assert "help_command" in plugins + + def test_discover_plugins_excludes_base_and_init(self, loader_bot): + loader = PluginLoader(loader_bot) + plugins = loader.discover_plugins() + assert "__init__" not in plugins + assert "base_command" not in plugins + + def test_discover_alternative_plugins_empty_when_no_dir(self, loader_bot, tmp_path): + loader = PluginLoader(loader_bot, commands_dir=str(tmp_path / "nonexistent")) + result = loader.discover_alternative_plugins() + assert result == [] + + +class TestValidatePlugin: + """Tests for plugin class validation.""" + + def test_validate_missing_execute(self, loader_bot): + loader = PluginLoader(loader_bot) + + class NoExecute: + name = "test" + keywords = ["test"] + + errors = loader._validate_plugin(NoExecute) + assert any("execute" in e.lower() for e in errors) + + def test_validate_sync_execute(self, loader_bot): + loader = PluginLoader(loader_bot) + + class SyncExecute: + name = "test" + keywords = ["test"] + + def execute(self, message): + return True + + errors = loader._validate_plugin(SyncExecute) + assert any("async" in e.lower() for e in errors) + + def test_validate_valid_class(self, loader_bot): + loader = PluginLoader(loader_bot) + + class ValidCommand: + name = "test" + keywords = ["test"] + + async def execute(self, message): + return True + + errors = loader._validate_plugin(ValidCommand) + assert len(errors) == 0 + + +class TestLoadPlugin: + """Tests for loading individual plugins.""" + + def test_load_ping_command(self, loader_bot): + loader = PluginLoader(loader_bot) + plugin = loader.load_plugin("ping_command") + assert plugin is not None + assert isinstance(plugin, BaseCommand) + assert plugin.name == "ping" + + def test_load_nonexistent_returns_none(self, loader_bot): + loader = PluginLoader(loader_bot) + plugin = loader.load_plugin("totally_nonexistent_command") + assert plugin is None + assert "totally_nonexistent_command" in loader._failed_plugins + + +class TestKeywordLookup: + """Tests for keyword-based plugin lookup after registration.""" + + def test_get_plugin_by_keyword(self, loader_bot): + loader = PluginLoader(loader_bot) + _load_and_register(loader, "ping_command") + result = loader.get_plugin_by_keyword("ping") + assert result is not None + assert result.name == "ping" + + def test_get_plugin_by_keyword_miss(self, loader_bot): + loader = PluginLoader(loader_bot) + assert loader.get_plugin_by_keyword("nonexistent") is None + + def test_get_plugin_by_name(self, loader_bot): + loader = PluginLoader(loader_bot) + _load_and_register(loader, "ping_command") + result = loader.get_plugin_by_name("ping") + assert result is not None + assert result.name == "ping" + + +class TestCategoryAndFailed: + """Tests for category filtering and failed plugin tracking.""" + + def test_get_plugins_by_category(self, loader_bot): + loader = PluginLoader(loader_bot) + _load_and_register(loader, "ping_command") + _load_and_register(loader, "help_command") + # Ping and help are in the "basic" category + result = loader.get_plugins_by_category("basic") + assert isinstance(result, dict) + assert "ping" in result or "help" in result + + def test_get_failed_plugins_returns_copy(self, loader_bot): + loader = PluginLoader(loader_bot) + loader.load_plugin("nonexistent_command") + failed = loader.get_failed_plugins() + assert isinstance(failed, dict) + assert "nonexistent_command" in failed + # Mutating the return should not affect internal state + failed.clear() + assert len(loader.get_failed_plugins()) > 0 diff --git a/tests/test_scheduler_logic.py b/tests/test_scheduler_logic.py new file mode 100644 index 0000000..2fe85de --- /dev/null +++ b/tests/test_scheduler_logic.py @@ -0,0 +1,79 @@ +"""Tests for MessageScheduler pure logic (no threading, no asyncio).""" + +import pytest +from configparser import ConfigParser +from unittest.mock import Mock + +from modules.scheduler import MessageScheduler + + +@pytest.fixture +def scheduler(mock_logger): + """MessageScheduler with mock bot for pure logic tests.""" + bot = Mock() + bot.logger = mock_logger + bot.config = ConfigParser() + bot.config.add_section("Bot") + return MessageScheduler(bot) + + +class TestIsValidTimeFormat: + """Tests for _is_valid_time_format().""" + + def test_valid_time_0000(self, scheduler): + assert scheduler._is_valid_time_format("0000") is True + + def test_valid_time_2359(self, scheduler): + assert scheduler._is_valid_time_format("2359") is True + + def test_valid_time_1200(self, scheduler): + assert scheduler._is_valid_time_format("1200") is True + + def test_invalid_time_2400(self, scheduler): + assert scheduler._is_valid_time_format("2400") is False + + def test_invalid_time_0060(self, scheduler): + assert scheduler._is_valid_time_format("0060") is False + + def test_invalid_time_short(self, scheduler): + assert scheduler._is_valid_time_format("123") is False + + def test_invalid_time_letters(self, scheduler): + assert scheduler._is_valid_time_format("abcd") is False + + def test_invalid_time_empty(self, scheduler): + assert scheduler._is_valid_time_format("") is False + + +class TestGetCurrentTime: + """Tests for timezone-aware time retrieval.""" + + def test_valid_timezone(self, scheduler): + scheduler.bot.config.set("Bot", "timezone", "US/Pacific") + result = scheduler.get_current_time() + assert result.tzinfo is not None + + def test_invalid_timezone_falls_back(self, scheduler): + scheduler.bot.config.set("Bot", "timezone", "Invalid/Zone") + result = scheduler.get_current_time() + # Should still return a datetime (system time fallback) + assert result is not None + scheduler.bot.logger.warning.assert_called() + + def test_empty_timezone_uses_system(self, scheduler): + scheduler.bot.config.set("Bot", "timezone", "") + result = scheduler.get_current_time() + assert result is not None + + +class TestHasMeshInfoPlaceholders: + """Tests for _has_mesh_info_placeholders().""" + + def test_detects_placeholder(self, scheduler): + assert scheduler._has_mesh_info_placeholders("Contacts: {total_contacts}") is True + + def test_no_placeholder_returns_false(self, scheduler): + assert scheduler._has_mesh_info_placeholders("Hello world") is False + + def test_detects_legacy_placeholder(self, scheduler): + assert scheduler._has_mesh_info_placeholders("Repeaters: {repeaters}") is True