diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index bae008e58..7559d6673 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -228,6 +228,7 @@ withRetryIntervalLimit maxN ri action = retryOnError :: Text -> AM a -> AM a -> AgentErrorType -> AM a retryOnError name loop done e = do + liftIO $ print $ "error: " <> show e logError $ name <> " error: " <> tshow e if temporaryAgentError e then loop @@ -455,7 +456,8 @@ runXFTPSndWorker c srv Worker {doWork} = do runXFTPOperation cfg@AgentConfig {sndFilesTTL, reconnectInterval = ri, xftpNotifyErrsOnRetry = notifyOnRetry, xftpConsecutiveRetries} = do withWork c doWork (\db -> getNextSndChunkToUpload db srv sndFilesTTL) $ \case SndFileChunk {sndFileId, sndFileEntityId, filePrefixPath, replicas = []} -> sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) "chunk has no replicas" - fc@SndFileChunk {userId, sndFileId, sndFileEntityId, filePrefixPath, digest, replicas = replica@SndFileChunkReplica {sndChunkReplicaId, server, delay} : _} -> do + fc@SndFileChunk {userId, chunkSpec = XFTPChunkSpec {chunkOffset}, sndFileId, sndFileEntityId, filePrefixPath, digest, replicas = replica@SndFileChunkReplica {sndChunkReplicaId, server, delay} : _} -> do + liftIO $ putStrLn $ "chunkOffset: " <> show chunkOffset let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> uploadFileChunk cfg fc replica @@ -470,7 +472,8 @@ runXFTPSndWorker c srv Worker {doWork} = do loop retryDone e = sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) (show e) uploadFileChunk :: AgentConfig -> SndFileChunk -> SndFileChunkReplica -> AM () - uploadFileChunk AgentConfig {xftpMaxRecipientsPerRequest = maxRecipients} sndFileChunk@SndFileChunk {sndFileId, userId, chunkSpec = chunkSpec@XFTPChunkSpec {filePath}, digest = chunkDigest} replica = do + uploadFileChunk AgentConfig {xftpMaxRecipientsPerRequest = maxRecipients} sndFileChunk@SndFileChunk {sndFileId, userId, chunkSpec = chunkSpec@XFTPChunkSpec {filePath, chunkOffset}, digest = chunkDigest} replica = do + liftIO $ putStrLn $ "uploadFileChunk: " <> show chunkOffset replica'@SndFileChunkReplica {sndChunkReplicaId} <- addRecipients sndFileChunk replica fsFilePath <- lift $ toFSFilePath filePath unlessM (doesFileExist fsFilePath) $ throwError $ INTERNAL "encrypted file doesn't exist on upload" @@ -478,13 +481,16 @@ runXFTPSndWorker c srv Worker {doWork} = do atomically $ assertAgentForeground c agentXFTPUploadChunk c userId chunkDigest replica' chunkSpec' atomically $ waitUntilForeground c + -- liftIO $ putStrLn $ "uploaded: " <> show chunkOffset sf@SndFile {sndFileEntityId, prefixPath, chunks} <- withStore c $ \db -> do updateSndChunkReplicaStatus db sndChunkReplicaId SFRSUploaded getSndFile db sndFileId let uploaded = uploadedSize chunks total = totalSize chunks complete = all chunkUploaded chunks + liftIO $ putStrLn $ "uploaded: " <> show chunkOffset <> " size: " <> show uploaded notify c sndFileEntityId $ SFPROG uploaded total + -- liftIO $ putStrLn $ "notified: " <> show chunkOffset <> " size: " <> show uploaded when complete $ do (sndDescr, rcvDescrs) <- sndFileToDescrs sf notify c sndFileEntityId $ SFDONE sndDescr rcvDescrs diff --git a/tests/XFTPAgent.hs b/tests/XFTPAgent.hs index 2befdcc76..89e2e0cc4 100644 --- a/tests/XFTPAgent.hs +++ b/tests/XFTPAgent.hs @@ -24,7 +24,7 @@ import Simplex.FileTransfer.Description (FileDescription (..), FileDescriptionUR import Simplex.FileTransfer.Protocol (FileParty (..)) import Simplex.FileTransfer.Transport (XFTPErrorType (AUTH)) import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..)) -import Simplex.Messaging.Agent (AgentClient, disposeAgentClient, testProtocolServer, xftpDeleteRcvFile, xftpDeleteSndFileInternal, xftpDeleteSndFileRemote, xftpReceiveFile, xftpSendDescription, xftpSendFile, xftpStartWorkers) +import Simplex.Messaging.Agent (AgentClient, execAgentStoreSQL, disposeAgentClient, testProtocolServer, xftpDeleteRcvFile, xftpDeleteSndFileInternal, xftpDeleteSndFileRemote, xftpReceiveFile, xftpSendDescription, xftpSendFile, xftpStartWorkers) import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestStep (..)) import Simplex.Messaging.Agent.Protocol (ACommand (..), AgentErrorType (..), BrokerErrorType (..), RcvFileId, SndFileId, noAuthSrv) import qualified Simplex.Messaging.Crypto as C @@ -50,7 +50,7 @@ xftpAgentTests = around_ testBracket . describe "agent XFTP API" $ do it "should send and receive small file without a redirect" testXFTPAgentSendReceiveNoRedirect it "should resume receiving file after restart" testXFTPAgentReceiveRestore it "should cleanup rcv tmp path after permanent error" testXFTPAgentReceiveCleanup - it "should resume sending file after restart" testXFTPAgentSendRestore + fit "should resume sending file after restart" testXFTPAgentSendRestore it "should cleanup snd prefix path after permanent error" testXFTPAgentSendCleanup it "should delete sent file on server" testXFTPAgentDelete it "should resume deleting file after restart" testXFTPAgentDeleteRestore @@ -393,16 +393,22 @@ testXFTPAgentSendRestore = withGlobalLogging logCfgNoLogs $ do liftIO $ sfId' `shouldBe` sfId disposeAgentClient sndr' - threadDelay 100000 + threadDelay 10000 withXFTPServerStoreLogOn $ \_ -> do -- send file - should continue uploading with server up sndr' <- getSMPAgentClient' 3 agentCfg initAgentServers testDB + -- runExceptT (execAgentStoreSQL sndr' "select snd_file_id, snd_file_chunk_id, chunk_no, chunk_offset from snd_file_chunks") >>= print + -- runExceptT (execAgentStoreSQL sndr' "select snd_file_chunk_replica_id, snd_file_chunk_id, replica_number, created_at, updated_at from snd_file_chunk_replicas") >>= print runRight_ $ xftpStartWorkers sndr' (Just senderFiles) sfProgress sndr' $ mb 18 ("", sfId', SFDONE _sndDescr [rfd1, _rfd2]) <- sfGet sndr' liftIO $ sfId' `shouldBe` sfId + + -- runExceptT (execAgentStoreSQL sndr' "select snd_file_id, snd_file_chunk_id, chunk_no, chunk_offset from snd_file_chunks") >>= print + -- runExceptT (execAgentStoreSQL sndr' "select snd_file_chunk_replica_id, snd_file_chunk_id, replica_number, created_at, updated_at from snd_file_chunk_replicas") >>= print + -- prefix path should be removed after sending file threadDelay 100000 doesDirectoryExist prefixPath `shouldReturn` False