ntf server, agent: send all periodic notifications from one thread, only to old active clients or new clients with periodic notification mode (#1528)

* ntf server, agent: send all periodic notifications from one thread, only to old active clients or new clients with periodic notification mode

* send different type via subscription queues

* option to compact store log on start
This commit is contained in:
Evgeny
2025-04-28 12:18:55 +01:00
committed by GitHub
parent f024ab1c3f
commit 7d0115daec
11 changed files with 137 additions and 131 deletions
+44 -62
View File
@@ -40,7 +40,7 @@ import qualified Data.Text as T
import qualified Data.Text.IO as T
import Data.Text.Encoding (decodeLatin1)
import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime)
import Data.Time.Clock.System (getSystemTime)
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
import Data.Time.Format.ISO8601 (iso8601Show)
import GHC.IORef (atomicSwapIORef)
import GHC.Stats (getRTSStats)
@@ -76,7 +76,7 @@ import System.Environment (lookupEnv)
import System.Exit (exitFailure, exitSuccess)
import System.IO (BufferMode (..), hClose, hPrint, hPutStrLn, hSetBuffering, hSetNewlineMode, universalNewlineMode)
import System.Mem.Weak (deRefWeak)
import UnliftIO (IOMode (..), UnliftIO, askUnliftIO, async, uninterruptibleCancel, unliftIO, withFile)
import UnliftIO (IOMode (..), UnliftIO, askUnliftIO, unliftIO, withFile)
import UnliftIO.Concurrent (forkIO, killThread, mkWeakThreadId)
import UnliftIO.Directory (doesFileExist, renameFile)
import UnliftIO.Exception
@@ -108,6 +108,7 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions}
raceAny_
( ntfSubscriber s
: ntfPush ps
: periodicNtfsThread ps
: map runServer transports
<> serverStatsThread_ cfg
<> prometheusMetricsThread_ cfg
@@ -252,7 +253,7 @@ ntfServer cfg@NtfServerConfig {transports, transportConfig = tCfg, startOptions}
ntfActiveSubs <- getSMPSubMetrics a srvSubs
ntfPendingSubs <- getSMPSubMetrics a pendingSrvSubs
smpSessionCount <- M.size <$> readTVarIO smpSessions
apnsPushQLength <- fromIntegral <$> atomically (lengthTBQueue pushQ)
apnsPushQLength <- atomically $ lengthTBQueue pushQ
pure NtfRealTimeMetrics {threadsCount, srvSubscribers, srvClients, srvSubWorkers, ntfActiveSubs, ntfPendingSubs, smpSessionCount, apnsPushQLength}
where
getSMPSubMetrics :: SMPClientAgent -> TMap SMPServer (TMap SMPSub a) -> IO NtfSMPSubMetrics
@@ -463,16 +464,12 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge
-- TODO [ntfdb] possibly, the subscriptions can be batched here and sent every say 5 seconds
-- this should be analysed once we have prometheus stats
subs <- atomically $ readTQueue subscriberSubQ
-- TODO [ntfdb] validate/partition that SMP server matches and log internal error if not
updated <- liftIO $ batchUpdateSubStatus st subs NSPending
logSubStatus smpServer "subscribing" (L.length subs) updated
liftIO $ subscribeQueues smpServer subs
-- \| Subscribe to queues. The list of results can have a different order.
subscribeQueues :: SMPServer -> NonEmpty NtfSubRec -> IO ()
subscribeQueues srv subs = subscribeQueuesNtfs ca srv (L.map sub subs)
where
sub NtfSubRec {smpQueue = SMPQueueNtf {notifierId}, notifierKey} = (notifierId, notifierKey)
subscribeQueues :: SMPServer -> NonEmpty ServerNtfSub -> IO ()
subscribeQueues srv subs = subscribeQueuesNtfs ca srv (L.map snd subs)
receiveSMP :: M ()
receiveSMP = forever $ do
@@ -492,7 +489,7 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge
let newNtf = PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta}
ntfs_ <- liftIO $ addTokenLastNtf st newNtf
forM_ ntfs_ $ \(tkn, lastNtfs) -> atomically $ writeTBQueue pushQ (tkn, PNMessage lastNtfs)
-- TODO [ntfdb] track queued notifications separately?
-- TODO [ntfdb] count queued notifications separately?
incNtfStat ntfReceived
Right SMP.END -> do
whenM (atomically $ activeClientSession' ca sessionId srv) $ do
@@ -554,31 +551,34 @@ ntfPush :: NtfPushServer -> M ()
ntfPush s@NtfPushServer {pushQ} = forever $ do
(tkn@NtfTknRec {ntfTknId, token = t@(DeviceToken pp _), tknStatus}, ntf) <- atomically (readTBQueue pushQ)
liftIO $ logDebug $ "sending push notification to " <> T.pack (show pp)
st <- asks store
case ntf of
PNVerification _ ->
deliverNotification pp tkn ntf >>= \case
liftIO (deliverNotification st pp tkn ntf) >>= \case
Right _ -> do
st <- asks store
void $ liftIO $ setTknStatusConfirmed st tkn
incNtfStatT t ntfVrfDelivered
Left _ -> incNtfStatT t ntfVrfFailed
PNCheckMessages -> checkActiveTkn tknStatus $ do
deliverNotification pp tkn ntf
>>= incNtfStatT t . (\case Left _ -> ntfCronFailed; Right () -> ntfCronDelivered)
PNCheckMessages -> do
liftIO (deliverNotification st pp tkn ntf) >>= \case
Right _ -> do
void $ liftIO $ updateTokenCronSentAt st ntfTknId . systemSeconds =<< getSystemTime
incNtfStatT t ntfCronDelivered
Left _ -> incNtfStatT t ntfCronFailed
PNMessage {} -> checkActiveTkn tknStatus $ do
stats <- asks serverStats
liftIO $ updatePeriodStats (activeTokens stats) ntfTknId
deliverNotification pp tkn ntf
liftIO (deliverNotification st pp tkn ntf)
>>= incNtfStatT t . (\case Left _ -> ntfFailed; Right () -> ntfDelivered)
where
checkActiveTkn :: NtfTknStatus -> M () -> M ()
checkActiveTkn status action
| status == NTActive = action
| otherwise = liftIO $ logError "bad notification token status"
deliverNotification :: PushProvider -> NtfTknRec -> PushNotification -> M (Either PushProviderError ())
deliverNotification pp tkn@NtfTknRec {ntfTknId} ntf = do
deliver <- liftIO $ getPushClient s pp
liftIO (runExceptT $ deliver tkn ntf) >>= \case
deliverNotification :: NtfPostgresStore -> PushProvider -> NtfTknRec -> PushNotification -> IO (Either PushProviderError ())
deliverNotification st pp tkn@NtfTknRec {ntfTknId} ntf = do
deliver <- getPushClient s pp
runExceptT (deliver tkn ntf) >>= \case
Right _ -> pure $ Right ()
Left e -> case e of
PPConnection _ -> retryDeliver
@@ -586,24 +586,35 @@ ntfPush s@NtfPushServer {pushQ} = forever $ do
PPCryptoError _ -> err e
PPResponseError {} -> err e
PPTokenInvalid r -> do
st <- asks store
void $ liftIO $ updateTknStatus st tkn $ NTInvalid $ Just r
void $ updateTknStatus st tkn $ NTInvalid $ Just r
err e
PPPermanentError -> err e
where
retryDeliver :: M (Either PushProviderError ())
retryDeliver :: IO (Either PushProviderError ())
retryDeliver = do
deliver <- liftIO $ newPushClient s pp
liftIO (runExceptT $ deliver tkn ntf) >>= \case
deliver <- newPushClient s pp
runExceptT (deliver tkn ntf) >>= \case
Right _ -> pure $ Right ()
Left e -> case e of
PPTokenInvalid r -> do
st <- asks store
void $ liftIO $ updateTknStatus st tkn $ NTInvalid $ Just r
void $ updateTknStatus st tkn $ NTInvalid $ Just r
err e
_ -> err e
err e = logError ("Push provider error (" <> tshow pp <> ", " <> tshow ntfTknId <> "): " <> tshow e) $> Left e
-- TODO [ntfdb] this could be further improved by sending periodic notifications directly from this thread,
-- without any queue
periodicNtfsThread :: NtfPushServer -> M ()
periodicNtfsThread NtfPushServer {pushQ} = do
st <- asks store
ntfsInterval <- asks $ periodicNtfsInterval . config
let interval = 1000000 * ntfsInterval
liftIO $ forever $ do
threadDelay interval
now <- systemSeconds <$> getSystemTime
cnt <- withPeriodicNtfTokens st now $ \tkn -> atomically $ writeTBQueue pushQ (tkn, PNCheckMessages)
logInfo $ "Scheduled periodic notifications: " <> tshow cnt
runNtfClientTransport :: Transport c => THandleNTF c 'TServer -> M ()
runNtfClientTransport th@THandle {params} = do
qSize <- asks $ clientQSize . config
@@ -692,7 +703,7 @@ verifyNtfTransmission st auth_ (tAuth, authorized, (corrId, entId, _)) = \case
e -> VRFailed e
client :: NtfServerClient -> NtfSubscriber -> NtfPushServer -> M ()
client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPushServer {pushQ, intervalNotifiers} =
client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPushServer {pushQ} =
forever $
atomically (readTBQueue rcvQ)
>>= mapM processCommand
@@ -728,11 +739,7 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
TVFY code -- this allows repeated verification for cases when client connection dropped before server response
| (tknStatus == NTRegistered || tknStatus == NTConfirmed || tknStatus == NTActive) && tknRegCode == code -> do
logDebug "TVFY - token verified"
withNtfStore (`setTokenActive` tkn) $ \tIds -> do
-- TODO [ntfdb] this will be unnecessary if all cron notifications move to one thread
forM_ tIds cancelInvervalNotifications
incNtfStatT token tknVerified
pure NROk
withNtfStore (`setTokenActive` tkn) $ \_ -> NROk <$ incNtfStatT token tknVerified
| otherwise -> do
logDebug "TVFY - incorrect code or token status"
pure $ NRErr AUTH
@@ -754,45 +761,24 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
forM_ ss $ \(smpServer, nIds) -> do
atomically $ removeSubscriptions ca smpServer SPNotifier nIds
atomically $ removePendingSubs ca smpServer SPNotifier nIds
cancelInvervalNotifications tknId
incNtfStatT token tknDeleted
pure NROk
TCRN 0 -> do
logDebug "TCRN 0"
withNtfStore (\st -> updateTknCronInterval st ntfTknId 0) $ \_ -> do
-- TODO [ntfdb] move cron intervals to one thread
cancelInvervalNotifications tknId
pure NROk
withNtfStore (\st -> updateTknCronInterval st ntfTknId 0) $ \_ -> pure NROk
TCRN int
| int < 20 -> pure $ NRErr QUOTA
| otherwise -> do
logDebug "TCRN"
withNtfStore (\st -> updateTknCronInterval st ntfTknId int) $ \_ -> do
-- TODO [ntfdb] move cron intervals to one thread
liftIO (TM.lookupIO tknId intervalNotifiers) >>= \case
Nothing -> runIntervalNotifier int
Just IntervalNotifier {interval, action} ->
unless (interval == int) $ do
uninterruptibleCancel action
runIntervalNotifier int
pure NROk
where
runIntervalNotifier interval = do
action <- async . intervalNotifier $ fromIntegral interval * 1000000 * 60
let notifier = IntervalNotifier {action, token = tkn, interval}
atomically $ TM.insert tknId notifier intervalNotifiers
where
intervalNotifier delay = forever $ do
liftIO $ threadDelay' delay
atomically $ writeTBQueue pushQ (tkn, PNCheckMessages)
NtfReqNew corrId (ANE SSubscription newSub@(NewNtfSub _ (SMPQueueNtf srv _) _)) -> do
withNtfStore (\st -> updateTknCronInterval st ntfTknId int) $ \_ -> pure NROk
NtfReqNew corrId (ANE SSubscription newSub@(NewNtfSub _ (SMPQueueNtf srv nId) nKey)) -> do
logDebug "SNEW - new subscription"
subId <- getId
let sub = mkNtfSubRec subId newSub
resp <-
withNtfStore (`addNtfSubscription` sub) $ \case
True -> do
atomically $ writeTBQueue newSubQ (srv, [sub])
atomically $ writeTBQueue newSubQ (srv, [(subId, (nId, nKey))])
incNtfStat subCreated
pure $ NRSubId subId
False -> pure $ NRErr AUTH
@@ -823,10 +809,6 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
getRegCode = NtfRegCode <$> (randomBytes =<< asks (regCodeBytes . config))
randomBytes :: Int -> M ByteString
randomBytes n = atomically . C.randomBytes n =<< asks random
cancelInvervalNotifications :: NtfTokenId -> M ()
cancelInvervalNotifications tknId =
atomically (TM.lookupDelete tknId intervalNotifiers)
>>= mapM_ (uninterruptibleCancel . action)
withNtfStore :: (NtfPostgresStore -> IO (Either ErrorType a)) -> (a -> M NtfResponse) -> M NtfResponse
withNtfStore stAction continue = do