diff --git a/package.yaml b/package.yaml index 4f05628ea..897122846 100644 --- a/package.yaml +++ b/package.yaml @@ -1,5 +1,5 @@ name: simplexmq -version: 2.2.0 +version: 2.2.1 synopsis: SimpleXMQ message broker description: | This package includes <./docs/Simplex-Messaging-Server.html server>, diff --git a/simplexmq.cabal b/simplexmq.cabal index 99cc897f6..7a4cb46de 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -5,7 +5,7 @@ cabal-version: 1.12 -- see: https://github.com/sol/hpack name: simplexmq -version: 2.2.0 +version: 2.2.1 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 5b0ee4f5a..b43cad9cb 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -65,6 +65,7 @@ import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport import Simplex.Messaging.Transport.Server import Simplex.Messaging.Util +import System.Mem.Weak (deRefWeak) import UnliftIO.Concurrent import UnliftIO.Exception import UnliftIO.IO @@ -196,6 +197,7 @@ clientDisconnected c@Client {subscriptions, connected} = do atomically $ writeTVar connected False subs <- readTVarIO subscriptions mapM_ cancelSub subs + atomically $ writeTVar subscriptions M.empty cs <- asks $ subscribers . server atomically . mapM_ (\rId -> TM.update deleteCurrentClient rId cs) $ M.keys subs where @@ -210,7 +212,7 @@ sameClientSession Client {sessionId} Client {sessionId = s'} = sessionId == s' cancelSub :: MonadUnliftIO m => TVar Sub -> m () cancelSub sub = readTVarIO sub >>= \case - Sub {subThread = SubThread t} -> killThread t + Sub {subThread = SubThread t} -> liftIO $ deRefWeak t >>= mapM_ killThread _ -> return () receive :: (Transport c, MonadUnliftIO m, MonadReader Env m) => THandle c -> Client -> m () @@ -536,7 +538,7 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri forkSub :: m () forkSub = do atomically . modifyTVar sub $ \s -> s {subThread = SubPending} - t <- forkIO subscriber + t <- mkWeakThreadId =<< forkIO subscriber atomically . modifyTVar sub $ \case s@Sub {subThread = SubPending} -> s {subThread = SubThread t} s -> s diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 35f0dc051..1440486a3 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -32,6 +32,7 @@ import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (ATransport) import Simplex.Messaging.Transport.Server (loadFingerprint, loadTLSServerParams) import System.IO (IOMode (..)) +import System.Mem.Weak (Weak) import UnliftIO.STM data ServerConfig = ServerConfig @@ -113,7 +114,7 @@ data ServerStats = ServerStats fromTime :: TVar UTCTime } -data SubscriptionThread = NoSub | SubPending | SubThread ThreadId | ProhibitSub +data SubscriptionThread = NoSub | SubPending | SubThread (Weak ThreadId) | ProhibitSub data Sub = Sub { subThread :: SubscriptionThread, diff --git a/src/Simplex/Messaging/Transport.hs b/src/Simplex/Messaging/Transport.hs index 221db1b65..680cc36b4 100644 --- a/src/Simplex/Messaging/Transport.hs +++ b/src/Simplex/Messaging/Transport.hs @@ -96,7 +96,7 @@ supportedSMPVersions :: VersionRange supportedSMPVersions = mkVersionRange 1 2 simplexMQVersion :: String -simplexMQVersion = "2.2.0" +simplexMQVersion = "2.2.1" -- * Transport connection class