Merge pull request #259 from simplex-chat/ep/message-delivery

concurrent message delivery with a separate thread/queue per connection
This commit is contained in:
Evgeny Poberezkin
2022-01-06 16:47:07 +00:00
committed by GitHub
8 changed files with 105 additions and 47 deletions

View File

@@ -1,3 +1,7 @@
# 0.5.2
- Fix message delivery logic that blocked delivery of all server messages when server per-queue quota exceeded, making it concurrent per SMP queue, not per server.
# 0.5.1
- Fix server subscription logic bug that was leading to memory leak / resource exhaustion in some edge cases.

View File

@@ -1,5 +1,5 @@
name: simplexmq
version: 0.5.1
version: 0.5.2
synopsis: SimpleXMQ message broker
description: |
This package includes <./docs/Simplex-Messaging-Server.html server>,

View File

@@ -5,7 +5,7 @@ cabal-version: 1.12
-- see: https://github.com/sol/hpack
name: simplexmq
version: 0.5.1
version: 0.5.2
synopsis: SimpleXMQ message broker
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
<./docs/Simplex-Messaging-Client.html client> and

View File

@@ -7,7 +7,6 @@
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
@@ -376,10 +375,10 @@ sendMessage' c connId msg =
enqueueMsg sq = enqueueMessage c connId sq $ A_MSG msg
enqueueMessage :: forall m. AgentMonad m => AgentClient -> ConnId -> SndQueue -> AMessage -> m AgentMsgId
enqueueMessage c connId sq@SndQueue {server} aMessage = do
enqueueMessage c connId sq aMessage = do
resumeMsgDelivery c connId sq
msgId <- storeSentMsg
queuePendingMsgs c connId server [msgId]
queuePendingMsgs c connId sq [msgId]
pure $ unId msgId
where
storeSentMsg :: m InternalId
@@ -398,15 +397,16 @@ enqueueMessage c connId sq@SndQueue {server} aMessage = do
pure internalId
resumeMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnId -> SndQueue -> m ()
resumeMsgDelivery c connId SndQueue {server} = do
unlessM srvDelivering $
async (runSrvMsgDelivery c server)
>>= atomically . modifyTVar (srvMsgDeliveries c) . M.insert server
resumeMsgDelivery c connId sq@SndQueue {server, sndId} = do
let qKey = (connId, server, sndId)
unlessM (queueDelivering qKey) $
async (runSmpQueueMsgDelivery c connId sq)
>>= atomically . modifyTVar (smpQueueMsgDeliveries c) . M.insert qKey
unlessM connQueued $
withStore (`getPendingMsgs` connId)
>>= queuePendingMsgs c connId server
>>= queuePendingMsgs c connId sq
where
srvDelivering = isJust . M.lookup server <$> readTVarIO (srvMsgDeliveries c)
queueDelivering qKey = isJust . M.lookup qKey <$> readTVarIO (smpQueueMsgDeliveries c)
connQueued =
atomically $
isJust
@@ -414,42 +414,42 @@ resumeMsgDelivery c connId SndQueue {server} = do
(connMsgsQueued c)
(\m -> (M.lookup connId m, M.insert connId True m))
queuePendingMsgs :: AgentMonad m => AgentClient -> ConnId -> SMPServer -> [InternalId] -> m ()
queuePendingMsgs c connId server msgIds = atomically $ do
q <- getPendingMsgQ c server
mapM_ (writeTQueue q . PendingMsg connId) msgIds
queuePendingMsgs :: AgentMonad m => AgentClient -> ConnId -> SndQueue -> [InternalId] -> m ()
queuePendingMsgs c connId sq msgIds = atomically $ do
q <- getPendingMsgQ c connId sq
mapM_ (writeTQueue q) msgIds
getPendingMsgQ :: AgentClient -> SMPServer -> STM (TQueue PendingMsg)
getPendingMsgQ c srv = do
maybe newMsgQueue pure . M.lookup srv =<< readTVar (srvMsgQueues c)
getPendingMsgQ :: AgentClient -> ConnId -> SndQueue -> STM (TQueue InternalId)
getPendingMsgQ c connId SndQueue {server, sndId} = do
let qKey = (connId, server, sndId)
maybe (newMsgQueue qKey) pure . M.lookup qKey =<< readTVar (smpQueueMsgQueues c)
where
newMsgQueue :: STM (TQueue PendingMsg)
newMsgQueue = do
newMsgQueue qKey = do
mq <- newTQueue
modifyTVar (srvMsgQueues c) $ M.insert srv mq
modifyTVar (smpQueueMsgQueues c) $ M.insert qKey mq
pure mq
runSrvMsgDelivery :: forall m. AgentMonad m => AgentClient -> SMPServer -> m ()
runSrvMsgDelivery c@AgentClient {subQ} srv = do
mq <- atomically $ getPendingMsgQ c srv
runSmpQueueMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnId -> SndQueue -> m ()
runSmpQueueMsgDelivery c@AgentClient {subQ} connId sq = do
mq <- atomically $ getPendingMsgQ c connId sq
ri <- asks $ reconnectInterval . config
forever $ do
PendingMsg {connId, msgId} <- atomically $ readTQueue mq
msgId <- atomically $ readTQueue mq
let mId = unId msgId
withStore (\st -> E.try $ getPendingMsgData st connId msgId) >>= \case
Left (e :: E.SomeException) ->
notify connId $ MERR mId (INTERNAL $ show e)
Right (sq, rq_, (msgType, msgBody)) -> do
notify $ MERR mId (INTERNAL $ show e)
Right (rq_, (msgType, msgBody)) -> do
withRetryInterval ri $ \loop -> do
tryError (sendAgentMessage c sq msgBody) >>= \case
Left e -> case e of
SMP SMP.QUOTA -> loop
SMP SMP.AUTH -> case msgType of
HELLO_ -> loop
REPLY_ -> notify connId $ ERR e
A_MSG_ -> notify connId $ MERR mId e
SMP {} -> notify connId $ MERR mId e
CMD {} -> notify connId $ MERR mId e
REPLY_ -> notify $ ERR e
A_MSG_ -> notify $ MERR mId e
SMP {} -> notify $ MERR mId e
CMD {} -> notify $ MERR mId e
_ -> loop
Right () -> do
case msgType of
@@ -459,15 +459,15 @@ runSrvMsgDelivery c@AgentClient {subQ} srv = do
-- party initiating connection
Just rq -> do
subscribeQueue c rq connId
notify connId CON
notify CON
-- party joining connection
_ -> createReplyQueue c connId sq
A_MSG_ -> notify connId $ SENT mId
A_MSG_ -> notify $ SENT mId
_ -> pure ()
withStore $ \st -> deleteMsg st connId msgId
where
notify :: ConnId -> ACommand 'Agent -> m ()
notify connId cmd = atomically $ writeTBQueue subQ ("", connId, cmd)
notify :: ACommand 'Agent -> m ()
notify cmd = atomically $ writeTBQueue subQ ("", connId, cmd)
ackMessage' :: forall m. AgentMonad m => AgentClient -> ConnId -> AgentMsgId -> m ()
ackMessage' c connId msgId = do

