smp server: set message counts correctly after import (#1632)

This commit is contained in:
Evgeny
2025-09-12 13:08:11 +01:00
committed by GitHub
parent a137d01c90
commit 6c66cf367a
2 changed files with 61 additions and 6 deletions
+10 -3
View File
@@ -74,7 +74,7 @@ import Simplex.Messaging.Agent.Store.Postgres (checkSchemaExists)
import Simplex.Messaging.Server.MsgStore.Journal (JournalQueue)
import Simplex.Messaging.Server.MsgStore.Types (QSType (..))
import Simplex.Messaging.Server.MsgStore.Journal (postgresQueueStore)
import Simplex.Messaging.Server.MsgStore.Postgres (PostgresMsgStoreCfg (..), batchInsertMessages, exportDbMessages)
import Simplex.Messaging.Server.MsgStore.Postgres
import Simplex.Messaging.Server.QueueStore.Postgres (batchInsertQueues, batchInsertServices, foldQueueRecs, foldServiceRecs)
import Simplex.Messaging.Server.QueueStore.STM (STMQueueStore (..))
import Simplex.Messaging.Server.QueueStore.Types
@@ -653,9 +653,16 @@ importMessagesToDatabase :: FilePath -> DBOpts -> IO Int64
importMessagesToDatabase msgsLogFile dbOpts = do
let storeCfg = PostgresStoreCfg {dbOpts, dbStoreLogPath = Nothing, confirmMigrations = MCConsole, deletedTTL = 86400 * defaultDeletedTTL}
ms <- newMsgStore $ PostgresMsgStoreCfg storeCfg defaultMsgQueueQuota
mCnt <- batchInsertMessages True msgsLogFile $ queueStore ms
mCnt <- getDbMessageCount ms
when (mCnt > 0) $ do
confirmOrExit ("WARNING: the database contains messages, they will be deleted.") "Message records not imported"
deleteAllMessages ms
inserted <- batchInsertMessages True msgsLogFile $ queueStore ms
mCnt' <- getDbMessageCount ms
unless (inserted == mCnt') $ putStrLn $ "WARNING: inserted " <> show inserted <> " rows, table has " <> show mCnt' <> " messages."
updateQueueCounts ms
renameFile msgsLogFile $ msgsLogFile <> ".bak"
pure mCnt
pure mCnt'
exportDatabaseToStoreLog :: FilePath -> DBOpts -> FilePath -> IO (Int, Int)
exportDatabaseToStoreLog logPath dbOpts storeLogFilePath = do
@@ -19,7 +19,10 @@ module Simplex.Messaging.Server.MsgStore.Postgres
PostgresQueue,
exportDbMessages,
getDbMessageStats,
getDbMessageCount,
deleteAllMessages,
batchInsertMessages,
updateQueueCounts,
)
where
@@ -282,7 +285,54 @@ getDbMessageStats ms =
toMessageStats (storedQueues, storedMsgsCount) =
MessageStats {storedQueues, storedMsgsCount, expiredMsgsCount = 0}
-- TODO [messages] update counts
getDbMessageCount :: PostgresMsgStore -> IO Int64
getDbMessageCount ms =
maybeFirstRow' 0 fromOnly $
withConnection (dbStore $ queueStore_ ms) (`DB.query_` "SELECT COUNT(*) FROM messages")
deleteAllMessages :: PostgresMsgStore -> IO ()
deleteAllMessages ms =
withConnection (dbStore $ queueStore_ ms) $ \db -> do
void $ DB.execute_ db "TRUNCATE messages"
void $ DB.execute_
db
[sql|
UPDATE msg_queues
SET msg_queue_size = 0, msg_can_write = TRUE
WHERE msg_queue_size != 0 OR msg_can_write = FALSE
|]
updateQueueCounts :: PostgresMsgStore -> IO ()
updateQueueCounts ms =
withConnection (dbStore $ queueStore_ ms) $ \db -> do
void $ DB.execute_
db
[sql|
CREATE TEMP TABLE queue_stats AS
SELECT recipient_id,
COUNT(*) AS size,
BOOL_OR(msg_quota) AS has_quota
FROM messages
GROUP BY recipient_id
|]
void $ DB.execute_
db
[sql|
UPDATE msg_queues
SET msg_queue_size = 0, msg_can_write = TRUE
WHERE msg_queue_size != 0 OR msg_can_write = FALSE
|]
void $ DB.execute_
db
[sql|
UPDATE msg_queues q
SET msg_queue_size = s.size,
msg_can_write = NOT s.has_quota
FROM queue_stats s
WHERE q.recipient_id = s.recipient_id
|]
void $ DB.execute_ db "DROP TABLE queue_stats"
batchInsertMessages :: StoreQueueClass q => Bool -> FilePath -> PostgresQueueStore q -> IO Int64
batchInsertMessages tty f toStore = do
putStrLn "Importing messages..."
@@ -296,8 +346,6 @@ batchInsertMessages tty f toStore = do
FROM STDIN WITH (FORMAT CSV)
|]
foldLogLines tty f (putMessage db) (0 :: Int, 0) >>= (DB.putCopyEnd db $>)
Only mCnt : _ <- withTransaction st (`DB.query_` "SELECT count(*) FROM messages")
unless (inserted == mCnt) $ putStrLn $ "WARNING: inserted " <> show inserted <> " rows, table has " <> show mCnt <> " messages."
pure inserted
where
putMessage db (!i, !cnt) _eof s = do