From 628930df1fa1c3fff6fd1413e7b437148c4a83b5 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Sat, 4 Jun 2022 13:08:05 +0100 Subject: [PATCH 1/3] support stopping and resuming agent (#385) * export agentDbPath * support fully closing and resuming agent * whitespace * clean up --- src/Simplex/Messaging/Agent.hs | 6 ++- src/Simplex/Messaging/Agent/Client.hs | 41 +++++++++++++------ .../Notifications/Server/Push/testpush.sh | 3 +- 3 files changed, 35 insertions(+), 15 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index ee67611a7..b8384fa27 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -35,7 +35,8 @@ module Simplex.Messaging.Agent AgentMonad, AgentErrorMonad, getSMPAgentClient, - disconnectAgentClient, -- used in tests + disconnectAgentClient, + resumeAgentClient, withAgentLock, createConnection, joinConnection, @@ -113,6 +114,9 @@ getSMPAgentClient cfg initServers = newSMPAgentEnv cfg >>= runReaderT runAgent disconnectAgentClient :: MonadUnliftIO m => AgentClient -> m () disconnectAgentClient c = closeAgentClient c >> logConnection c False +resumeAgentClient :: MonadIO m => AgentClient -> m () +resumeAgentClient c = atomically $ writeTVar (active c) True + -- | type AgentErrorMonad m = (MonadUnliftIO m, MonadError AgentErrorType m) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 2d842e3c9..e6e3c0c47 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -39,6 +39,7 @@ module Simplex.Messaging.Agent.Client logServer, removeSubscription, hasActiveSubscription, + agentDbPath, ) where @@ -63,6 +64,7 @@ import Simplex.Messaging.Agent.Env.SQLite import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.RetryInterval import Simplex.Messaging.Agent.Store +import Simplex.Messaging.Agent.Store.SQLite (SQLiteStore (..)) import Simplex.Messaging.Client import Simplex.Messaging.Client.Agent () import qualified Simplex.Messaging.Crypto as C @@ -132,6 +134,9 @@ newAgentClient InitialAgentServers {smp, ntf} agentEnv = do lock <- newTMVar () return AgentClient {active, rcvQ, subQ, msgQ, smpServers, ntfServers, smpClients, ntfClients, subscrSrvrs, pendingSubscrSrvrs, subscrConns, connMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, reconnections, asyncClients, clientId, agentEnv, smpSubscriber = undefined, lock} +agentDbPath :: AgentClient -> FilePath +agentDbPath AgentClient {agentEnv = Env {store = SQLiteStore {dbFilePath}}} = dbFilePath + -- | Agent monad with MonadReader Env and MonadError AgentErrorType type AgentMonad m = (MonadUnliftIO m, MonadReader Env m, MonadError AgentErrorType m) @@ -184,10 +189,11 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} srv = do _ -> TM.insert srv cVar ps serverDown :: UnliftIO m -> Map ConnId RcvQueue -> IO () - serverDown u cs = unless (M.null cs) $ do - let conns = M.keys cs - unless (null conns) . notifySub "" $ DOWN srv conns - whenM (readTVarIO active) $ unliftIO u reconnectServer + serverDown u cs = unless (M.null cs) $ + whenM (readTVarIO active) $ do + let conns = M.keys cs + unless (null conns) . notifySub "" $ DOWN srv conns + unliftIO u reconnectServer reconnectServer :: m () reconnectServer = do @@ -304,21 +310,30 @@ newProtocolClient c srv clients connectClient reconnectClient clientVar = tryCon closeAgentClient :: MonadIO m => AgentClient -> m () closeAgentClient c = liftIO $ do atomically $ writeTVar (active c) False - closeSMPServerClients c + closeProtocolServerClients (clientTimeout smpCfg) $ smpClients c + closeProtocolServerClients (clientTimeout ntfCfg) $ ntfClients c cancelActions $ reconnections c cancelActions $ asyncClients c cancelActions $ smpQueueMsgDeliveries c - -closeSMPServerClients :: AgentClient -> IO () -closeSMPServerClients c = readTVarIO (smpClients c) >>= mapM_ (forkIO . closeClient) + clear subscrSrvrs + clear pendingSubscrSrvrs + clear subscrConns + clear connMsgsQueued + clear smpQueueMsgQueues where - closeClient smpVar = - atomically (readTMVar smpVar) >>= \case - Right smp -> closeProtocolClient smp `catchAll_` pure () + clientTimeout sel = tcpTimeout . sel . config $ agentEnv c + clear sel = atomically $ writeTVar (sel c) M.empty + +closeProtocolServerClients :: Int -> TMap ProtocolServer (ClientVar msg) -> IO () +closeProtocolServerClients tcpTimeout cs = readTVarIO cs >>= mapM_ (forkIO . closeClient) >> atomically (writeTVar cs M.empty) + where + closeClient cVar = + tcpTimeout `timeout` atomically (readTMVar cVar) >>= \case + Just (Right client) -> closeProtocolClient client `catchAll_` pure () _ -> pure () -cancelActions :: Foldable f => TVar (f (Async ())) -> IO () -cancelActions as = readTVarIO as >>= mapM_ uninterruptibleCancel +cancelActions :: (Foldable f, Monoid (f (Async ()))) => TVar (f (Async ())) -> IO () +cancelActions as = readTVarIO as >>= mapM_ uninterruptibleCancel >> atomically (writeTVar as mempty) withAgentLock :: MonadUnliftIO m => AgentClient -> m a -> m a withAgentLock AgentClient {lock} = diff --git a/src/Simplex/Messaging/Notifications/Server/Push/testpush.sh b/src/Simplex/Messaging/Notifications/Server/Push/testpush.sh index b41135d83..bac5d8684 100755 --- a/src/Simplex/Messaging/Notifications/Server/Push/testpush.sh +++ b/src/Simplex/Messaging/Notifications/Server/Push/testpush.sh @@ -22,4 +22,5 @@ export AUTHENTICATION_TOKEN="${JWT_HEADER}.${JWT_CLAIMS}.${JWT_SIGNED_HEADER_CLA # curl -v --header "apns-topic: $TOPIC" --header "apns-push-type: background" --header "apns-priority: 5" --header "authorization: bearer $AUTHENTICATION_TOKEN" --data '{"aps":{"content-available":1}}' --http2 https://${APNS_HOST_NAME}/3/device/${DEVICE_TOKEN} # mutable-content notification -curl -v --header "apns-topic: $TOPIC" --header "apns-push-type: alert" --header "authorization: bearer $AUTHENTICATION_TOKEN" --data '{"aps":{"category": "NTF_CAT_CHECK_MESSAGE__SECRET", "mutable-content": 1, "alert":"received encrypted message"}, "data": {"test":"123"}}' --http2 https://${APNS_HOST_NAME}/3/device/${DEVICE_TOKEN} \ No newline at end of file +# NTF_CAT_CHECK_MESSAGE category will not show alert if the app is in foreground +curl -v --header "apns-topic: $TOPIC" --header "apns-push-type: alert" --header "authorization: bearer $AUTHENTICATION_TOKEN" --data '{"aps":{"category": "NTF_CAT_CHECK_MESSAGE__SECRET", "mutable-content": 1, "alert":"received encrypted message"}, "data": {"test":"123"}}' --http2 https://${APNS_HOST_NAME}/3/device/${DEVICE_TOKEN} From 3f69636f1a5f48292b6155195eb6840789364f86 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Tue, 7 Jun 2022 11:52:32 +0100 Subject: [PATCH 2/3] fix sockets/threads/memory leak (#388) * fix sockets/threads/memory leak * refactor --- src/Simplex/Messaging/Transport/Server.hs | 40 ++++++++++------------- 1 file changed, 17 insertions(+), 23 deletions(-) diff --git a/src/Simplex/Messaging/Transport/Server.hs b/src/Simplex/Messaging/Transport/Server.hs index c2e12aff0..3c1604a2f 100644 --- a/src/Simplex/Messaging/Transport/Server.hs +++ b/src/Simplex/Messaging/Transport/Server.hs @@ -12,20 +12,22 @@ module Simplex.Messaging.Transport.Server ) where +import Control.Concurrent.STM (stateTVar) import Control.Monad.Except import Control.Monad.IO.Unlift import qualified Crypto.Store.X509 as SX import Data.Default (def) -import Data.Set (Set) -import qualified Data.Set as S import qualified Data.X509 as X import Data.X509.Validation (Fingerprint (..)) import qualified Data.X509.Validation as XV import Network.Socket import qualified Network.TLS as T +import Simplex.Messaging.TMap (TMap) +import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport import Simplex.Messaging.Util (catchAll_) import System.Exit (exitFailure) +import System.Mem.Weak (Weak, deRefWeak) import UnliftIO.Concurrent import qualified UnliftIO.Exception as E import UnliftIO.STM @@ -36,37 +38,29 @@ import UnliftIO.STM runTransportServer :: forall c m. (Transport c, MonadUnliftIO m) => TMVar Bool -> ServiceName -> T.ServerParams -> (c -> m ()) -> m () runTransportServer started port serverParams server = do u <- askUnliftIO - liftIO $ do - clients <- newTVarIO S.empty + liftIO . runTCPServer started port $ \conn -> E.bracket - (startTCPServer started port) - (closeServer started clients) - $ \sock -> forever . E.bracketOnError (accept sock) (close . fst) $ \(conn, _peer) -> do - -- catchAll_ is needed here in case the connection was closed earlier - tid <- forkFinally (connectClient u conn) (const . liftIO $ gracefulClose conn 5000 `catchAll_` pure ()) - atomically . modifyTVar' clients $ S.insert tid - where - connectClient :: UnliftIO m -> Socket -> IO () - connectClient u conn = - E.bracket - (connectTLS serverParams conn >>= getServerConnection) - closeConnection - (unliftIO u . server) + (connectTLS serverParams conn >>= getServerConnection) + closeConnection + (unliftIO u . server) --- | Run TCP server without TLS - only used in SimpleX Chat +-- | Run TCP server without TLS runTCPServer :: TMVar Bool -> ServiceName -> (Socket -> IO ()) -> IO () runTCPServer started port server = do - clients <- newTVarIO S.empty + clients <- atomically TM.empty + clientId <- newTVarIO 0 E.bracket (startTCPServer started port) (closeServer started clients) $ \sock -> forever . E.bracketOnError (accept sock) (close . fst) $ \(conn, _peer) -> do - tid <- forkFinally (server conn) (const $ gracefulClose conn 5000) - atomically . modifyTVar' clients $ S.insert tid + -- catchAll_ is needed here in case the connection was closed earlier + cId <- atomically $ stateTVar clientId $ \cId -> (cId + 1, cId + 1) + tId <- mkWeakThreadId =<< forkFinally (server conn) (const $ gracefulClose conn 5000 `catchAll_` atomically (TM.delete cId clients)) + atomically $ TM.insert cId tId clients -closeServer :: TMVar Bool -> TVar (Set ThreadId) -> Socket -> IO () +closeServer :: TMVar Bool -> TMap Int (Weak ThreadId) -> Socket -> IO () closeServer started clients sock = do - readTVarIO clients >>= mapM_ killThread + readTVarIO clients >>= mapM_ (deRefWeak >=> mapM_ killThread) close sock void . atomically $ tryPutTMVar started False From a3d1f5540d55513fd00a8d7f4a43c0ac10a1145e Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Tue, 7 Jun 2022 11:55:28 +0100 Subject: [PATCH 3/3] v2.2.0 --- CHANGELOG.md | 10 ++++++++++ package.yaml | 2 +- simplexmq.cabal | 2 +- src/Simplex/Messaging/Transport.hs | 2 +- 4 files changed, 13 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c7af5a10a..048edcb25 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ +# 2.2.0 + +SMP server: + +- Fix sockets/threads/memory leak + +SMP agent: + +- Support stopping and resuming agent with `disconnectAgentClient` / `resumeAgentClient` + # 2.1.1 SMP server: diff --git a/package.yaml b/package.yaml index 177d4ad86..4f05628ea 100644 --- a/package.yaml +++ b/package.yaml @@ -1,5 +1,5 @@ name: simplexmq -version: 2.1.1 +version: 2.2.0 synopsis: SimpleXMQ message broker description: | This package includes <./docs/Simplex-Messaging-Server.html server>, diff --git a/simplexmq.cabal b/simplexmq.cabal index 14abecc27..5c992e587 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -5,7 +5,7 @@ cabal-version: 1.12 -- see: https://github.com/sol/hpack name: simplexmq -version: 2.1.1 +version: 2.2.0 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and diff --git a/src/Simplex/Messaging/Transport.hs b/src/Simplex/Messaging/Transport.hs index 9f58e5441..5971c9314 100644 --- a/src/Simplex/Messaging/Transport.hs +++ b/src/Simplex/Messaging/Transport.hs @@ -96,7 +96,7 @@ supportedSMPVersions :: VersionRange supportedSMPVersions = mkVersionRange 1 1 simplexMQVersion :: String -simplexMQVersion = "2.1.1" +simplexMQVersion = "2.2.0" -- * Transport connection class