mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-23 21:15:56 +00:00
smp server: split and reduce STM transactions (#1294)
This commit is contained in:
@@ -247,7 +247,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
rIds <- M.keysSet <$> readTVarIO ms
|
||||
forM_ rIds $ \rId -> do
|
||||
q <- liftIO $ getMsgQueue ms rId quota
|
||||
deleted <- atomically $ deleteExpiredMsgs q old
|
||||
deleted <- liftIO $ deleteExpiredMsgs q old
|
||||
liftIO $ atomicModifyIORef'_ (msgExpired stats) (+ deleted)
|
||||
|
||||
serverStatsThread_ :: ServerConfig -> [M ()]
|
||||
@@ -623,10 +623,10 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
CPDelete queueId' -> withUserRole $ unliftIO u $ do
|
||||
st <- asks queueStore
|
||||
ms <- asks msgStore
|
||||
queueId <- atomically (getQueue st SSender queueId') >>= \case
|
||||
queueId <- liftIO (getQueue st SSender queueId') >>= \case
|
||||
Left _ -> pure queueId' -- fallback to using as recipientId directly
|
||||
Right QueueRec {recipientId} -> pure recipientId
|
||||
r <- atomically $
|
||||
r <- liftIO $
|
||||
deleteQueue st queueId $>>= \q ->
|
||||
Right . (q,) <$> delMsgQueueSize ms queueId
|
||||
case r of
|
||||
@@ -832,7 +832,7 @@ verifyTransmission auth_ tAuth authorized queueId cmd =
|
||||
get :: DirectParty p => SParty p -> M (Either ErrorType QueueRec)
|
||||
get party = do
|
||||
st <- asks queueStore
|
||||
atomically $ getQueue st party queueId
|
||||
liftIO $ getQueue st party queueId
|
||||
|
||||
verifyCmdAuthorization :: Maybe (THandleAuth 'TServer, C.CbNonce) -> Maybe TransmissionAuth -> ByteString -> C.APublicAuthKey -> Bool
|
||||
verifyCmdAuthorization auth_ tAuth authorized key = maybe False (verify key) tAuth
|
||||
@@ -1039,11 +1039,11 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
|
||||
ids@(rId, _) <- getIds
|
||||
-- create QueueRec record with these ids and keys
|
||||
let qr = qRec ids
|
||||
atomically (addQueue st qr) >>= \case
|
||||
liftIO (addQueue st qr) >>= \case
|
||||
Left DUPLICATE_ -> addQueueRetry (n - 1) qik qRec
|
||||
Left e -> pure $ ERR e
|
||||
Right _ -> do
|
||||
withLog (`logCreateById` rId)
|
||||
Right () -> do
|
||||
withLog (`logCreateQueue` qr)
|
||||
stats <- asks serverStats
|
||||
incStat $ qCreated stats
|
||||
incStat $ qCount stats
|
||||
@@ -1052,12 +1052,6 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
|
||||
SMSubscribe -> void $ subscribeQueue qr rId
|
||||
pure $ IDS (qik ids)
|
||||
|
||||
logCreateById :: StoreLog 'WriteMode -> RecipientId -> IO ()
|
||||
logCreateById s rId =
|
||||
atomically (getQueue st SRecipient rId) >>= \case
|
||||
Right q -> logCreateQueue s q
|
||||
_ -> pure ()
|
||||
|
||||
getIds :: M (RecipientId, SenderId)
|
||||
getIds = do
|
||||
n <- asks $ queueIdBytes . config
|
||||
@@ -1069,7 +1063,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
|
||||
st <- asks queueStore
|
||||
stats <- asks serverStats
|
||||
incStat $ qSecured stats
|
||||
atomically $ either ERR (const OK) <$> secureQueue st rId sKey
|
||||
liftIO $ either ERR (const OK) <$> secureQueue st rId sKey
|
||||
|
||||
addQueueNotifier_ :: QueueStore -> NtfPublicAuthKey -> RcvNtfPublicDhKey -> M (Transmission BrokerMsg)
|
||||
addQueueNotifier_ st notifierKey dhKey = time "NKEY" $ do
|
||||
@@ -1082,7 +1076,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
|
||||
addNotifierRetry n rcvPublicDhKey rcvNtfDhSecret = do
|
||||
notifierId <- randomId =<< asks (queueIdBytes . config)
|
||||
let ntfCreds = NtfCreds {notifierId, notifierKey, rcvNtfDhSecret}
|
||||
atomically (addQueueNotifier st entId ntfCreds) >>= \case
|
||||
liftIO (addQueueNotifier st entId ntfCreds) >>= \case
|
||||
Left DUPLICATE_ -> addNotifierRetry (n - 1) rcvPublicDhKey rcvNtfDhSecret
|
||||
Left e -> pure $ ERR e
|
||||
Right _ -> do
|
||||
@@ -1093,7 +1087,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
|
||||
deleteQueueNotifier_ :: QueueStore -> M (Transmission BrokerMsg)
|
||||
deleteQueueNotifier_ st = do
|
||||
withLog (`logDeleteNotifier` entId)
|
||||
atomically (deleteQueueNotifier st entId) >>= \case
|
||||
liftIO (deleteQueueNotifier st entId) >>= \case
|
||||
Right () -> do
|
||||
-- Possibly, the same should be done if the queue is suspended, but currently we do not use it
|
||||
atomically $ writeTQueue ntfSubscribedQ (entId, clnt, False)
|
||||
@@ -1104,7 +1098,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
|
||||
suspendQueue_ :: QueueStore -> M (Transmission BrokerMsg)
|
||||
suspendQueue_ st = do
|
||||
withLog (`logSuspendQueue` entId)
|
||||
okResp <$> atomically (suspendQueue st entId)
|
||||
okResp <$> liftIO (suspendQueue st entId)
|
||||
|
||||
subscribeQueue :: QueueRec -> RecipientId -> M (Transmission BrokerMsg)
|
||||
subscribeQueue qr rId = do
|
||||
@@ -1130,7 +1124,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
|
||||
deliver :: Bool -> Sub -> M (Transmission BrokerMsg)
|
||||
deliver inc sub = do
|
||||
q <- getStoreMsgQueue "SUB" rId
|
||||
msg_ <- atomically $ tryPeekMsg q
|
||||
msg_ <- liftIO $ tryPeekMsgIO q
|
||||
when (inc && isJust msg_) $
|
||||
incStat . qSub =<< asks serverStats
|
||||
deliverMessage "SUB" qr rId sub msg_
|
||||
@@ -1161,6 +1155,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
|
||||
q <- getStoreMsgQueue "GET" entId
|
||||
stats <- asks serverStats
|
||||
(statCnt, r) <-
|
||||
-- TODO split STM, use tryPeekMsgIO
|
||||
atomically $
|
||||
tryPeekMsg q >>= \case
|
||||
Just msg ->
|
||||
@@ -1199,11 +1194,11 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
|
||||
q <- getStoreMsgQueue "ACK" entId
|
||||
case st of
|
||||
ProhibitSub -> do
|
||||
deletedMsg_ <- atomically $ tryDelMsg q msgId
|
||||
deletedMsg_ <- liftIO $ tryDelMsg q msgId
|
||||
mapM_ (updateStats True) deletedMsg_
|
||||
pure ok
|
||||
_ -> do
|
||||
(deletedMsg_, msg_) <- atomically $ tryDelPeekMsg q msgId
|
||||
(deletedMsg_, msg_) <- liftIO $ tryDelPeekMsg q msgId
|
||||
mapM_ (updateStats False) deletedMsg_
|
||||
deliverMessage "ACK" qr entId sub msg_
|
||||
_ -> pure $ err NO_MSG
|
||||
@@ -1246,7 +1241,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
|
||||
msg_ <- time "SEND" $ do
|
||||
q <- getStoreMsgQueue "SEND" $ recipientId qr
|
||||
expireMessages q
|
||||
atomically . writeMsg q =<< mkMessage body
|
||||
liftIO . writeMsg q =<< mkMessage body
|
||||
case msg_ of
|
||||
Nothing -> do
|
||||
incStat $ msgSentQuota stats
|
||||
@@ -1273,7 +1268,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
|
||||
expireMessages q = do
|
||||
msgExp <- asks $ messageExpiration . config
|
||||
old <- liftIO $ mapM expireBeforeEpoch msgExp
|
||||
deleted <- atomically $ sum <$> mapM (deleteExpiredMsgs q) old
|
||||
deleted <- liftIO $ sum <$> mapM (deleteExpiredMsgs q) old
|
||||
when (deleted > 0) $ do
|
||||
stats <- asks serverStats
|
||||
liftIO $ atomicModifyIORef'_ (msgExpired stats) (+ deleted)
|
||||
@@ -1290,6 +1285,8 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
|
||||
tryDeliverMessage msg = atomically deliverToSub >>= mapM_ forkDeliver
|
||||
where
|
||||
rId = recipientId qr
|
||||
-- TODO split to multiple STM transactions, move lookups to IO
|
||||
-- remove tryPeekMsg
|
||||
deliverToSub =
|
||||
TM.lookup rId subscribers
|
||||
$>>= \rc@Client {subscriptions = subs, sndQ = q} -> TM.lookup rId subs
|
||||
@@ -1450,7 +1447,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
|
||||
delQueueAndMsgs st = do
|
||||
withLog (`logDeleteQueue` entId)
|
||||
ms <- asks msgStore
|
||||
atomically (deleteQueue st entId $>>= \q -> delMsgQueue ms entId $> Right q) >>= \case
|
||||
liftIO (deleteQueue st entId $>>= \q -> delMsgQueue ms entId $> Right q) >>= \case
|
||||
Right q -> do
|
||||
-- Possibly, the same should be done if the queue is suspended, but currently we do not use it
|
||||
atomically $ writeTQueue subscribedQ (entId, clnt, False)
|
||||
@@ -1465,7 +1462,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
|
||||
q <- getStoreMsgQueue "getQueueInfo" entId
|
||||
qiSub <- liftIO $ TM.lookupIO entId subscriptions >>= mapM mkQSub
|
||||
qiSize <- liftIO $ getQueueSize q
|
||||
qiMsg <- atomically $ toMsgInfo <$$> tryPeekMsg q
|
||||
qiMsg <- liftIO $ toMsgInfo <$$> tryPeekMsgIO q
|
||||
let info = QueueInfo {qiSnd = isJust senderKey, qiNtf = isJust notifier, qiSub, qiSize, qiMsg}
|
||||
pure (corrId, entId, INFO info)
|
||||
where
|
||||
@@ -1571,7 +1568,7 @@ restoreServerMessages =
|
||||
s = LB.toStrict s'
|
||||
addToMsgQueue rId msg = do
|
||||
q <- liftIO $ getMsgQueue ms rId quota
|
||||
(isExpired, logFull) <- atomically $ case msg of
|
||||
(isExpired, logFull) <- liftIO $ case msg of
|
||||
Message {msgTs}
|
||||
| maybe True (systemSeconds msgTs >=) old_ -> (False,) . isNothing <$> writeMsg q msg
|
||||
| otherwise -> pure (True, False)
|
||||
|
||||
@@ -16,7 +16,7 @@ module Simplex.Messaging.Server.MsgStore.STM
|
||||
delMsgQueueSize,
|
||||
writeMsg,
|
||||
tryPeekMsg,
|
||||
peekMsg,
|
||||
tryPeekMsgIO,
|
||||
tryDelMsg,
|
||||
tryDelPeekMsg,
|
||||
deleteExpiredMsgs,
|
||||
@@ -61,14 +61,14 @@ getMsgQueue st rId quota = TM.lookupIO rId st >>= maybe (atomically maybeNewQ) p
|
||||
TM.insert rId q st
|
||||
pure q
|
||||
|
||||
delMsgQueue :: STMMsgStore -> RecipientId -> STM ()
|
||||
delMsgQueue st rId = TM.delete rId st
|
||||
delMsgQueue :: STMMsgStore -> RecipientId -> IO ()
|
||||
delMsgQueue st rId = atomically $ TM.delete rId st
|
||||
|
||||
delMsgQueueSize :: STMMsgStore -> RecipientId -> STM Int
|
||||
delMsgQueueSize st rId = TM.lookupDelete rId st >>= maybe (pure 0) (\MsgQueue {size} -> readTVar size)
|
||||
delMsgQueueSize :: STMMsgStore -> RecipientId -> IO Int
|
||||
delMsgQueueSize st rId = atomically (TM.lookupDelete rId st) >>= maybe (pure 0) (\MsgQueue {size} -> readTVarIO size)
|
||||
|
||||
writeMsg :: MsgQueue -> Message -> STM (Maybe (Message, Bool))
|
||||
writeMsg MsgQueue {msgQueue = q, quota, canWrite, size} !msg = do
|
||||
writeMsg :: MsgQueue -> Message -> IO (Maybe (Message, Bool))
|
||||
writeMsg MsgQueue {msgQueue = q, quota, canWrite, size} !msg = atomically $ do
|
||||
canWrt <- readTVar canWrite
|
||||
empty <- isEmptyTQueue q
|
||||
if canWrt || empty
|
||||
@@ -83,43 +83,44 @@ writeMsg MsgQueue {msgQueue = q, quota, canWrite, size} !msg = do
|
||||
where
|
||||
msgQuota = MessageQuota {msgId = msgId msg, msgTs = msgTs msg}
|
||||
|
||||
tryPeekMsgIO :: MsgQueue -> IO (Maybe Message)
|
||||
tryPeekMsgIO = atomically . tryPeekTQueue . msgQueue
|
||||
{-# INLINE tryPeekMsgIO #-}
|
||||
|
||||
-- TODO remove once deliverToSub is split
|
||||
tryPeekMsg :: MsgQueue -> STM (Maybe Message)
|
||||
tryPeekMsg = tryPeekTQueue . msgQueue
|
||||
{-# INLINE tryPeekMsg #-}
|
||||
|
||||
peekMsg :: MsgQueue -> STM Message
|
||||
peekMsg = peekTQueue . msgQueue
|
||||
{-# INLINE peekMsg #-}
|
||||
|
||||
tryDelMsg :: MsgQueue -> MsgId -> STM (Maybe Message)
|
||||
tryDelMsg mq msgId' =
|
||||
tryDelMsg :: MsgQueue -> MsgId -> IO (Maybe Message)
|
||||
tryDelMsg mq msgId' = atomically $
|
||||
tryPeekMsg mq >>= \case
|
||||
msg_@(Just msg)
|
||||
| msgId msg == msgId' || B.null msgId' -> tryDeleteMsg mq >> pure msg_
|
||||
| msgId msg == msgId' || B.null msgId' -> tryDeleteMsg_ mq >> pure msg_
|
||||
| otherwise -> pure Nothing
|
||||
_ -> pure Nothing
|
||||
|
||||
-- atomic delete (== read) last and peek next message if available
|
||||
tryDelPeekMsg :: MsgQueue -> MsgId -> STM (Maybe Message, Maybe Message)
|
||||
tryDelPeekMsg mq msgId' =
|
||||
tryDelPeekMsg :: MsgQueue -> MsgId -> IO (Maybe Message, Maybe Message)
|
||||
tryDelPeekMsg mq msgId' = atomically $
|
||||
tryPeekMsg mq >>= \case
|
||||
msg_@(Just msg)
|
||||
| msgId msg == msgId' || B.null msgId' -> (msg_,) <$> (tryDeleteMsg mq >> tryPeekMsg mq)
|
||||
| msgId msg == msgId' || B.null msgId' -> (msg_,) <$> (tryDeleteMsg_ mq >> tryPeekMsg mq)
|
||||
| otherwise -> pure (Nothing, msg_)
|
||||
_ -> pure (Nothing, Nothing)
|
||||
|
||||
deleteExpiredMsgs :: MsgQueue -> Int64 -> STM Int
|
||||
deleteExpiredMsgs mq old = loop 0
|
||||
deleteExpiredMsgs :: MsgQueue -> Int64 -> IO Int
|
||||
deleteExpiredMsgs mq old = atomically $ loop 0
|
||||
where
|
||||
loop dc =
|
||||
tryPeekMsg mq >>= \case
|
||||
Just Message {msgTs}
|
||||
| systemSeconds msgTs < old ->
|
||||
tryDeleteMsg mq >> loop (dc + 1)
|
||||
tryDeleteMsg_ mq >> loop (dc + 1)
|
||||
_ -> pure dc
|
||||
|
||||
tryDeleteMsg :: MsgQueue -> STM ()
|
||||
tryDeleteMsg MsgQueue {msgQueue = q, size} =
|
||||
tryDeleteMsg_ :: MsgQueue -> STM ()
|
||||
tryDeleteMsg_ MsgQueue {msgQueue = q, size} =
|
||||
tryReadTQueue q >>= \case
|
||||
Just _ -> modifyTVar' size (subtract 1)
|
||||
_ -> pure ()
|
||||
|
||||
@@ -45,8 +45,8 @@ newQueueStore = do
|
||||
notifiers <- TM.emptyIO
|
||||
pure QueueStore {queues, senders, notifiers}
|
||||
|
||||
addQueue :: QueueStore -> QueueRec -> STM (Either ErrorType ())
|
||||
addQueue QueueStore {queues, senders} q@QueueRec {recipientId = rId, senderId = sId} = do
|
||||
addQueue :: QueueStore -> QueueRec -> IO (Either ErrorType ())
|
||||
addQueue QueueStore {queues, senders} q@QueueRec {recipientId = rId, senderId = sId} = atomically $ do
|
||||
ifM hasId (pure $ Left DUPLICATE_) $ do
|
||||
qVar <- newTVar q
|
||||
TM.insert rId qVar queues
|
||||
@@ -55,26 +55,26 @@ addQueue QueueStore {queues, senders} q@QueueRec {recipientId = rId, senderId =
|
||||
where
|
||||
hasId = (||) <$> TM.member rId queues <*> TM.member sId senders
|
||||
|
||||
getQueue :: DirectParty p => QueueStore -> SParty p -> QueueId -> STM (Either ErrorType QueueRec)
|
||||
getQueue :: DirectParty p => QueueStore -> SParty p -> QueueId -> IO (Either ErrorType QueueRec)
|
||||
getQueue QueueStore {queues, senders, notifiers} party qId =
|
||||
toResult <$> (mapM readTVar =<< getVar)
|
||||
toResult <$> (mapM readTVarIO =<< getVar)
|
||||
where
|
||||
getVar = case party of
|
||||
SRecipient -> TM.lookup qId queues
|
||||
SSender -> TM.lookup qId senders $>>= (`TM.lookup` queues)
|
||||
SNotifier -> TM.lookup qId notifiers $>>= (`TM.lookup` queues)
|
||||
SRecipient -> TM.lookupIO qId queues
|
||||
SSender -> TM.lookupIO qId senders $>>= (`TM.lookupIO` queues)
|
||||
SNotifier -> TM.lookupIO qId notifiers $>>= (`TM.lookupIO` queues)
|
||||
|
||||
secureQueue :: QueueStore -> RecipientId -> SndPublicAuthKey -> STM (Either ErrorType QueueRec)
|
||||
secureQueue :: QueueStore -> RecipientId -> SndPublicAuthKey -> IO (Either ErrorType QueueRec)
|
||||
secureQueue QueueStore {queues} rId sKey =
|
||||
withQueue rId queues $ \qVar ->
|
||||
atomically $ withQueue rId queues $ \qVar ->
|
||||
readTVar qVar >>= \q -> case senderKey q of
|
||||
Just k -> pure $ if sKey == k then Just q else Nothing
|
||||
_ ->
|
||||
let !q' = q {senderKey = Just sKey}
|
||||
in writeTVar qVar q' $> Just q'
|
||||
|
||||
addQueueNotifier :: QueueStore -> RecipientId -> NtfCreds -> STM (Either ErrorType QueueRec)
|
||||
addQueueNotifier QueueStore {queues, notifiers} rId ntfCreds@NtfCreds {notifierId = nId} = do
|
||||
addQueueNotifier :: QueueStore -> RecipientId -> NtfCreds -> IO (Either ErrorType QueueRec)
|
||||
addQueueNotifier QueueStore {queues, notifiers} rId ntfCreds@NtfCreds {notifierId = nId} = atomically $ do
|
||||
ifM (TM.member nId notifiers) (pure $ Left DUPLICATE_) $
|
||||
withQueue rId queues $ \qVar -> do
|
||||
q <- readTVar qVar
|
||||
@@ -83,20 +83,20 @@ addQueueNotifier QueueStore {queues, notifiers} rId ntfCreds@NtfCreds {notifierI
|
||||
TM.insert nId rId notifiers
|
||||
pure $ Just q
|
||||
|
||||
deleteQueueNotifier :: QueueStore -> RecipientId -> STM (Either ErrorType ())
|
||||
deleteQueueNotifier :: QueueStore -> RecipientId -> IO (Either ErrorType ())
|
||||
deleteQueueNotifier QueueStore {queues, notifiers} rId =
|
||||
withQueue rId queues $ \qVar -> do
|
||||
atomically $ withQueue rId queues $ \qVar -> do
|
||||
q <- readTVar qVar
|
||||
forM_ (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId notifiers
|
||||
writeTVar qVar $! q {notifier = Nothing}
|
||||
pure $ Just ()
|
||||
|
||||
suspendQueue :: QueueStore -> RecipientId -> STM (Either ErrorType ())
|
||||
suspendQueue :: QueueStore -> RecipientId -> IO (Either ErrorType ())
|
||||
suspendQueue QueueStore {queues} rId =
|
||||
withQueue rId queues $ \qVar -> modifyTVar' qVar (\q -> q {status = QueueOff}) $> Just ()
|
||||
atomically $ withQueue rId queues $ \qVar -> modifyTVar' qVar (\q -> q {status = QueueOff}) $> Just ()
|
||||
|
||||
deleteQueue :: QueueStore -> RecipientId -> STM (Either ErrorType QueueRec)
|
||||
deleteQueue QueueStore {queues, senders, notifiers} rId = do
|
||||
deleteQueue :: QueueStore -> RecipientId -> IO (Either ErrorType QueueRec)
|
||||
deleteQueue QueueStore {queues, senders, notifiers} rId = atomically $ do
|
||||
TM.lookupDelete rId queues >>= \case
|
||||
Just qVar ->
|
||||
readTVar qVar >>= \q -> do
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
|
||||
module Simplex.Messaging.Transport.HTTP2 where
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import qualified Control.Exception as E
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
|
||||
Reference in New Issue
Block a user