diff --git a/CHANGELOG.md b/CHANGELOG.md index 29ae11e15..f455aeda6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.5.2 + +- Fix message delivery logic that blocked delivery of all server messages when server per-queue quota exceeded, making it concurrent per SMP queue, not per server. + # 0.5.1 - Fix server subscription logic bug that was leading to memory leak / resource exhaustion in some edge cases. diff --git a/package.yaml b/package.yaml index ef50b5b27..3e62cc01b 100644 --- a/package.yaml +++ b/package.yaml @@ -1,5 +1,5 @@ name: simplexmq -version: 0.5.1 +version: 0.5.2 synopsis: SimpleXMQ message broker description: | This package includes <./docs/Simplex-Messaging-Server.html server>, diff --git a/simplexmq.cabal b/simplexmq.cabal index c91f87484..7b13ec2ed 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -5,7 +5,7 @@ cabal-version: 1.12 -- see: https://github.com/sol/hpack name: simplexmq -version: 0.5.1 +version: 0.5.2 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index e30bc236d..dcb0c29dc 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -7,7 +7,6 @@ {-# LANGUAGE KindSignatures #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} -{-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE OverloadedLists #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RankNTypes #-} @@ -376,10 +375,10 @@ sendMessage' c connId msg = enqueueMsg sq = enqueueMessage c connId sq $ A_MSG msg enqueueMessage :: forall m. AgentMonad m => AgentClient -> ConnId -> SndQueue -> AMessage -> m AgentMsgId -enqueueMessage c connId sq@SndQueue {server} aMessage = do +enqueueMessage c connId sq aMessage = do resumeMsgDelivery c connId sq msgId <- storeSentMsg - queuePendingMsgs c connId server [msgId] + queuePendingMsgs c connId sq [msgId] pure $ unId msgId where storeSentMsg :: m InternalId @@ -398,15 +397,16 @@ enqueueMessage c connId sq@SndQueue {server} aMessage = do pure internalId resumeMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnId -> SndQueue -> m () -resumeMsgDelivery c connId SndQueue {server} = do - unlessM srvDelivering $ - async (runSrvMsgDelivery c server) - >>= atomically . modifyTVar (srvMsgDeliveries c) . M.insert server +resumeMsgDelivery c connId sq@SndQueue {server, sndId} = do + let qKey = (connId, server, sndId) + unlessM (queueDelivering qKey) $ + async (runSmpQueueMsgDelivery c connId sq) + >>= atomically . modifyTVar (smpQueueMsgDeliveries c) . M.insert qKey unlessM connQueued $ withStore (`getPendingMsgs` connId) - >>= queuePendingMsgs c connId server + >>= queuePendingMsgs c connId sq where - srvDelivering = isJust . M.lookup server <$> readTVarIO (srvMsgDeliveries c) + queueDelivering qKey = isJust . M.lookup qKey <$> readTVarIO (smpQueueMsgDeliveries c) connQueued = atomically $ isJust @@ -414,31 +414,31 @@ resumeMsgDelivery c connId SndQueue {server} = do (connMsgsQueued c) (\m -> (M.lookup connId m, M.insert connId True m)) -queuePendingMsgs :: AgentMonad m => AgentClient -> ConnId -> SMPServer -> [InternalId] -> m () -queuePendingMsgs c connId server msgIds = atomically $ do - q <- getPendingMsgQ c server - mapM_ (writeTQueue q . PendingMsg connId) msgIds +queuePendingMsgs :: AgentMonad m => AgentClient -> ConnId -> SndQueue -> [InternalId] -> m () +queuePendingMsgs c connId sq msgIds = atomically $ do + q <- getPendingMsgQ c connId sq + mapM_ (writeTQueue q) msgIds -getPendingMsgQ :: AgentClient -> SMPServer -> STM (TQueue PendingMsg) -getPendingMsgQ c srv = do - maybe newMsgQueue pure . M.lookup srv =<< readTVar (srvMsgQueues c) +getPendingMsgQ :: AgentClient -> ConnId -> SndQueue -> STM (TQueue InternalId) +getPendingMsgQ c connId SndQueue {server, sndId} = do + let qKey = (connId, server, sndId) + maybe (newMsgQueue qKey) pure . M.lookup qKey =<< readTVar (smpQueueMsgQueues c) where - newMsgQueue :: STM (TQueue PendingMsg) - newMsgQueue = do + newMsgQueue qKey = do mq <- newTQueue - modifyTVar (srvMsgQueues c) $ M.insert srv mq + modifyTVar (smpQueueMsgQueues c) $ M.insert qKey mq pure mq -runSrvMsgDelivery :: forall m. AgentMonad m => AgentClient -> SMPServer -> m () -runSrvMsgDelivery c@AgentClient {subQ} srv = do - mq <- atomically $ getPendingMsgQ c srv +runSmpQueueMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnId -> SndQueue -> m () +runSmpQueueMsgDelivery c@AgentClient {subQ} connId sq = do + mq <- atomically $ getPendingMsgQ c connId sq ri <- asks $ reconnectInterval . config forever $ do - PendingMsg {connId, msgId} <- atomically $ readTQueue mq + msgId <- atomically $ readTQueue mq let mId = unId msgId withStore (\st -> E.try $ getPendingMsgData st connId msgId) >>= \case Left (e :: E.SomeException) -> - notify connId $ MERR mId (INTERNAL $ show e) + notify $ MERR mId (INTERNAL $ show e) Right (sq, rq_, (msgType, msgBody)) -> do withRetryInterval ri $ \loop -> do tryError (sendAgentMessage c sq msgBody) >>= \case @@ -446,10 +446,10 @@ runSrvMsgDelivery c@AgentClient {subQ} srv = do SMP SMP.QUOTA -> loop SMP SMP.AUTH -> case msgType of HELLO_ -> loop - REPLY_ -> notify connId $ ERR e - A_MSG_ -> notify connId $ MERR mId e - SMP {} -> notify connId $ MERR mId e - CMD {} -> notify connId $ MERR mId e + REPLY_ -> notify $ ERR e + A_MSG_ -> notify $ MERR mId e + SMP {} -> notify $ MERR mId e + CMD {} -> notify $ MERR mId e _ -> loop Right () -> do case msgType of @@ -459,15 +459,15 @@ runSrvMsgDelivery c@AgentClient {subQ} srv = do -- party initiating connection Just rq -> do subscribeQueue c rq connId - notify connId CON + notify CON -- party joining connection _ -> createReplyQueue c connId sq - A_MSG_ -> notify connId $ SENT mId + A_MSG_ -> notify $ SENT mId _ -> pure () withStore $ \st -> deleteMsg st connId msgId where - notify :: ConnId -> ACommand 'Agent -> m () - notify connId cmd = atomically $ writeTBQueue subQ ("", connId, cmd) + notify :: ACommand 'Agent -> m () + notify cmd = atomically $ writeTBQueue subQ ("", connId, cmd) ackMessage' :: forall m. AgentMonad m => AgentClient -> ConnId -> AgentMsgId -> m () ackMessage' c connId msgId = do diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index c3f5277e0..34268b5ea 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -77,8 +77,8 @@ data AgentClient = AgentClient subscrConns :: TVar (Map ConnId SMPServer), activations :: TVar (Map ConnId (Async ())), -- activations of send queues in progress connMsgsQueued :: TVar (Map ConnId Bool), - srvMsgQueues :: TVar (Map SMPServer (TQueue PendingMsg)), - srvMsgDeliveries :: TVar (Map SMPServer (Async ())), + smpQueueMsgQueues :: TVar (Map (ConnId, SMPServer, SMP.SenderId) (TQueue InternalId)), + smpQueueMsgDeliveries :: TVar (Map (ConnId, SMPServer, SMP.SenderId) (Async ())), reconnections :: TVar [Async ()], clientId :: Int, agentEnv :: Env, @@ -97,12 +97,12 @@ newAgentClient agentEnv = do subscrConns <- newTVar M.empty activations <- newTVar M.empty connMsgsQueued <- newTVar M.empty - srvMsgQueues <- newTVar M.empty - srvMsgDeliveries <- newTVar M.empty + smpQueueMsgQueues <- newTVar M.empty + smpQueueMsgDeliveries <- newTVar M.empty reconnections <- newTVar [] clientId <- stateTVar (clientCounter agentEnv) $ \i -> (i + 1, i + 1) lock <- newTMVar () - return AgentClient {rcvQ, subQ, msgQ, smpClients, subscrSrvrs, subscrConns, activations, connMsgsQueued, srvMsgQueues, srvMsgDeliveries, reconnections, clientId, agentEnv, smpSubscriber = undefined, lock} + return AgentClient {rcvQ, subQ, msgQ, smpClients, subscrSrvrs, subscrConns, activations, connMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, reconnections, clientId, agentEnv, smpSubscriber = undefined, lock} -- | Agent monad with MonadReader Env and MonadError AgentErrorType type AgentMonad m = (MonadUnliftIO m, MonadReader Env m, MonadError AgentErrorType m) @@ -178,7 +178,7 @@ closeAgentClient c = liftIO $ do closeSMPServerClients c cancelActions $ activations c cancelActions $ reconnections c - cancelActions $ srvMsgDeliveries c + cancelActions $ smpQueueMsgDeliveries c closeSMPServerClients :: AgentClient -> IO () closeSMPServerClients c = readTVarIO (smpClients c) >>= mapM_ closeSMPClient diff --git a/tests/AgentTests.hs b/tests/AgentTests.hs index d7ba1c2b6..e30ea7c1f 100644 --- a/tests/AgentTests.hs +++ b/tests/AgentTests.hs @@ -67,9 +67,9 @@ agentTests (ATransport t) = do smpAgentTest2_2_2_needs_server $ testMsgDeliveryServerRestart t it "should deliver pending messages after agent restarting" $ smpAgentTest1_1_1 $ testMsgDeliveryAgentRestart t - xit "should concurrently deliver messages to connections without blocking" $ + it "should concurrently deliver messages to connections without blocking" $ smpAgentTest2_2_1 $ testConcurrentMsgDelivery t - xit "should deliver messages if one of connections has quota exceeded" $ + it "should deliver messages if one of connections has quota exceeded" $ smpAgentTest2_2_1 $ testMsgDeliveryQuotaExceeded t -- | receive message to handle `h` @@ -354,9 +354,8 @@ testConcurrentMsgDelivery _ alice bob = do -- bob <#= \case ("", "alice", Msg "hello") -> True; _ -> False -- bob #: ("12", "alice", "ACK 1") #> ("12", "alice", OK) bob #: ("14", "alice", "SEND 9\nhello too") #> ("14", "alice", MID 2) - putStrLn "it gets this far" bob <# ("", "alice", SENT 2) - putStrLn "it never gets here as the message is blocked by HELLO in in another connection" + -- if delivery is blocked it won't go further alice <#= \case ("", "bob", Msg "hello too") -> True; _ -> False alice #: ("3", "bob", "ACK 2") #> ("3", "bob", OK) @@ -372,9 +371,8 @@ testMsgDeliveryQuotaExceeded _ alice bob = do (_, "bob", Right (MID _)) <- alice #: ("5", "bob", "SEND :over quota") alice #: ("1", "bob2", "SEND :hello") #> ("1", "bob2", MID 1) - putStrLn "it gets this far" + -- if delivery is blocked it won't go further alice <# ("", "bob2", SENT 1) - putStrLn "it never gets here as the message is blocked by MSG in in another connection" connect :: forall c. Transport c => (c, ByteString) -> (c, ByteString) -> IO () connect (h1, name1) (h2, name2) = do