diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index dba8b3850..aa816ec34 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -7,7 +7,6 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE TupleSections #-} {-# LANGUAGE TypeApplications #-} {-# OPTIONS_GHC -fno-warn-ambiguous-fields #-} @@ -31,6 +30,7 @@ import Control.Monad.Except import Control.Monad.Reader import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Lazy.Char8 as LB +import Data.Composition ((.:)) import Data.Int (Int64) import Data.List (foldl', sortOn) import qualified Data.List.NonEmpty as L @@ -58,8 +58,6 @@ import qualified Simplex.Messaging.Crypto.File as CF import qualified Simplex.Messaging.Crypto.Lazy as LC import Simplex.Messaging.Encoding import Simplex.Messaging.Protocol (EntityId, XFTPServer) -import Simplex.Messaging.TMap (TMap) -import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Util (liftError, tshow, unlessM, whenM) import System.FilePath (takeFileName, ()) import UnliftIO @@ -76,28 +74,27 @@ startXFTPWorkers c workDir = do where startRcvFiles AgentConfig {rcvFilesTTL} = do pendingRcvServers <- withStore' c (`getPendingRcvFilesServers` rcvFilesTTL) - forM_ pendingRcvServers $ \s -> addXFTPRcvWorker c (Just s) + forM_ pendingRcvServers $ \s -> resumeXFTPRcvWork c (Just s) -- start local worker for files pending decryption, -- no need to make an extra query for the check -- as the worker will check the store anyway - addXFTPRcvWorker c Nothing + resumeXFTPRcvWork c Nothing startSndFiles AgentConfig {sndFilesTTL} = do -- start worker for files pending encryption/creation - addXFTPSndWorker c Nothing + resumeXFTPSndWork c Nothing pendingSndServers <- withStore' c (`getPendingSndFilesServers` sndFilesTTL) - forM_ pendingSndServers $ \s -> addXFTPSndWorker c (Just s) + forM_ pendingSndServers $ \s -> resumeXFTPSndWork c (Just s) startDelFiles AgentConfig {rcvFilesTTL} = do pendingDelServers <- withStore' c (`getPendingDelFilesServers` rcvFilesTTL) - forM_ pendingDelServers $ addXFTPDelWorker c + forM_ pendingDelServers $ resumeXFTPDelWork c closeXFTPAgent :: MonadUnliftIO m => XFTPAgent -> m () -closeXFTPAgent XFTPAgent {xftpRcvWorkers, xftpSndWorkers} = do - stopWorkers xftpRcvWorkers - stopWorkers xftpSndWorkers +closeXFTPAgent a = do + stopWorkers $ xftpRcvWorkers a + stopWorkers $ xftpSndWorkers a + stopWorkers $ xftpDelWorkers a where - stopWorkers wsSel = do - ws <- atomically $ stateTVar wsSel (,M.empty) - mapM_ (uninterruptibleCancel . snd) ws + stopWorkers workers = atomically (swapTVar workers M.empty) >>= mapM_ (liftIO . cancelWorker) xftpReceiveFile' :: AgentMonad m => AgentClient -> UserId -> ValidFileDescription 'FRecipient -> Maybe CryptoFileArgs -> m RcvFileId xftpReceiveFile' c userId (ValidFileDescription fd@FileDescription {chunks}) cfArgs = do @@ -116,7 +113,7 @@ xftpReceiveFile' c userId (ValidFileDescription fd@FileDescription {chunks}) cfA where downloadChunk :: AgentMonad m => FileChunk -> m () downloadChunk FileChunk {replicas = (FileChunkReplica {server} : _)} = do - addXFTPRcvWorker c (Just server) + void $ getXFTPRcvWorker True c (Just server) downloadChunk _ = throwError $ INTERNAL "no replicas" getPrefixPath :: AgentMonad m => String -> m FilePath @@ -132,31 +129,17 @@ toFSFilePath f = ( f) <$> getXFTPWorkPath createEmptyFile :: AgentMonad m => FilePath -> m () createEmptyFile fPath = liftIO $ B.writeFile fPath "" -addXFTPRcvWorker :: AgentMonad m => AgentClient -> Maybe XFTPServer -> m () -addXFTPRcvWorker c = addWorker c xftpRcvWorkers runXFTPRcvWorker runXFTPRcvLocalWorker +resumeXFTPRcvWork :: AgentMonad' m => AgentClient -> Maybe XFTPServer -> m () +resumeXFTPRcvWork = void .: getXFTPRcvWorker False -addWorker :: - AgentMonad m => - AgentClient -> - (XFTPAgent -> TMap (Maybe XFTPServer) (TMVar (), Async ())) -> - (AgentClient -> XFTPServer -> TMVar () -> m ()) -> - (AgentClient -> TMVar () -> m ()) -> - Maybe XFTPServer -> - m () -addWorker c wsSel runWorker runWorkerNoSrv srv_ = do - ws <- asks $ wsSel . xftpAgent - atomically (TM.lookup srv_ ws) >>= \case - Nothing -> do - doWork <- newTMVarIO () - let runWorker' = case srv_ of - Just srv -> runWorker c srv doWork - Nothing -> runWorkerNoSrv c doWork - worker <- async $ runWorker' `agentFinally` atomically (TM.delete srv_ ws) - atomically $ TM.insert srv_ (doWork, worker) ws - Just (doWork, _) -> atomically $ hasWorkToDo' doWork +getXFTPRcvWorker :: AgentMonad' m => Bool -> AgentClient -> Maybe XFTPServer -> m Worker +getXFTPRcvWorker hasWork c server = do + ws <- asks $ xftpRcvWorkers . xftpAgent + getAgentWorker hasWork c server ws $ + maybe (runXFTPRcvLocalWorker c) (runXFTPRcvWorker c) server -runXFTPRcvWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m () -runXFTPRcvWorker c srv doWork = do +runXFTPRcvWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> Worker -> m () +runXFTPRcvWorker c srv Worker {doWork} = do cfg <- asks config forever $ do waitForWork doWork @@ -197,7 +180,8 @@ runXFTPRcvWorker c srv doWork = do liftIO . when complete $ updateRcvFileStatus db rcvFileId RFSReceived pure (complete, RFPROG rcvd total) notify c rcvFileEntityId progress - when complete $ addXFTPRcvWorker c Nothing + when complete . void $ + getXFTPRcvWorker True c Nothing where receivedSize :: [RcvFileChunk] -> Int64 receivedSize = foldl' (\sz ch -> sz + receivedChunkSize ch) 0 @@ -225,8 +209,8 @@ rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath internalErrStr = do withStore' c $ \db -> updateRcvFileError db rcvFileId internalErrStr notify c rcvFileEntityId $ RFERR $ INTERNAL internalErrStr -runXFTPRcvLocalWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m () -runXFTPRcvLocalWorker c doWork = do +runXFTPRcvLocalWorker :: forall m. AgentMonad m => AgentClient -> Worker -> m () +runXFTPRcvLocalWorker c Worker {doWork} = do cfg <- asks config forever $ do waitForWork doWork @@ -285,14 +269,20 @@ xftpSendFile' c userId file numRecipients = do nonce <- atomically $ C.randomCbNonce g -- saving absolute filePath will not allow to restore file encryption after app update, but it's a short window fId <- withStore c $ \db -> createSndFile db g userId file numRecipients relPrefixPath key nonce - addXFTPSndWorker c Nothing + void $ getXFTPSndWorker True c Nothing pure fId -addXFTPSndWorker :: AgentMonad m => AgentClient -> Maybe XFTPServer -> m () -addXFTPSndWorker c = addWorker c xftpSndWorkers runXFTPSndWorker runXFTPSndPrepareWorker +resumeXFTPSndWork :: AgentMonad' m => AgentClient -> Maybe XFTPServer -> m () +resumeXFTPSndWork = void .: getXFTPSndWorker False -runXFTPSndPrepareWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m () -runXFTPSndPrepareWorker c doWork = do +getXFTPSndWorker :: AgentMonad' m => Bool -> AgentClient -> Maybe XFTPServer -> m Worker +getXFTPSndWorker hasWork c server = do + ws <- asks $ xftpSndWorkers . xftpAgent + getAgentWorker hasWork c server ws $ + maybe (runXFTPSndPrepareWorker c) (runXFTPSndWorker c) server + +runXFTPSndPrepareWorker :: forall m. AgentMonad m => AgentClient -> Worker -> m () +runXFTPSndPrepareWorker c Worker {doWork} = do cfg <- asks config forever $ do waitForWork doWork @@ -351,7 +341,7 @@ runXFTPSndPrepareWorker c doWork = do atomically $ assertAgentForeground c (replica, ProtoServerWithAuth srv _) <- tryCreate withStore' c $ \db -> createSndFileReplica db ch replica - addXFTPSndWorker c $ Just srv + void $ getXFTPSndWorker True c (Just srv) where tryCreate = do usedSrvs <- newTVarIO ([] :: [XFTPServer]) @@ -373,8 +363,8 @@ sndWorkerInternalError c sndFileId sndFileEntityId prefixPath internalErrStr = d withStore' c $ \db -> updateSndFileError db sndFileId internalErrStr notify c sndFileEntityId $ SFERR $ INTERNAL internalErrStr -runXFTPSndWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m () -runXFTPSndWorker c srv doWork = do +runXFTPSndWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> Worker -> m () +runXFTPSndWorker c srv Worker {doWork} = do cfg <- asks config forever $ do waitForWork doWork @@ -513,21 +503,19 @@ deleteSndFileRemote c userId sndFileEntityId (ValidFileDescription FileDescripti deleteFileChunk :: FileChunk -> m () deleteFileChunk FileChunk {digest, replicas = replica@FileChunkReplica {server} : _} = do withStore' c $ \db -> createDeletedSndChunkReplica db userId replica digest - addXFTPDelWorker c server + void $ getXFTPDelWorker True c server deleteFileChunk _ = pure () -addXFTPDelWorker :: AgentMonad m => AgentClient -> XFTPServer -> m () -addXFTPDelWorker c srv = do - ws <- asks $ xftpDelWorkers . xftpAgent - atomically (TM.lookup srv ws) >>= \case - Nothing -> do - doWork <- newTMVarIO () - worker <- async $ runXFTPDelWorker c srv doWork `agentFinally` atomically (TM.delete srv ws) - atomically $ TM.insert srv (doWork, worker) ws - Just (doWork, _) -> atomically $ hasWorkToDo' doWork +resumeXFTPDelWork :: AgentMonad' m => AgentClient -> XFTPServer -> m () +resumeXFTPDelWork = void .: getXFTPDelWorker False -runXFTPDelWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m () -runXFTPDelWorker c srv doWork = do +getXFTPDelWorker :: AgentMonad' m => Bool -> AgentClient -> XFTPServer -> m Worker +getXFTPDelWorker hasWork c server = do + ws <- asks $ xftpDelWorkers . xftpAgent + getAgentWorker hasWork c server ws $ runXFTPDelWorker c server + +runXFTPDelWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> Worker -> m () +runXFTPDelWorker c srv Worker {doWork} = do cfg <- asks config forever $ do waitForWork doWork diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index b438f0909..6ebb62a43 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -908,10 +908,10 @@ sendMessagesB c reqs = withConnLocks c connIds "sendMessages" $ do enqueueCommand :: AgentMonad m => AgentClient -> ACorrId -> ConnId -> Maybe SMPServer -> AgentCommand -> m () enqueueCommand c corrId connId server aCommand = do withStore c $ \db -> createCommand db corrId connId server aCommand - void $ getAsyncCmdWorker (\w -> hasWorkToDo w $> w) c server + void $ getAsyncCmdWorker True c server resumeSrvCmds :: forall m. AgentMonad' m => AgentClient -> Maybe SMPServer -> m () -resumeSrvCmds = void .: getAsyncCmdWorker pure +resumeSrvCmds = void .: getAsyncCmdWorker False resumeConnCmds :: forall m. AgentMonad m => AgentClient -> ConnId -> m () resumeConnCmds c connId = @@ -921,23 +921,12 @@ resumeConnCmds c connId = where connQueued = atomically $ isJust <$> TM.lookupInsert connId True (connCmdsQueued c) -getAsyncCmdWorker :: AgentMonad' m => (Worker -> STM Worker) -> AgentClient -> Maybe SMPServer -> m Worker -getAsyncCmdWorker whenExists c server = - atomically (getWorker >>= maybe createWorker whenExists) >>= \w -> runWorker w $> w - where - getWorker = TM.lookup server $ asyncCmdWorkers c - deleteWorker wId = mapM_ $ \w -> when (wId == workerId w) $ TM.delete server $ asyncCmdWorkers c - createWorker = do - w <- newWorker c - TM.insert server w $ asyncCmdWorkers c - pure w - runWorker w@Worker {workerId = wId, doWork} = - runWorkerAsync w . void . runExceptT $ - runCommandProcessing c server doWork - `agentFinally` atomically (getWorker >>= deleteWorker wId) +getAsyncCmdWorker :: AgentMonad' m => Bool -> AgentClient -> Maybe SMPServer -> m Worker +getAsyncCmdWorker hasWork c server = + getAgentWorker hasWork c server (asyncCmdWorkers c) (runCommandProcessing c server) -runCommandProcessing :: forall m. AgentMonad m => AgentClient -> Maybe SMPServer -> TMVar () -> m () -runCommandProcessing c@AgentClient {subQ} server_ doWork = do +runCommandProcessing :: forall m. AgentMonad m => AgentClient -> Maybe SMPServer -> Worker -> m () +runCommandProcessing c@AgentClient {subQ} server_ Worker {doWork} = do ri <- asks $ messageRetryInterval . config -- different retry interval? forever $ do atomically $ endAgentOperation c AOSndNetwork @@ -1126,32 +1115,23 @@ enqueueSavedMessageB c reqs = do in map (\sq -> createSndMsgDelivery db connId sq mId) sqs resumeMsgDelivery :: forall m. AgentMonad' m => AgentClient -> ConnData -> SndQueue -> m () -resumeMsgDelivery = void .:. getDeliveryWorker pure +resumeMsgDelivery = void .:. getDeliveryWorker False -getDeliveryWorker :: AgentMonad' m => ((Worker, TMVar ()) -> STM (Worker, TMVar ())) -> AgentClient -> ConnData -> SndQueue -> m (Worker, TMVar ()) -getDeliveryWorker whenExists c cData sq = do - atomically (getWorker >>= maybe createWorker whenExists) >>= \wl -> runWorker wl $> wl +getDeliveryWorker :: AgentMonad' m => Bool -> AgentClient -> ConnData -> SndQueue -> m (Worker, TMVar ()) +getDeliveryWorker hasWork c cData sq = + getAgentWorker' fst mkLock hasWork c (qAddress sq) (smpDeliveryWorkers c) (runSmpQueueMsgDelivery c cData sq) where - qAddr = qAddress sq - getWorker = TM.lookup qAddr $ smpDeliveryWorkers c - deleteWorker wId = mapM_ $ \(w, _) -> when (wId == workerId w) $ TM.delete qAddr $ smpDeliveryWorkers c - createWorker = do + mkLock w = do retryLock <- newEmptyTMVar - wl <- (,retryLock) <$> newWorker c - TM.insert qAddr wl $ smpDeliveryWorkers c - pure wl - runWorker (w@Worker {workerId = wId, doWork}, retryLock) = - runWorkerAsync w . void . runExceptT $ - runSmpQueueMsgDelivery c cData sq doWork retryLock - `agentFinally` atomically (getWorker >>= deleteWorker wId) + pure (w, retryLock) submitPendingMsg :: AgentMonad' m => AgentClient -> ConnData -> SndQueue -> m () submitPendingMsg c cData sq = do atomically $ modifyTVar' (msgDeliveryOp c) $ \s -> s {opsInProgress = opsInProgress s + 1} - void $ getDeliveryWorker (\wl -> hasWorkToDo (fst wl) $> wl) c cData sq + void $ getDeliveryWorker True c cData sq -runSmpQueueMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnData -> SndQueue -> TMVar () -> TMVar () -> m () -runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, duplexHandshake} sq doWork qLock = do +runSmpQueueMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnData -> SndQueue -> (Worker, TMVar ()) -> m () +runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, duplexHandshake} sq (Worker {doWork}, qLock) = do ri <- asks $ messageRetryInterval . config forever $ do atomically $ endAgentOperation c AOSndNetwork diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 8453eaea8..a5acc6caf 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -85,8 +85,9 @@ module Simplex.Messaging.Agent.Client AgentState (..), AgentLocks (..), AgentStatsKey (..), - newWorker, - runWorkerAsync, + getAgentWorker, + getAgentWorker', + cancelWorker, waitForWork, hasWorkToDo, hasWorkToDo', @@ -272,7 +273,26 @@ data AgentClient = AgentClient agentEnv :: Env } -data Worker = Worker {workerId :: Int, doWork :: TMVar (), action :: TMVar (Maybe (Async ()))} +getAgentWorker :: (AgentMonad' m, Ord k) => Bool -> AgentClient -> k -> TMap k Worker -> (Worker -> ExceptT AgentErrorType m ()) -> m Worker +getAgentWorker = getAgentWorker' id pure + +getAgentWorker' :: (AgentMonad' m, Ord k) => (a -> Worker) -> (Worker -> STM a) -> Bool -> AgentClient -> k -> TMap k a -> (a -> ExceptT AgentErrorType m ()) -> m a +getAgentWorker' toW fromW hasWork c key ws work = do + atomically (getWorker >>= maybe createWorker whenExists) >>= \w -> runWorker w $> w + where + getWorker = TM.lookup key ws + deleteWorker wId = mapM_ $ \w -> when (wId == workerId (toW w)) $ TM.delete key ws + createWorker = do + w <- fromW =<< newWorker c + TM.insert key w ws + pure w + whenExists w + | hasWork = hasWorkToDo (toW w) $> w + | otherwise = pure w + runWorker w = do + let w'@Worker {workerId} = toW w + runWorkerAsync w' . void . runExceptT $ + work w `agentFinally` atomically (getWorker >>= deleteWorker workerId) newWorker :: AgentClient -> STM Worker newWorker c = do diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index 526948bfb..0efdf6c78 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -27,6 +27,7 @@ module Simplex.Messaging.Agent.Env.SQLite NtfSupervisor (..), NtfSupervisorCommand (..), XFTPAgent (..), + Worker (..), ) where @@ -205,8 +206,8 @@ createAgentStore dbFilePath dbKey keepKey = createSQLiteStore dbFilePath dbKey k data NtfSupervisor = NtfSupervisor { ntfTkn :: TVar (Maybe NtfToken), ntfSubQ :: TBQueue (ConnId, NtfSupervisorCommand), - ntfWorkers :: TMap NtfServer (TMVar (), Async ()), - ntfSMPWorkers :: TMap SMPServer (TMVar (), Async ()) + ntfWorkers :: TMap NtfServer Worker, + ntfSMPWorkers :: TMap SMPServer Worker } data NtfSupervisorCommand = NSCCreate | NSCDelete | NSCSmpDelete | NSCNtfWorker NtfServer | NSCNtfSMPWorker SMPServer @@ -223,9 +224,9 @@ newNtfSubSupervisor qSize = do data XFTPAgent = XFTPAgent { -- if set, XFTP file paths will be considered as relative to this directory xftpWorkDir :: TVar (Maybe FilePath), - xftpRcvWorkers :: TMap (Maybe XFTPServer) (TMVar (), Async ()), - xftpSndWorkers :: TMap (Maybe XFTPServer) (TMVar (), Async ()), - xftpDelWorkers :: TMap XFTPServer (TMVar (), Async ()) + xftpRcvWorkers :: TMap (Maybe XFTPServer) Worker, + xftpSndWorkers :: TMap (Maybe XFTPServer) Worker, + xftpDelWorkers :: TMap XFTPServer Worker } newXFTPAgent :: STM XFTPAgent @@ -251,3 +252,9 @@ agentFinally = allFinally mkInternal mkInternal :: SomeException -> AgentErrorType mkInternal = INTERNAL . show {-# INLINE mkInternal #-} + +data Worker = Worker + { workerId :: Int, + doWork :: TMVar (), + action :: TMVar (Maybe (Async ())) + } diff --git a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs index 10b8733dc..b02245993 100644 --- a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs +++ b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs @@ -37,9 +37,7 @@ import Simplex.Messaging.Client.Agent () import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Notifications.Protocol (NtfSubStatus (..), NtfTknStatus (..), SMPQueueNtf (..)) import Simplex.Messaging.Notifications.Types -import Simplex.Messaging.Protocol (NtfServer, ProtocolServer, SMPServer, sameSrvAddr) -import Simplex.Messaging.TMap (TMap) -import qualified Simplex.Messaging.TMap as TM +import Simplex.Messaging.Protocol (NtfServer, SMPServer, sameSrvAddr) import Simplex.Messaging.Util (diffToMicroseconds, threadDelay', tshow, unlessM) import System.Random (randomR) import UnliftIO @@ -79,11 +77,11 @@ processNtfSub c (connId, cmd) = do Just ClientNtfCreds {notifierId} -> do let newSub = newNtfSubscription connId smpServer (Just notifierId) ntfServer NASKey withStore c $ \db -> createNtfSubscription db newSub $ NtfSubNTFAction NSACreate - addNtfNTFWorker ntfServer + void $ getNtfNTFWorker True c ntfServer Nothing -> do let newSub = newNtfSubscription connId smpServer Nothing ntfServer NASNew withStore c $ \db -> createNtfSubscription db newSub $ NtfSubSMPAction NSASmpKey - addNtfSMPWorker smpServer + void $ getNtfSMPWorker True c smpServer (Just (sub@NtfSubscription {ntfSubStatus, ntfServer = subNtfServer, smpServer = smpServer', ntfQueueId}, action_)) -> do case (clientNtfCreds, ntfQueueId) of (Just ClientNtfCreds {notifierId}, Just ntfQueueId') @@ -103,59 +101,53 @@ processNtfSub c (connId, cmd) = do then resetSubscription else withNtfServer c $ \ntfServer -> do withStore' c $ \db -> supervisorUpdateNtfSub db sub {ntfServer} (NtfSubNTFAction NSACreate) - addNtfNTFWorker ntfServer + void $ getNtfNTFWorker True c ntfServer | otherwise -> case action of - NtfSubNTFAction _ -> addNtfNTFWorker subNtfServer - NtfSubSMPAction _ -> addNtfSMPWorker smpServer + NtfSubNTFAction _ -> void $ getNtfNTFWorker True c subNtfServer + NtfSubSMPAction _ -> void $ getNtfSMPWorker True c smpServer rotate :: m () rotate = do withStore' c $ \db -> supervisorUpdateNtfSub db sub (NtfSubNTFAction NSARotate) - addNtfNTFWorker subNtfServer + void $ getNtfNTFWorker True c subNtfServer resetSubscription :: m () resetSubscription = withNtfServer c $ \ntfServer -> do let sub' = sub {ntfQueueId = Nothing, ntfServer, ntfSubId = Nothing, ntfSubStatus = NASNew} withStore' c $ \db -> supervisorUpdateNtfSub db sub' (NtfSubSMPAction NSASmpKey) - addNtfSMPWorker smpServer + void $ getNtfSMPWorker True c smpServer NSCDelete -> do sub_ <- withStore' c $ \db -> do supervisorUpdateNtfAction db connId (NtfSubNTFAction NSADelete) getNtfSubscription db connId logInfo $ "processNtfSub, NSCDelete - sub_ = " <> tshow sub_ case sub_ of - (Just (NtfSubscription {ntfServer}, _)) -> addNtfNTFWorker ntfServer + (Just (NtfSubscription {ntfServer}, _)) -> void $ getNtfNTFWorker True c ntfServer _ -> pure () -- err "NSCDelete - no subscription" NSCSmpDelete -> do withStore' c (`getPrimaryRcvQueue` connId) >>= \case Right rq@RcvQueue {server = smpServer} -> do logInfo $ "processNtfSub, NSCSmpDelete - rq = " <> tshow rq withStore' c $ \db -> supervisorUpdateNtfAction db connId (NtfSubSMPAction NSASmpDelete) - addNtfSMPWorker smpServer + void $ getNtfSMPWorker True c smpServer _ -> notifyInternalError c connId "NSCSmpDelete - no rcv queue" - NSCNtfWorker ntfServer -> addNtfNTFWorker ntfServer - NSCNtfSMPWorker smpServer -> addNtfSMPWorker smpServer - where - addNtfNTFWorker = addWorker ntfWorkers runNtfWorker - addNtfSMPWorker = addWorker ntfSMPWorkers runNtfSMPWorker - addWorker :: - (NtfSupervisor -> TMap (ProtocolServer s) (TMVar (), Async ())) -> - (AgentClient -> ProtocolServer s -> TMVar () -> m ()) -> - ProtocolServer s -> - m () - addWorker wsSel runWorker srv = do - ws <- asks $ wsSel . ntfSupervisor - atomically (TM.lookup srv ws) >>= \case - Nothing -> do - doWork <- newTMVarIO () - worker <- async $ runWorker c srv doWork `agentFinally` atomically (TM.delete srv ws) - atomically $ TM.insert srv (doWork, worker) ws - Just (doWork, _) -> atomically $ hasWorkToDo' doWork + NSCNtfWorker ntfServer -> void $ getNtfNTFWorker True c ntfServer + NSCNtfSMPWorker smpServer -> void $ getNtfSMPWorker True c smpServer + +getNtfNTFWorker :: AgentMonad' m => Bool -> AgentClient -> NtfServer -> m Worker +getNtfNTFWorker hasWork c server = do + ws <- asks $ ntfWorkers . ntfSupervisor + getAgentWorker hasWork c server ws $ runNtfWorker c server + +getNtfSMPWorker :: AgentMonad' m => Bool -> AgentClient -> SMPServer -> m Worker +getNtfSMPWorker hasWork c server = do + ws <- asks $ ntfSMPWorkers . ntfSupervisor + getAgentWorker hasWork c server ws $ runNtfSMPWorker c server withNtfServer :: AgentMonad' m => AgentClient -> (NtfServer -> m ()) -> m () withNtfServer c action = getNtfServer c >>= mapM_ action -runNtfWorker :: forall m. AgentMonad m => AgentClient -> NtfServer -> TMVar () -> m () -runNtfWorker c srv doWork = do +runNtfWorker :: forall m. AgentMonad m => AgentClient -> NtfServer -> Worker -> m () +runNtfWorker c srv Worker {doWork} = do delay <- asks $ ntfWorkerDelay . config forever $ do waitForWork doWork @@ -236,8 +228,8 @@ runNtfWorker c srv doWork = do withStore' c $ \db -> updateNtfSubscription db sub {ntfSubStatus = toStatus} toAction actionTs' -runNtfSMPWorker :: forall m. AgentMonad m => AgentClient -> SMPServer -> TMVar () -> m () -runNtfSMPWorker c srv doWork = do +runNtfSMPWorker :: forall m. AgentMonad m => AgentClient -> SMPServer -> Worker -> m () +runNtfSMPWorker c srv Worker {doWork} = do delay <- asks $ ntfSMPWorkerDelay . config forever $ do waitForWork doWork @@ -336,13 +328,10 @@ instantNotifications = \case closeNtfSupervisor :: MonadUnliftIO m => NtfSupervisor -> m () closeNtfSupervisor ns = do - cancelNtfWorkers_ $ ntfWorkers ns - cancelNtfWorkers_ $ ntfSMPWorkers ns - -cancelNtfWorkers_ :: MonadUnliftIO m => TMap (ProtocolServer s) (TMVar (), Async ()) -> m () -cancelNtfWorkers_ wsVar = do - ws <- atomically $ stateTVar wsVar (,M.empty) - mapM_ (uninterruptibleCancel . snd) ws + stopWorkers $ ntfWorkers ns + stopWorkers $ ntfSMPWorkers ns + where + stopWorkers workers = atomically (swapTVar workers M.empty) >>= mapM_ (liftIO . cancelWorker) getNtfServer :: AgentMonad' m => AgentClient -> m (Maybe NtfServer) getNtfServer c = do