diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 3a639238b..e81c153de 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -307,7 +307,7 @@ instance QueueStoreClass (JournalQueue s) (QStore s) where newQueueStore = \case MQStoreCfg -> MQStore <$> newQueueStore @(JournalQueue s) () #if defined(dbServerPostgres) - PQStoreCfg cfg -> PQStore <$> newQueueStore @(JournalQueue s) cfg + PQStoreCfg cfg -> PQStore <$> newQueueStore @(JournalQueue s) (cfg, True) #endif closeQueueStore = withQS (closeQueueStore @(JournalQueue s)) diff --git a/src/Simplex/Messaging/Server/MsgStore/Postgres.hs b/src/Simplex/Messaging/Server/MsgStore/Postgres.hs index 5716dc2c6..a0eb1d1ca 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Postgres.hs @@ -97,7 +97,7 @@ instance MsgStoreClass PostgresMsgStore where newMsgStore :: PostgresMsgStoreCfg -> IO PostgresMsgStore newMsgStore config = do - queueStore_ <- newQueueStore @PostgresQueue $ queueStoreCfg config + queueStore_ <- newQueueStore @PostgresQueue (queueStoreCfg config, False) pure PostgresMsgStore {config, queueStore_} closeMsgStore :: PostgresMsgStore -> IO () diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs index 0b7e73b1a..4a53dcdd4 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs @@ -109,14 +109,17 @@ data PostgresQueueStore q = PostgresQueueStore notifiers :: TMap NotifierId RecipientId, notifierLocks :: TMap NotifierId Lock, serviceLocks :: TMap CertFingerprint Lock, - deletedTTL :: Int64 + deletedTTL :: Int64, + useCache :: Bool } -instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where - type QueueStoreCfg (PostgresQueueStore q) = PostgresStoreCfg +type UseQueueCache = Bool - newQueueStore :: PostgresStoreCfg -> IO (PostgresQueueStore q) - newQueueStore PostgresStoreCfg {dbOpts, dbStoreLogPath, confirmMigrations, deletedTTL} = do +instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where + type QueueStoreCfg (PostgresQueueStore q) = (PostgresStoreCfg, UseQueueCache) + + newQueueStore :: (PostgresStoreCfg, UseQueueCache) -> IO (PostgresQueueStore q) + newQueueStore (PostgresStoreCfg {dbOpts, dbStoreLogPath, confirmMigrations, deletedTTL}, useCache) = do dbStore <- either err pure =<< createDBStore dbOpts serverMigrations (MigrationConfig confirmMigrations Nothing) dbStoreLog <- mapM (openWriteStoreLog True) dbStoreLogPath queues <- TM.emptyIO @@ -125,7 +128,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where notifiers <- TM.emptyIO notifierLocks <- TM.emptyIO serviceLocks <- TM.emptyIO - pure PostgresQueueStore {dbStore, dbStoreLog, queues, senders, links, notifiers, notifierLocks, serviceLocks, deletedTTL} + pure PostgresQueueStore {dbStore, dbStoreLog, queues, senders, links, notifiers, notifierLocks, serviceLocks, deletedTTL, useCache} where err e = do logError $ "STORE: newQueueStore, error opening PostgreSQL database, " <> tshow e @@ -172,28 +175,35 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where void $ withDB "addQueue_" st $ \db -> E.try (DB.execute db insertQueueQuery $ queueRecToRow (rId, qr)) >>= bimapM handleDuplicate pure - atomically $ TM.insert rId sq queues - atomically $ TM.insert (senderId qr) rId senders - forM_ (notifier qr) $ \NtfCreds {notifierId = nId} -> atomically $ TM.insert nId rId notifiers - forM_ (queueData qr) $ \(lnkId, _) -> atomically $ TM.insert lnkId rId links + when useCache $ do + atomically $ TM.insert rId sq queues + atomically $ TM.insert (senderId qr) rId senders + forM_ (notifier qr) $ \NtfCreds {notifierId = nId} -> atomically $ TM.insert nId rId notifiers + forM_ (queueData qr) $ \(lnkId, _) -> atomically $ TM.insert lnkId rId links withLog "addStoreQueue" st $ \s -> logCreateQueue s rId qr pure sq where - PostgresQueueStore {queues, senders, links, notifiers} = st + PostgresQueueStore {queues, senders, links, notifiers, useCache} = st -- Not doing duplicate checks in maps as the probability of duplicates is very low. -- It needs to be reconsidered when IDs are supplied by the users. -- hasId = anyM [TM.memberIO rId queues, TM.memberIO senderId senders, hasNotifier] -- hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.memberIO notifierId notifiers) notifier getQueue_ :: QueueParty p => PostgresQueueStore q -> (Bool -> RecipientId -> QueueRec -> IO q) -> SParty p -> QueueId -> IO (Either ErrorType q) - getQueue_ st mkQ party qId = case party of - SRecipient -> getRcvQueue qId - SSender -> TM.lookupIO qId senders >>= maybe (mask loadSndQueue) getRcvQueue - SSenderLink -> TM.lookupIO qId links >>= maybe (mask loadLinkQueue) getRcvQueue - -- loaded queue is deleted from notifiers map to reduce cache size after queue was subscribed to by ntf server - SNotifier -> TM.lookupIO qId notifiers >>= maybe (mask loadNtfQueue) (getRcvQueue >=> (atomically (TM.delete qId notifiers) $>)) + getQueue_ st mkQ party qId + | useCache = case party of + SRecipient -> getRcvQueue qId + SSender -> TM.lookupIO qId senders >>= maybe (mask loadSndQueue) getRcvQueue + SSenderLink -> TM.lookupIO qId links >>= maybe (mask loadLinkQueue) getRcvQueue + -- loaded queue is deleted from notifiers map to reduce cache size after queue was subscribed to by ntf server + SNotifier -> TM.lookupIO qId notifiers >>= maybe (mask loadNtfQueue) (getRcvQueue >=> (atomically (TM.delete qId notifiers) $>)) + | otherwise = case party of + SRecipient -> loadQueueNoCache " WHERE recipient_id = ?" + SSender -> loadQueueNoCache " WHERE sender_id = ?" + SSenderLink -> loadQueueNoCache " WHERE link_id = ?" + SNotifier -> loadQueueNoCache " WHERE notifier_id = ?" where - PostgresQueueStore {queues, senders, links, notifiers} = st + PostgresQueueStore {queues, senders, links, notifiers, useCache} = st getRcvQueue rId = TM.lookupIO rId queues >>= maybe (mask loadRcvQueue) (pure . Right) loadRcvQueue = do (rId, qRec) <- loadQueue " WHERE recipient_id = ?" @@ -210,6 +220,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where liftIO $ TM.lookupIO rId queues -- checking recipient map first >>= maybe (cacheQueue rId qRec cacheSender) (atomically (cacheSender rId) $>) + loadQueueNoCache cond = mask $ loadQueue cond >>= liftIO . uncurry (mkQ True) mask = E.uninterruptibleMask_ . runExceptT cacheSender rId = TM.insert qId rId senders loadQueue condition = @@ -232,20 +243,27 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where pure sq getQueues_ :: forall p. BatchParty p => PostgresQueueStore q -> (Bool -> RecipientId -> QueueRec -> IO q) -> SParty p -> [QueueId] -> IO [Either ErrorType q] - getQueues_ st mkQ party qIds = case party of - SRecipient -> do - qs <- readTVarIO queues - let qs' = map (\qId -> get qs qId qId) qIds - E.uninterruptibleMask_ $ loadQueues qs' " WHERE recipient_id IN ?" cacheRcvQueue - SNotifier -> do - ns <- readTVarIO notifiers - qs <- readTVarIO queues - let qs' = map (\qId -> get ns qId qId >>= get qs qId) qIds - E.uninterruptibleMask_ $ loadQueues qs' " WHERE notifier_id IN ?" $ \(rId, qRec) -> - forM (notifier qRec) $ \NtfCreds {notifierId = nId} -> -- it is always Just with this query - (nId,) <$> maybe (mkQ False rId qRec) pure (M.lookup rId qs) + getQueues_ st mkQ party qIds + | null qIds = pure [] + | useCache = case party of + SRecipient -> do + qs <- readTVarIO queues + let qs' = map (\qId -> get qs qId qId) qIds + E.uninterruptibleMask_ $ loadQueues qs' " WHERE recipient_id IN ?" cacheRcvQueue + SNotifier -> do + ns <- readTVarIO notifiers + qs <- readTVarIO queues + let qs' = map (\qId -> get ns qId qId >>= get qs qId) qIds + E.uninterruptibleMask_ $ loadQueues qs' " WHERE notifier_id IN ?" $ \(rId, qRec) -> + forM (notifier qRec) $ \NtfCreds {notifierId = nId} -> -- it is always Just with this query + (nId,) <$> maybe (mkQ False rId qRec) pure (M.lookup rId qs) + | otherwise = E.uninterruptibleMask_ $ case party of + SRecipient -> loadQueuesNoCache " WHERE recipient_id IN ?" $ \(rId, qRec) -> + Just . (rId,) <$> mkQ False rId qRec + SNotifier -> loadQueuesNoCache " WHERE notifier_id IN ?" $ \(rId, qRec) -> + forM (notifier qRec) $ \NtfCreds {notifierId = nId} -> (nId,) <$> mkQ False rId qRec where - PostgresQueueStore {queues, notifiers} = st + PostgresQueueStore {queues, notifiers, useCache} = st get :: M.Map QueueId a -> QueueId -> QueueId -> Either QueueId a get m qId = maybe (Left qId) Right . (`M.lookup` m) loadQueues :: [Either QueueId q] -> Query -> ((RecipientId, QueueRec) -> IO (Maybe (QueueId, q))) -> IO [Either ErrorType q] @@ -254,15 +272,16 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where if null qIds' then pure $ map (first (const INTERNAL)) qs' else do - qs_ <- - runExceptT $ fmap M.fromList $ - withDB' "getQueues_" st (\db -> DB.query db (queueRecQuery <> cond <> " AND deleted_at IS NULL") (Only (In qIds'))) - >>= liftIO . fmap catMaybes . mapM (mkCacheQueue . rowToQueueRec) + qs_ <- dbLoadQueues qIds' cond mkCacheQueue pure $ map (result qs_) qs' where result :: Either ErrorType (M.Map QueueId q) -> Either QueueId q -> Either ErrorType q result _ (Right q) = Right q result qs_ (Left qId) = maybe (Left AUTH) Right . M.lookup qId =<< qs_ + dbLoadQueues qIds' cond mkQueue' = + runExceptT $ fmap M.fromList $ + withDB' "getQueues_" st (\db -> DB.query db (queueRecQuery <> cond <> " AND deleted_at IS NULL") (Only (In qIds'))) + >>= liftIO . fmap catMaybes . mapM (mkQueue' . rowToQueueRec) cacheRcvQueue (rId, qRec) = do sq <- mkQ True rId qRec sq' <- withQueueLock sq "getQueue_" $ atomically $ @@ -271,6 +290,12 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where Just sq' -> pure sq' Nothing -> sq <$ TM.insert rId sq queues pure $ Just (rId, sq') + loadQueuesNoCache cond mkQueue' = do + qs_ <- dbLoadQueues qIds cond mkQueue' + pure $ map (result qs_) qIds + where + result :: Either ErrorType (M.Map QueueId q) -> QueueId -> Either ErrorType q + result qs_ qId = maybe (Left AUTH) Right . M.lookup qId =<< qs_ getQueueLinkData :: PostgresQueueStore q -> q -> LinkId -> IO (Either ErrorType QueueLinkData) getQueueLinkData st sq lnkId = runExceptT $ do @@ -336,19 +361,23 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where addQueueNotifier :: PostgresQueueStore q -> q -> NtfCreds -> IO (Either ErrorType (Maybe NtfCreds)) addQueueNotifier st sq ntfCreds@NtfCreds {notifierId = nId, notifierKey, rcvNtfDhSecret} = withQueueRec sq "addQueueNotifier" $ \q -> - ExceptT $ withLockMap (notifierLocks st) nId "addQueueNotifier" $ - ifM (TM.memberIO nId notifiers) (pure $ Left DUPLICATE_) $ runExceptT $ do - assertUpdated $ withDB "addQueueNotifier" st $ \db -> - E.try (update db) >>= bimapM handleDuplicate pure - nc_ <- forM (notifier q) $ \nc@NtfCreds {notifierId} -> atomically (TM.delete notifierId notifiers) $> nc - let !q' = q {notifier = Just ntfCreds} - atomically $ writeTVar (queueRec sq) $ Just q' - -- cache queue notifier ID – after notifier is added ntf server will likely subscribe + checkCachedNotifier $ do + assertUpdated $ withDB "addQueueNotifier" st $ \db -> + E.try (update db) >>= bimapM handleDuplicate pure + nc_ <- forM (notifier q) $ \nc@NtfCreds {notifierId} -> atomically (TM.delete notifierId notifiers) $> nc + let !q' = q {notifier = Just ntfCreds} + atomically $ writeTVar (queueRec sq) $ Just q' + when useCache $ do atomically $ TM.insert nId rId notifiers - withLog "addQueueNotifier" st $ \s -> logAddNotifier s rId ntfCreds - pure nc_ + withLog "addQueueNotifier" st $ \s -> logAddNotifier s rId ntfCreds + pure nc_ where - PostgresQueueStore {notifiers} = st + checkCachedNotifier add + | useCache = + ExceptT $ withLockMap (notifierLocks st) nId "addQueueNotifier" $ + ifM (TM.memberIO nId notifiers) (pure $ Left DUPLICATE_) $ runExceptT add + | otherwise = add + PostgresQueueStore {notifiers, useCache} = st rId = recipientId sq update db = DB.execute @@ -364,13 +393,16 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where deleteQueueNotifier st sq = withQueueRec sq "deleteQueueNotifier" $ \q -> ExceptT $ fmap sequence $ forM (notifier q) $ \nc@NtfCreds {notifierId = nId} -> - withLockMap (notifierLocks st) nId "deleteQueueNotifier" $ runExceptT $ do + withNotifierLock nId $ runExceptT $ do assertUpdated $ withDB' "deleteQueueNotifier" st update - atomically $ TM.delete nId $ notifiers st + when (useCache st) $ atomically $ TM.delete nId $ notifiers st atomically $ writeTVar (queueRec sq) $ Just q {notifier = Nothing} withLog "deleteQueueNotifier" st (`logDeleteNotifier` rId) pure nc where + withNotifierLock nId + | useCache st = withLockMap (notifierLocks st) nId "deleteQueueNotifier" + | otherwise = id rId = recipientId sq update db = DB.execute @@ -420,10 +452,11 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where assertUpdated $ withDB' "deleteStoreQueue" st $ \db -> DB.execute db "UPDATE msg_queues SET deleted_at = ? WHERE recipient_id = ? AND deleted_at IS NULL" (ts, rId) atomically $ writeTVar qr Nothing - atomically $ TM.delete (senderId q) $ senders st - forM_ (notifier q) $ \NtfCreds {notifierId} -> do - atomically $ TM.delete notifierId $ notifiers st - atomically $ TM.delete notifierId $ notifierLocks st + when (useCache st) $ do + atomically $ TM.delete (senderId q) $ senders st + forM_ (notifier q) $ \NtfCreds {notifierId} -> do + atomically $ TM.delete notifierId $ notifiers st + atomically $ TM.delete notifierId $ notifierLocks st withLog "deleteStoreQueue" st (`logDeleteQueue` rId) pure q where diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 11ff98b33..a6ee6d7f2 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -1539,11 +1539,11 @@ testOldContactQueueShortLink ps@(_, msType) = withAgentClients2 $ \a b -> do ASSCfg _ _ SSCMemoryJournal {storeLogFile} -> updateStoreLog storeLogFile #if defined(dbServerPostgres) ASSCfg _ _ SSCDatabaseJournal {storeCfg} -> do - st :: PostgresQueueStore (JournalQueue 'QSPostgres) <- newQueueStore @(JournalQueue 'QSPostgres) storeCfg + st :: PostgresQueueStore (JournalQueue 'QSPostgres) <- newQueueStore @(JournalQueue 'QSPostgres) (storeCfg, True) updateDbStore st closeQueueStore @(JournalQueue 'QSPostgres) st ASSCfg _ _ (SSCDatabase storeCfg) -> do - st :: PostgresQueueStore PostgresQueue <- newQueueStore @PostgresQueue storeCfg + st :: PostgresQueueStore PostgresQueue <- newQueueStore @PostgresQueue (storeCfg, False) updateDbStore st closeQueueStore @PostgresQueue st #else