mirror of
https://github.com/simplex-chat/simplex-chat.git
synced 2026-03-30 22:55:48 +00:00
core: organize withAckMessage (#3889)
* core: organize withAckMessage * mark critical sections * differentiate DB internal error from chat * throw CRITICALs * only CRIT on SEDatabaseError * normalize errors * shift MonadError into ExceptT * simplify * split critical handlers * names, CRITICAL error in withAckMessage, comments * only show critical alerts when database was locked or busy and message failed to process --------- Co-authored-by: Evgeny Poberezkin <evgeny@poberezkin.com>
This commit is contained in:
committed by
GitHub
parent
5fd8e6e4fe
commit
7fa2f2f72e
@@ -3291,10 +3291,24 @@ processAgentMessage _ connId DEL_CONN =
|
||||
toView $ CRAgentConnDeleted (AgentConnId connId)
|
||||
processAgentMessage corrId connId msg = do
|
||||
vr <- chatVersionRange
|
||||
withStore' (`getUserByAConnId` AgentConnId connId) >>= \case
|
||||
-- getUserByAConnId never throws logical errors, only SEDBBusyError can be thrown here
|
||||
critical (withStore' (`getUserByAConnId` AgentConnId connId)) >>= \case
|
||||
Just user -> processAgentMessageConn vr user corrId connId msg `catchChatError` (toView . CRChatError (Just user))
|
||||
_ -> throwChatError $ CENoConnectionUser (AgentConnId connId)
|
||||
|
||||
-- CRITICAL error will be shown to the user as alert with restart button in Android/desktop apps.
|
||||
-- SEDBBusyError will only be thrown on IO exceptions or SQLError during DB queries,
|
||||
-- e.g. when database is locked or busy for longer than 3s.
|
||||
-- In this case there is no better mitigation than showing alert:
|
||||
-- - without ACK the message delivery will be stuck,
|
||||
-- - with ACK message will be lost, as it failed to be saved.
|
||||
-- Full app restart is likely to resolve database condition and the message will be received and processed again.
|
||||
critical :: ChatMonad m => m a -> m a
|
||||
critical a =
|
||||
a `catchChatError` \case
|
||||
ChatErrorStore SEDBBusyError {message} -> throwError $ ChatErrorAgent (CRITICAL True message) Nothing
|
||||
e -> throwError e
|
||||
|
||||
processAgentMessageNoConn :: forall m. ChatMonad m => ACommand 'Agent 'AENone -> m ()
|
||||
processAgentMessageNoConn = \case
|
||||
CONNECT p h -> hostEvent $ CRHostConnected p h
|
||||
@@ -3482,9 +3496,13 @@ processAgentMsgRcvFile _corrId aFileId msg =
|
||||
agentXFTPDeleteRcvFile aFileId fileId
|
||||
toView $ CRRcvFileError user ci e ft
|
||||
|
||||
processAgentMessageConn :: forall m. ChatMonad m => (PQSupport -> VersionRangeChat) -> User -> ACorrId -> ConnId -> ACommand 'Agent 'AEConn -> m ()
|
||||
processAgentMessageConn :: forall m . ChatMonad m => (PQSupport -> VersionRangeChat) -> User -> ACorrId -> ConnId -> ACommand 'Agent 'AEConn -> m ()
|
||||
processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage = do
|
||||
entity <- withStore (\db -> getConnectionEntity db vr user $ AgentConnId agentConnId) >>= updateConnStatus
|
||||
-- Missing connection/entity errors here will be sent to the view but not shown as CRITICAL alert,
|
||||
-- as in this case no need to ACK message - we can't process messages for this connection anyway.
|
||||
-- SEDBException will be re-trown as CRITICAL as it is likely to indicate a temporary database condition
|
||||
-- that will be resolved with app restart.
|
||||
entity <- critical $ withStore (\db -> getConnectionEntity db vr user $ AgentConnId agentConnId) >>= updateConnStatus
|
||||
case agentMessage of
|
||||
END -> case entity of
|
||||
RcvDirectMsgConnection _ (Just ct) -> toView $ CRContactAnotherClient user ct
|
||||
@@ -3547,12 +3565,11 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
||||
processINFOpqSupport conn pqSupport
|
||||
_conn' <- saveConnInfo conn connInfo
|
||||
pure ()
|
||||
MSG meta _msgFlags msgBody -> do
|
||||
cmdId <- createAckCmd conn
|
||||
MSG meta _msgFlags msgBody ->
|
||||
-- TODO only acknowledge without saving message?
|
||||
-- probably this branch is never executed, so there should be no reason
|
||||
-- to save message if contact hasn't been created yet - chat item isn't created anyway
|
||||
withAckMessage agentConnId cmdId meta $ do
|
||||
withAckMessage agentConnId conn meta False $ \cmdId -> do
|
||||
(_conn', _) <- saveDirectRcvMSG conn meta cmdId msgBody
|
||||
pure False
|
||||
SENT msgId ->
|
||||
@@ -3584,12 +3601,11 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
||||
forM_ contData $ \(hostConnId, xGrpMemIntroCont) ->
|
||||
sendXGrpMemInv hostConnId (Just directConnReq) xGrpMemIntroCont
|
||||
CRContactUri _ -> throwChatError $ CECommandError "unexpected ConnectionRequestUri type"
|
||||
MSG msgMeta _msgFlags msgBody -> do
|
||||
let MsgMeta {pqEncryption} = msgMeta
|
||||
(ct', conn') <- updateContactPQRcv user ct conn pqEncryption
|
||||
checkIntegrityCreateItem (CDDirectRcv ct') msgMeta
|
||||
cmdId <- createAckCmd conn'
|
||||
withAckMessage agentConnId cmdId msgMeta $ do
|
||||
MSG msgMeta _msgFlags msgBody ->
|
||||
withAckMessage agentConnId conn msgMeta True $ \cmdId -> do
|
||||
let MsgMeta {pqEncryption} = msgMeta
|
||||
(ct', conn') <- updateContactPQRcv user ct conn pqEncryption
|
||||
checkIntegrityCreateItem (CDDirectRcv ct') msgMeta `catchChatError` \_ -> pure ()
|
||||
(conn'', msg@RcvMessage {chatMsgEvent = ACME _ event}) <- saveDirectRcvMSG conn' msgMeta cmdId msgBody
|
||||
let ct'' = ct' {activeConn = Just conn''} :: Contact
|
||||
assertDirectAllowed user MDRcv ct'' $ toCMEventTag event
|
||||
@@ -3995,10 +4011,8 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
||||
void $ sendDirectMemberMessage imConn (XGrpMemCon memberId) groupId
|
||||
_ -> messageWarning "sendXGrpMemCon: member category GCPreMember or GCPostMember is expected"
|
||||
MSG msgMeta _msgFlags msgBody -> do
|
||||
checkIntegrityCreateItem (CDGroupRcv gInfo m) msgMeta
|
||||
cmdId <- createAckCmd conn
|
||||
let aChatMsgs = parseChatMessages msgBody
|
||||
withAckMessage agentConnId cmdId msgMeta $ do
|
||||
withAckMessage agentConnId conn msgMeta True $ \cmdId -> do
|
||||
checkIntegrityCreateItem (CDGroupRcv gInfo m) msgMeta `catchChatError` \_ -> pure ()
|
||||
forM_ aChatMsgs $ \case
|
||||
Right (ACMsg _ chatMsg) ->
|
||||
processEvent cmdId chatMsg `catchChatError` \e -> toView $ CRChatError (Just user) e
|
||||
@@ -4010,6 +4024,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
||||
[Right (ACMsg _ chatMsg)] -> forwardMsg_ chatMsg
|
||||
_ -> pure ()
|
||||
where
|
||||
aChatMsgs = parseChatMessages msgBody
|
||||
brokerTs = metaBrokerTs msgMeta
|
||||
processEvent :: MsgEncodingI e => CommandId -> ChatMessage e -> m ()
|
||||
processEvent cmdId chatMsg = do
|
||||
@@ -4046,12 +4061,12 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
||||
BFileChunk sharedMsgId chunk -> bFileChunkGroup gInfo sharedMsgId chunk msgMeta
|
||||
_ -> messageError $ "unsupported message: " <> T.pack (show event)
|
||||
checkSendRcpt :: [AChatMessage] -> m Bool
|
||||
checkSendRcpt aChatMsgs = do
|
||||
checkSendRcpt aMsgs = do
|
||||
currentMemCount <- withStore' $ \db -> getGroupCurrentMembersCount db user gInfo
|
||||
let GroupInfo {chatSettings = ChatSettings {sendRcpts}} = gInfo
|
||||
pure $
|
||||
fromMaybe (sendRcptsSmallGroups user) sendRcpts
|
||||
&& any aChatMsgHasReceipt aChatMsgs
|
||||
&& any aChatMsgHasReceipt aMsgs
|
||||
&& currentMemCount <= smallGroupsRcptsMemLimit
|
||||
where
|
||||
aChatMsgHasReceipt (ACMsg _ ChatMessage {chatMsgEvent}) =
|
||||
@@ -4241,6 +4256,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
||||
_ -> pure ()
|
||||
CON _ -> startReceivingFile user fileId
|
||||
MSG meta _ msgBody -> do
|
||||
-- XXX: not all branches do ACK
|
||||
parseFileChunk msgBody >>= receiveFileChunk ft (Just conn) meta
|
||||
OK ->
|
||||
-- [async agent commands] continuation on receiving OK
|
||||
@@ -4384,19 +4400,22 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
||||
|
||||
withAckMessage' :: ConnId -> Connection -> MsgMeta -> m () -> m ()
|
||||
withAckMessage' cId conn msgMeta action = do
|
||||
cmdId <- createAckCmd conn
|
||||
withAckMessage cId cmdId msgMeta $ action $> False
|
||||
withAckMessage cId conn msgMeta False $ \_cmdId -> action $> False
|
||||
|
||||
withAckMessage :: ConnId -> CommandId -> MsgMeta -> m Bool -> m ()
|
||||
withAckMessage cId cmdId msgMeta action = do
|
||||
withAckMessage :: ConnId -> Connection -> MsgMeta -> Bool -> (CommandId -> m Bool) -> m ()
|
||||
withAckMessage cId conn msgMeta showCritical action = do
|
||||
cmdId <- createAckCmd conn `catchChatError` \e -> throwError $ ChatErrorAgent (CRITICAL True $ show e) Nothing
|
||||
-- [async agent commands] command should be asynchronous, continuation is ackMsgDeliveryEvent
|
||||
-- TODO catching error and sending ACK after an error, particularly if it is a database error, will result in the message not processed (and no notification to the user).
|
||||
-- Possible solutions are:
|
||||
-- 1) retry processing several times
|
||||
-- 2) stabilize database
|
||||
-- 3) show screen of death to the user asking to restart
|
||||
tryChatError action >>= \case
|
||||
tryChatError (action cmdId) >>= \case
|
||||
Right withRcpt -> ackMsg cId cmdId msgMeta $ if withRcpt then Just "" else Nothing
|
||||
-- If showCritical is True, then these errors don't result in ACK and show user visible alert
|
||||
-- This prevents losing the message that failed to be processed.
|
||||
Left (ChatErrorStore SEDBBusyError {message}) | showCritical -> throwError $ ChatErrorAgent (CRITICAL True message) Nothing
|
||||
Left e -> ackMsg cId cmdId msgMeta Nothing >> throwError e
|
||||
|
||||
ackMsg :: ConnId -> CommandId -> MsgMeta -> Maybe MsgReceiptInfo -> m ()
|
||||
@@ -4997,9 +5016,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
||||
checkIntegrityCreateItem :: forall c. ChatTypeI c => ChatDirection c 'MDRcv -> MsgMeta -> m ()
|
||||
checkIntegrityCreateItem cd MsgMeta {integrity, broker = (_, brokerTs)} = case integrity of
|
||||
MsgOk -> pure ()
|
||||
MsgError e ->
|
||||
createInternalChatItem user cd (CIRcvIntegrityError e) (Just brokerTs)
|
||||
`catchChatError` \_ -> pure ()
|
||||
MsgError e -> createInternalChatItem user cd (CIRcvIntegrityError e) (Just brokerTs)
|
||||
|
||||
xInfo :: Contact -> Profile -> m ()
|
||||
xInfo c p' = void $ processContactProfileUpdate c p' True
|
||||
@@ -5719,7 +5736,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
||||
|
||||
directMsgReceived :: Contact -> Connection -> MsgMeta -> NonEmpty MsgReceipt -> m ()
|
||||
directMsgReceived ct conn@Connection {connId} msgMeta msgRcpts = do
|
||||
checkIntegrityCreateItem (CDDirectRcv ct) msgMeta
|
||||
checkIntegrityCreateItem (CDDirectRcv ct) msgMeta `catchChatError` \_ -> pure ()
|
||||
forM_ msgRcpts $ \MsgReceipt {agentMsgId, msgRcptStatus} -> do
|
||||
withStore' $ \db -> updateSndMsgDeliveryStatus db connId agentMsgId $ MDSSndRcvd msgRcptStatus
|
||||
updateDirectItemStatus ct conn agentMsgId $ CISSndRcvd msgRcptStatus SSPComplete
|
||||
@@ -5731,7 +5748,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
||||
-- - getChatItemIdByAgentMsgId to return [ChatItemId]
|
||||
groupMsgReceived :: GroupInfo -> GroupMember -> Connection -> MsgMeta -> NonEmpty MsgReceipt -> m ()
|
||||
groupMsgReceived gInfo m conn@Connection {connId} msgMeta msgRcpts = do
|
||||
checkIntegrityCreateItem (CDGroupRcv gInfo m) msgMeta
|
||||
checkIntegrityCreateItem (CDGroupRcv gInfo m) msgMeta `catchChatError` \_ -> pure ()
|
||||
forM_ msgRcpts $ \MsgReceipt {agentMsgId, msgRcptStatus} -> do
|
||||
withStore' $ \db -> updateSndMsgDeliveryStatus db connId agentMsgId $ MDSSndRcvd msgRcptStatus
|
||||
updateGroupItemStatus gInfo m conn agentMsgId $ CISSndRcvd msgRcptStatus SSPComplete
|
||||
|
||||
@@ -46,6 +46,8 @@ import Data.Time (NominalDiffTime, UTCTime)
|
||||
import Data.Time.Clock.System (systemToUTCTime)
|
||||
import Data.Version (showVersion)
|
||||
import Data.Word (Word16)
|
||||
import Database.SQLite.Simple (SQLError)
|
||||
import qualified Database.SQLite.Simple as SQL
|
||||
import Language.Haskell.TH (Exp, Q, runIO)
|
||||
import Numeric.Natural
|
||||
import qualified Paths_simplex_chat as SC
|
||||
@@ -80,7 +82,7 @@ import Simplex.Messaging.Protocol (AProtoServerWithAuth, AProtocolType (..), Cor
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
import Simplex.Messaging.Transport (TLS, simplexMQVersion)
|
||||
import Simplex.Messaging.Transport.Client (TransportHost)
|
||||
import Simplex.Messaging.Util (allFinally, catchAllErrors, liftEitherError, tryAllErrors, (<$$>))
|
||||
import Simplex.Messaging.Util (allFinally, catchAllErrors, liftIOEither, tryAllErrors, (<$$>))
|
||||
import Simplex.RemoteControl.Client
|
||||
import Simplex.RemoteControl.Invitation (RCSignedInvitation, RCVerifiedInvitation)
|
||||
import Simplex.RemoteControl.Types
|
||||
@@ -1296,30 +1298,23 @@ withStoreCtx' :: ChatMonad m => Maybe String -> (DB.Connection -> IO a) -> m a
|
||||
withStoreCtx' ctx_ action = withStoreCtx ctx_ $ liftIO . action
|
||||
|
||||
withStoreCtx :: ChatMonad m => Maybe String -> (DB.Connection -> ExceptT StoreError IO a) -> m a
|
||||
withStoreCtx ctx_ action = do
|
||||
withStoreCtx _ctx action = do
|
||||
ChatController {chatStore} <- ask
|
||||
liftEitherError ChatErrorStore $ case ctx_ of
|
||||
Nothing -> withTransaction chatStore (runExceptT . action) `catch` handleInternal ""
|
||||
-- uncomment to debug store performance
|
||||
-- Just ctx -> do
|
||||
-- t1 <- liftIO getCurrentTime
|
||||
-- putStrLn $ "withStoreCtx start :: " <> show t1 <> " :: " <> ctx
|
||||
-- r <- withTransactionCtx ctx_ chatStore (runExceptT . action) `E.catch` handleInternal (" (" <> ctx <> ")")
|
||||
-- t2 <- liftIO getCurrentTime
|
||||
-- putStrLn $ "withStoreCtx end :: " <> show t2 <> " :: " <> ctx <> " :: duration=" <> show (diffToMilliseconds $ diffUTCTime t2 t1)
|
||||
-- pure r
|
||||
Just _ -> withTransaction chatStore (runExceptT . action) `catch` handleInternal ""
|
||||
where
|
||||
handleInternal :: String -> SomeException -> IO (Either StoreError a)
|
||||
handleInternal ctxStr e = pure . Left . SEInternalError $ show e <> ctxStr
|
||||
liftIOEither $ withTransaction chatStore (runExceptT . withExceptT ChatErrorStore . action) `E.catches` handleDBErrors
|
||||
|
||||
withStoreBatch :: (ChatMonad' m, Traversable t) => (DB.Connection -> t (IO (Either ChatError a))) -> m (t (Either ChatError a))
|
||||
withStoreBatch actions = do
|
||||
ChatController {chatStore} <- ask
|
||||
liftIO $ withTransaction chatStore $ mapM (`E.catch` handleInternal) . actions
|
||||
where
|
||||
handleInternal :: E.SomeException -> IO (Either ChatError a)
|
||||
handleInternal = pure . Left . ChatError . CEInternalError . show
|
||||
liftIO $ withTransaction chatStore $ mapM (`E.catches` handleDBErrors) . actions
|
||||
|
||||
handleDBErrors :: [E.Handler IO (Either ChatError a)]
|
||||
handleDBErrors =
|
||||
[ E.Handler $ \(e :: SQLError) ->
|
||||
let se = SQL.sqlError e
|
||||
busy = se == SQL.ErrorBusy || se == SQL.ErrorLocked
|
||||
in pure . Left . ChatErrorStore $ if busy then SEDBBusyError $ show se else SEDBException $ show e,
|
||||
E.Handler $ \(E.SomeException e) -> pure . Left . ChatErrorStore . SEDBException $ show e
|
||||
]
|
||||
|
||||
withStoreBatch' :: (ChatMonad' m, Traversable t) => (DB.Connection -> t (IO a)) -> m (t (Either ChatError a))
|
||||
withStoreBatch' actions = withStoreBatch $ fmap (fmap Right) . actions
|
||||
|
||||
@@ -95,6 +95,8 @@ data StoreError
|
||||
| SEUniqueID
|
||||
| SELargeMsg
|
||||
| SEInternalError {message :: String}
|
||||
| SEDBException {message :: String}
|
||||
| SEDBBusyError {message :: String}
|
||||
| SEBadChatItem {itemId :: ChatItemId, itemTs :: Maybe ChatItemTs}
|
||||
| SEChatItemNotFound {itemId :: ChatItemId}
|
||||
| SEChatItemNotFoundByText {text :: Text}
|
||||
|
||||
Reference in New Issue
Block a user