diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index bbacfa4d1..8bdafaf04 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -354,7 +354,7 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, DEL -> delQueueAndMsgs st where createQueue :: QueueStore -> RcvPublicVerifyKey -> RcvPublicDhKey -> m (Transmission BrokerMsg) - createQueue st recipientKey dhKey = do + createQueue st recipientKey dhKey = time "NEW" $ do (rcvPublicDhKey, privDhKey) <- liftIO C.generateKeyPair' let rcvDhSecret = C.dh' dhKey privDhKey qik (rcvId, sndId) = QIK {rcvId, sndId, rcvPublicDhKey} @@ -398,14 +398,14 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, liftM2 (,) (randomId n) (randomId n) secureQueue_ :: QueueStore -> SndPublicVerifyKey -> m (Transmission BrokerMsg) - secureQueue_ st sKey = do + secureQueue_ st sKey = time "KEY" $ do withLog $ \s -> logSecureQueue s queueId sKey stats <- asks serverStats atomically $ modifyTVar (qSecured stats) (+ 1) atomically $ (corrId,queueId,) . either ERR (const OK) <$> secureQueue st queueId sKey addQueueNotifier_ :: QueueStore -> NtfPublicVerifyKey -> RcvNtfPublicDhKey -> m (Transmission BrokerMsg) - addQueueNotifier_ st notifierKey dhKey = do + addQueueNotifier_ st notifierKey dhKey = time "NKEY" $ do (rcvPublicDhKey, privDhKey) <- liftIO C.generateKeyPair' let rcvNtfDhSecret = C.dh' dhKey privDhKey (corrId,queueId,) <$> addNotifierRetry 3 rcvPublicDhKey rcvNtfDhSecret @@ -433,10 +433,10 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, okResp <$> atomically (suspendQueue st queueId) subscribeQueue :: QueueRec -> RecipientId -> m (Transmission BrokerMsg) - subscribeQueue qr rId = timed "subscribe" sessionId rId $ do + subscribeQueue qr rId = do atomically (TM.lookup rId subscriptions) >>= \case Nothing -> - atomically newSub >>= deliver + newSub >>= deliver Just sub -> readTVarIO sub >>= \case Sub {subThread = ProhibitSub} -> @@ -445,20 +445,20 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, s -> atomically (tryTakeTMVar $ delivered s) >> deliver sub where - newSub :: STM (TVar Sub) - newSub = do + newSub :: m (TVar Sub) + newSub = time "SUB newSub" . atomically $ do writeTBQueue subscribedQ (rId, clnt) sub <- newTVar =<< newSubscription NoSub TM.insert rId sub subscriptions pure sub deliver :: TVar Sub -> m (Transmission BrokerMsg) deliver sub = do - q <- getStoreMsgQueue rId + q <- getStoreMsgQueue "SUB" rId msg_ <- atomically $ tryPeekMsg q - deliverMessage qr rId sub q msg_ + deliverMessage "SUB" qr rId sub q msg_ getMessage :: QueueRec -> m (Transmission BrokerMsg) - getMessage qr = + getMessage qr = time "GET" $ do atomically (TM.lookup queueId subscriptions) >>= \case Nothing -> atomically newSub >>= getMessage_ @@ -478,7 +478,7 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, pure s getMessage_ :: Sub -> m (Transmission BrokerMsg) getMessage_ s = do - q <- getStoreMsgQueue queueId + q <- getStoreMsgQueue "GET" queueId atomically $ tryPeekMsg q >>= \case Just msg -> @@ -490,20 +490,20 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, withQueue action = maybe (pure $ err AUTH) action qr_ subscribeNotifications :: m (Transmission BrokerMsg) - subscribeNotifications = atomically $ do + subscribeNotifications = time "NSUB" . atomically $ do unlessM (TM.member queueId ntfSubscriptions) $ do writeTBQueue ntfSubscribedQ (queueId, clnt) TM.insert queueId () ntfSubscriptions pure ok acknowledgeMsg :: QueueRec -> MsgId -> m (Transmission BrokerMsg) - acknowledgeMsg qr msgId = timed "ack" sessionId queueId $ do + acknowledgeMsg qr msgId = time "ACK" $ do atomically (TM.lookup queueId subscriptions) >>= \case Nothing -> pure $ err NO_MSG Just sub -> atomically (getDelivered sub) >>= \case Just s -> do - q <- getStoreMsgQueue queueId + q <- getStoreMsgQueue "ACK" queueId case s of Sub {subThread = ProhibitSub} -> do msgDeleted <- atomically $ tryDelMsg q msgId @@ -512,7 +512,7 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, _ -> do (msgDeleted, msg_) <- atomically $ tryDelPeekMsg q msgId when msgDeleted updateStats - deliverMessage qr queueId sub q msg_ + deliverMessage "ACK" qr queueId sub q msg_ _ -> pure $ err NO_MSG where getDelivered :: TVar Sub -> STM (Maybe Sub) @@ -537,18 +537,13 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, mapM mkMessage (C.maxLenBS msgBody) >>= \case Left _ -> pure $ err LARGE_MSG Right msg -> do - ms <- asks msgStore - ServerConfig {messageExpiration, msgQueueQuota} <- asks config - old <- liftIO $ mapM expireBeforeEpoch messageExpiration - ntfNonceDrg <- asks idsDrg - resp@(_, _, sent) <- timed "send" sessionId queueId . atomically $ do - q <- getMsgQueue ms (recipientId qr) msgQueueQuota - mapM_ (deleteExpiredMsgs q) old - ifM (isFull q) (pure $ err QUOTA) $ do - when (notification msgFlags) $ trySendNotification msg ntfNonceDrg - writeMsg q msg - pure ok - when (sent == OK) $ do + resp@(_, _, sent) <- time "SEND" $ do + q <- getStoreMsgQueue "SEND" $ recipientId qr + expireMessages q + atomically $ ifM (isFull q) (pure $ err QUOTA) (writeMsg q msg $> ok) + when (sent == OK) . time "SEND ok" $ do + when (notification msgFlags) $ + atomically . trySendNotification msg =<< asks idsDrg stats <- asks serverStats atomically $ modifyTVar (msgSent stats) (+ 1) atomically $ updatePeriodStats (activeQueues stats) (recipientId qr) @@ -560,6 +555,12 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, msgTs <- liftIO getSystemTime pure $ Message msgId msgTs msgFlags body + expireMessages :: MsgQueue -> m () + expireMessages q = do + msgExp <- asks $ messageExpiration . config + old <- liftIO $ mapM expireBeforeEpoch msgExp + atomically $ mapM_ (deleteExpiredMsgs q) old + trySendNotification :: Message -> TVar ChaChaDRG -> STM () trySendNotification msg ntfNonceDrg = forM_ (notifier qr) $ \NtfCreds {notifierId, rcvNtfDhSecret} -> @@ -578,8 +579,8 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, encNMsgMeta = C.cbEncrypt rcvNtfDhSecret cbNonce (smpEncode msgMeta) 128 pure . (cbNonce,) $ fromRight "" encNMsgMeta - deliverMessage :: QueueRec -> RecipientId -> TVar Sub -> MsgQueue -> Maybe Message -> m (Transmission BrokerMsg) - deliverMessage qr rId sub q msg_ = timed "deliver" sessionId rId $ do + deliverMessage :: T.Text -> QueueRec -> RecipientId -> TVar Sub -> MsgQueue -> Maybe Message -> m (Transmission BrokerMsg) + deliverMessage name qr rId sub q msg_ = time (name <> " deliver") $ do readTVarIO sub >>= \case s@Sub {subThread = NoSub} -> case msg_ of @@ -599,13 +600,16 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, where subscriber = do msg <- atomically $ peekMsg q - timed "subscriber" sessionId rId . atomically $ do + time "subscriber" . atomically $ do let encMsg = encryptMsg qr msg writeTBQueue sndQ [(CorrId "", rId, MSG encMsg)] s <- readTVar sub void $ setDelivered s msg writeTVar sub s {subThread = NoSub} + time :: T.Text -> m a -> m a + time name = timed name sessionId queueId + encryptMsg :: QueueRec -> Message -> RcvMessage encryptMsg qr Message {msgId, msgTs, msgFlags, msgBody} | thVersion == 1 || thVersion == 2 = encrypt msgBody @@ -619,8 +623,8 @@ client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, setDelivered :: Sub -> Message -> STM Bool setDelivered s Message {msgId} = tryPutTMVar (delivered s) msgId - getStoreMsgQueue :: RecipientId -> m MsgQueue - getStoreMsgQueue rId = do + getStoreMsgQueue :: T.Text -> RecipientId -> m MsgQueue + getStoreMsgQueue name rId = time (name <> " getMsgQueue") $ do ms <- asks msgStore quota <- asks $ msgQueueQuota . config atomically $ getMsgQueue ms rId quota