mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-06-07 11:02:05 +00:00
use interval in config
This commit is contained in:
@@ -25,7 +25,7 @@ import Data.List (intercalate)
|
||||
import Data.List.NonEmpty (NonEmpty)
|
||||
import Data.Maybe (isJust, isNothing)
|
||||
import qualified Data.Text as T
|
||||
import Data.Time.Clock (addUTCTime, getCurrentTime, nominalDay)
|
||||
import Data.Time.Clock (getCurrentTime, nominalDay)
|
||||
import Data.Time.Clock.System (SystemTime)
|
||||
import qualified Data.X509 as X
|
||||
import Data.X509.Validation (Fingerprint (..))
|
||||
@@ -297,9 +297,9 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgSt
|
||||
msgStore@(AMS _ store) <- case msgStoreType of
|
||||
AMSType SMSMemory -> AMS SMSMemory <$> newMsgStore STMStoreConfig {storePath = storeMsgsFile, quota = msgQueueQuota}
|
||||
AMSType SMSJournal -> case storeMsgsFile of
|
||||
Just storePath -> do
|
||||
cfg <- mkJournalStoreConfig storePath msgQueueQuota maxJournalMsgCount maxJournalStateLines idleQueueInterval
|
||||
AMS SMSJournal <$> newMsgStore cfg
|
||||
Just storePath ->
|
||||
let cfg = mkJournalStoreConfig storePath msgQueueQuota maxJournalMsgCount maxJournalStateLines idleQueueInterval
|
||||
in AMS SMSJournal <$> newMsgStore cfg
|
||||
Nothing -> putStrLn "Error: journal msg store require path in [STORE_LOG], restore_messages" >> exitFailure
|
||||
ntfStore <- NtfStore <$> TM.emptyIO
|
||||
random <- C.newRandom
|
||||
@@ -357,21 +357,19 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgSt
|
||||
| isJust storeMsgsFile = SPMMessages
|
||||
| otherwise = SPMQueues
|
||||
|
||||
mkJournalStoreConfig :: FilePath -> Int -> Int -> Int -> Int64 -> IO JournalStoreConfig
|
||||
mkJournalStoreConfig storePath msgQueueQuota maxJournalMsgCount maxJournalStateLines idleQueueInterval = do
|
||||
expireBefore <- addUTCTime (-14 * nominalDay) <$> getCurrentTime
|
||||
pure
|
||||
JournalStoreConfig
|
||||
{ storePath,
|
||||
quota = msgQueueQuota,
|
||||
pathParts = journalMsgStoreDepth,
|
||||
maxMsgCount = maxJournalMsgCount,
|
||||
maxStateLines = maxJournalStateLines,
|
||||
stateTailSize = defaultStateTailSize,
|
||||
idleInterval = idleQueueInterval,
|
||||
expireBefore,
|
||||
keepMinBackups = 2
|
||||
}
|
||||
mkJournalStoreConfig :: FilePath -> Int -> Int -> Int -> Int64 -> JournalStoreConfig
|
||||
mkJournalStoreConfig storePath msgQueueQuota maxJournalMsgCount maxJournalStateLines idleQueueInterval =
|
||||
JournalStoreConfig
|
||||
{ storePath,
|
||||
quota = msgQueueQuota,
|
||||
pathParts = journalMsgStoreDepth,
|
||||
maxMsgCount = maxJournalMsgCount,
|
||||
maxStateLines = maxJournalStateLines,
|
||||
stateTailSize = defaultStateTailSize,
|
||||
idleInterval = idleQueueInterval,
|
||||
expireBackupsAfter = 14 * nominalDay,
|
||||
keepMinBackups = 2
|
||||
}
|
||||
|
||||
newSMPProxyAgent :: SMPClientAgentConfig -> TVar ChaChaDRG -> IO ProxyAgent
|
||||
newSMPProxyAgent smpAgentCfg random = do
|
||||
|
||||
@@ -146,9 +146,9 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
|
||||
doesFileExist iniFile >>= \case
|
||||
True -> readIniFile iniFile >>= either exitError a
|
||||
_ -> exitError $ "Error: server is not initialized (" <> iniFile <> " does not exist).\nRun `" <> executableName <> " init`."
|
||||
newJournalMsgStore = do
|
||||
cfg <- mkJournalStoreConfig storeMsgsJournalDir defaultMsgQueueQuota defaultMaxJournalMsgCount defaultMaxJournalStateLines $ checkInterval defaultMessageExpiration
|
||||
newMsgStore cfg
|
||||
newJournalMsgStore =
|
||||
let cfg = mkJournalStoreConfig storeMsgsJournalDir defaultMsgQueueQuota defaultMaxJournalMsgCount defaultMaxJournalStateLines $ checkInterval defaultMessageExpiration
|
||||
in newMsgStore cfg
|
||||
iniFile = combine cfgPath "smp-server.ini"
|
||||
serverVersion = "SMP server v" <> simplexMQVersion
|
||||
defaultServerPorts = "5223,443"
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
{-# LANGUAGE TupleSections #-}
|
||||
|
||||
module Simplex.Messaging.Server.MsgStore.Journal
|
||||
( JournalMsgStore (queueStore, random),
|
||||
( JournalMsgStore (queueStore, random, expireBackupsBefore),
|
||||
JournalQueue,
|
||||
JournalMsgQueue (queue, state),
|
||||
JMQueue (queueDirectory, statePath),
|
||||
@@ -52,7 +52,7 @@ import Data.List (intercalate, sort)
|
||||
import Data.Maybe (catMaybes, fromMaybe, isNothing, mapMaybe)
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
import Data.Time.Clock (UTCTime, getCurrentTime)
|
||||
import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime)
|
||||
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
|
||||
import Data.Time.Format.ISO8601 (iso8601Show, iso8601ParseM)
|
||||
import GHC.IO (catchAny)
|
||||
@@ -78,7 +78,8 @@ data JournalMsgStore = JournalMsgStore
|
||||
{ config :: JournalStoreConfig,
|
||||
random :: TVar StdGen,
|
||||
queueLocks :: TMap RecipientId Lock,
|
||||
queueStore :: STMQueueStore JournalQueue
|
||||
queueStore :: STMQueueStore JournalQueue,
|
||||
expireBackupsBefore :: UTCTime
|
||||
}
|
||||
|
||||
data JournalStoreConfig = JournalStoreConfig
|
||||
@@ -94,7 +95,7 @@ data JournalStoreConfig = JournalStoreConfig
|
||||
-- time in seconds after which the queue will be closed after message expiration
|
||||
idleInterval :: Int64,
|
||||
-- expire state backup files
|
||||
expireBefore :: UTCTime,
|
||||
expireBackupsAfter :: NominalDiffTime,
|
||||
keepMinBackups :: Int
|
||||
}
|
||||
|
||||
@@ -242,7 +243,8 @@ instance MsgStoreClass JournalMsgStore where
|
||||
random <- newTVarIO =<< newStdGen
|
||||
queueLocks <- TM.emptyIO
|
||||
queueStore <- newQueueStore
|
||||
pure JournalMsgStore {config, random, queueLocks, queueStore}
|
||||
expireBackupsBefore <- addUTCTime (- expireBackupsAfter config) <$> getCurrentTime
|
||||
pure JournalMsgStore {config, random, queueLocks, queueStore, expireBackupsBefore}
|
||||
|
||||
setStoreLog :: JournalMsgStore -> StoreLog 'WriteMode -> IO ()
|
||||
setStoreLog st sl = atomically $ writeTVar (storeLog $ queueStore st) (Just sl)
|
||||
@@ -548,10 +550,9 @@ openMsgQueue ms@JournalMsgStore {config} q@JMQueue {queueDirectory = dir, stateP
|
||||
pure sh
|
||||
removeBackups = do
|
||||
times <- sort . mapMaybe backupPathTime <$> listDirectory dir
|
||||
let toDelete = filter (< expireBefore) $ take (length times - keepMinBackups) times
|
||||
let toDelete = filter (< expireBackupsBefore ms) $ take (length times - keepMinBackups config) times
|
||||
mapM_ (safeRemoveFile "removeBackups" . stateBackupPath statePath) toDelete
|
||||
where
|
||||
JournalStoreConfig {expireBefore, keepMinBackups} = config
|
||||
backupPathTime :: FilePath -> Maybe UTCTime
|
||||
backupPathTime = iso8601ParseM . T.unpack <=< T.stripSuffix ".bak" <=< T.stripPrefix statePathPfx . T.pack
|
||||
statePathPfx = T.pack $ takeFileName statePath <> "."
|
||||
|
||||
@@ -27,8 +27,7 @@ import qualified Data.ByteString.Char8 as B
|
||||
import qualified Data.ByteString.Base64.URL as B64
|
||||
import Data.List (isPrefixOf, isSuffixOf)
|
||||
import Data.Maybe (fromJust)
|
||||
import Data.Time.Calendar (fromGregorian)
|
||||
import Data.Time.Clock (UTCTime (..), addUTCTime, getCurrentTime)
|
||||
import Data.Time.Clock (addUTCTime, nominalDay)
|
||||
import Data.Time.Clock.System (getSystemTime)
|
||||
import Simplex.Messaging.Crypto (pattern MaxLenBS)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
@@ -86,7 +85,7 @@ testJournalStoreCfg =
|
||||
maxStateLines = 2,
|
||||
stateTailSize = 256,
|
||||
idleInterval = 21600,
|
||||
expireBefore = UTCTime (fromGregorian 2025 1 1) 0,
|
||||
expireBackupsAfter = nominalDay,
|
||||
keepMinBackups = 3
|
||||
}
|
||||
|
||||
@@ -283,7 +282,7 @@ testQueueState ms = do
|
||||
removeOtherFiles dir statePath
|
||||
length . lines <$> readFile statePath `shouldReturn` 3
|
||||
corruptFile statePath
|
||||
readQueueState ms statePath `shouldReturn` (Just state1, False)
|
||||
readQueueState ms statePath `shouldReturn` (Just state1, True)
|
||||
length <$> listDirectory dir `shouldReturn` 1
|
||||
length . lines <$> readFile statePath `shouldReturn` 3
|
||||
where
|
||||
@@ -400,9 +399,9 @@ testRemoveQueueStateBackups = do
|
||||
g <- C.newRandom
|
||||
(rId, qr) <- testNewQueueRec g True
|
||||
|
||||
expireBefore <- addUTCTime 1 <$> getCurrentTime -- expire all backups created withing one second
|
||||
let cfg = testJournalStoreCfg {maxStateLines = 1, expireBefore, keepMinBackups = 0}
|
||||
ms <- newMsgStore cfg
|
||||
ms' <- newMsgStore testJournalStoreCfg {maxStateLines = 1, expireBackupsAfter = 0, keepMinBackups = 0}
|
||||
-- set expiration time 1 second ahead
|
||||
let ms = ms' {expireBackupsBefore = addUTCTime 1 $ expireBackupsBefore ms'}
|
||||
|
||||
let dir = msgQueueDirectory ms rId
|
||||
write q s = writeMsg ms q True =<< mkMessage s
|
||||
|
||||
Reference in New Issue
Block a user