From 012c8cc10461760526391f4158b124eb5518e40b Mon Sep 17 00:00:00 2001 From: sh <37271604+shumvgolove@users.noreply.github.com> Date: Mon, 18 May 2026 08:26:50 +0000 Subject: [PATCH 1/4] ntf server: concurrent notification delivery (#1779) * ntf-server: carry retry reason in PPRetryLater, log retries Change PPRetryLater from nullary to PPRetryLater Text so the cause (503 / 410-reason) propagates to the retry call site. Log a warning at every retry attempt with provider, token id and reason. * ntf-server: parallel push delivery via forkIO + per-srvHost lock Fork delivery per notification, taking an MVar keyed by srvHost_ so notifications from the same SMP server serialize while different servers proceed concurrently. Switch APNS to sendRequestDirect so concurrent deliveries share one HTTP/2 connection via stream multiplexing rather than serializing through the client reqQ. * ntf-server: single-flight push client creation via SessionVar Match the take/create/wait pattern in Agent/Client.hs (newProtocolClient / waitForProtocolClient). pushClients now wraps clients in SessionVar (Either SomeException PushProviderClient) so concurrent first-time access and concurrent retries collapse to a single mkClient call; waiters observe the winner's result via readTMVar (or its error). retryDeliver evicts the failing client by SessionVar identity before re-fetching. * use multiple queues and workers, remove semaphores and threads per notification * fix * retry connecting client * fix * move config * fix --------- Co-authored-by: Evgeny @ SimpleX Chat <259188159+evgeny-simplex@users.noreply.github.com> Co-authored-by: Evgeny Poberezkin --- src/Simplex/Messaging/Notifications/Server.hs | 114 ++++++++++++------ .../Messaging/Notifications/Server/Env.hs | 72 ++++++++--- .../Notifications/Server/Push/APNS.hs | 18 +-- 3 files changed, 140 insertions(+), 64 deletions(-) diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index a508b2a17..75fafab58 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -53,6 +53,7 @@ import Data.Time.Format.ISO8601 (iso8601Show) import GHC.IORef (atomicSwapIORef) import GHC.Stats (getRTSStats) import Network.Socket (ServiceName, Socket, socketToHandle) +import Numeric.Natural (Natural) import Simplex.Messaging.Client (ProtocolClientError (..), SMPClientError, ServerTransmission (..)) import Simplex.Messaging.Client.Agent import qualified Simplex.Messaging.Crypto as C @@ -85,7 +86,7 @@ import System.Exit (exitFailure, exitSuccess) import System.IO (BufferMode (..), hClose, hPrint, hPutStrLn, hSetBuffering, hSetNewlineMode, universalNewlineMode) import System.Mem.Weak (deRefWeak) import System.Timeout (timeout) -import UnliftIO (IOMode (..), UnliftIO, askUnliftIO, race_, unliftIO, withFile) +import UnliftIO (IOMode (..), UnliftIO (..), askUnliftIO, race_, unliftIO, withFile) import UnliftIO.Concurrent (forkIO, killThread, mkWeakThreadId) import UnliftIO.Directory (doesFileExist, renameFile) import UnliftIO.Exception @@ -116,7 +117,6 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions} void $ forkIO $ resubscribe s raceAny_ ( ntfSubscriber s - : ntfPush ps : periodicNtfsThread ps : map runServer transports <> serverStatsThread_ cfg @@ -147,12 +147,17 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions} saveServer NtfSubscriber {smpSubscribers, smpAgent} <- asks subscriber liftIO $ readTVarIO smpSubscribers >>= mapM_ stopSubscriber + NtfPushServer {pushWorkers} <- asks pushServer + liftIO $ readTVarIO pushWorkers >>= mapM_ stopPushWorker liftIO $ closeSMPClientAgent smpAgent logNote "Server stopped" where stopSubscriber v = atomically (tryReadTMVar $ sessionVar v) >>= mapM (deRefWeak . subThreadId >=> mapM_ killThread) + stopPushWorker v = + atomically (tryReadTMVar $ sessionVar v) + >>= mapM (deRefWeak . workerThreadId >=> mapM_ killThread) saveServer :: M () saveServer = asks store >>= liftIO . closeNtfDbStore >> saveServerStats @@ -257,7 +262,7 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions} let threadsCount = 0 #endif let NtfSubscriber {smpSubscribers, smpAgent = a} = subscriber - NtfPushServer {pushQ} = pushServer + NtfPushServer {pushWorkers} = pushServer SMPClientAgent {smpClients, smpSessions, smpSubWorkers} = a srvSubscribers <- getSMPWorkerMetrics a smpSubscribers srvClients <- getSMPWorkerMetrics a smpClients @@ -267,7 +272,7 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions} ntfPendingServiceSubs <- getSMPServiceSubMetrics a pendingServiceSubs snd ntfPendingQueueSubs <- getSMPSubMetrics a pendingQueueSubs smpSessionCount <- M.size <$> readTVarIO smpSessions - apnsPushQLength <- atomically $ lengthTBQueue pushQ + apnsPushQLength <- pushWorkersQLength pushWorkers pure NtfRealTimeMetrics { threadsCount, @@ -526,35 +531,36 @@ ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {msgQ, agentQ}} = where receiveSMP = do st <- asks store - NtfPushServer {pushQ} <- asks pushServer + ps <- asks pushServer stats <- asks serverStats - liftIO $ forever $ do + forever $ do ((_, srv@(SMPServer (h :| _) _ _), _), _thVersion, sessionId, ts) <- atomically $ readTBQueue msgQ - forM ts $ \(ntfId, t) -> case t of + forM_ ts $ \(ntfId, t) -> case t of STUnexpectedError e -> logError $ "SMP client unexpected error: " <> tshow e -- uncorrelated response, should not happen STResponse {} -> pure () -- it was already reported as timeout error STEvent msgOrErr -> do let smpQueue = SMPQueueNtf srv ntfId case msgOrErr of Right (SMP.NMSG nmsgNonce encNMsgMeta) -> do - ntfTs <- getSystemTime - updatePeriodStats (activeSubs stats) ntfId + ntfTs <- liftIO getSystemTime + liftIO $ updatePeriodStats (activeSubs stats) ntfId let newNtf = PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta} - srvHost_ = if isOwnServer ca srv then Just (safeDecodeUtf8 $ strEncode h) else Nothing - addTokenLastNtf st newNtf >>= \case + srvHost = safeDecodeUtf8 $ strEncode h + isOwn = isOwnServer ca srv + liftIO (addTokenLastNtf st newNtf) >>= \case Right (tkn, lastNtfs) -> do - atomically $ writeTBQueue pushQ (srvHost_, tkn, PNMessage lastNtfs) - incNtfStat_ stats ntfReceived - mapM_ (`incServerStat` ntfReceivedOwn stats) srvHost_ - Left AUTH -> do + pushNotification ps (Just srvHost) isOwn tkn $ PNMessage lastNtfs + liftIO $ incNtfStat_ stats ntfReceived + when isOwn $ liftIO $ incServerStat srvHost (ntfReceivedOwn stats) + Left AUTH -> liftIO $ do incNtfStat_ stats ntfReceivedAuth - mapM_ (`incServerStat` ntfReceivedAuthOwn stats) srvHost_ + when isOwn $ incServerStat srvHost (ntfReceivedAuthOwn stats) Left _ -> pure () Right SMP.END -> whenM (atomically $ activeClientSession' ca sessionId srv) $ - void $ updateSrvSubStatus st smpQueue NSEnd + void $ liftIO $ updateSrvSubStatus st smpQueue NSEnd Right SMP.DELD -> - void $ updateSrvSubStatus st smpQueue NSDeleted + void $ liftIO $ updateSrvSubStatus st smpQueue NSDeleted Right (SMP.ERR e) -> logError $ "SMP server error: " <> tshow e Right _ -> logError "SMP server unexpected response" Left e -> logError $ "SMP client error: " <> tshow e @@ -632,9 +638,25 @@ logSubStatus srv event n updated = showServer' :: SMPServer -> Text showServer' = decodeLatin1 . strEncode . host -ntfPush :: NtfPushServer -> M () -ntfPush s@NtfPushServer {pushQ} = forever $ do - (srvHost_, tkn@NtfTknRec {ntfTknId, token = t@(DeviceToken pp _), tknStatus}, ntf) <- atomically (readTBQueue pushQ) +pushNotification :: NtfPushServer -> Maybe T.Text -> OwnServer -> NtfTknRec -> PushNotification -> M () +pushNotification s srvHost_ isOwn tkn@NtfTknRec {token = DeviceToken pp _} ntf = do + q <- getOrCreatePushWorker s (srvHost_, pp) isOwn + atomically $ writeTBQueue q (tkn, ntf) + +getOrCreatePushWorker :: NtfPushServer -> (Maybe T.Text, PushProvider) -> OwnServer -> M (TBQueue (NtfTknRec, PushNotification)) +getOrCreatePushWorker s@NtfPushServer {pushWorkers, pushWorkerSeq, pushQSize} key@(srvHost_, _) isOwn = do + ts <- liftIO getCurrentTime + atomically (getSessVar pushWorkerSeq key pushWorkers ts) >>= \case + Left v -> do + q <- liftIO $ newTBQueueIO pushQSize + tId <- mkWeakThreadId =<< forkIO (runPushWorker s srvHost_ isOwn q) + atomically $ putTMVar (sessionVar v) PushWorker {workerQ = q, workerThreadId = tId} + pure q + Right v -> workerQ <$> atomically (readTMVar $ sessionVar v) + +runPushWorker :: NtfPushServer -> Maybe T.Text -> OwnServer -> TBQueue (NtfTknRec, PushNotification) -> M () +runPushWorker s srvHost_ isOwn q = forever $ do + (tkn@NtfTknRec {ntfTknId, token = t@(DeviceToken pp _), tknStatus}, ntf) <- atomically (readTBQueue q) liftIO $ logDebug $ "sending push notification to " <> T.pack (show pp) st <- asks store case ntf of @@ -644,7 +666,7 @@ ntfPush s@NtfPushServer {pushQ} = forever $ do void $ liftIO $ setTknStatusConfirmed st tkn incNtfStatT t ntfVrfDelivered Left _ -> incNtfStatT t ntfVrfFailed - PNCheckMessages -> do + PNCheckMessages -> liftIO (deliverNotification st pp tkn ntf) >>= \case Right _ -> do void $ liftIO $ updateTokenCronSentAt st ntfTknId . systemSeconds =<< getSystemTime @@ -656,24 +678,23 @@ ntfPush s@NtfPushServer {pushQ} = forever $ do liftIO (deliverNotification st pp tkn ntf) >>= \case Left _ -> do incNtfStatT t ntfFailed - liftIO $ mapM_ (`incServerStat` ntfFailedOwn stats) srvHost_ + when isOwn $ liftIO $ mapM_ (`incServerStat` ntfFailedOwn stats) srvHost_ Right () -> do incNtfStatT t ntfDelivered - liftIO $ mapM_ (`incServerStat` ntfDeliveredOwn stats) srvHost_ - + when isOwn $ liftIO $ mapM_ (`incServerStat` ntfDeliveredOwn stats) srvHost_ where checkActiveTkn :: NtfTknStatus -> M () -> M () checkActiveTkn status action | status == NTActive = action | otherwise = liftIO $ logError "bad notification token status" deliverNotification :: NtfPostgresStore -> PushProvider -> NtfTknRec -> PushNotification -> IO (Either PushProviderError ()) - deliverNotification st pp tkn@NtfTknRec {ntfTknId} ntf = do - deliver <- getPushClient s pp - runExceptT (deliver tkn ntf) >>= \case + deliverNotification st pp tkn@NtfTknRec {ntfTknId} ntf' = do + (deliver, clientVar) <- getPushClient s pp + runExceptT (deliver tkn ntf') >>= \case Right _ -> pure $ Right () Left e -> case e of - PPConnection _ -> retryDeliver - PPRetryLater -> retryDeliver + PPConnection ce -> retryDeliver clientVar $ "connection " <> tshow ce + PPRetryLater r -> retryDeliver clientVar r PPCryptoError _ -> err e PPResponseError {} -> err e PPTokenInvalid r -> do @@ -681,10 +702,12 @@ ntfPush s@NtfPushServer {pushQ} = forever $ do err e PPPermanentError -> err e where - retryDeliver :: IO (Either PushProviderError ()) - retryDeliver = do - deliver <- newPushClient s pp - runExceptT (deliver tkn ntf) >>= \case + retryDeliver :: PushClientVar -> Text -> IO (Either PushProviderError ()) + retryDeliver oldVar reason = do + logWarn $ "retrying push (" <> tshow pp <> ", " <> tshow ntfTknId <> "): " <> reason + atomically $ removeSessVar oldVar pp (pushClients s) + (deliver, _) <- getPushClient s pp + runExceptT (deliver tkn ntf') >>= \case Right _ -> pure $ Right () Left e -> case e of PPTokenInvalid r -> do @@ -693,15 +716,26 @@ ntfPush s@NtfPushServer {pushQ} = forever $ do _ -> err e err e = logError ("Push provider error (" <> tshow pp <> ", " <> tshow ntfTknId <> "): " <> tshow e) $> Left e +pushWorkersQLength :: TMap (Maybe T.Text, PushProvider) PushWorkerVar -> IO Natural +pushWorkersQLength workers = do + ws <- readTVarIO workers + foldM addQLength 0 ws + where + addQLength acc v = + atomically (tryReadTMVar $ sessionVar v) >>= \case + Just PushWorker {workerQ} -> (acc +) <$> atomically (lengthTBQueue workerQ) + Nothing -> pure acc + periodicNtfsThread :: NtfPushServer -> M () -periodicNtfsThread NtfPushServer {pushQ} = do +periodicNtfsThread s = do st <- asks store ntfsInterval <- asks $ periodicNtfsInterval . config let interval = 1000000 * ntfsInterval + UnliftIO unlift <- askUnliftIO liftIO $ forever $ do threadDelay interval now <- systemSeconds <$> getSystemTime - cnt <- withPeriodicNtfTokens st now $ \tkn -> atomically $ writeTBQueue pushQ (Nothing, tkn, PNCheckMessages) + cnt <- withPeriodicNtfTokens st now $ \tkn -> unlift $ pushNotification s Nothing False tkn PNCheckMessages logNote $ "Scheduled periodic notifications: " <> tshow cnt runNtfClientTransport :: Transport c => THandleNTF c 'TServer -> M () @@ -791,7 +825,7 @@ verifyNtfTransmission st thAuth (tAuth, authorized, (corrId, entId, cmd)) = case e -> VRFailed e client :: NtfServerClient -> NtfSubscriber -> NtfPushServer -> M () -client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServer {pushQ} = +client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} ps = forever $ atomically (readTBQueue rcvQ) >>= mapM processCommand @@ -808,7 +842,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServ ts <- liftIO $ getSystemDate let tkn = mkNtfTknRec tknId newTkn srvDhPrivKey dhSecret regCode ts withNtfStore (`addNtfToken` tkn) $ \_ -> do - atomically $ writeTBQueue pushQ (Nothing, tkn, PNVerification regCode) + pushNotification ps Nothing False tkn $ PNVerification regCode incNtfStatT token ntfVrfQueued incNtfStatT token tknCreated pure $ NRTknId tknId srvDhPubKey @@ -824,7 +858,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServ | otherwise -> withNtfStore (\st -> updateTknStatus st tkn NTRegistered) $ \_ -> sendVerification where sendVerification = do - atomically $ writeTBQueue pushQ (Nothing, tkn, PNVerification tknRegCode) + pushNotification ps Nothing False tkn $ PNVerification tknRegCode incNtfStatT token ntfVrfQueued pure $ NRTknId ntfTknId $ C.publicKey tknDhPrivKey TVFY code -- this allows repeated verification for cases when client connection dropped before server response @@ -842,7 +876,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} NtfPushServ regCode <- getRegCode let tkn' = tkn {token = token', tknStatus = NTRegistered, tknRegCode = regCode} withNtfStore (`replaceNtfToken` tkn') $ \_ -> do - atomically $ writeTBQueue pushQ (Nothing, tkn', PNVerification regCode) + pushNotification ps Nothing False tkn' $ PNVerification regCode incNtfStatT token ntfVrfQueued incNtfStatT token tknReplaced pure NROk diff --git a/src/Simplex/Messaging/Notifications/Server/Env.hs b/src/Simplex/Messaging/Notifications/Server/Env.hs index 654428602..c798f6a9e 100644 --- a/src/Simplex/Messaging/Notifications/Server/Env.hs +++ b/src/Simplex/Messaging/Notifications/Server/Env.hs @@ -13,22 +13,27 @@ module Simplex.Messaging.Notifications.Server.Env SMPSubscriberVar, SMPSubscriber (..), NtfPushServer (..), + PushClientVar, + PushWorker (..), + PushWorkerVar, NtfRequest (..), NtfServerClient (..), defaultInactiveClientExpiration, newNtfServerEnv, newNtfSubscriber, newNtfPushServer, - newPushClient, getPushClient, newNtfServerClient, ) where import Control.Concurrent (ThreadId) +import qualified Control.Exception as E import Control.Logger.Simple import Control.Monad import Crypto.Random +import Data.Functor (($>)) import Data.Int (Int64) +import Simplex.Messaging.Agent.RetryInterval import Data.List.NonEmpty (NonEmpty) import qualified Data.Text as T import Data.Time.Clock (getCurrentTime) @@ -58,6 +63,7 @@ import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (ASrvTransport, SMPServiceRole (..), ServiceCredentials (..), THandleParams, TransportPeer (..)) import Simplex.Messaging.Transport.Server (AddHTTP, ServerCredentials, TransportServerConfig, loadFingerprint, loadServerCredential) +import Simplex.Messaging.Util (tshow) import System.Exit (exitFailure) import System.Mem.Weak (Weak) import UnliftIO.STM @@ -163,28 +169,62 @@ data SMPSubscriber = SMPSubscriber } data NtfPushServer = NtfPushServer - { pushQ :: TBQueue (Maybe T.Text, NtfTknRec, PushNotification), -- Maybe Text is a hostname of "own" server - pushClients :: TMap PushProvider PushProviderClient, + { pushWorkers :: TMap (Maybe T.Text, PushProvider) PushWorkerVar, + pushWorkerSeq :: TVar Int, + pushQSize :: Natural, + pushClients :: TMap PushProvider PushClientVar, + pushClientSeq :: TVar Int, apnsConfig :: APNSPushClientConfig } -newNtfPushServer :: Natural -> APNSPushClientConfig -> IO NtfPushServer -newNtfPushServer qSize apnsConfig = do - pushQ <- newTBQueueIO qSize - pushClients <- TM.emptyIO - pure NtfPushServer {pushQ, pushClients, apnsConfig} +data PushWorker = PushWorker + { workerQ :: TBQueue (NtfTknRec, PushNotification), + workerThreadId :: Weak ThreadId + } -newPushClient :: NtfPushServer -> PushProvider -> IO PushProviderClient -newPushClient NtfPushServer {apnsConfig, pushClients} pp = do - c <- case apnsProviderHost pp of +type PushWorkerVar = SessionVar PushWorker + +-- The Either communicates client-creation failure from the winner to the waiters. +type PushClientVar = SessionVar (Either E.SomeException PushProviderClient) + +newNtfPushServer :: Natural -> APNSPushClientConfig -> IO NtfPushServer +newNtfPushServer pushQSize apnsConfig = do + pushWorkers <- TM.emptyIO + pushWorkerSeq <- newTVarIO 0 + pushClients <- TM.emptyIO + pushClientSeq <- newTVarIO 0 + pure NtfPushServer {pushWorkers, pushWorkerSeq, pushQSize, pushClients, pushClientSeq, apnsConfig} + +-- | Single-flight access to the per-provider push client with bounded retry. +-- The returned PushClientVar is the handle retryDeliver passes to removeSessVar to evict +-- this specific instance before re-fetching. +getPushClient :: NtfPushServer -> PushProvider -> IO (PushProviderClient, PushClientVar) +getPushClient s@NtfPushServer {apnsConfig = APNSPushClientConfig {reconnectInterval}} pp = + withRetryIntervalCount reconnectInterval $ \n _delay loop -> do + ts <- getCurrentTime + E.try (atomically (getSessVar (pushClientSeq s) pp (pushClients s) ts) >>= either (newPushClient s pp) waitForPushClient) >>= \case + Right result -> pure result + Left e + | n < 2 -> do + logError $ "getPushClient error (" <> tshow pp <> "): " <> tshow (e :: E.SomeException) + loop + | otherwise -> E.throwIO e + +newPushClient :: NtfPushServer -> PushProvider -> PushClientVar -> IO (PushProviderClient, PushClientVar) +newPushClient NtfPushServer {pushClients, apnsConfig} pp v = do + r <- E.try $ case apnsProviderHost pp of Nothing -> pure $ \_ _ -> pure () Just host -> apnsPushProviderClient <$> createAPNSPushClient host apnsConfig - atomically $ TM.insert pp c pushClients - pure c + atomically $ do + putTMVar (sessionVar v) r + case r of + Left _ -> removeSessVar v pp pushClients + Right _ -> pure () + either E.throwIO (\c -> pure (c, v)) r -getPushClient :: NtfPushServer -> PushProvider -> IO PushProviderClient -getPushClient s@NtfPushServer {pushClients} pp = - TM.lookupIO pp pushClients >>= maybe (newPushClient s pp) pure +waitForPushClient :: PushClientVar -> IO (PushProviderClient, PushClientVar) +waitForPushClient v = + atomically (readTMVar $ sessionVar v) >>= either E.throwIO (\c -> pure (c, v)) data NtfRequest = NtfReqNew CorrId ANewNtfEntity diff --git a/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs b/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs index 4b274271c..47dc435c2 100644 --- a/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs +++ b/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs @@ -23,7 +23,7 @@ module Simplex.Messaging.Notifications.Server.Push.APNS apnsPushProviderClient, ) where -import Control.Exception (Exception) +import Control.Exception (Exception, throwIO) import Control.Logger.Simple import Control.Monad import Control.Monad.Except @@ -66,6 +66,7 @@ import qualified Network.HTTP2.Client as H import Network.Socket (HostName, ServiceName) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Notifications.Protocol +import Simplex.Messaging.Agent.RetryInterval (RetryInterval (..)) import Simplex.Messaging.Notifications.Server.Push.APNS.Internal import Simplex.Messaging.Notifications.Server.Store.Types (NtfTknRec (..)) import Simplex.Messaging.Parsers (defaultJSON) @@ -192,7 +193,8 @@ data APNSPushClientConfig = APNSPushClientConfig appTeamId :: Text, apnsPort :: ServiceName, http2cfg :: HTTP2ClientConfig, - caStoreFile :: FilePath + caStoreFile :: FilePath, + reconnectInterval :: RetryInterval } apnsProviderHost :: PushProvider -> Maybe HostName @@ -214,7 +216,8 @@ defaultAPNSPushClientConfig = appTeamId = "5NN7GUYB6T", apnsPort = "443", http2cfg = defaultHTTP2ClientConfig {bufferSize = 16384}, - caStoreFile = "/etc/ssl/cert.pem" + caStoreFile = "/etc/ssl/cert.pem", + reconnectInterval = RetryInterval {initialInterval = 2000000, increaseAfter = 0, maxInterval = 10000000} } data APNSPushClient = APNSPushClient @@ -230,7 +233,7 @@ data APNSPushClient = APNSPushClient createAPNSPushClient :: HostName -> APNSPushClientConfig -> IO APNSPushClient createAPNSPushClient apnsHost apnsCfg@APNSPushClientConfig {authKeyFileEnv, authKeyAlg, authKeyIdEnv, appTeamId} = do https2Client <- newTVarIO Nothing - void $ connectHTTPS2 apnsHost apnsCfg https2Client + connectHTTPS2 apnsHost apnsCfg https2Client >>= either (throwIO . userError . show) (\_ -> pure ()) privateKey <- readECPrivateKey =<< getEnv authKeyFileEnv authKeyId <- T.pack <$> getEnv authKeyIdEnv let jwtHeader = JWTHeader {alg = authKeyAlg, kid = authKeyId} @@ -326,7 +329,7 @@ data PushProviderError | PPCryptoError C.CryptoError | PPResponseError (Maybe Status) Text | PPTokenInvalid NTInvalidReason - | PPRetryLater + | PPRetryLater Text | PPPermanentError deriving (Show, Exception) @@ -343,7 +346,6 @@ apnsPushProviderClient c@APNSPushClient {nonceDrg, apnsCfg} tkn@NtfTknRec {token nonce <- atomically $ C.randomCbNonce nonceDrg apnsNtf <- liftEither $ first PPCryptoError $ apnsNotification tkn nonce (paddedNtfLength apnsCfg) pn req <- liftIO $ apnsRequest c tknStr apnsNtf - -- TODO when HTTP2 client is thread-safe, we can use sendRequestDirect HTTP2Response {response, respBody = HTTP2Body {bodyHead}} <- liftHTTPS2 $ sendRequest http2 req Nothing let status = H.responseStatus response reason' = maybe "" reason $ J.decodeStrict' bodyHead @@ -373,8 +375,8 @@ apnsPushProviderClient c@APNSPushClient {nonceDrg, apnsCfg} tkn@NtfTknRec {token | status == Just N.gone410 = throwE $ case reason' of "ExpiredToken" -> PPTokenInvalid NTIRExpiredToken "Unregistered" -> PPTokenInvalid NTIRUnregistered - _ -> PPRetryLater - | status == Just N.serviceUnavailable503 = liftIO (disconnectApnsHTTP2Client c) >> throwE PPRetryLater + _ -> PPRetryLater $ "410 " <> reason' + | status == Just N.serviceUnavailable503 = liftIO (disconnectApnsHTTP2Client c) >> throwE (PPRetryLater "503") -- Just tooManyRequests429 -> TooManyRequests - too many requests for the same token | otherwise = throwE $ PPResponseError status reason' liftHTTPS2 a = ExceptT $ first PPConnection <$> a From b6f551000f611d493e45261bab0fe431efcfcf9f Mon Sep 17 00:00:00 2001 From: sh <37271604+shumvgolove@users.noreply.github.com> Date: Mon, 18 May 2026 13:35:47 +0000 Subject: [PATCH 2/4] ntf server: concurrent APNS push via sendRequestDirect (#1780) The per-(srvHost, provider) worker shards added in #1779 still funnel all APNS sends through one HTTP2Client's reqQ, where a single process thread calls sendRequest serially - one in-flight HTTP/2 stream at a time, capping APNS throughput at 1/RTT. sendRequestDirect bypasses the queue and invokes sendReq directly from the calling worker, so concurrent workers open parallel HTTP/2 streams on the shared APNS connection and the multiplexing happens on the wire. --- src/Simplex/Messaging/Notifications/Server/Push/APNS.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs b/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs index 47dc435c2..0657e5cec 100644 --- a/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs +++ b/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs @@ -346,7 +346,7 @@ apnsPushProviderClient c@APNSPushClient {nonceDrg, apnsCfg} tkn@NtfTknRec {token nonce <- atomically $ C.randomCbNonce nonceDrg apnsNtf <- liftEither $ first PPCryptoError $ apnsNotification tkn nonce (paddedNtfLength apnsCfg) pn req <- liftIO $ apnsRequest c tknStr apnsNtf - HTTP2Response {response, respBody = HTTP2Body {bodyHead}} <- liftHTTPS2 $ sendRequest http2 req Nothing + HTTP2Response {response, respBody = HTTP2Body {bodyHead}} <- liftHTTPS2 $ sendRequestDirect http2 req Nothing let status = H.responseStatus response reason' = maybe "" reason $ J.decodeStrict' bodyHead if status == Just N.ok200 From 118a8e89bb654d6546f2b0ab6e1cf6b91fa47763 Mon Sep 17 00:00:00 2001 From: sh <37271604+shumvgolove@users.noreply.github.com> Date: Wed, 20 May 2026 12:56:55 +0000 Subject: [PATCH 3/4] agent: use primary key index in setRcvServiceAssocs (#1783) * agent: use primary key index in setRcvServiceAssocs Previous WHERE rcv_id = ? did not match the (host, port, rcv_id) primary key prefix and fell back to a table scan via idx_rcv_queues_client_notice_id. With ~390k rows per queue, each update in a 1350-row batch scanned the whole table, yielding ~290s per batch and a multi-hour rcv-services migration. * agent: pass SMPServer explicitly to setRcvServiceAssocs Avoid extracting host/port from the first queue inside setRcvServiceAssocs. The caller already has SMPServer in scope (from tSess) and the call chain is short, so threading it through is simpler than inspecting the list. Removes the empty-list guard from setRcvServiceAssocs (it remains in processRcvServiceAssocs). --- src/Simplex/Messaging/Agent.hs | 2 +- src/Simplex/Messaging/Agent/Client.hs | 10 +++++----- src/Simplex/Messaging/Agent/Store/AgentStore.hs | 14 ++++++++++---- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 39f6fd15f..bd77b892a 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -3106,7 +3106,7 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), THandlePar unless (null connIds) $ do notify' "" $ UP srv connIds atomically $ incSMPServerStat' c userId srv connSubscribed $ length connIds - readTVarIO serviceRQs >>= processRcvServiceAssocs c + readTVarIO serviceRQs >>= processRcvServiceAssocs c srv where withRcvConn :: SMP.RecipientId -> (forall c. RcvQueue -> Connection c -> AM ()) -> AM' () withRcvConn rId a = do diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 1039c5d75..d33794006 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -1692,7 +1692,7 @@ subscribeSessQueues_ c withEvents qs = sendClientBatch_ "SUB" False subscribe_ c unless (null notices) $ takeTMVar $ clientNoticesLock c pure r unless (null serviceQs) $ void $ - processRcvServiceAssocs c serviceQs `runReaderT` agentEnv c + processRcvServiceAssocs c srv serviceQs `runReaderT` agentEnv c unless (null notices) $ void $ (processClientNotices c tSess notices `runReaderT` agentEnv c) `E.finally` atomically (putTMVar (clientNoticesLock c) ()) @@ -1714,10 +1714,10 @@ subscribeSessQueues_ c withEvents qs = sendClientBatch_ "SUB" False subscribe_ c tSess = transportSession' smp sessId = sessionId $ thParams smp -processRcvServiceAssocs :: SMPQueue q => AgentClient -> [q] -> AM' () -processRcvServiceAssocs _ [] = pure () -processRcvServiceAssocs c serviceQs = - withStore' c (`setRcvServiceAssocs` serviceQs) `catchAllErrors'` \e -> do +processRcvServiceAssocs :: SMPQueue q => AgentClient -> SMPServer -> [q] -> AM' () +processRcvServiceAssocs _ _ [] = pure () +processRcvServiceAssocs c srv serviceQs = + withStore' c (\db -> setRcvServiceAssocs db srv serviceQs) `catchAllErrors'` \e -> do logError $ "processRcvServiceAssocs error: " <> tshow e notifySub' c "" $ ERR e diff --git a/src/Simplex/Messaging/Agent/Store/AgentStore.hs b/src/Simplex/Messaging/Agent/Store/AgentStore.hs index 6a369ee2c..e3ea1671b 100644 --- a/src/Simplex/Messaging/Agent/Store/AgentStore.hs +++ b/src/Simplex/Messaging/Agent/Store/AgentStore.hs @@ -2399,12 +2399,18 @@ unassocUserServerRcvQueueSubs' db userId srv@(SMPServer h p kh) = do unsetQueuesToSubscribe :: DB.Connection -> IO () unsetQueuesToSubscribe db = DB.execute_ db "UPDATE rcv_queues SET to_subscribe = 0 WHERE to_subscribe = 1" -setRcvServiceAssocs :: SMPQueue q => DB.Connection -> [q] -> IO () -setRcvServiceAssocs db rqs = do +setRcvServiceAssocs :: SMPQueue q => DB.Connection -> SMPServer -> [q] -> IO () +setRcvServiceAssocs db ProtocolServer {host, port} rqs = #if defined(dbPostgres) - DB.execute db "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE rcv_id IN ?" $ Only $ In (map queueId rqs) + DB.execute + db + "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE host = ? AND port = ? AND rcv_id IN ?" + (host, port, In (map queueId rqs)) #else - DB.executeMany db "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE rcv_id = ?" $ map (Only . queueId) rqs + DB.executeMany + db + "UPDATE rcv_queues SET rcv_service_assoc = 1 WHERE host = ? AND port = ? AND rcv_id = ?" + (map (\q -> (host, port, queueId q)) rqs) #endif removeRcvServiceAssocs :: DB.Connection -> UserId -> SMPServer -> IO () From f03cec7a58ed13a39a52886888c74bcefdb64479 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Thu, 21 May 2026 09:10:25 +0100 Subject: [PATCH 4/4] 6.5.2.0 --- simplexmq.cabal | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/simplexmq.cabal b/simplexmq.cabal index 66cd43b17..5fd0cdf8e 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -1,7 +1,7 @@ cabal-version: 1.12 name: simplexmq -version: 6.5.1.0 +version: 6.5.2.0 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and