mirror of
https://github.com/agessaman/meshcore-bot.git
synced 2026-03-30 20:15:40 +00:00
- Introduced `extract_path_node_ids_from_message` to streamline extraction of node IDs from mesh messages, prioritizing `routing_info.path_nodes` and supporting multi-byte comma parsing. - Updated `calculate_path_distances` to accept an optional message parameter, enhancing its functionality by allowing it to derive node IDs from messages. - Added `node_ids_from_path_string` to handle parsing of path strings into node IDs, improving the handling of both legacy and multi-byte formats. - Refactored `TestCommand` to utilize the new extraction function, ensuring consistent behavior across commands. - Expanded test coverage for new utility functions to validate their correctness and robustness. These changes improve the clarity and maintainability of path-related utilities, enhancing overall functionality in message processing.
786 lines
30 KiB
Python
786 lines
30 KiB
Python
"""Tests for modules.utils."""
|
|
|
|
import configparser
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock, Mock, patch
|
|
|
|
import pytest
|
|
|
|
from modules.utils import (
|
|
abbreviate_location,
|
|
calculate_distance,
|
|
calculate_packet_hash,
|
|
calculate_path_distances,
|
|
check_internet_connectivity,
|
|
decode_escape_sequences,
|
|
decode_path_len_byte,
|
|
encode_path_len_byte,
|
|
extract_path_node_ids_from_message,
|
|
format_elapsed_display,
|
|
format_keyword_response_with_placeholders,
|
|
format_location_for_display,
|
|
get_config_timezone,
|
|
get_major_city_queries,
|
|
is_valid_timezone,
|
|
node_ids_from_path_string,
|
|
parse_location_string,
|
|
parse_path_string,
|
|
resolve_path,
|
|
truncate_string,
|
|
)
|
|
|
|
|
|
class TestAbbreviateLocation:
|
|
"""Tests for abbreviate_location()."""
|
|
|
|
def test_empty_returns_empty(self):
|
|
assert abbreviate_location("") == ""
|
|
assert abbreviate_location(None) is None
|
|
|
|
def test_under_max_length_unchanged(self):
|
|
assert abbreviate_location("Seattle", max_length=20) == "Seattle"
|
|
assert abbreviate_location("Portland, OR", max_length=20) == "Portland, OR"
|
|
|
|
def test_united_states_abbreviated(self):
|
|
assert "USA" in abbreviate_location("United States of America", max_length=50)
|
|
assert abbreviate_location("United States", max_length=50) == "USA"
|
|
|
|
def test_british_columbia_abbreviated(self):
|
|
assert abbreviate_location("Vancouver, British Columbia", max_length=50) == "Vancouver, BC"
|
|
|
|
def test_over_max_truncates_with_ellipsis(self):
|
|
result = abbreviate_location("Very Long City Name That Exceeds Limit", max_length=20)
|
|
assert len(result) <= 20
|
|
assert result.endswith("...")
|
|
|
|
def test_comma_separated_keeps_first_part_when_truncating(self):
|
|
result = abbreviate_location("Seattle, Washington, USA", max_length=10)
|
|
assert "Seattle" in result or result.startswith("Seattle")
|
|
|
|
|
|
class TestTruncateString:
|
|
"""Tests for truncate_string()."""
|
|
|
|
def test_empty_returns_empty(self):
|
|
assert truncate_string("", 10) == ""
|
|
assert truncate_string(None, 10) is None
|
|
|
|
def test_under_max_unchanged(self):
|
|
assert truncate_string("hello", 10) == "hello"
|
|
|
|
def test_over_max_truncates_with_ellipsis(self):
|
|
assert truncate_string("hello world", 8) == "hello..."
|
|
assert truncate_string("hello world", 11) == "hello world"
|
|
|
|
def test_custom_ellipsis(self):
|
|
# max_length=8 with ellipsis=".." (2 chars) -> 6 chars + ".."
|
|
assert truncate_string("hello world", 8, ellipsis="..") == "hello .."
|
|
|
|
|
|
class TestDecodeEscapeSequences:
|
|
"""Tests for decode_escape_sequences()."""
|
|
|
|
def test_empty_returns_empty(self):
|
|
assert decode_escape_sequences("") == ""
|
|
assert decode_escape_sequences(None) is None
|
|
|
|
def test_newline(self):
|
|
assert decode_escape_sequences(r"Line 1\nLine 2") == "Line 1\nLine 2"
|
|
|
|
def test_tab(self):
|
|
assert decode_escape_sequences(r"Col1\tCol2") == "Col1\tCol2"
|
|
|
|
def test_literal_backslash_n(self):
|
|
assert decode_escape_sequences(r"Literal \\n here") == "Literal \\n here"
|
|
|
|
def test_mixed(self):
|
|
result = decode_escape_sequences(r"Line 1\nLine 2\tTab")
|
|
assert "Line 1" in result
|
|
assert "\n" in result
|
|
assert "\t" in result
|
|
|
|
def test_carriage_return(self):
|
|
assert decode_escape_sequences(r"Line1\r\nLine2") == "Line1\r\nLine2"
|
|
|
|
|
|
class TestParseLocationString:
|
|
"""Tests for parse_location_string()."""
|
|
|
|
def test_no_comma_returns_city_only(self):
|
|
city, second, kind = parse_location_string("Seattle")
|
|
assert city == "Seattle"
|
|
assert second is None
|
|
assert kind is None
|
|
|
|
def test_zipcode_only(self):
|
|
city, second, kind = parse_location_string("98101")
|
|
assert city == "98101"
|
|
assert second is None
|
|
assert kind is None
|
|
|
|
def test_city_state_format(self):
|
|
city, second, kind = parse_location_string("Seattle, WA")
|
|
assert city == "Seattle"
|
|
assert second is not None
|
|
assert kind in ("state", None)
|
|
|
|
def test_city_country_format(self):
|
|
city, second, kind = parse_location_string("Stockholm, Sweden")
|
|
assert city == "Stockholm"
|
|
assert second is not None
|
|
|
|
|
|
class TestCalculateDistance:
|
|
"""Tests for calculate_distance() (Haversine)."""
|
|
|
|
def test_same_point_zero_distance(self):
|
|
assert calculate_distance(47.6062, -122.3321, 47.6062, -122.3321) == 0.0
|
|
|
|
def test_known_distance_seattle_portland(self):
|
|
# Seattle to Portland ~233 km
|
|
dist = calculate_distance(47.6062, -122.3321, 45.5152, -122.6784)
|
|
assert 220 < dist < 250
|
|
|
|
def test_known_distance_short(self):
|
|
# ~1 degree lat at equator ~111 km
|
|
dist = calculate_distance(0, 0, 1, 0)
|
|
assert 110 < dist < 112
|
|
|
|
|
|
class TestFormatElapsedDisplay:
|
|
"""Tests for format_elapsed_display()."""
|
|
|
|
def test_none_returns_sync_message(self):
|
|
assert "Sync" in format_elapsed_display(None)
|
|
assert "Clock" in format_elapsed_display(None)
|
|
|
|
def test_unknown_returns_sync_message(self):
|
|
assert "Sync" in format_elapsed_display("unknown")
|
|
|
|
def test_invalid_type_returns_sync_message(self):
|
|
assert "Sync" in format_elapsed_display("not_a_number")
|
|
|
|
def test_valid_recent_timestamp_returns_ms(self):
|
|
import time
|
|
ts = time.time() - 1.5 # 1.5 seconds ago
|
|
result = format_elapsed_display(ts)
|
|
assert "ms" in result
|
|
assert "Sync" not in result
|
|
|
|
def test_future_timestamp_returns_sync_message(self):
|
|
import time
|
|
ts = time.time() + 3600 # 1 hour in future
|
|
assert "Sync" in format_elapsed_display(ts)
|
|
|
|
def test_translator_used_when_provided(self):
|
|
translator = MagicMock()
|
|
translator.translate = MagicMock(return_value="Custom Sync Message")
|
|
result = format_elapsed_display(None, translator=translator)
|
|
assert result == "Custom Sync Message"
|
|
|
|
|
|
class TestDecodePathLenByte:
|
|
"""Tests for decode_path_len_byte() (RF path_len encoding: low 6 bits = hop count, high 2 = size code)."""
|
|
|
|
def test_single_byte_one_hop(self):
|
|
# size_code=0 -> 1 byte/hop, hop_count=1 -> 1 path byte
|
|
path_byte_length, bytes_per_hop = decode_path_len_byte(0x01)
|
|
assert path_byte_length == 1
|
|
assert bytes_per_hop == 1
|
|
|
|
def test_single_byte_three_hops(self):
|
|
path_byte_length, bytes_per_hop = decode_path_len_byte(0x03)
|
|
assert path_byte_length == 3
|
|
assert bytes_per_hop == 1
|
|
|
|
def test_multi_byte_two_bytes_per_hop_one_hop(self):
|
|
# size_code=1 -> 2 bytes/hop, hop_count=1 -> 2 path bytes
|
|
path_byte_length, bytes_per_hop = decode_path_len_byte(0x41)
|
|
assert path_byte_length == 2
|
|
assert bytes_per_hop == 2
|
|
|
|
def test_multi_byte_two_bytes_per_hop_three_hops(self):
|
|
# size_code=1, hop_count=3 -> 6 path bytes
|
|
path_byte_length, bytes_per_hop = decode_path_len_byte(0x43)
|
|
assert path_byte_length == 6
|
|
assert bytes_per_hop == 2
|
|
|
|
def test_three_bytes_per_hop(self):
|
|
# size_code=2 -> 3 bytes/hop, hop_count=2 -> 6 path bytes
|
|
path_byte_length, bytes_per_hop = decode_path_len_byte(0x82)
|
|
assert path_byte_length == 6
|
|
assert bytes_per_hop == 3
|
|
|
|
def test_reserved_size_code_fallback(self):
|
|
# size_code=3 (bytes_per_hop=4) is reserved -> legacy: path_len_byte as raw byte count, 1 byte/hop
|
|
path_byte_length, bytes_per_hop = decode_path_len_byte(0xC2)
|
|
assert path_byte_length == 0xC2 # raw byte value
|
|
assert bytes_per_hop == 1
|
|
|
|
def test_path_exceeds_max_fallback(self):
|
|
# 32 hops * 2 bytes = 64, max_path_size=64 is ok; 33*2=66 > 64 -> legacy
|
|
path_byte_length, bytes_per_hop = decode_path_len_byte(0x41, max_path_size=64)
|
|
assert path_byte_length == 2
|
|
assert bytes_per_hop == 2
|
|
path_byte_length, bytes_per_hop = decode_path_len_byte(0x61, max_path_size=64) # 33 hops * 2
|
|
assert path_byte_length == 0x61
|
|
assert bytes_per_hop == 1
|
|
|
|
def test_zero_hops(self):
|
|
path_byte_length, bytes_per_hop = decode_path_len_byte(0x00)
|
|
assert path_byte_length == 0
|
|
assert bytes_per_hop == 1
|
|
|
|
|
|
class TestEncodePathLenByte:
|
|
"""Tests for encode_path_len_byte() (inverse of decode_path_len_byte for valid encodings)."""
|
|
|
|
def test_four_hops_one_byte_per_hop(self):
|
|
assert encode_path_len_byte(4, 1) == 0x04
|
|
assert (0x04 >> 6) == 0
|
|
assert (0x04 & 0x3F) == 4
|
|
|
|
def test_three_hops_two_bytes_per_hop(self):
|
|
assert encode_path_len_byte(3, 2) == 0x43
|
|
pbl, bph = decode_path_len_byte(0x43)
|
|
assert pbl == 6
|
|
assert bph == 2
|
|
|
|
def test_roundtrip_with_decode(self):
|
|
for pb in (0x01, 0x03, 0x41, 0x43, 0x82, 0x00):
|
|
pbl, bph = decode_path_len_byte(pb)
|
|
if bph == 1 and pbl == pb:
|
|
continue # legacy fallback path
|
|
hop_count = pb & 0x3F
|
|
assert encode_path_len_byte(hop_count, bph) == pb
|
|
|
|
def test_invalid_bytes_per_hop_raises(self):
|
|
with pytest.raises(ValueError):
|
|
encode_path_len_byte(4, 4)
|
|
|
|
|
|
class TestParsePathString:
|
|
"""Tests for parse_path_string()."""
|
|
|
|
def test_empty_returns_empty_list(self):
|
|
assert parse_path_string("") == []
|
|
assert parse_path_string(None) == []
|
|
|
|
def test_comma_separated(self):
|
|
assert parse_path_string("01,5f,ab") == ["01", "5F", "AB"]
|
|
|
|
def test_space_separated(self):
|
|
assert parse_path_string("01 5f ab") == ["01", "5F", "AB"]
|
|
|
|
def test_continuous_hex(self):
|
|
assert parse_path_string("015fab") == ["01", "5F", "AB"]
|
|
|
|
def test_with_hop_count_suffix(self):
|
|
result = parse_path_string("01,5f (2 hops)")
|
|
assert result == ["01", "5F"]
|
|
|
|
def test_mixed_case_normalized_uppercase(self):
|
|
assert parse_path_string("01,5f,aB") == ["01", "5F", "AB"]
|
|
|
|
# --- Multi-byte (4 hex chars = 2 bytes per hop) ---
|
|
|
|
def test_four_char_continuous_hex(self):
|
|
assert parse_path_string("01025fab", prefix_hex_chars=4) == ["0102", "5FAB"]
|
|
|
|
def test_four_char_comma_separated(self):
|
|
assert parse_path_string("0102,5fab,abcd", prefix_hex_chars=4) == ["0102", "5FAB", "ABCD"]
|
|
|
|
def test_four_char_space_separated(self):
|
|
assert parse_path_string("0102 5fab abcd", prefix_hex_chars=4) == ["0102", "5FAB", "ABCD"]
|
|
|
|
def test_four_char_legacy_fallback_when_no_four_char_matches(self):
|
|
# Input that has no 4-char groups (odd length or different pattern) -> fallback to 2-char
|
|
result = parse_path_string("01", prefix_hex_chars=4)
|
|
assert result == ["01"]
|
|
|
|
def test_two_char_explicit(self):
|
|
"""Ensure prefix_hex_chars=2 still works when passed explicitly."""
|
|
assert parse_path_string("015fab", prefix_hex_chars=2) == ["01", "5F", "AB"]
|
|
|
|
|
|
class TestNodeIdsFromPathString:
|
|
"""Tests for node_ids_from_path_string() — multi-byte comma tokens vs legacy parse."""
|
|
|
|
def test_six_char_comma_tokens_not_split_into_one_byte_ids(self):
|
|
assert node_ids_from_path_string("28667c,e0eed9", 2) == ["28667C", "E0EED9"]
|
|
|
|
def test_strips_hop_suffix_before_comma_parse(self):
|
|
assert node_ids_from_path_string("28667c,e0eed9 (3 hops)", 2) == ["28667C", "E0EED9"]
|
|
|
|
def test_legacy_one_byte_comma_path(self):
|
|
assert node_ids_from_path_string("01,5f (2 hops)", 2) == ["01", "5F"]
|
|
|
|
def test_continuous_hex_uses_prefix_width(self):
|
|
assert node_ids_from_path_string("01025f7e", 2) == ["01", "02", "5F", "7E"]
|
|
|
|
def test_direct_returns_empty(self):
|
|
assert node_ids_from_path_string("Direct", 2) == []
|
|
|
|
|
|
class TestExtractPathNodeIdsFromMessage:
|
|
"""Tests for extract_path_node_ids_from_message()."""
|
|
|
|
def test_prefers_routing_info_path_nodes(self):
|
|
m = Mock()
|
|
m.path = "01,5f"
|
|
m.routing_info = {"path_length": 2, "path_nodes": ["28667C", "E0EED9"]}
|
|
assert extract_path_node_ids_from_message(m) == ["28667C", "E0EED9"]
|
|
|
|
def test_path_length_zero_returns_empty(self):
|
|
m = Mock()
|
|
m.path = "anything"
|
|
m.routing_info = {"path_length": 0}
|
|
assert extract_path_node_ids_from_message(m) == []
|
|
|
|
def test_comma_multibyte_from_path_string(self):
|
|
m = Mock()
|
|
m.path = "28667c,e0eed9 (3 hops)"
|
|
m.routing_info = None
|
|
assert extract_path_node_ids_from_message(m) == ["28667C", "E0EED9"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# is_valid_timezone
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestIsValidTimezone:
|
|
"""Tests for is_valid_timezone()."""
|
|
|
|
def test_valid_utc(self):
|
|
assert is_valid_timezone("UTC") is True
|
|
|
|
def test_valid_us_eastern(self):
|
|
assert is_valid_timezone("America/New_York") is True
|
|
|
|
def test_valid_europe_london(self):
|
|
assert is_valid_timezone("Europe/London") is True
|
|
|
|
def test_invalid_returns_false(self):
|
|
assert is_valid_timezone("Not/A/Timezone") is False
|
|
|
|
def test_empty_string_returns_false(self):
|
|
assert is_valid_timezone("") is False
|
|
|
|
def test_whitespace_only_returns_false(self):
|
|
assert is_valid_timezone(" ") is False
|
|
|
|
def test_strips_whitespace_before_checking(self):
|
|
assert is_valid_timezone(" UTC ") is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_config_timezone
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGetConfigTimezone:
|
|
"""Tests for get_config_timezone()."""
|
|
|
|
def _config(self, tz_value=""):
|
|
cfg = configparser.ConfigParser()
|
|
cfg.add_section("Bot")
|
|
if tz_value:
|
|
cfg.set("Bot", "timezone", tz_value)
|
|
return cfg
|
|
|
|
def test_valid_timezone_returned(self):
|
|
cfg = self._config("UTC")
|
|
tz, iana = get_config_timezone(cfg)
|
|
assert iana == "UTC"
|
|
assert tz is not None
|
|
|
|
def test_invalid_timezone_falls_back_to_utc_iana(self):
|
|
cfg = self._config("Not/Valid")
|
|
_, iana = get_config_timezone(cfg)
|
|
assert iana == "UTC"
|
|
|
|
def test_empty_timezone_falls_back(self):
|
|
cfg = self._config("")
|
|
_, iana = get_config_timezone(cfg)
|
|
assert iana == "UTC"
|
|
|
|
def test_invalid_timezone_logs_warning_when_logger_provided(self):
|
|
cfg = self._config("Bad/Zone")
|
|
logger = Mock()
|
|
get_config_timezone(cfg, logger)
|
|
logger.warning.assert_called_once()
|
|
|
|
def test_no_logger_no_crash_on_invalid(self):
|
|
cfg = self._config("Bad/Zone")
|
|
_, iana = get_config_timezone(cfg, None)
|
|
assert iana == "UTC"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# format_location_for_display
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestFormatLocationForDisplay:
|
|
"""Tests for format_location_for_display()."""
|
|
|
|
def test_none_city_returns_none(self):
|
|
assert format_location_for_display(None) is None
|
|
|
|
def test_empty_city_returns_none(self):
|
|
assert format_location_for_display("") is None
|
|
|
|
def test_city_only(self):
|
|
result = format_location_for_display("Seattle", max_length=50)
|
|
assert "Seattle" in result
|
|
|
|
def test_city_and_state(self):
|
|
result = format_location_for_display("Seattle", "WA", max_length=50)
|
|
assert "Seattle" in result
|
|
assert "WA" in result
|
|
|
|
def test_state_not_duplicated_when_same_as_city(self):
|
|
result = format_location_for_display("Seattle", "Seattle", max_length=50)
|
|
# "Seattle" should not appear twice as comma-joined
|
|
assert result.count("Seattle") == 1
|
|
|
|
def test_respects_max_length(self):
|
|
result = format_location_for_display("Very Long City Name That Goes On", "State", max_length=15)
|
|
assert len(result) <= 15
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_major_city_queries
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGetMajorCityQueries:
|
|
"""Tests for get_major_city_queries()."""
|
|
|
|
def test_known_city_returns_list(self):
|
|
result = get_major_city_queries("seattle")
|
|
assert isinstance(result, list)
|
|
assert len(result) > 0
|
|
assert any("Seattle" in q for q in result)
|
|
|
|
def test_unknown_city_returns_empty(self):
|
|
result = get_major_city_queries("tinyunknownvillage")
|
|
assert result == []
|
|
|
|
def test_case_insensitive(self):
|
|
assert get_major_city_queries("Seattle") == get_major_city_queries("seattle")
|
|
|
|
def test_new_york_returns_multiple_queries(self):
|
|
result = get_major_city_queries("new york")
|
|
assert len(result) >= 1
|
|
|
|
def test_portland_returns_multiple_queries(self):
|
|
# Portland has OR and ME variants
|
|
result = get_major_city_queries("portland")
|
|
assert len(result) >= 2
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# resolve_path
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestResolvePath:
|
|
"""Tests for resolve_path()."""
|
|
|
|
def test_absolute_path_unchanged(self):
|
|
result = resolve_path("/var/lib/bot/data.db", "/opt/bot")
|
|
assert result == "/var/lib/bot/data.db"
|
|
|
|
def test_relative_path_resolved_to_base_dir(self):
|
|
result = resolve_path("data.db", "/opt/bot")
|
|
assert result == "/opt/bot/data.db"
|
|
|
|
def test_path_object_input(self):
|
|
result = resolve_path(Path("data.db"), Path("/opt/bot"))
|
|
assert result == "/opt/bot/data.db"
|
|
|
|
def test_returns_string(self):
|
|
result = resolve_path("data.db", "/opt/bot")
|
|
assert isinstance(result, str)
|
|
|
|
def test_dot_base_dir_resolves_to_cwd(self):
|
|
import os
|
|
result = resolve_path("data.db", ".")
|
|
assert result == os.path.join(os.getcwd(), "data.db")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# check_internet_connectivity
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCheckInternetConnectivity:
|
|
"""Tests for check_internet_connectivity()."""
|
|
|
|
def test_returns_true_when_socket_connects(self):
|
|
mock_sock = Mock()
|
|
with patch("modules.utils.socket.socket") as mock_socket_cls:
|
|
mock_socket_cls.return_value = mock_sock
|
|
result = check_internet_connectivity(host="8.8.8.8", port=53, timeout=1.0)
|
|
assert result is True
|
|
mock_sock.connect.assert_called_once_with(("8.8.8.8", 53))
|
|
|
|
def test_returns_false_when_all_fail(self):
|
|
with patch("modules.utils.socket.socket") as mock_socket_cls, \
|
|
patch("modules.utils.urllib.request.urlopen") as mock_urlopen:
|
|
mock_socket_cls.return_value.connect.side_effect = OSError("refused")
|
|
mock_urlopen.side_effect = OSError("no net")
|
|
result = check_internet_connectivity(host="8.8.8.8", port=53, timeout=1.0)
|
|
assert result is False
|
|
|
|
def test_falls_back_to_http_when_socket_fails(self):
|
|
mock_response = Mock()
|
|
mock_response.close = Mock()
|
|
with patch("modules.utils.socket.socket") as mock_socket_cls, \
|
|
patch("modules.utils.urllib.request.urlopen") as mock_urlopen:
|
|
mock_socket_cls.return_value.connect.side_effect = OSError("refused")
|
|
mock_urlopen.return_value = mock_response
|
|
result = check_internet_connectivity(host="8.8.8.8", port=53, timeout=1.0)
|
|
assert result is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# calculate_path_distances
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCalculatePathDistances:
|
|
"""Tests for calculate_path_distances()."""
|
|
|
|
def _bot(self):
|
|
bot = Mock()
|
|
bot.db_manager = Mock()
|
|
bot.prefix_hex_chars = 2
|
|
bot.logger = Mock()
|
|
return bot
|
|
|
|
def test_empty_path_returns_direct(self):
|
|
path_dist, fl_dist = calculate_path_distances(self._bot(), "")
|
|
assert "direct" in path_dist.lower()
|
|
|
|
def test_direct_path_returns_direct(self):
|
|
path_dist, fl_dist = calculate_path_distances(self._bot(), "Direct")
|
|
assert "direct" in path_dist.lower()
|
|
|
|
def test_no_db_manager_returns_unknown(self):
|
|
bot = Mock(spec=[]) # No db_manager attribute
|
|
path_dist, fl_dist = calculate_path_distances(bot, "01,5f")
|
|
assert "unknown" in path_dist.lower()
|
|
|
|
def test_single_node_returns_locally(self):
|
|
bot = self._bot()
|
|
with patch("modules.utils._get_node_location_from_db", return_value=None):
|
|
path_dist, fl_dist = calculate_path_distances(bot, "01")
|
|
assert "local" in path_dist.lower() or "1 hop" in path_dist.lower()
|
|
|
|
def test_two_nodes_with_locations_returns_distance(self):
|
|
bot = self._bot()
|
|
# Seattle and Portland coords
|
|
locations = [((47.6062, -122.3321), None), ((45.5152, -122.6784), None)]
|
|
with patch("modules.utils._get_node_location_from_db", side_effect=locations):
|
|
path_dist, fl_dist = calculate_path_distances(bot, "01,5f")
|
|
assert "km" in path_dist
|
|
assert "km" in fl_dist
|
|
|
|
def test_two_nodes_no_locations_returns_unknown(self):
|
|
bot = self._bot()
|
|
with patch("modules.utils._get_node_location_from_db", return_value=None):
|
|
path_dist, fl_dist = calculate_path_distances(bot, "01,5f")
|
|
assert "unknown" in path_dist.lower()
|
|
|
|
def test_multibyte_comma_path_one_segment_not_five(self):
|
|
"""6-char hop IDs must not be split into 2-char tokens (would inflate segment count)."""
|
|
bot = self._bot()
|
|
bot.prefix_hex_chars = 2
|
|
locations = [((47.6062, -122.3321), None), ((45.5152, -122.6784), None)]
|
|
with patch("modules.utils._get_node_location_from_db", side_effect=locations):
|
|
path_dist, fl_dist = calculate_path_distances(bot, "28667c,e0eed9")
|
|
assert "(1 segs)" in path_dist
|
|
assert "km" in fl_dist
|
|
|
|
def test_message_routing_info_path_nodes_overrides_display_path(self):
|
|
bot = self._bot()
|
|
bot.prefix_hex_chars = 2
|
|
msg = Mock()
|
|
msg.path = "01,5f"
|
|
msg.routing_info = {
|
|
"path_length": 2,
|
|
"path_nodes": ["28667c", "e0eed9"],
|
|
}
|
|
locations = [((47.6062, -122.3321), None), ((45.5152, -122.6784), None)]
|
|
with patch("modules.utils._get_node_location_from_db", side_effect=locations):
|
|
path_dist, fl_dist = calculate_path_distances(bot, "", message=msg)
|
|
assert "(1 segs)" in path_dist
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# format_keyword_response_with_placeholders
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestFormatKeywordResponseWithPlaceholders:
|
|
"""Tests for format_keyword_response_with_placeholders()."""
|
|
|
|
def _bot(self):
|
|
bot = Mock()
|
|
bot.config = configparser.ConfigParser()
|
|
bot.config.add_section("Bot")
|
|
bot.config.set("Bot", "timezone", "UTC")
|
|
bot.db_manager = Mock()
|
|
bot.prefix_hex_chars = 2
|
|
bot.logger = Mock()
|
|
bot.translator = None
|
|
return bot
|
|
|
|
def _msg(self, **kwargs):
|
|
msg = Mock()
|
|
msg.sender_id = kwargs.get("sender_id", "Alice")
|
|
msg.path = kwargs.get("path", "01,5f")
|
|
msg.snr = kwargs.get("snr", 10)
|
|
msg.rssi = kwargs.get("rssi", -80)
|
|
msg.timestamp = kwargs.get("timestamp")
|
|
msg.hops = kwargs.get("hops")
|
|
return msg
|
|
|
|
def test_sender_placeholder(self):
|
|
bot = self._bot()
|
|
msg = self._msg(sender_id="Bob")
|
|
with patch("modules.utils.calculate_path_distances", return_value=("", "")):
|
|
result = format_keyword_response_with_placeholders("{sender}", msg, bot)
|
|
assert result == "Bob"
|
|
|
|
def test_no_message_uses_unknown_defaults(self):
|
|
bot = self._bot()
|
|
result = format_keyword_response_with_placeholders("{sender}", None, bot)
|
|
assert result == "Unknown"
|
|
|
|
def test_mesh_info_total_contacts(self):
|
|
bot = self._bot()
|
|
mesh_info = {"total_contacts": 42}
|
|
result = format_keyword_response_with_placeholders("{total_contacts}", None, bot, mesh_info)
|
|
assert result == "42"
|
|
|
|
def test_missing_placeholder_returns_format_string(self):
|
|
bot = self._bot()
|
|
# {nonexistent} is not in replacements -> KeyError -> returns raw string
|
|
result = format_keyword_response_with_placeholders("{nonexistent}", None, bot)
|
|
assert result == "{nonexistent}"
|
|
|
|
def test_hops_label_singular(self):
|
|
bot = self._bot()
|
|
msg = self._msg(hops=1)
|
|
with patch("modules.utils.calculate_path_distances", return_value=("", "")):
|
|
result = format_keyword_response_with_placeholders("{hops_label}", msg, bot)
|
|
assert result == "1 hop"
|
|
|
|
def test_hops_label_plural(self):
|
|
bot = self._bot()
|
|
msg = self._msg(hops=3)
|
|
with patch("modules.utils.calculate_path_distances", return_value=("", "")):
|
|
result = format_keyword_response_with_placeholders("{hops_label}", msg, bot)
|
|
assert result == "3 hops"
|
|
|
|
def test_connection_info_contains_snr_rssi(self):
|
|
bot = self._bot()
|
|
msg = self._msg(snr=12, rssi=-75)
|
|
with patch("modules.utils.calculate_path_distances", return_value=("", "")):
|
|
result = format_keyword_response_with_placeholders("{connection_info}", msg, bot)
|
|
assert "SNR" in result
|
|
assert "RSSI" in result
|
|
|
|
class TestCalculatePacketHashPathLength:
|
|
"""Tests that calculate_packet_hash uses decode_path_len_byte so multi-byte paths skip correctly."""
|
|
|
|
def test_single_byte_path_hash_valid(self):
|
|
# Minimal TRACE packet: header(0x24=route0 type9), 4B transport, path_len=0x01 (1 hop, 1 byte), path [0x01], payload
|
|
raw = "24000000000101deadbeef"
|
|
h = calculate_packet_hash(raw)
|
|
assert len(h) == 16
|
|
assert all(c in "0123456789ABCDEF" for c in h)
|
|
assert h != "0000000000000000"
|
|
|
|
def test_multi_byte_path_hash_valid(self):
|
|
# Same but path_len=0x41 (1 hop, 2 bytes), path 0x01 0x02, same payload
|
|
raw = "2400000000410102deadbeef"
|
|
h = calculate_packet_hash(raw)
|
|
assert len(h) == 16
|
|
assert all(c in "0123456789ABCDEF" for c in h)
|
|
assert h != "0000000000000000"
|
|
|
|
def test_single_vs_multi_byte_path_different_hashes(self):
|
|
# TRACE includes path_byte_length in hash, so different path lengths must produce different hashes
|
|
single = "24000000000101deadbeef"
|
|
multi = "2400000000410102deadbeef"
|
|
assert calculate_packet_hash(single) != calculate_packet_hash(multi)
|
|
|
|
|
|
class TestMultiBytePathDisplayContract:
|
|
"""Contract: path_hex stored with bytes_per_hop=2 should format to comma-separated 4-char nodes."""
|
|
|
|
def test_multi_byte_path_format_contract(self):
|
|
# path_hex "01025fab" with 2 bytes per hop (4 hex chars per node) -> "0102,5fab"
|
|
path_hex = "01025fab"
|
|
nodes = parse_path_string(path_hex, prefix_hex_chars=4)
|
|
assert nodes == ["0102", "5FAB"]
|
|
display = ",".join(n.lower() for n in nodes)
|
|
assert display == "0102,5fab"
|
|
|
|
def test_single_byte_path_format_contract(self):
|
|
# path_hex "01025fab" with 1 byte per hop (2 hex chars) -> "01,02,5f,ab"
|
|
path_hex = "01025fab"
|
|
nodes = parse_path_string(path_hex, prefix_hex_chars=2)
|
|
assert nodes == ["01", "02", "5F", "AB"]
|
|
display = ",".join(n.lower() for n in nodes)
|
|
assert display == "01,02,5f,ab"
|
|
|
|
|
|
class TestCalculatePacketHashEdgeCases:
|
|
"""Additional calculate_packet_hash branch coverage."""
|
|
|
|
def test_payload_type_with_value_attr_handled(self):
|
|
"""Lines 376-378: payload_type object with .value attribute is accepted."""
|
|
# FLOOD+TXT_MSG header: (0<<6)|(2<<2)|1 = 0x09, no transport, 0 hops, 1 payload byte
|
|
raw = "090000ff"
|
|
|
|
class FakeEnum:
|
|
value = 2 # TXT_MSG
|
|
|
|
h = calculate_packet_hash(raw, payload_type=FakeEnum())
|
|
assert h != "0000000000000000"
|
|
assert len(h) == 16
|
|
|
|
def test_too_short_for_path_len_returns_default(self):
|
|
"""Line 391: packet with only header byte (and transport if applicable) → default hash."""
|
|
# Header only, no path_len_byte: just "09" (1 byte, offset=1, len<=1 → too short)
|
|
h = calculate_packet_hash("09")
|
|
assert h == "0000000000000000"
|
|
|
|
def test_not_enough_path_bytes_returns_default(self):
|
|
"""Line 399: path_len_byte says N hops but fewer bytes available → default hash."""
|
|
# Header 0x09 (FLOOD+TXT_MSG), path_len_byte=0x02 (2 hops 1-byte), but only 1 path byte
|
|
h = calculate_packet_hash("09020100") # header, path_len(2 hops), 1 path byte + 1 'payload'?
|
|
# Actually: header(09), path_len(02)=2 hops → needs 2 path bytes + 1 payload byte = 5 bytes total
|
|
# We provide 4 bytes: 09 02 01 00 → path bytes = 01 00 but payload missing?
|
|
# Let's check: offset=1, path_len_byte=02 → path_byte_length=2, offset after path_len=2
|
|
# need len>=2+2=4 for path, but payload needed too: len>=5
|
|
# Exactly 4 bytes → payload_start=4 → len==4 not > 4 → return default
|
|
assert h == "0000000000000000"
|
|
|
|
def test_exception_in_packet_hash_returns_default(self):
|
|
"""Lines 427-429: exception during processing returns default hash."""
|
|
h = calculate_packet_hash("not-valid-hex!!!")
|
|
assert h == "0000000000000000"
|
|
|
|
def test_non_transport_type_skips_transport_bytes(self):
|
|
"""Route type 1 (FLOOD) → no transport bytes; hash succeeds."""
|
|
# Header 0x09 = FLOOD + TXT_MSG, path_len=0, payload=0xFF
|
|
h = calculate_packet_hash("090000ff")
|
|
assert h != "0000000000000000"
|
|
assert len(h) == 16
|
|
|
|
def test_transport_type_skips_4_bytes(self):
|
|
"""Route type 0 (TRANSPORT_FLOOD) → offset +=4; packet big enough."""
|
|
# Header 0x08 = TRANSPORT_FLOOD + TXT_MSG, 4 transport bytes, path_len=0, payload=0xFF
|
|
h = calculate_packet_hash("0800000000" + "00" + "ff")
|
|
assert h != "0000000000000000"
|
|
|