restore rare entities prefetch

This commit is contained in:
Alexander Bondarenko
2024-05-29 15:12:52 +03:00
parent a1216d86fd
commit b94ced6b39
2 changed files with 62 additions and 44 deletions
+57 -42
View File
@@ -3336,73 +3336,73 @@ agentSubscriber = do
subscribeUserConnections :: VersionRangeChat -> Bool -> User -> CM ()
subscribeUserConnections vr onlyNeeded user = do
-- get user connections
((ctConns, ucConns, mConns, sftConns, rftConns, pcConns), gs) <-
(conns, ctConns, ucs, gs, mConns, sfts, rfts, pcConns) <-
if onlyNeeded
then do
-- XXX: can be streamed from DB without collecting everything, needs DB.fold wrapper in mq
conns <- withStore' (\db -> getConnectionsToSubscribe db vr user)
pure (foldl' addEntity ([], [], [], [], [], []) conns, [])
(conns, entities) <- withStore' $ \a -> getConnectionsToSubscribe a vr user
let (ctConns, ucs, mConns, sfts, rfts, pcConns) = foldl' addEntity ([], M.empty, [], M.empty, M.empty, []) entities
pure (conns, ctConns, ucs, [], mConns, sfts, rfts, pcConns)
else do
withStore' unsetConnectionToSubscribe
ctConns <- getContactConns
ucConns <- getUserContactLinkConns
-- (gs, mConns, ms) <- getGroupMemberConns
(ucConns, ucs) <- getUserContactLinkConns
(gs, mConns) <- getGroupMemberConns
sftConns <- getSndFileTransferConns
rftConns <- getRcvFileTransferConns
(sftConns, sfts) <- getSndFileTransferConns
(rftConns, rfts) <- getRcvFileTransferConns
pcConns <- getPendingContactConns
pure ((ctConns, ucConns, mConns, sftConns, rftConns, pcConns), gs)
let userConns = concat [ctConns, ucConns, mConns, sftConns, rftConns, pcConns]
let conns = concat [ctConns, ucConns, mConns, sftConns, rftConns, pcConns]
pure (conns, ctConns, ucs, gs, mConns, sfts, rfts, pcConns)
void . lift . forkIO . void . runExceptT $ do
-- detach subscription and result processing
rs <- withAgent (`Agent.subscribeConnections` userConns) -- subscribe using batched commands
rs <- withAgent (`Agent.subscribeConnections` conns) -- subscribe using batched commands
let (errs, _oks) = M.mapEither id rs
api <- asks $ coreApi . config
refs <- withStore' $ \db -> getConnectionsContacts db (if api then M.keys errs else M.keys rs)
let connRefs = M.fromList $ map (\ref@ContactRef {agentConnId = AgentConnId acId} -> (acId, ref)) refs
ce <- asks $ subscriptionEvents . config
contactSubsToView errs ctConns connRefs ce
contactLinkSubsToView errs ucConns
contactLinkSubsToView errs ucs
groupSubsToView errs gs mConns connRefs ce
-- TODO: sndFileSubsToView rs sfts
-- TODO: rcvFileSubsToView rs rfts
-- TODO: pendingConnSubsToView rs pcs
sndFileSubsToView errs sfts
rcvFileSubsToView errs rfts
pendingConnSubsToView errs pcConns
where
addEntity (cts, ucs, ms, sfts, rfts, pcs) = \case
RcvDirectMsgConnection c (Just _ct) -> let cts' = aConnId c : cts in (cts', ucs, ms, sfts, rfts, pcs)
RcvDirectMsgConnection c Nothing -> let pcs' = aConnId c : pcs in (cts, ucs, ms, sfts, rfts, pcs')
RcvGroupMsgConnection c _g _m -> let ms' = aConnId c : ms in (cts, ucs, ms', sfts, rfts, pcs)
SndFileConnection c _sft -> let sfts' = aConnId c : sfts in (cts, ucs, ms, sfts', rfts, pcs)
RcvFileConnection c _rft -> let rfts' = aConnId c : rfts in (cts, ucs, ms, sfts, rfts', pcs)
UserContactConnection c _uc -> let ucs' = aConnId c : ucs in (cts, ucs', ms, sfts, rfts, pcs)
withStore_ :: (DB.Connection -> User -> IO [a]) -> CM [a]
withStore_ a = withStore' (`a` user) `catchChatError` \e -> toView (CRChatError (Just user) e) $> []
RcvDirectMsgConnection c (Just _ct) -> let cts' = addSub c cts in (cts', ucs, ms, sfts, rfts, pcs)
RcvDirectMsgConnection c Nothing -> let pcs' = addSub c pcs in (cts, ucs, ms, sfts, rfts, pcs')
RcvGroupMsgConnection c _g _m -> let ms' = addSub c ms in (cts, ucs, ms', sfts, rfts, pcs)
SndFileConnection c sft -> let sfts' = addConn c sft sfts in (cts, ucs, ms, sfts', rfts, pcs)
RcvFileConnection c rft -> let rfts' = addConn c rft rfts in (cts, ucs, ms, sfts, rfts', pcs)
UserContactConnection c uc -> let ucs' = addConn c uc ucs in (cts, ucs', ms, sfts, rfts, pcs)
addConn :: Connection -> a -> Map ConnId a -> Map ConnId a
addConn = M.insert . aConnId
addSub :: Connection -> [ConnId] -> [ConnId]
addSub c = (aConnId c :)
getContactConns :: CM [ConnId]
getContactConns = do
cts <- withStore_ (`getUserContacts` vr)
cts <- withStore_ (`getUserContacts` vr) -- TODO: lightweight query
let cts' = mapMaybe (\ct -> (,ct) <$> contactConnId ct) $ filter contactActive cts
pure (map fst cts')
getUserContactLinkConns :: CM [ConnId]
getUserContactLinkConns :: CM ([ConnId], Map ConnId UserContact)
getUserContactLinkConns = do
(cs, _ucs) <- unzip <$> withStore_ (`getUserContactLinks` vr)
(cs, ucs) <- unzip <$> withStore_ (`getUserContactLinks` vr)
let connIds = map aConnId cs
pure connIds
getGroupMemberConns :: CM ([Group], [ConnId]) -- ([Group], [ConnId], Map ConnId GroupMember)
pure (connIds, M.fromList $ zip connIds ucs)
getGroupMemberConns :: CM ([Group], [ConnId])
getGroupMemberConns = do
gs <- withStore_ (`getUserGroups` vr)
gs <- withStore_ (`getUserGroups` vr) -- TODO: lightweight query
let mPairs = concatMap (\(Group _ ms) -> mapMaybe (\m -> (,m) <$> memberConnId m) (filter (not . memberRemoved) ms)) gs
-- pure (gs, map fst mPairs, M.fromList mPairs)
pure (gs, map fst mPairs)
getSndFileTransferConns :: CM [ConnId]
getSndFileTransferConns :: CM ([ConnId], Map ConnId SndFileTransfer)
getSndFileTransferConns = do
sfts <- withStore_ getLiveSndFileTransfers
let connIds = map sndFileTransferConnId sfts
pure connIds
getRcvFileTransferConns :: CM [ConnId]
pure (connIds, M.fromList $ zip connIds sfts)
getRcvFileTransferConns :: CM ([ConnId], Map ConnId RcvFileTransfer)
getRcvFileTransferConns = do
rfts <- withStore_ getLiveRcvFileTransfers
let rftPairs = mapMaybe (\ft -> (,ft) <$> liveRcvFileTransferConnId ft) rfts
pure (map fst rftPairs)
pure (map fst rftPairs, M.fromList rftPairs)
getPendingContactConns :: CM [ConnId]
getPendingContactConns = do
pcs <- withStore_ getPendingContactConnections
@@ -3427,13 +3427,11 @@ subscribeUserConnections vr onlyNeeded user = do
BROKER _ NETWORK -> "network"
SMP _ SMP.AUTH -> "contact deleted"
e -> show e
contactLinkSubsToView :: Map ConnId AgentErrorType -> [ConnId] -> CM ()
contactLinkSubsToView errs ucConns = do
let conns = S.fromList ucConns
links <- withStore_ (`getUserContactLinks` vr)
let (addresses, groupLinks) = partition (\(_, uc) -> isNothing $ userContactGroupId uc) $ filter (\(c, _) -> S.member (aConnId c) conns) links -- TODO: move into query
forM_ addresses $ \(conn, _uc) -> toView $ CRUserAddrSubStatus {user, userContactError = (`ChatErrorAgent` Nothing) <$> M.lookup (aConnId conn) errs}
let groups = S.fromList $ map (aConnId . fst) groupLinks
contactLinkSubsToView :: Map ConnId AgentErrorType -> Map ConnId UserContact -> CM ()
contactLinkSubsToView errs ucs = do
let (addresses, groupLinks) = partition (\(_, uc) -> isNothing $ userContactGroupId uc) (M.assocs ucs) -- TODO: move into query
forM_ addresses $ \(acId, _uc) -> toView $ CRUserAddrSubStatus {user, userContactError = (`ChatErrorAgent` Nothing) <$> M.lookup acId errs}
let groups = S.fromList $ map fst groupLinks
errGroups = M.restrictKeys errs groups
unless (S.null groups) $ toView CRUserGroupLinksSubSummary
{ user,
@@ -3444,7 +3442,7 @@ subscribeUserConnections vr onlyNeeded user = do
groupSubsToView errs gs ms connRefs ce = do
mapM_ groupSub $
sortOn (\(Group GroupInfo {localDisplayName = g} _) -> g) gs
toView CRMemberSubSummary {user, okSubs = S.size conns - M.size errConns, errSubs = M.size errConns} -- XXX: add label?
toView CRMemberSubSummary {user, okSubs = S.size conns - M.size errConns, errSubs = M.size errConns}
where
conns = S.fromList ms
errConns = M.restrictKeys errs conns
@@ -3469,6 +3467,23 @@ subscribeUserConnections vr onlyNeeded user = do
then CRGroupEmpty user g
else CRGroupRemoved user g
| otherwise = CRGroupSubscribed user g
sndFileSubsToView :: Map ConnId AgentErrorType -> Map ConnId SndFileTransfer -> CM ()
sndFileSubsToView errs sfts =
forM_ (M.assocs sfts) $ \(acId, ft@SndFileTransfer {fileId, fileStatus}) -> do
forM_ (M.lookup acId errs) $ toView . CRSndFileSubError user ft . (`ChatErrorAgent` Nothing)
void . forkIO $ do
threadDelay 1000000
when (fileStatus == FSConnected) . unlessM (isFileActive fileId sndFiles) . withChatLock "subscribe sendFileChunk" $
sendFileChunk user ft
rcvFileSubsToView :: Map ConnId AgentErrorType -> Map ConnId RcvFileTransfer -> CM ()
rcvFileSubsToView errs = mapM_ (toView . uncurry (CRRcvFileSubError user)) . M.mapMaybeWithKey (\acId rft -> (\e -> (rft, ChatErrorAgent e Nothing)) <$> M.lookup acId errs)
pendingConnSubsToView :: Map ConnId AgentErrorType -> [ConnId] -> CM () -- XXX: ignored by View
pendingConnSubsToView errs pcs = toView CRPendingSubSummary {user, okSubs = S.size conns - M.size errConns, errSubs = M.size errConns}
where
conns = S.fromList pcs
errConns = M.restrictKeys errs conns
withStore_ :: (DB.Connection -> User -> IO [a]) -> CM [a]
withStore_ a = withStore' (`a` user) `catchChatError` \e -> toView (CRChatError (Just user) e) $> []
cleanupManager :: CM ()
cleanupManager = do
interval <- asks (cleanupManagerInterval . config)
+5 -2
View File
@@ -211,11 +211,14 @@ getContactConnEntityByConnReqHash db vr user@User {userId} (cReqHash1, cReqHash2
(userId, cReqHash1, cReqHash2, ConnDeleted)
maybe (pure Nothing) (fmap eitherToMaybe . runExceptT . getConnectionEntity db vr user) connId_
getConnectionsToSubscribe :: DB.Connection -> VersionRangeChat -> User -> IO [ConnectionEntity]
getConnectionsToSubscribe :: DB.Connection -> VersionRangeChat -> User -> IO ([ConnId], [ConnectionEntity])
getConnectionsToSubscribe db vr user@User {userId} = do
aConnIds <- map fromOnly <$> DB.query db "SELECT agent_conn_id FROM connections WHERE c.user_id = ? AND to_subscribe = 1" (Only userId)
unsetConnectionToSubscribe db
fmap catMaybes $ forM aConnIds $ \acId -> eitherToMaybe <$> runExceptT (getConnectionEntity db vr user acId)
entities <- forM aConnIds $ \acId -> eitherToMaybe <$> runExceptT (getConnectionEntity db vr user acId)
unsetConnectionToSubscribe db
let connIds = map (\(AgentConnId connId) -> connId) aConnIds
pure (connIds, catMaybes entities)
unsetConnectionToSubscribe :: DB.Connection -> IO ()
unsetConnectionToSubscribe db = DB.execute_ db "UPDATE connections SET to_subscribe = 0 WHERE to_subscribe = 1"