diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index f98fac56b..ecf4ee766 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -84,7 +84,6 @@ module Simplex.Messaging.Client ServerTransmissionBatch, ServerTransmission (..), ClientCommand, - HostMode (..), -- * For testing PCTransmission, diff --git a/src/Simplex/Messaging/Client/Agent.hs b/src/Simplex/Messaging/Client/Agent.hs index 10b83157f..a7732b4d4 100644 --- a/src/Simplex/Messaging/Client/Agent.hs +++ b/src/Simplex/Messaging/Client/Agent.hs @@ -54,7 +54,7 @@ import UnliftIO.Exception (Exception) import qualified UnliftIO.Exception as E import UnliftIO.STM -type SMPClientVar = SessionVar (Either (SMPClientError, Maybe UTCTime) SMPClient) +type SMPClientVar = SessionVar (Either (SMPClientError, Maybe UTCTime) (OwnServer, SMPClient)) data SMPClientAgentEvent = CAConnected SMPServer @@ -75,7 +75,8 @@ data SMPClientAgentConfig = SMPClientAgentConfig persistErrorInterval :: NominalDiffTime, msgQSize :: Natural, agentQSize :: Natural, - agentSubsBatchSize :: Int + agentSubsBatchSize :: Int, + ownServerDomains :: [ByteString] } defaultSMPClientAgentConfig :: SMPClientAgentConfig @@ -91,7 +92,8 @@ defaultSMPClientAgentConfig = persistErrorInterval = 0, msgQSize = 256, agentQSize = 256, - agentSubsBatchSize = 900 + agentSubsBatchSize = 900, + ownServerDomains = [] } where second = 1000000 @@ -103,13 +105,15 @@ data SMPClientAgent = SMPClientAgent agentQ :: TBQueue SMPClientAgentEvent, randomDrg :: TVar ChaChaDRG, smpClients :: TMap SMPServer SMPClientVar, - smpSessions :: TMap SessionId SMPClient, + smpSessions :: TMap SessionId (OwnServer, SMPClient), srvSubs :: TMap SMPServer (TMap SMPSub C.APrivateAuthKey), pendingSrvSubs :: TMap SMPServer (TMap SMPSub C.APrivateAuthKey), smpSubWorkers :: TMap SMPServer (SessionVar (Async ())), workerSeq :: TVar Int } +type OwnServer = Bool + newtype InternalException e = InternalException {unInternalException :: e} deriving (Eq, Show) @@ -163,13 +167,17 @@ newSMPClientAgent agentCfg@SMPClientAgentConfig {msgQSize, agentQSize} randomDrg -- | Get or create SMP client for SMPServer getSMPServerClient' :: SMPClientAgent -> SMPServer -> ExceptT SMPClientError IO SMPClient -getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, workerSeq} srv = +getSMPServerClient' ca srv = snd <$> getSMPServerClient'' ca srv +{-# INLINE getSMPServerClient' #-} + +getSMPServerClient'' :: SMPClientAgent -> SMPServer -> ExceptT SMPClientError IO (OwnServer, SMPClient) +getSMPServerClient'' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, workerSeq} srv = atomically getClientVar >>= either (ExceptT . newSMPClient) waitForSMPClient where getClientVar :: STM (Either SMPClientVar SMPClientVar) getClientVar = getSessVar workerSeq srv smpClients - waitForSMPClient :: SMPClientVar -> ExceptT SMPClientError IO SMPClient + waitForSMPClient :: SMPClientVar -> ExceptT SMPClientError IO (OwnServer, SMPClient) waitForSMPClient v = do let ProtocolClientConfig {networkConfig = NetworkConfig {tcpConnectTimeout}} = smpCfg agentCfg smpClient_ <- liftIO $ tcpConnectTimeout `timeout` atomically (readTMVar $ sessionVar v) @@ -179,20 +187,22 @@ getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, worker Just (Left (e, Just ts)) -> ifM ((ts <) <$> liftIO getCurrentTime) - (atomically (removeSessVar v srv smpClients) >> getSMPServerClient' ca srv) + (atomically (removeSessVar v srv smpClients) >> getSMPServerClient'' ca srv) (throwE e) Nothing -> throwE PCEResponseTimeout - newSMPClient :: SMPClientVar -> IO (Either SMPClientError SMPClient) + newSMPClient :: SMPClientVar -> IO (Either SMPClientError (OwnServer, SMPClient)) newSMPClient v = do r <- connectClient ca srv v `E.catch` (pure . Left . PCEIOError) case r of Right smp -> do logInfo . decodeUtf8 $ "Agent connected to " <> showServer srv + let c = (isOwnServer ca srv, smp) atomically $ do - putTMVar (sessionVar v) (Right smp) - TM.insert (sessionId $ thParams smp) smp smpSessions + putTMVar (sessionVar v) (Right c) + TM.insert (sessionId $ thParams smp) c smpSessions notify ca $ CAConnected srv + pure $ Right c Left e -> do if persistErrorInterval agentCfg == 0 || e == PCENetworkError || e == PCEResponseTimeout then atomically $ do @@ -202,7 +212,12 @@ getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, worker ts <- addUTCTime (persistErrorInterval agentCfg) <$> liftIO getCurrentTime atomically $ putTMVar (sessionVar v) (Left (e, Just ts)) reconnectClient ca srv - pure r + pure $ Left e + +isOwnServer :: SMPClientAgent -> SMPServer -> OwnServer +isOwnServer SMPClientAgent {agentCfg} ProtocolServer {host} = + let srv = strEncode $ L.head host + in any (\s -> s == srv || (B.cons '.' s) `B.isSuffixOf` srv) (ownServerDomains agentCfg) -- | Run an SMP client for SMPClientVar connectClient :: SMPClientAgent -> SMPServer -> SMPClientVar -> IO (Either SMPClientError SMPClient) @@ -298,7 +313,7 @@ notify :: MonadIO m => SMPClientAgent -> SMPClientAgentEvent -> m () notify ca evt = atomically $ writeTBQueue (agentQ ca) evt {-# INLINE notify #-} -lookupSMPServerClient :: SMPClientAgent -> SessionId -> STM (Maybe SMPClient) +lookupSMPServerClient :: SMPClientAgent -> SessionId -> STM (Maybe (OwnServer, SMPClient)) lookupSMPServerClient SMPClientAgent {smpSessions} sessId = TM.lookup sessId smpSessions closeSMPClientAgent :: SMPClientAgent -> IO () @@ -315,7 +330,7 @@ closeSMPServerClients c = atomically (smpClients c `swapTVar` M.empty) >>= mapM_ where closeClient v = atomically (readTMVar $ sessionVar v) >>= \case - Right smp -> closeProtocolClient smp `catchAll_` pure () + Right (_, smp) -> closeProtocolClient smp `catchAll_` pure () _ -> pure () cancelActions :: Foldable f => TVar (f (Async ())) -> IO () diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 6c8a18aa5..6cbc4d4aa 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -6,6 +6,7 @@ {-# LANGUAGE GADTs #-} {-# LANGUAGE KindSignatures #-} {-# LANGUAGE LambdaCase #-} +{-# LANGUAGE MultiWayIf #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE OverloadedLists #-} @@ -68,8 +69,8 @@ import GHC.Stats (getRTSStats) import GHC.TypeLits (KnownNat) import Network.Socket (ServiceName, Socket, socketToHandle) import Simplex.Messaging.Agent.Lock -import Simplex.Messaging.Client (ProtocolClient (thParams), ProtocolClientError (..), forwardSMPMessage, smpProxyError) -import Simplex.Messaging.Client.Agent (SMPClientAgent (..), SMPClientAgentEvent (..), closeSMPClientAgent, getSMPServerClient', lookupSMPServerClient) +import Simplex.Messaging.Client (ProtocolClient (thParams), ProtocolClientError (..), forwardSMPMessage, smpProxyError, temporaryClientError) +import Simplex.Messaging.Client.Agent (OwnServer, SMPClientAgent (..), SMPClientAgentEvent (..), closeSMPClientAgent, getSMPServerClient'', isOwnServer, lookupSMPServerClient) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String @@ -232,7 +233,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0) - ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount} <- asks serverStats + ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv} <- asks serverStats let interval = 1000000 * logInterval forever $ do withFile statsFilePath AppendMode $ \h -> liftIO $ do @@ -251,32 +252,46 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do msgSentNtf' <- atomically $ swapTVar msgSentNtf 0 msgRecvNtf' <- atomically $ swapTVar msgRecvNtf 0 psNtf <- atomically $ periodStatCounts activeQueuesNtf ts + pRelays' <- atomically $ getResetProxyStatsData pRelays + pRelaysOwn' <- atomically $ getResetProxyStatsData pRelaysOwn + pMsgFwds' <- atomically $ getResetProxyStatsData pMsgFwds + pMsgFwdsOwn' <- atomically $ getResetProxyStatsData pMsgFwdsOwn + pMsgFwdsRecv' <- atomically $ swapTVar pMsgFwdsRecv 0 qCount' <- readTVarIO qCount msgCount' <- readTVarIO msgCount hPutStrLn h $ intercalate "," - [ iso8601Show $ utctDay fromTime', - show qCreated', - show qSecured', - show qDeletedAll', - show msgSent', - show msgRecv', - dayCount ps, - weekCount ps, - monthCount ps, - show msgSentNtf', - show msgRecvNtf', - dayCount psNtf, - weekCount psNtf, - monthCount psNtf, - show qCount', - show msgCount', - show msgExpired', - show qDeletedNew', - show qDeletedSecured' - ] + ( [ iso8601Show $ utctDay fromTime', + show qCreated', + show qSecured', + show qDeletedAll', + show msgSent', + show msgRecv', + dayCount ps, + weekCount ps, + monthCount ps, + show msgSentNtf', + show msgRecvNtf', + dayCount psNtf, + weekCount psNtf, + monthCount psNtf, + show qCount', + show msgCount', + show msgExpired', + show qDeletedNew', + show qDeletedSecured' + ] + <> showProxyStats pRelays' + <> showProxyStats pRelaysOwn' + <> showProxyStats pMsgFwds' + <> showProxyStats pMsgFwdsOwn' + <> [show pMsgFwdsRecv'] + ) liftIO $ threadDelay' interval + where + showProxyStats ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther} = + [show _pRequests, show _pSuccesses, show _pErrorsConnect, show _pErrorsCompat, show _pErrorsOther] runClient :: Transport c => C.APrivateSignKey -> TProxy c -> c -> M () runClient signKey tp h = do @@ -346,7 +361,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do subscriptions' <- bshow . M.size <$> readTVarIO subscriptions hPutStrLn h . B.unpack $ B.intercalate "," [bshow cid, encode sessionId, connected', strEncode createdAt, rcvActiveAt', sndActiveAt', bshow age, subscriptions'] CPStats -> withAdminRole $ do - ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgSentNtf, msgRecvNtf, qCount, msgCount} <- unliftIO u $ asks serverStats + ss <- unliftIO u $ asks serverStats + let putStat :: Show a => ByteString -> (ServerStats -> TVar a) -> IO () + putStat label var = readTVarIO (var ss) >>= \v -> B.hPutStr h $ label <> ": " <> bshow v <> "\n" + putProxyStat :: ByteString -> (ServerStats -> ProxyStats) -> IO () + putProxyStat label var = do + ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther} <- atomically $ getProxyStatsData $ var ss + B.hPutStr h $ label <> ": requests=" <> bshow _pRequests <> ", successes=" <> bshow _pSuccesses <> ", errorsConnect=" <> bshow _pErrorsConnect <> ", errorsCompat=" <> bshow _pErrorsCompat <> ", errorsOther=" <> bshow _pErrorsOther <> "\n" putStat "fromTime" fromTime putStat "qCreated" qCreated putStat "qSecured" qSecured @@ -359,9 +380,11 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do putStat "msgRecvNtf" msgRecvNtf putStat "qCount" qCount putStat "msgCount" msgCount - where - putStat :: Show a => String -> TVar a -> IO () - putStat label var = readTVarIO var >>= \v -> hPutStrLn h $ label <> ": " <> show v + putProxyStat "pRelays" pRelays + putProxyStat "pRelaysOwn" pRelaysOwn + putProxyStat "pMsgFwds" pMsgFwds + putProxyStat "pMsgFwdsOwn" pMsgFwdsOwn + putStat "pMsgFwdsRecv" pMsgFwdsRecv CPStatsRTS -> getRTSStats >>= hPrint h CPThreads -> withAdminRole $ do #if MIN_VERSION_base(4,18,0) @@ -647,33 +670,56 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi ServerConfig {allowSMPProxy, newQueueBasicAuth} <- asks config pure $ allowSMPProxy && maybe True ((== auth) . Just) newQueueBasicAuth getRelay = do - ProxyAgent {smpAgent} <- asks proxyAgent - liftIO $ proxyResp <$> runExceptT (getSMPServerClient' smpAgent srv) `catch` (pure . Left . PCEIOError) + ServerStats {pRelays, pRelaysOwn} <- asks serverStats + let inc = mkIncProxyStats pRelays pRelaysOwn + ProxyAgent {smpAgent = a} <- asks proxyAgent + liftIO (runExceptT (getSMPServerClient'' a srv) `catch` (pure . Left . PCEIOError)) >>= \case + Right (own, smp) -> do + inc own pRequests + case proxyResp smp of + r@PKEY {} -> r <$ inc own pSuccesses + r -> r <$ inc own pErrorsCompat + Left e -> do + let own = isOwnServer a srv + inc own pRequests + inc own $ if temporaryClientError e then pErrorsConnect else pErrorsOther + pure . ERR $ smpProxyError e where - proxyResp = \case - Left err -> ERR $ smpProxyError err - Right smp -> - let THandleParams {sessionId = srvSessId, thVersion, thServerVRange, thAuth} = thParams smp - in case compatibleVRange thServerVRange proxiedSMPRelayVRange of - -- Cap the destination relay version range to prevent client version fingerprinting. - -- See comment for proxiedSMPRelayVersion. - Just (Compatible vr) | thVersion >= sendingProxySMPVersion -> case thAuth of - Just THAuthClient {serverCertKey} -> PKEY srvSessId vr serverCertKey - Nothing -> ERR $ transportErr TENoServerAuth - _ -> ERR $ transportErr TEVersion + proxyResp smp = + let THandleParams {sessionId = srvSessId, thVersion, thServerVRange, thAuth} = thParams smp + in case compatibleVRange thServerVRange proxiedSMPRelayVRange of + -- Cap the destination relay version range to prevent client version fingerprinting. + -- See comment for proxiedSMPRelayVersion. + Just (Compatible vr) | thVersion >= sendingProxySMPVersion -> case thAuth of + Just THAuthClient {serverCertKey} -> PKEY srvSessId vr serverCertKey + Nothing -> ERR $ transportErr TENoServerAuth + _ -> ERR $ transportErr TEVersion PFWD fwdV pubKey encBlock -> do - ProxyAgent {smpAgent} <- asks proxyAgent - atomically (lookupSMPServerClient smpAgent sessId) >>= \case - Just smp - | v >= sendingProxySMPVersion -> - liftIO $ either (ERR . smpProxyError) PRES <$> - runExceptT (forwardSMPMessage smp corrId fwdV pubKey encBlock) `catchError` (pure . Left . PCEIOError) - | otherwise -> pure . ERR $ transportErr TEVersion + ProxyAgent {smpAgent = a} <- asks proxyAgent + ServerStats {pMsgFwds, pMsgFwdsOwn} <- asks serverStats + let inc = mkIncProxyStats pMsgFwds pMsgFwdsOwn + atomically (lookupSMPServerClient a sessId) >>= \case + Just (own, smp) -> do + inc own pRequests + if + | v >= sendingProxySMPVersion -> + liftIO (runExceptT (forwardSMPMessage smp corrId fwdV pubKey encBlock) `catch` (pure . Left . PCEIOError)) >>= \case + Right r -> PRES r <$ inc own pSuccesses + Left e -> case e of + PCEProtocolError {} -> ERR err <$ inc own pSuccesses + _ -> ERR err <$ inc own pErrorsOther + where + err = smpProxyError e + | otherwise -> ERR (transportErr TEVersion) <$ inc own pErrorsCompat where THandleParams {thVersion = v} = thParams smp - Nothing -> pure $ ERR $ PROXY NO_SESSION + Nothing -> inc False pRequests >> inc False pErrorsConnect $> ERR (PROXY NO_SESSION) transportErr :: TransportError -> ErrorType transportErr = PROXY . BROKER . TRANSPORT + mkIncProxyStats :: MonadIO m => ProxyStats -> ProxyStats -> OwnServer -> (ProxyStats -> TVar Int) -> m () + mkIncProxyStats ps psOwn = \own sel -> do + atomically $ modifyTVar' (sel ps) (+ 1) + when own $ atomically $ modifyTVar' (sel psOwn) (+ 1) processCommand :: (Maybe QueueRec, Transmission Cmd) -> M (Either (Transmission (Command 'ProxiedClient)) (Transmission BrokerMsg)) processCommand (qr_, (corrId, queueId, cmd)) = do st <- asks queueStore @@ -987,6 +1033,8 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi -- encrypt to proxy let fr = FwdResponse {fwdCorrId, fwdResponse = r2} r3 = EncFwdResponse $ C.cbEncryptNoPad sessSecret (C.reverseNonce proxyNonce) (smpEncode fr) + stats <- asks serverStats + atomically $ modifyTVar' (pMsgFwdsRecv stats) (+ 1) pure $ RRES r3 where rejectOrVerify :: Maybe (THandleAuth 'TServer) -> SignedTransmission ErrorType Cmd -> M (Either (Transmission BrokerMsg) (Maybe QueueRec, Transmission Cmd)) diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index f23192aeb..52a6094bc 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -126,7 +126,7 @@ data Server = Server savingLock :: Lock } -data ProxyAgent = ProxyAgent +newtype ProxyAgent = ProxyAgent { smpAgent :: SMPClientAgent } diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index fa1698653..6af1ce2a5 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -10,6 +10,7 @@ module Simplex.Messaging.Server.Main where import Control.Concurrent.STM import Control.Monad (void) +import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Functor (($>)) import Data.Ini (lookupValue, readIniFile) @@ -147,6 +148,8 @@ smpServerCLI cfgPath logPath = \# It defines prefferred hostname for destination servers with multiple hostnames.\n\ \# host_mode: public\n\ \# required_host_mode: off\n\n\ + \# The domain suffixes of the relays you operate (space-separated) to count as separate proxy statistics.\n\ + \# own_server_domains: \n\n\ \# SOCKS proxy port for forwarding messages to destination servers.\n\ \# You may need a separate instance of SOCKS proxy for incoming single-hop requests.\n\ \# socks_proxy: localhost:9050\n\n\ @@ -245,6 +248,7 @@ smpServerCLI cfgPath logPath = requiredHostMode = fromMaybe False $ iniOnOff "PROXY" "required_host_mode" ini } }, + ownServerDomains = either (const []) textToOwnServers $ lookupValue "PROXY" "own_server_domains" ini, persistErrorInterval = 30 -- seconds }, allowSMPProxy = True @@ -259,6 +263,8 @@ smpServerCLI cfgPath logPath = "public" -> HMPublic "onion" -> HMOnionViaSocks s -> error . T.unpack $ "Invalid host_mode: " <> s + textToOwnServers :: Text -> [ByteString] + textToOwnServers = map encodeUtf8 . T.words data CliCommand = Init InitOptions diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index 0b4c677c2..d8935b44b 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -33,6 +33,11 @@ data ServerStats = ServerStats msgSentNtf :: TVar Int, msgRecvNtf :: TVar Int, activeQueuesNtf :: PeriodStats RecipientId, + pRelays :: ProxyStats, + pRelaysOwn :: ProxyStats, + pMsgFwds :: ProxyStats, + pMsgFwdsOwn :: ProxyStats, + pMsgFwdsRecv :: TVar Int, qCount :: TVar Int, msgCount :: TVar Int } @@ -51,6 +56,11 @@ data ServerStatsData = ServerStatsData _msgSentNtf :: Int, _msgRecvNtf :: Int, _activeQueuesNtf :: PeriodStatsData RecipientId, + _pRelays :: ProxyStatsData, + _pRelaysOwn :: ProxyStatsData, + _pMsgFwds :: ProxyStatsData, + _pMsgFwdsOwn :: ProxyStatsData, + _pMsgFwdsRecv :: Int, _qCount :: Int, _msgCount :: Int } @@ -71,9 +81,14 @@ newServerStats ts = do msgSentNtf <- newTVar 0 msgRecvNtf <- newTVar 0 activeQueuesNtf <- newPeriodStats + pRelays <- newProxyStats + pRelaysOwn <- newProxyStats + pMsgFwds <- newProxyStats + pMsgFwdsOwn <- newProxyStats + pMsgFwdsRecv <- newTVar 0 qCount <- newTVar 0 msgCount <- newTVar 0 - pure ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount} + pure ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedNew, qDeletedSecured, msgSent, msgRecv, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv, qCount, msgCount} getServerStatsData :: ServerStats -> STM ServerStatsData getServerStatsData s = do @@ -90,9 +105,14 @@ getServerStatsData s = do _msgSentNtf <- readTVar $ msgSentNtf s _msgRecvNtf <- readTVar $ msgRecvNtf s _activeQueuesNtf <- getPeriodStatsData $ activeQueuesNtf s + _pRelays <- getProxyStatsData $ pRelays s + _pRelaysOwn <- getProxyStatsData $ pRelaysOwn s + _pMsgFwds <- getProxyStatsData $ pMsgFwds s + _pMsgFwdsOwn <- getProxyStatsData $ pMsgFwdsOwn s + _pMsgFwdsRecv <- readTVar $ pMsgFwdsRecv s _qCount <- readTVar $ qCount s _msgCount <- readTVar $ msgCount s - pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _activeQueues, _msgSentNtf, _msgRecvNtf, _activeQueuesNtf, _qCount, _msgCount} + pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _activeQueues, _msgSentNtf, _msgRecvNtf, _activeQueuesNtf, _pRelays, _pRelaysOwn, _pMsgFwds, _pMsgFwdsOwn, _pMsgFwdsRecv, _qCount, _msgCount} setServerStats :: ServerStats -> ServerStatsData -> STM () setServerStats s d = do @@ -109,28 +129,42 @@ setServerStats s d = do writeTVar (msgSentNtf s) $! _msgSentNtf d writeTVar (msgRecvNtf s) $! _msgRecvNtf d setPeriodStats (activeQueuesNtf s) (_activeQueuesNtf d) + setProxyStats (pRelays s) $! _pRelays d + setProxyStats (pRelaysOwn s) $! _pRelaysOwn d + setProxyStats (pMsgFwds s) $! _pMsgFwds d + setProxyStats (pMsgFwdsOwn s) $! _pMsgFwdsOwn d + writeTVar (pMsgFwdsRecv s) $! _pMsgFwdsRecv d writeTVar (qCount s) $! _qCount d writeTVar (msgCount s) $! _msgCount d instance StrEncoding ServerStatsData where - strEncode ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _qCount, _msgCount} = + strEncode d = B.unlines - [ "fromTime=" <> strEncode _fromTime, - "qCreated=" <> strEncode _qCreated, - "qSecured=" <> strEncode _qSecured, - "qDeletedAll=" <> strEncode _qDeletedAll, - "qDeletedNew=" <> strEncode _qDeletedNew, - "qDeletedSecured=" <> strEncode _qDeletedSecured, - "qCount=" <> strEncode _qCount, - "msgSent=" <> strEncode _msgSent, - "msgRecv=" <> strEncode _msgRecv, - "msgExpired=" <> strEncode _msgExpired, - "msgSentNtf=" <> strEncode _msgSentNtf, - "msgRecvNtf=" <> strEncode _msgRecvNtf, + [ "fromTime=" <> strEncode (_fromTime d), + "qCreated=" <> strEncode (_qCreated d), + "qSecured=" <> strEncode (_qSecured d), + "qDeletedAll=" <> strEncode (_qDeletedAll d), + "qDeletedNew=" <> strEncode (_qDeletedNew d), + "qDeletedSecured=" <> strEncode (_qDeletedSecured d), + "qCount=" <> strEncode (_qCount d), + "msgSent=" <> strEncode (_msgSent d), + "msgRecv=" <> strEncode (_msgRecv d), + "msgExpired=" <> strEncode (_msgExpired d), + "msgSentNtf=" <> strEncode (_msgSentNtf d), + "msgRecvNtf=" <> strEncode (_msgRecvNtf d), "activeQueues:", - strEncode _activeQueues, + strEncode (_activeQueues d), "activeQueuesNtf:", - strEncode _activeQueuesNtf + strEncode (_activeQueuesNtf d), + "pRelays:", + strEncode (_pRelays d), + "pRelaysOwn:", + strEncode (_pRelaysOwn d), + "pMsgFwds:", + strEncode (_pMsgFwds d), + "pMsgFwdsOwn:", + strEncode (_pMsgFwdsOwn d), + "pMsgFwdsRecv=" <> strEncode (_pMsgFwdsRecv d) ] strP = do _fromTime <- "fromTime=" *> strP <* A.endOfLine @@ -157,7 +191,17 @@ instance StrEncoding ServerStatsData where optional ("activeQueuesNtf:" <* A.endOfLine) >>= \case Just _ -> strP <* optional A.endOfLine _ -> pure newPeriodStatsData - pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _qCount, _msgCount = 0} + _pRelays <- proxyStatsP "pRelays:" + _pRelaysOwn <- proxyStatsP "pRelaysOwn:" + _pMsgFwds <- proxyStatsP "pMsgFwds:" + _pMsgFwdsOwn <- proxyStatsP "pMsgFwdsOwn:" + _pMsgFwdsRecv <- "pMsgFwdsRecv=" *> strP <* A.endOfLine <|> pure 0 + pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeletedAll, _qDeletedNew, _qDeletedSecured, _msgSent, _msgRecv, _msgExpired, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _pRelays, _pRelaysOwn, _pMsgFwds, _pMsgFwdsOwn, _pMsgFwdsRecv, _qCount, _msgCount = 0} + where + proxyStatsP key = + optional (A.string key >> A.endOfLine) >>= \case + Just _ -> strP <* optional A.endOfLine + _ -> pure newProxyStatsData data PeriodStats a = PeriodStats { day :: TVar (Set a), @@ -231,3 +275,78 @@ updatePeriodStats stats pId = do updatePeriod month where updatePeriod pSel = modifyTVar' (pSel stats) (S.insert pId) + +data ProxyStats = ProxyStats + { pRequests :: TVar Int, + pSuccesses :: TVar Int, -- includes destination server error responses that will be forwarded to the client + pErrorsConnect :: TVar Int, + pErrorsCompat :: TVar Int, + pErrorsOther :: TVar Int + } + +newProxyStats :: STM ProxyStats +newProxyStats = do + pRequests <- newTVar 0 + pSuccesses <- newTVar 0 + pErrorsConnect <- newTVar 0 + pErrorsCompat <- newTVar 0 + pErrorsOther <- newTVar 0 + pure ProxyStats {pRequests, pSuccesses, pErrorsConnect, pErrorsCompat, pErrorsOther} + +data ProxyStatsData = ProxyStatsData + { _pRequests :: Int, + _pSuccesses :: Int, + _pErrorsConnect :: Int, + _pErrorsCompat :: Int, + _pErrorsOther :: Int + } + deriving (Show) + +newProxyStatsData :: ProxyStatsData +newProxyStatsData = ProxyStatsData {_pRequests = 0, _pSuccesses = 0, _pErrorsConnect = 0, _pErrorsCompat = 0, _pErrorsOther = 0} + +getProxyStatsData :: ProxyStats -> STM ProxyStatsData +getProxyStatsData s = do + _pRequests <- readTVar $ pRequests s + _pSuccesses <- readTVar $ pSuccesses s + _pErrorsConnect <- readTVar $ pErrorsConnect s + _pErrorsCompat <- readTVar $ pErrorsCompat s + _pErrorsOther <- readTVar $ pErrorsOther s + pure ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther} + +getResetProxyStatsData :: ProxyStats -> STM ProxyStatsData +getResetProxyStatsData s = do + _pRequests <- swapTVar (pRequests s) 0 + _pSuccesses <- swapTVar (pSuccesses s) 0 + _pErrorsConnect <- swapTVar (pErrorsConnect s) 0 + _pErrorsCompat <- swapTVar (pErrorsCompat s) 0 + _pErrorsOther <- swapTVar (pErrorsOther s) 0 + pure ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther} + +setProxyStats :: ProxyStats -> ProxyStatsData -> STM () +setProxyStats s d = do + writeTVar (pRequests s) $! _pRequests d + writeTVar (pSuccesses s) $! _pSuccesses d + writeTVar (pErrorsConnect s) $! _pErrorsConnect d + writeTVar (pErrorsCompat s) $! _pErrorsCompat d + writeTVar (pErrorsOther s) $! _pErrorsOther d + +instance StrEncoding ProxyStatsData where + strEncode ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther} = + "requests=" + <> strEncode _pRequests + <> "\nsuccesses=" + <> strEncode _pSuccesses + <> "\nerrorsConnect=" + <> strEncode _pErrorsConnect + <> "\nerrorsCompat=" + <> strEncode _pErrorsCompat + <> "\nerrorsOther=" + <> strEncode _pErrorsOther + strP = do + _pRequests <- "requests=" *> strP <* A.endOfLine + _pSuccesses <- "successes=" *> strP <* A.endOfLine + _pErrorsConnect <- "errorsConnect=" *> strP <* A.endOfLine + _pErrorsCompat <- "errorsCompat=" *> strP <* A.endOfLine + _pErrorsOther <- "errorsOther=" *> strP + pure ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther} diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index b0ed67913..b0c90ca96 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -608,7 +608,7 @@ testRestoreMessages at@(ATransport t) = logSize testStoreLogFile `shouldReturn` 2 logSize testStoreMsgsFile `shouldReturn` 5 - logSize testServerStatsBackupFile `shouldReturn` 20 + logSize testServerStatsBackupFile `shouldReturn` 45 Right stats1 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats1 [rId] 5 1 @@ -626,7 +626,7 @@ testRestoreMessages at@(ATransport t) = logSize testStoreLogFile `shouldReturn` 1 -- the last message is not removed because it was not ACK'd logSize testStoreMsgsFile `shouldReturn` 3 - logSize testServerStatsBackupFile `shouldReturn` 20 + logSize testServerStatsBackupFile `shouldReturn` 45 Right stats2 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats2 [rId] 5 3 @@ -645,7 +645,7 @@ testRestoreMessages at@(ATransport t) = logSize testStoreLogFile `shouldReturn` 1 logSize testStoreMsgsFile `shouldReturn` 0 - logSize testServerStatsBackupFile `shouldReturn` 20 + logSize testServerStatsBackupFile `shouldReturn` 45 Right stats3 <- strDecode <$> B.readFile testServerStatsBackupFile checkStats stats3 [rId] 5 5