agent: servers stats improvements, fixes (#1208)

* agent: reset stats startedAt time in memory

* getAgentSubsSummary

* change sub counting

* ack statistics

* add import

* instance

* Revert "instance"

This reverts commit 1f63740d56.

* Revert "add import"

This reverts commit ef72df8014.

* modify sub counting

* modify conn creation counting

* use int64

* file size stats

* remove import

* ack err counting

* conn del stats

* format

* new data

* add data

* toKB

* restore connCompleted

* use Int for counts

* use rq from scope

* remove getAgentSubsSummary

* fix connCompleted

* fix

* revert disabling stats

* use srv from scope

* combine ack stats

* modify

* comment

* count subs

* refactor

---------

Co-authored-by: Evgeny Poberezkin <evgeny@poberezkin.com>
This commit is contained in:
spaced4ndy
2024-07-03 15:36:15 +04:00
committed by GitHub
parent f392ce0a93
commit ae8e1c5e9a
11 changed files with 295 additions and 59 deletions

View File

@@ -133,8 +133,8 @@ library
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240223_connections_wait_delivery
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240225_ratchet_kem
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240417_rcv_files_approved_relays
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240518_servers_stats
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240624_snd_secure
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240702_servers_stats
Simplex.Messaging.Agent.TRcvQueues
Simplex.Messaging.Client
Simplex.Messaging.Client.Agent

View File

@@ -49,6 +49,7 @@ import qualified Data.Set as S
import Data.Text (Text)
import Data.Time.Clock (getCurrentTime)
import Data.Time.Format (defaultTimeLocale, formatTime)
import Simplex.FileTransfer.Chunks (toKB)
import Simplex.FileTransfer.Client (XFTPChunkSpec (..))
import Simplex.FileTransfer.Client.Main
import Simplex.FileTransfer.Crypto
@@ -206,7 +207,8 @@ runXFTPRcvWorker c srv Worker {doWork} = do
unlessM ((approvedRelays ||) <$> ipAddressProtected') $ throwE $ FILE NOT_APPROVED
fsFileTmpPath <- lift $ toFSFilePath fileTmpPath
chunkPath <- uniqueCombine fsFileTmpPath $ show chunkNo
let chunkSpec = XFTPRcvChunkSpec chunkPath (unFileSize chunkSize) (unFileDigest digest)
let chSize = unFileSize chunkSize
chunkSpec = XFTPRcvChunkSpec chunkPath chSize (unFileDigest digest)
relChunkPath = fileTmpPath </> takeFileName chunkPath
agentXFTPDownloadChunk c userId digest replica chunkSpec
atomically $ waitUntilForeground c
@@ -221,6 +223,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do
liftIO . when complete $ updateRcvFileStatus db rcvFileId RFSReceived
pure (entityId, complete, RFPROG rcvd total)
atomically $ incXFTPServerStat c userId srv downloads
atomically $ incXFTPServerSizeStat c userId srv downloadsSize (fromIntegral $ toKB chSize)
notify c entityId progress
when complete . lift . void $
getXFTPRcvWorker True c Nothing
@@ -506,7 +509,7 @@ runXFTPSndWorker c srv Worker {doWork} = do
atomically $ incXFTPServerStat c userId srv uploadErrs
sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) e
uploadFileChunk :: AgentConfig -> SndFileChunk -> SndFileChunkReplica -> AM ()
uploadFileChunk AgentConfig {xftpMaxRecipientsPerRequest = maxRecipients} sndFileChunk@SndFileChunk {sndFileId, userId, chunkSpec = chunkSpec@XFTPChunkSpec {filePath}, digest = chunkDigest} replica = do
uploadFileChunk AgentConfig {xftpMaxRecipientsPerRequest = maxRecipients} sndFileChunk@SndFileChunk {sndFileId, userId, chunkSpec = chunkSpec@XFTPChunkSpec {filePath, chunkSize = chSize}, digest = chunkDigest} replica = do
replica'@SndFileChunkReplica {sndChunkReplicaId} <- addRecipients sndFileChunk replica
fsFilePath <- lift $ toFSFilePath filePath
unlessM (doesFileExist fsFilePath) $ throwE $ FILE NO_FILE
@@ -521,6 +524,7 @@ runXFTPSndWorker c srv Worker {doWork} = do
total = totalSize chunks
complete = all chunkUploaded chunks
atomically $ incXFTPServerStat c userId srv uploads
atomically $ incXFTPServerSizeStat c userId srv uploadsSize (fromIntegral $ toKB chSize)
notify c sndFileEntityId $ SFPROG uploaded total
when complete $ do
(sndDescr, rcvDescrs) <- sndFileToDescrs sf

View File

@@ -26,6 +26,10 @@ kb :: Integral a => a -> a
kb n = 1024 * n
{-# INLINE kb #-}
toKB :: Integral a => a -> a
toKB n = n `div` 1024
{-# INLINE toKB #-}
mb :: Integral a => a -> a
mb n = 1024 * kb n
{-# INLINE mb #-}

View File

@@ -157,6 +157,7 @@ import Simplex.Messaging.Agent.NtfSubSupervisor
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.Stats
import Simplex.Messaging.Agent.Stats (AgentSMPServerStats (connSubErrs))
import Simplex.Messaging.Agent.Store
import Simplex.Messaging.Agent.Store.SQLite
import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
@@ -210,14 +211,14 @@ getSMPAgentClient_ clientId cfg initServers store backgroundMode =
runAgentThreads c
| backgroundMode = run c "subscriber" $ subscriber c
| otherwise = do
-- restoreServersStats c
restoreServersStats c
raceAny_
[ run c "subscriber" $ subscriber c,
run c "runNtfSupervisor" $ runNtfSupervisor c,
run c "cleanupManager" $ cleanupManager c
-- run c "logServersStats" $ logServersStats c
run c "cleanupManager" $ cleanupManager c,
run c "logServersStats" $ logServersStats c
]
-- `E.finally` saveServersStats c
`E.finally` saveServersStats c
run AgentClient {subQ, acThread} name a =
a `E.catchAny` \e -> whenM (isJust <$> readTVarIO acThread) $ do
logError $ "Agent thread " <> name <> " crashed: " <> tshow e
@@ -234,13 +235,12 @@ logServersStats c = do
saveServersStats :: AgentClient -> AM' ()
saveServersStats c@AgentClient {subQ, smpServersStats, xftpServersStats} = do
-- sss <- mapM (lift . getAgentSMPServerStats) =<< readTVarIO smpServersStats
-- xss <- mapM (lift . getAgentXFTPServerStats) =<< readTVarIO xftpServersStats
-- let stats = AgentPersistedServerStats {smpServersStats = sss, xftpServersStats = xss}
-- tryAgentError' (withStore' c (`updateServersStats` stats)) >>= \case
-- Left e -> atomically $ writeTBQueue subQ ("", "", AEvt SAEConn $ ERR $ INTERNAL $ show e)
-- Right () -> pure ()
pure ()
sss <- mapM (lift . getAgentSMPServerStats) =<< readTVarIO smpServersStats
xss <- mapM (lift . getAgentXFTPServerStats) =<< readTVarIO xftpServersStats
let stats = AgentPersistedServerStats {smpServersStats = sss, xftpServersStats = xss}
tryAgentError' (withStore' c (`updateServersStats` stats)) >>= \case
Left e -> atomically $ writeTBQueue subQ ("", "", AEvt SAEConn $ ERR $ INTERNAL $ show e)
Right () -> pure ()
restoreServersStats :: AgentClient -> AM' ()
restoreServersStats c@AgentClient {smpServersStats, xftpServersStats, srvStatsStartedAt} = do
@@ -885,6 +885,7 @@ createReplyQueue :: AgentClient -> ConnData -> SndQueue -> SubscriptionMode -> S
createReplyQueue c ConnData {userId, connId, enableNtfs} SndQueue {smpClientVersion} subMode srv = do
let sndSecure = False -- smpClientVersion >= sndAuthKeySMPClientVersion
(rq, qUri, tSess, sessId) <- newRcvQueue c userId connId srv (versionToRange smpClientVersion) subMode sndSecure
atomically $ incSMPServerStat c userId (qServer rq) connCreated
let qInfo = toVersionT qUri smpClientVersion
rq' <- withStore c $ \db -> upgradeSndConnToDuplex db connId rq
lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId
@@ -1175,7 +1176,6 @@ runCommandProcessing c@AgentClient {subQ} server_ Worker {doWork} = do
ICDeleteRcvQueue rId -> withServer $ \srv -> tryWithLock "ICDeleteRcvQueue" $ do
rq <- withStore c (\db -> getDeletedRcvQueue db connId srv rId)
deleteQueue c rq
atomically $ incSMPServerStat c userId srv connDeleted
withStore' c (`deleteConnRcvQueue` rq)
ICQSecure rId senderKey ->
withServer $ \srv -> tryWithLock "ICQSecure" . withDuplexConn $ \(DuplexConnection cData rqs sqs) ->
@@ -1425,7 +1425,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} ConnData {connId} sq@SndQueue {userI
withStore' c $ \db -> setSndQueueStatus db sq Active
case rq_ of
-- party initiating connection (in v1)
Just RcvQueue {status} ->
Just rq@RcvQueue {status} ->
-- it is unclear why subscribeQueue was needed here,
-- message delivery can only be enabled for queues that were created in the current session or subscribed
-- subscribeQueue c rq connId
@@ -1435,7 +1435,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} ConnData {connId} sq@SndQueue {userI
-- because it can be sent before HELLO is received
-- With `status == Active` condition, CON is sent here only by the accepting party, that previously received HELLO
when (status == Active) $ do
atomically $ incSMPServerStat c userId server connCompleted
atomically $ incSMPServerStat c userId (qServer rq) connCompleted
notify $ CON pqEncryption
-- this branch should never be reached as receive queue is created before the confirmation,
_ -> logError "HELLO sent without receive queue"
@@ -1627,10 +1627,14 @@ synchronizeRatchet' c connId pqSupport' force = withConnLock c connId "synchroni
_ -> throwE $ CMD PROHIBITED "synchronizeRatchet: not duplex"
ackQueueMessage :: AgentClient -> RcvQueue -> SMP.MsgId -> AM ()
ackQueueMessage c rq srvMsgId =
sendAck c rq srvMsgId `catchAgentError` \case
SMP _ SMP.NO_MSG -> pure ()
e -> throwE e
ackQueueMessage c rq@RcvQueue {userId, server} srvMsgId = do
atomically $ incSMPServerStat c userId server ackAttempts
tryAgentError (sendAck c rq srvMsgId) >>= \case
Right _ -> atomically $ incSMPServerStat c userId server ackMsgs
Left (SMP _ SMP.NO_MSG) -> atomically $ incSMPServerStat c userId server ackNoMsgErrs
Left e -> do
unless (temporaryOrHostError e) $ atomically $ incSMPServerStat c userId server ackOtherErrs
throwE e
-- | Suspend SMP agent connection (OFF command) in Reader monad
suspendConnection' :: AgentClient -> ConnId -> AM ()
@@ -1727,11 +1731,15 @@ deleteConnQueues c waitDelivery ntf rqs = do
Int ->
(RcvQueue, Either AgentErrorType ()) ->
IO ((RcvQueue, Either AgentErrorType ()), Maybe (AM' ()))
deleteQueueRec db maxErrs (rq, r) = case r of
deleteQueueRec db maxErrs (rq@RcvQueue {userId, server}, r) = case r of
Right _ -> deleteConnRcvQueue db rq $> ((rq, r), Just (notifyRQ rq Nothing))
Left e
| temporaryOrHostError e && deleteErrors rq + 1 < maxErrs -> incRcvDeleteErrors db rq $> ((rq, r), Nothing)
| otherwise -> deleteConnRcvQueue db rq $> ((rq, Right ()), Just (notifyRQ rq (Just e)))
| otherwise -> do
deleteConnRcvQueue db rq
-- attempts and successes are counted in deleteQueues function
atomically $ incSMPServerStat c userId server connDeleted
pure ((rq, Right ()), Just (notifyRQ rq (Just e)))
notifyRQ rq e_ = notify ("", qConnId rq, AEvt SAEConn $ DEL_RCVQ (qServer rq) (queueId rq) e_)
notify = when ntf . atomically . writeTBQueue (subQ c)
connResults :: [(RcvQueue, Either AgentErrorType ())] -> Map ConnId (Either AgentErrorType ())
@@ -2009,10 +2017,12 @@ setNtfServers c = atomically . writeTVar (ntfServers c)
{-# INLINE setNtfServers #-}
resetAgentServersStats' :: AgentClient -> AM ()
resetAgentServersStats' c@AgentClient {smpServersStats, xftpServersStats} = do
resetAgentServersStats' c@AgentClient {smpServersStats, xftpServersStats, srvStatsStartedAt} = do
startedAt <- liftIO getCurrentTime
atomically $ writeTVar srvStatsStartedAt startedAt
atomically $ TM.clear smpServersStats
atomically $ TM.clear xftpServersStats
withStore' c resetServersStats
withStore' c (`resetServersStats` startedAt)
-- | Activate operations
foregroundAgent :: AgentClient -> IO ()
@@ -2146,7 +2156,7 @@ data ACKd = ACKd | ACKPending
-- It cannot be finally, as sometimes it needs to be ACK+DEL,
-- and sometimes ACK has to be sent from the consumer.
processSMPTransmissions :: AgentClient -> ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> AM' ()
processSMPTransmissions c@AgentClient {subQ} (tSess@(_, srv, _), _v, sessId, ts) = do
processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId, ts) = do
upConnIds <- newTVarIO []
forM_ ts $ \(entId, t) -> case t of
STEvent msgOrErr ->
@@ -2171,7 +2181,9 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(_, srv, _), _v, sessId, ts)
logServer "<--" c srv entId $ "error: " <> bshow e
notifyErr "" e
connIds <- readTVarIO upConnIds
unless (null connIds) $ notify' "" $ UP srv connIds
unless (null connIds) $ do
notify' "" $ UP srv connIds
atomically $ incSMPServerStat' c userId srv connSubscribed $ length connIds
where
withRcvConn :: SMP.RecipientId -> (forall c. RcvQueue -> Connection c -> AM ()) -> AM' ()
withRcvConn rId a = do
@@ -2182,17 +2194,19 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(_, srv, _), _v, sessId, ts)
Left e -> notify' connId (ERR e)
Right () -> pure ()
processSubOk :: RcvQueue -> TVar [ConnId] -> AM ()
processSubOk rq@RcvQueue {userId, connId} upConnIds = do
processSubOk rq@RcvQueue {connId} upConnIds =
atomically . whenM (isPendingSub connId) $ do
addSubscription c rq
modifyTVar' upConnIds (connId :)
atomically $ incSMPServerStat c userId srv connSubscribed
processSubErr :: RcvQueue -> SMPClientError -> AM ()
processSubErr rq@RcvQueue {userId, connId} e = do
atomically . whenM (isPendingSub connId) $ failSubscription c rq e
atomically $ incSMPServerStat c userId srv connSubErrs
processSubErr rq@RcvQueue {connId} e = do
atomically . whenM (isPendingSub connId) $
failSubscription c rq e >> incSMPServerStat c userId srv connSubErrs
lift $ notifyErr connId e
isPendingSub connId = (&&) <$> hasPendingSubscription c connId <*> activeClientSession c tSess sessId
isPendingSub connId = do
pending <- (&&) <$> hasPendingSubscription c connId <*> activeClientSession c tSess sessId
unless pending $ incSMPServerStat c userId srv connSubIgnored
pure pending
notify' :: forall e m. (AEntityI e, MonadIO m) => ConnId -> AEvent e -> m ()
notify' connId msg = atomically $ writeTBQueue subQ ("", connId, AEvt (sAEntity @e) msg)
notifyErr :: ConnId -> SMPClientError -> AM' ()
@@ -2201,7 +2215,7 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(_, srv, _), _v, sessId, ts)
processSMP
rq@RcvQueue {rcvId = rId, sndSecure, e2ePrivKey, e2eDhSecret, status}
conn
cData@ConnData {userId, connId, connAgentVersion, ratchetSyncState = rss}
cData@ConnData {connId, connAgentVersion, ratchetSyncState = rss}
smpMsg =
withConnLock c connId "processSMP" $ case smpMsg of
SMP.MSG msg@SMP.RcvMessage {msgId = srvMsgId} -> do

View File

@@ -142,6 +142,8 @@ module Simplex.Messaging.Agent.Client
incSMPServerStat,
incSMPServerStat',
incXFTPServerStat,
incXFTPServerStat',
incXFTPServerSizeStat,
AgentWorkersDetails (..),
getAgentWorkersDetails,
AgentWorkersSummary (..),
@@ -174,8 +176,9 @@ import Data.Bifunctor (bimap, first, second)
import Data.ByteString.Base64
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Either (partitionEithers)
import Data.Either (isRight, partitionEithers)
import Data.Functor (($>))
import Data.Int (Int64)
import Data.List (deleteFirstsBy, foldl', partition, (\\))
import Data.List.NonEmpty (NonEmpty (..), (<|))
import qualified Data.List.NonEmpty as L
@@ -1329,13 +1332,16 @@ newRcvQueue c userId connId (ProtoServerWithAuth srv auth) vRange subMode sender
pure (rq, qUri, tSess, sessId)
processSubResult :: AgentClient -> RcvQueue -> Either SMPClientError () -> STM ()
processSubResult c rq@RcvQueue {connId} = \case
processSubResult c rq@RcvQueue {userId, server, connId} = \case
Left e ->
unless (temporaryClientError e) $
unless (temporaryClientError e) $ do
incSMPServerStat c userId server connSubErrs
failSubscription c rq e
Right () ->
whenM (hasPendingSubscription c connId) $
addSubscription c rq
ifM
(hasPendingSubscription c connId)
(incSMPServerStat c userId server connSubscribed >> addSubscription c rq)
(incSMPServerStat c userId server connSubIgnored)
temporaryAgentError :: AgentErrorType -> Bool
temporaryAgentError = \case
@@ -1387,14 +1393,14 @@ subscribeQueues c qs = do
subscribeQueues_ :: Env -> TVar (Maybe SessionId) -> SMPClient -> NonEmpty RcvQueue -> IO (BatchResponses SMPClientError ())
subscribeQueues_ env session smp qs' = do
let (userId, srv, _) = transportSession' smp
atomically $ incSMPServerStat' c userId srv connSubAttempts (length qs')
atomically $ incSMPServerStat' c userId srv connSubAttempts $ length qs'
rs <- sendBatch subscribeSMPQueues smp qs'
active <-
atomically $
ifM
(activeClientSession c tSess sessId)
(writeTVar session (Just sessId) >> processSubResults rs $> True)
(pure False)
(incSMPServerStat' c userId srv connSubIgnored (length rs) $> False)
if active
then when (hasTempErrors rs) resubscribe $> rs
else do
@@ -1603,7 +1609,15 @@ deleteQueue c rq@RcvQueue {rcvId, rcvPrivateKey} = do
deleteSMPQueue smp rcvPrivateKey rcvId
deleteQueues :: AgentClient -> [RcvQueue] -> AM' [(RcvQueue, Either AgentErrorType ())]
deleteQueues = sendTSessionBatches "DEL" id $ sendBatch deleteSMPQueues
deleteQueues c = sendTSessionBatches "DEL" id deleteQueues_ c
where
deleteQueues_ smp rqs = do
let (userId, srv, _) = transportSession' smp
atomically $ incSMPServerStat' c userId srv connDelAttempts $ length rqs
rs <- sendBatch deleteSMPQueues smp rqs
let successes = foldl' (\n (_, r) -> if isRight r then n + 1 else n) 0 rs
atomically $ incSMPServerStat' c userId srv connDeleted successes
pure rs
sendAgentMessage :: AgentClient -> SndQueue -> MsgFlags -> ByteString -> AM (Maybe SMPServer)
sendAgentMessage c sq@SndQueue {userId, server, sndId, sndPrivateKey} msgFlags agentMsg = do
@@ -1924,12 +1938,24 @@ incSMPServerStat' AgentClient {smpServersStats} userId srv sel n = do
TM.insert (userId, srv) newStats smpServersStats
incXFTPServerStat :: AgentClient -> UserId -> XFTPServer -> (AgentXFTPServerStats -> TVar Int) -> STM ()
incXFTPServerStat AgentClient {xftpServersStats} userId srv sel = do
incXFTPServerStat c userId srv sel = incXFTPServerStat_ c userId srv sel 1
{-# INLINE incXFTPServerStat #-}
incXFTPServerStat' :: AgentClient -> UserId -> XFTPServer -> (AgentXFTPServerStats -> TVar Int) -> Int -> STM ()
incXFTPServerStat' = incXFTPServerStat_
{-# INLINE incXFTPServerStat' #-}
incXFTPServerSizeStat :: AgentClient -> UserId -> XFTPServer -> (AgentXFTPServerStats -> TVar Int64) -> Int64 -> STM ()
incXFTPServerSizeStat = incXFTPServerStat_
{-# INLINE incXFTPServerSizeStat #-}
incXFTPServerStat_ :: Num n => AgentClient -> UserId -> XFTPServer -> (AgentXFTPServerStats -> TVar n) -> n -> STM ()
incXFTPServerStat_ AgentClient {xftpServersStats} userId srv sel n = do
TM.lookup (userId, srv) xftpServersStats >>= \case
Just v -> modifyTVar' (sel v) (+ 1)
Just v -> modifyTVar' (sel v) (+ n)
Nothing -> do
newStats <- newAgentXFTPServerStats
modifyTVar' (sel newStats) (+ 1)
modifyTVar' (sel newStats) (+ n)
TM.insert (userId, srv) newStats xftpServersStats
data AgentServersSummary = AgentServersSummary

View File

@@ -5,6 +5,7 @@
module Simplex.Messaging.Agent.Stats where
import qualified Data.Aeson.TH as J
import Data.Int (Int64)
import Data.Map (Map)
import Database.SQLite.Simple.FromField (FromField (..))
import Database.SQLite.Simple.ToField (ToField (..))
@@ -29,12 +30,20 @@ data AgentSMPServerStats = AgentSMPServerStats
recvDuplicates :: TVar Int, -- duplicate messages received
recvCryptoErrs :: TVar Int, -- message decryption errors
recvErrs :: TVar Int, -- receive errors
connCreated :: TVar Int,
connSecured :: TVar Int,
connCompleted :: TVar Int,
connDeleted :: TVar Int,
ackMsgs :: TVar Int, -- total messages acknowledged
ackAttempts :: TVar Int, -- acknowledgement attempts
ackNoMsgErrs :: TVar Int, -- NO_MSG ack errors
ackOtherErrs :: TVar Int, -- other permanent ack errors (temporary accounted for in attempts)
-- conn stats are accounted for rcv queue server
connCreated :: TVar Int, -- total connections created
connSecured :: TVar Int, -- connections secured
connCompleted :: TVar Int, -- connections completed
connDeleted :: TVar Int, -- total connections deleted
connDelAttempts :: TVar Int, -- total connection deletion attempts
connDelErrs :: TVar Int, -- permanent connection deletion errors (temporary accounted for in attempts)
connSubscribed :: TVar Int, -- total successful subscription
connSubAttempts :: TVar Int, -- subscription attempts
connSubIgnored :: TVar Int, -- subscription results ignored (client switched to different session or it was not pending)
connSubErrs :: TVar Int -- permanent subscription errors (temporary accounted for in attempts)
}
@@ -53,12 +62,19 @@ data AgentSMPServerStatsData = AgentSMPServerStatsData
_recvDuplicates :: Int,
_recvCryptoErrs :: Int,
_recvErrs :: Int,
_ackMsgs :: Int,
_ackAttempts :: Int,
_ackNoMsgErrs :: Int,
_ackOtherErrs :: Int,
_connCreated :: Int,
_connSecured :: Int,
_connCompleted :: Int,
_connDeleted :: Int,
_connDelAttempts :: Int,
_connDelErrs :: Int,
_connSubscribed :: Int,
_connSubAttempts :: Int,
_connSubIgnored :: Int,
_connSubErrs :: Int
}
deriving (Show)
@@ -79,12 +95,19 @@ newAgentSMPServerStats = do
recvDuplicates <- newTVar 0
recvCryptoErrs <- newTVar 0
recvErrs <- newTVar 0
ackMsgs <- newTVar 0
ackAttempts <- newTVar 0
ackNoMsgErrs <- newTVar 0
ackOtherErrs <- newTVar 0
connCreated <- newTVar 0
connSecured <- newTVar 0
connCompleted <- newTVar 0
connDeleted <- newTVar 0
connDelAttempts <- newTVar 0
connDelErrs <- newTVar 0
connSubscribed <- newTVar 0
connSubAttempts <- newTVar 0
connSubIgnored <- newTVar 0
connSubErrs <- newTVar 0
pure
AgentSMPServerStats
@@ -102,15 +125,55 @@ newAgentSMPServerStats = do
recvDuplicates,
recvCryptoErrs,
recvErrs,
ackMsgs,
ackAttempts,
ackNoMsgErrs,
ackOtherErrs,
connCreated,
connSecured,
connCompleted,
connDeleted,
connDelAttempts,
connDelErrs,
connSubscribed,
connSubAttempts,
connSubIgnored,
connSubErrs
}
newAgentSMPServerStatsData :: AgentSMPServerStatsData
newAgentSMPServerStatsData =
AgentSMPServerStatsData
{ _sentDirect = 0,
_sentViaProxy = 0,
_sentProxied = 0,
_sentDirectAttempts = 0,
_sentViaProxyAttempts = 0,
_sentProxiedAttempts = 0,
_sentAuthErrs = 0,
_sentQuotaErrs = 0,
_sentExpiredErrs = 0,
_sentOtherErrs = 0,
_recvMsgs = 0,
_recvDuplicates = 0,
_recvCryptoErrs = 0,
_recvErrs = 0,
_ackMsgs = 0,
_ackAttempts = 0,
_ackNoMsgErrs = 0,
_ackOtherErrs = 0,
_connCreated = 0,
_connSecured = 0,
_connCompleted = 0,
_connDeleted = 0,
_connDelAttempts = 0,
_connDelErrs = 0,
_connSubscribed = 0,
_connSubAttempts = 0,
_connSubIgnored = 0,
_connSubErrs = 0
}
newAgentSMPServerStats' :: AgentSMPServerStatsData -> STM AgentSMPServerStats
newAgentSMPServerStats' s = do
sentDirect <- newTVar $ _sentDirect s
@@ -127,12 +190,19 @@ newAgentSMPServerStats' s = do
recvDuplicates <- newTVar $ _recvDuplicates s
recvCryptoErrs <- newTVar $ _recvCryptoErrs s
recvErrs <- newTVar $ _recvErrs s
ackMsgs <- newTVar $ _ackMsgs s
ackAttempts <- newTVar $ _ackAttempts s
ackNoMsgErrs <- newTVar $ _ackNoMsgErrs s
ackOtherErrs <- newTVar $ _ackOtherErrs s
connCreated <- newTVar $ _connCreated s
connSecured <- newTVar $ _connSecured s
connCompleted <- newTVar $ _connCompleted s
connDeleted <- newTVar $ _connDeleted s
connDelAttempts <- newTVar $ _connDelAttempts s
connDelErrs <- newTVar $ _connDelErrs s
connSubscribed <- newTVar $ _connSubscribed s
connSubAttempts <- newTVar $ _connSubAttempts s
connSubIgnored <- newTVar $ _connSubIgnored s
connSubErrs <- newTVar $ _connSubErrs s
pure
AgentSMPServerStats
@@ -150,12 +220,19 @@ newAgentSMPServerStats' s = do
recvDuplicates,
recvCryptoErrs,
recvErrs,
ackMsgs,
ackAttempts,
ackNoMsgErrs,
ackOtherErrs,
connCreated,
connSecured,
connCompleted,
connDeleted,
connDelAttempts,
connDelErrs,
connSubscribed,
connSubAttempts,
connSubIgnored,
connSubErrs
}
@@ -177,12 +254,19 @@ getAgentSMPServerStats s = do
_recvDuplicates <- readTVarIO $ recvDuplicates s
_recvCryptoErrs <- readTVarIO $ recvCryptoErrs s
_recvErrs <- readTVarIO $ recvErrs s
_ackMsgs <- readTVarIO $ ackMsgs s
_ackAttempts <- readTVarIO $ ackAttempts s
_ackNoMsgErrs <- readTVarIO $ ackNoMsgErrs s
_ackOtherErrs <- readTVarIO $ ackOtherErrs s
_connCreated <- readTVarIO $ connCreated s
_connSecured <- readTVarIO $ connSecured s
_connCompleted <- readTVarIO $ connCompleted s
_connDeleted <- readTVarIO $ connDeleted s
_connDelAttempts <- readTVarIO $ connDelAttempts s
_connDelErrs <- readTVarIO $ connDelErrs s
_connSubscribed <- readTVarIO $ connSubscribed s
_connSubAttempts <- readTVarIO $ connSubAttempts s
_connSubIgnored <- readTVarIO $ connSubIgnored s
_connSubErrs <- readTVarIO $ connSubErrs s
pure
AgentSMPServerStatsData
@@ -200,20 +284,62 @@ getAgentSMPServerStats s = do
_recvDuplicates,
_recvCryptoErrs,
_recvErrs,
_ackMsgs,
_ackAttempts,
_ackNoMsgErrs,
_ackOtherErrs,
_connCreated,
_connSecured,
_connCompleted,
_connDeleted,
_connDelAttempts,
_connDelErrs,
_connSubscribed,
_connSubAttempts,
_connSubIgnored,
_connSubErrs
}
addSMPStatsData :: AgentSMPServerStatsData -> AgentSMPServerStatsData -> AgentSMPServerStatsData
addSMPStatsData sd1 sd2 =
AgentSMPServerStatsData
{ _sentDirect = _sentDirect sd1 + _sentDirect sd2,
_sentViaProxy = _sentViaProxy sd1 + _sentViaProxy sd2,
_sentProxied = _sentProxied sd1 + _sentProxied sd2,
_sentDirectAttempts = _sentDirectAttempts sd1 + _sentDirectAttempts sd2,
_sentViaProxyAttempts = _sentViaProxyAttempts sd1 + _sentViaProxyAttempts sd2,
_sentProxiedAttempts = _sentProxiedAttempts sd1 + _sentProxiedAttempts sd2,
_sentAuthErrs = _sentAuthErrs sd1 + _sentAuthErrs sd2,
_sentQuotaErrs = _sentQuotaErrs sd1 + _sentQuotaErrs sd2,
_sentExpiredErrs = _sentExpiredErrs sd1 + _sentExpiredErrs sd2,
_sentOtherErrs = _sentOtherErrs sd1 + _sentOtherErrs sd2,
_recvMsgs = _recvMsgs sd1 + _recvMsgs sd2,
_recvDuplicates = _recvDuplicates sd1 + _recvDuplicates sd2,
_recvCryptoErrs = _recvCryptoErrs sd1 + _recvCryptoErrs sd2,
_recvErrs = _recvErrs sd1 + _recvErrs sd2,
_ackMsgs = _ackMsgs sd1 + _ackMsgs sd2,
_ackAttempts = _ackAttempts sd1 + _ackAttempts sd2,
_ackNoMsgErrs = _ackNoMsgErrs sd1 + _ackNoMsgErrs sd2,
_ackOtherErrs = _ackOtherErrs sd1 + _ackOtherErrs sd2,
_connCreated = _connCreated sd1 + _connCreated sd2,
_connSecured = _connSecured sd1 + _connSecured sd2,
_connCompleted = _connCompleted sd1 + _connCompleted sd2,
_connDeleted = _connDeleted sd1 + _connDeleted sd2,
_connDelAttempts = _connDelAttempts sd1 + _connDelAttempts sd2,
_connDelErrs = _connDelErrs sd1 + _connDelErrs sd2,
_connSubscribed = _connSubscribed sd1 + _connSubscribed sd2,
_connSubAttempts = _connSubAttempts sd1 + _connSubAttempts sd2,
_connSubIgnored = _connSubIgnored sd1 + _connSubIgnored sd2,
_connSubErrs = _connSubErrs sd1 + _connSubErrs sd2
}
data AgentXFTPServerStats = AgentXFTPServerStats
{ uploads :: TVar Int, -- total replicas uploaded to server
uploadsSize :: TVar Int64, -- total size of uploaded replicas in KB
uploadAttempts :: TVar Int, -- upload attempts
uploadErrs :: TVar Int, -- upload errors
downloads :: TVar Int, -- total replicas downloaded from server
downloadsSize :: TVar Int64, -- total size of downloaded replicas in KB
downloadAttempts :: TVar Int, -- download attempts
downloadAuthErrs :: TVar Int, -- download AUTH errors
downloadErrs :: TVar Int, -- other download errors (excluding above)
@@ -224,9 +350,11 @@ data AgentXFTPServerStats = AgentXFTPServerStats
data AgentXFTPServerStatsData = AgentXFTPServerStatsData
{ _uploads :: Int,
_uploadsSize :: Int64,
_uploadAttempts :: Int,
_uploadErrs :: Int,
_downloads :: Int,
_downloadsSize :: Int64,
_downloadAttempts :: Int,
_downloadAuthErrs :: Int,
_downloadErrs :: Int,
@@ -239,9 +367,11 @@ data AgentXFTPServerStatsData = AgentXFTPServerStatsData
newAgentXFTPServerStats :: STM AgentXFTPServerStats
newAgentXFTPServerStats = do
uploads <- newTVar 0
uploadsSize <- newTVar 0
uploadAttempts <- newTVar 0
uploadErrs <- newTVar 0
downloads <- newTVar 0
downloadsSize <- newTVar 0
downloadAttempts <- newTVar 0
downloadAuthErrs <- newTVar 0
downloadErrs <- newTVar 0
@@ -251,9 +381,11 @@ newAgentXFTPServerStats = do
pure
AgentXFTPServerStats
{ uploads,
uploadsSize,
uploadAttempts,
uploadErrs,
downloads,
downloadsSize,
downloadAttempts,
downloadAuthErrs,
downloadErrs,
@@ -262,12 +394,31 @@ newAgentXFTPServerStats = do
deleteErrs
}
newAgentXFTPServerStatsData :: AgentXFTPServerStatsData
newAgentXFTPServerStatsData =
AgentXFTPServerStatsData
{ _uploads = 0,
_uploadsSize = 0,
_uploadAttempts = 0,
_uploadErrs = 0,
_downloads = 0,
_downloadsSize = 0,
_downloadAttempts = 0,
_downloadAuthErrs = 0,
_downloadErrs = 0,
_deletions = 0,
_deleteAttempts = 0,
_deleteErrs = 0
}
newAgentXFTPServerStats' :: AgentXFTPServerStatsData -> STM AgentXFTPServerStats
newAgentXFTPServerStats' s = do
uploads <- newTVar $ _uploads s
uploadsSize <- newTVar $ _uploadsSize s
uploadAttempts <- newTVar $ _uploadAttempts s
uploadErrs <- newTVar $ _uploadErrs s
downloads <- newTVar $ _downloads s
downloadsSize <- newTVar $ _downloadsSize s
downloadAttempts <- newTVar $ _downloadAttempts s
downloadAuthErrs <- newTVar $ _downloadAuthErrs s
downloadErrs <- newTVar $ _downloadErrs s
@@ -277,9 +428,11 @@ newAgentXFTPServerStats' s = do
pure
AgentXFTPServerStats
{ uploads,
uploadsSize,
uploadAttempts,
uploadErrs,
downloads,
downloadsSize,
downloadAttempts,
downloadAuthErrs,
downloadErrs,
@@ -293,9 +446,11 @@ newAgentXFTPServerStats' s = do
getAgentXFTPServerStats :: AgentXFTPServerStats -> IO AgentXFTPServerStatsData
getAgentXFTPServerStats s = do
_uploads <- readTVarIO $ uploads s
_uploadsSize <- readTVarIO $ uploadsSize s
_uploadAttempts <- readTVarIO $ uploadAttempts s
_uploadErrs <- readTVarIO $ uploadErrs s
_downloads <- readTVarIO $ downloads s
_downloadsSize <- readTVarIO $ downloadsSize s
_downloadAttempts <- readTVarIO $ downloadAttempts s
_downloadAuthErrs <- readTVarIO $ downloadAuthErrs s
_downloadErrs <- readTVarIO $ downloadErrs s
@@ -305,9 +460,11 @@ getAgentXFTPServerStats s = do
pure
AgentXFTPServerStatsData
{ _uploads,
_uploadsSize,
_uploadAttempts,
_uploadErrs,
_downloads,
_downloadsSize,
_downloadAttempts,
_downloadAuthErrs,
_downloadErrs,
@@ -316,6 +473,23 @@ getAgentXFTPServerStats s = do
_deleteErrs
}
addXFTPStatsData :: AgentXFTPServerStatsData -> AgentXFTPServerStatsData -> AgentXFTPServerStatsData
addXFTPStatsData sd1 sd2 =
AgentXFTPServerStatsData
{ _uploads = _uploads sd1 + _uploads sd2,
_uploadsSize = _uploadsSize sd1 + _uploadsSize sd2,
_uploadAttempts = _uploadAttempts sd1 + _uploadAttempts sd2,
_uploadErrs = _uploadErrs sd1 + _uploadErrs sd2,
_downloads = _downloads sd1 + _downloads sd2,
_downloadsSize = _downloadsSize sd1 + _downloadsSize sd2,
_downloadAttempts = _downloadAttempts sd1 + _downloadAttempts sd2,
_downloadAuthErrs = _downloadAuthErrs sd1 + _downloadAuthErrs sd2,
_downloadErrs = _downloadErrs sd1 + _downloadErrs sd2,
_deletions = _deletions sd1 + _deletions sd2,
_deleteAttempts = _deleteAttempts sd1 + _deleteAttempts sd2,
_deleteErrs = _deleteErrs sd1 + _deleteErrs sd2
}
-- Type for gathering both smp and xftp stats across all users and servers,
-- to then be persisted to db as a single json.
data AgentPersistedServerStats = AgentPersistedServerStats

View File

@@ -175,6 +175,12 @@ instance SMPQueue RcvQueue where
queueId RcvQueue {rcvId} = rcvId
{-# INLINE queueId #-}
instance SMPQueue NewRcvQueue where
qServer RcvQueue {server} = server
{-# INLINE qServer #-}
queueId RcvQueue {rcvId} = rcvId
{-# INLINE queueId #-}
instance SMPQueue SndQueue where
qServer SndQueue {server} = server
{-# INLINE qServer #-}

View File

@@ -3041,10 +3041,9 @@ getServersStats db =
firstRow id SEServersStatsNotFound $
DB.query_ db "SELECT started_at, servers_stats FROM servers_stats WHERE servers_stats_id = 1"
resetServersStats :: DB.Connection -> IO ()
resetServersStats db = do
currentTs <- getCurrentTime
DB.execute db "UPDATE servers_stats SET servers_stats = NULL, started_at = ?, updated_at = ? WHERE servers_stats_id = 1" (currentTs, currentTs)
resetServersStats :: DB.Connection -> UTCTime -> IO ()
resetServersStats db startedAt =
DB.execute db "UPDATE servers_stats SET servers_stats = NULL, started_at = ?, updated_at = ? WHERE servers_stats_id = 1" (startedAt, startedAt)
$(J.deriveJSON defaultJSON ''UpMigration)

View File

@@ -73,6 +73,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240223_connections_wai
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240225_ratchet_kem
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240417_rcv_files_approved_relays
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240624_snd_secure
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240702_servers_stats
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Parsers (dropPrefix, sumTypeJSON)
import Simplex.Messaging.Transport.Client (TransportHost)
@@ -114,7 +115,8 @@ schemaMigrations =
("m20240223_connections_wait_delivery", m20240223_connections_wait_delivery, Just down_m20240223_connections_wait_delivery),
("m20240225_ratchet_kem", m20240225_ratchet_kem, Just down_m20240225_ratchet_kem),
("m20240417_rcv_files_approved_relays", m20240417_rcv_files_approved_relays, Just down_m20240417_rcv_files_approved_relays),
("m20240624_snd_secure", m20240624_snd_secure, Just down_m20240624_snd_secure)
("m20240624_snd_secure", m20240624_snd_secure, Just down_m20240624_snd_secure),
("m20240702_servers_stats", m20240702_servers_stats, Just down_m20240702_servers_stats)
]
-- | The list of migrations in ascending order by date

View File

@@ -1,6 +1,6 @@
{-# LANGUAGE QuasiQuotes #-}
module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240518_servers_stats where
module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240702_servers_stats where
import Database.SQLite.Simple (Query)
import Database.SQLite.Simple.QQ (sql)
@@ -8,8 +8,8 @@ import Database.SQLite.Simple.QQ (sql)
-- servers_stats_id: dummy id, there should always only be one record with servers_stats_id = 1
-- servers_stats: overall accumulated stats, past and session, reset to null on stats reset
-- started_at: starting point of tracking stats, reset on stats reset
m20240518_servers_stats :: Query
m20240518_servers_stats =
m20240702_servers_stats :: Query
m20240702_servers_stats =
[sql|
CREATE TABLE servers_stats(
servers_stats_id INTEGER PRIMARY KEY,
@@ -22,8 +22,8 @@ CREATE TABLE servers_stats(
INSERT INTO servers_stats (servers_stats_id) VALUES (1);
|]
down_m20240518_servers_stats :: Query
down_m20240518_servers_stats =
down_m20240702_servers_stats :: Query
down_m20240702_servers_stats =
[sql|
DROP TABLE servers_stats;
|]

View File

@@ -396,6 +396,13 @@ CREATE TABLE processed_ratchet_key_hashes(
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);
CREATE TABLE servers_stats(
servers_stats_id INTEGER PRIMARY KEY,
servers_stats TEXT,
started_at TEXT NOT NULL DEFAULT(datetime('now')),
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);
CREATE UNIQUE INDEX idx_rcv_queues_ntf ON rcv_queues(host, port, ntf_id);
CREATE UNIQUE INDEX idx_rcv_queue_id ON rcv_queues(conn_id, rcv_queue_id);
CREATE UNIQUE INDEX idx_snd_queue_id ON snd_queues(conn_id, snd_queue_id);