diff --git a/changelog.d/19772.misc b/changelog.d/19772.misc new file mode 100644 index 0000000000..939507f5c3 --- /dev/null +++ b/changelog.d/19772.misc @@ -0,0 +1 @@ +Update `WorkerLock` tests to better stress the `WORKER_LOCK_MAX_RETRY_INTERVAL`. diff --git a/synapse/handlers/worker_lock.py b/synapse/handlers/worker_lock.py index 57792ea53c..a37b04494b 100644 --- a/synapse/handlers/worker_lock.py +++ b/synapse/handlers/worker_lock.py @@ -61,10 +61,10 @@ The maximum wait time before retrying to acquire the lock. Better to retry more quickly than have workers wait around. 5 seconds is still a reasonable gap in time to not overwhelm the CPU/Database. -This matters most in cross-worker scenarios. When locks are on the same worker, when the -lock holder releases, we signal to other locks (with the same name/key) that they -should try reacquiring the lock immediately. But locks on other workers only re-check -based on their retry `_timeout_interval`. +This matters most when locks go stale as normally, when the lock holder releases, we +signal to other locks (with the same name/key) that they should try reacquiring the lock +immediately. But stale locks are never released and instead forcefully reaped behind the +scenes. """ WORKER_LOCK_EXCESSIVE_WAITING_WARN_DURATION = Duration(minutes=10) diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py index dd49f98366..decb74e994 100644 --- a/synapse/storage/databases/main/lock.py +++ b/synapse/storage/databases/main/lock.py @@ -53,9 +53,9 @@ logger = logging.getLogger(__name__) _RENEWAL_INTERVAL = Duration(seconds=30) # How long before an acquired lock times out. 
-_LOCK_TIMEOUT_MS = 2 * 60 * 1000 +_LOCK_TIMEOUT = Duration(minutes=2) -_LOCK_REAP_INTERVAL = Duration(milliseconds=_LOCK_TIMEOUT_MS / 10.0) +_LOCK_REAP_INTERVAL = Duration(milliseconds=_LOCK_TIMEOUT.as_millis() / 10.0) class LockStore(SQLBaseStore): @@ -63,7 +63,7 @@ class LockStore(SQLBaseStore): Locks are identified by a name and key. A lock is acquired by inserting into the `worker_locks` table if a) there is no existing row for the name/key or - b) the existing row has a `last_renewed_ts` older than `_LOCK_TIMEOUT_MS`. + b) the existing row has a `last_renewed_ts` older than `_LOCK_TIMEOUT`. When a lock is taken out the instance inserts a random `token`, the instance that holds that token holds the lock until it drops (or times out). @@ -182,7 +182,7 @@ class LockStore(SQLBaseStore): self._instance_name, token, now, - now - _LOCK_TIMEOUT_MS, + now - _LOCK_TIMEOUT.as_millis(), ), ) @@ -340,7 +340,9 @@ class LockStore(SQLBaseStore): """ def reap_stale_read_write_locks_txn(txn: LoggingTransaction) -> None: - txn.execute(delete_sql, (self.clock.time_msec() - _LOCK_TIMEOUT_MS,)) + txn.execute( + delete_sql, (self.clock.time_msec() - _LOCK_TIMEOUT.as_millis(),) + ) if txn.rowcount: logger.info("Reaped %d stale locks", txn.rowcount) @@ -489,7 +491,7 @@ class Lock: ) return ( last_renewed_ts is not None - and self._clock.time_msec() - _LOCK_TIMEOUT_MS < last_renewed_ts + and self._clock.time_msec() - _LOCK_TIMEOUT.as_millis() < last_renewed_ts ) async def __aenter__(self) -> None: diff --git a/tests/handlers/test_worker_lock.py b/tests/handlers/test_worker_lock.py index 74201f4151..a38adcd4d4 100644 --- a/tests/handlers/test_worker_lock.py +++ b/tests/handlers/test_worker_lock.py @@ -25,8 +25,13 @@ import platform from twisted.internet import defer from twisted.internet.testing import MemoryReactor +from synapse.handlers.worker_lock import WORKER_LOCK_MAX_RETRY_INTERVAL from synapse.server import HomeServer -from synapse.storage.databases.main.lock import 
_RENEWAL_INTERVAL +from synapse.storage.databases.main.lock import ( + _LOCK_REAP_INTERVAL, + _LOCK_TIMEOUT, + _RENEWAL_INTERVAL, +) from synapse.util.clock import Clock from synapse.util.duration import Duration @@ -83,7 +88,7 @@ class WorkerLockTestCase(unittest.HomeserverTestCase): # Note: We use `_pump_by` instead of `pump`/`advance` as the `Lock` has an # internal background looping call that runs every 30 seconds # (`_RENEWAL_INTERVAL`) to renew the `Lock` and push it's "drop timeout" value - # further out by 2 minutes (`_LOCK_TIMEOUT_MS`). The `Lock` will prematurely + # further out by 2 minutes (`_LOCK_TIMEOUT`). The `Lock` will prematurely # drop if this renewal is not allowed to run, which sours the test. # self.pump(amount=Duration(hours=1)) self._pump_by(amount=Duration(hours=1), by=_RENEWAL_INTERVAL) @@ -91,9 +96,34 @@ # Make sure we haven't acquired the `lock2` yet (`lock1` still holds it) self.assertNoResult(d2) - # Release the first lock (`lock1`). The second lock(`lock2`) should be - # automatically acquired by the `pump()` inside `get_success()` - self.get_success(lock1.__aexit__(None, None, None)) + # Drop the lock without releasing it. If we just normally released the lock + # (`self.get_success(lock1.__aexit__(None, None, None))`), the + # `add_lock_released_callback`/`notify_lock_released` cycle would signal that we + # should re-acquire the lock right away (on the next reactor tick). And we want + # to avoid that as the point of this test is to stress the retry timeout + # interval and `WORKER_LOCK_MAX_RETRY_INTERVAL`. 
+ del lock1 + + # Wait for `lock1` to go stale (it won't be renewed anymore because we deleted + # it just above) + self._pump_by( + amount=_LOCK_TIMEOUT, + by=_RENEWAL_INTERVAL, + ) + + # Wait just enough time so `lock1` is reaped (found stale and forcefully drops + # the lock it's holding) + self._pump_by( + amount=_LOCK_REAP_INTERVAL, + by=_RENEWAL_INTERVAL, + ) + + # Wait just enough time so `lock2` tries re-acquiring the lock. Should be no + # longer than our `WORKER_LOCK_MAX_RETRY_INTERVAL`. + self._pump_by( + amount=WORKER_LOCK_MAX_RETRY_INTERVAL, + by=_RENEWAL_INTERVAL, + ) # We should now have the lock self.successResultOf(d2) @@ -219,7 +249,7 @@ # Note: We use `_pump_by` instead of `pump`/`advance` as the `Lock` has an # internal background looping call that runs every 30 seconds # (`_RENEWAL_INTERVAL`) to renew the `Lock` and push it's "drop timeout" value - # further out by 2 minutes (`_LOCK_TIMEOUT_MS`). The `Lock` will prematurely + # further out by 2 minutes (`_LOCK_TIMEOUT`). The `Lock` will prematurely # drop if this renewal is not allowed to run, which sours the test. # self.pump(amount=Duration(hours=1)) self._pump_by(amount=Duration(hours=1), by=_RENEWAL_INTERVAL) @@ -227,9 +257,34 @@ # Make sure we haven't acquired the `lock2` yet (`lock1` still holds it) self.assertNoResult(d2) - # Release the first lock (`lock1`). The second lock(`lock2`) should be - # automatically acquired by the `pump()` inside `get_success()` - self.get_success(lock1.__aexit__(None, None, None)) + # Drop the lock without releasing it. If we just normally released the lock + # (`self.get_success(lock1.__aexit__(None, None, None))`), the + # `add_lock_released_callback`/`notify_lock_released` cycle would signal that we + # should re-acquire the lock right away (on the next reactor tick). 
And we want + # to avoid that as the point of this test is to stress the retry timeout + # interval and `WORKER_LOCK_MAX_RETRY_INTERVAL`. + del lock1 + + # Wait for `lock1` to go stale (it won't be renewed anymore because we deleted + # it just above) + self._pump_by( + amount=_LOCK_TIMEOUT, + by=_RENEWAL_INTERVAL, + ) + + # Wait just enough time so `lock1` is reaped (found stale and forcefully drops + # the lock its holding) + self._pump_by( + amount=_LOCK_REAP_INTERVAL, + by=_RENEWAL_INTERVAL, + ) + + # Wait just enough time so `lock2` tries re-acquiring the lock. Should be no + # longer than our `WORKER_LOCK_MAX_RETRY_INTERVAL`. + self._pump_by( + amount=WORKER_LOCK_MAX_RETRY_INTERVAL, + by=_RENEWAL_INTERVAL, + ) # We should now have the lock self.successResultOf(d2) diff --git a/tests/storage/databases/main/test_lock.py b/tests/storage/databases/main/test_lock.py index 622eb96ded..c38e0cc834 100644 --- a/tests/storage/databases/main/test_lock.py +++ b/tests/storage/databases/main/test_lock.py @@ -26,7 +26,7 @@ from twisted.internet.defer import Deferred from twisted.internet.testing import MemoryReactor from synapse.server import HomeServer -from synapse.storage.databases.main.lock import _LOCK_TIMEOUT_MS, _RENEWAL_INTERVAL +from synapse.storage.databases.main.lock import _LOCK_TIMEOUT, _RENEWAL_INTERVAL from synapse.util.clock import Clock from tests import unittest @@ -117,7 +117,7 @@ class LockTestCase(unittest.HomeserverTestCase): self.get_success(lock.__aenter__()) # Wait for ages with the lock, we should not be able to get the lock. - self.reactor.advance(5 * _LOCK_TIMEOUT_MS / 1000) + self.reactor.advance(5 * _LOCK_TIMEOUT.as_secs()) lock2 = self.get_success(self.store.try_acquire_lock("name", "key")) self.assertIsNone(lock2) @@ -138,7 +138,7 @@ class LockTestCase(unittest.HomeserverTestCase): lock._looping_call.stop() # Wait for the lock to timeout. 
- self.reactor.advance(2 * _LOCK_TIMEOUT_MS / 1000) + self.reactor.advance(2 * _LOCK_TIMEOUT.as_secs()) lock2 = self.get_success(self.store.try_acquire_lock("name", "key")) self.assertIsNotNone(lock2) @@ -154,7 +154,7 @@ class LockTestCase(unittest.HomeserverTestCase): del lock # Wait for the lock to timeout. - self.reactor.advance(2 * _LOCK_TIMEOUT_MS / 1000) + self.reactor.advance(2 * _LOCK_TIMEOUT.as_secs()) lock2 = self.get_success(self.store.try_acquire_lock("name", "key")) self.assertIsNotNone(lock2) @@ -402,7 +402,7 @@ class ReadWriteLockTestCase(unittest.HomeserverTestCase): lock._looping_call.stop() # Wait for the lock to timeout. - self.reactor.advance(2 * _LOCK_TIMEOUT_MS / 1000) + self.reactor.advance(2 * _LOCK_TIMEOUT.as_secs()) lock2 = self.get_success( self.store.try_acquire_read_write_lock("name", "key", write=True) @@ -422,7 +422,7 @@ class ReadWriteLockTestCase(unittest.HomeserverTestCase): del lock # Wait for the lock to timeout. - self.reactor.advance(2 * _LOCK_TIMEOUT_MS / 1000) + self.reactor.advance(2 * _LOCK_TIMEOUT.as_secs()) lock2 = self.get_success( self.store.try_acquire_read_write_lock("name", "key", write=True)