agent: avoid race condition between worker and supervisor when getting work (#928)

This commit is contained in:
Evgeny Poberezkin
2023-12-22 11:12:36 +00:00
committed by GitHub
parent 1c2604f6a3
commit fa457d1c25
3 changed files with 54 additions and 59 deletions

View File

@@ -161,19 +161,16 @@ addWorker c wsSel runWorker runWorkerNoSrv srv_ = do
runXFTPRcvWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m ()
runXFTPRcvWorker c srv doWork = do
forever $ do
void . atomically $ readTMVar doWork
waitForWork doWork
atomically $ assertAgentForeground c
runXFTPOperation
where
noWorkToDo = void . atomically $ tryTakeTMVar doWork
runXFTPOperation :: m ()
runXFTPOperation = do
rcvFilesTTL <- asks (rcvFilesTTL . config)
nextChunk <- withStore' c $ \db -> getNextRcvChunkToDownload db srv rcvFilesTTL
case nextChunk of
Nothing -> noWorkToDo
Just RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []} -> rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) "chunk has no replicas"
Just fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, digest, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, delay} : _} -> do
withWork c doWork (\db -> getNextRcvChunkToDownload db srv rcvFilesTTL) $ \case
RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []} -> rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) "chunk has no replicas"
fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, digest, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, delay} : _} -> do
ri <- asks $ reconnectInterval . config
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
withRetryInterval ri' $ \delay' loop ->
@@ -230,19 +227,16 @@ rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath internalErrStr = do
runXFTPRcvLocalWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m ()
runXFTPRcvLocalWorker c doWork = do
forever $ do
void . atomically $ readTMVar doWork
waitForWork doWork
atomically $ assertAgentForeground c
runXFTPOperation
where
runXFTPOperation :: m ()
runXFTPOperation = do
rcvFilesTTL <- asks (rcvFilesTTL . config)
nextFile <- withStore' c (`getNextRcvFileToDecrypt` rcvFilesTTL)
case nextFile of
Nothing -> noWorkToDo
Just f@RcvFile {rcvFileId, rcvFileEntityId, tmpPath} ->
withWork c doWork (`getNextRcvFileToDecrypt` rcvFilesTTL) $
\f@RcvFile {rcvFileId, rcvFileEntityId, tmpPath} ->
decryptFile f `catchAgentError` (rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath . show)
noWorkToDo = void . atomically $ tryTakeTMVar doWork
decryptFile :: RcvFile -> m ()
decryptFile RcvFile {rcvFileId, rcvFileEntityId, key, nonce, tmpPath, saveFile, status, chunks} = do
let CryptoFile savePath cfArgs = saveFile
@@ -299,19 +293,16 @@ addXFTPSndWorker c = addWorker c xftpSndWorkers runXFTPSndWorker runXFTPSndPrepa
runXFTPSndPrepareWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m ()
runXFTPSndPrepareWorker c doWork = do
forever $ do
void . atomically $ readTMVar doWork
waitForWork doWork
atomically $ assertAgentForeground c
runXFTPOperation
where
runXFTPOperation :: m ()
runXFTPOperation = do
sndFilesTTL <- asks (sndFilesTTL . config)
nextFile <- withStore' c (`getNextSndFileToPrepare` sndFilesTTL)
case nextFile of
Nothing -> noWorkToDo
Just f@SndFile {sndFileId, sndFileEntityId, prefixPath} ->
withWork c doWork (`getNextSndFileToPrepare` sndFilesTTL) $
\f@SndFile {sndFileId, sndFileEntityId, prefixPath} ->
prepareFile f `catchAgentError` (sndWorkerInternalError c sndFileId sndFileEntityId prefixPath . show)
noWorkToDo = void . atomically $ tryTakeTMVar doWork
prepareFile :: SndFile -> m ()
prepareFile SndFile {prefixPath = Nothing} =
throwError $ INTERNAL "no prefix path"
@@ -384,19 +375,16 @@ sndWorkerInternalError c sndFileId sndFileEntityId prefixPath internalErrStr = d
runXFTPSndWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m ()
runXFTPSndWorker c srv doWork = do
forever $ do
void . atomically $ readTMVar doWork
waitForWork doWork
atomically $ assertAgentForeground c
runXFTPOperation
where
noWorkToDo = void . atomically $ tryTakeTMVar doWork
runXFTPOperation :: m ()
runXFTPOperation = do
sndFilesTTL <- asks (sndFilesTTL . config)
nextChunk <- withStore' c $ \db -> getNextSndChunkToUpload db srv sndFilesTTL
case nextChunk of
Nothing -> noWorkToDo
Just SndFileChunk {sndFileId, sndFileEntityId, filePrefixPath, replicas = []} -> sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) "chunk has no replicas"
Just fc@SndFileChunk {userId, sndFileId, sndFileEntityId, filePrefixPath, digest, replicas = replica@SndFileChunkReplica {sndChunkReplicaId, server, delay} : _} -> do
withWork c doWork (\db -> getNextSndChunkToUpload db srv sndFilesTTL) $ \case
SndFileChunk {sndFileId, sndFileEntityId, filePrefixPath, replicas = []} -> sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) "chunk has no replicas"
fc@SndFileChunk {userId, sndFileId, sndFileEntityId, filePrefixPath, digest, replicas = replica@SndFileChunkReplica {sndChunkReplicaId, server, delay} : _} -> do
ri <- asks $ reconnectInterval . config
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
withRetryInterval ri' $ \delay' loop ->
@@ -544,23 +532,21 @@ addXFTPDelWorker c srv = do
runXFTPDelWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m ()
runXFTPDelWorker c srv doWork = do
forever $ do
void . atomically $ readTMVar doWork
waitForWork doWork
atomically $ assertAgentForeground c
runXFTPOperation
where
noWorkToDo = void . atomically $ tryTakeTMVar doWork
runXFTPOperation :: m ()
runXFTPOperation = do
-- no point in deleting files older than rcv ttl, as they will be expired on server
rcvFilesTTL <- asks (rcvFilesTTL . config)
nextReplica <- withStore' c $ \db -> getNextDeletedSndChunkReplica db srv rcvFilesTTL
case nextReplica of
Nothing -> noWorkToDo
Just replica@DeletedSndChunkReplica {deletedSndChunkReplicaId, userId, server, chunkDigest, delay} -> do
withWork c doWork (\db -> getNextDeletedSndChunkReplica db srv rcvFilesTTL) processDeletedReplica
where
processDeletedReplica replica@DeletedSndChunkReplica {deletedSndChunkReplicaId, userId, server, chunkDigest, delay} = do
ri <- asks $ reconnectInterval . config
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
withRetryInterval ri' $ \delay' loop ->
deleteChunkReplica replica
deleteChunkReplica
`catchAgentError` \e -> retryOnError "XFTP del worker" (retryLoop loop e delay') (retryDone e) e
where
retryLoop loop e replicaDelay = do
@@ -572,10 +558,9 @@ runXFTPDelWorker c srv doWork = do
atomically $ assertAgentForeground c
loop
retryDone = delWorkerInternalError c deletedSndChunkReplicaId
deleteChunkReplica :: DeletedSndChunkReplica -> m ()
deleteChunkReplica replica@DeletedSndChunkReplica {userId, deletedSndChunkReplicaId} = do
agentXFTPDeleteChunk c userId replica
withStore' c $ \db -> deleteDeletedSndChunkReplica db deletedSndChunkReplicaId
deleteChunkReplica = do
agentXFTPDeleteChunk c userId replica
withStore' c $ \db -> deleteDeletedSndChunkReplica db deletedSndChunkReplicaId
delWorkerInternalError :: AgentMonad m => AgentClient -> Int64 -> AgentErrorType -> m ()
delWorkerInternalError c deletedSndChunkReplicaId e = do

View File

@@ -85,6 +85,8 @@ module Simplex.Messaging.Agent.Client
AgentState (..),
AgentLocks (..),
AgentStatsKey (..),
waitForWork,
withWork,
agentOperations,
agentOperationBracket,
waitUntilActive,
@@ -133,6 +135,7 @@ import Data.Functor (($>))
import Data.List (deleteFirstsBy, foldl', partition, (\\))
import Data.List.NonEmpty (NonEmpty (..), (<|))
import qualified Data.List.NonEmpty as L
import Data.Maybe (isNothing)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (isJust, listToMaybe)
@@ -1224,6 +1227,19 @@ cryptoError = \case
where
c = AGENT . A_CRYPTO
waitForWork :: AgentMonad m => TMVar () -> m ()
waitForWork = void . atomically . readTMVar
withWork :: AgentMonad m => AgentClient -> TMVar () -> (DB.Connection -> IO (Maybe a)) -> (a -> m ()) -> m ()
withWork c doWork getWork action = do
r <- withStore' c $ \db -> do
r' <- getWork db
when (isNothing r') noWorkToDo
pure r'
mapM_ action r
where
noWorkToDo = void . atomically $ tryTakeTMVar doWork
endAgentOperation :: AgentClient -> AgentOperation -> STM ()
endAgentOperation c op = endOperation c op $ case op of
AONtfNetwork -> pure ()

View File

@@ -159,24 +159,21 @@ runNtfWorker :: forall m. AgentMonad m => AgentClient -> NtfServer -> TMVar () -
runNtfWorker c srv doWork = do
delay <- asks $ ntfWorkerDelay . config
forever $ do
void . atomically $ readTMVar doWork
waitForWork doWork
agentOperationBracket c AONtfNetwork throwWhenInactive runNtfOperation
threadDelay delay
where
runNtfOperation :: m ()
runNtfOperation = do
nextSub_ <- withStore' c (`getNextNtfSubNTFAction` srv)
logInfo $ "runNtfWorker, nextSub_ " <> tshow nextSub_
case nextSub_ of
Nothing -> noWorkToDo
Just a@(NtfSubscription {connId}, _, _) -> do
runNtfOperation =
withWork c doWork (`getNextNtfSubNTFAction` srv) $
\nextSub@(NtfSubscription {connId}, _, _) -> do
logInfo $ "runNtfWorker, nextSub " <> tshow nextSub
ri <- asks $ reconnectInterval . config
withRetryInterval ri $ \_ loop ->
processAction a
processSub nextSub
`catchAgentError` retryOnError c "NtfWorker" loop (workerInternalError c connId . show)
noWorkToDo = void . atomically $ tryTakeTMVar doWork
processAction :: (NtfSubscription, NtfSubNTFAction, NtfActionTs) -> m ()
processAction (sub@NtfSubscription {connId, smpServer, ntfSubId}, action, actionTs) = do
processSub :: (NtfSubscription, NtfSubNTFAction, NtfActionTs) -> m ()
processSub (sub@NtfSubscription {connId, smpServer, ntfSubId}, action, actionTs) = do
ts <- liftIO getCurrentTime
unlessM (rescheduleAction doWork ts actionTs) $
case action of
@@ -244,23 +241,20 @@ runNtfSMPWorker :: forall m. AgentMonad m => AgentClient -> SMPServer -> TMVar (
runNtfSMPWorker c srv doWork = do
delay <- asks $ ntfSMPWorkerDelay . config
forever $ do
void . atomically $ readTMVar doWork
waitForWork doWork
agentOperationBracket c AONtfNetwork throwWhenInactive runNtfSMPOperation
threadDelay delay
where
runNtfSMPOperation = do
nextSub_ <- withStore' c (`getNextNtfSubSMPAction` srv)
logInfo $ "runNtfSMPWorker, nextSub_ " <> tshow nextSub_
case nextSub_ of
Nothing -> noWorkToDo
Just a@(NtfSubscription {connId}, _, _) -> do
runNtfSMPOperation =
withWork c doWork (`getNextNtfSubSMPAction` srv) $
\nextSub@(NtfSubscription {connId}, _, _) -> do
logInfo $ "runNtfSMPWorker, nextSub " <> tshow nextSub
ri <- asks $ reconnectInterval . config
withRetryInterval ri $ \_ loop ->
processAction a
processSub nextSub
`catchAgentError` retryOnError c "NtfSMPWorker" loop (workerInternalError c connId . show)
noWorkToDo = void . atomically $ tryTakeTMVar doWork
processAction :: (NtfSubscription, NtfSubSMPAction, NtfActionTs) -> m ()
processAction (sub@NtfSubscription {connId, ntfServer}, smpAction, actionTs) = do
processSub :: (NtfSubscription, NtfSubSMPAction, NtfActionTs) -> m ()
processSub (sub@NtfSubscription {connId, ntfServer}, smpAction, actionTs) = do
ts <- liftIO getCurrentTime
unlessM (rescheduleAction doWork ts actionTs) $
case smpAction of