From 512afa1e2bc721241a78e8cc9feaf68b5775d919 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Mon, 13 May 2024 15:16:20 +0100 Subject: [PATCH 1/2] agent: count received duplicate messages (#1148) * agent: count received duplicate messages * count total too * names * fix * tuple --- src/Simplex/Messaging/Agent.hs | 20 ++++++++++++++++++++ src/Simplex/Messaging/Agent/Client.hs | 3 +++ 2 files changed, 23 insertions(+) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 198fabdba..f7280facc 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -113,6 +113,7 @@ module Simplex.Messaging.Agent debugAgentLocks, getAgentStats, resetAgentStats, + getMsgCounts, getAgentSubscriptions, logConnection, ) @@ -554,6 +555,9 @@ resetAgentStats :: AgentClient -> IO () resetAgentStats = atomically . TM.clear . agentStats {-# INLINE resetAgentStats #-} +getMsgCounts :: AgentClient -> IO [(ConnId, (Int, Int))] -- (total, duplicates) +getMsgCounts c = readTVarIO (msgCounts c) >>= mapM (\(connId, cnt) -> (connId,) <$> readTVarIO cnt) . M.assocs + withAgentEnv' :: AgentClient -> AM' a -> IO a withAgentEnv' c = (`runReaderT` agentEnv c) {-# INLINE withAgentEnv' #-} @@ -2135,6 +2139,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, _ -> pure () let encryptedMsgHash = C.sha256Hash encAgentMessage g <- asks random + atomically updateTotalMsgCount tryError (agentClientMsg g encryptedMsgHash) >>= \case Right (Just (msgId, msgMeta, aMessage, rcPrev)) -> do conn'' <- resetRatchetSync @@ -2168,6 +2173,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, | otherwise = pure conn' Right _ -> prohibited >> ack Left e@(AGENT A_DUPLICATE) -> do + atomically updateDupMsgCount withStore' c (\db -> getLastMsg db connId srvMsgId) >>= \case Just RcvMsg {internalId, msgMeta, msgBody = agentMsgBody, userAck} | userAck -> ackDel internalId @@ -2198,6 +2204,20 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, checkDuplicateHash e encryptedMsgHash = unlessM (withStore' c $ \db -> checkRcvMsgHashExists db connId encryptedMsgHash) $ throwError e + updateTotalMsgCount :: STM () + updateTotalMsgCount = + TM.lookup connId (msgCounts c) >>= \case + Just v -> modifyTVar' v $ first (+ 1) + Nothing -> addMsgCount 0 + updateDupMsgCount :: STM () + updateDupMsgCount = + TM.lookup connId (msgCounts c) >>= \case + Just v -> modifyTVar' v $ second (+ 1) + Nothing -> addMsgCount 1 + addMsgCount :: Int -> STM () + addMsgCount duplicate = do + counts <- newTVar (1, duplicate) + TM.insert connId counts (msgCounts c) agentClientMsg :: TVar ChaChaDRG -> ByteString -> AM (Maybe (InternalId, MsgMeta, AMessage, CR.RatchetX448)) agentClientMsg g encryptedMsgHash = withStore c $ \db -> runExceptT $ do rc <- ExceptT $ getRatchet db connId -- ratchet state pre-decryption - required for processing EREADY diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 4c359a519..0e4a5b49e 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -298,6 +298,7 @@ data AgentClient = AgentClient -- smpSubWorkers for SMP servers sessions smpSubWorkers :: TMap SMPTransportSession (SessionVar (Async ())), agentStats :: TMap AgentStatsKey (TVar Int), + msgCounts :: TMap ConnId (TVar (Int, Int)), -- (total, duplicates) clientId :: Int, agentEnv :: Env } @@ -459,6 +460,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = deleteLock <- createLock smpSubWorkers <- TM.empty agentStats <- TM.empty + msgCounts <- TM.empty return AgentClient { acThread, @@ -494,6 +496,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = deleteLock, smpSubWorkers, agentStats, + msgCounts, clientId, agentEnv } From 762909ce33c01d71bbabfed156e1e9faeebbedca Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Mon, 13 May 2024 20:35:46 +0100 Subject: [PATCH 2/2] 5.8.0.0 --- package.yaml | 2 +- simplexmq.cabal | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/package.yaml b/package.yaml index 02a088e23..0aed0c806 100644 --- a/package.yaml +++ b/package.yaml @@ -1,5 +1,5 @@ name: simplexmq -version: 5.7.4.0 +version: 5.8.0.0 synopsis: SimpleXMQ message broker description: | This package includes <./docs/Simplex-Messaging-Server.html server>, diff --git a/simplexmq.cabal b/simplexmq.cabal index 3366cb0b8..92b6a9cfb 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -5,7 +5,7 @@ cabal-version: 1.12 -- see: https://github.com/sol/hpack name: simplexmq -version: 5.7.4.0 +version: 5.8.0.0 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and