This commit is contained in:
Evgeny Poberezkin
2024-08-26 14:02:27 +01:00
parent 2257825f56
commit 6b6249e4be
+18 -15
View File
@@ -739,20 +739,15 @@ receive h@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActiv
send :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> ServerStats -> IO ()
send th c@Client {sndQ, msgQ, sessionId} stats = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " send"
forever $ atomically (readTBQueue sndQ) >>= sendTransmissions
forever $ do
ts <- atomically (readTBQueue sndQ)
sendTransmissions ts
updateENDStats ts
where
sendTransmissions :: NonEmpty (Transmission BrokerMsg) -> IO ()
sendTransmissions ts@((_, _, cmd) :| _) = case cmd of
OK | len > 2 -> splitSend
MSG {} | len > 2 -> splitSend
END -> do -- END events are not combined with others
tSend th c ts
atomically $ modifyTVar' (qSubEndSent stats) (+ len)
atomically $ modifyTVar' (qSubEndSentB stats) (+ len `div` 255) -- up to 255 ENDs in the batch
_ -> tSend th c ts
where
len = L.length ts
splitSend = do
sendTransmissions ts
| L.length ts <= 2 = tSend th c ts
| otherwise = do
let (msgs_, ts') = mapAccumR splitMessages [] ts
-- If the request had batched subscriptions and L.length ts > 2
-- this will reply OK to all SUBs in the first batched transmission,
@@ -762,12 +757,20 @@ send th c@Client {sndQ, msgQ, sessionId} stats = do
-- without any client response timeouts, and allowing them to interleave
-- with other requests responses.
mapM_ (atomically . writeTBQueue msgQ) $ L.nonEmpty msgs_
where
splitMessages :: [Transmission BrokerMsg] -> Transmission BrokerMsg -> ([Transmission BrokerMsg], Transmission BrokerMsg)
splitMessages msgs t@(corrId, entId, cmd') = case cmd' of
splitMessages msgs t@(corrId, entId, cmd) = case cmd of
-- replace MSG response with OK, accumulating MSG in a separate list.
MSG {} -> ((CorrId "", entId, cmd') : msgs, (corrId, entId, OK))
MSG {} -> ((CorrId "", entId, cmd) : msgs, (corrId, entId, OK))
_ -> (msgs, t)
updateENDStats :: NonEmpty (Transmission BrokerMsg) -> IO ()
updateENDStats = \case
ts@((_, _, END) :| _) -> do -- END events are not combined with others
let len = L.length ts
atomically $ modifyTVar' (qSubEndSent stats) (+ len)
atomically $ modifyTVar' (qSubEndSentB stats) (+ len `div` 255) -- up to 255 ENDs in the batch
_ -> pure ()
sendMsg :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> IO ()
sendMsg th c@Client {msgQ, sessionId} = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " sendMsg"