mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 16:26:02 +00:00
xftp: start chunk upload after all chunks are prepared (#1191)
This commit is contained in:
@@ -38,7 +38,7 @@ import qualified Data.ByteString.Char8 as B
|
||||
import qualified Data.ByteString.Lazy.Char8 as LB
|
||||
import Data.Coerce (coerce)
|
||||
import Data.Composition ((.:))
|
||||
import Data.Either (rights)
|
||||
import Data.Either (partitionEithers, rights)
|
||||
import Data.Int (Int64)
|
||||
import Data.List (foldl', partition, sortOn)
|
||||
import qualified Data.List.NonEmpty as L
|
||||
@@ -71,7 +71,7 @@ import qualified Simplex.Messaging.Crypto.File as CF
|
||||
import qualified Simplex.Messaging.Crypto.Lazy as LC
|
||||
import Simplex.Messaging.Encoding
|
||||
import Simplex.Messaging.Encoding.String (strDecode, strEncode)
|
||||
import Simplex.Messaging.Protocol (EntityId, XFTPServer)
|
||||
import Simplex.Messaging.Protocol (EntityId, ProtocolServer, ProtocolType (..), XFTPServer)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
import Simplex.Messaging.Util (catchAll_, liftError, tshow, unlessM, whenM)
|
||||
import System.FilePath (takeFileName, (</>))
|
||||
@@ -397,9 +397,14 @@ runXFTPSndPrepareWorker c Worker {doWork} = do
|
||||
getSndFile db sndFileId
|
||||
else pure sndFile
|
||||
let numRecipients' = min numRecipients maxRecipients
|
||||
-- in case chunk preparation previously failed mid-way, some chunks may already be created -
|
||||
-- here we split previously prepared chunks from the pending ones to then build full list of servers
|
||||
let (pendingChunks, preparedSrvs) = partitionEithers $ map srvOrPendingChunk chunks
|
||||
-- concurrently?
|
||||
-- separate worker to create chunks? record retries and delay on snd_file_chunks?
|
||||
forM_ (filter (\SndFileChunk {replicas} -> null replicas) chunks) $ createChunk numRecipients'
|
||||
srvs <- forM pendingChunks $ createChunk numRecipients'
|
||||
let allSrvs = S.fromList $ preparedSrvs <> srvs
|
||||
lift $ forM_ allSrvs $ \srv -> getXFTPSndWorker True c (Just srv)
|
||||
withStore' c $ \db -> updateSndFileStatus db sndFileId SFSUploading
|
||||
where
|
||||
AgentConfig {xftpMaxRecipientsPerRequest = maxRecipients, messageRetryInterval = ri} = cfg
|
||||
@@ -423,12 +428,16 @@ runXFTPSndPrepareWorker c Worker {doWork} = do
|
||||
let chunkSpecs = prepareChunkSpecs fsEncPath chunkSizes
|
||||
chunkDigests <- liftIO $ mapM getChunkDigest chunkSpecs
|
||||
pure (FileDigest digest, zip chunkSpecs $ coerce chunkDigests)
|
||||
createChunk :: Int -> SndFileChunk -> AM ()
|
||||
srvOrPendingChunk :: SndFileChunk -> Either SndFileChunk (ProtocolServer 'PXFTP)
|
||||
srvOrPendingChunk ch@SndFileChunk {replicas} = case replicas of
|
||||
[] -> Left ch
|
||||
SndFileChunkReplica {server} : _ -> Right server
|
||||
createChunk :: Int -> SndFileChunk -> AM (ProtocolServer 'PXFTP)
|
||||
createChunk numRecipients' ch = do
|
||||
atomically $ assertAgentForeground c
|
||||
(replica, ProtoServerWithAuth srv _) <- tryCreate
|
||||
withStore' c $ \db -> createSndFileReplica db ch replica
|
||||
lift . void $ getXFTPSndWorker True c (Just srv)
|
||||
pure srv
|
||||
where
|
||||
tryCreate = do
|
||||
usedSrvs <- newTVarIO ([] :: [XFTPServer])
|
||||
|
||||
Reference in New Issue
Block a user