diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index a71026dcb..6e4a84c7e 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -19,7 +19,6 @@ module Simplex.FileTransfer.Agent ) where -import Control.Concurrent.STM (stateTVar) import Control.Logger.Simple (logError) import Control.Monad import Control.Monad.Except @@ -96,7 +95,7 @@ runXFTPWorker c srv doWork = do case nextChunk of Nothing -> noWorkToDo Just RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []} -> workerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) "chunk has no replicas" - Just fc@RcvFileChunk {rcvFileId, rcvFileEntityId, rcvChunkId, fileTmpPath, delay, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId} : _} -> do + Just fc@RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, delay} : _} -> do ri <- asks $ reconnectInterval . config let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay withRetryInterval ri' $ \delay' loop -> @@ -104,16 +103,14 @@ runXFTPWorker c srv doWork = do `catchError` retryOnError delay' loop (workerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) . show) where retryOnError :: Int -> m () -> (AgentErrorType -> m ()) -> AgentErrorType -> m () - retryOnError chunkDelay loop done e = do + retryOnError replicaDelay loop done e = do logError $ "XFTP worker error: " <> tshow e if temporaryAgentError e then retryLoop else done e where retryLoop = do - withStore' c $ \db -> do - updateRcvFileChunkDelay db rcvChunkId chunkDelay - increaseRcvChunkReplicaRetries db rcvChunkReplicaId + withStore' c $ \db -> updateRcvChunkReplicaDelay db rcvChunkReplicaId replicaDelay atomically $ endAgentOperation c AORcvNetwork atomically $ throwWhenInactive c atomically $ beginAgentOperation c AORcvNetwork diff --git a/src/Simplex/FileTransfer/Client/Main.hs b/src/Simplex/FileTransfer/Client/Main.hs index 5ed5d5eba..ed626b4dd 100644 --- a/src/Simplex/FileTransfer/Client/Main.hs +++ b/src/Simplex/FileTransfer/Client/Main.hs @@ -18,7 +18,6 @@ module Simplex.FileTransfer.Client.Main ) where -import Control.Concurrent.STM (stateTVar) import Control.Logger.Simple import Control.Monad import Control.Monad.Except diff --git a/src/Simplex/FileTransfer/Types.hs b/src/Simplex/FileTransfer/Types.hs index c9f5aa7ae..e71460fc2 100644 --- a/src/Simplex/FileTransfer/Types.hs +++ b/src/Simplex/FileTransfer/Types.hs @@ -89,8 +89,7 @@ data RcvFileChunk = RcvFileChunk digest :: FileDigest, replicas :: [RcvFileChunkReplica], fileTmpPath :: FilePath, - chunkTmpPath :: Maybe FilePath, - delay :: Maybe Int + chunkTmpPath :: Maybe FilePath } deriving (Eq, Show) @@ -100,7 +99,7 @@ data RcvFileChunkReplica = RcvFileChunkReplica replicaId :: ChunkReplicaId, replicaKey :: C.APrivateSignKey, received :: Bool, - -- acknowledged :: Bool, + delay :: Maybe Int, retries :: Int } deriving (Eq, Show) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index a1d80b93f..fcd68361c 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -92,7 +92,6 @@ module Simplex.Messaging.Agent ) where -import Control.Concurrent.STM (stateTVar) import Control.Logger.Simple (logError, logInfo, showText) import Control.Monad.Except import Control.Monad.IO.Unlift (MonadUnliftIO) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 8e8f2fe90..89559329f 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -87,7 +87,7 @@ where import Control.Applicative ((<|>)) import Control.Concurrent (forkIO, threadDelay) import Control.Concurrent.Async (Async, uninterruptibleCancel) -import Control.Concurrent.STM (retry, stateTVar, throwSTM) +import Control.Concurrent.STM (retry, throwSTM) import Control.Exception (AsyncException (..)) import Control.Logger.Simple import Control.Monad.Except diff --git a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs index 459dc1206..4061335b7 100644 --- a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs +++ b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs @@ -18,7 +18,6 @@ module Simplex.Messaging.Agent.NtfSubSupervisor where import Control.Concurrent.Async (Async, uninterruptibleCancel) -import Control.Concurrent.STM (stateTVar) import Control.Logger.Simple (logError, logInfo) import Control.Monad import Control.Monad.Except diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 9252682c2..b8e02fc49 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -126,8 +126,7 @@ module Simplex.Messaging.Agent.Store.SQLite -- File transfer createRcvFile, getRcvFile, - updateRcvFileChunkDelay, - increaseRcvChunkReplicaRetries, + updateRcvChunkReplicaDelay, updateRcvFileChunkReceived, updateRcvFileStatus, updateRcvFileError, @@ -148,7 +147,6 @@ module Simplex.Messaging.Agent.Store.SQLite where import Control.Concurrent (threadDelay) -import Control.Concurrent.STM (stateTVar) import Control.Monad.Except import Crypto.Random (ChaChaDRG, randomBytesGenerate) import Data.Bifunctor (second) @@ -1799,7 +1797,7 @@ getRcvFile db rcvFileId = runExceptT $ do <$> DB.query db [sql| - SELECT rcv_file_chunk_id, chunk_no, chunk_size, digest, tmp_path, delay + SELECT rcv_file_chunk_id, chunk_no, chunk_size, digest, tmp_path FROM rcv_file_chunks WHERE rcv_file_id = ? |] @@ -1808,9 +1806,9 @@ getRcvFile db rcvFileId = runExceptT $ do replicas' <- getChunkReplicas rcvChunkId pure (chunk {replicas = replicas'} :: RcvFileChunk) where - toChunk :: (Int64, Int, FileSize Word32, FileDigest, Maybe FilePath, Maybe Int) -> RcvFileChunk - toChunk (rcvChunkId, chunkNo, chunkSize, digest, chunkTmpPath, delay) = - RcvFileChunk {rcvFileId, rcvFileEntityId, userId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath, delay, replicas = []} + toChunk :: (Int64, Int, FileSize Word32, FileDigest, Maybe FilePath) -> RcvFileChunk + toChunk (rcvChunkId, chunkNo, chunkSize, digest, chunkTmpPath) = + RcvFileChunk {rcvFileId, rcvFileEntityId, userId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath, replicas = []} getChunkReplicas :: Int64 -> IO [RcvFileChunkReplica] getChunkReplicas chunkId = do map toReplica @@ -1818,7 +1816,7 @@ getRcvFile db rcvFileId = runExceptT $ do db [sql| SELECT - r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.retries, + r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.delay, r.retries, s.xftp_host, s.xftp_port, s.xftp_key_hash FROM rcv_file_chunk_replicas r JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id @@ -1826,20 +1824,15 @@ getRcvFile db rcvFileId = runExceptT $ do |] (Only chunkId) where - toReplica :: (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Int, NonEmpty TransportHost, ServiceName, C.KeyHash) -> RcvFileChunkReplica - toReplica (rcvChunkReplicaId, replicaId, replicaKey, received, retries, host, port, keyHash) = + toReplica :: (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Maybe Int, Int, NonEmpty TransportHost, ServiceName, C.KeyHash) -> RcvFileChunkReplica + toReplica (rcvChunkReplicaId, replicaId, replicaKey, received, delay, retries, host, port, keyHash) = let server = XFTPServer host port keyHash - in RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, retries} + in RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, delay, retries} -updateRcvFileChunkDelay :: DB.Connection -> Int64 -> Int -> IO () -updateRcvFileChunkDelay db chunkId delay = do +updateRcvChunkReplicaDelay :: DB.Connection -> Int64 -> Int -> IO () +updateRcvChunkReplicaDelay db replicaId delay = do updatedAt <- getCurrentTime - DB.execute db "UPDATE rcv_file_chunks SET delay = ?, updated_at = ? WHERE rcv_file_chunk_id = ?" (delay, updatedAt, chunkId) - -increaseRcvChunkReplicaRetries :: DB.Connection -> Int64 -> IO () -increaseRcvChunkReplicaRetries db replicaId = do - updatedAt <- getCurrentTime - DB.execute db "UPDATE rcv_file_chunk_replicas SET retries = retries + 1, updated_at = ? WHERE rcv_file_chunk_replica_id = ?" (updatedAt, replicaId) + DB.execute db "UPDATE rcv_file_chunk_replicas SET delay = ?, retries = retries + 1, updated_at = ? WHERE rcv_file_chunk_replica_id = ?" (delay, updatedAt, replicaId) updateRcvFileChunkReceived :: DB.Connection -> Int64 -> Int64 -> DBRcvFileId -> FilePath -> IO (Either StoreError RcvFile) updateRcvFileChunkReceived db rId cId fId chunkTmpPath = do @@ -1875,8 +1868,8 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} = do db [sql| SELECT - f.rcv_file_id, f.rcv_file_entity_id, f.user_id, c.rcv_file_chunk_id, c.chunk_no, c.chunk_size, c.digest, f.tmp_path, c.tmp_path, c.delay, - r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.retries + f.rcv_file_id, f.rcv_file_entity_id, f.user_id, c.rcv_file_chunk_id, c.chunk_no, c.chunk_size, c.digest, f.tmp_path, c.tmp_path, + r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.delay, r.retries FROM rcv_file_chunk_replicas r JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id JOIN rcv_file_chunks c ON c.rcv_file_chunk_id = r.rcv_file_chunk_id @@ -1888,8 +1881,8 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} = do |] (host, port, keyHash, RFSReceiving) where - toChunk :: ((DBRcvFileId, RcvFileId, UserId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath, Maybe Int) :. (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Int)) -> RcvFileChunk - toChunk ((rcvFileId, rcvFileEntityId, userId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath, delay) :. (rcvChunkReplicaId, replicaId, replicaKey, received, retries)) = + toChunk :: ((DBRcvFileId, RcvFileId, UserId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath) :. (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Maybe Int, Int)) -> RcvFileChunk + toChunk ((rcvFileId, rcvFileEntityId, userId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath) :. (rcvChunkReplicaId, replicaId, replicaKey, received, delay, retries)) = RcvFileChunk { rcvFileId, rcvFileEntityId, @@ -1900,8 +1893,7 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} = do digest, fileTmpPath, chunkTmpPath, - delay, - replicas = [RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, retries}] + replicas = [RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, delay, retries}] } getNextRcvFileToDecrypt :: DB.Connection -> IO (Maybe RcvFile) diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230223_files.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230223_files.hs index 8853deb0e..46e9dd300 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230223_files.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230223_files.hs @@ -45,7 +45,6 @@ CREATE TABLE rcv_file_chunks ( chunk_size INTEGER NOT NULL, digest BLOB NOT NULL, tmp_path TEXT, - delay INTEGER, created_at TEXT NOT NULL DEFAULT (datetime('now')), updated_at TEXT NOT NULL DEFAULT (datetime('now')) ); @@ -60,7 +59,7 @@ CREATE TABLE rcv_file_chunk_replicas ( replica_id BLOB NOT NULL, replica_key BLOB NOT NULL, received INTEGER NOT NULL DEFAULT 0, - -- acknowledged INTEGER NOT NULL DEFAULT 0, + delay INTEGER, retries INTEGER NOT NULL DEFAULT 0, created_at TEXT NOT NULL DEFAULT (datetime('now')), updated_at TEXT NOT NULL DEFAULT (datetime('now')) diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql index b09f5a97b..1f17ac080 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql @@ -317,7 +317,6 @@ CREATE TABLE rcv_file_chunks( chunk_size INTEGER NOT NULL, digest BLOB NOT NULL, tmp_path TEXT, - delay INTEGER, created_at TEXT NOT NULL DEFAULT(datetime('now')), updated_at TEXT NOT NULL DEFAULT(datetime('now')) ); @@ -330,7 +329,7 @@ CREATE TABLE rcv_file_chunk_replicas( replica_id BLOB NOT NULL, replica_key BLOB NOT NULL, received INTEGER NOT NULL DEFAULT 0, - -- acknowledged INTEGER NOT NULL DEFAULT 0, + delay INTEGER, retries INTEGER NOT NULL DEFAULT 0, created_at TEXT NOT NULL DEFAULT(datetime('now')), updated_at TEXT NOT NULL DEFAULT(datetime('now')) diff --git a/src/Simplex/Messaging/Agent/TAsyncs.hs b/src/Simplex/Messaging/Agent/TAsyncs.hs index 80fc41840..d2e2ea1f5 100644 --- a/src/Simplex/Messaging/Agent/TAsyncs.hs +++ b/src/Simplex/Messaging/Agent/TAsyncs.hs @@ -1,6 +1,5 @@ module Simplex.Messaging.Agent.TAsyncs where -import Control.Concurrent.STM (stateTVar) import Control.Monad.IO.Unlift (MonadUnliftIO) import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index 1f230883b..2c5aa0728 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -12,7 +12,6 @@ module Simplex.Messaging.Notifications.Server where -import Control.Concurrent.STM (stateTVar) import Control.Logger.Simple import Control.Monad.Except import Control.Monad.Reader diff --git a/src/Simplex/Messaging/Transport/Server.hs b/src/Simplex/Messaging/Transport/Server.hs index d7ce49d9b..8162b98b8 100644 --- a/src/Simplex/Messaging/Transport/Server.hs +++ b/src/Simplex/Messaging/Transport/Server.hs @@ -12,7 +12,6 @@ module Simplex.Messaging.Transport.Server ) where -import Control.Concurrent.STM (stateTVar) import Control.Monad.Except import Control.Monad.IO.Unlift import qualified Crypto.Store.X509 as SX