xftp: restore snd files, expire snd files in agent (#718)

This commit is contained in:
spaced4ndy
2023-04-11 22:00:09 +04:00
committed by GitHub
parent d1774e5b56
commit d35bd8a954
5 changed files with 138 additions and 27 deletions

View File

@@ -75,9 +75,10 @@ startWorkers :: AgentMonad m => AgentClient -> Maybe FilePath -> m ()
startWorkers c workDir = do
wd <- asks $ xftpWorkDir . xftpAgent
atomically $ writeTVar wd workDir
startFiles
startRcvFiles
startSndFiles
where
startFiles = do
startRcvFiles = do
rcvFilesTTL <- asks (rcvFilesTTL . config)
pendingRcvServers <- withStore' c (`getPendingRcvFilesServers` rcvFilesTTL)
forM_ pendingRcvServers $ \s -> addXFTPRcvWorker c (Just s)
@@ -85,6 +86,12 @@ startWorkers c workDir = do
-- no need to make an extra query for the check
-- as the worker will check the store anyway
addXFTPRcvWorker c Nothing
startSndFiles = do
sndFilesTTL <- asks (sndFilesTTL . config)
-- start worker for files pending encryption/creation
addXFTPSndWorker c Nothing
pendingSndServers <- withStore' c (`getPendingSndFilesServers` sndFilesTTL)
forM_ pendingSndServers $ \s -> addXFTPSndWorker c (Just s)
closeXFTPAgent :: MonadUnliftIO m => XFTPAgent -> m ()
closeXFTPAgent XFTPAgent {xftpRcvWorkers, xftpSndWorkers} = do
@@ -158,11 +165,11 @@ runXFTPRcvWorker c srv doWork = do
forever $ do
void . atomically $ readTMVar doWork
-- TODO waitUntilNotSuspended
agentOperationBracket c AORcvNetwork waitUntilActive runXftpOperation
agentOperationBracket c AORcvNetwork waitUntilActive runXFTPOperation
where
noWorkToDo = void . atomically $ tryTakeTMVar doWork
runXftpOperation :: m ()
runXftpOperation = do
runXFTPOperation :: m ()
runXFTPOperation = do
rcvFilesTTL <- asks (rcvFilesTTL . config)
nextChunk <- withStore' c $ \db -> getNextRcvChunkToDownload db srv rcvFilesTTL
case nextChunk of
@@ -229,10 +236,10 @@ runXFTPRcvLocalWorker c doWork = do
forever $ do
void . atomically $ readTMVar doWork
-- TODO waitUntilNotSuspended
runXftpOperation
runXFTPOperation
where
runXftpOperation :: m ()
runXftpOperation = do
runXFTPOperation :: m ()
runXFTPOperation = do
rcvFilesTTL <- asks (rcvFilesTTL . config)
nextFile <- withStore' c (`getNextRcvFileToDecrypt` rcvFilesTTL)
case nextFile of
@@ -350,11 +357,12 @@ runXFTPSndPrepareWorker c doWork = do
forever $ do
void . atomically $ readTMVar doWork
-- TODO waitUntilNotSuspended
runXftpOperation
runXFTPOperation
where
runXftpOperation :: m ()
runXftpOperation = do
nextFile <- withStore' c getNextSndFileToPrepare
runXFTPOperation :: m ()
runXFTPOperation = do
sndFilesTTL <- asks (sndFilesTTL . config)
nextFile <- withStore' c (`getNextSndFileToPrepare` sndFilesTTL)
case nextFile of
Nothing -> noWorkToDo
Just f@SndFile {sndFileId, sndFileEntityId, prefixPath} ->
@@ -436,12 +444,13 @@ runXFTPSndWorker c srv doWork = do
forever $ do
void . atomically $ readTMVar doWork
-- TODO waitUntilNotSuspended
agentOperationBracket c AOSndNetwork throwWhenInactive runXftpOperation
agentOperationBracket c AOSndNetwork throwWhenInactive runXFTPOperation
where
noWorkToDo = void . atomically $ tryTakeTMVar doWork
runXftpOperation :: m ()
runXftpOperation = do
nextChunk <- withStore' c $ \db -> getNextSndChunkToUpload db srv
runXFTPOperation :: m ()
runXFTPOperation = do
sndFilesTTL <- asks (sndFilesTTL . config)
nextChunk <- withStore' c $ \db -> getNextSndChunkToUpload db srv sndFilesTTL
case nextChunk of
Nothing -> noWorkToDo
Just SndFileChunk {sndFileId, sndFileEntityId, filePrefixPath, replicas = []} -> sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) "chunk has no replicas"

