diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 8df8e0c7d..e82205d62 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -4,6 +4,7 @@ on: push: branches: - master + - stable tags: - "v*" pull_request: diff --git a/CHANGELOG.md b/CHANGELOG.md index bbdcff942..29ae11e15 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.5.1 + +- Fix server subscription logic bug that was leading to memory leak / resource exhaustion in some edge cases. + # 0.5.0 - No changes in SMP server implementation - it is backwards compatible with v0.4.1 diff --git a/README.md b/README.md index 83a48466c..f865e2dd8 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,14 @@ [![GitHub build](https://github.com/simplex-chat/simplexmq/workflows/build/badge.svg)](https://github.com/simplex-chat/simplexmq/actions?query=workflow%3Abuild) [![GitHub release](https://img.shields.io/github/v/release/simplex-chat/simplexmq)](https://github.com/simplex-chat/simplexmq/releases) +📢 **v0.5.1 brings a hotfix to the server's subscription management logic, to apply it log in to your server via SSH and run the following command. If you have store log enabled for your server, information about already established queues will be preserved.** If you're doing a custom installation instead of Linode or DigitalOcean you may have to change the path for binary download. + +```sh +systemctl stop smp-server +curl -L -o /opt/simplex/bin/smp-server https://github.com/simplex-chat/simplexmq/releases/download/v0.5.1/smp-server-ubuntu-20_04-x86-64 +systemctl start smp-server +``` + ## Message broker for unidirectional (simplex) queues SimpleXMQ is a message broker for managing message queues and sending messages over public network. It consists of SMP server, SMP client library and SMP agent that implement [SMP protocol](https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md) for client-server communication and [SMP agent protocol](https://github.com/simplex-chat/simplexmq/blob/master/protocol/agent-protocol.md) to manage duplex connections via simplex queues on multiple SMP servers. diff --git a/apps/smp-server/Main.hs b/apps/smp-server/Main.hs index ecf7800ac..c52f925b4 100644 --- a/apps/smp-server/Main.hs +++ b/apps/smp-server/Main.hs @@ -42,6 +42,7 @@ serverConfig :: ServerConfig serverConfig = ServerConfig { tbqSize = 16, + serverTbqSize = 128, msgQueueQuota = 256, queueIdBytes = 24, msgIdBytes = 24, -- must be at least 24 bytes, it is used as 192-bit nonce for XSalsa20 diff --git a/package.yaml b/package.yaml index 06133b1a8..0da0db01f 100644 --- a/package.yaml +++ b/package.yaml @@ -1,5 +1,5 @@ name: simplexmq -version: 0.5.0 +version: 0.5.1 synopsis: SimpleXMQ message broker description: | This package includes <./docs/Simplex-Messaging-Server.html server>, diff --git a/simplexmq.cabal b/simplexmq.cabal index 7b179f44f..750dc50a4 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -5,7 +5,7 @@ cabal-version: 1.12 -- see: https://github.com/sol/hpack name: simplexmq -version: 0.5.0 +version: 0.5.1 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index d922a5f15..97e2adcb5 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -96,18 +96,26 @@ runSMPServerBlocking started cfg@ServerConfig {transports} = do (s -> m' ()) -> m' () serverThread s subQ subs clientSubs unsub = forever $ do - atomically updateSubscribers >>= mapM_ unsub + atomically updateSubscribers + >>= fmap join . mapM endPreviousSubscriptions + >>= mapM_ unsub where - updateSubscribers :: STM (Maybe s) + updateSubscribers :: STM (Maybe (QueueId, Client)) updateSubscribers = do (qId, clnt) <- readTBQueue $ subQ s - serverSubs <- readTVar $ subs s - writeTVar (subs s) $ M.insert qId clnt serverSubs - join <$> mapM (endPreviousSubscriptions qId) (M.lookup qId serverSubs) - endPreviousSubscriptions :: QueueId -> Client -> STM (Maybe s) - endPreviousSubscriptions qId c = do - writeTBQueue (sndQ c) (CorrId "", qId, END) - stateTVar (clientSubs c) $ \ss -> (M.lookup qId ss, M.delete qId ss) + let clientToBeNotified = \c' -> + if sameClientSession clnt c' + then pure Nothing + else do + yes <- readTVar $ connected c' + pure $ if yes then Just (qId, c') else Nothing + stateTVar (subs s) (\cs -> (M.lookup qId cs, M.insert qId clnt cs)) + >>= fmap join . mapM clientToBeNotified + endPreviousSubscriptions :: (QueueId, Client) -> m' (Maybe s) + endPreviousSubscriptions (qId, c) = do + void . forkIO . atomically $ + writeTBQueue (sndQ c) (CorrId "", qId, END) + atomically . stateTVar (clientSubs c) $ \ss -> (M.lookup qId ss, M.delete qId ss) runClient :: (Transport c, MonadUnliftIO m, MonadReader Env m) => TProxy c -> c -> m () runClient _ h = do @@ -123,11 +131,23 @@ runClientTransport th@THandle {sessionId} = do c <- atomically $ newClient q sessionId s <- asks server raceAny_ [send th c, client c s, receive th c] - `finally` cancelSubscribers c + `finally` clientDisconnected c -cancelSubscribers :: MonadUnliftIO m => Client -> m () -cancelSubscribers Client {subscriptions} = - readTVarIO subscriptions >>= mapM_ cancelSub +clientDisconnected :: (MonadUnliftIO m, MonadReader Env m) => Client -> m () +clientDisconnected c@Client {subscriptions, connected} = do + atomically $ writeTVar connected False + subs <- readTVarIO subscriptions + mapM_ cancelSub subs + cs <- asks $ subscribers . server + atomically . mapM_ (modifyTVar cs . M.update deleteCurrentClient) $ M.keys subs + where + deleteCurrentClient :: Client -> Maybe Client + deleteCurrentClient c' + | sameClientSession c c' = Nothing + | otherwise = Just c' + +sameClientSession :: Client -> Client -> Bool +sameClientSession Client {sessionId = s} Client {sessionId = s'} = False -- TODO replace with s == s' cancelSub :: MonadUnliftIO m => Sub -> m () cancelSub = \case diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 0783c88ca..a574eb781 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -25,11 +25,12 @@ import System.IO (IOMode (..)) import UnliftIO.STM data ServerConfig = ServerConfig - { tbqSize :: Natural, + { transports :: [(ServiceName, ATransport)], + tbqSize :: Natural, + serverTbqSize :: Natural, msgQueueQuota :: Natural, queueIdBytes :: Int, msgIdBytes :: Int, - transports :: [(ServiceName, ATransport)], storeLog :: Maybe (StoreLog 'ReadMode), blockSize :: Int, serverPrivateKey :: C.PrivateKey 'C.RSA, -- TODO delete @@ -60,7 +61,8 @@ data Client = Client ntfSubscriptions :: TVar (Map NotifierId ()), rcvQ :: TBQueue (Transmission ClientCmd), sndQ :: TBQueue BrokerTransmission, - sessionId :: ByteString + sessionId :: ByteString, + connected :: TVar Bool } data SubscriptionThread = NoSub | SubPending | SubThread ThreadId @@ -84,7 +86,8 @@ newClient qSize sessionId = do ntfSubscriptions <- newTVar M.empty rcvQ <- newTBQueue qSize sndQ <- newTBQueue qSize - return Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} + connected <- newTVar True + return Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId, connected} newSubscription :: STM Sub newSubscription = do @@ -93,7 +96,7 @@ newSubscription = do newEnv :: forall m. (MonadUnliftIO m, MonadRandom m) => ServerConfig -> m Env newEnv config = do - server <- atomically $ newServer (tbqSize config) + server <- atomically $ newServer (serverTbqSize config) queueStore <- atomically newQueueStore msgStore <- atomically newMsgStore idsDrg <- drgNew >>= newTVarIO diff --git a/src/Simplex/Messaging/Transport.hs b/src/Simplex/Messaging/Transport.hs index 101bf5a6b..5da2a6d00 100644 --- a/src/Simplex/Messaging/Transport.hs +++ b/src/Simplex/Messaging/Transport.hs @@ -320,7 +320,7 @@ major :: SMPVersion -> (Int, Int) major (SMPVersion a b _ _) = (a, b) currentSMPVersion :: SMPVersion -currentSMPVersion = "0.5.0.0" +currentSMPVersion = "0.5.1.0" currentSMPVersionStr :: ByteString currentSMPVersionStr = serializeSMPVersion currentSMPVersion diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 9b4221c86..88a4d33f4 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -61,6 +61,7 @@ cfg = ServerConfig { transports = undefined, tbqSize = 1, + serverTbqSize = 1, msgQueueQuota = 4, queueIdBytes = 24, msgIdBytes = 24, diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index 5131757c2..7733d44fe 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -11,7 +11,7 @@ module ServerTests where import Control.Concurrent (ThreadId, killThread) import Control.Concurrent.STM import Control.Exception (SomeException, try) -import Control.Monad.Except (forM_, runExceptT) +import Control.Monad.Except (forM, forM_, runExceptT) import Data.ByteString.Base64 import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B @@ -34,9 +34,10 @@ serverTests t = do describe "SMP queues" $ do describe "NEW and KEY commands, SEND messages" $ testCreateSecure t describe "NEW, OFF and DEL commands, SEND messages" $ testCreateDelete t + describe "Stress test" $ stressTest t describe "SMP messages" $ do describe "duplex communication over 2 SMP connections" $ testDuplex t - describe "switch subscription to another SMP queue" $ testSwitchSub t + describe "switch subscription to another TCP connection" $ testSwitchSub t describe "Store log" $ testWithStoreLog t describe "Timing of AUTH error" $ testTiming t describe "Message notifications" $ testMessageNotifications t @@ -188,6 +189,23 @@ testCreateDelete (ATransport t) = Resp "cdab" _ err10 <- signSendRecv rh rKey ("cdab", rId, "SUB") (err10, ERR AUTH) #== "rejects SUB when deleted" +stressTest :: ATransport -> Spec +stressTest (ATransport t) = + it "should create many queues, disconnect and re-connect" $ + smpTest3 t $ \h1 h2 h3 -> do + (rPub, rKey) <- C.generateSignatureKeyPair 0 C.SEd25519 + (dhPub, _ :: C.PrivateKey 'C.X25519) <- C.generateKeyPair' 0 + rIds <- forM [1 .. 50 :: Int] . const $ do + Resp "" "" (Ids rId _ _) <- signSendRecv h1 rKey ("", "", B.unwords ["NEW", C.serializeKey rPub, C.serializeKey dhPub]) + pure rId + let subscribeQueues h = forM_ rIds $ \rId -> do + Resp "" rId' OK <- signSendRecv h rKey ("", rId, "SUB") + rId' `shouldBe` rId + closeConnection $ connection h1 + subscribeQueues h2 + closeConnection $ connection h2 + subscribeQueues h3 + testDuplex :: ATransport -> Spec testDuplex (ATransport t) = it "should create 2 simplex connections and exchange messages" $