From c0f357d817d01b130706386bc27ac8836072e4e5 Mon Sep 17 00:00:00 2001 From: Evgeny Date: Mon, 21 Jul 2025 15:19:43 +0100 Subject: [PATCH] servers: control port session improvements (#1591) * servers: prohibit changing role during control port session * quota for blocked queues * allow disabling blocking and quota * fix test * fix INI file --- src/Simplex/FileTransfer/Server.hs | 9 ++-- src/Simplex/Messaging/Notifications/Server.hs | 7 +-- src/Simplex/Messaging/Server.hs | 47 +++++++++++++------ src/Simplex/Messaging/Server/Control.hs | 2 +- src/Simplex/Messaging/Server/Env/STM.hs | 1 + src/Simplex/Messaging/Server/Main.hs | 1 + src/Simplex/Messaging/Server/Main/Init.hs | 7 ++- tests/SMPClient.hs | 1 + 8 files changed, 47 insertions(+), 28 deletions(-) diff --git a/src/Simplex/FileTransfer/Server.hs b/src/Simplex/FileTransfer/Server.hs index a5e5727e4..c12c2b5f6 100644 --- a/src/Simplex/FileTransfer/Server.hs +++ b/src/Simplex/FileTransfer/Server.hs @@ -54,7 +54,7 @@ import qualified Simplex.Messaging.Crypto.Lazy as LC import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (BlockingInfo, EntityId (..), RcvPublicAuthKey, RcvPublicDhKey, RecipientId, SignedTransmission, pattern NoEntity) -import Simplex.Messaging.Server (dummyVerifyCmd, verifyCmdAuthorization) +import Simplex.Messaging.Server (controlPortAuth, dummyVerifyCmd, verifyCmdAuthorization) import Simplex.Messaging.Server.Control (CPClientRole (..)) import Simplex.Messaging.Server.Expiration import Simplex.Messaging.Server.QueueStore (RoundedSystemTime, ServerEntityStatus (..), getRoundedSystemTime) @@ -277,12 +277,9 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira CPSkip -> False _ -> True processCP h role = \case - CPAuth auth -> atomically $ writeTVar role $! newRole cfg + CPAuth auth -> controlPortAuth h user admin role auth where - newRole XFTPServerConfig {controlPortUserAuth = user, controlPortAdminAuth = admin} - | Just auth == admin = CPRAdmin - | Just auth == user = CPRUser - | otherwise = CPRNone + XFTPServerConfig {controlPortUserAuth = user, controlPortAdminAuth = admin} = cfg CPStatsRTS -> E.tryAny getRTSStats >>= either (hPrint h) (hPrint h) CPDelete fileId -> withUserRole $ unliftIO u $ do fs <- asks store diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index c856b5249..ac274dc08 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -364,12 +364,9 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions} CPSkip -> False _ -> True processCP h role = \case - CPAuth auth -> atomically $ writeTVar role $! newRole cfg + CPAuth auth -> controlPortAuth h user admin role auth where - newRole NtfServerConfig {controlPortUserAuth = user, controlPortAdminAuth = admin} - | Just auth == admin = CPRAdmin - | Just auth == user = CPRUser - | otherwise = CPRNone + NtfServerConfig {controlPortUserAuth = user, controlPortAdminAuth = admin} = cfg CPStats -> withUserRole $ do ss <- unliftIO u $ asks serverStats let getStat :: (NtfServerStats -> IORef a) -> IO a diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index a439e8709..801597b48 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -31,6 +31,7 @@ module Simplex.Messaging.Server ( runSMPServer, runSMPServerBlocking, + controlPortAuth, importMessages, exportMessages, printMessageStats, @@ -558,6 +559,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt msgNtfNoSub' <- atomicSwapIORef (msgNtfNoSub ss) 0 msgNtfLost' <- atomicSwapIORef (msgNtfLost ss) 0 msgNtfExpired' <- atomicSwapIORef (msgNtfExpired ss) 0 + _qBlocked <- atomicSwapIORef (qBlocked ss) 0 -- not logged, only reset pRelays' <- getResetProxyStatsData pRelays pRelaysOwn' <- getResetProxyStatsData pRelaysOwn pMsgFwds' <- getResetProxyStatsData pMsgFwds @@ -770,12 +772,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt CPSkip -> False _ -> True processCP h role = \case - CPAuth auth -> atomically $ writeTVar role $! newRole cfg + CPAuth auth -> controlPortAuth h user admin role auth where - newRole ServerConfig {controlPortUserAuth = user, controlPortAdminAuth = admin} - | Just auth == admin = CPRAdmin - | Just auth == user = CPRUser - | otherwise = CPRNone + ServerConfig {controlPortUserAuth = user, controlPortAdminAuth = admin} = cfg CPSuspend -> withAdminRole $ hPutStrLn h "suspend not implemented" CPResume -> withAdminRole $ hPutStrLn h "resume not implemented" CPClients -> withAdminRole $ do @@ -964,7 +963,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt SubPending -> (c1, c2 + 1, c3, c4) SubThread _ -> (c1, c2, c3 + 1, c4) ProhibitSub -> pure (c1, c2, c3, c4 + 1) - CPDelete sId -> withUserRole $ unliftIO u $ do + CPDelete sId -> withAdminRole $ unliftIO u $ do st <- asks msgStore r <- liftIO $ runExceptT $ do q <- ExceptT $ getQueue st SSender sId @@ -983,14 +982,20 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt "status: " <> show status <> ", updatedAt: " <> show updatedAt <> ", queueMode: " <> show queueMode CPBlock sId info -> withUserRole $ unliftIO u $ do st <- asks msgStore - r <- liftIO $ runExceptT $ do - q <- ExceptT $ getQueue st SSender sId - ExceptT $ blockQueue (queueStore st) q info - case r of - Left e -> liftIO $ hPutStrLn h $ "error: " <> show e - Right () -> do - incStat . qBlocked =<< asks serverStats - liftIO $ hPutStrLn h "ok" + stats <- asks serverStats + blocked <- liftIO $ readIORef $ qBlocked stats + let quota = dailyBlockQueueQuota cfg + if blocked >= quota && quota /= 0 + then liftIO $ hPutStrLn h $ "error: reached limit of " <> show quota <> " queues blocked daily" + else do + r <- liftIO $ runExceptT $ do + q <- ExceptT $ getQueue st SSender sId + ExceptT $ blockQueue (queueStore st) q info + case r of + Left e -> liftIO $ hPutStrLn h $ "error: " <> show e + Right () -> do + incStat $ qBlocked stats + liftIO $ hPutStrLn h "ok" CPUnblock sId -> withUserRole $ unliftIO u $ do st <- asks msgStore r <- liftIO $ runExceptT $ do @@ -1045,6 +1050,20 @@ runClientTransport h@THandle {params = thParams@THandleParams {sessionId}} = do where hasSubs ServerSubscribers {subClients} = IS.member clientId <$> readTVarIO subClients +controlPortAuth :: Handle -> Maybe BasicAuth -> Maybe BasicAuth -> TVar CPClientRole -> BasicAuth -> IO () +controlPortAuth h user admin role auth = do + readTVarIO role >>= \case + CPRNone -> do + atomically $ writeTVar role $! newRole + hPutStrLn h $ currentRole newRole + r -> hPutStrLn h $ currentRole r <> if r == newRole then "" else ", start new session to change." + where + currentRole r = "Current role is " <> show r + newRole + | Just auth == admin = CPRAdmin + | Just auth == user = CPRUser + | otherwise = CPRNone + clientDisconnected :: forall s. Client s -> M s () clientDisconnected c@Client {clientId, subscriptions, ntfSubscriptions, serviceSubsCount, ntfServiceSubsCount, connected, clientTHParams = THandleParams {sessionId, thAuth}, endThreads} = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " disc" diff --git a/src/Simplex/Messaging/Server/Control.hs b/src/Simplex/Messaging/Server/Control.hs index 318bce1cc..7c6565285 100644 --- a/src/Simplex/Messaging/Server/Control.hs +++ b/src/Simplex/Messaging/Server/Control.hs @@ -8,7 +8,7 @@ import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (BasicAuth, BlockingInfo, SenderId) data CPClientRole = CPRNone | CPRUser | CPRAdmin - deriving (Eq) + deriving (Eq, Show) data ControlProtocol = CPAuth BasicAuth diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 4d0de07a6..d0f1a84fd 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -149,6 +149,7 @@ data ServerConfig s = ServerConfig -- | control port passwords, controlPortUserAuth :: Maybe BasicAuth, controlPortAdminAuth :: Maybe BasicAuth, + dailyBlockQueueQuota :: Int, -- | time after which the messages can be removed from the queues and check interval, seconds messageExpiration :: Maybe ExpirationConfig, expireMessagesOnStart :: Bool, diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index b2d16d904..17c5de7e3 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -420,6 +420,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = newQueueBasicAuth = either error id <$!> strDecodeIni "AUTH" "create_password" ini, controlPortAdminAuth = either error id <$!> strDecodeIni "AUTH" "control_port_admin_password" ini, controlPortUserAuth = either error id <$!> strDecodeIni "AUTH" "control_port_user_password" ini, + dailyBlockQueueQuota = readIniDefault 20 "AUTH" "daily_block_queue_quota" ini, messageExpiration = Just defaultMessageExpiration diff --git a/src/Simplex/Messaging/Server/Main/Init.hs b/src/Simplex/Messaging/Server/Main/Init.hs index 7b1b320be..9f52d0797 100644 --- a/src/Simplex/Messaging/Server/Main/Init.hs +++ b/src/Simplex/Messaging/Server/Main/Init.hs @@ -107,8 +107,11 @@ iniFileContent cfgPath logPath opts host basicAuth controlPortPwds = ) <> "\n\n" <> (optDisabled controlPortPwds <> "control_port_admin_password: " <> maybe "" fst controlPortPwds <> "\n") - <> (optDisabled controlPortPwds <> "control_port_user_password: " <> maybe "" snd controlPortPwds <> "\n") - <> "\n\ + <> (optDisabled controlPortPwds <> "control_port_user_password: " <> maybe "" snd controlPortPwds <> "\n\n") + <> "# The limit for queues that can be blocked via control port per day, resets at 0:00 UTC.\n\ + \# Set to 0 to disable limit, to -1 to prohibit blocking. Default is 20.\n\ + \# daily_block_queue_quota: 20\n\ + \\n\ \[TRANSPORT]\n\ \# Host is only used to print server address on start.\n\ \# You can specify multiple server ports.\n" diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 50f11e25f..2b18a2d51 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -226,6 +226,7 @@ cfgMS msType = withStoreCfg (testServerStoreConfig msType) $ \serverStoreCfg -> newQueueBasicAuth = Nothing, controlPortUserAuth = Nothing, controlPortAdminAuth = Nothing, + dailyBlockQueueQuota = 20, messageExpiration = Just defaultMessageExpiration, expireMessagesOnStart = True, idleQueueInterval = defaultIdleQueueInterval,