From aa17cc55c1938e4f75c54713c58e9676d8355c53 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Mon, 26 Dec 2022 12:02:20 +0000 Subject: [PATCH] collect agent stats (#579) * collect agent stats * remove comment --- src/Simplex/Messaging/Agent.hs | 8 ++++ src/Simplex/Messaging/Agent/Client.hs | 67 +++++++++++++++++++++------ src/Simplex/Messaging/Client.hs | 13 ++++-- 3 files changed, 71 insertions(+), 17 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 37cfe9466..763c51522 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -79,6 +79,8 @@ module Simplex.Messaging.Agent suspendAgent, execAgentStoreSQL, debugAgentLocks, + getAgentStats, + resetAgentStats, logConnection, ) where @@ -314,6 +316,12 @@ execAgentStoreSQL c = withAgentEnv c . execAgentStoreSQL' c debugAgentLocks :: AgentErrorMonad m => AgentClient -> m AgentLocks debugAgentLocks c = withAgentEnv c $ debugAgentLocks' c +getAgentStats :: AgentErrorMonad m => AgentClient -> m [(AgentStatsKey, Int)] +getAgentStats c = readTVarIO (agentStats c) >>= mapM (\(k, cnt) -> (k,) <$> readTVarIO cnt) . M.assocs + +resetAgentStats :: AgentErrorMonad m => AgentClient -> m () +resetAgentStats = atomically . TM.clear . 
agentStats + withAgentEnv :: AgentClient -> ReaderT Env m a -> m a withAgentEnv c = (`runReaderT` agentEnv c) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 5104f0ed3..b94c1442c 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -63,6 +63,7 @@ module Simplex.Messaging.Agent.Client AgentOpState (..), AgentState (..), AgentLocks (..), + AgentStatsKey (..), agentOperations, agentOperationBracket, waitUntilActive, @@ -81,7 +82,7 @@ module Simplex.Messaging.Agent.Client where import Control.Applicative ((<|>)) -import Control.Concurrent (forkIO) +import Control.Concurrent (forkIO, threadDelay) import Control.Concurrent.Async (Async, uninterruptibleCancel) import Control.Concurrent.STM (retry, stateTVar, throwSTM) import Control.Exception (AsyncException (..)) @@ -197,6 +198,7 @@ data AgentClient = AgentClient reconnectLocks :: TMap SMPServer Lock, reconnections :: TVar [Async ()], asyncClients :: TVar [Async ()], + agentStats :: TMap AgentStatsKey (TVar Int), clientId :: Int, agentEnv :: Env } @@ -225,6 +227,9 @@ data AgentLocks = AgentLocks {connLocks :: Map String String, srvLocks :: Map St instance ToJSON AgentLocks where toEncoding = J.genericToEncoding J.defaultOptions +data AgentStatsKey = AgentStatsKey {host :: ByteString, clientTs :: ByteString, cmd :: ByteString, res :: ByteString} + deriving (Eq, Ord, Show) + newAgentClient :: InitialAgentServers -> Env -> STM AgentClient newAgentClient InitialAgentServers {smp, ntf, netCfg} agentEnv = do let qSize = tbqSize $ config agentEnv @@ -257,8 +262,9 @@ newAgentClient InitialAgentServers {smp, ntf, netCfg} agentEnv = do reconnectLocks <- TM.empty reconnections <- newTVar [] asyncClients <- newTVar [] + agentStats <- TM.empty clientId <- stateTVar (clientCounter agentEnv) $ \i -> let i' = i + 1 in (i', i') - return AgentClient {active, rcvQ, subQ, msgQ, smpServers, smpClients, ntfServers, ntfClients, useNetworkConfig, 
subscrConns, activeSubs, pendingSubs, pendingMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, connCmdsQueued, asyncCmdQueues, asyncCmdProcesses, ntfNetworkOp, rcvNetworkOp, msgDeliveryOp, sndNetworkOp, databaseOp, agentState, getMsgLocks, connLocks, reconnectLocks, reconnections, asyncClients, clientId, agentEnv}
+  return AgentClient {active, rcvQ, subQ, msgQ, smpServers, smpClients, ntfServers, ntfClients, useNetworkConfig, subscrConns, activeSubs, pendingSubs, pendingMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, connCmdsQueued, asyncCmdQueues, asyncCmdProcesses, ntfNetworkOp, rcvNetworkOp, msgDeliveryOp, sndNetworkOp, databaseOp, agentState, getMsgLocks, connLocks, reconnectLocks, reconnections, asyncClients, agentStats, clientId, agentEnv}
 
 agentClientStore :: AgentClient -> SQLiteStore
 agentClientStore AgentClient {agentEnv = Env {store}} = store
