From 32a150eb4030e91c59c077440143889c0d0321ff Mon Sep 17 00:00:00 2001 From: Evgeny Date: Sun, 18 Aug 2024 14:07:09 +0100 Subject: [PATCH] agent: enable notifications for all connections (#1262) --- src/Simplex/Messaging/Agent.hs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 17d11246c..6ae73061a 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -959,12 +959,12 @@ subscribeConnections' c connIds = do let (errs, cs) = M.mapEither id conns errs' = M.map (Left . storeError) errs (subRs, rcvQs) = M.mapEither rcvQueueOrResult cs - mapM_ (mapM_ (\(cData, sqs) -> mapM_ (lift . resumeMsgDelivery c cData) sqs) . sndQueue) cs + lift $ mapM_ (mapM_ (\(cData, sqs) -> mapM_ (resumeMsgDelivery c cData) sqs) . sndQueue) cs lift $ resumeConnCmds c $ M.keys cs rcvRs <- lift $ connResults . fst <$> subscribeQueues c (concat $ M.elems rcvQs) ns <- asks ntfSupervisor tkn <- readTVarIO (ntfTkn ns) - when (instantNotifications tkn) . void . lift . forkIO . void . runExceptT $ sendNtfCreate ns rcvRs conns + lift $ when (instantNotifications tkn) . void . forkIO . void $ sendNtfCreate ns cs let rs = M.unions ([errs', subRs, rcvRs] :: [Map ConnId (Either AgentErrorType ())]) notifyResultError rs pure rs @@ -996,15 +996,15 @@ subscribeConnections' c connIds = do order (Active, _) = 2 order (_, Right _) = 3 order _ = 4 - sendNtfCreate :: NtfSupervisor -> Map ConnId (Either AgentErrorType ()) -> Map ConnId (Either StoreError SomeConn) -> AM () - sendNtfCreate ns rcvRs conns = - forM_ (M.assocs rcvRs) $ \case - (connId, Right _) -> forM_ (M.lookup connId conns) $ \case - Right (SomeConn _ conn) -> do - let cmd = if enableNtfs $ toConnData conn then NSCCreate else NSCDelete - atomically $ writeTBQueue (ntfSubQ ns) (connId, cmd) - _ -> pure () - _ -> pure () + sendNtfCreate :: NtfSupervisor -> Map ConnId SomeConn -> AM' () + sendNtfCreate ns cs = + -- TODO this needs to be batched end to end. + -- Currently, the only change is to ignore failed subscriptions. + forM_ cs $ \case + SomeConn _ conn -> do + let cmd = if enableNtfs $ toConnData conn then NSCCreate else NSCDelete + ConnData {connId} = toConnData conn + atomically $ writeTBQueue (ntfSubQ ns) (connId, cmd) sndQueue :: SomeConn -> Maybe (ConnData, NonEmpty SndQueue) sndQueue (SomeConn _ conn) = case conn of DuplexConnection cData _ sqs -> Just (cData, sqs)