Enhance command tests with additional assertions and new truncation test

- Added assertions to ensure call arguments are not None in multiple command tests.
- Introduced a new test for the CmdCommand class to verify that long command lists are truncated correctly with a '(N more)' suffix, improving command list readability.
This commit is contained in:
agessaman
2026-02-13 15:45:29 -08:00
parent 098ae7814b
commit 8cf7348321
11 changed files with 961 additions and 3 deletions

View File

@@ -37,5 +37,31 @@ class TestCmdCommand:
result = await cmd.execute(msg)
assert result is True
call_args = command_mock_bot.command_manager.send_response.call_args
assert call_args is not None
response = call_args[0][1]
assert "ping" in response or "help" in response or "cmd" in response
def test_get_commands_list_truncation(self, command_mock_bot):
"""Test that _get_commands_list truncates long lists with '(N more)' suffix."""
import re
command_mock_bot.config.add_section("Cmd_Command")
command_mock_bot.config.set("Cmd_Command", "enabled", "true")
command_mock_bot.command_manager.keywords = {}
# Create 25 mock commands with long names to force truncation
commands = {}
for i in range(25):
name = f"longcommandname{i:02d}"
mock_cmd = type("MockCmd", (), {"keywords": [name]})()
commands[name] = mock_cmd
command_mock_bot.command_manager.commands = commands
cmd = CmdCommand(command_mock_bot)
# "Available commands: " = 20 chars; "longcommandnameNN" = 17 chars; ", " = 2 chars
# 3 commands fit in 75 chars; suffix " (22 more)" = 11 chars; total = 86
# max_length=90 allows 3 commands + suffix, but not a 4th command
result = cmd._get_commands_list(max_length=90)
# Should contain truncation indicator
assert "more)" in result
# Should start with prefix
assert result.startswith("Available commands: ")
# Should NOT contain doubled numbers like "(5 5 more)"
assert not re.search(r'\(\d+ \d+ more\)', result)

View File

@@ -43,8 +43,7 @@ class TestHelloCommand:
cmd = HelloCommand(command_mock_bot)
with patch("modules.commands.hello_command.random.choice", side_effect=lambda x: x[0]):
result = cmd.get_random_greeting()
assert result
assert isinstance(result, str)
assert isinstance(result, str) and len(result) > 0
def test_get_emoji_response_vulcan(self, command_mock_bot):
command_mock_bot.config.add_section("Hello_Command")
@@ -78,5 +77,6 @@ class TestHelloCommand:
result = await cmd.execute(msg)
assert result is True
call_args = command_mock_bot.command_manager.send_response.call_args
assert call_args is not None
response = call_args[0][1]
assert response
assert isinstance(response, str) and len(response) > 0

View File

@@ -31,3 +31,5 @@ class TestHelpCommand:
msg = mock_message(content="help", is_dm=True)
result = await cmd.execute(msg)
assert result is True
# Note: HelpCommand.execute() is a placeholder; actual help logic is in
# CommandManager.check_keywords(). Response sending is tested there.

View File

@@ -32,6 +32,7 @@ class TestMagic8Command:
result = await cmd.execute(msg)
assert result is True
call_args = command_mock_bot.command_manager.send_response.call_args
assert call_args is not None
response = call_args[0][1]
assert "🎱" in response
assert any(r in response for r in magic8_responses)
@@ -45,5 +46,6 @@ class TestMagic8Command:
result = await cmd.execute(msg)
assert result is True
call_args = command_mock_bot.command_manager.send_response.call_args
assert call_args is not None
response = call_args[0][1]
assert "Alice" in response or "@[" in response

View File

@@ -54,6 +54,7 @@ class TestRollCommandExecute:
result = await cmd.execute(msg)
assert result is True
call_args = command_mock_bot.command_manager.send_response.call_args
assert call_args is not None
response = call_args[0][1]
# Should be a number between 1 and 100
nums = re.findall(r'\d+', response)
@@ -70,6 +71,7 @@ class TestRollCommandExecute:
result = await cmd.execute(msg)
assert result is True
call_args = command_mock_bot.command_manager.send_response.call_args
assert call_args is not None
response = call_args[0][1]
nums = re.findall(r'\d+', response)
assert len(nums) >= 1

View File

