From 5f7fe8b0dcbaf89b554659a1b645586547f33aff Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Fri, 17 Dec 2021 12:28:48 +0000 Subject: [PATCH] remove client from servers subscribers map after client disconnection (#228) Co-authored-by: Efim Poberezkin <8711996+efim-poberezkin@users.noreply.github.com> --- .github/workflows/build.yml | 1 + README.md | 6 ++++ apps/smp-server/Main.hs | 1 + src/Simplex/Messaging/Server.hs | 47 ++++++++++++++++++------- src/Simplex/Messaging/Server/Env/STM.hs | 22 ++++++++---- tests/SMPClient.hs | 1 + tests/ServerTests.hs | 19 +++++++++- 7 files changed, 76 insertions(+), 21 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 8df8e0c7d..e82205d62 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -4,6 +4,7 @@ on: push: branches: - master + - stable tags: - "v*" pull_request: diff --git a/README.md b/README.md index d91ad3413..4d722676a 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,12 @@ [![GitHub build](https://github.com/simplex-chat/simplexmq/workflows/build/badge.svg)](https://github.com/simplex-chat/simplexmq/actions?query=workflow%3Abuild) [![GitHub release](https://img.shields.io/github/v/release/simplex-chat/simplexmq)](https://github.com/simplex-chat/simplexmq/releases) +📢 **v0.5.1 brings a hotfix to the server's subscription management logic, to apply it log in to your server via SSH and run the following command. If you have store log enabled for your server, information about already established queues will be preserved.** If you're doing a custom installation instead of Linode or DigitalOcean you may have to change the path for binary download. + +```sh +systemctl stop smp-server && curl -L -o /opt/simplex/bin/smp-server https://github.com/simplex-chat/simplexmq/releases/download/v0.5.1/smp-server-ubuntu-20_04-x86-64 && chmod +x /opt/simplex/bin/smp-server && systemctl start smp-server +``` + ## Message broker for unidirectional (simplex) queues SimpleXMQ is a message broker for managing message queues and sending messages over public network. It consists of SMP server, SMP client library and SMP agent that implement [SMP protocol](https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md) for client-server communication and [SMP agent protocol](https://github.com/simplex-chat/simplexmq/blob/master/protocol/agent-protocol.md) to manage duplex connections via simplex queues on multiple SMP servers. diff --git a/apps/smp-server/Main.hs b/apps/smp-server/Main.hs index b71bed5d4..0952ad656 100644 --- a/apps/smp-server/Main.hs +++ b/apps/smp-server/Main.hs @@ -41,6 +41,7 @@ serverConfig :: ServerConfig serverConfig = ServerConfig { tbqSize = 16, + serverTbqSize = 128, msgQueueQuota = 256, queueIdBytes = 12, msgIdBytes = 6, diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 8c475f9d1..5416e263e 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -79,13 +79,25 @@ runSMPServerBlocking started cfg@ServerConfig {transports} = do runServer (tcpPort, ATransport t) = runTransportServer started tcpPort (runClient t) serverThread :: MonadUnliftIO m' => Server -> m' () - serverThread Server {subscribedQ, subscribers} = forever . atomically $ do - (rId, clnt) <- readTBQueue subscribedQ - cs <- readTVar subscribers - case M.lookup rId cs of - Just Client {rcvQ} -> writeTBQueue rcvQ (CorrId B.empty, rId, Cmd SBroker END) - Nothing -> return () - writeTVar subscribers $ M.insert rId clnt cs + serverThread Server {subscribedQ, subscribers} = forever $ do + atomically updateSubscribers >>= \case + Just (rId, Client {rcvQ}) -> + void . forkIO . atomically $ + writeTBQueue rcvQ (CorrId "", rId, Cmd SBroker END) + _ -> pure () + where + updateSubscribers :: STM (Maybe (RecipientId, Client)) + updateSubscribers = do + (rId, c) <- readTBQueue subscribedQ + stateTVar subscribers (\cs -> (M.lookup rId cs, M.insert rId c cs)) >>= \case + Just c' -> clientToBeNotified rId c c' + _ -> pure Nothing + clientToBeNotified :: RecipientId -> Client -> Client -> STM (Maybe (RecipientId, Client)) + clientToBeNotified rId c c'@Client {connected} + | clientId c /= clientId c' = do + yes <- readTVar connected + pure $ if yes then Just (rId, c') else Nothing + | otherwise = pure Nothing runClient :: (Transport c, MonadUnliftIO m, MonadReader Env m) => TProxy c -> c -> m () runClient _ h = do @@ -98,14 +110,23 @@ runClient _ h = do runClientTransport :: (Transport c, MonadUnliftIO m, MonadReader Env m) => THandle c -> m () runClientTransport th = do q <- asks $ tbqSize . config - c <- atomically $ newClient q s <- asks server + c <- atomically $ newClient s q raceAny_ [send th c, client c s, receive th c] - `finally` cancelSubscribers c + `finally` clientDisconnected c -cancelSubscribers :: MonadUnliftIO m => Client -> m () -cancelSubscribers Client {subscriptions} = - readTVarIO subscriptions >>= mapM_ cancelSub +clientDisconnected :: (MonadUnliftIO m, MonadReader Env m) => Client -> m () +clientDisconnected c@Client {subscriptions, connected} = do + atomically $ writeTVar connected False + subs <- readTVarIO subscriptions + mapM_ cancelSub subs + cs <- asks $ subscribers . server + atomically . mapM_ (modifyTVar cs . M.update deleteCurrentClient) $ M.keys subs + where + deleteCurrentClient :: Client -> Maybe Client + deleteCurrentClient c' + | clientId c == clientId c' = Nothing + | otherwise = Just c' cancelSub :: MonadUnliftIO m => Sub -> m () cancelSub = \case @@ -326,7 +347,7 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} = subscriber :: MsgQueue -> m () subscriber q = atomically $ do msg <- peekMsg q - writeTBQueue sndQ $ mkResp (CorrId B.empty) rId (msgCmd msg) + writeTBQueue sndQ $ mkResp (CorrId "") rId (msgCmd msg) setSub (\s -> s {subThread = NoSub}) void setDelivered diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 5c397096b..787270d75 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -6,6 +6,7 @@ module Simplex.Messaging.Server.Env.STM where import Control.Concurrent (ThreadId) +import Control.Concurrent.STM (stateTVar) import Control.Monad.IO.Unlift import Crypto.Random import Data.Map.Strict (Map) @@ -25,6 +26,7 @@ import UnliftIO.STM data ServerConfig = ServerConfig { transports :: [(ServiceName, ATransport)], tbqSize :: Natural, + serverTbqSize :: Natural, msgQueueQuota :: Natural, queueIdBytes :: Int, msgIdBytes :: Int, @@ -46,13 +48,16 @@ data Env = Env data Server = Server { subscribedQ :: TBQueue (RecipientId, Client), - subscribers :: TVar (Map RecipientId Client) + subscribers :: TVar (Map RecipientId Client), + nextClientId :: TVar Natural } data Client = Client { subscriptions :: TVar (Map RecipientId Sub), rcvQ :: TBQueue Transmission, - sndQ :: TBQueue Transmission + sndQ :: TBQueue Transmission, + clientId :: Natural, + connected :: TVar Bool } data SubscriptionThread = NoSub | SubPending | SubThread ThreadId @@ -66,14 +71,17 @@ newServer :: Natural -> STM Server newServer qSize = do subscribedQ <- newTBQueue qSize subscribers <- newTVar M.empty - return Server {subscribedQ, subscribers} + nextClientId <- newTVar 0 + return Server {subscribedQ, subscribers, nextClientId} -newClient :: Natural -> STM Client -newClient qSize = do +newClient :: Server -> Natural -> STM Client +newClient Server {nextClientId} qSize = do subscriptions <- newTVar M.empty rcvQ <- newTBQueue qSize sndQ <- newTBQueue qSize - return Client {subscriptions, rcvQ, sndQ} + clientId <- stateTVar nextClientId $ \i -> (i, i + 1) + connected <- newTVar True + return Client {subscriptions, rcvQ, sndQ, clientId, connected} newSubscription :: STM Sub newSubscription = do @@ -82,7 +90,7 @@ newSubscription = do newEnv :: forall m. (MonadUnliftIO m, MonadRandom m) => ServerConfig -> m Env newEnv config = do - server <- atomically $ newServer (tbqSize config) + server <- atomically $ newServer (serverTbqSize config) queueStore <- atomically newQueueStore msgStore <- atomically newMsgStore idsDrg <- drgNew >>= newTVarIO diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 58a5d5163..e3110d19e 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -60,6 +60,7 @@ cfg = ServerConfig { transports = undefined, tbqSize = 1, + serverTbqSize = 1, msgQueueQuota = 4, queueIdBytes = 12, msgIdBytes = 6, diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index a3d93093b..fe6aedbfb 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -11,7 +11,7 @@ module ServerTests where import Control.Concurrent (ThreadId, killThread) import Control.Concurrent.STM import Control.Exception (SomeException, try) -import Control.Monad.Except (forM_, runExceptT) +import Control.Monad.Except (forM, forM_, runExceptT) import Data.ByteString.Base64 import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B @@ -34,6 +34,7 @@ serverTests t = do describe "SMP queues" do describe "NEW and KEY commands, SEND messages" $ testCreateSecure t describe "NEW, OFF and DEL commands, SEND messages" $ testCreateDelete t + describe "Stress test" $ stressTest t describe "SMP messages" do describe "duplex communication over 2 SMP connections" $ testDuplex t describe "switch subscription to another SMP queue" $ testSwitchSub t @@ -179,6 +180,22 @@ testCreateDelete (ATransport t) = Resp "cdab" _ err10 <- signSendRecv rh rKey ("cdab", rId, "SUB") (err10, ERR AUTH) #== "rejects SUB when deleted" +stressTest :: ATransport -> Spec +stressTest (ATransport t) = + it "should create many queues, disconnect and re-connect" $ + smpTest3 t $ \h1 h2 h3 -> do + (rPub, rKey) <- C.generateKeyPair rsaKeySize + rIds <- forM [1 .. 50 :: Int] . const $ do + Resp "" "" (IDS rId _) <- signSendRecv h1 rKey ("", "", "NEW " <> C.serializePubKey rPub) + pure rId + let subscribeQueues h = forM_ rIds $ \rId -> do + Resp "" rId' OK <- signSendRecv h rKey ("", rId, "SUB") + rId' `shouldBe` rId + closeConnection $ connection h1 + subscribeQueues h2 + closeConnection $ connection h2 + subscribeQueues h3 + testDuplex :: ATransport -> Spec testDuplex (ATransport t) = it "should create 2 simplex connections and exchange messages" $