mirror of
https://git.quad4.io/RNS-Things/MeshChatX.git
synced 2026-04-02 16:35:48 +00:00
797 lines
29 KiB
Python
797 lines
29 KiB
Python
"""Performance regression tests for the critical hot paths.
|
|
|
|
Focus areas (user priority):
|
|
- NomadNet browser: load announces, search announces, favourites
|
|
- Messages: load conversations, search messages, load conversation messages,
|
|
upsert messages (drafts)
|
|
|
|
Metrics collected:
|
|
- ops/sec throughput
|
|
- p50 / p95 / p99 latency
|
|
- Concurrent writer contention
|
|
- LIKE-search scaling
|
|
|
|
Latency and single-thread throughput tests use fixed ceilings/floors. Concurrent
|
|
writer tests use the same on non-CI hardware; under CI a lower throughput
|
|
floor applies unless MESHCHATX_PERF_MIN_CONCURRENT_OPS is set.
|
|
"""
|
|
|
|
import os
|
|
import secrets
|
|
import shutil
|
|
import statistics
|
|
import tempfile
|
|
import threading
|
|
import time
|
|
import unittest
|
|
|
|
from meshchatx.src.backend.announce_manager import AnnounceManager
|
|
from meshchatx.src.backend.database import Database
|
|
from meshchatx.src.backend.message_handler import MessageHandler
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _min_concurrent_throughput_ops():
    """Return the aggregate ops/s floor enforced by the concurrent writer tests.

    Resolution order:

    1. ``MESHCHATX_PERF_MIN_CONCURRENT_OPS`` when it holds a parseable base-10
       integer; values below 1 are raised to 1.
    2. ``20`` when the ``CI`` environment variable is set to a non-empty value
       (CI runners are often CPU-starved or on slow shared storage).
    3. ``100`` otherwise (strict floor for local runs).

    A blank or unparseable override is ignored and resolution continues with
    step 2.
    """
    configured = os.environ.get("MESHCHATX_PERF_MIN_CONCURRENT_OPS", "")
    if configured.strip():
        try:
            requested = int(configured, 10)
        except ValueError:
            # Not an integer: fall through to the CI / local defaults.
            pass
        else:
            return max(1, requested)
    return 20 if os.environ.get("CI") else 100
|
|
|
|
|
|
def percentile(data, pct):
    """Return the ``pct``-th percentile of ``data`` by linear interpolation.

    ``data`` does not need to be pre-sorted; a sorted copy is made here.
    Empty input yields ``0``. When the computed rank falls on (or past) the
    last element, that element is returned without interpolation.
    """
    if not data:
        return 0
    ordered = sorted(data)
    rank = (len(ordered) - 1) * (pct / 100)
    lower = int(rank)
    upper = lower + 1
    if upper < len(ordered):
        # Interpolate between the two neighbouring samples.
        return ordered[lower] + (rank - lower) * (ordered[upper] - ordered[lower])
    return ordered[lower]
|
|
|
|
|
|
def timed_call(fn, *args, **kwargs):
    """Invoke ``fn(*args, **kwargs)`` and time it.

    Returns a ``(result, elapsed_ms)`` tuple, where ``elapsed_ms`` is the
    duration in milliseconds measured with ``time.perf_counter``.
    """
    started = time.perf_counter()
    outcome = fn(*args, **kwargs)
    elapsed_ms = (time.perf_counter() - started) * 1000
    return outcome, elapsed_ms
