mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-06-09 04:12:55 +00:00
smp server: batch processing of subscription messages (#1753)
* smp server: batch processing of subscription messages * refactor * empty line * fix --------- Co-authored-by: Evgeny @ SimpleX Chat <259188159+evgeny-simplex@users.noreply.github.com>
This commit is contained in:
@@ -1366,14 +1366,20 @@ client
|
||||
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands"
|
||||
let THandleParams {thVersion} = thParams'
|
||||
clntServiceId = (\THClientService {serviceId} -> serviceId) <$> (peerClientService =<< thAuth thParams')
|
||||
process t acc@(rs, msgs) =
|
||||
process msgMap t acc@(rs, msgs) =
|
||||
(maybe acc (\(!r, !msg_) -> (r : rs, maybe msgs (: msgs) msg_)))
|
||||
<$> processCommand clntServiceId thVersion t
|
||||
forever $
|
||||
atomically (readTBQueue rcvQ)
|
||||
>>= foldrM process ([], [])
|
||||
<$> processCommand clntServiceId thVersion msgMap t
|
||||
forever $ do
|
||||
batch <- atomically (readTBQueue rcvQ)
|
||||
msgMap <- prefetchMsgs batch
|
||||
foldrM (process msgMap) ([], []) batch
|
||||
>>= \(rs_, msgs) -> mapM_ (atomically . writeTBQueue sndQ . (,msgs)) (L.nonEmpty rs_)
|
||||
where
|
||||
prefetchMsgs :: NonEmpty (VerifiedTransmission s) -> M s (Either ErrorType (Map RecipientId Message))
|
||||
prefetchMsgs batch =
|
||||
let subQs = [q | (Just (q, _), (_, _, Cmd SRecipient SUB)) <- L.toList batch]
|
||||
in if null subQs then pure $ Right M.empty else liftIO $ runExceptT $ tryPeekMsgs ms subQs
|
||||
|
||||
processProxiedCmd :: Transmission (Command 'ProxiedClient) -> M s (Maybe ResponseAndMessage)
|
||||
processProxiedCmd (corrId, EntityId sessId, command) = (\t -> ((corrId, EntityId sessId, t), Nothing)) <$$> case command of
|
||||
PRXY srv auth -> ifM allowProxy getRelay (pure $ Just $ ERR $ PROXY BASIC_AUTH)
|
||||
@@ -1454,8 +1460,8 @@ client
|
||||
mkIncProxyStats ps psOwn own sel = do
|
||||
incStat $ sel ps
|
||||
when own $ incStat $ sel psOwn
|
||||
processCommand :: Maybe ServiceId -> VersionSMP -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage)
|
||||
processCommand clntServiceId clntVersion (q_, (corrId, entId, cmd)) = case cmd of
|
||||
processCommand :: Maybe ServiceId -> VersionSMP -> Either ErrorType (Map RecipientId Message) -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage)
|
||||
processCommand clntServiceId clntVersion msgMap (q_, (corrId, entId, cmd)) = case cmd of
|
||||
Cmd SProxiedClient command -> processProxiedCmd (corrId, entId, command)
|
||||
Cmd SSender command -> case command of
|
||||
SKEY k -> withQueue $ \q qr -> checkMode QMMessaging qr $ secureQueue_ q k
|
||||
@@ -1479,7 +1485,9 @@ client
|
||||
pure $ allowNewQueues && maybe True ((== auth_) . Just) newQueueBasicAuth
|
||||
Cmd SRecipient command ->
|
||||
case command of
|
||||
SUB -> withQueue' subscribeQueueAndDeliver
|
||||
SUB -> case msgMap of
|
||||
Left e -> pure $ Just (err e, Nothing)
|
||||
Right msgs -> withQueue' $ subscribeQueueAndDeliver (M.lookup entId msgs)
|
||||
GET -> withQueue getMessage
|
||||
ACK msgId -> withQueue $ acknowledgeMsg msgId
|
||||
KEY sKey -> withQueue $ \q _ -> either err (corrId,entId,) <$> secureQueue_ q sKey
|
||||
@@ -1620,8 +1628,8 @@ client
|
||||
suspendQueue_ :: (StoreQueue s, QueueRec) -> M s (Transmission BrokerMsg)
|
||||
suspendQueue_ (q, _) = liftIO $ either err (const ok) <$> suspendQueue (queueStore ms) q
|
||||
|
||||
subscribeQueueAndDeliver :: StoreQueue s -> QueueRec -> M s ResponseAndMessage
|
||||
subscribeQueueAndDeliver q qr@QueueRec {rcvServiceId} =
|
||||
subscribeQueueAndDeliver :: Maybe Message -> StoreQueue s -> QueueRec -> M s ResponseAndMessage
|
||||
subscribeQueueAndDeliver msg_ q qr@QueueRec {rcvServiceId} =
|
||||
liftIO (TM.lookupIO entId $ subscriptions clnt) >>= \case
|
||||
Nothing ->
|
||||
sharedSubscribeQueue q SRecipientService rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices >>= \case
|
||||
@@ -1642,7 +1650,6 @@ client
|
||||
deliver (hasSub, sub_) = do
|
||||
stats <- asks serverStats
|
||||
fmap (either ((,Nothing) . err) id) $ liftIO $ runExceptT $ do
|
||||
msg_ <- tryPeekMsg ms q
|
||||
msg' <- forM msg_ $ \msg -> liftIO $ do
|
||||
ts <- getSystemSeconds
|
||||
sub <- maybe (atomically getSub) pure sub_
|
||||
@@ -2087,7 +2094,7 @@ client
|
||||
-- rejectOrVerify filters allowed commands, no need to repeat it here.
|
||||
-- INTERNAL is used because processCommand never returns Nothing for sender commands (could be extracted for better types).
|
||||
-- `fst` removes empty message that is only returned for `SUB` command
|
||||
Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion t'')
|
||||
Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion (Right M.empty) t'')
|
||||
-- encode response
|
||||
r' <- case batchTransmissions clntTHParams [Right (Nothing, encodeTransmission clntTHParams r)] of
|
||||
[] -> throwE INTERNAL -- at least 1 item is guaranteed from NonEmpty/Right
|
||||
|
||||
@@ -41,7 +41,7 @@ import Data.List (intersperse)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Text (Text)
|
||||
import Data.Time.Clock.System (SystemTime (..))
|
||||
import Database.PostgreSQL.Simple (Binary (..), Only (..), (:.) (..))
|
||||
import Database.PostgreSQL.Simple (Binary (..), In (..), Only (..), (:.) (..))
|
||||
import qualified Database.PostgreSQL.Simple as DB
|
||||
import qualified Database.PostgreSQL.Simple.Copy as DB
|
||||
import Database.PostgreSQL.Simple.SqlQQ (sql)
|
||||
@@ -246,6 +246,25 @@ instance MsgStoreClass PostgresMsgStore where
|
||||
tryPeekMsg ms q = isolateQueue ms q "tryPeekMsg" $ tryPeekMsg_ q ()
|
||||
{-# INLINE tryPeekMsg #-}
|
||||
|
||||
tryPeekMsgs :: PostgresMsgStore -> [PostgresQueue] -> ExceptT ErrorType IO (M.Map RecipientId Message)
|
||||
tryPeekMsgs _ms [] = pure M.empty
|
||||
tryPeekMsgs ms qs =
|
||||
uninterruptibleMask_ $
|
||||
withDB' "tryPeekMsgs" (queueStore_ ms) $ \db ->
|
||||
M.fromList . map toRcvMsg <$>
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT DISTINCT ON (recipient_id)
|
||||
recipient_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body
|
||||
FROM messages
|
||||
WHERE recipient_id IN ?
|
||||
ORDER BY recipient_id, message_id ASC
|
||||
|]
|
||||
(Only (In (map recipientId' qs)))
|
||||
where
|
||||
toRcvMsg (Only rId :. msg) = (rId, toMessage msg)
|
||||
|
||||
tryDelMsg :: PostgresMsgStore -> PostgresQueue -> MsgId -> ExceptT ErrorType IO (Maybe Message)
|
||||
tryDelMsg ms q msgId =
|
||||
uninterruptibleMask_ $
|
||||
|
||||
@@ -41,7 +41,9 @@ import Control.Monad.Trans.Except
|
||||
import Data.Functor (($>))
|
||||
import Data.Int (Int64)
|
||||
import Data.Kind
|
||||
import Data.Maybe (fromMaybe)
|
||||
import Data.Map.Strict (Map)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (catMaybes, fromMaybe)
|
||||
import Data.Text (Text)
|
||||
import Data.Time.Clock.System (SystemTime (systemSeconds))
|
||||
import Simplex.Messaging.Protocol
|
||||
@@ -91,6 +93,9 @@ class (Monad (StoreMonad s), QueueStoreClass (StoreQueue s) (QueueStore s)) => M
|
||||
tryPeekMsg :: s -> StoreQueue s -> ExceptT ErrorType IO (Maybe Message)
|
||||
tryPeekMsg st q = snd <$$> withPeekMsgQueue st q "tryPeekMsg" pure
|
||||
{-# INLINE tryPeekMsg #-}
|
||||
|
||||
tryPeekMsgs :: s -> [StoreQueue s] -> ExceptT ErrorType IO (Map RecipientId Message)
|
||||
tryPeekMsgs st qs = M.fromList . catMaybes <$> mapM (\q -> (recipientId q,) <$$> tryPeekMsg st q) qs
|
||||
|
||||
tryDelMsg :: s -> StoreQueue s -> MsgId -> ExceptT ErrorType IO (Maybe Message)
|
||||
tryDelMsg st q msgId' =
|
||||
|
||||
Reference in New Issue
Block a user