mirror of
https://github.com/element-hq/synapse.git
synced 2026-05-13 17:13:11 +00:00
fix: Cap WorkerLock timeout intervals to 60 seconds (#19394)
Fixes the symptoms of https://github.com/element-hq/synapse/issues/19315
/ https://github.com/element-hq/synapse/issues/19588 but not the
underlying reason causing the number to grow so large in the first
place.
```
ValueError: Exceeds the limit (4300 digits) for integer string conversion; use sys.set_int_max_str_digits() to increase the limit
```
Copied from the original pull request on [Famedly's Synapse
repo](https://github.com/famedly/synapse/pull/221) (with some edits):
Basing the time interval around 5 seconds leaves a big window of
waiting — especially as this window is doubled on each retry — during
which another worker could be making progress but cannot.
Right now, the retry interval in seconds looks like `[0.2, 5, 10, 20,
40, 80, 160, 320, (continues to double)]`, after which logging about
excessive times should start, and the interval (relatively quickly)
grows to an extremely large value — an unrealistic wait past the
heat death of the universe. (1 year in seconds = 31,536,000.)
With this change, retry intervals in seconds should look more like:
```
[
0.2,
0.4,
0.8,
1.6,
3.2,
6.4,
12.8,
25.6,
51.2,
60, < never goes higher than this
]
```
Logging about excessive wait times will start at 10 minutes.
<details>
<summary>Previous breakdown when we were using 15 minutes</summary>
```
[
0.2,
0.4,
0.8,
1.6,
3.2,
6.4,
12.8,
25.6,
51.2,
102.4, # 1.7 minutes
204.8, # 3.41 minutes
409.6, # 6.83 minutes
819.2, # 13.65 minutes < logging about excessive times will start here, 13th iteration
900, # 15 minutes < never goes higher than this
]
```
</details>
Further suggested work in this area could be to define the cap, the
retry interval starting point and the multiplier depending on how
frequently this lock should be checked. See data below for reasons why.
Increasing the jitter range may also be a good idea.
---------
Co-authored-by: Eric Eastwood <madlittlemods@gmail.com>
(cherry picked from commit 3f58bc50df)
This commit is contained in:
committed by
Olivier 'reivilibre
parent
16863c87d5
commit
0eefdbcb95
@@ -0,0 +1 @@
|
||||
Capped the `WorkerLock` time out interval to a maximum of 60 seconds to prevent dealing with excessively long numbers. Contributed by Famedly.
|
||||
@@ -54,6 +54,9 @@ logger = logging.getLogger(__name__)
|
||||
# will not disappear under our feet as long as we don't delete the room.
|
||||
NEW_EVENT_DURING_PURGE_LOCK_NAME = "new_event_during_purge_lock"
|
||||
|
||||
WORKER_LOCK_MAX_RETRY_INTERVAL = Duration(seconds=60)
|
||||
WORKER_LOCK_EXCESSIVE_WAITING_WARN_DURATION = Duration(minutes=10)
|
||||
|
||||
|
||||
class WorkerLocksHandler:
|
||||
"""A class for waiting on taking out locks, rather than using the storage
|
||||
@@ -206,9 +209,10 @@ class WaitingLock:
|
||||
lock_name: str
|
||||
lock_key: str
|
||||
write: bool | None
|
||||
start_ts_ms: int = 0
|
||||
deferred: "defer.Deferred[None]" = attr.Factory(defer.Deferred)
|
||||
_inner_lock: Lock | None = None
|
||||
_retry_interval: float = 0.1
|
||||
_timeout_interval: float = 0.1
|
||||
_lock_span: "opentracing.Scope" = attr.Factory(
|
||||
lambda: start_active_span("WaitingLock.lock")
|
||||
)
|
||||
@@ -220,6 +224,7 @@ class WaitingLock:
|
||||
self.deferred.callback(None)
|
||||
|
||||
async def __aenter__(self) -> None:
|
||||
self.start_ts_ms = self.clock.time_msec()
|
||||
self._lock_span.__enter__()
|
||||
|
||||
with start_active_span("WaitingLock.waiting_for_lock"):
|
||||
@@ -240,19 +245,44 @@ class WaitingLock:
|
||||
break
|
||||
|
||||
try:
|
||||
# Wait until the we get notified the lock might have been
|
||||
# Wait until the notification that the lock might have been
|
||||
# released (by the deferred being resolved). We also
|
||||
# periodically wake up in case the lock was released but we
|
||||
# periodically wake up in case the lock was released, but we
|
||||
# weren't notified.
|
||||
with PreserveLoggingContext():
|
||||
timeout = self._get_next_retry_interval()
|
||||
await timeout_deferred(
|
||||
deferred=self.deferred,
|
||||
timeout=timeout,
|
||||
timeout=self._timeout_interval,
|
||||
clock=self.clock,
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
except defer.TimeoutError:
|
||||
# Only increment the timeout value if this was an actual timeout
|
||||
# (defer.TimeoutError)
|
||||
self._increment_timeout_interval()
|
||||
|
||||
now_ms = self.clock.time_msec()
|
||||
time_spent_trying_to_lock = Duration(
|
||||
milliseconds=now_ms - self.start_ts_ms
|
||||
)
|
||||
if (
|
||||
time_spent_trying_to_lock.as_millis()
|
||||
> WORKER_LOCK_EXCESSIVE_WAITING_WARN_DURATION.as_millis()
|
||||
):
|
||||
logger.warning(
|
||||
"(WaitingLock (%s, %s)) Time spent waiting to acquire lock "
|
||||
"is getting excessive: %ss. There may be a deadlock.",
|
||||
self.lock_name,
|
||||
self.lock_key,
|
||||
time_spent_trying_to_lock.as_secs(),
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"Caught an exception while waiting on WaitingLock(lock_name=%s, lock_key=%s): %r",
|
||||
self.lock_name,
|
||||
self.lock_key,
|
||||
e,
|
||||
)
|
||||
|
||||
return await self._inner_lock.__aenter__()
|
||||
|
||||
@@ -273,15 +303,14 @@ class WaitingLock:
|
||||
|
||||
return r
|
||||
|
||||
def _get_next_retry_interval(self) -> float:
|
||||
next = self._retry_interval
|
||||
self._retry_interval = max(5, next * 2)
|
||||
if self._retry_interval > Duration(minutes=10).as_secs(): # >7 iterations
|
||||
logger.warning(
|
||||
"Lock timeout is getting excessive: %ss. There may be a deadlock.",
|
||||
self._retry_interval,
|
||||
)
|
||||
return next * random.uniform(0.9, 1.1)
|
||||
def _increment_timeout_interval(self) -> float:
|
||||
next_interval = self._timeout_interval
|
||||
next_interval = min(WORKER_LOCK_MAX_RETRY_INTERVAL.as_secs(), next_interval * 2)
|
||||
|
||||
# The jitter value is maintained for the timeout, to help avoid a "thundering
|
||||
# herd" situation when all locks may time out at the same time.
|
||||
self._timeout_interval = next_interval * random.uniform(0.9, 1.1)
|
||||
return self._timeout_interval
|
||||
|
||||
|
||||
@attr.s(auto_attribs=True, eq=False)
|
||||
@@ -294,10 +323,11 @@ class WaitingMultiLock:
|
||||
store: LockStore
|
||||
handler: WorkerLocksHandler
|
||||
|
||||
start_ts_ms: int = 0
|
||||
deferred: "defer.Deferred[None]" = attr.Factory(defer.Deferred)
|
||||
|
||||
_inner_lock_cm: AsyncContextManager | None = None
|
||||
_retry_interval: float = 0.1
|
||||
_timeout_interval: float = 0.1
|
||||
_lock_span: "opentracing.Scope" = attr.Factory(
|
||||
lambda: start_active_span("WaitingLock.lock")
|
||||
)
|
||||
@@ -309,6 +339,7 @@ class WaitingMultiLock:
|
||||
self.deferred.callback(None)
|
||||
|
||||
async def __aenter__(self) -> None:
|
||||
self.start_ts_ms = self.clock.time_msec()
|
||||
self._lock_span.__enter__()
|
||||
|
||||
with start_active_span("WaitingLock.waiting_for_lock"):
|
||||
@@ -324,19 +355,42 @@ class WaitingMultiLock:
|
||||
break
|
||||
|
||||
try:
|
||||
# Wait until the we get notified the lock might have been
|
||||
# Wait until the notification that the lock might have been
|
||||
# released (by the deferred being resolved). We also
|
||||
# periodically wake up in case the lock was released but we
|
||||
# periodically wake up in case the lock was released, but we
|
||||
# weren't notified.
|
||||
with PreserveLoggingContext():
|
||||
timeout = self._get_next_retry_interval()
|
||||
await timeout_deferred(
|
||||
deferred=self.deferred,
|
||||
timeout=timeout,
|
||||
timeout=self._timeout_interval,
|
||||
clock=self.clock,
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
except defer.TimeoutError:
|
||||
# Only increment the timeout value if this was an actual timeout
|
||||
# (defer.TimeoutError)
|
||||
self._increment_timeout_interval()
|
||||
|
||||
now_ms = self.clock.time_msec()
|
||||
time_spent_trying_to_lock = Duration(
|
||||
milliseconds=now_ms - self.start_ts_ms
|
||||
)
|
||||
if (
|
||||
time_spent_trying_to_lock.as_millis()
|
||||
> WORKER_LOCK_EXCESSIVE_WAITING_WARN_DURATION.as_millis()
|
||||
):
|
||||
logger.warning(
|
||||
"(WaitingMultiLock (%r)) Time spent waiting to acquire lock "
|
||||
"is getting excessive: %ss. There may be a deadlock.",
|
||||
self.lock_names,
|
||||
time_spent_trying_to_lock.as_secs(),
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"Caught an exception while waiting on WaitingMultiLock(lock_names=%r): %r",
|
||||
self.lock_names,
|
||||
e,
|
||||
)
|
||||
|
||||
assert self._inner_lock_cm
|
||||
await self._inner_lock_cm.__aenter__()
|
||||
@@ -360,12 +414,11 @@ class WaitingMultiLock:
|
||||
|
||||
return r
|
||||
|
||||
def _get_next_retry_interval(self) -> float:
|
||||
next = self._retry_interval
|
||||
self._retry_interval = max(5, next * 2)
|
||||
if self._retry_interval > Duration(minutes=10).as_secs(): # >7 iterations
|
||||
logger.warning(
|
||||
"Lock timeout is getting excessive: %ss. There may be a deadlock.",
|
||||
self._retry_interval,
|
||||
)
|
||||
return next * random.uniform(0.9, 1.1)
|
||||
def _increment_timeout_interval(self) -> float:
|
||||
next_interval = self._timeout_interval
|
||||
next_interval = min(WORKER_LOCK_MAX_RETRY_INTERVAL.as_secs(), next_interval * 2)
|
||||
|
||||
# The jitter value is maintained for the timeout, to help avoid a "thundering
|
||||
# herd" situation when all locks may time out at the same time.
|
||||
self._timeout_interval = next_interval * random.uniform(0.9, 1.1)
|
||||
return self._timeout_interval
|
||||
|
||||
@@ -26,7 +26,9 @@ from twisted.internet import defer
|
||||
from twisted.internet.testing import MemoryReactor
|
||||
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.databases.main.lock import _RENEWAL_INTERVAL
|
||||
from synapse.util.clock import Clock
|
||||
from synapse.util.duration import Duration
|
||||
|
||||
from tests import unittest
|
||||
from tests.replication._base import BaseMultiWorkerStreamTestCase
|
||||
@@ -40,6 +42,7 @@ class WorkerLockTestCase(unittest.HomeserverTestCase):
|
||||
self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
|
||||
) -> None:
|
||||
self.worker_lock_handler = self.hs.get_worker_locks_handler()
|
||||
self.store = self.hs.get_datastores().main
|
||||
|
||||
def test_wait_for_lock_locally(self) -> None:
|
||||
"""Test waiting for a lock on a single worker"""
|
||||
@@ -56,6 +59,66 @@ class WorkerLockTestCase(unittest.HomeserverTestCase):
|
||||
self.get_success(d2)
|
||||
self.get_success(lock2.__aexit__(None, None, None))
|
||||
|
||||
def test_timeouts_for_lock_locally(self) -> None:
|
||||
"""
|
||||
Test that we regularly retry to reacquire locks.
|
||||
|
||||
This is a regression test to make sure the lock retry time doesn't balloon to a value
|
||||
so large it can't even be printed reliably anymore.
|
||||
"""
|
||||
|
||||
# Create and acquire the first lock
|
||||
lock1 = self.worker_lock_handler.acquire_lock("name", "key")
|
||||
self.get_success(lock1.__aenter__())
|
||||
|
||||
# Create and try to acquire the second lock
|
||||
lock2 = self.worker_lock_handler.acquire_lock("name", "key")
|
||||
d2 = defer.ensureDeferred(lock2.__aenter__())
|
||||
# Make sure we haven't acquired the lock yet (`lock1` still holds it)
|
||||
self.assertNoResult(d2)
|
||||
|
||||
# Advance time by an hour (some duration that would previously cause our timeout
|
||||
# to balloon if it weren't constrained). Max back-off (saturate)
|
||||
#
|
||||
# Note: We use `_pump_by` instead of `pump`/`advance` as the `Lock` has an
|
||||
# internal background looping call that runs every 30 seconds
|
||||
# (`_RENEWAL_INTERVAL`) to renew the `Lock` and push its "drop timeout" value
|
||||
# further out by 2 minutes (`_LOCK_TIMEOUT_MS`). The `Lock` will prematurely
|
||||
# drop if this renewal is not allowed to run, which sours the test.
|
||||
# self.pump(amount=Duration(hours=1))
|
||||
self._pump_by(amount=Duration(hours=1), by=_RENEWAL_INTERVAL)
|
||||
|
||||
# Make sure we haven't acquired the `lock2` yet (`lock1` still holds it)
|
||||
self.assertNoResult(d2)
|
||||
|
||||
# Release the first lock (`lock1`). The second lock(`lock2`) should be
|
||||
# automatically acquired by the `pump()` inside `get_success()`
|
||||
self.get_success(lock1.__aexit__(None, None, None))
|
||||
|
||||
# We should now have the lock
|
||||
self.successResultOf(d2)
|
||||
|
||||
def _pump_by(
|
||||
self,
|
||||
*,
|
||||
amount: Duration = Duration(seconds=0),
|
||||
by: Duration = Duration(seconds=0.1),
|
||||
) -> None:
|
||||
"""
|
||||
Like `self.pump()` but you can specify the time increment to advance with until
|
||||
you reach the time amount.
|
||||
|
||||
Unlike `self.pump()`, this doesn't multiply the time at all.
|
||||
|
||||
Args:
|
||||
amount: The amount of time to advance
|
||||
by: The time increment in seconds to advance time by until we reach the `amount`
|
||||
"""
|
||||
end_time_s = self.reactor.seconds() + amount.as_secs()
|
||||
|
||||
while self.reactor.seconds() < end_time_s:
|
||||
self.reactor.advance(by.as_secs())
|
||||
|
||||
def test_lock_contention(self) -> None:
|
||||
"""Test lock contention when a lot of locks wait on a single worker"""
|
||||
nb_locks_to_test = 500
|
||||
@@ -124,3 +187,70 @@ class WorkerLockWorkersTestCase(BaseMultiWorkerStreamTestCase):
|
||||
|
||||
self.get_success(d2)
|
||||
self.get_success(lock2.__aexit__(None, None, None))
|
||||
|
||||
def test_timeouts_for_lock_worker(self) -> None:
|
||||
"""
|
||||
Test that we regularly retry to reacquire locks.
|
||||
|
||||
This is a regression test to make sure the lock retry time doesn't balloon to a value
|
||||
so large it can't even be printed reliably anymore.
|
||||
"""
|
||||
worker = self.make_worker_hs(
|
||||
"synapse.app.generic_worker",
|
||||
extra_config={
|
||||
"redis": {"enabled": True},
|
||||
},
|
||||
)
|
||||
worker_lock_handler = worker.get_worker_locks_handler()
|
||||
|
||||
# Create and acquire the first lock on the main process
|
||||
lock1 = self.main_worker_lock_handler.acquire_lock("name", "key")
|
||||
self.get_success(lock1.__aenter__())
|
||||
|
||||
# Create and try to acquire the second lock on the worker
|
||||
lock2 = worker_lock_handler.acquire_lock("name", "key")
|
||||
d2 = defer.ensureDeferred(lock2.__aenter__())
|
||||
# Make sure we haven't acquired the lock yet (`lock1` still holds it)
|
||||
self.assertNoResult(d2)
|
||||
|
||||
# Advance time by an hour (some duration that would previously cause our timeout
|
||||
# to balloon if it weren't constrained). Max back-off (saturate)
|
||||
#
|
||||
# Note: We use `_pump_by` instead of `pump`/`advance` as the `Lock` has an
|
||||
# internal background looping call that runs every 30 seconds
|
||||
# (`_RENEWAL_INTERVAL`) to renew the `Lock` and push its "drop timeout" value
|
||||
# further out by 2 minutes (`_LOCK_TIMEOUT_MS`). The `Lock` will prematurely
|
||||
# drop if this renewal is not allowed to run, which sours the test.
|
||||
# self.pump(amount=Duration(hours=1))
|
||||
self._pump_by(amount=Duration(hours=1), by=_RENEWAL_INTERVAL)
|
||||
|
||||
# Make sure we haven't acquired the `lock2` yet (`lock1` still holds it)
|
||||
self.assertNoResult(d2)
|
||||
|
||||
# Release the first lock (`lock1`). The second lock(`lock2`) should be
|
||||
# automatically acquired by the `pump()` inside `get_success()`
|
||||
self.get_success(lock1.__aexit__(None, None, None))
|
||||
|
||||
# We should now have the lock
|
||||
self.successResultOf(d2)
|
||||
|
||||
def _pump_by(
|
||||
self,
|
||||
*,
|
||||
amount: Duration = Duration(seconds=0),
|
||||
by: Duration = Duration(seconds=0.1),
|
||||
) -> None:
|
||||
"""
|
||||
Like `self.pump()` but you can specify the time increment to advance with until
|
||||
you reach the time amount.
|
||||
|
||||
Unlike `self.pump()`, this doesn't multiply the time at all.
|
||||
|
||||
Args:
|
||||
amount: The amount of time to advance
|
||||
by: The time increment in seconds to advance time by until we reach the `amount`
|
||||
"""
|
||||
end_time_s = self.reactor.seconds() + amount.as_secs()
|
||||
|
||||
while self.reactor.seconds() < end_time_s:
|
||||
self.reactor.advance(by.as_secs())
|
||||
|
||||
Reference in New Issue
Block a user