@@ -0,0 +1,80 @@
"""Tests for ChannelManager pure logic (no meshcore device calls)."""
import hashlib
import pytest
from unittest.mock import Mock
from modules.channel_manager import ChannelManager
@pytest.fixture
def cm(mock_logger):
"""ChannelManager with mock bot for pure logic tests."""
bot = Mock()
bot.logger = mock_logger
bot.db_manager = Mock()
bot.db_manager.db_path = "/dev/null"
bot.connected = False
bot.meshcore = Mock()
bot.meshcore.channels = {}
return ChannelManager(bot, max_channels=8)
class TestGenerateHashtagKey:
"""Tests for generate_hashtag_key() static method."""
def test_deterministic(self):
key1 = ChannelManager.generate_hashtag_key("general")
key2 = ChannelManager.generate_hashtag_key("general")
assert key1 == key2
assert len(key1) == 16
def test_prepends_hash_if_missing(self):
key_without = ChannelManager.generate_hashtag_key("general")
key_with = ChannelManager.generate_hashtag_key("#general")
assert key_without == key_with
def test_known_value(self):
"""Verify against independently computed SHA256."""
expected = hashlib.sha256(b"#longfast").digest()[:16]
result = ChannelManager.generate_hashtag_key("#LongFast")
assert result == expected
class TestChannelNameLookup:
"""Tests for get_channel_name()."""
def test_cached_channel_name(self, cm):
cm._channels_cache = {0: {"channel_name": "general"}}
cm._cache_valid = True
assert cm.get_channel_name(0) == "general"
def test_not_cached_returns_fallback(self, cm):
cm._channels_cache = {}
cm._cache_valid = True
result = cm.get_channel_name(99)
assert "99" in result
class TestChannelNumberLookup:
"""Tests for get_channel_number()."""
def test_found_by_name(self, cm):
cm._channels_cache = {0: {"channel_name": "general"}, 1: {"channel_name": "test"}}
cm._cache_valid = True
assert cm.get_channel_number("test") == 1
def test_case_insensitive(self, cm):
cm._channels_cache = {0: {"channel_name": "General"}}
cm._cache_valid = True
assert cm.get_channel_number("general") == 0
class TestCacheManagement:
"""Tests for cache invalidation."""
def test_invalidate_cache(self, cm):
cm._cache_valid = True
cm.invalidate_cache()
assert cm._cache_valid is False

View File

