diff --git a/simplexmq.cabal b/simplexmq.cabal index a3d6af40b..b70cd513f 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -109,6 +109,7 @@ library Simplex.Messaging.Protocol Simplex.Messaging.Server Simplex.Messaging.Server.CLI + Simplex.Messaging.Server.Control Simplex.Messaging.Server.Env.STM Simplex.Messaging.Server.Expiration Simplex.Messaging.Server.Main diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 4a8eb1467..5826bcf7b 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -58,11 +58,13 @@ import Data.Time.Clock.System (SystemTime (..), getSystemTime) import Data.Time.Format.ISO8601 (iso8601Show) import Data.Type.Equality import GHC.TypeLits (KnownNat) -import Network.Socket (ServiceName) +import Network.Socket (ServiceName, Socket, socketToHandle) +import Simplex.Messaging.Agent.Lock import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding (Encoding (smpEncode)) import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol +import Simplex.Messaging.Server.Control import Simplex.Messaging.Server.Env.STM import Simplex.Messaging.Server.Expiration import Simplex.Messaging.Server.MsgStore @@ -74,10 +76,11 @@ import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport +import Simplex.Messaging.Transport.Buffer (trimCR) import Simplex.Messaging.Transport.Server import Simplex.Messaging.Util import System.Exit (exitFailure) -import System.IO (hPutStrLn) +import System.IO (hPutStrLn, hSetNewlineMode, universalNewlineMode) import System.Mem.Weak (deRefWeak) import UnliftIO.Concurrent import UnliftIO.Directory (doesFileExist, renameFile) @@ -110,15 +113,18 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do raceAny_ ( serverThread s subscribedQ subscribers subscriptions cancelSub : serverThread s ntfSubscribedQ notifiers ntfSubscriptions (\_ -> pure ()) : - map runServer transports <> expireMessagesThread_ cfg <> serverStatsThread_ cfg + map runServer transports <> expireMessagesThread_ cfg <> serverStatsThread_ cfg <> controlPortThread_ cfg ) - `finally` (withLog closeStoreLog >> saveServerMessages >> saveServerStats) + `finally` withLock (savingLock s) "final" (saveServer False) where runServer :: (ServiceName, ATransport) -> M () runServer (tcpPort, ATransport t) = do serverParams <- asks tlsServerParams runTransportServer started tcpPort serverParams tCfg (runClient t) + saveServer :: Bool -> M () + saveServer keepMsgs = withLog closeStoreLog >> saveServerMessages keepMsgs >> saveServerStats + serverThread :: forall s. Server -> @@ -223,6 +229,57 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do Right th -> runClientTransport th Left _ -> pure () + controlPortThread_ :: ServerConfig -> [M ()] + controlPortThread_ ServerConfig {controlPort = Just port} = [runCPServer port] + controlPortThread_ _ = [] + + runCPServer :: ServiceName -> M () + runCPServer port = do + srv <- asks server + cpStarted <- newEmptyTMVarIO + u <- askUnliftIO + liftIO $ runTCPServer cpStarted port $ runCPClient u srv + where + runCPClient :: UnliftIO (ReaderT Env IO) -> Server -> Socket -> IO () + runCPClient u srv sock = do + h <- socketToHandle sock ReadWriteMode + hSetBuffering h LineBuffering + hSetNewlineMode h universalNewlineMode + hPutStrLn h "SMP server control port\n'help' for supported commands" + cpLoop h + where + cpLoop h = do + s <- B.hGetLine h + case strDecode $ trimCR s of + Right CPQuit -> hClose h + Right cmd -> processCP h cmd >> cpLoop h + Left err -> hPutStrLn h ("error: " <> err) >> cpLoop h + processCP h = \case + CPSuspend -> hPutStrLn h "suspend not implemented" + CPResume -> hPutStrLn h "resume not implemented" + CPClients -> hPutStrLn h "clients not implemented" + CPStats -> do + ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, msgSentNtf, msgRecvNtf, qCount, msgCount} <- unliftIO u $ asks serverStats + putStat "fromTime" fromTime + putStat "qCreated" qCreated + putStat "qSecured" qSecured + putStat "qDeleted" qDeleted + putStat "msgSent" msgSent + putStat "msgRecv" msgRecv + putStat "msgSentNtf" msgSentNtf + putStat "msgRecvNtf" msgRecvNtf + putStat "qCount" qCount + putStat "msgCount" msgCount + where + putStat :: Show a => String -> TVar a -> IO () + putStat label var = readTVarIO var >>= \v -> hPutStrLn h $ label <> ": " <> show v + CPSave -> withLock (savingLock srv) "control" $ do + hPutStrLn h "saving server state..." + unliftIO u $ saveServer True + hPutStrLn h "server state saved!" + CPHelp -> hPutStrLn h "commands: stats, save, help, quit" + CPQuit -> pure () + runClientTransport :: Transport c => THandle c -> M () runClientTransport th@THandle {thVersion, sessionId} = do q <- asks $ tbqSize . config @@ -720,8 +777,8 @@ randomId n = do gVar <- asks idsDrg atomically (C.pseudoRandomBytes n gVar) -saveServerMessages :: (MonadUnliftIO m, MonadReader Env m) => m () -saveServerMessages = asks (storeMsgsFile . config) >>= mapM_ saveMessages +saveServerMessages :: (MonadUnliftIO m, MonadReader Env m) => Bool -> m () +saveServerMessages keepMsgs = asks (storeMsgsFile . config) >>= mapM_ saveMessages where saveMessages f = do logInfo $ "saving messages to file " <> T.pack f @@ -730,8 +787,9 @@ saveServerMessages = asks (storeMsgsFile . config) >>= mapM_ saveMessages readTVarIO ms >>= mapM_ (saveQueueMsgs ms h) . M.keys logInfo "messages saved" where + getMessages = if keepMsgs then snapshotMsgQueue else flushMsgQueue saveQueueMsgs ms h rId = - atomically (flushMsgQueue ms rId) + atomically (getMessages ms rId) >>= mapM_ (B.hPutStrLn h . strEncode . MLRv3 rId) restoreServerMessages :: forall m. (MonadUnliftIO m, MonadReader Env m) => m () diff --git a/src/Simplex/Messaging/Server/Control.hs b/src/Simplex/Messaging/Server/Control.hs new file mode 100644 index 000000000..184e94775 --- /dev/null +++ b/src/Simplex/Messaging/Server/Control.hs @@ -0,0 +1,36 @@ +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE OverloadedStrings #-} + +module Simplex.Messaging.Server.Control where + +import qualified Data.Attoparsec.ByteString.Char8 as A +import Simplex.Messaging.Encoding.String + +data ControlProtocol + = CPSuspend + | CPResume + | CPClients + | CPStats + | CPSave + | CPHelp + | CPQuit + +instance StrEncoding ControlProtocol where + strEncode = \case + CPSuspend -> "suspend" + CPResume -> "resume" + CPClients -> "clients" + CPStats -> "stats" + CPSave -> "save" + CPHelp -> "help" + CPQuit -> "quit" + strP = + A.takeTill (== ' ') >>= \case + "suspend" -> pure CPSuspend + "resume" -> pure CPResume + "clients" -> pure CPClients + "stats" -> pure CPStats + "save" -> pure CPSave + "help" -> pure CPHelp + "quit" -> pure CPQuit + _ -> fail "bad ControlProtocol command" diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 165e4d330..386355e80 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -19,6 +19,7 @@ import Data.X509.Validation (Fingerprint (..)) import Network.Socket (ServiceName) import qualified Network.TLS as T import Numeric.Natural +import Simplex.Messaging.Agent.Lock import Simplex.Messaging.Crypto (KeyHash (..)) import Simplex.Messaging.Protocol import Simplex.Messaging.Server.Expiration @@ -70,7 +71,9 @@ data ServerConfig = ServerConfig -- | SMP client-server protocol version range smpServerVRange :: VersionRange, -- | TCP transport config - transportConfig :: TransportServerConfig + transportConfig :: TransportServerConfig, + -- | run listener on control port + controlPort :: Maybe ServiceName } defMsgExpirationDays :: Int64 @@ -106,7 +109,8 @@ data Server = Server { subscribedQ :: TQueue (RecipientId, Client), subscribers :: TMap RecipientId Client, ntfSubscribedQ :: TQueue (NotifierId, Client), - notifiers :: TMap NotifierId Client + notifiers :: TMap NotifierId Client, + savingLock :: Lock } data Client = Client @@ -133,7 +137,8 @@ newServer = do subscribers <- TM.empty ntfSubscribedQ <- newTQueue notifiers <- TM.empty - return Server {subscribedQ, subscribers, ntfSubscribedQ, notifiers} + savingLock <- createLock + return Server {subscribedQ, subscribers, ntfSubscribedQ, notifiers, savingLock} newClient :: Natural -> Version -> ByteString -> SystemTime -> STM Client newClient qSize thVersion sessionId ts = do diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index b1d92991b..749a8d6ba 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -130,7 +130,8 @@ smpServerCLI cfgPath logPath = <> ("host: " <> host <> "\n") <> ("port: " <> defaultServerPort <> "\n") <> "log_tls_errors: off\n\ - \websockets: off\n\n\ + \websockets: off\n\ + \# control_port: 5224\n\n\ \[INACTIVE_CLIENTS]\n\ \# TTL and interval to check inactive clients\n\ \disconnect: off\n" @@ -202,7 +203,8 @@ smpServerCLI cfgPath logPath = transportConfig = defaultTransportServerConfig { logTLSErrors = fromMaybe False $ iniOnOff "TRANSPORT" "log_tls_errors" ini - } + }, + controlPort = either (const Nothing) (Just . T.unpack) $ lookupValue "TRANSPORT" "control_port" ini } data CliCommand diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index e9dd95eec..95e425d8e 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -13,6 +13,7 @@ module Simplex.Messaging.Server.MsgStore.STM getMsgQueue, delMsgQueue, flushMsgQueue, + snapshotMsgQueue, writeMsg, tryPeekMsg, peekMsg, @@ -62,6 +63,14 @@ delMsgQueue st rId = TM.delete rId st flushMsgQueue :: STMMsgStore -> RecipientId -> STM [Message] flushMsgQueue st rId = TM.lookupDelete rId st >>= maybe (pure []) (flushTQueue . msgQueue) +snapshotMsgQueue :: STMMsgStore -> RecipientId -> STM [Message] +snapshotMsgQueue st rId = TM.lookup rId st >>= maybe (pure []) (snapshotTQueue . msgQueue) + where + snapshotTQueue q = do + msgs <- flushTQueue q + mapM_ (writeTQueue q) msgs + pure msgs + writeMsg :: MsgQueue -> Message -> STM (Maybe Message) writeMsg MsgQueue {msgQueue = q, quota, canWrite, size} msg = do canWrt <- readTVar canWrite diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index b462eaffa..493bd5ac1 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -94,7 +94,7 @@ setServerStats s d = do writeTVar (msgRecvNtf s) $! _msgRecvNtf d setPeriodStats (activeQueuesNtf s) (_activeQueuesNtf d) writeTVar (qCount s) $! _qCount d - writeTVar (msgCount s) $! _qCount d + writeTVar (msgCount s) $! _msgCount d instance StrEncoding ServerStatsData where strEncode ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf} = diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 470fcf215..639f254d0 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -100,7 +100,8 @@ cfg = privateKeyFile = "tests/fixtures/server.key", certificateFile = "tests/fixtures/server.crt", smpServerVRange = supportedSMPServerVRange, - transportConfig = defaultTransportServerConfig + transportConfig = defaultTransportServerConfig, + controlPort = Nothing } withSmpServerStoreMsgLogOnV2 :: HasCallStack => ATransport -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a