mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-04-26 23:55:14 +00:00
remove completed async action handles from memory (#614)
* remove completed async action handles from memory * name Co-authored-by: JRoberts <8711996+jr-simplex@users.noreply.github.com> Co-authored-by: JRoberts <8711996+jr-simplex@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
6ccbe5e66e
commit
14cb88e725
@@ -55,6 +55,7 @@ library
|
||||
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220817_connection_ntfs
|
||||
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220905_commands
|
||||
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220915_connection_queues
|
||||
Simplex.Messaging.Agent.TAsyncs
|
||||
Simplex.Messaging.Agent.TRcvQueues
|
||||
Simplex.Messaging.Client
|
||||
Simplex.Messaging.Client.Agent
|
||||
|
||||
@@ -117,6 +117,7 @@ import Simplex.Messaging.Agent.Protocol
|
||||
import Simplex.Messaging.Agent.RetryInterval
|
||||
import Simplex.Messaging.Agent.Store
|
||||
import Simplex.Messaging.Agent.Store.SQLite (SQLiteStore (..), withTransaction)
|
||||
import Simplex.Messaging.Agent.TAsyncs
|
||||
import Simplex.Messaging.Agent.TRcvQueues (TRcvQueues)
|
||||
import qualified Simplex.Messaging.Agent.TRcvQueues as RQ
|
||||
import Simplex.Messaging.Client
|
||||
@@ -156,7 +157,6 @@ import Simplex.Messaging.Transport.Client (TransportHost)
|
||||
import Simplex.Messaging.Util
|
||||
import Simplex.Messaging.Version
|
||||
import System.Timeout (timeout)
|
||||
import UnliftIO (async)
|
||||
import qualified UnliftIO.Exception as E
|
||||
import UnliftIO.STM
|
||||
|
||||
@@ -196,8 +196,8 @@ data AgentClient = AgentClient
|
||||
connLocks :: TMap ConnId Lock,
|
||||
-- locks to prevent concurrent reconnections to SMP servers
|
||||
reconnectLocks :: TMap SMPServer Lock,
|
||||
reconnections :: TVar [Async ()],
|
||||
asyncClients :: TVar [Async ()],
|
||||
reconnections :: TAsyncs,
|
||||
asyncClients :: TAsyncs,
|
||||
agentStats :: TMap AgentStatsKey (TVar Int),
|
||||
clientId :: Int,
|
||||
agentEnv :: Env
|
||||
@@ -260,8 +260,8 @@ newAgentClient InitialAgentServers {smp, ntf, netCfg} agentEnv = do
|
||||
getMsgLocks <- TM.empty
|
||||
connLocks <- TM.empty
|
||||
reconnectLocks <- TM.empty
|
||||
reconnections <- newTVar []
|
||||
asyncClients <- newTVar []
|
||||
reconnections <- newTAsyncs
|
||||
asyncClients <- newTAsyncs
|
||||
agentStats <- TM.empty
|
||||
clientId <- stateTVar (clientCounter agentEnv) $ \i -> let i' = i + 1 in (i', i')
|
||||
return AgentClient {active, rcvQ, subQ, msgQ, smpServers, smpClients, ntfServers, ntfClients, useNetworkConfig, subscrConns, activeSubs, pendingSubs, pendingMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, connCmdsQueued, asyncCmdQueues, asyncCmdProcesses, ntfNetworkOp, rcvNetworkOp, msgDeliveryOp, sndNetworkOp, databaseOp, agentState, getMsgLocks, connLocks, reconnectLocks, reconnections, asyncClients, agentStats, clientId, agentEnv}
|
||||
@@ -322,14 +322,13 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} srv = do
|
||||
notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, cmd)
|
||||
|
||||
reconnectServer :: AgentMonad m => AgentClient -> SMPServer -> m ()
|
||||
reconnectServer c srv = do
|
||||
a <- async tryReconnectSMPClient
|
||||
atomically $ modifyTVar' (reconnections c) (a :)
|
||||
reconnectServer c srv = newAsyncAction tryReconnectSMPClient $ reconnections c
|
||||
where
|
||||
tryReconnectSMPClient = do
|
||||
tryReconnectSMPClient aId = do
|
||||
ri <- asks $ reconnectInterval . config
|
||||
withRetryInterval ri $ \loop ->
|
||||
reconnectSMPClient c srv `catchError` const loop
|
||||
atomically . removeAsyncAction aId $ reconnections c
|
||||
|
||||
reconnectSMPClient :: forall m. AgentMonad m => AgentClient -> SMPServer -> m ()
|
||||
reconnectSMPClient c srv =
|
||||
@@ -422,13 +421,12 @@ newProtocolClient c srv clients connectClient reconnectClient clientVar = tryCon
|
||||
TM.delete srv clients
|
||||
throwError e
|
||||
tryConnectAsync :: m ()
|
||||
tryConnectAsync = do
|
||||
a <- async connectAsync
|
||||
atomically $ modifyTVar' (asyncClients c) (a :)
|
||||
connectAsync :: m ()
|
||||
connectAsync = do
|
||||
tryConnectAsync = newAsyncAction connectAsync $ asyncClients c
|
||||
connectAsync :: Int -> m ()
|
||||
connectAsync aId = do
|
||||
ri <- asks $ reconnectInterval . config
|
||||
withRetryInterval ri $ \loop -> void $ tryConnectClient (const $ reconnectClient c srv) loop
|
||||
atomically . removeAsyncAction aId $ asyncClients c
|
||||
|
||||
hostEvent :: forall msg. ProtocolTypeI (ProtoType msg) => (AProtocolType -> TransportHost -> ACommand 'Agent) -> ProtocolClient msg -> ACommand 'Agent
|
||||
hostEvent event client = event (AProtocolType $ protocolTypeI @(ProtoType msg)) $ transportHost' client
|
||||
@@ -444,8 +442,8 @@ closeAgentClient c = liftIO $ do
|
||||
atomically $ writeTVar (active c) False
|
||||
closeProtocolServerClients c smpClients
|
||||
closeProtocolServerClients c ntfClients
|
||||
cancelActions $ reconnections c
|
||||
cancelActions $ asyncClients c
|
||||
cancelActions . actions $ reconnections c
|
||||
cancelActions . actions $ asyncClients c
|
||||
cancelActions $ smpQueueMsgDeliveries c
|
||||
cancelActions $ asyncCmdProcesses c
|
||||
atomically . RQ.clear $ activeSubs c
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
module Simplex.Messaging.Agent.TAsyncs where
|
||||
|
||||
import Control.Concurrent.STM (stateTVar)
|
||||
import Control.Monad.IO.Unlift (MonadUnliftIO)
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
import UnliftIO.Async (Async, async)
|
||||
import UnliftIO.STM
|
||||
|
||||
data TAsyncs = TAsyncs
|
||||
{ actionId :: TVar Int,
|
||||
actions :: TMap Int (Async ())
|
||||
}
|
||||
|
||||
newTAsyncs :: STM TAsyncs
|
||||
newTAsyncs = TAsyncs <$> newTVar 0 <*> TM.empty
|
||||
|
||||
newAsyncAction :: MonadUnliftIO m => (Int -> m ()) -> TAsyncs -> m ()
|
||||
newAsyncAction action as = do
|
||||
aId <- atomically $ stateTVar (actionId as) $ \i -> (i + 1, i + 1)
|
||||
a <- async $ action aId
|
||||
atomically $ TM.insert aId a $ actions as
|
||||
|
||||
removeAsyncAction :: Int -> TAsyncs -> STM ()
|
||||
removeAsyncAction aId = TM.delete aId . actions
|
||||
Reference in New Issue
Block a user