smp: support client notices (#1659)

* agent: support client notices

* improve

* fix, test

* rename

* cleanup

* send and process notices in more cases

* dont delete

* dont remove notice on other permanent errors

* dont remove notice if there is no notice ID in queue

* add server to error

* allow deleting

* only use notice if key hash matches
This commit is contained in:
Evgeny
2025-10-17 18:34:59 +01:00
committed by GitHub
parent 234aeb81dd
commit 1329fc726f
54 changed files with 653 additions and 221 deletions
@@ -85,6 +85,7 @@ import Simplex.Messaging.Server.QueueStore.Postgres.Migrations (serverMigrations
import Simplex.Messaging.Server.QueueStore.STM (STMService (..), readQueueRecIO)
import Simplex.Messaging.Server.QueueStore.Types
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.SystemTime
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (SMPServiceRole (..))
@@ -429,7 +430,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
setStatusDB "unblockQueue" st sq EntityActive $
withLog "unblockQueue" st (`logUnblockQueue` recipientId sq)
updateQueueTime :: PostgresQueueStore q -> q -> RoundedSystemTime -> IO (Either ErrorType QueueRec)
updateQueueTime :: PostgresQueueStore q -> q -> SystemDate -> IO (Either ErrorType QueueRec)
updateQueueTime st sq t =
withQueueRec sq "updateQueueTime" $ \q@QueueRec {updatedAt} ->
if updatedAt == Just t
@@ -641,7 +642,7 @@ type QueueRecRow =
( RecipientId, NonEmpty RcvPublicAuthKey, RcvDhSecret,
SenderId, Maybe SndPublicAuthKey, Maybe QueueMode,
Maybe NotifierId, Maybe NtfPublicAuthKey, Maybe RcvNtfDhSecret, Maybe ServiceId,
ServerEntityStatus, Maybe RoundedSystemTime, Maybe LinkId, Maybe ServiceId
ServerEntityStatus, Maybe SystemDate, Maybe LinkId, Maybe ServiceId
)
queueRecToRow :: (RecipientId, QueueRec) -> QueueRecRow :. (Maybe EncDataBytes, Maybe EncDataBytes)
@@ -709,11 +710,11 @@ mkNotifier (Just notifierId, Just notifierKey, Just rcvNtfDhSecret) ntfServiceId
Just NtfCreds {notifierId, notifierKey, rcvNtfDhSecret, ntfServiceId}
mkNotifier _ _ = Nothing
serviceRecToRow :: ServiceRec -> (ServiceId, SMPServiceRole, X.CertificateChain, Binary ByteString, RoundedSystemTime)
serviceRecToRow :: ServiceRec -> (ServiceId, SMPServiceRole, X.CertificateChain, Binary ByteString, SystemDate)
serviceRecToRow ServiceRec {serviceId, serviceRole, serviceCert, serviceCertHash = XV.Fingerprint fp, serviceCreatedAt} =
(serviceId, serviceRole, serviceCert, Binary fp, serviceCreatedAt)
rowToServiceRec :: (ServiceId, SMPServiceRole, X.CertificateChain, Binary ByteString, RoundedSystemTime) -> ServiceRec
rowToServiceRec :: (ServiceId, SMPServiceRole, X.CertificateChain, Binary ByteString, SystemDate) -> ServiceRec
rowToServiceRec (serviceId, serviceRole, serviceCert, Binary fp, serviceCreatedAt) =
ServiceRec {serviceId, serviceRole, serviceCert, serviceCertHash = XV.Fingerprint fp, serviceCreatedAt}
@@ -792,4 +793,8 @@ instance FromField C.APublicAuthKey where fromField = blobFieldDecoder C.decodeP
instance ToField EncDataBytes where toField (EncDataBytes s) = toField (Binary s)
deriving newtype instance FromField EncDataBytes
deriving newtype instance ToField (RoundedSystemTime t)
deriving newtype instance FromField (RoundedSystemTime t)
#endif
@@ -1,11 +1,11 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
module Simplex.Messaging.Server.QueueStore.Postgres.Migrations where
import Data.List (sortOn)
import Data.Text (Text)
import qualified Data.Text as T
import Simplex.Messaging.Agent.Store.Shared
import Text.RawString.QQ (r)
@@ -26,8 +26,7 @@ serverMigrations = sortOn name $ map migration serverSchemaMigrations
m20250207_initial :: Text
m20250207_initial =
T.pack
[r|
[r|
CREATE TABLE msg_queues(
recipient_id BYTEA NOT NULL,
recipient_key BYTEA NOT NULL,
@@ -51,24 +50,21 @@ CREATE INDEX idx_msg_queues_deleted_at ON msg_queues (deleted_at);
m20250319_updated_index :: Text
m20250319_updated_index =
T.pack
[r|
[r|
DROP INDEX idx_msg_queues_deleted_at;
CREATE INDEX idx_msg_queues_updated_at ON msg_queues (deleted_at, updated_at);
|]
down_m20250319_updated_index :: Text
down_m20250319_updated_index =
T.pack
[r|
[r|
DROP INDEX idx_msg_queues_updated_at;
CREATE INDEX idx_msg_queues_deleted_at ON msg_queues (deleted_at);
|]
m20250320_short_links :: Text
m20250320_short_links =
T.pack
[r|
[r|
ALTER TABLE msg_queues
ADD COLUMN queue_mode TEXT,
ADD COLUMN link_id BYTEA,
@@ -88,8 +84,7 @@ CREATE UNIQUE INDEX idx_msg_queues_link_id ON msg_queues(link_id);
down_m20250320_short_links :: Text
down_m20250320_short_links =
T.pack
[r|
[r|
ALTER TABLE msg_queues ADD COLUMN snd_secure BOOLEAN NOT NULL DEFAULT FALSE;
UPDATE msg_queues SET snd_secure = TRUE WHERE queue_mode = 'M';
@@ -124,8 +119,7 @@ ALTER TABLE msg_queues RENAME COLUMN recipient_keys TO recipient_key;
m20250514_service_certs :: Text
m20250514_service_certs =
T.pack
[r|
[r|
CREATE TABLE services(
service_id BYTEA NOT NULL,
service_role TEXT NOT NULL,
@@ -147,8 +141,7 @@ CREATE INDEX idx_msg_queues_ntf_service_id ON msg_queues(ntf_service_id, deleted
down_m20250514_service_certs :: Text
down_m20250514_service_certs =
T.pack
[r|
[r|
DROP INDEX idx_msg_queues_rcv_service_id;
DROP INDEX idx_msg_queues_ntf_service_id;
@@ -163,8 +156,7 @@ DROP TABLE services;
m20250903_store_messages :: Text
m20250903_store_messages =
T.pack
[r|
[r|
CREATE TABLE messages(
message_id BIGINT NOT NULL PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
recipient_id BYTEA NOT NULL REFERENCES msg_queues ON DELETE CASCADE ON UPDATE RESTRICT,
@@ -434,8 +426,7 @@ $$;
down_m20250903_store_messages :: Text
down_m20250903_store_messages =
T.pack
[r|
[r|
DROP FUNCTION write_message;
DROP FUNCTION try_del_msg;
DROP FUNCTION try_del_peek_msg;
@@ -41,6 +41,7 @@ import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.Server.QueueStore.Types
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.SystemTime
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (SMPServiceRole (..))
@@ -251,7 +252,7 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where
setStatus (queueRec sq) EntityActive
$>> withLog "unblockQueue" st (`logUnblockQueue` recipientId sq)
updateQueueTime :: STMQueueStore q -> q -> RoundedSystemTime -> IO (Either ErrorType QueueRec)
updateQueueTime :: STMQueueStore q -> q -> SystemDate -> IO (Either ErrorType QueueRec)
updateQueueTime st sq t = withQueueRec qr update $>>= log'
where
qr = queueRec sq
@@ -14,6 +14,7 @@ import Data.List.NonEmpty (NonEmpty)
import Data.Text (Text)
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.QueueStore
import Simplex.Messaging.SystemTime
import Simplex.Messaging.TMap (TMap)
class StoreQueueClass q where
@@ -41,7 +42,7 @@ class StoreQueueClass q => QueueStoreClass q s where
suspendQueue :: s -> q -> IO (Either ErrorType ())
blockQueue :: s -> q -> BlockingInfo -> IO (Either ErrorType ())
unblockQueue :: s -> q -> IO (Either ErrorType ())
updateQueueTime :: s -> q -> RoundedSystemTime -> IO (Either ErrorType QueueRec)
updateQueueTime :: s -> q -> SystemDate -> IO (Either ErrorType QueueRec)
deleteStoreQueue :: s -> q -> IO (Either ErrorType QueueRec)
getCreateService :: s -> ServiceRec -> IO (Either ErrorType ServiceId)
setQueueService :: (PartyI p, ServiceParty p) => s -> q -> SParty p -> Maybe ServiceId -> IO (Either ErrorType ())