From 80309a0089811a2b1503c234f7960f0f8c6e2622 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Wed, 8 Jun 2022 08:59:12 +0100 Subject: [PATCH 1/2] fix possible leak (#391) * fix possible leak * remove subscriptions map from the client --- src/Simplex/Messaging/Server.hs | 6 ++++-- src/Simplex/Messaging/Server/Env/STM.hs | 3 ++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index aac1db5ee..2b2dbaf7f 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -65,6 +65,7 @@ import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport import Simplex.Messaging.Transport.Server import Simplex.Messaging.Util +import System.Mem.Weak (deRefWeak) import UnliftIO.Concurrent import UnliftIO.Exception import UnliftIO.IO @@ -196,6 +197,7 @@ clientDisconnected c@Client {subscriptions, connected} = do atomically $ writeTVar connected False subs <- readTVarIO subscriptions mapM_ cancelSub subs + atomically $ writeTVar subscriptions M.empty cs <- asks $ subscribers . server atomically . mapM_ (\rId -> TM.update deleteCurrentClient rId cs) $ M.keys subs where @@ -209,7 +211,7 @@ sameClientSession Client {sessionId} Client {sessionId = s'} = sessionId == s' cancelSub :: MonadUnliftIO m => Sub -> m () cancelSub = \case - Sub {subThread = SubThread t} -> killThread t + Sub {subThread = SubThread t} -> liftIO $ deRefWeak t >>= mapM_ killThread _ -> return () receive :: (Transport c, MonadUnliftIO m, MonadReader Env m) => THandle c -> Client -> m () @@ -480,7 +482,7 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri forkSub :: MsgQueue -> m () forkSub q = do atomically . setSub $ \s -> s {subThread = SubPending} - t <- forkIO $ subscriber q + t <- mkWeakThreadId =<< forkIO (subscriber q) atomically . setSub $ \case s@Sub {subThread = SubPending} -> s {subThread = SubThread t} s -> s diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index d70fe203f..3f83ebfff 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -32,6 +32,7 @@ import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (ATransport) import Simplex.Messaging.Transport.Server (loadFingerprint, loadTLSServerParams) import System.IO (IOMode (..)) +import System.Mem.Weak (Weak) import UnliftIO.STM data ServerConfig = ServerConfig @@ -113,7 +114,7 @@ data ServerStats = ServerStats fromTime :: TVar UTCTime } -data SubscriptionThread = NoSub | SubPending | SubThread ThreadId +data SubscriptionThread = NoSub | SubPending | SubThread (Weak ThreadId) data Sub = Sub { subThread :: SubscriptionThread, From 7736ef857673a2a294ac5c765cbc2762376c0a59 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Wed, 8 Jun 2022 09:08:34 +0100 Subject: [PATCH 2/2] v2.2.1 --- package.yaml | 2 +- simplexmq.cabal | 2 +- src/Simplex/Messaging/Transport.hs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/package.yaml b/package.yaml index 4f05628ea..897122846 100644 --- a/package.yaml +++ b/package.yaml @@ -1,5 +1,5 @@ name: simplexmq -version: 2.2.0 +version: 2.2.1 synopsis: SimpleXMQ message broker description: | This package includes <./docs/Simplex-Messaging-Server.html server>, diff --git a/simplexmq.cabal b/simplexmq.cabal index 5c992e587..dcc9597d8 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -5,7 +5,7 @@ cabal-version: 1.12 -- see: https://github.com/sol/hpack name: simplexmq -version: 2.2.0 +version: 2.2.1 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and diff --git a/src/Simplex/Messaging/Transport.hs b/src/Simplex/Messaging/Transport.hs index 5971c9314..6d697b07f 100644 --- a/src/Simplex/Messaging/Transport.hs +++ b/src/Simplex/Messaging/Transport.hs @@ -96,7 +96,7 @@ supportedSMPVersions :: VersionRange supportedSMPVersions = mkVersionRange 1 1 simplexMQVersion :: String -simplexMQVersion = "2.2.0" +simplexMQVersion = "2.2.1" -- * Transport connection class