diff --git a/apps/smp-server/Main.hs b/apps/smp-server/Main.hs index 7e7438867..51e208cb7 100644 --- a/apps/smp-server/Main.hs +++ b/apps/smp-server/Main.hs @@ -26,7 +26,7 @@ logCfg = LogConfig {lc_file = Nothing, lc_stderr = True} main :: IO () main = do - setLogLevel LogInfo + setLogLevel LogDebug withGlobalLogging logCfg . protocolServerCLI smpServerCLIConfig $ \cfg@ServerConfig {inactiveClientExpiration} -> do putStrLn $ case inactiveClientExpiration of Just ExpirationConfig {ttl, checkInterval} -> "expiring clients inactive for " <> show ttl <> " seconds every " <> show checkInterval <> " seconds" diff --git a/package.yaml b/package.yaml index b68489751..5727ad9c0 100644 --- a/package.yaml +++ b/package.yaml @@ -1,5 +1,5 @@ name: simplexmq -version: 3.3.0 +version: 3.3.1 synopsis: SimpleXMQ message broker description: | This package includes <./docs/Simplex-Messaging-Server.html server>, diff --git a/simplexmq.cabal b/simplexmq.cabal index c371fbd23..6cea0f677 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -5,7 +5,7 @@ cabal-version: 1.12 -- see: https://github.com/sol/hpack name: simplexmq -version: 3.3.0 +version: 3.3.1 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index 063aa8236..ae3fd1772 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -205,7 +205,6 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge incNtfStat ntfReceived SMP.END -> updateSubStatus smpQueue NSEnd _ -> pure () - pure () receiveAgent = forever $ diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index e64240dcd..bbacfa4d1 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -41,6 +41,7 @@ import Control.Monad.IO.Unlift import Control.Monad.Reader import Crypto.Random import Data.Bifunctor (first) +import Data.ByteString.Base64 (encode) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Either (fromRight, partitionEithers) @@ -321,7 +322,7 @@ dummyKeyEd448 :: C.PublicKey 'C.Ed448 dummyKeyEd448 = "MEMwBQYDK2VxAzoA6ibQc9XpkSLtwrf7PLvp81qW/etiumckVFImCMRdftcG/XopbOSaq9qyLhrgJWKOLyNrQPNVvpMA" client :: forall m. (MonadUnliftIO m, MonadReader Env m) => Client -> Server -> m () -client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscribedQ, ntfSubscribedQ, notifiers} = +client clnt@Client {thVersion, sessionId, subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscribedQ, ntfSubscribedQ, notifiers} = forever $ atomically (readTBQueue rcvQ) >>= mapM processCommand @@ -432,7 +433,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv okResp <$> atomically (suspendQueue st queueId) subscribeQueue :: QueueRec -> RecipientId -> m (Transmission BrokerMsg) - subscribeQueue qr rId = + subscribeQueue qr rId = timed "subscribe" sessionId rId $ do atomically (TM.lookup rId subscriptions) >>= \case Nothing -> atomically newSub >>= deliver @@ -496,7 +497,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv pure ok acknowledgeMsg :: QueueRec -> MsgId -> m (Transmission BrokerMsg) - acknowledgeMsg qr msgId = do + acknowledgeMsg qr msgId = timed "ack" sessionId queueId $ do atomically (TM.lookup queueId subscriptions) >>= \case Nothing -> pure $ err NO_MSG Just sub -> @@ -540,7 +541,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv ServerConfig {messageExpiration, msgQueueQuota} <- asks config old <- liftIO $ mapM expireBeforeEpoch messageExpiration ntfNonceDrg <- asks idsDrg - resp@(_, _, sent) <- atomically $ do + resp@(_, _, sent) <- timed "send" sessionId queueId . atomically $ do q <- getMsgQueue ms (recipientId qr) msgQueueQuota mapM_ (deleteExpiredMsgs q) old ifM (isFull q) (pure $ err QUOTA) $ do @@ -566,7 +567,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv writeNtf :: NotifierId -> Message -> RcvNtfDhSecret -> TVar ChaChaDRG -> Client -> STM () writeNtf nId msg rcvNtfDhSecret ntfNonceDrg Client {sndQ = q} = - unlessM (isFullTBQueue sndQ) $ do + unlessM (isFullTBQueue q) $ do (nmsgNonce, encNMsgMeta) <- mkMessageNotification msg rcvNtfDhSecret ntfNonceDrg writeTBQueue q [(CorrId "", nId, NMSG nmsgNonce encNMsgMeta)] @@ -578,7 +579,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv pure . (cbNonce,) $ fromRight "" encNMsgMeta deliverMessage :: QueueRec -> RecipientId -> TVar Sub -> MsgQueue -> Maybe Message -> m (Transmission BrokerMsg) - deliverMessage qr rId sub q msg_ = do + deliverMessage qr rId sub q msg_ = timed "deliver" sessionId rId $ do readTVarIO sub >>= \case s@Sub {subThread = NoSub} -> case msg_ of @@ -596,13 +597,14 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv s@Sub {subThread = SubPending} -> s {subThread = SubThread t} s -> s where - subscriber = atomically $ do - msg <- peekMsg q - let encMsg = encryptMsg qr msg - writeTBQueue sndQ [(CorrId "", rId, MSG encMsg)] - s <- readTVar sub - void $ setDelivered s msg - writeTVar sub s {subThread = NoSub} + subscriber = do + msg <- atomically $ peekMsg q + timed "subscriber" sessionId rId . atomically $ do + let encMsg = encryptMsg qr msg + writeTBQueue sndQ [(CorrId "", rId, MSG encMsg)] + s <- readTVar sub + void $ setDelivered s msg + writeTVar sub s {subThread = NoSub} encryptMsg :: QueueRec -> Message -> RcvMessage encryptMsg qr Message {msgId, msgTs, msgFlags, msgBody} @@ -648,6 +650,18 @@ withLog action = do env <- ask liftIO . mapM_ action $ storeLog (env :: Env) +timed :: MonadUnliftIO m => T.Text -> ByteString -> RecipientId -> m a -> m a +timed name sessId qId a = do + t <- liftIO getSystemTime + r <- a + t' <- liftIO getSystemTime + let int = diff t t' + when (int > sec) . logDebug $ T.unwords [name, tshow $ encode sessId, tshow $ encode qId, tshow int] + pure r + where + diff t t' = (systemSeconds t' - systemSeconds t) * sec + fromIntegral (systemNanoseconds t' - systemNanoseconds t) + sec = 1000_000000 + randomId :: (MonadUnliftIO m, MonadReader Env m) => Int -> m ByteString randomId n = do gVar <- asks idsDrg diff --git a/src/Simplex/Messaging/Transport.hs b/src/Simplex/Messaging/Transport.hs index a012c5c9a..f95979208 100644 --- a/src/Simplex/Messaging/Transport.hs +++ b/src/Simplex/Messaging/Transport.hs @@ -99,7 +99,7 @@ supportedSMPServerVRange :: VersionRange supportedSMPServerVRange = mkVersionRange 1 4 simplexMQVersion :: String -simplexMQVersion = "3.3.0" +simplexMQVersion = "3.3.1" -- * Transport connection class diff --git a/tests/AgentTests/NotificationTests.hs b/tests/AgentTests/NotificationTests.hs index 74ca9bcc9..d340a3522 100644 --- a/tests/AgentTests/NotificationTests.hs +++ b/tests/AgentTests/NotificationTests.hs @@ -270,9 +270,9 @@ testNotificationSubscriptionNewConnection APNSMockServer {apnsQ} = do bob <- getSMPAgentClient agentCfg {database = testDB2} initAgentServers Right () <- runExceptT $ do -- alice registers notification token - _ <- registerTestToken alice "abcd" NMInstant apnsQ + DeviceToken {} <- registerTestToken alice "abcd" NMInstant apnsQ -- bob registers notification token - _ <- registerTestToken bob "bcde" NMInstant apnsQ + DeviceToken {} <- registerTestToken bob "bcde" NMInstant apnsQ -- establish connection liftIO $ threadDelay 50000 (bobId, qInfo) <- createConnection alice True SCMInvitation Nothing @@ -512,7 +512,7 @@ testSwitchNotifications servers APNSMockServer {apnsQ} = do messageNotification :: TBQueue APNSMockRequest -> ExceptT AgentErrorType IO (C.CbNonce, ByteString) messageNotification apnsQ = do - 1000000 `timeout` atomically (readTBQueue apnsQ) >>= \case + 750000 `timeout` atomically (readTBQueue apnsQ) >>= \case Nothing -> error "no notification" Just APNSMockRequest {notification = APNSNotification {aps = APNSMutableContent {}, notificationData = Just ntfData}, sendApnsResponse} -> do nonce <- C.cbNonce <$> ntfData .-> "nonce"