diff --git a/cabal.project b/cabal.project index 02264df7f3..e2c6d2f276 100644 --- a/cabal.project +++ b/cabal.project @@ -7,7 +7,7 @@ constraints: zip +disable-bzip2 +disable-zstd source-repository-package type: git location: https://github.com/simplex-chat/simplexmq.git - tag: 50c210c5c0c7f792c39123c2177bb60b307295b9 + tag: afecefc3adee044545144a269ad7df7b738af65a source-repository-package type: git diff --git a/docs/rfcs/2022-08-26-group-connections-recovery.md b/docs/rfcs/2022-08-26-group-connections-recovery.md new file mode 100644 index 0000000000..462879e936 --- /dev/null +++ b/docs/rfcs/2022-08-26-group-connections-recovery.md @@ -0,0 +1,57 @@ +# Group connections recovery after asynchronous commands + +## Problem + +Similar to direct chat connections, group connections can fail to be established on failing IO, for example on bad network. When this happens agent throws error. For direct connections it is viable to propagate the error to UI to indicate failure, so the user can retry. For group connections this becomes unviable as multiple connections are being established automatically on each new joining member (2 for each existing member except host), and the user can't retry. + +## Proposal 2 + +- Have separate command processing queue in agent + +- Chat creates correlation id for command + +- Chat has data structure to represent possible continuations + +- Chat saves correlation id and continuation to database + +- Agent asynchronously responds after command completion with correlation id + +- Chat restores continuation by correlation id and processes it + +## Proposal + +- Add special versions of commands `createConnection`, `joinConnection` (`allowConnection` as well?), to which agent responds synchronously with connection id and status; + +- Add new connection type `NewConnection` / `NoQueueConnection`; + +- Add agent responses signalling these commands' asynchronous completion, e.g. `CREATE_SUCCESS`, `JOIN_SUCCESS` (?); + +- Return connection status on subscriptions; + +In chat: + +- Event-driven processing - for group connections use commands with synchronous responses, save intermediate connection state, process success responses; + +- On subscription check whether connection's status has changed, if yes, run respective continuation - same as on success event. + +### Commands use + +`joinConnection` in group connections is used: + +- #779 : when joining group - `APIJoinGroup`; +- #2147 : on receiving `XGrpMemFwd` message for both group and direct connections; -- from inviting member to existing members + +`allowConnection`: + +- #1419 : on receiving `XGrpMemInfo` in `CONF` inside direct connection; -- from existing member to invitee after XGrpMemFwd +- #1486 : on receiving `XGrpAcpt` in `CONF` inside group connection; -- from invitee to host +- #1495 : on receiving `XGrpMemInfo` in `CONF` inside group connection; -- from existing member to invitee after XGrpMemFwd + +`createConnection`: + +- #756 : when inviting new member - `APIAddMember`, probably doesn't have to be recovered - error can be signalled; +- #2112 : on receiving `XGrpMemIntro` message for both group and direct connections for each introduced member; -- from host to invitee + +## Misc + +Chat.hs #1419 - why is XOk sent instead of XGrpMemInfo? diff --git a/scripts/nix/sha256map.nix b/scripts/nix/sha256map.nix index df144900e3..5ab74a9df9 100644 --- a/scripts/nix/sha256map.nix +++ b/scripts/nix/sha256map.nix @@ -1,5 +1,5 @@ { - "https://github.com/simplex-chat/simplexmq.git"."50c210c5c0c7f792c39123c2177bb60b307295b9" = "1f23p5crfy8fhfmcv96r7c6xpzgj2ab8nwqzdhis6mskhrfhyj4g"; + "https://github.com/simplex-chat/simplexmq.git"."afecefc3adee044545144a269ad7df7b738af65a" = "0v3z9sfy7vjbgm3cznb7vaycn2r2yav83x74khf6a001cs1zv171"; "https://github.com/simplex-chat/direct-sqlcipher.git"."34309410eb2069b029b8fc1872deb1e0db123294" = "0kwkmhyfsn2lixdlgl15smgr1h5gjk7fky6abzh8rng2h5ymnffd"; "https://github.com/simplex-chat/sqlcipher-simple.git"."5e154a2aeccc33ead6c243ec07195ab673137221" = "1d1gc5wax4vqg0801ajsmx1sbwvd9y7p7b8mmskvqsmpbwgbh0m0"; "https://github.com/simplex-chat/aeson.git"."3eb66f9a68f103b5f1489382aad89f5712a64db7" = "0kilkx59fl6c3qy3kjczqvm8c3f4n3p0bdk9biyflf51ljnzp4yp"; diff --git a/simplex-chat.cabal b/simplex-chat.cabal index 47ee627c36..ac41f0d7e1 100644 --- a/simplex-chat.cabal +++ b/simplex-chat.cabal @@ -48,6 +48,7 @@ library Simplex.Chat.Migrations.M20220822_groups_host_conn_custom_user_profile_id Simplex.Chat.Migrations.M20220823_delete_broken_group_event_chat_items Simplex.Chat.Migrations.M20220824_profiles_local_alias + Simplex.Chat.Migrations.M20220909_commands Simplex.Chat.Mobile Simplex.Chat.Options Simplex.Chat.ProfileGenerator diff --git a/src/Simplex/Chat.hs b/src/Simplex/Chat.hs index 65670eb262..a790e66b8e 100644 --- a/src/Simplex/Chat.hs +++ b/src/Simplex/Chat.hs @@ -259,6 +259,7 @@ processChatCommand = \case setupSndFileTransfer :: Contact -> m (Maybe (FileInvitation, CIFile 'MDSnd)) setupSndFileTransfer ct = forM file_ $ \file -> do (fileSize, chSize) <- checkSndFile file + -- [async agent commands] keep command synchronous, but process error (agentConnId, fileConnReq) <- withAgent $ \a -> createConnection a True SCMInvitation let fileName = takeFileName file fileInvitation = FileInvitation {fileName, fileSize, fileConnReq = Just fileConnReq} @@ -427,7 +428,7 @@ processChatCommand = \case withFilesFolder $ \filesFolder -> deleteFile filesFolder fileInfo withAgent $ \a -> forM_ conns $ \conn -> deleteConnection a (aConnId conn) `catchError` \(_ :: AgentErrorType) -> pure () - -- two functions below are called in separate transactions to prevent crashes on android + -- functions below are called in separate transactions to prevent crashes on android -- (possibly, race condition on integrity check?) withStore' $ \db -> deleteContactConnectionsAndFiles db userId ct withStore' $ \db -> deleteContact db userId ct @@ -447,7 +448,7 @@ processChatCommand = \case withChatLock . procCmd $ do when (memberActive membership) . void $ sendGroupMessage gInfo members XGrpDel mapM_ deleteMemberConnection members - -- two functions below are called in separate transactions to prevent crashes on android + -- functions below are called in separate transactions to prevent crashes on android -- (possibly, race condition on integrity check?) withStore' $ \db -> deleteGroupConnectionsAndFiles db user gInfo members withStore' $ \db -> deleteGroupItemsAndMembers db user gInfo @@ -752,6 +753,7 @@ processChatCommand = \case case contactMember contact members of Nothing -> do gVar <- asks idsDrg + -- [async agent commands] keep command synchronous, but process error (agentConnId, cReq) <- withAgent $ \a -> createConnection a True SCMInvitation member <- withStore $ \db -> createNewContactMember db gVar user groupId contact memRole agentConnId cReq sendInvitation member cReq @@ -764,6 +766,7 @@ processChatCommand = \case APIJoinGroup groupId -> withUser $ \user@User {userId} -> do ReceivedGroupInvitation {fromMember, connRequest, groupInfo = g@GroupInfo {membership}} <- withStore $ \db -> getGroupInvitation db user groupId withChatLock . procCmd $ do + -- [async agent commands] keep command synchronous, but process error agentConnId <- withAgent $ \a -> joinConnection a True connRequest . directMessage $ XGrpAcpt (memberId (membership :: GroupMember)) withStore' $ \db -> do createMemberConnection db userId fromMember agentConnId @@ -1116,6 +1119,7 @@ acceptFileReceive user@User {userId} RcvFileTransfer {fileId, fileInvitation = F case fileConnReq of -- direct file protocol Just connReq -> + -- [async agent commands] keep command synchronous, but process error tryError (withAgent $ \a -> joinConnection a True connReq . directMessage $ XFileAcpt fName) >>= \case Right agentConnId -> do filePath <- getRcvFilePath filePath_ fName @@ -1130,6 +1134,7 @@ acceptFileReceive user@User {userId} RcvFileTransfer {fileId, fileInvitation = F case activeConn of Just conn -> do sharedMsgId <- withStore $ \db -> getSharedMsgIdByFileId db userId fileId + -- [async agent commands] keep command synchronous, but process error (agentConnId, fileInvConnReq) <- withAgent $ \a -> createConnection a True SCMInvitation filePath <- getRcvFilePath filePath_ fName ci <- withStore $ \db -> acceptRcvFileTransfer db user fileId agentConnId ConnNew filePath @@ -1189,10 +1194,10 @@ agentSubscriber = do q <- asks $ subQ . smpAgent l <- asks chatLock forever $ do - (_, connId, msg) <- atomically $ readTBQueue q + (corrId, connId, msg) <- atomically $ readTBQueue q u <- readTVarIO =<< asks currentUser withLock l . void . runExceptT $ - processAgentMessage u connId msg `catchError` (toView . CRChatError) + processAgentMessage u corrId connId msg `catchError` (toView . CRChatError) type AgentBatchSubscribe m = AgentClient -> [ConnId] -> ExceptT AgentErrorType m (Map ConnId (Either AgentErrorType ())) @@ -1309,9 +1314,9 @@ subscribeUserConnections agentBatchSubscribe user = do Just _ -> Nothing _ -> Just . ChatError . CEAgentNoSubResult $ AgentConnId connId -processAgentMessage :: forall m. ChatMonad m => Maybe User -> ConnId -> ACommand 'Agent -> m () -processAgentMessage Nothing _ _ = throwChatError CENoActiveUser -processAgentMessage (Just User {userId}) "" agentMessage = case agentMessage of +processAgentMessage :: forall m. ChatMonad m => Maybe User -> ConnId -> ACorrId -> ACommand 'Agent -> m () +processAgentMessage Nothing _ _ _ = throwChatError CENoActiveUser +processAgentMessage (Just User {userId}) _ "" agentMessage = case agentMessage of CONNECT p h -> hostEvent $ CRHostConnected p h DISCONNECT p h -> hostEvent $ CRHostDisconnected p h DOWN srv conns -> serverEvent srv conns CRContactsDisconnected "disconnected" @@ -1324,7 +1329,7 @@ processAgentMessage (Just User {userId}) "" agentMessage = case agentMessage of cs <- withStore' $ \db -> getConnectionsContacts db userId conns toView $ event srv cs showToast ("server " <> str) (safeDecodeUtf8 $ strEncode host) -processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage = +processAgentMessage (Just user@User {userId, profile}) corrId agentConnId agentMessage = (withStore (\db -> getConnectionEntity db user $ AgentConnId agentConnId) >>= updateConnStatus) >>= \case RcvDirectMsgConnection conn contact_ -> processDirectMessage agentMessage conn contact_ @@ -1364,24 +1369,41 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage incognitoProfile <- forM customUserProfileId $ \profileId -> withStore (\db -> getProfileById db userId profileId) let profileToSend = fromLocalProfile $ fromMaybe profile incognitoProfile saveConnInfo conn connInfo - allowAgentConnection conn confId $ XInfo profileToSend + -- [async agent commands] no continuation needed, but command should be asynchronous for stability + allowAgentConnectionAsync user conn confId $ XInfo profileToSend INFO connInfo -> saveConnInfo conn connInfo MSG meta _msgFlags msgBody -> do - _ <- saveRcvMSG conn (ConnectionId connId) meta msgBody - withAckMessage agentConnId meta $ pure () - ackMsgDeliveryEvent conn meta + cmdId <- createAckCmd conn + _ <- saveRcvMSG conn (ConnectionId connId) meta msgBody cmdId + withAckMessage agentConnId cmdId meta $ pure () SENT msgId -> -- ? updateDirectChatItemStatus sentMsgDeliveryEvent conn msgId + OK -> + -- [async agent commands] continuation on receiving OK + withCompletedCommand conn agentMsg $ \CommandData {cmdFunction, cmdId} -> + when (cmdFunction == CFAckMessage) $ ackMsgDeliveryEvent conn cmdId MERR _ err -> toView . CRChatError $ ChatErrorAgent err -- ? updateDirectChatItemStatus ERR err -> toView . CRChatError $ ChatErrorAgent err -- TODO add debugging output _ -> pure () Just ct@Contact {localDisplayName = c, contactId} -> case agentMsg of + INV (ACR _ cReq) -> + -- [async agent commands] XGrpMemIntro continuation on receiving INV + withCompletedCommand conn agentMsg $ \_ -> + case cReq of + directConnReq@(CRInvitationUri _ _) -> do + contData <- withStore' $ \db -> do + setConnConnReqInv db user connId cReq + getXGrpMemIntroContDirect db user ct + forM_ contData $ \(hostConnId, xGrpMemIntroCont) -> + sendXGrpMemIntro hostConnId directConnReq xGrpMemIntroCont + CRContactUri _ -> throwChatError $ CECommandError "unexpected ConnectionRequestUri type" MSG msgMeta _msgFlags msgBody -> do - msg@RcvMessage {chatMsgEvent} <- saveRcvMSG conn (ConnectionId connId) msgMeta msgBody - withAckMessage agentConnId msgMeta $ + cmdId <- createAckCmd conn + msg@RcvMessage {chatMsgEvent} <- saveRcvMSG conn (ConnectionId connId) msgMeta msgBody cmdId + withAckMessage agentConnId cmdId msgMeta $ case chatMsgEvent of XMsgNew mc -> newContentMessage ct mc msg msgMeta XMsgUpdate sharedMsgId mContent -> messageUpdate ct sharedMsgId mContent msg msgMeta @@ -1400,7 +1422,6 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage XCallExtra callId extraInfo -> xCallExtra ct callId extraInfo msg msgMeta XCallEnd callId -> xCallEnd ct callId msg msgMeta _ -> pure () - ackMsgDeliveryEvent conn msgMeta CONF confId _ connInfo -> do -- confirming direct connection with a member ChatMessage {chatMsgEvent} <- liftEither $ parseChatMessage connInfo @@ -1408,7 +1429,8 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage XGrpMemInfo _memId _memProfile -> do -- TODO check member ID -- TODO update member profile - allowAgentConnection conn confId XOk + -- [async agent commands] no continuation needed, but command should be asynchronous for stability + allowAgentConnectionAsync user conn confId XOk _ -> messageError "CONF from member must have x.grp.mem.info" INFO connInfo -> do ChatMessage {chatMsgEvent} <- liftEither $ parseChatMessage connInfo @@ -1449,6 +1471,10 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage chatItem <- withStore $ \db -> updateDirectChatItemStatus db userId contactId (chatItemId' ci) CISSndSent toView $ CRChatItemStatusUpdated (AChatItem SCTDirect SMDSnd (DirectChat ct) chatItem) _ -> pure () + OK -> + -- [async agent commands] continuation on receiving OK + withCompletedCommand conn agentMsg $ \CommandData {cmdFunction, cmdId} -> + when (cmdFunction == CFAckMessage) $ ackMsgDeliveryEvent conn cmdId END -> do toView $ CRContactAnotherClient ct showToast (c <> "> ") "connected to another client" @@ -1464,7 +1490,19 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage _ -> pure () processGroupMessage :: ACommand 'Agent -> Connection -> GroupInfo -> GroupMember -> m () - processGroupMessage agentMsg conn gInfo@GroupInfo {groupId, localDisplayName = gName, membership, chatSettings} m = case agentMsg of + processGroupMessage agentMsg conn@Connection {connId} gInfo@GroupInfo {groupId, localDisplayName = gName, membership, chatSettings} m = case agentMsg of + INV (ACR _ cReq) -> + -- [async agent commands] XGrpMemIntro continuation on receiving INV + withCompletedCommand conn agentMsg $ \_ -> + case cReq of + groupConnReq@(CRInvitationUri _ _) -> do + contData <- withStore' $ \db -> do + setConnConnReqInv db user connId cReq + getXGrpMemIntroContGroup db user m + forM_ contData $ \(hostConnId, directConnReq) -> do + let GroupMember {groupMemberId, memberId} = m + sendXGrpMemIntro hostConnId directConnReq XGrpMemIntroCont {groupId, groupMemberId, memberId, groupConnReq} + CRContactUri _ -> throwChatError $ CECommandError "unexpected ConnectionRequestUri type" CONF confId _ connInfo -> do ChatMessage {chatMsgEvent} <- liftEither $ parseChatMessage connInfo case memberCategory m of @@ -1473,7 +1511,8 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage XGrpAcpt memId | sameMemberId memId m -> do withStore $ \db -> liftIO $ updateGroupMemberStatus db userId m GSMemAccepted - allowAgentConnection conn confId XOk + -- [async agent commands] no continuation needed, but command should be asynchronous for stability + allowAgentConnectionAsync user conn confId XOk | otherwise -> messageError "x.grp.acpt: memberId is different from expected" _ -> messageError "CONF from invited member must have x.grp.acpt" _ -> @@ -1481,7 +1520,8 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage XGrpMemInfo memId _memProfile | sameMemberId memId m -> do -- TODO update member profile - allowAgentConnection conn confId $ XGrpMemInfo (memberId (membership :: GroupMember)) (fromLocalProfile $ memberProfile membership) + -- [async agent commands] no continuation needed, but command should be asynchronous for stability + allowAgentConnectionAsync user conn confId $ XGrpMemInfo (memberId (membership :: GroupMember)) (fromLocalProfile $ memberProfile membership) | otherwise -> messageError "x.grp.mem.info: memberId is different from expected" _ -> messageError "CONF from member must have x.grp.mem.info" INFO connInfo -> do @@ -1532,8 +1572,9 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage let connectedIncognito = contactConnIncognito ct || memberIncognito membership when (memberCategory m == GCPreMember) $ probeMatchingContacts ct connectedIncognito MSG msgMeta _msgFlags msgBody -> do - msg@RcvMessage {chatMsgEvent} <- saveRcvMSG conn (GroupId groupId) msgMeta msgBody - withAckMessage agentConnId msgMeta $ + cmdId <- createAckCmd conn + msg@RcvMessage {chatMsgEvent} <- saveRcvMSG conn (GroupId groupId) msgMeta msgBody cmdId + withAckMessage agentConnId cmdId msgMeta $ case chatMsgEvent of XMsgNew mc -> newGroupContentMessage gInfo m mc msg msgMeta XMsgUpdate sharedMsgId mContent -> groupMessageUpdate gInfo m sharedMsgId mContent msg @@ -1543,7 +1584,7 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage XFileCancel sharedMsgId -> xFileCancelGroup gInfo m sharedMsgId msgMeta XFileAcptInv sharedMsgId fileConnReq fName -> xFileAcptInvGroup gInfo m sharedMsgId fileConnReq fName msgMeta XGrpMemNew memInfo -> xGrpMemNew gInfo m memInfo msg msgMeta - XGrpMemIntro memInfo -> xGrpMemIntro conn gInfo m memInfo + XGrpMemIntro memInfo -> xGrpMemIntro gInfo m memInfo XGrpMemInv memId introInv -> xGrpMemInv gInfo m memId introInv XGrpMemFwd memInfo introInv -> xGrpMemFwd gInfo m memInfo introInv XGrpMemDel memId -> xGrpMemDel gInfo m memId msg msgMeta @@ -1551,9 +1592,12 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage XGrpDel -> xGrpDel gInfo m msg msgMeta XGrpInfo p' -> xGrpInfo gInfo m p' msg msgMeta _ -> messageError $ "unsupported message: " <> T.pack (show chatMsgEvent) - ackMsgDeliveryEvent conn msgMeta SENT msgId -> sentMsgDeliveryEvent conn msgId + OK -> + -- [async agent commands] continuation on receiving OK + withCompletedCommand conn agentMsg $ \CommandData {cmdFunction, cmdId} -> + when (cmdFunction == CFAckMessage) $ ackMsgDeliveryEvent conn cmdId MERR _ err -> toView . CRChatError $ ChatErrorAgent err ERR err -> toView . CRChatError $ ChatErrorAgent err -- TODO add debugging output @@ -1571,7 +1615,8 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage XFileAcpt name | name == fileName -> do withStore' $ \db -> updateSndFileStatus db ft FSAccepted - allowAgentConnection conn confId XOk + -- [async agent commands] no continuation needed, but command should be asynchronous for stability + allowAgentConnectionAsync user conn confId XOk | otherwise -> messageError "x.file.acpt: fileName is different from expected" _ -> messageError "CONF from file connection must have x.file.acpt" CON -> do @@ -1590,8 +1635,12 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage ci <- withStore $ \db -> getChatItemByFileId db user fileId toView $ CRSndFileRcvCancelled ci ft _ -> throwChatError $ CEFileSend fileId err - MSG meta _ _ -> - withAckMessage agentConnId meta $ pure () + MSG meta _ _ -> do + cmdId <- createAckCmd conn + withAckMessage agentConnId cmdId meta $ pure () + OK -> + -- [async agent commands] continuation on receiving OK + withCompletedCommand conn agentMsg $ \_cmdData -> pure () ERR err -> toView . CRChatError $ ChatErrorAgent err -- TODO add debugging output _ -> pure () @@ -1605,7 +1654,7 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage CONF confId _ connInfo -> do ChatMessage {chatMsgEvent} <- liftEither $ parseChatMessage connInfo case chatMsgEvent of - XOk -> allowAgentConnection conn confId XOk + XOk -> allowAgentConnectionAsync user conn confId XOk -- [async agent commands] no continuation needed, but command should be asynchronous for stability _ -> pure () CON -> do ci <- withStore $ \db -> do @@ -1613,39 +1662,44 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage liftIO $ updateCIFileStatus db user fileId CIFSRcvTransfer getChatItemByFileId db user fileId toView $ CRRcvFileStart ci - MSG meta@MsgMeta {recipient = (msgId, _), integrity} _ msgBody -> withAckMessage agentConnId meta $ do - parseFileChunk msgBody >>= \case - FileChunkCancel -> - unless cancelled $ do - cancelRcvFileTransfer user ft - toView (CRRcvFileSndCancelled ft) - FileChunk {chunkNo, chunkBytes = chunk} -> do - case integrity of - MsgOk -> pure () - MsgError MsgDuplicate -> pure () -- TODO remove once agent removes duplicates - MsgError e -> - badRcvFileChunk ft $ "invalid file chunk number " <> show chunkNo <> ": " <> show e - withStore' (\db -> createRcvFileChunk db ft chunkNo msgId) >>= \case - RcvChunkOk -> - if B.length chunk /= fromInteger chunkSize - then badRcvFileChunk ft "incorrect chunk size" - else appendFileChunk ft chunkNo chunk - RcvChunkFinal -> - if B.length chunk > fromInteger chunkSize - then badRcvFileChunk ft "incorrect chunk size" - else do - appendFileChunk ft chunkNo chunk - ci <- withStore $ \db -> do - liftIO $ do - updateRcvFileStatus db ft FSComplete - updateCIFileStatus db user fileId CIFSRcvComplete - deleteRcvFileChunks db ft - getChatItemByFileId db user fileId - toView $ CRRcvFileComplete ci - closeFileHandle fileId rcvFiles - withAgent (`deleteConnection` agentConnId) - RcvChunkDuplicate -> pure () - RcvChunkError -> badRcvFileChunk ft $ "incorrect chunk number " <> show chunkNo + MSG meta@MsgMeta {recipient = (msgId, _), integrity} _ msgBody -> do + cmdId <- createAckCmd conn + withAckMessage agentConnId cmdId meta $ do + parseFileChunk msgBody >>= \case + FileChunkCancel -> + unless cancelled $ do + cancelRcvFileTransfer user ft + toView (CRRcvFileSndCancelled ft) + FileChunk {chunkNo, chunkBytes = chunk} -> do + case integrity of + MsgOk -> pure () + MsgError MsgDuplicate -> pure () -- TODO remove once agent removes duplicates + MsgError e -> + badRcvFileChunk ft $ "invalid file chunk number " <> show chunkNo <> ": " <> show e + withStore' (\db -> createRcvFileChunk db ft chunkNo msgId) >>= \case + RcvChunkOk -> + if B.length chunk /= fromInteger chunkSize + then badRcvFileChunk ft "incorrect chunk size" + else appendFileChunk ft chunkNo chunk + RcvChunkFinal -> + if B.length chunk > fromInteger chunkSize + then badRcvFileChunk ft "incorrect chunk size" + else do + appendFileChunk ft chunkNo chunk + ci <- withStore $ \db -> do + liftIO $ do + updateRcvFileStatus db ft FSComplete + updateCIFileStatus db user fileId CIFSRcvComplete + deleteRcvFileChunks db ft + getChatItemByFileId db user fileId + toView $ CRRcvFileComplete ci + closeFileHandle fileId rcvFiles + withAgent (`deleteConnection` agentConnId) + RcvChunkDuplicate -> pure () + RcvChunkError -> badRcvFileChunk ft $ "incorrect chunk number " <> show chunkNo + OK -> + -- [async agent commands] continuation on receiving OK + withCompletedCommand conn agentMsg $ \_cmdData -> pure () MERR _ err -> toView . CRChatError $ ChatErrorAgent err ERR err -> toView . CRChatError $ ChatErrorAgent err -- TODO add debugging output @@ -1677,13 +1731,30 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage toView $ CRReceivedContactRequest cReq showToast (localDisplayName <> "> ") "wants to connect to you" - withAckMessage :: ConnId -> MsgMeta -> m () -> m () - withAckMessage cId MsgMeta {recipient = (msgId, _)} action = - action `E.finally` withAgent (\a -> ackMessage a cId msgId `catchError` \_ -> pure ()) + withCompletedCommand :: Connection -> ACommand 'Agent -> (CommandData -> m ()) -> m () + withCompletedCommand Connection {connId} agentMsg action = do + let agentMsgTag = aCommandTag agentMsg + cmdData_ <- withStore' $ \db -> getCommandDataByCorrId db user corrId + case cmdData_ of + Just cmdData@CommandData {cmdId, cmdConnId = Just cmdConnId', cmdFunction} + | connId == cmdConnId' && agentMsgTag == commandExpectedResponse cmdFunction -> do + withStore' $ \db -> updateCommandStatus db user cmdId CSCompleted + action cmdData + | otherwise -> throwChatError . CEAgentCommandError $ "not matching connection id or unexpected response, details - connId = " <> show connId <> ", agentMsgTag = " <> show agentMsgTag <> ", cmdData " <> show cmdData + _ -> throwChatError . CEAgentCommandError $ "no connection or connection id, details - connId = " <> show connId <> ", agentMsgTag = " <> show agentMsgTag <> ", corrId = " <> commandId corrId - ackMsgDeliveryEvent :: Connection -> MsgMeta -> m () - ackMsgDeliveryEvent Connection {connId} MsgMeta {recipient = (msgId, _)} = - withStore $ \db -> createRcvMsgDeliveryEvent db connId msgId MDSRcvAcknowledged + createAckCmd :: Connection -> m CommandId + createAckCmd Connection {connId} = do + withStore' $ \db -> createCommand db user (Just connId) CFAckMessage + + withAckMessage :: ConnId -> CommandId -> MsgMeta -> m () -> m () + withAckMessage cId cmdId MsgMeta {recipient = (msgId, _)} action = + -- [async agent commands] command should be asynchronous, continuation is ackMsgDeliveryEvent + action `E.finally` withAgent (\a -> ackMessageAsync a (aCorrId cmdId) cId msgId `catchError` \_ -> pure ()) + + ackMsgDeliveryEvent :: Connection -> CommandId -> m () + ackMsgDeliveryEvent Connection {connId} ackCmdId = + withStore' $ \db -> createRcvMsgDeliveryEvent db connId ackCmdId MDSRcvAcknowledged sentMsgDeliveryEvent :: Connection -> AgentMsgId -> m () sentMsgDeliveryEvent Connection {connId} msgId = @@ -1889,10 +1960,13 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage unless cancelled $ if fName == fileName then - tryError (withAgent $ \a -> joinConnection a True fileConnReq . directMessage $ XOk) >>= \case - Right acId -> - withStore' $ \db -> createSndGroupFileTransferConnection db userId fileId acId m - Left e -> throwError e + tryError + -- [async agent commands] no continuation needed, but command should be asynchronous for stability + (joinAgentConnectionAsync user True fileConnReq . directMessage $ XOk) + >>= \case + Right connIds -> + withStore' $ \db -> createSndGroupFileTransferConnection db user fileId connIds m + Left e -> throwError e else messageError "x.file.acpt.inv: fileName is different from expected" groupMsgToView :: GroupInfo -> GroupMember -> ChatItem 'CTGroup 'MDRcv -> MsgMeta -> m () @@ -2084,24 +2158,29 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage groupMsgToView gInfo m ci msgMeta toView $ CRJoinedGroupMemberConnecting gInfo m newMember - xGrpMemIntro :: Connection -> GroupInfo -> GroupMember -> MemberInfo -> m () - xGrpMemIntro conn gInfo@GroupInfo {groupId, membership} m memInfo@(MemberInfo memId _ _) = do + xGrpMemIntro :: GroupInfo -> GroupMember -> MemberInfo -> m () + xGrpMemIntro gInfo@GroupInfo {membership} m memInfo@(MemberInfo memId _ _) = do case memberCategory m of GCHostMember -> do members <- withStore' $ \db -> getGroupMembers db user gInfo if isMember memId gInfo members then messageWarning "x.grp.mem.intro ignored: member already exists" else do - (groupConnId, groupConnReq) <- withAgent $ \a -> createConnection a True SCMInvitation - (directConnId, directConnReq) <- withAgent $ \a -> createConnection a True SCMInvitation + -- [async agent commands] commands should be asynchronous, continuation is to send XGrpMemInv - have to remember one has completed and process on second + groupConnIds <- createAgentConnectionAsync user True SCMInvitation + directConnIds <- createAgentConnectionAsync user True SCMInvitation -- [incognito] direct connection with member has to be established using the same incognito profile [that was known to host and used for group membership] let customUserProfileId = if memberIncognito membership then Just (localProfileId $ memberProfile membership) else Nothing - newMember <- withStore $ \db -> createIntroReMember db user gInfo m memInfo groupConnId directConnId customUserProfileId - let msg = XGrpMemInv memId IntroInvitation {groupConnReq, directConnReq} - void $ sendDirectMessage conn msg (GroupId groupId) - withStore' $ \db -> updateGroupMemberStatus db userId newMember GSMemIntroInvited + void $ withStore $ \db -> createIntroReMember db user gInfo m memInfo groupConnIds directConnIds customUserProfileId _ -> messageError "x.grp.mem.intro can be only sent by host member" + sendXGrpMemIntro :: Int64 -> ConnReqInvitation -> XGrpMemIntroCont -> m () + sendXGrpMemIntro hostConnId directConnReq XGrpMemIntroCont {groupId, groupMemberId, memberId, groupConnReq} = do + hostConn <- withStore $ \db -> getConnectionById db user hostConnId + let msg = XGrpMemInv memberId IntroInvitation {groupConnReq, directConnReq} + void $ sendDirectMessage hostConn msg (GroupId groupId) + withStore' $ \db -> updateGroupMemberStatusById db userId groupMemberId GSMemIntroInvited + xGrpMemInv :: GroupInfo -> GroupMember -> MemberId -> IntroInvitation -> m () xGrpMemInv gInfo m memId introInv = do case memberCategory m of @@ -2127,10 +2206,11 @@ processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage withStore' $ \db -> saveMemberInvitation db toMember introInv -- [incognito] send membership incognito profile, create direct connection as incognito let msg = XGrpMemInfo (memberId (membership :: GroupMember)) (fromLocalProfile $ memberProfile membership) - groupConnId <- withAgent $ \a -> joinConnection a True groupConnReq $ directMessage msg - directConnId <- withAgent $ \a -> joinConnection a True directConnReq $ directMessage msg + -- [async agent commands] no continuation needed, but commands should be asynchronous for stability + groupConnIds <- joinAgentConnectionAsync user True groupConnReq $ directMessage msg + directConnIds <- joinAgentConnectionAsync user True directConnReq $ directMessage msg let customUserProfileId = if memberIncognito membership then Just (localProfileId $ memberProfile membership) else Nothing - withStore' $ \db -> createIntroToMemberContact db userId m toMember groupConnId directConnId customUserProfileId + withStore' $ \db -> createIntroToMemberContact db user m toMember groupConnIds directConnIds customUserProfileId xGrpMemDel :: GroupInfo -> GroupMember -> MemberId -> RcvMessage -> MsgMeta -> m () xGrpMemDel gInfo@GroupInfo {membership} m memId msg msgMeta = do @@ -2384,12 +2464,12 @@ sendPendingGroupMessages GroupMember {groupMemberId, localDisplayName} conn = do Nothing -> throwChatError $ CEGroupMemberIntroNotFound localDisplayName Just introId -> withStore' $ \db -> updateIntroStatus db introId GMIntroInvForwarded -saveRcvMSG :: ChatMonad m => Connection -> ConnOrGroupId -> MsgMeta -> MsgBody -> m RcvMessage -saveRcvMSG Connection {connId} connOrGroupId agentMsgMeta msgBody = do +saveRcvMSG :: ChatMonad m => Connection -> ConnOrGroupId -> MsgMeta -> MsgBody -> CommandId -> m RcvMessage +saveRcvMSG Connection {connId} connOrGroupId agentMsgMeta msgBody agentAckCmdId = do ChatMessage {msgId = sharedMsgId_, chatMsgEvent} <- liftEither $ parseChatMessage msgBody let agentMsgId = fst $ recipient agentMsgMeta newMsg = NewMessage {chatMsgEvent, msgBody} - rcvMsgDelivery = RcvMsgDelivery {connId, agentMsgId, agentMsgMeta} + rcvMsgDelivery = RcvMsgDelivery {connId, agentMsgId, agentMsgMeta, agentAckCmdId} withStore' $ \db -> createNewMessageAndRcvMsgDelivery db connOrGroupId newMsg sharedMsgId_ rcvMsgDelivery saveSndChatItem :: ChatMonad m => User -> ChatDirection c 'MDSnd -> SndMessage -> CIContent 'MDSnd -> Maybe (CIFile 'MDSnd) -> Maybe (CIQuote c) -> m (ChatItem c 'MDSnd) @@ -2416,9 +2496,22 @@ mkChatItem cd ciId content file quotedItem sharedMsgId itemTs currentTs = do meta = mkCIMeta ciId content itemText ciStatusNew sharedMsgId False False tz currentTs itemTs currentTs currentTs pure ChatItem {chatDir = toCIDirection cd, meta, content, formattedText = parseMaybeMarkdownList itemText, quotedItem, file} -allowAgentConnection :: ChatMonad m => Connection -> ConfirmationId -> ChatMsgEvent -> m () -allowAgentConnection conn confId msg = do - withAgent $ \a -> allowConnection a (aConnId conn) confId $ directMessage msg +createAgentConnectionAsync :: forall m c. (ChatMonad m, ConnectionModeI c) => User -> Bool -> SConnectionMode c -> m (CommandId, ConnId) +createAgentConnectionAsync user enableNtfs cMode = do + cmdId <- withStore' $ \db -> createCommand db user Nothing CFCreateConn + connId <- withAgent $ \a -> createConnectionAsync a (aCorrId cmdId) enableNtfs cMode + pure (cmdId, connId) + +joinAgentConnectionAsync :: ChatMonad m => User -> Bool -> ConnectionRequestUri c -> ConnInfo -> m (CommandId, ConnId) +joinAgentConnectionAsync user enableNtfs cReqUri cInfo = do + cmdId <- withStore' $ \db -> createCommand db user Nothing CFJoinConn + connId <- withAgent $ \a -> joinConnectionAsync a (aCorrId cmdId) enableNtfs cReqUri cInfo + pure (cmdId, connId) + +allowAgentConnectionAsync :: ChatMonad m => User -> Connection -> ConfirmationId -> ChatMsgEvent -> m () +allowAgentConnectionAsync user conn@Connection {connId} confId msg = do + cmdId <- withStore' $ \db -> createCommand db user (Just connId) CFAllowConn + withAgent $ \a -> allowConnectionAsync a (aCorrId cmdId) (aConnId conn) confId $ directMessage msg withStore' $ \db -> updateConnectionStatus db conn ConnAccepted getCreateActiveUser :: SQLiteStore -> IO User diff --git a/src/Simplex/Chat/Controller.hs b/src/Simplex/Chat/Controller.hs index 9d4d1cae4b..b477376cac 100644 --- a/src/Simplex/Chat/Controller.hs +++ b/src/Simplex/Chat/Controller.hs @@ -445,6 +445,7 @@ data ChatErrorType | CEAgentVersion | CEAgentNoSubResult {agentConnId :: AgentConnId} | CECommandError {message :: String} + | CEAgentCommandError {message :: String} deriving (Show, Exception, Generic) instance ToJSON ChatErrorType where diff --git a/src/Simplex/Chat/Messages.hs b/src/Simplex/Chat/Messages.hs index 07f77478fb..fa431fa3e6 100644 --- a/src/Simplex/Chat/Messages.hs +++ b/src/Simplex/Chat/Messages.hs @@ -913,7 +913,8 @@ data SndMsgDelivery = SndMsgDelivery data RcvMsgDelivery = RcvMsgDelivery { connId :: Int64, agentMsgId :: AgentMsgId, - agentMsgMeta :: MsgMeta + agentMsgMeta :: MsgMeta, + agentAckCmdId :: CommandId } data MsgMetaJSON = MsgMetaJSON diff --git a/src/Simplex/Chat/Migrations/M20220909_commands.hs b/src/Simplex/Chat/Migrations/M20220909_commands.hs new file mode 100644 index 0000000000..73e5d5bc6e --- /dev/null +++ b/src/Simplex/Chat/Migrations/M20220909_commands.hs @@ -0,0 +1,24 @@ +{-# LANGUAGE QuasiQuotes #-} + +module Simplex.Chat.Migrations.M20220909_commands where + +import Database.SQLite.Simple (Query) +import Database.SQLite.Simple.QQ (sql) + +m20220909_commands :: Query +m20220909_commands = + [sql| +CREATE TABLE commands ( + command_id INTEGER PRIMARY KEY, -- used as ACorrId + connection_id INTEGER REFERENCES connections ON DELETE CASCADE, + command_function TEXT NOT NULL, + command_status TEXT NOT NULL, + user_id INTEGER NOT NULL REFERENCES users ON DELETE CASCADE, + created_at TEXT NOT NULL DEFAULT (datetime('now')), + updated_at TEXT NOT NULL DEFAULT (datetime('now')) +); + +ALTER TABLE msg_deliveries ADD COLUMN agent_ack_cmd_id INTEGER; -- correlation id + +ALTER TABLE connections ADD COLUMN conn_req_inv BLOB; +|] diff --git a/src/Simplex/Chat/Migrations/chat_schema.sql b/src/Simplex/Chat/Migrations/chat_schema.sql index 55bd57cec5..f1e589bc9f 100644 --- a/src/Simplex/Chat/Migrations/chat_schema.sql +++ b/src/Simplex/Chat/Migrations/chat_schema.sql @@ -243,6 +243,7 @@ CREATE TABLE connections( via_user_contact_link INTEGER DEFAULT NULL REFERENCES user_contact_links(user_contact_link_id) ON DELETE SET NULL, custom_user_profile_id INTEGER REFERENCES contact_profiles ON DELETE SET NULL, + conn_req_inv BLOB, FOREIGN KEY(snd_file_id, connection_id) REFERENCES snd_files(file_id, connection_id) ON DELETE CASCADE @@ -299,7 +300,8 @@ CREATE TABLE msg_deliveries( agent_msg_meta TEXT, -- JSON with timestamps etc. sent in MSG, NULL for sent chat_ts TEXT NOT NULL DEFAULT(datetime('now')), created_at TEXT CHECK(created_at NOT NULL), - updated_at TEXT CHECK(updated_at NOT NULL), -- broker_ts for received, created_at for sent + updated_at TEXT CHECK(updated_at NOT NULL), + agent_ack_cmd_id INTEGER, -- broker_ts for received, created_at for sent UNIQUE(connection_id, agent_msg_id) ); CREATE TABLE msg_delivery_events( @@ -400,3 +402,12 @@ CREATE INDEX idx_chat_items_contacts ON chat_items( contact_id, chat_item_id ); +CREATE TABLE commands( + command_id INTEGER PRIMARY KEY, -- used as ACorrId + connection_id INTEGER REFERENCES connections ON DELETE CASCADE, + command_function TEXT NOT NULL, + command_status TEXT NOT NULL, + user_id INTEGER NOT NULL REFERENCES users ON DELETE CASCADE, + created_at TEXT NOT NULL DEFAULT(datetime('now')), + updated_at TEXT NOT NULL DEFAULT(datetime('now')) +); diff --git a/src/Simplex/Chat/Store.hs b/src/Simplex/Chat/Store.hs index 7130420aa3..e9e8c826ea 100644 --- a/src/Simplex/Chat/Store.hs +++ b/src/Simplex/Chat/Store.hs @@ -59,6 +59,7 @@ module Simplex.Chat.Store getPendingContactConnections, getContactConnections, getConnectionEntity, + getConnectionById, getConnectionsContacts, getGroupAndMember, updateConnectionStatus, @@ -83,6 +84,7 @@ module Simplex.Chat.Store getMemberInvitation, createMemberConnection, updateGroupMemberStatus, + updateGroupMemberStatusById, createNewGroupMember, deleteGroupMember, deleteGroupMemberConnection, @@ -175,6 +177,13 @@ module Simplex.Chat.Store createCall, deleteCalls, getCalls, + createCommand, + setCommandConnId, + updateCommandStatus, + getCommandDataByCorrId, + setConnConnReqInv, + getXGrpMemIntroContDirect, + getXGrpMemIntroContGroup, getPendingContactConnection, deletePendingContactConnection, updateContactSettings, @@ -234,9 +243,10 @@ import Simplex.Chat.Migrations.M20220818_chat_notifications import Simplex.Chat.Migrations.M20220822_groups_host_conn_custom_user_profile_id import Simplex.Chat.Migrations.M20220823_delete_broken_group_event_chat_items import Simplex.Chat.Migrations.M20220824_profiles_local_alias +import Simplex.Chat.Migrations.M20220909_commands import Simplex.Chat.Protocol import Simplex.Chat.Types -import Simplex.Messaging.Agent.Protocol (AgentMsgId, ConnId, InvitationId, MsgMeta (..)) +import Simplex.Messaging.Agent.Protocol (ACorrId, AgentMsgId, ConnId, InvitationId, MsgMeta (..)) import Simplex.Messaging.Agent.Store.SQLite (SQLiteStore (..), createSQLiteStore, firstRow, firstRow', maybeFirstRow, withTransaction) import Simplex.Messaging.Agent.Store.SQLite.Migrations (Migration (..)) import qualified Simplex.Messaging.Crypto as C @@ -268,7 +278,8 @@ schemaMigrations = ("20220818_chat_notifications", m20220818_chat_notifications), ("20220822_groups_host_conn_custom_user_profile_id", m20220822_groups_host_conn_custom_user_profile_id), ("20220823_delete_broken_group_event_chat_items", m20220823_delete_broken_group_event_chat_items), - ("20220824_profiles_local_alias", m20220824_profiles_local_alias) + ("20220824_profiles_local_alias", m20220824_profiles_local_alias), + ("20220909_commands", m20220909_commands) ] -- | The list of migrations in ascending order by date @@ -1268,6 +1279,19 @@ getConnectionEntity db user@User {userId, userContactId} agentConnId = do userContact_ [Only cReq] = Right UserContact {userContactLinkId, connReqContact = cReq} userContact_ _ = Left SEUserContactLinkNotFound +getConnectionById :: DB.Connection -> User -> Int64 -> ExceptT StoreError IO Connection +getConnectionById db User {userId} connId = ExceptT $ do + firstRow toConnection (SEConnectionNotFoundById connId) $ + DB.query + db + [sql| + SELECT connection_id, agent_conn_id, conn_level, via_contact, via_user_contact_link, custom_user_profile_id, + conn_status, conn_type, contact_id, group_member_id, snd_file_id, rcv_file_id, user_contact_link_id, created_at + FROM connections + WHERE user_id = ? AND connection_id = ? + |] + (userId, connId) + getConnectionsContacts :: DB.Connection -> UserId -> [ConnId] -> IO [ContactRef] getConnectionsContacts db userId agentConnIds = do DB.execute_ db "DROP TABLE IF EXISTS temp.conn_ids" @@ -1658,7 +1682,10 @@ createMemberConnection db userId GroupMember {groupMemberId} agentConnId = do void $ createMemberConnection_ db userId groupMemberId agentConnId Nothing 0 currentTs updateGroupMemberStatus :: DB.Connection -> UserId -> GroupMember -> GroupMemberStatus -> IO () -updateGroupMemberStatus db userId GroupMember {groupMemberId} memStatus = do +updateGroupMemberStatus db userId GroupMember {groupMemberId} = updateGroupMemberStatusById db userId groupMemberId + +updateGroupMemberStatusById :: DB.Connection -> UserId -> GroupMemberId -> GroupMemberStatus -> IO () +updateGroupMemberStatusById db userId groupMemberId memStatus = do currentTs <- getCurrentTime DB.execute db @@ -1779,7 +1806,7 @@ saveIntroInvitation db reMember toMember introInv = do WHERE group_member_intro_id = :intro_id |] [ ":intro_status" := GMIntroInvReceived, - ":group_queue_info" := groupConnReq introInv, + ":group_queue_info" := groupConnReq (introInv :: IntroInvitation), ":direct_queue_info" := directConnReq introInv, ":updated_at" := currentTs, ":intro_id" := introId intro @@ -1824,11 +1851,12 @@ getIntroduction_ db reMember toMember = ExceptT $ do in Right GroupMemberIntro {introId, reMember, toMember, introStatus, introInvitation} toIntro _ = Left SEIntroNotFound -createIntroReMember :: DB.Connection -> User -> GroupInfo -> GroupMember -> MemberInfo -> ConnId -> ConnId -> Maybe ProfileId -> ExceptT StoreError IO GroupMember -createIntroReMember db user@User {userId} gInfo@GroupInfo {groupId} _host@GroupMember {memberContactId, activeConn} memInfo@(MemberInfo _ _ memberProfile) groupAgentConnId directAgentConnId customUserProfileId = do +createIntroReMember :: DB.Connection -> User -> GroupInfo -> GroupMember -> MemberInfo -> (CommandId, ConnId) -> (CommandId, ConnId) -> Maybe ProfileId -> ExceptT StoreError IO GroupMember +createIntroReMember db user@User {userId} gInfo@GroupInfo {groupId} _host@GroupMember {memberContactId, activeConn} memInfo@(MemberInfo _ _ memberProfile) (groupCmdId, groupAgentConnId) (directCmdId, directAgentConnId) customUserProfileId = do let cLevel = 1 + maybe 0 (connLevel :: Connection -> Int) activeConn currentTs <- liftIO getCurrentTime Connection {connId = directConnId} <- liftIO $ createConnection_ db userId ConnContact Nothing directAgentConnId memberContactId Nothing customUserProfileId cLevel currentTs + liftIO $ setCommandConnId db user directCmdId directConnId (localDisplayName, contactId, memProfileId) <- createContact_ db userId directConnId memberProfile (Just groupId) currentTs liftIO $ do let newMember = @@ -1842,15 +1870,18 @@ createIntroReMember db user@User {userId} gInfo@GroupInfo {groupId} _host@GroupM memProfileId } member <- createNewMember_ db user gInfo newMember currentTs - conn <- createMemberConnection_ db userId (groupMemberId' member) groupAgentConnId memberContactId cLevel currentTs + conn@Connection {connId = groupConnId} <- createMemberConnection_ db userId (groupMemberId' member) groupAgentConnId memberContactId cLevel currentTs + liftIO $ setCommandConnId db user groupCmdId groupConnId pure (member :: GroupMember) {activeConn = Just conn} -createIntroToMemberContact :: DB.Connection -> UserId -> GroupMember -> GroupMember -> ConnId -> ConnId -> Maybe ProfileId -> IO () -createIntroToMemberContact db userId GroupMember {memberContactId = viaContactId, activeConn} _to@GroupMember {groupMemberId, localDisplayName} groupAgentConnId directAgentConnId customUserProfileId = do +createIntroToMemberContact :: DB.Connection -> User -> GroupMember -> GroupMember -> (CommandId, ConnId) -> (CommandId, ConnId) -> Maybe ProfileId -> IO () +createIntroToMemberContact db user@User {userId} GroupMember {memberContactId = viaContactId, activeConn} _to@GroupMember {groupMemberId, localDisplayName} (groupCmdId, groupAgentConnId) (directCmdId, directAgentConnId) customUserProfileId = do let cLevel = 1 + maybe 0 (connLevel :: Connection -> Int) activeConn currentTs <- getCurrentTime - void $ createMemberConnection_ db userId groupMemberId groupAgentConnId viaContactId cLevel currentTs + Connection {connId = groupConnId} <- createMemberConnection_ db userId groupMemberId groupAgentConnId viaContactId cLevel currentTs + setCommandConnId db user groupCmdId groupConnId Connection {connId = directConnId} <- createConnection_ db userId ConnContact Nothing directAgentConnId viaContactId Nothing customUserProfileId cLevel currentTs + setCommandConnId db user directCmdId directConnId contactId <- createMemberContact_ directConnId currentTs updateMember_ contactId currentTs where @@ -1978,10 +2009,11 @@ createSndGroupFileTransfer db userId GroupInfo {groupId} filePath FileInvitation (userId, groupId, fileName, filePath, fileSize, chunkSize, CIFSSndStored, currentTs, currentTs) insertedRowId db -createSndGroupFileTransferConnection :: DB.Connection -> UserId -> Int64 -> ConnId -> GroupMember -> IO () -createSndGroupFileTransferConnection db userId fileId acId GroupMember {groupMemberId} = do +createSndGroupFileTransferConnection :: DB.Connection -> User -> Int64 -> (CommandId, ConnId) -> GroupMember -> IO () +createSndGroupFileTransferConnection db user@User {userId} fileId (cmdId, acId) GroupMember {groupMemberId} = do currentTs <- getCurrentTime Connection {connId} <- createSndFileConnection_ db userId fileId acId + setCommandConnId db user cmdId connId DB.execute db "INSERT INTO snd_files (file_id, file_status, connection_id, group_member_id, created_at, updated_at) VALUES (?,?,?,?,?,?)" @@ -2431,7 +2463,7 @@ createSndMsgDelivery db sndMsgDelivery messageId = do createMsgDeliveryEvent_ db msgDeliveryId MDSSndAgent currentTs createNewMessageAndRcvMsgDelivery :: DB.Connection -> ConnOrGroupId -> NewMessage -> Maybe SharedMsgId -> RcvMsgDelivery -> IO RcvMessage -createNewMessageAndRcvMsgDelivery db connOrGroupId NewMessage {chatMsgEvent, msgBody} sharedMsgId_ RcvMsgDelivery {connId, agentMsgId, agentMsgMeta} = do +createNewMessageAndRcvMsgDelivery db connOrGroupId NewMessage {chatMsgEvent, msgBody} sharedMsgId_ RcvMsgDelivery {connId, agentMsgId, agentMsgMeta, agentAckCmdId} = do currentTs <- getCurrentTime DB.execute db @@ -2440,8 +2472,8 @@ createNewMessageAndRcvMsgDelivery db connOrGroupId NewMessage {chatMsgEvent, msg msgId <- insertedRowId db DB.execute db - "INSERT INTO msg_deliveries (message_id, connection_id, agent_msg_id, agent_msg_meta, chat_ts, created_at, updated_at) VALUES (?,?,?,?,?,?,?)" - (msgId, connId, agentMsgId, msgMetaJson agentMsgMeta, snd $ broker agentMsgMeta, currentTs, currentTs) + "INSERT INTO msg_deliveries (message_id, connection_id, agent_msg_id, agent_msg_meta, agent_ack_cmd_id, chat_ts, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?)" + (msgId, connId, agentMsgId, msgMetaJson agentMsgMeta, agentAckCmdId, snd $ broker agentMsgMeta, currentTs, currentTs) msgDeliveryId <- insertedRowId db createMsgDeliveryEvent_ db msgDeliveryId MDSRcvAgent currentTs pure RcvMessage {msgId, chatMsgEvent, sharedMsgId_, msgBody} @@ -2457,12 +2489,12 @@ createSndMsgDeliveryEvent db connId agentMsgId sndMsgDeliveryStatus = do currentTs <- getCurrentTime createMsgDeliveryEvent_ db msgDeliveryId sndMsgDeliveryStatus currentTs -createRcvMsgDeliveryEvent :: DB.Connection -> Int64 -> AgentMsgId -> MsgDeliveryStatus 'MDRcv -> ExceptT StoreError IO () -createRcvMsgDeliveryEvent db connId agentMsgId rcvMsgDeliveryStatus = do - msgDeliveryId <- getMsgDeliveryId_ db connId agentMsgId - liftIO $ do +createRcvMsgDeliveryEvent :: DB.Connection -> Int64 -> CommandId -> MsgDeliveryStatus 'MDRcv -> IO () +createRcvMsgDeliveryEvent db connId cmdId rcvMsgDeliveryStatus = do + msgDeliveryId <- getMsgDeliveryIdByCmdId_ db connId cmdId + forM_ msgDeliveryId $ \mdId -> do currentTs <- getCurrentTime - createMsgDeliveryEvent_ db msgDeliveryId rcvMsgDeliveryStatus currentTs + createMsgDeliveryEvent_ db mdId rcvMsgDeliveryStatus currentTs createSndMsgDelivery_ :: DB.Connection -> SndMsgDelivery -> MessageId -> UTCTime -> IO Int64 createSndMsgDelivery_ db SndMsgDelivery {connId, agentMsgId} messageId createdAt = do @@ -2500,6 +2532,19 @@ getMsgDeliveryId_ db connId agentMsgId = |] (connId, agentMsgId) +getMsgDeliveryIdByCmdId_ :: DB.Connection -> Int64 -> CommandId -> IO (Maybe AgentMsgId) +getMsgDeliveryIdByCmdId_ db connId cmdId = + maybeFirstRow fromOnly $ + DB.query + db + [sql| + SELECT msg_delivery_id + FROM msg_deliveries + WHERE connection_id = ? AND agent_ack_cmd_id = ? + LIMIT 1 + |] + (connId, cmdId) + createPendingGroupMessage :: DB.Connection -> Int64 -> MessageId -> Maybe Int64 -> IO () createPendingGroupMessage db groupMemberId messageId introId_ = do currentTs <- getCurrentTime @@ -3850,6 +3895,129 @@ getCalls db User {userId} = do toCall :: (ContactId, CallId, ChatItemId, CallState, UTCTime) -> Call toCall (contactId, callId, chatItemId, callState, callTs) = Call {contactId, callId, chatItemId, callState, callTs} +createCommand :: DB.Connection -> User -> Maybe Int64 -> CommandFunction -> IO CommandId +createCommand db User {userId} connId commandFunction = do + currentTs <- getCurrentTime + DB.execute + db + [sql| + INSERT INTO commands (connection_id, command_function, command_status, user_id, created_at, updated_at) + VALUES (?,?,?,?,?,?) + |] + (connId, commandFunction, CSCreated, userId, currentTs, currentTs) + insertedRowId db + +setCommandConnId :: DB.Connection -> User -> CommandId -> Int64 -> IO () +setCommandConnId db User {userId} cmdId connId = do + updatedAt <- getCurrentTime + DB.execute + db + [sql| + UPDATE commands + SET connection_id = ?, updated_at = ? + WHERE user_id = ? AND command_id = ? + |] + (connId, updatedAt, userId, cmdId) + +updateCommandStatus :: DB.Connection -> User -> CommandId -> CommandStatus -> IO () +updateCommandStatus db User {userId} cmdId status = do + updatedAt <- getCurrentTime + DB.execute + db + [sql| + UPDATE commands + SET command_status = ?, updated_at = ? + WHERE user_id = ? AND command_id = ? + |] + (status, updatedAt, userId, cmdId) + +getCommandDataByCorrId :: DB.Connection -> User -> ACorrId -> IO (Maybe CommandData) +getCommandDataByCorrId db User {userId} corrId = + maybeFirstRow toCommandData $ + DB.query + db + [sql| + SELECT command_id, connection_id, command_function, command_status + FROM commands + WHERE user_id = ? AND command_id = ? + |] + (userId, commandId corrId) + where + toCommandData :: (CommandId, Maybe Int64, CommandFunction, CommandStatus) -> CommandData + toCommandData (cmdId, cmdConnId, cmdFunction, cmdStatus) = CommandData {cmdId, cmdConnId, cmdFunction, cmdStatus} + +setConnConnReqInv :: DB.Connection -> User -> Int64 -> ConnReqInvitation -> IO () +setConnConnReqInv db User {userId} connId connReq = do + updatedAt <- getCurrentTime + DB.execute + db + [sql| + UPDATE connections + SET conn_req_inv = ?, updated_at = ? + WHERE user_id = ? AND connection_id = ? + |] + (connReq, updatedAt, userId, connId) + +getXGrpMemIntroContDirect :: DB.Connection -> User -> Contact -> IO (Maybe (Int64, XGrpMemIntroCont)) +getXGrpMemIntroContDirect db User {userId} Contact {contactId} = do + fmap join . maybeFirstRow toCont $ + DB.query + db + [sql| + SELECT ch.connection_id, g.group_id, m.group_member_id, m.member_id, c.conn_req_inv + FROM contacts ct + JOIN group_members m ON m.contact_id = ct.contact_id + LEFT JOIN connections c ON c.connection_id = ( + SELECT MAX(cc.connection_id) + FROM connections cc + WHERE cc.group_member_id = m.group_member_id + ) + JOIN groups g ON g.group_id = m.group_id AND g.group_id = ct.via_group + JOIN group_members mh ON mh.group_id = g.group_id + LEFT JOIN connections ch ON ch.connection_id = ( + SELECT max(cc.connection_id) + FROM connections cc + where cc.group_member_id = mh.group_member_id + ) + WHERE ct.user_id = ? AND ct.contact_id = ? AND mh.member_category = ? + |] + (userId, contactId, GCHostMember) + where + toCont :: (Int64, GroupId, GroupMemberId, MemberId, Maybe ConnReqInvitation) -> Maybe (Int64, XGrpMemIntroCont) + toCont (hostConnId, groupId, groupMemberId, memberId, connReq_) = case connReq_ of + Just groupConnReq -> Just (hostConnId, XGrpMemIntroCont {groupId, groupMemberId, memberId, groupConnReq}) + _ -> Nothing + +getXGrpMemIntroContGroup :: DB.Connection -> User -> GroupMember -> IO (Maybe (Int64, ConnReqInvitation)) +getXGrpMemIntroContGroup db User {userId} GroupMember {groupMemberId} = do + fmap join . maybeFirstRow toCont $ + DB.query + db + [sql| + SELECT ch.connection_id, c.conn_req_inv + FROM group_members m + JOIN contacts ct ON ct.contact_id = m.contact_id + LEFT JOIN connections c ON c.connection_id = ( + SELECT MAX(cc.connection_id) + FROM connections cc + WHERE cc.contact_id = ct.contact_id + ) + JOIN groups g ON g.group_id = m.group_id AND g.group_id = ct.via_group + JOIN group_members mh ON mh.group_id = g.group_id + LEFT JOIN connections ch ON ch.connection_id = ( + SELECT max(cc.connection_id) + FROM connections cc + where cc.group_member_id = mh.group_member_id + ) + WHERE m.user_id = ? AND m.group_member_id = ? AND mh.member_category = ? + |] + (userId, groupMemberId, GCHostMember) + where + toCont :: (Int64, Maybe ConnReqInvitation) -> Maybe (Int64, ConnReqInvitation) + toCont (hostConnId, connReq_) = case connReq_ of + Just connReq -> Just (hostConnId, connReq) + _ -> Nothing + -- | Saves unique local display name based on passed displayName, suffixed with _N if required. -- This function should be called inside transaction. withLocalDisplayName :: forall a. DB.Connection -> UserId -> Text -> (Text -> IO (Either StoreError a)) -> IO (Either StoreError a) @@ -3936,6 +4104,7 @@ data StoreError | SESharedMsgIdNotFoundByFileId {fileId :: FileTransferId} | SEFileIdNotFoundBySharedMsgId {sharedMsgId :: SharedMsgId} | SEConnectionNotFound {agentConnId :: AgentConnId} + | SEConnectionNotFoundById {connId :: Int64} | SEPendingConnectionNotFound {connId :: Int64} | SEIntroNotFound | SEUniqueID diff --git a/src/Simplex/Chat/Types.hs b/src/Simplex/Chat/Types.hs index c2416cdd5a..3e1df4464c 100644 --- a/src/Simplex/Chat/Types.hs +++ b/src/Simplex/Chat/Types.hs @@ -20,7 +20,7 @@ import qualified Data.Aeson as J import qualified Data.Aeson.Encoding as JE import qualified Data.Aeson.Types as JT import qualified Data.Attoparsec.ByteString.Char8 as A -import Data.ByteString.Char8 (ByteString) +import Data.ByteString.Char8 (ByteString, pack, unpack) import qualified Data.ByteString.Char8 as B import Data.Int (Int64) import Data.Maybe (isJust) @@ -34,7 +34,7 @@ import Database.SQLite.Simple.Internal (Field (..)) import Database.SQLite.Simple.Ok (Ok (Ok)) import Database.SQLite.Simple.ToField (ToField (..)) import GHC.Generics (Generic) -import Simplex.Messaging.Agent.Protocol (ConnId, ConnectionMode (..), ConnectionRequestUri, InvitationId) +import Simplex.Messaging.Agent.Protocol (ACommandTag (..), ACorrId, AParty (..), ConnId, ConnectionMode (..), ConnectionRequestUri, InvitationId) import Simplex.Messaging.Encoding.String import Simplex.Messaging.Parsers (dropPrefix, fromTextField_, sumTypeJSON) import Simplex.Messaging.Util ((<$?>)) @@ -915,3 +915,77 @@ type JSONString = String textParseJSON :: TextEncoding a => String -> J.Value -> JT.Parser a textParseJSON name = J.withText name $ maybe (fail $ "bad " <> name) pure . textDecode + +type CommandId = Int64 + +aCorrId :: CommandId -> ACorrId +aCorrId = pack . show + +commandId :: ACorrId -> String +commandId = unpack + +data CommandStatus + = CSCreated + | CSCompleted + deriving (Show, Generic) + +instance FromField CommandStatus where fromField = fromTextField_ textDecode + +instance ToField CommandStatus where toField = toField . textEncode + +instance TextEncoding CommandStatus where + textDecode = \case + "created" -> Just CSCreated + "completed" -> Just CSCompleted + _ -> Nothing + textEncode = \case + CSCreated -> "created" + CSCompleted -> "completed" + +data CommandFunction + = CFCreateConn + | CFJoinConn + | CFAllowConn + | CFAckMessage + deriving (Eq, Show, Generic) + +instance FromField CommandFunction where fromField = fromTextField_ textDecode + +instance ToField CommandFunction where toField = toField . textEncode + +instance TextEncoding CommandFunction where + textDecode = \case + "create_conn" -> Just CFCreateConn + "join_conn" -> Just CFJoinConn + "allow_conn" -> Just CFAllowConn + "ack_message" -> Just CFAckMessage + _ -> Nothing + textEncode = \case + CFCreateConn -> "create_conn" + CFJoinConn -> "join_conn" + CFAllowConn -> "allow_conn" + CFAckMessage -> "ack_message" + +commandExpectedResponse :: CommandFunction -> ACommandTag 'Agent +commandExpectedResponse = \case + CFCreateConn -> INV_ + CFJoinConn -> OK_ + CFAllowConn -> OK_ + CFAckMessage -> OK_ + +data CommandData = CommandData + { cmdId :: CommandId, + cmdConnId :: Maybe Int64, + cmdFunction :: CommandFunction, + cmdStatus :: CommandStatus + } + deriving (Show) + +-- ad-hoc type for data required for XGrpMemIntro continuation +data XGrpMemIntroCont = XGrpMemIntroCont + { groupId :: GroupId, + groupMemberId :: GroupMemberId, + memberId :: MemberId, + groupConnReq :: ConnReqInvitation + } + deriving (Show) diff --git a/src/Simplex/Chat/View.hs b/src/Simplex/Chat/View.hs index fbf9878c65..f9e5ef4ad1 100644 --- a/src/Simplex/Chat/View.hs +++ b/src/Simplex/Chat/View.hs @@ -926,6 +926,7 @@ viewChatError = \case CEAgentVersion -> ["unsupported agent version"] CEAgentNoSubResult connId -> ["no subscription result for connection: " <> sShow connId] CECommandError e -> ["bad chat command: " <> plain e] + CEAgentCommandError e -> ["agent command error: " <> plain e] -- e -> ["chat error: " <> sShow e] ChatErrorStore err -> case err of SEDuplicateName -> ["this display name is already used by user, contact or group"] diff --git a/stack.yaml b/stack.yaml index ef535363c4..ad0e40179a 100644 --- a/stack.yaml +++ b/stack.yaml @@ -49,7 +49,7 @@ extra-deps: # - simplexmq-1.0.0@sha256:34b2004728ae396e3ae449cd090ba7410781e2b3cefc59259915f4ca5daa9ea8,8561 # - ../simplexmq - github: simplex-chat/simplexmq - commit: 50c210c5c0c7f792c39123c2177bb60b307295b9 + commit: afecefc3adee044545144a269ad7df7b738af65a # - ../direct-sqlcipher - github: simplex-chat/direct-sqlcipher commit: 34309410eb2069b029b8fc1872deb1e0db123294 diff --git a/tests/ChatClient.hs b/tests/ChatClient.hs index 00197bc58b..c2329e4ab4 100644 --- a/tests/ChatClient.hs +++ b/tests/ChatClient.hs @@ -8,7 +8,7 @@ module ChatClient where -import Control.Concurrent (ThreadId, forkIOWithUnmask, killThread) +import Control.Concurrent (ThreadId, forkIO, forkIOWithUnmask, killThread, threadDelay) import Control.Concurrent.Async import Control.Concurrent.STM import Control.Exception (bracket, bracket_) @@ -129,9 +129,10 @@ startTestChat_ st cfg opts dbFilePrefix user = do stopTestChat :: TestCC -> IO () stopTestChat TestCC {chatController = cc, chatAsync, termAsync} = do - stopChatController cc + void . forkIO $ stopChatController cc uninterruptibleCancel termAsync uninterruptibleCancel chatAsync + threadDelay 100000 withNewTestChat :: String -> Profile -> (TestCC -> IO a) -> IO a withNewTestChat = withNewTestChatCfgOpts testCfg testOpts