diff --git a/CHANGELOG.md b/CHANGELOG.md index 29ae11e15..f455aeda6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.5.2 + +- Fix message delivery logic that blocked delivery of all server messages when server per-queue quota exceeded, making it concurrent per SMP queue, not per server. + # 0.5.1 - Fix server subscription logic bug that was leading to memory leak / resource exhaustion in some edge cases. diff --git a/package.yaml b/package.yaml index ef50b5b27..3e62cc01b 100644 --- a/package.yaml +++ b/package.yaml @@ -1,5 +1,5 @@ name: simplexmq -version: 0.5.1 +version: 0.5.2 synopsis: SimpleXMQ message broker description: | This package includes <./docs/Simplex-Messaging-Server.html server>, diff --git a/simplexmq.cabal b/simplexmq.cabal index c91f87484..7b13ec2ed 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -5,7 +5,7 @@ cabal-version: 1.12 -- see: https://github.com/sol/hpack name: simplexmq -version: 0.5.1 +version: 0.5.2 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index e30bc236d..093e585f0 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -7,7 +7,6 @@ {-# LANGUAGE KindSignatures #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} -{-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE OverloadedLists #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RankNTypes #-} @@ -376,10 +375,10 @@ sendMessage' c connId msg = enqueueMsg sq = enqueueMessage c connId sq $ A_MSG msg enqueueMessage :: forall m. AgentMonad m => AgentClient -> ConnId -> SndQueue -> AMessage -> m AgentMsgId -enqueueMessage c connId sq@SndQueue {server} aMessage = do +enqueueMessage c connId sq aMessage = do resumeMsgDelivery c connId sq msgId <- storeSentMsg - queuePendingMsgs c connId server [msgId] + queuePendingMsgs c connId sq [msgId] pure $ unId msgId where storeSentMsg :: m InternalId @@ -398,15 +397,16 @@ enqueueMessage c connId sq@SndQueue {server} aMessage = do pure internalId resumeMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnId -> SndQueue -> m () -resumeMsgDelivery c connId SndQueue {server} = do - unlessM srvDelivering $ - async (runSrvMsgDelivery c server) - >>= atomically . modifyTVar (srvMsgDeliveries c) . M.insert server +resumeMsgDelivery c connId sq@SndQueue {server, sndId} = do + let qKey = (connId, server, sndId) + unlessM (queueDelivering qKey) $ + async (runSmpQueueMsgDelivery c connId sq) + >>= atomically . modifyTVar (smpQueueMsgDeliveries c) . M.insert qKey unlessM connQueued $ withStore (`getPendingMsgs` connId) - >>= queuePendingMsgs c connId server + >>= queuePendingMsgs c connId sq where - srvDelivering = isJust . M.lookup server <$> readTVarIO (srvMsgDeliveries c) + queueDelivering qKey = isJust . M.lookup qKey <$> readTVarIO (smpQueueMsgDeliveries c) connQueued = atomically $ isJust @@ -414,42 +414,42 @@ resumeMsgDelivery c connId SndQueue {server} = do (connMsgsQueued c) (\m -> (M.lookup connId m, M.insert connId True m)) -queuePendingMsgs :: AgentMonad m => AgentClient -> ConnId -> SMPServer -> [InternalId] -> m () -queuePendingMsgs c connId server msgIds = atomically $ do - q <- getPendingMsgQ c server - mapM_ (writeTQueue q . PendingMsg connId) msgIds +queuePendingMsgs :: AgentMonad m => AgentClient -> ConnId -> SndQueue -> [InternalId] -> m () +queuePendingMsgs c connId sq msgIds = atomically $ do + q <- getPendingMsgQ c connId sq + mapM_ (writeTQueue q) msgIds -getPendingMsgQ :: AgentClient -> SMPServer -> STM (TQueue PendingMsg) -getPendingMsgQ c srv = do - maybe newMsgQueue pure . M.lookup srv =<< readTVar (srvMsgQueues c) +getPendingMsgQ :: AgentClient -> ConnId -> SndQueue -> STM (TQueue InternalId) +getPendingMsgQ c connId SndQueue {server, sndId} = do + let qKey = (connId, server, sndId) + maybe (newMsgQueue qKey) pure . M.lookup qKey =<< readTVar (smpQueueMsgQueues c) where - newMsgQueue :: STM (TQueue PendingMsg) - newMsgQueue = do + newMsgQueue qKey = do mq <- newTQueue - modifyTVar (srvMsgQueues c) $ M.insert srv mq + modifyTVar (smpQueueMsgQueues c) $ M.insert qKey mq pure mq -runSrvMsgDelivery :: forall m. AgentMonad m => AgentClient -> SMPServer -> m () -runSrvMsgDelivery c@AgentClient {subQ} srv = do - mq <- atomically $ getPendingMsgQ c srv +runSmpQueueMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnId -> SndQueue -> m () +runSmpQueueMsgDelivery c@AgentClient {subQ} connId sq = do + mq <- atomically $ getPendingMsgQ c connId sq ri <- asks $ reconnectInterval . config forever $ do - PendingMsg {connId, msgId} <- atomically $ readTQueue mq + msgId <- atomically $ readTQueue mq let mId = unId msgId withStore (\st -> E.try $ getPendingMsgData st connId msgId) >>= \case Left (e :: E.SomeException) -> - notify connId $ MERR mId (INTERNAL $ show e) - Right (sq, rq_, (msgType, msgBody)) -> do + notify $ MERR mId (INTERNAL $ show e) + Right (rq_, (msgType, msgBody)) -> do withRetryInterval ri $ \loop -> do tryError (sendAgentMessage c sq msgBody) >>= \case Left e -> case e of SMP SMP.QUOTA -> loop SMP SMP.AUTH -> case msgType of HELLO_ -> loop - REPLY_ -> notify connId $ ERR e - A_MSG_ -> notify connId $ MERR mId e - SMP {} -> notify connId $ MERR mId e - CMD {} -> notify connId $ MERR mId e + REPLY_ -> notify $ ERR e + A_MSG_ -> notify $ MERR mId e + SMP {} -> notify $ MERR mId e + CMD {} -> notify $ MERR mId e _ -> loop Right () -> do case msgType of @@ -459,15 +459,15 @@ runSrvMsgDelivery c@AgentClient {subQ} srv = do -- party initiating connection Just rq -> do subscribeQueue c rq connId - notify connId CON + notify CON -- party joining connection _ -> createReplyQueue c connId sq - A_MSG_ -> notify connId $ SENT mId + A_MSG_ -> notify $ SENT mId _ -> pure () withStore $ \st -> deleteMsg st connId msgId where - notify :: ConnId -> ACommand 'Agent -> m () - notify connId cmd = atomically $ writeTBQueue subQ ("", connId, cmd) + notify :: ACommand 'Agent -> m () + notify cmd = atomically $ writeTBQueue subQ ("", connId, cmd) ackMessage' :: forall m. AgentMonad m => AgentClient -> ConnId -> AgentMsgId -> m () ackMessage' c connId msgId = do diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index c3f5277e0..34268b5ea 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -77,8 +77,8 @@ data AgentClient = AgentClient subscrConns :: TVar (Map ConnId SMPServer), activations :: TVar (Map ConnId (Async ())), -- activations of send queues in progress connMsgsQueued :: TVar (Map ConnId Bool), - srvMsgQueues :: TVar (Map SMPServer (TQueue PendingMsg)), - srvMsgDeliveries :: TVar (Map SMPServer (Async ())), + smpQueueMsgQueues :: TVar (Map (ConnId, SMPServer, SMP.SenderId) (TQueue InternalId)), + smpQueueMsgDeliveries :: TVar (Map (ConnId, SMPServer, SMP.SenderId) (Async ())), reconnections :: TVar [Async ()], clientId :: Int, agentEnv :: Env, @@ -97,12 +97,12 @@ newAgentClient agentEnv = do subscrConns <- newTVar M.empty activations <- newTVar M.empty connMsgsQueued <- newTVar M.empty - srvMsgQueues <- newTVar M.empty - srvMsgDeliveries <- newTVar M.empty + smpQueueMsgQueues <- newTVar M.empty + smpQueueMsgDeliveries <- newTVar M.empty reconnections <- newTVar [] clientId <- stateTVar (clientCounter agentEnv) $ \i -> (i + 1, i + 1) lock <- newTMVar () - return AgentClient {rcvQ, subQ, msgQ, smpClients, subscrSrvrs, subscrConns, activations, connMsgsQueued, srvMsgQueues, srvMsgDeliveries, reconnections, clientId, agentEnv, smpSubscriber = undefined, lock} + return AgentClient {rcvQ, subQ, msgQ, smpClients, subscrSrvrs, subscrConns, activations, connMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, reconnections, clientId, agentEnv, smpSubscriber = undefined, lock} -- | Agent monad with MonadReader Env and MonadError AgentErrorType type AgentMonad m = (MonadUnliftIO m, MonadReader Env m, MonadError AgentErrorType m) @@ -178,7 +178,7 @@ closeAgentClient c = liftIO $ do closeSMPServerClients c cancelActions $ activations c cancelActions $ reconnections c - cancelActions $ srvMsgDeliveries c + cancelActions $ smpQueueMsgDeliveries c closeSMPServerClients :: AgentClient -> IO () closeSMPServerClients c = readTVarIO (smpClients c) >>= mapM_ closeSMPClient diff --git a/src/Simplex/Messaging/Agent/Store.hs b/src/Simplex/Messaging/Agent/Store.hs index 43e752353..3b50985df 100644 --- a/src/Simplex/Messaging/Agent/Store.hs +++ b/src/Simplex/Messaging/Agent/Store.hs @@ -61,7 +61,7 @@ class Monad m => MonadAgentStore s m where createRcvMsg :: s -> ConnId -> RcvMsgData -> m () updateSndIds :: s -> ConnId -> m (InternalId, InternalSndId, PrevSndMsgHash) createSndMsg :: s -> ConnId -> SndMsgData -> m () - getPendingMsgData :: s -> ConnId -> InternalId -> m (SndQueue, Maybe RcvQueue, (AMsgType, MsgBody)) + getPendingMsgData :: s -> ConnId -> InternalId -> m (Maybe RcvQueue, (AMsgType, MsgBody)) getPendingMsgs :: s -> ConnId -> m [InternalId] checkRcvMsg :: s -> ConnId -> InternalId -> m () deleteMsg :: s -> ConnId -> InternalId -> m () diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 33e092195..fc45571ef 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -439,10 +439,9 @@ instance (MonadUnliftIO m, MonadError StoreError m) => MonadAgentStore SQLiteSto insertSndMsgDetails_ db connId sndMsgData updateHashSnd_ db connId sndMsgData - getPendingMsgData :: SQLiteStore -> ConnId -> InternalId -> m (SndQueue, Maybe RcvQueue, (AMsgType, MsgBody)) + getPendingMsgData :: SQLiteStore -> ConnId -> InternalId -> m (Maybe RcvQueue, (AMsgType, MsgBody)) getPendingMsgData st connId msgId = liftIOEither . withTransaction st $ \db -> runExceptT $ do - sq <- ExceptT $ sndQueue <$> getSndQueueByConnAlias_ db connId rq_ <- liftIO $ getRcvQueueByConnAlias_ db connId msgData <- ExceptT $ @@ -456,13 +455,11 @@ instance (MonadUnliftIO m, MonadError StoreError m) => MonadAgentStore SQLiteSto WHERE m.conn_alias = ? AND m.internal_id = ? |] (connId, msgId) - pure (sq, rq_, msgData) + pure (rq_, msgData) where sndMsgData :: [(AMsgType, MsgBody)] -> Either StoreError (AMsgType, MsgBody) sndMsgData [msgData] = Right msgData sndMsgData _ = Left SEMsgNotFound - sndQueue :: Maybe SndQueue -> Either StoreError SndQueue - sndQueue = maybe (Left SEConnNotFound) Right getPendingMsgs :: SQLiteStore -> ConnId -> m [InternalId] getPendingMsgs st connId = diff --git a/tests/AgentTests.hs b/tests/AgentTests.hs index 9b3f0f80d..e30ea7c1f 100644 --- a/tests/AgentTests.hs +++ b/tests/AgentTests.hs @@ -14,6 +14,7 @@ import AgentTests.DoubleRatchetTests (doubleRatchetTests) import AgentTests.FunctionalAPITests (functionalAPITests) import AgentTests.SQLiteTests (storeTests) import Control.Concurrent +import Control.Monad (forM_) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Network.HTTP.Types (urlEncode) @@ -24,6 +25,7 @@ import qualified Simplex.Messaging.Agent.Protocol as A import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (ErrorType (..), MsgBody) import Simplex.Messaging.Transport (ATransport (..), TProxy (..), Transport (..)) +import Simplex.Messaging.Util (bshow) import System.Directory (removeFile) import System.Timeout import Test.Hspec @@ -65,6 +67,10 @@ agentTests (ATransport t) = do smpAgentTest2_2_2_needs_server $ testMsgDeliveryServerRestart t it "should deliver pending messages after agent restarting" $ smpAgentTest1_1_1 $ testMsgDeliveryAgentRestart t + it "should concurrently deliver messages to connections without blocking" $ + smpAgentTest2_2_1 $ testConcurrentMsgDelivery t + it "should deliver messages if one of connections has quota exceeded" $ + smpAgentTest2_2_1 $ testMsgDeliveryQuotaExceeded t -- | receive message to handle `h` (<#:) :: Transport c => c -> IO (ATransmissionOrError 'Agent) @@ -327,6 +333,47 @@ testMsgDeliveryAgentRestart t bob = do withServer test' = withSmpServerStoreLogOn (ATransport t) testPort2 (const test') `shouldReturn` () withAgent = withSmpAgentThreadOn_ (ATransport t) (agentTestPort, testPort, testDB) (pure ()) . const . testSMPAgentClientOn agentTestPort +testConcurrentMsgDelivery :: Transport c => TProxy c -> c -> c -> IO () +testConcurrentMsgDelivery _ alice bob = do + connect (alice, "alice") (bob, "bob") + + ("1", "bob2", Right (INV cReq)) <- alice #: ("1", "bob2", "NEW INV") + let cReq' = strEncode cReq + bob #: ("11", "alice2", "JOIN " <> cReq' <> " 14\nbob's connInfo") #> ("11", "alice2", OK) + ("", "bob2", Right (CONF _confId "bob's connInfo")) <- (alice <#:) + -- below commands would be needed to accept bob's connection, but alice does not + -- alice #: ("2", "bob", "LET " <> _confId <> " 16\nalice's connInfo") #> ("2", "bob", OK) + -- bob <# ("", "alice", INFO "alice's connInfo") + -- bob <# ("", "alice", CON) + -- alice <# ("", "bob", CON) + + -- the first connection should not be blocked by the second one + sendMessage (alice, "alice") (bob, "bob") "hello" + -- alice #: ("2", "bob", "SEND :hello") #> ("2", "bob", MID 1) + -- alice <# ("", "bob", SENT 1) + -- bob <#= \case ("", "alice", Msg "hello") -> True; _ -> False + -- bob #: ("12", "alice", "ACK 1") #> ("12", "alice", OK) + bob #: ("14", "alice", "SEND 9\nhello too") #> ("14", "alice", MID 2) + bob <# ("", "alice", SENT 2) + -- if delivery is blocked it won't go further + alice <#= \case ("", "bob", Msg "hello too") -> True; _ -> False + alice #: ("3", "bob", "ACK 2") #> ("3", "bob", OK) + +testMsgDeliveryQuotaExceeded :: Transport c => TProxy c -> c -> c -> IO () +testMsgDeliveryQuotaExceeded _ alice bob = do + connect (alice, "alice") (bob, "bob") + connect (alice, "alice2") (bob, "bob2") + forM_ [1 .. 4 :: Int] $ \i -> do + let corrId = bshow i + msg = "message " <> bshow i + (_, "bob", Right (MID mId)) <- alice #: (corrId, "bob", "SEND :" <> msg) + alice <#= \case ("", "bob", SENT m) -> m == mId; _ -> False + (_, "bob", Right (MID _)) <- alice #: ("5", "bob", "SEND :over quota") + + alice #: ("1", "bob2", "SEND :hello") #> ("1", "bob2", MID 1) + -- if delivery is blocked it won't go further + alice <# ("", "bob2", SENT 1) + connect :: forall c. Transport c => (c, ByteString) -> (c, ByteString) -> IO () connect (h1, name1) (h2, name2) = do ("c1", _, Right (INV cReq)) <- h1 #: ("c1", name2, "NEW INV") @@ -338,6 +385,16 @@ connect (h1, name1) (h2, name2) = do h2 <# ("", name1, CON) h1 <# ("", name2, CON) +sendMessage :: Transport c => (c, ConnId) -> (c, ConnId) -> ByteString -> IO () +sendMessage (h1, name1) (h2, name2) msg = do + ("m1", name2', Right (MID mId)) <- h1 #: ("m1", name2, "SEND :" <> msg) + name2' `shouldBe` name2 + h1 <#= \case ("", n, SENT m) -> n == name2 && m == mId; _ -> False + ("", name1', Right (MSG MsgMeta {recipient = (msgId, _)} msg')) <- (h2 <#:) + name1' `shouldBe` name1 + msg' `shouldBe` msg + h2 #: ("m2", name1, "ACK " <> bshow msgId) =#> \case ("m2", n, OK) -> n == name1; _ -> False + -- connect' :: forall c. Transport c => c -> c -> IO (ByteString, ByteString) -- connect' h1 h2 = do -- ("c1", conn2, Right (INV cReq)) <- h1 #: ("c1", "", "NEW INV")