core: compressed message encoding, variable vrange (#3844)

This commit is contained in:
Alexander Bondarenko
2024-03-06 16:02:19 +02:00
committed by GitHub
parent eebf014ff7
commit 64dc758ffd
11 changed files with 230 additions and 138 deletions
+131 -74
View File
@@ -22,6 +22,7 @@ import Control.Monad
import Control.Monad.Except
import Control.Monad.IO.Unlift
import Control.Monad.Reader
import Crypto.Random (ChaChaDRG)
import qualified Data.Aeson as J
import Data.Attoparsec.ByteString.Char8 (Parser)
import qualified Data.Attoparsec.ByteString.Char8 as A
@@ -97,9 +98,11 @@ import Simplex.Messaging.Agent.Store.SQLite.DB (SlowQueryStats (..))
import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations
import Simplex.Messaging.Client (defaultNetworkConfig)
import Simplex.Messaging.Compression (withCompressCtx)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.File (CryptoFile (..), CryptoFileArgs (..))
import qualified Simplex.Messaging.Crypto.File as CF
import Simplex.Messaging.Crypto.Ratchet (PQEncryption, pattern PQEncOff)
import qualified Simplex.Messaging.Crypto.Ratchet as CR
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String
@@ -364,7 +367,8 @@ startChatController mainApp = do
subscribeUsers :: forall m. ChatMonad' m => Bool -> [User] -> m ()
subscribeUsers onlyNeeded users = do
let (us, us') = partition activeUser users
vr <- chatVersionRange
-- TODO PQ this is only used to set membership version range
vr <- chatVersionRange PQEncOff
subscribe vr us
subscribe vr us'
where
@@ -446,7 +450,9 @@ parseChatCommand = A.parseOnly chatCommandP . B.dropWhileEnd isSpace
-- | Chat API commands interpreted in context of a local zone
processChatCommand :: forall m. ChatMonad m => ChatCommand -> m ChatResponse
processChatCommand cmd = chatVersionRange >>= (`processChatCommand'` cmd)
processChatCommand cmd =
chatVersionRange PQEncOff -- TODO PQ this is only used to set membership version range (?)
>>= (`processChatCommand'` cmd)
{-# INLINE processChatCommand #-}
processChatCommand' :: forall m. ChatMonad m => VersionRangeChat -> ChatCommand -> m ChatResponse
@@ -1416,8 +1422,8 @@ processChatCommand' vr = \case
-- [incognito] generate profile to send
incognitoProfile <- if incognito then Just <$> liftIO generateRandomProfile else pure Nothing
let profileToSend = userProfileToSend user incognitoProfile Nothing False
dm <- directMessage $ XInfo profileToSend
enablePQ <- readTVarIO =<< asks pqExperimentalEnabled
dm <- directMessagePQ (CR.PQEncryption enablePQ) maxConnInfoLength $ XInfo profileToSend
connId <- withAgent $ \a -> joinConnection a (aUserId user) True cReq dm (CR.PQEncryption enablePQ) subMode
conn <- withStore' $ \db -> createDirectConnection db user connId cReq ConnJoined (incognitoProfile $> profileToSend) subMode enablePQ
pure $ CRSentConfirmation user conn
@@ -2146,7 +2152,7 @@ processChatCommand' vr = \case
where
connect' groupLinkId cReqHash xContactId inGroup = do
enablePQ <- (not inGroup &&) <$> (readTVarIO =<< asks pqExperimentalEnabled)
(connId, incognitoProfile, subMode) <- requestContact user incognito cReq xContactId inGroup enablePQ
(connId, incognitoProfile, subMode) <- requestContact user incognito cReq xContactId inGroup (CR.PQEncryption enablePQ)
conn <- withStore' $ \db -> createConnReqConnection db userId connId cReqHash xContactId incognitoProfile groupLinkId subMode enablePQ
pure $ CRSentInvitation user conn incognitoProfile
connectContactViaAddress :: User -> IncognitoEnabled -> Contact -> ConnectionRequestUri 'CMContact -> m ChatResponse
@@ -2154,18 +2160,18 @@ processChatCommand' vr = \case
withChatLock "connectViaContact" $ do
newXContactId <- XContactId <$> drgRandomBytes 16
enablePQ <- readTVarIO =<< asks pqExperimentalEnabled
(connId, incognitoProfile, subMode) <- requestContact user incognito cReq newXContactId False enablePQ
(connId, incognitoProfile, subMode) <- requestContact user incognito cReq newXContactId False (CR.PQEncryption enablePQ)
let cReqHash = ConnReqUriHash . C.sha256Hash $ strEncode cReq
ct' <- withStore $ \db -> createAddressContactConnection db user ct connId cReqHash newXContactId incognitoProfile subMode enablePQ
pure $ CRSentInvitationToContact user ct' incognitoProfile
requestContact :: User -> IncognitoEnabled -> ConnectionRequestUri 'CMContact -> XContactId -> Bool -> PQFlag -> m (ConnId, Maybe Profile, SubscriptionMode)
requestContact user incognito cReq xContactId inGroup enablePQ = do
requestContact :: User -> IncognitoEnabled -> ConnectionRequestUri 'CMContact -> XContactId -> Bool -> PQEncryption -> m (ConnId, Maybe Profile, SubscriptionMode)
requestContact user incognito cReq xContactId inGroup pqEnc = do
-- [incognito] generate profile to send
incognitoProfile <- if incognito then Just <$> liftIO generateRandomProfile else pure Nothing
let profileToSend = userProfileToSend user incognitoProfile Nothing inGroup
dm <- directMessage (XContact profileToSend $ Just xContactId)
dm <- directMessagePQ pqEnc maxConnInfoLength (XContact profileToSend $ Just xContactId)
subMode <- chatReadVar subscriptionMode
connId <- withAgent $ \a -> joinConnection a (aUserId user) True cReq dm (CR.PQEncryption enablePQ) subMode
connId <- withAgent $ \a -> joinConnection a (aUserId user) True cReq dm pqEnc subMode
pure (connId, incognitoProfile, subMode)
contactMember :: Contact -> [GroupMember] -> Maybe GroupMember
contactMember Contact {contactId} =
@@ -2190,15 +2196,18 @@ processChatCommand' vr = \case
user' <- updateUser
asks currentUser >>= atomically . (`writeTVar` Just user')
withChatLock "updateProfile" . procCmd $ do
let changedCts = foldr (addChangedProfileContact user') [] contacts
idsEvts = map ctSndMsg changedCts
enablePQ <- readTVarIO =<< asks pqExperimentalEnabled
msgReqs_ <- zipWith (ctMsgReq enablePQ) changedCts <$> createSndMessages idsEvts
(errs, cts) <- partitionEithers . zipWith (second . const) changedCts <$> deliverMessagesB msgReqs_
unless (null errs) $ toView $ CRChatErrors (Just user) errs
let changedCts' = filter (\ChangedProfileContact {ct, ct'} -> directOrUsed ct' && mergedPreferences ct' /= mergedPreferences ct) cts
createContactsSndFeatureItems user' changedCts'
let summary =
let changedCts_ = L.nonEmpty $ foldr (addChangedProfileContact user') [] contacts
summary <- case changedCts_ of
Nothing -> pure $ UserProfileUpdateSummary 0 0 []
Just changedCts -> do
let idsEvts = L.map ctSndMsg changedCts
enablePQ <- readTVarIO =<< asks pqExperimentalEnabled
msgReqs_ <- L.zipWith (ctMsgReq enablePQ) changedCts <$> createSndMessages idsEvts
(errs, cts) <- partitionEithers . L.toList . L.zipWith (second . const) changedCts <$> deliverMessagesB msgReqs_
unless (null errs) $ toView $ CRChatErrors (Just user) errs
let changedCts' = filter (\ChangedProfileContact {ct, ct'} -> directOrUsed ct' && mergedPreferences ct' /= mergedPreferences ct) cts
createContactsSndFeatureItems user' changedCts'
pure
UserProfileUpdateSummary
{ updateSuccesses = length cts,
updateFailures = length errs,
@@ -2217,8 +2226,8 @@ processChatCommand' vr = \case
mergedProfile = userProfileToSend user Nothing (Just ct) False
ct' = updateMergedPreferences user' ct
mergedProfile' = userProfileToSend user' Nothing (Just ct') False
ctSndMsg :: ChangedProfileContact -> (ConnOrGroupId, ChatMsgEvent 'Json)
ctSndMsg ChangedProfileContact {mergedProfile', conn = Connection {connId}} = (ConnectionId connId, XInfo mergedProfile')
ctSndMsg :: ChangedProfileContact -> (ConnOrGroupId, PQEncryption, ChatMsgEvent 'Json)
ctSndMsg ChangedProfileContact {mergedProfile', conn = Connection {connId, enablePQ = enablePQConn}} = (ConnectionId connId, CR.PQEncryption enablePQConn, XInfo mergedProfile')
ctMsgReq :: PQFlag -> ChangedProfileContact -> Either ChatError SndMessage -> Either ChatError MsgReq
ctMsgReq enablePQ ChangedProfileContact {conn = conn@Connection {enablePQ = enablePQConn}} =
fmap $ \SndMessage {msgId, msgBody} ->
@@ -2725,7 +2734,8 @@ acceptFileReceive user@User {userId} RcvFileTransfer {fileId, xftpRcvFile, fileI
unless (fileStatus == RFSNew) $ case fileStatus of
RFSCancelled _ -> throwChatError $ CEFileCancelled fName
_ -> throwChatError $ CEFileAlreadyReceiving fName
vr <- chatVersionRange
-- TODO PQ this is only used to set membership version range
vr <- chatVersionRange PQEncOff
case (xftpRcvFile, fileConnReq) of
-- direct file protocol
(Nothing, Just connReq) -> do
@@ -2764,7 +2774,8 @@ acceptFileReceive user@User {userId} RcvFileTransfer {fileId, xftpRcvFile, fileI
acceptFile cmdFunction send = do
filePath <- getRcvFilePath fileId filePath_ fName True
inline <- receiveInline
vr <- chatVersionRange
-- TODO PQ this is only used to set membership version range
vr <- chatVersionRange PQEncOff
if
| inline -> do
-- accepting inline
@@ -2811,7 +2822,8 @@ receiveViaURI user@User {userId} FileDescriptionURI {description} cf@CryptoFile
startReceivingFile :: ChatMonad m => User -> FileTransferId -> m ()
startReceivingFile user fileId = do
vr <- chatVersionRange
-- TODO PQ this is only used to set membership version range
vr <- chatVersionRange PQEncOff
ci <- withStoreCtx (Just "startReceivingFile, updateRcvFileStatus ...") $ \db -> do
liftIO $ updateRcvFileStatus db fileId FSConnected
liftIO $ updateCIFileStatus db user fileId $ CIFSRcvTransfer 0 1
@@ -2856,8 +2868,8 @@ acceptContactRequest :: ChatMonad m => User -> UserContactRequest -> Maybe Incog
acceptContactRequest user UserContactRequest {agentInvitationId = AgentInvId invId, cReqChatVRange, localDisplayName = cName, profileId, profile = cp, userContactLinkId, xContactId} incognitoProfile contactUsed = do
subMode <- chatReadVar subscriptionMode
let profileToSend = profileToSendOnAccept user incognitoProfile False
dm <- directMessage $ XInfo profileToSend
enablePQ <- readTVarIO =<< asks pqExperimentalEnabled
dm <- directMessagePQ (CR.PQEncryption enablePQ) maxConnInfoLength $ XInfo profileToSend
acId <- withAgent $ \a -> acceptContact a True invId dm (CR.PQEncryption enablePQ) subMode
withStore' $ \db -> createAcceptedContact db user acId (fromJVersionRange cReqChatVRange) cName profileId cp userContactLinkId xContactId incognitoProfile subMode enablePQ contactUsed
@@ -3182,7 +3194,8 @@ deleteTimedItem user (ChatRef cType chatId, itemId) deleteAt = do
ts <- liftIO getCurrentTime
liftIO $ threadDelay' $ diffToMicroseconds $ diffUTCTime deleteAt ts
waitChatStartedAndActivated
vr <- chatVersionRange
-- TODO PQ this is only used to set membership version range
vr <- chatVersionRange PQEncOff
case cType of
CTDirect -> do
(ct, CChatItem _ ci) <- withStore $ \db -> (,) <$> getContact db user chatId <*> getDirectChatItem db user chatId itemId
@@ -3203,7 +3216,8 @@ startUpdatedTimedItemThread user chatRef ci ci' =
expireChatItems :: forall m. ChatMonad m => User -> Int64 -> Bool -> m ()
expireChatItems user@User {userId} ttl sync = do
currentTs <- liftIO getCurrentTime
vr <- chatVersionRange
-- TODO PQ this is only used to set membership version range
vr <- chatVersionRange PQEncOff
let expirationDate = addUTCTime (-1 * fromIntegral ttl) currentTs
-- this is to keep group messages created during last 12 hours even if they're expired according to item_ts
createdAtCutoff = addUTCTime (-43200 :: NominalDiffTime) currentTs
@@ -3250,7 +3264,8 @@ processAgentMessage _ connId (DEL_RCVQ srv qId err_) =
processAgentMessage _ connId DEL_CONN =
toView $ CRAgentConnDeleted (AgentConnId connId)
processAgentMessage corrId connId msg = do
vr <- chatVersionRange
-- TODO PQ this is only used to set membership version range (?)
vr <- chatVersionRange PQEncOff
withStore' (`getUserByAConnId` AgentConnId connId) >>= \case
Just user -> processAgentMessageConn vr user corrId connId msg `catchChatError` (toView . CRChatError (Just user))
_ -> throwChatError $ CENoConnectionUser (AgentConnId connId)
@@ -3289,7 +3304,8 @@ processAgentMsgSndFile _corrId aFileId msg =
(ft@FileTransferMeta {fileId, xftpRedirectFor, cancelled}, sfts) <- withStore $ \db -> do
fileId <- getXFTPSndFileDBId db user $ AgentSndFileId aFileId
getSndFileTransfer db user fileId
vr <- chatVersionRange
-- TODO PQ this is only used to set membership version range
vr <- chatVersionRange PQEncOff
unless cancelled $ case msg of
SFPROG sndProgress sndTotal -> do
let status = CIFSSndTransfer {sndProgress, sndTotal}
@@ -3408,7 +3424,8 @@ processAgentMsgRcvFile _corrId aFileId msg =
ft@RcvFileTransfer {fileId} <- withStore $ \db -> do
fileId <- getXFTPRcvFileDBId db $ AgentRcvFileId aFileId
getRcvFileTransfer db user fileId
vr <- chatVersionRange
-- TODO PQ this is only used to set membership version range
vr <- chatVersionRange PQEncOff
unless (rcvFileCompleteOrCancelled ft) $ case msg of
RFPROG rcvProgress rcvTotal -> do
let status = CIFSRcvTransfer {rcvProgress, rcvTotal}
@@ -5827,7 +5844,7 @@ parseChatMessage conn s = do
sendFileChunk :: ChatMonad m => User -> SndFileTransfer -> m ()
sendFileChunk user ft@SndFileTransfer {fileId, fileStatus, agentConnId = AgentConnId acId} =
unless (fileStatus == FSComplete || fileStatus == FSCancelled) $ do
vr <- chatVersionRange
vr <- chatVersionRange PQEncOff
withStore' (`createSndFileChunk` ft) >>= \case
Just chunkNo -> sendFileChunkNo ft chunkNo
Nothing -> do
@@ -6012,51 +6029,83 @@ contactSendConn_ ct@Contact {activeConn} = case activeConn of
sendDirectMessage :: (MsgEncodingI e, ChatMonad m) => Connection -> CR.PQEncryption -> ChatMsgEvent e -> ConnOrGroupId -> m (SndMessage, Int64, CR.PQEncryption)
sendDirectMessage conn pqEnc chatMsgEvent connOrGroupId = do
when (connDisabled conn) $ throwChatError (CEConnectionDisabled conn)
msg@SndMessage {msgId, msgBody} <- createSndMessage chatMsgEvent connOrGroupId
msg@SndMessage {msgId, msgBody} <- createSndMessage chatMsgEvent connOrGroupId pqEnc
(msgDeliveryId, pqEnc') <- deliverMessage conn pqEnc (toCMEventTag chatMsgEvent) msgBody msgId
pure (msg, msgDeliveryId, pqEnc')
createSndMessage :: (MsgEncodingI e, ChatMonad m) => ChatMsgEvent e -> ConnOrGroupId -> m SndMessage
createSndMessage chatMsgEvent connOrGroupId =
liftEither . runIdentity =<< createSndMessages (Identity (connOrGroupId, chatMsgEvent))
createSndMessage :: (MsgEncodingI e, ChatMonad m) => ChatMsgEvent e -> ConnOrGroupId -> PQEncryption -> m SndMessage
createSndMessage chatMsgEvent connOrGroupId pqEnc =
liftEither . runIdentity =<< createSndMessages (Identity (connOrGroupId, pqEnc, chatMsgEvent))
createSndMessages :: forall e m t. (MsgEncodingI e, ChatMonad' m, Traversable t) => t (ConnOrGroupId, ChatMsgEvent e) -> m (t (Either ChatError SndMessage))
createSndMessages :: forall e m t. (MsgEncodingI e, ChatMonad' m, Traversable t) => t (ConnOrGroupId, PQEncryption, ChatMsgEvent e) -> m (t (Either ChatError SndMessage))
createSndMessages idsEvents = do
gVar <- asks random
vr <- chatVersionRange
withStoreBatch $ \db -> fmap (uncurry (createMsg db gVar vr)) idsEvents
g <- asks random
ChatConfig {chatVRange = vr} <- asks config
withStoreBatch $ \db -> fmap (createMsg db g vr) idsEvents
where
createMsg db gVar chatVRange connOrGroupId evnt = runExceptT $ do
withExceptT ChatErrorStore $ createNewSndMessage db gVar connOrGroupId evnt (encodeMessage chatVRange evnt)
encodeMessage chatVRange evnt sharedMsgId =
encodeChatMessage ChatMessage {chatVRange, msgId = Just sharedMsgId, chatMsgEvent = evnt}
createMsg :: DB.Connection -> TVar ChaChaDRG -> (PQEncryption -> VersionRangeChat) -> (ConnOrGroupId, PQEncryption, ChatMsgEvent e) -> IO (Either ChatError SndMessage)
createMsg db g vr (connOrGroupId, pqEnc, evnt) = runExceptT $ do
withExceptT ChatErrorStore $ createNewSndMessage db g connOrGroupId evnt encodeMessage
where
encodeMessage sharedMsgId =
encodeChatMessage maxEncodedMsgLength ChatMessage {chatVRange = vr pqEnc, msgId = Just sharedMsgId, chatMsgEvent = evnt}
sendGroupMemberMessages :: forall e m. (MsgEncodingI e, ChatMonad m) => User -> Connection -> NonEmpty (ChatMsgEvent e) -> GroupId -> m ()
sendGroupMemberMessages user conn@Connection {connId} events groupId = do
sendGroupMemberMessages user conn events groupId = do
when (connDisabled conn) $ throwChatError (CEConnectionDisabled conn)
let idsEvts = L.map (GroupId groupId,) events
let idsEvts = L.map (GroupId groupId,PQEncOff,) events
(errs, msgs) <- partitionEithers . L.toList <$> createSndMessages idsEvts
unless (null errs) $ toView $ CRChatErrors (Just user) errs
unless (null msgs) $ do
let (errs', msgBatches) = partitionEithers $ batchMessages maxChatMsgSize msgs
forM_ (L.nonEmpty msgs) $ \msgs' -> do
-- TODO PQ based on version (?)
-- let shouldCompress = False
-- batched <- if shouldCompress then batchSndMessagesBinary msgs' else pure $ batchSndMessagesJSON msgs'
let batched = batchSndMessagesJSON msgs'
let (errs', msgBatches) = partitionEithers batched
-- shouldn't happen, as large messages would have caused createNewSndMessage to throw SELargeMsg
unless (null errs') $ toView $ CRChatErrors (Just user) errs'
forM_ msgBatches $ \batch ->
processBatch batch `catchChatError` (toView . CRChatError (Just user))
where
processBatch :: MsgBatch -> m ()
processBatch (MsgBatch batchBody sndMsgs) = do
(agentMsgId, _pqEnc) <- withAgent $ \a -> sendMessage a (aConnId conn) CR.PQEncOff MsgFlags {notification = True} batchBody
let sndMsgDelivery = SndMsgDelivery {connId, agentMsgId}
void . withStoreBatch' $ \db -> map (\SndMessage {msgId} -> createSndMsgDelivery db sndMsgDelivery msgId) sndMsgs
processSndMessageBatch conn batch `catchChatError` (toView . CRChatError (Just user))
processSndMessageBatch :: ChatMonad m => Connection -> MsgBatch -> m ()
processSndMessageBatch conn@Connection {connId} (MsgBatch batchBody sndMsgs) = do
(agentMsgId, _pqEnc) <- withAgent $ \a -> sendMessage a (aConnId conn) CR.PQEncOff MsgFlags {notification = True} batchBody
let sndMsgDelivery = SndMsgDelivery {connId, agentMsgId}
void . withStoreBatch' $ \db -> map (\SndMessage {msgId} -> createSndMsgDelivery db sndMsgDelivery msgId) sndMsgs
batchSndMessagesJSON :: NonEmpty SndMessage -> [Either ChatError MsgBatch]
batchSndMessagesJSON = batchMessages (maxEncodedMsgLength PQEncOff) . L.toList
-- batchSndMessagesBinary :: forall m. ChatMonad m => NonEmpty SndMessage -> m [Either ChatError MsgBatch]
-- batchSndMessagesBinary msgs = do
-- compressed <- liftIO $ withCompressCtx maxChatMsgSize $ \cctx -> mapM (compressForBatch cctx) msgs
-- pure . map toMsgBatch . SMP.batchTransmissions_ (maxEncodedMsgLength PQEncOff) $ L.zip compressed msgs
-- where
-- compressForBatch cctx SndMessage {msgBody} = bimap (const TELargeMsg) smpEncode <$> compress cctx msgBody
-- toMsgBatch :: SMP.TransportBatch SndMessage -> Either ChatError MsgBatch
-- toMsgBatch = \case
-- SMP.TBTransmissions combined _n sms -> Right $ MsgBatch (markCompressedBatch combined) sms
-- SMP.TBError tbe SndMessage {msgId} -> Left . ChatError $ CEInternalError (show tbe <> " " <> show msgId)
-- SMP.TBTransmission {} -> Left . ChatError $ CEInternalError "batchTransmissions_ didn't produce a batch"
directMessage :: (MsgEncodingI e, ChatMonad m) => ChatMsgEvent e -> m ByteString
directMessage chatMsgEvent = do
chatVRange <- chatVersionRange
let r = encodeChatMessage ChatMessage {chatVRange, msgId = Nothing, chatMsgEvent}
directMessage = directMessagePQ PQEncOff maxConnInfoLength
-- TODO PQ check size after compression (in compressedBatchMsgBody_ ?)
directMessagePQ :: (MsgEncodingI e, ChatMonad m) => CR.PQEncryption -> (CR.PQEncryption -> Int) -> ChatMsgEvent e -> m ByteString
directMessagePQ pqEnc maxMsgSize chatMsgEvent = do
chatVRange <- chatVersionRange pqEnc
let shouldCompress = maxVersion chatVRange >= compressedBatchingVersion
r = encodeChatMessage maxMsgSize ChatMessage {chatVRange, msgId = Nothing, chatMsgEvent}
case r of
ECMEncoded encodedBody -> pure encodedBody
ECMEncoded encodedBody
| shouldCompress -> compressedBatchMsgBody encodedBody
| otherwise -> pure encodedBody
ECMLarge -> throwChatError $ CEException "large message"
where
compressedBatchMsgBody msgBody =
liftEitherError (ChatError . CEException . mappend "compressedBatchMsgBody: ") $
withCompressCtx (B.length msgBody) (`compressedBatchMsgBody_` msgBody)
deliverMessage :: ChatMonad m => Connection -> CR.PQEncryption -> CMEventTag e -> MsgBody -> MessageId -> m (Int64, CR.PQEncryption)
deliverMessage conn pqEnc cmEventTag msgBody msgId = do
@@ -6065,8 +6114,8 @@ deliverMessage conn pqEnc cmEventTag msgBody msgId = do
deliverMessage' :: ChatMonad m => Connection -> CR.PQEncryption -> MsgFlags -> MsgBody -> MessageId -> m (Int64, CR.PQEncryption)
deliverMessage' conn pqEnc msgFlags msgBody msgId =
deliverMessages [(conn, pqEnc, msgFlags, msgBody, msgId)] >>= \case
[r] -> liftEither r
deliverMessages ((conn, pqEnc, msgFlags, msgBody, msgId) :| []) >>= \case
r :| [] -> liftEither r
rs -> throwChatError $ CEInternalError $ "deliverMessage: expected 1 result, got " <> show (length rs)
type MsgReq = (Connection, CR.PQEncryption, MsgFlags, MsgBody, MessageId)
@@ -6076,15 +6125,23 @@ contactPQEnc Connection {enablePQ = enablePQConn} = do
enablePQ <- readTVarIO =<< asks pqExperimentalEnabled
pure $ CR.PQEncryption $ enablePQ && enablePQConn
deliverMessages :: ChatMonad' m => [MsgReq] -> m [Either ChatError (Int64, CR.PQEncryption)]
deliverMessages = deliverMessagesB . map Right
deliverMessages :: ChatMonad' m => NonEmpty MsgReq -> m (NonEmpty (Either ChatError (Int64, CR.PQEncryption)))
deliverMessages msgs = deliverMessagesB $ L.map Right msgs
deliverMessagesB :: ChatMonad' m => [Either ChatError MsgReq] -> m [Either ChatError (Int64, CR.PQEncryption)]
deliverMessagesB :: ChatMonad' m => NonEmpty (Either ChatError MsgReq) -> m (NonEmpty (Either ChatError (Int64, CR.PQEncryption)))
deliverMessagesB msgReqs = do
sent <- zipWith prepareBatch msgReqs <$> withAgent' (\a -> sendMessagesB a $ map toAgent msgReqs)
void $ withStoreBatch' $ \db -> map (updatePQSndEnabled db) (rights sent)
withStoreBatch $ \db -> map (bindRight $ createDelivery db) sent
msgReqs' <- compressBodies
sent <- L.zipWith prepareBatch msgReqs' <$> withAgent' (`sendMessagesB` L.map toAgent msgReqs')
void $ withStoreBatch' $ \db -> map (updatePQSndEnabled db) (rights . L.toList $ sent)
withStoreBatch $ \db -> L.map (bindRight $ createDelivery db) sent
where
compressBodies = liftIO $ withCompressCtx maxRawMsgLength $ \cctx ->
forM msgReqs $ \case
mr@(Right (conn, pqEnc, msgFlags, msgBody, msgId))
| pqEnc == CR.PQEncOn -> do
bimap (ChatError . CEException) (\cBody -> (conn, pqEnc, msgFlags, cBody, msgId)) <$> compressedBatchMsgBody_ cctx msgBody
| otherwise -> pure mr
skip -> pure skip
toAgent = \case
Right (conn, pqEnc, msgFlags, msgBody, _msgId) -> Right (aConnId conn, pqEnc, msgFlags, msgBody)
Left _ce -> Left (AP.INTERNAL "ChatError, skip") -- as long as it is Left, the agent batchers should just step over it
@@ -6129,12 +6186,12 @@ sendGroupMessage user gInfo members chatMsgEvent = do
sendGroupMessage' :: (MsgEncodingI e, ChatMonad m) => User -> GroupInfo -> [GroupMember] -> ChatMsgEvent e -> m (SndMessage, [GroupMember])
sendGroupMessage' user GroupInfo {groupId} members chatMsgEvent = do
msg@SndMessage {msgId, msgBody} <- createSndMessage chatMsgEvent (GroupId groupId)
msg@SndMessage {msgId, msgBody} <- createSndMessage chatMsgEvent (GroupId groupId) PQEncOff
recipientMembers <- liftIO $ shuffleMembers (filter memberCurrent members)
let msgFlags = MsgFlags {notification = hasNotification $ toCMEventTag chatMsgEvent}
(toSend, pending) = foldr addMember ([], []) recipientMembers
msgReqs = map (\(_, conn) -> (conn, CR.PQEncOff, msgFlags, msgBody, msgId)) toSend
delivered <- deliverMessages msgReqs
delivered <- maybe (pure []) (fmap L.toList . deliverMessages) $ L.nonEmpty msgReqs
let errors = lefts delivered
unless (null errors) $ toView $ CRChatErrors (Just user) errors
stored <- withStoreBatch' $ \db -> map (\m -> createPendingGroupMessage db (groupMemberId' m) msgId Nothing) pending
@@ -6187,7 +6244,7 @@ memberSendAction chatMsgEvent members m@GroupMember {invitedByGroupMemberId} = c
sendGroupMemberMessage :: forall e m. (MsgEncodingI e, ChatMonad m) => User -> GroupMember -> ChatMsgEvent e -> Int64 -> Maybe Int64 -> m () -> m ()
sendGroupMemberMessage user m@GroupMember {groupMemberId} chatMsgEvent groupId introId_ postDeliver = do
msg <- createSndMessage chatMsgEvent (GroupId groupId)
msg <- createSndMessage chatMsgEvent (GroupId groupId) PQEncOff
messageMember msg `catchChatError` (\e -> toView (CRChatError (Just user) e))
where
messageMember :: SndMessage -> m ()
@@ -6359,16 +6416,16 @@ joinAgentConnectionAsync user enableNtfs cReqUri cInfo subMode = do
pure (cmdId, connId)
allowAgentConnectionAsync :: (MsgEncodingI e, ChatMonad m) => User -> Connection -> ConfirmationId -> ChatMsgEvent e -> m ()
allowAgentConnectionAsync user conn@Connection {connId} confId msg = do
allowAgentConnectionAsync user conn@Connection {connId, enablePQ} confId msg = do
cmdId <- withStore' $ \db -> createCommand db user (Just connId) CFAllowConn
dm <- directMessage msg
dm <- directMessagePQ (CR.PQEncryption enablePQ) maxConnInfoLength msg
withAgent $ \a -> allowConnectionAsync a (aCorrId cmdId) (aConnId conn) confId dm
withStore' $ \db -> updateConnectionStatus db conn ConnAccepted
agentAcceptContactAsync :: (MsgEncodingI e, ChatMonad m) => User -> Bool -> InvitationId -> ChatMsgEvent e -> SubscriptionMode -> CR.PQEncryption -> m (CommandId, ConnId)
agentAcceptContactAsync user enableNtfs invId msg subMode pqEnc = do
cmdId <- withStore' $ \db -> createCommand db user Nothing CFAcceptContact
dm <- directMessage msg
dm <- directMessagePQ pqEnc maxConnInfoLength msg
connId <- withAgent $ \a -> acceptContactAsync a (aCorrId cmdId) enableNtfs invId dm pqEnc subMode
pure (cmdId, connId)
@@ -6603,10 +6660,10 @@ waitChatStartedAndActivated = do
activated <- readTVar chatActivated
unless (isJust started && activated) retry
chatVersionRange :: ChatMonad' m => m VersionRangeChat
chatVersionRange = do
chatVersionRange :: ChatMonad' m => CR.PQEncryption -> m VersionRangeChat
chatVersionRange pqEnc = do
ChatConfig {chatVRange} <- asks config
pure chatVRange
pure $ chatVRange pqEnc
chatCommandP :: Parser ChatCommand
chatCommandP =