From 6b6249e4be17698f05cfa3cb24b9c70d879cf420 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Mon, 26 Aug 2024 14:02:27 +0100 Subject: [PATCH] fix --- src/Simplex/Messaging/Server.hs | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 5b78f61bd..87fe3e9cb 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -739,20 +739,15 @@ receive h@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActiv send :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> ServerStats -> IO () send th c@Client {sndQ, msgQ, sessionId} stats = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " send" - forever $ atomically (readTBQueue sndQ) >>= sendTransmissions + forever $ do + ts <- atomically (readTBQueue sndQ) + sendTransmissions ts + updateENDStats ts where sendTransmissions :: NonEmpty (Transmission BrokerMsg) -> IO () - sendTransmissions ts@((_, _, cmd) :| _) = case cmd of - OK | len > 2 -> splitSend - MSG {} | len > 2 -> splitSend - END -> do -- END events are not combined with others - tSend th c ts - atomically $ modifyTVar' (qSubEndSent stats) (+ len) - atomically $ modifyTVar' (qSubEndSentB stats) (+ len `div` 255) -- up to 255 ENDs in the batch - _ -> tSend th c ts - where - len = L.length ts - splitSend = do + sendTransmissions ts + | L.length ts <= 2 = tSend th c ts + | otherwise = do let (msgs_, ts') = mapAccumR splitMessages [] ts -- If the request had batched subscriptions and L.length ts > 2 -- this will reply OK to all SUBs in the first batched transmission, @@ -762,12 +757,20 @@ send th c@Client {sndQ, msgQ, sessionId} stats = do -- without any client response timeouts, and allowing them to interleave -- with other requests responses. mapM_ (atomically . writeTBQueue msgQ) $ L.nonEmpty msgs_ + where splitMessages :: [Transmission BrokerMsg] -> Transmission BrokerMsg -> ([Transmission BrokerMsg], Transmission BrokerMsg) - splitMessages msgs t@(corrId, entId, cmd') = case cmd' of + splitMessages msgs t@(corrId, entId, cmd) = case cmd of -- replace MSG response with OK, accumulating MSG in a separate list. - MSG {} -> ((CorrId "", entId, cmd') : msgs, (corrId, entId, OK)) + MSG {} -> ((CorrId "", entId, cmd) : msgs, (corrId, entId, OK)) _ -> (msgs, t) - + updateENDStats :: NonEmpty (Transmission BrokerMsg) -> IO () + updateENDStats = \case + ts@((_, _, END) :| _) -> do -- END events are not combined with others + let len = L.length ts + atomically $ modifyTVar' (qSubEndSent stats) (+ len) + atomically $ modifyTVar' (qSubEndSentB stats) (+ len `div` 255) -- up to 255 ENDs in the batch + _ -> pure () + sendMsg :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> IO () sendMsg th c@Client {msgQ, sessionId} = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " sendMsg"