From 80a070a8ea83da53174cd94b3438a6b0532b3670 Mon Sep 17 00:00:00 2001 From: Evgeny Date: Wed, 26 Feb 2025 11:43:08 +0000 Subject: [PATCH] smp server: update STM cache only after PostgreSQL update (#1470) * refactor journal store * update postgres store methods to update STM cache only after DB update * todos * drop Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> * fix GHC 8.10.7 --------- Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> --- src/Simplex/Messaging/Server.hs | 2 - src/Simplex/Messaging/Server/CLI.hs | 23 +- src/Simplex/Messaging/Server/Env/STM.hs | 4 + src/Simplex/Messaging/Server/Main.hs | 9 +- .../Messaging/Server/MsgStore/Journal.hs | 77 ++---- .../Messaging/Server/QueueStore/Postgres.hs | 243 ++++++++---------- 6 files changed, 162 insertions(+), 196 deletions(-) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 67e8e227e..b0edfa2dd 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -1829,8 +1829,6 @@ processServerMessages StartOptions {skipWarnings} = do Just f -> ifM (doesFileExist f) (Just <$> importMessages False ms f old_ skipWarnings) (pure Nothing) Nothing -> pure Nothing AMS _ SMSJournal ms -> processJournalMessages old_ expire ms - -- TODO [postgres] is it needed? - -- AMS (SType SQSPostgres SMSJournal) ms -> processJournalMessages old_ expire ms processJournalMessages :: forall s. Maybe Int64 -> Bool -> JournalMsgStore s -> IO (Maybe MessageStats) processJournalMessages old_ expire ms | expire = Just <$> case old_ of diff --git a/src/Simplex/Messaging/Server/CLI.hs b/src/Simplex/Messaging/Server/CLI.hs index 6dd7d184a..3e52d2001 100644 --- a/src/Simplex/Messaging/Server/CLI.hs +++ b/src/Simplex/Messaging/Server/CLI.hs @@ -27,6 +27,7 @@ import qualified Data.X509.File as XF import Data.X509.Validation (Fingerprint (..)) import Network.Socket (HostName, ServiceName) import Options.Applicative +import Simplex.Messaging.Agent.Store.Postgres.Common (DBOpts (..)) import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (ProtoServerWithAuth (..), ProtocolServer (..), ProtocolTypeI) import Simplex.Messaging.Server.Env.STM (AServerStoreCfg (..), ServerStoreCfg (..), StorePaths (..)) @@ -297,17 +298,21 @@ printServerConfig transports logFile = do putStrLn $ case logFile of Just f -> "Store log: " <> f _ -> "Store log disabled." - forM_ transports $ \(p, ATransport t, addHTTP) -> do - let descr = p <> " (" <> transportName t <> ")..." - putStrLn $ "Serving SMP protocol on port " <> descr - when addHTTP $ putStrLn $ "Serving static site on port " <> descr + printServerTransports transports + +printServerTransports :: [(ServiceName, ATransport, AddHTTP)] -> IO () +printServerTransports = mapM_ $ \(p, ATransport t, addHTTP) -> do + let descr = p <> " (" <> transportName t <> ")..." + putStrLn $ "Serving SMP protocol on port " <> descr + when addHTTP $ putStrLn $ "Serving static site on port " <> descr --- TODO [postgres] printSMPServerConfig :: [(ServiceName, ATransport, AddHTTP)] -> AServerStoreCfg -> IO () -printSMPServerConfig transports (ASSCfg _ _ cfg) = printServerConfig transports $ case cfg of - SSCMemory sp_ -> (\StorePaths {storeLogFile} -> storeLogFile) <$> sp_ - SSCMemoryJournal {storeLogFile} -> Just storeLogFile - SSCDatabaseJournal {} -> Just "postgres database" +printSMPServerConfig transports (ASSCfg _ _ cfg) = case cfg of + SSCMemory sp_ -> printServerConfig transports $ (\StorePaths {storeLogFile} -> storeLogFile) <$> sp_ + SSCMemoryJournal {storeLogFile} -> printServerConfig transports $ Just storeLogFile + SSCDatabaseJournal {storeDBOpts = DBOpts {connstr, schema}} -> do + B.putStrLn $ "PostgreSQL database: " <> connstr <> ", schema: " <> schema + printServerTransports transports deleteDirIfExists :: FilePath -> IO () deleteDirIfExists path = whenM (doesDirectoryExist path) $ removeDirectoryRecursive path diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index b09e0549c..da8c22b1a 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE CPP #-} {-# LANGUAGE AllowAmbiguousTypes #-} {-# LANGUAGE DataKinds #-} {-# LANGUAGE DuplicateRecordFields #-} @@ -12,6 +13,9 @@ {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE TypeApplications #-} {-# LANGUAGE TypeOperators #-} +#if __GLASGOW_HASKELL__ == 810 +{-# LANGUAGE UndecidableInstances #-} +#endif module Simplex.Messaging.Server.Env.STM where diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index 4be4594d4..388a6a069 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -126,7 +126,6 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = confirmOrExit ("WARNING: journal directory " <> storeMsgsJournalDir <> " will be exported to message log file " <> storeMsgsFilePath) "Journal not exported" - -- TODO [postgres] ms <- newJournalMsgStore MQStoreCfg readQueueStore True (mkQueue ms) storeLogFile $ stmQueueStore ms exportMessages True ms storeMsgsFilePath False @@ -195,7 +194,13 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = Right (ASType SQSPostgres SMSJournal) -> "store_queues set to `database`, update it to `memory` in INI file." Right (ASType SQSMemory _) -> "store_queues set to `memory`, start the server" Left e -> e <> ", configure storage correctly" - SCDelete -> undefined -- TODO [postgres] + SCDelete + | not schemaExists -> do + putStrLn $ "Schema " <> B.unpack schema <> " does not exist in PostrgreSQL database: " <> B.unpack connstr + exitFailure + | otherwise -> do + putStrLn $ "Open database: psql " <> B.unpack connstr + putStrLn $ "Delete schema: DROP SCHEMA " <> B.unpack schema <> " CASCADE;" where withIniFile a = doesFileExist iniFile >>= \case diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 8ad63e162..32fefb386 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -2,6 +2,7 @@ {-# LANGUAGE DataKinds #-} {-# LANGUAGE DerivingStrategies #-} {-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} @@ -11,6 +12,7 @@ {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE TypeApplications #-} @@ -94,8 +96,18 @@ data JournalMsgStore s = JournalMsgStore } data QStore (s :: QSType) where - MQStore :: STMQueueStore (JournalQueue 'QSMemory) -> QStore 'QSMemory - PQStore :: PostgresQueueStore (JournalQueue 'QSPostgres) -> QStore 'QSPostgres + MQStore :: QStoreType 'QSMemory -> QStore 'QSMemory + PQStore :: QStoreType 'QSPostgres -> QStore 'QSPostgres + +type family QStoreType s where + QStoreType 'QSMemory = STMQueueStore (JournalQueue 'QSMemory) + QStoreType 'QSPostgres = PostgresQueueStore (JournalQueue 'QSPostgres) + +withQS :: (QueueStoreClass (JournalQueue s) (QStoreType s) => QStoreType s -> r) -> QStore s -> r +withQS f = \case + MQStore st -> f st + PQStore st -> f st +{-# INLINE withQS #-} stmQueueStore :: JournalMsgStore 'QSMemory -> STMQueueStore (JournalQueue 'QSMemory) stmQueueStore st = case queueStore_ st of @@ -279,68 +291,31 @@ instance QueueStoreClass (JournalQueue s) (QStore s) where MQStoreCfg -> MQStore <$> newQueueStore @(JournalQueue s) () PQStoreCfg dbOpts confirmMigrations -> PQStore <$> newQueueStore @(JournalQueue s) (dbOpts, confirmMigrations) - loadedQueues = \case - MQStore st -> loadedQueues st - PQStore st -> loadedQueues st + loadedQueues = withQS loadedQueues {-# INLINE loadedQueues #-} - - queueCounts = \case - -- TODO [postgres] combine these functions - MQStore st -> queueCounts @(JournalQueue s) st - PQStore st -> queueCounts @(JournalQueue s) st + queueCounts = withQS (queueCounts @(JournalQueue s)) {-# INLINE queueCounts #-} - - addQueue_ = \case - MQStore st -> addQueue_ st - PQStore st -> addQueue_ st + addQueue_ = withQS addQueue_ {-# INLINE addQueue_ #-} - - getQueue_ = \case - MQStore st -> getQueue_ st - PQStore st -> getQueue_ st + getQueue_ = withQS getQueue_ {-# INLINE getQueue_ #-} - - secureQueue = \case - MQStore st -> secureQueue st - PQStore st -> secureQueue st + secureQueue = withQS secureQueue {-# INLINE secureQueue #-} - - addQueueNotifier = \case - MQStore st -> addQueueNotifier st - PQStore st -> addQueueNotifier st + addQueueNotifier = withQS addQueueNotifier {-# INLINE addQueueNotifier #-} - - deleteQueueNotifier = \case - MQStore st -> deleteQueueNotifier st - PQStore st -> deleteQueueNotifier st + deleteQueueNotifier = withQS deleteQueueNotifier {-# INLINE deleteQueueNotifier #-} - - suspendQueue = \case - MQStore st -> suspendQueue st - PQStore st -> suspendQueue st + suspendQueue = withQS suspendQueue {-# INLINE suspendQueue #-} - - blockQueue = \case - MQStore st -> blockQueue st - PQStore st -> blockQueue st + blockQueue = withQS blockQueue {-# INLINE blockQueue #-} - - unblockQueue = \case - MQStore st -> unblockQueue st - PQStore st -> unblockQueue st + unblockQueue = withQS unblockQueue {-# INLINE unblockQueue #-} - - updateQueueTime = \case - MQStore st -> updateQueueTime st - PQStore st -> updateQueueTime st + updateQueueTime = withQS updateQueueTime {-# INLINE updateQueueTime #-} - - deleteStoreQueue = \case - MQStore st -> deleteStoreQueue st - PQStore st -> deleteStoreQueue st + deleteStoreQueue = withQS deleteStoreQueue {-# INLINE deleteStoreQueue #-} - instance MsgStoreClass (JournalMsgStore s) where type StoreMonad (JournalMsgStore s) = StoreIO s type QueueStore (JournalMsgStore s) = QStore s diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs index 376858ce5..8988cb3f3 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs @@ -18,12 +18,19 @@ {-# LANGUAGE TypeOperators #-} {-# OPTIONS_GHC -fno-warn-orphans #-} -module Simplex.Messaging.Server.QueueStore.Postgres where +module Simplex.Messaging.Server.QueueStore.Postgres + ( PostgresQueueStore (..), + batchInsertQueues, + foldQueueRecs, + ) +where -import Control.Concurrent.STM import qualified Control.Exception as E import Control.Logger.Simple import Control.Monad +import Control.Monad.Except +import Control.Monad.IO.Class +import Control.Monad.Trans.Except import Data.Bitraversable (bimapM) import Data.Functor (($>)) import Data.Int (Int64) @@ -44,13 +51,14 @@ import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation) import Simplex.Messaging.Protocol import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Server.QueueStore.Postgres.Migrations (serverMigrations) -import Simplex.Messaging.Server.QueueStore.STM (readQueueRecIO, setStatus, withQueueRec) +import Simplex.Messaging.Server.QueueStore.STM (readQueueRecIO) import Simplex.Messaging.Server.QueueStore.Types import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM -import Simplex.Messaging.Util (firstRow, ifM, tshow, ($>>), ($>>=), (<$$), (<$$>)) +import Simplex.Messaging.Util (firstRow, ifM, tshow, (<$$>)) import System.Exit (exitFailure) import System.IO (hFlush, stdout) +import UnliftIO.STM #if !defined(dbPostgres) import Simplex.Messaging.Agent.Store.Postgres.DB (blobFieldDecoder) import qualified Simplex.Messaging.Crypto as C @@ -105,17 +113,14 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where addQueue_ :: PostgresQueueStore q -> (RecipientId -> QueueRec -> IO q) -> RecipientId -> QueueRec -> IO (Either ErrorType q) addQueue_ st mkQ rId qr = do sq <- mkQ rId qr - withQueueLock sq "addQueue_" $ - addDB $>> add sq + withQueueLock sq "addQueue_" $ runExceptT $ do + withDB "addQueue_" st $ \db -> + E.try (insertQueueDB db rId qr) >>= bimapM handleDuplicate pure + atomically $ TM.insert rId sq queues + atomically $ TM.insert (senderId qr) rId senders + pure sq where PostgresQueueStore {queues, senders} = st - addDB = - withDB "addQueue_" st $ \db -> - E.try (insertQueueDB db rId qr) >>= bimapM handleDuplicate pure - add sq = do - atomically $ TM.insert rId sq queues - atomically $ TM.insert (senderId qr) rId senders - pure $ Right sq -- Not doing duplicate checks in maps as the probability of duplicates is very low. -- It needs to be reconsidered when IDs are supplied by the users. -- hasId = anyM [TM.memberIO rId queues, TM.memberIO senderId senders, hasNotifier] @@ -132,17 +137,18 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where loadRcvQueue = loadQueue " WHERE q.recipient_id = ?" $ \_ -> pure () loadSndQueue = loadQueue " WHERE q.sender_id = ?" $ \rId -> TM.insert qId rId senders loadNtfQueue = loadQueue " WHERE n.notifier_id = ?" $ \_ -> pure () -- do NOT cache ref - ntf subscriptions are rare - loadQueue condition insertRef = - loadQueueRec $>>= \(rId, qRec) -> do - sq <- mkQ rId qRec - atomically $ - -- checking the cache again for concurrent reads - TM.lookup rId queues >>= \case - Just sq' -> pure $ Right sq' - Nothing -> do - insertRef rId - TM.insert rId sq queues - pure $ Right sq + loadQueue condition insertRef = runExceptT $ do + (rId, qRec) <- loadQueueRec + sq <- liftIO $ mkQ rId qRec + atomically $ + -- checking the cache again for concurrent reads, + -- use previously loaded queue if exists. + TM.lookup rId queues >>= \case + Just sq' -> pure sq' + Nothing -> do + insertRef rId + TM.insert rId sq queues + pure sq where loadQueueRec = withDB "getQueue_" st $ \db -> firstRow rowToQueueRec AUTH $ @@ -150,125 +156,90 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where secureQueue :: PostgresQueueStore q -> q -> SndPublicAuthKey -> IO (Either ErrorType ()) secureQueue st sq sKey = - withQueueLock sq "secureQueue" $ - readQueueRecIO qr - $>>= \q -> verify q - $>> secureDB - $>> secure q + withQueueDB sq "secureQueue" $ \q -> do + verify q + withDB' "secureQueue" st $ \db -> + DB.execute db "UPDATE msg_queues SET sender_key = ? WHERE recipient_id = ?" (sKey, recipientId sq) + atomically $ writeTVar (queueRec sq) $ Just q {senderKey = Just sKey} where - qr = queueRec sq - verify q = pure $ case senderKey q of - Just k | sKey /= k -> Left AUTH - _ -> Right () - secureDB = - withDB' "secureQueue" st $ \db -> - DB.execute db "UPDATE msg_queues SET sender_key = ? WHERE recipient_id = ?" (sKey, recipientId sq) - secure q = do - atomically $ writeTVar qr $ Just q {senderKey = Just sKey} - pure $ Right () + verify q = case senderKey q of + Just k | sKey /= k -> throwE AUTH + _ -> pure () addQueueNotifier :: PostgresQueueStore q -> q -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId)) addQueueNotifier st sq ntfCreds@NtfCreds {notifierId = nId, notifierKey, rcvNtfDhSecret} = - withQueueLock sq "addQueueNotifier" $ - readQueueRecIO qr $>>= add + withQueueDB sq "addQueueNotifier" $ \q -> + ExceptT $ withLockMap (notifierLocks st) nId "addQueueNotifier" $ + ifM (TM.memberIO nId notifiers) (pure $ Left DUPLICATE_) $ runExceptT $ do + withDB "addQueueNotifier" st $ \db -> + E.try (insert db) >>= bimapM handleDuplicate pure + nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> atomically (TM.delete notifierId notifiers) $> notifierId + let !q' = q {notifier = Just ntfCreds} + atomically $ writeTVar (queueRec sq) $ Just q' + -- cache queue notifier ID – after notifier is added ntf server will likely subscribe + atomically $ TM.insert nId rId notifiers + pure nId_ where PostgresQueueStore {notifiers} = st rId = recipientId sq - qr = queueRec sq - add q = - withLockMap (notifierLocks st) nId "addQueueNotifier" $ - ifM (TM.memberIO nId notifiers) (pure $ Left DUPLICATE_) $ - addDB $>> do - nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> atomically (TM.delete notifierId notifiers) $> notifierId - let !q' = q {notifier = Just ntfCreds} - atomically $ writeTVar qr $ Just q' - -- cache queue notifier ID – after notifier is added ntf server will likely subscribe - atomically $ TM.insert nId rId notifiers - pure $ Right nId_ - addDB = - withDB "addQueueNotifier" st $ \db -> - E.try (insert db) >>= bimapM handleDuplicate pure - where - -- TODO [postgres] test how this query works with duplicate recipient_id (updates) and notifier_id (fails) - insert db = - DB.execute - db - [sql| - INSERT INTO msg_notifiers (recipient_id, notifier_id, notifier_key, rcv_ntf_dh_secret) - VALUES (?, ?, ?, ?) - ON CONFLICT (recipient_id) DO UPDATE - SET notifier_id = EXCLUDED.notifier_id, - notifier_key = EXCLUDED.notifier_key, - rcv_ntf_dh_secret = EXCLUDED.rcv_ntf_dh_secret - |] - (rId, nId, notifierKey, rcvNtfDhSecret) + -- TODO [postgres] test how this query works with duplicate recipient_id (updates) and notifier_id (fails) + insert db = + DB.execute + db + [sql| + INSERT INTO msg_notifiers (recipient_id, notifier_id, notifier_key, rcv_ntf_dh_secret) + VALUES (?, ?, ?, ?) + ON CONFLICT (recipient_id) DO UPDATE + SET notifier_id = EXCLUDED.notifier_id, + notifier_key = EXCLUDED.notifier_key, + rcv_ntf_dh_secret = EXCLUDED.rcv_ntf_dh_secret + |] + (rId, nId, notifierKey, rcvNtfDhSecret) deleteQueueNotifier :: PostgresQueueStore q -> q -> IO (Either ErrorType (Maybe NotifierId)) deleteQueueNotifier st sq = - withQueueLock sq "deleteQueueNotifier" $ - readQueueRecIO qr $>>= fmap sequence . delete - where - qr = queueRec sq - delete :: QueueRec -> IO (Maybe (Either ErrorType NotifierId)) - delete q = forM (notifier q) $ \NtfCreds {notifierId = nId} -> - withLockMap (notifierLocks st) nId "deleteQueueNotifier" $ do - deleteDB nId $>> do - atomically $ TM.delete nId $ notifiers st - atomically $ writeTVar qr $! Just q {notifier = Nothing} - pure $ Right nId - deleteDB nId = - withDB' "deleteQueueNotifier" st $ \db -> - DB.execute db "DELETE FROM msg_notifiers WHERE notifier_id = ?" (Only nId) + withQueueDB sq "deleteQueueNotifier" $ \q -> + ExceptT $ fmap sequence $ forM (notifier q) $ \NtfCreds {notifierId = nId} -> + withLockMap (notifierLocks st) nId "deleteQueueNotifier" $ runExceptT $ do + withDB' "deleteQueueNotifier" st $ \db -> + DB.execute db "DELETE FROM msg_notifiers WHERE notifier_id = ?" (Only nId) + atomically $ TM.delete nId $ notifiers st + atomically $ writeTVar (queueRec sq) $ Just q {notifier = Nothing} + pure nId - -- TODO [postgres] only update STM on DB success suspendQueue :: PostgresQueueStore q -> q -> IO (Either ErrorType ()) - suspendQueue st sq = - setStatus (queueRec sq) EntityOff - $>> setStatusDB "suspendQueue" st (recipientId sq) EntityOff + suspendQueue st sq = setStatusDB "suspendQueue" st sq EntityOff - -- TODO [postgres] only update STM on DB success blockQueue :: PostgresQueueStore q -> q -> BlockingInfo -> IO (Either ErrorType ()) - blockQueue st sq info = - setStatus (queueRec sq) (EntityBlocked info) - $>> setStatusDB "blockQueue" st (recipientId sq) (EntityBlocked info) + blockQueue st sq info = setStatusDB "blockQueue" st sq (EntityBlocked info) - -- TODO [postgres] only update STM on DB success unblockQueue :: PostgresQueueStore q -> q -> IO (Either ErrorType ()) - unblockQueue st sq = - setStatus (queueRec sq) EntityActive - $>> setStatusDB "unblockQueue" st (recipientId sq) EntityActive - - -- TODO [postgres] only update STM on DB success + unblockQueue st sq = setStatusDB "unblockQueue" st sq EntityActive + updateQueueTime :: PostgresQueueStore q -> q -> RoundedSystemTime -> IO (Either ErrorType QueueRec) - updateQueueTime st sq t = withQueueRec qr update $>>= updateDB - where - qr = queueRec sq - update q@QueueRec {updatedAt} - | updatedAt == Just t = pure (q, False) - | otherwise = - let !q' = q {updatedAt = Just t} - in (writeTVar qr $! Just q') $> (q', True) - updateDB (q, changed) - | changed = q <$$ withDB' "updateQueueTime" st (\db -> DB.execute db "UPDATE msg_queues SET updated_at = ? WHERE recipient_id = ?" (t, Binary $ unEntityId $ recipientId sq)) - | otherwise = pure $ Right q + updateQueueTime st sq t = + withQueueDB sq "updateQueueTime" $ \q@QueueRec {updatedAt} -> + if updatedAt == Just t + then pure q + else do + withDB' "updateQueueTime" st $ \db -> + DB.execute db "UPDATE msg_queues SET updated_at = ? WHERE recipient_id = ?" (t, recipientId sq) + let !q' = q {updatedAt = Just t} + atomically $ writeTVar (queueRec sq) $ Just q' + pure q' - -- TODO [postgres] only update STM on DB success + -- this method is called from JournalMsgStore deleteQueue that already locks the queue deleteStoreQueue :: PostgresQueueStore q -> q -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue q))) - deleteStoreQueue st sq = - withQueueRec qr delete - $>>= \q -> deleteDB - >>= mapM (\_ -> (q,) <$> atomically (swapTVar (msgQueue sq) Nothing)) + deleteStoreQueue st sq = runExceptT $ do + q <- ExceptT $ readQueueRecIO qr + withDB' "deleteStoreQueue" st $ \db -> + DB.execute db "DELETE FROM msg_queues WHERE recipient_id = ?" (Only $ Binary $ unEntityId $ recipientId sq) + atomically $ writeTVar qr Nothing + atomically $ TM.delete (senderId q) $ senders st + forM_ (notifier q) $ \NtfCreds {notifierId} -> atomically $ TM.delete notifierId $ notifiers st + (q,) <$> atomically (swapTVar (msgQueue sq) Nothing) where qr = queueRec sq - delete q = do - writeTVar qr Nothing - TM.delete (senderId q) $ senders st - -- TODO [postgres] probably we should delete it? - -- forM_ (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId $ notifiers st - pure q - deleteDB = - withDB' "deleteStoreQueue" st $ \db -> - DB.execute db "DELETE FROM msg_queues WHERE recipient_id = ?" (Only $ Binary $ unEntityId $ recipientId sq) insertQueueDB :: DB.Connection -> RecipientId -> QueueRec -> IO () insertQueueDB db rId QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status, updatedAt} = do @@ -342,23 +313,31 @@ rowToQueueRec ((rId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, let notifier = NtfCreds <$> notifierId_ <*> notifierKey_ <*> rcvNtfDhSecret_ in (rId, QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status, updatedAt}) -setStatusDB :: String -> PostgresQueueStore q -> RecipientId -> ServerEntityStatus -> IO (Either ErrorType ()) -setStatusDB name st rId status = - withDB' name st $ \db -> - DB.execute db "UPDATE msg_queues SET status = ? WHERE recipient_id = ?" (status, rId) +setStatusDB :: StoreQueueClass q => String -> PostgresQueueStore q -> q -> ServerEntityStatus -> IO (Either ErrorType ()) +setStatusDB op st sq status = + withQueueDB sq op $ \q -> do + withDB' op st $ \db -> + DB.execute db "UPDATE msg_queues SET status = ? WHERE recipient_id = ?" (status, recipientId sq) + atomically $ writeTVar (queueRec sq) $ Just q {status} -withDB' :: String -> PostgresQueueStore q -> (DB.Connection -> IO a) -> IO (Either ErrorType a) -withDB' name st' action = withDB name st' $ fmap Right . action +withQueueDB :: StoreQueueClass q => q -> String -> (QueueRec -> ExceptT ErrorType IO a) -> IO (Either ErrorType a) +withQueueDB sq op action = + withQueueLock sq op $ runExceptT $ do + q <- ExceptT $ readQueueRecIO $ queueRec sq + action q + +withDB' :: String -> PostgresQueueStore q -> (DB.Connection -> IO a) -> ExceptT ErrorType IO a +withDB' op st' action = withDB op st' $ fmap Right . action -- TODO [postgres] possibly, use with connection if queries in addQueue_ are combined -withDB :: forall a q. String -> PostgresQueueStore q -> (DB.Connection -> IO (Either ErrorType a)) -> IO (Either ErrorType a) -withDB name st' action = - E.try (withTransaction (dbStore st') action) >>= either logErr pure +withDB :: forall a q. String -> PostgresQueueStore q -> (DB.Connection -> IO (Either ErrorType a)) -> ExceptT ErrorType IO a +withDB op st' action = + ExceptT $ E.try (withTransaction (dbStore st') action) >>= either logErr pure where logErr :: E.SomeException -> IO (Either ErrorType a) logErr e = logError ("STORE: " <> T.pack err) $> Left (STORE err) where - err = name <> ", withLog, " <> show e + err = op <> ", withLog, " <> show e handleDuplicate :: SqlError -> IO ErrorType handleDuplicate e = case constraintViolation e of