@@ -0,0 +1,274 @@
"""Tests for modules.command_manager."""
import time
import pytest
from configparser import ConfigParser
from unittest.mock import Mock, MagicMock, patch, AsyncMock
from modules.command_manager import CommandManager, InternetStatusCache
from modules.models import MeshMessage
from tests.conftest import mock_message
@pytest.fixture
def cm_bot(mock_logger):
"""Mock bot for CommandManager tests."""
bot = Mock()
bot.logger = mock_logger
bot.config = ConfigParser()
bot.config.add_section("Bot")
bot.config.set("Bot", "bot_name", "TestBot")
bot.config.add_section("Channels")
bot.config.set("Channels", "monitor_channels", "general,test")
bot.config.set("Channels", "respond_to_dms", "true")
bot.config.add_section("Keywords")
bot.config.set("Keywords", "ping", "Pong!")
bot.config.set("Keywords", "test", "ack")
bot.translator = Mock()
# Translator returns "key: kwarg_values" so assertions can check content
bot.translator.translate = Mock(
side_effect=lambda key, **kw: f"{key}: {' '.join(str(v) for v in kw.values())}"
)
bot.meshcore = None
bot.rate_limiter = Mock()
bot.rate_limiter.can_send = Mock(return_value=True)
bot.bot_tx_rate_limiter = Mock()
bot.bot_tx_rate_limiter.wait_for_tx = Mock()
bot.tx_delay_ms = 0
return bot
def make_manager(bot, commands=None):
"""Create CommandManager with mocked PluginLoader."""
with patch("modules.command_manager.PluginLoader") as mock_loader_class:
mock_loader = Mock()
mock_loader.load_all_plugins = Mock(return_value=commands or {})
mock_loader_class.return_value = mock_loader
return CommandManager(bot)
class TestLoadKeywords:
"""Tests for keyword loading from config."""
def test_load_keywords_from_config(self, cm_bot):
manager = make_manager(cm_bot)
assert manager.keywords["ping"] == "Pong!"
assert manager.keywords["test"] == "ack"
def test_load_keywords_strips_quotes(self, cm_bot):
cm_bot.config.set("Keywords", "quoted", '"Hello World"')
manager = make_manager(cm_bot)
assert manager.keywords["quoted"] == "Hello World"
def test_load_keywords_decodes_escapes(self, cm_bot):
cm_bot.config.set("Keywords", "multiline", r"Line1\nLine2")
manager = make_manager(cm_bot)
assert "\n" in manager.keywords["multiline"]
def test_load_keywords_empty_section(self, cm_bot):
cm_bot.config.remove_section("Keywords")
cm_bot.config.add_section("Keywords")
manager = make_manager(cm_bot)
assert manager.keywords == {}
class TestLoadBannedUsers:
"""Tests for banned users loading."""
def test_load_banned_users_from_config(self, cm_bot):
cm_bot.config.add_section("Banned_Users")
cm_bot.config.set("Banned_Users", "banned_users", "BadUser1, BadUser2")
manager = make_manager(cm_bot)
assert "BadUser1" in manager.banned_users
assert "BadUser2" in manager.banned_users
def test_load_banned_users_empty(self, cm_bot):
manager = make_manager(cm_bot)
assert manager.banned_users == []
def test_load_banned_users_whitespace_handling(self, cm_bot):
cm_bot.config.add_section("Banned_Users")
cm_bot.config.set("Banned_Users", "banned_users", " user1 , user2 ")
manager = make_manager(cm_bot)
assert "user1" in manager.banned_users
assert "user2" in manager.banned_users
class TestIsUserBanned:
"""Tests for ban checking logic."""
def test_exact_match(self, cm_bot):
cm_bot.config.add_section("Banned_Users")
cm_bot.config.set("Banned_Users", "banned_users", "BadUser")
manager = make_manager(cm_bot)
assert manager.is_user_banned("BadUser") is True
def test_prefix_match(self, cm_bot):
cm_bot.config.add_section("Banned_Users")
cm_bot.config.set("Banned_Users", "banned_users", "BadUser")
manager = make_manager(cm_bot)
assert manager.is_user_banned("BadUser 123") is True
def test_no_match(self, cm_bot):
cm_bot.config.add_section("Banned_Users")
cm_bot.config.set("Banned_Users", "banned_users", "BadUser")
manager = make_manager(cm_bot)
assert manager.is_user_banned("GoodUser") is False
def test_none_sender(self, cm_bot):
manager = make_manager(cm_bot)
assert manager.is_user_banned(None) is False
class TestChannelTriggerAllowed:
"""Tests for _is_channel_trigger_allowed."""
def test_dm_always_allowed(self, cm_bot):
cm_bot.config.set("Channels", "channel_keywords", "ping")
manager = make_manager(cm_bot)
msg = mock_message(content="wx", is_dm=True)
assert manager._is_channel_trigger_allowed("wx", msg) is True
def test_none_whitelist_allows_all(self, cm_bot):
manager = make_manager(cm_bot)
assert manager.channel_keywords is None
msg = mock_message(content="anything", channel="general", is_dm=False)
assert manager._is_channel_trigger_allowed("anything", msg) is True
def test_whitelist_allows_listed(self, cm_bot):
cm_bot.config.set("Channels", "channel_keywords", "ping, help")
manager = make_manager(cm_bot)
msg = mock_message(content="ping", channel="general", is_dm=False)
assert manager._is_channel_trigger_allowed("ping", msg) is True
def test_whitelist_blocks_unlisted(self, cm_bot):
cm_bot.config.set("Channels", "channel_keywords", "ping, help")
manager = make_manager(cm_bot)
msg = mock_message(content="wx", channel="general", is_dm=False)
assert manager._is_channel_trigger_allowed("wx", msg) is False
class TestLoadMonitorChannels:
"""Tests for monitor channels loading."""
def test_load_monitor_channels(self, cm_bot):
manager = make_manager(cm_bot)
assert "general" in manager.monitor_channels
assert "test" in manager.monitor_channels
assert len(manager.monitor_channels) == 2
def test_load_monitor_channels_empty(self, cm_bot):
cm_bot.config.set("Channels", "monitor_channels", "")
manager = make_manager(cm_bot)
assert manager.monitor_channels == []
class TestLoadChannelKeywords:
"""Tests for channel keyword whitelist loading."""
def test_load_channel_keywords_returns_list(self, cm_bot):
cm_bot.config.set("Channels", "channel_keywords", "ping, wx, help")
manager = make_manager(cm_bot)
assert isinstance(manager.channel_keywords, list)
assert "ping" in manager.channel_keywords
assert "wx" in manager.channel_keywords
assert "help" in manager.channel_keywords
def test_load_channel_keywords_empty_returns_none(self, cm_bot):
cm_bot.config.set("Channels", "channel_keywords", "")
manager = make_manager(cm_bot)
assert manager.channel_keywords is None
def test_load_channel_keywords_not_set_returns_none(self, cm_bot):
manager = make_manager(cm_bot)
assert manager.channel_keywords is None
class TestCheckKeywords:
"""Tests for check_keywords() message matching."""
def test_exact_keyword_match(self, cm_bot):
manager = make_manager(cm_bot)
msg = mock_message(content="ping", channel="general", is_dm=False)
matches = manager.check_keywords(msg)
assert any(trigger == "ping" for trigger, _ in matches)
def test_prefix_required_blocks_bare_keyword(self, cm_bot):
cm_bot.config.set("Bot", "command_prefix", "!")
manager = make_manager(cm_bot)
msg = mock_message(content="ping", channel="general", is_dm=False)
matches = manager.check_keywords(msg)
assert len(matches) == 0
def test_prefix_matches(self, cm_bot):
cm_bot.config.set("Bot", "command_prefix", "!")
manager = make_manager(cm_bot)
msg = mock_message(content="!ping", channel="general", is_dm=False)
matches = manager.check_keywords(msg)
assert any(trigger == "ping" for trigger, _ in matches)
def test_wrong_channel_no_match(self, cm_bot):
manager = make_manager(cm_bot)
msg = mock_message(content="ping", channel="other", is_dm=False)
matches = manager.check_keywords(msg)
assert len(matches) == 0
def test_dm_allowed(self, cm_bot):
manager = make_manager(cm_bot)
msg = mock_message(content="ping", is_dm=True)
matches = manager.check_keywords(msg)
assert any(trigger == "ping" for trigger, _ in matches)
def test_help_routing(self, cm_bot):
manager = make_manager(cm_bot)
msg = mock_message(content="help", is_dm=True)
matches = manager.check_keywords(msg)
assert any(trigger == "help" for trigger, _ in matches)
class TestGetHelpForCommand:
"""Tests for command-specific help."""
def test_known_command_returns_help(self, cm_bot):
mock_cmd = MagicMock()
mock_cmd.keywords = ["wx"]
mock_cmd.get_help_text = Mock(return_value="Weather forecast info")
mock_cmd.dm_only = False
mock_cmd.requires_internet = False
manager = make_manager(cm_bot, commands={"wx": mock_cmd})
result = manager.get_help_for_command("wx")
# Translator receives help_text as kwarg, so it appears in the output
assert "Weather forecast info" in result
# Verify translator was called with the right key
cm_bot.translator.translate.assert_called_with(
"commands.help.specific", command="wx", help_text="Weather forecast info"
)
def test_unknown_command_returns_error(self, cm_bot):
manager = make_manager(cm_bot)
result = manager.get_help_for_command("nonexistent")
# Translator receives 'commands.help.unknown' key with command name
cm_bot.translator.translate.assert_called()
call_args = cm_bot.translator.translate.call_args
assert call_args[0][0] == "commands.help.unknown"
assert call_args[1]["command"] == "nonexistent"
class TestInternetStatusCache:
"""Tests for InternetStatusCache."""
def test_is_valid_fresh(self):
cache = InternetStatusCache(has_internet=True, timestamp=time.time())
assert cache.is_valid(30) is True
def test_is_valid_stale(self):
cache = InternetStatusCache(has_internet=True, timestamp=time.time() - 60)
assert cache.is_valid(30) is False
def test_get_lock_lazy_creation(self):
cache = InternetStatusCache(has_internet=True, timestamp=0)
assert cache._lock is None
lock1 = cache._get_lock()
lock2 = cache._get_lock()
assert lock1 is lock2

