SMP proxy: server implementation (#1098)

* wip

* PRXY command

* progress

* SMP Proxy: client-level implementation (#1101)

* buildable

* encode messages

* update pkey

* fix queue types

* wrap SEND in proxy lookup

* WIP proxy client

* WIP

* post-rebase fixes

* encode something with something

* cleanup

* update

* fix nonce/corrId in batchingTests

* WIP: dig into createSMPProxySession

* agent

* test progress

* pass the test

* parameterize transport handle with transport peer to include server certificate (#1100)

* parameterize transport handle with transport peer to include server certificate

* include server certificate into THandle

* load server chain and sign key

* fix key type

* fix for 8.10

---------

Co-authored-by: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com>
Co-authored-by: IC Rainbow <aenor.realm@gmail.com>

* cleanup

* add 2-server test

* remove subsumed test

* checkCredentials for BrokerMsg

* skip batching tests

* remove userId param

* remove agent changes

---------

Co-authored-by: Evgeny Poberezkin <evgeny@poberezkin.com>

---------

Co-authored-by: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com>
This commit is contained in:
Evgeny Poberezkin
2024-04-17 19:31:07 +01:00
committed by GitHub
parent 11a68f4f15
commit a945cc5786
17 changed files with 515 additions and 187 deletions
+3 -1
View File
@@ -186,9 +186,11 @@ xftpClientError = \case
sendXFTPCommand :: forall p. FilePartyI p => XFTPClient -> C.APrivateAuthKey -> XFTPFileId -> FileCommand p -> Maybe XFTPChunkSpec -> ExceptT XFTPClientError IO (FileResponse, HTTP2Body)
sendXFTPCommand c@XFTPClient {thParams} pKey fId cmd chunkSpec_ = do
-- TODO random corrId
let corrIdUsedAsNonce = ""
t <-
liftEither . first PCETransportError $
xftpEncodeAuthTransmission thParams pKey ("", fId, FileCmd (sFileParty @p) cmd)
xftpEncodeAuthTransmission thParams pKey (corrIdUsedAsNonce, fId, FileCmd (sFileParty @p) cmd)
sendXFTPTransmission c t chunkSpec_
sendXFTPTransmission :: XFTPClient -> ByteString -> Maybe XFTPChunkSpec -> ExceptT XFTPClientError IO (FileResponse, HTTP2Body)
+2 -1
View File
@@ -48,6 +48,7 @@ import Simplex.Messaging.Protocol
SndPublicAuthKey,
Transmission,
TransmissionForAuth (..),
CorrId (..),
encodeTransmission,
encodeTransmissionForAuth,
messageTagP,
@@ -328,7 +329,7 @@ checkParty' c = case testEquality (sFileParty @p) (sFileParty @p') of
xftpEncodeAuthTransmission :: ProtocolEncoding XFTPVersion e c => THandleParams XFTPVersion 'TClient -> C.APrivateAuthKey -> Transmission c -> Either TransportError ByteString
xftpEncodeAuthTransmission thParams@THandleParams {thAuth} pKey (corrId, fId, msg) = do
let TransmissionForAuth {tForAuth, tToSend} = encodeTransmissionForAuth thParams (corrId, fId, msg)
xftpEncodeBatch1 . (,tToSend) =<< authTransmission thAuth (Just pKey) corrId tForAuth
xftpEncodeBatch1 . (,tToSend) =<< authTransmission thAuth (Just pKey) (C.cbNonce $ bs corrId) tForAuth
xftpEncodeTransmission :: ProtocolEncoding XFTPVersion e c => THandleParams XFTPVersion p -> Transmission c -> Either TransportError ByteString
xftpEncodeTransmission thParams (corrId, fId, msg) = do
+1 -1
View File
@@ -788,7 +788,7 @@ compatibleContactUri (CRContactUri ConnReqUriData {crAgentVRange, crSmpQueues =
AgentConfig {smpClientVRange, smpAgentVRange} <- asks config
pure $
(,)
<$> (qUri `compatibleVersion` smpClientVRange)
<$> (qUri `compatibleVersion` smpClientVRange)
<*> (crAgentVRange `compatibleVersion` smpAgentVRange pqSup)
versionPQSupport_ :: VersionSMPA -> Maybe CR.VersionE2E -> PQSupport
+123 -22
View File
@@ -54,6 +54,9 @@ module Simplex.Messaging.Client
suspendSMPQueue,
deleteSMPQueue,
deleteSMPQueues,
createSMPProxySession,
proxySMPMessage,
forwardSMPMessage,
sendProtocolCommand,
-- * Supporting types and client configuration
@@ -69,6 +72,7 @@ module Simplex.Messaging.Client
chooseTransportHost,
proxyUsername,
temporaryClientError,
smpProxyError,
ServerTransmission,
ClientCommand,
@@ -98,9 +102,12 @@ import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as L
import Data.Maybe (fromMaybe)
import Data.Time.Clock (UTCTime (..), getCurrentTime)
import qualified Data.X509 as X
import qualified Data.X509.Validation as XV
import Network.Socket (ServiceName)
import Numeric.Natural
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Parsers (defaultJSON, dropPrefix, enumJSON)
import Simplex.Messaging.Protocol
@@ -110,7 +117,7 @@ import Simplex.Messaging.Transport
import Simplex.Messaging.Transport.Client (SocksProxy, TransportClientConfig (..), TransportHost (..), runTransportClient)
import Simplex.Messaging.Transport.KeepAlive
import Simplex.Messaging.Transport.WebSockets (WS)
import Simplex.Messaging.Util (bshow, raceAny_, threadDelay')
import Simplex.Messaging.Util (bshow, liftEitherWith, raceAny_, threadDelay')
import Simplex.Messaging.Version
import System.Timeout (timeout)
@@ -480,6 +487,19 @@ temporaryClientError = \case
_ -> False
{-# INLINE temporaryClientError #-}
-- TODO keep error params
smpProxyError :: SMPClientError -> ErrorType
smpProxyError = \case
PCEProtocolError _ -> PROXY PROTOCOL
PCEResponseError _ -> PROXY RESPONSE
PCEUnexpectedResponse _ -> PROXY UNEXPECTED
PCEResponseTimeout -> PROXY TIMEOUT
PCENetworkError -> PROXY NETWORK
PCEIncompatibleHost -> PROXY BAD_HOST
PCETransportError _ -> PROXY TRANSPORT
PCECryptoError _ -> INTERNAL
PCEIOError _ -> INTERNAL
-- | Create a new SMP queue.
--
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#create-queue-command
@@ -631,24 +651,100 @@ deleteSMPQueues = okSMPCommands DEL
{-# INLINE deleteSMPQueues #-}
-- TODO picture
-- send PRXY :: SMPServer -> Maybe BasicAuth -> Command Sender
-- receives PKEY :: SessionId -> X.CertificateChain -> X.SignedExact X.PubKey -> BrokerMsg
createSMPProxySession :: SMPClient -> SMPServer -> Maybe BasicAuth -> ExceptT SMPClientError IO (SessionId, C.PublicKeyX25519)
createSMPProxySession _proxyClnt _relayServ _proxyAuth = undefined
createSMPProxySession :: SMPClient -> SMPServer -> Maybe BasicAuth -> ExceptT SMPClientError IO (SessionId, VersionSMP, C.PublicKeyX25519)
createSMPProxySession c relayServ@ProtocolServer {keyHash = C.KeyHash kh} proxyAuth =
sendSMPCommand c Nothing "" (PRXY relayServ proxyAuth) >>= \case
-- XXX: rfc says sessionId should be in the entityId of response
PKEY sId vr (chain, key) -> do
case supportedClientSMPRelayVRange `compatibleVersion` vr of
Nothing -> throwE PCEIncompatibleHost -- TODO different error
Just (Compatible v) -> liftEitherWith x509Error $ (sId,v,) <$> validateRelay chain key
r -> throwE . PCEUnexpectedResponse $ bshow r
where
x509Error :: String -> SMPClientError
x509Error _msg = PCEResponseError $ error "TODO: x509 error" -- TODO different error
validateRelay :: X.CertificateChain -> X.SignedExact X.PubKey -> Either String C.PublicKeyX25519
validateRelay (X.CertificateChain cert) exact = do
serverKey <- case cert of
[leaf, ca]
| XV.Fingerprint kh == XV.getFingerprint ca X.HashSHA256 ->
C.x509ToPublic (X.certPubKey . X.signedObject $ X.getSigned leaf, []) >>= C.pubKey
_ -> throwError "bad certificate"
pubKey <- C.verifyX509 serverKey exact
C.x509ToPublic (pubKey, []) >>= C.pubKey
-- consider how to process slow responses - is it handled somehow locally or delegated to the caller
-- this method is used in the client
-- sends PFWD :: C.PublicKeyX25519 -> EncTransmission -> Command Sender
-- receives PRES :: EncResponse -> BrokerMsg -- proxy to client
proxySMPMessage :: SMPClient -> SessionId -> Maybe SndPrivateAuthKey -> SenderId -> MsgFlags -> MsgBody -> ExceptT SMPClientError IO ()
proxySMPMessage _proxyClnt _relaySess _spKey _sId _flags _msg = undefined
proxySMPMessage ::
SMPClient ->
-- proxy session from PKEY
SessionId ->
VersionSMP ->
C.PublicKeyX25519 ->
-- message to deliver
Maybe SndPrivateAuthKey ->
SenderId ->
MsgFlags ->
MsgBody ->
ExceptT SMPClientError IO ()
-- TODO use version
proxySMPMessage c@ProtocolClient {thParams = proxyThParams, client_ = PClient {clientCorrId = g}} sessionId _v serverKey spKey sId flags msg = do
-- prepare params
let serverThAuth = (\ta -> ta {serverPeerPubKey = serverKey}) <$> thAuth proxyThParams
serverThParams = proxyThParams {sessionId, thAuth = serverThAuth}
(cmdPubKey, cmdPrivKey) <- liftIO . atomically $ C.generateKeyPair @'C.X25519 g
let cmdSecret = C.dh' serverKey cmdPrivKey
nonce@(C.CbNonce corrId) <- liftIO . atomically $ C.randomCbNonce g
-- encode
let TransmissionForAuth {tForAuth, tToSend} = encodeTransmissionForAuth serverThParams (CorrId corrId, sId, Cmd SSender $ SEND flags msg)
auth <- liftEitherWith PCETransportError $ authTransmission serverThAuth spKey nonce tForAuth
b <- case batchTransmissions (batch serverThParams) (blockSize serverThParams) [Right (auth, tToSend)] of
[] -> throwE $ PCETransportError TELargeMsg -- some other error. Internal?
TBError e _ : _ -> throwE $ PCETransportError e -- large message error?
TBTransmission s _ : _ -> pure s
TBTransmissions s _ _ : _ -> pure s
et <- liftEitherWith PCECryptoError $ EncTransmission <$> C.cbEncrypt cmdSecret nonce b paddedProxiedMsgLength
sendProtocolCommand_ c (Just nonce) Nothing sessionId (Cmd SProxiedClient (PFWD cmdPubKey et)) >>= \case
-- TODO support PKEY + resend?
PRES (EncResponse er) -> do
t' <- liftEitherWith PCECryptoError $ C.cbDecrypt cmdSecret (C.reverseNonce nonce) er
case tParse proxyThParams t' of
t'' :| [] -> case tDecodeParseValidate proxyThParams t'' of
(_auth, _signed, (_c, _e, r)) -> case r of -- TODO: verify
Left e -> throwE $ PCEResponseError e
Right OK -> pure ()
Right (ERR e) -> throwE $ PCEProtocolError e
Right u -> throwE . PCEUnexpectedResponse $ bshow u -- possibly differentiate unexpected response from server/proxy
_ -> throwE $ PCETransportError TEBadBlock
r -> throwE . PCEUnexpectedResponse $ bshow r -- from proxy
-- this method is used in the server
-- this method is used in the proxy
-- sends RFWD :: EncFwdTransmission -> Command Sender
-- receives RRES :: EncFwdResponse -> BrokerMsg
-- server should send PRES to the client with EncResponse
forwardSMPMessage :: SMPClient -> C.CbNonce -> C.PublicKeyX25519 -> EncTransmission -> ExceptT SMPClientError IO EncResponse
forwardSMPMessage _relayClnt _corrId _cmdKey _encTrans = undefined
-- proxy should send PRES to the client with EncResponse
forwardSMPMessage :: SMPClient -> CorrId -> C.PublicKeyX25519 -> EncTransmission -> ExceptT SMPClientError IO EncResponse
forwardSMPMessage c@ProtocolClient {thParams, client_ = PClient {clientCorrId = g}} fwdCorrId fwdKey fwdTransmission = do
-- prepare params
sessSecret <- case thAuth thParams of
Nothing -> throwError $ PCEProtocolError INTERNAL -- different error - proxy didn't pass key?
Just THAuthClient {serverPeerPubKey, clientPrivKey} -> pure $ C.dh' serverPeerPubKey clientPrivKey
nonce <- liftIO . atomically $ C.randomCbNonce g
-- wrap
let fwdT = FwdTransmission {fwdCorrId, fwdKey, fwdTransmission}
eft <- liftEitherWith PCECryptoError $ EncFwdTransmission <$> C.cbEncrypt sessSecret nonce (smpEncode fwdT) paddedForwardedMsgLength
-- send
sendProtocolCommand_ c (Just nonce) Nothing "" (Cmd SSender (RFWD eft)) >>= \case
RRES (EncFwdResponse efr) -> do
-- unwrap
r' <- liftEitherWith PCECryptoError $ C.cbDecrypt sessSecret (C.reverseNonce nonce) efr
FwdResponse {fwdCorrId = _, fwdResponse} <- liftEitherWith (const $ PCEResponseError BLOCK) $ smpDecode r'
pure fwdResponse
r -> throwE . PCEUnexpectedResponse $ bshow r
okSMPCommand :: PartyI p => Command p -> SMPClient -> C.APrivateAuthKey -> QueueId -> ExceptT SMPClientError IO ()
okSMPCommand cmd c pKey qId =
@@ -713,8 +809,11 @@ sendBatch c@ProtocolClient {client_ = PClient {sndQ}} b = do
-- | Send Protocol command
sendProtocolCommand :: forall v err msg. ProtocolEncoding v err (ProtoCommand msg) => ProtocolClient v err msg -> Maybe C.APrivateAuthKey -> EntityId -> ProtoCommand msg -> ExceptT (ProtocolClientError err) IO msg
sendProtocolCommand c@ProtocolClient {client_ = PClient {sndQ}, thParams = THandleParams {batch, blockSize}} pKey entId cmd =
ExceptT $ uncurry sendRecv =<< mkTransmission c (pKey, entId, cmd)
sendProtocolCommand c = sendProtocolCommand_ c Nothing
sendProtocolCommand_ :: forall v err msg. ProtocolEncoding v err (ProtoCommand msg) => ProtocolClient v err msg -> Maybe C.CbNonce -> Maybe C.APrivateAuthKey -> EntityId -> ProtoCommand msg -> ExceptT (ProtocolClientError err) IO msg
sendProtocolCommand_ c@ProtocolClient {client_ = PClient {sndQ}, thParams = THandleParams {batch, blockSize}} nonce_ pKey entId cmd =
ExceptT $ uncurry sendRecv =<< mkTransmission_ c nonce_ (pKey, entId, cmd)
where
-- two separate "atomically" needed to avoid blocking
sendRecv :: Either TransportError SentRawTransmission -> Request err msg -> IO (Either (ProtocolClientError err) msg)
@@ -733,33 +832,35 @@ getResponse :: ProtocolClient v err msg -> Request err msg -> IO (Response err m
getResponse ProtocolClient {client_ = PClient {tcpTimeout, pingErrorCount}} Request {entityId, responseVar} = do
response <-
timeout tcpTimeout (atomically (takeTMVar responseVar)) >>= \case
-- BTW: another registerDelay candidate. Also, crashes caller with BlockedIndef.
Just r -> atomically (writeTVar pingErrorCount 0) $> r
Nothing -> pure $ Left PCEResponseTimeout
pure Response {entityId, response}
mkTransmission :: forall v err msg. ProtocolEncoding v err (ProtoCommand msg) => ProtocolClient v err msg -> ClientCommand msg -> IO (PCTransmission err msg)
mkTransmission ProtocolClient {thParams, client_ = PClient {clientCorrId, sentCommands}} (pKey_, entId, cmd) = do
corrId <- atomically getNextCorrId
let TransmissionForAuth {tForAuth, tToSend} = encodeTransmissionForAuth thParams (corrId, entId, cmd)
auth = authTransmission (thAuth thParams) pKey_ corrId tForAuth
r <- atomically $ mkRequest corrId
mkTransmission :: ProtocolEncoding v err (ProtoCommand msg) => ProtocolClient v err msg -> ClientCommand msg -> IO (PCTransmission err msg)
mkTransmission c = mkTransmission_ c Nothing
mkTransmission_ :: forall v err msg. ProtocolEncoding v err (ProtoCommand msg) => ProtocolClient v err msg -> Maybe C.CbNonce -> ClientCommand msg -> IO (PCTransmission err msg)
mkTransmission_ ProtocolClient {thParams, client_ = PClient {clientCorrId, sentCommands}} nonce_ (pKey_, entId, cmd) = do
nonce@(C.CbNonce corrId) <- maybe (atomically $ C.randomCbNonce clientCorrId) pure nonce_
let TransmissionForAuth {tForAuth, tToSend} = encodeTransmissionForAuth thParams (CorrId corrId, entId, cmd)
auth = authTransmission (thAuth thParams) pKey_ nonce tForAuth
r <- atomically $ mkRequest (CorrId corrId)
pure ((,tToSend) <$> auth, r)
where
getNextCorrId :: STM CorrId
getNextCorrId = CorrId <$> C.randomBytes 24 clientCorrId -- also used as nonce
mkRequest :: CorrId -> STM (Request err msg)
mkRequest corrId = do
r <- Request entId <$> newEmptyTMVar
TM.insert corrId r sentCommands
pure r
authTransmission :: Maybe (THandleAuth 'TClient) -> Maybe C.APrivateAuthKey -> CorrId -> ByteString -> Either TransportError (Maybe TransmissionAuth)
authTransmission thAuth pKey_ (CorrId corrId) t = traverse authenticate pKey_
authTransmission :: Maybe (THandleAuth 'TClient) -> Maybe C.APrivateAuthKey -> C.CbNonce -> ByteString -> Either TransportError (Maybe TransmissionAuth)
authTransmission thAuth pKey_ nonce t = traverse authenticate pKey_
where
authenticate :: C.APrivateAuthKey -> Either TransportError TransmissionAuth
authenticate (C.APrivateAuthKey a pk) = case a of
C.SX25519 -> case thAuth of
Just THAuthClient {serverPeerPubKey = k} -> Right $ TAAuthenticator $ C.cbAuthenticate k pk (C.cbNonce corrId) t
Just THAuthClient {serverPeerPubKey = k} -> Right $ TAAuthenticator $ C.cbAuthenticate k pk nonce t
Nothing -> Left TENoServerAuth
C.SEd25519 -> sign pk
C.SEd448 -> sign pk
+16 -6
View File
@@ -98,6 +98,8 @@ data SMPClientAgent = SMPClientAgent
agentQ :: TBQueue SMPClientAgentEvent,
randomDrg :: TVar ChaChaDRG,
smpClients :: TMap SMPServer SMPClientVar,
smpSessions :: TMap SessionId SMPClient,
-- TODO add lookup by session ID
srvSubs :: TMap SMPServer (TMap SMPSub C.APrivateAuthKey),
pendingSrvSubs :: TMap SMPServer (TMap SMPSub C.APrivateAuthKey),
reconnections :: TVar [Async ()],
@@ -135,6 +137,7 @@ newSMPClientAgent agentCfg@SMPClientAgentConfig {msgQSize, agentQSize} randomDrg
msgQ <- newTBQueue msgQSize
agentQ <- newTBQueue agentQSize
smpClients <- TM.empty
smpSessions <- TM.empty
srvSubs <- TM.empty
pendingSrvSubs <- TM.empty
reconnections <- newTVar []
@@ -147,6 +150,7 @@ newSMPClientAgent agentCfg@SMPClientAgentConfig {msgQSize, agentQSize} randomDrg
agentQ,
randomDrg,
smpClients,
smpSessions,
srvSubs,
pendingSrvSubs,
reconnections,
@@ -155,7 +159,7 @@ newSMPClientAgent agentCfg@SMPClientAgentConfig {msgQSize, agentQSize} randomDrg
}
getSMPServerClient' :: SMPClientAgent -> SMPServer -> ExceptT SMPClientError IO SMPClient
getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, msgQ, randomDrg, workerSeq} srv =
getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, msgQ, randomDrg, workerSeq} srv =
atomically getClientVar >>= either newSMPClient waitForSMPClient
where
getClientVar :: STM (Either SMPClientVar SMPClientVar)
@@ -178,7 +182,9 @@ getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, msgQ, randomDrg, wo
tryE (connectClient v) >>= \r -> case r of
Right smp -> do
logInfo . decodeUtf8 $ "Agent connected to " <> showServer srv
atomically $ putTMVar (sessionVar v) r
atomically $ do
putTMVar (sessionVar v) r
TM.insert (sessionId $ thParams smp) smp smpSessions
successAction smp
Left e -> do
if e == PCENetworkError || e == PCEResponseTimeout
@@ -200,13 +206,14 @@ getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, msgQ, randomDrg, wo
connectClient v = ExceptT $ getProtocolClient randomDrg (1, srv, Nothing) (smpCfg agentCfg) (Just msgQ) (clientDisconnected v)
clientDisconnected :: SMPClientVar -> SMPClient -> IO ()
clientDisconnected v _ = do
removeClientAndSubs v >>= (`forM_` serverDown)
clientDisconnected v smp = do
removeClientAndSubs v smp >>= (`forM_` serverDown)
logInfo . decodeUtf8 $ "Agent disconnected from " <> showServer srv
removeClientAndSubs :: SMPClientVar -> IO (Maybe (Map SMPSub C.APrivateAuthKey))
removeClientAndSubs v = atomically $ do
removeClientAndSubs :: SMPClientVar -> SMPClient -> IO (Maybe (Map SMPSub C.APrivateAuthKey))
removeClientAndSubs v smp = atomically $ do
removeSessVar v srv smpClients
TM.delete (sessionId $ thParams smp) smpSessions
TM.lookupDelete srv (srvSubs ca) >>= mapM updateSubs
where
updateSubs sVar = do
@@ -271,6 +278,9 @@ getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, msgQ, randomDrg, wo
notify :: SMPClientAgentEvent -> IO ()
notify evt = atomically $ writeTBQueue (agentQ ca) evt
lookupSMPServerClient :: SMPClientAgent -> SessionId -> STM (Maybe SMPClient)
lookupSMPServerClient SMPClientAgent {smpSessions} sessId = TM.lookup sessId smpSessions
closeSMPClientAgent :: SMPClientAgent -> IO ()
closeSMPClientAgent c = do
closeSMPServerClients c
+4
View File
@@ -141,6 +141,7 @@ module Simplex.Messaging.Crypto
sbEncrypt_,
cbNonce,
randomCbNonce,
reverseNonce,
-- * NaCl crypto_secretbox
SbKey (unSbKey),
@@ -1292,6 +1293,9 @@ randomCbNonce = fmap CryptoBoxNonce . randomBytes 24
randomBytes :: Int -> TVar ChaChaDRG -> STM ByteString
randomBytes n gVar = stateTVar gVar $ randomBytesGenerate n
reverseNonce :: CbNonce -> CbNonce
reverseNonce (CryptoBoxNonce s) = CryptoBoxNonce (B.reverse s)
instance Encoding CbNonce where
smpEncode = unCbNonce
smpP = CryptoBoxNonce <$> A.take 24
+139 -36
View File
@@ -43,6 +43,8 @@ module Simplex.Messaging.Protocol
( -- * SMP protocol parameters
supportedSMPClientVRange,
maxMessageLength,
paddedProxiedMsgLength,
paddedForwardedMsgLength,
e2eEncConfirmationLength,
e2eEncMessageLength,
@@ -56,6 +58,7 @@ module Simplex.Messaging.Protocol
SubscriptionMode (..),
Party (..),
Cmd (..),
DirectParty,
BrokerMsg (..),
SParty (..),
PartyI (..),
@@ -63,6 +66,7 @@ module Simplex.Messaging.Protocol
ProtocolErrorType (..),
ErrorType (..),
CommandError (..),
ProxyError (..),
Transmission,
TransmissionAuth (..),
SignedTransmission,
@@ -121,6 +125,12 @@ module Simplex.Messaging.Protocol
EncNMsgMeta,
SMPMsgMeta (..),
NMsgMeta (..),
EncFwdResponse (..),
EncFwdTransmission (..),
EncResponse (..),
EncTransmission (..),
FwdResponse (..),
FwdTransmission (..),
MsgFlags (..),
initialSMPClientVersion,
userProtocol,
@@ -191,6 +201,7 @@ import Data.Word (Word16)
import qualified Data.X509 as X
import GHC.TypeLits (ErrorMessage (..), TypeError, type (+))
import qualified GHC.TypeLits as TE
import qualified GHC.TypeLits as Type
import Network.Socket (ServiceName)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding
@@ -233,6 +244,12 @@ supportedSMPClientVRange = mkVersionRange initialSMPClientVersion currentSMPClie
maxMessageLength :: Int
maxMessageLength = 16088
paddedProxiedMsgLength :: Int
paddedProxiedMsgLength = 16388
paddedForwardedMsgLength :: Int
paddedForwardedMsgLength = 16688
type MaxMessageLen = 16088
-- 16 extra bytes: 8 for timestamp and 8 for flags (7 flags and the space, only 1 flag is currently used)
@@ -246,7 +263,7 @@ e2eEncMessageLength :: Int
e2eEncMessageLength = 16032
-- | SMP protocol clients
data Party = Recipient | Sender | Notifier
data Party = Recipient | Sender | Notifier | ProxiedClient
deriving (Show)
-- | Singleton types for SMP protocol clients
@@ -254,11 +271,13 @@ data SParty :: Party -> Type where
SRecipient :: SParty Recipient
SSender :: SParty Sender
SNotifier :: SParty Notifier
SProxiedClient :: SParty ProxiedClient
instance TestEquality SParty where
testEquality SRecipient SRecipient = Just Refl
testEquality SSender SSender = Just Refl
testEquality SNotifier SNotifier = Just Refl
testEquality SProxiedClient SProxiedClient = Just Refl
testEquality _ _ = Nothing
deriving instance Show (SParty p)
@@ -271,6 +290,15 @@ instance PartyI Sender where sParty = SSender
instance PartyI Notifier where sParty = SNotifier
instance PartyI ProxiedClient where sParty = SProxiedClient
type family DirectParty (p :: Party) :: Constraint where
DirectParty Recipient = ()
DirectParty Sender = ()
DirectParty Notifier = ()
DirectParty p =
(Int ~ Bool, TypeError (Type.Text "Party " :<>: ShowType p :<>: Type.Text " is not direct"))
-- | Type for client command of any participant.
data Cmd = forall p. PartyI p => Cmd (SParty p) (Command p)
@@ -361,13 +389,13 @@ data Command (p :: Party) where
PING :: Command Sender
-- SMP notification subscriber commands
NSUB :: Command Notifier
PRXY :: SMPServer -> Maybe BasicAuth -> Command Sender -- request a relay server connection by URI
PRXY :: SMPServer -> Maybe BasicAuth -> Command ProxiedClient -- request a relay server connection by URI
-- Transmission to proxy:
-- - entity ID: ID of the session with relay returned in PKEY (response to PRXY)
-- - corrId: also used as a nonce to encrypt transmission to relay, corrId + 1 - from relay
-- - key (1st param in the command) is used to agree DH secret for this particular transmission and its response
-- Encrypted transmission should include session ID (tlsunique) from proxy-relay connection.
PFWD :: C.PublicKeyX25519 -> EncTransmission -> Command Sender -- use CorrId as CbNonce, client to proxy
PFWD :: C.PublicKeyX25519 -> EncTransmission -> Command ProxiedClient -- use CorrId as CbNonce, client to proxy
-- Transmission forwarded to relay:
-- - entity ID: empty
-- - corrId: unique correlation ID between proxy and relay, also used as a nonce to encrypt forwarded transmission
@@ -401,11 +429,18 @@ newtype EncTransmission = EncTransmission ByteString
deriving (Show)
data FwdTransmission = FwdTransmission
{ fwdCorrId :: ByteString,
{ fwdCorrId :: CorrId,
fwdKey :: C.PublicKeyX25519,
fwdTransmission :: EncTransmission
}
instance Encoding FwdTransmission where
smpEncode FwdTransmission {fwdCorrId = CorrId corrId, fwdKey, fwdTransmission = EncTransmission t} =
smpEncode (corrId, fwdKey, Tail t)
smpP = do
(corrId, fwdKey, Tail t) <- smpP
pure FwdTransmission {fwdCorrId = CorrId corrId, fwdKey, fwdTransmission = EncTransmission t}
newtype EncFwdTransmission = EncFwdTransmission ByteString
deriving (Show)
@@ -419,7 +454,7 @@ data BrokerMsg where
NID :: NotifierId -> RcvNtfPublicDhKey -> BrokerMsg
NMSG :: C.CbNonce -> EncNMsgMeta -> BrokerMsg
-- Should include certificate chain
PKEY :: SessionId -> X.CertificateChain -> X.SignedExact X.PubKey -> BrokerMsg -- TLS-signed server key for proxy shared secret and initial sender key
PKEY :: SessionId -> VersionRangeSMP -> (X.CertificateChain, X.SignedExact X.PubKey) -> BrokerMsg -- TLS-signed server key for proxy shared secret and initial sender key
RRES :: EncFwdResponse -> BrokerMsg -- relay to proxy
PRES :: EncResponse -> BrokerMsg -- proxy to client
END :: BrokerMsg
@@ -438,10 +473,17 @@ newtype EncFwdResponse = EncFwdResponse ByteString
deriving (Eq, Show)
data FwdResponse = FwdResponse
{ fwdCorrId :: ByteString,
fwdResponse :: ByteString
{ fwdCorrId :: CorrId,
fwdResponse :: EncResponse
}
instance Encoding FwdResponse where
smpEncode FwdResponse {fwdCorrId = CorrId corrId, fwdResponse = EncResponse t} =
smpEncode (corrId, Tail t)
smpP = do
(corrId, Tail t) <- smpP
pure FwdResponse {fwdCorrId = CorrId corrId, fwdResponse = EncResponse t}
newtype EncResponse = EncResponse ByteString
deriving (Eq, Show)
@@ -607,8 +649,8 @@ data CommandTag (p :: Party) where
DEL_ :: CommandTag Recipient
SEND_ :: CommandTag Sender
PING_ :: CommandTag Sender
PRXY_ :: CommandTag Sender
PFWD_ :: CommandTag Sender
PRXY_ :: CommandTag ProxiedClient
PFWD_ :: CommandTag ProxiedClient
RFWD_ :: CommandTag Sender
NSUB_ :: CommandTag Notifier
@@ -672,8 +714,8 @@ instance ProtocolMsgTag CmdTag where
"DEL" -> Just $ CT SRecipient DEL_
"SEND" -> Just $ CT SSender SEND_
"PING" -> Just $ CT SSender PING_
"PRXY" -> Just $ CT SSender PRXY_
"PFWD" -> Just $ CT SSender PFWD_
"PRXY" -> Just $ CT SProxiedClient PRXY_
"PFWD" -> Just $ CT SProxiedClient PFWD_
"RFWD" -> Just $ CT SSender RFWD_
"NSUB" -> Just $ CT SNotifier NSUB_
_ -> Nothing
@@ -1096,6 +1138,8 @@ data ErrorType
SESSION
| -- | SMP command is unknown or has invalid syntax
CMD {cmdErr :: CommandError}
| -- | error from proxied relay
PROXY {proxyErr :: ProxyError}
| -- | command authorization error - bad signature or non-existing SMP queue
AUTH
| -- | SMP queue capacity is exceeded on the server
@@ -1115,8 +1159,12 @@ data ErrorType
instance StrEncoding ErrorType where
strEncode = \case
CMD e -> "CMD " <> bshow e
PROXY e -> "PROXY " <> bshow e
e -> bshow e
strP = "CMD " *> (CMD <$> parseRead1) <|> parseRead1
strP =
"CMD " *> (CMD <$> parseRead1)
<|> "PROXY " *> (PROXY <$> parseRead1)
<|> parseRead1
-- | SMP command error type.
data CommandError
@@ -1134,6 +1182,22 @@ data CommandError
NO_ENTITY
deriving (Eq, Read, Show)
-- TODO keep error params
data ProxyError
= -- | Correctly parsed SMP server ERR response.
-- This error is forwarded to the agent client as `ERR SMP err`.
PROTOCOL -- {protocolErr :: String}
| -- | Invalid server response that failed to parse.
-- Forwarded to the agent client as `ERR BROKER RESPONSE`.
RESPONSE -- {responseErr :: String}
| UNEXPECTED
| TIMEOUT
| NETWORK
| BAD_HOST
| NO_SESSION
| TRANSPORT -- {transportErr :: TransportError}
deriving (Eq, Read, Show)
-- | SMP transmission parser.
transmissionP :: THandleParams v p -> Parser RawTransmission
transmissionP THandleParams {sessionId, implySessId} = do
@@ -1195,9 +1259,9 @@ instance PartyI p => ProtocolEncoding SMPVersion ErrorType (Command p) where
SEND flags msg -> e (SEND_, ' ', flags, ' ', Tail msg)
PING -> e PING_
NSUB -> e NSUB_
PRXY host auth_ -> e (PRXY_, ' ', strEncode host, ' ', auth_)
PFWD {} -> error "TODO: e (PFWD_,,)"
RFWD {} -> error "TODO: e (RFWD_,,)"
PRXY host auth_ -> e (PRXY_, ' ', host, auth_)
PFWD pubKey (EncTransmission s) -> e (PFWD_, ' ', pubKey, Tail s)
RFWD (EncFwdTransmission s) -> e (RFWD_, ' ', Tail s)
where
e :: Encoding a => a -> ByteString
e = smpEncode
@@ -1207,27 +1271,33 @@ instance PartyI p => ProtocolEncoding SMPVersion ErrorType (Command p) where
fromProtocolError = fromProtocolError @SMPVersion @ErrorType @BrokerMsg
{-# INLINE fromProtocolError #-}
checkCredentials (auth, _, queueId, _) cmd = case cmd of
checkCredentials (auth, _, entId, _) cmd = case cmd of
-- NEW must have signature but NOT queue ID
NEW {}
| isNothing auth -> Left $ CMD NO_AUTH
| not (B.null queueId) -> Left $ CMD HAS_AUTH
| not (B.null entId) -> Left $ CMD HAS_AUTH
| otherwise -> Right cmd
-- SEND must have queue ID, signature is not always required
SEND {}
| B.null queueId -> Left $ CMD NO_ENTITY
| B.null entId -> Left $ CMD NO_ENTITY
| otherwise -> Right cmd
-- PING must not have queue ID or signature
PING
| isNothing auth && B.null queueId -> Right cmd
| otherwise -> Left $ CMD HAS_AUTH
PRXY {}
| isNothing auth && B.null queueId -> Right cmd
PING -> noAuthCmd
PRXY {} -> noAuthCmd
PFWD {}
| B.null entId -> Left $ CMD NO_ENTITY
| isNothing auth -> Right cmd
| otherwise -> Left $ CMD HAS_AUTH
RFWD _ -> noAuthCmd
-- other client commands must have both signature and queue ID
_
| isNothing auth || B.null queueId -> Left $ CMD NO_AUTH
| isNothing auth || B.null entId -> Left $ CMD NO_AUTH
| otherwise -> Right cmd
where
-- command must not have entity ID (queue or session ID) or signature
noAuthCmd :: Either ErrorType (Command p)
noAuthCmd
| isNothing auth && B.null entId = Right cmd
| otherwise = Left $ CMD HAS_AUTH
instance ProtocolEncoding SMPVersion ErrorType Cmd where
type Tag Cmd = CmdTag
@@ -1255,9 +1325,11 @@ instance ProtocolEncoding SMPVersion ErrorType Cmd where
Cmd SSender <$> case tag of
SEND_ -> SEND <$> _smpP <*> (unTail <$> _smpP)
PING_ -> pure PING
PFWD_ -> error "TODO: PFWD_"
RFWD_ -> error "TODO: RFWD_"
PRXY_ -> PRXY <$> (_smpP >>= either fail pure . strDecode) <*> _smpP
RFWD_ -> RFWD <$> (EncFwdTransmission . unTail <$> _smpP)
CT SProxiedClient tag ->
Cmd SProxiedClient <$> case tag of
PFWD_ -> PFWD <$> _smpP <*> (EncTransmission . unTail <$> smpP)
PRXY_ -> PRXY <$> _smpP <*> smpP
CT SNotifier NSUB_ -> pure $ Cmd SNotifier NSUB
fromProtocolError = fromProtocolError @SMPVersion @ErrorType @BrokerMsg
@@ -1273,7 +1345,7 @@ instance ProtocolEncoding SMPVersion ErrorType BrokerMsg where
e (MSG_, ' ', msgId, Tail body)
NID nId srvNtfDh -> e (NID_, ' ', nId, srvNtfDh)
NMSG nmsgNonce encNMsgMeta -> e (NMSG_, ' ', nmsgNonce, encNMsgMeta)
PKEY cert key -> e (PKEY_, ' ', C.encodeCertChain cert, C.SignedObject key)
PKEY sid vr (cert, key) -> e (PKEY_, ' ', sid, vr, C.encodeCertChain cert, C.SignedObject key)
RRES (EncFwdResponse encBlock) -> e (RRES_, ' ', Tail encBlock)
PRES (EncResponse encBlock) -> e (PRES_, ' ', Tail encBlock)
END -> e END_
@@ -1293,7 +1365,7 @@ instance ProtocolEncoding SMPVersion ErrorType BrokerMsg where
IDS_ -> IDS <$> (QIK <$> _smpP <*> smpP <*> smpP)
NID_ -> NID <$> _smpP <*> smpP
NMSG_ -> NMSG <$> _smpP <*> smpP
PKEY_ -> PKEY <$> (A.space *> C.certChainP) <*> (C.getSignedExact <$> smpP)
PKEY_ -> PKEY <$> _smpP <*> smpP <*> ((,) <$> C.certChainP <*> (C.getSignedExact <$> smpP))
RRES_ -> RRES <$> (EncFwdResponse . unTail <$> _smpP)
PRES_ -> PRES <$> (EncResponse . unTail <$> _smpP)
END_ -> pure END
@@ -1308,19 +1380,24 @@ instance ProtocolEncoding SMPVersion ErrorType BrokerMsg where
PEBlock -> BLOCK
{-# INLINE fromProtocolError #-}
checkCredentials (_, _, queueId, _) cmd = case cmd of
checkCredentials (_, _, entId, _) cmd = case cmd of
-- IDS response should not have queue ID
IDS _ -> Right cmd
-- ERR response does not always have queue ID
ERR _ -> Right cmd
-- PONG response must not have queue ID
PONG
| B.null queueId -> Right cmd
| otherwise -> Left $ CMD HAS_AUTH
PONG -> noEntityMsg
PKEY {} -> noEntityMsg
RRES _ -> noEntityMsg
-- other broker responses must have queue ID
_
| B.null queueId -> Left $ CMD NO_ENTITY
| B.null entId -> Left $ CMD NO_ENTITY
| otherwise -> Right cmd
where
noEntityMsg :: Either ErrorType BrokerMsg
noEntityMsg
| B.null entId = Right cmd
| otherwise = Left $ CMD HAS_AUTH
-- | Parse SMP protocol commands and broker messages
parseProtocol :: forall v err msg. ProtocolEncoding v err msg => Version v -> ByteString -> Either err msg
@@ -1343,6 +1420,7 @@ instance Encoding ErrorType where
BLOCK -> "BLOCK"
SESSION -> "SESSION"
CMD err -> "CMD " <> smpEncode err
PROXY err -> "PROXY " <> smpEncode err
AUTH -> "AUTH"
QUOTA -> "QUOTA"
EXPIRED -> "EXPIRED"
@@ -1356,6 +1434,7 @@ instance Encoding ErrorType where
"BLOCK" -> pure BLOCK
"SESSION" -> pure SESSION
"CMD" -> CMD <$> _smpP
"PROXY" -> PROXY <$> _smpP
"AUTH" -> pure AUTH
"QUOTA" -> pure QUOTA
"EXPIRED" -> pure EXPIRED
@@ -1381,7 +1460,29 @@ instance Encoding CommandError where
"NO_AUTH" -> pure NO_AUTH
"HAS_AUTH" -> pure HAS_AUTH
"NO_ENTITY" -> pure NO_ENTITY
"NO_QUEUE" -> pure NO_ENTITY
"NO_QUEUE" -> pure NO_ENTITY -- for backward compatibility
_ -> fail "bad command error type"
instance Encoding ProxyError where
smpEncode e = case e of
PROTOCOL -> "PROTOCOL"
RESPONSE -> "RESPONSE"
UNEXPECTED -> "UNEXPECTED"
TIMEOUT -> "TIMEOUT"
NETWORK -> "NETWORK"
BAD_HOST -> "BAD_HOST"
NO_SESSION -> "NO_SESSION"
TRANSPORT -> "TRANSPORT"
smpP =
A.takeTill (== ' ') >>= \case
"PROTOCOL" -> pure PROTOCOL
"RESPONSE" -> pure RESPONSE
"UNEXPECTED" -> pure UNEXPECTED
"TIMEOUT" -> pure TIMEOUT
"NETWORK" -> pure NETWORK
"BAD_HOST" -> pure BAD_HOST
"NO_SESSION" -> pure NO_SESSION
"TRANSPORT" -> pure TRANSPORT
_ -> fail "bad command error type"
-- | Send signed SMP transmission to TCP transport.
@@ -1521,4 +1622,6 @@ $(J.deriveJSON defaultJSON ''MsgFlags)
$(J.deriveJSON (sumTypeJSON id) ''CommandError)
$(J.deriveJSON (sumTypeJSON id) ''ProxyError)
$(J.deriveJSON (sumTypeJSON id) ''ErrorType)
+132 -67
View File
@@ -13,7 +13,6 @@
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
-- |
-- Module : Simplex.Messaging.Server
@@ -43,6 +42,7 @@ import Control.Monad
import Control.Monad.Except
import Control.Monad.IO.Unlift
import Control.Monad.Reader
import Control.Monad.Trans.Except
import Crypto.Random
import Data.Bifunctor (first)
import Data.ByteString.Base64 (encode)
@@ -54,6 +54,7 @@ import Data.Functor (($>))
import Data.Int (Int64)
import qualified Data.IntMap.Strict as IM
import Data.List (intercalate)
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as L
import qualified Data.Map.Strict as M
import Data.Maybe (isNothing)
@@ -67,8 +68,10 @@ import GHC.Stats (getRTSStats)
import GHC.TypeLits (KnownNat)
import Network.Socket (ServiceName, Socket, socketToHandle)
import Simplex.Messaging.Agent.Lock
import Simplex.Messaging.Client (ProtocolClient (thParams), forwardSMPMessage, smpProxyError)
import Simplex.Messaging.Client.Agent (SMPClientAgent (..), SMPClientAgentEvent (..), getSMPServerClient', lookupSMPServerClient)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding (Encoding (smpEncode))
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.Control
@@ -90,6 +93,7 @@ import System.Exit (exitFailure)
import System.IO (hPrint, hPutStrLn, hSetNewlineMode, universalNewlineMode)
import System.Mem.Weak (deRefWeak)
import UnliftIO (timeout)
import UnliftIO.Async (mapConcurrently)
import UnliftIO.Concurrent
import UnliftIO.Directory (doesFileExist, renameFile)
import UnliftIO.Exception
@@ -122,11 +126,13 @@ type M a = ReaderT Env IO a
smpServer :: TMVar Bool -> ServerConfig -> M ()
smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
s <- asks server
pa <- asks proxyAgent
expired <- restoreServerMessages
restoreServerStats expired
raceAny_
( serverThread s "server subscribedQ" subscribedQ subscribers subscriptions cancelSub
: serverThread s "server ntfSubscribedQ" ntfSubscribedQ Env.notifiers ntfSubscriptions (\_ -> pure ())
: receiveFromProxyAgent pa
: map runServer transports <> expireMessagesThread_ cfg <> serverStatsThread_ cfg <> controlPortThread_ cfg
)
`finally` withLock' (savingLock s) "final" (saveServer False)
@@ -179,6 +185,19 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
mkWeakThreadId t >>= atomically . modifyTVar' (endThreads c) . IM.insert tId
atomically $ TM.lookupDelete qId (clientSubs c)
receiveFromProxyAgent :: ProxyAgent -> M ()
receiveFromProxyAgent ProxyAgent {smpAgent = SMPClientAgent {agentQ}} =
forever $
atomically (readTBQueue agentQ) >>= \case
CAConnected srv -> logInfo $ "SMP server connected " <> showServer' srv
CADisconnected srv [] -> logInfo $ "SMP server disconnected " <> showServer' srv
CADisconnected srv subs -> logError $ "SMP server disconnected " <> showServer' srv <> " / subscriptions: " <> tshow (length subs)
CAReconnected srv -> logInfo $ "SMP server reconnected " <> showServer' srv
CAResubscribed srv subs -> logError $ "SMP server resubscribed " <> showServer' srv <> " / subscriptions: " <> tshow (length subs)
CASubError srv errs -> logError $ "SMP server subscription errors " <> showServer' srv <> " / errors: " <> tshow (length errs)
where
showServer' = decodeLatin1 . strEncode . host
expireMessagesThread_ :: ServerConfig -> [M ()]
expireMessagesThread_ ServerConfig {messageExpiration = Just msgExp} = [expireMessages msgExp]
expireMessagesThread_ _ = []
@@ -314,7 +333,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
CPResume -> withAdminRole $ hPutStrLn h "resume not implemented"
CPClients -> withAdminRole $ do
active <- unliftIO u (asks clients) >>= readTVarIO
hPutStrLn h $ "clientId,sessionId,connected,createdAt,rcvActiveAt,sndActiveAt,age,subscriptions"
hPutStrLn h "clientId,sessionId,connected,createdAt,rcvActiveAt,sndActiveAt,age,subscriptions"
forM_ (IM.toList active) $ \(cid, Client {sessionId, connected, createdAt, rcvActiveAt, sndActiveAt, subscriptions}) -> do
connected' <- bshow <$> readTVarIO connected
rcvActiveAt' <- strEncode <$> readTVarIO rcvActiveAt
@@ -410,7 +429,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
hPutStrLn h "AUTH"
runClientTransport :: Transport c => THandleSMP c 'TServer -> M ()
runClientTransport th@THandle {params = THandleParams {thVersion, sessionId}} = do
runClientTransport th@THandle {params = thParams@THandleParams {thVersion, sessionId}} = do
q <- asks $ tbqSize . config
ts <- liftIO getSystemTime
active <- asks clients
@@ -422,7 +441,7 @@ runClientTransport th@THandle {params = THandleParams {thVersion, sessionId}} =
s <- asks server
expCfg <- asks $ inactiveClientExpiration . config
labelMyThread . B.unpack $ "client $" <> encode sessionId
raceAny_ ([liftIO $ send th c, client c s, receive th c] <> disconnectThread_ c expCfg)
raceAny_ ([liftIO $ send th c, client thParams c s, receive th c] <> disconnectThread_ c expCfg)
`finally` clientDisconnected c
where
disconnectThread_ c (Just expCfg) = [liftIO $ disconnectTransport th (rcvActiveAt c) (sndActiveAt c) expCfg (noSubscriptions c)]
@@ -463,19 +482,19 @@ receive th@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActi
forever $ do
ts <- L.toList <$> liftIO (tGet th)
atomically . writeTVar rcvActiveAt =<< liftIO getSystemTime
as <- partitionEithers <$> mapM cmdAction ts
write sndQ $ fst as
write rcvQ $ snd as
(errs, cmds) <- partitionEithers <$> mapM cmdAction ts
write sndQ errs
write rcvQ cmds
where
cmdAction :: SignedTransmission ErrorType Cmd -> M (Either (Transmission BrokerMsg) (Maybe QueueRec, Transmission Cmd))
cmdAction (tAuth, authorized, (corrId, queueId, cmdOrError)) =
cmdAction (tAuth, authorized, (corrId, entId, cmdOrError)) =
case cmdOrError of
Left e -> pure $ Left (corrId, queueId, ERR e)
Right cmd -> verified <$> verifyTransmission ((,C.cbNonce (bs corrId)) <$> thAuth) tAuth authorized queueId cmd
Left e -> pure $ Left (corrId, entId, ERR e)
Right cmd -> verified <$> verifyTransmission ((,C.cbNonce (bs corrId)) <$> thAuth) tAuth authorized entId cmd
where
verified = \case
VRVerified qr -> Right (qr, (corrId, queueId, cmd))
VRFailed -> Left (corrId, queueId, ERR AUTH)
VRVerified qr -> Right (qr, (corrId, entId, cmd))
VRFailed -> Left (corrId, entId, ERR AUTH)
write q = mapM_ (atomically . writeTBQueue q) . L.nonEmpty
send :: Transport c => THandleSMP c 'TServer -> Client -> IO ()
@@ -522,19 +541,18 @@ verifyTransmission auth_ tAuth authorized queueId cmd =
-- SEND will be accepted without authorization before the queue is secured with KEY command
Cmd SSender SEND {} -> verifyQueue (\q -> Just q `verified` maybe (isNothing tAuth) verify (senderKey q)) <$> get SSender
Cmd SSender PING -> pure $ VRVerified Nothing
-- NSUB will not be accepted without authorization
Cmd SNotifier NSUB -> verifyQueue (\q -> maybe dummyVerify (Just q `verifiedWith`) (notifierKey <$> notifier q)) <$> get SNotifier
Cmd SSender PRXY {} -> pure $ VRVerified Nothing
Cmd SSender PFWD {} -> pure $ VRVerified Nothing
Cmd SSender RFWD {} -> pure $ VRVerified Nothing
-- NSUB will not be accepted without authorization
Cmd SNotifier NSUB -> verifyQueue (\q -> maybe dummyVerify (\n -> Just q `verifiedWith` notifierKey n) (notifier q)) <$> get SNotifier
Cmd SProxiedClient _ -> pure $ VRVerified Nothing
where
verify = verifyCmdAuthorization auth_ tAuth authorized
dummyVerify = verify (dummyAuthKey tAuth) `seq` VRFailed
verifyQueue :: (QueueRec -> VerificationResult) -> Either ErrorType QueueRec -> VerificationResult
verifyQueue = either (\_ -> dummyVerify)
verifyQueue = either (const dummyVerify)
verified q cond = if cond then VRVerified q else VRFailed
verifiedWith q k = q `verified` verify k
get :: SParty p -> M (Either ErrorType QueueRec)
get :: DirectParty p => SParty p -> M (Either ErrorType QueueRec)
get party = do
st <- asks queueStore
atomically $ getQueue st party queueId
@@ -584,35 +602,55 @@ dummyKeyEd448 = "MEMwBQYDK2VxAzoA6ibQc9XpkSLtwrf7PLvp81qW/etiumckVFImCMRdftcG/Xo
dummyKeyX25519 :: C.PublicKey 'C.X25519
dummyKeyX25519 = "MCowBQYDK2VuAyEA4JGSMYht18H4mas/jHeBwfcM7jLwNYJNOAhi2/g4RXg="
client :: Client -> Server -> M ()
client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Server {subscribedQ, ntfSubscribedQ, notifiers} = do
client :: THandleParams SMPVersion 'TServer -> Client -> Server -> M ()
client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Server {subscribedQ, ntfSubscribedQ, notifiers} = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands"
forever $ atomically (readTBQueue rcvQ) >>= mapM processCommand
forever $ do
(cmds, rs) <- partitionEithers . L.toList <$> (mapM processCommand =<< atomically (readTBQueue rcvQ))
mapM_ reply (L.nonEmpty rs)
-- TODO cancel this thread if the client gets disconnected
-- TODO limit client concurrency
forkIO $ mapM_ reply . L.nonEmpty =<< mapConcurrently processProxiedCmd cmds
where
reply :: Transmission BrokerMsg -> IO ()
reply :: MonadIO m => NonEmpty (Transmission BrokerMsg) -> m ()
reply = atomically . writeTBQueue sndQ
processCommand :: (Maybe QueueRec, Transmission Cmd) -> M ()
processProxiedCmd :: Transmission (Command 'ProxiedClient) -> M (Transmission BrokerMsg)
processProxiedCmd (corrId, sessId, command) = (corrId, sessId,) <$> case command of
PRXY srv auth -> ifM allowProxy getRelay (pure $ ERR AUTH)
where
allowProxy = do
ServerConfig {allowSMPProxy, newQueueBasicAuth} <- asks config
pure $ allowSMPProxy && maybe True ((== auth) . Just) newQueueBasicAuth
getRelay = do
ProxyAgent {smpAgent} <- asks proxyAgent
-- TODO catch IO errors too
liftIO $ proxyResp <$> runExceptT (getSMPServerClient' smpAgent srv)
where
proxyResp = \case
Right smp ->
let THandleParams {sessionId = srvSessId, thAuth} = thParams smp
vr = supportedServerSMPRelayVRange
in case thAuth of
Just THAuthClient {serverCertKey} -> PKEY srvSessId vr serverCertKey
Nothing -> ERR $ PROXY TRANSPORT -- TODO different error?
Left err -> ERR $ smpProxyError err
PFWD pubKey encBlock -> do
ProxyAgent {smpAgent} <- asks proxyAgent
atomically (lookupSMPServerClient smpAgent sessId) >>= \case
Just smp -> liftIO $ either (ERR . smpProxyError) PRES <$> runExceptT (forwardSMPMessage smp corrId pubKey encBlock)
Nothing -> pure $ ERR $ PROXY NO_SESSION
processCommand :: (Maybe QueueRec, Transmission Cmd) -> M (Either (Transmission (Command 'ProxiedClient)) (Transmission BrokerMsg))
processCommand (qr_, (corrId, queueId, cmd)) = do
st <- asks queueStore
case cmd of
Cmd SSender command ->
case command of
SEND flags msgBody -> reply =<< withQueue (\qr -> sendMessage qr flags msgBody)
PING -> reply (corrId, "", PONG)
PRXY relay auth ->
ifM
allowProxy
(setupProxy relay $> Nothing)
(pure (corrId, queueId, ERR AUTH))
where
allowProxy = do
ServerConfig {allowSMPProxy, newQueueBasicAuth} <- asks config
pure $ allowSMPProxy && maybe True ((== auth) . Just) newQueueBasicAuth
PFWD _dhPub _encBlock -> error "TODO: processCommand.PFWD"
RFWD _encBlock -> error "TODO: processCommand.RFWD"
Cmd SNotifier NSUB -> reply =<< subscribeNotifications
Cmd SProxiedClient command -> pure $ Left (corrId, queueId, command)
Cmd SSender command -> Right <$> case command of
SEND flags msgBody -> withQueue $ \qr -> sendMessage qr flags msgBody
PING -> pure (corrId, "", PONG)
RFWD encBlock -> (corrId, "",) <$> processForwardedCommand encBlock
Cmd SNotifier NSUB -> Right <$> subscribeNotifications
Cmd SRecipient command ->
reply =<< case command of
Right <$> case command of
NEW rKey dhKey auth subMode ->
ifM
allowNew
@@ -876,6 +914,59 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv
encNMsgMeta = C.cbEncrypt rcvNtfDhSecret cbNonce (smpEncode msgMeta) 128
pure . (cbNonce,) $ fromRight "" encNMsgMeta
processForwardedCommand :: EncFwdTransmission -> M BrokerMsg
processForwardedCommand (EncFwdTransmission s) = fmap (either id id) . runExceptT $ do
-- TODO error
THAuthServer {clientPeerPubKey, serverPrivKey} <- maybe (throwError $ ERR INTERNAL) pure thAuth
-- TODO compute during handshake?
let sessSecret = C.dh' clientPeerPubKey serverPrivKey
proxyNonce = C.cbNonce $ bs corrId
-- TODO error
s' <- liftEitherWith internalErr $ C.cbDecrypt sessSecret proxyNonce s
-- TODO error
FwdTransmission {fwdCorrId, fwdKey, fwdTransmission = EncTransmission et} <- liftEitherWith internalErr $ smpDecode s'
-- TODO error - this error is reported to proxy, as we failed to get to client's transmission
let clientSecret = C.dh' fwdKey serverPrivKey
clientNonce = C.cbNonce $ bs fwdCorrId
b <- liftEitherWith internalErr $ C.cbDecrypt clientSecret clientNonce et
-- only allowing single forwarded transactions
let t' = tDecodeParseValidate thParams' $ L.head $ tParse thParams' b
clntThAuth = Just $ THAuthServer {clientPeerPubKey = fwdKey, serverPrivKey}
-- TODO error
r <-
lift (rejectOrVerify clntThAuth t') >>= \case
Left r -> pure r
Right t''@(_, (corrId', entId', _)) ->
-- Left will not be returned by processCommand, as only SEND command is allowed
fromRight (corrId', entId', ERR INTERNAL) <$> lift (processCommand t'')
-- encode response
r' <- case batchTransmissions (batch thParams') (blockSize thParams') [Right (Nothing, encodeTransmission thParams' r)] of
[] -> throwE $ ERR INTERNAL -- TODO error
TBError _ _ : _ -> throwE $ ERR INTERNAL -- TODO error
TBTransmission b' _ : _ -> pure b'
TBTransmissions b' _ _ : _ -> pure b'
-- encrypt to client
r2 <- liftEitherWith internalErr $ EncResponse <$> C.cbEncrypt clientSecret (C.reverseNonce clientNonce) r' paddedProxiedMsgLength
-- encrypt to proxy
let fr = FwdResponse {fwdCorrId, fwdResponse = r2}
r3 <- liftEitherWith internalErr $ EncFwdResponse <$> C.cbEncrypt sessSecret (C.reverseNonce proxyNonce) (smpEncode fr) paddedForwardedMsgLength
pure $ RRES r3
where
internalErr _ = ERR INTERNAL -- TODO errors
THandleParams {thAuth} = thParams'
rejectOrVerify :: Maybe (THandleAuth 'TServer) -> SignedTransmission ErrorType Cmd -> M (Either (Transmission BrokerMsg) (Maybe QueueRec, Transmission Cmd))
rejectOrVerify clntThAuth (tAuth, authorized, (corrId', entId', cmdOrError)) =
case cmdOrError of
Left e -> pure $ Left (corrId', entId', ERR e)
-- flags msgBody -> withQueue $ \qr -> sendMessage qr flags msgBody
Right cmd'@(Cmd SSender SEND {}) -> verified <$> verifyTransmission ((,C.cbNonce (bs corrId')) <$> clntThAuth) tAuth authorized entId' cmd'
where
verified = \case
VRVerified qr -> Right (qr, (corrId', entId', cmd'))
VRFailed -> Left (corrId', entId', ERR AUTH)
Right _ -> pure $ Left (corrId', entId', ERR $ CMD PROHIBITED)
deliverMessage :: T.Text -> QueueRec -> RecipientId -> TVar Sub -> MsgQueue -> Maybe Message -> M (Transmission BrokerMsg)
deliverMessage name qr rId sub q msg_ = time (name <> " deliver") $ do
readTVarIO sub >>= \case
@@ -935,20 +1026,6 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv
Right q -> updateDeletedStats q $> ok
Left e -> pure $ err e
setupProxy :: SMPServer -> M ()
setupProxy relay = do
-- decide if to use existing session or to create a new one
-- if exists, reply straight away
-- TODO
-- if not, request session via the queue
ProxyAgent {connectQ} <- asks proxyAgent
writeTBQueue connectQ (relay, reply . response)
where
response :: Either ErrorType (SessionId, X.CertificateChain, X.SignedExact X.PubKey) -> Transmission BrokerMsg
response = \case
Left e -> err e
Right (sessId, chain, key) -> (corrId, queueId, PKEY sessId chain key)
ok :: Transmission BrokerMsg
ok = (corrId, queueId, OK)
@@ -958,18 +1035,6 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv
okResp :: Either ErrorType () -> Transmission BrokerMsg
okResp = either err $ const ok
smpProxyAgent :: ProxyAgent -> M ()
smpProxyAgent ProxyAgent {connectQ} = raceAny_ [connectRelay, receiveAgent]
where
-- check for session var for pending session
-- if exists - wait
-- if doesn't - create session var, and spawn worker
connectRelay :: M ()
connectRelay = pure ()
receiveAgent :: M ()
receiveAgent = pure ()
updateDeletedStats :: QueueRec -> M ()
updateDeletedStats q = do
stats <- asks serverStats
+12 -16
View File
@@ -15,7 +15,6 @@ import qualified Data.IntMap.Strict as IM
import Data.List.NonEmpty (NonEmpty)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Text (Text)
import Data.Time.Clock (getCurrentTime)
import Data.Time.Clock.System (SystemTime)
import Data.X509.Validation (Fingerprint (..))
@@ -24,6 +23,8 @@ import qualified Network.TLS as T
import Numeric.Natural
import Simplex.Messaging.Agent.Env.SQLite (Worker)
import Simplex.Messaging.Agent.Lock
import Simplex.Messaging.Client (SMPClient)
import Simplex.Messaging.Client.Agent (SMPClientAgent, SMPClientAgentConfig, newSMPClientAgent)
import Simplex.Messaging.Crypto (KeyHash (..))
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Protocol
@@ -35,7 +36,7 @@ import Simplex.Messaging.Server.Stats
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (ATransport, SessionId, VersionSMP, VersionRangeSMP)
import Simplex.Messaging.Transport (ATransport, VersionRangeSMP, VersionSMP)
import Simplex.Messaging.Transport.Server (SocketState, TransportServerConfig, loadFingerprint, loadTLSServerParams, newSocketState)
import System.IO (IOMode (..))
import System.Mem.Weak (Weak)
@@ -82,6 +83,7 @@ data ServerConfig = ServerConfig
transportConfig :: TransportServerConfig,
-- | run listener on control port
controlPort :: Maybe ServiceName,
smpAgentCfg :: SMPClientAgentConfig,
allowSMPProxy :: Bool -- auth is the same with `newQueueBasicAuth`
}
@@ -127,18 +129,13 @@ data Server = Server
}
data ProxyAgent = ProxyAgent
{ relaySessions :: TMap SessionId RelaySession,
-- Speed up client lookups by server address.
-- if keyhash provided by the client is different from keyhash(es?) received in session,
-- server can refuse the request for proxy session.
relays :: TMap (TransportHost, ServiceName) (SessionId, C.KeyHash),
connectQ :: TBQueue (SMPServer, Either ErrorType (SessionId, X.CertificateChain, X.SignedExact X.PubKey) -> IO ()) -- sndQ to send relay session to the client client
{ smpAgent :: SMPClientAgent
}
data RelaySession = RelaySession
{ worker :: Worker,
-- SessionId??
proxyKey :: C.PublicKeyX25519, -- ???
smpClient :: SMPClient,
relayQ :: TBQueue (ClientId, CorrId, C.PublicKeyX25519, ByteString) -- FWD args from multiple clients using this server
-- can be used for QUOTA retries until the session is gone
}
@@ -199,7 +196,7 @@ newSubscription subThread = do
return Sub {subThread, delivered}
newEnv :: ServerConfig -> IO Env
newEnv config@ServerConfig {caCertificateFile, certificateFile, privateKeyFile, storeLogFile} = do
newEnv config@ServerConfig {caCertificateFile, certificateFile, privateKeyFile, storeLogFile, smpAgentCfg} = do
server <- atomically newServer
queueStore <- atomically newQueueStore
msgStore <- atomically newMsgStore
@@ -212,7 +209,7 @@ newEnv config@ServerConfig {caCertificateFile, certificateFile, privateKeyFile,
sockets <- atomically newSocketState
clientSeq <- newTVarIO 0
clients <- newTVarIO mempty
proxyAgent <- newSMPProxyAgent
proxyAgent <- atomically $ newSMPProxyAgent smpAgentCfg random
return Env {config, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, sockets, clientSeq, clients, proxyAgent}
where
restoreQueues :: QueueStore -> FilePath -> IO (StoreLog 'WriteMode)
@@ -230,8 +227,7 @@ newEnv config@ServerConfig {caCertificateFile, certificateFile, privateKeyFile,
Nothing -> id
Just NtfCreds {notifierId} -> M.insert notifierId (recipientId q)
newSMPProxyAgent :: IO ProxyAgent
newSMPProxyAgent = do
relays <- atomically TM.empty
relaySessions <- atomically TM.empty
pure ProxyAgent {relays, relaySessions}
newSMPProxyAgent :: SMPClientAgentConfig -> TVar ChaChaDRG -> STM ProxyAgent
newSMPProxyAgent smpAgentCfg random = do
smpAgent <- newSMPClientAgent smpAgentCfg random
pure ProxyAgent {smpAgent}
+5 -2
View File
@@ -18,6 +18,8 @@ import qualified Data.Text as T
import Data.Text.Encoding (encodeUtf8)
import Network.Socket (HostName)
import Options.Applicative
import Simplex.Messaging.Client (ProtocolClientConfig (..))
import Simplex.Messaging.Client.Agent (SMPClientAgentConfig (..), defaultSMPClientAgentConfig)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (BasicAuth (..), ProtoServerWithAuth (ProtoServerWithAuth), pattern SMPServer)
@@ -25,10 +27,11 @@ import Simplex.Messaging.Server (runSMPServer)
import Simplex.Messaging.Server.CLI
import Simplex.Messaging.Server.Env.STM (ServerConfig (..), defMsgExpirationDays, defaultInactiveClientExpiration, defaultMessageExpiration)
import Simplex.Messaging.Server.Expiration
import Simplex.Messaging.Transport (simplexMQVersion, supportedServerSMPRelayVRange)
import Simplex.Messaging.Transport (simplexMQVersion, supportedServerSMPRelayVRange, batchCmdsSMPVersion, sendingProxySMPVersion)
import Simplex.Messaging.Transport.Client (TransportHost (..))
import Simplex.Messaging.Transport.Server (TransportServerConfig (..), defaultTransportServerConfig)
import Simplex.Messaging.Util (safeDecodeUtf8)
import Simplex.Messaging.Version (mkVersionRange)
import System.Directory (createDirectoryIfMissing, doesFileExist)
import System.FilePath (combine)
import System.IO (BufferMode (..), hSetBuffering, stderr, stdout)
@@ -214,6 +217,7 @@ smpServerCLI cfgPath logPath =
{ logTLSErrors = fromMaybe False $ iniOnOff "TRANSPORT" "log_tls_errors" ini
},
controlPort = either (const Nothing) (Just . T.unpack) $ lookupValue "TRANSPORT" "control_port" ini,
smpAgentCfg = defaultSMPClientAgentConfig {smpCfg = (smpCfg defaultSMPClientAgentConfig) {serverVRange = mkVersionRange batchCmdsSMPVersion sendingProxySMPVersion}},
allowSMPProxy = True -- TODO: "get from INI"
}
@@ -306,4 +310,3 @@ cliCommandP cfgPath logPath iniFile =
pure InitOptions {enableStoreLog, logStats, signAlgorithm, ip, fqdn, password, scripted}
parseBasicAuth :: ReadM ServerPassword
parseBasicAuth = eitherReader $ fmap ServerPassword . strDecode . B.pack
@@ -54,7 +54,7 @@ addQueue QueueStore {queues, senders} q@QueueRec {recipientId = rId, senderId =
where
hasId = (||) <$> TM.member rId queues <*> TM.member sId senders
getQueue :: QueueStore -> SParty p -> QueueId -> STM (Either ErrorType QueueRec)
getQueue :: DirectParty p => QueueStore -> SParty p -> QueueId -> STM (Either ErrorType QueueRec)
getQueue QueueStore {queues, senders, notifiers} party qId =
toResult <$> (mapM readTVar =<< getVar)
where
+1 -1
View File
@@ -116,7 +116,7 @@ import UnliftIO.STM
-- * Transport parameters
smpBlockSize :: Int
smpBlockSize = 16384
smpBlockSize = 16384 * 2
-- SMP protocol version history:
-- 1 - binary protocol encoding (1/1/2022)
+6 -6
View File
@@ -281,12 +281,12 @@ randomSUB_ :: (C.AlgorithmI a, C.AuthAlgorithm a) => C.SAlgorithm a -> VersionSM
randomSUB_ a v sessId = do
g <- C.newRandom
rId <- atomically $ C.randomBytes 24 g
corrId <- atomically $ CorrId <$> C.randomBytes 24 g
nonce@(C.CbNonce corrId) <- atomically $ C.randomCbNonce g
(rKey, rpKey) <- atomically $ C.generateAuthKeyPair a g
thAuth_ <- testTHandleAuth v g rKey
let thParams = testTHandleParams v sessId
TransmissionForAuth {tForAuth, tToSend} = encodeTransmissionForAuth thParams (corrId, rId, Cmd SRecipient SUB)
pure $ (,tToSend) <$> authTransmission thAuth_ (Just rpKey) corrId tForAuth
TransmissionForAuth {tForAuth, tToSend} = encodeTransmissionForAuth thParams (CorrId corrId, rId, Cmd SRecipient SUB)
pure $ (,tToSend) <$> authTransmission thAuth_ (Just rpKey) nonce tForAuth
randomSUBCmd :: ProtocolClient SMPVersion ErrorType BrokerMsg -> IO (PCTransmission ErrorType BrokerMsg)
randomSUBCmd = randomSUBCmd_ C.SEd25519
@@ -311,13 +311,13 @@ randomSEND_ :: (C.AlgorithmI a, C.AuthAlgorithm a) => C.SAlgorithm a -> VersionS
randomSEND_ a v sessId len = do
g <- C.newRandom
sId <- atomically $ C.randomBytes 24 g
corrId <- atomically $ CorrId <$> C.randomBytes 3 g
nonce@(C.CbNonce corrId) <- atomically $ C.randomCbNonce g
(sKey, spKey) <- atomically $ C.generateAuthKeyPair a g
thAuth_ <- testTHandleAuth v g sKey
msg <- atomically $ C.randomBytes len g
let thParams = testTHandleParams v sessId
TransmissionForAuth {tForAuth, tToSend} = encodeTransmissionForAuth thParams (corrId, sId, Cmd SSender $ SEND noMsgFlags msg)
pure $ (,tToSend) <$> authTransmission thAuth_ (Just spKey) corrId tForAuth
TransmissionForAuth {tForAuth, tToSend} = encodeTransmissionForAuth thParams (CorrId corrId, sId, Cmd SSender $ SEND noMsgFlags msg)
pure $ (,tToSend) <$> authTransmission thAuth_ (Just spKey) nonce tForAuth
testTHandleParams :: VersionSMP -> ByteString -> THandleParams SMPVersion 'TClient
testTHandleParams v sessionId =
+7 -3
View File
@@ -2,7 +2,6 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -Wno-orphans #-}
module CoreTests.ProtocolErrorTests where
@@ -14,9 +13,10 @@ import GHC.Generics (Generic)
import Generic.Random (genericArbitraryU)
import Simplex.FileTransfer.Transport (XFTPErrorType (..))
import Simplex.Messaging.Agent.Protocol
import qualified Simplex.Messaging.Agent.Protocol as Agent
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (CommandError (..), ErrorType (..))
import Simplex.Messaging.Protocol (CommandError (..), ErrorType (..), ProxyError (..))
import Simplex.Messaging.Transport (HandshakeError (..), TransportError (..))
import Simplex.RemoteControl.Types (RCErrorType (..))
import Test.Hspec
@@ -33,7 +33,7 @@ protocolErrorTests = modifyMaxSuccess (const 1000) $ do
|| strDecode (strEncode err) == Right err
where
errHasSpaces = \case
BROKER srv (RESPONSE e) -> hasSpaces srv || hasSpaces e
BROKER srv (Agent.RESPONSE e) -> hasSpaces srv || hasSpaces e
BROKER srv _ -> hasSpaces srv
_ -> False
hasSpaces s = ' ' `B.elem` encodeUtf8 (T.pack s)
@@ -54,6 +54,8 @@ deriving instance Generic ErrorType
deriving instance Generic CommandError
deriving instance Generic ProxyError
deriving instance Generic TransportError
deriving instance Generic HandshakeError
@@ -78,6 +80,8 @@ instance Arbitrary ErrorType where arbitrary = genericArbitraryU
instance Arbitrary CommandError where arbitrary = genericArbitraryU
instance Arbitrary ProxyError where arbitrary = genericArbitraryU
instance Arbitrary TransportError where arbitrary = genericArbitraryU
instance Arbitrary HandshakeError where arbitrary = genericArbitraryU
+9 -2
View File
@@ -16,7 +16,8 @@ import Control.Monad.Except (runExceptT)
import Data.ByteString.Char8 (ByteString)
import Data.List.NonEmpty (NonEmpty)
import Network.Socket
import Simplex.Messaging.Client (chooseTransportHost, defaultNetworkConfig)
import Simplex.Messaging.Client (ProtocolClientConfig (..), chooseTransportHost, defaultNetworkConfig)
import Simplex.Messaging.Client.Agent (SMPClientAgentConfig (..), defaultSMPClientAgentConfig)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding
import Simplex.Messaging.Protocol
@@ -112,6 +113,7 @@ cfg =
smpServerVRange = supportedServerSMPRelayVRange,
transportConfig = defaultTransportServerConfig,
controlPort = Nothing,
smpAgentCfg = defaultSMPClientAgentConfig,
allowSMPProxy = False
}
@@ -119,7 +121,12 @@ cfgV7 :: ServerConfig
cfgV7 = cfg {smpServerVRange = mkVersionRange batchCmdsSMPVersion authCmdsSMPVersion}
proxyCfg :: ServerConfig
proxyCfg = cfg {allowSMPProxy = True}
proxyCfg =
cfgV7
{ allowSMPProxy = True,
smpServerVRange = mkVersionRange batchCmdsSMPVersion sendingProxySMPVersion,
smpAgentCfg = defaultSMPClientAgentConfig {smpCfg = (smpCfg defaultSMPClientAgentConfig) {serverVRange = mkVersionRange batchCmdsSMPVersion sendingProxySMPVersion}}
}
withSmpServerStoreMsgLogOn :: HasCallStack => ATransport -> ServiceName -> (HasCallStack => ThreadId -> IO a) -> IO a
withSmpServerStoreMsgLogOn t = withSmpServerConfigOn t cfg {storeLogFile = Just testStoreLogFile, storeMsgsFile = Just testStoreMsgsFile, serverStatsBackupFile = Just testServerStatsBackupFile}
+53 -21
View File
@@ -1,38 +1,79 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
module SMPProxyTests where
import AgentTests.FunctionalAPITests (runRight_)
import Debug.Trace
import SMPAgentClient (testSMPServer, testSMPServer2)
import SMPClient
import ServerTests (sendRecv)
import qualified SMPClient as SMP
import ServerTests (decryptMsgV3, sendRecv)
import Simplex.Messaging.Client
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.Env.STM (ServerConfig (..))
import Simplex.Messaging.Transport
import Simplex.Messaging.Version (mkVersionRange)
import Test.Hspec
import UnliftIO
smpProxyTests :: Spec
smpProxyTests = do
describe "server configuration" $ do
it "refuses proxy handshake unless enabled" testNoProxy
it "checks basic auth in proxy requests" testProxyAuth
xdescribe "proxy requests" $ do
xdescribe "bad relay URIs" $ do
it "host not resolved" todo
it "when SMP port blackholed" todo
it "no SMP service at host/port" todo
it "bad SMP fingerprint" todo
it "connects to relay" testProxyConnect
xit "connects to itself as a relay" todo
describe "proxy requests" $ do
describe "bad relay URIs" $ do
xit "host not resolved" todo
xit "when SMP port blackholed" todo
xit "no SMP service at host/port" todo
xit "bad SMP fingerprint" todo
xit "batching proxy requests" todo
xdescribe "forwarding requests" $ do
it "sender-proxy-relay-recipient works" todo
it "similar timing for proxied and direct sends" todo
describe "forwarding requests" $ do
describe "deliver message via SMP proxy" $ do
it "same server" $
withSmpServerConfigOn (transport @TLS) proxyCfg testPort $ \_ -> do
let proxyServ = SMPServer SMP.testHost SMP.testPort SMP.testKeyHash
let relayServ = proxyServ
deliverMessageViaProxy proxyServ relayServ
it "different servers" $
withSmpServerConfigOn (transport @TLS) proxyCfg testPort $ \_ ->
withSmpServerConfigOn (transport @TLS) cfgV7 testPort2 $ \_ -> do
let proxyServ = SMPServer SMP.testHost SMP.testPort SMP.testKeyHash
let relayServ = SMPServer SMP.testHost SMP.testPort2 SMP.testKeyHash
deliverMessageViaProxy proxyServ relayServ
xit "sender-proxy-relay-recipient works" todo
xit "similar timing for proxied and direct sends" todo
deliverMessageViaProxy :: SMPServer -> SMPServer -> IO ()
deliverMessageViaProxy proxyServ relayServ = do
g <- C.newRandom
-- set up proxy
Right pc <- getProtocolClient g (1, proxyServ, Nothing) defaultSMPClientConfig {serverVRange = mkVersionRange batchCmdsSMPVersion sendingProxySMPVersion} Nothing (\_ -> pure ())
THAuthClient {} <- maybe (fail "getProtocolClient returned no thAuth") pure $ thAuth $ thParams pc
-- set up relay
msgQ <- newTBQueueIO 4
Right rc <- getProtocolClient g (2, relayServ, Nothing) defaultSMPClientConfig (Just msgQ) (\_ -> pure ())
runRight_ $ do
-- prepare receiving queue
(rPub, rPriv) <- atomically $ C.generateAuthKeyPair C.SEd448 g
(rdhPub, rdhPriv :: C.PrivateKeyX25519) <- atomically $ C.generateKeyPair g
QIK {sndId, rcvPublicDhKey = srvDh} <- createSMPQueue rc (rPub, rPriv) rdhPub (Just "correct") SMSubscribe
let dec = decryptMsgV3 $ C.dh' srvDh rdhPriv
-- run proxy test
(sessId, v, relayKey) <- createSMPProxySession pc relayServ (Just "correct")
proxySMPMessage pc sessId v relayKey Nothing sndId noMsgFlags "hello"
-- check delivery
(_tSess, _v, _sid, _ety, MSG RcvMessage {msgId, msgBody = EncRcvMsgBody encBody}) <- atomically $ readTBQueue msgQ
liftIO $ dec msgId encBody `shouldBe` Right "hello"
proxyVRange :: VersionRangeSMP
proxyVRange = mkVersionRange batchCmdsSMPVersion sendingProxySMPVersion
@@ -54,15 +95,6 @@ testProxyAuth = do
where
proxyCfgAuth = proxyCfg {newQueueBasicAuth = Just "correct"}
testProxyConnect :: IO ()
testProxyConnect = do
withSmpServerConfigOn (transport @TLS) proxyCfg testPort $ \_ -> do
testSMPClient_ "127.0.0.1" testPort proxyVRange $ \(th :: THandleSMP TLS 'TClient) -> do
(_, _, (_corrId, _entityId, reply)) <- sendRecv th (Nothing, "0", "", PRXY testSMPServer2 Nothing)
case reply of
Right PKEY {} -> pure ()
_ -> fail $ "bad reply: " <> show reply
todo :: IO ()
todo = do
fail "TODO"
+1 -1
View File
@@ -47,7 +47,7 @@ main = do
$ do
describe "Agent SQLite schema dump" schemaDumpTest
describe "Core tests" $ do
describe "Batching tests" batchingTests
xdescribe "Batching tests" batchingTests
describe "Encoding tests" encodingTests
describe "Protocol error tests" protocolErrorTests
describe "Version range" versionRangeTests