mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 16:26:02 +00:00
cancel threads asynchronously (#527)
This commit is contained in:
committed by
GitHub
parent
959ae34f16
commit
13a9eee0cf
@@ -717,9 +717,8 @@ runCommandProcessing c@AgentClient {subQ} server = do
|
||||
retryCommand loop = do
|
||||
-- end... is in a separate atomically because if begin... blocks, SUSPENDED won't be sent
|
||||
atomically $ endAgentOperation c AOSndNetwork
|
||||
atomically $ do
|
||||
throwWhenInactive c
|
||||
beginAgentOperation c AOSndNetwork
|
||||
atomically $ throwWhenInactive c
|
||||
atomically $ beginAgentOperation c AOSndNetwork
|
||||
loop
|
||||
notify cmd = atomically $ writeTBQueue subQ (corrId, connId, cmd)
|
||||
withNextSrv :: TVar [SMPServer] -> [SMPServer] -> (SMPServer -> m ()) -> m ()
|
||||
@@ -794,9 +793,8 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh
|
||||
atomically $ endAgentOperation c AOSndNetwork
|
||||
atomically $ throwWhenInactive c
|
||||
msgId <- atomically $ readTQueue mq
|
||||
atomically $ do
|
||||
beginAgentOperation c AOSndNetwork
|
||||
endAgentOperation c AOMsgDelivery
|
||||
atomically $ beginAgentOperation c AOSndNetwork
|
||||
atomically $ endAgentOperation c AOMsgDelivery
|
||||
let mId = unId msgId
|
||||
E.try (withStore c $ \db -> getPendingMsgData db connId msgId) >>= \case
|
||||
Left (e :: E.SomeException) ->
|
||||
@@ -887,9 +885,8 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh
|
||||
retrySending loop = do
|
||||
-- end... is in a separate atomically because if begin... blocks, SUSPENDED won't be sent
|
||||
atomically $ endAgentOperation c AOSndNetwork
|
||||
atomically $ do
|
||||
throwWhenInactive c
|
||||
beginAgentOperation c AOSndNetwork
|
||||
atomically $ throwWhenInactive c
|
||||
atomically $ beginAgentOperation c AOSndNetwork
|
||||
loop
|
||||
|
||||
ackMessage' :: forall m. AgentMonad m => AgentClient -> ConnId -> AgentMsgId -> m ()
|
||||
|
||||
@@ -438,7 +438,7 @@ closeProtocolServerClients c clientsSel =
|
||||
_ -> pure ()
|
||||
|
||||
cancelActions :: (Foldable f, Monoid (f (Async ()))) => TVar (f (Async ())) -> IO ()
|
||||
cancelActions as = readTVarIO as >>= mapM_ uninterruptibleCancel >> atomically (writeTVar as mempty)
|
||||
cancelActions as = readTVarIO as >>= mapM_ (forkIO . uninterruptibleCancel) >> atomically (writeTVar as mempty)
|
||||
|
||||
withAgentLock :: MonadUnliftIO m => AgentClient -> m a -> m a
|
||||
withAgentLock AgentClient {lock} =
|
||||
@@ -817,7 +817,7 @@ beginAgentOperation c op = do
|
||||
agentOperationBracket :: MonadUnliftIO m => AgentClient -> AgentOperation -> (AgentClient -> STM ()) -> m a -> m a
|
||||
agentOperationBracket c op check action =
|
||||
E.bracket
|
||||
(atomically $ check c >> beginAgentOperation c op)
|
||||
(atomically (check c) >> atomically (beginAgentOperation c op))
|
||||
(\_ -> atomically $ endAgentOperation c op)
|
||||
(const action)
|
||||
|
||||
|
||||
@@ -305,6 +305,7 @@ retryOnError c name loop done e = do
|
||||
where
|
||||
retryLoop = do
|
||||
atomically $ endAgentOperation c AONtfNetwork
|
||||
atomically $ throwWhenInactive c
|
||||
atomically $ beginAgentOperation c AONtfNetwork
|
||||
loop
|
||||
|
||||
|
||||
Reference in New Issue
Block a user