diff --git a/simplexmq.cabal b/simplexmq.cabal index 3c8c0ae24..d671dae86 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -1,7 +1,7 @@ cabal-version: 1.12 name: simplexmq -version: 6.4.0.1 +version: 6.4.0.2 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 89e379023..e4b0d4013 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -439,7 +439,7 @@ subscribeConnections c = withAgentEnv c . subscribeConnections' c {-# INLINE subscribeConnections #-} -- | Get messages for connections (GET commands) -getConnectionMessages :: AgentClient -> NonEmpty ConnId -> IO (NonEmpty (Maybe SMPMsgMeta)) +getConnectionMessages :: AgentClient -> NonEmpty ConnMsgReq -> IO (NonEmpty (Either AgentErrorType (Maybe SMPMsgMeta))) getConnectionMessages c = withAgentEnv' c . getConnectionMessages' c {-# INLINE getConnectionMessages #-} @@ -1276,24 +1276,26 @@ resubscribeConnections' c connIds = do -- union is left-biased, so results returned by subscribeConnections' take precedence (`M.union` r) <$> subscribeConnections' c connIds' -getConnectionMessages' :: AgentClient -> NonEmpty ConnId -> AM' (NonEmpty (Maybe SMPMsgMeta)) -getConnectionMessages' c = mapM getMsg +-- requesting messages sequentially, to reduce memory usage +getConnectionMessages' :: AgentClient -> NonEmpty ConnMsgReq -> AM' (NonEmpty (Either AgentErrorType (Maybe SMPMsgMeta))) +getConnectionMessages' c = mapM $ tryAgentError' . getConnectionMessage where - getMsg :: ConnId -> AM' (Maybe SMPMsgMeta) - getMsg connId = - getConnectionMessage connId `catchAgentError'` \e -> do - logError $ "Error loading message: " <> tshow e - pure Nothing - getConnectionMessage :: ConnId -> AM (Maybe SMPMsgMeta) - getConnectionMessage connId = do + getConnectionMessage :: ConnMsgReq -> AM (Maybe SMPMsgMeta) + getConnectionMessage (ConnMsgReq connId dbQueueId msgTs_) = do whenM (atomically $ hasActiveSubscription c connId) . throwE $ CMD PROHIBITED "getConnectionMessage: subscribed" SomeConn _ conn <- withStore c (`getConn` connId) - case conn of - DuplexConnection _ (rq :| _) _ -> getQueueMessage c rq - RcvConnection _ rq -> getQueueMessage c rq - ContactConnection _ rq -> getQueueMessage c rq + rq <- case conn of + DuplexConnection _ (rq :| _) _ -> pure rq + RcvConnection _ rq -> pure rq + ContactConnection _ rq -> pure rq SndConnection _ _ -> throwE $ CONN SIMPLEX NewConnection _ -> throwE $ CMD PROHIBITED "getConnectionMessage: NewConnection" + msg_ <- getQueueMessage c rq `catchAgentError` \e -> atomically (releaseGetLock c rq) >> throwError e + when (isNothing msg_) $ do + atomically $ releaseGetLock c rq + forM_ msgTs_ $ \msgTs -> withStore' c $ \db -> setLastBrokerTs db connId (DBQueueId dbQueueId) msgTs + pure msg_ +{-# INLINE getConnectionMessages' #-} getNotificationConns' :: AgentClient -> C.CbNonce -> ByteString -> AM (NonEmpty NotificationInfo) getNotificationConns' c nonce encNtfInfo = @@ -1308,7 +1310,7 @@ getNotificationConns' c nonce encNtfInfo = lastNtfInfo = Just . fst <$$> getNtfInfo db lastNtf in initNtfInfos <> [lastNtfInfo] let (errs, ntfInfos_) = partitionEithers rs - logError $ "Error(s) loading notifications: " <> tshow errs + unless (null errs) $ logError $ "Error(s) loading notifications: " <> tshow errs case L.nonEmpty $ catMaybes ntfInfos_ of Just r -> pure r Nothing -> throwE $ INTERNAL "getNotificationConns: couldn't get conn info" @@ -1316,17 +1318,18 @@ getNotificationConns' c nonce encNtfInfo = where getNtfInfo :: DB.Connection -> PNMessageData -> IO (Either AgentErrorType (NotificationInfo, Maybe UTCTime)) getNtfInfo db PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta} = runExceptT $ do - (ntfConnId, rcvNtfDhSecret, lastBrokerTs_) <- liftError' storeError $ getNtfRcvQueue db smpQueue + (ntfConnId, ntfDbQueueId, rcvNtfDhSecret, lastBrokerTs_) <- liftError' storeError $ getNtfRcvQueue db smpQueue let ntfMsgMeta = eitherToMaybe $ smpDecode =<< first show (C.cbDecrypt rcvNtfDhSecret nmsgNonce encNMsgMeta) - ntfInfo = NotificationInfo {ntfConnId, ntfTs, ntfMsgMeta} + ntfInfo = NotificationInfo {ntfConnId, ntfDbQueueId, ntfTs, ntfMsgMeta} pure (ntfInfo, lastBrokerTs_) getInitNtfInfo :: DB.Connection -> PNMessageData -> IO (Either AgentErrorType (Maybe NotificationInfo)) getInitNtfInfo db msgData = runExceptT $ do - (nftInfo, lastBrokerTs_) <- ExceptT $ getNtfInfo db msgData - pure $ case (ntfMsgMeta nftInfo, lastBrokerTs_) of - (Just SMP.NMsgMeta {msgTs}, Just lastBrokerTs) - | systemToUTCTime msgTs > lastBrokerTs -> Just nftInfo + (ntfInfo, lastBrokerTs_) <- ExceptT $ getNtfInfo db msgData + pure $ case ntfMsgMeta ntfInfo of + Just SMP.NMsgMeta {msgTs} + | maybe True (systemToUTCTime msgTs >) lastBrokerTs_ -> Just ntfInfo _ -> Nothing +{-# INLINE getNotificationConns' #-} -- | Send message to the connection (SEND command) in Reader monad sendMessage' :: AgentClient -> ConnId -> PQEncryption -> MsgFlags -> MsgBody -> AM (AgentMsgId, PQEncryption) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 455c5eb48..bd072fed2 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -326,6 +326,7 @@ data AgentClient = AgentClient xftpServers :: TMap UserId (UserServers 'PXFTP), xftpClients :: TMap XFTPTransportSession XFTPClientVar, useNetworkConfig :: TVar (NetworkConfig, NetworkConfig), -- (slow, fast) networks + presetSMPDomains :: [HostName], userNetworkInfo :: TVar UserNetworkInfo, userNetworkUpdated :: TVar (Maybe UTCTime), subscrConns :: TVar (Set ConnId), @@ -478,7 +479,7 @@ data UserNetworkType = UNNone | UNCellular | UNWifi | UNEthernet | UNOther -- | Creates an SMP agent client instance that receives commands and sends responses via 'TBQueue's. newAgentClient :: Int -> InitialAgentServers -> UTCTime -> Env -> IO AgentClient -newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} currentTs agentEnv = do +newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, presetDomains} currentTs agentEnv = do let cfg = config agentEnv qSize = tbqSize cfg proxySessTs <- newTVarIO =<< getCurrentTime @@ -532,6 +533,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} currentTs a xftpServers, xftpClients, useNetworkConfig, + presetSMPDomains = presetDomains, userNetworkInfo, userNetworkUpdated, subscrConns, @@ -690,7 +692,7 @@ smpConnectClient c@AgentClient {smpClients, msgQ, proxySessTs} tSess@(_, srv, _) env <- ask liftError (protocolClientError SMP $ B.unpack $ strEncode srv) $ do ts <- readTVarIO proxySessTs - smp <- ExceptT $ getProtocolClient g tSess cfg (Just msgQ) ts $ smpClientDisconnected c tSess env v' prs + smp <- ExceptT $ getProtocolClient g tSess cfg (presetSMPDomains c) (Just msgQ) ts $ smpClientDisconnected c tSess env v' prs pure SMPConnectedClient {connectedClient = smp, proxiedRelays = prs} smpClientDisconnected :: AgentClient -> SMPTransportSession -> Env -> SMPClientVar -> TMap SMPServer ProxiedRelayVar -> SMPClient -> IO () @@ -793,7 +795,7 @@ getNtfServerClient c@AgentClient {active, ntfClients, workerSeq, proxySessTs} tS g <- asks random ts <- readTVarIO proxySessTs liftError' (protocolClientError NTF $ B.unpack $ strEncode srv) $ - getProtocolClient g tSess cfg Nothing ts $ + getProtocolClient g tSess cfg [] Nothing ts $ clientDisconnected v clientDisconnected :: NtfClientVar -> NtfClient -> IO () @@ -1225,7 +1227,7 @@ runSMPServerTest c userId (ProtoServerWithAuth srv auth) = do liftIO $ do let tSess = (userId, srv, Nothing) ts <- readTVarIO $ proxySessTs c - getProtocolClient g tSess cfg Nothing ts (\_ -> pure ()) >>= \case + getProtocolClient g tSess cfg (presetSMPDomains c) Nothing ts (\_ -> pure ()) >>= \case Right smp -> do rKeys@(_, rpKey) <- atomically $ C.generateAuthKeyPair ra g (sKey, spKey) <- atomically $ C.generateAuthKeyPair sa g @@ -1302,7 +1304,7 @@ runNTFServerTest c userId (ProtoServerWithAuth srv _) = do liftIO $ do let tSess = (userId, srv, Nothing) ts <- readTVarIO $ proxySessTs c - getProtocolClient g tSess cfg Nothing ts (\_ -> pure ()) >>= \case + getProtocolClient g tSess cfg [] Nothing ts (\_ -> pure ()) >>= \case Right ntf -> do (nKey, npKey) <- atomically $ C.generateAuthKeyPair a g (dhKey, _) <- atomically $ C.generateKeyPair g @@ -1652,6 +1654,7 @@ getQueueMessage c rq@RcvQueue {server, rcvId, rcvPrivateKey} = do l <- maybe (newTMVar ()) pure l_ takeTMVar l pure $ Just l +{-# INLINE getQueueMessage #-} decryptSMPMessage :: RcvQueue -> SMP.RcvMessage -> AM SMP.ClientRcvMsgBody decryptSMPMessage rq SMP.RcvMessage {msgId, msgBody = SMP.EncRcvMsgBody body} = @@ -1741,10 +1744,12 @@ sendAck c rq@RcvQueue {rcvId, rcvPrivateKey} msgId = hasGetLock :: AgentClient -> RcvQueue -> IO Bool hasGetLock c RcvQueue {server, rcvId} = TM.memberIO (server, rcvId) $ getMsgLocks c +{-# INLINE hasGetLock #-} releaseGetLock :: AgentClient -> RcvQueue -> STM () releaseGetLock c RcvQueue {server, rcvId} = TM.lookup (server, rcvId) (getMsgLocks c) >>= mapM_ (`tryPutTMVar` ()) +{-# INLINE releaseGetLock #-} suspendQueue :: AgentClient -> RcvQueue -> AM () suspendQueue c rq@RcvQueue {rcvId, rcvPrivateKey} = diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index 7b286d3d7..0c10d8cd4 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -96,7 +96,8 @@ data InitialAgentServers = InitialAgentServers { smp :: Map UserId (NonEmpty (ServerCfg 'PSMP)), ntf :: [NtfServer], xftp :: Map UserId (NonEmpty (ServerCfg 'PXFTP)), - netCfg :: NetworkConfig + netCfg :: NetworkConfig, + presetDomains :: [HostName] } data ServerCfg p = ServerCfg diff --git a/src/Simplex/Messaging/Agent/Protocol.hs b/src/Simplex/Messaging/Agent/Protocol.hs index 43cceb376..463639942 100644 --- a/src/Simplex/Messaging/Agent/Protocol.hs +++ b/src/Simplex/Messaging/Agent/Protocol.hs @@ -147,6 +147,7 @@ module Simplex.Messaging.Agent.Protocol AgentMsgId, NotificationsMode (..), NotificationInfo (..), + ConnMsgReq (..), -- * Encode/decode serializeCommand, @@ -678,11 +679,21 @@ instance FromField NotificationsMode where fromField = blobFieldDecoder $ parseA data NotificationInfo = NotificationInfo { ntfConnId :: ConnId, + ntfDbQueueId :: Int64, ntfTs :: SystemTime, + -- Nothing means that the message failed to decrypt or to decode, + -- we can still show event notification ntfMsgMeta :: Maybe NMsgMeta } deriving (Show) +data ConnMsgReq = ConnMsgReq + { msgConnId :: ConnId, + msgDbQueueId :: Int64, + msgTs :: Maybe UTCTime + } + deriving (Show) + data ConnectionMode = CMInvitation | CMContact deriving (Eq, Show) diff --git a/src/Simplex/Messaging/Agent/Store/AgentStore.hs b/src/Simplex/Messaging/Agent/Store/AgentStore.hs index 237787590..446681b70 100644 --- a/src/Simplex/Messaging/Agent/Store/AgentStore.hs +++ b/src/Simplex/Messaging/Agent/Store/AgentStore.hs @@ -98,6 +98,7 @@ module Simplex.Messaging.Agent.Store.AgentStore -- Messages updateRcvIds, createRcvMsg, + setLastBrokerTs, updateRcvMsgHash, createSndMsgBody, updateSndIds, @@ -855,7 +856,11 @@ createRcvMsg db connId rq@RcvQueue {dbQueueId} rcvMsgData@RcvMsgData {msgMeta = insertRcvMsgBase_ db connId rcvMsgData insertRcvMsgDetails_ db connId rq rcvMsgData updateRcvMsgHash db connId sndMsgId internalRcvId internalHash - DB.execute db "UPDATE rcv_queues SET last_broker_ts = ? WHERE conn_id = ? AND rcv_queue_id = ?" (brokerTs, connId, dbQueueId) + setLastBrokerTs db connId dbQueueId brokerTs + +setLastBrokerTs :: DB.Connection -> ConnId -> DBQueueId 'QSStored -> UTCTime -> IO () +setLastBrokerTs db connId dbQueueId brokerTs = + DB.execute db "UPDATE rcv_queues SET last_broker_ts = ? WHERE conn_id = ? AND rcv_queue_id = ? AND (last_broker_ts IS NULL OR last_broker_ts < ?)" (brokerTs, connId, dbQueueId, brokerTs) createSndMsgBody :: DB.Connection -> AMessage -> IO Int64 createSndMsgBody db aMessage = @@ -1781,19 +1786,19 @@ getActiveNtfToken db = ntfMode = fromMaybe NMPeriodic ntfMode_ in NtfToken {deviceToken = DeviceToken provider dt, ntfServer, ntfTokenId, ntfPubKey, ntfPrivKey, ntfDhKeys, ntfDhSecret, ntfTknStatus, ntfTknAction, ntfMode} -getNtfRcvQueue :: DB.Connection -> SMPQueueNtf -> IO (Either StoreError (ConnId, RcvNtfDhSecret, Maybe UTCTime)) +getNtfRcvQueue :: DB.Connection -> SMPQueueNtf -> IO (Either StoreError (ConnId, Int64, RcvNtfDhSecret, Maybe UTCTime)) getNtfRcvQueue db SMPQueueNtf {smpServer = (SMPServer host port _), notifierId} = firstRow' res SEConnNotFound $ DB.query db [sql| - SELECT conn_id, rcv_ntf_dh_secret, last_broker_ts + SELECT conn_id, rcv_queue_id, rcv_ntf_dh_secret, last_broker_ts FROM rcv_queues WHERE host = ? AND port = ? AND ntf_id = ? AND deleted = 0 |] (host, port, notifierId) where - res (connId, Just rcvNtfDhSecret, lastBrokerTs_) = Right (connId, rcvNtfDhSecret, lastBrokerTs_) + res (connId, dbQueueId, Just rcvNtfDhSecret, lastBrokerTs_) = Right (connId, dbQueueId, rcvNtfDhSecret, lastBrokerTs_) res _ = Left SEConnNotFound setConnectionNtfs :: DB.Connection -> ConnId -> Bool -> IO () diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index 7c18a0aa1..0dbf9c84f 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -84,6 +84,7 @@ module Simplex.Messaging.Client SocksMode (..), SMPProxyMode (..), SMPProxyFallback (..), + SMPWebPortServers (..), defaultClientConfig, defaultSMPClientConfig, defaultNetworkConfig, @@ -129,7 +130,7 @@ import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Base64 as B64 import Data.Functor (($>)) import Data.Int (Int64) -import Data.List (find) +import Data.List (find, isSuffixOf) import Data.List.NonEmpty (NonEmpty (..)) import qualified Data.List.NonEmpty as L import Data.Maybe (catMaybes, fromMaybe) @@ -138,7 +139,7 @@ import qualified Data.Text as T import Data.Time.Clock (UTCTime (..), diffUTCTime, getCurrentTime) import qualified Data.X509 as X import qualified Data.X509.Validation as XV -import Network.Socket (ServiceName) +import Network.Socket (HostName, ServiceName) import Network.Socks5 (SocksCredentials (..)) import Numeric.Natural import qualified Simplex.Messaging.Crypto as C @@ -291,7 +292,7 @@ data NetworkConfig = NetworkConfig -- | Fallback to direct connection when destination SMP relay does not support SMP proxy protocol extensions smpProxyFallback :: SMPProxyFallback, -- | use web port 443 for SMP protocol - smpWebPort :: Bool, + smpWebPortServers :: SMPWebPortServers, -- | timeout for the initial client TCP/TLS connection (microseconds) tcpConnectTimeout :: Int, -- | timeout of protocol commands (microseconds) @@ -327,6 +328,12 @@ data SMPProxyFallback | SPFProhibit -- prohibit direct connection to destination relay. deriving (Eq, Show) +data SMPWebPortServers + = SWPAll + | SWPPreset + | SWPOff + deriving (Eq, Show) + instance StrEncoding SMPProxyMode where strEncode = \case SPMAlways -> "always" @@ -353,6 +360,18 @@ instance StrEncoding SMPProxyFallback where "no" -> pure SPFProhibit _ -> fail "Invalid SMP proxy fallback mode" +instance StrEncoding SMPWebPortServers where + strEncode = \case + SWPAll -> "all" + SWPPreset -> "preset" + SWPOff -> "off" + strP = + A.takeTill (== ' ') >>= \case + "all" -> pure SWPAll + "preset" -> pure SWPPreset + "off" -> pure SWPOff + _ -> fail "Invalid SMP wep port setting" + defaultNetworkConfig :: NetworkConfig defaultNetworkConfig = NetworkConfig @@ -363,7 +382,7 @@ defaultNetworkConfig = sessionMode = TSMSession, smpProxyMode = SPMNever, smpProxyFallback = SPFAllow, - smpWebPort = False, + smpWebPortServers = SWPPreset, tcpConnectTimeout = defaultTcpConnectTimeout, tcpTimeout = 15_000_000, tcpTimeoutPerKb = 5_000, @@ -498,15 +517,15 @@ type TransportSession msg = (UserId, ProtoServer msg, Maybe ByteString) -- -- A single queue can be used for multiple 'SMPClient' instances, -- as 'SMPServerTransmission' includes server information. -getProtocolClient :: forall v err msg. Protocol v err msg => TVar ChaChaDRG -> TransportSession msg -> ProtocolClientConfig v -> Maybe (TBQueue (ServerTransmissionBatch v err msg)) -> UTCTime -> (ProtocolClient v err msg -> IO ()) -> IO (Either (ProtocolClientError err) (ProtocolClient v err msg)) -getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize, networkConfig, clientALPN, serverVRange, agreeSecret, proxyServer, useSNI} msgQ proxySessTs disconnected = do +getProtocolClient :: forall v err msg. Protocol v err msg => TVar ChaChaDRG -> TransportSession msg -> ProtocolClientConfig v -> [HostName] -> Maybe (TBQueue (ServerTransmissionBatch v err msg)) -> UTCTime -> (ProtocolClient v err msg -> IO ()) -> IO (Either (ProtocolClientError err) (ProtocolClient v err msg)) +getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize, networkConfig, clientALPN, serverVRange, agreeSecret, proxyServer, useSNI} presetDomains msgQ proxySessTs disconnected = do case chooseTransportHost networkConfig (host srv) of Right useHost -> (getCurrentTime >>= mkProtocolClient useHost >>= runClient useTransport useHost) `catch` \(e :: IOException) -> pure . Left $ PCEIOError e Left e -> pure $ Left e where - NetworkConfig {smpWebPort, tcpConnectTimeout, tcpTimeout, smpPingInterval} = networkConfig + NetworkConfig {smpWebPortServers, tcpConnectTimeout, tcpTimeout, smpPingInterval} = networkConfig mkProtocolClient :: TransportHost -> UTCTime -> IO (PClient v err msg) mkProtocolClient transportHost ts = do connected <- newTVarIO False @@ -554,6 +573,13 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize SPSMP | smpWebPort -> ("443", transport @TLS) _ -> defaultTransport cfg p -> (p, transport @TLS) + where + smpWebPort = case smpWebPortServers of + SWPAll -> True + SWPPreset -> case srv of + ProtocolServer {host = THDomainName h :| _} -> any (`isSuffixOf` h) presetDomains + _ -> False + SWPOff -> False client :: forall c. Transport c => TProxy c -> PClient v err msg -> TMVar (Either (ProtocolClientError err) (ProtocolClient v err msg)) -> c -> IO () client _ c cVar h = do @@ -775,6 +801,7 @@ getSMPMessage c rpKey rId = OK -> pure Nothing cmd@(MSG msg) -> liftIO (writeSMPMessage c rId cmd) $> Just msg r -> throwE $ unexpectedResponse r +{-# INLINE getSMPMessage #-} -- | Subscribe to the SMP queue notifications. -- @@ -1262,6 +1289,8 @@ $(J.deriveJSON (enumJSON $ dropPrefix "SPM") ''SMPProxyMode) $(J.deriveJSON (enumJSON $ dropPrefix "SPF") ''SMPProxyFallback) +$(J.deriveJSON (enumJSON $ dropPrefix "SWP") ''SMPWebPortServers) + $(J.deriveJSON defaultJSON ''NetworkConfig) $(J.deriveJSON (sumTypeJSON $ dropPrefix "Proxy") ''ProxyClientError) diff --git a/src/Simplex/Messaging/Client/Agent.hs b/src/Simplex/Messaging/Client/Agent.hs index 1e3d71b66..5f4774944 100644 --- a/src/Simplex/Messaging/Client/Agent.hs +++ b/src/Simplex/Messaging/Client/Agent.hs @@ -198,7 +198,7 @@ isOwnServer SMPClientAgent {agentCfg} ProtocolServer {host} = -- | Run an SMP client for SMPClientVar connectClient :: SMPClientAgent -> SMPServer -> SMPClientVar -> IO (Either SMPClientError SMPClient) connectClient ca@SMPClientAgent {agentCfg, smpClients, smpSessions, msgQ, randomDrg, startedAt} srv v = - getProtocolClient randomDrg (1, srv, Nothing) (smpCfg agentCfg) (Just msgQ) startedAt clientDisconnected + getProtocolClient randomDrg (1, srv, Nothing) (smpCfg agentCfg) [] (Just msgQ) startedAt clientDisconnected where clientDisconnected :: SMPClient -> IO () clientDisconnected smp = do diff --git a/src/Simplex/Messaging/Encoding/String.hs b/src/Simplex/Messaging/Encoding/String.hs index c963ec99a..c114cee8e 100644 --- a/src/Simplex/Messaging/Encoding/String.hs +++ b/src/Simplex/Messaging/Encoding/String.hs @@ -144,7 +144,7 @@ instance StrEncoding SystemTime where instance StrEncoding UTCTime where strEncode = B.pack . iso8601Show - strP = maybe (Left "bad UTCTime") Right . iso8601ParseM . B.unpack <$?> A.takeTill (\c -> c == ' ' || c == '\n') + strP = maybe (Left "bad UTCTime") Right . iso8601ParseM . B.unpack <$?> A.takeTill (\c -> c == ' ' || c == '\n' || c == ',' || c == ';') -- lists encode/parse as comma-separated strings strEncodeList :: StrEncoding a => [a] -> ByteString diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index c49703d1c..a3a8d7056 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -1946,7 +1946,7 @@ testOnlyCreatePullSlowHandshake = withAgentClientsCfg2 agentProxyCfgV8 agentProx getMsg :: AgentClient -> ConnId -> ExceptT AgentErrorType IO a -> ExceptT AgentErrorType IO a getMsg c cId action = do liftIO $ noMessages c "nothing should be delivered before GET" - [Just _] <- lift $ getConnectionMessages c [cId] + [Right (Just _)] <- lift $ getConnectionMessages c [ConnMsgReq cId 1 Nothing] action getMSGNTF :: AgentClient -> ConnId -> ExceptT AgentErrorType IO () diff --git a/tests/AgentTests/NotificationTests.hs b/tests/AgentTests/NotificationTests.hs index a5a3e4069..d2b7b864f 100644 --- a/tests/AgentTests/NotificationTests.hs +++ b/tests/AgentTests/NotificationTests.hs @@ -57,6 +57,7 @@ import Data.Text (Text) import qualified Data.Text as T import Data.Text.Encoding (encodeUtf8) import qualified Data.Text.IO as TIO +import Data.Time.Clock.System (systemToUTCTime) import NtfClient import SMPAgentClient (agentCfg, initAgentServers, initAgentServers2, testDB, testDB2, testNtfServer, testNtfServer2) import SMPClient (cfgMS, cfgJ2QS, cfgVPrev, ntfTestPort, ntfTestPort2, serverStoreConfig, testPort, testPort2, withSmpServer, withSmpServerConfigOn, withSmpServerStoreLogOn, withSmpServerStoreMsgLogOn, xit'') @@ -75,7 +76,7 @@ import Simplex.Messaging.Notifications.Server.Env (NtfServerConfig (..)) import Simplex.Messaging.Notifications.Server.Push.APNS import Simplex.Messaging.Notifications.Types (NtfTknAction (..), NtfToken (..)) import Simplex.Messaging.Parsers (parseAll) -import Simplex.Messaging.Protocol (ErrorType (AUTH), MsgFlags (MsgFlags), NtfServer, ProtocolServer (..), SMPMsgMeta (..), SubscriptionMode (..)) +import Simplex.Messaging.Protocol (ErrorType (AUTH), MsgFlags (MsgFlags), NMsgMeta (..), NtfServer, ProtocolServer (..), SMPMsgMeta (..), SubscriptionMode (..)) import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.Server.Env.STM (AStoreType (..), ServerConfig (..)) import Simplex.Messaging.Transport (ATransport) @@ -561,11 +562,10 @@ testNotificationSubscriptionExistingConnection apns baseId alice@AgentClient {ag (nonce, message) <- messageNotification apns tkn pure (bobId, aliceId, nonce, message) - Right [NotificationInfo {ntfConnId = cId}] <- runExceptT $ getNotificationConns alice nonce message + Right [NotificationInfo {ntfConnId = cId, ntfMsgMeta = Just NMsgMeta {msgTs}}] <- runExceptT $ getNotificationConns alice nonce message cId `shouldBe` bobId -- alice client already has subscription for the connection, - -- so get fails with CMD PROHIBITED (transformed into Nothing in catch) - [Nothing] <- getConnectionMessages alice [cId] + [Left (CMD PROHIBITED _)] <- getConnectionMessages alice [ConnMsgReq cId 1 $ Just $ systemToUTCTime msgTs] threadDelay 500000 suspendAgent alice 0 @@ -575,7 +575,7 @@ testNotificationSubscriptionExistingConnection apns baseId alice@AgentClient {ag -- aliceNtf client doesn't have subscription and is allowed to get notification message withAgent 3 aliceCfg initAgentServers testDB $ \aliceNtf -> do - (Just SMPMsgMeta {msgFlags = MsgFlags True}) :| _ <- getConnectionMessages aliceNtf [cId] + (Right (Just SMPMsgMeta {msgFlags = MsgFlags True})) :| _ <- getConnectionMessages aliceNtf [ConnMsgReq cId 1 $ Just $ systemToUTCTime msgTs] pure () threadDelay 1000000 >> callCommand "sync" >> threadDelay 1000000 diff --git a/tests/AgentTests/ServerChoice.hs b/tests/AgentTests/ServerChoice.hs index 0df995d08..12e690888 100644 --- a/tests/AgentTests/ServerChoice.hs +++ b/tests/AgentTests/ServerChoice.hs @@ -61,7 +61,8 @@ initServers = { smp = M.fromList [(1, testSMPServers)], ntf = [testNtfServer], xftp = userServers [testXFTPServer], - netCfg = defaultNetworkConfig + netCfg = defaultNetworkConfig, + presetDomains = [] } testChooseDifferentOperator :: IO () diff --git a/tests/SMPAgentClient.hs b/tests/SMPAgentClient.hs index 9fce42669..1c256c092 100644 --- a/tests/SMPAgentClient.hs +++ b/tests/SMPAgentClient.hs @@ -64,7 +64,8 @@ initAgentServers = { smp = userServers [testSMPServer], ntf = [testNtfServer], xftp = userServers [testXFTPServer], - netCfg = defaultNetworkConfig {tcpTimeout = 500_000, tcpConnectTimeout = 500_000} + netCfg = defaultNetworkConfig {tcpTimeout = 500_000, tcpConnectTimeout = 500_000}, + presetDomains = [] } initAgentServers2 :: InitialAgentServers diff --git a/tests/SMPProxyTests.hs b/tests/SMPProxyTests.hs index 7ef66544e..c26e97902 100644 --- a/tests/SMPProxyTests.hs +++ b/tests/SMPProxyTests.hs @@ -167,12 +167,12 @@ deliverMessagesViaProxy proxyServ relayServ alg unsecuredMsgs securedMsgs = do g <- C.newRandom -- set up proxy ts <- getCurrentTime - pc' <- getProtocolClient g (1, proxyServ, Nothing) defaultSMPClientConfig {serverVRange = mkVersionRange minServerSMPRelayVersion currentClientSMPRelayVersion} Nothing ts (\_ -> pure ()) + pc' <- getProtocolClient g (1, proxyServ, Nothing) defaultSMPClientConfig {serverVRange = mkVersionRange minServerSMPRelayVersion currentClientSMPRelayVersion} [] Nothing ts (\_ -> pure ()) pc <- either (fail . show) pure pc' THAuthClient {} <- maybe (fail "getProtocolClient returned no thAuth") pure $ thAuth $ thParams pc -- set up relay msgQ <- newTBQueueIO 1024 - rc' <- getProtocolClient g (2, relayServ, Nothing) defaultSMPClientConfig {serverVRange = mkVersionRange minServerSMPRelayVersion authCmdsSMPVersion} (Just msgQ) ts (\_ -> pure ()) + rc' <- getProtocolClient g (2, relayServ, Nothing) defaultSMPClientConfig {serverVRange = mkVersionRange minServerSMPRelayVersion authCmdsSMPVersion} [] (Just msgQ) ts (\_ -> pure ()) rc <- either (fail . show) pure rc' -- prepare receiving queue (rPub, rPriv) <- atomically $ C.generateAuthKeyPair alg g @@ -210,7 +210,7 @@ proxyConnectDeadRelay n d proxyServ = do g <- C.newRandom -- set up proxy ts <- getCurrentTime - pc' <- getProtocolClient g (1, proxyServ, Nothing) defaultSMPClientConfig {serverVRange = mkVersionRange minServerSMPRelayVersion sendingProxySMPVersion} Nothing ts (\_ -> pure ()) + pc' <- getProtocolClient g (1, proxyServ, Nothing) defaultSMPClientConfig {serverVRange = mkVersionRange minServerSMPRelayVersion sendingProxySMPVersion} [] Nothing ts (\_ -> pure ()) pc <- either (fail . show) pure pc' THAuthClient {} <- maybe (fail "getProtocolClient returned no thAuth") pure $ thAuth $ thParams pc -- get proxy session