diff --git a/package.yaml b/package.yaml index 1b30d0f2c..4e1390cb0 100644 --- a/package.yaml +++ b/package.yaml @@ -1,5 +1,5 @@ name: simplexmq -version: 6.0.2.0 +version: 6.0.2 synopsis: SimpleXMQ message broker description: | This package includes <./docs/Simplex-Messaging-Server.html server>, diff --git a/simplexmq.cabal b/simplexmq.cabal index 0e84119ac..59e674987 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -5,7 +5,7 @@ cabal-version: 1.12 -- see: https://github.com/sol/hpack name: simplexmq -version: 6.0.2.0 +version: 6.0.2 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index fb840fbb1..29b4e2138 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -44,7 +44,6 @@ import Control.Monad.Except import Control.Monad.IO.Unlift import Control.Monad.Reader import Control.Monad.Trans.Except -import Crypto.Random import Control.Monad.STM (retry) import Data.Bifunctor (first) import Data.ByteString.Base64 (encode) @@ -247,7 +246,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do old <- liftIO $ expireBeforeEpoch expCfg rIds <- M.keysSet <$> readTVarIO ms forM_ rIds $ \rId -> do - q <- atomically (getMsgQueue ms rId quota) + q <- liftIO $ getMsgQueue ms rId quota deleted <- atomically $ deleteExpiredMsgs q old liftIO $ atomicModifyIORef'_ (msgExpired stats) (+ deleted) @@ -1255,15 +1254,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi Just (msg, wasEmpty) -> time "SEND ok" $ do when wasEmpty $ tryDeliverMessage msg when (notification msgFlags) $ do - forM_ (notifier qr) $ \ntf -> do - asks random >>= atomically . trySendNotification ntf msg >>= \case - Nothing -> do - incStat $ msgNtfNoSub stats - logWarn "No notification subscription" - Just False -> do - incStat $ msgNtfLost stats - logWarn "Dropped message notification" - Just True -> incStat $ msgNtfs stats + mapM_ (`trySendNotification` msg) (notifier qr) incStat $ msgSentNtf stats liftIO $ updatePeriodStats (activeQueuesNtf stats) (recipientId qr) incStat $ msgSent stats @@ -1335,23 +1326,35 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi deliver q s writeTVar st NoSub - trySendNotification :: NtfCreds -> Message -> TVar ChaChaDRG -> STM (Maybe Bool) - trySendNotification NtfCreds {notifierId, rcvNtfDhSecret} msg ntfNonceDrg = - mapM (writeNtf notifierId msg rcvNtfDhSecret ntfNonceDrg) =<< TM.lookup notifierId notifiers + trySendNotification :: NtfCreds -> Message -> M () + trySendNotification NtfCreds {notifierId, rcvNtfDhSecret} msg = do + stats <- asks serverStats + liftIO (TM.lookupIO notifierId notifiers) >>= \case + Nothing -> do + incStat $ msgNtfNoSub stats + logWarn "No notification subscription" + Just ntfClnt -> do + let updateStats True = incStat $ msgNtfs stats + updateStats _ = do + incStat $ msgNtfLost stats + logWarn "Dropped message notification" + writeNtf notifierId msg rcvNtfDhSecret ntfClnt >>= mapM_ updateStats - writeNtf :: NotifierId -> Message -> RcvNtfDhSecret -> TVar ChaChaDRG -> Client -> STM Bool - writeNtf nId msg rcvNtfDhSecret ntfNonceDrg Client {sndQ = q} = - ifM (isFullTBQueue q) (pure False) (sendNtf $> True) - where - sendNtf = case msg of - Message {msgId, msgTs} -> do - (nmsgNonce, encNMsgMeta) <- mkMessageNotification msgId msgTs rcvNtfDhSecret ntfNonceDrg - writeTBQueue q [(CorrId "", nId, NMSG nmsgNonce encNMsgMeta)] - _ -> pure () + writeNtf :: NotifierId -> Message -> RcvNtfDhSecret -> Client -> M (Maybe Bool) + writeNtf nId msg rcvNtfDhSecret Client {sndQ = q} = case msg of + Message {msgId, msgTs} -> Just <$> do + (nmsgNonce, encNMsgMeta) <- mkMessageNotification msgId msgTs rcvNtfDhSecret + -- must be in one STM transaction to avoid the queue becoming full between the check and writing + atomically $ + ifM + (isFullTBQueue q) + (pure $ False) + (True <$ writeTBQueue q [(CorrId "", nId, NMSG nmsgNonce encNMsgMeta)]) + _ -> pure Nothing - mkMessageNotification :: ByteString -> SystemTime -> RcvNtfDhSecret -> TVar ChaChaDRG -> STM (C.CbNonce, EncNMsgMeta) - mkMessageNotification msgId msgTs rcvNtfDhSecret ntfNonceDrg = do - cbNonce <- C.randomCbNonce ntfNonceDrg + mkMessageNotification :: ByteString -> SystemTime -> RcvNtfDhSecret -> M (C.CbNonce, EncNMsgMeta) + mkMessageNotification msgId msgTs rcvNtfDhSecret = do + cbNonce <- atomically . C.randomCbNonce =<< asks random let msgMeta = NMsgMeta {msgId, msgTs} encNMsgMeta = C.cbEncrypt rcvNtfDhSecret cbNonce (smpEncode msgMeta) 128 pure . (cbNonce,) $ fromRight "" encNMsgMeta @@ -1441,7 +1444,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi getStoreMsgQueue name rId = time (name <> " getMsgQueue") $ do ms <- asks msgStore quota <- asks $ msgQueueQuota . config - atomically $ getMsgQueue ms rId quota + liftIO $ getMsgQueue ms rId quota delQueueAndMsgs :: QueueStore -> M (Transmission BrokerMsg) delQueueAndMsgs st = do @@ -1459,24 +1462,23 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi getQueueInfo :: QueueRec -> M (Transmission BrokerMsg) getQueueInfo QueueRec {senderKey, notifier} = do - q@MsgQueue {size} <- getStoreMsgQueue "getQueueInfo" entId - info <- atomically $ do - qiSub <- TM.lookup entId subscriptions >>= mapM mkQSub - qiSize <- readTVar size - qiMsg <- toMsgInfo <$$> tryPeekMsg q - pure QueueInfo {qiSnd = isJust senderKey, qiNtf = isJust notifier, qiSub, qiSize, qiMsg} + q <- getStoreMsgQueue "getQueueInfo" entId + qiSub <- liftIO $ TM.lookupIO entId subscriptions >>= mapM mkQSub + qiSize <- liftIO $ getQueueSize q + qiMsg <- atomically $ toMsgInfo <$$> tryPeekMsg q + let info = QueueInfo {qiSnd = isJust senderKey, qiNtf = isJust notifier, qiSub, qiSize, qiMsg} pure (corrId, entId, INFO info) where mkQSub Sub {subThread, delivered} = do qSubThread <- case subThread of ServerSub t -> do - st <- readTVar t + st <- readTVarIO t pure $ case st of NoSub -> QNoSub SubPending -> QSubPending SubThread _ -> QSubThread ProhibitSub -> pure QProhibitSub - qDelivered <- decodeLatin1 . encode <$$> tryReadTMVar delivered + qDelivered <- atomically $ decodeLatin1 . encode <$$> tryReadTMVar delivered pure QSub {qSubThread, qDelivered} ok :: Transmission BrokerMsg @@ -1564,13 +1566,12 @@ restoreServerMessages = where s = LB.toStrict s' addToMsgQueue rId msg = do - (isExpired, logFull) <- atomically $ do - q <- getMsgQueue ms rId quota - case msg of - Message {msgTs} - | maybe True (systemSeconds msgTs >=) old_ -> (False,) . isNothing <$> writeMsg q msg - | otherwise -> pure (True, False) - MessageQuota {} -> writeMsg q msg $> (False, False) + q <- liftIO $ getMsgQueue ms rId quota + (isExpired, logFull) <- atomically $ case msg of + Message {msgTs} + | maybe True (systemSeconds msgTs >=) old_ -> (False,) . isNothing <$> writeMsg q msg + | otherwise -> pure (True, False) + MessageQuota {} -> writeMsg q msg $> (False, False) when logFull . logError . decodeLatin1 $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (messageId msg) pure $ if isExpired then expired + 1 else expired msgErr :: Show e => String -> e -> String @@ -1595,7 +1596,7 @@ restoreServerStats expiredWhileRestoring = asks (serverStatsBackupFile . config) Right d@ServerStatsData {_qCount = statsQCount} -> do s <- asks serverStats _qCount <- fmap M.size . readTVarIO . queues =<< asks queueStore - _msgCount <- foldM (\(!n) q -> (n +) <$> readTVarIO (size q)) 0 =<< readTVarIO =<< asks msgStore + _msgCount <- liftIO . foldM (\(!n) q -> (n +) <$> getQueueSize q) 0 =<< readTVarIO =<< asks msgStore liftIO $ setServerStats s d {_qCount, _msgCount, _msgExpired = _msgExpired d + expiredWhileRestoring} renameFile f $ f <> ".bak" logInfo "server stats restored" diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index e0a5c8b45..04447292a 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -9,7 +9,7 @@ module Simplex.Messaging.Server.MsgStore.STM ( STMMsgStore, - MsgQueue (..), + MsgQueue (msgQueue), newMsgStore, getMsgQueue, delMsgQueue, @@ -20,6 +20,7 @@ module Simplex.Messaging.Server.MsgStore.STM tryDelMsg, tryDelPeekMsg, deleteExpiredMsgs, + getQueueSize, ) where @@ -44,9 +45,14 @@ type STMMsgStore = TMap RecipientId MsgQueue newMsgStore :: IO STMMsgStore newMsgStore = TM.emptyIO -getMsgQueue :: STMMsgStore -> RecipientId -> Int -> STM MsgQueue -getMsgQueue st rId quota = maybe newQ pure =<< TM.lookup rId st +-- The reason for double lookup is that majority of messaging queues exist, +-- because multiple messages are sent to the same queue, +-- so the first lookup without STM transaction will return the queue faster. +-- In case the queue does not exist, it needs to be looked-up again inside transaction. +getMsgQueue :: STMMsgStore -> RecipientId -> Int -> IO MsgQueue +getMsgQueue st rId quota = TM.lookupIO rId st >>= maybe (atomically maybeNewQ) pure where + maybeNewQ = TM.lookup rId st >>= maybe newQ pure newQ = do msgQueue <- newTQueue canWrite <- newTVar True @@ -117,3 +123,6 @@ tryDeleteMsg MsgQueue {msgQueue = q, size} = tryReadTQueue q >>= \case Just _ -> modifyTVar' size (subtract 1) _ -> pure () + +getQueueSize :: MsgQueue -> IO Int +getQueueSize MsgQueue {size} = readTVarIO size diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index 30521c8a8..a788af821 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -771,7 +771,7 @@ testTiming (ATransport t) = (C.AuthAlg C.SX25519, C.AuthAlg C.SX25519, 200) -- correct key type ] timeRepeat n = fmap fst . timeItT . forM_ (replicate n ()) . const - similarTime t1 t2 = abs (t2 / t1 - 1) < 0.2 -- normally the difference between "no queue" and "wrong key" is less than 5% + similarTime t1 t2 = abs (t2 / t1 - 1) < 0.25 -- normally the difference between "no queue" and "wrong key" is less than 5% testSameTiming :: forall c. Transport c => THandleSMP c 'TClient -> THandleSMP c 'TClient -> (C.AuthAlg, C.AuthAlg, Int) -> Expectation testSameTiming rh sh (C.AuthAlg goodKeyAlg, C.AuthAlg badKeyAlg, n) = do g <- C.newRandom