mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-29 08:00:09 +00:00
feat(bench): add smp-server memory benchmark framework
Layered benchmark that isolates per-component memory cost: - Phase 1: baseline (no clients) - Phase 2: TLS connections only - Phase 3: queue creation (NEW + KEY) - Phase 4: subscriptions (SUB) - Phase 5: message send - Phase 6: message receive + ACK - Phase 7: sustained load with time-series Includes Docker Compose (PostgreSQL 17), run.sh with --compare-rts mode for testing different GC configurations.
This commit is contained in:
133
bench/ClientSim.hs
Normal file
133
bench/ClientSim.hs
Normal file
@@ -0,0 +1,133 @@
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE GADTs #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE PatternSynonyms #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE TupleSections #-}
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}
|
||||
|
||||
module ClientSim
|
||||
( SimClient (..),
|
||||
connectClient,
|
||||
createQueue,
|
||||
subscribeQueue,
|
||||
sendMessage,
|
||||
receiveAndAck,
|
||||
connectN,
|
||||
benchKeyHash,
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Concurrent.Async (mapConcurrently)
|
||||
import Control.Concurrent.STM
|
||||
import Control.Monad (forM_)
|
||||
import Control.Monad.Except (runExceptT)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import Data.List (unfoldr)
|
||||
import qualified Data.List.NonEmpty as L
|
||||
import Network.Socket (ServiceName)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Encoding
|
||||
import Simplex.Messaging.Protocol
|
||||
import Simplex.Messaging.Transport
|
||||
import Simplex.Messaging.Transport.Client
|
||||
import Simplex.Messaging.Version
|
||||
|
||||
data SimClient = SimClient
|
||||
{ scHandle :: THandleSMP TLS 'TClient,
|
||||
scRcvKey :: C.APrivateAuthKey,
|
||||
scRcvId :: RecipientId,
|
||||
scSndId :: SenderId,
|
||||
scDhSecret :: C.DhSecret 'C.X25519
|
||||
}
|
||||
|
||||
benchKeyHash :: C.KeyHash
|
||||
benchKeyHash = "LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI="
|
||||
|
||||
connectClient :: TransportHost -> ServiceName -> IO (THandleSMP TLS 'TClient)
|
||||
connectClient host port = do
|
||||
let tcConfig = defaultTransportClientConfig {clientALPN = Just alpnSupportedSMPHandshakes}
|
||||
runTransportClient tcConfig Nothing host port (Just benchKeyHash) $ \h ->
|
||||
runExceptT (smpClientHandshake h Nothing benchKeyHash supportedClientSMPRelayVRange False Nothing) >>= \case
|
||||
Right th -> pure th
|
||||
Left e -> error $ "SMP handshake failed: " <> show e
|
||||
|
||||
connectN :: Int -> TransportHost -> ServiceName -> IO [THandleSMP TLS 'TClient]
|
||||
connectN n host port = do
|
||||
let batches = chunksOf 100 [1 .. n]
|
||||
concat <$> mapM (\batch -> mapConcurrently (\_ -> connectClient host port) batch) batches
|
||||
|
||||
createQueue :: THandleSMP TLS 'TClient -> IO SimClient
|
||||
createQueue h = do
|
||||
g <- C.newRandom
|
||||
(rPub, rKey) <- atomically $ C.generateAuthKeyPair C.SEd448 g
|
||||
(sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g
|
||||
(dhPub, dhPriv :: C.PrivateKeyX25519) <- atomically $ C.generateKeyPair g
|
||||
-- NEW command
|
||||
Resp "1" NoEntity (Ids rId sId srvDh) <- signSendRecv h rKey ("1", NoEntity, New rPub dhPub)
|
||||
let dhShared = C.dh' srvDh dhPriv
|
||||
-- KEY command (secure queue)
|
||||
Resp "2" _ OK <- signSendRecv h rKey ("2", rId, KEY sPub)
|
||||
pure SimClient {scHandle = h, scRcvKey = rKey, scRcvId = rId, scSndId = sId, scDhSecret = dhShared}
|
||||
|
||||
subscribeQueue :: SimClient -> IO ()
|
||||
subscribeQueue SimClient {scHandle = h, scRcvKey = rKey, scRcvId = rId} = do
|
||||
Resp "3" _ (SOK _) <- signSendRecv h rKey ("3", rId, SUB)
|
||||
pure ()
|
||||
|
||||
sendMessage :: THandleSMP TLS 'TClient -> C.APrivateAuthKey -> SenderId -> ByteString -> IO ()
|
||||
sendMessage h sKey sId body = do
|
||||
Resp "4" _ OK <- signSendRecv h sKey ("4", sId, SEND noMsgFlags body)
|
||||
pure ()
|
||||
|
||||
receiveAndAck :: SimClient -> IO ()
|
||||
receiveAndAck SimClient {scHandle = h, scRcvKey = rKey, scRcvId = rId} = do
|
||||
(_, _, Right (MSG RcvMessage {msgId = mId})) <- tGet1 h
|
||||
Resp "5" _ OK <- signSendRecv h rKey ("5", rId, ACK mId)
|
||||
pure ()
|
||||
|
||||
-- Helpers (same patterns as ServerTests.hs)
|
||||
|
||||
pattern Resp :: CorrId -> EntityId -> BrokerMsg -> Transmission (Either ErrorType BrokerMsg)
|
||||
pattern Resp corrId queueId command <- (corrId, queueId, Right command)
|
||||
|
||||
pattern Ids :: RecipientId -> SenderId -> RcvPublicDhKey -> BrokerMsg
|
||||
pattern Ids rId sId srvDh <- IDS (QIK rId sId srvDh _ _ Nothing Nothing)
|
||||
|
||||
pattern New :: RcvPublicAuthKey -> RcvPublicDhKey -> Command 'Creator
|
||||
pattern New rPub dhPub = NEW (NewQueueReq rPub dhPub Nothing SMSubscribe (Just (QRMessaging Nothing)) Nothing)
|
||||
|
||||
signSendRecv :: (Transport c, PartyI p) => THandleSMP c 'TClient -> C.APrivateAuthKey -> (ByteString, EntityId, Command p) -> IO (Transmission (Either ErrorType BrokerMsg))
|
||||
signSendRecv h pk t = do
|
||||
signSend h pk t
|
||||
(r L.:| _) <- tGetClient h
|
||||
pure r
|
||||
|
||||
signSend :: (Transport c, PartyI p) => THandleSMP c 'TClient -> C.APrivateAuthKey -> (ByteString, EntityId, Command p) -> IO ()
|
||||
signSend h@THandle {params} (C.APrivateAuthKey a pk) (corrId, qId, cmd) = do
|
||||
let TransmissionForAuth {tForAuth, tToSend} = encodeTransmissionForAuth params (CorrId corrId, qId, cmd)
|
||||
authorize t = (,Nothing) <$> case a of
|
||||
C.SEd25519 -> Just . TASignature . C.ASignature C.SEd25519 $ C.sign' pk t
|
||||
C.SEd448 -> Just . TASignature . C.ASignature C.SEd448 $ C.sign' pk t
|
||||
C.SX25519 -> (\THAuthClient {peerServerPubKey = k} -> TAAuthenticator $ C.cbAuthenticate k pk (C.cbNonce corrId) t) <$> thAuth params
|
||||
Right () <- tPut1 h (authorize tForAuth, tToSend)
|
||||
pure ()
|
||||
|
||||
tPut1 :: Transport c => THandle v c 'TClient -> SentRawTransmission -> IO (Either TransportError ())
|
||||
tPut1 h t = do
|
||||
rs <- tPut h (Right t L.:| [])
|
||||
case rs of
|
||||
(r : _) -> pure r
|
||||
[] -> error "tPut1: empty result"
|
||||
|
||||
tGet1 :: (ProtocolEncoding v err cmd, Transport c) => THandle v c 'TClient -> IO (Transmission (Either err cmd))
|
||||
tGet1 h = do
|
||||
(r L.:| _) <- tGetClient h
|
||||
pure r
|
||||
|
||||
chunksOf :: Int -> [a] -> [[a]]
|
||||
chunksOf n = unfoldr $ \xs -> if null xs then Nothing else Just (splitAt n xs)
|
||||
243
bench/Main.hs
Normal file
243
bench/Main.hs
Normal file
@@ -0,0 +1,243 @@
|
||||
{-# LANGUAGE CPP #-}
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE GADTs #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE NumericUnderscores #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}
|
||||
|
||||
module Main where
|
||||
|
||||
import Control.Concurrent (threadDelay)
|
||||
import Control.Concurrent.Async (async, cancel, forConcurrently_, mapConcurrently, mapConcurrently_)
|
||||
import Control.Concurrent.STM
|
||||
import Control.Monad (forever, forM_, void, when)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.IORef
|
||||
import Data.List (unfoldr)
|
||||
import Data.Time.Clock (getCurrentTime, utctDayTime)
|
||||
import Network.Socket (ServiceName)
|
||||
import System.Environment (getArgs)
|
||||
import System.IO (hFlush, stdout)
|
||||
|
||||
import ClientSim
|
||||
import Report
|
||||
|
||||
import Crypto.Random (ChaChaDRG)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Server (runSMPServerBlocking)
|
||||
import Simplex.Messaging.Server.Env.STM as Env
|
||||
import Simplex.Messaging.Server.Expiration (ExpirationConfig (..))
|
||||
import Simplex.Messaging.Server.MsgStore.Postgres (PostgresMsgStore)
|
||||
import Simplex.Messaging.Server.QueueStore.Postgres.Config (PostgresStoreCfg (..))
|
||||
import Simplex.Messaging.Agent.Store.Postgres.Options (DBOpts (..))
|
||||
import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation (..))
|
||||
import Simplex.Messaging.Client.Agent (SMPClientAgentConfig (..), defaultSMPClientAgentConfig)
|
||||
import Simplex.Messaging.Transport
|
||||
import Simplex.Messaging.Transport.Server (ServerCredentials (..), mkTransportServerConfig)
|
||||
import Simplex.Messaging.Version
|
||||
import UnliftIO.Exception (bracket)
|
||||
|
||||
import Control.Logger.Simple (logInfo, withGlobalLogging, LogConfig (..), setLogLevel, LogLevel (..))
|
||||
|
||||
data BenchConfig = BenchConfig
|
||||
{ numClients :: Int,
|
||||
sustainedMinutes :: Int,
|
||||
pgConnStr :: ByteString,
|
||||
serverPort :: ServiceName,
|
||||
timeSeriesFile :: FilePath
|
||||
}
|
||||
|
||||
defaultBenchConfig :: BenchConfig
|
||||
defaultBenchConfig =
|
||||
BenchConfig
|
||||
{ numClients = 5000,
|
||||
sustainedMinutes = 5,
|
||||
pgConnStr = "postgresql://smp@localhost:15432/smp_bench",
|
||||
serverPort = "15001",
|
||||
timeSeriesFile = "bench-timeseries.csv"
|
||||
}
|
||||
|
||||
parseArgs :: IO BenchConfig
|
||||
parseArgs = do
|
||||
args <- getArgs
|
||||
pure $ go args defaultBenchConfig
|
||||
where
|
||||
go [] c = c
|
||||
go ("--clients" : n : rest) c = go rest c {numClients = read n}
|
||||
go ("--minutes" : n : rest) c = go rest c {sustainedMinutes = read n}
|
||||
go ("--pg" : s : rest) c = go rest c {pgConnStr = B.pack s}
|
||||
go ("--port" : p : rest) c = go rest c {serverPort = p}
|
||||
go ("--timeseries" : f : rest) c = go rest c {timeSeriesFile = f}
|
||||
go (x : _) _ = error $ "Unknown argument: " <> x
|
||||
|
||||
main :: IO ()
|
||||
main = withGlobalLogging LogConfig {lc_file = Nothing, lc_stderr = True} $ do
|
||||
setLogLevel LogInfo
|
||||
bc@BenchConfig {numClients, sustainedMinutes, serverPort, timeSeriesFile, pgConnStr} <- parseArgs
|
||||
putStrLn $ "SMP Server Memory Benchmark"
|
||||
putStrLn $ " clients: " <> show numClients
|
||||
putStrLn $ " sustain: " <> show sustainedMinutes <> " min"
|
||||
putStrLn $ " pg: " <> B.unpack pgConnStr
|
||||
putStrLn $ " port: " <> serverPort
|
||||
putStrLn ""
|
||||
|
||||
snapshotsRef <- newIORef []
|
||||
|
||||
let snap phase clients = do
|
||||
s <- takeSnapshot phase clients
|
||||
modifyIORef' snapshotsRef (s :)
|
||||
putStrLn $ " [" <> show phase <> "] live=" <> show (snapLive s `div` (1024 * 1024)) <> "MB large=" <> show (snapLarge s `div` (1024 * 1024)) <> "MB"
|
||||
hFlush stdout
|
||||
|
||||
withBenchServer bc $ do
|
||||
putStrLn "Phase 1: Baseline (no clients)"
|
||||
snap "baseline" 0
|
||||
|
||||
putStrLn $ "Phase 2: Connecting " <> show numClients <> " TLS clients..."
|
||||
handles <- connectN numClients "localhost" serverPort
|
||||
putStrLn $ " Connected " <> show (length handles) <> " clients"
|
||||
snap "tls_connect" (length handles)
|
||||
|
||||
putStrLn "Phase 3: Creating queues (NEW + KEY)..."
|
||||
simClients <- mapConcurrently createQueue handles
|
||||
putStrLn $ " Created " <> show (length simClients) <> " queues"
|
||||
snap "queue_create" (length simClients)
|
||||
|
||||
putStrLn "Phase 4: Subscribing (SUB)..."
|
||||
mapConcurrently_ subscribeQueue simClients
|
||||
snap "subscribe" (length simClients)
|
||||
|
||||
-- Pair up clients: first half sends to second half
|
||||
let halfN = length simClients `div` 2
|
||||
senders = take halfN simClients
|
||||
receivers = drop halfN simClients
|
||||
pairs = zip senders receivers
|
||||
|
||||
putStrLn $ "Phase 5: Sending " <> show halfN <> " messages..."
|
||||
g <- C.newRandom
|
||||
forConcurrently_ pairs $ \(sender, receiver) -> do
|
||||
(_, sKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g
|
||||
sendMessage (scHandle sender) sKey (scSndId receiver) "benchmark test message payload 1234567890"
|
||||
snap "msg_send" (length simClients)
|
||||
|
||||
putStrLn "Phase 6: Receiving and ACKing messages..."
|
||||
forConcurrently_ receivers receiveAndAck
|
||||
snap "msg_recv" (length simClients)
|
||||
|
||||
putStrLn $ "Phase 7: Sustained load (" <> show sustainedMinutes <> " min)..."
|
||||
writeTimeSeriesHeader timeSeriesFile
|
||||
-- Logger thread: snapshot every 10s
|
||||
logger <- async $ forever $ do
|
||||
threadDelay 10_000_000
|
||||
s <- takeSnapshot "sustained" (length simClients)
|
||||
appendTimeSeries timeSeriesFile s
|
||||
-- Worker threads: continuous send/receive
|
||||
let loopDurationUs = sustainedMinutes * 60 * 1_000_000
|
||||
workersDone <- newTVarIO False
|
||||
workers <- async $ do
|
||||
deadline <- (+ loopDurationUs) <$> getMonotonicTimeUs
|
||||
sustainedLoop g pairs deadline
|
||||
atomically $ writeTVar workersDone True
|
||||
-- Wait for workers
|
||||
void $ atomically $ readTVar workersDone >>= \done -> when (not done) retry
|
||||
cancel logger
|
||||
cancel workers
|
||||
snap "sustained_end" (length simClients)
|
||||
|
||||
snapshots <- reverse <$> readIORef snapshotsRef
|
||||
printSummary snapshots
|
||||
putStrLn $ "\nTime-series written to: " <> timeSeriesFile
|
||||
|
||||
sustainedLoop :: TVar ChaChaDRG -> [(SimClient, SimClient)] -> Int -> IO ()
|
||||
sustainedLoop g pairs deadline = go
|
||||
where
|
||||
go = do
|
||||
now <- getMonotonicTimeUs
|
||||
when (now < deadline) $ do
|
||||
forConcurrently_ pairs $ \(sender, receiver) -> do
|
||||
(_, sKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g
|
||||
sendMessage (scHandle sender) sKey (scSndId receiver) "sustained load message payload"
|
||||
forConcurrently_ (map snd pairs) receiveAndAck
|
||||
go
|
||||
|
||||
getMonotonicTimeUs :: IO Int
|
||||
getMonotonicTimeUs = do
|
||||
t <- getCurrentTime
|
||||
pure $ round (utctDayTime t * 1_000_000)
|
||||
|
||||
withBenchServer :: BenchConfig -> IO a -> IO a
|
||||
withBenchServer BenchConfig {pgConnStr, serverPort} action = do
|
||||
started <- newEmptyTMVarIO
|
||||
let srvCfg = benchServerConfig pgConnStr serverPort
|
||||
bracket
|
||||
(async $ runSMPServerBlocking started srvCfg Nothing)
|
||||
cancel
|
||||
(\_ -> waitForServer started >> action)
|
||||
where
|
||||
waitForServer started = do
|
||||
r <- atomically $ takeTMVar started
|
||||
if r
|
||||
then putStrLn $ "Server started on port " <> serverPort
|
||||
else error "Server failed to start"
|
||||
|
||||
benchServerConfig :: ByteString -> ServiceName -> ServerConfig PostgresMsgStore
|
||||
benchServerConfig pgConn port =
|
||||
let storeCfg = PostgresStoreCfg
|
||||
{ dbOpts = DBOpts {connstr = pgConn, schema = "smp_server", poolSize = 10, createSchema = True},
|
||||
dbStoreLogPath = Nothing,
|
||||
confirmMigrations = MCYesUp,
|
||||
deletedTTL = 86400
|
||||
}
|
||||
in ServerConfig
|
||||
{ transports = [(port, transport @TLS, False)],
|
||||
smpHandshakeTimeout = 120_000_000,
|
||||
tbqSize = 128,
|
||||
msgQueueQuota = 128,
|
||||
maxJournalMsgCount = 256,
|
||||
maxJournalStateLines = 16,
|
||||
queueIdBytes = 24,
|
||||
msgIdBytes = 24,
|
||||
serverStoreCfg = SSCDatabase storeCfg,
|
||||
storeNtfsFile = Nothing,
|
||||
allowNewQueues = True,
|
||||
newQueueBasicAuth = Nothing,
|
||||
controlPortUserAuth = Nothing,
|
||||
controlPortAdminAuth = Nothing,
|
||||
dailyBlockQueueQuota = 20,
|
||||
messageExpiration = Just defaultMessageExpiration,
|
||||
expireMessagesOnStart = False,
|
||||
expireMessagesOnSend = False,
|
||||
idleQueueInterval = 14400,
|
||||
notificationExpiration = defaultNtfExpiration,
|
||||
inactiveClientExpiration = Nothing,
|
||||
logStatsInterval = Nothing,
|
||||
logStatsStartTime = 0,
|
||||
serverStatsLogFile = "bench/tmp/stats.log",
|
||||
serverStatsBackupFile = Nothing,
|
||||
prometheusInterval = Nothing,
|
||||
prometheusMetricsFile = "bench/tmp/metrics.txt",
|
||||
pendingENDInterval = 500_000,
|
||||
ntfDeliveryInterval = 200_000,
|
||||
smpCredentials =
|
||||
ServerCredentials
|
||||
{ caCertificateFile = Just "tests/fixtures/ca.crt",
|
||||
privateKeyFile = "tests/fixtures/server.key",
|
||||
certificateFile = "tests/fixtures/server.crt"
|
||||
},
|
||||
httpCredentials = Nothing,
|
||||
smpServerVRange = supportedServerSMPRelayVRange,
|
||||
Env.transportConfig = mkTransportServerConfig True (Just alpnSupportedSMPHandshakes) True,
|
||||
controlPort = Nothing,
|
||||
smpAgentCfg = defaultSMPClientAgentConfig {persistErrorInterval = 1},
|
||||
allowSMPProxy = False,
|
||||
serverClientConcurrency = 16,
|
||||
information = Nothing,
|
||||
startOptions = StartOptions {maintenance = False, compactLog = False, logLevel = LogInfo, skipWarnings = True, confirmMigrations = MCYesUp}
|
||||
}
|
||||
|
||||
113
bench/Report.hs
Normal file
113
bench/Report.hs
Normal file
@@ -0,0 +1,113 @@
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE NumericUnderscores #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
|
||||
module Report
|
||||
( Snapshot (..),
|
||||
takeSnapshot,
|
||||
printSummary,
|
||||
writeTimeSeriesHeader,
|
||||
appendTimeSeries,
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Concurrent (threadDelay)
|
||||
import Data.List (foldl')
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
import qualified Data.Text.IO as T
|
||||
import Data.Time.Clock (UTCTime, getCurrentTime)
|
||||
import Data.Time.Format.ISO8601 (iso8601Show)
|
||||
import Data.Word (Word32, Word64)
|
||||
import GHC.Stats (RTSStats (..), GCDetails (..), getRTSStats)
|
||||
import System.IO (Handle, IOMode (..), hFlush, hSetBuffering, BufferMode (..), withFile)
|
||||
import System.Mem (performMajorGC)
|
||||
|
||||
data Snapshot = Snapshot
|
||||
{ snapTime :: UTCTime,
|
||||
snapPhase :: Text,
|
||||
snapLive :: Word64,
|
||||
snapHeap :: Word64,
|
||||
snapLarge :: Word64,
|
||||
snapFrag :: Word64,
|
||||
snapGCs :: Word32,
|
||||
snapClients :: Int
|
||||
}
|
||||
|
||||
takeSnapshot :: Text -> Int -> IO Snapshot
|
||||
takeSnapshot phase clients = do
|
||||
performMajorGC
|
||||
threadDelay 1_000_000
|
||||
rts <- getRTSStats
|
||||
ts <- getCurrentTime
|
||||
let GCDetails {gcdetails_live_bytes, gcdetails_mem_in_use_bytes, gcdetails_large_objects_bytes, gcdetails_block_fragmentation_bytes} = gc rts
|
||||
pure
|
||||
Snapshot
|
||||
{ snapTime = ts,
|
||||
snapPhase = phase,
|
||||
snapLive = gcdetails_live_bytes,
|
||||
snapHeap = gcdetails_mem_in_use_bytes,
|
||||
snapLarge = gcdetails_large_objects_bytes,
|
||||
snapFrag = gcdetails_block_fragmentation_bytes,
|
||||
snapGCs = gcs rts,
|
||||
snapClients = clients
|
||||
}
|
||||
|
||||
printSummary :: [Snapshot] -> IO ()
|
||||
printSummary [] = putStrLn "No snapshots collected."
|
||||
printSummary snaps = do
|
||||
putStrLn ""
|
||||
putStrLn hdr
|
||||
putStrLn $ replicate (length hdr) '-'
|
||||
mapM_ printRow (zip (Snapshot {snapLive = 0, snapHeap = 0, snapLarge = 0, snapFrag = 0, snapGCs = 0, snapClients = 0, snapPhase = "", snapTime = snapTime (head snaps)} : snaps) snaps)
|
||||
where
|
||||
hdr = padR 20 "Phase" <> padL 12 "live_MB" <> padL 12 "large_MB" <> padL 12 "frag_MB" <> padL 12 "heap_MB" <> padL 10 "clients" <> padL 14 "d_live_MB" <> padL 14 "d_large_MB" <> padL 14 "KB/client"
|
||||
printRow (prev, cur) =
|
||||
putStrLn $
|
||||
padR 20 (T.unpack $ snapPhase cur)
|
||||
<> padL 12 (showMB $ snapLive cur)
|
||||
<> padL 12 (showMB $ snapLarge cur)
|
||||
<> padL 12 (showMB $ snapFrag cur)
|
||||
<> padL 12 (showMB $ snapHeap cur)
|
||||
<> padL 10 (show $ snapClients cur)
|
||||
<> padL 14 (showDeltaMB (snapLive cur) (snapLive prev))
|
||||
<> padL 14 (showDeltaMB (snapLarge cur) (snapLarge prev))
|
||||
<> padL 14 (perClient cur)
|
||||
showMB w = show (w `div` (1024 * 1024))
|
||||
showDeltaMB a b
|
||||
| a >= b = "+" <> show ((a - b) `div` (1024 * 1024))
|
||||
| otherwise = "-" <> show ((b - a) `div` (1024 * 1024))
|
||||
perClient Snapshot {snapClients, snapLive}
|
||||
| snapClients > 0 = show (snapLive `div` fromIntegral snapClients `div` 1024)
|
||||
| otherwise = "-"
|
||||
padR n s = s <> replicate (max 0 (n - length s)) ' '
|
||||
padL n s = replicate (max 0 (n - length s)) ' ' <> s
|
||||
|
||||
csvHeader :: Text
|
||||
csvHeader = "timestamp,phase,rts_live,rts_heap,rts_large,rts_frag,rts_gc,clients"
|
||||
|
||||
snapshotCsv :: Snapshot -> Text
|
||||
snapshotCsv Snapshot {snapTime, snapPhase, snapLive, snapHeap, snapLarge, snapFrag, snapGCs, snapClients} =
|
||||
T.intercalate
|
||||
","
|
||||
[ T.pack $ iso8601Show snapTime,
|
||||
snapPhase,
|
||||
tshow snapLive,
|
||||
tshow snapHeap,
|
||||
tshow snapLarge,
|
||||
tshow snapFrag,
|
||||
tshow snapGCs,
|
||||
tshow snapClients
|
||||
]
|
||||
|
||||
writeTimeSeriesHeader :: FilePath -> IO ()
|
||||
writeTimeSeriesHeader path = T.writeFile path (csvHeader <> "\n")
|
||||
|
||||
appendTimeSeries :: FilePath -> Snapshot -> IO ()
|
||||
appendTimeSeries path snap =
|
||||
withFile path AppendMode $ \h -> do
|
||||
hSetBuffering h LineBuffering
|
||||
T.hPutStrLn h $ snapshotCsv snap
|
||||
|
||||
tshow :: Show a => a -> Text
|
||||
tshow = T.pack . show
|
||||
20
bench/docker-compose.yml
Normal file
20
bench/docker-compose.yml
Normal file
@@ -0,0 +1,20 @@
|
||||
services:
|
||||
postgres:
|
||||
image: postgres:17
|
||||
environment:
|
||||
POSTGRES_USER: smp
|
||||
POSTGRES_DB: smp_bench
|
||||
POSTGRES_HOST_AUTH_METHOD: trust
|
||||
ports:
|
||||
- "15432:5432"
|
||||
volumes:
|
||||
- ./init.sql:/docker-entrypoint-initdb.d/init.sql
|
||||
- pgdata:/var/lib/postgresql/data
|
||||
healthcheck:
|
||||
test: ["CMD-SHELL", "pg_isready -U smp -d smp_bench"]
|
||||
interval: 2s
|
||||
timeout: 5s
|
||||
retries: 10
|
||||
|
||||
volumes:
|
||||
pgdata:
|
||||
2
bench/init.sql
Normal file
2
bench/init.sql
Normal file
@@ -0,0 +1,2 @@
|
||||
CREATE EXTENSION IF NOT EXISTS pgcrypto;
|
||||
CREATE SCHEMA IF NOT EXISTS smp_server;
|
||||
48
bench/run.sh
Executable file
48
bench/run.sh
Executable file
@@ -0,0 +1,48 @@
|
||||
#!/bin/bash
|
||||
set -e
|
||||
cd "$(dirname "$0")"
|
||||
|
||||
mkdir -p tmp
|
||||
|
||||
reset_db() {
|
||||
docker compose down -v 2>/dev/null || true
|
||||
docker compose up -d --wait
|
||||
echo "PostgreSQL ready."
|
||||
}
|
||||
|
||||
if [ "$1" = "--compare-rts" ]; then
|
||||
shift
|
||||
for label_flags in \
|
||||
"default:-N -A16m -s" \
|
||||
"F1.2:-N -A16m -F1.2 -s" \
|
||||
"F1.5:-N -A16m -F1.5 -s" \
|
||||
"A4m:-N -A4m -s" \
|
||||
"A4m-F1.2:-N -A4m -F1.2 -s" \
|
||||
"compact:-N -A16m -c -s" \
|
||||
"nonmoving:-N -A16m -xn -s"; do
|
||||
label="${label_flags%%:*}"
|
||||
flags="${label_flags#*:}"
|
||||
echo ""
|
||||
echo "=========================================="
|
||||
echo " RTS config: $label ($flags)"
|
||||
echo "=========================================="
|
||||
reset_db
|
||||
cabal run smp-server-bench -- \
|
||||
--timeseries "bench-${label}.csv" \
|
||||
--clients "${BENCH_CLIENTS:-1000}" \
|
||||
--minutes "${BENCH_MINUTES:-2}" \
|
||||
"$@" \
|
||||
+RTS $flags -RTS
|
||||
done
|
||||
echo ""
|
||||
echo "Done. CSV files: bench-*.csv"
|
||||
else
|
||||
reset_db
|
||||
cabal run smp-server-bench -- \
|
||||
--clients "${BENCH_CLIENTS:-5000}" \
|
||||
--minutes "${BENCH_MINUTES:-5}" \
|
||||
"$@" \
|
||||
+RTS -N -A16m -s -RTS
|
||||
fi
|
||||
|
||||
docker compose down
|
||||
@@ -432,6 +432,36 @@ executable smp-server
|
||||
, text
|
||||
default-language: Haskell2010
|
||||
|
||||
executable smp-server-bench
|
||||
if flag(client_library)
|
||||
buildable: False
|
||||
if flag(server_postgres)
|
||||
cpp-options: -DdbServerPostgres
|
||||
main-is: Main.hs
|
||||
other-modules:
|
||||
ClientSim
|
||||
Report
|
||||
hs-source-dirs:
|
||||
bench
|
||||
default-extensions:
|
||||
StrictData
|
||||
ghc-options: -O2 -threaded -rtsopts
|
||||
build-depends:
|
||||
base
|
||||
, async
|
||||
, bytestring
|
||||
, containers
|
||||
, crypton
|
||||
, mtl
|
||||
, network
|
||||
, simple-logger
|
||||
, simplexmq
|
||||
, stm
|
||||
, text
|
||||
, time
|
||||
, unliftio
|
||||
default-language: Haskell2010
|
||||
|
||||
executable xftp
|
||||
if flag(client_library)
|
||||
buildable: False
|
||||
|
||||
Reference in New Issue
Block a user