smp server: add proxy stats (#1157)

* smp-server: add proxy counters

* count simplex.im messages

* update

* fix

* get own servers from INI

* remove export

---------

Co-authored-by: Evgeny Poberezkin <evgeny@poberezkin.com>
This commit is contained in:
Alexander Bondarenko
2024-05-20 19:07:33 +03:00
committed by GitHub
parent 8fe18c4f6d
commit f89d715a99
7 changed files with 271 additions and 84 deletions
-1
View File
@@ -84,7 +84,6 @@ module Simplex.Messaging.Client
ServerTransmissionBatch,
ServerTransmission (..),
ClientCommand,
HostMode (..),
-- * For testing
PCTransmission,
+28 -13
View File
@@ -54,7 +54,7 @@ import UnliftIO.Exception (Exception)
import qualified UnliftIO.Exception as E
import UnliftIO.STM
type SMPClientVar = SessionVar (Either (SMPClientError, Maybe UTCTime) SMPClient)
type SMPClientVar = SessionVar (Either (SMPClientError, Maybe UTCTime) (OwnServer, SMPClient))
data SMPClientAgentEvent
= CAConnected SMPServer
@@ -75,7 +75,8 @@ data SMPClientAgentConfig = SMPClientAgentConfig
persistErrorInterval :: NominalDiffTime,
msgQSize :: Natural,
agentQSize :: Natural,
agentSubsBatchSize :: Int
agentSubsBatchSize :: Int,
ownServerDomains :: [ByteString]
}
defaultSMPClientAgentConfig :: SMPClientAgentConfig
@@ -91,7 +92,8 @@ defaultSMPClientAgentConfig =
persistErrorInterval = 0,
msgQSize = 256,
agentQSize = 256,
agentSubsBatchSize = 900
agentSubsBatchSize = 900,
ownServerDomains = []
}
where
second = 1000000
@@ -103,13 +105,15 @@ data SMPClientAgent = SMPClientAgent
agentQ :: TBQueue SMPClientAgentEvent,
randomDrg :: TVar ChaChaDRG,
smpClients :: TMap SMPServer SMPClientVar,
smpSessions :: TMap SessionId SMPClient,
smpSessions :: TMap SessionId (OwnServer, SMPClient),
srvSubs :: TMap SMPServer (TMap SMPSub C.APrivateAuthKey),
pendingSrvSubs :: TMap SMPServer (TMap SMPSub C.APrivateAuthKey),
smpSubWorkers :: TMap SMPServer (SessionVar (Async ())),
workerSeq :: TVar Int
}
type OwnServer = Bool
newtype InternalException e = InternalException {unInternalException :: e}
deriving (Eq, Show)
@@ -163,13 +167,17 @@ newSMPClientAgent agentCfg@SMPClientAgentConfig {msgQSize, agentQSize} randomDrg
-- | Get or create SMP client for SMPServer
getSMPServerClient' :: SMPClientAgent -> SMPServer -> ExceptT SMPClientError IO SMPClient
getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, workerSeq} srv =
getSMPServerClient' ca srv = snd <$> getSMPServerClient'' ca srv
{-# INLINE getSMPServerClient' #-}
getSMPServerClient'' :: SMPClientAgent -> SMPServer -> ExceptT SMPClientError IO (OwnServer, SMPClient)
getSMPServerClient'' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, workerSeq} srv =
atomically getClientVar >>= either (ExceptT . newSMPClient) waitForSMPClient
where
getClientVar :: STM (Either SMPClientVar SMPClientVar)
getClientVar = getSessVar workerSeq srv smpClients
waitForSMPClient :: SMPClientVar -> ExceptT SMPClientError IO SMPClient
waitForSMPClient :: SMPClientVar -> ExceptT SMPClientError IO (OwnServer, SMPClient)
waitForSMPClient v = do
let ProtocolClientConfig {networkConfig = NetworkConfig {tcpConnectTimeout}} = smpCfg agentCfg
smpClient_ <- liftIO $ tcpConnectTimeout `timeout` atomically (readTMVar $ sessionVar v)
@@ -179,20 +187,22 @@ getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, worker
Just (Left (e, Just ts)) ->
ifM
((ts <) <$> liftIO getCurrentTime)
(atomically (removeSessVar v srv smpClients) >> getSMPServerClient' ca srv)
(atomically (removeSessVar v srv smpClients) >> getSMPServerClient'' ca srv)
(throwE e)
Nothing -> throwE PCEResponseTimeout
newSMPClient :: SMPClientVar -> IO (Either SMPClientError SMPClient)
newSMPClient :: SMPClientVar -> IO (Either SMPClientError (OwnServer, SMPClient))
newSMPClient v = do
r <- connectClient ca srv v `E.catch` (pure . Left . PCEIOError)
case r of
Right smp -> do
logInfo . decodeUtf8 $ "Agent connected to " <> showServer srv
let c = (isOwnServer ca srv, smp)
atomically $ do
putTMVar (sessionVar v) (Right smp)
TM.insert (sessionId $ thParams smp) smp smpSessions
putTMVar (sessionVar v) (Right c)
TM.insert (sessionId $ thParams smp) c smpSessions
notify ca $ CAConnected srv
pure $ Right c
Left e -> do
if persistErrorInterval agentCfg == 0 || e == PCENetworkError || e == PCEResponseTimeout
then atomically $ do
@@ -202,7 +212,12 @@ getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, worker
ts <- addUTCTime (persistErrorInterval agentCfg) <$> liftIO getCurrentTime
atomically $ putTMVar (sessionVar v) (Left (e, Just ts))
reconnectClient ca srv
pure r
pure $ Left e
isOwnServer :: SMPClientAgent -> SMPServer -> OwnServer
isOwnServer SMPClientAgent {agentCfg} ProtocolServer {host} =
let srv = strEncode $ L.head host
in any (\s -> s == srv || (B.cons '.' s) `B.isSuffixOf` srv) (ownServerDomains agentCfg)
-- | Run an SMP client for SMPClientVar
connectClient :: SMPClientAgent -> SMPServer -> SMPClientVar -> IO (Either SMPClientError SMPClient)
@@ -298,7 +313,7 @@ notify :: MonadIO m => SMPClientAgent -> SMPClientAgentEvent -> m ()
notify ca evt = atomically $ writeTBQueue (agentQ ca) evt
{-# INLINE notify #-}
lookupSMPServerClient :: SMPClientAgent -> SessionId -> STM (Maybe SMPClient)
lookupSMPServerClient :: SMPClientAgent -> SessionId -> STM (Maybe (OwnServer, SMPClient))
lookupSMPServerClient SMPClientAgent {smpSessions} sessId = TM.lookup sessId smpSessions
closeSMPClientAgent :: SMPClientAgent -> IO ()
@@ -315,7 +330,7 @@ closeSMPServerClients c = atomically (smpClients c `swapTVar` M.empty) >>= mapM_
where
closeClient v =
atomically (readTMVar $ sessionVar v) >>= \case
Right smp -> closeProtocolClient smp `catchAll_` pure ()
Right (_, smp) -> closeProtocolClient smp `catchAll_` pure ()
_ -> pure ()
cancelActions :: Foldable f => TVar (f (Async ())) -> IO ()
+96 -48
View File
@@ -6,6 +6,7 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedLists #-}
@@ -68,8 +69,8 @@ import GHC.Stats (getRTSStats)
import GHC.TypeLits (KnownNat)
import Network.Socket (ServiceName, Socket, socketToHandle)
import Simplex.Messaging.Agent.Lock
import Simplex.Messaging.Client (ProtocolClient (thParams), ProtocolClientError (..), forwardSMPMessage, smpProxyError)
import Simplex.Messaging.Client.Agent (SMPClientAgent (..), SMPClientAgentEvent (..), closeSMPClientAgent, getSMPServerClient', lookupSMPServerClient)
import Simplex.Messaging.Client (ProtocolClient (thParams), ProtocolClientError (..), forwardSMPMessage, smpProxyError, temporaryClientError)
import Simplex.Messaging.Client.Agent (OwnServer, SMPClientAgent (..), SMPClientAgentEvent (..), closeSMPClientAgent, getSMPServerClient'', isOwnServer, lookupSMPServerClient)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String
@@ -232,7 +233,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath
liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0)
ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount} <- asks serverStats
ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv} <- asks serverStats
let interval = 1000000 * logInterval
forever $ do
withFile statsFilePath AppendMode $ \h -> liftIO $ do
@@ -251,32 +252,46 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
msgSentNtf' <- atomically $ swapTVar msgSentNtf 0
msgRecvNtf' <- atomically $ swapTVar msgRecvNtf 0
psNtf <- atomically $ periodStatCounts activeQueuesNtf ts
pRelays' <- atomically $ getResetProxyStatsData pRelays
pRelaysOwn' <- atomically $ getResetProxyStatsData pRelaysOwn
pMsgFwds' <- atomically $ getResetProxyStatsData pMsgFwds
pMsgFwdsOwn' <- atomically $ getResetProxyStatsData pMsgFwdsOwn
pMsgFwdsRecv' <- atomically $ swapTVar pMsgFwdsRecv 0
qCount' <- readTVarIO qCount
msgCount' <- readTVarIO msgCount
hPutStrLn h $
intercalate
","
[ iso8601Show $ utctDay fromTime',
show qCreated',
show qSecured',
show qDeletedAll',
show msgSent',
show msgRecv',
dayCount ps,
weekCount ps,
monthCount ps,
show msgSentNtf',
show msgRecvNtf',
dayCount psNtf,
weekCount psNtf,
monthCount psNtf,
show qCount',
show msgCount',
show msgExpired',
show qDeletedNew',
show qDeletedSecured'
]
( [ iso8601Show $ utctDay fromTime',
show qCreated',
show qSecured',
show qDeletedAll',
show msgSent',
show msgRecv',
dayCount ps,
weekCount ps,
monthCount ps,
show msgSentNtf',
show msgRecvNtf',
dayCount psNtf,
weekCount psNtf,
monthCount psNtf,
show qCount',
show msgCount',
show msgExpired',
show qDeletedNew',
show qDeletedSecured'
]
<> showProxyStats pRelays'
<> showProxyStats pRelaysOwn'
<> showProxyStats pMsgFwds'
<> showProxyStats pMsgFwdsOwn'
<> [show pMsgFwdsRecv']
)
liftIO $ threadDelay' interval
where
showProxyStats ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther} =
[show _pRequests, show _pSuccesses, show _pErrorsConnect, show _pErrorsCompat, show _pErrorsOther]
runClient :: Transport c => C.APrivateSignKey -> TProxy c -> c -> M ()
runClient signKey tp h = do
@@ -346,7 +361,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
subscriptions' <- bshow . M.size <$> readTVarIO subscriptions
hPutStrLn h . B.unpack $ B.intercalate "," [bshow cid, encode sessionId, connected', strEncode createdAt, rcvActiveAt', sndActiveAt', bshow age, subscriptions']
CPStats -> withAdminRole $ do
ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgSentNtf, msgRecvNtf, qCount, msgCount} <- unliftIO u $ asks serverStats
ss <- unliftIO u $ asks serverStats
let putStat :: Show a => ByteString -> (ServerStats -> TVar a) -> IO ()
putStat label var = readTVarIO (var ss) >>= \v -> B.hPutStr h $ label <> ": " <> bshow v <> "\n"
putProxyStat :: ByteString -> (ServerStats -> ProxyStats) -> IO ()
putProxyStat label var = do
ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther} <- atomically $ getProxyStatsData $ var ss
B.hPutStr h $ label <> ": requests=" <> bshow _pRequests <> ", successes=" <> bshow _pSuccesses <> ", errorsConnect=" <> bshow _pErrorsConnect <> ", errorsCompat=" <> bshow _pErrorsCompat <> ", errorsOther=" <> bshow _pErrorsOther <> "\n"
putStat "fromTime" fromTime
putStat "qCreated" qCreated
putStat "qSecured" qSecured
@@ -359,9 +380,11 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
putStat "msgRecvNtf" msgRecvNtf
putStat "qCount" qCount
putStat "msgCount" msgCount
where
putStat :: Show a => String -> TVar a -> IO ()
putStat label var = readTVarIO var >>= \v -> hPutStrLn h $ label <> ": " <> show v
putProxyStat "pRelays" pRelays
putProxyStat "pRelaysOwn" pRelaysOwn
putProxyStat "pMsgFwds" pMsgFwds
putProxyStat "pMsgFwdsOwn" pMsgFwdsOwn
putStat "pMsgFwdsRecv" pMsgFwdsRecv
CPStatsRTS -> getRTSStats >>= hPrint h
CPThreads -> withAdminRole $ do
#if MIN_VERSION_base(4,18,0)
@@ -647,33 +670,56 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
ServerConfig {allowSMPProxy, newQueueBasicAuth} <- asks config
pure $ allowSMPProxy && maybe True ((== auth) . Just) newQueueBasicAuth
getRelay = do
ProxyAgent {smpAgent} <- asks proxyAgent
liftIO $ proxyResp <$> runExceptT (getSMPServerClient' smpAgent srv) `catch` (pure . Left . PCEIOError)
ServerStats {pRelays, pRelaysOwn} <- asks serverStats
let inc = mkIncProxyStats pRelays pRelaysOwn
ProxyAgent {smpAgent = a} <- asks proxyAgent
liftIO (runExceptT (getSMPServerClient'' a srv) `catch` (pure . Left . PCEIOError)) >>= \case
Right (own, smp) -> do
inc own pRequests
case proxyResp smp of
r@PKEY {} -> r <$ inc own pSuccesses
r -> r <$ inc own pErrorsCompat
Left e -> do
let own = isOwnServer a srv
inc own pRequests
inc own $ if temporaryClientError e then pErrorsConnect else pErrorsOther
pure . ERR $ smpProxyError e
where
proxyResp = \case
Left err -> ERR $ smpProxyError err
Right smp ->
let THandleParams {sessionId = srvSessId, thVersion, thServerVRange, thAuth} = thParams smp
in case compatibleVRange thServerVRange proxiedSMPRelayVRange of
-- Cap the destination relay version range to prevent client version fingerprinting.
-- See comment for proxiedSMPRelayVersion.
Just (Compatible vr) | thVersion >= sendingProxySMPVersion -> case thAuth of
Just THAuthClient {serverCertKey} -> PKEY srvSessId vr serverCertKey
Nothing -> ERR $ transportErr TENoServerAuth
_ -> ERR $ transportErr TEVersion
proxyResp smp =
let THandleParams {sessionId = srvSessId, thVersion, thServerVRange, thAuth} = thParams smp
in case compatibleVRange thServerVRange proxiedSMPRelayVRange of
-- Cap the destination relay version range to prevent client version fingerprinting.
-- See comment for proxiedSMPRelayVersion.
Just (Compatible vr) | thVersion >= sendingProxySMPVersion -> case thAuth of
Just THAuthClient {serverCertKey} -> PKEY srvSessId vr serverCertKey
Nothing -> ERR $ transportErr TENoServerAuth
_ -> ERR $ transportErr TEVersion
PFWD fwdV pubKey encBlock -> do
ProxyAgent {smpAgent} <- asks proxyAgent
atomically (lookupSMPServerClient smpAgent sessId) >>= \case
Just smp
| v >= sendingProxySMPVersion ->
liftIO $ either (ERR . smpProxyError) PRES <$>
runExceptT (forwardSMPMessage smp corrId fwdV pubKey encBlock) `catchError` (pure . Left . PCEIOError)
| otherwise -> pure . ERR $ transportErr TEVersion
ProxyAgent {smpAgent = a} <- asks proxyAgent
ServerStats {pMsgFwds, pMsgFwdsOwn} <- asks serverStats
let inc = mkIncProxyStats pMsgFwds pMsgFwdsOwn
atomically (lookupSMPServerClient a sessId) >>= \case
Just (own, smp) -> do
inc own pRequests
if
| v >= sendingProxySMPVersion ->
liftIO (runExceptT (forwardSMPMessage smp corrId fwdV pubKey encBlock) `catch` (pure . Left . PCEIOError)) >>= \case
Right r -> PRES r <$ inc own pSuccesses
Left e -> case e of
PCEProtocolError {} -> ERR err <$ inc own pSuccesses
_ -> ERR err <$ inc own pErrorsOther
where
err = smpProxyError e
| otherwise -> ERR (transportErr TEVersion) <$ inc own pErrorsCompat
where
THandleParams {thVersion = v} = thParams smp
Nothing -> pure $ ERR $ PROXY NO_SESSION
Nothing -> inc False pRequests >> inc False pErrorsConnect $> ERR (PROXY NO_SESSION)
transportErr :: TransportError -> ErrorType
transportErr = PROXY . BROKER . TRANSPORT
mkIncProxyStats :: MonadIO m => ProxyStats -> ProxyStats -> OwnServer -> (ProxyStats -> TVar Int) -> m ()
mkIncProxyStats ps psOwn = \own sel -> do
atomically $ modifyTVar' (sel ps) (+ 1)
when own $ atomically $ modifyTVar' (sel psOwn) (+ 1)
processCommand :: (Maybe QueueRec, Transmission Cmd) -> M (Either (Transmission (Command 'ProxiedClient)) (Transmission BrokerMsg))
processCommand (qr_, (corrId, queueId, cmd)) = do
st <- asks queueStore
@@ -987,6 +1033,8 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi
-- encrypt to proxy
let fr = FwdResponse {fwdCorrId, fwdResponse = r2}
r3 = EncFwdResponse $ C.cbEncryptNoPad sessSecret (C.reverseNonce proxyNonce) (smpEncode fr)
stats <- asks serverStats
atomically $ modifyTVar' (pMsgFwdsRecv stats) (+ 1)
pure $ RRES r3
where
rejectOrVerify :: Maybe (THandleAuth 'TServer) -> SignedTransmission ErrorType Cmd -> M (Either (Transmission BrokerMsg) (Maybe QueueRec, Transmission Cmd))
+1 -1
View File
@@ -126,7 +126,7 @@ data Server = Server
savingLock :: Lock
}
data ProxyAgent = ProxyAgent
newtype ProxyAgent = ProxyAgent
{ smpAgent :: SMPClientAgent
}
+6
View File
@@ -10,6 +10,7 @@ module Simplex.Messaging.Server.Main where
import Control.Concurrent.STM
import Control.Monad (void)
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Functor (($>))
import Data.Ini (lookupValue, readIniFile)
@@ -147,6 +148,8 @@ smpServerCLI cfgPath logPath =
\# It defines prefferred hostname for destination servers with multiple hostnames.\n\
\# host_mode: public\n\
\# required_host_mode: off\n\n\
\# The domain suffixes of the relays you operate (space-separated) to count as separate proxy statistics.\n\
\# own_server_domains: \n\n\
\# SOCKS proxy port for forwarding messages to destination servers.\n\
\# You may need a separate instance of SOCKS proxy for incoming single-hop requests.\n\
\# socks_proxy: localhost:9050\n\n\
@@ -245,6 +248,7 @@ smpServerCLI cfgPath logPath =
requiredHostMode = fromMaybe False $ iniOnOff "PROXY" "required_host_mode" ini
}
},
ownServerDomains = either (const []) textToOwnServers $ lookupValue "PROXY" "own_server_domains" ini,
persistErrorInterval = 30 -- seconds
},
allowSMPProxy = True
@@ -259,6 +263,8 @@ smpServerCLI cfgPath logPath =
"public" -> HMPublic
"onion" -> HMOnionViaSocks
s -> error . T.unpack $ "Invalid host_mode: " <> s
textToOwnServers :: Text -> [ByteString]
textToOwnServers = map encodeUtf8 . T.words
data CliCommand
= Init InitOptions
+137 -18
View File
@@ -33,6 +33,11 @@ data ServerStats = ServerStats
msgSentNtf :: TVar Int,
msgRecvNtf :: TVar Int,
activeQueuesNtf :: PeriodStats RecipientId,
pRelays :: ProxyStats,
pRelaysOwn :: ProxyStats,
pMsgFwds :: ProxyStats,
pMsgFwdsOwn :: ProxyStats,
pMsgFwdsRecv :: TVar Int,
qCount :: TVar Int,
msgCount :: TVar Int
}
@@ -51,6 +56,11 @@ data ServerStatsData = ServerStatsData
_msgSentNtf :: Int,
_msgRecvNtf :: Int,
_activeQueuesNtf :: PeriodStatsData RecipientId,
_pRelays :: ProxyStatsData,
_pRelaysOwn :: ProxyStatsData,
_pMsgFwds :: ProxyStatsData,
_pMsgFwdsOwn :: ProxyStatsData,
_pMsgFwdsRecv :: Int,
_qCount :: Int,
_msgCount :: Int
}
@@ -71,9 +81,14 @@ newServerStats ts = do
msgSentNtf <- newTVar 0
msgRecvNtf <- newTVar 0
activeQueuesNtf <- newPeriodStats
pRelays <- newProxyStats
pRelaysOwn <- newProxyStats
pMsgFwds <- newProxyStats
pMsgFwdsOwn <- newProxyStats
pMsgFwdsRecv <- newTVar 0
qCount <- newTVar 0
msgCount <- newTVar 0
pure ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount}
pure ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv, qCount, msgCount}
getServerStatsData :: ServerStats -> STM ServerStatsData
getServerStatsData s = do
@@ -90,9 +105,14 @@ getServerStatsData s = do
_msgSentNtf <- readTVar $ msgSentNtf s
_msgRecvNtf <- readTVar $ msgRecvNtf s
_activeQueuesNtf <- getPeriodStatsData $ activeQueuesNtf s
_pRelays <- getProxyStatsData $ pRelays s
_pRelaysOwn <- getProxyStatsData $ pRelaysOwn s
_pMsgFwds <- getProxyStatsData $ pMsgFwds s
_pMsgFwdsOwn <- getProxyStatsData $ pMsgFwdsOwn s
_pMsgFwdsRecv <- readTVar $ pMsgFwdsRecv s
_qCount <- readTVar $ qCount s
_msgCount <- readTVar $ msgCount s
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _activeQueues, _msgSentNtf, _msgRecvNtf, _activeQueuesNtf, _qCount, _msgCount}
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _activeQueues, _msgSentNtf, _msgRecvNtf, _activeQueuesNtf, _pRelays, _pRelaysOwn, _pMsgFwds, _pMsgFwdsOwn, _pMsgFwdsRecv, _qCount, _msgCount}
setServerStats :: ServerStats -> ServerStatsData -> STM ()
setServerStats s d = do
@@ -109,28 +129,42 @@ setServerStats s d = do
writeTVar (msgSentNtf s) $! _msgSentNtf d
writeTVar (msgRecvNtf s) $! _msgRecvNtf d
setPeriodStats (activeQueuesNtf s) (_activeQueuesNtf d)
setProxyStats (pRelays s) $! _pRelays d
setProxyStats (pRelaysOwn s) $! _pRelaysOwn d
setProxyStats (pMsgFwds s) $! _pMsgFwds d
setProxyStats (pMsgFwdsOwn s) $! _pMsgFwdsOwn d
writeTVar (pMsgFwdsRecv s) $! _pMsgFwdsRecv d
writeTVar (qCount s) $! _qCount d
writeTVar (msgCount s) $! _msgCount d
instance StrEncoding ServerStatsData where
strEncode ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _qCount, _msgCount} =
strEncode d =
B.unlines
[ "fromTime=" <> strEncode _fromTime,
"qCreated=" <> strEncode _qCreated,
"qSecured=" <> strEncode _qSecured,
"qDeletedAll=" <> strEncode _qDeletedAll,
"qDeletedNew=" <> strEncode _qDeletedNew,
"qDeletedSecured=" <> strEncode _qDeletedSecured,
"qCount=" <> strEncode _qCount,
"msgSent=" <> strEncode _msgSent,
"msgRecv=" <> strEncode _msgRecv,
"msgExpired=" <> strEncode _msgExpired,
"msgSentNtf=" <> strEncode _msgSentNtf,
"msgRecvNtf=" <> strEncode _msgRecvNtf,
[ "fromTime=" <> strEncode (_fromTime d),
"qCreated=" <> strEncode (_qCreated d),
"qSecured=" <> strEncode (_qSecured d),
"qDeletedAll=" <> strEncode (_qDeletedAll d),
"qDeletedNew=" <> strEncode (_qDeletedNew d),
"qDeletedSecured=" <> strEncode (_qDeletedSecured d),
"qCount=" <> strEncode (_qCount d),
"msgSent=" <> strEncode (_msgSent d),
"msgRecv=" <> strEncode (_msgRecv d),
"msgExpired=" <> strEncode (_msgExpired d),
"msgSentNtf=" <> strEncode (_msgSentNtf d),
"msgRecvNtf=" <> strEncode (_msgRecvNtf d),
"activeQueues:",
strEncode _activeQueues,
strEncode (_activeQueues d),
"activeQueuesNtf:",
strEncode _activeQueuesNtf
strEncode (_activeQueuesNtf d),
"pRelays:",
strEncode (_pRelays d),
"pRelaysOwn:",
strEncode (_pRelaysOwn d),
"pMsgFwds:",
strEncode (_pMsgFwds d),
"pMsgFwdsOwn:",
strEncode (_pMsgFwdsOwn d),
"pMsgFwdsRecv=" <> strEncode (_pMsgFwdsRecv d)
]
strP = do
_fromTime <- "fromTime=" *> strP <* A.endOfLine
@@ -157,7 +191,17 @@ instance StrEncoding ServerStatsData where
optional ("activeQueuesNtf:" <* A.endOfLine) >>= \case
Just _ -> strP <* optional A.endOfLine
_ -> pure newPeriodStatsData
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _qCount, _msgCount = 0}
_pRelays <- proxyStatsP "pRelays:"
_pRelaysOwn <- proxyStatsP "pRelaysOwn:"
_pMsgFwds <- proxyStatsP "pMsgFwds:"
_pMsgFwdsOwn <- proxyStatsP "pMsgFwdsOwn:"
_pMsgFwdsRecv <- "pMsgFwdsRecv=" *> strP <* A.endOfLine <|> pure 0
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _pRelays, _pRelaysOwn, _pMsgFwds, _pMsgFwdsOwn, _pMsgFwdsRecv, _qCount, _msgCount = 0}
where
proxyStatsP key =
optional (A.string key >> A.endOfLine) >>= \case
Just _ -> strP <* optional A.endOfLine
_ -> pure newProxyStatsData
data PeriodStats a = PeriodStats
{ day :: TVar (Set a),
@@ -231,3 +275,78 @@ updatePeriodStats stats pId = do
updatePeriod month
where
updatePeriod pSel = modifyTVar' (pSel stats) (S.insert pId)
data ProxyStats = ProxyStats
{ pRequests :: TVar Int,
pSuccesses :: TVar Int, -- includes destination server error responses that will be forwarded to the client
pErrorsConnect :: TVar Int,
pErrorsCompat :: TVar Int,
pErrorsOther :: TVar Int
}
newProxyStats :: STM ProxyStats
newProxyStats = do
pRequests <- newTVar 0
pSuccesses <- newTVar 0
pErrorsConnect <- newTVar 0
pErrorsCompat <- newTVar 0
pErrorsOther <- newTVar 0
pure ProxyStats {pRequests, pSuccesses, pErrorsConnect, pErrorsCompat, pErrorsOther}
data ProxyStatsData = ProxyStatsData
{ _pRequests :: Int,
_pSuccesses :: Int,
_pErrorsConnect :: Int,
_pErrorsCompat :: Int,
_pErrorsOther :: Int
}
deriving (Show)
newProxyStatsData :: ProxyStatsData
newProxyStatsData = ProxyStatsData {_pRequests = 0, _pSuccesses = 0, _pErrorsConnect = 0, _pErrorsCompat = 0, _pErrorsOther = 0}
getProxyStatsData :: ProxyStats -> STM ProxyStatsData
getProxyStatsData s = do
_pRequests <- readTVar $ pRequests s
_pSuccesses <- readTVar $ pSuccesses s
_pErrorsConnect <- readTVar $ pErrorsConnect s
_pErrorsCompat <- readTVar $ pErrorsCompat s
_pErrorsOther <- readTVar $ pErrorsOther s
pure ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther}
getResetProxyStatsData :: ProxyStats -> STM ProxyStatsData
getResetProxyStatsData s = do
_pRequests <- swapTVar (pRequests s) 0
_pSuccesses <- swapTVar (pSuccesses s) 0
_pErrorsConnect <- swapTVar (pErrorsConnect s) 0
_pErrorsCompat <- swapTVar (pErrorsCompat s) 0
_pErrorsOther <- swapTVar (pErrorsOther s) 0
pure ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther}
setProxyStats :: ProxyStats -> ProxyStatsData -> STM ()
setProxyStats s d = do
writeTVar (pRequests s) $! _pRequests d
writeTVar (pSuccesses s) $! _pSuccesses d
writeTVar (pErrorsConnect s) $! _pErrorsConnect d
writeTVar (pErrorsCompat s) $! _pErrorsCompat d
writeTVar (pErrorsOther s) $! _pErrorsOther d
instance StrEncoding ProxyStatsData where
strEncode ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther} =
"requests="
<> strEncode _pRequests
<> "\nsuccesses="
<> strEncode _pSuccesses
<> "\nerrorsConnect="
<> strEncode _pErrorsConnect
<> "\nerrorsCompat="
<> strEncode _pErrorsCompat
<> "\nerrorsOther="
<> strEncode _pErrorsOther
strP = do
_pRequests <- "requests=" *> strP <* A.endOfLine
_pSuccesses <- "successes=" *> strP <* A.endOfLine
_pErrorsConnect <- "errorsConnect=" *> strP <* A.endOfLine
_pErrorsCompat <- "errorsCompat=" *> strP <* A.endOfLine
_pErrorsOther <- "errorsOther=" *> strP
pure ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther}
+3 -3
View File
@@ -608,7 +608,7 @@ testRestoreMessages at@(ATransport t) =
logSize testStoreLogFile `shouldReturn` 2
logSize testStoreMsgsFile `shouldReturn` 5
logSize testServerStatsBackupFile `shouldReturn` 20
logSize testServerStatsBackupFile `shouldReturn` 45
Right stats1 <- strDecode <$> B.readFile testServerStatsBackupFile
checkStats stats1 [rId] 5 1
@@ -626,7 +626,7 @@ testRestoreMessages at@(ATransport t) =
logSize testStoreLogFile `shouldReturn` 1
-- the last message is not removed because it was not ACK'd
logSize testStoreMsgsFile `shouldReturn` 3
logSize testServerStatsBackupFile `shouldReturn` 20
logSize testServerStatsBackupFile `shouldReturn` 45
Right stats2 <- strDecode <$> B.readFile testServerStatsBackupFile
checkStats stats2 [rId] 5 3
@@ -645,7 +645,7 @@ testRestoreMessages at@(ATransport t) =
logSize testStoreLogFile `shouldReturn` 1
logSize testStoreMsgsFile `shouldReturn` 0
logSize testServerStatsBackupFile `shouldReturn` 20
logSize testServerStatsBackupFile `shouldReturn` 45
Right stats3 <- strDecode <$> B.readFile testServerStatsBackupFile
checkStats stats3 [rId] 5 5