smp server: combine messages and queue storage to optimise performance, prevent race condition when deleting queue and to avoid "orphan" messages. (#1395)

* smp server: combine queue and message store into one class (WIP)

* keep deleted queue tombstones to prevent race conditions and errors when restoring

* move store log from server to store implementations

* STMQueueStore type class

* fix store closed when messages expired, handle store writing errors

* types

* version

* fix recovery from missing write journal, tests

* version
This commit is contained in:
Evgeny
2024-11-07 08:09:11 +00:00
committed by GitHub
parent 0fd4aa1e23
commit d3275cef48
17 changed files with 996 additions and 654 deletions
+173 -92
View File
@@ -11,11 +11,13 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeFamilies #-}
module Simplex.Messaging.Server.MsgStore.Journal
( JournalMsgStore (msgQueues, random),
JournalMsgQueue (queue),
( JournalMsgStore (queues, senders, notifiers, random),
JournalQueue,
JournalMsgQueue (queue, state),
JMQueue (queueDirectory, statePath),
JournalStoreConfig (..),
getQueueMessages,
@@ -32,6 +34,7 @@ module Simplex.Messaging.Server.MsgStore.Journal
newJournalId,
appendState,
queueLogFileName,
journalFilePath,
logFileExt,
)
where
@@ -42,13 +45,11 @@ import Control.Logger.Simple
import Control.Monad
import Control.Monad.Trans.Except
import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.Bitraversable (bimapM)
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Functor (($>))
import Data.Int (Int64)
import Data.List (intercalate)
import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes, fromMaybe)
import qualified Data.Text as T
import Data.Time.Clock (getCurrentTime)
@@ -57,11 +58,14 @@ import GHC.IO (catchAny)
import Simplex.Messaging.Agent.Client (getMapLock, withLockMap)
import Simplex.Messaging.Agent.Lock
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (ErrorType (..), Message (..), RecipientId)
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.MsgStore.Types
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.STM
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util (ifM, tshow, ($>>=))
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.Util (ifM, tshow, ($>>=), (<$$>))
import System.Directory
import System.Exit
import System.FilePath ((</>))
@@ -73,7 +77,10 @@ data JournalMsgStore = JournalMsgStore
{ config :: JournalStoreConfig,
random :: TVar StdGen,
queueLocks :: TMap RecipientId Lock,
msgQueues :: TMap RecipientId JournalMsgQueue
queues :: TMap RecipientId JournalQueue,
senders :: TMap SenderId RecipientId,
notifiers :: TMap NotifierId RecipientId,
storeLog :: TVar (Maybe (StoreLog 'WriteMode))
}
data JournalStoreConfig = JournalStoreConfig
@@ -88,9 +95,16 @@ data JournalStoreConfig = JournalStoreConfig
stateTailSize :: Int
}
data JournalQueue = JournalQueue
{ queueLock :: Lock,
-- To avoid race conditions and errors when restoring queues,
-- Nothing is written to TVar when queue is deleted.
queueRec :: TVar (Maybe QueueRec),
msgQueue_ :: TVar (Maybe JournalMsgQueue)
}
data JMQueue = JMQueue
{ queueDirectory :: FilePath,
queueLock :: Lock,
statePath :: FilePath
}
@@ -198,8 +212,21 @@ logFileExt = ".log"
newtype StoreIO a = StoreIO {unStoreIO :: IO a}
deriving newtype (Functor, Applicative, Monad)
instance STMQueueStore JournalMsgStore where
queues' = queues
senders' = senders
notifiers' = notifiers
storeLog' = storeLog
mkQueue st qr = do
lock <- getMapLock (queueLocks st) $ recipientId qr
q <- newTVar $! Just qr
mq <- newTVar Nothing
pure $ JournalQueue lock q mq
msgQueue_' = msgQueue_
instance MsgStoreClass JournalMsgStore where
type StoreMonad JournalMsgStore = StoreIO
type StoreQueue JournalMsgStore = JournalQueue
type MsgQueue JournalMsgStore = JournalMsgQueue
type MsgStoreConfig JournalMsgStore = JournalStoreConfig
@@ -207,39 +234,51 @@ instance MsgStoreClass JournalMsgStore where
newMsgStore config = do
random <- newTVarIO =<< newStdGen
queueLocks <- TM.emptyIO
msgQueues <- TM.emptyIO
pure JournalMsgStore {config, random, queueLocks, msgQueues}
queues <- TM.emptyIO
senders <- TM.emptyIO
notifiers <- TM.emptyIO
storeLog <- newTVarIO Nothing
pure JournalMsgStore {config, random, queueLocks, queues, senders, notifiers, storeLog}
closeMsgStore st = atomically (swapTVar (msgQueues st) M.empty) >>= mapM_ closeMsgQueueHandles
setStoreLog :: JournalMsgStore -> StoreLog 'WriteMode -> IO ()
setStoreLog st sl = atomically $ writeTVar (storeLog st) (Just sl)
activeMsgQueues = msgQueues
closeMsgStore st = do
readTVarIO (storeLog st) >>= mapM_ closeStoreLog
readTVarIO (queues st) >>= mapM_ closeMsgQueue
activeMsgQueues = queues
{-# INLINE activeMsgQueues #-}
-- This function is a "foldr" that opens and closes all queues, processes them as defined by action and accumulates the result.
-- It is used to export storage to a single file and also to expire messages and validate all queues when server is started.
-- TODO this function requires case-sensitive file system, because it uses queue directory as recipient ID.
-- It can be made to support case-insensite FS by supporting more than one queue per directory, by getting recipient ID from state file name.
withAllMsgQueues :: forall a. Monoid a => Bool -> JournalMsgStore -> (RecipientId -> JournalMsgQueue -> IO a) -> IO a
withAllMsgQueues :: forall a. Monoid a => Bool -> JournalMsgStore -> (RecipientId -> JournalQueue -> IO a) -> IO a
withAllMsgQueues tty ms@JournalMsgStore {config} action = ifM (doesDirectoryExist storePath) processStore (pure mempty)
where
processStore = do
closeMsgStore ms
lock <- createLockIO -- the same lock is used for all queues
(!count, !res) <- foldQueues 0 (processQueue lock) (0, mempty) ("", storePath)
(!count, !res) <- foldQueues 0 processQueue (0, mempty) ("", storePath)
putStrLn $ progress count
pure res
JournalStoreConfig {storePath, pathParts} = config
processQueue :: Lock -> (Int, a) -> (String, FilePath) -> IO (Int, a)
processQueue queueLock (!i, !r) (queueId, dir) = do
processQueue :: (Int, a) -> (String, FilePath) -> IO (Int, a)
processQueue (!i, !r) (queueId, dir) = do
when (tty && i `mod` 100 == 0) $ putStr (progress i <> "\r") >> IO.hFlush stdout
let statePath = msgQueueStatePath dir queueId
q <- openMsgQueue ms JMQueue {queueDirectory = dir, queueLock, statePath}
r' <- case strDecode $ B.pack queueId of
Right rId -> action rId q
Right rId ->
getQueue ms SRecipient rId >>= \case
Right q -> unStoreIO (getMsgQueue ms rId q) *> action rId q <* closeMsgQueue q
Left AUTH -> do
logWarn $ "STORE: processQueue, queue " <> T.pack queueId <> " was removed, removing " <> T.pack dir
removeQueueDirectory_ dir
pure mempty
Left e -> do
logError $ "STORE: processQueue, error getting queue " <> T.pack queueId <> ", " <> tshow e
exitFailure
Left e -> do
putStrLn ("Error: message queue directory " <> dir <> " is invalid: " <> e)
logError $ "STORE: processQueue, message queue directory " <> T.pack dir <> " is invalid, " <> tshow e
exitFailure
closeMsgQueueHandles q
pure (i + 1, r <> r')
progress i = "Processed: " <> show i <> " queues"
foldQueues depth f acc (queueId, path) = do
@@ -258,23 +297,26 @@ instance MsgStoreClass JournalMsgStore where
logQueueStates :: JournalMsgStore -> IO ()
logQueueStates ms = withActiveMsgQueues ms $ \_ -> logQueueState
logQueueState :: JournalMsgQueue -> IO ()
logQueueState q =
readTVarIO (handles q)
>>= maybe (pure ()) (\hs -> readTVarIO (state q) >>= appendState (stateHandle hs))
logQueueState :: JournalQueue -> IO ()
logQueueState q =
void $
readTVarIO (msgQueue_ q)
$>>= \mq -> readTVarIO (handles mq)
$>>= (\hs -> (readTVarIO (state mq) >>= appendState (stateHandle hs)) $> Just ())
getMsgQueue :: JournalMsgStore -> RecipientId -> ExceptT ErrorType IO JournalMsgQueue
getMsgQueue ms@JournalMsgStore {queueLocks, msgQueues, random} rId =
tryStore "getMsgQueue" (B.unpack $ strEncode rId) $ withLockMap queueLocks rId "getMsgQueue" $
TM.lookupIO rId msgQueues >>= maybe newQ pure
queueRec' = queueRec
{-# INLINE queueRec' #-}
getMsgQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> StoreIO JournalMsgQueue
getMsgQueue ms@JournalMsgStore {random} rId JournalQueue {msgQueue_} =
StoreIO $ readTVarIO msgQueue_ >>= maybe newQ pure
where
newQ = do
queueLock <- atomically $ getMapLock queueLocks rId
let dir = msgQueueDirectory ms rId
statePath = msgQueueStatePath dir $ B.unpack (strEncode rId)
queue = JMQueue {queueDirectory = dir, queueLock, statePath}
queue = JMQueue {queueDirectory = dir, statePath}
q <- ifM (doesDirectoryExist dir) (openMsgQueue ms queue) (createQ queue)
atomically $ TM.insert rId q msgQueues
atomically $ writeTVar msgQueue_ $! Just q
pure q
where
createQ :: JMQueue -> IO JournalMsgQueue
@@ -282,23 +324,25 @@ instance MsgStoreClass JournalMsgStore where
-- folder and files are not created here,
-- to avoid file IO for queues without messages during subscription
journalId <- newJournalId random
mkJournalQueue queue (newMsgQueueState journalId, Nothing)
mkJournalQueue queue (newMsgQueueState journalId) Nothing
delMsgQueue :: JournalMsgStore -> RecipientId -> IO ()
delMsgQueue ms rId = withLockMap (queueLocks ms) rId "delMsgQueue" $ do
closeMsgQueue ms rId
removeQueueDirectory ms rId
openedMsgQueue :: JournalQueue -> StoreIO (Maybe JournalMsgQueue)
openedMsgQueue = StoreIO . readTVarIO . msgQueue_
{-# INLINE openedMsgQueue #-}
delMsgQueueSize :: JournalMsgStore -> RecipientId -> IO Int
delMsgQueueSize ms rId = withLockMap (queueLocks ms) rId "delMsgQueue" $ do
st_ <-
atomically (TM.lookupDelete rId (msgQueues ms))
>>= mapM (\q -> closeMsgQueueHandles q >> readTVarIO (state q))
removeQueueDirectory ms rId
pure $ maybe (-1) size st_
deleteQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType QueueRec)
deleteQueue ms rId q =
fst <$$> deleteQueue_ ms rId q
getQueueMessages :: Bool -> JournalMsgQueue -> IO [Message]
getQueueMessages drainMsgs q = run []
deleteQueueSize :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType (QueueRec, Int))
deleteQueueSize ms rId q =
deleteQueue_ ms rId q >>= mapM (traverse getSize)
-- traverse operates on the second tuple element
where
getSize = maybe (pure (-1)) (fmap size . readTVarIO . state)
getQueueMessages_ :: Bool -> JournalMsgQueue -> StoreIO [Message]
getQueueMessages_ drainMsgs q = StoreIO (run [])
where
run msgs = readTVarIO (handles q) >>= maybe (pure []) (getMsg msgs)
getMsg msgs hs = chooseReadJournal q drainMsgs hs >>= maybe (pure msgs) readMsg
@@ -308,22 +352,23 @@ instance MsgStoreClass JournalMsgStore where
updateReadPos q drainMsgs len hs
(msg :) <$> run msgs
writeMsg :: JournalMsgStore -> JournalMsgQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
writeMsg ms q@JournalMsgQueue {queue = JMQueue {queueDirectory, statePath}, handles} logState msg =
isolateQueue q "writeMsg" $ StoreIO $ do
writeMsg :: JournalMsgStore -> RecipientId -> JournalQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
writeMsg ms rId q' logState msg = isolateQueue rId q' "writeMsg" $ do
q <- getMsgQueue ms rId q'
StoreIO $ do
st@MsgQueueState {canWrite, size} <- readTVarIO (state q)
let empty = size == 0
if canWrite || empty
then do
let canWrt' = quota > size
if canWrt'
then writeToJournal st canWrt' msg $> Just (msg, empty)
else writeToJournal st canWrt' msgQuota $> Nothing
then writeToJournal q st canWrt' msg $> Just (msg, empty)
else writeToJournal q st canWrt' msgQuota $> Nothing
else pure Nothing
where
JournalStoreConfig {quota, maxMsgCount} = config ms
msgQuota = MessageQuota {msgId = msgId msg, msgTs = msgTs msg}
writeToJournal st@MsgQueueState {writeState, readState = rs, size} canWrt' !msg' = do
msgQuota = MessageQuota {msgId = messageId msg, msgTs = messageTs msg}
writeToJournal q st@MsgQueueState {writeState, readState = rs, size} canWrt' !msg' = do
let msgStr = strEncode msg' `B.snoc` '\n'
msgLen = fromIntegral $ B.length msgStr
hs <- maybe createQueueDir pure =<< readTVarIO handles
@@ -339,6 +384,7 @@ instance MsgStoreClass JournalMsgStore where
updateQueueState q logState hs st' $
when (size == 0) $ writeTVar (tipMsg q) $ Just (Just (msg, msgLen))
where
JournalMsgQueue {queue = JMQueue {queueDirectory, statePath}, handles} = q
createQueueDir = do
createDirectoryIfMissing True queueDirectory
sh <- openFile statePath AppendMode
@@ -354,11 +400,13 @@ instance MsgStoreClass JournalMsgStore where
pure (newJournalState journalId, wh)
-- can ONLY be used while restoring messages, not while server running
setOverQuota_ :: JournalMsgQueue -> IO ()
setOverQuota_ JournalMsgQueue {state} = atomically $ modifyTVar' state $ \st -> st {canWrite = False}
setOverQuota_ :: JournalQueue -> IO ()
setOverQuota_ q =
readTVarIO (msgQueue_ q)
>>= mapM_ (\JournalMsgQueue {state} -> atomically $ modifyTVar' state $ \st -> st {canWrite = False})
getQueueSize :: JournalMsgQueue -> IO Int
getQueueSize JournalMsgQueue {state} = size <$> readTVarIO state
getQueueSize_ :: JournalMsgQueue -> StoreIO Int
getQueueSize_ JournalMsgQueue {state} = StoreIO $ size <$> readTVarIO state
tryPeekMsg_ :: JournalMsgQueue -> StoreIO (Maybe Message)
tryPeekMsg_ q@JournalMsgQueue {tipMsg, handles} =
@@ -379,27 +427,33 @@ instance MsgStoreClass JournalMsgStore where
$>>= \len -> readTVarIO handles
$>>= \hs -> updateReadPos q logState len hs $> Just ()
isolateQueue :: JournalMsgQueue -> String -> StoreIO a -> ExceptT ErrorType IO a
isolateQueue JournalMsgQueue {queue = q} op =
tryStore op (queueDirectory q) . withLock' (queueLock q) op . unStoreIO
isolateQueue :: RecipientId -> JournalQueue -> String -> StoreIO a -> ExceptT ErrorType IO a
isolateQueue rId JournalQueue {queueLock} op =
tryStore' op rId . withLock' queueLock op . unStoreIO
tryStore :: String -> String -> IO a -> ExceptT ErrorType IO a
tryStore op qId a = ExceptT $ E.mask_ $ E.try a >>= bimapM storeErr pure
tryStore' :: String -> RecipientId -> IO a -> ExceptT ErrorType IO a
tryStore' op rId = tryStore op rId . fmap Right
tryStore :: forall a. String -> RecipientId -> IO (Either ErrorType a) -> ExceptT ErrorType IO a
tryStore op rId a = ExceptT $ E.mask_ $ E.try a >>= either storeErr pure
where
storeErr :: E.SomeException -> IO ErrorType
storeErr :: E.SomeException -> IO (Either ErrorType a)
storeErr e =
let e' = intercalate ", " [op, qId, show e]
in logError ("STORE: " <> T.pack e') $> STORE e'
let e' = intercalate ", " [op, B.unpack $ strEncode rId, show e]
in logError ("STORE: " <> T.pack e') $> Left (STORE e')
isolateQueueId :: String -> JournalMsgStore -> RecipientId -> IO (Either ErrorType a) -> ExceptT ErrorType IO a
isolateQueueId op ms rId = tryStore op rId . withLockMap (queueLocks ms) rId op
openMsgQueue :: JournalMsgStore -> JMQueue -> IO JournalMsgQueue
openMsgQueue ms q@JMQueue {queueDirectory = dir, statePath} = do
(st, sh) <- readWriteQueueState ms statePath
(st', rh, wh_) <- closeOnException sh $ openJournals dir st sh
(st', rh, wh_) <- closeOnException sh $ openJournals ms dir st sh
let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = wh_}
mkJournalQueue q (st', Just hs)
mkJournalQueue q st' (Just hs)
mkJournalQueue :: JMQueue -> (MsgQueueState, Maybe MsgQueueHandles) -> IO JournalMsgQueue
mkJournalQueue queue (st, hs_) = do
mkJournalQueue :: JMQueue -> MsgQueueState -> Maybe MsgQueueHandles -> IO JournalMsgQueue
mkJournalQueue queue st hs_ = do
state <- newTVarIO st
tipMsg <- newTVarIO Nothing
handles <- newTVarIO hs_
@@ -464,17 +518,26 @@ createNewJournal dir journalId = do
newJournalId :: TVar StdGen -> IO ByteString
newJournalId g = strEncode <$> atomically (stateTVar g $ genByteString 12)
openJournals :: FilePath -> MsgQueueState -> Handle -> IO (MsgQueueState, Handle, Maybe Handle)
openJournals dir st@MsgQueueState {readState = rs, writeState = ws} sh = do
openJournals :: JournalMsgStore -> FilePath -> MsgQueueState -> Handle -> IO (MsgQueueState, Handle, Maybe Handle)
openJournals ms dir st@MsgQueueState {readState = rs, writeState = ws} sh = do
let rjId = journalId rs
wjId = journalId ws
openJournal rs >>= \case
Left path -> do
logError $ "STORE: openJournals, no read file - creating new file, " <> T.pack path
rh <- createNewJournal dir rjId
let st' = newMsgQueueState rjId
closeOnException rh $ appendState sh st'
pure (st', rh, Nothing)
Left path
| rjId == wjId -> do
logError $ "STORE: openJournals, no read/write file - creating new file, " <> T.pack path
newReadJournal
| otherwise -> do
let rs' = (newJournalState wjId) {msgCount = msgCount ws, byteCount = byteCount ws}
st' = st {readState = rs', size = msgCount ws}
openJournal rs' >>= \case
Left path' -> do
logError $ "STORE: openJournals, no read and write files - creating new file, read: " <> T.pack path <> ", write: " <> T.pack path'
newReadJournal
Right rh -> do
logError $ "STORE: openJournals, no read file - switched to write file, " <> T.pack path
closeOnException rh $ fixFileSize rh $ bytePos ws
pure (st', rh, Nothing)
Right rh
| rjId == wjId -> do
closeOnException rh $ fixFileSize rh $ bytePos ws
@@ -483,16 +546,23 @@ openJournals dir st@MsgQueueState {readState = rs, writeState = ws} sh = do
fixFileSize rh $ byteCount rs
openJournal ws >>= \case
Left path -> do
logError $ "STORE: openJournals, no write file - creating new file, " <> T.pack path
wh <- createNewJournal dir wjId
let size' = msgCount rs - msgPos rs
st' = st {writeState = newJournalState wjId, size = size'} -- we don't amend canWrite to trigger QCONT
closeOnException wh $ appendState sh st'
pure (st', rh, Just wh)
let msgs = msgCount rs
bytes = byteCount rs
size' = msgs - msgPos rs
ws' = (newJournalState rjId) {msgPos = msgs, msgCount = msgs, bytePos = bytes, byteCount = bytes}
st' = st {writeState = ws', size = size'} -- we don't amend canWrite to trigger QCONT
logError $ "STORE: openJournals, no write file, " <> T.pack path
pure (st', rh, Nothing)
Right wh -> do
closeOnException wh $ fixFileSize wh $ bytePos ws
pure (st, rh, Just wh)
where
newReadJournal = do
rjId' <- newJournalId $ random ms
rh <- createNewJournal dir rjId'
let st' = newMsgQueueState rjId'
closeOnException rh $ appendState sh st'
pure (st', rh, Nothing)
openJournal :: JournalState t -> IO (Either FilePath Handle)
openJournal JournalState {journalId} =
let path = journalFilePath dir journalId
@@ -599,10 +669,18 @@ validQueueState MsgQueueState {readState = rs, writeState = ws, size}
&& msgPos ws == msgCount ws
&& bytePos ws == byteCount ws
closeMsgQueue :: JournalMsgStore -> RecipientId -> IO ()
closeMsgQueue ms rId =
atomically (TM.lookupDelete rId (msgQueues ms))
>>= mapM_ closeMsgQueueHandles
deleteQueue_ :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType (QueueRec, Maybe JournalMsgQueue))
deleteQueue_ ms rId q =
runExceptT $ isolateQueueId "deleteQueue_" ms rId $
deleteQueue' ms rId q >>= mapM remove
where
remove r@(_, mq_) = do
mapM_ closeMsgQueueHandles mq_
removeQueueDirectory ms rId
pure r
closeMsgQueue :: JournalQueue -> IO ()
closeMsgQueue JournalQueue {msgQueue_} = atomically (swapTVar msgQueue_ Nothing) >>= mapM_ closeMsgQueueHandles
closeMsgQueueHandles :: JournalMsgQueue -> IO ()
closeMsgQueueHandles q = readTVarIO (handles q) >>= mapM_ closeHandles
@@ -613,9 +691,12 @@ closeMsgQueueHandles q = readTVarIO (handles q) >>= mapM_ closeHandles
mapM_ hClose wh_
removeQueueDirectory :: JournalMsgStore -> RecipientId -> IO ()
removeQueueDirectory st rId =
let dir = msgQueueDirectory st rId
in removePathForcibly dir `catchAny` (\e -> logError $ "STORE: removeQueueDirectory, " <> T.pack dir <> ", " <> tshow e)
removeQueueDirectory st = removeQueueDirectory_ . msgQueueDirectory st
removeQueueDirectory_ :: FilePath -> IO ()
removeQueueDirectory_ dir =
removePathForcibly dir `catchAny` \e ->
logError $ "STORE: removeQueueDirectory, " <> T.pack dir <> ", " <> tshow e
hAppend :: Handle -> Int64 -> ByteString -> IO ()
hAppend h pos s = do
+75 -39
View File
@@ -1,5 +1,6 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
@@ -7,11 +8,11 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeFamilies #-}
module Simplex.Messaging.Server.MsgStore.STM
( STMMsgStore (..),
STMMsgQueue (msgQueue),
STMStoreConfig (..),
)
where
@@ -20,21 +21,35 @@ import Control.Concurrent.STM
import Control.Monad.IO.Class
import Control.Monad.Trans.Except
import Data.Functor (($>))
import Simplex.Messaging.Protocol (ErrorType, Message (..), RecipientId)
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.MsgStore.Types
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.STM
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
data STMMsgQueue = STMMsgQueue
{ msgQueue :: TQueue Message,
quota :: Int,
canWrite :: TVar Bool,
size :: TVar Int
}
import Simplex.Messaging.Util ((<$$>))
import System.IO (IOMode (..))
data STMMsgStore = STMMsgStore
{ storeConfig :: STMStoreConfig,
msgQueues :: TMap RecipientId STMMsgQueue
queues :: TMap RecipientId STMQueue,
senders :: TMap SenderId RecipientId,
notifiers :: TMap NotifierId RecipientId,
storeLog :: TVar (Maybe (StoreLog 'WriteMode))
}
data STMQueue = STMQueue
{ -- To avoid race conditions and errors when restoring queues,
-- Nothing is written to TVar when queue is deleted.
queueRec :: TVar (Maybe QueueRec),
msgQueue_ :: TVar (Maybe STMMsgQueue)
}
data STMMsgQueue = STMMsgQueue
{ msgQueue :: TQueue Message,
canWrite :: TVar Bool,
size :: TVar Int
}
data STMStoreConfig = STMStoreConfig
@@ -42,19 +57,34 @@ data STMStoreConfig = STMStoreConfig
quota :: Int
}
instance STMQueueStore STMMsgStore where
queues' = queues
senders' = senders
notifiers' = notifiers
storeLog' = storeLog
mkQueue _ qr = STMQueue <$> (newTVar $! Just qr) <*> newTVar Nothing
msgQueue_' = msgQueue_
instance MsgStoreClass STMMsgStore where
type StoreMonad STMMsgStore = STM
type StoreQueue STMMsgStore = STMQueue
type MsgQueue STMMsgStore = STMMsgQueue
type MsgStoreConfig STMMsgStore = STMStoreConfig
newMsgStore :: STMStoreConfig -> IO STMMsgStore
newMsgStore storeConfig = do
msgQueues <- TM.emptyIO
pure STMMsgStore {storeConfig, msgQueues}
queues <- TM.emptyIO
senders <- TM.emptyIO
notifiers <- TM.emptyIO
storeLog <- newTVarIO Nothing
pure STMMsgStore {storeConfig, queues, senders, notifiers, storeLog}
closeMsgStore _ = pure ()
setStoreLog :: STMMsgStore -> StoreLog 'WriteMode -> IO ()
setStoreLog st sl = atomically $ writeTVar (storeLog st) (Just sl)
activeMsgQueues = msgQueues
closeMsgStore st = readTVarIO (storeLog st) >>= mapM_ closeStoreLog
activeMsgQueues = queues
{-# INLINE activeMsgQueues #-}
withAllMsgQueues _ = withActiveMsgQueues
@@ -64,39 +94,44 @@ instance MsgStoreClass STMMsgStore where
logQueueState _ = pure ()
-- The reason for double lookup is that majority of messaging queues exist,
-- because multiple messages are sent to the same queue,
-- so the first lookup without STM transaction will return the queue faster.
-- In case the queue does not exist, it needs to be looked-up again inside transaction.
getMsgQueue :: STMMsgStore -> RecipientId -> ExceptT ErrorType IO STMMsgQueue
getMsgQueue STMMsgStore {msgQueues = qs, storeConfig = STMStoreConfig {quota}} rId =
liftIO $ TM.lookupIO rId qs >>= maybe (atomically maybeNewQ) pure
queueRec' = queueRec
{-# INLINE queueRec' #-}
getMsgQueue :: STMMsgStore -> RecipientId -> STMQueue -> STM STMMsgQueue
getMsgQueue _ _ STMQueue {msgQueue_} = readTVar msgQueue_ >>= maybe newQ pure
where
maybeNewQ = TM.lookup rId qs >>= maybe newQ pure
newQ = do
msgQueue <- newTQueue
canWrite <- newTVar True
size <- newTVar 0
let q = STMMsgQueue {msgQueue, quota, canWrite, size}
TM.insert rId q qs
let q = STMMsgQueue {msgQueue, canWrite, size}
writeTVar msgQueue_ $! Just q
pure q
delMsgQueue :: STMMsgStore -> RecipientId -> IO ()
delMsgQueue st rId = atomically $ TM.delete rId $ msgQueues st
openedMsgQueue :: STMQueue -> STM (Maybe STMMsgQueue)
openedMsgQueue = readTVar . msgQueue_
{-# INLINE openedMsgQueue #-}
delMsgQueueSize :: STMMsgStore -> RecipientId -> IO Int
delMsgQueueSize st rId = atomically (TM.lookupDelete rId $ msgQueues st) >>= maybe (pure 0) (\STMMsgQueue {size} -> readTVarIO size)
deleteQueue :: STMMsgStore -> RecipientId -> STMQueue -> IO (Either ErrorType QueueRec)
deleteQueue ms rId q = fst <$$> deleteQueue' ms rId q
getQueueMessages :: Bool -> STMMsgQueue -> IO [Message]
getQueueMessages drainMsgs = atomically . (if drainMsgs then flushTQueue else snapshotTQueue) . msgQueue
deleteQueueSize :: STMMsgStore -> RecipientId -> STMQueue -> IO (Either ErrorType (QueueRec, Int))
deleteQueueSize ms rId q = deleteQueue' ms rId q >>= mapM (traverse getSize)
-- traverse operates on the second tuple element
where
getSize = maybe (pure 0) (\STMMsgQueue {size} -> readTVarIO size)
getQueueMessages_ :: Bool -> STMMsgQueue -> STM [Message]
getQueueMessages_ drainMsgs = (if drainMsgs then flushTQueue else snapshotTQueue) . msgQueue
where
snapshotTQueue q = do
msgs <- flushTQueue q
mapM_ (writeTQueue q) msgs
pure msgs
writeMsg :: STMMsgStore -> STMMsgQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
writeMsg _ STMMsgQueue {msgQueue = q, quota, canWrite, size} _logState msg = liftIO $ atomically $ do
writeMsg :: STMMsgStore -> RecipientId -> STMQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
writeMsg ms rId q' _logState msg = liftIO $ atomically $ do
STMMsgQueue {msgQueue = q, canWrite, size} <- getMsgQueue ms rId q'
canWrt <- readTVar canWrite
empty <- isEmptyTQueue q
if canWrt || empty
@@ -109,13 +144,14 @@ instance MsgStoreClass STMMsgStore where
else (writeTQueue q $! msgQuota) $> Nothing
else pure Nothing
where
msgQuota = MessageQuota {msgId = msgId msg, msgTs = msgTs msg}
STMMsgStore {storeConfig = STMStoreConfig {quota}} = ms
msgQuota = MessageQuota {msgId = messageId msg, msgTs = messageTs msg}
setOverQuota_ :: STMMsgQueue -> IO ()
setOverQuota_ q = atomically $ writeTVar (canWrite q) False
setOverQuota_ :: STMQueue -> IO ()
setOverQuota_ q = readTVarIO (msgQueue_ q) >>= mapM_ (\mq -> atomically $ writeTVar (canWrite mq) False)
getQueueSize :: STMMsgQueue -> IO Int
getQueueSize STMMsgQueue {size} = readTVarIO size
getQueueSize_ :: STMMsgQueue -> STM Int
getQueueSize_ STMMsgQueue {size} = readTVar size
tryPeekMsg_ :: STMMsgQueue -> STM (Maybe Message)
tryPeekMsg_ = tryPeekTQueue . msgQueue
@@ -127,5 +163,5 @@ instance MsgStoreClass STMMsgStore where
Just _ -> modifyTVar' size (subtract 1)
_ -> pure ()
isolateQueue :: STMMsgQueue -> String -> STM a -> ExceptT ErrorType IO a
isolateQueue _ _ = liftIO . atomically
isolateQueue :: RecipientId -> STMQueue -> String -> STM a -> ExceptT ErrorType IO a
isolateQueue _ _ _ = liftIO . atomically
+56 -27
View File
@@ -1,5 +1,6 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
@@ -16,29 +17,44 @@ import Data.Int (Int64)
import Data.Kind
import qualified Data.Map.Strict as M
import Data.Time.Clock.System (SystemTime (systemSeconds))
import Simplex.Messaging.Protocol (ErrorType, Message (..), MsgId, RecipientId)
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.StoreLog.Types
import Simplex.Messaging.TMap (TMap)
import System.IO (IOMode (..))
class MsgStoreClass s => STMQueueStore s where
queues' :: s -> TMap RecipientId (StoreQueue s)
senders' :: s -> TMap SenderId RecipientId
notifiers' :: s -> TMap NotifierId RecipientId
storeLog' :: s -> TVar (Maybe (StoreLog 'WriteMode))
mkQueue :: s -> QueueRec -> STM (StoreQueue s)
msgQueue_' :: StoreQueue s -> TVar (Maybe (MsgQueue s))
class Monad (StoreMonad s) => MsgStoreClass s where
type StoreMonad s = (m :: Type -> Type) | m -> s
type MsgStoreConfig s = c | c -> s
type StoreQueue s = q | q -> s
type MsgQueue s = q | q -> s
newMsgStore :: MsgStoreConfig s -> IO s
setStoreLog :: s -> StoreLog 'WriteMode -> IO ()
closeMsgStore :: s -> IO ()
activeMsgQueues :: s -> TMap RecipientId (MsgQueue s)
withAllMsgQueues :: Monoid a => Bool -> s -> (RecipientId -> MsgQueue s -> IO a) -> IO a
activeMsgQueues :: s -> TMap RecipientId (StoreQueue s)
withAllMsgQueues :: Monoid a => Bool -> s -> (RecipientId -> StoreQueue s -> IO a) -> IO a
logQueueStates :: s -> IO ()
logQueueState :: MsgQueue s -> IO ()
getMsgQueue :: s -> RecipientId -> ExceptT ErrorType IO (MsgQueue s)
delMsgQueue :: s -> RecipientId -> IO ()
delMsgQueueSize :: s -> RecipientId -> IO Int
getQueueMessages :: Bool -> MsgQueue s -> IO [Message]
writeMsg :: s -> MsgQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
setOverQuota_ :: MsgQueue s -> IO () -- can ONLY be used while restoring messages, not while server running
getQueueSize :: MsgQueue s -> IO Int
logQueueState :: StoreQueue s -> IO ()
queueRec' :: StoreQueue s -> TVar (Maybe QueueRec)
getMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (MsgQueue s)
openedMsgQueue :: StoreQueue s -> StoreMonad s (Maybe (MsgQueue s))
deleteQueue :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType QueueRec)
deleteQueueSize :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType (QueueRec, Int))
getQueueMessages_ :: Bool -> MsgQueue s -> StoreMonad s [Message]
writeMsg :: s -> RecipientId -> StoreQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
setOverQuota_ :: StoreQueue s -> IO () -- can ONLY be used while restoring messages, not while server running
getQueueSize_ :: MsgQueue s -> StoreMonad s Int
tryPeekMsg_ :: MsgQueue s -> StoreMonad s (Maybe Message)
tryDeleteMsg_ :: MsgQueue s -> Bool -> StoreMonad s ()
isolateQueue :: MsgQueue s -> String -> StoreMonad s a -> ExceptT ErrorType IO a
isolateQueue :: RecipientId -> StoreQueue s -> String -> StoreMonad s a -> ExceptT ErrorType IO a
data MSType = MSMemory | MSJournal
@@ -48,42 +64,55 @@ data SMSType :: MSType -> Type where
data AMSType = forall s. AMSType (SMSType s)
withActiveMsgQueues :: (MsgStoreClass s, Monoid a) => s -> (RecipientId -> MsgQueue s -> IO a) -> IO a
withActiveMsgQueues :: (MsgStoreClass s, Monoid a) => s -> (RecipientId -> StoreQueue s -> IO a) -> IO a
withActiveMsgQueues st f = readTVarIO (activeMsgQueues st) >>= foldM run mempty . M.assocs
where
run !acc (k, v) = do
r <- f k v
pure $! acc <> r
tryPeekMsg :: MsgStoreClass s => MsgQueue s -> ExceptT ErrorType IO (Maybe Message)
tryPeekMsg mq = isolateQueue mq "tryPeekMsg" $ tryPeekMsg_ mq
getQueueMessages :: MsgStoreClass s => Bool -> s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO [Message]
getQueueMessages drainMsgs st rId q = withMsgQueue st rId q "getQueueSize" $ getQueueMessages_ drainMsgs
{-# INLINE getQueueMessages #-}
getQueueSize :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO Int
getQueueSize st rId q = withMsgQueue st rId q "getQueueSize" $ getQueueSize_
{-# INLINE getQueueSize #-}
tryPeekMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO (Maybe Message)
tryPeekMsg st rId q = withMsgQueue st rId q "tryPeekMsg" $ tryPeekMsg_
{-# INLINE tryPeekMsg #-}
tryDelMsg :: MsgStoreClass s => MsgQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message)
tryDelMsg mq msgId' =
isolateQueue mq "tryDelMsg" $
tryDelMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message)
tryDelMsg st rId q msgId' =
withMsgQueue st rId q "tryDelMsg" $ \mq ->
tryPeekMsg_ mq >>= \case
msg_@(Just msg)
| msgId msg == msgId' ->
| messageId msg == msgId' ->
tryDeleteMsg_ mq True >> pure msg_
_ -> pure Nothing
-- atomic delete (== read) last and peek next message if available
tryDelPeekMsg :: MsgStoreClass s => MsgQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message, Maybe Message)
tryDelPeekMsg mq msgId' =
isolateQueue mq "tryDelPeekMsg" $
tryDelPeekMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message, Maybe Message)
tryDelPeekMsg st rId q msgId' =
withMsgQueue st rId q "tryDelPeekMsg" $ \mq ->
tryPeekMsg_ mq >>= \case
msg_@(Just msg)
| msgId msg == msgId' -> (msg_,) <$> (tryDeleteMsg_ mq True >> tryPeekMsg_ mq)
| messageId msg == msgId' -> (msg_,) <$> (tryDeleteMsg_ mq True >> tryPeekMsg_ mq)
| otherwise -> pure (Nothing, msg_)
_ -> pure (Nothing, Nothing)
deleteExpiredMsgs :: MsgStoreClass s => MsgQueue s -> Bool -> Int64 -> ExceptT ErrorType IO Int
deleteExpiredMsgs mq logState old = isolateQueue mq "deleteExpiredMsgs" $ loop 0
withMsgQueue :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> String -> (MsgQueue s -> StoreMonad s a) -> ExceptT ErrorType IO a
withMsgQueue st rId q op a = isolateQueue rId q op $ getMsgQueue st rId q >>= a
{-# INLINE withMsgQueue #-}
deleteExpiredMsgs :: MsgStoreClass s => RecipientId -> StoreQueue s -> Bool -> Int64 -> ExceptT ErrorType IO Int
deleteExpiredMsgs rId q logState old =
isolateQueue rId q "deleteExpiredMsgs" $ openedMsgQueue q >>= maybe (pure 0) (loop 0)
where
loop dc =
loop dc mq =
tryPeekMsg_ mq >>= \case
Just Message {msgTs}
| systemSeconds msgTs < old ->
tryDeleteMsg_ mq logState >> loop (dc + 1)
tryDeleteMsg_ mq logState >> loop (dc + 1) mq
_ -> pure dc