xftp: download timeout, reconnect CLI on timeouts, verbose logging (#664)

* xftp: download timeout, reconnect CLI on timeouts, verbose logging

* typo

Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com>

---------

Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com>
This commit is contained in:
Evgeny Poberezkin
2023-03-01 07:47:27 +00:00
committed by GitHub
parent 249bcc7bb3
commit 3d3eb335e8
3 changed files with 84 additions and 27 deletions
+13 -6
View File
@@ -31,6 +31,7 @@ import Simplex.Messaging.Client
proxyUsername,
transportClientConfig,
)
import Simplex.Messaging.Client.Agent ()
import qualified Simplex.Messaging.Crypto as C
import qualified Simplex.Messaging.Crypto.Lazy as LC
import Simplex.Messaging.Protocol
@@ -45,8 +46,8 @@ import Simplex.Messaging.Transport.Client (TransportClientConfig)
import Simplex.Messaging.Transport.HTTP2
import Simplex.Messaging.Transport.HTTP2.Client
import Simplex.Messaging.Util (bshow, liftEitherError, whenM)
import UnliftIO
import UnliftIO.Directory
import UnliftIO.IO
data XFTPClient = XFTPClient
{ http2Client :: HTTP2Client,
@@ -91,6 +92,9 @@ getXFTPClient transportSession@(_, srv, _) config@XFTPClientConfig {networkConfi
http2Client <- liftEitherError xftpClientError $ getVerifiedHTTP2Client (Just username) useHost usePort (Just keyHash) Nothing http2Config disconnected
pure XFTPClient {http2Client, config}
closeXFTPClient :: XFTPClient -> IO ()
closeXFTPClient XFTPClient {http2Client} = closeHTTP2Client http2Client
xftpHTTP2Config :: TransportClientConfig -> XFTPClientConfig -> HTTP2ClientConfig
xftpHTTP2Config transportConfig XFTPClientConfig {networkConfig = NetworkConfig {tcpConnectTimeout}} =
defaultHTTP2ClientConfig
@@ -149,7 +153,7 @@ uploadXFTPChunk c spKey fId chunkSpec =
sendXFTPCommand c spKey fId FPUT (Just chunkSpec) >>= okResponse
downloadXFTPChunk :: XFTPClient -> C.APrivateSignKey -> XFTPFileId -> XFTPRcvChunkSpec -> ExceptT XFTPClientError IO ()
downloadXFTPChunk c rpKey fId chunkSpec@XFTPRcvChunkSpec {filePath} = do
downloadXFTPChunk c@XFTPClient {config} rpKey fId chunkSpec@XFTPRcvChunkSpec {filePath, chunkSize} = do
(rDhKey, rpDhKey) <- liftIO C.generateKeyPair'
sendXFTPCommand c rpKey fId (FGET rDhKey) Nothing >>= \case
(FRFile sDhKey cbNonce, HTTP2Body {bodyHead, bodySize, bodyPart}) -> case bodyPart of
@@ -157,10 +161,13 @@ downloadXFTPChunk c rpKey fId chunkSpec@XFTPRcvChunkSpec {filePath} = do
Just chunkPart -> do
let dhSecret = C.dh' sDhKey rpDhKey
cbState <- liftEither . first PCECryptoError $ LC.cbInit dhSecret cbNonce
-- timeout download in the same way as upload
withExceptT PCEResponseError $
receiveEncFile chunkPart cbState chunkSpec `catchError` \e ->
whenM (doesFileExist filePath) (removeFile filePath) >> throwError e
let t = (fromIntegral chunkSize * uploadTimeoutPerMb config) `div` mb 1
t `timeout` download cbState >>= maybe (throwError PCEResponseTimeout) pure
where
download cbState =
withExceptT PCEResponseError $
receiveEncFile chunkPart cbState chunkSpec `catchError` \e ->
whenM (doesFileExist filePath) (removeFile filePath) >> throwError e
_ -> throwError $ PCEResponseError NO_FILE
(r, _) -> throwError . PCEUnexpectedResponse $ bshow r
+12 -1
View File
@@ -1,5 +1,6 @@
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
@@ -20,7 +21,7 @@ import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (ProtocolServer (..), XFTPServer)
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util (tryError)
import Simplex.Messaging.Util (catchAll_, tryError)
import UnliftIO
type XFTPClientVar = TMVar (Either XFTPClientAgentError XFTPClient)
@@ -114,3 +115,13 @@ getXFTPServerClient XFTPClientAgent {xftpClients, config} srv = do
showServer :: XFTPServer -> Text
showServer ProtocolServer {host, port} =
decodeUtf8 $ strEncode host <> B.pack (if null port then "" else ':' : port)
closeXFTPServerClient :: XFTPClientAgent -> XFTPServer -> IO ()
closeXFTPServerClient XFTPClientAgent {xftpClients, config} srv =
atomically (TM.lookupDelete srv xftpClients) >>= mapM_ closeClient
where
closeClient cVar = do
let NetworkConfig {tcpConnectTimeout} = networkConfig $ xftpConfig config
tcpConnectTimeout `timeout` atomically (readTMVar cVar) >>= \case
Just (Right client) -> closeXFTPClient client `catchAll_` pure ()
_ -> pure ()
+59 -20
View File
@@ -1,11 +1,13 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
module Simplex.FileTransfer.Client.Main
( xftpClientCLI,
@@ -14,6 +16,7 @@ module Simplex.FileTransfer.Client.Main
where
import Control.Concurrent.STM (stateTVar)
import Control.Logger.Simple
import Control.Monad
import Control.Monad.Except
import Crypto.Random (getRandomBytes)
@@ -30,7 +33,9 @@ import qualified Data.List.NonEmpty as L
import Data.Map (Map)
import qualified Data.Map as M
import Data.Maybe (fromMaybe)
import qualified Data.Text as T
import Data.Word (Word32)
import GHC.Records (HasField (getField))
import Options.Applicative
import Simplex.FileTransfer.Client
import Simplex.FileTransfer.Client.Agent
@@ -44,7 +49,7 @@ import Simplex.Messaging.Encoding.String (StrEncoding (..))
import Simplex.Messaging.Parsers (parseAll)
import Simplex.Messaging.Protocol (ProtoServerWithAuth (..), SenderId, SndPrivateSignKey, SndPublicVerifyKey, XFTPServer, XFTPServerWithAuth)
import Simplex.Messaging.Server.CLI (getCliCommand')
import Simplex.Messaging.Util (ifM, whenM)
import Simplex.Messaging.Util (ifM, tshow, whenM)
import System.Exit (exitFailure)
import System.FilePath (splitExtensions, splitFileName, (</>))
import System.IO.Temp (getCanonicalTemporaryDirectory)
@@ -86,7 +91,8 @@ data SendOptions = SendOptions
numRecipients :: Int,
xftpServers :: [XFTPServerWithAuth],
retryCount :: Int,
tempPath :: Maybe FilePath
tempPath :: Maybe FilePath,
verbose :: Bool
}
deriving (Show)
@@ -94,13 +100,15 @@ data ReceiveOptions = ReceiveOptions
{ fileDescription :: FilePath,
filePath :: Maybe FilePath,
retryCount :: Int,
tempPath :: Maybe FilePath
tempPath :: Maybe FilePath,
verbose :: Bool
}
deriving (Show)
data DeleteOptions = DeleteOptions
{ fileDescription :: FilePath,
retryCount :: Int
retryCount :: Int,
verbose :: Bool
}
deriving (Show)
@@ -148,6 +156,7 @@ cliCommandP =
<*> xftpServers
<*> retryCountP
<*> temp
<*> verboseP
receiveP :: Parser ReceiveOptions
receiveP =
ReceiveOptions
@@ -155,11 +164,13 @@ cliCommandP =
<*> optional (argument str $ metavar "DIR" <> help "Directory to save file (default: system Downloads directory)")
<*> retryCountP
<*> temp
<*> verboseP
deleteP :: Parser DeleteOptions
deleteP =
DeleteOptions
<$> fileDescrArg
<*> retryCountP
<*> verboseP
infoP :: Parser InfoOptions
infoP = InfoOptions <$> fileDescrArg
randomP :: Parser RandomFileOptions
@@ -170,6 +181,7 @@ cliCommandP =
fileDescrArg = argument str (metavar "FILE" <> help "File description file")
retryCountP = option auto (long "retry" <> short 'r' <> metavar "RETRY" <> help "Number of network retries" <> value defaultRetryCount <> showDefault)
temp = optional (strOption $ long "tmp" <> metavar "TMP" <> help "Directory for temporary encrypted file (default: system temp directory)")
verboseP = switch (long "verbose" <> short 'v' <> help "Verbose output")
xftpServers =
option
parseXFTPServers
@@ -208,17 +220,25 @@ data SentRecipientReplica = SentRecipientReplica
chunkSize :: FileSize Word32
}
logCfg :: LogConfig
logCfg = LogConfig {lc_file = Nothing, lc_stderr = True}
xftpClientCLI :: IO ()
xftpClientCLI =
getCliCommand' cliCommandP clientVersion >>= \case
SendFile opts -> runE $ cliSendFile opts
ReceiveFile opts -> runE $ cliReceiveFile opts
DeleteFile opts -> runE $ cliDeleteFile opts
SendFile opts -> runLogE opts $ cliSendFile opts
ReceiveFile opts -> runLogE opts $ cliReceiveFile opts
DeleteFile opts -> runLogE opts $ cliDeleteFile opts
FileDescrInfo opts -> runE $ cliFileDescrInfo opts
RandomFile opts -> cliRandomFile opts
where
clientVersion = "SimpleX XFTP client v" <> xftpClientVersion
runLogE :: HasField "verbose" a Bool => a -> ExceptT CLIError IO () -> IO ()
runLogE opts a
| getField @"verbose" opts = setLogLevel LogDebug >> withGlobalLogging logCfg (runE a)
| otherwise = runE a
runE :: ExceptT CLIError IO () -> IO ()
runE a =
runExceptT a >>= \case
@@ -239,7 +259,7 @@ instance Encoding FileHeader where
pure FileHeader {fileName, fileExtra}
cliSendFile :: SendOptions -> ExceptT CLIError IO ()
cliSendFile SendOptions {filePath, outputDir, numRecipients, xftpServers, retryCount, tempPath} = do
cliSendFile SendOptions {filePath, outputDir, numRecipients, xftpServers, retryCount, tempPath, verbose} = do
let (_, fileName) = splitFileName filePath
(encPath, fdRcv, fdSnd, chunkSpecs, encSize) <- encryptFile fileName
liftIO $ printNoNewLine "Uploading file..."
@@ -274,6 +294,7 @@ cliSendFile SendOptions {filePath, outputDir, numRecipients, xftpServers, retryC
let chunkSpecs = prepareChunkSpecs encPath chunkSizes
fdRcv = FileDescription {party = SRecipient, size = FileSize encSize, digest = FileDigest digest, key, nonce, chunkSize = FileSize defChunkSize, chunks = []}
fdSnd = FileDescription {party = SSender, size = FileSize encSize, digest = FileDigest digest, key, nonce, chunkSize = FileSize defChunkSize, chunks = []}
logInfo $ "encrypted file to " <> tshow encPath
pure (encPath, fdRcv, fdSnd, chunkSpecs, encSize)
where
encrypt :: ByteString -> C.SbKey -> C.CbNonce -> Int64 -> Int64 -> FilePath -> ExceptT CLIError IO ()
@@ -293,19 +314,24 @@ cliSendFile SendOptions {filePath, outputDir, numRecipients, xftpServers, retryC
-- TODO shuffle/unshuffle chunks
-- the reason we don't do pooled downloads here within one server is that http2 library doesn't handle cleint concurrency, even though
-- upload doesn't allow other requests within the same client until complete (but download does allow).
logInfo $ "uploading " <> tshow (length chunks) <> " chunks..."
map snd . sortOn fst . concat <$> pooledForConcurrentlyN 16 chunks' (mapM $ uploadFileChunk a)
where
uploadFileChunk :: XFTPClientAgent -> (Int, XFTPChunkSpec, XFTPServerWithAuth) -> ExceptT CLIError IO (Int, SentFileChunk)
uploadFileChunk a (chunkNo, chunkSpec@XFTPChunkSpec {chunkSize}, ProtoServerWithAuth xftpServer auth) = do
logInfo $ "uploading chunk " <> tshow chunkNo <> " to " <> showServer xftpServer <> "..."
(sndKey, spKey) <- liftIO $ C.generateSignatureKeyPair C.SEd25519
rKeys <- liftIO $ L.fromList <$> replicateM numRecipients (C.generateSignatureKeyPair C.SEd25519)
chInfo@FileInfo {digest} <- liftIO $ getChunkInfo sndKey chunkSpec
ch@FileInfo {digest} <- liftIO $ getChunkInfo sndKey chunkSpec
c <- withRetry retryCount $ getXFTPServerClient a xftpServer
(sndId, rIds) <- withRetry retryCount $ createXFTPChunk c spKey chInfo (L.map fst rKeys) auth
withRetry retryCount $ uploadXFTPChunk c spKey sndId chunkSpec
(sndId, rIds) <- withRetry retryCount $ createXFTPChunk c spKey ch (L.map fst rKeys) auth
withReconnect a xftpServer retryCount $ \c' -> uploadXFTPChunk c' spKey sndId chunkSpec
logInfo $ "uploaded chunk " <> tshow chunkNo
uploaded <- atomically . stateTVar uploadedChunks $ \cs ->
let cs' = fromIntegral chunkSize : cs in (sum cs', cs')
liftIO $ printProgress "Uploaded" uploaded encSize
liftIO $ do
printProgress "Uploaded" uploaded encSize
when verbose $ putStrLn ""
let recipients = L.toList $ L.map ChunkReplicaId rIds `L.zip` L.map snd rKeys
replicas = [SentFileChunkReplica {server = xftpServer, recipients}]
pure (chunkNo, SentFileChunk {chunkNo, sndId, sndPrivateKey = spKey, chunkSize = FileSize $ fromIntegral chunkSize, digest = FileDigest digest, replicas})
@@ -384,7 +410,7 @@ cliSendFile SendOptions {filePath, outputDir, numRecipients, xftpServers, retryC
pure (fdRcvPaths, fdSndPath)
cliReceiveFile :: ReceiveOptions -> ExceptT CLIError IO ()
cliReceiveFile ReceiveOptions {fileDescription, filePath, retryCount, tempPath} =
cliReceiveFile ReceiveOptions {fileDescription, filePath, retryCount, tempPath, verbose} =
getFileDescription' fileDescription >>= receiveFile
where
receiveFile :: ValidFileDescription 'FPRecipient -> ExceptT CLIError IO ()
@@ -409,13 +435,16 @@ cliReceiveFile ReceiveOptions {fileDescription, filePath, retryCount, tempPath}
downloadFileChunk :: XFTPClientAgent -> FilePath -> FileSize Int64 -> TVar [Int64] -> FileChunk -> ExceptT CLIError IO (Int, FilePath)
downloadFileChunk a encPath (FileSize encSize) downloadedChunks FileChunk {chunkNo, chunkSize, digest, replicas = replica : _} = do
let FileChunkReplica {server, replicaId, replicaKey} = replica
logInfo $ "downloading chunk " <> tshow chunkNo <> " from " <> showServer server <> "..."
chunkPath <- uniqueCombine encPath $ show chunkNo
c <- withRetry retryCount $ getXFTPServerClient a server
let chunkSpec = XFTPRcvChunkSpec chunkPath (unFileSize chunkSize) (unFileDigest digest)
withRetry retryCount $ downloadXFTPChunk c replicaKey (unChunkReplicaId replicaId) chunkSpec
withReconnect a server retryCount $ \c -> downloadXFTPChunk c replicaKey (unChunkReplicaId replicaId) chunkSpec
logInfo $ "downloaded chunk " <> tshow chunkNo <> " to " <> T.pack chunkPath
downloaded <- atomically . stateTVar downloadedChunks $ \cs ->
let cs' = fromIntegral (unFileSize chunkSize) : cs in (sum cs', cs')
liftIO $ printProgress "Downloaded" downloaded encSize
liftIO $ do
printProgress "Downloaded" downloaded encSize
when verbose $ putStrLn ""
pure (chunkNo, chunkPath)
downloadFileChunk _ _ _ _ _ = throwError $ CLIError "chunk has no replicas"
decryptFile :: Int64 -> [FilePath] -> C.SbKey -> C.CbNonce -> ExceptT CLIError IO FilePath
@@ -469,10 +498,10 @@ cliDeleteFile DeleteOptions {fileDescription, retryCount} = do
forM_ chunks $ deleteFileChunk a
liftIO $ putStrLn "File deleted"
deleteFileChunk :: XFTPClientAgent -> FileChunk -> ExceptT CLIError IO ()
deleteFileChunk a FileChunk {replicas = replica : _} = do
deleteFileChunk a FileChunk {chunkNo, replicas = replica : _} = do
let FileChunkReplica {server, replicaId, replicaKey} = replica
c <- withRetry retryCount $ getXFTPServerClient a server
withRetry retryCount $ deleteXFTPChunk c replicaKey (unChunkReplicaId replicaId)
withReconnect a server retryCount $ \c -> deleteXFTPChunk c replicaKey (unChunkReplicaId replicaId)
logInfo $ "deleted chunk " <> tshow chunkNo <> " from " <> showServer server
deleteFileChunk _ _ = throwError $ CLIError "chunk has no replicas"
cliFileDescrInfo :: InfoOptions -> ExceptT CLIError IO ()
@@ -541,13 +570,23 @@ uniqueCombine filePath fileName = tryCombine (0 :: Int)
f = filePath </> (name <> suffix <> ext)
in ifM (doesPathExist f) (tryCombine $ n + 1) (pure f)
withReconnect :: Show e => XFTPClientAgent -> XFTPServer -> Int -> (XFTPClient -> ExceptT e IO a) -> ExceptT CLIError IO a
withReconnect a srv n run = withRetry n $ do
c <- withRetry n $ getXFTPServerClient a srv
withExceptT (CLIError . show) (run c) `catchError` \e -> do
liftIO $ closeXFTPServerClient a srv
throwError e
withRetry :: Show e => Int -> ExceptT e IO a -> ExceptT CLIError IO a
withRetry retryCount = withRetry' retryCount . withExceptT (CLIError . show)
where
withRetry' :: Int -> ExceptT CLIError IO a -> ExceptT CLIError IO a
withRetry' 0 _ = throwError $ CLIError "internal: no retry attempts"
withRetry' 1 a = a
withRetry' n a = a `catchError` \_ -> withRetry' (n - 1) a
withRetry' n a =
a `catchError` \e -> do
logWarn ("retrying: " <> tshow e)
withRetry' (n - 1) a
cliRandomFile :: RandomFileOptions -> IO ()
cliRandomFile RandomFileOptions {filePath, fileSize = FileSize size} = do