mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-06-05 23:41:31 +00:00
f0b7a4be73
* smp server: messaging services (#1565)
* smp server: refactor message delivery to always respond SOK to subscriptions
* refactor ntf subscribe
* cancel subscription thread and reduce service subscription count when queue is deleted
* subscribe rcv service, deliver sent messages to subscribed service
* subscribe rcv service to messages (TODO delivery on subscription)
* WIP
* efficient initial delivery of messages to subscribed service
* test: delivery to client with service certificate
* test: upgrade/downgrade to/from service subscriptions
* remove service association from agent API, add per-user flag to use the service
* agent client (WIP)
* service certificates in the client
* rfc about drift detection, and SALL to mark end of message delivery
* fix test
* fix test
* add function for postgresql message storage
* update migration
* servers: maintain xor-hash of all associated queue IDs in PostgreSQL (#1668)
* servers: maintain xor-hash of all associated queue IDs in PostgreSQL (#1615)
* ntf server: maintain xor-hash of all associated queue IDs via PostgreSQL triggers
* smp server: xor hash with triggers
* fix sql and using pgcrypto extension in tests
* track counts and hashes in smp/ntf servers via triggers, smp server stats for service subscription, update SMP protocol to pass expected count and hash in SSUB/NSSUB commands
* agent migrations with functions/triggers
* remove agent triggers
* try tracking service subs in the agent (WIP, does not compile)
* Revert "try tracking service subs in the agent (WIP, does not compile)"
This reverts commit 59e908100d.
* comment
* agent database triggers
* service subscriptions in the client
* test / fix client services
* update schema
* fix postgres migration
* update schema
* move schema test to the end
* use static function with SQLite to avoid dynamic wrapper
* agent: fail when per-connection transport isolation is used with services (#1670)
* agent: service subscription events (#1671)
* agent: use server keyhash when loading service record
* agent: process queue/service associations with delayed subscription results
* agent: service subscription events
* agent: finalize initial service subscriptions, remove associations on service ID changes (#1672)
* agent: remove service/queue associations when service ID changes
* agent: check that service ID in NEW response matches session ID in transport session
* agent subscription WIP
* test
* comment
* enable tests
* update queries
* agent: option to add SQLite aggregates to DB connection (#1673)
* agent: add build_relations_vector function to sqlite
* update aggregate
* use static aggregate
* remove relations
---------
Co-authored-by: Evgeny Poberezkin <evgeny@poberezkin.com>
* add test, treat BAD_SERVICE as temp error, only remove queue associations on service errors
* add packZipWith for backward compatibility with GHC 8.10.7
---------
Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com>
* servers: service stats and logging, allow services without option (removed), report errors during service message delivery, remove threads when service subscription ended (#1676)
* smp server: always allow services without option
* smp server: maintain IDs hash in session subscription states
* smp server: service message delivery error handling
* ntf server: log subscription count and hash differences
* smp server: remove delivery threads when service subscription ended/client disconnected
* agent: remove service queue association when service ID changed, process ENDS event, test migrating to/from service (#1677)
* agent: remove service queue association when service ID changed
* agent: process ENDS event
* agent: send service subscription error event
* agent: test migrating to/from service subscriptions, fixes
* agent: always remove service when disabled, fix service subscriptions
* ntf server: use different client certs for each SMP server, remove support for store log (#1681)
* ntf server: remove support for store log
* ntf server: use different client certificates for each SMP server
* smp protocol: fix encoding for SOKS/ENDS responses (#1683)
* agent: create user with option to enable client service (#1684)
* agent: create user with option to enable client service
* handle HTTP2 errors
* do not catch async exceptions
* agent: minor fixes
* docs: update protocol (#1705)
* docs: agent threat model
* update protocol docs
* update RFCs (#1730)
* update RFCs
* update
* update overview
* update terminology
* original language in threat model
---------
Co-authored-by: Evgeny @ SimpleX Chat <259188159+evgeny-simplex@users.noreply.github.com>
* docs: fix minor issues in protocols
* docs: add e2e encrypted message wire encoding to PQDR spec
* docs: add missing encodings and other protocol corrections
* docs: move implemented rfcs
* smp: service fixes (#1737)
* smp: deliver service subscription to correct client
* tests: more resilient to concurrency
* optimize PostgreSQL query
* fix service re-association after server "downgrade"
* correctly handle service removed from server (and ID changed)
* remove unused
---------
Co-authored-by: Evgeny @ SimpleX Chat <259188159+evgeny-simplex@users.noreply.github.com>
* prometheus: fix metrics names (#1747)
* test: rcv service re-association on restart (#1746)
* agent: correct log message
* docs: update whitepaper
* smp: fix messaging client service issues (#1751)
* services: fix minor issues
* fix accounting for subscribed service queues, add prometheus stats
* fix uncorrelated subquery
* fix potential race condition when inserting service defensively, as it is also prevented by how client is created
---------
Co-authored-by: Evgeny @ SimpleX Chat <259188159+evgeny-simplex@users.noreply.github.com>
* agent: refactor cleanup if no pending subs (#1757)
* smp server: batch processing of subscription messages (#1753)
* smp server: batch processing of subscription messages
* refactor
* empty line
* fix
---------
Co-authored-by: Evgeny @ SimpleX Chat <259188159+evgeny-simplex@users.noreply.github.com>
* smp: batch queue association updates on subscriptions (#1760)
* smp: batch queue association updates on subscriptions
* refactor to fused batching
* simpler
* batch assoc functions
* clean up
* fix
---------
Co-authored-by: Evgeny @ SimpleX Chat <259188159+evgeny-simplex@users.noreply.github.com>
* agent: use primary key index in setRcvServiceAssocs (#1783)
* agent: use primary key index in setRcvServiceAssocs
Previous WHERE rcv_id = ? did not match the (host, port, rcv_id)
primary key prefix and fell back to a table scan via
idx_rcv_queues_client_notice_id. With ~390k rows per queue, each
update in a 1350-row batch scanned the whole table, yielding ~290s
per batch and a multi-hour rcv-services migration.
* agent: pass SMPServer explicitly to setRcvServiceAssocs
Avoid extracting host/port from the first queue inside setRcvServiceAssocs.
The caller already has SMPServer in scope (from tSess) and the call chain
is short, so threading it through is simpler than inspecting the list.
Removes the empty-list guard from setRcvServiceAssocs (it remains in
processRcvServiceAssocs).
---------
Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com>
Co-authored-by: Evgeny @ SimpleX Chat <259188159+evgeny-simplex@users.noreply.github.com>
Co-authored-by: sh <37271604+shumvgolove@users.noreply.github.com>
1442 lines
65 KiB
Haskell
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}

-- |
-- Module      : Simplex.Messaging.Client
-- Copyright   : (c) simplex.chat
-- License     : AGPL-3
--
-- Maintainer  : chat@simplex.chat
-- Stability   : experimental
-- Portability : non-portable
--
-- This module provides a functional client API for SMP protocol.
--
-- See https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md
module Simplex.Messaging.Client
  ( -- * Connect (disconnect) client to (from) SMP server
    TransportSession,
    SMPTransportSession,
    ProtocolClient (thParams, sessionTs),
    SMPClient,
    ProxiedRelay (..),
    getProtocolClient,
    closeProtocolClient,
    protocolClientServer,
    protocolClientServer',
    transportHost',
    transportSession',
    useWebPort,
    isPresetDomain,

    -- * SMP protocol command functions
    createSMPQueue,
    subscribeSMPQueue,
    subscribeSMPQueues,
    streamSubscribeSMPQueues,
    getSMPMessage,
    subscribeSMPQueueNotifications,
    subscribeSMPQueuesNtfs,
    subscribeService,
    smpClientService,
    smpClientServiceId,
    secureSMPQueue,
    secureSndSMPQueue,
    proxySecureSndSMPQueue,
    addSMPQueueLink,
    deleteSMPQueueLink,
    secureGetSMPQueueLink,
    proxySecureGetSMPQueueLink,
    getSMPQueueLink,
    proxyGetSMPQueueLink,
    enableSMPQueueNotifications,
    disableSMPQueueNotifications,
    enableSMPQueuesNtfs,
    disableSMPQueuesNtfs,
    sendSMPMessage,
    ackSMPMessage,
    suspendSMPQueue,
    deleteSMPQueue,
    deleteSMPQueues,
    connectSMPProxiedRelay,
    proxySMPMessage,
    forwardSMPTransmission,
    getSMPQueueInfo,
    sendProtocolCommand,
    sendProtocolCommands,

    -- * Supporting types and client configuration
    ProtocolClientError (..),
    SMPClientError,
    ProxyClientError (..),
    Response (..),
    unexpectedResponse,
    ProtocolClientConfig (..),
    NetworkConfig (..),
    NetworkTimeout (..),
    NetworkRequestMode (..),
    pattern NRMInteractive,
    TransportSessionMode (..),
    HostMode (..),
    SocksMode (..),
    SMPProxyMode (..),
    SMPProxyFallback (..),
    SMPWebPortServers (..),
    netTimeoutInt,
    defaultClientConfig,
    defaultSMPClientConfig,
    defaultNetworkConfig,
    transportClientConfig,
    clientSocksCredentials,
    chooseTransportHost,
    temporaryClientError,
    smpClientServiceError,
    smpProxyError,
    smpErrorClientNotice,
    textToHostMode,
    clientHandlers,
    ServerTransmissionBatch,
    ServerTransmission (..),
    ClientCommand,

    -- * For testing
    PCTransmission,
    mkTransmission,
    authTransmission,
    smpClientStub,

    -- * For debugging
    TBQueueInfo (..),
    getTBQueueInfo,
    getProtocolClientQueuesInfo,
    nonBlockingWriteTBQueue,
  )
where

import Control.Applicative ((<|>))
import Control.Concurrent (ThreadId, forkFinally, forkIO, killThread, mkWeakThreadId)
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception (Exception, Handler (..), IOException, SomeAsyncException, SomeException)
import qualified Control.Exception as E
import Control.Logger.Simple
import Control.Monad
import Control.Monad.Except
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Except
import Crypto.Random (ChaChaDRG)
import qualified Data.Aeson.TH as J
import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Base64 as B64
import Data.Functor (($>))
import Data.Int (Int64)
import Data.List (find, isSuffixOf)
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as L
import Data.Maybe (catMaybes, fromMaybe)
import Data.Text (Text)
import qualified Data.Text as T
import Data.Time.Clock (UTCTime (..), diffUTCTime, getCurrentTime)
import qualified Data.X509 as X
import qualified Data.X509.Validation as XV
import Network.Socket (HostName, ServiceName)
import Network.Socks5 (SocksCredentials (..))
import Numeric.Natural
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Parsers (defaultJSON, dropPrefix, enumJSON, sumTypeJSON)
import Simplex.Messaging.Protocol
import Simplex.Messaging.Protocol.Types
import Simplex.Messaging.Server.QueueStore.QueueInfo
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport
import Simplex.Messaging.Transport.Client (SocksAuth (..), SocksProxyWithAuth (..), TransportClientConfig (..), TransportHost (..), defaultSMPPort, runTransportClient)
import Simplex.Messaging.Transport.HTTP2 (httpALPN11)
import Simplex.Messaging.Transport.KeepAlive
import Simplex.Messaging.Transport.Shared (ChainCertificates (..), chainIdCaCerts, x509validate)
import Simplex.Messaging.Util
import Simplex.Messaging.Version
import System.Mem.Weak (Weak, deRefWeak)
import System.Timeout (timeout)

-- | 'SMPClient' is a handle used to send commands to a specific SMP server.
--
-- Use 'getProtocolClient' to connect to an SMP server and create a client handle.
data ProtocolClient v err msg = ProtocolClient
  { action :: Maybe (Weak ThreadId),
    thParams :: THandleParams v 'TClient,
    sessionTs :: UTCTime,
    client_ :: PClient v err msg
  }

data PClient v err msg = PClient
  { connected :: TVar Bool,
    transportSession :: TransportSession msg,
    transportHost :: TransportHost,
    tcpConnectTimeout :: NetworkTimeout,
    tcpTimeout :: NetworkTimeout,
    sendPings :: TVar Bool,
    lastReceived :: TVar UTCTime,
    timeoutErrorCount :: TVar Int,
    clientCorrId :: TVar ChaChaDRG,
    sentCommands :: TMap CorrId (Request err msg),
    sndQ :: TBQueue (Maybe (Request err msg), ByteString),
    rcvQ :: TBQueue (NonEmpty (Transmission (Either err msg))),
    msgQ :: Maybe (TBQueue (ServerTransmissionBatch v err msg))
  }

smpClientStub :: TVar ChaChaDRG -> ByteString -> VersionSMP -> Maybe (THandleAuth 'TClient) -> IO SMPClient
smpClientStub g sessionId thVersion thAuth = do
  let ts = UTCTime (read "2024-03-31") 0
  connected <- newTVarIO False
  clientCorrId <- atomically $ C.newRandomDRG g
  sentCommands <- TM.emptyIO
  sendPings <- newTVarIO False
  lastReceived <- newTVarIO ts
  timeoutErrorCount <- newTVarIO 0
  sndQ <- newTBQueueIO 100
  rcvQ <- newTBQueueIO 100
  let NetworkConfig {tcpConnectTimeout, tcpTimeout} = defaultNetworkConfig
  return
    ProtocolClient
      { action = Nothing,
        thParams =
          THandleParams
            { sessionId,
              thVersion,
              thServerVRange = supportedServerSMPRelayVRange,
              thAuth,
              blockSize = smpBlockSize,
              implySessId = thVersion >= authCmdsSMPVersion,
              encryptBlock = Nothing,
              batch = True,
              serviceAuth = thVersion >= serviceCertsSMPVersion
            },
        sessionTs = ts,
        client_ =
          PClient
            { connected,
              transportSession = (1, "smp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:5001", Nothing),
              transportHost = "localhost",
              tcpConnectTimeout,
              tcpTimeout,
              sendPings,
              lastReceived,
              timeoutErrorCount,
              clientCorrId,
              sentCommands,
              sndQ,
              rcvQ,
              msgQ = Nothing
            }
      }

type SMPClient = ProtocolClient SMPVersion ErrorType BrokerMsg

-- | Type for client command data
type ClientCommand msg = (EntityId, Maybe C.APrivateAuthKey, ProtoCommand msg)

-- | Type synonym for transmission from SMP servers.
-- Batch response is presented as a single `ServerTransmissionBatch` tuple.
type ServerTransmissionBatch v err msg = (TransportSession msg, THandleParams v 'TClient, NonEmpty (EntityId, ServerTransmission err msg))

data ServerTransmission err msg
  = STEvent (Either (ProtocolClientError err) msg)
  | STResponse (ProtoCommand msg) (Either (ProtocolClientError err) msg)
  | STUnexpectedError (ProtocolClientError err)

data HostMode
  = -- | prefer (or require) onion hosts when connecting via SOCKS proxy
    HMOnionViaSocks
  | -- | prefer (or require) onion hosts
    HMOnion
  | -- | prefer (or require) public hosts
    HMPublic
  deriving (Eq, Show)

textToHostMode :: Text -> Either String HostMode
textToHostMode = \case
  "public" -> Right HMPublic
  "onion" -> Right HMOnionViaSocks
  s -> Left $ T.unpack $ "Invalid host_mode: " <> s

data SocksMode
  = -- | always use SOCKS proxy when enabled
    SMAlways
  | -- | use SOCKS proxy only for .onion hosts when no public host is available
    -- This mode is used in SMP proxy and in notifications server to minimize SOCKS proxy usage.
    SMOnion
  deriving (Eq, Show)

instance StrEncoding SocksMode where
  strEncode = \case
    SMAlways -> "always"
    SMOnion -> "onion"
  strP =
    A.takeTill (== ' ') >>= \case
      "always" -> pure SMAlways
      "onion" -> pure SMOnion
      _ -> fail "Invalid Socks mode"

-- | network configuration for the client
data NetworkConfig = NetworkConfig
  { -- | use SOCKS5 proxy
    socksProxy :: Maybe SocksProxyWithAuth,
    -- | when to use SOCKS proxy
    socksMode :: SocksMode,
    -- | determines the criteria by which a host is chosen from the list
    hostMode :: HostMode,
    -- | if the above criteria are not met: return an error when this setting is True, otherwise use the first host
    requiredHostMode :: Bool,
    -- | transport sessions are created per user or per entity
    sessionMode :: TransportSessionMode,
    -- | SMP proxy mode
    smpProxyMode :: SMPProxyMode,
    -- | Fallback to direct connection when destination SMP relay does not support SMP proxy protocol extensions
    smpProxyFallback :: SMPProxyFallback,
    -- | use web port 443 for SMP protocol
    smpWebPortServers :: SMPWebPortServers,
    -- | timeout for the initial client TCP/TLS connection (microseconds)
    tcpConnectTimeout :: NetworkTimeout,
    -- | timeout of protocol commands (microseconds)
    tcpTimeout :: NetworkTimeout,
    -- | additional timeout per kilobyte (1024 bytes) to be sent
    tcpTimeoutPerKb :: Int64,
    -- | break response timeouts into groups, so later responses get later deadlines
    rcvConcurrency :: Int,
    -- | TCP keep-alive options, Nothing to skip enabling keep-alive
    tcpKeepAlive :: Maybe KeepAliveOpts,
    -- | period for SMP ping commands (microseconds, 0 to disable)
    smpPingInterval :: Int64,
    -- | the count of timeout errors after which SMP client terminates (and will be reconnected), 0 to disable
    smpPingCount :: Int,
    logTLSErrors :: Bool
  }
  deriving (Eq, Show)

data NetworkTimeout = NetworkTimeout {backgroundTimeout :: Int, interactiveTimeout :: Int}
  deriving (Eq, Show)

data NetworkRequestMode
  = NRMBackground
  | NRMInteractive' {retryCount :: Int}

pattern NRMInteractive :: NetworkRequestMode
pattern NRMInteractive = NRMInteractive' 0

netTimeoutInt :: NetworkTimeout -> NetworkRequestMode -> Int
netTimeoutInt NetworkTimeout {backgroundTimeout, interactiveTimeout} = \case
  NRMBackground -> backgroundTimeout
  NRMInteractive' n
    | n <= 0 -> interactiveTimeout
    | otherwise ->
        let (m, d)
              | n == 1 = (3, 2)
              | n == 2 = (9, 4)
              | otherwise = (27, 8)
         in (interactiveTimeout * m) `div` d

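-- Worked example (an illustrative sketch, not part of the upstream source).
-- It assumes the 'tcpTimeout' values used in 'defaultNetworkConfig' in this module:
-- each interactive retry scales the timeout by 3/2, then 9/4, and is capped at 27/8.
--
-- >>> let t = NetworkTimeout {backgroundTimeout = 30_000000, interactiveTimeout = 10_000000}
-- >>> netTimeoutInt t NRMBackground
-- 30000000
-- >>> netTimeoutInt t NRMInteractive
-- 10000000
-- >>> map (netTimeoutInt t . NRMInteractive') [1, 2, 3, 4]
-- [15000000,22500000,33750000,33750000]
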
data TransportSessionMode = TSMUser | TSMSession | TSMServer | TSMEntity
  deriving (Eq, Show)

-- SMP proxy mode for sending messages
data SMPProxyMode
  = SPMAlways
  | SPMUnknown -- use with unknown relays
  | SPMUnprotected -- use with unknown relays when IP address is not protected (i.e., when neither SOCKS proxy nor .onion address is used)
  | SPMNever
  deriving (Eq, Show)

data SMPProxyFallback
  = SPFAllow -- connect directly when chosen proxy or destination relay do not support proxy protocol.
  | SPFAllowProtected -- connect directly only when IP address is protected (SOCKS proxy or .onion address is used).
  | SPFProhibit -- prohibit direct connection to destination relay.
  deriving (Eq, Show)

data SMPWebPortServers
  = SWPAll
  | SWPPreset
  | SWPOff
  deriving (Eq, Show)

instance StrEncoding SMPProxyMode where
  strEncode = \case
    SPMAlways -> "always"
    SPMUnknown -> "unknown"
    SPMUnprotected -> "unprotected"
    SPMNever -> "never"
  strP =
    A.takeTill (== ' ') >>= \case
      "always" -> pure SPMAlways
      "unknown" -> pure SPMUnknown
      "unprotected" -> pure SPMUnprotected
      "never" -> pure SPMNever
      _ -> fail "Invalid SMP proxy mode"

instance StrEncoding SMPProxyFallback where
  strEncode = \case
    SPFAllow -> "yes"
    SPFAllowProtected -> "protected"
    SPFProhibit -> "no"
  strP =
    A.takeTill (== ' ') >>= \case
      "yes" -> pure SPFAllow
      "protected" -> pure SPFAllowProtected
      "no" -> pure SPFProhibit
      _ -> fail "Invalid SMP proxy fallback mode"

instance StrEncoding SMPWebPortServers where
  strEncode = \case
    SWPAll -> "all"
    SWPPreset -> "preset"
    SWPOff -> "off"
  strP =
    A.takeTill (== ' ') >>= \case
      "all" -> pure SWPAll
      "preset" -> pure SWPPreset
      "off" -> pure SWPOff
      _ -> fail "Invalid SMP web port setting"

defaultNetworkConfig :: NetworkConfig
defaultNetworkConfig =
  NetworkConfig
    { socksProxy = Nothing,
      socksMode = SMAlways,
      hostMode = HMOnionViaSocks,
      requiredHostMode = False,
      sessionMode = TSMSession,
      smpProxyMode = SPMNever,
      smpProxyFallback = SPFAllow,
      smpWebPortServers = SWPPreset,
      tcpConnectTimeout = NetworkTimeout {backgroundTimeout = 45_000000, interactiveTimeout = 15_000000},
      tcpTimeout = NetworkTimeout {backgroundTimeout = 30_000000, interactiveTimeout = 10_000000},
      tcpTimeoutPerKb = 5_000,
      rcvConcurrency = 8,
      tcpKeepAlive = Just defaultKeepAliveOpts,
      smpPingInterval = 600_000_000, -- 10min
      smpPingCount = 3,
      logTLSErrors = False
    }

transportClientConfig :: NetworkConfig -> NetworkRequestMode -> TransportHost -> Bool -> Maybe [ALPN] -> TransportClientConfig
transportClientConfig NetworkConfig {socksProxy, socksMode, tcpConnectTimeout, tcpKeepAlive, logTLSErrors} nm host useSNI clientALPN =
  TransportClientConfig {socksProxy = useSocksProxy socksMode, tcpConnectTimeout = tOut, tcpKeepAlive, logTLSErrors, clientCredentials = Nothing, clientALPN, useSNI}
  where
    tOut = netTimeoutInt tcpConnectTimeout nm
    socksProxy' = (\(SocksProxyWithAuth _ proxy) -> proxy) <$> socksProxy
    useSocksProxy SMAlways = socksProxy'
    useSocksProxy SMOnion = case host of
      THOnionHost _ -> socksProxy'
      _ -> Nothing

clientSocksCredentials :: ProtocolTypeI (ProtoType msg) => NetworkConfig -> UTCTime -> TransportSession msg -> Maybe SocksCredentials
clientSocksCredentials NetworkConfig {socksProxy, sessionMode} proxySessTs (userId, srv, entityId_) = case socksProxy of
  Just (SocksProxyWithAuth auth _) -> case auth of
    SocksAuthUsername {username, password} -> Just $ SocksCredentials username password
    SocksAuthNull -> Nothing
    SocksIsolateByAuth -> Just $ SocksCredentials sessionUsername ""
  Nothing -> Nothing
  where
    sessionUsername =
      B64.encode $ C.sha256Hash $
        bshow userId <> case sessionMode of
          TSMUser -> ""
          TSMSession -> ":" <> bshow proxySessTs
          TSMServer -> ":" <> bshow proxySessTs <> "@" <> strEncode srv
          TSMEntity -> ":" <> bshow proxySessTs <> "@" <> strEncode srv <> maybe "" ("/" <>) entityId_

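-- Illustration of 'clientSocksCredentials' (a sketch, not part of the upstream source;
-- the angle-bracket placeholders are notation only, not real values).
-- With 'SocksIsolateByAuth' the SOCKS username is the base64-encoded SHA256 hash of a
-- string that depends on 'sessionMode', and the password is empty:
--
--   TSMUser    : <userId>
--   TSMSession : <userId>:<proxySessTs>
--   TSMServer  : <userId>:<proxySessTs>@<server>
--   TSMEntity  : <userId>:<proxySessTs>@<server>/<entityId>
--
-- In 'TSMEntity' mode the "/<entityId>" suffix is omitted when the transport session
-- has no entity ID. Presumably the intent is that sessions with different keys present
-- different credentials to the SOCKS proxy.
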
-- | protocol client configuration.
data ProtocolClientConfig v = ProtocolClientConfig
  { -- | size of TBQueue to use for server commands and responses
    qSize :: Natural,
    -- | default server port if port is not specified in ProtocolServer
    defaultTransport :: (ServiceName, ATransport 'TClient),
    -- | network configuration
    networkConfig :: NetworkConfig,
    clientALPN :: Maybe [ALPN],
    serviceCredentials :: Maybe ServiceCredentials,
    -- | client-server protocol version range
    serverVRange :: VersionRange v,
    -- | agree shared session secret (used in SMP proxy for additional encryption layer)
    agreeSecret :: Bool,
    -- | Whether connecting client is a proxy server. See comment in ClientHandshake
    proxyServer :: Bool,
    -- | send SNI to server, False for SMP
    useSNI :: Bool
  }

-- | Default protocol client configuration.
defaultClientConfig :: Maybe [ALPN] -> Bool -> VersionRange v -> ProtocolClientConfig v
defaultClientConfig clientALPN useSNI serverVRange =
  ProtocolClientConfig
    { qSize = 64,
      defaultTransport = ("443", transport @TLS),
      networkConfig = defaultNetworkConfig,
      clientALPN,
      serviceCredentials = Nothing,
      serverVRange,
      agreeSecret = False,
      proxyServer = False,
      useSNI
    }
{-# INLINE defaultClientConfig #-}

defaultSMPClientConfig :: ProtocolClientConfig SMPVersion
defaultSMPClientConfig =
  (defaultClientConfig (Just alpnSupportedSMPHandshakes) False supportedClientSMPRelayVRange)
    { defaultTransport = (show defaultSMPPort, transport @TLS),
      agreeSecret = True
    }
{-# INLINE defaultSMPClientConfig #-}

data Request err msg = Request
  { corrId :: CorrId,
    entityId :: EntityId,
    command :: ProtoCommand msg,
    pending :: TVar Bool,
    responseVar :: TMVar (Either (ProtocolClientError err) msg)
  }

data Response err msg = Response
  { entityId :: EntityId,
    response :: Either (ProtocolClientError err) msg
  }

chooseTransportHost :: NetworkConfig -> NonEmpty TransportHost -> Either (ProtocolClientError err) TransportHost
chooseTransportHost NetworkConfig {socksProxy, hostMode, requiredHostMode} hosts =
  firstOrError $ case hostMode of
    HMOnionViaSocks -> maybe publicHost (const onionHost) socksProxy
    HMOnion -> onionHost
    HMPublic -> publicHost
  where
    firstOrError
      | requiredHostMode = maybe (Left PCEIncompatibleHost) Right
      | otherwise = Right . fromMaybe (L.head hosts)
    isOnionHost = \case THOnionHost _ -> True; _ -> False
    onionHost = find isOnionHost hosts
    publicHost = find (not . isOnionHost) hosts

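-- Illustration of 'chooseTransportHost' (a sketch, not part of the upstream source).
-- For a host list that contains one public host followed by one 'THOnionHost':
--
--   hostMode = HMPublic                               -> the public host
--   hostMode = HMOnion                                -> the onion host
--   hostMode = HMOnionViaSocks, socksProxy = Just _   -> the onion host
--   hostMode = HMOnionViaSocks, socksProxy = Nothing  -> the public host
--
-- When the list has no host of the preferred kind, the result is
-- @Left PCEIncompatibleHost@ if 'requiredHostMode' is True,
-- and the first host in the list otherwise.
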
protocolClientServer :: ProtocolTypeI (ProtoType msg) => ProtocolClient v err msg -> String
protocolClientServer = B.unpack . strEncode . protocolClientServer'
{-# INLINE protocolClientServer #-}

protocolClientServer' :: ProtocolClient v err msg -> ProtoServer msg
protocolClientServer' = snd3 . transportSession . client_
  where
    snd3 (_, s, _) = s
{-# INLINE protocolClientServer' #-}

transportHost' :: ProtocolClient v err msg -> TransportHost
transportHost' = transportHost . client_
{-# INLINE transportHost' #-}

transportSession' :: ProtocolClient v err msg -> TransportSession msg
transportSession' = transportSession . client_
{-# INLINE transportSession' #-}

type UserId = Int64

-- | Transport session key - includes entity ID if `sessionMode = TSMEntity`.
-- Please note that for SMP the connection ID is used as the entity ID, not the queue ID.
type TransportSession msg = (UserId, ProtoServer msg, Maybe ByteString)

type SMPTransportSession = TransportSession BrokerMsg

-- | Connects to 'ProtocolServer' using passed client configuration
-- and queue for messages and notifications.
--
-- A single queue can be used for multiple 'SMPClient' instances,
-- as 'ServerTransmissionBatch' includes server information.
getProtocolClient :: forall v err msg. Protocol v err msg => TVar ChaChaDRG -> NetworkRequestMode -> TransportSession msg -> ProtocolClientConfig v -> [HostName] -> Maybe (TBQueue (ServerTransmissionBatch v err msg)) -> UTCTime -> (ProtocolClient v err msg -> IO ()) -> IO (Either (ProtocolClientError err) (ProtocolClient v err msg))
getProtocolClient g nm transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize, networkConfig, clientALPN, serviceCredentials, serverVRange, agreeSecret, proxyServer, useSNI} presetDomains msgQ proxySessTs disconnected = do
  case chooseTransportHost networkConfig (host srv) of
    Right useHost ->
      (getCurrentTime >>= mkProtocolClient useHost >>= runClient useTransport useHost)
        `E.catches` clientHandlers
    Left e -> pure $ Left e
  where
    NetworkConfig {tcpConnectTimeout, tcpTimeout, smpPingInterval} = networkConfig
    mkProtocolClient :: TransportHost -> UTCTime -> IO (PClient v err msg)
    mkProtocolClient transportHost ts = do
      connected <- newTVarIO False
      sendPings <- newTVarIO False
      lastReceived <- newTVarIO ts
      timeoutErrorCount <- newTVarIO 0
      clientCorrId <- atomically $ C.newRandomDRG g
      sentCommands <- TM.emptyIO
      sndQ <- newTBQueueIO qSize
      rcvQ <- newTBQueueIO qSize
      return
        PClient
          { connected,
            transportSession,
            transportHost,
            tcpConnectTimeout,
            tcpTimeout,
            sendPings,
            lastReceived,
            timeoutErrorCount,
            clientCorrId,
            sentCommands,
            sndQ,
            rcvQ,
            msgQ
          }

    runClient :: (ServiceName, ATransport 'TClient) -> TransportHost -> PClient v err msg -> IO (Either (ProtocolClientError err) (ProtocolClient v err msg))
    runClient (port', ATransport t) useHost c = do
      cVar <- newEmptyTMVarIO
      let tcConfig = (transportClientConfig networkConfig nm useHost useSNI useALPN) {clientCredentials = serviceCreds <$> serviceCredentials}
          socksCreds = clientSocksCredentials networkConfig proxySessTs transportSession
      tId <-
        runTransportClient tcConfig socksCreds useHost port' (Just $ keyHash srv) (client t c cVar)
          `forkFinally` \r ->
            let err = either toNetworkError (const NEFailedError) r
             in void $ atomically $ tryPutTMVar cVar $ Left $ PCENetworkError err
      c_ <- netTimeoutInt tcpConnectTimeout nm `timeout` atomically (takeTMVar cVar)
      case c_ of
        Just (Right c') -> mkWeakThreadId tId >>= \tId' -> pure $ Right c' {action = Just tId'}
        Just (Left e) -> pure $ Left e
        Nothing -> killThread tId $> Left (PCENetworkError NETimeoutError)

    useTransport :: (ServiceName, ATransport 'TClient)
    useTransport = case port srv of
      "" -> case protocolTypeI @(ProtoType msg) of
        SPSMP | web -> ("443", transport @TLS)
        _ -> defaultTransport cfg
      p -> (p, transport @TLS)

    useALPN :: Maybe [ALPN]
    useALPN = if web then Just [httpALPN11] else clientALPN

    web = useWebPort networkConfig presetDomains srv

    client :: forall c. Transport c => TProxy c 'TClient -> PClient v err msg -> TMVar (Either (ProtocolClientError err) (ProtocolClient v err msg)) -> c 'TClient -> IO ()
    client _ c cVar h = do
      ks <- if agreeSecret then Just <$> atomically (C.generateKeyPair g) else pure Nothing
      serviceKeys_ <- mapM (\creds -> (creds,) <$> atomically (C.generateKeyPair g)) serviceCredentials
      runExceptT (protocolClientHandshake @v @err @msg h ks (keyHash srv) serverVRange proxyServer serviceKeys_) >>= \case
        Left e -> atomically . putTMVar cVar . Left $ PCETransportError e
        Right th@THandle {params} -> do
          sessionTs <- getCurrentTime
          let c' = ProtocolClient {action = Nothing, client_ = c, thParams = params, sessionTs}
          atomically $ writeTVar (lastReceived c) sessionTs
          atomically $ do
            writeTVar (connected c) True
            putTMVar cVar $ Right c'
          raceAny_ ([send c' th, process c', receive c' th] <> [monitor c' | smpPingInterval > 0])
            `E.finally` disconnected c'

send :: Transport c => ProtocolClient v err msg -> THandle v c 'TClient -> IO ()
|
||
send ProtocolClient {client_ = PClient {sndQ}} h = forever $ atomically (readTBQueue sndQ) >>= sendPending
|
||
where
|
||
sendPending (r, s) = case r of
|
||
Nothing -> void $ tPutLog h s
|
||
Just Request {pending, responseVar} ->
|
||
whenM (readTVarIO pending) $ tPutLog h s >>= either responseErr pure
|
||
where
|
||
responseErr = atomically . putTMVar responseVar . Left . PCETransportError
|
||
|
||
receive :: Transport c => ProtocolClient v err msg -> THandle v c 'TClient -> IO ()
|
||
receive ProtocolClient {client_ = PClient {rcvQ, lastReceived, timeoutErrorCount}} h = forever $ do
|
||
tGetClient h >>= atomically . writeTBQueue rcvQ
|
||
getCurrentTime >>= atomically . writeTVar lastReceived
|
||
atomically $ writeTVar timeoutErrorCount 0
|
||
|
||
monitor :: ProtocolClient v err msg -> IO ()
|
||
monitor c@ProtocolClient {client_ = PClient {sendPings, lastReceived, timeoutErrorCount}} = loop smpPingInterval
|
||
where
|
||
loop :: Int64 -> IO ()
|
||
loop delay = do
|
||
threadDelay' delay
|
||
diff <- diffUTCTime <$> getCurrentTime <*> readTVarIO lastReceived
|
||
let idle = diffToMicroseconds diff
|
||
remaining = smpPingInterval - idle
|
||
if remaining > 1_000_000 -- delay pings only for significant time
|
||
then loop remaining
|
||
else do
|
||
whenM (readTVarIO sendPings) $ void . runExceptT $ sendProtocolCommand c NRMBackground Nothing NoEntity (protocolPing @v @err @msg)
|
||
-- sendProtocolCommand/getResponse updates counter for each command
|
||
cnt <- readTVarIO timeoutErrorCount
|
||
-- drop client when maxCnt of commands have timed out in sequence, but only after some time has passed after last received response
|
||
when (maxCnt == 0 || cnt < maxCnt || diff < recoverWindow) $ loop smpPingInterval
|
||
recoverWindow = 15 * 60 -- seconds
|
||
maxCnt = smpPingCount networkConfig
|
||
|
||
process :: ProtocolClient v err msg -> IO ()
|
||
process c = forever $ atomically (readTBQueue $ rcvQ $ client_ c) >>= processMsgs c
|
||
|
||
processMsgs :: ProtocolClient v err msg -> NonEmpty (Transmission (Either err msg)) -> IO ()
|
||
processMsgs c ts = do
|
||
ts' <- catMaybes <$> mapM (processMsg c) (L.toList ts)
|
||
forM_ msgQ $ \q ->
|
||
mapM_ (atomically . writeTBQueue q . serverTransmission c) (L.nonEmpty ts')
|
||
|
||
processMsg :: ProtocolClient v err msg -> Transmission (Either err msg) -> IO (Maybe (EntityId, ServerTransmission err msg))
|
||
processMsg ProtocolClient {client_ = PClient {sentCommands}} (corrId, entId, respOrErr)
|
||
| B.null $ bs corrId = sendMsg $ STEvent clientResp
|
||
| otherwise =
|
||
TM.lookupIO corrId sentCommands >>= \case
|
||
Nothing -> sendMsg $ STUnexpectedError unexpected
|
||
Just Request {entityId, command, pending, responseVar} -> do
|
||
wasPending <-
|
||
atomically $ do
|
||
TM.delete corrId sentCommands
|
||
ifM
|
||
(swapTVar pending False)
|
||
(True <$ tryPutTMVar responseVar (if entityId == entId then clientResp else Left unexpected))
|
||
(pure False)
|
||
if wasPending
|
||
then pure Nothing
|
||
else sendMsg $ if entityId == entId then STResponse command clientResp else STUnexpectedError unexpected
|
||
where
|
||
unexpected = unexpectedResponse respOrErr
|
||
clientResp = case respOrErr of
|
||
Left e -> Left $ PCEResponseError e
|
||
Right r -> case protocolError r of
|
||
Just e -> Left $ PCEProtocolError e
|
||
_ -> Right r
|
||
sendMsg :: ServerTransmission err msg -> IO (Maybe (EntityId, ServerTransmission err msg))
|
||
sendMsg t = case msgQ of
|
||
Just _ -> pure $ Just (entId, t)
|
||
Nothing ->
|
||
Nothing <$ case clientResp of
|
||
Left e -> logError $ "SMP client error: " <> tshow e
|
||
Right _ -> logWarn "SMP client unprocessed event"
|
||
|
||
clientHandlers :: [Handler (Either (ProtocolClientError e) a)]
|
||
clientHandlers =
|
||
[ Handler $ \(e :: IOException) -> pure $ Left $ PCEIOError $ E.displayException e,
|
||
Handler $ \(e :: SomeAsyncException) -> E.throwIO e,
|
||
Handler $ \(e :: SomeException) -> pure $ Left $ PCENetworkError $ toNetworkError e
|
||
]
|
||
|
||
useWebPort :: NetworkConfig -> [HostName] -> ProtocolServer p -> Bool
useWebPort cfg presetDomains ProtocolServer {host = h :| _} = case smpWebPortServers cfg of
  SWPAll -> True
  SWPPreset -> isPresetDomain presetDomains h
  SWPOff -> False

isPresetDomain :: [HostName] -> TransportHost -> Bool
isPresetDomain presetDomains = \case
  THDomainName h -> any (`isSuffixOf` h) presetDomains
  _ -> False
|
||
unexpectedResponse :: Show r => r -> ProtocolClientError err
|
||
unexpectedResponse = PCEUnexpectedResponse . B.pack . take 32 . show
|
||
|
||
-- | Disconnects client from the server and terminates client threads.
closeProtocolClient :: ProtocolClient v err msg -> IO ()
closeProtocolClient = mapM_ (deRefWeak >=> mapM_ killThread) . action
{-# INLINE closeProtocolClient #-}
|
||
-- | SMP client error type.
data ProtocolClientError err
  = -- | Correctly parsed SMP server ERR response.
    -- This error is forwarded to the agent client as `ERR SMP err`.
    PCEProtocolError err
  | -- | Invalid server response that failed to parse.
    -- Forwarded to the agent client as `ERR BROKER RESPONSE`.
    PCEResponseError err
  | -- | Different response from what is expected to a certain SMP command,
    -- e.g. server should respond `IDS` or `ERR` to `NEW` command,
    -- other responses would result in this error.
    -- Forwarded to the agent client as `ERR BROKER UNEXPECTED`.
    PCEUnexpectedResponse ByteString
  | -- | Used for TCP connection and command response timeouts.
    -- Forwarded to the agent client as `ERR BROKER TIMEOUT`.
    PCEResponseTimeout
  | -- | Failure to establish TCP connection.
    -- Forwarded to the agent client as `ERR BROKER NETWORK`.
    PCENetworkError NetworkError
  | -- | No host compatible with network configuration
    PCEIncompatibleHost
  | -- | Service is unavailable for command that requires service connection
    PCEServiceUnavailable
  | -- | TCP transport handshake or some other transport error.
    -- Forwarded to the agent client as `ERR BROKER TRANSPORT e`.
    PCETransportError TransportError
  | -- | Error when cryptographically "signing" the command or when initializing crypto_box.
    PCECryptoError C.CryptoError
  | -- | IO Error
    PCEIOError String
  deriving (Eq, Show, Exception)
|
||
type SMPClientError = ProtocolClientError ErrorType
|
||
|
||
temporaryClientError :: ProtocolClientError err -> Bool
temporaryClientError = \case
  PCENetworkError _ -> True
  PCEResponseTimeout -> True
  PCEIOError _ -> True
  _ -> False
{-# INLINE temporaryClientError #-}
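-- A hedged usage sketch, not part of this module: a caller could use this predicate
-- to retry a command once when the failure is temporary. The helper name `retryOnce`
-- is hypothetical; `catchE` and `throwE` are from Control.Monad.Trans.Except.
--
-- > retryOnce :: ExceptT (ProtocolClientError err) IO a -> ExceptT (ProtocolClientError err) IO a
-- > retryOnce cmd = cmd `catchE` \e -> if temporaryClientError e then cmd else throwE e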
|
||
|
||
-- it is consistent with clientServiceError
|
||
smpClientServiceError :: SMPClientError -> Bool
|
||
smpClientServiceError = \case
|
||
PCEServiceUnavailable -> True
|
||
PCEProtocolError SERVICE -> True
|
||
PCEProtocolError (PROXY (BROKER NO_SERVICE)) -> True -- for completeness, it cannot happen.
|
||
_ -> False
|
||
|
||
-- converts error of client running on proxy to the error sent to client connected to proxy
smpProxyError :: SMPClientError -> ErrorType
smpProxyError = \case
  PCEProtocolError e -> PROXY $ PROTOCOL e
  PCEResponseError e -> PROXY $ BROKER $ RESPONSE $ B.unpack $ strEncode e
  PCEUnexpectedResponse e -> PROXY $ BROKER $ UNEXPECTED $ B.unpack e
  PCEResponseTimeout -> PROXY $ BROKER TIMEOUT
  PCENetworkError e -> PROXY $ BROKER $ NETWORK e
  PCEIncompatibleHost -> PROXY $ BROKER HOST
  PCEServiceUnavailable -> PROXY $ BROKER $ NO_SERVICE -- for completeness, it cannot happen.
  PCETransportError t -> PROXY $ BROKER $ TRANSPORT t
  PCECryptoError _ -> CRYPTO
  PCEIOError _ -> INTERNAL
|
||
smpErrorClientNotice :: SMPClientError -> Maybe (Maybe ClientNotice)
|
||
smpErrorClientNotice = \case
|
||
PCEProtocolError (BLOCKED BlockingInfo {notice}) -> Just notice
|
||
_ -> Nothing
|
||
{-# INLINE smpErrorClientNotice #-}
|
||
|
||
-- | Create a new SMP queue.
|
||
--
|
||
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#create-queue-command
|
||
createSMPQueue ::
|
||
SMPClient ->
|
||
NetworkRequestMode ->
|
||
Maybe C.CbNonce -> -- used as correlation ID to allow deriving SenderId from it for short links
|
||
C.AAuthKeyPair -> -- SMP v6 - signature key pair, SMP v7 - DH key pair
|
||
RcvPublicDhKey ->
|
||
Maybe BasicAuth ->
|
||
SubscriptionMode ->
|
||
QueueReqData ->
|
||
Maybe NewNtfCreds ->
|
||
ExceptT SMPClientError IO QueueIdsKeys
|
||
createSMPQueue c nm nonce_ (rKey, rpKey) dhKey auth subMode qrd ntfCreds =
|
||
sendProtocolCommand_ c nm nonce_ Nothing (Just rpKey) NoEntity (Cmd SCreator $ NEW $ NewQueueReq rKey dhKey auth subMode (Just qrd) ntfCreds) >>= \case
|
||
IDS qik -> pure qik
|
||
r -> throwE $ unexpectedResponse r
|
||
|
||
-- | Subscribe to the SMP queue.
|
||
--
|
||
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#subscribe-to-queue
|
||
-- This command is always sent in background request mode
|
||
subscribeSMPQueue :: SMPClient -> RcvPrivateAuthKey -> RecipientId -> ExceptT SMPClientError IO (Maybe ServiceId)
|
||
subscribeSMPQueue c rpKey rId = do
|
||
liftIO $ enablePings c
|
||
sendSMPCommand c NRMBackground (Just rpKey) rId SUB >>= liftIO . processSUBResponse_ c rId >>= except
|
||
|
||
-- | Subscribe to multiple SMP queues batching commands if supported.
|
||
-- This command is always sent in background request mode
|
||
subscribeSMPQueues :: SMPClient -> NonEmpty (RecipientId, RcvPrivateAuthKey) -> IO (NonEmpty (Either SMPClientError (Maybe ServiceId)))
|
||
subscribeSMPQueues c qs = do
|
||
liftIO $ enablePings c
|
||
sendProtocolCommands c NRMBackground cs >>= mapM (processSUBResponse c)
|
||
where
|
||
cs = L.map (\(rId, rpKey) -> (rId, Just rpKey, Cmd SRecipient SUB)) qs
|
||
|
||
-- This command is always sent in background request mode
|
||
streamSubscribeSMPQueues :: SMPClient -> NonEmpty (RecipientId, RcvPrivateAuthKey) -> ([(RecipientId, Either SMPClientError (Maybe ServiceId))] -> IO ()) -> IO ()
|
||
streamSubscribeSMPQueues c qs cb = streamProtocolCommands c NRMBackground cs $ mapM process >=> cb
|
||
where
|
||
cs = L.map (\(rId, rpKey) -> (rId, Just rpKey, Cmd SRecipient SUB)) qs
|
||
process r@(Response rId _) = (rId,) <$> processSUBResponse c r
|
||
|
||
processSUBResponse :: SMPClient -> Response ErrorType BrokerMsg -> IO (Either SMPClientError (Maybe ServiceId))
|
||
processSUBResponse c (Response rId r) = pure r $>>= processSUBResponse_ c rId
|
||
|
||
processSUBResponse_ :: SMPClient -> RecipientId -> BrokerMsg -> IO (Either SMPClientError (Maybe ServiceId))
|
||
processSUBResponse_ c rId = \case
|
||
OK -> pure $ Right Nothing
|
||
SOK serviceId_ -> pure $ Right serviceId_
|
||
cmd@MSG {} -> writeSMPMessage c rId cmd $> Right Nothing
|
||
r' -> pure . Left $ unexpectedResponse r'
|
||
|
||
writeSMPMessage :: SMPClient -> RecipientId -> BrokerMsg -> IO ()
|
||
writeSMPMessage c rId msg = atomically $ mapM_ (`writeTBQueue` serverTransmission c [(rId, STEvent (Right msg))]) (msgQ $ client_ c)
|
||
|
||
serverTransmission :: ProtocolClient v err msg -> NonEmpty (RecipientId, ServerTransmission err msg) -> ServerTransmissionBatch v err msg
|
||
serverTransmission ProtocolClient {thParams, client_ = PClient {transportSession}} ts = (transportSession, thParams, ts)
|
||
|
||
-- | Get message from SMP queue. The server returns ERR PROHIBITED if a client uses SUB and GET via the same transport connection for the same queue.
--
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#receive-a-message-from-the-queue
-- This command is always sent in interactive request mode, as NSE has limited time
getSMPMessage :: SMPClient -> RcvPrivateAuthKey -> RecipientId -> ExceptT SMPClientError IO (Maybe RcvMessage)
getSMPMessage c rpKey rId =
  sendSMPCommand c NRMInteractive (Just rpKey) rId GET >>= \case
    OK -> pure Nothing
    cmd@(MSG msg) -> liftIO (writeSMPMessage c rId cmd) $> Just msg
    r -> throwE $ unexpectedResponse r
{-# INLINE getSMPMessage #-}
|
||
-- | Subscribe to the SMP queue notifications.
|
||
--
|
||
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#subscribe-to-queue-notifications
|
||
-- This command is always sent in background request mode
|
||
subscribeSMPQueueNotifications :: SMPClient -> NtfPrivateAuthKey -> NotifierId -> ExceptT SMPClientError IO (Maybe ServiceId)
|
||
subscribeSMPQueueNotifications c npKey nId = do
|
||
liftIO $ enablePings c
|
||
sendSMPCommand c NRMBackground (Just npKey) nId NSUB >>= except . nsubResponse_
|
||
|
||
-- | Subscribe to notifications for multiple SMP queues, batching commands if supported.
|
||
-- This command is always sent in background request mode
|
||
subscribeSMPQueuesNtfs :: SMPClient -> NonEmpty (NotifierId, NtfPrivateAuthKey) -> IO (NonEmpty (Either SMPClientError (Maybe ServiceId)))
|
||
subscribeSMPQueuesNtfs c qs = do
|
||
liftIO $ enablePings c
|
||
L.map nsubResponse <$> sendProtocolCommands c NRMBackground cs
|
||
where
|
||
cs = L.map (\(nId, npKey) -> (nId, Just npKey, Cmd SNotifier NSUB)) qs
|
||
|
||
nsubResponse :: Response ErrorType BrokerMsg -> Either SMPClientError (Maybe ServiceId)
|
||
nsubResponse (Response _ r) = r >>= nsubResponse_
|
||
{-# INLINE nsubResponse #-}
|
||
|
||
nsubResponse_ :: BrokerMsg -> Either SMPClientError (Maybe ServiceId)
|
||
nsubResponse_ = \case
|
||
OK -> Right Nothing
|
||
SOK serviceId_ -> Right serviceId_
|
||
r' -> Left $ unexpectedResponse r'
|
||
{-# INLINE nsubResponse_ #-}
|
||
|
||
-- This command is always sent in background request mode
|
||
subscribeService :: forall p. (PartyI p, ServiceParty p) => SMPClient -> SParty p -> Int64 -> IdsHash -> ExceptT SMPClientError IO ServiceSub
|
||
subscribeService c party n idsHash = case smpClientService c of
|
||
Just THClientService {serviceId, serviceKey} -> do
|
||
liftIO $ enablePings c
|
||
sendSMPCommand c NRMBackground (Just (C.APrivateAuthKey C.SEd25519 serviceKey)) serviceId subCmd >>= \case
|
||
SOKS n' idsHash' -> pure $ ServiceSub serviceId n' idsHash'
|
||
r -> throwE $ unexpectedResponse r
|
||
where
|
||
subCmd :: Command p
|
||
subCmd = case party of
|
||
SRecipientService -> SUBS n idsHash
|
||
SNotifierService -> NSUBS n idsHash
|
||
Nothing -> throwE PCEServiceUnavailable
|
||
|
||
smpClientService :: SMPClient -> Maybe THClientService
|
||
smpClientService = thAuth . thParams >=> clientService
|
||
{-# INLINE smpClientService #-}
|
||
|
||
smpClientServiceId :: SMPClient -> Maybe ServiceId
|
||
smpClientServiceId = fmap (\THClientService {serviceId} -> serviceId) . smpClientService
|
||
{-# INLINE smpClientServiceId #-}
|
||
|
||
enablePings :: SMPClient -> IO ()
|
||
enablePings ProtocolClient {client_ = PClient {sendPings}} = atomically $ writeTVar sendPings True
|
||
{-# INLINE enablePings #-}
|
||
|
||
-- | Secure the SMP queue by adding a sender public key.
|
||
--
|
||
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#secure-queue-command
|
||
secureSMPQueue :: SMPClient -> NetworkRequestMode -> RcvPrivateAuthKey -> RecipientId -> SndPublicAuthKey -> ExceptT SMPClientError IO ()
|
||
secureSMPQueue c nm rpKey rId senderKey = okSMPCommand (KEY senderKey) c nm rpKey rId
|
||
{-# INLINE secureSMPQueue #-}
|
||
|
||
-- | Secure the SMP queue via sender queue ID.
|
||
secureSndSMPQueue :: SMPClient -> NetworkRequestMode -> SndPrivateAuthKey -> SenderId -> ExceptT SMPClientError IO ()
|
||
secureSndSMPQueue c nm spKey sId = okSMPCommand (SKEY $ C.toPublic spKey) c nm spKey sId
|
||
{-# INLINE secureSndSMPQueue #-}
|
||
|
||
proxySecureSndSMPQueue :: SMPClient -> NetworkRequestMode -> ProxiedRelay -> SndPrivateAuthKey -> SenderId -> ExceptT SMPClientError IO (Either ProxyClientError ())
|
||
proxySecureSndSMPQueue c nm proxiedRelay spKey sId = proxyOKSMPCommand c nm proxiedRelay (Just spKey) sId (SKEY $ C.toPublic spKey)
|
||
{-# INLINE proxySecureSndSMPQueue #-}
|
||
|
||
-- | Add or update data for queue link
addSMPQueueLink :: SMPClient -> NetworkRequestMode -> RcvPrivateAuthKey -> RecipientId -> LinkId -> QueueLinkData -> ExceptT SMPClientError IO ()
addSMPQueueLink c nm rpKey rId lnkId d = okSMPCommand (LSET lnkId d) c nm rpKey rId
{-# INLINE addSMPQueueLink #-}
|
||
-- | Delete queue link
|
||
deleteSMPQueueLink :: SMPClient -> NetworkRequestMode -> RcvPrivateAuthKey -> RecipientId -> ExceptT SMPClientError IO ()
|
||
deleteSMPQueueLink = okSMPCommand LDEL
|
||
{-# INLINE deleteSMPQueueLink #-}
|
||
|
||
-- | Get 1-time invitation SMP queue link data and secure the queue via queue link ID.
secureGetSMPQueueLink :: SMPClient -> NetworkRequestMode -> SndPrivateAuthKey -> LinkId -> ExceptT SMPClientError IO (SenderId, QueueLinkData)
secureGetSMPQueueLink c nm spKey lnkId =
  sendSMPCommand c nm (Just spKey) lnkId (LKEY $ C.toPublic spKey) >>= \case
    LNK sId d -> pure (sId, d)
    r -> throwE $ unexpectedResponse r
|
||
proxySecureGetSMPQueueLink :: SMPClient -> NetworkRequestMode -> ProxiedRelay -> SndPrivateAuthKey -> LinkId -> ExceptT SMPClientError IO (Either ProxyClientError (SenderId, QueueLinkData))
|
||
proxySecureGetSMPQueueLink c nm proxiedRelay spKey lnkId =
|
||
proxySMPCommand c nm proxiedRelay (Just spKey) lnkId (LKEY $ C.toPublic spKey) >>= \case
|
||
Right (LNK sId d) -> pure $ Right (sId, d)
|
||
Right r -> throwE $ unexpectedResponse r
|
||
Left e -> pure $ Left e
|
||
|
||
-- | Get contact address SMP queue link data.
|
||
getSMPQueueLink :: SMPClient -> NetworkRequestMode -> LinkId -> ExceptT SMPClientError IO (SenderId, QueueLinkData)
|
||
getSMPQueueLink c nm lnkId =
|
||
sendSMPCommand c nm Nothing lnkId LGET >>= \case
|
||
LNK sId d -> pure (sId, d)
|
||
r -> throwE $ unexpectedResponse r
|
||
|
||
-- LGET command - get short link data
|
||
proxyGetSMPQueueLink :: SMPClient -> NetworkRequestMode -> ProxiedRelay -> LinkId -> ExceptT SMPClientError IO (Either ProxyClientError (SenderId, QueueLinkData))
|
||
proxyGetSMPQueueLink c nm proxiedRelay lnkId =
|
||
proxySMPCommand c nm proxiedRelay Nothing lnkId LGET >>= \case
|
||
Right (LNK sId d) -> pure $ Right (sId, d)
|
||
Right r -> throwE $ unexpectedResponse r
|
||
Left e -> pure $ Left e
|
||
|
||
-- | Enable notifications for the queue for push notifications server.
|
||
--
|
||
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#enable-notifications-command
|
||
enableSMPQueueNotifications :: SMPClient -> RcvPrivateAuthKey -> RecipientId -> NtfPublicAuthKey -> RcvNtfPublicDhKey -> ExceptT SMPClientError IO (NotifierId, RcvNtfPublicDhKey)
|
||
enableSMPQueueNotifications c rpKey rId notifierKey rcvNtfPublicDhKey =
|
||
sendSMPCommand c NRMBackground (Just rpKey) rId (NKEY notifierKey rcvNtfPublicDhKey) >>= \case
|
||
NID nId rcvNtfSrvPublicDhKey -> pure (nId, rcvNtfSrvPublicDhKey)
|
||
r -> throwE $ unexpectedResponse r
|
||
|
||
-- | Enable notifications for multiple queues for push notifications server.
|
||
-- This command is always sent in background request mode
|
||
enableSMPQueuesNtfs :: SMPClient -> NonEmpty (RecipientId, RcvPrivateAuthKey, NtfPublicAuthKey, RcvNtfPublicDhKey) -> IO (NonEmpty (Either SMPClientError (NotifierId, RcvNtfPublicDhKey)))
|
||
enableSMPQueuesNtfs c qs = L.map process <$> sendProtocolCommands c NRMBackground cs
|
||
where
|
||
cs = L.map (\(rId, rpKey, notifierKey, rcvNtfPublicDhKey) -> (rId, Just rpKey, Cmd SRecipient $ NKEY notifierKey rcvNtfPublicDhKey)) qs
|
||
process (Response _ r) = case r of
|
||
Right (NID nId rcvNtfSrvPublicDhKey) -> Right (nId, rcvNtfSrvPublicDhKey)
|
||
Right r' -> Left $ unexpectedResponse r'
|
||
Left e -> Left e
|
||
|
||
-- | Disable notifications for the queue for push notifications server.
|
||
--
|
||
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#disable-notifications-command
|
||
-- This command is always sent in background request mode
|
||
disableSMPQueueNotifications :: SMPClient -> RcvPrivateAuthKey -> RecipientId -> ExceptT SMPClientError IO ()
|
||
disableSMPQueueNotifications c = okSMPCommand NDEL c NRMBackground
|
||
{-# INLINE disableSMPQueueNotifications #-}
|
||
|
||
-- | Disable notifications for multiple queues for push notifications server.
|
||
-- This command is always sent in background request mode
|
||
disableSMPQueuesNtfs :: SMPClient -> NonEmpty (RecipientId, RcvPrivateAuthKey) -> IO (NonEmpty (Either SMPClientError ()))
|
||
disableSMPQueuesNtfs c = okSMPCommands NDEL c NRMBackground
|
||
{-# INLINE disableSMPQueuesNtfs #-}
|
||
|
||
-- | Send SMP message.
|
||
--
|
||
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#send-message
|
||
sendSMPMessage :: SMPClient -> NetworkRequestMode -> Maybe SndPrivateAuthKey -> SenderId -> MsgFlags -> MsgBody -> ExceptT SMPClientError IO ()
|
||
sendSMPMessage c nm spKey sId flags msg =
|
||
sendSMPCommand c nm spKey sId (SEND flags msg) >>= \case
|
||
OK -> pure ()
|
||
r -> throwE $ unexpectedResponse r
|
||
|
||
proxySMPMessage :: SMPClient -> NetworkRequestMode -> ProxiedRelay -> Maybe SndPrivateAuthKey -> SenderId -> MsgFlags -> MsgBody -> ExceptT SMPClientError IO (Either ProxyClientError ())
|
||
proxySMPMessage c nm proxiedRelay spKey sId flags msg = proxyOKSMPCommand c nm proxiedRelay spKey sId (SEND flags msg)
|
||
|
||
-- | Acknowledge message delivery (server deletes the message).
|
||
--
|
||
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#acknowledge-message-delivery
|
||
-- This command is always sent in background request mode
|
||
ackSMPMessage :: SMPClient -> RcvPrivateAuthKey -> QueueId -> MsgId -> ExceptT SMPClientError IO ()
|
||
ackSMPMessage c rpKey rId msgId =
|
||
sendSMPCommand c NRMBackground (Just rpKey) rId (ACK msgId) >>= \case
|
||
OK -> return ()
|
||
cmd@MSG {} -> liftIO $ writeSMPMessage c rId cmd
|
||
r -> throwE $ unexpectedResponse r
|
||
|
||
-- | Irreversibly suspend SMP queue.
|
||
-- The existing messages from the queue will still be delivered.
|
||
--
|
||
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#suspend-queue
|
||
suspendSMPQueue :: SMPClient -> NetworkRequestMode -> RcvPrivateAuthKey -> QueueId -> ExceptT SMPClientError IO ()
|
||
suspendSMPQueue = okSMPCommand OFF
|
||
{-# INLINE suspendSMPQueue #-}
|
||
|
||
-- | Irreversibly delete SMP queue and all messages in it.
|
||
--
|
||
-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#delete-queue
|
||
deleteSMPQueue :: SMPClient -> NetworkRequestMode -> RcvPrivateAuthKey -> RecipientId -> ExceptT SMPClientError IO ()
|
||
deleteSMPQueue = okSMPCommand DEL
|
||
{-# INLINE deleteSMPQueue #-}
|
||
|
||
-- | Delete multiple SMP queues batching commands if supported.
|
||
deleteSMPQueues :: SMPClient -> NetworkRequestMode -> NonEmpty (RecipientId, RcvPrivateAuthKey) -> IO (NonEmpty (Either SMPClientError ()))
|
||
deleteSMPQueues = okSMPCommands DEL
|
||
{-# INLINE deleteSMPQueues #-}
|
||
|
||
-- send PRXY :: SMPServer -> Maybe BasicAuth -> Command Sender
|
||
-- receives PKEY :: SessionId -> X.CertificateChain -> X.SignedExact X.PubKey -> BrokerMsg
|
||
connectSMPProxiedRelay :: SMPClient -> NetworkRequestMode -> SMPServer -> Maybe BasicAuth -> ExceptT SMPClientError IO ProxiedRelay
|
||
connectSMPProxiedRelay c@ProtocolClient {client_ = PClient {tcpConnectTimeout, tcpTimeout}} nm relayServ@ProtocolServer {port = relayPort, keyHash = C.KeyHash kh} proxyAuth
|
||
| thVersion (thParams c) >= sendingProxySMPVersion =
|
||
sendProtocolCommand_ c nm Nothing tOut Nothing NoEntity (Cmd SProxiedClient (PRXY relayServ proxyAuth)) >>= \case
|
||
PKEY sId vr (CertChainPubKey chain key) ->
|
||
case supportedClientSMPRelayVRange `compatibleVersion` vr of
|
||
Nothing -> throwE $ transportErr TEVersion
|
||
Just (Compatible v) -> do
|
||
relayKey <- liftEitherWith (const $ transportErr $ TEHandshake IDENTITY) =<< liftIO (runExceptT $ validateRelay chain key)
|
||
pure $ ProxiedRelay sId v proxyAuth relayKey
|
||
r -> throwE $ unexpectedResponse r
|
||
| otherwise = throwE $ PCETransportError TEVersion
|
||
where
|
||
tOut = Just $ netTimeoutInt tcpConnectTimeout nm + netTimeoutInt tcpTimeout nm
|
||
transportErr = PCEProtocolError . PROXY . BROKER . TRANSPORT
|
||
hostName = B.unpack $ strEncode $ transportHost' c
|
||
validateRelay :: X.CertificateChain -> X.SignedExact X.PubKey -> ExceptT String IO C.PublicKeyX25519
|
||
validateRelay chain exact = case chainIdCaCerts chain of
|
||
CCValid {leafCert, idCert, caCert}
|
||
| XV.Fingerprint kh == XV.getFingerprint idCert X.HashSHA256 -> do
|
||
errs <- liftIO $ x509validate caCert (hostName, B.pack relayPort) chain
|
||
unless (null errs) $ throwError "bad certificate"
|
||
serverKey <- liftEither $ C.x509ToPublic' $ X.certPubKey $ X.signedObject $ X.getSigned leafCert
|
||
liftEither $ C.x509ToPublic' =<< C.verifyX509 serverKey exact
|
||
_ -> throwError "bad certificate"
|
||
|
||
data ProxiedRelay = ProxiedRelay
  { prSessionId :: SessionId,
    prVersion :: VersionSMP,
    prBasicAuth :: Maybe BasicAuth, -- auth is included here to allow reconnecting via the same proxy after NO_SESSION error
    prServerKey :: C.PublicKeyX25519
  }
|
||
data ProxyClientError
  = -- | protocol error response from proxy
    ProxyProtocolError {protocolErr :: ErrorType}
  | -- | unexpected response
    ProxyUnexpectedResponse {responseStr :: String}
  | -- | error between proxy and server
    ProxyResponseError {responseErr :: ErrorType}
  deriving (Eq, Show, Exception)

instance StrEncoding ProxyClientError where
  strEncode = \case
    ProxyProtocolError e -> "PROTOCOL " <> strEncode e
    ProxyUnexpectedResponse s -> "UNEXPECTED " <> B.pack s
    ProxyResponseError e -> "SYNTAX " <> strEncode e
  strP =
    A.takeTill (== ' ') >>= \case
      "PROTOCOL" -> ProxyProtocolError <$> _strP
      "UNEXPECTED" -> ProxyUnexpectedResponse . B.unpack <$> (A.space *> A.takeByteString)
      "SYNTAX" -> ProxyResponseError <$> _strP
      _ -> fail "bad ProxyClientError"
|
||
-- consider how to process slow responses - is it handled somehow locally or delegated to the caller
|
||
-- this method is used in the client
|
||
-- sends PFWD :: C.PublicKeyX25519 -> EncTransmission -> Command Sender
|
||
-- receives PRES :: EncResponse -> BrokerMsg -- proxy to client
|
||
|
||
-- When client sends message via proxy, there may be one successful scenario and 9 error scenarios
-- as shown below (WTF stands for unexpected response, ??? for response that failed to parse).
--    client        proxy    relay    proxy                                client
-- 0) PFWD(SEND) -> RFWD  -> RRES  -> PRES(OK)                          -> ok
-- 1) PFWD(SEND) -> RFWD  -> RRES  -> PRES(ERR)                         -> PCEProtocolError - business logic error for client
-- 2) PFWD(SEND) -> RFWD  -> RRES  -> PRES(WTF)                         -> PCEUnexpectedResponse - relay/client protocol logic error
-- 3) PFWD(SEND) -> RFWD  -> RRES  -> PRES(???)                         -> PCEResponseError - relay/client syntax error
-- 4) PFWD(SEND) -> RFWD  -> ERR   -> ERR PROXY PROTOCOL                -> ProxyProtocolError - proxy/relay business logic error
-- 5) PFWD(SEND) -> RFWD  -> WTF   -> ERR PROXY $ BROKER (UNEXPECTED s) -> ProxyProtocolError - proxy/relay protocol logic
-- 6) PFWD(SEND) -> RFWD  -> ???   -> ERR PROXY $ BROKER (RESPONSE s)   -> ProxyProtocolError - proxy/relay syntax
-- 7) PFWD(SEND) ->                   ERR                               -> ProxyProtocolError - client/proxy business logic
-- 8) PFWD(SEND) ->                   WTF                               -> ProxyUnexpectedResponse - client/proxy protocol logic
-- 9) PFWD(SEND) ->                   ???                               -> ProxyResponseError - client/proxy syntax
--
-- We report as proxySMPCommand error (ExceptT error) the errors of two kinds:
-- - protocol errors from the destination relay wrapped in PRES - to simplify processing of AUTH and QUOTA errors, in this case proxy is "transparent" for such errors (PCEProtocolError, PCEUnexpectedResponse, PCEResponseError)
-- - other response/transport/connection errors from the client connected to proxy itself
-- Other errors are reported in the function result as `Either ProxyClientError ()`, including
-- - protocol errors from the client connected to proxy in ProxyClientError (PCEProtocolError, PCEUnexpectedResponse, PCEResponseError)
-- - other errors from the client running on proxy and connected to relay in PREProxiedRelayError
||
|
||
-- This function proxies Sender commands that return OK or ERR
|
||
proxyOKSMPCommand :: SMPClient -> NetworkRequestMode -> ProxiedRelay -> Maybe SndPrivateAuthKey -> SenderId -> Command 'Sender -> ExceptT SMPClientError IO (Either ProxyClientError ())
|
||
proxyOKSMPCommand c nm proxiedRelay spKey sId command =
|
||
proxySMPCommand c nm proxiedRelay spKey sId command >>= \case
|
||
Right OK -> pure $ Right ()
|
||
Right r -> throwE $ unexpectedResponse r
|
||
Left e -> pure $ Left e
|
||
|
||
proxySMPCommand ::
|
||
forall p.
|
||
PartyI p =>
|
||
SMPClient ->
|
||
NetworkRequestMode ->
|
||
-- proxy session from PKEY
|
||
ProxiedRelay ->
|
||
-- message to deliver
|
||
Maybe SndPrivateAuthKey ->
|
||
SenderId ->
|
||
Command p ->
|
||
  ExceptT SMPClientError IO (Either ProxyClientError BrokerMsg)
proxySMPCommand c@ProtocolClient {thParams = proxyThParams, client_ = PClient {clientCorrId = g, tcpTimeout}} nm (ProxiedRelay sessionId v _ serverKey) spKey sId command = do
  -- prepare params
  let serverThAuth = (\ta -> ta {peerServerPubKey = serverKey}) <$> thAuth proxyThParams
      serverThParams = smpTHParamsSetVersion v proxyThParams {sessionId, thAuth = serverThAuth}
  (cmdPubKey, cmdPrivKey) <- liftIO . atomically $ C.generateKeyPair @'C.X25519 g
  let cmdSecret = C.dh' serverKey cmdPrivKey
  nonce@(C.CbNonce corrId) <- liftIO . atomically $ C.randomCbNonce g
  -- encode
  let TransmissionForAuth {tForAuth, tToSend} = encodeTransmissionForAuth serverThParams (CorrId corrId, sId, Cmd (sParty @p) command)
  -- serviceAuth is False here – proxied commands are not used with service certificates
  auth <- liftEitherWith PCETransportError $ authTransmission serverThAuth False spKey nonce tForAuth
  b <- case batchTransmissions serverThParams [Right (auth, tToSend)] of
    [] -> throwE $ PCETransportError TELargeMsg
    TBError e _ : _ -> throwE $ PCETransportError e
    TBTransmission s _ : _ -> pure s
    TBTransmissions s _ _ : _ -> pure s
  et <- liftEitherWith PCECryptoError $ EncTransmission <$> C.cbEncrypt cmdSecret nonce b paddedProxiedTLength
  -- proxy interaction errors are wrapped
  let tOut = Just $ 2 * netTimeoutInt tcpTimeout nm
  tryE (sendProtocolCommand_ c nm (Just nonce) tOut Nothing (EntityId sessionId) (Cmd SProxiedClient (PFWD v cmdPubKey et))) >>= \case
    Right r -> case r of
      PRES (EncResponse er) -> do
        -- server interaction errors are thrown directly
        t' <- liftEitherWith PCECryptoError $ C.cbDecrypt cmdSecret (C.reverseNonce nonce) er
        case tParse serverThParams t' of
          t'' :| [] -> case tDecodeClient serverThParams t'' of
            (_, _, cmd) -> case cmd of
              Right (ERR e) -> throwE $ PCEProtocolError e -- this is the error from the destination relay
              Right r' -> pure $ Right r'
              Left e -> throwE $ PCEResponseError e
          _ -> throwE $ PCETransportError TEBadBlock
      ERR e -> pure . Left $ ProxyProtocolError e -- this will not happen, this error is returned via Left
      _ -> pure . Left $ ProxyUnexpectedResponse $ take 32 $ show r
    Left e -> case e of
      PCEProtocolError e' -> pure . Left $ ProxyProtocolError e'
      PCEUnexpectedResponse e' -> pure . Left $ ProxyUnexpectedResponse $ B.unpack e'
      PCEResponseError e' -> pure . Left $ ProxyResponseError e'
      _ -> throwE e

-- this method is used in the proxy
-- sends RFWD :: EncFwdTransmission -> Command Sender
-- receives RRES :: EncFwdResponse -> BrokerMsg
-- proxy should send PRES to the client with EncResponse
-- Always uses background timeout mode
forwardSMPTransmission :: SMPClient -> CorrId -> VersionSMP -> C.PublicKeyX25519 -> EncTransmission -> ExceptT SMPClientError IO EncResponse
forwardSMPTransmission c@ProtocolClient {thParams, client_ = PClient {clientCorrId = g}} fwdCorrId fwdVersion fwdKey fwdTransmission = do
  -- prepare params
  sessSecret <- case thAuth thParams of
    Nothing -> throwE $ PCETransportError TENoServerAuth
    Just THAuthClient {sessSecret} -> maybe (throwE $ PCETransportError TENoServerAuth) pure sessSecret
  nonce <- liftIO . atomically $ C.randomCbNonce g
  -- wrap
  let fwdT = FwdTransmission {fwdCorrId, fwdVersion, fwdKey, fwdTransmission}
      eft = EncFwdTransmission $ C.cbEncryptNoPad sessSecret nonce (smpEncode fwdT)
  -- send
  sendProtocolCommand_ c NRMBackground (Just nonce) Nothing Nothing NoEntity (Cmd SProxyService (RFWD eft)) >>= \case
    RRES (EncFwdResponse efr) -> do
      -- unwrap
      r' <- liftEitherWith PCECryptoError $ C.cbDecryptNoPad sessSecret (C.reverseNonce nonce) efr
      FwdResponse {fwdCorrId = _, fwdResponse} <- liftEitherWith (const $ PCEResponseError BLOCK) $ smpDecode r'
      pure fwdResponse
    r -> throwE $ unexpectedResponse r

-- get queue information - always sent interactively
getSMPQueueInfo :: SMPClient -> NetworkRequestMode -> C.APrivateAuthKey -> QueueId -> ExceptT SMPClientError IO QueueInfo
getSMPQueueInfo c nm pKey qId =
  sendSMPCommand c nm (Just pKey) qId QUE >>= \case
    INFO info -> pure info
    r -> throwE $ unexpectedResponse r

okSMPCommand :: PartyI p => Command p -> SMPClient -> NetworkRequestMode -> C.APrivateAuthKey -> QueueId -> ExceptT SMPClientError IO ()
okSMPCommand cmd c nm pKey qId =
  sendSMPCommand c nm (Just pKey) qId cmd >>= \case
    OK -> return ()
    r -> throwE $ unexpectedResponse r

okSMPCommands :: PartyI p => Command p -> SMPClient -> NetworkRequestMode -> NonEmpty (QueueId, C.APrivateAuthKey) -> IO (NonEmpty (Either SMPClientError ()))
okSMPCommands cmd c nm qs = L.map process <$> sendProtocolCommands c nm cs
  where
    aCmd = Cmd sParty cmd
    cs = L.map (\(qId, pKey) -> (qId, Just pKey, aCmd)) qs
    process (Response _ r) = case r of
      Right OK -> Right ()
      Right r' -> Left $ unexpectedResponse r'
      Left e -> Left e

-- | Send SMP command
sendSMPCommand :: PartyI p => SMPClient -> NetworkRequestMode -> Maybe C.APrivateAuthKey -> EntityId -> Command p -> ExceptT SMPClientError IO BrokerMsg
sendSMPCommand c nm pKey entId cmd = sendProtocolCommand c nm pKey entId (Cmd sParty cmd)
{-# INLINE sendSMPCommand #-}

type PCTransmission err msg = (Either TransportError SentRawTransmission, Request err msg)

-- | Send multiple commands with batching and collect responses
sendProtocolCommands :: forall v err msg. Protocol v err msg => ProtocolClient v err msg -> NetworkRequestMode -> NonEmpty (ClientCommand msg) -> IO (NonEmpty (Response err msg))
sendProtocolCommands c@ProtocolClient {thParams} nm cs = do
  bs <- batchTransmissions' thParams <$> mapM (mkTransmission c) cs
  validate . concat =<< mapM (sendBatch c nm) bs
  where
    validate :: [Response err msg] -> IO (NonEmpty (Response err msg))
    validate rs
      | diff == 0 = pure $ L.fromList rs
      | diff > 0 = do
          putStrLn "send error: fewer responses than expected"
          pure $ L.fromList $ rs <> replicate diff (Response NoEntity $ Left $ PCETransportError TEBadBlock)
      | otherwise = do
          putStrLn "send error: more responses than expected"
          pure $ L.fromList $ take (L.length cs) rs
      where
        diff = L.length cs - length rs

streamProtocolCommands :: forall v err msg. Protocol v err msg => ProtocolClient v err msg -> NetworkRequestMode -> NonEmpty (ClientCommand msg) -> ([Response err msg] -> IO ()) -> IO ()
streamProtocolCommands c@ProtocolClient {thParams} nm cs cb = do
  bs <- batchTransmissions' thParams <$> mapM (mkTransmission c) cs
  mapM_ (cb <=< sendBatch c nm) bs

sendBatch :: ProtocolClient v err msg -> NetworkRequestMode -> TransportBatch (Request err msg) -> IO [Response err msg]
sendBatch c@ProtocolClient {client_ = PClient {sndQ}} nm b = do
  case b of
    TBError e Request {entityId} -> do
      putStrLn "send error: large message"
      pure [Response entityId $ Left $ PCETransportError e]
    TBTransmissions s n rs
      | n > 0 -> do
          nonBlockingWriteTBQueue sndQ (Nothing, s) -- do not expire batched responses
          mapConcurrently (getResponse c nm Nothing) rs
      | otherwise -> pure []
    TBTransmission s r -> do
      nonBlockingWriteTBQueue sndQ (Nothing, s)
      (: []) <$> getResponse c nm Nothing r

-- | Send Protocol command
sendProtocolCommand :: forall v err msg. Protocol v err msg => ProtocolClient v err msg -> NetworkRequestMode -> Maybe C.APrivateAuthKey -> EntityId -> ProtoCommand msg -> ExceptT (ProtocolClientError err) IO msg
sendProtocolCommand c nm = sendProtocolCommand_ c nm Nothing Nothing

-- Currently there is coupling - batch commands do not expire, and individually sent commands do.
-- This is to reflect the fact that we send subscriptions only as batches, and also because we do not track a separate timeout for the whole batch, so it is not obvious when we should expire it.
-- We could expire a batch of deletes, for example, either when the first response expires or when the last one does.
-- But a better solution is to process delayed delete responses.
--
-- Please note: if nonce is passed it is also used as a correlation ID
sendProtocolCommand_ :: forall v err msg. Protocol v err msg => ProtocolClient v err msg -> NetworkRequestMode -> Maybe C.CbNonce -> Maybe Int -> Maybe C.APrivateAuthKey -> EntityId -> ProtoCommand msg -> ExceptT (ProtocolClientError err) IO msg
sendProtocolCommand_ c@ProtocolClient {client_ = PClient {sndQ}, thParams = THandleParams {batch, blockSize, serviceAuth}} nm nonce_ tOut pKey entId cmd =
  ExceptT $ uncurry sendRecv =<< mkTransmission_ c nonce_ (entId, pKey, cmd)
  where
    -- two separate "atomically" needed to avoid blocking
    sendRecv :: Either TransportError SentRawTransmission -> Request err msg -> IO (Either (ProtocolClientError err) msg)
    sendRecv t_ r = case t_ of
      Left e -> pure . Left $ PCETransportError e
      Right t
        | B.length s > blockSize - 2 -> pure . Left $ PCETransportError TELargeMsg
        | otherwise -> do
            nonBlockingWriteTBQueue sndQ (Just r, s)
            response <$> getResponse c nm tOut r
        where
          s
            | batch = tEncodeBatch1 serviceAuth t
            | otherwise = tEncode serviceAuth t

nonBlockingWriteTBQueue :: TBQueue a -> a -> IO ()
nonBlockingWriteTBQueue q x = do
  sent <- atomically $ tryWriteTBQueue q x
  unless sent $ void $ forkIO $ atomically $ writeTBQueue q x

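{- Illustrative sketch only, not part of this module. It shows why
   `nonBlockingWriteTBQueue` never blocks its caller: when the queue is full the
   write is handed to a forked thread that waits for space. One consequence is
   that a deferred item may be enqueued after items written later. The helper
   names below (`tryWrite`, `nonBlockingWrite`) are hypothetical; `tryWrite` is
   an assumed definition of `tryWriteTBQueue`, whose actual definition is not
   shown in this section.

import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad (unless, void)

-- Write only if there is space; report whether the item was written.
tryWrite :: TBQueue a -> a -> STM Bool
tryWrite q x = do
  full <- isFullTBQueue q
  unless full $ writeTBQueue q x
  pure $ not full

nonBlockingWrite :: TBQueue a -> a -> IO ()
nonBlockingWrite q x = do
  sent <- atomically $ tryWrite q x
  unless sent $ void $ forkIO $ atomically $ writeTBQueue q x

main :: IO ()
main = do
  q <- newTBQueueIO 1
  nonBlockingWrite q 'a' -- fits, written immediately
  nonBlockingWrite q 'b' -- queue is full, write is deferred to a forked thread
  x <- atomically $ readTBQueue q
  y <- atomically $ readTBQueue q -- waits until the deferred write completes
  print (x, y)
-}
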
getResponse :: ProtocolClient v err msg -> NetworkRequestMode -> Maybe Int -> Request err msg -> IO (Response err msg)
getResponse ProtocolClient {client_ = PClient {tcpTimeout, timeoutErrorCount}} nm tOut Request {entityId, pending, responseVar} = do
  r <- fromMaybe (netTimeoutInt tcpTimeout nm) tOut `timeout` atomically (takeTMVar responseVar)
  response <- atomically $ do
    writeTVar pending False
    -- Try to read response again in case it arrived after timeout expired
    -- but before `pending` was set to False above.
    -- See `processMsg`.
    ((r <|>) <$> tryTakeTMVar responseVar) >>= \case
      Just r' -> writeTVar timeoutErrorCount 0 $> r'
      Nothing -> modifyTVar' timeoutErrorCount (+ 1) $> Left PCEResponseTimeout
  pure Response {entityId, response}

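{- Illustrative sketch only, not part of this module. It isolates the pattern
   used by `getResponse`: wait with a timeout, then in a single transaction mark
   the request as no longer pending and check the variable once more, so that a
   response stored between the timeout firing and `pending` being cleared is not
   lost. That the writer (`processMsg`, not shown here) checks `pending` before
   storing a response is an assumption; `awaitWithGrace` is a hypothetical name.

import Control.Applicative ((<|>))
import Control.Concurrent.STM
import System.Timeout (timeout)

awaitWithGrace :: Int -> TVar Bool -> TMVar a -> IO (Maybe a)
awaitWithGrace micros pending var = do
  r <- timeout micros $ atomically (takeTMVar var)
  atomically $ do
    writeTVar pending False
    -- prefer the value received in time, otherwise take a late arrival if any
    (r <|>) <$> tryTakeTMVar var
-}
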
mkTransmission :: Protocol v err msg => ProtocolClient v err msg -> ClientCommand msg -> IO (PCTransmission err msg)
mkTransmission c = mkTransmission_ c Nothing

mkTransmission_ :: forall v err msg. Protocol v err msg => ProtocolClient v err msg -> Maybe C.CbNonce -> ClientCommand msg -> IO (PCTransmission err msg)
mkTransmission_ ProtocolClient {thParams, client_ = PClient {clientCorrId, sentCommands}} nonce_ (entityId, pKey_, command) = do
  nonce@(C.CbNonce corrId) <- maybe (atomically $ C.randomCbNonce clientCorrId) pure nonce_
  let TransmissionForAuth {tForAuth, tToSend} = encodeTransmissionForAuth thParams (CorrId corrId, entityId, command)
      auth = authTransmission (thAuth thParams) (useServiceAuth command) pKey_ nonce tForAuth
  r <- mkRequest (CorrId corrId)
  pure ((,tToSend) <$> auth, r)
  where
    mkRequest :: CorrId -> IO (Request err msg)
    mkRequest corrId = do
      pending <- newTVarIO True
      responseVar <- newEmptyTMVarIO
      let r =
            Request
              { corrId,
                entityId,
                command,
                pending,
                responseVar
              }
      atomically $ TM.insert corrId r sentCommands
      pure r

authTransmission :: Maybe (THandleAuth 'TClient) -> Bool -> Maybe C.APrivateAuthKey -> C.CbNonce -> ByteString -> Either TransportError (Maybe TAuthorizations)
authTransmission thAuth serviceAuth pKey_ nonce t = traverse authenticate pKey_
  where
    authenticate :: C.APrivateAuthKey -> Either TransportError TAuthorizations
    authenticate (C.APrivateAuthKey a pk) = (,serviceSig) <$> case a of
      C.SX25519 -> case thAuth of
        Just THAuthClient {peerServerPubKey = k} -> Right $ TAAuthenticator $ C.cbAuthenticate k pk nonce t'
        Nothing -> Left TENoServerAuth
      C.SEd25519 -> sign pk
      C.SEd448 -> sign pk
    -- When command is signed by both entity key and service key,
    -- entity key must sign over both transmission and service certificate hash,
    -- to prevent any service substitution via MITM inside TLS.
    (t', serviceSig) = case clientService =<< thAuth of
      Just THClientService {serviceCertHash = XV.Fingerprint fp, serviceKey} | serviceAuth ->
        (fp <> t, Just $ C.sign' serviceKey t) -- service key only needs to sign transmission itself
      _ -> (t, Nothing)
    sign :: forall a. (C.AlgorithmI a, C.SignatureAlgorithm a) => C.PrivateKey a -> Either TransportError TransmissionAuth
    sign pk = Right $ TASignature $ C.ASignature (C.sAlgorithm @a) (C.sign' pk t')

data TBQueueInfo = TBQueueInfo
  { qLength :: Int,
    qFull :: Bool
  }
  deriving (Show)

getTBQueueInfo :: TBQueue a -> STM TBQueueInfo
getTBQueueInfo q = do
  qLength <- fromIntegral <$> lengthTBQueue q
  qFull <- isFullTBQueue q
  pure TBQueueInfo {qLength, qFull}

getProtocolClientQueuesInfo :: ProtocolClient v err msg -> IO (TBQueueInfo, TBQueueInfo)
getProtocolClientQueuesInfo ProtocolClient {client_ = PClient {sndQ, rcvQ}} = do
  sndQInfo <- atomically $ getTBQueueInfo sndQ
  rcvQInfo <- atomically $ getTBQueueInfo rcvQ
  pure (sndQInfo, rcvQInfo)

$(J.deriveJSON (enumJSON $ dropPrefix "HM") ''HostMode)

$(J.deriveJSON (enumJSON $ dropPrefix "SM") ''SocksMode)

$(J.deriveJSON (enumJSON $ dropPrefix "TSM") ''TransportSessionMode)

$(J.deriveJSON (enumJSON $ dropPrefix "SPM") ''SMPProxyMode)

$(J.deriveJSON (enumJSON $ dropPrefix "SPF") ''SMPProxyFallback)

$(J.deriveJSON (enumJSON $ dropPrefix "SWP") ''SMPWebPortServers)

$(J.deriveJSON defaultJSON ''NetworkTimeout)

$(J.deriveJSON defaultJSON ''NetworkConfig)

$(J.deriveJSON (sumTypeJSON $ dropPrefix "Proxy") ''ProxyClientError)

$(J.deriveJSON defaultJSON ''TBQueueInfo)