Files
simplexmq/src/Simplex/FileTransfer/Client/Agent.hs
Evgeny f3408d9bb6 explicit exports (#1719)
* explicit exports

* more empty exports

* add exports

* reorder

* use correct ControlProtocol type for xftp router

---------

Co-authored-by: Evgeny @ SimpleX Chat <259188159+evgeny-simplex@users.noreply.github.com>
2026-03-02 17:34:01 +00:00

143 lines
5.2 KiB
Haskell

{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
module Simplex.FileTransfer.Client.Agent
( XFTPClientVar,
XFTPClientAgent (..),
XFTPClientAgentConfig (..),
XFTPClientAgentError (..),
defaultXFTPClientAgentConfig,
newXFTPAgent,
getXFTPServerClient,
showServer,
closeXFTPServerClient,
) where
import Control.Logger.Simple (logInfo)
import Control.Monad
import Control.Monad.Except
import Control.Monad.Trans (lift)
import Control.Monad.Trans.Except
import Data.Bifunctor (first)
import qualified Data.ByteString.Char8 as B
import Data.Text (Text)
import Data.Text.Encoding (decodeUtf8)
import Data.Time.Clock (UTCTime, getCurrentTime)
import Simplex.FileTransfer.Client
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Client (NetworkConfig (..), NetworkRequestMode (..), ProtocolClientError (..), netTimeoutInt, temporaryClientError)
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (ProtocolServer (..), XFTPServer)
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util (catchAll_)
import UnliftIO
type XFTPClientVar = TMVar (Either XFTPClientAgentError XFTPClient)
data XFTPClientAgent = XFTPClientAgent
{ xftpClients :: TMap XFTPServer XFTPClientVar,
startedAt :: UTCTime,
config :: XFTPClientAgentConfig
}
data XFTPClientAgentConfig = XFTPClientAgentConfig
{ xftpConfig :: XFTPClientConfig,
reconnectInterval :: RetryInterval
}
defaultXFTPClientAgentConfig :: XFTPClientAgentConfig
defaultXFTPClientAgentConfig =
XFTPClientAgentConfig
{ xftpConfig = defaultXFTPClientConfig,
reconnectInterval =
RetryInterval
{ initialInterval = 5_000000,
increaseAfter = 10_000000,
maxInterval = 60_000000
}
}
data XFTPClientAgentError = XFTPClientAgentError XFTPServer XFTPClientError
deriving (Show, Exception)
newXFTPAgent :: XFTPClientAgentConfig -> IO XFTPClientAgent
newXFTPAgent config = do
xftpClients <- TM.emptyIO
startedAt <- getCurrentTime
pure XFTPClientAgent {xftpClients, startedAt, config}
type ME a = ExceptT XFTPClientAgentError IO a
getXFTPServerClient :: XFTPClientAgent -> XFTPServer -> ME XFTPClient
getXFTPServerClient XFTPClientAgent {xftpClients, startedAt, config} srv = do
atomically getClientVar >>= either newXFTPClient waitForXFTPClient
where
connectClient :: ME XFTPClient
connectClient =
ExceptT $
first (XFTPClientAgentError srv)
<$> getXFTPClient (1, srv, Nothing) (xftpConfig config) [] startedAt clientDisconnected
clientDisconnected :: XFTPClient -> IO ()
clientDisconnected _ = do
atomically $ TM.delete srv xftpClients
logInfo $ "disconnected from " <> showServer srv
getClientVar :: STM (Either XFTPClientVar XFTPClientVar)
getClientVar = maybe (Left <$> newClientVar) (pure . Right) =<< TM.lookup srv xftpClients
where
newClientVar :: STM XFTPClientVar
newClientVar = do
var <- newEmptyTMVar
TM.insert srv var xftpClients
pure var
waitForXFTPClient :: XFTPClientVar -> ME XFTPClient
waitForXFTPClient clientVar = do
let XFTPClientConfig {xftpNetworkConfig = NetworkConfig {tcpConnectTimeout}} = xftpConfig config
client_ <- liftIO $ netTimeoutInt tcpConnectTimeout NRMBackground `timeout` atomically (readTMVar clientVar)
liftEither $ case client_ of
Just (Right c) -> Right c
Just (Left e) -> Left e
Nothing -> Left $ XFTPClientAgentError srv PCEResponseTimeout
newXFTPClient :: XFTPClientVar -> ME XFTPClient
newXFTPClient clientVar = tryConnectClient tryConnectAsync
where
tryConnectClient :: ME () -> ME XFTPClient
tryConnectClient retryAction =
tryError connectClient >>= \r -> case r of
Right client -> do
logInfo $ "connected to " <> showServer srv
atomically $ putTMVar clientVar r
pure client
Left e@(XFTPClientAgentError _ e') -> do
if temporaryClientError e'
then retryAction
else atomically $ do
putTMVar clientVar r
TM.delete srv xftpClients
throwE e
tryConnectAsync :: ME ()
tryConnectAsync = void . lift . async . runExceptT $ do
withRetryInterval (reconnectInterval config) $ \_ loop -> void $ tryConnectClient loop
showServer :: XFTPServer -> Text
showServer ProtocolServer {host, port} =
decodeUtf8 $ strEncode host <> B.pack (if null port then "" else ':' : port)
closeXFTPServerClient :: XFTPClientAgent -> XFTPServer -> IO ()
closeXFTPServerClient XFTPClientAgent {xftpClients, config} srv =
atomically (TM.lookupDelete srv xftpClients) >>= mapM_ closeClient
where
closeClient cVar = do
let NetworkConfig {tcpConnectTimeout} = xftpNetworkConfig $ xftpConfig config
netTimeoutInt tcpConnectTimeout NRMBackground `timeout` atomically (readTMVar cVar) >>= \case
Just (Right client) -> closeXFTPClient client `catchAll_` pure ()
_ -> pure ()