From 2bec353eae09a0561a94e8c8ef17054dee70c2ad Mon Sep 17 00:00:00 2001 From: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> Date: Mon, 20 Mar 2023 20:08:38 +0400 Subject: [PATCH] xftp: add command to delete rcv file; agent manages save path (#692) --- simplexmq.cabal | 2 +- src/Simplex/FileTransfer/Agent.hs | 44 ++++++---- src/Simplex/FileTransfer/Types.hs | 4 +- src/Simplex/Messaging/Agent.hs | 25 ++++-- src/Simplex/Messaging/Agent/Protocol.hs | 8 +- src/Simplex/Messaging/Agent/Store/SQLite.hs | 80 +++++++++++++++---- .../SQLite/Migrations/M20230223_files.hs | 2 + ...07_snd_files.hs => M20230401_snd_files.hs} | 6 +- .../Store/SQLite/Migrations/agent_schema.sql | 2 + tests/XFTPAgent.hs | 51 ++++++------ 10 files changed, 151 insertions(+), 73 deletions(-) rename src/Simplex/Messaging/Agent/Store/SQLite/Migrations/{M20230307_snd_files.hs => M20230401_snd_files.hs} (96%) diff --git a/simplexmq.cabal b/simplexmq.cabal index cd0da9b3c..9140410be 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -78,7 +78,7 @@ library Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230120_delete_errors Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230217_server_key_hash Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230223_files - Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230307_snd_files + Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230401_snd_files Simplex.Messaging.Agent.TAsyncs Simplex.Messaging.Agent.TRcvQueues Simplex.Messaging.Client diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index 667a0ae3f..12ac35548 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -13,6 +13,7 @@ module Simplex.FileTransfer.Agent ( -- Receiving files receiveFile, addXFTPWorker, + deleteRcvFile, -- Sending files sendFileExperimental, _sendFile, @@ -32,6 +33,8 @@ import qualified Data.ByteString.Char8 as B import Data.List (isSuffixOf, partition) import Data.List.NonEmpty (nonEmpty) import qualified Data.List.NonEmpty as L +import Data.Time.Clock (getCurrentTime) +import Data.Time.Format (defaultTimeLocale, formatTime) import Simplex.FileTransfer.Client.Main (CLIError, SendOptions (..), cliSendFile) import Simplex.FileTransfer.Crypto import Simplex.FileTransfer.Description @@ -54,13 +57,19 @@ import UnliftIO.Concurrent import UnliftIO.Directory import qualified UnliftIO.Exception as E -receiveFile :: AgentMonad m => AgentClient -> UserId -> ValidFileDescription 'FRecipient -> Maybe FilePath -> FilePath -> m RcvFileId -receiveFile c userId (ValidFileDescription fd@FileDescription {chunks}) xftpWorkPath savePath = do +receiveFile :: AgentMonad m => AgentClient -> UserId -> ValidFileDescription 'FRecipient -> Maybe FilePath -> m RcvFileId +receiveFile c userId (ValidFileDescription fd@FileDescription {chunks}) xftpWorkPath = do g <- asks idsDrg workPath <- maybe getTemporaryDirectory pure xftpWorkPath - encPath <- uniqueCombine workPath "xftp.encrypted" - createDirectory encPath - fId <- withStore c $ \db -> createRcvFile db g userId fd encPath savePath + ts <- liftIO getCurrentTime + let isoTime = formatTime defaultTimeLocale "%Y%m%d_%H%M%S_%6q" ts + prefixPath <- uniqueCombine workPath (isoTime <> "_rcv.xftp") + createDirectory prefixPath + let tmpPath = prefixPath "xftp.encrypted" + createDirectory tmpPath + let savePath = prefixPath "xftp.decrypted" + createEmptyFile savePath + fId <- withStore c $ \db -> createRcvFile db g userId fd prefixPath tmpPath savePath forM_ chunks downloadChunk pure fId where @@ -69,6 +78,11 @@ receiveFile c userId (ValidFileDescription fd@FileDescription {chunks}) xftpWork addXFTPWorker c (Just server) downloadChunk _ = throwError $ INTERNAL "no replicas" +createEmptyFile :: AgentMonad m => FilePath -> m () +createEmptyFile fPath = do + h <- openFile fPath AppendMode + liftIO $ B.hPut h "" >> hFlush h + addXFTPWorker :: AgentMonad m => AgentClient -> Maybe XFTPServer -> m () addXFTPWorker c srv_ = do ws <- asks $ xftpWorkers . xftpAgent @@ -129,9 +143,6 @@ runXFTPWorker c srv doWork = do when fileReceived $ liftIO $ updateRcvFileStatus db rcvFileId RFSReceived pure fileReceived - -- check if chunk is downloaded and not acknowledged via flag acknowledged? - -- or just catch and ignore error on acknowledgement? (and remove flag) - -- agentXFTPAckChunk c replicaKey (unChunkReplicaId replicaId) `catchError` \_ -> pure () when fileReceived $ addXFTPWorker c Nothing where allChunksReceived :: RcvFile -> Bool @@ -165,19 +176,15 @@ runXFTPLocalWorker c@AgentClient {subQ} doWork = do decryptFile RcvFile {rcvFileId, rcvFileEntityId, key, nonce, tmpPath, savePath, chunks} = do -- TODO test; recreate file if it's in status RFSDecrypting -- when (status == RFSDecrypting) $ - -- whenM (doesFileExist savePath) (removeFile savePath >> emptyFile) + -- whenM (doesFileExist savePath) (removeFile savePath >> createEmptyFile savePath) withStore' c $ \db -> updateRcvFileStatus db rcvFileId RFSDecrypting chunkPaths <- getChunkPaths chunks encSize <- liftIO $ foldM (\s path -> (s +) . fromIntegral <$> getFileSize path) 0 chunkPaths void $ liftError (INTERNAL . show) $ decryptChunks encSize chunkPaths key nonce $ \_ -> pure savePath forM_ tmpPath removePath withStore' c (`updateRcvFileComplete` rcvFileId) - notify RFDONE + notify $ RFDONE savePath where - -- emptyFile :: m () - -- emptyFile = do - -- h <- openFile savePath AppendMode - -- liftIO $ B.hPut h "" >> hFlush h notify :: forall e. AEntityI e => ACommand 'Agent e -> m () notify cmd = atomically $ writeTBQueue subQ ("", rcvFileEntityId, APC (sAEntity @e) cmd) getChunkPaths :: [RcvFileChunk] -> m [FilePath] @@ -188,6 +195,15 @@ runXFTPLocalWorker c@AgentClient {subQ} doWork = do getChunkPaths (RcvFileChunk {chunkTmpPath = Nothing} : _cs) = throwError $ INTERNAL "no chunk path" +deleteRcvFile :: AgentMonad m => AgentClient -> UserId -> RcvFileId -> m () +deleteRcvFile c userId rcvFileEntityId = do + RcvFile {rcvFileId, prefixPath, status} <- withStore c $ \db -> getRcvFileByEntityId db userId rcvFileEntityId + if status == RFSComplete || status == RFSError + then do + removePath prefixPath + withStore' c (`deleteRcvFile'` rcvFileId) + else withStore' c (`updateRcvFileDeleted` rcvFileId) + sendFileExperimental :: forall m. AgentMonad m => AgentClient -> UserId -> FilePath -> Int -> Maybe FilePath -> m SndFileId sendFileExperimental AgentClient {subQ, xftpServers} userId filePath numRecipients xftpWorkPath = do g <- asks idsDrg diff --git a/src/Simplex/FileTransfer/Types.hs b/src/Simplex/FileTransfer/Types.hs index e71460fc2..578d98ac6 100644 --- a/src/Simplex/FileTransfer/Types.hs +++ b/src/Simplex/FileTransfer/Types.hs @@ -46,9 +46,11 @@ data RcvFile = RcvFile nonce :: C.CbNonce, chunkSize :: FileSize Word32, chunks :: [RcvFileChunk], + prefixPath :: FilePath, tmpPath :: Maybe FilePath, savePath :: FilePath, - status :: RcvFileStatus + status :: RcvFileStatus, + deleted :: Bool } deriving (Eq, Show) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 1adc17f92..8d037c185 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -81,6 +81,7 @@ module Simplex.Messaging.Agent getNtfTokenData, toggleConnectionNtfs, xftpReceiveFile, + xftpDeleteRcvFile, xftpSendFile, activateAgent, suspendAgent, @@ -115,7 +116,7 @@ import qualified Data.Text as T import Data.Time.Clock import Data.Time.Clock.System (systemToUTCTime) import qualified Database.SQLite.Simple as DB -import Simplex.FileTransfer.Agent (addXFTPWorker, receiveFile, sendFileExperimental) +import Simplex.FileTransfer.Agent (addXFTPWorker, deleteRcvFile, receiveFile, sendFileExperimental) import Simplex.FileTransfer.Description (ValidFileDescription) import Simplex.FileTransfer.Protocol (FileParty (..)) import Simplex.FileTransfer.Util (removePath) @@ -339,8 +340,12 @@ toggleConnectionNtfs :: AgentErrorMonad m => AgentClient -> ConnId -> Bool -> m toggleConnectionNtfs c = withAgentEnv c .: toggleConnectionNtfs' c -- | Receive XFTP file -xftpReceiveFile :: AgentErrorMonad m => AgentClient -> UserId -> ValidFileDescription 'FRecipient -> Maybe FilePath -> FilePath -> m RcvFileId -xftpReceiveFile c = withAgentEnv c .:: receiveFile c +xftpReceiveFile :: AgentErrorMonad m => AgentClient -> UserId -> ValidFileDescription 'FRecipient -> Maybe FilePath -> m RcvFileId +xftpReceiveFile c = withAgentEnv c .:. receiveFile c + +-- | Delete XFTP rcv file (deletes work files from file system and db records) +xftpDeleteRcvFile :: AgentErrorMonad m => AgentClient -> UserId -> RcvFileId -> m () +xftpDeleteRcvFile c = withAgentEnv c .: deleteRcvFile c -- | Send XFTP file xftpSendFile :: AgentErrorMonad m => AgentClient -> UserId -> FilePath -> Int -> Maybe FilePath -> m SndFileId @@ -1604,7 +1609,7 @@ cleanupManager c = do forever $ do void . runExceptT $ do deleteConns - deleteTmpPaths + deleteFiles threadDelay int where deleteConns = @@ -1612,9 +1617,15 @@ cleanupManager c = do void $ withStore' c getDeletedConnIds >>= deleteDeletedConns c withStore' c deleteUsersWithoutConns >>= mapM_ notifyUserDeleted notifyUserDeleted userId = atomically $ writeTBQueue (subQ c) ("", "", APC SAENone $ DEL_USER userId) - deleteTmpPaths = do - tmpPaths <- withStore' c getTmpFilePaths - forM_ tmpPaths $ \(fId, p) -> do + deleteFiles = do + -- cleanup rcv files marked for deletion + rcvDeleted <- withStore' c getCleanupRcvFilesDeleted + forM_ rcvDeleted $ \(fId, p) -> do + removePath p + withStore' c (`deleteRcvFile'` fId) + -- cleanup rcv tmp paths + rcvTmpPaths <- withStore' c getCleanupRcvFilesTmpPaths + forM_ rcvTmpPaths $ \(fId, p) -> do removePath p withStore' c (`updateRcvFileNoTmpPath` fId) diff --git a/src/Simplex/Messaging/Agent/Protocol.hs b/src/Simplex/Messaging/Agent/Protocol.hs index 77b1c4ccf..7d7d1180b 100644 --- a/src/Simplex/Messaging/Agent/Protocol.hs +++ b/src/Simplex/Messaging/Agent/Protocol.hs @@ -337,7 +337,7 @@ data ACommand (p :: AParty) (e :: AEntity) where SUSPENDED :: ACommand Agent AENone -- XFTP commands and responses RFPROG :: Int -> Int -> ACommand Agent AERcvFile - RFDONE :: ACommand Agent AERcvFile + RFDONE :: FilePath -> ACommand Agent AERcvFile RFERR :: AgentErrorType -> ACommand Agent AERcvFile SFPROG :: Int -> Int -> ACommand Agent AESndFile SFDONE :: ValidFileDescription 'FSender -> [ValidFileDescription 'FRecipient] -> ACommand Agent AESndFile @@ -443,7 +443,7 @@ aCommandTag = \case ERR _ -> ERR_ SUSPENDED -> SUSPENDED_ RFPROG {} -> RFPROG_ - RFDONE -> RFDONE_ + RFDONE {} -> RFDONE_ RFERR {} -> RFERR_ SFPROG {} -> SFPROG_ SFDONE {} -> SFDONE_ @@ -1447,7 +1447,7 @@ commandP binaryP = ERR_ -> s (ERR <$> strP) SUSPENDED_ -> pure SUSPENDED RFPROG_ -> s (RFPROG <$> A.decimal <* A.space <*> A.decimal) - RFDONE_ -> pure RFDONE + RFDONE_ -> s (RFDONE <$> strP) RFERR_ -> s (RFERR <$> strP) SFPROG_ -> s (SFPROG <$> A.decimal <* A.space <*> A.decimal) SFDONE_ -> s (sfDone . safeDecodeUtf8 <$?> binaryP) @@ -1511,7 +1511,7 @@ serializeCommand = \case OK -> s OK_ SUSPENDED -> s SUSPENDED_ RFPROG rcvd total -> s (RFPROG_, rcvd, total) - RFDONE -> s RFDONE_ + RFDONE fPath -> s (RFDONE_, fPath) RFERR e -> s (RFERR_, e) SFPROG sent total -> s (SFPROG_, sent, total) SFDONE sd rds -> B.unwords [s SFDONE_, serializeBinary (sfDone sd rds)] diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index b3ac192ef..2ad2015dd 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -126,16 +126,20 @@ module Simplex.Messaging.Agent.Store.SQLite -- File transfer createRcvFile, getRcvFile, + getRcvFileByEntityId, updateRcvChunkReplicaDelay, updateRcvFileChunkReceived, updateRcvFileStatus, updateRcvFileError, updateRcvFileComplete, updateRcvFileNoTmpPath, + updateRcvFileDeleted, + deleteRcvFile', getNextRcvChunkToDownload, getNextRcvFileToDecrypt, getPendingRcvFilesServers, - getTmpFilePaths, + getCleanupRcvFilesTmpPaths, + getCleanupRcvFilesDeleted, -- * utilities withConnection, @@ -1736,8 +1740,8 @@ getXFTPServerId_ db ProtocolServer {host, port, keyHash} = do firstRow fromOnly SEXFTPServerNotFound $ DB.query db "SELECT xftp_server_id FROM xftp_servers WHERE xftp_host = ? AND xftp_port = ? AND xftp_key_hash = ?" (host, port, keyHash) -createRcvFile :: DB.Connection -> TVar ChaChaDRG -> UserId -> FileDescription 'FRecipient -> FilePath -> FilePath -> IO (Either StoreError RcvFileId) -createRcvFile db gVar userId fd@FileDescription {chunks} tmpPath savePath = runExceptT $ do +createRcvFile :: DB.Connection -> TVar ChaChaDRG -> UserId -> FileDescription 'FRecipient -> FilePath -> FilePath -> FilePath -> IO (Either StoreError RcvFileId) +createRcvFile db gVar userId fd@FileDescription {chunks} prefixPath tmpPath savePath = runExceptT $ do (rcvFileEntityId, rcvFileId) <- ExceptT $ insertRcvFile fd liftIO $ forM_ chunks $ \fc@FileChunk {replicas} -> do @@ -1751,8 +1755,8 @@ createRcvFile db gVar userId fd@FileDescription {chunks} tmpPath savePath = runE createWithRandomId gVar $ \rcvFileEntityId -> DB.execute db - "INSERT INTO rcv_files (rcv_file_entity_id, user_id, size, digest, key, nonce, chunk_size, tmp_path, save_path, status) VALUES (?,?,?,?,?,?,?,?,?,?)" - (rcvFileEntityId, userId, size, digest, key, nonce, chunkSize, tmpPath, savePath, RFSReceiving) + "INSERT INTO rcv_files (rcv_file_entity_id, user_id, size, digest, key, nonce, chunk_size, prefix_path, tmp_path, save_path, status) VALUES (?,?,?,?,?,?,?,?,?,?,?)" + ((rcvFileEntityId, userId, size, digest, key, nonce, chunkSize) :. (prefixPath, tmpPath, savePath, RFSReceiving)) rcvFileId <- liftIO $ insertedRowId db pure (rcvFileEntityId, rcvFileId) insertChunk :: FileChunk -> DBRcvFileId -> IO Int64 @@ -1770,6 +1774,16 @@ createRcvFile db gVar userId fd@FileDescription {chunks} tmpPath savePath = runE "INSERT INTO rcv_file_chunk_replicas (replica_number, rcv_file_chunk_id, xftp_server_id, replica_id, replica_key) VALUES (?,?,?,?,?)" (replicaNo, chunkId, srvId, replicaId, replicaKey) +getRcvFileByEntityId :: DB.Connection -> UserId -> RcvFileId -> IO (Either StoreError RcvFile) +getRcvFileByEntityId db userId rcvFileEntityId = runExceptT $ do + rcvFileId <- ExceptT $ getRcvFileIdByEntityId_ db userId rcvFileEntityId + ExceptT $ getRcvFile db rcvFileId + +getRcvFileIdByEntityId_ :: DB.Connection -> UserId -> RcvFileId -> IO (Either StoreError DBRcvFileId) +getRcvFileIdByEntityId_ db userId rcvFileEntityId = + firstRow fromOnly SEFileNotFound $ + DB.query db "SELECT rcv_file_id FROM rcv_files WHERE user_id = ? AND rcv_file_entity_id = ?" (userId, rcvFileEntityId) + getRcvFile :: DB.Connection -> DBRcvFileId -> IO (Either StoreError RcvFile) getRcvFile db rcvFileId = runExceptT $ do fd@RcvFile {rcvFileEntityId, userId, tmpPath} <- ExceptT getFile @@ -1782,15 +1796,15 @@ getRcvFile db rcvFileId = runExceptT $ do DB.query db [sql| - SELECT rcv_file_entity_id, user_id, size, digest, key, nonce, chunk_size, tmp_path, save_path, status + SELECT rcv_file_entity_id, user_id, size, digest, key, nonce, chunk_size, prefix_path, tmp_path, save_path, status, deleted FROM rcv_files WHERE rcv_file_id = ? |] (Only rcvFileId) where - toFile :: (RcvFileId, UserId, FileSize Int64, FileDigest, C.SbKey, C.CbNonce, FileSize Word32, Maybe FilePath, FilePath, RcvFileStatus) -> RcvFile - toFile (rcvFileEntityId, userId, size, digest, key, nonce, chunkSize, tmpPath, savePath, status) = - RcvFile {rcvFileId, rcvFileEntityId, userId, size, digest, key, nonce, chunkSize, tmpPath, savePath, status, chunks = []} + toFile :: (RcvFileId, UserId, FileSize Int64, FileDigest, C.SbKey, C.CbNonce, FileSize Word32, FilePath, Maybe FilePath, FilePath, RcvFileStatus, Bool) -> RcvFile + toFile (rcvFileEntityId, userId, size, digest, key, nonce, chunkSize, prefixPath, tmpPath, savePath, status, deleted) = + RcvFile {rcvFileId, rcvFileEntityId, userId, size, digest, key, nonce, chunkSize, prefixPath, tmpPath, savePath, status, deleted, chunks = []} getChunks :: RcvFileId -> UserId -> FilePath -> IO [RcvFileChunk] getChunks rcvFileEntityId userId fileTmpPath = do chunks <- @@ -1862,6 +1876,15 @@ updateRcvFileNoTmpPath db rcvFileId = do updatedAt <- getCurrentTime DB.execute db "UPDATE rcv_files SET tmp_path = NULL, updated_at = ? WHERE rcv_file_id = ?" (updatedAt, rcvFileId) +updateRcvFileDeleted :: DB.Connection -> DBRcvFileId -> IO () +updateRcvFileDeleted db rcvFileId = do + updatedAt <- getCurrentTime + DB.execute db "UPDATE rcv_files SET deleted = 1, updated_at = ? WHERE rcv_file_id = ?" (updatedAt, rcvFileId) + +deleteRcvFile' :: DB.Connection -> DBRcvFileId -> IO () +deleteRcvFile' db rcvFileId = + DB.execute db "DELETE FROM rcv_files WHERE rcv_file_id = ?" (Only rcvFileId) + getNextRcvChunkToDownload :: DB.Connection -> XFTPServer -> IO (Maybe RcvFileChunk) getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} = do maybeFirstRow toChunk $ @@ -1876,7 +1899,8 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} = do JOIN rcv_file_chunks c ON c.rcv_file_chunk_id = r.rcv_file_chunk_id JOIN rcv_files f ON f.rcv_file_id = c.rcv_file_id WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ? - AND r.received = 0 AND r.replica_number = 1 AND f.status = ? + AND r.received = 0 AND r.replica_number = 1 + AND f.status = ? AND f.deleted = 0 ORDER BY r.created_at ASC LIMIT 1 |] @@ -1901,7 +1925,15 @@ getNextRcvFileToDecrypt :: DB.Connection -> IO (Maybe RcvFile) getNextRcvFileToDecrypt db = do fileId_ :: Maybe DBRcvFileId <- maybeFirstRow fromOnly $ - DB.query db "SELECT rcv_file_id FROM rcv_files WHERE status IN (?,?) ORDER BY created_at ASC LIMIT 1" (RFSReceived, RFSDecrypting) + DB.query + db + [sql| + SELECT rcv_file_id + FROM rcv_files + WHERE status IN (?,?) AND deleted = 0 + ORDER BY created_at ASC LIMIT 1 + |] + (RFSReceived, RFSDecrypting) case fileId_ of Nothing -> pure Nothing Just fileId -> eitherToMaybe <$> getRcvFile db fileId @@ -1918,13 +1950,31 @@ getPendingRcvFilesServers db = do JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id JOIN rcv_file_chunks c ON c.rcv_file_chunk_id = r.rcv_file_chunk_id JOIN rcv_files f ON f.rcv_file_id = c.rcv_file_id - WHERE r.received = 0 AND r.replica_number = 1 AND f.status = ? + WHERE r.received = 0 AND r.replica_number = 1 + AND f.status = ? AND f.deleted = 0 |] (Only RFSReceiving) where toServer :: (NonEmpty TransportHost, ServiceName, C.KeyHash) -> XFTPServer toServer (host, port, keyHash) = XFTPServer host port keyHash -getTmpFilePaths :: DB.Connection -> IO [(DBRcvFileId, FilePath)] -getTmpFilePaths db = - DB.query db "SELECT rcv_file_id, tmp_path FROM rcv_files WHERE status IN (?,?) AND tmp_path IS NOT NULL" (RFSComplete, RFSError) +getCleanupRcvFilesTmpPaths :: DB.Connection -> IO [(DBRcvFileId, FilePath)] +getCleanupRcvFilesTmpPaths db = + DB.query + db + [sql| + SELECT rcv_file_id, tmp_path + FROM rcv_files + WHERE status IN (?,?) AND tmp_path IS NOT NULL + |] + (RFSComplete, RFSError) + +getCleanupRcvFilesDeleted :: DB.Connection -> IO [(DBRcvFileId, FilePath)] +getCleanupRcvFilesDeleted db = + DB.query_ + db + [sql| + SELECT rcv_file_id, prefix_path + FROM rcv_files + WHERE deleted = 1 + |] diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230223_files.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230223_files.hs index 46e9dd300..1bae366a7 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230223_files.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230223_files.hs @@ -27,9 +27,11 @@ CREATE TABLE rcv_files ( key BLOB NOT NULL, nonce BLOB NOT NULL, chunk_size INTEGER NOT NULL, + prefix_path TEXT NOT NULL, tmp_path TEXT, save_path TEXT NOT NULL, status TEXT NOT NULL, + deleted INTEGER NOT NULL DEFAULT 0, error TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')), updated_at TEXT NOT NULL DEFAULT (datetime('now')), diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230307_snd_files.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230401_snd_files.hs similarity index 96% rename from src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230307_snd_files.hs rename to src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230401_snd_files.hs index 3cfc96117..df5754fb2 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230307_snd_files.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230401_snd_files.hs @@ -1,13 +1,13 @@ {-# LANGUAGE QuasiQuotes #-} -module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230307_snd_files where +module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230401_snd_files where import Database.SQLite.Simple (Query) import Database.SQLite.Simple.QQ (sql) -- this migration is a draft - it is not included in the list of migrations -m20230307_snd_files :: Query -m20230307_snd_files = +m20230401_snd_files :: Query +m20230401_snd_files = [sql| CREATE TABLE snd_files ( snd_file_id INTEGER PRIMARY KEY AUTOINCREMENT, diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql index 1f17ac080..29de284f9 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql @@ -301,9 +301,11 @@ CREATE TABLE rcv_files( key BLOB NOT NULL, nonce BLOB NOT NULL, chunk_size INTEGER NOT NULL, + prefix_path TEXT NOT NULL, tmp_path TEXT, save_path TEXT NOT NULL, status TEXT NOT NULL, + deleted INTEGER NOT NULL DEFAULT 0, error TEXT, created_at TEXT NOT NULL DEFAULT(datetime('now')), updated_at TEXT NOT NULL DEFAULT(datetime('now')), diff --git a/tests/XFTPAgent.hs b/tests/XFTPAgent.hs index db04e1d20..ba87160da 100644 --- a/tests/XFTPAgent.hs +++ b/tests/XFTPAgent.hs @@ -13,12 +13,11 @@ import qualified Data.ByteString.Char8 as B import SMPAgentClient (agentCfg, initAgentServers) import Simplex.FileTransfer.Description import Simplex.FileTransfer.Protocol (FileParty (..)) -import Simplex.Messaging.Agent (disconnectAgentClient, getSMPAgentClient, xftpReceiveFile, xftpSendFile) +import Simplex.Messaging.Agent (disconnectAgentClient, getSMPAgentClient, xftpDeleteRcvFile, xftpReceiveFile, xftpSendFile) import Simplex.Messaging.Agent.Protocol (ACommand (..), AgentErrorType (..)) import Simplex.Messaging.Encoding.String (StrEncoding (..)) -import System.Directory (doesDirectoryExist, getFileSize) +import System.Directory (doesDirectoryExist, getFileSize, listDirectory) import System.FilePath (()) -import System.Process (readCreateProcess, shell) import System.Timeout (timeout) import Test.Hspec import XFTPCLI @@ -29,7 +28,7 @@ xftpAgentTests = around_ testBracket . describe "Functional API" $ do it "should receive file" testXFTPAgentReceive it "should resume receiving file after restart" testXFTPAgentReceiveRestore it "should cleanup tmp path after permanent error" testXFTPAgentReceiveCleanup - it "should send file using experimental api" testXFTPAgentSendExperimental -- TODO uses default servers (remote) + it "should send file using experimental api" testXFTPAgentSendExperimental testXFTPAgentReceive :: IO () testXFTPAgentReceive = withXFTPServer $ do @@ -49,18 +48,16 @@ testXFTPAgentReceive = withXFTPServer $ do ] -- receive file using agent rcp <- getSMPAgentClient agentCfg initAgentServers - let savePath = recipientFiles "testfile" - run $ "touch " <> savePath runRight_ $ do fd :: ValidFileDescription 'FRecipient <- getFileDescription fdRcv - fId <- xftpReceiveFile rcp 1 fd (Just recipientFiles) savePath - ("", fId', RFDONE) <- rfGet rcp + fId <- xftpReceiveFile rcp 1 fd (Just recipientFiles) + ("", fId', RFDONE path) <- rfGet rcp liftIO $ do fId' `shouldBe` fId - B.readFile savePath `shouldReturn` file + B.readFile path `shouldReturn` file -run :: String -> IO () -run cmd = void $ readCreateProcess (shell cmd) "" + -- delete file + xftpDeleteRcvFile rcp 1 fId getFileDescription :: FilePath -> ExceptT AgentErrorType IO (ValidFileDescription 'FRecipient) getFileDescription path = @@ -88,29 +85,29 @@ testXFTPAgentReceiveRestore = withGlobalLogging logCfgNoLogs $ do ] -- receive file using agent - should not succeed due to server being down - let savePath = recipientFiles "testfile" - run $ "touch " <> savePath rcp <- getSMPAgentClient agentCfg initAgentServers fId <- runRight $ do fd :: ValidFileDescription 'FRecipient <- getFileDescription fdRcv - fId <- xftpReceiveFile rcp 1 fd (Just recipientFiles) savePath + fId <- xftpReceiveFile rcp 1 fd (Just recipientFiles) liftIO $ timeout 300000 (get rcp) `shouldReturn` Nothing -- wait for worker attempt pure fId disconnectAgentClient rcp - doesDirectoryExist (recipientFiles "xftp.encrypted") `shouldReturn` True + [prefixDir] <- listDirectory recipientFiles + let tmpPath = recipientFiles prefixDir "xftp.encrypted" + doesDirectoryExist tmpPath `shouldReturn` True rcp' <- getSMPAgentClient agentCfg initAgentServers withXFTPServerStoreLogOn $ \_ -> do -- receive file using agent - should succeed with server up - ("", fId', RFDONE) <- rfGet rcp' + ("", fId', RFDONE path) <- rfGet rcp' liftIO $ do fId' `shouldBe` fId file <- B.readFile filePath - B.readFile savePath `shouldReturn` file + B.readFile path `shouldReturn` file -- tmp path should be removed after receiving file - doesDirectoryExist (recipientFiles "xftp.encrypted") `shouldReturn` False + doesDirectoryExist tmpPath `shouldReturn` False testXFTPAgentReceiveCleanup :: IO () testXFTPAgentReceiveCleanup = withGlobalLogging logCfgNoLogs $ do @@ -132,16 +129,16 @@ testXFTPAgentReceiveCleanup = withGlobalLogging logCfgNoLogs $ do -- receive file using agent - should not succeed due to server being down rcp <- getSMPAgentClient agentCfg initAgentServers - let savePath = recipientFiles "testfile" - run $ "touch " <> savePath fId <- runRight $ do fd :: ValidFileDescription 'FRecipient <- getFileDescription fdRcv - fId <- xftpReceiveFile rcp 1 fd (Just recipientFiles) savePath + fId <- xftpReceiveFile rcp 1 fd (Just recipientFiles) liftIO $ timeout 300000 (get rcp) `shouldReturn` Nothing -- wait for worker attempt pure fId disconnectAgentClient rcp - doesDirectoryExist (recipientFiles "xftp.encrypted") `shouldReturn` True + [prefixDir] <- listDirectory recipientFiles + let tmpPath = recipientFiles prefixDir "xftp.encrypted" + doesDirectoryExist tmpPath `shouldReturn` True -- receive file using agent - should fail with AUTH error rcp' <- getSMPAgentClient agentCfg initAgentServers @@ -150,7 +147,7 @@ testXFTPAgentReceiveCleanup = withGlobalLogging logCfgNoLogs $ do fId' `shouldBe` fId -- tmp path should be removed after permanent error - doesDirectoryExist (recipientFiles "xftp.encrypted") `shouldReturn` False + doesDirectoryExist tmpPath `shouldReturn` False testXFTPAgentSendExperimental :: IO () testXFTPAgentSendExperimental = withXFTPServer $ do @@ -175,11 +172,9 @@ testXFTPAgentSendExperimental = withXFTPServer $ do -- receive file using agent rcp <- getSMPAgentClient agentCfg initAgentServers - let savePath = recipientFiles "testfile" - run $ "touch " <> savePath runRight_ $ do - rfId <- xftpReceiveFile rcp 1 rfd (Just recipientFiles) savePath - ("", rfId', RFDONE) <- rfGet rcp + rfId <- xftpReceiveFile rcp 1 rfd (Just recipientFiles) + ("", rfId', RFDONE path) <- rfGet rcp liftIO $ do rfId' `shouldBe` rfId - B.readFile savePath `shouldReturn` file + B.readFile path `shouldReturn` file