From 9854caca331f6c85f4d76db47f8eb3d205fa7eee Mon Sep 17 00:00:00 2001 From: Evgeny Date: Tue, 30 Sep 2025 12:34:42 +0100 Subject: [PATCH] agent: optimize subscriptions (#1645) * agent: optimize subscriptions * simplify * clean up --- src/Simplex/Messaging/Agent.hs | 59 +++++++++++++++++++++------------- 1 file changed, 37 insertions(+), 22 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 7193cb53d..c3f349b1d 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -1264,32 +1264,43 @@ type QSubResult = QCmdResult (Maybe SMP.ServiceId) subscribeConnections' :: AgentClient -> [ConnId] -> AM (Map ConnId (Either AgentErrorType (Maybe ClientServiceId))) subscribeConnections' _ [] = pure M.empty subscribeConnections' c connIds = do - conns :: Map ConnId (Either StoreError SomeConn) <- M.fromList . zip connIds <$> withStore' c (`getConns` connIds) - let (errs, cs) = M.mapEither id conns - errs' = M.map (Left . storeError) errs - (subRs, rcvQs) = M.mapEither rcvQueueOrResult cs + conns <- withStore' c (`getConns` connIds) + let (subRs, cs) = foldr partitionResultsConns ([], []) $ zip connIds conns resumeDelivery cs - resumeConnCmds c $ M.keys cs - rcvRs <- lift $ connResults . fst <$> subscribeQueues c (concat $ M.elems rcvQs) + resumeConnCmds c $ map fst cs + rcvRs <- lift $ connResults . fst <$> subscribeQueues c (concatMap rcvQueues cs) rcvRs' <- storeClientServiceAssocs rcvRs ns <- asks ntfSupervisor lift $ whenM (liftIO $ hasInstantNotifications ns) . void . forkIO . void $ sendNtfCreate ns rcvRs' cs - let rs = M.unions ([errs', subRs, rcvRs'] :: [Map ConnId (Either AgentErrorType (Maybe ClientServiceId))]) + let rs = M.fromList subRs `M.union` rcvRs' notifyResultError rs pure rs where - rcvQueueOrResult :: SomeConn -> Either (Either AgentErrorType (Maybe ClientServiceId)) [RcvQueue] - rcvQueueOrResult (SomeConn _ conn) = case conn of - DuplexConnection _ rqs _ -> Right $ L.toList rqs - SndConnection _ sq -> Left $ sndSubResult sq - RcvConnection _ rq -> Right [rq] - ContactConnection _ rq -> Right [rq] - NewConnection _ -> Left (Right Nothing) + partitionResultsConns :: (ConnId, Either StoreError SomeConn) -> + ([(ConnId, Either AgentErrorType (Maybe ClientServiceId))], [(ConnId, SomeConn)]) -> + ([(ConnId, Either AgentErrorType (Maybe ClientServiceId))], [(ConnId, SomeConn)]) + partitionResultsConns (connId, conn_) (rs, cs) = case conn_ of + Left e -> ((connId, Left (storeError e)) : rs, cs) + Right c'@(SomeConn _ conn) -> case conn of + DuplexConnection {} -> (rs, cs') + SndConnection _ sq -> ((connId, sndSubResult sq) : rs, cs') + RcvConnection _ _ -> (rs, cs') + ContactConnection _ _ -> (rs, cs') + NewConnection _ -> ((connId, Right Nothing) : rs, cs') + where + cs' = (connId, c') : cs sndSubResult :: SndQueue -> Either AgentErrorType (Maybe ClientServiceId) sndSubResult SndQueue {status} = case status of Confirmed -> Right Nothing Active -> Left $ CONN SIMPLEX "subscribeConnections" _ -> Left $ INTERNAL "unexpected queue status" + rcvQueues :: (ConnId, SomeConn) -> [RcvQueue] + rcvQueues (_, SomeConn _ conn) = case conn of + DuplexConnection _ rqs _ -> L.toList rqs + SndConnection {} -> [] + RcvConnection _ rq -> [rq] + ContactConnection _ rq -> [rq] + NewConnection _ -> [] connResults :: [(RcvQueue, Either AgentErrorType (Maybe SMP.ServiceId))] -> Map ConnId (Either AgentErrorType (Maybe SMP.ServiceId)) connResults = M.map snd . foldl' addResult M.empty where @@ -1308,21 +1319,25 @@ subscribeConnections' c connIds = do -- TODO [certs rcv] store associations of queues with client service ID storeClientServiceAssocs :: Map ConnId (Either AgentErrorType (Maybe SMP.ServiceId)) -> AM (Map ConnId (Either AgentErrorType (Maybe ClientServiceId))) storeClientServiceAssocs = pure . M.map (Nothing <$) - sendNtfCreate :: NtfSupervisor -> Map ConnId (Either AgentErrorType (Maybe ClientServiceId)) -> Map ConnId SomeConn -> AM' () + sendNtfCreate :: NtfSupervisor -> Map ConnId (Either AgentErrorType (Maybe ClientServiceId)) -> [(ConnId, SomeConn)] -> AM' () sendNtfCreate ns rcvRs cs = do let oks = M.keysSet $ M.filter (either temporaryAgentError $ const True) rcvRs - cs' = M.restrictKeys cs oks - (csCreate, csDelete) = M.partition (\(SomeConn _ conn) -> enableNtfs $ toConnData conn) cs' + (csCreate, csDelete) = foldr (groupConnIds oks) ([], []) cs sendNtfCmd NSCCreate csCreate sendNtfCmd NSCSmpDelete csDelete where - sendNtfCmd cmd cs' = forM_ (L.nonEmpty $ M.keys cs') $ \cids -> atomically $ writeTBQueue (ntfSubQ ns) (cmd, cids) - resumeDelivery :: Map ConnId SomeConn -> AM () + groupConnIds oks (connId, SomeConn _ conn) acc@(csCreate, csDelete) + | connId `S.notMember` oks = acc + | enableNtfs (toConnData conn) = (connId : csCreate, csDelete) + | otherwise = (csCreate, connId : csDelete) + sendNtfCmd cmd = mapM_ (\cids -> atomically $ writeTBQueue (ntfSubQ ns) (cmd, cids)) . L.nonEmpty + resumeDelivery :: [(ConnId, SomeConn)] -> AM () resumeDelivery conns = do - conns' <- M.restrictKeys conns . S.fromList <$> withStore' c getConnectionsForDelivery + deliverTo <- S.fromList <$> withStore' c getConnectionsForDelivery + let conns' = filter ((`S.member` deliverTo) . fst) conns lift $ mapM_ (mapM_ (\(cData, sqs) -> mapM_ (resumeMsgDelivery c cData) sqs) . sndQueue) conns' - sndQueue :: SomeConn -> Maybe (ConnData, NonEmpty SndQueue) - sndQueue (SomeConn _ conn) = case conn of + sndQueue :: (ConnId, SomeConn) -> Maybe (ConnData, NonEmpty SndQueue) + sndQueue (_, SomeConn _ conn) = case conn of DuplexConnection cData _ sqs -> Just (cData, sqs) SndConnection cData sq -> Just (cData, [sq]) _ -> Nothing