mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-06-07 15:22:03 +00:00
log server stats (#367)
* log server stats * separate stats updates from the existing transactions
This commit is contained in:
committed by
GitHub
parent
1064e9c315
commit
ace94d7c69
+10
-2
@@ -5,6 +5,7 @@
|
||||
|
||||
module Main where
|
||||
|
||||
import Control.Logger.Simple
|
||||
import Simplex.Messaging.Server (runSMPServer)
|
||||
import Simplex.Messaging.Server.CLI (ServerCLIConfig (..), protocolServerCLI)
|
||||
import Simplex.Messaging.Server.Env.STM (ServerConfig (..), defaultInactiveClientExpiration, defaultMessageExpiration)
|
||||
@@ -17,8 +18,13 @@ cfgPath = "/etc/opt/simplex"
|
||||
logPath :: FilePath
|
||||
logPath = "/var/opt/simplex"
|
||||
|
||||
logCfg :: LogConfig
|
||||
logCfg = LogConfig {lc_file = Nothing, lc_stderr = True}
|
||||
|
||||
main :: IO ()
|
||||
main = protocolServerCLI smpServerCLIConfig runSMPServer
|
||||
main = do
|
||||
setLogLevel LogInfo
|
||||
withGlobalLogging logCfg $ protocolServerCLI smpServerCLIConfig runSMPServer
|
||||
|
||||
smpServerCLIConfig :: ServerCLIConfig ServerConfig
|
||||
smpServerCLIConfig =
|
||||
@@ -52,6 +58,8 @@ smpServerCLIConfig =
|
||||
storeLogFile,
|
||||
allowNewQueues = True,
|
||||
messageExpiration = Just defaultMessageExpiration,
|
||||
inactiveClientExpiration = Just defaultInactiveClientExpiration
|
||||
inactiveClientExpiration = Just defaultInactiveClientExpiration,
|
||||
logStatsInterval = Just 86400, -- seconds
|
||||
logStatsStartTime = 0 -- seconds from 00:00 UTC
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
{-# LANGUAGE KindSignatures #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE NumericUnderscores #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE RankNTypes #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
@@ -32,6 +33,7 @@ module Simplex.Messaging.Server
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Logger.Simple
|
||||
import Control.Monad
|
||||
import Control.Monad.Except
|
||||
import Control.Monad.IO.Unlift
|
||||
@@ -40,8 +42,12 @@ import Crypto.Random
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Functor (($>))
|
||||
import Data.List (intercalate)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (isNothing)
|
||||
import qualified Data.Set as S
|
||||
import qualified Data.Text as T
|
||||
import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime)
|
||||
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
|
||||
import Data.Type.Equality
|
||||
import Network.Socket (ServiceName)
|
||||
@@ -86,7 +92,7 @@ smpServer started = do
|
||||
raceAny_
|
||||
( serverThread s subscribedQ subscribers subscriptions cancelSub :
|
||||
serverThread s ntfSubscribedQ notifiers ntfSubscriptions (\_ -> pure ()) :
|
||||
map runServer transports <> expireMessagesThread_ cfg
|
||||
map runServer transports <> expireMessagesThread_ cfg <> serverStatsThread_ cfg
|
||||
)
|
||||
`finally` withLog closeStoreLog
|
||||
where
|
||||
@@ -140,6 +146,29 @@ smpServer started = do
|
||||
atomically (getMsgQueue ms rId quota)
|
||||
>>= atomically . (`deleteExpiredMsgs` old)
|
||||
|
||||
serverStatsThread_ :: ServerConfig -> [m ()]
|
||||
serverStatsThread_ ServerConfig {logStatsInterval, logStatsStartTime} =
|
||||
maybe [] ((: []) . logServerStats logStatsStartTime) logStatsInterval
|
||||
|
||||
logServerStats :: Int -> Int -> m ()
|
||||
logServerStats startAt logInterval = do
|
||||
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
|
||||
logInfo $ "fromTime,qCreated,qSecured,qDeleted,msgSent,msgRecv,msgQueues"
|
||||
threadDelay $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0)
|
||||
ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, msgQueues} <- asks serverStats
|
||||
let interval = 1000000 * logInterval
|
||||
forever $ do
|
||||
ts <- liftIO getCurrentTime
|
||||
fromTime' <- atomically $ swapTVar fromTime ts
|
||||
qCreated' <- atomically $ swapTVar qCreated 0
|
||||
qSecured' <- atomically $ swapTVar qSecured 0
|
||||
qDeleted' <- atomically $ swapTVar qDeleted 0
|
||||
msgSent' <- atomically $ swapTVar msgSent 0
|
||||
msgRecv' <- atomically $ swapTVar msgRecv 0
|
||||
msgQueues' <- atomically $ S.size <$> swapTVar msgQueues S.empty
|
||||
logInfo . T.pack $ intercalate "," [show fromTime', show qCreated', show qSecured', show qDeleted', show msgSent', show msgRecv', show msgQueues']
|
||||
threadDelay interval
|
||||
|
||||
runClient :: Transport c => TProxy c -> c -> m ()
|
||||
runClient _ h = do
|
||||
kh <- asks serverIdentity
|
||||
@@ -311,6 +340,8 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri
|
||||
Left e -> pure $ ERR e
|
||||
Right _ -> do
|
||||
withLog (`logCreateById` rId)
|
||||
stats <- asks serverStats
|
||||
atomically $ modifyTVar (qCreated stats) (+ 1)
|
||||
subscribeQueue rId $> IDS (qik ids)
|
||||
|
||||
logCreateById :: StoreLog 'WriteMode -> RecipientId -> IO ()
|
||||
@@ -327,6 +358,8 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri
|
||||
secureQueue_ :: QueueStore -> SndPublicVerifyKey -> m (Transmission BrokerMsg)
|
||||
secureQueue_ st sKey = do
|
||||
withLog $ \s -> logSecureQueue s queueId sKey
|
||||
stats <- asks serverStats
|
||||
atomically $ modifyTVar (qSecured stats) (+ 1)
|
||||
atomically $ (corrId,queueId,) . either ERR (const OK) <$> secureQueue st queueId sKey
|
||||
|
||||
addQueueNotifier_ :: QueueStore -> NtfPublicVerifyKey -> m (Transmission BrokerMsg)
|
||||
@@ -373,7 +406,11 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri
|
||||
acknowledgeMsg =
|
||||
atomically (withSub queueId $ \s -> const s <$$> tryTakeTMVar (delivered s))
|
||||
>>= \case
|
||||
Just (Just s) -> deliverMessage tryDelPeekMsg queueId s
|
||||
Just (Just s) -> do
|
||||
stats <- asks serverStats
|
||||
atomically $ modifyTVar (msgRecv stats) (+ 1)
|
||||
atomically $ modifyTVar (msgQueues stats) (S.insert queueId)
|
||||
deliverMessage tryDelPeekMsg queueId s
|
||||
_ -> return $ err NO_MSG
|
||||
|
||||
withSub :: RecipientId -> (Sub -> STM a) -> STM (Maybe a)
|
||||
@@ -396,13 +433,18 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri
|
||||
ms <- asks msgStore
|
||||
ServerConfig {messageExpiration, msgQueueQuota} <- asks config
|
||||
old <- liftIO $ mapM expireBeforeEpoch messageExpiration
|
||||
atomically $ do
|
||||
resp@(_, _, sent) <- atomically $ do
|
||||
q <- getMsgQueue ms (recipientId qr) msgQueueQuota
|
||||
mapM_ (deleteExpiredMsgs q) old
|
||||
ifM (isFull q) (pure $ err QUOTA) $ do
|
||||
trySendNotification
|
||||
writeMsg q msg
|
||||
pure ok
|
||||
when (sent == OK) $ do
|
||||
stats <- asks serverStats
|
||||
atomically $ modifyTVar (msgSent stats) (+ 1)
|
||||
atomically $ modifyTVar (msgQueues stats) (S.insert $ recipientId qr)
|
||||
pure resp
|
||||
where
|
||||
mkMessage :: m (Either C.CryptoError Message)
|
||||
mkMessage = do
|
||||
@@ -460,6 +502,8 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri
|
||||
delQueueAndMsgs st = do
|
||||
withLog (`logDeleteQueue` queueId)
|
||||
ms <- asks msgStore
|
||||
stats <- asks serverStats
|
||||
atomically $ modifyTVar (qDeleted stats) (+ 1)
|
||||
atomically $
|
||||
deleteQueue st queueId >>= \case
|
||||
Left e -> pure $ err e
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE TupleSections #-}
|
||||
|
||||
module Simplex.Messaging.Server.Env.STM where
|
||||
|
||||
@@ -11,6 +12,9 @@ import Crypto.Random
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import Data.Map.Strict (Map)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Set (Set)
|
||||
import qualified Data.Set as S
|
||||
import Data.Time.Clock (UTCTime, getCurrentTime)
|
||||
import Data.Time.Clock.System (SystemTime)
|
||||
import Data.X509.Validation (Fingerprint (..))
|
||||
import Network.Socket (ServiceName)
|
||||
@@ -45,7 +49,12 @@ data ServerConfig = ServerConfig
|
||||
-- | time after which the socket with inactive client can be disconnected (without any messages or commands, incl. PING),
|
||||
-- and check interval, seconds
|
||||
inactiveClientExpiration :: Maybe ExpirationConfig,
|
||||
-- CA certificate private key is not needed for initialization
|
||||
-- | log SMP server usage statistics, only aggregates are logged, seconds
|
||||
logStatsInterval :: Maybe Int,
|
||||
-- | time of the day when the stats are logged first, to log at consistent times,
|
||||
-- irrespective of when the server is started (seconds from 00:00 UTC)
|
||||
logStatsStartTime :: Int,
|
||||
-- | CA certificate private key is not needed for initialization
|
||||
caCertificateFile :: FilePath,
|
||||
privateKeyFile :: FilePath,
|
||||
certificateFile :: FilePath
|
||||
@@ -73,7 +82,8 @@ data Env = Env
|
||||
msgStore :: STMMsgStore,
|
||||
idsDrg :: TVar ChaChaDRG,
|
||||
storeLog :: Maybe (StoreLog 'WriteMode),
|
||||
tlsServerParams :: T.ServerParams
|
||||
tlsServerParams :: T.ServerParams,
|
||||
serverStats :: ServerStats
|
||||
}
|
||||
|
||||
data Server = Server
|
||||
@@ -93,6 +103,16 @@ data Client = Client
|
||||
activeAt :: TVar SystemTime
|
||||
}
|
||||
|
||||
data ServerStats = ServerStats
|
||||
{ qCreated :: TVar Int,
|
||||
qSecured :: TVar Int,
|
||||
qDeleted :: TVar Int,
|
||||
msgSent :: TVar Int,
|
||||
msgRecv :: TVar Int,
|
||||
msgQueues :: TVar (Set RecipientId),
|
||||
fromTime :: TVar UTCTime
|
||||
}
|
||||
|
||||
data SubscriptionThread = NoSub | SubPending | SubThread ThreadId
|
||||
|
||||
data Sub = Sub
|
||||
@@ -118,6 +138,17 @@ newClient qSize sessionId ts = do
|
||||
activeAt <- newTVar ts
|
||||
return Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId, connected, activeAt}
|
||||
|
||||
newServerStats :: UTCTime -> STM ServerStats
|
||||
newServerStats ts = do
|
||||
qCreated <- newTVar 0
|
||||
qSecured <- newTVar 0
|
||||
qDeleted <- newTVar 0
|
||||
msgSent <- newTVar 0
|
||||
msgRecv <- newTVar 0
|
||||
msgQueues <- newTVar S.empty
|
||||
fromTime <- newTVar ts
|
||||
pure ServerStats {qCreated, qSecured, qDeleted, msgSent, msgRecv, msgQueues, fromTime}
|
||||
|
||||
newSubscription :: STM Sub
|
||||
newSubscription = do
|
||||
delivered <- newEmptyTMVar
|
||||
@@ -134,7 +165,8 @@ newEnv config@ServerConfig {caCertificateFile, certificateFile, privateKeyFile,
|
||||
tlsServerParams <- liftIO $ loadTLSServerParams caCertificateFile certificateFile privateKeyFile
|
||||
Fingerprint fp <- liftIO $ loadFingerprint caCertificateFile
|
||||
let serverIdentity = KeyHash fp
|
||||
return Env {config, server, serverIdentity, queueStore, msgStore, idsDrg, storeLog = s', tlsServerParams}
|
||||
serverStats <- atomically . newServerStats =<< liftIO getCurrentTime
|
||||
return Env {config, server, serverIdentity, queueStore, msgStore, idsDrg, storeLog = s', tlsServerParams, serverStats}
|
||||
where
|
||||
restoreQueues :: QueueStore -> StoreLog 'ReadMode -> m (StoreLog 'WriteMode)
|
||||
restoreQueues QueueStore {queues, senders, notifiers} s = do
|
||||
|
||||
@@ -63,6 +63,8 @@ cfg =
|
||||
allowNewQueues = True,
|
||||
messageExpiration = Just defaultMessageExpiration,
|
||||
inactiveClientExpiration = Just defaultInactiveClientExpiration,
|
||||
logStatsInterval = Nothing,
|
||||
logStatsStartTime = 0,
|
||||
caCertificateFile = "tests/fixtures/ca.crt",
|
||||
privateKeyFile = "tests/fixtures/server.key",
|
||||
certificateFile = "tests/fixtures/server.crt"
|
||||
|
||||
Reference in New Issue
Block a user