From 7d0115daec2842e82b23bb1a9b85baddad08e8b3 Mon Sep 17 00:00:00 2001 From: Evgeny Date: Mon, 28 Apr 2025 12:18:55 +0100 Subject: [PATCH] ntf server, agent: send all periodic notifications from one thread, only to old active clients or new clients with periodic notification mode (#1528) * ntf server, agent: send all periodic notifications from one thread, only to old active clients or new clients with periodic notification mode * send different type via subscription queues * option to compact store log on start --- src/Simplex/Messaging/Agent.hs | 13 ++- src/Simplex/Messaging/Agent/Client.hs | 9 +- src/Simplex/Messaging/Notifications/Client.hs | 5 +- src/Simplex/Messaging/Notifications/Server.hs | 106 ++++++++---------- .../Messaging/Notifications/Server/Env.hs | 48 ++++---- .../Messaging/Notifications/Server/Main.hs | 1 + .../Notifications/Server/Prometheus.hs | 21 ++-- .../Notifications/Server/Store/Migrations.hs | 8 +- .../Notifications/Server/Store/Postgres.hs | 52 +++++---- .../Notifications/Server/Store/Types.hs | 4 +- tests/NtfClient.hs | 1 + 11 files changed, 137 insertions(+), 131 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 34d6bbc97..89e379023 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -2197,10 +2197,9 @@ registerNtfToken' c suppliedDeviceToken suppliedNtfMode = atomically $ nsUpdateToken ns tkn' agentNtfCheckToken c tknId tkn' >>= \case NTActive -> do - cron <- asks $ ntfCron . config - agentNtfEnableCron c tknId tkn cron when (suppliedNtfMode == NMInstant) $ initializeNtfSubs c when (suppliedNtfMode == NMPeriodic && savedNtfMode == NMInstant) $ deleteNtfSubs c NSCSmpDelete + lift $ setCronInterval c tknId tkn t tkn' (NTActive, Just NTACheck) $ pure () status -> t tkn' (status, Nothing) $ pure () | otherwise -> replaceToken tknId @@ -2261,11 +2260,17 @@ verifyNtfToken' c deviceToken nonce code = withToken c tkn (Just (NTConfirmed, NTAVerify code')) (NTActive, Just NTACheck) $ agentNtfVerifyToken c tknId tkn code' when (toStatus == NTActive) $ do - cron <- asks $ ntfCron . config - agentNtfEnableCron c tknId tkn cron + lift $ setCronInterval c tknId tkn when (ntfMode == NMInstant) $ initializeNtfSubs c _ -> throwE $ CMD PROHIBITED "verifyNtfToken: no token" +setCronInterval :: AgentClient -> NtfTokenId -> NtfToken -> AM' () +setCronInterval c tknId tkn@NtfToken {ntfMode} = do + cron <- case ntfMode of + NMPeriodic -> asks $ ntfCron . config + _ -> pure 0 + void $ forkIO $ void $ runExceptT $ agentNtfSetCronInterval c tknId tkn cron + checkNtfToken' :: AgentClient -> DeviceToken -> AM NtfTknStatus checkNtfToken' c deviceToken = withStore' c getSavedNtfToken >>= \case diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 240b25f7e..455c5eb48 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -77,7 +77,7 @@ module Simplex.Messaging.Agent.Client agentNtfCheckToken, agentNtfReplaceToken, agentNtfDeleteToken, - agentNtfEnableCron, + agentNtfSetCronInterval, agentNtfCreateSubscription, agentNtfCreateSubscriptions, agentNtfCheckSubscription, @@ -1812,9 +1812,10 @@ agentNtfDeleteToken :: AgentClient -> NtfServer -> C.APrivateAuthKey -> NtfToken agentNtfDeleteToken c ntfServer ntfPrivKey tknId = withNtfClient c ntfServer tknId "TDEL" $ \ntf -> ntfDeleteToken ntf ntfPrivKey tknId -agentNtfEnableCron :: AgentClient -> NtfTokenId -> NtfToken -> Word16 -> AM () -agentNtfEnableCron c tknId NtfToken {ntfServer, ntfPrivKey} interval = - withNtfClient c ntfServer tknId "TCRN" $ \ntf -> ntfEnableCron ntf ntfPrivKey tknId interval +-- set to 0 to disable +agentNtfSetCronInterval :: AgentClient -> NtfTokenId -> NtfToken -> Word16 -> AM () +agentNtfSetCronInterval c tknId NtfToken {ntfServer, ntfPrivKey} interval = + withNtfClient c ntfServer tknId "TCRN" $ \ntf -> ntfSetCronInterval ntf ntfPrivKey tknId interval agentNtfCreateSubscription :: AgentClient -> NtfTokenId -> NtfToken -> SMPQueueNtf -> SMP.NtfPrivateAuthKey -> AM NtfSubscriptionId agentNtfCreateSubscription c tknId NtfToken {ntfServer, ntfPrivKey} smpQueue nKey = diff --git a/src/Simplex/Messaging/Notifications/Client.hs b/src/Simplex/Messaging/Notifications/Client.hs index 273010c2c..a2a4f2ec9 100644 --- a/src/Simplex/Messaging/Notifications/Client.hs +++ b/src/Simplex/Messaging/Notifications/Client.hs @@ -49,8 +49,9 @@ ntfReplaceToken c pKey tknId token = okNtfCommand (TRPL token) c pKey tknId ntfDeleteToken :: NtfClient -> C.APrivateAuthKey -> NtfTokenId -> ExceptT NtfClientError IO () ntfDeleteToken = okNtfCommand TDEL -ntfEnableCron :: NtfClient -> C.APrivateAuthKey -> NtfTokenId -> Word16 -> ExceptT NtfClientError IO () -ntfEnableCron c pKey tknId int = okNtfCommand (TCRN int) c pKey tknId +-- set to 0 to disable +ntfSetCronInterval :: NtfClient -> C.APrivateAuthKey -> NtfTokenId -> Word16 -> ExceptT NtfClientError IO () +ntfSetCronInterval c pKey tknId int = okNtfCommand (TCRN int) c pKey tknId ntfCreateSubscription :: NtfClient -> C.APrivateAuthKey -> NewNtfEntity 'Subscription -> ExceptT NtfClientError IO NtfSubscriptionId ntfCreateSubscription c pKey newSub = diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index 55ab37885..5407a3bc9 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -40,7 +40,7 @@ import qualified Data.Text as T import qualified Data.Text.IO as T import Data.Text.Encoding (decodeLatin1) import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime) -import Data.Time.Clock.System (getSystemTime) +import Data.Time.Clock.System (SystemTime (..), getSystemTime) import Data.Time.Format.ISO8601 (iso8601Show) import GHC.IORef (atomicSwapIORef) import GHC.Stats (getRTSStats) @@ -76,7 +76,7 @@ import System.Environment (lookupEnv) import System.Exit (exitFailure, exitSuccess) import System.IO (BufferMode (..), hClose, hPrint, hPutStrLn, hSetBuffering, hSetNewlineMode, universalNewlineMode) import System.Mem.Weak (deRefWeak) -import UnliftIO (IOMode (..), UnliftIO, askUnliftIO, async, uninterruptibleCancel, unliftIO, withFile) +import UnliftIO (IOMode (..), UnliftIO, askUnliftIO, unliftIO, withFile) import UnliftIO.Concurrent (forkIO, killThread, mkWeakThreadId) import UnliftIO.Directory (doesFileExist, renameFile) import UnliftIO.Exception @@ -108,6 +108,7 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions} raceAny_ ( ntfSubscriber s : ntfPush ps + : periodicNtfsThread ps : map runServer transports <> serverStatsThread_ cfg <> prometheusMetricsThread_ cfg @@ -252,7 +253,7 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions} ntfActiveSubs <- getSMPSubMetrics a srvSubs ntfPendingSubs <- getSMPSubMetrics a pendingSrvSubs smpSessionCount <- M.size <$> readTVarIO smpSessions - apnsPushQLength <- fromIntegral <$> atomically (lengthTBQueue pushQ) + apnsPushQLength <- atomically $ lengthTBQueue pushQ pure NtfRealTimeMetrics {threadsCount, srvSubscribers, srvClients, srvSubWorkers, ntfActiveSubs, ntfPendingSubs, smpSessionCount, apnsPushQLength} where getSMPSubMetrics :: SMPClientAgent -> TMap SMPServer (TMap SMPSub a) -> IO NtfSMPSubMetrics @@ -463,16 +464,12 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge -- TODO [ntfdb] possibly, the subscriptions can be batched here and sent every say 5 seconds -- this should be analysed once we have prometheus stats subs <- atomically $ readTQueue subscriberSubQ - -- TODO [ntfdb] validate/partition that SMP server matches and log internal error if not updated <- liftIO $ batchUpdateSubStatus st subs NSPending logSubStatus smpServer "subscribing" (L.length subs) updated liftIO $ subscribeQueues smpServer subs - -- \| Subscribe to queues. The list of results can have a different order. - subscribeQueues :: SMPServer -> NonEmpty NtfSubRec -> IO () - subscribeQueues srv subs = subscribeQueuesNtfs ca srv (L.map sub subs) - where - sub NtfSubRec {smpQueue = SMPQueueNtf {notifierId}, notifierKey} = (notifierId, notifierKey) + subscribeQueues :: SMPServer -> NonEmpty ServerNtfSub -> IO () + subscribeQueues srv subs = subscribeQueuesNtfs ca srv (L.map snd subs) receiveSMP :: M () receiveSMP = forever $ do @@ -492,7 +489,7 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge let newNtf = PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta} ntfs_ <- liftIO $ addTokenLastNtf st newNtf forM_ ntfs_ $ \(tkn, lastNtfs) -> atomically $ writeTBQueue pushQ (tkn, PNMessage lastNtfs) - -- TODO [ntfdb] track queued notifications separately? + -- TODO [ntfdb] count queued notifications separately? incNtfStat ntfReceived Right SMP.END -> do whenM (atomically $ activeClientSession' ca sessionId srv) $ do @@ -554,31 +551,34 @@ ntfPush :: NtfPushServer -> M () ntfPush s@NtfPushServer {pushQ} = forever $ do (tkn@NtfTknRec {ntfTknId, token = t@(DeviceToken pp _), tknStatus}, ntf) <- atomically (readTBQueue pushQ) liftIO $ logDebug $ "sending push notification to " <> T.pack (show pp) + st <- asks store case ntf of PNVerification _ -> - deliverNotification pp tkn ntf >>= \case + liftIO (deliverNotification st pp tkn ntf) >>= \case Right _ -> do - st <- asks store void $ liftIO $ setTknStatusConfirmed st tkn incNtfStatT t ntfVrfDelivered Left _ -> incNtfStatT t ntfVrfFailed - PNCheckMessages -> checkActiveTkn tknStatus $ do - deliverNotification pp tkn ntf - >>= incNtfStatT t . (\case Left _ -> ntfCronFailed; Right () -> ntfCronDelivered) + PNCheckMessages -> do + liftIO (deliverNotification st pp tkn ntf) >>= \case + Right _ -> do + void $ liftIO $ updateTokenCronSentAt st ntfTknId . systemSeconds =<< getSystemTime + incNtfStatT t ntfCronDelivered + Left _ -> incNtfStatT t ntfCronFailed PNMessage {} -> checkActiveTkn tknStatus $ do stats <- asks serverStats liftIO $ updatePeriodStats (activeTokens stats) ntfTknId - deliverNotification pp tkn ntf + liftIO (deliverNotification st pp tkn ntf) >>= incNtfStatT t . (\case Left _ -> ntfFailed; Right () -> ntfDelivered) where checkActiveTkn :: NtfTknStatus -> M () -> M () checkActiveTkn status action | status == NTActive = action | otherwise = liftIO $ logError "bad notification token status" - deliverNotification :: PushProvider -> NtfTknRec -> PushNotification -> M (Either PushProviderError ()) - deliverNotification pp tkn@NtfTknRec {ntfTknId} ntf = do - deliver <- liftIO $ getPushClient s pp - liftIO (runExceptT $ deliver tkn ntf) >>= \case + deliverNotification :: NtfPostgresStore -> PushProvider -> NtfTknRec -> PushNotification -> IO (Either PushProviderError ()) + deliverNotification st pp tkn@NtfTknRec {ntfTknId} ntf = do + deliver <- getPushClient s pp + runExceptT (deliver tkn ntf) >>= \case Right _ -> pure $ Right () Left e -> case e of PPConnection _ -> retryDeliver @@ -586,24 +586,35 @@ ntfPush s@NtfPushServer {pushQ} = forever $ do PPCryptoError _ -> err e PPResponseError {} -> err e PPTokenInvalid r -> do - st <- asks store - void $ liftIO $ updateTknStatus st tkn $ NTInvalid $ Just r + void $ updateTknStatus st tkn $ NTInvalid $ Just r err e PPPermanentError -> err e where - retryDeliver :: M (Either PushProviderError ()) + retryDeliver :: IO (Either PushProviderError ()) retryDeliver = do - deliver <- liftIO $ newPushClient s pp - liftIO (runExceptT $ deliver tkn ntf) >>= \case + deliver <- newPushClient s pp + runExceptT (deliver tkn ntf) >>= \case Right _ -> pure $ Right () Left e -> case e of PPTokenInvalid r -> do - st <- asks store - void $ liftIO $ updateTknStatus st tkn $ NTInvalid $ Just r + void $ updateTknStatus st tkn $ NTInvalid $ Just r err e _ -> err e err e = logError ("Push provider error (" <> tshow pp <> ", " <> tshow ntfTknId <> "): " <> tshow e) $> Left e +-- TODO [ntfdb] this could be further improved by sending periodic notifications directly from this thread, +-- without any queue +periodicNtfsThread :: NtfPushServer -> M () +periodicNtfsThread NtfPushServer {pushQ} = do + st <- asks store + ntfsInterval <- asks $ periodicNtfsInterval . config + let interval = 1000000 * ntfsInterval + liftIO $ forever $ do + threadDelay interval + now <- systemSeconds <$> getSystemTime + cnt <- withPeriodicNtfTokens st now $ \tkn -> atomically $ writeTBQueue pushQ (tkn, PNCheckMessages) + logInfo $ "Scheduled periodic notifications: " <> tshow cnt + runNtfClientTransport :: Transport c => THandleNTF c 'TServer -> M () runNtfClientTransport th@THandle {params} = do qSize <- asks $ clientQSize . config @@ -692,7 +703,7 @@ verifyNtfTransmission st auth_ (tAuth, authorized, (corrId, entId, _)) = \case e -> VRFailed e client :: NtfServerClient -> NtfSubscriber -> NtfPushServer -> M () -client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPushServer {pushQ, intervalNotifiers} = +client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPushServer {pushQ} = forever $ atomically (readTBQueue rcvQ) >>= mapM processCommand @@ -728,11 +739,7 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu TVFY code -- this allows repeated verification for cases when client connection dropped before server response | (tknStatus == NTRegistered || tknStatus == NTConfirmed || tknStatus == NTActive) && tknRegCode == code -> do logDebug "TVFY - token verified" - withNtfStore (`setTokenActive` tkn) $ \tIds -> do - -- TODO [ntfdb] this will be unnecessary if all cron notifications move to one thread - forM_ tIds cancelInvervalNotifications - incNtfStatT token tknVerified - pure NROk + withNtfStore (`setTokenActive` tkn) $ \_ -> NROk <$ incNtfStatT token tknVerified | otherwise -> do logDebug "TVFY - incorrect code or token status" pure $ NRErr AUTH @@ -754,45 +761,24 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu forM_ ss $ \(smpServer, nIds) -> do atomically $ removeSubscriptions ca smpServer SPNotifier nIds atomically $ removePendingSubs ca smpServer SPNotifier nIds - cancelInvervalNotifications tknId incNtfStatT token tknDeleted pure NROk TCRN 0 -> do logDebug "TCRN 0" - withNtfStore (\st -> updateTknCronInterval st ntfTknId 0) $ \_ -> do - -- TODO [ntfdb] move cron intervals to one thread - cancelInvervalNotifications tknId - pure NROk + withNtfStore (\st -> updateTknCronInterval st ntfTknId 0) $ \_ -> pure NROk TCRN int | int < 20 -> pure $ NRErr QUOTA | otherwise -> do logDebug "TCRN" - withNtfStore (\st -> updateTknCronInterval st ntfTknId int) $ \_ -> do - -- TODO [ntfdb] move cron intervals to one thread - liftIO (TM.lookupIO tknId intervalNotifiers) >>= \case - Nothing -> runIntervalNotifier int - Just IntervalNotifier {interval, action} -> - unless (interval == int) $ do - uninterruptibleCancel action - runIntervalNotifier int - pure NROk - where - runIntervalNotifier interval = do - action <- async . intervalNotifier $ fromIntegral interval * 1000000 * 60 - let notifier = IntervalNotifier {action, token = tkn, interval} - atomically $ TM.insert tknId notifier intervalNotifiers - where - intervalNotifier delay = forever $ do - liftIO $ threadDelay' delay - atomically $ writeTBQueue pushQ (tkn, PNCheckMessages) - NtfReqNew corrId (ANE SSubscription newSub@(NewNtfSub _ (SMPQueueNtf srv _) _)) -> do + withNtfStore (\st -> updateTknCronInterval st ntfTknId int) $ \_ -> pure NROk + NtfReqNew corrId (ANE SSubscription newSub@(NewNtfSub _ (SMPQueueNtf srv nId) nKey)) -> do logDebug "SNEW - new subscription" subId <- getId let sub = mkNtfSubRec subId newSub resp <- withNtfStore (`addNtfSubscription` sub) $ \case True -> do - atomically $ writeTBQueue newSubQ (srv, [sub]) + atomically $ writeTBQueue newSubQ (srv, [(subId, (nId, nKey))]) incNtfStat subCreated pure $ NRSubId subId False -> pure $ NRErr AUTH @@ -823,10 +809,6 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu getRegCode = NtfRegCode <$> (randomBytes =<< asks (regCodeBytes . config)) randomBytes :: Int -> M ByteString randomBytes n = atomically . C.randomBytes n =<< asks random - cancelInvervalNotifications :: NtfTokenId -> M () - cancelInvervalNotifications tknId = - atomically (TM.lookupDelete tknId intervalNotifiers) - >>= mapM_ (uninterruptibleCancel . action) withNtfStore :: (NtfPostgresStore -> IO (Either ErrorType a)) -> (a -> M NtfResponse) -> M NtfResponse withNtfStore stAction continue = do diff --git a/src/Simplex/Messaging/Notifications/Server/Env.hs b/src/Simplex/Messaging/Notifications/Server/Env.hs index 6488cbb11..415f341cb 100644 --- a/src/Simplex/Messaging/Notifications/Server/Env.hs +++ b/src/Simplex/Messaging/Notifications/Server/Env.hs @@ -1,6 +1,7 @@ {-# LANGUAGE DataKinds #-} {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE GADTs #-} +{-# LANGUAGE LambdaCase #-} {-# LANGUAGE KindSignatures #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} @@ -8,33 +9,38 @@ module Simplex.Messaging.Notifications.Server.Env where import Control.Concurrent (ThreadId) -import Control.Concurrent.Async (Async) +import Control.Logger.Simple +import Control.Monad import Crypto.Random import Data.Int (Int64) import Data.List.NonEmpty (NonEmpty) +import qualified Data.Text as T import Data.Time.Clock (getCurrentTime) import Data.Time.Clock.System (SystemTime) -import Data.Word (Word16) import Data.X509.Validation (Fingerprint (..)) import Network.Socket -import qualified Network.TLS as T +import qualified Network.TLS as TLS import Numeric.Natural import Simplex.Messaging.Client.Agent import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Notifications.Protocol import Simplex.Messaging.Notifications.Server.Push.APNS import Simplex.Messaging.Notifications.Server.Stats +import Simplex.Messaging.Notifications.Server.Store (newNtfSTMStore) import Simplex.Messaging.Notifications.Server.Store.Postgres import Simplex.Messaging.Notifications.Server.Store.Types +import Simplex.Messaging.Notifications.Server.StoreLog (readWriteNtfSTMStore) import Simplex.Messaging.Notifications.Transport (NTFVersion, VersionRangeNTF) import Simplex.Messaging.Protocol (BasicAuth, CorrId, SMPServer, Transmission) -import Simplex.Messaging.Server.Env.STM (StartOptions) +import Simplex.Messaging.Server.Env.STM (StartOptions (..)) import Simplex.Messaging.Server.Expiration -import Simplex.Messaging.Server.QueueStore.Postgres.Config (PostgresStoreCfg) +import Simplex.Messaging.Server.QueueStore.Postgres.Config (PostgresStoreCfg (..)) +import Simplex.Messaging.Server.StoreLog (closeStoreLog) import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (ATransport, THandleParams, TransportPeer (..)) import Simplex.Messaging.Transport.Server (AddHTTP, ServerCredentials, TransportServerConfig, loadFingerprint, loadServerCredential) +import System.Exit (exitFailure) import System.Mem.Weak (Weak) import UnliftIO.STM @@ -54,6 +60,7 @@ data NtfServerConfig = NtfServerConfig inactiveClientExpiration :: Maybe ExpirationConfig, dbStoreConfig :: PostgresStoreCfg, ntfCredentials :: ServerCredentials, + periodicNtfsInterval :: Int, -- seconds -- stats config - see SMP server config logStatsInterval :: Maybe Int64, logStatsStartTime :: Int64, @@ -80,29 +87,34 @@ data NtfEnv = NtfEnv pushServer :: NtfPushServer, store :: NtfPostgresStore, random :: TVar ChaChaDRG, - tlsServerCreds :: T.Credential, + tlsServerCreds :: TLS.Credential, serverIdentity :: C.KeyHash, serverStats :: NtfServerStats } newNtfServerEnv :: NtfServerConfig -> IO NtfEnv -newNtfServerEnv config@NtfServerConfig {subQSize, pushQSize, smpAgentCfg, apnsConfig, dbStoreConfig, ntfCredentials} = do +newNtfServerEnv config@NtfServerConfig {subQSize, pushQSize, smpAgentCfg, apnsConfig, dbStoreConfig, ntfCredentials, startOptions} = do + when (compactLog startOptions) $ compactDbStoreLog $ dbStoreLogPath dbStoreConfig random <- C.newRandom store <- newNtfDbStore dbStoreConfig - -- TODO [ntfdb] this should happen with compacting on start - -- logInfo "restoring subscriptions..." - -- storeLog <- mapM (`readWriteNtfStore` store) storeLogFile - -- logInfo "restored subscriptions" subscriber <- newNtfSubscriber subQSize smpAgentCfg random pushServer <- newNtfPushServer pushQSize apnsConfig tlsServerCreds <- loadServerCredential ntfCredentials Fingerprint fp <- loadFingerprint ntfCredentials serverStats <- newNtfServerStats =<< getCurrentTime pure NtfEnv {config, subscriber, pushServer, store, random, tlsServerCreds, serverIdentity = C.KeyHash fp, serverStats} + where + compactDbStoreLog = \case + Just f -> do + logInfo $ "compacting store log " <> T.pack f + newNtfSTMStore >>= readWriteNtfSTMStore False f >>= closeStoreLog + Nothing -> do + logError "Error: `--compact-log` used without `enable: on` option in STORE_LOG section of INI file" + exitFailure data NtfSubscriber = NtfSubscriber { smpSubscribers :: TMap SMPServer SMPSubscriber, - newSubQ :: TBQueue (SMPServer, NonEmpty NtfSubRec), -- should match SMPServer + newSubQ :: TBQueue (SMPServer, NonEmpty ServerNtfSub), smpAgent :: SMPClientAgent } @@ -115,7 +127,7 @@ newNtfSubscriber qSize smpAgentCfg random = do data SMPSubscriber = SMPSubscriber { smpServer :: SMPServer, - subscriberSubQ :: TQueue (NonEmpty NtfSubRec), + subscriberSubQ :: TQueue (NonEmpty ServerNtfSub), subThreadId :: TVar (Maybe (Weak ThreadId)) } @@ -128,22 +140,14 @@ newSMPSubscriber smpServer = do data NtfPushServer = NtfPushServer { pushQ :: TBQueue (NtfTknRec, PushNotification), pushClients :: TMap PushProvider PushProviderClient, - intervalNotifiers :: TMap NtfTokenId IntervalNotifier, apnsConfig :: APNSPushClientConfig } -data IntervalNotifier = IntervalNotifier - { action :: Async (), - token :: NtfTknRec, - interval :: Word16 - } - newNtfPushServer :: Natural -> APNSPushClientConfig -> IO NtfPushServer newNtfPushServer qSize apnsConfig = do pushQ <- newTBQueueIO qSize pushClients <- TM.emptyIO - intervalNotifiers <- TM.emptyIO - pure NtfPushServer {pushQ, pushClients, intervalNotifiers, apnsConfig} + pure NtfPushServer {pushQ, pushClients, apnsConfig} newPushClient :: NtfPushServer -> PushProvider -> IO PushProviderClient newPushClient NtfPushServer {apnsConfig, pushClients} pp = do diff --git a/src/Simplex/Messaging/Notifications/Server/Main.hs b/src/Simplex/Messaging/Notifications/Server/Main.hs index be3210be1..f59c565bc 100644 --- a/src/Simplex/Messaging/Notifications/Server/Main.hs +++ b/src/Simplex/Messaging/Notifications/Server/Main.hs @@ -263,6 +263,7 @@ ntfServerCLI cfgPath logPath = privateKeyFile = c serverKeyFile, certificateFile = c serverCrtFile }, + periodicNtfsInterval = 5 * 60, -- 5 minutes logStatsInterval = logStats $> 86400, -- seconds logStatsStartTime = 0, -- seconds from 00:00 UTC serverStatsLogFile = combine logPath "ntf-server-stats.daily.log", diff --git a/src/Simplex/Messaging/Notifications/Server/Prometheus.hs b/src/Simplex/Messaging/Notifications/Server/Prometheus.hs index a3399c27f..78d5b4d38 100644 --- a/src/Simplex/Messaging/Notifications/Server/Prometheus.hs +++ b/src/Simplex/Messaging/Notifications/Server/Prometheus.hs @@ -13,6 +13,7 @@ import qualified Data.Text as T import Data.Time.Clock (UTCTime (..), diffUTCTime) import Data.Time.Clock.System (systemEpochDay) import Data.Time.Format.ISO8601 (iso8601Show) +import Numeric.Natural (Natural) import Simplex.Messaging.Notifications.Server.Stats import Simplex.Messaging.Server.Stats (PeriodStatCounts (..)) import Simplex.Messaging.Transport (simplexMQVersion) @@ -32,13 +33,13 @@ rtsOptionsEnv = "NTF_RTS_OPTIONS" data NtfRealTimeMetrics = NtfRealTimeMetrics { threadsCount :: Int, - srvSubscribers :: NtfSMPWorkerMetrics, -- smpSubscribers - srvClients :: NtfSMPWorkerMetrics, -- smpClients - srvSubWorkers :: NtfSMPWorkerMetrics, -- smpSubWorkers - ntfActiveSubs :: NtfSMPSubMetrics, -- srvSubs - ntfPendingSubs :: NtfSMPSubMetrics, -- pendingSrvSubs - smpSessionCount :: Int, -- smpSessions - apnsPushQLength :: Int -- lengthTBQueue pushQ + srvSubscribers :: NtfSMPWorkerMetrics, + srvClients :: NtfSMPWorkerMetrics, + srvSubWorkers :: NtfSMPWorkerMetrics, + ntfActiveSubs :: NtfSMPSubMetrics, + ntfPendingSubs :: NtfSMPSubMetrics, + smpSessionCount :: Int, + apnsPushQLength :: Natural } data NtfSMPWorkerMetrics = NtfSMPWorkerMetrics {ownServers :: [Text], otherServers :: Int} @@ -212,9 +213,9 @@ ntfPrometheusMetrics sm rtm ts = \# TYPE simplex_ntf_smp_sessions_count gauge\n\ \simplex_ntf_smp_sessions_count " <> mshow smpSessionCount <> "\n# smpSessionCount\n\ \\n\ - \# HELP simplex_ntf_apns_queue_length Count of notifications in push queue\n\ - \# TYPE simplex_ntf_apns_queue_length gauge\n\ - \simplex_ntf_apns_queue_length " <> mshow apnsPushQLength <> "\n# apnsPushQLength\n\ + \# HELP simplex_ntf_apns_push_queue_length Count of notifications in push queue\n\ + \# TYPE simplex_ntf_apns_push_queue_length gauge\n\ + \simplex_ntf_apns_push_queue_length " <> mshow apnsPushQLength <> "\n# apnsPushQLength\n\ \\n" showSubMetric NtfSMPSubMetrics {ownSrvSubs, otherServers, otherSrvSubCount} mPfx descrPfx = showOwnSrvSubs <> showOtherSrvSubs diff --git a/src/Simplex/Messaging/Notifications/Server/Store/Migrations.hs b/src/Simplex/Messaging/Notifications/Server/Store/Migrations.hs index a9de42668..700be059f 100644 --- a/src/Simplex/Messaging/Notifications/Server/Store/Migrations.hs +++ b/src/Simplex/Messaging/Notifications/Server/Store/Migrations.hs @@ -33,14 +33,14 @@ CREATE TABLE tokens( dh_priv_key BYTEA NOT NULL, dh_secret BYTEA NOT NULL, reg_code BYTEA NOT NULL, - cron_interval BIGINT NOT NULL, - cron_sent_at BIGINT, + cron_interval BIGINT NOT NULL, -- minutes + cron_sent_at BIGINT, -- seconds updated_at BIGINT, PRIMARY KEY (token_id) ); CREATE UNIQUE INDEX idx_tokens_push_provider_token ON tokens(push_provider, push_provider_token, verify_key); -CREATE INDEX idx_tokens_cron_sent_at ON tokens((cron_sent_at + cron_interval)); +CREATE INDEX idx_tokens_status_cron_interval_sent_at ON tokens(status, cron_interval, (cron_sent_at + cron_interval * 60)); CREATE TABLE smp_servers( smp_server_id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, @@ -69,7 +69,7 @@ CREATE TABLE last_notifications( token_ntf_id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, token_id BYTEA NOT NULL REFERENCES tokens ON DELETE CASCADE ON UPDATE RESTRICT, subscription_id BYTEA NOT NULL REFERENCES subscriptions ON DELETE CASCADE ON UPDATE RESTRICT, - sent_at BIGINT NOT NULL, + sent_at TIMESTAMPTZ NOT NULL, nmsg_nonce BYTEA NOT NULL, nmsg_data BYTEA NOT NULL ); diff --git a/src/Simplex/Messaging/Notifications/Server/Store/Postgres.hs b/src/Simplex/Messaging/Notifications/Server/Store/Postgres.hs index 3aa1e7e31..031eda711 100644 --- a/src/Simplex/Messaging/Notifications/Server/Store/Postgres.hs +++ b/src/Simplex/Messaging/Notifications/Server/Store/Postgres.hs @@ -14,7 +14,7 @@ {-# LANGUAGE TupleSections #-} {-# LANGUAGE TypeApplications #-} {-# LANGUAGE TypeOperators #-} -{-# OPTIONS_GHC -fno-warn-orphans #-} +{-# OPTIONS_GHC -fno-warn-orphans -fno-warn-ambiguous-fields #-} module Simplex.Messaging.Notifications.Server.Store.Postgres where @@ -42,7 +42,8 @@ import qualified Data.Set as S import Data.Text (Text) import qualified Data.Text as T import Data.Text.Encoding (decodeLatin1, encodeUtf8) -import Data.Time.Clock.System (SystemTime (..)) +import Data.Time.Clock (UTCTime) +import Data.Time.Clock.System (SystemTime (..), systemToUTCTime, utcToSystemTime) import Data.Word (Word16) import Database.PostgreSQL.Simple (Binary (..), In (..), Only (..), Query, ToRow, (:.) (..)) import qualified Database.PostgreSQL.Simple as DB @@ -253,14 +254,14 @@ getUsedSMPServers st = |] (Only (In [NSNew, NSPending, NSActive, NSInactive])) -foldNtfSubscriptions :: NtfPostgresStore -> SMPServer -> Int -> s -> (s -> NtfSubRec -> IO s) -> IO s +foldNtfSubscriptions :: NtfPostgresStore -> SMPServer -> Int -> s -> (s -> ServerNtfSub -> IO s) -> IO s foldNtfSubscriptions st srv fetchCount state action = withConnection (dbStore st) $ \db -> - DB.foldWithOptions opts db query params state $ \s -> action s . toNtfSub + DB.foldWithOptions opts db query params state $ \s -> action s . toServerNtfSub where query = [sql| - SELECT s.subscription_id, s.token_id, s.smp_notifier_id, s.status, s.smp_notifier_key + SELECT s.subscription_id, s.smp_notifier_id, s.smp_notifier_key FROM subscriptions s JOIN smp_servers p ON p.smp_server_id = s.smp_server_id WHERE p.smp_host = ? AND p.smp_port = ? AND p.smp_keyhash = ? @@ -268,8 +269,7 @@ foldNtfSubscriptions st srv fetchCount state action = |] params = srvToRow srv :. Only (In [NSNew, NSPending, NSActive, NSInactive]) opts = DB.defaultFoldOptions {DB.fetchQuantity = DB.Fixed fetchCount} - toNtfSub (ntfSubId, tokenId, nId, subStatus, notifierKey) = - NtfSubRec {ntfSubId, tokenId, smpQueue = SMPQueueNtf srv nId, subStatus, notifierKey} + toServerNtfSub (ntfSubId, notifierId, notifierKey) = (ntfSubId, (notifierId, notifierKey)) -- Returns token and subscription. -- If subscription exists but belongs to another token, returns Left AUTH @@ -347,7 +347,7 @@ setTknStatusConfirmed st NtfTknRec {ntfTknId} = updated <- DB.execute db "UPDATE tokens SET status = ? WHERE token_id = ? AND status != ? AND status != ?" (NTConfirmed, ntfTknId, NTConfirmed, NTActive) when (updated > 0) $ withLog "updateTknStatus" st $ \sl -> logTokenStatus sl ntfTknId NTConfirmed -setTokenActive :: NtfPostgresStore -> NtfTknRec -> IO (Either ErrorType [NtfTokenId]) +setTokenActive :: NtfPostgresStore -> NtfTknRec -> IO (Either ErrorType ()) setTokenActive st tkn@NtfTknRec {ntfTknId, token = DeviceToken pp ppToken} = withDB' "setTokenActive" st $ \db -> do updateTknStatus_ st db tkn NTActive @@ -363,7 +363,17 @@ setTokenActive st tkn@NtfTknRec {ntfTknId, token = DeviceToken pp ppToken} = |] (pp, Binary ppToken, ntfTknId) withLog "deleteNtfToken" st $ \sl -> mapM_ (logDeleteToken sl) tknIds - pure tknIds + +withPeriodicNtfTokens :: NtfPostgresStore -> Int64 -> (NtfTknRec -> IO ()) -> IO Int +withPeriodicNtfTokens st now notify = + fmap (fromRight 0) $ withDB' "withPeriodicNtfTokens" st $ \db -> + DB.fold db (ntfTknQuery <> " WHERE status = ? AND cron_interval != 0 AND (cron_sent_at + cron_interval * 60) < ?") (NTActive, now) 0 $ \ !n row -> do + notify (rowToNtfTkn row) $> (n + 1) + +updateTokenCronSentAt :: NtfPostgresStore -> NtfTokenId -> Int64 -> IO (Either ErrorType ()) +updateTokenCronSentAt st tknId now = + withDB' "updateTokenCronSentAt" st $ \db -> + void $ DB.execute db "UPDATE tokens t SET cron_sent_at = ? WHERE token_id = ?" (now, tknId) addNtfSubscription :: NtfPostgresStore -> NtfSubRec -> IO (Either ErrorType Bool) addNtfSubscription st sub = @@ -495,10 +505,10 @@ batchUpdateStatus_ st srv mkParams = |] (srvToRow srv) -batchUpdateSubStatus :: NtfPostgresStore -> NonEmpty NtfSubRec -> NtfSubStatus -> IO Int64 +batchUpdateSubStatus :: NtfPostgresStore -> NonEmpty ServerNtfSub -> NtfSubStatus -> IO Int64 batchUpdateSubStatus st subs status = fmap (fromRight (-1)) $ withDB' "batchUpdateSubStatus" st $ \db -> do - let params = L.toList $ L.map (\NtfSubRec {ntfSubId} -> (status, ntfSubId)) subs + let params = L.toList $ L.map (\(subId, _) -> (status, subId)) subs subIds <- DB.returning db @@ -572,7 +582,7 @@ addTokenLastNtf st newNtf = JOIN smp_servers p ON p.smp_server_id = s.smp_server_id ORDER BY token_ntf_id DESC |] - (tId, sId, ntfTs, nmsgNonce, Binary encNMsgMeta, tId, maxNtfs, tId) + (tId, sId, systemToUTCTime ntfTs, nmsgNonce, Binary encNMsgMeta, tId, maxNtfs, tId) let lastNtfs = fromMaybe (newNtf :| []) (L.nonEmpty lastNtfs_) pure (tkn, lastNtfs) where @@ -581,9 +591,10 @@ addTokenLastNtf st newNtf = toTokenSubId :: NtfTknRow :. Only NtfSubscriptionId -> (NtfTknRec, NtfSubscriptionId) toTokenSubId (tknRow :. Only sId) = (rowToNtfTkn tknRow, sId) -toLastNtf :: SMPQueueNtfRow :. (SystemTime, C.CbNonce, Binary EncNMsgMeta) -> PNMessageData +toLastNtf :: SMPQueueNtfRow :. (UTCTime, C.CbNonce, Binary EncNMsgMeta) -> PNMessageData toLastNtf (qRow :. (ts, nonce, Binary encMeta)) = - PNMessageData {smpQueue = rowToSMPQueue qRow, ntfTs = ts, nmsgNonce = nonce, encNMsgMeta = encMeta} + let ntfTs = MkSystemTime (systemSeconds $ utcToSystemTime ts) 0 + in PNMessageData {smpQueue = rowToSMPQueue qRow, ntfTs, nmsgNonce = nonce, encNMsgMeta = encMeta} getEntityCounts :: NtfPostgresStore -> IO (Int64, Int64, Int64) getEntityCounts st = @@ -609,9 +620,11 @@ importNtfSTMStore NtfPostgresStore {dbStore = s} stmStore = do tokens <- filterTokens allTokens let skipped = length allTokens - length tokens when (skipped /= 0) $ putStrLn $ "Total skipped tokens " <> show skipped - -- uncomment this line instead of the next to import tokens one by one. + -- uncomment this line instead of the next two to import tokens one by one. -- tCnt <- withConnection s $ \db -> foldM (importTkn db) 0 tokens - tRows <- mapM (fmap ntfTknToRow . mkTknRec) tokens + -- token interval is reset to 0 to only send notifications to devices with periodic mode, + -- and before clients are upgraded - to all active devices. + tRows <- mapM (fmap (ntfTknToRow . (\t -> t {tknCronInterval = 0} :: NtfTknRec)) . mkTknRec) tokens tCnt <- withConnection s $ \db -> DB.executeMany db insertNtfTknQuery tRows let tokenIds = S.fromList $ map (\NtfTknData {ntfTknId} -> ntfTknId) tokens (tokenIds,) <$> checkCount "token" (length tokens) tCnt @@ -723,7 +736,7 @@ importNtfSTMStore NtfPostgresStore {dbStore = s} stmStore = do where ntfRow (!qs, !rows) PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta} = case M.lookup smpQueue subLookup of Just ntfSubId -> - let row = (tId, ntfSubId, ntfTs, nmsgNonce, Binary encNMsgMeta) + let row = (tId, ntfSubId, systemToUTCTime ntfTs, nmsgNonce, Binary encNMsgMeta) in (qs, row : rows) Nothing -> (S.insert smpQueue qs, rows) checkCount name expected inserted @@ -806,11 +819,6 @@ assertUpdated :: Int64 -> Either ErrorType () assertUpdated 0 = Left AUTH assertUpdated _ = Right () --- TODO [ntfdb] change instance and maybe field type to not round to a second, for more reliable sorting of the most recent notifications -instance FromField SystemTime where fromField f = fmap (`MkSystemTime` 0) . fromField f - -instance ToField SystemTime where toField = toField . systemSeconds - instance FromField NtfSubStatus where fromField = fromTextField_ $ either (const Nothing) Just . smpDecode . encodeUtf8 instance ToField NtfSubStatus where toField = toField . decodeLatin1 . smpEncode diff --git a/src/Simplex/Messaging/Notifications/Server/Store/Types.hs b/src/Simplex/Messaging/Notifications/Server/Store/Types.hs index 802906386..76233290b 100644 --- a/src/Simplex/Messaging/Notifications/Server/Store/Types.hs +++ b/src/Simplex/Messaging/Notifications/Server/Store/Types.hs @@ -14,7 +14,7 @@ import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding.String import Simplex.Messaging.Notifications.Protocol (DeviceToken, NtfRegCode, NtfSubStatus, NtfSubscriptionId, NtfTokenId, NtfTknStatus, SMPQueueNtf) import Simplex.Messaging.Notifications.Server.Store (NtfSubData (..), NtfTknData (..)) -import Simplex.Messaging.Protocol (NtfPrivateAuthKey, NtfPublicAuthKey) +import Simplex.Messaging.Protocol (NotifierId, NtfPrivateAuthKey, NtfPublicAuthKey) import Simplex.Messaging.Server.QueueStore (RoundedSystemTime) data NtfTknRec = NtfTknRec @@ -81,6 +81,8 @@ data NtfSubRec = NtfSubRec } deriving (Show) +type ServerNtfSub = (NtfSubscriptionId, (NotifierId, NtfPrivateAuthKey)) + mkSubData :: NtfSubRec -> IO NtfSubData mkSubData NtfSubRec {ntfSubId, smpQueue, notifierKey, tokenId, subStatus = status} = do subStatus <- newTVarIO status diff --git a/tests/NtfClient.hs b/tests/NtfClient.hs index 22bb3abaa..11d0400d6 100644 --- a/tests/NtfClient.hs +++ b/tests/NtfClient.hs @@ -152,6 +152,7 @@ ntfServerCfg = privateKeyFile = "tests/fixtures/server.key", certificateFile = "tests/fixtures/server.crt" }, + periodicNtfsInterval = 1, -- stats config logStatsInterval = Nothing, logStatsStartTime = 0,