This commit is contained in:
Alexander Bondarenko
2024-05-27 22:12:39 +03:00
parent 1bc47c6910
commit cd3992fd0f
4 changed files with 99 additions and 27 deletions
+82 -11
View File
@@ -3331,23 +3331,94 @@ agentSubscriber = do
subscribeUserConnections :: VersionRangeChat -> Bool -> User -> CM ()
subscribeUserConnections vr onlyNeeded user = do
-- get user connections
conns <-
(ctConns, ucConns, mConns, sftConns, rftConns, pcConns) <-
if onlyNeeded
then withStore' getConnectionsToSubscribe
then
-- XXX: can be streamed from DB without collecting everything, needs DB.fold wrapper in mq
foldl' addEntity ([], [], [], [], [], []) <$> withStore' (\db -> getConnectionsToSubscribe db vr user)
else do
withStore' unsetConnectionToSubscribe
ctConns <- mapMaybe (\ct -> if contactActive ct then contactConnId ct else Nothing) <$> withStore_ (`getUserContacts` vr)
ucConns <- map (aConnId . fst) <$> withStore_ (`getUserContactLinks` vr)
mConns <- concatMap (\(Group _ ms) -> mapMaybe memberConnId (filter (not . memberRemoved) ms)) <$> withStore_ (`getUserGroups` vr)
sftConns <- map sndFileTransferConnId <$> withStore_ getLiveSndFileTransfers
rftConns <- mapMaybe liveRcvFileTransferConnId <$> withStore_ getLiveRcvFileTransfers
pcConns <- map aConnId' <$> withStore_ getPendingContactConnections
pure $ concat [ctConns, ucConns, mConns, sftConns, rftConns, pcConns]
-- subscribe using batched commands
void $ withAgent (`Agent.subscribeConnections` conns)
ctConns <- getContactConns
ucConns <- getUserContactLinkConns
-- (gs, mConns, ms) <- getGroupMemberConns
mConns <- getGroupMemberConns
sftConns <- getSndFileTransferConns
rftConns <- getRcvFileTransferConns
pcConns <- getPendingContactConns
pure (ctConns, ucConns, mConns, sftConns, rftConns, pcConns)
let conns = concat [ctConns, ucConns, mConns, sftConns, rftConns, pcConns]
void . lift . forkIO . void . runExceptT $ do -- detach subscription and result processing
rs <- withAgent (`Agent.subscribeConnections` conns) -- subscribe using batched commands
let (errs, _oks) = M.mapEither id rs
-- api <- asks $ coreApi . config
-- refs <- withStore' $ \db -> getConnectionsContacts db (if api then M.keys errs else M.keys rs)
-- let connRefs = M.fromList $ map (\ref@ContactRef {agentConnId} -> (agentConnId, ref)) refs
ce <- asks $ subscriptionEvents . config
contactSubsToView errs ctConns ce
-- TODO: others
where
addEntity (cts, ucs, ms, sfts, rfts, pcs) = \case
RcvDirectMsgConnection c (Just _ct) -> let cts' = aConnId c : cts in (cts', ucs, ms, sfts, rfts, pcs)
RcvDirectMsgConnection c Nothing -> let pcs' = aConnId c : pcs in (cts, ucs, ms, sfts, rfts, pcs')
RcvGroupMsgConnection c _g _m -> let ms' = aConnId c : ms in (cts, ucs, ms', sfts, rfts, pcs)
SndFileConnection c _sft -> let sfts' = aConnId c : sfts in (cts, ucs, ms, sfts', rfts, pcs)
RcvFileConnection c _rft -> let rfts' = aConnId c : rfts in (cts, ucs, ms, sfts, rfts', pcs)
UserContactConnection c _uc -> let ucs' = aConnId c : ucs in (cts, ucs', ms, sfts, rfts, pcs)
withStore_ :: (DB.Connection -> User -> IO [a]) -> CM [a]
withStore_ a = withStore' (`a` user) `catchChatError` \e -> toView (CRChatError (Just user) e) $> []
getContactConns :: CM [ConnId]
getContactConns = do
cts <- withStore_ (`getUserContacts` vr)
let cts' = mapMaybe (\ct -> (,ct) <$> contactConnId ct) $ filter contactActive cts
pure (map fst cts')
getUserContactLinkConns :: CM [ConnId]
getUserContactLinkConns = do
(cs, _ucs) <- unzip <$> withStore_ (`getUserContactLinks` vr)
let connIds = map aConnId cs
pure connIds
getGroupMemberConns :: CM [ConnId] -- ([Group], [ConnId], Map ConnId GroupMember)
getGroupMemberConns = do
gs <- withStore_ (`getUserGroups` vr)
let mPairs = concatMap (\(Group _ ms) -> mapMaybe (\m -> (,m) <$> memberConnId m) (filter (not . memberRemoved) ms)) gs
-- pure (gs, map fst mPairs, M.fromList mPairs)
pure $ map fst mPairs
getSndFileTransferConns :: CM [ConnId]
getSndFileTransferConns = do
sfts <- withStore_ getLiveSndFileTransfers
let connIds = map sndFileTransferConnId sfts
pure connIds
getRcvFileTransferConns :: CM [ConnId]
getRcvFileTransferConns = do
rfts <- withStore_ getLiveRcvFileTransfers
let rftPairs = mapMaybe (\ft -> (,ft) <$> liveRcvFileTransferConnId ft) rfts
pure (map fst rftPairs)
getPendingContactConns :: CM [ConnId]
getPendingContactConns = do
pcs <- withStore_ getPendingContactConnections
let connIds = map aConnId' pcs
pure connIds
contactSubsToView :: Map ConnId AgentErrorType -> [ConnId] -> Bool -> CM ()
contactSubsToView errs cts ce = do
-- chatModifyVar connNetworkStatuses $ M.union (M.fromList statuses) -- via UP
ifM (asks $ coreApi . config) notifyAPI notifyCLI
where
notifyCLI = do
let (okSubs, errSubs) = foldl' (\(os, es) acId -> if M.member acId errs then (os, es + 1) else (os + 1, es)) (0, 0) cts
toView $ CRConnectionSubSummary {user, okSubs, errSubs}
when (ce && errSubs > 0) $ toView $ error "TODO: CRContactSubError {user, contactName :: Text, chatError :: ChatError}"
notifyAPI = toView . CRNetworkStatuses (Just user) $ map (uncurry ConnNetworkStatus) statuses
statuses = foldr addStatus [] cts
where
addStatus :: ConnId -> [(AgentConnId, NetworkStatus)] -> [(AgentConnId, NetworkStatus)]
addStatus connId nss =
case M.lookup connId errs of
Nothing -> nss
Just err -> (AgentConnId connId, NSError $ errorNetworkStatus err) : nss
errorNetworkStatus :: AgentErrorType -> String
errorNetworkStatus = \case
BROKER _ NETWORK -> "network"
SMP _ SMP.AUTH -> "contact deleted"
e -> show e
cleanupManager :: CM ()
cleanupManager = do