core: use batched subscriptions (#818)

* core: use batched subscriptions

* update simplexmq

* remove comments

* clean up

* refactor

* remove todo

* revert change

* revert change

* remove comment

* add delay to the async group test

* add more delay in test
This commit is contained in:
Evgeny Poberezkin
2022-07-17 15:51:17 +01:00
committed by GitHub
parent e8da13c7ca
commit 13fbb66a21
11 changed files with 210 additions and 155 deletions
+122 -86
View File
@@ -31,12 +31,13 @@ import Data.Either (fromRight)
import Data.Fixed (div')
import Data.Functor (($>))
import Data.Int (Int64)
import Data.List (find, isSuffixOf)
import Data.List (find, isSuffixOf, sortBy)
import Data.List.NonEmpty (NonEmpty, nonEmpty)
import qualified Data.List.NonEmpty as L
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (fromMaybe, isJust, mapMaybe)
import Data.Maybe (fromMaybe, isJust, isNothing, mapMaybe)
import Data.Ord (comparing)
import Data.Text (Text)
import qualified Data.Text as T
import Data.Time.Clock (UTCTime, diffUTCTime, getCurrentTime, nominalDiffTimeToSeconds)
@@ -54,7 +55,7 @@ import Simplex.Chat.Protocol
import Simplex.Chat.Store
import Simplex.Chat.Types
import Simplex.Chat.Util (safeDecodeUtf8, uncurry3)
import Simplex.Messaging.Agent
import Simplex.Messaging.Agent as Agent
import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), InitialAgentServers (..), defaultAgentConfig)
import Simplex.Messaging.Agent.Protocol
import qualified Simplex.Messaging.Crypto as C
@@ -64,7 +65,7 @@ import Simplex.Messaging.Parsers (base64P, parseAll)
import Simplex.Messaging.Protocol (ErrorType (..), MsgBody, MsgFlags (..), NtfServer)
import qualified Simplex.Messaging.Protocol as SMP
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util (ifM, liftEitherError, tryError, unlessM, whenM, (<$?>))
import Simplex.Messaging.Util
import System.Exit (exitFailure, exitSuccess)
import System.FilePath (combine, splitExtensions, takeFileName)
import System.IO (Handle, IOMode (..), SeekMode (..), hFlush, openFile, stdout)
@@ -157,7 +158,7 @@ startChatController user subConns = do
a1 <- async $ race_ notificationSubscriber agentSubscriber
a2 <-
if subConns
then Just <$> async (subscribeUserConnections subscribeConnection user)
then Just <$> async (void . runExceptT $ subscribeUserConnections Agent.subscribeConnections user)
else pure Nothing
atomically . writeTVar s $ Just (a1, a2)
pure a1
@@ -218,7 +219,7 @@ processChatCommand = \case
withUser $ \user -> restoreCalls user
withAgent activateAgent $> CRCmdOk
APISuspendChat t -> withAgent (`suspendAgent` t) $> CRCmdOk
ResubscribeAllConnections -> withUser (subscribeUserConnections resubscribeConnection) $> CRCmdOk
ResubscribeAllConnections -> withUser (subscribeUserConnections Agent.resubscribeConnections) $> CRCmdOk
SetFilesFolder filesFolder' -> do
createDirectoryIfMissing True filesFolder'
ff <- asks filesFolder
@@ -582,7 +583,7 @@ processChatCommand = \case
(NotificationInfo {ntfConnId, ntfMsgMeta}, msgs) <- withAgent $ \a -> getNotificationMessage a nonce encNtfInfo
let ntfMessages = map (\SMP.SMPMsgMeta {msgTs, msgFlags} -> NtfMsgInfo {msgTs = systemToUTCTime msgTs, msgFlags}) msgs
msgTs' = systemToUTCTime . (SMP.msgTs :: SMP.NMsgMeta -> SystemTime) <$> ntfMsgMeta
connEntity <- withStore (\db -> Just <$> getConnectionEntity db user ntfConnId) `catchError` \_ -> pure Nothing
connEntity <- withStore (\db -> Just <$> getConnectionEntity db user (AgentConnId ntfConnId)) `catchError` \_ -> pure Nothing
pure CRNtfMessages {connEntity, msgTs = msgTs', ntfMessages}
GetUserSMPServers -> CRUserSMPServers <$> withUser (\user -> withStore' (`getSMPServers` user))
SetUserSMPServers smpServers -> withUser $ \user -> withChatLock $ do
@@ -618,12 +619,12 @@ processChatCommand = \case
(connId, cReq) <- withAgent (`createConnection` SCMContact)
withStore $ \db -> createUserContactLink db userId connId cReq
pure $ CRUserContactLinkCreated cReq
DeleteMyAddress -> withUser $ \User {userId} -> withChatLock $ do
conns <- withStore (`getUserContactLinkConnections` userId)
DeleteMyAddress -> withUser $ \user -> withChatLock $ do
conns <- withStore (`getUserContactLinkConnections` user)
procCmd $ do
withAgent $ \a -> forM_ conns $ \conn ->
deleteConnection a (aConnId conn) `catchError` \(_ :: AgentErrorType) -> pure ()
withStore' (`deleteUserContactLink` userId)
withStore' (`deleteUserContactLink` user)
pure CRUserContactLinkDeleted
ShowMyAddress -> withUser $ \User {userId} ->
uncurry3 CRUserContactLink <$> withStore (`getUserContactLink` userId)
@@ -1076,85 +1077,120 @@ agentSubscriber = do
withLock l . void . runExceptT $
processAgentMessage u connId msg `catchError` (toView . CRChatError)
subscribeUserConnections ::
(MonadUnliftIO m, MonadReader ChatController m) =>
(forall m'. ChatMonad m' => AgentClient -> ConnId -> ExceptT AgentErrorType m' ()) ->
User ->
m ()
subscribeUserConnections agentSubscribe user@User {userId} = do
n <- asks $ subscriptionConcurrency . config
type AgentBatchSubscribe m = AgentClient -> [ConnId] -> ExceptT AgentErrorType m (Map ConnId (Either AgentErrorType ()))
subscribeUserConnections :: forall m. ChatMonad m => AgentBatchSubscribe m -> User -> m ()
subscribeUserConnections agentBatchSubscribe user = do
-- get user connections
ce <- asks $ subscriptionEvents . config
void . runExceptT $ do
catchErr $ subscribeContacts n ce
catchErr $ subscribeUserContactLink n
catchErr $ subscribeGroups n ce
catchErr $ subscribeFiles n
catchErr $ subscribePendingConnections n
(ctConns, cts) <- getContactConns
(ucConns, ucs) <- getUserContactLinkConns
(gs, mConns, ms) <- getGroupMemberConns
(sftConns, sfts) <- getSndFileTransferConns
(rftConns, rfts) <- getRcvFileTransferConns
(pcConns, pcs) <- getPendingContactConns
-- subscribe using batched commands
rs <- withAgent (`agentBatchSubscribe` concat [ctConns, ucConns, mConns, sftConns, rftConns, pcConns])
-- send connection events to view
contactSubsToView rs cts
contactLinkSubsToView rs ucs
groupSubsToView rs gs ms ce
sndFileSubsToView rs sfts
rcvFileSubsToView rs rfts
pendingConnSubsToView rs pcs
where
catchErr a = a `catchError` \_ -> pure ()
subscribeContacts n ce = do
contacts <- withStore' (`getUserContacts` user)
toView . CRContactSubSummary =<< pooledForConcurrentlyN n contacts (\ct -> ContactSubStatus ct <$> subscribeContact ce ct)
subscribeContact ce ct =
(subscribe (contactConnId ct) $> Nothing)
`catchError` (\e -> when ce (toView $ CRContactSubError ct e) $> Just e)
subscribeGroups n ce = do
groups <- withStore' (`getUserGroups` user)
toView . CRMemberSubErrors . mconcat =<< forM groups (subscribeGroup n ce)
subscribeGroup n ce (Group g@GroupInfo {membership} members) = do
let connectedMembers = mapMaybe (\m -> (m,) <$> memberConnId m) members
if memberStatus membership == GSMemInvited
then do
toView $ CRGroupInvitation g
pure []
else
if null connectedMembers
then do
if memberActive membership
then toView $ CRGroupEmpty g
else toView $ CRGroupRemoved g
pure []
else do
ms <- pooledForConcurrentlyN n connectedMembers $ \(m@GroupMember {localDisplayName = c}, cId) ->
(m,) <$> ((subscribe cId $> Nothing) `catchError` (\e -> when ce (toView $ CRMemberSubError g c e) $> Just e))
toView $ CRGroupSubscribed g
pure $ mapMaybe (\(m, e) -> (Just . MemberSubError m) =<< e) ms
subscribeFiles n = do
sndFileTransfers <- withStore' (`getLiveSndFileTransfers` user)
pooledForConcurrentlyN_ n sndFileTransfers $ \sft -> subscribeSndFile sft
rcvFileTransfers <- withStore' (`getLiveRcvFileTransfers` user)
pooledForConcurrentlyN_ n rcvFileTransfers $ \rft -> subscribeRcvFile rft
getContactConns :: m ([ConnId], Map ConnId Contact)
getContactConns = do
cts <- withStore_ getUserContacts
let connIds = map contactConnId cts
pure (connIds, M.fromList $ zip connIds cts)
getUserContactLinkConns :: m ([ConnId], Map ConnId UserContact)
getUserContactLinkConns = do
(cs, ucs) <- unzip <$> withStore_ getUserContactLinks
let connIds = map aConnId cs
pure (connIds, M.fromList $ zip connIds ucs)
getGroupMemberConns :: m ([Group], [ConnId], Map ConnId GroupMember)
getGroupMemberConns = do
gs <- withStore_ getUserGroups
let mPairs = concatMap (\(Group _ ms) -> mapMaybe (\m -> (,m) <$> memberConnId m) ms) gs
pure (gs, map fst mPairs, M.fromList mPairs)
getSndFileTransferConns :: m ([ConnId], Map ConnId SndFileTransfer)
getSndFileTransferConns = do
sfts <- withStore_ getLiveSndFileTransfers
let connIds = map sndFileTransferConnId sfts
pure (connIds, M.fromList $ zip connIds sfts)
getRcvFileTransferConns :: m ([ConnId], Map ConnId RcvFileTransfer)
getRcvFileTransferConns = do
rfts <- withStore_ getLiveRcvFileTransfers
let rftPairs = mapMaybe (\ft -> (,ft) <$> liveRcvFileTransferConnId ft) rfts
pure (map fst rftPairs, M.fromList rftPairs)
getPendingContactConns :: m ([ConnId], Map ConnId PendingContactConnection)
getPendingContactConns = do
pcs <- withStore_ getPendingContactConnections
let connIds = map aConnId' pcs
pure (connIds, M.fromList $ zip connIds pcs)
contactSubsToView :: Map ConnId (Either AgentErrorType ()) -> Map ConnId Contact -> m ()
contactSubsToView rs = toView . CRContactSubSummary . map (uncurry ContactSubStatus) . resultsFor rs
contactLinkSubsToView :: Map ConnId (Either AgentErrorType ()) -> Map ConnId UserContact -> m ()
contactLinkSubsToView rs ucs = case resultsFor rs ucs of
[] -> pure ()
((_, Just e) : _) -> toView $ CRUserContactLinkSubError e
_ -> toView CRUserContactLinkSubscribed
groupSubsToView :: Map ConnId (Either AgentErrorType ()) -> [Group] -> Map ConnId GroupMember -> Bool -> m ()
groupSubsToView rs gs ms ce = do
mapM_ groupSub $
sortBy (comparing $ \(Group GroupInfo {localDisplayName = g} _) -> g) gs
toView . CRMemberSubSummary $ map (uncurry MemberSubStatus) mRs
where
subscribeSndFile ft@SndFileTransfer {fileId, fileStatus, agentConnId = AgentConnId cId} = do
subscribe cId `catchError` (toView . CRSndFileSubError ft)
void . forkIO $ do
threadDelay 1000000
l <- asks chatLock
a <- asks smpAgent
when (fileStatus == FSConnected) . unlessM (isFileActive fileId sndFiles) $
withAgentLock a . withLock l $
sendFileChunk user ft
subscribeRcvFile ft@RcvFileTransfer {fileStatus} =
case fileStatus of
RFSAccepted fInfo -> resume fInfo
RFSConnected fInfo -> resume fInfo
_ -> pure ()
mRs = resultsFor rs ms
groupSub :: Group -> m ()
groupSub (Group g@GroupInfo {membership, groupId = gId} members) = do
when ce $ mapM_ (toView . uncurry (CRMemberSubError g)) mErrors
toView groupEvent
where
resume RcvFileInfo {agentConnId = AgentConnId cId} =
subscribe cId `catchError` (toView . CRRcvFileSubError ft)
subscribePendingConnections n = do
cs <- withStore' (`getPendingConnections` user)
summary <- pooledForConcurrentlyN n cs $ \Connection {agentConnId = acId@(AgentConnId cId)} ->
PendingSubStatus acId <$> ((subscribe cId $> Nothing) `catchError` (pure . Just))
toView $ CRPendingSubSummary summary
subscribeUserContactLink n = do
cs <- withStore (`getUserContactLinkConnections` userId)
(subscribeConns n cs >> toView CRUserContactLinkSubscribed)
`catchError` (toView . CRUserContactLinkSubError)
subscribe cId = withAgent (`agentSubscribe` cId)
subscribeConns n conns =
withAgent $ \a ->
pooledForConcurrentlyN_ n conns $ \c -> agentSubscribe a (aConnId c)
mErrors :: [(GroupMember, ChatError)]
mErrors =
sortBy (comparing (\(GroupMember {localDisplayName = n}, _) -> n))
. filterErrors
$ filter (\(GroupMember {groupId}, _) -> groupId == gId) mRs
groupEvent :: ChatResponse
groupEvent
| memberStatus membership == GSMemInvited = CRGroupInvitation g
| all (\GroupMember {activeConn} -> isNothing activeConn) members =
if memberActive membership
then CRGroupEmpty g
else CRGroupRemoved g
| otherwise = CRGroupSubscribed g
sndFileSubsToView :: Map ConnId (Either AgentErrorType ()) -> Map ConnId SndFileTransfer -> m ()
sndFileSubsToView rs sfts = do
let sftRs = resultsFor rs sfts
forM_ sftRs $ \(ft@SndFileTransfer {fileId, fileStatus}, err_) -> do
forM_ err_ $ toView . CRSndFileSubError ft
void . forkIO $ do
threadDelay 1000000
l <- asks chatLock
a <- asks smpAgent
when (fileStatus == FSConnected) . unlessM (isFileActive fileId sndFiles) $
withAgentLock a . withLock l $
sendFileChunk user ft
rcvFileSubsToView :: Map ConnId (Either AgentErrorType ()) -> Map ConnId RcvFileTransfer -> m ()
rcvFileSubsToView rs = mapM_ (toView . uncurry CRRcvFileSubError) . filterErrors . resultsFor rs
pendingConnSubsToView :: Map ConnId (Either AgentErrorType ()) -> Map ConnId PendingContactConnection -> m ()
pendingConnSubsToView rs = toView . CRPendingSubSummary . map (uncurry PendingSubStatus) . resultsFor rs
withStore_ :: (DB.Connection -> User -> IO [a]) -> m [a]
withStore_ a = withStore' (`a` user) `catchError` \_ -> pure []
filterErrors :: [(a, Maybe ChatError)] -> [(a, ChatError)]
filterErrors = mapMaybe (\(a, e_) -> (a,) <$> e_)
resultsFor :: Map ConnId (Either AgentErrorType ()) -> Map ConnId a -> [(a, Maybe ChatError)]
resultsFor rs = M.foldrWithKey' addResult []
where
addResult :: ConnId -> a -> [(a, Maybe ChatError)] -> [(a, Maybe ChatError)]
addResult connId = (:) . (,err)
where
err = case M.lookup connId rs of
Just (Left e) -> Just $ ChatErrorAgent e
Just _ -> Nothing
_ -> Just . ChatError . CEAgentNoSubResult $ AgentConnId connId
processAgentMessage :: forall m. ChatMonad m => Maybe User -> ConnId -> ACommand 'Agent -> m ()
processAgentMessage Nothing _ _ = throwChatError CENoActiveUser
@@ -1169,7 +1205,7 @@ processAgentMessage (Just User {userId}) "" agentMessage = case agentMessage of
toView $ event srv cs
showToast ("server " <> str) (safeDecodeUtf8 . strEncode $ SrvLoc host port)
processAgentMessage (Just user@User {userId, profile}) agentConnId agentMessage =
(withStore (\db -> getConnectionEntity db user agentConnId) >>= updateConnStatus) >>= \case
(withStore (\db -> getConnectionEntity db user $ AgentConnId agentConnId) >>= updateConnStatus) >>= \case
RcvDirectMsgConnection conn contact_ ->
processDirectMessage agentMessage conn contact_
RcvGroupMsgConnection conn gInfo m ->