refactor STM queues (#1447)

This commit is contained in:
Evgeny
2025-02-05 12:04:27 +00:00
committed by GitHub
parent 45373e7f1f
commit ce24f83b64
10 changed files with 341 additions and 351 deletions
+49 -50
View File
@@ -397,8 +397,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
atomicModifyIORef'_ (msgExpired stats) (+ expired)
printMessageStats "STORE: messages" msgStats
where
expireQueueMsgs now ms old rId q = fmap (fromRight newMessageStats) . runExceptT $ do
(expired_, stored) <- idleDeleteExpiredMsgs now ms rId q old
expireQueueMsgs now ms old q = fmap (fromRight newMessageStats) . runExceptT $ do
(expired_, stored) <- idleDeleteExpiredMsgs now ms q old
pure MessageStats {storedMsgsCount = stored, expiredMsgsCount = fromMaybe 0 expired_, storedQueues = 1}
expireNtfsThread :: ServerConfig -> M ()
@@ -429,8 +429,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
ss@ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedAllB, qDeletedNew, qDeletedSecured, qSub, qSubAllB, qSubAuth, qSubDuplicate, qSubProhibited, qSubEnd, qSubEndB, ntfCreated, ntfDeleted, ntfDeletedB, ntfSub, ntfSubB, ntfSubAuth, ntfSubDuplicate, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgRecvGet, msgGet, msgGetNoMsg, msgGetAuth, msgGetDuplicate, msgGetProhibited, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, ntfCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv}
<- asks serverStats
AMS _ st <- asks msgStore
let queues = activeMsgQueues st
notifiers = notifiers' st
let STMQueueStore {queues, notifiers} = stmQueueStore st
interval = 1000000 * logInterval
forever $ do
withFile statsFilePath AppendMode $ \h -> liftIO $ do
@@ -581,13 +580,14 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
rtm <- getRealTimeMetrics env
T.writeFile metricsFile $ prometheusMetrics sm rtm ts
getServerMetrics :: STMQueueStore s => s -> ServerStats -> IO ServerMetrics
getServerMetrics :: STMStoreClass s => s -> ServerStats -> IO ServerMetrics
getServerMetrics st ss = do
d <- getServerStatsData ss
let ps = periodStatDataCounts $ _activeQueues d
psNtf = periodStatDataCounts $ _activeQueuesNtf d
queueCount <- M.size <$> readTVarIO (activeMsgQueues st)
notifierCount <- M.size <$> readTVarIO (notifiers' st)
STMQueueStore {queues, notifiers} = stmQueueStore st
queueCount <- M.size <$> readTVarIO queues
notifierCount <- M.size <$> readTVarIO notifiers
pure ServerMetrics {statsData = d, activeQueueCounts = ps, activeNtfCounts = psNtf, queueCount, notifierCount}
getRealTimeMetrics :: Env -> IO RealTimeMetrics
@@ -675,8 +675,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
CPStats -> withUserRole $ do
ss <- unliftIO u $ asks serverStats
AMS _ st <- unliftIO u $ asks msgStore
let queues = activeMsgQueues st
notifiers = notifiers' st
let STMQueueStore {queues, notifiers} = stmQueueStore st
getStat :: (ServerStats -> IORef a) -> IO a
getStat var = readIORef (var ss)
putStat :: Show a => String -> (ServerStats -> IORef a) -> IO ()
@@ -852,8 +851,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
CPDelete sId -> withUserRole $ unliftIO u $ do
AMS _ st <- asks msgStore
r <- liftIO $ runExceptT $ do
(q, qr) <- ExceptT $ getQueueRec st SSender sId
ExceptT $ deleteQueueSize st (recipientId qr) q
q <- ExceptT $ getQueue st SSender sId
ExceptT $ deleteQueueSize st q
case r of
Left e -> liftIO $ hPutStrLn h $ "error: " <> show e
Right (qr, numDeleted) -> do
@@ -916,7 +915,7 @@ runClientTransport h@THandle {params = thParams@THandleParams {thVersion, sessio
c <- liftIO $ newClient msType clientId q thVersion sessionId ts
runClientThreads msType ms active c clientId `finally` clientDisconnected c
where
runClientThreads :: STMQueueStore (MsgStore s) => SMSType s -> MsgStore s -> TVar (IM.IntMap (Maybe AClient)) -> Client (MsgStore s) -> IS.Key -> M ()
runClientThreads :: STMStoreClass (MsgStore s) => SMSType s -> MsgStore s -> TVar (IM.IntMap (Maybe AClient)) -> Client (MsgStore s) -> IS.Key -> M ()
runClientThreads msType ms active c clientId = do
atomically $ modifyTVar' active $ IM.insert clientId $ Just (AClient msType c)
s <- asks server
@@ -972,7 +971,7 @@ cancelSub s = case subThread s of
_ -> pure ()
ProhibitSub -> pure ()
receive :: forall c s. (Transport c, STMQueueStore s) => THandleSMP c 'TServer -> s -> Client s -> M ()
receive :: forall c s. (Transport c, STMStoreClass s) => THandleSMP c 'TServer -> s -> Client s -> M ()
receive h@THandle {params = THandleParams {thAuth}} ms Client {rcvQ, sndQ, rcvActiveAt, sessionId} = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " receive"
sa <- asks serverActive
@@ -1072,7 +1071,7 @@ data VerificationResult s = VRVerified (Maybe (StoreQueue s, QueueRec)) | VRFail
-- - the queue or party key do not exist.
-- In all cases, the time of the verification should depend only on the provided authorization type,
-- a dummy key is used to run verification in the last two cases, and failure is returned irrespective of the result.
verifyTransmission :: forall s. STMQueueStore s => s -> Maybe (THandleAuth 'TServer, C.CbNonce) -> Maybe TransmissionAuth -> ByteString -> QueueId -> Cmd -> M (VerificationResult s)
verifyTransmission :: forall s. STMStoreClass s => s -> Maybe (THandleAuth 'TServer, C.CbNonce) -> Maybe TransmissionAuth -> ByteString -> QueueId -> Cmd -> M (VerificationResult s)
verifyTransmission ms auth_ tAuth authorized queueId cmd =
case cmd of
Cmd SRecipient (NEW k _ _ _ _) -> pure $ Nothing `verifiedWith` k
@@ -1149,7 +1148,7 @@ forkClient Client {endThreads, endThreadSeq} label action = do
action `finally` atomically (modifyTVar' endThreads $ IM.delete tId)
mkWeakThreadId t >>= atomically . modifyTVar' endThreads . IM.insert tId
client :: forall s. STMQueueStore s => THandleParams SMPVersion 'TServer -> Server -> s -> Client s -> M ()
client :: forall s. STMStoreClass s => THandleParams SMPVersion 'TServer -> Server -> s -> Client s -> M ()
client
thParams'
Server {subscribedQ, ntfSubscribedQ, subscribers}
@@ -1282,10 +1281,9 @@ client
updatedAt <- Just <$> liftIO getSystemDate
let rcvDhSecret = C.dh' dhKey privDhKey
qik (rcvId, sndId) = QIK {rcvId, sndId, rcvPublicDhKey, sndSecure}
qRec (recipientId, senderId) =
qRec senderId =
QueueRec
{ recipientId,
senderId,
{ senderId,
recipientKey,
rcvDhSecret,
senderKey = Nothing,
@@ -1297,12 +1295,12 @@ client
(corrId,entId,) <$> addQueueRetry 3 qik qRec
where
addQueueRetry ::
Int -> ((RecipientId, SenderId) -> QueueIdsKeys) -> ((RecipientId, SenderId) -> QueueRec) -> M BrokerMsg
Int -> ((RecipientId, SenderId) -> QueueIdsKeys) -> (SenderId -> QueueRec) -> M BrokerMsg
addQueueRetry 0 _ _ = pure $ ERR INTERNAL
addQueueRetry n qik qRec = do
ids <- getIds
let qr = qRec ids
liftIO (addQueue ms qr) >>= \case
ids@(rId, sId) <- getIds
let qr = qRec sId
liftIO (addQueue ms rId qr) >>= \case
Left DUPLICATE_ -> addQueueRetry (n - 1) qik qRec
Left e -> pure $ ERR e
Right q -> do
@@ -1379,7 +1377,7 @@ client
incStat $ qSubDuplicate stats
atomically (tryTakeTMVar $ delivered s) >> deliver False s
where
rId = recipientId qr
rId = recipientId' q
newSub :: M Sub
newSub = time "SUB newSub" . atomically $ do
writeTQueue subscribedQ (rId, clientId, True)
@@ -1390,7 +1388,7 @@ client
deliver inc sub = do
stats <- asks serverStats
fmap (either (\e -> (corrId, rId, ERR e)) id) $ liftIO $ runExceptT $ do
msg_ <- tryPeekMsg ms rId q
msg_ <- tryPeekMsg ms q
liftIO $ when (inc && isJust msg_) $ incStat (qSub stats)
liftIO $ deliverMessage "SUB" qr rId sub msg_
@@ -1424,7 +1422,7 @@ client
getMessage_ s delivered_ = do
stats <- asks serverStats
fmap (either err id) $ liftIO $ runExceptT $
tryPeekMsg ms (recipientId qr) q >>= \case
tryPeekMsg ms q >>= \case
Just msg -> do
let encMsg = encryptMsg qr msg
incStat $ (if isJust delivered_ then msgGetDuplicate else msgGet) stats
@@ -1471,11 +1469,11 @@ client
fmap (either err id) $ liftIO $ runExceptT $ do
case st of
ProhibitSub -> do
deletedMsg_ <- tryDelMsg ms (recipientId qr) q msgId
deletedMsg_ <- tryDelMsg ms q msgId
liftIO $ mapM_ (updateStats stats True) deletedMsg_
pure ok
_ -> do
(deletedMsg_, msg_) <- tryDelPeekMsg ms (recipientId qr) q msgId
(deletedMsg_, msg_) <- tryDelPeekMsg ms q msgId
liftIO $ mapM_ (updateStats stats False) deletedMsg_
liftIO $ deliverMessage "ACK" qr entId sub msg_
_ -> pure $ err NO_MSG
@@ -1529,7 +1527,7 @@ client
msg_ <- liftIO $ time "SEND" $ runExceptT $ do
expireMessages messageExpiration stats
msg <- liftIO $ mkMessage msgId body
writeMsg ms (recipientId qr) q True msg
writeMsg ms q True msg
case msg_ of
Left e -> pure $ err e
Right Nothing -> do
@@ -1540,10 +1538,10 @@ client
when (notification msgFlags) $ do
mapM_ (`enqueueNotification` msg) (notifier qr)
incStat $ msgSentNtf stats
liftIO $ updatePeriodStats (activeQueuesNtf stats) (recipientId qr)
liftIO $ updatePeriodStats (activeQueuesNtf stats) (recipientId' q)
incStat $ msgSent stats
incStat $ msgCount stats
liftIO $ updatePeriodStats (activeQueues stats) (recipientId qr)
liftIO $ updatePeriodStats (activeQueues stats) (recipientId' q)
pure ok
where
mkMessage :: MsgId -> C.MaxLenBS MaxMessageLen -> IO Message
@@ -1553,7 +1551,7 @@ client
expireMessages :: Maybe ExpirationConfig -> ServerStats -> ExceptT ErrorType IO ()
expireMessages msgExp stats = do
deleted <- maybe (pure 0) (deleteExpiredMsgs ms (recipientId qr) q <=< liftIO . expireBeforeEpoch) msgExp
deleted <- maybe (pure 0) (deleteExpiredMsgs ms q <=< liftIO . expireBeforeEpoch) msgExp
liftIO $ when (deleted > 0) $ atomicModifyIORef'_ (msgExpired stats) (+ deleted)
-- The condition for delivery of the message is:
@@ -1571,7 +1569,7 @@ client
whenM (TM.memberIO rId subscribers) $
atomically deliverToSub >>= mapM_ forkDeliver
where
rId = recipientId qr
rId = recipientId' q
deliverToSub =
-- lookup has ot be in the same transaction,
-- so that if subscription ends, it re-evalutates
@@ -1715,7 +1713,7 @@ client
delQueueAndMsgs :: (StoreQueue s, QueueRec) -> M (Transmission BrokerMsg)
delQueueAndMsgs (q, _) = do
liftIO (deleteQueue ms entId q) >>= \case
liftIO (deleteQueue ms q) >>= \case
Right qr -> do
-- Possibly, the same should be done if the queue is suspended, but currently we do not use it
atomically $ do
@@ -1735,11 +1733,11 @@ client
Left e -> pure $ err e
getQueueInfo :: StoreQueue s -> QueueRec -> M BrokerMsg
getQueueInfo q QueueRec {recipientId = rId, senderKey, notifier} = do
getQueueInfo q QueueRec {senderKey, notifier} = do
fmap (either ERR id) $ liftIO $ runExceptT $ do
qiSub <- liftIO $ TM.lookupIO entId subscriptions >>= mapM mkQSub
qiSize <- getQueueSize ms rId q
qiMsg <- toMsgInfo <$$> tryPeekMsg ms rId q
qiSize <- getQueueSize ms q
qiMsg <- toMsgInfo <$$> tryPeekMsg ms q
let info = QueueInfo {qiSnd = isJust senderKey, qiNtf = isJust notifier, qiSub, qiSize, qiMsg}
pure $ INFO info
where
@@ -1809,8 +1807,9 @@ exportMessages tty ms f drainMsgs = do
logError $ "error exporting messages: " <> tshow e
exitFailure
where
saveQueueMsgs h rId q =
runExceptT (getQueueMessages drainMsgs ms rId q) >>= \case
saveQueueMsgs h q = do
let rId = recipientId' q
runExceptT (getQueueMessages drainMsgs ms q) >>= \case
Right msgs -> Sum (length msgs) <$ BLD.hPutBuilder h (encodeMessages rId msgs)
Left e -> do
logError $ "STORE: saveQueueMsgs, error exporting messages from queue " <> decodeLatin1 (strEncode rId) <> ", " <> tshow e
@@ -1838,7 +1837,7 @@ processServerMessages = do
withAllMsgQueues False ms $ processValidateQueue
| otherwise -> logWarn "skipping message expiration" $> Nothing
where
processExpireQueue old rId q =
processExpireQueue old q =
runExceptT expireQueue >>= \case
Right (storedMsgsCount, expiredMsgsCount) ->
pure MessageStats {storedMsgsCount, expiredMsgsCount, storedQueues = 1}
@@ -1847,20 +1846,20 @@ processServerMessages = do
exitFailure
where
expireQueue = do
expired'' <- deleteExpiredMsgs ms rId q old
stored'' <- getQueueSize ms rId q
expired'' <- deleteExpiredMsgs ms q old
stored'' <- getQueueSize ms q
liftIO $ closeMsgQueue q
pure (stored'', expired'')
processValidateQueue :: RecipientId -> JournalQueue -> IO MessageStats
processValidateQueue rId q =
runExceptT (getQueueSize ms rId q) >>= \case
processValidateQueue :: JournalQueue -> IO MessageStats
processValidateQueue q =
runExceptT (getQueueSize ms q) >>= \case
Right storedMsgsCount -> pure newMessageStats {storedMsgsCount, storedQueues = 1}
Left e -> do
logError $ "STORE: processValidateQueue, failed opening message queue, " <> tshow e
exitFailure
-- TODO this function should be called after importing queues from store log
importMessages :: forall s. STMQueueStore s => Bool -> s -> FilePath -> Maybe Int64 -> IO MessageStats
importMessages :: forall s. STMStoreClass s => Bool -> s -> FilePath -> Maybe Int64 -> IO MessageStats
importMessages tty ms f old_ = do
logInfo $ "restoring messages from file " <> T.pack f
LB.readFile f >>= runExceptT . foldM restoreMsg (0, Nothing, (0, 0, M.empty)) . LB.lines >>= \case
@@ -1873,7 +1872,7 @@ importMessages tty ms f old_ = do
renameFile f $ f <> ".bak"
mapM_ setOverQuota_ overQuota
logQueueStates ms
storedQueues <- M.size <$> readTVarIO (activeMsgQueues ms)
storedQueues <- M.size <$> readTVarIO (queues $ stmQueueStore ms)
pure MessageStats {storedMsgsCount, expiredMsgsCount, storedQueues}
where
progress i = "Processed " <> show i <> " lines"
@@ -1892,7 +1891,7 @@ importMessages tty ms f old_ = do
(i + 1,Just (rId, q),) <$> case msg of
Message {msgTs}
| maybe True (systemSeconds msgTs >=) old_ -> do
writeMsg ms rId q False msg >>= \case
writeMsg ms q False msg >>= \case
Just _ -> pure (stored + 1, expired, overQuota)
Nothing -> do
logError $ decodeLatin1 $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (messageId msg)
@@ -1901,11 +1900,11 @@ importMessages tty ms f old_ = do
MessageQuota {} ->
-- queue was over quota at some point,
-- it will be set as over quota once fully imported
mergeQuotaMsgs >> writeMsg ms rId q False msg $> (stored, expired, M.insert rId q overQuota)
mergeQuotaMsgs >> writeMsg ms q False msg $> (stored, expired, M.insert rId q overQuota)
where
-- if the first message in queue head is "quota", remove it.
mergeQuotaMsgs =
withPeekMsgQueue ms rId q "mergeQuotaMsgs" $ maybe (pure ()) $ \case
withPeekMsgQueue ms q "mergeQuotaMsgs" $ maybe (pure ()) $ \case
(mq, MessageQuota {}) -> tryDeleteMsg_ q mq False
_ -> pure ()
msgErr :: Show e => String -> e -> String
@@ -1982,7 +1981,7 @@ restoreServerStats msgStats_ ntfStats = asks (serverStatsBackupFile . config) >>
Right d@ServerStatsData {_qCount = statsQCount, _msgCount = statsMsgCount, _ntfCount = statsNtfCount} -> do
s <- asks serverStats
AMS _ st <- asks msgStore
_qCount <- M.size <$> readTVarIO (activeMsgQueues st)
_qCount <- M.size <$> readTVarIO (queues $ stmQueueStore st)
let _msgCount = maybe statsMsgCount storedMsgsCount msgStats_
_ntfCount = storedMsgsCount ntfStats
_msgExpired' = _msgExpired d + maybe 0 expiredMsgsCount msgStats_
+2 -2
View File
@@ -189,7 +189,7 @@ type family MsgStore s where
MsgStore 'MSMemory = STMMsgStore
MsgStore 'MSJournal = JournalMsgStore
data AMsgStore = forall s. (STMQueueStore (MsgStore s), MsgStoreClass (MsgStore s)) => AMS (SMSType s) (MsgStore s)
data AMsgStore = forall s. (STMStoreClass (MsgStore s), MsgStoreClass (MsgStore s)) => AMS (SMSType s) (MsgStore s)
data AStoreQueue = forall s. MsgStoreClass (MsgStore s) => ASQ (SMSType s) (StoreQueue (MsgStore s))
@@ -362,5 +362,5 @@ newSMPProxyAgent smpAgentCfg random = do
smpAgent <- newSMPClientAgent smpAgentCfg random
pure ProxyAgent {smpAgent}
readWriteQueueStore :: STMQueueStore s => FilePath -> s -> IO (StoreLog 'WriteMode)
readWriteQueueStore :: STMStoreClass s => FilePath -> s -> IO (StoreLog 'WriteMode)
readWriteQueueStore = readWriteStoreLog readQueueStore writeQueueStore
@@ -15,7 +15,7 @@
{-# LANGUAGE TupleSections #-}
module Simplex.Messaging.Server.MsgStore.Journal
( JournalMsgStore (queues, senders, notifiers, random),
( JournalMsgStore (queueStore, random),
JournalQueue,
JournalMsgQueue (queue, state),
JMQueue (queueDirectory, statePath),
@@ -77,10 +77,7 @@ data JournalMsgStore = JournalMsgStore
{ config :: JournalStoreConfig,
random :: TVar StdGen,
queueLocks :: TMap RecipientId Lock,
queues :: TMap RecipientId JournalQueue,
senders :: TMap SenderId RecipientId,
notifiers :: TMap NotifierId RecipientId,
storeLog :: TVar (Maybe (StoreLog 'WriteMode))
queueStore :: STMQueueStore JournalQueue
}
data JournalStoreConfig = JournalStoreConfig
@@ -98,7 +95,8 @@ data JournalStoreConfig = JournalStoreConfig
}
data JournalQueue = JournalQueue
{ queueLock :: Lock,
{ recipientId :: RecipientId,
queueLock :: Lock,
-- To avoid race conditions and errors when restoring queues,
-- Nothing is written to TVar when queue is deleted.
queueRec :: TVar (Maybe QueueRec),
@@ -218,18 +216,15 @@ logFileExt = ".log"
newtype StoreIO a = StoreIO {unStoreIO :: IO a}
deriving newtype (Functor, Applicative, Monad)
instance STMQueueStore JournalMsgStore where
queues' = queues
senders' = senders
notifiers' = notifiers
storeLog' = storeLog
mkQueue st qr = do
lock <- getMapLock (queueLocks st) $ recipientId qr
instance STMStoreClass JournalMsgStore where
stmQueueStore JournalMsgStore {queueStore} = queueStore
mkQueue st rId qr = do
lock <- getMapLock (queueLocks st) rId
q <- newTVar $ Just qr
mq <- newTVar Nothing
activeAt <- newTVar 0
isEmpty <- newTVar Nothing
pure $ JournalQueue lock q mq activeAt isEmpty
pure $ JournalQueue rId lock q mq activeAt isEmpty
msgQueue_' = msgQueue_
instance MsgStoreClass JournalMsgStore where
@@ -242,27 +237,21 @@ instance MsgStoreClass JournalMsgStore where
newMsgStore config = do
random <- newTVarIO =<< newStdGen
queueLocks <- TM.emptyIO
queues <- TM.emptyIO
senders <- TM.emptyIO
notifiers <- TM.emptyIO
storeLog <- newTVarIO Nothing
pure JournalMsgStore {config, random, queueLocks, queues, senders, notifiers, storeLog}
queueStore <- newQueueStore
pure JournalMsgStore {config, random, queueLocks, queueStore}
setStoreLog :: JournalMsgStore -> StoreLog 'WriteMode -> IO ()
setStoreLog st sl = atomically $ writeTVar (storeLog st) (Just sl)
setStoreLog st sl = atomically $ writeTVar (storeLog $ queueStore st) (Just sl)
closeMsgStore st = do
closeMsgStore JournalMsgStore {queueStore = st} = do
readTVarIO (storeLog st) >>= mapM_ closeStoreLog
readTVarIO (queues st) >>= mapM_ closeMsgQueue
activeMsgQueues = queues
{-# INLINE activeMsgQueues #-}
-- This function is a "foldr" that opens and closes all queues, processes them as defined by action and accumulates the result.
-- It is used to export storage to a single file and also to expire messages and validate all queues when server is started.
-- TODO this function requires case-sensitive file system, because it uses queue directory as recipient ID.
-- It can be made to support case-insensite FS by supporting more than one queue per directory, by getting recipient ID from state file name.
withAllMsgQueues :: forall a. Monoid a => Bool -> JournalMsgStore -> (RecipientId -> JournalQueue -> IO a) -> IO a
withAllMsgQueues :: forall a. Monoid a => Bool -> JournalMsgStore -> (JournalQueue -> IO a) -> IO a
withAllMsgQueues tty ms@JournalMsgStore {config} action = ifM (doesDirectoryExist storePath) processStore (pure mempty)
where
processStore = do
@@ -276,7 +265,7 @@ instance MsgStoreClass JournalMsgStore where
r' <- case strDecode $ B.pack queueId of
Right rId ->
getQueue ms SRecipient rId >>= \case
Right q -> unStoreIO (getMsgQueue ms rId q) *> action rId q <* closeMsgQueue q
Right q -> unStoreIO (getMsgQueue ms q) *> action q <* closeMsgQueue q
Left AUTH -> do
logWarn $ "STORE: processQueue, queue " <> T.pack queueId <> " was removed, removing " <> T.pack dir
removeQueueDirectory_ dir
@@ -303,7 +292,7 @@ instance MsgStoreClass JournalMsgStore where
(Nothing <$ putStrLn ("Error: path " <> path' <> " is not a directory, skipping"))
logQueueStates :: JournalMsgStore -> IO ()
logQueueStates ms = withActiveMsgQueues ms $ \_ -> unStoreIO . logQueueState
logQueueStates ms = withActiveMsgQueues ms $ unStoreIO . logQueueState
logQueueState :: JournalQueue -> StoreIO ()
logQueueState q =
@@ -312,11 +301,14 @@ instance MsgStoreClass JournalMsgStore where
$>>= \mq -> readTVarIO (handles mq)
$>>= (\hs -> (readTVarIO (state mq) >>= appendState (stateHandle hs)) $> Just ())
recipientId' = recipientId
{-# INLINE recipientId' #-}
queueRec' = queueRec
{-# INLINE queueRec' #-}
getMsgQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> StoreIO JournalMsgQueue
getMsgQueue ms@JournalMsgStore {random} rId JournalQueue {msgQueue_} =
getMsgQueue :: JournalMsgStore -> JournalQueue -> StoreIO JournalMsgQueue
getMsgQueue ms@JournalMsgStore {random} JournalQueue {recipientId = rId, msgQueue_} =
StoreIO $ readTVarIO msgQueue_ >>= maybe newQ pure
where
newQ = do
@@ -334,8 +326,8 @@ instance MsgStoreClass JournalMsgStore where
journalId <- newJournalId random
mkJournalQueue queue (newMsgQueueState journalId) Nothing
getPeekMsgQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> StoreIO (Maybe (JournalMsgQueue, Message))
getPeekMsgQueue ms rId q@JournalQueue {isEmpty} =
getPeekMsgQueue :: JournalMsgStore -> JournalQueue -> StoreIO (Maybe (JournalMsgQueue, Message))
getPeekMsgQueue ms q@JournalQueue {isEmpty} =
StoreIO (readTVarIO isEmpty) >>= \case
Just True -> pure Nothing
Just False -> peek
@@ -350,16 +342,16 @@ instance MsgStoreClass JournalMsgStore where
pure r
where
peek = do
mq <- getMsgQueue ms rId q
mq <- getMsgQueue ms q
(mq,) <$$> tryPeekMsg_ q mq
-- only runs action if queue is not empty
withIdleMsgQueue :: Int64 -> JournalMsgStore -> RecipientId -> JournalQueue -> (JournalMsgQueue -> StoreIO a) -> StoreIO (Maybe a, Int)
withIdleMsgQueue now ms@JournalMsgStore {config} rId q action =
withIdleMsgQueue :: Int64 -> JournalMsgStore -> JournalQueue -> (JournalMsgQueue -> StoreIO a) -> StoreIO (Maybe a, Int)
withIdleMsgQueue now ms@JournalMsgStore {config} q action =
StoreIO $ readTVarIO (msgQueue_ q) >>= \case
Nothing ->
E.bracket
(unStoreIO $ getPeekMsgQueue ms rId q)
(unStoreIO $ getPeekMsgQueue ms q)
(mapM_ $ \_ -> closeMsgQueue q)
(maybe (pure (Nothing, 0)) (unStoreIO . run))
where
@@ -375,13 +367,12 @@ instance MsgStoreClass JournalMsgStore where
sz <- unStoreIO $ getQueueSize_ mq
pure (r, sz)
deleteQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType QueueRec)
deleteQueue ms rId q =
fst <$$> deleteQueue_ ms rId q
deleteQueue :: JournalMsgStore -> JournalQueue -> IO (Either ErrorType QueueRec)
deleteQueue ms q = fst <$$> deleteQueue_ ms q
deleteQueueSize :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType (QueueRec, Int))
deleteQueueSize ms rId q =
deleteQueue_ ms rId q >>= mapM (traverse getSize)
deleteQueueSize :: JournalMsgStore -> JournalQueue -> IO (Either ErrorType (QueueRec, Int))
deleteQueueSize ms q =
deleteQueue_ ms q >>= mapM (traverse getSize)
-- traverse operates on the second tuple element
where
getSize = maybe (pure (-1)) (fmap size . readTVarIO . state)
@@ -397,9 +388,9 @@ instance MsgStoreClass JournalMsgStore where
updateReadPos q drainMsgs len hs
(msg :) <$> run msgs
writeMsg :: JournalMsgStore -> RecipientId -> JournalQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
writeMsg ms rId q' logState msg = isolateQueue rId q' "writeMsg" $ do
q <- getMsgQueue ms rId q'
writeMsg :: JournalMsgStore -> JournalQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
writeMsg ms q' logState msg = isolateQueue q' "writeMsg" $ do
q <- getMsgQueue ms q'
StoreIO $ (`E.finally` updateActiveAt q') $ do
st@MsgQueueState {canWrite, size} <- readTVarIO (state q)
let empty = size == 0
@@ -476,9 +467,9 @@ instance MsgStoreClass JournalMsgStore where
$>>= \len -> readTVarIO handles
$>>= \hs -> updateReadPos mq logState len hs $> Just ()
isolateQueue :: RecipientId -> JournalQueue -> String -> StoreIO a -> ExceptT ErrorType IO a
isolateQueue rId JournalQueue {queueLock} op =
tryStore' op rId . withLock' queueLock op . unStoreIO
isolateQueue :: JournalQueue -> String -> StoreIO a -> ExceptT ErrorType IO a
isolateQueue JournalQueue {recipientId, queueLock} op =
tryStore' op recipientId . withLock' queueLock op . unStoreIO
updateActiveAt :: JournalQueue -> IO ()
updateActiveAt q = atomically . writeTVar (activeAt q) . systemSeconds =<< getSystemTime
@@ -721,11 +712,12 @@ validQueueState MsgQueueState {readState = rs, writeState = ws, size}
&& msgPos ws == msgCount ws
&& bytePos ws == byteCount ws
deleteQueue_ :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType (QueueRec, Maybe JournalMsgQueue))
deleteQueue_ ms rId q =
deleteQueue_ :: JournalMsgStore -> JournalQueue -> IO (Either ErrorType (QueueRec, Maybe JournalMsgQueue))
deleteQueue_ ms q =
runExceptT $ isolateQueueId "deleteQueue_" ms rId $
deleteQueue' ms rId q >>= mapM remove
deleteQueue' ms q >>= mapM remove
where
rId = recipientId q
remove r@(_, mq_) = do
mapM_ closeMsgQueueHandles mq_
removeQueueDirectory ms rId
+29 -37
View File
@@ -26,22 +26,18 @@ import Simplex.Messaging.Server.MsgStore.Types
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.STM
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util ((<$$>), ($>>=))
import System.IO (IOMode (..))
data STMMsgStore = STMMsgStore
{ storeConfig :: STMStoreConfig,
queues :: TMap RecipientId STMQueue,
senders :: TMap SenderId RecipientId,
notifiers :: TMap NotifierId RecipientId,
storeLog :: TVar (Maybe (StoreLog 'WriteMode))
queueStore :: STMQueueStore STMQueue
}
data STMQueue = STMQueue
{ -- To avoid race conditions and errors when restoring queues,
-- Nothing is written to TVar when queue is deleted.
recipientId :: RecipientId,
queueRec :: TVar (Maybe QueueRec),
msgQueue_ :: TVar (Maybe STMMsgQueue)
}
@@ -57,12 +53,9 @@ data STMStoreConfig = STMStoreConfig
quota :: Int
}
instance STMQueueStore STMMsgStore where
queues' = queues
senders' = senders
notifiers' = notifiers
storeLog' = storeLog
mkQueue _ qr = STMQueue <$> newTVar (Just qr) <*> newTVar Nothing
instance STMStoreClass STMMsgStore where
stmQueueStore = queueStore
mkQueue _ rId qr = STMQueue rId <$> newTVar (Just qr) <*> newTVar Nothing
msgQueue_' = msgQueue_
instance MsgStoreClass STMMsgStore where
@@ -73,32 +66,31 @@ instance MsgStoreClass STMMsgStore where
newMsgStore :: STMStoreConfig -> IO STMMsgStore
newMsgStore storeConfig = do
queues <- TM.emptyIO
senders <- TM.emptyIO
notifiers <- TM.emptyIO
storeLog <- newTVarIO Nothing
pure STMMsgStore {storeConfig, queues, senders, notifiers, storeLog}
queueStore <- newQueueStore
pure STMMsgStore {storeConfig, queueStore}
setStoreLog :: STMMsgStore -> StoreLog 'WriteMode -> IO ()
setStoreLog st sl = atomically $ writeTVar (storeLog st) (Just sl)
setStoreLog st sl = atomically $ writeTVar (storeLog $ queueStore st) (Just sl)
closeMsgStore st = readTVarIO (storeLog st) >>= mapM_ closeStoreLog
activeMsgQueues = queues
{-# INLINE activeMsgQueues #-}
closeMsgStore st = readTVarIO (storeLog $ queueStore st) >>= mapM_ closeStoreLog
withAllMsgQueues _ = withActiveMsgQueues
{-# INLINE withAllMsgQueues #-}
logQueueStates _ = pure ()
{-# INLINE logQueueStates #-}
logQueueState _ = pure ()
{-# INLINE logQueueState #-}
recipientId' = recipientId
{-# INLINE recipientId' #-}
queueRec' = queueRec
{-# INLINE queueRec' #-}
getMsgQueue :: STMMsgStore -> RecipientId -> STMQueue -> STM STMMsgQueue
getMsgQueue _ _ STMQueue {msgQueue_} = readTVar msgQueue_ >>= maybe newQ pure
getMsgQueue :: STMMsgStore -> STMQueue -> STM STMMsgQueue
getMsgQueue _ STMQueue {msgQueue_} = readTVar msgQueue_ >>= maybe newQ pure
where
newQ = do
msgQueue <- newTQueue
@@ -108,23 +100,23 @@ instance MsgStoreClass STMMsgStore where
writeTVar msgQueue_ (Just q)
pure q
getPeekMsgQueue :: STMMsgStore -> RecipientId -> STMQueue -> STM (Maybe (STMMsgQueue, Message))
getPeekMsgQueue _ _ q@STMQueue {msgQueue_} = readTVar msgQueue_ $>>= \mq -> (mq,) <$$> tryPeekMsg_ q mq
getPeekMsgQueue :: STMMsgStore -> STMQueue -> STM (Maybe (STMMsgQueue, Message))
getPeekMsgQueue _ q@STMQueue {msgQueue_} = readTVar msgQueue_ $>>= \mq -> (mq,) <$$> tryPeekMsg_ q mq
-- does not create queue if it does not exist, does not delete it if it does (can't just close in-memory queue)
withIdleMsgQueue :: Int64 -> STMMsgStore -> RecipientId -> STMQueue -> (STMMsgQueue -> STM a) -> STM (Maybe a, Int)
withIdleMsgQueue _ _ _ STMQueue {msgQueue_} action = readTVar msgQueue_ >>= \case
withIdleMsgQueue :: Int64 -> STMMsgStore -> STMQueue -> (STMMsgQueue -> STM a) -> STM (Maybe a, Int)
withIdleMsgQueue _ _ STMQueue {msgQueue_} action = readTVar msgQueue_ >>= \case
Just q -> do
r <- action q
sz <- getQueueSize_ q
pure (Just r, sz)
Nothing -> pure (Nothing, 0)
deleteQueue :: STMMsgStore -> RecipientId -> STMQueue -> IO (Either ErrorType QueueRec)
deleteQueue ms rId q = fst <$$> deleteQueue' ms rId q
deleteQueue :: STMMsgStore -> STMQueue -> IO (Either ErrorType QueueRec)
deleteQueue ms q = fst <$$> deleteQueue' ms q
deleteQueueSize :: STMMsgStore -> RecipientId -> STMQueue -> IO (Either ErrorType (QueueRec, Int))
deleteQueueSize ms rId q = deleteQueue' ms rId q >>= mapM (traverse getSize)
deleteQueueSize :: STMMsgStore -> STMQueue -> IO (Either ErrorType (QueueRec, Int))
deleteQueueSize ms q = deleteQueue' ms q >>= mapM (traverse getSize)
-- traverse operates on the second tuple element
where
getSize = maybe (pure 0) (\STMMsgQueue {size} -> readTVarIO size)
@@ -137,9 +129,9 @@ instance MsgStoreClass STMMsgStore where
mapM_ (writeTQueue q) msgs
pure msgs
writeMsg :: STMMsgStore -> RecipientId -> STMQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
writeMsg ms rId q' _logState msg = liftIO $ atomically $ do
STMMsgQueue {msgQueue = q, canWrite, size} <- getMsgQueue ms rId q'
writeMsg :: STMMsgStore -> STMQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
writeMsg ms q' _logState msg = liftIO $ atomically $ do
STMMsgQueue {msgQueue = q, canWrite, size} <- getMsgQueue ms q'
canWrt <- readTVar canWrite
empty <- isEmptyTQueue q
if canWrt || empty
@@ -171,5 +163,5 @@ instance MsgStoreClass STMMsgStore where
Just _ -> modifyTVar' size (subtract 1)
_ -> pure ()
isolateQueue :: RecipientId -> STMQueue -> String -> STM a -> ExceptT ErrorType IO a
isolateQueue _ _ _ = liftIO . atomically
isolateQueue :: STMQueue -> String -> STM a -> ExceptT ErrorType IO a
isolateQueue _ _ = liftIO . atomically
+44 -43
View File
@@ -20,7 +20,6 @@ import Control.Monad.Trans.Except
import Data.Functor (($>))
import Data.Int (Int64)
import Data.Kind
import qualified Data.Map.Strict as M
import Data.Time.Clock.System (SystemTime (systemSeconds))
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.QueueStore
@@ -29,12 +28,16 @@ import Simplex.Messaging.TMap (TMap)
import Simplex.Messaging.Util ((<$$>))
import System.IO (IOMode (..))
class MsgStoreClass s => STMQueueStore s where
queues' :: s -> TMap RecipientId (StoreQueue s)
senders' :: s -> TMap SenderId RecipientId
notifiers' :: s -> TMap NotifierId RecipientId
storeLog' :: s -> TVar (Maybe (StoreLog 'WriteMode))
mkQueue :: s -> QueueRec -> STM (StoreQueue s)
data STMQueueStore q = STMQueueStore
{ queues :: TMap RecipientId q,
senders :: TMap SenderId RecipientId,
notifiers :: TMap NotifierId RecipientId,
storeLog :: TVar (Maybe (StoreLog 'WriteMode))
}
class MsgStoreClass s => STMStoreClass s where
stmQueueStore :: s -> STMQueueStore (StoreQueue s)
mkQueue :: s -> RecipientId -> QueueRec -> STM (StoreQueue s)
msgQueue_' :: StoreQueue s -> TVar (Maybe (MsgQueue s))
class Monad (StoreMonad s) => MsgStoreClass s where
@@ -45,25 +48,25 @@ class Monad (StoreMonad s) => MsgStoreClass s where
newMsgStore :: MsgStoreConfig s -> IO s
setStoreLog :: s -> StoreLog 'WriteMode -> IO ()
closeMsgStore :: s -> IO ()
activeMsgQueues :: s -> TMap RecipientId (StoreQueue s)
withAllMsgQueues :: Monoid a => Bool -> s -> (RecipientId -> StoreQueue s -> IO a) -> IO a
withAllMsgQueues :: Monoid a => Bool -> s -> (StoreQueue s -> IO a) -> IO a
logQueueStates :: s -> IO ()
logQueueState :: StoreQueue s -> StoreMonad s ()
recipientId' :: StoreQueue s -> RecipientId
queueRec' :: StoreQueue s -> TVar (Maybe QueueRec)
getPeekMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (Maybe (MsgQueue s, Message))
getMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (MsgQueue s)
getPeekMsgQueue :: s -> StoreQueue s -> StoreMonad s (Maybe (MsgQueue s, Message))
getMsgQueue :: s -> StoreQueue s -> StoreMonad s (MsgQueue s)
-- the journal queue will be closed after action if it was initially closed or idle longer than interval in config
withIdleMsgQueue :: Int64 -> s -> RecipientId -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a, Int)
deleteQueue :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType QueueRec)
deleteQueueSize :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType (QueueRec, Int))
withIdleMsgQueue :: Int64 -> s -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a, Int)
deleteQueue :: s -> StoreQueue s -> IO (Either ErrorType QueueRec)
deleteQueueSize :: s -> StoreQueue s -> IO (Either ErrorType (QueueRec, Int))
getQueueMessages_ :: Bool -> MsgQueue s -> StoreMonad s [Message]
writeMsg :: s -> RecipientId -> StoreQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
writeMsg :: s -> StoreQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
setOverQuota_ :: StoreQueue s -> IO () -- can ONLY be used while restoring messages, not while server running
getQueueSize_ :: MsgQueue s -> StoreMonad s Int
tryPeekMsg_ :: StoreQueue s -> MsgQueue s -> StoreMonad s (Maybe Message)
tryDeleteMsg_ :: StoreQueue s -> MsgQueue s -> Bool -> StoreMonad s ()
isolateQueue :: RecipientId -> StoreQueue s -> String -> StoreMonad s a -> ExceptT ErrorType IO a
isolateQueue :: StoreQueue s -> String -> StoreMonad s a -> ExceptT ErrorType IO a
data MSType = MSMemory | MSJournal
@@ -73,28 +76,26 @@ data SMSType :: MSType -> Type where
data AMSType = forall s. AMSType (SMSType s)
withActiveMsgQueues :: (MsgStoreClass s, Monoid a) => s -> (RecipientId -> StoreQueue s -> IO a) -> IO a
withActiveMsgQueues st f = readTVarIO (activeMsgQueues st) >>= foldM run mempty . M.assocs
withActiveMsgQueues :: (STMStoreClass s, Monoid a) => s -> (StoreQueue s -> IO a) -> IO a
withActiveMsgQueues st f = readTVarIO (queues $ stmQueueStore st) >>= foldM run mempty
where
run !acc (k, v) = do
r <- f k v
pure $! acc <> r
run !acc = fmap (acc <>) . f
getQueueMessages :: MsgStoreClass s => Bool -> s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO [Message]
getQueueMessages drainMsgs st rId q = withPeekMsgQueue st rId q "getQueueSize" $ maybe (pure []) (getQueueMessages_ drainMsgs . fst)
getQueueMessages :: MsgStoreClass s => Bool -> s -> StoreQueue s -> ExceptT ErrorType IO [Message]
getQueueMessages drainMsgs st q = withPeekMsgQueue st q "getQueueSize" $ maybe (pure []) (getQueueMessages_ drainMsgs . fst)
{-# INLINE getQueueMessages #-}
getQueueSize :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO Int
getQueueSize st rId q = withPeekMsgQueue st rId q "getQueueSize" $ maybe (pure 0) (getQueueSize_ . fst)
getQueueSize :: MsgStoreClass s => s -> StoreQueue s -> ExceptT ErrorType IO Int
getQueueSize st q = withPeekMsgQueue st q "getQueueSize" $ maybe (pure 0) (getQueueSize_ . fst)
{-# INLINE getQueueSize #-}
tryPeekMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> ExceptT ErrorType IO (Maybe Message)
tryPeekMsg st rId q = snd <$$> withPeekMsgQueue st rId q "tryPeekMsg" pure
tryPeekMsg :: MsgStoreClass s => s -> StoreQueue s -> ExceptT ErrorType IO (Maybe Message)
tryPeekMsg st q = snd <$$> withPeekMsgQueue st q "tryPeekMsg" pure
{-# INLINE tryPeekMsg #-}
tryDelMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message)
tryDelMsg st rId q msgId' =
withPeekMsgQueue st rId q "tryDelMsg" $
tryDelMsg :: MsgStoreClass s => s -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message)
tryDelMsg st q msgId' =
withPeekMsgQueue st q "tryDelMsg" $
maybe (pure Nothing) $ \(mq, msg) ->
if
| messageId msg == msgId' ->
@@ -102,30 +103,30 @@ tryDelMsg st rId q msgId' =
| otherwise -> pure Nothing
-- atomic delete (== read) last and peek next message if available
tryDelPeekMsg :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message, Maybe Message)
tryDelPeekMsg st rId q msgId' =
withPeekMsgQueue st rId q "tryDelPeekMsg" $
tryDelPeekMsg :: MsgStoreClass s => s -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message, Maybe Message)
tryDelPeekMsg st q msgId' =
withPeekMsgQueue st q "tryDelPeekMsg" $
maybe (pure (Nothing, Nothing)) $ \(mq, msg) ->
if
| messageId msg == msgId' -> (Just msg,) <$> (tryDeleteMsg_ q mq True >> tryPeekMsg_ q mq)
| otherwise -> pure (Nothing, Just msg)
-- The action is called with Nothing when it is known that the queue is empty
withPeekMsgQueue :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> String -> (Maybe (MsgQueue s, Message) -> StoreMonad s a) -> ExceptT ErrorType IO a
withPeekMsgQueue st rId q op a = isolateQueue rId q op $ getPeekMsgQueue st rId q >>= a
withPeekMsgQueue :: MsgStoreClass s => s -> StoreQueue s -> String -> (Maybe (MsgQueue s, Message) -> StoreMonad s a) -> ExceptT ErrorType IO a
withPeekMsgQueue st q op a = isolateQueue q op $ getPeekMsgQueue st q >>= a
{-# INLINE withPeekMsgQueue #-}
deleteExpiredMsgs :: MsgStoreClass s => s -> RecipientId -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int
deleteExpiredMsgs st rId q old =
isolateQueue rId q "deleteExpiredMsgs" $
getMsgQueue st rId q >>= deleteExpireMsgs_ old q
deleteExpiredMsgs :: MsgStoreClass s => s -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int
deleteExpiredMsgs st q old =
isolateQueue q "deleteExpiredMsgs" $
getMsgQueue st q >>= deleteExpireMsgs_ old q
-- closed and idle queues will be closed after expiration
-- returns (expired count, queue size after expiration)
idleDeleteExpiredMsgs :: MsgStoreClass s => Int64 -> s -> RecipientId -> StoreQueue s -> Int64 -> ExceptT ErrorType IO (Maybe Int, Int)
idleDeleteExpiredMsgs now st rId q old =
isolateQueue rId q "idleDeleteExpiredMsgs" $
withIdleMsgQueue now st rId q (deleteExpireMsgs_ old q)
idleDeleteExpiredMsgs :: MsgStoreClass s => Int64 -> s -> StoreQueue s -> Int64 -> ExceptT ErrorType IO (Maybe Int, Int)
idleDeleteExpiredMsgs now st q old =
isolateQueue q "idleDeleteExpiredMsgs" $
withIdleMsgQueue now st q (deleteExpireMsgs_ old q)
deleteExpireMsgs_ :: MsgStoreClass s => Int64 -> StoreQueue s -> MsgQueue s -> StoreMonad s Int
deleteExpireMsgs_ old q mq = do
+1 -2
View File
@@ -16,8 +16,7 @@ import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol
data QueueRec = QueueRec
{ recipientId :: !RecipientId,
recipientKey :: !RcvPublicAuthKey,
{ recipientKey :: !RcvPublicAuthKey,
rcvDhSecret :: !RcvDhSecret,
senderId :: !SenderId,
senderKey :: !(Maybe SndPublicAuthKey),
+69 -60
View File
@@ -25,6 +25,7 @@ module Simplex.Messaging.Server.QueueStore.STM
unblockQueue,
updateQueueTime,
deleteQueue',
newQueueStore,
readQueueStore,
withLog',
)
@@ -51,99 +52,106 @@ import Simplex.Messaging.Util (ifM, tshow, ($>>=), (<$$))
import System.IO
import UnliftIO.STM
addQueue :: STMQueueStore s => s -> QueueRec -> IO (Either ErrorType (StoreQueue s))
addQueue st qr@QueueRec {recipientId = rId, senderId = sId, notifier}=
atomically add
$>>= \q -> q <$$ withLog "addQueue" st (`logCreateQueue` qr)
where
add = ifM hasId (pure $ Left DUPLICATE_) $ do
q <- mkQueue st qr -- STMQueue lock <$> (newTVar $! Just qr) <*> newTVar Nothing
TM.insert rId q $ queues' st
TM.insert sId rId $ senders' st
forM_ notifier $ \NtfCreds {notifierId} -> TM.insert notifierId rId $ notifiers' st
pure $ Right q
hasId = or <$> sequence [TM.member rId $ queues' st, TM.member sId $ senders' st, hasNotifier]
hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.member notifierId (notifiers' st)) notifier
newQueueStore :: IO (STMQueueStore q)
newQueueStore = do
queues <- TM.emptyIO
senders <- TM.emptyIO
notifiers <- TM.emptyIO
storeLog <- newTVarIO Nothing
pure STMQueueStore {queues, senders, notifiers, storeLog}
getQueue :: (STMQueueStore s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s))
addQueue :: STMStoreClass s => s -> RecipientId -> QueueRec -> IO (Either ErrorType (StoreQueue s))
addQueue st rId qr@QueueRec {senderId = sId, notifier}=
atomically add
$>>= \q -> q <$$ withLog "addQueue" st (\s -> logCreateQueue s rId qr)
where
STMQueueStore {queues, senders, notifiers} = stmQueueStore st
add = ifM hasId (pure $ Left DUPLICATE_) $ do
q <- mkQueue st rId qr
TM.insert rId q queues
TM.insert sId rId senders
forM_ notifier $ \NtfCreds {notifierId} -> TM.insert notifierId rId notifiers
pure $ Right q
hasId = or <$> sequence [TM.member rId queues, TM.member sId senders, hasNotifier]
hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.member notifierId notifiers) notifier
getQueue :: (STMStoreClass s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s))
getQueue st party qId =
maybe (Left AUTH) Right <$> case party of
SRecipient -> TM.lookupIO qId $ queues' st
SSender -> TM.lookupIO qId (senders' st) $>>= (`TM.lookupIO` queues' st)
SNotifier -> TM.lookupIO qId (notifiers' st) $>>= (`TM.lookupIO` queues' st)
SRecipient -> TM.lookupIO qId queues
SSender -> TM.lookupIO qId senders $>>= (`TM.lookupIO` queues)
SNotifier -> TM.lookupIO qId notifiers $>>= (`TM.lookupIO` queues)
where
STMQueueStore {queues, senders, notifiers} = stmQueueStore st
getQueueRec :: (STMQueueStore s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s, QueueRec))
getQueueRec :: (STMStoreClass s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s, QueueRec))
getQueueRec st party qId =
getQueue st party qId
$>>= (\q -> maybe (Left AUTH) (Right . (q,)) <$> readTVarIO (queueRec' q))
secureQueue :: STMQueueStore s => s -> StoreQueue s -> SndPublicAuthKey -> IO (Either ErrorType ())
secureQueue :: STMStoreClass s => s -> StoreQueue s -> SndPublicAuthKey -> IO (Either ErrorType ())
secureQueue st sq sKey =
atomically (readQueueRec qr $>>= secure)
$>>= \rId -> withLog "secureQueue" st $ \s -> logSecureQueue s rId sKey
$>>= \_ -> withLog "secureQueue" st $ \s -> logSecureQueue s (recipientId' sq) sKey
where
qr = queueRec' sq
secure q@QueueRec {recipientId = rId} = case senderKey q of
Just k -> pure $ if sKey == k then Right rId else Left AUTH
secure q = case senderKey q of
Just k -> pure $ if sKey == k then Right () else Left AUTH
Nothing -> do
writeTVar qr $ Just q {senderKey = Just sKey}
pure $ Right rId
pure $ Right ()
addQueueNotifier :: STMQueueStore s => s -> StoreQueue s -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId))
addQueueNotifier :: STMStoreClass s => s -> StoreQueue s -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId))
addQueueNotifier st sq ntfCreds@NtfCreds {notifierId = nId} =
atomically (readQueueRec qr $>>= add)
$>>= \(rId, nId_) -> nId_ <$$ withLog "addQueueNotifier" st (\s -> logAddNotifier s rId ntfCreds)
$>>= \nId_ -> nId_ <$$ withLog "addQueueNotifier" st (\s -> logAddNotifier s rId ntfCreds)
where
rId = recipientId' sq
qr = queueRec' sq
add q@QueueRec {recipientId = rId} = ifM (TM.member nId (notifiers' st)) (pure $ Left DUPLICATE_) $ do
nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId (notifiers' st) $> notifierId
STMQueueStore {notifiers} = stmQueueStore st
add q = ifM (TM.member nId notifiers) (pure $ Left DUPLICATE_) $ do
nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId notifiers $> notifierId
let !q' = q {notifier = Just ntfCreds}
writeTVar qr $ Just q'
TM.insert nId rId $ notifiers' st
pure $ Right (rId, nId_)
TM.insert nId rId notifiers
pure $ Right nId_
deleteQueueNotifier :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType (Maybe NotifierId))
deleteQueueNotifier :: STMStoreClass s => s -> StoreQueue s -> IO (Either ErrorType (Maybe NotifierId))
deleteQueueNotifier st sq =
atomically (readQueueRec qr >>= mapM delete)
$>>= \(rId, nId_) -> nId_ <$$ withLog "deleteQueueNotifier" st (`logDeleteNotifier` rId)
$>>= \nId_ -> nId_ <$$ withLog "deleteQueueNotifier" st (`logDeleteNotifier` recipientId' sq)
where
qr = queueRec' sq
delete q = fmap (recipientId q,) $ forM (notifier q) $ \NtfCreds {notifierId} -> do
TM.delete notifierId $ notifiers' st
delete q = forM (notifier q) $ \NtfCreds {notifierId} -> do
TM.delete notifierId $ notifiers $ stmQueueStore st
writeTVar qr $! Just q {notifier = Nothing}
pure notifierId
suspendQueue :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType ())
suspendQueue :: STMStoreClass s => s -> StoreQueue s -> IO (Either ErrorType ())
suspendQueue st sq =
atomically (readQueueRec qr >>= mapM suspend)
$>>= \rId -> withLog "suspendQueue" st (`logSuspendQueue` rId)
$>>= \_ -> withLog "suspendQueue" st (`logSuspendQueue` recipientId' sq)
where
qr = queueRec' sq
suspend q = do
writeTVar qr $! Just q {status = EntityOff}
pure $ recipientId q
suspend q = writeTVar qr $! Just q {status = EntityOff}
blockQueue :: STMQueueStore s => s -> StoreQueue s -> BlockingInfo -> IO (Either ErrorType ())
blockQueue :: STMStoreClass s => s -> StoreQueue s -> BlockingInfo -> IO (Either ErrorType ())
blockQueue st sq info =
atomically (readQueueRec qr >>= mapM block)
$>>= \rId -> withLog "blockQueue" st (\sl -> logBlockQueue sl rId info)
$>>= \_ -> withLog "blockQueue" st (\sl -> logBlockQueue sl (recipientId' sq) info)
where
qr = queueRec' sq
block q = do
writeTVar qr $ Just q {status = EntityBlocked info}
pure $ recipientId q
block q = writeTVar qr $ Just q {status = EntityBlocked info}
unblockQueue :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType ())
unblockQueue :: STMStoreClass s => s -> StoreQueue s -> IO (Either ErrorType ())
unblockQueue st sq =
atomically (readQueueRec qr >>= mapM unblock)
$>>= \rId -> withLog "unblockQueue" st (`logUnblockQueue` rId)
$>>= \_ -> withLog "unblockQueue" st (`logUnblockQueue` recipientId' sq)
where
qr = queueRec' sq
unblock q = do
writeTVar qr $ Just q {status = EntityActive}
pure $ recipientId q
unblock q = writeTVar qr $ Just q {status = EntityActive}
updateQueueTime :: STMQueueStore s => s -> StoreQueue s -> RoundedSystemTime -> IO (Either ErrorType QueueRec)
updateQueueTime :: STMStoreClass s => s -> StoreQueue s -> RoundedSystemTime -> IO (Either ErrorType QueueRec)
updateQueueTime st sq t = atomically (readQueueRec qr >>= mapM update) $>>= log'
where
qr = queueRec' sq
@@ -153,20 +161,21 @@ updateQueueTime st sq t = atomically (readQueueRec qr >>= mapM update) $>>= log'
let !q' = q {updatedAt = Just t}
in (writeTVar qr $! Just q') $> (q', True)
log' (q, changed)
| changed = q <$$ withLog "updateQueueTime" st (\sl -> logUpdateQueueTime sl (recipientId q) t)
| changed = q <$$ withLog "updateQueueTime" st (\sl -> logUpdateQueueTime sl (recipientId' sq) t)
| otherwise = pure $ Right q
deleteQueue' :: STMQueueStore s => s -> RecipientId -> StoreQueue s -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue s)))
deleteQueue' st rId sq =
deleteQueue' :: STMStoreClass s => s -> StoreQueue s -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue s)))
deleteQueue' st sq =
atomically (readQueueRec qr >>= mapM delete)
$>>= \q -> withLog "deleteQueue" st (`logDeleteQueue` rId)
$>>= \q -> withLog "deleteQueue" st (`logDeleteQueue` recipientId' sq)
>>= bimapM pure (\_ -> (q,) <$> atomically (swapTVar (msgQueue_' sq) Nothing))
where
qr = queueRec' sq
STMQueueStore {senders, notifiers} = stmQueueStore st
delete q = do
writeTVar qr Nothing
TM.delete (senderId q) $ senders' st
forM_ (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId $ notifiers' st
TM.delete (senderId q) senders
forM_ (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId notifiers
pure q
readQueueRec :: TVar (Maybe QueueRec) -> STM (Either ErrorType QueueRec)
@@ -183,10 +192,10 @@ withLog' name sl action =
where
err = name <> ", withLog, " <> show e
withLog :: STMQueueStore s => String -> s -> (StoreLog 'WriteMode -> IO ()) -> IO (Either ErrorType ())
withLog name = withLog' name . storeLog'
withLog :: STMStoreClass s => String -> s -> (StoreLog 'WriteMode -> IO ()) -> IO (Either ErrorType ())
withLog name = withLog' name . storeLog . stmQueueStore
readQueueStore :: forall s. STMQueueStore s => FilePath -> s -> IO ()
readQueueStore :: forall s. STMStoreClass s => FilePath -> s -> IO ()
readQueueStore f st = withFile f ReadMode $ LB.hGetContents >=> mapM_ processLine . LB.lines
where
processLine :: LB.ByteString -> IO ()
@@ -195,13 +204,13 @@ readQueueStore f st = withFile f ReadMode $ LB.hGetContents >=> mapM_ processLin
s = LB.toStrict s'
procLogRecord :: StoreLogRecord -> IO ()
procLogRecord = \case
CreateQueue q -> addQueue st q >>= qError (recipientId q) "CreateQueue"
CreateQueue rId q -> addQueue st rId q >>= qError rId "CreateQueue"
SecureQueue qId sKey -> withQueue qId "SecureQueue" $ \q -> secureQueue st q sKey
AddNotifier qId ntfCreds -> withQueue qId "AddNotifier" $ \q -> addQueueNotifier st q ntfCreds
SuspendQueue qId -> withQueue qId "SuspendQueue" $ suspendQueue st
BlockQueue qId info -> withQueue qId "BlockQueue" $ \q -> blockQueue st q info
UnblockQueue qId -> withQueue qId "UnblockQueue" $ unblockQueue st
DeleteQueue qId -> withQueue qId "DeleteQueue" $ deleteQueue st qId
DeleteQueue qId -> withQueue qId "DeleteQueue" $ deleteQueue st
DeleteNotifier qId -> withQueue qId "DeleteNotifier" $ deleteQueueNotifier st
UpdateTime qId t -> withQueue qId "UpdateTime" $ \q -> updateQueueTime st q t
printError :: String -> IO ()
+13 -14
View File
@@ -54,7 +54,7 @@ import System.Directory (doesFileExist, renameFile)
import System.IO
data StoreLogRecord
= CreateQueue QueueRec
= CreateQueue RecipientId QueueRec
| SecureQueue QueueId SndPublicAuthKey
| AddNotifier QueueId NtfCreds
| SuspendQueue QueueId
@@ -77,10 +77,9 @@ data SLRTag
| UpdateTime_
instance StrEncoding QueueRec where
strEncode QueueRec {recipientId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status, updatedAt} =
strEncode QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status, updatedAt} =
B.unwords
[ "rid=" <> strEncode recipientId,
"rk=" <> strEncode recipientKey,
[ "rk=" <> strEncode recipientKey,
"rdh=" <> strEncode rcvDhSecret,
"sid=" <> strEncode senderId,
"sk=" <> strEncode senderKey
@@ -98,7 +97,6 @@ instance StrEncoding QueueRec where
_ -> " status=" <> strEncode status
strP = do
recipientId <- "rid=" *> strP_
recipientKey <- "rk=" *> strP_
rcvDhSecret <- "rdh=" *> strP_
senderId <- "sid=" *> strP_
@@ -107,7 +105,7 @@ instance StrEncoding QueueRec where
notifier <- optional $ " notifier=" *> strP
updatedAt <- optional $ " updated_at=" *> strP
status <- (" status=" *> strP) <|> pure EntityActive
pure QueueRec {recipientId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status, updatedAt}
pure QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status, updatedAt}
instance StrEncoding SLRTag where
strEncode = \case
@@ -136,7 +134,7 @@ instance StrEncoding SLRTag where
instance StrEncoding StoreLogRecord where
strEncode = \case
CreateQueue q -> strEncode (CreateQueue_, q)
CreateQueue rId q -> B.unwords [strEncode CreateQueue_, "rid=" <> strEncode rId, strEncode q]
SecureQueue rId sKey -> strEncode (SecureQueue_, rId, sKey)
AddNotifier rId ntfCreds -> strEncode (AddNotifier_, rId, ntfCreds)
SuspendQueue rId -> strEncode (SuspendQueue_, rId)
@@ -148,7 +146,7 @@ instance StrEncoding StoreLogRecord where
strP =
strP_ >>= \case
CreateQueue_ -> CreateQueue <$> strP
CreateQueue_ -> CreateQueue <$> ("rid=" *> strP_) <*> strP
SecureQueue_ -> SecureQueue <$> strP_ <*> strP
AddNotifier_ -> AddNotifier <$> strP_ <*> strP
SuspendQueue_ -> SuspendQueue <$> strP
@@ -186,8 +184,8 @@ writeStoreLogRecord (WriteStoreLog _ h) r = E.uninterruptibleMask_ $ do
B.hPut h $ strEncode r `B.snoc` '\n' -- hPutStrLn makes write non-atomic for length > 1024
hFlush h
logCreateQueue :: StoreLog 'WriteMode -> QueueRec -> IO ()
logCreateQueue s = writeStoreLogRecord s . CreateQueue
logCreateQueue :: StoreLog 'WriteMode -> RecipientId -> QueueRec -> IO ()
logCreateQueue s rId q = writeStoreLogRecord s $ CreateQueue rId q
logSecureQueue :: StoreLog 'WriteMode -> QueueId -> SndPublicAuthKey -> IO ()
logSecureQueue s qId sKey = writeStoreLogRecord s $ SecureQueue qId sKey
@@ -248,10 +246,11 @@ readWriteStoreLog readStore writeStore f st =
renameFile tempBackup timedBackup
logInfo $ "original state preserved as " <> T.pack timedBackup
writeQueueStore :: STMQueueStore s => StoreLog 'WriteMode -> s -> IO ()
writeQueueStore s st = readTVarIO (activeMsgQueues st) >>= mapM_ writeQueue . M.assocs
writeQueueStore :: STMStoreClass s => StoreLog 'WriteMode -> s -> IO ()
writeQueueStore s st = readTVarIO qs >>= mapM_ writeQueue . M.assocs
where
qs = queues $ stmQueueStore st
writeQueue (rId, q) =
readTVarIO (queueRec' q) >>= \case
Just q' -> logCreateQueue s q'
Nothing -> atomically $ TM.delete rId $ activeMsgQueues st
Just q' -> logCreateQueue s rId q'
Nothing -> atomically $ TM.delete rId qs
+80 -81
View File
@@ -58,12 +58,12 @@ msgStoreTests = do
it "should create write file when missing" testWriteFileMissing
it "should create read file when read and write files are missing" testReadAndWriteFilesMissing
where
someMsgStoreTests :: STMQueueStore s => SpecWith s
someMsgStoreTests :: STMStoreClass s => SpecWith s
someMsgStoreTests = do
it "should get queue and store/read messages" testGetQueue
it "should not fail on EOF when changing read journal" testChangeReadJournal
withMsgStore :: STMQueueStore s => MsgStoreConfig s -> (s -> IO ()) -> IO ()
withMsgStore :: STMStoreClass s => MsgStoreConfig s -> (s -> IO ()) -> IO ()
withMsgStore cfg = bracket (newMsgStore cfg) closeMsgStore
testSMTStoreConfig :: STMStoreConfig
@@ -105,8 +105,7 @@ testNewQueueRec g sndSecure = do
(k, pk) <- atomically $ C.generateKeyPair @'C.X25519 g
let qr =
QueueRec
{ recipientId = rId,
recipientKey,
{ recipientKey,
rcvDhSecret = C.dh' k pk,
senderId,
senderKey = Nothing,
@@ -117,66 +116,66 @@ testNewQueueRec g sndSecure = do
}
pure (rId, qr)
testGetQueue :: STMQueueStore s => s -> IO ()
testGetQueue :: STMStoreClass s => s -> IO ()
testGetQueue ms = do
g <- C.newRandom
(rId, qr) <- testNewQueueRec g True
runRight_ $ do
q <- ExceptT $ addQueue ms qr
let write s = writeMsg ms rId q True =<< mkMessage s
q <- ExceptT $ addQueue ms rId qr
let write s = writeMsg ms q True =<< mkMessage s
Just (Message {msgId = mId1}, True) <- write "message 1"
Just (Message {msgId = mId2}, False) <- write "message 2"
Just (Message {msgId = mId3}, False) <- write "message 3"
Msg "message 1" <- tryPeekMsg ms rId q
Msg "message 1" <- tryPeekMsg ms rId q
Nothing <- tryDelMsg ms rId q mId2
Msg "message 1" <- tryDelMsg ms rId q mId1
Nothing <- tryDelMsg ms rId q mId1
Msg "message 2" <- tryPeekMsg ms rId q
Nothing <- tryDelMsg ms rId q mId1
(Nothing, Msg "message 2") <- tryDelPeekMsg ms rId q mId1
(Msg "message 2", Msg "message 3") <- tryDelPeekMsg ms rId q mId2
(Nothing, Msg "message 3") <- tryDelPeekMsg ms rId q mId2
Msg "message 3" <- tryPeekMsg ms rId q
(Msg "message 3", Nothing) <- tryDelPeekMsg ms rId q mId3
Nothing <- tryDelMsg ms rId q mId2
Nothing <- tryDelMsg ms rId q mId3
Nothing <- tryPeekMsg ms rId q
Msg "message 1" <- tryPeekMsg ms q
Msg "message 1" <- tryPeekMsg ms q
Nothing <- tryDelMsg ms q mId2
Msg "message 1" <- tryDelMsg ms q mId1
Nothing <- tryDelMsg ms q mId1
Msg "message 2" <- tryPeekMsg ms q
Nothing <- tryDelMsg ms q mId1
(Nothing, Msg "message 2") <- tryDelPeekMsg ms q mId1
(Msg "message 2", Msg "message 3") <- tryDelPeekMsg ms q mId2
(Nothing, Msg "message 3") <- tryDelPeekMsg ms q mId2
Msg "message 3" <- tryPeekMsg ms q
(Msg "message 3", Nothing) <- tryDelPeekMsg ms q mId3
Nothing <- tryDelMsg ms q mId2
Nothing <- tryDelMsg ms q mId3
Nothing <- tryPeekMsg ms q
Just (Message {msgId = mId4}, True) <- write "message 4"
Msg "message 4" <- tryPeekMsg ms rId q
Msg "message 4" <- tryPeekMsg ms q
Just (Message {msgId = mId5}, False) <- write "message 5"
(Nothing, Msg "message 4") <- tryDelPeekMsg ms rId q mId3
(Msg "message 4", Msg "message 5") <- tryDelPeekMsg ms rId q mId4
(Nothing, Msg "message 4") <- tryDelPeekMsg ms q mId3
(Msg "message 4", Msg "message 5") <- tryDelPeekMsg ms q mId4
Just (Message {msgId = mId6}, False) <- write "message 6"
Just (Message {msgId = mId7}, False) <- write "message 7"
Nothing <- write "message 8"
Msg "message 5" <- tryPeekMsg ms rId q
(Nothing, Msg "message 5") <- tryDelPeekMsg ms rId q mId4
(Msg "message 5", Msg "message 6") <- tryDelPeekMsg ms rId q mId5
(Msg "message 6", Msg "message 7") <- tryDelPeekMsg ms rId q mId6
(Msg "message 7", Just MessageQuota {msgId = mId8}) <- tryDelPeekMsg ms rId q mId7
(Just MessageQuota {}, Nothing) <- tryDelPeekMsg ms rId q mId8
(Nothing, Nothing) <- tryDelPeekMsg ms rId q mId8
void $ ExceptT $ deleteQueue ms rId q
Msg "message 5" <- tryPeekMsg ms q
(Nothing, Msg "message 5") <- tryDelPeekMsg ms q mId4
(Msg "message 5", Msg "message 6") <- tryDelPeekMsg ms q mId5
(Msg "message 6", Msg "message 7") <- tryDelPeekMsg ms q mId6
(Msg "message 7", Just MessageQuota {msgId = mId8}) <- tryDelPeekMsg ms q mId7
(Just MessageQuota {}, Nothing) <- tryDelPeekMsg ms q mId8
(Nothing, Nothing) <- tryDelPeekMsg ms q mId8
void $ ExceptT $ deleteQueue ms q
testChangeReadJournal :: STMQueueStore s => s -> IO ()
testChangeReadJournal :: STMStoreClass s => s -> IO ()
testChangeReadJournal ms = do
g <- C.newRandom
(rId, qr) <- testNewQueueRec g True
runRight_ $ do
q <- ExceptT $ addQueue ms qr
let write s = writeMsg ms rId q True =<< mkMessage s
q <- ExceptT $ addQueue ms rId qr
let write s = writeMsg ms q True =<< mkMessage s
Just (Message {msgId = mId1}, True) <- write "message 1"
(Msg "message 1", Nothing) <- tryDelPeekMsg ms rId q mId1
(Msg "message 1", Nothing) <- tryDelPeekMsg ms q mId1
Just (Message {msgId = mId2}, True) <- write "message 2"
(Msg "message 2", Nothing) <- tryDelPeekMsg ms rId q mId2
(Msg "message 2", Nothing) <- tryDelPeekMsg ms q mId2
Just (Message {msgId = mId3}, True) <- write "message 3"
(Msg "message 3", Nothing) <- tryDelPeekMsg ms rId q mId3
(Msg "message 3", Nothing) <- tryDelPeekMsg ms q mId3
Just (Message {msgId = mId4}, True) <- write "message 4"
(Msg "message 4", Nothing) <- tryDelPeekMsg ms rId q mId4
(Msg "message 4", Nothing) <- tryDelPeekMsg ms q mId4
Just (Message {msgId = mId5}, True) <- write "message 5"
(Msg "message 5", Nothing) <- tryDelPeekMsg ms rId q mId5
void $ ExceptT $ deleteQueue ms rId q
(Msg "message 5", Nothing) <- tryDelPeekMsg ms q mId5
void $ ExceptT $ deleteQueue ms q
testExportImportStore :: JournalMsgStore -> IO ()
testExportImportStore ms = do
@@ -185,21 +184,21 @@ testExportImportStore ms = do
(rId2, qr2) <- testNewQueueRec g True
sl <- readWriteQueueStore testStoreLogFile ms
runRight_ $ do
let write rId q s = writeMsg ms rId q True =<< mkMessage s
q1 <- ExceptT $ addQueue ms qr1
liftIO $ logCreateQueue sl qr1
Just (Message {}, True) <- write rId1 q1 "message 1"
Just (Message {}, False) <- write rId1 q1 "message 2"
q2 <- ExceptT $ addQueue ms qr2
liftIO $ logCreateQueue sl qr2
Just (Message {msgId = mId3}, True) <- write rId2 q2 "message 3"
Just (Message {msgId = mId4}, False) <- write rId2 q2 "message 4"
(Msg "message 3", Msg "message 4") <- tryDelPeekMsg ms rId2 q2 mId3
(Msg "message 4", Nothing) <- tryDelPeekMsg ms rId2 q2 mId4
Just (Message {}, True) <- write rId2 q2 "message 5"
Just (Message {}, False) <- write rId2 q2 "message 6"
Just (Message {}, False) <- write rId2 q2 "message 7"
Nothing <- write rId2 q2 "message 8"
let write q s = writeMsg ms q True =<< mkMessage s
q1 <- ExceptT $ addQueue ms rId1 qr1
liftIO $ logCreateQueue sl rId1 qr1
Just (Message {}, True) <- write q1 "message 1"
Just (Message {}, False) <- write q1 "message 2"
q2 <- ExceptT $ addQueue ms rId2 qr2
liftIO $ logCreateQueue sl rId2 qr2
Just (Message {msgId = mId3}, True) <- write q2 "message 3"
Just (Message {msgId = mId4}, False) <- write q2 "message 4"
(Msg "message 3", Msg "message 4") <- tryDelPeekMsg ms q2 mId3
(Msg "message 4", Nothing) <- tryDelPeekMsg ms q2 mId4
Just (Message {}, True) <- write q2 "message 5"
Just (Message {}, False) <- write q2 "message 6"
Just (Message {}, False) <- write q2 "message 7"
Nothing <- write q2 "message 8"
pure ()
length <$> listDirectory (msgQueueDirectory ms rId1) `shouldReturn` 2
length <$> listDirectory (msgQueueDirectory ms rId2) `shouldReturn` 3
@@ -297,10 +296,10 @@ testMessageState ms = do
(rId, qr) <- testNewQueueRec g True
let dir = msgQueueDirectory ms rId
statePath = msgQueueStatePath dir $ B.unpack (B64.encode $ unEntityId rId)
write q s = writeMsg ms rId q True =<< mkMessage s
write q s = writeMsg ms q True =<< mkMessage s
mId1 <- runRight $ do
q <- ExceptT $ addQueue ms qr
q <- ExceptT $ addQueue ms rId qr
Just (Message {msgId = mId1}, True) <- write q "message 1"
Just (Message {}, False) <- write q "message 2"
liftIO $ closeMsgQueue q
@@ -312,19 +311,19 @@ testMessageState ms = do
runRight_ $ do
q <- ExceptT $ getQueue ms SRecipient rId
Just (Message {msgId = mId3}, False) <- write q "message 3"
(Msg "message 1", Msg "message 3") <- tryDelPeekMsg ms rId q mId1
(Msg "message 3", Nothing) <- tryDelPeekMsg ms rId q mId3
(Msg "message 1", Msg "message 3") <- tryDelPeekMsg ms q mId1
(Msg "message 3", Nothing) <- tryDelPeekMsg ms q mId3
liftIO $ closeMsgQueue q
testReadFileMissing :: JournalMsgStore -> IO ()
testReadFileMissing ms = do
g <- C.newRandom
(rId, qr) <- testNewQueueRec g True
let write q s = writeMsg ms rId q True =<< mkMessage s
let write q s = writeMsg ms q True =<< mkMessage s
q <- runRight $ do
q <- ExceptT $ addQueue ms qr
q <- ExceptT $ addQueue ms rId qr
Just (Message {}, True) <- write q "message 1"
Msg "message 1" <- tryPeekMsg ms rId q
Msg "message 1" <- tryPeekMsg ms q
pure q
mq <- fromJust <$> readTVarIO (msgQueue_' q)
@@ -335,9 +334,9 @@ testReadFileMissing ms = do
runRight_ $ do
q' <- ExceptT $ getQueue ms SRecipient rId
Nothing <- tryPeekMsg ms rId q'
Nothing <- tryPeekMsg ms q'
Just (Message {}, True) <- write q' "message 2"
Msg "message 2" <- tryPeekMsg ms rId q'
Msg "message 2" <- tryPeekMsg ms q'
pure ()
testReadFileMissingSwitch :: JournalMsgStore -> IO ()
@@ -354,8 +353,8 @@ testReadFileMissingSwitch ms = do
runRight_ $ do
q' <- ExceptT $ getQueue ms SRecipient rId
Just (Message {}, False) <- writeMsg ms rId q' True =<< mkMessage "message 6"
Msg "message 5" <- tryPeekMsg ms rId q'
Just (Message {}, False) <- writeMsg ms q' True =<< mkMessage "message 6"
Msg "message 5" <- tryPeekMsg ms q'
pure ()
testWriteFileMissing :: JournalMsgStore -> IO ()
@@ -373,12 +372,12 @@ testWriteFileMissing ms = do
runRight_ $ do
q' <- ExceptT $ getQueue ms SRecipient rId
Just Message {msgId = mId3} <- tryPeekMsg ms rId q'
(Msg "message 3", Msg "message 4") <- tryDelPeekMsg ms rId q' mId3
Just Message {msgId = mId4} <- tryPeekMsg ms rId q'
(Msg "message 4", Nothing) <- tryDelPeekMsg ms rId q' mId4
Just (Message {}, True) <- writeMsg ms rId q' True =<< mkMessage "message 6"
Msg "message 6" <- tryPeekMsg ms rId q'
Just Message {msgId = mId3} <- tryPeekMsg ms q'
(Msg "message 3", Msg "message 4") <- tryDelPeekMsg ms q' mId3
Just Message {msgId = mId4} <- tryPeekMsg ms q'
(Msg "message 4", Nothing) <- tryDelPeekMsg ms q' mId4
Just (Message {}, True) <- writeMsg ms q' True =<< mkMessage "message 6"
Msg "message 6" <- tryPeekMsg ms q'
pure ()
testReadAndWriteFilesMissing :: JournalMsgStore -> IO ()
@@ -395,20 +394,20 @@ testReadAndWriteFilesMissing ms = do
runRight_ $ do
q' <- ExceptT $ getQueue ms SRecipient rId
Nothing <- tryPeekMsg ms rId q'
Just (Message {}, True) <- writeMsg ms rId q' True =<< mkMessage "message 6"
Msg "message 6" <- tryPeekMsg ms rId q'
Nothing <- tryPeekMsg ms q'
Just (Message {}, True) <- writeMsg ms q' True =<< mkMessage "message 6"
Msg "message 6" <- tryPeekMsg ms q'
pure ()
writeMessages :: JournalMsgStore -> RecipientId -> QueueRec -> IO JournalQueue
writeMessages ms rId qr = runRight $ do
q <- ExceptT $ addQueue ms qr
let write s = writeMsg ms rId q True =<< mkMessage s
q <- ExceptT $ addQueue ms rId qr
let write s = writeMsg ms q True =<< mkMessage s
Just (Message {msgId = mId1}, True) <- write "message 1"
Just (Message {msgId = mId2}, False) <- write "message 2"
Just (Message {}, False) <- write "message 3"
(Msg "message 1", Msg "message 2") <- tryDelPeekMsg ms rId q mId1
(Msg "message 2", Msg "message 3") <- tryDelPeekMsg ms rId q mId2
(Msg "message 1", Msg "message 2") <- tryDelPeekMsg ms q mId1
(Msg "message 2", Msg "message 3") <- tryDelPeekMsg ms q mId2
Just (Message {}, False) <- write "message 4"
Just (Message {}, False) <- write "message 5"
pure q
+12 -12
View File
@@ -60,38 +60,38 @@ storeLogTests =
("SMP server store log, sndSecure = " <> show sndSecure)
[ SLTC
{ name = "create new queue",
saved = [CreateQueue qr],
compacted = [CreateQueue qr],
saved = [CreateQueue rId qr],
compacted = [CreateQueue rId qr],
state = M.fromList [(rId, qr)]
},
SLTC
{ name = "secure queue",
saved = [CreateQueue qr, SecureQueue rId testPublicAuthKey],
compacted = [CreateQueue qr {senderKey = Just testPublicAuthKey}],
saved = [CreateQueue rId qr, SecureQueue rId testPublicAuthKey],
compacted = [CreateQueue rId qr {senderKey = Just testPublicAuthKey}],
state = M.fromList [(rId, qr {senderKey = Just testPublicAuthKey})]
},
SLTC
{ name = "create and delete queue",
saved = [CreateQueue qr, DeleteQueue rId],
saved = [CreateQueue rId qr, DeleteQueue rId],
compacted = [],
state = M.fromList []
},
SLTC
{ name = "create queue and add notifier",
saved = [CreateQueue qr, AddNotifier rId ntfCreds],
compacted = [CreateQueue $ qr {notifier = Just ntfCreds}],
saved = [CreateQueue rId qr, AddNotifier rId ntfCreds],
compacted = [CreateQueue rId qr {notifier = Just ntfCreds}],
state = M.fromList [(rId, qr {notifier = Just ntfCreds})]
},
SLTC
{ name = "delete notifier",
saved = [CreateQueue qr, AddNotifier rId ntfCreds, DeleteNotifier rId],
compacted = [CreateQueue qr],
saved = [CreateQueue rId qr, AddNotifier rId ntfCreds, DeleteNotifier rId],
compacted = [CreateQueue rId qr],
state = M.fromList [(rId, qr)]
},
SLTC
{ name = "update time",
saved = [CreateQueue qr, UpdateTime rId date],
compacted = [CreateQueue qr {updatedAt = Just date}],
saved = [CreateQueue rId qr, UpdateTime rId date],
compacted = [CreateQueue rId qr {updatedAt = Just date}],
state = M.fromList [(rId, qr {updatedAt = Just date})]
}
]
@@ -112,4 +112,4 @@ testSMPStoreLog testSuite tests =
([], compacted') <- partitionEithers . map strDecode . B.lines <$> B.readFile testStoreLogFile
compacted' `shouldBe` compacted
storeState :: JournalMsgStore -> IO (M.Map RecipientId QueueRec)
storeState st = M.mapMaybe id <$> (readTVarIO (queues st) >>= mapM (readTVarIO . queueRec'))
storeState st = M.mapMaybe id <$> (readTVarIO (queues $ stmQueueStore st) >>= mapM (readTVarIO . queueRec'))