mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-14 16:15:12 +00:00
smp server: get message queue faster, avoiding STM contention if queue exists, split transaction for notification delivery (#1289)
* put DRG state to IORef, split STM transaction of sending notification (#1288)
* put DRG state to IORef, split STM transaction of sending notification
* remove comment
* remove comment
* add comment
* revert version
* smp server: get message queue faster, avoiding STM contention if queue exists
* IORef for counter
* Revert "put DRG state to IORef, split STM transaction of sending notification (#1288)"
This reverts commit 517933d189.
* version
* remove IORef
* split notification delivery transations
* revert version
This commit is contained in:
+1
-1
@@ -1,5 +1,5 @@
|
||||
name: simplexmq
|
||||
version: 6.0.2.0
|
||||
version: 6.0.2
|
||||
synopsis: SimpleXMQ message broker
|
||||
description: |
|
||||
This package includes <./docs/Simplex-Messaging-Server.html server>,
|
||||
|
||||
+1
-1
@@ -5,7 +5,7 @@ cabal-version: 1.12
|
||||
-- see: https://github.com/sol/hpack
|
||||
|
||||
name: simplexmq
|
||||
version: 6.0.2.0
|
||||
version: 6.0.2
|
||||
synopsis: SimpleXMQ message broker
|
||||
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
|
||||
<./docs/Simplex-Messaging-Client.html client> and
|
||||
|
||||
@@ -44,7 +44,6 @@ import Control.Monad.Except
|
||||
import Control.Monad.IO.Unlift
|
||||
import Control.Monad.Reader
|
||||
import Control.Monad.Trans.Except
|
||||
import Crypto.Random
|
||||
import Control.Monad.STM (retry)
|
||||
import Data.Bifunctor (first)
|
||||
import Data.ByteString.Base64 (encode)
|
||||
@@ -247,7 +246,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
old <- liftIO $ expireBeforeEpoch expCfg
|
||||
rIds <- M.keysSet <$> readTVarIO ms
|
||||
forM_ rIds $ \rId -> do
|
||||
q <- atomically (getMsgQueue ms rId quota)
|
||||
q <- liftIO $ getMsgQueue ms rId quota
|
||||
deleted <- atomically $ deleteExpiredMsgs q old
|
||||
liftIO $ atomicModifyIORef'_ (msgExpired stats) (+ deleted)
|
||||
|
||||
@@ -1255,15 +1254,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
|
||||
Just (msg, wasEmpty) -> time "SEND ok" $ do
|
||||
when wasEmpty $ tryDeliverMessage msg
|
||||
when (notification msgFlags) $ do
|
||||
forM_ (notifier qr) $ \ntf -> do
|
||||
asks random >>= atomically . trySendNotification ntf msg >>= \case
|
||||
Nothing -> do
|
||||
incStat $ msgNtfNoSub stats
|
||||
logWarn "No notification subscription"
|
||||
Just False -> do
|
||||
incStat $ msgNtfLost stats
|
||||
logWarn "Dropped message notification"
|
||||
Just True -> incStat $ msgNtfs stats
|
||||
mapM_ (`trySendNotification` msg) (notifier qr)
|
||||
incStat $ msgSentNtf stats
|
||||
liftIO $ updatePeriodStats (activeQueuesNtf stats) (recipientId qr)
|
||||
incStat $ msgSent stats
|
||||
@@ -1335,23 +1326,35 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
|
||||
deliver q s
|
||||
writeTVar st NoSub
|
||||
|
||||
trySendNotification :: NtfCreds -> Message -> TVar ChaChaDRG -> STM (Maybe Bool)
|
||||
trySendNotification NtfCreds {notifierId, rcvNtfDhSecret} msg ntfNonceDrg =
|
||||
mapM (writeNtf notifierId msg rcvNtfDhSecret ntfNonceDrg) =<< TM.lookup notifierId notifiers
|
||||
trySendNotification :: NtfCreds -> Message -> M ()
|
||||
trySendNotification NtfCreds {notifierId, rcvNtfDhSecret} msg = do
|
||||
stats <- asks serverStats
|
||||
liftIO (TM.lookupIO notifierId notifiers) >>= \case
|
||||
Nothing -> do
|
||||
incStat $ msgNtfNoSub stats
|
||||
logWarn "No notification subscription"
|
||||
Just ntfClnt -> do
|
||||
let updateStats True = incStat $ msgNtfs stats
|
||||
updateStats _ = do
|
||||
incStat $ msgNtfLost stats
|
||||
logWarn "Dropped message notification"
|
||||
writeNtf notifierId msg rcvNtfDhSecret ntfClnt >>= mapM_ updateStats
|
||||
|
||||
writeNtf :: NotifierId -> Message -> RcvNtfDhSecret -> TVar ChaChaDRG -> Client -> STM Bool
|
||||
writeNtf nId msg rcvNtfDhSecret ntfNonceDrg Client {sndQ = q} =
|
||||
ifM (isFullTBQueue q) (pure False) (sendNtf $> True)
|
||||
where
|
||||
sendNtf = case msg of
|
||||
Message {msgId, msgTs} -> do
|
||||
(nmsgNonce, encNMsgMeta) <- mkMessageNotification msgId msgTs rcvNtfDhSecret ntfNonceDrg
|
||||
writeTBQueue q [(CorrId "", nId, NMSG nmsgNonce encNMsgMeta)]
|
||||
_ -> pure ()
|
||||
writeNtf :: NotifierId -> Message -> RcvNtfDhSecret -> Client -> M (Maybe Bool)
|
||||
writeNtf nId msg rcvNtfDhSecret Client {sndQ = q} = case msg of
|
||||
Message {msgId, msgTs} -> Just <$> do
|
||||
(nmsgNonce, encNMsgMeta) <- mkMessageNotification msgId msgTs rcvNtfDhSecret
|
||||
-- must be in one STM transaction to avoid the queue becoming full between the check and writing
|
||||
atomically $
|
||||
ifM
|
||||
(isFullTBQueue q)
|
||||
(pure $ False)
|
||||
(True <$ writeTBQueue q [(CorrId "", nId, NMSG nmsgNonce encNMsgMeta)])
|
||||
_ -> pure Nothing
|
||||
|
||||
mkMessageNotification :: ByteString -> SystemTime -> RcvNtfDhSecret -> TVar ChaChaDRG -> STM (C.CbNonce, EncNMsgMeta)
|
||||
mkMessageNotification msgId msgTs rcvNtfDhSecret ntfNonceDrg = do
|
||||
cbNonce <- C.randomCbNonce ntfNonceDrg
|
||||
mkMessageNotification :: ByteString -> SystemTime -> RcvNtfDhSecret -> M (C.CbNonce, EncNMsgMeta)
|
||||
mkMessageNotification msgId msgTs rcvNtfDhSecret = do
|
||||
cbNonce <- atomically . C.randomCbNonce =<< asks random
|
||||
let msgMeta = NMsgMeta {msgId, msgTs}
|
||||
encNMsgMeta = C.cbEncrypt rcvNtfDhSecret cbNonce (smpEncode msgMeta) 128
|
||||
pure . (cbNonce,) $ fromRight "" encNMsgMeta
|
||||
@@ -1441,7 +1444,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
|
||||
getStoreMsgQueue name rId = time (name <> " getMsgQueue") $ do
|
||||
ms <- asks msgStore
|
||||
quota <- asks $ msgQueueQuota . config
|
||||
atomically $ getMsgQueue ms rId quota
|
||||
liftIO $ getMsgQueue ms rId quota
|
||||
|
||||
delQueueAndMsgs :: QueueStore -> M (Transmission BrokerMsg)
|
||||
delQueueAndMsgs st = do
|
||||
@@ -1459,24 +1462,23 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
|
||||
|
||||
getQueueInfo :: QueueRec -> M (Transmission BrokerMsg)
|
||||
getQueueInfo QueueRec {senderKey, notifier} = do
|
||||
q@MsgQueue {size} <- getStoreMsgQueue "getQueueInfo" entId
|
||||
info <- atomically $ do
|
||||
qiSub <- TM.lookup entId subscriptions >>= mapM mkQSub
|
||||
qiSize <- readTVar size
|
||||
qiMsg <- toMsgInfo <$$> tryPeekMsg q
|
||||
pure QueueInfo {qiSnd = isJust senderKey, qiNtf = isJust notifier, qiSub, qiSize, qiMsg}
|
||||
q <- getStoreMsgQueue "getQueueInfo" entId
|
||||
qiSub <- liftIO $ TM.lookupIO entId subscriptions >>= mapM mkQSub
|
||||
qiSize <- liftIO $ getQueueSize q
|
||||
qiMsg <- atomically $ toMsgInfo <$$> tryPeekMsg q
|
||||
let info = QueueInfo {qiSnd = isJust senderKey, qiNtf = isJust notifier, qiSub, qiSize, qiMsg}
|
||||
pure (corrId, entId, INFO info)
|
||||
where
|
||||
mkQSub Sub {subThread, delivered} = do
|
||||
qSubThread <- case subThread of
|
||||
ServerSub t -> do
|
||||
st <- readTVar t
|
||||
st <- readTVarIO t
|
||||
pure $ case st of
|
||||
NoSub -> QNoSub
|
||||
SubPending -> QSubPending
|
||||
SubThread _ -> QSubThread
|
||||
ProhibitSub -> pure QProhibitSub
|
||||
qDelivered <- decodeLatin1 . encode <$$> tryReadTMVar delivered
|
||||
qDelivered <- atomically $ decodeLatin1 . encode <$$> tryReadTMVar delivered
|
||||
pure QSub {qSubThread, qDelivered}
|
||||
|
||||
ok :: Transmission BrokerMsg
|
||||
@@ -1564,13 +1566,12 @@ restoreServerMessages =
|
||||
where
|
||||
s = LB.toStrict s'
|
||||
addToMsgQueue rId msg = do
|
||||
(isExpired, logFull) <- atomically $ do
|
||||
q <- getMsgQueue ms rId quota
|
||||
case msg of
|
||||
Message {msgTs}
|
||||
| maybe True (systemSeconds msgTs >=) old_ -> (False,) . isNothing <$> writeMsg q msg
|
||||
| otherwise -> pure (True, False)
|
||||
MessageQuota {} -> writeMsg q msg $> (False, False)
|
||||
q <- liftIO $ getMsgQueue ms rId quota
|
||||
(isExpired, logFull) <- atomically $ case msg of
|
||||
Message {msgTs}
|
||||
| maybe True (systemSeconds msgTs >=) old_ -> (False,) . isNothing <$> writeMsg q msg
|
||||
| otherwise -> pure (True, False)
|
||||
MessageQuota {} -> writeMsg q msg $> (False, False)
|
||||
when logFull . logError . decodeLatin1 $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (messageId msg)
|
||||
pure $ if isExpired then expired + 1 else expired
|
||||
msgErr :: Show e => String -> e -> String
|
||||
@@ -1595,7 +1596,7 @@ restoreServerStats expiredWhileRestoring = asks (serverStatsBackupFile . config)
|
||||
Right d@ServerStatsData {_qCount = statsQCount} -> do
|
||||
s <- asks serverStats
|
||||
_qCount <- fmap M.size . readTVarIO . queues =<< asks queueStore
|
||||
_msgCount <- foldM (\(!n) q -> (n +) <$> readTVarIO (size q)) 0 =<< readTVarIO =<< asks msgStore
|
||||
_msgCount <- liftIO . foldM (\(!n) q -> (n +) <$> getQueueSize q) 0 =<< readTVarIO =<< asks msgStore
|
||||
liftIO $ setServerStats s d {_qCount, _msgCount, _msgExpired = _msgExpired d + expiredWhileRestoring}
|
||||
renameFile f $ f <> ".bak"
|
||||
logInfo "server stats restored"
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
|
||||
module Simplex.Messaging.Server.MsgStore.STM
|
||||
( STMMsgStore,
|
||||
MsgQueue (..),
|
||||
MsgQueue (msgQueue),
|
||||
newMsgStore,
|
||||
getMsgQueue,
|
||||
delMsgQueue,
|
||||
@@ -20,6 +20,7 @@ module Simplex.Messaging.Server.MsgStore.STM
|
||||
tryDelMsg,
|
||||
tryDelPeekMsg,
|
||||
deleteExpiredMsgs,
|
||||
getQueueSize,
|
||||
)
|
||||
where
|
||||
|
||||
@@ -44,9 +45,14 @@ type STMMsgStore = TMap RecipientId MsgQueue
|
||||
newMsgStore :: IO STMMsgStore
|
||||
newMsgStore = TM.emptyIO
|
||||
|
||||
getMsgQueue :: STMMsgStore -> RecipientId -> Int -> STM MsgQueue
|
||||
getMsgQueue st rId quota = maybe newQ pure =<< TM.lookup rId st
|
||||
-- The reason for double lookup is that majority of messaging queues exist,
|
||||
-- because multiple messages are sent to the same queue,
|
||||
-- so the first lookup without STM transaction will return the queue faster.
|
||||
-- In case the queue does not exist, it needs to be looked-up again inside transaction.
|
||||
getMsgQueue :: STMMsgStore -> RecipientId -> Int -> IO MsgQueue
|
||||
getMsgQueue st rId quota = TM.lookupIO rId st >>= maybe (atomically maybeNewQ) pure
|
||||
where
|
||||
maybeNewQ = TM.lookup rId st >>= maybe newQ pure
|
||||
newQ = do
|
||||
msgQueue <- newTQueue
|
||||
canWrite <- newTVar True
|
||||
@@ -117,3 +123,6 @@ tryDeleteMsg MsgQueue {msgQueue = q, size} =
|
||||
tryReadTQueue q >>= \case
|
||||
Just _ -> modifyTVar' size (subtract 1)
|
||||
_ -> pure ()
|
||||
|
||||
getQueueSize :: MsgQueue -> IO Int
|
||||
getQueueSize MsgQueue {size} = readTVarIO size
|
||||
|
||||
@@ -771,7 +771,7 @@ testTiming (ATransport t) =
|
||||
(C.AuthAlg C.SX25519, C.AuthAlg C.SX25519, 200) -- correct key type
|
||||
]
|
||||
timeRepeat n = fmap fst . timeItT . forM_ (replicate n ()) . const
|
||||
similarTime t1 t2 = abs (t2 / t1 - 1) < 0.2 -- normally the difference between "no queue" and "wrong key" is less than 5%
|
||||
similarTime t1 t2 = abs (t2 / t1 - 1) < 0.25 -- normally the difference between "no queue" and "wrong key" is less than 5%
|
||||
testSameTiming :: forall c. Transport c => THandleSMP c 'TClient -> THandleSMP c 'TClient -> (C.AuthAlg, C.AuthAlg, Int) -> Expectation
|
||||
testSameTiming rh sh (C.AuthAlg goodKeyAlg, C.AuthAlg badKeyAlg, n) = do
|
||||
g <- C.newRandom
|
||||
|
||||
Reference in New Issue
Block a user