From afb338a41ab6942dd32022ddfab08a53a6138fa4 Mon Sep 17 00:00:00 2001 From: Evgeny Date: Mon, 21 Apr 2025 17:12:16 +0100 Subject: [PATCH] ntf server: optimize in-memory storage (#1516) * ntf server: optimize in-memory storage * test * ntf server: fix store log parser for token status --- src/Simplex/Messaging/Client/Agent.hs | 16 +++- src/Simplex/Messaging/Notifications/Server.hs | 15 +-- .../Messaging/Notifications/Server/Store.hs | 92 +++++++++++-------- tests/ServerTests/SchemaDump.hs | 7 +- 4 files changed, 79 insertions(+), 51 deletions(-) diff --git a/src/Simplex/Messaging/Client/Agent.hs b/src/Simplex/Messaging/Client/Agent.hs index 1a7a67806..e8f5a03e2 100644 --- a/src/Simplex/Messaging/Client/Agent.hs +++ b/src/Simplex/Messaging/Client/Agent.hs @@ -358,7 +358,7 @@ smpSubscribeQueues party ca smp srv subs = do pending <- maybe (pure M.empty) readTVar =<< TM.lookup srv (pendingSrvSubs ca) let acc@(_, _, oks, notPending) = foldr (groupSub pending) (False, [], [], []) (L.zip subs rs) unless (null oks) $ addSubscriptions ca srv party oks - unless (null notPending) $ removePendingSubs ca srv party notPending + unless (null notPending) $ removePendingSubs ca srv party $ S.fromList notPending pure acc sessId = sessionId $ thParams smp groupSub :: Map SMPSub C.APrivateAuthKey -> ((QueueId, C.APrivateAuthKey), Either SMPClientError ()) -> (Bool, [(QueueId, SMPClientError)], [(QueueId, (SessionId, C.APrivateAuthKey))], [QueueId]) -> (Bool, [(QueueId, SMPClientError)], [(QueueId, (SessionId, C.APrivateAuthKey))], [QueueId]) @@ -412,14 +412,22 @@ removeSubscription :: SMPClientAgent -> SMPServer -> SMPSub -> STM () removeSubscription = removeSub_ . srvSubs {-# INLINE removeSubscription #-} +removePendingSub :: SMPClientAgent -> SMPServer -> SMPSub -> STM () +removePendingSub = removeSub_ . pendingSrvSubs +{-# INLINE removePendingSub #-} + removeSub_ :: TMap SMPServer (TMap SMPSub s) -> SMPServer -> SMPSub -> STM () removeSub_ subs srv s = TM.lookup srv subs >>= mapM_ (TM.delete s) -removePendingSubs :: SMPClientAgent -> SMPServer -> SMPSubParty -> [QueueId] -> STM () +removeSubscriptions :: SMPClientAgent -> SMPServer -> SMPSubParty -> Set QueueId -> STM () +removeSubscriptions = removeSubs_ . srvSubs +{-# INLINE removeSubscriptions #-} + +removePendingSubs :: SMPClientAgent -> SMPServer -> SMPSubParty -> Set QueueId -> STM () removePendingSubs = removeSubs_ . pendingSrvSubs {-# INLINE removePendingSubs #-} -removeSubs_ :: TMap SMPServer (TMap SMPSub C.APrivateAuthKey) -> SMPServer -> SMPSubParty -> [QueueId] -> STM () +removeSubs_ :: TMap SMPServer (TMap SMPSub s) -> SMPServer -> SMPSubParty -> Set QueueId -> STM () removeSubs_ subs srv party qs = TM.lookup srv subs >>= mapM_ (`modifyTVar'` (`M.withoutKeys` ss)) where - ss = S.fromList $ map (party,) qs + ss = S.map (party,) qs diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index 84aebf9db..d1c1e60ea 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -691,9 +691,10 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu TDEL -> do logDebug "TDEL" st <- asks store - qs <- atomically $ deleteNtfToken st tknId - forM_ qs $ \SMPQueueNtf {smpServer, notifierId} -> - atomically $ removeSubscription ca smpServer (SPNotifier, notifierId) + ss <- atomically $ deleteNtfToken st tknId + forM_ (M.assocs ss) $ \(smpServer, nIds) -> do + atomically $ removeSubscriptions ca smpServer SPNotifier nIds + atomically $ removePendingSubs ca smpServer SPNotifier nIds cancelInvervalNotifications tknId withNtfLog (`logDeleteToken` tknId) incNtfStatT token tknDeleted @@ -732,9 +733,10 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu subId <- getId sub <- atomically $ mkNtfSubData subId newSub resp <- - atomically (addNtfSubscription st subId sub) >>= \case - Just _ -> atomically (writeTBQueue newSubQ [NtfSub sub]) $> NRSubId subId - _ -> pure $ NRErr AUTH + ifM + (atomically $ addNtfSubscription st subId sub) + (atomically (writeTBQueue newSubQ [NtfSub sub]) $> NRSubId subId) + (pure $ NRErr AUTH) withNtfLog (`logCreateSubscription` sub) incNtfStat subCreated pure (corrId, NoEntity, resp) @@ -756,6 +758,7 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu st <- asks store atomically $ deleteNtfSubscription st subId atomically $ removeSubscription ca smpServer (SPNotifier, notifierId) + atomically $ removePendingSub ca smpServer (SPNotifier, notifierId) withNtfLog (`logDeleteSubscription` subId) incNtfStat subDeleted pure NROk diff --git a/src/Simplex/Messaging/Notifications/Server/Store.hs b/src/Simplex/Messaging/Notifications/Server/Store.hs index 259a933b6..b67c1901f 100644 --- a/src/Simplex/Messaging/Notifications/Server/Store.hs +++ b/src/Simplex/Messaging/Notifications/Server/Store.hs @@ -8,6 +8,7 @@ {-# LANGUAGE OverloadedLists #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE StandaloneDeriving #-} module Simplex.Messaging.Notifications.Server.Store where @@ -16,15 +17,16 @@ import Control.Monad import Data.ByteString.Char8 (ByteString) import Data.Functor (($>)) import Data.List.NonEmpty (NonEmpty (..), (<|)) +import Data.Map.Strict (Map) import qualified Data.Map.Strict as M -import Data.Maybe (catMaybes) +import Data.Maybe (isNothing) import Data.Set (Set) import qualified Data.Set as S import Data.Word (Word16) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding.String import Simplex.Messaging.Notifications.Protocol -import Simplex.Messaging.Protocol (NtfPrivateAuthKey, NtfPublicAuthKey, SMPServer) +import Simplex.Messaging.Protocol (NotifierId, NtfPrivateAuthKey, NtfPublicAuthKey, SMPServer) import Simplex.Messaging.Server.QueueStore (RoundedSystemTime) import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM @@ -35,8 +37,11 @@ data NtfStore = NtfStore -- multiple registrations exist to protect from malicious registrations if token is compromised tokenRegistrations :: TMap DeviceToken (TMap ByteString NtfTokenId), subscriptions :: TMap NtfSubscriptionId NtfSubData, - tokenSubscriptions :: TMap NtfTokenId (TVar (Set NtfSubscriptionId)), - subscriptionLookup :: TMap SMPQueueNtf NtfSubscriptionId, + -- the first set is used to delete from `subscriptions` when token is deleted, the second - to cancel SMP subsriptions. + -- TODO [notifications] it can be simplified once NtfSubData is fully removed. + tokenSubscriptions :: TMap NtfTokenId (TMap SMPServer (TVar (Set NtfSubscriptionId), TVar (Set NotifierId))), + -- TODO [notifications] for subscriptions that "migrated" to server subscription, we may replace NtfSubData with NtfTokenId here (Either NtfSubData NtfTokenId). + subscriptionLookup :: TMap SMPServer (TMap NotifierId NtfSubData), tokenLastNtfs :: TMap NtfTokenId (TVar (NonEmpty PNMessageData)) } @@ -134,7 +139,7 @@ removeTokenRegistration st NtfTknData {ntfTknId = tId, token, tknVerifyKey} = >>= mapM_ (\tId' -> when (tId == tId') $ TM.delete k regs) k = C.toPubKey C.pubKeyBytes tknVerifyKey -deleteNtfToken :: NtfStore -> NtfTokenId -> STM [SMPQueueNtf] +deleteNtfToken :: NtfStore -> NtfTokenId -> STM (Map SMPServer (Set NotifierId)) deleteNtfToken st tknId = do void $ TM.lookupDelete tknId (tokens st) $>>= \NtfTknData {token, tknVerifyKey} -> @@ -147,25 +152,25 @@ deleteNtfToken st tknId = do regs = tokenRegistrations st regKey = C.toPubKey C.pubKeyBytes -deleteTokenSubs :: NtfStore -> NtfTokenId -> STM [SMPQueueNtf] -deleteTokenSubs st tknId = do - qs <- - TM.lookupDelete tknId (tokenSubscriptions st) - >>= mapM (readTVar >=> mapM deleteSub . S.toList) - pure $ maybe [] catMaybes qs +deleteTokenSubs :: NtfStore -> NtfTokenId -> STM (Map SMPServer (Set NotifierId)) +deleteTokenSubs st tknId = + TM.lookupDelete tknId (tokenSubscriptions st) + >>= maybe (pure M.empty) (readTVar >=> deleteSrvSubs) where - deleteSub subId = do - TM.lookupDelete subId (subscriptions st) - $>>= \NtfSubData {smpQueue} -> - TM.delete smpQueue (subscriptionLookup st) $> Just smpQueue + deleteSrvSubs :: Map SMPServer (TVar (Set NtfSubscriptionId), TVar (Set NotifierId)) -> STM (Map SMPServer (Set NotifierId)) + deleteSrvSubs = M.traverseWithKey $ \smpServer (sVar, nVar) -> do + sIds <- readTVar sVar + modifyTVar' (subscriptions st) (`M.withoutKeys` sIds) + nIds <- readTVar nVar + TM.lookup smpServer (subscriptionLookup st) >>= mapM_ (`modifyTVar'` (`M.withoutKeys` nIds)) + pure nIds getNtfSubscriptionIO :: NtfStore -> NtfSubscriptionId -> IO (Maybe NtfSubData) getNtfSubscriptionIO st subId = TM.lookupIO subId (subscriptions st) findNtfSubscription :: NtfStore -> SMPQueueNtf -> STM (Maybe NtfSubData) -findNtfSubscription st smpQueue = do - TM.lookup smpQueue (subscriptionLookup st) - $>>= \subId -> TM.lookup subId (subscriptions st) +findNtfSubscription st SMPQueueNtf {smpServer, notifierId} = + TM.lookup smpServer (subscriptionLookup st) $>>= TM.lookup notifierId findNtfSubscriptionToken :: NtfStore -> SMPQueueNtf -> STM (Maybe NtfTknData) findNtfSubscriptionToken st smpQueue = do @@ -183,30 +188,45 @@ mkNtfSubData ntfSubId (NewNtfSub tokenId smpQueue notifierKey) = do subStatus <- newTVar NSNew pure NtfSubData {ntfSubId, smpQueue, tokenId, subStatus, notifierKey} -addNtfSubscription :: NtfStore -> NtfSubscriptionId -> NtfSubData -> STM (Maybe ()) -addNtfSubscription st subId sub@NtfSubData {smpQueue, tokenId} = - TM.lookup tokenId (tokenSubscriptions st) >>= maybe newTokenSub pure >>= insertSub +-- returns False if subscription existed before +addNtfSubscription :: NtfStore -> NtfSubscriptionId -> NtfSubData -> STM Bool +addNtfSubscription st subId sub@NtfSubData {smpQueue = SMPQueueNtf {smpServer, notifierId}, tokenId} = + TM.lookup tokenId (tokenSubscriptions st) + >>= maybe newTokenSubs pure + >>= \ts -> TM.lookup smpServer ts + >>= maybe (newTokenSrvSubs ts) pure + >>= insertSub where - newTokenSub = do - ts <- newTVar S.empty + newTokenSubs = do + ts <- newTVar M.empty TM.insert tokenId ts $ tokenSubscriptions st pure ts - insertSub ts = do - modifyTVar' ts $ S.insert subId + newTokenSrvSubs ts = do + tss <- (,) <$> newTVar S.empty <*> newTVar S.empty + TM.insert smpServer tss ts + pure tss + insertSub :: (TVar (Set NtfSubscriptionId), TVar (Set NotifierId)) -> STM Bool + insertSub (sIds, nIds) = do + modifyTVar' sIds $ S.insert subId + modifyTVar' nIds $ S.insert notifierId TM.insert subId sub $ subscriptions st - TM.insert smpQueue subId (subscriptionLookup st) - -- return Nothing if subscription existed before - pure $ Just () + TM.lookup smpServer (subscriptionLookup st) + >>= maybe newSubs pure + >>= fmap isNothing . TM.lookupInsert notifierId sub + newSubs = do + ss <- newTVar M.empty + TM.insert smpServer ss $ subscriptionLookup st + pure ss deleteNtfSubscription :: NtfStore -> NtfSubscriptionId -> STM () -deleteNtfSubscription st subId = do - TM.lookupDelete subId (subscriptions st) - >>= mapM_ - ( \NtfSubData {smpQueue, tokenId} -> do - TM.delete smpQueue $ subscriptionLookup st - ts_ <- TM.lookup tokenId (tokenSubscriptions st) - forM_ ts_ $ \ts -> modifyTVar' ts $ S.delete subId - ) +deleteNtfSubscription st subId = TM.lookupDelete subId (subscriptions st) >>= mapM_ deleteSubIndices + where + deleteSubIndices NtfSubData {smpQueue = SMPQueueNtf {smpServer, notifierId}, tokenId} = do + TM.lookup smpServer (subscriptionLookup st) >>= mapM_ (TM.delete notifierId) + tss_ <- TM.lookup tokenId (tokenSubscriptions st) $>>= TM.lookup smpServer + forM_ tss_ $ \(sIds, nIds) -> do + modifyTVar' sIds $ S.delete subId + modifyTVar' nIds $ S.delete notifierId addTokenLastNtf :: NtfStore -> NtfTokenId -> PNMessageData -> IO (NonEmpty PNMessageData) addTokenLastNtf st tknId newNtf = diff --git a/tests/ServerTests/SchemaDump.hs b/tests/ServerTests/SchemaDump.hs index e3ffdb5cb..001e9d71e 100644 --- a/tests/ServerTests/SchemaDump.hs +++ b/tests/ServerTests/SchemaDump.hs @@ -18,7 +18,7 @@ import Simplex.Messaging.Server.QueueStore.Postgres.Migrations (serverMigrations import Simplex.Messaging.Util (ifM) import System.Directory (doesFileExist, removeFile) import System.Environment (lookupEnv) -import System.Process (readCreateProcess, readCreateProcessWithExitCode, shell) +import System.Process (readCreateProcess, shell) import Test.Hspec testDBSchema :: B.ByteString @@ -87,10 +87,7 @@ getSchema schemaPath = do ("pg_dump " <> B.unpack testServerDBConnstr <> " --schema " <> B.unpack testDBSchema) <> " --schema-only --no-owner --no-privileges --no-acl --no-subscriptions --no-tablespaces > " <> schemaPath - (code, out, err) <- readCreateProcessWithExitCode (shell cmd) "" - print code - putStrLn $ "out: " <> out - putStrLn $ "err: " <> err + void $ readCreateProcess (shell cmd) "" threadDelay 20000 let sed = (if ci then "sed -i" else "sed -i ''") void $ readCreateProcess (shell $ sed <> " '/^--/d' " <> schemaPath) ""