View File

@@ -77,8 +77,8 @@ data AgentClient = AgentClient
subscrConns :: TVar (Map ConnId SMPServer),
activations :: TVar (Map ConnId (Async ())), -- activations of send queues in progress
connMsgsQueued :: TVar (Map ConnId Bool),
srvMsgQueues :: TVar (Map SMPServer (TQueue PendingMsg)),
srvMsgDeliveries :: TVar (Map SMPServer (Async ())),
smpQueueMsgQueues :: TVar (Map (ConnId, SMPServer, SMP.SenderId) (TQueue InternalId)),
smpQueueMsgDeliveries :: TVar (Map (ConnId, SMPServer, SMP.SenderId) (Async ())),
reconnections :: TVar [Async ()],
clientId :: Int,
agentEnv :: Env,
@@ -97,12 +97,12 @@ newAgentClient agentEnv = do
subscrConns <- newTVar M.empty
activations <- newTVar M.empty
connMsgsQueued <- newTVar M.empty
srvMsgQueues <- newTVar M.empty
srvMsgDeliveries <- newTVar M.empty
smpQueueMsgQueues <- newTVar M.empty
smpQueueMsgDeliveries <- newTVar M.empty
reconnections <- newTVar []
clientId <- stateTVar (clientCounter agentEnv) $ \i -> (i + 1, i + 1)
lock <- newTMVar ()
return AgentClient {rcvQ, subQ, msgQ, smpClients, subscrSrvrs, subscrConns, activations, connMsgsQueued, srvMsgQueues, srvMsgDeliveries, reconnections, clientId, agentEnv, smpSubscriber = undefined, lock}
return AgentClient {rcvQ, subQ, msgQ, smpClients, subscrSrvrs, subscrConns, activations, connMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, reconnections, clientId, agentEnv, smpSubscriber = undefined, lock}
-- | Agent monad with MonadReader Env and MonadError AgentErrorType
type AgentMonad m = (MonadUnliftIO m, MonadReader Env m, MonadError AgentErrorType m)
@@ -178,7 +178,7 @@ closeAgentClient c = liftIO $ do
closeSMPServerClients c
cancelActions $ activations c
cancelActions $ reconnections c
cancelActions $ srvMsgDeliveries c
cancelActions $ smpQueueMsgDeliveries c
closeSMPServerClients :: AgentClient -> IO ()
closeSMPServerClients c = readTVarIO (smpClients c) >>= mapM_ closeSMPClient

