collect agent stats (#579)

* collect agent stats

* remove comment
This commit is contained in:
Evgeny Poberezkin
2022-12-26 12:02:20 +00:00
committed by GitHub
parent fb21d9836e
commit aa17cc55c1
3 changed files with 71 additions and 17 deletions
+8
View File
@@ -79,6 +79,8 @@ module Simplex.Messaging.Agent
suspendAgent,
execAgentStoreSQL,
debugAgentLocks,
getAgentStats,
resetAgentStats,
logConnection,
)
where
@@ -314,6 +316,12 @@ execAgentStoreSQL c = withAgentEnv c . execAgentStoreSQL' c
debugAgentLocks :: AgentErrorMonad m => AgentClient -> m AgentLocks
debugAgentLocks c = withAgentEnv c $ debugAgentLocks' c
getAgentStats :: AgentErrorMonad m => AgentClient -> m [(AgentStatsKey, Int)]
getAgentStats c = readTVarIO (agentStats c) >>= mapM (\(k, cnt) -> (k,) <$> readTVarIO cnt) . M.assocs
resetAgentStats :: AgentErrorMonad m => AgentClient -> m ()
resetAgentStats = atomically . TM.clear . agentStats
withAgentEnv :: AgentClient -> ReaderT Env m a -> m a
withAgentEnv c = (`runReaderT` agentEnv c)
+54 -13
View File
@@ -63,6 +63,7 @@ module Simplex.Messaging.Agent.Client
AgentOpState (..),
AgentState (..),
AgentLocks (..),
AgentStatsKey (..),
agentOperations,
agentOperationBracket,
waitUntilActive,
@@ -81,7 +82,7 @@ module Simplex.Messaging.Agent.Client
where
import Control.Applicative ((<|>))
import Control.Concurrent (forkIO)
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.Async (Async, uninterruptibleCancel)
import Control.Concurrent.STM (retry, stateTVar, throwSTM)
import Control.Exception (AsyncException (..))
@@ -197,6 +198,7 @@ data AgentClient = AgentClient
reconnectLocks :: TMap SMPServer Lock,
reconnections :: TVar [Async ()],
asyncClients :: TVar [Async ()],
agentStats :: TMap AgentStatsKey (TVar Int),
clientId :: Int,
agentEnv :: Env
}
@@ -225,6 +227,9 @@ data AgentLocks = AgentLocks {connLocks :: Map String String, srvLocks :: Map St
instance ToJSON AgentLocks where toEncoding = J.genericToEncoding J.defaultOptions
data AgentStatsKey = AgentStatsKey {host :: ByteString, clientTs :: ByteString, cmd :: ByteString, res :: ByteString}
deriving (Eq, Ord, Show)
newAgentClient :: InitialAgentServers -> Env -> STM AgentClient
newAgentClient InitialAgentServers {smp, ntf, netCfg} agentEnv = do
let qSize = tbqSize $ config agentEnv
@@ -257,8 +262,9 @@ newAgentClient InitialAgentServers {smp, ntf, netCfg} agentEnv = do
reconnectLocks <- TM.empty
reconnections <- newTVar []
asyncClients <- newTVar []
agentStats <- TM.empty
clientId <- stateTVar (clientCounter agentEnv) $ \i -> let i' = i + 1 in (i', i')
return AgentClient {active, rcvQ, subQ, msgQ, smpServers, smpClients, ntfServers, ntfClients, useNetworkConfig, subscrConns, activeSubs, pendingSubs, pendingMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, connCmdsQueued, asyncCmdQueues, asyncCmdProcesses, ntfNetworkOp, rcvNetworkOp, msgDeliveryOp, sndNetworkOp, databaseOp, agentState, getMsgLocks, connLocks, reconnectLocks, reconnections, asyncClients, clientId, agentEnv}
return AgentClient {active, rcvQ, subQ, msgQ, smpServers, smpClients, ntfServers, ntfClients, useNetworkConfig, subscrConns, activeSubs, pendingSubs, pendingMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, connCmdsQueued, asyncCmdQueues, asyncCmdProcesses, ntfNetworkOp, rcvNetworkOp, msgDeliveryOp, sndNetworkOp, databaseOp, agentState, getMsgLocks, connLocks, reconnectLocks, reconnections, asyncClients, agentStats, clientId, agentEnv}
agentClientStore :: AgentClient -> SQLiteStore
agentClientStore AgentClient {agentEnv = Env {store}} = store
@@ -306,6 +312,7 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} srv = do
serverDown :: ([RcvQueue], [ConnId]) -> IO ()
serverDown (qs, conns) = whenM (readTVarIO active) $ do
incClientStat c client "DISCONNECT" "" `E.catch` \(e :: E.SomeException) -> print e
notifySub "" $ hostEvent DISCONNECT client
unless (null conns) $ notifySub "" $ DOWN srv conns
unless (null qs) $ do
@@ -335,7 +342,9 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} srv = do
(client_, rs) <- subscribeQueues c srv qs
let (errs, okConns) = partitionEithers $ map (\(RcvQueue {connId}, r) -> bimap (connId,) (const connId) r) rs
liftIO $ do
unless connected $ mapM_ (notifySub "" . hostEvent CONNECT) client_
unless connected . forM_ client_ $ \cl -> do
incClientStat c cl "CONNECT" ""
notifySub "" $ hostEvent CONNECT cl
-- TODO deduplicate okConns
let conns = okConns \\ S.toList cs
unless (null conns) $ notifySub "" $ UP srv conns
@@ -362,6 +371,7 @@ getNtfServerClient c@AgentClient {active, ntfClients} srv = do
clientDisconnected :: NtfClient -> IO ()
clientDisconnected client = do
atomically $ TM.delete srv ntfClients
incClientStat c client "DISCONNECT" ""
atomically $ writeTBQueue (subQ c) ("", "", hostEvent DISCONNECT client)
logInfo . decodeUtf8 $ "Agent disconnected from " <> showServer srv
@@ -401,9 +411,11 @@ newProtocolClient c srv clients connectClient reconnectClient clientVar = tryCon
Right client -> do
logInfo . decodeUtf8 $ "Agent connected to " <> showServer srv
atomically $ putTMVar clientVar r
liftIO $ incClientStat c client "CLIENT" "OK"
atomically $ writeTBQueue (subQ c) ("", "", hostEvent CONNECT client)
successAction client
Left e -> do
liftIO $ incServerStat c srv "CLIENT" $ strEncode e
if temporaryAgentError e
then retryAction
else atomically $ do
@@ -486,23 +498,27 @@ withLockMap_ locks key = withGetLock $ TM.lookup key locks >>= maybe newLock pur
where
newLock = newEmptyTMVar >>= \l -> TM.insert key l locks $> l
withClient_ :: forall a m msg. (AgentMonad m, ProtocolServerClient msg) => AgentClient -> ProtoServer msg -> (ProtocolClient msg -> m a) -> m a
withClient_ c srv action = (getProtocolServerClient c srv >>= action) `catchError` logServerError
withClient_ :: forall a m msg. (AgentMonad m, ProtocolServerClient msg) => AgentClient -> ProtoServer msg -> ByteString -> (ProtocolClient msg -> m a) -> m a
withClient_ c srv statCmd action = do
cl <- getProtocolServerClient c srv
(action cl <* stat cl "OK") `catchError` logServerError cl
where
logServerError :: AgentErrorType -> m a
logServerError e = do
logServer "<--" c srv "" $ bshow e
stat cl = liftIO . incClientStat c cl statCmd
logServerError :: ProtocolClient msg -> AgentErrorType -> m a
logServerError cl e = do
logServer "<--" c srv "" $ strEncode e
stat cl $ bshow e
throwError e
withLogClient_ :: (AgentMonad m, ProtocolServerClient msg) => AgentClient -> ProtoServer msg -> QueueId -> ByteString -> (ProtocolClient msg -> m a) -> m a
withLogClient_ c srv qId cmdStr action = do
logServer "-->" c srv qId cmdStr
res <- withClient_ c srv action
res <- withClient_ c srv cmdStr action
logServer "<--" c srv qId "OK"
return res
withClient :: forall m msg a. (AgentMonad m, ProtocolServerClient msg, ProtocolTypeI (ProtoType msg)) => AgentClient -> ProtoServer msg -> (ProtocolClient msg -> ExceptT ProtocolClientError IO a) -> m a
withClient c srv action = withClient_ c srv $ \client -> liftClient (clientProtocolError @msg) (clientServer client) $ action client
withClient :: forall m msg a. (AgentMonad m, ProtocolServerClient msg, ProtocolTypeI (ProtoType msg)) => AgentClient -> ProtoServer msg -> ByteString -> (ProtocolClient msg -> ExceptT ProtocolClientError IO a) -> m a
withClient c srv statKey action = withClient_ c srv statKey $ \client -> liftClient (clientProtocolError @msg) (clientServer client) $ action client
withLogClient :: forall m msg a. (AgentMonad m, ProtocolServerClient msg, ProtocolTypeI (ProtoType msg)) => AgentClient -> ProtoServer msg -> QueueId -> ByteString -> (ProtocolClient msg -> ExceptT ProtocolClientError IO a) -> m a
withLogClient c srv qId cmdStr action = withLogClient_ c srv qId cmdStr $ \client -> liftClient (clientProtocolError @msg) (clientServer client) $ action client
@@ -554,6 +570,7 @@ runSMPServerTest c (ProtoServerWithAuth srv auth) = do
liftError (testErr TSSecureQueue) $ secureSMPQueue smp rpKey rcvId sKey
liftError (testErr TSDeleteQueue) $ deleteSMPQueue smp rpKey rcvId
ok <- tcpTimeout (networkConfig cfg) `timeout` closeProtocolClient smp
incClientStat c smp "TEST" "OK"
pure $ either Just (const Nothing) r <|> maybe (Just (SMPTestFailure TSDisconnect $ BROKER addr TIMEOUT)) (const Nothing) ok
Left e -> pure (Just $ testErr TSConnect e)
where
@@ -569,7 +586,7 @@ newRcvQueue c connId (ProtoServerWithAuth srv auth) vRange = do
(e2eDhKey, e2ePrivKey) <- liftIO C.generateKeyPair'
logServer "-->" c srv "" "NEW"
QIK {rcvId, sndId, rcvPublicDhKey} <-
withClient c srv $ \smp -> createSMPQueue smp rcvPrivateKey recipientKey dhKey auth
withClient c srv "NEW" $ \smp -> createSMPQueue smp rcvPrivateKey recipientKey dhKey auth
logServer "<--" c srv "" $ B.unwords ["IDS", logSecret rcvId, logSecret sndId]
let rq =
RcvQueue
@@ -642,6 +659,8 @@ subscribeQueues c srv qs = do
Right smp -> do
logServer "-->" c srv (bshow (length qs_) <> " queues") "SUB"
let qs2 = L.map queueCreds qs'
n = (length qs2 - 1) `div` 90 + 1
liftIO $ incClientStatN c smp n "SUBS" "OK"
liftIO $ do
rs <- zip qs_ . L.toList <$> subscribeSMPQueues smp qs2
mapM_ (uncurry $ processSubResult c) rs
@@ -769,7 +788,7 @@ sendAgentMessage c sq@SndQueue {server, sndId, sndPrivateKey} msgFlags agentMsg
agentNtfRegisterToken :: AgentMonad m => AgentClient -> NtfToken -> C.APublicVerifyKey -> C.PublicKeyX25519 -> m (NtfTokenId, C.PublicKeyX25519)
agentNtfRegisterToken c NtfToken {deviceToken, ntfServer, ntfPrivKey} ntfPubKey pubDhKey =
withClient c ntfServer $ \ntf -> ntfRegisterToken ntf ntfPrivKey (NewNtfTkn deviceToken ntfPubKey pubDhKey)
withClient c ntfServer "TNEW" $ \ntf -> ntfRegisterToken ntf ntfPrivKey (NewNtfTkn deviceToken ntfPubKey pubDhKey)
agentNtfVerifyToken :: AgentMonad m => AgentClient -> NtfTokenId -> NtfToken -> NtfRegCode -> m ()
agentNtfVerifyToken c tknId NtfToken {ntfServer, ntfPrivKey} code =
@@ -925,3 +944,25 @@ storeError = \case
-- NOTE: network IO should NOT be used inside AgentStoreMonad
SEAgentError e -> e
e -> INTERNAL $ show e
incStat :: AgentClient -> Int -> AgentStatsKey -> STM ()
incStat AgentClient {agentStats} n k = do
TM.lookup k agentStats >>= \case
Just v -> modifyTVar v (+ n)
_ -> newTVar n >>= \v -> TM.insert k v agentStats
incClientStat :: AgentClient -> ProtocolClient msg -> ByteString -> ByteString -> IO ()
incClientStat c pc = incClientStatN c pc 1
incServerStat :: AgentClient -> ProtocolServer p -> ByteString -> ByteString -> IO ()
incServerStat c ProtocolServer {host} cmd res = do
threadDelay 100000
atomically $ incStat c 1 statsKey
where
statsKey = AgentStatsKey {host = strEncode $ L.head host, clientTs = "", cmd, res}
incClientStatN :: AgentClient -> ProtocolClient msg -> Int -> ByteString -> ByteString -> IO ()
incClientStatN c pc n cmd res = do
atomically $ incStat c n statsKey
where
statsKey = AgentStatsKey {host = strEncode $ transportHost pc, clientTs = strEncode $ sessionTs pc, cmd, res}
+9 -4
View File
@@ -26,7 +26,7 @@
-- See https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md
module Simplex.Messaging.Client
( -- * Connect (disconnect) client to (from) SMP server
ProtocolClient (thVersion, sessionId, transportHost),
ProtocolClient (thVersion, sessionId, sessionTs, transportHost),
SMPClient,
getProtocolClient,
closeProtocolClient,
@@ -77,6 +77,7 @@ import Data.List (find)
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as L
import Data.Maybe (fromMaybe)
import Data.Time.Clock (UTCTime, getCurrentTime)
import GHC.Generics (Generic)
import Network.Socket (ServiceName)
import Numeric.Natural
@@ -101,6 +102,7 @@ data ProtocolClient msg = ProtocolClient
{ action :: Async (),
connected :: TVar Bool,
sessionId :: SessionId,
sessionTs :: UTCTime,
thVersion :: Version,
protocolServer :: ProtoServer msg,
transportHost :: TransportHost,
@@ -214,7 +216,7 @@ chooseTransportHost NetworkConfig {socksProxy, hostMode, requiredHostMode} hosts
onionHost = find isOnionHost hosts
publicHost = find (not . isOnionHost) hosts
clientServer :: ProtocolTypeI (ProtoType msg) => ProtocolClient msg -> String
clientServer :: ProtocolTypeI (ProtoType msg) => ProtocolClient msg -> String
clientServer = B.unpack . strEncode . protocolServer
-- | Connects to 'ProtocolServer' using passed client configuration
@@ -242,6 +244,7 @@ getProtocolClient protocolServer cfg@ProtocolClientConfig {qSize, networkConfig,
ProtocolClient
{ action = undefined,
sessionId = undefined,
sessionTs = undefined,
thVersion = undefined,
connected,
protocolServer,
@@ -262,8 +265,9 @@ getProtocolClient protocolServer cfg@ProtocolClientConfig {qSize, networkConfig,
runTransportClient socksProxy useHost port' (Just $ keyHash protocolServer) tcpKeepAlive (client t c thVar)
`finally` atomically (putTMVar thVar $ Left PCENetworkError)
th_ <- tcpConnectTimeout `timeout` atomically (takeTMVar thVar)
sessionTs <- getCurrentTime
pure $ case th_ of
Just (Right THandle {sessionId, thVersion}) -> Right c {action, sessionId, thVersion}
Just (Right THandle {sessionId, thVersion}) -> Right c {action, sessionId, sessionTs, thVersion}
Just (Left e) -> Left e
Nothing -> Left PCENetworkError
@@ -281,7 +285,8 @@ getProtocolClient protocolServer cfg@ProtocolClientConfig {qSize, networkConfig,
atomically $ do
writeTVar (connected c) True
putTMVar thVar $ Right th
let c' = c {sessionId, thVersion} :: ProtocolClient msg
sessionTs <- getCurrentTime
let c' = c {sessionId, thVersion, sessionTs} :: ProtocolClient msg
-- TODO remove ping if 0 is passed (or Nothing?)
raceAny_ [send c' th, process c', receive c' th, ping c']
`finally` disconnected c'