|
|
|
|
@@ -3181,6 +3181,7 @@ cleanupManager = do
|
|
|
|
|
forM_ us $ cleanupUser interval stepDelay
|
|
|
|
|
forM_ us' $ cleanupUser interval stepDelay
|
|
|
|
|
cleanupMessages `catchChatError` (toView . CRChatError Nothing)
|
|
|
|
|
-- TODO possibly, also cleanup async commands
|
|
|
|
|
cleanupProbes `catchChatError` (toView . CRChatError Nothing)
|
|
|
|
|
liftIO $ threadDelay' $ diffToMicroseconds interval
|
|
|
|
|
where
|
|
|
|
|
@@ -3589,15 +3590,13 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
|
|
|
|
-- TODO only acknowledge without saving message?
|
|
|
|
|
-- probably this branch is never executed, so there should be no reason
|
|
|
|
|
-- to save message if contact hasn't been created yet - chat item isn't created anyway
|
|
|
|
|
withAckMessage agentConnId conn meta False $ \cmdId -> do
|
|
|
|
|
(_conn', _) <- saveDirectRcvMSG conn meta cmdId msgBody
|
|
|
|
|
pure False
|
|
|
|
|
withAckMessage' agentConnId meta $
|
|
|
|
|
void $ saveDirectRcvMSG conn meta msgBody
|
|
|
|
|
SENT msgId ->
|
|
|
|
|
sentMsgDeliveryEvent conn msgId
|
|
|
|
|
OK ->
|
|
|
|
|
-- [async agent commands] continuation on receiving OK
|
|
|
|
|
withCompletedCommand conn agentMsg $ \CommandData {cmdFunction, cmdId} ->
|
|
|
|
|
when (cmdFunction == CFAckMessage) $ ackMsgDeliveryEvent conn cmdId
|
|
|
|
|
when (corrId /= "") $ withCompletedCommand conn agentMsg $ \_cmdData -> pure ()
|
|
|
|
|
MERR _ err -> do
|
|
|
|
|
toView $ CRChatError (Just user) (ChatErrorAgent err $ Just connEntity)
|
|
|
|
|
incAuthErrCounter connEntity conn err
|
|
|
|
|
@@ -3622,11 +3621,11 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
|
|
|
|
sendXGrpMemInv hostConnId (Just directConnReq) xGrpMemIntroCont
|
|
|
|
|
CRContactUri _ -> throwChatError $ CECommandError "unexpected ConnectionRequestUri type"
|
|
|
|
|
MSG msgMeta _msgFlags msgBody ->
|
|
|
|
|
withAckMessage agentConnId conn msgMeta True $ \cmdId -> do
|
|
|
|
|
withAckMessage agentConnId msgMeta True $ do
|
|
|
|
|
let MsgMeta {pqEncryption} = msgMeta
|
|
|
|
|
(ct', conn') <- updateContactPQRcv user ct conn pqEncryption
|
|
|
|
|
checkIntegrityCreateItem (CDDirectRcv ct') msgMeta `catchChatError` \_ -> pure ()
|
|
|
|
|
(conn'', msg@RcvMessage {chatMsgEvent = ACME _ event}) <- saveDirectRcvMSG conn' msgMeta cmdId msgBody
|
|
|
|
|
(conn'', msg@RcvMessage {chatMsgEvent = ACME _ event}) <- saveDirectRcvMSG conn' msgMeta msgBody
|
|
|
|
|
let ct'' = ct' {activeConn = Just conn''} :: Contact
|
|
|
|
|
assertDirectAllowed user MDRcv ct'' $ toCMEventTag event
|
|
|
|
|
updateChatLock "direct message" event
|
|
|
|
|
@@ -3656,7 +3655,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
|
|
|
|
let Contact {chatSettings = ChatSettings {sendRcpts}} = ct''
|
|
|
|
|
pure $ fromMaybe (sendRcptsContacts user) sendRcpts && hasDeliveryReceipt (toCMEventTag event)
|
|
|
|
|
RCVD msgMeta msgRcpt ->
|
|
|
|
|
withAckMessage' agentConnId conn msgMeta $
|
|
|
|
|
withAckMessage' agentConnId msgMeta $
|
|
|
|
|
directMsgReceived ct conn msgMeta msgRcpt
|
|
|
|
|
CONF confId pqSupport _ connInfo -> do
|
|
|
|
|
conn' <- processCONFpqSupport conn pqSupport
|
|
|
|
|
@@ -3764,8 +3763,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
|
|
|
|
createInternalChatItem user (CDDirectRcv ct') (CIRcvConnEvent $ RCERatchetSync rss) Nothing
|
|
|
|
|
OK ->
|
|
|
|
|
-- [async agent commands] continuation on receiving OK
|
|
|
|
|
withCompletedCommand conn agentMsg $ \CommandData {cmdFunction, cmdId} ->
|
|
|
|
|
when (cmdFunction == CFAckMessage) $ ackMsgDeliveryEvent conn cmdId
|
|
|
|
|
when (corrId /= "") $ withCompletedCommand conn agentMsg $ \_cmdData -> pure ()
|
|
|
|
|
MERR msgId err -> do
|
|
|
|
|
updateDirectItemStatus ct conn msgId $ agentErrToItemStatus err
|
|
|
|
|
toView $ CRChatError (Just user) (ChatErrorAgent err $ Just connEntity)
|
|
|
|
|
@@ -4031,11 +4029,11 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
|
|
|
|
void $ sendDirectMemberMessage imConn (XGrpMemCon memberId) groupId
|
|
|
|
|
_ -> messageWarning "sendXGrpMemCon: member category GCPreMember or GCPostMember is expected"
|
|
|
|
|
MSG msgMeta _msgFlags msgBody -> do
|
|
|
|
|
withAckMessage agentConnId conn msgMeta True $ \cmdId -> do
|
|
|
|
|
withAckMessage agentConnId msgMeta True $ do
|
|
|
|
|
checkIntegrityCreateItem (CDGroupRcv gInfo m) msgMeta `catchChatError` \_ -> pure ()
|
|
|
|
|
forM_ aChatMsgs $ \case
|
|
|
|
|
Right (ACMsg _ chatMsg) ->
|
|
|
|
|
processEvent cmdId chatMsg `catchChatError` \e -> toView $ CRChatError (Just user) e
|
|
|
|
|
processEvent chatMsg `catchChatError` \e -> toView $ CRChatError (Just user) e
|
|
|
|
|
Left e -> toView $ CRChatError (Just user) (ChatError . CEException $ "error parsing chat message: " <> e)
|
|
|
|
|
checkSendRcpt $ rights aChatMsgs
|
|
|
|
|
-- currently only a single message is forwarded
|
|
|
|
|
@@ -4046,9 +4044,9 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
|
|
|
|
where
|
|
|
|
|
aChatMsgs = parseChatMessages msgBody
|
|
|
|
|
brokerTs = metaBrokerTs msgMeta
|
|
|
|
|
processEvent :: MsgEncodingI e => CommandId -> ChatMessage e -> m ()
|
|
|
|
|
processEvent cmdId chatMsg = do
|
|
|
|
|
(m', conn', msg@RcvMessage {chatMsgEvent = ACME _ event}) <- saveGroupRcvMsg user groupId m conn msgMeta cmdId msgBody chatMsg
|
|
|
|
|
processEvent :: MsgEncodingI e => ChatMessage e -> m ()
|
|
|
|
|
processEvent chatMsg = do
|
|
|
|
|
(m', conn', msg@RcvMessage {chatMsgEvent = ACME _ event}) <- saveGroupRcvMsg user groupId m conn msgMeta msgBody chatMsg
|
|
|
|
|
updateChatLock "groupMessage" event
|
|
|
|
|
case event of
|
|
|
|
|
XMsgNew mc -> memberCanSend m' $ newGroupContentMessage gInfo m' mc msg brokerTs False
|
|
|
|
|
@@ -4108,7 +4106,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
|
|
|
|
unless (null ms) . void $
|
|
|
|
|
sendGroupMessage' user gInfo ms msg
|
|
|
|
|
RCVD msgMeta msgRcpt ->
|
|
|
|
|
withAckMessage' agentConnId conn msgMeta $
|
|
|
|
|
withAckMessage' agentConnId msgMeta $
|
|
|
|
|
groupMsgReceived gInfo m conn msgMeta msgRcpt
|
|
|
|
|
SENT msgId -> do
|
|
|
|
|
sentMsgDeliveryEvent conn msgId
|
|
|
|
|
@@ -4148,8 +4146,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
|
|
|
|
createInternalChatItem user (CDGroupRcv gInfo m') (CIRcvConnEvent $ RCERatchetSync rss) Nothing
|
|
|
|
|
OK ->
|
|
|
|
|
-- [async agent commands] continuation on receiving OK
|
|
|
|
|
withCompletedCommand conn agentMsg $ \CommandData {cmdFunction, cmdId} ->
|
|
|
|
|
when (cmdFunction == CFAckMessage) $ ackMsgDeliveryEvent conn cmdId
|
|
|
|
|
when (corrId /= "") $ withCompletedCommand conn agentMsg $ \_cmdData -> pure ()
|
|
|
|
|
MERR msgId err -> do
|
|
|
|
|
withStore' $ \db -> updateGroupItemErrorStatus db msgId (groupMemberId' m) $ agentErrToItemStatus err
|
|
|
|
|
-- group errors are silenced to reduce load on UI event log
|
|
|
|
|
@@ -4231,10 +4228,10 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
|
|
|
|
lookupChatItemByFileId db vr user fileId
|
|
|
|
|
toView $ CRSndFileRcvCancelled user ci ft
|
|
|
|
|
_ -> throwChatError $ CEFileSend fileId err
|
|
|
|
|
MSG meta _ _ -> withAckMessage' agentConnId conn meta $ pure ()
|
|
|
|
|
MSG meta _ _ -> withAckMessage' agentConnId meta $ pure ()
|
|
|
|
|
OK ->
|
|
|
|
|
-- [async agent commands] continuation on receiving OK
|
|
|
|
|
withCompletedCommand conn agentMsg $ \_cmdData -> pure ()
|
|
|
|
|
when (corrId /= "") $ withCompletedCommand conn agentMsg $ \_cmdData -> pure ()
|
|
|
|
|
ERR err -> do
|
|
|
|
|
toView $ CRChatError (Just user) (ChatErrorAgent err $ Just connEntity)
|
|
|
|
|
when (corrId /= "") $ withCompletedCommand conn agentMsg $ \_cmdData -> pure ()
|
|
|
|
|
@@ -4280,7 +4277,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
|
|
|
|
parseFileChunk msgBody >>= receiveFileChunk ft (Just conn) meta
|
|
|
|
|
OK ->
|
|
|
|
|
-- [async agent commands] continuation on receiving OK
|
|
|
|
|
withCompletedCommand conn agentMsg $ \_cmdData -> pure ()
|
|
|
|
|
when (corrId /= "") $ withCompletedCommand conn agentMsg $ \_cmdData -> pure ()
|
|
|
|
|
MERR _ err -> do
|
|
|
|
|
toView $ CRChatError (Just user) (ChatErrorAgent err $ Just connEntity)
|
|
|
|
|
incAuthErrCounter connEntity conn err
|
|
|
|
|
@@ -4307,7 +4304,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
|
|
|
|
RcvChunkOk ->
|
|
|
|
|
if B.length chunk /= fromInteger chunkSize
|
|
|
|
|
then badRcvFileChunk ft "incorrect chunk size"
|
|
|
|
|
else ack $ appendFileChunk ft chunkNo chunk False
|
|
|
|
|
else withAckMessage' agentConnId meta $ appendFileChunk ft chunkNo chunk False
|
|
|
|
|
RcvChunkFinal ->
|
|
|
|
|
if B.length chunk > fromInteger chunkSize
|
|
|
|
|
then badRcvFileChunk ft "incorrect chunk size"
|
|
|
|
|
@@ -4321,12 +4318,8 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
|
|
|
|
getChatItemByFileId db vr user fileId
|
|
|
|
|
toView $ CRRcvFileComplete user ci
|
|
|
|
|
forM_ conn_ $ \conn -> deleteAgentConnectionAsync user (aConnId conn)
|
|
|
|
|
RcvChunkDuplicate -> ack $ pure ()
|
|
|
|
|
RcvChunkDuplicate -> withAckMessage' agentConnId meta $ pure ()
|
|
|
|
|
RcvChunkError -> badRcvFileChunk ft $ "incorrect chunk number " <> show chunkNo
|
|
|
|
|
where
|
|
|
|
|
ack a = case conn_ of
|
|
|
|
|
Just conn -> withAckMessage' agentConnId conn meta a
|
|
|
|
|
Nothing -> a
|
|
|
|
|
|
|
|
|
|
processUserContactRequest :: ACommand 'Agent e -> ConnectionEntity -> Connection -> UserContact -> m ()
|
|
|
|
|
processUserContactRequest agentMsg connEntity conn UserContact {userContactLinkId} = case agentMsg of
|
|
|
|
|
@@ -4397,6 +4390,8 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
|
|
|
|
where
|
|
|
|
|
s = " " <> name <> "=" <> B.unpack (strEncode $ toCMEventTag event)
|
|
|
|
|
|
|
|
|
|
-- TODO v5.7 / v6.0 - together with deprecating old group protocol establishing direct connections?
|
|
|
|
|
-- we could save command records only for agent APIs we process continuations for (INV)
|
|
|
|
|
withCompletedCommand :: forall e. AEntityI e => Connection -> ACommand 'Agent e -> (CommandData -> m ()) -> m ()
|
|
|
|
|
withCompletedCommand Connection {connId} agentMsg action = do
|
|
|
|
|
let agentMsgTag = APCT (sAEntity @e) $ aCommandTag agentMsg
|
|
|
|
|
@@ -4414,36 +4409,27 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
|
|
|
|
withStore' $ \db -> updateCommandStatus db user cmdId CSError
|
|
|
|
|
throwChatError . CEAgentCommandError $ msg
|
|
|
|
|
|
|
|
|
|
createAckCmd :: Connection -> m CommandId
|
|
|
|
|
createAckCmd Connection {connId} = do
|
|
|
|
|
withStore' $ \db -> createCommand db user (Just connId) CFAckMessage
|
|
|
|
|
withAckMessage' :: ConnId -> MsgMeta -> m () -> m ()
|
|
|
|
|
withAckMessage' cId msgMeta action = do
|
|
|
|
|
withAckMessage cId msgMeta False $ action $> False
|
|
|
|
|
|
|
|
|
|
withAckMessage' :: ConnId -> Connection -> MsgMeta -> m () -> m ()
|
|
|
|
|
withAckMessage' cId conn msgMeta action = do
|
|
|
|
|
withAckMessage cId conn msgMeta False $ \_cmdId -> action $> False
|
|
|
|
|
|
|
|
|
|
withAckMessage :: ConnId -> Connection -> MsgMeta -> Bool -> (CommandId -> m Bool) -> m ()
|
|
|
|
|
withAckMessage cId conn msgMeta showCritical action = do
|
|
|
|
|
cmdId <- createAckCmd conn `catchChatError` \e -> throwError $ ChatErrorAgent (CRITICAL True $ show e) Nothing
|
|
|
|
|
-- [async agent commands] command should be asynchronous, continuation is ackMsgDeliveryEvent
|
|
|
|
|
withAckMessage :: ConnId -> MsgMeta -> Bool -> m Bool -> m ()
|
|
|
|
|
withAckMessage cId msgMeta showCritical action =
|
|
|
|
|
-- [async agent commands] command should be asynchronous
|
|
|
|
|
-- TODO catching error and sending ACK after an error, particularly if it is a database error, will result in the message not processed (and no notification to the user).
|
|
|
|
|
-- Possible solutions are:
|
|
|
|
|
-- 1) retry processing several times
|
|
|
|
|
-- 2) stabilize database
|
|
|
|
|
-- 3) show screen of death to the user asking to restart
|
|
|
|
|
tryChatError (action cmdId) >>= \case
|
|
|
|
|
Right withRcpt -> ackMsg cId cmdId msgMeta $ if withRcpt then Just "" else Nothing
|
|
|
|
|
tryChatError action >>= \case
|
|
|
|
|
Right withRcpt -> ackMsg msgMeta $ if withRcpt then Just "" else Nothing
|
|
|
|
|
-- If showCritical is True, then these errors don't result in ACK and show user visible alert
|
|
|
|
|
-- This prevents losing the message that failed to be processed.
|
|
|
|
|
Left (ChatErrorStore SEDBBusyError {message}) | showCritical -> throwError $ ChatErrorAgent (CRITICAL True message) Nothing
|
|
|
|
|
Left e -> ackMsg cId cmdId msgMeta Nothing >> throwError e
|
|
|
|
|
|
|
|
|
|
ackMsg :: ConnId -> CommandId -> MsgMeta -> Maybe MsgReceiptInfo -> m ()
|
|
|
|
|
ackMsg cId cmdId MsgMeta {recipient = (msgId, _)} rcpt = withAgent $ \a -> ackMessageAsync a (aCorrId cmdId) cId msgId rcpt
|
|
|
|
|
|
|
|
|
|
ackMsgDeliveryEvent :: Connection -> CommandId -> m ()
|
|
|
|
|
ackMsgDeliveryEvent Connection {connId} ackCmdId =
|
|
|
|
|
withStore' $ \db -> updateRcvMsgDeliveryStatus db connId ackCmdId MDSRcvAcknowledged
|
|
|
|
|
Left e -> ackMsg msgMeta Nothing >> throwError e
|
|
|
|
|
where
|
|
|
|
|
ackMsg :: MsgMeta -> Maybe MsgReceiptInfo -> m ()
|
|
|
|
|
ackMsg MsgMeta {recipient = (msgId, _)} rcpt = withAgent $ \a -> ackMessageAsync a "" cId msgId rcpt
|
|
|
|
|
|
|
|
|
|
sentMsgDeliveryEvent :: Connection -> AgentMsgId -> m ()
|
|
|
|
|
sentMsgDeliveryEvent Connection {connId} msgId =
|
|
|
|
|
@@ -6350,25 +6336,25 @@ sendPendingGroupMessages user GroupMember {groupMemberId, localDisplayName} conn
|
|
|
|
|
_ -> pure ()
|
|
|
|
|
|
|
|
|
|
-- TODO [batch send] refactor direct message processing same as groups (e.g. checkIntegrity before processing)
|
|
|
|
|
saveDirectRcvMSG :: ChatMonad m => Connection -> MsgMeta -> CommandId -> MsgBody -> m (Connection, RcvMessage)
|
|
|
|
|
saveDirectRcvMSG conn@Connection {connId} agentMsgMeta agentAckCmdId msgBody =
|
|
|
|
|
saveDirectRcvMSG :: ChatMonad m => Connection -> MsgMeta -> MsgBody -> m (Connection, RcvMessage)
|
|
|
|
|
saveDirectRcvMSG conn@Connection {connId} agentMsgMeta msgBody =
|
|
|
|
|
case parseChatMessages msgBody of
|
|
|
|
|
[Right (ACMsg _ ChatMessage {chatVRange, msgId = sharedMsgId_, chatMsgEvent})] -> do
|
|
|
|
|
conn' <- updatePeerChatVRange conn chatVRange
|
|
|
|
|
let agentMsgId = fst $ recipient agentMsgMeta
|
|
|
|
|
newMsg = NewRcvMessage {chatMsgEvent, msgBody}
|
|
|
|
|
rcvMsgDelivery = RcvMsgDelivery {connId, agentMsgId, agentMsgMeta, agentAckCmdId}
|
|
|
|
|
rcvMsgDelivery = RcvMsgDelivery {connId, agentMsgId, agentMsgMeta}
|
|
|
|
|
msg <- withStore $ \db -> createNewMessageAndRcvMsgDelivery db (ConnectionId connId) newMsg sharedMsgId_ rcvMsgDelivery Nothing
|
|
|
|
|
pure (conn', msg)
|
|
|
|
|
[Left e] -> error $ "saveDirectRcvMSG: error parsing chat message: " <> e
|
|
|
|
|
_ -> error "saveDirectRcvMSG: batching not supported"
|
|
|
|
|
|
|
|
|
|
saveGroupRcvMsg :: (MsgEncodingI e, ChatMonad m) => User -> GroupId -> GroupMember -> Connection -> MsgMeta -> CommandId -> MsgBody -> ChatMessage e -> m (GroupMember, Connection, RcvMessage)
|
|
|
|
|
saveGroupRcvMsg user groupId authorMember conn@Connection {connId} agentMsgMeta agentAckCmdId msgBody ChatMessage {chatVRange, msgId = sharedMsgId_, chatMsgEvent} = do
|
|
|
|
|
saveGroupRcvMsg :: (MsgEncodingI e, ChatMonad m) => User -> GroupId -> GroupMember -> Connection -> MsgMeta -> MsgBody -> ChatMessage e -> m (GroupMember, Connection, RcvMessage)
|
|
|
|
|
saveGroupRcvMsg user groupId authorMember conn@Connection {connId} agentMsgMeta msgBody ChatMessage {chatVRange, msgId = sharedMsgId_, chatMsgEvent} = do
|
|
|
|
|
(am'@GroupMember {memberId = amMemId, groupMemberId = amGroupMemId}, conn') <- updateMemberChatVRange authorMember conn chatVRange
|
|
|
|
|
let agentMsgId = fst $ recipient agentMsgMeta
|
|
|
|
|
newMsg = NewRcvMessage {chatMsgEvent, msgBody}
|
|
|
|
|
rcvMsgDelivery = RcvMsgDelivery {connId, agentMsgId, agentMsgMeta, agentAckCmdId}
|
|
|
|
|
rcvMsgDelivery = RcvMsgDelivery {connId, agentMsgId, agentMsgMeta}
|
|
|
|
|
msg <-
|
|
|
|
|
withStore (\db -> createNewMessageAndRcvMsgDelivery db (GroupId groupId) newMsg sharedMsgId_ rcvMsgDelivery $ Just amGroupMemId)
|
|
|
|
|
`catchChatError` \e -> case e of
|
|
|
|
|
|