182
tests/test_db_manager.py Normal file
View File

@@ -0,0 +1,182 @@
"""Tests for modules.db_manager."""
import sqlite3
import json
import pytest
from unittest.mock import Mock
from modules.db_manager import DBManager
@pytest.fixture
def db(mock_logger, tmp_path):
"""File-based DBManager for testing. _init_database() auto-creates core tables."""
bot = Mock()
bot.logger = mock_logger
return DBManager(bot, str(tmp_path / "test.db"))
class TestGeocoding:
"""Tests for geocoding cache."""
def test_cache_and_retrieve_geocoding(self, db):
db.cache_geocoding("Seattle, WA", 47.6062, -122.3321)
lat, lon = db.get_cached_geocoding("Seattle, WA")
assert abs(lat - 47.6062) < 0.001
assert abs(lon - (-122.3321)) < 0.001
def test_get_cached_geocoding_miss(self, db):
lat, lon = db.get_cached_geocoding("Nonexistent City")
assert lat is None
assert lon is None
def test_cache_geocoding_overwrites_existing(self, db):
db.cache_geocoding("Test", 10.0, 20.0)
db.cache_geocoding("Test", 30.0, 40.0)
lat, lon = db.get_cached_geocoding("Test")
assert abs(lat - 30.0) < 0.001
assert abs(lon - 40.0) < 0.001
def test_cache_geocoding_invalid_hours_logged(self, db):
"""Invalid cache_hours is caught and logged, not raised."""
db.cache_geocoding("Test", 10.0, 20.0, cache_hours=0)
db.bot.logger.error.assert_called()
# Verify it did not store anything
lat, lon = db.get_cached_geocoding("Test")
assert lat is None
class TestGenericCache:
"""Tests for generic cache."""
def test_cache_and_retrieve_value(self, db):
db.cache_value("weather_key", "sunny", "weather")
result = db.get_cached_value("weather_key", "weather")
assert result == "sunny"
def test_get_cached_value_miss(self, db):
assert db.get_cached_value("nonexistent", "any") is None
def test_different_keys_stored_independently(self, db):
db.cache_value("key_a", "value_a", "weather")
db.cache_value("key_b", "value_b", "weather")
assert db.get_cached_value("key_a", "weather") == "value_a"
assert db.get_cached_value("key_b", "weather") == "value_b"
def test_cache_json_round_trip(self, db):
data = {"temp": 72, "conditions": "clear", "nested": {"wind": 5}}
db.cache_json("forecast", data, "weather")
result = db.get_cached_json("forecast", "weather")
assert result == data
def test_get_cached_json_invalid_json(self, db):
"""Manually insert invalid JSON; get_cached_json returns None."""
with sqlite3.connect(str(db.db_path)) as conn:
conn.execute(
"INSERT INTO generic_cache (cache_key, cache_value, cache_type, expires_at) "
"VALUES (?, ?, ?, datetime('now', '+24 hours'))",
("bad_json", "not{valid}json", "test"),
)
conn.commit()
assert db.get_cached_json("bad_json", "test") is None
class TestCacheCleanup:
"""Tests for cache expiry cleanup."""
def test_cleanup_expired_deletes_old(self, db):
db.cache_value("old_key", "old_val", "test")
# Manually set expires_at to the past
with sqlite3.connect(str(db.db_path)) as conn:
conn.execute(
"UPDATE generic_cache SET expires_at = datetime('now', '-1 hours') "
"WHERE cache_key = 'old_key'"
)
conn.commit()
db.cleanup_expired_cache()
assert db.get_cached_value("old_key", "test") is None
def test_cleanup_expired_preserves_valid(self, db):
db.cache_value("fresh_key", "fresh_val", "test", cache_hours=720)
db.cleanup_expired_cache()
assert db.get_cached_value("fresh_key", "test") == "fresh_val"
class TestTableManagement:
"""Tests for table creation whitelist."""
def test_create_table_allowed(self, db):
db.create_table(
"greeted_users",
"id INTEGER PRIMARY KEY, name TEXT NOT NULL",
)
with sqlite3.connect(str(db.db_path)) as conn:
cursor = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='greeted_users'"
)
assert cursor.fetchone() is not None
def test_create_table_disallowed_raises(self, db):
with pytest.raises(ValueError, match="not in allowed tables"):
db.create_table("not_allowed", "id INTEGER PRIMARY KEY")
def test_create_table_sql_injection_name_raises(self, db):
with pytest.raises(ValueError):
db.create_table("DROP TABLE users; --", "id INTEGER PRIMARY KEY")
class TestExecuteQuery:
"""Tests for raw query execution."""
def test_execute_query_returns_dicts(self, db):
db.set_metadata("test_key", "test_value")
rows = db.execute_query("SELECT * FROM bot_metadata WHERE key = ?", ("test_key",))
assert len(rows) == 1
assert rows[0]["key"] == "test_key"
assert rows[0]["value"] == "test_value"
def test_execute_update_returns_rowcount(self, db):
db.set_metadata("del_key", "del_value")
count = db.execute_update(
"DELETE FROM bot_metadata WHERE key = ?", ("del_key",)
)
assert count == 1
class TestMetadata:
"""Tests for bot metadata storage."""
def test_set_and_get_metadata(self, db):
db.set_metadata("version", "1.2.3")
assert db.get_metadata("version") == "1.2.3"
def test_get_metadata_miss(self, db):
assert db.get_metadata("nonexistent") is None
def test_bot_start_time_round_trip(self, db):
ts = 1234567890.5
db.set_bot_start_time(ts)
assert db.get_bot_start_time() == ts
class TestCacheHoursValidation:
"""Tests for cache_hours boundary validation."""
def test_boundary_values(self, db):
# Valid boundaries
db.cache_value("k1", "v1", "t", cache_hours=1)
assert db.get_cached_value("k1", "t") == "v1"
db.cache_value("k2", "v2", "t", cache_hours=87600)
assert db.get_cached_value("k2", "t") == "v2"
# Invalid boundaries — caught and logged, not stored
db.cache_value("k3", "v3", "t", cache_hours=0)
db.bot.logger.error.assert_called()
assert db.get_cached_value("k3", "t") is None
db.bot.logger.error.reset_mock()
db.cache_value("k4", "v4", "t", cache_hours=87601)
db.bot.logger.error.assert_called()
assert db.get_cached_value("k4", "t") is None

