From d33cf644f5212e8ee3f9fb261a17e03e13f8b94a Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Thu, 3 Nov 2022 08:22:20 +0000 Subject: [PATCH 1/2] server: split atomically in SEND (#555) --- src/Simplex/Messaging/Server.hs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index bbacfa4d1..22992f610 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -541,14 +541,12 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, ServerConfig {messageExpiration, msgQueueQuota} <- asks config old <- liftIO $ mapM expireBeforeEpoch messageExpiration ntfNonceDrg <- asks idsDrg - resp@(_, _, sent) <- timed "send" sessionId queueId . atomically $ do - q <- getMsgQueue ms (recipientId qr) msgQueueQuota - mapM_ (deleteExpiredMsgs q) old - ifM (isFull q) (pure $ err QUOTA) $ do - when (notification msgFlags) $ trySendNotification msg ntfNonceDrg - writeMsg q msg - pure ok + resp@(_, _, sent) <- timed "send" sessionId queueId $ do + q <- atomically $ getMsgQueue ms (recipientId qr) msgQueueQuota + atomically $ mapM_ (deleteExpiredMsgs q) old + atomically $ ifM (isFull q) (pure $ err QUOTA) (writeMsg q msg $> ok) when (sent == OK) $ do + when (notification msgFlags) . atomically $ trySendNotification msg ntfNonceDrg stats <- asks serverStats atomically $ modifyTVar (msgSent stats) (+ 1) atomically $ updatePeriodStats (activeQueues stats) (recipientId qr) From 29b99d6716abd44296af18896d9c8f0216944469 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Thu, 3 Nov 2022 09:11:53 +0000 Subject: [PATCH 2/2] server: additional logs for slow operations (#556) * server: additional logs for slow operations * more time logs --- src/Simplex/Messaging/Server.hs | 64 ++++++++++++++++++--------------- 1 file changed, 35 insertions(+), 29 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 22992f610..8bdafaf04 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -354,7 +354,7 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, DEL -> delQueueAndMsgs st where createQueue :: QueueStore -> RcvPublicVerifyKey -> RcvPublicDhKey -> m (Transmission BrokerMsg) - createQueue st recipientKey dhKey = do + createQueue st recipientKey dhKey = time "NEW" $ do (rcvPublicDhKey, privDhKey) <- liftIO C.generateKeyPair' let rcvDhSecret = C.dh' dhKey privDhKey qik (rcvId, sndId) = QIK {rcvId, sndId, rcvPublicDhKey} @@ -398,14 +398,14 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, liftM2 (,) (randomId n) (randomId n) secureQueue_ :: QueueStore -> SndPublicVerifyKey -> m (Transmission BrokerMsg) - secureQueue_ st sKey = do + secureQueue_ st sKey = time "KEY" $ do withLog $ \s -> logSecureQueue s queueId sKey stats <- asks serverStats atomically $ modifyTVar (qSecured stats) (+ 1) atomically $ (corrId,queueId,) . either ERR (const OK) <$> secureQueue st queueId sKey addQueueNotifier_ :: QueueStore -> NtfPublicVerifyKey -> RcvNtfPublicDhKey -> m (Transmission BrokerMsg) - addQueueNotifier_ st notifierKey dhKey = do + addQueueNotifier_ st notifierKey dhKey = time "NKEY" $ do (rcvPublicDhKey, privDhKey) <- liftIO C.generateKeyPair' let rcvNtfDhSecret = C.dh' dhKey privDhKey (corrId,queueId,) <$> addNotifierRetry 3 rcvPublicDhKey rcvNtfDhSecret @@ -433,10 +433,10 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, okResp <$> atomically (suspendQueue st queueId) subscribeQueue :: QueueRec -> RecipientId -> m (Transmission BrokerMsg) - subscribeQueue qr rId = timed "subscribe" sessionId rId $ do + subscribeQueue qr rId = do atomically (TM.lookup rId subscriptions) >>= \case Nothing -> - atomically newSub >>= deliver + newSub >>= deliver Just sub -> readTVarIO sub >>= \case Sub {subThread = ProhibitSub} -> @@ -445,20 +445,20 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, s -> atomically (tryTakeTMVar $ delivered s) >> deliver sub where - newSub :: STM (TVar Sub) - newSub = do + newSub :: m (TVar Sub) + newSub = time "SUB newSub" . atomically $ do writeTBQueue subscribedQ (rId, clnt) sub <- newTVar =<< newSubscription NoSub TM.insert rId sub subscriptions pure sub deliver :: TVar Sub -> m (Transmission BrokerMsg) deliver sub = do - q <- getStoreMsgQueue rId + q <- getStoreMsgQueue "SUB" rId msg_ <- atomically $ tryPeekMsg q - deliverMessage qr rId sub q msg_ + deliverMessage "SUB" qr rId sub q msg_ getMessage :: QueueRec -> m (Transmission BrokerMsg) - getMessage qr = + getMessage qr = time "GET" $ do atomically (TM.lookup queueId subscriptions) >>= \case Nothing -> atomically newSub >>= getMessage_ @@ -478,7 +478,7 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, pure s getMessage_ :: Sub -> m (Transmission BrokerMsg) getMessage_ s = do - q <- getStoreMsgQueue queueId + q <- getStoreMsgQueue "GET" queueId atomically $ tryPeekMsg q >>= \case Just msg -> @@ -490,20 +490,20 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, withQueue action = maybe (pure $ err AUTH) action qr_ subscribeNotifications :: m (Transmission BrokerMsg) - subscribeNotifications = atomically $ do + subscribeNotifications = time "NSUB" . atomically $ do unlessM (TM.member queueId ntfSubscriptions) $ do writeTBQueue ntfSubscribedQ (queueId, clnt) TM.insert queueId () ntfSubscriptions pure ok acknowledgeMsg :: QueueRec -> MsgId -> m (Transmission BrokerMsg) - acknowledgeMsg qr msgId = timed "ack" sessionId queueId $ do + acknowledgeMsg qr msgId = time "ACK" $ do atomically (TM.lookup queueId subscriptions) >>= \case Nothing -> pure $ err NO_MSG Just sub -> atomically (getDelivered sub) >>= \case Just s -> do - q <- getStoreMsgQueue queueId + q <- getStoreMsgQueue "ACK" queueId case s of Sub {subThread = ProhibitSub} -> do msgDeleted <- atomically $ tryDelMsg q msgId @@ -512,7 +512,7 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, _ -> do (msgDeleted, msg_) <- atomically $ tryDelPeekMsg q msgId when msgDeleted updateStats - deliverMessage qr queueId sub q msg_ + deliverMessage "ACK" qr queueId sub q msg_ _ -> pure $ err NO_MSG where getDelivered :: TVar Sub -> STM (Maybe Sub) @@ -537,16 +537,13 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, mapM mkMessage (C.maxLenBS msgBody) >>= \case Left _ -> pure $ err LARGE_MSG Right msg -> do - ms <- asks msgStore - ServerConfig {messageExpiration, msgQueueQuota} <- asks config - old <- liftIO $ mapM expireBeforeEpoch messageExpiration - ntfNonceDrg <- asks idsDrg - resp@(_, _, sent) <- timed "send" sessionId queueId $ do - q <- atomically $ getMsgQueue ms (recipientId qr) msgQueueQuota - atomically $ mapM_ (deleteExpiredMsgs q) old + resp@(_, _, sent) <- time "SEND" $ do + q <- getStoreMsgQueue "SEND" $ recipientId qr + expireMessages q atomically $ ifM (isFull q) (pure $ err QUOTA) (writeMsg q msg $> ok) - when (sent == OK) $ do - when (notification msgFlags) . atomically $ trySendNotification msg ntfNonceDrg + when (sent == OK) . time "SEND ok" $ do + when (notification msgFlags) $ + atomically . trySendNotification msg =<< asks idsDrg stats <- asks serverStats atomically $ modifyTVar (msgSent stats) (+ 1) atomically $ updatePeriodStats (activeQueues stats) (recipientId qr) @@ -558,6 +555,12 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, msgTs <- liftIO getSystemTime pure $ Message msgId msgTs msgFlags body + expireMessages :: MsgQueue -> m () + expireMessages q = do + msgExp <- asks $ messageExpiration . config + old <- liftIO $ mapM expireBeforeEpoch msgExp + atomically $ mapM_ (deleteExpiredMsgs q) old + trySendNotification :: Message -> TVar ChaChaDRG -> STM () trySendNotification msg ntfNonceDrg = forM_ (notifier qr) $ \NtfCreds {notifierId, rcvNtfDhSecret} -> @@ -576,8 +579,8 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, encNMsgMeta = C.cbEncrypt rcvNtfDhSecret cbNonce (smpEncode msgMeta) 128 pure . (cbNonce,) $ fromRight "" encNMsgMeta - deliverMessage :: QueueRec -> RecipientId -> TVar Sub -> MsgQueue -> Maybe Message -> m (Transmission BrokerMsg) - deliverMessage qr rId sub q msg_ = timed "deliver" sessionId rId $ do + deliverMessage :: T.Text -> QueueRec -> RecipientId -> TVar Sub -> MsgQueue -> Maybe Message -> m (Transmission BrokerMsg) + deliverMessage name qr rId sub q msg_ = time (name <> " deliver") $ do readTVarIO sub >>= \case s@Sub {subThread = NoSub} -> case msg_ of @@ -597,13 +600,16 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, where subscriber = do msg <- atomically $ peekMsg q - timed "subscriber" sessionId rId . atomically $ do + time "subscriber" . atomically $ do let encMsg = encryptMsg qr msg writeTBQueue sndQ [(CorrId "", rId, MSG encMsg)] s <- readTVar sub void $ setDelivered s msg writeTVar sub s {subThread = NoSub} + time :: T.Text -> m a -> m a + time name = timed name sessionId queueId + encryptMsg :: QueueRec -> Message -> RcvMessage encryptMsg qr Message {msgId, msgTs, msgFlags, msgBody} | thVersion == 1 || thVersion == 2 = encrypt msgBody @@ -617,8 +623,8 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, setDelivered :: Sub -> Message -> STM Bool setDelivered s Message {msgId} = tryPutTMVar (delivered s) msgId - getStoreMsgQueue :: RecipientId -> m MsgQueue - getStoreMsgQueue rId = do + getStoreMsgQueue :: T.Text -> RecipientId -> m MsgQueue + getStoreMsgQueue name rId = time (name <> " getMsgQueue") $ do ms <- asks msgStore quota <- asks $ msgQueueQuota . config atomically $ getMsgQueue ms rId quota