From 5cafd9d5c494cbc2d70c07e43aef31ac92f9883b Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Thu, 9 May 2024 09:20:57 +0100 Subject: [PATCH 1/4] server: more efficient responses to batch subscriptions (#1137) * server: more efficient responses to batch subscriptions * comments * comment * enable tests * LogError --- src/Simplex/Messaging/Server.hs | 32 ++++++++++++++++++-------- tests/AgentTests.hs | 14 +++++------ tests/AgentTests/FunctionalAPITests.hs | 28 +++++++++++++++++++--- 3 files changed, 55 insertions(+), 19 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 5415ebd65..895ba28ae 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -53,7 +53,8 @@ import Data.Either (fromRight, partitionEithers) import Data.Functor (($>)) import Data.Int (Int64) import qualified Data.IntMap.Strict as IM -import Data.List (intercalate) +import Data.List (intercalate, mapAccumR) +import Data.List.NonEmpty (NonEmpty) import qualified Data.List.NonEmpty as L import qualified Data.Map.Strict as M import Data.Maybe (isNothing) @@ -482,16 +483,29 @@ send :: Transport c => THandleSMP c 'TServer -> Client -> IO () send h@THandle {params} Client {sndQ, sessionId, sndActiveAt} = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " send" forever $ do - ts <- atomically $ L.sortWith tOrder <$> readTBQueue sndQ - -- TODO we can authorize responses as well - void . liftIO . tPut h $ L.map (\t -> Right (Nothing, encodeTransmission params t)) ts + sendTransmissions =<< atomically (readTBQueue sndQ) atomically . writeTVar sndActiveAt =<< liftIO getSystemTime where - tOrder :: Transmission BrokerMsg -> Int - tOrder (_, _, cmd) = case cmd of - MSG {} -> 0 - NMSG {} -> 0 - _ -> 1 + sendTransmissions :: NonEmpty (Transmission BrokerMsg) -> IO () + sendTransmissions ts + | L.length ts <= 2 = tSend ts + | otherwise = do + let (msgs, ts') = mapAccumR splitMessages [] ts + -- If the request had batched subscriptions (L.length ts > 2) + -- this will reply OK to all SUBs in the first batched transmission, + -- to reduce client timeouts. + tSend ts' + -- After that all messages will be sent in separate transmissions, + -- without any client response timeouts. + mapM_ tSend (L.nonEmpty msgs) + where + splitMessages :: [Transmission BrokerMsg] -> Transmission BrokerMsg -> ([Transmission BrokerMsg], Transmission BrokerMsg) + splitMessages msgs t@(corrId, entId, cmd) = case cmd of + -- replace MSG response with OK, accumulating MSG in a separate list. + MSG {} -> ((CorrId "", entId, cmd) : msgs, (corrId, entId, OK)) + _ -> (msgs, t) + tSend :: NonEmpty (Transmission BrokerMsg) -> IO () + tSend = void . tPut h . L.map (\t -> Right (Nothing, encodeTransmission params t)) disconnectTransport :: Transport c => THandle v c 'TServer -> TVar SystemTime -> TVar SystemTime -> ExpirationConfig -> IO Bool -> IO () disconnectTransport THandle {connection, params = THandleParams {sessionId}} rcvActiveAt sndActiveAt expCfg noSubscriptions = do diff --git a/tests/AgentTests.hs b/tests/AgentTests.hs index b890c2c00..32610b54e 100644 --- a/tests/AgentTests.hs +++ b/tests/AgentTests.hs @@ -27,9 +27,9 @@ import GHC.Stack (withFrozenCallStack) import Network.HTTP.Types (urlEncode) import SMPAgentClient import SMPClient (testKeyHash, testPort, testPort2, testStoreLogFile, withSmpServer, withSmpServerStoreLogOn) -import Simplex.Messaging.Agent.Protocol hiding (MID, CONF, INFO, REQ) +import Simplex.Messaging.Agent.Protocol hiding (CONF, INFO, MID, REQ) import qualified Simplex.Messaging.Agent.Protocol as A -import Simplex.Messaging.Crypto.Ratchet (InitialKeys (..), PQEncryption (..), PQSupport (..), pattern IKPQOn, pattern IKPQOff, pattern PQEncOn, pattern PQSupportOn, pattern PQSupportOff) +import Simplex.Messaging.Crypto.Ratchet (InitialKeys (..), PQEncryption (..), PQSupport (..), pattern IKPQOff, pattern IKPQOn, pattern PQEncOn, pattern PQSupportOff, pattern PQSupportOn) import qualified Simplex.Messaging.Crypto.Ratchet as CR import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (ErrorType (..)) @@ -547,10 +547,10 @@ testResumeDeliveryQuotaExceeded _ alice bob = do bob <#= \case ("", "alice", Msg "message 4") -> True; _ -> False bob #: ("4", "alice", "ACK 7") #> ("4", "alice", OK) inAnyOrder - (tGetAgent alice) - [ \case ("", c, Right (SENT 8)) -> c == "bob"; _ -> False, - \case ("", c, Right QCONT) -> c == "bob"; _ -> False - ] + (tGetAgent alice) + [ \case ("", c, Right (SENT 8)) -> c == "bob"; _ -> False, + \case ("", c, Right QCONT) -> c == "bob"; _ -> False + ] bob <#= \case ("", "alice", Msg "over quota") -> True; _ -> False -- message 8 is skipped because of alice agent sending "QCONT" message bob #: ("5", "alice", "ACK 9") #> ("5", "alice", OK) @@ -580,7 +580,7 @@ enableKEMStr _ = "" pqConnModeStr :: InitialKeys -> ByteString pqConnModeStr (IKNoPQ PQSupportOff) = "" -pqConnModeStr pq = " " <> strEncode pq +pqConnModeStr pq = " " <> strEncode pq sendMessage :: Transport c => (c, ConnId) -> (c, ConnId) -> ByteString -> IO () sendMessage (h1, name1) (h2, name2) msg = do diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index cdcf5baed..59e433ea5 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -58,10 +58,10 @@ import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Either (isRight) import Data.Int (Int64) -import Data.List (nub) +import Data.List (find, nub) import Data.List.NonEmpty (NonEmpty) import qualified Data.Map as M -import Data.Maybe (isNothing) +import Data.Maybe (isJust, isNothing) import qualified Data.Set as S import Data.Time.Clock (diffUTCTime, getCurrentTime) import Data.Time.Clock.System (SystemTime (..), getSystemTime) @@ -337,6 +337,9 @@ functionalAPITests t = do skip "faster version of the previous test (200 subscriptions gets very slow with test coverage)" $ it "should subscribe to multiple (6) subscriptions with batching" $ testBatchedSubscriptions 6 3 t + it "should subscribe to multiple connections with pending messages" $ + withSmpServer t $ + testBatchedPendingMessages 10 5 describe "Async agent commands" $ do it "should connect using async agent commands" $ withSmpServer t testAsyncCommands @@ -1534,7 +1537,7 @@ testBatchedSubscriptions :: Int -> Int -> ATransport -> IO () testBatchedSubscriptions nCreate nDel t = withAgentClientsCfgServers2 agentCfg agentCfg initAgentServers2 $ \a b -> do conns <- runServers $ do - conns <- replicateM (nCreate :: Int) $ makeConnection_ PQSupportOff a b + conns <- replicateM nCreate $ makeConnection_ PQSupportOff a b forM_ conns $ \(aId, bId) -> exchangeGreetings_ PQEncOff a bId b aId let (aIds', bIds') = unzip $ take nDel conns delete a bIds' @@ -1593,6 +1596,25 @@ testBatchedSubscriptions nCreate nDel t = killThread t1 pure res +testBatchedPendingMessages :: Int -> Int -> IO () +testBatchedPendingMessages nCreate nMsgs = + withA $ \a -> do + conns <- withB $ \b -> runRight $ do + replicateM nCreate $ makeConnection a b + let msgConns = take nMsgs conns + runRight_ $ forM_ msgConns $ \(_, bId) -> sendMessage a bId SMP.noMsgFlags "hello" + replicateM_ nMsgs $ get a =##> \case ("", cId, SENT _) -> isJust $ find ((cId ==) . snd) msgConns; _ -> False + withB $ \b -> runRight_ $ do + r <- subscribeConnections b $ map fst conns + liftIO $ all isRight r `shouldBe` True + replicateM_ nMsgs $ do + ("", cId, Msg' msgId _ "hello") <- get b + liftIO $ isJust (find ((cId ==) . fst) msgConns) `shouldBe` True + ackMessage b cId msgId Nothing + where + withA = withAgent 1 agentCfg initAgentServers testDB + withB = withAgent 2 agentCfg initAgentServers testDB2 + testAsyncCommands :: IO () testAsyncCommands = withAgentClients2 $ \alice bob -> runRight_ $ do From dc111437fda043b4921cf31f6e402a9644ad3324 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Fri, 10 May 2024 15:15:43 +0100 Subject: [PATCH 2/4] 5.7.3.0 --- package.yaml | 2 +- simplexmq.cabal | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/package.yaml b/package.yaml index 7a536f7bf..093293e80 100644 --- a/package.yaml +++ b/package.yaml @@ -1,5 +1,5 @@ name: simplexmq -version: 5.7.2.0 +version: 5.7.3.0 synopsis: SimpleXMQ message broker description: | This package includes <./docs/Simplex-Messaging-Server.html server>, diff --git a/simplexmq.cabal b/simplexmq.cabal index 7539d675a..808e52ae0 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -5,7 +5,7 @@ cabal-version: 1.12 -- see: https://github.com/sol/hpack name: simplexmq -version: 5.7.2.0 +version: 5.7.3.0 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and From 727fd8b8f5eed729ee82f25f831e86df502b27b8 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Fri, 10 May 2024 22:19:11 +0100 Subject: [PATCH 3/4] server: more efficient response to batched subscriptions (#1141) * server: more efficient response to batched subscriptions * add sndMsgQ for interleaving messages with replies * remove redundant liftIO * refactor * refactor2 * rename * fix * diff * remove comment * remove comment --------- Co-authored-by: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> --- src/Simplex/Messaging/Server.hs | 50 +++++++++++++++---------- src/Simplex/Messaging/Server/Env/STM.hs | 4 +- 2 files changed, 33 insertions(+), 21 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 895ba28ae..85beccd6b 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -411,7 +411,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do hPutStrLn h "AUTH" runClientTransport :: Transport c => THandleSMP c 'TServer -> M () -runClientTransport th@THandle {params = THandleParams {thVersion, sessionId}} = do +runClientTransport h@THandle {params = THandleParams {thVersion, sessionId}} = do q <- asks $ tbqSize . config ts <- liftIO getSystemTime active <- asks clients @@ -422,11 +422,12 @@ runClientTransport th@THandle {params = THandleParams {thVersion, sessionId}} = pure new s <- asks server expCfg <- asks $ inactiveClientExpiration . config + th <- newMVar h -- put TH under a fair lock to interleave messages and command responses labelMyThread . B.unpack $ "client $" <> encode sessionId - raceAny_ ([liftIO $ send th c, client c s, receive th c] <> disconnectThread_ c expCfg) + raceAny_ ([liftIO $ send th c, liftIO $ sendMsg th c, client c s, receive h c] <> disconnectThread_ c expCfg) `finally` clientDisconnected c where - disconnectThread_ c (Just expCfg) = [liftIO $ disconnectTransport th (rcvActiveAt c) (sndActiveAt c) expCfg (noSubscriptions c)] + disconnectThread_ c (Just expCfg) = [liftIO $ disconnectTransport h (rcvActiveAt c) (sndActiveAt c) expCfg (noSubscriptions c)] disconnectThread_ _ _ = [] noSubscriptions c = atomically $ (&&) <$> TM.null (subscriptions c) <*> TM.null (ntfSubscriptions c) @@ -459,10 +460,10 @@ cancelSub sub = _ -> return () receive :: Transport c => THandleSMP c 'TServer -> Client -> M () -receive th@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActiveAt, sessionId} = do +receive h@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActiveAt, sessionId} = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " receive" forever $ do - ts <- L.toList <$> liftIO (tGet th) + ts <- L.toList <$> liftIO (tGet h) atomically . writeTVar rcvActiveAt =<< liftIO getSystemTime as <- partitionEithers <$> mapM cmdAction ts write sndQ $ fst as @@ -479,33 +480,41 @@ receive th@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActi VRFailed -> Left (corrId, queueId, ERR AUTH) write q = mapM_ (atomically . writeTBQueue q) . L.nonEmpty -send :: Transport c => THandleSMP c 'TServer -> Client -> IO () -send h@THandle {params} Client {sndQ, sessionId, sndActiveAt} = do +send :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> IO () +send th c@Client {sndQ, msgQ, sessionId} = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " send" - forever $ do - sendTransmissions =<< atomically (readTBQueue sndQ) - atomically . writeTVar sndActiveAt =<< liftIO getSystemTime + forever $ atomically (readTBQueue sndQ) >>= sendTransmissions where sendTransmissions :: NonEmpty (Transmission BrokerMsg) -> IO () sendTransmissions ts - | L.length ts <= 2 = tSend ts + | L.length ts <= 2 = tSend th c ts | otherwise = do - let (msgs, ts') = mapAccumR splitMessages [] ts + let (msgs_, ts') = mapAccumR splitMessages [] ts -- If the request had batched subscriptions (L.length ts > 2) -- this will reply OK to all SUBs in the first batched transmission, -- to reduce client timeouts. - tSend ts' + tSend th c ts' -- After that all messages will be sent in separate transmissions, - -- without any client response timeouts. - mapM_ tSend (L.nonEmpty msgs) + -- without any client response timeouts, and allowing them to interleave + -- with other requests responses. + mapM_ (atomically . writeTBQueue msgQ) $ L.nonEmpty msgs_ where splitMessages :: [Transmission BrokerMsg] -> Transmission BrokerMsg -> ([Transmission BrokerMsg], Transmission BrokerMsg) splitMessages msgs t@(corrId, entId, cmd) = case cmd of -- replace MSG response with OK, accumulating MSG in a separate list. MSG {} -> ((CorrId "", entId, cmd) : msgs, (corrId, entId, OK)) _ -> (msgs, t) - tSend :: NonEmpty (Transmission BrokerMsg) -> IO () - tSend = void . tPut h . L.map (\t -> Right (Nothing, encodeTransmission params t)) + +sendMsg :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> IO () +sendMsg th c@Client {msgQ, sessionId} = do + labelMyThread . B.unpack $ "client $" <> encode sessionId <> " sendMsg" + forever $ atomically (readTBQueue msgQ) >>= mapM_ (\t -> tSend th c [t]) + +tSend :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> NonEmpty (Transmission BrokerMsg) -> IO () +tSend th Client {sndActiveAt} ts = do + withMVar th $ \h@THandle {params} -> + void . tPut h $ L.map (\t -> Right (Nothing, encodeTransmission params t)) ts + atomically . writeTVar sndActiveAt =<< liftIO getSystemTime disconnectTransport :: Transport c => THandle v c 'TServer -> TVar SystemTime -> TVar SystemTime -> ExpirationConfig -> IO Bool -> IO () disconnectTransport THandle {connection, params = THandleParams {sessionId}} rcvActiveAt sndActiveAt expCfg noSubscriptions = do @@ -989,9 +998,10 @@ saveServerMessages keepMsgs = asks (storeMsgsFile . config) >>= mapM_ saveMessag >>= mapM_ (B.hPutStrLn h . strEncode . MLRv3 rId) restoreServerMessages :: M Int -restoreServerMessages = asks (storeMsgsFile . config) >>= \case - Just f -> ifM (doesFileExist f) (restoreMessages f) (pure 0) - Nothing -> pure 0 +restoreServerMessages = + asks (storeMsgsFile . config) >>= \case + Just f -> ifM (doesFileExist f) (restoreMessages f) (pure 0) + Nothing -> pure 0 where restoreMessages f = do logInfo $ "restoring messages from file " <> T.pack f diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 6794ad979..bd8262f07 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -128,6 +128,7 @@ data Client = Client ntfSubscriptions :: TMap NotifierId (), rcvQ :: TBQueue (NonEmpty (Maybe QueueRec, Transmission Cmd)), sndQ :: TBQueue (NonEmpty (Transmission BrokerMsg)), + msgQ :: TBQueue (NonEmpty (Transmission BrokerMsg)), endThreads :: TVar (IntMap (Weak ThreadId)), endThreadSeq :: TVar Int, thVersion :: VersionSMP, @@ -161,12 +162,13 @@ newClient nextClientId qSize thVersion sessionId createdAt = do ntfSubscriptions <- TM.empty rcvQ <- newTBQueue qSize sndQ <- newTBQueue qSize + msgQ <- newTBQueue qSize endThreads <- newTVar IM.empty endThreadSeq <- newTVar 0 connected <- newTVar True rcvActiveAt <- newTVar createdAt sndActiveAt <- newTVar createdAt - return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt} + return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt} newSubscription :: SubscriptionThread -> STM Sub newSubscription subThread = do From acc7faea11453e291393440816a060098faabf86 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Fri, 10 May 2024 22:24:00 +0100 Subject: [PATCH 4/4] 5.7.3.1 --- CHANGELOG.md | 15 +++++++++++++++ package.yaml | 2 +- simplexmq.cabal | 2 +- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b5792195d..f0093ba92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,18 @@ +# 5.7.3 + +SMP/NTF protocol: +- add ALPN for handshake version negotiation, similar to XFTP (to preserve backwards compatibility with the old clients). +- upgrade clients to versions v7/v2 of the protocols. + +SMP server: +- faster responses to subscription requests. + +XFTP client: +- fix network exception during file download treated as permanent file error. + +SMP agent: +- do not report subscription timeouts while client is offline. + # 5.7.2 SMP agent: diff --git a/package.yaml b/package.yaml index 093293e80..6d63ad488 100644 --- a/package.yaml +++ b/package.yaml @@ -1,5 +1,5 @@ name: simplexmq -version: 5.7.3.0 +version: 5.7.3.1 synopsis: SimpleXMQ message broker description: | This package includes <./docs/Simplex-Messaging-Server.html server>, diff --git a/simplexmq.cabal b/simplexmq.cabal index 808e52ae0..de10da8b8 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -5,7 +5,7 @@ cabal-version: 1.12 -- see: https://github.com/sol/hpack name: simplexmq -version: 5.7.3.0 +version: 5.7.3.1 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and