mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-26 16:15:08 +00:00
ntf: measure batch size
This commit is contained in:
@@ -117,12 +117,12 @@ import Control.Monad.Trans.Except
|
||||
import Crypto.Random (ChaChaDRG)
|
||||
import qualified Data.Aeson.TH as J
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import qualified Data.ByteString.Base64 as B64
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import qualified Data.ByteString.Base64 as B64
|
||||
import Data.Functor (($>))
|
||||
import Data.Int (Int64)
|
||||
import Data.List (find)
|
||||
import Data.List (find, intercalate)
|
||||
import Data.List.NonEmpty (NonEmpty (..))
|
||||
import qualified Data.List.NonEmpty as L
|
||||
import Data.Maybe (catMaybes, fromMaybe)
|
||||
@@ -383,12 +383,13 @@ clientSocksCredentials NetworkConfig {socksProxy, sessionMode} proxySessTs (user
|
||||
Nothing -> Nothing
|
||||
where
|
||||
sessionUsername =
|
||||
B64.encode $ C.sha256Hash $
|
||||
bshow userId <> case sessionMode of
|
||||
TSMUser -> ""
|
||||
TSMSession -> ":" <> bshow proxySessTs
|
||||
TSMServer -> ":" <> bshow proxySessTs <> "@" <> strEncode srv
|
||||
TSMEntity -> ":" <> bshow proxySessTs <> "@" <> strEncode srv <> maybe "" ("/" <>) entityId_
|
||||
B64.encode $
|
||||
C.sha256Hash $
|
||||
bshow userId <> case sessionMode of
|
||||
TSMUser -> ""
|
||||
TSMSession -> ":" <> bshow proxySessTs
|
||||
TSMServer -> ":" <> bshow proxySessTs <> "@" <> strEncode srv
|
||||
TSMEntity -> ":" <> bshow proxySessTs <> "@" <> strEncode srv <> maybe "" ("/" <>) entityId_
|
||||
|
||||
-- | protocol client configuration.
|
||||
data ProtocolClientConfig v = ProtocolClientConfig
|
||||
@@ -715,7 +716,7 @@ subscribeSMPQueue c@ProtocolClient {client_ = PClient {sendPings}} rpKey rId = d
|
||||
subscribeSMPQueues :: SMPClient -> NonEmpty (RcvPrivateAuthKey, RecipientId) -> IO (NonEmpty (Either SMPClientError ()))
|
||||
subscribeSMPQueues c@ProtocolClient {client_ = PClient {sendPings}} qs = do
|
||||
atomically $ writeTVar sendPings True
|
||||
sendProtocolCommands c cs >>= mapM (processSUBResponse c)
|
||||
sendProtocolCommands "subscribeSMPQueues" c cs >>= mapM (processSUBResponse c)
|
||||
where
|
||||
cs = L.map (\(rpKey, rId) -> (Just rpKey, rId, Cmd SRecipient SUB)) qs
|
||||
|
||||
@@ -788,7 +789,7 @@ enableSMPQueueNotifications c rpKey rId notifierKey rcvNtfPublicDhKey =
|
||||
|
||||
-- | Enable notifications for the multiple queues for push notifications server.
|
||||
enableSMPQueuesNtfs :: SMPClient -> NonEmpty (RcvPrivateAuthKey, RecipientId, NtfPublicAuthKey, RcvNtfPublicDhKey) -> IO (NonEmpty (Either SMPClientError (NotifierId, RcvNtfPublicDhKey)))
|
||||
enableSMPQueuesNtfs c qs = L.map process <$> sendProtocolCommands c cs
|
||||
enableSMPQueuesNtfs c qs = L.map process <$> sendProtocolCommands "enableSMPQueuesNtfs" c cs
|
||||
where
|
||||
cs = L.map (\(rpKey, rId, notifierKey, rcvNtfPublicDhKey) -> (Just rpKey, rId, Cmd SRecipient $ NKEY notifierKey rcvNtfPublicDhKey)) qs
|
||||
process (Response _ r) = case r of
|
||||
@@ -1014,7 +1015,7 @@ okSMPCommand cmd c pKey qId =
|
||||
r -> throwE $ unexpectedResponse r
|
||||
|
||||
okSMPCommands :: PartyI p => Command p -> SMPClient -> NonEmpty (C.APrivateAuthKey, QueueId) -> IO (NonEmpty (Either SMPClientError ()))
|
||||
okSMPCommands cmd c qs = L.map process <$> sendProtocolCommands c cs
|
||||
okSMPCommands cmd c qs = L.map process <$> sendProtocolCommands "okSMPCommands" c cs
|
||||
where
|
||||
aCmd = Cmd sParty cmd
|
||||
cs = L.map (\(pKey, qId) -> (Just pKey, qId, aCmd)) qs
|
||||
@@ -1031,11 +1032,17 @@ sendSMPCommand c pKey qId cmd = sendProtocolCommand c pKey qId (Cmd sParty cmd)
|
||||
type PCTransmission err msg = (Either TransportError SentRawTransmission, Request err msg)
|
||||
|
||||
-- | Send multiple commands with batching and collect responses
|
||||
sendProtocolCommands :: forall v err msg. Protocol v err msg => ProtocolClient v err msg -> NonEmpty (ClientCommand msg) -> IO (NonEmpty (Response err msg))
|
||||
sendProtocolCommands c@ProtocolClient {thParams = THandleParams {batch, blockSize}} cs = do
|
||||
sendProtocolCommands :: forall v err msg. Protocol v err msg => String -> ProtocolClient v err msg -> NonEmpty (ClientCommand msg) -> IO (NonEmpty (Response err msg))
|
||||
sendProtocolCommands name c@ProtocolClient {thParams = THandleParams {batch, blockSize}} cs = do
|
||||
bs <- batchTransmissions' batch blockSize <$> mapM (mkTransmission c) cs
|
||||
let size = intercalate "," $ map (show . batchSize) bs
|
||||
liftIO $ print $ show name <> " length cs = " <> show (length cs) <> " length bs = " <> show (length bs) <> ", size = " <> show size <> "/" <> show blockSize
|
||||
validate . concat =<< mapM (sendBatch c) bs
|
||||
where
|
||||
batchSize = \case
|
||||
TBTransmissions bytestring _ _ -> B.length bytestring
|
||||
TBTransmission bytestring _ -> B.length bytestring
|
||||
TBError _ _ -> 0
|
||||
validate :: [Response err msg] -> IO (NonEmpty (Response err msg))
|
||||
validate rs
|
||||
| diff == 0 = pure $ L.fromList rs
|
||||
|
||||
@@ -54,7 +54,7 @@ ntfCreateSubscription c pKey newSub =
|
||||
r -> throwE $ unexpectedResponse r
|
||||
|
||||
ntfCreateSubscriptions :: NtfClient -> C.APrivateAuthKey -> NonEmpty (NewNtfEntity 'Subscription) -> IO (NonEmpty (Either NtfClientError NtfSubscriptionId))
|
||||
ntfCreateSubscriptions c pKey newSubs = L.map process <$> sendProtocolCommands c cs
|
||||
ntfCreateSubscriptions c pKey newSubs = L.map process <$> sendProtocolCommands "ntfCreateSubscriptions" c cs
|
||||
where
|
||||
cs = L.map (\newSub -> (Just pKey, NoEntity, NtfCmd SSubscription $ SNEW newSub)) newSubs
|
||||
process (Response _ r) = case r of
|
||||
|
||||
+1
-1
@@ -24,6 +24,6 @@ agentTests (ATransport t) = do
|
||||
describe "Connection request" connectionRequestTests
|
||||
describe "Double ratchet tests" doubleRatchetTests
|
||||
describe "Functional API" $ functionalAPITests (ATransport t)
|
||||
describe "Notification tests" $ notificationTests (ATransport t)
|
||||
fdescribe "Notification tests" $ notificationTests (ATransport t)
|
||||
describe "SQLite store" storeTests
|
||||
describe "Migration tests" migrationTests
|
||||
|
||||
@@ -141,10 +141,10 @@ notificationTests t = do
|
||||
it "should resume subscriptions after SMP server is restarted" $
|
||||
withAPNSMockServer $ \apns ->
|
||||
withNtfServer t $ testNotificationsSMPRestart t apns
|
||||
describe "Notifications after SMP server restart" $
|
||||
fdescribe "Notifications after SMP server restart" $
|
||||
it "should resume batched subscriptions after SMP server is restarted" $
|
||||
withAPNSMockServer $ \apns ->
|
||||
withNtfServer t $ testNotificationsSMPRestartBatch 100 t apns
|
||||
withNtfServer t $ testNotificationsSMPRestartBatch 150 t apns
|
||||
describe "should switch notifications to the new queue" $
|
||||
testServerMatrix2 t $ \servers ->
|
||||
withAPNSMockServer $ \apns ->
|
||||
|
||||
Reference in New Issue
Block a user