mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-04-26 01:02:29 +00:00
SMP proxy: low level client and server implementation
This commit is contained in:
@@ -630,6 +630,26 @@ deleteSMPQueues :: SMPClient -> NonEmpty (RcvPrivateAuthKey, RecipientId) -> IO
|
||||
deleteSMPQueues = okSMPCommands DEL
|
||||
{-# INLINE deleteSMPQueues #-}
|
||||
|
||||
-- TODO picture
|
||||
-- send PRXY :: SMPServer -> Maybe BasicAuth -> Command Sender
|
||||
-- receives PKEY :: SessionId -> X.CertificateChain -> X.SignedExact X.PubKey -> BrokerMsg
|
||||
createSMPProxySession :: SMPClient -> SMPServer -> Maybe BasicAuth -> ExceptT SMPClientError IO (SessionId, C.PublicKeyX25519)
|
||||
createSMPProxySession _proxyClnt _relayServ _proxyAuth = undefined
|
||||
|
||||
-- consider how to process slow responses - is it handled somehow locally or delegated to the caller
|
||||
-- this method is used in the client
|
||||
-- sends PFWD :: C.PublicKeyX25519 -> EncTransmission -> Command Sender
|
||||
-- receives PRES :: EncResponse -> BrokerMsg -- proxy to client
|
||||
proxySMPMessage :: SMPClient -> SessionId -> Maybe SndPrivateAuthKey -> SenderId -> MsgFlags -> MsgBody -> ExceptT SMPClientError IO ()
|
||||
proxySMPMessage _proxyClnt _relaySess _spKey _sId _flags _msg = undefined
|
||||
|
||||
-- this method is used in the server
|
||||
-- sends RFWD :: EncFwdTransmission -> Command Sender
|
||||
-- receives RRES :: EncFwdResponse -> BrokerMsg
|
||||
-- server should send PRES to the client with EncResponse
|
||||
forwardSMPMessage :: SMPClient -> C.CbNonce -> C.PublicKeyX25519 -> EncTransmission -> ExceptT SMPClientError IO EncResponse
|
||||
forwardSMPMessage _relayClnt _corrId _cmdKey _encTrans = undefined
|
||||
|
||||
okSMPCommand :: PartyI p => Command p -> SMPClient -> C.APrivateAuthKey -> QueueId -> ExceptT SMPClientError IO ()
|
||||
okSMPCommand cmd c pKey qId =
|
||||
sendSMPCommand c (Just pKey) qId cmd >>= \case
|
||||
|
||||
@@ -403,7 +403,7 @@ newtype EncTransmission = EncTransmission ByteString
|
||||
data FwdTransmission = FwdTransmission
|
||||
{ fwdCorrId :: ByteString,
|
||||
fwdKey :: C.PublicKeyX25519,
|
||||
fwdTransmission :: ByteString
|
||||
fwdTransmission :: EncTransmission
|
||||
}
|
||||
|
||||
newtype EncFwdTransmission = EncFwdTransmission ByteString
|
||||
@@ -419,7 +419,7 @@ data BrokerMsg where
|
||||
NID :: NotifierId -> RcvNtfPublicDhKey -> BrokerMsg
|
||||
NMSG :: C.CbNonce -> EncNMsgMeta -> BrokerMsg
|
||||
-- Should include certificate chain
|
||||
PKEY :: X.CertificateChain -> X.SignedExact X.PubKey -> BrokerMsg -- TLS-signed server key for proxy shared secret and initial sender key
|
||||
PKEY :: SessionId -> X.CertificateChain -> X.SignedExact X.PubKey -> BrokerMsg -- TLS-signed server key for proxy shared secret and initial sender key
|
||||
RRES :: EncFwdResponse -> BrokerMsg -- relay to proxy
|
||||
PRES :: EncResponse -> BrokerMsg -- proxy to client
|
||||
END :: BrokerMsg
|
||||
|
||||
@@ -587,23 +587,22 @@ dummyKeyX25519 = "MCowBQYDK2VuAyEA4JGSMYht18H4mas/jHeBwfcM7jLwNYJNOAhi2/g4RXg="
|
||||
client :: Client -> Server -> M ()
|
||||
client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Server {subscribedQ, ntfSubscribedQ, notifiers} = do
|
||||
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands"
|
||||
forever $
|
||||
atomically (readTBQueue rcvQ)
|
||||
>>= mapM processCommand
|
||||
>>= atomically . writeTBQueue sndQ
|
||||
forever $ atomically (readTBQueue rcvQ) >>= mapM processCommand
|
||||
where
|
||||
processCommand :: (Maybe QueueRec, Transmission Cmd) -> M (Transmission BrokerMsg)
|
||||
reply :: Transmission BrokerMsg -> IO ()
|
||||
reply = atomically . writeTBQueue sndQ
|
||||
processCommand :: (Maybe QueueRec, Transmission Cmd) -> M ()
|
||||
processCommand (qr_, (corrId, queueId, cmd)) = do
|
||||
st <- asks queueStore
|
||||
case cmd of
|
||||
Cmd SSender command ->
|
||||
case command of
|
||||
SEND flags msgBody -> withQueue $ \qr -> sendMessage qr flags msgBody
|
||||
PING -> pure (corrId, "", PONG)
|
||||
SEND flags msgBody -> reply =<< withQueue (\qr -> sendMessage qr flags msgBody)
|
||||
PING -> reply (corrId, "", PONG)
|
||||
PRXY relay auth ->
|
||||
ifM
|
||||
allowProxy
|
||||
(setupProxy relay)
|
||||
(setupProxy relay $> Nothing)
|
||||
(pure (corrId, queueId, ERR AUTH))
|
||||
where
|
||||
allowProxy = do
|
||||
@@ -611,9 +610,9 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv
|
||||
pure $ allowSMPProxy && maybe True ((== auth) . Just) newQueueBasicAuth
|
||||
PFWD _dhPub _encBlock -> error "TODO: processCommand.PFWD"
|
||||
RFWD _encBlock -> error "TODO: processCommand.RFWD"
|
||||
Cmd SNotifier NSUB -> subscribeNotifications
|
||||
Cmd SNotifier NSUB -> reply =<< subscribeNotifications
|
||||
Cmd SRecipient command ->
|
||||
case command of
|
||||
reply =<< case command of
|
||||
NEW rKey dhKey auth subMode ->
|
||||
ifM
|
||||
allowNew
|
||||
@@ -936,14 +935,19 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv
|
||||
Right q -> updateDeletedStats q $> ok
|
||||
Left e -> pure $ err e
|
||||
|
||||
setupProxy :: SMPServer -> M (Transmission BrokerMsg)
|
||||
setupProxy todo'relay = undefined
|
||||
-- do
|
||||
-- let relaySessionId = "TODO: relaySessionId"
|
||||
-- (dummyRelayDhPublic, _) <- atomically . C.generateKeyPair =<< asks random
|
||||
-- (_, dummySignKey) <- atomically . C.generateKeyPair =<< asks random
|
||||
-- let dummyRelayKeySignature = C.sign' dummySignKey $ smpEncode dummyRelayDhPublic
|
||||
-- pure (corrId, relaySessionId, PKEY dummyRelayDhPublic dummyRelayKeySignature)
|
||||
setupProxy :: SMPServer -> M ()
|
||||
setupProxy relay = do
|
||||
-- decide if to use existing session or to create a new one
|
||||
-- if exists, reply straight away
|
||||
-- TODO
|
||||
-- if not, request session via the queue
|
||||
ProxyAgent {connectQ} <- asks proxyAgent
|
||||
writeTBQueue connectQ (relay, reply . response)
|
||||
where
|
||||
response :: Either ErrorType (SessionId, X.CertificateChain, X.SignedExact X.PubKey) -> Transmission BrokerMsg
|
||||
response = \case
|
||||
Left e -> err e
|
||||
Right (sessId, chain, key) -> (corrId, queueId, PKEY sessId chain key)
|
||||
|
||||
ok :: Transmission BrokerMsg
|
||||
ok = (corrId, queueId, OK)
|
||||
@@ -954,6 +958,18 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId} Serv
|
||||
okResp :: Either ErrorType () -> Transmission BrokerMsg
|
||||
okResp = either err $ const ok
|
||||
|
||||
smpProxyAgent :: ProxyAgent -> M ()
|
||||
smpProxyAgent ProxyAgent {connectQ} = raceAny_ [connectRelay, receiveAgent]
|
||||
where
|
||||
-- check for session var for pending session
|
||||
-- if exists - wait
|
||||
-- if doesn't - create session var, and spawn worker
|
||||
connectRelay :: M ()
|
||||
connectRelay = pure ()
|
||||
|
||||
receiveAgent :: M ()
|
||||
receiveAgent = pure ()
|
||||
|
||||
updateDeletedStats :: QueueRec -> M ()
|
||||
updateDeletedStats q = do
|
||||
stats <- asks serverStats
|
||||
|
||||
@@ -115,7 +115,7 @@ data Env = Env
|
||||
sockets :: SocketState,
|
||||
clientSeq :: TVar ClientId,
|
||||
clients :: TVar (IntMap Client),
|
||||
proxyServer :: SMPProxyServer -- senders served on this proxy
|
||||
proxyAgent :: ProxyAgent -- senders served on this proxy
|
||||
}
|
||||
|
||||
data Server = Server
|
||||
@@ -126,15 +126,20 @@ data Server = Server
|
||||
savingLock :: Lock
|
||||
}
|
||||
|
||||
data SMPProxyServer = SMPProxyServer
|
||||
{ relaySessions :: TMap SessionId SMPProxiedRelay,
|
||||
relayServers :: TMap Text SessionId -- speed up client lookups by server URI
|
||||
data ProxyAgent = ProxyAgent
|
||||
{ relaySessions :: TMap SessionId RelaySession,
|
||||
-- Speed up client lookups by server address.
|
||||
-- if keyhash provided by the client is different from keyhash(es?) received in session,
|
||||
-- server can refuse the request for proxy session.
|
||||
relays :: TMap (TransportHost, ServiceName) (SessionId, C.KeyHash),
|
||||
connectQ :: TBQueue (SMPServer, Either ErrorType (SessionId, X.CertificateChain, X.SignedExact X.PubKey) -> IO ()) -- sndQ to send relay session to the client client
|
||||
}
|
||||
|
||||
data SMPProxiedRelay = SMPProxiedRelay
|
||||
data RelaySession = RelaySession
|
||||
{ worker :: Worker,
|
||||
proxyKey :: C.DhSecretX25519,
|
||||
fwdQ :: TBQueue (ClientId, CorrId, C.PublicKeyX25519, ByteString) -- FWD args from multiple clients using this server
|
||||
-- SessionId??
|
||||
proxyKey :: C.PublicKeyX25519, -- ???
|
||||
relayQ :: TBQueue (ClientId, CorrId, C.PublicKeyX25519, ByteString) -- FWD args from multiple clients using this server
|
||||
-- can be used for QUOTA retries until the session is gone
|
||||
}
|
||||
|
||||
@@ -207,8 +212,8 @@ newEnv config@ServerConfig {caCertificateFile, certificateFile, privateKeyFile,
|
||||
sockets <- atomically newSocketState
|
||||
clientSeq <- newTVarIO 0
|
||||
clients <- newTVarIO mempty
|
||||
proxyServer <- newSMPProxyServer
|
||||
return Env {config, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, sockets, clientSeq, clients, proxyServer}
|
||||
proxyAgent <- newSMPProxyAgent
|
||||
return Env {config, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, sockets, clientSeq, clients, proxyAgent}
|
||||
where
|
||||
restoreQueues :: QueueStore -> FilePath -> IO (StoreLog 'WriteMode)
|
||||
restoreQueues QueueStore {queues, senders, notifiers} f = do
|
||||
@@ -225,8 +230,8 @@ newEnv config@ServerConfig {caCertificateFile, certificateFile, privateKeyFile,
|
||||
Nothing -> id
|
||||
Just NtfCreds {notifierId} -> M.insert notifierId (recipientId q)
|
||||
|
||||
newSMPProxyServer :: MonadIO m => m SMPProxyServer
|
||||
newSMPProxyServer = do
|
||||
relayServers <- atomically TM.empty
|
||||
newSMPProxyAgent :: IO ProxyAgent
|
||||
newSMPProxyAgent = do
|
||||
relays <- atomically TM.empty
|
||||
relaySessions <- atomically TM.empty
|
||||
pure SMPProxyServer {relayServers, relaySessions}
|
||||
pure ProxyAgent {relays, relaySessions}
|
||||
|
||||
@@ -349,6 +349,7 @@ data ServerHandshake = ServerHandshake
|
||||
{ smpVersionRange :: VersionRangeSMP,
|
||||
sessionId :: SessionId,
|
||||
-- pub key to agree shared secrets for command authorization and entity ID encryption.
|
||||
-- todo C.PublicKeyX25519
|
||||
authPubKey :: Maybe (X.CertificateChain, X.SignedExact X.PubKey)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user