From d35bd8a95412f08d7e8ddaab27eff3a80bda2927 Mon Sep 17 00:00:00 2001 From: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> Date: Tue, 11 Apr 2023 22:00:09 +0400 Subject: [PATCH] xftp: restore snd files, expire snd files in agent (#718) --- src/Simplex/FileTransfer/Agent.hs | 41 +++++++++------ src/Simplex/Messaging/Agent.hs | 8 +++ src/Simplex/Messaging/Agent/Env/SQLite.hs | 2 + src/Simplex/Messaging/Agent/Store/SQLite.hs | 58 ++++++++++++++++++--- tests/XFTPAgent.hs | 56 ++++++++++++++++++-- 5 files changed, 138 insertions(+), 27 deletions(-) diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index 75624426f..16353d6ee 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -75,9 +75,10 @@ startWorkers :: AgentMonad m => AgentClient -> Maybe FilePath -> m () startWorkers c workDir = do wd <- asks $ xftpWorkDir . xftpAgent atomically $ writeTVar wd workDir - startFiles + startRcvFiles + startSndFiles where - startFiles = do + startRcvFiles = do rcvFilesTTL <- asks (rcvFilesTTL . config) pendingRcvServers <- withStore' c (`getPendingRcvFilesServers` rcvFilesTTL) forM_ pendingRcvServers $ \s -> addXFTPRcvWorker c (Just s) @@ -85,6 +86,12 @@ startWorkers c workDir = do -- no need to make an extra query for the check -- as the worker will check the store anyway addXFTPRcvWorker c Nothing + startSndFiles = do + sndFilesTTL <- asks (sndFilesTTL . config) + -- start worker for files pending encryption/creation + addXFTPSndWorker c Nothing + pendingSndServers <- withStore' c (`getPendingSndFilesServers` sndFilesTTL) + forM_ pendingSndServers $ \s -> addXFTPSndWorker c (Just s) closeXFTPAgent :: MonadUnliftIO m => XFTPAgent -> m () closeXFTPAgent XFTPAgent {xftpRcvWorkers, xftpSndWorkers} = do @@ -158,11 +165,11 @@ runXFTPRcvWorker c srv doWork = do forever $ do void . atomically $ readTMVar doWork -- TODO waitUntilNotSuspended - agentOperationBracket c AORcvNetwork waitUntilActive runXftpOperation + agentOperationBracket c AORcvNetwork waitUntilActive runXFTPOperation where noWorkToDo = void . atomically $ tryTakeTMVar doWork - runXftpOperation :: m () - runXftpOperation = do + runXFTPOperation :: m () + runXFTPOperation = do rcvFilesTTL <- asks (rcvFilesTTL . config) nextChunk <- withStore' c $ \db -> getNextRcvChunkToDownload db srv rcvFilesTTL case nextChunk of @@ -229,10 +236,10 @@ runXFTPRcvLocalWorker c doWork = do forever $ do void . atomically $ readTMVar doWork -- TODO waitUntilNotSuspended - runXftpOperation + runXFTPOperation where - runXftpOperation :: m () - runXftpOperation = do + runXFTPOperation :: m () + runXFTPOperation = do rcvFilesTTL <- asks (rcvFilesTTL . config) nextFile <- withStore' c (`getNextRcvFileToDecrypt` rcvFilesTTL) case nextFile of @@ -350,11 +357,12 @@ runXFTPSndPrepareWorker c doWork = do forever $ do void . atomically $ readTMVar doWork -- TODO waitUntilNotSuspended - runXftpOperation + runXFTPOperation where - runXftpOperation :: m () - runXftpOperation = do - nextFile <- withStore' c getNextSndFileToPrepare + runXFTPOperation :: m () + runXFTPOperation = do + sndFilesTTL <- asks (sndFilesTTL . config) + nextFile <- withStore' c (`getNextSndFileToPrepare` sndFilesTTL) case nextFile of Nothing -> noWorkToDo Just f@SndFile {sndFileId, sndFileEntityId, prefixPath} -> @@ -436,12 +444,13 @@ runXFTPSndWorker c srv doWork = do forever $ do void . atomically $ readTMVar doWork -- TODO waitUntilNotSuspended - agentOperationBracket c AOSndNetwork throwWhenInactive runXftpOperation + agentOperationBracket c AOSndNetwork throwWhenInactive runXFTPOperation where noWorkToDo = void . atomically $ tryTakeTMVar doWork - runXftpOperation :: m () - runXftpOperation = do - nextChunk <- withStore' c $ \db -> getNextSndChunkToUpload db srv + runXFTPOperation :: m () + runXFTPOperation = do + sndFilesTTL <- asks (sndFilesTTL . config) + nextChunk <- withStore' c $ \db -> getNextSndChunkToUpload db srv sndFilesTTL case nextChunk of Nothing -> noWorkToDo Just SndFileChunk {sndFileId, sndFileEntityId, filePrefixPath, replicas = []} -> sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) "chunk has no replicas" diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index e83621396..17e2da789 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -97,6 +97,7 @@ where import Control.Concurrent.STM (stateTVar) import Control.Logger.Simple (logError, logInfo, showText) +import Control.Monad ((<=<)) import Control.Monad.Except import Control.Monad.IO.Unlift (MonadUnliftIO) import Control.Monad.Reader @@ -1596,6 +1597,7 @@ cleanupManager c@AgentClient {subQ} = do deleteRcvFilesExpired `catchError` (notify "" . RFERR) deleteRcvFilesDeleted `catchError` (notify "" . RFERR) deleteRcvFilesTmpPaths `catchError` (notify "" . RFERR) + deleteSndFilesExpired `catchError` (notify "" . SFERR) liftIO $ threadDelay' int where deleteConns = @@ -1618,6 +1620,12 @@ cleanupManager c@AgentClient {subQ} = do forM_ rcvTmpPaths $ \(dbId, entId, p) -> flip catchError (notify entId . RFERR) $ do removePath =<< toFSFilePath p withStore' c (`updateRcvFileNoTmpPath` dbId) + deleteSndFilesExpired = do + sndFilesTTL <- asks (sndFilesTTL . config) + sndExpired <- withStore' c (`getSndFilesExpired` sndFilesTTL) + forM_ sndExpired $ \(dbId, entId, p) -> flip catchError (notify entId . SFERR) $ do + forM_ p $ removePath <=< toFSFilePath + withStore' c (`deleteSndFile'` dbId) notify :: forall e. AEntityI e => EntityId -> ACommand 'Agent e -> ExceptT AgentErrorType m () notify entId cmd = atomically $ writeTBQueue subQ ("", entId, APC (sAEntity @e) cmd) diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index c7b361b31..25f2e7495 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -83,6 +83,7 @@ data AgentConfig = AgentConfig initialCleanupDelay :: Int64, cleanupInterval :: Int64, rcvFilesTTL :: NominalDiffTime, + sndFilesTTL :: NominalDiffTime, xftpNotifyErrsOnRetry :: Bool, xftpMaxRecipientsPerRequest :: Int, deleteErrorCount :: Int, @@ -145,6 +146,7 @@ defaultAgentConfig = initialCleanupDelay = 30 * 1000000, -- 30 seconds cleanupInterval = 30 * 60 * 1000000, -- 30 minutes rcvFilesTTL = 2 * nominalDay, + sndFilesTTL = nominalDay, xftpNotifyErrsOnRetry = True, xftpMaxRecipientsPerRequest = 200, deleteErrorCount = 10, diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 480a477c0..2bd9d833d 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -158,11 +158,14 @@ module Simplex.Messaging.Agent.Store.SQLite updateSndFileStatus, updateSndFileEncrypted, updateSndFileComplete, + deleteSndFile', createSndFileReplica, getNextSndChunkToUpload, updateSndChunkReplicaDelay, addSndChunkReplicaRecipients, updateSndChunkReplicaStatus, + getPendingSndFilesServers, + getSndFilesExpired, -- * utilities withConnection, @@ -2176,8 +2179,9 @@ getChunkReplicaRecipients_ db replicaId = |] (Only replicaId) -getNextSndFileToPrepare :: DB.Connection -> IO (Maybe SndFile) -getNextSndFileToPrepare db = do +getNextSndFileToPrepare :: DB.Connection -> NominalDiffTime -> IO (Maybe SndFile) +getNextSndFileToPrepare db ttl = do + cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime fileId_ :: Maybe DBSndFileId <- maybeFirstRow fromOnly $ DB.query @@ -2185,10 +2189,10 @@ getNextSndFileToPrepare db = do [sql| SELECT snd_file_id FROM snd_files - WHERE status IN (?,?,?) AND deleted = 0 + WHERE status IN (?,?,?) AND deleted = 0 AND created_at >= ? ORDER BY created_at ASC LIMIT 1 |] - (SFSNew, SFSEncrypting, SFSEncrypted) + (SFSNew, SFSEncrypting, SFSEncrypted, cutoffTs) case fileId_ of Nothing -> pure Nothing Just fileId -> eitherToMaybe <$> getSndFile db fileId @@ -2215,6 +2219,10 @@ updateSndFileComplete db sndFileId = do updatedAt <- getCurrentTime DB.execute db "UPDATE snd_files SET prefix_path = NULL, status = ?, updated_at = ? WHERE snd_file_id = ?" (SFSComplete, updatedAt, sndFileId) +deleteSndFile' :: DB.Connection -> DBSndFileId -> IO () +deleteSndFile' db sndFileId = + DB.execute db "DELETE FROM snd_files WHERE snd_file_id = ?" (Only sndFileId) + createSndFileReplica :: DB.Connection -> SndFileChunk -> NewSndChunkReplica -> IO () createSndFileReplica db SndFileChunk {sndChunkId} NewSndChunkReplica {server, replicaId, replicaKey, rcvIdsKeys} = do srvId <- createXFTPServer_ db server @@ -2237,8 +2245,9 @@ createSndFileReplica db SndFileChunk {sndChunkId} NewSndChunkReplica {server, re |] (rId, rcvId, rcvKey) -getNextSndChunkToUpload :: DB.Connection -> XFTPServer -> IO (Maybe SndFileChunk) -getNextSndChunkToUpload db server@ProtocolServer {host, port, keyHash} = do +getNextSndChunkToUpload :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Maybe SndFileChunk) +getNextSndChunkToUpload db server@ProtocolServer {host, port, keyHash} ttl = do + cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime chunk_ <- maybeFirstRow toChunk $ DB.query @@ -2254,11 +2263,11 @@ getNextSndChunkToUpload db server@ProtocolServer {host, port, keyHash} = do JOIN snd_files f ON f.snd_file_id = c.snd_file_id WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ? AND r.replica_status = ? AND r.replica_number = 1 - AND (f.status = ? OR f.status = ?) AND f.deleted = 0 + AND (f.status = ? OR f.status = ?) AND f.deleted = 0 AND f.created_at >= ? ORDER BY r.created_at ASC LIMIT 1 |] - (host, port, keyHash, SFRSCreated, SFSEncrypted, SFSUploading) + (host, port, keyHash, SFRSCreated, SFSEncrypted, SFSUploading, cutoffTs) forM chunk_ $ \chunk@SndFileChunk {replicas} -> do replicas' <- forM replicas $ \replica@SndFileChunkReplica {sndChunkReplicaId} -> do rcvIdsKeys <- getChunkReplicaRecipients_ db sndChunkReplicaId @@ -2304,3 +2313,36 @@ updateSndChunkReplicaStatus :: DB.Connection -> Int64 -> SndFileReplicaStatus -> updateSndChunkReplicaStatus db replicaId status = do updatedAt <- getCurrentTime DB.execute db "UPDATE snd_file_chunk_replicas SET replica_status = ?, updated_at = ? WHERE snd_file_chunk_replica_id = ?" (status, updatedAt, replicaId) + +getPendingSndFilesServers :: DB.Connection -> NominalDiffTime -> IO [XFTPServer] +getPendingSndFilesServers db ttl = do + cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime + map toServer + <$> DB.query + db + [sql| + SELECT DISTINCT + s.xftp_host, s.xftp_port, s.xftp_key_hash + FROM snd_file_chunk_replicas r + JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id + JOIN snd_file_chunks c ON c.snd_file_chunk_id = r.snd_file_chunk_id + JOIN snd_files f ON f.snd_file_id = c.snd_file_id + WHERE r.replica_status = ? AND r.replica_number = 1 + AND (f.status = ? OR f.status = ?) AND f.deleted = 0 AND f.created_at >= ? + |] + (SFRSCreated, SFSEncrypted, SFSUploading, cutoffTs) + where + toServer :: (NonEmpty TransportHost, ServiceName, C.KeyHash) -> XFTPServer + toServer (host, port, keyHash) = XFTPServer host port keyHash + +getSndFilesExpired :: DB.Connection -> NominalDiffTime -> IO [(DBSndFileId, SndFileId, Maybe FilePath)] +getSndFilesExpired db ttl = do + cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime + DB.query + db + [sql| + SELECT snd_file_id, snd_file_entity_id, prefix_path + FROM snd_files + WHERE created_at < ? + |] + (Only cutoffTs) diff --git a/tests/XFTPAgent.hs b/tests/XFTPAgent.hs index 8c2b2f814..a52a34f44 100644 --- a/tests/XFTPAgent.hs +++ b/tests/XFTPAgent.hs @@ -12,6 +12,8 @@ import Control.Monad.Except import Data.Bifunctor (first) import qualified Data.ByteString.Char8 as B import Data.Int (Int64) +import Data.List (find, isSuffixOf) +import Data.Maybe (fromJust) import SMPAgentClient (agentCfg, initAgentServers, testDB) import Simplex.FileTransfer.Description import Simplex.FileTransfer.Protocol (FileParty (..), XFTPErrorType (AUTH)) @@ -21,7 +23,7 @@ import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestSte import Simplex.Messaging.Agent.Protocol (ACommand (..), AgentErrorType (..), BrokerErrorType (..), noAuthSrv) import Simplex.Messaging.Encoding.String (StrEncoding (..)) import Simplex.Messaging.Protocol (BasicAuth, ProtoServerWithAuth (..), ProtocolServer (..), XFTPServerWithAuth) -import System.Directory (doesDirectoryExist, getFileSize, listDirectory) +import System.Directory (doesDirectoryExist, doesFileExist, getFileSize, listDirectory) import System.FilePath (()) import System.Timeout (timeout) import Test.Hspec @@ -33,6 +35,7 @@ xftpAgentTests = around_ testBracket . describe "Functional API" $ do it "should send and receive file" testXFTPAgentSendReceive it "should resume receiving file after restart" testXFTPAgentReceiveRestore it "should cleanup rcv tmp path after permanent error" testXFTPAgentReceiveCleanup + it "should resume sending file after restart" testXFTPAgentSendRestore describe "XFTP server test via agent API" $ do it "should pass without basic auth" $ testXFTPServerTest Nothing (noAuthSrv testXFTPServer2) `shouldReturn` Nothing let srv1 = testXFTPServer2 {keyHash = "1234"} @@ -126,7 +129,7 @@ testXFTPAgentReceiveRestore = withGlobalLogging logCfgNoLogs $ do liftIO $ sfId' `shouldBe` sfId pure rfd1 - -- receive file - should not succeed due to server being down + -- receive file - should not succeed with server down rcp <- getSMPAgentClient' agentCfg initAgentServers testDB rfId <- runRight $ do xftpStartWorkers rcp (Just recipientFiles) @@ -171,7 +174,7 @@ testXFTPAgentReceiveCleanup = withGlobalLogging logCfgNoLogs $ do liftIO $ sfId' `shouldBe` sfId pure rfd1 - -- receive file - should not succeed due to server being down + -- receive file - should not succeed with server down rcp <- getSMPAgentClient' agentCfg initAgentServers testDB rfId <- runRight $ do xftpStartWorkers rcp (Just recipientFiles) @@ -194,6 +197,53 @@ testXFTPAgentReceiveCleanup = withGlobalLogging logCfgNoLogs $ do -- tmp path should be removed after permanent error doesDirectoryExist tmpPath `shouldReturn` False +testXFTPAgentSendRestore :: IO () +testXFTPAgentSendRestore = withGlobalLogging logCfgNoLogs $ do + -- create random file using cli + let filePath = senderFiles "testfile" + xftpCLI ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath] + getFileSize filePath `shouldReturn` mb 17 + + -- send file - should not succeed with server down + sndr <- getSMPAgentClient' agentCfg initAgentServers testDB + sfId <- runRight $ do + xftpStartWorkers sndr (Just senderFiles) + sfId <- xftpSendFile sndr 1 filePath 2 + liftIO $ timeout 1000000 (get sndr) `shouldReturn` Nothing -- wait for worker to encrypt and attempt to create file + pure sfId + disconnectAgentClient sndr + + dirEntries <- listDirectory senderFiles + let prefixDir = fromJust $ find (isSuffixOf "_snd.xftp") dirEntries + prefixPath = senderFiles prefixDir + encPath = prefixPath "xftp.encrypted" + doesDirectoryExist prefixPath `shouldReturn` True + doesFileExist encPath `shouldReturn` True + + withXFTPServerStoreLogOn $ \_ -> do + -- send file - should succeed with server up + sndr' <- getSMPAgentClient' agentCfg initAgentServers testDB + runRight_ $ xftpStartWorkers sndr' (Just senderFiles) + sfProgress sndr' $ mb 18 + ("", sfId', SFDONE _sndDescr [rfd1, _rfd2]) <- sfGet sndr' + liftIO $ sfId' `shouldBe` sfId + + -- prefix path should be removed after sending file + doesDirectoryExist prefixPath `shouldReturn` False + doesFileExist encPath `shouldReturn` False + + -- receive file + rcp <- getSMPAgentClient' agentCfg initAgentServers testDB + runRight_ $ do + xftpStartWorkers rcp (Just recipientFiles) + rfId <- xftpReceiveFile rcp 1 rfd1 + rfProgress rcp $ mb 18 + ("", rfId', RFDONE path) <- rfGet rcp + liftIO $ do + rfId' `shouldBe` rfId + file <- B.readFile filePath + B.readFile path `shouldReturn` file + testXFTPServerTest :: Maybe BasicAuth -> XFTPServerWithAuth -> IO (Maybe ProtocolTestFailure) testXFTPServerTest newFileBasicAuth srv = withXFTPServerCfg testXFTPServerConfig {newFileBasicAuth, xftpPort = xftpTestPort2} $ \_ -> do