smp server: PostgreSQL queue store (#1448)

* smp server: queue store typeclass

* parameterize JournalMsgStore

* typeclass for queue store

* postgres WIP

* compiles, passes tests

* remove StoreType

* split migrations

* progress

* addQueueRec

* reduce type spaghetti

* remove addQueue from typeclass definition

* getQueue

* test postgres storage in SMP server

* fix schema

* comment

* import queues to postgresql

* import queues to postgresql

* log

* fix test

* counts

* ci: test smp server with postgres backend (#1463)

* ci: test smp server with postgres backend

* postgres service

* attempt

* attempt

* empty

* empty

* PGHOST attempt

* PGHOST + softlink attempt

* only softlink attempt

* working attempt (PGHOST)

* remove env var

* empty

* do not start server without DB schema, do not import when schema exists

* export database

* enable all tests, disable two tests

* option for migration confirmation

* comments

---------

Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com>
This commit is contained in:
Evgeny
2025-02-24 15:47:27 +00:00
committed by GitHub
parent f9d7b1eebc
commit 4dc40bd795
42 changed files with 1757 additions and 819 deletions
@@ -0,0 +1,376 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
module Simplex.Messaging.Server.QueueStore.Postgres where
import Control.Concurrent.STM
import qualified Control.Exception as E
import Control.Logger.Simple
import Control.Monad
import Data.Bitraversable (bimapM)
import Data.Functor (($>))
import Data.Int (Int64)
import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes, mapMaybe)
import qualified Data.Text as T
import Database.PostgreSQL.Simple (Binary (..), Only (..), Query, SqlError, (:.) (..))
import qualified Database.PostgreSQL.Simple as PSQL
import Database.PostgreSQL.Simple.Errors (ConstraintViolation (..), constraintViolation)
import Database.PostgreSQL.Simple.SqlQQ (sql)
import Simplex.Messaging.Agent.Client (withLockMap)
import Simplex.Messaging.Agent.Lock (Lock)
import Simplex.Messaging.Agent.Store.Postgres (createDBStore)
import Simplex.Messaging.Agent.Store.Postgres.Common
import Simplex.Messaging.Agent.Store.Postgres.DB (FromField (..), ToField (..), blobFieldDecoder)
import qualified Simplex.Messaging.Agent.Store.Postgres.DB as DB
import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.Postgres.Migrations (serverMigrations)
import Simplex.Messaging.Server.QueueStore.STM (readQueueRecIO, setStatus, withQueueRec)
import Simplex.Messaging.Server.QueueStore.Types
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util (firstRow, ifM, tshow, ($>>), ($>>=), (<$$), (<$$>))
import System.Exit (exitFailure)
import System.IO (hFlush, stdout)
data PostgresQueueStore q = PostgresQueueStore
{ dbStore :: DBStore,
-- this map caches all created and opened queues
queues :: TMap RecipientId q,
-- this map only cashes the queues that were attempted to send messages to,
senders :: TMap SenderId RecipientId,
-- this map only cashes the queues that were attempted to be subscribed to,
notifiers :: TMap NotifierId RecipientId,
notifierLocks :: TMap NotifierId Lock
}
instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
type QueueStoreCfg (PostgresQueueStore q) = (DBOpts, MigrationConfirmation)
newQueueStore :: (DBOpts, MigrationConfirmation) -> IO (PostgresQueueStore q)
newQueueStore (dbOpts, confirmMigrations) = do
dbStore <- either err pure =<< createDBStore dbOpts serverMigrations confirmMigrations
queues <- TM.emptyIO
senders <- TM.emptyIO
notifiers <- TM.emptyIO
notifierLocks <- TM.emptyIO
pure PostgresQueueStore {dbStore, queues, senders, notifiers, notifierLocks}
where
err e = do
logError $ "STORE: newQueueStore, error opening PostgreSQL database, " <> tshow e
exitFailure
loadedQueues = queues
{-# INLINE loadedQueues #-}
queueCounts :: PostgresQueueStore q -> IO QueueCounts
queueCounts st =
withConnection (dbStore st) $ \db -> do
(queueCount, notifierCount) : _ <-
DB.query_
db
[sql|
SELECT
(SELECT COUNT(1) FROM msg_queues) AS queue_count,
(SELECT COUNT(1) FROM msg_notifiers) AS notifier_count
|]
pure QueueCounts {queueCount, notifierCount}
-- this implementation assumes that the lock is already taken by addQueue
-- and relies on unique constraints in the database to prevent duplicate IDs.
addQueue_ :: PostgresQueueStore q -> (RecipientId -> QueueRec -> IO q) -> RecipientId -> QueueRec -> IO (Either ErrorType q)
addQueue_ st mkQ rId qr = do
sq <- mkQ rId qr
withQueueLock sq "addQueue_" $
addDB $>> add sq
where
PostgresQueueStore {queues, senders} = st
addDB =
withDB "addQueue_" st $ \db ->
E.try (insertQueueDB db rId qr) >>= bimapM handleDuplicate pure
add sq = do
atomically $ TM.insert rId sq queues
atomically $ TM.insert (senderId qr) rId senders
pure $ Right sq
-- Not doing duplicate checks in maps as the probability of duplicates is very low.
-- It needs to be reconsidered when IDs are supplied by the users.
-- hasId = anyM [TM.memberIO rId queues, TM.memberIO senderId senders, hasNotifier]
-- hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.memberIO notifierId notifiers) notifier
getQueue_ :: DirectParty p => PostgresQueueStore q -> (RecipientId -> QueueRec -> IO q) -> SParty p -> QueueId -> IO (Either ErrorType q)
getQueue_ st mkQ party qId = case party of
SRecipient -> getRcvQueue qId
SSender -> TM.lookupIO qId senders >>= maybe loadSndQueue getRcvQueue
SNotifier -> TM.lookupIO qId notifiers >>= maybe loadNtfQueue getRcvQueue
where
PostgresQueueStore {queues, senders, notifiers} = st
getRcvQueue rId = TM.lookupIO rId queues >>= maybe loadRcvQueue (pure . Right)
loadRcvQueue = loadQueue " WHERE q.recipient_id = ?" $ \_ -> pure ()
loadSndQueue = loadQueue " WHERE q.sender_id = ?" $ \rId -> TM.insert qId rId senders
loadNtfQueue = loadQueue " WHERE n.notifier_id = ?" $ \_ -> pure () -- do NOT cache ref - ntf subscriptions are rare
loadQueue condition insertRef =
loadQueueRec $>>= \(rId, qRec) -> do
sq <- mkQ rId qRec
atomically $
-- checking the cache again for concurrent reads
TM.lookup rId queues >>= \case
Just sq' -> pure $ Right sq'
Nothing -> do
insertRef rId
TM.insert rId sq queues
pure $ Right sq
where
loadQueueRec =
withDB "getQueue_" st $ \db -> firstRow rowToQueueRec AUTH $
DB.query db (queueRecQuery <> condition) (Only qId)
secureQueue :: PostgresQueueStore q -> q -> SndPublicAuthKey -> IO (Either ErrorType ())
secureQueue st sq sKey =
withQueueLock sq "secureQueue" $
readQueueRecIO qr
$>>= \q -> verify q
$>> secureDB
$>> secure q
where
qr = queueRec sq
verify q = pure $ case senderKey q of
Just k | sKey /= k -> Left AUTH
_ -> Right ()
secureDB =
withDB' "secureQueue" st $ \db ->
DB.execute db "UPDATE msg_queues SET sender_key = ? WHERE recipient_id = ?" (sKey, recipientId sq)
secure q = do
atomically $ writeTVar qr $ Just q {senderKey = Just sKey}
pure $ Right ()
addQueueNotifier :: PostgresQueueStore q -> q -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId))
addQueueNotifier st sq ntfCreds@NtfCreds {notifierId = nId, notifierKey, rcvNtfDhSecret} =
withQueueLock sq "addQueueNotifier" $
readQueueRecIO qr $>>= add
where
PostgresQueueStore {notifiers} = st
rId = recipientId sq
qr = queueRec sq
add q =
withLockMap (notifierLocks st) nId "addQueueNotifier" $
ifM (TM.memberIO nId notifiers) (pure $ Left DUPLICATE_) $
addDB $>> do
nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> atomically (TM.delete notifierId notifiers) $> notifierId
let !q' = q {notifier = Just ntfCreds}
atomically $ writeTVar qr $ Just q'
-- cache queue notifier ID after notifier is added ntf server will likely subscribe
atomically $ TM.insert nId rId notifiers
pure $ Right nId_
addDB =
withDB "addQueueNotifier" st $ \db ->
E.try (insert db) >>= bimapM handleDuplicate pure
where
-- TODO [postgres] test how this query works with duplicate recipient_id (updates) and notifier_id (fails)
insert db =
DB.execute
db
[sql|
INSERT INTO msg_notifiers (recipient_id, notifier_id, notifier_key, rcv_ntf_dh_secret)
VALUES (?, ?, ?, ?)
ON CONFLICT (recipient_id) DO UPDATE
SET notifier_id = EXCLUDED.notifier_id,
notifier_key = EXCLUDED.notifier_key,
rcv_ntf_dh_secret = EXCLUDED.rcv_ntf_dh_secret
|]
(rId, nId, notifierKey, rcvNtfDhSecret)
deleteQueueNotifier :: PostgresQueueStore q -> q -> IO (Either ErrorType (Maybe NotifierId))
deleteQueueNotifier st sq =
withQueueLock sq "deleteQueueNotifier" $
readQueueRecIO qr $>>= fmap sequence . delete
where
qr = queueRec sq
delete :: QueueRec -> IO (Maybe (Either ErrorType NotifierId))
delete q = forM (notifier q) $ \NtfCreds {notifierId = nId} ->
withLockMap (notifierLocks st) nId "deleteQueueNotifier" $ do
deleteDB nId $>> do
atomically $ TM.delete nId $ notifiers st
atomically $ writeTVar qr $! Just q {notifier = Nothing}
pure $ Right nId
deleteDB nId =
withDB' "deleteQueueNotifier" st $ \db ->
DB.execute db "DELETE FROM msg_notifiers WHERE notifier_id = ?" (Only nId)
-- TODO [postgres] only update STM on DB success
suspendQueue :: PostgresQueueStore q -> q -> IO (Either ErrorType ())
suspendQueue st sq =
setStatus (queueRec sq) EntityOff
$>> setStatusDB "suspendQueue" st (recipientId sq) EntityOff
-- TODO [postgres] only update STM on DB success
blockQueue :: PostgresQueueStore q -> q -> BlockingInfo -> IO (Either ErrorType ())
blockQueue st sq info =
setStatus (queueRec sq) (EntityBlocked info)
$>> setStatusDB "blockQueue" st (recipientId sq) (EntityBlocked info)
-- TODO [postgres] only update STM on DB success
unblockQueue :: PostgresQueueStore q -> q -> IO (Either ErrorType ())
unblockQueue st sq =
setStatus (queueRec sq) EntityActive
$>> setStatusDB "unblockQueue" st (recipientId sq) EntityActive
-- TODO [postgres] only update STM on DB success
updateQueueTime :: PostgresQueueStore q -> q -> RoundedSystemTime -> IO (Either ErrorType QueueRec)
updateQueueTime st sq t = withQueueRec qr update $>>= updateDB
where
qr = queueRec sq
update q@QueueRec {updatedAt}
| updatedAt == Just t = pure (q, False)
| otherwise =
let !q' = q {updatedAt = Just t}
in (writeTVar qr $! Just q') $> (q', True)
updateDB (q, changed)
| changed = q <$$ withDB' "updateQueueTime" st (\db -> DB.execute db "UPDATE msg_queues SET updated_at = ? WHERE recipient_id = ?" (t, Binary $ unEntityId $ recipientId sq))
| otherwise = pure $ Right q
-- TODO [postgres] only update STM on DB success
deleteStoreQueue :: PostgresQueueStore q -> q -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue q)))
deleteStoreQueue st sq =
withQueueRec qr delete
$>>= \q -> deleteDB
>>= mapM (\_ -> (q,) <$> atomically (swapTVar (msgQueue sq) Nothing))
where
qr = queueRec sq
delete q = do
writeTVar qr Nothing
TM.delete (senderId q) $ senders st
-- TODO [postgres] probably we should delete it?
-- forM_ (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId $ notifiers st
pure q
deleteDB =
withDB' "deleteStoreQueue" st $ \db ->
DB.execute db "DELETE FROM msg_queues WHERE recipient_id = ?" (Only $ Binary $ unEntityId $ recipientId sq)
insertQueueDB :: DB.Connection -> RecipientId -> QueueRec -> IO ()
insertQueueDB db rId QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status, updatedAt} = do
DB.execute db insertQueueQuery (rId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, status, updatedAt)
forM_ notifier $ \NtfCreds {notifierId, notifierKey, rcvNtfDhSecret} ->
DB.execute db insertNotifierQuery (rId, notifierId, notifierKey, rcvNtfDhSecret)
batchInsertQueues :: StoreQueueClass q => Bool -> M.Map RecipientId q -> PostgresQueueStore q' -> IO (Int64, Int64)
batchInsertQueues tty queues toStore = do
qs <- catMaybes <$> mapM (\(rId, q) -> (rId,) <$$> readTVarIO (queueRec q)) (M.assocs queues)
putStrLn $ "Importing " <> show (length qs) <> " queues..."
let st = dbStore toStore
(ns, count) <- foldM (processChunk st) ((0, 0), 0) $ toChunks 1000000 qs
putStrLn $ progress count
pure ns
where
processChunk st ((qCnt, nCnt), i) qs = do
qCnt' <- withConnection st $ \db -> PSQL.executeMany db insertQueueQuery $ map toQueueRow qs
nCnt' <- withConnection st $ \db -> PSQL.executeMany db insertNotifierQuery $ mapMaybe toNotifierRow qs
let i' = i + length qs
when tty $ putStr (progress i' <> "\r") >> hFlush stdout
pure ((qCnt + qCnt', nCnt + nCnt'), i')
progress i = "Imported: " <> show i <> " queues"
toQueueRow (rId, QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, status, updatedAt}) =
(rId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, status, updatedAt)
toNotifierRow (rId, QueueRec {notifier}) =
(\NtfCreds {notifierId, notifierKey, rcvNtfDhSecret} -> (rId, notifierId, notifierKey, rcvNtfDhSecret)) <$> notifier
toChunks :: Int -> [a] -> [[a]]
toChunks _ [] = []
toChunks n xs =
let (ys, xs') = splitAt n xs
in ys : toChunks n xs'
insertQueueQuery :: Query
insertQueueQuery =
[sql|
INSERT INTO msg_queues
(recipient_id, recipient_key, rcv_dh_secret, sender_id, sender_key, snd_secure, status, updated_at)
VALUES (?,?,?,?,?,?,?,?)
|]
insertNotifierQuery :: Query
insertNotifierQuery =
[sql|
INSERT INTO msg_notifiers (recipient_id, notifier_id, notifier_key, rcv_ntf_dh_secret)
VALUES (?, ?, ?, ?)
|]
foldQueueRecs :: Monoid a => Bool -> PostgresQueueStore q -> (RecipientId -> QueueRec -> IO a) -> IO a
foldQueueRecs tty st f = do
fmap snd $ withConnection (dbStore st) $ \db ->
PSQL.fold_ db queueRecQuery (0 :: Int, mempty) $ \(!i, !acc) row -> do
r <- uncurry f (rowToQueueRec row)
let i' = i + 1
when (tty && i' `mod` 100000 == 0) $ putStr ("Processed: " <> show i <> " records\r") >> hFlush stdout
pure (i', acc <> r)
queueRecQuery :: Query
queueRecQuery =
[sql|
SELECT q.recipient_id, q.recipient_key, q.rcv_dh_secret, q.sender_id, q.sender_key, q.snd_secure, q.status, q.updated_at,
n.notifier_id, n.notifier_key, n.rcv_ntf_dh_secret
FROM msg_queues q
LEFT JOIN msg_notifiers n ON q.recipient_id = n.recipient_id
|]
rowToQueueRec :: ( (RecipientId, RcvPublicAuthKey, RcvDhSecret, SenderId, Maybe SndPublicAuthKey, SenderCanSecure, ServerEntityStatus, Maybe RoundedSystemTime)
:. (Maybe NotifierId, Maybe NtfPublicAuthKey, Maybe RcvNtfDhSecret)
) -> (RecipientId, QueueRec)
rowToQueueRec ((rId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, status, updatedAt) :. (notifierId_, notifierKey_, rcvNtfDhSecret_)) =
let notifier = NtfCreds <$> notifierId_ <*> notifierKey_ <*> rcvNtfDhSecret_
in (rId, QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status, updatedAt})
setStatusDB :: String -> PostgresQueueStore q -> RecipientId -> ServerEntityStatus -> IO (Either ErrorType ())
setStatusDB name st rId status =
withDB' name st $ \db ->
DB.execute db "UPDATE msg_queues SET status = ? WHERE recipient_id = ?" (status, rId)
withDB' :: String -> PostgresQueueStore q -> (DB.Connection -> IO a) -> IO (Either ErrorType a)
withDB' name st' action = withDB name st' $ fmap Right . action
-- TODO [postgres] possibly, use with connection if queries in addQueue_ are combined
withDB :: forall a q. String -> PostgresQueueStore q -> (DB.Connection -> IO (Either ErrorType a)) -> IO (Either ErrorType a)
withDB name st' action =
E.try (withTransaction (dbStore st') action) >>= either logErr pure
where
logErr :: E.SomeException -> IO (Either ErrorType a)
logErr e = logError ("STORE: " <> T.pack err) $> Left (STORE err)
where
err = name <> ", withLog, " <> show e
handleDuplicate :: SqlError -> IO ErrorType
handleDuplicate e = case constraintViolation e of
Just (UniqueViolation _) -> pure AUTH
_ -> E.throwIO e
-- The orphan instances below are copy-pasted, but here they are defined specifically for PostgreSQL
instance ToField EntityId where toField (EntityId s) = toField $ Binary s
deriving newtype instance FromField EntityId
instance ToField (C.DhSecret 'C.X25519) where toField = toField . Binary . C.dhBytes'
instance FromField (C.DhSecret 'C.X25519) where fromField = blobFieldDecoder strDecode
instance ToField C.APublicAuthKey where toField = toField . Binary . C.encodePubKey
instance FromField C.APublicAuthKey where fromField = blobFieldDecoder C.decodePubKey
@@ -0,0 +1,49 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE QuasiQuotes #-}
module Simplex.Messaging.Server.QueueStore.Postgres.Migrations where
import Data.List (sortOn)
import Data.Text (Text)
import qualified Data.Text as T
import Simplex.Messaging.Agent.Store.Shared
import Text.RawString.QQ (r)
serverSchemaMigrations :: [(String, Text, Maybe Text)]
serverSchemaMigrations =
[ ("20250207_initial", m20250207_initial, Nothing)
]
-- | The list of migrations in ascending order by date
serverMigrations :: [Migration]
serverMigrations = sortOn name $ map migration serverSchemaMigrations
where
migration (name, up, down) = Migration {name, up, down = down}
m20250207_initial :: Text
m20250207_initial =
T.pack
[r|
CREATE TABLE msg_queues(
recipient_id BYTEA NOT NULL,
recipient_key BYTEA NOT NULL,
rcv_dh_secret BYTEA NOT NULL,
sender_id BYTEA NOT NULL,
sender_key BYTEA,
snd_secure BOOLEAN NOT NULL,
status TEXT NOT NULL,
updated_at BIGINT,
PRIMARY KEY (recipient_id)
);
CREATE TABLE msg_notifiers(
notifier_id BYTEA NOT NULL,
recipient_id BYTEA NOT NULL REFERENCES msg_queues(recipient_id) ON DELETE CASCADE ON UPDATE RESTRICT,
notifier_key BYTEA NOT NULL,
rcv_ntf_dh_secret BYTEA NOT NULL,
PRIMARY KEY (notifier_id)
);
CREATE UNIQUE INDEX idx_msg_queues_sender_id ON msg_queues(sender_id);
CREATE UNIQUE INDEX idx_msg_notifiers_recipient_id ON msg_notifiers(recipient_id);
|]
+148 -174
View File
@@ -3,7 +3,7 @@
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
@@ -11,177 +11,186 @@
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}
module Simplex.Messaging.Server.QueueStore.STM
( addQueue,
getQueue,
getQueueRec,
secureQueue,
addQueueNotifier,
deleteQueueNotifier,
suspendQueue,
blockQueue,
unblockQueue,
updateQueueTime,
deleteQueue',
newQueueStore,
readQueueStore,
( STMQueueStore (..),
setStoreLog,
withLog',
withQueueRec,
readQueueRecIO,
setStatus,
)
where
import qualified Control.Exception as E
import Control.Logger.Simple
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Trans.Except
import Data.Bitraversable (bimapM)
import qualified Data.ByteString.Char8 as B
import Data.Functor (($>))
import qualified Data.Map.Strict as M
import qualified Data.Text as T
import Data.Text.Encoding (decodeLatin1)
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.MsgStore.Types
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.Types
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util (ifM, safeDecodeUtf8, tshow, ($>>=), (<$$))
import System.Exit (exitFailure)
import Simplex.Messaging.Util (anyM, ifM, ($>>), ($>>=), (<$$))
import System.IO
import UnliftIO.STM
newQueueStore :: IO (STMQueueStore q)
newQueueStore = do
queues <- TM.emptyIO
senders <- TM.emptyIO
notifiers <- TM.emptyIO
storeLog <- newTVarIO Nothing
pure STMQueueStore {queues, senders, notifiers, storeLog}
data STMQueueStore q = STMQueueStore
{ queues :: TMap RecipientId q,
senders :: TMap SenderId RecipientId,
notifiers :: TMap NotifierId RecipientId,
storeLog :: TVar (Maybe (StoreLog 'WriteMode))
}
addQueue :: STMStoreClass s => s -> RecipientId -> QueueRec -> IO (Either ErrorType (StoreQueue s))
addQueue st rId qr@QueueRec {senderId = sId, notifier}=
atomically add
$>>= \q -> q <$$ withLog "addQueue" st (\s -> logCreateQueue s rId qr)
where
STMQueueStore {queues, senders, notifiers} = stmQueueStore st
add = ifM hasId (pure $ Left DUPLICATE_) $ do
q <- mkQueue st rId qr
TM.insert rId q queues
TM.insert sId rId senders
forM_ notifier $ \NtfCreds {notifierId} -> TM.insert notifierId rId notifiers
pure $ Right q
hasId = or <$> sequence [TM.member rId queues, TM.member sId senders, hasNotifier]
hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.member notifierId notifiers) notifier
setStoreLog :: STMQueueStore q -> StoreLog 'WriteMode -> IO ()
setStoreLog st sl = atomically $ writeTVar (storeLog st) (Just sl)
getQueue :: (STMStoreClass s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s))
getQueue st party qId =
maybe (Left AUTH) Right <$> case party of
SRecipient -> TM.lookupIO qId queues
SSender -> TM.lookupIO qId senders $>>= (`TM.lookupIO` queues)
SNotifier -> TM.lookupIO qId notifiers $>>= (`TM.lookupIO` queues)
where
STMQueueStore {queues, senders, notifiers} = stmQueueStore st
instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where
type QueueStoreCfg (STMQueueStore q) = ()
getQueueRec :: (STMStoreClass s, DirectParty p) => s -> SParty p -> QueueId -> IO (Either ErrorType (StoreQueue s, QueueRec))
getQueueRec st party qId =
getQueue st party qId
$>>= (\q -> maybe (Left AUTH) (Right . (q,)) <$> readTVarIO (queueRec' q))
newQueueStore :: () -> IO (STMQueueStore q)
newQueueStore _ = do
queues <- TM.emptyIO
senders <- TM.emptyIO
notifiers <- TM.emptyIO
storeLog <- newTVarIO Nothing
pure STMQueueStore {queues, senders, notifiers, storeLog}
secureQueue :: STMStoreClass s => s -> StoreQueue s -> SndPublicAuthKey -> IO (Either ErrorType ())
secureQueue st sq sKey =
atomically (readQueueRec qr $>>= secure)
$>>= \_ -> withLog "secureQueue" st $ \s -> logSecureQueue s (recipientId' sq) sKey
where
qr = queueRec' sq
secure q = case senderKey q of
Just k -> pure $ if sKey == k then Right () else Left AUTH
Nothing -> do
writeTVar qr $ Just q {senderKey = Just sKey}
pure $ Right ()
loadedQueues = queues
{-# INLINE loadedQueues #-}
-- foldAllQueues = withLoadedQueues
-- {-# INLINE foldAllQueues #-}
addQueueNotifier :: STMStoreClass s => s -> StoreQueue s -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId))
addQueueNotifier st sq ntfCreds@NtfCreds {notifierId = nId} =
atomically (readQueueRec qr $>>= add)
$>>= \nId_ -> nId_ <$$ withLog "addQueueNotifier" st (\s -> logAddNotifier s rId ntfCreds)
where
rId = recipientId' sq
qr = queueRec' sq
STMQueueStore {notifiers} = stmQueueStore st
add q = ifM (TM.member nId notifiers) (pure $ Left DUPLICATE_) $ do
nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId notifiers $> notifierId
let !q' = q {notifier = Just ntfCreds}
writeTVar qr $ Just q'
TM.insert nId rId notifiers
pure $ Right nId_
queueCounts :: STMQueueStore q -> IO QueueCounts
queueCounts st = do
queueCount <- M.size <$> readTVarIO (queues st)
notifierCount <- M.size <$> readTVarIO (notifiers st)
pure QueueCounts {queueCount, notifierCount}
deleteQueueNotifier :: STMStoreClass s => s -> StoreQueue s -> IO (Either ErrorType (Maybe NotifierId))
deleteQueueNotifier st sq =
atomically (readQueueRec qr >>= mapM delete)
$>>= \nId_ -> nId_ <$$ withLog "deleteQueueNotifier" st (`logDeleteNotifier` recipientId' sq)
where
qr = queueRec' sq
delete q = forM (notifier q) $ \NtfCreds {notifierId} -> do
TM.delete notifierId $ notifiers $ stmQueueStore st
writeTVar qr $! Just q {notifier = Nothing}
pure notifierId
addQueue_ :: STMQueueStore q -> (RecipientId -> QueueRec -> IO q) -> RecipientId -> QueueRec -> IO (Either ErrorType q)
addQueue_ st mkQ rId qr@QueueRec {senderId = sId, notifier} = do
sq <- mkQ rId qr
add sq $>> withLog "addStoreQueue" st (\s -> logCreateQueue s rId qr) $> Right sq
where
STMQueueStore {queues, senders, notifiers} = st
add q = atomically $ ifM hasId (pure $ Left DUPLICATE_) $ Right () <$ do
TM.insert rId q queues
TM.insert sId rId senders
forM_ notifier $ \NtfCreds {notifierId} -> TM.insert notifierId rId notifiers
hasId = anyM [TM.member rId queues, TM.member sId senders, hasNotifier]
hasNotifier = maybe (pure False) (\NtfCreds {notifierId} -> TM.member notifierId notifiers) notifier
suspendQueue :: STMStoreClass s => s -> StoreQueue s -> IO (Either ErrorType ())
suspendQueue st sq =
atomically (readQueueRec qr >>= mapM suspend)
$>>= \_ -> withLog "suspendQueue" st (`logSuspendQueue` recipientId' sq)
where
qr = queueRec' sq
suspend q = writeTVar qr $! Just q {status = EntityOff}
getQueue_ :: DirectParty p => STMQueueStore q -> (RecipientId -> QueueRec -> IO q) -> SParty p -> QueueId -> IO (Either ErrorType q)
getQueue_ st _ party qId =
maybe (Left AUTH) Right <$> case party of
SRecipient -> TM.lookupIO qId queues
SSender -> TM.lookupIO qId senders $>>= (`TM.lookupIO` queues)
SNotifier -> TM.lookupIO qId notifiers $>>= (`TM.lookupIO` queues)
where
STMQueueStore {queues, senders, notifiers} = st
blockQueue :: STMStoreClass s => s -> StoreQueue s -> BlockingInfo -> IO (Either ErrorType ())
blockQueue st sq info =
atomically (readQueueRec qr >>= mapM block)
$>>= \_ -> withLog "blockQueue" st (\sl -> logBlockQueue sl (recipientId' sq) info)
where
qr = queueRec' sq
block q = writeTVar qr $ Just q {status = EntityBlocked info}
secureQueue :: STMQueueStore q -> q -> SndPublicAuthKey -> IO (Either ErrorType ())
secureQueue st sq sKey =
atomically (readQueueRec qr $>>= secure)
$>> withLog "secureQueue" st (\s -> logSecureQueue s (recipientId sq) sKey)
where
qr = queueRec sq
secure q = case senderKey q of
Just k -> pure $ if sKey == k then Right () else Left AUTH
Nothing -> do
writeTVar qr $ Just q {senderKey = Just sKey}
pure $ Right ()
unblockQueue :: STMStoreClass s => s -> StoreQueue s -> IO (Either ErrorType ())
unblockQueue st sq =
atomically (readQueueRec qr >>= mapM unblock)
$>>= \_ -> withLog "unblockQueue" st (`logUnblockQueue` recipientId' sq)
where
qr = queueRec' sq
unblock q = writeTVar qr $ Just q {status = EntityActive}
addQueueNotifier :: STMQueueStore q -> q -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId))
addQueueNotifier st sq ntfCreds@NtfCreds {notifierId = nId} =
atomically (readQueueRec qr $>>= add)
$>>= \nId_ -> nId_ <$$ withLog "addQueueNotifier" st (\s -> logAddNotifier s rId ntfCreds)
where
rId = recipientId sq
qr = queueRec sq
STMQueueStore {notifiers} = st
add q = ifM (TM.member nId notifiers) (pure $ Left DUPLICATE_) $ do
nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId notifiers $> notifierId
let !q' = q {notifier = Just ntfCreds}
writeTVar qr $ Just q'
TM.insert nId rId notifiers
pure $ Right nId_
updateQueueTime :: STMStoreClass s => s -> StoreQueue s -> RoundedSystemTime -> IO (Either ErrorType QueueRec)
updateQueueTime st sq t = atomically (readQueueRec qr >>= mapM update) $>>= log'
where
qr = queueRec' sq
update q@QueueRec {updatedAt}
| updatedAt == Just t = pure (q, False)
| otherwise =
let !q' = q {updatedAt = Just t}
in (writeTVar qr $! Just q') $> (q', True)
log' (q, changed)
| changed = q <$$ withLog "updateQueueTime" st (\sl -> logUpdateQueueTime sl (recipientId' sq) t)
| otherwise = pure $ Right q
deleteQueueNotifier :: STMQueueStore q -> q -> IO (Either ErrorType (Maybe NotifierId))
deleteQueueNotifier st sq =
withQueueRec qr delete
$>>= \nId_ -> nId_ <$$ withLog "deleteQueueNotifier" st (`logDeleteNotifier` recipientId sq)
where
qr = queueRec sq
delete q = forM (notifier q) $ \NtfCreds {notifierId} -> do
TM.delete notifierId $ notifiers st
writeTVar qr $ Just q {notifier = Nothing}
pure notifierId
deleteQueue' :: STMStoreClass s => s -> StoreQueue s -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue s)))
deleteQueue' st sq =
atomically (readQueueRec qr >>= mapM delete)
$>>= \q -> withLog "deleteQueue" st (`logDeleteQueue` recipientId' sq)
>>= bimapM pure (\_ -> (q,) <$> atomically (swapTVar (msgQueue_' sq) Nothing))
where
qr = queueRec' sq
STMQueueStore {senders, notifiers} = stmQueueStore st
delete q = do
writeTVar qr Nothing
TM.delete (senderId q) senders
forM_ (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId notifiers
pure q
suspendQueue :: STMQueueStore q -> q -> IO (Either ErrorType ())
suspendQueue st sq =
setStatus (queueRec sq) EntityOff
$>> withLog "suspendQueue" st (`logSuspendQueue` recipientId sq)
blockQueue :: STMQueueStore q -> q -> BlockingInfo -> IO (Either ErrorType ())
blockQueue st sq info =
setStatus (queueRec sq) (EntityBlocked info)
$>> withLog "blockQueue" st (\sl -> logBlockQueue sl (recipientId sq) info)
unblockQueue :: STMQueueStore q -> q -> IO (Either ErrorType ())
unblockQueue st sq =
setStatus (queueRec sq) EntityActive
$>> withLog "unblockQueue" st (`logUnblockQueue` recipientId sq)
updateQueueTime :: STMQueueStore q -> q -> RoundedSystemTime -> IO (Either ErrorType QueueRec)
updateQueueTime st sq t = withQueueRec qr update $>>= log'
where
qr = queueRec sq
update q@QueueRec {updatedAt}
| updatedAt == Just t = pure (q, False)
| otherwise =
let !q' = q {updatedAt = Just t}
in writeTVar qr (Just q') $> (q', True)
log' (q, changed)
| changed = q <$$ withLog "updateQueueTime" st (\sl -> logUpdateQueueTime sl (recipientId sq) t)
| otherwise = pure $ Right q
deleteStoreQueue :: STMQueueStore q -> q -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue q)))
deleteStoreQueue st sq =
withQueueRec qr delete
$>>= \q -> withLog "deleteStoreQueue" st (`logDeleteQueue` recipientId sq)
>>= mapM (\_ -> (q,) <$> atomically (swapTVar (msgQueue sq) Nothing))
where
qr = queueRec sq
delete q = do
writeTVar qr Nothing
TM.delete (senderId q) $ senders st
forM_ (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId $ notifiers st
pure q
withQueueRec :: TVar (Maybe QueueRec) -> (QueueRec -> STM a) -> IO (Either ErrorType a)
withQueueRec qr a = atomically $ readQueueRec qr >>= mapM a
setStatus :: TVar (Maybe QueueRec) -> ServerEntityStatus -> IO (Either ErrorType ())
setStatus qr status =
atomically $ stateTVar qr $ \case
Just q -> (Right (), Just q {status})
Nothing -> (Left AUTH, Nothing)
readQueueRec :: TVar (Maybe QueueRec) -> STM (Either ErrorType QueueRec)
readQueueRec qr = maybe (Left AUTH) Right <$> readTVar qr
{-# INLINE readQueueRec #-}
readQueueRecIO :: TVar (Maybe QueueRec) -> IO (Either ErrorType QueueRec)
readQueueRecIO qr = maybe (Left AUTH) Right <$> readTVarIO qr
{-# INLINE readQueueRecIO #-}
withLog' :: String -> TVar (Maybe (StoreLog 'WriteMode)) -> (StoreLog 'WriteMode -> IO ()) -> IO (Either ErrorType ())
withLog' name sl action =
readTVarIO sl
@@ -192,41 +201,6 @@ withLog' name sl action =
where
err = name <> ", withLog, " <> show e
withLog :: STMStoreClass s => String -> s -> (StoreLog 'WriteMode -> IO ()) -> IO (Either ErrorType ())
withLog name = withLog' name . storeLog . stmQueueStore
readQueueStore :: forall s. STMStoreClass s => FilePath -> s -> IO ()
readQueueStore f st = readLogLines False f processLine
where
processLine :: Bool -> B.ByteString -> IO ()
processLine eof s = either printError procLogRecord (strDecode s)
where
procLogRecord :: StoreLogRecord -> IO ()
procLogRecord = \case
CreateQueue rId q -> addQueue st rId q >>= qError rId "CreateQueue"
SecureQueue qId sKey -> withQueue qId "SecureQueue" $ \q -> secureQueue st q sKey
AddNotifier qId ntfCreds -> withQueue qId "AddNotifier" $ \q -> addQueueNotifier st q ntfCreds
SuspendQueue qId -> withQueue qId "SuspendQueue" $ suspendQueue st
BlockQueue qId info -> withQueue qId "BlockQueue" $ \q -> blockQueue st q info
UnblockQueue qId -> withQueue qId "UnblockQueue" $ unblockQueue st
DeleteQueue qId -> withQueue qId "DeleteQueue" $ deleteQueue st
DeleteNotifier qId -> withQueue qId "DeleteNotifier" $ deleteQueueNotifier st
UpdateTime qId t -> withQueue qId "UpdateTime" $ \q -> updateQueueTime st q t
printError :: String -> IO ()
printError e
| eof = logWarn err
| otherwise = logError err >> exitFailure
where
err = "Error parsing log: " <> T.pack e <> " - " <> safeDecodeUtf8 s
withQueue :: forall a. RecipientId -> T.Text -> (StoreQueue s -> IO (Either ErrorType a)) -> IO ()
withQueue qId op a = runExceptT go >>= qError qId op
where
go = do
q <- ExceptT $ getQueue st SRecipient qId
liftIO (readTVarIO $ queueRec' q) >>= \case
Nothing -> logWarn $ logPfx qId op <> "already deleted"
Just _ -> void $ ExceptT $ a q
qError qId op = \case
Left e -> logError $ logPfx qId op <> tshow e
Right _ -> pure ()
logPfx qId op = "STORE: " <> op <> ", stored queue " <> decodeLatin1 (strEncode qId) <> ", "
withLog :: String -> STMQueueStore q -> (StoreLog 'WriteMode -> IO ()) -> IO (Either ErrorType ())
withLog name = withLog' name . storeLog
{-# INLINE withLog #-}
@@ -0,0 +1,48 @@
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeFamilyDependencies #-}
module Simplex.Messaging.Server.QueueStore.Types where
import Control.Concurrent.STM
import Control.Monad
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.TMap (TMap)
class StoreQueueClass q where
type MsgQueue q = mq | mq -> q
recipientId :: q -> RecipientId
queueRec :: q -> TVar (Maybe QueueRec)
msgQueue :: q -> TVar (Maybe (MsgQueue q))
withQueueLock :: q -> String -> IO a -> IO a
class StoreQueueClass q => QueueStoreClass q s where
type QueueStoreCfg s
newQueueStore :: QueueStoreCfg s -> IO s
queueCounts :: s -> IO QueueCounts
loadedQueues :: s -> TMap RecipientId q
-- foldAllQueues :: Monoid a => s -> (q -> IO a) -> IO a
addQueue_ :: s -> (RecipientId -> QueueRec -> IO q) -> RecipientId -> QueueRec -> IO (Either ErrorType q)
getQueue_ :: DirectParty p => s -> (RecipientId -> QueueRec -> IO q) -> SParty p -> QueueId -> IO (Either ErrorType q)
secureQueue :: s -> q -> SndPublicAuthKey -> IO (Either ErrorType ())
addQueueNotifier :: s -> q -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId))
deleteQueueNotifier :: s -> q -> IO (Either ErrorType (Maybe NotifierId))
suspendQueue :: s -> q -> IO (Either ErrorType ())
blockQueue :: s -> q -> BlockingInfo -> IO (Either ErrorType ())
unblockQueue :: s -> q -> IO (Either ErrorType ())
updateQueueTime :: s -> q -> RoundedSystemTime -> IO (Either ErrorType QueueRec)
deleteStoreQueue :: s -> q -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue q)))
data QueueCounts = QueueCounts
{ queueCount :: Int,
notifierCount :: Int
}
withLoadedQueues :: (Monoid a, QueueStoreClass q s) => s -> (q -> IO a) -> IO a
withLoadedQueues st f = readTVarIO (loadedQueues st) >>= foldM run mempty
where
run !acc = fmap (acc <>) . f