From fa457d1c250a91f0df72fc7aec9af33282eac3d2 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Fri, 22 Dec 2023 11:12:36 +0000 Subject: [PATCH] agent: avoid race condition between worker and supervisor when getting work (#928) --- src/Simplex/FileTransfer/Agent.hs | 59 +++++++------------ src/Simplex/Messaging/Agent/Client.hs | 16 +++++ .../Messaging/Agent/NtfSubSupervisor.hs | 38 +++++------- 3 files changed, 54 insertions(+), 59 deletions(-) diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index 2bc02374b..4c2b24df9 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -161,19 +161,16 @@ addWorker c wsSel runWorker runWorkerNoSrv srv_ = do runXFTPRcvWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m () runXFTPRcvWorker c srv doWork = do forever $ do - void . atomically $ readTMVar doWork + waitForWork doWork atomically $ assertAgentForeground c runXFTPOperation where - noWorkToDo = void . atomically $ tryTakeTMVar doWork runXFTPOperation :: m () runXFTPOperation = do rcvFilesTTL <- asks (rcvFilesTTL . config) - nextChunk <- withStore' c $ \db -> getNextRcvChunkToDownload db srv rcvFilesTTL - case nextChunk of - Nothing -> noWorkToDo - Just RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []} -> rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) "chunk has no replicas" - Just fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, digest, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, delay} : _} -> do + withWork c doWork (\db -> getNextRcvChunkToDownload db srv rcvFilesTTL) $ \case + RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []} -> rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) "chunk has no replicas" + fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, digest, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, delay} : _} -> do ri <- asks $ reconnectInterval . config let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay withRetryInterval ri' $ \delay' loop -> @@ -230,19 +227,16 @@ rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath internalErrStr = do runXFTPRcvLocalWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m () runXFTPRcvLocalWorker c doWork = do forever $ do - void . atomically $ readTMVar doWork + waitForWork doWork atomically $ assertAgentForeground c runXFTPOperation where runXFTPOperation :: m () runXFTPOperation = do rcvFilesTTL <- asks (rcvFilesTTL . config) - nextFile <- withStore' c (`getNextRcvFileToDecrypt` rcvFilesTTL) - case nextFile of - Nothing -> noWorkToDo - Just f@RcvFile {rcvFileId, rcvFileEntityId, tmpPath} -> + withWork c doWork (`getNextRcvFileToDecrypt` rcvFilesTTL) $ + \f@RcvFile {rcvFileId, rcvFileEntityId, tmpPath} -> decryptFile f `catchAgentError` (rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath . show) - noWorkToDo = void . atomically $ tryTakeTMVar doWork decryptFile :: RcvFile -> m () decryptFile RcvFile {rcvFileId, rcvFileEntityId, key, nonce, tmpPath, saveFile, status, chunks} = do let CryptoFile savePath cfArgs = saveFile @@ -299,19 +293,16 @@ addXFTPSndWorker c = addWorker c xftpSndWorkers runXFTPSndWorker runXFTPSndPrepa runXFTPSndPrepareWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m () runXFTPSndPrepareWorker c doWork = do forever $ do - void . atomically $ readTMVar doWork + waitForWork doWork atomically $ assertAgentForeground c runXFTPOperation where runXFTPOperation :: m () runXFTPOperation = do sndFilesTTL <- asks (sndFilesTTL . config) - nextFile <- withStore' c (`getNextSndFileToPrepare` sndFilesTTL) - case nextFile of - Nothing -> noWorkToDo - Just f@SndFile {sndFileId, sndFileEntityId, prefixPath} -> + withWork c doWork (`getNextSndFileToPrepare` sndFilesTTL) $ + \f@SndFile {sndFileId, sndFileEntityId, prefixPath} -> prepareFile f `catchAgentError` (sndWorkerInternalError c sndFileId sndFileEntityId prefixPath . show) - noWorkToDo = void . atomically $ tryTakeTMVar doWork prepareFile :: SndFile -> m () prepareFile SndFile {prefixPath = Nothing} = throwError $ INTERNAL "no prefix path" @@ -384,19 +375,16 @@ sndWorkerInternalError c sndFileId sndFileEntityId prefixPath internalErrStr = d runXFTPSndWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m () runXFTPSndWorker c srv doWork = do forever $ do - void . atomically $ readTMVar doWork + waitForWork doWork atomically $ assertAgentForeground c runXFTPOperation where - noWorkToDo = void . atomically $ tryTakeTMVar doWork runXFTPOperation :: m () runXFTPOperation = do sndFilesTTL <- asks (sndFilesTTL . config) - nextChunk <- withStore' c $ \db -> getNextSndChunkToUpload db srv sndFilesTTL - case nextChunk of - Nothing -> noWorkToDo - Just SndFileChunk {sndFileId, sndFileEntityId, filePrefixPath, replicas = []} -> sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) "chunk has no replicas" - Just fc@SndFileChunk {userId, sndFileId, sndFileEntityId, filePrefixPath, digest, replicas = replica@SndFileChunkReplica {sndChunkReplicaId, server, delay} : _} -> do + withWork c doWork (\db -> getNextSndChunkToUpload db srv sndFilesTTL) $ \case + SndFileChunk {sndFileId, sndFileEntityId, filePrefixPath, replicas = []} -> sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) "chunk has no replicas" + fc@SndFileChunk {userId, sndFileId, sndFileEntityId, filePrefixPath, digest, replicas = replica@SndFileChunkReplica {sndChunkReplicaId, server, delay} : _} -> do ri <- asks $ reconnectInterval . config let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay withRetryInterval ri' $ \delay' loop -> @@ -544,23 +532,21 @@ addXFTPDelWorker c srv = do runXFTPDelWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m () runXFTPDelWorker c srv doWork = do forever $ do - void . atomically $ readTMVar doWork + waitForWork doWork atomically $ assertAgentForeground c runXFTPOperation where - noWorkToDo = void . atomically $ tryTakeTMVar doWork runXFTPOperation :: m () runXFTPOperation = do -- no point in deleting files older than rcv ttl, as they will be expired on server rcvFilesTTL <- asks (rcvFilesTTL . config) - nextReplica <- withStore' c $ \db -> getNextDeletedSndChunkReplica db srv rcvFilesTTL - case nextReplica of - Nothing -> noWorkToDo - Just replica@DeletedSndChunkReplica {deletedSndChunkReplicaId, userId, server, chunkDigest, delay} -> do + withWork c doWork (\db -> getNextDeletedSndChunkReplica db srv rcvFilesTTL) processDeletedReplica + where + processDeletedReplica replica@DeletedSndChunkReplica {deletedSndChunkReplicaId, userId, server, chunkDigest, delay} = do ri <- asks $ reconnectInterval . config let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay withRetryInterval ri' $ \delay' loop -> - deleteChunkReplica replica + deleteChunkReplica `catchAgentError` \e -> retryOnError "XFTP del worker" (retryLoop loop e delay') (retryDone e) e where retryLoop loop e replicaDelay = do @@ -572,10 +558,9 @@ runXFTPDelWorker c srv doWork = do atomically $ assertAgentForeground c loop retryDone = delWorkerInternalError c deletedSndChunkReplicaId - deleteChunkReplica :: DeletedSndChunkReplica -> m () - deleteChunkReplica replica@DeletedSndChunkReplica {userId, deletedSndChunkReplicaId} = do - agentXFTPDeleteChunk c userId replica - withStore' c $ \db -> deleteDeletedSndChunkReplica db deletedSndChunkReplicaId + deleteChunkReplica = do + agentXFTPDeleteChunk c userId replica + withStore' c $ \db -> deleteDeletedSndChunkReplica db deletedSndChunkReplicaId delWorkerInternalError :: AgentMonad m => AgentClient -> Int64 -> AgentErrorType -> m () delWorkerInternalError c deletedSndChunkReplicaId e = do diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 77c3a9312..45ba959c6 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -85,6 +85,8 @@ module Simplex.Messaging.Agent.Client AgentState (..), AgentLocks (..), AgentStatsKey (..), + waitForWork, + withWork, agentOperations, agentOperationBracket, waitUntilActive, @@ -133,6 +135,7 @@ import Data.Functor (($>)) import Data.List (deleteFirstsBy, foldl', partition, (\\)) import Data.List.NonEmpty (NonEmpty (..), (<|)) import qualified Data.List.NonEmpty as L +import Data.Maybe (isNothing) import Data.Map.Strict (Map) import qualified Data.Map.Strict as M import Data.Maybe (isJust, listToMaybe) @@ -1224,6 +1227,19 @@ cryptoError = \case where c = AGENT . A_CRYPTO +waitForWork :: AgentMonad m => TMVar () -> m () +waitForWork = void . atomically . readTMVar + +withWork :: AgentMonad m => AgentClient -> TMVar () -> (DB.Connection -> IO (Maybe a)) -> (a -> m ()) -> m () +withWork c doWork getWork action = do + r <- withStore' c $ \db -> do + r' <- getWork db + when (isNothing r') noWorkToDo + pure r' + mapM_ action r + where + noWorkToDo = void . atomically $ tryTakeTMVar doWork + endAgentOperation :: AgentClient -> AgentOperation -> STM () endAgentOperation c op = endOperation c op $ case op of AONtfNetwork -> pure () diff --git a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs index cc7c1b7e4..6df4e393d 100644 --- a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs +++ b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs @@ -159,24 +159,21 @@ runNtfWorker :: forall m. AgentMonad m => AgentClient -> NtfServer -> TMVar () - runNtfWorker c srv doWork = do delay <- asks $ ntfWorkerDelay . config forever $ do - void . atomically $ readTMVar doWork + waitForWork doWork agentOperationBracket c AONtfNetwork throwWhenInactive runNtfOperation threadDelay delay where runNtfOperation :: m () - runNtfOperation = do - nextSub_ <- withStore' c (`getNextNtfSubNTFAction` srv) - logInfo $ "runNtfWorker, nextSub_ " <> tshow nextSub_ - case nextSub_ of - Nothing -> noWorkToDo - Just a@(NtfSubscription {connId}, _, _) -> do + runNtfOperation = + withWork c doWork (`getNextNtfSubNTFAction` srv) $ + \nextSub@(NtfSubscription {connId}, _, _) -> do + logInfo $ "runNtfWorker, nextSub " <> tshow nextSub ri <- asks $ reconnectInterval . config withRetryInterval ri $ \_ loop -> - processAction a + processSub nextSub `catchAgentError` retryOnError c "NtfWorker" loop (workerInternalError c connId . show) - noWorkToDo = void . atomically $ tryTakeTMVar doWork - processAction :: (NtfSubscription, NtfSubNTFAction, NtfActionTs) -> m () - processAction (sub@NtfSubscription {connId, smpServer, ntfSubId}, action, actionTs) = do + processSub :: (NtfSubscription, NtfSubNTFAction, NtfActionTs) -> m () + processSub (sub@NtfSubscription {connId, smpServer, ntfSubId}, action, actionTs) = do ts <- liftIO getCurrentTime unlessM (rescheduleAction doWork ts actionTs) $ case action of @@ -244,23 +241,20 @@ runNtfSMPWorker :: forall m. AgentMonad m => AgentClient -> SMPServer -> TMVar ( runNtfSMPWorker c srv doWork = do delay <- asks $ ntfSMPWorkerDelay . config forever $ do - void . atomically $ readTMVar doWork + waitForWork doWork agentOperationBracket c AONtfNetwork throwWhenInactive runNtfSMPOperation threadDelay delay where - runNtfSMPOperation = do - nextSub_ <- withStore' c (`getNextNtfSubSMPAction` srv) - logInfo $ "runNtfSMPWorker, nextSub_ " <> tshow nextSub_ - case nextSub_ of - Nothing -> noWorkToDo - Just a@(NtfSubscription {connId}, _, _) -> do + runNtfSMPOperation = + withWork c doWork (`getNextNtfSubSMPAction` srv) $ + \nextSub@(NtfSubscription {connId}, _, _) -> do + logInfo $ "runNtfSMPWorker, nextSub " <> tshow nextSub ri <- asks $ reconnectInterval . config withRetryInterval ri $ \_ loop -> - processAction a + processSub nextSub `catchAgentError` retryOnError c "NtfSMPWorker" loop (workerInternalError c connId . show) - noWorkToDo = void . atomically $ tryTakeTMVar doWork - processAction :: (NtfSubscription, NtfSubSMPAction, NtfActionTs) -> m () - processAction (sub@NtfSubscription {connId, ntfServer}, smpAction, actionTs) = do + processSub :: (NtfSubscription, NtfSubSMPAction, NtfActionTs) -> m () + processSub (sub@NtfSubscription {connId, ntfServer}, smpAction, actionTs) = do ts <- liftIO getCurrentTime unlessM (rescheduleAction doWork ts actionTs) $ case smpAction of