From 4c92e32dc245f5f6b87ce9dc20bedb414a56b56a Mon Sep 17 00:00:00 2001 From: IC Rainbow Date: Wed, 6 Dec 2023 01:31:13 +0200 Subject: [PATCH] WIP: add batching --- src/Simplex/Chat.hs | 96 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 90 insertions(+), 6 deletions(-) diff --git a/src/Simplex/Chat.hs b/src/Simplex/Chat.hs index aa489e9a9b..af96cc79cc 100644 --- a/src/Simplex/Chat.hs +++ b/src/Simplex/Chat.hs @@ -1,8 +1,11 @@ {-# LANGUAGE DataKinds #-} {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE GADTs #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE LambdaCase #-} +{-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE MultiWayIf #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedRecordDot #-} @@ -4989,9 +4992,10 @@ processAgentMessageConn user@User {userId} corrId agentConnId agentMessage = do Left _ -> messageError "x.grp.mem.inv error: referenced member does not exist" Right reMember -> do GroupMemberIntro {introId} <- withStore $ \db -> saveIntroInvitation db reMember m introInv - void . sendGroupMessage' user [reMember] (XGrpMemFwd (memberInfo m) introInv) groupId (Just introId) $ - withStore' $ - \db -> updateIntroStatus db introId GMIntroInvForwarded + void . runDBSuspendingChat_ $ sendGroupMessage' user [reMember] (XGrpMemFwd (memberInfo m) introInv) groupId (Just introId) $ + -- withStore' $ \db -> + suspendDB $ \db -> + updateIntroStatus db introId GMIntroInvForwarded _ -> messageError "x.grp.mem.inv can be only sent by invitee member" xGrpMemFwd :: GroupInfo -> GroupMember -> MemberInfo -> IntroInvitation -> m () @@ -5522,11 +5526,91 @@ deliverMessage conn@Connection {connId} cmEventTag msgBody msgId = do let sndMsgDelivery = SndMsgDelivery {connId, agentMsgId} withStore' $ \db -> createSndMsgDelivery db sndMsgDelivery msgId +newtype Sus i a = Sus + { runSus :: ReaderT (TVar [i], ChatController) IO a + } deriving (Functor, Applicative, Monad, MonadIO, MonadUnliftIO) + +-- Recover MonadError via MonadUnliftIO +instance MonadError ChatError (Sus i) where + throwError = E.throwIO + {-# INLINE throwError #-} + catchError = E.catch + {-# INLINE catchError #-} + +-- Alas, there's no MonadCont IO, and no MonadUnliftIO (ContT IO) +-- instance MonadCont Sus where +-- callCC a'susB_susA = Sus $ do +-- callCC $ \a'rB -> +-- let Sus s = a'susB_susA $ \a -> Sus (a'rB a) +-- in s + +-- Hide our extra state var from ChatMonad/Reader +instance MonadReader ChatController (Sus i) where + ask = Sus $ asks snd + {-# INLINE ask #-} + local f (Sus action) = Sus $ local (fmap f) action + {-# INLINE local #-} + +-- Instantiate TVar-powered MonadState, breaking atomic updates +-- instance MonadState [IO ()] Sus where +-- state f = Sus $ asks fst >>= \st -> atomically (stateTVar st f) +-- get = Sus $ asks fst >>= readTVarIO +-- put x = seq x $ Sus $ asks fst >>= \st -> atomically (writeTVar st x) + +-- Or not... just use our hidden state directly +suspend :: i -> Sus i () +suspend action = Sus $ asks fst >>= \st -> atomically $ modifyTVar' st (action :) -- XXX: pushing actions to a stack - reverse to get original order (if needed) +{-# INLINE suspend #-} + +-- Make it classy to prevent monomorphization of ChatMonad constraints +class SuspendIO m where + suspendIO :: IO () -> m () + +instance SuspendIO (Sus (IO ())) where + suspendIO = suspend + +suspendM :: (MonadUnliftIO m, SuspendIO m) => m () -> m () +suspendM action = toIO action >>= suspendIO + +-- Boo, does not aggregate transactions +-- withStoreLater :: (ChatMonad m, SuspendIO m) => (DB.Connection -> IO ()) -> m () +-- withStoreLater action = suspendM (withStore' action) + +withStoreLater :: SuspendDB m => (DB.Connection -> IO ()) -> m () +withStoreLater = suspendDB + +class SuspendDB m where + suspendDB :: (DB.Connection -> IO ()) -> m () + +instance SuspendDB (Sus (DB.Connection -> IO ())) where + suspendDB = suspend + +runSuspendingChat :: ChatMonad m => ([i] -> m r) -> Sus i a -> m (r, a) +runSuspendingChat animate (Sus action) = do + cc <- ask + suspended <- newTVarIO [] + r <- E.try (liftIO $ runReaderT action (suspended, cc)) >>= liftEither + readTVarIO suspended >>= animate >>= \susResults -> pure (susResults, r) + +runIOSuspendingChat_ :: forall m a . ChatMonad m => Sus (IO ()) a -> m a +runIOSuspendingChat_ action = snd <$> runSuspendingChat animate_ action + where + animate_ :: [IO ()] -> m () + animate_ = mapM_ liftIO + +runDBSuspendingChat_ :: forall m a . ChatMonad m => Sus (DB.Connection -> IO ()) a -> m a +runDBSuspendingChat_ action = snd <$> runSuspendingChat animate action + where + animate :: [DB.Connection -> IO ()] -> m () + animate batch = withStore' $ \db -> mapM_ ($ db) $ reverse batch -- XXX: recover original order from consing to [] + sendGroupMessage :: (MsgEncodingI e, ChatMonad m) => User -> GroupInfo -> [GroupMember] -> ChatMsgEvent e -> m (SndMessage, [GroupMember]) sendGroupMessage user GroupInfo {groupId} members chatMsgEvent = - sendGroupMessage' user members chatMsgEvent groupId Nothing $ pure () + -- Give SuspendDB powers + runDBSuspendingChat_ $ sendGroupMessage' user members chatMsgEvent groupId Nothing $ pure () -sendGroupMessage' :: forall e m. (MsgEncodingI e, ChatMonad m) => User -> [GroupMember] -> ChatMsgEvent e -> Int64 -> Maybe Int64 -> m () -> m (SndMessage, [GroupMember]) +-- Require SuspendDB powers +sendGroupMessage' :: forall e m. (MsgEncodingI e, ChatMonad m, SuspendDB m) => User -> [GroupMember] -> ChatMsgEvent e -> Int64 -> Maybe Int64 -> m () -> m (SndMessage, [GroupMember]) sendGroupMessage' user members chatMsgEvent groupId introId_ postDeliver = do msg <- createSndMessage chatMsgEvent (GroupId groupId) -- TODO collect failed deliveries into a single error @@ -5551,7 +5635,7 @@ sendGroupMessage' user members chatMsgEvent groupId introId_ postDeliver = do | forwardSupported && isForwardedGroupMsg chatMsgEvent = pure Nothing | isXGrpMsgForward chatMsgEvent = pure Nothing | otherwise = do - withStore' $ \db -> createPendingGroupMessage db groupMemberId msgId introId_ + withStoreLater $ \db -> createPendingGroupMessage db groupMemberId msgId introId_ pure $ Just m forwardSupported = do let mcvr = memberChatVRange' m