mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 16:26:02 +00:00
xftp: move delay to replica (#688)
This commit is contained in:
@@ -19,7 +19,6 @@ module Simplex.FileTransfer.Agent
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Concurrent.STM (stateTVar)
|
||||
import Control.Logger.Simple (logError)
|
||||
import Control.Monad
|
||||
import Control.Monad.Except
|
||||
@@ -96,7 +95,7 @@ runXFTPWorker c srv doWork = do
|
||||
case nextChunk of
|
||||
Nothing -> noWorkToDo
|
||||
Just RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []} -> workerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) "chunk has no replicas"
|
||||
Just fc@RcvFileChunk {rcvFileId, rcvFileEntityId, rcvChunkId, fileTmpPath, delay, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId} : _} -> do
|
||||
Just fc@RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId, delay} : _} -> do
|
||||
ri <- asks $ reconnectInterval . config
|
||||
let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay
|
||||
withRetryInterval ri' $ \delay' loop ->
|
||||
@@ -104,16 +103,14 @@ runXFTPWorker c srv doWork = do
|
||||
`catchError` retryOnError delay' loop (workerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) . show)
|
||||
where
|
||||
retryOnError :: Int -> m () -> (AgentErrorType -> m ()) -> AgentErrorType -> m ()
|
||||
retryOnError chunkDelay loop done e = do
|
||||
retryOnError replicaDelay loop done e = do
|
||||
logError $ "XFTP worker error: " <> tshow e
|
||||
if temporaryAgentError e
|
||||
then retryLoop
|
||||
else done e
|
||||
where
|
||||
retryLoop = do
|
||||
withStore' c $ \db -> do
|
||||
updateRcvFileChunkDelay db rcvChunkId chunkDelay
|
||||
increaseRcvChunkReplicaRetries db rcvChunkReplicaId
|
||||
withStore' c $ \db -> updateRcvChunkReplicaDelay db rcvChunkReplicaId replicaDelay
|
||||
atomically $ endAgentOperation c AORcvNetwork
|
||||
atomically $ throwWhenInactive c
|
||||
atomically $ beginAgentOperation c AORcvNetwork
|
||||
|
||||
@@ -18,7 +18,6 @@ module Simplex.FileTransfer.Client.Main
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Concurrent.STM (stateTVar)
|
||||
import Control.Logger.Simple
|
||||
import Control.Monad
|
||||
import Control.Monad.Except
|
||||
|
||||
@@ -89,8 +89,7 @@ data RcvFileChunk = RcvFileChunk
|
||||
digest :: FileDigest,
|
||||
replicas :: [RcvFileChunkReplica],
|
||||
fileTmpPath :: FilePath,
|
||||
chunkTmpPath :: Maybe FilePath,
|
||||
delay :: Maybe Int
|
||||
chunkTmpPath :: Maybe FilePath
|
||||
}
|
||||
deriving (Eq, Show)
|
||||
|
||||
@@ -100,7 +99,7 @@ data RcvFileChunkReplica = RcvFileChunkReplica
|
||||
replicaId :: ChunkReplicaId,
|
||||
replicaKey :: C.APrivateSignKey,
|
||||
received :: Bool,
|
||||
-- acknowledged :: Bool,
|
||||
delay :: Maybe Int,
|
||||
retries :: Int
|
||||
}
|
||||
deriving (Eq, Show)
|
||||
|
||||
@@ -92,7 +92,6 @@ module Simplex.Messaging.Agent
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Concurrent.STM (stateTVar)
|
||||
import Control.Logger.Simple (logError, logInfo, showText)
|
||||
import Control.Monad.Except
|
||||
import Control.Monad.IO.Unlift (MonadUnliftIO)
|
||||
|
||||
@@ -87,7 +87,7 @@ where
|
||||
import Control.Applicative ((<|>))
|
||||
import Control.Concurrent (forkIO, threadDelay)
|
||||
import Control.Concurrent.Async (Async, uninterruptibleCancel)
|
||||
import Control.Concurrent.STM (retry, stateTVar, throwSTM)
|
||||
import Control.Concurrent.STM (retry, throwSTM)
|
||||
import Control.Exception (AsyncException (..))
|
||||
import Control.Logger.Simple
|
||||
import Control.Monad.Except
|
||||
|
||||
@@ -18,7 +18,6 @@ module Simplex.Messaging.Agent.NtfSubSupervisor
|
||||
where
|
||||
|
||||
import Control.Concurrent.Async (Async, uninterruptibleCancel)
|
||||
import Control.Concurrent.STM (stateTVar)
|
||||
import Control.Logger.Simple (logError, logInfo)
|
||||
import Control.Monad
|
||||
import Control.Monad.Except
|
||||
|
||||
@@ -126,8 +126,7 @@ module Simplex.Messaging.Agent.Store.SQLite
|
||||
-- File transfer
|
||||
createRcvFile,
|
||||
getRcvFile,
|
||||
updateRcvFileChunkDelay,
|
||||
increaseRcvChunkReplicaRetries,
|
||||
updateRcvChunkReplicaDelay,
|
||||
updateRcvFileChunkReceived,
|
||||
updateRcvFileStatus,
|
||||
updateRcvFileError,
|
||||
@@ -148,7 +147,6 @@ module Simplex.Messaging.Agent.Store.SQLite
|
||||
where
|
||||
|
||||
import Control.Concurrent (threadDelay)
|
||||
import Control.Concurrent.STM (stateTVar)
|
||||
import Control.Monad.Except
|
||||
import Crypto.Random (ChaChaDRG, randomBytesGenerate)
|
||||
import Data.Bifunctor (second)
|
||||
@@ -1799,7 +1797,7 @@ getRcvFile db rcvFileId = runExceptT $ do
|
||||
<$> DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT rcv_file_chunk_id, chunk_no, chunk_size, digest, tmp_path, delay
|
||||
SELECT rcv_file_chunk_id, chunk_no, chunk_size, digest, tmp_path
|
||||
FROM rcv_file_chunks
|
||||
WHERE rcv_file_id = ?
|
||||
|]
|
||||
@@ -1808,9 +1806,9 @@ getRcvFile db rcvFileId = runExceptT $ do
|
||||
replicas' <- getChunkReplicas rcvChunkId
|
||||
pure (chunk {replicas = replicas'} :: RcvFileChunk)
|
||||
where
|
||||
toChunk :: (Int64, Int, FileSize Word32, FileDigest, Maybe FilePath, Maybe Int) -> RcvFileChunk
|
||||
toChunk (rcvChunkId, chunkNo, chunkSize, digest, chunkTmpPath, delay) =
|
||||
RcvFileChunk {rcvFileId, rcvFileEntityId, userId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath, delay, replicas = []}
|
||||
toChunk :: (Int64, Int, FileSize Word32, FileDigest, Maybe FilePath) -> RcvFileChunk
|
||||
toChunk (rcvChunkId, chunkNo, chunkSize, digest, chunkTmpPath) =
|
||||
RcvFileChunk {rcvFileId, rcvFileEntityId, userId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath, replicas = []}
|
||||
getChunkReplicas :: Int64 -> IO [RcvFileChunkReplica]
|
||||
getChunkReplicas chunkId = do
|
||||
map toReplica
|
||||
@@ -1818,7 +1816,7 @@ getRcvFile db rcvFileId = runExceptT $ do
|
||||
db
|
||||
[sql|
|
||||
SELECT
|
||||
r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.retries,
|
||||
r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.delay, r.retries,
|
||||
s.xftp_host, s.xftp_port, s.xftp_key_hash
|
||||
FROM rcv_file_chunk_replicas r
|
||||
JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
|
||||
@@ -1826,20 +1824,15 @@ getRcvFile db rcvFileId = runExceptT $ do
|
||||
|]
|
||||
(Only chunkId)
|
||||
where
|
||||
toReplica :: (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Int, NonEmpty TransportHost, ServiceName, C.KeyHash) -> RcvFileChunkReplica
|
||||
toReplica (rcvChunkReplicaId, replicaId, replicaKey, received, retries, host, port, keyHash) =
|
||||
toReplica :: (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Maybe Int, Int, NonEmpty TransportHost, ServiceName, C.KeyHash) -> RcvFileChunkReplica
|
||||
toReplica (rcvChunkReplicaId, replicaId, replicaKey, received, delay, retries, host, port, keyHash) =
|
||||
let server = XFTPServer host port keyHash
|
||||
in RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, retries}
|
||||
in RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, delay, retries}
|
||||
|
||||
updateRcvFileChunkDelay :: DB.Connection -> Int64 -> Int -> IO ()
|
||||
updateRcvFileChunkDelay db chunkId delay = do
|
||||
updateRcvChunkReplicaDelay :: DB.Connection -> Int64 -> Int -> IO ()
|
||||
updateRcvChunkReplicaDelay db replicaId delay = do
|
||||
updatedAt <- getCurrentTime
|
||||
DB.execute db "UPDATE rcv_file_chunks SET delay = ?, updated_at = ? WHERE rcv_file_chunk_id = ?" (delay, updatedAt, chunkId)
|
||||
|
||||
increaseRcvChunkReplicaRetries :: DB.Connection -> Int64 -> IO ()
|
||||
increaseRcvChunkReplicaRetries db replicaId = do
|
||||
updatedAt <- getCurrentTime
|
||||
DB.execute db "UPDATE rcv_file_chunk_replicas SET retries = retries + 1, updated_at = ? WHERE rcv_file_chunk_replica_id = ?" (updatedAt, replicaId)
|
||||
DB.execute db "UPDATE rcv_file_chunk_replicas SET delay = ?, retries = retries + 1, updated_at = ? WHERE rcv_file_chunk_replica_id = ?" (delay, updatedAt, replicaId)
|
||||
|
||||
updateRcvFileChunkReceived :: DB.Connection -> Int64 -> Int64 -> DBRcvFileId -> FilePath -> IO (Either StoreError RcvFile)
|
||||
updateRcvFileChunkReceived db rId cId fId chunkTmpPath = do
|
||||
@@ -1875,8 +1868,8 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} = do
|
||||
db
|
||||
[sql|
|
||||
SELECT
|
||||
f.rcv_file_id, f.rcv_file_entity_id, f.user_id, c.rcv_file_chunk_id, c.chunk_no, c.chunk_size, c.digest, f.tmp_path, c.tmp_path, c.delay,
|
||||
r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.retries
|
||||
f.rcv_file_id, f.rcv_file_entity_id, f.user_id, c.rcv_file_chunk_id, c.chunk_no, c.chunk_size, c.digest, f.tmp_path, c.tmp_path,
|
||||
r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.delay, r.retries
|
||||
FROM rcv_file_chunk_replicas r
|
||||
JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
|
||||
JOIN rcv_file_chunks c ON c.rcv_file_chunk_id = r.rcv_file_chunk_id
|
||||
@@ -1888,8 +1881,8 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} = do
|
||||
|]
|
||||
(host, port, keyHash, RFSReceiving)
|
||||
where
|
||||
toChunk :: ((DBRcvFileId, RcvFileId, UserId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath, Maybe Int) :. (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Int)) -> RcvFileChunk
|
||||
toChunk ((rcvFileId, rcvFileEntityId, userId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath, delay) :. (rcvChunkReplicaId, replicaId, replicaKey, received, retries)) =
|
||||
toChunk :: ((DBRcvFileId, RcvFileId, UserId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath) :. (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Maybe Int, Int)) -> RcvFileChunk
|
||||
toChunk ((rcvFileId, rcvFileEntityId, userId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath) :. (rcvChunkReplicaId, replicaId, replicaKey, received, delay, retries)) =
|
||||
RcvFileChunk
|
||||
{ rcvFileId,
|
||||
rcvFileEntityId,
|
||||
@@ -1900,8 +1893,7 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} = do
|
||||
digest,
|
||||
fileTmpPath,
|
||||
chunkTmpPath,
|
||||
delay,
|
||||
replicas = [RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, retries}]
|
||||
replicas = [RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, delay, retries}]
|
||||
}
|
||||
|
||||
getNextRcvFileToDecrypt :: DB.Connection -> IO (Maybe RcvFile)
|
||||
|
||||
@@ -45,7 +45,6 @@ CREATE TABLE rcv_file_chunks (
|
||||
chunk_size INTEGER NOT NULL,
|
||||
digest BLOB NOT NULL,
|
||||
tmp_path TEXT,
|
||||
delay INTEGER,
|
||||
created_at TEXT NOT NULL DEFAULT (datetime('now')),
|
||||
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
|
||||
);
|
||||
@@ -60,7 +59,7 @@ CREATE TABLE rcv_file_chunk_replicas (
|
||||
replica_id BLOB NOT NULL,
|
||||
replica_key BLOB NOT NULL,
|
||||
received INTEGER NOT NULL DEFAULT 0,
|
||||
-- acknowledged INTEGER NOT NULL DEFAULT 0,
|
||||
delay INTEGER,
|
||||
retries INTEGER NOT NULL DEFAULT 0,
|
||||
created_at TEXT NOT NULL DEFAULT (datetime('now')),
|
||||
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
|
||||
|
||||
@@ -317,7 +317,6 @@ CREATE TABLE rcv_file_chunks(
|
||||
chunk_size INTEGER NOT NULL,
|
||||
digest BLOB NOT NULL,
|
||||
tmp_path TEXT,
|
||||
delay INTEGER,
|
||||
created_at TEXT NOT NULL DEFAULT(datetime('now')),
|
||||
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
|
||||
);
|
||||
@@ -330,7 +329,7 @@ CREATE TABLE rcv_file_chunk_replicas(
|
||||
replica_id BLOB NOT NULL,
|
||||
replica_key BLOB NOT NULL,
|
||||
received INTEGER NOT NULL DEFAULT 0,
|
||||
-- acknowledged INTEGER NOT NULL DEFAULT 0,
|
||||
delay INTEGER,
|
||||
retries INTEGER NOT NULL DEFAULT 0,
|
||||
created_at TEXT NOT NULL DEFAULT(datetime('now')),
|
||||
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
module Simplex.Messaging.Agent.TAsyncs where
|
||||
|
||||
import Control.Concurrent.STM (stateTVar)
|
||||
import Control.Monad.IO.Unlift (MonadUnliftIO)
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
|
||||
@@ -12,7 +12,6 @@
|
||||
|
||||
module Simplex.Messaging.Notifications.Server where
|
||||
|
||||
import Control.Concurrent.STM (stateTVar)
|
||||
import Control.Logger.Simple
|
||||
import Control.Monad.Except
|
||||
import Control.Monad.Reader
|
||||
|
||||
@@ -12,7 +12,6 @@ module Simplex.Messaging.Transport.Server
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Concurrent.STM (stateTVar)
|
||||
import Control.Monad.Except
|
||||
import Control.Monad.IO.Unlift
|
||||
import qualified Crypto.Store.X509 as SX
|
||||
|
||||
Reference in New Issue
Block a user