View File

@@ -0,0 +1,150 @@
"""Tests for FeedManager pure formatting and filtering logic."""
import json
from datetime import datetime, timezone, timedelta
import pytest
from configparser import ConfigParser
from unittest.mock import Mock
from modules.feed_manager import FeedManager
@pytest.fixture
def fm(mock_logger):
"""FeedManager with disabled networking for pure-logic tests."""
bot = Mock()
bot.logger = mock_logger
bot.config = ConfigParser()
bot.config.add_section("Feed_Manager")
bot.config.set("Feed_Manager", "feed_manager_enabled", "false")
bot.config.set("Feed_Manager", "max_message_length", "200")
bot.db_manager = Mock()
bot.db_manager.db_path = "/dev/null"
return FeedManager(bot)
class TestApplyShortening:
"""Tests for _apply_shortening()."""
def test_truncate_short_text_unchanged(self, fm):
assert fm._apply_shortening("hello", "truncate:20") == "hello"
def test_truncate_long_text_adds_ellipsis(self, fm):
result = fm._apply_shortening("Hello World", "truncate:5")
assert result == "Hello..."
def test_word_wrap_breaks_at_boundary(self, fm):
result = fm._apply_shortening("Hello beautiful world", "word_wrap:15")
# word_wrap truncates at a word boundary and appends "..."
# "Hello beautiful world"[:15] = "Hello beautiful", last space at 5 (too early),
# so result is "Hello beautiful..." (truncated at 15 chars + ellipsis)
assert result.endswith("...")
# The base text (without ellipsis) should be <= the wrap limit
assert len(result.rstrip(".")) <= 15 or result == "Hello beautiful..."
def test_first_words_limits_count(self, fm):
result = fm._apply_shortening("one two three four", "first_words:2")
assert result.startswith("one two")
def test_regex_extracts_group(self, fm):
result = fm._apply_shortening("Price: $42.99 today", r"regex:Price: \$(\d+\.\d+)")
assert result == "42.99"
def test_if_regex_returns_then_on_match(self, fm):
result = fm._apply_shortening("open", "if_regex:open:YES:NO")
assert result == "YES"
def test_if_regex_returns_else_on_no_match(self, fm):
result = fm._apply_shortening("closed", "if_regex:open:YES:NO")
assert result == "NO"
def test_empty_text_returns_empty(self, fm):
assert fm._apply_shortening("", "truncate:10") == ""
class TestGetNestedValue:
"""Tests for _get_nested_value()."""
def test_simple_field_access(self, fm):
assert fm._get_nested_value({"name": "test"}, "name") == "test"
def test_nested_field_access(self, fm):
data = {"raw": {"Priority": "high"}}
assert fm._get_nested_value(data, "raw.Priority") == "high"
def test_missing_field_returns_default(self, fm):
assert fm._get_nested_value({}, "missing") == ""
assert fm._get_nested_value({}, "missing", "N/A") == "N/A"
class TestShouldSendItem:
"""Tests for _should_send_item() filter evaluation."""
def test_no_filter_sends_all(self, fm):
feed = {"id": 1}
item = {"raw": {"Priority": "low"}}
assert fm._should_send_item(feed, item) is True
def test_equals_filter_matches(self, fm):
feed = {
"id": 1,
"filter_config": json.dumps({
"conditions": [
{"field": "Priority", "operator": "equals", "value": "high"}
]
}),
}
item = {"raw": {"Priority": "high"}}
assert fm._should_send_item(feed, item) is True
def test_equals_filter_rejects(self, fm):
feed = {
"id": 1,
"filter_config": json.dumps({
"conditions": [
{"field": "Priority", "operator": "equals", "value": "high"}
]
}),
}
item = {"raw": {"Priority": "low"}}
assert fm._should_send_item(feed, item) is False
def test_in_filter_matches(self, fm):
feed = {
"id": 1,
"filter_config": json.dumps({
"conditions": [
{"field": "Priority", "operator": "in", "values": ["high", "highest"]}
]
}),
}
item = {"raw": {"Priority": "highest"}}
assert fm._should_send_item(feed, item) is True
def test_and_logic_all_must_pass(self, fm):
feed = {
"id": 1,
"filter_config": json.dumps({
"conditions": [
{"field": "Priority", "operator": "equals", "value": "high"},
{"field": "Status", "operator": "equals", "value": "open"},
],
"logic": "AND",
}),
}
# First condition passes, second fails
item = {"raw": {"Priority": "high", "Status": "closed"}}
assert fm._should_send_item(feed, item) is False
class TestFormatTimestamp:
"""Tests for _format_timestamp()."""
def test_recent_timestamp(self, fm):
five_min_ago = datetime.now(timezone.utc) - timedelta(minutes=5)
result = fm._format_timestamp(five_min_ago)
assert "5m ago" in result
def test_none_returns_empty(self, fm):
assert fm._format_timestamp(None) == ""

