diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 2daa0701f..78e038538 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -1485,7 +1485,9 @@ client pure $ allowNewQueues && maybe True ((== auth_) . Just) newQueueBasicAuth Cmd SRecipient command -> case command of - SUB -> withQueue' $ subscribeQueueAndDeliver (M.lookup entId <$> msgMap) + SUB -> case msgMap of + Left e -> pure $ Just (err e, Nothing) + Right msgs -> withQueue' $ subscribeQueueAndDeliver (M.lookup entId msgs) GET -> withQueue getMessage ACK msgId -> withQueue $ acknowledgeMsg msgId KEY sKey -> withQueue $ \q _ -> either err (corrId,entId,) <$> secureQueue_ q sKey @@ -1626,8 +1628,8 @@ client suspendQueue_ :: (StoreQueue s, QueueRec) -> M s (Transmission BrokerMsg) suspendQueue_ (q, _) = liftIO $ either err (const ok) <$> suspendQueue (queueStore ms) q - subscribeQueueAndDeliver :: Either ErrorType (Maybe Message) -> StoreQueue s -> QueueRec -> M s ResponseAndMessage - subscribeQueueAndDeliver prefetchedMsg q qr@QueueRec {rcvServiceId} = + subscribeQueueAndDeliver :: Maybe Message -> StoreQueue s -> QueueRec -> M s ResponseAndMessage + subscribeQueueAndDeliver msg_ q qr@QueueRec {rcvServiceId} = liftIO (TM.lookupIO entId $ subscriptions clnt) >>= \case Nothing -> sharedSubscribeQueue q SRecipientService rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices >>= \case @@ -1648,7 +1650,6 @@ client deliver (hasSub, sub_) = do stats <- asks serverStats fmap (either ((,Nothing) . err) id) $ liftIO $ runExceptT $ do - msg_ <- liftEither prefetchedMsg msg' <- forM msg_ $ \msg -> liftIO $ do ts <- getSystemSeconds sub <- maybe (atomically getSub) pure sub_ diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index d994e829b..c65660c93 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -64,7 +64,7 @@ import Data.Functor (($>)) import Data.Int (Int64) import Data.List (sort) import qualified Data.Map.Strict as M -import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing, mapMaybe) +import Data.Maybe (fromMaybe, isJust, isNothing, mapMaybe) import Data.Text (Text) import qualified Data.Text as T import Data.Text.Encoding (decodeLatin1) @@ -672,9 +672,6 @@ instance MsgStoreClass (JournalMsgStore s) where atomically $ writeTVar tipMsg $ Just (Just ml) pure $ Just msg - tryPeekMsgs st qs = - M.fromList . catMaybes <$> mapM (\q -> (recipientId' q,) <$$> tryPeekMsg st q) qs - tryDeleteMsg_ :: JournalQueue s -> JournalMsgQueue s -> Bool -> StoreIO s () tryDeleteMsg_ q mq@JournalMsgQueue {tipMsg, handles} logState = StoreIO $ (`E.finally` when logState (updateActiveAt q)) $ void $ diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index b9d89a5f2..f118e007c 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -24,7 +24,6 @@ import Control.Monad.Trans.Except import Data.Functor (($>)) import Data.Int (Int64) import qualified Data.Map.Strict as M -import Data.Maybe (catMaybes) import Data.Text (Text) import Simplex.Messaging.Protocol import Simplex.Messaging.Server.MsgStore.Types @@ -177,9 +176,6 @@ instance MsgStoreClass STMMsgStore where tryPeekMsg_ _ = tryPeekTQueue . msgTQueue {-# INLINE tryPeekMsg_ #-} - tryPeekMsgs st qs = - M.fromList . catMaybes <$> mapM (\q -> (recipientId' q,) <$$> tryPeekMsg st q) qs - tryDeleteMsg_ :: STMQueue -> STMMsgQueue -> Bool -> STM () tryDeleteMsg_ _ STMMsgQueue {msgTQueue = q, size} _logState = tryReadTQueue q >>= \case diff --git a/src/Simplex/Messaging/Server/MsgStore/Types.hs b/src/Simplex/Messaging/Server/MsgStore/Types.hs index a1480a2ef..5f2a7d070 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Types.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Types.hs @@ -42,7 +42,8 @@ import Data.Functor (($>)) import Data.Int (Int64) import Data.Kind import Data.Map.Strict (Map) -import Data.Maybe (fromMaybe) +import qualified Data.Map.Strict as M +import Data.Maybe (catMaybes, fromMaybe) import Data.Text (Text) import Data.Time.Clock.System (SystemTime (systemSeconds)) import Simplex.Messaging.Protocol @@ -88,12 +89,12 @@ class (Monad (StoreMonad s), QueueStoreClass (StoreQueue s) (QueueStore s)) => M isolateQueue :: s -> StoreQueue s -> Text -> StoreMonad s a -> ExceptT ErrorType IO a unsafeRunStore :: StoreQueue s -> Text -> StoreMonad s a -> IO a - tryPeekMsgs :: s -> [StoreQueue s] -> ExceptT ErrorType IO (Map RecipientId Message) - -- default implementations are overridden for PostgreSQL storage of messages tryPeekMsg :: s -> StoreQueue s -> ExceptT ErrorType IO (Maybe Message) tryPeekMsg st q = snd <$$> withPeekMsgQueue st q "tryPeekMsg" pure {-# INLINE tryPeekMsg #-} + tryPeekMsgs :: s -> [StoreQueue s] -> ExceptT ErrorType IO (Map RecipientId Message) + tryPeekMsgs st qs = M.fromList . catMaybes <$> mapM (\q -> (recipientId q,) <$$> tryPeekMsg st q) qs tryDelMsg :: s -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message) tryDelMsg st q msgId' =