mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-24 14:35:22 +00:00
xftp: safer retries
This commit is contained in:
@@ -197,7 +197,9 @@ runXFTPRcvWorker c srv doWork = do
|
||||
withStore' c $ \db -> updateRcvChunkReplicaDelay db rcvChunkReplicaId replicaDelay
|
||||
atomically $ assertAgentForeground c
|
||||
loop
|
||||
retryDone e = rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) (show e)
|
||||
retryDone e = do
|
||||
withStore' c (`deleteSndFileReplica` rcvChunkReplicaId)
|
||||
rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) (show e)
|
||||
downloadFileChunk :: RcvFileChunk -> RcvFileChunkReplica -> m ()
|
||||
downloadFileChunk RcvFileChunk {userId, rcvFileId, rcvFileEntityId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath} replica = do
|
||||
fsFileTmpPath <- toFSFilePath fileTmpPath
|
||||
@@ -415,7 +417,7 @@ runXFTPSndPrepareWorker c doWork = do
|
||||
createChunk :: Int -> SndFileChunk -> m ()
|
||||
createChunk numRecipients' ch = do
|
||||
atomically $ assertAgentForeground c
|
||||
(replica, ProtoServerWithAuth srv _) <- agentOperationBracket c AOSndNetwork throwWhenInactive tryCreate
|
||||
(replica, ProtoServerWithAuth srv _) <- tryCreate
|
||||
withStore' c $ \db -> createSndFileReplica db ch replica
|
||||
addXFTPSndWorker c $ Just srv
|
||||
where
|
||||
@@ -445,7 +447,7 @@ runXFTPSndWorker c srv doWork = do
|
||||
forever $ do
|
||||
void . atomically $ readTMVar doWork
|
||||
atomically $ assertAgentForeground c
|
||||
agentOperationBracket c AOSndNetwork throwWhenInactive runXFTPOperation
|
||||
runXFTPOperation
|
||||
where
|
||||
noWorkToDo = void . atomically $ tryTakeTMVar doWork
|
||||
runXFTPOperation :: m ()
|
||||
@@ -470,7 +472,9 @@ runXFTPSndWorker c srv doWork = do
|
||||
withStore' c $ \db -> updateSndChunkReplicaDelay db sndChunkReplicaId replicaDelay
|
||||
atomically $ assertAgentForeground c
|
||||
loop
|
||||
retryDone e = sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) (show e)
|
||||
retryDone e = do
|
||||
withStore' c (`deleteSndFileReplica` sndChunkReplicaId)
|
||||
sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) (show e)
|
||||
uploadFileChunk :: SndFileChunk -> SndFileChunkReplica -> m ()
|
||||
uploadFileChunk sndFileChunk@SndFileChunk {sndFileId, userId, chunkSpec = chunkSpec@XFTPChunkSpec {filePath}, digest = chunkDigest} replica = do
|
||||
replica'@SndFileChunkReplica {sndChunkReplicaId} <- addRecipients sndFileChunk replica
|
||||
@@ -628,11 +632,13 @@ runXFTPDelWorker c srv doWork = do
|
||||
withStore' c $ \db -> updateDeletedSndChunkReplicaDelay db deletedSndChunkReplicaId replicaDelay
|
||||
atomically $ assertAgentForeground c
|
||||
loop
|
||||
retryDone e = delWorkerInternalError c deletedSndChunkReplicaId e
|
||||
retryDone e = do
|
||||
withStore' c (`deleteDeletedSndChunkReplica` deletedSndChunkReplicaId)
|
||||
delWorkerInternalError c deletedSndChunkReplicaId e
|
||||
deleteChunkReplica :: DeletedSndChunkReplica -> m ()
|
||||
deleteChunkReplica replica@DeletedSndChunkReplica {userId, deletedSndChunkReplicaId} = do
|
||||
agentXFTPDeleteChunk c userId replica
|
||||
withStore' c $ \db -> deleteDeletedSndChunkReplica db deletedSndChunkReplicaId
|
||||
withStore' c (`deleteDeletedSndChunkReplica` deletedSndChunkReplicaId)
|
||||
|
||||
delWorkerInternalError :: AgentMonad m => AgentClient -> Int64 -> AgentErrorType -> m ()
|
||||
delWorkerInternalError c deletedSndChunkReplicaId e = do
|
||||
|
||||
@@ -144,6 +144,7 @@ module Simplex.Messaging.Agent.Store.SQLite
|
||||
updateRcvFileNoTmpPath,
|
||||
updateRcvFileDeleted,
|
||||
deleteRcvFile',
|
||||
deleteRcvFileReplica,
|
||||
getNextRcvChunkToDownload,
|
||||
getNextRcvFileToDecrypt,
|
||||
getPendingRcvFilesServers,
|
||||
@@ -162,6 +163,7 @@ module Simplex.Messaging.Agent.Store.SQLite
|
||||
updateSndFileNoPrefixPath,
|
||||
updateSndFileDeleted,
|
||||
deleteSndFile',
|
||||
deleteSndFileReplica,
|
||||
getSndFileDeleted,
|
||||
createSndFileReplica,
|
||||
getNextSndChunkToUpload,
|
||||
@@ -1998,6 +2000,10 @@ deleteRcvFile' :: DB.Connection -> DBRcvFileId -> IO ()
|
||||
deleteRcvFile' db rcvFileId =
|
||||
DB.execute db "DELETE FROM rcv_files WHERE rcv_file_id = ?" (Only rcvFileId)
|
||||
|
||||
deleteRcvFileReplica :: DB.Connection -> Int64 -> IO ()
|
||||
deleteRcvFileReplica db replicaId =
|
||||
DB.execute db "DELETE FROM rcv_file_chunk_replicas WHERE rcv_file_chunk_replica_id = ?" (Only replicaId)
|
||||
|
||||
getNextRcvChunkToDownload :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Maybe RcvFileChunk)
|
||||
getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = do
|
||||
cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime
|
||||
@@ -2015,7 +2021,7 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = d
|
||||
WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ?
|
||||
AND r.received = 0 AND r.replica_number = 1
|
||||
AND f.status = ? AND f.deleted = 0 AND f.created_at >= ?
|
||||
ORDER BY r.created_at ASC
|
||||
ORDER BY r.retries ASC, r.created_at ASC
|
||||
LIMIT 1
|
||||
|]
|
||||
(host, port, keyHash, RFSReceiving, cutoffTs)
|
||||
@@ -2255,6 +2261,10 @@ deleteSndFile' :: DB.Connection -> DBSndFileId -> IO ()
|
||||
deleteSndFile' db sndFileId =
|
||||
DB.execute db "DELETE FROM snd_files WHERE snd_file_id = ?" (Only sndFileId)
|
||||
|
||||
deleteSndFileReplica :: DB.Connection -> Int64 -> IO ()
|
||||
deleteSndFileReplica db replicaId =
|
||||
DB.execute db "DELETE FROM snd_file_chunk_replicas WHERE snd_file_chunk_replica_id = ?" (Only replicaId)
|
||||
|
||||
getSndFileDeleted :: DB.Connection -> DBSndFileId -> IO Bool
|
||||
getSndFileDeleted db sndFileId =
|
||||
fromMaybe True
|
||||
@@ -2301,7 +2311,7 @@ getNextSndChunkToUpload db server@ProtocolServer {host, port, keyHash} ttl = do
|
||||
WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ?
|
||||
AND r.replica_status = ? AND r.replica_number = 1
|
||||
AND (f.status = ? OR f.status = ?) AND f.deleted = 0 AND f.created_at >= ?
|
||||
ORDER BY r.created_at ASC
|
||||
ORDER BY r.retries ASC, r.created_at ASC
|
||||
LIMIT 1
|
||||
|]
|
||||
(host, port, keyHash, SFRSCreated, SFSEncrypted, SFSUploading, cutoffTs)
|
||||
@@ -2443,7 +2453,8 @@ getNextDeletedSndChunkReplica db ProtocolServer {host, port, keyHash} ttl = do
|
||||
JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
|
||||
WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ?
|
||||
AND r.created_at >= ?
|
||||
ORDER BY r.created_at ASC LIMIT 1
|
||||
ORDER BY r.retries ASC, r.created_at ASC
|
||||
LIMIT 1
|
||||
|]
|
||||
(host, port, keyHash, cutoffTs)
|
||||
case replicaId_ of
|
||||
|
||||
Reference in New Issue
Block a user