mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-24 23:26:00 +00:00
xftp: cli - option to pass servers, choose servers randomly (#641)
This commit is contained in:
@@ -7,6 +7,7 @@
|
||||
|
||||
module Simplex.FileTransfer.Client.Main (xftpClientCLI) where
|
||||
|
||||
import Control.Concurrent.STM (stateTVar)
|
||||
import Control.Monad
|
||||
import Control.Monad.Except
|
||||
import Crypto.Random (getRandomBytes)
|
||||
@@ -17,6 +18,7 @@ import qualified Data.ByteString.Char8 as B
|
||||
import qualified Data.ByteString.Lazy.Char8 as LB
|
||||
import Data.Int (Int64)
|
||||
import Data.List (foldl', sortOn)
|
||||
import Data.List.NonEmpty (NonEmpty (..), nonEmpty)
|
||||
import qualified Data.List.NonEmpty as L
|
||||
import Data.Map (Map)
|
||||
import qualified Data.Map as M
|
||||
@@ -32,12 +34,14 @@ import qualified Simplex.Messaging.Crypto as C
|
||||
import qualified Simplex.Messaging.Crypto.Lazy as LC
|
||||
import Simplex.Messaging.Encoding
|
||||
import Simplex.Messaging.Encoding.String (StrEncoding (..))
|
||||
import Simplex.Messaging.Parsers (parseAll)
|
||||
import Simplex.Messaging.Protocol (SenderId, SndPrivateSignKey, SndPublicVerifyKey, XFTPServer)
|
||||
import Simplex.Messaging.Server.CLI (getCliCommand')
|
||||
import Simplex.Messaging.Util (ifM, whenM)
|
||||
import System.Exit (exitFailure)
|
||||
import System.FilePath (splitExtensions, splitFileName, (</>))
|
||||
import System.IO.Temp (getCanonicalTemporaryDirectory)
|
||||
import System.Random (StdGen, newStdGen, randomR)
|
||||
import UnliftIO
|
||||
import UnliftIO.Directory
|
||||
|
||||
@@ -71,6 +75,7 @@ data SendOptions = SendOptions
|
||||
{ filePath :: FilePath,
|
||||
outputDir :: Maybe FilePath,
|
||||
numRecipients :: Int,
|
||||
xftpServers :: [XFTPServer],
|
||||
retryCount :: Int,
|
||||
tempPath :: Maybe FilePath
|
||||
}
|
||||
@@ -93,8 +98,8 @@ data RandomFileOptions = RandomFileOptions
|
||||
defaultRetryCount :: Int
|
||||
defaultRetryCount = 3
|
||||
|
||||
xftpServer :: XFTPServer
|
||||
xftpServer = "xftp://vr0bXzm4iKkLvleRMxLznTS-lHjXEyXunxn_7VJckk4=@localhost:443"
|
||||
defaultXFTPServers :: NonEmpty XFTPServer
|
||||
defaultXFTPServers = L.fromList ["xftp://vr0bXzm4iKkLvleRMxLznTS-lHjXEyXunxn_7VJckk4=@localhost:443"]
|
||||
|
||||
cliCommandP :: Parser CliCommand
|
||||
cliCommandP =
|
||||
@@ -110,6 +115,7 @@ cliCommandP =
|
||||
<$> argument str (metavar "FILE" <> help "File to send")
|
||||
<*> optional (argument str $ metavar "DIR" <> help "Directory to save file descriptions (default: current directory)")
|
||||
<*> option auto (short 'n' <> metavar "COUNT" <> help "Number of recipients" <> value 1 <> showDefault)
|
||||
<*> xftpServers
|
||||
<*> retries
|
||||
<*> temp
|
||||
receiveP :: Parser ReceiveOptions
|
||||
@@ -127,6 +133,17 @@ cliCommandP =
|
||||
strDec = eitherReader $ strDecode . B.pack
|
||||
retries = option auto (long "retry" <> short 'r' <> metavar "RETRY" <> help "Number of network retries" <> value defaultRetryCount <> showDefault)
|
||||
temp = optional (strOption $ long "temp" <> metavar "TEMP" <> help "Directory for temporary encrypted file (default: system temp directory)")
|
||||
xftpServers =
|
||||
option
|
||||
parseXFTPServers
|
||||
( long "servers"
|
||||
<> short 's'
|
||||
<> metavar "SERVER"
|
||||
<> help "Semicolon-separated list of XFTP server(s) to use (each server can have more than one hostname)"
|
||||
<> value []
|
||||
)
|
||||
parseXFTPServers = eitherReader $ parseAll xftpServersP . B.pack
|
||||
xftpServersP = strP `A.sepBy1` A.char ';'
|
||||
|
||||
data SentFileChunk = SentFileChunk
|
||||
{ chunkNo :: Int,
|
||||
@@ -183,7 +200,7 @@ instance Encoding FileHeader where
|
||||
pure FileHeader {fileName, fileExtra}
|
||||
|
||||
cliSendFile :: SendOptions -> ExceptT CLIError IO ()
|
||||
cliSendFile SendOptions {filePath, outputDir, numRecipients, retryCount, tempPath} = do
|
||||
cliSendFile SendOptions {filePath, outputDir, numRecipients, xftpServers, retryCount, tempPath} = do
|
||||
let (_, fileName) = splitFileName filePath
|
||||
(encPath, fd, chunkSpecs) <- encryptFile fileName
|
||||
sentChunks <- uploadFile chunkSpecs
|
||||
@@ -218,18 +235,20 @@ cliSendFile SendOptions {filePath, outputDir, numRecipients, retryCount, tempPat
|
||||
uploadFile :: [XFTPChunkSpec] -> ExceptT CLIError IO [SentFileChunk]
|
||||
uploadFile chunks = do
|
||||
a <- atomically $ newXFTPAgent defaultXFTPClientAgentConfig
|
||||
gen <- newTVarIO =<< liftIO newStdGen
|
||||
let xftpSrvs = fromMaybe defaultXFTPServers (nonEmpty xftpServers)
|
||||
-- TODO shuffle chunks
|
||||
sentChunks <- pooledForConcurrentlyN 32 (zip [1 ..] chunks) $ uploadFileChunk a
|
||||
sentChunks <- pooledForConcurrentlyN 32 (zip [1 ..] chunks) $ uploadFileChunk a gen xftpSrvs
|
||||
-- TODO unshuffle chunks
|
||||
pure $ map snd sentChunks
|
||||
where
|
||||
retries = withRetry retryCount
|
||||
uploadFileChunk :: XFTPClientAgent -> (Int, XFTPChunkSpec) -> ExceptT CLIError IO (Int, SentFileChunk)
|
||||
uploadFileChunk a (chunkNo, chunkSpec@XFTPChunkSpec {chunkSize}) = do
|
||||
uploadFileChunk :: XFTPClientAgent -> TVar StdGen -> NonEmpty XFTPServer -> (Int, XFTPChunkSpec) -> ExceptT CLIError IO (Int, SentFileChunk)
|
||||
uploadFileChunk a gen srvs (chunkNo, chunkSpec@XFTPChunkSpec {chunkSize}) = do
|
||||
(sndKey, spKey) <- liftIO $ C.generateSignatureKeyPair C.SEd25519
|
||||
rKeys <- liftIO $ L.fromList <$> replicateM numRecipients (C.generateSignatureKeyPair C.SEd25519)
|
||||
chInfo@FileInfo {digest} <- liftIO $ getChunkInfo sndKey chunkSpec
|
||||
-- TODO choose server randomly
|
||||
xftpServer <- liftIO $ getXFTPServer gen srvs
|
||||
c <- retries $ withExceptT (CLIError . show) $ getXFTPServerClient a xftpServer
|
||||
(sndId, rIds) <- retries $ withExceptT (CLIError . show) $ createXFTPChunk c spKey chInfo $ L.map fst rKeys
|
||||
retries $ withExceptT (CLIError . show) $ uploadXFTPChunk c spKey sndId chunkSpec
|
||||
@@ -242,6 +261,11 @@ cliSendFile SendOptions {filePath, outputDir, numRecipients, retryCount, tempPat
|
||||
hSeek h AbsoluteSeek $ fromIntegral chunkOffset
|
||||
digest <- LC.sha512Hash <$> LB.hGet h (fromIntegral chunkSize)
|
||||
pure FileInfo {sndKey, size = fromIntegral chunkSize, digest}
|
||||
getXFTPServer :: TVar StdGen -> NonEmpty XFTPServer -> IO XFTPServer
|
||||
getXFTPServer gen = \case
|
||||
srv :| [] -> pure srv
|
||||
servers -> do
|
||||
atomically $ (servers L.!!) <$> stateTVar gen (randomR (0, L.length servers - 1))
|
||||
|
||||
-- M chunks, R replicas, N recipients
|
||||
-- rcvReplicas: M[SentFileChunk] -> M * R * N [SentRecipientReplica]
|
||||
|
||||
Reference in New Issue
Block a user