From fdf8bd7ee2b7d090c13df3e210d7739a7e49c6ae Mon Sep 17 00:00:00 2001 From: Evgeny Date: Thu, 13 Mar 2025 13:57:46 +0000 Subject: [PATCH] smp server: mask database store operations from async exception for state consistency (#1481) * smp server: mask database store operations from async exception for state consistency * remove unused, comments --- src/Simplex/Messaging/Server/Main.hs | 8 -------- src/Simplex/Messaging/Server/QueueStore/Postgres.hs | 9 ++++----- 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index e3db02259..1c6dc38b5 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -800,7 +800,6 @@ data StoreCmd = SCImport | SCExport | SCDelete data InitOptions = InitOptions { enableStoreLog :: Bool, dbOptions :: DBOpts, - dbMigrateUp :: Bool, logStats :: Bool, signAlgorithm :: SignAlgorithm, ip :: HostName, @@ -842,12 +841,6 @@ cliCommandP cfgPath logPath iniFile = <> help "Enable store log for persistence" ) dbOptions <- dbOptsP - -- TODO [postgresql] remove - dbMigrateUp <- - switch - ( long "db-migrate-up" - <> help "Automatically confirm \"up\" database migrations" - ) logStats <- switch ( long "daily-stats" @@ -951,7 +944,6 @@ cliCommandP cfgPath logPath iniFile = InitOptions { enableStoreLog, dbOptions, - dbMigrateUp, logStats, signAlgorithm, ip, diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs index 0e30ed66d..0d5796298 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs @@ -140,7 +140,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where addQueue_ :: PostgresQueueStore q -> (RecipientId -> QueueRec -> IO q) -> RecipientId -> QueueRec -> IO (Either ErrorType q) addQueue_ st mkQ rId qr = do sq <- mkQ rId qr - withQueueLock sq "addQueue_" $ runExceptT $ do + withQueueLock sq "addQueue_" $ E.uninterruptibleMask_ $ runExceptT $ do void $ withDB "addQueue_" st $ \db -> E.try (DB.execute db insertQueueQuery $ queueRecToRow (rId, qr)) >>= bimapM handleDuplicate pure @@ -203,7 +203,6 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where where PostgresQueueStore {notifiers} = st rId = recipientId sq - -- TODO [postgres] test how this query works with duplicate recipient_id (updates) and notifier_id (fails) update db = DB.execute db @@ -268,7 +267,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where -- this method is called from JournalMsgStore deleteQueue that already locks the queue deleteStoreQueue :: PostgresQueueStore q -> q -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue q))) - deleteStoreQueue st sq = runExceptT $ do + deleteStoreQueue st sq = E.uninterruptibleMask_ $ runExceptT $ do q <- ExceptT $ readQueueRecIO qr RoundedSystemTime ts <- liftIO getSystemDate assertUpdated $ withDB' "deleteStoreQueue" st $ \db -> @@ -373,7 +372,7 @@ setStatusDB op st sq status writeLog = withQueueDB :: StoreQueueClass q => q -> String -> (QueueRec -> ExceptT ErrorType IO a) -> IO (Either ErrorType a) withQueueDB sq op action = - withQueueLock sq op $ runExceptT $ ExceptT (readQueueRecIO $ queueRec sq) >>= action + withQueueLock sq op $ E.uninterruptibleMask_ $ runExceptT $ ExceptT (readQueueRecIO $ queueRec sq) >>= action assertUpdated :: ExceptT ErrorType IO Int64 -> ExceptT ErrorType IO () assertUpdated = (>>= \n -> when (n == 0) (throwE AUTH)) @@ -392,7 +391,7 @@ withDB op st action = withLog :: MonadIO m => String -> PostgresQueueStore q -> (StoreLog 'WriteMode -> IO ()) -> m () withLog op PostgresQueueStore {dbStoreLog} action = - forM_ dbStoreLog $ \sl -> liftIO $ E.uninterruptibleMask_ (action sl) `catchAny` \e -> + forM_ dbStoreLog $ \sl -> liftIO $ action sl `catchAny` \e -> logWarn $ "STORE: " <> T.pack (op <> ", withLog, " <> show e) handleDuplicate :: SqlError -> IO ErrorType