diff --git a/simplexmq.cabal b/simplexmq.cabal index cf72107d4..f5ae06870 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -103,6 +103,7 @@ library Simplex.Messaging.Agent.Store.AgentStore Simplex.Messaging.Agent.Store.Common Simplex.Messaging.Agent.Store.DB + Simplex.Messaging.Agent.Store.Entity Simplex.Messaging.Agent.Store.Interface Simplex.Messaging.Agent.Store.Migrations Simplex.Messaging.Agent.Store.Migrations.App @@ -130,12 +131,13 @@ library Simplex.Messaging.Notifications.Types Simplex.Messaging.Parsers Simplex.Messaging.Protocol + Simplex.Messaging.Protocol.Types Simplex.Messaging.Server.Expiration Simplex.Messaging.Server.QueueStore.Postgres.Config Simplex.Messaging.Server.QueueStore.QueueInfo Simplex.Messaging.ServiceScheme Simplex.Messaging.Session - Simplex.Messaging.Agent.Store.Entity + Simplex.Messaging.SystemTime Simplex.Messaging.TMap Simplex.Messaging.Transport Simplex.Messaging.Transport.Buffer @@ -164,6 +166,7 @@ library Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250322_short_links Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250702_conn_invitations_remove_cascade_delete Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251009_queue_to_subscribe + Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251010_client_notices else exposed-modules: Simplex.Messaging.Agent.Store.SQLite @@ -212,6 +215,7 @@ library Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250322_short_links Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250702_conn_invitations_remove_cascade_delete Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251009_queue_to_subscribe + Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251010_client_notices if flag(client_postgres) || flag(server_postgres) exposed-modules: Simplex.Messaging.Agent.Store.Postgres diff --git a/src/Simplex/FileTransfer/Server.hs b/src/Simplex/FileTransfer/Server.hs index c169e69b0..25de49afc 100644 --- a/src/Simplex/FileTransfer/Server.hs +++ b/src/Simplex/FileTransfer/Server.hs @@ -58,8 +58,9 @@ import Simplex.Messaging.Protocol (BlockingInfo, EntityId (..), RcvPublicAuthKey import Simplex.Messaging.Server (controlPortAuth, dummyVerifyCmd, verifyCmdAuthorization) import Simplex.Messaging.Server.Control (CPClientRole (..)) import Simplex.Messaging.Server.Expiration -import Simplex.Messaging.Server.QueueStore (RoundedSystemTime, ServerEntityStatus (..), getRoundedSystemTime) +import Simplex.Messaging.Server.QueueStore (ServerEntityStatus (..)) import Simplex.Messaging.Server.Stats +import Simplex.Messaging.SystemTime import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (CertChainPubKey (..), SessionId, THandleAuth (..), THandleParams (..), TransportPeer (..), defaultSupportedParams) @@ -451,7 +452,7 @@ processXFTPRequest HTTP2Body {bodyPart} = \case let rIds = L.map (\(FileRecipient rId _) -> rId) rcps pure $ FRSndIds sId rIds pure $ either FRErr id r - addFileRetry :: FileStore -> FileInfo -> Int -> RoundedSystemTime -> M (Either XFTPErrorType XFTPFileId) + addFileRetry :: FileStore -> FileInfo -> Int -> RoundedFileTime -> M (Either XFTPErrorType XFTPFileId) addFileRetry st file n ts = retryAdd n $ \sId -> runExceptT $ do ExceptT $ addFile st sId file ts EntityActive @@ -579,8 +580,8 @@ deleteOrBlockServerFile_ FileRec {filePath, fileInfo} stat storeAction = runExce liftIO $ atomicModifyIORef'_ (filesCount stats) (subtract 1) liftIO $ atomicModifyIORef'_ (filesSize stats) (subtract $ fromIntegral $ size fileInfo) -getFileTime :: IO RoundedSystemTime -getFileTime = getRoundedSystemTime fileTimePrecision +getFileTime :: IO RoundedFileTime +getFileTime = getRoundedSystemTime expireServerFiles :: Maybe Int -> ExpirationConfig -> M () expireServerFiles itemDelay expCfg = do diff --git a/src/Simplex/FileTransfer/Server/Store.hs b/src/Simplex/FileTransfer/Server/Store.hs index f59712fc0..eec481a21 100644 --- a/src/Simplex/FileTransfer/Server/Store.hs +++ b/src/Simplex/FileTransfer/Server/Store.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE DataKinds #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} @@ -8,6 +9,7 @@ module Simplex.FileTransfer.Server.Store ( FileStore (..), FileRec (..), FileRecipient (..), + RoundedFileTime, newFileStore, addFile, setFilePath, @@ -33,7 +35,8 @@ import Simplex.FileTransfer.Transport (XFTPErrorType (..)) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (BlockingInfo, RcvPublicAuthKey, RecipientId, SenderId) -import Simplex.Messaging.Server.QueueStore (RoundedSystemTime (..), ServerEntityStatus (..)) +import Simplex.Messaging.Server.QueueStore (ServerEntityStatus (..)) +import Simplex.Messaging.SystemTime import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Util (ifM, ($>>=)) @@ -49,10 +52,12 @@ data FileRec = FileRec fileInfo :: FileInfo, filePath :: TVar (Maybe FilePath), recipientIds :: TVar (Set RecipientId), - createdAt :: RoundedSystemTime, + createdAt :: RoundedFileTime, fileStatus :: TVar ServerEntityStatus } +type RoundedFileTime = RoundedSystemTime 3600 + fileTimePrecision :: Int64 fileTimePrecision = 3600 -- truncate creation time to 1 hour @@ -70,14 +75,14 @@ newFileStore = do usedStorage <- newTVarIO 0 pure FileStore {files, recipients, usedStorage} -addFile :: FileStore -> SenderId -> FileInfo -> RoundedSystemTime -> ServerEntityStatus -> STM (Either XFTPErrorType ()) +addFile :: FileStore -> SenderId -> FileInfo -> RoundedFileTime -> ServerEntityStatus -> STM (Either XFTPErrorType ()) addFile FileStore {files} sId fileInfo createdAt status = ifM (TM.member sId files) (pure $ Left DUPLICATE_) $ do f <- newFileRec sId fileInfo createdAt status TM.insert sId f files pure $ Right () -newFileRec :: SenderId -> FileInfo -> RoundedSystemTime -> ServerEntityStatus -> STM FileRec +newFileRec :: SenderId -> FileInfo -> RoundedFileTime -> ServerEntityStatus -> STM FileRec newFileRec senderId fileInfo createdAt status = do recipientIds <- newTVar S.empty filePath <- newTVar Nothing diff --git a/src/Simplex/FileTransfer/Server/StoreLog.hs b/src/Simplex/FileTransfer/Server/StoreLog.hs index c972da281..c82beda29 100644 --- a/src/Simplex/FileTransfer/Server/StoreLog.hs +++ b/src/Simplex/FileTransfer/Server/StoreLog.hs @@ -34,13 +34,13 @@ import Simplex.FileTransfer.Protocol (FileInfo (..)) import Simplex.FileTransfer.Server.Store import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (BlockingInfo, RcvPublicAuthKey, RecipientId, SenderId) -import Simplex.Messaging.Server.QueueStore (RoundedSystemTime, ServerEntityStatus (..)) +import Simplex.Messaging.Server.QueueStore (ServerEntityStatus (..)) import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.Util (bshow) import System.IO data FileStoreLogRecord - = AddFile SenderId FileInfo RoundedSystemTime ServerEntityStatus + = AddFile SenderId FileInfo RoundedFileTime ServerEntityStatus | PutFile SenderId FilePath | AddRecipients SenderId (NonEmpty FileRecipient) | DeleteFile SenderId @@ -69,7 +69,7 @@ instance StrEncoding FileStoreLogRecord where logFileStoreRecord :: StoreLog 'WriteMode -> FileStoreLogRecord -> IO () logFileStoreRecord = writeStoreLogRecord -logAddFile :: StoreLog 'WriteMode -> SenderId -> FileInfo -> RoundedSystemTime -> ServerEntityStatus -> IO () +logAddFile :: StoreLog 'WriteMode -> SenderId -> FileInfo -> RoundedFileTime -> ServerEntityStatus -> IO () logAddFile s = logFileStoreRecord s .:: AddFile logPutFile :: StoreLog 'WriteMode -> SenderId -> FilePath -> IO () diff --git a/src/Simplex/FileTransfer/Types.hs b/src/Simplex/FileTransfer/Types.hs index 953080480..aa465a12e 100644 --- a/src/Simplex/FileTransfer/Types.hs +++ b/src/Simplex/FileTransfer/Types.hs @@ -15,6 +15,7 @@ import Data.Text.Encoding (encodeUtf8) import Data.Word (Word32) import Simplex.FileTransfer.Client (XFTPChunkSpec (..)) import Simplex.FileTransfer.Description +import Simplex.Messaging.Agent.Store.DB (FromField (..), ToField (..), fromTextField_) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Crypto.File (CryptoFile (..)) import Simplex.Messaging.Encoding @@ -22,7 +23,6 @@ import Simplex.Messaging.Encoding.String import Simplex.Messaging.Parsers import Simplex.Messaging.Protocol (XFTPServer) import System.FilePath (()) -import Simplex.Messaging.Agent.Store.DB (FromField (..), ToField (..), fromTextField_) type RcvFileId = ByteString -- Agent entity ID diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index c8b646ded..9bcd09209 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -151,7 +151,7 @@ import qualified Data.Aeson.TH as JQ import Data.Bifunctor (bimap, first) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B -import Data.Composition ((.:), (.:.), (.::), (.::.)) +import Data.Composition import Data.Either (isRight, partitionEithers, rights) import Data.Foldable (foldl', toList) import Data.Functor (($>)) @@ -189,10 +189,11 @@ import Simplex.Messaging.Agent.Store import Simplex.Messaging.Agent.Store.AgentStore import Simplex.Messaging.Agent.Store.Common (DBStore) import qualified Simplex.Messaging.Agent.Store.DB as DB +import Simplex.Messaging.Agent.Store.Entity import Simplex.Messaging.Agent.Store.Interface (closeDBStore, execSQL, getCurrentMigrations) import Simplex.Messaging.Agent.Store.Shared (UpMigration (..), upMigration) import qualified Simplex.Messaging.Agent.TSessionSubs as SS -import Simplex.Messaging.Client (NetworkRequestMode (..), SMPClientError, ServerTransmission (..), ServerTransmissionBatch, nonBlockingWriteTBQueue, temporaryClientError, unexpectedResponse) +import Simplex.Messaging.Client (NetworkRequestMode (..), SMPClientError, ServerTransmission (..), ServerTransmissionBatch, nonBlockingWriteTBQueue, smpErrorClientNotice, temporaryClientError, unexpectedResponse) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Crypto.File (CryptoFile, CryptoFileArgs) import Simplex.Messaging.Crypto.Ratchet (PQEncryption, PQSupport (..), pattern PQEncOff, pattern PQEncOn, pattern PQSupportOff, pattern PQSupportOn) @@ -227,7 +228,7 @@ import Simplex.Messaging.Protocol ) import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.ServiceScheme (ServiceScheme (..)) -import Simplex.Messaging.Agent.Store.Entity +import Simplex.Messaging.SystemTime import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (SMPVersion) import Simplex.Messaging.Util @@ -251,13 +252,14 @@ getSMPAgentClient = getSMPAgentClient_ 1 {-# INLINE getSMPAgentClient #-} getSMPAgentClient_ :: Int -> AgentConfig -> InitialAgentServers -> DBStore -> Bool -> IO AgentClient -getSMPAgentClient_ clientId cfg initServers@InitialAgentServers {smp, xftp} store backgroundMode = +getSMPAgentClient_ clientId cfg initServers@InitialAgentServers {smp, xftp, presetServers} store backgroundMode = newSMPAgentEnv cfg store >>= runReaderT runAgent where runAgent = do liftIO $ checkServers "SMP" smp >> checkServers "XFTP" xftp currentTs <- liftIO getCurrentTime - c@AgentClient {acThread} <- liftIO . newAgentClient clientId initServers currentTs =<< ask + notices <- liftIO $ withTransaction store (`getClientNotices` presetServers) `catchAll_` pure [] + c@AgentClient {acThread} <- liftIO . newAgentClient clientId initServers currentTs notices =<< ask t <- runAgentThreads c `forkFinally` const (liftIO $ disconnectAgentClient c) atomically . writeTVar acThread . Just =<< mkWeakThreadId t pure c @@ -379,8 +381,8 @@ deleteConnectionsAsync c waitDelivery = withAgentEnv c . deleteConnectionsAsync' {-# INLINE deleteConnectionsAsync #-} -- | Create SMP agent connection (NEW command) -createConnection :: ConnectionModeI c => AgentClient -> NetworkRequestMode -> UserId -> Bool -> SConnectionMode c -> Maybe UserLinkData -> Maybe CRClientData -> CR.InitialKeys -> SubscriptionMode -> AE (ConnId, (CreatedConnLink c, Maybe ClientServiceId)) -createConnection c nm userId enableNtfs = withAgentEnv c .::. newConn c nm userId enableNtfs +createConnection :: ConnectionModeI c => AgentClient -> NetworkRequestMode -> UserId -> Bool -> Bool -> SConnectionMode c -> Maybe UserLinkData -> Maybe CRClientData -> CR.InitialKeys -> SubscriptionMode -> AE (ConnId, (CreatedConnLink c, Maybe ClientServiceId)) +createConnection c nm userId enableNtfs checkNotices = withAgentEnv c .::. newConn c nm userId enableNtfs checkNotices {-# INLINE createConnection #-} -- | Create or update user's contact connection short link @@ -863,13 +865,27 @@ switchConnectionAsync' c corrId connId = connectionStats c $ DuplexConnection cData rqs' sqs _ -> throwE $ CMD PROHIBITED "switchConnectionAsync: not duplex" -newConn :: ConnectionModeI c => AgentClient -> NetworkRequestMode -> UserId -> Bool -> SConnectionMode c -> Maybe UserLinkData -> Maybe CRClientData -> CR.InitialKeys -> SubscriptionMode -> AM (ConnId, (CreatedConnLink c, Maybe ClientServiceId)) -newConn c nm userId enableNtfs cMode userData_ clientData pqInitKeys subMode = do +newConn :: ConnectionModeI c => AgentClient -> NetworkRequestMode -> UserId -> Bool -> Bool -> SConnectionMode c -> Maybe UserLinkData -> Maybe CRClientData -> CR.InitialKeys -> SubscriptionMode -> AM (ConnId, (CreatedConnLink c, Maybe ClientServiceId)) +newConn c nm userId enableNtfs checkNotices cMode userData_ clientData pqInitKeys subMode = do srv <- getSMPServer c userId + when (checkNotices && connMode cMode == CMContact) $ checkClientNotices c srv connId <- newConnNoQueues c userId enableNtfs cMode (CR.connPQEncryption pqInitKeys) (connId,) <$> newRcvConnSrv c nm userId connId enableNtfs cMode userData_ clientData pqInitKeys subMode srv `catchE` \e -> withStore' c (`deleteConnRecord` connId) >> throwE e +checkClientNotices :: AgentClient -> SMPServerWithAuth -> AM () +checkClientNotices AgentClient {clientNotices, presetServers} (ProtoServerWithAuth srv@(ProtocolServer {host}) _) = do + notices <- readTVarIO clientNotices + unless (M.null notices) $ checkNotices notices =<< liftIO getSystemSeconds + where + srvKey + | isPresetServer srv presetServers = Nothing -- Nothing is used as key for preset servers + | otherwise = Just srv + checkNotices notices ts = + forM_ (M.lookup srvKey notices) $ \expires_ -> + when (maybe True (ts <) expires_) $ + throwError NOTICE {server = safeDecodeUtf8 $ strEncode $ L.head host, preset = isNothing srvKey, expiresAt = roundedToUTCTime <$> expires_} + setConnShortLink' :: AgentClient -> NetworkRequestMode -> ConnId -> SConnectionMode c -> UserLinkData -> Maybe CRClientData -> AM (ConnShortLink c) setConnShortLink' c nm connId cMode userData clientData = withConnLock c connId "setConnShortLink" $ do @@ -2794,18 +2810,20 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId STEvent msgOrErr -> withRcvConn entId $ \rq@RcvQueue {connId} conn -> case msgOrErr of Right msg -> runProcessSMP rq conn (toConnData conn) msg - Left e -> lift $ notifyErr connId e + Left e -> lift $ do + processClientNotice rq e + notifyErr connId e STResponse (Cmd SRecipient cmd) respOrErr -> withRcvConn entId $ \rq conn -> case cmd of SMP.SUB -> case respOrErr of - Right SMP.OK -> processSubOk rq upConnIds + Right SMP.OK -> liftIO $ processSubOk rq upConnIds -- TODO [certs rcv] associate queue with the service - Right (SMP.SOK serviceId_) -> processSubOk rq upConnIds + Right (SMP.SOK serviceId_) -> liftIO $ processSubOk rq upConnIds Right msg@SMP.MSG {} -> do - processSubOk rq upConnIds -- the connection is UP even when processing this particular message fails + liftIO $ processSubOk rq upConnIds -- the connection is UP even when processing this particular message fails runProcessSMP rq conn (toConnData conn) msg - Right r -> processSubErr rq $ unexpectedResponse r - Left e -> unless (temporaryClientError e) $ processSubErr rq e -- timeout/network was already reported + Right r -> lift $ processSubErr rq $ unexpectedResponse r + Left e -> lift $ unless (temporaryClientError e) $ processSubErr rq e -- timeout/network was already reported SMP.ACK _ -> case respOrErr of Right msg@SMP.MSG {} -> runProcessSMP rq conn (toConnData conn) msg _ -> pure () -- TODO process OK response to ACK @@ -2827,21 +2845,28 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId tryAllErrors' (a rq conn) >>= \case Left e -> notify' connId (ERR e) Right () -> pure () - processSubOk :: RcvQueue -> TVar [ConnId] -> AM () + processSubOk :: RcvQueue -> TVar [ConnId] -> IO () processSubOk rq@RcvQueue {connId} upConnIds = atomically . whenM (isPendingSub rq) $ do SS.addActiveSub tSess sessId (rcvQueueSub rq) $ currentSubs c modifyTVar' upConnIds (connId :) - processSubErr :: RcvQueue -> SMPClientError -> AM () + processSubErr :: RcvQueue -> SMPClientError -> AM' () processSubErr rq@RcvQueue {connId} e = do atomically . whenM (isPendingSub rq) $ failSubscription c tSess rq e >> incSMPServerStat c userId srv connSubErrs - lift $ notifyErr connId e + processClientNotice rq e + notifyErr connId e isPendingSub :: RcvQueue -> STM Bool isPendingSub rq = do pending <- (&&) <$> SS.hasPendingSub tSess (queueId rq) (currentSubs c) <*> activeClientSession c tSess sessId unless pending $ incSMPServerStat c userId srv connSubIgnored pure pending + processClientNotice rq e = + forM_ (smpErrorClientNotice e) $ \notice_ -> + E.bracket_ + (atomically $ takeTMVar $ clientNoticesLock c) + (atomically $ putTMVar (clientNoticesLock c) ()) + (processClientNotices c tSess [(rcvQueueSub rq, notice_)]) notify' :: forall e m. (AEntityI e, MonadIO m) => ConnId -> AEvent e -> m () notify' connId msg = atomically $ writeTBQueue subQ ("", connId, AEvt (sAEntity @e) msg) notifyErr :: ConnId -> SMPClientError -> AM' () diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 8bf3b77fe..217a1682a 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -49,6 +49,7 @@ module Simplex.Messaging.Agent.Client newRcvQueue_, subscribeQueues, subscribeUserServerQueues, + processClientNotices, getQueueMessage, decryptSMPMessage, failSubscription, @@ -237,8 +238,10 @@ import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.RetryInterval import Simplex.Messaging.Agent.Stats import Simplex.Messaging.Agent.Store +import Simplex.Messaging.Agent.Store.AgentStore (getClientNotices, updateClientNotices) import Simplex.Messaging.Agent.Store.Common (DBStore, withTransaction) import qualified Simplex.Messaging.Agent.Store.DB as DB +import Simplex.Messaging.Agent.Store.Entity import Simplex.Messaging.Agent.TSessionSubs (TSessionSubs) import qualified Simplex.Messaging.Agent.TSessionSubs as SS import Simplex.Messaging.Client @@ -287,9 +290,10 @@ import Simplex.Messaging.Protocol senderCanSecure, ) import qualified Simplex.Messaging.Protocol as SMP +import Simplex.Messaging.Protocol.Types import Simplex.Messaging.Server.QueueStore.QueueInfo import Simplex.Messaging.Session -import Simplex.Messaging.Agent.Store.Entity +import Simplex.Messaging.SystemTime import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (SMPVersion, SessionId, THandleParams (sessionId, thVersion), TransportError (..), TransportPeer (..), sndAuthKeySMPVersion, shortLinksSMPVersion, newNtfCredsSMPVersion) @@ -337,11 +341,14 @@ data AgentClient = AgentClient xftpClients :: TMap XFTPTransportSession XFTPClientVar, useNetworkConfig :: TVar (NetworkConfig, NetworkConfig), -- (slow, fast) networks presetDomains :: [HostName], + presetServers :: [SMPServer], userNetworkInfo :: TVar UserNetworkInfo, userNetworkUpdated :: TVar (Maybe UTCTime), subscrConns :: TVar (Set ConnId), currentSubs :: TSessionSubs, removedSubs :: TMap (UserId, SMPServer) (TMap SMP.RecipientId SMPClientError), + clientNotices :: TMap (Maybe SMPServer) (Maybe SystemSeconds), + clientNoticesLock :: TMVar (), workerSeq :: TVar Int, smpDeliveryWorkers :: TMap SndQAddr (Worker, TMVar ()), asyncCmdWorkers :: TMap (ConnId, Maybe SMPServer) Worker, @@ -487,8 +494,8 @@ data UserNetworkType = UNNone | UNCellular | UNWifi | UNEthernet | UNOther deriving (Eq, Show) -- | Creates an SMP agent client instance that receives commands and sends responses via 'TBQueue's. -newAgentClient :: Int -> InitialAgentServers -> UTCTime -> Env -> IO AgentClient -newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, presetDomains} currentTs agentEnv = do +newAgentClient :: Int -> InitialAgentServers -> UTCTime -> Map (Maybe SMPServer) (Maybe SystemSeconds) -> Env -> IO AgentClient +newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, presetDomains, presetServers} currentTs notices agentEnv = do let cfg = config agentEnv qSize = tbqSize cfg proxySessTs <- newTVarIO =<< getCurrentTime @@ -509,6 +516,8 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, presetDomai subscrConns <- newTVarIO S.empty currentSubs <- SS.emptyIO removedSubs <- TM.emptyIO + clientNotices <- newTVarIO notices + clientNoticesLock <- newTMVarIO () workerSeq <- newTVarIO 0 smpDeliveryWorkers <- TM.emptyIO asyncCmdWorkers <- TM.emptyIO @@ -542,11 +551,14 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, presetDomai xftpClients, useNetworkConfig, presetDomains, + presetServers, userNetworkInfo, userNetworkUpdated, subscrConns, currentSubs, removedSubs, + clientNotices, + clientNoticesLock, workerSeq, smpDeliveryWorkers, asyncCmdWorkers, @@ -1406,6 +1418,7 @@ newRcvQueue_ c nm userId connId (ProtoServerWithAuth srv auth) vRange cqrd enabl clientService = ClientService DBNewEntity <$> serviceId, status = New, enableNtfs, + clientNoticeId = Nothing, dbQueueId = DBNewEntity, primary = True, dbReplaceQueueId = Nothing, @@ -1456,10 +1469,10 @@ newRcvQueue_ c nm userId connId (ProtoServerWithAuth srv auth) vRange cqrd enabl newErr :: String -> AM (Maybe ShortLinkCreds) newErr = throwE . BROKER (B.unpack $ strEncode srv) . UNEXPECTED . ("Create queue: " <>) -processSubResults :: AgentClient -> SMPTransportSession -> SessionId -> NonEmpty (RcvQueueSub, Either SMPClientError (Maybe ServiceId)) -> STM () +processSubResults :: AgentClient -> SMPTransportSession -> SessionId -> NonEmpty (RcvQueueSub, Either SMPClientError (Maybe ServiceId)) -> STM [(RcvQueueSub, Maybe ClientNotice)] processSubResults c tSess@(userId, srv, _) sessId rs = do pendingSubs <- SS.getPendingSubs tSess $ currentSubs c - let (failed, subscribed, ignored) = foldr (partitionResults pendingSubs) (M.empty, [], 0) rs + let (failed, subscribed, notices, ignored) = foldr (partitionResults pendingSubs) (M.empty, [], [], 0) rs unless (M.null failed) $ do incSMPServerStat' c userId srv connSubErrs $ M.size failed failSubscriptions c tSess failed @@ -1467,19 +1480,28 @@ processSubResults c tSess@(userId, srv, _) sessId rs = do incSMPServerStat' c userId srv connSubscribed $ length subscribed SS.batchAddActiveSubs tSess sessId subscribed $ currentSubs c unless (ignored == 0) $ incSMPServerStat' c userId srv connSubIgnored ignored + pure notices where partitionResults :: Map SMP.RecipientId RcvQueueSub -> (RcvQueueSub, Either SMPClientError (Maybe ServiceId)) -> - (Map SMP.RecipientId SMPClientError, [RcvQueueSub], Int) -> - (Map SMP.RecipientId SMPClientError, [RcvQueueSub], Int) - partitionResults pendingSubs (rq, r) acc@(failed, subscribed, ignored) = case r of - Left e - | temporaryClientError e -> acc - | otherwise -> (M.insert (queueId rq) e failed, subscribed, ignored) + (Map SMP.RecipientId SMPClientError, [RcvQueueSub], [(RcvQueueSub, Maybe ClientNotice)], Int) -> + (Map SMP.RecipientId SMPClientError, [RcvQueueSub], [(RcvQueueSub, Maybe ClientNotice)], Int) + partitionResults pendingSubs (rq@RcvQueueSub {rcvId, clientNoticeId}, r) acc@(failed, subscribed, notices, ignored) = case r of + Left e -> case smpErrorClientNotice e of + Just notice_ -> (failed', subscribed, (rq, notice_) : notices, ignored) + where + notices' = if isJust notice_ || isJust clientNoticeId then (rq, notice_) : notices else notices + Nothing + | temporaryClientError e -> acc + | otherwise -> (failed', subscribed, notices, ignored) + where + failed' = M.insert rcvId e failed Right _serviceId -- TODO [certs rcv] store association with the service - | queueId rq `M.member` pendingSubs -> (failed, rq : subscribed, ignored) - | otherwise -> (failed, subscribed, ignored + 1) + | rcvId `M.member` pendingSubs -> (failed, rq : subscribed, notices', ignored) + | otherwise -> (failed, subscribed, notices', ignored + 1) + where + notices' = if isJust clientNoticeId then (rq, Nothing) : notices else notices temporaryAgentError :: AgentErrorType -> Bool temporaryAgentError = \case @@ -1588,12 +1610,18 @@ subscribeSessQueues_ c withEvents qs = sendClientBatch_ "SUB" False subscribe_ c if withEvents then Just . S.fromList . map qConnId . M.elems <$> atomically (SS.getActiveSubs tSess $ currentSubs c) else pure Nothing - active <- - atomically $ - ifM + active <- E.uninterruptibleMask_ $ do + (active, notices) <- atomically $ do + r@(_, notices) <- ifM (activeClientSession c tSess sessId) - (processSubResults c tSess sessId rs $> True) - (incSMPServerStat' c userId srv connSubIgnored (length rs) $> False) + ((True,) <$> processSubResults c tSess sessId rs) + ((False, []) <$ incSMPServerStat' c userId srv connSubIgnored (length rs)) + unless (null notices) $ takeTMVar $ clientNoticesLock c + pure r + unless (null notices) $ void $ + (processClientNotices c tSess notices `runReaderT` agentEnv c) + `E.finally` atomically (putTMVar (clientNoticesLock c) ()) + pure active forM_ cs_ $ \cs -> do let (errs, okConns) = partitionEithers $ map (\(RcvQueueSub {connId}, r) -> bimap (connId,) (const connId) r) $ L.toList rs conns = filter (`S.notMember` cs) okConns @@ -1611,6 +1639,17 @@ subscribeSessQueues_ c withEvents qs = sendClientBatch_ "SUB" False subscribe_ c tSess = transportSession' smp sessId = sessionId $ thParams smp +processClientNotices :: AgentClient -> SMPTransportSession -> [(RcvQueueSub, Maybe ClientNotice)] -> AM' () +processClientNotices c@AgentClient {presetServers} tSess notices = do + now <- liftIO getSystemSeconds + tryAllErrors' (withStore' c $ \db -> (,) <$> updateClientNotices db tSess now notices <*> getClientNotices db presetServers) >>= \case + Right (noticeIds, clntNotices) -> atomically $ do + SS.updateClientNotices tSess noticeIds $ currentSubs c + writeTVar (clientNotices c) clntNotices + Left e -> do + logError $ "processClientNotices error: " <> tshow e + notifySub' c "" $ ERR e + activeClientSession :: AgentClient -> SMPTransportSession -> SessionId -> STM Bool activeClientSession c tSess sessId = sameSess <$> tryReadSessVar tSess (smpClients c) where diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index a6cd86d11..57bc11e3c 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -90,7 +90,8 @@ data InitialAgentServers = InitialAgentServers ntf :: [NtfServer], xftp :: Map UserId (NonEmpty (ServerCfg 'PXFTP)), netCfg :: NetworkConfig, - presetDomains :: [HostName] + presetDomains :: [HostName], + presetServers :: [SMPServer] } data ServerCfg p = ServerCfg diff --git a/src/Simplex/Messaging/Agent/Protocol.hs b/src/Simplex/Messaging/Agent/Protocol.hs index a2b7df4c9..2b220f948 100644 --- a/src/Simplex/Messaging/Agent/Protocol.hs +++ b/src/Simplex/Messaging/Agent/Protocol.hs @@ -168,6 +168,7 @@ module Simplex.Messaging.Agent.Protocol updateSMPServerHosts, shortenShortLink, restoreShortLink, + isPresetServer, linkUserData, linkUserData', ) @@ -1624,15 +1625,16 @@ shortenShortLink presetSrvs = \case CSLInvitation sch srv lnkId linkKey -> CSLInvitation sch (shortServer srv) lnkId linkKey CSLContact sch ct srv linkKey -> CSLContact sch ct (shortServer srv) linkKey where - shortServer srv@(SMPServer hs@(h :| _) p kh) = - if isPresetServer then SMPServerOnlyHost h else srv - where - isPresetServer = case findPresetServer srv presetSrvs of - Just (SMPServer hs' p' kh') -> - all (`elem` hs') hs - && (p == p' || (null p' && (p == "443" || p == "5223"))) - && kh == kh' - Nothing -> False + shortServer srv@(SMPServer (h :| _) _ _) = + if isPresetServer srv presetSrvs then SMPServerOnlyHost h else srv + +isPresetServer :: Foldable t => SMPServer -> t SMPServer -> Bool +isPresetServer srv@(SMPServer hs p kh) presetSrvs = case findPresetServer srv presetSrvs of + Just (SMPServer hs' p' kh') -> + all (`elem` hs') hs + && (p == p' || (null p' && (p == "443" || p == "5223"))) + && kh == kh' + Nothing -> False -- explicit bidirectional is used for ghc 8.10.7 compatibility, [h]/[] patterns are not reversible. pattern SMPServerOnlyHost :: TransportHost -> SMPServer @@ -1650,7 +1652,7 @@ restoreShortLink presetSrvs = \case s@(SMPServerOnlyHost _) -> fromMaybe s $ findPresetServer s presetSrvs s -> s -findPresetServer :: SMPServer -> NonEmpty SMPServer -> Maybe SMPServer +findPresetServer :: Foldable t => SMPServer -> t SMPServer -> Maybe SMPServer findPresetServer ProtocolServer {host = h :| _} = find (\ProtocolServer {host = h' :| _} -> h == h') {-# INLINE findPresetServer #-} @@ -1871,6 +1873,8 @@ data AgentErrorType BROKER {brokerAddress :: String, brokerErr :: BrokerErrorType} | -- | errors of other agents AGENT {agentErr :: SMPAgentError} + | -- | client notice + NOTICE {server :: Text, preset :: Bool, expiresAt :: Maybe UTCTime} | -- | agent implementation or dependency errors INTERNAL {internalErr :: String} | -- | critical agent errors that should be shown to the user, optionally with restart button diff --git a/src/Simplex/Messaging/Agent/Store.hs b/src/Simplex/Messaging/Agent/Store.hs index 849f6159c..c054cb267 100644 --- a/src/Simplex/Messaging/Agent/Store.hs +++ b/src/Simplex/Messaging/Agent/Store.hs @@ -90,6 +90,8 @@ data StoredRcvQueue (q :: DBStored) = RcvQueue status :: QueueStatus, -- | to enable notifications for this queue - this field is duplicated from ConnData enableNtfs :: Bool, + -- | client notice + clientNoticeId :: Maybe NoticeId, -- | database queue ID (within connection) dbQueueId :: DBEntityId' q, -- | True for a primary or a next primary queue of the connection (next if dbReplaceQueueId is set) @@ -113,6 +115,7 @@ data RcvQueueSub = RcvQueueSub rcvPrivateKey :: RcvPrivateAuthKey, status :: QueueStatus, enableNtfs :: Bool, + clientNoticeId :: Maybe NoticeId, dbQueueId :: Int64, primary :: Bool, dbReplaceQueueId :: Maybe Int64 @@ -120,8 +123,8 @@ data RcvQueueSub = RcvQueueSub deriving (Show) rcvQueueSub :: RcvQueue -> RcvQueueSub -rcvQueueSub RcvQueue {userId, connId, server, rcvId, rcvPrivateKey, status, enableNtfs, dbQueueId = DBEntityId dbQueueId, primary, dbReplaceQueueId} = - RcvQueueSub {userId, connId, server, rcvId, rcvPrivateKey, status, enableNtfs, dbQueueId, primary, dbReplaceQueueId} +rcvQueueSub RcvQueue {userId, connId, server, rcvId, rcvPrivateKey, status, enableNtfs, clientNoticeId, dbQueueId = DBEntityId dbQueueId, primary, dbReplaceQueueId} = + RcvQueueSub {userId, connId, server, rcvId, rcvPrivateKey, status, enableNtfs, clientNoticeId, dbQueueId, primary, dbReplaceQueueId} data ShortLinkCreds = ShortLinkCreds { shortLinkId :: SMP.LinkId, @@ -401,6 +404,8 @@ data ConnData = ConnData } deriving (Eq, Show) +type NoticeId = Int64 + -- this function should be mirrored in the clients ratchetSyncAllowed :: ConnData -> Bool ratchetSyncAllowed ConnData {ratchetSyncState, connAgentVersion} = diff --git a/src/Simplex/Messaging/Agent/Store/AgentStore.hs b/src/Simplex/Messaging/Agent/Store/AgentStore.hs index 7d408f1ee..ef66eca38 100644 --- a/src/Simplex/Messaging/Agent/Store/AgentStore.hs +++ b/src/Simplex/Messaging/Agent/Store/AgentStore.hs @@ -40,6 +40,8 @@ module Simplex.Messaging.Agent.Store.AgentStore updateNewConnRcv, updateNewConnSnd, createSndConn, + getClientNotices, + updateClientNotices, getSubscriptionServers, getUserServerRcvQueueSubs, unsetQueuesToSubscribe, @@ -264,6 +266,7 @@ import Data.Int (Int64) import Data.List (foldl', sortBy) import Data.List.NonEmpty (NonEmpty (..)) import qualified Data.List.NonEmpty as L +import Data.Map.Strict (Map) import qualified Data.Map.Strict as M import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing, mapMaybe) import Data.Ord (Down (..)) @@ -283,6 +286,8 @@ import Simplex.Messaging.Agent.Store import Simplex.Messaging.Agent.Store.Common import qualified Simplex.Messaging.Agent.Store.DB as DB import Simplex.Messaging.Agent.Store.DB (Binary (..), BoolInt (..), FromField (..), ToField (..), blobFieldDecoder, fromTextField_) +import Simplex.Messaging.Agent.Store.Entity +import Simplex.Messaging.Client (SMPTransportSession) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Crypto.File (CryptoFile (..), CryptoFileArgs (..)) import Simplex.Messaging.Crypto.Ratchet (PQEncryption (..), PQSupport (..), RatchetX448, SkippedMsgDiff (..), SkippedMsgKeys) @@ -294,7 +299,8 @@ import Simplex.Messaging.Notifications.Types import Simplex.Messaging.Parsers (parseAll) import Simplex.Messaging.Protocol import qualified Simplex.Messaging.Protocol as SMP -import Simplex.Messaging.Agent.Store.Entity +import Simplex.Messaging.Protocol.Types +import Simplex.Messaging.SystemTime import Simplex.Messaging.Transport.Client (TransportHost) import Simplex.Messaging.Util import Simplex.Messaging.Version.Internal @@ -302,7 +308,6 @@ import qualified UnliftIO.Exception as E import UnliftIO.STM #if defined(dbPostgres) import Data.List (sortOn) -import Data.Map.Strict (Map) import Database.PostgreSQL.Simple (In (..), Only (..), Query, SqlError, (:.) (..)) import Database.PostgreSQL.Simple.Errors (constraintViolation) import Database.PostgreSQL.Simple.SqlQQ (sql) @@ -2061,6 +2066,69 @@ newQueueId_ (Only maxId : _) = DBEntityId (maxId + 1) -- * subscribe all connections +getClientNotices :: DB.Connection -> [SMPServer] -> IO (Map (Maybe SMPServer) (Maybe SystemSeconds)) +getClientNotices db presetSrvs = + M.map expiresAt . foldl' addNotice M.empty + <$> DB.query_ + db + [sql| + SELECT n.host, n.port, n.entity_id, COALESCE(n.server_key_hash, s.key_hash), n.created_at, n.notice_ttl + FROM client_notices n + JOIN servers s ON n.host = s.host AND n.port = s.port + WHERE n.protocol = 'smp' + |] + where + expiresAt (createdAt, ttl) = RoundedSystemTime . (createdAt +) <$> ttl + addNotice :: + Map (Maybe SMPServer) (Int64, Maybe Int64) -> + (NonEmpty TransportHost, ServiceName, RecipientId, C.KeyHash, Int64, Maybe Int64) -> + Map (Maybe SMPServer) (Int64, Maybe Int64) + addNotice m (host, port, _, keyHash, createdAt', ttl') = + let srv = SMPServer host port keyHash + srvKey + | isPresetServer srv presetSrvs = Nothing + | otherwise = Just srv + in M.alter (Just . addNoticeHost) srvKey m + where + -- sum of ttls starting from the latest createdAt + addNoticeHost :: Maybe (Int64, Maybe Int64) -> (Int64, Maybe Int64) + addNoticeHost = \case + Just (createdAt, ttl) -> (max createdAt createdAt', (+) <$> ttl <*> ttl') + Nothing -> (createdAt', ttl') + +updateClientNotices :: DB.Connection -> SMPTransportSession -> SystemSeconds -> [(RcvQueueSub, Maybe ClientNotice)] -> IO [(RecipientId, Maybe NoticeId)] +updateClientNotices db (_, srv, _) now = + mapM $ \(rq, notice_) -> maybe (deleteNotice rq) (upsertNotice rq) notice_ + where + deleteNotice RcvQueueSub {rcvId, clientNoticeId} = do + mapM_ (DB.execute db "DELETE FROM client_notices WHERE client_notice_id = ?" . Only) clientNoticeId + pure (rcvId, Nothing) + upsertNotice RcvQueueSub {rcvId, server} ClientNotice {ttl} = + getServerKeyHash_ db server >>= \case + Left _ -> pure (rcvId, Nothing) + Right keyHash_ -> do + noticeId_ <- + maybeFirstRow fromOnly $ + DB.query + db + [sql| + INSERT INTO client_notices(protocol, host, port, entity_id, server_key_hash, notice_ttl, created_at, updated_at) + VALUES ('smp',?,?,?,?,?,?,?) + ON CONFLICT (protocol, host, port, entity_id) + DO UPDATE SET + server_key_hash = EXCLUDED.server_key_hash, + notice_ttl = EXCLUDED.notice_ttl, + updated_at = EXCLUDED.updated_at + RETURNING client_notice_id + |] + (host srv, port srv, rcvId, keyHash_, ttl, now, now) + forM_ noticeId_ $ \noticeId -> do + DB.execute + db + "UPDATE rcv_queues SET client_notice_id = ? WHERE host = ? AND port = ?AND rcv_id = ?" + (noticeId, host srv, port srv, rcvId) + pure (rcvId, noticeId_) + getSubscriptionServers :: DB.Connection -> Bool -> IO [(UserId, SMPServer)] getSubscriptionServers db onlyNeeded = map toUserServer <$> DB.query_ db (select <> toSubscribe <> " c.deleted = 0 AND q.deleted = 0") @@ -2302,7 +2370,7 @@ rcvQueueQuery :: Query rcvQueueQuery = [sql| SELECT c.user_id, COALESCE(q.server_key_hash, s.key_hash), q.conn_id, q.host, q.port, q.rcv_id, q.rcv_private_key, q.rcv_dh_secret, - q.e2e_priv_key, q.e2e_dh_secret, q.snd_id, q.queue_mode, q.status, c.enable_ntfs, + q.e2e_priv_key, q.e2e_dh_secret, q.snd_id, q.queue_mode, q.status, c.enable_ntfs, q.client_notice_id, q.rcv_queue_id, q.rcv_primary, q.replace_rcv_queue_id, q.switch_status, q.smp_client_version, q.delete_errors, q.ntf_public_key, q.ntf_private_key, q.ntf_id, q.rcv_ntf_dh_secret, q.link_id, q.link_key, q.link_priv_sig_key, q.link_enc_fixed_data @@ -2313,13 +2381,13 @@ rcvQueueQuery = toRcvQueue :: (UserId, C.KeyHash, ConnId, NonEmpty TransportHost, ServiceName, SMP.RecipientId, SMP.RcvPrivateAuthKey, SMP.RcvDhSecret, C.PrivateKeyX25519, Maybe C.DhSecretX25519, SMP.SenderId, Maybe QueueMode) - :. (QueueStatus, Maybe BoolInt, DBEntityId, BoolInt, Maybe Int64, Maybe RcvSwitchStatus, Maybe VersionSMPC, Int) + :. (QueueStatus, Maybe BoolInt, Maybe NoticeId, DBEntityId, BoolInt, Maybe Int64, Maybe RcvSwitchStatus, Maybe VersionSMPC, Int) :. (Maybe SMP.NtfPublicAuthKey, Maybe SMP.NtfPrivateAuthKey, Maybe SMP.NotifierId, Maybe RcvNtfDhSecret) :. (Maybe SMP.LinkId, Maybe LinkKey, Maybe C.PrivateKeyEd25519, Maybe EncDataBytes) -> RcvQueue toRcvQueue ( (userId, keyHash, connId, host, port, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, queueMode) - :. (status, enableNtfs_, dbQueueId, BI primary, dbReplaceQueueId, rcvSwchStatus, smpClientVersion_, deleteErrors) + :. (status, enableNtfs_, clientNoticeId, dbQueueId, BI primary, dbReplaceQueueId, rcvSwchStatus, smpClientVersion_, deleteErrors) :. (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_) :. (shortLinkId_, shortLinkKey_, linkPrivSigKey_, linkEncFixedData_) ) = @@ -2333,7 +2401,7 @@ toRcvQueue _ -> Nothing enableNtfs = maybe True unBI enableNtfs_ -- TODO [certs rcv] read client service - in RcvQueue {userId, connId, server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, queueMode, shortLink, clientService = Nothing, status, enableNtfs, dbQueueId, primary, dbReplaceQueueId, rcvSwchStatus, smpClientVersion, clientNtfCreds, deleteErrors} + in RcvQueue {userId, connId, server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, queueMode, shortLink, clientService = Nothing, status, enableNtfs, clientNoticeId, dbQueueId, primary, dbReplaceQueueId, rcvSwchStatus, smpClientVersion, clientNtfCreds, deleteErrors} -- | returns all connection queue credentials, the first queue is the primary one getRcvQueueSubsByConnId_ :: DB.Connection -> ConnId -> IO (Maybe (NonEmpty RcvQueueSub)) @@ -2344,17 +2412,17 @@ getRcvQueueSubsByConnId_ db connId = rcvQueueSubQuery :: Query rcvQueueSubQuery = [sql| - SELECT c.user_id, q.conn_id, q.host, q.port, COALESCE(q.server_key_hash, s.key_hash), q.rcv_id, q.rcv_private_key, q.status, c.enable_ntfs, + SELECT c.user_id, q.conn_id, q.host, q.port, COALESCE(q.server_key_hash, s.key_hash), q.rcv_id, q.rcv_private_key, q.status, c.enable_ntfs, q.client_notice_id, q.rcv_queue_id, q.rcv_primary, q.replace_rcv_queue_id FROM rcv_queues q JOIN servers s ON q.host = s.host AND q.port = s.port JOIN connections c ON q.conn_id = c.conn_id |] -toRcvQueueSub :: (UserId, ConnId, NonEmpty TransportHost, ServiceName, C.KeyHash, SMP.RecipientId, SMP.RcvPrivateAuthKey, QueueStatus, Maybe BoolInt, Int64, BoolInt, Maybe Int64) -> RcvQueueSub -toRcvQueueSub (userId, connId, host, port, keyHash, rcvId, rcvPrivateKey, status, enableNtfs_, dbQueueId, BI primary, dbReplaceQueueId) = +toRcvQueueSub :: (UserId, ConnId, NonEmpty TransportHost, ServiceName, C.KeyHash, SMP.RecipientId, SMP.RcvPrivateAuthKey) :. (QueueStatus, Maybe BoolInt, Maybe NoticeId, Int64, BoolInt, Maybe Int64) -> RcvQueueSub +toRcvQueueSub ((userId, connId, host, port, keyHash, rcvId, rcvPrivateKey) :. (status, enableNtfs_, clientNoticeId, dbQueueId, BI primary, dbReplaceQueueId)) = let enableNtfs = maybe True unBI enableNtfs_ - in RcvQueueSub {userId, connId, server = SMPServer host port keyHash, rcvId, rcvPrivateKey, status, enableNtfs, dbQueueId, primary, dbReplaceQueueId} + in RcvQueueSub {userId, connId, server = SMPServer host port keyHash, rcvId, rcvPrivateKey, status, enableNtfs, clientNoticeId, dbQueueId, primary, dbReplaceQueueId} getRcvQueueById :: DB.Connection -> ConnId -> Int64 -> IO (Either StoreError RcvQueue) getRcvQueueById db connId dbRcvId = diff --git a/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/App.hs b/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/App.hs index ecf0e1377..011d89031 100644 --- a/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/App.hs +++ b/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/App.hs @@ -9,6 +9,7 @@ import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250203_msg_bodies import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250322_short_links import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250702_conn_invitations_remove_cascade_delete import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251009_queue_to_subscribe +import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251010_client_notices import Simplex.Messaging.Agent.Store.Shared (Migration (..)) schemaMigrations :: [(String, Text, Maybe Text)] @@ -17,7 +18,8 @@ schemaMigrations = ("20250203_msg_bodies", m20250203_msg_bodies, Just down_m20250203_msg_bodies), ("20250322_short_links", m20250322_short_links, Just down_m20250322_short_links), ("20250702_conn_invitations_remove_cascade_delete", m20250702_conn_invitations_remove_cascade_delete, Just down_m20250702_conn_invitations_remove_cascade_delete), - ("20251009_queue_to_subscribe", m20251009_queue_to_subscribe, Just down_m20251009_queue_to_subscribe) + ("20251009_queue_to_subscribe", m20251009_queue_to_subscribe, Just down_m20251009_queue_to_subscribe), + ("20251010_client_notices", m20251010_client_notices, Just down_m20251010_client_notices) ] -- | The list of migrations in ascending order by date diff --git a/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/M20241210_initial.hs b/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/M20241210_initial.hs index 6b760342f..6f6b5f834 100644 --- a/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/M20241210_initial.hs +++ b/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/M20241210_initial.hs @@ -1,15 +1,14 @@ +{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE QuasiQuotes #-} module Simplex.Messaging.Agent.Store.Postgres.Migrations.M20241210_initial where import Data.Text (Text) -import qualified Data.Text as T import Text.RawString.QQ (r) m20241210_initial :: Text m20241210_initial = - T.pack - [r| + [r| CREATE TABLE users( user_id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, deleted SMALLINT NOT NULL DEFAULT 0 diff --git a/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/M20250203_msg_bodies.hs b/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/M20250203_msg_bodies.hs index 848566b77..6a0d85e45 100644 --- a/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/M20250203_msg_bodies.hs +++ b/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/M20250203_msg_bodies.hs @@ -1,15 +1,14 @@ +{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE QuasiQuotes #-} module Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250203_msg_bodies where import Data.Text (Text) -import qualified Data.Text as T import Text.RawString.QQ (r) m20250203_msg_bodies :: Text m20250203_msg_bodies = - T.pack - [r| + [r| ALTER TABLE snd_messages ADD COLUMN msg_encrypt_key BYTEA; ALTER TABLE snd_messages ADD COLUMN padded_msg_len BIGINT; @@ -25,8 +24,7 @@ CREATE INDEX idx_snd_messages_snd_message_body_id ON snd_messages(snd_message_bo down_m20250203_msg_bodies :: Text down_m20250203_msg_bodies = - T.pack - [r| + [r| DROP INDEX idx_snd_messages_snd_message_body_id; ALTER TABLE snd_messages DROP COLUMN snd_message_body_id; DROP TABLE snd_message_bodies; diff --git a/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/M20250322_short_links.hs b/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/M20250322_short_links.hs index be627ea0a..28394d51b 100644 --- a/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/M20250322_short_links.hs +++ b/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/M20250322_short_links.hs @@ -1,15 +1,14 @@ +{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE QuasiQuotes #-} module Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250322_short_links where import Data.Text (Text) -import qualified Data.Text as T import Text.RawString.QQ (r) m20250322_short_links :: Text m20250322_short_links = - T.pack - [r| + [r| ALTER TABLE rcv_queues ADD COLUMN link_id BYTEA; ALTER TABLE rcv_queues ADD COLUMN link_key BYTEA; ALTER TABLE rcv_queues ADD COLUMN link_priv_sig_key BYTEA; @@ -42,8 +41,7 @@ CREATE UNIQUE INDEX idx_inv_short_links_link_id ON inv_short_links(host, port, l down_m20250322_short_links :: Text down_m20250322_short_links = - T.pack - [r| + [r| DROP INDEX idx_rcv_queues_link_id; ALTER TABLE rcv_queues DROP COLUMN link_id; ALTER TABLE rcv_queues DROP COLUMN link_key; diff --git a/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/M20250702_conn_invitations_remove_cascade_delete.hs b/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/M20250702_conn_invitations_remove_cascade_delete.hs index a61a60d5d..8d4628673 100644 --- a/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/M20250702_conn_invitations_remove_cascade_delete.hs +++ b/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/M20250702_conn_invitations_remove_cascade_delete.hs @@ -1,15 +1,14 @@ +{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE QuasiQuotes #-} module Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250702_conn_invitations_remove_cascade_delete where import Data.Text (Text) -import qualified Data.Text as T import Text.RawString.QQ (r) m20250702_conn_invitations_remove_cascade_delete :: Text m20250702_conn_invitations_remove_cascade_delete = - T.pack - [r| + [r| ALTER TABLE conn_invitations DROP CONSTRAINT conn_invitations_contact_conn_id_fkey; ALTER TABLE conn_invitations ALTER COLUMN contact_conn_id DROP NOT NULL; @@ -23,8 +22,7 @@ ALTER TABLE conn_invitations down_m20250702_conn_invitations_remove_cascade_delete :: Text down_m20250702_conn_invitations_remove_cascade_delete = - T.pack - [r| + [r| ALTER TABLE conn_invitations DROP CONSTRAINT conn_invitations_contact_conn_id_fkey; ALTER TABLE conn_invitations ALTER COLUMN contact_conn_id SET NOT NULL; diff --git a/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/M20251009_queue_to_subscribe.hs b/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/M20251009_queue_to_subscribe.hs index 8a0bf5862..cac622586 100644 --- a/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/M20251009_queue_to_subscribe.hs +++ b/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/M20251009_queue_to_subscribe.hs @@ -1,23 +1,21 @@ +{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE QuasiQuotes #-} module Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251009_queue_to_subscribe where import Data.Text (Text) -import qualified Data.Text as T import Text.RawString.QQ (r) m20251009_queue_to_subscribe :: Text m20251009_queue_to_subscribe = - T.pack - [r| + [r| ALTER TABLE rcv_queues ADD COLUMN to_subscribe SMALLINT NOT NULL DEFAULT 0; CREATE INDEX idx_rcv_queues_to_subscribe ON rcv_queues(to_subscribe); |] down_m20251009_queue_to_subscribe :: Text down_m20251009_queue_to_subscribe = - T.pack - [r| + [r| DROP INDEX idx_rcv_queues_to_subscribe; ALTER TABLE rcv_queues DROP COLUMN to_subscribe; |] diff --git a/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/M20251010_client_notices.hs b/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/M20251010_client_notices.hs new file mode 100644 index 000000000..ba34c8299 --- /dev/null +++ b/src/Simplex/Messaging/Agent/Store/Postgres/Migrations/M20251010_client_notices.hs @@ -0,0 +1,40 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE QuasiQuotes #-} + +module Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251010_client_notices where + +import Data.Text (Text) +import Text.RawString.QQ (r) + +m20251010_client_notices :: Text +m20251010_client_notices = + [r| +CREATE TABLE client_notices( + client_notice_id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + protocol TEXT NOT NULL, + host TEXT NOT NULL, + port TEXT NOT NULL, + entity_id BYTEA NOT NULL, + server_key_hash BYTEA, + notice_ttl BIGINT, + created_at BIGINT NOT NULL, + updated_at BIGINT NOT NULL +); + +CREATE UNIQUE INDEX idx_client_notices_entity ON client_notices(protocol, host, port, entity_id); + +ALTER TABLE rcv_queues ADD COLUMN client_notice_id BIGINT +REFERENCES client_notices ON UPDATE RESTRICT ON DELETE SET NULL; + +CREATE INDEX idx_rcv_queues_client_notice_id ON rcv_queues(client_notice_id); +|] + +down_m20251010_client_notices :: Text +down_m20251010_client_notices = + [r| +DROP INDEX idx_rcv_queues_client_notice_id; +ALTER TABLE rcv_queues DROP COLUMN client_notice_id; + +DROP INDEX idx_client_notices_entity; +DROP TABLE client_notices; +|] diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/App.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/App.hs index 48a9df8b6..7371d9584 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/App.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/App.hs @@ -45,6 +45,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250203_msg_bodies import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250322_short_links import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250702_conn_invitations_remove_cascade_delete import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251009_queue_to_subscribe +import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251010_client_notices import Simplex.Messaging.Agent.Store.Shared (Migration (..)) schemaMigrations :: [(String, Query, Maybe Query)] @@ -89,7 +90,8 @@ schemaMigrations = ("m20250203_msg_bodies", m20250203_msg_bodies, Just down_m20250203_msg_bodies), ("m20250322_short_links", m20250322_short_links, Just down_m20250322_short_links), ("m20250702_conn_invitations_remove_cascade_delete", m20250702_conn_invitations_remove_cascade_delete, Just down_m20250702_conn_invitations_remove_cascade_delete), - ("m20251009_queue_to_subscribe", m20251009_queue_to_subscribe, Just down_m20251009_queue_to_subscribe) + ("m20251009_queue_to_subscribe", m20251009_queue_to_subscribe, Just down_m20251009_queue_to_subscribe), + ("m20251010_client_notices", m20251010_client_notices, Just down_m20251010_client_notices) ] -- | The list of migrations in ascending order by date diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20251010_client_notices.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20251010_client_notices.hs new file mode 100644 index 000000000..dddd92781 --- /dev/null +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20251010_client_notices.hs @@ -0,0 +1,39 @@ +{-# LANGUAGE QuasiQuotes #-} + +module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251010_client_notices where + +import Database.SQLite.Simple (Query) +import Database.SQLite.Simple.QQ (sql) + +m20251010_client_notices :: Query +m20251010_client_notices = + [sql| +CREATE TABLE client_notices( + client_notice_id INTEGER PRIMARY KEY AUTOINCREMENT, + protocol TEXT NOT NULL, + host TEXT NOT NULL, + port TEXT NOT NULL, + entity_id BLOB NOT NULL, + server_key_hash BLOB, + notice_ttl INTEGER, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL +); + +CREATE UNIQUE INDEX idx_client_notices_entity ON client_notices(protocol, host, port, entity_id); + +ALTER TABLE rcv_queues ADD COLUMN client_notice_id INTEGER +REFERENCES client_notices ON UPDATE RESTRICT ON DELETE SET NULL; + +CREATE INDEX idx_rcv_queues_client_notice_id ON rcv_queues(client_notice_id); +|] + +down_m20251010_client_notices :: Query +down_m20251010_client_notices = + [sql| +DROP INDEX idx_rcv_queues_client_notice_id; +ALTER TABLE rcv_queues DROP COLUMN client_notice_id; + +DROP INDEX idx_client_notices_entity; +DROP TABLE client_notices; +|] diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql index 0a3439124..d2838a7b0 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql @@ -47,7 +47,6 @@ CREATE TABLE rcv_queues( ntf_private_key BLOB, ntf_id BLOB, rcv_ntf_dh_secret BLOB, - to_subscribe INTEGER NOT NULL DEFAULT 0, rcv_queue_id INTEGER CHECK(rcv_queue_id NOT NULL), rcv_primary INTEGER CHECK(rcv_primary NOT NULL), replace_rcv_queue_id INTEGER NULL, @@ -61,6 +60,9 @@ CREATE TABLE rcv_queues( link_priv_sig_key BLOB, link_enc_fixed_data BLOB, queue_mode TEXT, + to_subscribe INTEGER NOT NULL DEFAULT 0, + client_notice_id INTEGER + REFERENCES client_notices ON UPDATE RESTRICT ON DELETE SET NULL, PRIMARY KEY(host, port, rcv_id), FOREIGN KEY(host, port) REFERENCES servers ON DELETE RESTRICT ON UPDATE CASCADE, @@ -437,8 +439,18 @@ CREATE TABLE inv_short_links( snd_id BLOB, FOREIGN KEY(host, port) REFERENCES servers ON DELETE RESTRICT ON UPDATE CASCADE ); +CREATE TABLE client_notices( + client_notice_id INTEGER PRIMARY KEY AUTOINCREMENT, + protocol TEXT NOT NULL, + host TEXT NOT NULL, + port TEXT NOT NULL, + entity_id BLOB NOT NULL, + server_key_hash BLOB, + notice_ttl INTEGER, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL +); CREATE UNIQUE INDEX idx_rcv_queues_ntf ON rcv_queues(host, port, ntf_id); -CREATE INDEX idx_rcv_queues_to_subscribe ON rcv_queues(to_subscribe); CREATE UNIQUE INDEX idx_rcv_queue_id ON rcv_queues(conn_id, rcv_queue_id); CREATE UNIQUE INDEX idx_snd_queue_id ON snd_queues(conn_id, snd_queue_id); CREATE INDEX idx_snd_message_deliveries ON snd_message_deliveries( @@ -573,3 +585,11 @@ CREATE UNIQUE INDEX idx_inv_short_links_link_id ON inv_short_links( port, link_id ); +CREATE INDEX idx_rcv_queues_to_subscribe ON rcv_queues(to_subscribe); +CREATE UNIQUE INDEX idx_client_notices_entity ON client_notices( + protocol, + host, + port, + entity_id +); +CREATE INDEX idx_rcv_queues_client_notice_id ON rcv_queues(client_notice_id); diff --git a/src/Simplex/Messaging/Agent/TSessionSubs.hs b/src/Simplex/Messaging/Agent/TSessionSubs.hs index b43fbd336..cce103fe6 100644 --- a/src/Simplex/Messaging/Agent/TSessionSubs.hs +++ b/src/Simplex/Messaging/Agent/TSessionSubs.hs @@ -1,5 +1,6 @@ {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} module Simplex.Messaging.Agent.TSessionSubs @@ -22,6 +23,7 @@ module Simplex.Messaging.Agent.TSessionSubs getPendingSubs, getActiveSubs, setSubsPending, + updateClientNotices, foldSessionSubs, mapSubs, ) @@ -29,6 +31,8 @@ where import Control.Concurrent.STM import Control.Monad +import Data.Int (Int64) +import Data.List (foldl') import Data.Map.Strict (Map) import qualified Data.Map.Strict as M import Data.Maybe (isJust) @@ -182,6 +186,11 @@ setSubsPending_ s sessId_ = do modifyTVar' (pendingSubs s) $ M.union subs pure subs +updateClientNotices :: SMPTransportSession -> [(RecipientId, Maybe Int64)] -> TSessionSubs -> STM () +updateClientNotices tSess noticeIds ss = do + s <- getSessSubs tSess ss + modifyTVar' (pendingSubs s) $ \m -> foldl' (\m' (rcvId, clientNoticeId) -> M.adjust (\rq -> rq {clientNoticeId}) rcvId m') m noticeIds + foldSessionSubs :: (a -> (SMPTransportSession, SessSubs) -> IO a) -> a -> TSessionSubs -> IO a foldSessionSubs f a = foldM f a . M.assocs <=< readTVarIO . sessionSubs diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index f78eb8e05..27840b092 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -40,6 +40,7 @@ module Simplex.Messaging.Client transportHost', transportSession', useWebPort, + isPresetDomain, -- * SMP protocol command functions createSMPQueue, @@ -103,6 +104,7 @@ module Simplex.Messaging.Client temporaryClientError, smpClientServiceError, smpProxyError, + smpErrorClientNotice, textToHostMode, ServerTransmissionBatch, ServerTransmission (..), @@ -157,6 +159,7 @@ import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String import Simplex.Messaging.Parsers (defaultJSON, dropPrefix, enumJSON, sumTypeJSON) import Simplex.Messaging.Protocol +import Simplex.Messaging.Protocol.Types import Simplex.Messaging.Server.QueueStore.QueueInfo import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM @@ -715,13 +718,16 @@ getProtocolClient g nm transportSession@(_, srv, _) cfg@ProtocolClientConfig {qS Right _ -> logWarn "SMP client unprocessed event" useWebPort :: NetworkConfig -> [HostName] -> ProtocolServer p -> Bool -useWebPort cfg presetDomains srv = case smpWebPortServers cfg of +useWebPort cfg presetDomains ProtocolServer {host = h :| _} = case smpWebPortServers cfg of SWPAll -> True - SWPPreset -> case srv of - ProtocolServer {host = THDomainName h :| _} -> any (`isSuffixOf` h) presetDomains - _ -> False + SWPPreset -> isPresetDomain presetDomains h SWPOff -> False +isPresetDomain :: [HostName] -> TransportHost -> Bool +isPresetDomain presetDomains = \case + THDomainName h -> any (`isSuffixOf` h) presetDomains + _ -> False + unexpectedResponse :: Show r => r -> ProtocolClientError err unexpectedResponse = PCEUnexpectedResponse . B.pack . take 32 . show @@ -794,6 +800,12 @@ smpProxyError = \case PCECryptoError _ -> CRYPTO PCEIOError _ -> INTERNAL +smpErrorClientNotice :: SMPClientError -> Maybe (Maybe ClientNotice) +smpErrorClientNotice = \case + PCEProtocolError (BLOCKED BlockingInfo {notice}) -> Just notice + _ -> Nothing +{-# INLINE smpErrorClientNotice #-} + -- | Create a new SMP queue. -- -- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#create-queue-command diff --git a/src/Simplex/Messaging/Encoding/String.hs b/src/Simplex/Messaging/Encoding/String.hs index d254aaaa6..a97d86c33 100644 --- a/src/Simplex/Messaging/Encoding/String.hs +++ b/src/Simplex/Messaging/Encoding/String.hs @@ -10,6 +10,9 @@ module Simplex.Messaging.Encoding.String strToJSON, strToJEncoding, strParseJSON, + textToJSON, + textToEncoding, + textParseJSON, base64urlP, strEncodeList, strListP, @@ -225,9 +228,22 @@ _strP = A.space *> strP strToJSON :: StrEncoding a => a -> J.Value strToJSON = J.String . decodeLatin1 . strEncode +{-# INLINE strToJSON #-} strToJEncoding :: StrEncoding a => a -> J.Encoding strToJEncoding = JE.text . decodeLatin1 . strEncode +{-# INLINE strToJEncoding #-} strParseJSON :: StrEncoding a => String -> J.Value -> JT.Parser a strParseJSON name = J.withText name $ either fail pure . parseAll strP . encodeUtf8 + +textToJSON :: TextEncoding a => a -> J.Value +textToJSON = J.String . textEncode +{-# INLINE textToJSON #-} + +textToEncoding :: TextEncoding a => a -> J.Encoding +textToEncoding = JE.text . textEncode +{-# INLINE textToEncoding #-} + +textParseJSON :: TextEncoding a => String -> J.Value -> JT.Parser a +textParseJSON name = J.withText name $ maybe (fail name) pure . textDecode diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index ff36af4c3..43d97988e 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -67,9 +67,9 @@ import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.Server import Simplex.Messaging.Server.Control (CPClientRole (..)) import Simplex.Messaging.Server.Env.STM (StartOptions (..)) -import Simplex.Messaging.Server.QueueStore (getSystemDate) import Simplex.Messaging.Server.Stats (PeriodStats (..), PeriodStatCounts (..), periodStatCounts, periodStatDataCounts, updatePeriodStats) import Simplex.Messaging.Session +import Simplex.Messaging.SystemTime import Simplex.Messaging.TMap (TMap) import Simplex.Messaging.Transport (ASrvTransport, ATransport (..), THandle (..), THandleAuth (..), THandleParams (..), TProxy, Transport (..), TransportPeer (..), defaultSupportedParams) import Simplex.Messaging.Transport.Buffer (trimCR) diff --git a/src/Simplex/Messaging/Notifications/Server/Store.hs b/src/Simplex/Messaging/Notifications/Server/Store.hs index 63a81ac0f..cb22af000 100644 --- a/src/Simplex/Messaging/Notifications/Server/Store.hs +++ b/src/Simplex/Messaging/Notifications/Server/Store.hs @@ -25,7 +25,7 @@ import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding.String import Simplex.Messaging.Notifications.Protocol import Simplex.Messaging.Protocol (NtfPrivateAuthKey, NtfPublicAuthKey, SMPServer, ServiceId) -import Simplex.Messaging.Server.QueueStore (RoundedSystemTime) +import Simplex.Messaging.SystemTime import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Util (whenM, ($>>=)) @@ -61,10 +61,10 @@ data NtfTknData = NtfTknData tknDhSecret :: C.DhSecretX25519, tknRegCode :: NtfRegCode, tknCronInterval :: TVar Word16, - tknUpdatedAt :: TVar (Maybe RoundedSystemTime) + tknUpdatedAt :: TVar (Maybe SystemDate) } -mkNtfTknData :: NtfTokenId -> NewNtfEntity 'Token -> C.KeyPairX25519 -> C.DhSecretX25519 -> NtfRegCode -> RoundedSystemTime -> IO NtfTknData +mkNtfTknData :: NtfTokenId -> NewNtfEntity 'Token -> C.KeyPairX25519 -> C.DhSecretX25519 -> NtfRegCode -> SystemDate -> IO NtfTknData mkNtfTknData ntfTknId (NewNtfTkn token tknVerifyKey _) tknDhKeys tknDhSecret tknRegCode ts = do tknStatus <- newTVarIO NTRegistered tknCronInterval <- newTVarIO 0 diff --git a/src/Simplex/Messaging/Notifications/Server/Store/Migrations.hs b/src/Simplex/Messaging/Notifications/Server/Store/Migrations.hs index 226a02dc6..6a53ff4a2 100644 --- a/src/Simplex/Messaging/Notifications/Server/Store/Migrations.hs +++ b/src/Simplex/Messaging/Notifications/Server/Store/Migrations.hs @@ -1,11 +1,11 @@ {-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE QuasiQuotes #-} module Simplex.Messaging.Notifications.Server.Store.Migrations where import Data.List (sortOn) import Data.Text (Text) -import qualified Data.Text as T import Simplex.Messaging.Agent.Store.Shared import Text.RawString.QQ (r) @@ -23,8 +23,7 @@ ntfServerMigrations = sortOn name $ map migration ntfServerSchemaMigrations m20250417_initial :: Text m20250417_initial = - T.pack - [r| + [r| CREATE TABLE tokens( token_id BYTEA NOT NULL, push_provider TEXT NOT NULL, @@ -83,8 +82,7 @@ CREATE UNIQUE INDEX idx_last_notifications_token_subscription ON last_notificati m20250517_service_cert :: Text m20250517_service_cert = - T.pack - [r| + [r| ALTER TABLE smp_servers ADD COLUMN ntf_service_id BYTEA; ALTER TABLE subscriptions ADD COLUMN ntf_service_assoc BOOLEAN NOT NULL DEFAULT FALSE; @@ -95,8 +93,7 @@ CREATE INDEX idx_subscriptions_smp_server_id_ntf_service_status ON subscriptions down_m20250517_service_cert :: Text down_m20250517_service_cert = - T.pack - [r| + [r| DROP INDEX idx_subscriptions_smp_server_id_ntf_service_status; CREATE INDEX idx_subscriptions_smp_server_id_status ON subscriptions(smp_server_id, status); diff --git a/src/Simplex/Messaging/Notifications/Server/Store/Postgres.hs b/src/Simplex/Messaging/Notifications/Server/Store/Postgres.hs index 78891796f..80d946c8b 100644 --- a/src/Simplex/Messaging/Notifications/Server/Store/Postgres.hs +++ b/src/Simplex/Messaging/Notifications/Server/Store/Postgres.hs @@ -65,10 +65,10 @@ import Simplex.Messaging.Notifications.Server.Store.Migrations import Simplex.Messaging.Notifications.Server.Store.Types import Simplex.Messaging.Notifications.Server.StoreLog import Simplex.Messaging.Protocol (EntityId (..), EncNMsgMeta, ErrorType (..), NotifierId, NtfPrivateAuthKey, NtfPublicAuthKey, SMPServer, ServiceId, pattern SMPServer) -import Simplex.Messaging.Server.QueueStore (RoundedSystemTime, getSystemDate) import Simplex.Messaging.Server.QueueStore.Postgres (handleDuplicate, withLog_) import Simplex.Messaging.Server.QueueStore.Postgres.Config (PostgresStoreCfg (..)) import Simplex.Messaging.Server.StoreLog (openWriteStoreLog) +import Simplex.Messaging.SystemTime import Simplex.Messaging.Transport.Client (TransportHost) import Simplex.Messaging.Util (anyM, firstRow, maybeFirstRow, toChunks, tshow) import System.Exit (exitFailure) @@ -87,7 +87,7 @@ data NtfPostgresStore = NtfPostgresStore deletedTTL :: Int64 } -mkNtfTknRec :: NtfTokenId -> NewNtfEntity 'Token -> C.PrivateKeyX25519 -> C.DhSecretX25519 -> NtfRegCode -> RoundedSystemTime -> NtfTknRec +mkNtfTknRec :: NtfTokenId -> NewNtfEntity 'Token -> C.PrivateKeyX25519 -> C.DhSecretX25519 -> NtfRegCode -> SystemDate -> NtfTknRec mkNtfTknRec ntfTknId (NewNtfTkn token tknVerifyKey _) tknDhPrivKey tknDhSecret tknRegCode ts = NtfTknRec {ntfTknId, token, tknStatus = NTRegistered, tknVerifyKey, tknDhPrivKey, tknDhSecret, tknRegCode, tknCronInterval = 0, tknUpdatedAt = Just ts} @@ -170,7 +170,7 @@ updateTokenDate st db NtfTknRec {ntfTknId, tknUpdatedAt} = do void $ DB.execute db "UPDATE tokens SET updated_at = ? WHERE token_id = ?" (ts, ntfTknId) withLog "updateTokenDate" st $ \sl -> logUpdateTokenTime sl ntfTknId ts -type NtfTknRow = (NtfTokenId, PushProvider, Binary ByteString, NtfTknStatus, NtfPublicAuthKey, C.PrivateKeyX25519, C.DhSecretX25519, Binary ByteString, Word16, Maybe RoundedSystemTime) +type NtfTknRow = (NtfTokenId, PushProvider, Binary ByteString, NtfTknStatus, NtfPublicAuthKey, C.PrivateKeyX25519, C.DhSecretX25519, Binary ByteString, Word16, Maybe SystemDate) ntfTknQuery :: Query ntfTknQuery = diff --git a/src/Simplex/Messaging/Notifications/Server/Store/Types.hs b/src/Simplex/Messaging/Notifications/Server/Store/Types.hs index 39e303340..abac8d14e 100644 --- a/src/Simplex/Messaging/Notifications/Server/Store/Types.hs +++ b/src/Simplex/Messaging/Notifications/Server/Store/Types.hs @@ -16,7 +16,7 @@ import Simplex.Messaging.Encoding.String import Simplex.Messaging.Notifications.Protocol (DeviceToken, NtfRegCode, NtfSubStatus, NtfSubscriptionId, NtfTokenId, NtfTknStatus, SMPQueueNtf) import Simplex.Messaging.Notifications.Server.Store (NtfSubData (..), NtfTknData (..)) import Simplex.Messaging.Protocol (NotifierId, NtfPrivateAuthKey, NtfPublicAuthKey) -import Simplex.Messaging.Server.QueueStore (RoundedSystemTime) +import Simplex.Messaging.SystemTime data NtfTknRec = NtfTknRec { ntfTknId :: NtfTokenId, @@ -27,7 +27,7 @@ data NtfTknRec = NtfTknRec tknDhSecret :: C.DhSecretX25519, tknRegCode :: NtfRegCode, tknCronInterval :: Word16, - tknUpdatedAt :: Maybe RoundedSystemTime + tknUpdatedAt :: Maybe SystemDate } deriving (Show) diff --git a/src/Simplex/Messaging/Notifications/Server/StoreLog.hs b/src/Simplex/Messaging/Notifications/Server/StoreLog.hs index e71ebaf57..7c71ddb08 100644 --- a/src/Simplex/Messaging/Notifications/Server/StoreLog.hs +++ b/src/Simplex/Messaging/Notifications/Server/StoreLog.hs @@ -39,8 +39,8 @@ import Simplex.Messaging.Notifications.Protocol import Simplex.Messaging.Notifications.Server.Store import Simplex.Messaging.Notifications.Server.Store.Types import Simplex.Messaging.Protocol (EntityId (..), SMPServer, ServiceId) -import Simplex.Messaging.Server.QueueStore (RoundedSystemTime) import Simplex.Messaging.Server.StoreLog +import Simplex.Messaging.SystemTime import System.IO data NtfStoreLogRecord @@ -49,7 +49,7 @@ data NtfStoreLogRecord | UpdateToken NtfTokenId DeviceToken NtfRegCode | TokenCron NtfTokenId Word16 | DeleteToken NtfTokenId - | UpdateTokenTime NtfTokenId RoundedSystemTime + | UpdateTokenTime NtfTokenId SystemDate | CreateSubscription NtfSubRec | SubscriptionStatus NtfSubscriptionId NtfSubStatus NtfAssociatedService | DeleteSubscription NtfSubscriptionId @@ -103,7 +103,7 @@ logTokenCron s tknId cronInt = logNtfStoreRecord s $ TokenCron tknId cronInt logDeleteToken :: StoreLog 'WriteMode -> NtfTokenId -> IO () logDeleteToken s tknId = logNtfStoreRecord s $ DeleteToken tknId -logUpdateTokenTime :: StoreLog 'WriteMode -> NtfTokenId -> RoundedSystemTime -> IO () +logUpdateTokenTime :: StoreLog 'WriteMode -> NtfTokenId -> SystemDate -> IO () logUpdateTokenTime s tknId t = logNtfStoreRecord s $ UpdateTokenTime tknId t logCreateSubscription :: StoreLog 'WriteMode -> NtfSubRec -> IO () diff --git a/src/Simplex/Messaging/Protocol.hs b/src/Simplex/Messaging/Protocol.hs index 40314ad2a..13ac3f182 100644 --- a/src/Simplex/Messaging/Protocol.hs +++ b/src/Simplex/Messaging/Protocol.hs @@ -217,6 +217,7 @@ import Control.Applicative (optional, (<|>)) import Control.Exception (Exception, SomeException, displayException, fromException) import Control.Monad.Except import Data.Aeson (FromJSON (..), ToJSON (..)) +import qualified Data.Aeson as J import qualified Data.Aeson.TH as J import Data.Attoparsec.ByteString.Char8 (Parser, ()) import qualified Data.Attoparsec.ByteString.Char8 as A @@ -224,6 +225,7 @@ import Data.Bifunctor (bimap, first) import qualified Data.ByteString.Base64 as B64 import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B +import qualified Data.ByteString.Lazy as LB import Data.Char (isPrint, isSpace) import Data.Constraint (Dict (..)) import Data.Functor (($>)) @@ -249,6 +251,7 @@ import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String import Simplex.Messaging.Parsers +import Simplex.Messaging.Protocol.Types import Simplex.Messaging.Server.QueueStore.QueueInfo import Simplex.Messaging.ServiceScheme import Simplex.Messaging.Transport @@ -1588,7 +1591,8 @@ toNetworkError e = maybe (NEConnectError err) fromTLSError (fromException e) _ -> NETLSError err data BlockingInfo = BlockingInfo - { reason :: BlockingReason + { reason :: BlockingReason, + notice :: Maybe ClientNotice } deriving (Eq, Show) @@ -1596,10 +1600,12 @@ data BlockingReason = BRSpam | BRContent deriving (Eq, Show) instance StrEncoding BlockingInfo where - strEncode BlockingInfo {reason} = "reason=" <> strEncode reason + strEncode BlockingInfo {reason, notice} = + "reason=" <> strEncode reason <> maybe "" ((",notice=" <>) . LB.toStrict . J.encode) notice strP = do reason <- "reason=" *> strP - pure BlockingInfo {reason} + notice <- optional $ ",notice=" *> (J.eitherDecodeStrict <$?> A.takeByteString) + pure BlockingInfo {reason, notice} instance Encoding BlockingInfo where smpEncode = strEncode @@ -1843,9 +1849,13 @@ instance ProtocolEncoding SMPVersion ErrorType BrokerMsg where | otherwise -> e END_ INFO info -> e (INFO_, ' ', info) OK -> e OK_ - ERR err -> case err of - BLOCKED _ | v < blockedEntitySMPVersion -> e (ERR_, ' ', AUTH) - _ -> e (ERR_, ' ', err) + ERR err -> e (ERR_, ' ', err') + where + err' = case err of + BLOCKED info + | v < blockedEntitySMPVersion -> AUTH + | v < clientNoticesSMPVersion -> BLOCKED info {notice = Nothing} + _ -> err PONG -> e PONG_ where e :: Encoding a => a -> ByteString diff --git a/src/Simplex/Messaging/Protocol/Types.hs b/src/Simplex/Messaging/Protocol/Types.hs new file mode 100644 index 000000000..0cfd660e3 --- /dev/null +++ b/src/Simplex/Messaging/Protocol/Types.hs @@ -0,0 +1,17 @@ +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TemplateHaskell #-} + +module Simplex.Messaging.Protocol.Types where + +import qualified Data.Aeson.TH as J +import Data.Int (Int64) +import Simplex.Messaging.Parsers + +data ClientNotice = ClientNotice + { ttl :: Maybe Int64 -- seconds, Nothing - indefinite + } + deriving (Eq, Show) + +$(J.deriveJSON defaultJSON ''ClientNotice) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 7d6e00ab0..ec75a07d4 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -115,6 +115,7 @@ import Simplex.Messaging.Server.QueueStore.QueueInfo import Simplex.Messaging.Server.QueueStore.Types import Simplex.Messaging.Server.Stats import Simplex.Messaging.Server.StoreLog (foldLogLines) +import Simplex.Messaging.SystemTime import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport @@ -992,14 +993,20 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt else do r <- liftIO $ runExceptT $ do (q, QueueRec {status}) <- ExceptT $ getSenderQueue st qId - when (status == EntityActive) $ ExceptT $ blockQueue (queueStore st) q info - pure status + let rId = recipientId q + when (status /= EntityBlocked info) $ do + ExceptT $ blockQueue (queueStore st) q info + liftIO $ + getSubscribedClient rId (queueSubscribers $ subscribers srv) + $>>= readTVarIO + >>= mapM_ (\c -> atomically (writeTBQueue (sndQ c) ([(NoCorrId, rId, ERR $ BLOCKED info)] , []))) + pure (status, EntityBlocked info) case r of Left e -> liftIO $ hPutStrLn h $ "error: " <> show e - Right EntityActive -> do + Right (EntityActive, status') -> do incStat $ qBlocked stats - liftIO $ hPutStrLn h "ok, queue blocked" - Right status -> liftIO $ hPutStrLn h $ "ok, already inactive: " <> show status + liftIO $ hPutStrLn h $ "ok, queue blocked: " <> show status' + Right (_, status') -> liftIO $ hPutStrLn h $ "ok, already inactive: " <> show status' CPUnblock qId -> withUserRole $ unliftIO u $ do st <- asks msgStore r <- liftIO $ runExceptT $ do @@ -1679,7 +1686,7 @@ client -- This is tracked as "subscription" in the client to prevent these -- clients from being able to subscribe. pure s - getMessage_ :: Sub -> Maybe (MsgId, RoundedSystemTime) -> M s (Transmission BrokerMsg) + getMessage_ :: Sub -> Maybe (MsgId, SystemSeconds) -> M s (Transmission BrokerMsg) getMessage_ s delivered_ = do stats <- asks serverStats fmap (either err id) $ liftIO $ runExceptT $ @@ -1805,13 +1812,13 @@ client pure (corrId, entId, maybe OK (MSG . encryptMsg qr) msg_) _ -> pure $ err NO_MSG where - getDelivered :: Sub -> STM (Maybe (ServerSub, RoundedSystemTime)) + getDelivered :: Sub -> STM (Maybe (ServerSub, SystemSeconds)) getDelivered Sub {delivered, subThread} = do readTVar delivered $>>= \(msgId', ts) -> if msgId == msgId' || B.null msgId then writeTVar delivered Nothing $> Just (subThread, ts) else pure Nothing - updateStats :: ServerStats -> Bool -> RoundedSystemTime -> Message -> IO () + updateStats :: ServerStats -> Bool -> SystemSeconds -> Message -> IO () updateStats stats isGet deliveryTime = \case MessageQuota {} -> pure () Message {msgFlags} -> do @@ -2030,7 +2037,7 @@ client msgId' = messageId msg msgTs' = messageTs msg - setDelivered :: Sub -> Message -> RoundedSystemTime -> STM () + setDelivered :: Sub -> Message -> SystemSeconds -> STM () setDelivered Sub {delivered} msg !ts = do let !msgId = messageId msg writeTVar delivered $ Just (msgId, ts) diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 42652be6b..24cd6dfcc 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -123,6 +123,7 @@ import Simplex.Messaging.Server.QueueStore.Types import Simplex.Messaging.Server.Stats import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.Server.StoreLog.ReadWrite +import Simplex.Messaging.SystemTime import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (ASrvTransport, SMPVersion, THandleParams, TransportPeer (..), VersionRangeSMP) @@ -464,7 +465,7 @@ data SubscriptionThread = NoSub | SubPending | SubThread (Weak ThreadId) data Sub = Sub { subThread :: ServerSub, -- Nothing value indicates that sub - delivered :: TVar (Maybe (MsgId, RoundedSystemTime)) + delivered :: TVar (Maybe (MsgId, SystemSeconds)) } newServer :: IO (Server s) diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index e81c153de..5038c8826 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -84,6 +84,7 @@ import Simplex.Messaging.Server.QueueStore.Postgres #endif import Simplex.Messaging.Server.QueueStore.STM import Simplex.Messaging.Server.QueueStore.Types +import Simplex.Messaging.SystemTime import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Util (ifM, tshow, whenM, ($>>=), (<$$>)) diff --git a/src/Simplex/Messaging/Server/QueueStore.hs b/src/Simplex/Messaging/Server/QueueStore.hs index 9395f5bac..e05719cf6 100644 --- a/src/Simplex/Messaging/Server/QueueStore.hs +++ b/src/Simplex/Messaging/Server/QueueStore.hs @@ -1,8 +1,6 @@ {-# LANGUAGE CPP #-} {-# LANGUAGE DataKinds #-} -{-# LANGUAGE DerivingStrategies #-} {-# LANGUAGE DuplicateRecordFields #-} -{-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE KindSignatures #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE MultiParamTypeClasses #-} @@ -14,14 +12,13 @@ module Simplex.Messaging.Server.QueueStore where import Control.Applicative (optional, (<|>)) import qualified Data.ByteString.Char8 as B import Data.Functor (($>)) -import Data.Int (Int64) import Data.List.NonEmpty (NonEmpty) -import Data.Time.Clock.System (SystemTime (..), getSystemTime) import qualified Data.X509 as X import qualified Data.X509.Validation as XV import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol +import Simplex.Messaging.SystemTime import Simplex.Messaging.Transport (SMPServiceRole) #if defined(dbServerPostgres) import Data.Text.Encoding (decodeLatin1, encodeUtf8) @@ -40,7 +37,7 @@ data QueueRec = QueueRec queueData :: Maybe (LinkId, QueueLinkData), notifier :: Maybe NtfCreds, status :: ServerEntityStatus, - updatedAt :: Maybe RoundedSystemTime, + updatedAt :: Maybe SystemDate, rcvServiceId :: Maybe ServiceId } deriving (Show) @@ -67,7 +64,7 @@ data ServiceRec = ServiceRec serviceRole :: SMPServiceRole, serviceCert :: X.CertificateChain, serviceCertHash :: XV.Fingerprint, -- SHA512 hash of long-term service client certificate. See comment for ClientHandshake. - serviceCreatedAt :: RoundedSystemTime + serviceCreatedAt :: SystemDate } deriving (Show) @@ -111,22 +108,3 @@ instance FromField ServerEntityStatus where fromField = fromTextField_ $ eitherT instance ToField ServerEntityStatus where toField = toField . decodeLatin1 . strEncode #endif - -newtype RoundedSystemTime = RoundedSystemTime Int64 - deriving (Eq, Ord, Show) -#if defined(dbServerPostgres) - deriving newtype (FromField, ToField) -#endif - -instance StrEncoding RoundedSystemTime where - strEncode (RoundedSystemTime t) = strEncode t - strP = RoundedSystemTime <$> strP - -getRoundedSystemTime :: Int64 -> IO RoundedSystemTime -getRoundedSystemTime prec = (\t -> RoundedSystemTime $ (systemSeconds t `div` prec) * prec) <$> getSystemTime - -getSystemDate :: IO RoundedSystemTime -getSystemDate = getRoundedSystemTime 86400 - -getSystemSeconds :: IO RoundedSystemTime -getSystemSeconds = RoundedSystemTime . systemSeconds <$> getSystemTime diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs index 4a53dcdd4..e86bec07b 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs @@ -85,6 +85,7 @@ import Simplex.Messaging.Server.QueueStore.Postgres.Migrations (serverMigrations import Simplex.Messaging.Server.QueueStore.STM (STMService (..), readQueueRecIO) import Simplex.Messaging.Server.QueueStore.Types import Simplex.Messaging.Server.StoreLog +import Simplex.Messaging.SystemTime import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (SMPServiceRole (..)) @@ -429,7 +430,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where setStatusDB "unblockQueue" st sq EntityActive $ withLog "unblockQueue" st (`logUnblockQueue` recipientId sq) - updateQueueTime :: PostgresQueueStore q -> q -> RoundedSystemTime -> IO (Either ErrorType QueueRec) + updateQueueTime :: PostgresQueueStore q -> q -> SystemDate -> IO (Either ErrorType QueueRec) updateQueueTime st sq t = withQueueRec sq "updateQueueTime" $ \q@QueueRec {updatedAt} -> if updatedAt == Just t @@ -641,7 +642,7 @@ type QueueRecRow = ( RecipientId, NonEmpty RcvPublicAuthKey, RcvDhSecret, SenderId, Maybe SndPublicAuthKey, Maybe QueueMode, Maybe NotifierId, Maybe NtfPublicAuthKey, Maybe RcvNtfDhSecret, Maybe ServiceId, - ServerEntityStatus, Maybe RoundedSystemTime, Maybe LinkId, Maybe ServiceId + ServerEntityStatus, Maybe SystemDate, Maybe LinkId, Maybe ServiceId ) queueRecToRow :: (RecipientId, QueueRec) -> QueueRecRow :. (Maybe EncDataBytes, Maybe EncDataBytes) @@ -709,11 +710,11 @@ mkNotifier (Just notifierId, Just notifierKey, Just rcvNtfDhSecret) ntfServiceId Just NtfCreds {notifierId, notifierKey, rcvNtfDhSecret, ntfServiceId} mkNotifier _ _ = Nothing -serviceRecToRow :: ServiceRec -> (ServiceId, SMPServiceRole, X.CertificateChain, Binary ByteString, RoundedSystemTime) +serviceRecToRow :: ServiceRec -> (ServiceId, SMPServiceRole, X.CertificateChain, Binary ByteString, SystemDate) serviceRecToRow ServiceRec {serviceId, serviceRole, serviceCert, serviceCertHash = XV.Fingerprint fp, serviceCreatedAt} = (serviceId, serviceRole, serviceCert, Binary fp, serviceCreatedAt) -rowToServiceRec :: (ServiceId, SMPServiceRole, X.CertificateChain, Binary ByteString, RoundedSystemTime) -> ServiceRec +rowToServiceRec :: (ServiceId, SMPServiceRole, X.CertificateChain, Binary ByteString, SystemDate) -> ServiceRec rowToServiceRec (serviceId, serviceRole, serviceCert, Binary fp, serviceCreatedAt) = ServiceRec {serviceId, serviceRole, serviceCert, serviceCertHash = XV.Fingerprint fp, serviceCreatedAt} @@ -792,4 +793,8 @@ instance FromField C.APublicAuthKey where fromField = blobFieldDecoder C.decodeP instance ToField EncDataBytes where toField (EncDataBytes s) = toField (Binary s) deriving newtype instance FromField EncDataBytes + +deriving newtype instance ToField (RoundedSystemTime t) + +deriving newtype instance FromField (RoundedSystemTime t) #endif diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs index be14202c6..7ff8b9862 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs @@ -1,11 +1,11 @@ {-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE QuasiQuotes #-} module Simplex.Messaging.Server.QueueStore.Postgres.Migrations where import Data.List (sortOn) import Data.Text (Text) -import qualified Data.Text as T import Simplex.Messaging.Agent.Store.Shared import Text.RawString.QQ (r) @@ -26,8 +26,7 @@ serverMigrations = sortOn name $ map migration serverSchemaMigrations m20250207_initial :: Text m20250207_initial = - T.pack - [r| + [r| CREATE TABLE msg_queues( recipient_id BYTEA NOT NULL, recipient_key BYTEA NOT NULL, @@ -51,24 +50,21 @@ CREATE INDEX idx_msg_queues_deleted_at ON msg_queues (deleted_at); m20250319_updated_index :: Text m20250319_updated_index = - T.pack - [r| + [r| DROP INDEX idx_msg_queues_deleted_at; CREATE INDEX idx_msg_queues_updated_at ON msg_queues (deleted_at, updated_at); |] down_m20250319_updated_index :: Text down_m20250319_updated_index = - T.pack - [r| + [r| DROP INDEX idx_msg_queues_updated_at; CREATE INDEX idx_msg_queues_deleted_at ON msg_queues (deleted_at); |] m20250320_short_links :: Text m20250320_short_links = - T.pack - [r| + [r| ALTER TABLE msg_queues ADD COLUMN queue_mode TEXT, ADD COLUMN link_id BYTEA, @@ -88,8 +84,7 @@ CREATE UNIQUE INDEX idx_msg_queues_link_id ON msg_queues(link_id); down_m20250320_short_links :: Text down_m20250320_short_links = - T.pack - [r| + [r| ALTER TABLE msg_queues ADD COLUMN snd_secure BOOLEAN NOT NULL DEFAULT FALSE; UPDATE msg_queues SET snd_secure = TRUE WHERE queue_mode = 'M'; @@ -124,8 +119,7 @@ ALTER TABLE msg_queues RENAME COLUMN recipient_keys TO recipient_key; m20250514_service_certs :: Text m20250514_service_certs = - T.pack - [r| + [r| CREATE TABLE services( service_id BYTEA NOT NULL, service_role TEXT NOT NULL, @@ -147,8 +141,7 @@ CREATE INDEX idx_msg_queues_ntf_service_id ON msg_queues(ntf_service_id, deleted down_m20250514_service_certs :: Text down_m20250514_service_certs = - T.pack - [r| + [r| DROP INDEX idx_msg_queues_rcv_service_id; DROP INDEX idx_msg_queues_ntf_service_id; @@ -163,8 +156,7 @@ DROP TABLE services; m20250903_store_messages :: Text m20250903_store_messages = - T.pack - [r| + [r| CREATE TABLE messages( message_id BIGINT NOT NULL PRIMARY KEY GENERATED ALWAYS AS IDENTITY, recipient_id BYTEA NOT NULL REFERENCES msg_queues ON DELETE CASCADE ON UPDATE RESTRICT, @@ -434,8 +426,7 @@ $$; down_m20250903_store_messages :: Text down_m20250903_store_messages = - T.pack - [r| + [r| DROP FUNCTION write_message; DROP FUNCTION try_del_msg; DROP FUNCTION try_del_peek_msg; diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index 515a0ee77..ad98698db 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -41,6 +41,7 @@ import Simplex.Messaging.Protocol import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Server.QueueStore.Types import Simplex.Messaging.Server.StoreLog +import Simplex.Messaging.SystemTime import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (SMPServiceRole (..)) @@ -251,7 +252,7 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where setStatus (queueRec sq) EntityActive $>> withLog "unblockQueue" st (`logUnblockQueue` recipientId sq) - updateQueueTime :: STMQueueStore q -> q -> RoundedSystemTime -> IO (Either ErrorType QueueRec) + updateQueueTime :: STMQueueStore q -> q -> SystemDate -> IO (Either ErrorType QueueRec) updateQueueTime st sq t = withQueueRec qr update $>>= log' where qr = queueRec sq diff --git a/src/Simplex/Messaging/Server/QueueStore/Types.hs b/src/Simplex/Messaging/Server/QueueStore/Types.hs index ee155cf91..8de015421 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Types.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Types.hs @@ -14,6 +14,7 @@ import Data.List.NonEmpty (NonEmpty) import Data.Text (Text) import Simplex.Messaging.Protocol import Simplex.Messaging.Server.QueueStore +import Simplex.Messaging.SystemTime import Simplex.Messaging.TMap (TMap) class StoreQueueClass q where @@ -41,7 +42,7 @@ class StoreQueueClass q => QueueStoreClass q s where suspendQueue :: s -> q -> IO (Either ErrorType ()) blockQueue :: s -> q -> BlockingInfo -> IO (Either ErrorType ()) unblockQueue :: s -> q -> IO (Either ErrorType ()) - updateQueueTime :: s -> q -> RoundedSystemTime -> IO (Either ErrorType QueueRec) + updateQueueTime :: s -> q -> SystemDate -> IO (Either ErrorType QueueRec) deleteStoreQueue :: s -> q -> IO (Either ErrorType QueueRec) getCreateService :: s -> ServiceRec -> IO (Either ErrorType ServiceId) setQueueService :: (PartyI p, ServiceParty p) => s -> q -> SParty p -> Maybe ServiceId -> IO (Either ErrorType ()) diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index 4af195295..e60f87815 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -27,7 +27,7 @@ import Data.Time.Clock (UTCTime (..)) import GHC.IORef (atomicSwapIORef) import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (EntityId (..)) -import Simplex.Messaging.Server.QueueStore (RoundedSystemTime (..)) +import Simplex.Messaging.SystemTime import Simplex.Messaging.Util (atomicModifyIORef'_, tshow, unlessM) data ServerStats = ServerStats @@ -976,7 +976,7 @@ data TimeBuckets = TimeBuckets emptyTimeBuckets :: TimeBuckets emptyTimeBuckets = TimeBuckets 0 0 IM.empty -updateTimeBuckets :: RoundedSystemTime -> RoundedSystemTime -> TimeBuckets -> TimeBuckets +updateTimeBuckets :: SystemSeconds -> SystemSeconds -> TimeBuckets -> TimeBuckets updateTimeBuckets (RoundedSystemTime deliveryTime) (RoundedSystemTime currTime) diff --git a/src/Simplex/Messaging/Server/StoreLog.hs b/src/Simplex/Messaging/Server/StoreLog.hs index 6ea015066..4ceb3cddd 100644 --- a/src/Simplex/Messaging/Server/StoreLog.hs +++ b/src/Simplex/Messaging/Server/StoreLog.hs @@ -55,9 +55,9 @@ import GHC.IO (catchAny) import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol --- import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Server.StoreLog.Types +import Simplex.Messaging.SystemTime import Simplex.Messaging.Util (ifM, tshow, unlessM, whenM) import System.Directory (doesFileExist, listDirectory, removeFile, renameFile) import System.IO @@ -75,7 +75,7 @@ data StoreLogRecord | UnblockQueue QueueId | DeleteQueue QueueId | DeleteNotifier QueueId - | UpdateTime QueueId RoundedSystemTime + | UpdateTime QueueId SystemDate | NewService ServiceRec | QueueService RecipientId ASubscriberParty (Maybe ServiceId) deriving (Show) @@ -280,7 +280,7 @@ logDeleteQueue s = writeStoreLogRecord s . DeleteQueue logDeleteNotifier :: StoreLog 'WriteMode -> QueueId -> IO () logDeleteNotifier s = writeStoreLogRecord s . DeleteNotifier -logUpdateQueueTime :: StoreLog 'WriteMode -> QueueId -> RoundedSystemTime -> IO () +logUpdateQueueTime :: StoreLog 'WriteMode -> QueueId -> SystemDate -> IO () logUpdateQueueTime s qId t = writeStoreLogRecord s $ UpdateTime qId t logNewService :: StoreLog 'WriteMode -> ServiceRec -> IO () diff --git a/src/Simplex/Messaging/SystemTime.hs b/src/Simplex/Messaging/SystemTime.hs new file mode 100644 index 000000000..7435a694d --- /dev/null +++ b/src/Simplex/Messaging/SystemTime.hs @@ -0,0 +1,47 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE KindSignatures #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TypeApplications #-} + +module Simplex.Messaging.SystemTime where + +import Data.Aeson (FromJSON, ToJSON) +import Data.Int (Int64) +import Data.Time.Clock (UTCTime) +import Data.Time.Clock.System (SystemTime (..), getSystemTime, systemToUTCTime) +import Data.Typeable (Proxy (..)) +import GHC.TypeLits (KnownNat, Nat, natVal) +import Simplex.Messaging.Agent.Store.DB (FromField (..), ToField (..)) +import Simplex.Messaging.Encoding.String + +newtype RoundedSystemTime (t :: Nat) = RoundedSystemTime {roundedSeconds :: Int64} + deriving (Eq, Ord, Show) + deriving newtype (FromJSON, ToJSON, FromField, ToField) + +type SystemDate = RoundedSystemTime 86400 + +type SystemSeconds = RoundedSystemTime 1 + +instance StrEncoding (RoundedSystemTime t) where + strEncode (RoundedSystemTime t) = strEncode t + strP = RoundedSystemTime <$> strP + +getRoundedSystemTime :: forall t. KnownNat t => IO (RoundedSystemTime t) +getRoundedSystemTime = (\t -> RoundedSystemTime $ (systemSeconds t `div` prec) * prec) <$> getSystemTime + where + prec = fromIntegral $ natVal $ Proxy @t + +getSystemDate :: IO SystemDate +getSystemDate = getRoundedSystemTime +{-# INLINE getSystemDate #-} + +getSystemSeconds :: IO SystemSeconds +getSystemSeconds = RoundedSystemTime . systemSeconds <$> getSystemTime +{-# INLINE getSystemSeconds #-} + +roundedToUTCTime :: RoundedSystemTime t -> UTCTime +roundedToUTCTime = systemToUTCTime . (`MkSystemTime` 0) . roundedSeconds +{-# INLINE roundedToUTCTime #-} diff --git a/src/Simplex/Messaging/Transport.hs b/src/Simplex/Messaging/Transport.hs index 0b2eb3b75..e2e912875 100644 --- a/src/Simplex/Messaging/Transport.hs +++ b/src/Simplex/Messaging/Transport.hs @@ -55,6 +55,7 @@ module Simplex.Messaging.Transport shortLinksSMPVersion, serviceCertsSMPVersion, newNtfCredsSMPVersion, + clientNoticesSMPVersion, simplexMQVersion, smpBlockSize, TransportConfig (..), @@ -168,6 +169,7 @@ smpBlockSize = 16384 -- 15 - short links, with associated data passed in NEW of LSET command (3/30/2025) -- 16 - service certificates (5/31/2025) -- 17 - create notification credentials with NEW (7/12/2025) +-- 18 - support client notices (10/10/2025) data SMPVersion @@ -213,6 +215,9 @@ serviceCertsSMPVersion = VersionSMP 16 newNtfCredsSMPVersion :: VersionSMP newNtfCredsSMPVersion = VersionSMP 17 +clientNoticesSMPVersion :: VersionSMP +clientNoticesSMPVersion = VersionSMP 18 + minClientSMPRelayVersion :: VersionSMP minClientSMPRelayVersion = VersionSMP 6 @@ -220,13 +225,13 @@ minServerSMPRelayVersion :: VersionSMP minServerSMPRelayVersion = VersionSMP 6 currentClientSMPRelayVersion :: VersionSMP -currentClientSMPRelayVersion = VersionSMP 17 +currentClientSMPRelayVersion = VersionSMP 18 legacyServerSMPRelayVersion :: VersionSMP legacyServerSMPRelayVersion = VersionSMP 6 currentServerSMPRelayVersion :: VersionSMP -currentServerSMPRelayVersion = VersionSMP 17 +currentServerSMPRelayVersion = VersionSMP 18 -- Max SMP protocol version to be used in e2e encrypted -- connection between client and server, as defined by SMP proxy. @@ -234,7 +239,7 @@ currentServerSMPRelayVersion = VersionSMP 17 -- to prevent client version fingerprinting by the -- destination relays when clients upgrade at different times. proxiedSMPRelayVersion :: VersionSMP -proxiedSMPRelayVersion = VersionSMP 16 +proxiedSMPRelayVersion = VersionSMP 17 -- minimal supported protocol version is 6 -- TODO remove code that supports sending commands without batching diff --git a/src/Simplex/Messaging/Util.hs b/src/Simplex/Messaging/Util.hs index 57fb11c21..e9f37b1ae 100644 --- a/src/Simplex/Messaging/Util.hs +++ b/src/Simplex/Messaging/Util.hs @@ -245,6 +245,7 @@ safeDecodeUtf8 :: ByteString -> Text safeDecodeUtf8 = decodeUtf8With onError where onError _ _ = Just '?' +{-# INLINE safeDecodeUtf8 #-} timeoutThrow :: MonadUnliftIO m => e -> Int -> ExceptT e m a -> ExceptT e m a timeoutThrow e ms action = ExceptT (sequence <$> (ms `timeout` runExceptT action)) >>= maybe (throwE e) pure diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index bc6b7e6f0..869483129 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -87,6 +87,8 @@ import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestSte import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), Env (..), InitialAgentServers (..), createAgentStore) import Simplex.Messaging.Agent.Protocol hiding (CON, CONF, INFO, REQ, SENT, INV, JOINED) import qualified Simplex.Messaging.Agent.Protocol as A +import Simplex.Messaging.Agent.Store (Connection' (..), SomeConn' (..), StoredRcvQueue (..)) +import Simplex.Messaging.Agent.Store.AgentStore (getConn) import Simplex.Messaging.Agent.Store.Common (DBStore (..), withTransaction) import Simplex.Messaging.Agent.Store.Interface import qualified Simplex.Messaging.Agent.Store.DB as DB @@ -100,10 +102,12 @@ import Simplex.Messaging.Encoding.String import Simplex.Messaging.Notifications.Transport (NTFVersion, pattern VersionNTF) import Simplex.Messaging.Protocol (BasicAuth, ErrorType (..), MsgBody, NetworkError (..), ProtocolServer (..), SubscriptionMode (..), initialSMPClientVersion, srvHostnamesSMPClientVersion, supportedSMPClientVRange) import qualified Simplex.Messaging.Protocol as SMP +import Simplex.Messaging.Protocol.Types import Simplex.Messaging.Server.Env.STM (AStoreType (..), ServerConfig (..), ServerStoreCfg (..), StorePaths (..)) import Simplex.Messaging.Server.Expiration import Simplex.Messaging.Server.MsgStore.Types (SMSType (..), SQSType (..)) import Simplex.Messaging.Server.QueueStore.QueueInfo +import Simplex.Messaging.Server.StoreLog (StoreLogRecord (..)) import Simplex.Messaging.Transport (ASrvTransport, SMPVersion, VersionSMP, authCmdsSMPVersion, currentServerSMPRelayVersion, minClientSMPRelayVersion, minServerSMPRelayVersion, sendingProxySMPVersion, sndAuthKeySMPVersion, alpnSupportedSMPHandshakes, supportedServerSMPRelayVRange) import Simplex.Messaging.Util (bshow, diffToMicroseconds) import Simplex.Messaging.Version (VersionRange (..)) @@ -278,7 +282,7 @@ inAnyOrder g rs = withFrozenCallStack $ do createConnection :: ConnectionModeI c => AgentClient -> UserId -> Bool -> SConnectionMode c -> Maybe CRClientData -> SubscriptionMode -> AE (ConnId, ConnectionRequestUri c) createConnection c userId enableNtfs cMode clientData subMode = do - (connId, (CCLink cReq _, Nothing)) <- A.createConnection c NRMInteractive userId enableNtfs cMode Nothing clientData IKPQOn subMode + (connId, (CCLink cReq _, Nothing)) <- A.createConnection c NRMInteractive userId enableNtfs True cMode Nothing clientData IKPQOn subMode pure (connId, cReq) joinConnection :: AgentClient -> UserId -> Bool -> ConnectionRequestUri c -> ConnInfo -> SubscriptionMode -> AE (ConnId, SndQueueSecured) @@ -540,6 +544,10 @@ functionalAPITests ps = do describe "SMP queue info" $ do it "server should respond with queue and subscription information" $ withSmpServer ps testServerQueueInfo +#if !defined(dbServerPostgres) + describe "Client notices" $ do + it "should create client notice" $ testClientNotice ps +#endif testBasicAuth :: (ASrvTransport, AStoreType) -> Bool -> (Maybe BasicAuth, VersionSMP) -> (Maybe BasicAuth, VersionSMP) -> (Maybe BasicAuth, VersionSMP) -> SndQueueSecured -> AgentMsgId -> IO Int testBasicAuth (t, msType) allowNewQueues srv@(srvAuth, srvVersion) clnt1 clnt2 sqSecured baseId = do @@ -700,7 +708,7 @@ runAgentClientTest pqSupport sqSecured viaProxy alice bob baseId = runAgentClientTestPQ :: HasCallStack => SndQueueSecured -> Bool -> (AgentClient, InitialKeys) -> (AgentClient, PQSupport) -> AgentMsgId -> IO () runAgentClientTestPQ sqSecured viaProxy (alice, aPQ) (bob, bPQ) baseId = runRight_ $ do - (bobId, (CCLink qInfo Nothing, Nothing)) <- A.createConnection alice NRMInteractive 1 True SCMInvitation Nothing Nothing aPQ SMSubscribe + (bobId, (CCLink qInfo Nothing, Nothing)) <- A.createConnection alice NRMInteractive 1 True True SCMInvitation Nothing Nothing aPQ SMSubscribe aliceId <- A.prepareConnectionToJoin bob 1 True qInfo bPQ (sqSecured', Nothing) <- A.joinConnection bob NRMInteractive 1 aliceId True qInfo "bob's connInfo" bPQ SMSubscribe liftIO $ sqSecured' `shouldBe` sqSecured @@ -902,7 +910,7 @@ runAgentClientContactTest pqSupport sqSecured viaProxy alice bob baseId = runAgentClientContactTestPQ :: HasCallStack => SndQueueSecured -> Bool -> PQSupport -> (AgentClient, InitialKeys) -> (AgentClient, PQSupport) -> AgentMsgId -> IO () runAgentClientContactTestPQ sqSecured viaProxy reqPQSupport (alice, aPQ) (bob, bPQ) baseId = runRight_ $ do - (_, (CCLink qInfo Nothing, Nothing)) <- A.createConnection alice NRMInteractive 1 True SCMContact Nothing Nothing aPQ SMSubscribe + (_, (CCLink qInfo Nothing, Nothing)) <- A.createConnection alice NRMInteractive 1 True True SCMContact Nothing Nothing aPQ SMSubscribe aliceId <- A.prepareConnectionToJoin bob 1 True qInfo bPQ (sqSecuredJoin, Nothing) <- A.joinConnection bob NRMInteractive 1 aliceId True qInfo "bob's connInfo" bPQ SMSubscribe liftIO $ sqSecuredJoin `shouldBe` False -- joining via contact address connection @@ -946,7 +954,7 @@ runAgentClientContactTestPQ sqSecured viaProxy reqPQSupport (alice, aPQ) (bob, b runAgentClientContactTestPQ3 :: HasCallStack => Bool -> (AgentClient, InitialKeys) -> (AgentClient, PQSupport) -> (AgentClient, PQSupport) -> AgentMsgId -> IO () runAgentClientContactTestPQ3 viaProxy (alice, aPQ) (bob, bPQ) (tom, tPQ) baseId = runRight_ $ do - (_, (CCLink qInfo Nothing, Nothing)) <- A.createConnection alice NRMInteractive 1 True SCMContact Nothing Nothing aPQ SMSubscribe + (_, (CCLink qInfo Nothing, Nothing)) <- A.createConnection alice NRMInteractive 1 True True SCMContact Nothing Nothing aPQ SMSubscribe (bAliceId, bobId, abPQEnc) <- connectViaContact bob bPQ qInfo sentMessages abPQEnc alice bobId bob bAliceId (tAliceId, tomId, atPQEnc) <- connectViaContact tom tPQ qInfo @@ -999,7 +1007,7 @@ noMessages_ ingoreQCONT c err = tryGet `shouldReturn` () testRejectContactRequest :: HasCallStack => IO () testRejectContactRequest = withAgentClients2 $ \alice bob -> runRight_ $ do - (_addrConnId, (CCLink qInfo Nothing, Nothing)) <- A.createConnection alice NRMInteractive 1 True SCMContact Nothing Nothing IKPQOn SMSubscribe + (_addrConnId, (CCLink qInfo Nothing, Nothing)) <- A.createConnection alice NRMInteractive 1 True True SCMContact Nothing Nothing IKPQOn SMSubscribe aliceId <- A.prepareConnectionToJoin bob 1 True qInfo PQSupportOn (sqSecured, Nothing) <- A.joinConnection bob NRMInteractive 1 aliceId True qInfo "bob's connInfo" PQSupportOn SMSubscribe liftIO $ sqSecured `shouldBe` False -- joining via contact address connection @@ -1325,7 +1333,7 @@ testInviationShortLink :: HasCallStack => Bool -> AgentClient -> AgentClient -> testInviationShortLink viaProxy a b = withAgent 3 agentCfg initAgentServers testDB3 $ \c -> do let userData = UserLinkData "some user data" - (bId, (CCLink connReq (Just shortLink), Nothing)) <- runRight $ A.createConnection a NRMInteractive 1 True SCMInvitation (Just userData) Nothing CR.IKUsePQ SMSubscribe + (bId, (CCLink connReq (Just shortLink), Nothing)) <- runRight $ A.createConnection a NRMInteractive 1 True True SCMInvitation (Just userData) Nothing CR.IKUsePQ SMSubscribe (connReq', connData') <- runRight $ getConnShortLink b 1 shortLink strDecode (strEncode shortLink) `shouldBe` Right shortLink connReq' `shouldBe` connReq @@ -1360,13 +1368,13 @@ testInviationShortLinkPrev :: HasCallStack => Bool -> Bool -> AgentClient -> Age testInviationShortLinkPrev viaProxy sndSecure a b = runRight_ $ do let userData = UserLinkData "some user data" -- can't create short link with previous version - (bId, (CCLink connReq Nothing, Nothing)) <- A.createConnection a NRMInteractive 1 True SCMInvitation (Just userData) Nothing CR.IKPQOn SMSubscribe + (bId, (CCLink connReq Nothing, Nothing)) <- A.createConnection a NRMInteractive 1 True True SCMInvitation (Just userData) Nothing CR.IKPQOn SMSubscribe testJoinConn_ viaProxy sndSecure a bId b connReq testInviationShortLinkAsync :: HasCallStack => Bool -> AgentClient -> AgentClient -> IO () testInviationShortLinkAsync viaProxy a b = do let userData = UserLinkData "some user data" - (bId, (CCLink connReq (Just shortLink), Nothing)) <- runRight $ A.createConnection a NRMInteractive 1 True SCMInvitation (Just userData) Nothing CR.IKUsePQ SMSubscribe + (bId, (CCLink connReq (Just shortLink), Nothing)) <- runRight $ A.createConnection a NRMInteractive 1 True True SCMInvitation (Just userData) Nothing CR.IKUsePQ SMSubscribe (connReq', connData') <- runRight $ getConnShortLink b 1 shortLink strDecode (strEncode shortLink) `shouldBe` Right shortLink connReq' `shouldBe` connReq @@ -1385,7 +1393,7 @@ testContactShortLink :: HasCallStack => Bool -> AgentClient -> AgentClient -> IO testContactShortLink viaProxy a b = withAgent 3 agentCfg initAgentServers testDB3 $ \c -> do let userData = UserLinkData "some user data" - (contactId, (CCLink connReq0 (Just shortLink), Nothing)) <- runRight $ A.createConnection a NRMInteractive 1 True SCMContact (Just userData) Nothing CR.IKPQOn SMSubscribe + (contactId, (CCLink connReq0 (Just shortLink), Nothing)) <- runRight $ A.createConnection a NRMInteractive 1 True True SCMContact (Just userData) Nothing CR.IKPQOn SMSubscribe Right connReq <- pure $ smpDecode (smpEncode connReq0) (connReq', connData') <- runRight $ getConnShortLink b 1 shortLink strDecode (strEncode shortLink) `shouldBe` Right shortLink @@ -1430,7 +1438,7 @@ testContactShortLink viaProxy a b = testAddContactShortLink :: HasCallStack => Bool -> AgentClient -> AgentClient -> IO () testAddContactShortLink viaProxy a b = withAgent 3 agentCfg initAgentServers testDB3 $ \c -> do - (contactId, (CCLink connReq0 Nothing, Nothing)) <- runRight $ A.createConnection a NRMInteractive 1 True SCMContact Nothing Nothing CR.IKPQOn SMSubscribe + (contactId, (CCLink connReq0 Nothing, Nothing)) <- runRight $ A.createConnection a NRMInteractive 1 True True SCMContact Nothing Nothing CR.IKPQOn SMSubscribe Right connReq <- pure $ smpDecode (smpEncode connReq0) -- let userData = UserLinkData "some user data" shortLink <- runRight $ setConnShortLink a contactId SCMContact userData Nothing @@ -1471,7 +1479,7 @@ testInviationShortLinkRestart :: HasCallStack => (ASrvTransport, AStoreType) -> testInviationShortLinkRestart ps = withAgentClients2 $ \a b -> do let userData = UserLinkData "some user data" (bId, (CCLink connReq (Just shortLink), Nothing)) <- withSmpServer ps $ - runRight $ A.createConnection a NRMInteractive 1 True SCMInvitation (Just userData) Nothing CR.IKUsePQ SMOnlyCreate + runRight $ A.createConnection a NRMInteractive 1 True True SCMInvitation (Just userData) Nothing CR.IKUsePQ SMOnlyCreate withSmpServer ps $ do runRight_ $ subscribeConnection a bId (connReq', connData') <- runRight $ getConnShortLink b 1 shortLink @@ -1483,7 +1491,7 @@ testContactShortLinkRestart :: HasCallStack => (ASrvTransport, AStoreType) -> IO testContactShortLinkRestart ps = withAgentClients2 $ \a b -> do let userData = UserLinkData "some user data" (contactId, (CCLink connReq0 (Just shortLink), Nothing)) <- withSmpServer ps $ - runRight $ A.createConnection a NRMInteractive 1 True SCMContact (Just userData) Nothing CR.IKPQOn SMOnlyCreate + runRight $ A.createConnection a NRMInteractive 1 True True SCMContact (Just userData) Nothing CR.IKPQOn SMOnlyCreate Right connReq <- pure $ smpDecode (smpEncode connReq0) let updatedData = UserLinkData "updated user data" withSmpServer ps $ do @@ -1503,7 +1511,7 @@ testAddContactShortLinkRestart :: HasCallStack => (ASrvTransport, AStoreType) -> testAddContactShortLinkRestart ps = withAgentClients2 $ \a b -> do let userData = UserLinkData "some user data" ((contactId, (CCLink connReq0 Nothing, Nothing)), shortLink) <- withSmpServer ps $ runRight $ do - r@(contactId, _) <- A.createConnection a NRMInteractive 1 True SCMContact Nothing Nothing CR.IKPQOn SMOnlyCreate + r@(contactId, _) <- A.createConnection a NRMInteractive 1 True True SCMContact Nothing Nothing CR.IKPQOn SMOnlyCreate (r,) <$> setConnShortLink a contactId SCMContact userData Nothing Right connReq <- pure $ smpDecode (smpEncode connReq0) let updatedData = UserLinkData "updated user data" @@ -1523,7 +1531,7 @@ testAddContactShortLinkRestart ps = withAgentClients2 $ \a b -> do testOldContactQueueShortLink :: HasCallStack => (ASrvTransport, AStoreType) -> IO () testOldContactQueueShortLink ps@(_, msType) = withAgentClients2 $ \a b -> do (contactId, (CCLink connReq Nothing, Nothing)) <- withSmpServer ps $ runRight $ - A.createConnection a NRMInteractive 1 True SCMContact Nothing Nothing CR.IKPQOn SMOnlyCreate + A.createConnection a NRMInteractive 1 True True SCMContact Nothing Nothing CR.IKPQOn SMOnlyCreate -- make it an "old" queue let updateStoreLog f = replaceSubstringInFile f " queue_mode=C" "" #if defined(dbServerPostgres) @@ -2263,7 +2271,7 @@ makeConnectionForUsers = makeConnectionForUsers_ PQSupportOn True makeConnectionForUsers_ :: HasCallStack => PQSupport -> SndQueueSecured -> AgentClient -> UserId -> AgentClient -> UserId -> ExceptT AgentErrorType IO (ConnId, ConnId) makeConnectionForUsers_ pqSupport sqSecured alice aliceUserId bob bobUserId = do - (bobId, (CCLink qInfo Nothing, Nothing)) <- A.createConnection alice NRMInteractive aliceUserId True SCMInvitation Nothing Nothing (IKLinkPQ pqSupport) SMSubscribe + (bobId, (CCLink qInfo Nothing, Nothing)) <- A.createConnection alice NRMInteractive aliceUserId True True SCMInvitation Nothing Nothing (IKLinkPQ pqSupport) SMSubscribe aliceId <- A.prepareConnectionToJoin bob bobUserId True qInfo pqSupport (sqSecured', Nothing) <- A.joinConnection bob NRMInteractive bobUserId aliceId True qInfo "bob's connInfo" pqSupport SMSubscribe liftIO $ sqSecured' `shouldBe` sqSecured @@ -3837,6 +3845,76 @@ testServerQueueInfo = do qDelivered <$> qiSub `shouldBe` Just msgId_ pure msgId_ +testClientNotice :: HasCallStack => (ASrvTransport, AStoreType) -> IO () +testClientNotice ps = do + withAgent 1 agentCfg initAgentServers testDB $ \c -> do + (cId, _) <- withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ + A.createConnection c NRMInteractive 1 True True SCMContact Nothing Nothing IKPQOn SMSubscribe + ("", "", DOWN _ [_]) <- nGet c + + addNotice c cId $ Just 1 + + (cId', _) <- withSmpServerStoreLogOn ps testPort $ \_ -> do + subscribedWithErrors c 1 + testNotice c True + threadDelay 1000000 + runRight $ A.createConnection c NRMInteractive 1 True True SCMContact Nothing Nothing IKPQOn SMSubscribe + ("", "", DOWN _ [_]) <- nGet c + + addNotice c cId' $ Just 1 + + (cId'', _) <- withSmpServerStoreLogOn ps testPort $ \_ -> do + subscribedWithErrors c 1 + testNotice c True + threadDelay 1000000 + testNotice c True + threadDelay 1000000 + runRight $ A.createConnection c NRMInteractive 1 True True SCMContact Nothing Nothing IKPQOn SMSubscribe + + addNotice c cId'' $ Just 1 + + withAgent 1 agentCfg initAgentServers testDB $ \c -> do + (cId3, _) <- withSmpServerStoreLogOn ps testPort $ \_ -> do + runRight_ $ subscribeAllConnections c False Nothing + subscribedWithErrors c 3 + testNotice c True + threadDelay 2000000 + testNotice c True + threadDelay 1000000 + runRight $ A.createConnection c NRMInteractive 1 True True SCMContact Nothing Nothing IKPQOn SMSubscribe + ("", "", DOWN _ [_]) <- nGet c + + addNotice c cId3 Nothing + + withSmpServerStoreLogOn ps testPort $ \_ -> do + subscribedWithErrors c 1 + testNotice c False + + removeNotice c cId3 + + withAgent 1 agentCfg initAgentServers testDB $ \c -> do + withSmpServerStoreLogOn ps testPort $ \_ -> do + runRight_ $ subscribeAllConnections c False Nothing + subscribedWithErrors c 4 + void $ runRight $ A.createConnection c NRMInteractive 1 True True SCMContact Nothing Nothing IKPQOn SMSubscribe + where + addNotice c cId ttl = logNotice c cId $ Just ClientNotice {ttl} + removeNotice c cId = logNotice c cId Nothing + logNotice :: AgentClient -> ConnId -> Maybe ClientNotice -> IO () + logNotice c cId notice = do + Right (SomeConn _ (ContactConnection _ RcvQueue {rcvId})) <- withTransaction (store $ agentEnv c) (`getConn` cId) + withFile testStoreLogFile AppendMode $ \h -> B.hPutStrLn h $ strEncode $ BlockQueue rcvId $ SMP.BlockingInfo SMP.BRContent notice + subscribedWithErrors c n = do + ("", "", ERRS errs) <- nGet c + length errs `shouldBe` n + forM_ errs $ \case + (_, SMP _ (BLOCKED _)) -> pure () + r -> expectationFailure $ "unexpected event: " <> show r + testNotice :: HasCallStack => AgentClient -> Bool -> IO () + testNotice c willExpire = do + NOTICE "localhost" False expiresAt_ <- runLeft $ A.createConnection c NRMInteractive 1 True True SCMContact Nothing Nothing IKPQOn SMSubscribe + isJust expiresAt_ `shouldBe` willExpire + noNetworkDelay :: AgentClient -> IO () noNetworkDelay a = do d <- waitNetwork a diff --git a/tests/AgentTests/SQLiteTests.hs b/tests/AgentTests/SQLiteTests.hs index 53adf653a..dff79c861 100644 --- a/tests/AgentTests/SQLiteTests.hs +++ b/tests/AgentTests/SQLiteTests.hs @@ -230,6 +230,7 @@ rcvQueue1 = clientService = Nothing, status = New, enableNtfs = True, + clientNoticeId = Nothing, dbQueueId = DBNewEntity, primary = True, dbReplaceQueueId = Nothing, @@ -443,6 +444,7 @@ testUpgradeSndConnToDuplex = clientService = Nothing, status = New, enableNtfs = True, + clientNoticeId = Nothing, dbQueueId = DBNewEntity, rcvSwchStatus = Nothing, primary = True, diff --git a/tests/AgentTests/ServerChoice.hs b/tests/AgentTests/ServerChoice.hs index a3d6337f2..a27678cb6 100644 --- a/tests/AgentTests/ServerChoice.hs +++ b/tests/AgentTests/ServerChoice.hs @@ -64,7 +64,8 @@ initServers = ntf = [testNtfServer], xftp = userServers [testXFTPServer], netCfg = defaultNetworkConfig, - presetDomains = [] + presetDomains = [], + presetServers = [] } testChooseDifferentOperator :: IO () diff --git a/tests/CoreTests/MsgStoreTests.hs b/tests/CoreTests/MsgStoreTests.hs index 3961a9ce0..48fc7810a 100644 --- a/tests/CoreTests/MsgStoreTests.hs +++ b/tests/CoreTests/MsgStoreTests.hs @@ -43,7 +43,6 @@ import Simplex.Messaging.Server.MsgStore.STM import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Server.QueueStore.QueueInfo -import Simplex.Messaging.Server.QueueStore.Types import Simplex.Messaging.Server.StoreLog (closeStoreLog, logCreateQueue) import System.Directory (copyFile, createDirectoryIfMissing, listDirectory, removeFile, renameFile) import System.FilePath (()) @@ -58,6 +57,7 @@ import Simplex.Messaging.Agent.Store.Postgres.Common import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation (..)) import Simplex.Messaging.Server.MsgStore.Postgres import Simplex.Messaging.Server.QueueStore.Postgres +import Simplex.Messaging.Server.QueueStore.Types import SMPClient (postgressBracket, testServerDBConnectInfo, testStoreDBOpts) #endif diff --git a/tests/CoreTests/StoreLogTests.hs b/tests/CoreTests/StoreLogTests.hs index 3a898ef6a..01966ba05 100644 --- a/tests/CoreTests/StoreLogTests.hs +++ b/tests/CoreTests/StoreLogTests.hs @@ -24,18 +24,22 @@ import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol import Simplex.Messaging.Server.Env.STM (readWriteQueueStore) -import Simplex.Messaging.Server.Main import Simplex.Messaging.Server.MsgStore.Journal import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Server.QueueStore.STM (STMQueueStore (..)) import Simplex.Messaging.Server.QueueStore.Types import Simplex.Messaging.Server.StoreLog +import Simplex.Messaging.SystemTime import Simplex.Messaging.Transport (SMPServiceRole (..)) import Simplex.Messaging.Transport.Credentials (genCredentials) import Test.Hspec hiding (fit, it) import Util +#if defined(dbServerPostgres) +import Simplex.Messaging.Server.Main +#endif + testPublicAuthKey :: C.APublicAuthKey testPublicAuthKey = C.APublicAuthKey C.SEd25519 (C.publicKey "MC4CAQAwBQYDK2VwBCIEIDfEfevydXXfKajz3sRkcQ7RPvfWUPoq6pu1TYHV1DEe") diff --git a/tests/CoreTests/TSessionSubs.hs b/tests/CoreTests/TSessionSubs.hs index d31aa323f..e3f819332 100644 --- a/tests/CoreTests/TSessionSubs.hs +++ b/tests/CoreTests/TSessionSubs.hs @@ -126,6 +126,7 @@ dummyRQ userId server connId rcvId = rcvPrivateKey = C.APrivateAuthKey C.SEd25519 "MC4CAQAwBQYDK2VwBCIEIDfEfevydXXfKajz3sRkcQ7RPvfWUPoq6pu1TYHV1DEe", status = New, enableNtfs = False, + clientNoticeId = Nothing, dbQueueId = 0, primary = True, dbReplaceQueueId = Nothing diff --git a/tests/SMPAgentClient.hs b/tests/SMPAgentClient.hs index b6b9fd26f..02bee9ae7 100644 --- a/tests/SMPAgentClient.hs +++ b/tests/SMPAgentClient.hs @@ -65,7 +65,8 @@ initAgentServers = ntf = [testNtfServer], xftp = userServers [testXFTPServer], netCfg = defaultNetworkConfig {tcpTimeout = NetworkTimeout 500000 500000, tcpConnectTimeout = NetworkTimeout 500000 500000}, - presetDomains = [] + presetDomains = [], + presetServers = [] } initAgentServers2 :: InitialAgentServers diff --git a/tests/SMPProxyTests.hs b/tests/SMPProxyTests.hs index 5f1a59fd0..b756ce7c9 100644 --- a/tests/SMPProxyTests.hs +++ b/tests/SMPProxyTests.hs @@ -224,7 +224,7 @@ agentDeliverMessageViaProxy :: (C.AlgorithmI a, C.AuthAlgorithm a) => (NonEmpty agentDeliverMessageViaProxy aTestCfg@(aSrvs, _, aViaProxy) bTestCfg@(bSrvs, _, bViaProxy) alg msg1 msg2 baseId = withAgent 1 aCfg (servers aTestCfg) testDB $ \alice -> withAgent 2 aCfg (servers bTestCfg) testDB2 $ \bob -> runRight_ $ do - (bobId, (CCLink qInfo Nothing, Nothing)) <- A.createConnection alice NRMInteractive 1 True SCMInvitation Nothing Nothing CR.IKPQOn SMSubscribe + (bobId, (CCLink qInfo Nothing, Nothing)) <- A.createConnection alice NRMInteractive 1 True True SCMInvitation Nothing Nothing CR.IKPQOn SMSubscribe aliceId <- A.prepareConnectionToJoin bob 1 True qInfo PQSupportOn (sqSecured, Nothing) <- A.joinConnection bob NRMInteractive 1 aliceId True qInfo "bob's connInfo" PQSupportOn SMSubscribe liftIO $ sqSecured `shouldBe` True @@ -280,7 +280,7 @@ agentDeliverMessagesViaProxyConc agentServers msgs = -- agent connections have to be set up in advance -- otherwise the CONF messages would get mixed with MSG prePair alice bob = do - (bobId, (CCLink qInfo Nothing, Nothing)) <- runExceptT' $ A.createConnection alice NRMInteractive 1 True SCMInvitation Nothing Nothing CR.IKPQOn SMSubscribe + (bobId, (CCLink qInfo Nothing, Nothing)) <- runExceptT' $ A.createConnection alice NRMInteractive 1 True True SCMInvitation Nothing Nothing CR.IKPQOn SMSubscribe aliceId <- runExceptT' $ A.prepareConnectionToJoin bob 1 True qInfo PQSupportOn (sqSecured, Nothing) <- runExceptT' $ A.joinConnection bob NRMInteractive 1 aliceId True qInfo "bob's connInfo" PQSupportOn SMSubscribe liftIO $ sqSecured `shouldBe` True @@ -331,7 +331,7 @@ agentViaProxyVersionError = withAgent 1 agentCfg (servers [SMPServer testHost testPort testKeyHash]) testDB $ \alice -> do Left (A.BROKER _ (TRANSPORT TEVersion)) <- withAgent 2 agentCfg (servers [SMPServer testHost2 testPort2 testKeyHash]) testDB2 $ \bob -> runExceptT $ do - (_bobId, (CCLink qInfo Nothing, Nothing)) <- A.createConnection alice NRMInteractive 1 True SCMInvitation Nothing Nothing CR.IKPQOn SMSubscribe + (_bobId, (CCLink qInfo Nothing, Nothing)) <- A.createConnection alice NRMInteractive 1 True True SCMInvitation Nothing Nothing CR.IKPQOn SMSubscribe aliceId <- A.prepareConnectionToJoin bob 1 True qInfo PQSupportOn A.joinConnection bob NRMInteractive 1 aliceId True qInfo "bob's connInfo" PQSupportOn SMSubscribe pure () @@ -351,7 +351,7 @@ agentViaProxyRetryOffline = do let pqEnc = CR.PQEncOn withServer $ \_ -> do (aliceId, bobId) <- withServer2 $ \_ -> runRight $ do - (bobId, (CCLink qInfo Nothing, Nothing)) <- A.createConnection alice NRMInteractive 1 True SCMInvitation Nothing Nothing CR.IKPQOn SMSubscribe + (bobId, (CCLink qInfo Nothing, Nothing)) <- A.createConnection alice NRMInteractive 1 True True SCMInvitation Nothing Nothing CR.IKPQOn SMSubscribe aliceId <- A.prepareConnectionToJoin bob 1 True qInfo PQSupportOn (sqSecured, Nothing) <- A.joinConnection bob NRMInteractive 1 aliceId True qInfo "bob's connInfo" PQSupportOn SMSubscribe liftIO $ sqSecured `shouldBe` True diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index 53269d6f6..b2c2d997c 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -1290,11 +1290,11 @@ testBlockMessageQueue = pure (rId, sId) -- TODO [postgres] block via control port - withFile testStoreLogFile AppendMode $ \h -> B.hPutStrLn h $ strEncode $ BlockQueue rId $ BlockingInfo BRContent + withFile testStoreLogFile AppendMode $ \h -> B.hPutStrLn h $ strEncode $ BlockQueue rId $ BlockingInfo BRContent Nothing withSmpServerStoreLogOn ps testPort $ runTest t $ \h -> do (sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd448 g - Resp "dabc" sId2 (ERR (BLOCKED (BlockingInfo BRContent))) <- signSendRecv h sKey ("dabc", sId, SKEY sPub) + Resp "dabc" sId2 (ERR (BLOCKED (BlockingInfo BRContent Nothing))) <- signSendRecv h sKey ("dabc", sId, SKEY sPub) (sId2, sId) #== "same queue ID in response" where runTest :: Transport c => TProxy c 'TServer -> (THandleSMP c 'TClient -> IO a) -> ThreadId -> IO a