mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 16:26:02 +00:00
smp server: do not use queue cache with PostgreSQL message storage (#1637)
* smp server: do not use queue cache with PostgreSQL message storage * fix loading queues via notifier IDs
This commit is contained in:
@@ -307,7 +307,7 @@ instance QueueStoreClass (JournalQueue s) (QStore s) where
|
||||
newQueueStore = \case
|
||||
MQStoreCfg -> MQStore <$> newQueueStore @(JournalQueue s) ()
|
||||
#if defined(dbServerPostgres)
|
||||
PQStoreCfg cfg -> PQStore <$> newQueueStore @(JournalQueue s) cfg
|
||||
PQStoreCfg cfg -> PQStore <$> newQueueStore @(JournalQueue s) (cfg, True)
|
||||
#endif
|
||||
|
||||
closeQueueStore = withQS (closeQueueStore @(JournalQueue s))
|
||||
|
||||
@@ -97,7 +97,7 @@ instance MsgStoreClass PostgresMsgStore where
|
||||
|
||||
newMsgStore :: PostgresMsgStoreCfg -> IO PostgresMsgStore
|
||||
newMsgStore config = do
|
||||
queueStore_ <- newQueueStore @PostgresQueue $ queueStoreCfg config
|
||||
queueStore_ <- newQueueStore @PostgresQueue (queueStoreCfg config, False)
|
||||
pure PostgresMsgStore {config, queueStore_}
|
||||
|
||||
closeMsgStore :: PostgresMsgStore -> IO ()
|
||||
|
||||
@@ -109,14 +109,17 @@ data PostgresQueueStore q = PostgresQueueStore
|
||||
notifiers :: TMap NotifierId RecipientId,
|
||||
notifierLocks :: TMap NotifierId Lock,
|
||||
serviceLocks :: TMap CertFingerprint Lock,
|
||||
deletedTTL :: Int64
|
||||
deletedTTL :: Int64,
|
||||
useCache :: Bool
|
||||
}
|
||||
|
||||
instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
|
||||
type QueueStoreCfg (PostgresQueueStore q) = PostgresStoreCfg
|
||||
type UseQueueCache = Bool
|
||||
|
||||
newQueueStore :: PostgresStoreCfg -> IO (PostgresQueueStore q)
|
||||
newQueueStore PostgresStoreCfg {dbOpts, dbStoreLogPath, confirmMigrations, deletedTTL} = do
|
||||
instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
|
||||
type QueueStoreCfg (PostgresQueueStore q) = (PostgresStoreCfg, UseQueueCache)
|
||||
|
||||
newQueueStore :: (PostgresStoreCfg, UseQueueCache) -> IO (PostgresQueueStore q)
|
||||
newQueueStore (PostgresStoreCfg {dbOpts, dbStoreLogPath, confirmMigrations, deletedTTL}, useCache) = do
|
||||
dbStore <- either err pure =<< createDBStore dbOpts serverMigrations (MigrationConfig confirmMigrations Nothing)
|
||||
dbStoreLog <- mapM (openWriteStoreLog True) dbStoreLogPath
|
||||
queues <- TM.emptyIO
|
||||
@@ -125,7 +128,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
|
||||
notifiers <- TM.emptyIO
|
||||
notifierLocks <- TM.emptyIO
|
||||
serviceLocks <- TM.emptyIO
|
||||
pure PostgresQueueStore {dbStore, dbStoreLog, queues, senders, links, notifiers, notifierLocks, serviceLocks, deletedTTL}
|
||||
pure PostgresQueueStore {dbStore, dbStoreLog, queues, senders, links, notifiers, notifierLocks, serviceLocks, deletedTTL, useCache}
|
||||
where
|
||||
err e = do
|
||||
logError $ "STORE: newQueueStore, error opening PostgreSQL database, " <> tshow e
|
||||
@@ -172,28 +175,35 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
|
||||
void $ withDB "addQueue_" st $ \db ->
|
||||
E.try (DB.execute db insertQueueQuery $ queueRecToRow (rId, qr))
|
||||
>>= bimapM handleDuplicate pure
|
||||
atomically $ TM.insert rId sq queues
|
||||
atomically $ TM.insert (senderId qr) rId senders
|
||||
forM_ (notifier qr) $ \NtfCreds {notifierId = nId} -> atomically $ TM.insert nId rId notifiers
|
||||
forM_ (queueData qr) $ \(lnkId, _) -> atomically $ TM.insert lnkId rId links
|
||||
when useCache $ do
|
||||
atomically $ TM.insert rId sq queues
|
||||
atomically $ TM.insert (senderId qr) rId senders
|
||||
forM_ (notifier qr) $ \NtfCreds {notifierId = nId} -> atomically $ TM.insert nId rId notifiers
|
||||
forM_ (queueData qr) $ \(lnkId, _) -> atomically $ TM.insert lnkId rId links
|
||||
withLog "addStoreQueue" st $ \s -> logCreateQueue s rId qr
|
||||
pure sq
|
||||
where
|
||||
PostgresQueueStore {queues, senders, links, notifiers} = st
|
||||
PostgresQueueStore {queues, senders, links, notifiers, useCache} = st
|
||||
-- Not doing duplicate checks in maps as the probability of duplicates is very low.
|
||||
-- It needs to be reconsidered when IDs are supplied by the users.
|
||||
-- hasId = anyM [TM.memberIO rId queues, TM.memberIO senderId senders, hasNotifier]
|
||||
-- hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.memberIO notifierId notifiers) notifier
|
||||
|
||||
getQueue_ :: QueueParty p => PostgresQueueStore q -> (Bool -> RecipientId -> QueueRec -> IO q) -> SParty p -> QueueId -> IO (Either ErrorType q)
|
||||
getQueue_ st mkQ party qId = case party of
|
||||
SRecipient -> getRcvQueue qId
|
||||
SSender -> TM.lookupIO qId senders >>= maybe (mask loadSndQueue) getRcvQueue
|
||||
SSenderLink -> TM.lookupIO qId links >>= maybe (mask loadLinkQueue) getRcvQueue
|
||||
-- loaded queue is deleted from notifiers map to reduce cache size after queue was subscribed to by ntf server
|
||||
SNotifier -> TM.lookupIO qId notifiers >>= maybe (mask loadNtfQueue) (getRcvQueue >=> (atomically (TM.delete qId notifiers) $>))
|
||||
getQueue_ st mkQ party qId
|
||||
| useCache = case party of
|
||||
SRecipient -> getRcvQueue qId
|
||||
SSender -> TM.lookupIO qId senders >>= maybe (mask loadSndQueue) getRcvQueue
|
||||
SSenderLink -> TM.lookupIO qId links >>= maybe (mask loadLinkQueue) getRcvQueue
|
||||
-- loaded queue is deleted from notifiers map to reduce cache size after queue was subscribed to by ntf server
|
||||
SNotifier -> TM.lookupIO qId notifiers >>= maybe (mask loadNtfQueue) (getRcvQueue >=> (atomically (TM.delete qId notifiers) $>))
|
||||
| otherwise = case party of
|
||||
SRecipient -> loadQueueNoCache " WHERE recipient_id = ?"
|
||||
SSender -> loadQueueNoCache " WHERE sender_id = ?"
|
||||
SSenderLink -> loadQueueNoCache " WHERE link_id = ?"
|
||||
SNotifier -> loadQueueNoCache " WHERE notifier_id = ?"
|
||||
where
|
||||
PostgresQueueStore {queues, senders, links, notifiers} = st
|
||||
PostgresQueueStore {queues, senders, links, notifiers, useCache} = st
|
||||
getRcvQueue rId = TM.lookupIO rId queues >>= maybe (mask loadRcvQueue) (pure . Right)
|
||||
loadRcvQueue = do
|
||||
(rId, qRec) <- loadQueue " WHERE recipient_id = ?"
|
||||
@@ -210,6 +220,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
|
||||
liftIO $
|
||||
TM.lookupIO rId queues -- checking recipient map first
|
||||
>>= maybe (cacheQueue rId qRec cacheSender) (atomically (cacheSender rId) $>)
|
||||
loadQueueNoCache cond = mask $ loadQueue cond >>= liftIO . uncurry (mkQ True)
|
||||
mask = E.uninterruptibleMask_ . runExceptT
|
||||
cacheSender rId = TM.insert qId rId senders
|
||||
loadQueue condition =
|
||||
@@ -232,20 +243,27 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
|
||||
pure sq
|
||||
|
||||
getQueues_ :: forall p. BatchParty p => PostgresQueueStore q -> (Bool -> RecipientId -> QueueRec -> IO q) -> SParty p -> [QueueId] -> IO [Either ErrorType q]
|
||||
getQueues_ st mkQ party qIds = case party of
|
||||
SRecipient -> do
|
||||
qs <- readTVarIO queues
|
||||
let qs' = map (\qId -> get qs qId qId) qIds
|
||||
E.uninterruptibleMask_ $ loadQueues qs' " WHERE recipient_id IN ?" cacheRcvQueue
|
||||
SNotifier -> do
|
||||
ns <- readTVarIO notifiers
|
||||
qs <- readTVarIO queues
|
||||
let qs' = map (\qId -> get ns qId qId >>= get qs qId) qIds
|
||||
E.uninterruptibleMask_ $ loadQueues qs' " WHERE notifier_id IN ?" $ \(rId, qRec) ->
|
||||
forM (notifier qRec) $ \NtfCreds {notifierId = nId} -> -- it is always Just with this query
|
||||
(nId,) <$> maybe (mkQ False rId qRec) pure (M.lookup rId qs)
|
||||
getQueues_ st mkQ party qIds
|
||||
| null qIds = pure []
|
||||
| useCache = case party of
|
||||
SRecipient -> do
|
||||
qs <- readTVarIO queues
|
||||
let qs' = map (\qId -> get qs qId qId) qIds
|
||||
E.uninterruptibleMask_ $ loadQueues qs' " WHERE recipient_id IN ?" cacheRcvQueue
|
||||
SNotifier -> do
|
||||
ns <- readTVarIO notifiers
|
||||
qs <- readTVarIO queues
|
||||
let qs' = map (\qId -> get ns qId qId >>= get qs qId) qIds
|
||||
E.uninterruptibleMask_ $ loadQueues qs' " WHERE notifier_id IN ?" $ \(rId, qRec) ->
|
||||
forM (notifier qRec) $ \NtfCreds {notifierId = nId} -> -- it is always Just with this query
|
||||
(nId,) <$> maybe (mkQ False rId qRec) pure (M.lookup rId qs)
|
||||
| otherwise = E.uninterruptibleMask_ $ case party of
|
||||
SRecipient -> loadQueuesNoCache " WHERE recipient_id IN ?" $ \(rId, qRec) ->
|
||||
Just . (rId,) <$> mkQ False rId qRec
|
||||
SNotifier -> loadQueuesNoCache " WHERE notifier_id IN ?" $ \(rId, qRec) ->
|
||||
forM (notifier qRec) $ \NtfCreds {notifierId = nId} -> (nId,) <$> mkQ False rId qRec
|
||||
where
|
||||
PostgresQueueStore {queues, notifiers} = st
|
||||
PostgresQueueStore {queues, notifiers, useCache} = st
|
||||
get :: M.Map QueueId a -> QueueId -> QueueId -> Either QueueId a
|
||||
get m qId = maybe (Left qId) Right . (`M.lookup` m)
|
||||
loadQueues :: [Either QueueId q] -> Query -> ((RecipientId, QueueRec) -> IO (Maybe (QueueId, q))) -> IO [Either ErrorType q]
|
||||
@@ -254,15 +272,16 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
|
||||
if null qIds'
|
||||
then pure $ map (first (const INTERNAL)) qs'
|
||||
else do
|
||||
qs_ <-
|
||||
runExceptT $ fmap M.fromList $
|
||||
withDB' "getQueues_" st (\db -> DB.query db (queueRecQuery <> cond <> " AND deleted_at IS NULL") (Only (In qIds')))
|
||||
>>= liftIO . fmap catMaybes . mapM (mkCacheQueue . rowToQueueRec)
|
||||
qs_ <- dbLoadQueues qIds' cond mkCacheQueue
|
||||
pure $ map (result qs_) qs'
|
||||
where
|
||||
result :: Either ErrorType (M.Map QueueId q) -> Either QueueId q -> Either ErrorType q
|
||||
result _ (Right q) = Right q
|
||||
result qs_ (Left qId) = maybe (Left AUTH) Right . M.lookup qId =<< qs_
|
||||
dbLoadQueues qIds' cond mkQueue' =
|
||||
runExceptT $ fmap M.fromList $
|
||||
withDB' "getQueues_" st (\db -> DB.query db (queueRecQuery <> cond <> " AND deleted_at IS NULL") (Only (In qIds')))
|
||||
>>= liftIO . fmap catMaybes . mapM (mkQueue' . rowToQueueRec)
|
||||
cacheRcvQueue (rId, qRec) = do
|
||||
sq <- mkQ True rId qRec
|
||||
sq' <- withQueueLock sq "getQueue_" $ atomically $
|
||||
@@ -271,6 +290,12 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
|
||||
Just sq' -> pure sq'
|
||||
Nothing -> sq <$ TM.insert rId sq queues
|
||||
pure $ Just (rId, sq')
|
||||
loadQueuesNoCache cond mkQueue' = do
|
||||
qs_ <- dbLoadQueues qIds cond mkQueue'
|
||||
pure $ map (result qs_) qIds
|
||||
where
|
||||
result :: Either ErrorType (M.Map QueueId q) -> QueueId -> Either ErrorType q
|
||||
result qs_ qId = maybe (Left AUTH) Right . M.lookup qId =<< qs_
|
||||
|
||||
getQueueLinkData :: PostgresQueueStore q -> q -> LinkId -> IO (Either ErrorType QueueLinkData)
|
||||
getQueueLinkData st sq lnkId = runExceptT $ do
|
||||
@@ -336,19 +361,23 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
|
||||
addQueueNotifier :: PostgresQueueStore q -> q -> NtfCreds -> IO (Either ErrorType (Maybe NtfCreds))
|
||||
addQueueNotifier st sq ntfCreds@NtfCreds {notifierId = nId, notifierKey, rcvNtfDhSecret} =
|
||||
withQueueRec sq "addQueueNotifier" $ \q ->
|
||||
ExceptT $ withLockMap (notifierLocks st) nId "addQueueNotifier" $
|
||||
ifM (TM.memberIO nId notifiers) (pure $ Left DUPLICATE_) $ runExceptT $ do
|
||||
assertUpdated $ withDB "addQueueNotifier" st $ \db ->
|
||||
E.try (update db) >>= bimapM handleDuplicate pure
|
||||
nc_ <- forM (notifier q) $ \nc@NtfCreds {notifierId} -> atomically (TM.delete notifierId notifiers) $> nc
|
||||
let !q' = q {notifier = Just ntfCreds}
|
||||
atomically $ writeTVar (queueRec sq) $ Just q'
|
||||
-- cache queue notifier ID – after notifier is added ntf server will likely subscribe
|
||||
checkCachedNotifier $ do
|
||||
assertUpdated $ withDB "addQueueNotifier" st $ \db ->
|
||||
E.try (update db) >>= bimapM handleDuplicate pure
|
||||
nc_ <- forM (notifier q) $ \nc@NtfCreds {notifierId} -> atomically (TM.delete notifierId notifiers) $> nc
|
||||
let !q' = q {notifier = Just ntfCreds}
|
||||
atomically $ writeTVar (queueRec sq) $ Just q'
|
||||
when useCache $ do
|
||||
atomically $ TM.insert nId rId notifiers
|
||||
withLog "addQueueNotifier" st $ \s -> logAddNotifier s rId ntfCreds
|
||||
pure nc_
|
||||
withLog "addQueueNotifier" st $ \s -> logAddNotifier s rId ntfCreds
|
||||
pure nc_
|
||||
where
|
||||
PostgresQueueStore {notifiers} = st
|
||||
checkCachedNotifier add
|
||||
| useCache =
|
||||
ExceptT $ withLockMap (notifierLocks st) nId "addQueueNotifier" $
|
||||
ifM (TM.memberIO nId notifiers) (pure $ Left DUPLICATE_) $ runExceptT add
|
||||
| otherwise = add
|
||||
PostgresQueueStore {notifiers, useCache} = st
|
||||
rId = recipientId sq
|
||||
update db =
|
||||
DB.execute
|
||||
@@ -364,13 +393,16 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
|
||||
deleteQueueNotifier st sq =
|
||||
withQueueRec sq "deleteQueueNotifier" $ \q ->
|
||||
ExceptT $ fmap sequence $ forM (notifier q) $ \nc@NtfCreds {notifierId = nId} ->
|
||||
withLockMap (notifierLocks st) nId "deleteQueueNotifier" $ runExceptT $ do
|
||||
withNotifierLock nId $ runExceptT $ do
|
||||
assertUpdated $ withDB' "deleteQueueNotifier" st update
|
||||
atomically $ TM.delete nId $ notifiers st
|
||||
when (useCache st) $ atomically $ TM.delete nId $ notifiers st
|
||||
atomically $ writeTVar (queueRec sq) $ Just q {notifier = Nothing}
|
||||
withLog "deleteQueueNotifier" st (`logDeleteNotifier` rId)
|
||||
pure nc
|
||||
where
|
||||
withNotifierLock nId
|
||||
| useCache st = withLockMap (notifierLocks st) nId "deleteQueueNotifier"
|
||||
| otherwise = id
|
||||
rId = recipientId sq
|
||||
update db =
|
||||
DB.execute
|
||||
@@ -420,10 +452,11 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
|
||||
assertUpdated $ withDB' "deleteStoreQueue" st $ \db ->
|
||||
DB.execute db "UPDATE msg_queues SET deleted_at = ? WHERE recipient_id = ? AND deleted_at IS NULL" (ts, rId)
|
||||
atomically $ writeTVar qr Nothing
|
||||
atomically $ TM.delete (senderId q) $ senders st
|
||||
forM_ (notifier q) $ \NtfCreds {notifierId} -> do
|
||||
atomically $ TM.delete notifierId $ notifiers st
|
||||
atomically $ TM.delete notifierId $ notifierLocks st
|
||||
when (useCache st) $ do
|
||||
atomically $ TM.delete (senderId q) $ senders st
|
||||
forM_ (notifier q) $ \NtfCreds {notifierId} -> do
|
||||
atomically $ TM.delete notifierId $ notifiers st
|
||||
atomically $ TM.delete notifierId $ notifierLocks st
|
||||
withLog "deleteStoreQueue" st (`logDeleteQueue` rId)
|
||||
pure q
|
||||
where
|
||||
|
||||
Reference in New Issue
Block a user