diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index 115ca6946..aabe3ff28 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -45,7 +45,7 @@ import Data.List (foldl', partition, sortOn) import qualified Data.List.NonEmpty as L import Data.Map.Strict (Map) import qualified Data.Map.Strict as M -import Data.Maybe (mapMaybe) +import Data.Maybe (fromMaybe, mapMaybe) import qualified Data.Set as S import Data.Text (Text) import Data.Time.Clock (getCurrentTime) @@ -190,8 +190,9 @@ runXFTPRcvWorker c srv Worker {doWork} = do runXFTPOperation :: AgentConfig -> AM () runXFTPOperation AgentConfig {rcvFilesTTL, reconnectInterval = ri, xftpConsecutiveRetries} = withWork c doWork (\db -> getNextRcvChunkToDownload db srv rcvFilesTTL) $ \case - (RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []}, _) -> rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) (INTERNAL "chunk has no replicas") - (fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, digest, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, delay} : _}, approvedRelays) -> do + (RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []}, _, redirectEntityId_) -> + rcvWorkerInternalError c rcvFileId rcvFileEntityId redirectEntityId_ (Just fileTmpPath) (INTERNAL "chunk has no replicas") + (fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, digest, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, delay} : _}, approvedRelays, redirectEntityId_) -> do let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do liftIO $ waitWhileSuspended c @@ -202,7 +203,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do where retryLoop loop e replicaDelay = do flip catchAgentError (\_ -> pure ()) $ do - when (serverHostError e) $ notify c rcvFileEntityId $ RFWARN e + when (serverHostError e) $ notify c (fromMaybe rcvFileEntityId redirectEntityId_) (RFWARN e) liftIO $ closeXFTPServerClient c userId server digest withStore' c $ \db -> updateRcvChunkReplicaDelay db rcvChunkReplicaId replicaDelay liftIO $ assertAgentForeground c @@ -211,7 +212,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do atomically . incXFTPServerStat c userId srv $ case e of XFTP _ XFTP.AUTH -> downloadAuthErrs _ -> downloadErrs - rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) e + rcvWorkerInternalError c rcvFileId rcvFileEntityId redirectEntityId_ (Just fileTmpPath) e downloadFileChunk :: RcvFileChunk -> RcvFileChunkReplica -> Bool -> AM () downloadFileChunk RcvFileChunk {userId, rcvFileId, rcvFileEntityId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath} replica approvedRelays = do unlessM ((approvedRelays ||) <$> ipAddressProtected') $ throwE $ FILE NOT_APPROVED @@ -262,11 +263,11 @@ retryOnError name loop done e = do then loop else done -rcvWorkerInternalError :: AgentClient -> DBRcvFileId -> RcvFileId -> Maybe FilePath -> AgentErrorType -> AM () -rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath err = do +rcvWorkerInternalError :: AgentClient -> DBRcvFileId -> RcvFileId -> Maybe RcvFileId -> Maybe FilePath -> AgentErrorType -> AM () +rcvWorkerInternalError c rcvFileId rcvFileEntityId redirectEntityId_ tmpPath err = do lift $ forM_ tmpPath (removePath <=< toFSFilePath) withStore' c $ \db -> updateRcvFileError db rcvFileId (show err) - notify c rcvFileEntityId $ RFERR err + notify c (fromMaybe rcvFileEntityId redirectEntityId_) (RFERR err) runXFTPRcvLocalWorker :: AgentClient -> Worker -> AM () runXFTPRcvLocalWorker c Worker {doWork} = do @@ -279,8 +280,8 @@ runXFTPRcvLocalWorker c Worker {doWork} = do runXFTPOperation :: AgentConfig -> AM () runXFTPOperation AgentConfig {rcvFilesTTL} = withWork c doWork (`getNextRcvFileToDecrypt` rcvFilesTTL) $ - \f@RcvFile {rcvFileId, rcvFileEntityId, tmpPath} -> - decryptFile f `catchAgentError` rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath + \f@RcvFile {rcvFileId, rcvFileEntityId, tmpPath, redirect} -> + decryptFile f `catchAgentError` rcvWorkerInternalError c rcvFileId rcvFileEntityId (redirectEntityId <$> redirect) tmpPath decryptFile :: RcvFile -> AM () decryptFile RcvFile {rcvFileId, rcvFileEntityId, size, digest, key, nonce, tmpPath, saveFile, status, chunks, redirect} = do let CryptoFile savePath cfArgs = saveFile diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 07074e08f..d3eae9354 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -2525,7 +2525,7 @@ deleteRcvFile' :: DB.Connection -> DBRcvFileId -> IO () deleteRcvFile' db rcvFileId = DB.execute db "DELETE FROM rcv_files WHERE rcv_file_id = ?" (Only rcvFileId) -getNextRcvChunkToDownload :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Either StoreError (Maybe (RcvFileChunk, Bool))) +getNextRcvChunkToDownload :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Either StoreError (Maybe (RcvFileChunk, Bool, Maybe RcvFileId))) getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = do getWorkItem "rcv_file_download" getReplicaId getChunkData (markRcvFileFailed db . snd) where @@ -2549,7 +2549,7 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = d LIMIT 1 |] (host, port, keyHash, RFSReceiving, cutoffTs) - getChunkData :: (Int64, DBRcvFileId) -> IO (Either StoreError (RcvFileChunk, Bool)) + getChunkData :: (Int64, DBRcvFileId) -> IO (Either StoreError (RcvFileChunk, Bool, Maybe RcvFileId)) getChunkData (rcvFileChunkReplicaId, _fileId) = firstRow toChunk SEFileNotFound $ DB.query @@ -2558,7 +2558,7 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = d SELECT f.rcv_file_id, f.rcv_file_entity_id, f.user_id, c.rcv_file_chunk_id, c.chunk_no, c.chunk_size, c.digest, f.tmp_path, c.tmp_path, r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.delay, r.retries, - f.approved_relays + f.approved_relays, f.redirect_entity_id FROM rcv_file_chunk_replicas r JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id JOIN rcv_file_chunks c ON c.rcv_file_chunk_id = r.rcv_file_chunk_id @@ -2567,8 +2567,8 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = d |] (Only rcvFileChunkReplicaId) where - toChunk :: ((DBRcvFileId, RcvFileId, UserId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath) :. (Int64, ChunkReplicaId, C.APrivateAuthKey, Bool, Maybe Int64, Int) :. Only Bool) -> (RcvFileChunk, Bool) - toChunk ((rcvFileId, rcvFileEntityId, userId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath) :. (rcvChunkReplicaId, replicaId, replicaKey, received, delay, retries) :. (Only approvedRelays)) = + toChunk :: ((DBRcvFileId, RcvFileId, UserId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath) :. (Int64, ChunkReplicaId, C.APrivateAuthKey, Bool, Maybe Int64, Int) :. (Bool, Maybe RcvFileId)) -> (RcvFileChunk, Bool, Maybe RcvFileId) + toChunk ((rcvFileId, rcvFileEntityId, userId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath) :. (rcvChunkReplicaId, replicaId, replicaKey, received, delay, retries) :. (approvedRelays, redirectEntityId_)) = ( RcvFileChunk { rcvFileId, rcvFileEntityId, @@ -2581,7 +2581,8 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = d chunkTmpPath, replicas = [RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, delay, retries}] }, - approvedRelays + approvedRelays, + redirectEntityId_ ) getNextRcvFileToDecrypt :: DB.Connection -> NominalDiffTime -> IO (Either StoreError (Maybe RcvFile)) diff --git a/tests/AgentTests/SQLiteTests.hs b/tests/AgentTests/SQLiteTests.hs index 4a8d80dd4..22023cc96 100644 --- a/tests/AgentTests/SQLiteTests.hs +++ b/tests/AgentTests/SQLiteTests.hs @@ -741,7 +741,7 @@ testGetNextRcvChunkToDownload st = do show e `shouldContain` "ConversionFailed" DB.query_ db "SELECT rcv_file_id FROM rcv_files WHERE failed = 1" `shouldReturn` [Only (1 :: Int)] - Right (Just (RcvFileChunk {rcvFileEntityId}, _)) <- getNextRcvChunkToDownload db xftpServer1 86400 + Right (Just (RcvFileChunk {rcvFileEntityId}, _, Nothing)) <- getNextRcvChunkToDownload db xftpServer1 86400 rcvFileEntityId `shouldBe` fId2 testGetNextRcvFileToDecrypt :: SQLiteStore -> Expectation