mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-25 01:35:22 +00:00
agent: create user with option to enable client service (#1684)
* agent: create user with option to enable client service * handle HTTP2 errors * do not catch async exceptions
This commit is contained in:
@@ -47,6 +47,7 @@ import Simplex.Messaging.Client
|
||||
transportClientConfig,
|
||||
clientSocksCredentials,
|
||||
unexpectedResponse,
|
||||
clientHandlers,
|
||||
useWebPort,
|
||||
)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
@@ -61,7 +62,6 @@ import Simplex.Messaging.Protocol
|
||||
SenderId,
|
||||
pattern NoEntity,
|
||||
NetworkError (..),
|
||||
toNetworkError,
|
||||
)
|
||||
import Simplex.Messaging.Transport (ALPN, CertChainPubKey (..), HandshakeError (..), THandleAuth (..), THandleParams (..), TransportError (..), TransportPeer (..), defaultSupportedParams)
|
||||
import Simplex.Messaging.Transport.Client (TransportClientConfig (..), TransportHost)
|
||||
@@ -70,8 +70,10 @@ import Simplex.Messaging.Transport.HTTP2.Client
|
||||
import Simplex.Messaging.Transport.HTTP2.File
|
||||
import Simplex.Messaging.Util (liftEitherWith, liftError', tshow, whenM)
|
||||
import Simplex.Messaging.Version
|
||||
import UnliftIO
|
||||
import System.IO (IOMode (..), SeekMode (..), hSeek, withFile)
|
||||
import System.Timeout (timeout)
|
||||
import UnliftIO.Directory
|
||||
import UnliftIO.STM
|
||||
|
||||
data XFTPClient = XFTPClient
|
||||
{ http2Client :: HTTP2Client,
|
||||
@@ -261,13 +263,11 @@ downloadXFTPChunk g c@XFTPClient {config} rpKey fId chunkSpec@XFTPRcvChunkSpec {
|
||||
let dhSecret = C.dh' sDhKey rpDhKey
|
||||
cbState <- liftEither . first PCECryptoError $ LC.cbInit dhSecret cbNonce
|
||||
let t = chunkTimeout config chunkSize
|
||||
ExceptT (sequence <$> (t `timeout` (download cbState `catches` errors))) >>= maybe (throwE PCEResponseTimeout) pure
|
||||
ExceptT (sequence <$> (t `timeout` (download cbState `E.catches` handlers))) >>= maybe (throwE PCEResponseTimeout) pure
|
||||
where
|
||||
errors =
|
||||
[ Handler $ \(e :: H.HTTP2Error) -> pure $ Left $ PCENetworkError $ NEConnectError $ displayException e,
|
||||
Handler $ \(e :: IOException) -> pure $ Left $ PCEIOError $ E.displayException e,
|
||||
Handler $ \(e :: SomeException) -> pure $ Left $ PCENetworkError $ toNetworkError e
|
||||
]
|
||||
handlers =
|
||||
E.Handler (\(e :: H.HTTP2Error) -> pure $ Left $ PCENetworkError $ NEConnectError $ E.displayException e)
|
||||
: clientHandlers
|
||||
download cbState =
|
||||
runExceptT . withExceptT PCEResponseError $
|
||||
receiveEncFile chunkPart cbState chunkSpec `catchError` \e ->
|
||||
|
||||
@@ -337,8 +337,8 @@ resumeAgentClient :: AgentClient -> IO ()
|
||||
resumeAgentClient c = atomically $ writeTVar (active c) True
|
||||
{-# INLINE resumeAgentClient #-}
|
||||
|
||||
createUser :: AgentClient -> NonEmpty (ServerCfg 'PSMP) -> NonEmpty (ServerCfg 'PXFTP) -> AE UserId
|
||||
createUser c = withAgentEnv c .: createUser' c
|
||||
createUser :: AgentClient -> Bool -> NonEmpty (ServerCfg 'PSMP) -> NonEmpty (ServerCfg 'PXFTP) -> AE UserId
|
||||
createUser c = withAgentEnv c .:. createUser' c
|
||||
{-# INLINE createUser #-}
|
||||
|
||||
-- | Delete user record optionally deleting all user's connections on SMP servers
|
||||
@@ -754,14 +754,23 @@ logConnection c connected =
|
||||
let event = if connected then "connected to" else "disconnected from"
|
||||
in logInfo $ T.unwords ["client", tshow (clientId c), event, "Agent"]
|
||||
|
||||
createUser' :: AgentClient -> NonEmpty (ServerCfg 'PSMP) -> NonEmpty (ServerCfg 'PXFTP) -> AM UserId
|
||||
createUser' c smp xftp = do
|
||||
createUser' :: AgentClient -> Bool -> NonEmpty (ServerCfg 'PSMP) -> NonEmpty (ServerCfg 'PXFTP) -> AM UserId
|
||||
createUser' c useService smp xftp = do
|
||||
liftIO $ checkUserServers "createUser SMP" smp
|
||||
liftIO $ checkUserServers "createUser XFTP" xftp
|
||||
userId <- withStore' c createUserRecord
|
||||
atomically $ TM.insert userId (mkUserServers smp) $ smpServers c
|
||||
atomically $ TM.insert userId (mkUserServers xftp) $ xftpServers c
|
||||
atomically $ TM.insert userId False $ useClientServices c
|
||||
ok <- atomically $ do
|
||||
(cfg, _) <- readTVar $ useNetworkConfig c
|
||||
if useService && sessionMode cfg == TSMEntity
|
||||
then pure False
|
||||
else do
|
||||
TM.insert userId (mkUserServers smp) $ smpServers c
|
||||
TM.insert userId (mkUserServers xftp) $ xftpServers c
|
||||
TM.insert userId useService $ useClientServices c
|
||||
pure True
|
||||
unless ok $ do
|
||||
withStore c (`deleteUserRecord` userId)
|
||||
throwE $ CMD PROHIBITED "createUser'"
|
||||
pure userId
|
||||
|
||||
deleteUser' :: AgentClient -> UserId -> Bool -> AM ()
|
||||
|
||||
@@ -107,6 +107,7 @@ module Simplex.Messaging.Client
|
||||
smpProxyError,
|
||||
smpErrorClientNotice,
|
||||
textToHostMode,
|
||||
clientHandlers,
|
||||
ServerTransmissionBatch,
|
||||
ServerTransmission (..),
|
||||
ClientCommand,
|
||||
@@ -129,7 +130,7 @@ import Control.Applicative ((<|>))
|
||||
import Control.Concurrent (ThreadId, forkFinally, forkIO, killThread, mkWeakThreadId)
|
||||
import Control.Concurrent.Async
|
||||
import Control.Concurrent.STM
|
||||
import Control.Exception (Exception, SomeException)
|
||||
import Control.Exception (Exception, Handler (..), IOException, SomeAsyncException, SomeException)
|
||||
import qualified Control.Exception as E
|
||||
import Control.Logger.Simple
|
||||
import Control.Monad
|
||||
@@ -567,7 +568,7 @@ getProtocolClient g nm transportSession@(_, srv, _) cfg@ProtocolClientConfig {qS
|
||||
case chooseTransportHost networkConfig (host srv) of
|
||||
Right useHost ->
|
||||
(getCurrentTime >>= mkProtocolClient useHost >>= runClient useTransport useHost)
|
||||
`E.catch` \(e :: SomeException) -> pure $ Left $ PCEIOError $ E.displayException e
|
||||
`E.catches` clientHandlers
|
||||
Left e -> pure $ Left e
|
||||
where
|
||||
NetworkConfig {tcpConnectTimeout, tcpTimeout, smpPingInterval} = networkConfig
|
||||
@@ -719,6 +720,13 @@ getProtocolClient g nm transportSession@(_, srv, _) cfg@ProtocolClientConfig {qS
|
||||
Left e -> logError $ "SMP client error: " <> tshow e
|
||||
Right _ -> logWarn "SMP client unprocessed event"
|
||||
|
||||
clientHandlers :: [Handler (Either (ProtocolClientError e) a)]
|
||||
clientHandlers =
|
||||
[ Handler $ \(e :: IOException) -> pure $ Left $ PCEIOError $ E.displayException e,
|
||||
Handler $ \(e :: SomeAsyncException) -> E.throwIO e,
|
||||
Handler $ \(e :: SomeException) -> pure $ Left $ PCENetworkError $ toNetworkError e
|
||||
]
|
||||
|
||||
useWebPort :: NetworkConfig -> [HostName] -> ProtocolServer p -> Bool
|
||||
useWebPort cfg presetDomains ProtocolServer {host = h :| _} = case smpWebPortServers cfg of
|
||||
SWPAll -> True
|
||||
|
||||
@@ -37,6 +37,7 @@ where
|
||||
import Control.Concurrent (forkIO)
|
||||
import Control.Concurrent.Async (Async, uninterruptibleCancel)
|
||||
import Control.Concurrent.STM (retry)
|
||||
import qualified Control.Exception as E
|
||||
import Control.Logger.Simple
|
||||
import Control.Monad
|
||||
import Control.Monad.Except
|
||||
@@ -83,7 +84,6 @@ import Simplex.Messaging.Transport
|
||||
import Simplex.Messaging.Util (catchAll_, ifM, safeDecodeUtf8, toChunks, tshow, whenM, ($>>=), (<$$>))
|
||||
import System.Timeout (timeout)
|
||||
import UnliftIO (async)
|
||||
import qualified UnliftIO.Exception as E
|
||||
import UnliftIO.STM
|
||||
|
||||
type SMPClientVar = SessionVar (Either (SMPClientError, Maybe UTCTime) (OwnServer, SMPClient))
|
||||
@@ -226,7 +226,7 @@ getSMPServerClient'' ca@SMPClientAgent {agentCfg, smpClients, smpSessions, worke
|
||||
|
||||
newSMPClient :: SMPClientVar -> IO (Either SMPClientError (OwnServer, SMPClient))
|
||||
newSMPClient v = do
|
||||
r <- connectClient ca srv v `E.catch` \(e :: E.SomeException) -> pure $ Left $ PCEIOError $ E.displayException e
|
||||
r <- connectClient ca srv v `E.catches` clientHandlers
|
||||
case r of
|
||||
Right smp -> do
|
||||
logInfo . decodeUtf8 $ "Agent connected to " <> showServer srv
|
||||
@@ -324,7 +324,7 @@ reconnectClient ca@SMPClientAgent {active, agentCfg, smpSubWorkers, workerSeq} s
|
||||
(Just <$> getSessVar workerSeq srv smpSubWorkers ts)
|
||||
newSubWorker :: SessionVar (Async ()) -> IO ()
|
||||
newSubWorker v = do
|
||||
a <- async $ void (E.tryAny runSubWorker) >> atomically (cleanup v)
|
||||
a <- async $ void (E.try @E.SomeException runSubWorker) >> atomically (cleanup v)
|
||||
atomically $ putTMVar (sessionVar v) a
|
||||
runSubWorker =
|
||||
withRetryInterval (reconnectInterval agentCfg) $ \_ loop -> do
|
||||
|
||||
@@ -586,7 +586,7 @@ removeServiceAndAssociations st srv = do
|
||||
withDB "removeServiceAndAssociations" st $ \db -> runExceptT $ do
|
||||
srvId <- ExceptT $ getServerId db
|
||||
subsCount <- liftIO $ removeServiceAssociation_ db srvId
|
||||
liftIO $ removeServerService db srvId
|
||||
liftIO $ void $ removeServerService db srvId
|
||||
pure (srvId, fromIntegral subsCount)
|
||||
where
|
||||
getServerId db =
|
||||
|
||||
@@ -97,7 +97,7 @@ import Network.Socket (ServiceName, Socket, socketToHandle)
|
||||
import qualified Network.TLS as TLS
|
||||
import Numeric.Natural (Natural)
|
||||
import Simplex.Messaging.Agent.Lock
|
||||
import Simplex.Messaging.Client (ProtocolClient (thParams), ProtocolClientError (..), SMPClient, SMPClientError, forwardSMPTransmission, smpProxyError, temporaryClientError)
|
||||
import Simplex.Messaging.Client (ProtocolClient (thParams), ProtocolClientError (..), SMPClient, SMPClientError, clientHandlers, forwardSMPTransmission, smpProxyError, temporaryClientError)
|
||||
import Simplex.Messaging.Client.Agent (OwnServer, SMPClientAgent (..), SMPClientAgentEvent (..), closeSMPClientAgent, getSMPServerClient'', isOwnServer, lookupSMPServerClient, getConnectedSMPServerClient)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Encoding
|
||||
@@ -1386,7 +1386,7 @@ client
|
||||
Just r -> Just <$> proxyServerResponse a r
|
||||
Nothing ->
|
||||
forkProxiedCmd $
|
||||
liftIO (runExceptT (getSMPServerClient'' a srv) `E.catch` (\(e :: SomeException) -> pure $ Left $ PCEIOError $ E.displayException e))
|
||||
liftIO (runExceptT (getSMPServerClient'' a srv) `E.catches` clientHandlers)
|
||||
>>= proxyServerResponse a
|
||||
proxyServerResponse :: SMPClientAgent 'Sender -> Either SMPClientError (OwnServer, SMPClient) -> M s BrokerMsg
|
||||
proxyServerResponse a smp_ = do
|
||||
@@ -1423,7 +1423,7 @@ client
|
||||
inc own pRequests
|
||||
if v >= sendingProxySMPVersion
|
||||
then forkProxiedCmd $ do
|
||||
liftIO (runExceptT (forwardSMPTransmission smp corrId fwdV pubKey encBlock) `E.catch` (\(e :: SomeException) -> pure $ Left $ PCEIOError $ E.displayException e)) >>= \case
|
||||
liftIO (runExceptT (forwardSMPTransmission smp corrId fwdV pubKey encBlock) `E.catches` clientHandlers) >>= \case
|
||||
Right r -> PRES r <$ inc own pSuccesses
|
||||
Left e -> ERR (smpProxyError e) <$ case e of
|
||||
PCEProtocolError {} -> inc own pSuccesses
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
module Simplex.Messaging.Transport.HTTP2.Client where
|
||||
|
||||
import Control.Concurrent.Async
|
||||
import Control.Exception (Handler (..), IOException, SomeAsyncException, SomeException)
|
||||
import qualified Control.Exception as E
|
||||
import Control.Monad
|
||||
import Data.Functor (($>))
|
||||
@@ -92,6 +93,13 @@ defaultHTTP2ClientConfig =
|
||||
data HTTP2ClientError = HCResponseTimeout | HCNetworkError NetworkError | HCIOError String
|
||||
deriving (Show)
|
||||
|
||||
httpClientHandlers :: [Handler (Either HTTP2ClientError a)]
|
||||
httpClientHandlers =
|
||||
[ Handler $ \(e :: IOException) -> pure $ Left $ HCIOError $ E.displayException e,
|
||||
Handler $ \(e :: SomeAsyncException) -> E.throwIO e,
|
||||
Handler $ \(e :: SomeException) -> pure $ Left $ HCNetworkError $ toNetworkError e
|
||||
]
|
||||
|
||||
getHTTP2Client :: HostName -> ServiceName -> Maybe XS.CertificateStore -> HTTP2ClientConfig -> IO () -> IO (Either HTTP2ClientError HTTP2Client)
|
||||
getHTTP2Client host port = getVerifiedHTTP2Client Nothing (THDomainName host) port Nothing
|
||||
|
||||
@@ -110,7 +118,7 @@ attachHTTP2Client config host port disconnected bufferSize tls = getVerifiedHTTP
|
||||
getVerifiedHTTP2ClientWith :: forall p. TransportPeerI p => HTTP2ClientConfig -> TransportHost -> ServiceName -> IO () -> ((TLS p -> H.Client HTTP2Response) -> IO HTTP2Response) -> IO (Either HTTP2ClientError HTTP2Client)
|
||||
getVerifiedHTTP2ClientWith config host port disconnected setup =
|
||||
(mkHTTPS2Client >>= runClient)
|
||||
`E.catch` \(e :: E.SomeException) -> pure $ Left $ HCIOError $ E.displayException e
|
||||
`E.catches` httpClientHandlers
|
||||
where
|
||||
mkHTTPS2Client :: IO HClient
|
||||
mkHTTPS2Client = do
|
||||
@@ -176,9 +184,9 @@ sendRequest HTTP2Client {client_ = HClient {config, reqQ}} req reqTimeout_ = do
|
||||
sendRequestDirect :: HTTP2Client -> Request -> Maybe Int -> IO (Either HTTP2ClientError HTTP2Response)
|
||||
sendRequestDirect HTTP2Client {client_ = HClient {config, disconnected}, sendReq} req reqTimeout_ = do
|
||||
let reqTimeout = http2RequestTimeout config reqTimeout_
|
||||
reqTimeout `timeout` E.try (sendReq req process) >>= \case
|
||||
reqTimeout `timeout` ((Right <$> sendReq req process) `E.catches` httpClientHandlers) >>= \case
|
||||
Just (Right r) -> pure $ Right r
|
||||
Just (Left (e :: E.SomeException)) -> disconnected $> Left (HCIOError $ E.displayException e)
|
||||
Just (Left e) -> disconnected $> Left e
|
||||
Nothing -> pure $ Left HCResponseTimeout
|
||||
where
|
||||
process r = do
|
||||
|
||||
@@ -1018,7 +1018,7 @@ testUpdateConnectionUserId :: HasCallStack => IO ()
|
||||
testUpdateConnectionUserId =
|
||||
withAgentClients2 $ \alice bob -> runRight_ $ do
|
||||
(connId, qInfo) <- createConnection alice 1 True SCMInvitation Nothing SMSubscribe
|
||||
newUserId <- createUser alice [noAuthSrvCfg testSMPServer] [noAuthSrvCfg testXFTPServer]
|
||||
newUserId <- createUser alice False [noAuthSrvCfg testSMPServer] [noAuthSrvCfg testXFTPServer]
|
||||
_ <- changeConnectionUser alice 1 connId newUserId
|
||||
aliceId <- A.prepareConnectionToJoin bob 1 True qInfo PQSupportOn
|
||||
sqSecured' <- A.joinConnection bob NRMInteractive 1 aliceId True qInfo "bob's connInfo" PQSupportOn SMSubscribe
|
||||
@@ -3001,7 +3001,7 @@ testUsers =
|
||||
withAgentClients2 $ \a b -> runRight_ $ do
|
||||
(aId, bId) <- makeConnection a b
|
||||
exchangeGreetings a bId b aId
|
||||
auId <- createUser a [noAuthSrvCfg testSMPServer] [noAuthSrvCfg testXFTPServer]
|
||||
auId <- createUser a False [noAuthSrvCfg testSMPServer] [noAuthSrvCfg testXFTPServer]
|
||||
(aId', bId') <- makeConnectionForUsers a auId b 1
|
||||
exchangeGreetings a bId' b aId'
|
||||
deleteUser a auId True
|
||||
@@ -3016,7 +3016,7 @@ testDeleteUserQuietly =
|
||||
withAgentClients2 $ \a b -> runRight_ $ do
|
||||
(aId, bId) <- makeConnection a b
|
||||
exchangeGreetings a bId b aId
|
||||
auId <- createUser a [noAuthSrvCfg testSMPServer] [noAuthSrvCfg testXFTPServer]
|
||||
auId <- createUser a False [noAuthSrvCfg testSMPServer] [noAuthSrvCfg testXFTPServer]
|
||||
(aId', bId') <- makeConnectionForUsers a auId b 1
|
||||
exchangeGreetings a bId' b aId'
|
||||
deleteUser a auId False
|
||||
@@ -3028,7 +3028,7 @@ testUsersNoServer ps = withAgentClientsCfg2 aCfg agentCfg $ \a b -> do
|
||||
(aId, bId, auId, _aId', bId') <- withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do
|
||||
(aId, bId) <- makeConnection a b
|
||||
exchangeGreetings a bId b aId
|
||||
auId <- createUser a [noAuthSrvCfg testSMPServer] [noAuthSrvCfg testXFTPServer]
|
||||
auId <- createUser a False [noAuthSrvCfg testSMPServer] [noAuthSrvCfg testXFTPServer]
|
||||
(aId', bId') <- makeConnectionForUsers a auId b 1
|
||||
exchangeGreetings a bId' b aId'
|
||||
pure (aId, bId, auId, aId', bId')
|
||||
@@ -3628,7 +3628,7 @@ testTwoUsers = withAgentClients2 $ \a b -> do
|
||||
("", "", UP _ _) <- nGet a
|
||||
a `hasClients` 1
|
||||
|
||||
aUserId2 <- createUser a [noAuthSrvCfg testSMPServer] [noAuthSrvCfg testXFTPServer]
|
||||
aUserId2 <- createUser a False [noAuthSrvCfg testSMPServer] [noAuthSrvCfg testXFTPServer]
|
||||
(aId2, bId2) <- makeConnectionForUsers a aUserId2 b 1
|
||||
exchangeGreetings a bId2 b aId2
|
||||
(aId2', bId2') <- makeConnectionForUsers a aUserId2 b 1
|
||||
|
||||
Reference in New Issue
Block a user