From 6ba3100d348e23549245e2d435fa9815108584e4 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Wed, 20 Dec 2023 06:38:39 +0000 Subject: [PATCH] core: batch sending messages (#3566) * core: batch sending messages * batch without iorefs (#3573) * one-pass * simplexmq * simplexmq * simplexmq * simplexmq * revert change to ios project file * refactor * simplify --------- Co-authored-by: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> --- cabal.project | 2 +- package.yaml | 2 +- scripts/nix/sha256map.nix | 2 +- simplex-chat.cabal | 14 ++--- src/Simplex/Chat.hs | 96 +++++++++++++++++++++------------- src/Simplex/Chat/Controller.hs | 15 ++++++ tests/ChatClient.hs | 1 + 7 files changed, 87 insertions(+), 45 deletions(-) diff --git a/cabal.project b/cabal.project index 873035d7ab..e81c21c990 100644 --- a/cabal.project +++ b/cabal.project @@ -14,7 +14,7 @@ constraints: zip +disable-bzip2 +disable-zstd source-repository-package type: git location: https://github.com/simplex-chat/simplexmq.git - tag: 18be2709f59a4cb20fe9758b899622092dba062e + tag: 8c250ebe19f56dd7d53572d984e8016cb0e4d658 source-repository-package type: git diff --git a/package.yaml b/package.yaml index af58ce6729..65f99a7a78 100644 --- a/package.yaml +++ b/package.yaml @@ -45,7 +45,7 @@ dependencies: - sqlcipher-simple == 0.4.* - stm == 2.5.* - terminal == 0.2.* - - time == 1.9.* + - time == 1.12.* - tls >= 1.7.0 && < 1.8 - unliftio == 0.2.* - unliftio-core == 0.2.* diff --git a/scripts/nix/sha256map.nix b/scripts/nix/sha256map.nix index 3733163f49..9f06b66101 100644 --- a/scripts/nix/sha256map.nix +++ b/scripts/nix/sha256map.nix @@ -1,5 +1,5 @@ { - "https://github.com/simplex-chat/simplexmq.git"."18be2709f59a4cb20fe9758b899622092dba062e" = "08dr4vyg1wz2z768iikg8fks5zqf4dw5myr87hbpv964idda3pmj"; + "https://github.com/simplex-chat/simplexmq.git"."8c250ebe19f56dd7d53572d984e8016cb0e4d658" = "080rw86yncf1h3zr5a8y65cndihq6f3ji43vxrdhr2mrb75vmw8m"; "https://github.com/simplex-chat/hs-socks.git"."a30cc7a79a08d8108316094f8f2f82a0c5e1ac51" = "0yasvnr7g91k76mjkamvzab2kvlb1g5pspjyjn2fr6v83swjhj38"; "https://github.com/simplex-chat/direct-sqlcipher.git"."f814ee68b16a9447fbb467ccc8f29bdd3546bfd9" = "1ql13f4kfwkbaq7nygkxgw84213i0zm7c1a8hwvramayxl38dq5d"; "https://github.com/simplex-chat/sqlcipher-simple.git"."a46bd361a19376c5211f1058908fc0ae6bf42446" = "1z0r78d8f0812kxbgsm735qf6xx8lvaz27k1a0b4a2m0sshpd5gl"; diff --git a/simplex-chat.cabal b/simplex-chat.cabal index f3918dfecd..6462d26008 100644 --- a/simplex-chat.cabal +++ b/simplex-chat.cabal @@ -199,7 +199,7 @@ library , sqlcipher-simple ==0.4.* , stm ==2.5.* , terminal ==0.2.* - , time ==1.9.* + , time ==1.12.* , tls >=1.7.0 && <1.8 , unliftio ==0.2.* , unliftio-core ==0.2.* @@ -259,7 +259,7 @@ executable simplex-bot , sqlcipher-simple ==0.4.* , stm ==2.5.* , terminal ==0.2.* - , time ==1.9.* + , time ==1.12.* , tls >=1.7.0 && <1.8 , unliftio ==0.2.* , unliftio-core ==0.2.* @@ -319,7 +319,7 @@ executable simplex-bot-advanced , sqlcipher-simple ==0.4.* , stm ==2.5.* , terminal ==0.2.* - , time ==1.9.* + , time ==1.12.* , tls >=1.7.0 && <1.8 , unliftio ==0.2.* , unliftio-core ==0.2.* @@ -381,7 +381,7 @@ executable simplex-broadcast-bot , sqlcipher-simple ==0.4.* , stm ==2.5.* , terminal ==0.2.* - , time ==1.9.* + , time ==1.12.* , tls >=1.7.0 && <1.8 , unliftio ==0.2.* , unliftio-core ==0.2.* @@ -442,7 +442,7 @@ executable simplex-chat , sqlcipher-simple ==0.4.* , stm ==2.5.* , terminal ==0.2.* - , time ==1.9.* + , time ==1.12.* , tls >=1.7.0 && <1.8 , unliftio ==0.2.* , unliftio-core ==0.2.* @@ -508,7 +508,7 @@ executable simplex-directory-service , sqlcipher-simple ==0.4.* , stm ==2.5.* , terminal ==0.2.* - , time ==1.9.* + , time ==1.12.* , tls >=1.7.0 && <1.8 , unliftio ==0.2.* , unliftio-core ==0.2.* @@ -602,7 +602,7 @@ test-suite simplex-chat-test , sqlcipher-simple ==0.4.* , stm ==2.5.* , terminal ==0.2.* - , time ==1.9.* + , time ==1.12.* , tls >=1.7.0 && <1.8 , unliftio ==0.2.* , unliftio-core ==0.2.* diff --git a/src/Simplex/Chat.hs b/src/Simplex/Chat.hs index 6b619c5bd6..4e7a1cab9a 100644 --- a/src/Simplex/Chat.hs +++ b/src/Simplex/Chat.hs @@ -35,7 +35,7 @@ import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Lazy.Char8 as LB import Data.Char import Data.Constraint (Dict (..)) -import Data.Either (fromRight, partitionEithers, rights) +import Data.Either (fromRight, lefts, partitionEithers, rights) import Data.Fixed (div') import Data.Functor (($>)) import Data.Int (Int64) @@ -5002,7 +5002,7 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do Left _ -> messageError "x.grp.mem.inv error: referenced member does not exist" Right reMember -> do GroupMemberIntro {introId} <- withStore $ \db -> saveIntroInvitation db reMember m introInv - void . sendGroupMessage' user [reMember] (XGrpMemFwd (memberInfo m) introInv) groupId (Just introId) $ + sendGroupMemberMessage user reMember (XGrpMemFwd (memberInfo m) introInv) groupId (Just introId) $ withStore' $ \db -> updateIntroStatus db introId GMIntroInvForwarded _ -> messageError "x.grp.mem.inv can be only sent by invitee member" @@ -5529,46 +5529,62 @@ directMessage chatMsgEvent = do pure $ strEncode ChatMessage {chatVRange, msgId = Nothing, chatMsgEvent} deliverMessage :: ChatMonad m => Connection -> CMEventTag e -> MsgBody -> MessageId -> m Int64 -deliverMessage conn@Connection {connId} cmEventTag msgBody msgId = do - let msgFlags = MsgFlags {notification = hasNotification cmEventTag} - agentMsgId <- withAgent $ \a -> sendMessage a (aConnId conn) msgFlags msgBody - let sndMsgDelivery = SndMsgDelivery {connId, agentMsgId} - withStore' $ \db -> createSndMsgDelivery db sndMsgDelivery msgId +deliverMessage conn cmEventTag msgBody msgId = + deliverMessages [(conn, cmEventTag, msgBody, msgId)] >>= \case + [r] -> liftEither r + rs -> throwChatError $ CEInternalError $ "deliverMessage: expected 1 result, got " <> show (length rs) + +deliverMessages :: ChatMonad' m => [(Connection, CMEventTag e, MsgBody, MessageId)] -> m [Either ChatError Int64] +deliverMessages msgReqs = do + sent <- zipWith prepareBatch msgReqs <$> withAgent' (`sendMessages` aReqs) + withStoreBatch $ \db -> map (bindRight $ createDelivery db) sent + where + aReqs = map (\(conn, cmEvTag, msgBody, _msgId) -> (aConnId conn, msgFlags cmEvTag, msgBody)) msgReqs + msgFlags cmEvTag = MsgFlags {notification = hasNotification cmEvTag} + prepareBatch req = bimap (`ChatErrorAgent` Nothing) (req,) + createDelivery :: DB.Connection -> ((Connection, CMEventTag e, MsgBody, MessageId), AgentMsgId) -> IO (Either ChatError Int64) + createDelivery db ((Connection {connId}, _, _, msgId), agentMsgId) = + Right <$> createSndMsgDelivery db (SndMsgDelivery {connId, agentMsgId}) msgId sendGroupMessage :: (MsgEncodingI e, ChatMonad m) => User -> GroupInfo -> [GroupMember] -> ChatMsgEvent e -> m (SndMessage, [GroupMember]) -sendGroupMessage user GroupInfo {groupId} members chatMsgEvent = - sendGroupMessage' user members chatMsgEvent groupId Nothing $ pure () - -sendGroupMessage' :: forall e m. (MsgEncodingI e, ChatMonad m) => User -> [GroupMember] -> ChatMsgEvent e -> Int64 -> Maybe Int64 -> m () -> m (SndMessage, [GroupMember]) -sendGroupMessage' user members chatMsgEvent groupId introId_ postDeliver = do - msg <- createSndMessage chatMsgEvent (GroupId groupId) - -- TODO collect failed deliveries into a single error +sendGroupMessage user GroupInfo {groupId} members chatMsgEvent = do + msg@SndMessage {msgId, msgBody} <- createSndMessage chatMsgEvent (GroupId groupId) recipientMembers <- liftIO $ shuffleMembers (filter memberCurrent members) $ \GroupMember {memberRole} -> memberRole - rs <- forM recipientMembers $ \m -> - messageMember m msg `catchChatError` (\e -> toView (CRChatError (Just user) e) $> Nothing) - let sentToMembers = catMaybes rs + let tag = toCMEventTag chatMsgEvent + (toSend, pending) = foldr addMember ([], []) recipientMembers + msgReqs = map (\(_, conn) -> (conn, tag, msgBody, msgId)) toSend + delivered <- deliverMessages msgReqs + let errors = lefts delivered + unless (null errors) $ toView $ CRChatErrors (Just user) errors + stored <- withStoreBatch' $ \db -> map (\m -> createPendingGroupMessage db (groupMemberId' m) msgId Nothing) pending + let sentToMembers = filterSent delivered toSend fst <> filterSent stored pending id pure (msg, sentToMembers) where - messageMember :: GroupMember -> SndMessage -> m (Maybe GroupMember) - messageMember m@GroupMember {groupMemberId} SndMessage {msgId, msgBody} = case memberConn m of - Nothing -> pendingOrForwarded - Just conn@Connection {connStatus} - | connDisabled conn || connStatus == ConnDeleted -> pure Nothing - | connStatus == ConnSndReady || connStatus == ConnReady -> do - let tag = toCMEventTag chatMsgEvent - deliverMessage conn tag msgBody msgId >> postDeliver - pure $ Just m - | otherwise -> pendingOrForwarded + addMember m (toSend, pending) = case memberSendAction chatMsgEvent members m of + Just (MSASend conn) -> ((m, conn) : toSend, pending) + Just MSAPending -> (toSend, m : pending) + Nothing -> (toSend, pending) + filterSent :: [Either ChatError a] -> [mem] -> (mem -> GroupMember) -> [GroupMember] + filterSent rs ms mem = [mem m | (Right _, m) <- zip rs ms] + +data MemberSendAction = MSASend Connection | MSAPending + +memberSendAction :: ChatMsgEvent e -> [GroupMember] -> GroupMember -> Maybe MemberSendAction +memberSendAction chatMsgEvent members m = case memberConn m of + Nothing -> pendingOrForwarded + Just conn@Connection {connStatus} + | connDisabled conn || connStatus == ConnDeleted -> Nothing + | connStatus == ConnSndReady || connStatus == ConnReady -> Just (MSASend conn) + | otherwise -> pendingOrForwarded + where + pendingOrForwarded + | forwardSupported && isForwardedGroupMsg chatMsgEvent = Nothing + | isXGrpMsgForward chatMsgEvent = Nothing + | otherwise = Just MSAPending where - pendingOrForwarded - | forwardSupported && isForwardedGroupMsg chatMsgEvent = pure Nothing - | isXGrpMsgForward chatMsgEvent = pure Nothing - | otherwise = do - withStore' $ \db -> createPendingGroupMessage db groupMemberId msgId introId_ - pure $ Just m - forwardSupported = do + forwardSupported = let mcvr = memberChatVRange' m - isCompatibleRange mcvr groupForwardVRange && invitingMemberSupportsForward + in isCompatibleRange mcvr groupForwardVRange && invitingMemberSupportsForward invitingMemberSupportsForward = case m.invitedByGroupMemberId of Just invMemberId -> -- can be optimized for large groups by replacing [GroupMember] with Map GroupMemberId GroupMember @@ -5582,6 +5598,16 @@ sendGroupMessage' user members chatMsgEvent groupId introId_ postDeliver = do XGrpMsgForward {} -> True _ -> False +sendGroupMemberMessage :: forall e m. (MsgEncodingI e, ChatMonad m) => User -> GroupMember -> ChatMsgEvent e -> Int64 -> Maybe Int64 -> m () -> m () +sendGroupMemberMessage user m@GroupMember {groupMemberId} chatMsgEvent groupId introId_ postDeliver = do + msg <- createSndMessage chatMsgEvent (GroupId groupId) + messageMember msg `catchChatError` (\e -> toView (CRChatError (Just user) e)) + where + messageMember :: SndMessage -> m () + messageMember SndMessage {msgId, msgBody} = forM_ (memberSendAction chatMsgEvent [m] m) $ \case + MSASend conn -> deliverMessage conn (toCMEventTag chatMsgEvent) msgBody msgId >> postDeliver + MSAPending -> withStore' $ \db -> createPendingGroupMessage db groupMemberId msgId introId_ + shuffleMembers :: [a] -> (a -> GroupMemberRole) -> IO [a] shuffleMembers ms role = do let (adminMs, otherMs) = partition ((GRAdmin <=) . role) ms diff --git a/src/Simplex/Chat/Controller.hs b/src/Simplex/Chat/Controller.hs index 8446c15a81..70e0cc64fc 100644 --- a/src/Simplex/Chat/Controller.hs +++ b/src/Simplex/Chat/Controller.hs @@ -84,6 +84,7 @@ import Simplex.RemoteControl.Invitation (RCSignedInvitation, RCVerifiedInvitatio import Simplex.RemoteControl.Types import System.IO (Handle) import System.Mem.Weak (Weak) +import qualified UnliftIO.Exception as E import UnliftIO.STM versionNumber :: String @@ -1287,12 +1288,26 @@ withStoreCtx ctx_ action = do handleInternal :: String -> SomeException -> IO (Either StoreError a) handleInternal ctxStr e = pure . Left . SEInternalError $ show e <> ctxStr +withStoreBatch :: (ChatMonad' m, Traversable t) => (DB.Connection -> t (IO (Either ChatError a))) -> m (t (Either ChatError a)) +withStoreBatch actions = do + ChatController {chatStore} <- ask + liftIO $ withTransaction chatStore $ mapM (`E.catch` handleInternal) . actions + where + handleInternal :: E.SomeException -> IO (Either ChatError a) + handleInternal = pure . Left . ChatError . CEInternalError . show + +withStoreBatch' :: (ChatMonad' m, Traversable t) => (DB.Connection -> t (IO a)) -> m (t (Either ChatError a)) +withStoreBatch' actions = withStoreBatch $ fmap (fmap Right) . actions + withAgent :: ChatMonad m => (AgentClient -> ExceptT AgentErrorType m a) -> m a withAgent action = asks smpAgent >>= runExceptT . action >>= liftEither . first (`ChatErrorAgent` Nothing) +withAgent' :: ChatMonad' m => (AgentClient -> m a) -> m a +withAgent' action = asks smpAgent >>= action + $(JQ.deriveJSON (enumJSON $ dropPrefix "HS") ''HelpSection) $(JQ.deriveJSON (sumTypeJSON $ dropPrefix "CLQ") ''ChatListQuery) diff --git a/tests/ChatClient.hs b/tests/ChatClient.hs index 821d7b032b..c32d8002b9 100644 --- a/tests/ChatClient.hs +++ b/tests/ChatClient.hs @@ -353,6 +353,7 @@ serverCfg = serverStatsBackupFile = Nothing, smpServerVRange = supportedSMPServerVRange, transportConfig = defaultTransportServerConfig, + smpHandshakeTimeout = 1000000, controlPort = Nothing }