From 13a9eee0cfede5056a7dfd990609afb09840d02f Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Sat, 17 Sep 2022 00:18:58 +0100 Subject: [PATCH] cancel threads asynchronously (#527) --- src/Simplex/Messaging/Agent.hs | 15 ++++++--------- src/Simplex/Messaging/Agent/Client.hs | 4 ++-- src/Simplex/Messaging/Agent/NtfSubSupervisor.hs | 1 + 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 18e290bfd..f002f36a0 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -717,9 +717,8 @@ runCommandProcessing c@AgentClient {subQ} server = do retryCommand loop = do -- end... is in a separate atomically because if begin... blocks, SUSPENDED won't be sent atomically $ endAgentOperation c AOSndNetwork - atomically $ do - throwWhenInactive c - beginAgentOperation c AOSndNetwork + atomically $ throwWhenInactive c + atomically $ beginAgentOperation c AOSndNetwork loop notify cmd = atomically $ writeTBQueue subQ (corrId, connId, cmd) withNextSrv :: TVar [SMPServer] -> [SMPServer] -> (SMPServer -> m ()) -> m () @@ -794,9 +793,8 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh atomically $ endAgentOperation c AOSndNetwork atomically $ throwWhenInactive c msgId <- atomically $ readTQueue mq - atomically $ do - beginAgentOperation c AOSndNetwork - endAgentOperation c AOMsgDelivery + atomically $ beginAgentOperation c AOSndNetwork + atomically $ endAgentOperation c AOMsgDelivery let mId = unId msgId E.try (withStore c $ \db -> getPendingMsgData db connId msgId) >>= \case Left (e :: E.SomeException) -> @@ -887,9 +885,8 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh retrySending loop = do -- end... is in a separate atomically because if begin... blocks, SUSPENDED won't be sent atomically $ endAgentOperation c AOSndNetwork - atomically $ do - throwWhenInactive c - beginAgentOperation c AOSndNetwork + atomically $ throwWhenInactive c + atomically $ beginAgentOperation c AOSndNetwork loop ackMessage' :: forall m. AgentMonad m => AgentClient -> ConnId -> AgentMsgId -> m () diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index d20a344a9..4cc48b2dc 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -438,7 +438,7 @@ closeProtocolServerClients c clientsSel = _ -> pure () cancelActions :: (Foldable f, Monoid (f (Async ()))) => TVar (f (Async ())) -> IO () -cancelActions as = readTVarIO as >>= mapM_ uninterruptibleCancel >> atomically (writeTVar as mempty) +cancelActions as = readTVarIO as >>= mapM_ (forkIO . uninterruptibleCancel) >> atomically (writeTVar as mempty) withAgentLock :: MonadUnliftIO m => AgentClient -> m a -> m a withAgentLock AgentClient {lock} = @@ -817,7 +817,7 @@ beginAgentOperation c op = do agentOperationBracket :: MonadUnliftIO m => AgentClient -> AgentOperation -> (AgentClient -> STM ()) -> m a -> m a agentOperationBracket c op check action = E.bracket - (atomically $ check c >> beginAgentOperation c op) + (atomically (check c) >> atomically (beginAgentOperation c op)) (\_ -> atomically $ endAgentOperation c op) (const action) diff --git a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs index 57385984a..7f3346f4c 100644 --- a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs +++ b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs @@ -305,6 +305,7 @@ retryOnError c name loop done e = do where retryLoop = do atomically $ endAgentOperation c AONtfNetwork + atomically $ throwWhenInactive c atomically $ beginAgentOperation c AONtfNetwork loop