mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-25 20:44:49 +00:00
smp server: update STM cache only after PostgreSQL update (#1470)
* refactor journal store * update postgres store methods to update STM cache only after DB update * todos * drop Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> * fix GHC 8.10.7 --------- Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com>
This commit is contained in:
@@ -1829,8 +1829,6 @@ processServerMessages StartOptions {skipWarnings} = do
|
||||
Just f -> ifM (doesFileExist f) (Just <$> importMessages False ms f old_ skipWarnings) (pure Nothing)
|
||||
Nothing -> pure Nothing
|
||||
AMS _ SMSJournal ms -> processJournalMessages old_ expire ms
|
||||
-- TODO [postgres] is it needed?
|
||||
-- AMS (SType SQSPostgres SMSJournal) ms -> processJournalMessages old_ expire ms
|
||||
processJournalMessages :: forall s. Maybe Int64 -> Bool -> JournalMsgStore s -> IO (Maybe MessageStats)
|
||||
processJournalMessages old_ expire ms
|
||||
| expire = Just <$> case old_ of
|
||||
|
||||
@@ -27,6 +27,7 @@ import qualified Data.X509.File as XF
|
||||
import Data.X509.Validation (Fingerprint (..))
|
||||
import Network.Socket (HostName, ServiceName)
|
||||
import Options.Applicative
|
||||
import Simplex.Messaging.Agent.Store.Postgres.Common (DBOpts (..))
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Protocol (ProtoServerWithAuth (..), ProtocolServer (..), ProtocolTypeI)
|
||||
import Simplex.Messaging.Server.Env.STM (AServerStoreCfg (..), ServerStoreCfg (..), StorePaths (..))
|
||||
@@ -297,17 +298,21 @@ printServerConfig transports logFile = do
|
||||
putStrLn $ case logFile of
|
||||
Just f -> "Store log: " <> f
|
||||
_ -> "Store log disabled."
|
||||
forM_ transports $ \(p, ATransport t, addHTTP) -> do
|
||||
let descr = p <> " (" <> transportName t <> ")..."
|
||||
putStrLn $ "Serving SMP protocol on port " <> descr
|
||||
when addHTTP $ putStrLn $ "Serving static site on port " <> descr
|
||||
printServerTransports transports
|
||||
|
||||
printServerTransports :: [(ServiceName, ATransport, AddHTTP)] -> IO ()
|
||||
printServerTransports = mapM_ $ \(p, ATransport t, addHTTP) -> do
|
||||
let descr = p <> " (" <> transportName t <> ")..."
|
||||
putStrLn $ "Serving SMP protocol on port " <> descr
|
||||
when addHTTP $ putStrLn $ "Serving static site on port " <> descr
|
||||
|
||||
-- TODO [postgres]
|
||||
printSMPServerConfig :: [(ServiceName, ATransport, AddHTTP)] -> AServerStoreCfg -> IO ()
|
||||
printSMPServerConfig transports (ASSCfg _ _ cfg) = printServerConfig transports $ case cfg of
|
||||
SSCMemory sp_ -> (\StorePaths {storeLogFile} -> storeLogFile) <$> sp_
|
||||
SSCMemoryJournal {storeLogFile} -> Just storeLogFile
|
||||
SSCDatabaseJournal {} -> Just "postgres database"
|
||||
printSMPServerConfig transports (ASSCfg _ _ cfg) = case cfg of
|
||||
SSCMemory sp_ -> printServerConfig transports $ (\StorePaths {storeLogFile} -> storeLogFile) <$> sp_
|
||||
SSCMemoryJournal {storeLogFile} -> printServerConfig transports $ Just storeLogFile
|
||||
SSCDatabaseJournal {storeDBOpts = DBOpts {connstr, schema}} -> do
|
||||
B.putStrLn $ "PostgreSQL database: " <> connstr <> ", schema: " <> schema
|
||||
printServerTransports transports
|
||||
|
||||
deleteDirIfExists :: FilePath -> IO ()
|
||||
deleteDirIfExists path = whenM (doesDirectoryExist path) $ removeDirectoryRecursive path
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
{-# LANGUAGE CPP #-}
|
||||
{-# LANGUAGE AllowAmbiguousTypes #-}
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
@@ -12,6 +13,9 @@
|
||||
{-# LANGUAGE TypeFamilies #-}
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
{-# LANGUAGE TypeOperators #-}
|
||||
#if __GLASGOW_HASKELL__ == 810
|
||||
{-# LANGUAGE UndecidableInstances #-}
|
||||
#endif
|
||||
|
||||
module Simplex.Messaging.Server.Env.STM where
|
||||
|
||||
|
||||
@@ -126,7 +126,6 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
|
||||
confirmOrExit
|
||||
("WARNING: journal directory " <> storeMsgsJournalDir <> " will be exported to message log file " <> storeMsgsFilePath)
|
||||
"Journal not exported"
|
||||
-- TODO [postgres]
|
||||
ms <- newJournalMsgStore MQStoreCfg
|
||||
readQueueStore True (mkQueue ms) storeLogFile $ stmQueueStore ms
|
||||
exportMessages True ms storeMsgsFilePath False
|
||||
@@ -195,7 +194,13 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
|
||||
Right (ASType SQSPostgres SMSJournal) -> "store_queues set to `database`, update it to `memory` in INI file."
|
||||
Right (ASType SQSMemory _) -> "store_queues set to `memory`, start the server"
|
||||
Left e -> e <> ", configure storage correctly"
|
||||
SCDelete -> undefined -- TODO [postgres]
|
||||
SCDelete
|
||||
| not schemaExists -> do
|
||||
putStrLn $ "Schema " <> B.unpack schema <> " does not exist in PostrgreSQL database: " <> B.unpack connstr
|
||||
exitFailure
|
||||
| otherwise -> do
|
||||
putStrLn $ "Open database: psql " <> B.unpack connstr
|
||||
putStrLn $ "Delete schema: DROP SCHEMA " <> B.unpack schema <> " CASCADE;"
|
||||
where
|
||||
withIniFile a =
|
||||
doesFileExist iniFile >>= \case
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE DerivingStrategies #-}
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE FlexibleContexts #-}
|
||||
{-# LANGUAGE FlexibleInstances #-}
|
||||
{-# LANGUAGE GADTs #-}
|
||||
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
|
||||
@@ -11,6 +12,7 @@
|
||||
{-# LANGUAGE MultiParamTypeClasses #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE RankNTypes #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE StandaloneDeriving #-}
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
@@ -94,8 +96,18 @@ data JournalMsgStore s = JournalMsgStore
|
||||
}
|
||||
|
||||
data QStore (s :: QSType) where
|
||||
MQStore :: STMQueueStore (JournalQueue 'QSMemory) -> QStore 'QSMemory
|
||||
PQStore :: PostgresQueueStore (JournalQueue 'QSPostgres) -> QStore 'QSPostgres
|
||||
MQStore :: QStoreType 'QSMemory -> QStore 'QSMemory
|
||||
PQStore :: QStoreType 'QSPostgres -> QStore 'QSPostgres
|
||||
|
||||
type family QStoreType s where
|
||||
QStoreType 'QSMemory = STMQueueStore (JournalQueue 'QSMemory)
|
||||
QStoreType 'QSPostgres = PostgresQueueStore (JournalQueue 'QSPostgres)
|
||||
|
||||
withQS :: (QueueStoreClass (JournalQueue s) (QStoreType s) => QStoreType s -> r) -> QStore s -> r
|
||||
withQS f = \case
|
||||
MQStore st -> f st
|
||||
PQStore st -> f st
|
||||
{-# INLINE withQS #-}
|
||||
|
||||
stmQueueStore :: JournalMsgStore 'QSMemory -> STMQueueStore (JournalQueue 'QSMemory)
|
||||
stmQueueStore st = case queueStore_ st of
|
||||
@@ -279,68 +291,31 @@ instance QueueStoreClass (JournalQueue s) (QStore s) where
|
||||
MQStoreCfg -> MQStore <$> newQueueStore @(JournalQueue s) ()
|
||||
PQStoreCfg dbOpts confirmMigrations -> PQStore <$> newQueueStore @(JournalQueue s) (dbOpts, confirmMigrations)
|
||||
|
||||
loadedQueues = \case
|
||||
MQStore st -> loadedQueues st
|
||||
PQStore st -> loadedQueues st
|
||||
loadedQueues = withQS loadedQueues
|
||||
{-# INLINE loadedQueues #-}
|
||||
|
||||
queueCounts = \case
|
||||
-- TODO [postgres] combine these functions
|
||||
MQStore st -> queueCounts @(JournalQueue s) st
|
||||
PQStore st -> queueCounts @(JournalQueue s) st
|
||||
queueCounts = withQS (queueCounts @(JournalQueue s))
|
||||
{-# INLINE queueCounts #-}
|
||||
|
||||
addQueue_ = \case
|
||||
MQStore st -> addQueue_ st
|
||||
PQStore st -> addQueue_ st
|
||||
addQueue_ = withQS addQueue_
|
||||
{-# INLINE addQueue_ #-}
|
||||
|
||||
getQueue_ = \case
|
||||
MQStore st -> getQueue_ st
|
||||
PQStore st -> getQueue_ st
|
||||
getQueue_ = withQS getQueue_
|
||||
{-# INLINE getQueue_ #-}
|
||||
|
||||
secureQueue = \case
|
||||
MQStore st -> secureQueue st
|
||||
PQStore st -> secureQueue st
|
||||
secureQueue = withQS secureQueue
|
||||
{-# INLINE secureQueue #-}
|
||||
|
||||
addQueueNotifier = \case
|
||||
MQStore st -> addQueueNotifier st
|
||||
PQStore st -> addQueueNotifier st
|
||||
addQueueNotifier = withQS addQueueNotifier
|
||||
{-# INLINE addQueueNotifier #-}
|
||||
|
||||
deleteQueueNotifier = \case
|
||||
MQStore st -> deleteQueueNotifier st
|
||||
PQStore st -> deleteQueueNotifier st
|
||||
deleteQueueNotifier = withQS deleteQueueNotifier
|
||||
{-# INLINE deleteQueueNotifier #-}
|
||||
|
||||
suspendQueue = \case
|
||||
MQStore st -> suspendQueue st
|
||||
PQStore st -> suspendQueue st
|
||||
suspendQueue = withQS suspendQueue
|
||||
{-# INLINE suspendQueue #-}
|
||||
|
||||
blockQueue = \case
|
||||
MQStore st -> blockQueue st
|
||||
PQStore st -> blockQueue st
|
||||
blockQueue = withQS blockQueue
|
||||
{-# INLINE blockQueue #-}
|
||||
|
||||
unblockQueue = \case
|
||||
MQStore st -> unblockQueue st
|
||||
PQStore st -> unblockQueue st
|
||||
unblockQueue = withQS unblockQueue
|
||||
{-# INLINE unblockQueue #-}
|
||||
|
||||
updateQueueTime = \case
|
||||
MQStore st -> updateQueueTime st
|
||||
PQStore st -> updateQueueTime st
|
||||
updateQueueTime = withQS updateQueueTime
|
||||
{-# INLINE updateQueueTime #-}
|
||||
|
||||
deleteStoreQueue = \case
|
||||
MQStore st -> deleteStoreQueue st
|
||||
PQStore st -> deleteStoreQueue st
|
||||
deleteStoreQueue = withQS deleteStoreQueue
|
||||
{-# INLINE deleteStoreQueue #-}
|
||||
|
||||
|
||||
instance MsgStoreClass (JournalMsgStore s) where
|
||||
type StoreMonad (JournalMsgStore s) = StoreIO s
|
||||
type QueueStore (JournalMsgStore s) = QStore s
|
||||
|
||||
@@ -18,12 +18,19 @@
|
||||
{-# LANGUAGE TypeOperators #-}
|
||||
{-# OPTIONS_GHC -fno-warn-orphans #-}
|
||||
|
||||
module Simplex.Messaging.Server.QueueStore.Postgres where
|
||||
module Simplex.Messaging.Server.QueueStore.Postgres
|
||||
( PostgresQueueStore (..),
|
||||
batchInsertQueues,
|
||||
foldQueueRecs,
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import qualified Control.Exception as E
|
||||
import Control.Logger.Simple
|
||||
import Control.Monad
|
||||
import Control.Monad.Except
|
||||
import Control.Monad.IO.Class
|
||||
import Control.Monad.Trans.Except
|
||||
import Data.Bitraversable (bimapM)
|
||||
import Data.Functor (($>))
|
||||
import Data.Int (Int64)
|
||||
@@ -44,13 +51,14 @@ import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation)
|
||||
import Simplex.Messaging.Protocol
|
||||
import Simplex.Messaging.Server.QueueStore
|
||||
import Simplex.Messaging.Server.QueueStore.Postgres.Migrations (serverMigrations)
|
||||
import Simplex.Messaging.Server.QueueStore.STM (readQueueRecIO, setStatus, withQueueRec)
|
||||
import Simplex.Messaging.Server.QueueStore.STM (readQueueRecIO)
|
||||
import Simplex.Messaging.Server.QueueStore.Types
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
import Simplex.Messaging.Util (firstRow, ifM, tshow, ($>>), ($>>=), (<$$), (<$$>))
|
||||
import Simplex.Messaging.Util (firstRow, ifM, tshow, (<$$>))
|
||||
import System.Exit (exitFailure)
|
||||
import System.IO (hFlush, stdout)
|
||||
import UnliftIO.STM
|
||||
#if !defined(dbPostgres)
|
||||
import Simplex.Messaging.Agent.Store.Postgres.DB (blobFieldDecoder)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
@@ -105,17 +113,14 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
|
||||
addQueue_ :: PostgresQueueStore q -> (RecipientId -> QueueRec -> IO q) -> RecipientId -> QueueRec -> IO (Either ErrorType q)
|
||||
addQueue_ st mkQ rId qr = do
|
||||
sq <- mkQ rId qr
|
||||
withQueueLock sq "addQueue_" $
|
||||
addDB $>> add sq
|
||||
withQueueLock sq "addQueue_" $ runExceptT $ do
|
||||
withDB "addQueue_" st $ \db ->
|
||||
E.try (insertQueueDB db rId qr) >>= bimapM handleDuplicate pure
|
||||
atomically $ TM.insert rId sq queues
|
||||
atomically $ TM.insert (senderId qr) rId senders
|
||||
pure sq
|
||||
where
|
||||
PostgresQueueStore {queues, senders} = st
|
||||
addDB =
|
||||
withDB "addQueue_" st $ \db ->
|
||||
E.try (insertQueueDB db rId qr) >>= bimapM handleDuplicate pure
|
||||
add sq = do
|
||||
atomically $ TM.insert rId sq queues
|
||||
atomically $ TM.insert (senderId qr) rId senders
|
||||
pure $ Right sq
|
||||
-- Not doing duplicate checks in maps as the probability of duplicates is very low.
|
||||
-- It needs to be reconsidered when IDs are supplied by the users.
|
||||
-- hasId = anyM [TM.memberIO rId queues, TM.memberIO senderId senders, hasNotifier]
|
||||
@@ -132,17 +137,18 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
|
||||
loadRcvQueue = loadQueue " WHERE q.recipient_id = ?" $ \_ -> pure ()
|
||||
loadSndQueue = loadQueue " WHERE q.sender_id = ?" $ \rId -> TM.insert qId rId senders
|
||||
loadNtfQueue = loadQueue " WHERE n.notifier_id = ?" $ \_ -> pure () -- do NOT cache ref - ntf subscriptions are rare
|
||||
loadQueue condition insertRef =
|
||||
loadQueueRec $>>= \(rId, qRec) -> do
|
||||
sq <- mkQ rId qRec
|
||||
atomically $
|
||||
-- checking the cache again for concurrent reads
|
||||
TM.lookup rId queues >>= \case
|
||||
Just sq' -> pure $ Right sq'
|
||||
Nothing -> do
|
||||
insertRef rId
|
||||
TM.insert rId sq queues
|
||||
pure $ Right sq
|
||||
loadQueue condition insertRef = runExceptT $ do
|
||||
(rId, qRec) <- loadQueueRec
|
||||
sq <- liftIO $ mkQ rId qRec
|
||||
atomically $
|
||||
-- checking the cache again for concurrent reads,
|
||||
-- use previously loaded queue if exists.
|
||||
TM.lookup rId queues >>= \case
|
||||
Just sq' -> pure sq'
|
||||
Nothing -> do
|
||||
insertRef rId
|
||||
TM.insert rId sq queues
|
||||
pure sq
|
||||
where
|
||||
loadQueueRec =
|
||||
withDB "getQueue_" st $ \db -> firstRow rowToQueueRec AUTH $
|
||||
@@ -150,125 +156,90 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
|
||||
|
||||
secureQueue :: PostgresQueueStore q -> q -> SndPublicAuthKey -> IO (Either ErrorType ())
|
||||
secureQueue st sq sKey =
|
||||
withQueueLock sq "secureQueue" $
|
||||
readQueueRecIO qr
|
||||
$>>= \q -> verify q
|
||||
$>> secureDB
|
||||
$>> secure q
|
||||
withQueueDB sq "secureQueue" $ \q -> do
|
||||
verify q
|
||||
withDB' "secureQueue" st $ \db ->
|
||||
DB.execute db "UPDATE msg_queues SET sender_key = ? WHERE recipient_id = ?" (sKey, recipientId sq)
|
||||
atomically $ writeTVar (queueRec sq) $ Just q {senderKey = Just sKey}
|
||||
where
|
||||
qr = queueRec sq
|
||||
verify q = pure $ case senderKey q of
|
||||
Just k | sKey /= k -> Left AUTH
|
||||
_ -> Right ()
|
||||
secureDB =
|
||||
withDB' "secureQueue" st $ \db ->
|
||||
DB.execute db "UPDATE msg_queues SET sender_key = ? WHERE recipient_id = ?" (sKey, recipientId sq)
|
||||
secure q = do
|
||||
atomically $ writeTVar qr $ Just q {senderKey = Just sKey}
|
||||
pure $ Right ()
|
||||
verify q = case senderKey q of
|
||||
Just k | sKey /= k -> throwE AUTH
|
||||
_ -> pure ()
|
||||
|
||||
addQueueNotifier :: PostgresQueueStore q -> q -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId))
|
||||
addQueueNotifier st sq ntfCreds@NtfCreds {notifierId = nId, notifierKey, rcvNtfDhSecret} =
|
||||
withQueueLock sq "addQueueNotifier" $
|
||||
readQueueRecIO qr $>>= add
|
||||
withQueueDB sq "addQueueNotifier" $ \q ->
|
||||
ExceptT $ withLockMap (notifierLocks st) nId "addQueueNotifier" $
|
||||
ifM (TM.memberIO nId notifiers) (pure $ Left DUPLICATE_) $ runExceptT $ do
|
||||
withDB "addQueueNotifier" st $ \db ->
|
||||
E.try (insert db) >>= bimapM handleDuplicate pure
|
||||
nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> atomically (TM.delete notifierId notifiers) $> notifierId
|
||||
let !q' = q {notifier = Just ntfCreds}
|
||||
atomically $ writeTVar (queueRec sq) $ Just q'
|
||||
-- cache queue notifier ID – after notifier is added ntf server will likely subscribe
|
||||
atomically $ TM.insert nId rId notifiers
|
||||
pure nId_
|
||||
where
|
||||
PostgresQueueStore {notifiers} = st
|
||||
rId = recipientId sq
|
||||
qr = queueRec sq
|
||||
add q =
|
||||
withLockMap (notifierLocks st) nId "addQueueNotifier" $
|
||||
ifM (TM.memberIO nId notifiers) (pure $ Left DUPLICATE_) $
|
||||
addDB $>> do
|
||||
nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> atomically (TM.delete notifierId notifiers) $> notifierId
|
||||
let !q' = q {notifier = Just ntfCreds}
|
||||
atomically $ writeTVar qr $ Just q'
|
||||
-- cache queue notifier ID – after notifier is added ntf server will likely subscribe
|
||||
atomically $ TM.insert nId rId notifiers
|
||||
pure $ Right nId_
|
||||
addDB =
|
||||
withDB "addQueueNotifier" st $ \db ->
|
||||
E.try (insert db) >>= bimapM handleDuplicate pure
|
||||
where
|
||||
-- TODO [postgres] test how this query works with duplicate recipient_id (updates) and notifier_id (fails)
|
||||
insert db =
|
||||
DB.execute
|
||||
db
|
||||
[sql|
|
||||
INSERT INTO msg_notifiers (recipient_id, notifier_id, notifier_key, rcv_ntf_dh_secret)
|
||||
VALUES (?, ?, ?, ?)
|
||||
ON CONFLICT (recipient_id) DO UPDATE
|
||||
SET notifier_id = EXCLUDED.notifier_id,
|
||||
notifier_key = EXCLUDED.notifier_key,
|
||||
rcv_ntf_dh_secret = EXCLUDED.rcv_ntf_dh_secret
|
||||
|]
|
||||
(rId, nId, notifierKey, rcvNtfDhSecret)
|
||||
-- TODO [postgres] test how this query works with duplicate recipient_id (updates) and notifier_id (fails)
|
||||
insert db =
|
||||
DB.execute
|
||||
db
|
||||
[sql|
|
||||
INSERT INTO msg_notifiers (recipient_id, notifier_id, notifier_key, rcv_ntf_dh_secret)
|
||||
VALUES (?, ?, ?, ?)
|
||||
ON CONFLICT (recipient_id) DO UPDATE
|
||||
SET notifier_id = EXCLUDED.notifier_id,
|
||||
notifier_key = EXCLUDED.notifier_key,
|
||||
rcv_ntf_dh_secret = EXCLUDED.rcv_ntf_dh_secret
|
||||
|]
|
||||
(rId, nId, notifierKey, rcvNtfDhSecret)
|
||||
|
||||
deleteQueueNotifier :: PostgresQueueStore q -> q -> IO (Either ErrorType (Maybe NotifierId))
|
||||
deleteQueueNotifier st sq =
|
||||
withQueueLock sq "deleteQueueNotifier" $
|
||||
readQueueRecIO qr $>>= fmap sequence . delete
|
||||
where
|
||||
qr = queueRec sq
|
||||
delete :: QueueRec -> IO (Maybe (Either ErrorType NotifierId))
|
||||
delete q = forM (notifier q) $ \NtfCreds {notifierId = nId} ->
|
||||
withLockMap (notifierLocks st) nId "deleteQueueNotifier" $ do
|
||||
deleteDB nId $>> do
|
||||
atomically $ TM.delete nId $ notifiers st
|
||||
atomically $ writeTVar qr $! Just q {notifier = Nothing}
|
||||
pure $ Right nId
|
||||
deleteDB nId =
|
||||
withDB' "deleteQueueNotifier" st $ \db ->
|
||||
DB.execute db "DELETE FROM msg_notifiers WHERE notifier_id = ?" (Only nId)
|
||||
withQueueDB sq "deleteQueueNotifier" $ \q ->
|
||||
ExceptT $ fmap sequence $ forM (notifier q) $ \NtfCreds {notifierId = nId} ->
|
||||
withLockMap (notifierLocks st) nId "deleteQueueNotifier" $ runExceptT $ do
|
||||
withDB' "deleteQueueNotifier" st $ \db ->
|
||||
DB.execute db "DELETE FROM msg_notifiers WHERE notifier_id = ?" (Only nId)
|
||||
atomically $ TM.delete nId $ notifiers st
|
||||
atomically $ writeTVar (queueRec sq) $ Just q {notifier = Nothing}
|
||||
pure nId
|
||||
|
||||
-- TODO [postgres] only update STM on DB success
|
||||
suspendQueue :: PostgresQueueStore q -> q -> IO (Either ErrorType ())
|
||||
suspendQueue st sq =
|
||||
setStatus (queueRec sq) EntityOff
|
||||
$>> setStatusDB "suspendQueue" st (recipientId sq) EntityOff
|
||||
suspendQueue st sq = setStatusDB "suspendQueue" st sq EntityOff
|
||||
|
||||
-- TODO [postgres] only update STM on DB success
|
||||
blockQueue :: PostgresQueueStore q -> q -> BlockingInfo -> IO (Either ErrorType ())
|
||||
blockQueue st sq info =
|
||||
setStatus (queueRec sq) (EntityBlocked info)
|
||||
$>> setStatusDB "blockQueue" st (recipientId sq) (EntityBlocked info)
|
||||
blockQueue st sq info = setStatusDB "blockQueue" st sq (EntityBlocked info)
|
||||
|
||||
-- TODO [postgres] only update STM on DB success
|
||||
unblockQueue :: PostgresQueueStore q -> q -> IO (Either ErrorType ())
|
||||
unblockQueue st sq =
|
||||
setStatus (queueRec sq) EntityActive
|
||||
$>> setStatusDB "unblockQueue" st (recipientId sq) EntityActive
|
||||
|
||||
-- TODO [postgres] only update STM on DB success
|
||||
unblockQueue st sq = setStatusDB "unblockQueue" st sq EntityActive
|
||||
|
||||
updateQueueTime :: PostgresQueueStore q -> q -> RoundedSystemTime -> IO (Either ErrorType QueueRec)
|
||||
updateQueueTime st sq t = withQueueRec qr update $>>= updateDB
|
||||
where
|
||||
qr = queueRec sq
|
||||
update q@QueueRec {updatedAt}
|
||||
| updatedAt == Just t = pure (q, False)
|
||||
| otherwise =
|
||||
let !q' = q {updatedAt = Just t}
|
||||
in (writeTVar qr $! Just q') $> (q', True)
|
||||
updateDB (q, changed)
|
||||
| changed = q <$$ withDB' "updateQueueTime" st (\db -> DB.execute db "UPDATE msg_queues SET updated_at = ? WHERE recipient_id = ?" (t, Binary $ unEntityId $ recipientId sq))
|
||||
| otherwise = pure $ Right q
|
||||
updateQueueTime st sq t =
|
||||
withQueueDB sq "updateQueueTime" $ \q@QueueRec {updatedAt} ->
|
||||
if updatedAt == Just t
|
||||
then pure q
|
||||
else do
|
||||
withDB' "updateQueueTime" st $ \db ->
|
||||
DB.execute db "UPDATE msg_queues SET updated_at = ? WHERE recipient_id = ?" (t, recipientId sq)
|
||||
let !q' = q {updatedAt = Just t}
|
||||
atomically $ writeTVar (queueRec sq) $ Just q'
|
||||
pure q'
|
||||
|
||||
-- TODO [postgres] only update STM on DB success
|
||||
-- this method is called from JournalMsgStore deleteQueue that already locks the queue
|
||||
deleteStoreQueue :: PostgresQueueStore q -> q -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue q)))
|
||||
deleteStoreQueue st sq =
|
||||
withQueueRec qr delete
|
||||
$>>= \q -> deleteDB
|
||||
>>= mapM (\_ -> (q,) <$> atomically (swapTVar (msgQueue sq) Nothing))
|
||||
deleteStoreQueue st sq = runExceptT $ do
|
||||
q <- ExceptT $ readQueueRecIO qr
|
||||
withDB' "deleteStoreQueue" st $ \db ->
|
||||
DB.execute db "DELETE FROM msg_queues WHERE recipient_id = ?" (Only $ Binary $ unEntityId $ recipientId sq)
|
||||
atomically $ writeTVar qr Nothing
|
||||
atomically $ TM.delete (senderId q) $ senders st
|
||||
forM_ (notifier q) $ \NtfCreds {notifierId} -> atomically $ TM.delete notifierId $ notifiers st
|
||||
(q,) <$> atomically (swapTVar (msgQueue sq) Nothing)
|
||||
where
|
||||
qr = queueRec sq
|
||||
delete q = do
|
||||
writeTVar qr Nothing
|
||||
TM.delete (senderId q) $ senders st
|
||||
-- TODO [postgres] probably we should delete it?
|
||||
-- forM_ (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId $ notifiers st
|
||||
pure q
|
||||
deleteDB =
|
||||
withDB' "deleteStoreQueue" st $ \db ->
|
||||
DB.execute db "DELETE FROM msg_queues WHERE recipient_id = ?" (Only $ Binary $ unEntityId $ recipientId sq)
|
||||
|
||||
insertQueueDB :: DB.Connection -> RecipientId -> QueueRec -> IO ()
|
||||
insertQueueDB db rId QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status, updatedAt} = do
|
||||
@@ -342,23 +313,31 @@ rowToQueueRec ((rId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure,
|
||||
let notifier = NtfCreds <$> notifierId_ <*> notifierKey_ <*> rcvNtfDhSecret_
|
||||
in (rId, QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status, updatedAt})
|
||||
|
||||
setStatusDB :: String -> PostgresQueueStore q -> RecipientId -> ServerEntityStatus -> IO (Either ErrorType ())
|
||||
setStatusDB name st rId status =
|
||||
withDB' name st $ \db ->
|
||||
DB.execute db "UPDATE msg_queues SET status = ? WHERE recipient_id = ?" (status, rId)
|
||||
setStatusDB :: StoreQueueClass q => String -> PostgresQueueStore q -> q -> ServerEntityStatus -> IO (Either ErrorType ())
|
||||
setStatusDB op st sq status =
|
||||
withQueueDB sq op $ \q -> do
|
||||
withDB' op st $ \db ->
|
||||
DB.execute db "UPDATE msg_queues SET status = ? WHERE recipient_id = ?" (status, recipientId sq)
|
||||
atomically $ writeTVar (queueRec sq) $ Just q {status}
|
||||
|
||||
withDB' :: String -> PostgresQueueStore q -> (DB.Connection -> IO a) -> IO (Either ErrorType a)
|
||||
withDB' name st' action = withDB name st' $ fmap Right . action
|
||||
withQueueDB :: StoreQueueClass q => q -> String -> (QueueRec -> ExceptT ErrorType IO a) -> IO (Either ErrorType a)
|
||||
withQueueDB sq op action =
|
||||
withQueueLock sq op $ runExceptT $ do
|
||||
q <- ExceptT $ readQueueRecIO $ queueRec sq
|
||||
action q
|
||||
|
||||
withDB' :: String -> PostgresQueueStore q -> (DB.Connection -> IO a) -> ExceptT ErrorType IO a
|
||||
withDB' op st' action = withDB op st' $ fmap Right . action
|
||||
|
||||
-- TODO [postgres] possibly, use with connection if queries in addQueue_ are combined
|
||||
withDB :: forall a q. String -> PostgresQueueStore q -> (DB.Connection -> IO (Either ErrorType a)) -> IO (Either ErrorType a)
|
||||
withDB name st' action =
|
||||
E.try (withTransaction (dbStore st') action) >>= either logErr pure
|
||||
withDB :: forall a q. String -> PostgresQueueStore q -> (DB.Connection -> IO (Either ErrorType a)) -> ExceptT ErrorType IO a
|
||||
withDB op st' action =
|
||||
ExceptT $ E.try (withTransaction (dbStore st') action) >>= either logErr pure
|
||||
where
|
||||
logErr :: E.SomeException -> IO (Either ErrorType a)
|
||||
logErr e = logError ("STORE: " <> T.pack err) $> Left (STORE err)
|
||||
where
|
||||
err = name <> ", withLog, " <> show e
|
||||
err = op <> ", withLog, " <> show e
|
||||
|
||||
handleDuplicate :: SqlError -> IO ErrorType
|
||||
handleDuplicate e = case constraintViolation e of
|
||||
|
||||
Reference in New Issue
Block a user