View File

@@ -61,7 +61,7 @@ class Monad m => MonadAgentStore s m where
createRcvMsg :: s -> ConnId -> RcvMsgData -> m ()
updateSndIds :: s -> ConnId -> m (InternalId, InternalSndId, PrevSndMsgHash)
createSndMsg :: s -> ConnId -> SndMsgData -> m ()
getPendingMsgData :: s -> ConnId -> InternalId -> m (SndQueue, Maybe RcvQueue, (AMsgType, MsgBody))
getPendingMsgData :: s -> ConnId -> InternalId -> m (Maybe RcvQueue, (AMsgType, MsgBody))
getPendingMsgs :: s -> ConnId -> m [InternalId]
checkRcvMsg :: s -> ConnId -> InternalId -> m ()
deleteMsg :: s -> ConnId -> InternalId -> m ()

View File

@@ -439,10 +439,9 @@ instance (MonadUnliftIO m, MonadError StoreError m) => MonadAgentStore SQLiteSto
insertSndMsgDetails_ db connId sndMsgData
updateHashSnd_ db connId sndMsgData
getPendingMsgData :: SQLiteStore -> ConnId -> InternalId -> m (SndQueue, Maybe RcvQueue, (AMsgType, MsgBody))
getPendingMsgData :: SQLiteStore -> ConnId -> InternalId -> m (Maybe RcvQueue, (AMsgType, MsgBody))
getPendingMsgData st connId msgId =
liftIOEither . withTransaction st $ \db -> runExceptT $ do
sq <- ExceptT $ sndQueue <$> getSndQueueByConnAlias_ db connId
rq_ <- liftIO $ getRcvQueueByConnAlias_ db connId
msgData <-
ExceptT $
@@ -456,13 +455,11 @@ instance (MonadUnliftIO m, MonadError StoreError m) => MonadAgentStore SQLiteSto
WHERE m.conn_alias = ? AND m.internal_id = ?
|]
(connId, msgId)
pure (sq, rq_, msgData)
pure (rq_, msgData)
where
sndMsgData :: [(AMsgType, MsgBody)] -> Either StoreError (AMsgType, MsgBody)
sndMsgData [msgData] = Right msgData
sndMsgData _ = Left SEMsgNotFound
sndQueue :: Maybe SndQueue -> Either StoreError SndQueue
sndQueue = maybe (Left SEConnNotFound) Right
getPendingMsgs :: SQLiteStore -> ConnId -> m [InternalId]
getPendingMsgs st connId =

View File

