smp server: expire messages in postgres database, mark queues as deleted, combine tables (#1471)

* smp server: expire messages in postgres database

* tty

* fail if nothing updated in db

* remove old deleted queues

* index

* fix tests
This commit is contained in:
Evgeny
2025-03-10 09:31:50 +00:00
committed by GitHub
parent e4b9aa9746
commit 7b42aaa132
12 changed files with 184 additions and 191 deletions
@@ -58,7 +58,7 @@ import qualified Data.ByteString.Char8 as B
import Data.Functor (($>))
import Data.Int (Int64)
import Data.List (intercalate, sort)
import Data.Maybe (catMaybes, fromMaybe, isNothing, mapMaybe)
import Data.Maybe (fromMaybe, isNothing, mapMaybe)
import Data.Text (Text)
import qualified Data.Text as T
import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime)
@@ -67,8 +67,6 @@ import Data.Time.Format.ISO8601 (iso8601Show, iso8601ParseM)
import GHC.IO (catchAny)
import Simplex.Messaging.Agent.Client (getMapLock, withLockMap)
import Simplex.Messaging.Agent.Lock
import Simplex.Messaging.Agent.Store.Postgres.Common (DBOpts)
import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation)
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.MsgStore.Types
@@ -81,9 +79,8 @@ import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.Util (ifM, tshow, whenM, ($>>=), (<$$>))
import System.Directory
import System.Exit
import System.FilePath (takeFileName, (</>))
import System.IO (BufferMode (..), Handle, IOMode (..), SeekMode (..), stdout)
import System.IO (BufferMode (..), Handle, IOMode (..), SeekMode (..))
import qualified System.IO as IO
import System.Random (StdGen, genByteString, newStdGen)
@@ -137,7 +134,7 @@ data JournalStoreConfig s = JournalStoreConfig
data QStoreCfg s where
MQStoreCfg :: QStoreCfg 'QSMemory
PQStoreCfg :: DBOpts -> MigrationConfirmation -> QStoreCfg 'QSPostgres
PQStoreCfg :: PostgresStoreCfg -> QStoreCfg 'QSPostgres
data JournalQueue (s :: QSType) = JournalQueue
{ recipientId' :: RecipientId,
@@ -289,10 +286,12 @@ instance QueueStoreClass (JournalQueue s) (QStore s) where
newQueueStore :: QStoreCfg s -> IO (QStore s)
newQueueStore = \case
MQStoreCfg -> MQStore <$> newQueueStore @(JournalQueue s) ()
PQStoreCfg dbOpts confirmMigrations -> PQStore <$> newQueueStore @(JournalQueue s) (dbOpts, confirmMigrations)
PQStoreCfg cfg -> PQStore <$> newQueueStore @(JournalQueue s) cfg
loadedQueues = withQS loadedQueues
{-# INLINE loadedQueues #-}
compactQueues = withQS (compactQueues @(JournalQueue s))
{-# INLINE compactQueues #-}
queueCounts = withQS (queueCounts @(JournalQueue s))
{-# INLINE queueCounts #-}
addQueue_ = withQS addQueue_
@@ -341,54 +340,12 @@ instance MsgStoreClass (JournalMsgStore s) where
closeQueues qs = readTVarIO qs >>= mapM_ closeMsgQueue
withActiveMsgQueues :: Monoid a => JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a
withActiveMsgQueues ms f = case queueStore_ ms of
MQStore st -> withLoadedQueues st f
PQStore st -> withLoadedQueues st f
withActiveMsgQueues = withQS withLoadedQueues . queueStore_
-- This function is a "foldr" that opens and closes all queues, processes them as defined by action and accumulates the result.
-- It is used to export storage to a single file and also to expire messages and validate all queues when server is started.
-- TODO this function requires case-sensitive file system, because it uses queue directory as recipient ID.
-- It can be made to support case-insensite FS by supporting more than one queue per directory, by getting recipient ID from state file name.
-- TODO [postgres] this should simply load all known queues and process them
withAllMsgQueues :: forall a. Monoid a => Bool -> JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a
withAllMsgQueues tty ms@JournalMsgStore {config} action = ifM (doesDirectoryExist storePath) processStore (pure mempty)
where
processStore = do
(!count, !res) <- foldQueues 0 processQueue (0, mempty) ("", storePath)
putStrLn $ progress count
pure res
JournalStoreConfig {storePath, pathParts} = config
processQueue :: (Int, a) -> (String, FilePath) -> IO (Int, a)
processQueue (!i, !r) (queueId, dir) = do
when (tty && i `mod` 100 == 0) $ putStr (progress i <> "\r") >> IO.hFlush stdout
r' <- case strDecode $ B.pack queueId of
Right rId ->
getQueue ms SRecipient rId >>= \case
Right q -> unStoreIO (getMsgQueue ms q False) *> action q <* closeMsgQueue q
Left AUTH -> do
logWarn $ "STORE: processQueue, queue " <> T.pack queueId <> " was removed, removing " <> T.pack dir
removeQueueDirectory_ dir
pure mempty
Left e -> do
logError $ "STORE: processQueue, error getting queue " <> T.pack queueId <> ", " <> tshow e
exitFailure
Left e -> do
logError $ "STORE: processQueue, message queue directory " <> T.pack dir <> " is invalid, " <> tshow e
exitFailure
pure (i + 1, r <> r')
progress i = "Processed: " <> show i <> " queues"
foldQueues depth f acc (queueId, path) = do
let f' = if depth == pathParts - 1 then f else foldQueues (depth + 1) f
listDirs >>= foldM f' acc
where
listDirs = fmap catMaybes . mapM queuePath =<< listDirectory path
queuePath dir = do
let !path' = path </> dir
!queueId' = queueId <> dir
ifM
(doesDirectoryExist path')
(pure $ Just (queueId', path'))
(Nothing <$ putStrLn ("Error: path " <> path' <> " is not a directory, skipping"))
withAllMsgQueues :: Monoid a => Bool -> JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a
withAllMsgQueues tty ms action = case queueStore_ ms of
MQStore st -> withLoadedQueues st action
PQStore st -> foldQueues tty st (mkQueue ms) action
logQueueStates :: JournalMsgStore s -> IO ()
logQueueStates ms = withActiveMsgQueues ms $ unStoreIO . logQueueState
@@ -83,7 +83,7 @@ getQueueRec st party qId =
$>>= (\q -> maybe (Left AUTH) (Right . (q,)) <$> readTVarIO (queueRec q))
getQueueMessages :: MsgStoreClass s => Bool -> s -> StoreQueue s -> ExceptT ErrorType IO [Message]
getQueueMessages drainMsgs st q = withPeekMsgQueue st q "getQueueSize" $ maybe (pure []) (getQueueMessages_ drainMsgs q . fst)
getQueueMessages drainMsgs st q = withPeekMsgQueue st q "getQueueMessages" $ maybe (pure []) (getQueueMessages_ drainMsgs q . fst)
{-# INLINE getQueueMessages #-}
getQueueSize :: MsgStoreClass s => s -> StoreQueue s -> ExceptT ErrorType IO Int