|
|
|
|
|
|
def latency_report(name, durations_ms):
    """Print and return latency statistics for a series of timings.

    Args:
        name: Label printed in front of the statistics line.
        durations_ms: Iterable of per-call durations in milliseconds.

    Returns:
        Dict with keys ``avg``, ``p50``, ``p95``, ``p99`` (milliseconds) and
        ``ops`` (calls per second derived from the mean; ``inf`` when the
        mean is 0).

        When no samples were collected, every value is ``nan``. NaN compares
        false against any threshold, so a latency ceiling or throughput floor
        asserted on the result fails instead of passing vacuously.
    """
    samples = list(durations_ms)
    if not samples:
        # statistics.mean() raises StatisticsError on empty input. That would
        # abort a test before it can report *why* nothing was measured (e.g.
        # every worker thread recorded an error), so report and return instead.
        print(f" {name}: no samples collected")
        nan = float("nan")
        return {"avg": nan, "p50": nan, "p95": nan, "p99": nan, "ops": nan}

    p50 = percentile(samples, 50)
    p95 = percentile(samples, 95)
    p99 = percentile(samples, 99)
    avg = statistics.mean(samples)
    ops = 1000 / avg if avg > 0 else float("inf")
    print(
        f" {name}: avg={avg:.2f}ms p50={p50:.2f}ms p95={p95:.2f}ms "
        f"p99={p99:.2f}ms ops/s={ops:.0f}"
    )
    return {"avg": avg, "p50": p50, "p95": p95, "p99": p99, "ops": ops}
|
|
|
|
|
|
def make_message(peer_hash, i, content_size=100):
    """Build a synthetic message dict for the upsert benchmarks.

    ``peer_hash`` is used for both ``source_hash`` and ``peer_hash``. ``i``
    is embedded in the title and content, decides ``is_incoming`` (``i % 2``)
    and is subtracted from the current time to form ``timestamp``.
    ``content_size`` is the number of ``x`` padding characters appended to
    the content. ``hash`` and the title suffix are random hex strings.
    """
    message_hash = secrets.token_hex(16)
    title = f"Message title {i} " + secrets.token_hex(8)
    body = f"Content body {i} " + "x" * content_size
    return dict(
        hash=message_hash,
        source_hash=peer_hash,
        destination_hash="local_hash_0" * 2,
        peer_hash=peer_hash,
        state="delivered",
        progress=1.0,
        is_incoming=i % 2,
        method="direct",
        delivery_attempts=1,
        next_delivery_attempt_at=None,
        title=title,
        content=body,
        fields="{}",
        timestamp=time.time() - i,
        rssi=-50,
        snr=5.0,
        quality=3,
        is_spam=0,
        reply_to_hash=None,
    )
