proxy: agent implementation (#1106)

* proxy: agent implementation

* revert change

* update rfc

* test stuck subscription mock

* store proxy sessions inside SMP client var

* rename

* create and use proxy session

* tests

* return proxy in SENT event

* rename, more tests

* rename

* more tests

* remove comment

---------

Co-authored-by: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com>
This commit is contained in:
Evgeny Poberezkin
2024-05-01 08:48:33 +01:00
committed by GitHub
parent 2d1609f222
commit 6d60de2429
11 changed files with 491 additions and 218 deletions
+50
View File
@@ -196,6 +196,56 @@ dhPublic = length x509encoded
The above assumes that the client can only send one message to an SMP relay and then has to wait for response before sending the next message. Missing the response would cause re-delivery (further improvement is possible when proxy detects these redelieveries and not send them to relays but simply reply with the same response).
### Implementation considerations for the client
While client/server protocol is rather straightforward to implement, and it is already working, there are some decisions to make about how the client makes decisions about.
1. When to use proxy and when to connect directly to the destination relay.
While from the perspective of threat model improvement it may be beneficial to always use the proxy, choosing the proxy that is different from other relays in the connection, initially we need to make it opt-in, with an option to only use it for unknown destination relays, to minimize any unexpected adverse effect on the delivery latency.
Proxy mode will be passed from the client via NetworkConfig.
2. Which proxying relays to use.
Ability to request access to the session with the destination relay (and to create such session) is protected with the same basic auth approach as creating queues - the logic here is that opening private servers to all users as proxies would increase the scenarios for DoS attacks (which is the case with the public servers).
The open question is whether the client should choose proxies from:
- all configured relays.
- there should be a subset of configured relays.
- there should be a separate list.
E.g., there could be a second toggle in the relay configuration to allow using relay as proxy, in addition to the current toggle that allows creating queues.
For simplicity, initially we will just use all enabled relays as potential proxies.
3. How many proxying relays should be used during one session.
This is not a simple question, and it creates a contradiction between two risks:
- collusion between proxies and destination relays simplifies correlating sending clients by session - from the point of view of this risk, clients should follow the same policy for creating connections with proxies, that is to create a new connection for each user profile, and if transport isolation is set to "per connection" - for each destination queue.
- traffic correlation by observable traffic sessions (particularly if an attacker can observe user's ISP traffic or multiple proxies) - from this point of view, it would be beneficial to use fewer proxies and fewer connections with proxies and see the risk of proxy colluding with the destination relay as lower than the risk of traffic observation that in the case of multiple sessions would allow to correlate traffic to rarely used destination relays (any private self-hosted relays) and the traffic of the user to a given proxy, to prove the fact of user communicating with the destination relay via the proxy.
While we can transfer this choice on the users, it seems a complex decision to make, and overall the second risk (traffic correlation) seems more important to address than the first.
In any case possible options are:
1. Extreme option 1: Create a new proxy session, with the new random proxy, for each potential transport session that would exist if the user were to be connected to destination relays directly. That is, never to mix access to multiple relays from multiple user profiles (and in case of per-connection isolation, to multiple queues) into a one client session with proxy. This is a rather radical option that nullifies any advantages of having fewer sessions with proxies than there would have been with the destination relays and removes any benefits of batching destination server session requests (PRXY comands).
2. Extreme option 2: Use only one proxy session at the time, mixing traffic from all user profiles and to all destination servers (and for all queues) into a session with one proxy. This minimizes the risks of traffic correlation in case of non-colluding proxy, but maximises the risk in case it colludes with the destination relays.
3. Balanced option: Use one proxy session per user profile, but mix traffic to multiple queues irrespective of connection isolation option and to all destination servers. Given that connection isolation is an experimental option, this makes the most sense, but it would have to be disclosed.
4. Less balanced option: take connection isolation option into account and create a new proxy connection for each destination queue. This feels worse than option 3.
If option 3 is chosen, then the transport session key with the proxy would be different from the transport session key with the relay - proxy session will only use UserId as the key, and the relay session uses (UserId, Server, Maybe EntityId) as the key.
If option 4 is chosen, the keys would also be different, as the proxy would then use (UserId, Maybe (Server, EntityId)) as the key.
We could potentially key proxy sessions (and create proxy connections) per each destination relay, in the same way as we key relays themselves, but it seems to have the least sense, as we neither achieve isolation by queue in case proxy and destination relay collude, nor we sufficiently protect from traffic correlation by any observers.
The implemented design is this:
- for each destination relay a random proxy is chosen and used to send all messages - all requests from a client coalesce to a single session.
- transport isolation mode is taken into account, that is if per-connection isolation is enabled, then a separate proxy connection will be created for each messaging queue.
- supported modes when proxy is used: always, for unknown relays, for unknown relays when IP address is not protected, never.
This decision is made because the argument for protection against collusion between proxy and relay and more balanced traffic distribution is stronger than the argument for protection against traffic correlation, because even mixing all messages to one proxy connection does not provide protection against traffic correlation by time, so in any case it requires adding delays.
### Threat model for SMP proxy and changes to threat model for SMP
#### SMP proxy
+11 -11
View File
@@ -149,7 +149,7 @@ import Simplex.FileTransfer.Protocol (FileParty (..))
import Simplex.FileTransfer.Util (removePath)
import Simplex.Messaging.Agent.Client
import Simplex.Messaging.Agent.Env.SQLite
import Simplex.Messaging.Agent.Lock (withLock', withLock)
import Simplex.Messaging.Agent.Lock (withLock, withLock')
import Simplex.Messaging.Agent.NtfSubSupervisor
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
@@ -160,7 +160,7 @@ import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations
import Simplex.Messaging.Client (ProtocolClient (..), ServerTransmission)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.File (CryptoFile, CryptoFileArgs)
import Simplex.Messaging.Crypto.Ratchet (PQEncryption, PQSupport (..), pattern PQEncOn, pattern PQEncOff, pattern PQSupportOn, pattern PQSupportOff)
import Simplex.Messaging.Crypto.Ratchet (PQEncryption, PQSupport (..), pattern PQEncOff, pattern PQEncOn, pattern PQSupportOff, pattern PQSupportOn)
import qualified Simplex.Messaging.Crypto.Ratchet as CR
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String
@@ -198,7 +198,7 @@ getSMPAgentClient_ clientId cfg initServers store backgroundMode =
liftIO $ newSMPAgentEnv cfg store >>= runReaderT runAgent
where
runAgent = do
c@AgentClient {acThread} <- atomically . newAgentClient clientId initServers =<< ask
c@AgentClient {acThread} <- atomically . newAgentClient clientId initServers =<< ask
t <- runAgentThreads c `forkFinally` const (liftIO $ disconnectAgentClient c)
atomically . writeTVar acThread . Just =<< mkWeakThreadId t
pure c
@@ -239,7 +239,7 @@ createUser c = withAgentEnv c .: createUser' c
{-# INLINE createUser #-}
-- | Delete user record optionally deleting all user's connections on SMP servers
deleteUser :: AgentClient -> UserId -> Bool -> AE ()
deleteUser :: AgentClient -> UserId -> Bool -> AE ()
deleteUser c = withAgentEnv c .: deleteUser' c
{-# INLINE deleteUser #-}
@@ -815,7 +815,7 @@ joinConnSrv c userId connId enableNtfs cReqUri@CRContactUri {} cInfo pqSup subMo
lift (compatibleContactUri cReqUri pqSup) >>= \case
Just (qInfo, vrsn) -> do
(connId', cReq) <- newConnSrv c userId connId enableNtfs SCMInvitation Nothing (CR.IKNoPQ pqSup) subMode srv
sendInvitation c userId qInfo vrsn cReq cInfo
void $ sendInvitation c userId qInfo vrsn cReq cInfo
pure connId'
Nothing -> throwError $ AGENT A_VERSION
@@ -1209,7 +1209,7 @@ enqueueMessage c cData sq msgFlags aMessage =
{-# INLINE enqueueMessage #-}
-- this function is used only for sending messages in batch, it returns the list of successes to enqueue additional deliveries
enqueueMessageB :: forall t. (Traversable t) => AgentClient -> t (Either AgentErrorType (ConnData, NonEmpty SndQueue, Maybe PQEncryption, MsgFlags, AMessage)) -> AM' (t (Either AgentErrorType ((AgentMsgId, PQEncryption), Maybe (ConnData, [SndQueue], AgentMsgId))))
enqueueMessageB :: forall t. Traversable t => AgentClient -> t (Either AgentErrorType (ConnData, NonEmpty SndQueue, Maybe PQEncryption, MsgFlags, AMessage)) -> AM' (t (Either AgentErrorType ((AgentMsgId, PQEncryption), Maybe (ConnData, [SndQueue], AgentMsgId))))
enqueueMessageB c reqs = do
cfg <- asks config
reqMids <- withStoreBatch c $ \db -> fmap (bindRight $ storeSentMsg db cfg) reqs
@@ -1242,7 +1242,7 @@ enqueueSavedMessage :: AgentClient -> ConnData -> AgentMsgId -> SndQueue -> AM'
enqueueSavedMessage c cData msgId sq = enqueueSavedMessageB c $ Identity (cData, [sq], msgId)
{-# INLINE enqueueSavedMessage #-}
enqueueSavedMessageB :: (Foldable t) => AgentClient -> t (ConnData, [SndQueue], AgentMsgId) -> AM' ()
enqueueSavedMessageB :: Foldable t => AgentClient -> t (ConnData, [SndQueue], AgentMsgId) -> AM' ()
enqueueSavedMessageB c reqs = do
-- saving to the database is in the start to avoid race conditions when delivery is read from queue before it is saved
void $ withStoreBatch' c $ \db -> concatMap (storeDeliveries db) reqs
@@ -1333,7 +1333,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} ConnData {connId} sq (Worker {doWork
retrySndMsg riMode = do
withStore' c $ \db -> updatePendingMsgRIState db connId msgId riState
retrySndOp c $ loop riMode
Right () -> do
Right proxySrv_ -> do
case msgType of
AM_CONN_INFO -> setConfirmed
AM_CONN_INFO_REPLY -> setConfirmed
@@ -1355,7 +1355,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} ConnData {connId} sq (Worker {doWork
when (status == Active) $ notify $ CON pqEncryption
-- this branch should never be reached as receive queue is created before the confirmation,
_ -> logError "HELLO sent without receive queue"
AM_A_MSG_ -> notify $ SENT mId
AM_A_MSG_ -> notify $ SENT mId proxySrv_
AM_A_RCVD_ -> pure ()
AM_QCONT_ -> pure ()
AM_QADD_ -> pure ()
@@ -2212,7 +2212,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
where
processEND = \case
Just (Right clnt)
| sessId == sessionId (thParams clnt) -> do
| sessId == sessionId (thParams $ connectedClient clnt) -> do
removeSubscription c connId
notify' END
pure "END"
@@ -2574,7 +2574,7 @@ confirmQueueAsync c cData sq srv connInfo e2eEncryption_ subMode = do
confirmQueue :: Compatible VersionSMPA -> AgentClient -> ConnData -> SndQueue -> SMPServerWithAuth -> ConnInfo -> Maybe (CR.SndE2ERatchetParams 'C.X448) -> SubscriptionMode -> AM ()
confirmQueue (Compatible agentVersion) c cData@ConnData {connId, pqSupport} sq srv connInfo e2eEncryption_ subMode = do
msg <- mkConfirmation =<< mkAgentConfirmation c cData sq srv connInfo subMode
sendConfirmation c sq msg
void $ sendConfirmation c sq msg
withStore' c $ \db -> setSndQueueStatus db sq Confirmed
where
mkConfirmation :: AgentMessage -> AM MsgBody
+198 -75
View File
@@ -132,6 +132,8 @@ module Simplex.Messaging.Agent.Client
SMPTransportSession,
NtfTransportSession,
XFTPTransportSession,
ProxiedRelay (..),
SMPConnectedClient (..),
)
where
@@ -230,7 +232,7 @@ import Simplex.Messaging.Session
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (SMPVersion)
import Simplex.Messaging.Transport.Client (TransportHost)
import Simplex.Messaging.Transport.Client (TransportHost (..))
import Simplex.Messaging.Util
import Simplex.Messaging.Version
import System.Mem.Weak (Weak)
@@ -263,6 +265,10 @@ data AgentClient = AgentClient
msgQ :: TBQueue (ServerTransmission SMPVersion BrokerMsg),
smpServers :: TMap UserId (NonEmpty SMPServerWithAuth),
smpClients :: TMap SMPTransportSession SMPClientVar,
-- smpProxiedRelays:
-- SMPTransportSession defines connection from proxy to relay,
-- SMPServerWithAuth defines client connected to SMP proxy (with the same userId and entityId in TransportSession)
smpProxiedRelays :: TMap SMPTransportSession SMPServerWithAuth,
ntfServers :: TVar [NtfServer],
ntfClients :: TMap NtfTransportSession NtfClientVar,
xftpServers :: TMap UserId (NonEmpty XFTPServerWithAuth),
@@ -297,6 +303,13 @@ data AgentClient = AgentClient
agentEnv :: Env
}
data SMPConnectedClient = SMPConnectedClient
{ connectedClient :: SMPClient,
proxiedRelays :: TMap SMPServer ProxiedRelayVar
}
type ProxiedRelayVar = SessionVar (Either AgentErrorType ProxiedRelay)
getAgentWorker :: (Ord k, Show k) => String -> Bool -> AgentClient -> k -> TMap k Worker -> (Worker -> AM ()) -> AM' Worker
getAgentWorker = getAgentWorker' id pure
{-# INLINE getAgentWorker #-}
@@ -428,6 +441,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv =
msgQ <- newTBQueue qSize
smpServers <- newTVar smp
smpClients <- TM.empty
smpProxiedRelays <- TM.empty
ntfServers <- newTVar ntf
ntfClients <- TM.empty
xftpServers <- newTVar xftp
@@ -463,6 +477,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg} agentEnv =
msgQ,
smpServers,
smpClients,
smpProxiedRelays,
ntfServers,
ntfClients,
xftpServers,
@@ -511,15 +526,19 @@ agentDRG AgentClient {agentEnv = Env {random}} = random
class (Encoding err, Show err) => ProtocolServerClient v err msg | msg -> v, msg -> err where
type Client msg = c | c -> msg
getProtocolServerClient :: AgentClient -> TransportSession msg -> AM (Client msg)
type ProtoClient msg = c | c -> msg
protocolClient :: Client msg -> ProtoClient msg
clientProtocolError :: err -> AgentErrorType
closeProtocolServerClient :: Client msg -> IO ()
clientServer :: Client msg -> String
clientTransportHost :: Client msg -> TransportHost
clientSessionTs :: Client msg -> UTCTime
closeProtocolServerClient :: ProtoClient msg -> IO ()
clientServer :: ProtoClient msg -> String
clientTransportHost :: ProtoClient msg -> TransportHost
clientSessionTs :: ProtoClient msg -> UTCTime
instance ProtocolServerClient SMPVersion ErrorType BrokerMsg where
type Client BrokerMsg = ProtocolClient SMPVersion ErrorType BrokerMsg
type Client BrokerMsg = SMPConnectedClient
getProtocolServerClient = getSMPServerClient
type ProtoClient BrokerMsg = ProtocolClient SMPVersion ErrorType BrokerMsg
protocolClient = connectedClient
clientProtocolError = SMP
closeProtocolServerClient = closeProtocolClient
clientServer = protocolClientServer
@@ -529,6 +548,8 @@ instance ProtocolServerClient SMPVersion ErrorType BrokerMsg where
instance ProtocolServerClient NTFVersion ErrorType NtfResponse where
type Client NtfResponse = ProtocolClient NTFVersion ErrorType NtfResponse
getProtocolServerClient = getNtfServerClient
type ProtoClient NtfResponse = ProtocolClient NTFVersion ErrorType NtfResponse
protocolClient = id
clientProtocolError = NTF
closeProtocolServerClient = closeProtocolClient
clientServer = protocolClientServer
@@ -538,61 +559,120 @@ instance ProtocolServerClient NTFVersion ErrorType NtfResponse where
instance ProtocolServerClient XFTPVersion XFTPErrorType FileResponse where
type Client FileResponse = XFTPClient
getProtocolServerClient = getXFTPServerClient
type ProtoClient FileResponse = XFTPClient
protocolClient = id
clientProtocolError = XFTP
closeProtocolServerClient = X.closeXFTPClient
clientServer = X.xftpClientServer
clientTransportHost = X.xftpTransportHost
clientSessionTs = X.xftpSessionTs
getSMPServerClient :: AgentClient -> SMPTransportSession -> AM SMPClient
getSMPServerClient c@AgentClient {active, smpClients, msgQ, workerSeq} tSess@(userId, srv, _) = do
getSMPServerClient :: AgentClient -> SMPTransportSession -> AM SMPConnectedClient
getSMPServerClient c@AgentClient {active, smpClients, workerSeq} tSess = do
unlessM (readTVarIO active) . throwError $ INACTIVE
atomically (getSessVar workerSeq tSess smpClients)
>>= either newClient (waitForProtocolClient c tSess)
where
-- we resubscribe only on newClient error, but not on waitForProtocolClient error,
-- as the large number of delivery workers waiting for the client TMVar
-- make it expensive to check for pending subscriptions.
newClient v =
newProtocolClient c tSess smpClients connectClient v
`catchAgentError` \e -> lift (resubscribeSMPSession c tSess) >> throwError e
connectClient :: SMPClientVar -> AM SMPClient
connectClient v = do
newClient v = do
prs <- atomically TM.empty
smpConnectClient c tSess prs v
getSMPProxyClient :: AgentClient -> SMPTransportSession -> AM (SMPConnectedClient, ProxiedRelay)
getSMPProxyClient c@AgentClient {active, smpClients, smpProxiedRelays, workerSeq} destSess@(userId, destSrv, qId) = do
unlessM (readTVarIO active) . throwError $ INACTIVE
proxySrv <- getNextServer c userId [destSrv]
atomically (getClientVar proxySrv) >>= \(tSess, auth, v) ->
either (newProxyClient tSess auth) (waitForProxyClient tSess auth) v
where
getClientVar :: SMPServerWithAuth -> STM (SMPTransportSession, Maybe SMP.BasicAuth, Either SMPClientVar SMPClientVar)
getClientVar proxySrv = do
ProtoServerWithAuth srv auth <- TM.lookup destSess smpProxiedRelays >>= maybe (TM.insert destSess proxySrv smpProxiedRelays $> proxySrv) pure
let tSess = (userId, srv, qId)
(tSess,auth,) <$> getSessVar workerSeq tSess smpClients
newProxyClient :: SMPTransportSession -> Maybe SMP.BasicAuth -> SMPClientVar -> AM (SMPConnectedClient, ProxiedRelay)
newProxyClient tSess auth v = do
(prs, rv) <- atomically $ do
prs <- TM.empty
-- we do not need to check if it is a new proxied relay session,
-- as the client is just created and there are no sessions yet
(prs,) . either id id <$> getSessVar workerSeq destSrv prs
clnt <- smpConnectClient c tSess prs v
(clnt,) <$> newProxiedRelay clnt auth rv
waitForProxyClient :: SMPTransportSession -> Maybe SMP.BasicAuth -> SMPClientVar -> AM (SMPConnectedClient, ProxiedRelay)
waitForProxyClient tSess auth v = do
clnt@(SMPConnectedClient _ prs) <- waitForProtocolClient c tSess v
sess <-
atomically (getSessVar workerSeq destSrv prs)
>>= either (newProxiedRelay clnt auth) (waitForProxiedRelay tSess)
pure (clnt, sess)
newProxiedRelay :: SMPConnectedClient -> Maybe SMP.BasicAuth -> ProxiedRelayVar -> AM ProxiedRelay
newProxiedRelay clnt@(SMPConnectedClient smp prs) proxyAuth rv =
tryAgentError (liftClient SMP (clientServer smp) $ connectSMPProxiedRelay smp destSrv proxyAuth) >>= \case
Right sess -> do
atomically $ putTMVar (sessionVar rv) (Right sess)
liftIO $ incClientStat c userId clnt "PROXY" "OK"
pure sess
Left e -> do
liftIO $ incClientStat c userId clnt "PROXY" $ strEncode e
atomically $ do
removeSessVar rv destSrv prs
TM.delete destSess smpProxiedRelays
putTMVar (sessionVar rv) (Left e)
throwError e -- signal error to caller
waitForProxiedRelay :: SMPTransportSession -> ProxiedRelayVar -> AM ProxiedRelay
waitForProxiedRelay (_, srv, _) rv = do
NetworkConfig {tcpConnectTimeout} <- atomically $ getNetworkConfig c
sess_ <- liftIO $ tcpConnectTimeout `timeout` atomically (readTMVar $ sessionVar rv)
liftEither $ case sess_ of
Just (Right sess) -> Right sess
Just (Left e) -> Left e
Nothing -> Left $ BROKER (B.unpack $ strEncode srv) TIMEOUT
smpConnectClient :: AgentClient -> SMPTransportSession -> TMap SMPServer ProxiedRelayVar -> SMPClientVar -> AM SMPConnectedClient
smpConnectClient c@AgentClient {smpClients, msgQ} tSess@(_, srv, _) prs v =
newProtocolClient c tSess smpClients connectClient v
`catchAgentError` \e -> lift (resubscribeSMPSession c tSess) >> throwError e
where
connectClient :: SMPClientVar -> AM SMPConnectedClient
connectClient v' = do
cfg <- lift $ getClientConfig c smpCfg
g <- asks random
env <- ask
liftError' (protocolClientError SMP $ B.unpack $ strEncode srv) $
getProtocolClient g tSess cfg (Just msgQ) $
clientDisconnected env v
liftError (protocolClientError SMP $ B.unpack $ strEncode srv) $ do
smp <- ExceptT $ getProtocolClient g tSess cfg (Just msgQ) $ smpClientDisconnected c tSess env v' prs
pure SMPConnectedClient {connectedClient = smp, proxiedRelays = prs}
clientDisconnected :: Env -> SMPClientVar -> SMPClient -> IO ()
clientDisconnected env v client = do
removeClientAndSubs >>= serverDown
logInfo . decodeUtf8 $ "Agent disconnected from " <> showServer srv
smpClientDisconnected :: AgentClient -> SMPTransportSession -> Env -> SMPClientVar -> TMap SMPServer ProxiedRelayVar -> SMPClient -> IO ()
smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess@(userId, srv, qId) env v prs client = do
removeClientAndSubs >>= serverDown
logInfo . decodeUtf8 $ "Agent disconnected from " <> showServer srv
where
-- we make active subscriptions pending only if the client for tSess was current (in the map) and active,
-- because we can have a race condition when a new current client could have already
-- made subscriptions active, and the old client would be processing diconnection later.
removeClientAndSubs :: IO ([RcvQueue], [ConnId])
removeClientAndSubs = atomically $ ifM currentActiveClient removeSubs $ pure ([], [])
where
-- we make active subscriptions pending only if the client for tSess was current (in the map) and active,
-- because we can have a race condition when a new current client could have already
-- made subscriptions active, and the old client would be processing diconnection later.
removeClientAndSubs :: IO ([RcvQueue], [ConnId])
removeClientAndSubs = atomically $ ifM currentActiveClient removeSubs $ pure ([], [])
where
currentActiveClient = (&&) <$> removeSessVar' v tSess smpClients <*> readTVar active
removeSubs = do
(qs, cs) <- RQ.getDelSessQueues tSess $ activeSubs c
RQ.batchAddQueues (pendingSubs c) qs
pure (qs, cs)
currentActiveClient = (&&) <$> removeSessVar' v tSess smpClients <*> readTVar active
removeSubs = do
(qs, cs) <- RQ.getDelSessQueues tSess $ activeSubs c
RQ.batchAddQueues (pendingSubs c) qs
-- this removes proxied relays that this client created sessions to
destSrvs <- M.keys <$> readTVar prs
forM_ destSrvs $ \destSrv -> TM.delete (userId, destSrv, qId) smpProxiedRelays
pure (qs, cs)
serverDown :: ([RcvQueue], [ConnId]) -> IO ()
serverDown (qs, conns) = whenM (readTVarIO active) $ do
incClientStat c userId client "DISCONNECT" ""
notifySub "" $ hostEvent DISCONNECT client
unless (null conns) $ notifySub "" $ DOWN srv conns
unless (null qs) $ do
atomically $ mapM_ (releaseGetLock c) qs
runReaderT (resubscribeSMPSession c tSess) env
serverDown :: ([RcvQueue], [ConnId]) -> IO ()
serverDown (qs, conns) = whenM (readTVarIO active) $ do
incClientStat' c userId client "DISCONNECT" ""
notifySub "" $ hostEvent' DISCONNECT client
unless (null conns) $ notifySub "" $ DOWN srv conns
unless (null qs) $ do
atomically $ mapM_ (releaseGetLock c) qs
runReaderT (resubscribeSMPSession c tSess) env
notifySub :: forall e. AEntityI e => ConnId -> ACommand 'Agent e -> IO ()
notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, APC (sAEntity @e) cmd)
notifySub :: forall e. AEntityI e => ConnId -> ACommand 'Agent e -> IO ()
notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, APC (sAEntity @e) cmd)
resubscribeSMPSession :: AgentClient -> SMPTransportSession -> AM' ()
resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess =
@@ -735,7 +815,11 @@ newProtocolClient c tSess@(userId, srv, entityId_) clients connectClient v =
throwError e -- signal error to caller
hostEvent :: forall v err msg. (ProtocolTypeI (ProtoType msg), ProtocolServerClient v err msg) => (AProtocolType -> TransportHost -> ACommand 'Agent 'AENone) -> Client msg -> ACommand 'Agent 'AENone
hostEvent event = event (AProtocolType $ protocolTypeI @(ProtoType msg)) . clientTransportHost
hostEvent event = hostEvent' event . protocolClient
{-# INLINE hostEvent #-}
hostEvent' :: forall v err msg. (ProtocolTypeI (ProtoType msg), ProtocolServerClient v err msg) => (AProtocolType -> TransportHost -> ACommand 'Agent 'AENone) -> ProtoClient msg -> ACommand 'Agent 'AENone
hostEvent' event = event (AProtocolType $ protocolTypeI @(ProtoType msg)) . clientTransportHost
getClientConfig :: AgentClient -> (AgentConfig -> ProtocolClientConfig v) -> AM' (ProtocolClientConfig v)
getClientConfig c cfgSel = do
@@ -842,7 +926,7 @@ closeClient_ c v = do
NetworkConfig {tcpConnectTimeout} <- atomically $ getNetworkConfig c
E.handle (\BlockedIndefinitelyOnSTM -> pure ()) $
tcpConnectTimeout `timeout` atomically (readTMVar $ sessionVar v) >>= \case
Just (Right client) -> closeProtocolServerClient client `catchAll_` pure ()
Just (Right client) -> closeProtocolServerClient (protocolClient client) `catchAll_` pure ()
_ -> pure ()
closeXFTPServerClient :: AgentClient -> UserId -> XFTPServer -> FileDigest -> IO ()
@@ -895,6 +979,22 @@ withClient_ c tSess@(userId, srv, _) statCmd action = do
stat cl $ strEncode e
throwError e
withProxySession :: AgentClient -> SMPTransportSession -> SMP.SenderId -> ByteString -> ((SMPConnectedClient, ProxiedRelay) -> AM a) -> AM a
withProxySession c destSess@(userId, destSrv, _) entId cmdStr action = do
cp@(cl, _) <- getSMPProxyClient c destSess
logServer ("--> " <> proxySrv cl <> " >") c destSrv entId cmdStr
r <- (action cp <* stat cl "OK") `catchAgentError` logServerError cl
logServer ("<-- " <> proxySrv cl <> " <") c destSrv entId "OK"
pure r
where
stat cl = liftIO . incClientStat c userId cl cmdStr
proxySrv = showServer . protocolClientServer' . protocolClient
logServerError :: SMPConnectedClient -> AgentErrorType -> AM a
logServerError cl e = do
logServer ("<-- " <> proxySrv cl <> " <") c destSrv "" $ strEncode e
stat cl $ strEncode e
throwError e
withLogClient_ :: ProtocolServerClient v err msg => AgentClient -> TransportSession msg -> EntityId -> ByteString -> (Client msg -> AM a) -> AM a
withLogClient_ c tSess@(_, srv, _) entId cmdStr action = do
logServer "-->" c srv entId cmdStr
@@ -903,22 +1003,46 @@ withLogClient_ c tSess@(_, srv, _) entId cmdStr action = do
return res
withClient :: forall v err msg a. ProtocolServerClient v err msg => AgentClient -> TransportSession msg -> ByteString -> (Client msg -> ExceptT (ProtocolClientError err) IO a) -> AM a
withClient c tSess statKey action = withClient_ c tSess statKey $ \client -> liftClient (clientProtocolError @v @err @msg) (clientServer client) $ action client
withClient c tSess statKey action = withClient_ c tSess statKey $ \client -> liftClient (clientProtocolError @v @err @msg) (clientServer $ protocolClient client) $ action client
{-# INLINE withClient #-}
withLogClient :: forall v err msg a. ProtocolServerClient v err msg => AgentClient -> TransportSession msg -> EntityId -> ByteString -> (Client msg -> ExceptT (ProtocolClientError err) IO a) -> AM a
withLogClient c tSess entId cmdStr action = withLogClient_ c tSess entId cmdStr $ \client -> liftClient (clientProtocolError @v @err @msg) (clientServer client) $ action client
withLogClient c tSess entId cmdStr action = withLogClient_ c tSess entId cmdStr $ \client -> liftClient (clientProtocolError @v @err @msg) (clientServer $ protocolClient client) $ action client
{-# INLINE withLogClient #-}
withSMPClient :: SMPQueueRec q => AgentClient -> q -> ByteString -> (SMPClient -> ExceptT SMPClientError IO a) -> AM a
withSMPClient c q cmdStr action = do
tSess <- liftIO $ mkSMPTransportSession c q
withLogClient c tSess (queueId q) cmdStr action
withLogClient c tSess (queueId q) cmdStr $ action . connectedClient
withSMPClient_ :: SMPQueueRec q => AgentClient -> q -> ByteString -> (SMPClient -> AM a) -> AM a
withSMPClient_ c q cmdStr action = do
tSess <- liftIO $ mkSMPTransportSession c q
withLogClient_ c tSess (queueId q) cmdStr action
sendOrProxySMPMessage :: AgentClient -> UserId -> SMPServer -> ByteString -> Maybe SMP.SndPrivateAuthKey -> SMP.SenderId -> MsgFlags -> SMP.MsgBody -> AM (Maybe SMPServer)
sendOrProxySMPMessage c userId destSrv cmdStr spKey_ senderId msgFlags msg = do
sess <- liftIO $ mkTransportSession c userId destSrv senderId
ifM (atomically shouldUseProxy) (sendViaProxy sess) (sendDirectly sess $> Nothing)
where
shouldUseProxy = do
cfg <- getNetworkConfig c
case smpProxyMode cfg of
SPMAlways -> pure True
SPMUnknown -> unknownServer
SPMUnprotected
| ipAddressProtected cfg destSrv -> pure False
| otherwise -> unknownServer
SPMNever -> pure False
unknownServer = maybe True (all ((destSrv /=) . protoServer)) <$> TM.lookup userId (userServers c)
sendViaProxy destSess =
withProxySession c destSess senderId ("PFWD " <> cmdStr) $ \(SMPConnectedClient smp _, proxySess) -> do
liftClient SMP (clientServer smp) $ proxySMPMessage smp proxySess spKey_ senderId msgFlags msg
pure . Just $ protocolClientServer' smp
sendDirectly tSess =
withLogClient_ c tSess senderId ("SEND " <> cmdStr) $ \(SMPConnectedClient smp _) ->
liftClient SMP (clientServer smp) $ sendSMPMessage smp spKey_ senderId msgFlags msg
ipAddressProtected :: NetworkConfig -> ProtocolServer p -> Bool
ipAddressProtected NetworkConfig {socksProxy, hostMode} (ProtocolServer _ hosts _ _) = do
isJust socksProxy || (hostMode == HMOnion && any isOnionHost hosts)
where
isOnionHost = \case THOnionHost _ -> True; _ -> False
withNtfClient :: AgentClient -> NtfServer -> EntityId -> ByteString -> (NtfClient -> ExceptT NtfClientError IO a) -> AM a
withNtfClient c srv = withLogClient c (0, srv, Nothing)
@@ -989,7 +1113,6 @@ runSMPServerTest c userId (ProtoServerWithAuth srv auth) = do
liftError (testErr TSSecureQueue) $ secureSMPQueue smp rpKey rcvId sKey
liftError (testErr TSDeleteQueue) $ deleteSMPQueue smp rpKey rcvId
ok <- tcpTimeout (networkConfig cfg) `timeout` closeProtocolClient smp
incClientStat c userId smp "SMP_TEST" "OK"
pure $ either Just (const Nothing) r <|> maybe (Just (ProtocolTestFailure TSDisconnect $ BROKER addr TIMEOUT)) (const Nothing) ok
Left e -> pure (Just $ testErr TSConnect e)
where
@@ -1104,7 +1227,7 @@ newRcvQueue c userId connId (ProtoServerWithAuth srv auth) vRange subMode = do
logServer "-->" c srv "" "NEW"
tSess <- liftIO $ mkTransportSession c userId srv connId
QIK {rcvId, sndId, rcvPublicDhKey} <-
withClient c tSess "NEW" $ \smp -> createSMPQueue smp rKeys dhKey auth subMode
withClient c tSess "NEW" $ \smp -> createSMPQueue (connectedClient smp) rKeys dhKey auth subMode
liftIO . logServer "<--" c srv "" $ B.unwords ["IDS", logSecret rcvId, logSecret sndId]
let rq =
RcvQueue
@@ -1193,7 +1316,7 @@ sendTSessionBatches statCmd statBatchSize toRQ action c qs =
sendClientBatch (tSess@(userId, srv, _), qs') =
tryAgentError' (getSMPServerClient c tSess) >>= \case
Left e -> pure $ L.map ((,Left e) . toRQ) qs'
Right smp -> liftIO $ do
Right (SMPConnectedClient smp _) -> liftIO $ do
logServer "-->" c srv (bshow (length qs') <> " queues") statCmd
rs <- L.map agentError <$> action smp qs'
statBatch
@@ -1243,20 +1366,17 @@ logSecret :: ByteString -> ByteString
logSecret bs = encode $ B.take 3 bs
{-# INLINE logSecret #-}
sendConfirmation :: AgentClient -> SndQueue -> ByteString -> AM ()
sendConfirmation c sq@SndQueue {sndId, sndPublicKey = Just sndPublicKey, e2ePubKey = e2ePubKey@Just {}} agentConfirmation =
withSMPClient_ c sq "SEND <CONF>" $ \smp -> do
let clientMsg = SMP.ClientMessage (SMP.PHConfirmation sndPublicKey) agentConfirmation
msg <- agentCbEncrypt sq e2ePubKey $ smpEncode clientMsg
liftClient SMP (clientServer smp) $ sendSMPMessage smp Nothing sndId (SMP.MsgFlags {notification = True}) msg
sendConfirmation :: AgentClient -> SndQueue -> ByteString -> AM (Maybe SMPServer)
sendConfirmation c sq@SndQueue {userId, server, sndId, sndPublicKey = Just sndPublicKey, e2ePubKey = e2ePubKey@Just {}} agentConfirmation = do
let clientMsg = SMP.ClientMessage (SMP.PHConfirmation sndPublicKey) agentConfirmation
msg <- agentCbEncrypt sq e2ePubKey $ smpEncode clientMsg
sendOrProxySMPMessage c userId server "<CONF>" Nothing sndId (MsgFlags {notification = True}) msg
sendConfirmation _ _ _ = throwError $ INTERNAL "sendConfirmation called without snd_queue public key(s) in the database"
sendInvitation :: AgentClient -> UserId -> Compatible SMPQueueInfo -> Compatible VersionSMPA -> ConnectionRequestUri 'CMInvitation -> ConnInfo -> AM ()
sendInvitation :: AgentClient -> UserId -> Compatible SMPQueueInfo -> Compatible VersionSMPA -> ConnectionRequestUri 'CMInvitation -> ConnInfo -> AM (Maybe SMPServer)
sendInvitation c userId (Compatible (SMPQueueInfo v SMPQueueAddress {smpServer, senderId, dhPublicKey})) (Compatible agentVersion) connReq connInfo = do
tSess <- liftIO $ mkTransportSession c userId smpServer senderId
withLogClient_ c tSess senderId "SEND <INV>" $ \smp -> do
msg <- mkInvitation
liftClient SMP (clientServer smp) $ sendSMPMessage smp Nothing senderId MsgFlags {notification = True} msg
msg <- mkInvitation
sendOrProxySMPMessage c userId smpServer "<INV>" Nothing senderId (MsgFlags {notification = True}) msg
where
mkInvitation :: AM ByteString
-- this is only encrypted with per-queue E2E, not with double ratchet
@@ -1340,12 +1460,11 @@ deleteQueue c rq@RcvQueue {rcvId, rcvPrivateKey} = do
deleteQueues :: AgentClient -> [RcvQueue] -> AM' [(RcvQueue, Either AgentErrorType ())]
deleteQueues = sendTSessionBatches "DEL" 90 id $ sendBatch deleteSMPQueues
sendAgentMessage :: AgentClient -> SndQueue -> MsgFlags -> ByteString -> AM ()
sendAgentMessage c sq@SndQueue {sndId, sndPrivateKey} msgFlags agentMsg =
withSMPClient_ c sq "SEND <MSG>" $ \smp -> do
let clientMsg = SMP.ClientMessage SMP.PHEmpty agentMsg
msg <- agentCbEncrypt sq Nothing $ smpEncode clientMsg
liftClient SMP (clientServer smp) $ sendSMPMessage smp (Just sndPrivateKey) sndId msgFlags msg
sendAgentMessage :: AgentClient -> SndQueue -> MsgFlags -> ByteString -> AM (Maybe SMPServer)
sendAgentMessage c sq@SndQueue {userId, server, sndId, sndPrivateKey} msgFlags agentMsg = do
let clientMsg = SMP.ClientMessage SMP.PHEmpty agentMsg
msg <- agentCbEncrypt sq Nothing $ smpEncode clientMsg
sendOrProxySMPMessage c userId server "<MSG>" (Just sndPrivateKey) sndId msgFlags msg
agentNtfRegisterToken :: AgentClient -> NtfToken -> NtfPublicAuthKey -> C.PublicKeyX25519 -> AM (NtfTokenId, C.PublicKeyX25519)
agentNtfRegisterToken c NtfToken {deviceToken, ntfServer, ntfPrivKey} ntfPubKey pubDhKey =
@@ -1606,9 +1725,13 @@ incStat AgentClient {agentStats} n k = do
_ -> newTVar n >>= \v -> TM.insert k v agentStats
incClientStat :: ProtocolServerClient v err msg => AgentClient -> UserId -> Client msg -> ByteString -> ByteString -> IO ()
incClientStat c userId pc = incClientStatN c userId pc 1
incClientStat c userId = incClientStat' c userId . protocolClient
{-# INLINE incClientStat #-}
incClientStat' :: ProtocolServerClient v err msg => AgentClient -> UserId -> ProtoClient msg -> ByteString -> ByteString -> IO ()
incClientStat' c userId pc = incClientStatN c userId pc 1
{-# INLINE incClientStat' #-}
incServerStat :: AgentClient -> UserId -> ProtocolServer p -> ByteString -> ByteString -> IO ()
incServerStat c userId ProtocolServer {host} cmd res = do
threadDelay 100000
@@ -1616,7 +1739,7 @@ incServerStat c userId ProtocolServer {host} cmd res = do
where
statsKey = AgentStatsKey {userId, host = strEncode $ L.head host, clientTs = "", cmd, res}
incClientStatN :: ProtocolServerClient v err msg => AgentClient -> UserId -> Client msg -> Int -> ByteString -> ByteString -> IO ()
incClientStatN :: ProtocolServerClient v err msg => AgentClient -> UserId -> ProtoClient msg -> Int -> ByteString -> ByteString -> IO ()
incClientStatN c userId pc n cmd res = do
atomically $ incStat c n statsKey
where
+14 -14
View File
@@ -193,13 +193,13 @@ import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.Ratchet
( InitialKeys (..),
PQEncryption (..),
pattern PQEncOff,
PQSupport,
pattern PQSupportOn,
pattern PQSupportOff,
RcvE2ERatchetParams,
RcvE2ERatchetParamsUri,
SndE2ERatchetParams
SndE2ERatchetParams,
pattern PQEncOff,
pattern PQSupportOff,
pattern PQSupportOn,
)
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String
@@ -213,14 +213,14 @@ import Simplex.Messaging.Protocol
MsgId,
NMsgMeta,
ProtocolServer (..),
SMPClientVersion,
SMPMsgMeta,
SMPServer,
SMPServerWithAuth,
SndPublicAuthKey,
SubscriptionMode,
SMPClientVersion,
VersionSMPC,
VersionRangeSMPC,
VersionSMPC,
initialSMPClientVersion,
legacyEncodeServer,
legacyServerP,
@@ -398,7 +398,7 @@ data ACommand (p :: AParty) (e :: AEntity) where
RSYNC :: RatchetSyncState -> Maybe AgentCryptoError -> ConnectionStats -> ACommand Agent AEConn
SEND :: PQEncryption -> MsgFlags -> MsgBody -> ACommand Client AEConn
MID :: AgentMsgId -> PQEncryption -> ACommand Agent AEConn
SENT :: AgentMsgId -> ACommand Agent AEConn
SENT :: AgentMsgId -> Maybe SMPServer -> ACommand Agent AEConn
MERR :: AgentMsgId -> AgentErrorType -> ACommand Agent AEConn
MERRS :: NonEmpty AgentMsgId -> AgentErrorType -> ACommand Agent AEConn
MSG :: MsgMeta -> MsgFlags -> MsgBody -> ACommand Agent AEConn
@@ -517,7 +517,7 @@ aCommandTag = \case
RSYNC {} -> RSYNC_
SEND {} -> SEND_
MID {} -> MID_
SENT _ -> SENT_
SENT {} -> SENT_
MERR {} -> MERR_
MERRS {} -> MERRS_
MSG {} -> MSG_
@@ -913,7 +913,7 @@ instance Encoding AgentMsgEnvelope where
-- AgentRatchetInfo is not encrypted with double ratchet, but with per-queue E2E encryption
data AgentMessage
= -- used by the initiating party when confirming reply queue
AgentConnInfo ConnInfo
AgentConnInfo ConnInfo
| -- AgentConnInfoReply is used by accepting party in duplexHandshake mode (v2), allowing to include reply queue(s) in the initial confirmation.
-- It made removed REPLY message unnecessary.
AgentConnInfoReply (NonEmpty SMPQueueInfo) ConnInfo
@@ -1387,9 +1387,9 @@ deriving instance Show (ConnectionRequestUri m)
data AConnectionRequestUri = forall m. ConnectionModeI m => ACR (SConnectionMode m) (ConnectionRequestUri m)
instance Eq AConnectionRequestUri where
ACR m cr == ACR m' cr' = case testEquality m m' of
Just Refl -> cr == cr'
_ -> False
ACR m cr == ACR m' cr' = case testEquality m m' of
Just Refl -> cr == cr'
_ -> False
deriving instance Show AConnectionRequestUri
@@ -1793,7 +1793,7 @@ commandP binaryP =
SWITCH_ -> s (SWITCH <$> strP_ <*> strP_ <*> strP)
RSYNC_ -> s (RSYNC <$> strP_ <*> strP <*> strP)
MID_ -> s (MID <$> A.decimal <*> _strP)
SENT_ -> s (SENT <$> A.decimal)
SENT_ -> s (SENT <$> A.decimal <*> _strP)
MERR_ -> s (MERR <$> A.decimal <* A.space <*> strP)
MERRS_ -> s (MERRS <$> strP_ <*> strP)
MSG_ -> s (MSG <$> strP <* A.space <*> smpP <* A.space <*> binaryP)
@@ -1856,7 +1856,7 @@ serializeCommand = \case
RSYNC rrState cryptoErr cstats -> s (RSYNC_, rrState, cryptoErr, cstats)
SEND pqEnc msgFlags msgBody -> B.unwords [s SEND_, s pqEnc, smpEncode msgFlags, serializeBinary msgBody]
MID mId pqEnc -> s (MID_, mId, pqEnc)
SENT mId -> s (SENT_, mId)
SENT mId proxySrv_ -> s (SENT_, mId, proxySrv_)
MERR mId e -> s (MERR_, mId, e)
MERRS mIds e -> s (MERRS_, mIds, e)
MSG msgMeta msgFlags msgBody -> B.unwords [s MSG_, s msgMeta, smpEncode msgFlags, serializeBinary msgBody]
+35 -11
View File
@@ -30,9 +30,11 @@ module Simplex.Messaging.Client
TransportSession,
ProtocolClient (thParams, sessionTs),
SMPClient,
ProxiedRelay (..),
getProtocolClient,
closeProtocolClient,
protocolClientServer,
protocolClientServer',
transportHost',
transportSession',
@@ -54,7 +56,7 @@ module Simplex.Messaging.Client
suspendSMPQueue,
deleteSMPQueue,
deleteSMPQueues,
createSMPProxySession,
connectSMPProxiedRelay,
proxySMPMessage,
forwardSMPMessage,
sendProtocolCommand,
@@ -65,6 +67,8 @@ module Simplex.Messaging.Client
ProtocolClientConfig (..),
NetworkConfig (..),
TransportSessionMode (..),
HostMode (..),
SMPProxyMode (..),
defaultClientConfig,
defaultSMPClientConfig,
defaultNetworkConfig,
@@ -207,6 +211,8 @@ data NetworkConfig = NetworkConfig
requiredHostMode :: Bool,
-- | transport sessions are created per user or per entity
sessionMode :: TransportSessionMode,
-- | SMP proxy mode
smpProxyMode :: SMPProxyMode,
-- | timeout for the initial client TCP/TLS connection (microseconds)
tcpConnectTimeout :: Int,
-- | timeout of protocol commands (microseconds)
@@ -226,6 +232,14 @@ data NetworkConfig = NetworkConfig
data TransportSessionMode = TSMUser | TSMEntity
deriving (Eq, Show)
-- SMP proxy mode for sending messages
data SMPProxyMode
= SPMAlways
| SPMUnknown -- use with unknown relays
| SPMUnprotected -- use with unknown relays when IP address is not protected (i.e., when neither SOCKS proxy nor .onion address is used)
| SPMNever
deriving (Eq, Show)
defaultNetworkConfig :: NetworkConfig
defaultNetworkConfig =
NetworkConfig
@@ -233,6 +247,7 @@ defaultNetworkConfig =
hostMode = HMOnionViaSocks,
requiredHostMode = False,
sessionMode = TSMUser,
smpProxyMode = SPMNever,
tcpConnectTimeout = 20_000_000,
tcpTimeout = 15_000_000,
tcpTimeoutPerKb = 5_000,
@@ -302,10 +317,14 @@ chooseTransportHost NetworkConfig {socksProxy, hostMode, requiredHostMode} hosts
publicHost = find (not . isOnionHost) hosts
protocolClientServer :: ProtocolTypeI (ProtoType msg) => ProtocolClient v err msg -> String
protocolClientServer = B.unpack . strEncode . snd3 . transportSession . client_
protocolClientServer = B.unpack . strEncode . protocolClientServer'
{-# INLINE protocolClientServer #-}
protocolClientServer' :: ProtocolClient v err msg -> ProtoServer msg
protocolClientServer' = snd3 . transportSession . client_
where
snd3 (_, s, _) = s
{-# INLINE protocolClientServer #-}
{-# INLINE protocolClientServer' #-}
transportHost' :: ProtocolClient v err msg -> TransportHost
transportHost' = transportHost . client_
@@ -650,14 +669,13 @@ deleteSMPQueues = okSMPCommands DEL
-- send PRXY :: SMPServer -> Maybe BasicAuth -> Command Sender
-- receives PKEY :: SessionId -> X.CertificateChain -> X.SignedExact X.PubKey -> BrokerMsg
createSMPProxySession :: SMPClient -> SMPServer -> Maybe BasicAuth -> ExceptT SMPClientError IO (SessionId, VersionSMP, C.PublicKeyX25519)
createSMPProxySession c relayServ@ProtocolServer {keyHash = C.KeyHash kh} proxyAuth =
connectSMPProxiedRelay :: SMPClient -> SMPServer -> Maybe BasicAuth -> ExceptT SMPClientError IO ProxiedRelay
connectSMPProxiedRelay c relayServ@ProtocolServer {keyHash = C.KeyHash kh} proxyAuth =
sendSMPCommand c Nothing "" (PRXY relayServ proxyAuth) >>= \case
-- XXX: rfc says sessionId should be in the entityId of response
PKEY sId vr (chain, key) -> do
case supportedClientSMPRelayVRange `compatibleVersion` vr of
Nothing -> throwE PCEIncompatibleHost -- TODO different error
Just (Compatible v) -> liftEitherWith x509Error $ (sId,v,) <$> validateRelay chain key
Just (Compatible v) -> liftEitherWith x509Error $ ProxiedRelay sId v <$> validateRelay chain key
r -> throwE . PCEUnexpectedResponse $ bshow r
where
x509Error :: String -> SMPClientError
@@ -672,6 +690,12 @@ createSMPProxySession c relayServ@ProtocolServer {keyHash = C.KeyHash kh} proxyA
pubKey <- C.verifyX509 serverKey exact
C.x509ToPublic (pubKey, []) >>= C.pubKey
data ProxiedRelay = ProxiedRelay
{ prSessionId :: SessionId,
prVersion :: VersionSMP,
prServerKey :: C.PublicKeyX25519
}
-- consider how to process slow responses - is it handled somehow locally or delegated to the caller
-- this method is used in the client
-- sends PFWD :: C.PublicKeyX25519 -> EncTransmission -> Command Sender
@@ -679,9 +703,7 @@ createSMPProxySession c relayServ@ProtocolServer {keyHash = C.KeyHash kh} proxyA
proxySMPMessage ::
SMPClient ->
-- proxy session from PKEY
SessionId ->
VersionSMP ->
C.PublicKeyX25519 ->
ProxiedRelay ->
-- message to deliver
Maybe SndPrivateAuthKey ->
SenderId ->
@@ -689,7 +711,7 @@ proxySMPMessage ::
MsgBody ->
ExceptT SMPClientError IO ()
-- TODO use version
proxySMPMessage c@ProtocolClient {thParams = proxyThParams, client_ = PClient {clientCorrId = g}} sessionId _v serverKey spKey sId flags msg = do
proxySMPMessage c@ProtocolClient {thParams = proxyThParams, client_ = PClient {clientCorrId = g}} (ProxiedRelay sessionId _v serverKey) spKey sId flags msg = do
-- prepare params
let serverThAuth = (\ta -> ta {serverPeerPubKey = serverKey}) <$> thAuth proxyThParams
serverThParams = proxyThParams {sessionId, thAuth = serverThAuth}
@@ -867,4 +889,6 @@ $(J.deriveJSON (enumJSON $ dropPrefix "HM") ''HostMode)
$(J.deriveJSON (enumJSON $ dropPrefix "TSM") ''TransportSessionMode)
$(J.deriveJSON (enumJSON $ dropPrefix "SPM") ''SMPProxyMode)
$(J.deriveJSON defaultJSON ''NetworkConfig)
+4 -4
View File
@@ -13,7 +13,7 @@ module AgentTests (agentTests) where
import AgentTests.ConnectionRequestTests
import AgentTests.DoubleRatchetTests (doubleRatchetTests)
import AgentTests.FunctionalAPITests (functionalAPITests, inAnyOrder, pattern Msg, pattern Msg')
import AgentTests.FunctionalAPITests (functionalAPITests, inAnyOrder, pattern Msg, pattern Msg', pattern SENT)
import AgentTests.MigrationTests (migrationTests)
import AgentTests.NotificationTests (notificationTests)
import AgentTests.SQLiteTests (storeTests)
@@ -27,7 +27,7 @@ import GHC.Stack (withFrozenCallStack)
import Network.HTTP.Types (urlEncode)
import SMPAgentClient
import SMPClient (testKeyHash, testPort, testPort2, testStoreLogFile, withSmpServer, withSmpServerStoreLogOn)
import Simplex.Messaging.Agent.Protocol hiding (MID, CONF, INFO, REQ)
import Simplex.Messaging.Agent.Protocol hiding (MID, CONF, INFO, REQ, SENT)
import qualified Simplex.Messaging.Agent.Protocol as A
import Simplex.Messaging.Crypto.Ratchet (InitialKeys (..), PQEncryption (..), PQSupport (..), pattern IKPQOn, pattern IKPQOff, pattern PQEncOn, pattern PQSupportOn, pattern PQSupportOff)
import qualified Simplex.Messaging.Crypto.Ratchet as CR
@@ -437,8 +437,8 @@ testServerConnectionAfterError t _ = do
bob #: ("1", "alice", "SUB") =#> \("1", "alice", ERR (BROKER _ e)) -> e == NETWORK || e == TIMEOUT
alice #: ("1", "bob", "SUB") =#> \("1", "bob", ERR (BROKER _ e)) -> e == NETWORK || e == TIMEOUT
withServer $ do
alice <#=? \case ("", "bob", APC _ (SENT 4)) -> True; ("", "", APC _ (UP s ["bob"])) -> s == server; _ -> False
alice <#=? \case ("", "bob", APC _ (SENT 4)) -> True; ("", "", APC _ (UP s ["bob"])) -> s == server; _ -> False
alice <#=? \case ("", "bob", APC SAEConn (SENT 4)) -> True; ("", "", APC _ (UP s ["bob"])) -> s == server; _ -> False
alice <#=? \case ("", "bob", APC SAEConn (SENT 4)) -> True; ("", "", APC _ (UP s ["bob"])) -> s == server; _ -> False
bob <#=? \case ("", "alice", APC _ (Msg "hello")) -> True; ("", "", APC _ (UP s ["alice"])) -> s == server; _ -> False
bob <#=? \case ("", "alice", APC _ (Msg "hello")) -> True; ("", "", APC _ (UP s ["alice"])) -> s == server; _ -> False
bob #: ("2", "alice", "ACK 4") #> ("2", "alice", OK)
+49 -37
View File
@@ -45,6 +45,7 @@ module AgentTests.FunctionalAPITests
pattern REQ,
pattern Msg,
pattern Msg',
pattern SENT,
agentCfgV7,
)
where
@@ -70,17 +71,17 @@ import Data.Word (Word16)
import qualified Database.SQLite.Simple as SQL
import GHC.Stack (withFrozenCallStack)
import SMPAgentClient
import SMPClient (cfg, testPort, testPort2, testStoreLogFile2, withSmpServer, withSmpServerConfigOn, withSmpServerOn, withSmpServerStoreLogOn, withSmpServerStoreMsgLogOn, withSmpServerV7)
import SMPClient (cfg, testPort, testPort2, testStoreLogFile2, withSmpServer, withSmpServerConfigOn, withSmpServerOn, withSmpServerProxy, withSmpServerStoreLogOn, withSmpServerStoreMsgLogOn, withSmpServerV7)
import Simplex.Messaging.Agent hiding (createConnection, joinConnection, sendMessage)
import qualified Simplex.Messaging.Agent as A
import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestStep (..), UserNetworkInfo (..), UserNetworkType (..), waitForUserNetwork)
import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), InitialAgentServers (..), createAgentStore)
import Simplex.Messaging.Agent.Protocol hiding (CON, CONF, INFO, REQ)
import Simplex.Messaging.Agent.Protocol hiding (CON, CONF, INFO, REQ, SENT)
import qualified Simplex.Messaging.Agent.Protocol as A
import Simplex.Messaging.Agent.RetryInterval (RetryInterval (..))
import Simplex.Messaging.Agent.Store.SQLite (MigrationConfirmation (..), SQLiteStore (dbNew))
import Simplex.Messaging.Agent.Store.SQLite.Common (withTransaction')
import Simplex.Messaging.Client (NetworkConfig (..), ProtocolClientConfig (..), TransportSessionMode (TSMEntity, TSMUser), defaultSMPClientConfig)
import Simplex.Messaging.Client (NetworkConfig (..), ProtocolClientConfig (..), SMPProxyMode (..), TransportSessionMode (TSMEntity, TSMUser), defaultSMPClientConfig)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.Ratchet (InitialKeys (..), PQEncryption (..), PQSupport (..), pattern PQEncOff, pattern PQEncOn, pattern PQSupportOff, pattern PQSupportOn)
import qualified Simplex.Messaging.Crypto.Ratchet as CR
@@ -171,6 +172,9 @@ pattern MsgErr msgId err msgBody <- MSG MsgMeta {recipient = (msgId, _), integri
pattern MsgErr' :: AgentMsgId -> MsgErrorType -> PQEncryption -> MsgBody -> ACommand 'Agent 'AEConn
pattern MsgErr' msgId err pq msgBody <- MSG MsgMeta {recipient = (msgId, _), integrity = MsgError err, pqEncryption = pq} _ msgBody
pattern SENT :: AgentMsgId -> ACommand 'Agent 'AEConn
pattern SENT msgId = A.SENT msgId Nothing
pattern Rcvd :: AgentMsgId -> ACommand 'Agent 'AEConn
pattern Rcvd agentMsgId <- RCVD MsgMeta {integrity = MsgOk} [MsgReceipt {agentMsgId, msgRcptStatus = MROk}]
@@ -448,26 +452,28 @@ canCreateQueue allowNew (srvAuth, srvVersion) (clntAuth, clntVersion) =
let v = basicAuthSMPVersion
in allowNew && (isNothing srvAuth || (srvVersion >= v && clntVersion >= v && srvAuth == clntAuth))
testMatrix2 :: ATransport -> (PQSupport -> AgentClient -> AgentClient -> AgentMsgId -> IO ()) -> Spec
testMatrix2 :: ATransport -> (PQSupport -> Bool -> AgentClient -> AgentClient -> AgentMsgId -> IO ()) -> Spec
testMatrix2 t runTest = do
it "v7" $ withSmpServerV7 t $ runTestCfg2 agentCfgV7 agentCfgV7 3 $ runTest PQSupportOn
it "v7 to current" $ withSmpServerV7 t $ runTestCfg2 agentCfgV7 agentCfg 3 $ runTest PQSupportOn
it "current to v7" $ withSmpServerV7 t $ runTestCfg2 agentCfg agentCfgV7 3 $ runTest PQSupportOn
it "current with v7 server" $ withSmpServerV7 t $ runTestCfg2 agentCfg agentCfg 3 $ runTest PQSupportOn
it "current" $ withSmpServer t $ runTestCfg2 agentCfg agentCfg 3 $ runTest PQSupportOn
it "prev" $ withSmpServer t $ runTestCfg2 agentCfgVPrev agentCfgVPrev 3 $ runTest PQSupportOff
it "prev to current" $ withSmpServer t $ runTestCfg2 agentCfgVPrev agentCfg 3 $ runTest PQSupportOff
it "current to prev" $ withSmpServer t $ runTestCfg2 agentCfg agentCfgVPrev 3 $ runTest PQSupportOff
it "v8, via proxy" $ withSmpServerProxy t $ runTestCfgServers2 agentProxyCfg agentProxyCfg (initAgentServersProxy SPMAlways) 3 $ runTest PQSupportOn True
it "v7" $ withSmpServerV7 t $ runTestCfg2 agentCfgV7 agentCfgV7 3 $ runTest PQSupportOn False
it "v7 to current" $ withSmpServerV7 t $ runTestCfg2 agentCfgV7 agentCfg 3 $ runTest PQSupportOn False
it "current to v7" $ withSmpServerV7 t $ runTestCfg2 agentCfg agentCfgV7 3 $ runTest PQSupportOn False
it "current with v7 server" $ withSmpServerV7 t $ runTestCfg2 agentCfg agentCfg 3 $ runTest PQSupportOn False
it "current" $ withSmpServer t $ runTestCfg2 agentCfg agentCfg 3 $ runTest PQSupportOn False
it "prev" $ withSmpServer t $ runTestCfg2 agentCfgVPrev agentCfgVPrev 3 $ runTest PQSupportOff False
it "prev to current" $ withSmpServer t $ runTestCfg2 agentCfgVPrev agentCfg 3 $ runTest PQSupportOff False
it "current to prev" $ withSmpServer t $ runTestCfg2 agentCfg agentCfgVPrev 3 $ runTest PQSupportOff False
testRatchetMatrix2 :: ATransport -> (PQSupport -> AgentClient -> AgentClient -> AgentMsgId -> IO ()) -> Spec
testRatchetMatrix2 :: ATransport -> (PQSupport -> Bool -> AgentClient -> AgentClient -> AgentMsgId -> IO ()) -> Spec
testRatchetMatrix2 t runTest = do
it "ratchet next" $ withSmpServerV7 t $ runTestCfg2 agentCfgV7 agentCfgV7 3 $ runTest PQSupportOn
it "ratchet next to current" $ withSmpServerV7 t $ runTestCfg2 agentCfgV7 agentCfg 3 $ runTest PQSupportOn
it "ratchet current to next" $ withSmpServerV7 t $ runTestCfg2 agentCfg agentCfgV7 3 $ runTest PQSupportOn
it "ratchet current" $ withSmpServer t $ runTestCfg2 agentCfg agentCfg 3 $ runTest PQSupportOn
it "ratchet prev" $ withSmpServer t $ runTestCfg2 agentCfgRatchetVPrev agentCfgRatchetVPrev 3 $ runTest PQSupportOff
it "ratchets prev to current" $ withSmpServer t $ runTestCfg2 agentCfgRatchetVPrev agentCfg 3 $ runTest PQSupportOff
it "ratchets current to prev" $ withSmpServer t $ runTestCfg2 agentCfg agentCfgRatchetVPrev 3 $ runTest PQSupportOff
it "v8, via proxy" $ withSmpServerProxy t $ runTestCfgServers2 agentProxyCfg agentProxyCfg (initAgentServersProxy SPMAlways) 3 $ runTest PQSupportOn True
it "ratchet next" $ withSmpServerV7 t $ runTestCfg2 agentCfgV7 agentCfgV7 3 $ runTest PQSupportOn False
it "ratchet next to current" $ withSmpServerV7 t $ runTestCfg2 agentCfgV7 agentCfg 3 $ runTest PQSupportOn False
it "ratchet current to next" $ withSmpServerV7 t $ runTestCfg2 agentCfg agentCfgV7 3 $ runTest PQSupportOn False
it "ratchet current" $ withSmpServer t $ runTestCfg2 agentCfg agentCfg 3 $ runTest PQSupportOn False
it "ratchet prev" $ withSmpServer t $ runTestCfg2 agentCfgRatchetVPrev agentCfgRatchetVPrev 3 $ runTest PQSupportOff False
it "ratchets prev to current" $ withSmpServer t $ runTestCfg2 agentCfgRatchetVPrev agentCfg 3 $ runTest PQSupportOff False
it "ratchets current to prev" $ withSmpServer t $ runTestCfg2 agentCfg agentCfgRatchetVPrev 3 $ runTest PQSupportOff False
testServerMatrix2 :: ATransport -> (InitialAgentServers -> IO ()) -> Spec
testServerMatrix2 t runTest = do
@@ -475,10 +481,14 @@ testServerMatrix2 t runTest = do
it "2 servers" $ withSmpServer t . withSmpServerOn t testPort2 $ runTest initAgentServers2
runTestCfg2 :: HasCallStack => AgentConfig -> AgentConfig -> AgentMsgId -> (HasCallStack => AgentClient -> AgentClient -> AgentMsgId -> IO ()) -> IO ()
runTestCfg2 aCfg bCfg baseMsgId runTest =
withAgentClientsCfg2 aCfg bCfg $ \a b -> runTest a b baseMsgId
runTestCfg2 aCfg bCfg = runTestCfgServers2 aCfg bCfg initAgentServers
{-# INLINE runTestCfg2 #-}
runTestCfgServers2 :: HasCallStack => AgentConfig -> AgentConfig -> InitialAgentServers -> AgentMsgId -> (HasCallStack => AgentClient -> AgentClient -> AgentMsgId -> IO ()) -> IO ()
runTestCfgServers2 aCfg bCfg servers baseMsgId runTest =
withAgentClientsCfgServers2 aCfg bCfg servers $ \a b -> runTest a b baseMsgId
{-# INLINE runTestCfgServers2 #-}
withAgentClientsCfgServers2 :: HasCallStack => AgentConfig -> AgentConfig -> InitialAgentServers -> (HasCallStack => AgentClient -> AgentClient -> IO ()) -> IO ()
withAgentClientsCfgServers2 aCfg bCfg servers runTest =
withAgent 1 aCfg servers testDB $ \a ->
@@ -499,8 +509,8 @@ withAgentClients3 runTest =
withAgent 3 agentCfg initAgentServers testDB3 $ \c ->
runTest a b c
runAgentClientTest :: HasCallStack => PQSupport -> AgentClient -> AgentClient -> AgentMsgId -> IO ()
runAgentClientTest pqSupport alice@AgentClient {} bob baseId =
runAgentClientTest :: HasCallStack => PQSupport -> Bool -> AgentClient -> AgentClient -> AgentMsgId -> IO ()
runAgentClientTest pqSupport viaProxy alice@AgentClient {} bob baseId =
runRight_ $ do
(bobId, qInfo) <- A.createConnection alice 1 True SCMInvitation Nothing (IKNoPQ pqSupport) SMSubscribe
aliceId <- A.joinConnection bob 1 True qInfo "bob's connInfo" pqSupport SMSubscribe
@@ -512,18 +522,19 @@ runAgentClientTest pqSupport alice@AgentClient {} bob baseId =
get bob ##> ("", aliceId, A.INFO pqSupport "alice's connInfo")
get bob ##> ("", aliceId, A.CON pqEnc)
-- message IDs 1 to 3 (or 1 to 4 in v1) get assigned to control messages, so first MSG is assigned ID 4
let proxySrv = if viaProxy then Just testSMPServer else Nothing
1 <- msgId <$> A.sendMessage alice bobId pqEnc SMP.noMsgFlags "hello"
get alice ##> ("", bobId, SENT $ baseId + 1)
get alice ##> ("", bobId, A.SENT (baseId + 1) proxySrv)
2 <- msgId <$> A.sendMessage alice bobId pqEnc SMP.noMsgFlags "how are you?"
get alice ##> ("", bobId, SENT $ baseId + 2)
get alice ##> ("", bobId, A.SENT (baseId + 2) proxySrv)
get bob =##> \case ("", c, Msg' _ pq "hello") -> c == aliceId && pq == pqEnc; _ -> False
ackMessage bob aliceId (baseId + 1) Nothing
get bob =##> \case ("", c, Msg' _ pq "how are you?") -> c == aliceId && pq == pqEnc; _ -> False
ackMessage bob aliceId (baseId + 2) Nothing
3 <- msgId <$> A.sendMessage bob aliceId pqEnc SMP.noMsgFlags "hello too"
get bob ##> ("", aliceId, SENT $ baseId + 3)
get bob ##> ("", aliceId, A.SENT (baseId + 3) proxySrv)
4 <- msgId <$> A.sendMessage bob aliceId pqEnc SMP.noMsgFlags "message 1"
get bob ##> ("", aliceId, SENT $ baseId + 4)
get bob ##> ("", aliceId, A.SENT (baseId + 4) proxySrv)
get alice =##> \case ("", c, Msg' _ pq "hello too") -> c == bobId && pq == pqEnc; _ -> False
ackMessage alice bobId (baseId + 3) Nothing
get alice =##> \case ("", c, Msg' _ pq "message 1") -> c == bobId && pq == pqEnc; _ -> False
@@ -627,8 +638,8 @@ testAgentClient3 =
get c =##> \case ("", connId, Msg "c5") -> connId == aIdForC; _ -> False
ackMessage c aIdForC 5 Nothing
runAgentClientContactTest :: HasCallStack => PQSupport -> AgentClient -> AgentClient -> AgentMsgId -> IO ()
runAgentClientContactTest pqSupport alice bob baseId =
runAgentClientContactTest :: HasCallStack => PQSupport -> Bool -> AgentClient -> AgentClient -> AgentMsgId -> IO ()
runAgentClientContactTest pqSupport viaProxy alice bob baseId =
runRight_ $ do
(_, qInfo) <- A.createConnection alice 1 True SCMContact Nothing (IKNoPQ pqSupport) SMSubscribe
aliceId <- A.joinConnection bob 1 True qInfo "bob's connInfo" pqSupport SMSubscribe
@@ -643,18 +654,19 @@ runAgentClientContactTest pqSupport alice bob baseId =
get alice ##> ("", bobId, A.CON pqEnc)
get bob ##> ("", aliceId, A.CON pqEnc)
-- message IDs 1 to 3 (or 1 to 4 in v1) get assigned to control messages, so first MSG is assigned ID 4
let proxySrv = if viaProxy then Just testSMPServer else Nothing
1 <- msgId <$> A.sendMessage alice bobId pqEnc SMP.noMsgFlags "hello"
get alice ##> ("", bobId, SENT $ baseId + 1)
get alice ##> ("", bobId, A.SENT (baseId + 1) proxySrv)
2 <- msgId <$> A.sendMessage alice bobId pqEnc SMP.noMsgFlags "how are you?"
get alice ##> ("", bobId, SENT $ baseId + 2)
get alice ##> ("", bobId, A.SENT (baseId + 2) proxySrv)
get bob =##> \case ("", c, Msg' _ pq "hello") -> c == aliceId && pq == pqEnc; _ -> False
ackMessage bob aliceId (baseId + 1) Nothing
get bob =##> \case ("", c, Msg' _ pq "how are you?") -> c == aliceId && pq == pqEnc; _ -> False
ackMessage bob aliceId (baseId + 2) Nothing
3 <- msgId <$> A.sendMessage bob aliceId pqEnc SMP.noMsgFlags "hello too"
get bob ##> ("", aliceId, SENT $ baseId + 3)
get bob ##> ("", aliceId, A.SENT (baseId + 3) proxySrv)
4 <- msgId <$> A.sendMessage bob aliceId pqEnc SMP.noMsgFlags "message 1"
get bob ##> ("", aliceId, SENT $ baseId + 4)
get bob ##> ("", aliceId, A.SENT (baseId + 4) proxySrv)
get alice =##> \case ("", c, Msg' _ pq "hello too") -> c == bobId && pq == pqEnc; _ -> False
ackMessage alice bobId (baseId + 3) Nothing
get alice =##> \case ("", c, Msg' _ pq "message 1") -> c == bobId && pq == pqEnc; _ -> False
@@ -1493,9 +1505,9 @@ testSuspendingAgentCompleteSending t = withAgentClients2 $ \a b -> do
liftIO $ suspendAgent b 5000000
withSmpServerStoreLogOn t testPort $ \_ -> runRight_ @AgentErrorType $ do
pGet b =##> \case ("", c, APC _ (SENT 5)) -> c == aId; ("", "", APC _ UP {}) -> True; _ -> False
pGet b =##> \case ("", c, APC _ (SENT 5)) -> c == aId; ("", "", APC _ UP {}) -> True; _ -> False
pGet b =##> \case ("", c, APC _ (SENT 6)) -> c == aId; ("", "", APC _ UP {}) -> True; _ -> False
pGet b =##> \case ("", c, APC SAEConn (SENT 5)) -> c == aId; ("", "", APC _ UP {}) -> True; _ -> False
pGet b =##> \case ("", c, APC SAEConn (SENT 5)) -> c == aId; ("", "", APC _ UP {}) -> True; _ -> False
pGet b =##> \case ("", c, APC SAEConn (SENT 6)) -> c == aId; ("", "", APC _ UP {}) -> True; _ -> False
("", "", SUSPENDED) <- nGet b
pGet a =##> \case ("", c, APC _ (Msg "hello too")) -> c == bId; ("", "", APC _ UP {}) -> True; _ -> False
+11 -9
View File
@@ -6,8 +6,8 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -fno-warn-incomplete-uni-patterns #-}
{-# OPTIONS_GHC -fno-warn-ambiguous-fields #-}
{-# OPTIONS_GHC -fno-warn-incomplete-uni-patterns #-}
module AgentTests.NotificationTests where
@@ -17,10 +17,6 @@ import AgentTests.FunctionalAPITests
createConnection,
exchangeGreetingsMsgId,
get,
withAgent,
withAgentClients2,
withAgentClientsCfgServers2,
withAgentClients3,
joinConnection,
makeConnection,
nGet,
@@ -29,13 +25,18 @@ import AgentTests.FunctionalAPITests
sendMessage,
switchComplete,
testServerMatrix2,
withAgent,
withAgentClients2,
withAgentClients3,
withAgentClientsCfg2,
withAgentClientsCfgServers2,
(##>),
(=##>),
pattern CON,
pattern CONF,
pattern INFO,
pattern Msg,
pattern SENT,
)
import Control.Concurrent (ThreadId, killThread, threadDelay)
import Control.Monad
@@ -55,12 +56,12 @@ import SMPClient (cfg, cfgV7, testPort, testPort2, testStoreLogFile2, withSmpSer
import Simplex.Messaging.Agent hiding (createConnection, joinConnection, sendMessage)
import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestStep (..), withStore')
import Simplex.Messaging.Agent.Env.SQLite (AgentConfig, Env (..), InitialAgentServers)
import Simplex.Messaging.Agent.Protocol hiding (CON, CONF, INFO)
import Simplex.Messaging.Agent.Protocol hiding (CON, CONF, INFO, SENT)
import Simplex.Messaging.Agent.Store.SQLite (getSavedNtfToken)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Notifications.Server.Env (NtfServerConfig (..))
import Simplex.Messaging.Notifications.Protocol
import Simplex.Messaging.Notifications.Server.Env (NtfServerConfig (..))
import Simplex.Messaging.Notifications.Server.Push.APNS
import Simplex.Messaging.Notifications.Types (NtfToken (..))
import Simplex.Messaging.Protocol (ErrorType (AUTH), MsgFlags (MsgFlags), NtfServer, ProtocolServer (..), SMPMsgMeta (..), SubscriptionMode (..))
@@ -151,7 +152,8 @@ testNtfMatrix t runTest = do
it "next servers: SMP v7, NTF v2; curr clients: v6/v1" $ runNtfTestCfg t cfgV7 ntfServerCfgV2 agentCfg agentCfg runTest
it "curr servers: SMP v6, NTF v1; curr clients: v6/v1" $ runNtfTestCfg t cfg ntfServerCfg agentCfg agentCfg runTest
skip "this case cannot be supported - see RFC" $
it "servers: SMP v6, NTF v1; clients: v7/v2 (not supported)" $ runNtfTestCfg t cfg ntfServerCfg agentCfgV7 agentCfgV7 runTest
it "servers: SMP v6, NTF v1; clients: v7/v2 (not supported)" $
runNtfTestCfg t cfg ntfServerCfg agentCfgV7 agentCfgV7 runTest
-- servers can be migrated in any order
it "servers: next SMP v7, curr NTF v1; curr clients: v6/v1" $ runNtfTestCfg t cfgV7 ntfServerCfg agentCfg agentCfg runTest
it "servers: curr SMP v6, next NTF v2; curr clients: v6/v1" $ runNtfTestCfg t cfg ntfServerCfgV2 agentCfg agentCfg runTest
@@ -258,7 +260,7 @@ testNtfTokenServerRestart t APNSMockServer {apnsQ} = do
atomically $ readTBQueue apnsQ
liftIO $ sendApnsResponse APNSRespOk
pure ntfData
-- the new agent is created as otherwise when running the tests in CI the old agent was keeping the connection to the server
-- the new agent is created as otherwise when running the tests in CI the old agent was keeping the connection to the server
threadDelay 1000000
withAgent 2 agentCfg initAgentServers testDB $ \a' ->
-- server stopped before token is verified, so now the attempt to verify it will return AUTH error but re-register token,
+9 -2
View File
@@ -20,7 +20,8 @@ import qualified Database.SQLite.Simple as SQL
import Network.Socket (ServiceName)
import NtfClient (ntfTestPort)
import SMPClient
( serverBracket,
( proxyVRange,
serverBracket,
testKeyHash,
testPort,
testPort2,
@@ -34,7 +35,7 @@ import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.Server (runSMPAgentBlocking)
import Simplex.Messaging.Agent.Store.SQLite (MigrationConfirmation (..), SQLiteStore (dbNew))
import Simplex.Messaging.Agent.Store.SQLite.Common (withTransaction')
import Simplex.Messaging.Client (ProtocolClientConfig (..), chooseTransportHost, defaultSMPClientConfig, defaultNetworkConfig)
import Simplex.Messaging.Client (ProtocolClientConfig (..), SMPProxyMode, chooseTransportHost, defaultSMPClientConfig, defaultNetworkConfig)
import Simplex.Messaging.Notifications.Client (defaultNTFClientConfig)
import Simplex.Messaging.Parsers (parseAll)
import Simplex.Messaging.Protocol (NtfServer, ProtoServerWithAuth)
@@ -198,6 +199,9 @@ initAgentServers =
initAgentServers2 :: InitialAgentServers
initAgentServers2 = initAgentServers {smp = userServers [noAuthSrv testSMPServer, noAuthSrv testSMPServer2]}
initAgentServersProxy :: SMPProxyMode -> InitialAgentServers
initAgentServersProxy smpProxyMode = initAgentServers {netCfg = (netCfg initAgentServers) {smpProxyMode}}
agentCfg :: AgentConfig
agentCfg =
defaultAgentConfig
@@ -217,6 +221,9 @@ agentCfg =
where
networkConfig = defaultNetworkConfig {tcpConnectTimeout = 3_000_000, tcpTimeout = 2_000_000}
agentProxyCfg :: AgentConfig
agentProxyCfg = agentCfg {smpCfg = (smpCfg agentCfg) {serverVRange = proxyVRange}}
fastRetryInterval :: RetryInterval
fastRetryInterval = defaultReconnectInterval {initialInterval = 50_000}
+7 -1
View File
@@ -123,9 +123,12 @@ proxyCfg =
cfgV7
{ allowSMPProxy = True,
smpServerVRange = mkVersionRange batchCmdsSMPVersion sendingProxySMPVersion,
smpAgentCfg = defaultSMPClientAgentConfig {smpCfg = (smpCfg defaultSMPClientAgentConfig) {serverVRange = mkVersionRange batchCmdsSMPVersion sendingProxySMPVersion, agreeSecret = True}}
smpAgentCfg = defaultSMPClientAgentConfig {smpCfg = (smpCfg defaultSMPClientAgentConfig) {serverVRange = proxyVRange, agreeSecret = True}}
}
proxyVRange :: VersionRangeSMP
proxyVRange = mkVersionRange batchCmdsSMPVersion sendingProxySMPVersion
withSmpServerStoreMsgLogOn :: HasCallStack => ATransport -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a
withSmpServerStoreMsgLogOn t = withSmpServerConfigOn t cfg {storeLogFile = Just testStoreLogFile, storeMsgsFile = Just testStoreMsgsFile, serverStatsBackupFile = Just testServerStatsBackupFile}
@@ -163,6 +166,9 @@ withSmpServer t = withSmpServerOn t testPort
withSmpServerV7 :: HasCallStack => ATransport -> IO a -> IO a
withSmpServerV7 t = withSmpServerConfigOn t cfgV7 testPort . const
withSmpServerProxy :: HasCallStack => ATransport -> IO a -> IO a
withSmpServerProxy t = withSmpServerConfigOn t proxyCfg testPort . const
runSmpTest :: forall c a. (HasCallStack, Transport c) => (HasCallStack => THandleSMP c 'TClient -> IO a) -> IO a
runSmpTest test = withSmpServer (transport @c) $ testSMPClient test
+103 -54
View File
@@ -1,8 +1,11 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
@@ -10,15 +13,23 @@
module SMPProxyTests where
import AgentTests.FunctionalAPITests (runRight_)
import AgentTests.FunctionalAPITests
import Data.ByteString.Char8 (ByteString)
import SMPAgentClient (testSMPServer, testSMPServer2)
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as L
import SMPAgentClient
import SMPClient
import qualified SMPClient as SMP
import ServerTests (decryptMsgV3, sendRecv)
import Simplex.Messaging.Agent hiding (createConnection, joinConnection, sendMessage)
import qualified Simplex.Messaging.Agent as A
import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), InitialAgentServers (..))
import Simplex.Messaging.Agent.Protocol hiding (CON, CONF, INFO, REQ)
import qualified Simplex.Messaging.Agent.Protocol as A
import Simplex.Messaging.Client
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Protocol
import Simplex.Messaging.Crypto.Ratchet (pattern PQSupportOn)
import qualified Simplex.Messaging.Crypto.Ratchet as CR
import Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.Server.Env.STM (ServerConfig (..))
import Simplex.Messaging.Transport
import Simplex.Messaging.Version (mkVersionRange)
@@ -37,49 +48,52 @@ smpProxyTests = do
xit "no SMP service at host/port" todo
xit "bad SMP fingerprint" todo
xit "batching proxy requests" todo
describe "forwarding requests" $ do
describe "deliver message via SMP proxy" $ do
describe "deliver message via SMP proxy" $ do
let srv1 = SMPServer testHost testPort testKeyHash
srv2 = SMPServer testHost testPort2 testKeyHash
describe "client API" $ do
let maxLen = maxMessageLength sendingProxySMPVersion
it "same server" $
withSmpServerConfigOn (transport @TLS) proxyCfg testPort $ \_ -> do
let proxyServ = SMPServer SMP.testHost SMP.testPort SMP.testKeyHash
let relayServ = proxyServ
describe "one server" $ do
it "deliver via proxy" . oneServer $ do
deliverMessageViaProxy srv1 srv1 C.SEd448 "hello 1" "hello 2"
describe "two servers" $ do
let proxyServ = srv1
relayServ = srv2
(msg1, msg2) <- runIO $ do
g <- C.newRandom
atomically $ (,) <$> C.randomBytes maxLen g <*> C.randomBytes maxLen g
it "deliver via proxy" . twoServersFirstProxy $
deliverMessageViaProxy proxyServ relayServ C.SEd448 "hello 1" "hello 2"
it "different servers" $
withSmpServerConfigOn (transport @TLS) proxyCfg testPort $ \_ ->
withSmpServerConfigOn (transport @TLS) cfgV7 testPort2 $ \_ -> do
let proxyServ = SMPServer SMP.testHost SMP.testPort SMP.testKeyHash
let relayServ = SMPServer SMP.testHost SMP.testPort2 SMP.testKeyHash
deliverMessageViaProxy proxyServ relayServ C.SEd448 "hello 1" "hello 2"
it "max message size, Ed448 keys" $
withSmpServerConfigOn (transport @TLS) proxyCfg testPort $ \_ ->
withSmpServerConfigOn (transport @TLS) cfgV7 testPort2 $ \_ -> do
g <- C.newRandom
msg <- atomically $ C.randomBytes maxLen g
msg' <- atomically $ C.randomBytes maxLen g
let proxyServ = SMPServer SMP.testHost SMP.testPort SMP.testKeyHash
let relayServ = SMPServer SMP.testHost SMP.testPort2 SMP.testKeyHash
deliverMessageViaProxy proxyServ relayServ C.SEd448 msg msg'
it "max message size, Ed25519 keys" $
withSmpServerConfigOn (transport @TLS) proxyCfg testPort $ \_ ->
withSmpServerConfigOn (transport @TLS) cfgV7 testPort2 $ \_ -> do
g <- C.newRandom
msg <- atomically $ C.randomBytes maxLen g
msg' <- atomically $ C.randomBytes maxLen g
let proxyServ = SMPServer SMP.testHost SMP.testPort SMP.testKeyHash
let relayServ = SMPServer SMP.testHost SMP.testPort2 SMP.testKeyHash
deliverMessageViaProxy proxyServ relayServ C.SEd25519 msg msg'
it "max message size, X25519 keys" $
withSmpServerConfigOn (transport @TLS) proxyCfg testPort $ \_ ->
withSmpServerConfigOn (transport @TLS) cfgV7 testPort2 $ \_ -> do
g <- C.newRandom
msg <- atomically $ C.randomBytes maxLen g
msg' <- atomically $ C.randomBytes maxLen g
let proxyServ = SMPServer SMP.testHost SMP.testPort SMP.testKeyHash
let relayServ = SMPServer SMP.testHost SMP.testPort2 SMP.testKeyHash
deliverMessageViaProxy proxyServ relayServ C.SX25519 msg msg'
xit "sender-proxy-relay-recipient works" todo
xit "similar timing for proxied and direct sends" todo
it "max message size, Ed448 keys" . twoServersFirstProxy $
deliverMessageViaProxy proxyServ relayServ C.SEd448 msg1 msg2
it "max message size, Ed25519 keys" . twoServersFirstProxy $
deliverMessageViaProxy proxyServ relayServ C.SEd25519 msg1 msg2
it "max message size, X25519 keys" . twoServersFirstProxy $
deliverMessageViaProxy proxyServ relayServ C.SX25519 msg1 msg2
describe "agent API" $ do
describe "one server" $ do
it "always via proxy" . oneServer $
agentDeliverMessageViaProxy ([srv1], SPMAlways, True) ([srv1], SPMAlways, True) C.SEd448 "hello 1" "hello 2"
it "without proxy" . oneServer $
agentDeliverMessageViaProxy ([srv1], SPMNever, False) ([srv1], SPMNever, False) C.SEd448 "hello 1" "hello 2"
describe "two servers" $ do
it "always via proxy" . twoServers $
agentDeliverMessageViaProxy ([srv1], SPMAlways, True) ([srv2], SPMAlways, True) C.SEd448 "hello 1" "hello 2"
it "both via proxy" . twoServers $
agentDeliverMessageViaProxy ([srv1], SPMUnknown, True) ([srv2], SPMUnknown, True) C.SEd448 "hello 1" "hello 2"
it "first via proxy" . twoServers $
agentDeliverMessageViaProxy ([srv1], SPMUnknown, True) ([srv2], SPMNever, False) C.SEd448 "hello 1" "hello 2"
it "without proxy" . twoServers $
agentDeliverMessageViaProxy ([srv1], SPMNever, False) ([srv2], SPMNever, False) C.SEd448 "hello 1" "hello 2"
it "first via proxy for unknown" . twoServers $
agentDeliverMessageViaProxy ([srv1], SPMUnknown, True) ([srv1, srv2], SPMUnknown, False) C.SEd448 "hello 1" "hello 2"
where
oneServer = withSmpServerConfigOn (transport @TLS) proxyCfg testPort . const
twoServers = twoServers_ proxyCfg proxyCfg
twoServersFirstProxy = twoServers_ proxyCfg cfgV7
twoServers_ cfg1 cfg2 runTest =
withSmpServerConfigOn (transport @TLS) cfg1 testPort $ \_ ->
withSmpServerConfigOn (transport @TLS) cfg2 testPort2 $ const runTest
deliverMessageViaProxy :: (C.AlgorithmI a, C.AuthAlgorithm a) => SMPServer -> SMPServer -> C.SAlgorithm a -> ByteString -> ByteString -> IO ()
deliverMessageViaProxy proxyServ relayServ alg msg msg' = do
@@ -97,39 +111,74 @@ deliverMessageViaProxy proxyServ relayServ alg msg msg' = do
QIK {rcvId, sndId, rcvPublicDhKey = srvDh} <- createSMPQueue rc (rPub, rPriv) rdhPub (Just "correct") SMSubscribe
let dec = decryptMsgV3 $ C.dh' srvDh rdhPriv
-- get proxy session
(sessId, v, relayKey) <- createSMPProxySession pc relayServ (Just "correct")
sess <- connectSMPProxiedRelay pc relayServ (Just "correct")
-- send via proxy to unsecured queue
proxySMPMessage pc sessId v relayKey Nothing sndId noMsgFlags msg
proxySMPMessage pc sess Nothing sndId noMsgFlags msg
-- receive 1
(_tSess, _v, _sid, _ety, MSG RcvMessage {msgId, msgBody = EncRcvMsgBody encBody}) <- atomically $ readTBQueue msgQ
(_tSess, _v, _sid, _ety, SMP.MSG RcvMessage {msgId, msgBody = EncRcvMsgBody encBody}) <- atomically $ readTBQueue msgQ
liftIO $ dec msgId encBody `shouldBe` Right msg
ackSMPMessage rc rPriv rcvId msgId
-- secure queue
(sPub, sPriv) <- atomically $ C.generateAuthKeyPair alg g
secureSMPQueue rc rPriv rcvId sPub
-- send via proxy to secured queue
proxySMPMessage pc sessId v relayKey (Just sPriv) sndId noMsgFlags msg'
proxySMPMessage pc sess (Just sPriv) sndId noMsgFlags msg'
-- receive 2
(_tSess, _v, _sid, _ety, MSG RcvMessage {msgId = msgId', msgBody = EncRcvMsgBody encBody'}) <- atomically $ readTBQueue msgQ
(_tSess, _v, _sid, _ety, SMP.MSG RcvMessage {msgId = msgId', msgBody = EncRcvMsgBody encBody'}) <- atomically $ readTBQueue msgQ
liftIO $ dec msgId' encBody' `shouldBe` Right msg'
ackSMPMessage rc rPriv rcvId msgId'
proxyVRange :: VersionRangeSMP
proxyVRange = mkVersionRange batchCmdsSMPVersion sendingProxySMPVersion
agentDeliverMessageViaProxy :: (C.AlgorithmI a, C.AuthAlgorithm a) => (NonEmpty SMPServer, SMPProxyMode, Bool) -> (NonEmpty SMPServer, SMPProxyMode, Bool) -> C.SAlgorithm a -> ByteString -> ByteString -> IO ()
agentDeliverMessageViaProxy aTestCfg@(aSrvs, _, aViaProxy) bTestCfg@(bSrvs, _, bViaProxy) alg msg1 msg2 =
withAgent 1 aCfg (servers aTestCfg) testDB $ \alice ->
withAgent 2 aCfg (servers bTestCfg) testDB2 $ \bob -> runRight_ $ do
(bobId, qInfo) <- A.createConnection alice 1 True SCMInvitation Nothing (CR.IKNoPQ PQSupportOn) SMSubscribe
aliceId <- A.joinConnection bob 1 True qInfo "bob's connInfo" PQSupportOn SMSubscribe
("", _, A.CONF confId pqSup' _ "bob's connInfo") <- get alice
liftIO $ pqSup' `shouldBe` PQSupportOn
allowConnection alice bobId confId "alice's connInfo"
let pqEnc = CR.PQEncOn
get alice ##> ("", bobId, A.CON pqEnc)
get bob ##> ("", aliceId, A.INFO PQSupportOn "alice's connInfo")
get bob ##> ("", aliceId, A.CON pqEnc)
-- message IDs 1 to 3 (or 1 to 4 in v1) get assigned to control messages, so first MSG is assigned ID 4
let aProxySrv = if aViaProxy then Just $ L.head aSrvs else Nothing
1 <- msgId <$> A.sendMessage alice bobId pqEnc noMsgFlags msg1
get alice ##> ("", bobId, A.SENT (baseId + 1) aProxySrv)
2 <- msgId <$> A.sendMessage alice bobId pqEnc noMsgFlags msg2
get alice ##> ("", bobId, A.SENT (baseId + 2) aProxySrv)
get bob =##> \case ("", c, Msg' _ pq msg1') -> c == aliceId && pq == pqEnc && msg1 == msg1'; _ -> False
ackMessage bob aliceId (baseId + 1) Nothing
get bob =##> \case ("", c, Msg' _ pq msg2') -> c == aliceId && pq == pqEnc && msg2 == msg2'; _ -> False
ackMessage bob aliceId (baseId + 2) Nothing
let bProxySrv = if bViaProxy then Just $ L.head bSrvs else Nothing
3 <- msgId <$> A.sendMessage bob aliceId pqEnc noMsgFlags msg1
get bob ##> ("", aliceId, A.SENT (baseId + 3) bProxySrv)
4 <- msgId <$> A.sendMessage bob aliceId pqEnc noMsgFlags msg2
get bob ##> ("", aliceId, A.SENT (baseId + 4) bProxySrv)
get alice =##> \case ("", c, Msg' _ pq msg1') -> c == bobId && pq == pqEnc && msg1 == msg1'; _ -> False
ackMessage alice bobId (baseId + 3) Nothing
get alice =##> \case ("", c, Msg' _ pq msg2') -> c == bobId && pq == pqEnc && msg2 == msg2'; _ -> False
ackMessage alice bobId (baseId + 4) Nothing
where
baseId = 3
msgId = subtract baseId . fst
aCfg = agentProxyCfg {sndAuthAlg = C.AuthAlg alg, rcvAuthAlg = C.AuthAlg alg}
servers (srvs, smpProxyMode, _) = (initAgentServersProxy smpProxyMode) {smp = userServers $ L.map noAuthSrv srvs}
testNoProxy :: IO ()
testNoProxy = do
withSmpServerConfigOn (transport @TLS) cfg testPort2 $ \_ -> do
testSMPClient_ "127.0.0.1" testPort2 proxyVRange $ \(th :: THandleSMP TLS 'TClient) -> do
(_, _, (_corrId, _entityId, reply)) <- sendRecv th (Nothing, "0", "", PRXY testSMPServer Nothing)
reply `shouldBe` Right (ERR AUTH)
reply `shouldBe` Right (SMP.ERR SMP.AUTH)
testProxyAuth :: IO ()
testProxyAuth = do
withSmpServerConfigOn (transport @TLS) proxyCfgAuth testPort $ \_ -> do
testSMPClient_ "127.0.0.1" testPort proxyVRange $ \(th :: THandleSMP TLS 'TClient) -> do
(_, _s, (_corrId, _entityId, reply)) <- sendRecv th (Nothing, "0", "", PRXY testSMPServer2 $ Just "wrong")
reply `shouldBe` Right (ERR AUTH)
reply `shouldBe` Right (SMP.ERR SMP.AUTH)
where
proxyCfgAuth = proxyCfg {newQueueBasicAuth = Just "correct"}