mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-04-25 12:02:18 +00:00
smp server: update message counts during message expiration, increase idle interval (#1404)
* smp server: update message counts during message expiration, increase idle interval * version * fix * flip results * version
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
cabal-version: 1.12
|
||||
|
||||
name: simplexmq
|
||||
version: 6.2.0.2
|
||||
version: 6.2.0.3
|
||||
synopsis: SimpleXMQ message broker
|
||||
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
|
||||
<./docs/Simplex-Messaging-Client.html client> and
|
||||
|
||||
@@ -163,11 +163,11 @@ smpServer :: TMVar Bool -> ServerConfig -> Maybe AttachHTTP -> M ()
|
||||
smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHTTP_ = do
|
||||
s <- asks server
|
||||
pa <- asks proxyAgent
|
||||
msgStats <- processServerMessages
|
||||
msgStats_ <- processServerMessages
|
||||
ntfStats <- restoreServerNtfs
|
||||
liftIO $ printMessageStats "messages" msgStats
|
||||
liftIO $ mapM_ (printMessageStats "messages") msgStats_
|
||||
liftIO $ printMessageStats "notifications" ntfStats
|
||||
restoreServerStats msgStats ntfStats
|
||||
restoreServerStats msgStats_ ntfStats
|
||||
raceAny_
|
||||
( serverThread s "server subscribedQ" subscribedQ subscribers subClients pendingSubEvents subscriptions cancelSub
|
||||
: serverThread s "server ntfSubscribedQ" ntfSubscribedQ Env.notifiers ntfSubClients pendingNtfSubEvents ntfSubscriptions (\_ -> pure ())
|
||||
@@ -385,12 +385,15 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
|
||||
threadDelay' interval
|
||||
old <- expireBeforeEpoch expCfg
|
||||
now <- systemSeconds <$> getSystemTime
|
||||
Sum deleted <- withActiveMsgQueues ms $ expireQueueMsgs now ms old
|
||||
atomicModifyIORef'_ (msgExpired stats) (+ deleted)
|
||||
logInfo $ "STORE: expireMessagesThread, expired " <> tshow deleted <> " messages"
|
||||
msgStats@MessageStats {storedMsgsCount = stored, expiredMsgsCount = expired} <-
|
||||
withActiveMsgQueues ms $ expireQueueMsgs now ms old
|
||||
atomicWriteIORef (msgCount stats) stored
|
||||
atomicModifyIORef'_ (msgExpired stats) (+ expired)
|
||||
printMessageStats "STORE: messages" msgStats
|
||||
where
|
||||
expireQueueMsgs now ms old rId q =
|
||||
either (const 0) Sum <$> runExceptT (idleDeleteExpiredMsgs now ms rId q old)
|
||||
expireQueueMsgs now ms old rId q = fmap (fromRight newMessageStats) . runExceptT $ do
|
||||
(expired_, stored) <- idleDeleteExpiredMsgs now ms rId q old
|
||||
pure MessageStats {storedMsgsCount = stored, expiredMsgsCount = fromMaybe 0 expired_, storedQueues = 1}
|
||||
|
||||
expireNtfsThread :: ServerConfig -> M ()
|
||||
expireNtfsThread ServerConfig {notificationExpiration = expCfg} = do
|
||||
@@ -1731,26 +1734,26 @@ exportMessages tty ms f drainMsgs = do
|
||||
exitFailure
|
||||
encodeMessages rId = mconcat . map (\msg -> BLD.byteString (strEncode $ MLRv3 rId msg) <> BLD.char8 '\n')
|
||||
|
||||
processServerMessages :: M MessageStats
|
||||
processServerMessages :: M (Maybe MessageStats)
|
||||
processServerMessages = do
|
||||
old_ <- asks (messageExpiration . config) $>>= (liftIO . fmap Just . expireBeforeEpoch)
|
||||
expire <- asks $ expireMessagesOnStart . config
|
||||
asks msgStore >>= liftIO . processMessages old_ expire
|
||||
where
|
||||
processMessages :: Maybe Int64 -> Bool -> AMsgStore -> IO MessageStats
|
||||
processMessages :: Maybe Int64 -> Bool -> AMsgStore -> IO (Maybe MessageStats)
|
||||
processMessages old_ expire = \case
|
||||
AMS SMSMemory ms@STMMsgStore {storeConfig = STMStoreConfig {storePath}} -> case storePath of
|
||||
Just f -> ifM (doesFileExist f) (importMessages False ms f old_) (pure newMessageStats)
|
||||
Nothing -> pure newMessageStats
|
||||
Just f -> ifM (doesFileExist f) (Just <$> importMessages False ms f old_) (pure Nothing)
|
||||
Nothing -> pure Nothing
|
||||
AMS SMSJournal ms
|
||||
| expire -> case old_ of
|
||||
| expire -> Just <$> case old_ of
|
||||
Just old -> do
|
||||
logInfo "expiring journal store messages..."
|
||||
withAllMsgQueues False ms $ processExpireQueue old
|
||||
Nothing -> do
|
||||
logInfo "validating journal store messages..."
|
||||
withAllMsgQueues False ms $ processValidateQueue
|
||||
| otherwise -> logWarn "skipping message expiration" $> newMessageStats
|
||||
| otherwise -> logWarn "skipping message expiration" $> Nothing
|
||||
where
|
||||
processExpireQueue old rId q =
|
||||
runExceptT expireQueue >>= \case
|
||||
@@ -1887,8 +1890,8 @@ saveServerStats =
|
||||
B.writeFile f $ strEncode stats
|
||||
logInfo "server stats saved"
|
||||
|
||||
restoreServerStats :: MessageStats -> MessageStats -> M ()
|
||||
restoreServerStats msgStats ntfStats = asks (serverStatsBackupFile . config) >>= mapM_ restoreStats
|
||||
restoreServerStats :: Maybe MessageStats -> MessageStats -> M ()
|
||||
restoreServerStats msgStats_ ntfStats = asks (serverStatsBackupFile . config) >>= mapM_ restoreStats
|
||||
where
|
||||
restoreStats f = whenM (doesFileExist f) $ do
|
||||
logInfo $ "restoring server stats from file " <> T.pack f
|
||||
@@ -1897,9 +1900,11 @@ restoreServerStats msgStats ntfStats = asks (serverStatsBackupFile . config) >>=
|
||||
s <- asks serverStats
|
||||
AMS _ st <- asks msgStore
|
||||
_qCount <- M.size <$> readTVarIO (activeMsgQueues st)
|
||||
let _msgCount = storedMsgsCount msgStats
|
||||
let _msgCount = maybe statsMsgCount storedMsgsCount msgStats_
|
||||
_ntfCount = storedMsgsCount ntfStats
|
||||
liftIO $ setServerStats s d {_qCount, _msgCount, _ntfCount, _msgExpired = _msgExpired d + expiredMsgsCount msgStats, _msgNtfExpired = _msgNtfExpired d + expiredMsgsCount ntfStats}
|
||||
_msgExpired' = _msgExpired d + maybe 0 expiredMsgsCount msgStats_
|
||||
_msgNtfExpired' = _msgNtfExpired d + expiredMsgsCount ntfStats
|
||||
liftIO $ setServerStats s d {_qCount, _msgCount, _ntfCount, _msgExpired = _msgExpired', _msgNtfExpired = _msgNtfExpired'}
|
||||
renameFile f $ f <> ".bak"
|
||||
logInfo "server stats restored"
|
||||
compareCounts "Queue" statsQCount _qCount
|
||||
|
||||
@@ -80,6 +80,8 @@ data ServerConfig = ServerConfig
|
||||
-- | time after which the messages can be removed from the queues and check interval, seconds
|
||||
messageExpiration :: Maybe ExpirationConfig,
|
||||
expireMessagesOnStart :: Bool,
|
||||
-- | interval of inactivity after which journal queue is closed
|
||||
idleQueueInterval :: Int64,
|
||||
-- | notification expiration interval (seconds)
|
||||
notificationExpiration :: ExpirationConfig,
|
||||
-- | time after which the socket with inactive client can be disconnected (without any messages or commands, incl. PING),
|
||||
@@ -121,9 +123,12 @@ defaultMessageExpiration :: ExpirationConfig
|
||||
defaultMessageExpiration =
|
||||
ExpirationConfig
|
||||
{ ttl = defMsgExpirationDays * 86400, -- seconds
|
||||
checkInterval = 21600 -- seconds, 6 hours
|
||||
checkInterval = 14400 -- seconds, 4 hours
|
||||
}
|
||||
|
||||
defaultIdleQueueInterval :: Int64
|
||||
defaultIdleQueueInterval = 28800 -- seconds, 8 hours
|
||||
|
||||
defNtfExpirationHours :: Int64
|
||||
defNtfExpirationHours = 24
|
||||
|
||||
@@ -283,15 +288,14 @@ newProhibitedSub = do
|
||||
return Sub {subThread = ProhibitSub, delivered}
|
||||
|
||||
newEnv :: ServerConfig -> IO Env
|
||||
newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgStoreType, storeMsgsFile, smpAgentCfg, information, messageExpiration, msgQueueQuota, maxJournalMsgCount, maxJournalStateLines} = do
|
||||
newEnv config@ServerConfig {smpCredentials, httpCredentials, storeLogFile, msgStoreType, storeMsgsFile, smpAgentCfg, information, messageExpiration, idleQueueInterval, msgQueueQuota, maxJournalMsgCount, maxJournalStateLines} = do
|
||||
serverActive <- newTVarIO True
|
||||
server <- newServer
|
||||
msgStore@(AMS _ store) <- case msgStoreType of
|
||||
AMSType SMSMemory -> AMS SMSMemory <$> newMsgStore STMStoreConfig {storePath = storeMsgsFile, quota = msgQueueQuota}
|
||||
AMSType SMSJournal -> case storeMsgsFile of
|
||||
Just storePath ->
|
||||
let idleInterval = maybe maxBound checkInterval messageExpiration
|
||||
cfg = JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval}
|
||||
let cfg = JournalStoreConfig {storePath, quota = msgQueueQuota, pathParts = journalMsgStoreDepth, maxMsgCount = maxJournalMsgCount, maxStateLines = maxJournalStateLines, stateTailSize = defaultStateTailSize, idleInterval = idleQueueInterval}
|
||||
in AMS SMSJournal <$> newMsgStore cfg
|
||||
Nothing -> putStrLn "Error: journal msg store require path in [STORE_LOG], restore_messages" >> exitFailure
|
||||
ntfStore <- NtfStore <$> TM.emptyIO
|
||||
|
||||
@@ -416,6 +416,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
|
||||
{ ttl = 86400 * readIniDefault defMsgExpirationDays "STORE_LOG" "expire_messages_days" ini
|
||||
},
|
||||
expireMessagesOnStart = fromMaybe True $ iniOnOff "STORE_LOG" "expire_messages_on_start" ini,
|
||||
idleQueueInterval = defaultIdleQueueInterval,
|
||||
notificationExpiration =
|
||||
defaultNtfExpiration
|
||||
{ ttl = 3600 * readIniDefault defNtfExpirationHours "STORE_LOG" "expire_ntfs_hours" ini
|
||||
|
||||
@@ -332,20 +332,21 @@ instance MsgStoreClass JournalMsgStore where
|
||||
journalId <- newJournalId random
|
||||
mkJournalQueue queue (newMsgQueueState journalId) Nothing
|
||||
|
||||
withIdleMsgQueue :: Int64 -> JournalMsgStore -> RecipientId -> JournalQueue -> (JournalMsgQueue -> StoreIO a) -> StoreIO (Maybe a)
|
||||
withIdleMsgQueue :: Int64 -> JournalMsgStore -> RecipientId -> JournalQueue -> (JournalMsgQueue -> StoreIO a) -> StoreIO (Maybe a, Int)
|
||||
withIdleMsgQueue now ms@JournalMsgStore {config} rId q action =
|
||||
StoreIO $ readTVarIO (msgQueue_ q) >>= \case
|
||||
Nothing ->
|
||||
Just <$>
|
||||
E.bracket
|
||||
(unStoreIO $ getMsgQueue ms rId q)
|
||||
(\_ -> closeMsgQueue q)
|
||||
(unStoreIO . action)
|
||||
E.bracket (unStoreIO $ getMsgQueue ms rId q) (\_ -> closeMsgQueue q) $ \mq -> unStoreIO $ do
|
||||
r <- action mq
|
||||
sz <- getQueueSize_ mq
|
||||
pure (Just r, sz)
|
||||
Just mq -> do
|
||||
ts <- readTVarIO $ activeAt q
|
||||
if now - ts >= idleInterval config
|
||||
r <- if now - ts >= idleInterval config
|
||||
then Just <$> unStoreIO (action mq) `E.finally` closeMsgQueue q
|
||||
else pure Nothing
|
||||
sz <- unStoreIO $ getQueueSize_ mq
|
||||
pure (r, sz)
|
||||
|
||||
deleteQueue :: JournalMsgStore -> RecipientId -> JournalQueue -> IO (Either ErrorType QueueRec)
|
||||
deleteQueue ms rId q =
|
||||
|
||||
@@ -110,9 +110,13 @@ instance MsgStoreClass STMMsgStore where
|
||||
pure q
|
||||
|
||||
-- does not create queue if it does not exist, does not delete it if it does (can't just close in-memory queue)
|
||||
withIdleMsgQueue :: Int64 -> STMMsgStore -> RecipientId -> STMQueue -> (STMMsgQueue -> STM a) -> STM (Maybe a)
|
||||
withIdleMsgQueue _ _ _ STMQueue {msgQueue_} action = readTVar msgQueue_ >>= mapM action
|
||||
{-# INLINE withIdleMsgQueue #-}
|
||||
withIdleMsgQueue :: Int64 -> STMMsgStore -> RecipientId -> STMQueue -> (STMMsgQueue -> STM a) -> STM (Maybe a, Int)
|
||||
withIdleMsgQueue _ _ _ STMQueue {msgQueue_} action = readTVar msgQueue_ >>= \case
|
||||
Just q -> do
|
||||
r <- action q
|
||||
sz <- getQueueSize_ q
|
||||
pure (Just r, sz)
|
||||
Nothing -> pure (Nothing, 0)
|
||||
|
||||
deleteQueue :: STMMsgStore -> RecipientId -> STMQueue -> IO (Either ErrorType QueueRec)
|
||||
deleteQueue ms rId q = fst <$$> deleteQueue' ms rId q
|
||||
|
||||
@@ -15,7 +15,6 @@ import Control.Monad (foldM)
|
||||
import Control.Monad.Trans.Except
|
||||
import Data.Int (Int64)
|
||||
import Data.Kind
|
||||
import Data.Maybe (fromMaybe)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Time.Clock.System (SystemTime (systemSeconds))
|
||||
import Simplex.Messaging.Protocol
|
||||
@@ -47,7 +46,7 @@ class Monad (StoreMonad s) => MsgStoreClass s where
|
||||
queueRec' :: StoreQueue s -> TVar (Maybe QueueRec)
|
||||
getMsgQueue :: s -> RecipientId -> StoreQueue s -> StoreMonad s (MsgQueue s)
|
||||
-- the journal queue will be closed after action if it was initially closed or idle longer than interval in config
|
||||
withIdleMsgQueue :: Int64 -> s -> RecipientId -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a)
|
||||
withIdleMsgQueue :: Int64 -> s -> RecipientId -> StoreQueue s -> (MsgQueue s -> StoreMonad s a) -> StoreMonad s (Maybe a, Int)
|
||||
deleteQueue :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType QueueRec)
|
||||
deleteQueueSize :: s -> RecipientId -> StoreQueue s -> IO (Either ErrorType (QueueRec, Int))
|
||||
getQueueMessages_ :: Bool -> MsgQueue s -> StoreMonad s [Message]
|
||||
@@ -114,10 +113,11 @@ deleteExpiredMsgs st rId q old =
|
||||
getMsgQueue st rId q >>= deleteExpireMsgs_ old q
|
||||
|
||||
-- closed and idle queues will be closed after expiration
|
||||
idleDeleteExpiredMsgs :: MsgStoreClass s => Int64 -> s -> RecipientId -> StoreQueue s -> Int64 -> ExceptT ErrorType IO Int
|
||||
-- returns (expired count, queue size after expiration)
|
||||
idleDeleteExpiredMsgs :: MsgStoreClass s => Int64 -> s -> RecipientId -> StoreQueue s -> Int64 -> ExceptT ErrorType IO (Maybe Int, Int)
|
||||
idleDeleteExpiredMsgs now st rId q old =
|
||||
isolateQueue rId q "idleDeleteExpiredMsgs" $
|
||||
fromMaybe 0 <$> withIdleMsgQueue now st rId q (deleteExpireMsgs_ old q)
|
||||
isolateQueue rId q "idleDeleteExpiredMsgs" $
|
||||
withIdleMsgQueue now st rId q (deleteExpireMsgs_ old q)
|
||||
|
||||
deleteExpireMsgs_ :: MsgStoreClass s => Int64 -> StoreQueue s -> MsgQueue s -> StoreMonad s Int
|
||||
deleteExpireMsgs_ old q mq = do
|
||||
|
||||
@@ -133,6 +133,7 @@ cfgMS msType =
|
||||
controlPortAdminAuth = Nothing,
|
||||
messageExpiration = Just defaultMessageExpiration,
|
||||
expireMessagesOnStart = True,
|
||||
idleQueueInterval = defaultIdleQueueInterval,
|
||||
notificationExpiration = defaultNtfExpiration,
|
||||
inactiveClientExpiration = Just defaultInactiveClientExpiration,
|
||||
logStatsInterval = Nothing,
|
||||
|
||||
@@ -972,7 +972,7 @@ testMsgExpireOnInterval =
|
||||
xit' "should expire messages that are not received before messageTTL after expiry interval" $ \(ATransport (t :: TProxy c), msType) -> do
|
||||
g <- C.newRandom
|
||||
(sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g
|
||||
let cfg' = (cfgMS msType) {messageExpiration = Just ExpirationConfig {ttl = 1, checkInterval = 1}}
|
||||
let cfg' = (cfgMS msType) {messageExpiration = Just ExpirationConfig {ttl = 1, checkInterval = 1}, idleQueueInterval = 1}
|
||||
withSmpServerConfigOn (ATransport t) cfg' testPort $ \_ ->
|
||||
testSMPClient @c $ \sh -> do
|
||||
(sId, rId, rKey, _) <- testSMPClient @c $ \rh -> createAndSecureQueue rh sPub
|
||||
|
||||
Reference in New Issue
Block a user