|
|
|
|
|
|
def make_announce(i):
    """Build a synthetic announce dict for the upsert benchmarks.

    Hashes, public key and app data are random hex strings. ``i`` selects the
    aspect (every third announce is ``lxst.telephony``, the rest are
    ``lxmf.delivery``) and varies ``rssi`` (``-50 + i % 30``) and ``quality``
    (``i % 10``).
    """
    destination = secrets.token_hex(16)
    aspect = "lxst.telephony" if i % 3 == 0 else "lxmf.delivery"
    identity = secrets.token_hex(16)
    public_key = "pubkey_" + secrets.token_hex(8)
    app_data = "appdata_" + secrets.token_hex(16)
    return dict(
        destination_hash=destination,
        aspect=aspect,
        identity_hash=identity,
        identity_public_key=public_key,
        app_data=app_data,
        rssi=-50 + (i % 30),
        snr=5.0,
        quality=i % 10,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Test class
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestPerformanceHotPaths(unittest.TestCase):
    """Latency / throughput regression tests against a seeded on-disk database.

    A single temporary database is created and seeded once per class
    (``setUpClass``) and shared by every test; it is removed again in
    ``tearDownClass``. Each test prints its measurements and asserts a fixed
    ceiling (latency) or floor (throughput).
    """

    NUM_MESSAGES = 10_000
    NUM_PEERS = 200
    NUM_ANNOUNCES = 5_000
    NUM_FAVOURITES = 100
    NUM_CONTACTS = 50

    @classmethod
    def setUpClass(cls):
        """Create the temporary database, the handlers under test and seed data."""
        cls.test_dir = tempfile.mkdtemp()
        try:
            cls.db_path = os.path.join(cls.test_dir, "perf_hotpaths.db")
            cls.db = Database(cls.db_path)
            cls.db.initialize()
            cls.handler = MessageHandler(cls.db)
            cls.announce_mgr = AnnounceManager(cls.db)

            cls._seed_data()
        except BaseException:
            # unittest does not call tearDownClass when setUpClass raises, so
            # release the database and temp directory here before re-raising.
            cls.tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Close the database (if it was opened) and remove the temp directory."""
        db = getattr(cls, "db", None)
        try:
            if db is not None:
                db.close_all()
        finally:
            # Always remove the directory, even if closing the database fails.
            shutil.rmtree(cls.test_dir, ignore_errors=True)

    @classmethod
    def _seed_data(cls):
        """Populate messages, announces, favourites and contacts for the tests."""
        print("\n--- Seeding test data ---")

        # Peers
        cls.peer_hashes = [secrets.token_hex(16) for _ in range(cls.NUM_PEERS)]
        cls.heavy_peer = cls.peer_hashes[0]

        # Messages: the first 2000 all go to heavy_peer; the remainder are
        # spread round-robin over every peer (heavy_peer included).
        print(f" Seeding {cls.NUM_MESSAGES} messages across {cls.NUM_PEERS} peers...")
        t0 = time.perf_counter()
        with cls.db.provider:
            for i in range(cls.NUM_MESSAGES):
                if i < 2000:
                    peer = cls.heavy_peer
                else:
                    peer = cls.peer_hashes[i % cls.NUM_PEERS]
                cls.db.messages.upsert_lxmf_message(make_message(peer, i))
        print(f" Done in {(time.perf_counter() - t0) * 1000:.0f}ms")

        # Announces
        print(f" Seeding {cls.NUM_ANNOUNCES} announces...")
        cls.announce_hashes = []
        t0 = time.perf_counter()
        with cls.db.provider:
            for i in range(cls.NUM_ANNOUNCES):
                data = make_announce(i)
                cls.announce_hashes.append(data["destination_hash"])
                cls.db.announces.upsert_announce(data)
        print(f" Done in {(time.perf_counter() - t0) * 1000:.0f}ms")

        # Favourites
        print(f" Seeding {cls.NUM_FAVOURITES} favourites...")
        with cls.db.provider:
            for i in range(cls.NUM_FAVOURITES):
                cls.db.announces.upsert_favourite(
                    cls.announce_hashes[i],
                    f"Fav Node {i}",
                    "lxmf.delivery",
                )

        # Contacts (for JOIN benchmarks)
        print(f" Seeding {cls.NUM_CONTACTS} contacts...")
        with cls.db.provider:
            for i in range(cls.NUM_CONTACTS):
                cls.db.contacts.add_contact(
                    name=f"Contact {i}",
                    remote_identity_hash=cls.peer_hashes[i % cls.NUM_PEERS],
                    lxmf_address=cls.peer_hashes[(i + 1) % cls.NUM_PEERS],
                )

        print("--- Seeding complete ---\n")

    @staticmethod
    def _run_threads(threads):
        """Start every thread, wait for all of them, return elapsed wall time in ms."""
        t0 = time.perf_counter()
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        return (time.perf_counter() - t0) * 1000

    # ===================================================================
    # ANNOUNCES — load, search, count
    # ===================================================================

    def test_announce_load_filtered_latency(self):
        """Load announces filtered by aspect with pagination — the NomadNet browser default view."""
        print("\n[Announce] Filtered load (aspect + pagination):")
        durations = []
        offsets = [0, 100, 500, 1000, 2000]
        for offset in offsets:
            _, ms = timed_call(
                self.announce_mgr.get_filtered_announces,
                aspect="lxmf.delivery",
                limit=50,
                offset=offset,
            )
            durations.append(ms)

        stats = latency_report("filtered_load", durations)
        self.assertLess(stats["p99"], 100, "Announce filtered load p99 > 100ms")

    def test_announce_search_latency(self):
        """Search announces by destination/identity hash substring."""
        print("\n[Announce] LIKE search:")
        search_terms = [
            secrets.token_hex(4),
            "abc",
            self.announce_hashes[0][:8],
            self.announce_hashes[2500][:10],
            "nonexistent_term_xyz",
        ]
        durations = []
        for term in search_terms:
            _, ms = timed_call(
                self.announce_mgr.get_filtered_announces,
                aspect="lxmf.delivery",
                query=term,
                limit=50,
                offset=0,
            )
            durations.append(ms)

        stats = latency_report("search", durations)
        self.assertLess(stats["p95"], 150, "Announce search p95 > 150ms")

    def test_announce_search_with_blocked(self):
        """Search with a block-list — simulates real NomadNet browser filtering."""
        print("\n[Announce] Search with blocked list:")
        blocked = [secrets.token_hex(16) for _ in range(50)]
        durations = []
        for _ in range(20):
            _, ms = timed_call(
                self.announce_mgr.get_filtered_announces,
                aspect="lxmf.delivery",
                query="abc",
                blocked_identity_hashes=blocked,
                limit=50,
                offset=0,
            )
            durations.append(ms)

        stats = latency_report("search+blocked", durations)
        self.assertLess(stats["p95"], 200, "Announce search+blocked p95 > 200ms")

    def test_announce_count_latency(self):
        """Count announces (used for pagination total)."""
        print("\n[Announce] Count:")
        durations = []
        for _ in range(30):
            _, ms = timed_call(
                self.announce_mgr.get_filtered_announces_count,
                aspect="lxmf.delivery",
            )
            durations.append(ms)

        stats = latency_report("count", durations)
        self.assertLess(stats["p95"], 100, "Announce count p95 > 100ms")

    # ===================================================================
    # FAVOURITES
    # ===================================================================

    def test_favourites_load_latency(self):
        """Load all favourites — typically displayed in sidebar."""
        print("\n[Favourites] Load all:")
        durations = []
        for _ in range(50):
            _, ms = timed_call(self.db.announces.get_favourites, "lxmf.delivery")
            durations.append(ms)

        stats = latency_report("load_favs", durations)
        self.assertLess(stats["p95"], 20, "Favourites load p95 > 20ms")

    def test_favourite_upsert_throughput(self):
        """Measure upsert throughput for favourites."""
        print("\n[Favourites] Upsert throughput:")
        durations = []
        for i in range(100):
            dest = secrets.token_hex(16)
            _, ms = timed_call(
                self.db.announces.upsert_favourite,
                dest,
                f"Bench Fav {i}",
                "lxmf.delivery",
            )
            durations.append(ms)

        stats = latency_report("upsert_fav", durations)
        self.assertGreater(stats["ops"], 500, "Favourite upsert < 500 ops/s")

    # ===================================================================
    # CONVERSATIONS — load, search
    # ===================================================================

    def test_conversations_load_latency(self):
        """Load conversation list — the main messages sidebar query."""
        print("\n[Conversations] Load list (with JOINs):")
        durations = []
        for _ in range(20):
            _, ms = timed_call(
                self.handler.get_conversations,
                "local_hash",
                limit=50,
                offset=0,
            )
            durations.append(ms)

        stats = latency_report("load_conversations", durations)
        self.assertLess(stats["p95"], 200, "Conversation list p95 > 200ms")

    def test_conversations_search_latency(self):
        """Search conversations — LIKE across titles, content, peer hashes."""
        print("\n[Conversations] Search:")
        terms = [
            "Message title 5",
            "Content body",
            "abc",
            "zzz_nope",
            self.heavy_peer[:8],
        ]
        durations = []
        for term in terms:
            _, ms = timed_call(
                self.handler.get_conversations,
                "local_hash",
                search=term,
                limit=50,
            )
            durations.append(ms)

        stats = latency_report("search_conversations", durations)
        self.assertLess(stats["p95"], 500, "Conversation search p95 > 500ms")

    def test_conversations_load_paginated(self):
        """Paginate through conversation list at various offsets."""
        print("\n[Conversations] Paginated load:")
        durations = []
        for offset in [0, 20, 50, 100, 150]:
            _, ms = timed_call(
                self.handler.get_conversations,
                "local_hash",
                limit=20,
                offset=offset,
            )
            durations.append(ms)

        stats = latency_report("paginated_conversations", durations)
        self.assertLess(stats["p95"], 300, "Paginated conversations p95 > 300ms")

    # ===================================================================
    # MESSAGES — load, search, upsert (drafts)
    # ===================================================================

    def test_message_load_latency(self):
        """Load messages for a single conversation (heavy peer with 2000+ msgs)."""
        print("\n[Messages] Load conversation messages:")
        durations = []
        offsets = [0, 100, 500, 1000, 1900]
        for offset in offsets:
            result, ms = timed_call(
                self.handler.get_conversation_messages,
                "local_hash",
                self.heavy_peer,
                limit=50,
                offset=offset,
            )
            durations.append(ms)
            # Every page must be full; the heavy peer has enough rows for all offsets.
            self.assertEqual(len(result), 50)

        stats = latency_report("load_messages", durations)
        self.assertLess(stats["p99"], 50, "Message load p99 > 50ms")

    def test_message_search_latency(self):
        """Search messages across all conversations — the global search."""
        print("\n[Messages] Global search:")
        terms = [
            "Message title 100",
            "Content body 5000",
            secrets.token_hex(4),
            "nonexistent_xyz_123",
        ]
        durations = []
        for term in terms:
            _, ms = timed_call(
                self.handler.search_messages,
                "local_hash",
                term,
            )
            durations.append(ms)

        stats = latency_report("search_messages", durations)
        self.assertLess(stats["p95"], 300, "Message search p95 > 300ms")

    def test_message_upsert_throughput(self):
        """Measure message upsert throughput — simulates saving drafts rapidly."""
        print("\n[Messages] Upsert throughput (draft saves):")
        durations = []
        peer = secrets.token_hex(16)
        for i in range(200):
            msg = make_message(peer, i + 100000)
            _, ms = timed_call(self.db.messages.upsert_lxmf_message, msg)
            durations.append(ms)

        stats = latency_report("upsert_message", durations)
        self.assertGreater(stats["ops"], 300, "Message upsert < 300 ops/s")
        self.assertLess(stats["p95"], 10, "Message upsert p95 > 10ms")

    def test_message_upsert_update_throughput(self):
        """Measure message UPDATE throughput — re-saving existing messages (state changes)."""
        print("\n[Messages] Update existing messages:")
        peer = secrets.token_hex(16)
        msgs = []
        # Insert first (untimed) so the timed loop below only measures updates.
        for i in range(100):
            msg = make_message(peer, i + 200000)
            self.db.messages.upsert_lxmf_message(msg)
            msgs.append(msg)

        durations = []
        for msg in msgs:
            msg["state"] = "failed"
            msg["content"] = "Updated content " + secrets.token_hex(16)
            _, ms = timed_call(self.db.messages.upsert_lxmf_message, msg)
            durations.append(ms)

        stats = latency_report("update_message", durations)
        self.assertGreater(stats["ops"], 300, "Message update < 300 ops/s")

    # ===================================================================
    # CONCURRENT WRITERS — contention stress
    # ===================================================================

    def test_concurrent_message_writers(self):
        """Multiple threads inserting messages simultaneously."""
        print("\n[Concurrency] Message writers:")
        num_threads = 8
        msgs_per_thread = 100
        errors = []
        all_durations = []
        lock = threading.Lock()

        def writer(thread_id):
            thread_durations = []
            peer = secrets.token_hex(16)
            for i in range(msgs_per_thread):
                msg = make_message(peer, thread_id * 10000 + i)
                try:
                    _, ms = timed_call(self.db.messages.upsert_lxmf_message, msg)
                    thread_durations.append(ms)
                except Exception as e:
                    # Collected and asserted on after all threads finish.
                    errors.append(str(e))
            with lock:
                all_durations.extend(thread_durations)

        threads = [
            threading.Thread(target=writer, args=(t,)) for t in range(num_threads)
        ]
        wall_ms = self._run_threads(threads)

        total_ops = num_threads * msgs_per_thread
        throughput = total_ops / (wall_ms / 1000)
        print(
            f" Wall time: {wall_ms:.0f}ms for {total_ops} inserts ({throughput:.0f} ops/s)"
        )
        latency_report("concurrent_write", all_durations)

        self.assertEqual(len(errors), 0, f"Writer errors: {errors[:5]}")
        floor = _min_concurrent_throughput_ops()
        self.assertGreater(
            throughput,
            floor,
            f"Concurrent write throughput < {floor} ops/s",
        )

    def test_concurrent_announce_writers(self):
        """Multiple threads upserting announces simultaneously."""
        print("\n[Concurrency] Announce writers:")
        num_threads = 6
        announces_per_thread = 100
        errors = []
        all_durations = []
        lock = threading.Lock()

        def writer(thread_id):
            thread_durations = []
            for i in range(announces_per_thread):
                data = make_announce(thread_id * 10000 + i)
                try:
                    _, ms = timed_call(self.db.announces.upsert_announce, data)
                    thread_durations.append(ms)
                except Exception as e:
                    # Collected and asserted on after all threads finish.
                    errors.append(str(e))
            with lock:
                all_durations.extend(thread_durations)

        threads = [
            threading.Thread(target=writer, args=(t,)) for t in range(num_threads)
        ]
        wall_ms = self._run_threads(threads)

        total_ops = num_threads * announces_per_thread
        throughput = total_ops / (wall_ms / 1000)
        print(
            f" Wall time: {wall_ms:.0f}ms for {total_ops} upserts ({throughput:.0f} ops/s)"
        )
        latency_report("concurrent_announce_write", all_durations)

        self.assertEqual(len(errors), 0, f"Writer errors: {errors[:5]}")
        floor = _min_concurrent_throughput_ops()
        self.assertGreater(
            throughput,
            floor,
            f"Concurrent announce write < {floor} ops/s",
        )

    def test_concurrent_read_write_contention(self):
        """Writers inserting while readers query — simulates real app usage."""
        print("\n[Contention] Mixed read/write:")
        num_writers = 4
        num_readers = 4
        ops_per_thread = 50
        write_errors = []
        read_errors = []
        write_durations = []
        read_durations = []
        lock = threading.Lock()

        def writer(thread_id):
            local_durs = []
            peer = secrets.token_hex(16)
            for i in range(ops_per_thread):
                msg = make_message(peer, thread_id * 10000 + i)
                try:
                    _, ms = timed_call(self.db.messages.upsert_lxmf_message, msg)
                    local_durs.append(ms)
                except Exception as e:
                    write_errors.append(str(e))
            with lock:
                write_durations.extend(local_durs)

        def reader(_thread_id):
            local_durs = []
            for _ in range(ops_per_thread):
                try:
                    _, ms = timed_call(
                        self.handler.get_conversations,
                        "local_hash",
                        limit=20,
                    )
                    local_durs.append(ms)
                except Exception as e:
                    read_errors.append(str(e))
            with lock:
                read_durations.extend(local_durs)

        writers = [
            threading.Thread(target=writer, args=(t,)) for t in range(num_writers)
        ]
        readers = [
            threading.Thread(target=reader, args=(t,)) for t in range(num_readers)
        ]

        wall_ms = self._run_threads(writers + readers)

        print(f" Wall time: {wall_ms:.0f}ms")
        latency_report("contention_writes", write_durations)
        latency_report("contention_reads", read_durations)

        self.assertEqual(len(write_errors), 0, f"Write errors: {write_errors[:5]}")
        self.assertEqual(len(read_errors), 0, f"Read errors: {read_errors[:5]}")

    # ===================================================================
    # LIKE SEARCH SCALING — how search degrades with data size
    # ===================================================================

    def test_like_search_scaling(self):
        """Measure how LIKE search scales across different table sizes.

        This catches missing FTS indexes or query plan regressions.
        """
        print("\n[Scaling] LIKE search across data sizes:")

        # Message search on the existing 10k dataset
        _, ms_msg = timed_call(
            self.handler.search_messages,
            "local_hash",
            "Content body",
        )
        print(f" Message LIKE search ({self.NUM_MESSAGES} rows): {ms_msg:.2f}ms")
        self.assertLess(ms_msg, 500, "Message LIKE search > 500ms on 10k rows")

        # Announce search on the existing 5k dataset
        _, ms_ann = timed_call(
            self.announce_mgr.get_filtered_announces,
            aspect="lxmf.delivery",
            query="abc",
            limit=50,
        )
        print(f" Announce LIKE search ({self.NUM_ANNOUNCES} rows): {ms_ann:.2f}ms")
        self.assertLess(ms_ann, 200, "Announce LIKE search > 200ms on 5k rows")

        # Contacts search
        _, ms_con = timed_call(self.db.contacts.get_contacts, search="Contact")
        print(f" Contacts LIKE search ({self.NUM_CONTACTS} rows): {ms_con:.2f}ms")
        self.assertLess(ms_con, 50, "Contacts LIKE search > 50ms")

    # ===================================================================
    # N+1 BATCH OPERATIONS — transaction wrapping regression tests
    # ===================================================================

    def test_mark_conversations_as_read_batch(self):
        """mark_conversations_as_read should be fast for large batches (transaction-wrapped)."""
        print("\n[Batch] mark_conversations_as_read:")
        hashes = [secrets.token_hex(16) for _ in range(200)]
        durations = []
        for _ in range(5):
            _, ms = timed_call(self.db.messages.mark_conversations_as_read, hashes)
            durations.append(ms)

        stats = latency_report("mark_read_200", durations)
        self.assertLess(stats["p95"], 50, "mark_conversations_as_read(200) p95 > 50ms")

    def test_mark_all_notifications_as_viewed_batch(self):
        """mark_all_notifications_as_viewed should be fast for large batches."""
        print("\n[Batch] mark_all_notifications_as_viewed:")
        hashes = [secrets.token_hex(16) for _ in range(200)]
        durations = []
        for _ in range(5):
            _, ms = timed_call(
                self.db.messages.mark_all_notifications_as_viewed, hashes
            )
            durations.append(ms)

        stats = latency_report("mark_viewed_200", durations)
        self.assertLess(
            stats["p95"], 50, "mark_all_notifications_as_viewed(200) p95 > 50ms"
        )

    def test_move_conversations_to_folder_batch(self):
        """move_conversations_to_folder should be fast for large batches."""
        print("\n[Batch] move_conversations_to_folder:")
        self.db.messages.create_folder("perf_test_folder")
        folders = self.db.messages.get_all_folders()
        # NOTE(review): takes the first folder returned, which is presumably
        # (but not necessarily) the one created above — confirm ordering.
        folder_id = folders[0]["id"]

        hashes = [secrets.token_hex(16) for _ in range(200)]
        durations = []
        for _ in range(5):
            _, ms = timed_call(
                self.db.messages.move_conversations_to_folder, hashes, folder_id
            )
            durations.append(ms)

        stats = latency_report("move_folder_200", durations)
        self.assertLess(
            stats["p95"], 50, "move_conversations_to_folder(200) p95 > 50ms"
        )

    # ===================================================================
    # INDEX VERIFICATION — confirm new indexes are used
    # ===================================================================

    def test_indexes_exist(self):
        """Verify critical indexes exist in the schema."""
        print("\n[Indexes] Checking critical indexes exist:")
        rows = self.db.provider.fetchall(
            "SELECT name FROM sqlite_master WHERE type='index'"
        )
        index_names = {r["name"] for r in rows}

        expected = [
            "idx_contacts_lxmf_address",
            "idx_contacts_lxst_address",
            "idx_notifications_is_viewed",
            "idx_map_drawings_identity_hash",
            "idx_map_drawings_identity_name",
            "idx_voicemails_is_read",
            "idx_archived_pages_created_at",
            "idx_lxmf_messages_state_peer",
            "idx_lxmf_messages_peer_hash",
            "idx_lxmf_messages_peer_ts",
            "idx_announces_updated_at",
            "idx_announces_aspect",
        ]
        for idx in expected:
            self.assertIn(idx, index_names, f"Missing index: {idx}")
            print(f" {idx}: OK")

    def test_pragmas_applied(self):
        """Verify performance PRAGMAs are active."""
        print("\n[PRAGMAs] Checking applied PRAGMAs:")
        journal = self.db._get_pragma_value("journal_mode")
        print(f" journal_mode: {journal}")
        self.assertEqual(journal, "wal")

        sync = self.db._get_pragma_value("synchronous")
        print(f" synchronous: {sync}")
        self.assertEqual(sync, 1)  # NORMAL = 1

        temp_store = self.db._get_pragma_value("temp_store")
        print(f" temp_store: {temp_store}")
        self.assertEqual(temp_store, 2)  # MEMORY = 2

        cache_size = self.db._get_pragma_value("cache_size")
        print(f" cache_size: {cache_size}")
        # Negative cache_size is expressed in KiB, so "<= -8000" means at least ~8 MB.
        self.assertLessEqual(cache_size, -8000)

    # ===================================================================
    # QUERY PLAN CHECKS — confirm indexes are actually used
    # ===================================================================

    def test_query_plan_messages_by_peer(self):
        """The most common message query should use peer_hash index."""
        print("\n[Query Plan] Messages by peer_hash:")
        rows = self.db.provider.fetchall(
            "EXPLAIN QUERY PLAN SELECT * FROM lxmf_messages WHERE peer_hash = ? ORDER BY id DESC LIMIT 50",
            ("test",),
        )
        plan = " ".join(str(r["detail"]) for r in rows)
        print(f" {plan}")
        self.assertIn("idx_lxmf_messages_peer_hash", plan.lower())

    def test_query_plan_announces_by_aspect(self):
        """Announce filtering by aspect should use the aspect index."""
        print("\n[Query Plan] Announces by aspect:")
        rows = self.db.provider.fetchall(
            "EXPLAIN QUERY PLAN SELECT * FROM announces WHERE aspect = ? ORDER BY updated_at DESC LIMIT 50",
            ("lxmf.delivery",),
        )
        plan = " ".join(str(r["detail"]) for r in rows)
        print(f" {plan}")
        self.assertIn("idx_announces_aspect", plan.lower())

    def test_query_plan_failed_messages_state_peer(self):
        """The failed_count subquery should use the state+peer composite index."""
        print("\n[Query Plan] Failed messages (state, peer_hash):")
        rows = self.db.provider.fetchall(
            "EXPLAIN QUERY PLAN SELECT COUNT(*) FROM lxmf_messages WHERE state = 'failed' AND peer_hash = ?",
            ("test",),
        )
        plan = " ".join(str(r["detail"]) for r in rows)
        print(f" {plan}")
        self.assertIn("idx_lxmf_messages_state_peer", plan.lower())

    def test_query_plan_notifications_unread(self):
        """Notification unread filter should use the is_viewed index."""
        print("\n[Query Plan] Notifications unread:")
        rows = self.db.provider.fetchall(
            "EXPLAIN QUERY PLAN SELECT * FROM notifications WHERE is_viewed = 0 ORDER BY timestamp DESC LIMIT 50",
        )
        plan = " ".join(str(r["detail"]) for r in rows)
        print(f" {plan}")
        self.assertIn("idx_notifications_is_viewed", plan.lower())
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the test suite when this module is executed directly as a script.
    unittest.main()
|