From da620c388a853cb8612cc695aed5cff6900a916b Mon Sep 17 00:00:00 2001 From: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> Date: Thu, 6 Jun 2024 15:50:15 +0400 Subject: [PATCH] xftp: start chunk upload after all chunks are prepared (#1191) --- src/Simplex/FileTransfer/Agent.hs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index 9c0b3fe00..890966888 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -38,7 +38,7 @@ import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Lazy.Char8 as LB import Data.Coerce (coerce) import Data.Composition ((.:)) -import Data.Either (rights) +import Data.Either (partitionEithers, rights) import Data.Int (Int64) import Data.List (foldl', partition, sortOn) import qualified Data.List.NonEmpty as L @@ -71,7 +71,7 @@ import qualified Simplex.Messaging.Crypto.File as CF import qualified Simplex.Messaging.Crypto.Lazy as LC import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String (strDecode, strEncode) -import Simplex.Messaging.Protocol (EntityId, XFTPServer) +import Simplex.Messaging.Protocol (EntityId, ProtocolServer, ProtocolType (..), XFTPServer) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Util (catchAll_, liftError, tshow, unlessM, whenM) import System.FilePath (takeFileName, ()) @@ -397,9 +397,14 @@ runXFTPSndPrepareWorker c Worker {doWork} = do getSndFile db sndFileId else pure sndFile let numRecipients' = min numRecipients maxRecipients + -- in case chunk preparation previously failed mid-way, some chunks may already be created - + -- here we split previously prepared chunks from the pending ones to then build full list of servers + let (pendingChunks, preparedSrvs) = partitionEithers $ map srvOrPendingChunk chunks -- concurrently? -- separate worker to create chunks? record retries and delay on snd_file_chunks? - forM_ (filter (\SndFileChunk {replicas} -> null replicas) chunks) $ createChunk numRecipients' + srvs <- forM pendingChunks $ createChunk numRecipients' + let allSrvs = S.fromList $ preparedSrvs <> srvs + lift $ forM_ allSrvs $ \srv -> getXFTPSndWorker True c (Just srv) withStore' c $ \db -> updateSndFileStatus db sndFileId SFSUploading where AgentConfig {xftpMaxRecipientsPerRequest = maxRecipients, messageRetryInterval = ri} = cfg @@ -423,12 +428,16 @@ runXFTPSndPrepareWorker c Worker {doWork} = do let chunkSpecs = prepareChunkSpecs fsEncPath chunkSizes chunkDigests <- liftIO $ mapM getChunkDigest chunkSpecs pure (FileDigest digest, zip chunkSpecs $ coerce chunkDigests) - createChunk :: Int -> SndFileChunk -> AM () + srvOrPendingChunk :: SndFileChunk -> Either SndFileChunk (ProtocolServer 'PXFTP) + srvOrPendingChunk ch@SndFileChunk {replicas} = case replicas of + [] -> Left ch + SndFileChunkReplica {server} : _ -> Right server + createChunk :: Int -> SndFileChunk -> AM (ProtocolServer 'PXFTP) createChunk numRecipients' ch = do atomically $ assertAgentForeground c (replica, ProtoServerWithAuth srv _) <- tryCreate withStore' c $ \db -> createSndFileReplica db ch replica - lift . void $ getXFTPSndWorker True c (Just srv) + pure srv where tryCreate = do usedSrvs <- newTVarIO ([] :: [XFTPServer])