diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index c82316c82..ad3ad72c0 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -184,7 +184,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do cfg <- asks config forever $ do lift $ waitForWork doWork - atomically $ assertAgentForeground c + liftIO $ assertAgentForeground c runXFTPOperation cfg where runXFTPOperation :: AgentConfig -> AM () @@ -194,7 +194,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do (fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, digest, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, delay} : _}, approvedRelays) -> do let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do - atomically $ waitWhileSuspended c + liftIO $ waitWhileSuspended c liftIO $ waitForUserNetwork c atomically $ incXFTPServerStat c userId srv downloadAttempts downloadFileChunk fc replica approvedRelays @@ -205,7 +205,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do when (serverHostError e) $ notify c rcvFileEntityId $ RFWARN e liftIO $ closeXFTPServerClient c userId server digest withStore' c $ \db -> updateRcvChunkReplicaDelay db rcvChunkReplicaId replicaDelay - atomically $ assertAgentForeground c + liftIO $ assertAgentForeground c loop retryDone e = do atomically . incXFTPServerStat c userId srv $ case e of @@ -221,7 +221,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do chunkSpec = XFTPRcvChunkSpec chunkPath chSize (unFileDigest digest) relChunkPath = fileTmpPath takeFileName chunkPath agentXFTPDownloadChunk c userId digest replica chunkSpec - atomically $ waitUntilForeground c + liftIO $ waitUntilForeground c (entityId, complete, progress) <- withStore c $ \db -> runExceptT $ do liftIO $ updateRcvFileChunkReceived db (rcvChunkReplicaId replica) rcvChunkId relChunkPath RcvFile {size = FileSize currentSize, chunks, redirect} <- ExceptT $ getRcvFile db rcvFileId @@ -273,7 +273,7 @@ runXFTPRcvLocalWorker c Worker {doWork} = do cfg <- asks config forever $ do lift $ waitForWork doWork - atomically $ assertAgentForeground c + liftIO $ assertAgentForeground c runXFTPOperation cfg where runXFTPOperation :: AgentConfig -> AM () @@ -299,12 +299,12 @@ runXFTPRcvLocalWorker c Worker {doWork} = do Nothing -> do notify c rcvFileEntityId $ RFDONE fsSavePath lift $ forM_ tmpPath (removePath <=< toFSFilePath) - atomically $ waitUntilForeground c + liftIO $ waitUntilForeground c withStore' c (`updateRcvFileComplete` rcvFileId) Just RcvFileRedirect {redirectFileInfo, redirectDbId} -> do let RedirectFileInfo {size = redirectSize, digest = redirectDigest} = redirectFileInfo lift $ forM_ tmpPath (removePath <=< toFSFilePath) - atomically $ waitUntilForeground c + liftIO $ waitUntilForeground c withStore' c (`updateRcvFileComplete` rcvFileId) -- proceed with redirect yaml <- liftError (FILE . FILE_IO . show) (CF.readFile $ CryptoFile fsSavePath cfArgs) `agentFinally` (lift $ toFSFilePath fsSavePath >>= removePath) @@ -392,7 +392,7 @@ runXFTPSndPrepareWorker c Worker {doWork} = do cfg <- asks config forever $ do lift $ waitForWork doWork - atomically $ assertAgentForeground c + liftIO $ assertAgentForeground c runXFTPOperation cfg where runXFTPOperation :: AgentConfig -> AM () @@ -454,7 +454,7 @@ runXFTPSndPrepareWorker c Worker {doWork} = do SndFileChunkReplica {server} : _ -> Right server createChunk :: Int -> SndFileChunk -> AM (ProtocolServer 'PXFTP) createChunk numRecipients' ch = do - atomically $ assertAgentForeground c + liftIO $ assertAgentForeground c (replica, ProtoServerWithAuth srv _) <- tryCreate withStore' c $ \db -> createSndFileReplica db ch replica pure srv @@ -464,7 +464,7 @@ runXFTPSndPrepareWorker c Worker {doWork} = do let AgentClient {xftpServers} = c userSrvCount <- length <$> atomically (TM.lookup userId xftpServers) withRetryIntervalCount (riFast ri) $ \n _ loop -> do - atomically $ waitWhileSuspended c + liftIO $ waitWhileSuspended c liftIO $ waitForUserNetwork c let triedAllSrvs = n > userSrvCount createWithNextSrv usedSrvs @@ -474,7 +474,7 @@ runXFTPSndPrepareWorker c Worker {doWork} = do retryLoop loop triedAllSrvs e = do flip catchAgentError (\_ -> pure ()) $ do when (triedAllSrvs && serverHostError e) $ notify c sndFileEntityId $ SFWARN e - atomically $ assertAgentForeground c + liftIO $ assertAgentForeground c loop createWithNextSrv usedSrvs = do deleted <- withStore' c $ \db -> getSndFileDeleted db sndFileId @@ -494,7 +494,7 @@ runXFTPSndWorker c srv Worker {doWork} = do cfg <- asks config forever $ do lift $ waitForWork doWork - atomically $ assertAgentForeground c + liftIO $ assertAgentForeground c runXFTPOperation cfg where runXFTPOperation :: AgentConfig -> AM () @@ -504,7 +504,7 @@ runXFTPSndWorker c srv Worker {doWork} = do fc@SndFileChunk {userId, sndFileId, sndFileEntityId, filePrefixPath, digest, replicas = replica@SndFileChunkReplica {sndChunkReplicaId, server, delay} : _} -> do let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do - atomically $ waitWhileSuspended c + liftIO $ waitWhileSuspended c liftIO $ waitForUserNetwork c atomically $ incXFTPServerStat c userId srv uploadAttempts uploadFileChunk cfg fc replica @@ -515,7 +515,7 @@ runXFTPSndWorker c srv Worker {doWork} = do when (serverHostError e) $ notify c sndFileEntityId $ SFWARN e liftIO $ closeXFTPServerClient c userId server digest withStore' c $ \db -> updateSndChunkReplicaDelay db sndChunkReplicaId replicaDelay - atomically $ assertAgentForeground c + liftIO $ assertAgentForeground c loop retryDone e = do atomically $ incXFTPServerStat c userId srv uploadErrs @@ -526,9 +526,9 @@ runXFTPSndWorker c srv Worker {doWork} = do fsFilePath <- lift $ toFSFilePath filePath unlessM (doesFileExist fsFilePath) $ throwE $ FILE NO_FILE let chunkSpec' = chunkSpec {filePath = fsFilePath} :: XFTPChunkSpec - atomically $ assertAgentForeground c + liftIO $ assertAgentForeground c agentXFTPUploadChunk c userId chunkDigest replica' chunkSpec' - atomically $ waitUntilForeground c + liftIO $ waitUntilForeground c sf@SndFile {sndFileEntityId, prefixPath, chunks} <- withStore c $ \db -> do updateSndChunkReplicaStatus db sndChunkReplicaId SFRSUploaded getSndFile db sndFileId @@ -666,7 +666,7 @@ runXFTPDelWorker c srv Worker {doWork} = do cfg <- asks config forever $ do lift $ waitForWork doWork - atomically $ assertAgentForeground c + liftIO $ assertAgentForeground c runXFTPOperation cfg where runXFTPOperation :: AgentConfig -> AM () @@ -677,7 +677,7 @@ runXFTPDelWorker c srv Worker {doWork} = do processDeletedReplica replica@DeletedSndChunkReplica {deletedSndChunkReplicaId, userId, server, chunkDigest, delay} = do let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do - atomically $ waitWhileSuspended c + liftIO $ waitWhileSuspended c liftIO $ waitForUserNetwork c atomically $ incXFTPServerStat c userId srv deleteAttempts deleteChunkReplica @@ -688,7 +688,7 @@ runXFTPDelWorker c srv Worker {doWork} = do when (serverHostError e) $ notify c "" $ SFWARN e liftIO $ closeXFTPServerClient c userId server chunkDigest withStore' c $ \db -> updateDeletedSndChunkReplicaDelay db deletedSndChunkReplicaId replicaDelay - atomically $ assertAgentForeground c + liftIO $ assertAgentForeground c loop retryDone e = do atomically $ incXFTPServerStat c userId srv deleteErrs @@ -703,7 +703,7 @@ delWorkerInternalError c deletedSndChunkReplicaId e = do withStore' c $ \db -> deleteDeletedSndChunkReplica db deletedSndChunkReplicaId notify c "" $ SFERR e -assertAgentForeground :: AgentClient -> STM () +assertAgentForeground :: AgentClient -> IO () assertAgentForeground c = do throwWhenInactive c waitUntilForeground c diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index f7e70b721..6753cb36f 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -235,7 +235,7 @@ logServersStats c = do liftIO $ threadDelay' delay int <- asks (logStatsInterval . config) forever $ do - atomically $ waitUntilActive c + liftIO $ waitUntilActive c saveServersStats c liftIO $ threadDelay' int @@ -1136,7 +1136,7 @@ runCommandProcessing c@AgentClient {subQ} server_ Worker {doWork} = do forever $ do atomically $ endAgentOperation c AOSndNetwork lift $ waitForWork doWork - atomically $ throwWhenInactive c + liftIO $ throwWhenInactive c atomically $ beginAgentOperation c AOSndNetwork withWork c doWork (`getPendingServerCommand` server_) $ runProcessCmd (riFast ri) where @@ -1254,7 +1254,7 @@ runCommandProcessing c@AgentClient {subQ} server_ Worker {doWork} = do SomeConn _ conn@DuplexConnection {} -> a conn _ -> internalErr "command requires duplex connection" tryCommand action = withRetryInterval ri $ \_ loop -> do - atomically $ waitWhileSuspended c + liftIO $ waitWhileSuspended c liftIO $ waitForUserNetwork c tryError action >>= \case Left e @@ -1363,7 +1363,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} ConnData {connId} sq@SndQueue {userI forever $ do atomically $ endAgentOperation c AOSndNetwork lift $ waitForWork doWork - atomically $ throwWhenInactive c + liftIO $ throwWhenInactive c atomically $ throwWhenNoDelivery c sq atomically $ beginAgentOperation c AOSndNetwork withWork c doWork (\db -> getPendingQueueMsg db connId sq) $ @@ -1372,7 +1372,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} ConnData {connId} sq@SndQueue {userI let mId = unId msgId ri' = maybe id updateRetryInterval2 msgRetryState ri withRetryLock2 ri' qLock $ \riState loop -> do - atomically $ waitWhileSuspended c + liftIO $ waitWhileSuspended c liftIO $ waitForUserNetwork c resp <- tryError $ case msgType of AM_CONN_INFO -> sendConfirmation c sq msgBody @@ -1525,7 +1525,7 @@ retrySndOp :: AgentClient -> AM () -> AM () retrySndOp c loop = do -- end... is in a separate atomically because if begin... blocks, SUSPENDED won't be sent atomically $ endAgentOperation c AOSndNetwork - atomically $ throwWhenInactive c + liftIO $ throwWhenInactive c atomically $ beginAgentOperation c AOSndNetwork loop @@ -2113,7 +2113,7 @@ cleanupManager c@AgentClient {subQ} = do int <- asks (cleanupInterval . config) ttl <- asks $ storedMsgDataTTL . config forever $ do - atomically $ waitUntilActive c + liftIO $ waitUntilActive c run ERR deleteConns run ERR $ withStore' c (`deleteRcvMsgHashesExpired` ttl) run ERR $ withStore' c (`deleteSndMsgsExpired` ttl) @@ -2133,7 +2133,7 @@ cleanupManager c@AgentClient {subQ} = do step <- asks $ cleanupStepInterval . config liftIO $ threadDelay step -- we are catching it to avoid CRITICAL errors in tests when this is the only remaining handle to active - waitActive a = liftIO (E.tryAny . atomically $ waitUntilActive c) >>= either (\_ -> pure ()) (\_ -> void a) + waitActive a = liftIO (E.tryAny $ waitUntilActive c) >>= either (\_ -> pure ()) (\_ -> void a) deleteConns = withLock (deleteLock c) "cleanupManager" $ do void $ withStore' c getDeletedConnIds >>= deleteDeletedConns c diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index d3e1f2eca..3d92ea38e 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -720,7 +720,7 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do withRetryForeground ri isForeground (isNetworkOnline c) $ \_ loop -> do pending <- atomically getPending forM_ (L.nonEmpty pending) $ \qs -> do - atomically $ waitUntilForeground c + liftIO $ waitUntilForeground c liftIO $ waitForUserNetwork c reconnectSMPClient c tSess qs loop @@ -911,12 +911,13 @@ cancelWorker Worker {doWork, action} = do noWorkToDo doWork atomically (tryTakeTMVar action) >>= mapM_ (mapM_ uninterruptibleCancel) -waitUntilActive :: AgentClient -> STM () -waitUntilActive c = unlessM (readTVar $ active c) retry -{-# INLINE waitUntilActive #-} +waitUntilActive :: AgentClient -> IO () +waitUntilActive AgentClient {active} = + unlessM (readTVarIO active) . atomically $ + unlessM (readTVar active) retry -throwWhenInactive :: AgentClient -> STM () -throwWhenInactive c = unlessM (readTVar $ active c) $ throwSTM ThreadKilled +throwWhenInactive :: AgentClient -> IO () +throwWhenInactive c = unlessM (readTVarIO $ active c) $ E.throwIO ThreadKilled {-# INLINE throwWhenInactive #-} -- this function is used to remove workers once delivery is complete, not when it is removed from the map @@ -1865,22 +1866,24 @@ beginAgentOperation c op = do -- unsafeIOToSTM $ putStrLn $ "beginOperation! " <> show op <> " " <> show (opsInProgress s + 1) writeTVar opVar $! s {opsInProgress = opsInProgress s + 1} -agentOperationBracket :: MonadUnliftIO m => AgentClient -> AgentOperation -> (AgentClient -> STM ()) -> m a -> m a +agentOperationBracket :: MonadUnliftIO m => AgentClient -> AgentOperation -> (AgentClient -> IO ()) -> m a -> m a agentOperationBracket c op check action = E.bracket - (atomically (check c) >> atomically (beginAgentOperation c op)) + (liftIO (check c) >> atomically (beginAgentOperation c op)) (\_ -> atomically $ endAgentOperation c op) (const action) -waitUntilForeground :: AgentClient -> STM () -waitUntilForeground c = unlessM ((ASForeground ==) <$> readTVar (agentState c)) retry -{-# INLINE waitUntilForeground #-} +waitUntilForeground :: AgentClient -> IO () +waitUntilForeground AgentClient {agentState} = + unlessM ((ASForeground ==) <$> readTVarIO agentState) . atomically $ + unlessM ((ASForeground ==) <$> readTVar agentState) retry -- This function waits while agent is suspended, but will proceed while it is suspending, -- to allow completing in-flight operations. -waitWhileSuspended :: AgentClient -> STM () -waitWhileSuspended c = unlessM ((ASSuspended /=) <$> readTVar (agentState c)) retry -{-# INLINE waitWhileSuspended #-} +waitWhileSuspended :: AgentClient -> IO () +waitWhileSuspended AgentClient {agentState} = + unlessM ((ASSuspended /=) <$> readTVarIO agentState) . atomically $ + unlessM ((ASSuspended /=) <$> readTVar agentState) retry withStore' :: AgentClient -> (DB.Connection -> IO a) -> AM a withStore' c action = withStore c $ fmap Right . action diff --git a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs index 249aad942..23a88ea70 100644 --- a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs +++ b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs @@ -159,7 +159,7 @@ runNtfWorker c srv Worker {doWork} = logInfo $ "runNtfWorker, nextSub " <> tshow nextSub ri <- asks $ reconnectInterval . config withRetryInterval ri $ \_ loop -> do - atomically $ waitWhileSuspended c + liftIO $ waitWhileSuspended c liftIO $ waitForUserNetwork c processSub nextSub `catchAgentError` retryOnError c "NtfWorker" loop (workerInternalError c connId . show) @@ -244,7 +244,7 @@ runNtfSMPWorker c srv Worker {doWork} = do logInfo $ "runNtfSMPWorker, nextSub " <> tshow nextSub ri <- asks $ reconnectInterval . config withRetryInterval ri $ \_ loop -> do - atomically $ waitWhileSuspended c + liftIO $ waitWhileSuspended c liftIO $ waitForUserNetwork c processSub nextSub `catchAgentError` retryOnError c "NtfSMPWorker" loop (workerInternalError c connId . show) @@ -297,7 +297,7 @@ retryOnError c name loop done e = do where retryLoop = do atomically $ endAgentOperation c AONtfNetwork - atomically $ throwWhenInactive c + liftIO $ throwWhenInactive c atomically $ beginAgentOperation c AONtfNetwork loop