mirror of
https://github.com/agessaman/meshcore-bot.git
synced 2026-03-30 20:15:40 +00:00
- Updated MessageHandler to use OrderedDict for SNR and RSSI caches, implementing LRU eviction to maintain a maximum size. - Added thread safety to rate limiting classes by introducing locks, ensuring consistent behavior in concurrent environments. - Introduced periodic cleanup in TransmissionTracker to manage memory usage effectively. - Added unit tests for LRU cache behavior and automatic cleanup functionality. These changes improve performance and reliability in handling message data and rate limiting.
1598 lines
66 KiB
Python
1598 lines
66 KiB
Python
"""Tests for MessageHandler pure logic (no network, no meshcore device)."""
|
|
|
|
import configparser
|
|
import time
|
|
from unittest.mock import AsyncMock, Mock, patch
|
|
|
|
import pytest
|
|
|
|
from modules.message_handler import MessageHandler
|
|
from modules.models import MeshMessage
|
|
|
|
|
|
@pytest.fixture
|
|
def bot(mock_logger):
|
|
"""Minimal bot mock for MessageHandler instantiation."""
|
|
bot = Mock()
|
|
bot.logger = mock_logger
|
|
bot.config = configparser.ConfigParser()
|
|
bot.config.add_section("Bot")
|
|
bot.config.set("Bot", "enabled", "true")
|
|
bot.config.set("Bot", "rf_data_timeout", "15.0")
|
|
bot.config.set("Bot", "message_correlation_timeout", "10.0")
|
|
bot.config.set("Bot", "enable_enhanced_correlation", "true")
|
|
bot.config.add_section("Channels")
|
|
bot.config.set("Channels", "respond_to_dms", "true")
|
|
bot.connection_time = None
|
|
bot.prefix_hex_chars = 2
|
|
bot.command_manager = Mock()
|
|
bot.command_manager.monitor_channels = ["general", "test"]
|
|
bot.command_manager.is_user_banned = Mock(return_value=False)
|
|
bot.command_manager.commands = {}
|
|
return bot
|
|
|
|
|
|
@pytest.fixture
|
|
def handler(bot):
|
|
return MessageHandler(bot)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _is_old_cached_message
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestIsOldCachedMessage:
|
|
"""Tests for MessageHandler._is_old_cached_message()."""
|
|
|
|
def test_no_connection_time_returns_false(self, handler):
|
|
handler.bot.connection_time = None
|
|
assert handler._is_old_cached_message(12345) is False
|
|
|
|
def test_none_timestamp_returns_false(self, handler):
|
|
handler.bot.connection_time = time.time()
|
|
assert handler._is_old_cached_message(None) is False
|
|
|
|
def test_unknown_timestamp_returns_false(self, handler):
|
|
handler.bot.connection_time = time.time()
|
|
assert handler._is_old_cached_message("unknown") is False
|
|
|
|
def test_zero_timestamp_returns_false(self, handler):
|
|
handler.bot.connection_time = time.time()
|
|
assert handler._is_old_cached_message(0) is False
|
|
|
|
def test_negative_timestamp_returns_false(self, handler):
|
|
handler.bot.connection_time = time.time()
|
|
assert handler._is_old_cached_message(-1) is False
|
|
|
|
def test_old_timestamp_returns_true(self, handler):
|
|
now = time.time()
|
|
handler.bot.connection_time = now
|
|
old = now - 100 # 100 seconds before connection
|
|
assert handler._is_old_cached_message(old) is True
|
|
|
|
def test_recent_timestamp_returns_false(self, handler):
|
|
now = time.time()
|
|
handler.bot.connection_time = now
|
|
recent = now + 1 # after connection
|
|
assert handler._is_old_cached_message(recent) is False
|
|
|
|
def test_far_future_timestamp_returns_false(self, handler):
|
|
handler.bot.connection_time = time.time()
|
|
future = time.time() + 7200 # 2 hours in future
|
|
assert handler._is_old_cached_message(future) is False
|
|
|
|
def test_invalid_string_returns_false(self, handler):
|
|
handler.bot.connection_time = time.time()
|
|
assert handler._is_old_cached_message("not_a_number") is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _path_bytes_to_nodes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestPathBytesToNodes:
|
|
"""Tests for MessageHandler._path_bytes_to_nodes()."""
|
|
|
|
def test_single_byte_per_hop(self, handler):
|
|
# 3 bytes -> 3 nodes of 2 hex chars each
|
|
path_hex, nodes = handler._path_bytes_to_nodes(bytes.fromhex("017e86"), prefix_hex_chars=2)
|
|
assert path_hex == "017e86"
|
|
assert nodes == ["01", "7E", "86"]
|
|
|
|
def test_two_bytes_per_hop(self, handler):
|
|
path_hex, nodes = handler._path_bytes_to_nodes(bytes.fromhex("01027e86"), prefix_hex_chars=4)
|
|
assert nodes == ["0102", "7E86"]
|
|
|
|
def test_remainder_falls_back_to_1byte(self, handler):
|
|
# 3 bytes with prefix_hex_chars=4 → remainder, fallback to 1 byte
|
|
path_hex, nodes = handler._path_bytes_to_nodes(bytes.fromhex("017e86"), prefix_hex_chars=4)
|
|
assert nodes == ["01", "7E", "86"]
|
|
|
|
def test_empty_bytes(self, handler):
|
|
path_hex, nodes = handler._path_bytes_to_nodes(b"", prefix_hex_chars=2)
|
|
assert path_hex == ""
|
|
# Empty or fallback nodes — no crash expected
|
|
assert isinstance(nodes, list)
|
|
|
|
def test_zero_prefix_hex_chars_defaults_to_2(self, handler):
|
|
path_hex, nodes = handler._path_bytes_to_nodes(bytes.fromhex("017e"), prefix_hex_chars=0)
|
|
assert nodes == ["01", "7E"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _path_hex_to_nodes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestPathHexToNodes:
|
|
"""Tests for MessageHandler._path_hex_to_nodes()."""
|
|
|
|
def test_splits_into_2char_nodes(self, handler):
|
|
handler.bot.prefix_hex_chars = 2
|
|
nodes = handler._path_hex_to_nodes("017e86")
|
|
assert nodes == ["01", "7e", "86"]
|
|
|
|
def test_empty_string_returns_empty(self, handler):
|
|
nodes = handler._path_hex_to_nodes("")
|
|
assert nodes == []
|
|
|
|
def test_short_string_returns_empty(self, handler):
|
|
nodes = handler._path_hex_to_nodes("0")
|
|
assert nodes == []
|
|
|
|
def test_4char_prefix_hex_chars(self, handler):
|
|
handler.bot.prefix_hex_chars = 4
|
|
nodes = handler._path_hex_to_nodes("01027e86")
|
|
assert nodes == ["0102", "7e86"]
|
|
|
|
def test_remainder_falls_back_to_2chars(self, handler):
|
|
handler.bot.prefix_hex_chars = 4
|
|
# 6 hex chars (3 bytes) with 4-char chunks → remainder → fallback to 2-char
|
|
nodes = handler._path_hex_to_nodes("017e86")
|
|
assert nodes == ["01", "7e", "86"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _format_path_string
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestFormatPathString:
|
|
"""Tests for MessageHandler._format_path_string()."""
|
|
|
|
def test_empty_path_returns_direct(self, handler):
|
|
assert handler._format_path_string("") == "Direct"
|
|
|
|
def test_legacy_single_byte_per_hop(self, handler):
|
|
result = handler._format_path_string("017e86")
|
|
assert result == "01,7e,86"
|
|
|
|
def test_with_bytes_per_hop_1(self, handler):
|
|
result = handler._format_path_string("017e86", bytes_per_hop=1)
|
|
assert result == "01,7e,86"
|
|
|
|
def test_with_bytes_per_hop_2(self, handler):
|
|
result = handler._format_path_string("01027e86", bytes_per_hop=2)
|
|
assert result == "0102,7e86"
|
|
|
|
def test_remainder_with_bytes_per_hop_falls_back(self, handler):
|
|
# 3 bytes (6 hex) with bytes_per_hop=2 → remainder → fallback to 1 byte
|
|
result = handler._format_path_string("017e86", bytes_per_hop=2)
|
|
assert result == "01,7e,86"
|
|
|
|
def test_none_path_returns_direct(self, handler):
|
|
assert handler._format_path_string(None) == "Direct"
|
|
|
|
def test_invalid_hex_returns_raw(self, handler):
|
|
result = handler._format_path_string("ZZZZ", bytes_per_hop=None)
|
|
# Should not crash; returns "Raw: ..." fallback
|
|
assert "Raw" in result or "ZZ" in result.upper() or result == "Direct"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _get_route_type_name
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGetRouteTypeName:
|
|
"""Tests for MessageHandler._get_route_type_name()."""
|
|
|
|
def test_known_types(self, handler):
|
|
assert handler._get_route_type_name(0x00) == "ROUTE_TYPE_TRANSPORT_FLOOD"
|
|
assert handler._get_route_type_name(0x01) == "ROUTE_TYPE_FLOOD"
|
|
assert handler._get_route_type_name(0x02) == "ROUTE_TYPE_DIRECT"
|
|
assert handler._get_route_type_name(0x03) == "ROUTE_TYPE_TRANSPORT_DIRECT"
|
|
|
|
def test_unknown_type(self, handler):
|
|
result = handler._get_route_type_name(0xFF)
|
|
assert "UNKNOWN" in result
|
|
assert "ff" in result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_payload_type_name
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGetPayloadTypeName:
|
|
"""Tests for MessageHandler.get_payload_type_name()."""
|
|
|
|
def test_known_types(self, handler):
|
|
assert handler.get_payload_type_name(0x00) == "REQ"
|
|
assert handler.get_payload_type_name(0x02) == "TXT_MSG"
|
|
assert handler.get_payload_type_name(0x04) == "ADVERT"
|
|
assert handler.get_payload_type_name(0x05) == "GRP_TXT"
|
|
assert handler.get_payload_type_name(0x08) == "PATH"
|
|
assert handler.get_payload_type_name(0x0F) == "RAW_CUSTOM"
|
|
|
|
def test_unknown_type(self, handler):
|
|
result = handler.get_payload_type_name(0xAB)
|
|
assert "UNKNOWN" in result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# should_process_message
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestShouldProcessMessage:
|
|
"""Tests for MessageHandler.should_process_message()."""
|
|
|
|
def _make_msg(self, channel=None, is_dm=False, sender_id="Alice"):
|
|
return MeshMessage(
|
|
content="hello",
|
|
channel=channel,
|
|
is_dm=is_dm,
|
|
sender_id=sender_id,
|
|
)
|
|
|
|
def test_bot_disabled_returns_false(self, handler):
|
|
handler.bot.config.set("Bot", "enabled", "false")
|
|
msg = self._make_msg(channel="general")
|
|
assert handler.should_process_message(msg) is False
|
|
|
|
def test_banned_user_returns_false(self, handler):
|
|
handler.bot.command_manager.is_user_banned.return_value = True
|
|
msg = self._make_msg(channel="general")
|
|
assert handler.should_process_message(msg) is False
|
|
|
|
def test_monitored_channel_returns_true(self, handler):
|
|
msg = self._make_msg(channel="general")
|
|
assert handler.should_process_message(msg) is True
|
|
|
|
def test_unmonitored_channel_returns_false(self, handler):
|
|
msg = self._make_msg(channel="unmonitored")
|
|
assert handler.should_process_message(msg) is False
|
|
|
|
def test_dm_enabled_returns_true(self, handler):
|
|
handler.bot.config.set("Channels", "respond_to_dms", "true")
|
|
msg = self._make_msg(is_dm=True)
|
|
assert handler.should_process_message(msg) is True
|
|
|
|
def test_dm_disabled_returns_false(self, handler):
|
|
handler.bot.config.set("Channels", "respond_to_dms", "false")
|
|
msg = self._make_msg(is_dm=True)
|
|
assert handler.should_process_message(msg) is False
|
|
|
|
def test_command_override_allows_unmonitored_channel(self, handler):
|
|
cmd = Mock()
|
|
cmd.is_channel_allowed = Mock(return_value=True)
|
|
handler.bot.command_manager.commands = {"special": cmd}
|
|
msg = self._make_msg(channel="unmonitored")
|
|
assert handler.should_process_message(msg) is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _cleanup_stale_cache_entries
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCleanupStaleCacheEntries:
|
|
"""Tests for MessageHandler._cleanup_stale_cache_entries()."""
|
|
|
|
def test_removes_old_timestamp_cache_entries(self, handler):
|
|
now = time.time()
|
|
current_time = now + handler._cache_cleanup_interval + 1
|
|
# Old entry: well outside rf_data_timeout relative to current_time
|
|
old_ts = current_time - handler.rf_data_timeout - 10
|
|
# Recent entry: within rf_data_timeout of current_time
|
|
recent_ts = current_time - 1
|
|
handler.rf_data_by_timestamp[old_ts] = {"timestamp": old_ts, "data": "old"}
|
|
handler.rf_data_by_timestamp[recent_ts] = {"timestamp": recent_ts, "data": "new"}
|
|
# Force full cleanup
|
|
handler._last_cache_cleanup = 0
|
|
handler._cleanup_stale_cache_entries(current_time=current_time)
|
|
# Old entry should be gone, recent kept
|
|
assert old_ts not in handler.rf_data_by_timestamp
|
|
assert recent_ts in handler.rf_data_by_timestamp
|
|
|
|
def test_removes_stale_pubkey_cache_entries(self, handler):
|
|
now = time.time()
|
|
handler.rf_data_by_pubkey["deadbeef"] = [
|
|
{"timestamp": now - 100, "data": "old"}, # stale
|
|
{"timestamp": now, "data": "new"}, # fresh
|
|
]
|
|
handler._last_cache_cleanup = 0
|
|
handler._cleanup_stale_cache_entries(current_time=now + handler._cache_cleanup_interval + 1)
|
|
entries = handler.rf_data_by_pubkey.get("deadbeef", [])
|
|
assert all(now - e["timestamp"] < handler.rf_data_timeout for e in entries)
|
|
|
|
def test_removes_stale_recent_rf_data(self, handler):
|
|
now = time.time()
|
|
handler.recent_rf_data = [
|
|
{"timestamp": now - 100},
|
|
{"timestamp": now},
|
|
]
|
|
handler._last_cache_cleanup = 0
|
|
handler._cleanup_stale_cache_entries(current_time=now + handler._cache_cleanup_interval + 1)
|
|
assert all(now - e["timestamp"] < handler.rf_data_timeout for e in handler.recent_rf_data)
|
|
|
|
def test_skips_full_cleanup_within_interval(self, handler):
|
|
now = time.time()
|
|
handler._last_cache_cleanup = now # just cleaned
|
|
# Stale entry in timestamp cache
|
|
stale_ts = now - 100
|
|
handler.rf_data_by_timestamp[stale_ts] = {"timestamp": stale_ts}
|
|
# Call with time just slightly after (within cleanup interval)
|
|
handler._cleanup_stale_cache_entries(current_time=now + 1)
|
|
# Still cleaned (timeout-only cleanup still runs)
|
|
assert stale_ts not in handler.rf_data_by_timestamp
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# find_recent_rf_data
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestFindRecentRfData:
|
|
"""Tests for MessageHandler.find_recent_rf_data()."""
|
|
|
|
def _rf_entry(self, age=0, packet_prefix="aabbccdd", pubkey_prefix="1122"):
|
|
return {
|
|
"timestamp": time.time() - age,
|
|
"snr": 5,
|
|
"rssi": -80,
|
|
"packet_prefix": packet_prefix,
|
|
"pubkey_prefix": pubkey_prefix,
|
|
}
|
|
|
|
def test_returns_none_when_empty(self, handler):
|
|
handler.recent_rf_data = []
|
|
assert handler.find_recent_rf_data() is None
|
|
|
|
def test_returns_none_when_all_too_old(self, handler):
|
|
handler.rf_data_timeout = 5
|
|
handler.recent_rf_data = [self._rf_entry(age=100)]
|
|
assert handler.find_recent_rf_data() is None
|
|
|
|
def test_returns_most_recent_fallback(self, handler):
|
|
handler.rf_data_timeout = 30
|
|
entry = self._rf_entry(age=1)
|
|
handler.recent_rf_data = [entry]
|
|
result = handler.find_recent_rf_data()
|
|
assert result is entry
|
|
|
|
def test_exact_packet_prefix_match(self, handler):
|
|
handler.rf_data_timeout = 30
|
|
target = self._rf_entry(age=1, packet_prefix="deadbeefdeadbeef1234567890abcdef")
|
|
other = self._rf_entry(age=2, packet_prefix="00000000000000000000000000000000")
|
|
handler.recent_rf_data = [target, other]
|
|
result = handler.find_recent_rf_data("deadbeefdeadbeef1234567890abcdef")
|
|
assert result is target
|
|
|
|
def test_exact_pubkey_prefix_match(self, handler):
|
|
handler.rf_data_timeout = 30
|
|
target = self._rf_entry(age=1, pubkey_prefix="abcd", packet_prefix="")
|
|
other = self._rf_entry(age=2, pubkey_prefix="1111", packet_prefix="")
|
|
handler.recent_rf_data = [target, other]
|
|
result = handler.find_recent_rf_data("abcd")
|
|
assert result is target
|
|
|
|
def test_partial_packet_prefix_match(self, handler):
|
|
handler.rf_data_timeout = 30
|
|
long_prefix = "aabbccddeeff0011aabbccddeeff0011"
|
|
partial_key = "aabbccddeeff0011" + "xxxxxxxxxxxxxxxx"
|
|
target = self._rf_entry(age=1, packet_prefix=long_prefix, pubkey_prefix="")
|
|
handler.recent_rf_data = [target]
|
|
result = handler.find_recent_rf_data(partial_key)
|
|
assert result is target
|
|
|
|
def test_no_key_returns_most_recent(self, handler):
|
|
handler.rf_data_timeout = 30
|
|
old = self._rf_entry(age=10)
|
|
new = self._rf_entry(age=1)
|
|
handler.recent_rf_data = [old, new]
|
|
result = handler.find_recent_rf_data()
|
|
assert result["timestamp"] == new["timestamp"]
|
|
|
|
def test_custom_max_age(self, handler):
|
|
handler.rf_data_timeout = 30
|
|
entry = self._rf_entry(age=20)
|
|
handler.recent_rf_data = [entry]
|
|
# With max_age=5, entry is too old
|
|
assert handler.find_recent_rf_data(max_age_seconds=5) is None
|
|
# With max_age=30, entry is visible
|
|
assert handler.find_recent_rf_data(max_age_seconds=30) is entry
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# handle_raw_data
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestHandleRawData:
|
|
"""Tests for MessageHandler.handle_raw_data()."""
|
|
|
|
def _make_event(self, payload):
|
|
event = Mock()
|
|
event.payload = payload
|
|
return event
|
|
|
|
async def test_no_payload_logs_warning(self, handler):
|
|
event = Mock(spec=[])
|
|
handler.logger = Mock()
|
|
await handler.handle_raw_data(event)
|
|
handler.logger.warning.assert_called()
|
|
|
|
async def test_payload_none_logs_warning(self, handler):
|
|
event = Mock()
|
|
event.payload = None
|
|
handler.logger = Mock()
|
|
await handler.handle_raw_data(event)
|
|
handler.logger.warning.assert_called()
|
|
|
|
async def test_payload_without_data_field_logs_warning(self, handler):
|
|
event = self._make_event({"other": "stuff"})
|
|
handler.logger = Mock()
|
|
with patch.object(handler, "decode_meshcore_packet", return_value=None):
|
|
await handler.handle_raw_data(event)
|
|
handler.logger.warning.assert_called()
|
|
|
|
async def test_payload_with_hex_data_calls_decode(self, handler):
|
|
event = self._make_event({"data": "aabbccdd"})
|
|
handler.logger = Mock()
|
|
with patch.object(handler, "decode_meshcore_packet", return_value=None) as mock_decode:
|
|
await handler.handle_raw_data(event)
|
|
mock_decode.assert_called_once_with("aabbccdd")
|
|
|
|
async def test_payload_strips_0x_prefix(self, handler):
|
|
event = self._make_event({"data": "0xaabbccdd"})
|
|
handler.logger = Mock()
|
|
with patch.object(handler, "decode_meshcore_packet", return_value=None) as mock_decode:
|
|
await handler.handle_raw_data(event)
|
|
mock_decode.assert_called_once_with("aabbccdd")
|
|
|
|
async def test_decoded_packet_calls_process_advertisement(self, handler):
|
|
event = self._make_event({"data": "aabbccdd"})
|
|
handler.logger = Mock()
|
|
packet_info = {"type": "adv", "node_id": "ab"}
|
|
with patch.object(handler, "decode_meshcore_packet", return_value=packet_info):
|
|
with patch.object(handler, "_process_advertisement_packet", new_callable=AsyncMock) as mock_adv:
|
|
await handler.handle_raw_data(event)
|
|
mock_adv.assert_called_once_with(packet_info, None)
|
|
|
|
async def test_non_string_data_logs_warning(self, handler):
|
|
event = self._make_event({"data": 12345})
|
|
handler.logger = Mock()
|
|
await handler.handle_raw_data(event)
|
|
handler.logger.warning.assert_called()
|
|
|
|
async def test_exception_does_not_raise(self, handler):
|
|
event = self._make_event({"data": "aabb"})
|
|
handler.logger = Mock()
|
|
with patch.object(handler, "decode_meshcore_packet", side_effect=RuntimeError("oops")):
|
|
# Should not raise
|
|
await handler.handle_raw_data(event)
|
|
handler.logger.error.assert_called()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# handle_contact_message
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestHandleContactMessage:
|
|
"""Tests for MessageHandler.handle_contact_message()."""
|
|
|
|
def _make_event(self, payload):
|
|
event = Mock()
|
|
event.payload = payload
|
|
event.metadata = {}
|
|
return event
|
|
|
|
def _setup_handler(self, handler):
|
|
handler.logger = Mock()
|
|
handler.bot.meshcore = Mock()
|
|
handler.bot.meshcore.contacts = {}
|
|
handler.bot.translator = None
|
|
|
|
async def test_no_payload_returns_early(self, handler):
|
|
self._setup_handler(handler)
|
|
event = Mock(spec=[])
|
|
await handler.handle_contact_message(event)
|
|
handler.logger.warning.assert_called()
|
|
|
|
async def test_payload_none_returns_early(self, handler):
|
|
self._setup_handler(handler)
|
|
event = Mock()
|
|
event.payload = None
|
|
await handler.handle_contact_message(event)
|
|
handler.logger.warning.assert_called()
|
|
|
|
async def test_old_cached_message_not_processed(self, handler):
|
|
self._setup_handler(handler)
|
|
# Set connection_time in the future relative to an old timestamp
|
|
handler.bot.connection_time = time.time()
|
|
old_ts = int(time.time()) - 3600 # 1 hour old
|
|
event = self._make_event({
|
|
"pubkey_prefix": "ab12",
|
|
"text": "hello",
|
|
"path_len": 255,
|
|
"sender_timestamp": old_ts,
|
|
})
|
|
with patch.object(handler, "process_message", new_callable=AsyncMock) as mock_pm:
|
|
with patch.object(handler, "_debug_decode_message_path", new_callable=AsyncMock):
|
|
with patch.object(handler, "_debug_decode_packet_for_message", new_callable=AsyncMock):
|
|
await handler.handle_contact_message(event)
|
|
mock_pm.assert_not_called()
|
|
|
|
async def test_new_message_calls_process_message(self, handler):
|
|
self._setup_handler(handler)
|
|
handler.bot.connection_time = None # No connection time = don't filter
|
|
event = self._make_event({
|
|
"pubkey_prefix": "ab12",
|
|
"text": "hello",
|
|
"path_len": 255,
|
|
"sender_timestamp": int(time.time()),
|
|
})
|
|
with patch.object(handler, "process_message", new_callable=AsyncMock) as mock_pm:
|
|
with patch.object(handler, "_debug_decode_message_path", new_callable=AsyncMock):
|
|
with patch.object(handler, "_debug_decode_packet_for_message", new_callable=AsyncMock):
|
|
await handler.handle_contact_message(event)
|
|
mock_pm.assert_called_once()
|
|
|
|
async def test_snr_from_payload(self, handler):
|
|
self._setup_handler(handler)
|
|
handler.bot.connection_time = None
|
|
captured = {}
|
|
|
|
async def capture_message(msg):
|
|
captured["msg"] = msg
|
|
|
|
event = self._make_event({
|
|
"pubkey_prefix": "ab12",
|
|
"text": "hello",
|
|
"path_len": 255,
|
|
"sender_timestamp": int(time.time()),
|
|
"SNR": 7,
|
|
"RSSI": -70,
|
|
})
|
|
with patch.object(handler, "process_message", side_effect=capture_message):
|
|
with patch.object(handler, "_debug_decode_message_path", new_callable=AsyncMock):
|
|
with patch.object(handler, "_debug_decode_packet_for_message", new_callable=AsyncMock):
|
|
await handler.handle_contact_message(event)
|
|
assert captured["msg"].snr == 7
|
|
assert captured["msg"].rssi == -70
|
|
|
|
async def test_direct_path_len_255(self, handler):
|
|
self._setup_handler(handler)
|
|
handler.bot.connection_time = None
|
|
captured = {}
|
|
|
|
async def capture_message(msg):
|
|
captured["msg"] = msg
|
|
|
|
event = self._make_event({
|
|
"pubkey_prefix": "ab12",
|
|
"text": "hi",
|
|
"path_len": 255,
|
|
"sender_timestamp": int(time.time()),
|
|
})
|
|
with patch.object(handler, "process_message", side_effect=capture_message):
|
|
with patch.object(handler, "_debug_decode_message_path", new_callable=AsyncMock):
|
|
with patch.object(handler, "_debug_decode_packet_for_message", new_callable=AsyncMock):
|
|
await handler.handle_contact_message(event)
|
|
assert captured["msg"].is_dm is True
|
|
|
|
async def test_message_is_dm(self, handler):
|
|
self._setup_handler(handler)
|
|
handler.bot.connection_time = None
|
|
captured = {}
|
|
|
|
async def capture_message(msg):
|
|
captured["msg"] = msg
|
|
|
|
event = self._make_event({
|
|
"pubkey_prefix": "ab12",
|
|
"text": "dm text",
|
|
"path_len": 0,
|
|
"sender_timestamp": int(time.time()),
|
|
})
|
|
with patch.object(handler, "process_message", side_effect=capture_message):
|
|
with patch.object(handler, "_debug_decode_message_path", new_callable=AsyncMock):
|
|
with patch.object(handler, "_debug_decode_packet_for_message", new_callable=AsyncMock):
|
|
await handler.handle_contact_message(event)
|
|
assert captured["msg"].is_dm is True
|
|
assert captured["msg"].content == "dm text"
|
|
|
|
async def test_contact_name_lookup(self, handler):
|
|
self._setup_handler(handler)
|
|
handler.bot.connection_time = None
|
|
handler.bot.meshcore.contacts = {
|
|
"key1": {
|
|
"public_key": "ab12deadbeef",
|
|
"name": "Alice",
|
|
"out_path": "",
|
|
"out_path_len": 0,
|
|
}
|
|
}
|
|
captured = {}
|
|
|
|
async def capture_message(msg):
|
|
captured["msg"] = msg
|
|
|
|
event = self._make_event({
|
|
"pubkey_prefix": "ab12",
|
|
"text": "hi",
|
|
"path_len": 255,
|
|
"sender_timestamp": int(time.time()),
|
|
})
|
|
with patch.object(handler, "process_message", side_effect=capture_message):
|
|
with patch.object(handler, "_debug_decode_message_path", new_callable=AsyncMock):
|
|
with patch.object(handler, "_debug_decode_packet_for_message", new_callable=AsyncMock):
|
|
await handler.handle_contact_message(event)
|
|
assert captured["msg"].sender_id == "Alice"
|
|
|
|
async def test_exception_does_not_propagate(self, handler):
|
|
self._setup_handler(handler)
|
|
event = self._make_event({
|
|
"pubkey_prefix": "ab12",
|
|
"text": "hello",
|
|
"path_len": 255,
|
|
"sender_timestamp": int(time.time()),
|
|
})
|
|
with patch.object(handler, "_debug_decode_message_path", side_effect=RuntimeError("boom")):
|
|
# Should not raise
|
|
await handler.handle_contact_message(event)
|
|
handler.logger.error.assert_called()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# handle_channel_message
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestHandleChannelMessage:
|
|
"""Tests for MessageHandler.handle_channel_message()."""
|
|
|
|
def _setup_handler(self, handler):
|
|
handler.logger = Mock()
|
|
handler.bot.meshcore = Mock()
|
|
handler.bot.meshcore.contacts = {}
|
|
handler.bot.channel_manager = Mock()
|
|
handler.bot.channel_manager.get_channel_name = Mock(return_value="general")
|
|
handler.bot.translator = None
|
|
handler.bot.mesh_graph = None
|
|
handler.recent_rf_data = []
|
|
handler.enhanced_correlation = False
|
|
|
|
def _make_event(self, payload):
|
|
event = Mock()
|
|
event.payload = payload
|
|
return event
|
|
|
|
async def test_no_payload_returns_early(self, handler):
|
|
self._setup_handler(handler)
|
|
event = Mock(spec=[])
|
|
await handler.handle_channel_message(event)
|
|
handler.logger.warning.assert_called()
|
|
|
|
async def test_payload_none_returns_early(self, handler):
|
|
self._setup_handler(handler)
|
|
event = Mock()
|
|
event.payload = None
|
|
await handler.handle_channel_message(event)
|
|
handler.logger.warning.assert_called()
|
|
|
|
async def test_basic_channel_message_calls_process_message(self, handler):
|
|
self._setup_handler(handler)
|
|
handler.bot.connection_time = None
|
|
event = self._make_event({
|
|
"channel_idx": 0,
|
|
"text": "ALICE: hello world",
|
|
"path_len": 255,
|
|
"sender_timestamp": int(time.time()),
|
|
})
|
|
with patch.object(handler, "process_message", new_callable=AsyncMock) as mock_pm:
|
|
with patch.object(handler, "_debug_decode_message_path", new_callable=AsyncMock):
|
|
with patch.object(handler, "_debug_decode_packet_for_message", new_callable=AsyncMock):
|
|
await handler.handle_channel_message(event)
|
|
mock_pm.assert_called_once()
|
|
|
|
async def test_sender_extracted_from_text(self, handler):
|
|
self._setup_handler(handler)
|
|
handler.bot.connection_time = None
|
|
captured = {}
|
|
|
|
async def capture(msg):
|
|
captured["msg"] = msg
|
|
|
|
event = self._make_event({
|
|
"channel_idx": 0,
|
|
"text": "BOB: hi there",
|
|
"path_len": 0,
|
|
"sender_timestamp": int(time.time()),
|
|
})
|
|
with patch.object(handler, "process_message", side_effect=capture):
|
|
with patch.object(handler, "_debug_decode_message_path", new_callable=AsyncMock):
|
|
with patch.object(handler, "_debug_decode_packet_for_message", new_callable=AsyncMock):
|
|
await handler.handle_channel_message(event)
|
|
assert captured["msg"].sender_id == "BOB"
|
|
assert captured["msg"].content == "hi there"
|
|
|
|
async def test_text_without_colon_uses_full_text(self, handler):
|
|
self._setup_handler(handler)
|
|
handler.bot.connection_time = None
|
|
captured = {}
|
|
|
|
async def capture(msg):
|
|
captured["msg"] = msg
|
|
|
|
event = self._make_event({
|
|
"channel_idx": 0,
|
|
"text": "no colon here",
|
|
"path_len": 0,
|
|
"sender_timestamp": int(time.time()),
|
|
})
|
|
with patch.object(handler, "process_message", side_effect=capture):
|
|
with patch.object(handler, "_debug_decode_message_path", new_callable=AsyncMock):
|
|
with patch.object(handler, "_debug_decode_packet_for_message", new_callable=AsyncMock):
|
|
await handler.handle_channel_message(event)
|
|
assert captured["msg"].content == "no colon here"
|
|
|
|
async def test_old_cached_message_not_processed(self, handler):
|
|
self._setup_handler(handler)
|
|
handler.bot.connection_time = time.time()
|
|
old_ts = int(time.time()) - 3600
|
|
event = self._make_event({
|
|
"channel_idx": 0,
|
|
"text": "CAROL: old msg",
|
|
"path_len": 0,
|
|
"sender_timestamp": old_ts,
|
|
})
|
|
with patch.object(handler, "process_message", new_callable=AsyncMock) as mock_pm:
|
|
with patch.object(handler, "_debug_decode_message_path", new_callable=AsyncMock):
|
|
with patch.object(handler, "_debug_decode_packet_for_message", new_callable=AsyncMock):
|
|
await handler.handle_channel_message(event)
|
|
mock_pm.assert_not_called()
|
|
|
|
async def test_snr_from_payload(self, handler):
|
|
self._setup_handler(handler)
|
|
handler.bot.connection_time = None
|
|
captured = {}
|
|
|
|
async def capture(msg):
|
|
captured["msg"] = msg
|
|
|
|
event = self._make_event({
|
|
"channel_idx": 0,
|
|
"text": "DAN: test",
|
|
"path_len": 0,
|
|
"sender_timestamp": int(time.time()),
|
|
"SNR": 9,
|
|
"RSSI": -85,
|
|
})
|
|
with patch.object(handler, "process_message", side_effect=capture):
|
|
with patch.object(handler, "_debug_decode_message_path", new_callable=AsyncMock):
|
|
with patch.object(handler, "_debug_decode_packet_for_message", new_callable=AsyncMock):
|
|
await handler.handle_channel_message(event)
|
|
assert captured["msg"].snr == 9
|
|
assert captured["msg"].rssi == -85
|
|
|
|
async def test_channel_name_set_on_message(self, handler):
|
|
self._setup_handler(handler)
|
|
handler.bot.connection_time = None
|
|
handler.bot.channel_manager.get_channel_name = Mock(return_value="emergency")
|
|
captured = {}
|
|
|
|
async def capture(msg):
|
|
captured["msg"] = msg
|
|
|
|
event = self._make_event({
|
|
"channel_idx": 2,
|
|
"text": "EVE: help",
|
|
"path_len": 0,
|
|
"sender_timestamp": int(time.time()),
|
|
})
|
|
with patch.object(handler, "process_message", side_effect=capture):
|
|
with patch.object(handler, "_debug_decode_message_path", new_callable=AsyncMock):
|
|
with patch.object(handler, "_debug_decode_packet_for_message", new_callable=AsyncMock):
|
|
await handler.handle_channel_message(event)
|
|
assert captured["msg"].channel == "emergency"
|
|
assert captured["msg"].is_dm is False
|
|
|
|
async def test_exception_does_not_propagate(self, handler):
|
|
self._setup_handler(handler)
|
|
event = self._make_event({
|
|
"channel_idx": 0,
|
|
"text": "FRANK: crash",
|
|
"path_len": 0,
|
|
"sender_timestamp": int(time.time()),
|
|
})
|
|
with patch.object(handler, "_debug_decode_message_path", side_effect=RuntimeError("boom")):
|
|
await handler.handle_channel_message(event)
|
|
handler.logger.error.assert_called()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Packet construction helpers (used across multiple test classes below)
|
|
# ---------------------------------------------------------------------------
|
|
#
|
|
# MeshCore packet binary layout (from decode_meshcore_packet / Packet.cpp):
|
|
#
|
|
# header (1 byte):
|
|
# bits 7-6 = payload_version (0 = VER_1, must be 0)
|
|
# bits 5-2 = payload_type (ADVERT=4, TXT_MSG=2, GRP_TXT=5, TRACE=9, …)
|
|
# bits 1-0 = route_type (TRANSPORT_FLOOD=0, FLOOD=1, DIRECT=2, TRANSPORT_DIRECT=3)
|
|
# [transport bytes: 4 bytes if route_type is TRANSPORT_FLOOD or TRANSPORT_DIRECT]
|
|
# path_len_byte (1 byte):
|
|
# low 6 bits = hop_count
|
|
# high 2 bits = size_code (bytes_per_hop = size_code + 1)
|
|
# => path_byte_length = hop_count * bytes_per_hop
|
|
# path bytes (path_byte_length bytes)
|
|
# payload bytes (remainder)
|
|
#
|
|
# header = (0 << 6) | (payload_type << 2) | route_type
|
|
#
|
|
# Pre-computed examples used in tests:
|
|
# FLOOD(1)+ADVERT(4), 0 hops: header=0x11, path_len=0x00 → "1100"
|
|
# FLOOD(1)+TXT_MSG(2), 0 hops: header=0x09, path_len=0x00 → "0900"
|
|
# FLOOD(1)+ADVERT(4), 2 hops 1-byte (AB,CD): header=0x11, path_len=0x02 → "110202abcdfeed"
|
|
# DIRECT(2)+GRP_TXT(5), 0 hops: header=0x16, path_len=0x00 → "1600"
|
|
# TRANSPORT_FLOOD(0)+TXT_MSG(2), 4-byte transport, 0 hops: header=0x08 → "0801020304 00 ff"
|
|
# FLOOD(1)+TRACE(9), 2 hops, payload: header=0x25
|
|
#
|
|
# Advert payload format (for parse_advert, from AdvertDataHelpers.h):
|
|
# bytes 0-31: public_key (32 bytes)
|
|
# bytes 32-35: timestamp (uint32 little-endian)
|
|
# bytes 36-99: signature (64 bytes)
|
|
# byte 100: flags_byte (app_data[0])
|
|
# bits 3-0 = adv_type (CHAT=1, REPEATER=2, ROOM=3, SENSOR=4)
|
|
# bit 4 = ADV_LATLON_MASK (has location: 8 bytes lat+lon)
|
|
# bit 5 = ADV_FEAT1_MASK (has feat1: 2 bytes)
|
|
# bit 6 = ADV_FEAT2_MASK (has feat2: 2 bytes)
|
|
# bit 7 = ADV_NAME_MASK (has name: remaining bytes as UTF-8)
|
|
# bytes 101+: optional location / feat1 / feat2 / name
|
|
|
|
def _make_advert_payload(
|
|
flags_byte: int,
|
|
*,
|
|
pub_key: bytes = b"\xaa" * 32,
|
|
timestamp: int = 1700000000,
|
|
signature: bytes = b"\xbb" * 64,
|
|
location_lat_raw: int = 0,
|
|
location_lon_raw: int = 0,
|
|
feat1: int = 0,
|
|
feat2: int = 0,
|
|
name: str = "",
|
|
) -> bytes:
|
|
"""Build a minimal valid advert payload byte string for parse_advert()."""
|
|
# Header: pub_key (32) + timestamp (4 little-endian) + signature (64) = 100 bytes
|
|
ts_bytes = timestamp.to_bytes(4, "little")
|
|
header = pub_key[:32] + ts_bytes + signature[:64]
|
|
assert len(header) == 100
|
|
|
|
app_data = bytes([flags_byte])
|
|
|
|
# Optional location (8 bytes): lat (int32 LE) + lon (int32 LE)
|
|
if flags_byte & 0x10:
|
|
app_data += location_lat_raw.to_bytes(4, "little", signed=True)
|
|
app_data += location_lon_raw.to_bytes(4, "little", signed=True)
|
|
|
|
# Optional feat1 (2 bytes)
|
|
if flags_byte & 0x20:
|
|
app_data += feat1.to_bytes(2, "little")
|
|
|
|
# Optional feat2 (2 bytes)
|
|
if flags_byte & 0x40:
|
|
app_data += feat2.to_bytes(2, "little")
|
|
|
|
# Optional name (variable length UTF-8)
|
|
if flags_byte & 0x80:
|
|
app_data += name.encode("utf-8")
|
|
|
|
return header + app_data
|
|
|
|
|
|
def _make_packet_hex(
|
|
payload_type: int,
|
|
route_type: int,
|
|
path_bytes: bytes = b"",
|
|
payload_bytes: bytes = b"\xfe",
|
|
*,
|
|
hop_count: int = 0,
|
|
bytes_per_hop: int = 1,
|
|
transport: bytes = b"",
|
|
) -> str:
|
|
"""Build a valid MeshCore packet hex string for decode_meshcore_packet()."""
|
|
header = (0 << 6) | (payload_type << 2) | route_type
|
|
# path_len_byte: high 2 bits = size_code (bytes_per_hop - 1), low 6 bits = hop_count
|
|
size_code = bytes_per_hop - 1
|
|
path_len_byte = (size_code << 6) | (hop_count & 0x3F)
|
|
pkt = bytes([header]) + transport + bytes([path_len_byte]) + path_bytes + payload_bytes
|
|
return pkt.hex()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# decode_meshcore_packet
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestDecodeMeshcorePacket:
|
|
"""Tests for MessageHandler.decode_meshcore_packet() — pure hex/binary parsing."""
|
|
|
|
# --- invalid / edge-case inputs ---
|
|
|
|
def test_none_raw_hex_returns_none(self, handler):
|
|
result = handler.decode_meshcore_packet(None)
|
|
assert result is None
|
|
|
|
def test_empty_raw_hex_returns_none(self, handler):
|
|
result = handler.decode_meshcore_packet("")
|
|
assert result is None
|
|
|
|
def test_single_byte_too_short_returns_none(self, handler):
|
|
# 1 byte only → fails minimum size check (< 2)
|
|
result = handler.decode_meshcore_packet("11")
|
|
assert result is None
|
|
|
|
def test_invalid_hex_string_raises_or_returns_none(self, handler):
|
|
# "ZZZZZZ" is not valid hex. bytes.fromhex() raises ValueError inside the try-block;
|
|
# the except handler then tries to reference the unbound local `byte_data` in its log
|
|
# message, which produces an UnboundLocalError that propagates out.
|
|
# Either behaviour (None return or propagated exception) is acceptable here; the
|
|
# important thing is that the method does NOT silently succeed.
|
|
try:
|
|
result = handler.decode_meshcore_packet("ZZZZZZ")
|
|
assert result is None
|
|
except (ValueError, UnboundLocalError):
|
|
pass # expected — source-level bug causes exception to propagate
|
|
|
|
def test_0x_prefix_stripped(self, handler):
|
|
# Prepend '0x' — should be stripped transparently
|
|
hex_no_prefix = _make_packet_hex(4, 1, payload_bytes=b"\xde")
|
|
result = handler.decode_meshcore_packet("0x" + hex_no_prefix)
|
|
assert result is not None
|
|
assert result["route_type_name"] == "FLOOD"
|
|
assert result["payload_type_name"] == "ADVERT"
|
|
|
|
def test_payload_hex_preferred_over_raw_hex(self, handler):
|
|
# raw_hex encodes a TXT_MSG, payload_hex encodes an ADVERT
|
|
raw_txt = _make_packet_hex(2, 1, payload_bytes=b"\x01")
|
|
raw_adv = _make_packet_hex(4, 1, payload_bytes=b"\x02")
|
|
result = handler.decode_meshcore_packet(raw_txt, payload_hex=raw_adv)
|
|
# Should decode from payload_hex (ADVERT), not raw_hex (TXT_MSG)
|
|
assert result["payload_type_name"] == "ADVERT"
|
|
|
|
def test_unknown_payload_version_returns_none(self, handler):
|
|
# Build a packet with payload_version = 1 (VER_2) in bits 7-6
|
|
payload_type = 2 # TXT_MSG
|
|
route_type = 1 # FLOOD
|
|
header = (1 << 6) | (payload_type << 2) | route_type # version bits = 01
|
|
path_len_byte = 0x00
|
|
pkt = bytes([header, path_len_byte, 0xAA])
|
|
result = handler.decode_meshcore_packet(pkt.hex())
|
|
assert result is None
|
|
|
|
# --- FLOOD route ---
|
|
|
|
def test_flood_advert_no_path(self, handler):
|
|
hex_str = _make_packet_hex(4, 1, payload_bytes=b"\xde\xad")
|
|
result = handler.decode_meshcore_packet(hex_str)
|
|
assert result is not None
|
|
assert result["route_type_name"] == "FLOOD"
|
|
assert result["payload_type_name"] == "ADVERT"
|
|
assert result["route_type"] == 1
|
|
assert result["payload_type"] == 4
|
|
assert result["payload_version"] == 0
|
|
assert result["has_transport_codes"] is False
|
|
assert result["transport_codes"] is None
|
|
assert result["path_len"] == 0
|
|
assert result["path"] == []
|
|
assert result["path_hex"] == ""
|
|
assert result["payload_hex"] == "dead"
|
|
|
|
def test_flood_txt_msg_no_path(self, handler):
|
|
hex_str = _make_packet_hex(2, 1, payload_bytes=b"\x48\x69")
|
|
result = handler.decode_meshcore_packet(hex_str)
|
|
assert result is not None
|
|
assert result["payload_type_name"] == "TXT_MSG"
|
|
assert result["route_type_name"] == "FLOOD"
|
|
|
|
def test_flood_advert_two_hops_one_byte(self, handler):
|
|
path = bytes([0xAB, 0xCD])
|
|
hex_str = _make_packet_hex(
|
|
4, 1,
|
|
path_bytes=path,
|
|
payload_bytes=b"\xEE",
|
|
hop_count=2,
|
|
bytes_per_hop=1,
|
|
)
|
|
result = handler.decode_meshcore_packet(hex_str)
|
|
assert result is not None
|
|
assert result["path_len"] == 2
|
|
assert result["path_byte_length"] == 2
|
|
assert result["bytes_per_hop"] == 1
|
|
assert result["path_hex"] == "abcd"
|
|
assert result["path"] == ["AB", "CD"]
|
|
|
|
def test_flood_advert_two_hops_two_bytes(self, handler):
|
|
# 2 hops, 2 bytes each → 4 path bytes
|
|
path = bytes([0x01, 0x02, 0xAB, 0xCD])
|
|
hex_str = _make_packet_hex(
|
|
4, 1,
|
|
path_bytes=path,
|
|
payload_bytes=b"\xEE",
|
|
hop_count=2,
|
|
bytes_per_hop=2,
|
|
)
|
|
result = handler.decode_meshcore_packet(hex_str)
|
|
assert result is not None
|
|
assert result["bytes_per_hop"] == 2
|
|
assert result["path_byte_length"] == 4
|
|
assert result["path_len"] == 2
|
|
assert result["path"] == ["0102", "ABCD"]
|
|
|
|
# --- DIRECT route ---
|
|
|
|
def test_direct_grp_txt_no_path(self, handler):
|
|
hex_str = _make_packet_hex(5, 2, payload_bytes=b"\x01\x02\x03")
|
|
result = handler.decode_meshcore_packet(hex_str)
|
|
assert result is not None
|
|
assert result["route_type_name"] == "DIRECT"
|
|
assert result["payload_type_name"] == "GRP_TXT"
|
|
assert result["has_transport_codes"] is False
|
|
|
|
# --- TRANSPORT_FLOOD route (has 4 transport bytes) ---
|
|
|
|
def test_transport_flood_has_transport_codes(self, handler):
|
|
transport = bytes([0x01, 0x02, 0x03, 0x04])
|
|
hex_str = _make_packet_hex(
|
|
2, 0,
|
|
payload_bytes=b"\xFF",
|
|
transport=transport,
|
|
)
|
|
result = handler.decode_meshcore_packet(hex_str)
|
|
assert result is not None
|
|
assert result["route_type_name"] == "TRANSPORT_FLOOD"
|
|
assert result["has_transport_codes"] is True
|
|
assert result["transport_codes"] is not None
|
|
assert result["transport_codes"]["code1"] == 0x0201
|
|
assert result["transport_codes"]["code2"] == 0x0403
|
|
|
|
def test_transport_direct_has_transport_codes(self, handler):
|
|
transport = bytes([0x0A, 0x0B, 0x0C, 0x0D])
|
|
hex_str = _make_packet_hex(
|
|
4, 3,
|
|
payload_bytes=b"\xAA",
|
|
transport=transport,
|
|
)
|
|
result = handler.decode_meshcore_packet(hex_str)
|
|
assert result is not None
|
|
assert result["route_type_name"] == "TRANSPORT_DIRECT"
|
|
assert result["has_transport_codes"] is True
|
|
|
|
# --- Too-short packet after stripping transport ---
|
|
|
|
def test_too_short_for_path_len_returns_none(self, handler):
|
|
# TRANSPORT_FLOOD needs 1 (header) + 4 (transport) + 1 (path_len) = 6 bytes minimum
|
|
# Provide only header + 4 transport bytes (no path_len byte)
|
|
header = (0 << 6) | (2 << 2) | 0 # TRANSPORT_FLOOD + TXT_MSG
|
|
pkt = bytes([header, 0x01, 0x02, 0x03, 0x04]) # 5 bytes, missing path_len
|
|
result = handler.decode_meshcore_packet(pkt.hex())
|
|
assert result is None
|
|
|
|
def test_path_bytes_exceed_available_data_returns_none(self, handler):
|
|
# Claim 3 hops (3 path bytes) but only provide 2 in the packet
|
|
header = (0 << 6) | (4 << 2) | 1 # FLOOD + ADVERT
|
|
path_len_byte = 0x03 # 3 hops, 1 byte each
|
|
pkt = bytes([header, path_len_byte, 0xAA, 0xBB]) # only 2 path bytes
|
|
result = handler.decode_meshcore_packet(pkt.hex())
|
|
assert result is None
|
|
|
|
# --- All standard payload types decode without crashing ---
|
|
|
|
def test_all_payload_types_decode(self, handler):
|
|
known_types = {
|
|
0x00: "REQ",
|
|
0x01: "RESPONSE",
|
|
0x02: "TXT_MSG",
|
|
0x03: "ACK",
|
|
0x04: "ADVERT",
|
|
0x05: "GRP_TXT",
|
|
0x06: "GRP_DATA",
|
|
0x07: "ANON_REQ",
|
|
0x08: "PATH",
|
|
0x09: "TRACE",
|
|
0x0A: "MULTIPART",
|
|
0x0F: "RAW_CUSTOM",
|
|
}
|
|
for pt_val, expected_name in known_types.items():
|
|
hex_str = _make_packet_hex(pt_val, 1, payload_bytes=b"\x01\x02")
|
|
result = handler.decode_meshcore_packet(hex_str)
|
|
assert result is not None, f"Expected non-None for payload_type 0x{pt_val:02x}"
|
|
assert result["payload_type_name"] == expected_name
|
|
|
|
# --- Return dict structure completeness ---
|
|
|
|
def test_return_dict_has_expected_keys(self, handler):
|
|
hex_str = _make_packet_hex(4, 1, payload_bytes=b"\xAA")
|
|
result = handler.decode_meshcore_packet(hex_str)
|
|
required_keys = {
|
|
"header", "route_type", "route_type_name", "payload_type", "payload_type_name",
|
|
"payload_version", "route_type_enum", "payload_type_enum", "payload_version_enum",
|
|
"has_transport_codes", "transport_codes", "transport_size",
|
|
"path_len", "path_byte_length", "bytes_per_hop",
|
|
"path_info", "path", "path_hex", "payload_hex", "payload_bytes",
|
|
}
|
|
for key in required_keys:
|
|
assert key in result, f"Missing key: {key}"
|
|
|
|
def test_trace_packet_path_info_type(self, handler):
|
|
# TRACE(9) + FLOOD(1): path_info should have type='trace'
|
|
# Provide minimal trace payload: tag(4) + auth(4) + flags(1) = 9 bytes
|
|
trace_payload = b"\x00" * 9
|
|
hex_str = _make_packet_hex(9, 1, payload_bytes=trace_payload)
|
|
result = handler.decode_meshcore_packet(hex_str)
|
|
assert result is not None
|
|
assert result["path_info"]["type"] == "trace"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# parse_advert
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestParseAdvert:
|
|
"""Tests for MessageHandler.parse_advert() — pure binary advert parsing."""
|
|
|
|
def test_too_short_payload_returns_empty_dict(self, handler):
|
|
# Must be >= 101 bytes
|
|
result = handler.parse_advert(b"\x00" * 100)
|
|
assert result == {}
|
|
|
|
def test_none_length_payload_short(self, handler):
|
|
result = handler.parse_advert(b"")
|
|
assert result == {}
|
|
|
|
def test_no_app_data_after_100_bytes_returns_empty(self, handler):
|
|
# Exactly 100 bytes means app_data is empty → returns {}
|
|
result = handler.parse_advert(b"\x00" * 100)
|
|
assert result == {}
|
|
|
|
def test_companion_advert_basic(self, handler):
|
|
# ADV_TYPE_CHAT=0x01, no optional fields
|
|
payload = _make_advert_payload(0x01)
|
|
result = handler.parse_advert(payload)
|
|
assert result is not None
|
|
assert result != {}
|
|
assert result["mode"] == "Companion"
|
|
assert "public_key" in result
|
|
assert len(result["public_key"]) == 64 # 32 bytes → 64 hex chars
|
|
assert "advert_time" in result
|
|
assert result["advert_time"] == 1700000000
|
|
|
|
def test_repeater_advert(self, handler):
|
|
payload = _make_advert_payload(0x02) # ADV_TYPE_REPEATER
|
|
result = handler.parse_advert(payload)
|
|
assert result["mode"] == "Repeater"
|
|
|
|
def test_room_server_advert(self, handler):
|
|
payload = _make_advert_payload(0x03) # ADV_TYPE_ROOM
|
|
result = handler.parse_advert(payload)
|
|
assert result["mode"] == "RoomServer"
|
|
|
|
def test_sensor_advert(self, handler):
|
|
payload = _make_advert_payload(0x04) # ADV_TYPE_SENSOR
|
|
result = handler.parse_advert(payload)
|
|
assert result["mode"] == "Sensor"
|
|
|
|
def test_unknown_type_advert(self, handler):
|
|
payload = _make_advert_payload(0x05) # No matching type
|
|
result = handler.parse_advert(payload)
|
|
assert result["mode"] == "Type5"
|
|
|
|
def test_companion_with_name(self, handler):
|
|
# ADV_NAME_MASK=0x80 | ADV_TYPE_CHAT=0x01
|
|
payload = _make_advert_payload(0x81, name="TestNode")
|
|
result = handler.parse_advert(payload)
|
|
assert result["mode"] == "Companion"
|
|
assert result["name"] == "TestNode"
|
|
|
|
def test_companion_with_location(self, handler):
|
|
# ADV_LATLON_MASK=0x10 | ADV_TYPE_CHAT=0x01
|
|
# lat=47.606209 → raw=47606209, lon=-122.332069 → raw=-122332069
|
|
lat_raw = 47606209
|
|
lon_raw = -122332069
|
|
payload = _make_advert_payload(
|
|
0x11,
|
|
location_lat_raw=lat_raw,
|
|
location_lon_raw=lon_raw,
|
|
)
|
|
result = handler.parse_advert(payload)
|
|
assert result["mode"] == "Companion"
|
|
assert "lat" in result
|
|
assert "lon" in result
|
|
assert abs(result["lat"] - round(lat_raw / 1_000_000, 6)) < 1e-5
|
|
assert abs(result["lon"] - round(lon_raw / 1_000_000, 6)) < 1e-5
|
|
|
|
def test_advert_with_name_and_location(self, handler):
|
|
# ADV_LATLON_MASK=0x10 | ADV_NAME_MASK=0x80 | ADV_TYPE_CHAT=0x01 = 0x91
|
|
payload = _make_advert_payload(
|
|
0x91,
|
|
location_lat_raw=10000000,
|
|
location_lon_raw=-20000000,
|
|
name="Rooftop",
|
|
)
|
|
result = handler.parse_advert(payload)
|
|
assert result["mode"] == "Companion"
|
|
assert "lat" in result and "lon" in result
|
|
assert result["name"] == "Rooftop"
|
|
|
|
def test_advert_with_feat1(self, handler):
|
|
# ADV_FEAT1_MASK=0x20 | ADV_TYPE_CHAT=0x01 = 0x21
|
|
payload = _make_advert_payload(0x21, feat1=0x1234)
|
|
result = handler.parse_advert(payload)
|
|
assert result["feat1"] == 0x1234
|
|
|
|
def test_advert_with_feat2(self, handler):
|
|
# ADV_FEAT2_MASK=0x40 | ADV_TYPE_CHAT=0x01 = 0x41
|
|
payload = _make_advert_payload(0x41, feat2=0xABCD)
|
|
result = handler.parse_advert(payload)
|
|
assert result["feat2"] == 0xABCD
|
|
|
|
def test_advert_with_all_optional_fields(self, handler):
|
|
# 0x10 | 0x20 | 0x40 | 0x80 | 0x02 = 0xF2 (REPEATER with all flags)
|
|
payload = _make_advert_payload(
|
|
0xF2,
|
|
location_lat_raw=1000000,
|
|
location_lon_raw=-2000000,
|
|
feat1=0x0001,
|
|
feat2=0x0002,
|
|
name="AllFlagsRepeater",
|
|
)
|
|
result = handler.parse_advert(payload)
|
|
assert result["mode"] == "Repeater"
|
|
assert "lat" in result
|
|
assert "lon" in result
|
|
assert result["feat1"] == 0x0001
|
|
assert result["feat2"] == 0x0002
|
|
assert result["name"] == "AllFlagsRepeater"
|
|
|
|
def test_location_flag_but_too_short_returns_partial(self, handler):
|
|
# ADV_LATLON_MASK set but only 1 app_data byte (the flags) → too short for lat+lon
|
|
pub_key = b"\xaa" * 32
|
|
ts = (1700000000).to_bytes(4, "little")
|
|
sig = b"\xbb" * 64
|
|
flags = bytes([0x11]) # ADV_LATLON_MASK | ADV_TYPE_CHAT — no lat/lon data after
|
|
payload = pub_key + ts + sig + flags # 101 bytes, but no location data
|
|
result = handler.parse_advert(payload)
|
|
# Method returns advert dict without location (early return on short data)
|
|
assert "mode" in result
|
|
assert "lat" not in result
|
|
|
|
def test_public_key_in_result(self, handler):
|
|
pub_key = bytes(range(32)) # 0x00..0x1f
|
|
payload = _make_advert_payload(0x01, pub_key=pub_key)
|
|
result = handler.parse_advert(payload)
|
|
assert result["public_key"] == bytes(range(32)).hex()
|
|
|
|
def test_signature_in_result(self, handler):
|
|
sig = bytes([0xFF] * 64)
|
|
payload = _make_advert_payload(0x01, signature=sig)
|
|
result = handler.parse_advert(payload)
|
|
assert result["signature"] == "ff" * 64
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# store_message_for_correlation and cleanup_old_messages
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestMessageCorrelation:
|
|
"""Tests for store_message_for_correlation(), cleanup_old_messages(), and
|
|
correlate_message_with_rf_data()."""
|
|
|
|
def test_store_message_adds_to_pending(self, handler):
|
|
handler.store_message_for_correlation("msg-001", {"pubkey_prefix": "aa"})
|
|
assert "msg-001" in handler.pending_messages
|
|
entry = handler.pending_messages["msg-001"]
|
|
assert entry["data"] == {"pubkey_prefix": "aa"}
|
|
assert entry["processed"] is False
|
|
assert isinstance(entry["timestamp"], float)
|
|
|
|
def test_store_message_overwrites_existing(self, handler):
|
|
handler.store_message_for_correlation("dup", {"v": 1})
|
|
handler.store_message_for_correlation("dup", {"v": 2})
|
|
assert handler.pending_messages["dup"]["data"]["v"] == 2
|
|
|
|
def test_cleanup_removes_expired_entries(self, handler):
|
|
handler.message_timeout = 5.0
|
|
# Store a message then backdate its timestamp well past the timeout
|
|
handler.store_message_for_correlation("old-msg", {"x": 1})
|
|
handler.pending_messages["old-msg"]["timestamp"] = time.time() - 100
|
|
handler.cleanup_old_messages()
|
|
assert "old-msg" not in handler.pending_messages
|
|
|
|
def test_cleanup_keeps_fresh_entries(self, handler):
|
|
handler.message_timeout = 60.0
|
|
handler.store_message_for_correlation("fresh", {"x": 2})
|
|
handler.cleanup_old_messages()
|
|
assert "fresh" in handler.pending_messages
|
|
|
|
def test_cleanup_empty_pending_is_safe(self, handler):
|
|
handler.pending_messages = {}
|
|
handler.cleanup_old_messages() # Should not raise
|
|
|
|
def test_correlate_unknown_message_id_returns_none(self, handler):
|
|
result = handler.correlate_message_with_rf_data("nonexistent-id")
|
|
assert result is None
|
|
|
|
def test_correlate_message_with_matching_rf_data(self, handler):
|
|
handler.store_message_for_correlation("m1", {"pubkey_prefix": "aabb"})
|
|
rf = {
|
|
"timestamp": time.time(),
|
|
"snr": 5,
|
|
"rssi": -80,
|
|
"packet_prefix": "aabb",
|
|
"pubkey_prefix": "aabb",
|
|
}
|
|
handler.recent_rf_data = [rf]
|
|
handler.rf_data_timeout = 60
|
|
result = handler.correlate_message_with_rf_data("m1")
|
|
assert result is not None
|
|
assert handler.pending_messages["m1"]["processed"] is True
|
|
|
|
def test_correlate_no_matching_rf_returns_none(self, handler):
|
|
handler.store_message_for_correlation("m2", {"pubkey_prefix": "ffff"})
|
|
handler.recent_rf_data = []
|
|
result = handler.correlate_message_with_rf_data("m2")
|
|
assert result is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# try_correlate_pending_messages
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestTryCorrelatePendingMessages:
|
|
"""Tests for MessageHandler.try_correlate_pending_messages()."""
|
|
|
|
def test_marks_matching_message_processed(self, handler):
|
|
handler.store_message_for_correlation("pm1", {"pubkey_prefix": "ccdd"})
|
|
rf_data = {"pubkey_prefix": "ccdd", "packet_prefix": "ccdd", "timestamp": time.time()}
|
|
handler.try_correlate_pending_messages(rf_data)
|
|
assert handler.pending_messages["pm1"]["processed"] is True
|
|
|
|
def test_skips_already_processed_messages(self, handler):
|
|
handler.store_message_for_correlation("pm2", {"pubkey_prefix": "eeff"})
|
|
handler.pending_messages["pm2"]["processed"] = True
|
|
rf_data = {"pubkey_prefix": "eeff", "packet_prefix": "eeff", "timestamp": time.time()}
|
|
# Should not raise; processed flag remains True
|
|
handler.try_correlate_pending_messages(rf_data)
|
|
assert handler.pending_messages["pm2"]["processed"] is True
|
|
|
|
def test_no_match_does_not_mark_processed(self, handler):
|
|
handler.store_message_for_correlation("pm3", {"pubkey_prefix": "1111"})
|
|
rf_data = {"pubkey_prefix": "9999", "packet_prefix": "9999", "timestamp": time.time()}
|
|
handler.try_correlate_pending_messages(rf_data)
|
|
assert handler.pending_messages["pm3"]["processed"] is False
|
|
|
|
def test_partial_prefix_match_16chars(self, handler):
|
|
# If both pubkey_prefixes share first 16 chars, they correlate
|
|
long_key = "aabbccddeeff0011aabbccddeeff0011"
|
|
handler.store_message_for_correlation("pm4", {"pubkey_prefix": long_key})
|
|
rf_data = {"pubkey_prefix": long_key, "packet_prefix": long_key, "timestamp": time.time()}
|
|
handler.try_correlate_pending_messages(rf_data)
|
|
assert handler.pending_messages["pm4"]["processed"] is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# handle_rf_log_data
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestHandleRfLogData:
|
|
"""Tests for MessageHandler.handle_rf_log_data() — async event handler."""
|
|
|
|
def _make_event(self, payload):
|
|
event = Mock()
|
|
event.payload = payload
|
|
return event
|
|
|
|
def _setup_handler(self, handler):
|
|
handler.logger = Mock()
|
|
handler.bot.transmission_tracker = None
|
|
handler.bot.web_viewer_integration = None
|
|
|
|
async def test_no_payload_attribute_logs_warning(self, handler):
|
|
self._setup_handler(handler)
|
|
event = Mock(spec=[]) # no .payload attribute
|
|
await handler.handle_rf_log_data(event)
|
|
handler.logger.warning.assert_called()
|
|
|
|
async def test_payload_none_logs_warning(self, handler):
|
|
self._setup_handler(handler)
|
|
event = Mock()
|
|
event.payload = None
|
|
await handler.handle_rf_log_data(event)
|
|
handler.logger.warning.assert_called()
|
|
|
|
async def test_payload_without_snr_field_no_store(self, handler):
|
|
self._setup_handler(handler)
|
|
event = self._make_event({"raw_hex": "1100de", "rssi": -80})
|
|
await handler.handle_rf_log_data(event)
|
|
# No SNR field → nothing stored in recent_rf_data
|
|
assert len(handler.recent_rf_data) == 0
|
|
|
|
async def test_snr_without_raw_hex_no_store(self, handler):
|
|
self._setup_handler(handler)
|
|
# Has snr but no raw_hex → packet_prefix is None → no store
|
|
event = self._make_event({"snr": 5.0})
|
|
await handler.handle_rf_log_data(event)
|
|
assert len(handler.recent_rf_data) == 0
|
|
|
|
async def test_snr_cached_from_packet_prefix(self, handler):
|
|
self._setup_handler(handler)
|
|
raw_hex = "a" * 64 # 32 hex chars → packet_prefix is first 32 chars = "a"*32
|
|
event = self._make_event({"snr": 7.5, "raw_hex": raw_hex})
|
|
await handler.handle_rf_log_data(event)
|
|
expected_prefix = raw_hex[:32]
|
|
assert handler.snr_cache.get(expected_prefix) == 7.5
|
|
|
|
async def test_rssi_cached_from_packet_prefix(self, handler):
|
|
self._setup_handler(handler)
|
|
raw_hex = "b" * 64
|
|
event = self._make_event({"snr": 3.0, "rssi": -95, "raw_hex": raw_hex})
|
|
await handler.handle_rf_log_data(event)
|
|
expected_prefix = raw_hex[:32]
|
|
assert handler.rssi_cache.get(expected_prefix) == -95
|
|
|
|
async def test_rf_data_stored_in_recent_rf_data(self, handler):
|
|
self._setup_handler(handler)
|
|
raw_hex = "c" * 64
|
|
event = self._make_event({"snr": 4.0, "raw_hex": raw_hex})
|
|
await handler.handle_rf_log_data(event)
|
|
assert len(handler.recent_rf_data) == 1
|
|
entry = handler.recent_rf_data[0]
|
|
assert entry["snr"] == 4.0
|
|
assert entry["packet_prefix"] == raw_hex[:32]
|
|
|
|
async def test_rf_data_added_to_timestamp_index(self, handler):
|
|
self._setup_handler(handler)
|
|
raw_hex = "d" * 64
|
|
event = self._make_event({"snr": 2.0, "raw_hex": raw_hex})
|
|
await handler.handle_rf_log_data(event)
|
|
assert len(handler.rf_data_by_timestamp) == 1
|
|
|
|
async def test_rf_data_added_to_pubkey_index(self, handler):
|
|
self._setup_handler(handler)
|
|
raw_hex = "e" * 64
|
|
event = self._make_event({"snr": 1.0, "raw_hex": raw_hex})
|
|
await handler.handle_rf_log_data(event)
|
|
prefix = raw_hex[:32]
|
|
assert prefix in handler.rf_data_by_pubkey
|
|
|
|
async def test_pubkey_from_metadata_stored(self, handler):
|
|
self._setup_handler(handler)
|
|
raw_hex = "f" * 64
|
|
meta = {"pubkey_prefix": "aabbccdd"}
|
|
event = self._make_event({"snr": 6.0, "raw_hex": raw_hex})
|
|
await handler.handle_rf_log_data(event, metadata=meta)
|
|
entry = handler.recent_rf_data[0]
|
|
assert entry["pubkey_prefix"] == "aabbccdd"
|
|
|
|
async def test_valid_packet_decoded_and_routing_stored(self, handler):
|
|
self._setup_handler(handler)
|
|
# Build a valid MeshCore packet (FLOOD + TXT_MSG, 0 hops)
|
|
pkt_hex = _make_packet_hex(2, 1, payload_bytes=b"\x48\x65\x6c\x6c\x6f")
|
|
# raw_hex must be >= 64 chars for packet_prefix, pad with zeros
|
|
padded = pkt_hex.ljust(64, "0")
|
|
event = self._make_event({"snr": 9.0, "raw_hex": padded})
|
|
await handler.handle_rf_log_data(event)
|
|
assert len(handler.recent_rf_data) == 1
|
|
# routing_info should be populated since raw_hex contains a valid packet
|
|
entry = handler.recent_rf_data[0]
|
|
assert entry["routing_info"] is not None
|
|
|
|
async def test_exception_does_not_propagate(self, handler):
|
|
self._setup_handler(handler)
|
|
event = Mock()
|
|
# Make deepcopy blow up
|
|
with patch("modules.message_handler.copy.deepcopy", side_effect=RuntimeError("deepcopy fail")):
|
|
await handler.handle_rf_log_data(event)
|
|
handler.logger.error.assert_called()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _get_path_from_rf_data
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGetPathFromRfData:
|
|
"""Tests for MessageHandler._get_path_from_rf_data() — path extraction helper."""
|
|
|
|
def test_routing_info_path_nodes_returned_directly(self, handler):
|
|
rf = {
|
|
"routing_info": {"path_nodes": ["ab", "cd"], "path_length": 2},
|
|
"raw_hex": "",
|
|
}
|
|
path_str, nodes, hops = handler._get_path_from_rf_data(rf)
|
|
assert path_str == "ab,cd"
|
|
assert nodes == ["ab", "cd"]
|
|
assert hops == 2
|
|
|
|
def test_no_raw_hex_returns_none_tuple(self, handler):
|
|
rf = {"routing_info": {}, "raw_hex": ""}
|
|
path_str, nodes, hops = handler._get_path_from_rf_data(rf)
|
|
assert path_str is None
|
|
assert nodes is None
|
|
assert hops == 255
|
|
|
|
def test_raw_hex_decoded_and_path_returned(self, handler):
|
|
path_b = bytes([0xAB, 0xCD])
|
|
pkt_hex = _make_packet_hex(4, 1, path_bytes=path_b, payload_bytes=b"\xEE",
|
|
hop_count=2, bytes_per_hop=1)
|
|
padded = pkt_hex.ljust(64, "0")
|
|
rf = {"routing_info": {}, "raw_hex": padded, "payload": ""}
|
|
path_str, nodes, hops = handler._get_path_from_rf_data(rf)
|
|
assert nodes is not None
|
|
assert len(nodes) == 2
|
|
assert hops == 2
|
|
|
|
def test_invalid_raw_hex_raises_or_returns_none_tuple(self, handler):
|
|
# Non-hex raw_hex triggers the same UnboundLocalError source bug as
|
|
# decode_meshcore_packet("ZZZZ") — document the actual behaviour.
|
|
rf = {"routing_info": {}, "raw_hex": "ZZZZ", "payload": ""}
|
|
try:
|
|
path_str, nodes, hops = handler._get_path_from_rf_data(rf)
|
|
assert path_str is None
|
|
assert nodes is None
|
|
except (ValueError, UnboundLocalError):
|
|
pass # expected — source-level bug causes exception to propagate
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SNR/RSSI LRU cache bounds
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestSignalCacheLRUBounds:
|
|
"""Tests for bounded LRU eviction on snr_cache and rssi_cache."""
|
|
|
|
def test_snr_cache_evicts_oldest_at_limit(self, handler):
|
|
handler._max_signal_cache_size = 3
|
|
# Fill cache to capacity
|
|
handler.snr_cache["aaa"] = 1.0
|
|
handler.snr_cache["bbb"] = 2.0
|
|
handler.snr_cache["ccc"] = 3.0
|
|
# Simulate the write path with LRU eviction
|
|
key = "ddd"
|
|
handler.snr_cache[key] = 4.0
|
|
handler.snr_cache.move_to_end(key)
|
|
while len(handler.snr_cache) > handler._max_signal_cache_size:
|
|
handler.snr_cache.popitem(last=False)
|
|
# Oldest entry ("aaa") should be evicted
|
|
assert "aaa" not in handler.snr_cache
|
|
assert len(handler.snr_cache) == 3
|
|
assert list(handler.snr_cache.keys()) == ["bbb", "ccc", "ddd"]
|
|
|
|
def test_rssi_cache_evicts_oldest_at_limit(self, handler):
|
|
handler._max_signal_cache_size = 2
|
|
handler.rssi_cache["x1"] = -50.0
|
|
handler.rssi_cache["x2"] = -60.0
|
|
# Add a third entry with eviction
|
|
key = "x3"
|
|
handler.rssi_cache[key] = -70.0
|
|
handler.rssi_cache.move_to_end(key)
|
|
while len(handler.rssi_cache) > handler._max_signal_cache_size:
|
|
handler.rssi_cache.popitem(last=False)
|
|
assert "x1" not in handler.rssi_cache
|
|
assert len(handler.rssi_cache) == 2
|
|
|
|
def test_existing_key_update_does_not_evict(self, handler):
|
|
handler._max_signal_cache_size = 2
|
|
handler.snr_cache["a"] = 1.0
|
|
handler.snr_cache["b"] = 2.0
|
|
# Update existing key — no eviction needed
|
|
handler.snr_cache["a"] = 5.0
|
|
handler.snr_cache.move_to_end("a")
|
|
while len(handler.snr_cache) > handler._max_signal_cache_size:
|
|
handler.snr_cache.popitem(last=False)
|
|
assert len(handler.snr_cache) == 2
|
|
assert handler.snr_cache["a"] == 5.0
|
|
assert handler.snr_cache["b"] == 2.0
|