@@ -306,6 +312,7 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} srv = do
 
         serverDown :: ([RcvQueue], [ConnId]) -> IO ()
         serverDown (qs, conns) = whenM (readTVarIO active) $ do
+          incClientStat c client "DISCONNECT" ""
           notifySub "" $ hostEvent DISCONNECT client
           unless (null conns) $ notifySub "" $ DOWN srv conns
           unless (null qs) $ do
@@ -335,7 +342,9 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} srv = do
               (client_, rs) <- subscribeQueues c srv qs
               let (errs, okConns) = partitionEithers $ map (\(RcvQueue {connId}, r) -> bimap (connId,) (const connId) r) rs
               liftIO $ do
-                unless connected $ mapM_ (notifySub "" . hostEvent CONNECT) client_
+                unless connected . 
forM_ client_ $ \cl -> do + incClientStat c cl "CONNECT" "" + notifySub "" $ hostEvent CONNECT cl -- TODO deduplicate okConns let conns = okConns \\ S.toList cs unless (null conns) $ notifySub "" $ UP srv conns @@ -362,6 +371,7 @@ getNtfServerClient c@AgentClient {active, ntfClients} srv = do clientDisconnected :: NtfClient -> IO () clientDisconnected client = do atomically $ TM.delete srv ntfClients + incClientStat c client "DISCONNECT" "" atomically $ writeTBQueue (subQ c) ("", "", hostEvent DISCONNECT client) logInfo . decodeUtf8 $ "Agent disconnected from " <> showServer srv @@ -401,9 +411,11 @@ newProtocolClient c srv clients connectClient reconnectClient clientVar = tryCon Right client -> do logInfo . decodeUtf8 $ "Agent connected to " <> showServer srv atomically $ putTMVar clientVar r + liftIO $ incClientStat c client "CLIENT" "OK" atomically $ writeTBQueue (subQ c) ("", "", hostEvent CONNECT client) successAction client Left e -> do + liftIO $ incServerStat c srv "CLIENT" $ strEncode e if temporaryAgentError e then retryAction else atomically $ do @@ -486,23 +498,27 @@ withLockMap_ locks key = withGetLock $ TM.lookup key locks >>= maybe newLock pur where newLock = newEmptyTMVar >>= \l -> TM.insert key l locks $> l -withClient_ :: forall a m msg. (AgentMonad m, ProtocolServerClient msg) => AgentClient -> ProtoServer msg -> (ProtocolClient msg -> m a) -> m a -withClient_ c srv action = (getProtocolServerClient c srv >>= action) `catchError` logServerError +withClient_ :: forall a m msg. (AgentMonad m, ProtocolServerClient msg) => AgentClient -> ProtoServer msg -> ByteString -> (ProtocolClient msg -> m a) -> m a +withClient_ c srv statCmd action = do + cl <- getProtocolServerClient c srv + (action cl <* stat cl "OK") `catchError` logServerError cl where - logServerError :: AgentErrorType -> m a - logServerError e = do - logServer "<--" c srv "" $ bshow e + stat cl = liftIO . 
incClientStat c cl statCmd
+    logServerError :: ProtocolClient msg -> AgentErrorType -> m a
+    logServerError cl e = do
+      logServer "<--" c srv "" $ strEncode e
+      stat cl $ strEncode e
       throwError e
 
 withLogClient_ :: (AgentMonad m, ProtocolServerClient msg) => AgentClient -> ProtoServer msg -> QueueId -> ByteString -> (ProtocolClient msg -> m a) -> m a
 withLogClient_ c srv qId cmdStr action = do
   logServer "-->" c srv qId cmdStr
-  res <- withClient_ c srv action
+  res <- withClient_ c srv cmdStr action
   logServer "<--" c srv qId "OK"
   return res
 
-withClient :: forall m msg a. (AgentMonad m, ProtocolServerClient msg, ProtocolTypeI (ProtoType msg)) => AgentClient -> ProtoServer msg -> (ProtocolClient msg -> ExceptT ProtocolClientError IO a) -> m a
-withClient c srv action = withClient_ c srv $ \client -> liftClient (clientProtocolError @msg) (clientServer client) $ action client
+withClient :: forall m msg a. (AgentMonad m, ProtocolServerClient msg, ProtocolTypeI (ProtoType msg)) => AgentClient -> ProtoServer msg -> ByteString -> (ProtocolClient msg -> ExceptT ProtocolClientError IO a) -> m a
+withClient c srv statKey action = withClient_ c srv statKey $ \client -> liftClient (clientProtocolError @msg) (clientServer client) $ action client
 
 withLogClient :: forall m msg a. 
