core (pq): don't compress if message fits without compression; check compressed message fits size limit (#3888)

* core (pq): don't compress if message fits without compression; check compressed message fits size limit

* refactor

* errors

* fix tests

* envelope sizes

* refactor

* comment

* more flexible test

* refactor, comment

---------

Co-authored-by: Evgeny Poberezkin <evgeny@poberezkin.com>
This commit is contained in:
spaced4ndy
2024-03-11 02:54:55 +04:00
committed by GitHub
parent 49bd866c4b
commit 56fcaf514e
7 changed files with 57 additions and 57 deletions
+28 -20
View File
@@ -3350,7 +3350,8 @@ processAgentMsgSndFile _corrId aFileId msg =
[] -> case xftpRedirectFor of
Nothing -> xftpSndFileRedirect user fileId rfd >>= toView . CRSndFileRedirectStartXFTP user ft
Just _ -> sendFileError "Prohibit chaining redirects" fileId vr ft
rfds' -> do -- we have 1 chunk - use it as URI whether it is redirect or not
rfds' -> do
-- we have 1 chunk - use it as URI whether it is redirect or not
ft' <- maybe (pure ft) (\fId -> withStore $ \db -> getFileTransferMeta db user fId) xftpRedirectFor
toView $ CRSndStandaloneFileComplete user ft' $ map (decodeLatin1 . strEncode . FD.fileDescriptionURI) rfds'
Just (AChatItem _ d cInfo _ci@ChatItem {meta = CIMeta {itemSharedMsgId = msgId_, itemDeleted}}) ->
@@ -6057,8 +6058,7 @@ sendDirectContactMessage user ct chatMsgEvent = do
conn@Connection {connId, pqSupport} <- liftEither $ contactSendConn_ ct
r <- sendDirectMessage_ conn pqSupport chatMsgEvent (ConnectionId connId)
let (sndMessage, msgDeliveryId, pqEnc') = r
-- TODO PQ use updated ct' and conn'? check downstream if it may affect something, maybe it's not necessary
void $ createContactPQSndItem user ct conn pqEnc' -- (_ct', _conn')
void $ createContactPQSndItem user ct conn pqEnc'
pure (sndMessage, msgDeliveryId)
contactSendConn_ :: Contact -> Either ChatError Connection
@@ -6127,12 +6127,12 @@ processSndMessageBatch conn@Connection {connId} (MsgBatch batchBody sndMsgs) = d
-- TODO v5.7 update batching for groups
batchSndMessagesJSON :: NonEmpty SndMessage -> [Either ChatError MsgBatch]
batchSndMessagesJSON = batchMessages maxRawMsgLength . L.toList
batchSndMessagesJSON = batchMessages maxEncodedMsgLength . L.toList
-- batchSndMessagesBinary :: forall m. ChatMonad m => NonEmpty SndMessage -> m [Either ChatError MsgBatch]
-- batchSndMessagesBinary msgs = do
-- compressed <- liftIO $ withCompressCtx maxChatMsgSize $ \cctx -> mapM (compressForBatch cctx) msgs
-- pure . map toMsgBatch . SMP.batchTransmissions_ (maxEncodedMsgLength PQEncOff) $ L.zip compressed msgs
-- pure . map toMsgBatch . SMP.batchTransmissions_ (maxEncodedMsgLength) $ L.zip compressed msgs
-- where
-- compressForBatch cctx SndMessage {msgBody} = bimap (const TELargeMsg) smpEncode <$> compress cctx msgBody
-- toMsgBatch :: SMP.TransportBatch SndMessage -> Either ChatError MsgBatch
@@ -6146,19 +6146,20 @@ encodeConnInfo chatMsgEvent = do
vr <- chatVersionRange
encodeConnInfoPQ PQSupportOff (maxVersion $ vr PQSupportOff) chatMsgEvent
-- TODO PQ check size after compression (in compressedBatchMsgBody_ ?)
encodeConnInfoPQ :: (MsgEncodingI e, ChatMonad m) => PQSupport -> VersionChat -> ChatMsgEvent e -> m ByteString
encodeConnInfoPQ pqSup v chatMsgEvent = do
vr <- chatVersionRange
let msg = ChatMessage {chatVRange = vr pqSup, msgId = Nothing, chatMsgEvent}
case encodeChatMessage maxConnInfoLength msg of
ECMEncoded encodedBody -> case pqSup of
PQSupportOn | v >= pqEncryptionCompressionVersion -> liftIO $ compressedBatchMsgBody encodedBody
_ -> pure encodedBody
ECMLarge -> throwChatError $ CEException "large message"
where
compressedBatchMsgBody msgBody =
withCompressCtx (toEnum $ B.length msgBody) (`compressedBatchMsgBody_` msgBody)
let info = ChatMessage {chatVRange = vr pqSup, msgId = Nothing, chatMsgEvent}
case encodeChatMessage maxEncodedInfoLength info of
ECMEncoded connInfo -> case pqSup of
PQSupportOn | v >= pqEncryptionCompressionVersion && B.length connInfo > maxCompressedInfoLength -> do
connInfo' <- liftIO compressedBatchMsgBody
when (B.length connInfo' > maxCompressedInfoLength) $ throwChatError $ CEException "large compressed info"
pure connInfo'
_ -> pure connInfo
where
compressedBatchMsgBody = withCompressCtx (toEnum $ B.length connInfo) (`compressedBatchMsgBody_` connInfo)
ECMLarge -> throwChatError $ CEException "large info"
deliverMessage :: ChatMonad m => Connection -> CMEventTag e -> MsgBody -> MessageId -> m (Int64, PQEncryption)
deliverMessage conn cmEventTag msgBody msgId = do
@@ -6183,11 +6184,18 @@ deliverMessagesB msgReqs = do
void $ withStoreBatch' $ \db -> map (updatePQSndEnabled db) (rights . L.toList $ sent)
withStoreBatch $ \db -> L.map (bindRight $ createDelivery db) sent
where
compressBodies = liftIO $ withCompressCtx (toEnum maxRawMsgLength) $ \cctx -> do
forME msgReqs $ \mr@(conn@Connection {pqSupport, connChatVersion}, msgFlags, msgBody, msgId) -> Right <$> case pqSupport of
PQSupportOn | connChatVersion >= pqEncryptionCompressionVersion ->
(\cBody -> (conn, msgFlags, cBody, msgId)) <$> compressedBatchMsgBody_ cctx msgBody
_ -> pure mr
compressBodies = liftIO $ withCompressCtx (toEnum maxEncodedMsgLength) $ \cxt ->
forME msgReqs $ \mr@(conn@Connection {pqSupport, connChatVersion = v}, msgFlags, msgBody, msgId) ->
runExceptT $ case pqSupport of
-- we only compress messages when:
-- 1) PQ support is enabled
-- 2) version is compatible with compression
-- 3) message is longer than max compressed size (as this function is not used for batched messages anyway)
PQSupportOn | v >= pqEncryptionCompressionVersion && B.length msgBody > maxCompressedMsgLength -> do
msgBody' <- liftIO $ compressedBatchMsgBody_ cxt msgBody
when (B.length msgBody' > maxCompressedMsgLength) $ throwError $ ChatError $ CEException "large compressed message"
pure (conn, msgFlags, msgBody', msgId)
_ -> pure mr
toAgent = \case
Right (conn@Connection {pqEncryption}, msgFlags, msgBody, _msgId) -> Right (aConnId conn, pqEncryption, msgFlags, msgBody)
Left _ce -> Left (AP.INTERNAL "ChatError, skip") -- as long as it is Left, the agent batchers should just step over it
+17 -16
View File
@@ -531,29 +531,29 @@ $(JQ.deriveJSON defaultJSON ''QuotedMsg)
-- this limit reserves space for metadata in forwarded messages
-- 15780 (limit used for fileChunkSize) - 161 (x.grp.msg.forward overhead) = 15619, round to 15610
maxRawMsgLength :: Int
maxRawMsgLength = 15610
maxEncodedMsgLength :: Int
maxEncodedMsgLength = 15610
maxEncodedMsgLength :: PQSupport -> Int
maxEncodedMsgLength = \case
PQSupportOn -> 13410 -- reduced by 2200 (original message should be compressed)
PQSupportOff -> maxRawMsgLength
{-# INLINE maxEncodedMsgLength #-}
-- maxEncodedMsgLength - 2222, see e2eEncUserMsgLength in agent
maxCompressedMsgLength :: Int
maxCompressedMsgLength = 13388
maxConnInfoLength :: PQSupport -> Int
maxConnInfoLength = \case
PQSupportOn -> 10902 -- reduced by 3700
PQSupportOff -> 14602 -- 15610 - delta in agent between MSG and INFO
{-# INLINE maxConnInfoLength #-}
-- maxEncodedMsgLength - delta between MSG and INFO + 100 (returned for forward overhead)
-- delta between MSG and INFO = e2eEncUserMsgLength (no PQ) - e2eEncConnInfoLength (no PQ) = 1008
maxEncodedInfoLength :: Int
maxEncodedInfoLength = 14702
maxCompressedInfoLength :: Int
maxCompressedInfoLength = 10976 -- maxEncodedInfoLength - 3726, see e2eEncConnInfoLength in agent
data EncodedChatMessage = ECMEncoded ByteString | ECMLarge
encodeChatMessage :: MsgEncodingI e => (PQSupport -> Int) -> ChatMessage e -> EncodedChatMessage
encodeChatMessage getMaxSize msg = do
encodeChatMessage :: MsgEncodingI e => Int -> ChatMessage e -> EncodedChatMessage
encodeChatMessage maxSize msg = do
case chatToAppMessage msg of
AMJson m -> do
let body = LB.toStrict $ J.encode m
if B.length body > getMaxSize PQSupportOff
if B.length body > maxSize
then ECMLarge
else ECMEncoded body
AMBinary m -> ECMEncoded $ strEncode m
@@ -573,7 +573,8 @@ parseChatMessages s = case B.head s of
decodeCompressed :: ByteString -> [Either String AChatMessage]
decodeCompressed s' = case smpDecode s' of
Left e -> [Left e]
Right compressed -> concatMap (either (pure . Left) parseChatMessages) . L.toList $ decompressBatch maxRawMsgLength compressed
-- TODO v5.7 don't reserve multiple large buffers when decoding batches
Right compressed -> concatMap (either (pure . Left) parseChatMessages) . L.toList $ decompressBatch maxEncodedMsgLength compressed
compressedBatchMsgBody_ :: CompressCtx -> MsgBody -> IO ByteString
compressedBatchMsgBody_ ctx msgBody = markCompressedBatch . smpEncode . (L.:| []) <$> compress ctx msgBody