mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-16 20:15:10 +00:00
xftp: file workers cycle through pending files based on retries count (limit number of iterations per work item to prevent stuck delivery) (#930)
* xftp: test file reception - shouldn't get stuck if file is deleted on server * comment * expiration test * approach * wip * sort by retries in other works, revert some diff * revert diff * modify tests * refactor * refactor * remove prints * apply to other workers * remove import * comment * refactor * revert queue size * fix test * rename * comment, correct number of retries --------- Co-authored-by: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com>
This commit is contained in:
@@ -69,26 +69,24 @@ startXFTPWorkers :: AgentMonad m => AgentClient -> Maybe FilePath -> m ()
|
||||
startXFTPWorkers c workDir = do
|
||||
wd <- asks $ xftpWorkDir . xftpAgent
|
||||
atomically $ writeTVar wd workDir
|
||||
startRcvFiles
|
||||
startSndFiles
|
||||
startDelFiles
|
||||
cfg <- asks config
|
||||
startRcvFiles cfg
|
||||
startSndFiles cfg
|
||||
startDelFiles cfg
|
||||
where
|
||||
startRcvFiles = do
|
||||
rcvFilesTTL <- asks $ rcvFilesTTL . config
|
||||
startRcvFiles AgentConfig {rcvFilesTTL} = do
|
||||
pendingRcvServers <- withStore' c (`getPendingRcvFilesServers` rcvFilesTTL)
|
||||
forM_ pendingRcvServers $ \s -> addXFTPRcvWorker c (Just s)
|
||||
-- start local worker for files pending decryption,
|
||||
-- no need to make an extra query for the check
|
||||
-- as the worker will check the store anyway
|
||||
addXFTPRcvWorker c Nothing
|
||||
startSndFiles = do
|
||||
sndFilesTTL <- asks $ sndFilesTTL . config
|
||||
startSndFiles AgentConfig {sndFilesTTL} = do
|
||||
-- start worker for files pending encryption/creation
|
||||
addXFTPSndWorker c Nothing
|
||||
pendingSndServers <- withStore' c (`getPendingSndFilesServers` sndFilesTTL)
|
||||
forM_ pendingSndServers $ \s -> addXFTPSndWorker c (Just s)
|
||||
startDelFiles = do
|
||||
rcvFilesTTL <- asks $ rcvFilesTTL . config
|
||||
startDelFiles AgentConfig {rcvFilesTTL} = do
|
||||
pendingDelServers <- withStore' c (`getPendingDelFilesServers` rcvFilesTTL)
|
||||
forM_ pendingDelServers $ addXFTPDelWorker c
|
||||
|
||||
@@ -159,26 +157,24 @@ addWorker c wsSel runWorker runWorkerNoSrv srv_ = do
|
||||
|
||||
runXFTPRcvWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m ()
|
||||
runXFTPRcvWorker c srv doWork = do
|
||||
cfg <- asks config
|
||||
forever $ do
|
||||
waitForWork doWork
|
||||
atomically $ assertAgentForeground c
|
||||
runXFTPOperation
|
||||
runXFTPOperation cfg
|
||||
where
|
||||
runXFTPOperation :: m ()
|
||||
runXFTPOperation = do
|
||||
rcvFilesTTL <- asks (rcvFilesTTL . config)
|
||||
runXFTPOperation :: AgentConfig -> m ()
|
||||
runXFTPOperation AgentConfig {rcvFilesTTL, reconnectInterval = ri, xftpNotifyErrsOnRetry = notifyOnRetry, xftpConsecutiveRetries} =
|
||||
withWork c doWork (\db -> getNextRcvChunkToDownload db srv rcvFilesTTL) $ \case
|
||||
RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []} -> rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) "chunk has no replicas"
|
||||
fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, digest, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, delay} : _} -> do
|
||||
ri <- asks $ reconnectInterval . config
|
||||
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
|
||||
withRetryInterval ri' $ \delay' loop ->
|
||||
withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop ->
|
||||
downloadFileChunk fc replica
|
||||
`catchAgentError` \e -> retryOnError "XFTP rcv worker" (retryLoop loop e delay') (retryDone e) e
|
||||
where
|
||||
retryLoop loop e replicaDelay = do
|
||||
flip catchAgentError (\_ -> pure ()) $ do
|
||||
notifyOnRetry <- asks (xftpNotifyErrsOnRetry . config)
|
||||
when notifyOnRetry $ notify c rcvFileEntityId $ RFERR e
|
||||
closeXFTPServerClient c userId server digest
|
||||
withStore' c $ \db -> updateRcvChunkReplicaDelay db rcvChunkReplicaId replicaDelay
|
||||
@@ -210,6 +206,12 @@ runXFTPRcvWorker c srv doWork = do
|
||||
| otherwise = 0
|
||||
chunkReceived RcvFileChunk {replicas} = any received replicas
|
||||
|
||||
-- The first call of action has n == 0, maxN is max number of retries
|
||||
withRetryIntervalLimit :: forall m. MonadIO m => Int -> RetryInterval -> (Int64 -> m () -> m ()) -> m ()
|
||||
withRetryIntervalLimit maxN ri action =
|
||||
withRetryIntervalCount ri $ \n delay loop ->
|
||||
when (n < maxN) $ action delay loop
|
||||
|
||||
retryOnError :: AgentMonad m => Text -> m a -> m a -> AgentErrorType -> m a
|
||||
retryOnError name loop done e = do
|
||||
logError $ name <> " error: " <> tshow e
|
||||
@@ -225,14 +227,14 @@ rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath internalErrStr = do
|
||||
|
||||
runXFTPRcvLocalWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m ()
|
||||
runXFTPRcvLocalWorker c doWork = do
|
||||
cfg <- asks config
|
||||
forever $ do
|
||||
waitForWork doWork
|
||||
atomically $ assertAgentForeground c
|
||||
runXFTPOperation
|
||||
runXFTPOperation cfg
|
||||
where
|
||||
runXFTPOperation :: m ()
|
||||
runXFTPOperation = do
|
||||
rcvFilesTTL <- asks (rcvFilesTTL . config)
|
||||
runXFTPOperation :: AgentConfig -> m ()
|
||||
runXFTPOperation AgentConfig {rcvFilesTTL} =
|
||||
withWork c doWork (`getNextRcvFileToDecrypt` rcvFilesTTL) $
|
||||
\f@RcvFile {rcvFileId, rcvFileEntityId, tmpPath} ->
|
||||
decryptFile f `catchAgentError` (rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath . show)
|
||||
@@ -291,21 +293,21 @@ addXFTPSndWorker c = addWorker c xftpSndWorkers runXFTPSndWorker runXFTPSndPrepa
|
||||
|
||||
runXFTPSndPrepareWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m ()
|
||||
runXFTPSndPrepareWorker c doWork = do
|
||||
cfg <- asks config
|
||||
forever $ do
|
||||
waitForWork doWork
|
||||
atomically $ assertAgentForeground c
|
||||
runXFTPOperation
|
||||
runXFTPOperation cfg
|
||||
where
|
||||
runXFTPOperation :: m ()
|
||||
runXFTPOperation = do
|
||||
sndFilesTTL <- asks (sndFilesTTL . config)
|
||||
runXFTPOperation :: AgentConfig -> m ()
|
||||
runXFTPOperation cfg@AgentConfig {sndFilesTTL} =
|
||||
withWork c doWork (`getNextSndFileToPrepare` sndFilesTTL) $
|
||||
\f@SndFile {sndFileId, sndFileEntityId, prefixPath} ->
|
||||
prepareFile f `catchAgentError` (sndWorkerInternalError c sndFileId sndFileEntityId prefixPath . show)
|
||||
prepareFile :: SndFile -> m ()
|
||||
prepareFile SndFile {prefixPath = Nothing} =
|
||||
prepareFile cfg f `catchAgentError` (sndWorkerInternalError c sndFileId sndFileEntityId prefixPath . show)
|
||||
prepareFile :: AgentConfig -> SndFile -> m ()
|
||||
prepareFile _ SndFile {prefixPath = Nothing} =
|
||||
throwError $ INTERNAL "no prefix path"
|
||||
prepareFile sndFile@SndFile {sndFileId, userId, prefixPath = Just ppath, status} = do
|
||||
prepareFile cfg sndFile@SndFile {sndFileId, userId, prefixPath = Just ppath, status} = do
|
||||
SndFile {numRecipients, chunks} <-
|
||||
if status /= SFSEncrypted -- status is SFSNew or SFSEncrypting
|
||||
then do
|
||||
@@ -318,12 +320,13 @@ runXFTPSndPrepareWorker c doWork = do
|
||||
updateSndFileEncrypted db sndFileId digest chunkSpecsDigests
|
||||
getSndFile db sndFileId
|
||||
else pure sndFile
|
||||
maxRecipients <- asks (xftpMaxRecipientsPerRequest . config)
|
||||
let numRecipients' = min numRecipients maxRecipients
|
||||
-- concurrently?
|
||||
-- separate worker to create chunks? record retries and delay on snd_file_chunks?
|
||||
forM_ (filter (not . chunkCreated) chunks) $ createChunk numRecipients'
|
||||
withStore' c $ \db -> updateSndFileStatus db sndFileId SFSUploading
|
||||
where
|
||||
AgentConfig {xftpMaxRecipientsPerRequest = maxRecipients, messageRetryInterval = ri} = cfg
|
||||
encryptFileForUpload :: SndFile -> FilePath -> m (FileDigest, [(XFTPChunkSpec, FileDigest)])
|
||||
encryptFileForUpload SndFile {key, nonce, srcFile} fsEncPath = do
|
||||
let CryptoFile {filePath} = srcFile
|
||||
@@ -351,7 +354,6 @@ runXFTPSndPrepareWorker c doWork = do
|
||||
addXFTPSndWorker c $ Just srv
|
||||
where
|
||||
tryCreate = do
|
||||
ri <- asks $ messageRetryInterval . config
|
||||
usedSrvs <- newTVarIO ([] :: [XFTPServer])
|
||||
withRetryInterval (riFast ri) $ \_ loop ->
|
||||
createWithNextSrv usedSrvs
|
||||
@@ -373,34 +375,32 @@ sndWorkerInternalError c sndFileId sndFileEntityId prefixPath internalErrStr = d
|
||||
|
||||
runXFTPSndWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m ()
|
||||
runXFTPSndWorker c srv doWork = do
|
||||
cfg <- asks config
|
||||
forever $ do
|
||||
waitForWork doWork
|
||||
atomically $ assertAgentForeground c
|
||||
runXFTPOperation
|
||||
runXFTPOperation cfg
|
||||
where
|
||||
runXFTPOperation :: m ()
|
||||
runXFTPOperation = do
|
||||
sndFilesTTL <- asks (sndFilesTTL . config)
|
||||
runXFTPOperation :: AgentConfig -> m ()
|
||||
runXFTPOperation cfg@AgentConfig {sndFilesTTL, reconnectInterval = ri, xftpNotifyErrsOnRetry = notifyOnRetry, xftpConsecutiveRetries} = do
|
||||
withWork c doWork (\db -> getNextSndChunkToUpload db srv sndFilesTTL) $ \case
|
||||
SndFileChunk {sndFileId, sndFileEntityId, filePrefixPath, replicas = []} -> sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) "chunk has no replicas"
|
||||
fc@SndFileChunk {userId, sndFileId, sndFileEntityId, filePrefixPath, digest, replicas = replica@SndFileChunkReplica {sndChunkReplicaId, server, delay} : _} -> do
|
||||
ri <- asks $ reconnectInterval . config
|
||||
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
|
||||
withRetryInterval ri' $ \delay' loop ->
|
||||
uploadFileChunk fc replica
|
||||
withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop ->
|
||||
uploadFileChunk cfg fc replica
|
||||
`catchAgentError` \e -> retryOnError "XFTP snd worker" (retryLoop loop e delay') (retryDone e) e
|
||||
where
|
||||
retryLoop loop e replicaDelay = do
|
||||
flip catchAgentError (\_ -> pure ()) $ do
|
||||
notifyOnRetry <- asks (xftpNotifyErrsOnRetry . config)
|
||||
when notifyOnRetry $ notify c sndFileEntityId $ SFERR e
|
||||
closeXFTPServerClient c userId server digest
|
||||
withStore' c $ \db -> updateSndChunkReplicaDelay db sndChunkReplicaId replicaDelay
|
||||
atomically $ assertAgentForeground c
|
||||
loop
|
||||
retryDone e = sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) (show e)
|
||||
uploadFileChunk :: SndFileChunk -> SndFileChunkReplica -> m ()
|
||||
uploadFileChunk sndFileChunk@SndFileChunk {sndFileId, userId, chunkSpec = chunkSpec@XFTPChunkSpec {filePath}, digest = chunkDigest} replica = do
|
||||
uploadFileChunk :: AgentConfig -> SndFileChunk -> SndFileChunkReplica -> m ()
|
||||
uploadFileChunk AgentConfig {xftpMaxRecipientsPerRequest = maxRecipients} sndFileChunk@SndFileChunk {sndFileId, userId, chunkSpec = chunkSpec@XFTPChunkSpec {filePath}, digest = chunkDigest} replica = do
|
||||
replica'@SndFileChunkReplica {sndChunkReplicaId} <- addRecipients sndFileChunk replica
|
||||
fsFilePath <- toFSFilePath filePath
|
||||
unlessM (doesFileExist fsFilePath) $ throwError $ INTERNAL "encrypted file doesn't exist on upload"
|
||||
@@ -426,7 +426,6 @@ runXFTPSndWorker c srv doWork = do
|
||||
| length rcvIdsKeys > numRecipients = throwError $ INTERNAL "too many recipients"
|
||||
| length rcvIdsKeys == numRecipients = pure cr
|
||||
| otherwise = do
|
||||
maxRecipients <- asks $ xftpMaxRecipientsPerRequest . config
|
||||
let numRecipients' = min (numRecipients - length rcvIdsKeys) maxRecipients
|
||||
rcvIdsKeys' <- agentXFTPAddRecipients c userId chunkDigest cr numRecipients'
|
||||
cr' <- withStore' c $ \db -> addSndChunkReplicaRecipients db cr $ L.toList rcvIdsKeys'
|
||||
@@ -529,27 +528,25 @@ addXFTPDelWorker c srv = do
|
||||
|
||||
runXFTPDelWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m ()
|
||||
runXFTPDelWorker c srv doWork = do
|
||||
cfg <- asks config
|
||||
forever $ do
|
||||
waitForWork doWork
|
||||
atomically $ assertAgentForeground c
|
||||
runXFTPOperation
|
||||
runXFTPOperation cfg
|
||||
where
|
||||
runXFTPOperation :: m ()
|
||||
runXFTPOperation = do
|
||||
runXFTPOperation :: AgentConfig -> m ()
|
||||
runXFTPOperation AgentConfig {rcvFilesTTL, reconnectInterval = ri, xftpNotifyErrsOnRetry = notifyOnRetry, xftpConsecutiveRetries} = do
|
||||
-- no point in deleting files older than rcv ttl, as they will be expired on server
|
||||
rcvFilesTTL <- asks (rcvFilesTTL . config)
|
||||
withWork c doWork (\db -> getNextDeletedSndChunkReplica db srv rcvFilesTTL) processDeletedReplica
|
||||
where
|
||||
processDeletedReplica replica@DeletedSndChunkReplica {deletedSndChunkReplicaId, userId, server, chunkDigest, delay} = do
|
||||
ri <- asks $ reconnectInterval . config
|
||||
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
|
||||
withRetryInterval ri' $ \delay' loop ->
|
||||
withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop ->
|
||||
deleteChunkReplica
|
||||
`catchAgentError` \e -> retryOnError "XFTP del worker" (retryLoop loop e delay') (retryDone e) e
|
||||
where
|
||||
retryLoop loop e replicaDelay = do
|
||||
flip catchAgentError (\_ -> pure ()) $ do
|
||||
notifyOnRetry <- asks (xftpNotifyErrsOnRetry . config)
|
||||
when notifyOnRetry $ notify c "" $ SFERR e
|
||||
closeXFTPServerClient c userId server chunkDigest
|
||||
withStore' c $ \db -> updateDeletedSndChunkReplicaDelay db deletedSndChunkReplicaId replicaDelay
|
||||
|
||||
@@ -93,6 +93,7 @@ data AgentConfig = AgentConfig
|
||||
rcvFilesTTL :: NominalDiffTime,
|
||||
sndFilesTTL :: NominalDiffTime,
|
||||
xftpNotifyErrsOnRetry :: Bool,
|
||||
xftpConsecutiveRetries :: Int,
|
||||
xftpMaxRecipientsPerRequest :: Int,
|
||||
deleteErrorCount :: Int,
|
||||
ntfCron :: Word16,
|
||||
@@ -158,6 +159,7 @@ defaultAgentConfig =
|
||||
rcvFilesTTL = 2 * nominalDay,
|
||||
sndFilesTTL = nominalDay,
|
||||
xftpNotifyErrsOnRetry = True,
|
||||
xftpConsecutiveRetries = 3,
|
||||
xftpMaxRecipientsPerRequest = 200,
|
||||
deleteErrorCount = 10,
|
||||
ntfCron = 20, -- minutes
|
||||
|
||||
@@ -8,6 +8,7 @@ module Simplex.Messaging.Agent.RetryInterval
|
||||
RetryIntervalMode (..),
|
||||
RI2State (..),
|
||||
withRetryInterval,
|
||||
withRetryIntervalCount,
|
||||
withRetryLock2,
|
||||
updateRetryInterval2,
|
||||
)
|
||||
@@ -48,15 +49,18 @@ data RetryIntervalMode = RISlow | RIFast
|
||||
deriving (Eq, Show)
|
||||
|
||||
withRetryInterval :: forall m a. MonadIO m => RetryInterval -> (Int64 -> m a -> m a) -> m a
|
||||
withRetryInterval ri action = callAction 0 $ initialInterval ri
|
||||
withRetryInterval ri = withRetryIntervalCount ri . const
|
||||
|
||||
withRetryIntervalCount :: forall m a. MonadIO m => RetryInterval -> (Int -> Int64 -> m a -> m a) -> m a
|
||||
withRetryIntervalCount ri action = callAction 0 0 $ initialInterval ri
|
||||
where
|
||||
callAction :: Int64 -> Int64 -> m a
|
||||
callAction elapsed delay = action delay loop
|
||||
callAction :: Int -> Int64 -> Int64 -> m a
|
||||
callAction n elapsed delay = action n delay loop
|
||||
where
|
||||
loop = do
|
||||
liftIO $ threadDelay' delay
|
||||
let elapsed' = elapsed + delay
|
||||
callAction elapsed' $ nextDelay elapsed' delay ri
|
||||
callAction (n + 1) elapsed' $ nextDelay elapsed' delay ri
|
||||
|
||||
-- This function allows action to toggle between slow and fast retry intervals.
|
||||
withRetryLock2 :: forall m. MonadIO m => RetryInterval2 -> TMVar () -> (RI2State -> (RetryIntervalMode -> m ()) -> m ()) -> m ()
|
||||
|
||||
@@ -2400,7 +2400,7 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = d
|
||||
AND r.received = 0 AND r.replica_number = 1
|
||||
AND f.status = ? AND f.deleted = 0 AND f.created_at >= ?
|
||||
AND f.failed = 0
|
||||
ORDER BY r.created_at ASC
|
||||
ORDER BY r.retries ASC, r.created_at ASC
|
||||
LIMIT 1
|
||||
|]
|
||||
(host, port, keyHash, RFSReceiving, cutoffTs)
|
||||
@@ -2718,7 +2718,7 @@ getNextSndChunkToUpload db server@ProtocolServer {host, port, keyHash} ttl = do
|
||||
AND r.replica_status = ? AND r.replica_number = 1
|
||||
AND (f.status = ? OR f.status = ?) AND f.deleted = 0 AND f.created_at >= ?
|
||||
AND f.failed = 0
|
||||
ORDER BY r.created_at ASC
|
||||
ORDER BY r.retries ASC, r.created_at ASC
|
||||
LIMIT 1
|
||||
|]
|
||||
(host, port, keyHash, SFRSCreated, SFSEncrypted, SFSUploading, cutoffTs)
|
||||
@@ -2882,7 +2882,8 @@ getNextDeletedSndChunkReplica db ProtocolServer {host, port, keyHash} ttl =
|
||||
WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ?
|
||||
AND r.created_at >= ?
|
||||
AND failed = 0
|
||||
ORDER BY r.created_at ASC LIMIT 1
|
||||
ORDER BY r.retries ASC, r.created_at ASC
|
||||
LIMIT 1
|
||||
|]
|
||||
(host, port, keyHash, cutoffTs)
|
||||
markReplicaFailed :: Int64 -> IO ()
|
||||
|
||||
@@ -764,7 +764,7 @@ testGetNextDeletedSndChunkReplica st = do
|
||||
Right Nothing <- getNextDeletedSndChunkReplica db xftpServer1 86400
|
||||
|
||||
createDeletedSndChunkReplica db 1 (FileChunkReplica xftpServer1 (ChunkReplicaId "abc") testFileReplicaKey) (FileDigest "ghi")
|
||||
DB.execute_ db "UPDATE deleted_snd_chunk_replicas SET retries = 'bad' WHERE deleted_snd_chunk_replica_id = 1"
|
||||
DB.execute_ db "UPDATE deleted_snd_chunk_replicas SET delay = 'bad' WHERE deleted_snd_chunk_replica_id = 1"
|
||||
createDeletedSndChunkReplica db 1 (FileChunkReplica xftpServer1 (ChunkReplicaId "abc") testFileReplicaKey) (FileDigest "ghi")
|
||||
|
||||
Left e <- getNextDeletedSndChunkReplica db xftpServer1 86400
|
||||
|
||||
+94
-3
@@ -30,7 +30,8 @@ import Simplex.Messaging.Crypto.File (CryptoFile (..), CryptoFileArgs)
|
||||
import qualified Simplex.Messaging.Crypto.File as CF
|
||||
import Simplex.Messaging.Encoding.String (StrEncoding (..))
|
||||
import Simplex.Messaging.Protocol (BasicAuth, ProtoServerWithAuth (..), ProtocolServer (..), XFTPServerWithAuth)
|
||||
import System.Directory (doesDirectoryExist, doesFileExist, getFileSize, listDirectory)
|
||||
import Simplex.Messaging.Server.Expiration (ExpirationConfig (..))
|
||||
import System.Directory (doesDirectoryExist, doesFileExist, getFileSize, listDirectory, removeFile)
|
||||
import System.FilePath ((</>))
|
||||
import System.Timeout (timeout)
|
||||
import Test.Hspec
|
||||
@@ -47,6 +48,9 @@ xftpAgentTests = around_ testBracket . describe "agent XFTP API" $ do
|
||||
it "should cleanup snd prefix path after permanent error" testXFTPAgentSendCleanup
|
||||
it "should delete sent file on server" testXFTPAgentDelete
|
||||
it "should resume deleting file after restart" testXFTPAgentDeleteRestore
|
||||
-- TODO when server is fixed to correctly send AUTH error, this test has to be modified to expect AUTH error
|
||||
it "if file is deleted on server, should limit retries and continue receiving next file" testXFTPAgentDeleteOnServer
|
||||
it "if file is expired on server, should report error and continue receiving next file" testXFTPAgentExpiredOnServer
|
||||
it "should request additional recipient IDs when number of recipients exceeds maximum per request" testXFTPAgentRequestAdditionalRecipientIDs
|
||||
describe "XFTP server test via agent API" $ do
|
||||
it "should pass without basic auth" $ testXFTPServerTest Nothing (noAuthSrv testXFTPServer2) `shouldReturn` Nothing
|
||||
@@ -132,8 +136,11 @@ testXFTPAgentSendReceiveEncrypted = withXFTPServer $ do
|
||||
disconnectAgentClient rcp
|
||||
|
||||
createRandomFile :: HasCallStack => IO FilePath
|
||||
createRandomFile = do
|
||||
let filePath = senderFiles </> "testfile"
|
||||
createRandomFile = createRandomFile' "testfile"
|
||||
|
||||
createRandomFile' :: HasCallStack => FilePath -> IO FilePath
|
||||
createRandomFile' fileName = do
|
||||
let filePath = senderFiles </> fileName
|
||||
xftpCLI ["rand", filePath, "17mb"] `shouldReturn` ["File created: " <> filePath]
|
||||
getFileSize filePath `shouldReturn` mb 17
|
||||
pure filePath
|
||||
@@ -156,6 +163,13 @@ testReceive rcp rfd = testReceiveCF rcp rfd Nothing
|
||||
testReceiveCF :: HasCallStack => AgentClient -> ValidFileDescription 'FRecipient -> Maybe CryptoFileArgs -> FilePath -> ExceptT AgentErrorType IO RcvFileId
|
||||
testReceiveCF rcp rfd cfArgs originalFilePath = do
|
||||
xftpStartWorkers rcp (Just recipientFiles)
|
||||
testReceiveCF' rcp rfd cfArgs originalFilePath
|
||||
|
||||
testReceive' :: HasCallStack => AgentClient -> ValidFileDescription 'FRecipient -> FilePath -> ExceptT AgentErrorType IO RcvFileId
|
||||
testReceive' rcp rfd = testReceiveCF' rcp rfd Nothing
|
||||
|
||||
testReceiveCF' :: HasCallStack => AgentClient -> ValidFileDescription 'FRecipient -> Maybe CryptoFileArgs -> FilePath -> ExceptT AgentErrorType IO RcvFileId
|
||||
testReceiveCF' rcp rfd cfArgs originalFilePath = do
|
||||
rfId <- xftpReceiveFile rcp 1 rfd cfArgs
|
||||
rfProgress rcp $ mb 18
|
||||
("", rfId', RFDONE path) <- rfGet rcp
|
||||
@@ -413,6 +427,83 @@ testXFTPAgentDeleteRestore = withGlobalLogging logCfgNoLogs $ do
|
||||
("", rfId', RFERR (INTERNAL "XFTP {xftpErr = AUTH}")) <- rfGet rcp2
|
||||
liftIO $ rfId' `shouldBe` rfId
|
||||
|
||||
testXFTPAgentDeleteOnServer :: HasCallStack => IO ()
|
||||
testXFTPAgentDeleteOnServer = withGlobalLogging logCfgNoLogs $
|
||||
withXFTPServer $ do
|
||||
filePath1 <- createRandomFile' "testfile1"
|
||||
|
||||
-- send file 1
|
||||
sndr <- getSMPAgentClient' agentCfg initAgentServers testDB
|
||||
(_, _, rfd1_1, rfd1_2) <- runRight $ testSend sndr filePath1
|
||||
|
||||
-- receive file 1 successfully
|
||||
rcp <- getSMPAgentClient' agentCfg initAgentServers testDB2
|
||||
runRight_ . void $
|
||||
testReceive rcp rfd1_1 filePath1
|
||||
|
||||
serverFiles <- listDirectory xftpServerFiles
|
||||
length serverFiles `shouldBe` 6
|
||||
|
||||
-- delete file 1 on server from file system
|
||||
forM_ serverFiles (\file -> removeFile (xftpServerFiles </> file))
|
||||
|
||||
threadDelay 1000000
|
||||
length <$> listDirectory xftpServerFiles `shouldReturn` 0
|
||||
|
||||
-- create and send file 2
|
||||
filePath2 <- createRandomFile' "testfile2"
|
||||
(_, _, rfd2, _) <- runRight $ testSend sndr filePath2
|
||||
|
||||
length <$> listDirectory xftpServerFiles `shouldReturn` 6
|
||||
|
||||
runRight_ . void $ do
|
||||
-- receive file 1 again
|
||||
-- TODO should fail with AUTH error
|
||||
_rfId1 <- xftpReceiveFile rcp 1 rfd1_2 Nothing
|
||||
|
||||
-- receive file 2
|
||||
testReceive' rcp rfd2 filePath2
|
||||
|
||||
testXFTPAgentExpiredOnServer :: HasCallStack => IO ()
|
||||
testXFTPAgentExpiredOnServer = withGlobalLogging logCfgNoLogs $ do
|
||||
let fastExpiration = ExpirationConfig {ttl = 2, checkInterval = 1}
|
||||
withXFTPServerCfg testXFTPServerConfig {fileExpiration = Just fastExpiration} . const $ do
|
||||
filePath1 <- createRandomFile' "testfile1"
|
||||
|
||||
-- send file 1
|
||||
sndr <- getSMPAgentClient' agentCfg initAgentServers testDB
|
||||
(_, _, rfd1_1, rfd1_2) <- runRight $ testSend sndr filePath1
|
||||
|
||||
-- receive file 1 successfully
|
||||
rcp <- getSMPAgentClient' agentCfg initAgentServers testDB2
|
||||
runRight_ . void $
|
||||
testReceive rcp rfd1_1 filePath1
|
||||
|
||||
serverFiles <- listDirectory xftpServerFiles
|
||||
length serverFiles `shouldBe` 6
|
||||
|
||||
-- wait until file 1 expires on server
|
||||
forM_ serverFiles (\file -> removeFile (xftpServerFiles </> file))
|
||||
|
||||
threadDelay 3500000
|
||||
length <$> listDirectory xftpServerFiles `shouldReturn` 0
|
||||
|
||||
-- receive file 1 again - should fail with AUTH error
|
||||
runRight $ do
|
||||
rfId <- xftpReceiveFile rcp 1 rfd1_2 Nothing
|
||||
("", rfId', RFERR (INTERNAL "XFTP {xftpErr = AUTH}")) <- rfGet rcp
|
||||
liftIO $ rfId' `shouldBe` rfId
|
||||
|
||||
-- create and send file 2
|
||||
filePath2 <- createRandomFile' "testfile2"
|
||||
(_, _, rfd2, _) <- runRight $ testSend sndr filePath2
|
||||
|
||||
length <$> listDirectory xftpServerFiles `shouldReturn` 6
|
||||
|
||||
-- receive file 2 successfully
|
||||
runRight_ . void $
|
||||
testReceive' rcp rfd2 filePath2
|
||||
|
||||
testXFTPAgentRequestAdditionalRecipientIDs :: HasCallStack => IO ()
|
||||
testXFTPAgentRequestAdditionalRecipientIDs = withXFTPServer $ do
|
||||
filePath <- createRandomFile
|
||||
|
||||
Reference in New Issue
Block a user