From d6df82f7cc93b08c0f6d603bea824a5122d8d2ab Mon Sep 17 00:00:00 2001 From: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> Date: Fri, 21 Apr 2023 19:21:32 +0400 Subject: [PATCH] xftp: safer retries --- src/Simplex/FileTransfer/Agent.hs | 18 ++++++++++++------ src/Simplex/Messaging/Agent/Store/SQLite.hs | 17 ++++++++++++++--- 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index 11681d7b9..5663c76d5 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -197,7 +197,9 @@ runXFTPRcvWorker c srv doWork = do withStore' c $ \db -> updateRcvChunkReplicaDelay db rcvChunkReplicaId replicaDelay atomically $ assertAgentForeground c loop - retryDone e = rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) (show e) + retryDone e = do + withStore' c (`deleteSndFileReplica` rcvChunkReplicaId) + rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) (show e) downloadFileChunk :: RcvFileChunk -> RcvFileChunkReplica -> m () downloadFileChunk RcvFileChunk {userId, rcvFileId, rcvFileEntityId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath} replica = do fsFileTmpPath <- toFSFilePath fileTmpPath @@ -415,7 +417,7 @@ runXFTPSndPrepareWorker c doWork = do createChunk :: Int -> SndFileChunk -> m () createChunk numRecipients' ch = do atomically $ assertAgentForeground c - (replica, ProtoServerWithAuth srv _) <- agentOperationBracket c AOSndNetwork throwWhenInactive tryCreate + (replica, ProtoServerWithAuth srv _) <- tryCreate withStore' c $ \db -> createSndFileReplica db ch replica addXFTPSndWorker c $ Just srv where @@ -445,7 +447,7 @@ runXFTPSndWorker c srv doWork = do forever $ do void . atomically $ readTMVar doWork atomically $ assertAgentForeground c - agentOperationBracket c AOSndNetwork throwWhenInactive runXFTPOperation + runXFTPOperation where noWorkToDo = void . atomically $ tryTakeTMVar doWork runXFTPOperation :: m () @@ -470,7 +472,9 @@ runXFTPSndWorker c srv doWork = do withStore' c $ \db -> updateSndChunkReplicaDelay db sndChunkReplicaId replicaDelay atomically $ assertAgentForeground c loop - retryDone e = sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) (show e) + retryDone e = do + withStore' c (`deleteSndFileReplica` sndChunkReplicaId) + sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) (show e) uploadFileChunk :: SndFileChunk -> SndFileChunkReplica -> m () uploadFileChunk sndFileChunk@SndFileChunk {sndFileId, userId, chunkSpec = chunkSpec@XFTPChunkSpec {filePath}, digest = chunkDigest} replica = do replica'@SndFileChunkReplica {sndChunkReplicaId} <- addRecipients sndFileChunk replica @@ -628,11 +632,13 @@ runXFTPDelWorker c srv doWork = do withStore' c $ \db -> updateDeletedSndChunkReplicaDelay db deletedSndChunkReplicaId replicaDelay atomically $ assertAgentForeground c loop - retryDone e = delWorkerInternalError c deletedSndChunkReplicaId e + retryDone e = do + withStore' c (`deleteDeletedSndChunkReplica` deletedSndChunkReplicaId) + delWorkerInternalError c deletedSndChunkReplicaId e deleteChunkReplica :: DeletedSndChunkReplica -> m () deleteChunkReplica replica@DeletedSndChunkReplica {userId, deletedSndChunkReplicaId} = do agentXFTPDeleteChunk c userId replica - withStore' c $ \db -> deleteDeletedSndChunkReplica db deletedSndChunkReplicaId + withStore' c (`deleteDeletedSndChunkReplica` deletedSndChunkReplicaId) delWorkerInternalError :: AgentMonad m => AgentClient -> Int64 -> AgentErrorType -> m () delWorkerInternalError c deletedSndChunkReplicaId e = do diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 1a8c00149..090beb92d 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -144,6 +144,7 @@ module Simplex.Messaging.Agent.Store.SQLite updateRcvFileNoTmpPath, updateRcvFileDeleted, deleteRcvFile', + deleteRcvFileReplica, getNextRcvChunkToDownload, getNextRcvFileToDecrypt, getPendingRcvFilesServers, @@ -162,6 +163,7 @@ module Simplex.Messaging.Agent.Store.SQLite updateSndFileNoPrefixPath, updateSndFileDeleted, deleteSndFile', + deleteSndFileReplica, getSndFileDeleted, createSndFileReplica, getNextSndChunkToUpload, @@ -1998,6 +2000,10 @@ deleteRcvFile' :: DB.Connection -> DBRcvFileId -> IO () deleteRcvFile' db rcvFileId = DB.execute db "DELETE FROM rcv_files WHERE rcv_file_id = ?" (Only rcvFileId) +deleteRcvFileReplica :: DB.Connection -> Int64 -> IO () +deleteRcvFileReplica db replicaId = + DB.execute db "DELETE FROM rcv_file_chunk_replicas WHERE rcv_file_chunk_replica_id = ?" (Only replicaId) + getNextRcvChunkToDownload :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Maybe RcvFileChunk) getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = do cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime @@ -2015,7 +2021,7 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = d WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ? AND r.received = 0 AND r.replica_number = 1 AND f.status = ? AND f.deleted = 0 AND f.created_at >= ? - ORDER BY r.created_at ASC + ORDER BY r.retries ASC, r.created_at ASC LIMIT 1 |] (host, port, keyHash, RFSReceiving, cutoffTs) @@ -2255,6 +2261,10 @@ deleteSndFile' :: DB.Connection -> DBSndFileId -> IO () deleteSndFile' db sndFileId = DB.execute db "DELETE FROM snd_files WHERE snd_file_id = ?" (Only sndFileId) +deleteSndFileReplica :: DB.Connection -> Int64 -> IO () +deleteSndFileReplica db replicaId = + DB.execute db "DELETE FROM snd_file_chunk_replicas WHERE snd_file_chunk_replica_id = ?" (Only replicaId) + getSndFileDeleted :: DB.Connection -> DBSndFileId -> IO Bool getSndFileDeleted db sndFileId = fromMaybe True @@ -2301,7 +2311,7 @@ getNextSndChunkToUpload db server@ProtocolServer {host, port, keyHash} ttl = do WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ? AND r.replica_status = ? AND r.replica_number = 1 AND (f.status = ? OR f.status = ?) AND f.deleted = 0 AND f.created_at >= ? - ORDER BY r.created_at ASC + ORDER BY r.retries ASC, r.created_at ASC LIMIT 1 |] (host, port, keyHash, SFRSCreated, SFSEncrypted, SFSUploading, cutoffTs) @@ -2443,7 +2453,8 @@ getNextDeletedSndChunkReplica db ProtocolServer {host, port, keyHash} ttl = do JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ? AND r.created_at >= ? - ORDER BY r.created_at ASC LIMIT 1 + ORDER BY r.retries ASC, r.created_at ASC + LIMIT 1 |] (host, port, keyHash, cutoffTs) case replicaId_ of