mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 20:45:52 +00:00
xftp: download chunks to separate files
This commit is contained in:
@@ -14,6 +14,7 @@ import Data.Bifunctor (first)
|
||||
import Data.ByteString.Builder (Builder, byteString)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Functor (($>))
|
||||
import Data.Int (Int64)
|
||||
import Data.List.NonEmpty (NonEmpty (..))
|
||||
import Data.Word (Word32)
|
||||
@@ -41,7 +42,8 @@ import Simplex.Messaging.Transport (supportedParameters)
|
||||
import Simplex.Messaging.Transport.Client (TransportClientConfig)
|
||||
import Simplex.Messaging.Transport.HTTP2
|
||||
import Simplex.Messaging.Transport.HTTP2.Client
|
||||
import Simplex.Messaging.Util (bshow, liftEitherError)
|
||||
import Simplex.Messaging.Util (bshow, liftEitherError, whenM)
|
||||
import System.Directory (doesFileExist, removeFile)
|
||||
import System.IO (IOMode (..), SeekMode (..))
|
||||
import UnliftIO.IO (hSeek, withFile)
|
||||
|
||||
@@ -150,14 +152,13 @@ downloadXFTPChunk c rpKey fId rKey =
|
||||
_ -> throwError $ PCEResponseError NO_FILE
|
||||
(r, _) -> throwError . PCEUnexpectedResponse $ bshow r
|
||||
|
||||
receiveXFTPChunk :: XFTPChunkBody -> XFTPChunkSpec -> ExceptT XFTPClientError IO ()
|
||||
receiveXFTPChunk XFTPChunkBody {chunkPart} XFTPChunkSpec {filePath, chunkOffset, chunkSize} = do
|
||||
withExceptT PCEResponseError . ExceptT $
|
||||
withFile filePath AppendMode $ \h -> do
|
||||
-- hSeek h AbsoluteSeek $ fromIntegral chunkOffset
|
||||
-- TODO chunk decryption
|
||||
-- TODO delete chunk once it is saved to a separate file
|
||||
receiveFile h chunkPart chunkSize
|
||||
receiveXFTPChunk :: XFTPChunkBody -> FilePath -> Word32 -> ExceptT XFTPClientError IO ()
|
||||
receiveXFTPChunk XFTPChunkBody {chunkPart} filePath chunkSize = do
|
||||
withExceptT PCEResponseError . ExceptT $ do
|
||||
-- TODO chunk decryption
|
||||
withFile filePath WriteMode (\h -> receiveFile h chunkPart chunkSize) >>= \case
|
||||
Right () -> pure $ Right ()
|
||||
Left e -> whenM (doesFileExist filePath) (removeFile filePath) $> Left e
|
||||
|
||||
-- FADD :: NonEmpty RcvPublicVerifyKey -> FileCommand Sender
|
||||
-- FDEL :: FileCommand Sender
|
||||
|
||||
@@ -330,39 +330,33 @@ cliReceiveFile :: ReceiveOptions -> ExceptT CLIError IO ()
|
||||
cliReceiveFile ReceiveOptions {fileDescription, filePath, retryCount, tempPath} = do
|
||||
ValidFileDescription FileDescription {size, digest, key, nonce, chunks} <- getFileDescription fileDescription
|
||||
encPath <- getEncPath tempPath "xftp"
|
||||
-- withFile encPath WriteMode $ \h -> do
|
||||
-- liftIO $ LB.hPut h $ LB.replicate (unFileSize size) '#'
|
||||
createDirectory encPath
|
||||
a <- atomically $ newXFTPAgent defaultXFTPClientAgentConfig
|
||||
writeLock <- atomically createLock
|
||||
let chunkSizes = prepareChunkSizes $ unFileSize size
|
||||
chunkSpecs = prepareChunkSpecs encPath chunkSizes
|
||||
-- chunks have to be ordered because of AppendMode
|
||||
forM_ (zip chunkSpecs chunks) $ \(chunkSpec, chunk) -> do
|
||||
downloadFileChunk a writeLock chunk chunkSpec
|
||||
encDigest <- liftIO $ LC.sha512Hash <$> LB.readFile encPath
|
||||
chunkPaths <- pooledForConcurrentlyN 32 chunks $ downloadFileChunk a encPath
|
||||
encDigest <- liftIO $ LC.sha512Hash <$> readChunks chunkPaths
|
||||
when (encDigest /= unFileDigest digest) $ throwError $ CLIError "File digest mismatch"
|
||||
path <- decryptFile encPath key nonce
|
||||
whenM (doesFileExist encPath) $ removeFile encPath
|
||||
path <- decryptFile chunkPaths key nonce
|
||||
whenM (doesPathExist encPath) $ removeDirectoryRecursive encPath
|
||||
liftIO $ putStrLn $ "File received: " <> path
|
||||
where
|
||||
retries :: Show e => ExceptT e IO a -> ExceptT CLIError IO a
|
||||
retries = withRetry retryCount . withExceptT (CLIError . show)
|
||||
downloadFileChunk :: XFTPClientAgent -> Lock -> FileChunk -> XFTPChunkSpec -> ExceptT CLIError IO ()
|
||||
downloadFileChunk a writeLock FileChunk {replicas = replica : _} chunkSpec = do
|
||||
downloadFileChunk :: XFTPClientAgent -> FilePath -> FileChunk -> ExceptT CLIError IO FilePath
|
||||
downloadFileChunk a encPath FileChunk {chunkNo, chunkSize, replicas = replica : _} = do
|
||||
let FileChunkReplica {server, rcvId, rcvKey} = replica
|
||||
chunkPath <- uniqueCombine encPath $ show chunkNo
|
||||
c <- retries $ getXFTPServerClient a server
|
||||
(rKey, rpKey) <- liftIO C.generateKeyPair'
|
||||
(sKey, body) <- retries $ downloadXFTPChunk c rcvKey (unChunkReplicaId rcvId) rKey
|
||||
-- download and decrypt (DH) chunk from server using XFTPClient
|
||||
-- verify chunk digest - in the client
|
||||
-- save to correct location in file - also in the client
|
||||
retries $ withLock writeLock "save" $ receiveXFTPChunk body chunkSpec
|
||||
downloadFileChunk _ _ _ _ = pure ()
|
||||
decryptFile :: FilePath -> C.SbKey -> C.CbNonce -> ExceptT CLIError IO FilePath
|
||||
decryptFile encPath key nonce = do
|
||||
f <- liftIO $ LB.readFile encPath
|
||||
f' <- liftEither $ first (CLIError . show) $ LC.sbDecrypt key nonce f
|
||||
let (fileHdr, f'') = LB.splitAt 1024 f'
|
||||
retries $ receiveXFTPChunk body chunkPath $ unFileSize chunkSize
|
||||
pure chunkPath
|
||||
downloadFileChunk _ _ _ = throwError $ CLIError "chunk has no replicas"
|
||||
decryptFile :: [FilePath] -> C.SbKey -> C.CbNonce -> ExceptT CLIError IO FilePath
|
||||
decryptFile chunkPaths key nonce = do
|
||||
f <- liftEither . first (CLIError . show) . LC.sbDecrypt key nonce =<< liftIO (readChunks chunkPaths)
|
||||
let (fileHdr, f') = LB.splitAt 1024 f
|
||||
-- withFile encPath ReadMode $ \r -> do
|
||||
-- fileHdr <- liftIO $ B.hGet r 1024
|
||||
case A.parse smpP $ LB.toStrict fileHdr of
|
||||
@@ -370,8 +364,10 @@ cliReceiveFile ReceiveOptions {fileDescription, filePath, retryCount, tempPath}
|
||||
A.Partial _ -> throwError $ CLIError "Invalid file header"
|
||||
A.Done rest FileHeader {fileName} -> do
|
||||
path <- getFilePath fileName
|
||||
liftIO $ LB.writeFile path $ LB.fromStrict rest <> f''
|
||||
liftIO $ LB.writeFile path $ LB.fromStrict rest <> f'
|
||||
pure path
|
||||
readChunks :: [FilePath] -> IO LB.ByteString
|
||||
readChunks = foldM (\s path -> (s <>) <$> LB.readFile path) LB.empty
|
||||
getFilePath :: String -> ExceptT CLIError IO FilePath
|
||||
getFilePath name =
|
||||
case filePath of
|
||||
|
||||
@@ -52,6 +52,6 @@ testFileChunkDelivery =
|
||||
uploadXFTPChunk c spKey sId $ XFTPChunkSpec {filePath = "tests/tmp/chunk1", chunkOffset = 0, chunkSize = chSize}
|
||||
liftIO $ readChunk sId `shouldReturn` bytes
|
||||
(_sDhKey, chunkBody) <- downloadXFTPChunk c rpKey rId rDhKey
|
||||
receiveXFTPChunk chunkBody XFTPChunkSpec {filePath = "tests/tmp/received_chunk1", chunkOffset = 0, chunkSize = chSize}
|
||||
receiveXFTPChunk chunkBody "tests/tmp/received_chunk1" chSize
|
||||
liftIO $ B.readFile "tests/tmp/received_chunk1" `shouldReturn` bytes
|
||||
pure ()
|
||||
|
||||
Reference in New Issue
Block a user