Merge branch 'stable' into ep/message-delivery

This commit is contained in:
Evgeny Poberezkin
2022-01-06 16:38:22 +00:00
6 changed files with 48 additions and 46 deletions

View File

@@ -1,3 +1,7 @@
# 0.5.2
- Fix message delivery logic that blocked delivery of all server messages when server per-queue quota exceeded, making it concurrent per SMP queue, not per server.
# 0.5.1
- Fix server subscription logic bug that was leading to memory leak / resource exhaustion in some edge cases.

View File

@@ -1,5 +1,5 @@
name: simplexmq
version: 0.5.1
version: 0.5.2
synopsis: SimpleXMQ message broker
description: |
This package includes <./docs/Simplex-Messaging-Server.html server>,

View File

@@ -5,7 +5,7 @@ cabal-version: 1.12
-- see: https://github.com/sol/hpack
name: simplexmq
version: 0.5.1
version: 0.5.2
synopsis: SimpleXMQ message broker
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
<./docs/Simplex-Messaging-Client.html client> and

View File

@@ -7,7 +7,6 @@
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
@@ -376,10 +375,10 @@ sendMessage' c connId msg =
enqueueMsg sq = enqueueMessage c connId sq $ A_MSG msg
enqueueMessage :: forall m. AgentMonad m => AgentClient -> ConnId -> SndQueue -> AMessage -> m AgentMsgId
enqueueMessage c connId sq@SndQueue {server} aMessage = do
enqueueMessage c connId sq aMessage = do
resumeMsgDelivery c connId sq
msgId <- storeSentMsg
queuePendingMsgs c connId server [msgId]
queuePendingMsgs c connId sq [msgId]
pure $ unId msgId
where
storeSentMsg :: m InternalId
@@ -398,15 +397,16 @@ enqueueMessage c connId sq@SndQueue {server} aMessage = do
pure internalId
resumeMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnId -> SndQueue -> m ()
resumeMsgDelivery c connId SndQueue {server} = do
unlessM srvDelivering $
async (runSrvMsgDelivery c server)
>>= atomically . modifyTVar (srvMsgDeliveries c) . M.insert server
resumeMsgDelivery c connId sq@SndQueue {server, sndId} = do
let qKey = (connId, server, sndId)
unlessM (queueDelivering qKey) $
async (runSmpQueueMsgDelivery c connId sq)
>>= atomically . modifyTVar (smpQueueMsgDeliveries c) . M.insert qKey
unlessM connQueued $
withStore (`getPendingMsgs` connId)
>>= queuePendingMsgs c connId server
>>= queuePendingMsgs c connId sq
where
srvDelivering = isJust . M.lookup server <$> readTVarIO (srvMsgDeliveries c)
queueDelivering qKey = isJust . M.lookup qKey <$> readTVarIO (smpQueueMsgDeliveries c)
connQueued =
atomically $
isJust
@@ -414,31 +414,31 @@ resumeMsgDelivery c connId SndQueue {server} = do
(connMsgsQueued c)
(\m -> (M.lookup connId m, M.insert connId True m))
queuePendingMsgs :: AgentMonad m => AgentClient -> ConnId -> SMPServer -> [InternalId] -> m ()
queuePendingMsgs c connId server msgIds = atomically $ do
q <- getPendingMsgQ c server
mapM_ (writeTQueue q . PendingMsg connId) msgIds
queuePendingMsgs :: AgentMonad m => AgentClient -> ConnId -> SndQueue -> [InternalId] -> m ()
queuePendingMsgs c connId sq msgIds = atomically $ do
q <- getPendingMsgQ c connId sq
mapM_ (writeTQueue q) msgIds
getPendingMsgQ :: AgentClient -> SMPServer -> STM (TQueue PendingMsg)
getPendingMsgQ c srv = do
maybe newMsgQueue pure . M.lookup srv =<< readTVar (srvMsgQueues c)
getPendingMsgQ :: AgentClient -> ConnId -> SndQueue -> STM (TQueue InternalId)
getPendingMsgQ c connId SndQueue {server, sndId} = do
let qKey = (connId, server, sndId)
maybe (newMsgQueue qKey) pure . M.lookup qKey =<< readTVar (smpQueueMsgQueues c)
where
newMsgQueue :: STM (TQueue PendingMsg)
newMsgQueue = do
newMsgQueue qKey = do
mq <- newTQueue
modifyTVar (srvMsgQueues c) $ M.insert srv mq
modifyTVar (smpQueueMsgQueues c) $ M.insert qKey mq
pure mq
runSrvMsgDelivery :: forall m. AgentMonad m => AgentClient -> SMPServer -> m ()
runSrvMsgDelivery c@AgentClient {subQ} srv = do
mq <- atomically $ getPendingMsgQ c srv
runSmpQueueMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnId -> SndQueue -> m ()
runSmpQueueMsgDelivery c@AgentClient {subQ} connId sq = do
mq <- atomically $ getPendingMsgQ c connId sq
ri <- asks $ reconnectInterval . config
forever $ do
PendingMsg {connId, msgId} <- atomically $ readTQueue mq
msgId <- atomically $ readTQueue mq
let mId = unId msgId
withStore (\st -> E.try $ getPendingMsgData st connId msgId) >>= \case
Left (e :: E.SomeException) ->
notify connId $ MERR mId (INTERNAL $ show e)
notify $ MERR mId (INTERNAL $ show e)
Right (sq, rq_, (msgType, msgBody)) -> do
withRetryInterval ri $ \loop -> do
tryError (sendAgentMessage c sq msgBody) >>= \case
@@ -446,10 +446,10 @@ runSrvMsgDelivery c@AgentClient {subQ} srv = do
SMP SMP.QUOTA -> loop
SMP SMP.AUTH -> case msgType of
HELLO_ -> loop
REPLY_ -> notify connId $ ERR e
A_MSG_ -> notify connId $ MERR mId e
SMP {} -> notify connId $ MERR mId e
CMD {} -> notify connId $ MERR mId e
REPLY_ -> notify $ ERR e
A_MSG_ -> notify $ MERR mId e
SMP {} -> notify $ MERR mId e
CMD {} -> notify $ MERR mId e
_ -> loop
Right () -> do
case msgType of
@@ -459,15 +459,15 @@ runSrvMsgDelivery c@AgentClient {subQ} srv = do
-- party initiating connection
Just rq -> do
subscribeQueue c rq connId
notify connId CON
notify CON
-- party joining connection
_ -> createReplyQueue c connId sq
A_MSG_ -> notify connId $ SENT mId
A_MSG_ -> notify $ SENT mId
_ -> pure ()
withStore $ \st -> deleteMsg st connId msgId
where
notify :: ConnId -> ACommand 'Agent -> m ()
notify connId cmd = atomically $ writeTBQueue subQ ("", connId, cmd)
notify :: ACommand 'Agent -> m ()
notify cmd = atomically $ writeTBQueue subQ ("", connId, cmd)
ackMessage' :: forall m. AgentMonad m => AgentClient -> ConnId -> AgentMsgId -> m ()
ackMessage' c connId msgId = do

