From 1ce63bee44bf7f74c7ec2ce3bdd78db0a369fffa Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Tue, 19 Jul 2022 09:30:17 +0100 Subject: [PATCH] create notification subscriptions in batch subscribe (#477) * create notification subscriptions in batch subscribe * refactor * refactor --- src/Simplex/Messaging/Agent.hs | 8 ++++++++ src/Simplex/Messaging/Agent/NtfSubSupervisor.hs | 16 +++++++++------- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 30e745bdc..71680a174 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -414,6 +414,9 @@ subscribeConnections' c connIds = do srvRcvQs :: Map SMPServer (Map ConnId (RcvQueue, ConnData)) = M.foldlWithKey' addRcvQueue M.empty rcvQs mapM_ (mapM_ (uncurry $ resumeMsgDelivery c) . sndQueue) cs rcvRs <- mapConcurrently subscribe (M.assocs srvRcvQs) + ns <- asks ntfSupervisor + tkn <- readTVarIO (ntfTkn ns) + when (instantNotifications tkn) . void . forkIO $ sendNtfCreate ns rcvRs let rs = M.unions $ errs' : sndRs : rcvRs notifyResultError rs pure rs @@ -433,6 +436,11 @@ subscribeConnections' c connIds = do addRcvQueue m connId rq@(RcvQueue {server}, _) = M.alter (Just . maybe (M.singleton connId rq) (M.insert connId rq)) server m subscribe :: (SMPServer, Map ConnId (RcvQueue, ConnData)) -> m (Map ConnId (Either AgentErrorType ())) subscribe (srv, qs) = subscribeQueues c srv (M.map fst qs) + sendNtfCreate :: NtfSupervisor -> [Map ConnId (Either AgentErrorType ())] -> m () + sendNtfCreate ns rcvRs = + forM_ (concatMap M.assocs rcvRs) $ \case + (connId, Right _) -> atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCCreate) + _ -> pure () sndQueue :: SomeConn -> Maybe (ConnData, SndQueue) sndQueue = \case SomeConn _ (DuplexConnection cData _ sq) -> Just (cData, sq) diff --git a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs index 45aee73e2..0a45e38a3 100644 --- a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs +++ b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs @@ -11,6 +11,7 @@ module Simplex.Messaging.Agent.NtfSubSupervisor nsUpdateToken, nsRemoveNtfToken, sendNtfSubCommand, + instantNotifications, closeNtfSupervisor, getNtfServer, ) @@ -327,13 +328,14 @@ nsRemoveNtfToken :: NtfSupervisor -> STM () nsRemoveNtfToken ns = writeTVar (ntfTkn ns) Nothing sendNtfSubCommand :: NtfSupervisor -> (ConnId, NtfSupervisorCommand) -> STM () -sendNtfSubCommand ns cmd = - readTVar (ntfTkn ns) - >>= mapM_ - ( \NtfToken {ntfTknStatus, ntfMode} -> - when (ntfTknStatus == NTActive && ntfMode == NMInstant) $ - writeTBQueue (ntfSubQ ns) cmd - ) +sendNtfSubCommand ns cmd = do + tkn <- readTVar (ntfTkn ns) + when (instantNotifications tkn) $ writeTBQueue (ntfSubQ ns) cmd + +instantNotifications :: Maybe NtfToken -> Bool +instantNotifications = \case + Just NtfToken {ntfTknStatus = NTActive, ntfMode = NMInstant} -> True + _ -> False closeNtfSupervisor :: NtfSupervisor -> IO () closeNtfSupervisor ns = do