diff --git a/.github/workflows/reproduce-schedule.yml b/.github/workflows/reproduce-schedule.yml new file mode 100644 index 000000000..a13c1fca2 --- /dev/null +++ b/.github/workflows/reproduce-schedule.yml @@ -0,0 +1,45 @@ +name: Reproduce latest release + +on: + workflow_dispatch: + schedule: + - cron: '0 2 * * *' # every day at 02:00 night + +jobs: + reproduce: + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Get latest release + shell: bash + run: | + curl --proto '=https' \ + --tlsv1.2 \ + -sSf -L \ + 'https://api.github.com/repos/simplex-chat/simplexmq/releases/latest' \ + 2>/dev/null | \ + grep -i "tag_name" | \ + awk -F \" '{print "TAG="$4}' >> $GITHUB_ENV + + - name: Execute reproduce script + run: | + ${GITHUB_WORKSPACE}/scripts/reproduce-builds.sh "$TAG" + + - name: Check if build has been reproduced + env: + url: ${{ secrets.STATUS_SIMPLEX_WEBHOOK_URL }} + user: ${{ secrets.STATUS_SIMPLEX_WEBHOOK_USER }} + pass: ${{ secrets.STATUS_SIMPLEX_WEBHOOK_PASS }} + run: | + if [ -f "${GITHUB_WORKSPACE}/$TAG/_sha256sums" ]; then + exit 0 + else + curl --proto '=https' --tlsv1.2 -sSf \ + -u "${user}:${pass}" \ + -H 'Content-Type: application/json' \ + -d '{"title": "👾 GitHub: Runner", "description": "⛔️ '"$TAG"' did not reproduce."}' \ + "$url" + exit 1 + fi diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index f6589d176..199f202fd 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -579,7 +579,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt pure ServerMetrics {statsData = d, activeQueueCounts = ps, activeNtfCounts = psNtf, queueCount, notifierCount} getRealTimeMetrics :: Env -> IO RealTimeMetrics - getRealTimeMetrics Env {clients, sockets, server = Server {subscribers, notifiers, subClients, ntfSubClients}} = do + getRealTimeMetrics Env {clients, sockets, msgStore = AMS _ _ ms, server = Server {subscribers, notifiers, subClients, ntfSubClients}} = do socketStats <- mapM (traverse getSocketStats) =<< readTVarIO sockets #if MIN_VERSION_base(4,18,0) threadsCount <- length <$> listThreads @@ -591,7 +591,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt smpSubClientsCount <- IM.size <$> readTVarIO subClients ntfSubsCount <- M.size <$> readTVarIO notifiers ntfSubClientsCount <- IM.size <$> readTVarIO ntfSubClients - pure RealTimeMetrics {socketStats, threadsCount, clientsCount, smpSubsCount, smpSubClientsCount, ntfSubsCount, ntfSubClientsCount} + loadedCounts <- loadedQueueCounts ms + pure RealTimeMetrics {socketStats, threadsCount, clientsCount, smpSubsCount, smpSubClientsCount, ntfSubsCount, ntfSubClientsCount, loadedCounts} runClient :: Transport c => C.APrivateSignKey -> TProxy c -> c -> M () runClient signKey tp h = do @@ -1449,6 +1450,7 @@ client withQueue :: (StoreQueue s -> QueueRec -> M (Transmission BrokerMsg)) -> M (Transmission BrokerMsg) withQueue = withQueue_ True + -- SEND passes queueNotBlocked False here to update time, but it fails anyway on blocked queues (see code for SEND). withQueue_ :: Bool -> (StoreQueue s -> QueueRec -> M (Transmission BrokerMsg)) -> M (Transmission BrokerMsg) withQueue_ queueNotBlocked action = case q_ of Nothing -> pure $ err INTERNAL diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index c1c72a39e..dbdb04758 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -332,13 +332,13 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, serverStoreCfg, smp ASSCfg qt mt (SSCMemory storePaths_) -> do let storePath = storeMsgsFile =<< storePaths_ ms <- newMsgStore STMStoreConfig {storePath, quota = msgQueueQuota} - forM_ storePaths_ $ \StorePaths {storeLogFile = f} -> loadStoreLog (mkQueue ms) f $ queueStore ms + forM_ storePaths_ $ \StorePaths {storeLogFile = f} -> loadStoreLog (mkQueue ms True) f $ queueStore ms pure $ AMS qt mt ms ASSCfg qt mt SSCMemoryJournal {storeLogFile, storeMsgsPath} -> do let qsCfg = MQStoreCfg cfg = mkJournalStoreConfig qsCfg storeMsgsPath msgQueueQuota maxJournalMsgCount maxJournalStateLines idleQueueInterval ms <- newMsgStore cfg - loadStoreLog (mkQueue ms) storeLogFile $ stmQueueStore ms + loadStoreLog (mkQueue ms True) storeLogFile $ stmQueueStore ms pure $ AMS qt mt ms #if defined(dbServerPostgres) ASSCfg qt mt SSCDatabaseJournal {storeCfg, storeMsgsPath'} -> do @@ -374,7 +374,8 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, serverStoreCfg, smp Just f -> do logInfo $ "compacting queues in file " <> T.pack f st <- newMsgStore STMStoreConfig {storePath = Nothing, quota = msgQueueQuota} - sl <- readWriteQueueStore False (mkQueue st) f (queueStore st) + -- we don't need to have locks in the map + sl <- readWriteQueueStore False (mkQueue st False) f (queueStore st) setStoreLog (queueStore st) sl closeMsgStore st Nothing -> do diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index 9e63c1dbb..d2470eadb 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -118,7 +118,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = ("WARNING: message log file " <> storeMsgsFilePath <> " will be imported to journal directory " <> storeMsgsJournalDir) "Messages not imported" ms <- newJournalMsgStore MQStoreCfg - readQueueStore True (mkQueue ms) storeLogFile $ stmQueueStore ms + readQueueStore True (mkQueue ms False) storeLogFile $ stmQueueStore ms msgStats <- importMessages True ms storeMsgsFilePath Nothing False -- no expiration putStrLn "Import completed" printMessageStats "Messages" msgStats @@ -137,7 +137,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = "Journal not exported" ms <- newJournalMsgStore MQStoreCfg -- TODO [postgres] in case postgres configured, queues must be read from database - readQueueStore True (mkQueue ms) storeLogFile $ stmQueueStore ms + readQueueStore True (mkQueue ms False) storeLogFile $ stmQueueStore ms exportMessages True ms storeMsgsFilePath False putStrLn "Export completed" case readStoreType ini of @@ -179,7 +179,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = ("WARNING: store log file " <> storeLogFile <> " will be compacted and imported to PostrgreSQL database: " <> B.unpack connstr <> ", schema: " <> B.unpack schema) "Queue records not imported" ms <- newJournalMsgStore MQStoreCfg - sl <- readWriteQueueStore True (mkQueue ms) storeLogFile (queueStore ms) + sl <- readWriteQueueStore True (mkQueue ms False) storeLogFile (queueStore ms) closeStoreLog sl queues <- readTVarIO $ loadedQueues $ stmQueueStore ms let storeCfg = PostgresStoreCfg {dbOpts = dbOpts {createSchema = True}, dbStoreLogPath = Nothing, confirmMigrations = MCConsole, deletedTTL = iniDeletedTTL ini} diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index b48d31743..90ee2208d 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -62,7 +62,8 @@ import Data.Either (fromRight) import Data.Functor (($>)) import Data.Int (Int64) import Data.List (intercalate, sort) -import Data.Maybe (fromMaybe, isNothing, mapMaybe) +import qualified Data.Map.Strict as M +import Data.Maybe (fromMaybe, isJust, isNothing, mapMaybe) import Data.Text (Text) import qualified Data.Text as T import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime) @@ -96,6 +97,7 @@ data JournalMsgStore s = JournalMsgStore queueLocks :: TMap RecipientId Lock, sharedLock :: TMVar RecipientId, queueStore_ :: QStore s, + openedQueueCount :: TVar Int, expireBackupsBefore :: UTCTime } @@ -344,12 +346,6 @@ instance QueueStoreClass (JournalQueue s) (QStore s) where deleteStoreQueue = withQS deleteStoreQueue {-# INLINE deleteStoreQueue #-} -#if defined(dbServerPostgres) -mkTempQueue :: JournalMsgStore s -> RecipientId -> QueueRec -> IO (JournalQueue s) -mkTempQueue ms rId qr = createLockIO >>= makeQueue_ ms rId qr -{-# INLINE mkTempQueue #-} -#endif - makeQueue_ :: JournalMsgStore s -> RecipientId -> QueueRec -> Lock -> IO (JournalQueue s) makeQueue_ JournalMsgStore {sharedLock} rId qr queueLock = do queueRec' <- newTVarIO $ Just qr @@ -379,8 +375,9 @@ instance MsgStoreClass (JournalMsgStore s) where queueLocks <- TM.emptyIO sharedLock <- newEmptyTMVarIO queueStore_ <- newQueueStore @(JournalQueue s) queueStoreCfg + openedQueueCount <- newTVarIO 0 expireBackupsBefore <- addUTCTime (- expireBackupsAfter config) <$> getCurrentTime - pure JournalMsgStore {config, random, queueLocks, sharedLock, queueStore_, expireBackupsBefore} + pure JournalMsgStore {config, random, queueLocks, sharedLock, queueStore_, openedQueueCount, expireBackupsBefore} closeMsgStore :: JournalMsgStore s -> IO () closeMsgStore ms = do @@ -388,7 +385,7 @@ instance MsgStoreClass (JournalMsgStore s) where closeQueues $ loadedQueues @(JournalQueue s) st closeQueueStore @(JournalQueue s) st where - closeQueues qs = readTVarIO qs >>= mapM_ closeMsgQueue + closeQueues qs = readTVarIO qs >>= mapM_ (closeMsgQueue ms) withActiveMsgQueues :: Monoid a => JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a withActiveMsgQueues = withQS withLoadedQueues . queueStore_ @@ -399,12 +396,12 @@ instance MsgStoreClass (JournalMsgStore s) where unsafeWithAllMsgQueues tty withData ms action = case queueStore_ ms of MQStore st -> withLoadedQueues st run #if defined(dbServerPostgres) - PQStore st -> foldQueueRecs tty withData st Nothing $ uncurry (mkTempQueue ms) >=> run + PQStore st -> foldQueueRecs tty withData st Nothing $ uncurry (mkQueue ms False) >=> run #endif where run q = do r <- action q - closeMsgQueue q + closeMsgQueue ms q pure r -- This function is concurrency safe @@ -420,7 +417,7 @@ instance MsgStoreClass (JournalMsgStore s) where PQStore st -> do let JournalMsgStore {queueLocks, sharedLock} = ms foldQueueRecs tty False st (Just veryOld) $ \(rId, qr) -> do - q <- mkTempQueue ms rId qr + q <- mkQueue ms False rId qr withSharedWaitLock rId queueLocks sharedLock $ run $ tryStore' "deleteExpiredMsgs" rId $ getLoadedQueue q >>= unStoreIO . expireQueueMsgs ms now old #endif @@ -447,9 +444,26 @@ instance MsgStoreClass (JournalMsgStore s) where queueStore = queueStore_ {-# INLINE queueStore #-} - mkQueue :: JournalMsgStore s -> RecipientId -> QueueRec -> IO (JournalQueue s) - mkQueue ms rId qr = do - lock <- atomically $ getMapLock (queueLocks ms) rId + loadedQueueCounts :: JournalMsgStore s -> IO LoadedQueueCounts + loadedQueueCounts ms = do + let (qs, ns, nLocks_) = loaded + loadedQueueCount <- M.size <$> readTVarIO qs + loadedNotifierCount <- M.size <$> readTVarIO ns + openJournalCount <- readTVarIO (openedQueueCount ms) + queueLockCount <- M.size <$> readTVarIO (queueLocks ms) + notifierLockCount <- maybe (pure 0) (fmap M.size . readTVarIO) nLocks_ + pure LoadedQueueCounts {loadedQueueCount, loadedNotifierCount, openJournalCount, queueLockCount, notifierLockCount} + where + loaded :: (TMap RecipientId (JournalQueue s), TMap NotifierId RecipientId, Maybe (TMap NotifierId Lock)) + loaded = case queueStore_ ms of + MQStore STMQueueStore {queues, notifiers} -> (queues, notifiers, Nothing) +#if defined(dbServerPostgres) + PQStore PostgresQueueStore {queues, notifiers, notifierLocks} -> (queues, notifiers, Just notifierLocks) +#endif + + mkQueue :: JournalMsgStore s -> Bool -> RecipientId -> QueueRec -> IO (JournalQueue s) + mkQueue ms keepLock rId qr = do + lock <- if keepLock then atomically $ getMapLock (queueLocks ms) rId else createLockIO makeQueue_ ms rId qr lock getMsgQueue :: JournalMsgStore s -> JournalQueue s -> Bool -> StoreIO s (JournalMsgQueue s) @@ -484,7 +498,7 @@ instance MsgStoreClass (JournalMsgStore s) where -- In case the queue became non-empty on write and then again empty on read -- we won't be closing it, to avoid frequent open/close on active queues. r <- peek - when (isNothing r) $ StoreIO $ closeMsgQueue q + when (isNothing r) $ StoreIO $ closeMsgQueue ms q pure r where peek = do @@ -498,7 +512,7 @@ instance MsgStoreClass (JournalMsgStore s) where Nothing -> E.bracket getNonEmptyMsgQueue - (mapM_ $ \_ -> closeMsgQueue q) + (mapM_ $ \_ -> closeMsgQueue ms q) (maybe (pure (Nothing, 0)) (unStoreIO . run)) where run mq = do @@ -508,7 +522,7 @@ instance MsgStoreClass (JournalMsgStore s) where Just mq -> do ts <- readTVarIO $ activeAt q r <- if now - ts >= idleInterval config - then Just <$> unStoreIO (action mq) `E.finally` closeMsgQueue q + then Just <$> unStoreIO (action mq) `E.finally` closeMsgQueue ms q else pure Nothing sz <- unStoreIO $ getQueueSize_ mq pure (r, sz) @@ -523,7 +537,7 @@ instance MsgStoreClass (JournalMsgStore s) where mq <- unStoreIO $ getMsgQueue ms q False -- queueState was updated in getMsgQueue readTVarIO queueState >>= \case - Just QState {hasStored} | not hasStored -> closeMsgQueue q $> Nothing + Just QState {hasStored} | not hasStored -> closeMsgQueue ms q $> Nothing _ -> pure $ Just mq deleteQueue :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType QueueRec) @@ -586,6 +600,7 @@ instance MsgStoreClass (JournalMsgStore s) where rh <- createNewJournal queueDirectory $ journalId rs let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = Nothing} atomically $ writeTVar handles $ Just hs + atomically $ modifyTVar' (openedQueueCount ms) (+ 1) pure hs switchWriteJournal hs = do journalId <- newJournalId $ random ms @@ -657,13 +672,16 @@ openMsgQueue ms@JournalMsgStore {config} q@JMQueue {queueDirectory = dir, stateP Just st | size st == 0 -> do (st', hs_) <- removeJournals st shouldBackup + when (isJust hs_) incOpenedCount mkJournalQueue q st' hs_ | otherwise -> do sh <- openBackupQueueState st shouldBackup (st', rh, wh_) <- closeOnException sh $ openJournals ms dir st sh let hs = MsgQueueHandles {stateHandle = sh, readHandle = rh, writeHandle = wh_} + incOpenedCount mkJournalQueue q st' (Just hs) where + incOpenedCount = atomically $ modifyTVar' (openedQueueCount ms) (+ 1) -- If the queue is empty, journals are deleted. -- New journal is created if queue is written to. -- canWrite is set to True. @@ -926,28 +944,30 @@ validQueueState MsgQueueState {readState = rs, writeState = ws, size} && msgPos ws == msgCount ws && bytePos ws == byteCount ws --- TODO [postgres] possibly, we need to remove the lock from map deleteQueue_ :: JournalMsgStore s -> JournalQueue s -> IO (Either ErrorType (QueueRec, Maybe (JournalMsgQueue s))) deleteQueue_ ms q = - runExceptT $ isolateQueueId "deleteQueue_" ms rId $ - deleteStoreQueue (queueStore_ ms) q >>= mapM remove + runExceptT $ isolateQueueId "deleteQueue_" ms rId $ do + r <- deleteStoreQueue (queueStore_ ms) q >>= mapM remove + atomically $ TM.delete rId (queueLocks ms) + pure r where rId = recipientId q remove r@(_, mq_) = do - mapM_ closeMsgQueueHandles mq_ + mapM_ (closeMsgQueueHandles ms) mq_ removeQueueDirectory ms rId pure r -closeMsgQueue :: JournalQueue s -> IO () -closeMsgQueue JournalQueue {msgQueue'} = atomically (swapTVar msgQueue' Nothing) >>= mapM_ closeMsgQueueHandles +closeMsgQueue :: JournalMsgStore s -> JournalQueue s -> IO () +closeMsgQueue ms JournalQueue {msgQueue'} = atomically (swapTVar msgQueue' Nothing) >>= mapM_ (closeMsgQueueHandles ms) -closeMsgQueueHandles :: JournalMsgQueue s -> IO () -closeMsgQueueHandles q = readTVarIO (handles q) >>= mapM_ closeHandles +closeMsgQueueHandles :: JournalMsgStore s -> JournalMsgQueue s -> IO () +closeMsgQueueHandles ms q = readTVarIO (handles q) >>= mapM_ closeHandles where closeHandles (MsgQueueHandles sh rh wh_) = do hClose sh hClose rh mapM_ hClose wh_ + atomically $ modifyTVar' (openedQueueCount ms) (subtract 1) removeQueueDirectory :: JournalMsgStore s -> RecipientId -> IO () removeQueueDirectory st = removeQueueDirectory_ . msgQueueDirectory st diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index af8cde941..afde3ff82 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -23,6 +23,7 @@ import Control.Monad.IO.Class import Control.Monad.Trans.Except import Data.Functor (($>)) import Data.Int (Int64) +import qualified Data.Map.Strict as M import Simplex.Messaging.Protocol import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.QueueStore @@ -94,7 +95,13 @@ instance MsgStoreClass STMMsgStore where queueStore = queueStore_ {-# INLINE queueStore #-} - mkQueue _ rId qr = STMQueue rId <$> newTVarIO (Just qr) <*> newTVarIO Nothing + loadedQueueCounts :: STMMsgStore -> IO LoadedQueueCounts + loadedQueueCounts STMMsgStore {queueStore_ = st} = do + loadedQueueCount <- M.size <$> readTVarIO (queues st) + loadedNotifierCount <- M.size <$> readTVarIO (notifiers st) + pure LoadedQueueCounts {loadedQueueCount, loadedNotifierCount, openJournalCount = 0, queueLockCount = 0, notifierLockCount = 0} + + mkQueue _ _ rId qr = STMQueue rId <$> newTVarIO (Just qr) <*> newTVarIO Nothing {-# INLINE mkQueue #-} getMsgQueue :: STMMsgStore -> STMQueue -> Bool -> STM STMMsgQueue diff --git a/src/Simplex/Messaging/Server/MsgStore/Types.hs b/src/Simplex/Messaging/Server/MsgStore/Types.hs index 3bf857e6d..82778b5a4 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Types.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Types.hs @@ -44,9 +44,10 @@ class (Monad (StoreMonad s), QueueStoreClass (StoreQueue s) (QueueStore s)) => M logQueueStates :: s -> IO () logQueueState :: StoreQueue s -> StoreMonad s () queueStore :: s -> QueueStore s + loadedQueueCounts :: s -> IO LoadedQueueCounts -- message store methods - mkQueue :: s -> RecipientId -> QueueRec -> IO (StoreQueue s) + mkQueue :: s -> Bool -> RecipientId -> QueueRec -> IO (StoreQueue s) getMsgQueue :: s -> StoreQueue s -> Bool -> StoreMonad s (MsgQueue (StoreQueue s)) getPeekMsgQueue :: s -> StoreQueue s -> StoreMonad s (Maybe (MsgQueue (StoreQueue s), Message)) @@ -89,11 +90,19 @@ instance Semigroup MessageStats where MessageStats a b c <> MessageStats x y z = MessageStats (a + x) (b + y) (c + z) {-# INLINE (<>) #-} +data LoadedQueueCounts = LoadedQueueCounts + { loadedQueueCount :: Int, + loadedNotifierCount :: Int, + openJournalCount :: Int, + queueLockCount :: Int, + notifierLockCount :: Int + } + newMessageStats :: MessageStats newMessageStats = MessageStats 0 0 0 addQueue :: MsgStoreClass s => s -> RecipientId -> QueueRec -> IO (Either ErrorType (StoreQueue s)) -addQueue st = addQueue_ (queueStore st) (mkQueue st) +addQueue st = addQueue_ (queueStore st) (mkQueue st True) {-# INLINE addQueue #-} getQueue :: (MsgStoreClass s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s)) diff --git a/src/Simplex/Messaging/Server/Prometheus.hs b/src/Simplex/Messaging/Server/Prometheus.hs index 3f5c3f87e..a542a87f1 100644 --- a/src/Simplex/Messaging/Server/Prometheus.hs +++ b/src/Simplex/Messaging/Server/Prometheus.hs @@ -12,6 +12,7 @@ import Data.Time.Clock (UTCTime (..), diffUTCTime) import Data.Time.Clock.System (systemEpochDay) import Data.Time.Format.ISO8601 (iso8601Show) import Network.Socket (ServiceName) +import Simplex.Messaging.Server.MsgStore.Types (LoadedQueueCounts (..)) import Simplex.Messaging.Server.Stats import Simplex.Messaging.Transport.Server (SocketStats (..)) @@ -30,7 +31,8 @@ data RealTimeMetrics = RealTimeMetrics smpSubsCount :: Int, smpSubClientsCount :: Int, ntfSubsCount :: Int, - ntfSubClientsCount :: Int + ntfSubClientsCount :: Int, + loadedCounts :: LoadedQueueCounts } {-# FOURMOLU_DISABLE\n#-} @@ -46,7 +48,8 @@ prometheusMetrics sm rtm ts = smpSubsCount, smpSubClientsCount, ntfSubsCount, - ntfSubClientsCount + ntfSubClientsCount, + loadedCounts } = rtm ServerStatsData { _fromTime, @@ -371,7 +374,28 @@ prometheusMetrics sm rtm ts = \\n\ \# HELP simplex_smp_subscription_ntf_clients_total Total subscribed NTF servers, first counting method\n\ \# TYPE simplex_smp_subscription_ntf_clients_total gauge\n\ - \simplex_smp_subscription_ntf_clients_total " <> mshow ntfSubClientsCount <> "\n# ntfSubClients\n" + \simplex_smp_subscription_ntf_clients_total " <> mshow ntfSubClientsCount <> "\n# ntfSubClients\n\ + \\n\ + \# HELP simplex_smp_loaded_queues_queue_count Total loaded queues count (all queues for memory/journal storage)\n\ + \# TYPE simplex_smp_loaded_queues_queue_count gauge\n\ + \simplex_smp_loaded_queues_queue_count " <> mshow (loadedQueueCount loadedCounts) <> "\n# loadedCounts.loadedQueueCount\n\ + \\n\ + \# HELP simplex_smp_loaded_queues_ntf_count Total loaded ntf credential references (all ntf credentials for memory/journal storage)\n\ + \# TYPE simplex_smp_loaded_queues_ntf_count gauge\n\ + \simplex_smp_loaded_queues_ntf_count " <> mshow (loadedNotifierCount loadedCounts) <> "\n# loadedCounts.loadedNotifierCount\n\ + \\n\ + \# HELP simplex_smp_loaded_queues_open_journal_count Total opened queue journals (0 for memory storage)\n\ + \# TYPE simplex_smp_loaded_queues_open_journal_count gauge\n\ + \simplex_smp_loaded_queues_open_journal_count " <> mshow (openJournalCount loadedCounts) <> "\n# loadedCounts.openJournalCount\n\ + \\n\ + \# HELP simplex_smp_loaded_queues_queue_lock_count Total queue locks (0 for memory storage)\n\ + \# TYPE simplex_smp_loaded_queues_queue_lock_count gauge\n\ + \simplex_smp_loaded_queues_queue_lock_count " <> mshow (queueLockCount loadedCounts) <> "\n# loadedCounts.queueLockCount\n\ + \\n\ + \# HELP simplex_smp_loaded_queues_ntf_lock_count Total notifier locks (0 for memory/journal storage)\n\ + \# TYPE simplex_smp_loaded_queues_ntf_lock_count gauge\n\ + \simplex_smp_loaded_queues_ntf_lock_count " <> mshow (notifierLockCount loadedCounts) <> "\n# loadedCounts.notifierLockCount\n" + socketsMetric :: (SocketStats -> Int) -> Text -> Text -> Text socketsMetric sel metric descr = "# HELP " <> metric <> " " <> descr <> "\n" diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs index 454e54c12..89464938d 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs @@ -155,39 +155,52 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where -- hasId = anyM [TM.memberIO rId queues, TM.memberIO senderId senders, hasNotifier] -- hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.memberIO notifierId notifiers) notifier - getQueue_ :: DirectParty p => PostgresQueueStore q -> (RecipientId -> QueueRec -> IO q) -> SParty p -> QueueId -> IO (Either ErrorType q) + getQueue_ :: DirectParty p => PostgresQueueStore q -> (Bool -> RecipientId -> QueueRec -> IO q) -> SParty p -> QueueId -> IO (Either ErrorType q) getQueue_ st mkQ party qId = case party of SRecipient -> getRcvQueue qId - SSender -> TM.lookupIO qId senders >>= maybe loadSndQueue getRcvQueue - SNotifier -> TM.lookupIO qId notifiers >>= maybe loadNtfQueue getRcvQueue - SLinkClient -> loadLinkQueue + SSender -> TM.lookupIO qId senders >>= maybe (mask loadSndQueue) getRcvQueue + -- TODO [short links] use map of link IDs - queue will be added there on creation in case there is data + SLinkClient -> mask loadLinkQueue + -- loaded queue is deleted from notifiers map to reduce cache size after queue was subscribed to by ntf server + SNotifier -> TM.lookupIO qId notifiers >>= maybe (mask loadNtfQueue) (getRcvQueue >=> (atomically (TM.delete qId notifiers) $>)) where PostgresQueueStore {queues, senders, notifiers} = st - getRcvQueue rId = TM.lookupIO rId queues >>= maybe loadRcvQueue (pure . Right) - loadRcvQueue = loadQueue " WHERE recipient_id = ?" $ \_ -> pure () - loadSndQueue = loadQueue " WHERE sender_id = ?" $ \rId -> TM.insert qId rId senders - loadNtfQueue = loadQueue " WHERE notifier_id = ?" $ \_ -> pure () -- do NOT cache ref - ntf subscriptions are rare - loadLinkQueue = loadQueue " WHERE link_id = ?" $ \_ -> pure () - loadQueue condition insertRef = - E.uninterruptibleMask_ $ runExceptT $ do - (rId, qRec) <- - withDB "getQueue_" st $ \db -> firstRow rowToQueueRec AUTH $ - DB.query db (queueRecQuery <> condition <> " AND deleted_at IS NULL") (Only qId) - liftIO $ do - sq <- mkQ rId qRec -- loaded queue - -- This lock prevents the scenario when the queue is added to cache, - -- while another thread is proccessing the same queue in withAllMsgQueues - -- without adding it to cache, possibly trying to open the same files twice. - -- Alse see comment in idleDeleteExpiredMsgs. - withQueueLock sq "getQueue_" $ atomically $ - -- checking the cache again for concurrent reads, - -- use previously loaded queue if exists. - TM.lookup rId queues >>= \case - Just sq' -> pure sq' - Nothing -> do - insertRef rId - TM.insert rId sq queues - pure sq + getRcvQueue rId = TM.lookupIO rId queues >>= maybe (mask loadRcvQueue) (pure . Right) + loadRcvQueue = do + (rId, qRec) <- loadQueue " WHERE recipient_id = ?" + liftIO $ cacheQueue rId qRec $ \_ -> pure () -- recipient map already checked, not caching sender ref + loadSndQueue = loadSndQueue_ " WHERE sender_id = ?" + loadLinkQueue = loadSndQueue_ " WHERE link_id = ?" + loadNtfQueue = do + (rId, qRec) <- loadQueue " WHERE notifier_id = ?" + liftIO $ + TM.lookupIO rId queues -- checking recipient map first, not creating lock in map, not caching queue + >>= maybe (mkQ False rId qRec) pure + loadSndQueue_ condition = do + (rId, qRec) <- loadQueue condition + liftIO $ + TM.lookupIO rId queues -- checking recipient map first + >>= maybe (cacheQueue rId qRec cacheSender) (atomically (cacheSender rId) $>) + mask = E.uninterruptibleMask_ . runExceptT + cacheSender rId = TM.insert qId rId senders + loadQueue condition = + withDB "getQueue_" st $ \db -> firstRow rowToQueueRec AUTH $ + DB.query db (queueRecQuery <> condition <> " AND deleted_at IS NULL") (Only qId) + cacheQueue rId qRec insertRef = do + sq <- mkQ True rId qRec -- loaded queue + -- This lock prevents the scenario when the queue is added to cache, + -- while another thread is proccessing the same queue in withAllMsgQueues + -- without adding it to cache, possibly trying to open the same files twice. + -- Alse see comment in idleDeleteExpiredMsgs. + withQueueLock sq "getQueue_" $ atomically $ + -- checking the cache again for concurrent reads, + -- use previously loaded queue if exists. + TM.lookup rId queues >>= \case + Just sq' -> pure sq' + Nothing -> do + insertRef rId + TM.insert rId sq queues + pure sq getQueueLinkData :: PostgresQueueStore q -> q -> LinkId -> IO (Either ErrorType QueueLinkData) getQueueLinkData st sq lnkId = runExceptT $ do @@ -328,7 +341,9 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where DB.execute db "UPDATE msg_queues SET deleted_at = ? WHERE recipient_id = ? AND deleted_at IS NULL" (ts, rId) atomically $ writeTVar qr Nothing atomically $ TM.delete (senderId q) $ senders st - forM_ (notifier q) $ \NtfCreds {notifierId} -> atomically $ TM.delete notifierId $ notifiers st + forM_ (notifier q) $ \NtfCreds {notifierId} -> do + atomically $ TM.delete notifierId $ notifiers st + atomically $ TM.delete notifierId $ notifierLocks st mq_ <- atomically $ swapTVar (msgQueue sq) Nothing withLog "deleteStoreQueue" st (`logDeleteQueue` rId) pure (q, mq_) diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index 5638312c6..e3efe9309 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -96,7 +96,7 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.member notifierId notifiers) notifier hasLink = maybe (pure False) (\(lnkId, _) -> TM.member lnkId links) queueData - getQueue_ :: DirectParty p => STMQueueStore q -> (RecipientId -> QueueRec -> IO q) -> SParty p -> QueueId -> IO (Either ErrorType q) + getQueue_ :: DirectParty p => STMQueueStore q -> (Bool -> RecipientId -> QueueRec -> IO q) -> SParty p -> QueueId -> IO (Either ErrorType q) getQueue_ st _ party qId = maybe (Left AUTH) Right <$> case party of SRecipient -> TM.lookupIO qId queues diff --git a/src/Simplex/Messaging/Server/QueueStore/Types.hs b/src/Simplex/Messaging/Server/QueueStore/Types.hs index 73c098a68..2a4dbc3ea 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Types.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Types.hs @@ -29,7 +29,7 @@ class StoreQueueClass q => QueueStoreClass q s where loadedQueues :: s -> TMap RecipientId q compactQueues :: s -> IO Int64 addQueue_ :: s -> (RecipientId -> QueueRec -> IO q) -> RecipientId -> QueueRec -> IO (Either ErrorType q) - getQueue_ :: DirectParty p => s -> (RecipientId -> QueueRec -> IO q) -> SParty p -> QueueId -> IO (Either ErrorType q) + getQueue_ :: DirectParty p => s -> (Bool -> RecipientId -> QueueRec -> IO q) -> SParty p -> QueueId -> IO (Either ErrorType q) getQueueLinkData :: s -> q -> LinkId -> IO (Either ErrorType QueueLinkData) addQueueLinkData :: s -> q -> LinkId -> QueueLinkData -> IO (Either ErrorType ()) deleteQueueLinkData :: s -> q -> IO (Either ErrorType ()) diff --git a/src/Simplex/Messaging/Server/StoreLog/ReadWrite.hs b/src/Simplex/Messaging/Server/StoreLog/ReadWrite.hs index d03bf55df..5a8f1f46c 100644 --- a/src/Simplex/Messaging/Server/StoreLog/ReadWrite.hs +++ b/src/Simplex/Messaging/Server/StoreLog/ReadWrite.hs @@ -58,7 +58,7 @@ readQueueStore tty mkQ f st = readLogLines tty f $ \_ -> processLine withQueue qId op a = runExceptT go >>= qError qId op where go = do - q <- ExceptT $ getQueue_ st mkQ SRecipient qId + q <- ExceptT $ getQueue_ st (\_ -> mkQ) SRecipient qId liftIO (readTVarIO $ queueRec q) >>= \case Nothing -> logWarn $ logPfx qId op <> "already deleted" Just _ -> void $ ExceptT $ a q diff --git a/tests/CoreTests/MsgStoreTests.hs b/tests/CoreTests/MsgStoreTests.hs index 2c76b051c..a110ec9d8 100644 --- a/tests/CoreTests/MsgStoreTests.hs +++ b/tests/CoreTests/MsgStoreTests.hs @@ -198,7 +198,7 @@ testExportImportStore ms = do g <- C.newRandom (rId1, qr1) <- testNewQueueRec g QMMessaging (rId2, qr2) <- testNewQueueRec g QMMessaging - sl <- readWriteQueueStore True (mkQueue ms) testStoreLogFile $ queueStore ms + sl <- readWriteQueueStore True (mkQueue ms True) testStoreLogFile $ queueStore ms runRight_ $ do let write q s = writeMsg ms q True =<< mkMessage s q1 <- ExceptT $ addQueue ms rId1 qr1 @@ -223,7 +223,7 @@ testExportImportStore ms = do closeStoreLog sl let cfg = (testJournalStoreCfg MQStoreCfg :: JournalStoreConfig 'QSMemory) {storePath = testStoreMsgsDir2} ms' <- newMsgStore cfg - readWriteQueueStore True (mkQueue ms') testStoreLogFile (queueStore ms') >>= closeStoreLog + readWriteQueueStore True (mkQueue ms' True) testStoreLogFile (queueStore ms') >>= closeStoreLog stats@MessageStats {storedMsgsCount = 5, expiredMsgsCount = 0, storedQueues = 2} <- importMessages False ms' testStoreMsgsFile Nothing False printMessageStats "Messages" stats @@ -232,7 +232,7 @@ testExportImportStore ms = do exportMessages False ms' testStoreMsgsFile2 False (B.readFile testStoreMsgsFile2 `shouldReturn`) =<< B.readFile (testStoreMsgsFile <> ".bak") stmStore <- newMsgStore testSMTStoreConfig - readWriteQueueStore True (mkQueue stmStore) testStoreLogFile (queueStore stmStore) >>= closeStoreLog + readWriteQueueStore True (mkQueue stmStore True) testStoreLogFile (queueStore stmStore) >>= closeStoreLog MessageStats {storedMsgsCount = 5, expiredMsgsCount = 0, storedQueues = 2} <- importMessages False stmStore testStoreMsgsFile2 Nothing False exportMessages False stmStore testStoreMsgsFile False @@ -313,7 +313,7 @@ testMessageState ms = do q <- ExceptT $ addQueue ms rId qr Just (Message {msgId = mId1}, True) <- write q "message 1" Just (Message {}, False) <- write q "message 2" - liftIO $ closeMsgQueue q + liftIO $ closeMsgQueue ms q pure mId1 ls <- B.lines <$> B.readFile statePath @@ -324,7 +324,7 @@ testMessageState ms = do Just (Message {msgId = mId3}, False) <- write q "message 3" (Msg "message 1", Msg "message 3") <- tryDelPeekMsg ms q mId1 (Msg "message 3", Nothing) <- tryDelPeekMsg ms q mId3 - liftIO $ closeMsgQueue q + liftIO $ closeMsgQueue ms q testRemoveJournals :: JournalMsgStore s -> IO () testRemoveJournals ms = do @@ -340,7 +340,7 @@ testRemoveJournals ms = do Just (Message {msgId = mId2}, False) <- write q "message 2" (Msg "message 1", Msg "message 2") <- tryDelPeekMsg ms q mId1 (Msg "message 2", Nothing) <- tryDelPeekMsg ms q mId2 - liftIO $ closeMsgQueue q + liftIO $ closeMsgQueue ms q ls <- B.lines <$> B.readFile statePath length ls `shouldBe` 4 @@ -379,7 +379,7 @@ testRemoveJournals ms = do liftIO $ journalFilesCount dir `shouldReturn` 1 (Msg "message 7", Just MessageQuota {msgId = mId8}) <- tryDelPeekMsg ms q mId7 (Just MessageQuota {}, Nothing) <- tryDelPeekMsg ms q mId8 - liftIO $ closeMsgQueue q + liftIO $ closeMsgQueue ms q journalFilesCount dir `shouldReturn` 1 runRight $ do @@ -387,7 +387,7 @@ testRemoveJournals ms = do Just (Message {}, True) <- write q "message 8" liftIO $ journalFilesCount dir `shouldReturn` 1 liftIO $ stateBackupCount dir `shouldReturn` 2 - liftIO $ closeMsgQueue q + liftIO $ closeMsgQueue ms q where journalFilesCount dir = length . filter ("messages." `isPrefixOf`) <$> listDirectory dir stateBackupCount dir = length . filter (".bak" `isSuffixOf`) <$> listDirectory dir @@ -410,20 +410,20 @@ testRemoveQueueStateBackups = do Just (Message {msgId = mId2}, False) <- write q "message 2" (Msg "message 1", Msg "message 2") <- tryDelPeekMsg ms q mId1 (Msg "message 2", Nothing) <- tryDelPeekMsg ms q mId2 - liftIO $ closeMsgQueue q + liftIO $ closeMsgQueue ms q liftIO $ stateBackupCount dir `shouldReturn` 0 q1 <- ExceptT $ getQueue ms SRecipient rId Just (Message {}, True) <- write q1 "message 3" Just (Message {}, False) <- write q1 "message 4" - liftIO $ closeMsgQueue q1 + liftIO $ closeMsgQueue ms q1 liftIO $ stateBackupCount dir `shouldReturn` 0 liftIO $ threadDelay 1000000 q2 <- ExceptT $ getQueue ms SRecipient rId Just (Message {}, False) <- write q2 "message 5" Nothing <- write q2 "message 5" - liftIO $ closeMsgQueue q2 + liftIO $ closeMsgQueue ms q2 liftIO $ stateBackupCount dir `shouldReturn` 1 where stateBackupCount dir = length . filter (".bak" `isSuffixOf`) <$> listDirectory dir @@ -445,7 +445,7 @@ testExpireIdleQueues = do Just (Message {msgId = mId2}, False) <- write q "message 2" (Msg "message 1", Msg "message 2") <- tryDelPeekMsg ms q mId1 (Msg "message 2", Nothing) <- tryDelPeekMsg ms q mId2 - liftIO $ closeMsgQueue q + liftIO $ closeMsgQueue ms q pure q (Just MsgQueueState {size = 0, readState = rs, writeState = ws}, True) <- readQueueState ms statePath @@ -474,7 +474,7 @@ testReadFileMissing ms = do mq <- fromJust <$> readTVarIO (msgQueue q) MsgQueueState {readState = rs} <- readTVarIO $ state mq - closeMsgQueue q + closeMsgQueue ms q let path = journalFilePath (queueDirectory $ queue mq) $ journalId rs removeFile path @@ -493,7 +493,7 @@ testReadFileMissingSwitch ms = do mq <- fromJust <$> readTVarIO (msgQueue q) MsgQueueState {readState = rs} <- readTVarIO $ state mq - closeMsgQueue q + closeMsgQueue ms q let path = journalFilePath (queueDirectory $ queue mq) $ journalId rs removeFile path @@ -511,7 +511,7 @@ testWriteFileMissing ms = do mq <- fromJust <$> readTVarIO (msgQueue q) MsgQueueState {writeState = ws} <- readTVarIO $ state mq - closeMsgQueue q + closeMsgQueue ms q let path = journalFilePath (queueDirectory $ queue mq) $ journalId ws print path removeFile path @@ -534,7 +534,7 @@ testReadAndWriteFilesMissing ms = do mq <- fromJust <$> readTVarIO (msgQueue q) MsgQueueState {readState = rs, writeState = ws} <- readTVarIO $ state mq - closeMsgQueue q + closeMsgQueue ms q removeFile $ journalFilePath (queueDirectory $ queue mq) $ journalId rs removeFile $ journalFilePath (queueDirectory $ queue mq) $ journalId ws diff --git a/tests/CoreTests/StoreLogTests.hs b/tests/CoreTests/StoreLogTests.hs index 0f3d22f38..d05f689c1 100644 --- a/tests/CoreTests/StoreLogTests.hs +++ b/tests/CoreTests/StoreLogTests.hs @@ -109,7 +109,7 @@ testSMPStoreLog testSuite tests = where testReadWrite SLTC {compacted, state} = do st <- newMsgStore $ testJournalStoreCfg MQStoreCfg - l <- readWriteQueueStore True (mkQueue st) testStoreLogFile $ queueStore st + l <- readWriteQueueStore True (mkQueue st True) testStoreLogFile $ queueStore st storeState st `shouldReturn` state closeStoreLog l ([], compacted') <- partitionEithers . map strDecode . B.lines <$> B.readFile testStoreLogFile diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 602055b82..6ee05c27b 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -259,10 +259,6 @@ proxyCfgMS msType = proxyCfgJ2 :: ServerConfig proxyCfgJ2 = journalCfg proxyCfg testStoreLogFile2 testStoreMsgsDir2 --- TODO [postgres] --- proxyCfgJ2 :: ServerConfig --- proxyCfgJ2 = journalCfg proxyCfg testStoreDBOpts2 testStoreMsgsDir2 - proxyVRangeV8 :: VersionRangeSMP proxyVRangeV8 = mkVersionRange minServerSMPRelayVersion sendingProxySMPVersion diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index d7e692ea4..8fd798a6b 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -831,7 +831,7 @@ testRestoreExpireMessages = where export = do ms <- newMsgStore (testJournalStoreCfg MQStoreCfg) {quota = 4} - readWriteQueueStore True (mkQueue ms) testStoreLogFile (queueStore ms) >>= closeStoreLog + readWriteQueueStore True (mkQueue ms True) testStoreLogFile (queueStore ms) >>= closeStoreLog removeFileIfExists testStoreMsgsFile exportMessages False ms testStoreMsgsFile False closeMsgStore ms