asynchronously subscribe to user connections (#310)

* asynchronously subscribe to user connections

* send subscription status summaries to view/api

* refactor

* add help messages in summaries

* update simplexmq

* rename config field

Co-authored-by: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com>
This commit is contained in:
Efim Poberezkin
2022-02-25 16:29:36 +04:00
committed by GitHub
parent bbab069bcd
commit 5961b7d951
10 changed files with 93 additions and 41 deletions
+45 -31
View File
@@ -58,7 +58,7 @@ import System.Exit (exitFailure, exitSuccess)
import System.FilePath (combine, splitExtensions, takeFileName)
import System.IO (Handle, IOMode (..), SeekMode (..), hFlush, openFile, stdout)
import Text.Read (readMaybe)
import UnliftIO.Async (Async, async, race_)
import UnliftIO.Async
import UnliftIO.Concurrent (forkIO, threadDelay)
import UnliftIO.Directory (doesDirectoryExist, doesFileExist, getFileSize, getHomeDirectory, getTemporaryDirectory)
import qualified UnliftIO.Exception as E
@@ -78,8 +78,9 @@ defaultChatConfig =
},
dbPoolSize = 1,
yesToMigrations = False,
tbqSize = 16,
tbqSize = 64,
fileChunkSize = 15780,
subscriptionEvents = False,
testView = False
}
@@ -87,12 +88,13 @@ logCfg :: LogConfig
logCfg = LogConfig {lc_file = Nothing, lc_stderr = True}
newChatController :: SQLiteStore -> Maybe User -> ChatConfig -> ChatOpts -> (Notification -> IO ()) -> IO ChatController
newChatController chatStore user config@ChatConfig {agentConfig = cfg, tbqSize} ChatOpts {dbFilePrefix, smpServers} sendNotification = do
newChatController chatStore user cfg@ChatConfig {agentConfig = aCfg, tbqSize} ChatOpts {dbFilePrefix, smpServers, logConnections} sendNotification = do
let f = chatStoreFile dbFilePrefix
let config = cfg {subscriptionEvents = logConnections}
activeTo <- newTVarIO ActiveNone
firstTime <- not <$> doesFileExist f
currentUser <- newTVarIO user
smpAgent <- getSMPAgentClient cfg {dbFile = dbFilePrefix <> "_agent.db", smpServers}
smpAgent <- getSMPAgentClient aCfg {dbFile = dbFilePrefix <> "_agent.db", smpServers}
agentAsync <- newTVarIO Nothing
idsDrg <- newTVarIO =<< drgNew
inputQ <- newTBQueueIO tbqSize
@@ -462,36 +464,48 @@ agentSubscriber user = do
processAgentMessage u connId msg `catchError` (toView . CRChatError)
subscribeUserConnections :: (MonadUnliftIO m, MonadReader ChatController m) => User -> m ()
subscribeUserConnections user@User {userId} = void . runExceptT $ do
subscribeContacts
subscribeGroups
subscribeFiles
subscribePendingConnections
subscribeUserContactLink
subscribeUserConnections user@User {userId} = do
ce <- asks $ subscriptionEvents . config
void . runExceptT . (mapConcurrently_ id) $
[ subscribeContacts ce,
subscribeGroups ce,
subscribeFiles,
subscribePendingConnections,
subscribeUserContactLink
]
where
subscribeContacts = do
subscribeContacts ce = do
contacts <- withStore (`getUserContacts` user)
forM_ contacts $ \ct ->
(subscribe (contactConnId ct) >> toView (CRContactSubscribed ct)) `catchError` (toView . CRContactSubError ct)
subscribeGroups = do
toView . CRContactSubSummary =<< forConcurrently contacts (\ct -> ContactSubStatus ct <$> subscribeContact ce ct)
subscribeContact ce ct =
(subscribe (contactConnId ct) >> when ce (toView $ CRContactSubscribed ct) $> Nothing)
`catchError` (\e -> when ce (toView $ CRContactSubError ct e) $> Just e)
subscribeGroups ce = do
groups <- withStore (`getUserGroups` user)
forM_ groups $ \(Group g@GroupInfo {membership} members) -> do
let connectedMembers = mapMaybe (\m -> (m,) <$> memberConnId m) members
if memberStatus membership == GSMemInvited
then toView $ CRGroupInvitation g
else
if null connectedMembers
then
if memberActive membership
then toView $ CRGroupEmpty g
else toView $ CRGroupRemoved g
else do
forM_ connectedMembers $ \(GroupMember {localDisplayName = c}, cId) ->
subscribe cId `catchError` (toView . CRMemberSubError g c)
toView $ CRGroupSubscribed g
toView . CRMemberSubErrors . mconcat =<< forConcurrently groups (subscribeGroup ce)
subscribeGroup ce (Group g@GroupInfo {membership} members) = do
let connectedMembers = mapMaybe (\m -> (m,) <$> memberConnId m) members
if memberStatus membership == GSMemInvited
then do
toView $ CRGroupInvitation g
pure []
else
if null connectedMembers
then do
if memberActive membership
then toView $ CRGroupEmpty g
else toView $ CRGroupRemoved g
pure []
else do
ms <- forConcurrently connectedMembers $ \(m@GroupMember {localDisplayName = c}, cId) ->
(m,) <$> ((subscribe cId $> Nothing) `catchError` (\e -> when ce (toView $ CRMemberSubError g c e) $> Just e))
toView $ CRGroupSubscribed g
pure $ mapMaybe (\(m, e) -> maybe Nothing (Just . MemberSubError m) e) ms
subscribeFiles = do
withStore (`getLiveSndFileTransfers` user) >>= mapM_ subscribeSndFile
withStore (`getLiveRcvFileTransfers` user) >>= mapM_ subscribeRcvFile
sndFileTransfers <- withStore (`getLiveSndFileTransfers` user)
forConcurrently_ sndFileTransfers $ \sft -> async $ subscribeSndFile sft
rcvFileTransfers <- withStore (`getLiveRcvFileTransfers` user)
forConcurrently_ rcvFileTransfers $ \rft -> async $ subscribeRcvFile rft
where
subscribeSndFile ft@SndFileTransfer {fileId, fileStatus, agentConnId = AgentConnId cId} = do
subscribe cId `catchError` (toView . CRSndFileSubError ft)
@@ -520,7 +534,7 @@ subscribeUserConnections user@User {userId} = void . runExceptT $ do
subscribe cId = withAgent (`subscribeConnection` cId)
subscribeConns conns =
withAgent $ \a ->
forM_ conns $ subscribeConnection a . aConnId
forConcurrently_ conns $ \c -> subscribeConnection a (aConnId c)
processAgentMessage :: forall m. ChatMonad m => Maybe User -> ConnId -> ACommand 'Agent -> m ()
processAgentMessage Nothing _ _ = throwChatError CENoActiveUser