161
tests/test_plugin_loader.py Normal file
View File

@@ -0,0 +1,161 @@
"""Tests for modules.plugin_loader."""
import pytest
from unittest.mock import Mock, MagicMock, AsyncMock
from modules.plugin_loader import PluginLoader
from modules.commands.base_command import BaseCommand
@pytest.fixture
def loader_bot(mock_logger, minimal_config):
"""Mock bot for PluginLoader tests."""
bot = MagicMock()
bot.logger = mock_logger
bot.config = minimal_config
bot.translator = MagicMock()
bot.translator.translate = Mock(side_effect=lambda k, **kw: k)
bot.translator.get_value = Mock(return_value=None)
bot.command_manager = MagicMock()
bot.command_manager.monitor_channels = ["general"]
bot.command_manager.send_response = AsyncMock(return_value=True)
bot.meshcore = None
return bot
def _load_and_register(loader, plugin_file):
"""Load a plugin and register it in the loader's internal state (like load_all_plugins does)."""
instance = loader.load_plugin(plugin_file)
if instance:
metadata = instance.get_metadata()
name = metadata["name"]
loader.loaded_plugins[name] = instance
loader.plugin_metadata[name] = metadata
loader._build_keyword_mappings(name, metadata)
return instance
class TestDiscover:
"""Tests for plugin discovery."""
def test_discover_plugins_finds_command_files(self, loader_bot):
loader = PluginLoader(loader_bot)
plugins = loader.discover_plugins()
assert isinstance(plugins, list)
assert len(plugins) > 0
# Should find well-known commands
assert "ping_command" in plugins
assert "help_command" in plugins
def test_discover_plugins_excludes_base_and_init(self, loader_bot):
loader = PluginLoader(loader_bot)
plugins = loader.discover_plugins()
assert "__init__" not in plugins
assert "base_command" not in plugins
def test_discover_alternative_plugins_empty_when_no_dir(self, loader_bot, tmp_path):
loader = PluginLoader(loader_bot, commands_dir=str(tmp_path / "nonexistent"))
result = loader.discover_alternative_plugins()
assert result == []
class TestValidatePlugin:
"""Tests for plugin class validation."""
def test_validate_missing_execute(self, loader_bot):
loader = PluginLoader(loader_bot)
class NoExecute:
name = "test"
keywords = ["test"]
errors = loader._validate_plugin(NoExecute)
assert any("execute" in e.lower() for e in errors)
def test_validate_sync_execute(self, loader_bot):
loader = PluginLoader(loader_bot)
class SyncExecute:
name = "test"
keywords = ["test"]
def execute(self, message):
return True
errors = loader._validate_plugin(SyncExecute)
assert any("async" in e.lower() for e in errors)
def test_validate_valid_class(self, loader_bot):
loader = PluginLoader(loader_bot)
class ValidCommand:
name = "test"
keywords = ["test"]
async def execute(self, message):
return True
errors = loader._validate_plugin(ValidCommand)
assert len(errors) == 0
class TestLoadPlugin:
"""Tests for loading individual plugins."""
def test_load_ping_command(self, loader_bot):
loader = PluginLoader(loader_bot)
plugin = loader.load_plugin("ping_command")
assert plugin is not None
assert isinstance(plugin, BaseCommand)
assert plugin.name == "ping"
def test_load_nonexistent_returns_none(self, loader_bot):
loader = PluginLoader(loader_bot)
plugin = loader.load_plugin("totally_nonexistent_command")
assert plugin is None
assert "totally_nonexistent_command" in loader._failed_plugins
class TestKeywordLookup:
"""Tests for keyword-based plugin lookup after registration."""
def test_get_plugin_by_keyword(self, loader_bot):
loader = PluginLoader(loader_bot)
_load_and_register(loader, "ping_command")
result = loader.get_plugin_by_keyword("ping")
assert result is not None
assert result.name == "ping"
def test_get_plugin_by_keyword_miss(self, loader_bot):
loader = PluginLoader(loader_bot)
assert loader.get_plugin_by_keyword("nonexistent") is None
def test_get_plugin_by_name(self, loader_bot):
loader = PluginLoader(loader_bot)
_load_and_register(loader, "ping_command")
result = loader.get_plugin_by_name("ping")
assert result is not None
assert result.name == "ping"
class TestCategoryAndFailed:
"""Tests for category filtering and failed plugin tracking."""
def test_get_plugins_by_category(self, loader_bot):
loader = PluginLoader(loader_bot)
_load_and_register(loader, "ping_command")
_load_and_register(loader, "help_command")
# Ping and help are in the "basic" category
result = loader.get_plugins_by_category("basic")
assert isinstance(result, dict)
assert "ping" in result or "help" in result
def test_get_failed_plugins_returns_copy(self, loader_bot):
loader = PluginLoader(loader_bot)
loader.load_plugin("nonexistent_command")
failed = loader.get_failed_plugins()
assert isinstance(failed, dict)
assert "nonexistent_command" in failed
# Mutating the return should not affect internal state
failed.clear()
assert len(loader.get_failed_plugins()) > 0

