mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-25 12:04:32 +00:00
smp server: remove locks for deleted queues, additional statistics for objects in memory (#1498)
* smp server: remove locks for deleted queues, additional statistics for objects in memory * version * reduce queue cache usage * less caching, refactor * comments * revert version
This commit is contained in:
@@ -62,7 +62,8 @@ import Data.Either (fromRight)
|
||||
import Data.Functor (($>))
|
||||
import Data.Int (Int64)
|
||||
import Data.List (intercalate, sort)
|
||||
import Data.Maybe (fromMaybe, isNothing, mapMaybe)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (fromMaybe, isJust, isNothing, mapMaybe)
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime)
|
||||
@@ -96,6 +97,7 @@ data JournalMsgStore s = JournalMsgStore
|
||||
queueLocks :: TMap RecipientId Lock,
|
||||
sharedLock :: TMVar RecipientId,
|
||||
queueStore_ :: QStore s,
|
||||
openedQueueCount :: TVar Int,
|
||||
expireBackupsBefore :: UTCTime
|
||||
}
|
||||
|
||||
@@ -338,12 +340,6 @@ instance QueueStoreClass (JournalQueue s) (QStore s) where
|
||||
deleteStoreQueue = withQS deleteStoreQueue
|
||||
{-# INLINE deleteStoreQueue #-}
|
||||
|
||||
#if defined(dbServerPostgres)
|
||||
mkTempQueue :: JournalMsgStore s -> RecipientId -> QueueRec -> IO (JournalQueue s)
|
||||
mkTempQueue ms rId qr = createLockIO >>= makeQueue_ ms rId qr
|
||||
{-# INLINE mkTempQueue #-}
|
||||
#endif
|
||||
|
||||
makeQueue_ :: JournalMsgStore s -> RecipientId -> QueueRec -> Lock -> IO (JournalQueue s)
|
||||
makeQueue_ JournalMsgStore {sharedLock} rId qr queueLock = do
|
||||
queueRec' <- newTVarIO $ Just qr
|
||||
@@ -373,8 +369,9 @@ instance MsgStoreClass (JournalMsgStore s) where
|
||||
queueLocks <- TM.emptyIO
|
||||
sharedLock <- newEmptyTMVarIO
|
||||
queueStore_ <- newQueueStore @(JournalQueue s) queueStoreCfg
|
||||
openedQueueCount <- newTVarIO 0
|
||||
expireBackupsBefore <- addUTCTime (- expireBackupsAfter config) <$> getCurrentTime
|
||||
pure JournalMsgStore {config, random, queueLocks, sharedLock, queueStore_, expireBackupsBefore}
|
||||
pure JournalMsgStore {config, random, queueLocks, sharedLock, queueStore_, openedQueueCount, expireBackupsBefore}
|
||||
|
||||
closeMsgStore :: JournalMsgStore s -> IO ()
|
||||
closeMsgStore ms = do
|
||||
@@ -382,7 +379,7 @@ instance MsgStoreClass (JournalMsgStore s) where
|
||||
closeQueues $ loadedQueues @(JournalQueue s) st
|
||||
closeQueueStore @(JournalQueue s) st
|
||||
where
|
||||
closeQueues qs = readTVarIO qs >>= mapM_ closeMsgQueue
|
||||
closeQueues qs = readTVarIO qs >>= mapM_ (closeMsgQueue ms)
|
||||
|
||||
withActiveMsgQueues :: Monoid a => JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a
|
||||
withActiveMsgQueues = withQS withLoadedQueues . queueStore_
|
||||
@@ -393,12 +390,12 @@ instance MsgStoreClass (JournalMsgStore s) where
|
||||
unsafeWithAllMsgQueues tty ms action = case queueStore_ ms of
|
||||
MQStore st -> withLoadedQueues st run
|
||||
#if defined(dbServerPostgres)
|
||||
PQStore st -> foldQueueRecs tty st Nothing $ uncurry (mkTempQueue ms) >=> run
|
||||
PQStore st -> foldQueueRecs tty st Nothing $ uncurry (mkQueue ms False) >=> run
|
||||
#endif
|
||||
where
|
||||
run q = do
|
||||
r <- action q
|
||||
closeMsgQueue q
|
||||
closeMsgQueue ms q
|
||||
pure r
|
||||
|
||||
-- This function is concurrency safe
|
||||
@@ -414,7 +411,7 @@ instance MsgStoreClass (JournalMsgStore s) where
|
||||
PQStore st -> do
|
||||
let JournalMsgStore {queueLocks, sharedLock} = ms
|
||||
foldQueueRecs tty st (Just veryOld) $ \(rId, qr) -> do
|
||||
q <- mkTempQueue ms rId qr
|
||||
q <- mkQueue ms False rId qr
|
||||
withSharedWaitLock rId queueLocks sharedLock $ run $ tryStore' "deleteExpiredMsgs" rId $
|
||||
getLoadedQueue q >>= unStoreIO . expireQueueMsgs ms now old
|
||||
#endif
|
||||
@@ -441,9 +438,26 @@ instance MsgStoreClass (JournalMsgStore s) where
|
||||
queueStore = queueStore_
|
||||
{-# INLINE queueStore #-}
|
||||
|
||||
mkQueue :: JournalMsgStore s -> RecipientId -> QueueRec -> IO (JournalQueue s)
|
||||
mkQueue ms rId qr = do
|
||||
lock <- atomically $ getMapLock (queueLocks ms) rId
|
||||
loadedQueueCounts :: JournalMsgStore s -> IO LoadedQueueCounts
|
||||
loadedQueueCounts ms = do
|
||||
let (qs, ns, nLocks_) = loaded
|
||||
loadedQueueCount <- M.size <$> readTVarIO qs
|
||||
loadedNotifierCount <- M.size <$> readTVarIO ns
|
||||
openJournalCount <- readTVarIO (openedQueueCount ms)
|
||||
queueLockCount <- M.size <$> readTVarIO (queueLocks ms)
|
||||
notifierLockCount <- maybe (pure 0) (fmap M.size . readTVarIO) nLocks_
|
||||
pure LoadedQueueCounts {loadedQueueCount, loadedNotifierCount, openJournalCount, queueLockCount, notifierLockCount}
|
||||
where
|
||||
loaded :: (TMap RecipientId (JournalQueue s), TMap NotifierId RecipientId, Maybe (TMap NotifierId Lock))
|
||||
loaded = case queueStore_ ms of
|
||||
MQStore STMQueueStore {queues, notifiers} -> (queues, notifiers, Nothing)
|
||||
#if defined(dbServerPostgres)
|
||||
PQStore PostgresQueueStore {queues, notifiers, notifierLocks} -> (queues, notifiers, Just notifierLocks)
|
||||
#endif
|
||||
|
||||
mkQueue :: JournalMsgStore s -> Bool -> RecipientId -> QueueRec -> IO (JournalQueue s)
|
||||
mkQueue ms keepLock rId qr = do
|
||||
lock <- if keepLock then atomically $ getMapLock (queueLocks ms) rId else createLockIO
|
||||
makeQueue_ ms rId qr lock
|
||||
|
||||
getMsgQueue :: JournalMsgStore s -> JournalQueue s -> Bool -> StoreIO s (JournalMsgQueue s)
|
||||
@@ -478,7 +492,7 @@ instance MsgStoreClass (JournalMsgStore s) where
|
||||
-- In case the queue became non-empty on write and then again empty on read
|
||||
-- we won't be closing it, to avoid frequent open/close on active queues.
|
||||
r <- peek
|
||||
when (isNothing r) $ StoreIO $ closeMsgQueue q
|
||||
when (isNothing r) $ StoreIO $ closeMsgQueue ms q
|
||||
pure r
|
||||
where
|
||||
peek = do
|
||||
@@ -492,7 +506,7 @@ instance MsgStoreClass (JournalMsgStore s) where
|
||||
Nothing ->
|
||||
E.bracket
|
||||
getNonEmptyMsgQueue
|
||||
(mapM_ $ \_ -> closeMsgQueue q)
|
||||
(mapM_ $ \_ -> closeMsgQueue ms q)
|
||||
(maybe (pure (Nothing, 0)) (unStoreIO . run))
|
||||
where
|
||||
run mq = do
|
||||
@@ -502,7 +516,7 @@ instance MsgStoreClass (JournalMsgStore s) where
|
||||
Just mq -> do
|
||||
ts <- readTVarIO $ activeAt q
|
||||
r <- if now - ts >= idleInterval config
|
||||
then Just <$> unStoreIO (action mq) `E.finally` closeMsgQueue q
|
||||
then Just <$> unStoreIO (action mq) `E.finally` closeMsgQueue ms q
|
||||
else pure Nothing
|
||||
sz <- unStoreIO $ getQueueSize_ mq
|
||||
pure (r, sz)
|
||||
@@ -517,7 +531,7 @@ instance MsgStoreClass (JournalMsgStore s) where
|
||||
mq <- unStoreIO $ getMsgQueue ms q False
|
||||
-- queueState was updated in getMsgQueue
|
||||
readTVarIO queueState >>= \case
|
||||
Just QState {hasStored} | not hasStored -> closeMsgQueue q $> Nothing
|
||||
Just QState {hasStored} | not hasStored -> closeMsgQueue ms q $> Nothing
|
||||
_ -> pure $ Just mq
|
||||
|
||||
deleteQueue :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType QueueRec)
|
||||
@@ -580,6 +594,7 @@ instance MsgStoreClass (JournalMsgStore s) where
|
||||
rh <- createNewJournal queueDirectory $ journalId rs
|
||||
let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = Nothing}
|
||||
atomically $ writeTVar handles $ Just hs
|
||||
atomically $ modifyTVar' (openedQueueCount ms) (+ 1)
|
||||
pure hs
|
||||
switchWriteJournal hs = do
|
||||
journalId <- newJournalId $ random ms
|
||||
@@ -651,13 +666,16 @@ openMsgQueue ms@JournalMsgStore {config} q@JMQueue {queueDirectory = dir, stateP
|
||||
Just st
|
||||
| size st == 0 -> do
|
||||
(st', hs_) <- removeJournals st shouldBackup
|
||||
when (isJust hs_) incOpenedCount
|
||||
mkJournalQueue q st' hs_
|
||||
| otherwise -> do
|
||||
sh <- openBackupQueueState st shouldBackup
|
||||
(st', rh, wh_) <- closeOnException sh $ openJournals ms dir st sh
|
||||
let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = wh_}
|
||||
incOpenedCount
|
||||
mkJournalQueue q st' (Just hs)
|
||||
where
|
||||
incOpenedCount = atomically $ modifyTVar' (openedQueueCount ms) (+ 1)
|
||||
-- If the queue is empty, journals are deleted.
|
||||
-- New journal is created if queue is written to.
|
||||
-- canWrite is set to True.
|
||||
@@ -920,28 +938,30 @@ validQueueState MsgQueueState {readState = rs, writeState = ws, size}
|
||||
&& msgPos ws == msgCount ws
|
||||
&& bytePos ws == byteCount ws
|
||||
|
||||
-- TODO [postgres] possibly, we need to remove the lock from map
|
||||
deleteQueue_ :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType (QueueRec, Maybe (JournalMsgQueue s)))
|
||||
deleteQueue_ ms q =
|
||||
runExceptT $ isolateQueueId "deleteQueue_" ms rId $
|
||||
deleteStoreQueue (queueStore_ ms) q >>= mapM remove
|
||||
runExceptT $ isolateQueueId "deleteQueue_" ms rId $ do
|
||||
r <- deleteStoreQueue (queueStore_ ms) q >>= mapM remove
|
||||
atomically $ TM.delete rId (queueLocks ms)
|
||||
pure r
|
||||
where
|
||||
rId = recipientId q
|
||||
remove r@(_, mq_) = do
|
||||
mapM_ closeMsgQueueHandles mq_
|
||||
mapM_ (closeMsgQueueHandles ms) mq_
|
||||
removeQueueDirectory ms rId
|
||||
pure r
|
||||
|
||||
closeMsgQueue :: JournalQueue s -> IO ()
|
||||
closeMsgQueue JournalQueue {msgQueue'} = atomically (swapTVar msgQueue' Nothing) >>= mapM_ closeMsgQueueHandles
|
||||
closeMsgQueue :: JournalMsgStore s -> JournalQueue s -> IO ()
|
||||
closeMsgQueue ms JournalQueue {msgQueue'} = atomically (swapTVar msgQueue' Nothing) >>= mapM_ (closeMsgQueueHandles ms)
|
||||
|
||||
closeMsgQueueHandles :: JournalMsgQueue s -> IO ()
|
||||
closeMsgQueueHandles q = readTVarIO (handles q) >>= mapM_ closeHandles
|
||||
closeMsgQueueHandles :: JournalMsgStore s -> JournalMsgQueue s -> IO ()
|
||||
closeMsgQueueHandles ms q = readTVarIO (handles q) >>= mapM_ closeHandles
|
||||
where
|
||||
closeHandles (MsgQueueHandles sh rh wh_) = do
|
||||
hClose sh
|
||||
hClose rh
|
||||
mapM_ hClose wh_
|
||||
atomically $ modifyTVar' (openedQueueCount ms) (subtract 1)
|
||||
|
||||
removeQueueDirectory :: JournalMsgStore s -> RecipientId -> IO ()
|
||||
removeQueueDirectory st = removeQueueDirectory_ . msgQueueDirectory st
|
||||
|
||||
@@ -23,6 +23,7 @@ import Control.Monad.IO.Class
|
||||
import Control.Monad.Trans.Except
|
||||
import Data.Functor (($>))
|
||||
import Data.Int (Int64)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Simplex.Messaging.Protocol
|
||||
import Simplex.Messaging.Server.MsgStore.Types
|
||||
import Simplex.Messaging.Server.QueueStore
|
||||
@@ -94,7 +95,13 @@ instance MsgStoreClass STMMsgStore where
|
||||
queueStore = queueStore_
|
||||
{-# INLINE queueStore #-}
|
||||
|
||||
mkQueue _ rId qr = STMQueue rId <$> newTVarIO (Just qr) <*> newTVarIO Nothing
|
||||
loadedQueueCounts :: STMMsgStore -> IO LoadedQueueCounts
|
||||
loadedQueueCounts STMMsgStore {queueStore_ = st} = do
|
||||
loadedQueueCount <- M.size <$> readTVarIO (queues st)
|
||||
loadedNotifierCount <- M.size <$> readTVarIO (notifiers st)
|
||||
pure LoadedQueueCounts {loadedQueueCount, loadedNotifierCount, openJournalCount = 0, queueLockCount = 0, notifierLockCount = 0}
|
||||
|
||||
mkQueue _ _ rId qr = STMQueue rId <$> newTVarIO (Just qr) <*> newTVarIO Nothing
|
||||
{-# INLINE mkQueue #-}
|
||||
|
||||
getMsgQueue :: STMMsgStore -> STMQueue -> Bool -> STM STMMsgQueue
|
||||
|
||||
@@ -43,9 +43,10 @@ class (Monad (StoreMonad s), QueueStoreClass (StoreQueue s) (QueueStore s)) => M
|
||||
logQueueStates :: s -> IO ()
|
||||
logQueueState :: StoreQueue s -> StoreMonad s ()
|
||||
queueStore :: s -> QueueStore s
|
||||
loadedQueueCounts :: s -> IO LoadedQueueCounts
|
||||
|
||||
-- message store methods
|
||||
mkQueue :: s -> RecipientId -> QueueRec -> IO (StoreQueue s)
|
||||
mkQueue :: s -> Bool -> RecipientId -> QueueRec -> IO (StoreQueue s)
|
||||
getMsgQueue :: s -> StoreQueue s -> Bool -> StoreMonad s (MsgQueue (StoreQueue s))
|
||||
getPeekMsgQueue :: s -> StoreQueue s -> StoreMonad s (Maybe (MsgQueue (StoreQueue s), Message))
|
||||
|
||||
@@ -88,11 +89,19 @@ instance Semigroup MessageStats where
|
||||
MessageStats a b c <> MessageStats x y z = MessageStats (a + x) (b + y) (c + z)
|
||||
{-# INLINE (<>) #-}
|
||||
|
||||
data LoadedQueueCounts = LoadedQueueCounts
|
||||
{ loadedQueueCount :: Int,
|
||||
loadedNotifierCount :: Int,
|
||||
openJournalCount :: Int,
|
||||
queueLockCount :: Int,
|
||||
notifierLockCount :: Int
|
||||
}
|
||||
|
||||
newMessageStats :: MessageStats
|
||||
newMessageStats = MessageStats 0 0 0
|
||||
|
||||
addQueue :: MsgStoreClass s => s -> RecipientId -> QueueRec -> IO (Either ErrorType (StoreQueue s))
|
||||
addQueue st = addQueue_ (queueStore st) (mkQueue st)
|
||||
addQueue st = addQueue_ (queueStore st) (mkQueue st True)
|
||||
{-# INLINE addQueue #-}
|
||||
|
||||
getQueue :: (MsgStoreClass s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s))
|
||||
|
||||
Reference in New Issue
Block a user