Merge branch 'master' into f/notifications

This commit is contained in:
Evgeny Poberezkin
2022-06-08 09:49:58 +01:00
5 changed files with 9 additions and 6 deletions

View File

@@ -1,5 +1,5 @@
name: simplexmq
version: 2.2.0
version: 2.2.1
synopsis: SimpleXMQ message broker
description: |
This package includes <./docs/Simplex-Messaging-Server.html server>,

View File

@@ -5,7 +5,7 @@ cabal-version: 1.12
-- see: https://github.com/sol/hpack
name: simplexmq
version: 2.2.0
version: 2.2.1
synopsis: SimpleXMQ message broker
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
<./docs/Simplex-Messaging-Client.html client> and

View File

@@ -65,6 +65,7 @@ import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport
import Simplex.Messaging.Transport.Server
import Simplex.Messaging.Util
import System.Mem.Weak (deRefWeak)
import UnliftIO.Concurrent
import UnliftIO.Exception
import UnliftIO.IO
@@ -196,6 +197,7 @@ clientDisconnected c@Client {subscriptions, connected} = do
atomically $ writeTVar connected False
subs <- readTVarIO subscriptions
mapM_ cancelSub subs
atomically $ writeTVar subscriptions M.empty
cs <- asks $ subscribers . server
atomically . mapM_ (\rId -> TM.update deleteCurrentClient rId cs) $ M.keys subs
where
@@ -210,7 +212,7 @@ sameClientSession Client {sessionId} Client {sessionId = s'} = sessionId == s'
cancelSub :: MonadUnliftIO m => TVar Sub -> m ()
cancelSub sub =
readTVarIO sub >>= \case
Sub {subThread = SubThread t} -> killThread t
Sub {subThread = SubThread t} -> liftIO $ deRefWeak t >>= mapM_ killThread
_ -> return ()
receive :: (Transport c, MonadUnliftIO m, MonadReader Env m) => THandle c -> Client -> m ()
@@ -536,7 +538,7 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri
forkSub :: m ()
forkSub = do
atomically . modifyTVar sub $ \s -> s {subThread = SubPending}
t <- forkIO subscriber
t <- mkWeakThreadId =<< forkIO subscriber
atomically . modifyTVar sub $ \case
s@Sub {subThread = SubPending} -> s {subThread = SubThread t}
s -> s

View File

@@ -32,6 +32,7 @@ import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (ATransport)
import Simplex.Messaging.Transport.Server (loadFingerprint, loadTLSServerParams)
import System.IO (IOMode (..))
import System.Mem.Weak (Weak)
import UnliftIO.STM
data ServerConfig = ServerConfig
@@ -113,7 +114,7 @@ data ServerStats = ServerStats
fromTime :: TVar UTCTime
}
data SubscriptionThread = NoSub | SubPending | SubThread ThreadId | ProhibitSub
data SubscriptionThread = NoSub | SubPending | SubThread (Weak ThreadId) | ProhibitSub
data Sub = Sub
{ subThread :: SubscriptionThread,

View File

@@ -96,7 +96,7 @@ supportedSMPVersions :: VersionRange
supportedSMPVersions = mkVersionRange 1 2
simplexMQVersion :: String
simplexMQVersion = "2.2.0"
simplexMQVersion = "2.2.1"
-- * Transport connection class