agent: optimize subscriptions (#1645)

* agent: optimize subscriptions

* simplify

* clean up
This commit is contained in:
Evgeny
2025-09-30 12:34:42 +01:00
committed by GitHub
parent c8b551dcf7
commit 9854caca33

View File

@@ -1264,32 +1264,43 @@ type QSubResult = QCmdResult (Maybe SMP.ServiceId)
subscribeConnections' :: AgentClient -> [ConnId] -> AM (Map ConnId (Either AgentErrorType (Maybe ClientServiceId)))
subscribeConnections' _ [] = pure M.empty
subscribeConnections' c connIds = do
conns :: Map ConnId (Either StoreError SomeConn) <- M.fromList . zip connIds <$> withStore' c (`getConns` connIds)
let (errs, cs) = M.mapEither id conns
errs' = M.map (Left . storeError) errs
(subRs, rcvQs) = M.mapEither rcvQueueOrResult cs
conns <- withStore' c (`getConns` connIds)
let (subRs, cs) = foldr partitionResultsConns ([], []) $ zip connIds conns
resumeDelivery cs
resumeConnCmds c $ M.keys cs
rcvRs <- lift $ connResults . fst <$> subscribeQueues c (concat $ M.elems rcvQs)
resumeConnCmds c $ map fst cs
rcvRs <- lift $ connResults . fst <$> subscribeQueues c (concatMap rcvQueues cs)
rcvRs' <- storeClientServiceAssocs rcvRs
ns <- asks ntfSupervisor
lift $ whenM (liftIO $ hasInstantNotifications ns) . void . forkIO . void $ sendNtfCreate ns rcvRs' cs
let rs = M.unions ([errs', subRs, rcvRs'] :: [Map ConnId (Either AgentErrorType (Maybe ClientServiceId))])
let rs = M.fromList subRs `M.union` rcvRs'
notifyResultError rs
pure rs
where
rcvQueueOrResult :: SomeConn -> Either (Either AgentErrorType (Maybe ClientServiceId)) [RcvQueue]
rcvQueueOrResult (SomeConn _ conn) = case conn of
DuplexConnection _ rqs _ -> Right $ L.toList rqs
SndConnection _ sq -> Left $ sndSubResult sq
RcvConnection _ rq -> Right [rq]
ContactConnection _ rq -> Right [rq]
NewConnection _ -> Left (Right Nothing)
partitionResultsConns :: (ConnId, Either StoreError SomeConn) ->
([(ConnId, Either AgentErrorType (Maybe ClientServiceId))], [(ConnId, SomeConn)]) ->
([(ConnId, Either AgentErrorType (Maybe ClientServiceId))], [(ConnId, SomeConn)])
partitionResultsConns (connId, conn_) (rs, cs) = case conn_ of
Left e -> ((connId, Left (storeError e)) : rs, cs)
Right c'@(SomeConn _ conn) -> case conn of
DuplexConnection {} -> (rs, cs')
SndConnection _ sq -> ((connId, sndSubResult sq) : rs, cs')
RcvConnection _ _ -> (rs, cs')
ContactConnection _ _ -> (rs, cs')
NewConnection _ -> ((connId, Right Nothing) : rs, cs')
where
cs' = (connId, c') : cs
sndSubResult :: SndQueue -> Either AgentErrorType (Maybe ClientServiceId)
sndSubResult SndQueue {status} = case status of
Confirmed -> Right Nothing
Active -> Left $ CONN SIMPLEX "subscribeConnections"
_ -> Left $ INTERNAL "unexpected queue status"
rcvQueues :: (ConnId, SomeConn) -> [RcvQueue]
rcvQueues (_, SomeConn _ conn) = case conn of
DuplexConnection _ rqs _ -> L.toList rqs
SndConnection {} -> []
RcvConnection _ rq -> [rq]
ContactConnection _ rq -> [rq]
NewConnection _ -> []
connResults :: [(RcvQueue, Either AgentErrorType (Maybe SMP.ServiceId))] -> Map ConnId (Either AgentErrorType (Maybe SMP.ServiceId))
connResults = M.map snd . foldl' addResult M.empty
where
@@ -1308,21 +1319,25 @@ subscribeConnections' c connIds = do
-- TODO [certs rcv] store associations of queues with client service ID
storeClientServiceAssocs :: Map ConnId (Either AgentErrorType (Maybe SMP.ServiceId)) -> AM (Map ConnId (Either AgentErrorType (Maybe ClientServiceId)))
storeClientServiceAssocs = pure . M.map (Nothing <$)
sendNtfCreate :: NtfSupervisor -> Map ConnId (Either AgentErrorType (Maybe ClientServiceId)) -> Map ConnId SomeConn -> AM' ()
sendNtfCreate :: NtfSupervisor -> Map ConnId (Either AgentErrorType (Maybe ClientServiceId)) -> [(ConnId, SomeConn)] -> AM' ()
sendNtfCreate ns rcvRs cs = do
let oks = M.keysSet $ M.filter (either temporaryAgentError $ const True) rcvRs
cs' = M.restrictKeys cs oks
(csCreate, csDelete) = M.partition (\(SomeConn _ conn) -> enableNtfs $ toConnData conn) cs'
(csCreate, csDelete) = foldr (groupConnIds oks) ([], []) cs
sendNtfCmd NSCCreate csCreate
sendNtfCmd NSCSmpDelete csDelete
where
sendNtfCmd cmd cs' = forM_ (L.nonEmpty $ M.keys cs') $ \cids -> atomically $ writeTBQueue (ntfSubQ ns) (cmd, cids)
resumeDelivery :: Map ConnId SomeConn -> AM ()
groupConnIds oks (connId, SomeConn _ conn) acc@(csCreate, csDelete)
| connId `S.notMember` oks = acc
| enableNtfs (toConnData conn) = (connId : csCreate, csDelete)
| otherwise = (csCreate, connId : csDelete)
sendNtfCmd cmd = mapM_ (\cids -> atomically $ writeTBQueue (ntfSubQ ns) (cmd, cids)) . L.nonEmpty
resumeDelivery :: [(ConnId, SomeConn)] -> AM ()
resumeDelivery conns = do
conns' <- M.restrictKeys conns . S.fromList <$> withStore' c getConnectionsForDelivery
deliverTo <- S.fromList <$> withStore' c getConnectionsForDelivery
let conns' = filter ((`S.member` deliverTo) . fst) conns
lift $ mapM_ (mapM_ (\(cData, sqs) -> mapM_ (resumeMsgDelivery c cData) sqs) . sndQueue) conns'
sndQueue :: SomeConn -> Maybe (ConnData, NonEmpty SndQueue)
sndQueue (SomeConn _ conn) = case conn of
sndQueue :: (ConnId, SomeConn) -> Maybe (ConnData, NonEmpty SndQueue)
sndQueue (_, SomeConn _ conn) = case conn of
DuplexConnection cData _ sqs -> Just (cData, sqs)
SndConnection cData sq -> Just (cData, [sq])
_ -> Nothing