xftp: agent error handling (#675)

- chunk download doesn't loop on permanent errors
- decryption errors are considered permanent - local worker doesn't retry
- update replica retries count; TODO: decide how this field should be used, or remove it
- rcv file Error status - prevents repeated reads of chunks for download and of files for decryption; it is also planned to be used for filtering on cleanup
- error string saved in separate field for debugging (not part of status type)
- agent event for rcv file errors
This commit is contained in:
spaced4ndy
2023-03-09 15:32:17 +04:00
committed by GitHub
parent 552759018e
commit deec963de8
6 changed files with 91 additions and 50 deletions
+50 -34
View File
@@ -13,10 +13,11 @@ module Simplex.FileTransfer.Agent
receiveFile,
addXFTPWorker,
-- Sending files
sendFile,
_sendFile,
)
where
import Control.Logger.Simple (logError)
import Control.Monad
import Control.Monad.Except
import Control.Monad.Reader
@@ -31,7 +32,7 @@ import Simplex.FileTransfer.Types
import Simplex.FileTransfer.Util (uniqueCombine)
import Simplex.Messaging.Agent.Client
import Simplex.Messaging.Agent.Env.SQLite
import Simplex.Messaging.Agent.Protocol (ACommand (FRCVD), AParty (..), AgentErrorType (INTERNAL))
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.Store
import Simplex.Messaging.Agent.Store.SQLite
@@ -39,7 +40,7 @@ import qualified Simplex.Messaging.Crypto.Lazy as LC
import Simplex.Messaging.Encoding
import Simplex.Messaging.Protocol (XFTPServer)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util (whenM)
import Simplex.Messaging.Util (tshow, whenM)
import UnliftIO
import UnliftIO.Directory
import qualified UnliftIO.Exception as E
@@ -77,31 +78,44 @@ runXFTPWorker c srv doWork = do
void . atomically $ readTMVar doWork
agentOperationBracket c AORcvNetwork throwWhenInactive runXftpOperation
where
noWorkToDo = void . atomically $ tryTakeTMVar doWork
runXftpOperation :: m ()
runXftpOperation = do
nextChunk <- withStore' c (`getNextRcvChunkToDownload` srv)
case nextChunk of
Nothing -> noWorkToDo
Just fc@RcvFileChunk {rcvChunkId, delay} -> do
Just RcvFileChunk {rcvFileId, replicas = []} -> workerInternalError c rcvFileId "chunk has no replicas"
Just fc@RcvFileChunk {rcvFileId, rcvChunkId, delay, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId} : _} -> do
ri <- asks $ reconnectInterval . config
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
withRetryInterval ri' $ \delay' loop ->
downloadFileChunk fc
`catchError` \_ -> do
withStore' c $ \db -> updateRcvFileChunkDelay db rcvChunkId delay'
-- TODO don't loop on permanent errors
-- TODO increase replica retries count
loop
noWorkToDo = void . atomically $ tryTakeTMVar doWork
downloadFileChunk :: RcvFileChunk -> m ()
downloadFileChunk RcvFileChunk {userId, rcvFileId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, replicas = replica : _} = do
downloadFileChunk fc replica
`catchError` retryOnError delay' loop (workerInternalError c rcvFileId . show)
where
-- Error handler for a failed chunk download: logs the error, then either
-- schedules another attempt (temporary errors) or passes the error to `done`.
--   chunkDelay - delay value saved for the chunk before the next attempt
--   loop       - retry continuation (at the call site it is the one supplied by withRetryInterval)
--   done       - handler invoked with any error that is not temporary
retryOnError :: Int -> m () -> (AgentErrorType -> m ()) -> AgentErrorType -> m ()
retryOnError chunkDelay loop done e = do
logError $ "XFTP worker error: " <> tshow e
-- only errors classified as temporary by temporaryAgentError are retried
if temporaryAgentError e
then retryLoop
else done e
where
retryLoop = do
-- save the new chunk delay and increment the replica retries counter within one withStore' call
withStore' c $ \db -> do
updateRcvFileChunkDelay db rcvChunkId chunkDelay
increaseRcvChunkReplicaRetries db rcvChunkReplicaId
-- end the AORcvNetwork operation, run the activity check, then begin the operation again before retrying
-- NOTE(review): presumably this gives the agent a chance to suspend between attempts - confirm
atomically $ endAgentOperation c AORcvNetwork
atomically $ throwWhenInactive c
atomically $ beginAgentOperation c AORcvNetwork
loop
downloadFileChunk :: RcvFileChunk -> RcvFileChunkReplica -> m ()
downloadFileChunk RcvFileChunk {userId, rcvFileId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath} replica = do
chunkPath <- uniqueCombine fileTmpPath $ show chunkNo
let chunkSpec = XFTPRcvChunkSpec chunkPath (unFileSize chunkSize) (unFileDigest digest)
agentXFTPDownloadChunk c userId replica chunkSpec
fileReceived <- withStore c $ \db -> runExceptT $ do
-- both actions can be done in a single store method
fd <- ExceptT $ updateRcvFileChunkReceived db (rcvChunkReplicaId replica) rcvChunkId rcvFileId chunkPath
let fileReceived = allChunksReceived fd
f <- ExceptT $ updateRcvFileChunkReceived db (rcvChunkReplicaId replica) rcvChunkId rcvFileId chunkPath
let fileReceived = allChunksReceived f
when fileReceived $
liftIO $ updateRcvFileStatus db rcvFileId RFSReceived
pure fileReceived
@@ -113,7 +127,14 @@ runXFTPWorker c srv doWork = do
-- True when every chunk of the file has at least one replica marked as received.
-- Note: `all` over an empty chunk list is True, so a file without chunks counts as received.
allChunksReceived :: RcvFile -> Bool
allChunksReceived RcvFile {chunks} =
all (\RcvFileChunk {replicas} -> any received replicas) chunks
downloadFileChunk _ = throwError $ INTERNAL "no replica"
-- | Handle a non-retryable worker failure for a received file:
-- first saves the error string for the file via 'updateRcvFileError',
-- then reports it to the client via 'notifyInternalError'.
workerInternalError :: AgentMonad m => AgentClient -> RcvFileId -> String -> m ()
workerInternalError c rcvFileId internalErrStr = do
withStore' c $ \db -> updateRcvFileError db rcvFileId internalErrStr
notifyInternalError c rcvFileId internalErrStr
-- | Write an FRCVERR event for the given file to the agent's subQ queue,
-- wrapping the error string in INTERNAL.
-- NOTE(review): the two empty strings in the tuple are presumably the correlation id
-- and the connection/entity id - confirm against other writers to subQ.
notifyInternalError :: (MonadUnliftIO m) => AgentClient -> RcvFileId -> String -> m ()
notifyInternalError AgentClient {subQ} rcvFileId internalErrStr = atomically $ writeTBQueue subQ ("", "", FRCVERR rcvFileId $ INTERNAL internalErrStr)
runXFTPLocalWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m ()
runXFTPLocalWorker c@AgentClient {subQ} doWork = do
@@ -126,14 +147,8 @@ runXFTPLocalWorker c@AgentClient {subQ} doWork = do
nextFile <- withStore' c getNextRcvFileToDecrypt
case nextFile of
Nothing -> noWorkToDo
Just fd -> do
ri <- asks $ reconnectInterval . config
withRetryInterval ri $ \_ loop ->
decryptFile fd
`catchError` \_ -> do
-- TODO don't loop on permanent errors
-- TODO fixed number of retries instead of exponential backoff?
loop
Just f@RcvFile {rcvFileId} ->
decryptFile f `catchError` (workerInternalError c rcvFileId . show)
noWorkToDo = void . atomically $ tryTakeTMVar doWork
decryptFile :: RcvFile -> m ()
decryptFile RcvFile {rcvFileId, key, nonce, tmpPath, saveDir, chunks} = do
@@ -183,15 +198,16 @@ runXFTPLocalWorker c@AgentClient {subQ} doWork = do
)
LB.empty
sendFile :: AgentMonad m => AgentClient -> UserId -> FilePath -> FilePath -> m Int64
sendFile c userId xftpPath filePath = do
-- _sendFile :: AgentMonad m => AgentClient -> UserId -> FilePath -> FilePath -> m Int64
_sendFile :: AgentClient -> UserId -> FilePath -> FilePath -> m Int64
_sendFile _c _userId _xftpPath _filePath = do
-- db: create file in status New without chunks
-- add local snd worker for encryption
-- return file id to client
undefined
runXFTPSndLocalWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m ()
runXFTPSndLocalWorker c@AgentClient {subQ} doWork = do
_runXFTPSndLocalWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m ()
_runXFTPSndLocalWorker _c doWork = do
forever $ do
void . atomically $ readTMVar doWork
runXftpOperation
@@ -202,8 +218,8 @@ runXFTPSndLocalWorker c@AgentClient {subQ} doWork = do
-- ? (or Encrypted to retry create? - see below)
-- with fixed retries (?) encryptFile
undefined
encryptFile :: SndFile -> m ()
encryptFile sndFile = do
_encryptFile :: SndFile -> m ()
_encryptFile _sndFile = do
-- if enc path exists, remove it
-- if enc path doesn't exist:
-- - choose enc path
@@ -225,8 +241,8 @@ runXFTPSndLocalWorker c@AgentClient {subQ} doWork = do
-- ? and add XFTP snd workers for uploading chunks.
undefined
runXFTPSndWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m ()
runXFTPSndWorker c srv doWork = do
_runXFTPSndWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m ()
_runXFTPSndWorker c _srv doWork = do
forever $ do
void . atomically $ readTMVar doWork
agentOperationBracket c AOSndNetwork throwWhenInactive runXftpOperation
@@ -238,8 +254,8 @@ runXFTPSndWorker c srv doWork = do
-- - with fixed retries, repeat N times:
-- check if other files are in upload, delay (see xftpSndFiles in XFTPAgent)
undefined
uploadFileChunk :: SndFileChunk -> m ()
uploadFileChunk sndFileChunk = do
_uploadFileChunk :: SndFileChunk -> m ()
_uploadFileChunk _sndFileChunk = do
-- add file id to xftpSndFiles
-- XFTP upload chunk
-- db: update replica status to Uploaded, return SndFile
+4 -2
View File
@@ -51,12 +51,12 @@ data RcvFile = RcvFile
}
deriving (Eq, Show)
-- TODO add error status?
-- | Status of a received file.
-- RFSError is the status written by the store function updateRcvFileError
-- (together with the error text).
data RcvFileStatus
= RFSReceiving
| RFSReceived
| RFSDecrypting
| RFSComplete
| RFSError
deriving (Eq, Show)
instance FromField RcvFileStatus where fromField = fromTextField_ textDecode
@@ -69,12 +69,14 @@ instance TextEncoding RcvFileStatus where
"received" -> Just RFSReceived
"decrypting" -> Just RFSDecrypting
"complete" -> Just RFSComplete
"error" -> Just RFSError
_ -> Nothing
textEncode = \case
RFSReceiving -> "receiving"
RFSReceived -> "received"
RFSDecrypting -> "decrypting"
RFSComplete -> "complete"
RFSError -> "error"
data RcvFileChunk = RcvFileChunk
{ userId :: Int64,
@@ -96,7 +98,7 @@ data RcvFileChunkReplica = RcvFileChunkReplica
replicaId :: ChunkReplicaId,
replicaKey :: C.APrivateSignKey,
received :: Bool,
acknowledged :: Bool,
-- acknowledged :: Bool,
retries :: Int
}
deriving (Eq, Show)
+6
View File
@@ -285,6 +285,7 @@ data ACommand (p :: AParty) where
ERR :: AgentErrorType -> ACommand Agent
SUSPENDED :: ACommand Agent
FRCVD :: RcvFileId -> FilePath -> ACommand Agent
FRCVERR :: RcvFileId -> AgentErrorType -> ACommand Agent
deriving instance Eq (ACommand p)
@@ -328,6 +329,7 @@ data ACommandTag (p :: AParty) where
ERR_ :: ACommandTag Agent
SUSPENDED_ :: ACommandTag Agent
FRCVD_ :: ACommandTag Agent
FRCVERR_ :: ACommandTag Agent
deriving instance Eq (ACommandTag p)
@@ -370,6 +372,7 @@ aCommandTag = \case
ERR _ -> ERR_
SUSPENDED -> SUSPENDED_
FRCVD {} -> FRCVD_
FRCVERR {} -> FRCVERR_
data QueueDirection = QDRcv | QDSnd
deriving (Eq, Show)
@@ -1292,6 +1295,7 @@ instance APartyI p => StrEncoding (ACommandTag p) where
ERR_ -> "ERR"
SUSPENDED_ -> "SUSPENDED"
FRCVD_ -> "FRCVD"
FRCVERR_ -> "FRCVERR"
strP = (\(ACmdTag _ t) -> checkParty t) <$?> strP
checkParty :: forall t p p'. (APartyI p, APartyI p') => t p' -> Either String (t p)
@@ -1343,6 +1347,7 @@ commandP binaryP =
ERR_ -> s (ERR <$> strP)
SUSPENDED_ -> pure SUSPENDED
FRCVD_ -> s (FRCVD <$> A.decimal <* A.space <*> strP)
FRCVERR_ -> s (FRCVERR <$> A.decimal <* A.space <*> strP)
where
s :: Parser a -> Parser a
s p = A.space *> p
@@ -1397,6 +1402,7 @@ serializeCommand = \case
OK -> s OK_
SUSPENDED -> s SUSPENDED_
FRCVD fId fPath -> s (FRCVD_, Str $ bshow fId, fPath)
FRCVERR fId e -> s (FRCVERR_, Str $ bshow fId, e)
where
s :: StrEncoding a => a -> ByteString
s = strEncode
+27 -12
View File
@@ -127,8 +127,10 @@ module Simplex.Messaging.Agent.Store.SQLite
createRcvFile,
getRcvFile,
updateRcvFileChunkDelay,
increaseRcvChunkReplicaRetries,
updateRcvFileChunkReceived,
updateRcvFileStatus,
updateRcvFileError,
updateRcvFileComplete,
updateRcvFileChunkReplicaRetries,
getNextRcvChunkToDownload,
@@ -1809,7 +1811,7 @@ getRcvFile db rcvFileId = runExceptT $ do
db
[sql|
SELECT
r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.acknowledged, r.retries,
r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.retries,
s.xftp_host, s.xftp_port, s.xftp_key_hash
FROM rcv_file_chunk_replicas r
JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
@@ -1817,16 +1819,21 @@ getRcvFile db rcvFileId = runExceptT $ do
|]
(Only chunkId)
where
toReplica :: (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Bool, Int, NonEmpty TransportHost, ServiceName, C.KeyHash) -> RcvFileChunkReplica
toReplica (rcvChunkReplicaId, replicaId, replicaKey, received, acknowledged, retries, host, port, keyHash) =
toReplica :: (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Int, NonEmpty TransportHost, ServiceName, C.KeyHash) -> RcvFileChunkReplica
toReplica (rcvChunkReplicaId, replicaId, replicaKey, received, retries, host, port, keyHash) =
let server = XFTPServer host port keyHash
in RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, acknowledged, retries}
in RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, retries}
-- | Set the delay column of the chunk with the given rcv_file_chunk_id
-- and refresh its updated_at timestamp to the current time.
updateRcvFileChunkDelay :: DB.Connection -> Int64 -> Int -> IO ()
updateRcvFileChunkDelay db chunkId delay = do
updatedAt <- getCurrentTime
DB.execute db "UPDATE rcv_file_chunks SET delay = ?, updated_at = ? WHERE rcv_file_chunk_id = ?" (delay, updatedAt, chunkId)
-- | Increment the retries counter by one for the replica with the given
-- rcv_file_chunk_replica_id and refresh its updated_at timestamp.
-- The increment is done in SQL (retries = retries + 1), not from a value read earlier.
increaseRcvChunkReplicaRetries :: DB.Connection -> Int64 -> IO ()
increaseRcvChunkReplicaRetries db replicaId = do
updatedAt <- getCurrentTime
DB.execute db "UPDATE rcv_file_chunk_replicas SET retries = retries + 1, updated_at = ? WHERE rcv_file_chunk_replica_id = ?" (updatedAt, replicaId)
updateRcvFileChunkReceived :: DB.Connection -> Int64 -> Int64 -> RcvFileId -> FilePath -> IO (Either StoreError RcvFile)
updateRcvFileChunkReceived db rId cId fId chunkTmpPath = do
updatedAt <- getCurrentTime
@@ -1839,6 +1846,11 @@ updateRcvFileStatus db rcvFileId status = do
updatedAt <- getCurrentTime
DB.execute db "UPDATE rcv_files SET status = ?, updated_at = ? WHERE rcv_file_id = ?" (status, updatedAt, rcvFileId)
-- | Mark a received file as failed: stores the error text in the error column,
-- sets status to RFSError and refreshes updated_at, all in one UPDATE statement.
updateRcvFileError :: DB.Connection -> RcvFileId -> String -> IO ()
updateRcvFileError db rcvFileId errStr = do
updatedAt <- getCurrentTime
DB.execute db "UPDATE rcv_files SET error = ?, status = ?, updated_at = ? WHERE rcv_file_id = ?" (errStr, RFSError, updatedAt, rcvFileId)
updateRcvFileComplete :: DB.Connection -> RcvFileId -> FilePath -> IO ()
updateRcvFileComplete db rcvFileId savePath = do
updatedAt <- getCurrentTime
@@ -1857,20 +1869,20 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} = do
[sql|
SELECT
f.user_id, f.rcv_file_id, c.rcv_file_chunk_id, c.chunk_no, c.chunk_size, c.digest, f.tmp_path, c.tmp_path, c.delay,
r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.acknowledged, r.retries
r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.retries
FROM rcv_file_chunk_replicas r
JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
JOIN rcv_file_chunks c ON c.rcv_file_chunk_id = r.rcv_file_chunk_id
JOIN rcv_files f ON f.rcv_file_id = c.rcv_file_id
WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ?
AND r.received = 0 AND r.replica_number = 1
AND r.received = 0 AND r.replica_number = 1 AND f.status = ?
ORDER BY r.created_at ASC
LIMIT 1
|]
(host, port, keyHash)
(host, port, keyHash, RFSReceiving)
where
toChunk :: ((UserId, RcvFileId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath, Maybe Int) :. (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Bool, Int)) -> RcvFileChunk
toChunk ((userId, rcvFileId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath, delay) :. (rcvChunkReplicaId, replicaId, replicaKey, received, acknowledged, retries)) =
toChunk :: ((UserId, RcvFileId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath, Maybe Int) :. (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Int)) -> RcvFileChunk
toChunk ((userId, rcvFileId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath, delay) :. (rcvChunkReplicaId, replicaId, replicaKey, received, retries)) =
RcvFileChunk
{ userId,
rcvFileId,
@@ -1881,7 +1893,7 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} = do
fileTmpPath,
chunkTmpPath,
delay,
replicas = [RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, acknowledged, retries}]
replicas = [RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, retries}]
}
getNextRcvFileToDecrypt :: DB.Connection -> IO (Maybe RcvFile)
@@ -1896,15 +1908,18 @@ getNextRcvFileToDecrypt db = do
getPendingRcvFilesServers :: DB.Connection -> IO [XFTPServer]
getPendingRcvFilesServers db = do
map toServer
<$> DB.query_
<$> DB.query
db
[sql|
SELECT DISTINCT
s.xftp_host, s.xftp_port, s.xftp_key_hash
FROM rcv_file_chunk_replicas r
JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
WHERE r.received = 0 AND r.replica_number = 1
JOIN rcv_file_chunks c ON c.rcv_file_chunk_id = r.rcv_file_chunk_id
JOIN rcv_files f ON f.rcv_file_id = c.rcv_file_id
WHERE r.received = 0 AND r.replica_number = 1 AND f.status = ?
|]
(Only RFSReceiving)
where
toServer :: (NonEmpty TransportHost, ServiceName, C.KeyHash) -> XFTPServer
toServer (host, port, keyHash) = XFTPServer host port keyHash
@@ -30,6 +30,7 @@ CREATE TABLE rcv_files (
save_dir TEXT NOT NULL,
save_path TEXT,
status TEXT NOT NULL,
error TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
@@ -58,7 +59,7 @@ CREATE TABLE rcv_file_chunk_replicas (
replica_id BLOB NOT NULL,
replica_key BLOB NOT NULL,
received INTEGER NOT NULL DEFAULT 0,
acknowledged INTEGER NOT NULL DEFAULT 0,
-- acknowledged INTEGER NOT NULL DEFAULT 0,
retries INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
@@ -304,6 +304,7 @@ CREATE TABLE rcv_files(
save_dir TEXT NOT NULL,
save_path TEXT,
status TEXT NOT NULL,
error TEXT,
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);
@@ -328,7 +329,7 @@ CREATE TABLE rcv_file_chunk_replicas(
replica_id BLOB NOT NULL,
replica_key BLOB NOT NULL,
received INTEGER NOT NULL DEFAULT 0,
acknowledged INTEGER NOT NULL DEFAULT 0,
-- acknowledged INTEGER NOT NULL DEFAULT 0,
retries INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now'))