diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py index 9f95a80d74..3ba7a8c010 100644 --- a/synapse/storage/databases/main/lock.py +++ b/synapse/storage/databases/main/lock.py @@ -398,10 +398,11 @@ class Lock: self._table = "worker_read_write_locks" if read_write else "worker_locks" - # We might be called from a non-main thread, so we defer setting up the - # looping call. + # We might be called from a non-main thread (e.g. a DB executor), + # so we defer setting up the looping call to the event loop. self._looping_call: Any | None = None - reactor.callFromThread(self._setup_looping_call) + loop = clock._get_loop() + loop.call_soon_threadsafe(self._setup_looping_call) self._dropped = False @@ -541,9 +542,10 @@ class Lock: # renewing the lock. if self._looping_call and self._looping_call.running: # We might be called from a non-main thread. - self._reactor.callFromThread(self._looping_call.stop) + loop = self._clock._get_loop() + loop.call_soon_threadsafe(self._looping_call.stop) - if self._reactor.running: + if not self._clock._is_shutdown: logger.error( "Lock for (%s, %s) dropped without being released", self._lock_name, diff --git a/synapse/storage/native_database.py b/synapse/storage/native_database.py index 620cc26148..7cbf49ad87 100644 --- a/synapse/storage/native_database.py +++ b/synapse/storage/native_database.py @@ -176,14 +176,16 @@ class NativeConnectionPool: if self._closed: raise Exception("Connection pool is closed") - conn = self._get_connection() - if self._use_shared_conn: # For in-memory SQLite with a shared connection, run directly # on the event loop thread to avoid thread dispatch overhead. + conn = self._get_connection() return func(conn, *args, **kwargs) def _inner() -> R: + # Get connection inside the executor thread so that + # _thread_local resolves to this thread's connection. + conn = self._get_connection() return func(conn, *args, **kwargs) loop = asyncio.get_running_loop() @@ -211,11 +213,10 @@ class NativeConnectionPool: if self._closed: raise Exception("Connection pool is closed") - conn = self._get_connection() - if self._use_shared_conn: # For in-memory SQLite with a shared connection, run directly # on the event loop thread to avoid thread dispatch overhead. + conn = self._get_connection() try: result = func(conn, *args, **kwargs) conn.commit() @@ -225,6 +226,9 @@ class NativeConnectionPool: raise def _inner() -> R: + # Get connection inside the executor thread so that + # _thread_local resolves to this thread's connection. + conn = self._get_connection() try: result = func(conn, *args, **kwargs) conn.commit() diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index b2e739ba8b..79c555a47a 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -606,10 +606,15 @@ async def native_concurrently_execute( func: Callable[[Any], Awaitable[Any]], args: Iterable[Any], limit: int, + delay_cancellation: bool = False, ) -> None: """asyncio-native equivalent of concurrently_execute. Executes func on each arg with at most `limit` concurrent executions. + + Args: + delay_cancellation: If True, shield the gather from cancellation + so that in-flight tasks can complete before the error propagates. """ sem = asyncio.Semaphore(limit) @@ -617,7 +622,11 @@ async def native_concurrently_execute( async with sem: await func(item) - await asyncio.gather(*[_run(item) for item in args]) + coro = asyncio.gather(*[_run(item) for item in args]) + if delay_cancellation: + await asyncio.shield(coro) + else: + await coro async def native_timeout(