diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index c0d705d0e..334ace1fd 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -13,10 +13,11 @@ module Simplex.FileTransfer.Agent receiveFile, addXFTPWorker, -- Sending files - sendFile, + _sendFile, ) where +import Control.Logger.Simple (logError) import Control.Monad import Control.Monad.Except import Control.Monad.Reader @@ -31,7 +32,7 @@ import Simplex.FileTransfer.Types import Simplex.FileTransfer.Util (uniqueCombine) import Simplex.Messaging.Agent.Client import Simplex.Messaging.Agent.Env.SQLite -import Simplex.Messaging.Agent.Protocol (ACommand (FRCVD), AParty (..), AgentErrorType (INTERNAL)) +import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.RetryInterval import Simplex.Messaging.Agent.Store import Simplex.Messaging.Agent.Store.SQLite @@ -39,7 +40,7 @@ import qualified Simplex.Messaging.Crypto.Lazy as LC import Simplex.Messaging.Encoding import Simplex.Messaging.Protocol (XFTPServer) import qualified Simplex.Messaging.TMap as TM -import Simplex.Messaging.Util (whenM) +import Simplex.Messaging.Util (tshow, whenM) import UnliftIO import UnliftIO.Directory import qualified UnliftIO.Exception as E @@ -77,31 +78,44 @@ runXFTPWorker c srv doWork = do void . atomically $ readTMVar doWork agentOperationBracket c AORcvNetwork throwWhenInactive runXftpOperation where + noWorkToDo = void . atomically $ tryTakeTMVar doWork runXftpOperation :: m () runXftpOperation = do nextChunk <- withStore' c (`getNextRcvChunkToDownload` srv) case nextChunk of Nothing -> noWorkToDo - Just fc@RcvFileChunk {rcvChunkId, delay} -> do + Just RcvFileChunk {rcvFileId, replicas = []} -> workerInternalError c rcvFileId "chunk has no replicas" + Just fc@RcvFileChunk {rcvFileId, rcvChunkId, delay, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId} : _} -> do ri <- asks $ reconnectInterval . config let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay withRetryInterval ri' $ \delay' loop -> - downloadFileChunk fc - `catchError` \_ -> do - withStore' c $ \db -> updateRcvFileChunkDelay db rcvChunkId delay' - -- TODO don't loop on permanent errors - -- TODO increase replica retries count - loop - noWorkToDo = void . atomically $ tryTakeTMVar doWork - downloadFileChunk :: RcvFileChunk -> m () - downloadFileChunk RcvFileChunk {userId, rcvFileId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, replicas = replica : _} = do + downloadFileChunk fc replica + `catchError` retryOnError delay' loop (workerInternalError c rcvFileId . show) + where + retryOnError :: Int -> m () -> (AgentErrorType -> m ()) -> AgentErrorType -> m () + retryOnError chunkDelay loop done e = do + logError $ "XFTP worker error: " <> tshow e + if temporaryAgentError e + then retryLoop + else done e + where + retryLoop = do + withStore' c $ \db -> do + updateRcvFileChunkDelay db rcvChunkId chunkDelay + increaseRcvChunkReplicaRetries db rcvChunkReplicaId + atomically $ endAgentOperation c AORcvNetwork + atomically $ throwWhenInactive c + atomically $ beginAgentOperation c AORcvNetwork + loop + downloadFileChunk :: RcvFileChunk -> RcvFileChunkReplica -> m () + downloadFileChunk RcvFileChunk {userId, rcvFileId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath} replica = do chunkPath <- uniqueCombine fileTmpPath $ show chunkNo let chunkSpec = XFTPRcvChunkSpec chunkPath (unFileSize chunkSize) (unFileDigest digest) agentXFTPDownloadChunk c userId replica chunkSpec fileReceived <- withStore c $ \db -> runExceptT $ do -- both actions can be done in a single store method - fd <- ExceptT $ updateRcvFileChunkReceived db (rcvChunkReplicaId replica) rcvChunkId rcvFileId chunkPath - let fileReceived = allChunksReceived fd + f <- ExceptT $ updateRcvFileChunkReceived db (rcvChunkReplicaId replica) rcvChunkId rcvFileId chunkPath + let fileReceived = allChunksReceived f when fileReceived $ liftIO $ updateRcvFileStatus db rcvFileId RFSReceived pure fileReceived @@ -113,7 +127,14 @@ runXFTPWorker c srv doWork = do allChunksReceived :: RcvFile -> Bool allChunksReceived RcvFile {chunks} = all (\RcvFileChunk {replicas} -> any received replicas) chunks - downloadFileChunk _ = throwError $ INTERNAL "no replica" + +workerInternalError :: AgentMonad m => AgentClient -> RcvFileId -> String -> m () +workerInternalError c rcvFileId internalErrStr = do + withStore' c $ \db -> updateRcvFileError db rcvFileId internalErrStr + notifyInternalError c rcvFileId internalErrStr + +notifyInternalError :: (MonadUnliftIO m) => AgentClient -> RcvFileId -> String -> m () +notifyInternalError AgentClient {subQ} rcvFileId internalErrStr = atomically $ writeTBQueue subQ ("", "", FRCVERR rcvFileId $ INTERNAL internalErrStr) runXFTPLocalWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m () runXFTPLocalWorker c@AgentClient {subQ} doWork = do @@ -126,14 +147,8 @@ runXFTPLocalWorker c@AgentClient {subQ} doWork = do nextFile <- withStore' c getNextRcvFileToDecrypt case nextFile of Nothing -> noWorkToDo - Just fd -> do - ri <- asks $ reconnectInterval . config - withRetryInterval ri $ \_ loop -> - decryptFile fd - `catchError` \_ -> do - -- TODO don't loop on permanent errors - -- TODO fixed number of retries instead of exponential backoff? - loop + Just f@RcvFile {rcvFileId} -> + decryptFile f `catchError` (workerInternalError c rcvFileId . show) noWorkToDo = void . atomically $ tryTakeTMVar doWork decryptFile :: RcvFile -> m () decryptFile RcvFile {rcvFileId, key, nonce, tmpPath, saveDir, chunks} = do @@ -183,15 +198,16 @@ runXFTPLocalWorker c@AgentClient {subQ} doWork = do ) LB.empty -sendFile :: AgentMonad m => AgentClient -> UserId -> FilePath -> FilePath -> m Int64 -sendFile c userId xftpPath filePath = do +-- _sendFile :: AgentMonad m => AgentClient -> UserId -> FilePath -> FilePath -> m Int64 +_sendFile :: AgentClient -> UserId -> FilePath -> FilePath -> m Int64 +_sendFile _c _userId _xftpPath _filePath = do -- db: create file in status New without chunks -- add local snd worker for encryption -- return file id to client undefined -runXFTPSndLocalWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m () -runXFTPSndLocalWorker c@AgentClient {subQ} doWork = do +_runXFTPSndLocalWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m () +_runXFTPSndLocalWorker _c doWork = do forever $ do void . atomically $ readTMVar doWork runXftpOperation @@ -202,8 +218,8 @@ runXFTPSndLocalWorker c@AgentClient {subQ} doWork = do -- ? (or Encrypted to retry create? - see below) -- with fixed retries (?) encryptFile undefined - encryptFile :: SndFile -> m () - encryptFile sndFile = do + _encryptFile :: SndFile -> m () + _encryptFile _sndFile = do -- if enc path exists, remove it -- if enc path doesn't exist: -- - choose enc path @@ -225,8 +241,8 @@ runXFTPSndLocalWorker c@AgentClient {subQ} doWork = do -- ? and add XFTP snd workers for uploading chunks. undefined -runXFTPSndWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m () -runXFTPSndWorker c srv doWork = do +_runXFTPSndWorker :: forall m. AgentMonad m => AgentClient -> XFTPServer -> TMVar () -> m () +_runXFTPSndWorker c _srv doWork = do forever $ do void . atomically $ readTMVar doWork agentOperationBracket c AOSndNetwork throwWhenInactive runXftpOperation @@ -238,8 +254,8 @@ runXFTPSndWorker c srv doWork = do -- - with fixed retries, repeat N times: -- check if other files are in upload, delay (see xftpSndFiles in XFTPAgent) undefined - uploadFileChunk :: SndFileChunk -> m () - uploadFileChunk sndFileChunk = do + _uploadFileChunk :: SndFileChunk -> m () + _uploadFileChunk _sndFileChunk = do -- add file id to xftpSndFiles -- XFTP upload chunk -- db: update replica status to Uploaded, return SndFile diff --git a/src/Simplex/FileTransfer/Types.hs b/src/Simplex/FileTransfer/Types.hs index a234528b1..8b748a6eb 100644 --- a/src/Simplex/FileTransfer/Types.hs +++ b/src/Simplex/FileTransfer/Types.hs @@ -51,12 +51,12 @@ data RcvFile = RcvFile } deriving (Eq, Show) --- TODO add error status? data RcvFileStatus = RFSReceiving | RFSReceived | RFSDecrypting | RFSComplete + | RFSError deriving (Eq, Show) instance FromField RcvFileStatus where fromField = fromTextField_ textDecode @@ -69,12 +69,14 @@ instance TextEncoding RcvFileStatus where "received" -> Just RFSReceived "decrypting" -> Just RFSDecrypting "complete" -> Just RFSComplete + "error" -> Just RFSError _ -> Nothing textEncode = \case RFSReceiving -> "receiving" RFSReceived -> "received" RFSDecrypting -> "decrypting" RFSComplete -> "complete" + RFSError -> "error" data RcvFileChunk = RcvFileChunk { userId :: Int64, @@ -96,7 +98,7 @@ data RcvFileChunkReplica = RcvFileChunkReplica replicaId :: ChunkReplicaId, replicaKey :: C.APrivateSignKey, received :: Bool, - acknowledged :: Bool, + -- acknowledged :: Bool, retries :: Int } deriving (Eq, Show) diff --git a/src/Simplex/Messaging/Agent/Protocol.hs b/src/Simplex/Messaging/Agent/Protocol.hs index f1e7cec1d..c18e432fb 100644 --- a/src/Simplex/Messaging/Agent/Protocol.hs +++ b/src/Simplex/Messaging/Agent/Protocol.hs @@ -285,6 +285,7 @@ data ACommand (p :: AParty) where ERR :: AgentErrorType -> ACommand Agent SUSPENDED :: ACommand Agent FRCVD :: RcvFileId -> FilePath -> ACommand Agent + FRCVERR :: RcvFileId -> AgentErrorType -> ACommand Agent deriving instance Eq (ACommand p) @@ -328,6 +329,7 @@ data ACommandTag (p :: AParty) where ERR_ :: ACommandTag Agent SUSPENDED_ :: ACommandTag Agent FRCVD_ :: ACommandTag Agent + FRCVERR_ :: ACommandTag Agent deriving instance Eq (ACommandTag p) @@ -370,6 +372,7 @@ aCommandTag = \case ERR _ -> ERR_ SUSPENDED -> SUSPENDED_ FRCVD {} -> FRCVD_ + FRCVERR {} -> FRCVERR_ data QueueDirection = QDRcv | QDSnd deriving (Eq, Show) @@ -1292,6 +1295,7 @@ instance APartyI p => StrEncoding (ACommandTag p) where ERR_ -> "ERR" SUSPENDED_ -> "SUSPENDED" FRCVD_ -> "FRCVD" + FRCVERR_ -> "FRCVERR" strP = (\(ACmdTag _ t) -> checkParty t) <$?> strP checkParty :: forall t p p'. (APartyI p, APartyI p') => t p' -> Either String (t p) @@ -1343,6 +1347,7 @@ commandP binaryP = ERR_ -> s (ERR <$> strP) SUSPENDED_ -> pure SUSPENDED FRCVD_ -> s (FRCVD <$> A.decimal <* A.space <*> strP) + FRCVERR_ -> s (FRCVERR <$> A.decimal <* A.space <*> strP) where s :: Parser a -> Parser a s p = A.space *> p @@ -1397,6 +1402,7 @@ serializeCommand = \case OK -> s OK_ SUSPENDED -> s SUSPENDED_ FRCVD fId fPath -> s (FRCVD_, Str $ bshow fId, fPath) + FRCVERR fId e -> s (FRCVERR_, Str $ bshow fId, e) where s :: StrEncoding a => a -> ByteString s = strEncode diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 31171ed5e..859c1ac27 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -127,8 +127,10 @@ module Simplex.Messaging.Agent.Store.SQLite createRcvFile, getRcvFile, updateRcvFileChunkDelay, + increaseRcvChunkReplicaRetries, updateRcvFileChunkReceived, updateRcvFileStatus, + updateRcvFileError, updateRcvFileComplete, updateRcvFileChunkReplicaRetries, getNextRcvChunkToDownload, @@ -1809,7 +1811,7 @@ getRcvFile db rcvFileId = runExceptT $ do db [sql| SELECT - r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.acknowledged, r.retries, + r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.retries, s.xftp_host, s.xftp_port, s.xftp_key_hash FROM rcv_file_chunk_replicas r JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id @@ -1817,16 +1819,21 @@ getRcvFile db rcvFileId = runExceptT $ do |] (Only chunkId) where - toReplica :: (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Bool, Int, NonEmpty TransportHost, ServiceName, C.KeyHash) -> RcvFileChunkReplica - toReplica (rcvChunkReplicaId, replicaId, replicaKey, received, acknowledged, retries, host, port, keyHash) = + toReplica :: (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Int, NonEmpty TransportHost, ServiceName, C.KeyHash) -> RcvFileChunkReplica + toReplica (rcvChunkReplicaId, replicaId, replicaKey, received, retries, host, port, keyHash) = let server = XFTPServer host port keyHash - in RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, acknowledged, retries} + in RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, retries} updateRcvFileChunkDelay :: DB.Connection -> Int64 -> Int -> IO () updateRcvFileChunkDelay db chunkId delay = do updatedAt <- getCurrentTime DB.execute db "UPDATE rcv_file_chunks SET delay = ?, updated_at = ? WHERE rcv_file_chunk_id = ?" (delay, updatedAt, chunkId) +increaseRcvChunkReplicaRetries :: DB.Connection -> Int64 -> IO () +increaseRcvChunkReplicaRetries db replicaId = do + updatedAt <- getCurrentTime + DB.execute db "UPDATE rcv_file_chunk_replicas SET retries = retries + 1, updated_at = ? WHERE rcv_file_chunk_replica_id = ?" (updatedAt, replicaId) + updateRcvFileChunkReceived :: DB.Connection -> Int64 -> Int64 -> RcvFileId -> FilePath -> IO (Either StoreError RcvFile) updateRcvFileChunkReceived db rId cId fId chunkTmpPath = do updatedAt <- getCurrentTime @@ -1839,6 +1846,11 @@ updateRcvFileStatus db rcvFileId status = do updatedAt <- getCurrentTime DB.execute db "UPDATE rcv_files SET status = ?, updated_at = ? WHERE rcv_file_id = ?" (status, updatedAt, rcvFileId) +updateRcvFileError :: DB.Connection -> RcvFileId -> String -> IO () +updateRcvFileError db rcvFileId errStr = do + updatedAt <- getCurrentTime + DB.execute db "UPDATE rcv_files SET error = ?, status = ?, updated_at = ? WHERE rcv_file_id = ?" (errStr, RFSError, updatedAt, rcvFileId) + updateRcvFileComplete :: DB.Connection -> RcvFileId -> FilePath -> IO () updateRcvFileComplete db rcvFileId savePath = do updatedAt <- getCurrentTime @@ -1857,20 +1869,20 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} = do [sql| SELECT f.user_id, f.rcv_file_id, c.rcv_file_chunk_id, c.chunk_no, c.chunk_size, c.digest, f.tmp_path, c.tmp_path, c.delay, - r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.acknowledged, r.retries + r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.retries FROM rcv_file_chunk_replicas r JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id JOIN rcv_file_chunks c ON c.rcv_file_chunk_id = r.rcv_file_chunk_id JOIN rcv_files f ON f.rcv_file_id = c.rcv_file_id WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ? - AND r.received = 0 AND r.replica_number = 1 + AND r.received = 0 AND r.replica_number = 1 AND f.status = ? ORDER BY r.created_at ASC LIMIT 1 |] - (host, port, keyHash) + (host, port, keyHash, RFSReceiving) where - toChunk :: ((UserId, RcvFileId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath, Maybe Int) :. (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Bool, Int)) -> RcvFileChunk - toChunk ((userId, rcvFileId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath, delay) :. (rcvChunkReplicaId, replicaId, replicaKey, received, acknowledged, retries)) = + toChunk :: ((UserId, RcvFileId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath, Maybe Int) :. (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Int)) -> RcvFileChunk + toChunk ((userId, rcvFileId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath, delay) :. (rcvChunkReplicaId, replicaId, replicaKey, received, retries)) = RcvFileChunk { userId, rcvFileId, @@ -1881,7 +1893,7 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} = do fileTmpPath, chunkTmpPath, delay, - replicas = [RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, acknowledged, retries}] + replicas = [RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, retries}] } getNextRcvFileToDecrypt :: DB.Connection -> IO (Maybe RcvFile) @@ -1896,15 +1908,18 @@ getNextRcvFileToDecrypt db = do getPendingRcvFilesServers :: DB.Connection -> IO [XFTPServer] getPendingRcvFilesServers db = do map toServer - <$> DB.query_ + <$> DB.query db [sql| SELECT DISTINCT s.xftp_host, s.xftp_port, s.xftp_key_hash FROM rcv_file_chunk_replicas r JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id - WHERE r.received = 0 AND r.replica_number = 1 + JOIN rcv_file_chunks c ON c.rcv_file_chunk_id = r.rcv_file_chunk_id + JOIN rcv_files f ON f.rcv_file_id = c.rcv_file_id + WHERE r.received = 0 AND r.replica_number = 1 AND f.status = ? |] + (Only RFSReceiving) where toServer :: (NonEmpty TransportHost, ServiceName, C.KeyHash) -> XFTPServer toServer (host, port, keyHash) = XFTPServer host port keyHash diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230223_files.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230223_files.hs index dd3f87853..dbfd0f5dc 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230223_files.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230223_files.hs @@ -30,6 +30,7 @@ CREATE TABLE rcv_files ( save_dir TEXT NOT NULL, save_path TEXT, status TEXT NOT NULL, + error TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')), updated_at TEXT NOT NULL DEFAULT (datetime('now')) ); @@ -58,7 +59,7 @@ CREATE TABLE rcv_file_chunk_replicas ( replica_id BLOB NOT NULL, replica_key BLOB NOT NULL, received INTEGER NOT NULL DEFAULT 0, - acknowledged INTEGER NOT NULL DEFAULT 0, + -- acknowledged INTEGER NOT NULL DEFAULT 0, retries INTEGER NOT NULL DEFAULT 0, created_at TEXT NOT NULL DEFAULT (datetime('now')), updated_at TEXT NOT NULL DEFAULT (datetime('now')) diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql index 44514b705..a96e6c847 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql @@ -304,6 +304,7 @@ CREATE TABLE rcv_files( save_dir TEXT NOT NULL, save_path TEXT, status TEXT NOT NULL, + error TEXT, created_at TEXT NOT NULL DEFAULT(datetime('now')), updated_at TEXT NOT NULL DEFAULT(datetime('now')) ); @@ -328,7 +329,7 @@ CREATE TABLE rcv_file_chunk_replicas( replica_id BLOB NOT NULL, replica_key BLOB NOT NULL, received INTEGER NOT NULL DEFAULT 0, - acknowledged INTEGER NOT NULL DEFAULT 0, + -- acknowledged INTEGER NOT NULL DEFAULT 0, retries INTEGER NOT NULL DEFAULT 0, created_at TEXT NOT NULL DEFAULT(datetime('now')), updated_at TEXT NOT NULL DEFAULT(datetime('now'))