diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index 16353d6ee..f032070f7 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -20,6 +20,7 @@ module Simplex.FileTransfer.Agent -- Sending files sendFileExperimental, sendFile, + deleteSndFileInternal, ) where @@ -435,7 +436,7 @@ runXFTPSndPrepareWorker c doWork = do sndWorkerInternalError :: AgentMonad m => AgentClient -> DBSndFileId -> SndFileId -> Maybe FilePath -> String -> m () sndWorkerInternalError c sndFileId sndFileEntityId prefixPath internalErrStr = do - forM_ prefixPath (removePath <=< toFSFilePath) + forM_ prefixPath $ removePath <=< toFSFilePath withStore' c $ \db -> updateSndFileError db sndFileId internalErrStr notify c sndFileEntityId $ SFERR $ INTERNAL internalErrStr @@ -490,7 +491,7 @@ runXFTPSndWorker c srv doWork = do when complete $ do (sndDescr, rcvDescrs) <- sndFileToDescrs sf notify c sndFileEntityId $ SFDONE sndDescr rcvDescrs - forM_ prefixPath (removePath <=< toFSFilePath) + forM_ prefixPath $ removePath <=< toFSFilePath withStore' c $ \db -> updateSndFileComplete db sndFileId where addRecipients :: SndFileChunk -> SndFileChunkReplica -> m SndFileChunkReplica @@ -568,3 +569,12 @@ runXFTPSndWorker c srv doWork = do chunkUploaded :: SndFileChunk -> Bool chunkUploaded SndFileChunk {replicas} = any (\SndFileChunkReplica {replicaStatus} -> replicaStatus == SFRSUploaded) replicas + +deleteSndFileInternal :: AgentMonad m => AgentClient -> UserId -> SndFileId -> m () +deleteSndFileInternal c userId sndFileEntityId = do + SndFile {sndFileId, prefixPath, status} <- withStore c $ \db -> getSndFileByEntityId db userId sndFileEntityId + if status == SFSComplete || status == SFSError + then do + forM_ prefixPath $ removePath <=< toFSFilePath + withStore' c (`deleteSndFile'` sndFileId) + else withStore' c (`updateSndFileDeleted` sndFileId) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 17e2da789..05a4d1357 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -84,6 +84,7 @@ module Simplex.Messaging.Agent xftpReceiveFile, xftpDeleteRcvFile, xftpSendFile, + xftpDeleteSndFileInternal, activateAgent, suspendAgent, execAgentStoreSQL, @@ -119,7 +120,7 @@ import qualified Data.Text as T import Data.Time.Clock import Data.Time.Clock.System (systemToUTCTime) import qualified Database.SQLite.Simple as DB -import Simplex.FileTransfer.Agent (closeXFTPAgent, deleteRcvFile, receiveFile, sendFile, startWorkers, toFSFilePath) +import Simplex.FileTransfer.Agent (closeXFTPAgent, deleteRcvFile, deleteSndFileInternal, receiveFile, sendFile, startWorkers, toFSFilePath) import Simplex.FileTransfer.Description (ValidFileDescription) import Simplex.FileTransfer.Protocol (FileParty (..)) import Simplex.FileTransfer.Util (removePath) @@ -351,6 +352,10 @@ xftpDeleteRcvFile c = withAgentEnv c .: deleteRcvFile c xftpSendFile :: AgentErrorMonad m => AgentClient -> UserId -> FilePath -> Int -> m SndFileId xftpSendFile c = withAgentEnv c .:. sendFile c +-- | Delete XFTP snd file internally (deletes work files from file system and db records) +xftpDeleteSndFileInternal :: AgentErrorMonad m => AgentClient -> UserId -> SndFileId -> m () +xftpDeleteSndFileInternal c = withAgentEnv c .: deleteSndFileInternal c + -- TODO rename setAgentForeground -- | Activate operations @@ -1598,6 +1603,8 @@ cleanupManager c@AgentClient {subQ} = do deleteRcvFilesDeleted `catchError` (notify "" . RFERR) deleteRcvFilesTmpPaths `catchError` (notify "" . RFERR) deleteSndFilesExpired `catchError` (notify "" . SFERR) + deleteSndFilesDeleted `catchError` (notify "" . SFERR) + deleteSndFilesPrefixPaths `catchError` (notify "" . SFERR) liftIO $ threadDelay' int where deleteConns = @@ -1626,6 +1633,16 @@ cleanupManager c@AgentClient {subQ} = do forM_ sndExpired $ \(dbId, entId, p) -> flip catchError (notify entId . SFERR) $ do forM_ p $ removePath <=< toFSFilePath withStore' c (`deleteSndFile'` dbId) + deleteSndFilesDeleted = do + sndDeleted <- withStore' c getCleanupSndFilesDeleted + forM_ sndDeleted $ \(dbId, entId, p) -> flip catchError (notify entId . SFERR) $ do + forM_ p $ removePath <=< toFSFilePath + withStore' c (`deleteSndFile'` dbId) + deleteSndFilesPrefixPaths = do + sndPrefixPaths <- withStore' c getCleanupSndFilesPrefixPaths + forM_ sndPrefixPaths $ \(dbId, entId, p) -> flip catchError (notify entId . SFERR) $ do + removePath =<< toFSFilePath p + withStore' c (`updateSndFileNoPrefixPath` dbId) notify :: forall e. AEntityI e => EntityId -> ACommand 'Agent e -> ExceptT AgentErrorType m () notify entId cmd = atomically $ writeTBQueue subQ ("", entId, APC (sAEntity @e) cmd) diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 2bd9d833d..b960c547d 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -153,11 +153,14 @@ module Simplex.Messaging.Agent.Store.SQLite -- Snd files createSndFile, getSndFile, + getSndFileByEntityId, getNextSndFileToPrepare, updateSndFileError, updateSndFileStatus, updateSndFileEncrypted, updateSndFileComplete, + updateSndFileNoPrefixPath, + updateSndFileDeleted, deleteSndFile', createSndFileReplica, getNextSndChunkToUpload, @@ -165,6 +168,8 @@ module Simplex.Messaging.Agent.Store.SQLite addSndChunkReplicaRecipients, updateSndChunkReplicaStatus, getPendingSndFilesServers, + getCleanupSndFilesPrefixPaths, + getCleanupSndFilesDeleted, getSndFilesExpired, -- * utilities @@ -2103,6 +2108,16 @@ createSndFile db gVar userId numRecipients path prefixPath key nonce = "INSERT INTO snd_files (snd_file_entity_id, user_id, num_recipients, key, nonce, path, prefix_path, status) VALUES (?,?,?,?,?,?,?,?)" (sndFileEntityId, userId, numRecipients, key, nonce, path, prefixPath, SFSNew) +getSndFileByEntityId :: DB.Connection -> UserId -> SndFileId -> IO (Either StoreError SndFile) +getSndFileByEntityId db userId sndFileEntityId = runExceptT $ do + sndFileId <- ExceptT $ getSndFileIdByEntityId_ db userId sndFileEntityId + ExceptT $ getSndFile db sndFileId + +getSndFileIdByEntityId_ :: DB.Connection -> UserId -> SndFileId -> IO (Either StoreError DBSndFileId) +getSndFileIdByEntityId_ db userId sndFileEntityId = + firstRow fromOnly SEFileNotFound $ + DB.query db "SELECT snd_file_id FROM snd_files WHERE user_id = ? AND snd_file_entity_id = ?" (userId, sndFileEntityId) + getSndFile :: DB.Connection -> DBSndFileId -> IO (Either StoreError SndFile) getSndFile db sndFileId = runExceptT $ do f@SndFile {sndFileEntityId, userId, numRecipients, prefixPath} <- ExceptT getFile @@ -2219,6 +2234,16 @@ updateSndFileComplete db sndFileId = do updatedAt <- getCurrentTime DB.execute db "UPDATE snd_files SET prefix_path = NULL, status = ?, updated_at = ? WHERE snd_file_id = ?" (SFSComplete, updatedAt, sndFileId) +updateSndFileNoPrefixPath :: DB.Connection -> DBSndFileId -> IO () +updateSndFileNoPrefixPath db sndFileId = do + updatedAt <- getCurrentTime + DB.execute db "UPDATE snd_files SET prefix_path = NULL, updated_at = ? WHERE snd_file_id = ?" (updatedAt, sndFileId) + +updateSndFileDeleted :: DB.Connection -> DBSndFileId -> IO () +updateSndFileDeleted db sndFileId = do + updatedAt <- getCurrentTime + DB.execute db "UPDATE snd_files SET deleted = 1, updated_at = ? WHERE snd_file_id = ?" (updatedAt, sndFileId) + deleteSndFile' :: DB.Connection -> DBSndFileId -> IO () deleteSndFile' db sndFileId = DB.execute db "DELETE FROM snd_files WHERE snd_file_id = ?" (Only sndFileId) @@ -2335,6 +2360,27 @@ getPendingSndFilesServers db ttl = do toServer :: (NonEmpty TransportHost, ServiceName, C.KeyHash) -> XFTPServer toServer (host, port, keyHash) = XFTPServer host port keyHash +getCleanupSndFilesPrefixPaths :: DB.Connection -> IO [(DBSndFileId, SndFileId, FilePath)] +getCleanupSndFilesPrefixPaths db = + DB.query + db + [sql| + SELECT snd_file_id, snd_file_entity_id, prefix_path + FROM snd_files + WHERE status IN (?,?) AND prefix_path IS NOT NULL + |] + (SFSComplete, SFSError) + +getCleanupSndFilesDeleted :: DB.Connection -> IO [(DBSndFileId, SndFileId, Maybe FilePath)] +getCleanupSndFilesDeleted db = + DB.query_ + db + [sql| + SELECT snd_file_id, snd_file_entity_id, prefix_path + FROM snd_files + WHERE deleted = 1 + |] + getSndFilesExpired :: DB.Connection -> NominalDiffTime -> IO [(DBSndFileId, SndFileId, Maybe FilePath)] getSndFilesExpired db ttl = do cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime diff --git a/tests/XFTPAgent.hs b/tests/XFTPAgent.hs index a52a34f44..2f769d9da 100644 --- a/tests/XFTPAgent.hs +++ b/tests/XFTPAgent.hs @@ -7,6 +7,7 @@ module XFTPAgent where import AgentTests.FunctionalAPITests (get, getSMPAgentClient', rfGet, runRight, runRight_, sfGet) +import Control.Concurrent (threadDelay) import Control.Logger.Simple import Control.Monad.Except import Data.Bifunctor (first) @@ -18,7 +19,7 @@ import SMPAgentClient (agentCfg, initAgentServers, testDB) import Simplex.FileTransfer.Description import Simplex.FileTransfer.Protocol (FileParty (..), XFTPErrorType (AUTH)) import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..)) -import Simplex.Messaging.Agent (AgentClient, disconnectAgentClient, testProtocolServer, xftpDeleteRcvFile, xftpReceiveFile, xftpSendFile, xftpStartWorkers) +import Simplex.Messaging.Agent (AgentClient, disconnectAgentClient, testProtocolServer, xftpDeleteRcvFile, xftpDeleteSndFileInternal, xftpReceiveFile, xftpSendFile, xftpStartWorkers) import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestStep (..)) import Simplex.Messaging.Agent.Protocol (ACommand (..), AgentErrorType (..), BrokerErrorType (..), noAuthSrv) import Simplex.Messaging.Encoding.String (StrEncoding (..)) @@ -36,6 +37,7 @@ xftpAgentTests = around_ testBracket . describe "Functional API" $ do it "should resume receiving file after restart" testXFTPAgentReceiveRestore it "should cleanup rcv tmp path after permanent error" testXFTPAgentReceiveCleanup it "should resume sending file after restart" testXFTPAgentSendRestore + it "should cleanup snd prefix path after permanent error" testXFTPAgentSendCleanup describe "XFTP server test via agent API" $ do it "should pass without basic auth" $ testXFTPServerTest Nothing (noAuthSrv testXFTPServer2) `shouldReturn` Nothing let srv1 = testXFTPServer2 {keyHash = "1234"} @@ -87,6 +89,10 @@ testXFTPAgentSendReceive = withXFTPServer $ do sfProgress sndr $ mb 18 ("", sfId', SFDONE _sndDescr [rfd1, _rfd2]) <- sfGet sndr liftIO $ sfId' `shouldBe` sfId + + -- delete snd file internally + xftpDeleteSndFileInternal sndr 1 sfId + pure rfd1 -- receive file @@ -101,7 +107,7 @@ testXFTPAgentSendReceive = withXFTPServer $ do file <- B.readFile filePath B.readFile path `shouldReturn` file - -- delete file + -- delete rcv file xftpDeleteRcvFile rcp 1 rfId getFileDescription :: FilePath -> ExceptT AgentErrorType IO (ValidFileDescription 'FRecipient) @@ -143,7 +149,17 @@ testXFTPAgentReceiveRestore = withGlobalLogging logCfgNoLogs $ do doesDirectoryExist tmpPath `shouldReturn` True withXFTPServerStoreLogOn $ \_ -> do - -- receive file - should succeed with server up + -- receive file - should start downloading with server up + rcp' <- getSMPAgentClient' agentCfg initAgentServers testDB + runRight_ $ xftpStartWorkers rcp' (Just recipientFiles) + ("", rfId', RFPROG _ _) <- rfGet rcp' + liftIO $ rfId' `shouldBe` rfId + disconnectAgentClient rcp' + + threadDelay 100000 + + withXFTPServerStoreLogOn $ \_ -> do + -- receive file - should continue downloading with server up rcp' <- getSMPAgentClient' agentCfg initAgentServers testDB runRight_ $ xftpStartWorkers rcp' (Just recipientFiles) rfProgress rcp' $ mb 18 @@ -221,7 +237,17 @@ testXFTPAgentSendRestore = withGlobalLogging logCfgNoLogs $ do doesFileExist encPath `shouldReturn` True withXFTPServerStoreLogOn $ \_ -> do - -- send file - should succeed with server up + -- send file - should start uploading with server up + sndr' <- getSMPAgentClient' agentCfg initAgentServers testDB + runRight_ $ xftpStartWorkers sndr' (Just senderFiles) + ("", sfId', SFPROG _ _) <- sfGet sndr' + liftIO $ sfId' `shouldBe` sfId + disconnectAgentClient sndr' + + threadDelay 100000 + + withXFTPServerStoreLogOn $ \_ -> do + -- send file - should continue uploading with server up sndr' <- getSMPAgentClient' agentCfg initAgentServers testDB runRight_ $ xftpStartWorkers sndr' (Just senderFiles) sfProgress sndr' $ mb 18 @@ -244,6 +270,45 @@ testXFTPAgentSendRestore = withGlobalLogging logCfgNoLogs $ do file <- B.readFile filePath B.readFile path `shouldReturn` file +testXFTPAgentSendCleanup :: IO () +testXFTPAgentSendCleanup = withGlobalLogging logCfgNoLogs $ do + -- create random file using cli + let filePath = senderFiles "testfile" + xftpCLI ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath] + getFileSize filePath `shouldReturn` mb 17 + + sfId <- withXFTPServerStoreLogOn $ \_ -> do + -- send file + sndr <- getSMPAgentClient' agentCfg initAgentServers testDB + sfId <- runRight $ do + xftpStartWorkers sndr (Just senderFiles) + sfId <- xftpSendFile sndr 1 filePath 2 + -- wait for progress events for 5 out of 6 chunks - at this point all chunks should be created on the server + forM_ [1 .. 5 :: Integer] $ \_ -> do + (_, _, SFPROG _ _) <- sfGet sndr + pure () + pure sfId + disconnectAgentClient sndr + pure sfId + + dirEntries <- listDirectory senderFiles + let prefixDir = fromJust $ find (isSuffixOf "_snd.xftp") dirEntries + prefixPath = senderFiles prefixDir + encPath = prefixPath "xftp.encrypted" + doesDirectoryExist prefixPath `shouldReturn` True + doesFileExist encPath `shouldReturn` True + + withXFTPServerThreadOn $ \_ -> do + -- send file - should fail with AUTH error + sndr' <- getSMPAgentClient' agentCfg initAgentServers testDB + runRight_ $ xftpStartWorkers sndr' (Just senderFiles) + ("", sfId', SFERR (INTERNAL "XFTP {xftpErr = AUTH}")) <- sfGet sndr' + sfId' `shouldBe` sfId + + -- prefix path should be removed after permanent error + doesDirectoryExist prefixPath `shouldReturn` False + doesFileExist encPath `shouldReturn` False + testXFTPServerTest :: Maybe BasicAuth -> XFTPServerWithAuth -> IO (Maybe ProtocolTestFailure) testXFTPServerTest newFileBasicAuth srv = withXFTPServerCfg testXFTPServerConfig {newFileBasicAuth, xftpPort = xftpTestPort2} $ \_ -> do