mirror of
https://github.com/element-hq/synapse.git
synced 2026-03-30 17:15:50 +00:00
The two fixes resolved both issues:
1. native_concurrently_execute — added delay_cancellation parameter that was expected by callers like e2e_keys.py. Uses asyncio.shield() to prevent cancellation from interrupting in-flight tasks. 2. Lock.__init__ — replaced reactor.callFromThread with loop.call_soon_threadsafe since the reactor object may be None in the asyncio world. Also replaced reactor.running with clock._is_shutdown for the shutdown check. The Lock is constructed inside a DB transaction (potentially on an executor thread), so it needs thread-safe scheduling to set up its renewal looping call on the main event loop.
This commit is contained in:
@@ -398,10 +398,11 @@ class Lock:
|
||||
|
||||
self._table = "worker_read_write_locks" if read_write else "worker_locks"
|
||||
|
||||
# We might be called from a non-main thread, so we defer setting up the
|
||||
# looping call.
|
||||
# We might be called from a non-main thread (e.g. a DB executor),
|
||||
# so we defer setting up the looping call to the event loop.
|
||||
self._looping_call: Any | None = None
|
||||
reactor.callFromThread(self._setup_looping_call)
|
||||
loop = clock._get_loop()
|
||||
loop.call_soon_threadsafe(self._setup_looping_call)
|
||||
|
||||
self._dropped = False
|
||||
|
||||
@@ -541,9 +542,10 @@ class Lock:
|
||||
# renewing the lock.
|
||||
if self._looping_call and self._looping_call.running:
|
||||
# We might be called from a non-main thread.
|
||||
self._reactor.callFromThread(self._looping_call.stop)
|
||||
loop = self._clock._get_loop()
|
||||
loop.call_soon_threadsafe(self._looping_call.stop)
|
||||
|
||||
if self._reactor.running:
|
||||
if not self._clock._is_shutdown:
|
||||
logger.error(
|
||||
"Lock for (%s, %s) dropped without being released",
|
||||
self._lock_name,
|
||||
|
||||
@@ -176,14 +176,16 @@ class NativeConnectionPool:
|
||||
if self._closed:
|
||||
raise Exception("Connection pool is closed")
|
||||
|
||||
conn = self._get_connection()
|
||||
|
||||
if self._use_shared_conn:
|
||||
# For in-memory SQLite with a shared connection, run directly
|
||||
# on the event loop thread to avoid thread dispatch overhead.
|
||||
conn = self._get_connection()
|
||||
return func(conn, *args, **kwargs)
|
||||
|
||||
def _inner() -> R:
|
||||
# Get connection inside the executor thread so that
|
||||
# _thread_local resolves to this thread's connection.
|
||||
conn = self._get_connection()
|
||||
return func(conn, *args, **kwargs)
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
@@ -211,11 +213,10 @@ class NativeConnectionPool:
|
||||
if self._closed:
|
||||
raise Exception("Connection pool is closed")
|
||||
|
||||
conn = self._get_connection()
|
||||
|
||||
if self._use_shared_conn:
|
||||
# For in-memory SQLite with a shared connection, run directly
|
||||
# on the event loop thread to avoid thread dispatch overhead.
|
||||
conn = self._get_connection()
|
||||
try:
|
||||
result = func(conn, *args, **kwargs)
|
||||
conn.commit()
|
||||
@@ -225,6 +226,9 @@ class NativeConnectionPool:
|
||||
raise
|
||||
|
||||
def _inner() -> R:
|
||||
# Get connection inside the executor thread so that
|
||||
# _thread_local resolves to this thread's connection.
|
||||
conn = self._get_connection()
|
||||
try:
|
||||
result = func(conn, *args, **kwargs)
|
||||
conn.commit()
|
||||
|
||||
@@ -606,10 +606,15 @@ async def native_concurrently_execute(
|
||||
func: Callable[[Any], Awaitable[Any]],
|
||||
args: Iterable[Any],
|
||||
limit: int,
|
||||
delay_cancellation: bool = False,
|
||||
) -> None:
|
||||
"""asyncio-native equivalent of concurrently_execute.
|
||||
|
||||
Executes func on each arg with at most `limit` concurrent executions.
|
||||
|
||||
Args:
|
||||
delay_cancellation: If True, shield the gather from cancellation
|
||||
so that in-flight tasks can complete before the error propagates.
|
||||
"""
|
||||
sem = asyncio.Semaphore(limit)
|
||||
|
||||
@@ -617,7 +622,11 @@ async def native_concurrently_execute(
|
||||
async with sem:
|
||||
await func(item)
|
||||
|
||||
await asyncio.gather(*[_run(item) for item in args])
|
||||
coro = asyncio.gather(*[_run(item) for item in args])
|
||||
if delay_cancellation:
|
||||
await asyncio.shield(coro)
|
||||
else:
|
||||
await coro
|
||||
|
||||
|
||||
async def native_timeout(
|
||||
|
||||
Reference in New Issue
Block a user