Files
synapse/tests/handlers/test_worker_lock.py
Jason Little 3f58bc50df fix: Cap WorkerLock timeout intervals to 60 seconds (#19394)
Fixes the symptoms of https://github.com/element-hq/synapse/issues/19315
/ https://github.com/element-hq/synapse/issues/19588 but not the
underlying reason causing the number to grow so large in the first
place.

```
ValueError: Exceeds the limit (4300 digits) for integer string conversion; use sys.set_int_max_str_digits() to increase the limit
```

Copied from the original pull request on [Famedly's Synapse
repo](https://github.com/famedly/synapse/pull/221) (with some edits):

Basing the retry interval around 5 seconds leaves a big window of
waiting, especially as this window is doubled on each retry, during which
another worker could be making progress but cannot.

Right now, the retry interval in seconds looks like `[0.2, 5, 10, 20,
40, 80, 160, 320, (continues to double)]` after which logging should
start about excessive times and (relatively quickly) end up with an
extremely large retry interval with an unrealistic expectation past the
heat death of the universe. 1 year in seconds = 31,536,000.

With this change, retry intervals in seconds should look more like:

```
[
0.2, 
0.4, 
0.8, 
1.6, 
3.2, 
6.4, 
12.8, 
25.6, 
51.2, 
60, < never goes higher than this
]
```

Logging about excessive wait times will start at 10 minutes.

<details>
<summary>Previous breakdown when we were using 15 minutes</summary>

```
[
0.2, 
0.4, 
0.8, 
1.6, 
3.2, 
6.4, 
12.8, 
25.6, 
51.2, 
102.4,  # 1.7 minutes
204.8,  # 3.41 minutes
409.6,  # 6.83 minutes
819.2,  # 13.65 minutes  < logging about excessive times will start here, 13th iteration
900,  # 15 minutes < never goes higher than this
]
```
</details>

Further suggested work in this area could be to define the cap, the
retry interval starting point, and the multiplier depending on how
frequently this lock should be checked. See data below for reasons why.
Increasing the jitter range may also be a good idea.

---------

Co-authored-by: Eric Eastwood <madlittlemods@gmail.com>
2026-05-05 14:40:17 +01:00

257 lines
9.6 KiB
Python

#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2023 The Matrix.org Foundation C.I.C.
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
import logging
import platform
from twisted.internet import defer
from twisted.internet.testing import MemoryReactor
from synapse.server import HomeServer
from synapse.storage.databases.main.lock import _RENEWAL_INTERVAL
from synapse.util.clock import Clock
from synapse.util.duration import Duration
from tests import unittest
from tests.replication._base import BaseMultiWorkerStreamTestCase
from tests.utils import test_timeout
logger = logging.getLogger(__name__)
class WorkerLockTestCase(unittest.HomeserverTestCase):
    """Tests for acquiring, contending, and retrying worker locks on a single worker."""

    def prepare(
        self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
    ) -> None:
        self.worker_lock_handler = self.hs.get_worker_locks_handler()
        self.store = self.hs.get_datastores().main

    def test_wait_for_lock_locally(self) -> None:
        """Test waiting for a lock on a single worker"""
        lock1 = self.worker_lock_handler.acquire_lock("name", "key")
        self.get_success(lock1.__aenter__())

        lock2 = self.worker_lock_handler.acquire_lock("name", "key")
        d2 = defer.ensureDeferred(lock2.__aenter__())
        # `lock2` must wait: `lock1` still holds the lock
        self.assertNoResult(d2)

        self.get_success(lock1.__aexit__(None, None, None))

        self.get_success(d2)
        self.get_success(lock2.__aexit__(None, None, None))

    def test_timeouts_for_lock_locally(self) -> None:
        """
        Test that we regularly retry to reacquire locks.

        This is a regression test to make sure the lock retry time doesn't balloon to a value
        so large it can't even be printed reliably anymore.
        """
        # Create and acquire the first lock
        lock1 = self.worker_lock_handler.acquire_lock("name", "key")
        self.get_success(lock1.__aenter__())

        # Create and try to acquire the second lock
        lock2 = self.worker_lock_handler.acquire_lock("name", "key")
        d2 = defer.ensureDeferred(lock2.__aenter__())
        # Make sure we haven't acquired the lock yet (`lock1` still holds it)
        self.assertNoResult(d2)

        # Advance time by an hour (some duration that would previously cause our timeout
        # to balloon if it weren't constrained). Max back-off (saturate)
        #
        # Note: We use `_pump_by` instead of `pump`/`advance` as the `Lock` has an
        # internal background looping call that runs every 30 seconds
        # (`_RENEWAL_INTERVAL`) to renew the `Lock` and push its "drop timeout" value
        # further out by 2 minutes (`_LOCK_TIMEOUT_MS`). The `Lock` will prematurely
        # drop if this renewal is not allowed to run, which sours the test.
        self._pump_by(amount=Duration(hours=1), by=_RENEWAL_INTERVAL)

        # Make sure we haven't acquired the `lock2` yet (`lock1` still holds it)
        self.assertNoResult(d2)

        # Release the first lock (`lock1`). The second lock (`lock2`) should be
        # automatically acquired by the `pump()` inside `get_success()`
        self.get_success(lock1.__aexit__(None, None, None))

        # We should now have the lock
        self.successResultOf(d2)

    def _pump_by(
        self,
        *,
        amount: Duration = Duration(seconds=0),
        by: Duration = Duration(seconds=0.1),
    ) -> None:
        """
        Like `self.pump()` but you can specify the time increment to advance with until
        you reach the time amount.

        Unlike `self.pump()`, this doesn't multiply the time at all.

        Args:
            amount: The total amount of time to advance
            by: The time increment to advance time by until we reach the `amount`
        """
        end_time_s = self.reactor.seconds() + amount.as_secs()
        while self.reactor.seconds() < end_time_s:
            self.reactor.advance(by.as_secs())

    def test_lock_contention(self) -> None:
        """Test lock contention when a lot of locks wait on a single worker"""
        nb_locks_to_test = 500

        current_machine = platform.machine().lower()
        if current_machine.startswith("riscv"):
            # RISC-V CI runners are slow; give the test more headroom and log the
            # adjustment for visibility in CI logs.
            timeout_seconds = 15
            logger.info(
                "Detected RISC-V architecture (%s). "
                "Adjusting test_lock_contention: timeout=%ss",
                current_machine,
                timeout_seconds,
            )
        else:
            # It takes around 0.5s on a 5+ years old laptop
            timeout_seconds = 5

        with test_timeout(timeout_seconds):
            d = self._take_locks(nb_locks_to_test)
            self.assertEqual(self.get_success(d), nb_locks_to_test)

    async def _take_locks(self, nb_locks: int) -> int:
        """Sequentially acquire and release `nb_locks` locks on the same name/key,
        returning how many were successfully taken."""
        locks = [
            self.hs.get_worker_locks_handler().acquire_lock("test_lock", "")
            for _ in range(nb_locks)
        ]

        nb_locks_taken = 0
        for lock in locks:
            async with lock:
                nb_locks_taken += 1

        return nb_locks_taken
class WorkerLockWorkersTestCase(BaseMultiWorkerStreamTestCase):
    """Tests for acquiring and retrying worker locks across multiple workers."""

    def prepare(
        self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
    ) -> None:
        self.main_worker_lock_handler = self.hs.get_worker_locks_handler()

    def test_wait_for_lock_worker(self) -> None:
        """Test waiting for a lock on another worker"""
        worker = self.make_worker_hs(
            "synapse.app.generic_worker",
            extra_config={
                "redis": {"enabled": True},
            },
        )
        worker_lock_handler = worker.get_worker_locks_handler()

        lock1 = self.main_worker_lock_handler.acquire_lock("name", "key")
        self.get_success(lock1.__aenter__())

        lock2 = worker_lock_handler.acquire_lock("name", "key")
        d2 = defer.ensureDeferred(lock2.__aenter__())
        # `lock2` must wait: the main process (`lock1`) still holds the lock
        self.assertNoResult(d2)

        self.get_success(lock1.__aexit__(None, None, None))

        self.get_success(d2)
        self.get_success(lock2.__aexit__(None, None, None))

    def test_timeouts_for_lock_worker(self) -> None:
        """
        Test that we regularly retry to reacquire locks.

        This is a regression test to make sure the lock retry time doesn't balloon to a value
        so large it can't even be printed reliably anymore.
        """
        worker = self.make_worker_hs(
            "synapse.app.generic_worker",
            extra_config={
                "redis": {"enabled": True},
            },
        )
        worker_lock_handler = worker.get_worker_locks_handler()

        # Create and acquire the first lock on the main process
        lock1 = self.main_worker_lock_handler.acquire_lock("name", "key")
        self.get_success(lock1.__aenter__())

        # Create and try to acquire the second lock on the worker
        lock2 = worker_lock_handler.acquire_lock("name", "key")
        d2 = defer.ensureDeferred(lock2.__aenter__())
        # Make sure we haven't acquired the lock yet (`lock1` still holds it)
        self.assertNoResult(d2)

        # Advance time by an hour (some duration that would previously cause our timeout
        # to balloon if it weren't constrained). Max back-off (saturate)
        #
        # Note: We use `_pump_by` instead of `pump`/`advance` as the `Lock` has an
        # internal background looping call that runs every 30 seconds
        # (`_RENEWAL_INTERVAL`) to renew the `Lock` and push its "drop timeout" value
        # further out by 2 minutes (`_LOCK_TIMEOUT_MS`). The `Lock` will prematurely
        # drop if this renewal is not allowed to run, which sours the test.
        self._pump_by(amount=Duration(hours=1), by=_RENEWAL_INTERVAL)

        # Make sure we haven't acquired the `lock2` yet (`lock1` still holds it)
        self.assertNoResult(d2)

        # Release the first lock (`lock1`). The second lock (`lock2`) should be
        # automatically acquired by the `pump()` inside `get_success()`
        self.get_success(lock1.__aexit__(None, None, None))

        # We should now have the lock
        self.successResultOf(d2)

    def _pump_by(
        self,
        *,
        amount: Duration = Duration(seconds=0),
        by: Duration = Duration(seconds=0.1),
    ) -> None:
        """
        Like `self.pump()` but you can specify the time increment to advance with until
        you reach the time amount.

        Unlike `self.pump()`, this doesn't multiply the time at all.

        Args:
            amount: The total amount of time to advance
            by: The time increment to advance time by until we reach the `amount`
        """
        end_time_s = self.reactor.seconds() + amount.as_secs()
        while self.reactor.seconds() < end_time_s:
            self.reactor.advance(by.as_secs())