diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index a989c20fe..3e76bf579 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -78,16 +78,15 @@ runXFTPWorker c srv doWork = do nextChunk <- withStore' c (`getNextRcvChunkToDownload` srv) case nextChunk of Nothing -> noWorkToDo - Just fc@RcvFileChunk {nextDelay} -> do + Just fc@RcvFileChunk {rcvChunkId, delay} -> do ri <- asks $ reconnectInterval . config - let ri' = maybe ri (\d -> ri {initialInterval = d}) nextDelay - withRetryInterval ri' $ \loop -> + let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay + withRetryInterval ri' $ \delay' loop -> downloadFileChunk fc - `catchError` \e -> do - liftIO $ print e + `catchError` \_ -> do + withStore' c $ \db -> updateRcvFileChunkDelay db rcvChunkId delay' -- TODO don't loop on permanent errors -- TODO increase replica retries count - -- TODO update nextDelay (modify withRetryInterval to expose current delay) loop noWorkToDo = void . atomically $ tryTakeTMVar doWork downloadFileChunk :: RcvFileChunk -> m () @@ -125,10 +124,9 @@ runXFTPLocalWorker c@AgentClient {subQ} doWork = do Nothing -> noWorkToDo Just fd -> do ri <- asks $ reconnectInterval . config - withRetryInterval ri $ \loop -> + withRetryInterval ri $ \_ loop -> decryptFile fd - `catchError` \e -> do - liftIO $ print e + `catchError` \_ -> do -- TODO don't loop on permanent errors -- TODO fixed number of retries instead of exponential backoff? loop diff --git a/src/Simplex/FileTransfer/Client/Agent.hs b/src/Simplex/FileTransfer/Client/Agent.hs index 56ec9f0f6..00ee89404 100644 --- a/src/Simplex/FileTransfer/Client/Agent.hs +++ b/src/Simplex/FileTransfer/Client/Agent.hs @@ -110,7 +110,7 @@ getXFTPServerClient XFTPClientAgent {xftpClients, config} srv = do throwError e tryConnectAsync :: ME () tryConnectAsync = void . async $ do - withRetryInterval (reconnectInterval config) $ void . tryConnectClient + withRetryInterval (reconnectInterval config) $ \_ loop -> void $ tryConnectClient loop showServer :: XFTPServer -> Text showServer ProtocolServer {host, port} = diff --git a/src/Simplex/FileTransfer/Types.hs b/src/Simplex/FileTransfer/Types.hs index dc6bd8f84..64f5f0be7 100644 --- a/src/Simplex/FileTransfer/Types.hs +++ b/src/Simplex/FileTransfer/Types.hs @@ -86,7 +86,7 @@ data RcvFileChunk = RcvFileChunk replicas :: [RcvFileChunkReplica], fileTmpPath :: FilePath, chunkTmpPath :: Maybe FilePath, - nextDelay :: Maybe Int + delay :: Maybe Int } deriving (Eq, Show) diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 8eae34807..c3db99806 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -126,6 +126,7 @@ module Simplex.Messaging.Agent.Store.SQLite -- File transfer createRcvFile, getRcvFile, + updateRcvFileChunkDelay, updateRcvFileChunkReceived, updateRcvFileStatus, updateRcvFileComplete, @@ -1789,7 +1790,7 @@ getRcvFile db rcvFileId = runExceptT $ do <$> DB.query db [sql| - SELECT rcv_file_chunk_id, chunk_no, chunk_size, digest, tmp_path, next_delay + SELECT rcv_file_chunk_id, chunk_no, chunk_size, digest, tmp_path, delay FROM rcv_file_chunks WHERE rcv_file_id = ? |] @@ -1799,8 +1800,8 @@ getRcvFile db rcvFileId = runExceptT $ do pure (chunk {replicas = replicas'} :: RcvFileChunk) where toChunk :: (Int64, Int, FileSize Word32, FileDigest, Maybe FilePath, Maybe Int) -> RcvFileChunk - toChunk (rcvChunkId, chunkNo, chunkSize, digest, chunkTmpPath, nextDelay) = - RcvFileChunk {userId, rcvFileId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath, nextDelay, replicas = []} + toChunk (rcvChunkId, chunkNo, chunkSize, digest, chunkTmpPath, delay) = + RcvFileChunk {userId, rcvFileId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath, delay, replicas = []} getChunkReplicas :: Int64 -> IO [RcvFileChunkReplica] getChunkReplicas chunkId = do map toReplica @@ -1821,6 +1822,11 @@ getRcvFile db rcvFileId = runExceptT $ do let server = XFTPServer host port keyHash in RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, acknowledged, retries} +updateRcvFileChunkDelay :: DB.Connection -> Int64 -> Int -> IO () +updateRcvFileChunkDelay db chunkId delay = do + updatedAt <- getCurrentTime + DB.execute db "UPDATE rcv_file_chunks SET delay = ?, updated_at = ? WHERE rcv_file_chunk_id = ?" (delay, updatedAt, chunkId) + updateRcvFileChunkReceived :: DB.Connection -> Int64 -> Int64 -> RcvFileId -> FilePath -> IO (Either StoreError RcvFile) updateRcvFileChunkReceived db rId cId fId chunkTmpPath = do updatedAt <- getCurrentTime @@ -1850,7 +1856,7 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} = do db [sql| SELECT - f.user_id, f.rcv_file_id, c.rcv_file_chunk_id, c.chunk_no, c.chunk_size, c.digest, f.tmp_path, c.tmp_path, c.next_delay, + f.user_id, f.rcv_file_id, c.rcv_file_chunk_id, c.chunk_no, c.chunk_size, c.digest, f.tmp_path, c.tmp_path, c.delay, r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.acknowledged, r.retries FROM rcv_file_chunk_replicas r JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id @@ -1864,7 +1870,7 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} = do (host, port, keyHash) where toChunk :: ((UserId, RcvFileId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath, Maybe Int) :. (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Bool, Int)) -> RcvFileChunk - toChunk ((userId, rcvFileId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath, nextDelay) :. (rcvChunkReplicaId, replicaId, replicaKey, received, acknowledged, retries)) = + toChunk ((userId, rcvFileId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath, delay) :. (rcvChunkReplicaId, replicaId, replicaKey, received, acknowledged, retries)) = RcvFileChunk { userId, rcvFileId, @@ -1874,7 +1880,7 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} = do digest, fileTmpPath, chunkTmpPath, - nextDelay, + delay, replicas = [RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, acknowledged, retries}] } diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230223_files.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230223_files.hs index 11cd45801..dd3f87853 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230223_files.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230223_files.hs @@ -43,7 +43,7 @@ CREATE TABLE rcv_file_chunks ( chunk_size INTEGER NOT NULL, digest BLOB NOT NULL, tmp_path TEXT, - next_delay INTEGER, + delay INTEGER, created_at TEXT NOT NULL DEFAULT (datetime('now')), updated_at TEXT NOT NULL DEFAULT (datetime('now')) ); diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql index 13aa097a6..44514b705 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql @@ -315,7 +315,7 @@ CREATE TABLE rcv_file_chunks( chunk_size INTEGER NOT NULL, digest BLOB NOT NULL, tmp_path TEXT, - next_delay INTEGER, + delay INTEGER, created_at TEXT NOT NULL DEFAULT(datetime('now')), updated_at TEXT NOT NULL DEFAULT(datetime('now')) );