mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-04-30 14:35:19 +00:00
Merge remote-tracking branch 'origin/master' into ab/measure
This commit is contained in:
@@ -63,7 +63,7 @@ jobs:
|
||||
mv $(cabal list-bin xftp) xftp-ubuntu-${{ matrix.platform_name}}
|
||||
|
||||
- name: Build changelog
|
||||
if: startsWith(github.ref, 'refs/tags/v') && matrix.os == 'ubuntu-20.04'
|
||||
if: startsWith(github.ref, 'refs/tags/v') && matrix.os == 'ubuntu-22.04'
|
||||
id: build_changelog
|
||||
uses: mikepenz/release-changelog-builder-action@v1
|
||||
with:
|
||||
@@ -75,7 +75,7 @@ jobs:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
||||
- name: Create release
|
||||
if: startsWith(github.ref, 'refs/tags/v') && matrix.os == 'ubuntu-20.04' && matrix.ghc == '9.6.3'
|
||||
if: startsWith(github.ref, 'refs/tags/v') && matrix.ghc != '8.10.7'
|
||||
uses: softprops/action-gh-release@v1
|
||||
with:
|
||||
body: |
|
||||
|
||||
+1
-1
@@ -1,5 +1,5 @@
|
||||
name: simplexmq
|
||||
version: 5.7.4.0
|
||||
version: 5.8.0.0
|
||||
synopsis: SimpleXMQ message broker
|
||||
description: |
|
||||
This package includes <./docs/Simplex-Messaging-Server.html server>,
|
||||
|
||||
+1
-1
@@ -5,7 +5,7 @@ cabal-version: 1.12
|
||||
-- see: https://github.com/sol/hpack
|
||||
|
||||
name: simplexmq
|
||||
version: 5.7.4.0
|
||||
version: 5.8.0.0
|
||||
synopsis: SimpleXMQ message broker
|
||||
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
|
||||
<./docs/Simplex-Messaging-Client.html client> and
|
||||
|
||||
@@ -113,6 +113,7 @@ module Simplex.Messaging.Agent
|
||||
debugAgentLocks,
|
||||
getAgentStats,
|
||||
resetAgentStats,
|
||||
getMsgCounts,
|
||||
getAgentSubscriptions,
|
||||
logConnection,
|
||||
)
|
||||
@@ -554,6 +555,9 @@ resetAgentStats :: AgentClient -> IO ()
|
||||
resetAgentStats = atomically . TM.clear . agentStats
|
||||
{-# INLINE resetAgentStats #-}
|
||||
|
||||
getMsgCounts :: AgentClient -> IO [(ConnId, (Int, Int))] -- (total, duplicates)
|
||||
getMsgCounts c = readTVarIO (msgCounts c) >>= mapM (\(connId, cnt) -> (connId,) <$> readTVarIO cnt) . M.assocs
|
||||
|
||||
withAgentEnv' :: AgentClient -> AM' a -> IO a
|
||||
withAgentEnv' c = (`runReaderT` agentEnv c)
|
||||
{-# INLINE withAgentEnv' #-}
|
||||
@@ -2135,6 +2139,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
|
||||
_ -> pure ()
|
||||
let encryptedMsgHash = C.sha256Hash encAgentMessage
|
||||
g <- asks random
|
||||
atomically updateTotalMsgCount
|
||||
tryError (agentClientMsg g encryptedMsgHash) >>= \case
|
||||
Right (Just (msgId, msgMeta, aMessage, rcPrev)) -> do
|
||||
conn'' <- resetRatchetSync
|
||||
@@ -2168,6 +2173,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
|
||||
| otherwise = pure conn'
|
||||
Right _ -> prohibited >> ack
|
||||
Left e@(AGENT A_DUPLICATE) -> do
|
||||
atomically updateDupMsgCount
|
||||
withStore' c (\db -> getLastMsg db connId srvMsgId) >>= \case
|
||||
Just RcvMsg {internalId, msgMeta, msgBody = agentMsgBody, userAck}
|
||||
| userAck -> ackDel internalId
|
||||
@@ -2198,6 +2204,20 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
|
||||
checkDuplicateHash e encryptedMsgHash =
|
||||
unlessM (withStore' c $ \db -> checkRcvMsgHashExists db connId encryptedMsgHash) $
|
||||
throwError e
|
||||
updateTotalMsgCount :: STM ()
|
||||
updateTotalMsgCount =
|
||||
TM.lookup connId (msgCounts c) >>= \case
|
||||
Just v -> modifyTVar' v $ first (+ 1)
|
||||
Nothing -> addMsgCount 0
|
||||
updateDupMsgCount :: STM ()
|
||||
updateDupMsgCount =
|
||||
TM.lookup connId (msgCounts c) >>= \case
|
||||
Just v -> modifyTVar' v $ second (+ 1)
|
||||
Nothing -> addMsgCount 1
|
||||
addMsgCount :: Int -> STM ()
|
||||
addMsgCount duplicate = do
|
||||
counts <- newTVar (1, duplicate)
|
||||
TM.insert connId counts (msgCounts c)
|
||||
agentClientMsg :: TVar ChaChaDRG -> ByteString -> AM (Maybe (InternalId, MsgMeta, AMessage, CR.RatchetX448))
|
||||
agentClientMsg g encryptedMsgHash = withStore c $ \db -> runExceptT $ do
|
||||
rc <- ExceptT $ getRatchet db connId -- ratchet state pre-decryption - required for processing EREADY
|
||||
|
||||
@@ -298,6 +298,7 @@ data AgentClient = AgentClient
|
||||
-- smpSubWorkers for SMP servers sessions
|
||||
smpSubWorkers :: TMap SMPTransportSession (SessionVar (Async ())),
|
||||
agentStats :: TMap AgentStatsKey (TVar Int),
|
||||
msgCounts :: TMap ConnId (TVar (Int, Int)), -- (total, duplicates)
|
||||
clientId :: Int,
|
||||
agentEnv :: Env
|
||||
}
|
||||
@@ -459,6 +460,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv =
|
||||
deleteLock <- createLock
|
||||
smpSubWorkers <- TM.empty
|
||||
agentStats <- TM.empty
|
||||
msgCounts <- TM.empty
|
||||
return
|
||||
AgentClient
|
||||
{ acThread,
|
||||
@@ -494,6 +496,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv =
|
||||
deleteLock,
|
||||
smpSubWorkers,
|
||||
agentStats,
|
||||
msgCounts,
|
||||
clientId,
|
||||
agentEnv
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user