mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-24 16:45:16 +00:00
smp server: merge quota messages and set queue to "over quota" state after restoring, server tests with journal and memory store (#1380)
* smp: run server tests with journal and memory store * merge quota messages, set queue to "over quota" state after restoring * fix test
This commit is contained in:
@@ -155,7 +155,7 @@ smpServer :: TMVar Bool -> ServerConfig -> Maybe AttachHTTP -> M ()
|
||||
smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHTTP_ = do
|
||||
s <- asks server
|
||||
pa <- asks proxyAgent
|
||||
msgStats <- restoreServerMessages
|
||||
msgStats <- processServerMessages
|
||||
ntfStats <- restoreServerNtfs
|
||||
liftIO $ printMessageStats "messages" msgStats
|
||||
liftIO $ printMessageStats "notifications" ntfStats
|
||||
@@ -215,8 +215,6 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
|
||||
saveServer :: Bool -> M ()
|
||||
saveServer drainMsgs = do
|
||||
withLog closeStoreLog
|
||||
AMS _ ms <- asks msgStore
|
||||
liftIO $ closeMsgStore ms
|
||||
saveServerMessages drainMsgs
|
||||
saveServerNtfs
|
||||
saveServerStats
|
||||
@@ -383,7 +381,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
|
||||
void $ withActiveMsgQueues ms (\_ -> expireQueueMsgs stats old) 0
|
||||
where
|
||||
expireQueueMsgs stats old q acc =
|
||||
runExceptT (deleteExpiredMsgs q old) >>= \case
|
||||
runExceptT (deleteExpiredMsgs q True old) >>= \case
|
||||
Right deleted -> (acc + deleted) <$ atomicModifyIORef'_ (msgExpired stats) (+ deleted)
|
||||
Left _ -> pure 0
|
||||
|
||||
@@ -1476,7 +1474,7 @@ client thParams' clnt@Client {clientId, subscriptions, ntfSubscriptions, rcvQ, s
|
||||
|
||||
expireMessages :: MsgStoreClass s => MsgQueue s -> Maybe ExpirationConfig -> ServerStats -> ExceptT ErrorType IO ()
|
||||
expireMessages q msgExp stats = do
|
||||
deleted <- maybe (pure 0) (deleteExpiredMsgs q <=< liftIO . expireBeforeEpoch) msgExp
|
||||
deleted <- maybe (pure 0) (deleteExpiredMsgs q True <=< liftIO . expireBeforeEpoch) msgExp
|
||||
liftIO $ when (deleted > 0) $ atomicModifyIORef'_ (msgExpired stats) (+ deleted)
|
||||
|
||||
-- The condition for delivery of the message is:
|
||||
@@ -1733,7 +1731,9 @@ saveServerMessages drainMsgs =
|
||||
AMS SMSMemory ms@STMMsgStore {storeConfig = STMStoreConfig {storePath}} -> case storePath of
|
||||
Just f -> liftIO $ exportMessages ms f drainMsgs
|
||||
Nothing -> logInfo "undelivered messages are not saved"
|
||||
AMS SMSJournal _ -> logInfo "closed journal message storage"
|
||||
AMS SMSJournal ms -> do
|
||||
liftIO $ closeMsgStore ms
|
||||
logInfo "closed journal message storage"
|
||||
|
||||
exportMessages :: MsgStoreClass s => s -> FilePath -> Bool -> IO ()
|
||||
exportMessages ms f drainMsgs = do
|
||||
@@ -1744,8 +1744,8 @@ exportMessages ms f drainMsgs = do
|
||||
saveQueueMsgs h rId q acc = getQueueMessages drainMsgs q >>= \msgs -> (acc + length msgs) <$ BLD.hPutBuilder h (encodeMessages rId msgs)
|
||||
encodeMessages rId = mconcat . map (\msg -> BLD.byteString (strEncode $ MLRv3 rId msg) <> BLD.char8 '\n')
|
||||
|
||||
restoreServerMessages :: M MessageStats
|
||||
restoreServerMessages = do
|
||||
processServerMessages :: M MessageStats
|
||||
processServerMessages = do
|
||||
old_ <- asks (messageExpiration . config) $>>= (liftIO . fmap Just . expireBeforeEpoch)
|
||||
asks msgStore >>= liftIO . processMessages old_
|
||||
where
|
||||
@@ -1772,23 +1772,25 @@ restoreServerMessages = do
|
||||
exitFailure
|
||||
where
|
||||
expireQueue = do
|
||||
expired'' <- deleteExpiredMsgs q old
|
||||
expired'' <- deleteExpiredMsgs q False old
|
||||
stored'' <- liftIO $ getQueueSize q
|
||||
liftIO $ logQueueState q
|
||||
liftIO $ closeMsgQueue q
|
||||
pure (stored'', expired'')
|
||||
processValidateQueue q (!stored, !qCount) = getQueueSize q >>= \stored' -> pure (stored + stored', qCount + 1)
|
||||
|
||||
importMessages :: MsgStoreClass s => s -> FilePath -> Maybe Int64 -> IO MessageStats
|
||||
importMessages :: forall s. MsgStoreClass s => s -> FilePath -> Maybe Int64 -> IO MessageStats
|
||||
importMessages ms f old_ = do
|
||||
logInfo $ "restoring messages from file " <> T.pack f
|
||||
LB.readFile f >>= runExceptT . foldM restoreMsg (0, 0, 0) . zip [0..] . LB.lines >>= \case
|
||||
LB.readFile f >>= runExceptT . foldM restoreMsg (0, Nothing, (0, 0, M.empty)) . zip [0..] . LB.lines >>= \case
|
||||
Left e -> do
|
||||
putStrLn ""
|
||||
logError . T.pack $ "error restoring messages: " <> e
|
||||
liftIO exitFailure
|
||||
Right (lineCount, storedMsgsCount, expiredMsgsCount) -> do
|
||||
Right (lineCount, _, (storedMsgsCount, expiredMsgsCount, overQuota)) -> do
|
||||
putStrLn $ "Processed " <> show lineCount <> " lines"
|
||||
renameFile f $ f <> ".bak"
|
||||
mapM_ setOverQuota_ overQuota
|
||||
logQueueStates ms
|
||||
storedQueues <- M.size <$> readTVarIO (activeMsgQueues ms)
|
||||
pure MessageStats {storedMsgsCount, expiredMsgsCount, storedQueues}
|
||||
@@ -1796,24 +1798,37 @@ importMessages ms f old_ = do
|
||||
progress i = do
|
||||
liftIO $ putStr $ "Processed " <> show i <> " lines\r"
|
||||
hFlush stdout
|
||||
restoreMsg :: (Int, Int, Int) -> (Int, LB.ByteString) -> ExceptT String IO (Int, Int, Int)
|
||||
restoreMsg (!lineCount, !stored, !expired) (i, s') = do
|
||||
restoreMsg :: (Int, Maybe (RecipientId, MsgQueue s), (Int, Int, M.Map RecipientId (MsgQueue s))) -> (Int, LB.ByteString) -> ExceptT String IO (Int, Maybe (RecipientId, MsgQueue s), (Int, Int, M.Map RecipientId (MsgQueue s)))
|
||||
restoreMsg (!lineCount, q_, (!stored, !expired, !overQuota)) (i, s') = do
|
||||
when (i `mod` 1000 == 0) $ progress i
|
||||
MLRv3 rId msg <- liftEither . first (msgErr "parsing") $ strDecode s
|
||||
liftError show $ addToMsgQueue rId msg
|
||||
where
|
||||
s = LB.toStrict s'
|
||||
addToMsgQueue rId msg = do
|
||||
q <- getMsgQueue ms rId
|
||||
(isExpired, logFull) <- case msg of
|
||||
q <- case q_ of
|
||||
-- to avoid lookup when restoring the next message to the same queue
|
||||
Just (rId', q') | rId' == rId -> pure q'
|
||||
_ -> getMsgQueue ms rId
|
||||
(lineCount + 1,Just (rId, q),) <$> case msg of
|
||||
Message {msgTs}
|
||||
| maybe True (systemSeconds msgTs >=) old_ -> (False,) . isNothing <$> writeMsg ms q False msg
|
||||
| otherwise -> pure (True, False)
|
||||
MessageQuota {} -> writeMsg ms q False msg $> (False, False)
|
||||
when logFull . logError . decodeLatin1 $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (messageId msg)
|
||||
let !stored' = if logFull || isExpired then stored else stored + 1
|
||||
!expired' = if isExpired then expired + 1 else expired
|
||||
pure (lineCount + 1, stored', expired')
|
||||
| maybe True (systemSeconds msgTs >=) old_ -> do
|
||||
writeMsg ms q False msg >>= \case
|
||||
Just _ -> pure (stored + 1, expired, overQuota)
|
||||
Nothing -> do
|
||||
logError $ decodeLatin1 $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (messageId msg)
|
||||
pure (stored, expired, overQuota)
|
||||
| otherwise -> pure (stored, expired + 1, overQuota)
|
||||
MessageQuota {} ->
|
||||
-- queue was over quota at some point,
|
||||
-- it will be set as over quota once fully imported
|
||||
mergeQuotaMsgs >> writeMsg ms q False msg $> (stored, expired, M.insert rId q overQuota)
|
||||
where
|
||||
-- if the first message in queue head is "quota", remove it.
|
||||
mergeQuotaMsgs = isolateQueue q "mergeQuotaMsgs" $
|
||||
tryPeekMsg_ q >>= \case
|
||||
Just MessageQuota {} -> tryDeleteMsg_ q False
|
||||
_ -> pure ()
|
||||
msgErr :: Show e => String -> e -> String
|
||||
msgErr op e = op <> " error (" <> show e <> "): " <> B.unpack (B.take 100 s)
|
||||
|
||||
|
||||
@@ -28,7 +28,7 @@ module Simplex.Messaging.Server.MsgStore.Journal
|
||||
readWriteQueueState,
|
||||
newMsgQueueState,
|
||||
newJournalId,
|
||||
logQueueState,
|
||||
appendState,
|
||||
queueLogFileName,
|
||||
logFileExt,
|
||||
)
|
||||
@@ -255,11 +255,12 @@ instance MsgStoreClass JournalMsgStore where
|
||||
(Nothing <$ putStrLn ("Error: path " <> path' <> " is not a directory, skipping"))
|
||||
|
||||
logQueueStates :: JournalMsgStore -> IO ()
|
||||
logQueueStates st = withActiveMsgQueues st (\_ q _ -> logState q) ()
|
||||
where
|
||||
logState q =
|
||||
readTVarIO (handles q)
|
||||
>>= maybe (pure ()) (\hs -> readTVarIO (state q) >>= logQueueState (stateHandle hs))
|
||||
logQueueStates ms = withActiveMsgQueues ms (\_ q _ -> logQueueState q) ()
|
||||
|
||||
logQueueState :: JournalMsgQueue -> IO ()
|
||||
logQueueState q =
|
||||
readTVarIO (handles q)
|
||||
>>= maybe (pure ()) (\hs -> readTVarIO (state q) >>= appendState (stateHandle hs))
|
||||
|
||||
getMsgQueue :: JournalMsgStore -> RecipientId -> ExceptT ErrorType IO JournalMsgQueue
|
||||
getMsgQueue ms@JournalMsgStore {queueLocks, msgQueues, random} rId =
|
||||
@@ -318,7 +319,7 @@ instance MsgStoreClass JournalMsgStore where
|
||||
else pure Nothing
|
||||
where
|
||||
JournalStoreConfig {quota, maxMsgCount} = config ms
|
||||
msgQuota = MessageQuota {msgId = msgId msg, msgTs = msgTs msg}
|
||||
!msgQuota = MessageQuota {msgId = msgId msg, msgTs = msgTs msg}
|
||||
writeToJournal st@MsgQueueState {writeState, readState = rs, size} canWrt' msg' = do
|
||||
let msgStr = strEncode msg' `B.snoc` '\n'
|
||||
msgLen = fromIntegral $ B.length msgStr
|
||||
@@ -349,6 +350,10 @@ instance MsgStoreClass JournalMsgStore where
|
||||
atomically $ writeTVar handles $ Just $ hs {writeHandle = Just wh}
|
||||
pure (newJournalState journalId, wh)
|
||||
|
||||
-- can ONLY be used while restoring messages, not while server running
|
||||
setOverQuota_ :: JournalMsgQueue -> IO ()
|
||||
setOverQuota_ JournalMsgQueue {state} = atomically $ modifyTVar' state $ \st -> st {canWrite = False}
|
||||
|
||||
getQueueSize :: JournalMsgQueue -> IO Int
|
||||
getQueueSize JournalMsgQueue {state} = size <$> readTVarIO state
|
||||
|
||||
@@ -363,13 +368,13 @@ instance MsgStoreClass JournalMsgStore where
|
||||
atomically $ writeTVar tipMsg $ Just (Just ml)
|
||||
pure $ Just msg
|
||||
|
||||
tryDeleteMsg_ :: JournalMsgQueue -> StoreIO ()
|
||||
tryDeleteMsg_ q@JournalMsgQueue {tipMsg, handles} = StoreIO $
|
||||
tryDeleteMsg_ :: JournalMsgQueue -> Bool -> StoreIO ()
|
||||
tryDeleteMsg_ q@JournalMsgQueue {tipMsg, handles} logState = StoreIO $
|
||||
void $
|
||||
readTVarIO tipMsg -- if there is no cached tipMsg, do nothing
|
||||
$>>= (pure . fmap snd)
|
||||
$>>= \len -> readTVarIO handles
|
||||
$>>= \hs -> updateReadPos q True len hs $> Just ()
|
||||
$>>= \hs -> updateReadPos q logState len hs $> Just ()
|
||||
|
||||
isolateQueue :: JournalMsgQueue -> String -> StoreIO a -> ExceptT ErrorType IO a
|
||||
isolateQueue q op (StoreIO a) = tryStore op $ withLock' (queueLock $ queue q) op $ a
|
||||
@@ -416,11 +421,11 @@ chooseReadJournal q log' hs = do
|
||||
updateQueueState :: JournalMsgQueue -> Bool -> MsgQueueHandles -> MsgQueueState -> IO ()
|
||||
updateQueueState q log' hs st = do
|
||||
unless (validQueueState st) $ E.throwIO $ userError $ "updating to invalid state: " <> show st
|
||||
when log' $ logQueueState (stateHandle hs) st
|
||||
when log' $ appendState (stateHandle hs) st
|
||||
atomically $ writeTVar (state q) st
|
||||
|
||||
logQueueState :: Handle -> MsgQueueState -> IO ()
|
||||
logQueueState h st = B.hPutStr h $ strEncode st `B.snoc` '\n'
|
||||
appendState :: Handle -> MsgQueueState -> IO ()
|
||||
appendState h st = B.hPutStr h $ strEncode st `B.snoc` '\n'
|
||||
|
||||
updateReadPos :: JournalMsgQueue -> Bool -> Int64 -> MsgQueueHandles -> IO ()
|
||||
updateReadPos q log' len hs = do
|
||||
@@ -555,7 +560,7 @@ readWriteQueueState JournalMsgStore {random, config} statePath =
|
||||
pure r
|
||||
writeQueueState st = do
|
||||
sh <- openFile statePath AppendMode
|
||||
logQueueState sh st
|
||||
appendState sh st
|
||||
pure (st, sh)
|
||||
|
||||
validQueueState :: MsgQueueState -> Bool
|
||||
|
||||
@@ -62,6 +62,8 @@ instance MsgStoreClass STMMsgStore where
|
||||
|
||||
logQueueStates _ = pure ()
|
||||
|
||||
logQueueState _ = pure ()
|
||||
|
||||
-- The reason for double lookup is that majority of messaging queues exist,
|
||||
-- because multiple messages are sent to the same queue,
|
||||
-- so the first lookup without STM transaction will return the queue faster.
|
||||
@@ -104,10 +106,13 @@ instance MsgStoreClass STMMsgStore where
|
||||
modifyTVar' size (+ 1)
|
||||
if canWrt'
|
||||
then writeTQueue q msg $> Just (msg, empty)
|
||||
else (writeTQueue q $! msgQuota) $> Nothing
|
||||
else writeTQueue q msgQuota $> Nothing
|
||||
else pure Nothing
|
||||
where
|
||||
msgQuota = MessageQuota {msgId = msgId msg, msgTs = msgTs msg}
|
||||
!msgQuota = MessageQuota {msgId = msgId msg, msgTs = msgTs msg}
|
||||
|
||||
setOverQuota_ :: STMMsgQueue -> IO ()
|
||||
setOverQuota_ q = atomically $ writeTVar (canWrite q) False
|
||||
|
||||
getQueueSize :: STMMsgQueue -> IO Int
|
||||
getQueueSize STMMsgQueue {size} = readTVarIO size
|
||||
@@ -116,8 +121,8 @@ instance MsgStoreClass STMMsgStore where
|
||||
tryPeekMsg_ = tryPeekTQueue . msgQueue
|
||||
{-# INLINE tryPeekMsg_ #-}
|
||||
|
||||
tryDeleteMsg_ :: STMMsgQueue -> STM ()
|
||||
tryDeleteMsg_ STMMsgQueue {msgQueue = q, size} =
|
||||
tryDeleteMsg_ :: STMMsgQueue -> Bool -> STM ()
|
||||
tryDeleteMsg_ STMMsgQueue {msgQueue = q, size} _logState =
|
||||
tryReadTQueue q >>= \case
|
||||
Just _ -> modifyTVar' size (subtract 1)
|
||||
_ -> pure ()
|
||||
|
||||
@@ -26,14 +26,16 @@ class Monad (StoreMonad s) => MsgStoreClass s where
|
||||
activeMsgQueues :: s -> TMap RecipientId (MsgQueue s)
|
||||
withAllMsgQueues :: s -> (RecipientId -> MsgQueue s -> a -> IO a) -> a -> IO a
|
||||
logQueueStates :: s -> IO ()
|
||||
logQueueState :: MsgQueue s -> IO ()
|
||||
getMsgQueue :: s -> RecipientId -> ExceptT ErrorType IO (MsgQueue s)
|
||||
delMsgQueue :: s -> RecipientId -> IO ()
|
||||
delMsgQueueSize :: s -> RecipientId -> IO Int
|
||||
getQueueMessages :: Bool -> MsgQueue s -> IO [Message]
|
||||
writeMsg :: s -> MsgQueue s -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
|
||||
setOverQuota_ :: MsgQueue s -> IO () -- can ONLY be used while restoring messages, not while server running
|
||||
getQueueSize :: MsgQueue s -> IO Int
|
||||
tryPeekMsg_ :: MsgQueue s -> StoreMonad s (Maybe Message)
|
||||
tryDeleteMsg_ :: MsgQueue s -> StoreMonad s ()
|
||||
tryDeleteMsg_ :: MsgQueue s -> Bool -> StoreMonad s ()
|
||||
isolateQueue :: MsgQueue s -> String -> StoreMonad s a -> ExceptT ErrorType IO a
|
||||
|
||||
data MSType = MSMemory | MSJournal
|
||||
@@ -57,7 +59,7 @@ tryDelMsg mq msgId' =
|
||||
tryPeekMsg_ mq >>= \case
|
||||
msg_@(Just msg)
|
||||
| msgId msg == msgId' ->
|
||||
tryDeleteMsg_ mq >> pure msg_
|
||||
tryDeleteMsg_ mq True >> pure msg_
|
||||
_ -> pure Nothing
|
||||
|
||||
-- atomic delete (== read) last and peek next message if available
|
||||
@@ -66,16 +68,16 @@ tryDelPeekMsg mq msgId' =
|
||||
isolateQueue mq "tryDelPeekMsg" $
|
||||
tryPeekMsg_ mq >>= \case
|
||||
msg_@(Just msg)
|
||||
| msgId msg == msgId' -> (msg_,) <$> (tryDeleteMsg_ mq >> tryPeekMsg_ mq)
|
||||
| msgId msg == msgId' -> (msg_,) <$> (tryDeleteMsg_ mq True >> tryPeekMsg_ mq)
|
||||
| otherwise -> pure (Nothing, msg_)
|
||||
_ -> pure (Nothing, Nothing)
|
||||
|
||||
deleteExpiredMsgs :: MsgStoreClass s => MsgQueue s -> Int64 -> ExceptT ErrorType IO Int
|
||||
deleteExpiredMsgs mq old = isolateQueue mq "deleteExpiredMsgs" $ loop 0
|
||||
deleteExpiredMsgs :: MsgStoreClass s => MsgQueue s -> Bool -> Int64 -> ExceptT ErrorType IO Int
|
||||
deleteExpiredMsgs mq logState old = isolateQueue mq "deleteExpiredMsgs" $ loop 0
|
||||
where
|
||||
loop dc =
|
||||
tryPeekMsg_ mq >>= \case
|
||||
Just Message {msgTs}
|
||||
| systemSeconds msgTs < old ->
|
||||
tryDeleteMsg_ mq >> loop (dc + 1)
|
||||
tryDeleteMsg_ mq logState >> loop (dc + 1)
|
||||
_ -> pure dc
|
||||
|
||||
Reference in New Issue
Block a user