mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-20 21:45:27 +00:00
test with failing files (in progress)
This commit is contained in:
@@ -228,6 +228,7 @@ withRetryIntervalLimit maxN ri action =
|
||||
|
||||
retryOnError :: Text -> AM a -> AM a -> AgentErrorType -> AM a
|
||||
retryOnError name loop done e = do
|
||||
liftIO $ print $ "error: " <> show e
|
||||
logError $ name <> " error: " <> tshow e
|
||||
if temporaryAgentError e
|
||||
then loop
|
||||
@@ -455,7 +456,8 @@ runXFTPSndWorker c srv Worker {doWork} = do
|
||||
runXFTPOperation cfg@AgentConfig {sndFilesTTL, reconnectInterval = ri, xftpNotifyErrsOnRetry = notifyOnRetry, xftpConsecutiveRetries} = do
|
||||
withWork c doWork (\db -> getNextSndChunkToUpload db srv sndFilesTTL) $ \case
|
||||
SndFileChunk {sndFileId, sndFileEntityId, filePrefixPath, replicas = []} -> sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) "chunk has no replicas"
|
||||
fc@SndFileChunk {userId, sndFileId, sndFileEntityId, filePrefixPath, digest, replicas = replica@SndFileChunkReplica {sndChunkReplicaId, server, delay} : _} -> do
|
||||
fc@SndFileChunk {userId, chunkSpec = XFTPChunkSpec {chunkOffset}, sndFileId, sndFileEntityId, filePrefixPath, digest, replicas = replica@SndFileChunkReplica {sndChunkReplicaId, server, delay} : _} -> do
|
||||
liftIO $ putStrLn $ "chunkOffset: " <> show chunkOffset
|
||||
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
|
||||
withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop ->
|
||||
uploadFileChunk cfg fc replica
|
||||
@@ -470,7 +472,8 @@ runXFTPSndWorker c srv Worker {doWork} = do
|
||||
loop
|
||||
retryDone e = sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) (show e)
|
||||
uploadFileChunk :: AgentConfig -> SndFileChunk -> SndFileChunkReplica -> AM ()
|
||||
uploadFileChunk AgentConfig {xftpMaxRecipientsPerRequest = maxRecipients} sndFileChunk@SndFileChunk {sndFileId, userId, chunkSpec = chunkSpec@XFTPChunkSpec {filePath}, digest = chunkDigest} replica = do
|
||||
uploadFileChunk AgentConfig {xftpMaxRecipientsPerRequest = maxRecipients} sndFileChunk@SndFileChunk {sndFileId, userId, chunkSpec = chunkSpec@XFTPChunkSpec {filePath, chunkOffset}, digest = chunkDigest} replica = do
|
||||
liftIO $ putStrLn $ "uploadFileChunk: " <> show chunkOffset
|
||||
replica'@SndFileChunkReplica {sndChunkReplicaId} <- addRecipients sndFileChunk replica
|
||||
fsFilePath <- lift $ toFSFilePath filePath
|
||||
unlessM (doesFileExist fsFilePath) $ throwError $ INTERNAL "encrypted file doesn't exist on upload"
|
||||
@@ -478,13 +481,16 @@ runXFTPSndWorker c srv Worker {doWork} = do
|
||||
atomically $ assertAgentForeground c
|
||||
agentXFTPUploadChunk c userId chunkDigest replica' chunkSpec'
|
||||
atomically $ waitUntilForeground c
|
||||
-- liftIO $ putStrLn $ "uploaded: " <> show chunkOffset
|
||||
sf@SndFile {sndFileEntityId, prefixPath, chunks} <- withStore c $ \db -> do
|
||||
updateSndChunkReplicaStatus db sndChunkReplicaId SFRSUploaded
|
||||
getSndFile db sndFileId
|
||||
let uploaded = uploadedSize chunks
|
||||
total = totalSize chunks
|
||||
complete = all chunkUploaded chunks
|
||||
liftIO $ putStrLn $ "uploaded: " <> show chunkOffset <> " size: " <> show uploaded
|
||||
notify c sndFileEntityId $ SFPROG uploaded total
|
||||
-- liftIO $ putStrLn $ "notified: " <> show chunkOffset <> " size: " <> show uploaded
|
||||
when complete $ do
|
||||
(sndDescr, rcvDescrs) <- sndFileToDescrs sf
|
||||
notify c sndFileEntityId $ SFDONE sndDescr rcvDescrs
|
||||
|
||||
+9
-3
@@ -24,7 +24,7 @@ import Simplex.FileTransfer.Description (FileDescription (..), FileDescriptionUR
|
||||
import Simplex.FileTransfer.Protocol (FileParty (..))
|
||||
import Simplex.FileTransfer.Transport (XFTPErrorType (AUTH))
|
||||
import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..))
|
||||
import Simplex.Messaging.Agent (AgentClient, disposeAgentClient, testProtocolServer, xftpDeleteRcvFile, xftpDeleteSndFileInternal, xftpDeleteSndFileRemote, xftpReceiveFile, xftpSendDescription, xftpSendFile, xftpStartWorkers)
|
||||
import Simplex.Messaging.Agent (AgentClient, execAgentStoreSQL, disposeAgentClient, testProtocolServer, xftpDeleteRcvFile, xftpDeleteSndFileInternal, xftpDeleteSndFileRemote, xftpReceiveFile, xftpSendDescription, xftpSendFile, xftpStartWorkers)
|
||||
import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestStep (..))
|
||||
import Simplex.Messaging.Agent.Protocol (ACommand (..), AgentErrorType (..), BrokerErrorType (..), RcvFileId, SndFileId, noAuthSrv)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
@@ -50,7 +50,7 @@ xftpAgentTests = around_ testBracket . describe "agent XFTP API" $ do
|
||||
it "should send and receive small file without a redirect" testXFTPAgentSendReceiveNoRedirect
|
||||
it "should resume receiving file after restart" testXFTPAgentReceiveRestore
|
||||
it "should cleanup rcv tmp path after permanent error" testXFTPAgentReceiveCleanup
|
||||
it "should resume sending file after restart" testXFTPAgentSendRestore
|
||||
fit "should resume sending file after restart" testXFTPAgentSendRestore
|
||||
it "should cleanup snd prefix path after permanent error" testXFTPAgentSendCleanup
|
||||
it "should delete sent file on server" testXFTPAgentDelete
|
||||
it "should resume deleting file after restart" testXFTPAgentDeleteRestore
|
||||
@@ -393,16 +393,22 @@ testXFTPAgentSendRestore = withGlobalLogging logCfgNoLogs $ do
|
||||
liftIO $ sfId' `shouldBe` sfId
|
||||
disposeAgentClient sndr'
|
||||
|
||||
threadDelay 100000
|
||||
threadDelay 10000
|
||||
|
||||
withXFTPServerStoreLogOn $ \_ -> do
|
||||
-- send file - should continue uploading with server up
|
||||
sndr' <- getSMPAgentClient' 3 agentCfg initAgentServers testDB
|
||||
-- runExceptT (execAgentStoreSQL sndr' "select snd_file_id, snd_file_chunk_id, chunk_no, chunk_offset from snd_file_chunks") >>= print
|
||||
-- runExceptT (execAgentStoreSQL sndr' "select snd_file_chunk_replica_id, snd_file_chunk_id, replica_number, created_at, updated_at from snd_file_chunk_replicas") >>= print
|
||||
runRight_ $ xftpStartWorkers sndr' (Just senderFiles)
|
||||
sfProgress sndr' $ mb 18
|
||||
("", sfId', SFDONE _sndDescr [rfd1, _rfd2]) <- sfGet sndr'
|
||||
liftIO $ sfId' `shouldBe` sfId
|
||||
|
||||
|
||||
-- runExceptT (execAgentStoreSQL sndr' "select snd_file_id, snd_file_chunk_id, chunk_no, chunk_offset from snd_file_chunks") >>= print
|
||||
-- runExceptT (execAgentStoreSQL sndr' "select snd_file_chunk_replica_id, snd_file_chunk_id, replica_number, created_at, updated_at from snd_file_chunk_replicas") >>= print
|
||||
|
||||
-- prefix path should be removed after sending file
|
||||
threadDelay 100000
|
||||
doesDirectoryExist prefixPath `shouldReturn` False
|
||||
|
||||
Reference in New Issue
Block a user