mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 12:05:49 +00:00
server: configuration to expire inactive clients in ini file (#369)
* server: configuration to expire inactive clients in ini file * corrections Co-authored-by: JRoberts <8711996+jr-simplex@users.noreply.github.com> Co-authored-by: JRoberts <8711996+jr-simplex@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
964daf5442
commit
4e4eea34f9
@@ -42,7 +42,21 @@ ntfServerCLIConfig =
|
||||
defaultServerPort = "443",
|
||||
executableName = "ntf-server",
|
||||
serverVersion = "SMP notifications server v0.1.0",
|
||||
mkServerConfig = \_storeLogFile transports ->
|
||||
mkIniFile = \enableStoreLog defaultServerPort ->
|
||||
"[STORE_LOG]\n\
|
||||
\# The server uses STM memory for persistence,\n\
|
||||
\# that will be lost on restart (e.g., as with redis).\n\
|
||||
\# This option enables saving memory to append only log,\n\
|
||||
\# and restoring it when the server is started.\n\
|
||||
\# Log is compacted on start (deleted objects are removed).\n\
|
||||
\# The messages are not logged.\n"
|
||||
<> ("enable: " <> (if enableStoreLog then "on" else "off # on") <> "\n\n")
|
||||
<> "[TRANSPORT]\n\
|
||||
\port: "
|
||||
<> defaultServerPort
|
||||
<> "\n\
|
||||
\websockets: off\n",
|
||||
mkServerConfig = \_storeLogFile transports _ ->
|
||||
NtfServerConfig
|
||||
{ transports,
|
||||
subIdBytes = 24,
|
||||
|
||||
@@ -1,14 +1,16 @@
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE NumericUnderscores #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
|
||||
module Main where
|
||||
|
||||
import Control.Logger.Simple
|
||||
import Simplex.Messaging.Server (runSMPServer)
|
||||
import Simplex.Messaging.Server.CLI (ServerCLIConfig (..), protocolServerCLI)
|
||||
import Simplex.Messaging.Server.CLI (ServerCLIConfig (..), protocolServerCLI, readStrictIni, strictIni)
|
||||
import Simplex.Messaging.Server.Env.STM (ServerConfig (..), defaultInactiveClientExpiration, defaultMessageExpiration)
|
||||
import Simplex.Messaging.Server.Expiration
|
||||
import Simplex.Messaging.Transport (simplexMQVersion)
|
||||
import System.FilePath (combine)
|
||||
|
||||
@@ -24,7 +26,11 @@ logCfg = LogConfig {lc_file = Nothing, lc_stderr = True}
|
||||
main :: IO ()
|
||||
main = do
|
||||
setLogLevel LogInfo
|
||||
withGlobalLogging logCfg $ protocolServerCLI smpServerCLIConfig runSMPServer
|
||||
withGlobalLogging logCfg . protocolServerCLI smpServerCLIConfig $ \cfg@ServerConfig {inactiveClientExpiration} -> do
|
||||
putStrLn $ case inactiveClientExpiration of
|
||||
Just ExpirationConfig {ttl, checkInterval} -> "expiring clients inactive for " <> show ttl <> " seconds every " <> show checkInterval <> " seconds"
|
||||
_ -> "not expiring inactive clients"
|
||||
runSMPServer cfg
|
||||
|
||||
smpServerCLIConfig :: ServerCLIConfig ServerConfig
|
||||
smpServerCLIConfig =
|
||||
@@ -44,7 +50,24 @@ smpServerCLIConfig =
|
||||
defaultServerPort = "5223",
|
||||
executableName = "smp-server",
|
||||
serverVersion = "SMP server v" <> simplexMQVersion,
|
||||
mkServerConfig = \storeLogFile transports ->
|
||||
mkIniFile = \enableStoreLog defaultServerPort ->
|
||||
"[STORE_LOG]\n\
|
||||
\# The server uses STM memory for persistence,\n\
|
||||
\# that will be lost on restart (e.g., as with redis).\n\
|
||||
\# This option enables saving memory to append only log,\n\
|
||||
\# and restoring it when the server is started.\n\
|
||||
\# Log is compacted on start (deleted objects are removed).\n\
|
||||
\# The messages are not logged.\n"
|
||||
<> ("enable: " <> (if enableStoreLog then "on" else "off # on") <> "\n\n")
|
||||
<> "[TRANSPORT]\n"
|
||||
<> ("port: " <> defaultServerPort <> "\n")
|
||||
<> "websockets: off\n\n"
|
||||
<> "[INACTIVE_CLIENTS]\n\
|
||||
\# TTL and interval to check inactive clients\n\
|
||||
\disconnect: on\n"
|
||||
<> ("ttl: " <> show (ttl defaultInactiveClientExpiration) <> "\n")
|
||||
<> ("check_interval: " <> show (checkInterval defaultInactiveClientExpiration) <> "\n"),
|
||||
mkServerConfig = \storeLogFile transports ini ->
|
||||
ServerConfig
|
||||
{ transports,
|
||||
tbqSize = 16,
|
||||
@@ -58,7 +81,15 @@ smpServerCLIConfig =
|
||||
storeLogFile,
|
||||
allowNewQueues = True,
|
||||
messageExpiration = Just defaultMessageExpiration,
|
||||
inactiveClientExpiration = Just defaultInactiveClientExpiration,
|
||||
inactiveClientExpiration =
|
||||
if strictIni "INACTIVE_CLIENTS" "disconnect" ini == "on"
|
||||
then
|
||||
Just
|
||||
ExpirationConfig
|
||||
{ ttl = readStrictIni "INACTIVE_CLIENTS" "ttl" ini,
|
||||
checkInterval = readStrictIni "INACTIVE_CLIENTS" "check_interval" ini
|
||||
}
|
||||
else Nothing,
|
||||
logStatsInterval = Just 86400, -- seconds
|
||||
logStatsStartTime = 0 -- seconds from 00:00 UTC
|
||||
}
|
||||
|
||||
@@ -131,7 +131,8 @@ smpServer started = do
|
||||
atomically $ TM.lookupDelete qId (clientSubs c)
|
||||
|
||||
expireMessagesThread_ :: ServerConfig -> [m ()]
|
||||
expireMessagesThread_ = maybe [] ((: []) . expireMessages) . messageExpiration
|
||||
expireMessagesThread_ ServerConfig {messageExpiration = Just msgExp} = [expireMessages msgExp]
|
||||
expireMessagesThread_ _ = []
|
||||
|
||||
expireMessages :: ExpirationConfig -> m ()
|
||||
expireMessages expCfg = do
|
||||
@@ -147,8 +148,9 @@ smpServer started = do
|
||||
>>= atomically . (`deleteExpiredMsgs` old)
|
||||
|
||||
serverStatsThread_ :: ServerConfig -> [m ()]
|
||||
serverStatsThread_ ServerConfig {logStatsInterval, logStatsStartTime} =
|
||||
maybe [] ((: []) . logServerStats logStatsStartTime) logStatsInterval
|
||||
serverStatsThread_ ServerConfig {logStatsInterval = Just interval, logStatsStartTime} =
|
||||
[logServerStats logStatsStartTime interval]
|
||||
serverStatsThread_ _ = []
|
||||
|
||||
logServerStats :: Int -> Int -> m ()
|
||||
logServerStats startAt logInterval = do
|
||||
@@ -186,7 +188,8 @@ runClientTransport th@THandle {sessionId} = do
|
||||
raceAny_ ([send th c, client c s, receive th c] <> disconnectThread_ c expCfg)
|
||||
`finally` clientDisconnected c
|
||||
where
|
||||
disconnectThread_ c expCfg = maybe [] ((: []) . disconnectTransport th c activeAt) expCfg
|
||||
disconnectThread_ c (Just expCfg) = [disconnectTransport th c activeAt expCfg]
|
||||
disconnectThread_ _ _ = []
|
||||
|
||||
clientDisconnected :: (MonadUnliftIO m, MonadReader Env m) => Client -> m ()
|
||||
clientDisconnected c@Client {subscriptions, connected} = do
|
||||
|
||||
@@ -43,7 +43,8 @@ data ServerCLIConfig cfg = ServerCLIConfig
|
||||
defaultServerPort :: ServiceName,
|
||||
executableName :: String,
|
||||
serverVersion :: String,
|
||||
mkServerConfig :: Maybe FilePath -> [(ServiceName, ATransport)] -> cfg
|
||||
mkIniFile :: Bool -> ServiceName -> String,
|
||||
mkServerConfig :: Maybe FilePath -> [(ServiceName, ATransport)] -> Ini -> cfg
|
||||
}
|
||||
|
||||
protocolServerCLI :: ServerCLIConfig cfg -> (cfg -> IO ()) -> IO ()
|
||||
@@ -55,7 +56,7 @@ protocolServerCLI cliCfg@ServerCLIConfig {iniFile, executableName} server =
|
||||
_ -> initializeServer cliCfg opts
|
||||
Start ->
|
||||
doesFileExist iniFile >>= \case
|
||||
True -> readIniFile iniFile >>= either exitError (runServer cliCfg server . mkIniOptions)
|
||||
True -> readIniFile iniFile >>= either exitError (runServer cliCfg server)
|
||||
_ -> exitError $ "Error: server is not initialized (" <> iniFile <> " does not exist).\nRun `" <> executableName <> " init`."
|
||||
Delete -> cleanup cliCfg >> putStrLn "Deleted configuration and log files"
|
||||
|
||||
@@ -140,12 +141,12 @@ initializeServer cliCfg InitOptions {enableStoreLog, signAlgorithm, ip, fqdn} =
|
||||
createDirectoryIfMissing True logDir
|
||||
createX509
|
||||
fp <- saveFingerprint
|
||||
createIni
|
||||
writeFile iniFile $ mkIniFile enableStoreLog defaultServerPort
|
||||
putStrLn $ "Server initialized, you can modify configuration in " <> iniFile <> ".\nRun `" <> executableName <> " start` to start server."
|
||||
printServiceInfo cliCfg fp
|
||||
warnCAPrivateKeyFile
|
||||
where
|
||||
ServerCLIConfig {cfgDir, logDir, iniFile, executableName, caKeyFile, caCrtFile, serverKeyFile, serverCrtFile, fingerprintFile, defaultServerPort} = cliCfg
|
||||
ServerCLIConfig {cfgDir, logDir, iniFile, executableName, caKeyFile, caCrtFile, serverKeyFile, serverCrtFile, fingerprintFile, defaultServerPort, mkIniFile} = cliCfg
|
||||
createX509 = do
|
||||
createOpensslCaConf
|
||||
createOpensslServerConf
|
||||
@@ -199,22 +200,6 @@ initializeServer cliCfg InitOptions {enableStoreLog, signAlgorithm, ip, fqdn} =
|
||||
withFile fingerprintFile WriteMode (`B.hPutStrLn` strEncode fp)
|
||||
pure fp
|
||||
|
||||
createIni = do
|
||||
writeFile iniFile $
|
||||
"[STORE_LOG]\n\
|
||||
\# The server uses STM memory for persistence,\n\
|
||||
\# that will be lost on restart (e.g., as with redis).\n\
|
||||
\# This option enables saving memory to append only log,\n\
|
||||
\# and restoring it when the server is started.\n\
|
||||
\# Log is compacted on start (deleted objects are removed).\n\
|
||||
\# The messages are not logged.\n"
|
||||
<> ("enable: " <> (if enableStoreLog then "on" else "off # on") <> "\n\n")
|
||||
<> "[TRANSPORT]\n\
|
||||
\port: "
|
||||
<> defaultServerPort
|
||||
<> "\n\
|
||||
\websockets: off\n"
|
||||
|
||||
warnCAPrivateKeyFile =
|
||||
putStrLn $
|
||||
"----------\n\
|
||||
@@ -231,29 +216,32 @@ data IniOptions = IniOptions
|
||||
enableWebsockets :: Bool
|
||||
}
|
||||
|
||||
-- TODO ? properly parse ini as a whole
|
||||
mkIniOptions :: Ini -> IniOptions
|
||||
mkIniOptions ini =
|
||||
IniOptions
|
||||
{ enableStoreLog = (== "on") $ strict "STORE_LOG" "enable",
|
||||
port = T.unpack $ strict "TRANSPORT" "port",
|
||||
enableWebsockets = (== "on") $ strict "TRANSPORT" "websockets"
|
||||
{ enableStoreLog = (== "on") $ strictIni "STORE_LOG" "enable" ini,
|
||||
port = T.unpack $ strictIni "TRANSPORT" "port" ini,
|
||||
enableWebsockets = (== "on") $ strictIni "TRANSPORT" "websockets" ini
|
||||
}
|
||||
where
|
||||
strict :: String -> String -> T.Text
|
||||
strict section key =
|
||||
fromRight (error ("no key " <> key <> " in section " <> section)) $
|
||||
lookupValue (T.pack section) (T.pack key) ini
|
||||
|
||||
runServer :: ServerCLIConfig cfg -> (cfg -> IO ()) -> IniOptions -> IO ()
|
||||
runServer cliCfg server IniOptions {enableStoreLog, port, enableWebsockets} = do
|
||||
strictIni :: String -> String -> Ini -> T.Text
|
||||
strictIni section key ini =
|
||||
fromRight (error ("no key " <> key <> " in section " <> section)) $
|
||||
lookupValue (T.pack section) (T.pack key) ini
|
||||
|
||||
readStrictIni :: Read a => String -> String -> Ini -> a
|
||||
readStrictIni section key = read . T.unpack . strictIni section key
|
||||
|
||||
runServer :: ServerCLIConfig cfg -> (cfg -> IO ()) -> Ini -> IO ()
|
||||
runServer cliCfg server ini = do
|
||||
hSetBuffering stdout LineBuffering
|
||||
hSetBuffering stderr LineBuffering
|
||||
fp <- checkSavedFingerprint
|
||||
printServiceInfo cliCfg fp
|
||||
let transports = (port, transport @TLS) : [("80", transport @WS) | enableWebsockets]
|
||||
let IniOptions {enableStoreLog, port, enableWebsockets} = mkIniOptions ini
|
||||
transports = (port, transport @TLS) : [("80", transport @WS) | enableWebsockets]
|
||||
logFile = if enableStoreLog then Just storeLogFile else Nothing
|
||||
cfg = mkServerConfig logFile transports
|
||||
cfg = mkServerConfig logFile transports ini
|
||||
printServerConfig logFile transports
|
||||
server cfg
|
||||
where
|
||||
|
||||
@@ -70,8 +70,8 @@ defaultMessageExpiration =
|
||||
defaultInactiveClientExpiration :: ExpirationConfig
|
||||
defaultInactiveClientExpiration =
|
||||
ExpirationConfig
|
||||
{ ttl = 7200, -- 2 hours
|
||||
checkInterval = 3600 -- seconds, 1 hour
|
||||
{ ttl = 86400, -- seconds, 24 hours
|
||||
checkInterval = 43200 -- seconds, 12 hours
|
||||
}
|
||||
|
||||
data Env = Env
|
||||
|
||||
Reference in New Issue
Block a user