xftp, ntf: refactor, reuse workers abstractions (#935)

* xftp: refactor, reuse workers abstractions (wip, test fails)

* rename

* refactor (tests pass)

* refactor2

* linebreaks

* do

* reuse worker abstractions for notifications

---------

Co-authored-by: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com>
This commit is contained in:
spaced4ndy
2023-12-27 13:02:08 +04:00
committed by GitHub
parent e43e4860b9
commit 22e1932372
5 changed files with 130 additions and 146 deletions

View File

@@ -7,7 +7,6 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}
@@ -31,6 +30,7 @@ import Control.Monad.Except
import Control.Monad.Reader
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy.Char8 as LB
import Data.Composition ((.:))
import Data.Int (Int64)
import Data.List (foldl', sortOn)
import qualified Data.List.NonEmpty as L
@@ -58,8 +58,6 @@ import qualified Simplex.Messaging.Crypto.File as CF
import qualified Simplex.Messaging.Crypto.Lazy as LC
import Simplex.Messaging.Encoding
import Simplex.Messaging.Protocol (EntityId, XFTPServer)
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util (liftError, tshow, unlessM, whenM)
import System.FilePath (takeFileName, (</>))
import UnliftIO
@@ -76,28 +74,27 @@ startXFTPWorkers c workDir = do
where
startRcvFiles AgentConfig {rcvFilesTTL} = do
pendingRcvServers <- withStore' c (`getPendingRcvFilesServers` rcvFilesTTL)
forM_ pendingRcvServers $ \s -> addXFTPRcvWorker c (Just s)
forM_ pendingRcvServers $ \s -> resumeXFTPRcvWork c (Just s)
-- start local worker for files pending decryption,
-- no need to make an extra query for the check
-- as the worker will check the store anyway
addXFTPRcvWorker c Nothing
resumeXFTPRcvWork c Nothing
startSndFiles AgentConfig {sndFilesTTL} = do
-- start worker for files pending encryption/creation
addXFTPSndWorker c Nothing
resumeXFTPSndWork c Nothing
pendingSndServers <- withStore' c (`getPendingSndFilesServers` sndFilesTTL)
forM_ pendingSndServers $ \s -> addXFTPSndWorker c (Just s)
forM_ pendingSndServers $ \s -> resumeXFTPSndWork c (Just s)
startDelFiles AgentConfig {rcvFilesTTL} = do
pendingDelServers <- withStore' c (`getPendingDelFilesServers` rcvFilesTTL)
forM_ pendingDelServers $ addXFTPDelWorker c
forM_ pendingDelServers $ resumeXFTPDelWork c
closeXFTPAgent :: MonadUnliftIO m => XFTPAgent -> m ()
closeXFTPAgent XFTPAgent {xftpRcvWorkers, xftpSndWorkers} = do
stopWorkers xftpRcvWorkers
stopWorkers xftpSndWorkers
closeXFTPAgent a = do
stopWorkers $ xftpRcvWorkers a
stopWorkers $ xftpSndWorkers a
stopWorkers $ xftpDelWorkers a
where
stopWorkers wsSel = do
ws <- atomically $ stateTVar wsSel (,M.empty)
mapM_ (uninterruptibleCancel . snd) ws
stopWorkers workers = atomically (swapTVar workers M.empty) >>= mapM_ (liftIO . cancelWorker)
xftpReceiveFile' :: AgentMonad m => AgentClient -> UserId -> ValidFileDescription 'FRecipient -> Maybe CryptoFileArgs -> m RcvFileId
xftpReceiveFile' c userId (ValidFileDescription fd@FileDescription {chunks}) cfArgs = do
@@ -116,7 +113,7 @@ xftpReceiveFile' c userId (ValidFileDescription fd@FileDescription {chunks}) cfA
where
downloadChunk :: AgentMonad m => FileChunk -> m ()
downloadChunk FileChunk {replicas = (FileChunkReplica {server} : _)} = do
addXFTPRcvWorker c (Just server)
void $ getXFTPRcvWorker True c (Just server)
downloadChunk _ = throwError $ INTERNAL "no replicas"
getPrefixPath :: AgentMonad m => String -> m FilePath
@@ -132,31 +129,17 @@ toFSFilePath f = (</> f) <$> getXFTPWorkPath
createEmptyFile :: AgentMonad m => FilePath -> m ()
createEmptyFile fPath = liftIO $ B.writeFile fPath ""
addXFTPRcvWorker :: AgentMonad m => AgentClient -> Maybe XFTPServer -> m ()
addXFTPRcvWorker c = addWorker c xftpRcvWorkers runXFTPRcvWorker runXFTPRcvLocalWorker
resumeXFTPRcvWork :: AgentMonad' m => AgentClient -> Maybe XFTPServer -> m ()
resumeXFTPRcvWork = void .: getXFTPRcvWorker False
addWorker ::
AgentMonad m =>
AgentClient ->
(XFTPAgent -> TMap (Maybe XFTPServer) (TMVar (), Async ())) ->
(AgentClient -> XFTPServer -> TMVar () -> m ()) ->
(AgentClient -> TMVar () -> m ()) ->
Maybe XFTPServer ->
m ()
addWorker c wsSel runWorker runWorkerNoSrv srv_ = do
ws <- asks $ wsSel . xftpAgent
atomically (TM.lookup srv_ ws) >>= \case
Nothing -> do
doWork <- newTMVarIO ()
let runWorker' = case srv_ of
Just srv -> runWorker c srv doWork
Nothing -> runWorkerNoSrv c doWork
worker <- async $ runWorker' `agentFinally` atomically (TM.delete srv_ ws)
atomically $ TM.insert srv_ (doWork, worker) ws
Just (doWork, _) -> atomically $ hasWorkToDo' doWork
getXFTPRcvWorker :: AgentMonad' m => Bool -> AgentClient -> Maybe XFTPServer -> m Worker
getXFTPRcvWorker hasWork c server = do
ws <- asks $ xftpRcvWorkers . xftpAgent
getAgentWorker hasWork c server ws $
maybe (runXFTPRcvLocalWorker c) (runXFTPRcvWorker c) server
runXFTPRcvWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m ()
runXFTPRcvWorker c srv doWork = do
runXFTPRcvWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> Worker -> m ()
runXFTPRcvWorker c srv Worker {doWork} = do
cfg <- asks config
forever $ do
waitForWork doWork
@@ -197,7 +180,8 @@ runXFTPRcvWorker c srv doWork = do
liftIO . when complete $ updateRcvFileStatus db rcvFileId RFSReceived
pure (complete, RFPROG rcvd total)
notify c rcvFileEntityId progress
when complete $ addXFTPRcvWorker c Nothing
when complete . void $
getXFTPRcvWorker True c Nothing
where
receivedSize :: [RcvFileChunk] -> Int64
receivedSize = foldl' (\sz ch -> sz + receivedChunkSize ch) 0
@@ -225,8 +209,8 @@ rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath internalErrStr = do
withStore' c $ \db -> updateRcvFileError db rcvFileId internalErrStr
notify c rcvFileEntityId $ RFERR $ INTERNAL internalErrStr
runXFTPRcvLocalWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m ()
runXFTPRcvLocalWorker c doWork = do
runXFTPRcvLocalWorker :: forall m. AgentMonad m => AgentClient -> Worker -> m ()
runXFTPRcvLocalWorker c Worker {doWork} = do
cfg <- asks config
forever $ do
waitForWork doWork
@@ -285,14 +269,20 @@ xftpSendFile' c userId file numRecipients = do
nonce <- atomically $ C.randomCbNonce g
-- saving absolute filePath will not allow to restore file encryption after app update, but it's a short window
fId <- withStore c $ \db -> createSndFile db g userId file numRecipients relPrefixPath key nonce
addXFTPSndWorker c Nothing
void $ getXFTPSndWorker True c Nothing
pure fId
addXFTPSndWorker :: AgentMonad m => AgentClient -> Maybe XFTPServer -> m ()
addXFTPSndWorker c = addWorker c xftpSndWorkers runXFTPSndWorker runXFTPSndPrepareWorker
resumeXFTPSndWork :: AgentMonad' m => AgentClient -> Maybe XFTPServer -> m ()
resumeXFTPSndWork = void .: getXFTPSndWorker False
runXFTPSndPrepareWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m ()
runXFTPSndPrepareWorker c doWork = do
getXFTPSndWorker :: AgentMonad' m => Bool -> AgentClient -> Maybe XFTPServer -> m Worker
getXFTPSndWorker hasWork c server = do
ws <- asks $ xftpSndWorkers . xftpAgent
getAgentWorker hasWork c server ws $
maybe (runXFTPSndPrepareWorker c) (runXFTPSndWorker c) server
runXFTPSndPrepareWorker :: forall m. AgentMonad m => AgentClient -> Worker -> m ()
runXFTPSndPrepareWorker c Worker {doWork} = do
cfg <- asks config
forever $ do
waitForWork doWork
@@ -351,7 +341,7 @@ runXFTPSndPrepareWorker c doWork = do
atomically $ assertAgentForeground c
(replica, ProtoServerWithAuth srv _) <- tryCreate
withStore' c $ \db -> createSndFileReplica db ch replica
addXFTPSndWorker c $ Just srv
void $ getXFTPSndWorker True c (Just srv)
where
tryCreate = do
usedSrvs <- newTVarIO ([] :: [XFTPServer])
@@ -373,8 +363,8 @@ sndWorkerInternalError c sndFileId sndFileEntityId prefixPath internalErrStr = d
withStore' c $ \db -> updateSndFileError db sndFileId internalErrStr
notify c sndFileEntityId $ SFERR $ INTERNAL internalErrStr
runXFTPSndWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m ()
runXFTPSndWorker c srv doWork = do
runXFTPSndWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> Worker -> m ()
runXFTPSndWorker c srv Worker {doWork} = do
cfg <- asks config
forever $ do
waitForWork doWork
@@ -513,21 +503,19 @@ deleteSndFileRemote c userId sndFileEntityId (ValidFileDescription FileDescripti
deleteFileChunk :: FileChunk -> m ()
deleteFileChunk FileChunk {digest, replicas = replica@FileChunkReplica {server} : _} = do
withStore' c $ \db -> createDeletedSndChunkReplica db userId replica digest
addXFTPDelWorker c server
void $ getXFTPDelWorker True c server
deleteFileChunk _ = pure ()
addXFTPDelWorker :: AgentMonad m => AgentClient -> XFTPServer -> m ()
addXFTPDelWorker c srv = do
ws <- asks $ xftpDelWorkers . xftpAgent
atomically (TM.lookup srv ws) >>= \case
Nothing -> do
doWork <- newTMVarIO ()
worker <- async $ runXFTPDelWorker c srv doWork `agentFinally` atomically (TM.delete srv ws)
atomically $ TM.insert srv (doWork, worker) ws
Just (doWork, _) -> atomically $ hasWorkToDo' doWork
resumeXFTPDelWork :: AgentMonad' m => AgentClient -> XFTPServer -> m ()
resumeXFTPDelWork = void .: getXFTPDelWorker False
runXFTPDelWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m ()
runXFTPDelWorker c srv doWork = do
getXFTPDelWorker :: AgentMonad' m => Bool -> AgentClient -> XFTPServer -> m Worker
getXFTPDelWorker hasWork c server = do
ws <- asks $ xftpDelWorkers . xftpAgent
getAgentWorker hasWork c server ws $ runXFTPDelWorker c server
runXFTPDelWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> Worker -> m ()
runXFTPDelWorker c srv Worker {doWork} = do
cfg <- asks config
forever $ do
waitForWork doWork

View File

@@ -908,10 +908,10 @@ sendMessagesB c reqs = withConnLocks c connIds "sendMessages" $ do
enqueueCommand :: AgentMonad m => AgentClient -> ACorrId -> ConnId -> Maybe SMPServer -> AgentCommand -> m ()
enqueueCommand c corrId connId server aCommand = do
withStore c $ \db -> createCommand db corrId connId server aCommand
void $ getAsyncCmdWorker (\w -> hasWorkToDo w $> w) c server
void $ getAsyncCmdWorker True c server
resumeSrvCmds :: forall m. AgentMonad' m => AgentClient -> Maybe SMPServer -> m ()
resumeSrvCmds = void .: getAsyncCmdWorker pure
resumeSrvCmds = void .: getAsyncCmdWorker False
resumeConnCmds :: forall m. AgentMonad m => AgentClient -> ConnId -> m ()
resumeConnCmds c connId =
@@ -921,23 +921,12 @@ resumeConnCmds c connId =
where
connQueued = atomically $ isJust <$> TM.lookupInsert connId True (connCmdsQueued c)
getAsyncCmdWorker :: AgentMonad' m => (Worker -> STM Worker) -> AgentClient -> Maybe SMPServer -> m Worker
getAsyncCmdWorker whenExists c server =
atomically (getWorker >>= maybe createWorker whenExists) >>= \w -> runWorker w $> w
where
getWorker = TM.lookup server $ asyncCmdWorkers c
deleteWorker wId = mapM_ $ \w -> when (wId == workerId w) $ TM.delete server $ asyncCmdWorkers c
createWorker = do
w <- newWorker c
TM.insert server w $ asyncCmdWorkers c
pure w
runWorker w@Worker {workerId = wId, doWork} =
runWorkerAsync w . void . runExceptT $
runCommandProcessing c server doWork
`agentFinally` atomically (getWorker >>= deleteWorker wId)
getAsyncCmdWorker :: AgentMonad' m => Bool -> AgentClient -> Maybe SMPServer -> m Worker
getAsyncCmdWorker hasWork c server =
getAgentWorker hasWork c server (asyncCmdWorkers c) (runCommandProcessing c server)
runCommandProcessing :: forall m. AgentMonad m => AgentClient -> Maybe SMPServer -> TMVar () -> m ()
runCommandProcessing c@AgentClient {subQ} server_ doWork = do
runCommandProcessing :: forall m. AgentMonad m => AgentClient -> Maybe SMPServer -> Worker -> m ()
runCommandProcessing c@AgentClient {subQ} server_ Worker {doWork} = do
ri <- asks $ messageRetryInterval . config -- different retry interval?
forever $ do
atomically $ endAgentOperation c AOSndNetwork
@@ -1126,32 +1115,23 @@ enqueueSavedMessageB c reqs = do
in map (\sq -> createSndMsgDelivery db connId sq mId) sqs
resumeMsgDelivery :: forall m. AgentMonad' m => AgentClient -> ConnData -> SndQueue -> m ()
resumeMsgDelivery = void .:. getDeliveryWorker pure
resumeMsgDelivery = void .:. getDeliveryWorker False
getDeliveryWorker :: AgentMonad' m => ((Worker, TMVar ()) -> STM (Worker, TMVar ())) -> AgentClient -> ConnData -> SndQueue -> m (Worker, TMVar ())
getDeliveryWorker whenExists c cData sq = do
atomically (getWorker >>= maybe createWorker whenExists) >>= \wl -> runWorker wl $> wl
getDeliveryWorker :: AgentMonad' m => Bool -> AgentClient -> ConnData -> SndQueue -> m (Worker, TMVar ())
getDeliveryWorker hasWork c cData sq =
getAgentWorker' fst mkLock hasWork c (qAddress sq) (smpDeliveryWorkers c) (runSmpQueueMsgDelivery c cData sq)
where
qAddr = qAddress sq
getWorker = TM.lookup qAddr $ smpDeliveryWorkers c
deleteWorker wId = mapM_ $ \(w, _) -> when (wId == workerId w) $ TM.delete qAddr $ smpDeliveryWorkers c
createWorker = do
mkLock w = do
retryLock <- newEmptyTMVar
wl <- (,retryLock) <$> newWorker c
TM.insert qAddr wl $ smpDeliveryWorkers c
pure wl
runWorker (w@Worker {workerId = wId, doWork}, retryLock) =
runWorkerAsync w . void . runExceptT $
runSmpQueueMsgDelivery c cData sq doWork retryLock
`agentFinally` atomically (getWorker >>= deleteWorker wId)
pure (w, retryLock)
submitPendingMsg :: AgentMonad' m => AgentClient -> ConnData -> SndQueue -> m ()
submitPendingMsg c cData sq = do
atomically $ modifyTVar' (msgDeliveryOp c) $ \s -> s {opsInProgress = opsInProgress s + 1}
void $ getDeliveryWorker (\wl -> hasWorkToDo (fst wl) $> wl) c cData sq
void $ getDeliveryWorker True c cData sq
runSmpQueueMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnData -> SndQueue -> TMVar () -> TMVar () -> m ()
runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, duplexHandshake} sq doWork qLock = do
runSmpQueueMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnData -> SndQueue -> (Worker, TMVar ()) -> m ()
runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, duplexHandshake} sq (Worker {doWork}, qLock) = do
ri <- asks $ messageRetryInterval . config
forever $ do
atomically $ endAgentOperation c AOSndNetwork

