Merge branch 'master' into postgres

This commit is contained in:
spaced4ndy
2024-12-20 17:18:54 +04:00
15 changed files with 577 additions and 102 deletions
+21 -20
View File
@@ -1,7 +1,7 @@
cabal-version: 1.12
name: simplexmq
version: 6.2.0.7
version: 6.2.1.0
synopsis: SimpleXMQ message broker
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
<./docs/Simplex-Messaging-Client.html client> and
@@ -78,7 +78,6 @@ library
Simplex.FileTransfer.Chunks
Simplex.FileTransfer.Client
Simplex.FileTransfer.Client.Agent
Simplex.FileTransfer.Client.Main
Simplex.FileTransfer.Client.Presets
Simplex.FileTransfer.Crypto
Simplex.FileTransfer.Description
@@ -122,7 +121,6 @@ library
Simplex.Messaging.Notifications.Types
Simplex.Messaging.Parsers
Simplex.Messaging.Protocol
Simplex.Messaging.Server.CLI
Simplex.Messaging.Server.Expiration
Simplex.Messaging.Server.QueueStore.QueueInfo
Simplex.Messaging.ServiceScheme
@@ -138,7 +136,6 @@ library
Simplex.Messaging.Transport.HTTP2.Server
Simplex.Messaging.Transport.KeepAlive
Simplex.Messaging.Transport.Server
Simplex.Messaging.Transport.WebSockets
Simplex.Messaging.Util
Simplex.Messaging.Version
Simplex.Messaging.Version.Internal
@@ -201,6 +198,7 @@ library
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20241007_rcv_queues_last_broker_ts
if !flag(client_library)
exposed-modules:
Simplex.FileTransfer.Client.Main
Simplex.FileTransfer.Server
Simplex.FileTransfer.Server.Control
Simplex.FileTransfer.Server.Env
@@ -208,21 +206,6 @@ library
Simplex.FileTransfer.Server.Stats
Simplex.FileTransfer.Server.Store
Simplex.FileTransfer.Server.StoreLog
Simplex.Messaging.Server
Simplex.Messaging.Server.Control
Simplex.Messaging.Server.Env.STM
Simplex.Messaging.Server.Information
Simplex.Messaging.Server.Main
Simplex.Messaging.Server.MsgStore
Simplex.Messaging.Server.MsgStore.Journal
Simplex.Messaging.Server.MsgStore.STM
Simplex.Messaging.Server.MsgStore.Types
Simplex.Messaging.Server.NtfStore
Simplex.Messaging.Server.QueueStore
Simplex.Messaging.Server.QueueStore.STM
Simplex.Messaging.Server.Stats
Simplex.Messaging.Server.StoreLog
Simplex.Messaging.Server.StoreLog.Types
Simplex.Messaging.Notifications.Server
Simplex.Messaging.Notifications.Server.Control
Simplex.Messaging.Notifications.Server.Env
@@ -232,6 +215,24 @@ library
Simplex.Messaging.Notifications.Server.Stats
Simplex.Messaging.Notifications.Server.Store
Simplex.Messaging.Notifications.Server.StoreLog
Simplex.Messaging.Server
Simplex.Messaging.Server.CLI
Simplex.Messaging.Server.Control
Simplex.Messaging.Server.Env.STM
Simplex.Messaging.Server.Information
Simplex.Messaging.Server.Main
Simplex.Messaging.Server.MsgStore
Simplex.Messaging.Server.MsgStore.Journal
Simplex.Messaging.Server.MsgStore.STM
Simplex.Messaging.Server.MsgStore.Types
Simplex.Messaging.Server.NtfStore
Simplex.Messaging.Server.Prometheus
Simplex.Messaging.Server.QueueStore
Simplex.Messaging.Server.QueueStore.STM
Simplex.Messaging.Server.Stats
Simplex.Messaging.Server.StoreLog
Simplex.Messaging.Server.StoreLog.Types
Simplex.Messaging.Transport.WebSockets
other-modules:
Paths_simplexmq
hs-source-dirs:
@@ -290,7 +291,6 @@ library
, transformers ==0.6.*
, unliftio ==0.2.*
, unliftio-core ==0.2.*
, websockets ==0.12.*
, yaml ==0.11.*
, zstd ==0.1.3.*
default-language: Haskell2010
@@ -300,6 +300,7 @@ library
build-depends:
case-insensitive ==1.2.*
, hashable ==1.4.*
, websockets ==0.12.*
if flag(client_postgres)
build-depends:
postgresql-libpq >=0.10.0.0
+1 -2
View File
@@ -51,8 +51,7 @@ import Data.Text (Text)
import Data.Time.Clock (getCurrentTime)
import Data.Time.Format (defaultTimeLocale, formatTime)
import Simplex.FileTransfer.Chunks (toKB)
import Simplex.FileTransfer.Client (XFTPChunkSpec (..))
import Simplex.FileTransfer.Client.Main
import Simplex.FileTransfer.Client (XFTPChunkSpec (..), getChunkDigest, prepareChunkSizes, prepareChunkSpecs, singleChunkSize)
import Simplex.FileTransfer.Crypto
import Simplex.FileTransfer.Description
import Simplex.FileTransfer.Protocol (FileParty (..), SFileParty (..))
+42
View File
@@ -20,14 +20,18 @@ import Data.Bifunctor (first)
import Data.ByteString.Builder (Builder, byteString)
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy as LB
import Data.Int (Int64)
import Data.List (foldl')
import Data.List.NonEmpty (NonEmpty (..))
import Data.Maybe (listToMaybe)
import Data.Time.Clock (UTCTime)
import Data.Word (Word32)
import qualified Data.X509 as X
import qualified Data.X509.Validation as XV
import qualified Network.HTTP.Types as N
import qualified Network.HTTP2.Client as H
import Simplex.FileTransfer.Chunks
import Simplex.FileTransfer.Protocol
import Simplex.FileTransfer.Transport
import Simplex.Messaging.Client
@@ -298,3 +302,41 @@ noFile HTTP2Body {bodyPart} a = case bodyPart of
-- FACK :: FileCommand Recipient
-- PING :: FileCommand Recipient
singleChunkSize :: Int64 -> Maybe Word32
singleChunkSize size' =
listToMaybe $ dropWhile (< chunkSize) serverChunkSizes
where
chunkSize = fromIntegral size'
prepareChunkSizes :: Int64 -> [Word32]
prepareChunkSizes size' = prepareSizes size'
where
(smallSize, bigSize)
| size' > size34 chunkSize3 = (chunkSize2, chunkSize3)
| size' > size34 chunkSize2 = (chunkSize1, chunkSize2)
| otherwise = (chunkSize0, chunkSize1)
size34 sz = (fromIntegral sz * 3) `div` 4
prepareSizes 0 = []
prepareSizes size
| size >= fromIntegral bigSize = replicate (fromIntegral n1) bigSize <> prepareSizes remSz
| size > size34 bigSize = [bigSize]
| otherwise = replicate (fromIntegral n2') smallSize
where
(n1, remSz) = size `divMod` fromIntegral bigSize
n2' = let (n2, remSz2) = (size `divMod` fromIntegral smallSize) in if remSz2 == 0 then n2 else n2 + 1
prepareChunkSpecs :: FilePath -> [Word32] -> [XFTPChunkSpec]
prepareChunkSpecs filePath chunkSizes = reverse . snd $ foldl' addSpec (0, []) chunkSizes
where
addSpec :: (Int64, [XFTPChunkSpec]) -> Word32 -> (Int64, [XFTPChunkSpec])
addSpec (chunkOffset, specs) sz =
let spec = XFTPChunkSpec {filePath, chunkOffset, chunkSize = sz}
in (chunkOffset + fromIntegral sz, spec : specs)
getChunkDigest :: XFTPChunkSpec -> IO ByteString
getChunkDigest XFTPChunkSpec {filePath = chunkPath, chunkOffset, chunkSize} =
withFile chunkPath ReadMode $ \h -> do
hSeek h AbsoluteSeek $ fromIntegral chunkOffset
chunk <- LB.hGet h (fromIntegral chunkSize)
pure $! LC.sha256Hash chunk
+1 -68
View File
@@ -19,11 +19,7 @@ module Simplex.FileTransfer.Client.Main
singleChunkSize,
prepareChunkSizes,
prepareChunkSpecs,
maxFileSize,
maxFileSizeHard,
fileSizeLen,
getChunkDigest,
SentRecipientReplica (..),
)
where
@@ -34,7 +30,6 @@ import Control.Monad.Trans.Except
import Crypto.Random (ChaChaDRG)
import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.Bifunctor (first)
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy.Char8 as LB
import Data.Char (toLower)
@@ -45,7 +40,7 @@ import Data.List.NonEmpty (NonEmpty (..), nonEmpty)
import qualified Data.List.NonEmpty as L
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (fromMaybe, listToMaybe)
import Data.Maybe (fromMaybe)
import qualified Data.Text as T
import Data.Word (Word32)
import GHC.Records (HasField (getField))
@@ -80,20 +75,6 @@ import UnliftIO.Directory
xftpClientVersion :: String
xftpClientVersion = "1.0.1"
-- | Soft limit for XFTP clients. Should be checked and reported to user.
maxFileSize :: Int64
maxFileSize = gb 1
maxFileSizeStr :: String
maxFileSizeStr = B.unpack . strEncode $ FileSize maxFileSize
-- | Hard internal limit for XFTP agent after which it refuses to prepare chunks.
maxFileSizeHard :: Int64
maxFileSizeHard = gb 5
fileSizeLen :: Int64
fileSizeLen = 8
newtype CLIError = CLIError String
deriving (Eq, Show, Exception)
@@ -231,16 +212,6 @@ data SentFileChunkReplica = SentFileChunkReplica
}
deriving (Show)
data SentRecipientReplica = SentRecipientReplica
{ chunkNo :: Int,
server :: XFTPServer,
rcvNo :: Int,
replicaId :: ChunkReplicaId,
replicaKey :: C.APrivateAuthKey,
digest :: FileDigest,
chunkSize :: FileSize Word32
}
logCfg :: LogConfig
logCfg = LogConfig {lc_file = Nothing, lc_stderr = True}
@@ -414,13 +385,6 @@ cliSendFileOpts SendOptions {filePath, outputDir, numRecipients, xftpServers, re
B.writeFile fdSndPath $ strEncode fdSnd
pure (fdRcvPaths, fdSndPath)
getChunkDigest :: XFTPChunkSpec -> IO ByteString
getChunkDigest XFTPChunkSpec {filePath = chunkPath, chunkOffset, chunkSize} =
withFile chunkPath ReadMode $ \h -> do
hSeek h AbsoluteSeek $ fromIntegral chunkOffset
chunk <- LB.hGet h (fromIntegral chunkSize)
pure $! LC.sha256Hash chunk
cliReceiveFile :: ReceiveOptions -> ExceptT CLIError IO ()
cliReceiveFile ReceiveOptions {fileDescription, filePath, retryCount, tempPath, verbose, yes} =
getFileDescription' fileDescription >>= receive
@@ -536,37 +500,6 @@ getFileDescription' path =
getFileDescription path >>= \case
AVFD fd -> either (throwE . CLIError) pure $ checkParty fd
singleChunkSize :: Int64 -> Maybe Word32
singleChunkSize size' =
listToMaybe $ dropWhile (< chunkSize) serverChunkSizes
where
chunkSize = fromIntegral size'
prepareChunkSizes :: Int64 -> [Word32]
prepareChunkSizes size' = prepareSizes size'
where
(smallSize, bigSize)
| size' > size34 chunkSize3 = (chunkSize2, chunkSize3)
| size' > size34 chunkSize2 = (chunkSize1, chunkSize2)
| otherwise = (chunkSize0, chunkSize1)
size34 sz = (fromIntegral sz * 3) `div` 4
prepareSizes 0 = []
prepareSizes size
| size >= fromIntegral bigSize = replicate (fromIntegral n1) bigSize <> prepareSizes remSz
| size > size34 bigSize = [bigSize]
| otherwise = replicate (fromIntegral n2') smallSize
where
(n1, remSz) = size `divMod` fromIntegral bigSize
n2' = let (n2, remSz2) = (size `divMod` fromIntegral smallSize) in if remSz2 == 0 then n2 else n2 + 1
prepareChunkSpecs :: FilePath -> [Word32] -> [XFTPChunkSpec]
prepareChunkSpecs filePath chunkSizes = reverse . snd $ foldl' addSpec (0, []) chunkSizes
where
addSpec :: (Int64, [XFTPChunkSpec]) -> Word32 -> (Int64, [XFTPChunkSpec])
addSpec (chunkOffset, specs) sz =
let spec = XFTPChunkSpec {filePath, chunkOffset, chunkSize = sz}
in (chunkOffset + fromIntegral sz, spec : specs)
getEncPath :: MonadIO m => Maybe FilePath -> String -> m FilePath
getEncPath path name = (`uniqueCombine` (name <> ".encrypted")) =<< maybe (liftIO getCanonicalTemporaryDirectory) pure path
+19
View File
@@ -39,6 +39,10 @@ module Simplex.FileTransfer.Description
FileClientData,
fileDescriptionURI,
qrSizeLimit,
maxFileSize,
maxFileSizeStr,
maxFileSizeHard,
fileSizeLen,
)
where
@@ -273,6 +277,21 @@ instance StrEncoding FileDescriptionURI where
qrSizeLimit :: Int
qrSizeLimit = 1002 -- ~2 chunks in URLencoded YAML with some spare size for server hosts
-- | Soft limit for XFTP clients. Should be checked and reported to user.
maxFileSize :: Int64
maxFileSize = gb 1
maxFileSizeStr :: String
maxFileSizeStr = B.unpack . strEncode $ FileSize maxFileSize
-- | Hard internal limit for XFTP agent after which it refuses to prepare chunks.
maxFileSizeHard :: Int64
maxFileSizeHard = gb 5
fileSizeLen :: Int64
fileSizeLen = 8
instance (Integral a, Show a) => StrEncoding (FileSize a) where
strEncode (FileSize b)
| b' /= 0 = bshow b
+10
View File
@@ -252,6 +252,16 @@ data DeletedSndChunkReplica = DeletedSndChunkReplica
}
deriving (Show)
data SentRecipientReplica = SentRecipientReplica
{ chunkNo :: Int,
server :: XFTPServer,
rcvNo :: Int,
replicaId :: ChunkReplicaId,
replicaKey :: C.APrivateAuthKey,
digest :: FileDigest,
chunkSize :: FileSize Word32
}
data FileErrorType
= -- | cannot proceed with download from not approved relays without proxy
NOT_APPROVED
-2
View File
@@ -145,7 +145,6 @@ import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport
import Simplex.Messaging.Transport.Client (SocksAuth (..), SocksProxyWithAuth (..), TransportClientConfig (..), TransportHost (..), defaultSMPPort, defaultTcpConnectTimeout, runTransportClient)
import Simplex.Messaging.Transport.KeepAlive
import Simplex.Messaging.Transport.WebSockets (WS)
import Simplex.Messaging.Util (bshow, diffToMicroseconds, ifM, liftEitherWith, raceAny_, threadDelay', tshow, whenM)
import Simplex.Messaging.Version
import System.Mem.Weak (Weak, deRefWeak)
@@ -544,7 +543,6 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize
"" -> case protocolTypeI @(ProtoType msg) of
SPSMP | smpWebPort -> ("443", transport @TLS)
_ -> defaultTransport cfg
"80" -> ("80", transport @WS)
p -> (p, transport @TLS)
client :: forall c. Transport c => TProxy c -> PClient v err msg -> TMVar (Either (ProtocolClientError err) (ProtocolClient v err msg)) -> c -> IO ()
+57 -7
View File
@@ -71,6 +71,7 @@ import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing)
import Data.Semigroup (Sum (..))
import qualified Data.Text as T
import Data.Text.Encoding (decodeLatin1)
import qualified Data.Text.IO as T
import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime)
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
import Data.Time.Format.ISO8601 (iso8601Show)
@@ -98,6 +99,7 @@ import Simplex.Messaging.Server.MsgStore.Journal (JournalQueue, closeMsgQueue)
import Simplex.Messaging.Server.MsgStore.STM
import Simplex.Messaging.Server.MsgStore.Types
import Simplex.Messaging.Server.NtfStore
import Simplex.Messaging.Server.Prometheus
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.QueueInfo
import Simplex.Messaging.Server.QueueStore.STM
@@ -176,7 +178,11 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
: receiveFromProxyAgent pa
: expireNtfsThread cfg
: sigIntHandlerThread
: map runServer transports <> expireMessagesThread_ cfg <> serverStatsThread_ cfg <> controlPortThread_ cfg
: map runServer transports
<> expireMessagesThread_ cfg
<> serverStatsThread_ cfg
<> prometheusMetricsThread_ cfg
<> controlPortThread_ cfg
)
`finally` stopServer s
where
@@ -555,6 +561,50 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
showProxyStats ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther} =
[show _pRequests, show _pSuccesses, show _pErrorsConnect, show _pErrorsCompat, show _pErrorsOther]
prometheusMetricsThread_ :: ServerConfig -> [M ()]
prometheusMetricsThread_ ServerConfig {prometheusInterval = Just interval, prometheusMetricsFile} =
[savePrometheusMetrics interval prometheusMetricsFile]
prometheusMetricsThread_ _ = []
savePrometheusMetrics :: Int -> FilePath -> M ()
savePrometheusMetrics saveInterval metricsFile = do
labelMyThread "savePrometheusMetrics"
liftIO $ putStrLn $ "Prometheus metrics saved every " <> show saveInterval <> " seconds to " <> metricsFile
AMS _ st <- asks msgStore
ss <- asks serverStats
env <- ask
let interval = 1000000 * saveInterval
liftIO $ forever $ do
threadDelay interval
ts <- getCurrentTime
sm <- getServerMetrics st ss
rtm <- getRealTimeMetrics env
T.writeFile metricsFile $ prometheusMetrics sm rtm ts
getServerMetrics :: STMQueueStore s => s -> ServerStats -> IO ServerMetrics
getServerMetrics st ss = do
d <- getServerStatsData ss
let ps = periodStatDataCounts $ _activeQueues d
psNtf = periodStatDataCounts $ _activeQueuesNtf d
queueCount <- M.size <$> readTVarIO (activeMsgQueues st)
notifierCount <- M.size <$> readTVarIO (notifiers' st)
pure ServerMetrics {statsData = d, activeQueueCounts = ps, activeNtfCounts = psNtf, queueCount, notifierCount}
getRealTimeMetrics :: Env -> IO RealTimeMetrics
getRealTimeMetrics Env {clients, sockets, server = Server {subscribers, notifiers, subClients, ntfSubClients}} = do
socketStats <- mapM (traverse getSocketStats) =<< readTVarIO sockets
#if MIN_VERSION_base(4,18,0)
threadsCount <- length <$> listThreads
#else
let threadsCount = 0
#endif
clientsCount <- IM.size <$> readTVarIO clients
smpSubsCount <- M.size <$> readTVarIO subscribers
smpSubClientsCount <- IM.size <$> readTVarIO subClients
ntfSubsCount <- M.size <$> readTVarIO notifiers
ntfSubClientsCount <- IM.size <$> readTVarIO ntfSubClients
pure RealTimeMetrics {socketStats, threadsCount, clientsCount, smpSubsCount, smpSubClientsCount, ntfSubsCount, ntfSubClientsCount}
runClient :: Transport c => C.APrivateSignKey -> TProxy c -> c -> M ()
runClient signKey tp h = do
kh <- asks serverIdentity
@@ -695,13 +745,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} attachHT
#endif
CPSockets -> withUserRole $ unliftIO u (asks sockets) >>= readTVarIO >>= mapM_ putSockets
where
putSockets (tcpPort, (accepted', closed', active')) = do
(accepted, closed, active) <- (,,) <$> readTVarIO accepted' <*> readTVarIO closed' <*> readTVarIO active'
putSockets (tcpPort, socketsState) = do
ss <- getSocketStats socketsState
hPutStrLn h $ "Sockets for port " <> tcpPort <> ":"
hPutStrLn h $ "accepted: " <> show accepted
hPutStrLn h $ "closed: " <> show closed
hPutStrLn h $ "active: " <> show (IM.size active)
hPutStrLn h $ "leaked: " <> show (accepted - closed - IM.size active)
hPutStrLn h $ "accepted: " <> show (socketsAccepted ss)
hPutStrLn h $ "closed: " <> show (socketsClosed ss)
hPutStrLn h $ "active: " <> show (socketsActive ss)
hPutStrLn h $ "leaked: " <> show (socketsLeaked ss)
CPSocketThreads -> withAdminRole $ do
#if MIN_VERSION_base(4,18,0)
unliftIO u (asks sockets) >>= readTVarIO >>= mapM_ putSocketThreads
+3
View File
@@ -96,6 +96,9 @@ data ServerConfig = ServerConfig
serverStatsLogFile :: FilePath,
-- | file to save and restore stats
serverStatsBackupFile :: Maybe FilePath,
-- | interval and file to save prometheus metrics
prometheusInterval :: Maybe Int,
prometheusMetricsFile :: FilePath,
-- | notification delivery interval
ntfDeliveryInterval :: Int,
-- | interval between sending pending END events to unsubscribed clients, seconds
+5 -1
View File
@@ -253,7 +253,9 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
<> ("expire_ntfs_hours: " <> tshow defNtfExpirationHours <> "\n\n")
<> "# Log daily server statistics to CSV file\n"
<> ("log_stats: " <> onOff logStats <> "\n\n")
<> "[AUTH]\n\
<> "# Log interval for real-time Prometheus metrics\n\
\# prometheus_interval: 300\n\n\
\[AUTH]\n\
\# Set new_queues option to off to completely prohibit creating new messaging queues.\n\
\# This can be useful when you want to decommission the server, but not all connections are switched yet.\n\
\new_queues: on\n\n\
@@ -431,6 +433,8 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
logStatsStartTime = 0, -- seconds from 00:00 UTC
serverStatsLogFile = combine logPath "smp-server-stats.daily.log",
serverStatsBackupFile = logStats $> combine logPath "smp-server-stats.log",
prometheusInterval = eitherToMaybe $ read . T.unpack <$> lookupValue "STORE_LOG" "prometheus_interval" ini,
prometheusMetricsFile = combine logPath "smp-server-metrics.txt",
pendingENDInterval = 15000000, -- 15 seconds
ntfDeliveryInterval = 3000000, -- 3 seconds
smpServerVRange = supportedServerSMPRelayVRange,
+385
View File
@@ -0,0 +1,385 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -fno-warn-unrecognised-pragmas #-}
module Simplex.Messaging.Server.Prometheus where
import Data.Int (Int64)
import Data.Text (Text)
import qualified Data.Text as T
import Data.Time.Clock (UTCTime (..), diffUTCTime)
import Data.Time.Clock.System (systemEpochDay)
import Data.Time.Format.ISO8601 (iso8601Show)
import Network.Socket (ServiceName)
import Simplex.Messaging.Server.Stats
data ServerMetrics = ServerMetrics
{ statsData :: ServerStatsData,
activeQueueCounts :: PeriodStatCounts,
activeNtfCounts :: PeriodStatCounts,
queueCount :: Int,
notifierCount :: Int
}
data RealTimeMetrics = RealTimeMetrics
{ socketStats :: [(ServiceName, SocketStats)],
threadsCount :: Int,
clientsCount :: Int,
smpSubsCount :: Int,
smpSubClientsCount :: Int,
ntfSubsCount :: Int,
ntfSubClientsCount :: Int
}
data SocketStats = SocketStats
{ socketsAccepted :: Int,
socketsClosed :: Int,
socketsActive :: Int,
socketsLeaked :: Int
}
{-# FOURMOLU_DISABLE\n#-}
prometheusMetrics :: ServerMetrics -> RealTimeMetrics -> UTCTime -> Text
prometheusMetrics sm rtm ts =
time <> queues <> subscriptions <> messages <> ntfMessages <> ntfs <> relays <> info
where
ServerMetrics {statsData, activeQueueCounts = ps, activeNtfCounts = psNtf, queueCount, notifierCount} = sm
RealTimeMetrics
{ socketStats,
threadsCount,
clientsCount,
smpSubsCount,
smpSubClientsCount,
ntfSubsCount,
ntfSubClientsCount
} = rtm
ServerStatsData
{ _fromTime,
_qCreated,
_qSecured,
_qDeletedAll,
_qDeletedAllB,
_qDeletedNew,
_qDeletedSecured,
_qSub,
_qSubAllB,
_qSubAuth,
_qSubDuplicate,
_qSubProhibited,
_qSubEnd,
_qSubEndB,
_ntfCreated,
_ntfDeleted,
_ntfDeletedB,
_ntfSub,
_ntfSubB,
_ntfSubAuth,
_ntfSubDuplicate,
_msgSent,
_msgSentAuth,
_msgSentQuota,
_msgSentLarge,
_msgRecv,
_msgRecvGet,
_msgGet,
_msgGetNoMsg,
_msgGetAuth,
_msgGetDuplicate,
_msgGetProhibited,
_msgExpired,
_activeQueues,
_msgSentNtf,
_msgRecvNtf,
_activeQueuesNtf,
_msgNtfs,
_msgNtfsB,
_msgNtfNoSub,
_msgNtfLost,
_msgNtfExpired,
_pRelays,
_pRelaysOwn,
_pMsgFwds,
_pMsgFwdsOwn,
_pMsgFwdsRecv,
_qCount,
_msgCount,
_ntfCount
} = statsData
time =
"# Recorded at: " <> T.pack (iso8601Show ts) <> "\n\
\# Stats from: " <> T.pack (iso8601Show _fromTime) <> "\n\
\\n"
queues =
"# Queues\n\
\# ------\n\
\\n\
\# HELP simplex_smp_queues_created Created queues\n\
\# TYPE simplex_smp_queues_created counter\n\
\simplex_smp_queues_created " <> mshow _qCreated <> "\n# qCreated\n\
\\n\
\# HELP simplex_smp_queues_secured Secured queues\n\
\# TYPE simplex_smp_queues_secured counter\n\
\simplex_smp_queues_secured " <> mshow _qSecured <> "\n# qSecured\n\
\\n\
\# HELP simplex_smp_queues_deleted Deleted queues\n\
\# TYPE simplex_smp_queues_deleted counter\n\
\simplex_smp_queues_deleted{type=\"all\"} " <> mshow _qDeletedAll <> "\n# qDeleted\n\
\simplex_smp_queues_deleted{type=\"new\"} " <> mshow _qDeletedNew <> "\n# qDeletedNew\n\
\simplex_smp_queues_deleted{type=\"secured\"} " <> mshow _qDeletedSecured <> "\n# qDeletedSecured\n\
\\n\
\# HELP simplex_smp_queues_deleted_batch Batched requests to delete queues\n\
\# TYPE simplex_smp_queues_deleted_batch counter\n\
\simplex_smp_queues_deleted_batch " <> mshow _qDeletedAllB <> "\n# qDeletedAllB\n\
\\n\
\# HELP simplex_smp_queues_total1 Total number of stored queues (first type of count).\n\
\# TYPE simplex_smp_queues_total1 gauge\n\
\simplex_smp_queues_total1 " <> mshow _qCount <> "\n# qCount\n\
\\n\
\# HELP simplex_smp_queues_total2 Total number of stored queues (second type of count).\n\
\# TYPE simplex_smp_queues_total2 gauge\n\
\simplex_smp_queues_total2 " <> mshow queueCount <> "\n# qCount2\n\
\\n\
\# HELP simplex_smp_queues_daily Daily active queues.\n\
\# TYPE simplex_smp_queues_daily gauge\n\
\simplex_smp_queues_daily " <> mstr (dayCount ps) <> "\n# dayMsgQueues\n\
\\n\
\# HELP simplex_smp_queues_weekly Weekly active queues.\n\
\# TYPE simplex_smp_queues_weekly gauge\n\
\simplex_smp_queues_weekly " <> mstr (weekCount ps) <> "\n# weekMsgQueues\n\
\\n\
\# HELP simplex_smp_queues_monthly Monthly active queues.\n\
\# TYPE simplex_smp_queues_monthly gauge\n\
\simplex_smp_queues_monthly " <> mstr (monthCount ps) <> "\n# monthMsgQueues\n\
\\n\
\# HELP simplex_smp_queues_notify_daily Daily active queues with notifications.\n\
\# TYPE simplex_smp_queues_notify_daily gauge\n\
\simplex_smp_queues_notify_daily " <> mstr (dayCount psNtf) <> "\n# dayCountNtf\n\
\\n\
\# HELP simplex_smp_queues_notify_weekly Weekly active queues with notifications.\n\
\# TYPE simplex_smp_queues_notify_weekly gauge\n\
\simplex_smp_queues_notify_weekly " <> mstr (weekCount psNtf) <> "\n# weekCountNtf\n\
\\n\
\# HELP simplex_smp_queues_notify_monthly Monthly active queues with notifications.\n\
\# TYPE simplex_smp_queues_notify_monthly gauge\n\
\simplex_smp_queues_notify_monthly " <> mstr (monthCount psNtf) <> "\n# monthCountNtf\n\
\\n"
subscriptions =
"# Subscriptions\n\
\# -------------\n\
\\n\
\# HELP simplex_smp_subscribtion_successes Successful subscriptions.\n\
\# TYPE simplex_smp_subscribtion_successes counter\n\
\simplex_smp_subscribtion_successes " <> mshow _qSub <> "\n# qSub\n\
\\n\
\# HELP simplex_smp_subscribtion_successes_batch Batched successful subscriptions.\n\
\# TYPE simplex_smp_subscribtion_successes_batch counter\n\
\simplex_smp_subscribtion_successes_batch " <> mshow _qSubAllB <> "\n# qSubAllB\n\
\\n\
\# HELP simplex_smp_subscribtion_end Ended subscriptions.\n\
\# TYPE simplex_smp_subscribtion_end counter\n\
\simplex_smp_subscribtion_end " <> mshow _qSubEnd <> "\n# qSubEnd\n\
\\n\
\# HELP simplex_smp_subscribtion_end_batch Batched ended subscriptions.\n\
\# TYPE simplex_smp_subscribtion_end_batch counter\n\
\simplex_smp_subscribtion_end_batch " <> mshow _qSubEndB <> "\n# qSubEndB\n\
\\n\
\# HELP simplex_smp_subscribtion_errors Subscription errors.\n\
\# TYPE simplex_smp_subscribtion_errors counter\n\
\simplex_smp_subscribtion_errors{type=\"auth\"} " <> mshow _qSubAuth <> "\n# qSubAuth\n\
\simplex_smp_subscribtion_errors{type=\"duplicate\"} " <> mshow _qSubDuplicate <> "\n# qSubDuplicate\n\
\simplex_smp_subscribtion_errors{type=\"prohibited\"} " <> mshow _qSubProhibited <> "\n# qSubProhibited\n\
\\n"
messages =
"# Messages\n\
\# --------\n\
\\n\
\# HELP simplex_smp_messages_sent Sent messages.\n\
\# TYPE simplex_smp_messages_sent counter\n\
\simplex_smp_messages_sent " <> mshow _msgSent <> "\n# msgSent\n\
\\n\
\# HELP simplex_smp_messages_sent_errors Total number of messages errors by type.\n\
\# TYPE simplex_smp_messages_sent_errors counter\n\
\simplex_smp_messages_sent_errors{type=\"auth\"} " <> mshow _msgSentAuth <> "\n# msgSentAuth\n\
\simplex_smp_messages_sent_errors{type=\"quota\"} " <> mshow _msgSentQuota <> "\n# msgSentQuota\n\
\simplex_smp_messages_sent_errors{type=\"large\"} " <> mshow _msgSentLarge <> "\n# msgSentLarge\n\
\\n\
\# HELP simplex_smp_messages_received Received messages.\n\
\# TYPE simplex_smp_messages_received counter\n\
\simplex_smp_messages_received " <> mshow _msgRecv <> "\n# msgRecv\n\
\\n\
\# HELP simplex_smp_messages_expired Expired messages.\n\
\# TYPE simplex_smp_messages_expired counter\n\
\simplex_smp_messages_expired " <> mshow _msgExpired <> "\n# msgExpired\n\
\\n\
\# HELP simplex_smp_messages_total Total number of messages stored.\n\
\# TYPE simplex_smp_messages_total gauge\n\
\simplex_smp_messages_total " <> mshow _msgCount <> "\n# msgCount\n\
\\n"
ntfMessages =
"# Notification messages (client)\n\
\# ------------------------------\n\
\\n\
\# HELP simplex_smp_messages_notify_sent Sent messages with notification flag (cleint).\n\
\# TYPE simplex_smp_messages_notify_sent counter\n\
\simplex_smp_messages_notify_sent " <> mshow _msgSentNtf <> "\n# msgSentNtf\n\
\\n\
\# HELP simplex_smp_messages_notify_received Received messages with notification flag (client).\n\
\# TYPE simplex_smp_messages_notify_received counter\n\
\simplex_smp_messages_notify_received " <> mshow _msgRecvNtf <> "\n# msgRecvNtf\n\
\\n\
\# HELP simplex_smp_messages_notify_get_sent Requests to get messages with notification flag (client).\n\
\# TYPE simplex_smp_messages_notify_get_sent counter\n\
\simplex_smp_messages_notify_get_sent " <> mshow _msgGet <> "\n# msgGet\n\
\\n\
\# HELP simplex_smp_messages_notify_get_received Succesfully received get requests messages with notification flag (client).\n\
\# TYPE simplex_smp_messages_notify_get_received counter\n\
\simplex_smp_messages_notify_get_received " <> mshow _msgRecvGet <> "\n# msgRecvGet\n\
\\n\
\# HELP simplex_smp_messages_notify_get_errors Error events with messages with notification flag (client). \n\
\# TYPE simplex_smp_messages_notify_get_errors counter\n\
\simplex_smp_messages_notify_get_errors{type=\"nomsg\"} " <> mshow _msgGetNoMsg <> "\n# msgGetNoMsg\n\
\simplex_smp_messages_notify_get_errors{type=\"auth\"} " <> mshow _msgGetAuth <> "\n# msgGetAuth\n\
\simplex_smp_messages_notify_get_errors{type=\"duplicate\"} " <> mshow _msgGetDuplicate <> "\n# msgGetDuplicate\n\
\simplex_smp_messages_notify_get_errors{type=\"prohibited\"} " <> mshow _msgGetProhibited <> "\n# msgGetProhibited\n\
\\n\
\# HELP simplex_smp_queues_notify_created Created queues with notification flag (client).\n\
\# TYPE simplex_smp_queues_notify_created counter\n\
\simplex_smp_queues_notify_created " <> mshow _ntfCreated <> "\n# ntfCreated\n\
\\n\
\# HELP simplex_smp_queues_notify_deleted Deleted queues with notification flag (client).\n\
\# TYPE simplex_smp_queues_notify_deleted counter\n\
\simplex_smp_queues_notify_deleted " <> mshow _ntfDeleted <> "\n# ntfDeleted\n\
\\n\
\# HELP simplex_smp_queues_notify_deleted_batch Deleted batched queues with notification flag (client).\n\
\# TYPE simplex_smp_queues_notify_deleted_batch counter\n\
\simplex_smp_queues_notify_deleted_batch " <> mshow _ntfDeletedB <> "\n# ntfDeletedB\n\
\\n\
\# HELP simplex_smp_queues_notify_total1 Total number of stored queues with notification flag (first type of count).\n\
\# TYPE simplex_smp_queues_notify_total1 gauge\n\
\simplex_smp_queues_notify_total1 " <> mshow _ntfCount <> "\n# ntfCount1\n\
\\n\
\# HELP simplex_smp_queues_notify_total2 Total number of stored queues with notification flag (second type of count).\n\
\# TYPE simplex_smp_queues_notify_total2 gauge\n\
\simplex_smp_queues_notify_total2 " <> mshow notifierCount <> "\n# ntfCount2\n\
\\n"
ntfs =
"# Notifications (server)\n\
\# ----------------------\n\
\\n\
\# HELP simplex_smp_messages_ntf_successes Successful events with notification messages (to ntf server). \n\
\# TYPE simplex_smp_messages_ntf_successes counter\n\
\simplex_smp_messages_ntf_successes " <> mshow _msgNtfs <> "\n# msgNtfs\n\
\\n\
\# HELP simplex_smp_messages_ntf_successes_batch Successful batched events with notification messages (to ntf server). \n\
\# TYPE simplex_smp_messages_ntf_successes_batch counter\n\
\simplex_smp_messages_ntf_successes_batch " <> mshow _msgNtfsB <> "\n# msgNtfsB\n\
\\n\
\# HELP simplex_smp_messages_ntf_errors Error events with notification messages (to ntf server). \n\
\# TYPE simplex_smp_messages_ntf_errors counter\n\
\simplex_smp_messages_ntf_errors{type=\"nosub\"} " <> mshow _msgNtfNoSub <> "\n# msgNtfNoSub\n\
\simplex_smp_messages_ntf_errors{type=\"lost\"} " <> mshow _msgNtfLost <> "\n# msgNtfLost\n\
\simplex_smp_messages_ntf_errors{type=\"expired\"} " <> mshow _msgNtfExpired <> "\n# msgNtfExpired\n\
\\n\
\# HELP simplex_smp_subscription_ntf_requests Subscription requests with notification flag (from ntf server). \n\
\# TYPE simplex_smp_subscription_ntf_requests counter\n\
\simplex_smp_subscription_ntf_requests " <> mshow _ntfSub <> "\n# ntfSub\n\
\\n\
\# HELP simplex_smp_subscription_ntf_requests_batch Batched subscription requests with notification flag (from ntf server). \n\
\# TYPE simplex_smp_subscription_ntf_requests_batch counter\n\
\simplex_smp_subscription_ntf_requests_batch " <> mshow _ntfSubB <> "\n# ntfSubB\n\
\\n\
\# HELP simplex_smp_subscribtion_ntf_errors Subscription errors with notification flag (from ntf server). \n\
\# TYPE simplex_smp_subscribtion_ntf_errors counter\n\
\simplex_smp_subscribtion_ntf_errors{type=\"auth\"} " <> mshow _ntfSubAuth <> "\n# ntfSubAuth\n\
\simplex_smp_subscribtion_ntf_errors{type=\"duplicate\"} " <> mshow _ntfSubDuplicate <> "\n# ntfSubDuplicate\n\
\\n"
relays =
"# Relays\n\
\# ------\n\
\\n\
\# HELP simplex_smp_relay_sessions_requests Session requests through relay.\n\
\# TYPE simplex_smp_relay_sessions_requests counter\n\
\simplex_smp_relay_sessions_requests{source=\"all\"} " <> mshow (_pRequests _pRelays) <> "\n# pRelays_pRequests\n\
\simplex_smp_relay_sessions_requests{source=\"own\"} " <> mshow (_pRequests _pRelaysOwn) <> "\n# pRelaysOwn_pRequests\n\
\\n\
\# HELP simplex_smp_relay_sessions_successes Successful session events through relay.\n\
\# TYPE simplex_smp_relay_sessions_successes counter\n\
\simplex_smp_relay_sessions_successes{source=\"all\"} " <> mshow (_pSuccesses _pRelays) <> "\n# pRelays_pSuccesses\n\
\simplex_smp_relay_sessions_successes{source=\"own\"} " <> mshow (_pSuccesses _pRelaysOwn) <> "\n# pRelaysOwn_pSuccesses\n\
\\n\
\# HELP simplex_smp_relay_sessions_errors Error session events through relay.\n\
\# TYPE simplex_smp_relay_sessions_errors counter\n\
\simplex_smp_relay_sessions_errors{source=\"all\",type=\"connect\"} " <> mshow (_pErrorsConnect _pRelays) <> "\n# pRelays_pErrorsConnect\n\
\simplex_smp_relay_sessions_errors{source=\"all\",type=\"compat\"} " <> mshow (_pErrorsCompat _pRelays) <> "\n# pRelays_pErrorsCompat\n\
\simplex_smp_relay_sessions_errors{source=\"all\",type=\"other\"} " <> mshow (_pErrorsOther _pRelays) <> "\n# pRelays_pErrorsOther\n\
\simplex_smp_relay_sessions_errors{source=\"own\",type=\"connect\"} " <> mshow (_pErrorsConnect _pRelaysOwn) <> "\n# pRelaysOwn_pErrorsConnect\n\
\simplex_smp_relay_sessions_errors{source=\"own\",type=\"compat\"} " <> mshow (_pErrorsCompat _pRelaysOwn) <> "\n# pRelaysOwn_pErrorsCompat\n\
\simplex_smp_relay_sessions_errors{source=\"own\",type=\"other\"} " <> mshow (_pErrorsOther _pRelaysOwn) <> "\n# pRelaysOwn_pErrorsOther\n\
\\n\
\# HELP simplex_smp_relay_messages_requests Message requests sent through relay.\n\
\# TYPE simplex_smp_relay_messages_requests counter\n\
\simplex_smp_relay_messages_requests{source=\"all\"} " <> mshow (_pRequests _pMsgFwds) <> "\n# pMsgFwds_pRequests\n\
\simplex_smp_relay_messages_requests{source=\"own\"} " <> mshow (_pRequests _pMsgFwdsOwn) <> "\n# pMsgFwdsOwn_pRequests\n\
\\n\
\# HELP simplex_smp_relay_messages_successes Successful messages sent through relay.\n\
\# TYPE simplex_smp_relay_messages_successes counter\n\
\simplex_smp_relay_messages_successes{source=\"all\"} " <> mshow (_pSuccesses _pMsgFwds) <> "\n# pMsgFwds_pSuccesses\n\
\simplex_smp_relay_messages_successes{source=\"own\"} " <> mshow (_pSuccesses _pMsgFwdsOwn) <> "\n# pMsgFwdsOwn_pSuccesses\n\
\\n\
\# HELP simplex_smp_relay_messages_errors Error events with messages sent through relay.\n\
\# TYPE simplex_smp_relay_messages_errors counter\n\
\simplex_smp_relay_messages_errors{source=\"all\",type=\"connect\"} " <> mshow (_pErrorsConnect _pMsgFwds) <> "\n# pMsgFwds_pErrorsConnect\n\
\simplex_smp_relay_messages_errors{source=\"all\",type=\"compat\"} " <> mshow (_pErrorsCompat _pMsgFwds) <> "\n# pMsgFwds_pErrorsCompat\n\
\simplex_smp_relay_messages_errors{source=\"all\",type=\"other\"} " <> mshow (_pErrorsOther _pMsgFwds) <> "\n# pMsgFwds_pErrorsOther\n\
\simplex_smp_relay_messages_errors{source=\"own\",type=\"connect\"} " <> mshow (_pErrorsConnect _pMsgFwdsOwn) <> "\n# pMsgFwdsOwn_pErrorsConnect\n\
\simplex_smp_relay_messages_errors{source=\"own\",type=\"compat\"} " <> mshow (_pErrorsCompat _pMsgFwdsOwn) <> "\n# pMsgFwdsOwn_pErrorsCompat\n\
\simplex_smp_relay_messages_errors{source=\"own\",type=\"other\"} " <> mshow (_pErrorsOther _pMsgFwdsOwn) <> "\n# pMsgFwdsOwn_pErrorsOther\n\
\\n\
\# HELP simplex_smp_relay_messages_received Relay messages statistics.\n\
\# TYPE simplex_smp_relay_messages_received counter\n\
\simplex_smp_relay_messages_received " <> mshow _pMsgFwdsRecv <> "\n# pMsgFwdsRecv\n\
\\n"
info =
"# Info\n\
\# ----\n\
\\n"
<> socketsMetric socketsAccepted "simplex_smp_sockets_accepted" "Accepted sockets"
<> socketsMetric socketsClosed "simplex_smp_sockets_closed" "Closed sockets"
<> socketsMetric socketsActive "simplex_smp_sockets_active" "Active sockets"
<> socketsMetric socketsLeaked "simplex_smp_sockets_leaked" "Leaked sockets"
<> "# HELP simplex_smp_threads_total Threads\n\
\# TYPE simplex_smp_threads_total gauge\n\
\simplex_smp_threads_total " <> mshow threadsCount <> "\n\
\\n\
\# HELP simplex_smp_clients_total Clients\n\
\# TYPE simplex_smp_clients_total gauge\n\
\simplex_smp_clients_total " <> mshow clientsCount <> "\n\
\\n\
\# HELP simplex_smp_subscribtion_total Total subscriptions\n\
\# TYPE simplex_smp_subscribtion_total gauge\n\
\simplex_smp_subscribtion_total " <> mshow smpSubsCount <> "\n# smpSubs\n\
\\n\
\# HELP simplex_smp_subscribtion_clients_total Subscribed clients, first counting method\n\
\# TYPE simplex_smp_subscribtion_clients_total gauge\n\
\simplex_smp_subscribtion_clients_total " <> mshow smpSubClientsCount <> "\n# smpSubClients\n\
\\n\
\# HELP simplex_smp_subscription_ntf_total Total notification subscripbtions (from ntf server)\n\
\# TYPE simplex_smp_subscription_ntf_total gauge\n\
\simplex_smp_subscription_ntf_total " <> mshow ntfSubsCount <> "\n# ntfSubs\n\
\\n\
\# HELP simplex_smp_subscription_ntf_clients_total Total subscribed NTF servers, first counting method\n\
\# TYPE simplex_smp_subscription_ntf_clients_total gauge\n\
\simplex_smp_subscription_ntf_clients_total " <> mshow ntfSubClientsCount <> "\n# ntfSubClients\n"
socketsMetric :: (SocketStats -> Int) -> Text -> Text -> Text
socketsMetric sel metric descr =
"# HELP " <> metric <> " " <> descr <> "\n"
<> "# TYPE " <> metric <> " gauge\n"
<> T.concat (map (\(port, ss) -> metric <> "{port=\"" <> T.pack port <> "\"} " <> mshow (sel ss) <> "\n") socketStats)
<> "\n"
mstr a = T.pack a <> " " <> tsEpoch
mshow :: Show a => a -> Text
mshow = mstr . show
tsEpoch = T.pack $ show @Int64 $ floor @Double $ realToFrac (ts `diffUTCTime` epoch) * 1000
epoch = UTCTime systemEpochDay 0
{-# FOURMOLU_ENABLE\n#-}
+8
View File
@@ -638,6 +638,14 @@ data PeriodStatCounts = PeriodStatCounts
monthCount :: String
}
periodStatDataCounts :: PeriodStatsData -> PeriodStatCounts
periodStatDataCounts PeriodStatsData {_day, _week, _month} =
PeriodStatCounts
{ dayCount = show $ IS.size _day,
weekCount = show $ IS.size _week,
monthCount = show $ IS.size _month
}
periodStatCounts :: PeriodStats -> UTCTime -> IO PeriodStatCounts
periodStatCounts ps ts = do
let d = utctDay ts
+10
View File
@@ -13,6 +13,7 @@ module Simplex.Messaging.Transport.Server
runTransportServerState_,
SocketState,
newSocketState,
getSocketStats,
runTransportServer,
runTransportServerSocket,
runLocalTCPServer,
@@ -43,6 +44,7 @@ import Foreign.C.Error
import GHC.IO.Exception (ioe_errno)
import Network.Socket
import qualified Network.TLS as T
import Simplex.Messaging.Server.Prometheus
import Simplex.Messaging.Transport
import Simplex.Messaging.Util (catchAll_, labelMyThread, tshow)
import System.Exit (exitFailure)
@@ -166,6 +168,14 @@ type SocketState = (TVar Int, TVar Int, TVar (IntMap (Weak ThreadId)))
newSocketState :: IO SocketState
newSocketState = (,,) <$> newTVarIO 0 <*> newTVarIO 0 <*> newTVarIO mempty
getSocketStats :: SocketState -> IO SocketStats
getSocketStats (accepted, closed, active) = do
socketsAccepted <- readTVarIO accepted
socketsClosed <- readTVarIO closed
socketsActive <- IM.size <$> readTVarIO active
let socketsLeaked = socketsAccepted - socketsClosed - socketsActive
pure SocketStats {socketsAccepted, socketsClosed, socketsActive, socketsLeaked}
closeServer :: TMVar Bool -> TVar (IntMap (Weak ThreadId)) -> Socket -> IO ()
closeServer started clients sock = do
close sock
+6 -1
View File
@@ -79,6 +79,9 @@ testStoreNtfsFile = "tests/tmp/smp-server-ntfs.log"
testStoreNtfsFile2 :: FilePath
testStoreNtfsFile2 = "tests/tmp/smp-server-ntfs.log.2"
testPrometheusMetricsFile :: FilePath
testPrometheusMetricsFile = "tests/tmp/smp-server-metrics.txt"
testServerStatsBackupFile :: FilePath
testServerStatsBackupFile = "tests/tmp/smp-server-stats.log"
@@ -141,8 +144,10 @@ cfgMS msType =
inactiveClientExpiration = Just defaultInactiveClientExpiration,
logStatsInterval = Nothing,
logStatsStartTime = 0,
serverStatsLogFile = "tests/smp-server-stats.daily.log",
serverStatsLogFile = "tests/tmp/smp-server-stats.daily.log",
serverStatsBackupFile = Nothing,
prometheusInterval = Nothing,
prometheusMetricsFile = testPrometheusMetricsFile,
pendingENDInterval = 500000,
ntfDeliveryInterval = 200000,
smpCredentials =
+9 -1
View File
@@ -45,7 +45,7 @@ import Simplex.Messaging.Server.StoreLog (closeStoreLog)
import Simplex.Messaging.Transport
import Simplex.Messaging.Util (whenM)
import Simplex.Messaging.Version (mkVersionRange)
import System.Directory (doesDirectoryExist, removeDirectoryRecursive, removeFile)
import System.Directory (doesDirectoryExist, doesFileExist, removeDirectoryRecursive, removeFile)
import System.TimeIt (timeItT)
import System.Timeout
import Test.HUnit
@@ -71,6 +71,7 @@ serverTests = do
describe "Store log" testWithStoreLog
describe "Restore messages" testRestoreMessages
describe "Restore messages (old / v2)" testRestoreExpireMessages
describe "Save prometheus metrics" testPrometheusMetrics
describe "Timing of AUTH error" testTiming
describe "Message notifications" testMessageNotifications
describe "Message expiration" $ do
@@ -825,6 +826,13 @@ testRestoreExpireMessages =
runClient :: Transport c => TProxy c -> (THandleSMP c 'TClient -> IO ()) -> Expectation
runClient _ test' = testSMPClient test' `shouldReturn` ()
testPrometheusMetrics :: SpecWith (ATransport, AMSType)
testPrometheusMetrics =
it "should save Prometheus metrics" $ \(at, msType) -> do
let cfg' = (cfgMS msType) {prometheusInterval = Just 1}
withSmpServerConfigOn at cfg' testPort $ \_ -> threadDelay 1000000
doesFileExist testPrometheusMetricsFile `shouldReturn` True
createAndSecureQueue :: Transport c => THandleSMP c 'TClient -> SndPublicAuthKey -> IO (SenderId, RecipientId, RcvPrivateAuthKey, RcvDhSecret)
createAndSecureQueue h sPub = do
g <- C.newRandom