core: batch send pending group messages (#4242)

This commit is contained in:
spaced4ndy
2024-05-28 18:32:29 +04:00
committed by GitHub
parent 3d395b0e45
commit 2143eb2d7a
7 changed files with 30 additions and 39 deletions

View File

@@ -1743,7 +1743,6 @@ public enum ChatErrorType: Decodable {
case groupMemberNotActive
case groupMemberUserRemoved
case groupMemberNotFound
case groupMemberIntroNotFound(contactName: ContactName)
case groupCantResendInvitation(groupInfo: GroupInfo, contactName: ContactName)
case groupInternal(message: String)
case fileNotFound(message: String)

View File

@@ -4942,7 +4942,6 @@ sealed class ChatErrorType {
is GroupMemberNotActive -> "groupMemberNotActive"
is GroupMemberUserRemoved -> "groupMemberUserRemoved"
is GroupMemberNotFound -> "groupMemberNotFound"
is GroupMemberIntroNotFound -> "groupMemberIntroNotFound"
is GroupCantResendInvitation -> "groupCantResendInvitation"
is GroupInternal -> "groupInternal"
is FileNotFound -> "fileNotFound"
@@ -5022,7 +5021,6 @@ sealed class ChatErrorType {
@Serializable @SerialName("groupMemberNotActive") object GroupMemberNotActive: ChatErrorType()
@Serializable @SerialName("groupMemberUserRemoved") object GroupMemberUserRemoved: ChatErrorType()
@Serializable @SerialName("groupMemberNotFound") object GroupMemberNotFound: ChatErrorType()
@Serializable @SerialName("groupMemberIntroNotFound") class GroupMemberIntroNotFound(val contactName: String): ChatErrorType()
@Serializable @SerialName("groupCantResendInvitation") class GroupCantResendInvitation(val groupInfo: GroupInfo, val contactName: String): ChatErrorType()
@Serializable @SerialName("groupInternal") class GroupInternal(val message: String): ChatErrorType()
@Serializable @SerialName("fileNotFound") class FileNotFound(val message: String): ChatErrorType()

View File

@@ -6597,16 +6597,20 @@ sendGroupMemberMessages user conn events groupId = do
let idsEvts = L.map (GroupId groupId,) events
(errs, msgs) <- lift $ partitionEithers . L.toList <$> createSndMessages idsEvts
unless (null errs) $ toView $ CRChatErrors (Just user) errs
forM_ (L.nonEmpty msgs) $ \msgs' -> do
-- TODO v5.7 based on version (?)
-- let shouldCompress = False
-- let batched = if shouldCompress then batchSndMessagesBinary msgs' else batchSndMessagesJSON msgs'
let batched = batchSndMessagesJSON msgs'
let (errs', msgBatches) = partitionEithers batched
-- shouldn't happen, as large messages would have caused createNewSndMessage to throw SELargeMsg
unless (null errs') $ toView $ CRChatErrors (Just user) errs'
forM_ msgBatches $ \batch ->
processSndMessageBatch conn batch `catchChatError` (toView . CRChatError (Just user))
forM_ (L.nonEmpty msgs) $ \msgs' ->
batchSendGroupMemberMessages user conn msgs'
batchSendGroupMemberMessages :: User -> Connection -> NonEmpty SndMessage -> CM ()
batchSendGroupMemberMessages user conn msgs = do
-- TODO v5.7 based on version (?)
-- let shouldCompress = False
-- let batched = if shouldCompress then batchSndMessagesBinary msgs' else batchSndMessagesJSON msgs'
let batched = batchSndMessagesJSON msgs
let (errs', msgBatches) = partitionEithers batched
-- shouldn't happen, as large messages would have caused createNewSndMessage to throw SELargeMsg
unless (null errs') $ toView $ CRChatErrors (Just user) errs'
forM_ msgBatches $ \batch ->
processSndMessageBatch conn batch `catchChatError` (toView . CRChatError (Just user))
processSndMessageBatch :: Connection -> MsgBatch -> CM ()
processSndMessageBatch conn@Connection {connId} (MsgBatch batchBody sndMsgs) = do
@@ -6795,21 +6799,20 @@ sendGroupMemberMessage user m@GroupMember {groupMemberId} chatMsgEvent groupId i
MSASend conn -> deliverMessage conn (toCMEventTag chatMsgEvent) msgBody msgId >> postDeliver
MSAPending -> withStore' $ \db -> createPendingGroupMessage db groupMemberId msgId introId_
-- TODO ensure order - pending messages interleave with user input messages
sendPendingGroupMessages :: User -> GroupMember -> Connection -> CM ()
sendPendingGroupMessages user GroupMember {groupMemberId, localDisplayName} conn = do
pendingMessages <- withStore' $ \db -> getPendingGroupMessages db groupMemberId
-- TODO ensure order - pending messages interleave with user input messages
forM_ pendingMessages $ \pgm ->
processPendingMessage pgm `catchChatError` (toView . CRChatError (Just user))
sendPendingGroupMessages user GroupMember {groupMemberId} conn = do
pgms <- withStore' $ \db -> getPendingGroupMessages db groupMemberId
forM_ (L.nonEmpty pgms) $ \pgms' -> do
let msgs = L.map (\(sndMsg, _, _) -> sndMsg) pgms'
batchSendGroupMemberMessages user conn msgs
lift . void . withStoreBatch' $ \db -> L.map (\SndMessage {msgId} -> deletePendingGroupMessage db groupMemberId msgId) msgs
lift . void . withStoreBatch' $ \db -> L.map (\(_, tag, introId_) -> updateIntro_ db tag introId_) pgms'
where
processPendingMessage PendingGroupMessage {msgId, cmEventTag = ACMEventTag _ tag, msgBody, introId_} = do
void $ deliverMessage conn tag msgBody msgId
withStore' $ \db -> deletePendingGroupMessage db groupMemberId msgId
case tag of
XGrpMemFwd_ -> case introId_ of
Just introId -> withStore' $ \db -> updateIntroStatus db introId GMIntroInvForwarded
_ -> throwChatError $ CEGroupMemberIntroNotFound localDisplayName
_ -> pure ()
updateIntro_ :: DB.Connection -> ACMEventTag -> Maybe Int64 -> IO ()
updateIntro_ db tag introId_ = case (tag, introId_) of
(ACMEventTag _ XGrpMemFwd_, Just introId) -> updateIntroStatus db introId GMIntroInvForwarded
_ -> pure ()
-- TODO [batch send] refactor direct message processing same as groups (e.g. checkIntegrity before processing)
saveDirectRcvMSG :: Connection -> MsgMeta -> MsgBody -> CM (Connection, RcvMessage)

View File

@@ -1118,7 +1118,6 @@ data ChatErrorType
| CECantBlockMemberForSelf {groupInfo :: GroupInfo, member :: GroupMember, setShowMessages :: Bool}
| CEGroupMemberUserRemoved
| CEGroupMemberNotFound
| CEGroupMemberIntroNotFound {contactName :: ContactName}
| CEGroupCantResendInvitation {groupInfo :: GroupInfo, contactName :: ContactName}
| CEGroupInternal {message :: String}
| CEFileNotFound {message :: String}

View File

@@ -945,13 +945,6 @@ data RcvMessage = RcvMessage
forwardedByMember :: Maybe GroupMemberId
}
data PendingGroupMessage = PendingGroupMessage
{ msgId :: MessageId,
cmEventTag :: ACMEventTag,
msgBody :: MsgBody,
introId_ :: Maybe Int64
}
type MessageId = Int64
data ConnOrGroupId = ConnectionId Int64 | GroupId Int64

View File

@@ -285,13 +285,13 @@ createPendingGroupMessage db groupMemberId messageId introId_ = do
|]
(groupMemberId, messageId, introId_, currentTs, currentTs)
getPendingGroupMessages :: DB.Connection -> Int64 -> IO [PendingGroupMessage]
getPendingGroupMessages :: DB.Connection -> Int64 -> IO [(SndMessage, ACMEventTag, Maybe Int64)]
getPendingGroupMessages db groupMemberId =
map pendingGroupMessage
<$> DB.query
db
[sql|
SELECT pgm.message_id, m.chat_msg_event, m.msg_body, pgm.group_member_intro_id
SELECT pgm.message_id, m.shared_msg_id, m.msg_body, m.chat_msg_event, pgm.group_member_intro_id
FROM pending_group_messages pgm
JOIN messages m USING (message_id)
WHERE pgm.group_member_id = ?
@@ -299,8 +299,8 @@ getPendingGroupMessages db groupMemberId =
|]
(Only groupMemberId)
where
pendingGroupMessage (msgId, cmEventTag, msgBody, introId_) =
PendingGroupMessage {msgId, cmEventTag, msgBody, introId_}
pendingGroupMessage (msgId, sharedMsgId, msgBody, cmEventTag, introId_) =
(SndMessage {msgId, sharedMsgId, msgBody}, cmEventTag, introId_)
deletePendingGroupMessage :: DB.Connection -> Int64 -> MessageId -> IO ()
deletePendingGroupMessage db groupMemberId messageId =

View File

@@ -1954,7 +1954,6 @@ viewChatError isCmd logLevel testView = \case
]
CEGroupMemberUserRemoved -> ["you are no longer a member of the group"]
CEGroupMemberNotFound -> ["group doesn't have this member"]
CEGroupMemberIntroNotFound c -> ["group member intro not found for " <> ttyContact c]
CEGroupCantResendInvitation g c -> viewCannotResendInvitation g c
CEGroupInternal s -> ["chat group bug: " <> plain s]
CEFileNotFound f -> ["file not found: " <> plain f]