View File

@@ -85,8 +85,9 @@ module Simplex.Messaging.Agent.Client
AgentState (..),
AgentLocks (..),
AgentStatsKey (..),
newWorker,
runWorkerAsync,
getAgentWorker,
getAgentWorker',
cancelWorker,
waitForWork,
hasWorkToDo,
hasWorkToDo',
@@ -272,7 +273,26 @@ data AgentClient = AgentClient
agentEnv :: Env
}
data Worker = Worker {workerId :: Int, doWork :: TMVar (), action :: TMVar (Maybe (Async ()))}
getAgentWorker :: (AgentMonad' m, Ord k) => Bool -> AgentClient -> k -> TMap k Worker -> (Worker -> ExceptT AgentErrorType m ()) -> m Worker
getAgentWorker = getAgentWorker' id pure
getAgentWorker' :: (AgentMonad' m, Ord k) => (a -> Worker) -> (Worker -> STM a) -> Bool -> AgentClient -> k -> TMap k a -> (a -> ExceptT AgentErrorType m ()) -> m a
getAgentWorker' toW fromW hasWork c key ws work = do
atomically (getWorker >>= maybe createWorker whenExists) >>= \w -> runWorker w $> w
where
getWorker = TM.lookup key ws
deleteWorker wId = mapM_ $ \w -> when (wId == workerId (toW w)) $ TM.delete key ws
createWorker = do
w <- fromW =<< newWorker c
TM.insert key w ws
pure w
whenExists w
| hasWork = hasWorkToDo (toW w) $> w
| otherwise = pure w
runWorker w = do
let w'@Worker {workerId} = toW w
runWorkerAsync w' . void . runExceptT $
work w `agentFinally` atomically (getWorker >>= deleteWorker workerId)
newWorker :: AgentClient -> STM Worker
newWorker c = do

