diff --git a/src/Simplex/Chat.hs b/src/Simplex/Chat.hs index f9fdf0f5f7..7e7d201d88 100644 --- a/src/Simplex/Chat.hs +++ b/src/Simplex/Chat.hs @@ -3336,73 +3336,73 @@ agentSubscriber = do subscribeUserConnections :: VersionRangeChat -> Bool -> User -> CM () subscribeUserConnections vr onlyNeeded user = do -- get user connections - ((ctConns, ucConns, mConns, sftConns, rftConns, pcConns), gs) <- + (conns, ctConns, ucs, gs, mConns, sfts, rfts, pcConns) <- if onlyNeeded then do - -- XXX: can be streamed from DB without collecting everything, needs DB.fold wrapper in mq - conns <- withStore' (\db -> getConnectionsToSubscribe db vr user) - pure (foldl' addEntity ([], [], [], [], [], []) conns, []) + (conns, entities) <- withStore' $ \a -> getConnectionsToSubscribe a vr user + let (ctConns, ucs, mConns, sfts, rfts, pcConns) = foldl' addEntity ([], M.empty, [], M.empty, M.empty, []) entities + pure (conns, ctConns, ucs, [], mConns, sfts, rfts, pcConns) else do withStore' unsetConnectionToSubscribe ctConns <- getContactConns - ucConns <- getUserContactLinkConns - -- (gs, mConns, ms) <- getGroupMemberConns + (ucConns, ucs) <- getUserContactLinkConns (gs, mConns) <- getGroupMemberConns - sftConns <- getSndFileTransferConns - rftConns <- getRcvFileTransferConns + (sftConns, sfts) <- getSndFileTransferConns + (rftConns, rfts) <- getRcvFileTransferConns pcConns <- getPendingContactConns - pure ((ctConns, ucConns, mConns, sftConns, rftConns, pcConns), gs) - let userConns = concat [ctConns, ucConns, mConns, sftConns, rftConns, pcConns] + let conns = concat [ctConns, ucConns, mConns, sftConns, rftConns, pcConns] + pure (conns, ctConns, ucs, gs, mConns, sfts, rfts, pcConns) void . lift . forkIO . void . runExceptT $ do -- detach subscription and result processing - rs <- withAgent (`Agent.subscribeConnections` userConns) -- subscribe using batched commands + rs <- withAgent (`Agent.subscribeConnections` conns) -- subscribe using batched commands let (errs, _oks) = M.mapEither id rs api <- asks $ coreApi . config refs <- withStore' $ \db -> getConnectionsContacts db (if api then M.keys errs else M.keys rs) let connRefs = M.fromList $ map (\ref@ContactRef {agentConnId = AgentConnId acId} -> (acId, ref)) refs ce <- asks $ subscriptionEvents . config contactSubsToView errs ctConns connRefs ce - contactLinkSubsToView errs ucConns + contactLinkSubsToView errs ucs groupSubsToView errs gs mConns connRefs ce - -- TODO: sndFileSubsToView rs sfts - -- TODO: rcvFileSubsToView rs rfts - -- TODO: pendingConnSubsToView rs pcs + sndFileSubsToView errs sfts + rcvFileSubsToView errs rfts + pendingConnSubsToView errs pcConns where addEntity (cts, ucs, ms, sfts, rfts, pcs) = \case - RcvDirectMsgConnection c (Just _ct) -> let cts' = aConnId c : cts in (cts', ucs, ms, sfts, rfts, pcs) - RcvDirectMsgConnection c Nothing -> let pcs' = aConnId c : pcs in (cts, ucs, ms, sfts, rfts, pcs') - RcvGroupMsgConnection c _g _m -> let ms' = aConnId c : ms in (cts, ucs, ms', sfts, rfts, pcs) - SndFileConnection c _sft -> let sfts' = aConnId c : sfts in (cts, ucs, ms, sfts', rfts, pcs) - RcvFileConnection c _rft -> let rfts' = aConnId c : rfts in (cts, ucs, ms, sfts, rfts', pcs) - UserContactConnection c _uc -> let ucs' = aConnId c : ucs in (cts, ucs', ms, sfts, rfts, pcs) - withStore_ :: (DB.Connection -> User -> IO [a]) -> CM [a] - withStore_ a = withStore' (`a` user) `catchChatError` \e -> toView (CRChatError (Just user) e) $> [] + RcvDirectMsgConnection c (Just _ct) -> let cts' = addSub c cts in (cts', ucs, ms, sfts, rfts, pcs) + RcvDirectMsgConnection c Nothing -> let pcs' = addSub c pcs in (cts, ucs, ms, sfts, rfts, pcs') + RcvGroupMsgConnection c _g _m -> let ms' = addSub c ms in (cts, ucs, ms', sfts, rfts, pcs) + SndFileConnection c sft -> let sfts' = addConn c sft sfts in (cts, ucs, ms, sfts', rfts, pcs) + RcvFileConnection c rft -> let rfts' = addConn c rft rfts in (cts, ucs, ms, sfts, rfts', pcs) + UserContactConnection c uc -> let ucs' = addConn c uc ucs in (cts, ucs', ms, sfts, rfts, pcs) + addConn :: Connection -> a -> Map ConnId a -> Map ConnId a + addConn = M.insert . aConnId + addSub :: Connection -> [ConnId] -> [ConnId] + addSub c = (aConnId c :) getContactConns :: CM [ConnId] getContactConns = do - cts <- withStore_ (`getUserContacts` vr) + cts <- withStore_ (`getUserContacts` vr) -- TODO: lightweight query let cts' = mapMaybe (\ct -> (,ct) <$> contactConnId ct) $ filter contactActive cts pure (map fst cts') - getUserContactLinkConns :: CM [ConnId] + getUserContactLinkConns :: CM ([ConnId], Map ConnId UserContact) getUserContactLinkConns = do - (cs, _ucs) <- unzip <$> withStore_ (`getUserContactLinks` vr) + (cs, ucs) <- unzip <$> withStore_ (`getUserContactLinks` vr) let connIds = map aConnId cs - pure connIds - getGroupMemberConns :: CM ([Group], [ConnId]) -- ([Group], [ConnId], Map ConnId GroupMember) + pure (connIds, M.fromList $ zip connIds ucs) + getGroupMemberConns :: CM ([Group], [ConnId]) getGroupMemberConns = do - gs <- withStore_ (`getUserGroups` vr) + gs <- withStore_ (`getUserGroups` vr) -- TODO: lightweight query let mPairs = concatMap (\(Group _ ms) -> mapMaybe (\m -> (,m) <$> memberConnId m) (filter (not . memberRemoved) ms)) gs - -- pure (gs, map fst mPairs, M.fromList mPairs) pure (gs, map fst mPairs) - getSndFileTransferConns :: CM [ConnId] + getSndFileTransferConns :: CM ([ConnId], Map ConnId SndFileTransfer) getSndFileTransferConns = do sfts <- withStore_ getLiveSndFileTransfers let connIds = map sndFileTransferConnId sfts - pure connIds - getRcvFileTransferConns :: CM [ConnId] + pure (connIds, M.fromList $ zip connIds sfts) + getRcvFileTransferConns :: CM ([ConnId], Map ConnId RcvFileTransfer) getRcvFileTransferConns = do rfts <- withStore_ getLiveRcvFileTransfers let rftPairs = mapMaybe (\ft -> (,ft) <$> liveRcvFileTransferConnId ft) rfts - pure (map fst rftPairs) + pure (map fst rftPairs, M.fromList rftPairs) getPendingContactConns :: CM [ConnId] getPendingContactConns = do pcs <- withStore_ getPendingContactConnections @@ -3427,13 +3427,11 @@ subscribeUserConnections vr onlyNeeded user = do BROKER _ NETWORK -> "network" SMP _ SMP.AUTH -> "contact deleted" e -> show e - contactLinkSubsToView :: Map ConnId AgentErrorType -> [ConnId] -> CM () - contactLinkSubsToView errs ucConns = do - let conns = S.fromList ucConns - links <- withStore_ (`getUserContactLinks` vr) - let (addresses, groupLinks) = partition (\(_, uc) -> isNothing $ userContactGroupId uc) $ filter (\(c, _) -> S.member (aConnId c) conns) links -- TODO: move into query - forM_ addresses $ \(conn, _uc) -> toView $ CRUserAddrSubStatus {user, userContactError = (`ChatErrorAgent` Nothing) <$> M.lookup (aConnId conn) errs} - let groups = S.fromList $ map (aConnId . fst) groupLinks + contactLinkSubsToView :: Map ConnId AgentErrorType -> Map ConnId UserContact -> CM () + contactLinkSubsToView errs ucs = do + let (addresses, groupLinks) = partition (\(_, uc) -> isNothing $ userContactGroupId uc) (M.assocs ucs) -- TODO: move into query + forM_ addresses $ \(acId, _uc) -> toView $ CRUserAddrSubStatus {user, userContactError = (`ChatErrorAgent` Nothing) <$> M.lookup acId errs} + let groups = S.fromList $ map fst groupLinks errGroups = M.restrictKeys errs groups unless (S.null groups) $ toView CRUserGroupLinksSubSummary { user, @@ -3444,7 +3442,7 @@ subscribeUserConnections vr onlyNeeded user = do groupSubsToView errs gs ms connRefs ce = do mapM_ groupSub $ sortOn (\(Group GroupInfo {localDisplayName = g} _) -> g) gs - toView CRMemberSubSummary {user, okSubs = S.size conns - M.size errConns, errSubs = M.size errConns} -- XXX: add label? + toView CRMemberSubSummary {user, okSubs = S.size conns - M.size errConns, errSubs = M.size errConns} where conns = S.fromList ms errConns = M.restrictKeys errs conns @@ -3469,6 +3467,23 @@ subscribeUserConnections vr onlyNeeded user = do then CRGroupEmpty user g else CRGroupRemoved user g | otherwise = CRGroupSubscribed user g + sndFileSubsToView :: Map ConnId AgentErrorType -> Map ConnId SndFileTransfer -> CM () + sndFileSubsToView errs sfts = + forM_ (M.assocs sfts) $ \(acId, ft@SndFileTransfer {fileId, fileStatus}) -> do + forM_ (M.lookup acId errs) $ toView . CRSndFileSubError user ft . (`ChatErrorAgent` Nothing) + void . forkIO $ do + threadDelay 1000000 + when (fileStatus == FSConnected) . unlessM (isFileActive fileId sndFiles) . withChatLock "subscribe sendFileChunk" $ + sendFileChunk user ft + rcvFileSubsToView :: Map ConnId AgentErrorType -> Map ConnId RcvFileTransfer -> CM () + rcvFileSubsToView errs = mapM_ (toView . uncurry (CRRcvFileSubError user)) . M.mapMaybeWithKey (\acId rft -> (\e -> (rft, ChatErrorAgent e Nothing)) <$> M.lookup acId errs) + pendingConnSubsToView :: Map ConnId AgentErrorType -> [ConnId] -> CM () -- XXX: ignored by View + pendingConnSubsToView errs pcs = toView CRPendingSubSummary {user, okSubs = S.size conns - M.size errConns, errSubs = M.size errConns} + where + conns = S.fromList pcs + errConns = M.restrictKeys errs conns + withStore_ :: (DB.Connection -> User -> IO [a]) -> CM [a] + withStore_ a = withStore' (`a` user) `catchChatError` \e -> toView (CRChatError (Just user) e) $> [] cleanupManager :: CM () cleanupManager = do interval <- asks (cleanupManagerInterval . config) diff --git a/src/Simplex/Chat/Store/Connections.hs b/src/Simplex/Chat/Store/Connections.hs index 50b26ff105..ca84dadcc1 100644 --- a/src/Simplex/Chat/Store/Connections.hs +++ b/src/Simplex/Chat/Store/Connections.hs @@ -211,11 +211,14 @@ getContactConnEntityByConnReqHash db vr user@User {userId} (cReqHash1, cReqHash2 (userId, cReqHash1, cReqHash2, ConnDeleted) maybe (pure Nothing) (fmap eitherToMaybe . runExceptT . getConnectionEntity db vr user) connId_ -getConnectionsToSubscribe :: DB.Connection -> VersionRangeChat -> User -> IO [ConnectionEntity] +getConnectionsToSubscribe :: DB.Connection -> VersionRangeChat -> User -> IO ([ConnId], [ConnectionEntity]) getConnectionsToSubscribe db vr user@User {userId} = do aConnIds <- map fromOnly <$> DB.query db "SELECT agent_conn_id FROM connections WHERE c.user_id = ? AND to_subscribe = 1" (Only userId) unsetConnectionToSubscribe db - fmap catMaybes $ forM aConnIds $ \acId -> eitherToMaybe <$> runExceptT (getConnectionEntity db vr user acId) + entities <- forM aConnIds $ \acId -> eitherToMaybe <$> runExceptT (getConnectionEntity db vr user acId) + unsetConnectionToSubscribe db + let connIds = map (\(AgentConnId connId) -> connId) aConnIds + pure (connIds, catMaybes entities) unsetConnectionToSubscribe :: DB.Connection -> IO () unsetConnectionToSubscribe db = DB.execute_ db "UPDATE connections SET to_subscribe = 0 WHERE to_subscribe = 1"