reduce the number of STM transactions

This commit is contained in:
Evgeny Poberezkin
2024-08-08 23:45:54 +01:00
parent 7d8457263b
commit 0e7471ee00
4 changed files with 48 additions and 45 deletions
+20 -20
View File
@@ -184,7 +184,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do
cfg <- asks config
forever $ do
lift $ waitForWork doWork
atomically $ assertAgentForeground c
liftIO $ assertAgentForeground c
runXFTPOperation cfg
where
runXFTPOperation :: AgentConfig -> AM ()
@@ -194,7 +194,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do
(fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, digest, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, delay} : _}, approvedRelays) -> do
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do
atomically $ waitWhileSuspended c
liftIO $ waitWhileSuspended c
liftIO $ waitForUserNetwork c
atomically $ incXFTPServerStat c userId srv downloadAttempts
downloadFileChunk fc replica approvedRelays
@@ -205,7 +205,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do
when (serverHostError e) $ notify c rcvFileEntityId $ RFWARN e
liftIO $ closeXFTPServerClient c userId server digest
withStore' c $ \db -> updateRcvChunkReplicaDelay db rcvChunkReplicaId replicaDelay
atomically $ assertAgentForeground c
liftIO $ assertAgentForeground c
loop
retryDone e = do
atomically . incXFTPServerStat c userId srv $ case e of
@@ -221,7 +221,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do
chunkSpec = XFTPRcvChunkSpec chunkPath chSize (unFileDigest digest)
relChunkPath = fileTmpPath </> takeFileName chunkPath
agentXFTPDownloadChunk c userId digest replica chunkSpec
atomically $ waitUntilForeground c
liftIO $ waitUntilForeground c
(entityId, complete, progress) <- withStore c $ \db -> runExceptT $ do
liftIO $ updateRcvFileChunkReceived db (rcvChunkReplicaId replica) rcvChunkId relChunkPath
RcvFile {size = FileSize currentSize, chunks, redirect} <- ExceptT $ getRcvFile db rcvFileId
@@ -273,7 +273,7 @@ runXFTPRcvLocalWorker c Worker {doWork} = do
cfg <- asks config
forever $ do
lift $ waitForWork doWork
atomically $ assertAgentForeground c
liftIO $ assertAgentForeground c
runXFTPOperation cfg
where
runXFTPOperation :: AgentConfig -> AM ()
@@ -299,12 +299,12 @@ runXFTPRcvLocalWorker c Worker {doWork} = do
Nothing -> do
notify c rcvFileEntityId $ RFDONE fsSavePath
lift $ forM_ tmpPath (removePath <=< toFSFilePath)
atomically $ waitUntilForeground c
liftIO $ waitUntilForeground c
withStore' c (`updateRcvFileComplete` rcvFileId)
Just RcvFileRedirect {redirectFileInfo, redirectDbId} -> do
let RedirectFileInfo {size = redirectSize, digest = redirectDigest} = redirectFileInfo
lift $ forM_ tmpPath (removePath <=< toFSFilePath)
atomically $ waitUntilForeground c
liftIO $ waitUntilForeground c
withStore' c (`updateRcvFileComplete` rcvFileId)
-- proceed with redirect
yaml <- liftError (FILE . FILE_IO . show) (CF.readFile $ CryptoFile fsSavePath cfArgs) `agentFinally` (lift $ toFSFilePath fsSavePath >>= removePath)
@@ -392,7 +392,7 @@ runXFTPSndPrepareWorker c Worker {doWork} = do
cfg <- asks config
forever $ do
lift $ waitForWork doWork
atomically $ assertAgentForeground c
liftIO $ assertAgentForeground c
runXFTPOperation cfg
where
runXFTPOperation :: AgentConfig -> AM ()
@@ -454,7 +454,7 @@ runXFTPSndPrepareWorker c Worker {doWork} = do
SndFileChunkReplica {server} : _ -> Right server
createChunk :: Int -> SndFileChunk -> AM (ProtocolServer 'PXFTP)
createChunk numRecipients' ch = do
atomically $ assertAgentForeground c
liftIO $ assertAgentForeground c
(replica, ProtoServerWithAuth srv _) <- tryCreate
withStore' c $ \db -> createSndFileReplica db ch replica
pure srv
@@ -464,7 +464,7 @@ runXFTPSndPrepareWorker c Worker {doWork} = do
let AgentClient {xftpServers} = c
userSrvCount <- length <$> atomically (TM.lookup userId xftpServers)
withRetryIntervalCount (riFast ri) $ \n _ loop -> do
atomically $ waitWhileSuspended c
liftIO $ waitWhileSuspended c
liftIO $ waitForUserNetwork c
let triedAllSrvs = n > userSrvCount
createWithNextSrv usedSrvs
@@ -474,7 +474,7 @@ runXFTPSndPrepareWorker c Worker {doWork} = do
retryLoop loop triedAllSrvs e = do
flip catchAgentError (\_ -> pure ()) $ do
when (triedAllSrvs && serverHostError e) $ notify c sndFileEntityId $ SFWARN e
atomically $ assertAgentForeground c
liftIO $ assertAgentForeground c
loop
createWithNextSrv usedSrvs = do
deleted <- withStore' c $ \db -> getSndFileDeleted db sndFileId
@@ -494,7 +494,7 @@ runXFTPSndWorker c srv Worker {doWork} = do
cfg <- asks config
forever $ do
lift $ waitForWork doWork
atomically $ assertAgentForeground c
liftIO $ assertAgentForeground c
runXFTPOperation cfg
where
runXFTPOperation :: AgentConfig -> AM ()
@@ -504,7 +504,7 @@ runXFTPSndWorker c srv Worker {doWork} = do
fc@SndFileChunk {userId, sndFileId, sndFileEntityId, filePrefixPath, digest, replicas = replica@SndFileChunkReplica {sndChunkReplicaId, server, delay} : _} -> do
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do
atomically $ waitWhileSuspended c
liftIO $ waitWhileSuspended c
liftIO $ waitForUserNetwork c
atomically $ incXFTPServerStat c userId srv uploadAttempts
uploadFileChunk cfg fc replica
@@ -515,7 +515,7 @@ runXFTPSndWorker c srv Worker {doWork} = do
when (serverHostError e) $ notify c sndFileEntityId $ SFWARN e
liftIO $ closeXFTPServerClient c userId server digest
withStore' c $ \db -> updateSndChunkReplicaDelay db sndChunkReplicaId replicaDelay
atomically $ assertAgentForeground c
liftIO $ assertAgentForeground c
loop
retryDone e = do
atomically $ incXFTPServerStat c userId srv uploadErrs
@@ -526,9 +526,9 @@ runXFTPSndWorker c srv Worker {doWork} = do
fsFilePath <- lift $ toFSFilePath filePath
unlessM (doesFileExist fsFilePath) $ throwE $ FILE NO_FILE
let chunkSpec' = chunkSpec {filePath = fsFilePath} :: XFTPChunkSpec
atomically $ assertAgentForeground c
liftIO $ assertAgentForeground c
agentXFTPUploadChunk c userId chunkDigest replica' chunkSpec'
atomically $ waitUntilForeground c
liftIO $ waitUntilForeground c
sf@SndFile {sndFileEntityId, prefixPath, chunks} <- withStore c $ \db -> do
updateSndChunkReplicaStatus db sndChunkReplicaId SFRSUploaded
getSndFile db sndFileId
@@ -666,7 +666,7 @@ runXFTPDelWorker c srv Worker {doWork} = do
cfg <- asks config
forever $ do
lift $ waitForWork doWork
atomically $ assertAgentForeground c
liftIO $ assertAgentForeground c
runXFTPOperation cfg
where
runXFTPOperation :: AgentConfig -> AM ()
@@ -677,7 +677,7 @@ runXFTPDelWorker c srv Worker {doWork} = do
processDeletedReplica replica@DeletedSndChunkReplica {deletedSndChunkReplicaId, userId, server, chunkDigest, delay} = do
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do
atomically $ waitWhileSuspended c
liftIO $ waitWhileSuspended c
liftIO $ waitForUserNetwork c
atomically $ incXFTPServerStat c userId srv deleteAttempts
deleteChunkReplica
@@ -688,7 +688,7 @@ runXFTPDelWorker c srv Worker {doWork} = do
when (serverHostError e) $ notify c "" $ SFWARN e
liftIO $ closeXFTPServerClient c userId server chunkDigest
withStore' c $ \db -> updateDeletedSndChunkReplicaDelay db deletedSndChunkReplicaId replicaDelay
atomically $ assertAgentForeground c
liftIO $ assertAgentForeground c
loop
retryDone e = do
atomically $ incXFTPServerStat c userId srv deleteErrs
@@ -703,7 +703,7 @@ delWorkerInternalError c deletedSndChunkReplicaId e = do
withStore' c $ \db -> deleteDeletedSndChunkReplica db deletedSndChunkReplicaId
notify c "" $ SFERR e
assertAgentForeground :: AgentClient -> STM ()
assertAgentForeground :: AgentClient -> IO ()
assertAgentForeground c = do
throwWhenInactive c
waitUntilForeground c
+8 -8
View File
@@ -235,7 +235,7 @@ logServersStats c = do
liftIO $ threadDelay' delay
int <- asks (logStatsInterval . config)
forever $ do
atomically $ waitUntilActive c
liftIO $ waitUntilActive c
saveServersStats c
liftIO $ threadDelay' int
@@ -1136,7 +1136,7 @@ runCommandProcessing c@AgentClient {subQ} server_ Worker {doWork} = do
forever $ do
atomically $ endAgentOperation c AOSndNetwork
lift $ waitForWork doWork
atomically $ throwWhenInactive c
liftIO $ throwWhenInactive c
atomically $ beginAgentOperation c AOSndNetwork
withWork c doWork (`getPendingServerCommand` server_) $ runProcessCmd (riFast ri)
where
@@ -1254,7 +1254,7 @@ runCommandProcessing c@AgentClient {subQ} server_ Worker {doWork} = do
SomeConn _ conn@DuplexConnection {} -> a conn
_ -> internalErr "command requires duplex connection"
tryCommand action = withRetryInterval ri $ \_ loop -> do
atomically $ waitWhileSuspended c
liftIO $ waitWhileSuspended c
liftIO $ waitForUserNetwork c
tryError action >>= \case
Left e
@@ -1363,7 +1363,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} ConnData {connId} sq@SndQueue {userI
forever $ do
atomically $ endAgentOperation c AOSndNetwork
lift $ waitForWork doWork
atomically $ throwWhenInactive c
liftIO $ throwWhenInactive c
atomically $ throwWhenNoDelivery c sq
atomically $ beginAgentOperation c AOSndNetwork
withWork c doWork (\db -> getPendingQueueMsg db connId sq) $
@@ -1372,7 +1372,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} ConnData {connId} sq@SndQueue {userI
let mId = unId msgId
ri' = maybe id updateRetryInterval2 msgRetryState ri
withRetryLock2 ri' qLock $ \riState loop -> do
atomically $ waitWhileSuspended c
liftIO $ waitWhileSuspended c
liftIO $ waitForUserNetwork c
resp <- tryError $ case msgType of
AM_CONN_INFO -> sendConfirmation c sq msgBody
@@ -1525,7 +1525,7 @@ retrySndOp :: AgentClient -> AM () -> AM ()
retrySndOp c loop = do
-- end... is in a separate atomically because if begin... blocks, SUSPENDED won't be sent
atomically $ endAgentOperation c AOSndNetwork
atomically $ throwWhenInactive c
liftIO $ throwWhenInactive c
atomically $ beginAgentOperation c AOSndNetwork
loop
@@ -2113,7 +2113,7 @@ cleanupManager c@AgentClient {subQ} = do
int <- asks (cleanupInterval . config)
ttl <- asks $ storedMsgDataTTL . config
forever $ do
atomically $ waitUntilActive c
liftIO $ waitUntilActive c
run ERR deleteConns
run ERR $ withStore' c (`deleteRcvMsgHashesExpired` ttl)
run ERR $ withStore' c (`deleteSndMsgsExpired` ttl)
@@ -2133,7 +2133,7 @@ cleanupManager c@AgentClient {subQ} = do
step <- asks $ cleanupStepInterval . config
liftIO $ threadDelay step
-- we are catching it to avoid CRITICAL errors in tests when this is the only remaining handle to active
waitActive a = liftIO (E.tryAny . atomically $ waitUntilActive c) >>= either (\_ -> pure ()) (\_ -> void a)
waitActive a = liftIO (E.tryAny $ waitUntilActive c) >>= either (\_ -> pure ()) (\_ -> void a)
deleteConns =
withLock (deleteLock c) "cleanupManager" $ do
void $ withStore' c getDeletedConnIds >>= deleteDeletedConns c
+17 -14
View File
@@ -720,7 +720,7 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = do
withRetryForeground ri isForeground (isNetworkOnline c) $ \_ loop -> do
pending <- atomically getPending
forM_ (L.nonEmpty pending) $ \qs -> do
atomically $ waitUntilForeground c
liftIO $ waitUntilForeground c
liftIO $ waitForUserNetwork c
reconnectSMPClient c tSess qs
loop
@@ -911,12 +911,13 @@ cancelWorker Worker {doWork, action} = do
noWorkToDo doWork
atomically (tryTakeTMVar action) >>= mapM_ (mapM_ uninterruptibleCancel)
waitUntilActive :: AgentClient -> STM ()
waitUntilActive c = unlessM (readTVar $ active c) retry
{-# INLINE waitUntilActive #-}
waitUntilActive :: AgentClient -> IO ()
waitUntilActive AgentClient {active} =
unlessM (readTVarIO active) . atomically $
unlessM (readTVar active) retry
throwWhenInactive :: AgentClient -> STM ()
throwWhenInactive c = unlessM (readTVar $ active c) $ throwSTM ThreadKilled
throwWhenInactive :: AgentClient -> IO ()
throwWhenInactive c = unlessM (readTVarIO $ active c) $ E.throwIO ThreadKilled
{-# INLINE throwWhenInactive #-}
-- this function is used to remove workers once delivery is complete, not when it is removed from the map
@@ -1865,22 +1866,24 @@ beginAgentOperation c op = do
-- unsafeIOToSTM $ putStrLn $ "beginOperation! " <> show op <> " " <> show (opsInProgress s + 1)
writeTVar opVar $! s {opsInProgress = opsInProgress s + 1}
agentOperationBracket :: MonadUnliftIO m => AgentClient -> AgentOperation -> (AgentClient -> STM ()) -> m a -> m a
agentOperationBracket :: MonadUnliftIO m => AgentClient -> AgentOperation -> (AgentClient -> IO ()) -> m a -> m a
agentOperationBracket c op check action =
E.bracket
(atomically (check c) >> atomically (beginAgentOperation c op))
(liftIO (check c) >> atomically (beginAgentOperation c op))
(\_ -> atomically $ endAgentOperation c op)
(const action)
waitUntilForeground :: AgentClient -> STM ()
waitUntilForeground c = unlessM ((ASForeground ==) <$> readTVar (agentState c)) retry
{-# INLINE waitUntilForeground #-}
waitUntilForeground :: AgentClient -> IO ()
waitUntilForeground AgentClient {agentState} =
unlessM ((ASForeground ==) <$> readTVarIO agentState) . atomically $
unlessM ((ASForeground ==) <$> readTVar agentState) retry
-- This function waits while agent is suspended, but will proceed while it is suspending,
-- to allow completing in-flight operations.
waitWhileSuspended :: AgentClient -> STM ()
waitWhileSuspended c = unlessM ((ASSuspended /=) <$> readTVar (agentState c)) retry
{-# INLINE waitWhileSuspended #-}
waitWhileSuspended :: AgentClient -> IO ()
waitWhileSuspended AgentClient {agentState} =
unlessM ((ASSuspended /=) <$> readTVarIO agentState) . atomically $
unlessM ((ASSuspended /=) <$> readTVar agentState) retry
withStore' :: AgentClient -> (DB.Connection -> IO a) -> AM a
withStore' c action = withStore c $ fmap Right . action
@@ -159,7 +159,7 @@ runNtfWorker c srv Worker {doWork} =
logInfo $ "runNtfWorker, nextSub " <> tshow nextSub
ri <- asks $ reconnectInterval . config
withRetryInterval ri $ \_ loop -> do
atomically $ waitWhileSuspended c
liftIO $ waitWhileSuspended c
liftIO $ waitForUserNetwork c
processSub nextSub
`catchAgentError` retryOnError c "NtfWorker" loop (workerInternalError c connId . show)
@@ -244,7 +244,7 @@ runNtfSMPWorker c srv Worker {doWork} = do
logInfo $ "runNtfSMPWorker, nextSub " <> tshow nextSub
ri <- asks $ reconnectInterval . config
withRetryInterval ri $ \_ loop -> do
atomically $ waitWhileSuspended c
liftIO $ waitWhileSuspended c
liftIO $ waitForUserNetwork c
processSub nextSub
`catchAgentError` retryOnError c "NtfSMPWorker" loop (workerInternalError c connId . show)
@@ -297,7 +297,7 @@ retryOnError c name loop done e = do
where
retryLoop = do
atomically $ endAgentOperation c AONtfNetwork
atomically $ throwWhenInactive c
liftIO $ throwWhenInactive c
atomically $ beginAgentOperation c AONtfNetwork
loop