mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 16:26:02 +00:00
xftp: api to delete snd files internally, cleanup snd files, tests (#719)
This commit is contained in:
@@ -20,6 +20,7 @@ module Simplex.FileTransfer.Agent
|
||||
-- Sending files
|
||||
sendFileExperimental,
|
||||
sendFile,
|
||||
deleteSndFileInternal,
|
||||
)
|
||||
where
|
||||
|
||||
@@ -435,7 +436,7 @@ runXFTPSndPrepareWorker c doWork = do
|
||||
|
||||
sndWorkerInternalError :: AgentMonad m => AgentClient -> DBSndFileId -> SndFileId -> Maybe FilePath -> String -> m ()
|
||||
sndWorkerInternalError c sndFileId sndFileEntityId prefixPath internalErrStr = do
|
||||
forM_ prefixPath (removePath <=< toFSFilePath)
|
||||
forM_ prefixPath $ removePath <=< toFSFilePath
|
||||
withStore' c $ \db -> updateSndFileError db sndFileId internalErrStr
|
||||
notify c sndFileEntityId $ SFERR $ INTERNAL internalErrStr
|
||||
|
||||
@@ -490,7 +491,7 @@ runXFTPSndWorker c srv doWork = do
|
||||
when complete $ do
|
||||
(sndDescr, rcvDescrs) <- sndFileToDescrs sf
|
||||
notify c sndFileEntityId $ SFDONE sndDescr rcvDescrs
|
||||
forM_ prefixPath (removePath <=< toFSFilePath)
|
||||
forM_ prefixPath $ removePath <=< toFSFilePath
|
||||
withStore' c $ \db -> updateSndFileComplete db sndFileId
|
||||
where
|
||||
addRecipients :: SndFileChunk -> SndFileChunkReplica -> m SndFileChunkReplica
|
||||
@@ -568,3 +569,12 @@ runXFTPSndWorker c srv doWork = do
|
||||
chunkUploaded :: SndFileChunk -> Bool
|
||||
chunkUploaded SndFileChunk {replicas} =
|
||||
any (\SndFileChunkReplica {replicaStatus} -> replicaStatus == SFRSUploaded) replicas
|
||||
|
||||
deleteSndFileInternal :: AgentMonad m => AgentClient -> UserId -> SndFileId -> m ()
|
||||
deleteSndFileInternal c userId sndFileEntityId = do
|
||||
SndFile {sndFileId, prefixPath, status} <- withStore c $ \db -> getSndFileByEntityId db userId sndFileEntityId
|
||||
if status == SFSComplete || status == SFSError
|
||||
then do
|
||||
forM_ prefixPath $ removePath <=< toFSFilePath
|
||||
withStore' c (`deleteSndFile'` sndFileId)
|
||||
else withStore' c (`updateSndFileDeleted` sndFileId)
|
||||
|
||||
@@ -84,6 +84,7 @@ module Simplex.Messaging.Agent
|
||||
xftpReceiveFile,
|
||||
xftpDeleteRcvFile,
|
||||
xftpSendFile,
|
||||
xftpDeleteSndFileInternal,
|
||||
activateAgent,
|
||||
suspendAgent,
|
||||
execAgentStoreSQL,
|
||||
@@ -119,7 +120,7 @@ import qualified Data.Text as T
|
||||
import Data.Time.Clock
|
||||
import Data.Time.Clock.System (systemToUTCTime)
|
||||
import qualified Database.SQLite.Simple as DB
|
||||
import Simplex.FileTransfer.Agent (closeXFTPAgent, deleteRcvFile, receiveFile, sendFile, startWorkers, toFSFilePath)
|
||||
import Simplex.FileTransfer.Agent (closeXFTPAgent, deleteRcvFile, deleteSndFileInternal, receiveFile, sendFile, startWorkers, toFSFilePath)
|
||||
import Simplex.FileTransfer.Description (ValidFileDescription)
|
||||
import Simplex.FileTransfer.Protocol (FileParty (..))
|
||||
import Simplex.FileTransfer.Util (removePath)
|
||||
@@ -351,6 +352,10 @@ xftpDeleteRcvFile c = withAgentEnv c .: deleteRcvFile c
|
||||
xftpSendFile :: AgentErrorMonad m => AgentClient -> UserId -> FilePath -> Int -> m SndFileId
|
||||
xftpSendFile c = withAgentEnv c .:. sendFile c
|
||||
|
||||
-- | Delete XFTP snd file internally (deletes work files from file system and db records)
|
||||
xftpDeleteSndFileInternal :: AgentErrorMonad m => AgentClient -> UserId -> SndFileId -> m ()
|
||||
xftpDeleteSndFileInternal c = withAgentEnv c .: deleteSndFileInternal c
|
||||
|
||||
-- TODO rename setAgentForeground
|
||||
|
||||
-- | Activate operations
|
||||
@@ -1598,6 +1603,8 @@ cleanupManager c@AgentClient {subQ} = do
|
||||
deleteRcvFilesDeleted `catchError` (notify "" . RFERR)
|
||||
deleteRcvFilesTmpPaths `catchError` (notify "" . RFERR)
|
||||
deleteSndFilesExpired `catchError` (notify "" . SFERR)
|
||||
deleteSndFilesDeleted `catchError` (notify "" . SFERR)
|
||||
deleteSndFilesPrefixPaths `catchError` (notify "" . SFERR)
|
||||
liftIO $ threadDelay' int
|
||||
where
|
||||
deleteConns =
|
||||
@@ -1626,6 +1633,16 @@ cleanupManager c@AgentClient {subQ} = do
|
||||
forM_ sndExpired $ \(dbId, entId, p) -> flip catchError (notify entId . SFERR) $ do
|
||||
forM_ p $ removePath <=< toFSFilePath
|
||||
withStore' c (`deleteSndFile'` dbId)
|
||||
deleteSndFilesDeleted = do
|
||||
sndDeleted <- withStore' c getCleanupSndFilesDeleted
|
||||
forM_ sndDeleted $ \(dbId, entId, p) -> flip catchError (notify entId . SFERR) $ do
|
||||
forM_ p $ removePath <=< toFSFilePath
|
||||
withStore' c (`deleteSndFile'` dbId)
|
||||
deleteSndFilesPrefixPaths = do
|
||||
sndPrefixPaths <- withStore' c getCleanupSndFilesPrefixPaths
|
||||
forM_ sndPrefixPaths $ \(dbId, entId, p) -> flip catchError (notify entId . SFERR) $ do
|
||||
removePath =<< toFSFilePath p
|
||||
withStore' c (`updateSndFileNoPrefixPath` dbId)
|
||||
notify :: forall e. AEntityI e => EntityId -> ACommand 'Agent e -> ExceptT AgentErrorType m ()
|
||||
notify entId cmd = atomically $ writeTBQueue subQ ("", entId, APC (sAEntity @e) cmd)
|
||||
|
||||
|
||||
@@ -153,11 +153,14 @@ module Simplex.Messaging.Agent.Store.SQLite
|
||||
-- Snd files
|
||||
createSndFile,
|
||||
getSndFile,
|
||||
getSndFileByEntityId,
|
||||
getNextSndFileToPrepare,
|
||||
updateSndFileError,
|
||||
updateSndFileStatus,
|
||||
updateSndFileEncrypted,
|
||||
updateSndFileComplete,
|
||||
updateSndFileNoPrefixPath,
|
||||
updateSndFileDeleted,
|
||||
deleteSndFile',
|
||||
createSndFileReplica,
|
||||
getNextSndChunkToUpload,
|
||||
@@ -165,6 +168,8 @@ module Simplex.Messaging.Agent.Store.SQLite
|
||||
addSndChunkReplicaRecipients,
|
||||
updateSndChunkReplicaStatus,
|
||||
getPendingSndFilesServers,
|
||||
getCleanupSndFilesPrefixPaths,
|
||||
getCleanupSndFilesDeleted,
|
||||
getSndFilesExpired,
|
||||
|
||||
-- * utilities
|
||||
@@ -2103,6 +2108,16 @@ createSndFile db gVar userId numRecipients path prefixPath key nonce =
|
||||
"INSERT INTO snd_files (snd_file_entity_id, user_id, num_recipients, key, nonce, path, prefix_path, status) VALUES (?,?,?,?,?,?,?,?)"
|
||||
(sndFileEntityId, userId, numRecipients, key, nonce, path, prefixPath, SFSNew)
|
||||
|
||||
getSndFileByEntityId :: DB.Connection -> UserId -> SndFileId -> IO (Either StoreError SndFile)
|
||||
getSndFileByEntityId db userId sndFileEntityId = runExceptT $ do
|
||||
sndFileId <- ExceptT $ getSndFileIdByEntityId_ db userId sndFileEntityId
|
||||
ExceptT $ getSndFile db sndFileId
|
||||
|
||||
getSndFileIdByEntityId_ :: DB.Connection -> UserId -> SndFileId -> IO (Either StoreError DBSndFileId)
|
||||
getSndFileIdByEntityId_ db userId sndFileEntityId =
|
||||
firstRow fromOnly SEFileNotFound $
|
||||
DB.query db "SELECT snd_file_id FROM snd_files WHERE user_id = ? AND snd_file_entity_id = ?" (userId, sndFileEntityId)
|
||||
|
||||
getSndFile :: DB.Connection -> DBSndFileId -> IO (Either StoreError SndFile)
|
||||
getSndFile db sndFileId = runExceptT $ do
|
||||
f@SndFile {sndFileEntityId, userId, numRecipients, prefixPath} <- ExceptT getFile
|
||||
@@ -2219,6 +2234,16 @@ updateSndFileComplete db sndFileId = do
|
||||
updatedAt <- getCurrentTime
|
||||
DB.execute db "UPDATE snd_files SET prefix_path = NULL, status = ?, updated_at = ? WHERE snd_file_id = ?" (SFSComplete, updatedAt, sndFileId)
|
||||
|
||||
updateSndFileNoPrefixPath :: DB.Connection -> DBSndFileId -> IO ()
|
||||
updateSndFileNoPrefixPath db sndFileId = do
|
||||
updatedAt <- getCurrentTime
|
||||
DB.execute db "UPDATE snd_files SET prefix_path = NULL, updated_at = ? WHERE snd_file_id = ?" (updatedAt, sndFileId)
|
||||
|
||||
updateSndFileDeleted :: DB.Connection -> DBSndFileId -> IO ()
|
||||
updateSndFileDeleted db sndFileId = do
|
||||
updatedAt <- getCurrentTime
|
||||
DB.execute db "UPDATE snd_files SET deleted = 1, updated_at = ? WHERE snd_file_id = ?" (updatedAt, sndFileId)
|
||||
|
||||
deleteSndFile' :: DB.Connection -> DBSndFileId -> IO ()
|
||||
deleteSndFile' db sndFileId =
|
||||
DB.execute db "DELETE FROM snd_files WHERE snd_file_id = ?" (Only sndFileId)
|
||||
@@ -2335,6 +2360,27 @@ getPendingSndFilesServers db ttl = do
|
||||
toServer :: (NonEmpty TransportHost, ServiceName, C.KeyHash) -> XFTPServer
|
||||
toServer (host, port, keyHash) = XFTPServer host port keyHash
|
||||
|
||||
getCleanupSndFilesPrefixPaths :: DB.Connection -> IO [(DBSndFileId, SndFileId, FilePath)]
|
||||
getCleanupSndFilesPrefixPaths db =
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT snd_file_id, snd_file_entity_id, prefix_path
|
||||
FROM snd_files
|
||||
WHERE status IN (?,?) AND prefix_path IS NOT NULL
|
||||
|]
|
||||
(SFSComplete, SFSError)
|
||||
|
||||
getCleanupSndFilesDeleted :: DB.Connection -> IO [(DBSndFileId, SndFileId, Maybe FilePath)]
|
||||
getCleanupSndFilesDeleted db =
|
||||
DB.query_
|
||||
db
|
||||
[sql|
|
||||
SELECT snd_file_id, snd_file_entity_id, prefix_path
|
||||
FROM snd_files
|
||||
WHERE deleted = 1
|
||||
|]
|
||||
|
||||
getSndFilesExpired :: DB.Connection -> NominalDiffTime -> IO [(DBSndFileId, SndFileId, Maybe FilePath)]
|
||||
getSndFilesExpired db ttl = do
|
||||
cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
module XFTPAgent where
|
||||
|
||||
import AgentTests.FunctionalAPITests (get, getSMPAgentClient', rfGet, runRight, runRight_, sfGet)
|
||||
import Control.Concurrent (threadDelay)
|
||||
import Control.Logger.Simple
|
||||
import Control.Monad.Except
|
||||
import Data.Bifunctor (first)
|
||||
@@ -18,7 +19,7 @@ import SMPAgentClient (agentCfg, initAgentServers, testDB)
|
||||
import Simplex.FileTransfer.Description
|
||||
import Simplex.FileTransfer.Protocol (FileParty (..), XFTPErrorType (AUTH))
|
||||
import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..))
|
||||
import Simplex.Messaging.Agent (AgentClient, disconnectAgentClient, testProtocolServer, xftpDeleteRcvFile, xftpReceiveFile, xftpSendFile, xftpStartWorkers)
|
||||
import Simplex.Messaging.Agent (AgentClient, disconnectAgentClient, testProtocolServer, xftpDeleteRcvFile, xftpDeleteSndFileInternal, xftpReceiveFile, xftpSendFile, xftpStartWorkers)
|
||||
import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestStep (..))
|
||||
import Simplex.Messaging.Agent.Protocol (ACommand (..), AgentErrorType (..), BrokerErrorType (..), noAuthSrv)
|
||||
import Simplex.Messaging.Encoding.String (StrEncoding (..))
|
||||
@@ -36,6 +37,7 @@ xftpAgentTests = around_ testBracket . describe "Functional API" $ do
|
||||
it "should resume receiving file after restart" testXFTPAgentReceiveRestore
|
||||
it "should cleanup rcv tmp path after permanent error" testXFTPAgentReceiveCleanup
|
||||
it "should resume sending file after restart" testXFTPAgentSendRestore
|
||||
it "should cleanup snd prefix path after permanent error" testXFTPAgentSendCleanup
|
||||
describe "XFTP server test via agent API" $ do
|
||||
it "should pass without basic auth" $ testXFTPServerTest Nothing (noAuthSrv testXFTPServer2) `shouldReturn` Nothing
|
||||
let srv1 = testXFTPServer2 {keyHash = "1234"}
|
||||
@@ -87,6 +89,10 @@ testXFTPAgentSendReceive = withXFTPServer $ do
|
||||
sfProgress sndr $ mb 18
|
||||
("", sfId', SFDONE _sndDescr [rfd1, _rfd2]) <- sfGet sndr
|
||||
liftIO $ sfId' `shouldBe` sfId
|
||||
|
||||
-- delete snd file internally
|
||||
xftpDeleteSndFileInternal sndr 1 sfId
|
||||
|
||||
pure rfd1
|
||||
|
||||
-- receive file
|
||||
@@ -101,7 +107,7 @@ testXFTPAgentSendReceive = withXFTPServer $ do
|
||||
file <- B.readFile filePath
|
||||
B.readFile path `shouldReturn` file
|
||||
|
||||
-- delete file
|
||||
-- delete rcv file
|
||||
xftpDeleteRcvFile rcp 1 rfId
|
||||
|
||||
getFileDescription :: FilePath -> ExceptT AgentErrorType IO (ValidFileDescription 'FRecipient)
|
||||
@@ -143,7 +149,17 @@ testXFTPAgentReceiveRestore = withGlobalLogging logCfgNoLogs $ do
|
||||
doesDirectoryExist tmpPath `shouldReturn` True
|
||||
|
||||
withXFTPServerStoreLogOn $ \_ -> do
|
||||
-- receive file - should succeed with server up
|
||||
-- receive file - should start downloading with server up
|
||||
rcp' <- getSMPAgentClient' agentCfg initAgentServers testDB
|
||||
runRight_ $ xftpStartWorkers rcp' (Just recipientFiles)
|
||||
("", rfId', RFPROG _ _) <- rfGet rcp'
|
||||
liftIO $ rfId' `shouldBe` rfId
|
||||
disconnectAgentClient rcp'
|
||||
|
||||
threadDelay 100000
|
||||
|
||||
withXFTPServerStoreLogOn $ \_ -> do
|
||||
-- receive file - should continue downloading with server up
|
||||
rcp' <- getSMPAgentClient' agentCfg initAgentServers testDB
|
||||
runRight_ $ xftpStartWorkers rcp' (Just recipientFiles)
|
||||
rfProgress rcp' $ mb 18
|
||||
@@ -221,7 +237,17 @@ testXFTPAgentSendRestore = withGlobalLogging logCfgNoLogs $ do
|
||||
doesFileExist encPath `shouldReturn` True
|
||||
|
||||
withXFTPServerStoreLogOn $ \_ -> do
|
||||
-- send file - should succeed with server up
|
||||
-- send file - should start uploading with server up
|
||||
sndr' <- getSMPAgentClient' agentCfg initAgentServers testDB
|
||||
runRight_ $ xftpStartWorkers sndr' (Just senderFiles)
|
||||
("", sfId', SFPROG _ _) <- sfGet sndr'
|
||||
liftIO $ sfId' `shouldBe` sfId
|
||||
disconnectAgentClient sndr'
|
||||
|
||||
threadDelay 100000
|
||||
|
||||
withXFTPServerStoreLogOn $ \_ -> do
|
||||
-- send file - should continue uploading with server up
|
||||
sndr' <- getSMPAgentClient' agentCfg initAgentServers testDB
|
||||
runRight_ $ xftpStartWorkers sndr' (Just senderFiles)
|
||||
sfProgress sndr' $ mb 18
|
||||
@@ -244,6 +270,45 @@ testXFTPAgentSendRestore = withGlobalLogging logCfgNoLogs $ do
|
||||
file <- B.readFile filePath
|
||||
B.readFile path `shouldReturn` file
|
||||
|
||||
testXFTPAgentSendCleanup :: IO ()
|
||||
testXFTPAgentSendCleanup = withGlobalLogging logCfgNoLogs $ do
|
||||
-- create random file using cli
|
||||
let filePath = senderFiles </> "testfile"
|
||||
xftpCLI ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath]
|
||||
getFileSize filePath `shouldReturn` mb 17
|
||||
|
||||
sfId <- withXFTPServerStoreLogOn $ \_ -> do
|
||||
-- send file
|
||||
sndr <- getSMPAgentClient' agentCfg initAgentServers testDB
|
||||
sfId <- runRight $ do
|
||||
xftpStartWorkers sndr (Just senderFiles)
|
||||
sfId <- xftpSendFile sndr 1 filePath 2
|
||||
-- wait for progress events for 5 out of 6 chunks - at this point all chunks should be created on the server
|
||||
forM_ [1 .. 5 :: Integer] $ \_ -> do
|
||||
(_, _, SFPROG _ _) <- sfGet sndr
|
||||
pure ()
|
||||
pure sfId
|
||||
disconnectAgentClient sndr
|
||||
pure sfId
|
||||
|
||||
dirEntries <- listDirectory senderFiles
|
||||
let prefixDir = fromJust $ find (isSuffixOf "_snd.xftp") dirEntries
|
||||
prefixPath = senderFiles </> prefixDir
|
||||
encPath = prefixPath </> "xftp.encrypted"
|
||||
doesDirectoryExist prefixPath `shouldReturn` True
|
||||
doesFileExist encPath `shouldReturn` True
|
||||
|
||||
withXFTPServerThreadOn $ \_ -> do
|
||||
-- send file - should fail with AUTH error
|
||||
sndr' <- getSMPAgentClient' agentCfg initAgentServers testDB
|
||||
runRight_ $ xftpStartWorkers sndr' (Just senderFiles)
|
||||
("", sfId', SFERR (INTERNAL "XFTP {xftpErr = AUTH}")) <- sfGet sndr'
|
||||
sfId' `shouldBe` sfId
|
||||
|
||||
-- prefix path should be removed after permanent error
|
||||
doesDirectoryExist prefixPath `shouldReturn` False
|
||||
doesFileExist encPath `shouldReturn` False
|
||||
|
||||
testXFTPServerTest :: Maybe BasicAuth -> XFTPServerWithAuth -> IO (Maybe ProtocolTestFailure)
|
||||
testXFTPServerTest newFileBasicAuth srv =
|
||||
withXFTPServerCfg testXFTPServerConfig {newFileBasicAuth, xftpPort = xftpTestPort2} $ \_ -> do
|
||||
|
||||
Reference in New Issue
Block a user