mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-27 16:24:19 +00:00
add proxy counters
This commit is contained in:
@@ -680,8 +680,13 @@ client thParams' clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions,
|
||||
ServerConfig {allowSMPProxy, newQueueBasicAuth} <- asks config
|
||||
pure $ allowSMPProxy && maybe True ((== auth) . Just) newQueueBasicAuth
|
||||
getRelay = do
|
||||
withClientStatsId $ \cs -> atomically $ modifyTVar' (CS.proxyRelaysRequested cs) (+ 1)
|
||||
ProxyAgent {smpAgent} <- asks proxyAgent
|
||||
liftIO $ proxyResp <$> runExceptT (getSMPServerClient' smpAgent srv) `catch` (pure . Left . PCEIOError)
|
||||
r <- liftIO $ proxyResp <$> runExceptT (getSMPServerClient' smpAgent srv) `catch` (pure . Left . PCEIOError)
|
||||
case r of
|
||||
PKEY {} -> withClientStatsId $ \cs -> atomically $ modifyTVar' (CS.proxyRelaysConnected cs) (+ 1)
|
||||
_ -> pure ()
|
||||
pure r
|
||||
where
|
||||
proxyResp = \case
|
||||
Left err -> ERR $ smpProxyError err
|
||||
@@ -698,9 +703,13 @@ client thParams' clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions,
|
||||
ProxyAgent {smpAgent} <- asks proxyAgent
|
||||
atomically (lookupSMPServerClient smpAgent sessId) >>= \case
|
||||
Just smp
|
||||
| v >= sendingProxySMPVersion ->
|
||||
liftIO $ either (ERR . smpProxyError) PRES <$>
|
||||
| v >= sendingProxySMPVersion -> do
|
||||
r <- liftIO $ either (ERR . smpProxyError) PRES <$>
|
||||
runExceptT (forwardSMPMessage smp corrId fwdV pubKey encBlock) `catchError` (pure . Left . PCEIOError)
|
||||
case r of
|
||||
PRES {} -> withClientStatsId $ \cs -> atomically $ modifyTVar' (CS.msgSentViaProxy cs) (+ 1)
|
||||
_ -> pure ()
|
||||
pure r
|
||||
| otherwise -> pure . ERR $ transportErr TEVersion
|
||||
where
|
||||
THandleParams {thVersion = v} = thParams smp
|
||||
@@ -773,13 +782,7 @@ client thParams' clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions,
|
||||
stats <- asks serverStats
|
||||
atomically $ modifyTVar' (qCreated stats) (+ 1)
|
||||
atomically $ modifyTVar' (qCount stats) (+ 1)
|
||||
|
||||
now <- liftIO getCurrentTime
|
||||
statsIds' <- asks statsClients -- TVar (IntMap ClientStatsId)
|
||||
stats' <- asks clientStats -- TVar (IntMap ClientStats)
|
||||
atomically $ withClientStatId statsIds' $ \statsId -> do
|
||||
cs <- getClientStats stats' statsId now
|
||||
modifyTVar' (CS.qCreated cs) $ S.insert rId
|
||||
withClientStatsId $ \cs -> atomically $ modifyTVar' (CS.qCreated cs) $ S.insert rId
|
||||
-- TODO: increment current Q counter in IP timeline
|
||||
-- TODO: increment current Q counter in server timeline
|
||||
case subMode of
|
||||
@@ -970,36 +973,12 @@ client thParams' clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions,
|
||||
atomically $ modifyTVar' (msgSent stats) (+ 1)
|
||||
atomically $ modifyTVar' (msgCount stats) (+ 1)
|
||||
atomically $ updatePeriodStats (activeQueues stats) (recipientId qr)
|
||||
|
||||
logDebug $ "Senders gonna send..."
|
||||
senders' <- asks sendSignedClients -- TMap RecipientId (TVar ClientStatsId)
|
||||
statsIds' <- asks statsClients -- TVar (IntMap ClientStatsId)
|
||||
stats' <- asks clientStats -- TVar (IntMap ClientStats)
|
||||
now <- liftIO getCurrentTime
|
||||
atomically $ case senderKey qr of
|
||||
Nothing -> withClientStatId statsIds' $ \statsId -> do
|
||||
-- unsecured queue, no merging
|
||||
cs <- getClientStats stats' statsId now
|
||||
-- XXX: perhaps only merging has to be atomic, with the var on hands, it could be a round of smaller transactions
|
||||
modifyTVar' (CS.msgSentUnsigned cs) (+ 1)
|
||||
Just _secured -> withClientStatId statsIds' $ \currentStatsId -> do
|
||||
-- secured queue, merging is possible
|
||||
senders <- readTVar senders'
|
||||
statsId <- case M.lookup (recipientId qr) senders of
|
||||
Nothing -> do
|
||||
newOwner <- newTVar currentStatsId
|
||||
writeTVar senders' $ M.insert (recipientId qr) newOwner senders
|
||||
pure currentStatsId
|
||||
Just sender -> do
|
||||
prevStatsId <- readTVar sender
|
||||
unless (prevStatsId == currentStatsId) $ do
|
||||
modifyTVar' statsIds' $ IM.insert clientId prevStatsId
|
||||
qsToTransfer <- mergeClientStats stats' prevStatsId currentStatsId
|
||||
unless (S.null qsToTransfer) $ writeTVar senders' $ S.foldl' (\os k -> M.insert k sender os) senders qsToTransfer
|
||||
pure prevStatsId
|
||||
cs <- getClientStats stats' statsId now
|
||||
modifyTVar' (CS.qSentSigned cs) $ S.insert (recipientId qr)
|
||||
modifyTVar' (CS.msgSentSigned cs) (+ 1)
|
||||
case senderKey qr of
|
||||
Nothing -> withClientStatsId $ \cs -> atomically $ modifyTVar' (CS.msgSentUnsigned cs) (+ 1)
|
||||
Just _secured -> do
|
||||
withMergedClientStatsId qr $ \cs -> do
|
||||
atomically $ modifyTVar' (CS.qSentSigned cs) $ S.insert (recipientId qr)
|
||||
atomically $ modifyTVar' (CS.msgSentSigned cs) (+ 1)
|
||||
-- TODO: increment current S counter in IP timeline
|
||||
-- TODO: increment current S counter in server timeline
|
||||
pure ok
|
||||
@@ -1158,32 +1137,61 @@ client thParams' clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions,
|
||||
okResp :: Either ErrorType () -> Transmission BrokerMsg
|
||||
okResp = either err $ const ok
|
||||
|
||||
-- missing clientId entry means the client is exempt from stats
|
||||
withClientStatId statsIds' action = readTVar statsIds' >>= mapM_ action . IM.lookup clientId
|
||||
-- missing clientId entry means the client is exempt from stats
|
||||
withClientStatsId_ statsIds' getCS = IM.lookup clientId <$> readTVar statsIds' >>= mapM getCS
|
||||
|
||||
getClientStats stats' statsId now = do
|
||||
stats <- readTVar stats'
|
||||
case IM.lookup statsId stats of
|
||||
Nothing -> do
|
||||
new <- CS.newClientStats newTVar peerId now
|
||||
writeTVar stats' $ IM.insert statsId new stats
|
||||
pure new
|
||||
Just cs -> cs <$ writeTVar (CS.updatedAt cs) now
|
||||
withClientStatsId updateCS = do
|
||||
statsIds' <- asks statsClients
|
||||
stats' <- asks clientStats
|
||||
now <- liftIO getCurrentTime
|
||||
atomically (withClientStatsId_ statsIds' $ getClientStats stats' now) >>= mapM_ updateCS
|
||||
|
||||
mergeClientStats :: TVar (IntMap CS.ClientStats) -> CS.ClientStatsId -> CS.ClientStatsId -> STM (Set RecipientId)
|
||||
mergeClientStats stats' prevId curId = do
|
||||
stats <- readTVar stats'
|
||||
case (IM.lookup prevId stats, IM.lookup curId stats) of
|
||||
(_, Nothing) -> pure mempty
|
||||
(Nothing, Just cur@CS.ClientStats {qCreated}) -> do
|
||||
writeTVar stats' $ IM.insert prevId cur (IM.delete curId stats)
|
||||
readTVar qCreated
|
||||
(Just prev, Just cur) -> do
|
||||
curData@CS.ClientStatsData {_qCreated} <- CS.readClientStatsData readTVar cur
|
||||
prevData <- CS.readClientStatsData readTVar prev
|
||||
CS.writeClientStatsData prev $ CS.mergeClientStatsData prevData curData
|
||||
writeTVar stats' $ IM.delete curId stats
|
||||
pure _qCreated
|
||||
getClientStats stats' now statsId = do
|
||||
stats <- readTVar stats'
|
||||
case IM.lookup statsId stats of
|
||||
Nothing -> do
|
||||
new <- CS.newClientStats newTVar peerId now
|
||||
writeTVar stats' $ IM.insert statsId new stats
|
||||
pure new
|
||||
Just cs -> cs <$ writeTVar (CS.updatedAt cs) now
|
||||
|
||||
withMergedClientStatsId qr updateCS = do
|
||||
senders' <- asks sendSignedClients
|
||||
statsIds' <- asks statsClients
|
||||
stats' <- asks clientStats
|
||||
now <- liftIO getCurrentTime
|
||||
atomically (withClientStatsId_ statsIds' $ getMergeClientStats senders' statsIds' stats' now qr) >>= mapM_ updateCS
|
||||
|
||||
getMergeClientStats senders' statsIds' stats' now qr currentStatsId = do
|
||||
senders <- readTVar senders'
|
||||
statsId <- case M.lookup (recipientId qr) senders of
|
||||
Nothing -> do
|
||||
newOwner <- newTVar currentStatsId
|
||||
writeTVar senders' $ M.insert (recipientId qr) newOwner senders
|
||||
pure currentStatsId
|
||||
Just sender -> do
|
||||
prevStatsId <- readTVar sender
|
||||
unless (prevStatsId == currentStatsId) $ do
|
||||
modifyTVar' statsIds' $ IM.insert clientId prevStatsId
|
||||
qsToTransfer <- mergeClientStats stats' prevStatsId currentStatsId
|
||||
unless (S.null qsToTransfer) $ writeTVar senders' $ S.foldl' (\os k -> M.insert k sender os) senders qsToTransfer
|
||||
pure prevStatsId
|
||||
getClientStats stats' now statsId
|
||||
|
||||
mergeClientStats :: TVar (IntMap CS.ClientStats) -> CS.ClientStatsId -> CS.ClientStatsId -> STM (Set RecipientId)
|
||||
mergeClientStats stats' prevId curId = do
|
||||
stats <- readTVar stats'
|
||||
case (IM.lookup prevId stats, IM.lookup curId stats) of
|
||||
(_, Nothing) -> pure mempty
|
||||
(Nothing, Just cur@CS.ClientStats {qCreated}) -> do
|
||||
writeTVar stats' $ IM.insert prevId cur (IM.delete curId stats)
|
||||
readTVar qCreated
|
||||
(Just prev, Just cur) -> do
|
||||
curData@CS.ClientStatsData {_qCreated} <- CS.readClientStatsData readTVar cur
|
||||
prevData <- CS.readClientStatsData readTVar prev
|
||||
CS.writeClientStatsData prev $ CS.mergeClientStatsData prevData curData
|
||||
writeTVar stats' $ IM.delete curId stats
|
||||
pure _qCreated
|
||||
|
||||
updateDeletedStats :: QueueRec -> M ()
|
||||
updateDeletedStats q = do
|
||||
|
||||
@@ -15,11 +15,9 @@ import qualified Data.IntMap.Strict as IM
|
||||
import Data.List.NonEmpty (NonEmpty)
|
||||
import Data.Map.Strict (Map)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.IntPSQ (IntPSQ)
|
||||
import qualified Data.IntPSQ as IP
|
||||
import Data.Time.Clock (getCurrentTime)
|
||||
import Data.Time.Clock.POSIX (POSIXTime, getPOSIXTime)
|
||||
import Data.Time.Clock.System (SystemTime, systemToUTCTime)
|
||||
import Data.Time.Clock.POSIX (getPOSIXTime)
|
||||
import Data.Time.Clock.System (SystemTime)
|
||||
import Data.X509.Validation (Fingerprint (..))
|
||||
import Network.Socket (ServiceName)
|
||||
import qualified Network.TLS as T
|
||||
@@ -34,7 +32,7 @@ import Simplex.Messaging.Server.MsgStore.STM
|
||||
import Simplex.Messaging.Server.QueueStore (NtfCreds (..), QueueRec (..))
|
||||
import Simplex.Messaging.Server.QueueStore.STM
|
||||
import Simplex.Messaging.Server.Stats
|
||||
import Simplex.Messaging.Server.Stats.Client (ClientStats, ClientStatsId, newClientStats)
|
||||
import Simplex.Messaging.Server.Stats.Client (ClientStats, ClientStatsId)
|
||||
import Simplex.Messaging.Server.Stats.Timeline (Timeline, newTimeline, perMinute)
|
||||
import Simplex.Messaging.Server.StoreLog
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
|
||||
@@ -28,8 +28,10 @@ data ClientStats = ClientStats
|
||||
qSentSigned :: TVar (Set RecipientId), -- can be IntSet with QueueRecIDs
|
||||
msgSentSigned :: TVar Int,
|
||||
msgSentUnsigned :: TVar Int,
|
||||
msgSentViaProxy :: TVar Int, -- TODO
|
||||
msgDeliveredSigned :: TVar Int
|
||||
msgDeliveredSigned :: TVar Int,
|
||||
proxyRelaysRequested :: TVar Int,
|
||||
proxyRelaysConnected :: TVar Int,
|
||||
msgSentViaProxy :: TVar Int
|
||||
}
|
||||
|
||||
-- may be combined with session duration to produce average rates (q/s, msg/s)
|
||||
@@ -42,8 +44,10 @@ data ClientStatsData = ClientStatsData
|
||||
_qSentSigned :: Set RecipientId,
|
||||
_msgSentSigned :: Int,
|
||||
_msgSentUnsigned :: Int,
|
||||
_msgSentViaProxy :: Int,
|
||||
_msgDeliveredSigned :: Int
|
||||
_msgDeliveredSigned :: Int,
|
||||
_proxyRelaysRequested :: Int,
|
||||
_proxyRelaysConnected :: Int,
|
||||
_msgSentViaProxy :: Int
|
||||
}
|
||||
|
||||
newClientStats :: Monad m => (forall a. a -> m (TVar a)) -> PeerId -> UTCTime -> m ClientStats
|
||||
@@ -56,8 +60,10 @@ newClientStats newF peerId ts = do
|
||||
qSentSigned <- newF mempty
|
||||
msgSentSigned <- newF 0
|
||||
msgSentUnsigned <- newF 0
|
||||
msgSentViaProxy <- newF 0
|
||||
msgDeliveredSigned <- newF 0
|
||||
proxyRelaysRequested <- newF 0
|
||||
proxyRelaysConnected <- newF 0
|
||||
msgSentViaProxy <- newF 0
|
||||
pure
|
||||
ClientStats
|
||||
{ peerAddresses,
|
||||
@@ -68,8 +74,10 @@ newClientStats newF peerId ts = do
|
||||
qSentSigned,
|
||||
msgSentSigned,
|
||||
msgSentUnsigned,
|
||||
msgSentViaProxy,
|
||||
msgDeliveredSigned
|
||||
msgDeliveredSigned,
|
||||
proxyRelaysRequested,
|
||||
proxyRelaysConnected,
|
||||
msgSentViaProxy
|
||||
}
|
||||
{-# INLINE newClientStats #-}
|
||||
|
||||
@@ -83,8 +91,10 @@ readClientStatsData readF cs = do
|
||||
_qSentSigned <- readF $ qSentSigned cs
|
||||
_msgSentSigned <- readF $ msgSentSigned cs
|
||||
_msgSentUnsigned <- readF $ msgSentUnsigned cs
|
||||
_msgSentViaProxy <- readF $ msgSentViaProxy cs
|
||||
_msgDeliveredSigned <- readF $ msgDeliveredSigned cs
|
||||
_proxyRelaysRequested <- readF $ proxyRelaysRequested cs
|
||||
_proxyRelaysConnected <- readF $ proxyRelaysConnected cs
|
||||
_msgSentViaProxy <- readF $ msgSentViaProxy cs
|
||||
pure
|
||||
ClientStatsData
|
||||
{ _peerAddresses,
|
||||
@@ -95,8 +105,10 @@ readClientStatsData readF cs = do
|
||||
_qSentSigned,
|
||||
_msgSentSigned,
|
||||
_msgSentUnsigned,
|
||||
_msgSentViaProxy,
|
||||
_msgDeliveredSigned
|
||||
_msgDeliveredSigned,
|
||||
_proxyRelaysRequested,
|
||||
_proxyRelaysConnected,
|
||||
_msgSentViaProxy
|
||||
}
|
||||
{-# INLINE readClientStatsData #-}
|
||||
|
||||
@@ -110,8 +122,10 @@ writeClientStatsData cs csd = do
|
||||
writeTVar (qSentSigned cs) (_qSentSigned csd)
|
||||
writeTVar (msgSentSigned cs) (_msgSentSigned csd)
|
||||
writeTVar (msgSentUnsigned cs) (_msgSentUnsigned csd)
|
||||
writeTVar (msgSentViaProxy cs) (_msgSentViaProxy csd)
|
||||
writeTVar (msgDeliveredSigned cs) (_msgDeliveredSigned csd)
|
||||
writeTVar (proxyRelaysRequested cs) (_proxyRelaysRequested csd)
|
||||
writeTVar (proxyRelaysConnected cs) (_proxyRelaysConnected csd)
|
||||
writeTVar (msgSentViaProxy cs) (_msgSentViaProxy csd)
|
||||
|
||||
mergeClientStatsData :: ClientStatsData -> ClientStatsData -> ClientStatsData
|
||||
mergeClientStatsData a b =
|
||||
@@ -124,6 +138,8 @@ mergeClientStatsData a b =
|
||||
_qSentSigned = _qSentSigned a <> _qSentSigned b,
|
||||
_msgSentSigned = _msgSentSigned a + _msgSentSigned b,
|
||||
_msgSentUnsigned = _msgSentUnsigned a + _msgSentUnsigned b,
|
||||
_msgSentViaProxy = _msgSentViaProxy a + _msgSentViaProxy b,
|
||||
_msgDeliveredSigned = _msgDeliveredSigned a + _msgDeliveredSigned b
|
||||
_msgDeliveredSigned = _msgDeliveredSigned a + _msgDeliveredSigned b,
|
||||
_proxyRelaysRequested = _proxyRelaysRequested a + _proxyRelaysRequested b,
|
||||
_proxyRelaysConnected = _proxyRelaysConnected a + _proxyRelaysConnected b,
|
||||
_msgSentViaProxy = _msgSentViaProxy a + _msgSentViaProxy b
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user