mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-29 20:24:15 +00:00
smp server: reduce queue expiration/idle intervals, skip expiring very old queues (#1488)
* smp server: reduce idle queue interval and queue expiration interval * only expire recent queues (not very old) * fix * version
This commit is contained in:
@@ -147,23 +147,6 @@ runSMPServerBlocking started cfg attachHTTP_ = newEnv cfg >>= runReaderT (smpSer
|
||||
type M a = ReaderT Env IO a
|
||||
type AttachHTTP = Socket -> TLS.Context -> IO ()
|
||||
|
||||
data MessageStats = MessageStats
|
||||
{ storedMsgsCount :: Int,
|
||||
expiredMsgsCount :: Int,
|
||||
storedQueues :: Int
|
||||
}
|
||||
|
||||
instance Monoid MessageStats where
|
||||
mempty = MessageStats 0 0 0
|
||||
{-# INLINE mempty #-}
|
||||
|
||||
instance Semigroup MessageStats where
|
||||
MessageStats a b c <> MessageStats x y z = MessageStats (a + x) (b + y) (c + z)
|
||||
{-# INLINE (<>) #-}
|
||||
|
||||
newMessageStats :: MessageStats
|
||||
newMessageStats = MessageStats 0 0 0
|
||||
|
||||
smpServer :: TMVar Bool -> ServerConfig -> Maybe AttachHTTP -> M ()
|
||||
smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOptions} attachHTTP_ = do
|
||||
s <- asks server
|
||||
@@ -389,9 +372,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
|
||||
expireMessagesThread_ _ = []
|
||||
|
||||
expireMessagesThread :: ExpirationConfig -> M ()
|
||||
expireMessagesThread expCfg = do
|
||||
expireMessagesThread ExpirationConfig {checkInterval, ttl} = do
|
||||
AMS _ _ ms <- asks msgStore
|
||||
let interval = checkInterval expCfg * 1000000
|
||||
let interval = checkInterval * 1000000
|
||||
stats <- asks serverStats
|
||||
labelMyThread "expireMessagesThread"
|
||||
liftIO $ forever $ expire ms stats interval
|
||||
@@ -402,17 +385,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
|
||||
logInfo "Started expiring messages..."
|
||||
n <- compactQueues @(StoreQueue s) $ queueStore ms
|
||||
when (n > 0) $ logInfo $ "Removed " <> tshow n <> " old deleted queues from the database."
|
||||
old <- expireBeforeEpoch expCfg
|
||||
now <- systemSeconds <$> getSystemTime
|
||||
tryAny (withAllMsgQueues False "idleDeleteExpiredMsgs" ms $ expireQueueMsgs now ms old) >>= \case
|
||||
tryAny (expireOldMessages False ms now ttl) >>= \case
|
||||
Right msgStats@MessageStats {storedMsgsCount = stored, expiredMsgsCount = expired} -> do
|
||||
atomicWriteIORef (msgCount stats) stored
|
||||
atomicModifyIORef'_ (msgExpired stats) (+ expired)
|
||||
printMessageStats "STORE: messages" msgStats
|
||||
Left e -> logError $ "STORE: withAllMsgQueues, error expiring messages, " <> tshow e
|
||||
expireQueueMsgs now ms old q = do
|
||||
(expired_, stored) <- idleDeleteExpiredMsgs now ms q old
|
||||
pure MessageStats {storedMsgsCount = stored, expiredMsgsCount = fromMaybe 0 expired_, storedQueues = 1}
|
||||
|
||||
expireNtfsThread :: ServerConfig -> M ()
|
||||
expireNtfsThread ServerConfig {notificationExpiration = expCfg} = do
|
||||
|
||||
@@ -147,11 +147,11 @@ defaultMessageExpiration :: ExpirationConfig
|
||||
defaultMessageExpiration =
|
||||
ExpirationConfig
|
||||
{ ttl = defMsgExpirationDays * 86400, -- seconds
|
||||
checkInterval = 14400 -- seconds, 4 hours
|
||||
checkInterval = 7200 -- seconds, 2 hours
|
||||
}
|
||||
|
||||
defaultIdleQueueInterval :: Int64
|
||||
defaultIdleQueueInterval = 28800 -- seconds, 8 hours
|
||||
defaultIdleQueueInterval = 14400 -- seconds, 4 hours
|
||||
|
||||
defNtfExpirationHours :: Int64
|
||||
defNtfExpirationHours = 24
|
||||
|
||||
@@ -210,7 +210,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
|
||||
let storeCfg = PostgresStoreCfg {dbOpts, dbStoreLogPath = Nothing, confirmMigrations = MCConsole, deletedTTL = iniDeletedTTL ini}
|
||||
ps <- newJournalMsgStore $ PQStoreCfg storeCfg
|
||||
sl <- openWriteStoreLog False storeLogFilePath
|
||||
Sum qCnt <- foldQueueRecs True (postgresQueueStore ps) $ \(rId, qr) -> logCreateQueue sl rId qr $> Sum (1 :: Int)
|
||||
Sum qCnt <- foldQueueRecs True (postgresQueueStore ps) Nothing $ \(rId, qr) -> logCreateQueue sl rId qr $> Sum (1 :: Int)
|
||||
putStrLn $ "Export completed: " <> show qCnt <> " queues"
|
||||
putStrLn $ case readStoreType ini of
|
||||
Right (ASType SQSPostgres SMSJournal) -> "store_queues set to `database`, update it to `memory` in INI file."
|
||||
|
||||
@@ -393,7 +393,7 @@ instance MsgStoreClass (JournalMsgStore s) where
|
||||
unsafeWithAllMsgQueues tty ms action = case queueStore_ ms of
|
||||
MQStore st -> withLoadedQueues st run
|
||||
#if defined(dbServerPostgres)
|
||||
PQStore st -> foldQueueRecs tty st $ uncurry (mkTempQueue ms) >=> run
|
||||
PQStore st -> foldQueueRecs tty st Nothing $ uncurry (mkTempQueue ms) >=> run
|
||||
#endif
|
||||
where
|
||||
run q = do
|
||||
@@ -401,23 +401,32 @@ instance MsgStoreClass (JournalMsgStore s) where
|
||||
closeMsgQueue q
|
||||
pure r
|
||||
|
||||
-- This function is concurrency safe, it is used to expire queues.
|
||||
withAllMsgQueues :: forall a. Monoid a => Bool -> String -> JournalMsgStore s -> (JournalQueue s -> StoreIO s a) -> IO a
|
||||
withAllMsgQueues tty op ms action = case queueStore_ ms of
|
||||
-- This function is concurrency safe
|
||||
expireOldMessages :: Bool -> JournalMsgStore s -> Int64 -> Int64 -> IO MessageStats
|
||||
expireOldMessages tty ms now ttl = case queueStore_ ms of
|
||||
MQStore st ->
|
||||
withLoadedQueues st $ \q ->
|
||||
run $ isolateQueue q op $ action q
|
||||
withLoadedQueues st $ \q -> run $ isolateQueue q "deleteExpiredMsgs" $ do
|
||||
StoreIO (readTVarIO $ queueRec q) >>= \case
|
||||
Just QueueRec {updatedAt = Just (RoundedSystemTime t)} | t > veryOld ->
|
||||
expireQueueMsgs ms now old q
|
||||
_ -> pure newMessageStats
|
||||
#if defined(dbServerPostgres)
|
||||
PQStore st -> do
|
||||
let JournalMsgStore {queueLocks, sharedLock} = ms
|
||||
foldQueueRecs tty st $ \(rId, qr) -> do
|
||||
foldQueueRecs tty st (Just veryOld) $ \(rId, qr) -> do
|
||||
q <- mkTempQueue ms rId qr
|
||||
withSharedWaitLock rId queueLocks sharedLock $
|
||||
run $ tryStore' op rId $ unStoreIO $ action q
|
||||
withSharedWaitLock rId queueLocks sharedLock $ run $ tryStore' "deleteExpiredMsgs" rId $
|
||||
getLoadedQueue q >>= unStoreIO . expireQueueMsgs ms now old
|
||||
#endif
|
||||
where
|
||||
run :: ExceptT ErrorType IO a -> IO a
|
||||
run = fmap (fromRight mempty) . runExceptT
|
||||
old = now - ttl
|
||||
veryOld = now - 2 * ttl - 86400
|
||||
run :: ExceptT ErrorType IO MessageStats -> IO MessageStats
|
||||
run = fmap (fromRight newMessageStats) . runExceptT
|
||||
-- Use cached queue if available.
|
||||
-- Also see the comment in loadQueue in PostgresQueueStore
|
||||
getLoadedQueue :: JournalQueue s -> IO (JournalQueue s)
|
||||
getLoadedQueue q = fromMaybe q <$> TM.lookupIO (recipientId q) (loadedQueues $ queueStore_ ms)
|
||||
|
||||
logQueueStates :: JournalMsgStore s -> IO ()
|
||||
logQueueStates ms = withActiveMsgQueues ms $ unStoreIO . logQueueState
|
||||
@@ -437,9 +446,6 @@ instance MsgStoreClass (JournalMsgStore s) where
|
||||
lock <- atomically $ getMapLock (queueLocks ms) rId
|
||||
makeQueue_ ms rId qr lock
|
||||
|
||||
getLoadedQueue :: JournalMsgStore s -> JournalQueue s -> StoreIO s (JournalQueue s)
|
||||
getLoadedQueue ms sq = StoreIO $ fromMaybe sq <$> TM.lookupIO (recipientId sq) (loadedQueues $ queueStore_ ms)
|
||||
|
||||
getMsgQueue :: JournalMsgStore s -> JournalQueue s -> Bool -> StoreIO s (JournalMsgQueue s)
|
||||
getMsgQueue ms@JournalMsgStore {random} q'@JournalQueue {recipientId' = rId, msgQueue'} forWrite =
|
||||
StoreIO $ readTVarIO msgQueue' >>= maybe newQ pure
|
||||
|
||||
@@ -82,8 +82,11 @@ instance MsgStoreClass STMMsgStore where
|
||||
{-# INLINE withActiveMsgQueues #-}
|
||||
unsafeWithAllMsgQueues _ = withLoadedQueues . queueStore_
|
||||
{-# INLINE unsafeWithAllMsgQueues #-}
|
||||
withAllMsgQueues _tty _op ms action = withLoadedQueues (queueStore_ ms) $ atomically . action
|
||||
{-# INLINE withAllMsgQueues #-}
|
||||
|
||||
expireOldMessages :: Bool -> STMMsgStore -> Int64 -> Int64 -> IO MessageStats
|
||||
expireOldMessages _tty ms now ttl =
|
||||
withLoadedQueues (queueStore_ ms) $ atomically . expireQueueMsgs ms now (now - ttl)
|
||||
|
||||
logQueueStates _ = pure ()
|
||||
{-# INLINE logQueueStates #-}
|
||||
logQueueState _ = pure ()
|
||||
@@ -94,10 +97,6 @@ instance MsgStoreClass STMMsgStore where
|
||||
mkQueue _ rId qr = STMQueue rId <$> newTVarIO (Just qr) <*> newTVarIO Nothing
|
||||
{-# INLINE mkQueue #-}
|
||||
|
||||
getLoadedQueue :: STMMsgStore -> STMQueue -> STM STMQueue
|
||||
getLoadedQueue _ = pure
|
||||
{-# INLINE getLoadedQueue #-}
|
||||
|
||||
getMsgQueue :: STMMsgStore -> STMQueue -> Bool -> STM STMMsgQueue
|
||||
getMsgQueue _ STMQueue {msgQueue'} _ = readTVar msgQueue' >>= maybe newQ pure
|
||||
where
|
||||
|
||||
@@ -21,6 +21,7 @@ import Control.Monad.Trans.Except
|
||||
import Data.Functor (($>))
|
||||
import Data.Int (Int64)
|
||||
import Data.Kind
|
||||
import Data.Maybe (fromMaybe)
|
||||
import Data.Time.Clock.System (SystemTime (systemSeconds))
|
||||
import Simplex.Messaging.Protocol
|
||||
import Simplex.Messaging.Server.QueueStore
|
||||
@@ -37,14 +38,14 @@ class (Monad (StoreMonad s), QueueStoreClass (StoreQueue s) (QueueStore s)) => M
|
||||
withActiveMsgQueues :: Monoid a => s -> (StoreQueue s -> IO a) -> IO a
|
||||
-- This function can only be used in server CLI commands or before server is started.
|
||||
unsafeWithAllMsgQueues :: Monoid a => Bool -> s -> (StoreQueue s -> IO a) -> IO a
|
||||
withAllMsgQueues :: Monoid a => Bool -> String -> s -> (StoreQueue s -> StoreMonad s a) -> IO a
|
||||
-- tty, store, now, ttl
|
||||
expireOldMessages :: Bool -> s -> Int64 -> Int64 -> IO MessageStats
|
||||
logQueueStates :: s -> IO ()
|
||||
logQueueState :: StoreQueue s -> StoreMonad s ()
|
||||
queueStore :: s -> QueueStore s
|
||||
|
||||
-- message store methods
|
||||
mkQueue :: s -> RecipientId -> QueueRec -> IO (StoreQueue s)
|
||||
getLoadedQueue :: s -> StoreQueue s -> StoreMonad s (StoreQueue s)
|
||||
getMsgQueue :: s -> StoreQueue s -> Bool -> StoreMonad s (MsgQueue (StoreQueue s))
|
||||
getPeekMsgQueue :: s -> StoreQueue s -> StoreMonad s (Maybe (MsgQueue (StoreQueue s), Message))
|
||||
|
||||
@@ -73,6 +74,23 @@ data SQSType :: QSType -> Type where
|
||||
SQSMemory :: SQSType 'QSMemory
|
||||
SQSPostgres :: SQSType 'QSPostgres
|
||||
|
||||
data MessageStats = MessageStats
|
||||
{ storedMsgsCount :: Int,
|
||||
expiredMsgsCount :: Int,
|
||||
storedQueues :: Int
|
||||
}
|
||||
|
||||
instance Monoid MessageStats where
|
||||
mempty = MessageStats 0 0 0
|
||||
{-# INLINE mempty #-}
|
||||
|
||||
instance Semigroup MessageStats where
|
||||
MessageStats a b c <> MessageStats x y z = MessageStats (a + x) (b + y) (c + z)
|
||||
{-# INLINE (<>) #-}
|
||||
|
||||
newMessageStats :: MessageStats
|
||||
newMessageStats = MessageStats 0 0 0
|
||||
|
||||
addQueue :: MsgStoreClass s => s -> RecipientId -> QueueRec -> IO (Either ErrorType (StoreQueue s))
|
||||
addQueue st = addQueue_ (queueStore st) (mkQueue st)
|
||||
{-# INLINE addQueue #-}
|
||||
@@ -122,14 +140,10 @@ deleteExpiredMsgs st q old =
|
||||
isolateQueue q "deleteExpiredMsgs" $
|
||||
getMsgQueue st q False >>= deleteExpireMsgs_ old q
|
||||
|
||||
-- closed and idle queues will be closed after expiration
|
||||
-- returns (expired count, queue size after expiration)
|
||||
idleDeleteExpiredMsgs :: MsgStoreClass s => Int64 -> s -> StoreQueue s -> Int64 -> StoreMonad s (Maybe Int, Int)
|
||||
idleDeleteExpiredMsgs now st q old = do
|
||||
-- Use cached queue if available.
|
||||
-- Also see the comment in loadQueue in PostgresQueueStore
|
||||
q' <- getLoadedQueue st q
|
||||
withIdleMsgQueue now st q' $ deleteExpireMsgs_ old q'
|
||||
expireQueueMsgs :: MsgStoreClass s => s -> Int64 -> Int64 -> StoreQueue s -> StoreMonad s MessageStats
|
||||
expireQueueMsgs st now old q = do
|
||||
(expired_, stored) <- withIdleMsgQueue now st q $ deleteExpireMsgs_ old q
|
||||
pure MessageStats {storedMsgsCount = stored, expiredMsgsCount = fromMaybe 0 expired_, storedQueues = 1}
|
||||
|
||||
deleteExpireMsgs_ :: MsgStoreClass s => Int64 -> StoreQueue s -> MsgQueue (StoreQueue s) -> StoreMonad s Int
|
||||
deleteExpireMsgs_ old q mq = do
|
||||
|
||||
@@ -324,10 +324,10 @@ insertQueueQuery =
|
||||
VALUES (?,?,?,?,?,?,?,?,?,?,?)
|
||||
|]
|
||||
|
||||
foldQueueRecs :: Monoid a => Bool -> PostgresQueueStore q -> ((RecipientId, QueueRec) -> IO a) -> IO a
|
||||
foldQueueRecs tty st f = do
|
||||
foldQueueRecs :: Monoid a => Bool -> PostgresQueueStore q -> Maybe Int64 -> ((RecipientId, QueueRec) -> IO a) -> IO a
|
||||
foldQueueRecs tty st skipOld_ f = do
|
||||
(n, r) <- withConnection (dbStore st) $ \db ->
|
||||
DB.fold_ db (queueRecQuery <> " WHERE deleted_at IS NULL") (0 :: Int, mempty) $ \(i, acc) row -> do
|
||||
foldRecs db (0 :: Int, mempty) $ \(i, acc) row -> do
|
||||
r <- f $ rowToQueueRec row
|
||||
let !i' = i + 1
|
||||
!acc' = acc <> r
|
||||
@@ -336,6 +336,9 @@ foldQueueRecs tty st f = do
|
||||
when tty $ putStrLn $ progress n
|
||||
pure r
|
||||
where
|
||||
foldRecs db = case skipOld_ of
|
||||
Nothing -> DB.fold_ db (queueRecQuery <> " WHERE deleted_at IS NULL")
|
||||
Just old -> DB.fold db (queueRecQuery <> " WHERE deleted_at IS NULL AND updated_at > ?") (Only old)
|
||||
progress i = "Processed: " <> show i <> " records"
|
||||
|
||||
queueRecQuery :: Query
|
||||
|
||||
@@ -11,7 +11,8 @@ import Text.RawString.QQ (r)
|
||||
|
||||
serverSchemaMigrations :: [(String, Text, Maybe Text)]
|
||||
serverSchemaMigrations =
|
||||
[ ("20250207_initial", m20250207_initial, Nothing)
|
||||
[ ("20250207_initial", m20250207_initial, Nothing),
|
||||
("20250319_updated_index", m20250319_updated_index, Just down_m20250319_updated_index)
|
||||
]
|
||||
|
||||
-- | The list of migrations in ascending order by date
|
||||
@@ -44,3 +45,19 @@ CREATE UNIQUE INDEX idx_msg_queues_sender_id ON msg_queues(sender_id);
|
||||
CREATE UNIQUE INDEX idx_msg_queues_notifier_id ON msg_queues(notifier_id);
|
||||
CREATE INDEX idx_msg_queues_deleted_at ON msg_queues (deleted_at);
|
||||
|]
|
||||
|
||||
m20250319_updated_index :: Text
|
||||
m20250319_updated_index =
|
||||
T.pack
|
||||
[r|
|
||||
DROP INDEX idx_msg_queues_deleted_at;
|
||||
CREATE INDEX idx_msg_queues_updated_at ON msg_queues (deleted_at, updated_at);
|
||||
|]
|
||||
|
||||
down_m20250319_updated_index :: Text
|
||||
down_m20250319_updated_index =
|
||||
T.pack
|
||||
[r|
|
||||
DROP INDEX idx_msg_queues_updated_at;
|
||||
CREATE INDEX idx_msg_queues_deleted_at ON msg_queues (deleted_at);
|
||||
|]
|
||||
|
||||
@@ -32,7 +32,7 @@ import Data.Time.Clock.System (SystemTime (..), getSystemTime)
|
||||
import Simplex.Messaging.Crypto (pattern MaxLenBS)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Protocol (EntityId (..), Message (..), RecipientId, SParty (..), noMsgFlags)
|
||||
import Simplex.Messaging.Server (MessageStats (..), exportMessages, importMessages, printMessageStats)
|
||||
import Simplex.Messaging.Server (exportMessages, importMessages, printMessageStats)
|
||||
import Simplex.Messaging.Server.Env.STM (journalMsgStoreDepth, readWriteQueueStore)
|
||||
import Simplex.Messaging.Server.Expiration (ExpirationConfig (..), expireBeforeEpoch)
|
||||
import Simplex.Messaging.Server.MsgStore.Journal
|
||||
@@ -453,7 +453,7 @@ testExpireIdleQueues = do
|
||||
old <- expireBeforeEpoch ExpirationConfig {ttl = 1, checkInterval = 1} -- no old messages
|
||||
now <- systemSeconds <$> getSystemTime
|
||||
|
||||
(expired_, stored) <- runRight $ isolateQueue q "" $ idleDeleteExpiredMsgs now ms q old
|
||||
(expired_, stored) <- runRight $ isolateQueue q "" $ withIdleMsgQueue now ms q $ deleteExpireMsgs_ old q
|
||||
expired_ `shouldBe` Just 0
|
||||
stored `shouldBe` 0
|
||||
(Nothing, False) <- readQueueState ms statePath
|
||||
|
||||
Reference in New Issue
Block a user