(AgentMonad m, ProtocolServerClient msg, ProtocolTypeI (ProtoType msg)) => AgentClient -> ProtoServer msg -> QueueId -> ByteString -> (ProtocolClient msg -> ExceptT ProtocolClientError IO a) -> m a withLogClient c srv qId cmdStr action = withLogClient_ c srv qId cmdStr $ \client -> liftClient (clientProtocolError @msg) (clientServer client) $ action client @@ -554,6 +570,7 @@ runSMPServerTest c (ProtoServerWithAuth srv auth) = do liftError (testErr TSSecureQueue) $ secureSMPQueue smp rpKey rcvId sKey liftError (testErr TSDeleteQueue) $ deleteSMPQueue smp rpKey rcvId ok <- tcpTimeout (networkConfig cfg) `timeout` closeProtocolClient smp + incClientStat c smp "TEST" "OK" pure $ either Just (const Nothing) r <|> maybe (Just (SMPTestFailure TSDisconnect $ BROKER addr TIMEOUT)) (const Nothing) ok Left e -> pure (Just $ testErr TSConnect e) where @@ -569,7 +586,7 @@ newRcvQueue c connId (ProtoServerWithAuth srv auth) vRange = do (e2eDhKey, e2ePrivKey) <- liftIO C.generateKeyPair' logServer "-->" c srv "" "NEW" QIK {rcvId, sndId, rcvPublicDhKey} <- - withClient c srv $ \smp -> createSMPQueue smp rcvPrivateKey recipientKey dhKey auth + withClient c srv "NEW" $ \smp -> createSMPQueue smp rcvPrivateKey recipientKey dhKey auth logServer "<--" c srv "" $ B.unwords ["IDS", logSecret rcvId, logSecret sndId] let rq = RcvQueue @@ -642,6 +659,8 @@ subscribeQueues c srv qs = do Right smp -> do logServer "-->" c srv (bshow (length qs_) <> " queues") "SUB" let qs2 = L.map queueCreds qs' + n = (length qs2 - 1) `div` 90 + 1 + liftIO $ incClientStatN c smp n "SUBS" "OK" liftIO $ do rs <- zip qs_ . 
L.toList <$> subscribeSMPQueues smp qs2
             mapM_ (uncurry $ processSubResult c) rs
@@ -769,7 +788,7 @@ sendAgentMessage c sq@SndQueue {server, sndId, sndPrivateKey} msgFlags agentMsg
 
 agentNtfRegisterToken :: AgentMonad m => AgentClient -> NtfToken -> C.APublicVerifyKey -> C.PublicKeyX25519 -> m (NtfTokenId, C.PublicKeyX25519)
 agentNtfRegisterToken c NtfToken {deviceToken, ntfServer, ntfPrivKey} ntfPubKey pubDhKey =
-  withClient c ntfServer $ \ntf -> ntfRegisterToken ntf ntfPrivKey (NewNtfTkn deviceToken ntfPubKey pubDhKey)
+  withClient c ntfServer "TNEW" $ \ntf -> ntfRegisterToken ntf ntfPrivKey (NewNtfTkn deviceToken ntfPubKey pubDhKey)
 
 agentNtfVerifyToken :: AgentMonad m => AgentClient -> NtfTokenId -> NtfToken -> NtfRegCode -> m ()
 agentNtfVerifyToken c tknId NtfToken {ntfServer, ntfPrivKey} code =
@@ -925,3 +944,29 @@ storeError = \case
   -- NOTE: network IO should NOT be used inside AgentStoreMonad
   SEAgentError e -> e
   e -> INTERNAL $ show e
