From 8d7dcb550a9c3412e21da7a64b4c4b0119ec735e Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Mon, 15 Jan 2024 10:46:13 +0000 Subject: [PATCH] core: update simplexmq, optimize batching, remove builder (#3685) * core: update simplexmq (optimize batching, remove builder) * do not use builder to batch * refactor --- cabal.project | 2 +- scripts/nix/sha256map.nix | 2 +- src/Simplex/Chat.hs | 16 +++++------ src/Simplex/Chat/Messages.hs | 7 ++--- src/Simplex/Chat/Messages/Batch.hs | 43 ++++++++++++++---------------- src/Simplex/Chat/Protocol.hs | 12 ++++----- tests/MessageBatching.hs | 16 +++++------ tests/ProtocolTests.hs | 3 +-- 8 files changed, 45 insertions(+), 56 deletions(-) diff --git a/cabal.project b/cabal.project index 7f4db12bb1..80d7abe08a 100644 --- a/cabal.project +++ b/cabal.project @@ -14,7 +14,7 @@ constraints: zip +disable-bzip2 +disable-zstd source-repository-package type: git location: https://github.com/simplex-chat/simplexmq.git - tag: ad8cd1d5154617663065652b45c784ad5a0a584d + tag: aee90884175a3092828be1f0be2fc702c69bc101 source-repository-package type: git diff --git a/scripts/nix/sha256map.nix b/scripts/nix/sha256map.nix index 6a6e4ec11d..dc3a7e293f 100644 --- a/scripts/nix/sha256map.nix +++ b/scripts/nix/sha256map.nix @@ -1,5 +1,5 @@ { - "https://github.com/simplex-chat/simplexmq.git"."ad8cd1d5154617663065652b45c784ad5a0a584d" = "19sinz1gynab776x8h9va7r6ifm9pmgzljsbc7z5cbkcnjl5sfh3"; + "https://github.com/simplex-chat/simplexmq.git"."aee90884175a3092828be1f0be2fc702c69bc101" = "0ca5xzcpria481jhl9nlazvjljg3wwfkzzd2x6h4lxql2wbdnlx6"; "https://github.com/simplex-chat/hs-socks.git"."a30cc7a79a08d8108316094f8f2f82a0c5e1ac51" = "0yasvnr7g91k76mjkamvzab2kvlb1g5pspjyjn2fr6v83swjhj38"; "https://github.com/simplex-chat/direct-sqlcipher.git"."f814ee68b16a9447fbb467ccc8f29bdd3546bfd9" = "1ql13f4kfwkbaq7nygkxgw84213i0zm7c1a8hwvramayxl38dq5d"; "https://github.com/simplex-chat/sqlcipher-simple.git"."a46bd361a19376c5211f1058908fc0ae6bf42446" = "1z0r78d8f0812kxbgsm735qf6xx8lvaz27k1a0b4a2m0sshpd5gl"; diff --git a/src/Simplex/Chat.hs b/src/Simplex/Chat.hs index 3d25ba7cf3..820b88fdaa 100644 --- a/src/Simplex/Chat.hs +++ b/src/Simplex/Chat.hs @@ -29,7 +29,6 @@ import Data.Bifunctor (bimap, first) import Data.ByteArray (ScrubbedBytes) import qualified Data.ByteArray as BA import qualified Data.ByteString.Base64 as B64 -import Data.ByteString.Builder (toLazyByteString) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Lazy.Char8 as LB @@ -5656,8 +5655,7 @@ sendGroupMemberMessages user conn@Connection {connId} events groupId = do processBatch batch `catchChatError` (toView . CRChatError (Just user)) where processBatch :: MsgBatch -> m () - processBatch (MsgBatch builder sndMsgs) = do - let batchBody = LB.toStrict $ toLazyByteString builder + processBatch (MsgBatch batchBody sndMsgs) = do agentMsgId <- withAgent $ \a -> sendMessage a (aConnId conn) MsgFlags {notification = True} batchBody let sndMsgDelivery = SndMsgDelivery {connId, agentMsgId} void . withStoreBatch' $ \db -> map (\SndMessage {msgId} -> createSndMsgDelivery db sndMsgDelivery msgId) sndMsgs @@ -5677,28 +5675,28 @@ directMessage chatMsgEvent = do chatVRange <- chatVersionRange let r = encodeChatMessage ChatMessage {chatVRange, msgId = Nothing, chatMsgEvent} case r of - ECMEncoded encodedBody -> pure . LB.toStrict $ encodedBody + ECMEncoded encodedBody -> pure encodedBody ECMLarge -> throwChatError $ CEException "large message" -deliverMessage :: ChatMonad m => Connection -> CMEventTag e -> LazyMsgBody -> MessageId -> m Int64 +deliverMessage :: ChatMonad m => Connection -> CMEventTag e -> MsgBody -> MessageId -> m Int64 deliverMessage conn cmEventTag msgBody msgId = do let msgFlags = MsgFlags {notification = hasNotification cmEventTag} deliverMessage' conn msgFlags msgBody msgId -deliverMessage' :: ChatMonad m => Connection -> MsgFlags -> LazyMsgBody -> MessageId -> m Int64 +deliverMessage' :: ChatMonad m => Connection -> MsgFlags -> MsgBody -> MessageId -> m Int64 deliverMessage' conn msgFlags msgBody msgId = deliverMessages [(conn, msgFlags, msgBody, msgId)] >>= \case [r] -> liftEither r rs -> throwChatError $ CEInternalError $ "deliverMessage: expected 1 result, got " <> show (length rs) -deliverMessages :: ChatMonad' m => [(Connection, MsgFlags, LazyMsgBody, MessageId)] -> m [Either ChatError Int64] +deliverMessages :: ChatMonad' m => [(Connection, MsgFlags, MsgBody, MessageId)] -> m [Either ChatError Int64] deliverMessages msgReqs = do sent <- zipWith prepareBatch msgReqs <$> withAgent' (`sendMessages` aReqs) withStoreBatch $ \db -> map (bindRight $ createDelivery db) sent where - aReqs = map (\(conn, msgFlags, msgBody, _msgId) -> (aConnId conn, msgFlags, LB.toStrict msgBody)) msgReqs + aReqs = map (\(conn, msgFlags, msgBody, _msgId) -> (aConnId conn, msgFlags, msgBody)) msgReqs prepareBatch req = bimap (`ChatErrorAgent` Nothing) (req,) - createDelivery :: DB.Connection -> ((Connection, MsgFlags, LazyMsgBody, MessageId), AgentMsgId) -> IO (Either ChatError Int64) + createDelivery :: DB.Connection -> ((Connection, MsgFlags, MsgBody, MessageId), AgentMsgId) -> IO (Either ChatError Int64) createDelivery db ((Connection {connId}, _, _, msgId), agentMsgId) = Right <$> createSndMsgDelivery db (SndMsgDelivery {connId, agentMsgId}) msgId diff --git a/src/Simplex/Chat/Messages.hs b/src/Simplex/Chat/Messages.hs index 74b41dc9f2..993cc1eea3 100644 --- a/src/Simplex/Chat/Messages.hs +++ b/src/Simplex/Chat/Messages.hs @@ -22,7 +22,6 @@ import qualified Data.Aeson.Encoding as JE import qualified Data.Aeson.TH as JQ import qualified Data.Attoparsec.ByteString.Char8 as A import qualified Data.ByteString.Base64 as B64 -import qualified Data.ByteString.Lazy as L import qualified Data.ByteString.Lazy.Char8 as LB import Data.Char (isSpace) import Data.Int (Int64) @@ -764,12 +763,10 @@ checkChatType x = case testEquality (chatTypeI @c) (chatTypeI @c') of Just Refl -> Right x Nothing -> Left "bad chat type" -type LazyMsgBody = L.ByteString - data SndMessage = SndMessage { msgId :: MessageId, sharedMsgId :: SharedMsgId, - msgBody :: LazyMsgBody + msgBody :: MsgBody } deriving (Show) @@ -791,7 +788,7 @@ data RcvMessage = RcvMessage data PendingGroupMessage = PendingGroupMessage { msgId :: MessageId, cmEventTag :: ACMEventTag, - msgBody :: LazyMsgBody, + msgBody :: MsgBody, introId_ :: Maybe Int64 } diff --git a/src/Simplex/Chat/Messages/Batch.hs b/src/Simplex/Chat/Messages/Batch.hs index 8b06873a33..dc2c9c3865 100644 --- a/src/Simplex/Chat/Messages/Batch.hs +++ b/src/Simplex/Chat/Messages/Batch.hs @@ -1,6 +1,7 @@ {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} module Simplex.Chat.Messages.Batch @@ -9,33 +10,29 @@ module Simplex.Chat.Messages.Batch ) where -import Data.ByteString.Builder (Builder, charUtf8, lazyByteString) -import qualified Data.ByteString.Lazy as LB -import Data.Int (Int64) +import Data.ByteString.Char8 (ByteString) +import qualified Data.ByteString.Char8 as B import Simplex.Chat.Controller (ChatError (..), ChatErrorType (..)) import Simplex.Chat.Messages -data MsgBatch = MsgBatch Builder [SndMessage] +data MsgBatch = MsgBatch ByteString [SndMessage] deriving (Show) --- | Batches [SndMessage] into batches of ByteString builders in form of JSON arrays. +-- | Batches [SndMessage] into batches of ByteStrings in form of JSON arrays. -- Does not check if the resulting batch is a valid JSON. -- If a single element is passed, it is returned as is (a JSON string). -- If an element exceeds maxLen, it is returned as ChatError. -batchMessages :: Int64 -> [SndMessage] -> [Either ChatError MsgBatch] -batchMessages maxLen msgs = - let (batches, batch, _, n) = foldr addToBatch ([], [], 0, 0) msgs - in if n == 0 then batches else msgBatch batch : batches +batchMessages :: Int -> [SndMessage] -> [Either ChatError MsgBatch] +batchMessages maxLen = addBatch . foldr addToBatch ([], [], 0, 0) where msgBatch batch = Right (MsgBatch (encodeMessages batch) batch) - addToBatch :: SndMessage -> ([Either ChatError MsgBatch], [SndMessage], Int64, Int) -> ([Either ChatError MsgBatch], [SndMessage], Int64, Int) - addToBatch msg@SndMessage {msgBody} (batches, batch, len, n) + addToBatch :: SndMessage -> ([Either ChatError MsgBatch], [SndMessage], Int, Int) -> ([Either ChatError MsgBatch], [SndMessage], Int, Int) + addToBatch msg@SndMessage {msgBody} acc@(batches, batch, len, n) | batchLen <= maxLen = (batches, msg : batch, len', n + 1) - | msgLen <= maxLen = (batches', [msg], msgLen, 1) - | otherwise = (errLarge msg : (if n == 0 then batches else batches'), [], 0, 0) + | msgLen <= maxLen = (addBatch acc, [msg], msgLen, 1) + | otherwise = (errLarge msg : addBatch acc, [], 0, 0) where - msgLen = LB.length msgBody - batches' = msgBatch batch : batches + msgLen = B.length msgBody len' | n == 0 = msgLen | otherwise = msgLen + len + 1 -- 1 accounts for comma @@ -43,11 +40,11 @@ batchMessages maxLen msgs = | n == 0 = len' | otherwise = len' + 2 -- 2 accounts for opening and closing brackets errLarge SndMessage {msgId} = Left $ ChatError $ CEInternalError ("large message " <> show msgId) - -encodeMessages :: [SndMessage] -> Builder -encodeMessages = \case - [] -> mempty - [msg] -> encodeMsg msg - (msg : msgs) -> charUtf8 '[' <> encodeMsg msg <> mconcat [charUtf8 ',' <> encodeMsg msg' | msg' <- msgs] <> charUtf8 ']' - where - encodeMsg SndMessage {msgBody} = lazyByteString msgBody + addBatch :: ([Either ChatError MsgBatch], [SndMessage], Int, Int) -> [Either ChatError MsgBatch] + addBatch (batches, batch, _, n) = if n == 0 then batches else msgBatch batch : batches + encodeMessages :: [SndMessage] -> ByteString + encodeMessages = \case + [] -> mempty + [msg] -> body msg + msgs -> B.concat ["[", B.intercalate "," (map body msgs), "]"] + body SndMessage {msgBody} = msgBody diff --git a/src/Simplex/Chat/Protocol.hs b/src/Simplex/Chat/Protocol.hs index a0df9f9865..2aa360b4b1 100644 --- a/src/Simplex/Chat/Protocol.hs +++ b/src/Simplex/Chat/Protocol.hs @@ -29,9 +29,7 @@ import qualified Data.Attoparsec.ByteString.Char8 as A import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.ByteString.Internal (c2w, w2c) -import qualified Data.ByteString.Lazy as L import qualified Data.ByteString.Lazy.Char8 as LB -import Data.Int (Int64) import Data.Maybe (fromMaybe) import Data.String import Data.Text (Text) @@ -491,20 +489,20 @@ $(JQ.deriveJSON defaultJSON ''QuotedMsg) -- this limit reserves space for metadata in forwarded messages -- 15780 (limit used for fileChunkSize) - 161 (x.grp.msg.forward overhead) = 15619, round to 15610 -maxChatMsgSize :: Int64 +maxChatMsgSize :: Int maxChatMsgSize = 15610 -data EncodedChatMessage = ECMEncoded L.ByteString | ECMLarge +data EncodedChatMessage = ECMEncoded ByteString | ECMLarge encodeChatMessage :: MsgEncodingI e => ChatMessage e -> EncodedChatMessage encodeChatMessage msg = do case chatToAppMessage msg of AMJson m -> do - let body = J.encode m - if LB.length body > maxChatMsgSize + let body = LB.toStrict $ J.encode m + if B.length body > maxChatMsgSize then ECMLarge else ECMEncoded body - AMBinary m -> ECMEncoded . LB.fromStrict $ strEncode m + AMBinary m -> ECMEncoded $ strEncode m parseChatMessages :: ByteString -> [Either String AChatMessage] parseChatMessages "" = [Left "empty string"] diff --git a/tests/MessageBatching.hs b/tests/MessageBatching.hs index 809a4e3bd7..1a9d968718 100644 --- a/tests/MessageBatching.hs +++ b/tests/MessageBatching.hs @@ -7,8 +7,8 @@ module MessageBatching (batchingTests) where import Crypto.Number.Serialize (os2ip) -import Data.ByteString.Builder (toLazyByteString) -import qualified Data.ByteString.Lazy as LB +import Data.ByteString (ByteString) +import qualified Data.ByteString as B import Data.Either (partitionEithers) import Data.Int (Int64) import Data.String (IsString (..)) @@ -26,7 +26,7 @@ batchingTests = describe "message batching tests" $ do it "image x.msg.new and x.msg.file.descr should fit into single batch" testImageFitsSingleBatch instance IsString SndMessage where - fromString s = SndMessage {msgId, sharedMsgId = SharedMsgId "", msgBody = LB.fromStrict s'} + fromString s = SndMessage {msgId, sharedMsgId = SharedMsgId "", msgBody = s'} where s' = encodeUtf8 $ T.pack s msgId = fromInteger $ os2ip s' @@ -94,14 +94,14 @@ testImageFitsSingleBatch = do -- 261_120 bytes (MAX_IMAGE_SIZE in UI), rounded up, example was 743 let descrRoundedSize = 800 - let xMsgNewStr = LB.replicate xMsgNewRoundedSize 1 - descrStr = LB.replicate descrRoundedSize 2 + let xMsgNewStr = B.replicate xMsgNewRoundedSize 1 + descrStr = B.replicate descrRoundedSize 2 msg s = SndMessage {msgId = 0, sharedMsgId = SharedMsgId "", msgBody = s} batched = "[" <> xMsgNewStr <> "," <> descrStr <> "]" runBatcherTest' maxChatMsgSize [msg xMsgNewStr, msg descrStr] [] [batched] -runBatcherTest :: Int64 -> [SndMessage] -> [ChatError] -> [LB.ByteString] -> Spec +runBatcherTest :: Int -> [SndMessage] -> [ChatError] -> [ByteString] -> Spec runBatcherTest maxLen msgs expectedErrors expectedBatches = it ( (show (map (\SndMessage {msgBody} -> msgBody) msgs) <> ", limit " <> show maxLen <> ": should return ") @@ -110,10 +110,10 @@ runBatcherTest maxLen msgs expectedErrors expectedBatches = ) (runBatcherTest' maxLen msgs expectedErrors expectedBatches) -runBatcherTest' :: Int64 -> [SndMessage] -> [ChatError] -> [LB.ByteString] -> IO () +runBatcherTest' :: Int -> [SndMessage] -> [ChatError] -> [ByteString] -> IO () runBatcherTest' maxLen msgs expectedErrors expectedBatches = do let (errors, batches) = partitionEithers $ batchMessages maxLen msgs - batchedStrs = map (\(MsgBatch builder _) -> toLazyByteString builder) batches + batchedStrs = map (\(MsgBatch batchBody _) -> batchBody) batches testErrors errors `shouldBe` testErrors expectedErrors batchedStrs `shouldBe` expectedBatches where diff --git a/tests/ProtocolTests.hs b/tests/ProtocolTests.hs index 23fbce249e..f848aed6bd 100644 --- a/tests/ProtocolTests.hs +++ b/tests/ProtocolTests.hs @@ -7,7 +7,6 @@ module ProtocolTests where import qualified Data.Aeson as J import Data.ByteString.Char8 (ByteString) -import qualified Data.ByteString.Lazy.Char8 as LB import Data.Time.Clock.System (SystemTime (..), systemToUTCTime) import Simplex.Chat.Protocol import Simplex.Chat.Types @@ -74,7 +73,7 @@ s ##== msg = do let r = encodeChatMessage msg case r of ECMEncoded encodedBody -> - J.eitherDecodeStrict' (LB.toStrict encodedBody) + J.eitherDecodeStrict' encodedBody `shouldBe` (J.eitherDecodeStrict' s :: Either String J.Value) ECMLarge -> expectationFailure $ "large message"