diff --git a/tests/backend/test_performance_hotpaths.py b/tests/backend/test_performance_hotpaths.py index da6c82b..1c7f41c 100644 --- a/tests/backend/test_performance_hotpaths.py +++ b/tests/backend/test_performance_hotpaths.py @@ -11,7 +11,9 @@ Metrics collected: - Concurrent writer contention - LIKE-search scaling -All tests have hard assertions so regressions fail CI. +Latency and single-thread throughput tests use fixed ceilings/floors. Concurrent +writer tests use the same on non-CI hardware; under CI a lower throughput +floor applies unless MESHCHATX_PERF_MIN_CONCURRENT_OPS is set. """ import os @@ -33,6 +35,24 @@ from meshchatx.src.backend.message_handler import MessageHandler # --------------------------------------------------------------------------- +def _min_concurrent_throughput_ops(): + """Minimum aggregate ops/s for concurrent writer tests. + + Local runs use a strict floor. CI runners are often CPU-starved or on slow + shared storage; wall-clock throughput there is not comparable to dev + hardware. Override with MESHCHATX_PERF_MIN_CONCURRENT_OPS. + """ + raw = os.environ.get("MESHCHATX_PERF_MIN_CONCURRENT_OPS") + if raw is not None and raw.strip() != "": + try: + return max(1, int(raw, 10)) + except ValueError: + pass + if os.environ.get("CI"): + return 20 + return 100 + + def percentile(data, pct): """Return the pct-th percentile of sorted data.""" if not data: @@ -476,7 +496,12 @@ class TestPerformanceHotPaths(unittest.TestCase): latency_report("concurrent_write", all_durations) self.assertEqual(len(errors), 0, f"Writer errors: {errors[:5]}") - self.assertGreater(throughput, 100, "Concurrent write throughput < 100 ops/s") + floor = _min_concurrent_throughput_ops() + self.assertGreater( + throughput, + floor, + f"Concurrent write throughput < {floor} ops/s", + ) def test_concurrent_announce_writers(self): """Multiple threads upserting announces simultaneously.""" @@ -517,7 +542,12 @@ class TestPerformanceHotPaths(unittest.TestCase): latency_report("concurrent_announce_write", all_durations) self.assertEqual(len(errors), 0, f"Writer errors: {errors[:5]}") - self.assertGreater(throughput, 100, "Concurrent announce write < 100 ops/s") + floor = _min_concurrent_throughput_ops() + self.assertGreater( + throughput, + floor, + f"Concurrent announce write < {floor} ops/s", + ) def test_concurrent_read_write_contention(self): """Writers inserting while readers query — simulates real app usage."""