diff --git a/src/Simplex/FileTransfer/Client.hs b/src/Simplex/FileTransfer/Client.hs index 6b073b308..ff130b20e 100644 --- a/src/Simplex/FileTransfer/Client.hs +++ b/src/Simplex/FileTransfer/Client.hs @@ -31,6 +31,7 @@ import Simplex.Messaging.Client proxyUsername, transportClientConfig, ) +import Simplex.Messaging.Client.Agent () import qualified Simplex.Messaging.Crypto as C import qualified Simplex.Messaging.Crypto.Lazy as LC import Simplex.Messaging.Protocol @@ -45,8 +46,8 @@ import Simplex.Messaging.Transport.Client (TransportClientConfig) import Simplex.Messaging.Transport.HTTP2 import Simplex.Messaging.Transport.HTTP2.Client import Simplex.Messaging.Util (bshow, liftEitherError, whenM) +import UnliftIO import UnliftIO.Directory -import UnliftIO.IO data XFTPClient = XFTPClient { http2Client :: HTTP2Client, @@ -91,6 +92,9 @@ getXFTPClient transportSession@(_, srv, _) config@XFTPClientConfig {networkConfi http2Client <- liftEitherError xftpClientError $ getVerifiedHTTP2Client (Just username) useHost usePort (Just keyHash) Nothing http2Config disconnected pure XFTPClient {http2Client, config} +closeXFTPClient :: XFTPClient -> IO () +closeXFTPClient XFTPClient {http2Client} = closeHTTP2Client http2Client + xftpHTTP2Config :: TransportClientConfig -> XFTPClientConfig -> HTTP2ClientConfig xftpHTTP2Config transportConfig XFTPClientConfig {networkConfig = NetworkConfig {tcpConnectTimeout}} = defaultHTTP2ClientConfig @@ -149,7 +153,7 @@ uploadXFTPChunk c spKey fId chunkSpec = sendXFTPCommand c spKey fId FPUT (Just chunkSpec) >>= okResponse downloadXFTPChunk :: XFTPClient -> C.APrivateSignKey -> XFTPFileId -> XFTPRcvChunkSpec -> ExceptT XFTPClientError IO () -downloadXFTPChunk c rpKey fId chunkSpec@XFTPRcvChunkSpec {filePath} = do +downloadXFTPChunk c@XFTPClient {config} rpKey fId chunkSpec@XFTPRcvChunkSpec {filePath, chunkSize} = do (rDhKey, rpDhKey) <- liftIO C.generateKeyPair' sendXFTPCommand c rpKey fId (FGET rDhKey) Nothing >>= \case (FRFile sDhKey cbNonce, HTTP2Body {bodyHead, bodySize, bodyPart}) -> case bodyPart of @@ -157,10 +161,13 @@ downloadXFTPChunk c rpKey fId chunkSpec@XFTPRcvChunkSpec {filePath} = do Just chunkPart -> do let dhSecret = C.dh' sDhKey rpDhKey cbState <- liftEither . first PCECryptoError $ LC.cbInit dhSecret cbNonce - -- timeout download in the same way as upload - withExceptT PCEResponseError $ - receiveEncFile chunkPart cbState chunkSpec `catchError` \e -> - whenM (doesFileExist filePath) (removeFile filePath) >> throwError e + let t = (fromIntegral chunkSize * uploadTimeoutPerMb config) `div` mb 1 + t `timeout` download cbState >>= maybe (throwError PCEResponseTimeout) pure + where + download cbState = + withExceptT PCEResponseError $ + receiveEncFile chunkPart cbState chunkSpec `catchError` \e -> + whenM (doesFileExist filePath) (removeFile filePath) >> throwError e _ -> throwError $ PCEResponseError NO_FILE (r, _) -> throwError . PCEUnexpectedResponse $ bshow r diff --git a/src/Simplex/FileTransfer/Client/Agent.hs b/src/Simplex/FileTransfer/Client/Agent.hs index f52f229fc..e30956390 100644 --- a/src/Simplex/FileTransfer/Client/Agent.hs +++ b/src/Simplex/FileTransfer/Client/Agent.hs @@ -1,5 +1,6 @@ {-# LANGUAGE DeriveAnyClass #-} {-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE OverloadedStrings #-} @@ -20,7 +21,7 @@ import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol (ProtocolServer (..), XFTPServer) import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM -import Simplex.Messaging.Util (tryError) +import Simplex.Messaging.Util (catchAll_, tryError) import UnliftIO type XFTPClientVar = TMVar (Either XFTPClientAgentError XFTPClient) @@ -114,3 +115,13 @@ getXFTPServerClient XFTPClientAgent {xftpClients, config} srv = do showServer :: XFTPServer -> Text showServer ProtocolServer {host, port} = decodeUtf8 $ strEncode host <> B.pack (if null port then "" else ':' : port) + +closeXFTPServerClient :: XFTPClientAgent -> XFTPServer -> IO () +closeXFTPServerClient XFTPClientAgent {xftpClients, config} srv = + atomically (TM.lookupDelete srv xftpClients) >>= mapM_ closeClient + where + closeClient cVar = do + let NetworkConfig {tcpConnectTimeout} = networkConfig $ xftpConfig config + tcpConnectTimeout `timeout` atomically (readTMVar cVar) >>= \case + Just (Right client) -> closeXFTPClient client `catchAll_` pure () + _ -> pure () diff --git a/src/Simplex/FileTransfer/Client/Main.hs b/src/Simplex/FileTransfer/Client/Main.hs index f1c777177..4b92a90c1 100644 --- a/src/Simplex/FileTransfer/Client/Main.hs +++ b/src/Simplex/FileTransfer/Client/Main.hs @@ -1,11 +1,13 @@ {-# LANGUAGE DataKinds #-} {-# LANGUAGE DeriveAnyClass #-} {-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TypeApplications #-} module Simplex.FileTransfer.Client.Main ( xftpClientCLI, @@ -14,6 +16,7 @@ module Simplex.FileTransfer.Client.Main where import Control.Concurrent.STM (stateTVar) +import Control.Logger.Simple import Control.Monad import Control.Monad.Except import Crypto.Random (getRandomBytes) @@ -30,7 +33,9 @@ import qualified Data.List.NonEmpty as L import Data.Map (Map) import qualified Data.Map as M import Data.Maybe (fromMaybe) +import qualified Data.Text as T import Data.Word (Word32) +import GHC.Records (HasField (getField)) import Options.Applicative import Simplex.FileTransfer.Client import Simplex.FileTransfer.Client.Agent @@ -44,7 +49,7 @@ import Simplex.Messaging.Encoding.String (StrEncoding (..)) import Simplex.Messaging.Parsers (parseAll) import Simplex.Messaging.Protocol (ProtoServerWithAuth (..), SenderId, SndPrivateSignKey, SndPublicVerifyKey, XFTPServer, XFTPServerWithAuth) import Simplex.Messaging.Server.CLI (getCliCommand') -import Simplex.Messaging.Util (ifM, whenM) +import Simplex.Messaging.Util (ifM, tshow, whenM) import System.Exit (exitFailure) import System.FilePath (splitExtensions, splitFileName, ()) import System.IO.Temp (getCanonicalTemporaryDirectory) @@ -86,7 +91,8 @@ data SendOptions = SendOptions numRecipients :: Int, xftpServers :: [XFTPServerWithAuth], retryCount :: Int, - tempPath :: Maybe FilePath + tempPath :: Maybe FilePath, + verbose :: Bool } deriving (Show) @@ -94,13 +100,15 @@ data ReceiveOptions = ReceiveOptions { fileDescription :: FilePath, filePath :: Maybe FilePath, retryCount :: Int, - tempPath :: Maybe FilePath + tempPath :: Maybe FilePath, + verbose :: Bool } deriving (Show) data DeleteOptions = DeleteOptions { fileDescription :: FilePath, - retryCount :: Int + retryCount :: Int, + verbose :: Bool } deriving (Show) @@ -148,6 +156,7 @@ cliCommandP = <*> xftpServers <*> retryCountP <*> temp + <*> verboseP receiveP :: Parser ReceiveOptions receiveP = ReceiveOptions @@ -155,11 +164,13 @@ cliCommandP = <*> optional (argument str $ metavar "DIR" <> help "Directory to save file (default: system Downloads directory)") <*> retryCountP <*> temp + <*> verboseP deleteP :: Parser DeleteOptions deleteP = DeleteOptions <$> fileDescrArg <*> retryCountP + <*> verboseP infoP :: Parser InfoOptions infoP = InfoOptions <$> fileDescrArg randomP :: Parser RandomFileOptions @@ -170,6 +181,7 @@ cliCommandP = fileDescrArg = argument str (metavar "FILE" <> help "File description file") retryCountP = option auto (long "retry" <> short 'r' <> metavar "RETRY" <> help "Number of network retries" <> value defaultRetryCount <> showDefault) temp = optional (strOption $ long "tmp" <> metavar "TMP" <> help "Directory for temporary encrypted file (default: system temp directory)") + verboseP = switch (long "verbose" <> short 'v' <> help "Verbose output") xftpServers = option parseXFTPServers @@ -208,17 +220,25 @@ data SentRecipientReplica = SentRecipientReplica chunkSize :: FileSize Word32 } +logCfg :: LogConfig +logCfg = LogConfig {lc_file = Nothing, lc_stderr = True} + xftpClientCLI :: IO () xftpClientCLI = getCliCommand' cliCommandP clientVersion >>= \case - SendFile opts -> runE $ cliSendFile opts - ReceiveFile opts -> runE $ cliReceiveFile opts - DeleteFile opts -> runE $ cliDeleteFile opts + SendFile opts -> runLogE opts $ cliSendFile opts + ReceiveFile opts -> runLogE opts $ cliReceiveFile opts + DeleteFile opts -> runLogE opts $ cliDeleteFile opts FileDescrInfo opts -> runE $ cliFileDescrInfo opts RandomFile opts -> cliRandomFile opts where clientVersion = "SimpleX XFTP client v" <> xftpClientVersion +runLogE :: HasField "verbose" a Bool => a -> ExceptT CLIError IO () -> IO () +runLogE opts a + | getField @"verbose" opts = setLogLevel LogDebug >> withGlobalLogging logCfg (runE a) + | otherwise = runE a + runE :: ExceptT CLIError IO () -> IO () runE a = runExceptT a >>= \case @@ -239,7 +259,7 @@ instance Encoding FileHeader where pure FileHeader {fileName, fileExtra} cliSendFile :: SendOptions -> ExceptT CLIError IO () -cliSendFile SendOptions {filePath, outputDir, numRecipients, xftpServers, retryCount, tempPath} = do +cliSendFile SendOptions {filePath, outputDir, numRecipients, xftpServers, retryCount, tempPath, verbose} = do let (_, fileName) = splitFileName filePath (encPath, fdRcv, fdSnd, chunkSpecs, encSize) <- encryptFile fileName liftIO $ printNoNewLine "Uploading file..." @@ -274,6 +294,7 @@ cliSendFile SendOptions {filePath, outputDir, numRecipients, xftpServers, retryC let chunkSpecs = prepareChunkSpecs encPath chunkSizes fdRcv = FileDescription {party = SRecipient, size = FileSize encSize, digest = FileDigest digest, key, nonce, chunkSize = FileSize defChunkSize, chunks = []} fdSnd = FileDescription {party = SSender, size = FileSize encSize, digest = FileDigest digest, key, nonce, chunkSize = FileSize defChunkSize, chunks = []} + logInfo $ "encrypted file to " <> tshow encPath pure (encPath, fdRcv, fdSnd, chunkSpecs, encSize) where encrypt :: ByteString -> C.SbKey -> C.CbNonce -> Int64 -> Int64 -> FilePath -> ExceptT CLIError IO () @@ -293,19 +314,24 @@ cliSendFile SendOptions {filePath, outputDir, numRecipients, xftpServers, retryC -- TODO shuffle/unshuffle chunks -- the reason we don't do pooled downloads here within one server is that http2 library doesn't handle cleint concurrency, even though -- upload doesn't allow other requests within the same client until complete (but download does allow). + logInfo $ "uploading " <> tshow (length chunks) <> " chunks..." map snd . sortOn fst . concat <$> pooledForConcurrentlyN 16 chunks' (mapM $ uploadFileChunk a) where uploadFileChunk :: XFTPClientAgent -> (Int, XFTPChunkSpec, XFTPServerWithAuth) -> ExceptT CLIError IO (Int, SentFileChunk) uploadFileChunk a (chunkNo, chunkSpec@XFTPChunkSpec {chunkSize}, ProtoServerWithAuth xftpServer auth) = do + logInfo $ "uploading chunk " <> tshow chunkNo <> " to " <> showServer xftpServer <> "..." (sndKey, spKey) <- liftIO $ C.generateSignatureKeyPair C.SEd25519 rKeys <- liftIO $ L.fromList <$> replicateM numRecipients (C.generateSignatureKeyPair C.SEd25519) - chInfo@FileInfo {digest} <- liftIO $ getChunkInfo sndKey chunkSpec + ch@FileInfo {digest} <- liftIO $ getChunkInfo sndKey chunkSpec c <- withRetry retryCount $ getXFTPServerClient a xftpServer - (sndId, rIds) <- withRetry retryCount $ createXFTPChunk c spKey chInfo (L.map fst rKeys) auth - withRetry retryCount $ uploadXFTPChunk c spKey sndId chunkSpec + (sndId, rIds) <- withRetry retryCount $ createXFTPChunk c spKey ch (L.map fst rKeys) auth + withReconnect a xftpServer retryCount $ \c' -> uploadXFTPChunk c' spKey sndId chunkSpec + logInfo $ "uploaded chunk " <> tshow chunkNo uploaded <- atomically . stateTVar uploadedChunks $ \cs -> let cs' = fromIntegral chunkSize : cs in (sum cs', cs') - liftIO $ printProgress "Uploaded" uploaded encSize + liftIO $ do + printProgress "Uploaded" uploaded encSize + when verbose $ putStrLn "" let recipients = L.toList $ L.map ChunkReplicaId rIds `L.zip` L.map snd rKeys replicas = [SentFileChunkReplica {server = xftpServer, recipients}] pure (chunkNo, SentFileChunk {chunkNo, sndId, sndPrivateKey = spKey, chunkSize = FileSize $ fromIntegral chunkSize, digest = FileDigest digest, replicas}) @@ -384,7 +410,7 @@ cliSendFile SendOptions {filePath, outputDir, numRecipients, xftpServers, retryC pure (fdRcvPaths, fdSndPath) cliReceiveFile :: ReceiveOptions -> ExceptT CLIError IO () -cliReceiveFile ReceiveOptions {fileDescription, filePath, retryCount, tempPath} = +cliReceiveFile ReceiveOptions {fileDescription, filePath, retryCount, tempPath, verbose} = getFileDescription' fileDescription >>= receiveFile where receiveFile :: ValidFileDescription 'FPRecipient -> ExceptT CLIError IO () @@ -409,13 +435,16 @@ cliReceiveFile ReceiveOptions {fileDescription, filePath, retryCount, tempPath} downloadFileChunk :: XFTPClientAgent -> FilePath -> FileSize Int64 -> TVar [Int64] -> FileChunk -> ExceptT CLIError IO (Int, FilePath) downloadFileChunk a encPath (FileSize encSize) downloadedChunks FileChunk {chunkNo, chunkSize, digest, replicas = replica : _} = do let FileChunkReplica {server, replicaId, replicaKey} = replica + logInfo $ "downloading chunk " <> tshow chunkNo <> " from " <> showServer server <> "..." chunkPath <- uniqueCombine encPath $ show chunkNo - c <- withRetry retryCount $ getXFTPServerClient a server let chunkSpec = XFTPRcvChunkSpec chunkPath (unFileSize chunkSize) (unFileDigest digest) - withRetry retryCount $ downloadXFTPChunk c replicaKey (unChunkReplicaId replicaId) chunkSpec + withReconnect a server retryCount $ \c -> downloadXFTPChunk c replicaKey (unChunkReplicaId replicaId) chunkSpec + logInfo $ "downloaded chunk " <> tshow chunkNo <> " to " <> T.pack chunkPath downloaded <- atomically . stateTVar downloadedChunks $ \cs -> let cs' = fromIntegral (unFileSize chunkSize) : cs in (sum cs', cs') - liftIO $ printProgress "Downloaded" downloaded encSize + liftIO $ do + printProgress "Downloaded" downloaded encSize + when verbose $ putStrLn "" pure (chunkNo, chunkPath) downloadFileChunk _ _ _ _ _ = throwError $ CLIError "chunk has no replicas" decryptFile :: Int64 -> [FilePath] -> C.SbKey -> C.CbNonce -> ExceptT CLIError IO FilePath @@ -469,10 +498,10 @@ cliDeleteFile DeleteOptions {fileDescription, retryCount} = do forM_ chunks $ deleteFileChunk a liftIO $ putStrLn "File deleted" deleteFileChunk :: XFTPClientAgent -> FileChunk -> ExceptT CLIError IO () - deleteFileChunk a FileChunk {replicas = replica : _} = do + deleteFileChunk a FileChunk {chunkNo, replicas = replica : _} = do let FileChunkReplica {server, replicaId, replicaKey} = replica - c <- withRetry retryCount $ getXFTPServerClient a server - withRetry retryCount $ deleteXFTPChunk c replicaKey (unChunkReplicaId replicaId) + withReconnect a server retryCount $ \c -> deleteXFTPChunk c replicaKey (unChunkReplicaId replicaId) + logInfo $ "deleted chunk " <> tshow chunkNo <> " from " <> showServer server deleteFileChunk _ _ = throwError $ CLIError "chunk has no replicas" cliFileDescrInfo :: InfoOptions -> ExceptT CLIError IO () @@ -541,13 +570,23 @@ uniqueCombine filePath fileName = tryCombine (0 :: Int) f = filePath (name <> suffix <> ext) in ifM (doesPathExist f) (tryCombine $ n + 1) (pure f) +withReconnect :: Show e => XFTPClientAgent -> XFTPServer -> Int -> (XFTPClient -> ExceptT e IO a) -> ExceptT CLIError IO a +withReconnect a srv n run = withRetry n $ do + c <- withRetry n $ getXFTPServerClient a srv + withExceptT (CLIError . show) (run c) `catchError` \e -> do + liftIO $ closeXFTPServerClient a srv + throwError e + withRetry :: Show e => Int -> ExceptT e IO a -> ExceptT CLIError IO a withRetry retryCount = withRetry' retryCount . withExceptT (CLIError . show) where withRetry' :: Int -> ExceptT CLIError IO a -> ExceptT CLIError IO a withRetry' 0 _ = throwError $ CLIError "internal: no retry attempts" withRetry' 1 a = a - withRetry' n a = a `catchError` \_ -> withRetry' (n - 1) a + withRetry' n a = + a `catchError` \e -> do + logWarn ("retrying: " <> tshow e) + withRetry' (n - 1) a cliRandomFile :: RandomFileOptions -> IO () cliRandomFile RandomFileOptions {filePath, fileSize = FileSize size} = do