From 6c66cf367a38b2d8f74adced47e50b44c78328b7 Mon Sep 17 00:00:00 2001 From: Evgeny Date: Fri, 12 Sep 2025 13:08:11 +0100 Subject: [PATCH] smp server: set message counts correctly after import (#1632) --- src/Simplex/Messaging/Server/Main.hs | 13 +++-- .../Messaging/Server/MsgStore/Postgres.hs | 54 +++++++++++++++++-- 2 files changed, 61 insertions(+), 6 deletions(-) diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index a4164421f..8e5fd55ee 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -74,7 +74,7 @@ import Simplex.Messaging.Agent.Store.Postgres (checkSchemaExists) import Simplex.Messaging.Server.MsgStore.Journal (JournalQueue) import Simplex.Messaging.Server.MsgStore.Types (QSType (..)) import Simplex.Messaging.Server.MsgStore.Journal (postgresQueueStore) -import Simplex.Messaging.Server.MsgStore.Postgres (PostgresMsgStoreCfg (..), batchInsertMessages, exportDbMessages) +import Simplex.Messaging.Server.MsgStore.Postgres import Simplex.Messaging.Server.QueueStore.Postgres (batchInsertQueues, batchInsertServices, foldQueueRecs, foldServiceRecs) import Simplex.Messaging.Server.QueueStore.STM (STMQueueStore (..)) import Simplex.Messaging.Server.QueueStore.Types @@ -653,9 +653,16 @@ importMessagesToDatabase :: FilePath -> DBOpts -> IO Int64 importMessagesToDatabase msgsLogFile dbOpts = do let storeCfg = PostgresStoreCfg {dbOpts, dbStoreLogPath = Nothing, confirmMigrations = MCConsole, deletedTTL = 86400 * defaultDeletedTTL} ms <- newMsgStore $ PostgresMsgStoreCfg storeCfg defaultMsgQueueQuota - mCnt <- batchInsertMessages True msgsLogFile $ queueStore ms + mCnt <- getDbMessageCount ms + when (mCnt > 0) $ do + confirmOrExit ("WARNING: the database contains messages, they will be deleted.") "Message records not imported" + deleteAllMessages ms + inserted <- batchInsertMessages True msgsLogFile $ queueStore ms + mCnt' <- getDbMessageCount ms + unless (inserted == mCnt') $ putStrLn $ "WARNING: inserted " <> show inserted <> " rows, table has " <> show mCnt' <> " messages." + updateQueueCounts ms renameFile msgsLogFile $ msgsLogFile <> ".bak" - pure mCnt + pure mCnt' exportDatabaseToStoreLog :: FilePath -> DBOpts -> FilePath -> IO (Int, Int) exportDatabaseToStoreLog logPath dbOpts storeLogFilePath = do diff --git a/src/Simplex/Messaging/Server/MsgStore/Postgres.hs b/src/Simplex/Messaging/Server/MsgStore/Postgres.hs index a2efb4563..14dd3977a 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Postgres.hs @@ -19,7 +19,10 @@ module Simplex.Messaging.Server.MsgStore.Postgres PostgresQueue, exportDbMessages, getDbMessageStats, + getDbMessageCount, + deleteAllMessages, batchInsertMessages, + updateQueueCounts, ) where @@ -282,7 +285,54 @@ getDbMessageStats ms = toMessageStats (storedQueues, storedMsgsCount) = MessageStats {storedQueues, storedMsgsCount, expiredMsgsCount = 0} --- TODO [messages] update counts +getDbMessageCount :: PostgresMsgStore -> IO Int64 +getDbMessageCount ms = + maybeFirstRow' 0 fromOnly $ + withConnection (dbStore $ queueStore_ ms) (`DB.query_` "SELECT COUNT(*) FROM messages") + +deleteAllMessages :: PostgresMsgStore -> IO () +deleteAllMessages ms = + withConnection (dbStore $ queueStore_ ms) $ \db -> do + void $ DB.execute_ db "TRUNCATE messages" + void $ DB.execute_ + db + [sql| + UPDATE msg_queues + SET msg_queue_size = 0, msg_can_write = TRUE + WHERE msg_queue_size != 0 OR msg_can_write = FALSE + |] + +updateQueueCounts :: PostgresMsgStore -> IO () +updateQueueCounts ms = + withConnection (dbStore $ queueStore_ ms) $ \db -> do + void $ DB.execute_ + db + [sql| + CREATE TEMP TABLE queue_stats AS + SELECT recipient_id, + COUNT(*) AS size, + BOOL_OR(msg_quota) AS has_quota + FROM messages + GROUP BY recipient_id + |] + void $ DB.execute_ + db + [sql| + UPDATE msg_queues + SET msg_queue_size = 0, msg_can_write = TRUE + WHERE msg_queue_size != 0 OR msg_can_write = FALSE + |] + void $ DB.execute_ + db + [sql| + UPDATE msg_queues q + SET msg_queue_size = s.size, + msg_can_write = NOT s.has_quota + FROM queue_stats s + WHERE q.recipient_id = s.recipient_id + |] + void $ DB.execute_ db "DROP TABLE queue_stats" + batchInsertMessages :: StoreQueueClass q => Bool -> FilePath -> PostgresQueueStore q -> IO Int64 batchInsertMessages tty f toStore = do putStrLn "Importing messages..." @@ -296,8 +346,6 @@ batchInsertMessages tty f toStore = do FROM STDIN WITH (FORMAT CSV) |] foldLogLines tty f (putMessage db) (0 :: Int, 0) >>= (DB.putCopyEnd db $>) - Only mCnt : _ <- withTransaction st (`DB.query_` "SELECT count(*) FROM messages") - unless (inserted == mCnt) $ putStrLn $ "WARNING: inserted " <> show inserted <> " rows, table has " <> show mCnt <> " messages." pure inserted where putMessage db (!i, !cnt) _eof s = do