View File

@@ -77,8 +77,8 @@ data AgentClient = AgentClient
subscrConns :: TVar (Map ConnId SMPServer),
activations :: TVar (Map ConnId (Async ())), -- activations of send queues in progress
connMsgsQueued :: TVar (Map ConnId Bool),
srvMsgQueues :: TVar (Map SMPServer (TQueue PendingMsg)),
srvMsgDeliveries :: TVar (Map SMPServer (Async ())),
smpQueueMsgQueues :: TVar (Map (ConnId, SMPServer, SMP.SenderId) (TQueue InternalId)),
smpQueueMsgDeliveries :: TVar (Map (ConnId, SMPServer, SMP.SenderId) (Async ())),
reconnections :: TVar [Async ()],
clientId :: Int,
agentEnv :: Env,
@@ -97,12 +97,12 @@ newAgentClient agentEnv = do
subscrConns <- newTVar M.empty
activations <- newTVar M.empty
connMsgsQueued <- newTVar M.empty
srvMsgQueues <- newTVar M.empty
srvMsgDeliveries <- newTVar M.empty
smpQueueMsgQueues <- newTVar M.empty
smpQueueMsgDeliveries <- newTVar M.empty
reconnections <- newTVar []
clientId <- stateTVar (clientCounter agentEnv) $ \i -> (i + 1, i + 1)
lock <- newTMVar ()
return AgentClient {rcvQ, subQ, msgQ, smpClients, subscrSrvrs, subscrConns, activations, connMsgsQueued, srvMsgQueues, srvMsgDeliveries, reconnections, clientId, agentEnv, smpSubscriber = undefined, lock}
return AgentClient {rcvQ, subQ, msgQ, smpClients, subscrSrvrs, subscrConns, activations, connMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, reconnections, clientId, agentEnv, smpSubscriber = undefined, lock}
-- | Agent monad with MonadReader Env and MonadError AgentErrorType
type AgentMonad m = (MonadUnliftIO m, MonadReader Env m, MonadError AgentErrorType m)
@@ -178,7 +178,7 @@ closeAgentClient c = liftIO $ do
closeSMPServerClients c
cancelActions $ activations c
cancelActions $ reconnections c
cancelActions $ srvMsgDeliveries c
cancelActions $ smpQueueMsgDeliveries c
closeSMPServerClients :: AgentClient -> IO ()
closeSMPServerClients c = readTVarIO (smpClients c) >>= mapM_ closeSMPClient