+
+-- | Add @n@ to the counter stored under key @k@ in 'agentStats', creating the counter (initialised to @n@) on first use.
+incStat :: AgentClient -> Int -> AgentStatsKey -> STM ()
+incStat AgentClient {agentStats} n k = do
+  TM.lookup k agentStats >>= \case
+    -- strict update, so a counter that is not read does not accumulate a chain of (+ n) thunks
+    Just v -> modifyTVar' v (+ n)
+    _ -> newTVar n >>= \v -> TM.insert k v agentStats
+
+-- | Count a single event for a protocol client; same as 'incClientStatN' with a count of 1.
+incClientStat :: AgentClient -> ProtocolClient msg -> ByteString -> ByteString -> IO ()
+incClientStat c pc = incClientStatN c pc 1
+
+-- | Count a single event against a server when no client is available: the key uses the first host of the server and an empty 'clientTs'. It only updates the counter and does not delay the caller.
+incServerStat :: AgentClient -> ProtocolServer p -> ByteString -> ByteString -> IO ()
+incServerStat c ProtocolServer {host} cmd res = do
+  atomically $ incStat c 1 statsKey
+  where
+    statsKey = AgentStatsKey {host = strEncode $ L.head host, clientTs = "", cmd, res}
+
+-- | Count @n@ events for a protocol client; the key is built from the client's 'transportHost' and 'sessionTs' plus the given command and result.
+incClientStatN :: AgentClient -> ProtocolClient msg -> Int -> ByteString -> ByteString -> IO ()
+incClientStatN c pc n cmd res = do
+  atomically $ incStat c n statsKey
+  where
+    statsKey = AgentStatsKey {host = strEncode $ transportHost pc, clientTs = strEncode $ sessionTs pc, cmd, res}
diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs
index 71e69dcbc..7d043ee0a 
100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -26,7 +26,7 @@ -- See https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md module Simplex.Messaging.Client ( -- * Connect (disconnect) client to (from) SMP server - ProtocolClient (thVersion, sessionId, transportHost), + ProtocolClient (thVersion, sessionId, sessionTs, transportHost), SMPClient, getProtocolClient, closeProtocolClient, @@ -77,6 +77,7 @@ import Data.List (find) import Data.List.NonEmpty (NonEmpty) import qualified Data.List.NonEmpty as L import Data.Maybe (fromMaybe) +import Data.Time.Clock (UTCTime, getCurrentTime) import GHC.Generics (Generic) import Network.Socket (ServiceName) import Numeric.Natural @@ -101,6 +102,7 @@ data ProtocolClient msg = ProtocolClient { action :: Async (), connected :: TVar Bool, sessionId :: SessionId, + sessionTs :: UTCTime, thVersion :: Version, protocolServer :: ProtoServer msg, transportHost :: TransportHost, @@ -214,7 +216,7 @@ chooseTransportHost NetworkConfig {socksProxy, hostMode, requiredHostMode} hosts onionHost = find isOnionHost hosts publicHost = find (not . isOnionHost) hosts -clientServer :: ProtocolTypeI (ProtoType msg) => ProtocolClient msg -> String +clientServer :: ProtocolTypeI (ProtoType msg) => ProtocolClient msg -> String clientServer = B.unpack . strEncode . 
protocolServer -- | Connects to 'ProtocolServer' using passed client configuration @@ -242,6 +244,7 @@ getProtocolClient protocolServer cfg@ProtocolClientConfig {qSize, networkConfig, ProtocolClient { action = undefined, sessionId = undefined, + sessionTs = undefined, thVersion = undefined, connected, protocolServer, @@ -262,8 +265,9 @@ getProtocolClient protocolServer cfg@ProtocolClientConfig {qSize, networkConfig, runTransportClient socksProxy useHost port' (Just $ keyHash protocolServer) tcpKeepAlive (client t c thVar) `finally` atomically (putTMVar thVar $ Left PCENetworkError) th_ <- tcpConnectTimeout `timeout` atomically (takeTMVar thVar) + sessionTs <- getCurrentTime pure $ case th_ of - Just (Right THandle {sessionId, thVersion}) -> Right c {action, sessionId, thVersion} + Just (Right THandle {sessionId, thVersion}) -> Right c {action, sessionId, sessionTs, thVersion} Just (Left e) -> Left e Nothing -> Left PCENetworkError @@ -281,7 +285,8 @@ getProtocolClient protocolServer cfg@ProtocolClientConfig {qSize, networkConfig, atomically $ do writeTVar (connected c) True putTMVar thVar $ Right th - let c' = c {sessionId, thVersion} :: ProtocolClient msg + sessionTs <- getCurrentTime + let c' = c {sessionId, thVersion, sessionTs} :: ProtocolClient msg -- TODO remove ping if 0 is passed (or Nothing?) raceAny_ [send c' th, process c', receive c' th, ping c'] `finally` disconnected c'