mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 14:16:00 +00:00
smp server: mask database store operations from async exception for state consistency (#1481)
* smp server: mask database store operations from async exception for state consistency * remove unused, comments
This commit is contained in:
@@ -800,7 +800,6 @@ data StoreCmd = SCImport | SCExport | SCDelete
|
||||
data InitOptions = InitOptions
|
||||
{ enableStoreLog :: Bool,
|
||||
dbOptions :: DBOpts,
|
||||
dbMigrateUp :: Bool,
|
||||
logStats :: Bool,
|
||||
signAlgorithm :: SignAlgorithm,
|
||||
ip :: HostName,
|
||||
@@ -842,12 +841,6 @@ cliCommandP cfgPath logPath iniFile =
|
||||
<> help "Enable store log for persistence"
|
||||
)
|
||||
dbOptions <- dbOptsP
|
||||
-- TODO [postgresql] remove
|
||||
dbMigrateUp <-
|
||||
switch
|
||||
( long "db-migrate-up"
|
||||
<> help "Automatically confirm \"up\" database migrations"
|
||||
)
|
||||
logStats <-
|
||||
switch
|
||||
( long "daily-stats"
|
||||
@@ -951,7 +944,6 @@ cliCommandP cfgPath logPath iniFile =
|
||||
InitOptions
|
||||
{ enableStoreLog,
|
||||
dbOptions,
|
||||
dbMigrateUp,
|
||||
logStats,
|
||||
signAlgorithm,
|
||||
ip,
|
||||
|
||||
@@ -140,7 +140,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
|
||||
addQueue_ :: PostgresQueueStore q -> (RecipientId -> QueueRec -> IO q) -> RecipientId -> QueueRec -> IO (Either ErrorType q)
|
||||
addQueue_ st mkQ rId qr = do
|
||||
sq <- mkQ rId qr
|
||||
withQueueLock sq "addQueue_" $ runExceptT $ do
|
||||
withQueueLock sq "addQueue_" $ E.uninterruptibleMask_ $ runExceptT $ do
|
||||
void $ withDB "addQueue_" st $ \db ->
|
||||
E.try (DB.execute db insertQueueQuery $ queueRecToRow (rId, qr))
|
||||
>>= bimapM handleDuplicate pure
|
||||
@@ -203,7 +203,6 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
|
||||
where
|
||||
PostgresQueueStore {notifiers} = st
|
||||
rId = recipientId sq
|
||||
-- TODO [postgres] test how this query works with duplicate recipient_id (updates) and notifier_id (fails)
|
||||
update db =
|
||||
DB.execute
|
||||
db
|
||||
@@ -268,7 +267,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
|
||||
|
||||
-- this method is called from JournalMsgStore deleteQueue that already locks the queue
|
||||
deleteStoreQueue :: PostgresQueueStore q -> q -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue q)))
|
||||
deleteStoreQueue st sq = runExceptT $ do
|
||||
deleteStoreQueue st sq = E.uninterruptibleMask_ $ runExceptT $ do
|
||||
q <- ExceptT $ readQueueRecIO qr
|
||||
RoundedSystemTime ts <- liftIO getSystemDate
|
||||
assertUpdated $ withDB' "deleteStoreQueue" st $ \db ->
|
||||
@@ -373,7 +372,7 @@ setStatusDB op st sq status writeLog =
|
||||
|
||||
withQueueDB :: StoreQueueClass q => q -> String -> (QueueRec -> ExceptT ErrorType IO a) -> IO (Either ErrorType a)
|
||||
withQueueDB sq op action =
|
||||
withQueueLock sq op $ runExceptT $ ExceptT (readQueueRecIO $ queueRec sq) >>= action
|
||||
withQueueLock sq op $ E.uninterruptibleMask_ $ runExceptT $ ExceptT (readQueueRecIO $ queueRec sq) >>= action
|
||||
|
||||
assertUpdated :: ExceptT ErrorType IO Int64 -> ExceptT ErrorType IO ()
|
||||
assertUpdated = (>>= \n -> when (n == 0) (throwE AUTH))
|
||||
@@ -392,7 +391,7 @@ withDB op st action =
|
||||
|
||||
withLog :: MonadIO m => String -> PostgresQueueStore q -> (StoreLog 'WriteMode -> IO ()) -> m ()
|
||||
withLog op PostgresQueueStore {dbStoreLog} action =
|
||||
forM_ dbStoreLog $ \sl -> liftIO $ E.uninterruptibleMask_ (action sl) `catchAny` \e ->
|
||||
forM_ dbStoreLog $ \sl -> liftIO $ action sl `catchAny` \e ->
|
||||
logWarn $ "STORE: " <> T.pack (op <> ", withLog, " <> show e)
|
||||
|
||||
handleDuplicate :: SqlError -> IO ErrorType
|
||||
|
||||
Reference in New Issue
Block a user