diff --git a/CHANGELOG.md b/CHANGELOG.md index b5792195d..f0093ba92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,18 @@ +# 5.7.3 + +SMP/NTF protocol: +- add ALPN for handshake version negotiation, similar to XFTP (to preserve backwards compatibility with the old clients). +- upgrade clients to versions v7/v2 of the protocols. + +SMP server: +- faster responses to subscription requests. + +XFTP client: +- fix network exception during file download treated as permanent file error. + +SMP agent: +- do not report subscription timeouts while client is offline. + # 5.7.2 SMP agent: diff --git a/package.yaml b/package.yaml index 7a536f7bf..6d63ad488 100644 --- a/package.yaml +++ b/package.yaml @@ -1,5 +1,5 @@ name: simplexmq -version: 5.7.2.0 +version: 5.7.3.1 synopsis: SimpleXMQ message broker description: | This package includes <./docs/Simplex-Messaging-Server.html server>, diff --git a/simplexmq.cabal b/simplexmq.cabal index aa008f600..494602941 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -5,7 +5,7 @@ cabal-version: 1.12 -- see: https://github.com/sol/hpack name: simplexmq -version: 5.7.2.0 +version: 5.7.3.1 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 4195bffba..156f5da18 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -53,7 +53,7 @@ import Data.Either (fromRight, partitionEithers) import Data.Functor (($>)) import Data.Int (Int64) import qualified Data.IntMap.Strict as IM -import Data.List (intercalate) +import Data.List (intercalate, mapAccumR) import Data.List.NonEmpty (NonEmpty (..)) import qualified Data.List.NonEmpty as L import qualified Data.Map.Strict as M @@ -429,7 +429,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do hPutStrLn h "AUTH" runClientTransport :: Transport c => THandleSMP c 'TServer -> M () -runClientTransport th@THandle {params = thParams@THandleParams {thVersion, sessionId}} = do +runClientTransport h@THandle {params = thParams@THandleParams {thVersion, sessionId}} = do q <- asks $ tbqSize . config ts <- liftIO getSystemTime active <- asks clients @@ -440,11 +440,12 @@ runClientTransport th@THandle {params = thParams@THandleParams {thVersion, sessi pure new s <- asks server expCfg <- asks $ inactiveClientExpiration . config + th <- newMVar h -- put TH under a fair lock to interleave messages and command responses labelMyThread . B.unpack $ "client $" <> encode sessionId - raceAny_ ([liftIO $ send th c, client thParams c s, receive th c] <> disconnectThread_ c expCfg) + raceAny_ ([liftIO $ send th c, liftIO $ sendMsg th c, client thParams c s, receive h c] <> disconnectThread_ c expCfg) `finally` clientDisconnected c where - disconnectThread_ c (Just expCfg) = [liftIO $ disconnectTransport th (rcvActiveAt c) (sndActiveAt c) expCfg (noSubscriptions c)] + disconnectThread_ c (Just expCfg) = [liftIO $ disconnectTransport h (rcvActiveAt c) (sndActiveAt c) expCfg (noSubscriptions c)] disconnectThread_ _ _ = [] noSubscriptions c = atomically $ (&&) <$> TM.null (subscriptions c) <*> TM.null (ntfSubscriptions c) @@ -477,10 +478,10 @@ cancelSub sub = _ -> return () receive :: Transport c => THandleSMP c 'TServer -> Client -> M () -receive th@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActiveAt, sessionId} = do +receive h@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActiveAt, sessionId} = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " receive" forever $ do - ts <- L.toList <$> liftIO (tGet th) + ts <- L.toList <$> liftIO (tGet h) atomically . writeTVar rcvActiveAt =<< liftIO getSystemTime (errs, cmds) <- partitionEithers <$> mapM cmdAction ts write sndQ errs @@ -497,20 +498,41 @@ receive th@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActi VRFailed -> Left (corrId, entId, ERR AUTH) write q = mapM_ (atomically . writeTBQueue q) . L.nonEmpty -send :: Transport c => THandleSMP c 'TServer -> Client -> IO () -send h@THandle {params} Client {sndQ, sessionId, sndActiveAt} = do +send :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> IO () +send th c@Client {sndQ, msgQ, sessionId} = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " send" - forever $ do - ts <- atomically $ L.sortWith tOrder <$> readTBQueue sndQ - -- TODO we can authorize responses as well - void . liftIO . tPut h $ L.map (\t -> Right (Nothing, encodeTransmission params t)) ts - atomically . writeTVar sndActiveAt =<< liftIO getSystemTime + forever $ atomically (readTBQueue sndQ) >>= sendTransmissions where - tOrder :: Transmission BrokerMsg -> Int - tOrder (_, _, cmd) = case cmd of - MSG {} -> 0 - NMSG {} -> 0 - _ -> 1 + sendTransmissions :: NonEmpty (Transmission BrokerMsg) -> IO () + sendTransmissions ts + | L.length ts <= 2 = tSend th c ts + | otherwise = do + let (msgs_, ts') = mapAccumR splitMessages [] ts + -- If the request had batched subscriptions (L.length ts > 2) + -- this will reply OK to all SUBs in the first batched transmission, + -- to reduce client timeouts. + tSend th c ts' + -- After that all messages will be sent in separate transmissions, + -- without any client response timeouts, and allowing them to interleave + -- with other requests responses. + mapM_ (atomically . writeTBQueue msgQ) $ L.nonEmpty msgs_ + where + splitMessages :: [Transmission BrokerMsg] -> Transmission BrokerMsg -> ([Transmission BrokerMsg], Transmission BrokerMsg) + splitMessages msgs t@(corrId, entId, cmd) = case cmd of + -- replace MSG response with OK, accumulating MSG in a separate list. + MSG {} -> ((CorrId "", entId, cmd) : msgs, (corrId, entId, OK)) + _ -> (msgs, t) + +sendMsg :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> IO () +sendMsg th c@Client {msgQ, sessionId} = do + labelMyThread . B.unpack $ "client $" <> encode sessionId <> " sendMsg" + forever $ atomically (readTBQueue msgQ) >>= mapM_ (\t -> tSend th c [t]) + +tSend :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> NonEmpty (Transmission BrokerMsg) -> IO () +tSend th Client {sndActiveAt} ts = do + withMVar th $ \h@THandle {params} -> + void . tPut h $ L.map (\t -> Right (Nothing, encodeTransmission params t)) ts + atomically . writeTVar sndActiveAt =<< liftIO getSystemTime disconnectTransport :: Transport c => THandle v c 'TServer -> TVar SystemTime -> TVar SystemTime -> ExpirationConfig -> IO Bool -> IO () disconnectTransport THandle {connection, params = THandleParams {sessionId}} rcvActiveAt sndActiveAt expCfg noSubscriptions = do @@ -1085,9 +1107,10 @@ saveServerMessages keepMsgs = asks (storeMsgsFile . config) >>= mapM_ saveMessag >>= mapM_ (B.hPutStrLn h . strEncode . MLRv3 rId) restoreServerMessages :: M Int -restoreServerMessages = asks (storeMsgsFile . config) >>= \case - Just f -> ifM (doesFileExist f) (restoreMessages f) (pure 0) - Nothing -> pure 0 +restoreServerMessages = + asks (storeMsgsFile . config) >>= \case + Just f -> ifM (doesFileExist f) (restoreMessages f) (pure 0) + Nothing -> pure 0 where restoreMessages f = do logInfo $ "restoring messages from file " <> T.pack f diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index c9f7e1cb1..845d483b1 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -138,6 +138,7 @@ data Client = Client ntfSubscriptions :: TMap NotifierId (), rcvQ :: TBQueue (NonEmpty (Maybe QueueRec, Transmission Cmd)), sndQ :: TBQueue (NonEmpty (Transmission BrokerMsg)), + msgQ :: TBQueue (NonEmpty (Transmission BrokerMsg)), endThreads :: TVar (IntMap (Weak ThreadId)), endThreadSeq :: TVar Int, thVersion :: VersionSMP, @@ -172,13 +173,14 @@ newClient nextClientId qSize thVersion sessionId createdAt = do ntfSubscriptions <- TM.empty rcvQ <- newTBQueue qSize sndQ <- newTBQueue qSize + msgQ <- newTBQueue qSize endThreads <- newTVar IM.empty endThreadSeq <- newTVar 0 connected <- newTVar True rcvActiveAt <- newTVar createdAt sndActiveAt <- newTVar createdAt proxyClient_ <- newTVar Nothing - return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, proxyClient_} + return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, proxyClient_} newSubscription :: SubscriptionThread -> STM Sub newSubscription subThread = do diff --git a/tests/AgentTests.hs b/tests/AgentTests.hs index d42fffdd1..7873e1d74 100644 --- a/tests/AgentTests.hs +++ b/tests/AgentTests.hs @@ -27,9 +27,9 @@ import GHC.Stack (withFrozenCallStack) import Network.HTTP.Types (urlEncode) import SMPAgentClient import SMPClient (testKeyHash, testPort, testPort2, testStoreLogFile, withSmpServer, withSmpServerStoreLogOn) -import Simplex.Messaging.Agent.Protocol hiding (MID, CONF, INFO, REQ, SENT) +import Simplex.Messaging.Agent.Protocol hiding (CONF, INFO, MID, REQ, SENT) import qualified Simplex.Messaging.Agent.Protocol as A -import Simplex.Messaging.Crypto.Ratchet (InitialKeys (..), PQEncryption (..), PQSupport (..), pattern IKPQOn, pattern IKPQOff, pattern PQEncOn, pattern PQSupportOn, pattern PQSupportOff) +import Simplex.Messaging.Crypto.Ratchet (InitialKeys (..), PQEncryption (..), PQSupport (..), pattern IKPQOff, pattern IKPQOn, pattern PQEncOn, pattern PQSupportOff, pattern PQSupportOn) import qualified Simplex.Messaging.Crypto.Ratchet as CR import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (ErrorType (..)) @@ -547,10 +547,10 @@ testResumeDeliveryQuotaExceeded _ alice bob = do bob <#= \case ("", "alice", Msg "message 4") -> True; _ -> False bob #: ("4", "alice", "ACK 7") #> ("4", "alice", OK) inAnyOrder - (tGetAgent alice) - [ \case ("", c, Right (SENT 8)) -> c == "bob"; _ -> False, - \case ("", c, Right QCONT) -> c == "bob"; _ -> False - ] + (tGetAgent alice) + [ \case ("", c, Right (SENT 8)) -> c == "bob"; _ -> False, + \case ("", c, Right QCONT) -> c == "bob"; _ -> False + ] bob <#= \case ("", "alice", Msg "over quota") -> True; _ -> False -- message 8 is skipped because of alice agent sending "QCONT" message bob #: ("5", "alice", "ACK 9") #> ("5", "alice", OK) @@ -580,7 +580,7 @@ enableKEMStr _ = "" pqConnModeStr :: InitialKeys -> ByteString pqConnModeStr (IKNoPQ PQSupportOff) = "" -pqConnModeStr pq = " " <> strEncode pq +pqConnModeStr pq = " " <> strEncode pq sendMessage :: Transport c => (c, ConnId) -> (c, ConnId) -> ByteString -> IO () sendMessage (h1, name1) (h2, name2) msg = do diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 1561bf7a2..3476c24c9 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -59,10 +59,10 @@ import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Either (isRight) import Data.Int (Int64) -import Data.List (nub) +import Data.List (find, nub) import Data.List.NonEmpty (NonEmpty) import qualified Data.Map as M -import Data.Maybe (isNothing) +import Data.Maybe (isJust, isNothing) import qualified Data.Set as S import Data.Time.Clock (diffUTCTime, getCurrentTime) import Data.Time.Clock.System (SystemTime (..), getSystemTime) @@ -341,6 +341,9 @@ functionalAPITests t = do skip "faster version of the previous test (200 subscriptions gets very slow with test coverage)" $ it "should subscribe to multiple (6) subscriptions with batching" $ testBatchedSubscriptions 6 3 t + it "should subscribe to multiple connections with pending messages" $ + withSmpServer t $ + testBatchedPendingMessages 10 5 describe "Async agent commands" $ do it "should connect using async agent commands" $ withSmpServer t testAsyncCommands @@ -1546,7 +1549,7 @@ testBatchedSubscriptions :: Int -> Int -> ATransport -> IO () testBatchedSubscriptions nCreate nDel t = withAgentClientsCfgServers2 agentCfg agentCfg initAgentServers2 $ \a b -> do conns <- runServers $ do - conns <- replicateM (nCreate :: Int) $ makeConnection_ PQSupportOff a b + conns <- replicateM nCreate $ makeConnection_ PQSupportOff a b forM_ conns $ \(aId, bId) -> exchangeGreetings_ PQEncOff a bId b aId let (aIds', bIds') = unzip $ take nDel conns delete a bIds' @@ -1605,6 +1608,25 @@ testBatchedSubscriptions nCreate nDel t = killThread t1 pure res +testBatchedPendingMessages :: Int -> Int -> IO () +testBatchedPendingMessages nCreate nMsgs = + withA $ \a -> do + conns <- withB $ \b -> runRight $ do + replicateM nCreate $ makeConnection a b + let msgConns = take nMsgs conns + runRight_ $ forM_ msgConns $ \(_, bId) -> sendMessage a bId SMP.noMsgFlags "hello" + replicateM_ nMsgs $ get a =##> \case ("", cId, SENT _) -> isJust $ find ((cId ==) . snd) msgConns; _ -> False + withB $ \b -> runRight_ $ do + r <- subscribeConnections b $ map fst conns + liftIO $ all isRight r `shouldBe` True + replicateM_ nMsgs $ do + ("", cId, Msg' msgId _ "hello") <- get b + liftIO $ isJust (find ((cId ==) . fst) msgConns) `shouldBe` True + ackMessage b cId msgId Nothing + where + withA = withAgent 1 agentCfg initAgentServers testDB + withB = withAgent 2 agentCfg initAgentServers testDB2 + testAsyncCommands :: IO () testAsyncCommands = withAgentClients2 $ \alice bob -> runRight_ $ do