From 14cb88e725645f4f32cea09e3efdb69e3cbee11f Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Wed, 18 Jan 2023 18:22:17 +0000 Subject: [PATCH] remove completed async action handles from memory (#614) * remove completed async action handles from memory * name Co-authored-by: JRoberts <8711996+jr-simplex@users.noreply.github.com> Co-authored-by: JRoberts <8711996+jr-simplex@users.noreply.github.com> --- simplexmq.cabal | 1 + src/Simplex/Messaging/Agent/Client.hs | 30 ++++++++++++-------------- src/Simplex/Messaging/Agent/TAsyncs.hs | 25 +++++++++++++++++++++ 3 files changed, 40 insertions(+), 16 deletions(-) create mode 100644 src/Simplex/Messaging/Agent/TAsyncs.hs diff --git a/simplexmq.cabal b/simplexmq.cabal index 32fc2decf..f49371b96 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -55,6 +55,7 @@ library Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220817_connection_ntfs Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220905_commands Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220915_connection_queues + Simplex.Messaging.Agent.TAsyncs Simplex.Messaging.Agent.TRcvQueues Simplex.Messaging.Client Simplex.Messaging.Client.Agent diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 53c9d4304..e4ddda84f 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -117,6 +117,7 @@ import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.RetryInterval import Simplex.Messaging.Agent.Store import Simplex.Messaging.Agent.Store.SQLite (SQLiteStore (..), withTransaction) +import Simplex.Messaging.Agent.TAsyncs import Simplex.Messaging.Agent.TRcvQueues (TRcvQueues) import qualified Simplex.Messaging.Agent.TRcvQueues as RQ import Simplex.Messaging.Client @@ -156,7 +157,6 @@ import Simplex.Messaging.Transport.Client (TransportHost) import Simplex.Messaging.Util import Simplex.Messaging.Version import System.Timeout (timeout) -import UnliftIO (async) import qualified UnliftIO.Exception as E import UnliftIO.STM @@ -196,8 +196,8 @@ data AgentClient = AgentClient connLocks :: TMap ConnId Lock, -- locks to prevent concurrent reconnections to SMP servers reconnectLocks :: TMap SMPServer Lock, - reconnections :: TVar [Async ()], - asyncClients :: TVar [Async ()], + reconnections :: TAsyncs, + asyncClients :: TAsyncs, agentStats :: TMap AgentStatsKey (TVar Int), clientId :: Int, agentEnv :: Env @@ -260,8 +260,8 @@ newAgentClient InitialAgentServers {smp, ntf, netCfg} agentEnv = do getMsgLocks <- TM.empty connLocks <- TM.empty reconnectLocks <- TM.empty - reconnections <- newTVar [] - asyncClients <- newTVar [] + reconnections <- newTAsyncs + asyncClients <- newTAsyncs agentStats <- TM.empty clientId <- stateTVar (clientCounter agentEnv) $ \i -> let i' = i + 1 in (i', i') return AgentClient {active, rcvQ, subQ, msgQ, smpServers, smpClients, ntfServers, ntfClients, useNetworkConfig, subscrConns, activeSubs, pendingSubs, pendingMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, connCmdsQueued, asyncCmdQueues, asyncCmdProcesses, ntfNetworkOp, rcvNetworkOp, msgDeliveryOp, sndNetworkOp, databaseOp, agentState, getMsgLocks, connLocks, reconnectLocks, reconnections, asyncClients, agentStats, clientId, agentEnv} @@ -322,14 +322,13 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} srv = do notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, cmd) reconnectServer :: AgentMonad m => AgentClient -> SMPServer -> m () -reconnectServer c srv = do - a <- async tryReconnectSMPClient - atomically $ modifyTVar' (reconnections c) (a :) +reconnectServer c srv = newAsyncAction tryReconnectSMPClient $ reconnections c where - tryReconnectSMPClient = do + tryReconnectSMPClient aId = do ri <- asks $ reconnectInterval . config withRetryInterval ri $ \loop -> reconnectSMPClient c srv `catchError` const loop + atomically . removeAsyncAction aId $ reconnections c reconnectSMPClient :: forall m. AgentMonad m => AgentClient -> SMPServer -> m () reconnectSMPClient c srv = @@ -422,13 +421,12 @@ newProtocolClient c srv clients connectClient reconnectClient clientVar = tryCon TM.delete srv clients throwError e tryConnectAsync :: m () - tryConnectAsync = do - a <- async connectAsync - atomically $ modifyTVar' (asyncClients c) (a :) - connectAsync :: m () - connectAsync = do + tryConnectAsync = newAsyncAction connectAsync $ asyncClients c + connectAsync :: Int -> m () + connectAsync aId = do ri <- asks $ reconnectInterval . config withRetryInterval ri $ \loop -> void $ tryConnectClient (const $ reconnectClient c srv) loop + atomically . removeAsyncAction aId $ asyncClients c hostEvent :: forall msg. ProtocolTypeI (ProtoType msg) => (AProtocolType -> TransportHost -> ACommand 'Agent) -> ProtocolClient msg -> ACommand 'Agent hostEvent event client = event (AProtocolType $ protocolTypeI @(ProtoType msg)) $ transportHost' client @@ -444,8 +442,8 @@ closeAgentClient c = liftIO $ do atomically $ writeTVar (active c) False closeProtocolServerClients c smpClients closeProtocolServerClients c ntfClients - cancelActions $ reconnections c - cancelActions $ asyncClients c + cancelActions . actions $ reconnections c + cancelActions . actions $ asyncClients c cancelActions $ smpQueueMsgDeliveries c cancelActions $ asyncCmdProcesses c atomically . RQ.clear $ activeSubs c diff --git a/src/Simplex/Messaging/Agent/TAsyncs.hs b/src/Simplex/Messaging/Agent/TAsyncs.hs new file mode 100644 index 000000000..80fc41840 --- /dev/null +++ b/src/Simplex/Messaging/Agent/TAsyncs.hs @@ -0,0 +1,25 @@ +module Simplex.Messaging.Agent.TAsyncs where + +import Control.Concurrent.STM (stateTVar) +import Control.Monad.IO.Unlift (MonadUnliftIO) +import Simplex.Messaging.TMap (TMap) +import qualified Simplex.Messaging.TMap as TM +import UnliftIO.Async (Async, async) +import UnliftIO.STM + +data TAsyncs = TAsyncs + { actionId :: TVar Int, + actions :: TMap Int (Async ()) + } + +newTAsyncs :: STM TAsyncs +newTAsyncs = TAsyncs <$> newTVar 0 <*> TM.empty + +newAsyncAction :: MonadUnliftIO m => (Int -> m ()) -> TAsyncs -> m () +newAsyncAction action as = do + aId <- atomically $ stateTVar (actionId as) $ \i -> (i + 1, i + 1) + a <- async $ action aId + atomically $ TM.insert aId a $ actions as + +removeAsyncAction :: Int -> TAsyncs -> STM () +removeAsyncAction aId = TM.delete aId . actions