Merge branch 'master' into short-links

This commit is contained in:
Evgeny Poberezkin
2025-03-28 19:50:18 +00:00
16 changed files with 215 additions and 96 deletions
@@ -155,39 +155,52 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
-- hasId = anyM [TM.memberIO rId queues, TM.memberIO senderId senders, hasNotifier]
-- hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.memberIO notifierId notifiers) notifier
getQueue_ :: DirectParty p => PostgresQueueStore q -> (RecipientId -> QueueRec -> IO q) -> SParty p -> QueueId -> IO (Either ErrorType q)
getQueue_ :: DirectParty p => PostgresQueueStore q -> (Bool -> RecipientId -> QueueRec -> IO q) -> SParty p -> QueueId -> IO (Either ErrorType q)
getQueue_ st mkQ party qId = case party of
SRecipient -> getRcvQueue qId
SSender -> TM.lookupIO qId senders >>= maybe loadSndQueue getRcvQueue
SNotifier -> TM.lookupIO qId notifiers >>= maybe loadNtfQueue getRcvQueue
SLinkClient -> loadLinkQueue
SSender -> TM.lookupIO qId senders >>= maybe (mask loadSndQueue) getRcvQueue
-- TODO [short links] use map of link IDs - queue will be added there on creation in case there is data
SLinkClient -> mask loadLinkQueue
-- loaded queue is deleted from notifiers map to reduce cache size after queue was subscribed to by ntf server
SNotifier -> TM.lookupIO qId notifiers >>= maybe (mask loadNtfQueue) (getRcvQueue >=> (atomically (TM.delete qId notifiers) $>))
where
PostgresQueueStore {queues, senders, notifiers} = st
getRcvQueue rId = TM.lookupIO rId queues >>= maybe loadRcvQueue (pure . Right)
loadRcvQueue = loadQueue " WHERE recipient_id = ?" $ \_ -> pure ()
loadSndQueue = loadQueue " WHERE sender_id = ?" $ \rId -> TM.insert qId rId senders
loadNtfQueue = loadQueue " WHERE notifier_id = ?" $ \_ -> pure () -- do NOT cache ref - ntf subscriptions are rare
loadLinkQueue = loadQueue " WHERE link_id = ?" $ \_ -> pure ()
loadQueue condition insertRef =
E.uninterruptibleMask_ $ runExceptT $ do
(rId, qRec) <-
withDB "getQueue_" st $ \db -> firstRow rowToQueueRec AUTH $
DB.query db (queueRecQuery <> condition <> " AND deleted_at IS NULL") (Only qId)
liftIO $ do
sq <- mkQ rId qRec -- loaded queue
-- This lock prevents the scenario when the queue is added to cache,
-- while another thread is proccessing the same queue in withAllMsgQueues
-- without adding it to cache, possibly trying to open the same files twice.
-- Alse see comment in idleDeleteExpiredMsgs.
withQueueLock sq "getQueue_" $ atomically $
-- checking the cache again for concurrent reads,
-- use previously loaded queue if exists.
TM.lookup rId queues >>= \case
Just sq' -> pure sq'
Nothing -> do
insertRef rId
TM.insert rId sq queues
pure sq
getRcvQueue rId = TM.lookupIO rId queues >>= maybe (mask loadRcvQueue) (pure . Right)
loadRcvQueue = do
(rId, qRec) <- loadQueue " WHERE recipient_id = ?"
liftIO $ cacheQueue rId qRec $ \_ -> pure () -- recipient map already checked, not caching sender ref
loadSndQueue = loadSndQueue_ " WHERE sender_id = ?"
loadLinkQueue = loadSndQueue_ " WHERE link_id = ?"
loadNtfQueue = do
(rId, qRec) <- loadQueue " WHERE notifier_id = ?"
liftIO $
TM.lookupIO rId queues -- checking recipient map first, not creating lock in map, not caching queue
>>= maybe (mkQ False rId qRec) pure
loadSndQueue_ condition = do
(rId, qRec) <- loadQueue condition
liftIO $
TM.lookupIO rId queues -- checking recipient map first
>>= maybe (cacheQueue rId qRec cacheSender) (atomically (cacheSender rId) $>)
mask = E.uninterruptibleMask_ . runExceptT
cacheSender rId = TM.insert qId rId senders
loadQueue condition =
withDB "getQueue_" st $ \db -> firstRow rowToQueueRec AUTH $
DB.query db (queueRecQuery <> condition <> " AND deleted_at IS NULL") (Only qId)
cacheQueue rId qRec insertRef = do
sq <- mkQ True rId qRec -- loaded queue
-- This lock prevents the scenario when the queue is added to cache,
-- while another thread is proccessing the same queue in withAllMsgQueues
-- without adding it to cache, possibly trying to open the same files twice.
-- Alse see comment in idleDeleteExpiredMsgs.
withQueueLock sq "getQueue_" $ atomically $
-- checking the cache again for concurrent reads,
-- use previously loaded queue if exists.
TM.lookup rId queues >>= \case
Just sq' -> pure sq'
Nothing -> do
insertRef rId
TM.insert rId sq queues
pure sq
getQueueLinkData :: PostgresQueueStore q -> q -> LinkId -> IO (Either ErrorType QueueLinkData)
getQueueLinkData st sq lnkId = runExceptT $ do
@@ -328,7 +341,9 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
DB.execute db "UPDATE msg_queues SET deleted_at = ? WHERE recipient_id = ? AND deleted_at IS NULL" (ts, rId)
atomically $ writeTVar qr Nothing
atomically $ TM.delete (senderId q) $ senders st
forM_ (notifier q) $ \NtfCreds {notifierId} -> atomically $ TM.delete notifierId $ notifiers st
forM_ (notifier q) $ \NtfCreds {notifierId} -> do
atomically $ TM.delete notifierId $ notifiers st
atomically $ TM.delete notifierId $ notifierLocks st
mq_ <- atomically $ swapTVar (msgQueue sq) Nothing
withLog "deleteStoreQueue" st (`logDeleteQueue` rId)
pure (q, mq_)
@@ -96,7 +96,7 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where
hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.member notifierId notifiers) notifier
hasLink = maybe (pure False) (\(lnkId, _) -> TM.member lnkId links) queueData
getQueue_ :: DirectParty p => STMQueueStore q -> (RecipientId -> QueueRec -> IO q) -> SParty p -> QueueId -> IO (Either ErrorType q)
getQueue_ :: DirectParty p => STMQueueStore q -> (Bool -> RecipientId -> QueueRec -> IO q) -> SParty p -> QueueId -> IO (Either ErrorType q)
getQueue_ st _ party qId =
maybe (Left AUTH) Right <$> case party of
SRecipient -> TM.lookupIO qId queues
@@ -29,7 +29,7 @@ class StoreQueueClass q => QueueStoreClass q s where
loadedQueues :: s -> TMap RecipientId q
compactQueues :: s -> IO Int64
addQueue_ :: s -> (RecipientId -> QueueRec -> IO q) -> RecipientId -> QueueRec -> IO (Either ErrorType q)
getQueue_ :: DirectParty p => s -> (RecipientId -> QueueRec -> IO q) -> SParty p -> QueueId -> IO (Either ErrorType q)
getQueue_ :: DirectParty p => s -> (Bool -> RecipientId -> QueueRec -> IO q) -> SParty p -> QueueId -> IO (Either ErrorType q)
getQueueLinkData :: s -> q -> LinkId -> IO (Either ErrorType QueueLinkData)
addQueueLinkData :: s -> q -> LinkId -> QueueLinkData -> IO (Either ErrorType ())
deleteQueueLinkData :: s -> q -> IO (Either ErrorType ())