do not close queue state when queue is opened for writing

This commit is contained in:
Evgeny Poberezkin
2025-02-16 20:41:11 +00:00
parent 4a3f01ff9c
commit a7a4e278e0
7 changed files with 154 additions and 55 deletions

View File

@@ -1,7 +1,7 @@
cabal-version: 1.12
name: simplexmq
version: 6.3.0.500
version: 6.3.0.501
synopsis: SimpleXMQ message broker
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
<./docs/Simplex-Messaging-Client.html client> and

View File

@@ -25,7 +25,7 @@ import Data.List (intercalate)
import Data.List.NonEmpty (NonEmpty)
import Data.Maybe (isJust, isNothing)
import qualified Data.Text as T
import Data.Time.Clock (getCurrentTime)
import Data.Time.Clock (addUTCTime, getCurrentTime, nominalDay)
import Data.Time.Clock.System (SystemTime)
import qualified Data.X509 as X
import Data.X509.Validation (Fingerprint (..))
@@ -297,9 +297,9 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgSt
msgStore@(AMS _ store) <- case msgStoreType of
AMSType SMSMemory -> AMS SMSMemory <$> newMsgStore STMStoreConfig {storePath = storeMsgsFile, quota = msgQueueQuota}
AMSType SMSJournal -> case storeMsgsFile of
Just storePath ->
let cfg = JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = idleQueueInterval}
in AMS SMSJournal <$> newMsgStore cfg
Just storePath -> do
cfg <- mkJournalStoreConfig storePath msgQueueQuota maxJournalMsgCount maxJournalStateLines idleQueueInterval
AMS SMSJournal <$> newMsgStore cfg
Nothing -> putStrLn "Error: journal msg store require path in [STORE_LOG], restore_messages" >> exitFailure
ntfStore <- NtfStore <$> TM.emptyIO
random <- C.newRandom
@@ -357,6 +357,22 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgSt
| isJust storeMsgsFile = SPMMessages
| otherwise = SPMQueues
-- | Build the journal message store configuration from server settings.
--
-- The store path, queue quota, journal message count limit, state line limit and
-- idle interval are taken from the arguments; `pathParts` and `stateTailSize`
-- use the module defaults. State backup expiration is fixed here:
-- `expireBefore` is set to 14 days before the time of this call, and
-- `keepMinBackups` is set to 2.
mkJournalStoreConfig :: FilePath -> Int -> Int -> Int -> Int64 -> IO JournalStoreConfig
mkJournalStoreConfig storePath msgQueueQuota maxJournalMsgCount maxJournalStateLines idleQueueInterval =
  mkConfig . addUTCTime (negate backupTTL) <$> getCurrentTime
  where
    -- age relative to the current time used to compute `expireBefore`
    backupTTL = 14 * nominalDay
    mkConfig expireBefore =
      JournalStoreConfig
        { storePath,
          quota = msgQueueQuota,
          pathParts = journalMsgStoreDepth,
          maxMsgCount = maxJournalMsgCount,
          maxStateLines = maxJournalStateLines,
          stateTailSize = defaultStateTailSize,
          idleInterval = idleQueueInterval,
          expireBefore,
          keepMinBackups = 2
        }
newSMPProxyAgent :: SMPClientAgentConfig -> TVar ChaChaDRG -> IO ProxyAgent
newSMPProxyAgent smpAgentCfg random = do
smpAgent <- newSMPClientAgent smpAgentCfg random

View File

@@ -44,7 +44,6 @@ import Simplex.Messaging.Server.CLI
import Simplex.Messaging.Server.Env.STM
import Simplex.Messaging.Server.Expiration
import Simplex.Messaging.Server.Information
import Simplex.Messaging.Server.MsgStore.Journal (JournalStoreConfig (..))
import Simplex.Messaging.Server.MsgStore.Types (AMSType (..), SMSType (..), newMsgStore)
import Simplex.Messaging.Server.QueueStore.STM (readQueueStore)
import Simplex.Messaging.Transport (simplexMQVersion, supportedProxyClientSMPRelayVRange, supportedServerSMPRelayVRange)
@@ -147,7 +146,9 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
doesFileExist iniFile >>= \case
True -> readIniFile iniFile >>= either exitError a
_ -> exitError $ "Error: server is not initialized (" <> iniFile <> " does not exist).\nRun `" <> executableName <> " init`."
newJournalMsgStore = newMsgStore JournalStoreConfig {storePath = storeMsgsJournalDir, pathParts = journalMsgStoreDepth, quota = defaultMsgQueueQuota, maxMsgCount = defaultMaxJournalMsgCount, maxStateLines = defaultMaxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = checkInterval defaultMessageExpiration}
newJournalMsgStore = do
cfg <- mkJournalStoreConfig storeMsgsJournalDir defaultMsgQueueQuota defaultMaxJournalMsgCount defaultMaxJournalStateLines $ checkInterval defaultMessageExpiration
newMsgStore cfg
iniFile = combine cfgPath "smp-server.ini"
serverVersion = "SMP server v" <> simplexMQVersion
defaultServerPorts = "5223,443"

