mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-27 05:15:20 +00:00
refactor
This commit is contained in:
@@ -1485,7 +1485,9 @@ client
|
||||
pure $ allowNewQueues && maybe True ((== auth_) . Just) newQueueBasicAuth
|
||||
Cmd SRecipient command ->
|
||||
case command of
|
||||
SUB -> withQueue' $ subscribeQueueAndDeliver (M.lookup entId <$> msgMap)
|
||||
SUB -> case msgMap of
|
||||
Left e -> pure $ Just (err e, Nothing)
|
||||
Right msgs -> withQueue' $ subscribeQueueAndDeliver (M.lookup entId msgs)
|
||||
GET -> withQueue getMessage
|
||||
ACK msgId -> withQueue $ acknowledgeMsg msgId
|
||||
KEY sKey -> withQueue $ \q _ -> either err (corrId,entId,) <$> secureQueue_ q sKey
|
||||
@@ -1626,8 +1628,8 @@ client
|
||||
suspendQueue_ :: (StoreQueue s, QueueRec) -> M s (Transmission BrokerMsg)
|
||||
suspendQueue_ (q, _) = liftIO $ either err (const ok) <$> suspendQueue (queueStore ms) q
|
||||
|
||||
subscribeQueueAndDeliver :: Either ErrorType (Maybe Message) -> StoreQueue s -> QueueRec -> M s ResponseAndMessage
|
||||
subscribeQueueAndDeliver prefetchedMsg q qr@QueueRec {rcvServiceId} =
|
||||
subscribeQueueAndDeliver :: Maybe Message -> StoreQueue s -> QueueRec -> M s ResponseAndMessage
|
||||
subscribeQueueAndDeliver msg_ q qr@QueueRec {rcvServiceId} =
|
||||
liftIO (TM.lookupIO entId $ subscriptions clnt) >>= \case
|
||||
Nothing ->
|
||||
sharedSubscribeQueue q SRecipientService rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices >>= \case
|
||||
@@ -1648,7 +1650,6 @@ client
|
||||
deliver (hasSub, sub_) = do
|
||||
stats <- asks serverStats
|
||||
fmap (either ((,Nothing) . err) id) $ liftIO $ runExceptT $ do
|
||||
msg_ <- liftEither prefetchedMsg
|
||||
msg' <- forM msg_ $ \msg -> liftIO $ do
|
||||
ts <- getSystemSeconds
|
||||
sub <- maybe (atomically getSub) pure sub_
|
||||
|
||||
@@ -64,7 +64,7 @@ import Data.Functor (($>))
|
||||
import Data.Int (Int64)
|
||||
import Data.List (sort)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing, mapMaybe)
|
||||
import Data.Maybe (fromMaybe, isJust, isNothing, mapMaybe)
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
import Data.Text.Encoding (decodeLatin1)
|
||||
@@ -672,9 +672,6 @@ instance MsgStoreClass (JournalMsgStore s) where
|
||||
atomically $ writeTVar tipMsg $ Just (Just ml)
|
||||
pure $ Just msg
|
||||
|
||||
tryPeekMsgs st qs =
|
||||
M.fromList . catMaybes <$> mapM (\q -> (recipientId' q,) <$$> tryPeekMsg st q) qs
|
||||
|
||||
tryDeleteMsg_ :: JournalQueue s -> JournalMsgQueue s -> Bool -> StoreIO s ()
|
||||
tryDeleteMsg_ q mq@JournalMsgQueue {tipMsg, handles} logState = StoreIO $ (`E.finally` when logState (updateActiveAt q)) $
|
||||
void $
|
||||
|
||||
@@ -24,7 +24,6 @@ import Control.Monad.Trans.Except
|
||||
import Data.Functor (($>))
|
||||
import Data.Int (Int64)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (catMaybes)
|
||||
import Data.Text (Text)
|
||||
import Simplex.Messaging.Protocol
|
||||
import Simplex.Messaging.Server.MsgStore.Types
|
||||
@@ -177,9 +176,6 @@ instance MsgStoreClass STMMsgStore where
|
||||
tryPeekMsg_ _ = tryPeekTQueue . msgTQueue
|
||||
{-# INLINE tryPeekMsg_ #-}
|
||||
|
||||
tryPeekMsgs st qs =
|
||||
M.fromList . catMaybes <$> mapM (\q -> (recipientId' q,) <$$> tryPeekMsg st q) qs
|
||||
|
||||
tryDeleteMsg_ :: STMQueue -> STMMsgQueue -> Bool -> STM ()
|
||||
tryDeleteMsg_ _ STMMsgQueue {msgTQueue = q, size} _logState =
|
||||
tryReadTQueue q >>= \case
|
||||
|
||||
@@ -42,7 +42,8 @@ import Data.Functor (($>))
|
||||
import Data.Int (Int64)
|
||||
import Data.Kind
|
||||
import Data.Map.Strict (Map)
|
||||
import Data.Maybe (fromMaybe)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (catMaybes, fromMaybe)
|
||||
import Data.Text (Text)
|
||||
import Data.Time.Clock.System (SystemTime (systemSeconds))
|
||||
import Simplex.Messaging.Protocol
|
||||
@@ -88,12 +89,12 @@ class (Monad (StoreMonad s), QueueStoreClass (StoreQueue s) (QueueStore s)) => M
|
||||
isolateQueue :: s -> StoreQueue s -> Text -> StoreMonad s a -> ExceptT ErrorType IO a
|
||||
unsafeRunStore :: StoreQueue s -> Text -> StoreMonad s a -> IO a
|
||||
|
||||
tryPeekMsgs :: s -> [StoreQueue s] -> ExceptT ErrorType IO (Map RecipientId Message)
|
||||
|
||||
-- default implementations are overridden for PostgreSQL storage of messages
|
||||
tryPeekMsg :: s -> StoreQueue s -> ExceptT ErrorType IO (Maybe Message)
|
||||
tryPeekMsg st q = snd <$$> withPeekMsgQueue st q "tryPeekMsg" pure
|
||||
{-# INLINE tryPeekMsg #-}
|
||||
tryPeekMsgs :: s -> [StoreQueue s] -> ExceptT ErrorType IO (Map RecipientId Message)
|
||||
tryPeekMsgs st qs = M.fromList . catMaybes <$> mapM (\q -> (recipientId q,) <$$> tryPeekMsg st q) qs
|
||||
|
||||
tryDelMsg :: s -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message)
|
||||
tryDelMsg st q msgId' =
|
||||
|
||||
Reference in New Issue
Block a user