diff --git a/package.yaml b/package.yaml index 02a088e23..0aed0c806 100644 --- a/package.yaml +++ b/package.yaml @@ -1,5 +1,5 @@ name: simplexmq -version: 5.7.4.0 +version: 5.8.0.0 synopsis: SimpleXMQ message broker description: | This package includes <./docs/Simplex-Messaging-Server.html server>, diff --git a/simplexmq.cabal b/simplexmq.cabal index 1b9f931ad..bd0883c23 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -5,7 +5,7 @@ cabal-version: 1.12 -- see: https://github.com/sol/hpack name: simplexmq -version: 5.7.4.0 +version: 5.8.0.0 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index cd2c7f32f..d4fb11dd5 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -113,6 +113,7 @@ module Simplex.Messaging.Agent debugAgentLocks, getAgentStats, resetAgentStats, + getMsgCounts, getAgentSubscriptions, logConnection, ) @@ -554,6 +555,9 @@ resetAgentStats :: AgentClient -> IO () resetAgentStats = atomically . TM.clear . agentStats {-# INLINE resetAgentStats #-} +getMsgCounts :: AgentClient -> IO [(ConnId, (Int, Int))] -- (total, duplicates) +getMsgCounts c = readTVarIO (msgCounts c) >>= mapM (\(connId, cnt) -> (connId,) <$> readTVarIO cnt) . M.assocs + withAgentEnv' :: AgentClient -> AM' a -> IO a withAgentEnv' c = (`runReaderT` agentEnv c) {-# INLINE withAgentEnv' #-} @@ -2139,6 +2143,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, _ -> pure () let encryptedMsgHash = C.sha256Hash encAgentMessage g <- asks random + atomically updateTotalMsgCount tryError (agentClientMsg g encryptedMsgHash) >>= \case Right (Just (msgId, msgMeta, aMessage, rcPrev)) -> do conn'' <- resetRatchetSync @@ -2172,6 +2177,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, | otherwise = pure conn' Right _ -> prohibited >> ack Left e@(AGENT A_DUPLICATE) -> do + atomically updateDupMsgCount withStore' c (\db -> getLastMsg db connId srvMsgId) >>= \case Just RcvMsg {internalId, msgMeta, msgBody = agentMsgBody, userAck} | userAck -> ackDel internalId @@ -2202,6 +2208,20 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, checkDuplicateHash e encryptedMsgHash = unlessM (withStore' c $ \db -> checkRcvMsgHashExists db connId encryptedMsgHash) $ throwError e + updateTotalMsgCount :: STM () + updateTotalMsgCount = + TM.lookup connId (msgCounts c) >>= \case + Just v -> modifyTVar' v $ first (+ 1) + Nothing -> addMsgCount 0 + updateDupMsgCount :: STM () + updateDupMsgCount = + TM.lookup connId (msgCounts c) >>= \case + Just v -> modifyTVar' v $ second (+ 1) + Nothing -> addMsgCount 1 + addMsgCount :: Int -> STM () + addMsgCount duplicate = do + counts <- newTVar (1, duplicate) + TM.insert connId counts (msgCounts c) agentClientMsg :: TVar ChaChaDRG -> ByteString -> AM (Maybe (InternalId, MsgMeta, AMessage, CR.RatchetX448)) agentClientMsg g encryptedMsgHash = withStore c $ \db -> runExceptT $ do rc <- ExceptT $ getRatchet db connId -- ratchet state pre-decryption - required for processing EREADY diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 702d4c23f..13adb9fdc 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -306,6 +306,7 @@ data AgentClient = AgentClient -- smpSubWorkers for SMP servers sessions smpSubWorkers :: TMap SMPTransportSession (SessionVar (Async ())), agentStats :: TMap AgentStatsKey (TVar Int), + msgCounts :: TMap ConnId (TVar (Int, Int)), -- (total, duplicates) clientId :: Int, agentEnv :: Env } @@ -475,6 +476,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = deleteLock <- createLock smpSubWorkers <- TM.empty agentStats <- TM.empty + msgCounts <- TM.empty return AgentClient { acThread, @@ -511,6 +513,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv = deleteLock, smpSubWorkers, agentStats, + msgCounts, clientId, agentEnv }