View File

@@ -48,12 +48,13 @@ import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Functor (($>))
import Data.Int (Int64)
import Data.List (intercalate)
import Data.Maybe (catMaybes, fromMaybe, isNothing)
import Data.List (intercalate, sort)
import Data.Maybe (catMaybes, fromMaybe, isNothing, mapMaybe)
import Data.Text (Text)
import qualified Data.Text as T
import Data.Time.Clock (getCurrentTime)
import Data.Time.Clock (UTCTime, getCurrentTime)
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
import Data.Time.Format.ISO8601 (iso8601Show)
import Data.Time.Format.ISO8601 (iso8601Show, iso8601ParseM)
import GHC.IO (catchAny)
import Simplex.Messaging.Agent.Client (getMapLock, withLockMap)
import Simplex.Messaging.Agent.Lock
@@ -68,7 +69,7 @@ import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.Util (ifM, tshow, whenM, ($>>=), (<$$>))
import System.Directory
import System.Exit
import System.FilePath ((</>))
import System.FilePath (takeFileName, (</>))
import System.IO (BufferMode (..), Handle, IOMode (..), SeekMode (..), stdout)
import qualified System.IO as IO
import System.Random (StdGen, genByteString, newStdGen)
@@ -91,7 +92,10 @@ data JournalStoreConfig = JournalStoreConfig
maxStateLines :: Int,
stateTailSize :: Int,
-- time in seconds after which the queue will be closed after message expiration
idleInterval :: Int64
idleInterval :: Int64,
-- expire state backup files
expireBefore :: UTCTime,
keepMinBackups :: Int
}
data JournalQueue = JournalQueue
@@ -265,7 +269,7 @@ instance MsgStoreClass JournalMsgStore where
r' <- case strDecode $ B.pack queueId of
Right rId ->
getQueue ms SRecipient rId >>= \case
Right q -> unStoreIO (getMsgQueue ms q) *> action q <* closeMsgQueue q
Right q -> unStoreIO (getMsgQueue ms q False) *> action q <* closeMsgQueue q
Left AUTH -> do
logWarn $ "STORE: processQueue, queue " <> T.pack queueId <> " was removed, removing " <> T.pack dir
removeQueueDirectory_ dir
@@ -307,15 +311,15 @@ instance MsgStoreClass JournalMsgStore where
queueRec' = queueRec
{-# INLINE queueRec' #-}
getMsgQueue :: JournalMsgStore -> JournalQueue -> StoreIO JournalMsgQueue
getMsgQueue ms@JournalMsgStore {random} JournalQueue {recipientId = rId, msgQueue_} =
getMsgQueue :: JournalMsgStore -> JournalQueue -> Bool -> StoreIO JournalMsgQueue
getMsgQueue ms@JournalMsgStore {random} JournalQueue {recipientId = rId, msgQueue_} forWrite =
StoreIO $ readTVarIO msgQueue_ >>= maybe newQ pure
where
newQ = do
let dir = msgQueueDirectory ms rId
statePath = msgQueueStatePath dir $ B.unpack (strEncode rId)
queue = JMQueue {queueDirectory = dir, statePath}
q <- ifM (doesDirectoryExist dir) (openMsgQueue ms queue) (createQ queue)
q <- ifM (doesDirectoryExist dir) (openMsgQueue ms queue forWrite) (createQ queue)
atomically $ writeTVar msgQueue_ $ Just q
pure q
where
@@ -342,7 +346,7 @@ instance MsgStoreClass JournalMsgStore where
pure r
where
peek = do
mq <- getMsgQueue ms q
mq <- getMsgQueue ms q False
(mq,) <$$> tryPeekMsg_ q mq
-- only runs action if queue is not empty
@@ -390,7 +394,7 @@ instance MsgStoreClass JournalMsgStore where
writeMsg :: JournalMsgStore -> JournalQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
writeMsg ms q' logState msg = isolateQueue q' "writeMsg" $ do
q <- getMsgQueue ms q'
q <- getMsgQueue ms q' True
StoreIO $ (`E.finally` updateActiveAt q') $ do
st@MsgQueueState {canWrite, size} <- readTVarIO (state q)
let empty = size == 0
@@ -488,25 +492,41 @@ tryStore op rId a = ExceptT $ E.mask_ $ E.try a >>= either storeErr pure
isolateQueueId :: String -> JournalMsgStore -> RecipientId -> IO (Either ErrorType a) -> ExceptT ErrorType IO a
isolateQueueId op ms rId = tryStore op rId . withLockMap (queueLocks ms) rId op
openMsgQueue :: JournalMsgStore -> JMQueue -> IO JournalMsgQueue
openMsgQueue ms q@JMQueue {queueDirectory = dir, statePath} = do
openMsgQueue :: JournalMsgStore -> JMQueue -> Bool -> IO JournalMsgQueue
openMsgQueue ms q@JMQueue {queueDirectory = dir, statePath} forWrite = do
(st, sh) <- readWriteQueueState ms statePath
let MsgQueueState {readState = rs, writeState = ws, size} = st
if size == 0
then E.uninterruptibleMask_ $ do
-- If the queue is empty, journals are deleted and state file is closed
-- Journal will be created again if queue is written to.
-- canWrite is set to True.
st' <- newMsgQueueState <$> newJournalId (random ms)
unsafeAppendState sh st'
hClose sh
removeJournalIfExists dir rs
unless (journalId ws == journalId rs) $ removeJournalIfExists dir ws
mkJournalQueue q st' Nothing
removeBackups ms q
if size st == 0
then do
(st', hs_) <- removeJournals st sh
mkJournalQueue q st' hs_
else do
(st', rh, wh_) <- closeOnException sh $ openJournals ms dir st sh
let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = wh_}
mkJournalQueue q st' (Just hs)
where
-- If the queue is empty, journals are deleted.
-- New journal is created if queue is written to.
-- canWrite is set to True.
removeJournals MsgQueueState {readState = rs, writeState = ws} sh = E.uninterruptibleMask_ $ do
rjId <- newJournalId $ random ms
let st' = newMsgQueueState rjId
hs_ <-
if forWrite
then Just <$> newJournalHandles st' rjId
else Nothing <$ backupQueueState
removeJournalIfExists dir rs
unless (journalId ws == journalId rs) $ removeJournalIfExists dir ws
pure (st', hs_)
where
newJournalHandles st' rjId = do
appendState_ sh st'
rh <- closeOnException sh $ createNewJournal dir rjId
pure MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = Nothing}
backupQueueState = do
hClose sh
ts <- getCurrentTime
renameFile statePath $ stateBackupPath statePath ts
mkJournalQueue :: JMQueue -> MsgQueueState -> Maybe MsgQueueHandles -> IO JournalMsgQueue
mkJournalQueue queue st hs_ = do
@@ -540,11 +560,11 @@ updateQueueState q log' hs st a = do
atomically $ writeTVar (state q) st >> a
appendState :: Handle -> MsgQueueState -> IO ()
appendState h = E.uninterruptibleMask_ . unsafeAppendState h
appendState h = E.uninterruptibleMask_ . appendState_ h
{-# INLINE appendState #-}
unsafeAppendState :: Handle -> MsgQueueState -> IO ()
unsafeAppendState h st = B.hPutStr h $ strEncode st `B.snoc` '\n'
appendState_ :: Handle -> MsgQueueState -> IO ()
appendState_ h st = B.hPutStr h $ strEncode st `B.snoc` '\n'
updateReadPos :: JournalMsgQueue -> Bool -> Int64 -> MsgQueueHandles -> IO ()
updateReadPos q log' len hs = do
@@ -645,16 +665,21 @@ fixFileSize h pos = do
| otherwise -> pure ()
removeJournal :: FilePath -> JournalState t -> IO ()
removeJournal dir JournalState {journalId} = safeRemoveFile $ journalFilePath dir journalId
removeJournal dir JournalState {journalId} =
safeRemoveFile "removeJournal" $ journalFilePath dir journalId
removeJournalIfExists :: FilePath -> JournalState t -> IO ()
removeJournalIfExists dir JournalState {journalId} = do
let path = journalFilePath dir journalId
whenM (doesFileExist path) $ safeRemoveFile path
handleError "removeJournalIfExists" path $
whenM (doesFileExist path) $ removeFile path
safeRemoveFile :: FilePath -> IO ()
safeRemoveFile path =
removeFile path `catchAny` (\e -> logError $ "STORE: removeJournal, " <> T.pack path <> ", " <> tshow e)
safeRemoveFile :: Text -> FilePath -> IO ()
safeRemoveFile cxt path = handleError cxt path $ removeFile path
-- | Run an IO action, logging any exception it throws instead of propagating it.
--
-- The log line has the form @STORE: <context>, <path>, <exception>@, where the
-- context names the calling operation and the path is the file or directory
-- the action was working on.
handleError :: Text -> FilePath -> IO () -> IO ()
handleError cxt path a =
  a `catchAny` \e ->
    logError $ T.intercalate ", " ["STORE: " <> cxt, T.pack path, tshow e]
-- This function is supposed to be resilient to crashes while updating state files,
-- and also resilient to crashes during its execution.
@@ -703,7 +728,7 @@ readWriteQueueState JournalMsgStore {random, config} statePath =
renameFile statePath tempBackup -- 1) temp backup
r <- writeQueueState st -- 2) save state
ts <- getCurrentTime
renameFile tempBackup (statePath <> "." <> iso8601Show ts <> ".bak") -- 3) timed backup
renameFile tempBackup $ stateBackupPath statePath ts -- 3) timed backup
pure r
writeQueueState st = do
sh <- openFile statePath AppendMode
@@ -718,6 +743,20 @@ readWriteQueueState JournalMsgStore {random, config} statePath =
then IO.hSeek h AbsoluteSeek (size - sz') >> B.hGet h sz
else B.hGet h (fromIntegral size)
-- | Remove expired queue state backup files from the queue directory.
--
-- Backup files are recognised by name: the state file name, followed by ".",
-- an ISO 8601 timestamp and the ".bak" suffix (the format produced by
-- `stateBackupPath`). Backups are ordered by that timestamp; the newest
-- `keepMinBackups` are always kept, and of the remaining (older) ones only those
-- with a timestamp before `expireBefore` are deleted.
--
-- This is best-effort cleanup: any failure, including failure to list the
-- directory, is logged and NOT rethrown, so it cannot fail the caller that is
-- opening the queue (which may already hold an open state file handle).
removeBackups :: JournalMsgStore -> JMQueue -> IO ()
removeBackups ms JMQueue {queueDirectory = dir, statePath} =
  handleError "removeBackups" dir $ do
    times <- sort . mapMaybe backupPathTime <$> listDirectory dir
    -- `times` is sorted oldest first, so dropping the last `keepMinBackups`
    -- entries keeps the newest backups regardless of their age
    -- (`take` with a non-positive count yields an empty list).
    let toDelete = filter (< expireBefore) $ take (length times - keepMinBackups) times
    mapM_ (safeRemoveFile "removeBackups" . stateBackupPath statePath) toDelete
  where
    JournalMsgStore {config = JournalStoreConfig {expireBefore, keepMinBackups}} = ms
    -- parses the timestamp out of "<state file name>.<timestamp>.bak";
    -- files that do not match this pattern are ignored
    backupPathTime :: FilePath -> Maybe UTCTime
    backupPathTime = iso8601ParseM . T.unpack <=< T.stripSuffix ".bak" <=< T.stripPrefix statePathPfx . T.pack
    statePathPfx = T.pack $ takeFileName statePath <> "."
-- | Path of the timestamped backup of a queue state file:
-- the state file path followed by ".", the ISO 8601 timestamp and ".bak".
stateBackupPath :: FilePath -> UTCTime -> FilePath
stateBackupPath statePath ts = concat [statePath, ".", iso8601Show ts, ".bak"]
validQueueState :: MsgQueueState -> Bool
validQueueState MsgQueueState {readState = rs, writeState = ws, size}
| journalId rs == journalId ws =
@@ -764,8 +803,7 @@ removeQueueDirectory st = removeQueueDirectory_ . msgQueueDirectory st
removeQueueDirectory_ :: FilePath -> IO ()
removeQueueDirectory_ dir =
removePathForcibly dir `catchAny` \e ->
logError $ "STORE: removeQueueDirectory, " <> T.pack dir <> ", " <> tshow e
handleError "removeQueueDirectory" dir $ removePathForcibly dir
hAppend :: Handle -> Int64 -> ByteString -> IO ()
hAppend h pos s = do

View File

@@ -89,8 +89,8 @@ instance MsgStoreClass STMMsgStore where
queueRec' = queueRec
{-# INLINE queueRec' #-}
getMsgQueue :: STMMsgStore -> STMQueue -> STM STMMsgQueue
getMsgQueue _ STMQueue {msgQueue_} = readTVar msgQueue_ >>= maybe newQ pure
getMsgQueue :: STMMsgStore -> STMQueue -> Bool -> STM STMMsgQueue
getMsgQueue _ STMQueue {msgQueue_} _ = readTVar msgQueue_ >>= maybe newQ pure
where
newQ = do
msgQueue <- newTQueue
@@ -131,7 +131,7 @@ instance MsgStoreClass STMMsgStore where
writeMsg :: STMMsgStore -> STMQueue -> Bool -> Message -> ExceptT ErrorType IO (Maybe (Message, Bool))
writeMsg ms q' _logState msg = liftIO $ atomically $ do
STMMsgQueue {msgQueue = q, canWrite, size} <- getMsgQueue ms q'
STMMsgQueue {msgQueue = q, canWrite, size} <- getMsgQueue ms q' True
canWrt <- readTVar canWrite
empty <- isEmptyTQueue q
if canWrt || empty

View File

@@ -54,7 +54,7 @@ class Monad (StoreMonad s) => MsgStoreClass s where
recipientId' :: StoreQueue s -> RecipientId
queueRec' :: StoreQueue s -> TVar (Maybe QueueRec)
getPeekMsgQueue :: s -> StoreQueue s -> StoreMonad s (Maybe (MsgQueue s, Message))
getMsgQueue :: s -> StoreQueue s -> StoreMonad s (MsgQueue s)
getMsgQueue :: s -> StoreQueue s -> Bool -> StoreMonad s (MsgQueue s)
-- the journal queue will be closed after action if it was initially closed or idle longer than interval in config
withIdleMsgQueue :: Int64 -> s -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a, Int)
@@ -119,7 +119,7 @@ withPeekMsgQueue st q op a = isolateQueue q op $ getPeekMsgQueue st q >>= a
deleteExpiredMsgs :: MsgStoreClass s => s -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int
deleteExpiredMsgs st q old =
isolateQueue q "deleteExpiredMsgs" $
getMsgQueue st q >>= deleteExpireMsgs_ old q
getMsgQueue st q False >>= deleteExpireMsgs_ old q
-- closed and idle queues will be closed after expiration
-- returns (expired count, queue size after expiration)

View File

@@ -15,6 +15,7 @@
module CoreTests.MsgStoreTests where
import AgentTests.FunctionalAPITests (runRight, runRight_)
import Control.Concurrent (threadDelay)
import Control.Concurrent.STM
import Control.Exception (bracket)
import Control.Monad
@@ -24,8 +25,10 @@ import Crypto.Random (ChaChaDRG)
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Base64.URL as B64
import Data.List (isPrefixOf)
import Data.List (isPrefixOf, isSuffixOf)
import Data.Maybe (fromJust)
import Data.Time.Calendar (fromGregorian)
import Data.Time.Clock (UTCTime (..), addUTCTime, getCurrentTime)
import Data.Time.Clock.System (getSystemTime)
import Simplex.Messaging.Crypto (pattern MaxLenBS)
import qualified Simplex.Messaging.Crypto as C
@@ -59,6 +62,8 @@ msgStoreTests = do
it "should switch to write file when read file missing" testReadFileMissingSwitch
it "should create write file when missing" testWriteFileMissing
it "should create read file when read and write files are missing" testReadAndWriteFilesMissing
describe "Journal message store: queue state backup expiration" $ do
it "should remove old queue state backups" testRemoveQueueStateBackups
where
someMsgStoreTests :: STMStoreClass s => SpecWith s
someMsgStoreTests = do
@@ -80,7 +85,9 @@ testJournalStoreCfg =
maxMsgCount = 4,
maxStateLines = 2,
stateTailSize = 256,
idleInterval = 21600
idleInterval = 21600,
expireBefore = UTCTime (fromGregorian 2025 1 1) 0,
keepMinBackups = 3
}
mkMessage :: MonadIO m => ByteString -> m Message
@@ -328,7 +335,7 @@ testRemoveJournals ms = do
runRight $ do
q <- ExceptT $ addQueue ms rId qr
Just (Message {msgId = mId1}, True) <- write q "message 1"
Just (Message {msgId = mId2}, False) <- write q "message 2"
Just (Message {msgId = mId2}, False) <- write q "message 2"
(Msg "message 1", Msg "message 2") <- tryDelPeekMsg ms q mId1
(Msg "message 2", Nothing) <- tryDelPeekMsg ms q mId2
liftIO $ closeMsgQueue q
@@ -344,7 +351,7 @@ testRemoveJournals ms = do
Nothing <- tryPeekMsg ms q
-- still not removed, queue is empty and not opened
liftIO $ journalFilesCount dir `shouldReturn` 1
_mq <- isolateQueue q "test" $ getMsgQueue ms q
_mq <- isolateQueue q "test" $ getMsgQueue ms q False
-- journal is removed
liftIO $ journalFilesCount dir `shouldReturn` 0
Just (Message {msgId = mId3}, True) <- write q "message 3"
@@ -372,11 +379,48 @@ testRemoveJournals ms = do
journalFilesCount dir `shouldReturn` 1
runRight $ do
q <- ExceptT $ getQueue ms SRecipient rId
_mq <- isolateQueue q "test" $ getMsgQueue ms q
liftIO $ journalFilesCount dir `shouldReturn` 0
Just (Message {}, True) <- write q "message 8"
liftIO $ journalFilesCount dir `shouldReturn` 1
liftIO $ closeMsgQueue q
where
journalFilesCount dir = length . filter ("messages." `isPrefixOf`) <$> listDirectory dir
-- Checks that queue state backup files (".bak") are removed when they are older
-- than `expireBefore`, using a store configured with maxStateLines = 1 and
-- keepMinBackups = 0 so that no backup is retained unconditionally.
testRemoveQueueStateBackups :: IO ()
testRemoveQueueStateBackups = do
g <- C.newRandom
(rId, qr) <- testNewQueueRec g True
expireBefore <- addUTCTime 1 <$> getCurrentTime -- expire all backups created within one second
let cfg = testJournalStoreCfg {maxStateLines = 1, expireBefore, keepMinBackups = 0}
ms <- newMsgStore cfg
let dir = msgQueueDirectory ms rId
write q s = writeMsg ms q True =<< mkMessage s
runRight $ do
-- write and delete two messages, leaving the queue empty, then close it
q <- ExceptT $ addQueue ms rId qr
Just (Message {msgId = mId1}, True) <- write q "message 1"
Just (Message {msgId = mId2}, False) <- write q "message 2"
(Msg "message 1", Msg "message 2") <- tryDelPeekMsg ms q mId1
(Msg "message 2", Nothing) <- tryDelPeekMsg ms q mId2
liftIO $ closeMsgQueue q
liftIO $ stateBackupCount dir `shouldReturn` 0
-- re-open the queue by writing to it; still no backup files are expected
q1 <- ExceptT $ getQueue ms SRecipient rId
Just (Message {}, True) <- write q1 "message 3"
Just (Message {}, False) <- write q1 "message 4"
liftIO $ closeMsgQueue q1
liftIO $ stateBackupCount dir `shouldReturn` 0
-- wait one second (threadDelay takes microseconds) so that the current time
-- passes `expireBefore`, which was set to one second after the test started
liftIO $ threadDelay 1000000
q2 <- ExceptT $ getQueue ms SRecipient rId
Just (Message {}, False) <- write q2 "message 5"
-- NOTE(review): this write is expected to be rejected (Nothing), presumably
-- because the queue quota is reached; confirm against testJournalStoreCfg
Nothing <- write q2 "message 5"
liftIO $ closeMsgQueue q2
-- exactly one backup file is expected to remain at this point
liftIO $ stateBackupCount dir `shouldReturn` 1
where
-- number of files in the queue directory with the ".bak" suffix
stateBackupCount dir = length . filter (".bak" `isSuffixOf`) <$> listDirectory dir
testReadFileMissing :: JournalMsgStore -> IO ()
testReadFileMissing ms = do
g <- C.newRandom