From c4f377a85b016bf89b857c33de367d75447e6712 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Wed, 18 Jan 2023 11:07:25 +0000 Subject: [PATCH 1/4] fix SMP server stats (#612) * fix SMP server stats * add server stats test * fix test --- src/Simplex/Messaging/Server/Stats.hs | 4 ++- tests/ServerTests.hs | 35 +++++++++++++++++++++++++-- 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index 82170e90f..b462eaffa 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -47,6 +47,7 @@ data ServerStatsData = ServerStatsData _qCount :: Int, _msgCount :: Int } + deriving (Show) newServerStats :: UTCTime -> STM ServerStats newServerStats ts = do @@ -88,7 +89,7 @@ setServerStats s d = do writeTVar (qDeleted s) $! _qDeleted d writeTVar (msgSent s) $! _msgSent d writeTVar (msgRecv s) $! _msgRecv d - setPeriodStats (activeQueuesNtf s) (_activeQueuesNtf d) + setPeriodStats (activeQueues s) (_activeQueues d) writeTVar (msgSentNtf s) $! _msgSentNtf d writeTVar (msgRecvNtf s) $! _msgRecvNtf d setPeriodStats (activeQueuesNtf s) (_activeQueuesNtf d) @@ -152,6 +153,7 @@ data PeriodStatsData a = PeriodStatsData _week :: Set a, _month :: Set a } + deriving (Show) newPeriodStatsData :: PeriodStatsData a newPeriodStatsData = PeriodStatsData {_day = S.empty, _week = S.empty, _month = S.empty} diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index 66315336a..5a83e5f1e 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -11,6 +11,7 @@ module ServerTests where +import AgentTests.NotificationTests (removeFileIfExists) import Control.Concurrent (ThreadId, killThread, threadDelay) import Control.Concurrent.STM import Control.Exception (SomeException, try) @@ -20,6 +21,7 @@ import Data.Bifunctor (first) import Data.ByteString.Base64 import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B +import qualified Data.Set as S import SMPClient import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding @@ -28,6 +30,7 @@ import Simplex.Messaging.Parsers (parseAll) import Simplex.Messaging.Protocol import Simplex.Messaging.Server.Env.STM (ServerConfig (..)) import Simplex.Messaging.Server.Expiration +import Simplex.Messaging.Server.Stats (PeriodStatsData (..), ServerStatsData (..)) import Simplex.Messaging.Transport import System.Directory (removeFile) import System.TimeIt (timeItT) @@ -605,6 +608,10 @@ logSize f = testRestoreMessages :: ATransport -> Spec testRestoreMessages at@(ATransport t) = it "should store messages on exit and restore on start" $ do + removeFileIfExists testStoreLogFile + removeFileIfExists testStoreMsgsFile + removeFileIfExists testServerStatsBackupFile + (sPub, sKey) <- C.generateSignatureKeyPair C.SEd25519 recipientId <- newTVarIO "" recipientKey <- newTVarIO Nothing @@ -632,11 +639,15 @@ testRestoreMessages at@(ATransport t) = Resp "6" _ (ERR QUOTA) <- signSendRecv h sKey ("6", sId, _SEND "hello 6") pure () + rId <- readTVarIO recipientId + logSize testStoreLogFile `shouldReturn` 2 logSize testStoreMsgsFile `shouldReturn` 5 + logSize testServerStatsBackupFile `shouldReturn` 16 + Right stats1 <- strDecode <$> B.readFile testServerStatsBackupFile + checkStats stats1 [rId] 5 1 withSmpServerStoreMsgLogOn at testPort . runTest t $ \h -> do - rId <- readTVarIO recipientId Just rKey <- readTVarIO recipientKey Just dh <- readTVarIO dhShared let dec = decryptMsgV3 dh @@ -650,9 +661,11 @@ testRestoreMessages at@(ATransport t) = logSize testStoreLogFile `shouldReturn` 1 -- the last message is not removed because it was not ACK'd logSize testStoreMsgsFile `shouldReturn` 3 + logSize testServerStatsBackupFile `shouldReturn` 16 + Right stats2 <- strDecode <$> B.readFile testServerStatsBackupFile + checkStats stats2 [rId] 5 3 withSmpServerStoreMsgLogOn at testPort . runTest t $ \h -> do - rId <- readTVarIO recipientId Just rKey <- readTVarIO recipientKey Just dh <- readTVarIO dhShared let dec = decryptMsgV3 dh @@ -667,9 +680,13 @@ testRestoreMessages at@(ATransport t) = logSize testStoreLogFile `shouldReturn` 1 logSize testStoreMsgsFile `shouldReturn` 0 + logSize testServerStatsBackupFile `shouldReturn` 16 + Right stats3 <- strDecode <$> B.readFile testServerStatsBackupFile + checkStats stats3 [rId] 5 5 removeFile testStoreLogFile removeFile testStoreMsgsFile + removeFile testServerStatsBackupFile where runTest :: Transport c => TProxy c -> (THandle c -> IO ()) -> ThreadId -> Expectation runTest _ test' server = do @@ -679,6 +696,20 @@ testRestoreMessages at@(ATransport t) = runClient :: Transport c => TProxy c -> (THandle c -> IO ()) -> Expectation runClient _ test' = testSMPClient test' `shouldReturn` () +checkStats :: ServerStatsData -> [RecipientId] -> Int -> Int -> Expectation +checkStats s qs sent received = do + _qCreated s `shouldBe` length qs + _qSecured s `shouldBe` length qs + _qDeleted s `shouldBe` 0 + _msgSent s `shouldBe` sent + _msgRecv s `shouldBe` received + _msgSentNtf s `shouldBe` 0 + _msgRecvNtf s `shouldBe` 0 + let PeriodStatsData {_day, _week, _month} = _activeQueues s + S.toList _day `shouldBe` qs + S.toList _week `shouldBe` qs + S.toList _month `shouldBe` qs + testRestoreMessagesV2 :: ATransport -> Spec testRestoreMessagesV2 at@(ATransport t) = it "should store messages on exit and restore on start" $ do From f921dc3adb0e5d96f1cbd3eff31eb1503e1ae38b Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Wed, 18 Jan 2023 11:21:45 +0000 Subject: [PATCH 2/4] 4.3.1 --- package.yaml | 2 +- simplexmq.cabal | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/package.yaml b/package.yaml index d4ed1c127..04d8c773d 100644 --- a/package.yaml +++ b/package.yaml @@ -1,5 +1,5 @@ name: simplexmq -version: 4.3.0 +version: 4.3.1 synopsis: SimpleXMQ message broker description: | This package includes <./docs/Simplex-Messaging-Server.html server>, diff --git a/simplexmq.cabal b/simplexmq.cabal index cd0571d83..32fc2decf 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -5,7 +5,7 @@ cabal-version: 1.12 -- see: https://github.com/sol/hpack name: simplexmq -version: 4.3.0 +version: 4.3.1 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and From 6ccbe5e66e7d34889ffb11daefc31daf8ac9791c Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Wed, 18 Jan 2023 14:30:25 +0000 Subject: [PATCH 3/4] retry unsuccessful subscriptions in case of temporary errors (#613) * retry unsuccessful subscriptions in case of temporary errors * do not send DOWN if connection has any active queues --- src/Simplex/Messaging/Agent/Client.hs | 89 ++++++++++++----------- src/Simplex/Messaging/Agent/TRcvQueues.hs | 8 +- 2 files changed, 52 insertions(+), 45 deletions(-) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index a94d866d0..53c9d4304 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -96,7 +96,7 @@ import Data.Bifunctor (bimap, first, second) import Data.ByteString.Base64 import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B -import Data.Either (isRight, partitionEithers) +import Data.Either (isRight, lefts, partitionEithers) import Data.Functor (($>)) import Data.List (partition) import Data.List.NonEmpty (NonEmpty (..)) @@ -286,7 +286,7 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} srv = do unlessM (readTVarIO active) . throwError $ INTERNAL "agent is stopped" atomically (getClientVar srv smpClients) >>= either - (newProtocolClient c srv smpClients connectClient reconnectClient) + (newProtocolClient c srv smpClients connectClient reconnectSMPClient) (waitForProtocolClient c srv) where connectClient :: m SMPClient @@ -303,9 +303,11 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} srv = do removeClientAndSubs :: IO ([RcvQueue], [ConnId]) removeClientAndSubs = atomically $ do TM.delete srv smpClients - (qs, conns) <- RQ.getDelSrvQueues srv $ activeSubs c + qs <- RQ.getDelSrvQueues srv $ activeSubs c mapM_ (`RQ.addQueue` pendingSubs c) qs - pure (qs, S.toList conns) + let cs = S.fromList $ map (\RcvQueue {connId} -> connId) qs + cs' <- RQ.getConns $ activeSubs c + pure (qs, S.toList $ cs `S.difference` cs') serverDown :: ([RcvQueue], [ConnId]) -> IO () serverDown (qs, conns) = whenM (readTVarIO active) $ do @@ -314,40 +316,41 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} srv = do unless (null conns) $ notifySub "" $ DOWN srv conns unless (null qs) $ do atomically $ mapM_ (releaseGetLock c) qs - unliftIO u reconnectServer + unliftIO u $ reconnectServer c srv - reconnectServer :: m () - reconnectServer = do - a <- async tryReconnectClient - atomically $ modifyTVar' (reconnections c) (a :) + notifySub :: ConnId -> ACommand 'Agent -> IO () + notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, cmd) - tryReconnectClient :: m () - tryReconnectClient = do - ri <- asks $ reconnectInterval . config - withRetryInterval ri $ \loop -> - reconnectClient `catchError` const loop - - reconnectClient :: m () - reconnectClient = - withLockMap_ (reconnectLocks c) srv "reconnect" $ - atomically (RQ.getSrvQueues srv $ pendingSubs c) >>= resubscribe - where - resubscribe :: [RcvQueue] -> m () - resubscribe qs = do - connected <- maybe False isRight <$> atomically (TM.lookup srv smpClients $>>= tryReadTMVar) - cs <- atomically . RQ.getConns $ activeSubs c - (client_, rs) <- subscribeQueues c srv qs - let (errs, okConns) = partitionEithers $ map (\(RcvQueue {connId}, r) -> bimap (connId,) (const connId) r) rs - liftIO $ do - unless connected . forM_ client_ $ \cl -> do - incClientStat c cl "CONNECT" "" - notifySub "" $ hostEvent CONNECT cl - let conns = S.toList $ S.fromList okConns `S.difference` cs - unless (null conns) $ notifySub "" $ UP srv conns - let (tempErrs, finalErrs) = partition (temporaryAgentError . snd) errs - liftIO $ mapM_ (\(connId, e) -> notifySub connId $ ERR e) finalErrs - mapM_ (throwError . snd) $ listToMaybe tempErrs +reconnectServer :: AgentMonad m => AgentClient -> SMPServer -> m () +reconnectServer c srv = do + a <- async tryReconnectSMPClient + atomically $ modifyTVar' (reconnections c) (a :) + where + tryReconnectSMPClient = do + ri <- asks $ reconnectInterval . config + withRetryInterval ri $ \loop -> + reconnectSMPClient c srv `catchError` const loop +reconnectSMPClient :: forall m. AgentMonad m => AgentClient -> SMPServer -> m () +reconnectSMPClient c srv = + withLockMap_ (reconnectLocks c) srv "reconnect" $ + atomically (RQ.getSrvQueues srv $ pendingSubs c) >>= resubscribe + where + resubscribe :: [RcvQueue] -> m () + resubscribe qs = do + connected <- maybe False isRight <$> atomically (TM.lookup srv (smpClients c) $>>= tryReadTMVar) + cs <- atomically . RQ.getConns $ activeSubs c + (client_, rs) <- subscribeQueues c srv qs + let (errs, okConns) = partitionEithers $ map (\(RcvQueue {connId}, r) -> bimap (connId,) (const connId) r) rs + liftIO $ do + unless connected . forM_ client_ $ \cl -> do + incClientStat c cl "CONNECT" "" + notifySub "" $ hostEvent CONNECT cl + let conns = S.toList $ S.fromList okConns `S.difference` cs + unless (null conns) $ notifySub "" $ UP srv conns + let (tempErrs, finalErrs) = partition (temporaryAgentError . snd) errs + liftIO $ mapM_ (\(connId, e) -> notifySub connId $ ERR e) finalErrs + mapM_ (throwError . snd) $ listToMaybe tempErrs notifySub :: ConnId -> ACommand 'Agent -> IO () notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, cmd) @@ -356,7 +359,7 @@ getNtfServerClient c@AgentClient {active, ntfClients} srv = do unlessM (readTVarIO active) . throwError $ INTERNAL "agent is stopped" atomically (getClientVar srv ntfClients) >>= either - (newProtocolClient c srv ntfClients connectClient $ pure ()) + (newProtocolClient c srv ntfClients connectClient $ \_ _ -> pure ()) (waitForProtocolClient c srv) where connectClient :: m NtfClient @@ -396,7 +399,7 @@ newProtocolClient :: ProtoServer msg -> TMap (ProtoServer msg) (ClientVar msg) -> m (ProtocolClient msg) -> - m () -> + (AgentClient -> ProtoServer msg -> m ()) -> ClientVar msg -> m (ProtocolClient msg) newProtocolClient c srv clients connectClient reconnectClient clientVar = tryConnectClient pure tryConnectAsync @@ -425,7 +428,7 @@ newProtocolClient c srv clients connectClient reconnectClient clientVar = tryCon connectAsync :: m () connectAsync = do ri <- asks $ reconnectInterval . config - withRetryInterval ri $ \loop -> void $ tryConnectClient (const reconnectClient) loop + withRetryInterval ri $ \loop -> void $ tryConnectClient (const $ reconnectClient c srv) loop hostEvent :: forall msg. ProtocolTypeI (ProtoType msg) => (AProtocolType -> TransportHost -> ACommand 'Agent) -> ProtocolClient msg -> ACommand 'Agent hostEvent event client = event (AProtocolType $ protocolTypeI @(ProtoType msg)) $ transportHost' client @@ -609,9 +612,11 @@ subscribeQueue c rq@RcvQueue {connId, server, rcvPrivateKey, rcvId} = do atomically $ do modifyTVar' (subscrConns c) $ S.insert connId RQ.addQueue rq $ pendingSubs c - withLogClient c server rcvId "SUB" $ \smp -> + r <- withLogClient c server rcvId "SUB" $ \smp -> liftIO (runExceptT (subscribeSMPQueue smp rcvPrivateKey rcvId) >>= processSubResult c rq) - >>= either throwError pure + case r of + Left e -> reconnectServer c server >> throwError (protocolClientError SMP (B.unpack $ strEncode server) e) + _ -> pure () processSubResult :: AgentClient -> RcvQueue -> Either ProtocolClientError () -> IO (Either ProtocolClientError ()) processSubResult c rq r = do @@ -646,7 +651,7 @@ subscribeQueues c srv qs = do forM_ qs_ $ \rq@RcvQueue {connId} -> atomically $ do modifyTVar' (subscrConns c) $ S.insert connId RQ.addQueue rq $ pendingSubs c - case L.nonEmpty qs_ of + r <- case L.nonEmpty qs_ of Just qs' -> do smp_ <- tryError (getSMPServerClient c srv) (eitherToMaybe smp_,) . (errs <>) <$> case smp_ of @@ -661,6 +666,8 @@ subscribeQueues c srv qs = do mapM_ (uncurry $ processSubResult c) rs pure $ map (second . first $ protocolClientError SMP $ clientServer smp) rs _ -> pure (Nothing, errs) + when (any temporaryOrHostError . lefts . map snd $ snd r) $ reconnectServer c srv + pure r where checkQueue rq@RcvQueue {rcvId, server} | server == srv = do diff --git a/src/Simplex/Messaging/Agent/TRcvQueues.hs b/src/Simplex/Messaging/Agent/TRcvQueues.hs index 9ace32db7..bed4138a2 100644 --- a/src/Simplex/Messaging/Agent/TRcvQueues.hs +++ b/src/Simplex/Messaging/Agent/TRcvQueues.hs @@ -40,9 +40,9 @@ getSrvQueues srv (TRcvQueues qs) = M.foldl' addQ [] <$> readTVar qs where addQ qs' rq@RcvQueue {server} = if srv == server then rq : qs' else qs' -getDelSrvQueues :: SMPServer -> TRcvQueues -> STM ([RcvQueue], Set ConnId) -getDelSrvQueues srv (TRcvQueues qs) = stateTVar qs $ M.foldl' addQ (([], S.empty), M.empty) +getDelSrvQueues :: SMPServer -> TRcvQueues -> STM [RcvQueue] +getDelSrvQueues srv (TRcvQueues qs) = stateTVar qs $ M.foldl' addQ ([], M.empty) where - addQ (removed@(remQs, remConns), qs') rq@RcvQueue {connId, server, rcvId} - | srv == server = ((rq : remQs, S.insert connId remConns), qs') + addQ (removed, qs') rq@RcvQueue {server, rcvId} + | srv == server = (rq : removed, qs') | otherwise = (removed, M.insert (server, rcvId) rq qs') From 14cb88e725645f4f32cea09e3efdb69e3cbee11f Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Wed, 18 Jan 2023 18:22:17 +0000 Subject: [PATCH 4/4] remove completed async action handles from memory (#614) * remove completed async action handles from memory * name Co-authored-by: JRoberts <8711996+jr-simplex@users.noreply.github.com> Co-authored-by: JRoberts <8711996+jr-simplex@users.noreply.github.com> --- simplexmq.cabal | 1 + src/Simplex/Messaging/Agent/Client.hs | 30 ++++++++++++-------------- src/Simplex/Messaging/Agent/TAsyncs.hs | 25 +++++++++++++++++++++ 3 files changed, 40 insertions(+), 16 deletions(-) create mode 100644 src/Simplex/Messaging/Agent/TAsyncs.hs diff --git a/simplexmq.cabal b/simplexmq.cabal index 32fc2decf..f49371b96 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -55,6 +55,7 @@ library Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220817_connection_ntfs Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220905_commands Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220915_connection_queues + Simplex.Messaging.Agent.TAsyncs Simplex.Messaging.Agent.TRcvQueues Simplex.Messaging.Client Simplex.Messaging.Client.Agent diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 53c9d4304..e4ddda84f 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -117,6 +117,7 @@ import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.RetryInterval import Simplex.Messaging.Agent.Store import Simplex.Messaging.Agent.Store.SQLite (SQLiteStore (..), withTransaction) +import Simplex.Messaging.Agent.TAsyncs import Simplex.Messaging.Agent.TRcvQueues (TRcvQueues) import qualified Simplex.Messaging.Agent.TRcvQueues as RQ import Simplex.Messaging.Client @@ -156,7 +157,6 @@ import Simplex.Messaging.Transport.Client (TransportHost) import Simplex.Messaging.Util import Simplex.Messaging.Version import System.Timeout (timeout) -import UnliftIO (async) import qualified UnliftIO.Exception as E import UnliftIO.STM @@ -196,8 +196,8 @@ data AgentClient = AgentClient connLocks :: TMap ConnId Lock, -- locks to prevent concurrent reconnections to SMP servers reconnectLocks :: TMap SMPServer Lock, - reconnections :: TVar [Async ()], - asyncClients :: TVar [Async ()], + reconnections :: TAsyncs, + asyncClients :: TAsyncs, agentStats :: TMap AgentStatsKey (TVar Int), clientId :: Int, agentEnv :: Env @@ -260,8 +260,8 @@ newAgentClient InitialAgentServers {smp, ntf, netCfg} agentEnv = do getMsgLocks <- TM.empty connLocks <- TM.empty reconnectLocks <- TM.empty - reconnections <- newTVar [] - asyncClients <- newTVar [] + reconnections <- newTAsyncs + asyncClients <- newTAsyncs agentStats <- TM.empty clientId <- stateTVar (clientCounter agentEnv) $ \i -> let i' = i + 1 in (i', i') return AgentClient {active, rcvQ, subQ, msgQ, smpServers, smpClients, ntfServers, ntfClients, useNetworkConfig, subscrConns, activeSubs, pendingSubs, pendingMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, connCmdsQueued, asyncCmdQueues, asyncCmdProcesses, ntfNetworkOp, rcvNetworkOp, msgDeliveryOp, sndNetworkOp, databaseOp, agentState, getMsgLocks, connLocks, reconnectLocks, reconnections, asyncClients, agentStats, clientId, agentEnv} @@ -322,14 +322,13 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} srv = do notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, cmd) reconnectServer :: AgentMonad m => AgentClient -> SMPServer -> m () -reconnectServer c srv = do - a <- async tryReconnectSMPClient - atomically $ modifyTVar' (reconnections c) (a :) +reconnectServer c srv = newAsyncAction tryReconnectSMPClient $ reconnections c where - tryReconnectSMPClient = do + tryReconnectSMPClient aId = do ri <- asks $ reconnectInterval . config withRetryInterval ri $ \loop -> reconnectSMPClient c srv `catchError` const loop + atomically . removeAsyncAction aId $ reconnections c reconnectSMPClient :: forall m. AgentMonad m => AgentClient -> SMPServer -> m () reconnectSMPClient c srv = @@ -422,13 +421,12 @@ newProtocolClient c srv clients connectClient reconnectClient clientVar = tryCon TM.delete srv clients throwError e tryConnectAsync :: m () - tryConnectAsync = do - a <- async connectAsync - atomically $ modifyTVar' (asyncClients c) (a :) - connectAsync :: m () - connectAsync = do + tryConnectAsync = newAsyncAction connectAsync $ asyncClients c + connectAsync :: Int -> m () + connectAsync aId = do ri <- asks $ reconnectInterval . config withRetryInterval ri $ \loop -> void $ tryConnectClient (const $ reconnectClient c srv) loop + atomically . removeAsyncAction aId $ asyncClients c hostEvent :: forall msg. ProtocolTypeI (ProtoType msg) => (AProtocolType -> TransportHost -> ACommand 'Agent) -> ProtocolClient msg -> ACommand 'Agent hostEvent event client = event (AProtocolType $ protocolTypeI @(ProtoType msg)) $ transportHost' client @@ -444,8 +442,8 @@ closeAgentClient c = liftIO $ do atomically $ writeTVar (active c) False closeProtocolServerClients c smpClients closeProtocolServerClients c ntfClients - cancelActions $ reconnections c - cancelActions $ asyncClients c + cancelActions . actions $ reconnections c + cancelActions . actions $ asyncClients c cancelActions $ smpQueueMsgDeliveries c cancelActions $ asyncCmdProcesses c atomically . RQ.clear $ activeSubs c diff --git a/src/Simplex/Messaging/Agent/TAsyncs.hs b/src/Simplex/Messaging/Agent/TAsyncs.hs new file mode 100644 index 000000000..80fc41840 --- /dev/null +++ b/src/Simplex/Messaging/Agent/TAsyncs.hs @@ -0,0 +1,25 @@ +module Simplex.Messaging.Agent.TAsyncs where + +import Control.Concurrent.STM (stateTVar) +import Control.Monad.IO.Unlift (MonadUnliftIO) +import Simplex.Messaging.TMap (TMap) +import qualified Simplex.Messaging.TMap as TM +import UnliftIO.Async (Async, async) +import UnliftIO.STM + +data TAsyncs = TAsyncs + { actionId :: TVar Int, + actions :: TMap Int (Async ()) + } + +newTAsyncs :: STM TAsyncs +newTAsyncs = TAsyncs <$> newTVar 0 <*> TM.empty + +newAsyncAction :: MonadUnliftIO m => (Int -> m ()) -> TAsyncs -> m () +newAsyncAction action as = do + aId <- atomically $ stateTVar (actionId as) $ \i -> (i + 1, i + 1) + a <- async $ action aId + atomically $ TM.insert aId a $ actions as + +removeAsyncAction :: Int -> TAsyncs -> STM () +removeAsyncAction aId = TM.delete aId . actions