Mirror of https://github.com/element-hq/synapse.git, synced 2026-05-14 06:05:10 +00:00
Update WorkerLock tests to better stress the WORKER_LOCK_MAX_RETRY_INTERVAL (#19772)
There is no behavioral change, only a change to the tests. See https://github.com/element-hq/synapse/pull/19772#discussion_r3222059105 (and the diff comments) for an explanation of why the tests needed changing. Follow-up to https://github.com/element-hq/synapse/pull/19394; the test discussion originally happened in https://github.com/element-hq/synapse/pull/19394#discussion_r2789673181. This spawned from thinking about the problem again.
@@ -0,0 +1 @@
+Update `WorkerLock` tests to better stress the `WORKER_LOCK_MAX_RETRY_INTERVAL`.
@@ -61,10 +61,10 @@ The maximum wait time before retrying to acquire the lock.
 Better to retry more quickly than have workers wait around. 5 seconds is still a
 reasonable gap in time to not overwhelm the CPU/Database.

-This matters most in cross-worker scenarios. When locks are on the same worker, when the
-lock holder releases, we signal to other locks (with the same name/key) that they
-should try reacquiring the lock immediately. But locks on other workers only re-check
-based on their retry `_timeout_interval`.
+This matters most when locks go stale: normally, when the lock holder releases, we
+signal to other locks (with the same name/key) that they should try reacquiring the
+lock immediately. But stale locks are never released and instead are forcefully reaped
+behind the scenes.
 """
 WORKER_LOCK_EXCESSIVE_WAITING_WARN_DURATION = Duration(minutes=10)

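As a rough illustration of the cross-worker retry behaviour this docstring describes: a waiter on another worker is never notified of a release, so it polls on an interval capped at `WORKER_LOCK_MAX_RETRY_INTERVAL`. A minimal sketch, assuming an exponential backoff with jitter and hypothetical `try_acquire`/`sleep` helpers (not Synapse's actual API):

```python
import random

MAX_RETRY_INTERVAL_SECS = 5.0  # cap, per `WORKER_LOCK_MAX_RETRY_INTERVAL`

async def wait_for_lock(try_acquire, sleep):
    """Poll for a cross-worker lock until acquired."""
    retry_interval = 0.1
    while (lock := await try_acquire()) is None:
        # Back off, but never wait longer than the cap: the cap bounds
        # how long a waiter can sit idle after the lock actually frees up.
        await sleep(retry_interval * random.uniform(0.9, 1.1))
        retry_interval = min(retry_interval * 2, MAX_RETRY_INTERVAL_SECS)
    return lock
```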
@@ -53,9 +53,9 @@ logger = logging.getLogger(__name__)
 _RENEWAL_INTERVAL = Duration(seconds=30)

 # How long before an acquired lock times out.
-_LOCK_TIMEOUT_MS = 2 * 60 * 1000
+_LOCK_TIMEOUT = Duration(minutes=2)

-_LOCK_REAP_INTERVAL = Duration(milliseconds=_LOCK_TIMEOUT_MS / 10.0)
+_LOCK_REAP_INTERVAL = Duration(milliseconds=_LOCK_TIMEOUT.as_millis() / 10.0)


 class LockStore(SQLBaseStore):
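The hunks above and below migrate from a raw `_LOCK_TIMEOUT_MS` integer to `synapse.util.duration.Duration`. Inferred from the call sites in this diff (`Duration(minutes=2)`, `.as_millis()`, `.as_secs()`), here is a minimal sketch of the interface being relied on; the real implementation may differ:

```python
class Duration:
    """Sketch of a duration value constructible from several units."""

    def __init__(
        self,
        *,
        milliseconds: float = 0,
        seconds: float = 0,
        minutes: float = 0,
        hours: float = 0,
    ) -> None:
        self._millis = (
            milliseconds + seconds * 1_000 + minutes * 60_000 + hours * 3_600_000
        )

    def as_millis(self) -> float:
        return self._millis

    def as_secs(self) -> float:
        return self._millis / 1_000
```

With this shape, `_LOCK_REAP_INTERVAL = Duration(milliseconds=_LOCK_TIMEOUT.as_millis() / 10.0)` works out to 12 seconds, a tenth of the 2-minute timeout.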
@@ -63,7 +63,7 @@ class LockStore(SQLBaseStore):

     Locks are identified by a name and key. A lock is acquired by inserting into
     the `worker_locks` table if a) there is no existing row for the name/key or
-    b) the existing row has a `last_renewed_ts` older than `_LOCK_TIMEOUT_MS`.
+    b) the existing row has a `last_renewed_ts` older than `_LOCK_TIMEOUT`.

     When a lock is taken out the instance inserts a random `token`, the instance
     that holds that token holds the lock until it drops (or times out).
@@ -182,7 +182,7 @@ class LockStore(SQLBaseStore):
                     self._instance_name,
                     token,
                     now,
-                    now - _LOCK_TIMEOUT_MS,
+                    now - _LOCK_TIMEOUT.as_millis(),
                 ),
             )

@@ -340,7 +340,9 @@ class LockStore(SQLBaseStore):
         """

         def reap_stale_read_write_locks_txn(txn: LoggingTransaction) -> None:
-            txn.execute(delete_sql, (self.clock.time_msec() - _LOCK_TIMEOUT_MS,))
+            txn.execute(
+                delete_sql, (self.clock.time_msec() - _LOCK_TIMEOUT.as_millis(),)
+            )
             if txn.rowcount:
                 logger.info("Reaped %d stale locks", txn.rowcount)

@@ -489,7 +491,7 @@ class Lock:
         )
         return (
             last_renewed_ts is not None
-            and self._clock.time_msec() - _LOCK_TIMEOUT_MS < last_renewed_ts
+            and self._clock.time_msec() - _LOCK_TIMEOUT.as_millis() < last_renewed_ts
         )

     async def __aenter__(self) -> None:
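For intuition, the rewritten check in this hunk is simply "renewed within the last `_LOCK_TIMEOUT`"; a standalone restatement (hypothetical helper, not Synapse code):

```python
_LOCK_TIMEOUT_MILLIS = 2 * 60 * 1000  # mirrors _LOCK_TIMEOUT.as_millis()

def is_still_valid(now_ms: int, last_renewed_ts: int) -> bool:
    # `now - _LOCK_TIMEOUT.as_millis() < last_renewed_ts` rearranges to:
    return now_ms - last_renewed_ts < _LOCK_TIMEOUT_MILLIS
```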
@@ -25,8 +25,13 @@ import platform
 from twisted.internet import defer
 from twisted.internet.testing import MemoryReactor

 from synapse.handlers.worker_lock import WORKER_LOCK_MAX_RETRY_INTERVAL
 from synapse.server import HomeServer
-from synapse.storage.databases.main.lock import _RENEWAL_INTERVAL
+from synapse.storage.databases.main.lock import (
+    _LOCK_REAP_INTERVAL,
+    _LOCK_TIMEOUT,
+    _RENEWAL_INTERVAL,
+)
 from synapse.util.clock import Clock
 from synapse.util.duration import Duration

@@ -83,7 +88,7 @@ class WorkerLockTestCase(unittest.HomeserverTestCase):
         # Note: We use `_pump_by` instead of `pump`/`advance` as the `Lock` has an
         # internal background looping call that runs every 30 seconds
         # (`_RENEWAL_INTERVAL`) to renew the `Lock` and push its "drop timeout" value
-        # further out by 2 minutes (`_LOCK_TIMEOUT_MS`). The `Lock` will prematurely
+        # further out by 2 minutes (`_LOCK_TIMEOUT`). The `Lock` will prematurely
         # drop if this renewal is not allowed to run, which sours the test.
         # self.pump(amount=Duration(hours=1))
         self._pump_by(amount=Duration(hours=1), by=_RENEWAL_INTERVAL)
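The comment above explains why the tests step time forward instead of jumping it. A plausible shape for the `_pump_by` helper it refers to (a hypothetical sketch; the real helper lives on the test case class):

```python
def _pump_by(self, *, amount: Duration, by: Duration) -> None:
    """Advance the fake reactor by `amount`, in steps of `by`.

    Stepping (rather than one big `advance`) lets the lock's 30-second
    renewal looping call fire on schedule, so the lock under test is
    not spuriously dropped mid-test.
    """
    remaining = amount.as_secs()
    while remaining > 0:
        step = min(by.as_secs(), remaining)
        self.reactor.advance(step)
        remaining -= step
```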
@@ -91,9 +96,34 @@ class WorkerLockTestCase(unittest.HomeserverTestCase):
         # Make sure we haven't acquired the `lock2` yet (`lock1` still holds it)
         self.assertNoResult(d2)

-        # Release the first lock (`lock1`). The second lock (`lock2`) should be
-        # automatically acquired by the `pump()` inside `get_success()`
-        self.get_success(lock1.__aexit__(None, None, None))
+        # Drop the lock without releasing it. If we just normally released the lock
+        # (`self.get_success(lock1.__aexit__(None, None, None))`), the
+        # `add_lock_released_callback`/`notify_lock_released` cycle would signal that we
+        # should re-acquire the lock right away (on the next reactor tick). And we want
+        # to avoid that, as the point of this test is to stress the retry timeout
+        # interval and `WORKER_LOCK_MAX_RETRY_INTERVAL`.
+        del lock1
+
+        # Wait for `lock1` to go stale (it won't be renewed anymore because we deleted
+        # it just above)
+        self._pump_by(
+            amount=_LOCK_TIMEOUT,
+            by=_RENEWAL_INTERVAL,
+        )
+
+        # Wait just enough time so `lock1` is reaped (found stale and forcefully drops
+        # the lock it's holding)
+        self._pump_by(
+            amount=_LOCK_REAP_INTERVAL,
+            by=_RENEWAL_INTERVAL,
+        )
+
+        # Wait just enough time so `lock2` tries re-acquiring the lock. Should be no
+        # longer than our `WORKER_LOCK_MAX_RETRY_INTERVAL`.
+        self._pump_by(
+            amount=WORKER_LOCK_MAX_RETRY_INTERVAL,
+            by=_RENEWAL_INTERVAL,
+        )

         # We should now have the lock
         self.successResultOf(d2)
@@ -219,7 +249,7 @@ class WorkerLockWorkersTestCase(BaseMultiWorkerStreamTestCase):
         # Note: We use `_pump_by` instead of `pump`/`advance` as the `Lock` has an
         # internal background looping call that runs every 30 seconds
         # (`_RENEWAL_INTERVAL`) to renew the `Lock` and push its "drop timeout" value
-        # further out by 2 minutes (`_LOCK_TIMEOUT_MS`). The `Lock` will prematurely
+        # further out by 2 minutes (`_LOCK_TIMEOUT`). The `Lock` will prematurely
         # drop if this renewal is not allowed to run, which sours the test.
         # self.pump(amount=Duration(hours=1))
         self._pump_by(amount=Duration(hours=1), by=_RENEWAL_INTERVAL)
@@ -227,9 +257,34 @@ class WorkerLockWorkersTestCase(BaseMultiWorkerStreamTestCase):
         # Make sure we haven't acquired the `lock2` yet (`lock1` still holds it)
         self.assertNoResult(d2)

-        # Release the first lock (`lock1`). The second lock (`lock2`) should be
-        # automatically acquired by the `pump()` inside `get_success()`
-        self.get_success(lock1.__aexit__(None, None, None))
+        # Drop the lock without releasing it. If we just normally released the lock
+        # (`self.get_success(lock1.__aexit__(None, None, None))`), the
+        # `add_lock_released_callback`/`notify_lock_released` cycle would signal that we
+        # should re-acquire the lock right away (on the next reactor tick). And we want
+        # to avoid that, as the point of this test is to stress the retry timeout
+        # interval and `WORKER_LOCK_MAX_RETRY_INTERVAL`.
+        del lock1
+
+        # Wait for `lock1` to go stale (it won't be renewed anymore because we deleted
+        # it just above)
+        self._pump_by(
+            amount=_LOCK_TIMEOUT,
+            by=_RENEWAL_INTERVAL,
+        )
+
+        # Wait just enough time so `lock1` is reaped (found stale and forcefully drops
+        # the lock it's holding)
+        self._pump_by(
+            amount=_LOCK_REAP_INTERVAL,
+            by=_RENEWAL_INTERVAL,
+        )
+
+        # Wait just enough time so `lock2` tries re-acquiring the lock. Should be no
+        # longer than our `WORKER_LOCK_MAX_RETRY_INTERVAL`.
+        self._pump_by(
+            amount=WORKER_LOCK_MAX_RETRY_INTERVAL,
+            by=_RENEWAL_INTERVAL,
+        )

         # We should now have the lock
         self.successResultOf(d2)
@@ -26,7 +26,7 @@ from twisted.internet.defer import Deferred
 from twisted.internet.testing import MemoryReactor

 from synapse.server import HomeServer
-from synapse.storage.databases.main.lock import _LOCK_TIMEOUT_MS, _RENEWAL_INTERVAL
+from synapse.storage.databases.main.lock import _LOCK_TIMEOUT, _RENEWAL_INTERVAL
 from synapse.util.clock import Clock

 from tests import unittest
@@ -117,7 +117,7 @@ class LockTestCase(unittest.HomeserverTestCase):
         self.get_success(lock.__aenter__())

         # Wait for ages with the lock, we should not be able to get the lock.
-        self.reactor.advance(5 * _LOCK_TIMEOUT_MS / 1000)
+        self.reactor.advance(5 * _LOCK_TIMEOUT.as_secs())

         lock2 = self.get_success(self.store.try_acquire_lock("name", "key"))
         self.assertIsNone(lock2)
@@ -138,7 +138,7 @@ class LockTestCase(unittest.HomeserverTestCase):
         lock._looping_call.stop()

         # Wait for the lock to timeout.
-        self.reactor.advance(2 * _LOCK_TIMEOUT_MS / 1000)
+        self.reactor.advance(2 * _LOCK_TIMEOUT.as_secs())

         lock2 = self.get_success(self.store.try_acquire_lock("name", "key"))
         self.assertIsNotNone(lock2)
@@ -154,7 +154,7 @@ class LockTestCase(unittest.HomeserverTestCase):
         del lock

         # Wait for the lock to timeout.
-        self.reactor.advance(2 * _LOCK_TIMEOUT_MS / 1000)
+        self.reactor.advance(2 * _LOCK_TIMEOUT.as_secs())

         lock2 = self.get_success(self.store.try_acquire_lock("name", "key"))
         self.assertIsNotNone(lock2)
@@ -402,7 +402,7 @@ class ReadWriteLockTestCase(unittest.HomeserverTestCase):
         lock._looping_call.stop()

         # Wait for the lock to timeout.
-        self.reactor.advance(2 * _LOCK_TIMEOUT_MS / 1000)
+        self.reactor.advance(2 * _LOCK_TIMEOUT.as_secs())

         lock2 = self.get_success(
             self.store.try_acquire_read_write_lock("name", "key", write=True)
@@ -422,7 +422,7 @@ class ReadWriteLockTestCase(unittest.HomeserverTestCase):
         del lock

         # Wait for the lock to timeout.
-        self.reactor.advance(2 * _LOCK_TIMEOUT_MS / 1000)
+        self.reactor.advance(2 * _LOCK_TIMEOUT.as_secs())

         lock2 = self.get_success(
             self.store.try_acquire_read_write_lock("name", "key", write=True)