mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 14:16:00 +00:00
agent: separate type for agent file errors (#1185)
This commit is contained in:
@@ -57,6 +57,7 @@ import Simplex.FileTransfer.Protocol (FileParty (..), SFileParty (..))
|
||||
import Simplex.FileTransfer.Transport (XFTPRcvChunkSpec (..))
|
||||
import qualified Simplex.FileTransfer.Transport as XFTP
|
||||
import Simplex.FileTransfer.Types
|
||||
import qualified Simplex.FileTransfer.Types as FT
|
||||
import Simplex.FileTransfer.Util (removePath, uniqueCombine)
|
||||
import Simplex.Messaging.Agent.Client
|
||||
import Simplex.Messaging.Agent.Env.SQLite
|
||||
@@ -71,6 +72,7 @@ import qualified Simplex.Messaging.Crypto.Lazy as LC
|
||||
import Simplex.Messaging.Encoding
|
||||
import Simplex.Messaging.Encoding.String (strDecode, strEncode)
|
||||
import Simplex.Messaging.Protocol (EntityId, XFTPServer)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
import Simplex.Messaging.Util (catchAll_, liftError, tshow, unlessM, whenM)
|
||||
import System.FilePath (takeFileName, (</>))
|
||||
import UnliftIO
|
||||
@@ -175,7 +177,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do
|
||||
runXFTPOperation cfg
|
||||
where
|
||||
runXFTPOperation :: AgentConfig -> AM ()
|
||||
runXFTPOperation AgentConfig {rcvFilesTTL, reconnectInterval = ri, xftpNotifyErrsOnRetry = notifyOnRetry, xftpConsecutiveRetries} =
|
||||
runXFTPOperation AgentConfig {rcvFilesTTL, reconnectInterval = ri, xftpConsecutiveRetries} =
|
||||
withWork c doWork (\db -> getNextRcvChunkToDownload db srv rcvFilesTTL) $ \case
|
||||
(RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []}, _) -> rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) (INTERNAL "chunk has no replicas")
|
||||
(fc@RcvFileChunk {userId, rcvFileId, rcvFileEntityId, digest, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, server, delay} : _}, approvedRelays) -> do
|
||||
@@ -187,7 +189,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do
|
||||
where
|
||||
retryLoop loop e replicaDelay = do
|
||||
flip catchAgentError (\_ -> pure ()) $ do
|
||||
when notifyOnRetry $ notify c rcvFileEntityId $ RFERR e
|
||||
when (serverHostError e) $ notify c rcvFileEntityId $ RFWARN e
|
||||
liftIO $ closeXFTPServerClient c userId server digest
|
||||
withStore' c $ \db -> updateRcvChunkReplicaDelay db rcvChunkReplicaId replicaDelay
|
||||
atomically $ assertAgentForeground c
|
||||
@@ -195,7 +197,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do
|
||||
retryDone = rcvWorkerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath)
|
||||
downloadFileChunk :: RcvFileChunk -> RcvFileChunkReplica -> Bool -> AM ()
|
||||
downloadFileChunk RcvFileChunk {userId, rcvFileId, rcvFileEntityId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath} replica approvedRelays = do
|
||||
unlessM ((approvedRelays ||) <$> ipAddressProtected') $ throwE $ XFTP "" XFTP.NOT_APPROVED
|
||||
unlessM ((approvedRelays ||) <$> ipAddressProtected') $ throwE $ FILE NOT_APPROVED
|
||||
fsFileTmpPath <- lift $ toFSFilePath fileTmpPath
|
||||
chunkPath <- uniqueCombine fsFileTmpPath $ show chunkNo
|
||||
let chunkSpec = XFTPRcvChunkSpec chunkPath (unFileSize chunkSize) (unFileDigest digest)
|
||||
@@ -236,7 +238,7 @@ withRetryIntervalLimit maxN ri action =
|
||||
retryOnError :: Text -> AM a -> AM a -> AgentErrorType -> AM a
|
||||
retryOnError name loop done e = do
|
||||
logError $ name <> " error: " <> tshow e
|
||||
if temporaryAgentError e
|
||||
if temporaryOrHostError e
|
||||
then loop
|
||||
else done
|
||||
|
||||
@@ -272,7 +274,7 @@ runXFTPRcvLocalWorker c Worker {doWork} = do
|
||||
encDigest <- liftIO $ LC.sha512Hash <$> readChunks chunkPaths
|
||||
when (FileDigest encDigest /= digest) $ throwE $ XFTP "" XFTP.DIGEST
|
||||
let destFile = CryptoFile fsSavePath cfArgs
|
||||
void $ liftError (INTERNAL . show) $ decryptChunks encSize chunkPaths key nonce $ \_ -> pure destFile
|
||||
void $ liftError (FILE . FILE_IO . show) $ decryptChunks encSize chunkPaths key nonce $ \_ -> pure destFile
|
||||
case redirect of
|
||||
Nothing -> do
|
||||
notify c rcvFileEntityId $ RFDONE fsSavePath
|
||||
@@ -285,13 +287,13 @@ runXFTPRcvLocalWorker c Worker {doWork} = do
|
||||
atomically $ waitUntilForeground c
|
||||
withStore' c (`updateRcvFileComplete` rcvFileId)
|
||||
-- proceed with redirect
|
||||
yaml <- liftError (INTERNAL . show) (CF.readFile $ CryptoFile fsSavePath cfArgs) `agentFinally` (lift $ toFSFilePath fsSavePath >>= removePath)
|
||||
yaml <- liftError (FILE . FILE_IO . show) (CF.readFile $ CryptoFile fsSavePath cfArgs) `agentFinally` (lift $ toFSFilePath fsSavePath >>= removePath)
|
||||
next@FileDescription {chunks = nextChunks} <- case strDecode (LB.toStrict yaml) of
|
||||
-- TODO switch to another error constructor
|
||||
Left _ -> throwE . XFTP "" $ XFTP.REDIRECT "decode error"
|
||||
Left _ -> throwE . FILE $ REDIRECT "decode error"
|
||||
Right (ValidFileDescription fd@FileDescription {size = dstSize, digest = dstDigest})
|
||||
| dstSize /= redirectSize -> throwE . XFTP "" $ XFTP.REDIRECT "size mismatch"
|
||||
| dstDigest /= redirectDigest -> throwE . XFTP "" $ XFTP.REDIRECT "digest mismatch"
|
||||
| dstSize /= redirectSize -> throwE . FILE $ REDIRECT "size mismatch"
|
||||
| dstDigest /= redirectDigest -> throwE . FILE $ REDIRECT "digest mismatch"
|
||||
| otherwise -> pure fd
|
||||
-- register and download chunks from the actual file
|
||||
withStore c $ \db -> updateRcvFileRedirect db redirectDbId next
|
||||
@@ -349,7 +351,7 @@ xftpSendDescription' c userId (ValidFileDescription fdDirect@FileDescription {si
|
||||
let directYaml = prefixPath </> "direct.yaml"
|
||||
cfArgs <- atomically $ CF.randomArgs g
|
||||
let file = CryptoFile directYaml (Just cfArgs)
|
||||
liftError (INTERNAL . show) $ CF.writeFile file (LB.fromStrict $ strEncode fdDirect)
|
||||
liftError (FILE . FILE_IO . show) $ CF.writeFile file (LB.fromStrict $ strEncode fdDirect)
|
||||
key <- atomically $ C.randomSbKey g
|
||||
nonce <- atomically $ C.randomCbNonce g
|
||||
fId <- withStore c $ \db -> createSndFile db g userId file numRecipients relPrefixPath key nonce $ Just RedirectFileInfo {size, digest}
|
||||
@@ -377,11 +379,11 @@ runXFTPSndPrepareWorker c Worker {doWork} = do
|
||||
runXFTPOperation cfg@AgentConfig {sndFilesTTL} =
|
||||
withWork c doWork (`getNextSndFileToPrepare` sndFilesTTL) $
|
||||
\f@SndFile {sndFileId, sndFileEntityId, prefixPath} ->
|
||||
prepareFile cfg f `catchAgentError` (sndWorkerInternalError c sndFileId sndFileEntityId prefixPath . show)
|
||||
prepareFile cfg f `catchAgentError` sndWorkerInternalError c sndFileId sndFileEntityId prefixPath
|
||||
prepareFile :: AgentConfig -> SndFile -> AM ()
|
||||
prepareFile _ SndFile {prefixPath = Nothing} =
|
||||
throwE $ INTERNAL "no prefix path"
|
||||
prepareFile cfg sndFile@SndFile {sndFileId, userId, prefixPath = Just ppath, status} = do
|
||||
prepareFile cfg sndFile@SndFile {sndFileId, sndFileEntityId, userId, prefixPath = Just ppath, status} = do
|
||||
SndFile {numRecipients, chunks} <-
|
||||
if status /= SFSEncrypted -- status is SFSNew or SFSEncrypting
|
||||
then do
|
||||
@@ -406,17 +408,17 @@ runXFTPSndPrepareWorker c Worker {doWork} = do
|
||||
let CryptoFile {filePath} = srcFile
|
||||
fileName = takeFileName filePath
|
||||
fileSize <- liftIO $ fromInteger <$> CF.getFileContentsSize srcFile
|
||||
when (fileSize > maxFileSizeHard) $ throwE $ INTERNAL "max file size exceeded"
|
||||
when (fileSize > maxFileSizeHard) $ throwE $ FILE FT.SIZE
|
||||
let fileHdr = smpEncode FileHeader {fileName, fileExtra = Nothing}
|
||||
fileSize' = fromIntegral (B.length fileHdr) + fileSize
|
||||
payloadSize = fileSize' + fileSizeLen + authTagSize
|
||||
chunkSizes <- case redirect of
|
||||
Nothing -> pure $ prepareChunkSizes payloadSize
|
||||
Just _ -> case singleChunkSize payloadSize of
|
||||
Nothing -> throwE $ INTERNAL "max file size exceeded for redirect"
|
||||
Nothing -> throwE $ FILE FT.SIZE
|
||||
Just chunkSize -> pure [chunkSize]
|
||||
let encSize = sum $ map fromIntegral chunkSizes
|
||||
void $ liftError (INTERNAL . show) $ encryptFile srcFile fileHdr key nonce fileSize' encSize fsEncPath
|
||||
void $ liftError (FILE . FILE_IO . show) $ encryptFile srcFile fileHdr key nonce fileSize' encSize fsEncPath
|
||||
digest <- liftIO $ LC.sha512Hash <$> LB.readFile fsEncPath
|
||||
let chunkSpecs = prepareChunkSpecs fsEncPath chunkSizes
|
||||
chunkDigests <- liftIO $ mapM getChunkDigest chunkSpecs
|
||||
@@ -430,24 +432,32 @@ runXFTPSndPrepareWorker c Worker {doWork} = do
|
||||
where
|
||||
tryCreate = do
|
||||
usedSrvs <- newTVarIO ([] :: [XFTPServer])
|
||||
withRetryInterval (riFast ri) $ \_ loop -> do
|
||||
let AgentClient {xftpServers} = c
|
||||
userSrvCount <- length <$> atomically (TM.lookup userId xftpServers)
|
||||
withRetryIntervalCount (riFast ri) $ \n _ loop -> do
|
||||
liftIO $ waitForUserNetwork c
|
||||
let triedAllSrvs = n > userSrvCount
|
||||
createWithNextSrv usedSrvs
|
||||
`catchAgentError` \e -> retryOnError "XFTP prepare worker" (retryLoop loop) (throwE e) e
|
||||
`catchAgentError` \e -> retryOnError "XFTP prepare worker" (retryLoop loop triedAllSrvs e) (throwE e) e
|
||||
where
|
||||
retryLoop loop = atomically (assertAgentForeground c) >> loop
|
||||
-- we don't do closeXFTPServerClient here to not risk closing connection for concurrent chunk upload
|
||||
retryLoop loop triedAllSrvs e = do
|
||||
flip catchAgentError (\_ -> pure ()) $ do
|
||||
when (triedAllSrvs && serverHostError e) $ notify c sndFileEntityId $ SFWARN e
|
||||
atomically $ assertAgentForeground c
|
||||
loop
|
||||
createWithNextSrv usedSrvs = do
|
||||
deleted <- withStore' c $ \db -> getSndFileDeleted db sndFileId
|
||||
when deleted $ throwE $ INTERNAL "file deleted, aborting chunk creation"
|
||||
when deleted $ throwE $ FILE NO_FILE
|
||||
withNextSrv c userId usedSrvs [] $ \srvAuth -> do
|
||||
replica <- agentXFTPNewChunk c ch numRecipients' srvAuth
|
||||
pure (replica, srvAuth)
|
||||
|
||||
sndWorkerInternalError :: AgentClient -> DBSndFileId -> SndFileId -> Maybe FilePath -> String -> AM ()
|
||||
sndWorkerInternalError c sndFileId sndFileEntityId prefixPath internalErrStr = do
|
||||
sndWorkerInternalError :: AgentClient -> DBSndFileId -> SndFileId -> Maybe FilePath -> AgentErrorType -> AM ()
|
||||
sndWorkerInternalError c sndFileId sndFileEntityId prefixPath err = do
|
||||
lift . forM_ prefixPath $ removePath <=< toFSFilePath
|
||||
withStore' c $ \db -> updateSndFileError db sndFileId internalErrStr
|
||||
notify c sndFileEntityId $ SFERR $ INTERNAL internalErrStr
|
||||
withStore' c $ \db -> updateSndFileError db sndFileId (show err)
|
||||
notify c sndFileEntityId $ SFERR err
|
||||
|
||||
runXFTPSndWorker :: AgentClient -> XFTPServer -> Worker -> AM ()
|
||||
runXFTPSndWorker c srv Worker {doWork} = do
|
||||
@@ -458,9 +468,9 @@ runXFTPSndWorker c srv Worker {doWork} = do
|
||||
runXFTPOperation cfg
|
||||
where
|
||||
runXFTPOperation :: AgentConfig -> AM ()
|
||||
runXFTPOperation cfg@AgentConfig {sndFilesTTL, reconnectInterval = ri, xftpNotifyErrsOnRetry = notifyOnRetry, xftpConsecutiveRetries} = do
|
||||
runXFTPOperation cfg@AgentConfig {sndFilesTTL, reconnectInterval = ri, xftpConsecutiveRetries} = do
|
||||
withWork c doWork (\db -> getNextSndChunkToUpload db srv sndFilesTTL) $ \case
|
||||
SndFileChunk {sndFileId, sndFileEntityId, filePrefixPath, replicas = []} -> sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) "chunk has no replicas"
|
||||
SndFileChunk {sndFileId, sndFileEntityId, filePrefixPath, replicas = []} -> sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) (INTERNAL "chunk has no replicas")
|
||||
fc@SndFileChunk {userId, sndFileId, sndFileEntityId, filePrefixPath, digest, replicas = replica@SndFileChunkReplica {sndChunkReplicaId, server, delay} : _} -> do
|
||||
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
|
||||
withRetryIntervalLimit xftpConsecutiveRetries ri' $ \delay' loop -> do
|
||||
@@ -470,17 +480,17 @@ runXFTPSndWorker c srv Worker {doWork} = do
|
||||
where
|
||||
retryLoop loop e replicaDelay = do
|
||||
flip catchAgentError (\_ -> pure ()) $ do
|
||||
when notifyOnRetry $ notify c sndFileEntityId $ SFERR e
|
||||
when (serverHostError e) $ notify c sndFileEntityId $ SFWARN e
|
||||
liftIO $ closeXFTPServerClient c userId server digest
|
||||
withStore' c $ \db -> updateSndChunkReplicaDelay db sndChunkReplicaId replicaDelay
|
||||
atomically $ assertAgentForeground c
|
||||
loop
|
||||
retryDone e = sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath) (show e)
|
||||
retryDone = sndWorkerInternalError c sndFileId sndFileEntityId (Just filePrefixPath)
|
||||
uploadFileChunk :: AgentConfig -> SndFileChunk -> SndFileChunkReplica -> AM ()
|
||||
uploadFileChunk AgentConfig {xftpMaxRecipientsPerRequest = maxRecipients} sndFileChunk@SndFileChunk {sndFileId, userId, chunkSpec = chunkSpec@XFTPChunkSpec {filePath}, digest = chunkDigest} replica = do
|
||||
replica'@SndFileChunkReplica {sndChunkReplicaId} <- addRecipients sndFileChunk replica
|
||||
fsFilePath <- lift $ toFSFilePath filePath
|
||||
unlessM (doesFileExist fsFilePath) $ throwE $ INTERNAL "encrypted file doesn't exist on upload"
|
||||
unlessM (doesFileExist fsFilePath) $ throwE $ FILE NO_FILE
|
||||
let chunkSpec' = chunkSpec {filePath = fsFilePath} :: XFTPChunkSpec
|
||||
atomically $ assertAgentForeground c
|
||||
agentXFTPUploadChunk c userId chunkDigest replica' chunkSpec'
|
||||
@@ -624,7 +634,7 @@ runXFTPDelWorker c srv Worker {doWork} = do
|
||||
runXFTPOperation cfg
|
||||
where
|
||||
runXFTPOperation :: AgentConfig -> AM ()
|
||||
runXFTPOperation AgentConfig {rcvFilesTTL, reconnectInterval = ri, xftpNotifyErrsOnRetry = notifyOnRetry, xftpConsecutiveRetries} = do
|
||||
runXFTPOperation AgentConfig {rcvFilesTTL, reconnectInterval = ri, xftpConsecutiveRetries} = do
|
||||
-- no point in deleting files older than rcv ttl, as they will be expired on server
|
||||
withWork c doWork (\db -> getNextDeletedSndChunkReplica db srv rcvFilesTTL) processDeletedReplica
|
||||
where
|
||||
@@ -637,7 +647,7 @@ runXFTPDelWorker c srv Worker {doWork} = do
|
||||
where
|
||||
retryLoop loop e replicaDelay = do
|
||||
flip catchAgentError (\_ -> pure ()) $ do
|
||||
when notifyOnRetry $ notify c "" $ SFERR e
|
||||
when (serverHostError e) $ notify c "" $ SFWARN e
|
||||
liftIO $ closeXFTPServerClient c userId server chunkDigest
|
||||
withStore' c $ \db -> updateDeletedSndChunkReplicaDelay db deletedSndChunkReplicaId replicaDelay
|
||||
atomically $ assertAgentForeground c
|
||||
|
||||
@@ -223,10 +223,6 @@ data XFTPErrorType
|
||||
FILE_IO
|
||||
| -- | file sending or receiving timeout
|
||||
TIMEOUT
|
||||
| -- | bad redirect data
|
||||
REDIRECT {redirectError :: String}
|
||||
| -- | cannot proceed with download from not approved relays without proxy
|
||||
NOT_APPROVED
|
||||
| -- | internal server error
|
||||
INTERNAL
|
||||
| -- | used internally, never returned by the server (to be removed)
|
||||
@@ -236,11 +232,9 @@ data XFTPErrorType
|
||||
instance StrEncoding XFTPErrorType where
|
||||
strEncode = \case
|
||||
CMD e -> "CMD " <> bshow e
|
||||
REDIRECT e -> "REDIRECT " <> bshow e
|
||||
e -> bshow e
|
||||
strP =
|
||||
"CMD " *> (CMD <$> parseRead1)
|
||||
<|> "REDIRECT " *> (REDIRECT <$> parseRead A.takeByteString)
|
||||
<|> parseRead1
|
||||
|
||||
instance Encoding XFTPErrorType where
|
||||
@@ -258,8 +252,6 @@ instance Encoding XFTPErrorType where
|
||||
HAS_FILE -> "HAS_FILE"
|
||||
FILE_IO -> "FILE_IO"
|
||||
TIMEOUT -> "TIMEOUT"
|
||||
REDIRECT err -> "REDIRECT " <> smpEncode err
|
||||
NOT_APPROVED -> "NOT_APPROVED"
|
||||
INTERNAL -> "INTERNAL"
|
||||
DUPLICATE_ -> "DUPLICATE_"
|
||||
|
||||
@@ -278,8 +270,6 @@ instance Encoding XFTPErrorType where
|
||||
"HAS_FILE" -> pure HAS_FILE
|
||||
"FILE_IO" -> pure FILE_IO
|
||||
"TIMEOUT" -> pure TIMEOUT
|
||||
"REDIRECT" -> REDIRECT <$> _smpP
|
||||
"NOT_APPROVED" -> pure NOT_APPROVED
|
||||
"INTERNAL" -> pure INTERNAL
|
||||
"DUPLICATE_" -> pure DUPLICATE_
|
||||
_ -> fail "bad error type"
|
||||
|
||||
@@ -2,24 +2,33 @@
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE TemplateHaskell #-}
|
||||
|
||||
module Simplex.FileTransfer.Types where
|
||||
|
||||
import qualified Data.Aeson.TH as J
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import Data.Int (Int64)
|
||||
import qualified Data.Text as T
|
||||
import Data.Text.Encoding (encodeUtf8)
|
||||
import Data.Word (Word32)
|
||||
import Database.SQLite.Simple.FromField (FromField (..))
|
||||
import Database.SQLite.Simple.ToField (ToField (..))
|
||||
import Simplex.FileTransfer.Client (XFTPChunkSpec (..))
|
||||
import Simplex.FileTransfer.Description
|
||||
import Simplex.Messaging.Agent.Protocol (RcvFileId, SndFileId)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Crypto.File (CryptoFile (..))
|
||||
import Simplex.Messaging.Encoding
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Parsers (fromTextField_)
|
||||
import Simplex.Messaging.Protocol
|
||||
import Simplex.Messaging.Parsers
|
||||
import Simplex.Messaging.Protocol (XFTPServer)
|
||||
import System.FilePath ((</>))
|
||||
|
||||
type RcvFileId = ByteString
|
||||
|
||||
type SndFileId = ByteString
|
||||
|
||||
authTagSize :: Int64
|
||||
authTagSize = fromIntegral C.authTagSize
|
||||
|
||||
@@ -236,3 +245,35 @@ data DeletedSndChunkReplica = DeletedSndChunkReplica
|
||||
retries :: Int
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
data FileErrorType
|
||||
= -- | cannot proceed with download from not approved relays without proxy
|
||||
NOT_APPROVED
|
||||
| -- | max file size exceeded
|
||||
SIZE
|
||||
| -- | bad redirect data
|
||||
REDIRECT {redirectError :: String}
|
||||
| -- | file crypto error
|
||||
FILE_IO {fileIOError :: String}
|
||||
| -- | file not found or was deleted
|
||||
NO_FILE
|
||||
deriving (Eq, Show)
|
||||
|
||||
instance StrEncoding FileErrorType where
|
||||
strP =
|
||||
A.takeTill (== ' ')
|
||||
>>= \case
|
||||
"NOT_APPROVED" -> pure NOT_APPROVED
|
||||
"SIZE" -> pure SIZE
|
||||
"REDIRECT" -> REDIRECT <$> (A.space *> textP)
|
||||
"FILE_IO" -> FILE_IO <$> (A.space *> textP)
|
||||
"NO_FILE" -> pure NO_FILE
|
||||
_ -> fail "bad FileErrorType"
|
||||
strEncode = \case
|
||||
NOT_APPROVED -> "NOT_APPROVED"
|
||||
SIZE -> "SIZE"
|
||||
REDIRECT e -> "REDIRECT " <> encodeUtf8 (T.pack e)
|
||||
FILE_IO e -> "FILE_IO " <> encodeUtf8 (T.pack e)
|
||||
NO_FILE -> "NO_FILE"
|
||||
|
||||
$(J.deriveJSON (sumTypeJSON id) ''FileErrorType)
|
||||
|
||||
@@ -148,6 +148,7 @@ import Data.Word (Word16)
|
||||
import Simplex.FileTransfer.Agent (closeXFTPAgent, deleteSndFileInternal, deleteSndFileRemote, deleteSndFilesInternal, deleteSndFilesRemote, startXFTPWorkers, toFSFilePath, xftpDeleteRcvFile', xftpDeleteRcvFiles', xftpReceiveFile', xftpSendDescription', xftpSendFile')
|
||||
import Simplex.FileTransfer.Description (ValidFileDescription)
|
||||
import Simplex.FileTransfer.Protocol (FileParty (..))
|
||||
import Simplex.FileTransfer.Types (RcvFileId, SndFileId)
|
||||
import Simplex.FileTransfer.Util (removePath)
|
||||
import Simplex.Messaging.Agent.Client
|
||||
import Simplex.Messaging.Agent.Env.SQLite
|
||||
|
||||
@@ -105,7 +105,6 @@ data AgentConfig = AgentConfig
|
||||
storedMsgDataTTL :: NominalDiffTime,
|
||||
rcvFilesTTL :: NominalDiffTime,
|
||||
sndFilesTTL :: NominalDiffTime,
|
||||
xftpNotifyErrsOnRetry :: Bool,
|
||||
xftpConsecutiveRetries :: Int,
|
||||
xftpMaxRecipientsPerRequest :: Int,
|
||||
deleteErrorCount :: Int,
|
||||
@@ -176,7 +175,6 @@ defaultAgentConfig =
|
||||
storedMsgDataTTL = 21 * nominalDay,
|
||||
rcvFilesTTL = 2 * nominalDay,
|
||||
sndFilesTTL = nominalDay,
|
||||
xftpNotifyErrsOnRetry = True,
|
||||
xftpConsecutiveRetries = 3,
|
||||
xftpMaxRecipientsPerRequest = 200,
|
||||
deleteErrorCount = 10,
|
||||
|
||||
@@ -115,8 +115,6 @@ module Simplex.Messaging.Agent.Protocol
|
||||
cryptoErrToSyncState,
|
||||
ATransmission,
|
||||
ConnId,
|
||||
RcvFileId,
|
||||
SndFileId,
|
||||
ConfirmationId,
|
||||
InvitationId,
|
||||
MsgIntegrity (..),
|
||||
@@ -169,6 +167,7 @@ import Database.SQLite.Simple.ToField
|
||||
import Simplex.FileTransfer.Description
|
||||
import Simplex.FileTransfer.Protocol (FileParty (..))
|
||||
import Simplex.FileTransfer.Transport (XFTPErrorType)
|
||||
import Simplex.FileTransfer.Types (FileErrorType)
|
||||
import Simplex.Messaging.Agent.QueryString
|
||||
import Simplex.Messaging.Client (ProxyClientError)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
@@ -352,9 +351,11 @@ data AEvent (e :: AEntity) where
|
||||
RFPROG :: Int64 -> Int64 -> AEvent AERcvFile
|
||||
RFDONE :: FilePath -> AEvent AERcvFile
|
||||
RFERR :: AgentErrorType -> AEvent AERcvFile
|
||||
RFWARN :: AgentErrorType -> AEvent AERcvFile
|
||||
SFPROG :: Int64 -> Int64 -> AEvent AESndFile
|
||||
SFDONE :: ValidFileDescription 'FSender -> [ValidFileDescription 'FRecipient] -> AEvent AESndFile
|
||||
SFERR :: AgentErrorType -> AEvent AESndFile
|
||||
SFWARN :: AgentErrorType -> AEvent AESndFile
|
||||
|
||||
deriving instance Eq (AEvent e)
|
||||
|
||||
@@ -420,9 +421,11 @@ data AEventTag (e :: AEntity) where
|
||||
RFDONE_ :: AEventTag AERcvFile
|
||||
RFPROG_ :: AEventTag AERcvFile
|
||||
RFERR_ :: AEventTag AERcvFile
|
||||
RFWARN_ :: AEventTag AERcvFile
|
||||
SFPROG_ :: AEventTag AESndFile
|
||||
SFDONE_ :: AEventTag AESndFile
|
||||
SFERR_ :: AEventTag AESndFile
|
||||
SFWARN_ :: AEventTag AESndFile
|
||||
|
||||
deriving instance Eq (AEventTag e)
|
||||
|
||||
@@ -470,9 +473,11 @@ aEventTag = \case
|
||||
RFPROG {} -> RFPROG_
|
||||
RFDONE {} -> RFDONE_
|
||||
RFERR {} -> RFERR_
|
||||
RFWARN {} -> RFWARN_
|
||||
SFPROG {} -> SFPROG_
|
||||
SFDONE {} -> SFDONE_
|
||||
SFERR {} -> SFERR_
|
||||
SFWARN {} -> SFWARN_
|
||||
|
||||
data QueueDirection = QDRcv | QDSnd
|
||||
deriving (Eq, Show)
|
||||
@@ -1077,10 +1082,6 @@ connModeT = \case
|
||||
-- | SMP agent connection ID.
|
||||
type ConnId = ByteString
|
||||
|
||||
type RcvFileId = ByteString
|
||||
|
||||
type SndFileId = ByteString
|
||||
|
||||
type ConfirmationId = ByteString
|
||||
|
||||
type InvitationId = ByteString
|
||||
@@ -1316,6 +1317,8 @@ data AgentErrorType
|
||||
NTF {serverAddress :: String, ntfErr :: ErrorType}
|
||||
| -- | XFTP protocol errors forwarded to agent clients
|
||||
XFTP {serverAddress :: String, xftpErr :: XFTPErrorType}
|
||||
| -- | XFTP agent errors
|
||||
FILE {fileErr :: FileErrorType}
|
||||
| -- | SMP proxy errors
|
||||
PROXY {proxyServer :: String, relayServer :: String, proxyErr :: ProxyClientError}
|
||||
| -- | XRCP protocol errors forwarded to agent clients
|
||||
|
||||
@@ -24,7 +24,7 @@ import Database.SQLite.Simple (ResultError (..), SQLData (..))
|
||||
import Database.SQLite.Simple.FromField (FieldParser, returnError)
|
||||
import Database.SQLite.Simple.Internal (Field (..))
|
||||
import Database.SQLite.Simple.Ok (Ok (Ok))
|
||||
import Simplex.Messaging.Util ((<$?>))
|
||||
import Simplex.Messaging.Util (safeDecodeUtf8, (<$?>))
|
||||
import Text.Read (readMaybe)
|
||||
|
||||
base64P :: Parser ByteString
|
||||
@@ -154,3 +154,6 @@ singleFieldJSON_ objectTag tagModifier =
|
||||
|
||||
defaultJSON :: J.Options
|
||||
defaultJSON = J.defaultOptions {J.omitNothingFields = True}
|
||||
|
||||
textP :: Parser String
|
||||
textP = T.unpack . safeDecodeUtf8 <$> A.takeByteString
|
||||
|
||||
@@ -158,6 +158,8 @@ pGet' c skipWarn = do
|
||||
DISCONNECT {} -> pGet c
|
||||
ERR (BROKER _ NETWORK) -> pGet c
|
||||
MWARN {} | skipWarn -> pGet c
|
||||
RFWARN {} | skipWarn -> pGet c
|
||||
SFWARN {} | skipWarn -> pGet c
|
||||
_ -> pure t
|
||||
|
||||
pattern CONF :: ConfirmationId -> [SMPServer] -> ConnInfo -> AEvent e
|
||||
|
||||
@@ -71,7 +71,6 @@ agentCfg =
|
||||
ntfCfg = defaultNTFClientConfig {qSize = 1, defaultTransport = (ntfTestPort, transport @TLS), networkConfig},
|
||||
reconnectInterval = fastRetryInterval,
|
||||
persistErrorInterval = 1,
|
||||
xftpNotifyErrsOnRetry = False,
|
||||
ntfWorkerDelay = 100,
|
||||
ntfSMPWorkerDelay = 100,
|
||||
caCertificateFile = "tests/fixtures/ca.crt",
|
||||
|
||||
@@ -20,15 +20,17 @@ import Data.Int (Int64)
|
||||
import Data.List (find, isSuffixOf)
|
||||
import Data.Maybe (fromJust)
|
||||
import SMPAgentClient (agentCfg, initAgentServers, testDB, testDB2, testDB3)
|
||||
import SMPClient (xit'')
|
||||
import Simplex.FileTransfer.Client (XFTPClientConfig (..))
|
||||
import Simplex.FileTransfer.Description (FileChunk (..), FileDescription (..), FileDescriptionURI (..), ValidFileDescription, fileDescriptionURI, kb, mb, qrSizeLimit, pattern ValidFileDescription)
|
||||
import Simplex.FileTransfer.Protocol (FileParty (..))
|
||||
import Simplex.FileTransfer.Server.Env (XFTPServerConfig (..))
|
||||
import Simplex.FileTransfer.Transport (XFTPErrorType (AUTH))
|
||||
import Simplex.FileTransfer.Types (RcvFileId, SndFileId)
|
||||
import Simplex.Messaging.Agent (AgentClient, testProtocolServer, xftpDeleteRcvFile, xftpDeleteSndFileInternal, xftpDeleteSndFileRemote, xftpReceiveFile, xftpSendDescription, xftpSendFile, xftpStartWorkers)
|
||||
import Simplex.Messaging.Agent.Client (ProtocolTestFailure (..), ProtocolTestStep (..))
|
||||
import Simplex.Messaging.Agent.Env.SQLite (AgentConfig, xftpCfg)
|
||||
import Simplex.Messaging.Agent.Protocol (AEvent (..), AgentErrorType (..), BrokerErrorType (..), RcvFileId, SndFileId, noAuthSrv)
|
||||
import Simplex.Messaging.Agent.Protocol (AEvent (..), AgentErrorType (..), BrokerErrorType (..), noAuthSrv)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Crypto.File (CryptoFile (..), CryptoFileArgs)
|
||||
import qualified Simplex.Messaging.Crypto.File as CF
|
||||
@@ -58,7 +60,7 @@ xftpAgentTests = around_ testBracket . describe "agent XFTP API" $ do
|
||||
it "should resume receiving file after restart" testXFTPAgentReceiveRestore
|
||||
it "should cleanup rcv tmp path after permanent error" testXFTPAgentReceiveCleanup
|
||||
it "should resume sending file after restart" testXFTPAgentSendRestore
|
||||
xit "should cleanup snd prefix path after permanent error" testXFTPAgentSendCleanup
|
||||
xit'' "should cleanup snd prefix path after permanent error" testXFTPAgentSendCleanup
|
||||
it "should delete sent file on server" testXFTPAgentDelete
|
||||
it "should resume deleting file after restart" testXFTPAgentDeleteRestore
|
||||
-- TODO when server is fixed to correctly send AUTH error, this test has to be modified to expect AUTH error
|
||||
@@ -475,7 +477,7 @@ testXFTPAgentSendCleanup = withGlobalLogging logCfgNoLogs $ do
|
||||
-- send file - should fail with AUTH error
|
||||
withAgent 2 agentCfg initAgentServers testDB $ \sndr' -> do
|
||||
runRight_ $ xftpStartWorkers sndr' (Just senderFiles)
|
||||
("", sfId', SFERR (INTERNAL "XFTP {serverAddress = \"xftp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:7000\", xftpErr = AUTH}")) <-
|
||||
("", sfId', SFERR (XFTP "xftp://LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=@localhost:7000" AUTH)) <-
|
||||
sfGet sndr'
|
||||
sfId' `shouldBe` sfId
|
||||
|
||||
|
||||
Reference in New Issue
Block a user