diff --git a/tests/backend/test_performance_hotpaths.py b/tests/backend/test_performance_hotpaths.py index 1c7f41c..2b9ea2f 100644 --- a/tests/backend/test_performance_hotpaths.py +++ b/tests/backend/test_performance_hotpaths.py @@ -11,9 +11,11 @@ Metrics collected: - Concurrent writer contention - LIKE-search scaling -Latency and single-thread throughput tests use fixed ceilings/floors. Concurrent -writer tests use the same on non-CI hardware; under CI a lower throughput -floor applies unless MESHCHATX_PERF_MIN_CONCURRENT_OPS is set. +Latency and single-thread throughput tests use fixed ceilings/floors. Message +upsert throughput uses 300 ops/s locally; under CI a lower floor applies unless +MESHCHATX_PERF_MIN_MESSAGE_UPSERT_OPS is set. Concurrent writer tests use the +same on non-CI hardware; under CI a lower throughput floor applies unless +MESHCHATX_PERF_MIN_CONCURRENT_OPS is set. """ import os @@ -35,6 +37,24 @@ from meshchatx.src.backend.message_handler import MessageHandler # --------------------------------------------------------------------------- +def _min_message_upsert_throughput_ops(): + """Minimum single-thread upsert/update ops/s for regression tests. + + Local runs use a strict floor (300). CI runners are often noisy; a dip + just below 300 is not a regression. Override with + MESHCHATX_PERF_MIN_MESSAGE_UPSERT_OPS. + """ + raw = os.environ.get("MESHCHATX_PERF_MIN_MESSAGE_UPSERT_OPS") + if raw is not None and raw.strip() != "": + try: + return max(1, int(raw, 10)) + except ValueError: + pass + if os.environ.get("CI"): + return 250 + return 300 + + def _min_concurrent_throughput_ops(): """Minimum aggregate ops/s for concurrent writer tests. @@ -429,7 +449,12 @@ class TestPerformanceHotPaths(unittest.TestCase): durations.append(ms) stats = latency_report("upsert_message", durations) - self.assertGreater(stats["ops"], 300, "Message upsert < 300 ops/s") + min_ops = _min_message_upsert_throughput_ops() + self.assertGreater( + stats["ops"], + min_ops, + f"Message upsert < {min_ops} ops/s", + ) self.assertLess(stats["p95"], 10, "Message upsert p95 > 10ms") def test_message_upsert_update_throughput(self): @@ -450,7 +475,12 @@ class TestPerformanceHotPaths(unittest.TestCase): durations.append(ms) stats = latency_report("update_message", durations) - self.assertGreater(stats["ops"], 300, "Message update < 300 ops/s") + min_ops = _min_message_upsert_throughput_ops() + self.assertGreater( + stats["ops"], + min_ops, + f"Message update < {min_ops} ops/s", + ) # =================================================================== # CONCURRENT WRITERS — contention stress