diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index 5154961c4..95a2ce04b 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -46,11 +46,12 @@ import UnliftIO import UnliftIO.Directory import qualified UnliftIO.Exception as E -receiveFile :: AgentMonad m => AgentClient -> UserId -> ValidFileDescription 'FPRecipient -> FilePath -> m Int64 +receiveFile :: AgentMonad m => AgentClient -> UserId -> ValidFileDescription 'FPRecipient -> FilePath -> m RcvFileId receiveFile c userId (ValidFileDescription fd@FileDescription {chunks}) xftpPath = do + g <- asks idsDrg encPath <- uniqueCombine xftpPath "xftp.encrypted" createDirectory encPath - fId <- withStore' c $ \db -> createRcvFile db userId fd xftpPath encPath + fId <- withStore c $ \db -> createRcvFile db g userId fd xftpPath encPath forM_ chunks downloadChunk pure fId where @@ -85,13 +86,13 @@ runXFTPWorker c srv doWork = do nextChunk <- withStore' c (`getNextRcvChunkToDownload` srv) case nextChunk of Nothing -> noWorkToDo - Just RcvFileChunk {rcvFileId, fileTmpPath, replicas = []} -> workerInternalError c rcvFileId (Just fileTmpPath) "chunk has no replicas" - Just fc@RcvFileChunk {rcvFileId, rcvChunkId, fileTmpPath, delay, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId} : _} -> do + Just RcvFileChunk {rcvFileId, rcvFileEntityId, fileTmpPath, replicas = []} -> workerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) "chunk has no replicas" + Just fc@RcvFileChunk {rcvFileId, rcvFileEntityId, rcvChunkId, fileTmpPath, delay, replicas = replica@RcvFileChunkReplica {rcvChunkReplicaId} : _} -> do ri <- asks $ reconnectInterval . config let ri' = maybe ri (\d -> ri {initialInterval = d, increaseAfter = 0}) delay withRetryInterval ri' $ \delay' loop -> downloadFileChunk fc replica - `catchError` retryOnError delay' loop (workerInternalError c rcvFileId (Just fileTmpPath) . show) + `catchError` retryOnError delay' loop (workerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) . show) where retryOnError :: Int -> m () -> (AgentErrorType -> m ()) -> AgentErrorType -> m () retryOnError chunkDelay loop done e = do @@ -129,14 +130,14 @@ runXFTPWorker c srv doWork = do allChunksReceived RcvFile {chunks} = all (\RcvFileChunk {replicas} -> any received replicas) chunks -workerInternalError :: AgentMonad m => AgentClient -> RcvFileId -> Maybe FilePath -> String -> m () -workerInternalError c rcvFileId tmpPath internalErrStr = do +workerInternalError :: AgentMonad m => AgentClient -> DBRcvFileId -> RcvFileId -> Maybe FilePath -> String -> m () +workerInternalError c rcvFileId rcvFileEntityId tmpPath internalErrStr = do forM_ tmpPath removePath withStore' c $ \db -> updateRcvFileError db rcvFileId internalErrStr - notifyInternalError c rcvFileId internalErrStr + notifyInternalError c rcvFileEntityId internalErrStr notifyInternalError :: (MonadUnliftIO m) => AgentClient -> RcvFileId -> String -> m () -notifyInternalError AgentClient {subQ} rcvFileId internalErrStr = atomically $ writeTBQueue subQ ("", "", APC SAERcvFile $ RFERR rcvFileId $ INTERNAL internalErrStr) +notifyInternalError AgentClient {subQ} rcvFileEntityId internalErrStr = atomically $ writeTBQueue subQ ("", rcvFileEntityId, APC SAERcvFile $ RFERR $ INTERNAL internalErrStr) runXFTPLocalWorker :: forall m. AgentMonad m => AgentClient -> TMVar () -> m () runXFTPLocalWorker c@AgentClient {subQ} doWork = do @@ -149,11 +150,11 @@ runXFTPLocalWorker c@AgentClient {subQ} doWork = do nextFile <- withStore' c getNextRcvFileToDecrypt case nextFile of Nothing -> noWorkToDo - Just f@RcvFile {rcvFileId, tmpPath} -> - decryptFile f `catchError` (workerInternalError c rcvFileId tmpPath . show) + Just f@RcvFile {rcvFileId, rcvFileEntityId, tmpPath} -> + decryptFile f `catchError` (workerInternalError c rcvFileId rcvFileEntityId tmpPath . show) noWorkToDo = void . atomically $ tryTakeTMVar doWork decryptFile :: RcvFile -> m () - decryptFile RcvFile {rcvFileId, key, nonce, tmpPath, saveDir, savePath, chunks} = do + decryptFile RcvFile {rcvFileId, rcvFileEntityId, key, nonce, tmpPath, saveDir, savePath, chunks} = do forM_ savePath $ \p -> do removePath p withStore' c (`updateRcvFileNoSavePath` rcvFileId) @@ -163,10 +164,10 @@ runXFTPLocalWorker c@AgentClient {subQ} doWork = do path <- decrypt encSize chunkPaths forM_ tmpPath removePath withStore' c $ \db -> updateRcvFileComplete db rcvFileId path - notify $ RFDONE rcvFileId path + notify $ RFDONE path where notify :: forall e. AEntityI e => ACommand 'Agent e -> m () - notify cmd = atomically $ writeTBQueue subQ ("", "", APC (sAEntity @e) cmd) + notify cmd = atomically $ writeTBQueue subQ ("", rcvFileEntityId, APC (sAEntity @e) cmd) getChunkPaths :: [RcvFileChunk] -> m [FilePath] getChunkPaths [] = pure [] getChunkPaths (RcvFileChunk {chunkTmpPath = Just path} : cs) = do diff --git a/src/Simplex/FileTransfer/Types.hs b/src/Simplex/FileTransfer/Types.hs index 983bf9924..da673f4c9 100644 --- a/src/Simplex/FileTransfer/Types.hs +++ b/src/Simplex/FileTransfer/Types.hs @@ -11,6 +11,7 @@ import Database.SQLite.Simple.FromField (FromField (..)) import Database.SQLite.Simple.ToField (ToField (..)) import Simplex.FileTransfer.Client (XFTPChunkSpec (..)) import Simplex.FileTransfer.Description +import Simplex.Messaging.Agent.Protocol (RcvFileId) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String @@ -33,11 +34,12 @@ instance Encoding FileHeader where (fileName, fileExtra) <- smpP pure FileHeader {fileName, fileExtra} -type RcvFileId = Int64 +type DBRcvFileId = Int64 data RcvFile = RcvFile - { userId :: Int64, - rcvFileId :: RcvFileId, + { rcvFileId :: DBRcvFileId, + rcvFileEntityId :: RcvFileId, + userId :: Int64, size :: FileSize Int64, digest :: FileDigest, key :: C.SbKey, @@ -79,8 +81,9 @@ instance TextEncoding RcvFileStatus where RFSError -> "error" data RcvFileChunk = RcvFileChunk - { userId :: Int64, - rcvFileId :: RcvFileId, + { rcvFileId :: DBRcvFileId, + rcvFileEntityId :: RcvFileId, + userId :: Int64, rcvChunkId :: Int64, chunkNo :: Int, chunkSize :: FileSize Word32, @@ -105,11 +108,11 @@ data RcvFileChunkReplica = RcvFileChunkReplica -- Sending files -type SndFileId = Int64 +type DBSndFileId = Int64 data SndFile = SndFile { userId :: Int64, - sndFileId :: SndFileId, + sndFileId :: DBSndFileId, size :: FileSize Int64, digest :: FileDigest, key :: C.SbKey, @@ -151,7 +154,7 @@ instance TextEncoding SndFileStatus where data SndFileChunk = SndFileChunk { userId :: Int64, - sndFileId :: SndFileId, + sndFileId :: DBSndFileId, sndChunkId :: Int64, chunkNo :: Int, chunkSpec :: XFTPChunkSpec, diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 2689f60e3..94c53848e 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -117,7 +117,6 @@ import qualified Database.SQLite.Simple as DB import Simplex.FileTransfer.Agent (addXFTPWorker, receiveFile) import Simplex.FileTransfer.Description (ValidFileDescription) import Simplex.FileTransfer.Protocol (FileParty (..)) -import Simplex.FileTransfer.Types (RcvFileId) import Simplex.FileTransfer.Util (removePath) import Simplex.Messaging.Agent.Client import Simplex.Messaging.Agent.Env.SQLite diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index b90b260dc..632b89961 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -41,7 +41,7 @@ import Data.Word (Word16) import Network.Socket import Numeric.Natural import Simplex.FileTransfer.Client (XFTPClientConfig (..), defaultXFTPClientConfig) -import Simplex.FileTransfer.Types (SndFileId) +import Simplex.FileTransfer.Types (DBSndFileId) import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.RetryInterval import Simplex.Messaging.Agent.Store (UserId) @@ -225,7 +225,7 @@ data XFTPAgent = XFTPAgent xftpSndWorkers :: TMap (Maybe XFTPServer) (TMVar (), Async ()), -- files currently in upload - to throttle upload of other files' chunks, -- this optimization can be dropped for the MVP - xftpSndFiles :: TVar (Set SndFileId) + xftpSndFiles :: TVar (Set DBSndFileId) } newXFTPAgent :: STM XFTPAgent diff --git a/src/Simplex/Messaging/Agent/Protocol.hs b/src/Simplex/Messaging/Agent/Protocol.hs index b73a04d06..60b1d0d17 100644 --- a/src/Simplex/Messaging/Agent/Protocol.hs +++ b/src/Simplex/Messaging/Agent/Protocol.hs @@ -97,6 +97,7 @@ module Simplex.Messaging.Agent.Protocol ATransmissionOrError, ARawTransmission, ConnId, + RcvFileId, ConfirmationId, InvitationId, MsgIntegrity (..), @@ -162,7 +163,6 @@ import Database.SQLite.Simple.ToField import GHC.Generics (Generic) import Generic.Random (genericArbitraryU) import Simplex.FileTransfer.Protocol (XFTPErrorType) -import Simplex.FileTransfer.Types (RcvFileId) import Simplex.Messaging.Agent.QueryString import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Crypto.Ratchet (E2ERatchetParams, E2ERatchetParamsUri) @@ -327,9 +327,9 @@ data ACommand (p :: AParty) (e :: AEntity) where ERR :: AgentErrorType -> ACommand Agent AEConn SUSPENDED :: ACommand Agent AENone -- XFTP commands and responses - RFPROG :: RcvFileId -> Int -> Int -> ACommand Agent AERcvFile - RFDONE :: RcvFileId -> FilePath -> ACommand Agent AERcvFile - RFERR :: RcvFileId -> AgentErrorType -> ACommand Agent AERcvFile + RFPROG :: Int -> Int -> ACommand Agent AERcvFile + RFDONE :: FilePath -> ACommand Agent AERcvFile + RFERR :: AgentErrorType -> ACommand Agent AERcvFile deriving instance Eq (ACommand p e) @@ -880,6 +880,8 @@ connModeT = \case -- | SMP agent connection ID. type ConnId = ByteString +type RcvFileId = ByteString + type ConfirmationId = ByteString type InvitationId = ByteString @@ -1422,9 +1424,9 @@ commandP binaryP = OK_ -> pure OK ERR_ -> s (ERR <$> strP) SUSPENDED_ -> pure SUSPENDED - RFPROG_ -> s (RFPROG <$> A.decimal <* A.space <*> A.decimal <* A.space <*> A.decimal) - RFDONE_ -> s (RFDONE <$> A.decimal <* A.space <*> strP) - RFERR_ -> s (RFERR <$> A.decimal <* A.space <*> strP) + RFPROG_ -> s (RFPROG <$> A.decimal <* A.space <*> A.decimal) + RFDONE_ -> s (RFDONE <$> strP) + RFERR_ -> s (RFERR <$> strP) where s :: Parser a -> Parser a s p = A.space *> p @@ -1478,9 +1480,9 @@ serializeCommand = \case ERR e -> s (ERR_, e) OK -> s OK_ SUSPENDED -> s SUSPENDED_ - RFPROG fId rcvd total -> s (RFPROG_, Str $ bshow fId, rcvd, total) - RFDONE fId fPath -> s (RFDONE_, Str $ bshow fId, fPath) - RFERR fId e -> s (RFERR_, Str $ bshow fId, e) + RFPROG rcvd total -> s (RFPROG_, rcvd, total) + RFDONE fPath -> s (RFDONE_, fPath) + RFERR e -> s (RFERR_, e) where s :: StrEncoding a => a -> ByteString s = strEncode diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index ab4cfd8fb..2ff4b3dc0 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -1738,20 +1738,26 @@ getXFTPServerId_ db ProtocolServer {host, port, keyHash} = do firstRow fromOnly SEXFTPServerNotFound $ DB.query db "SELECT xftp_server_id FROM xftp_servers WHERE xftp_host = ? AND xftp_port = ? AND xftp_key_hash = ?" (host, port, keyHash) -createRcvFile :: DB.Connection -> UserId -> FileDescription 'FPRecipient -> FilePath -> FilePath -> IO RcvFileId -createRcvFile db userId fd@FileDescription {chunks} saveDir tmpPath = do - rcvFileId <- insertRcvFile fd - forM_ chunks $ \fc@FileChunk {replicas} -> do - chunkId <- insertChunk fc rcvFileId - forM_ (zip [1 ..] replicas) $ \(rno, replica) -> insertReplica rno replica chunkId - pure rcvFileId +createRcvFile :: DB.Connection -> TVar ChaChaDRG -> UserId -> FileDescription 'FPRecipient -> FilePath -> FilePath -> IO (Either StoreError RcvFileId) +createRcvFile db gVar userId fd@FileDescription {chunks} saveDir tmpPath = runExceptT $ do + (rcvFileEntityId, rcvFileId) <- ExceptT $ insertRcvFile fd + liftIO $ + forM_ chunks $ \fc@FileChunk {replicas} -> do + chunkId <- insertChunk fc rcvFileId + forM_ (zip [1 ..] replicas) $ \(rno, replica) -> insertReplica rno replica chunkId + pure rcvFileEntityId where - insertRcvFile FileDescription {size, digest, key, nonce, chunkSize} = do - DB.execute - db - "INSERT INTO rcv_files (user_id, size, digest, key, nonce, chunk_size, tmp_path, save_dir, status) VALUES (?,?,?,?,?,?,?,?,?)" - (userId, size, digest, key, nonce, chunkSize, tmpPath, saveDir, RFSReceiving) - insertedRowId db + insertRcvFile :: FileDescription 'FPRecipient -> IO (Either StoreError (RcvFileId, DBRcvFileId)) + insertRcvFile FileDescription {size, digest, key, nonce, chunkSize} = runExceptT $ do + rcvFileEntityId <- ExceptT $ + createWithRandomId gVar $ \rcvFileEntityId -> + DB.execute + db + "INSERT INTO rcv_files (rcv_file_entity_id, user_id, size, digest, key, nonce, chunk_size, tmp_path, save_dir, status) VALUES (?,?,?,?,?,?,?,?,?,?)" + (rcvFileEntityId, userId, size, digest, key, nonce, chunkSize, tmpPath, saveDir, RFSReceiving) + rcvFileId <- liftIO $ insertedRowId db + pure (rcvFileEntityId, rcvFileId) + insertChunk :: FileChunk -> DBRcvFileId -> IO Int64 insertChunk FileChunk {chunkNo, chunkSize, digest} rcvFileId = do DB.execute db @@ -1766,10 +1772,10 @@ createRcvFile db userId fd@FileDescription {chunks} saveDir tmpPath = do "INSERT INTO rcv_file_chunk_replicas (replica_number, rcv_file_chunk_id, xftp_server_id, replica_id, replica_key) VALUES (?,?,?,?,?)" (replicaNo, chunkId, srvId, replicaId, replicaKey) -getRcvFile :: DB.Connection -> RcvFileId -> IO (Either StoreError RcvFile) +getRcvFile :: DB.Connection -> DBRcvFileId -> IO (Either StoreError RcvFile) getRcvFile db rcvFileId = runExceptT $ do - fd@RcvFile {userId, tmpPath} <- ExceptT getFile - chunks <- maybe (pure []) (liftIO . getChunks userId) tmpPath + fd@RcvFile {rcvFileEntityId, userId, tmpPath} <- ExceptT getFile + chunks <- maybe (pure []) (liftIO . getChunks rcvFileEntityId userId) tmpPath pure (fd {chunks} :: RcvFile) where getFile :: IO (Either StoreError RcvFile) @@ -1778,17 +1784,17 @@ getRcvFile db rcvFileId = runExceptT $ do DB.query db [sql| - SELECT user_id, size, digest, key, nonce, chunk_size, tmp_path, save_dir, save_path, status + SELECT rcv_file_entity_id, user_id, size, digest, key, nonce, chunk_size, tmp_path, save_dir, save_path, status FROM rcv_files WHERE rcv_file_id = ? |] (Only rcvFileId) where - toFile :: (UserId, FileSize Int64, FileDigest, C.SbKey, C.CbNonce, FileSize Word32, Maybe FilePath, FilePath, Maybe FilePath, RcvFileStatus) -> RcvFile - toFile (userId, size, digest, key, nonce, chunkSize, tmpPath, saveDir, savePath, status) = - RcvFile {userId, rcvFileId, size, digest, key, nonce, chunkSize, tmpPath, saveDir, savePath, status, chunks = []} - getChunks :: UserId -> FilePath -> IO [RcvFileChunk] - getChunks userId fileTmpPath = do + toFile :: (RcvFileId, UserId, FileSize Int64, FileDigest, C.SbKey, C.CbNonce, FileSize Word32, Maybe FilePath, FilePath, Maybe FilePath, RcvFileStatus) -> RcvFile + toFile (rcvFileEntityId, userId, size, digest, key, nonce, chunkSize, tmpPath, saveDir, savePath, status) = + RcvFile {rcvFileId, rcvFileEntityId, userId, size, digest, key, nonce, chunkSize, tmpPath, saveDir, savePath, status, chunks = []} + getChunks :: RcvFileId -> UserId -> FilePath -> IO [RcvFileChunk] + getChunks rcvFileEntityId userId fileTmpPath = do chunks <- map toChunk <$> DB.query @@ -1805,7 +1811,7 @@ getRcvFile db rcvFileId = runExceptT $ do where toChunk :: (Int64, Int, FileSize Word32, FileDigest, Maybe FilePath, Maybe Int) -> RcvFileChunk toChunk (rcvChunkId, chunkNo, chunkSize, digest, chunkTmpPath, delay) = - RcvFileChunk {userId, rcvFileId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath, delay, replicas = []} + RcvFileChunk {rcvFileId, rcvFileEntityId, userId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath, delay, replicas = []} getChunkReplicas :: Int64 -> IO [RcvFileChunkReplica] getChunkReplicas chunkId = do map toReplica @@ -1836,34 +1842,34 @@ increaseRcvChunkReplicaRetries db replicaId = do updatedAt <- getCurrentTime DB.execute db "UPDATE rcv_file_chunk_replicas SET retries = retries + 1, updated_at = ? WHERE rcv_file_chunk_replica_id = ?" (updatedAt, replicaId) -updateRcvFileChunkReceived :: DB.Connection -> Int64 -> Int64 -> RcvFileId -> FilePath -> IO (Either StoreError RcvFile) +updateRcvFileChunkReceived :: DB.Connection -> Int64 -> Int64 -> DBRcvFileId -> FilePath -> IO (Either StoreError RcvFile) updateRcvFileChunkReceived db rId cId fId chunkTmpPath = do updatedAt <- getCurrentTime DB.execute db "UPDATE rcv_file_chunk_replicas SET received = 1, updated_at = ? WHERE rcv_file_chunk_replica_id = ?" (updatedAt, rId) DB.execute db "UPDATE rcv_file_chunks SET tmp_path = ?, updated_at = ? WHERE rcv_file_chunk_id = ?" (chunkTmpPath, updatedAt, cId) getRcvFile db fId -updateRcvFileStatus :: DB.Connection -> RcvFileId -> RcvFileStatus -> IO () +updateRcvFileStatus :: DB.Connection -> DBRcvFileId -> RcvFileStatus -> IO () updateRcvFileStatus db rcvFileId status = do updatedAt <- getCurrentTime DB.execute db "UPDATE rcv_files SET status = ?, updated_at = ? WHERE rcv_file_id = ?" (status, updatedAt, rcvFileId) -updateRcvFileError :: DB.Connection -> RcvFileId -> String -> IO () +updateRcvFileError :: DB.Connection -> DBRcvFileId -> String -> IO () updateRcvFileError db rcvFileId errStr = do updatedAt <- getCurrentTime DB.execute db "UPDATE rcv_files SET tmp_path = NULL, error = ?, status = ?, updated_at = ? WHERE rcv_file_id = ?" (errStr, RFSError, updatedAt, rcvFileId) -updateRcvFileComplete :: DB.Connection -> RcvFileId -> FilePath -> IO () +updateRcvFileComplete :: DB.Connection -> DBRcvFileId -> FilePath -> IO () updateRcvFileComplete db rcvFileId savePath = do updatedAt <- getCurrentTime DB.execute db "UPDATE rcv_files SET tmp_path = NULL, save_path = ?, status = ?, updated_at = ? WHERE rcv_file_id = ?" (savePath, RFSComplete, updatedAt, rcvFileId) -updateRcvFileNoSavePath :: DB.Connection -> RcvFileId -> IO () +updateRcvFileNoSavePath :: DB.Connection -> DBRcvFileId -> IO () updateRcvFileNoSavePath db rcvFileId = do updatedAt <- getCurrentTime DB.execute db "UPDATE rcv_files SET save_path = NULL, updated_at = ? WHERE rcv_file_id = ?" (updatedAt, rcvFileId) -updateRcvFileNoTmpPath :: DB.Connection -> RcvFileId -> IO () +updateRcvFileNoTmpPath :: DB.Connection -> DBRcvFileId -> IO () updateRcvFileNoTmpPath db rcvFileId = do updatedAt <- getCurrentTime DB.execute db "UPDATE rcv_files SET tmp_path = NULL, updated_at = ? WHERE rcv_file_id = ?" (updatedAt, rcvFileId) @@ -1875,7 +1881,7 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} = do db [sql| SELECT - f.user_id, f.rcv_file_id, c.rcv_file_chunk_id, c.chunk_no, c.chunk_size, c.digest, f.tmp_path, c.tmp_path, c.delay, + f.rcv_file_id, f.rcv_file_entity_id, f.user_id, c.rcv_file_chunk_id, c.chunk_no, c.chunk_size, c.digest, f.tmp_path, c.tmp_path, c.delay, r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.retries FROM rcv_file_chunk_replicas r JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id @@ -1888,11 +1894,12 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} = do |] (host, port, keyHash, RFSReceiving) where - toChunk :: ((UserId, RcvFileId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath, Maybe Int) :. (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Int)) -> RcvFileChunk - toChunk ((userId, rcvFileId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath, delay) :. (rcvChunkReplicaId, replicaId, replicaKey, received, retries)) = + toChunk :: ((DBRcvFileId, RcvFileId, UserId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath, Maybe Int) :. (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Int)) -> RcvFileChunk + toChunk ((rcvFileId, rcvFileEntityId, userId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath, delay) :. (rcvChunkReplicaId, replicaId, replicaKey, received, retries)) = RcvFileChunk - { userId, - rcvFileId, + { rcvFileId, + rcvFileEntityId, + userId, rcvChunkId, chunkNo, chunkSize, @@ -1905,7 +1912,7 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} = do getNextRcvFileToDecrypt :: DB.Connection -> IO (Maybe RcvFile) getNextRcvFileToDecrypt db = do - fileId_ :: Maybe RcvFileId <- + fileId_ :: Maybe DBRcvFileId <- maybeFirstRow fromOnly $ DB.query db "SELECT rcv_file_id FROM rcv_files WHERE status IN (?,?) ORDER BY created_at ASC LIMIT 1" (RFSReceived, RFSDecrypting) case fileId_ of @@ -1931,6 +1938,6 @@ getPendingRcvFilesServers db = do toServer :: (NonEmpty TransportHost, ServiceName, C.KeyHash) -> XFTPServer toServer (host, port, keyHash) = XFTPServer host port keyHash -getTmpFilePaths :: DB.Connection -> IO [(RcvFileId, FilePath)] +getTmpFilePaths :: DB.Connection -> IO [(DBRcvFileId, FilePath)] getTmpFilePaths db = DB.query db "SELECT rcv_file_id, tmp_path FROM rcv_files WHERE status IN (?,?) AND tmp_path IS NOT NULL" (RFSComplete, RFSError) diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230223_files.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230223_files.hs index 94c41f0b7..2ad06b4c8 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230223_files.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20230223_files.hs @@ -19,7 +19,8 @@ CREATE TABLE xftp_servers ( ); CREATE TABLE rcv_files ( - rcv_file_id INTEGER PRIMARY KEY AUTOINCREMENT, + rcv_file_id INTEGER PRIMARY KEY, + rcv_file_entity_id BLOB NOT NULL, user_id INTEGER NOT NULL REFERENCES users ON DELETE CASCADE, size INTEGER NOT NULL, digest BLOB NOT NULL, @@ -32,7 +33,8 @@ CREATE TABLE rcv_files ( status TEXT NOT NULL, error TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')), - updated_at TEXT NOT NULL DEFAULT (datetime('now')) + updated_at TEXT NOT NULL DEFAULT (datetime('now')), + UNIQUE(rcv_file_entity_id) ); CREATE INDEX idx_rcv_files_user_id ON rcv_files(user_id); diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql index 101cf0a6b..ee7850191 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql @@ -293,7 +293,8 @@ CREATE TABLE xftp_servers( UNIQUE(xftp_host, xftp_port, xftp_key_hash) ); CREATE TABLE rcv_files( - rcv_file_id INTEGER PRIMARY KEY AUTOINCREMENT, + rcv_file_id INTEGER PRIMARY KEY, + rcv_file_entity_id BLOB NOT NULL, user_id INTEGER NOT NULL REFERENCES users ON DELETE CASCADE, size INTEGER NOT NULL, digest BLOB NOT NULL, @@ -306,7 +307,8 @@ CREATE TABLE rcv_files( status TEXT NOT NULL, error TEXT, created_at TEXT NOT NULL DEFAULT(datetime('now')), - updated_at TEXT NOT NULL DEFAULT(datetime('now')) + updated_at TEXT NOT NULL DEFAULT(datetime('now')), + UNIQUE(rcv_file_entity_id) ); CREATE INDEX idx_rcv_files_user_id ON rcv_files(user_id); CREATE TABLE rcv_file_chunks( diff --git a/tests/XFTPAgent.hs b/tests/XFTPAgent.hs index cb33d766e..27d4afaf3 100644 --- a/tests/XFTPAgent.hs +++ b/tests/XFTPAgent.hs @@ -6,6 +6,7 @@ module XFTPAgent where import AgentTests.FunctionalAPITests (get, rfGet, runRight, runRight_) +import Control.Logger.Simple import Control.Monad.Except import Data.Bifunctor (first) import qualified Data.ByteString as LB @@ -49,7 +50,7 @@ testXFTPAgentReceive = withXFTPServer $ do runRight_ $ do fd :: ValidFileDescription 'FPRecipient <- getFileDescription fdRcv fId <- xftpReceiveFile rcp 1 fd recipientFiles - ("", "", RFDONE fId' path) <- rfGet rcp + ("", fId', RFDONE path) <- rfGet rcp liftIO $ do fId' `shouldBe` fId LB.readFile path `shouldReturn` file @@ -61,8 +62,11 @@ getFileDescription path = do case vfd of AVFD fd' -> either (throwError . INTERNAL) pure $ checkParty fd' +logCfgNoLogs :: LogConfig +logCfgNoLogs = LogConfig {lc_file = Nothing, lc_stderr = False} + testXFTPAgentReceiveRestore :: IO () -testXFTPAgentReceiveRestore = do +testXFTPAgentReceiveRestore = withGlobalLogging logCfgNoLogs $ do let filePath = senderFiles "testfile" fdRcv = filePath <> ".xftp" "rcv1.xftp" fdSnd = filePath <> ".xftp" "snd.xftp.private" @@ -81,10 +85,11 @@ testXFTPAgentReceiveRestore = do -- receive file using agent - should not succeed due to server being down rcp <- getSMPAgentClient agentCfg initAgentServers - runRight_ $ do + fId <- runRight $ do fd :: ValidFileDescription 'FPRecipient <- getFileDescription fdRcv - void $ xftpReceiveFile rcp 1 fd recipientFiles + fId <- xftpReceiveFile rcp 1 fd recipientFiles liftIO $ timeout 300000 (get rcp) `shouldReturn` Nothing -- wait for worker attempt + pure fId disconnectAgentClient rcp doesDirectoryExist (recipientFiles "xftp.encrypted") `shouldReturn` True @@ -92,9 +97,9 @@ testXFTPAgentReceiveRestore = do rcp' <- getSMPAgentClient agentCfg initAgentServers withXFTPServerStoreLogOn $ \_ -> do -- receive file using agent - should succeed with server up - ("", "", RFDONE fId' path) <- rfGet rcp' + ("", fId', RFDONE path) <- rfGet rcp' liftIO $ do - fId' `shouldBe` 1 + fId' `shouldBe` fId file <- LB.readFile filePath LB.readFile path `shouldReturn` file @@ -102,7 +107,7 @@ testXFTPAgentReceiveRestore = do doesDirectoryExist (recipientFiles "xftp.encrypted") `shouldReturn` False testXFTPAgentReceiveCleanup :: IO () -testXFTPAgentReceiveCleanup = do +testXFTPAgentReceiveCleanup = withGlobalLogging logCfgNoLogs $ do let filePath = senderFiles "testfile" fdRcv = filePath <> ".xftp" "rcv1.xftp" fdSnd = filePath <> ".xftp" "snd.xftp.private" @@ -133,7 +138,7 @@ testXFTPAgentReceiveCleanup = do -- receive file using agent - should fail with AUTH error rcp' <- getSMPAgentClient agentCfg initAgentServers withXFTPServerThreadOn $ \_ -> do - ("", "", RFERR fId' (INTERNAL "XFTP {xftpErr = AUTH}")) <- rfGet rcp' + ("", fId', RFERR (INTERNAL "XFTP {xftpErr = AUTH}")) <- rfGet rcp' fId' `shouldBe` fId -- tmp path should be removed after permanent error