View File

@@ -27,6 +27,7 @@ module Simplex.Messaging.Agent.Env.SQLite
NtfSupervisor (..),
NtfSupervisorCommand (..),
XFTPAgent (..),
Worker (..),
)
where
@@ -205,8 +206,8 @@ createAgentStore dbFilePath dbKey keepKey = createSQLiteStore dbFilePath dbKey k
data NtfSupervisor = NtfSupervisor
{ ntfTkn :: TVar (Maybe NtfToken),
ntfSubQ :: TBQueue (ConnId, NtfSupervisorCommand),
ntfWorkers :: TMap NtfServer (TMVar (), Async ()),
ntfSMPWorkers :: TMap SMPServer (TMVar (), Async ())
ntfWorkers :: TMap NtfServer Worker,
ntfSMPWorkers :: TMap SMPServer Worker
}
data NtfSupervisorCommand = NSCCreate | NSCDelete | NSCSmpDelete | NSCNtfWorker NtfServer | NSCNtfSMPWorker SMPServer
@@ -223,9 +224,9 @@ newNtfSubSupervisor qSize = do
data XFTPAgent = XFTPAgent
{ -- if set, XFTP file paths will be considered as relative to this directory
xftpWorkDir :: TVar (Maybe FilePath),
xftpRcvWorkers :: TMap (Maybe XFTPServer) (TMVar (), Async ()),
xftpSndWorkers :: TMap (Maybe XFTPServer) (TMVar (), Async ()),
xftpDelWorkers :: TMap XFTPServer (TMVar (), Async ())
xftpRcvWorkers :: TMap (Maybe XFTPServer) Worker,
xftpSndWorkers :: TMap (Maybe XFTPServer) Worker,
xftpDelWorkers :: TMap XFTPServer Worker
}
newXFTPAgent :: STM XFTPAgent
@@ -251,3 +252,9 @@ agentFinally = allFinally mkInternal
mkInternal :: SomeException -> AgentErrorType
mkInternal = INTERNAL . show
{-# INLINE mkInternal #-}
data Worker = Worker
{ workerId :: Int,
doWork :: TMVar (),
action :: TMVar (Maybe (Async ()))
}

View File

@@ -37,9 +37,7 @@ import Simplex.Messaging.Client.Agent ()
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Notifications.Protocol (NtfSubStatus (..), NtfTknStatus (..), SMPQueueNtf (..))
import Simplex.Messaging.Notifications.Types
import Simplex.Messaging.Protocol (NtfServer, ProtocolServer, SMPServer, sameSrvAddr)
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Protocol (NtfServer, SMPServer, sameSrvAddr)
import Simplex.Messaging.Util (diffToMicroseconds, threadDelay', tshow, unlessM)
import System.Random (randomR)
import UnliftIO
@@ -79,11 +77,11 @@ processNtfSub c (connId, cmd) = do
Just ClientNtfCreds {notifierId} -> do
let newSub = newNtfSubscription connId smpServer (Just notifierId) ntfServer NASKey
withStore c $ \db -> createNtfSubscription db newSub $ NtfSubNTFAction NSACreate
addNtfNTFWorker ntfServer
void $ getNtfNTFWorker True c ntfServer
Nothing -> do
let newSub = newNtfSubscription connId smpServer Nothing ntfServer NASNew
withStore c $ \db -> createNtfSubscription db newSub $ NtfSubSMPAction NSASmpKey
addNtfSMPWorker smpServer
void $ getNtfSMPWorker True c smpServer
(Just (sub@NtfSubscription {ntfSubStatus, ntfServer = subNtfServer, smpServer = smpServer', ntfQueueId}, action_)) -> do
case (clientNtfCreds, ntfQueueId) of
(Just ClientNtfCreds {notifierId}, Just ntfQueueId')
@@ -103,59 +101,53 @@ processNtfSub c (connId, cmd) = do
then resetSubscription
else withNtfServer c $ \ntfServer -> do
withStore' c $ \db -> supervisorUpdateNtfSub db sub {ntfServer} (NtfSubNTFAction NSACreate)
addNtfNTFWorker ntfServer
void $ getNtfNTFWorker True c ntfServer
| otherwise -> case action of
NtfSubNTFAction _ -> addNtfNTFWorker subNtfServer
NtfSubSMPAction _ -> addNtfSMPWorker smpServer
NtfSubNTFAction _ -> void $ getNtfNTFWorker True c subNtfServer
NtfSubSMPAction _ -> void $ getNtfSMPWorker True c smpServer
rotate :: m ()
rotate = do
withStore' c $ \db -> supervisorUpdateNtfSub db sub (NtfSubNTFAction NSARotate)
addNtfNTFWorker subNtfServer
void $ getNtfNTFWorker True c subNtfServer
resetSubscription :: m ()
resetSubscription =
withNtfServer c $ \ntfServer -> do
let sub' = sub {ntfQueueId = Nothing, ntfServer, ntfSubId = Nothing, ntfSubStatus = NASNew}
withStore' c $ \db -> supervisorUpdateNtfSub db sub' (NtfSubSMPAction NSASmpKey)
addNtfSMPWorker smpServer
void $ getNtfSMPWorker True c smpServer
NSCDelete -> do
sub_ <- withStore' c $ \db -> do
supervisorUpdateNtfAction db connId (NtfSubNTFAction NSADelete)
getNtfSubscription db connId
logInfo $ "processNtfSub, NSCDelete - sub_ = " <> tshow sub_
case sub_ of
(Just (NtfSubscription {ntfServer}, _)) -> addNtfNTFWorker ntfServer
(Just (NtfSubscription {ntfServer}, _)) -> void $ getNtfNTFWorker True c ntfServer
_ -> pure () -- err "NSCDelete - no subscription"
NSCSmpDelete -> do
withStore' c (`getPrimaryRcvQueue` connId) >>= \case
Right rq@RcvQueue {server = smpServer} -> do
logInfo $ "processNtfSub, NSCSmpDelete - rq = " <> tshow rq
withStore' c $ \db -> supervisorUpdateNtfAction db connId (NtfSubSMPAction NSASmpDelete)
addNtfSMPWorker smpServer
void $ getNtfSMPWorker True c smpServer
_ -> notifyInternalError c connId "NSCSmpDelete - no rcv queue"
NSCNtfWorker ntfServer -> addNtfNTFWorker ntfServer
NSCNtfSMPWorker smpServer -> addNtfSMPWorker smpServer
where
addNtfNTFWorker = addWorker ntfWorkers runNtfWorker
addNtfSMPWorker = addWorker ntfSMPWorkers runNtfSMPWorker
addWorker ::
(NtfSupervisor -> TMap (ProtocolServer s) (TMVar (), Async ())) ->
(AgentClient -> ProtocolServer s -> TMVar () -> m ()) ->
ProtocolServer s ->
m ()
addWorker wsSel runWorker srv = do
ws <- asks $ wsSel . ntfSupervisor
atomically (TM.lookup srv ws) >>= \case
Nothing -> do
doWork <- newTMVarIO ()
worker <- async $ runWorker c srv doWork `agentFinally` atomically (TM.delete srv ws)
atomically $ TM.insert srv (doWork, worker) ws
Just (doWork, _) -> atomically $ hasWorkToDo' doWork
NSCNtfWorker ntfServer -> void $ getNtfNTFWorker True c ntfServer
NSCNtfSMPWorker smpServer -> void $ getNtfSMPWorker True c smpServer
getNtfNTFWorker :: AgentMonad' m => Bool -> AgentClient -> NtfServer -> m Worker
getNtfNTFWorker hasWork c server = do
ws <- asks $ ntfWorkers . ntfSupervisor
getAgentWorker hasWork c server ws $ runNtfWorker c server
getNtfSMPWorker :: AgentMonad' m => Bool -> AgentClient -> SMPServer -> m Worker
getNtfSMPWorker hasWork c server = do
ws <- asks $ ntfSMPWorkers . ntfSupervisor
getAgentWorker hasWork c server ws $ runNtfSMPWorker c server
withNtfServer :: AgentMonad' m => AgentClient -> (NtfServer -> m ()) -> m ()
withNtfServer c action = getNtfServer c >>= mapM_ action
runNtfWorker :: forall m. AgentMonad m => AgentClient -> NtfServer -> TMVar () -> m ()
runNtfWorker c srv doWork = do
runNtfWorker :: forall m. AgentMonad m => AgentClient -> NtfServer -> Worker -> m ()
runNtfWorker c srv Worker {doWork} = do
delay <- asks $ ntfWorkerDelay . config
forever $ do
waitForWork doWork
@@ -236,8 +228,8 @@ runNtfWorker c srv doWork = do
withStore' c $ \db ->
updateNtfSubscription db sub {ntfSubStatus = toStatus} toAction actionTs'
runNtfSMPWorker :: forall m. AgentMonad m => AgentClient -> SMPServer -> TMVar () -> m ()
runNtfSMPWorker c srv doWork = do
runNtfSMPWorker :: forall m. AgentMonad m => AgentClient -> SMPServer -> Worker -> m ()
runNtfSMPWorker c srv Worker {doWork} = do
delay <- asks $ ntfSMPWorkerDelay . config
forever $ do
waitForWork doWork
@@ -336,13 +328,10 @@ instantNotifications = \case
closeNtfSupervisor :: MonadUnliftIO m => NtfSupervisor -> m ()
closeNtfSupervisor ns = do
cancelNtfWorkers_ $ ntfWorkers ns
cancelNtfWorkers_ $ ntfSMPWorkers ns
cancelNtfWorkers_ :: MonadUnliftIO m => TMap (ProtocolServer s) (TMVar (), Async ()) -> m ()
cancelNtfWorkers_ wsVar = do
ws <- atomically $ stateTVar wsVar (,M.empty)
mapM_ (uninterruptibleCancel . snd) ws
stopWorkers $ ntfWorkers ns
stopWorkers $ ntfSMPWorkers ns
where
stopWorkers workers = atomically (swapTVar workers M.empty) >>= mapM_ (liftIO . cancelWorker)
getNtfServer :: AgentMonad' m => AgentClient -> m (Maybe NtfServer)
getNtfServer c = do