View File

@@ -97,6 +97,7 @@ where
import Control.Concurrent.STM (stateTVar)
import Control.Logger.Simple (logError, logInfo, showText)
import Control.Monad ((<=<))
import Control.Monad.Except
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Control.Monad.Reader
@@ -1596,6 +1597,7 @@ cleanupManager c@AgentClient {subQ} = do
deleteRcvFilesExpired `catchError` (notify "" . RFERR)
deleteRcvFilesDeleted `catchError` (notify "" . RFERR)
deleteRcvFilesTmpPaths `catchError` (notify "" . RFERR)
deleteSndFilesExpired `catchError` (notify "" . SFERR)
liftIO $ threadDelay' int
where
deleteConns =
@@ -1618,6 +1620,12 @@ cleanupManager c@AgentClient {subQ} = do
forM_ rcvTmpPaths $ \(dbId, entId, p) -> flip catchError (notify entId . RFERR) $ do
removePath =<< toFSFilePath p
withStore' c (`updateRcvFileNoTmpPath` dbId)
deleteSndFilesExpired = do
sndFilesTTL <- asks (sndFilesTTL . config)
sndExpired <- withStore' c (`getSndFilesExpired` sndFilesTTL)
forM_ sndExpired $ \(dbId, entId, p) -> flip catchError (notify entId . SFERR) $ do
forM_ p $ removePath <=< toFSFilePath
withStore' c (`deleteSndFile'` dbId)
notify :: forall e. AEntityI e => EntityId -> ACommand 'Agent e -> ExceptT AgentErrorType m ()
notify entId cmd = atomically $ writeTBQueue subQ ("", entId, APC (sAEntity @e) cmd)

View File

@@ -83,6 +83,7 @@ data AgentConfig = AgentConfig
initialCleanupDelay :: Int64,
cleanupInterval :: Int64,
rcvFilesTTL :: NominalDiffTime,
sndFilesTTL :: NominalDiffTime,
xftpNotifyErrsOnRetry :: Bool,
xftpMaxRecipientsPerRequest :: Int,
deleteErrorCount :: Int,
@@ -145,6 +146,7 @@ defaultAgentConfig =
initialCleanupDelay = 30 * 1000000, -- 30 seconds
cleanupInterval = 30 * 60 * 1000000, -- 30 minutes
rcvFilesTTL = 2 * nominalDay,
sndFilesTTL = nominalDay,
xftpNotifyErrsOnRetry = True,
xftpMaxRecipientsPerRequest = 200,
deleteErrorCount = 10,

View File

@@ -158,11 +158,14 @@ module Simplex.Messaging.Agent.Store.SQLite
updateSndFileStatus,
updateSndFileEncrypted,
updateSndFileComplete,
deleteSndFile',
createSndFileReplica,
getNextSndChunkToUpload,
updateSndChunkReplicaDelay,
addSndChunkReplicaRecipients,
updateSndChunkReplicaStatus,
getPendingSndFilesServers,
getSndFilesExpired,
-- * utilities
withConnection,
@@ -2176,8 +2179,9 @@ getChunkReplicaRecipients_ db replicaId =
|]
(Only replicaId)
getNextSndFileToPrepare :: DB.Connection -> IO (Maybe SndFile)
getNextSndFileToPrepare db = do
getNextSndFileToPrepare :: DB.Connection -> NominalDiffTime -> IO (Maybe SndFile)
getNextSndFileToPrepare db ttl = do
cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime
fileId_ :: Maybe DBSndFileId <-
maybeFirstRow fromOnly $
DB.query
@@ -2185,10 +2189,10 @@ getNextSndFileToPrepare db = do
[sql|
SELECT snd_file_id
FROM snd_files
WHERE status IN (?,?,?) AND deleted = 0
WHERE status IN (?,?,?) AND deleted = 0 AND created_at >= ?
ORDER BY created_at ASC LIMIT 1
|]
(SFSNew, SFSEncrypting, SFSEncrypted)
(SFSNew, SFSEncrypting, SFSEncrypted, cutoffTs)
case fileId_ of
Nothing -> pure Nothing
Just fileId -> eitherToMaybe <$> getSndFile db fileId
@@ -2215,6 +2219,10 @@ updateSndFileComplete db sndFileId = do
updatedAt <- getCurrentTime
DB.execute db "UPDATE snd_files SET prefix_path = NULL, status = ?, updated_at = ? WHERE snd_file_id = ?" (SFSComplete, updatedAt, sndFileId)
deleteSndFile' :: DB.Connection -> DBSndFileId -> IO ()
deleteSndFile' db sndFileId =
DB.execute db "DELETE FROM snd_files WHERE snd_file_id = ?" (Only sndFileId)
createSndFileReplica :: DB.Connection -> SndFileChunk -> NewSndChunkReplica -> IO ()
createSndFileReplica db SndFileChunk {sndChunkId} NewSndChunkReplica {server, replicaId, replicaKey, rcvIdsKeys} = do
srvId <- createXFTPServer_ db server
@@ -2237,8 +2245,9 @@ createSndFileReplica db SndFileChunk {sndChunkId} NewSndChunkReplica {server, re
|]
(rId, rcvId, rcvKey)
getNextSndChunkToUpload :: DB.Connection -> XFTPServer -> IO (Maybe SndFileChunk)
getNextSndChunkToUpload db server@ProtocolServer {host, port, keyHash} = do
getNextSndChunkToUpload :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Maybe SndFileChunk)
getNextSndChunkToUpload db server@ProtocolServer {host, port, keyHash} ttl = do
cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime
chunk_ <-
maybeFirstRow toChunk $
DB.query
@@ -2254,11 +2263,11 @@ getNextSndChunkToUpload db server@ProtocolServer {host, port, keyHash} = do
JOIN snd_files f ON f.snd_file_id = c.snd_file_id
WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ?
AND r.replica_status = ? AND r.replica_number = 1
AND (f.status = ? OR f.status = ?) AND f.deleted = 0
AND (f.status = ? OR f.status = ?) AND f.deleted = 0 AND f.created_at >= ?
ORDER BY r.created_at ASC
LIMIT 1
|]
(host, port, keyHash, SFRSCreated, SFSEncrypted, SFSUploading)
(host, port, keyHash, SFRSCreated, SFSEncrypted, SFSUploading, cutoffTs)
forM chunk_ $ \chunk@SndFileChunk {replicas} -> do
replicas' <- forM replicas $ \replica@SndFileChunkReplica {sndChunkReplicaId} -> do
rcvIdsKeys <- getChunkReplicaRecipients_ db sndChunkReplicaId
@@ -2304,3 +2313,36 @@ updateSndChunkReplicaStatus :: DB.Connection -> Int64 -> SndFileReplicaStatus ->
updateSndChunkReplicaStatus db replicaId status = do
updatedAt <- getCurrentTime
DB.execute db "UPDATE snd_file_chunk_replicas SET replica_status = ?, updated_at = ? WHERE snd_file_chunk_replica_id = ?" (status, updatedAt, replicaId)
getPendingSndFilesServers :: DB.Connection -> NominalDiffTime -> IO [XFTPServer]
getPendingSndFilesServers db ttl = do
cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime
map toServer
<$> DB.query
db
[sql|
SELECT DISTINCT
s.xftp_host, s.xftp_port, s.xftp_key_hash
FROM snd_file_chunk_replicas r
JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
JOIN snd_file_chunks c ON c.snd_file_chunk_id = r.snd_file_chunk_id
JOIN snd_files f ON f.snd_file_id = c.snd_file_id
WHERE r.replica_status = ? AND r.replica_number = 1
AND (f.status = ? OR f.status = ?) AND f.deleted = 0 AND f.created_at >= ?
|]
(SFRSCreated, SFSEncrypted, SFSUploading, cutoffTs)
where
toServer :: (NonEmpty TransportHost, ServiceName, C.KeyHash) -> XFTPServer
toServer (host, port, keyHash) = XFTPServer host port keyHash
getSndFilesExpired :: DB.Connection -> NominalDiffTime -> IO [(DBSndFileId, SndFileId, Maybe FilePath)]
getSndFilesExpired db ttl = do
cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime
DB.query
db
[sql|
SELECT snd_file_id, snd_file_entity_id, prefix_path
FROM snd_files
WHERE created_at < ?
|]
(Only cutoffTs)

View File

@@ -12,6 +12,8 @@ import Control.Monad.Except
import Data.Bifunctor (first)
import qualified Data.ByteString.Char8 as B
import Data.Int (Int64)
import Data.List (find, isSuffixOf)
import Data.Maybe (fromJust)
import SMPAgentClient (agentCfg, initAgentServers, testDB)
import Simplex.FileTransfer.Description
import Simplex.FileTransfer.Protocol (FileParty (..), XFTPErrorType (AUTH))
@@ -21,7 +23,7 @@ import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestSte
import Simplex.Messaging.Agent.Protocol (ACommand (..), AgentErrorType (..), BrokerErrorType (..), noAuthSrv)
import Simplex.Messaging.Encoding.String (StrEncoding (..))
import Simplex.Messaging.Protocol (BasicAuth, ProtoServerWithAuth (..), ProtocolServer (..), XFTPServerWithAuth)
import System.Directory (doesDirectoryExist, getFileSize, listDirectory)
import System.Directory (doesDirectoryExist, doesFileExist, getFileSize, listDirectory)
import System.FilePath ((</>))
import System.Timeout (timeout)
import Test.Hspec
@@ -33,6 +35,7 @@ xftpAgentTests = around_ testBracket . describe "Functional API" $ do
it "should send and receive file" testXFTPAgentSendReceive
it "should resume receiving file after restart" testXFTPAgentReceiveRestore
it "should cleanup rcv tmp path after permanent error" testXFTPAgentReceiveCleanup
it "should resume sending file after restart" testXFTPAgentSendRestore
describe "XFTP server test via agent API" $ do
it "should pass without basic auth" $ testXFTPServerTest Nothing (noAuthSrv testXFTPServer2) `shouldReturn` Nothing
let srv1 = testXFTPServer2 {keyHash = "1234"}
@@ -126,7 +129,7 @@ testXFTPAgentReceiveRestore = withGlobalLogging logCfgNoLogs $ do
liftIO $ sfId' `shouldBe` sfId
pure rfd1
-- receive file - should not succeed due to server being down
-- receive file - should not succeed with server down
rcp <- getSMPAgentClient' agentCfg initAgentServers testDB
rfId <- runRight $ do
xftpStartWorkers rcp (Just recipientFiles)
@@ -171,7 +174,7 @@ testXFTPAgentReceiveCleanup = withGlobalLogging logCfgNoLogs $ do
liftIO $ sfId' `shouldBe` sfId
pure rfd1
-- receive file - should not succeed due to server being down
-- receive file - should not succeed with server down
rcp <- getSMPAgentClient' agentCfg initAgentServers testDB
rfId <- runRight $ do
xftpStartWorkers rcp (Just recipientFiles)
@@ -194,6 +197,53 @@ testXFTPAgentReceiveCleanup = withGlobalLogging logCfgNoLogs $ do
-- tmp path should be removed after permanent error
doesDirectoryExist tmpPath `shouldReturn` False
testXFTPAgentSendRestore :: IO ()
testXFTPAgentSendRestore = withGlobalLogging logCfgNoLogs $ do
-- create random file using cli
let filePath = senderFiles </> "testfile"
xftpCLI ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath]
getFileSize filePath `shouldReturn` mb 17
-- send file - should not succeed with server down
sndr <- getSMPAgentClient' agentCfg initAgentServers testDB
sfId <- runRight $ do
xftpStartWorkers sndr (Just senderFiles)
sfId <- xftpSendFile sndr 1 filePath 2
liftIO $ timeout 1000000 (get sndr) `shouldReturn` Nothing -- wait for worker to encrypt and attempt to create file
pure sfId
disconnectAgentClient sndr
dirEntries <- listDirectory senderFiles
let prefixDir = fromJust $ find (isSuffixOf "_snd.xftp") dirEntries
prefixPath = senderFiles </> prefixDir
encPath = prefixPath </> "xftp.encrypted"
doesDirectoryExist prefixPath `shouldReturn` True
doesFileExist encPath `shouldReturn` True
withXFTPServerStoreLogOn $ \_ -> do
-- send file - should succeed with server up
sndr' <- getSMPAgentClient' agentCfg initAgentServers testDB
runRight_ $ xftpStartWorkers sndr' (Just senderFiles)
sfProgress sndr' $ mb 18
("", sfId', SFDONE _sndDescr [rfd1, _rfd2]) <- sfGet sndr'
liftIO $ sfId' `shouldBe` sfId
-- prefix path should be removed after sending file
doesDirectoryExist prefixPath `shouldReturn` False
doesFileExist encPath `shouldReturn` False
-- receive file
rcp <- getSMPAgentClient' agentCfg initAgentServers testDB
runRight_ $ do
xftpStartWorkers rcp (Just recipientFiles)
rfId <- xftpReceiveFile rcp 1 rfd1
rfProgress rcp $ mb 18
("", rfId', RFDONE path) <- rfGet rcp
liftIO $ do
rfId' `shouldBe` rfId
file <- B.readFile filePath
B.readFile path `shouldReturn` file
testXFTPServerTest :: Maybe BasicAuth -> XFTPServerWithAuth -> IO (Maybe ProtocolTestFailure)
testXFTPServerTest newFileBasicAuth srv =
withXFTPServerCfg testXFTPServerConfig {newFileBasicAuth, xftpPort = xftpTestPort2} $ \_ -> do