server: add control port commands for clients and ghc threads (#836)

* server: add control port commands for clients and ghc threads (#835)

* Add stats-rts control query

With supporting ghc-options that would provide the data.

* Add CPSkip command

Allows spamming empty lines a few times to clean up the view.

* server: Add CP commands to enumerate clients and threads

* style

---------

Co-authored-by: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com>

* use base64 encoding for session ID

* fromMaybe

* whitespace

---------

Co-authored-by: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com>
This commit is contained in:
Evgeny Poberezkin
2023-08-26 16:02:18 +01:00
committed by GitHub
parent ed05428227
commit 7bdae793cb
6 changed files with 84 additions and 27 deletions

View File

@@ -97,6 +97,7 @@ executables:
- simplexmq
ghc-options:
- -threaded
- -rtsopts
ntf-server:
source-dirs: apps/ntf-server
@@ -105,6 +106,7 @@ executables:
- simplexmq
ghc-options:
- -threaded
- -rtsopts
xftp-server:
source-dirs: apps/xftp-server
@@ -113,6 +115,7 @@ executables:
- simplexmq
ghc-options:
- -threaded
- -rtsopts
smp-agent:
source-dirs: apps/smp-agent
@@ -121,6 +124,7 @@ executables:
- simplexmq
ghc-options:
- -threaded
- -rtsopts
xftp:
source-dirs: apps/xftp
@@ -129,6 +133,7 @@ executables:
- simplexmq
ghc-options:
- -threaded
- -rtsopts
tests:
simplexmq-test:

View File

@@ -203,7 +203,7 @@ executable ntf-server
Paths_simplexmq
hs-source-dirs:
apps/ntf-server
ghc-options: -Wall -Wcompat -Werror=incomplete-patterns -Wredundant-constraints -Wincomplete-record-updates -Wincomplete-uni-patterns -Wunused-type-patterns -threaded
ghc-options: -Wall -Wcompat -Werror=incomplete-patterns -Wredundant-constraints -Wincomplete-record-updates -Wincomplete-uni-patterns -Wunused-type-patterns -threaded -rtsopts
build-depends:
QuickCheck ==2.14.*
, aeson ==2.2.*
@@ -268,7 +268,7 @@ executable smp-agent
Paths_simplexmq
hs-source-dirs:
apps/smp-agent
ghc-options: -Wall -Wcompat -Werror=incomplete-patterns -Wredundant-constraints -Wincomplete-record-updates -Wincomplete-uni-patterns -Wunused-type-patterns -threaded
ghc-options: -Wall -Wcompat -Werror=incomplete-patterns -Wredundant-constraints -Wincomplete-record-updates -Wincomplete-uni-patterns -Wunused-type-patterns -threaded -rtsopts
build-depends:
QuickCheck ==2.14.*
, aeson ==2.2.*
@@ -333,7 +333,7 @@ executable smp-server
Paths_simplexmq
hs-source-dirs:
apps/smp-server
ghc-options: -Wall -Wcompat -Werror=incomplete-patterns -Wredundant-constraints -Wincomplete-record-updates -Wincomplete-uni-patterns -Wunused-type-patterns -threaded
ghc-options: -Wall -Wcompat -Werror=incomplete-patterns -Wredundant-constraints -Wincomplete-record-updates -Wincomplete-uni-patterns -Wunused-type-patterns -threaded -rtsopts
build-depends:
QuickCheck ==2.14.*
, aeson ==2.2.*
@@ -398,7 +398,7 @@ executable xftp
Paths_simplexmq
hs-source-dirs:
apps/xftp
ghc-options: -Wall -Wcompat -Werror=incomplete-patterns -Wredundant-constraints -Wincomplete-record-updates -Wincomplete-uni-patterns -Wunused-type-patterns -threaded
ghc-options: -Wall -Wcompat -Werror=incomplete-patterns -Wredundant-constraints -Wincomplete-record-updates -Wincomplete-uni-patterns -Wunused-type-patterns -threaded -rtsopts
build-depends:
QuickCheck ==2.14.*
, aeson ==2.2.*
@@ -463,7 +463,7 @@ executable xftp-server
Paths_simplexmq
hs-source-dirs:
apps/xftp-server
ghc-options: -Wall -Wcompat -Werror=incomplete-patterns -Wredundant-constraints -Wincomplete-record-updates -Wincomplete-uni-patterns -Wunused-type-patterns -threaded
ghc-options: -Wall -Wcompat -Werror=incomplete-patterns -Wredundant-constraints -Wincomplete-record-updates -Wincomplete-uni-patterns -Wunused-type-patterns -threaded -rtsopts
build-depends:
QuickCheck ==2.14.*
, aeson ==2.2.*

View File

@@ -49,16 +49,19 @@ import qualified Data.ByteString.Char8 as B
import Data.Either (fromRight, partitionEithers)
import Data.Functor (($>))
import Data.Int (Int64)
import Data.List (intercalate)
import Data.List (intercalate, sort)
import qualified Data.List.NonEmpty as L
import qualified Data.Map.Strict as M
import Data.Maybe (isNothing)
import Data.Maybe (fromMaybe, isNothing)
import qualified Data.Text as T
import Data.Text.Encoding (decodeLatin1)
import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime)
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
import Data.Time.Format.ISO8601 (iso8601Show)
import Data.Type.Equality
import GHC.Conc (listThreads, threadStatus)
import GHC.Conc.Sync (threadLabel)
import GHC.Stats (getRTSStats)
import GHC.TypeLits (KnownNat)
import Network.Socket (ServiceName, Socket, socketToHandle)
import Simplex.Messaging.Agent.Lock
@@ -113,8 +116,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
restoreServerMessages
restoreServerStats
raceAny_
( serverThread s subscribedQ subscribers subscriptions cancelSub :
serverThread s ntfSubscribedQ Env.notifiers ntfSubscriptions (\_ -> pure ()) :
( serverThread s "server subscribedQ" subscribedQ subscribers subscriptions cancelSub :
serverThread s "server ntfSubscribedQ" ntfSubscribedQ Env.notifiers ntfSubscriptions (\_ -> pure ()) :
map runServer transports <> expireMessagesThread_ cfg <> serverStatsThread_ cfg <> controlPortThread_ cfg
)
`finally` withLock (savingLock s) "final" (saveServer False)
@@ -130,12 +133,14 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
serverThread ::
forall s.
Server ->
String ->
(Server -> TQueue (QueueId, Client)) ->
(Server -> TMap QueueId Client) ->
(Client -> TMap QueueId s) ->
(s -> IO ()) ->
M ()
serverThread s subQ subs clientSubs unsub = forever $ do
serverThread s label subQ subs clientSubs unsub = forever $ do
labelMyThread label
atomically updateSubscribers
$>>= endPreviousSubscriptions
>>= liftIO . mapM_ unsub
@@ -152,6 +157,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
TM.lookupInsert qId clnt (subs s) $>>= clientToBeNotified
endPreviousSubscriptions :: (QueueId, Client) -> M (Maybe s)
endPreviousSubscriptions (qId, c) = do
labelMyThread $ label <> ".endPreviousSubscriptions"
void . forkIO . atomically $
writeTBQueue (sndQ c) [(CorrId "", qId, END)]
atomically $ TM.lookupDelete qId (clientSubs c)
@@ -165,6 +171,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
ms <- asks msgStore
quota <- asks $ msgQueueQuota . config
let interval = checkInterval expCfg * 1000000
labelMyThread "expireMessages"
forever $ do
liftIO $ threadDelay' interval
old <- liftIO $ expireBeforeEpoch expCfg
@@ -180,6 +187,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
logServerStats :: Int64 -> Int64 -> FilePath -> M ()
logServerStats startAt logInterval statsFilePath = do
labelMyThread "logServerStats"
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath
liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0)
@@ -224,9 +232,10 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
threadDelay' interval
runClient :: Transport c => TProxy c -> c -> M ()
runClient _ h = do
runClient tp h = do
kh <- asks serverIdentity
smpVRange <- asks $ smpServerVRange . config
labelMyThread $ "smp handshake for " <> transportName tp
liftIO (runExceptT $ smpServerHandshake h kh smpVRange) >>= \case
Right th -> runClientTransport th
Left _ -> pure ()
@@ -240,10 +249,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
srv <- asks server
cpStarted <- newEmptyTMVarIO
u <- askUnliftIO
liftIO $ runTCPServer cpStarted port $ runCPClient u srv
liftIO $ do
labelMyThread "control port server"
runTCPServer cpStarted port $ runCPClient u srv
where
runCPClient :: UnliftIO (ReaderT Env IO) -> Server -> Socket -> IO ()
runCPClient u srv sock = do
labelMyThread "control port client"
h <- socketToHandle sock ReadWriteMode
hSetBuffering h LineBuffering
hSetNewlineMode h universalNewlineMode
@@ -259,7 +271,15 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
processCP h = \case
CPSuspend -> hPutStrLn h "suspend not implemented"
CPResume -> hPutStrLn h "resume not implemented"
CPClients -> hPutStrLn h "clients not implemented"
CPClients -> do
Server {subscribers} <- unliftIO u $ asks server
clients <- readTVarIO subscribers
hPutStrLn h $ "Clients: " <> show (length clients)
forM_ (M.toList clients) $ \(cid, Client {sessionId, connected, activeAt, subscriptions}) -> do
hPutStrLn h . B.unpack $ "Client " <> encode cid <> " $" <> encode sessionId
readTVarIO connected >>= hPutStrLn h . (" connected: " <>) . show
readTVarIO activeAt >>= hPutStrLn h . (" activeAt: " <>) . B.unpack . strEncode
readTVarIO subscriptions >>= hPutStrLn h . (" subscriptions: " <>) . show . M.size
CPStats -> do
ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, msgSentNtf, msgRecvNtf, qCount, msgCount} <- unliftIO u $ asks serverStats
putStat "fromTime" fromTime
@@ -275,12 +295,21 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
where
putStat :: Show a => String -> TVar a -> IO ()
putStat label var = readTVarIO var >>= \v -> hPutStrLn h $ label <> ": " <> show v
CPStatsRTS -> getRTSStats >>= hPutStrLn h . show
CPThreads -> do
threads <- liftIO listThreads
hPutStrLn h $ "Threads: " <> show (length threads)
forM_ (sort threads) $ \tid -> do
label <- threadLabel tid
status <- threadStatus tid
hPutStrLn h $ show tid <> " (" <> show status <> ") " <> fromMaybe "" label
CPSave -> withLock (savingLock srv) "control" $ do
hPutStrLn h "saving server state..."
unliftIO u $ saveServer True
hPutStrLn h "server state saved!"
CPHelp -> hPutStrLn h "commands: stats, save, help, quit"
CPHelp -> hPutStrLn h "commands: stats, stats-rts, clients, threads, save, help, quit"
CPQuit -> pure ()
CPSkip -> pure ()
runClientTransport :: Transport c => THandle c -> M ()
runClientTransport th@THandle {thVersion, sessionId} = do
@@ -289,6 +318,7 @@ runClientTransport th@THandle {thVersion, sessionId} = do
c <- atomically $ newClient q thVersion sessionId ts
s <- asks server
expCfg <- asks $ inactiveClientExpiration . config
labelMyThread . B.unpack $ "client $" <> encode c.sessionId
raceAny_ ([liftIO $ send th c, client c s, receive th c] <> disconnectThread_ c expCfg)
`finally` clientDisconnected c
where
@@ -319,12 +349,14 @@ cancelSub sub =
_ -> return ()
receive :: Transport c => THandle c -> Client -> M ()
receive th Client {rcvQ, sndQ, activeAt} = forever $ do
ts <- L.toList <$> liftIO (tGet th)
atomically . writeTVar activeAt =<< liftIO getSystemTime
as <- partitionEithers <$> mapM cmdAction ts
write sndQ $ fst as
write rcvQ $ snd as
receive th Client {rcvQ, sndQ, activeAt, sessionId} = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " receive"
forever $ do
ts <- L.toList <$> liftIO (tGet th)
atomically . writeTVar activeAt =<< liftIO getSystemTime
as <- partitionEithers <$> mapM cmdAction ts
write sndQ $ fst as
write rcvQ $ snd as
where
cmdAction :: SignedTransmission ErrorType Cmd -> M (Either (Transmission BrokerMsg) (Maybe QueueRec, Transmission Cmd))
cmdAction (sig, signed, (corrId, queueId, cmdOrError)) =
@@ -338,10 +370,12 @@ receive th Client {rcvQ, sndQ, activeAt} = forever $ do
write q = mapM_ (atomically . writeTBQueue q) . L.nonEmpty
send :: Transport c => THandle c -> Client -> IO ()
send h@THandle {thVersion = v} Client {sndQ, sessionId, activeAt} = forever $ do
ts <- atomically $ L.sortWith tOrder <$> readTBQueue sndQ
void . liftIO . tPut h Nothing $ L.map ((Nothing,) . encodeTransmission v sessionId) ts
atomically . writeTVar activeAt =<< liftIO getSystemTime
send h@THandle {thVersion = v} Client {sndQ, sessionId, activeAt} = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " send"
forever $ do
ts <- atomically $ L.sortWith tOrder <$> readTBQueue sndQ
void . liftIO . tPut h Nothing $ L.map ((Nothing,) . encodeTransmission v sessionId) ts
atomically . writeTVar activeAt =<< liftIO getSystemTime
where
tOrder :: Transmission BrokerMsg -> Int
tOrder (_, _, cmd) = case cmd of
@@ -350,7 +384,8 @@ send h@THandle {thVersion = v} Client {sndQ, sessionId, activeAt} = forever $ do
_ -> 1
disconnectTransport :: Transport c => THandle c -> client -> (client -> TVar SystemTime) -> ExpirationConfig -> IO ()
disconnectTransport THandle {connection} c activeAt expCfg = do
disconnectTransport THandle {connection, sessionId} c activeAt expCfg = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " disconnectTransport"
let interval = checkInterval expCfg * 1000000
forever . liftIO $ do
threadDelay' interval
@@ -406,7 +441,8 @@ dummyKeyEd448 :: C.PublicKey 'C.Ed448
dummyKeyEd448 = "MEMwBQYDK2VxAzoA6ibQc9XpkSLtwrf7PLvp81qW/etiumckVFImCMRdftcG/XopbOSaq9qyLhrgJWKOLyNrQPNVvpMA"
client :: forall m. (MonadUnliftIO m, MonadReader Env m) => Client -> Server -> m ()
client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscribedQ, ntfSubscribedQ, notifiers} =
client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscribedQ, ntfSubscribedQ, notifiers} = do
labelMyThread . B.unpack $ "client $" <> encode clnt.sessionId <> " commands"
forever $
atomically (readTBQueue rcvQ)
>>= mapM processCommand

