core: batch broadcast send db operations (#3956)

* core: batch broadcast send db operations

* refactor

* Update src/Simplex/Chat.hs

Co-authored-by: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com>

---------

Co-authored-by: Evgeny Poberezkin <evgeny@poberezkin.com>
Co-authored-by: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com>
This commit is contained in:
spaced4ndy
2024-03-27 11:49:14 +04:00
committed by GitHub
parent 96fb61ccfc
commit 42ebe8bc60
+37 -17
View File
@@ -1563,19 +1563,40 @@ processChatCommand' vr = \case
processChatCommand . APISendMessage chatRef True Nothing $ ComposedMessage Nothing Nothing mc
SendMessageBroadcast msg -> withUser $ \user -> do
contacts <- withStore' $ \db -> getUserContacts db vr user
let cts = filter (\ct -> contactReady ct && contactActive ct && directOrUsed ct) contacts
ChatConfig {logLevel} <- asks config
withChatLock "sendMessageBroadcast" . procCmd $ do
(successes, failures) <- foldM (sendAndCount user logLevel) (0, 0) cts
timestamp <- liftIO getCurrentTime
pure CRBroadcastSent {user, msgContent = mc, successes, failures, timestamp}
let ctConns_ = L.nonEmpty $ foldr addContactConn [] contacts
case ctConns_ of
Nothing -> do
timestamp <- liftIO getCurrentTime
pure CRBroadcastSent {user, msgContent = mc, successes = 0, failures = 0, timestamp}
Just (ctConns :: NonEmpty (Contact, Connection)) -> do
let idsEvts = L.map ctSndEvent ctConns
sndMsgs <- createSndMessages idsEvts
let msgReqs_ :: NonEmpty (Either ChatError MsgReq) = L.zipWith (fmap . ctMsgReq) ctConns sndMsgs
(errs, ctSndMsgs :: [(Contact, SndMessage)]) <-
partitionEithers . L.toList . zipWith3' combineResults ctConns sndMsgs <$> deliverMessagesB msgReqs_
timestamp <- liftIO getCurrentTime
void $ withStoreBatch' $ \db -> map (createCI db user timestamp) ctSndMsgs
pure CRBroadcastSent {user, msgContent = mc, successes = length ctSndMsgs, failures = length errs, timestamp}
where
mc = MCText msg
sendAndCount user ll (s, f) ct =
(sendToContact user ct $> (s + 1, f)) `catchChatError` \e -> when (ll <= CLLInfo) (toView $ CRChatError (Just user) e) $> (s, f + 1)
sendToContact user ct = do
(sndMsg, _) <- sendDirectContactMessage user ct (XMsgNew $ MCSimple (extMsgContent mc Nothing))
void $ saveSndChatItem user (CDDirectSnd ct) sndMsg (CISndMsgContent mc)
addContactConn :: Contact -> [(Contact, Connection)] -> [(Contact, Connection)]
addContactConn ct ctConns = case contactSendConn_ ct of
Right conn | directOrUsed ct -> (ct, conn) : ctConns
_ -> ctConns
ctSndEvent :: (Contact, Connection) -> (ConnOrGroupId, PQSupport, ChatMsgEvent 'Json)
ctSndEvent (_, Connection {connId, pqSupport}) = (ConnectionId connId, pqSupport, XMsgNew $ MCSimple (extMsgContent mc Nothing))
ctMsgReq :: (Contact, Connection) -> SndMessage -> MsgReq
ctMsgReq (_, conn) SndMessage {msgId, msgBody} = (conn, MsgFlags {notification = hasNotification XMsgNew_}, msgBody, msgId)
zipWith3' :: (a -> b -> c -> d) -> NonEmpty a -> NonEmpty b -> NonEmpty c -> NonEmpty d
zipWith3' f ~(x :| xs) ~(y :| ys) ~(z :| zs) = f x y z :| zipWith3 f xs ys zs
combineResults :: (Contact, Connection) -> Either ChatError SndMessage -> Either ChatError (Int64, PQEncryption) -> Either ChatError (Contact, SndMessage)
combineResults (ct, _) (Right msg') (Right _) = Right (ct, msg')
combineResults _ (Left e) _ = Left e
combineResults _ _ (Left e) = Left e
createCI :: DB.Connection -> User -> UTCTime -> (Contact, SndMessage) -> IO ()
createCI db user createdAt (ct, sndMsg) =
void $ createNewSndChatItem db user (CDDirectSnd ct) sndMsg (CISndMsgContent mc) Nothing Nothing False createdAt
SendMessageQuote cName (AMsgDirection msgDir) quotedMsg msg -> withUser $ \user@User {userId} -> do
contactId <- withStore $ \db -> getContactIdByName db user cName
quotedItemId <- withStore $ \db -> getDirectChatItemIdByText db userId contactId msgDir quotedMsg
@@ -2221,7 +2242,7 @@ processChatCommand' vr = \case
summary <- case changedCts_ of
Nothing -> pure $ UserProfileUpdateSummary 0 0 []
Just changedCts -> do
let idsEvts = L.map ctSndMsg changedCts
let idsEvts = L.map ctSndEvent changedCts
msgReqs_ <- L.zipWith ctMsgReq changedCts <$> createSndMessages idsEvts
(errs, cts) <- partitionEithers . L.toList . L.zipWith (second . const) changedCts <$> deliverMessagesB msgReqs_
unless (null errs) $ toView $ CRChatErrors (Just user) errs
@@ -2238,16 +2259,15 @@ processChatCommand' vr = \case
-- [incognito] filter out contacts with whom user has incognito connections
addChangedProfileContact :: User -> Contact -> [ChangedProfileContact] -> [ChangedProfileContact]
addChangedProfileContact user' ct changedCts = case contactSendConn_ ct' of
Left _ -> changedCts
Right conn
| connIncognito conn || mergedProfile' == mergedProfile -> changedCts
| otherwise -> ChangedProfileContact ct ct' mergedProfile' conn : changedCts
Right conn | not (connIncognito conn) && mergedProfile' /= mergedProfile ->
ChangedProfileContact ct ct' mergedProfile' conn : changedCts
_ -> changedCts
where
mergedProfile = userProfileToSend user Nothing (Just ct) False
ct' = updateMergedPreferences user' ct
mergedProfile' = userProfileToSend user' Nothing (Just ct') False
ctSndMsg :: ChangedProfileContact -> (ConnOrGroupId, PQSupport, ChatMsgEvent 'Json)
ctSndMsg ChangedProfileContact {mergedProfile', conn = Connection {connId, pqSupport}} = (ConnectionId connId, pqSupport, XInfo mergedProfile')
ctSndEvent :: ChangedProfileContact -> (ConnOrGroupId, PQSupport, ChatMsgEvent 'Json)
ctSndEvent ChangedProfileContact {mergedProfile', conn = Connection {connId, pqSupport}} = (ConnectionId connId, pqSupport, XInfo mergedProfile')
ctMsgReq :: ChangedProfileContact -> Either ChatError SndMessage -> Either ChatError MsgReq
ctMsgReq ChangedProfileContact {conn} =
fmap $ \SndMessage {msgId, msgBody} ->