Merge branch 'master' into master-ghc8107

This commit is contained in:
Evgeny Poberezkin
2023-12-23 13:52:44 +00:00
57 changed files with 1833 additions and 515 deletions
+386 -248
View File
@@ -28,6 +28,7 @@ import Data.Bifunctor (bimap, first)
import Data.ByteArray (ScrubbedBytes)
import qualified Data.ByteArray as BA
import qualified Data.ByteString.Base64 as B64
import Data.ByteString.Builder (toLazyByteString)
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy.Char8 as LB
@@ -37,20 +38,19 @@ import Data.Either (fromRight, lefts, partitionEithers, rights)
import Data.Fixed (div')
import Data.Functor (($>))
import Data.Int (Int64)
import Data.List (find, foldl', isSuffixOf, partition, sortBy, sortOn)
import Data.List.NonEmpty (NonEmpty, nonEmpty)
import Data.List (find, foldl', isSuffixOf, partition, sortOn)
import Data.List.NonEmpty (NonEmpty (..), nonEmpty, toList, (<|))
import qualified Data.List.NonEmpty as L
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing, listToMaybe, mapMaybe, maybeToList)
import Data.Ord (comparing)
import Data.Text (Text)
import qualified Data.Text as T
import Data.Text.Encoding (decodeLatin1, encodeUtf8)
import Data.Time (NominalDiffTime, addUTCTime, defaultTimeLocale, formatTime)
import Data.Time.Clock (UTCTime, diffUTCTime, getCurrentTime, nominalDay, nominalDiffTimeToSeconds)
import Data.Time.Clock.System (systemToUTCTime)
import Data.Word (Word16, Word32)
import Data.Word (Word32)
import qualified Database.SQLite.Simple as SQL
import Simplex.Chat.Archive
import Simplex.Chat.Call
@@ -58,6 +58,7 @@ import Simplex.Chat.Controller
import Simplex.Chat.Files
import Simplex.Chat.Markdown
import Simplex.Chat.Messages
import Simplex.Chat.Messages.Batch (MsgBatch (..), batchMessages)
import Simplex.Chat.Messages.CIContent
import Simplex.Chat.Messages.CIContent.Events
import Simplex.Chat.Options
@@ -76,7 +77,7 @@ import Simplex.Chat.Store.Shared
import Simplex.Chat.Types
import Simplex.Chat.Types.Preferences
import Simplex.Chat.Types.Util
import Simplex.Chat.Util (encryptFile)
import Simplex.Chat.Util (encryptFile, shuffle)
import Simplex.FileTransfer.Client.Main (maxFileSize)
import Simplex.FileTransfer.Client.Presets (defaultXFTPServers)
import Simplex.FileTransfer.Description (ValidFileDescription, gb, kb, mb)
@@ -196,79 +197,84 @@ createChatDatabase filePrefix key keepKey confirmMigrations = runExceptT $ do
agentStore <- ExceptT $ createAgentStore (agentStoreFile filePrefix) key keepKey confirmMigrations
pure ChatDatabase {chatStore, agentStore}
newChatController :: ChatDatabase -> Maybe User -> ChatConfig -> ChatOpts -> IO ChatController
newChatController ChatDatabase {chatStore, agentStore} user cfg@ChatConfig {agentConfig = aCfg, defaultServers, inlineFiles, tempDir, deviceNameForRemote} ChatOpts {coreOptions = CoreChatOpts {smpServers, xftpServers, networkConfig, logLevel, logConnections, logServerHosts, logFile, tbqSize, highlyAvailable}, deviceName, optFilesFolder, showReactions, allowInstantFiles, autoAcceptFileSize} = do
let inlineFiles' = if allowInstantFiles || autoAcceptFileSize > 0 then inlineFiles else inlineFiles {sendChunks = 0, receiveInstant = False}
config = cfg {logLevel, showReactions, tbqSize, subscriptionEvents = logConnections, hostEvents = logServerHosts, defaultServers = configServers, inlineFiles = inlineFiles', autoAcceptFileSize, highlyAvailable}
firstTime = dbNew chatStore
currentUser <- newTVarIO user
currentRemoteHost <- newTVarIO Nothing
servers <- agentServers config
smpAgent <- getSMPAgentClient aCfg {tbqSize} servers agentStore
agentAsync <- newTVarIO Nothing
random <- liftIO C.newRandom
inputQ <- newTBQueueIO tbqSize
outputQ <- newTBQueueIO tbqSize
connNetworkStatuses <- atomically TM.empty
subscriptionMode <- newTVarIO SMSubscribe
chatLock <- newEmptyTMVarIO
sndFiles <- newTVarIO M.empty
rcvFiles <- newTVarIO M.empty
currentCalls <- atomically TM.empty
localDeviceName <- newTVarIO $ fromMaybe deviceNameForRemote deviceName
multicastSubscribers <- newTMVarIO 0
remoteSessionSeq <- newTVarIO 0
remoteHostSessions <- atomically TM.empty
remoteHostsFolder <- newTVarIO Nothing
remoteCtrlSession <- newTVarIO Nothing
filesFolder <- newTVarIO optFilesFolder
chatStoreChanged <- newTVarIO False
expireCIThreads <- newTVarIO M.empty
expireCIFlags <- newTVarIO M.empty
cleanupManagerAsync <- newTVarIO Nothing
timedItemThreads <- atomically TM.empty
showLiveItems <- newTVarIO False
encryptLocalFiles <- newTVarIO False
userXFTPFileConfig <- newTVarIO $ xftpFileConfig cfg
tempDirectory <- newTVarIO tempDir
contactMergeEnabled <- newTVarIO True
pure
ChatController
{ firstTime,
currentUser,
currentRemoteHost,
smpAgent,
agentAsync,
chatStore,
chatStoreChanged,
random,
inputQ,
outputQ,
connNetworkStatuses,
subscriptionMode,
chatLock,
sndFiles,
rcvFiles,
currentCalls,
localDeviceName,
multicastSubscribers,
remoteSessionSeq,
remoteHostSessions,
remoteHostsFolder,
remoteCtrlSession,
config,
filesFolder,
expireCIThreads,
expireCIFlags,
cleanupManagerAsync,
timedItemThreads,
showLiveItems,
encryptLocalFiles,
userXFTPFileConfig,
tempDirectory,
logFilePath = logFile,
contactMergeEnabled
}
newChatController :: ChatDatabase -> Maybe User -> ChatConfig -> ChatOpts -> Bool -> IO ChatController
newChatController
ChatDatabase {chatStore, agentStore}
user
cfg@ChatConfig {agentConfig = aCfg, defaultServers, inlineFiles, tempDir, deviceNameForRemote}
ChatOpts {coreOptions = CoreChatOpts {smpServers, xftpServers, networkConfig, logLevel, logConnections, logServerHosts, logFile, tbqSize, highlyAvailable}, deviceName, optFilesFolder, showReactions, allowInstantFiles, autoAcceptFileSize}
backgroundMode = do
let inlineFiles' = if allowInstantFiles || autoAcceptFileSize > 0 then inlineFiles else inlineFiles {sendChunks = 0, receiveInstant = False}
config = cfg {logLevel, showReactions, tbqSize, subscriptionEvents = logConnections, hostEvents = logServerHosts, defaultServers = configServers, inlineFiles = inlineFiles', autoAcceptFileSize, highlyAvailable}
firstTime = dbNew chatStore
currentUser <- newTVarIO user
currentRemoteHost <- newTVarIO Nothing
servers <- agentServers config
smpAgent <- getSMPAgentClient aCfg {tbqSize} servers agentStore backgroundMode
agentAsync <- newTVarIO Nothing
random <- liftIO C.newRandom
inputQ <- newTBQueueIO tbqSize
outputQ <- newTBQueueIO tbqSize
connNetworkStatuses <- atomically TM.empty
subscriptionMode <- newTVarIO SMSubscribe
chatLock <- newEmptyTMVarIO
sndFiles <- newTVarIO M.empty
rcvFiles <- newTVarIO M.empty
currentCalls <- atomically TM.empty
localDeviceName <- newTVarIO $ fromMaybe deviceNameForRemote deviceName
multicastSubscribers <- newTMVarIO 0
remoteSessionSeq <- newTVarIO 0
remoteHostSessions <- atomically TM.empty
remoteHostsFolder <- newTVarIO Nothing
remoteCtrlSession <- newTVarIO Nothing
filesFolder <- newTVarIO optFilesFolder
chatStoreChanged <- newTVarIO False
expireCIThreads <- newTVarIO M.empty
expireCIFlags <- newTVarIO M.empty
cleanupManagerAsync <- newTVarIO Nothing
timedItemThreads <- atomically TM.empty
showLiveItems <- newTVarIO False
encryptLocalFiles <- newTVarIO False
userXFTPFileConfig <- newTVarIO $ xftpFileConfig cfg
tempDirectory <- newTVarIO tempDir
contactMergeEnabled <- newTVarIO True
pure
ChatController
{ firstTime,
currentUser,
currentRemoteHost,
smpAgent,
agentAsync,
chatStore,
chatStoreChanged,
random,
inputQ,
outputQ,
connNetworkStatuses,
subscriptionMode,
chatLock,
sndFiles,
rcvFiles,
currentCalls,
localDeviceName,
multicastSubscribers,
remoteSessionSeq,
remoteHostSessions,
remoteHostsFolder,
remoteCtrlSession,
config,
filesFolder,
expireCIThreads,
expireCIFlags,
cleanupManagerAsync,
timedItemThreads,
showLiveItems,
encryptLocalFiles,
userXFTPFileConfig,
tempDirectory,
logFilePath = logFile,
contactMergeEnabled
}
where
configServers :: DefaultAgentServers
configServers =
@@ -601,7 +607,7 @@ processChatCommand = \case
<$> withConnection st (readTVarIO . DB.slow)
APIGetChats {userId, pendingConnections, pagination, query} -> withUserId' userId $ \user -> do
(errs, previews) <- partitionEithers <$> withStore' (\db -> getChatPreviews db user pendingConnections pagination query)
toView $ CRChatErrors (Just user) (map ChatErrorStore errs)
unless (null errs) $ toView $ CRChatErrors (Just user) (map ChatErrorStore errs)
pure $ CRApiChats user previews
APIGetChat (ChatRef cType cId) pagination search -> withUser $ \user -> case cType of
-- TODO optimize queries calculating ChatStats, currently they're disabled
@@ -682,7 +688,7 @@ processChatCommand = \case
withStore $ \db -> getDirectChatItem db user chatId quotedItemId
(origQmc, qd, sent) <- quoteData qci
let msgRef = MsgRef {msgId = itemSharedMsgId, sentAt = itemTs, sent, memberId = Nothing}
qmc = quoteContent origQmc file
qmc = quoteContent mc origQmc file
quotedItem = CIQuote {chatDir = qd, itemId = Just quotedItemId, sharedMsgId = itemSharedMsgId, sentAt = itemTs, content = qmc, formattedText}
pure (MCQuote QuotedMsg {msgRef, content = qmc} (ExtMsgContent mc fInv_ (ttl' <$> timed_) (justTrue live)), Just quotedItem)
where
@@ -696,22 +702,22 @@ processChatCommand = \case
assertUserGroupRole gInfo GRAuthor
send g
where
send g@(Group gInfo@GroupInfo {groupId, membership} ms)
send g@(Group gInfo@GroupInfo {groupId} ms)
| isVoice mc && not (groupFeatureAllowed SGFVoice gInfo) = notAllowedError GFVoice
| not (isVoice mc) && isJust file_ && not (groupFeatureAllowed SGFFiles gInfo) = notAllowedError GFFiles
| otherwise = do
(fInv_, ciFile_, ft_) <- unzipMaybe3 <$> setupSndFileTransfer g (length $ filter memberCurrent ms)
timed_ <- sndGroupCITimed live gInfo itemTTL
(msgContainer, quotedItem_) <- prepareMsg fInv_ timed_ membership
(msg@SndMessage {sharedMsgId}, sentToMembers) <- sendGroupMessage user gInfo ms (XMsgNew msgContainer)
ci <- saveSndChatItem' user (CDGroupSnd gInfo) msg (CISndMsgContent mc) ciFile_ quotedItem_ timed_ live
withStore' $ \db ->
forM_ sentToMembers $ \GroupMember {groupMemberId} ->
createGroupSndStatus db (chatItemId' ci) groupMemberId CISSndNew
mapM_ (sendGroupFileInline ms sharedMsgId) ft_
forM_ (timed_ >>= timedDeleteAt') $
startProximateTimedItemThread user (ChatRef CTGroup groupId, chatItemId' ci)
pure $ CRNewChatItem user (AChatItem SCTGroup SMDSnd (GroupChat gInfo) ci)
(fInv_, ciFile_, ft_) <- unzipMaybe3 <$> setupSndFileTransfer g (length $ filter memberCurrent ms)
timed_ <- sndGroupCITimed live gInfo itemTTL
(msgContainer, quotedItem_) <- prepareGroupMsg user gInfo mc quotedItemId_ fInv_ timed_ live
(msg@SndMessage {sharedMsgId}, sentToMembers) <- sendGroupMessage user gInfo ms (XMsgNew msgContainer)
ci <- saveSndChatItem' user (CDGroupSnd gInfo) msg (CISndMsgContent mc) ciFile_ quotedItem_ timed_ live
withStore' $ \db ->
forM_ sentToMembers $ \GroupMember {groupMemberId} ->
createGroupSndStatus db (chatItemId' ci) groupMemberId CISSndNew
mapM_ (sendGroupFileInline ms sharedMsgId) ft_
forM_ (timed_ >>= timedDeleteAt') $
startProximateTimedItemThread user (ChatRef CTGroup groupId, chatItemId' ci)
pure $ CRNewChatItem user (AChatItem SCTGroup SMDSnd (GroupChat gInfo) ci)
notAllowedError f = pure $ chatCmdError (Just user) ("feature not allowed " <> T.unpack (groupFeatureNameText f))
setupSndFileTransfer :: Group -> Int -> m (Maybe (FileInvitation, CIFile 'MDSnd, FileTransferMeta))
setupSndFileTransfer g@(Group gInfo _) n = forM file_ $ \file -> do
@@ -742,51 +748,9 @@ processChatCommand = \case
void . withStore' $ \db -> createSndGroupInlineFT db m conn ft
sendMemberFileInline m conn ft sharedMsgId
processMember _ = pure ()
prepareMsg :: Maybe FileInvitation -> Maybe CITimed -> GroupMember -> m (MsgContainer, Maybe (CIQuote 'CTGroup))
prepareMsg fInv_ timed_ membership = case quotedItemId_ of
Nothing -> pure (MCSimple (ExtMsgContent mc fInv_ (ttl' <$> timed_) (justTrue live)), Nothing)
Just quotedItemId -> do
CChatItem _ qci@ChatItem {meta = CIMeta {itemTs, itemSharedMsgId}, formattedText, file} <-
withStore $ \db -> getGroupChatItem db user chatId quotedItemId
(origQmc, qd, sent, GroupMember {memberId}) <- quoteData qci membership
let msgRef = MsgRef {msgId = itemSharedMsgId, sentAt = itemTs, sent, memberId = Just memberId}
qmc = quoteContent origQmc file
quotedItem = CIQuote {chatDir = qd, itemId = Just quotedItemId, sharedMsgId = itemSharedMsgId, sentAt = itemTs, content = qmc, formattedText}
pure (MCQuote QuotedMsg {msgRef, content = qmc} (ExtMsgContent mc fInv_ (ttl' <$> timed_) (justTrue live)), Just quotedItem)
where
quoteData :: ChatItem c d -> GroupMember -> m (MsgContent, CIQDirection 'CTGroup, Bool, GroupMember)
quoteData ChatItem {meta = CIMeta {itemDeleted = Just _}} _ = throwChatError CEInvalidQuote
quoteData ChatItem {chatDir = CIGroupSnd, content = CISndMsgContent qmc} membership' = pure (qmc, CIQGroupSnd, True, membership')
quoteData ChatItem {chatDir = CIGroupRcv m, content = CIRcvMsgContent qmc} _ = pure (qmc, CIQGroupRcv $ Just m, False, m)
quoteData _ _ = throwChatError CEInvalidQuote
CTContactRequest -> pure $ chatCmdError (Just user) "not supported"
CTContactConnection -> pure $ chatCmdError (Just user) "not supported"
where
quoteContent :: forall d. MsgContent -> Maybe (CIFile d) -> MsgContent
quoteContent qmc ciFile_
| replaceContent = MCText qTextOrFile
| otherwise = case qmc of
MCImage _ image -> MCImage qTextOrFile image
MCFile _ -> MCFile qTextOrFile
-- consider same for voice messages
-- MCVoice _ voice -> MCVoice qTextOrFile voice
_ -> qmc
where
-- if the message we're quoting with is one of the "large" MsgContents
-- we replace the quote's content with MCText
replaceContent = case mc of
MCText _ -> False
MCFile _ -> False
MCLink {} -> True
MCImage {} -> True
MCVideo {} -> True
MCVoice {} -> False
MCUnknown {} -> True
qText = msgContentText qmc
getFileName :: CIFile d -> String
getFileName CIFile {fileName} = fileName
qFileName = maybe qText (T.pack . getFileName) ciFile_
qTextOrFile = if T.null qText then qFileName else qText
xftpSndFileTransfer :: User -> CryptoFile -> Integer -> Int -> ContactOrGroup -> m (FileInvitation, CIFile 'MDSnd, FileTransferMeta)
xftpSndFileTransfer user file@(CryptoFile filePath cfArgs) fileSize n contactOrGroup = do
let fileName = takeFileName filePath
@@ -1831,7 +1795,7 @@ processChatCommand = \case
LastChats count_ -> withUser' $ \user -> do
let count = fromMaybe 5000 count_
(errs, previews) <- partitionEithers <$> withStore' (\db -> getChatPreviews db user False (PTLast count) clqNoFilters)
toView $ CRChatErrors (Just user) (map ChatErrorStore errs)
unless (null errs) $ toView $ CRChatErrors (Just user) (map ChatErrorStore errs)
pure $ CRChats previews
LastMessages (Just chatName) count search -> withUser $ \user -> do
chatRef <- getChatRef user chatName
@@ -2301,7 +2265,7 @@ processChatCommand = \case
tryChatError (withStore (`getUser` userId)) >>= \case
Left _ -> throwChatError CEUserUnknown
Right user -> pure user
validateUserPassword :: User -> User -> Maybe UserPwd -> m ()
validateUserPassword :: User -> User -> Maybe UserPwd -> m ()
validateUserPassword = validateUserPassword_ . Just
validateUserPassword_ :: Maybe User -> User -> Maybe UserPwd -> m ()
validateUserPassword_ user_ User {userId = userId', viewPwdHash} viewPwd_ =
@@ -2429,6 +2393,50 @@ processChatCommand = \case
cReqHashes = bimap hash hash cReqSchemas
hash = ConnReqUriHash . C.sha256Hash . strEncode
prepareGroupMsg :: forall m. ChatMonad m => User -> GroupInfo -> MsgContent -> Maybe ChatItemId -> Maybe FileInvitation -> Maybe CITimed -> Bool -> m (MsgContainer, Maybe (CIQuote 'CTGroup))
prepareGroupMsg user GroupInfo {groupId, membership} mc quotedItemId_ fInv_ timed_ live = case quotedItemId_ of
Nothing -> pure (MCSimple (ExtMsgContent mc fInv_ (ttl' <$> timed_) (justTrue live)), Nothing)
Just quotedItemId -> do
CChatItem _ qci@ChatItem {meta = CIMeta {itemTs, itemSharedMsgId}, formattedText, file} <-
withStore $ \db -> getGroupChatItem db user groupId quotedItemId
(origQmc, qd, sent, GroupMember {memberId}) <- quoteData qci membership
let msgRef = MsgRef {msgId = itemSharedMsgId, sentAt = itemTs, sent, memberId = Just memberId}
qmc = quoteContent mc origQmc file
quotedItem = CIQuote {chatDir = qd, itemId = Just quotedItemId, sharedMsgId = itemSharedMsgId, sentAt = itemTs, content = qmc, formattedText}
pure (MCQuote QuotedMsg {msgRef, content = qmc} (ExtMsgContent mc fInv_ (ttl' <$> timed_) (justTrue live)), Just quotedItem)
where
quoteData :: ChatItem c d -> GroupMember -> m (MsgContent, CIQDirection 'CTGroup, Bool, GroupMember)
quoteData ChatItem {meta = CIMeta {itemDeleted = Just _}} _ = throwChatError CEInvalidQuote
quoteData ChatItem {chatDir = CIGroupSnd, content = CISndMsgContent qmc} membership' = pure (qmc, CIQGroupSnd, True, membership')
quoteData ChatItem {chatDir = CIGroupRcv m, content = CIRcvMsgContent qmc} _ = pure (qmc, CIQGroupRcv $ Just m, False, m)
quoteData _ _ = throwChatError CEInvalidQuote
quoteContent :: forall d. MsgContent -> MsgContent -> Maybe (CIFile d) -> MsgContent
quoteContent mc qmc ciFile_
| replaceContent = MCText qTextOrFile
| otherwise = case qmc of
MCImage _ image -> MCImage qTextOrFile image
MCFile _ -> MCFile qTextOrFile
-- consider same for voice messages
-- MCVoice _ voice -> MCVoice qTextOrFile voice
_ -> qmc
where
-- if the message we're quoting with is one of the "large" MsgContents
-- we replace the quote's content with MCText
replaceContent = case mc of
MCText _ -> False
MCFile _ -> False
MCLink {} -> True
MCImage {} -> True
MCVideo {} -> True
MCVoice {} -> False
MCUnknown {} -> True
qText = msgContentText qmc
getFileName :: CIFile d -> String
getFileName CIFile {fileName} = fileName
qFileName = maybe qText (T.pack . getFileName) ciFile_
qTextOrFile = if T.null qText then qFileName else qText
assertDirectAllowed :: ChatMonad m => User -> MsgDirection -> Contact -> CMEventTag e -> m ()
assertDirectAllowed user dir ct event =
unless (allowedChatEvent || anyDirectOrUsed ct) . unlessM directMessagesAllowed $
@@ -2606,7 +2614,7 @@ acceptFileReceive user@User {userId} RcvFileTransfer {fileId, xftpRcvFile, fileI
-- marking file as accepted and reading description in the same transaction
-- to prevent race condition with appending description
ci <- xftpAcceptRcvFT db user fileId filePath
rfd <- getRcvFileDescrByFileId db fileId
rfd <- getRcvFileDescrByRcvFileId db fileId
pure (ci, rfd)
receiveViaCompleteFD user fileId rfd cryptoArgs
pure ci
@@ -3184,17 +3192,29 @@ processAgentMsgSndFile _corrId aFileId msg =
sendFileDescription sft rfd msgId sendMsg = do
let rfdText = fileDescrText rfd
withStore' $ \db -> updateSndFTDescrXFTP db user sft rfdText
partSize <- asks $ xftpDescrPartSize . config
sendParts 1 partSize rfdText
parts <- splitFileDescr rfdText
loopSend parts
where
sendParts partNo partSize rfdText = do
let (part, rest) = T.splitAt partSize rfdText
complete = T.null rest
fileDescr = FileDescr {fileDescrText = part, fileDescrPartNo = partNo, fileDescrComplete = complete}
-- returns msgDeliveryId of the last file description message
loopSend :: NonEmpty FileDescr -> m Int64
loopSend (fileDescr :| fds) = do
(_, msgDeliveryId) <- sendMsg $ XMsgFileDescr {msgId, fileDescr}
if complete
then pure msgDeliveryId
else sendParts (partNo + 1) partSize rest
case L.nonEmpty fds of
Just fds' -> loopSend fds'
Nothing -> pure msgDeliveryId
splitFileDescr :: ChatMonad m => RcvFileDescrText -> m (NonEmpty FileDescr)
splitFileDescr rfdText = do
partSize <- asks $ xftpDescrPartSize . config
pure $ splitParts 1 partSize rfdText
where
splitParts partNo partSize remText =
let (part, rest) = T.splitAt partSize remText
complete = T.null rest
fileDescr = FileDescr {fileDescrText = part, fileDescrPartNo = partNo, fileDescrComplete = complete}
in if complete
then fileDescr :| []
else fileDescr <| splitParts (partNo + 1) partSize rest
processAgentMsgRcvFile :: forall m. ChatMonad m => ACorrId -> RcvFileId -> ACommand 'Agent 'AERcvFile -> m ()
processAgentMsgRcvFile _corrId aFileId msg =
@@ -3289,6 +3309,9 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do
pure ()
MSG meta _msgFlags msgBody -> do
cmdId <- createAckCmd conn
-- TODO only acknowledge without saving message?
-- probably this branch is never executed, so there should be no reason
-- to save message if contact hasn't been created yet - chat item isn't created anyway
withAckMessage agentConnId cmdId meta $ do
(_conn', _) <- saveDirectRcvMSG conn meta cmdId msgBody
pure False
@@ -3564,21 +3587,105 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do
let Connection {viaUserContactLink} = conn
when (isJust viaUserContactLink && isNothing (memberContactId m)) sendXGrpLinkMem
members <- withStore' $ \db -> getGroupMembers db user gInfo
intros <- withStore' $ \db -> createIntroductions db members m
void . sendGroupMessage user gInfo members . XGrpMemNew $ memberInfo m
shuffledIntros <- liftIO $ shuffleMembers intros $ \GroupMemberIntro {reMember = GroupMember {memberRole}} -> memberRole
forM_ shuffledIntros $ \intro ->
processIntro intro `catchChatError` (toView . CRChatError (Just user))
sendIntroductions members
when (groupFeatureAllowed SGFHistory gInfo) sendHistory
where
sendXGrpLinkMem = do
let profileMode = ExistingIncognito <$> incognitoMembershipProfile gInfo
profileToSend = profileToSendOnAccept user profileMode
void $ sendDirectMessage conn (XGrpLinkMem profileToSend) (GroupId groupId)
sendIntroductions members = do
intros <- withStore' $ \db -> createIntroductions db members m
shuffledIntros <- liftIO $ shuffleIntros intros
if isCompatibleRange (memberChatVRange' m) batchSendVRange
then do
let events = map (XGrpMemIntro . memberInfo . reMember) shuffledIntros
forM_ (L.nonEmpty events) $ \events' ->
sendGroupMemberMessages user conn events' groupId
else forM_ shuffledIntros $ \intro ->
processIntro intro `catchChatError` (toView . CRChatError (Just user))
shuffleIntros :: [GroupMemberIntro] -> IO [GroupMemberIntro]
shuffleIntros intros = do
let (admins, others) = partition isAdmin intros
(admPics, admNoPics) = partition hasPicture admins
(othPics, othNoPics) = partition hasPicture others
mconcat <$> mapM shuffle [admPics, admNoPics, othPics, othNoPics]
where
isAdmin GroupMemberIntro {reMember = GroupMember {memberRole}} = memberRole >= GRAdmin
hasPicture GroupMemberIntro {reMember = GroupMember {memberProfile = LocalProfile {image}}} = isJust image
processIntro intro@GroupMemberIntro {introId} = do
void $ sendDirectMessage conn (XGrpMemIntro $ memberInfo (reMember intro)) (GroupId groupId)
withStore' $ \db -> updateIntroStatus db introId GMIntroSent
sendHistory =
when (isCompatibleRange (memberChatVRange' m) batchSendVRange) $ do
(errs, items) <- partitionEithers <$> withStore' (\db -> getGroupHistoryItems db user gInfo 100)
(errs', events) <- partitionEithers <$> mapM (tryChatError . itemForwardEvents) items
let errors = map ChatErrorStore errs <> errs'
unless (null errors) $ toView $ CRChatErrors (Just user) errors
forM_ (L.nonEmpty $ concat events) $ \events' ->
sendGroupMemberMessages user conn events' groupId
itemForwardEvents :: CChatItem 'CTGroup -> m [ChatMsgEvent 'Json]
itemForwardEvents cci = case cci of
(CChatItem SMDRcv ci@ChatItem {chatDir = CIGroupRcv sender, content = CIRcvMsgContent mc, file}) -> do
fInvDescr_ <- join <$> forM file getRcvFileInvDescr
processContentItem sender ci mc fInvDescr_
(CChatItem SMDSnd ci@ChatItem {content = CISndMsgContent mc, file}) -> do
fInvDescr_ <- join <$> forM file getSndFileInvDescr
processContentItem membership ci mc fInvDescr_
_ -> pure []
where
getRcvFileInvDescr :: CIFile 'MDRcv -> m (Maybe (FileInvitation, RcvFileDescrText))
getRcvFileInvDescr ciFile@CIFile {fileId, fileProtocol, fileStatus} = do
expired <- fileExpired
if fileProtocol /= FPXFTP || fileStatus == CIFSRcvCancelled || expired
then pure Nothing
else do
rfd <- withStore $ \db -> getRcvFileDescrByRcvFileId db fileId
pure $ invCompleteDescr ciFile rfd
getSndFileInvDescr :: CIFile 'MDSnd -> m (Maybe (FileInvitation, RcvFileDescrText))
getSndFileInvDescr ciFile@CIFile {fileId, fileProtocol, fileStatus} = do
expired <- fileExpired
if fileProtocol /= FPXFTP || fileStatus == CIFSSndCancelled || expired
then pure Nothing
else do
-- can also lookup in extra_xftp_file_descriptions, though it can be empty;
-- would be best if snd file had a single rcv description for all members saved in files table
rfd <- withStore $ \db -> getRcvFileDescrBySndFileId db fileId
pure $ invCompleteDescr ciFile rfd
fileExpired :: m Bool
fileExpired = do
ttl <- asks $ rcvFilesTTL . agentConfig . config
cutoffTs <- addUTCTime (-ttl) <$> liftIO getCurrentTime
pure $ chatItemTs cci < cutoffTs
invCompleteDescr :: CIFile d -> RcvFileDescr -> Maybe (FileInvitation, RcvFileDescrText)
invCompleteDescr CIFile {fileName, fileSize} RcvFileDescr {fileDescrText, fileDescrComplete}
| fileDescrComplete =
let fInvDescr = FileDescr {fileDescrText = "", fileDescrPartNo = 0, fileDescrComplete = False}
fInv = xftpFileInvitation fileName fileSize fInvDescr
in Just (fInv, fileDescrText)
| otherwise = Nothing
processContentItem :: GroupMember -> ChatItem 'CTGroup d -> MsgContent -> Maybe (FileInvitation, RcvFileDescrText) -> m [ChatMsgEvent Json]
processContentItem sender ChatItem {meta, quotedItem} mc fInvDescr_ =
if isNothing fInvDescr_ && not (msgContentHasText mc)
then pure []
else do
let CIMeta {itemTs, itemSharedMsgId, itemTimed} = meta
quotedItemId_ = quoteItemId =<< quotedItem
fInv_ = fst <$> fInvDescr_
(msgContainer, _) <- prepareGroupMsg user gInfo mc quotedItemId_ fInv_ itemTimed False
let senderVRange = memberChatVRange' sender
xMsgNewChatMsg = ChatMessage {chatVRange = senderVRange, msgId = itemSharedMsgId, chatMsgEvent = XMsgNew msgContainer}
fileDescrEvents <- case (snd <$> fInvDescr_, itemSharedMsgId) of
(Just fileDescrText, Just msgId) -> do
parts <- splitFileDescr fileDescrText
pure . toList $ L.map (XMsgFileDescr msgId) parts
_ -> pure []
let fileDescrChatMsgs = map (ChatMessage senderVRange Nothing) fileDescrEvents
GroupMember {memberId} = sender
msgForwardEvents = map (\cm -> XGrpMsgForward memberId cm itemTs) (xMsgNewChatMsg : fileDescrChatMsgs)
pure msgForwardEvents
_ -> do
-- TODO notify member who forwarded introduction - question - where it is stored? There is via_contact but probably there should be via_member in group_members table
let memCategory = memberCategory m
withStore' (\db -> getViaGroupContact db user m) >>= \case
Nothing -> do
@@ -3606,41 +3713,27 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do
void $ sendDirectMessage imConn (XGrpMemCon $ memberId (m :: GroupMember)) (GroupId groupId)
_ -> messageWarning "sendXGrpMemCon: member category GCPreMember or GCPostMember is expected"
MSG msgMeta _msgFlags msgBody -> do
checkIntegrityCreateItem (CDGroupRcv gInfo m) msgMeta `catchChatError` \_ -> pure ()
cmdId <- createAckCmd conn
tryChatError (processChatMessage cmdId) >>= \case
Right (ACMsg _ chatMsg, withRcpt) -> do
ackMsg agentConnId cmdId msgMeta $ if withRcpt then Just "" else Nothing
when (memberRole (membership :: GroupMember) >= GRAdmin) $ forwardMsg_ chatMsg
Left e -> ackMsg agentConnId cmdId msgMeta Nothing >> throwError e
let aChatMsgs = parseChatMessages msgBody
withAckMessage agentConnId cmdId msgMeta $ do
forM_ aChatMsgs $ \case
Right (ACMsg _ chatMsg) ->
processEvent cmdId chatMsg `catchChatError` \e -> toView $ CRChatError (Just user) e
Left e -> toView $ CRChatError (Just user) (ChatError . CEException $ "error parsing chat message: " <> e)
checkSendRcpt $ rights aChatMsgs
-- currently only a single message is forwarded
when (memberRole (membership :: GroupMember) >= GRAdmin) $ case aChatMsgs of
[Right (ACMsg _ chatMsg)] -> forwardMsg_ chatMsg
_ -> pure ()
where
processChatMessage :: Int64 -> m (AChatMessage, Bool)
processChatMessage cmdId = do
msg@(ACMsg _ chatMsg) <- parseAChatMessage conn msgMeta msgBody
checkIntegrity chatMsg `catchChatError` \_ -> pure ()
(msg,) <$> processEvent cmdId chatMsg
brokerTs = metaBrokerTs msgMeta
checkIntegrity :: ChatMessage e -> m ()
checkIntegrity ChatMessage {chatMsgEvent} = do
when checkForEvent $ checkIntegrityCreateItem (CDGroupRcv gInfo m) msgMeta
where
checkForEvent = case chatMsgEvent of
XMsgNew _ -> True
XFileCancel _ -> True
XFileAcptInv {} -> True
XGrpMemNew _ -> True
XGrpMemRole {} -> True
XGrpMemDel _ -> True
XGrpLeave -> True
XGrpDel -> True
XGrpInfo _ -> True
XGrpDirectInv {} -> True
_ -> False
processEvent :: MsgEncodingI e => CommandId -> ChatMessage e -> m Bool
processEvent :: MsgEncodingI e => CommandId -> ChatMessage e -> m ()
processEvent cmdId chatMsg = do
(m', conn', msg@RcvMessage {chatMsgEvent = ACME _ event}) <- saveGroupRcvMsg user groupId m conn msgMeta cmdId msgBody chatMsg
updateChatLock "groupMessage" event
case event of
XMsgNew mc -> memberCanSend m' $ newGroupContentMessage gInfo m' mc msg brokerTs
XMsgNew mc -> memberCanSend m' $ newGroupContentMessage gInfo m' mc msg brokerTs False
XMsgFileDescr sharedMsgId fileDescr -> memberCanSend m' $ groupMessageFileDescription gInfo m' sharedMsgId fileDescr
XMsgUpdate sharedMsgId mContent ttl live -> memberCanSend m' $ groupMessageUpdate gInfo m' sharedMsgId mContent msg brokerTs ttl live
XMsgDel sharedMsgId memberId -> groupMessageDelete gInfo m' sharedMsgId memberId msg brokerTs
@@ -3668,15 +3761,17 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do
XInfoProbeOk probe -> xInfoProbeOk (COMGroupMember m') probe
BFileChunk sharedMsgId chunk -> bFileChunkGroup gInfo sharedMsgId chunk msgMeta
_ -> messageError $ "unsupported message: " <> T.pack (show event)
checkSendRcpt event
checkSendRcpt :: ChatMsgEvent e -> m Bool
checkSendRcpt event = do
checkSendRcpt :: [AChatMessage] -> m Bool
checkSendRcpt aChatMsgs = do
currentMemCount <- withStore' $ \db -> getGroupCurrentMembersCount db user gInfo
let GroupInfo {chatSettings = ChatSettings {sendRcpts}} = gInfo
pure $
fromMaybe (sendRcptsSmallGroups user) sendRcpts
&& hasDeliveryReceipt (toCMEventTag event)
&& any aChatMsgHasReceipt aChatMsgs
&& currentMemCount <= smallGroupsRcptsMemLimit
where
aChatMsgHasReceipt (ACMsg _ ChatMessage {chatMsgEvent}) =
hasDeliveryReceipt (toCMEventTag chatMsgEvent)
forwardMsg_ :: MsgEncodingI e => ChatMessage e -> m ()
forwardMsg_ chatMsg =
forM_ (forwardedGroupMsg chatMsg) $ \chatMsg' -> do
@@ -4013,15 +4108,11 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do
ackMsgDeliveryEvent :: Connection -> CommandId -> m ()
ackMsgDeliveryEvent Connection {connId} ackCmdId =
withStoreCtx'
(Just $ "createRcvMsgDeliveryEvent, connId: " <> show connId <> ", ackCmdId: " <> show ackCmdId <> ", msgDeliveryStatus: MDSRcvAcknowledged")
$ \db -> createRcvMsgDeliveryEvent db connId ackCmdId MDSRcvAcknowledged
withStore' $ \db -> updateRcvMsgDeliveryStatus db connId ackCmdId MDSRcvAcknowledged
sentMsgDeliveryEvent :: Connection -> AgentMsgId -> m ()
sentMsgDeliveryEvent Connection {connId} msgId =
withStoreCtx
(Just $ "createSndMsgDeliveryEvent, connId: " <> show connId <> ", msgId: " <> show msgId <> ", msgDeliveryStatus: MDSSndSent")
$ \db -> createSndMsgDeliveryEvent db connId msgId MDSSndSent
withStore' $ \db -> updateSndMsgDeliveryStatus db connId msgId MDSSndSent
agentErrToItemStatus :: AgentErrorType -> CIStatus 'MDSnd
agentErrToItemStatus (SMP AUTH) = CISSndErrorAuth
@@ -4283,20 +4374,21 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do
ChatErrorStore (SEChatItemSharedMsgIdNotFound sharedMsgId) -> handle sharedMsgId
e -> throwError e
newGroupContentMessage :: GroupInfo -> GroupMember -> MsgContainer -> RcvMessage -> UTCTime -> m ()
newGroupContentMessage gInfo m@GroupMember {memberId, memberRole} mc msg@RcvMessage {sharedMsgId_} brokerTs
newGroupContentMessage :: GroupInfo -> GroupMember -> MsgContainer -> RcvMessage -> UTCTime -> Bool -> m ()
newGroupContentMessage gInfo m@GroupMember {memberId, memberRole} mc msg@RcvMessage {sharedMsgId_} brokerTs forwarded
| isVoice content && not (groupFeatureAllowed SGFVoice gInfo) = rejected GFVoice
| not (isVoice content) && isJust fInv_ && not (groupFeatureAllowed SGFFiles gInfo) = rejected GFFiles
| otherwise = do
-- TODO integrity message check
-- check if message moderation event was received ahead of message
let timed_ = rcvGroupCITimed gInfo itemTTL
live = fromMaybe False live_
withStore' (\db -> getCIModeration db user gInfo memberId sharedMsgId_) >>= \case
Just ciModeration -> do
applyModeration timed_ live ciModeration
withStore' $ \db -> deleteCIModeration db gInfo memberId sharedMsgId_
Nothing -> createItem timed_ live
let timed_ =
if forwarded
then rcvCITimed_ (Just Nothing) itemTTL
else rcvGroupCITimed gInfo itemTTL
live = fromMaybe False live_
withStore' (\db -> getCIModeration db user gInfo memberId sharedMsgId_) >>= \case
Just ciModeration -> do
applyModeration timed_ live ciModeration
withStore' $ \db -> deleteCIModeration db gInfo memberId sharedMsgId_
Nothing -> createItem timed_ live
where
rejected f = void $ newChatItem (CIRcvGroupFeatureRejected f) Nothing Nothing False
ExtMsgContent content fInv_ itemTTL live_ = mcExtMsgContent mc
@@ -5217,7 +5309,7 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do
let body = LB.toStrict $ J.encode msg
rcvMsg@RcvMessage {chatMsgEvent = ACME _ event} <- saveGroupFwdRcvMsg user groupId m author body chatMsg
case event of
XMsgNew mc -> memberCanSend author $ newGroupContentMessage gInfo author mc rcvMsg msgTs
XMsgNew mc -> memberCanSend author $ newGroupContentMessage gInfo author mc rcvMsg msgTs True
XMsgFileDescr sharedMsgId fileDescr -> memberCanSend author $ groupMessageFileDescription gInfo author sharedMsgId fileDescr
XMsgUpdate sharedMsgId mContent ttl live -> memberCanSend author $ groupMessageUpdate gInfo author sharedMsgId mContent rcvMsg msgTs ttl live
XMsgDel sharedMsgId memId -> groupMessageDelete gInfo author sharedMsgId memId rcvMsg msgTs
@@ -5236,14 +5328,19 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do
directMsgReceived ct conn@Connection {connId} msgMeta msgRcpts = do
checkIntegrityCreateItem (CDDirectRcv ct) msgMeta
forM_ msgRcpts $ \MsgReceipt {agentMsgId, msgRcptStatus} -> do
withStore $ \db -> createSndMsgDeliveryEvent db connId agentMsgId $ MDSSndRcvd msgRcptStatus
withStore' $ \db -> updateSndMsgDeliveryStatus db connId agentMsgId $ MDSSndRcvd msgRcptStatus
updateDirectItemStatus ct conn agentMsgId $ CISSndRcvd msgRcptStatus SSPComplete
-- TODO [batch send] update status of all messages in batch
-- - this is for when we implement identifying inactive connections
-- - regular messages sent in batch would all be marked as delivered by a single receipt
-- - repeat for directMsgReceived if same logic is applied to direct messages
-- - getChatItemIdByAgentMsgId to return [ChatItemId]
groupMsgReceived :: GroupInfo -> GroupMember -> Connection -> MsgMeta -> NonEmpty MsgReceipt -> m ()
groupMsgReceived gInfo m conn@Connection {connId} msgMeta msgRcpts = do
checkIntegrityCreateItem (CDGroupRcv gInfo m) msgMeta
forM_ msgRcpts $ \MsgReceipt {agentMsgId, msgRcptStatus} -> do
withStore $ \db -> createSndMsgDeliveryEvent db connId agentMsgId $ MDSSndRcvd msgRcptStatus
withStore' $ \db -> updateSndMsgDeliveryStatus db connId agentMsgId $ MDSSndRcvd msgRcptStatus
updateGroupItemStatus gInfo m conn agentMsgId $ CISSndRcvd msgRcptStatus SSPComplete
updateDirectItemStatus :: Contact -> Connection -> AgentMsgId -> CIStatus 'MDSnd -> m ()
@@ -5334,17 +5431,13 @@ sendFileInline_ FileTransferMeta {filePath, chunkSize} sharedMsgId sendMsg =
chSize = fromIntegral chunkSize
parseChatMessage :: ChatMonad m => Connection -> ByteString -> m (ChatMessage 'Json)
parseChatMessage conn = parseChatMessage_ conn Nothing
{-# INLINE parseChatMessage #-}
parseAChatMessage :: ChatMonad m => Connection -> MsgMeta -> ByteString -> m AChatMessage
parseAChatMessage conn msgMeta = parseChatMessage_ conn (Just msgMeta)
{-# INLINE parseAChatMessage #-}
parseChatMessage_ :: (ChatMonad m, StrEncoding s) => Connection -> Maybe MsgMeta -> ByteString -> m s
parseChatMessage_ conn msgMeta s = liftEither . first (ChatError . errType) $ strDecode s
parseChatMessage conn s = do
case parseChatMessages s of
[msg] -> liftEither . first (ChatError . errType) $ (\(ACMsg _ m) -> checkEncoding m) =<< msg
_ -> throwChatError $ CEException "parseChatMessage: single message is expected"
where
errType = CEInvalidChatMessage conn (msgMetaToJson <$> msgMeta) (safeDecodeUtf8 s)
errType = CEInvalidChatMessage conn Nothing (safeDecodeUtf8 s)
{-# INLINE parseChatMessage #-}
sendFileChunk :: ChatMonad m => User -> SndFileTransfer -> m ()
sendFileChunk user ft@SndFileTransfer {fileId, fileStatus, agentConnId = AgentConnId acId} =
@@ -5521,40 +5614,77 @@ createSndMessage :: (MsgEncodingI e, ChatMonad m) => ChatMsgEvent e -> ConnOrGro
createSndMessage chatMsgEvent connOrGroupId = do
gVar <- asks random
ChatConfig {chatVRange} <- asks config
withStore $ \db -> createNewSndMessage db gVar connOrGroupId $ \sharedMsgId ->
let msgBody = strEncode ChatMessage {chatVRange, msgId = Just sharedMsgId, chatMsgEvent}
in NewMessage {chatMsgEvent, msgBody}
withStore $ \db -> createNewSndMessage db gVar connOrGroupId chatMsgEvent (encodeMessage chatVRange)
where
encodeMessage chatVRange sharedMsgId =
encodeChatMessage ChatMessage {chatVRange, msgId = Just sharedMsgId, chatMsgEvent}
sendGroupMemberMessages :: forall e m. (MsgEncodingI e, ChatMonad m) => User -> Connection -> NonEmpty (ChatMsgEvent e) -> GroupId -> m ()
sendGroupMemberMessages user conn@Connection {connId} events groupId = do
when (connDisabled conn) $ throwChatError (CEConnectionDisabled conn)
(errs, msgs) <- partitionEithers <$> createSndMessages
unless (null errs) $ toView $ CRChatErrors (Just user) errs
unless (null msgs) $ do
let (errs', msgBatches) = partitionEithers $ batchMessages maxChatMsgSize msgs
-- shouldn't happen, as large messages would have caused createNewSndMessage to throw SELargeMsg
unless (null errs') $ toView $ CRChatErrors (Just user) errs'
forM_ msgBatches $ \batch ->
processBatch batch `catchChatError` (toView . CRChatError (Just user))
where
processBatch :: MsgBatch -> m ()
processBatch (MsgBatch builder sndMsgs) = do
let batchBody = LB.toStrict $ toLazyByteString builder
agentMsgId <- withAgent $ \a -> sendMessage a (aConnId conn) MsgFlags {notification = True} batchBody
let sndMsgDelivery = SndMsgDelivery {connId, agentMsgId}
void . withStoreBatch' $ \db -> map (\SndMessage {msgId} -> createSndMsgDelivery db sndMsgDelivery msgId) sndMsgs
createSndMessages :: m [Either ChatError SndMessage]
createSndMessages = do
gVar <- asks random
ChatConfig {chatVRange} <- asks config
withStoreBatch $ \db -> map (createMsg db gVar chatVRange) (toList events)
createMsg db gVar chatVRange evnt = do
r <- runExceptT $ createNewSndMessage db gVar (GroupId groupId) evnt (encodeMessage chatVRange evnt)
pure $ first ChatErrorStore r
encodeMessage chatVRange evnt sharedMsgId =
encodeChatMessage ChatMessage {chatVRange, msgId = Just sharedMsgId, chatMsgEvent = evnt}
directMessage :: (MsgEncodingI e, ChatMonad m) => ChatMsgEvent e -> m ByteString
directMessage chatMsgEvent = do
ChatConfig {chatVRange} <- asks config
pure $ strEncode ChatMessage {chatVRange, msgId = Nothing, chatMsgEvent}
let r = encodeChatMessage ChatMessage {chatVRange, msgId = Nothing, chatMsgEvent}
case r of
ECMEncoded encodedBody -> pure . LB.toStrict $ encodedBody
ECMLarge -> throwChatError $ CEException "large message"
deliverMessage :: ChatMonad m => Connection -> CMEventTag e -> MsgBody -> MessageId -> m Int64
deliverMessage conn cmEventTag msgBody msgId =
deliverMessages [(conn, cmEventTag, msgBody, msgId)] >>= \case
deliverMessage :: ChatMonad m => Connection -> CMEventTag e -> LazyMsgBody -> MessageId -> m Int64
deliverMessage conn cmEventTag msgBody msgId = do
let msgFlags = MsgFlags {notification = hasNotification cmEventTag}
deliverMessage' conn msgFlags msgBody msgId
deliverMessage' :: ChatMonad m => Connection -> MsgFlags -> LazyMsgBody -> MessageId -> m Int64
deliverMessage' conn msgFlags msgBody msgId =
deliverMessages [(conn, msgFlags, msgBody, msgId)] >>= \case
[r] -> liftEither r
rs -> throwChatError $ CEInternalError $ "deliverMessage: expected 1 result, got " <> show (length rs)
deliverMessages :: ChatMonad' m => [(Connection, CMEventTag e, MsgBody, MessageId)] -> m [Either ChatError Int64]
deliverMessages :: ChatMonad' m => [(Connection, MsgFlags, LazyMsgBody, MessageId)] -> m [Either ChatError Int64]
deliverMessages msgReqs = do
sent <- zipWith prepareBatch msgReqs <$> withAgent' (`sendMessages` aReqs)
withStoreBatch $ \db -> map (bindRight $ createDelivery db) sent
where
aReqs = map (\(conn, cmEvTag, msgBody, _msgId) -> (aConnId conn, msgFlags cmEvTag, msgBody)) msgReqs
msgFlags cmEvTag = MsgFlags {notification = hasNotification cmEvTag}
aReqs = map (\(conn, msgFlags, msgBody, _msgId) -> (aConnId conn, msgFlags, LB.toStrict msgBody)) msgReqs
prepareBatch req = bimap (`ChatErrorAgent` Nothing) (req,)
createDelivery :: DB.Connection -> ((Connection, CMEventTag e, MsgBody, MessageId), AgentMsgId) -> IO (Either ChatError Int64)
createDelivery :: DB.Connection -> ((Connection, MsgFlags, LazyMsgBody, MessageId), AgentMsgId) -> IO (Either ChatError Int64)
createDelivery db ((Connection {connId}, _, _, msgId), agentMsgId) =
Right <$> createSndMsgDelivery db (SndMsgDelivery {connId, agentMsgId}) msgId
sendGroupMessage :: (MsgEncodingI e, ChatMonad m) => User -> GroupInfo -> [GroupMember] -> ChatMsgEvent e -> m (SndMessage, [GroupMember])
sendGroupMessage user GroupInfo {groupId} members chatMsgEvent = do
msg@SndMessage {msgId, msgBody} <- createSndMessage chatMsgEvent (GroupId groupId)
recipientMembers <- liftIO $ shuffleMembers (filter memberCurrent members) $ \GroupMember {memberRole} -> memberRole
let tag = toCMEventTag chatMsgEvent
recipientMembers <- liftIO $ shuffleMembers (filter memberCurrent members)
let msgFlags = MsgFlags {notification = hasNotification $ toCMEventTag chatMsgEvent}
(toSend, pending) = foldr addMember ([], []) recipientMembers
msgReqs = map (\(_, conn) -> (conn, tag, msgBody, msgId)) toSend
msgReqs = map (\(_, conn) -> (conn, msgFlags, msgBody, msgId)) toSend
delivered <- deliverMessages msgReqs
let errors = lefts delivered
unless (null errors) $ toView $ CRChatErrors (Just user) errors
@@ -5562,6 +5692,12 @@ sendGroupMessage user GroupInfo {groupId} members chatMsgEvent = do
let sentToMembers = filterSent delivered toSend fst <> filterSent stored pending id
pure (msg, sentToMembers)
where
shuffleMembers :: [GroupMember] -> IO [GroupMember]
shuffleMembers ms = do
let (adminMs, otherMs) = partition isAdmin ms
liftM2 (<>) (shuffle adminMs) (shuffle otherMs)
where
isAdmin GroupMember {memberRole} = memberRole >= GRAdmin
addMember m (toSend, pending) = case memberSendAction chatMsgEvent members m of
Just (MSASend conn) -> ((m, conn) : toSend, pending)
Just MSAPending -> (toSend, m : pending)
@@ -5610,15 +5746,6 @@ sendGroupMemberMessage user m@GroupMember {groupMemberId} chatMsgEvent groupId i
MSASend conn -> deliverMessage conn (toCMEventTag chatMsgEvent) msgBody msgId >> postDeliver
MSAPending -> withStore' $ \db -> createPendingGroupMessage db groupMemberId msgId introId_
shuffleMembers :: [a] -> (a -> GroupMemberRole) -> IO [a]
shuffleMembers ms role = do
let (adminMs, otherMs) = partition ((GRAdmin <=) . role) ms
liftM2 (<>) (shuffle adminMs) (shuffle otherMs)
where
random :: IO Word16
random = randomRIO (0, 65535)
shuffle xs = map snd . sortBy (comparing fst) <$> mapM (\x -> (,x) <$> random) xs
sendPendingGroupMessages :: ChatMonad m => User -> GroupMember -> Connection -> m ()
sendPendingGroupMessages user GroupMember {groupMemberId, localDisplayName} conn = do
pendingMessages <- withStore' $ \db -> getPendingGroupMessages db groupMemberId
@@ -5635,21 +5762,25 @@ sendPendingGroupMessages user GroupMember {groupMemberId, localDisplayName} conn
_ -> throwChatError $ CEGroupMemberIntroNotFound localDisplayName
_ -> pure ()
-- TODO [batch send] refactor direct message processing same as groups (e.g. checkIntegrity before processing)
saveDirectRcvMSG :: ChatMonad m => Connection -> MsgMeta -> CommandId -> MsgBody -> m (Connection, RcvMessage)
saveDirectRcvMSG conn@Connection {connId} agentMsgMeta agentAckCmdId msgBody = do
ACMsg _ ChatMessage {chatVRange, msgId = sharedMsgId_, chatMsgEvent} <- parseAChatMessage conn agentMsgMeta msgBody
conn' <- updatePeerChatVRange conn chatVRange
let agentMsgId = fst $ recipient agentMsgMeta
newMsg = NewMessage {chatMsgEvent, msgBody}
rcvMsgDelivery = RcvMsgDelivery {connId, agentMsgId, agentMsgMeta, agentAckCmdId}
msg <- withStore $ \db -> createNewMessageAndRcvMsgDelivery db (ConnectionId connId) newMsg sharedMsgId_ rcvMsgDelivery Nothing
pure (conn', msg)
saveDirectRcvMSG conn@Connection {connId} agentMsgMeta agentAckCmdId msgBody =
case parseChatMessages msgBody of
[Right (ACMsg _ ChatMessage {chatVRange, msgId = sharedMsgId_, chatMsgEvent})] -> do
conn' <- updatePeerChatVRange conn chatVRange
let agentMsgId = fst $ recipient agentMsgMeta
newMsg = NewRcvMessage {chatMsgEvent, msgBody}
rcvMsgDelivery = RcvMsgDelivery {connId, agentMsgId, agentMsgMeta, agentAckCmdId}
msg <- withStore $ \db -> createNewMessageAndRcvMsgDelivery db (ConnectionId connId) newMsg sharedMsgId_ rcvMsgDelivery Nothing
pure (conn', msg)
[Left e] -> error $ "saveDirectRcvMSG: error parsing chat message: " <> e
_ -> error "saveDirectRcvMSG: batching not supported"
saveGroupRcvMsg :: (MsgEncodingI e, ChatMonad m) => User -> GroupId -> GroupMember -> Connection -> MsgMeta -> CommandId -> MsgBody -> ChatMessage e -> m (GroupMember, Connection, RcvMessage)
saveGroupRcvMsg user groupId authorMember conn@Connection {connId} agentMsgMeta agentAckCmdId msgBody ChatMessage {chatVRange, msgId = sharedMsgId_, chatMsgEvent} = do
(am', conn') <- updateMemberChatVRange authorMember conn chatVRange
let agentMsgId = fst $ recipient agentMsgMeta
newMsg = NewMessage {chatMsgEvent, msgBody}
newMsg = NewRcvMessage {chatMsgEvent, msgBody}
rcvMsgDelivery = RcvMsgDelivery {connId, agentMsgId, agentMsgMeta, agentAckCmdId}
amId = Just $ groupMemberId' am'
msg <-
@@ -5665,7 +5796,7 @@ saveGroupRcvMsg user groupId authorMember conn@Connection {connId} agentMsgMeta
saveGroupFwdRcvMsg :: (MsgEncodingI e, ChatMonad m) => User -> GroupId -> GroupMember -> GroupMember -> MsgBody -> ChatMessage e -> m RcvMessage
saveGroupFwdRcvMsg user groupId forwardingMember refAuthorMember msgBody ChatMessage {msgId = sharedMsgId_, chatMsgEvent} = do
let newMsg = NewMessage {chatMsgEvent, msgBody}
let newMsg = NewRcvMessage {chatMsgEvent, msgBody}
fwdMemberId = Just $ groupMemberId' forwardingMember
refAuthorId = Just $ groupMemberId' refAuthorMember
withStore (\db -> createNewRcvMessage db (GroupId groupId) newMsg sharedMsgId_ refAuthorId fwdMemberId)
@@ -6229,6 +6360,7 @@ chatCommandP =
"/set voice @" *> (SetContactFeature (ACF SCFVoice) <$> displayName <*> optional (A.space *> strP)),
"/set voice " *> (SetUserFeature (ACF SCFVoice) <$> strP),
"/set files #" *> (SetGroupFeature (AGF SGFFiles) <$> displayName <*> (A.space *> strP)),
"/set history #" *> (SetGroupFeature (AGF SGFHistory) <$> displayName <*> (A.space *> strP)),
"/set calls @" *> (SetContactFeature (ACF SCFCalls) <$> displayName <*> optional (A.space *> strP)),
"/set calls " *> (SetUserFeature (ACF SCFCalls) <$> strP),
"/set delete #" *> (SetGroupFeature (AGF SGFFullDelete) <$> displayName <*> (A.space *> strP)),
@@ -6316,7 +6448,12 @@ chatCommandP =
jsonP = J.eitherDecodeStrict' <$?> A.takeByteString
groupProfile = do
(gName, fullName) <- profileNames
let groupPreferences = Just (emptyGroupPrefs :: GroupPreferences) {directMessages = Just DirectMessagesGroupPreference {enable = FEOn}}
let groupPreferences =
Just
(emptyGroupPrefs :: GroupPreferences)
{ directMessages = Just DirectMessagesGroupPreference {enable = FEOn},
history = Just HistoryGroupPreference {enable = FEOn}
}
pure GroupProfile {displayName = gName, fullName, description = Nothing, image = Nothing, groupPreferences}
fullNameP = A.space *> textP <|> pure ""
textP = safeDecodeUtf8 <$> A.takeByteString
@@ -6354,6 +6491,7 @@ chatCommandP =
<|> ("day" $> 86400)
<|> ("week" $> (7 * 86400))
<|> ("month" $> (30 * 86400))
<|> A.decimal
timedTTLOnOffP =
optional ("on" *> A.space) *> (Just <$> timedTTLP)
<|> ("off" $> Nothing)