From 403050ef15c75642848bbc1550e8994deb552ebb Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 24 Mar 2026 21:10:18 -0400 Subject: [PATCH] The two fixes resolved both issues: MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. native_concurrently_execute — added delay_cancellation parameter that was expected by callers like e2e_keys.py. Uses asyncio.shield() to prevent cancellation from interrupting in-flight tasks. 2. Lock.__init__ — replaced reactor.callFromThread with loop.call_soon_threadsafe since the reactor object may be None in the asyncio world. Also replaced reactor.running with clock._is_shutdown for the shutdown check. The Lock is constructed inside a DB transaction (potentially on an executor thread), so it needs thread-safe scheduling to set up its renewal looping call on the main event loop. --- synapse/storage/databases/main/lock.py | 12 +++++++----- synapse/storage/native_database.py | 12 ++++++++---- synapse/util/async_helpers.py | 11 ++++++++++- 3 files changed, 25 insertions(+), 10 deletions(-) diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py index 9f95a80d74..3ba7a8c010 100644 --- a/synapse/storage/databases/main/lock.py +++ b/synapse/storage/databases/main/lock.py @@ -398,10 +398,11 @@ class Lock: self._table = "worker_read_write_locks" if read_write else "worker_locks" - # We might be called from a non-main thread, so we defer setting up the - # looping call. + # We might be called from a non-main thread (e.g. a DB executor), + # so we defer setting up the looping call to the event loop. self._looping_call: Any | None = None - reactor.callFromThread(self._setup_looping_call) + loop = clock._get_loop() + loop.call_soon_threadsafe(self._setup_looping_call) self._dropped = False @@ -541,9 +542,10 @@ class Lock: # renewing the lock. if self._looping_call and self._looping_call.running: # We might be called from a non-main thread. - self._reactor.callFromThread(self._looping_call.stop) + loop = self._clock._get_loop() + loop.call_soon_threadsafe(self._looping_call.stop) - if self._reactor.running: + if not self._clock._is_shutdown: logger.error( "Lock for (%s, %s) dropped without being released", self._lock_name, diff --git a/synapse/storage/native_database.py b/synapse/storage/native_database.py index 620cc26148..7cbf49ad87 100644 --- a/synapse/storage/native_database.py +++ b/synapse/storage/native_database.py @@ -176,14 +176,16 @@ class NativeConnectionPool: if self._closed: raise Exception("Connection pool is closed") - conn = self._get_connection() - if self._use_shared_conn: # For in-memory SQLite with a shared connection, run directly # on the event loop thread to avoid thread dispatch overhead. + conn = self._get_connection() return func(conn, *args, **kwargs) def _inner() -> R: + # Get connection inside the executor thread so that + # _thread_local resolves to this thread's connection. + conn = self._get_connection() return func(conn, *args, **kwargs) loop = asyncio.get_running_loop() @@ -211,11 +213,10 @@ class NativeConnectionPool: if self._closed: raise Exception("Connection pool is closed") - conn = self._get_connection() - if self._use_shared_conn: # For in-memory SQLite with a shared connection, run directly # on the event loop thread to avoid thread dispatch overhead. + conn = self._get_connection() try: result = func(conn, *args, **kwargs) conn.commit() @@ -225,6 +226,9 @@ class NativeConnectionPool: raise def _inner() -> R: + # Get connection inside the executor thread so that + # _thread_local resolves to this thread's connection. + conn = self._get_connection() try: result = func(conn, *args, **kwargs) conn.commit() diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index b2e739ba8b..79c555a47a 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -606,10 +606,15 @@ async def native_concurrently_execute( func: Callable[[Any], Awaitable[Any]], args: Iterable[Any], limit: int, + delay_cancellation: bool = False, ) -> None: """asyncio-native equivalent of concurrently_execute. Executes func on each arg with at most `limit` concurrent executions. + + Args: + delay_cancellation: If True, shield the gather from cancellation + so that in-flight tasks can complete before the error propagates. """ sem = asyncio.Semaphore(limit) @@ -617,7 +622,11 @@ async def native_concurrently_execute( async with sem: await func(item) - await asyncio.gather(*[_run(item) for item in args]) + coro = asyncio.gather(*[_run(item) for item in args]) + if delay_cancellation: + await asyncio.shield(coro) + else: + await coro async def native_timeout(