View File

@@ -11,9 +11,12 @@ data ControlProtocol
| CPResume
| CPClients
| CPStats
| CPStatsRTS
| CPThreads
| CPSave
| CPHelp
| CPQuit
| CPSkip
instance StrEncoding ControlProtocol where
strEncode = \case
@@ -21,16 +24,22 @@ instance StrEncoding ControlProtocol where
CPResume -> "resume"
CPClients -> "clients"
CPStats -> "stats"
CPStatsRTS -> "stats-rts"
CPThreads -> "threads"
CPSave -> "save"
CPHelp -> "help"
CPQuit -> "quit"
CPSkip -> ""
strP =
A.takeTill (== ' ') >>= \case
"suspend" -> pure CPSuspend
"resume" -> pure CPResume
"clients" -> pure CPClients
"stats" -> pure CPStats
"stats-rts" -> pure CPStatsRTS
"threads" -> pure CPThreads
"save" -> pure CPSave
"help" -> pure CPHelp
"quit" -> pure CPQuit
"" -> pure CPSkip
_ -> fail "bad ControlProtocol command"

View File

@@ -32,7 +32,7 @@ import qualified Network.TLS as T
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport
import Simplex.Messaging.Util (catchAll_, tshow)
import Simplex.Messaging.Util (catchAll_, labelMyThread, tshow)
import System.Exit (exitFailure)
import System.Mem.Weak (Weak, deRefWeak)
import UnliftIO.Concurrent
@@ -63,6 +63,7 @@ runTransportServer :: forall c m. (Transport c, MonadUnliftIO m) => TMVar Bool -
runTransportServer started port serverParams cfg server = do
u <- askUnliftIO
let tCfg = serverTransportConfig cfg
labelMyThread $ "transport server for " <> transportName (TProxy :: TProxy c)
liftIO . runTCPServer started port $ \conn ->
E.bracket
(connectTLS Nothing tCfg serverParams conn >>= getServerConnection tCfg)

View File

@@ -9,6 +9,7 @@ import Control.Monad
import Control.Monad.Except
import Control.Monad.IO.Unlift
import Data.Bifunctor (first)
import qualified Data.ByteString as BW
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Int (Int64)
@@ -19,6 +20,8 @@ import Data.Text (Text)
import qualified Data.Text as T
import Data.Text.Encoding (decodeUtf8With)
import Data.Time (NominalDiffTime)
import GHC.Conc
import Numeric (showHex)
import UnliftIO.Async
import qualified UnliftIO.Exception as UE
@@ -149,3 +152,6 @@ diffToMicroseconds diff = fromIntegral ((truncate $ diff * 1000000) :: Integer)
diffToMilliseconds :: NominalDiffTime -> Int64
diffToMilliseconds diff = fromIntegral ((truncate $ diff * 1000) :: Integer)
labelMyThread :: MonadIO m => String -> m ()
labelMyThread label = liftIO $ myThreadId >>= (`labelThread` label)