store messages (#166)

Co-authored-by: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com>
This commit is contained in:
Efim Poberezkin
2021-12-29 23:11:55 +04:00
committed by GitHub
parent a7703209f2
commit 81f29d679b
7 changed files with 612 additions and 93 deletions
+97 -55
View File
@@ -51,6 +51,7 @@ import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), defaultAgentConfig)
import Simplex.Messaging.Agent.Protocol
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Parsers (parseAll)
import Simplex.Messaging.Protocol (MsgBody)
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.Util (bshow, raceAny_, tryError)
import System.Exit (exitFailure, exitSuccess)
@@ -262,7 +263,7 @@ processChatCommand user@User {userId, profile} = \case
(agentConnId, cReq) <- withAgent (`createConnection` SCMInvitation)
GroupMember {memberId} <- withStore $ \st -> createContactGroupMember st gVar user groupId contact memRole agentConnId
let msg = XGrpInv $ GroupInvitation (userMemberId, userRole) (memberId, memRole) cReq groupProfile
sendDirectMessage (contactConnId contact) msg
sendDirectMessage (contactConn contact) msg
showSentGroupInvitation gName cName
setActive $ ActiveG gName
JoinGroup gName -> do
@@ -306,7 +307,6 @@ processChatCommand user@User {userId, profile} = \case
showGroupMembers group
ListGroups -> withStore (`getUserGroupNames` userId) >>= showGroupsList
SendGroupMessage gName msg -> do
-- TODO save sent messages
-- TODO save pending message delivery for members without connections
Group {members, membership} <- withStore $ \st -> getGroup st user gName
unless (memberActive membership) $ chatError CEGroupMemberUserRemoved
@@ -320,7 +320,7 @@ processChatCommand user@User {userId, profile} = \case
let fileInv = FileInvitation {fileName = takeFileName f, fileSize, fileConnReq}
SndFileTransfer {fileId} <- withStore $ \st ->
createSndFileTransfer st userId contact f fileInv agentConnId chSize
sendDirectMessage (contactConnId contact) $ XFile fileInv
sendDirectMessage (contactConn contact) $ XFile fileInv
showSentFileInfo fileId
setActive $ ActiveC cName
SendGroupFile gName f -> do
@@ -332,8 +332,9 @@ processChatCommand user@User {userId, profile} = \case
(connId, fileConnReq) <- withAgent (`createConnection` SCMInvitation)
pure (m, connId, FileInvitation {fileName, fileSize, fileConnReq})
fileId <- withStore $ \st -> createSndGroupFileTransfer st userId group ms f fileSize chSize
-- TODO sendGroupMessage - same file invitation to all
forM_ ms $ \(m, _, fileInv) ->
traverse (`sendDirectMessage` XFile fileInv) $ memberConnId m
traverse (`sendDirectMessage` XFile fileInv) $ memberConn m
showSentFileInfo fileId
setActive $ ActiveG gName
ReceiveFile fileId filePath_ -> do
@@ -361,7 +362,7 @@ processChatCommand user@User {userId, profile} = \case
user' <- withStore $ \st -> updateUserProfile st user p
asks currentUser >>= atomically . (`writeTVar` user')
contacts <- withStore (`getUserContacts` user)
forM_ contacts $ \ct -> sendDirectMessage (contactConnId ct) $ XInfo p
forM_ contacts $ \ct -> sendDirectMessage (contactConn ct) $ XInfo p
showUserProfileUpdated user user'
ShowProfile -> showUserProfile profile
QuitChat -> liftIO exitSuccess
@@ -375,7 +376,7 @@ processChatCommand user@User {userId, profile} = \case
sendMessageCmd cName msg = do
contact <- withStore $ \st -> getContact st userId cName
let msgEvent = XMsgNew $ MsgContent MTText [] [MsgContentBody {contentType = SimplexContentType XCText, contentData = msg}]
sendDirectMessage (contactConnId contact) msgEvent
sendDirectMessage (contactConn contact) msgEvent
setActive $ ActiveC cName
contactMember :: Contact -> [GroupMember] -> Maybe GroupMember
contactMember Contact {contactId} =
@@ -530,21 +531,27 @@ processAgentMessage user@User {userId, profile} agentConnId agentMessage = do
allowAgentConnection conn confId $ XInfo profile
INFO connInfo ->
saveConnInfo conn connInfo
MSG meta _ ->
MSG meta msgBody -> do
_ <- saveRcvMSG conn meta msgBody
withAckMessage agentConnId meta $ pure ()
ackMsgDeliveryEvent conn meta
SENT msgId ->
sentMsgDeliveryEvent conn msgId
_ -> pure ()
Just ct@Contact {localDisplayName = c} -> case agentMsg of
MSG meta msgBody -> withAckMessage agentConnId meta $ do
ChatMessage {chatMsgEvent} <- liftEither $ parseChatMessage msgBody
case chatMsgEvent of
XMsgNew (MsgContent MTText [] body) -> newTextMessage c meta $ find (isSimplexContentType XCText) body
XFile fInv -> processFileInvitation ct meta fInv
XInfo p -> xInfo ct p
XGrpInv gInv -> processGroupInvitation ct gInv
XInfoProbe probe -> xInfoProbe ct probe
XInfoProbeCheck probeHash -> xInfoProbeCheck ct probeHash
XInfoProbeOk probe -> xInfoProbeOk ct probe
_ -> pure ()
MSG meta msgBody -> do
chatMsgEvent <- saveRcvMSG conn meta msgBody
withAckMessage agentConnId meta $
case chatMsgEvent of
XMsgNew (MsgContent MTText [] body) -> newTextMessage c meta $ find (isSimplexContentType XCText) body
XFile fInv -> processFileInvitation ct meta fInv
XInfo p -> xInfo ct p
XGrpInv gInv -> processGroupInvitation ct gInv
XInfoProbe probe -> xInfoProbe ct probe
XInfoProbeCheck probeHash -> xInfoProbeCheck ct probeHash
XInfoProbeOk probe -> xInfoProbeOk ct probe
_ -> pure ()
ackMsgDeliveryEvent conn meta
CONF confId connInfo -> do
-- confirming direct connection with a member
ChatMessage {chatMsgEvent} <- liftEither $ parseChatMessage connInfo
@@ -576,6 +583,8 @@ processAgentMessage user@User {userId, profile} agentConnId agentMessage = do
when (memberIsReady m) $ do
notifyMemberConnected gName m
when (memberCategory m == GCPreMember) $ probeMatchingContacts ct
SENT msgId ->
sentMsgDeliveryEvent conn msgId
END -> do
showContactAnotherClient c
showToast (c <> "> ") "connected to another client"
@@ -641,7 +650,7 @@ processAgentMessage user@User {userId, profile} agentConnId agentMessage = do
intros <- withStore $ \st -> createIntroductions st group m
sendGroupMessage members . XGrpMemNew $ memberInfo m
forM_ intros $ \intro -> do
sendDirectMessage agentConnId . XGrpMemIntro . memberInfo $ reMember intro
sendDirectMessage conn . XGrpMemIntro . memberInfo $ reMember intro
withStore $ \st -> updateIntroStatus st intro GMIntroSent
_ -> do
-- TODO send probe and decide whether to use existing contact connection or the new contact connection
@@ -654,20 +663,24 @@ processAgentMessage user@User {userId, profile} agentConnId agentMessage = do
when (contactIsReady ct) $ do
notifyMemberConnected gName m
when (memberCategory m == GCPreMember) $ probeMatchingContacts ct
MSG meta msgBody -> withAckMessage agentConnId meta $ do
ChatMessage {chatMsgEvent} <- liftEither $ parseChatMessage msgBody
case chatMsgEvent of
XMsgNew (MsgContent MTText [] body) ->
newGroupTextMessage gName m meta $ find (isSimplexContentType XCText) body
XFile fInv -> processGroupFileInvitation gName m meta fInv
XGrpMemNew memInfo -> xGrpMemNew gName m memInfo
XGrpMemIntro memInfo -> xGrpMemIntro gName m memInfo
XGrpMemInv memId introInv -> xGrpMemInv gName m memId introInv
XGrpMemFwd memInfo introInv -> xGrpMemFwd gName m memInfo introInv
XGrpMemDel memId -> xGrpMemDel gName m memId
XGrpLeave -> xGrpLeave gName m
XGrpDel -> xGrpDel gName m
_ -> messageError $ "unsupported message: " <> T.pack (show chatMsgEvent)
MSG meta msgBody -> do
chatMsgEvent <- saveRcvMSG conn meta msgBody
withAckMessage agentConnId meta $
case chatMsgEvent of
XMsgNew (MsgContent MTText [] body) ->
newGroupTextMessage gName m meta $ find (isSimplexContentType XCText) body
XFile fInv -> processGroupFileInvitation gName m meta fInv
XGrpMemNew memInfo -> xGrpMemNew gName m memInfo
XGrpMemIntro memInfo -> xGrpMemIntro conn gName m memInfo
XGrpMemInv memId introInv -> xGrpMemInv gName m memId introInv
XGrpMemFwd memInfo introInv -> xGrpMemFwd gName m memInfo introInv
XGrpMemDel memId -> xGrpMemDel gName m memId
XGrpLeave -> xGrpLeave gName m
XGrpDel -> xGrpDel gName m
_ -> messageError $ "unsupported message: " <> T.pack (show chatMsgEvent)
ackMsgDeliveryEvent conn meta
SENT msgId ->
sentMsgDeliveryEvent conn msgId
_ -> pure ()
processSndFileConn :: ACommand 'Agent -> Connection -> SndFileTransfer -> m ()
@@ -676,6 +689,7 @@ processAgentMessage user@User {userId, profile} agentConnId agentMessage = do
CONF confId connInfo -> do
ChatMessage {chatMsgEvent} <- liftEither $ parseChatMessage connInfo
case chatMsgEvent of
-- TODO save XFileAcpt message
XFileAcpt name
| name == fileName -> do
withStore $ \st -> updateSndFileStatus st ft FSAccepted
@@ -755,6 +769,14 @@ processAgentMessage user@User {userId, profile} agentConnId agentMessage = do
withAckMessage cId MsgMeta {recipient = (msgId, _)} action =
action `E.finally` withAgent (\a -> ackMessage a cId msgId `catchError` \_ -> pure ())
ackMsgDeliveryEvent :: Connection -> MsgMeta -> m ()
ackMsgDeliveryEvent Connection {connId} MsgMeta {recipient = (msgId, _)} =
withStore $ \st -> createRcvMsgDeliveryEvent st connId msgId MDSRcvAcknowledged
sentMsgDeliveryEvent :: Connection -> AgentMsgId -> m ()
sentMsgDeliveryEvent Connection {connId} msgId =
withStore $ \st -> createSndMsgDeliveryEvent st connId msgId MDSSndSent
badRcvFileChunk :: RcvFileTransfer -> String -> m ()
badRcvFileChunk ft@RcvFileTransfer {fileStatus} err =
case fileStatus of
@@ -773,13 +795,13 @@ processAgentMessage user@User {userId, profile} agentConnId agentMessage = do
probeMatchingContacts ct = do
gVar <- asks idsDrg
(probe, probeId) <- withStore $ \st -> createSentProbe st gVar userId ct
sendDirectMessage (contactConnId ct) $ XInfoProbe probe
sendDirectMessage (contactConn ct) $ XInfoProbe probe
cs <- withStore (\st -> getMatchingContacts st userId ct)
let probeHash = C.sha256Hash probe
forM_ cs $ \c -> sendProbeHash c probeHash probeId `catchError` const (pure ())
where
sendProbeHash c probeHash probeId = do
sendDirectMessage (contactConnId c) $ XInfoProbeCheck probeHash
sendDirectMessage (contactConn c) $ XInfoProbeCheck probeHash
withStore $ \st -> createSentProbeHash st userId probeId c
messageWarning :: Text -> m ()
@@ -792,7 +814,7 @@ processAgentMessage user@User {userId, profile} agentConnId agentMessage = do
newTextMessage c meta = \case
Just MsgContentBody {contentData = bs} -> do
let text = safeDecodeUtf8 bs
showReceivedMessage c (snd $ broker meta) (msgPlain text) (integrity meta)
showReceivedMessage c (snd $ broker meta) (msgPlain text) (integrity (meta :: MsgMeta))
showToast (c <> "> ") text
setActive $ ActiveC c
_ -> messageError "x.msg.new: no expected message body"
@@ -801,7 +823,7 @@ processAgentMessage user@User {userId, profile} agentConnId agentMessage = do
newGroupTextMessage gName GroupMember {localDisplayName = c} meta = \case
Just MsgContentBody {contentData = bs} -> do
let text = safeDecodeUtf8 bs
showReceivedGroupMessage gName c (snd $ broker meta) (msgPlain text) (integrity meta)
showReceivedGroupMessage gName c (snd $ broker meta) (msgPlain text) (integrity (meta :: MsgMeta))
showToast ("#" <> gName <> " " <> c <> "> ") text
setActive $ ActiveG gName
_ -> messageError "x.msg.new: no expected message body"
@@ -811,14 +833,14 @@ processAgentMessage user@User {userId, profile} agentConnId agentMessage = do
-- TODO chunk size has to be sent as part of invitation
chSize <- asks $ fileChunkSize . config
ft <- withStore $ \st -> createRcvFileTransfer st userId contact fInv chSize
showReceivedMessage c (snd $ broker meta) (receivedFileInvitation ft) (integrity meta)
showReceivedMessage c (snd $ broker meta) (receivedFileInvitation ft) (integrity (meta :: MsgMeta))
setActive $ ActiveC c
processGroupFileInvitation :: GroupName -> GroupMember -> MsgMeta -> FileInvitation -> m ()
processGroupFileInvitation gName m@GroupMember {localDisplayName = c} meta fInv = do
chSize <- asks $ fileChunkSize . config
ft <- withStore $ \st -> createRcvGroupFileTransfer st userId m fInv chSize
showReceivedGroupMessage gName c (snd $ broker meta) (receivedFileInvitation ft) (integrity meta)
showReceivedGroupMessage gName c (snd $ broker meta) (receivedFileInvitation ft) (integrity (meta :: MsgMeta))
setActive $ ActiveG gName
processGroupInvitation :: Contact -> GroupInvitation -> m ()
@@ -846,7 +868,7 @@ processAgentMessage user@User {userId, profile} agentConnId agentMessage = do
probeMatch :: Contact -> Contact -> ByteString -> m ()
probeMatch c1@Contact {profile = p1} c2@Contact {profile = p2} probe =
when (p1 == p2) $ do
sendDirectMessage (contactConnId c1) $ XInfoProbeOk probe
sendDirectMessage (contactConn c1) $ XInfoProbeOk probe
mergeContacts c1 c2
xInfoProbeOk :: Contact -> ByteString -> m ()
@@ -859,9 +881,6 @@ processAgentMessage user@User {userId, profile} agentConnId agentMessage = do
withStore $ \st -> mergeContactRecords st userId to from
showContactsMerged to from
parseChatMessage :: ByteString -> Either ChatError ChatMessage
parseChatMessage msgBody = first ChatErrorMessage (parseAll rawChatMessageP msgBody >>= toChatMessage)
saveConnInfo :: Connection -> ConnInfo -> m ()
saveConnInfo activeConn connInfo = do
ChatMessage {chatMsgEvent} <- liftEither $ parseChatMessage connInfo
@@ -881,8 +900,8 @@ processAgentMessage user@User {userId, profile} agentConnId agentMessage = do
newMember <- withStore $ \st -> createNewGroupMember st user group memInfo GCPostMember GSMemAnnounced
showJoinedGroupMemberConnecting gName m newMember
xGrpMemIntro :: GroupName -> GroupMember -> MemberInfo -> m ()
xGrpMemIntro gName m memInfo@(MemberInfo memId _ _) =
xGrpMemIntro :: Connection -> GroupName -> GroupMember -> MemberInfo -> m ()
xGrpMemIntro conn gName m memInfo@(MemberInfo memId _ _) =
case memberCategory m of
GCHostMember -> do
group <- withStore $ \st -> getGroup st user gName
@@ -893,7 +912,7 @@ processAgentMessage user@User {userId, profile} agentConnId agentMessage = do
(directConnId, directConnReq) <- withAgent (`createConnection` SCMInvitation)
newMember <- withStore $ \st -> createIntroReMember st user group m memInfo groupConnId directConnId
let msg = XGrpMemInv memId IntroInvitation {groupConnReq, directConnReq}
sendDirectMessage agentConnId msg
sendDirectMessage conn msg
withStore $ \st -> updateGroupMemberStatus st userId newMember GSMemIntroInvited
_ -> messageError "x.grp.mem.intro can be only sent by host member"
@@ -908,8 +927,8 @@ processAgentMessage user@User {userId, profile} agentConnId agentMessage = do
intro <- withStore $ \st -> saveIntroInvitation st reMember m introInv
case activeConn (reMember :: GroupMember) of
Nothing -> pure () -- this is not an error, introduction will be forwarded once the member is connected
Just Connection {agentConnId = reAgentConnId} -> do
sendDirectMessage reAgentConnId $ XGrpMemFwd (memberInfo m) introInv
Just reConn -> do
sendDirectMessage reConn $ XGrpMemFwd (memberInfo m) introInv
withStore $ \st -> updateIntroStatus st intro GMIntroInvForwarded
_ -> messageError "x.grp.mem.inv can be only sent by invitee member"
@@ -964,6 +983,9 @@ processAgentMessage user@User {userId, profile} agentConnId agentMessage = do
mapM_ deleteMemberConnection ms
showGroupDeleted gName m
parseChatMessage :: ByteString -> Either ChatError ChatMessage
parseChatMessage msgBody = first ChatErrorMessage (parseAll rawChatMessageP msgBody >>= toChatMessage)
sendFileChunk :: ChatMonad m => SndFileTransfer -> m ()
sendFileChunk ft@SndFileTransfer {fileId, fileStatus, agentConnId} =
unless (fileStatus == FSComplete || fileStatus == FSCancelled) $
@@ -1069,22 +1091,42 @@ deleteMemberConnection m@GroupMember {activeConn} = do
-- withStore $ \st -> deleteGroupMemberConnection st userId m
forM_ activeConn $ \conn -> withStore $ \st -> updateConnectionStatus st conn ConnDeleted
sendDirectMessage :: ChatMonad m => ConnId -> ChatMsgEvent -> m ()
sendDirectMessage agentConnId chatMsgEvent =
void . withAgent $ \a -> sendMessage a agentConnId $ directMessage chatMsgEvent
sendDirectMessage :: ChatMonad m => Connection -> ChatMsgEvent -> m ()
sendDirectMessage conn chatMsgEvent = do
let msgBody = directMessage chatMsgEvent
newMsg = NewMessage {direction = MDSnd, chatMsgEventType = toChatEventType chatMsgEvent, msgBody}
-- can be done in transaction after sendMessage, probably shouldn't
msgId <- withStore $ \st -> createNewMessage st newMsg
deliverMessage conn msgBody msgId
directMessage :: ChatMsgEvent -> ByteString
directMessage chatMsgEvent =
serializeRawChatMessage $
rawChatMessage ChatMessage {chatMsgId = Nothing, chatMsgEvent, chatDAG = Nothing}
deliverMessage :: ChatMonad m => Connection -> MsgBody -> MessageId -> m ()
deliverMessage Connection {connId, agentConnId} msgBody msgId = do
agentMsgId <- withAgent $ \a -> sendMessage a agentConnId msgBody
let sndMsgDelivery = SndMsgDelivery {connId, agentMsgId}
withStore $ \st -> createSndMsgDelivery st sndMsgDelivery msgId
sendGroupMessage :: ChatMonad m => [GroupMember] -> ChatMsgEvent -> m ()
sendGroupMessage members chatMsgEvent = do
let msg = directMessage chatMsgEvent
let msgBody = directMessage chatMsgEvent
newMsg = NewMessage {direction = MDSnd, chatMsgEventType = toChatEventType chatMsgEvent, msgBody}
msgId <- withStore $ \st -> createNewMessage st newMsg
-- TODO once scheduled delivery is implemented memberActive should be changed to memberCurrent
withAgent $ \a ->
forM_ (filter memberActive members) $
traverse (\connId -> sendMessage a connId msg) . memberConnId
forM_ (map memberConn $ filter memberActive members) $
traverse (\conn -> deliverMessage conn msgBody msgId)
saveRcvMSG :: ChatMonad m => Connection -> MsgMeta -> MsgBody -> m ChatMsgEvent
saveRcvMSG Connection {connId} agentMsgMeta msgBody = do
ChatMessage {chatMsgEvent} <- liftEither $ parseChatMessage msgBody
let newMsg = NewMessage {direction = MDRcv, chatMsgEventType = toChatEventType chatMsgEvent, msgBody}
agentMsgId = fst $ recipient agentMsgMeta
rcvMsgDelivery = RcvMsgDelivery {connId, agentMsgId, agentMsgMeta}
withStore $ \st -> createNewMessageAndRcvMsgDelivery st newMsg rcvMsgDelivery
pure chatMsgEvent
allowAgentConnection :: ChatMonad m => Connection -> ConfirmationId -> ChatMsgEvent -> m ()
allowAgentConnection conn@Connection {agentConnId} confId msg = do