refactor(tests): update concurrent writer tests with dynamic throughput floor based on environment variables

This commit is contained in:
Ivan
2026-04-01 11:49:30 +03:00
parent d5e4afc42b
commit fda9f18677
+33 -3
View File
@@ -11,7 +11,9 @@ Metrics collected:
- Concurrent writer contention
- LIKE-search scaling
All tests have hard assertions so regressions fail CI.
Latency and single-thread throughput tests use fixed ceilings/floors. Concurrent
writer tests use the same on non-CI hardware; under CI a lower throughput
floor applies unless MESHCHATX_PERF_MIN_CONCURRENT_OPS is set.
"""
import os
@@ -33,6 +35,24 @@ from meshchatx.src.backend.message_handler import MessageHandler
# ---------------------------------------------------------------------------
def _min_concurrent_throughput_ops():
"""Minimum aggregate ops/s for concurrent writer tests.
Local runs use a strict floor. CI runners are often CPU-starved or on slow
shared storage; wall-clock throughput there is not comparable to dev
hardware. Override with MESHCHATX_PERF_MIN_CONCURRENT_OPS.
"""
raw = os.environ.get("MESHCHATX_PERF_MIN_CONCURRENT_OPS")
if raw is not None and raw.strip() != "":
try:
return max(1, int(raw, 10))
except ValueError:
pass
if os.environ.get("CI"):
return 20
return 100
def percentile(data, pct):
"""Return the pct-th percentile of sorted data."""
if not data:
@@ -476,7 +496,12 @@ class TestPerformanceHotPaths(unittest.TestCase):
latency_report("concurrent_write", all_durations)
self.assertEqual(len(errors), 0, f"Writer errors: {errors[:5]}")
self.assertGreater(throughput, 100, "Concurrent write throughput < 100 ops/s")
floor = _min_concurrent_throughput_ops()
self.assertGreater(
throughput,
floor,
f"Concurrent write throughput < {floor} ops/s",
)
def test_concurrent_announce_writers(self):
"""Multiple threads upserting announces simultaneously."""
@@ -517,7 +542,12 @@ class TestPerformanceHotPaths(unittest.TestCase):
latency_report("concurrent_announce_write", all_durations)
self.assertEqual(len(errors), 0, f"Writer errors: {errors[:5]}")
self.assertGreater(throughput, 100, "Concurrent announce write < 100 ops/s")
floor = _min_concurrent_throughput_ops()
self.assertGreater(
throughput,
floor,
f"Concurrent announce write < {floor} ops/s",
)
def test_concurrent_read_write_contention(self):
"""Writers inserting while readers query — simulates real app usage."""