From ae8e1c5e9aa3155907f1bd075e9c69af5fce2bee Mon Sep 17 00:00:00 2001 From: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> Date: Wed, 3 Jul 2024 15:36:15 +0400 Subject: [PATCH] agent: servers stats improvements, fixes (#1208) * agent: reset stats startedAt time in memory * getAgentSubsSummary * change sub counting * ack statistics * add import * instance * Revert "instance" This reverts commit 1f63740d565d43d033558ee62cfb5a8c2fe4575d. * Revert "add import" This reverts commit ef72df80144044a0c1eefa1bffab2fd01d0851b4. * modify sub counting * modify conn creation counting * use int64 * file size stats * remove import * ack err counting * conn del stats * format * new data * add data * toKB * restore connCompleted * use Int for counts * use rq from scope * remove getAgentSubsSummary * fix connCompleted * fix * revert disabling stats * use srv from scope * combine ack stats * modify * comment * count subs * refactor --------- Co-authored-by: Evgeny Poberezkin --- simplexmq.cabal | 2 +- src/Simplex/FileTransfer/Agent.hs | 8 +- src/Simplex/FileTransfer/Chunks.hs | 4 + src/Simplex/Messaging/Agent.hs | 76 +++++--- src/Simplex/Messaging/Agent/Client.hs | 48 +++-- src/Simplex/Messaging/Agent/Stats.hs | 182 +++++++++++++++++- src/Simplex/Messaging/Agent/Store.hs | 6 + src/Simplex/Messaging/Agent/Store/SQLite.hs | 7 +- .../Agent/Store/SQLite/Migrations.hs | 4 +- ...rs_stats.hs => M20240702_servers_stats.hs} | 10 +- .../Store/SQLite/Migrations/agent_schema.sql | 7 + 11 files changed, 295 insertions(+), 59 deletions(-) rename src/Simplex/Messaging/Agent/Store/SQLite/Migrations/{M20240518_servers_stats.hs => M20240702_servers_stats.hs} (79%) diff --git a/simplexmq.cabal b/simplexmq.cabal index d1fa32d43..aec362196 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -133,8 +133,8 @@ library Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240223_connections_wait_delivery Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240225_ratchet_kem Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240417_rcv_files_approved_relays - Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240518_servers_stats Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240624_snd_secure + Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240702_servers_stats Simplex.Messaging.Agent.TRcvQueues Simplex.Messaging.Client Simplex.Messaging.Client.Agent diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index 90dda5cbc..4683143c5 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -49,6 +49,7 @@ import qualified Data.Set as S import Data.Text (Text) import Data.Time.Clock (getCurrentTime) import Data.Time.Format (defaultTimeLocale, formatTime) +import Simplex.FileTransfer.Chunks (toKB) import Simplex.FileTransfer.Client (XFTPChunkSpec (..)) import Simplex.FileTransfer.Client.Main import Simplex.FileTransfer.Crypto @@ -206,7 +207,8 @@ runXFTPRcvWorker c srv Worker {doWork} = do unlessM ((approvedRelays ||) <$> ipAddressProtected') $ throwE $ FILE NOT_APPROVED fsFileTmpPath <- lift $ toFSFilePath fileTmpPath chunkPath <- uniqueCombine fsFileTmpPath $ show chunkNo - let chunkSpec = XFTPRcvChunkSpec chunkPath (unFileSize chunkSize) (unFileDigest digest) + let chSize = unFileSize chunkSize + chunkSpec = XFTPRcvChunkSpec chunkPath chSize (unFileDigest digest) relChunkPath = fileTmpPath takeFileName chunkPath agentXFTPDownloadChunk c userId digest replica chunkSpec atomically $ waitUntilForeground c @@ -221,6 +223,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do liftIO . when complete $ updateRcvFileStatus db rcvFileId RFSReceived pure (entityId, complete, RFPROG rcvd total) atomically $ incXFTPServerStat c userId srv downloads + atomically $ incXFTPServerSizeStat c userId srv downloadsSize (fromIntegral $ toKB chSize) notify c entityId progress when complete . lift . void $ getXFTPRcvWorker True c Nothing @@ -506,7 +509,7 @@ runXFTPSndWorker c srv Worker {doWork} = do atomically $ incXFTPServerStat c userId srv uploadErrs sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) e uploadFileChunk :: AgentConfig -> SndFileChunk -> SndFileChunkReplica -> AM () - uploadFileChunk AgentConfig {xftpMaxRecipientsPerRequest = maxRecipients} sndFileChunk@SndFileChunk {sndFileId, userId, chunkSpec = chunkSpec@XFTPChunkSpec {filePath}, digest = chunkDigest} replica = do + uploadFileChunk AgentConfig {xftpMaxRecipientsPerRequest = maxRecipients} sndFileChunk@SndFileChunk {sndFileId, userId, chunkSpec = chunkSpec@XFTPChunkSpec {filePath, chunkSize = chSize}, digest = chunkDigest} replica = do replica'@SndFileChunkReplica {sndChunkReplicaId} <- addRecipients sndFileChunk replica fsFilePath <- lift $ toFSFilePath filePath unlessM (doesFileExist fsFilePath) $ throwE $ FILE NO_FILE @@ -521,6 +524,7 @@ runXFTPSndWorker c srv Worker {doWork} = do total = totalSize chunks complete = all chunkUploaded chunks atomically $ incXFTPServerStat c userId srv uploads + atomically $ incXFTPServerSizeStat c userId srv uploadsSize (fromIntegral $ toKB chSize) notify c sndFileEntityId $ SFPROG uploaded total when complete $ do (sndDescr, rcvDescrs) <- sndFileToDescrs sf diff --git a/src/Simplex/FileTransfer/Chunks.hs b/src/Simplex/FileTransfer/Chunks.hs index 0b35649c5..d8890944d 100644 --- a/src/Simplex/FileTransfer/Chunks.hs +++ b/src/Simplex/FileTransfer/Chunks.hs @@ -26,6 +26,10 @@ kb :: Integral a => a -> a kb n = 1024 * n {-# INLINE kb #-} +toKB :: Integral a => a -> a +toKB n = n `div` 1024 +{-# INLINE toKB #-} + mb :: Integral a => a -> a mb n = 1024 * kb n {-# INLINE mb #-} diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 32bfa4198..480c8f801 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -157,6 +157,7 @@ import Simplex.Messaging.Agent.NtfSubSupervisor import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.RetryInterval import Simplex.Messaging.Agent.Stats +import Simplex.Messaging.Agent.Stats (AgentSMPServerStats (connSubErrs)) import Simplex.Messaging.Agent.Store import Simplex.Messaging.Agent.Store.SQLite import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB @@ -210,14 +211,14 @@ getSMPAgentClient_ clientId cfg initServers store backgroundMode = runAgentThreads c | backgroundMode = run c "subscriber" $ subscriber c | otherwise = do - -- restoreServersStats c + restoreServersStats c raceAny_ [ run c "subscriber" $ subscriber c, run c "runNtfSupervisor" $ runNtfSupervisor c, - run c "cleanupManager" $ cleanupManager c - -- run c "logServersStats" $ logServersStats c + run c "cleanupManager" $ cleanupManager c, + run c "logServersStats" $ logServersStats c ] - -- `E.finally` saveServersStats c + `E.finally` saveServersStats c run AgentClient {subQ, acThread} name a = a `E.catchAny` \e -> whenM (isJust <$> readTVarIO acThread) $ do logError $ "Agent thread " <> name <> " crashed: " <> tshow e @@ -234,13 +235,12 @@ logServersStats c = do saveServersStats :: AgentClient -> AM' () saveServersStats c@AgentClient {subQ, smpServersStats, xftpServersStats} = do - -- sss <- mapM (lift . getAgentSMPServerStats) =<< readTVarIO smpServersStats - -- xss <- mapM (lift . getAgentXFTPServerStats) =<< readTVarIO xftpServersStats - -- let stats = AgentPersistedServerStats {smpServersStats = sss, xftpServersStats = xss} - -- tryAgentError' (withStore' c (`updateServersStats` stats)) >>= \case - -- Left e -> atomically $ writeTBQueue subQ ("", "", AEvt SAEConn $ ERR $ INTERNAL $ show e) - -- Right () -> pure () - pure () + sss <- mapM (lift . getAgentSMPServerStats) =<< readTVarIO smpServersStats + xss <- mapM (lift . getAgentXFTPServerStats) =<< readTVarIO xftpServersStats + let stats = AgentPersistedServerStats {smpServersStats = sss, xftpServersStats = xss} + tryAgentError' (withStore' c (`updateServersStats` stats)) >>= \case + Left e -> atomically $ writeTBQueue subQ ("", "", AEvt SAEConn $ ERR $ INTERNAL $ show e) + Right () -> pure () restoreServersStats :: AgentClient -> AM' () restoreServersStats c@AgentClient {smpServersStats, xftpServersStats, srvStatsStartedAt} = do @@ -885,6 +885,7 @@ createReplyQueue :: AgentClient -> ConnData -> SndQueue -> SubscriptionMode -> S createReplyQueue c ConnData {userId, connId, enableNtfs} SndQueue {smpClientVersion} subMode srv = do let sndSecure = False -- smpClientVersion >= sndAuthKeySMPClientVersion (rq, qUri, tSess, sessId) <- newRcvQueue c userId connId srv (versionToRange smpClientVersion) subMode sndSecure + atomically $ incSMPServerStat c userId (qServer rq) connCreated let qInfo = toVersionT qUri smpClientVersion rq' <- withStore c $ \db -> upgradeSndConnToDuplex db connId rq lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId @@ -1175,7 +1176,6 @@ runCommandProcessing c@AgentClient {subQ} server_ Worker {doWork} = do ICDeleteRcvQueue rId -> withServer $ \srv -> tryWithLock "ICDeleteRcvQueue" $ do rq <- withStore c (\db -> getDeletedRcvQueue db connId srv rId) deleteQueue c rq - atomically $ incSMPServerStat c userId srv connDeleted withStore' c (`deleteConnRcvQueue` rq) ICQSecure rId senderKey -> withServer $ \srv -> tryWithLock "ICQSecure" . withDuplexConn $ \(DuplexConnection cData rqs sqs) -> @@ -1425,7 +1425,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} ConnData {connId} sq@SndQueue {userI withStore' c $ \db -> setSndQueueStatus db sq Active case rq_ of -- party initiating connection (in v1) - Just RcvQueue {status} -> + Just rq@RcvQueue {status} -> -- it is unclear why subscribeQueue was needed here, -- message delivery can only be enabled for queues that were created in the current session or subscribed -- subscribeQueue c rq connId @@ -1435,7 +1435,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} ConnData {connId} sq@SndQueue {userI -- because it can be sent before HELLO is received -- With `status == Active` condition, CON is sent here only by the accepting party, that previously received HELLO when (status == Active) $ do - atomically $ incSMPServerStat c userId server connCompleted + atomically $ incSMPServerStat c userId (qServer rq) connCompleted notify $ CON pqEncryption -- this branch should never be reached as receive queue is created before the confirmation, _ -> logError "HELLO sent without receive queue" @@ -1627,10 +1627,14 @@ synchronizeRatchet' c connId pqSupport' force = withConnLock c connId "synchroni _ -> throwE $ CMD PROHIBITED "synchronizeRatchet: not duplex" ackQueueMessage :: AgentClient -> RcvQueue -> SMP.MsgId -> AM () -ackQueueMessage c rq srvMsgId = - sendAck c rq srvMsgId `catchAgentError` \case - SMP _ SMP.NO_MSG -> pure () - e -> throwE e +ackQueueMessage c rq@RcvQueue {userId, server} srvMsgId = do + atomically $ incSMPServerStat c userId server ackAttempts + tryAgentError (sendAck c rq srvMsgId) >>= \case + Right _ -> atomically $ incSMPServerStat c userId server ackMsgs + Left (SMP _ SMP.NO_MSG) -> atomically $ incSMPServerStat c userId server ackNoMsgErrs + Left e -> do + unless (temporaryOrHostError e) $ atomically $ incSMPServerStat c userId server ackOtherErrs + throwE e -- | Suspend SMP agent connection (OFF command) in Reader monad suspendConnection' :: AgentClient -> ConnId -> AM () @@ -1727,11 +1731,15 @@ deleteConnQueues c waitDelivery ntf rqs = do Int -> (RcvQueue, Either AgentErrorType ()) -> IO ((RcvQueue, Either AgentErrorType ()), Maybe (AM' ())) - deleteQueueRec db maxErrs (rq, r) = case r of + deleteQueueRec db maxErrs (rq@RcvQueue {userId, server}, r) = case r of Right _ -> deleteConnRcvQueue db rq $> ((rq, r), Just (notifyRQ rq Nothing)) Left e | temporaryOrHostError e && deleteErrors rq + 1 < maxErrs -> incRcvDeleteErrors db rq $> ((rq, r), Nothing) - | otherwise -> deleteConnRcvQueue db rq $> ((rq, Right ()), Just (notifyRQ rq (Just e))) + | otherwise -> do + deleteConnRcvQueue db rq + -- attempts and successes are counted in deleteQueues function + atomically $ incSMPServerStat c userId server connDeleted + pure ((rq, Right ()), Just (notifyRQ rq (Just e))) notifyRQ rq e_ = notify ("", qConnId rq, AEvt SAEConn $ DEL_RCVQ (qServer rq) (queueId rq) e_) notify = when ntf . atomically . writeTBQueue (subQ c) connResults :: [(RcvQueue, Either AgentErrorType ())] -> Map ConnId (Either AgentErrorType ()) @@ -2009,10 +2017,12 @@ setNtfServers c = atomically . writeTVar (ntfServers c) {-# INLINE setNtfServers #-} resetAgentServersStats' :: AgentClient -> AM () -resetAgentServersStats' c@AgentClient {smpServersStats, xftpServersStats} = do +resetAgentServersStats' c@AgentClient {smpServersStats, xftpServersStats, srvStatsStartedAt} = do + startedAt <- liftIO getCurrentTime + atomically $ writeTVar srvStatsStartedAt startedAt atomically $ TM.clear smpServersStats atomically $ TM.clear xftpServersStats - withStore' c resetServersStats + withStore' c (`resetServersStats` startedAt) -- | Activate operations foregroundAgent :: AgentClient -> IO () @@ -2146,7 +2156,7 @@ data ACKd = ACKd | ACKPending -- It cannot be finally, as sometimes it needs to be ACK+DEL, -- and sometimes ACK has to be sent from the consumer. processSMPTransmissions :: AgentClient -> ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> AM' () -processSMPTransmissions c@AgentClient {subQ} (tSess@(_, srv, _), _v, sessId, ts) = do +processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId, ts) = do upConnIds <- newTVarIO [] forM_ ts $ \(entId, t) -> case t of STEvent msgOrErr -> @@ -2171,7 +2181,9 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(_, srv, _), _v, sessId, ts) logServer "<--" c srv entId $ "error: " <> bshow e notifyErr "" e connIds <- readTVarIO upConnIds - unless (null connIds) $ notify' "" $ UP srv connIds + unless (null connIds) $ do + notify' "" $ UP srv connIds + atomically $ incSMPServerStat' c userId srv connSubscribed $ length connIds where withRcvConn :: SMP.RecipientId -> (forall c. RcvQueue -> Connection c -> AM ()) -> AM' () withRcvConn rId a = do @@ -2182,17 +2194,19 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(_, srv, _), _v, sessId, ts) Left e -> notify' connId (ERR e) Right () -> pure () processSubOk :: RcvQueue -> TVar [ConnId] -> AM () - processSubOk rq@RcvQueue {userId, connId} upConnIds = do + processSubOk rq@RcvQueue {connId} upConnIds = atomically . whenM (isPendingSub connId) $ do addSubscription c rq modifyTVar' upConnIds (connId :) - atomically $ incSMPServerStat c userId srv connSubscribed processSubErr :: RcvQueue -> SMPClientError -> AM () - processSubErr rq@RcvQueue {userId, connId} e = do - atomically . whenM (isPendingSub connId) $ failSubscription c rq e - atomically $ incSMPServerStat c userId srv connSubErrs + processSubErr rq@RcvQueue {connId} e = do + atomically . whenM (isPendingSub connId) $ + failSubscription c rq e >> incSMPServerStat c userId srv connSubErrs lift $ notifyErr connId e - isPendingSub connId = (&&) <$> hasPendingSubscription c connId <*> activeClientSession c tSess sessId + isPendingSub connId = do + pending <- (&&) <$> hasPendingSubscription c connId <*> activeClientSession c tSess sessId + unless pending $ incSMPServerStat c userId srv connSubIgnored + pure pending notify' :: forall e m. (AEntityI e, MonadIO m) => ConnId -> AEvent e -> m () notify' connId msg = atomically $ writeTBQueue subQ ("", connId, AEvt (sAEntity @e) msg) notifyErr :: ConnId -> SMPClientError -> AM' () @@ -2201,7 +2215,7 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(_, srv, _), _v, sessId, ts) processSMP rq@RcvQueue {rcvId = rId, sndSecure, e2ePrivKey, e2eDhSecret, status} conn - cData@ConnData {userId, connId, connAgentVersion, ratchetSyncState = rss} + cData@ConnData {connId, connAgentVersion, ratchetSyncState = rss} smpMsg = withConnLock c connId "processSMP" $ case smpMsg of SMP.MSG msg@SMP.RcvMessage {msgId = srvMsgId} -> do diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 01d97f9ac..c9db8e7a7 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -142,6 +142,8 @@ module Simplex.Messaging.Agent.Client incSMPServerStat, incSMPServerStat', incXFTPServerStat, + incXFTPServerStat', + incXFTPServerSizeStat, AgentWorkersDetails (..), getAgentWorkersDetails, AgentWorkersSummary (..), @@ -174,8 +176,9 @@ import Data.Bifunctor (bimap, first, second) import Data.ByteString.Base64 import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B -import Data.Either (partitionEithers) +import Data.Either (isRight, partitionEithers) import Data.Functor (($>)) +import Data.Int (Int64) import Data.List (deleteFirstsBy, foldl', partition, (\\)) import Data.List.NonEmpty (NonEmpty (..), (<|)) import qualified Data.List.NonEmpty as L @@ -1329,13 +1332,16 @@ newRcvQueue c userId connId (ProtoServerWithAuth srv auth) vRange subMode sender pure (rq, qUri, tSess, sessId) processSubResult :: AgentClient -> RcvQueue -> Either SMPClientError () -> STM () -processSubResult c rq@RcvQueue {connId} = \case +processSubResult c rq@RcvQueue {userId, server, connId} = \case Left e -> - unless (temporaryClientError e) $ + unless (temporaryClientError e) $ do + incSMPServerStat c userId server connSubErrs failSubscription c rq e Right () -> - whenM (hasPendingSubscription c connId) $ - addSubscription c rq + ifM + (hasPendingSubscription c connId) + (incSMPServerStat c userId server connSubscribed >> addSubscription c rq) + (incSMPServerStat c userId server connSubIgnored) temporaryAgentError :: AgentErrorType -> Bool temporaryAgentError = \case @@ -1387,14 +1393,14 @@ subscribeQueues c qs = do subscribeQueues_ :: Env -> TVar (Maybe SessionId) -> SMPClient -> NonEmpty RcvQueue -> IO (BatchResponses SMPClientError ()) subscribeQueues_ env session smp qs' = do let (userId, srv, _) = transportSession' smp - atomically $ incSMPServerStat' c userId srv connSubAttempts (length qs') + atomically $ incSMPServerStat' c userId srv connSubAttempts $ length qs' rs <- sendBatch subscribeSMPQueues smp qs' active <- atomically $ ifM (activeClientSession c tSess sessId) (writeTVar session (Just sessId) >> processSubResults rs $> True) - (pure False) + (incSMPServerStat' c userId srv connSubIgnored (length rs) $> False) if active then when (hasTempErrors rs) resubscribe $> rs else do @@ -1603,7 +1609,15 @@ deleteQueue c rq@RcvQueue {rcvId, rcvPrivateKey} = do deleteSMPQueue smp rcvPrivateKey rcvId deleteQueues :: AgentClient -> [RcvQueue] -> AM' [(RcvQueue, Either AgentErrorType ())] -deleteQueues = sendTSessionBatches "DEL" id $ sendBatch deleteSMPQueues +deleteQueues c = sendTSessionBatches "DEL" id deleteQueues_ c + where + deleteQueues_ smp rqs = do + let (userId, srv, _) = transportSession' smp + atomically $ incSMPServerStat' c userId srv connDelAttempts $ length rqs + rs <- sendBatch deleteSMPQueues smp rqs + let successes = foldl' (\n (_, r) -> if isRight r then n + 1 else n) 0 rs + atomically $ incSMPServerStat' c userId srv connDeleted successes + pure rs sendAgentMessage :: AgentClient -> SndQueue -> MsgFlags -> ByteString -> AM (Maybe SMPServer) sendAgentMessage c sq@SndQueue {userId, server, sndId, sndPrivateKey} msgFlags agentMsg = do @@ -1924,12 +1938,24 @@ incSMPServerStat' AgentClient {smpServersStats} userId srv sel n = do TM.insert (userId, srv) newStats smpServersStats incXFTPServerStat :: AgentClient -> UserId -> XFTPServer -> (AgentXFTPServerStats -> TVar Int) -> STM () -incXFTPServerStat AgentClient {xftpServersStats} userId srv sel = do +incXFTPServerStat c userId srv sel = incXFTPServerStat_ c userId srv sel 1 +{-# INLINE incXFTPServerStat #-} + +incXFTPServerStat' :: AgentClient -> UserId -> XFTPServer -> (AgentXFTPServerStats -> TVar Int) -> Int -> STM () +incXFTPServerStat' = incXFTPServerStat_ +{-# INLINE incXFTPServerStat' #-} + +incXFTPServerSizeStat :: AgentClient -> UserId -> XFTPServer -> (AgentXFTPServerStats -> TVar Int64) -> Int64 -> STM () +incXFTPServerSizeStat = incXFTPServerStat_ +{-# INLINE incXFTPServerSizeStat #-} + +incXFTPServerStat_ :: Num n => AgentClient -> UserId -> XFTPServer -> (AgentXFTPServerStats -> TVar n) -> n -> STM () +incXFTPServerStat_ AgentClient {xftpServersStats} userId srv sel n = do TM.lookup (userId, srv) xftpServersStats >>= \case - Just v -> modifyTVar' (sel v) (+ 1) + Just v -> modifyTVar' (sel v) (+ n) Nothing -> do newStats <- newAgentXFTPServerStats - modifyTVar' (sel newStats) (+ 1) + modifyTVar' (sel newStats) (+ n) TM.insert (userId, srv) newStats xftpServersStats data AgentServersSummary = AgentServersSummary diff --git a/src/Simplex/Messaging/Agent/Stats.hs b/src/Simplex/Messaging/Agent/Stats.hs index c8f81a6aa..424052d74 100644 --- a/src/Simplex/Messaging/Agent/Stats.hs +++ b/src/Simplex/Messaging/Agent/Stats.hs @@ -5,6 +5,7 @@ module Simplex.Messaging.Agent.Stats where import qualified Data.Aeson.TH as J +import Data.Int (Int64) import Data.Map (Map) import Database.SQLite.Simple.FromField (FromField (..)) import Database.SQLite.Simple.ToField (ToField (..)) @@ -29,12 +30,20 @@ data AgentSMPServerStats = AgentSMPServerStats recvDuplicates :: TVar Int, -- duplicate messages received recvCryptoErrs :: TVar Int, -- message decryption errors recvErrs :: TVar Int, -- receive errors - connCreated :: TVar Int, - connSecured :: TVar Int, - connCompleted :: TVar Int, - connDeleted :: TVar Int, + ackMsgs :: TVar Int, -- total messages acknowledged + ackAttempts :: TVar Int, -- acknowledgement attempts + ackNoMsgErrs :: TVar Int, -- NO_MSG ack errors + ackOtherErrs :: TVar Int, -- other permanent ack errors (temporary accounted for in attempts) + -- conn stats are accounted for rcv queue server + connCreated :: TVar Int, -- total connections created + connSecured :: TVar Int, -- connections secured + connCompleted :: TVar Int, -- connections completed + connDeleted :: TVar Int, -- total connections deleted + connDelAttempts :: TVar Int, -- total connection deletion attempts + connDelErrs :: TVar Int, -- permanent connection deletion errors (temporary accounted for in attempts) connSubscribed :: TVar Int, -- total successful subscription connSubAttempts :: TVar Int, -- subscription attempts + connSubIgnored :: TVar Int, -- subscription results ignored (client switched to different session or it was not pending) connSubErrs :: TVar Int -- permanent subscription errors (temporary accounted for in attempts) } @@ -53,12 +62,19 @@ data AgentSMPServerStatsData = AgentSMPServerStatsData _recvDuplicates :: Int, _recvCryptoErrs :: Int, _recvErrs :: Int, + _ackMsgs :: Int, + _ackAttempts :: Int, + _ackNoMsgErrs :: Int, + _ackOtherErrs :: Int, _connCreated :: Int, _connSecured :: Int, _connCompleted :: Int, _connDeleted :: Int, + _connDelAttempts :: Int, + _connDelErrs :: Int, _connSubscribed :: Int, _connSubAttempts :: Int, + _connSubIgnored :: Int, _connSubErrs :: Int } deriving (Show) @@ -79,12 +95,19 @@ newAgentSMPServerStats = do recvDuplicates <- newTVar 0 recvCryptoErrs <- newTVar 0 recvErrs <- newTVar 0 + ackMsgs <- newTVar 0 + ackAttempts <- newTVar 0 + ackNoMsgErrs <- newTVar 0 + ackOtherErrs <- newTVar 0 connCreated <- newTVar 0 connSecured <- newTVar 0 connCompleted <- newTVar 0 connDeleted <- newTVar 0 + connDelAttempts <- newTVar 0 + connDelErrs <- newTVar 0 connSubscribed <- newTVar 0 connSubAttempts <- newTVar 0 + connSubIgnored <- newTVar 0 connSubErrs <- newTVar 0 pure AgentSMPServerStats @@ -102,15 +125,55 @@ newAgentSMPServerStats = do recvDuplicates, recvCryptoErrs, recvErrs, + ackMsgs, + ackAttempts, + ackNoMsgErrs, + ackOtherErrs, connCreated, connSecured, connCompleted, connDeleted, + connDelAttempts, + connDelErrs, connSubscribed, connSubAttempts, + connSubIgnored, connSubErrs } +newAgentSMPServerStatsData :: AgentSMPServerStatsData +newAgentSMPServerStatsData = + AgentSMPServerStatsData + { _sentDirect = 0, + _sentViaProxy = 0, + _sentProxied = 0, + _sentDirectAttempts = 0, + _sentViaProxyAttempts = 0, + _sentProxiedAttempts = 0, + _sentAuthErrs = 0, + _sentQuotaErrs = 0, + _sentExpiredErrs = 0, + _sentOtherErrs = 0, + _recvMsgs = 0, + _recvDuplicates = 0, + _recvCryptoErrs = 0, + _recvErrs = 0, + _ackMsgs = 0, + _ackAttempts = 0, + _ackNoMsgErrs = 0, + _ackOtherErrs = 0, + _connCreated = 0, + _connSecured = 0, + _connCompleted = 0, + _connDeleted = 0, + _connDelAttempts = 0, + _connDelErrs = 0, + _connSubscribed = 0, + _connSubAttempts = 0, + _connSubIgnored = 0, + _connSubErrs = 0 + } + newAgentSMPServerStats' :: AgentSMPServerStatsData -> STM AgentSMPServerStats newAgentSMPServerStats' s = do sentDirect <- newTVar $ _sentDirect s @@ -127,12 +190,19 @@ newAgentSMPServerStats' s = do recvDuplicates <- newTVar $ _recvDuplicates s recvCryptoErrs <- newTVar $ _recvCryptoErrs s recvErrs <- newTVar $ _recvErrs s + ackMsgs <- newTVar $ _ackMsgs s + ackAttempts <- newTVar $ _ackAttempts s + ackNoMsgErrs <- newTVar $ _ackNoMsgErrs s + ackOtherErrs <- newTVar $ _ackOtherErrs s connCreated <- newTVar $ _connCreated s connSecured <- newTVar $ _connSecured s connCompleted <- newTVar $ _connCompleted s connDeleted <- newTVar $ _connDeleted s + connDelAttempts <- newTVar $ _connDelAttempts s + connDelErrs <- newTVar $ _connDelErrs s connSubscribed <- newTVar $ _connSubscribed s connSubAttempts <- newTVar $ _connSubAttempts s + connSubIgnored <- newTVar $ _connSubIgnored s connSubErrs <- newTVar $ _connSubErrs s pure AgentSMPServerStats @@ -150,12 +220,19 @@ newAgentSMPServerStats' s = do recvDuplicates, recvCryptoErrs, recvErrs, + ackMsgs, + ackAttempts, + ackNoMsgErrs, + ackOtherErrs, connCreated, connSecured, connCompleted, connDeleted, + connDelAttempts, + connDelErrs, connSubscribed, connSubAttempts, + connSubIgnored, connSubErrs } @@ -177,12 +254,19 @@ getAgentSMPServerStats s = do _recvDuplicates <- readTVarIO $ recvDuplicates s _recvCryptoErrs <- readTVarIO $ recvCryptoErrs s _recvErrs <- readTVarIO $ recvErrs s + _ackMsgs <- readTVarIO $ ackMsgs s + _ackAttempts <- readTVarIO $ ackAttempts s + _ackNoMsgErrs <- readTVarIO $ ackNoMsgErrs s + _ackOtherErrs <- readTVarIO $ ackOtherErrs s _connCreated <- readTVarIO $ connCreated s _connSecured <- readTVarIO $ connSecured s _connCompleted <- readTVarIO $ connCompleted s _connDeleted <- readTVarIO $ connDeleted s + _connDelAttempts <- readTVarIO $ connDelAttempts s + _connDelErrs <- readTVarIO $ connDelErrs s _connSubscribed <- readTVarIO $ connSubscribed s _connSubAttempts <- readTVarIO $ connSubAttempts s + _connSubIgnored <- readTVarIO $ connSubIgnored s _connSubErrs <- readTVarIO $ connSubErrs s pure AgentSMPServerStatsData @@ -200,20 +284,62 @@ getAgentSMPServerStats s = do _recvDuplicates, _recvCryptoErrs, _recvErrs, + _ackMsgs, + _ackAttempts, + _ackNoMsgErrs, + _ackOtherErrs, _connCreated, _connSecured, _connCompleted, _connDeleted, + _connDelAttempts, + _connDelErrs, _connSubscribed, _connSubAttempts, + _connSubIgnored, _connSubErrs } +addSMPStatsData :: AgentSMPServerStatsData -> AgentSMPServerStatsData -> AgentSMPServerStatsData +addSMPStatsData sd1 sd2 = + AgentSMPServerStatsData + { _sentDirect = _sentDirect sd1 + _sentDirect sd2, + _sentViaProxy = _sentViaProxy sd1 + _sentViaProxy sd2, + _sentProxied = _sentProxied sd1 + _sentProxied sd2, + _sentDirectAttempts = _sentDirectAttempts sd1 + _sentDirectAttempts sd2, + _sentViaProxyAttempts = _sentViaProxyAttempts sd1 + _sentViaProxyAttempts sd2, + _sentProxiedAttempts = _sentProxiedAttempts sd1 + _sentProxiedAttempts sd2, + _sentAuthErrs = _sentAuthErrs sd1 + _sentAuthErrs sd2, + _sentQuotaErrs = _sentQuotaErrs sd1 + _sentQuotaErrs sd2, + _sentExpiredErrs = _sentExpiredErrs sd1 + _sentExpiredErrs sd2, + _sentOtherErrs = _sentOtherErrs sd1 + _sentOtherErrs sd2, + _recvMsgs = _recvMsgs sd1 + _recvMsgs sd2, + _recvDuplicates = _recvDuplicates sd1 + _recvDuplicates sd2, + _recvCryptoErrs = _recvCryptoErrs sd1 + _recvCryptoErrs sd2, + _recvErrs = _recvErrs sd1 + _recvErrs sd2, + _ackMsgs = _ackMsgs sd1 + _ackMsgs sd2, + _ackAttempts = _ackAttempts sd1 + _ackAttempts sd2, + _ackNoMsgErrs = _ackNoMsgErrs sd1 + _ackNoMsgErrs sd2, + _ackOtherErrs = _ackOtherErrs sd1 + _ackOtherErrs sd2, + _connCreated = _connCreated sd1 + _connCreated sd2, + _connSecured = _connSecured sd1 + _connSecured sd2, + _connCompleted = _connCompleted sd1 + _connCompleted sd2, + _connDeleted = _connDeleted sd1 + _connDeleted sd2, + _connDelAttempts = _connDelAttempts sd1 + _connDelAttempts sd2, + _connDelErrs = _connDelErrs sd1 + _connDelErrs sd2, + _connSubscribed = _connSubscribed sd1 + _connSubscribed sd2, + _connSubAttempts = _connSubAttempts sd1 + _connSubAttempts sd2, + _connSubIgnored = _connSubIgnored sd1 + _connSubIgnored sd2, + _connSubErrs = _connSubErrs sd1 + _connSubErrs sd2 + } + data AgentXFTPServerStats = AgentXFTPServerStats { uploads :: TVar Int, -- total replicas uploaded to server + uploadsSize :: TVar Int64, -- total size of uploaded replicas in KB uploadAttempts :: TVar Int, -- upload attempts uploadErrs :: TVar Int, -- upload errors downloads :: TVar Int, -- total replicas downloaded from server + downloadsSize :: TVar Int64, -- total size of downloaded replicas in KB downloadAttempts :: TVar Int, -- download attempts downloadAuthErrs :: TVar Int, -- download AUTH errors downloadErrs :: TVar Int, -- other download errors (excluding above) @@ -224,9 +350,11 @@ data AgentXFTPServerStats = AgentXFTPServerStats data AgentXFTPServerStatsData = AgentXFTPServerStatsData { _uploads :: Int, + _uploadsSize :: Int64, _uploadAttempts :: Int, _uploadErrs :: Int, _downloads :: Int, + _downloadsSize :: Int64, _downloadAttempts :: Int, _downloadAuthErrs :: Int, _downloadErrs :: Int, @@ -239,9 +367,11 @@ data AgentXFTPServerStatsData = AgentXFTPServerStatsData newAgentXFTPServerStats :: STM AgentXFTPServerStats newAgentXFTPServerStats = do uploads <- newTVar 0 + uploadsSize <- newTVar 0 uploadAttempts <- newTVar 0 uploadErrs <- newTVar 0 downloads <- newTVar 0 + downloadsSize <- newTVar 0 downloadAttempts <- newTVar 0 downloadAuthErrs <- newTVar 0 downloadErrs <- newTVar 0 @@ -251,9 +381,11 @@ newAgentXFTPServerStats = do pure AgentXFTPServerStats { uploads, + uploadsSize, uploadAttempts, uploadErrs, downloads, + downloadsSize, downloadAttempts, downloadAuthErrs, downloadErrs, @@ -262,12 +394,31 @@ newAgentXFTPServerStats = do deleteErrs } +newAgentXFTPServerStatsData :: AgentXFTPServerStatsData +newAgentXFTPServerStatsData = + AgentXFTPServerStatsData + { _uploads = 0, + _uploadsSize = 0, + _uploadAttempts = 0, + _uploadErrs = 0, + _downloads = 0, + _downloadsSize = 0, + _downloadAttempts = 0, + _downloadAuthErrs = 0, + _downloadErrs = 0, + _deletions = 0, + _deleteAttempts = 0, + _deleteErrs = 0 + } + newAgentXFTPServerStats' :: AgentXFTPServerStatsData -> STM AgentXFTPServerStats newAgentXFTPServerStats' s = do uploads <- newTVar $ _uploads s + uploadsSize <- newTVar $ _uploadsSize s uploadAttempts <- newTVar $ _uploadAttempts s uploadErrs <- newTVar $ _uploadErrs s downloads <- newTVar $ _downloads s + downloadsSize <- newTVar $ _downloadsSize s downloadAttempts <- newTVar $ _downloadAttempts s downloadAuthErrs <- newTVar $ _downloadAuthErrs s downloadErrs <- newTVar $ _downloadErrs s @@ -277,9 +428,11 @@ newAgentXFTPServerStats' s = do pure AgentXFTPServerStats { uploads, + uploadsSize, uploadAttempts, uploadErrs, downloads, + downloadsSize, downloadAttempts, downloadAuthErrs, downloadErrs, @@ -293,9 +446,11 @@ newAgentXFTPServerStats' s = do getAgentXFTPServerStats :: AgentXFTPServerStats -> IO AgentXFTPServerStatsData getAgentXFTPServerStats s = do _uploads <- readTVarIO $ uploads s + _uploadsSize <- readTVarIO $ uploadsSize s _uploadAttempts <- readTVarIO $ uploadAttempts s _uploadErrs <- readTVarIO $ uploadErrs s _downloads <- readTVarIO $ downloads s + _downloadsSize <- readTVarIO $ downloadsSize s _downloadAttempts <- readTVarIO $ downloadAttempts s _downloadAuthErrs <- readTVarIO $ downloadAuthErrs s _downloadErrs <- readTVarIO $ downloadErrs s @@ -305,9 +460,11 @@ getAgentXFTPServerStats s = do pure AgentXFTPServerStatsData { _uploads, + _uploadsSize, _uploadAttempts, _uploadErrs, _downloads, + _downloadsSize, _downloadAttempts, _downloadAuthErrs, _downloadErrs, @@ -316,6 +473,23 @@ getAgentXFTPServerStats s = do _deleteErrs } +addXFTPStatsData :: AgentXFTPServerStatsData -> AgentXFTPServerStatsData -> AgentXFTPServerStatsData +addXFTPStatsData sd1 sd2 = + AgentXFTPServerStatsData + { _uploads = _uploads sd1 + _uploads sd2, + _uploadsSize = _uploadsSize sd1 + _uploadsSize sd2, + _uploadAttempts = _uploadAttempts sd1 + _uploadAttempts sd2, + _uploadErrs = _uploadErrs sd1 + _uploadErrs sd2, + _downloads = _downloads sd1 + _downloads sd2, + _downloadsSize = _downloadsSize sd1 + _downloadsSize sd2, + _downloadAttempts = _downloadAttempts sd1 + _downloadAttempts sd2, + _downloadAuthErrs = _downloadAuthErrs sd1 + _downloadAuthErrs sd2, + _downloadErrs = _downloadErrs sd1 + _downloadErrs sd2, + _deletions = _deletions sd1 + _deletions sd2, + _deleteAttempts = _deleteAttempts sd1 + _deleteAttempts sd2, + _deleteErrs = _deleteErrs sd1 + _deleteErrs sd2 + } + -- Type for gathering both smp and xftp stats across all users and servers, -- to then be persisted to db as a single json. data AgentPersistedServerStats = AgentPersistedServerStats diff --git a/src/Simplex/Messaging/Agent/Store.hs b/src/Simplex/Messaging/Agent/Store.hs index baec2ef93..ae010a884 100644 --- a/src/Simplex/Messaging/Agent/Store.hs +++ b/src/Simplex/Messaging/Agent/Store.hs @@ -175,6 +175,12 @@ instance SMPQueue RcvQueue where queueId RcvQueue {rcvId} = rcvId {-# INLINE queueId #-} +instance SMPQueue NewRcvQueue where + qServer RcvQueue {server} = server + {-# INLINE qServer #-} + queueId RcvQueue {rcvId} = rcvId + {-# INLINE queueId #-} + instance SMPQueue SndQueue where qServer SndQueue {server} = server {-# INLINE qServer #-} diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index d4cd99b39..0727343e7 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -3041,10 +3041,9 @@ getServersStats db = firstRow id SEServersStatsNotFound $ DB.query_ db "SELECT started_at, servers_stats FROM servers_stats WHERE servers_stats_id = 1" -resetServersStats :: DB.Connection -> IO () -resetServersStats db = do - currentTs <- getCurrentTime - DB.execute db "UPDATE servers_stats SET servers_stats = NULL, started_at = ?, updated_at = ? WHERE servers_stats_id = 1" (currentTs, currentTs) +resetServersStats :: DB.Connection -> UTCTime -> IO () +resetServersStats db startedAt = + DB.execute db "UPDATE servers_stats SET servers_stats = NULL, started_at = ?, updated_at = ? WHERE servers_stats_id = 1" (startedAt, startedAt) $(J.deriveJSON defaultJSON ''UpMigration) diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs index 2279d7ea5..131561f4d 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs @@ -73,6 +73,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240223_connections_wai import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240225_ratchet_kem import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240417_rcv_files_approved_relays import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240624_snd_secure +import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240702_servers_stats import Simplex.Messaging.Encoding.String import Simplex.Messaging.Parsers (dropPrefix, sumTypeJSON) import Simplex.Messaging.Transport.Client (TransportHost) @@ -114,7 +115,8 @@ schemaMigrations = ("m20240223_connections_wait_delivery", m20240223_connections_wait_delivery, Just down_m20240223_connections_wait_delivery), ("m20240225_ratchet_kem", m20240225_ratchet_kem, Just down_m20240225_ratchet_kem), ("m20240417_rcv_files_approved_relays", m20240417_rcv_files_approved_relays, Just down_m20240417_rcv_files_approved_relays), - ("m20240624_snd_secure", m20240624_snd_secure, Just down_m20240624_snd_secure) + ("m20240624_snd_secure", m20240624_snd_secure, Just down_m20240624_snd_secure), + ("m20240702_servers_stats", m20240702_servers_stats, Just down_m20240702_servers_stats) ] -- | The list of migrations in ascending order by date diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20240518_servers_stats.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20240702_servers_stats.hs similarity index 79% rename from src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20240518_servers_stats.hs rename to src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20240702_servers_stats.hs index fe017e233..5e283d8b1 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20240518_servers_stats.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20240702_servers_stats.hs @@ -1,6 +1,6 @@ {-# LANGUAGE QuasiQuotes #-} -module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240518_servers_stats where +module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240702_servers_stats where import Database.SQLite.Simple (Query) import Database.SQLite.Simple.QQ (sql) @@ -8,8 +8,8 @@ import Database.SQLite.Simple.QQ (sql) -- servers_stats_id: dummy id, there should always only be one record with servers_stats_id = 1 -- servers_stats: overall accumulated stats, past and session, reset to null on stats reset -- started_at: starting point of tracking stats, reset on stats reset -m20240518_servers_stats :: Query -m20240518_servers_stats = +m20240702_servers_stats :: Query +m20240702_servers_stats = [sql| CREATE TABLE servers_stats( servers_stats_id INTEGER PRIMARY KEY, @@ -22,8 +22,8 @@ CREATE TABLE servers_stats( INSERT INTO servers_stats (servers_stats_id) VALUES (1); |] -down_m20240518_servers_stats :: Query -down_m20240518_servers_stats = +down_m20240702_servers_stats :: Query +down_m20240702_servers_stats = [sql| DROP TABLE servers_stats; |] diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql index b9d2d945f..80af08989 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql @@ -396,6 +396,13 @@ CREATE TABLE processed_ratchet_key_hashes( created_at TEXT NOT NULL DEFAULT(datetime('now')), updated_at TEXT NOT NULL DEFAULT(datetime('now')) ); +CREATE TABLE servers_stats( + servers_stats_id INTEGER PRIMARY KEY, + servers_stats TEXT, + started_at TEXT NOT NULL DEFAULT(datetime('now')), + created_at TEXT NOT NULL DEFAULT(datetime('now')), + updated_at TEXT NOT NULL DEFAULT(datetime('now')) +); CREATE UNIQUE INDEX idx_rcv_queues_ntf ON rcv_queues(host, port, ntf_id); CREATE UNIQUE INDEX idx_rcv_queue_id ON rcv_queues(conn_id, rcv_queue_id); CREATE UNIQUE INDEX idx_snd_queue_id ON snd_queues(conn_id, snd_queue_id);