core: message statuses for sending proxies (#4161)

* core: delivery path

* update simplexmq

* via proxy snd flags

* error statuses

* rework errors

* proxy expired errors

* corrections

* move backwards compatibile parser to new type

* update simplexmq

* names

* refactor, style

* simplexmq

* refactor

---------

Co-authored-by: Evgeny Poberezkin <evgeny@poberezkin.com>
This commit is contained in:
spaced4ndy
2024-05-15 15:30:05 +04:00
committed by GitHub
parent 3129602a78
commit 4c0d47bbd4
10 changed files with 191 additions and 57 deletions
+40 -22
View File
@@ -100,7 +100,7 @@ import Simplex.Messaging.Agent.Store.SQLite (MigrationConfirmation (..), Migrati
import Simplex.Messaging.Agent.Store.SQLite.DB (SlowQueryStats (..))
import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations
import Simplex.Messaging.Client (defaultNetworkConfig)
import Simplex.Messaging.Client (ProxyClientError (..), defaultNetworkConfig)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.File (CryptoFile (..), CryptoFileArgs (..))
import qualified Simplex.Messaging.Crypto.File as CF
@@ -113,6 +113,7 @@ import Simplex.Messaging.Protocol (AProtoServerWithAuth (..), AProtocolType (..)
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.ServiceScheme (ServiceScheme (..))
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (TransportError (..))
import Simplex.Messaging.Transport.Client (defaultSocksProxy)
import Simplex.Messaging.Util
import Simplex.Messaging.Version
@@ -699,10 +700,7 @@ processChatCommand' vr = \case
(,) <$> getAChatItem db vr user chatRef itemId <*> liftIO (getChatItemVersions db itemId)
let itemVersions = if null versions then maybeToList $ mkItemVersion ci else versions
memberDeliveryStatuses <- case (cType, dir) of
(SCTGroup, SMDSnd) -> do
withStore' (`getGroupSndStatuses` itemId) >>= \case
[] -> pure Nothing
memStatuses -> pure $ Just $ map (uncurry MemberDeliveryStatus) memStatuses
(SCTGroup, SMDSnd) -> L.nonEmpty <$> withStore' (`getGroupSndStatuses` itemId)
_ -> pure Nothing
forwardedFromChatItem <- getForwardedFromItem user ci
pure $ CRChatItemInfo user aci ChatItemInfo {itemVersions, memberDeliveryStatuses, forwardedFromChatItem}
@@ -3896,7 +3894,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
withAckMessage' agentConnId meta $
void $
saveDirectRcvMSG conn meta msgBody
SENT msgId ->
SENT msgId _proxy ->
sentMsgDeliveryEvent conn msgId
OK ->
-- [async agent commands] continuation on receiving OK
@@ -4029,10 +4027,13 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
notifyMemberConnected gInfo m $ Just ct
let connectedIncognito = contactConnIncognito ct || incognitoMembership gInfo
when (memberCategory m == GCPreMember) $ probeMatchingContactsAndMembers ct connectedIncognito True
SENT msgId -> do
SENT msgId proxy -> do
sentMsgDeliveryEvent conn msgId
checkSndInlineFTComplete conn msgId
updateDirectItemStatus ct conn msgId $ CISSndSent SSPComplete
ci_ <- withStore $ \db -> do
ci_ <- updateDirectItemStatus' db ct conn msgId (CISSndSent SSPComplete)
forM ci_ $ \ci -> liftIO $ setDirectSndChatItemViaProxy db user ct ci (isJust proxy)
forM_ ci_ $ \ci -> toView $ CRChatItemStatusUpdated user (AChatItem SCTDirect SMDSnd (DirectChat ct) ci)
SWITCH qd phase cStats -> do
toView $ CRContactSwitch user ct (SwitchProgress qd phase cStats)
when (phase `elem` [SPStarted, SPCompleted]) $ case qd of
@@ -4067,13 +4068,15 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
OK ->
-- [async agent commands] continuation on receiving OK
when (corrId /= "") $ withCompletedCommand conn agentMsg $ \_cmdData -> pure ()
MWARN msgId err ->
updateDirectItemStatus ct conn msgId (CISSndWarning $ agentSndError err)
MERR msgId err -> do
updateDirectItemStatus ct conn msgId $ agentErrToItemStatus err
updateDirectItemStatus ct conn msgId (CISSndError $ agentSndError err)
toView $ CRChatError (Just user) (ChatErrorAgent err $ Just connEntity)
incAuthErrCounter connEntity conn err
MERRS msgIds err -> do
-- error cannot be AUTH error here
updateDirectItemsStatus ct conn (L.toList msgIds) $ agentErrToItemStatus err
updateDirectItemsStatus ct conn (L.toList msgIds) (CISSndError $ agentSndError err)
toView $ CRChatError (Just user) (ChatErrorAgent err $ Just connEntity)
ERR err -> do
toView $ CRChatError (Just user) (ChatErrorAgent err $ Just connEntity)
@@ -4411,10 +4414,10 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
RCVD msgMeta msgRcpt ->
withAckMessage' agentConnId msgMeta $
groupMsgReceived gInfo m conn msgMeta msgRcpt
SENT msgId -> do
SENT msgId proxy -> do
sentMsgDeliveryEvent conn msgId
checkSndInlineFTComplete conn msgId
updateGroupItemStatus gInfo m conn msgId $ CISSndSent SSPComplete
updateGroupItemStatus gInfo m conn msgId (CISSndSent SSPComplete) (Just $ isJust proxy)
SWITCH qd phase cStats -> do
toView $ CRGroupMemberSwitch user gInfo m (SwitchProgress qd phase cStats)
when (phase `elem` [SPStarted, SPCompleted]) $ case qd of
@@ -4450,13 +4453,15 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
OK ->
-- [async agent commands] continuation on receiving OK
when (corrId /= "") $ withCompletedCommand conn agentMsg $ \_cmdData -> pure ()
MWARN msgId err ->
withStore' $ \db -> updateGroupItemErrorStatus db msgId (groupMemberId' m) (CISSndWarning $ agentSndError err)
MERR msgId err -> do
withStore' $ \db -> updateGroupItemErrorStatus db msgId (groupMemberId' m) $ agentErrToItemStatus err
withStore' $ \db -> updateGroupItemErrorStatus db msgId (groupMemberId' m) (CISSndError $ agentSndError err)
-- group errors are silenced to reduce load on UI event log
-- toView $ CRChatError (Just user) (ChatErrorAgent err $ Just connEntity)
incAuthErrCounter connEntity conn err
MERRS msgIds err -> do
let newStatus = agentErrToItemStatus err
let newStatus = CISSndError $ agentSndError err
-- error cannot be AUTH error here
withStore' $ \db -> forM_ msgIds $ \msgId ->
updateGroupItemErrorStatus db msgId (groupMemberId' m) newStatus `catchAll_` pure ()
@@ -4517,7 +4522,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
updateDirectCIFileStatus db vr user fileId $ CIFSSndTransfer 0 1
toView $ CRSndFileStart user ci ft
sendFileChunk user ft
SENT msgId -> do
SENT msgId _proxy -> do
withStore' $ \db -> updateSndFileChunkSent db ft msgId
unless (fileStatus == FSCancelled) $ sendFileChunk user ft
MERR _ err -> do
@@ -4729,9 +4734,21 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
sentMsgDeliveryEvent Connection {connId} msgId =
withStore' $ \db -> updateSndMsgDeliveryStatus db connId msgId MDSSndSent
agentErrToItemStatus :: AgentErrorType -> CIStatus 'MDSnd
agentErrToItemStatus (SMP _ AUTH) = CISSndErrorAuth
agentErrToItemStatus err = CISSndError . T.unpack . safeDecodeUtf8 $ strEncode err
agentSndError :: AgentErrorType -> SndError
agentSndError = \case
SMP _ AUTH -> SndErrAuth
SMP _ QUOTA -> SndErrQuota
BROKER _ e -> brokerError SndErrRelay e
SMP proxySrv (SMP.PROXY (SMP.BROKER e)) -> brokerError (SndErrProxy proxySrv) e
AP.PROXY proxySrv _ (ProxyProtocolError (SMP.PROXY (SMP.BROKER e))) -> brokerError (SndErrProxyRelay proxySrv) e
e -> SndErrOther . safeDecodeUtf8 $ strEncode e
where
brokerError srvErr = \case
NETWORK -> SndErrExpired
TIMEOUT -> SndErrExpired
HOST -> srvErr SrvErrHost
SMP.TRANSPORT TEVersion -> srvErr SrvErrVersion
e -> srvErr . SrvErrOther . safeDecodeUtf8 $ strEncode e
badRcvFileChunk :: RcvFileTransfer -> String -> CM ()
badRcvFileChunk ft err =
@@ -6060,7 +6077,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
checkIntegrityCreateItem (CDGroupRcv gInfo m) msgMeta `catchChatError` \_ -> pure ()
forM_ msgRcpts $ \MsgReceipt {agentMsgId, msgRcptStatus} -> do
withStore' $ \db -> updateSndMsgDeliveryStatus db connId agentMsgId $ MDSSndRcvd msgRcptStatus
updateGroupItemStatus gInfo m conn agentMsgId $ CISSndRcvd msgRcptStatus SSPComplete
updateGroupItemStatus gInfo m conn agentMsgId (CISSndRcvd msgRcptStatus SSPComplete) Nothing
updateDirectItemsStatus :: Contact -> Connection -> [AgentMsgId] -> CIStatus 'MDSnd -> CM ()
updateDirectItemsStatus ct conn msgIds newStatus = do
@@ -6097,11 +6114,12 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
| otherwise -> updateGroupSndStatus db itemId groupMemberId newStatus $> True
_ -> pure False
updateGroupItemStatus :: GroupInfo -> GroupMember -> Connection -> AgentMsgId -> CIStatus 'MDSnd -> CM ()
updateGroupItemStatus gInfo@GroupInfo {groupId} GroupMember {groupMemberId} Connection {connId} msgId newMemStatus =
updateGroupItemStatus :: GroupInfo -> GroupMember -> Connection -> AgentMsgId -> CIStatus 'MDSnd -> Maybe Bool -> CM ()
updateGroupItemStatus gInfo@GroupInfo {groupId} GroupMember {groupMemberId} Connection {connId} msgId newMemStatus viaProxy_ =
withStore' (\db -> getGroupChatItemByAgentMsgId db user groupId connId msgId) >>= \case
Just (CChatItem SMDSnd ChatItem {meta = CIMeta {itemStatus = CISSndRcvd _ SSPComplete}}) -> pure ()
Just (CChatItem SMDSnd ChatItem {meta = CIMeta {itemId, itemStatus}}) -> do
forM_ viaProxy_ $ \viaProxy -> withStore' $ \db -> setGroupSndViaProxy db itemId groupMemberId viaProxy
memStatusChanged <- updateGroupMemSndStatus itemId groupMemberId newMemStatus
when memStatusChanged $ do
memStatusCounts <- withStore' (`getGroupSndStatusCounts` itemId)
@@ -6724,7 +6742,7 @@ mkChatItem :: (ChatTypeI c, MsgDirectionI d) => ChatDirection c d -> ChatItemId
mkChatItem cd ciId content file quotedItem sharedMsgId itemForwarded itemTimed live itemTs forwardedByMember currentTs =
let itemText = ciContentToText content
itemStatus = ciCreateStatus content
meta = mkCIMeta ciId content itemText itemStatus sharedMsgId itemForwarded Nothing False itemTimed (justTrue live) currentTs itemTs forwardedByMember currentTs currentTs
meta = mkCIMeta ciId content itemText itemStatus Nothing sharedMsgId itemForwarded Nothing False itemTimed (justTrue live) currentTs itemTs forwardedByMember currentTs currentTs
in ChatItem {chatDir = toCIDirection cd, meta, content, formattedText = parseMaybeMarkdownList itemText, quotedItem, reactions = [], file}
deleteDirectCI :: MsgDirectionI d => User -> Contact -> ChatItem 'CTDirect d -> Bool -> Bool -> CM ChatResponse