servers: maintain xor-hash of all associated queue IDs in PostgreSQL (#1668)

* servers: maintain xor-hash of all associated queue IDs in PostgreSQL (#1615)

* ntf server: maintain xor-hash of all associated queue IDs via PostgreSQL triggers

* smp server: xor hash with triggers

* fix sql and using pgcrypto extension in tests

* track counts and hashes in smp/ntf servers via triggers, smp server stats for service subscription, update SMP protocol to pass expected count and hash in SSUB/NSSUB commands

* agent migrations with functions/triggers

* remove agent triggers

* try tracking service subs in the agent (WIP, does not compile)

* Revert "try tracking service subs in the agent (WIP, does not compile)"

This reverts commit 59e908100d.

* comment

* agent database triggers

* service subscriptions in the client

* test / fix client services

* update schema

* fix postgres migration

* update schema

* move schema test to the end

* use static function with SQLite to avoid dynamic wrapper
This commit is contained in:
Evgeny
2025-11-25 16:55:59 +00:00
committed by GitHub
parent 1ca4677b28
commit 3ccf854865
44 changed files with 2969 additions and 331 deletions
@@ -355,8 +355,8 @@ instance QueueStoreClass (JournalQueue s) (QStore s) where
{-# INLINE setQueueService #-}
getQueueNtfServices = withQS (getQueueNtfServices @(JournalQueue s))
{-# INLINE getQueueNtfServices #-}
getServiceQueueCount = withQS (getServiceQueueCount @(JournalQueue s))
{-# INLINE getServiceQueueCount #-}
getServiceQueueCountHash = withQS (getServiceQueueCountHash @(JournalQueue s))
{-# INLINE getServiceQueueCountHash #-}
makeQueue_ :: JournalMsgStore s -> RecipientId -> QueueRec -> Lock -> IO (JournalQueue s)
makeQueue_ JournalMsgStore {sharedLock} rId qr queueLock = do
@@ -21,6 +21,7 @@ import Simplex.Messaging.Transport (simplexMQVersion)
import Simplex.Messaging.Transport.Server (SocketStats (..))
import Simplex.Messaging.Util (tshow)
-- TODO [certs rcv] add service subscriptions and count/hash diffs
data ServerMetrics = ServerMetrics
{ statsData :: ServerStatsData,
activeQueueCounts :: PeriodStatCounts,
@@ -65,6 +65,7 @@ data ServiceRec = ServiceRec
serviceCert :: X.CertificateChain,
serviceCertHash :: XV.Fingerprint, -- SHA512 hash of long-term service client certificate. See comment for ClientHandshake.
serviceCreatedAt :: SystemDate
-- entitiesHash :: IdsHash -- a xor-hash of all associated entities
}
deriving (Show)
@@ -524,15 +524,11 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
let (sNtfs, restNtfs) = partition (\(nId, _) -> S.member nId snIds) ntfs'
in ((serviceId, sNtfs) : ssNtfs, restNtfs)
getServiceQueueCount :: (PartyI p, ServiceParty p) => PostgresQueueStore q -> SParty p -> ServiceId -> IO (Either ErrorType Int64)
getServiceQueueCount st party serviceId =
E.uninterruptibleMask_ $ runExceptT $ withDB' "getServiceQueueCount" st $ \db ->
maybeFirstRow' 0 fromOnly $
DB.query db query (Only serviceId)
where
query = case party of
SRecipientService -> "SELECT count(1) FROM msg_queues WHERE rcv_service_id = ? AND deleted_at IS NULL"
SNotifierService -> "SELECT count(1) FROM msg_queues WHERE ntf_service_id = ? AND deleted_at IS NULL"
getServiceQueueCountHash :: (PartyI p, ServiceParty p) => PostgresQueueStore q -> SParty p -> ServiceId -> IO (Either ErrorType (Int64, IdsHash))
getServiceQueueCountHash st party serviceId =
E.uninterruptibleMask_ $ runExceptT $ withDB' "getServiceQueueCountHash" st $ \db ->
maybeFirstRow' (0, mempty) id $
DB.query db ("SELECT queue_count, queue_ids_hash FROM services WHERE service_id = ? AND service_role = ?") (serviceId, partyServiceRole party)
batchInsertServices :: [STMService] -> PostgresQueueStore q -> IO Int64
batchInsertServices services' toStore =
@@ -793,6 +789,10 @@ instance ToField C.APublicAuthKey where toField = toField . Binary . C.encodePub
instance FromField C.APublicAuthKey where fromField = blobFieldDecoder C.decodePubKey
instance ToField IdsHash where toField (IdsHash s) = toField (Binary s)
deriving newtype instance FromField IdsHash
instance ToField EncDataBytes where toField (EncDataBytes s) = toField (Binary s)
deriving newtype instance FromField EncDataBytes
@@ -7,6 +7,7 @@ module Simplex.Messaging.Server.QueueStore.Postgres.Migrations where
import Data.List (sortOn)
import Data.Text (Text)
import Simplex.Messaging.Agent.Store.Shared
import Simplex.Messaging.Agent.Store.Postgres.Migrations.Util
import Text.RawString.QQ (r)
serverSchemaMigrations :: [(String, Text, Maybe Text)]
@@ -15,7 +16,8 @@ serverSchemaMigrations =
("20250319_updated_index", m20250319_updated_index, Just down_m20250319_updated_index),
("20250320_short_links", m20250320_short_links, Just down_m20250320_short_links),
("20250514_service_certs", m20250514_service_certs, Just down_m20250514_service_certs),
("20250903_store_messages", m20250903_store_messages, Just down_m20250903_store_messages)
("20250903_store_messages", m20250903_store_messages, Just down_m20250903_store_messages),
("20250915_queue_ids_hash", m20250915_queue_ids_hash, Just down_m20250915_queue_ids_hash)
]
-- | The list of migrations in ascending order by date
@@ -447,3 +449,139 @@ ALTER TABLE msg_queues
DROP TABLE messages;
|]
m20250915_queue_ids_hash :: Text
m20250915_queue_ids_hash =
createXorHashFuncs
<> [r|
ALTER TABLE services
ADD COLUMN queue_count BIGINT NOT NULL DEFAULT 0,
ADD COLUMN queue_ids_hash BYTEA NOT NULL DEFAULT '\x00000000000000000000000000000000';
CREATE FUNCTION update_all_aggregates() RETURNS VOID
LANGUAGE plpgsql
AS $$
BEGIN
WITH acc AS (
SELECT
s.service_id,
count(1) as q_count,
xor_aggregate(public.digest(CASE WHEN s.service_role = 'M' THEN q.recipient_id ELSE COALESCE(q.notifier_id, '\x00000000000000000000000000000000') END, 'md5')) AS q_ids_hash
FROM services s
JOIN msg_queues q ON (s.service_id = q.rcv_service_id AND s.service_role = 'M') OR (s.service_id = q.ntf_service_id AND s.service_role = 'N')
WHERE q.deleted_at IS NULL
GROUP BY s.service_id
)
UPDATE services s
SET queue_count = COALESCE(acc.q_count, 0),
queue_ids_hash = COALESCE(acc.q_ids_hash, '\x00000000000000000000000000000000')
FROM acc
WHERE s.service_id = acc.service_id;
END;
$$;
SELECT update_all_aggregates();
CREATE FUNCTION update_aggregates(p_service_id BYTEA, p_role TEXT, p_queue_id BYTEA, p_change BIGINT) RETURNS VOID
LANGUAGE plpgsql
AS $$
BEGIN
UPDATE services
SET queue_count = queue_count + p_change,
queue_ids_hash = xor_combine(queue_ids_hash, public.digest(p_queue_id, 'md5'))
WHERE service_id = p_service_id AND service_role = p_role;
END;
$$;
CREATE FUNCTION on_queue_insert() RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
IF NEW.rcv_service_id IS NOT NULL THEN
PERFORM update_aggregates(NEW.rcv_service_id, 'M', NEW.recipient_id, 1);
END IF;
IF NEW.ntf_service_id IS NOT NULL AND NEW.notifier_id IS NOT NULL THEN
PERFORM update_aggregates(NEW.ntf_service_id, 'N', NEW.notifier_id, 1);
END IF;
RETURN NEW;
END;
$$;
CREATE FUNCTION on_queue_delete() RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
IF OLD.deleted_at IS NULL THEN
IF OLD.rcv_service_id IS NOT NULL THEN
PERFORM update_aggregates(OLD.rcv_service_id, 'M', OLD.recipient_id, -1);
END IF;
IF OLD.ntf_service_id IS NOT NULL AND OLD.notifier_id IS NOT NULL THEN
PERFORM update_aggregates(OLD.ntf_service_id, 'N', OLD.notifier_id, -1);
END IF;
END IF;
RETURN OLD;
END;
$$;
CREATE FUNCTION on_queue_update() RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
IF OLD.deleted_at IS NULL AND OLD.rcv_service_id IS NOT NULL THEN
IF NOT (NEW.deleted_at IS NULL AND NEW.rcv_service_id IS NOT NULL) THEN
PERFORM update_aggregates(OLD.rcv_service_id, 'M', OLD.recipient_id, -1);
ELSIF OLD.rcv_service_id IS DISTINCT FROM NEW.rcv_service_id THEN
PERFORM update_aggregates(OLD.rcv_service_id, 'M', OLD.recipient_id, -1);
PERFORM update_aggregates(NEW.rcv_service_id, 'M', NEW.recipient_id, 1);
END IF;
ELSIF NEW.deleted_at IS NULL AND NEW.rcv_service_id IS NOT NULL THEN
PERFORM update_aggregates(NEW.rcv_service_id, 'M', NEW.recipient_id, 1);
END IF;
IF OLD.deleted_at IS NULL AND OLD.ntf_service_id IS NOT NULL AND OLD.notifier_id IS NOT NULL THEN
IF NOT (NEW.deleted_at IS NULL AND NEW.ntf_service_id IS NOT NULL AND NEW.notifier_id IS NOT NULL) THEN
PERFORM update_aggregates(OLD.ntf_service_id, 'N', OLD.notifier_id, -1);
ELSIF OLD.ntf_service_id IS DISTINCT FROM NEW.ntf_service_id OR OLD.notifier_id IS DISTINCT FROM NEW.notifier_id THEN
PERFORM update_aggregates(OLD.ntf_service_id, 'N', OLD.notifier_id, -1);
PERFORM update_aggregates(NEW.ntf_service_id, 'N', NEW.notifier_id, 1);
END IF;
ELSIF NEW.deleted_at IS NULL AND NEW.ntf_service_id IS NOT NULL AND NEW.notifier_id IS NOT NULL THEN
PERFORM update_aggregates(NEW.ntf_service_id, 'N', NEW.notifier_id, 1);
END IF;
RETURN NEW;
END;
$$;
CREATE TRIGGER tr_queue_insert
AFTER INSERT ON msg_queues
FOR EACH ROW EXECUTE PROCEDURE on_queue_insert();
CREATE TRIGGER tr_queue_delete
AFTER DELETE ON msg_queues
FOR EACH ROW EXECUTE PROCEDURE on_queue_delete();
CREATE TRIGGER tr_queue_update
AFTER UPDATE ON msg_queues
FOR EACH ROW EXECUTE PROCEDURE on_queue_update();
|]
down_m20250915_queue_ids_hash :: Text
down_m20250915_queue_ids_hash =
[r|
DROP TRIGGER tr_queue_insert ON msg_queues;
DROP TRIGGER tr_queue_delete ON msg_queues;
DROP TRIGGER tr_queue_update ON msg_queues;
DROP FUNCTION on_queue_insert;
DROP FUNCTION on_queue_delete;
DROP FUNCTION on_queue_update;
DROP FUNCTION update_aggregates;
DROP FUNCTION update_all_aggregates;
ALTER TABLE services
DROP COLUMN queue_count,
DROP COLUMN queue_ids_hash;
|]
<> dropXorHashFuncs
@@ -104,6 +104,71 @@ $$;
CREATE FUNCTION smp_server.on_queue_delete() RETURNS trigger
LANGUAGE plpgsql
AS $$
BEGIN
IF OLD.deleted_at IS NULL THEN
IF OLD.rcv_service_id IS NOT NULL THEN
PERFORM update_aggregates(OLD.rcv_service_id, 'M', OLD.recipient_id, -1);
END IF;
IF OLD.ntf_service_id IS NOT NULL AND OLD.notifier_id IS NOT NULL THEN
PERFORM update_aggregates(OLD.ntf_service_id, 'N', OLD.notifier_id, -1);
END IF;
END IF;
RETURN OLD;
END;
$$;
CREATE FUNCTION smp_server.on_queue_insert() RETURNS trigger
LANGUAGE plpgsql
AS $$
BEGIN
IF NEW.rcv_service_id IS NOT NULL THEN
PERFORM update_aggregates(NEW.rcv_service_id, 'M', NEW.recipient_id, 1);
END IF;
IF NEW.ntf_service_id IS NOT NULL AND NEW.notifier_id IS NOT NULL THEN
PERFORM update_aggregates(NEW.ntf_service_id, 'N', NEW.notifier_id, 1);
END IF;
RETURN NEW;
END;
$$;
CREATE FUNCTION smp_server.on_queue_update() RETURNS trigger
LANGUAGE plpgsql
AS $$
BEGIN
IF OLD.deleted_at IS NULL AND OLD.rcv_service_id IS NOT NULL THEN
IF NOT (NEW.deleted_at IS NULL AND NEW.rcv_service_id IS NOT NULL) THEN
PERFORM update_aggregates(OLD.rcv_service_id, 'M', OLD.recipient_id, -1);
ELSIF OLD.rcv_service_id IS DISTINCT FROM NEW.rcv_service_id THEN
PERFORM update_aggregates(OLD.rcv_service_id, 'M', OLD.recipient_id, -1);
PERFORM update_aggregates(NEW.rcv_service_id, 'M', NEW.recipient_id, 1);
END IF;
ELSIF NEW.deleted_at IS NULL AND NEW.rcv_service_id IS NOT NULL THEN
PERFORM update_aggregates(NEW.rcv_service_id, 'M', NEW.recipient_id, 1);
END IF;
IF OLD.deleted_at IS NULL AND OLD.ntf_service_id IS NOT NULL AND OLD.notifier_id IS NOT NULL THEN
IF NOT (NEW.deleted_at IS NULL AND NEW.ntf_service_id IS NOT NULL AND NEW.notifier_id IS NOT NULL) THEN
PERFORM update_aggregates(OLD.ntf_service_id, 'N', OLD.notifier_id, -1);
ELSIF OLD.ntf_service_id IS DISTINCT FROM NEW.ntf_service_id OR OLD.notifier_id IS DISTINCT FROM NEW.notifier_id THEN
PERFORM update_aggregates(OLD.ntf_service_id, 'N', OLD.notifier_id, -1);
PERFORM update_aggregates(NEW.ntf_service_id, 'N', NEW.notifier_id, 1);
END IF;
ELSIF NEW.deleted_at IS NULL AND NEW.ntf_service_id IS NOT NULL AND NEW.notifier_id IS NOT NULL THEN
PERFORM update_aggregates(NEW.ntf_service_id, 'N', NEW.notifier_id, 1);
END IF;
RETURN NEW;
END;
$$;
CREATE FUNCTION smp_server.try_del_msg(p_recipient_id bytea, p_msg_id bytea) RETURNS TABLE(r_msg_id bytea, r_msg_ts bigint, r_msg_quota boolean, r_msg_ntf_flag boolean, r_msg_body bytea)
LANGUAGE plpgsql
AS $$
@@ -225,6 +290,43 @@ $$;
CREATE FUNCTION smp_server.update_aggregates(p_service_id bytea, p_role text, p_queue_id bytea, p_change bigint) RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
UPDATE services
SET queue_count = queue_count + p_change,
queue_ids_hash = xor_combine(queue_ids_hash, public.digest(p_queue_id, 'md5'))
WHERE service_id = p_service_id AND service_role = p_role;
END;
$$;
CREATE FUNCTION smp_server.update_all_aggregates() RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
WITH acc AS (
SELECT
s.service_id,
count(1) as q_count,
xor_aggregate(public.digest(CASE WHEN s.service_role = 'M' THEN q.recipient_id ELSE COALESCE(q.notifier_id, '\x00000000000000000000000000000000') END, 'md5')) AS q_ids_hash
FROM services s
JOIN msg_queues q ON (s.service_id = q.rcv_service_id AND s.service_role = 'M') OR (s.service_id = q.ntf_service_id AND s.service_role = 'N')
WHERE q.deleted_at IS NULL
GROUP BY s.service_id
)
UPDATE services s
SET queue_count = COALESCE(acc.q_count, 0),
queue_ids_hash = COALESCE(acc.q_ids_hash, '\x00000000000000000000000000000000')
FROM acc
WHERE s.service_id = acc.service_id;
END;
$$;
CREATE FUNCTION smp_server.write_message(p_recipient_id bytea, p_msg_id bytea, p_msg_ts bigint, p_msg_quota boolean, p_msg_ntf_flag boolean, p_msg_body bytea, p_quota integer) RETURNS TABLE(quota_written boolean, was_empty boolean)
LANGUAGE plpgsql
AS $$
@@ -256,6 +358,34 @@ END;
$$;
CREATE FUNCTION smp_server.xor_combine(state bytea, value bytea) RETURNS bytea
LANGUAGE plpgsql IMMUTABLE STRICT
AS $$
DECLARE
result BYTEA := state;
i INTEGER;
len INTEGER := octet_length(value);
BEGIN
IF octet_length(state) != len THEN
RAISE EXCEPTION 'Inputs must be equal length (% != %)', octet_length(state), len;
END IF;
FOR i IN 0..len-1 LOOP
result := set_byte(result, i, get_byte(state, i) # get_byte(value, i));
END LOOP;
RETURN result;
END;
$$;
CREATE AGGREGATE smp_server.xor_aggregate(bytea) (
SFUNC = smp_server.xor_combine,
STYPE = bytea,
INITCOND = '\x00000000000000000000000000000000'
);
SET default_table_access_method = heap;
@@ -320,7 +450,9 @@ CREATE TABLE smp_server.services (
service_role text NOT NULL,
service_cert bytea NOT NULL,
service_cert_hash bytea NOT NULL,
created_at bigint NOT NULL
created_at bigint NOT NULL,
queue_count bigint DEFAULT 0 NOT NULL,
queue_ids_hash bytea DEFAULT '\x00000000000000000000000000000000'::bytea NOT NULL
);
@@ -390,6 +522,18 @@ CREATE INDEX idx_services_service_role ON smp_server.services USING btree (servi
CREATE TRIGGER tr_queue_delete AFTER DELETE ON smp_server.msg_queues FOR EACH ROW EXECUTE FUNCTION smp_server.on_queue_delete();
CREATE TRIGGER tr_queue_insert AFTER INSERT ON smp_server.msg_queues FOR EACH ROW EXECUTE FUNCTION smp_server.on_queue_insert();
CREATE TRIGGER tr_queue_update AFTER UPDATE ON smp_server.msg_queues FOR EACH ROW EXECUTE FUNCTION smp_server.on_queue_update();
ALTER TABLE ONLY smp_server.messages
ADD CONSTRAINT messages_recipient_id_fkey FOREIGN KEY (recipient_id) REFERENCES smp_server.msg_queues(recipient_id) ON UPDATE RESTRICT ON DELETE CASCADE;
+26 -18
View File
@@ -28,6 +28,7 @@ where
import qualified Control.Exception as E
import Control.Logger.Simple
import Control.Monad
import Data.Bifunctor (first)
import Data.Bitraversable (bimapM)
import Data.Functor (($>))
import Data.Int (Int64)
@@ -62,8 +63,8 @@ data STMQueueStore q = STMQueueStore
data STMService = STMService
{ serviceRec :: ServiceRec,
serviceRcvQueues :: TVar (Set RecipientId),
serviceNtfQueues :: TVar (Set NotifierId)
serviceRcvQueues :: TVar (Set RecipientId, IdsHash), -- TODO [certs rcv] get/maintain hash
serviceNtfQueues :: TVar (Set NotifierId, IdsHash) -- TODO [certs rcv] get/maintain hash
}
setStoreLog :: STMQueueStore q -> StoreLog 'WriteMode -> IO ()
@@ -113,7 +114,7 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where
}
where
serviceCount role = M.foldl' (\ !n s -> if serviceRole (serviceRec s) == role then n + 1 else n) 0
serviceQueuesCount serviceSel = foldM (\n s -> (n +) . S.size <$> readTVarIO (serviceSel s)) 0
serviceQueuesCount serviceSel = foldM (\n s -> (n +) . S.size . fst <$> readTVarIO (serviceSel s)) 0
addQueue_ :: STMQueueStore q -> (RecipientId -> QueueRec -> IO q) -> RecipientId -> QueueRec -> IO (Either ErrorType q)
addQueue_ st mkQ rId qr@QueueRec {senderId = sId, notifier, queueData, rcvServiceId} = do
@@ -304,8 +305,8 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where
TM.insert fp newSrvId serviceCerts
pure $ Right (newSrvId, True)
newSTMService = do
serviceRcvQueues <- newTVar S.empty
serviceNtfQueues <- newTVar S.empty
serviceRcvQueues <- newTVar (S.empty, mempty)
serviceNtfQueues <- newTVar (S.empty, mempty)
pure STMService {serviceRec = sr, serviceRcvQueues, serviceNtfQueues}
setQueueService :: (PartyI p, ServiceParty p) => STMQueueStore q -> q -> SParty p -> Maybe ServiceId -> IO (Either ErrorType ())
@@ -331,7 +332,7 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where
let !q' = Just q {notifier = Just nc {ntfServiceId = serviceId}}
updateServiceQueues serviceNtfQueues nId prevNtfSrvId
writeTVar qr q' $> Right ()
updateServiceQueues :: (STMService -> TVar (Set QueueId)) -> QueueId -> Maybe ServiceId -> STM ()
updateServiceQueues :: (STMService -> TVar (Set QueueId, IdsHash)) -> QueueId -> Maybe ServiceId -> STM ()
updateServiceQueues serviceSel qId prevSrvId = do
mapM_ (removeServiceQueue st serviceSel qId) prevSrvId
mapM_ (addServiceQueue st serviceSel qId) serviceId
@@ -346,16 +347,16 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where
pure $ Right (ssNtfs', deleteNtfs)
where
addService (ssNtfs, ntfs') (serviceId, s) = do
snIds <- readTVarIO $ serviceNtfQueues s
(snIds, _) <- readTVarIO $ serviceNtfQueues s
let (sNtfs, restNtfs) = partition (\(nId, _) -> S.member nId snIds) ntfs'
pure ((Just serviceId, sNtfs) : ssNtfs, restNtfs)
getServiceQueueCount :: (PartyI p, ServiceParty p) => STMQueueStore q -> SParty p -> ServiceId -> IO (Either ErrorType Int64)
getServiceQueueCount st party serviceId =
getServiceQueueCountHash :: (PartyI p, ServiceParty p) => STMQueueStore q -> SParty p -> ServiceId -> IO (Either ErrorType (Int64, IdsHash))
getServiceQueueCountHash st party serviceId =
TM.lookupIO serviceId (services st) >>=
maybe (pure $ Left AUTH) (fmap (Right . fromIntegral . S.size) . readTVarIO . serviceSel)
maybe (pure $ Left AUTH) (fmap (Right . first (fromIntegral . S.size)) . readTVarIO . serviceSel)
where
serviceSel :: STMService -> TVar (Set QueueId)
serviceSel :: STMService -> TVar (Set QueueId, IdsHash)
serviceSel = case party of
SRecipientService -> serviceRcvQueues
SNotifierService -> serviceNtfQueues
@@ -366,7 +367,7 @@ foldRcvServiceQueues st serviceId f acc =
Nothing -> pure acc
Just s ->
readTVarIO (serviceRcvQueues s)
>>= foldM (\a -> get >=> maybe (pure a) (f a)) acc
>>= foldM (\a -> get >=> maybe (pure a) (f a)) acc . fst
where
get rId = TM.lookupIO rId (queues st) $>>= \q -> (q,) <$$> readTVarIO (queueRec q)
@@ -379,16 +380,23 @@ setStatus qr status =
Just q -> (Right (), Just q {status})
Nothing -> (Left AUTH, Nothing)
addServiceQueue :: STMQueueStore q -> (STMService -> TVar (Set QueueId)) -> QueueId -> ServiceId -> STM ()
addServiceQueue st serviceSel qId serviceId =
TM.lookup serviceId (services st) >>= mapM_ (\s -> modifyTVar' (serviceSel s) (S.insert qId))
addServiceQueue :: STMQueueStore q -> (STMService -> TVar (Set QueueId, IdsHash)) -> QueueId -> ServiceId -> STM ()
addServiceQueue = setServiceQueues_ S.insert
{-# INLINE addServiceQueue #-}
removeServiceQueue :: STMQueueStore q -> (STMService -> TVar (Set QueueId)) -> QueueId -> ServiceId -> STM ()
removeServiceQueue st serviceSel qId serviceId =
TM.lookup serviceId (services st) >>= mapM_ (\s -> modifyTVar' (serviceSel s) (S.delete qId))
removeServiceQueue :: STMQueueStore q -> (STMService -> TVar (Set QueueId, IdsHash)) -> QueueId -> ServiceId -> STM ()
removeServiceQueue = setServiceQueues_ S.delete
{-# INLINE removeServiceQueue #-}
setServiceQueues_ :: (QueueId -> Set QueueId -> Set QueueId) -> STMQueueStore q -> (STMService -> TVar (Set QueueId, IdsHash)) -> QueueId -> ServiceId -> STM ()
setServiceQueues_ updateSet st serviceSel qId serviceId =
TM.lookup serviceId (services st) >>= mapM_ (\v -> modifyTVar' (serviceSel v) update)
where
update (s, idsHash) =
let !s' = updateSet qId s
!idsHash' = queueIdHash qId <> idsHash
in (s', idsHash')
removeNotifier :: STMQueueStore q -> NtfCreds -> STM ()
removeNotifier st NtfCreds {notifierId = nId, ntfServiceId} = do
TM.delete nId $ notifiers st
@@ -47,7 +47,7 @@ class StoreQueueClass q => QueueStoreClass q s where
getCreateService :: s -> ServiceRec -> IO (Either ErrorType ServiceId)
setQueueService :: (PartyI p, ServiceParty p) => s -> q -> SParty p -> Maybe ServiceId -> IO (Either ErrorType ())
getQueueNtfServices :: s -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)]))
getServiceQueueCount :: (PartyI p, ServiceParty p) => s -> SParty p -> ServiceId -> IO (Either ErrorType Int64)
getServiceQueueCountHash :: (PartyI p, ServiceParty p) => s -> SParty p -> ServiceId -> IO (Either ErrorType (Int64, IdsHash))
data EntityCounts = EntityCounts
{ queueCount :: Int,
+75 -7
View File
@@ -821,7 +821,15 @@ data ServiceStats = ServiceStats
srvSubCount :: IORef Int,
srvSubDuplicate :: IORef Int,
srvSubQueues :: IORef Int,
srvSubEnd :: IORef Int
srvSubEnd :: IORef Int,
-- counts of subscriptions
srvSubOk :: IORef Int, -- server has the same queues as expected
srvSubMore :: IORef Int, -- server has more queues than expected
srvSubFewer :: IORef Int, -- server has fewer queues than expected
srvSubDiff :: IORef Int, -- server has the same count, but different queues than expected (based on xor-hash)
-- adds actual deviations
srvSubMoreTotal :: IORef Int, -- server has more queues than expected, adds diff
srvSubFewerTotal :: IORef Int
}
data ServiceStatsData = ServiceStatsData
@@ -832,7 +840,13 @@ data ServiceStatsData = ServiceStatsData
_srvSubCount :: Int,
_srvSubDuplicate :: Int,
_srvSubQueues :: Int,
_srvSubEnd :: Int
_srvSubEnd :: Int,
_srvSubOk :: Int,
_srvSubMore :: Int,
_srvSubFewer :: Int,
_srvSubDiff :: Int,
_srvSubMoreTotal :: Int,
_srvSubFewerTotal :: Int
}
deriving (Show)
@@ -846,7 +860,13 @@ newServiceStatsData =
_srvSubCount = 0,
_srvSubDuplicate = 0,
_srvSubQueues = 0,
_srvSubEnd = 0
_srvSubEnd = 0,
_srvSubOk = 0,
_srvSubMore = 0,
_srvSubFewer = 0,
_srvSubDiff = 0,
_srvSubMoreTotal = 0,
_srvSubFewerTotal = 0
}
newServiceStats :: IO ServiceStats
@@ -859,6 +879,12 @@ newServiceStats = do
srvSubDuplicate <- newIORef 0
srvSubQueues <- newIORef 0
srvSubEnd <- newIORef 0
srvSubOk <- newIORef 0
srvSubMore <- newIORef 0
srvSubFewer <- newIORef 0
srvSubDiff <- newIORef 0
srvSubMoreTotal <- newIORef 0
srvSubFewerTotal <- newIORef 0
pure
ServiceStats
{ srvAssocNew,
@@ -868,7 +894,13 @@ newServiceStats = do
srvSubCount,
srvSubDuplicate,
srvSubQueues,
srvSubEnd
srvSubEnd,
srvSubOk,
srvSubMore,
srvSubFewer,
srvSubDiff,
srvSubMoreTotal,
srvSubFewerTotal
}
getServiceStatsData :: ServiceStats -> IO ServiceStatsData
@@ -881,6 +913,12 @@ getServiceStatsData s = do
_srvSubDuplicate <- readIORef $ srvSubDuplicate s
_srvSubQueues <- readIORef $ srvSubQueues s
_srvSubEnd <- readIORef $ srvSubEnd s
_srvSubOk <- readIORef $ srvSubOk s
_srvSubMore <- readIORef $ srvSubMore s
_srvSubFewer <- readIORef $ srvSubFewer s
_srvSubDiff <- readIORef $ srvSubDiff s
_srvSubMoreTotal <- readIORef $ srvSubMoreTotal s
_srvSubFewerTotal <- readIORef $ srvSubFewerTotal s
pure
ServiceStatsData
{ _srvAssocNew,
@@ -890,7 +928,13 @@ getServiceStatsData s = do
_srvSubCount,
_srvSubDuplicate,
_srvSubQueues,
_srvSubEnd
_srvSubEnd,
_srvSubOk,
_srvSubMore,
_srvSubFewer,
_srvSubDiff,
_srvSubMoreTotal,
_srvSubFewerTotal
}
getResetServiceStatsData :: ServiceStats -> IO ServiceStatsData
@@ -903,6 +947,12 @@ getResetServiceStatsData s = do
_srvSubDuplicate <- atomicSwapIORef (srvSubDuplicate s) 0
_srvSubQueues <- atomicSwapIORef (srvSubQueues s) 0
_srvSubEnd <- atomicSwapIORef (srvSubEnd s) 0
_srvSubOk <- atomicSwapIORef (srvSubOk s) 0
_srvSubMore <- atomicSwapIORef (srvSubMore s) 0
_srvSubFewer <- atomicSwapIORef (srvSubFewer s) 0
_srvSubDiff <- atomicSwapIORef (srvSubDiff s) 0
_srvSubMoreTotal <- atomicSwapIORef (srvSubMoreTotal s) 0
_srvSubFewerTotal <- atomicSwapIORef (srvSubFewerTotal s) 0
pure
ServiceStatsData
{ _srvAssocNew,
@@ -912,7 +962,13 @@ getResetServiceStatsData s = do
_srvSubCount,
_srvSubDuplicate,
_srvSubQueues,
_srvSubEnd
_srvSubEnd,
_srvSubOk,
_srvSubMore,
_srvSubFewer,
_srvSubDiff,
_srvSubMoreTotal,
_srvSubFewerTotal
}
-- this function is not thread safe, it is used on server start only
@@ -926,6 +982,12 @@ setServiceStats s d = do
writeIORef (srvSubDuplicate s) $! _srvSubDuplicate d
writeIORef (srvSubQueues s) $! _srvSubQueues d
writeIORef (srvSubEnd s) $! _srvSubEnd d
writeIORef (srvSubOk s) $! _srvSubOk d
writeIORef (srvSubMore s) $! _srvSubMore d
writeIORef (srvSubFewer s) $! _srvSubFewer d
writeIORef (srvSubDiff s) $! _srvSubDiff d
writeIORef (srvSubMoreTotal s) $! _srvSubMoreTotal d
writeIORef (srvSubFewerTotal s) $! _srvSubFewerTotal d
instance StrEncoding ServiceStatsData where
strEncode ServiceStatsData {_srvAssocNew, _srvAssocDuplicate, _srvAssocUpdated, _srvAssocRemoved, _srvSubCount, _srvSubDuplicate, _srvSubQueues, _srvSubEnd} =
@@ -963,7 +1025,13 @@ instance StrEncoding ServiceStatsData where
_srvSubCount,
_srvSubDuplicate,
_srvSubQueues,
_srvSubEnd
_srvSubEnd,
_srvSubOk = 0,
_srvSubMore = 0,
_srvSubFewer = 0,
_srvSubDiff = 0,
_srvSubMoreTotal = 0,
_srvSubFewerTotal = 0
}
data TimeBuckets = TimeBuckets
@@ -61,7 +61,7 @@ readQueueStore tty mkQ f st = readLogLines tty f $ \_ -> processLine
Left e -> logError $ errPfx <> tshow e
where
errPfx = "STORE: getCreateService, stored service " <> decodeLatin1 (strEncode serviceId) <> ", "
QueueService rId (ASP party) serviceId -> withQueue rId "QueueService" $ \q -> setQueueService st q party serviceId
QueueService qId (ASP party) serviceId -> withQueue qId "QueueService" $ \q -> setQueueService st q party serviceId
printError :: String -> IO ()
printError e = B.putStrLn $ "Error parsing log: " <> B.pack e <> " - " <> s
withQueue :: forall a. RecipientId -> T.Text -> (q -> IO (Either ErrorType a)) -> IO ()