@@ -14,6 +14,7 @@ import AgentTests.DoubleRatchetTests (doubleRatchetTests)
import AgentTests.FunctionalAPITests (functionalAPITests)
import AgentTests.SQLiteTests (storeTests)
import Control.Concurrent
import Control.Monad (forM_)
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Network.HTTP.Types (urlEncode)
@@ -24,6 +25,7 @@ import qualified Simplex.Messaging.Agent.Protocol as A
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (ErrorType (..), MsgBody)
import Simplex.Messaging.Transport (ATransport (..), TProxy (..), Transport (..))
import Simplex.Messaging.Util (bshow)
import System.Directory (removeFile)
import System.Timeout
import Test.Hspec
@@ -65,6 +67,10 @@ agentTests (ATransport t) = do
smpAgentTest2_2_2_needs_server $ testMsgDeliveryServerRestart t
it "should deliver pending messages after agent restarting" $
smpAgentTest1_1_1 $ testMsgDeliveryAgentRestart t
it "should concurrently deliver messages to connections without blocking" $
smpAgentTest2_2_1 $ testConcurrentMsgDelivery t
it "should deliver messages if one of connections has quota exceeded" $
smpAgentTest2_2_1 $ testMsgDeliveryQuotaExceeded t
-- | receive message to handle `h`
(<#:) :: Transport c => c -> IO (ATransmissionOrError 'Agent)
@@ -327,6 +333,47 @@ testMsgDeliveryAgentRestart t bob = do
withServer test' = withSmpServerStoreLogOn (ATransport t) testPort2 (const test') `shouldReturn` ()
withAgent = withSmpAgentThreadOn_ (ATransport t) (agentTestPort, testPort, testDB) (pure ()) . const . testSMPAgentClientOn agentTestPort
testConcurrentMsgDelivery :: Transport c => TProxy c -> c -> c -> IO ()
testConcurrentMsgDelivery _ alice bob = do
connect (alice, "alice") (bob, "bob")
("1", "bob2", Right (INV cReq)) <- alice #: ("1", "bob2", "NEW INV")
let cReq' = strEncode cReq
bob #: ("11", "alice2", "JOIN " <> cReq' <> " 14\nbob's connInfo") #> ("11", "alice2", OK)
("", "bob2", Right (CONF _confId "bob's connInfo")) <- (alice <#:)
-- below commands would be needed to accept bob's connection, but alice does not
-- alice #: ("2", "bob", "LET " <> _confId <> " 16\nalice's connInfo") #> ("2", "bob", OK)
-- bob <# ("", "alice", INFO "alice's connInfo")
-- bob <# ("", "alice", CON)
-- alice <# ("", "bob", CON)
-- the first connection should not be blocked by the second one
sendMessage (alice, "alice") (bob, "bob") "hello"
-- alice #: ("2", "bob", "SEND :hello") #> ("2", "bob", MID 1)
-- alice <# ("", "bob", SENT 1)
-- bob <#= \case ("", "alice", Msg "hello") -> True; _ -> False
-- bob #: ("12", "alice", "ACK 1") #> ("12", "alice", OK)
bob #: ("14", "alice", "SEND 9\nhello too") #> ("14", "alice", MID 2)
bob <# ("", "alice", SENT 2)
-- if delivery is blocked it won't go further
alice <#= \case ("", "bob", Msg "hello too") -> True; _ -> False
alice #: ("3", "bob", "ACK 2") #> ("3", "bob", OK)
testMsgDeliveryQuotaExceeded :: Transport c => TProxy c -> c -> c -> IO ()
testMsgDeliveryQuotaExceeded _ alice bob = do
connect (alice, "alice") (bob, "bob")
connect (alice, "alice2") (bob, "bob2")
forM_ [1 .. 4 :: Int] $ \i -> do
let corrId = bshow i
msg = "message " <> bshow i
(_, "bob", Right (MID mId)) <- alice #: (corrId, "bob", "SEND :" <> msg)
alice <#= \case ("", "bob", SENT m) -> m == mId; _ -> False
(_, "bob", Right (MID _)) <- alice #: ("5", "bob", "SEND :over quota")
alice #: ("1", "bob2", "SEND :hello") #> ("1", "bob2", MID 1)
-- if delivery is blocked it won't go further
alice <# ("", "bob2", SENT 1)
connect :: forall c. Transport c => (c, ByteString) -> (c, ByteString) -> IO ()
connect (h1, name1) (h2, name2) = do
("c1", _, Right (INV cReq)) <- h1 #: ("c1", name2, "NEW INV")
@@ -338,6 +385,16 @@ connect (h1, name1) (h2, name2) = do
h2 <# ("", name1, CON)
h1 <# ("", name2, CON)
sendMessage :: Transport c => (c, ConnId) -> (c, ConnId) -> ByteString -> IO ()
sendMessage (h1, name1) (h2, name2) msg = do
("m1", name2', Right (MID mId)) <- h1 #: ("m1", name2, "SEND :" <> msg)
name2' `shouldBe` name2
h1 <#= \case ("", n, SENT m) -> n == name2 && m == mId; _ -> False
("", name1', Right (MSG MsgMeta {recipient = (msgId, _)} msg')) <- (h2 <#:)
name1' `shouldBe` name1
msg' `shouldBe` msg
h2 #: ("m2", name1, "ACK " <> bshow msgId) =#> \case ("m2", n, OK) -> n == name1; _ -> False
-- connect' :: forall c. Transport c => c -> c -> IO (ByteString, ByteString)
-- connect' h1 h2 = do
-- ("c1", conn2, Right (INV cReq)) <- h1 #: ("c1", "", "NEW INV")