mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-26 03:14:53 +00:00
server: log slow operations
This commit is contained in:
@@ -26,7 +26,7 @@ logCfg = LogConfig {lc_file = Nothing, lc_stderr = True}
|
||||
|
||||
main :: IO ()
|
||||
main = do
|
||||
setLogLevel LogInfo
|
||||
setLogLevel LogDebug
|
||||
withGlobalLogging logCfg . protocolServerCLI smpServerCLIConfig $ \cfg@ServerConfig {inactiveClientExpiration} -> do
|
||||
putStrLn $ case inactiveClientExpiration of
|
||||
Just ExpirationConfig {ttl, checkInterval} -> "expiring clients inactive for " <> show ttl <> " seconds every " <> show checkInterval <> " seconds"
|
||||
|
||||
@@ -321,7 +321,7 @@ dummyKeyEd448 :: C.PublicKey 'C.Ed448
|
||||
dummyKeyEd448 = "MEMwBQYDK2VxAzoA6ibQc9XpkSLtwrf7PLvp81qW/etiumckVFImCMRdftcG/XopbOSaq9qyLhrgJWKOLyNrQPNVvpMA"
|
||||
|
||||
client :: forall m. (MonadUnliftIO m, MonadReader Env m) => Client -> Server -> m ()
|
||||
client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscribedQ, ntfSubscribedQ, notifiers} =
|
||||
client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscribedQ, ntfSubscribedQ, notifiers} =
|
||||
forever $
|
||||
atomically (readTBQueue rcvQ)
|
||||
>>= mapM processCommand
|
||||
@@ -432,7 +432,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv
|
||||
okResp <$> atomically (suspendQueue st queueId)
|
||||
|
||||
subscribeQueue :: QueueRec -> RecipientId -> m (Transmission BrokerMsg)
|
||||
subscribeQueue qr rId =
|
||||
subscribeQueue qr rId = timed "subscribe" sessionId rId $ do
|
||||
atomically (TM.lookup rId subscriptions) >>= \case
|
||||
Nothing ->
|
||||
atomically newSub >>= deliver
|
||||
@@ -496,7 +496,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv
|
||||
pure ok
|
||||
|
||||
acknowledgeMsg :: QueueRec -> MsgId -> m (Transmission BrokerMsg)
|
||||
acknowledgeMsg qr msgId = do
|
||||
acknowledgeMsg qr msgId = timed "ack" sessionId queueId $ do
|
||||
atomically (TM.lookup queueId subscriptions) >>= \case
|
||||
Nothing -> pure $ err NO_MSG
|
||||
Just sub ->
|
||||
@@ -540,7 +540,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv
|
||||
ServerConfig {messageExpiration, msgQueueQuota} <- asks config
|
||||
old <- liftIO $ mapM expireBeforeEpoch messageExpiration
|
||||
ntfNonceDrg <- asks idsDrg
|
||||
resp@(_, _, sent) <- atomically $ do
|
||||
resp@(_, _, sent) <- timed "send" sessionId queueId . atomically $ do
|
||||
q <- getMsgQueue ms (recipientId qr) msgQueueQuota
|
||||
mapM_ (deleteExpiredMsgs q) old
|
||||
ifM (isFull q) (pure $ err QUOTA) $ do
|
||||
@@ -578,7 +578,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv
|
||||
pure . (cbNonce,) $ fromRight "" encNMsgMeta
|
||||
|
||||
deliverMessage :: QueueRec -> RecipientId -> TVar Sub -> MsgQueue -> Maybe Message -> m (Transmission BrokerMsg)
|
||||
deliverMessage qr rId sub q msg_ = do
|
||||
deliverMessage qr rId sub q msg_ = timed "deliver" sessionId rId $ do
|
||||
readTVarIO sub >>= \case
|
||||
s@Sub {subThread = NoSub} ->
|
||||
case msg_ of
|
||||
@@ -596,13 +596,14 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv
|
||||
s@Sub {subThread = SubPending} -> s {subThread = SubThread t}
|
||||
s -> s
|
||||
where
|
||||
subscriber = atomically $ do
|
||||
msg <- peekMsg q
|
||||
let encMsg = encryptMsg qr msg
|
||||
writeTBQueue sndQ [(CorrId "", rId, MSG encMsg)]
|
||||
s <- readTVar sub
|
||||
void $ setDelivered s msg
|
||||
writeTVar sub s {subThread = NoSub}
|
||||
subscriber = do
|
||||
msg <- atomically $ peekMsg q
|
||||
timed "subscriber" sessionId rId . atomically $ do
|
||||
let encMsg = encryptMsg qr msg
|
||||
writeTBQueue sndQ [(CorrId "", rId, MSG encMsg)]
|
||||
s <- readTVar sub
|
||||
void $ setDelivered s msg
|
||||
writeTVar sub s {subThread = NoSub}
|
||||
|
||||
encryptMsg :: QueueRec -> Message -> RcvMessage
|
||||
encryptMsg qr Message {msgId, msgTs, msgFlags, msgBody}
|
||||
@@ -648,6 +649,18 @@ withLog action = do
|
||||
env <- ask
|
||||
liftIO . mapM_ action $ storeLog (env :: Env)
|
||||
|
||||
timed :: MonadUnliftIO m => T.Text -> ByteString -> RecipientId -> m a -> m a
|
||||
timed name sessId qId a = do
|
||||
t <- liftIO getSystemTime
|
||||
r <- a
|
||||
t' <- liftIO getSystemTime
|
||||
let int = diff t t'
|
||||
when (int > sec) . logDebug $ T.unwords [name, tshow sessId, tshow qId, tshow int]
|
||||
pure r
|
||||
where
|
||||
diff t t' = (systemSeconds t' - systemSeconds t) * sec + fromIntegral (systemNanoseconds t' - systemNanoseconds t)
|
||||
sec = 1000_000000
|
||||
|
||||
randomId :: (MonadUnliftIO m, MonadReader Env m) => Int -> m ByteString
|
||||
randomId n = do
|
||||
gVar <- asks idsDrg
|
||||
|
||||
+19
-13
@@ -1,6 +1,7 @@
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
|
||||
import AgentTests (agentTests)
|
||||
import Control.Logger.Simple
|
||||
import CoreTests.EncodingTests
|
||||
import CoreTests.ProtocolErrorTests
|
||||
import CoreTests.VersionRangeTests
|
||||
@@ -12,18 +13,23 @@ import System.Directory (createDirectoryIfMissing, removeDirectoryRecursive)
|
||||
import System.Environment (setEnv)
|
||||
import Test.Hspec
|
||||
|
||||
logCfg :: LogConfig
|
||||
logCfg = LogConfig {lc_file = Nothing, lc_stderr = True}
|
||||
|
||||
main :: IO ()
|
||||
main = do
|
||||
createDirectoryIfMissing False "tests/tmp"
|
||||
setEnv "APNS_KEY_ID" "H82WD9K9AQ"
|
||||
setEnv "APNS_KEY_FILE" "./tests/fixtures/AuthKey_H82WD9K9AQ.p8"
|
||||
hspec $ do
|
||||
describe "Core tests" $ do
|
||||
describe "Encoding tests" encodingTests
|
||||
describe "Protocol error tests" protocolErrorTests
|
||||
describe "Version range" versionRangeTests
|
||||
describe "SMP server via TLS" $ serverTests (transport @TLS)
|
||||
describe "SMP server via WebSockets" $ serverTests (transport @WS)
|
||||
describe "Notifications server" $ ntfServerTests (transport @TLS)
|
||||
describe "SMP client agent" $ agentTests (transport @TLS)
|
||||
removeDirectoryRecursive "tests/tmp"
|
||||
setLogLevel LogDebug
|
||||
withGlobalLogging logCfg $ do
|
||||
createDirectoryIfMissing False "tests/tmp"
|
||||
setEnv "APNS_KEY_ID" "H82WD9K9AQ"
|
||||
setEnv "APNS_KEY_FILE" "./tests/fixtures/AuthKey_H82WD9K9AQ.p8"
|
||||
hspec $ do
|
||||
describe "Core tests" $ do
|
||||
describe "Encoding tests" encodingTests
|
||||
describe "Protocol error tests" protocolErrorTests
|
||||
describe "Version range" versionRangeTests
|
||||
fdescribe "SMP server via TLS" $ serverTests (transport @TLS)
|
||||
describe "SMP server via WebSockets" $ serverTests (transport @WS)
|
||||
describe "Notifications server" $ ntfServerTests (transport @TLS)
|
||||
describe "SMP client agent" $ agentTests (transport @TLS)
|
||||
removeDirectoryRecursive "tests/tmp"
|
||||
|
||||
Reference in New Issue
Block a user