smp server: optionally append store log with postgres storage (without loading and compacting, for debugging during migration) (#1480)

* smp server: optionally maintain store log with postgres storage (without loading and compacting, for debugging during migration)

* refactor

* remove comment
This commit is contained in:
Evgeny
2025-03-13 10:25:56 +00:00
committed by GitHub
parent d44f09d111
commit 019a32a623
10 changed files with 151 additions and 100 deletions
@@ -47,9 +47,10 @@ import Database.PostgreSQL.Simple.FromField (FromField (..))
import Database.PostgreSQL.Simple.ToField (ToField (..))
import Database.PostgreSQL.Simple.Errors (ConstraintViolation (..), constraintViolation)
import Database.PostgreSQL.Simple.SqlQQ (sql)
import GHC.IO (catchAny)
import Simplex.Messaging.Agent.Client (withLockMap)
import Simplex.Messaging.Agent.Lock (Lock)
import Simplex.Messaging.Agent.Store.Postgres (createDBStore)
import Simplex.Messaging.Agent.Store.Postgres (createDBStore, closeDBStore)
import Simplex.Messaging.Agent.Store.Postgres.Common
import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation)
import Simplex.Messaging.Protocol
@@ -57,11 +58,12 @@ import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.Postgres.Migrations (serverMigrations)
import Simplex.Messaging.Server.QueueStore.STM (readQueueRecIO)
import Simplex.Messaging.Server.QueueStore.Types
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util (firstRow, ifM, tshow, (<$$>))
import System.Exit (exitFailure)
import System.IO (hFlush, stdout)
import System.IO (IOMode (..), hFlush, stdout)
import UnliftIO.STM
#if !defined(dbPostgres)
import Simplex.Messaging.Agent.Store.Postgres.DB (blobFieldDecoder)
@@ -71,6 +73,7 @@ import Simplex.Messaging.Encoding.String
data PostgresQueueStore q = PostgresQueueStore
{ dbStore :: DBStore,
dbStoreLog :: Maybe (StoreLog 'WriteMode),
-- this map caches all created and opened queues
queues :: TMap RecipientId q,
-- this map only cashes the queues that were attempted to send messages to,
@@ -83,6 +86,7 @@ data PostgresQueueStore q = PostgresQueueStore
data PostgresStoreCfg = PostgresStoreCfg
{ dbOpts :: DBOpts,
dbStoreLogPath :: Maybe FilePath,
confirmMigrations :: MigrationConfirmation,
deletedTTL :: Int64
}
@@ -91,18 +95,24 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
type QueueStoreCfg (PostgresQueueStore q) = PostgresStoreCfg
newQueueStore :: PostgresStoreCfg -> IO (PostgresQueueStore q)
newQueueStore PostgresStoreCfg {dbOpts, confirmMigrations, deletedTTL} = do
newQueueStore PostgresStoreCfg {dbOpts, dbStoreLogPath, confirmMigrations, deletedTTL} = do
dbStore <- either err pure =<< createDBStore dbOpts serverMigrations confirmMigrations
dbStoreLog <- mapM (openWriteStoreLog True) dbStoreLogPath
queues <- TM.emptyIO
senders <- TM.emptyIO
notifiers <- TM.emptyIO
notifierLocks <- TM.emptyIO
pure PostgresQueueStore {dbStore, queues, senders, notifiers, notifierLocks, deletedTTL}
pure PostgresQueueStore {dbStore, dbStoreLog, queues, senders, notifiers, notifierLocks, deletedTTL}
where
err e = do
logError $ "STORE: newQueueStore, error opening PostgreSQL database, " <> tshow e
exitFailure
closeQueueStore :: PostgresQueueStore q -> IO ()
closeQueueStore PostgresQueueStore {dbStore, dbStoreLog} = do
closeDBStore dbStore
mapM_ closeStoreLog dbStoreLog
loadedQueues = queues
{-# INLINE loadedQueues #-}
@@ -136,6 +146,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
>>= bimapM handleDuplicate pure
atomically $ TM.insert rId sq queues
atomically $ TM.insert (senderId qr) rId senders
withLog "addStoreQueue" st $ \s -> logCreateQueue s rId qr
pure sq
where
PostgresQueueStore {queues, senders} = st
@@ -166,9 +177,11 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
withQueueDB sq "secureQueue" $ \q -> do
verify q
assertUpdated $ withDB' "secureQueue" st $ \db ->
DB.execute db "UPDATE msg_queues SET sender_key = ? WHERE recipient_id = ? AND deleted_at IS NULL" (sKey, recipientId sq)
DB.execute db "UPDATE msg_queues SET sender_key = ? WHERE recipient_id = ? AND deleted_at IS NULL" (sKey, rId)
atomically $ writeTVar (queueRec sq) $ Just q {senderKey = Just sKey}
withLog "secureQueue" st $ \s -> logSecureQueue s rId sKey
where
rId = recipientId sq
verify q = case senderKey q of
Just k | sKey /= k -> throwE AUTH
_ -> pure ()
@@ -185,6 +198,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
atomically $ writeTVar (queueRec sq) $ Just q'
-- cache queue notifier ID after notifier is added ntf server will likely subscribe
atomically $ TM.insert nId rId notifiers
withLog "addQueueNotifier" st $ \s -> logAddNotifier s rId ntfCreds
pure nId_
where
PostgresQueueStore {notifiers} = st
@@ -208,8 +222,10 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
assertUpdated $ withDB' "deleteQueueNotifier" st update
atomically $ TM.delete nId $ notifiers st
atomically $ writeTVar (queueRec sq) $ Just q {notifier = Nothing}
withLog "deleteQueueNotifier" st (`logDeleteNotifier` rId)
pure nId
where
rId = recipientId sq
update db =
DB.execute
db
@@ -218,16 +234,22 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
SET notifier_id = NULL, notifier_key = NULL, rcv_ntf_dh_secret = NULL
WHERE recipient_id = ? AND deleted_at IS NULL
|]
(Only $ recipientId sq)
(Only rId)
suspendQueue :: PostgresQueueStore q -> q -> IO (Either ErrorType ())
suspendQueue st sq = setStatusDB "suspendQueue" st sq EntityOff
suspendQueue st sq =
setStatusDB "suspendQueue" st sq EntityOff $
withLog "suspendQueue" st (`logSuspendQueue` recipientId sq)
blockQueue :: PostgresQueueStore q -> q -> BlockingInfo -> IO (Either ErrorType ())
blockQueue st sq info = setStatusDB "blockQueue" st sq (EntityBlocked info)
blockQueue st sq info =
setStatusDB "blockQueue" st sq (EntityBlocked info) $
withLog "blockQueue" st $ \sl -> logBlockQueue sl (recipientId sq) info
unblockQueue :: PostgresQueueStore q -> q -> IO (Either ErrorType ())
unblockQueue st sq = setStatusDB "unblockQueue" st sq EntityActive
unblockQueue st sq =
setStatusDB "unblockQueue" st sq EntityActive $
withLog "unblockQueue" st (`logUnblockQueue` recipientId sq)
updateQueueTime :: PostgresQueueStore q -> q -> RoundedSystemTime -> IO (Either ErrorType QueueRec)
updateQueueTime st sq t =
@@ -236,10 +258,13 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
then pure q
else do
assertUpdated $ withDB' "updateQueueTime" st $ \db ->
DB.execute db "UPDATE msg_queues SET updated_at = ? WHERE recipient_id = ? AND deleted_at IS NULL" (t, recipientId sq)
DB.execute db "UPDATE msg_queues SET updated_at = ? WHERE recipient_id = ? AND deleted_at IS NULL" (t, rId)
let !q' = q {updatedAt = Just t}
atomically $ writeTVar (queueRec sq) $ Just q'
withLog "updateQueueTime" st $ \sl -> logUpdateQueueTime sl rId t
pure q'
where
rId = recipientId sq
-- this method is called from JournalMsgStore deleteQueue that already locks the queue
deleteStoreQueue :: PostgresQueueStore q -> q -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue q)))
@@ -247,12 +272,15 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
q <- ExceptT $ readQueueRecIO qr
RoundedSystemTime ts <- liftIO getSystemDate
assertUpdated $ withDB' "deleteStoreQueue" st $ \db ->
DB.execute db "UPDATE msg_queues SET deleted_at = ? WHERE recipient_id = ? AND deleted_at IS NULL" (ts, recipientId sq)
DB.execute db "UPDATE msg_queues SET deleted_at = ? WHERE recipient_id = ? AND deleted_at IS NULL" (ts, rId)
atomically $ writeTVar qr Nothing
atomically $ TM.delete (senderId q) $ senders st
forM_ (notifier q) $ \NtfCreds {notifierId} -> atomically $ TM.delete notifierId $ notifiers st
(q,) <$> atomically (swapTVar (msgQueue sq) Nothing)
mq_ <- atomically $ swapTVar (msgQueue sq) Nothing
withLog "deleteStoreQueue" st (`logDeleteQueue` rId)
pure (q, mq_)
where
rId = recipientId sq
qr = queueRec sq
batchInsertQueues :: StoreQueueClass q => Bool -> M.Map RecipientId q -> PostgresQueueStore q' -> IO Int64
@@ -335,12 +363,13 @@ rowToQueueRec (rId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, n
let notifier = NtfCreds <$> notifierId_ <*> notifierKey_ <*> rcvNtfDhSecret_
in (rId, QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status, updatedAt})
setStatusDB :: StoreQueueClass q => String -> PostgresQueueStore q -> q -> ServerEntityStatus -> IO (Either ErrorType ())
setStatusDB op st sq status =
setStatusDB :: StoreQueueClass q => String -> PostgresQueueStore q -> q -> ServerEntityStatus -> ExceptT ErrorType IO () -> IO (Either ErrorType ())
setStatusDB op st sq status writeLog =
withQueueDB sq op $ \q -> do
assertUpdated $ withDB' op st $ \db ->
DB.execute db "UPDATE msg_queues SET status = ? WHERE recipient_id = ? AND deleted_at IS NULL" (status, recipientId sq)
atomically $ writeTVar (queueRec sq) $ Just q {status}
writeLog
withQueueDB :: StoreQueueClass q => q -> String -> (QueueRec -> ExceptT ErrorType IO a) -> IO (Either ErrorType a)
withQueueDB sq op action =
@@ -361,6 +390,11 @@ withDB op st action =
where
err = op <> ", withLog, " <> show e
withLog :: MonadIO m => String -> PostgresQueueStore q -> (StoreLog 'WriteMode -> IO ()) -> m ()
withLog op PostgresQueueStore {dbStoreLog} action =
forM_ dbStoreLog $ \sl -> liftIO $ E.uninterruptibleMask_ (action sl) `catchAny` \e ->
logWarn $ "STORE: " <> T.pack (op <> ", withLog, " <> show e)
handleDuplicate :: SqlError -> IO ErrorType
handleDuplicate e = case constraintViolation e of
Just (UniqueViolation _) -> pure AUTH