xftp: fix repeated replica creation if it was in uploaded status (#1079)

* test with failing files (in progress)

* print

* add replica uploading state

* Revert "add replica uploading state"

This reverts commit 7068213aa6.

* <=

* fix

* prints

* test no redundancy

* all tests no redundancy

* revert delay

* refactor

---------

Co-authored-by: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com>
Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com>
This commit is contained in:
Evgeny Poberezkin
2024-04-01 10:37:35 +01:00
committed by GitHub
parent 6ded721daa
commit 39bb804fab
2 changed files with 23 additions and 8 deletions

View File

@@ -389,7 +389,7 @@ runXFTPSndPrepareWorker c Worker {doWork} = do
let numRecipients' = min numRecipients maxRecipients
-- concurrently?
-- separate worker to create chunks? record retries and delay on snd_file_chunks?
forM_ (filter (not . chunkCreated) chunks) $ createChunk numRecipients'
forM_ (filter (\SndFileChunk {replicas} -> null replicas) chunks) $ createChunk numRecipients'
withStore' c $ \db -> updateSndFileStatus db sndFileId SFSUploading
where
AgentConfig {xftpMaxRecipientsPerRequest = maxRecipients, messageRetryInterval = ri} = cfg
@@ -413,9 +413,6 @@ runXFTPSndPrepareWorker c Worker {doWork} = do
let chunkSpecs = prepareChunkSpecs fsEncPath chunkSizes
chunkDigests <- liftIO $ mapM getChunkDigest chunkSpecs
pure (FileDigest digest, zip chunkSpecs $ coerce chunkDigests)
chunkCreated :: SndFileChunk -> Bool
chunkCreated SndFileChunk {replicas} =
any (\SndFileChunkReplica {replicaStatus} -> replicaStatus == SFRSCreated) replicas
createChunk :: Int -> SndFileChunk -> AM ()
createChunk numRecipients' ch = do
atomically $ assertAgentForeground c

View File

@@ -20,10 +20,10 @@ import Data.Int (Int64)
import Data.List (find, isSuffixOf)
import Data.Maybe (fromJust)
import SMPAgentClient (agentCfg, initAgentServers, testDB, testDB2, testDB3)
import Simplex.FileTransfer.Description (FileDescription (..), FileDescriptionURI (..), ValidFileDescription, fileDescriptionURI, mb, qrSizeLimit, pattern ValidFileDescription)
import Simplex.FileTransfer.Description (FileChunk (..), FileDescription (..), FileDescriptionURI (..), ValidFileDescription, fileDescriptionURI, mb, qrSizeLimit, pattern ValidFileDescription)
import Simplex.FileTransfer.Protocol (FileParty (..))
import Simplex.FileTransfer.Transport (XFTPErrorType (AUTH))
import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..))
import Simplex.FileTransfer.Transport (XFTPErrorType (AUTH))
import Simplex.Messaging.Agent (AgentClient, disposeAgentClient, testProtocolServer, xftpDeleteRcvFile, xftpDeleteSndFileInternal, xftpDeleteSndFileRemote, xftpReceiveFile, xftpSendDescription, xftpSendFile, xftpStartWorkers)
import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestStep (..))
import Simplex.Messaging.Agent.Protocol (ACommand (..), AgentErrorType (..), BrokerErrorType (..), RcvFileId, SndFileId, noAuthSrv)
@@ -105,7 +105,6 @@ testXFTPAgentSendReceive = withXFTPServer $ do
(sfId, _, rfd1, rfd2) <- testSend sndr filePath
liftIO $ xftpDeleteSndFileInternal sndr sfId
pure (rfd1, rfd2)
-- receive file, delete rcv file
testReceiveDelete 2 rfd1 filePath
testReceiveDelete 3 rfd2 filePath
@@ -157,6 +156,9 @@ testXFTPAgentSendReceiveRedirect = withXFTPServer $ do
sfGet sndr >>= \case
(_, _, SFDONE _snd (vfd : _)) -> pure vfd
r -> error $ "Expected SFDONE, got " <> show r
testNoRedundancy vfdDirect
redirectFileId <- runRight $ xftpSendDescription sndr 1 vfdDirect 1
logInfo $ "File sent, sending redirect: " <> tshow redirectFileId
sfGet sndr `shouldReturn` ("", redirectFileId, SFPROG 65536 65536)
@@ -164,6 +166,9 @@ testXFTPAgentSendReceiveRedirect = withXFTPServer $ do
sfGet sndr >>= \case
(_, _, SFDONE _snd (vfd : _)) -> pure vfd
r -> error $ "Expected SFDONE, got " <> show r
testNoRedundancy vfdRedirect
case fdRedirect of
FileDescription {redirect = Just _} -> pure ()
_ -> error "missing RedirectFileInfo"
@@ -208,6 +213,9 @@ testXFTPAgentSendReceiveNoRedirect = withXFTPServer $ do
sfGet sndr >>= \case
(_, _, SFDONE _snd (vfd : _)) -> pure vfd
r -> error $ "Expected SFDONE, got " <> show r
testNoRedundancy vfdDirect
let uri = strEncode $ fileDescriptionURI vfdDirect
B.length uri `shouldSatisfy` (< qrSizeLimit)
case strDecode uri of
@@ -255,9 +263,15 @@ testSendCF sndr file = do
sfId <- xftpSendFile sndr 1 file 2
sfProgress sndr $ mb 18
("", sfId', SFDONE sndDescr [rfd1, rfd2]) <- sfGet sndr
liftIO $ testNoRedundancy rfd1
liftIO $ testNoRedundancy rfd2
liftIO $ sfId' `shouldBe` sfId
pure (sfId, sndDescr, rfd1, rfd2)
testNoRedundancy :: HasCallStack => ValidFileDescription 'FRecipient -> IO ()
testNoRedundancy (ValidFileDescription FileDescription {chunks}) =
all (\FileChunk {replicas} -> length replicas == 1) chunks `shouldBe` True
testReceive :: HasCallStack => AgentClient -> ValidFileDescription 'FRecipient -> FilePath -> ExceptT AgentErrorType IO RcvFileId
testReceive rcp rfd = testReceiveCF rcp rfd Nothing
@@ -400,7 +414,9 @@ testXFTPAgentSendRestore = withGlobalLogging logCfgNoLogs $ do
sndr' <- getSMPAgentClient' 3 agentCfg initAgentServers testDB
runRight_ $ xftpStartWorkers sndr' (Just senderFiles)
sfProgress sndr' $ mb 18
("", sfId', SFDONE _sndDescr [rfd1, _rfd2]) <- sfGet sndr'
("", sfId', SFDONE _sndDescr [rfd1, rfd2]) <- sfGet sndr'
liftIO $ testNoRedundancy rfd1
liftIO $ testNoRedundancy rfd2
liftIO $ sfId' `shouldBe` sfId
-- prefix path should be removed after sending file
@@ -618,6 +634,8 @@ testXFTPAgentRequestAdditionalRecipientIDs = withXFTPServer $ do
length rfds `shouldBe` 500
pure rfds
forM_ rfds testNoRedundancy
-- receive file using different descriptions
-- ! revise number of recipients and indexes if xftpMaxRecipientsPerRequest is changed
rcp <- getSMPAgentClient' 2 agentCfg initAgentServers testDB2