xftp: notify about errors on rcv files retry; process snd files errors (#700)

This commit is contained in:
spaced4ndy
2023-03-30 16:26:33 +04:00
committed by GitHub
parent b5869cf169
commit e27c4f7b81
4 changed files with 36 additions and 15 deletions
+26 -15
View File
@@ -161,6 +161,8 @@ runXFTPWorker c srv doWork = do
else done e
where
retryLoop = do
notifyOnRetry <- asks (xftpNotifyErrsOnRetry . config)
when notifyOnRetry $ notifyInternalError c rcvFileEntityId $ show e
closeXFTPServerClient c userId replica
withStore' c $ \db -> updateRcvChunkReplicaDelay db rcvChunkReplicaId replicaDelay
atomically $ endAgentOperation c AORcvNetwork
@@ -266,21 +268,30 @@ sendFileExperimental c@AgentClient {xftpServers} userId filePath numRecipients =
createDirectory outputDir
let tempPath = workPath </> "snd"
createDirectoryIfMissing False tempPath
let sendOptions =
SendOptions
{ filePath,
outputDir = Just outputDir,
numRecipients,
xftpServers = xftpSrvs,
retryCount = 3,
tempPath = Just tempPath,
verbose = False
}
liftCLI $ cliSendFileOpts sendOptions False $ notify c sndFileId .: SFPROG
(sndDescr, rcvDescrs) <- readDescrs outputDir fileName
removePath tempPath
removePath outputDir
liftIO $ notify c sndFileId $ SFDONE sndDescr rcvDescrs
runSend fileName outputDir tempPath `catchError` \e -> do
cleanup outputDir tempPath
liftIO $ notify c sndFileId $ SFERR e
where
runSend :: String -> FilePath -> FilePath -> m ()
runSend fileName outputDir tempPath = do
let sendOptions =
SendOptions
{ filePath,
outputDir = Just outputDir,
numRecipients,
xftpServers = xftpSrvs,
retryCount = 3,
tempPath = Just tempPath,
verbose = False
}
liftCLI $ cliSendFileOpts sendOptions False $ notify c sndFileId .: SFPROG
(sndDescr, rcvDescrs) <- readDescrs outputDir fileName
cleanup outputDir tempPath
liftIO $ notify c sndFileId $ SFDONE sndDescr rcvDescrs
cleanup :: FilePath -> FilePath -> m ()
cleanup outputDir tempPath = do
removePath tempPath
removePath outputDir
liftCLI :: ExceptT CLIError IO () -> m ()
liftCLI = either (throwError . INTERNAL . show) pure <=< liftIO . runExceptT
readDescrs :: FilePath -> FilePath -> m (ValidFileDescription 'FSender, [ValidFileDescription 'FRecipient])
@@ -82,6 +82,7 @@ data AgentConfig = AgentConfig
initialCleanupDelay :: Int,
cleanupInterval :: Int,
rcvFilesTTL :: NominalDiffTime,
xftpNotifyErrsOnRetry :: Bool,
deleteErrorCount :: Int,
ntfCron :: Word16,
ntfWorkerDelay :: Int,
@@ -142,6 +143,7 @@ defaultAgentConfig =
initialCleanupDelay = 30 * 1000000, -- 30 seconds
cleanupInterval = 30 * 60 * 1000000, -- 30 minutes
rcvFilesTTL = 2 * nominalDay,
xftpNotifyErrsOnRetry = True,
deleteErrorCount = 10,
ntfCron = 20, -- minutes
ntfWorkerDelay = 100000, -- microseconds
+7
View File
@@ -341,6 +341,7 @@ data ACommand (p :: AParty) (e :: AEntity) where
RFERR :: AgentErrorType -> ACommand Agent AERcvFile
SFPROG :: Int64 -> Int64 -> ACommand Agent AESndFile
SFDONE :: ValidFileDescription 'FSender -> [ValidFileDescription 'FRecipient] -> ACommand Agent AESndFile
SFERR :: AgentErrorType -> ACommand Agent AESndFile
deriving instance Eq (ACommand p e)
@@ -398,6 +399,7 @@ data ACommandTag (p :: AParty) (e :: AEntity) where
RFERR_ :: ACommandTag Agent AERcvFile
SFPROG_ :: ACommandTag Agent AESndFile
SFDONE_ :: ACommandTag Agent AESndFile
SFERR_ :: ACommandTag Agent AESndFile
deriving instance Eq (ACommandTag p e)
@@ -447,6 +449,7 @@ aCommandTag = \case
RFERR {} -> RFERR_
SFPROG {} -> SFPROG_
SFDONE {} -> SFDONE_
SFERR {} -> SFERR_
data QueueDirection = QDRcv | QDSnd
deriving (Eq, Show)
@@ -1339,6 +1342,7 @@ instance StrEncoding ACmdTag where
"RFERR" -> at SAERcvFile RFERR_
"SFPROG" -> at SAESndFile SFPROG_
"SFDONE" -> at SAESndFile SFDONE_
"SFERR" -> at SAESndFile SFERR_
_ -> fail "bad ACmdTag"
where
t = pure . ACmdTag SClient SAEConn
@@ -1391,6 +1395,7 @@ instance (APartyI p, AEntityI e) => StrEncoding (ACommandTag p e) where
RFERR_ -> "RFERR"
SFPROG_ -> "SFPROG"
SFDONE_ -> "SFDONE"
SFERR_ -> "SFERR"
strP = (\(APCT _ t) -> checkEntity t) <$?> strP
checkParty :: forall t p p'. (APartyI p, APartyI p') => t p' -> Either String (t p)
@@ -1451,6 +1456,7 @@ commandP binaryP =
RFERR_ -> s (RFERR <$> strP)
SFPROG_ -> s (SFPROG <$> A.decimal <* A.space <*> A.decimal)
SFDONE_ -> s (sfDone . safeDecodeUtf8 <$?> binaryP)
SFERR_ -> s (SFERR <$> strP)
where
s :: Parser a -> Parser a
s p = A.space *> p
@@ -1515,6 +1521,7 @@ serializeCommand = \case
RFERR e -> s (RFERR_, e)
SFPROG sent total -> s (SFPROG_, sent, total)
SFDONE sd rds -> B.unwords [s SFDONE_, serializeBinary (sfDone sd rds)]
SFERR e -> s (SFERR_, e)
where
s :: StrEncoding a => a -> ByteString
s = strEncode
+1
View File
@@ -197,6 +197,7 @@ agentCfg =
smpCfg = defaultClientConfig {qSize = 1, defaultTransport = (testPort, transport @TLS)},
ntfCfg = defaultClientConfig {qSize = 1, defaultTransport = (ntfTestPort, transport @TLS)},
reconnectInterval = defaultReconnectInterval {initialInterval = 50_000},
xftpNotifyErrsOnRetry = False,
ntfWorkerDelay = 1000,
ntfSMPWorkerDelay = 1000,
caCertificateFile = "tests/fixtures/ca.crt",