diff --git a/apps/smp-server/Main.hs b/apps/smp-server/Main.hs index 7bc13b3fb..9cb56e08a 100644 --- a/apps/smp-server/Main.hs +++ b/apps/smp-server/Main.hs @@ -5,6 +5,7 @@ module Main where +import Control.Logger.Simple import Simplex.Messaging.Server (runSMPServer) import Simplex.Messaging.Server.CLI (ServerCLIConfig (..), protocolServerCLI) import Simplex.Messaging.Server.Env.STM (ServerConfig (..), defaultInactiveClientExpiration, defaultMessageExpiration) @@ -17,8 +18,13 @@ cfgPath = "/etc/opt/simplex" logPath :: FilePath logPath = "/var/opt/simplex" +logCfg :: LogConfig +logCfg = LogConfig {lc_file = Nothing, lc_stderr = True} + main :: IO () -main = protocolServerCLI smpServerCLIConfig runSMPServer +main = do + setLogLevel LogInfo + withGlobalLogging logCfg $ protocolServerCLI smpServerCLIConfig runSMPServer smpServerCLIConfig :: ServerCLIConfig ServerConfig smpServerCLIConfig = @@ -52,6 +58,8 @@ smpServerCLIConfig = storeLogFile, allowNewQueues = True, messageExpiration = Just defaultMessageExpiration, - inactiveClientExpiration = Just defaultInactiveClientExpiration + inactiveClientExpiration = Just defaultInactiveClientExpiration, + logStatsInterval = Just 86400, -- seconds + logStatsStartTime = 0 -- seconds from 00:00 UTC } } diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index f2cbe2d68..901f0ee94 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -5,6 +5,7 @@ {-# LANGUAGE KindSignatures #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} @@ -32,6 +33,7 @@ module Simplex.Messaging.Server ) where +import Control.Logger.Simple import Control.Monad import Control.Monad.Except import Control.Monad.IO.Unlift @@ -40,8 +42,12 @@ import Crypto.Random import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Functor (($>)) +import Data.List (intercalate) import qualified Data.Map.Strict as M import Data.Maybe (isNothing) +import qualified Data.Set as S +import qualified Data.Text as T +import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime) import Data.Time.Clock.System (SystemTime (..), getSystemTime) import Data.Type.Equality import Network.Socket (ServiceName) @@ -86,7 +92,7 @@ smpServer started = do raceAny_ ( serverThread s subscribedQ subscribers subscriptions cancelSub : serverThread s ntfSubscribedQ notifiers ntfSubscriptions (\_ -> pure ()) : - map runServer transports <> expireMessagesThread_ cfg + map runServer transports <> expireMessagesThread_ cfg <> serverStatsThread_ cfg ) `finally` withLog closeStoreLog where @@ -140,6 +146,29 @@ smpServer started = do atomically (getMsgQueue ms rId quota) >>= atomically . (`deleteExpiredMsgs` old) + serverStatsThread_ :: ServerConfig -> [m ()] + serverStatsThread_ ServerConfig {logStatsInterval, logStatsStartTime} = + maybe [] ((: []) . logServerStats logStatsStartTime) logStatsInterval + + logServerStats :: Int -> Int -> m () + logServerStats startAt logInterval = do + initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime + logInfo $ "fromTime,qCreated,qSecured,qDeleted,msgSent,msgRecv,msgQueues" + threadDelay $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0) + ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, msgQueues} <- asks serverStats + let interval = 1000000 * logInterval + forever $ do + ts <- liftIO getCurrentTime + fromTime' <- atomically $ swapTVar fromTime ts + qCreated' <- atomically $ swapTVar qCreated 0 + qSecured' <- atomically $ swapTVar qSecured 0 + qDeleted' <- atomically $ swapTVar qDeleted 0 + msgSent' <- atomically $ swapTVar msgSent 0 + msgRecv' <- atomically $ swapTVar msgRecv 0 + msgQueues' <- atomically $ S.size <$> swapTVar msgQueues S.empty + logInfo . T.pack $ intercalate "," [show fromTime', show qCreated', show qSecured', show qDeleted', show msgSent', show msgRecv', show msgQueues'] + threadDelay interval + runClient :: Transport c => TProxy c -> c -> m () runClient _ h = do kh <- asks serverIdentity @@ -311,6 +340,8 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri Left e -> pure $ ERR e Right _ -> do withLog (`logCreateById` rId) + stats <- asks serverStats + atomically $ modifyTVar (qCreated stats) (+ 1) subscribeQueue rId $> IDS (qik ids) logCreateById :: StoreLog 'WriteMode -> RecipientId -> IO () @@ -327,6 +358,8 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri secureQueue_ :: QueueStore -> SndPublicVerifyKey -> m (Transmission BrokerMsg) secureQueue_ st sKey = do withLog $ \s -> logSecureQueue s queueId sKey + stats <- asks serverStats + atomically $ modifyTVar (qSecured stats) (+ 1) atomically $ (corrId,queueId,) . either ERR (const OK) <$> secureQueue st queueId sKey addQueueNotifier_ :: QueueStore -> NtfPublicVerifyKey -> m (Transmission BrokerMsg) @@ -373,7 +406,11 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri acknowledgeMsg = atomically (withSub queueId $ \s -> const s <$$> tryTakeTMVar (delivered s)) >>= \case - Just (Just s) -> deliverMessage tryDelPeekMsg queueId s + Just (Just s) -> do + stats <- asks serverStats + atomically $ modifyTVar (msgRecv stats) (+ 1) + atomically $ modifyTVar (msgQueues stats) (S.insert queueId) + deliverMessage tryDelPeekMsg queueId s _ -> return $ err NO_MSG withSub :: RecipientId -> (Sub -> STM a) -> STM (Maybe a) @@ -396,13 +433,18 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri ms <- asks msgStore ServerConfig {messageExpiration, msgQueueQuota} <- asks config old <- liftIO $ mapM expireBeforeEpoch messageExpiration - atomically $ do + resp@(_, _, sent) <- atomically $ do q <- getMsgQueue ms (recipientId qr) msgQueueQuota mapM_ (deleteExpiredMsgs q) old ifM (isFull q) (pure $ err QUOTA) $ do trySendNotification writeMsg q msg pure ok + when (sent == OK) $ do + stats <- asks serverStats + atomically $ modifyTVar (msgSent stats) (+ 1) + atomically $ modifyTVar (msgQueues stats) (S.insert $ recipientId qr) + pure resp where mkMessage :: m (Either C.CryptoError Message) mkMessage = do @@ -460,6 +502,8 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri delQueueAndMsgs st = do withLog (`logDeleteQueue` queueId) ms <- asks msgStore + stats <- asks serverStats + atomically $ modifyTVar (qDeleted stats) (+ 1) atomically $ deleteQueue st queueId >>= \case Left e -> pure $ err e diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 6ca63df7b..163979d61 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -2,6 +2,7 @@ {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TupleSections #-} module Simplex.Messaging.Server.Env.STM where @@ -11,6 +12,9 @@ import Crypto.Random import Data.ByteString.Char8 (ByteString) import Data.Map.Strict (Map) import qualified Data.Map.Strict as M +import Data.Set (Set) +import qualified Data.Set as S +import Data.Time.Clock (UTCTime, getCurrentTime) import Data.Time.Clock.System (SystemTime) import Data.X509.Validation (Fingerprint (..)) import Network.Socket (ServiceName) @@ -45,7 +49,12 @@ data ServerConfig = ServerConfig -- | time after which the socket with inactive client can be disconnected (without any messages or commands, incl. PING), -- and check interval, seconds inactiveClientExpiration :: Maybe ExpirationConfig, - -- CA certificate private key is not needed for initialization + -- | log SMP server usage statistics, only aggregates are logged, seconds + logStatsInterval :: Maybe Int, + -- | time of the day when the stats are logged first, to log at consistent times, + -- irrespective of when the server is started (seconds from 00:00 UTC) + logStatsStartTime :: Int, + -- | CA certificate private key is not needed for initialization caCertificateFile :: FilePath, privateKeyFile :: FilePath, certificateFile :: FilePath @@ -73,7 +82,8 @@ data Env = Env msgStore :: STMMsgStore, idsDrg :: TVar ChaChaDRG, storeLog :: Maybe (StoreLog 'WriteMode), - tlsServerParams :: T.ServerParams + tlsServerParams :: T.ServerParams, + serverStats :: ServerStats } data Server = Server @@ -93,6 +103,16 @@ data Client = Client activeAt :: TVar SystemTime } +data ServerStats = ServerStats + { qCreated :: TVar Int, + qSecured :: TVar Int, + qDeleted :: TVar Int, + msgSent :: TVar Int, + msgRecv :: TVar Int, + msgQueues :: TVar (Set RecipientId), + fromTime :: TVar UTCTime + } + data SubscriptionThread = NoSub | SubPending | SubThread ThreadId data Sub = Sub @@ -118,6 +138,17 @@ newClient qSize sessionId ts = do activeAt <- newTVar ts return Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId, connected, activeAt} +newServerStats :: UTCTime -> STM ServerStats +newServerStats ts = do + qCreated <- newTVar 0 + qSecured <- newTVar 0 + qDeleted <- newTVar 0 + msgSent <- newTVar 0 + msgRecv <- newTVar 0 + msgQueues <- newTVar S.empty + fromTime <- newTVar ts + pure ServerStats {qCreated, qSecured, qDeleted, msgSent, msgRecv, msgQueues, fromTime} + newSubscription :: STM Sub newSubscription = do delivered <- newEmptyTMVar @@ -134,7 +165,8 @@ newEnv config@ServerConfig {caCertificateFile, certificateFile, privateKeyFile, tlsServerParams <- liftIO $ loadTLSServerParams caCertificateFile certificateFile privateKeyFile Fingerprint fp <- liftIO $ loadFingerprint caCertificateFile let serverIdentity = KeyHash fp - return Env {config, server, serverIdentity, queueStore, msgStore, idsDrg, storeLog = s', tlsServerParams} + serverStats <- atomically . newServerStats =<< liftIO getCurrentTime + return Env {config, server, serverIdentity, queueStore, msgStore, idsDrg, storeLog = s', tlsServerParams, serverStats} where restoreQueues :: QueueStore -> StoreLog 'ReadMode -> m (StoreLog 'WriteMode) restoreQueues QueueStore {queues, senders, notifiers} s = do diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 0dc15a1af..5e5f7e611 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -63,6 +63,8 @@ cfg = allowNewQueues = True, messageExpiration = Just defaultMessageExpiration, inactiveClientExpiration = Just defaultInactiveClientExpiration, + logStatsInterval = Nothing, + logStatsStartTime = 0, caCertificateFile = "tests/fixtures/ca.crt", privateKeyFile = "tests/fixtures/server.key", certificateFile = "tests/fixtures/server.crt"