diff --git a/changelog.d/19394.bugfix b/changelog.d/19394.bugfix
new file mode 100644
index 0000000000..4ca92cfb32
--- /dev/null
+++ b/changelog.d/19394.bugfix
@@ -0,0 +1 @@
+Cap the `WorkerLock` timeout interval at a maximum of 60 seconds so the retry back-off can no longer grow into excessively large values. Contributed by Famedly.
diff --git a/synapse/handlers/worker_lock.py b/synapse/handlers/worker_lock.py
index 1537a18cc0..51be3b5084 100644
--- a/synapse/handlers/worker_lock.py
+++ b/synapse/handlers/worker_lock.py
@@ -54,6 +54,9 @@ logger = logging.getLogger(__name__)
 # will not disappear under our feet as long as we don't delete the room.
 NEW_EVENT_DURING_PURGE_LOCK_NAME = "new_event_during_purge_lock"
 
+WORKER_LOCK_MAX_RETRY_INTERVAL = Duration(seconds=60)
+WORKER_LOCK_EXCESSIVE_WAITING_WARN_DURATION = Duration(minutes=10)
+
 
 class WorkerLocksHandler:
     """A class for waiting on taking out locks, rather than using the storage
@@ -206,9 +209,10 @@ class WaitingLock:
     lock_name: str
     lock_key: str
     write: bool | None
+    start_ts_ms: int = 0
     deferred: "defer.Deferred[None]" = attr.Factory(defer.Deferred)
     _inner_lock: Lock | None = None
-    _retry_interval: float = 0.1
+    _timeout_interval: float = 0.1
     _lock_span: "opentracing.Scope" = attr.Factory(
         lambda: start_active_span("WaitingLock.lock")
     )
@@ -220,6 +224,7 @@ class WaitingLock:
             self.deferred.callback(None)
 
     async def __aenter__(self) -> None:
+        self.start_ts_ms = self.clock.time_msec()
         self._lock_span.__enter__()
 
         with start_active_span("WaitingLock.waiting_for_lock"):
@@ -240,19 +245,44 @@ class WaitingLock:
                     break
 
                 try:
-                    # Wait until the we get notified the lock might have been
+                    # Wait until we are notified that the lock might have been
                     # released (by the deferred being resolved). We also
-                    # periodically wake up in case the lock was released but we
+                    # periodically wake up in case the lock was released, but we
                     # weren't notified.
                     with PreserveLoggingContext():
-                        timeout = self._get_next_retry_interval()
                         await timeout_deferred(
                             deferred=self.deferred,
-                            timeout=timeout,
+                            timeout=self._timeout_interval,
                             clock=self.clock,
                         )
-                except Exception:
-                    pass
+                except defer.TimeoutError:
+                    # Only increase the timeout interval when we actually timed
+                    # out (defer.TimeoutError), not on other errors.
+                    self._increment_timeout_interval()
+
+                    now_ms = self.clock.time_msec()
+                    time_spent_trying_to_lock = Duration(
+                        milliseconds=now_ms - self.start_ts_ms
+                    )
+                    if (
+                        time_spent_trying_to_lock.as_millis()
+                        > WORKER_LOCK_EXCESSIVE_WAITING_WARN_DURATION.as_millis()
+                    ):
+                        logger.warning(
+                            "(WaitingLock (%s, %s)) Time spent waiting to acquire lock "
+                            "is getting excessive: %ss. There may be a deadlock.",
+                            self.lock_name,
+                            self.lock_key,
+                            time_spent_trying_to_lock.as_secs(),
+                        )
+
+                except Exception as e:
+                    logger.warning(
+                        "Caught an exception while waiting on WaitingLock(lock_name=%s, lock_key=%s): %r",
+                        self.lock_name,
+                        self.lock_key,
+                        e,
+                    )
 
         return await self._inner_lock.__aenter__()
 
@@ -273,15 +303,14 @@ class WaitingLock:
 
         return r
 
-    def _get_next_retry_interval(self) -> float:
-        next = self._retry_interval
-        self._retry_interval = max(5, next * 2)
-        if self._retry_interval > Duration(minutes=10).as_secs():  # >7 iterations
-            logger.warning(
-                "Lock timeout is getting excessive: %ss. There may be a deadlock.",
-                self._retry_interval,
-            )
-        return next * random.uniform(0.9, 1.1)
+    def _increment_timeout_interval(self) -> float:
+        next_interval = self._timeout_interval
+        next_interval = min(WORKER_LOCK_MAX_RETRY_INTERVAL.as_secs(), next_interval * 2)
+
+        # Jitter is applied to the timeout to help avoid a "thundering herd"
+        # situation where many waiters would otherwise time out at the same moment.
+        self._timeout_interval = next_interval * random.uniform(0.9, 1.1)
+        return self._timeout_interval
 
 
 @attr.s(auto_attribs=True, eq=False)
@@ -294,10 +323,11 @@ class WaitingMultiLock:
 
     store: LockStore
     handler: WorkerLocksHandler
+    start_ts_ms: int = 0
 
     deferred: "defer.Deferred[None]" = attr.Factory(defer.Deferred)
 
     _inner_lock_cm: AsyncContextManager | None = None
-    _retry_interval: float = 0.1
+    _timeout_interval: float = 0.1
     _lock_span: "opentracing.Scope" = attr.Factory(
         lambda: start_active_span("WaitingLock.lock")
     )
@@ -309,6 +339,7 @@ class WaitingMultiLock:
             self.deferred.callback(None)
 
     async def __aenter__(self) -> None:
+        self.start_ts_ms = self.clock.time_msec()
         self._lock_span.__enter__()
 
         with start_active_span("WaitingLock.waiting_for_lock"):
@@ -324,19 +355,42 @@ class WaitingMultiLock:
                     break
 
                 try:
-                    # Wait until the we get notified the lock might have been
+                    # Wait until we are notified that the lock might have been
                     # released (by the deferred being resolved). We also
-                    # periodically wake up in case the lock was released but we
+                    # periodically wake up in case the lock was released, but we
                     # weren't notified.
                     with PreserveLoggingContext():
-                        timeout = self._get_next_retry_interval()
                         await timeout_deferred(
                             deferred=self.deferred,
-                            timeout=timeout,
+                            timeout=self._timeout_interval,
                             clock=self.clock,
                         )
-                except Exception:
-                    pass
+                except defer.TimeoutError:
+                    # Only increase the timeout interval when we actually timed
+                    # out (defer.TimeoutError), not on other errors.
+                    self._increment_timeout_interval()
+
+                    now_ms = self.clock.time_msec()
+                    time_spent_trying_to_lock = Duration(
+                        milliseconds=now_ms - self.start_ts_ms
+                    )
+                    if (
+                        time_spent_trying_to_lock.as_millis()
+                        > WORKER_LOCK_EXCESSIVE_WAITING_WARN_DURATION.as_millis()
+                    ):
+                        logger.warning(
+                            "(WaitingMultiLock (%r)) Time spent waiting to acquire lock "
+                            "is getting excessive: %ss. There may be a deadlock.",
+                            self.lock_names,
+                            time_spent_trying_to_lock.as_secs(),
+                        )
+
+                except Exception as e:
+                    logger.warning(
+                        "Caught an exception while waiting on WaitingMultiLock(lock_names=%r): %r",
+                        self.lock_names,
+                        e,
+                    )
 
         assert self._inner_lock_cm
         await self._inner_lock_cm.__aenter__()
@@ -360,12 +414,11 @@ class WaitingMultiLock:
 
         return r
 
-    def _get_next_retry_interval(self) -> float:
-        next = self._retry_interval
-        self._retry_interval = max(5, next * 2)
-        if self._retry_interval > Duration(minutes=10).as_secs():  # >7 iterations
-            logger.warning(
-                "Lock timeout is getting excessive: %ss. There may be a deadlock.",
-                self._retry_interval,
-            )
-        return next * random.uniform(0.9, 1.1)
+    def _increment_timeout_interval(self) -> float:
+        next_interval = self._timeout_interval
+        next_interval = min(WORKER_LOCK_MAX_RETRY_INTERVAL.as_secs(), next_interval * 2)
+
+        # Jitter is applied to the timeout to help avoid a "thundering herd"
+        # situation where many waiters would otherwise time out at the same moment.
+        self._timeout_interval = next_interval * random.uniform(0.9, 1.1)
+        return self._timeout_interval
diff --git a/tests/handlers/test_worker_lock.py b/tests/handlers/test_worker_lock.py
index 61ff51ff92..74201f4151 100644
--- a/tests/handlers/test_worker_lock.py
+++ b/tests/handlers/test_worker_lock.py
@@ -26,7 +26,9 @@ from twisted.internet import defer
 from twisted.internet.testing import MemoryReactor
 
 from synapse.server import HomeServer
+from synapse.storage.databases.main.lock import _RENEWAL_INTERVAL
 from synapse.util.clock import Clock
+from synapse.util.duration import Duration
 
 from tests import unittest
 from tests.replication._base import BaseMultiWorkerStreamTestCase
@@ -40,6 +42,7 @@ class WorkerLockTestCase(unittest.HomeserverTestCase):
         self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
     ) -> None:
         self.worker_lock_handler = self.hs.get_worker_locks_handler()
+        self.store = self.hs.get_datastores().main
 
     def test_wait_for_lock_locally(self) -> None:
         """Test waiting for a lock on a single worker"""
@@ -56,6 +59,66 @@ class WorkerLockTestCase(unittest.HomeserverTestCase):
         self.get_success(d2)
         self.get_success(lock2.__aexit__(None, None, None))
 
+    def test_timeouts_for_lock_locally(self) -> None:
+        """
+        Test that we keep retrying to acquire a contended lock at a bounded interval.
+
+        This is a regression test to make sure the lock retry interval doesn't
+        balloon to a value so large it can no longer be printed reliably.
+        """
+
+        # Create and acquire the first lock
+        lock1 = self.worker_lock_handler.acquire_lock("name", "key")
+        self.get_success(lock1.__aenter__())
+
+        # Create and try to acquire the second lock
+        lock2 = self.worker_lock_handler.acquire_lock("name", "key")
+        d2 = defer.ensureDeferred(lock2.__aenter__())
+        # Make sure we haven't acquired the lock yet (`lock1` still holds it)
+        self.assertNoResult(d2)
+
+        # Advance time by an hour: long enough that the previously unbounded
+        # timeout would have ballooned, and that the capped back-off saturates
+        # at its maximum.
+        #
+        # Note: We use `_pump_by` instead of `pump`/`advance` as the `Lock` has an
+        # internal background looping call that runs every 30 seconds
+        # (`_RENEWAL_INTERVAL`) to renew the `Lock` and push its "drop timeout"
+        # value further out by 2 minutes (`_LOCK_TIMEOUT_MS`). The `Lock` would
+        # prematurely drop if this renewal were not allowed to run, which would
+        # invalidate the test.
+        self._pump_by(amount=Duration(hours=1), by=_RENEWAL_INTERVAL)
+
+        # Make sure we haven't acquired `lock2` yet (`lock1` still holds it)
+        self.assertNoResult(d2)
+
+        # Release the first lock (`lock1`). The second lock (`lock2`) should then
+        # be automatically acquired by the `pump()` inside `get_success()`
+        self.get_success(lock1.__aexit__(None, None, None))
+
+        # We should now have the lock
+        self.successResultOf(d2)
+
+    def _pump_by(
+        self,
+        *,
+        amount: Duration = Duration(seconds=0),
+        by: Duration = Duration(seconds=0.1),
+    ) -> None:
+        """
+        Like `self.pump()`, but advances the reactor in fixed increments of `by`
+        until `amount` of time has passed.
+
+        Unlike `self.pump()`, the increment is never scaled.
+
+        Args:
+            amount: The total amount of time to advance
+            by: The increment to advance the clock by on each step until we
+                reach `amount`
+        """
+        end_time_s = self.reactor.seconds() + amount.as_secs()
+
+        while self.reactor.seconds() < end_time_s:
+            self.reactor.advance(by.as_secs())
+
     def test_lock_contention(self) -> None:
         """Test lock contention when a lot of locks wait on a single worker"""
         nb_locks_to_test = 500
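The `_pump_by` helper above steps the fake reactor in small, fixed increments so that scheduled calls (such as the lock renewal) fire at their proper times, rather than all observing the final time after one big jump. A standalone illustration using Twisted's `task.Clock` (the 30-second cadence is an assumption taken from the comment in the test):

    from twisted.internet.task import Clock

    clock = Clock()
    renewals = []

    def renew() -> None:
        # Stand-in for the lock's renewal looping call.
        renewals.append(clock.seconds())
        clock.callLater(30, renew)

    clock.callLater(30, renew)

    # A single clock.advance(3600) would fire only the first renewal (which
    # would observe seconds() == 3600, i.e. run "late"), and its rescheduled
    # call would land past the end time. Stepping in 30s increments instead
    # lets every renewal run on schedule.
    for _ in range(120):
        clock.advance(30)
    assert len(renewals) == 120
    assert renewals[0] == 30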
@@ -124,3 +187,70 @@
 
         self.get_success(d2)
         self.get_success(lock2.__aexit__(None, None, None))
+
+    def test_timeouts_for_lock_worker(self) -> None:
+        """
+        Test that we keep retrying to acquire a contended lock at a bounded interval.
+
+        This is a regression test to make sure the lock retry interval doesn't
+        balloon to a value so large it can no longer be printed reliably.
+        """
+        worker = self.make_worker_hs(
+            "synapse.app.generic_worker",
+            extra_config={
+                "redis": {"enabled": True},
+            },
+        )
+        worker_lock_handler = worker.get_worker_locks_handler()
+
+        # Create and acquire the first lock on the main process
+        lock1 = self.main_worker_lock_handler.acquire_lock("name", "key")
+        self.get_success(lock1.__aenter__())
+
+        # Create and try to acquire the second lock on the worker
+        lock2 = worker_lock_handler.acquire_lock("name", "key")
+        d2 = defer.ensureDeferred(lock2.__aenter__())
+        # Make sure we haven't acquired the lock yet (`lock1` still holds it)
+        self.assertNoResult(d2)
+
+        # Advance time by an hour: long enough that the previously unbounded
+        # timeout would have ballooned, and that the capped back-off saturates
+        # at its maximum.
+        #
+        # Note: We use `_pump_by` instead of `pump`/`advance` as the `Lock` has an
+        # internal background looping call that runs every 30 seconds
+        # (`_RENEWAL_INTERVAL`) to renew the `Lock` and push its "drop timeout"
+        # value further out by 2 minutes (`_LOCK_TIMEOUT_MS`). The `Lock` would
+        # prematurely drop if this renewal were not allowed to run, which would
+        # invalidate the test.
+        self._pump_by(amount=Duration(hours=1), by=_RENEWAL_INTERVAL)
+
+        # Make sure we haven't acquired `lock2` yet (`lock1` still holds it)
+        self.assertNoResult(d2)
+
+        # Release the first lock (`lock1`). The second lock (`lock2`) should then
+        # be automatically acquired by the `pump()` inside `get_success()`
+        self.get_success(lock1.__aexit__(None, None, None))
+
+        # We should now have the lock
+        self.successResultOf(d2)
+
+    def _pump_by(
+        self,
+        *,
+        amount: Duration = Duration(seconds=0),
+        by: Duration = Duration(seconds=0.1),
+    ) -> None:
+        """
+        Like `self.pump()`, but advances the reactor in fixed increments of `by`
+        until `amount` of time has passed.
+
+        Unlike `self.pump()`, the increment is never scaled.
+
+        Args:
+            amount: The total amount of time to advance
+            by: The increment to advance the clock by on each step until we
+                reach `amount`
+        """
+        end_time_s = self.reactor.seconds() + amount.as_secs()
+
+        while self.reactor.seconds() < end_time_s:
+            self.reactor.advance(by.as_secs())
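A quick back-of-the-envelope check of the new bounds (a sketch, ignoring jitter): starting from 0.1s and doubling, the interval hits the 60s cap after 10 timeouts, i.e. within roughly two minutes of waiting; from then on a waiter retries about once a minute, so the 10-minute "excessive waiting" warning starts firing after a further handful of retries.

    interval_s, waited_s, timeouts = 0.1, 0.0, 0
    while interval_s < 60.0:
        waited_s += interval_s
        interval_s = min(60.0, interval_s * 2)
        timeouts += 1
    print(timeouts, round(waited_s, 1))  # -> 10 102.3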