Merge branch 'stable'

This commit is contained in:
Evgeny Poberezkin
2024-05-23 22:56:04 +01:00
5 changed files with 55 additions and 16 deletions
+22 -14
View File
@@ -1,3 +1,4 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
@@ -1701,7 +1702,7 @@ processChatCommand' vr = \case
sndMsgs <- lift $ createSndMessages idsEvts
let msgReqs_ :: NonEmpty (Either ChatError MsgReq) = L.zipWith (fmap . ctMsgReq) ctConns sndMsgs
(errs, ctSndMsgs :: [(Contact, SndMessage)]) <-
lift $ partitionEithers . L.toList . zipWith3' combineResults ctConns sndMsgs <$> deliverMessagesB msgReqs_
partitionEithers . L.toList . zipWith3' combineResults ctConns sndMsgs <$> deliverMessagesB msgReqs_
timestamp <- liftIO getCurrentTime
lift . void $ withStoreBatch' $ \db -> map (createCI db user timestamp) ctSndMsgs
pure CRBroadcastSent {user, msgContent = mc, successes = length ctSndMsgs, failures = length errs, timestamp}
@@ -2397,7 +2398,7 @@ processChatCommand' vr = \case
Just changedCts -> do
let idsEvts = L.map ctSndEvent changedCts
msgReqs_ <- lift $ L.zipWith ctMsgReq changedCts <$> createSndMessages idsEvts
(errs, cts) <- lift $ partitionEithers . L.toList . L.zipWith (second . const) changedCts <$> deliverMessagesB msgReqs_
(errs, cts) <- partitionEithers . L.toList . L.zipWith (second . const) changedCts <$> deliverMessagesB msgReqs_
unless (null errs) $ toView $ CRChatErrors (Just user) errs
let changedCts' = filter (\ChangedProfileContact {ct, ct'} -> directOrUsed ct' && mergedPreferences ct' /= mergedPreferences ct) cts
lift $ createContactsSndFeatureItems user' changedCts'
@@ -6561,21 +6562,21 @@ deliverMessage conn cmEventTag msgBody msgId = do
deliverMessage' :: Connection -> MsgFlags -> MsgBody -> MessageId -> CM (Int64, PQEncryption)
deliverMessage' conn msgFlags msgBody msgId =
lift (deliverMessages ((conn, msgFlags, msgBody, msgId) :| [])) >>= \case
deliverMessages ((conn, msgFlags, msgBody, msgId) :| []) >>= \case
r :| [] -> liftEither r
rs -> throwChatError $ CEInternalError $ "deliverMessage: expected 1 result, got " <> show (length rs)
type MsgReq = (Connection, MsgFlags, MsgBody, MessageId)
deliverMessages :: NonEmpty MsgReq -> CM' (NonEmpty (Either ChatError (Int64, PQEncryption)))
deliverMessages :: NonEmpty MsgReq -> CM (NonEmpty (Either ChatError (Int64, PQEncryption)))
deliverMessages msgs = deliverMessagesB $ L.map Right msgs
deliverMessagesB :: NonEmpty (Either ChatError MsgReq) -> CM' (NonEmpty (Either ChatError (Int64, PQEncryption)))
deliverMessagesB :: NonEmpty (Either ChatError MsgReq) -> CM (NonEmpty (Either ChatError (Int64, PQEncryption)))
deliverMessagesB msgReqs = do
msgReqs' <- liftIO compressBodies
sent <- L.zipWith prepareBatch msgReqs' <$> withAgent' (`sendMessagesB` L.map toAgent msgReqs')
void $ withStoreBatch' $ \db -> map (updatePQSndEnabled db) (rights . L.toList $ sent)
withStoreBatch $ \db -> L.map (bindRight $ createDelivery db) sent
sent <- L.zipWith prepareBatch msgReqs' <$> withAgent (`sendMessagesB` L.map toAgent msgReqs')
lift . void $ withStoreBatch' $ \db -> map (updatePQSndEnabled db) (rights . L.toList $ sent)
lift . withStoreBatch $ \db -> L.map (bindRight $ createDelivery db) sent
where
compressBodies =
forME msgReqs $ \mr@(conn@Connection {pqSupport, connChatVersion = v}, msgFlags, msgBody, msgId) ->
@@ -6634,10 +6635,11 @@ sendGroupMessage' user GroupInfo {groupId} members chatMsgEvent = do
msg@SndMessage {msgId, msgBody} <- createSndMessage chatMsgEvent (GroupId groupId)
recipientMembers <- liftIO $ shuffleMembers (filter memberCurrent members)
let msgFlags = MsgFlags {notification = hasNotification $ toCMEventTag chatMsgEvent}
(toSend, pending) = foldr addMember ([], []) recipientMembers
(toSend, pending, _, dups) = foldr addMember ([], [], S.empty, 0 :: Int) recipientMembers
-- TODO PQ either somehow ensure that group members connections cannot have pqSupport/pqEncryption or pass Off's here
msgReqs = map (\(_, conn) -> (conn, msgFlags, msgBody, msgId)) toSend
delivered <- maybe (pure []) (fmap L.toList . lift . deliverMessages) $ L.nonEmpty msgReqs
when (dups /= 0) $ logError $ "sendGroupMessage: " <> tshow dups <> " duplicate members"
delivered <- maybe (pure []) (fmap L.toList . deliverMessages) $ L.nonEmpty msgReqs
let errors = lefts delivered
unless (null errors) $ toView $ CRChatErrors (Just user) errors
stored <- lift . withStoreBatch' $ \db -> map (\m -> createPendingGroupMessage db (groupMemberId' m) msgId Nothing) pending
@@ -6650,10 +6652,16 @@ sendGroupMessage' user GroupInfo {groupId} members chatMsgEvent = do
liftM2 (<>) (shuffle adminMs) (shuffle otherMs)
where
isAdmin GroupMember {memberRole} = memberRole >= GRAdmin
addMember m (toSend, pending) = case memberSendAction chatMsgEvent members m of
Just (MSASend conn) -> ((m, conn) : toSend, pending)
Just MSAPending -> (toSend, m : pending)
Nothing -> (toSend, pending)
addMember m acc@(toSend, pending, !mIds, !dups) = case memberSendAction chatMsgEvent members m of
Just a
| mId `S.member` mIds -> (toSend, pending, mIds, dups + 1)
| otherwise -> case a of
MSASend conn -> ((m, conn) : toSend, pending, mIds', dups)
MSAPending -> (toSend, m : pending, mIds', dups)
Nothing -> acc
where
mId = groupMemberId' m
mIds' = S.insert mId mIds
filterSent :: [Either ChatError a] -> [mem] -> (mem -> GroupMember) -> [GroupMember]
filterSent rs ms mem = [mem m | (Right _, m) <- zip rs ms]
+1
View File
@@ -26,6 +26,7 @@ import Data.Text (Text)
import qualified Data.Text as T
import qualified Database.SQLite3 as SQL
import Simplex.Chat.Controller
import Simplex.Chat.Util ()
import Simplex.Messaging.Agent.Client (agentClientStore)
import Simplex.Messaging.Agent.Store.SQLite (SQLiteStore (..), closeSQLiteStore, keyString, sqlString, storeKey)
import Simplex.Messaging.Util
+30
View File
@@ -1,10 +1,18 @@
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TupleSections #-}
{-# OPTIONS_GHC -Wno-orphans #-}
module Simplex.Chat.Util (week, encryptFile, chunkSize, liftIOEither, shuffle) where
import Control.Exception (Exception)
import Control.Monad
import Control.Monad.Except
import Control.Monad.IO.Class
import Control.Monad.IO.Unlift (MonadUnliftIO (..))
import Control.Monad.Reader
import Data.Bifunctor (first)
import qualified Data.ByteString.Lazy as LB
import Data.List (sortBy)
import Data.Ord (comparing)
@@ -13,6 +21,7 @@ import Data.Word (Word16)
import Simplex.Messaging.Crypto.File (CryptoFile (..), CryptoFileArgs (..))
import qualified Simplex.Messaging.Crypto.File as CF
import System.Random (randomRIO)
import qualified UnliftIO.Exception as E
import UnliftIO.IO (IOMode (..), withFile)
week :: NominalDiffTime
@@ -46,3 +55,24 @@ shuffle xs = map snd . sortBy (comparing fst) <$> mapM (\x -> (,x) <$> random) x
liftIOEither :: (MonadIO m, MonadError e m) => IO (Either e a) -> m a
liftIOEither a = liftIO a >>= liftEither
{-# INLINE liftIOEither #-}
newtype InternalException e = InternalException {unInternalException :: e}
deriving (Eq, Show)
instance Exception e => Exception (InternalException e)
instance Exception e => MonadUnliftIO (ExceptT e IO) where
{-# INLINE withRunInIO #-}
withRunInIO :: ((forall a. ExceptT e IO a -> IO a) -> IO b) -> ExceptT e IO b
withRunInIO inner =
ExceptT . fmap (first unInternalException) . E.try $
withRunInIO $ \run ->
inner $ run . (either (E.throwIO . InternalException) pure <=< runExceptT)
instance Exception e => MonadUnliftIO (ExceptT e (ReaderT r IO)) where
{-# INLINE withRunInIO #-}
withRunInIO :: ((forall a. ExceptT e (ReaderT r IO) a -> IO a) -> IO b) -> ExceptT e (ReaderT r IO) b
withRunInIO inner =
withExceptT unInternalException . ExceptT . E.try $
withRunInIO $ \run ->
inner $ run . (either (E.throwIO . InternalException) pure <=< runExceptT)