mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-15 03:05:08 +00:00
xftp: report receive file error with redirected file ID, when redirect is present (#1304)
* xftp: report receive file error with redirected file ID, when redirect is present * fix test
This commit is contained in:
@@ -45,7 +45,7 @@ import Data.List (foldl', partition, sortOn)
|
||||
import qualified Data.List.NonEmpty as L
|
||||
import Data.Map.Strict (Map)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (mapMaybe)
|
||||
import Data.Maybe (fromMaybe, mapMaybe)
|
||||
import qualified Data.Set as S
|
||||
import Data.Text (Text)
|
||||
import Data.Time.Clock (getCurrentTime)
|
||||
@@ -190,8 +190,9 @@ runXFTPRcvWorker c srv Worker {doWork} = do
|
||||
runXFTPOperation :: AgentConfig -> AM ()
|
||||
runXFTPOperation AgentConfig {rcvFilesTTL, reconnectInterval = ri, xftpConsecutiveRetries} =
|
||||
withWork c doWork (\db -> getNextRcvChunkToDownload db srv rcvFilesTTL) $ \case
|
||||
(RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []}, _) -> rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) (INTERNAL "chunk has no replicas")
|
||||
(fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, digest, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, delay} : _}, approvedRelays) -> do
|
||||
(RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []}, _, redirectEntityId_) ->
|
||||
rcvWorkerInternalError c rcvFileId rcvFileEntityId redirectEntityId_ (Just fileTmpPath) (INTERNAL "chunk has no replicas")
|
||||
(fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, digest, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, delay} : _}, approvedRelays, redirectEntityId_) -> do
|
||||
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
|
||||
withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do
|
||||
liftIO $ waitWhileSuspended c
|
||||
@@ -202,7 +203,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do
|
||||
where
|
||||
retryLoop loop e replicaDelay = do
|
||||
flip catchAgentError (\_ -> pure ()) $ do
|
||||
when (serverHostError e) $ notify c rcvFileEntityId $ RFWARN e
|
||||
when (serverHostError e) $ notify c (fromMaybe rcvFileEntityId redirectEntityId_) (RFWARN e)
|
||||
liftIO $ closeXFTPServerClient c userId server digest
|
||||
withStore' c $ \db -> updateRcvChunkReplicaDelay db rcvChunkReplicaId replicaDelay
|
||||
liftIO $ assertAgentForeground c
|
||||
@@ -211,7 +212,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do
|
||||
atomically . incXFTPServerStat c userId srv $ case e of
|
||||
XFTP _ XFTP.AUTH -> downloadAuthErrs
|
||||
_ -> downloadErrs
|
||||
rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) e
|
||||
rcvWorkerInternalError c rcvFileId rcvFileEntityId redirectEntityId_ (Just fileTmpPath) e
|
||||
downloadFileChunk :: RcvFileChunk -> RcvFileChunkReplica -> Bool -> AM ()
|
||||
downloadFileChunk RcvFileChunk {userId, rcvFileId, rcvFileEntityId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath} replica approvedRelays = do
|
||||
unlessM ((approvedRelays ||) <$> ipAddressProtected') $ throwE $ FILE NOT_APPROVED
|
||||
@@ -262,11 +263,11 @@ retryOnError name loop done e = do
|
||||
then loop
|
||||
else done
|
||||
|
||||
rcvWorkerInternalError :: AgentClient -> DBRcvFileId -> RcvFileId -> Maybe FilePath -> AgentErrorType -> AM ()
|
||||
rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath err = do
|
||||
rcvWorkerInternalError :: AgentClient -> DBRcvFileId -> RcvFileId -> Maybe RcvFileId -> Maybe FilePath -> AgentErrorType -> AM ()
|
||||
rcvWorkerInternalError c rcvFileId rcvFileEntityId redirectEntityId_ tmpPath err = do
|
||||
lift $ forM_ tmpPath (removePath <=< toFSFilePath)
|
||||
withStore' c $ \db -> updateRcvFileError db rcvFileId (show err)
|
||||
notify c rcvFileEntityId $ RFERR err
|
||||
notify c (fromMaybe rcvFileEntityId redirectEntityId_) (RFERR err)
|
||||
|
||||
runXFTPRcvLocalWorker :: AgentClient -> Worker -> AM ()
|
||||
runXFTPRcvLocalWorker c Worker {doWork} = do
|
||||
@@ -279,8 +280,8 @@ runXFTPRcvLocalWorker c Worker {doWork} = do
|
||||
runXFTPOperation :: AgentConfig -> AM ()
|
||||
runXFTPOperation AgentConfig {rcvFilesTTL} =
|
||||
withWork c doWork (`getNextRcvFileToDecrypt` rcvFilesTTL) $
|
||||
\f@RcvFile {rcvFileId, rcvFileEntityId, tmpPath} ->
|
||||
decryptFile f `catchAgentError` rcvWorkerInternalError c rcvFileId rcvFileEntityId tmpPath
|
||||
\f@RcvFile {rcvFileId, rcvFileEntityId, tmpPath, redirect} ->
|
||||
decryptFile f `catchAgentError` rcvWorkerInternalError c rcvFileId rcvFileEntityId (redirectEntityId <$> redirect) tmpPath
|
||||
decryptFile :: RcvFile -> AM ()
|
||||
decryptFile RcvFile {rcvFileId, rcvFileEntityId, size, digest, key, nonce, tmpPath, saveFile, status, chunks, redirect} = do
|
||||
let CryptoFile savePath cfArgs = saveFile
|
||||
|
||||
@@ -2525,7 +2525,7 @@ deleteRcvFile' :: DB.Connection -> DBRcvFileId -> IO ()
|
||||
deleteRcvFile' db rcvFileId =
|
||||
DB.execute db "DELETE FROM rcv_files WHERE rcv_file_id = ?" (Only rcvFileId)
|
||||
|
||||
getNextRcvChunkToDownload :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Either StoreError (Maybe (RcvFileChunk, Bool)))
|
||||
getNextRcvChunkToDownload :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Either StoreError (Maybe (RcvFileChunk, Bool, Maybe RcvFileId)))
|
||||
getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = do
|
||||
getWorkItem "rcv_file_download" getReplicaId getChunkData (markRcvFileFailed db . snd)
|
||||
where
|
||||
@@ -2549,7 +2549,7 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = d
|
||||
LIMIT 1
|
||||
|]
|
||||
(host, port, keyHash, RFSReceiving, cutoffTs)
|
||||
getChunkData :: (Int64, DBRcvFileId) -> IO (Either StoreError (RcvFileChunk, Bool))
|
||||
getChunkData :: (Int64, DBRcvFileId) -> IO (Either StoreError (RcvFileChunk, Bool, Maybe RcvFileId))
|
||||
getChunkData (rcvFileChunkReplicaId, _fileId) =
|
||||
firstRow toChunk SEFileNotFound $
|
||||
DB.query
|
||||
@@ -2558,7 +2558,7 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = d
|
||||
SELECT
|
||||
f.rcv_file_id, f.rcv_file_entity_id, f.user_id, c.rcv_file_chunk_id, c.chunk_no, c.chunk_size, c.digest, f.tmp_path, c.tmp_path,
|
||||
r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.delay, r.retries,
|
||||
f.approved_relays
|
||||
f.approved_relays, f.redirect_entity_id
|
||||
FROM rcv_file_chunk_replicas r
|
||||
JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
|
||||
JOIN rcv_file_chunks c ON c.rcv_file_chunk_id = r.rcv_file_chunk_id
|
||||
@@ -2567,8 +2567,8 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = d
|
||||
|]
|
||||
(Only rcvFileChunkReplicaId)
|
||||
where
|
||||
toChunk :: ((DBRcvFileId, RcvFileId, UserId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath) :. (Int64, ChunkReplicaId, C.APrivateAuthKey, Bool, Maybe Int64, Int) :. Only Bool) -> (RcvFileChunk, Bool)
|
||||
toChunk ((rcvFileId, rcvFileEntityId, userId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath) :. (rcvChunkReplicaId, replicaId, replicaKey, received, delay, retries) :. (Only approvedRelays)) =
|
||||
toChunk :: ((DBRcvFileId, RcvFileId, UserId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath) :. (Int64, ChunkReplicaId, C.APrivateAuthKey, Bool, Maybe Int64, Int) :. (Bool, Maybe RcvFileId)) -> (RcvFileChunk, Bool, Maybe RcvFileId)
|
||||
toChunk ((rcvFileId, rcvFileEntityId, userId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath) :. (rcvChunkReplicaId, replicaId, replicaKey, received, delay, retries) :. (approvedRelays, redirectEntityId_)) =
|
||||
( RcvFileChunk
|
||||
{ rcvFileId,
|
||||
rcvFileEntityId,
|
||||
@@ -2581,7 +2581,8 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = d
|
||||
chunkTmpPath,
|
||||
replicas = [RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, delay, retries}]
|
||||
},
|
||||
approvedRelays
|
||||
approvedRelays,
|
||||
redirectEntityId_
|
||||
)
|
||||
|
||||
getNextRcvFileToDecrypt :: DB.Connection -> NominalDiffTime -> IO (Either StoreError (Maybe RcvFile))
|
||||
|
||||
@@ -741,7 +741,7 @@ testGetNextRcvChunkToDownload st = do
|
||||
show e `shouldContain` "ConversionFailed"
|
||||
DB.query_ db "SELECT rcv_file_id FROM rcv_files WHERE failed = 1" `shouldReturn` [Only (1 :: Int)]
|
||||
|
||||
Right (Just (RcvFileChunk {rcvFileEntityId}, _)) <- getNextRcvChunkToDownload db xftpServer1 86400
|
||||
Right (Just (RcvFileChunk {rcvFileEntityId}, _, Nothing)) <- getNextRcvChunkToDownload db xftpServer1 86400
|
||||
rcvFileEntityId `shouldBe` fId2
|
||||
|
||||
testGetNextRcvFileToDecrypt :: SQLiteStore -> Expectation
|
||||
|
||||
Reference in New Issue
Block a user