From 14c9e581a1a28967ad4d0ec1bce09cf71e9eed93 Mon Sep 17 00:00:00 2001 From: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> Date: Thu, 26 Sep 2024 17:57:20 +0400 Subject: [PATCH] ntf: measure batch size --- src/Simplex/Messaging/Client.hs | 33 +++++++++++-------- src/Simplex/Messaging/Notifications/Client.hs | 2 +- tests/AgentTests.hs | 2 +- tests/AgentTests/NotificationTests.hs | 4 +-- 4 files changed, 24 insertions(+), 17 deletions(-) diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index 1e7163161..ca0f22591 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -117,12 +117,12 @@ import Control.Monad.Trans.Except import Crypto.Random (ChaChaDRG) import qualified Data.Aeson.TH as J import qualified Data.Attoparsec.ByteString.Char8 as A +import qualified Data.ByteString.Base64 as B64 import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B -import qualified Data.ByteString.Base64 as B64 import Data.Functor (($>)) import Data.Int (Int64) -import Data.List (find) +import Data.List (find, intercalate) import Data.List.NonEmpty (NonEmpty (..)) import qualified Data.List.NonEmpty as L import Data.Maybe (catMaybes, fromMaybe) @@ -383,12 +383,13 @@ clientSocksCredentials NetworkConfig {socksProxy, sessionMode} proxySessTs (user Nothing -> Nothing where sessionUsername = - B64.encode $ C.sha256Hash $ - bshow userId <> case sessionMode of - TSMUser -> "" - TSMSession -> ":" <> bshow proxySessTs - TSMServer -> ":" <> bshow proxySessTs <> "@" <> strEncode srv - TSMEntity -> ":" <> bshow proxySessTs <> "@" <> strEncode srv <> maybe "" ("/" <>) entityId_ + B64.encode $ + C.sha256Hash $ + bshow userId <> case sessionMode of + TSMUser -> "" + TSMSession -> ":" <> bshow proxySessTs + TSMServer -> ":" <> bshow proxySessTs <> "@" <> strEncode srv + TSMEntity -> ":" <> bshow proxySessTs <> "@" <> strEncode srv <> maybe "" ("/" <>) entityId_ -- | protocol client configuration. data ProtocolClientConfig v = ProtocolClientConfig @@ -715,7 +716,7 @@ subscribeSMPQueue c@ProtocolClient {client_ = PClient {sendPings}} rpKey rId = d subscribeSMPQueues :: SMPClient -> NonEmpty (RcvPrivateAuthKey, RecipientId) -> IO (NonEmpty (Either SMPClientError ())) subscribeSMPQueues c@ProtocolClient {client_ = PClient {sendPings}} qs = do atomically $ writeTVar sendPings True - sendProtocolCommands c cs >>= mapM (processSUBResponse c) + sendProtocolCommands "subscribeSMPQueues" c cs >>= mapM (processSUBResponse c) where cs = L.map (\(rpKey, rId) -> (Just rpKey, rId, Cmd SRecipient SUB)) qs @@ -788,7 +789,7 @@ enableSMPQueueNotifications c rpKey rId notifierKey rcvNtfPublicDhKey = -- | Enable notifications for the multiple queues for push notifications server. enableSMPQueuesNtfs :: SMPClient -> NonEmpty (RcvPrivateAuthKey, RecipientId, NtfPublicAuthKey, RcvNtfPublicDhKey) -> IO (NonEmpty (Either SMPClientError (NotifierId, RcvNtfPublicDhKey))) -enableSMPQueuesNtfs c qs = L.map process <$> sendProtocolCommands c cs +enableSMPQueuesNtfs c qs = L.map process <$> sendProtocolCommands "enableSMPQueuesNtfs" c cs where cs = L.map (\(rpKey, rId, notifierKey, rcvNtfPublicDhKey) -> (Just rpKey, rId, Cmd SRecipient $ NKEY notifierKey rcvNtfPublicDhKey)) qs process (Response _ r) = case r of @@ -1014,7 +1015,7 @@ okSMPCommand cmd c pKey qId = r -> throwE $ unexpectedResponse r okSMPCommands :: PartyI p => Command p -> SMPClient -> NonEmpty (C.APrivateAuthKey, QueueId) -> IO (NonEmpty (Either SMPClientError ())) -okSMPCommands cmd c qs = L.map process <$> sendProtocolCommands c cs +okSMPCommands cmd c qs = L.map process <$> sendProtocolCommands "okSMPCommands" c cs where aCmd = Cmd sParty cmd cs = L.map (\(pKey, qId) -> (Just pKey, qId, aCmd)) qs @@ -1031,11 +1032,17 @@ sendSMPCommand c pKey qId cmd = sendProtocolCommand c pKey qId (Cmd sParty cmd) type PCTransmission err msg = (Either TransportError SentRawTransmission, Request err msg) -- | Send multiple commands with batching and collect responses -sendProtocolCommands :: forall v err msg. Protocol v err msg => ProtocolClient v err msg -> NonEmpty (ClientCommand msg) -> IO (NonEmpty (Response err msg)) -sendProtocolCommands c@ProtocolClient {thParams = THandleParams {batch, blockSize}} cs = do +sendProtocolCommands :: forall v err msg. Protocol v err msg => String -> ProtocolClient v err msg -> NonEmpty (ClientCommand msg) -> IO (NonEmpty (Response err msg)) +sendProtocolCommands name c@ProtocolClient {thParams = THandleParams {batch, blockSize}} cs = do bs <- batchTransmissions' batch blockSize <$> mapM (mkTransmission c) cs + let size = intercalate "," $ map (show . batchSize) bs + liftIO $ print $ show name <> " length cs = " <> show (length cs) <> " length bs = " <> show (length bs) <> ", size = " <> show size <> "/" <> show blockSize validate . concat =<< mapM (sendBatch c) bs where + batchSize = \case + TBTransmissions bytestring _ _ -> B.length bytestring + TBTransmission bytestring _ -> B.length bytestring + TBError _ _ -> 0 validate :: [Response err msg] -> IO (NonEmpty (Response err msg)) validate rs | diff == 0 = pure $ L.fromList rs diff --git a/src/Simplex/Messaging/Notifications/Client.hs b/src/Simplex/Messaging/Notifications/Client.hs index 21daa068d..1b27bddab 100644 --- a/src/Simplex/Messaging/Notifications/Client.hs +++ b/src/Simplex/Messaging/Notifications/Client.hs @@ -54,7 +54,7 @@ ntfCreateSubscription c pKey newSub = r -> throwE $ unexpectedResponse r ntfCreateSubscriptions :: NtfClient -> C.APrivateAuthKey -> NonEmpty (NewNtfEntity 'Subscription) -> IO (NonEmpty (Either NtfClientError NtfSubscriptionId)) -ntfCreateSubscriptions c pKey newSubs = L.map process <$> sendProtocolCommands c cs +ntfCreateSubscriptions c pKey newSubs = L.map process <$> sendProtocolCommands "ntfCreateSubscriptions" c cs where cs = L.map (\newSub -> (Just pKey, NoEntity, NtfCmd SSubscription $ SNEW newSub)) newSubs process (Response _ r) = case r of diff --git a/tests/AgentTests.hs b/tests/AgentTests.hs index c9e11f296..bdf8e1f52 100644 --- a/tests/AgentTests.hs +++ b/tests/AgentTests.hs @@ -24,6 +24,6 @@ agentTests (ATransport t) = do describe "Connection request" connectionRequestTests describe "Double ratchet tests" doubleRatchetTests describe "Functional API" $ functionalAPITests (ATransport t) - describe "Notification tests" $ notificationTests (ATransport t) + fdescribe "Notification tests" $ notificationTests (ATransport t) describe "SQLite store" storeTests describe "Migration tests" migrationTests diff --git a/tests/AgentTests/NotificationTests.hs b/tests/AgentTests/NotificationTests.hs index a25564a0b..cefca6cca 100644 --- a/tests/AgentTests/NotificationTests.hs +++ b/tests/AgentTests/NotificationTests.hs @@ -141,10 +141,10 @@ notificationTests t = do it "should resume subscriptions after SMP server is restarted" $ withAPNSMockServer $ \apns -> withNtfServer t $ testNotificationsSMPRestart t apns - describe "Notifications after SMP server restart" $ + fdescribe "Notifications after SMP server restart" $ it "should resume batched subscriptions after SMP server is restarted" $ withAPNSMockServer $ \apns -> - withNtfServer t $ testNotificationsSMPRestartBatch 100 t apns + withNtfServer t $ testNotificationsSMPRestartBatch 150 t apns describe "should switch notifications to the new queue" $ testServerMatrix2 t $ \servers -> withAPNSMockServer $ \apns ->