mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 14:16:00 +00:00
server: time/detect blocked message delivery actions (#546)
* server: log slow operations * v3.3.1 * base-64 encode IDs * fixing test * log * log * revert * log * fix * reduce timeout Co-authored-by: JRoberts <8711996+jr-simplex@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
029fc6e781
commit
6fc3b26970
@@ -26,7 +26,7 @@ logCfg = LogConfig {lc_file = Nothing, lc_stderr = True}
|
||||
|
||||
main :: IO ()
|
||||
main = do
|
||||
setLogLevel LogInfo
|
||||
setLogLevel LogDebug
|
||||
withGlobalLogging logCfg . protocolServerCLI smpServerCLIConfig $ \cfg@ServerConfig {inactiveClientExpiration} -> do
|
||||
putStrLn $ case inactiveClientExpiration of
|
||||
Just ExpirationConfig {ttl, checkInterval} -> "expiring clients inactive for " <> show ttl <> " seconds every " <> show checkInterval <> " seconds"
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
name: simplexmq
|
||||
version: 3.3.0
|
||||
version: 3.3.1
|
||||
synopsis: SimpleXMQ message broker
|
||||
description: |
|
||||
This package includes <./docs/Simplex-Messaging-Server.html server>,
|
||||
|
||||
@@ -5,7 +5,7 @@ cabal-version: 1.12
|
||||
-- see: https://github.com/sol/hpack
|
||||
|
||||
name: simplexmq
|
||||
version: 3.3.0
|
||||
version: 3.3.1
|
||||
synopsis: SimpleXMQ message broker
|
||||
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
|
||||
<./docs/Simplex-Messaging-Client.html client> and
|
||||
|
||||
@@ -205,7 +205,6 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge
|
||||
incNtfStat ntfReceived
|
||||
SMP.END -> updateSubStatus smpQueue NSEnd
|
||||
_ -> pure ()
|
||||
pure ()
|
||||
|
||||
receiveAgent =
|
||||
forever $
|
||||
|
||||
@@ -41,6 +41,7 @@ import Control.Monad.IO.Unlift
|
||||
import Control.Monad.Reader
|
||||
import Crypto.Random
|
||||
import Data.Bifunctor (first)
|
||||
import Data.ByteString.Base64 (encode)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Either (fromRight, partitionEithers)
|
||||
@@ -321,7 +322,7 @@ dummyKeyEd448 :: C.PublicKey 'C.Ed448
|
||||
dummyKeyEd448 = "MEMwBQYDK2VxAzoA6ibQc9XpkSLtwrf7PLvp81qW/etiumckVFImCMRdftcG/XopbOSaq9qyLhrgJWKOLyNrQPNVvpMA"
|
||||
|
||||
client :: forall m. (MonadUnliftIO m, MonadReader Env m) => Client -> Server -> m ()
|
||||
client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscribedQ, ntfSubscribedQ, notifiers} =
|
||||
client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscribedQ, ntfSubscribedQ, notifiers} =
|
||||
forever $
|
||||
atomically (readTBQueue rcvQ)
|
||||
>>= mapM processCommand
|
||||
@@ -432,7 +433,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv
|
||||
okResp <$> atomically (suspendQueue st queueId)
|
||||
|
||||
subscribeQueue :: QueueRec -> RecipientId -> m (Transmission BrokerMsg)
|
||||
subscribeQueue qr rId =
|
||||
subscribeQueue qr rId = timed "subscribe" sessionId rId $ do
|
||||
atomically (TM.lookup rId subscriptions) >>= \case
|
||||
Nothing ->
|
||||
atomically newSub >>= deliver
|
||||
@@ -496,7 +497,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv
|
||||
pure ok
|
||||
|
||||
acknowledgeMsg :: QueueRec -> MsgId -> m (Transmission BrokerMsg)
|
||||
acknowledgeMsg qr msgId = do
|
||||
acknowledgeMsg qr msgId = timed "ack" sessionId queueId $ do
|
||||
atomically (TM.lookup queueId subscriptions) >>= \case
|
||||
Nothing -> pure $ err NO_MSG
|
||||
Just sub ->
|
||||
@@ -540,7 +541,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv
|
||||
ServerConfig {messageExpiration, msgQueueQuota} <- asks config
|
||||
old <- liftIO $ mapM expireBeforeEpoch messageExpiration
|
||||
ntfNonceDrg <- asks idsDrg
|
||||
resp@(_, _, sent) <- atomically $ do
|
||||
resp@(_, _, sent) <- timed "send" sessionId queueId . atomically $ do
|
||||
q <- getMsgQueue ms (recipientId qr) msgQueueQuota
|
||||
mapM_ (deleteExpiredMsgs q) old
|
||||
ifM (isFull q) (pure $ err QUOTA) $ do
|
||||
@@ -566,7 +567,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv
|
||||
|
||||
writeNtf :: NotifierId -> Message -> RcvNtfDhSecret -> TVar ChaChaDRG -> Client -> STM ()
|
||||
writeNtf nId msg rcvNtfDhSecret ntfNonceDrg Client {sndQ = q} =
|
||||
unlessM (isFullTBQueue sndQ) $ do
|
||||
unlessM (isFullTBQueue q) $ do
|
||||
(nmsgNonce, encNMsgMeta) <- mkMessageNotification msg rcvNtfDhSecret ntfNonceDrg
|
||||
writeTBQueue q [(CorrId "", nId, NMSG nmsgNonce encNMsgMeta)]
|
||||
|
||||
@@ -578,7 +579,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv
|
||||
pure . (cbNonce,) $ fromRight "" encNMsgMeta
|
||||
|
||||
deliverMessage :: QueueRec -> RecipientId -> TVar Sub -> MsgQueue -> Maybe Message -> m (Transmission BrokerMsg)
|
||||
deliverMessage qr rId sub q msg_ = do
|
||||
deliverMessage qr rId sub q msg_ = timed "deliver" sessionId rId $ do
|
||||
readTVarIO sub >>= \case
|
||||
s@Sub {subThread = NoSub} ->
|
||||
case msg_ of
|
||||
@@ -596,13 +597,14 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv
|
||||
s@Sub {subThread = SubPending} -> s {subThread = SubThread t}
|
||||
s -> s
|
||||
where
|
||||
subscriber = atomically $ do
|
||||
msg <- peekMsg q
|
||||
let encMsg = encryptMsg qr msg
|
||||
writeTBQueue sndQ [(CorrId "", rId, MSG encMsg)]
|
||||
s <- readTVar sub
|
||||
void $ setDelivered s msg
|
||||
writeTVar sub s {subThread = NoSub}
|
||||
subscriber = do
|
||||
msg <- atomically $ peekMsg q
|
||||
timed "subscriber" sessionId rId . atomically $ do
|
||||
let encMsg = encryptMsg qr msg
|
||||
writeTBQueue sndQ [(CorrId "", rId, MSG encMsg)]
|
||||
s <- readTVar sub
|
||||
void $ setDelivered s msg
|
||||
writeTVar sub s {subThread = NoSub}
|
||||
|
||||
encryptMsg :: QueueRec -> Message -> RcvMessage
|
||||
encryptMsg qr Message {msgId, msgTs, msgFlags, msgBody}
|
||||
@@ -648,6 +650,18 @@ withLog action = do
|
||||
env <- ask
|
||||
liftIO . mapM_ action $ storeLog (env :: Env)
|
||||
|
||||
timed :: MonadUnliftIO m => T.Text -> ByteString -> RecipientId -> m a -> m a
|
||||
timed name sessId qId a = do
|
||||
t <- liftIO getSystemTime
|
||||
r <- a
|
||||
t' <- liftIO getSystemTime
|
||||
let int = diff t t'
|
||||
when (int > sec) . logDebug $ T.unwords [name, tshow $ encode sessId, tshow $ encode qId, tshow int]
|
||||
pure r
|
||||
where
|
||||
diff t t' = (systemSeconds t' - systemSeconds t) * sec + fromIntegral (systemNanoseconds t' - systemNanoseconds t)
|
||||
sec = 1000_000000
|
||||
|
||||
randomId :: (MonadUnliftIO m, MonadReader Env m) => Int -> m ByteString
|
||||
randomId n = do
|
||||
gVar <- asks idsDrg
|
||||
|
||||
@@ -99,7 +99,7 @@ supportedSMPServerVRange :: VersionRange
|
||||
supportedSMPServerVRange = mkVersionRange 1 4
|
||||
|
||||
simplexMQVersion :: String
|
||||
simplexMQVersion = "3.3.0"
|
||||
simplexMQVersion = "3.3.1"
|
||||
|
||||
-- * Transport connection class
|
||||
|
||||
|
||||
@@ -270,9 +270,9 @@ testNotificationSubscriptionNewConnection APNSMockServer {apnsQ} = do
|
||||
bob <- getSMPAgentClient agentCfg {database = testDB2} initAgentServers
|
||||
Right () <- runExceptT $ do
|
||||
-- alice registers notification token
|
||||
_ <- registerTestToken alice "abcd" NMInstant apnsQ
|
||||
DeviceToken {} <- registerTestToken alice "abcd" NMInstant apnsQ
|
||||
-- bob registers notification token
|
||||
_ <- registerTestToken bob "bcde" NMInstant apnsQ
|
||||
DeviceToken {} <- registerTestToken bob "bcde" NMInstant apnsQ
|
||||
-- establish connection
|
||||
liftIO $ threadDelay 50000
|
||||
(bobId, qInfo) <- createConnection alice True SCMInvitation Nothing
|
||||
@@ -512,7 +512,7 @@ testSwitchNotifications servers APNSMockServer {apnsQ} = do
|
||||
|
||||
messageNotification :: TBQueue APNSMockRequest -> ExceptT AgentErrorType IO (C.CbNonce, ByteString)
|
||||
messageNotification apnsQ = do
|
||||
1000000 `timeout` atomically (readTBQueue apnsQ) >>= \case
|
||||
750000 `timeout` atomically (readTBQueue apnsQ) >>= \case
|
||||
Nothing -> error "no notification"
|
||||
Just APNSMockRequest {notification = APNSNotification {aps = APNSMutableContent {}, notificationData = Just ntfData}, sendApnsResponse} -> do
|
||||
nonce <- C.cbNonce <$> ntfData .-> "nonce"
|
||||
|
||||
Reference in New Issue
Block a user