mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 16:26:02 +00:00
xftp: pass save path to agent (#685)
This commit is contained in:
@@ -58,13 +58,13 @@ import UnliftIO.Concurrent
|
||||
import UnliftIO.Directory
|
||||
import qualified UnliftIO.Exception as E
|
||||
|
||||
receiveFile :: AgentMonad m => AgentClient -> UserId -> ValidFileDescription 'FRecipient -> Maybe FilePath -> m RcvFileId
|
||||
receiveFile c userId (ValidFileDescription fd@FileDescription {chunks}) xftpWorkPath = do
|
||||
receiveFile :: AgentMonad m => AgentClient -> UserId -> ValidFileDescription 'FRecipient -> Maybe FilePath -> FilePath -> m RcvFileId
|
||||
receiveFile c userId (ValidFileDescription fd@FileDescription {chunks}) xftpWorkPath savePath = do
|
||||
g <- asks idsDrg
|
||||
workPath <- maybe getTemporaryDirectory pure xftpWorkPath
|
||||
encPath <- uniqueCombine workPath "xftp.encrypted"
|
||||
createDirectory encPath
|
||||
fId <- withStore c $ \db -> createRcvFile db g userId fd workPath encPath
|
||||
fId <- withStore c $ \db -> createRcvFile db g userId fd encPath savePath
|
||||
forM_ chunks downloadChunk
|
||||
pure fId
|
||||
where
|
||||
@@ -167,18 +167,22 @@ runXFTPLocalWorker c@AgentClient {subQ} doWork = do
|
||||
decryptFile f `catchError` (workerInternalError c rcvFileId rcvFileEntityId tmpPath . show)
|
||||
noWorkToDo = void . atomically $ tryTakeTMVar doWork
|
||||
decryptFile :: RcvFile -> m ()
|
||||
decryptFile RcvFile {rcvFileId, rcvFileEntityId, key, nonce, tmpPath, saveDir, savePath, chunks} = do
|
||||
forM_ savePath $ \p -> do
|
||||
removePath p
|
||||
withStore' c (`updateRcvFileNoSavePath` rcvFileId)
|
||||
decryptFile RcvFile {rcvFileId, rcvFileEntityId, key, nonce, tmpPath, savePath, chunks} = do
|
||||
-- TODO test; recreate file if it's in status RFSDecrypting
|
||||
-- when (status == RFSDecrypting) $
|
||||
-- whenM (doesFileExist savePath) (removeFile savePath >> emptyFile)
|
||||
withStore' c $ \db -> updateRcvFileStatus db rcvFileId RFSDecrypting
|
||||
chunkPaths <- getChunkPaths chunks
|
||||
encSize <- liftIO $ foldM (\s path -> (s +) . fromIntegral <$> getFileSize path) 0 chunkPaths
|
||||
path <- decrypt encSize chunkPaths
|
||||
decrypt encSize chunkPaths
|
||||
forM_ tmpPath removePath
|
||||
withStore' c $ \db -> updateRcvFileComplete db rcvFileId path
|
||||
notify $ RFDONE path
|
||||
withStore' c (`updateRcvFileComplete` rcvFileId)
|
||||
notify RFDONE
|
||||
where
|
||||
-- emptyFile :: m ()
|
||||
-- emptyFile = do
|
||||
-- h <- openFile savePath AppendMode
|
||||
-- liftIO $ B.hPut h "" >> hFlush h
|
||||
notify :: forall e. AEntityI e => ACommand 'Agent e -> m ()
|
||||
notify cmd = atomically $ writeTBQueue subQ ("", rcvFileEntityId, APC (sAEntity @e) cmd)
|
||||
getChunkPaths :: [RcvFileChunk] -> m [FilePath]
|
||||
@@ -189,7 +193,7 @@ runXFTPLocalWorker c@AgentClient {subQ} doWork = do
|
||||
getChunkPaths (RcvFileChunk {chunkTmpPath = Nothing} : _cs) =
|
||||
throwError $ INTERNAL "no chunk path"
|
||||
-- TODO refactor with decrypt in CLI, streaming decryption
|
||||
decrypt :: Int64 -> [FilePath] -> m FilePath
|
||||
decrypt :: Int64 -> [FilePath] -> m ()
|
||||
decrypt encSize chunkPaths = do
|
||||
lazyChunks <- liftIO $ readChunks chunkPaths
|
||||
(authOk, f) <- liftEither . first cryptoError $ LC.sbDecryptTailTag key nonce (encSize - authTagSize) lazyChunks
|
||||
@@ -200,14 +204,12 @@ runXFTPLocalWorker c@AgentClient {subQ} doWork = do
|
||||
-- TODO XFTP errors
|
||||
A.Fail _ _ e -> throwError $ INTERNAL $ "Invalid file header: " <> e
|
||||
A.Partial _ -> throwError $ INTERNAL "Invalid file header"
|
||||
A.Done rest FileHeader {fileName} -> do
|
||||
-- TODO touch file in agent bracket
|
||||
path <- uniqueCombine saveDir fileName
|
||||
liftIO $ LB.writeFile path $ LB.fromStrict rest <> f'
|
||||
A.Done rest FileHeader {fileName = _fn} -> do
|
||||
-- ? check file name match
|
||||
liftIO $ LB.writeFile savePath $ LB.fromStrict rest <> f'
|
||||
unless authOk $ do
|
||||
removeFile path
|
||||
removeFile savePath
|
||||
throwError $ INTERNAL "Error decrypting file: incorrect auth tag"
|
||||
pure path
|
||||
readChunks :: [FilePath] -> IO LB.ByteString
|
||||
readChunks = foldM (\s path -> (s <>) <$> LB.readFile path) ""
|
||||
|
||||
|
||||
@@ -174,7 +174,7 @@ downloadXFTPChunk :: XFTPClient -> C.APrivateSignKey -> XFTPFileId -> XFTPRcvChu
|
||||
downloadXFTPChunk c@XFTPClient {config} rpKey fId chunkSpec@XFTPRcvChunkSpec {filePath, chunkSize} = do
|
||||
(rDhKey, rpDhKey) <- liftIO C.generateKeyPair'
|
||||
sendXFTPCommand c rpKey fId (FGET rDhKey) Nothing >>= \case
|
||||
(FRFile sDhKey cbNonce, HTTP2Body {bodyHead, bodySize, bodyPart}) -> case bodyPart of
|
||||
(FRFile sDhKey cbNonce, HTTP2Body {bodyHead = _bg, bodySize = _bs, bodyPart}) -> case bodyPart of
|
||||
-- TODO atm bodySize is set to 0, so chunkSize will be incorrect - validate once set
|
||||
Just chunkPart -> do
|
||||
let dhSecret = C.dh' sDhKey rpDhKey
|
||||
|
||||
@@ -47,8 +47,7 @@ data RcvFile = RcvFile
|
||||
chunkSize :: FileSize Word32,
|
||||
chunks :: [RcvFileChunk],
|
||||
tmpPath :: Maybe FilePath,
|
||||
saveDir :: FilePath,
|
||||
savePath :: Maybe FilePath,
|
||||
savePath :: FilePath,
|
||||
status :: RcvFileStatus
|
||||
}
|
||||
deriving (Eq, Show)
|
||||
|
||||
@@ -339,8 +339,8 @@ toggleConnectionNtfs :: AgentErrorMonad m => AgentClient -> ConnId -> Bool -> m
|
||||
toggleConnectionNtfs c = withAgentEnv c .: toggleConnectionNtfs' c
|
||||
|
||||
-- | Receive XFTP file
|
||||
xftpReceiveFile :: AgentErrorMonad m => AgentClient -> UserId -> ValidFileDescription 'FRecipient -> Maybe FilePath -> m RcvFileId
|
||||
xftpReceiveFile c = withAgentEnv c .:. receiveFile c
|
||||
xftpReceiveFile :: AgentErrorMonad m => AgentClient -> UserId -> ValidFileDescription 'FRecipient -> Maybe FilePath -> FilePath -> m RcvFileId
|
||||
xftpReceiveFile c = withAgentEnv c .:: receiveFile c
|
||||
|
||||
-- | Send XFTP file
|
||||
xftpSendFile :: AgentErrorMonad m => AgentClient -> UserId -> FilePath -> Int -> Maybe FilePath -> m SndFileId
|
||||
|
||||
@@ -337,7 +337,7 @@ data ACommand (p :: AParty) (e :: AEntity) where
|
||||
SUSPENDED :: ACommand Agent AENone
|
||||
-- XFTP commands and responses
|
||||
RFPROG :: Int -> Int -> ACommand Agent AERcvFile
|
||||
RFDONE :: FilePath -> ACommand Agent AERcvFile
|
||||
RFDONE :: ACommand Agent AERcvFile
|
||||
RFERR :: AgentErrorType -> ACommand Agent AERcvFile
|
||||
SFPROG :: Int -> Int -> ACommand Agent AESndFile
|
||||
SFDONE :: ValidFileDescription 'FSender -> [ValidFileDescription 'FRecipient] -> ACommand Agent AESndFile
|
||||
@@ -443,7 +443,7 @@ aCommandTag = \case
|
||||
ERR _ -> ERR_
|
||||
SUSPENDED -> SUSPENDED_
|
||||
RFPROG {} -> RFPROG_
|
||||
RFDONE {} -> RFDONE_
|
||||
RFDONE -> RFDONE_
|
||||
RFERR {} -> RFERR_
|
||||
SFPROG {} -> SFPROG_
|
||||
SFDONE {} -> SFDONE_
|
||||
@@ -1447,7 +1447,7 @@ commandP binaryP =
|
||||
ERR_ -> s (ERR <$> strP)
|
||||
SUSPENDED_ -> pure SUSPENDED
|
||||
RFPROG_ -> s (RFPROG <$> A.decimal <* A.space <*> A.decimal)
|
||||
RFDONE_ -> s (RFDONE <$> strP)
|
||||
RFDONE_ -> pure RFDONE
|
||||
RFERR_ -> s (RFERR <$> strP)
|
||||
SFPROG_ -> s (SFPROG <$> A.decimal <* A.space <*> A.decimal)
|
||||
SFDONE_ -> s (sfDone . safeDecodeUtf8 <$?> binaryP)
|
||||
@@ -1511,7 +1511,7 @@ serializeCommand = \case
|
||||
OK -> s OK_
|
||||
SUSPENDED -> s SUSPENDED_
|
||||
RFPROG rcvd total -> s (RFPROG_, rcvd, total)
|
||||
RFDONE fPath -> s (RFDONE_, fPath)
|
||||
RFDONE -> s RFDONE_
|
||||
RFERR e -> s (RFERR_, e)
|
||||
SFPROG sent total -> s (SFPROG_, sent, total)
|
||||
SFDONE sd rds -> B.unwords [s SFDONE_, serializeBinary (sfDone sd rds)]
|
||||
|
||||
@@ -132,7 +132,6 @@ module Simplex.Messaging.Agent.Store.SQLite
|
||||
updateRcvFileStatus,
|
||||
updateRcvFileError,
|
||||
updateRcvFileComplete,
|
||||
updateRcvFileNoSavePath,
|
||||
updateRcvFileNoTmpPath,
|
||||
getNextRcvChunkToDownload,
|
||||
getNextRcvFileToDecrypt,
|
||||
@@ -1739,7 +1738,7 @@ getXFTPServerId_ db ProtocolServer {host, port, keyHash} = do
|
||||
DB.query db "SELECT xftp_server_id FROM xftp_servers WHERE xftp_host = ? AND xftp_port = ? AND xftp_key_hash = ?" (host, port, keyHash)
|
||||
|
||||
createRcvFile :: DB.Connection -> TVar ChaChaDRG -> UserId -> FileDescription 'FRecipient -> FilePath -> FilePath -> IO (Either StoreError RcvFileId)
|
||||
createRcvFile db gVar userId fd@FileDescription {chunks} saveDir tmpPath = runExceptT $ do
|
||||
createRcvFile db gVar userId fd@FileDescription {chunks} tmpPath savePath = runExceptT $ do
|
||||
(rcvFileEntityId, rcvFileId) <- ExceptT $ insertRcvFile fd
|
||||
liftIO $
|
||||
forM_ chunks $ \fc@FileChunk {replicas} -> do
|
||||
@@ -1753,8 +1752,8 @@ createRcvFile db gVar userId fd@FileDescription {chunks} saveDir tmpPath = runEx
|
||||
createWithRandomId gVar $ \rcvFileEntityId ->
|
||||
DB.execute
|
||||
db
|
||||
"INSERT INTO rcv_files (rcv_file_entity_id, user_id, size, digest, key, nonce, chunk_size, tmp_path, save_dir, status) VALUES (?,?,?,?,?,?,?,?,?,?)"
|
||||
(rcvFileEntityId, userId, size, digest, key, nonce, chunkSize, tmpPath, saveDir, RFSReceiving)
|
||||
"INSERT INTO rcv_files (rcv_file_entity_id, user_id, size, digest, key, nonce, chunk_size, tmp_path, save_path, status) VALUES (?,?,?,?,?,?,?,?,?,?)"
|
||||
(rcvFileEntityId, userId, size, digest, key, nonce, chunkSize, tmpPath, savePath, RFSReceiving)
|
||||
rcvFileId <- liftIO $ insertedRowId db
|
||||
pure (rcvFileEntityId, rcvFileId)
|
||||
insertChunk :: FileChunk -> DBRcvFileId -> IO Int64
|
||||
@@ -1784,15 +1783,15 @@ getRcvFile db rcvFileId = runExceptT $ do
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT rcv_file_entity_id, user_id, size, digest, key, nonce, chunk_size, tmp_path, save_dir, save_path, status
|
||||
SELECT rcv_file_entity_id, user_id, size, digest, key, nonce, chunk_size, tmp_path, save_path, status
|
||||
FROM rcv_files
|
||||
WHERE rcv_file_id = ?
|
||||
|]
|
||||
(Only rcvFileId)
|
||||
where
|
||||
toFile :: (RcvFileId, UserId, FileSize Int64, FileDigest, C.SbKey, C.CbNonce, FileSize Word32, Maybe FilePath, FilePath, Maybe FilePath, RcvFileStatus) -> RcvFile
|
||||
toFile (rcvFileEntityId, userId, size, digest, key, nonce, chunkSize, tmpPath, saveDir, savePath, status) =
|
||||
RcvFile {rcvFileId, rcvFileEntityId, userId, size, digest, key, nonce, chunkSize, tmpPath, saveDir, savePath, status, chunks = []}
|
||||
toFile :: (RcvFileId, UserId, FileSize Int64, FileDigest, C.SbKey, C.CbNonce, FileSize Word32, Maybe FilePath, FilePath, RcvFileStatus) -> RcvFile
|
||||
toFile (rcvFileEntityId, userId, size, digest, key, nonce, chunkSize, tmpPath, savePath, status) =
|
||||
RcvFile {rcvFileId, rcvFileEntityId, userId, size, digest, key, nonce, chunkSize, tmpPath, savePath, status, chunks = []}
|
||||
getChunks :: RcvFileId -> UserId -> FilePath -> IO [RcvFileChunk]
|
||||
getChunks rcvFileEntityId userId fileTmpPath = do
|
||||
chunks <-
|
||||
@@ -1859,15 +1858,10 @@ updateRcvFileError db rcvFileId errStr = do
|
||||
updatedAt <- getCurrentTime
|
||||
DB.execute db "UPDATE rcv_files SET tmp_path = NULL, error = ?, status = ?, updated_at = ? WHERE rcv_file_id = ?" (errStr, RFSError, updatedAt, rcvFileId)
|
||||
|
||||
updateRcvFileComplete :: DB.Connection -> DBRcvFileId -> FilePath -> IO ()
|
||||
updateRcvFileComplete db rcvFileId savePath = do
|
||||
updateRcvFileComplete :: DB.Connection -> DBRcvFileId -> IO ()
|
||||
updateRcvFileComplete db rcvFileId = do
|
||||
updatedAt <- getCurrentTime
|
||||
DB.execute db "UPDATE rcv_files SET tmp_path = NULL, save_path = ?, status = ?, updated_at = ? WHERE rcv_file_id = ?" (savePath, RFSComplete, updatedAt, rcvFileId)
|
||||
|
||||
updateRcvFileNoSavePath :: DB.Connection -> DBRcvFileId -> IO ()
|
||||
updateRcvFileNoSavePath db rcvFileId = do
|
||||
updatedAt <- getCurrentTime
|
||||
DB.execute db "UPDATE rcv_files SET save_path = NULL, updated_at = ? WHERE rcv_file_id = ?" (updatedAt, rcvFileId)
|
||||
DB.execute db "UPDATE rcv_files SET tmp_path = NULL, status = ?, updated_at = ? WHERE rcv_file_id = ?" (RFSComplete, updatedAt, rcvFileId)
|
||||
|
||||
updateRcvFileNoTmpPath :: DB.Connection -> DBRcvFileId -> IO ()
|
||||
updateRcvFileNoTmpPath db rcvFileId = do
|
||||
|
||||
@@ -28,8 +28,7 @@ CREATE TABLE rcv_files (
|
||||
nonce BLOB NOT NULL,
|
||||
chunk_size INTEGER NOT NULL,
|
||||
tmp_path TEXT,
|
||||
save_dir TEXT NOT NULL,
|
||||
save_path TEXT,
|
||||
save_path TEXT NOT NULL,
|
||||
status TEXT NOT NULL,
|
||||
error TEXT,
|
||||
created_at TEXT NOT NULL DEFAULT (datetime('now')),
|
||||
|
||||
@@ -302,8 +302,7 @@ CREATE TABLE rcv_files(
|
||||
nonce BLOB NOT NULL,
|
||||
chunk_size INTEGER NOT NULL,
|
||||
tmp_path TEXT,
|
||||
save_dir TEXT NOT NULL,
|
||||
save_path TEXT,
|
||||
save_path TEXT NOT NULL,
|
||||
status TEXT NOT NULL,
|
||||
error TEXT,
|
||||
created_at TEXT NOT NULL DEFAULT(datetime('now')),
|
||||
|
||||
Reference in New Issue
Block a user