View File

@@ -67,9 +67,9 @@ agentTests (ATransport t) = do
smpAgentTest2_2_2_needs_server $ testMsgDeliveryServerRestart t
it "should deliver pending messages after agent restarting" $
smpAgentTest1_1_1 $ testMsgDeliveryAgentRestart t
xit "should concurrently deliver messages to connections without blocking" $
it "should concurrently deliver messages to connections without blocking" $
smpAgentTest2_2_1 $ testConcurrentMsgDelivery t
xit "should deliver messages if one of connections has quota exceeded" $
it "should deliver messages if one of connections has quota exceeded" $
smpAgentTest2_2_1 $ testMsgDeliveryQuotaExceeded t
-- | receive message to handle `h`
@@ -354,9 +354,8 @@ testConcurrentMsgDelivery _ alice bob = do
-- bob <#= \case ("", "alice", Msg "hello") -> True; _ -> False
-- bob #: ("12", "alice", "ACK 1") #> ("12", "alice", OK)
bob #: ("14", "alice", "SEND 9\nhello too") #> ("14", "alice", MID 2)
putStrLn "it gets this far"
bob <# ("", "alice", SENT 2)
putStrLn "it never gets here as the message is blocked by HELLO in in another connection"
-- if delivery is blocked it won't go further
alice <#= \case ("", "bob", Msg "hello too") -> True; _ -> False
alice #: ("3", "bob", "ACK 2") #> ("3", "bob", OK)
@@ -372,9 +371,8 @@ testMsgDeliveryQuotaExceeded _ alice bob = do
(_, "bob", Right (MID _)) <- alice #: ("5", "bob", "SEND :over quota")
alice #: ("1", "bob2", "SEND :hello") #> ("1", "bob2", MID 1)
putStrLn "it gets this far"
-- if delivery is blocked it won't go further
alice <# ("", "bob2", SENT 1)
putStrLn "it never gets here as the message is blocked by MSG in in another connection"
connect :: forall c. Transport c => (c, ByteString) -> (c, ByteString) -> IO ()
connect (h1, name1) (h2, name2) = do