mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-13 23:03:15 +00:00
servers: control port session improvements (#1591)
* servers: prohibit changing role during control port session * quota for blocked queues * allow disabling blocking and quota * fix test * fix INI file
This commit is contained in:
@@ -54,7 +54,7 @@ import qualified Simplex.Messaging.Crypto.Lazy as LC
|
||||
import Simplex.Messaging.Encoding
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Protocol (BlockingInfo, EntityId (..), RcvPublicAuthKey, RcvPublicDhKey, RecipientId, SignedTransmission, pattern NoEntity)
|
||||
import Simplex.Messaging.Server (dummyVerifyCmd, verifyCmdAuthorization)
|
||||
import Simplex.Messaging.Server (controlPortAuth, dummyVerifyCmd, verifyCmdAuthorization)
|
||||
import Simplex.Messaging.Server.Control (CPClientRole (..))
|
||||
import Simplex.Messaging.Server.Expiration
|
||||
import Simplex.Messaging.Server.QueueStore (RoundedSystemTime, ServerEntityStatus (..), getRoundedSystemTime)
|
||||
@@ -277,12 +277,9 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira
|
||||
CPSkip -> False
|
||||
_ -> True
|
||||
processCP h role = \case
|
||||
CPAuth auth -> atomically $ writeTVar role $! newRole cfg
|
||||
CPAuth auth -> controlPortAuth h user admin role auth
|
||||
where
|
||||
newRole XFTPServerConfig {controlPortUserAuth = user, controlPortAdminAuth = admin}
|
||||
| Just auth == admin = CPRAdmin
|
||||
| Just auth == user = CPRUser
|
||||
| otherwise = CPRNone
|
||||
XFTPServerConfig {controlPortUserAuth = user, controlPortAdminAuth = admin} = cfg
|
||||
CPStatsRTS -> E.tryAny getRTSStats >>= either (hPrint h) (hPrint h)
|
||||
CPDelete fileId -> withUserRole $ unliftIO u $ do
|
||||
fs <- asks store
|
||||
|
||||
@@ -364,12 +364,9 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions}
|
||||
CPSkip -> False
|
||||
_ -> True
|
||||
processCP h role = \case
|
||||
CPAuth auth -> atomically $ writeTVar role $! newRole cfg
|
||||
CPAuth auth -> controlPortAuth h user admin role auth
|
||||
where
|
||||
newRole NtfServerConfig {controlPortUserAuth = user, controlPortAdminAuth = admin}
|
||||
| Just auth == admin = CPRAdmin
|
||||
| Just auth == user = CPRUser
|
||||
| otherwise = CPRNone
|
||||
NtfServerConfig {controlPortUserAuth = user, controlPortAdminAuth = admin} = cfg
|
||||
CPStats -> withUserRole $ do
|
||||
ss <- unliftIO u $ asks serverStats
|
||||
let getStat :: (NtfServerStats -> IORef a) -> IO a
|
||||
|
||||
@@ -31,6 +31,7 @@
|
||||
module Simplex.Messaging.Server
|
||||
( runSMPServer,
|
||||
runSMPServerBlocking,
|
||||
controlPortAuth,
|
||||
importMessages,
|
||||
exportMessages,
|
||||
printMessageStats,
|
||||
@@ -558,6 +559,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
|
||||
msgNtfNoSub' <- atomicSwapIORef (msgNtfNoSub ss) 0
|
||||
msgNtfLost' <- atomicSwapIORef (msgNtfLost ss) 0
|
||||
msgNtfExpired' <- atomicSwapIORef (msgNtfExpired ss) 0
|
||||
_qBlocked <- atomicSwapIORef (qBlocked ss) 0 -- not logged, only reset
|
||||
pRelays' <- getResetProxyStatsData pRelays
|
||||
pRelaysOwn' <- getResetProxyStatsData pRelaysOwn
|
||||
pMsgFwds' <- getResetProxyStatsData pMsgFwds
|
||||
@@ -770,12 +772,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
|
||||
CPSkip -> False
|
||||
_ -> True
|
||||
processCP h role = \case
|
||||
CPAuth auth -> atomically $ writeTVar role $! newRole cfg
|
||||
CPAuth auth -> controlPortAuth h user admin role auth
|
||||
where
|
||||
newRole ServerConfig {controlPortUserAuth = user, controlPortAdminAuth = admin}
|
||||
| Just auth == admin = CPRAdmin
|
||||
| Just auth == user = CPRUser
|
||||
| otherwise = CPRNone
|
||||
ServerConfig {controlPortUserAuth = user, controlPortAdminAuth = admin} = cfg
|
||||
CPSuspend -> withAdminRole $ hPutStrLn h "suspend not implemented"
|
||||
CPResume -> withAdminRole $ hPutStrLn h "resume not implemented"
|
||||
CPClients -> withAdminRole $ do
|
||||
@@ -964,7 +963,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
|
||||
SubPending -> (c1, c2 + 1, c3, c4)
|
||||
SubThread _ -> (c1, c2, c3 + 1, c4)
|
||||
ProhibitSub -> pure (c1, c2, c3, c4 + 1)
|
||||
CPDelete sId -> withUserRole $ unliftIO u $ do
|
||||
CPDelete sId -> withAdminRole $ unliftIO u $ do
|
||||
st <- asks msgStore
|
||||
r <- liftIO $ runExceptT $ do
|
||||
q <- ExceptT $ getQueue st SSender sId
|
||||
@@ -983,14 +982,20 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
|
||||
"status: " <> show status <> ", updatedAt: " <> show updatedAt <> ", queueMode: " <> show queueMode
|
||||
CPBlock sId info -> withUserRole $ unliftIO u $ do
|
||||
st <- asks msgStore
|
||||
r <- liftIO $ runExceptT $ do
|
||||
q <- ExceptT $ getQueue st SSender sId
|
||||
ExceptT $ blockQueue (queueStore st) q info
|
||||
case r of
|
||||
Left e -> liftIO $ hPutStrLn h $ "error: " <> show e
|
||||
Right () -> do
|
||||
incStat . qBlocked =<< asks serverStats
|
||||
liftIO $ hPutStrLn h "ok"
|
||||
stats <- asks serverStats
|
||||
blocked <- liftIO $ readIORef $ qBlocked stats
|
||||
let quota = dailyBlockQueueQuota cfg
|
||||
if blocked >= quota && quota /= 0
|
||||
then liftIO $ hPutStrLn h $ "error: reached limit of " <> show quota <> " queues blocked daily"
|
||||
else do
|
||||
r <- liftIO $ runExceptT $ do
|
||||
q <- ExceptT $ getQueue st SSender sId
|
||||
ExceptT $ blockQueue (queueStore st) q info
|
||||
case r of
|
||||
Left e -> liftIO $ hPutStrLn h $ "error: " <> show e
|
||||
Right () -> do
|
||||
incStat $ qBlocked stats
|
||||
liftIO $ hPutStrLn h "ok"
|
||||
CPUnblock sId -> withUserRole $ unliftIO u $ do
|
||||
st <- asks msgStore
|
||||
r <- liftIO $ runExceptT $ do
|
||||
@@ -1045,6 +1050,20 @@ runClientTransport h@THandle {params = thParams@THandleParams {sessionId}} = do
|
||||
where
|
||||
hasSubs ServerSubscribers {subClients} = IS.member clientId <$> readTVarIO subClients
|
||||
|
||||
controlPortAuth :: Handle -> Maybe BasicAuth -> Maybe BasicAuth -> TVar CPClientRole -> BasicAuth -> IO ()
|
||||
controlPortAuth h user admin role auth = do
|
||||
readTVarIO role >>= \case
|
||||
CPRNone -> do
|
||||
atomically $ writeTVar role $! newRole
|
||||
hPutStrLn h $ currentRole newRole
|
||||
r -> hPutStrLn h $ currentRole r <> if r == newRole then "" else ", start new session to change."
|
||||
where
|
||||
currentRole r = "Current role is " <> show r
|
||||
newRole
|
||||
| Just auth == admin = CPRAdmin
|
||||
| Just auth == user = CPRUser
|
||||
| otherwise = CPRNone
|
||||
|
||||
clientDisconnected :: forall s. Client s -> M s ()
|
||||
clientDisconnected c@Client {clientId, subscriptions, ntfSubscriptions, serviceSubsCount, ntfServiceSubsCount, connected, clientTHParams = THandleParams {sessionId, thAuth}, endThreads} = do
|
||||
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " disc"
|
||||
|
||||
@@ -8,7 +8,7 @@ import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Protocol (BasicAuth, BlockingInfo, SenderId)
|
||||
|
||||
data CPClientRole = CPRNone | CPRUser | CPRAdmin
|
||||
deriving (Eq)
|
||||
deriving (Eq, Show)
|
||||
|
||||
data ControlProtocol
|
||||
= CPAuth BasicAuth
|
||||
|
||||
@@ -149,6 +149,7 @@ data ServerConfig s = ServerConfig
|
||||
-- | control port passwords,
|
||||
controlPortUserAuth :: Maybe BasicAuth,
|
||||
controlPortAdminAuth :: Maybe BasicAuth,
|
||||
dailyBlockQueueQuota :: Int,
|
||||
-- | time after which the messages can be removed from the queues and check interval, seconds
|
||||
messageExpiration :: Maybe ExpirationConfig,
|
||||
expireMessagesOnStart :: Bool,
|
||||
|
||||
@@ -420,6 +420,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
|
||||
newQueueBasicAuth = either error id <$!> strDecodeIni "AUTH" "create_password" ini,
|
||||
controlPortAdminAuth = either error id <$!> strDecodeIni "AUTH" "control_port_admin_password" ini,
|
||||
controlPortUserAuth = either error id <$!> strDecodeIni "AUTH" "control_port_user_password" ini,
|
||||
dailyBlockQueueQuota = readIniDefault 20 "AUTH" "daily_block_queue_quota" ini,
|
||||
messageExpiration =
|
||||
Just
|
||||
defaultMessageExpiration
|
||||
|
||||
@@ -107,8 +107,11 @@ iniFileContent cfgPath logPath opts host basicAuth controlPortPwds =
|
||||
)
|
||||
<> "\n\n"
|
||||
<> (optDisabled controlPortPwds <> "control_port_admin_password: " <> maybe "" fst controlPortPwds <> "\n")
|
||||
<> (optDisabled controlPortPwds <> "control_port_user_password: " <> maybe "" snd controlPortPwds <> "\n")
|
||||
<> "\n\
|
||||
<> (optDisabled controlPortPwds <> "control_port_user_password: " <> maybe "" snd controlPortPwds <> "\n\n")
|
||||
<> "# The limit for queues that can be blocked via control port per day, resets at 0:00 UTC.\n\
|
||||
\# Set to 0 to disable limit, to -1 to prohibit blocking. Default is 20.\n\
|
||||
\# daily_block_queue_quota: 20\n\
|
||||
\\n\
|
||||
\[TRANSPORT]\n\
|
||||
\# Host is only used to print server address on start.\n\
|
||||
\# You can specify multiple server ports.\n"
|
||||
|
||||
@@ -226,6 +226,7 @@ cfgMS msType = withStoreCfg (testServerStoreConfig msType) $ \serverStoreCfg ->
|
||||
newQueueBasicAuth = Nothing,
|
||||
controlPortUserAuth = Nothing,
|
||||
controlPortAdminAuth = Nothing,
|
||||
dailyBlockQueueQuota = 20,
|
||||
messageExpiration = Just defaultMessageExpiration,
|
||||
expireMessagesOnStart = True,
|
||||
idleQueueInterval = defaultIdleQueueInterval,
|
||||
|
||||
Reference in New Issue
Block a user