attempt 5

This commit is contained in:
Evgeny Poberezkin
2023-12-18 11:34:25 +00:00
parent d8ec57602f
commit cb89b963bf
2 changed files with 29 additions and 4 deletions
+9 -4
View File
@@ -1098,14 +1098,19 @@ enqueueMessage c cData sq msgFlags aMessage =
oneResult $ \r -> enqueueMessageB c [(r, (cData, [sq], msgFlags, aMessage))]
-- this function is used only for sending messages in batch, it returns the list of successes to enqueue additional deliveries
enqueueMessageB :: forall m. AgentMonad' m => AgentClient -> [(EIORef AgentMsgId, (ConnData, NonEmpty SndQueue, MsgFlags, AMessage))] -> m [(ConnData, [SndQueue], AgentMsgId)]
enqueueMessageB :: forall m. AgentMonad' m => AgentClient -> [Either AgentErrorType (ConnData, NonEmpty SndQueue, MsgFlags, AMessage))] -> m [Either AgentErrorType (ConnData, [SndQueue], AgentMsgId)]
enqueueMessageB c reqs = do
forM_ reqs $ \(_, (cData, sq :| _, _, _)) ->
forME_ reqs $ \(_, (cData, sq :| _, _, _)) ->
runExceptT $ resumeMsgDelivery c cData sq
aVRange <- asks $ smpAgentVRange . config
mIds <- withStoreBatch c $ \db ->
map (storeSentMsg db $ maxVersion aVRange) reqs
catMaybes <$> mapM processResults (zip reqs mIds)
map (mapE $ storeSentMsg db $ maxVersion aVRange) reqs
forME mIds $ \mId -> do
let InternalId msgId = mId
queuePendingMsgs c sq [mId]
let sqs' = filter isActiveSndQ sqs
pure $ Right (cData, sqs', msgId)
-- catMaybes <$> mapM processResults (zip reqs mIds)
where
storeSentMsg :: DB.Connection -> Version -> (EIORef AgentMsgId, (ConnData, NonEmpty SndQueue, MsgFlags, AMessage)) -> IO (Either StoreError InternalId)
storeSentMsg db agentVersion (_, (ConnData {connId}, sq :| _, msgFlags, aMessage)) = runExceptT $ do
+20
View File
@@ -85,6 +85,26 @@ unlessM b = ifM b $ pure ()
($>>=) :: (Monad m, Monad f, Traversable f) => m (f a) -> (a -> m (f b)) -> m (f b)
f $>>= g = f >>= fmap join . mapM g
mapME :: Monad m => (a -> m (Either e b)) -> [Either e a] -> m [Either e b]
mapME f = mapM (mapE f)
{-# INLINE mapME #-}
mapME_ :: Monad m => (a -> m (Either e b)) -> [Either e a] -> m ()
mapME_ f = mapM_ (mapE f)
{-# INLINE mapME_ #-}
mapE :: Monad m => (a -> m (Either e b)) -> Either e a -> m (Either e b)
mapE f = either (pure . Left) f
{-# INLINE mapE #-}
forME :: Monad m => [Either e a] -> (a -> m (Either e b)) -> m [Either e b]
forME = flip mapME
{-# INLINE forME #-}
forME_ :: Monad m => [Either e a] -> (a -> m (Either e b)) -> m [Either e b]
forME_ = void . flip mapME_
{-# INLINE forME_ #-}
catchAll :: IO a -> (E.SomeException -> IO a) -> IO a
catchAll = E.catch
{-# INLINE catchAll #-}