From f9cd7e5416013803c899dda59110a362e853407d Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Fri, 1 Jul 2022 22:43:18 +0100 Subject: [PATCH] suspend ntf operations when agent is suspended (#453) * suspend ntf operations when agent is suspended * end and begin ntf operation on loop --- src/Simplex/Messaging/Agent.hs | 11 +- src/Simplex/Messaging/Agent/Client.hs | 22 ++-- src/Simplex/Messaging/Agent/Env/SQLite.hs | 6 +- .../Messaging/Agent/NtfSubSupervisor.hs | 100 ++++++++++-------- tests/SMPAgentClient.hs | 3 +- 5 files changed, 84 insertions(+), 58 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index f6b42d91a..bca2b3f30 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -811,6 +811,7 @@ activateAgent' c = atomically $ do activate sndNetworkOp activate msgDeliveryOp activate rcvNetworkOp + activate ntfNetworkOp where activate opSel = modifyTVar' (opSel c) $ \s -> s {opSuspended = False} @@ -819,6 +820,7 @@ suspendAgent' c@AgentClient {agentState = as} maxDelay = do state <- atomically $ do writeTVar as ASSuspending + suspendOperation c AONtfNetwork $ pure () suspendOperation c AORcvNetwork $ suspendOperation c AOMsgDelivery $ suspendSendingAndDatabase c @@ -842,12 +844,11 @@ getSMPServer c = do subscriber :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m () subscriber c@AgentClient {msgQ} = forever $ do - atomically $ endAgentOperation c AORcvNetwork t <- atomically $ readTBQueue msgQ - atomically $ beginAgentOperation c AORcvNetwork - withAgentLock c (runExceptT $ processSMPTransmission c t) >>= \case - Left e -> liftIO $ print e - Right _ -> return () + agentOperationBracket c AORcvNetwork $ + withAgentLock c (runExceptT $ processSMPTransmission c t) >>= \case + Left e -> liftIO $ print e + Right _ -> return () processSMPTransmission :: forall m. AgentMonad m => AgentClient -> ServerTransmission BrokerMsg -> m () processSMPTransmission c@AgentClient {smpClients, subQ} (srv, sessId, rId, cmd) = diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 9dad6b6f9..78407769f 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -52,6 +52,7 @@ module Simplex.Messaging.Agent.Client AgentOperation (..), AgentOpState (..), AgentState (..), + agentOperationBracket, beginAgentOperation, endAgentOperation, suspendSendingAndDatabase, @@ -127,6 +128,7 @@ data AgentClient = AgentClient connMsgsQueued :: TMap ConnId Bool, smpQueueMsgQueues :: TMap (ConnId, SMPServer, SMP.SenderId) (TQueue InternalId), smpQueueMsgDeliveries :: TMap (ConnId, SMPServer, SMP.SenderId) (Async ()), + ntfNetworkOp :: TVar AgentOpState, rcvNetworkOp :: TVar AgentOpState, msgDeliveryOp :: TVar AgentOpState, sndNetworkOp :: TVar AgentOpState, @@ -140,11 +142,12 @@ data AgentClient = AgentClient lock :: TMVar () } -data AgentOperation = AORcvNetwork | AOMsgDelivery | AOSndNetwork | AODatabase +data AgentOperation = AONtfNetwork | AORcvNetwork | AOMsgDelivery | AOSndNetwork | AODatabase deriving (Eq, Show) agentOpSel :: AgentOperation -> (AgentClient -> TVar AgentOpState) agentOpSel = \case + AONtfNetwork -> ntfNetworkOp AORcvNetwork -> rcvNetworkOp AOMsgDelivery -> msgDeliveryOp AOSndNetwork -> sndNetworkOp @@ -172,6 +175,7 @@ newAgentClient InitialAgentServers {smp, ntf} agentEnv = do connMsgsQueued <- TM.empty smpQueueMsgQueues <- TM.empty smpQueueMsgDeliveries <- TM.empty + ntfNetworkOp <- newTVar $ AgentOpState False 0 rcvNetworkOp <- newTVar $ AgentOpState False 0 msgDeliveryOp <- newTVar $ AgentOpState False 0 sndNetworkOp <- newTVar $ AgentOpState False 0 @@ -182,7 +186,7 @@ newAgentClient InitialAgentServers {smp, ntf} agentEnv = do asyncClients <- newTVar [] clientId <- stateTVar (clientCounter agentEnv) $ \i -> let i' = i + 1 in (i', i') lock <- newTMVar () - return AgentClient {active, rcvQ, subQ, msgQ, smpServers, smpClients, ntfServers, ntfClients, subscrSrvrs, pendingSubscrSrvrs, subscrConns, connMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, rcvNetworkOp, msgDeliveryOp, sndNetworkOp, databaseOp, agentState, getMsgLocks, reconnections, asyncClients, clientId, agentEnv, lock} + return AgentClient {active, rcvQ, subQ, msgQ, smpServers, smpClients, ntfServers, ntfClients, subscrSrvrs, pendingSubscrSrvrs, subscrConns, connMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, ntfNetworkOp, rcvNetworkOp, msgDeliveryOp, sndNetworkOp, databaseOp, agentState, getMsgLocks, reconnections, asyncClients, clientId, agentEnv, lock} agentDbPath :: AgentClient -> FilePath agentDbPath AgentClient {agentEnv = Env {store = SQLiteStore {dbFilePath}}} = dbFilePath @@ -675,6 +679,7 @@ cryptoError = \case endAgentOperation :: AgentClient -> AgentOperation -> STM () endAgentOperation c op = endOperation c op $ case op of + AONtfNetwork -> pure () AORcvNetwork -> suspendOperation c AOMsgDelivery $ suspendSendingAndDatabase c @@ -724,16 +729,21 @@ beginAgentOperation c op = do -- unsafeIOToSTM $ putStrLn $ "beginOperation! " <> show op <> " " <> show (opsInProgress s + 1) writeTVar opVar $! s {opsInProgress = opsInProgress s + 1} +agentOperationBracket :: MonadUnliftIO m => AgentClient -> AgentOperation -> m a -> m a +agentOperationBracket c op action = + E.bracket + (atomically $ beginAgentOperation c op) + (\_ -> atomically $ endAgentOperation c op) + (\_ -> action) + withStore' :: AgentMonad m => AgentClient -> (DB.Connection -> IO a) -> m a withStore' c action = withStore c $ fmap Right . action withStore :: AgentMonad m => AgentClient -> (DB.Connection -> IO (Either StoreError a)) -> m a withStore c action = do st <- asks store - atomically $ beginAgentOperation c AODatabase - r <- liftIO $ withTransaction st action `E.catch` handleInternal - atomically $ endAgentOperation c AODatabase - liftEither $ first storeError r + liftEitherError storeError . agentOperationBracket c AODatabase $ + withTransaction st action `E.catch` handleInternal where handleInternal :: E.SomeException -> IO (Either StoreError a) handleInternal = pure . Left . SEInternal . bshow diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index ae31dc4b9..4af3c0411 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -68,7 +68,8 @@ data AgentConfig = AgentConfig helloTimeout :: NominalDiffTime, resubscriptionConcurrency :: Int, ntfCron :: Word16, - ntfWorkerThrottle :: Int, + ntfWorkerDelay :: Int, + ntfSMPWorkerDelay :: Int, ntfSubCheckInterval :: NominalDiffTime, ntfMaxMessages :: Int, caCertificateFile :: FilePath, @@ -103,7 +104,8 @@ defaultAgentConfig = helloTimeout = 2 * nominalDay, resubscriptionConcurrency = 16, ntfCron = 20, -- minutes - ntfWorkerThrottle = 1000000, -- microseconds + ntfWorkerDelay = 100000, -- microseconds + ntfSMPWorkerDelay = 500000, -- microseconds ntfSubCheckInterval = nominalDay, ntfMaxMessages = 4, -- CA certificate private key is not needed for initialization diff --git a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs index aa75dac01..96a9ec669 100644 --- a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs +++ b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs @@ -26,6 +26,7 @@ import Control.Monad.Reader import Data.Bifunctor (first) import Data.Fixed (Fixed (MkFixed), Pico) import qualified Data.Map.Strict as M +import Data.Text (Text) import Data.Time (UTCTime, addUTCTime, diffUTCTime, getCurrentTime, nominalDiffTimeToSeconds) import Simplex.Messaging.Agent.Client import Simplex.Messaging.Agent.Env.SQLite @@ -49,14 +50,17 @@ import qualified UnliftIO.Exception as E import UnliftIO.STM runNtfSupervisor :: (MonadUnliftIO m, MonadReader Env m) => AgentClient -> m () -runNtfSupervisor c = forever . handleError $ do +runNtfSupervisor c = do ns <- asks ntfSupervisor - cmd <- atomically . readTBQueue $ ntfSubQ ns - runExceptT (processNtfSub c cmd) >>= \case - Left e -> liftIO $ print e - Right _ -> return () + forever . handleError $ do + cmd <- atomically . readTBQueue $ ntfSubQ ns + agentOperationBracket c AONtfNetwork $ + runExceptT (processNtfSub c cmd) >>= \case + Left e -> liftIO $ print e + Right _ -> return () where - handleError = E.handle $ \(e :: E.SomeException) -> logError $ "runNtfSupervisor error " <> tshow e + handleError = E.handle $ \(e :: E.SomeException) -> do + logError $ "runNtfSupervisor error " <> tshow e processNtfSub :: forall m. AgentMonad m => AgentClient -> (ConnId, NtfSupervisorCommand) -> m () processNtfSub c@AgentClient {subQ} (connId, cmd) = do @@ -152,26 +156,24 @@ withNtfServer :: AgentMonad m => AgentClient -> (NtfServer -> m ()) -> m () withNtfServer c action = getNtfServer c >>= mapM_ action runNtfWorker :: forall m. AgentMonad m => AgentClient -> NtfServer -> TMVar () -> m () -runNtfWorker c srv doWork = forever $ do - void . atomically $ readTMVar doWork - nextSub_ <- withStore' c (`getNextNtfSubNTFAction` srv) - logInfo $ "runNtfWorker, nextSub_ " <> tshow nextSub_ - case nextSub_ of - Nothing -> noWorkToDo - Just a@(NtfSubscription {connId}, _, _) -> do - ri <- asks $ reconnectInterval . config - withRetryInterval ri $ \loop -> - processAction a - `catchError` ( \e -> do - logInfo $ "runNtfWorker, error " <> tshow e - case e of - BROKER NETWORK -> loop - BROKER TIMEOUT -> loop - _ -> ntfInternalError c connId (show e) - ) - throttle <- asks $ ntfWorkerThrottle . config - liftIO $ threadDelay throttle +runNtfWorker c srv doWork = do + delay <- asks $ ntfWorkerDelay . config + forever $ do + void . atomically $ readTMVar doWork + agentOperationBracket c AONtfNetwork runNtfOperation + threadDelay delay where + runNtfOperation :: m () + runNtfOperation = do + nextSub_ <- withStore' c (`getNextNtfSubNTFAction` srv) + logInfo $ "runNtfWorker, nextSub_ " <> tshow nextSub_ + case nextSub_ of + Nothing -> noWorkToDo + Just a@(NtfSubscription {connId}, _, _) -> do + ri <- asks $ reconnectInterval . config + withRetryInterval ri $ \loop -> + processAction a + `catchError` retryOnError c "NtfWorker" loop (ntfInternalError c connId . show) noWorkToDo = void . atomically $ tryTakeTMVar doWork processAction :: (NtfSubscription, NtfSubNTFAction, NtfActionTs) -> m () processAction (sub@NtfSubscription {connId, smpServer, ntfSubId}, action, actionTs) = do @@ -230,26 +232,23 @@ runNtfWorker c srv doWork = forever $ do updateNtfSubscription db sub {ntfSubStatus = toStatus} toAction actionTs' runNtfSMPWorker :: forall m. AgentMonad m => AgentClient -> SMPServer -> TMVar () -> m () -runNtfSMPWorker c srv doWork = forever $ do - void . atomically $ readTMVar doWork - nextSub_ <- withStore' c (`getNextNtfSubSMPAction` srv) - logInfo $ "runNtfSMPWorker, nextSub_ " <> tshow nextSub_ - case nextSub_ of - Nothing -> noWorkToDo - Just a@(NtfSubscription {connId}, _, _) -> do - ri <- asks $ reconnectInterval . config - withRetryInterval ri $ \loop -> - processAction a - `catchError` ( \e -> do - logInfo $ "runNtfSMPWorker, error " <> tshow e - case e of - BROKER NETWORK -> loop - BROKER TIMEOUT -> loop - _ -> ntfInternalError c connId (show e) - ) - throttle <- asks $ ntfWorkerThrottle . config - liftIO $ threadDelay throttle +runNtfSMPWorker c srv doWork = do + delay <- asks $ ntfSMPWorkerDelay . config + forever $ do + void . atomically $ readTMVar doWork + agentOperationBracket c AONtfNetwork runNtfSMPOperation + threadDelay delay where + runNtfSMPOperation = do + nextSub_ <- withStore' c (`getNextNtfSubSMPAction` srv) + logInfo $ "runNtfSMPWorker, nextSub_ " <> tshow nextSub_ + case nextSub_ of + Nothing -> noWorkToDo + Just a@(NtfSubscription {connId}, _, _) -> do + ri <- asks $ reconnectInterval . config + withRetryInterval ri $ \loop -> + processAction a + `catchError` retryOnError c "NtfSMPWorker" loop (ntfInternalError c connId . show) noWorkToDo = void . atomically $ tryTakeTMVar doWork processAction :: (NtfSubscription, NtfSubSMPAction, NtfActionTs) -> m () processAction (sub@NtfSubscription {connId, ntfServer}, smpAction, actionTs) = do @@ -294,6 +293,19 @@ fromPico (MkFixed i) = i diffInMicros :: UTCTime -> UTCTime -> Int diffInMicros a b = (`div` 1000000) . fromInteger . fromPico . nominalDiffTimeToSeconds $ diffUTCTime a b +retryOnError :: AgentMonad m => AgentClient -> Text -> m () -> (AgentErrorType -> m ()) -> AgentErrorType -> m () +retryOnError c name loop done e = do + logError $ name <> " error: " <> tshow e + case e of + BROKER NETWORK -> retryLoop + BROKER TIMEOUT -> retryLoop + _ -> done e + where + retryLoop = do + atomically $ endAgentOperation c AONtfNetwork + atomically $ beginAgentOperation c AONtfNetwork + loop + ntfInternalError :: AgentMonad m => AgentClient -> ConnId -> String -> m () ntfInternalError c@AgentClient {subQ} connId internalErrStr = do withStore' c $ \db -> setNullNtfSubscriptionAction db connId diff --git a/tests/SMPAgentClient.hs b/tests/SMPAgentClient.hs index fd3d5b980..08cc3ea37 100644 --- a/tests/SMPAgentClient.hs +++ b/tests/SMPAgentClient.hs @@ -183,7 +183,8 @@ agentCfg = defaultTransport = (ntfTestPort, transport @TLS) }, reconnectInterval = defaultReconnectInterval {initialInterval = 50_000}, - ntfWorkerThrottle = 1000, + ntfWorkerDelay = 1000, + ntfSMPWorkerDelay = 1000, caCertificateFile = "tests/fixtures/ca.crt", privateKeyFile = "tests/fixtures/server.key", certificateFile = "tests/fixtures/server.crt"