rfc: client certificates for servers using SMP protocol as clients (opertors' chat relays, notification servers, service bots) (#1534)

* rfc: client certificates for high volume clients (opertors' chat relays, notification servers, service bots)

* client certificates types (WIP)

* parameterize Transport

* protocol/schema/api changes

* agent API

* rename command

* agent subscriptions return local ClientServiceId to chat

* verify transmissions

* fix receiving client certificates, refactor

* ntf server: remove shared queue for all notification subscriptions (#1543)

* ntf server: remove shared queue for all notification subscriptions

* wait for subscriber with timeout

* safer

* refactor

* log

* remove unused

* WIP service subscriptions and associations, refactor

* process service subscriptions

* rename

* simplify switching subscriptions

* SMP service handshake with additional server handshake response

* notification delivery and STM persistence for services

* smp server: database storage, store log, fix encoding for STORE error, replace String with Text in locks and error

* stats

* more stats

* rename SMP commands

* service subscriptions in ntf server agent (tests fail)

* fix

* refactor

* exports

* subscribe ntf server as service for associated queues

* test ntf service connection, fix SOKS response, fix service associations not removed in STM storage

* INI option to support services

* ntf server: downgrade subscriptions when service is no longer supported, track counts of subscribed queues

* smp protocol: include service certificate fingerprint in the string signed over with entity key (TODO two tests fail)

* fix test

* ntf server prometheus stats, use Int64 in SOKS/ENDS responses (to avoid conversions), additional error status for ntf subscription

* update RFC

* refactor useServiceAuth to avoid ad hoc decisions about which commands use service signatures, and to prohibit service signatures on other commands

* remove duplicate service signature syntax check from checkCredentials, it is checked in verifyTransmission

* service errors, todos

* fix checkCredentials in ntf server, service errors

* refactor service auth

* refactor

* service agent: store returned queue count instead of expected

* refactor serverThread

* refactor serviceSig

* rename

* refactor, rename, test repeat NSUB service association

* respond with error to SUBS

* smp server: export/import service records between database and store log

* comment

* comments

* ghc 8.10.7
This commit is contained in:
Evgeny
2025-06-06 08:03:47 +01:00
committed by GitHub
parent 8e86c97a13
commit 5241f5fe5e
74 changed files with 3610 additions and 1339 deletions
@@ -21,7 +21,9 @@
module Simplex.Messaging.Server.QueueStore.Postgres
( PostgresQueueStore (..),
PostgresStoreCfg (..),
batchInsertServices,
batchInsertQueues,
foldServiceRecs,
foldQueueRecs,
handleDuplicate,
withLog_,
@@ -43,13 +45,16 @@ import Data.Bitraversable (bimapM)
import Data.Either (fromRight)
import Data.Functor (($>))
import Data.Int (Int64)
import Data.List (intersperse)
import Data.List (foldl', intersperse, partition)
import Data.List.NonEmpty (NonEmpty)
import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes, fromMaybe)
import qualified Data.Text as T
import qualified Data.Set as S
import Data.Text (Text)
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
import Database.PostgreSQL.Simple (Binary (..), Only (..), Query, SqlError, (:.) (..))
import qualified Data.X509 as X
import qualified Data.X509.Validation as XV
import Database.PostgreSQL.Simple (Binary (..), In (..), Only (..), Query, SqlError, (:.) (..))
import qualified Database.PostgreSQL.Simple as DB
import qualified Database.PostgreSQL.Simple.Copy as DB
import Database.PostgreSQL.Simple.FromField (FromField (..))
@@ -65,16 +70,18 @@ import Simplex.Messaging.Agent.Store.Postgres.Common
import Simplex.Messaging.Agent.Store.Postgres.DB (blobFieldDecoder)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding
import Simplex.Messaging.Parsers (parseAll)
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.Postgres.Config
import Simplex.Messaging.Server.QueueStore.Postgres.Migrations (serverMigrations)
import Simplex.Messaging.Server.QueueStore.STM (readQueueRecIO)
import Simplex.Messaging.Server.QueueStore.STM (STMService (..), readQueueRecIO)
import Simplex.Messaging.Server.QueueStore.Types
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util (eitherToMaybe, firstRow, ifM, tshow, (<$$>))
import Simplex.Messaging.Transport (SMPServiceRole (..))
import Simplex.Messaging.Util (eitherToMaybe, firstRow, ifM, maybeFirstRow, tshow, (<$$>))
import System.Exit (exitFailure)
import System.IO (IOMode (..), hFlush, stdout)
import UnliftIO.STM
@@ -96,6 +103,7 @@ data PostgresQueueStore q = PostgresQueueStore
-- this map only cashes the queues that were attempted to be subscribed to,
notifiers :: TMap NotifierId RecipientId,
notifierLocks :: TMap NotifierId Lock,
serviceLocks :: TMap CertFingerprint Lock,
deletedTTL :: Int64
}
@@ -111,7 +119,8 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
links <- TM.emptyIO
notifiers <- TM.emptyIO
notifierLocks <- TM.emptyIO
pure PostgresQueueStore {dbStore, dbStoreLog, queues, senders, links, notifiers, notifierLocks, deletedTTL}
serviceLocks <- TM.emptyIO
pure PostgresQueueStore {dbStore, dbStoreLog, queues, senders, links, notifiers, notifierLocks, serviceLocks, deletedTTL}
where
err e = do
logError $ "STORE: newQueueStore, error opening PostgreSQL database, " <> tshow e
@@ -131,18 +140,23 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
fmap (fromRight 0) $ runExceptT $ withDB' "removeDeletedQueues" st $ \db ->
DB.execute db "DELETE FROM msg_queues WHERE deleted_at < ?" (Only old)
queueCounts :: PostgresQueueStore q -> IO QueueCounts
queueCounts st =
getEntityCounts :: PostgresQueueStore q -> IO EntityCounts
getEntityCounts st =
withConnection (dbStore st) $ \db -> do
(queueCount, notifierCount) : _ <-
DB.query_
(queueCount, notifierCount, rcvServiceCount, ntfServiceCount, rcvServiceQueuesCount, ntfServiceQueuesCount) : _ <-
DB.query
db
[sql|
SELECT
(SELECT COUNT(1) FROM msg_queues WHERE deleted_at IS NULL) AS queue_count,
(SELECT COUNT(1) FROM msg_queues WHERE deleted_at IS NULL AND notifier_id IS NOT NULL) AS notifier_count
(SELECT COUNT(1) FROM msg_queues WHERE deleted_at IS NULL AND notifier_id IS NOT NULL) AS notifier_count,
(SELECT COUNT(1) FROM services WHERE service_role = ?) AS rcv_service_count,
(SELECT COUNT(1) FROM services WHERE service_role = ?) AS ntf_service_count,
(SELECT COUNT(1) FROM msg_queues WHERE rcv_service_id IS NOT NULL AND deleted_at IS NULL) AS rcv_service_queues_count,
(SELECT COUNT(1) FROM msg_queues WHERE ntf_service_id IS NOT NULL AND deleted_at IS NULL) AS ntf_service_queues_count
|]
pure QueueCounts {queueCount, notifierCount}
(SRMessaging, SRNotifier)
pure EntityCounts {queueCount, notifierCount, rcvServiceCount, ntfServiceCount, rcvServiceQueuesCount, ntfServiceQueuesCount}
-- this implementation assumes that the lock is already taken by addQueue
-- and relies on unique constraints in the database to prevent duplicate IDs.
@@ -169,13 +183,15 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
getQueue_ :: DirectParty p => PostgresQueueStore q -> (Bool -> RecipientId -> QueueRec -> IO q) -> SParty p -> QueueId -> IO (Either ErrorType q)
getQueue_ st mkQ party qId = case party of
SRecipient -> getRcvQueue qId
SSender -> TM.lookupIO qId senders >>= maybe (mask loadSndQueue) getRcvQueue
SSender -> getSndQueue
SProxyService -> getSndQueue
SSenderLink -> TM.lookupIO qId links >>= maybe (mask loadLinkQueue) getRcvQueue
-- loaded queue is deleted from notifiers map to reduce cache size after queue was subscribed to by ntf server
SNotifier -> TM.lookupIO qId notifiers >>= maybe (mask loadNtfQueue) (getRcvQueue >=> (atomically (TM.delete qId notifiers) $>))
where
PostgresQueueStore {queues, senders, links, notifiers} = st
getRcvQueue rId = TM.lookupIO rId queues >>= maybe (mask loadRcvQueue) (pure . Right)
getSndQueue = TM.lookupIO qId senders >>= maybe (mask loadSndQueue) getRcvQueue
loadRcvQueue = do
(rId, qRec) <- loadQueue " WHERE recipient_id = ?"
liftIO $ cacheQueue rId qRec $ \_ -> pure () -- recipient map already checked, not caching sender ref
@@ -273,20 +289,20 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
where
rId = recipientId sq
addQueueNotifier :: PostgresQueueStore q -> q -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId))
addQueueNotifier :: PostgresQueueStore q -> q -> NtfCreds -> IO (Either ErrorType (Maybe NtfCreds))
addQueueNotifier st sq ntfCreds@NtfCreds {notifierId = nId, notifierKey, rcvNtfDhSecret} =
withQueueRec sq "addQueueNotifier" $ \q ->
ExceptT $ withLockMap (notifierLocks st) nId "addQueueNotifier" $
ifM (TM.memberIO nId notifiers) (pure $ Left DUPLICATE_) $ runExceptT $ do
assertUpdated $ withDB "addQueueNotifier" st $ \db ->
E.try (update db) >>= bimapM handleDuplicate pure
nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> atomically (TM.delete notifierId notifiers) $> notifierId
nc_ <- forM (notifier q) $ \nc@NtfCreds {notifierId} -> atomically (TM.delete notifierId notifiers) $> nc
let !q' = q {notifier = Just ntfCreds}
atomically $ writeTVar (queueRec sq) $ Just q'
-- cache queue notifier ID after notifier is added ntf server will likely subscribe
atomically $ TM.insert nId rId notifiers
withLog "addQueueNotifier" st $ \s -> logAddNotifier s rId ntfCreds
pure nId_
pure nc_
where
PostgresQueueStore {notifiers} = st
rId = recipientId sq
@@ -300,16 +316,16 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
|]
(nId, notifierKey, rcvNtfDhSecret, rId)
deleteQueueNotifier :: PostgresQueueStore q -> q -> IO (Either ErrorType (Maybe NotifierId))
deleteQueueNotifier :: PostgresQueueStore q -> q -> IO (Either ErrorType (Maybe NtfCreds))
deleteQueueNotifier st sq =
withQueueRec sq "deleteQueueNotifier" $ \q ->
ExceptT $ fmap sequence $ forM (notifier q) $ \NtfCreds {notifierId = nId} ->
ExceptT $ fmap sequence $ forM (notifier q) $ \nc@NtfCreds {notifierId = nId} ->
withLockMap (notifierLocks st) nId "deleteQueueNotifier" $ runExceptT $ do
assertUpdated $ withDB' "deleteQueueNotifier" st update
atomically $ TM.delete nId $ notifiers st
atomically $ writeTVar (queueRec sq) $ Just q {notifier = Nothing}
withLog "deleteQueueNotifier" st (`logDeleteNotifier` rId)
pure nId
pure nc
where
rId = recipientId sq
update db =
@@ -371,6 +387,75 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
rId = recipientId sq
qr = queueRec sq
getCreateService :: PostgresQueueStore q -> ServiceRec -> IO (Either ErrorType ServiceId)
getCreateService st sr@ServiceRec {serviceId = newSrvId, serviceRole, serviceCertHash = XV.Fingerprint fp} =
withLockMap (serviceLocks st) fp "getCreateService" $ E.uninterruptibleMask_ $ runExceptT $ do
(serviceId, new) <-
withDB "getCreateService" st $ \db ->
maybeFirstRow id (DB.query db "SELECT service_id, service_role FROM services WHERE service_cert_hash = ?" (Only (Binary fp))) >>= \case
Just (serviceId, role)
| role == serviceRole -> pure $ Right (serviceId, False)
| otherwise -> pure $ Left SERVICE
Nothing ->
E.try (DB.execute db insertServiceQuery (serviceRecToRow sr))
>>= bimapM handleDuplicate (\_ -> pure (newSrvId, True))
when new $ withLog "getCreateService" st (`logNewService` sr)
pure serviceId
setQueueService :: (PartyI p, SubscriberParty p) => PostgresQueueStore q -> q -> SParty p -> Maybe ServiceId -> IO (Either ErrorType ())
setQueueService st sq party serviceId = withQueueRec sq "setQueueService" $ \q -> case party of
SRecipient
| rcvServiceId q == serviceId -> pure ()
| otherwise -> do
assertUpdated $ withDB' "setQueueService" st $ \db ->
DB.execute db "UPDATE msg_queues SET rcv_service_id = ? WHERE recipient_id = ? AND deleted_at IS NULL" (serviceId, rId)
updateQueueRec q {rcvServiceId = serviceId}
SNotifier -> case notifier q of
Nothing -> throwE AUTH
Just nc@NtfCreds {ntfServiceId = prevSrvId}
| prevSrvId == serviceId -> pure ()
| otherwise -> do
assertUpdated $ withDB' "setQueueService" st $ \db ->
DB.execute db "UPDATE msg_queues SET ntf_service_id = ? WHERE recipient_id = ? AND notifier_id IS NOT NULL AND deleted_at IS NULL" (serviceId, rId)
updateQueueRec q {notifier = Just nc {ntfServiceId = serviceId}}
where
rId = recipientId sq
updateQueueRec :: QueueRec -> ExceptT ErrorType IO ()
updateQueueRec q' = do
atomically $ writeTVar (queueRec sq) $ Just q'
withLog "setQueueService" st $ \sl -> logQueueService sl rId party serviceId
getQueueNtfServices :: PostgresQueueStore q -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)]))
getQueueNtfServices st ntfs = E.uninterruptibleMask_ $ runExceptT $ do
snIds <-
withDB' "getQueueNtfServices" st $ \db ->
DB.query db "SELECT ntf_service_id, notifier_id FROM msg_queues WHERE notifier_id IN ? AND deleted_at IS NULL" (Only (In (map fst ntfs)))
pure $
if null snIds
then ([], ntfs)
else
let snIds' = foldl' (\m (sId, nId) -> M.alter (Just . maybe (S.singleton nId) (S.insert nId)) sId m) M.empty snIds
in foldr addService ([], ntfs) (M.assocs snIds')
where
addService ::
(Maybe ServiceId, S.Set NotifierId) ->
([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)]) ->
([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)])
addService (serviceId, snIds) (ssNtfs, ntfs') =
let (sNtfs, restNtfs) = partition (\(nId, _) -> S.member nId snIds) ntfs'
in ((serviceId, sNtfs) : ssNtfs, restNtfs)
getNtfServiceQueueCount :: PostgresQueueStore q -> ServiceId -> IO (Either ErrorType Int64)
getNtfServiceQueueCount st serviceId =
E.uninterruptibleMask_ $ runExceptT $ withDB' "getNtfServiceQueueCount" st $ \db ->
fmap (fromMaybe 0) $ maybeFirstRow fromOnly $
DB.query db "SELECT count(1) FROM msg_queues WHERE ntf_service_id = ? AND deleted_at IS NULL" (Only serviceId)
batchInsertServices :: [STMService] -> PostgresQueueStore q -> IO Int64
batchInsertServices services' toStore =
withConnection (dbStore toStore) $ \db ->
DB.executeMany db insertServiceQuery $ map (serviceRecToRow . serviceRec) services'
batchInsertQueues :: StoreQueueClass q => Bool -> M.Map RecipientId q -> PostgresQueueStore q' -> IO Int64
batchInsertQueues tty queues toStore = do
qs <- catMaybes <$> mapM (\(rId, q) -> (rId,) <$$> readTVarIO (queueRec q)) (M.assocs queues)
@@ -381,7 +466,7 @@ batchInsertQueues tty queues toStore = do
DB.copy_
db
[sql|
COPY msg_queues (recipient_id, recipient_keys, rcv_dh_secret, sender_id, sender_key, queue_mode, notifier_id, notifier_key, rcv_ntf_dh_secret, status, updated_at, link_id, fixed_data, user_data)
COPY msg_queues (recipient_id, recipient_keys, rcv_dh_secret, sender_id, sender_key, queue_mode, notifier_id, notifier_key, rcv_ntf_dh_secret, ntf_service_id, status, updated_at, link_id, rcv_service_id, fixed_data, user_data)
FROM STDIN WITH (FORMAT CSV)
|]
mapM_ (putQueue db) (zip [1..] qs)
@@ -399,10 +484,24 @@ insertQueueQuery :: Query
insertQueueQuery =
[sql|
INSERT INTO msg_queues
(recipient_id, recipient_keys, rcv_dh_secret, sender_id, sender_key, queue_mode, notifier_id, notifier_key, rcv_ntf_dh_secret, status, updated_at, link_id, fixed_data, user_data)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
(recipient_id, recipient_keys, rcv_dh_secret, sender_id, sender_key, queue_mode, notifier_id, notifier_key, rcv_ntf_dh_secret, ntf_service_id, status, updated_at, link_id, rcv_service_id, fixed_data, user_data)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
|]
insertServiceQuery :: Query
insertServiceQuery =
[sql|
INSERT INTO services
(service_id, service_role, service_cert, service_cert_hash, created_at)
VALUES (?,?,?,?,?)
|]
foldServiceRecs :: forall a q. Monoid a => PostgresQueueStore q -> (ServiceRec -> IO a) -> IO a
foldServiceRecs st f =
withConnection (dbStore st) $ \db ->
DB.fold_ db "SELECT service_id, service_role, service_cert, service_cert_hash, created_at FROM services" mempty $
\ !acc -> fmap (acc <>) . f . rowToServiceRec
foldQueueRecs :: forall a q. Monoid a => Bool -> Bool -> PostgresQueueStore q -> Maybe Int64 -> ((RecipientId, QueueRec) -> IO a) -> IO a
foldQueueRecs tty withData st skipOld_ f = do
(n, r) <- withConnection (dbStore st) $ \db ->
@@ -417,12 +516,11 @@ foldQueueRecs tty withData st skipOld_ f = do
where
foldRecs db acc f' = case skipOld_ of
Nothing
| withData -> DB.fold_ db (query <> " WHERE deleted_at IS NULL") acc $ \acc' -> f' acc' . rowToQueueRecWithData
| otherwise -> DB.fold_ db (query <> " WHERE deleted_at IS NULL") acc $ \acc' -> f' acc' . rowToQueueRec
| withData -> DB.fold_ db (queueRecQueryWithData <> " WHERE deleted_at IS NULL") acc $ \acc' -> f' acc' . rowToQueueRecWithData
| otherwise -> DB.fold_ db (queueRecQuery <> " WHERE deleted_at IS NULL") acc $ \acc' -> f' acc' . rowToQueueRec
Just old
| withData -> DB.fold db (query <> " WHERE deleted_at IS NULL AND updated_at > ?") (Only old) acc $ \acc' -> f' acc' . rowToQueueRecWithData
| otherwise -> DB.fold db (query <> " WHERE deleted_at IS NULL AND updated_at > ?") (Only old) acc $ \acc' -> f' acc' . rowToQueueRec
query = if withData then queueRecQueryWithData else queueRecQuery
| withData -> DB.fold db (queueRecQueryWithData <> " WHERE deleted_at IS NULL AND updated_at > ?") (Only old) acc $ \acc' -> f' acc' . rowToQueueRecWithData
| otherwise -> DB.fold db (queueRecQuery <> " WHERE deleted_at IS NULL AND updated_at > ?") (Only old) acc $ \acc' -> f' acc' . rowToQueueRec
progress i = "Processed: " <> show i <> " records"
queueRecQuery :: Query
@@ -430,9 +528,8 @@ queueRecQuery =
[sql|
SELECT recipient_id, recipient_keys, rcv_dh_secret,
sender_id, sender_key, queue_mode,
notifier_id, notifier_key, rcv_ntf_dh_secret,
status, updated_at,
link_id
notifier_id, notifier_key, rcv_ntf_dh_secret, ntf_service_id,
status, updated_at, link_id, rcv_service_id
FROM msg_queues
|]
@@ -441,23 +538,28 @@ queueRecQueryWithData =
[sql|
SELECT recipient_id, recipient_keys, rcv_dh_secret,
sender_id, sender_key, queue_mode,
notifier_id, notifier_key, rcv_ntf_dh_secret,
status, updated_at,
link_id, fixed_data, user_data
notifier_id, notifier_key, rcv_ntf_dh_secret, ntf_service_id,
status, updated_at, link_id, rcv_service_id,
fixed_data, user_data
FROM msg_queues
|]
type QueueRecRow = (RecipientId, NonEmpty RcvPublicAuthKey, RcvDhSecret, SenderId, Maybe SndPublicAuthKey, Maybe QueueMode, Maybe NotifierId, Maybe NtfPublicAuthKey, Maybe RcvNtfDhSecret, ServerEntityStatus, Maybe RoundedSystemTime, Maybe LinkId)
type QueueRecRow =
( RecipientId, NonEmpty RcvPublicAuthKey, RcvDhSecret,
SenderId, Maybe SndPublicAuthKey, Maybe QueueMode,
Maybe NotifierId, Maybe NtfPublicAuthKey, Maybe RcvNtfDhSecret, Maybe ServiceId,
ServerEntityStatus, Maybe RoundedSystemTime, Maybe LinkId, Maybe ServiceId
)
queueRecToRow :: (RecipientId, QueueRec) -> QueueRecRow :. (Maybe EncDataBytes, Maybe EncDataBytes)
queueRecToRow (rId, QueueRec {recipientKeys, rcvDhSecret, senderId, senderKey, queueMode, queueData, notifier = n, status, updatedAt}) =
(rId, recipientKeys, rcvDhSecret, senderId, senderKey, queueMode, notifierId <$> n, notifierKey <$> n, rcvNtfDhSecret <$> n, status, updatedAt, linkId_)
queueRecToRow (rId, QueueRec {recipientKeys, rcvDhSecret, senderId, senderKey, queueMode, queueData, notifier = n, status, updatedAt, rcvServiceId}) =
(rId, recipientKeys, rcvDhSecret, senderId, senderKey, queueMode, notifierId <$> n, notifierKey <$> n, rcvNtfDhSecret <$> n, ntfServiceId =<< n, status, updatedAt, linkId_, rcvServiceId)
:. (fst <$> queueData_, snd <$> queueData_)
where
(linkId_, queueData_) = queueDataColumns queueData
queueRecToText :: (RecipientId, QueueRec) -> ByteString
queueRecToText (rId, QueueRec {recipientKeys, rcvDhSecret, senderId, senderKey, queueMode, queueData, notifier = n, status, updatedAt}) =
queueRecToText (rId, QueueRec {recipientKeys, rcvDhSecret, senderId, senderKey, queueMode, queueData, notifier = n, status, updatedAt, rcvServiceId}) =
LB.toStrict $ BB.toLazyByteString $ mconcat tabFields <> BB.char7 '\n'
where
tabFields = BB.char7 ',' `intersperse` fields
@@ -471,9 +573,11 @@ queueRecToText (rId, QueueRec {recipientKeys, rcvDhSecret, senderId, senderKey,
nullable (notifierId <$> n),
nullable (notifierKey <$> n),
nullable (rcvNtfDhSecret <$> n),
nullable (ntfServiceId =<< n),
BB.char7 '"' <> renderField (toField status) <> BB.char7 '"',
nullable updatedAt,
nullable linkId_,
nullable rcvServiceId,
nullable (fst <$> queueData_),
nullable (snd <$> queueData_)
]
@@ -494,19 +598,32 @@ queueDataColumns = \case
Nothing -> (Nothing, Nothing)
rowToQueueRec :: QueueRecRow -> (RecipientId, QueueRec)
rowToQueueRec (rId, recipientKeys, rcvDhSecret, senderId, senderKey, queueMode, notifierId_, notifierKey_, rcvNtfDhSecret_, status, updatedAt, linkId_) =
let notifier = NtfCreds <$> notifierId_ <*> notifierKey_ <*> rcvNtfDhSecret_
rowToQueueRec (rId, recipientKeys, rcvDhSecret, senderId, senderKey, queueMode, notifierId_, notifierKey_, rcvNtfDhSecret_, ntfServiceId, status, updatedAt, linkId_, rcvServiceId) =
let notifier = mkNotifier (notifierId_, notifierKey_, rcvNtfDhSecret_) ntfServiceId
queueData = (,(EncDataBytes "", EncDataBytes "")) <$> linkId_
in (rId, QueueRec {recipientKeys, rcvDhSecret, senderId, senderKey, queueMode, queueData, notifier, status, updatedAt})
in (rId, QueueRec {recipientKeys, rcvDhSecret, senderId, senderKey, queueMode, queueData, notifier, status, updatedAt, rcvServiceId})
rowToQueueRecWithData :: QueueRecRow :. (Maybe EncDataBytes, Maybe EncDataBytes) -> (RecipientId, QueueRec)
rowToQueueRecWithData ((rId, recipientKeys, rcvDhSecret, senderId, senderKey, queueMode, notifierId_, notifierKey_, rcvNtfDhSecret_, status, updatedAt, linkId_) :. (immutableData_, userData_)) =
let notifier = NtfCreds <$> notifierId_ <*> notifierKey_ <*> rcvNtfDhSecret_
rowToQueueRecWithData ((rId, recipientKeys, rcvDhSecret, senderId, senderKey, queueMode, notifierId_, notifierKey_, rcvNtfDhSecret_, ntfServiceId, status, updatedAt, linkId_, rcvServiceId) :. (immutableData_, userData_)) =
let notifier = mkNotifier (notifierId_, notifierKey_, rcvNtfDhSecret_) ntfServiceId
encData = fromMaybe (EncDataBytes "")
queueData = (,(encData immutableData_, encData userData_)) <$> linkId_
in (rId, QueueRec {recipientKeys, rcvDhSecret, senderId, senderKey, queueMode, queueData, notifier, status, updatedAt})
in (rId, QueueRec {recipientKeys, rcvDhSecret, senderId, senderKey, queueMode, queueData, notifier, status, updatedAt, rcvServiceId})
setStatusDB :: StoreQueueClass q => String -> PostgresQueueStore q -> q -> ServerEntityStatus -> ExceptT ErrorType IO () -> IO (Either ErrorType ())
mkNotifier :: (Maybe NotifierId, Maybe NtfPublicAuthKey, Maybe RcvNtfDhSecret) -> Maybe ServiceId -> Maybe NtfCreds
mkNotifier (Just notifierId, Just notifierKey, Just rcvNtfDhSecret) ntfServiceId =
Just NtfCreds {notifierId, notifierKey, rcvNtfDhSecret, ntfServiceId}
mkNotifier _ _ = Nothing
serviceRecToRow :: ServiceRec -> (ServiceId, SMPServiceRole, X.CertificateChain, Binary ByteString, RoundedSystemTime)
serviceRecToRow ServiceRec {serviceId, serviceRole, serviceCert, serviceCertHash = XV.Fingerprint fp, serviceCreatedAt} =
(serviceId, serviceRole, serviceCert, Binary fp, serviceCreatedAt)
rowToServiceRec :: (ServiceId, SMPServiceRole, X.CertificateChain, Binary ByteString, RoundedSystemTime) -> ServiceRec
rowToServiceRec (serviceId, serviceRole, serviceCert, Binary fp, serviceCreatedAt) =
ServiceRec {serviceId, serviceRole, serviceCert, serviceCertHash = XV.Fingerprint fp, serviceCreatedAt}
setStatusDB :: StoreQueueClass q => Text -> PostgresQueueStore q -> q -> ServerEntityStatus -> ExceptT ErrorType IO () -> IO (Either ErrorType ())
setStatusDB op st sq status writeLog =
withQueueRec sq op $ \q -> do
assertUpdated $ withDB' op st $ \db ->
@@ -514,33 +631,33 @@ setStatusDB op st sq status writeLog =
atomically $ writeTVar (queueRec sq) $ Just q {status}
writeLog
withQueueRec :: StoreQueueClass q => q -> String -> (QueueRec -> ExceptT ErrorType IO a) -> IO (Either ErrorType a)
withQueueRec :: StoreQueueClass q => q -> Text -> (QueueRec -> ExceptT ErrorType IO a) -> IO (Either ErrorType a)
withQueueRec sq op action =
withQueueLock sq op $ E.uninterruptibleMask_ $ runExceptT $ ExceptT (readQueueRecIO $ queueRec sq) >>= action
assertUpdated :: ExceptT ErrorType IO Int64 -> ExceptT ErrorType IO ()
assertUpdated = (>>= \n -> when (n == 0) (throwE AUTH))
withDB' :: String -> PostgresQueueStore q -> (DB.Connection -> IO a) -> ExceptT ErrorType IO a
withDB' :: Text -> PostgresQueueStore q -> (DB.Connection -> IO a) -> ExceptT ErrorType IO a
withDB' op st action = withDB op st $ fmap Right . action
withDB :: forall a q. String -> PostgresQueueStore q -> (DB.Connection -> IO (Either ErrorType a)) -> ExceptT ErrorType IO a
withDB :: forall a q. Text -> PostgresQueueStore q -> (DB.Connection -> IO (Either ErrorType a)) -> ExceptT ErrorType IO a
withDB op st action =
ExceptT $ E.try (withConnection (dbStore st) action) >>= either logErr pure
where
logErr :: E.SomeException -> IO (Either ErrorType a)
logErr e = logError ("STORE: " <> T.pack err) $> Left (STORE err)
logErr e = logError ("STORE: " <> err) $> Left (STORE err)
where
err = op <> ", withDB, " <> show e
err = op <> ", withDB, " <> tshow e
withLog :: MonadIO m => String -> PostgresQueueStore q -> (StoreLog 'WriteMode -> IO ()) -> m ()
withLog :: MonadIO m => Text -> PostgresQueueStore q -> (StoreLog 'WriteMode -> IO ()) -> m ()
withLog op PostgresQueueStore {dbStoreLog} = withLog_ op dbStoreLog
{-# INLINE withLog #-}
withLog_ :: MonadIO m => String -> Maybe (StoreLog 'WriteMode) -> (StoreLog 'WriteMode -> IO ()) -> m ()
withLog_ :: MonadIO m => Text -> Maybe (StoreLog 'WriteMode) -> (StoreLog 'WriteMode -> IO ()) -> m ()
withLog_ op sl_ action =
forM_ sl_ $ \sl -> liftIO $ action sl `catchAny` \e ->
logWarn $ "STORE: " <> T.pack (op <> ", withLog, " <> show e)
logWarn $ "STORE: " <> op <> ", withLog, " <> tshow e
handleDuplicate :: SqlError -> IO ErrorType
handleDuplicate e = case constraintViolation e of
@@ -553,6 +670,14 @@ instance ToField (NonEmpty C.APublicAuthKey) where toField = toField . Binary .
instance FromField (NonEmpty C.APublicAuthKey) where fromField = blobFieldDecoder smpDecode
instance ToField SMPServiceRole where toField = toField . decodeLatin1 . smpEncode
instance FromField SMPServiceRole where fromField = fromTextField_ $ eitherToMaybe . smpDecode . encodeUtf8
instance ToField X.CertificateChain where toField = toField . Binary . smpEncode . C.encodeCertChain
instance FromField X.CertificateChain where fromField = blobFieldDecoder (parseAll C.certChainP)
#if !defined(dbPostgres)
instance ToField EntityId where toField (EntityId s) = toField $ Binary s
@@ -13,7 +13,8 @@ serverSchemaMigrations :: [(String, Text, Maybe Text)]
serverSchemaMigrations =
[ ("20250207_initial", m20250207_initial, Nothing),
("20250319_updated_index", m20250319_updated_index, Just down_m20250319_updated_index),
("20250320_short_links", m20250320_short_links, Just down_m20250320_short_links)
("20250320_short_links", m20250320_short_links, Just down_m20250320_short_links),
("20250514_service_certs", m20250514_service_certs, Just down_m20250514_service_certs)
]
-- | The list of migrations in ascending order by date
@@ -48,7 +49,7 @@ CREATE INDEX idx_msg_queues_deleted_at ON msg_queues (deleted_at);
|]
m20250319_updated_index :: Text
m20250319_updated_index =
m20250319_updated_index =
T.pack
[r|
DROP INDEX idx_msg_queues_deleted_at;
@@ -119,3 +120,42 @@ UPDATE msg_queues SET recipient_keys = substring(recipient_keys from 3);
ALTER TABLE msg_queues RENAME COLUMN recipient_keys TO recipient_key;
|]
m20250514_service_certs :: Text
m20250514_service_certs =
T.pack
[r|
CREATE TABLE services(
service_id BYTEA NOT NULL,
service_role TEXT NOT NULL,
service_cert BYTEA NOT NULL,
service_cert_hash BYTEA NOT NULL UNIQUE,
created_at BIGINT NOT NULL,
PRIMARY KEY (service_id)
);
CREATE INDEX idx_services_service_role ON services(service_role);
ALTER TABLE msg_queues
ADD COLUMN rcv_service_id BYTEA REFERENCES services(service_id) ON DELETE SET NULL ON UPDATE RESTRICT,
ADD COLUMN ntf_service_id BYTEA REFERENCES services(service_id) ON DELETE SET NULL ON UPDATE RESTRICT;
CREATE INDEX idx_msg_queues_rcv_service_id ON msg_queues(rcv_service_id, deleted_at);
CREATE INDEX idx_msg_queues_ntf_service_id ON msg_queues(ntf_service_id, deleted_at);
|]
down_m20250514_service_certs :: Text
down_m20250514_service_certs =
T.pack
[r|
DROP INDEX idx_msg_queues_rcv_service_id;
DROP INDEX idx_msg_queues_ntf_service_id;
ALTER TABLE msg_queues
DROP COLUMN rcv_service_id,
DROP COLUMN ntf_service_id;
DROP INDEX idx_services_service_role;
DROP TABLE services;
|]
@@ -41,7 +41,19 @@ CREATE TABLE smp_server.msg_queues (
queue_mode text,
link_id bytea,
fixed_data bytea,
user_data bytea
user_data bytea,
rcv_service_id bytea,
ntf_service_id bytea
);
CREATE TABLE smp_server.services (
service_id bytea NOT NULL,
service_role text NOT NULL,
service_cert bytea NOT NULL,
service_cert_hash bytea NOT NULL,
created_at bigint NOT NULL
);
@@ -56,6 +68,16 @@ ALTER TABLE ONLY smp_server.msg_queues
ALTER TABLE ONLY smp_server.services
ADD CONSTRAINT services_pkey PRIMARY KEY (service_id);
ALTER TABLE ONLY smp_server.services
ADD CONSTRAINT services_service_cert_hash_key UNIQUE (service_cert_hash);
CREATE UNIQUE INDEX idx_msg_queues_link_id ON smp_server.msg_queues USING btree (link_id);
@@ -64,6 +86,14 @@ CREATE UNIQUE INDEX idx_msg_queues_notifier_id ON smp_server.msg_queues USING bt
CREATE INDEX idx_msg_queues_ntf_service_id ON smp_server.msg_queues USING btree (ntf_service_id, deleted_at);
CREATE INDEX idx_msg_queues_rcv_service_id ON smp_server.msg_queues USING btree (rcv_service_id, deleted_at);
CREATE UNIQUE INDEX idx_msg_queues_sender_id ON smp_server.msg_queues USING btree (sender_id);
@@ -72,3 +102,17 @@ CREATE INDEX idx_msg_queues_updated_at ON smp_server.msg_queues USING btree (del
CREATE INDEX idx_services_service_role ON smp_server.services USING btree (service_role);
ALTER TABLE ONLY smp_server.msg_queues
ADD CONSTRAINT msg_queues_ntf_service_id_fkey FOREIGN KEY (ntf_service_id) REFERENCES smp_server.services(service_id) ON UPDATE RESTRICT ON DELETE SET NULL;
ALTER TABLE ONLY smp_server.msg_queues
ADD CONSTRAINT msg_queues_rcv_service_id_fkey FOREIGN KEY (rcv_service_id) REFERENCES smp_server.services(service_id) ON UPDATE RESTRICT ON DELETE SET NULL;
+149 -24
View File
@@ -16,6 +16,7 @@
module Simplex.Messaging.Server.QueueStore.STM
( STMQueueStore (..),
STMService (..),
setStoreLog,
withLog',
readQueueRecIO,
@@ -28,16 +29,22 @@ import Control.Logger.Simple
import Control.Monad
import Data.Bitraversable (bimapM)
import Data.Functor (($>))
import Data.Int (Int64)
import Data.List (partition)
import Data.List.NonEmpty (NonEmpty)
import qualified Data.Map.Strict as M
import qualified Data.Text as T
import Data.Set (Set)
import qualified Data.Set as S
import Data.Text (Text)
import qualified Data.X509.Validation as XV
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.Types
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util (anyM, ifM, ($>>), ($>>=), (<$$))
import Simplex.Messaging.Transport (SMPServiceRole (..))
import Simplex.Messaging.Util (anyM, ifM, tshow, ($>>), ($>>=), (<$$))
import System.IO
import UnliftIO.STM
@@ -45,10 +52,18 @@ data STMQueueStore q = STMQueueStore
{ queues :: TMap RecipientId q,
senders :: TMap SenderId RecipientId,
notifiers :: TMap NotifierId RecipientId,
services :: TMap ServiceId STMService,
serviceCerts :: TMap CertFingerprint ServiceId,
links :: TMap LinkId RecipientId,
storeLog :: TVar (Maybe (StoreLog 'WriteMode))
}
data STMService = STMService
{ serviceRec :: ServiceRec,
serviceRcvQueues :: TVar (Set RecipientId),
serviceNtfQueues :: TVar (Set NotifierId)
}
setStoreLog :: STMQueueStore q -> StoreLog 'WriteMode -> IO ()
setStoreLog st sl = atomically $ writeTVar (storeLog st) (Just sl)
@@ -60,9 +75,11 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where
queues <- TM.emptyIO
senders <- TM.emptyIO
notifiers <- TM.emptyIO
services <- TM.emptyIO
serviceCerts <- TM.emptyIO
links <- TM.emptyIO
storeLog <- newTVarIO Nothing
pure STMQueueStore {queues, senders, notifiers, links, storeLog}
pure STMQueueStore {queues, senders, notifiers, links, services, serviceCerts, storeLog}
closeQueueStore :: STMQueueStore q -> IO ()
closeQueueStore STMQueueStore {queues, senders, notifiers, storeLog} = do
@@ -76,11 +93,25 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where
compactQueues _ = pure 0
{-# INLINE compactQueues #-}
queueCounts :: STMQueueStore q -> IO QueueCounts
queueCounts st = do
getEntityCounts :: STMQueueStore q -> IO EntityCounts
getEntityCounts st = do
queueCount <- M.size <$> readTVarIO (queues st)
notifierCount <- M.size <$> readTVarIO (notifiers st)
pure QueueCounts {queueCount, notifierCount}
ss <- readTVarIO (services st)
rcvServiceQueuesCount <- serviceQueuesCount serviceRcvQueues ss
ntfServiceQueuesCount <- serviceQueuesCount serviceNtfQueues ss
pure
EntityCounts
{ queueCount,
notifierCount,
rcvServiceCount = serviceCount SRMessaging ss,
ntfServiceCount = serviceCount SRNotifier ss,
rcvServiceQueuesCount,
ntfServiceQueuesCount
}
where
serviceCount role = M.foldl' (\ !n s -> if serviceRole (serviceRec s) == role then n + 1 else n) 0
serviceQueuesCount serviceSel = foldM (\n s -> (n +) . S.size <$> readTVarIO (serviceSel s)) 0
addQueue_ :: STMQueueStore q -> (RecipientId -> QueueRec -> IO q) -> RecipientId -> QueueRec -> IO (Either ErrorType q)
addQueue_ st mkQ rId qr@QueueRec {senderId = sId, notifier, queueData} = do
@@ -101,11 +132,13 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where
getQueue_ st _ party qId =
maybe (Left AUTH) Right <$> case party of
SRecipient -> TM.lookupIO qId queues
SSender -> TM.lookupIO qId senders $>>= (`TM.lookupIO` queues)
SSender -> getSndQueue
SProxyService -> getSndQueue
SNotifier -> TM.lookupIO qId notifiers $>>= (`TM.lookupIO` queues)
SSenderLink -> TM.lookupIO qId links $>>= (`TM.lookupIO` queues)
where
STMQueueStore {queues, senders, notifiers, links} = st
getSndQueue = TM.lookupIO qId senders $>>= (`TM.lookupIO` queues)
getQueueLinkData :: STMQueueStore q -> q -> LinkId -> IO (Either ErrorType QueueLinkData)
getQueueLinkData _ q lnkId = atomically $ readQueueRec (queueRec q) $>>= pure . getData
@@ -162,31 +195,31 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where
writeTVar qr $ Just q {senderKey = Just sKey}
pure $ Right ()
addQueueNotifier :: STMQueueStore q -> q -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId))
addQueueNotifier :: STMQueueStore q -> q -> NtfCreds -> IO (Either ErrorType (Maybe NtfCreds))
addQueueNotifier st sq ntfCreds@NtfCreds {notifierId = nId} =
atomically (readQueueRec qr $>>= add)
$>>= \nId_ -> nId_ <$$ withLog "addQueueNotifier" st (\s -> logAddNotifier s rId ntfCreds)
$>>= \nc_ -> nc_ <$$ withLog "addQueueNotifier" st (\s -> logAddNotifier s rId ntfCreds)
where
rId = recipientId sq
qr = queueRec sq
STMQueueStore {notifiers} = st
add q = ifM (TM.member nId notifiers) (pure $ Left DUPLICATE_) $ do
nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId notifiers $> notifierId
nc_ <- forM (notifier q) $ \nc -> nc <$ removeNotifier st nc
let !q' = q {notifier = Just ntfCreds}
writeTVar qr $ Just q'
TM.insert nId rId notifiers
pure $ Right nId_
pure $ Right nc_
deleteQueueNotifier :: STMQueueStore q -> q -> IO (Either ErrorType (Maybe NotifierId))
deleteQueueNotifier :: STMQueueStore q -> q -> IO (Either ErrorType (Maybe NtfCreds))
deleteQueueNotifier st sq =
withQueueRec qr delete
$>>= \nId_ -> nId_ <$$ withLog "deleteQueueNotifier" st (`logDeleteNotifier` recipientId sq)
$>>= \nc_ -> nc_ <$$ withLog "deleteQueueNotifier" st (`logDeleteNotifier` recipientId sq)
where
qr = queueRec sq
delete q = forM (notifier q) $ \NtfCreds {notifierId} -> do
TM.delete notifierId $ notifiers st
delete q = forM (notifier q) $ \nc -> do
removeNotifier st nc
writeTVar qr $ Just q {notifier = Nothing}
pure notifierId
pure nc
suspendQueue :: STMQueueStore q -> q -> IO (Either ErrorType ())
suspendQueue st sq =
@@ -219,16 +252,93 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where
deleteStoreQueue :: STMQueueStore q -> q -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue q)))
deleteStoreQueue st sq =
withQueueRec qr delete
$>>= \q -> withLog "deleteStoreQueue" st (`logDeleteQueue` recipientId sq)
$>>= \q -> withLog "deleteStoreQueue" st (`logDeleteQueue` rId)
>>= mapM (\_ -> (q,) <$> atomically (swapTVar (msgQueue sq) Nothing))
where
rId = recipientId sq
qr = queueRec sq
delete q = do
delete q@QueueRec {senderId, rcvServiceId} = do
writeTVar qr Nothing
TM.delete (senderId q) $ senders st
forM_ (notifier q) $ \NtfCreds {notifierId} -> TM.delete notifierId $ notifiers st
TM.delete senderId $ senders st
mapM_ (removeServiceQueue st serviceRcvQueues rId) rcvServiceId
mapM_ (removeNotifier st) $ notifier q
pure q
getCreateService :: STMQueueStore q -> ServiceRec -> IO (Either ErrorType ServiceId)
getCreateService st sr@ServiceRec {serviceId = newSrvId, serviceRole, serviceCertHash = XV.Fingerprint fp} =
TM.lookupIO fp serviceCerts
>>= maybe
(atomically $ TM.lookup fp serviceCerts >>= maybe newService checkService)
(atomically . checkService)
$>>= \(serviceId, new) ->
if new
then serviceId <$$ withLog "getCreateService" st (`logNewService` sr)
else pure $ Right serviceId
where
STMQueueStore {services, serviceCerts} = st
checkService sId =
TM.lookup sId services >>= \case
Just STMService {serviceRec = ServiceRec {serviceId, serviceRole = role}}
| role == serviceRole -> pure $ Right (serviceId, False)
| otherwise -> pure $ Left $ SERVICE
Nothing -> newService_
newService = ifM (TM.member newSrvId services) (pure $ Left DUPLICATE_) newService_
newService_ = do
TM.insertM newSrvId newSTMService services
TM.insert fp newSrvId serviceCerts
pure $ Right (newSrvId, True)
newSTMService = do
serviceRcvQueues <- newTVar S.empty
serviceNtfQueues <- newTVar S.empty
pure STMService {serviceRec = sr, serviceRcvQueues, serviceNtfQueues}
setQueueService :: (PartyI p, SubscriberParty p) => STMQueueStore q -> q -> SParty p -> Maybe ServiceId -> IO (Either ErrorType ())
setQueueService st sq party serviceId =
atomically (readQueueRec qr $>>= setService)
$>> withLog "setQueueService" st (\sl -> logQueueService sl rId party serviceId)
where
qr = queueRec sq
rId = recipientId sq
setService :: QueueRec -> STM (Either ErrorType ())
setService q@QueueRec {rcvServiceId = prevSrvId} = case party of
SRecipient
| prevSrvId == serviceId -> pure $ Right ()
| otherwise -> do
updateServiceQueues serviceRcvQueues rId prevSrvId
let !q' = Just q {rcvServiceId = serviceId}
writeTVar qr q' $> Right ()
SNotifier -> case notifier q of
Nothing -> pure $ Left AUTH
Just nc@NtfCreds {notifierId = nId, ntfServiceId = prevNtfSrvId}
| prevNtfSrvId == serviceId -> pure $ Right ()
| otherwise -> do
let !q' = Just q {notifier = Just nc {ntfServiceId = serviceId}}
updateServiceQueues serviceNtfQueues nId prevNtfSrvId
writeTVar qr q' $> Right ()
updateServiceQueues :: (STMService -> TVar (Set QueueId)) -> QueueId -> Maybe ServiceId -> STM ()
updateServiceQueues serviceSel qId prevSrvId = do
mapM_ (removeServiceQueue st serviceSel qId) prevSrvId
mapM_ (addServiceQueue st serviceSel qId) serviceId
getQueueNtfServices :: STMQueueStore q -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)]))
getQueueNtfServices st ntfs = do
ss <- readTVarIO (services st)
(ssNtfs, noServiceNtfs) <- if M.null ss then pure ([], ntfs) else foldM addService ([], ntfs) (M.assocs ss)
ns <- readTVarIO (notifiers st)
let (ntfs', deleteNtfs) = partition (\(nId, _) -> M.member nId ns) noServiceNtfs
ssNtfs' = (Nothing, ntfs') : ssNtfs
pure $ Right (ssNtfs', deleteNtfs)
where
addService (ssNtfs, ntfs') (serviceId, s) = do
snIds <- readTVarIO $ serviceNtfQueues s
let (sNtfs, restNtfs) = partition (\(nId, _) -> S.member nId snIds) ntfs'
pure ((Just serviceId, sNtfs) : ssNtfs, restNtfs)
getNtfServiceQueueCount :: STMQueueStore q -> ServiceId -> IO (Either ErrorType Int64)
getNtfServiceQueueCount st serviceId =
TM.lookupIO serviceId (services st) >>=
maybe (pure $ Left AUTH) (fmap (Right . fromIntegral . S.size) . readTVarIO . serviceNtfQueues)
withQueueRec :: TVar (Maybe QueueRec) -> (QueueRec -> STM a) -> IO (Either ErrorType a)
withQueueRec qr a = atomically $ readQueueRec qr >>= mapM a
@@ -238,6 +348,21 @@ setStatus qr status =
Just q -> (Right (), Just q {status})
Nothing -> (Left AUTH, Nothing)
addServiceQueue :: STMQueueStore q -> (STMService -> TVar (Set QueueId)) -> QueueId -> ServiceId -> STM ()
addServiceQueue st serviceSel qId serviceId =
TM.lookup serviceId (services st) >>= mapM_ (\s -> modifyTVar' (serviceSel s) (S.insert qId))
{-# INLINE addServiceQueue #-}
removeServiceQueue :: STMQueueStore q -> (STMService -> TVar (Set QueueId)) -> QueueId -> ServiceId -> STM ()
removeServiceQueue st serviceSel qId serviceId =
TM.lookup serviceId (services st) >>= mapM_ (\s -> modifyTVar' (serviceSel s) (S.delete qId))
{-# INLINE removeServiceQueue #-}
removeNotifier :: STMQueueStore q -> NtfCreds -> STM ()
removeNotifier st NtfCreds {notifierId = nId, ntfServiceId} = do
TM.delete nId $ notifiers st
mapM_ (removeServiceQueue st serviceNtfQueues nId) ntfServiceId
readQueueRec :: TVar (Maybe QueueRec) -> STM (Either ErrorType QueueRec)
readQueueRec qr = maybe (Left AUTH) Right <$> readTVar qr
{-# INLINE readQueueRec #-}
@@ -246,16 +371,16 @@ readQueueRecIO :: TVar (Maybe QueueRec) -> IO (Either ErrorType QueueRec)
readQueueRecIO qr = maybe (Left AUTH) Right <$> readTVarIO qr
{-# INLINE readQueueRecIO #-}
withLog' :: String -> TVar (Maybe (StoreLog 'WriteMode)) -> (StoreLog 'WriteMode -> IO ()) -> IO (Either ErrorType ())
withLog' :: Text -> TVar (Maybe (StoreLog 'WriteMode)) -> (StoreLog 'WriteMode -> IO ()) -> IO (Either ErrorType ())
withLog' name sl action =
readTVarIO sl
>>= maybe (pure $ Right ()) (E.try . E.uninterruptibleMask_ . action >=> bimapM logErr pure)
where
logErr :: E.SomeException -> IO ErrorType
logErr e = logError ("STORE: " <> T.pack err) $> STORE err
logErr e = logError ("STORE: " <> err) $> STORE err
where
err = name <> ", withLog, " <> show e
err = name <> ", withLog, " <> tshow e
withLog :: String -> STMQueueStore q -> (StoreLog 'WriteMode -> IO ()) -> IO (Either ErrorType ())
withLog :: Text -> STMQueueStore q -> (StoreLog 'WriteMode -> IO ()) -> IO (Either ErrorType ())
withLog name = withLog' name . storeLog
{-# INLINE withLog #-}
@@ -11,6 +11,7 @@ import Control.Concurrent.STM
import Control.Monad
import Data.Int (Int64)
import Data.List.NonEmpty (NonEmpty)
import Data.Text (Text)
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.TMap (TMap)
@@ -20,13 +21,13 @@ class StoreQueueClass q where
recipientId :: q -> RecipientId
queueRec :: q -> TVar (Maybe QueueRec)
msgQueue :: q -> TVar (Maybe (MsgQueue q))
withQueueLock :: q -> String -> IO a -> IO a
withQueueLock :: q -> Text -> IO a -> IO a
class StoreQueueClass q => QueueStoreClass q s where
type QueueStoreCfg s
newQueueStore :: QueueStoreCfg s -> IO s
closeQueueStore :: s -> IO ()
queueCounts :: s -> IO QueueCounts
getEntityCounts :: s -> IO EntityCounts
loadedQueues :: s -> TMap RecipientId q
compactQueues :: s -> IO Int64
addQueue_ :: s -> (RecipientId -> QueueRec -> IO q) -> RecipientId -> QueueRec -> IO (Either ErrorType q)
@@ -36,17 +37,25 @@ class StoreQueueClass q => QueueStoreClass q s where
deleteQueueLinkData :: s -> q -> IO (Either ErrorType ())
secureQueue :: s -> q -> SndPublicAuthKey -> IO (Either ErrorType ())
updateKeys :: s -> q -> NonEmpty RcvPublicAuthKey -> IO (Either ErrorType ())
addQueueNotifier :: s -> q -> NtfCreds -> IO (Either ErrorType (Maybe NotifierId))
deleteQueueNotifier :: s -> q -> IO (Either ErrorType (Maybe NotifierId))
addQueueNotifier :: s -> q -> NtfCreds -> IO (Either ErrorType (Maybe NtfCreds))
deleteQueueNotifier :: s -> q -> IO (Either ErrorType (Maybe NtfCreds))
suspendQueue :: s -> q -> IO (Either ErrorType ())
blockQueue :: s -> q -> BlockingInfo -> IO (Either ErrorType ())
unblockQueue :: s -> q -> IO (Either ErrorType ())
updateQueueTime :: s -> q -> RoundedSystemTime -> IO (Either ErrorType QueueRec)
deleteStoreQueue :: s -> q -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue q)))
getCreateService :: s -> ServiceRec -> IO (Either ErrorType ServiceId)
setQueueService :: (PartyI p, SubscriberParty p) => s -> q -> SParty p -> Maybe ServiceId -> IO (Either ErrorType ())
getQueueNtfServices :: s -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)]))
getNtfServiceQueueCount :: s -> ServiceId -> IO (Either ErrorType Int64)
data QueueCounts = QueueCounts
data EntityCounts = EntityCounts
{ queueCount :: Int,
notifierCount :: Int
notifierCount :: Int,
rcvServiceCount :: Int,
ntfServiceCount :: Int,
rcvServiceQueuesCount :: Int,
ntfServiceQueuesCount :: Int
}
withLoadedQueues :: (Monoid a, QueueStoreClass q s) => s -> (q -> IO a) -> IO a