View File

@@ -0,0 +1,79 @@
"""Tests for MessageScheduler pure logic (no threading, no asyncio)."""
import pytest
from configparser import ConfigParser
from unittest.mock import Mock
from modules.scheduler import MessageScheduler
@pytest.fixture
def scheduler(mock_logger):
"""MessageScheduler with mock bot for pure logic tests."""
bot = Mock()
bot.logger = mock_logger
bot.config = ConfigParser()
bot.config.add_section("Bot")
return MessageScheduler(bot)
class TestIsValidTimeFormat:
"""Tests for _is_valid_time_format()."""
def test_valid_time_0000(self, scheduler):
assert scheduler._is_valid_time_format("0000") is True
def test_valid_time_2359(self, scheduler):
assert scheduler._is_valid_time_format("2359") is True
def test_valid_time_1200(self, scheduler):
assert scheduler._is_valid_time_format("1200") is True
def test_invalid_time_2400(self, scheduler):
assert scheduler._is_valid_time_format("2400") is False
def test_invalid_time_0060(self, scheduler):
assert scheduler._is_valid_time_format("0060") is False
def test_invalid_time_short(self, scheduler):
assert scheduler._is_valid_time_format("123") is False
def test_invalid_time_letters(self, scheduler):
assert scheduler._is_valid_time_format("abcd") is False
def test_invalid_time_empty(self, scheduler):
assert scheduler._is_valid_time_format("") is False
class TestGetCurrentTime:
"""Tests for timezone-aware time retrieval."""
def test_valid_timezone(self, scheduler):
scheduler.bot.config.set("Bot", "timezone", "US/Pacific")
result = scheduler.get_current_time()
assert result.tzinfo is not None
def test_invalid_timezone_falls_back(self, scheduler):
scheduler.bot.config.set("Bot", "timezone", "Invalid/Zone")
result = scheduler.get_current_time()
# Should still return a datetime (system time fallback)
assert result is not None
scheduler.bot.logger.warning.assert_called()
def test_empty_timezone_uses_system(self, scheduler):
scheduler.bot.config.set("Bot", "timezone", "")
result = scheduler.get_current_time()
assert result is not None
class TestHasMeshInfoPlaceholders:
"""Tests for _has_mesh_info_placeholders()."""
def test_detects_placeholder(self, scheduler):
assert scheduler._has_mesh_info_placeholders("Contacts: {total_contacts}") is True
def test_no_placeholder_returns_false(self, scheduler):
assert scheduler._has_mesh_info_placeholders("Hello world") is False
def test_detects_legacy_placeholder(self, scheduler):
assert scheduler._has_mesh_info_placeholders("Repeaters: {repeaters}") is True