From cb89b963bf376073343ddccf7e9a5310d83d3a61 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Mon, 18 Dec 2023 11:34:25 +0000 Subject: [PATCH] attempt 5 --- src/Simplex/Messaging/Agent.hs | 13 +++++++++---- src/Simplex/Messaging/Util.hs | 20 ++++++++++++++++++++ 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 9f0572278..a61ae26d5 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -1098,14 +1098,19 @@ enqueueMessage c cData sq msgFlags aMessage = oneResult $ \r -> enqueueMessageB c [(r, (cData, [sq], msgFlags, aMessage))] -- this function is used only for sending messages in batch, it returns the list of successes to enqueue additional deliveries -enqueueMessageB :: forall m. AgentMonad' m => AgentClient -> [(EIORef AgentMsgId, (ConnData, NonEmpty SndQueue, MsgFlags, AMessage))] -> m [(ConnData, [SndQueue], AgentMsgId)] +enqueueMessageB :: forall m. AgentMonad' m => AgentClient -> [Either AgentErrorType (ConnData, NonEmpty SndQueue, MsgFlags, AMessage))] -> m [Either AgentErrorType (ConnData, [SndQueue], AgentMsgId)] enqueueMessageB c reqs = do - forM_ reqs $ \(_, (cData, sq :| _, _, _)) -> + forME_ reqs $ \(_, (cData, sq :| _, _, _)) -> runExceptT $ resumeMsgDelivery c cData sq aVRange <- asks $ smpAgentVRange . config mIds <- withStoreBatch c $ \db -> - map (storeSentMsg db $ maxVersion aVRange) reqs - catMaybes <$> mapM processResults (zip reqs mIds) + map (mapE $ storeSentMsg db $ maxVersion aVRange) reqs + forME mIds $ \mId -> do + let InternalId msgId = mId + queuePendingMsgs c sq [mId] + let sqs' = filter isActiveSndQ sqs + pure $ Right (cData, sqs', msgId) + -- catMaybes <$> mapM processResults (zip reqs mIds) where storeSentMsg :: DB.Connection -> Version -> (EIORef AgentMsgId, (ConnData, NonEmpty SndQueue, MsgFlags, AMessage)) -> IO (Either StoreError InternalId) storeSentMsg db agentVersion (_, (ConnData {connId}, sq :| _, msgFlags, aMessage)) = runExceptT $ do diff --git a/src/Simplex/Messaging/Util.hs b/src/Simplex/Messaging/Util.hs index 3143427ca..830f16c50 100644 --- a/src/Simplex/Messaging/Util.hs +++ b/src/Simplex/Messaging/Util.hs @@ -85,6 +85,26 @@ unlessM b = ifM b $ pure () ($>>=) :: (Monad m, Monad f, Traversable f) => m (f a) -> (a -> m (f b)) -> m (f b) f $>>= g = f >>= fmap join . mapM g +mapME :: Monad m => (a -> m (Either e b)) -> [Either e a] -> m [Either e b] +mapME f = mapM (mapE f) +{-# INLINE mapME #-} + +mapME_ :: Monad m => (a -> m (Either e b)) -> [Either e a] -> m () +mapME_ f = mapM_ (mapE f) +{-# INLINE mapME_ #-} + +mapE :: Monad m => (a -> m (Either e b)) -> Either e a -> m (Either e b) +mapE f = either (pure . Left) f +{-# INLINE mapE #-} + +forME :: Monad m => [Either e a] -> (a -> m (Either e b)) -> m [Either e b] +forME = flip mapME +{-# INLINE forME #-} + +forME_ :: Monad m => [Either e a] -> (a -> m (Either e b)) -> m [Either e b] +forME_ = void . flip mapME_ +{-# INLINE forME_ #-} + catchAll :: IO a -> (E.SomeException -> IO a) -> IO a catchAll = E.catch {-# INLINE catchAll #-}