From 79adb8378245cb11f4abd507537efca9a69a28c2 Mon Sep 17 00:00:00 2001 From: sh <37271604+shumvgolove@users.noreply.github.com> Date: Thu, 27 Mar 2025 16:06:32 +0000 Subject: [PATCH 1/2] ci: add reproduce builds workflow (#1497) * ci: add reproduce builds workflow * ci: trigger webhook and build every day * ci: change secrets --- .github/workflows/reproduce-schedule.yml | 45 ++++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 .github/workflows/reproduce-schedule.yml diff --git a/.github/workflows/reproduce-schedule.yml b/.github/workflows/reproduce-schedule.yml new file mode 100644 index 000000000..a13c1fca2 --- /dev/null +++ b/.github/workflows/reproduce-schedule.yml @@ -0,0 +1,45 @@ +name: Reproduce latest release + +on: + workflow_dispatch: + schedule: + - cron: '0 2 * * *' # every day at 02:00 night + +jobs: + reproduce: + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Get latest release + shell: bash + run: | + curl --proto '=https' \ + --tlsv1.2 \ + -sSf -L \ + 'https://api.github.com/repos/simplex-chat/simplexmq/releases/latest' \ + 2>/dev/null | \ + grep -i "tag_name" | \ + awk -F \" '{print "TAG="$4}' >> $GITHUB_ENV + + - name: Execute reproduce script + run: | + ${GITHUB_WORKSPACE}/scripts/reproduce-builds.sh "$TAG" + + - name: Check if build has been reproduced + env: + url: ${{ secrets.STATUS_SIMPLEX_WEBHOOK_URL }} + user: ${{ secrets.STATUS_SIMPLEX_WEBHOOK_USER }} + pass: ${{ secrets.STATUS_SIMPLEX_WEBHOOK_PASS }} + run: | + if [ -f "${GITHUB_WORKSPACE}/$TAG/_sha256sums" ]; then + exit 0 + else + curl --proto '=https' --tlsv1.2 -sSf \ + -u "${user}:${pass}" \ + -H 'Content-Type: application/json' \ + -d '{"title": "👾 GitHub: Runner", "description": "⛔️ '"$TAG"' did not reproduce."}' \ + "$url" + exit 1 + fi From 7636bc7491c019782d680ed42fbc97ca34abfc0a Mon Sep 17 00:00:00 2001 From: Evgeny Date: Fri, 28 Mar 2025 18:51:54 +0000 Subject: [PATCH 2/2] smp server: remove locks for 
deleted queues, additional statistics for objects in memory (#1498) * smp server: remove locks for deleted queues, additional statistics for objects in memory * version * reduce queue cache usage * less caching, refactor * comments * revert version --- src/Simplex/Messaging/Server.hs | 6 +- src/Simplex/Messaging/Server/Env/STM.hs | 7 +- src/Simplex/Messaging/Server/Main.hs | 6 +- .../Messaging/Server/MsgStore/Journal.hs | 74 ++++++++++++------- src/Simplex/Messaging/Server/MsgStore/STM.hs | 9 ++- .../Messaging/Server/MsgStore/Types.hs | 13 +++- src/Simplex/Messaging/Server/Prometheus.hs | 30 +++++++- .../Messaging/Server/QueueStore/Postgres.hs | 70 ++++++++++-------- .../Messaging/Server/QueueStore/STM.hs | 2 +- .../Messaging/Server/QueueStore/Types.hs | 2 +- .../Messaging/Server/StoreLog/ReadWrite.hs | 2 +- tests/CoreTests/MsgStoreTests.hs | 32 ++++---- tests/CoreTests/StoreLogTests.hs | 2 +- tests/SMPClient.hs | 4 - tests/ServerTests.hs | 2 +- 15 files changed, 166 insertions(+), 95 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index e422fef84..5ba04e802 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -579,7 +579,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt pure ServerMetrics {statsData = d, activeQueueCounts = ps, activeNtfCounts = psNtf, queueCount, notifierCount} getRealTimeMetrics :: Env -> IO RealTimeMetrics - getRealTimeMetrics Env {clients, sockets, server = Server {subscribers, notifiers, subClients, ntfSubClients}} = do + getRealTimeMetrics Env {clients, sockets, msgStore = AMS _ _ ms, server = Server {subscribers, notifiers, subClients, ntfSubClients}} = do socketStats <- mapM (traverse getSocketStats) =<< readTVarIO sockets #if MIN_VERSION_base(4,18,0) threadsCount <- length <$> listThreads @@ -591,7 +591,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt smpSubClientsCount <- IM.size <$> 
readTVarIO subClients ntfSubsCount <- M.size <$> readTVarIO notifiers ntfSubClientsCount <- IM.size <$> readTVarIO ntfSubClients - pure RealTimeMetrics {socketStats, threadsCount, clientsCount, smpSubsCount, smpSubClientsCount, ntfSubsCount, ntfSubClientsCount} + loadedCounts <- loadedQueueCounts ms + pure RealTimeMetrics {socketStats, threadsCount, clientsCount, smpSubsCount, smpSubClientsCount, ntfSubsCount, ntfSubClientsCount, loadedCounts} runClient :: Transport c => C.APrivateSignKey -> TProxy c -> c -> M () runClient signKey tp h = do @@ -1418,6 +1419,7 @@ client withQueue :: (StoreQueue s -> QueueRec -> M (Transmission BrokerMsg)) -> M (Transmission BrokerMsg) withQueue = withQueue_ True + -- SEND passes queueNotBlocked False here to update time, but it fails anyway on blocked queues (see code for SEND). withQueue_ :: Bool -> (StoreQueue s -> QueueRec -> M (Transmission BrokerMsg)) -> M (Transmission BrokerMsg) withQueue_ queueNotBlocked action = case q_ of Nothing -> pure $ err INTERNAL diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index c1c72a39e..dbdb04758 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -332,13 +332,13 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, serverStoreCfg, smp ASSCfg qt mt (SSCMemory storePaths_) -> do let storePath = storeMsgsFile =<< storePaths_ ms <- newMsgStore STMStoreConfig {storePath, quota = msgQueueQuota} - forM_ storePaths_ $ \StorePaths {storeLogFile = f} -> loadStoreLog (mkQueue ms) f $ queueStore ms + forM_ storePaths_ $ \StorePaths {storeLogFile = f} -> loadStoreLog (mkQueue ms True) f $ queueStore ms pure $ AMS qt mt ms ASSCfg qt mt SSCMemoryJournal {storeLogFile, storeMsgsPath} -> do let qsCfg = MQStoreCfg cfg = mkJournalStoreConfig qsCfg storeMsgsPath msgQueueQuota maxJournalMsgCount maxJournalStateLines idleQueueInterval ms <- newMsgStore cfg - loadStoreLog (mkQueue ms) storeLogFile $ 
stmQueueStore ms + loadStoreLog (mkQueue ms True) storeLogFile $ stmQueueStore ms pure $ AMS qt mt ms #if defined(dbServerPostgres) ASSCfg qt mt SSCDatabaseJournal {storeCfg, storeMsgsPath'} -> do @@ -374,7 +374,8 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, serverStoreCfg, smp Just f -> do logInfo $ "compacting queues in file " <> T.pack f st <- newMsgStore STMStoreConfig {storePath = Nothing, quota = msgQueueQuota} - sl <- readWriteQueueStore False (mkQueue st) f (queueStore st) + -- we don't need to have locks in the map + sl <- readWriteQueueStore False (mkQueue st False) f (queueStore st) setStoreLog (queueStore st) sl closeMsgStore st Nothing -> do diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index 83e59243d..64178b7ce 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -118,7 +118,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = ("WARNING: message log file " <> storeMsgsFilePath <> " will be imported to journal directory " <> storeMsgsJournalDir) "Messages not imported" ms <- newJournalMsgStore MQStoreCfg - readQueueStore True (mkQueue ms) storeLogFile $ stmQueueStore ms + readQueueStore True (mkQueue ms False) storeLogFile $ stmQueueStore ms msgStats <- importMessages True ms storeMsgsFilePath Nothing False -- no expiration putStrLn "Import completed" printMessageStats "Messages" msgStats @@ -137,7 +137,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = "Journal not exported" ms <- newJournalMsgStore MQStoreCfg -- TODO [postgres] in case postgres configured, queues must be read from database - readQueueStore True (mkQueue ms) storeLogFile $ stmQueueStore ms + readQueueStore True (mkQueue ms False) storeLogFile $ stmQueueStore ms exportMessages True ms storeMsgsFilePath False putStrLn "Export completed" case readStoreType ini of @@ -179,7 +179,7 @@ smpServerCLI_ generateSite 
serveStaticFiles attachStaticFiles cfgPath logPath = ("WARNING: store log file " <> storeLogFile <> " will be compacted and imported to PostrgreSQL database: " <> B.unpack connstr <> ", schema: " <> B.unpack schema) "Queue records not imported" ms <- newJournalMsgStore MQStoreCfg - sl <- readWriteQueueStore True (mkQueue ms) storeLogFile (queueStore ms) + sl <- readWriteQueueStore True (mkQueue ms False) storeLogFile (queueStore ms) closeStoreLog sl queues <- readTVarIO $ loadedQueues $ stmQueueStore ms let storeCfg = PostgresStoreCfg {dbOpts = dbOpts {createSchema = True}, dbStoreLogPath = Nothing, confirmMigrations = MCConsole, deletedTTL = iniDeletedTTL ini} diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 0686a71bf..0b3992af5 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -62,7 +62,8 @@ import Data.Either (fromRight) import Data.Functor (($>)) import Data.Int (Int64) import Data.List (intercalate, sort) -import Data.Maybe (fromMaybe, isNothing, mapMaybe) +import qualified Data.Map.Strict as M +import Data.Maybe (fromMaybe, isJust, isNothing, mapMaybe) import Data.Text (Text) import qualified Data.Text as T import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime) @@ -96,6 +97,7 @@ data JournalMsgStore s = JournalMsgStore queueLocks :: TMap RecipientId Lock, sharedLock :: TMVar RecipientId, queueStore_ :: QStore s, + openedQueueCount :: TVar Int, expireBackupsBefore :: UTCTime } @@ -338,12 +340,6 @@ instance QueueStoreClass (JournalQueue s) (QStore s) where deleteStoreQueue = withQS deleteStoreQueue {-# INLINE deleteStoreQueue #-} -#if defined(dbServerPostgres) -mkTempQueue :: JournalMsgStore s -> RecipientId -> QueueRec -> IO (JournalQueue s) -mkTempQueue ms rId qr = createLockIO >>= makeQueue_ ms rId qr -{-# INLINE mkTempQueue #-} -#endif - makeQueue_ :: JournalMsgStore s -> RecipientId -> QueueRec 
-> Lock -> IO (JournalQueue s) makeQueue_ JournalMsgStore {sharedLock} rId qr queueLock = do queueRec' <- newTVarIO $ Just qr @@ -373,8 +369,9 @@ instance MsgStoreClass (JournalMsgStore s) where queueLocks <- TM.emptyIO sharedLock <- newEmptyTMVarIO queueStore_ <- newQueueStore @(JournalQueue s) queueStoreCfg + openedQueueCount <- newTVarIO 0 expireBackupsBefore <- addUTCTime (- expireBackupsAfter config) <$> getCurrentTime - pure JournalMsgStore {config, random, queueLocks, sharedLock, queueStore_, expireBackupsBefore} + pure JournalMsgStore {config, random, queueLocks, sharedLock, queueStore_, openedQueueCount, expireBackupsBefore} closeMsgStore :: JournalMsgStore s -> IO () closeMsgStore ms = do @@ -382,7 +379,7 @@ instance MsgStoreClass (JournalMsgStore s) where closeQueues $ loadedQueues @(JournalQueue s) st closeQueueStore @(JournalQueue s) st where - closeQueues qs = readTVarIO qs >>= mapM_ closeMsgQueue + closeQueues qs = readTVarIO qs >>= mapM_ (closeMsgQueue ms) withActiveMsgQueues :: Monoid a => JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a withActiveMsgQueues = withQS withLoadedQueues . queueStore_ @@ -393,12 +390,12 @@ instance MsgStoreClass (JournalMsgStore s) where unsafeWithAllMsgQueues tty ms action = case queueStore_ ms of MQStore st -> withLoadedQueues st run #if defined(dbServerPostgres) - PQStore st -> foldQueueRecs tty st Nothing $ uncurry (mkTempQueue ms) >=> run + PQStore st -> foldQueueRecs tty st Nothing $ uncurry (mkQueue ms False) >=> run #endif where run q = do r <- action q - closeMsgQueue q + closeMsgQueue ms q pure r -- This function is concurrency safe @@ -414,7 +411,7 @@ instance MsgStoreClass (JournalMsgStore s) where PQStore st -> do let JournalMsgStore {queueLocks, sharedLock} = ms foldQueueRecs tty st (Just veryOld) $ \(rId, qr) -> do - q <- mkTempQueue ms rId qr + q <- mkQueue ms False rId qr withSharedWaitLock rId queueLocks sharedLock $ run $ tryStore' "deleteExpiredMsgs" rId $ getLoadedQueue q >>= unStoreIO . 
expireQueueMsgs ms now old #endif @@ -441,9 +438,26 @@ instance MsgStoreClass (JournalMsgStore s) where queueStore = queueStore_ {-# INLINE queueStore #-} - mkQueue :: JournalMsgStore s -> RecipientId -> QueueRec -> IO (JournalQueue s) - mkQueue ms rId qr = do - lock <- atomically $ getMapLock (queueLocks ms) rId + loadedQueueCounts :: JournalMsgStore s -> IO LoadedQueueCounts + loadedQueueCounts ms = do + let (qs, ns, nLocks_) = loaded + loadedQueueCount <- M.size <$> readTVarIO qs + loadedNotifierCount <- M.size <$> readTVarIO ns + openJournalCount <- readTVarIO (openedQueueCount ms) + queueLockCount <- M.size <$> readTVarIO (queueLocks ms) + notifierLockCount <- maybe (pure 0) (fmap M.size . readTVarIO) nLocks_ + pure LoadedQueueCounts {loadedQueueCount, loadedNotifierCount, openJournalCount, queueLockCount, notifierLockCount} + where + loaded :: (TMap RecipientId (JournalQueue s), TMap NotifierId RecipientId, Maybe (TMap NotifierId Lock)) + loaded = case queueStore_ ms of + MQStore STMQueueStore {queues, notifiers} -> (queues, notifiers, Nothing) +#if defined(dbServerPostgres) + PQStore PostgresQueueStore {queues, notifiers, notifierLocks} -> (queues, notifiers, Just notifierLocks) +#endif + + mkQueue :: JournalMsgStore s -> Bool -> RecipientId -> QueueRec -> IO (JournalQueue s) + mkQueue ms keepLock rId qr = do + lock <- if keepLock then atomically $ getMapLock (queueLocks ms) rId else createLockIO makeQueue_ ms rId qr lock getMsgQueue :: JournalMsgStore s -> JournalQueue s -> Bool -> StoreIO s (JournalMsgQueue s) @@ -478,7 +492,7 @@ instance MsgStoreClass (JournalMsgStore s) where -- In case the queue became non-empty on write and then again empty on read -- we won't be closing it, to avoid frequent open/close on active queues. 
r <- peek - when (isNothing r) $ StoreIO $ closeMsgQueue q + when (isNothing r) $ StoreIO $ closeMsgQueue ms q pure r where peek = do @@ -492,7 +506,7 @@ instance MsgStoreClass (JournalMsgStore s) where Nothing -> E.bracket getNonEmptyMsgQueue - (mapM_ $ \_ -> closeMsgQueue q) + (mapM_ $ \_ -> closeMsgQueue ms q) (maybe (pure (Nothing, 0)) (unStoreIO . run)) where run mq = do @@ -502,7 +516,7 @@ instance MsgStoreClass (JournalMsgStore s) where Just mq -> do ts <- readTVarIO $ activeAt q r <- if now - ts >= idleInterval config - then Just <$> unStoreIO (action mq) `E.finally` closeMsgQueue q + then Just <$> unStoreIO (action mq) `E.finally` closeMsgQueue ms q else pure Nothing sz <- unStoreIO $ getQueueSize_ mq pure (r, sz) @@ -517,7 +531,7 @@ instance MsgStoreClass (JournalMsgStore s) where mq <- unStoreIO $ getMsgQueue ms q False -- queueState was updated in getMsgQueue readTVarIO queueState >>= \case - Just QState {hasStored} | not hasStored -> closeMsgQueue q $> Nothing + Just QState {hasStored} | not hasStored -> closeMsgQueue ms q $> Nothing _ -> pure $ Just mq deleteQueue :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType QueueRec) @@ -580,6 +594,7 @@ instance MsgStoreClass (JournalMsgStore s) where rh <- createNewJournal queueDirectory $ journalId rs let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = Nothing} atomically $ writeTVar handles $ Just hs + atomically $ modifyTVar' (openedQueueCount ms) (+ 1) pure hs switchWriteJournal hs = do journalId <- newJournalId $ random ms @@ -651,13 +666,16 @@ openMsgQueue ms@JournalMsgStore {config} q@JMQueue {queueDirectory = dir, stateP Just st | size st == 0 -> do (st', hs_) <- removeJournals st shouldBackup + when (isJust hs_) incOpenedCount mkJournalQueue q st' hs_ | otherwise -> do sh <- openBackupQueueState st shouldBackup (st', rh, wh_) <- closeOnException sh $ openJournals ms dir st sh let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = wh_} + 
incOpenedCount mkJournalQueue q st' (Just hs) where + incOpenedCount = atomically $ modifyTVar' (openedQueueCount ms) (+ 1) -- If the queue is empty, journals are deleted. -- New journal is created if queue is written to. -- canWrite is set to True. @@ -920,28 +938,30 @@ validQueueState MsgQueueState {readState = rs, writeState = ws, size} && msgPos ws == msgCount ws && bytePos ws == byteCount ws --- TODO [postgres] possibly, we need to remove the lock from map deleteQueue_ :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType (QueueRec, Maybe (JournalMsgQueue s))) deleteQueue_ ms q = - runExceptT $ isolateQueueId "deleteQueue_" ms rId $ - deleteStoreQueue (queueStore_ ms) q >>= mapM remove + runExceptT $ isolateQueueId "deleteQueue_" ms rId $ do + r <- deleteStoreQueue (queueStore_ ms) q >>= mapM remove + atomically $ TM.delete rId (queueLocks ms) + pure r where rId = recipientId q remove r@(_, mq_) = do - mapM_ closeMsgQueueHandles mq_ + mapM_ (closeMsgQueueHandles ms) mq_ removeQueueDirectory ms rId pure r -closeMsgQueue :: JournalQueue s -> IO () -closeMsgQueue JournalQueue {msgQueue'} = atomically (swapTVar msgQueue' Nothing) >>= mapM_ closeMsgQueueHandles +closeMsgQueue :: JournalMsgStore s -> JournalQueue s -> IO () +closeMsgQueue ms JournalQueue {msgQueue'} = atomically (swapTVar msgQueue' Nothing) >>= mapM_ (closeMsgQueueHandles ms) -closeMsgQueueHandles :: JournalMsgQueue s -> IO () -closeMsgQueueHandles q = readTVarIO (handles q) >>= mapM_ closeHandles +closeMsgQueueHandles :: JournalMsgStore s -> JournalMsgQueue s -> IO () +closeMsgQueueHandles ms q = readTVarIO (handles q) >>= mapM_ closeHandles where closeHandles (MsgQueueHandles sh rh wh_) = do hClose sh hClose rh mapM_ hClose wh_ + atomically $ modifyTVar' (openedQueueCount ms) (subtract 1) removeQueueDirectory :: JournalMsgStore s -> RecipientId -> IO () removeQueueDirectory st = removeQueueDirectory_ . 
msgQueueDirectory st diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index 6fa94fd03..0ae592069 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -23,6 +23,7 @@ import Control.Monad.IO.Class import Control.Monad.Trans.Except import Data.Functor (($>)) import Data.Int (Int64) +import qualified Data.Map.Strict as M import Simplex.Messaging.Protocol import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.QueueStore @@ -94,7 +95,13 @@ instance MsgStoreClass STMMsgStore where queueStore = queueStore_ {-# INLINE queueStore #-} - mkQueue _ rId qr = STMQueue rId <$> newTVarIO (Just qr) <*> newTVarIO Nothing + loadedQueueCounts :: STMMsgStore -> IO LoadedQueueCounts + loadedQueueCounts STMMsgStore {queueStore_ = st} = do + loadedQueueCount <- M.size <$> readTVarIO (queues st) + loadedNotifierCount <- M.size <$> readTVarIO (notifiers st) + pure LoadedQueueCounts {loadedQueueCount, loadedNotifierCount, openJournalCount = 0, queueLockCount = 0, notifierLockCount = 0} + + mkQueue _ _ rId qr = STMQueue rId <$> newTVarIO (Just qr) <*> newTVarIO Nothing {-# INLINE mkQueue #-} getMsgQueue :: STMMsgStore -> STMQueue -> Bool -> STM STMMsgQueue diff --git a/src/Simplex/Messaging/Server/MsgStore/Types.hs b/src/Simplex/Messaging/Server/MsgStore/Types.hs index 01dfdb88c..93c61370d 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Types.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Types.hs @@ -43,9 +43,10 @@ class (Monad (StoreMonad s), QueueStoreClass (StoreQueue s) (QueueStore s)) => M logQueueStates :: s -> IO () logQueueState :: StoreQueue s -> StoreMonad s () queueStore :: s -> QueueStore s + loadedQueueCounts :: s -> IO LoadedQueueCounts -- message store methods - mkQueue :: s -> RecipientId -> QueueRec -> IO (StoreQueue s) + mkQueue :: s -> Bool -> RecipientId -> QueueRec -> IO (StoreQueue s) getMsgQueue :: s -> StoreQueue s -> Bool -> 
StoreMonad s (MsgQueue (StoreQueue s)) getPeekMsgQueue :: s -> StoreQueue s -> StoreMonad s (Maybe (MsgQueue (StoreQueue s), Message)) @@ -88,11 +89,19 @@ instance Semigroup MessageStats where MessageStats a b c <> MessageStats x y z = MessageStats (a + x) (b + y) (c + z) {-# INLINE (<>) #-} +data LoadedQueueCounts = LoadedQueueCounts + { loadedQueueCount :: Int, + loadedNotifierCount :: Int, + openJournalCount :: Int, + queueLockCount :: Int, + notifierLockCount :: Int + } + newMessageStats :: MessageStats newMessageStats = MessageStats 0 0 0 addQueue :: MsgStoreClass s => s -> RecipientId -> QueueRec -> IO (Either ErrorType (StoreQueue s)) -addQueue st = addQueue_ (queueStore st) (mkQueue st) +addQueue st = addQueue_ (queueStore st) (mkQueue st True) {-# INLINE addQueue #-} getQueue :: (MsgStoreClass s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s)) diff --git a/src/Simplex/Messaging/Server/Prometheus.hs b/src/Simplex/Messaging/Server/Prometheus.hs index 3f5c3f87e..a542a87f1 100644 --- a/src/Simplex/Messaging/Server/Prometheus.hs +++ b/src/Simplex/Messaging/Server/Prometheus.hs @@ -12,6 +12,7 @@ import Data.Time.Clock (UTCTime (..), diffUTCTime) import Data.Time.Clock.System (systemEpochDay) import Data.Time.Format.ISO8601 (iso8601Show) import Network.Socket (ServiceName) +import Simplex.Messaging.Server.MsgStore.Types (LoadedQueueCounts (..)) import Simplex.Messaging.Server.Stats import Simplex.Messaging.Transport.Server (SocketStats (..)) @@ -30,7 +31,8 @@ data RealTimeMetrics = RealTimeMetrics smpSubsCount :: Int, smpSubClientsCount :: Int, ntfSubsCount :: Int, - ntfSubClientsCount :: Int + ntfSubClientsCount :: Int, + loadedCounts :: LoadedQueueCounts } {-# FOURMOLU_DISABLE\n#-} @@ -46,7 +48,8 @@ prometheusMetrics sm rtm ts = smpSubsCount, smpSubClientsCount, ntfSubsCount, - ntfSubClientsCount + ntfSubClientsCount, + loadedCounts } = rtm ServerStatsData { _fromTime, @@ -371,7 +374,28 @@ prometheusMetrics sm rtm ts = \\n\ \# 
HELP simplex_smp_subscription_ntf_clients_total Total subscribed NTF servers, first counting method\n\ \# TYPE simplex_smp_subscription_ntf_clients_total gauge\n\ - \simplex_smp_subscription_ntf_clients_total " <> mshow ntfSubClientsCount <> "\n# ntfSubClients\n" + \simplex_smp_subscription_ntf_clients_total " <> mshow ntfSubClientsCount <> "\n# ntfSubClients\n\ + \\n\ + \# HELP simplex_smp_loaded_queues_queue_count Total loaded queues count (all queues for memory/journal storage)\n\ + \# TYPE simplex_smp_loaded_queues_queue_count gauge\n\ + \simplex_smp_loaded_queues_queue_count " <> mshow (loadedQueueCount loadedCounts) <> "\n# loadedCounts.loadedQueueCount\n\ + \\n\ + \# HELP simplex_smp_loaded_queues_ntf_count Total loaded ntf credential references (all ntf credentials for memory/journal storage)\n\ + \# TYPE simplex_smp_loaded_queues_ntf_count gauge\n\ + \simplex_smp_loaded_queues_ntf_count " <> mshow (loadedNotifierCount loadedCounts) <> "\n# loadedCounts.loadedNotifierCount\n\ + \\n\ + \# HELP simplex_smp_loaded_queues_open_journal_count Total opened queue journals (0 for memory storage)\n\ + \# TYPE simplex_smp_loaded_queues_open_journal_count gauge\n\ + \simplex_smp_loaded_queues_open_journal_count " <> mshow (openJournalCount loadedCounts) <> "\n# loadedCounts.openJournalCount\n\ + \\n\ + \# HELP simplex_smp_loaded_queues_queue_lock_count Total queue locks (0 for memory storage)\n\ + \# TYPE simplex_smp_loaded_queues_queue_lock_count gauge\n\ + \simplex_smp_loaded_queues_queue_lock_count " <> mshow (queueLockCount loadedCounts) <> "\n# loadedCounts.queueLockCount\n\ + \\n\ + \# HELP simplex_smp_loaded_queues_ntf_lock_count Total notifier locks (0 for memory/journal storage)\n\ + \# TYPE simplex_smp_loaded_queues_ntf_lock_count gauge\n\ + \simplex_smp_loaded_queues_ntf_lock_count " <> mshow (notifierLockCount loadedCounts) <> "\n# loadedCounts.notifierLockCount\n" + socketsMetric :: (SocketStats -> Int) -> Text -> Text -> Text socketsMetric sel metric 
descr = "# HELP " <> metric <> " " <> descr <> "\n" diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs index 93d6e8213..83a25cb0e 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs @@ -35,7 +35,6 @@ import Control.Monad.Trans.Except import Data.ByteString.Builder (Builder) import qualified Data.ByteString.Builder as BB import Data.ByteString.Char8 (ByteString) -import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Lazy as LB import Data.Bitraversable (bimapM) import Data.Either (fromRight) @@ -155,37 +154,48 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where -- hasId = anyM [TM.memberIO rId queues, TM.memberIO senderId senders, hasNotifier] -- hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.memberIO notifierId notifiers) notifier - getQueue_ :: DirectParty p => PostgresQueueStore q -> (RecipientId -> QueueRec -> IO q) -> SParty p -> QueueId -> IO (Either ErrorType q) + getQueue_ :: DirectParty p => PostgresQueueStore q -> (Bool -> RecipientId -> QueueRec -> IO q) -> SParty p -> QueueId -> IO (Either ErrorType q) getQueue_ st mkQ party qId = case party of SRecipient -> getRcvQueue qId - SSender -> TM.lookupIO qId senders >>= maybe loadSndQueue getRcvQueue - SNotifier -> TM.lookupIO qId notifiers >>= maybe loadNtfQueue getRcvQueue + SSender -> TM.lookupIO qId senders >>= maybe (mask loadSndQueue) getRcvQueue + -- loaded queue is deleted from notifiers map to reduce cache size after queue was subscribed to by ntf server + SNotifier -> TM.lookupIO qId notifiers >>= maybe (mask loadNtfQueue) (getRcvQueue >=> (atomically (TM.delete qId notifiers) $>)) where PostgresQueueStore {queues, senders, notifiers} = st - getRcvQueue rId = TM.lookupIO rId queues >>= maybe loadRcvQueue (pure . Right) - loadRcvQueue = loadQueue " WHERE recipient_id = ?" 
$ \_ -> pure () - loadSndQueue = loadQueue " WHERE sender_id = ?" $ \rId -> TM.insert qId rId senders - loadNtfQueue = loadQueue " WHERE notifier_id = ?" $ \_ -> pure () -- do NOT cache ref - ntf subscriptions are rare - loadQueue condition insertRef = - E.uninterruptibleMask_ $ runExceptT $ do - (rId, qRec) <- - withDB "getQueue_" st $ \db -> firstRow rowToQueueRec AUTH $ - DB.query db (queueRecQuery <> condition <> " AND deleted_at IS NULL") (Only qId) - liftIO $ do - sq <- mkQ rId qRec -- loaded queue - -- This lock prevents the scenario when the queue is added to cache, - -- while another thread is proccessing the same queue in withAllMsgQueues - -- without adding it to cache, possibly trying to open the same files twice. - -- Alse see comment in idleDeleteExpiredMsgs. - withQueueLock sq "getQueue_" $ atomically $ - -- checking the cache again for concurrent reads, - -- use previously loaded queue if exists. - TM.lookup rId queues >>= \case - Just sq' -> pure sq' - Nothing -> do - insertRef rId - TM.insert rId sq queues - pure sq + getRcvQueue rId = TM.lookupIO rId queues >>= maybe (mask loadRcvQueue) (pure . Right) + loadRcvQueue = do + (rId, qRec) <- loadQueue " WHERE recipient_id = ?" + liftIO $ cacheQueue rId qRec $ \_ -> pure () -- recipient map already checked, not caching sender ref + loadSndQueue = do + (rId, qRec) <- loadQueue " WHERE sender_id = ?" + liftIO $ + TM.lookupIO rId queues -- checking recipient map first + >>= maybe (cacheQueue rId qRec cacheSender) (atomically (cacheSender rId) $>) + loadNtfQueue = do + (rId, qRec) <- loadQueue " WHERE notifier_id = ?" + liftIO $ + TM.lookupIO rId queues -- checking recipient map first, not creating lock in map, not caching queue + >>= maybe (mkQ False rId qRec) pure + mask = E.uninterruptibleMask_ . 
runExceptT + cacheSender rId = TM.insert qId rId senders + loadQueue condition = + withDB "getQueue_" st $ \db -> firstRow rowToQueueRec AUTH $ + DB.query db (queueRecQuery <> condition <> " AND deleted_at IS NULL") (Only qId) + cacheQueue rId qRec insertRef = do + sq <- mkQ True rId qRec -- loaded queue + -- This lock prevents the scenario when the queue is added to cache, + -- while another thread is processing the same queue in withAllMsgQueues + -- without adding it to cache, possibly trying to open the same files twice. + -- Also see comment in idleDeleteExpiredMsgs. + withQueueLock sq "getQueue_" $ atomically $ + -- checking the cache again for concurrent reads, + -- use previously loaded queue if exists. + TM.lookup rId queues >>= \case + Just sq' -> pure sq' + Nothing -> do + insertRef rId + TM.insert rId sq queues + pure sq secureQueue :: PostgresQueueStore q -> q -> SndPublicAuthKey -> IO (Either ErrorType ()) secureQueue st sq sKey = @@ -289,7 +299,9 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where DB.execute db "UPDATE msg_queues SET deleted_at = ? WHERE recipient_id = ? 
AND deleted_at IS NULL" (ts, rId) atomically $ writeTVar qr Nothing atomically $ TM.delete (senderId q) $ senders st - forM_ (notifier q) $ \NtfCreds {notifierId} -> atomically $ TM.delete notifierId $ notifiers st + forM_ (notifier q) $ \NtfCreds {notifierId} -> do + atomically $ TM.delete notifierId $ notifiers st + atomically $ TM.delete notifierId $ notifierLocks st mq_ <- atomically $ swapTVar (msgQueue sq) Nothing withLog "deleteStoreQueue" st (`logDeleteQueue` rId) pure (q, mq_) diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index 8a360c3a0..e22597d0e 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -92,7 +92,7 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where hasId = anyM [TM.member rId queues, TM.member sId senders, hasNotifier] hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.member notifierId notifiers) notifier - getQueue_ :: DirectParty p => STMQueueStore q -> (RecipientId -> QueueRec -> IO q) -> SParty p -> QueueId -> IO (Either ErrorType q) + getQueue_ :: DirectParty p => STMQueueStore q -> (Bool -> RecipientId -> QueueRec -> IO q) -> SParty p -> QueueId -> IO (Either ErrorType q) getQueue_ st _ party qId = maybe (Left AUTH) Right <$> case party of SRecipient -> TM.lookupIO qId queues diff --git a/src/Simplex/Messaging/Server/QueueStore/Types.hs b/src/Simplex/Messaging/Server/QueueStore/Types.hs index 8af65a335..e4ff517fc 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Types.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Types.hs @@ -29,7 +29,7 @@ class StoreQueueClass q => QueueStoreClass q s where loadedQueues :: s -> TMap RecipientId q compactQueues :: s -> IO Int64 addQueue_ :: s -> (RecipientId -> QueueRec -> IO q) -> RecipientId -> QueueRec -> IO (Either ErrorType q) - getQueue_ :: DirectParty p => s -> (RecipientId -> QueueRec -> IO q) -> SParty p -> QueueId -> IO (Either 
ErrorType q) + getQueue_ :: DirectParty p => s -> (Bool -> RecipientId -> QueueRec -> IO q) -> SParty p -> QueueId -> IO (Either ErrorType q) secureQueue :: s -> q -> SndPublicAuthKey -> IO (Either ErrorType ()) addQueueNotifier :: s -> q -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId)) deleteQueueNotifier :: s -> q -> IO (Either ErrorType (Maybe NotifierId)) diff --git a/src/Simplex/Messaging/Server/StoreLog/ReadWrite.hs b/src/Simplex/Messaging/Server/StoreLog/ReadWrite.hs index fd4da85ab..77be7c97a 100644 --- a/src/Simplex/Messaging/Server/StoreLog/ReadWrite.hs +++ b/src/Simplex/Messaging/Server/StoreLog/ReadWrite.hs @@ -56,7 +56,7 @@ readQueueStore tty mkQ f st = readLogLines tty f $ \_ -> processLine withQueue qId op a = runExceptT go >>= qError qId op where go = do - q <- ExceptT $ getQueue_ st mkQ SRecipient qId + q <- ExceptT $ getQueue_ st (\_ -> mkQ) SRecipient qId liftIO (readTVarIO $ queueRec q) >>= \case Nothing -> logWarn $ logPfx qId op <> "already deleted" Just _ -> void $ ExceptT $ a q diff --git a/tests/CoreTests/MsgStoreTests.hs b/tests/CoreTests/MsgStoreTests.hs index 1f9cbf777..0ec1031be 100644 --- a/tests/CoreTests/MsgStoreTests.hs +++ b/tests/CoreTests/MsgStoreTests.hs @@ -196,7 +196,7 @@ testExportImportStore ms = do g <- C.newRandom (rId1, qr1) <- testNewQueueRec g True (rId2, qr2) <- testNewQueueRec g True - sl <- readWriteQueueStore True (mkQueue ms) testStoreLogFile $ queueStore ms + sl <- readWriteQueueStore True (mkQueue ms True) testStoreLogFile $ queueStore ms runRight_ $ do let write q s = writeMsg ms q True =<< mkMessage s q1 <- ExceptT $ addQueue ms rId1 qr1 @@ -221,7 +221,7 @@ testExportImportStore ms = do closeStoreLog sl let cfg = (testJournalStoreCfg MQStoreCfg :: JournalStoreConfig 'QSMemory) {storePath = testStoreMsgsDir2} ms' <- newMsgStore cfg - readWriteQueueStore True (mkQueue ms') testStoreLogFile (queueStore ms') >>= closeStoreLog + readWriteQueueStore True (mkQueue ms' True) testStoreLogFile (queueStore ms') >>= 
closeStoreLog stats@MessageStats {storedMsgsCount = 5, expiredMsgsCount = 0, storedQueues = 2} <- importMessages False ms' testStoreMsgsFile Nothing False printMessageStats "Messages" stats @@ -230,7 +230,7 @@ testExportImportStore ms = do exportMessages False ms' testStoreMsgsFile2 False (B.readFile testStoreMsgsFile2 `shouldReturn`) =<< B.readFile (testStoreMsgsFile <> ".bak") stmStore <- newMsgStore testSMTStoreConfig - readWriteQueueStore True (mkQueue stmStore) testStoreLogFile (queueStore stmStore) >>= closeStoreLog + readWriteQueueStore True (mkQueue stmStore True) testStoreLogFile (queueStore stmStore) >>= closeStoreLog MessageStats {storedMsgsCount = 5, expiredMsgsCount = 0, storedQueues = 2} <- importMessages False stmStore testStoreMsgsFile2 Nothing False exportMessages False stmStore testStoreMsgsFile False @@ -311,7 +311,7 @@ testMessageState ms = do q <- ExceptT $ addQueue ms rId qr Just (Message {msgId = mId1}, True) <- write q "message 1" Just (Message {}, False) <- write q "message 2" - liftIO $ closeMsgQueue q + liftIO $ closeMsgQueue ms q pure mId1 ls <- B.lines <$> B.readFile statePath @@ -322,7 +322,7 @@ testMessageState ms = do Just (Message {msgId = mId3}, False) <- write q "message 3" (Msg "message 1", Msg "message 3") <- tryDelPeekMsg ms q mId1 (Msg "message 3", Nothing) <- tryDelPeekMsg ms q mId3 - liftIO $ closeMsgQueue q + liftIO $ closeMsgQueue ms q testRemoveJournals :: JournalMsgStore s -> IO () testRemoveJournals ms = do @@ -338,7 +338,7 @@ testRemoveJournals ms = do Just (Message {msgId = mId2}, False) <- write q "message 2" (Msg "message 1", Msg "message 2") <- tryDelPeekMsg ms q mId1 (Msg "message 2", Nothing) <- tryDelPeekMsg ms q mId2 - liftIO $ closeMsgQueue q + liftIO $ closeMsgQueue ms q ls <- B.lines <$> B.readFile statePath length ls `shouldBe` 4 @@ -377,7 +377,7 @@ testRemoveJournals ms = do liftIO $ journalFilesCount dir `shouldReturn` 1 (Msg "message 7", Just MessageQuota {msgId = mId8}) <- tryDelPeekMsg ms q mId7 (Just 
MessageQuota {}, Nothing) <- tryDelPeekMsg ms q mId8 - liftIO $ closeMsgQueue q + liftIO $ closeMsgQueue ms q journalFilesCount dir `shouldReturn` 1 runRight $ do @@ -385,7 +385,7 @@ testRemoveJournals ms = do Just (Message {}, True) <- write q "message 8" liftIO $ journalFilesCount dir `shouldReturn` 1 liftIO $ stateBackupCount dir `shouldReturn` 2 - liftIO $ closeMsgQueue q + liftIO $ closeMsgQueue ms q where journalFilesCount dir = length . filter ("messages." `isPrefixOf`) <$> listDirectory dir stateBackupCount dir = length . filter (".bak" `isSuffixOf`) <$> listDirectory dir @@ -408,20 +408,20 @@ testRemoveQueueStateBackups = do Just (Message {msgId = mId2}, False) <- write q "message 2" (Msg "message 1", Msg "message 2") <- tryDelPeekMsg ms q mId1 (Msg "message 2", Nothing) <- tryDelPeekMsg ms q mId2 - liftIO $ closeMsgQueue q + liftIO $ closeMsgQueue ms q liftIO $ stateBackupCount dir `shouldReturn` 0 q1 <- ExceptT $ getQueue ms SRecipient rId Just (Message {}, True) <- write q1 "message 3" Just (Message {}, False) <- write q1 "message 4" - liftIO $ closeMsgQueue q1 + liftIO $ closeMsgQueue ms q1 liftIO $ stateBackupCount dir `shouldReturn` 0 liftIO $ threadDelay 1000000 q2 <- ExceptT $ getQueue ms SRecipient rId Just (Message {}, False) <- write q2 "message 5" Nothing <- write q2 "message 5" - liftIO $ closeMsgQueue q2 + liftIO $ closeMsgQueue ms q2 liftIO $ stateBackupCount dir `shouldReturn` 1 where stateBackupCount dir = length . 
filter (".bak" `isSuffixOf`) <$> listDirectory dir @@ -443,7 +443,7 @@ testExpireIdleQueues = do Just (Message {msgId = mId2}, False) <- write q "message 2" (Msg "message 1", Msg "message 2") <- tryDelPeekMsg ms q mId1 (Msg "message 2", Nothing) <- tryDelPeekMsg ms q mId2 - liftIO $ closeMsgQueue q + liftIO $ closeMsgQueue ms q pure q (Just MsgQueueState {size = 0, readState = rs, writeState = ws}, True) <- readQueueState ms statePath @@ -472,7 +472,7 @@ testReadFileMissing ms = do mq <- fromJust <$> readTVarIO (msgQueue q) MsgQueueState {readState = rs} <- readTVarIO $ state mq - closeMsgQueue q + closeMsgQueue ms q let path = journalFilePath (queueDirectory $ queue mq) $ journalId rs removeFile path @@ -491,7 +491,7 @@ testReadFileMissingSwitch ms = do mq <- fromJust <$> readTVarIO (msgQueue q) MsgQueueState {readState = rs} <- readTVarIO $ state mq - closeMsgQueue q + closeMsgQueue ms q let path = journalFilePath (queueDirectory $ queue mq) $ journalId rs removeFile path @@ -509,7 +509,7 @@ testWriteFileMissing ms = do mq <- fromJust <$> readTVarIO (msgQueue q) MsgQueueState {writeState = ws} <- readTVarIO $ state mq - closeMsgQueue q + closeMsgQueue ms q let path = journalFilePath (queueDirectory $ queue mq) $ journalId ws print path removeFile path @@ -532,7 +532,7 @@ testReadAndWriteFilesMissing ms = do mq <- fromJust <$> readTVarIO (msgQueue q) MsgQueueState {readState = rs, writeState = ws} <- readTVarIO $ state mq - closeMsgQueue q + closeMsgQueue ms q removeFile $ journalFilePath (queueDirectory $ queue mq) $ journalId rs removeFile $ journalFilePath (queueDirectory $ queue mq) $ journalId ws diff --git a/tests/CoreTests/StoreLogTests.hs b/tests/CoreTests/StoreLogTests.hs index d871f5b0a..5fa36559e 100644 --- a/tests/CoreTests/StoreLogTests.hs +++ b/tests/CoreTests/StoreLogTests.hs @@ -108,7 +108,7 @@ testSMPStoreLog testSuite tests = where testReadWrite SLTC {compacted, state} = do st <- newMsgStore $ testJournalStoreCfg MQStoreCfg - l <- 
readWriteQueueStore True (mkQueue st) testStoreLogFile $ queueStore st + l <- readWriteQueueStore True (mkQueue st True) testStoreLogFile $ queueStore st storeState st `shouldReturn` state closeStoreLog l ([], compacted') <- partitionEithers . map strDecode . B.lines <$> B.readFile testStoreLogFile diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 602055b82..6ee05c27b 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -259,10 +259,6 @@ proxyCfgMS msType = proxyCfgJ2 :: ServerConfig proxyCfgJ2 = journalCfg proxyCfg testStoreLogFile2 testStoreMsgsDir2 --- TODO [postgres] --- proxyCfgJ2 :: ServerConfig --- proxyCfgJ2 = journalCfg proxyCfg testStoreDBOpts2 testStoreMsgsDir2 - proxyVRangeV8 :: VersionRangeSMP proxyVRangeV8 = mkVersionRange minServerSMPRelayVersion sendingProxySMPVersion diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index 824338452..8255b898a 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -825,7 +825,7 @@ testRestoreExpireMessages = where export = do ms <- newMsgStore (testJournalStoreCfg MQStoreCfg) {quota = 4} - readWriteQueueStore True (mkQueue ms) testStoreLogFile (queueStore ms) >>= closeStoreLog + readWriteQueueStore True (mkQueue ms True) testStoreLogFile (queueStore ms) >>= closeStoreLog removeFileIfExists testStoreMsgsFile exportMessages False ms testStoreMsgsFile False closeMsgStore ms