Merge branch 'master' into proxy

This commit is contained in:
spaced4ndy
2024-05-14 10:41:06 +04:00
4 changed files with 25 additions and 2 deletions

View File

@@ -1,5 +1,5 @@
name: simplexmq
version: 5.7.4.0
version: 5.8.0.0
synopsis: SimpleXMQ message broker
description: |
This package includes <./docs/Simplex-Messaging-Server.html server>,

View File

@@ -5,7 +5,7 @@ cabal-version: 1.12
-- see: https://github.com/sol/hpack
name: simplexmq
version: 5.7.4.0
version: 5.8.0.0
synopsis: SimpleXMQ message broker
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
<./docs/Simplex-Messaging-Client.html client> and

View File

@@ -113,6 +113,7 @@ module Simplex.Messaging.Agent
debugAgentLocks,
getAgentStats,
resetAgentStats,
getMsgCounts,
getAgentSubscriptions,
logConnection,
)
@@ -554,6 +555,9 @@ resetAgentStats :: AgentClient -> IO ()
resetAgentStats = atomically . TM.clear . agentStats
{-# INLINE resetAgentStats #-}
getMsgCounts :: AgentClient -> IO [(ConnId, (Int, Int))] -- (total, duplicates)
getMsgCounts c = readTVarIO (msgCounts c) >>= mapM (\(connId, cnt) -> (connId,) <$> readTVarIO cnt) . M.assocs
withAgentEnv' :: AgentClient -> AM' a -> IO a
withAgentEnv' c = (`runReaderT` agentEnv c)
{-# INLINE withAgentEnv' #-}
@@ -2139,6 +2143,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
_ -> pure ()
let encryptedMsgHash = C.sha256Hash encAgentMessage
g <- asks random
atomically updateTotalMsgCount
tryError (agentClientMsg g encryptedMsgHash) >>= \case
Right (Just (msgId, msgMeta, aMessage, rcPrev)) -> do
conn'' <- resetRatchetSync
@@ -2172,6 +2177,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
| otherwise = pure conn'
Right _ -> prohibited >> ack
Left e@(AGENT A_DUPLICATE) -> do
atomically updateDupMsgCount
withStore' c (\db -> getLastMsg db connId srvMsgId) >>= \case
Just RcvMsg {internalId, msgMeta, msgBody = agentMsgBody, userAck}
| userAck -> ackDel internalId
@@ -2202,6 +2208,20 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
checkDuplicateHash e encryptedMsgHash =
unlessM (withStore' c $ \db -> checkRcvMsgHashExists db connId encryptedMsgHash) $
throwError e
updateTotalMsgCount :: STM ()
updateTotalMsgCount =
TM.lookup connId (msgCounts c) >>= \case
Just v -> modifyTVar' v $ first (+ 1)
Nothing -> addMsgCount 0
updateDupMsgCount :: STM ()
updateDupMsgCount =
TM.lookup connId (msgCounts c) >>= \case
Just v -> modifyTVar' v $ second (+ 1)
Nothing -> addMsgCount 1
addMsgCount :: Int -> STM ()
addMsgCount duplicate = do
counts <- newTVar (1, duplicate)
TM.insert connId counts (msgCounts c)
agentClientMsg :: TVar ChaChaDRG -> ByteString -> AM (Maybe (InternalId, MsgMeta, AMessage, CR.RatchetX448))
agentClientMsg g encryptedMsgHash = withStore c $ \db -> runExceptT $ do
rc <- ExceptT $ getRatchet db connId -- ratchet state pre-decryption - required for processing EREADY

View File

@@ -306,6 +306,7 @@ data AgentClient = AgentClient
-- smpSubWorkers for SMP servers sessions
smpSubWorkers :: TMap SMPTransportSession (SessionVar (Async ())),
agentStats :: TMap AgentStatsKey (TVar Int),
msgCounts :: TMap ConnId (TVar (Int, Int)), -- (total, duplicates)
clientId :: Int,
agentEnv :: Env
}
@@ -475,6 +476,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv =
deleteLock <- createLock
smpSubWorkers <- TM.empty
agentStats <- TM.empty
msgCounts <- TM.empty
return
AgentClient
{ acThread,
@@ -511,6 +513,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv =
deleteLock,
smpSubWorkers,
agentStats,
msgCounts,
clientId,
agentEnv
}