diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index 547e58326..7fd6be36c 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -36,9 +36,10 @@ import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Lazy.Char8 as LB import Data.Composition ((.:)) import Data.Int (Int64) -import Data.List (foldl', isSuffixOf, partition) +import Data.List (foldl', isSuffixOf, partition, sortOn) import Data.List.NonEmpty (nonEmpty) import qualified Data.List.NonEmpty as L +import Data.Map (Map) import qualified Data.Map.Strict as M import Data.Text (Text) import Data.Time.Clock (getCurrentTime) @@ -371,9 +372,9 @@ runXFTPSndPrepareWorker c doWork = do when (status == SFSEncrypting) $ whenM (doesFileExist fsEncPath) $ removeFile fsEncPath withStore' c $ \db -> updateSndFileStatus db sndFileId SFSEncrypting - (digest, chunkSpecs) <- encryptFileForUpload sndFile encPath + (digest, chunkSpecsDigests) <- encryptFileForUpload sndFile encPath withStore c $ \db -> do - updateSndFileEncrypted db sndFileId digest chunkSpecs + updateSndFileEncrypted db sndFileId digest chunkSpecsDigests getSndFile db sndFileId else pure sndFile maxRecipients <- asks (xftpMaxRecipientsPerRequest . config) @@ -382,7 +383,7 @@ runXFTPSndPrepareWorker c doWork = do forM_ chunks $ createChunk numRecipients' withStore' c $ \db -> updateSndFileStatus db sndFileId SFSUploading where - encryptFileForUpload :: SndFile -> FilePath -> m (FileDigest, [XFTPChunkSpec]) + encryptFileForUpload :: SndFile -> FilePath -> m (FileDigest, [(XFTPChunkSpec, FileDigest)]) encryptFileForUpload SndFile {key, nonce, filePath} encPath = do let fileName = takeFileName filePath fileSize <- fromInteger <$> getFileSize filePath @@ -395,17 +396,17 @@ runXFTPSndPrepareWorker c doWork = do void $ liftError (INTERNAL . show) $ encryptFile filePath fileHdr key nonce fileSize' encSize encPath digest <- liftIO $ LC.sha512Hash <$> LB.readFile encPath let chunkSpecs = prepareChunkSpecs encPath chunkSizes - pure (FileDigest digest, chunkSpecs) + chunkDigests <- map FileDigest <$> mapM (liftIO . getChunkDigest) chunkSpecs + pure (FileDigest digest, zip chunkSpecs chunkDigests) createChunk :: Int -> SndFileChunk -> m () - createChunk numRecipients' SndFileChunk {sndChunkId, userId, chunkSpec} = do + createChunk numRecipients' SndFileChunk {sndChunkId, userId, chunkSpec = XFTPChunkSpec {chunkSize}, digest = FileDigest chDigest} = do (sndKey, spKey) <- liftIO $ C.generateSignatureKeyPair C.SEd25519 rKeys <- liftIO $ L.fromList <$> replicateM numRecipients' (C.generateSignatureKeyPair C.SEd25519) - ch@FileInfo {digest} <- liftIO $ getChunkInfo sndKey chunkSpec - -- TODO with retry on temporary errors + let fileInfo = FileInfo {sndKey, size = fromIntegral chunkSize, digest = chDigest} srvAuth@(ProtoServerWithAuth srv _) <- getServer - (sndId, rIds) <- agentXFTPCreateChunk c userId srvAuth spKey ch (L.map fst rKeys) + (sndId, rIds) <- agentXFTPCreateChunk c userId srvAuth spKey fileInfo (L.map fst rKeys) let rcvIdsKeys = L.toList $ L.map ChunkReplicaId rIds `L.zip` L.map snd rKeys - withStore' c $ \db -> createSndFileReplica db sndChunkId (FileDigest digest) srv (ChunkReplicaId sndId) spKey rcvIdsKeys + withStore' c $ \db -> createSndFileReplica db sndChunkId srv (ChunkReplicaId sndId) spKey rcvIdsKeys addXFTPSndWorker c $ Just srv getServer :: m XFTPServerWithAuth getServer = do @@ -455,8 +456,7 @@ runXFTPSndWorker c srv doWork = do let complete = all chunkUploaded chunks -- TODO calculate progress, notify SFPROG when complete $ do - sndDescr <- sndFileToSndDescr sf - rcvDescrs <- sndFileToRcvDescrs sf + (sndDescr, rcvDescrs) <- sndFileToDescrs sf notify c sndFileEntityId $ SFDONE sndDescr rcvDescrs forM_ prefixPath (removePath <=< toFSFilePath) withStore' c $ \db -> updateSndFileStatus db sndFileId SFSComplete @@ -473,25 +473,60 @@ runXFTPSndWorker c srv doWork = do let rcvIdsKeys' = L.toList $ L.map ChunkReplicaId rIds `L.zip` L.map snd rKeys cr' <- withStore' c $ \db -> addSndChunkReplicaRecipients db cr rcvIdsKeys' addRecipients ch cr' - sndFileToSndDescr :: SndFile -> m (ValidFileDescription 'FSender) - sndFileToSndDescr SndFile {digest = Nothing} = throwError $ INTERNAL "snd file has no digest" - sndFileToSndDescr SndFile {chunks = []} = throwError $ INTERNAL "snd file has no chunks" - sndFileToSndDescr SndFile {digest = Just digest, key, nonce, chunks = chunks@(fstChunk : _)} = do + sndFileToDescrs :: SndFile -> m (ValidFileDescription 'FSender, [ValidFileDescription 'FRecipient]) + sndFileToDescrs SndFile {digest = Nothing} = throwError $ INTERNAL "snd file has no digest" + sndFileToDescrs SndFile {chunks = []} = throwError $ INTERNAL "snd file has no chunks" + sndFileToDescrs SndFile {digest = Just digest, key, nonce, chunks = chunks@(fstChunk : _)} = do let chunkSize = FileSize $ sndChunkSize fstChunk size = FileSize $ sum $ map (fromIntegral . sndChunkSize) chunks - descrChunks <- mapM toDescrChunk chunks - let fd = FileDescription {party = SFSender, size, digest, key, nonce, chunkSize, chunks = descrChunks} - either (throwError . INTERNAL) pure $ validateFileDescription' fd + -- snd description + sndDescrChunks <- mapM toSndDescrChunk chunks + let fdSnd = FileDescription {party = SFSender, size, digest, key, nonce, chunkSize, chunks = sndDescrChunks} + validFdSnd <- either (throwError . INTERNAL) pure $ validateFileDescription fdSnd + -- rcv descriptions + let fdRcv = FileDescription {party = SFRecipient, size, digest, key, nonce, chunkSize, chunks = []} + fdRcvs = createRcvFileDescriptions fdRcv chunks + validFdRcvs <- either (throwError . INTERNAL) pure $ mapM validateFileDescription fdRcvs + pure (validFdSnd, validFdRcvs) + toSndDescrChunk :: SndFileChunk -> m FileChunk + toSndDescrChunk SndFileChunk {replicas = []} = throwError $ INTERNAL "snd file chunk has no replicas" + toSndDescrChunk ch@SndFileChunk {chunkNo, digest = chDigest, replicas = (SndFileChunkReplica {server, replicaId, replicaKey} : _)} = do + let chunkSize = FileSize $ sndChunkSize ch + replicas = [FileChunkReplica {server, replicaId, replicaKey}] + pure FileChunk {chunkNo, digest = chDigest, chunkSize, replicas} + createRcvFileDescriptions :: FileDescription 'FRecipient -> [SndFileChunk] -> [FileDescription 'FRecipient] + createRcvFileDescriptions fd sndChunks = map (\chunks -> (fd :: (FileDescription 'FRecipient)) {chunks}) rcvChunks where - toDescrChunk :: SndFileChunk -> m FileChunk - toDescrChunk SndFileChunk {digest = Nothing} = throwError $ INTERNAL "snd file chunk has no digest" - toDescrChunk SndFileChunk {replicas = []} = throwError $ INTERNAL "snd file chunk has no replicas" - toDescrChunk ch@SndFileChunk {chunkNo, digest = Just chDigest, replicas = (SndFileChunkReplica {server, replicaId, replicaKey} : _)} = do + rcvReplicas :: [SentRecipientReplica] + rcvReplicas = concatMap toSentRecipientReplicas sndChunks + toSentRecipientReplicas :: SndFileChunk -> [SentRecipientReplica] + toSentRecipientReplicas ch@SndFileChunk {chunkNo, digest, replicas} = let chunkSize = FileSize $ sndChunkSize ch - replicas = [FileChunkReplica {server, replicaId, replicaKey}] - pure FileChunk {chunkNo, digest = chDigest, chunkSize, replicas} - sndFileToRcvDescrs :: SndFile -> m [ValidFileDescription 'FRecipient] - sndFileToRcvDescrs SndFile {} = do - undefined + in concatMap + ( \SndFileChunkReplica {server, rcvIdsKeys} -> + zipWith + (\rcvNo (replicaId, replicaKey) -> SentRecipientReplica {chunkNo, server, rcvNo, replicaId, replicaKey, digest, chunkSize}) + [1 ..] + rcvIdsKeys + ) + replicas + rcvChunks :: [[FileChunk]] + rcvChunks = map (sortChunks . M.elems) $ M.elems $ foldl' addRcvChunk M.empty rcvReplicas + sortChunks :: [FileChunk] -> [FileChunk] + sortChunks = map reverseReplicas . sortOn (chunkNo :: FileChunk -> Int) + reverseReplicas ch@FileChunk {replicas} = (ch :: FileChunk) {replicas = reverse replicas} + addRcvChunk :: Map Int (Map Int FileChunk) -> SentRecipientReplica -> Map Int (Map Int FileChunk) + addRcvChunk m SentRecipientReplica {chunkNo, server, rcvNo, replicaId, replicaKey, digest, chunkSize} = + M.alter (Just . addOrChangeRecipient) rcvNo m + where + addOrChangeRecipient :: Maybe (Map Int FileChunk) -> Map Int FileChunk + addOrChangeRecipient = \case + Just m' -> M.alter (Just . addOrChangeChunk) chunkNo m' + _ -> M.singleton chunkNo $ FileChunk {chunkNo, digest, chunkSize, replicas = [replica']} + addOrChangeChunk :: Maybe FileChunk -> FileChunk + addOrChangeChunk = \case + Just ch@FileChunk {replicas} -> ch {replicas = replica' : replicas} + _ -> FileChunk {chunkNo, digest, chunkSize, replicas = [replica']} + replica' = FileChunkReplica {server, replicaId, replicaKey} chunkUploaded SndFileChunk {replicas} = any (\SndFileChunkReplica {replicaStatus} -> replicaStatus == SFRSUploaded) replicas diff --git a/src/Simplex/FileTransfer/Client/Main.hs b/src/Simplex/FileTransfer/Client/Main.hs index 9dcd64c5e..2479dbf5c 100644 --- a/src/Simplex/FileTransfer/Client/Main.hs +++ b/src/Simplex/FileTransfer/Client/Main.hs @@ -22,7 +22,8 @@ module Simplex.FileTransfer.Client.Main chunkSize3, maxFileSize, fileSizeLen, - getChunkInfo, + getChunkDigest, + SentRecipientReplica (..), ) where @@ -33,6 +34,7 @@ import Control.Monad.Except import Crypto.Random (getRandomBytes) import qualified Data.Attoparsec.ByteString.Char8 as A import Data.Bifunctor (first) +import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Lazy.Char8 as LB import Data.Char (toLower) @@ -408,11 +410,15 @@ cliSendFileOpts SendOptions {filePath, outputDir, numRecipients, xftpServers, re pure (fdRcvPaths, fdSndPath) getChunkInfo :: SndPublicVerifyKey -> XFTPChunkSpec -> IO FileInfo -getChunkInfo sndKey XFTPChunkSpec {filePath = chunkPath, chunkOffset, chunkSize} = +getChunkInfo sndKey spec@XFTPChunkSpec {chunkSize} = do + digest <- getChunkDigest spec + pure FileInfo {sndKey, size = fromIntegral chunkSize, digest} + +getChunkDigest :: XFTPChunkSpec -> IO ByteString +getChunkDigest XFTPChunkSpec {filePath = chunkPath, chunkOffset, chunkSize} = withFile chunkPath ReadMode $ \h -> do hSeek h AbsoluteSeek $ fromIntegral chunkOffset - digest <- LC.sha256Hash <$> LB.hGet h (fromIntegral chunkSize) - pure FileInfo {sndKey, size = fromIntegral chunkSize, digest} + LC.sha256Hash <$> LB.hGet h (fromIntegral chunkSize) cliReceiveFile :: ReceiveOptions -> ExceptT CLIError IO () cliReceiveFile ReceiveOptions {fileDescription, filePath, retryCount, tempPath, verbose, yes} = diff --git a/src/Simplex/FileTransfer/Description.hs b/src/Simplex/FileTransfer/Description.hs index 506ded18b..581bf974f 100644 --- a/src/Simplex/FileTransfer/Description.hs +++ b/src/Simplex/FileTransfer/Description.hs @@ -25,7 +25,6 @@ module Simplex.FileTransfer.Description YAMLFileDescription (..), -- for tests YAMLServerReplicas (..), -- for tests validateFileDescription, - validateFileDescription', groupReplicasByServer, replicaServer, fdSeparator, @@ -183,7 +182,7 @@ instance FilePartyI p => StrEncoding (ValidFileDescription p) where instance StrEncoding AValidFileDescription where strEncode (AVFD fd) = strEncode fd - strDecode = validateFileDescription <=< strDecode + strDecode = (\(AFD fd) -> AVFD <$> validateFileDescription fd) <=< strDecode strP = strDecode <$?> A.takeByteString instance FilePartyI p => StrEncoding (FileDescription p) where @@ -196,26 +195,14 @@ instance StrEncoding AFileDescription where strDecode = decodeFileDescription <=< first show . Y.decodeEither' strP = strDecode <$?> A.takeByteString -validateFileDescription :: AFileDescription -> Either String AValidFileDescription -validateFileDescription = \case - AFD fd@FileDescription {size, chunks} - | chunkNos /= [1 .. length chunks] -> Left "chunk numbers are not sequential" - | chunksSize chunks /= unFileSize size -> Left "chunks total size is different than file size" - | otherwise -> Right $ AVFD (ValidFD fd) - where - chunkNos = map (chunkNo :: FileChunk -> Int) chunks - chunksSize = fromIntegral . foldl' (\s FileChunk {chunkSize} -> s + unFileSize chunkSize) 0 - --- TODO refactor -validateFileDescription' :: FileDescription p -> Either String (ValidFileDescription p) -validateFileDescription' = \case - fd@FileDescription {size, chunks} - | chunkNos /= [1 .. length chunks] -> Left "chunk numbers are not sequential" - | chunksSize chunks /= unFileSize size -> Left "chunks total size is different than file size" - | otherwise -> Right $ ValidFD fd - where - chunkNos = map (chunkNo :: FileChunk -> Int) chunks - chunksSize = fromIntegral . foldl' (\s FileChunk {chunkSize} -> s + unFileSize chunkSize) 0 +validateFileDescription :: FileDescription p -> Either String (ValidFileDescription p) +validateFileDescription fd@FileDescription {size, chunks} + | chunkNos /= [1 .. length chunks] = Left "chunk numbers are not sequential" + | chunksSize chunks /= unFileSize size = Left "chunks total size is different than file size" + | otherwise = Right $ ValidFD fd + where + chunkNos = map (chunkNo :: FileChunk -> Int) chunks + chunksSize = fromIntegral . foldl' (\s FileChunk {chunkSize} -> s + unFileSize chunkSize) 0 encodeFileDescription :: FileDescription p -> YAMLFileDescription encodeFileDescription FileDescription {party, size, digest, key, nonce, chunkSize, chunks} = diff --git a/src/Simplex/FileTransfer/Types.hs b/src/Simplex/FileTransfer/Types.hs index ebdcbf99a..90efabfc3 100644 --- a/src/Simplex/FileTransfer/Types.hs +++ b/src/Simplex/FileTransfer/Types.hs @@ -169,7 +169,7 @@ data SndFileChunk = SndFileChunk chunkNo :: Int, chunkSpec :: XFTPChunkSpec, filePrefixPath :: FilePath, - digest :: Maybe FileDigest, + digest :: FileDigest, replicas :: [SndFileChunkReplica] } deriving (Eq, Show) diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 4e5edf996..d1d25c464 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -2137,7 +2137,7 @@ getSndFile db sndFileId = runExceptT $ do replicas' <- getChunkReplicas sndChunkId pure (chunk {replicas = replicas'} :: SndFileChunk) where - toChunk :: (Int64, Int, Int64, Word32, Maybe FileDigest) -> SndFileChunk + toChunk :: (Int64, Int, Int64, Word32, FileDigest) -> SndFileChunk toChunk (sndChunkId, chunkNo, chunkOffset, chunkSize, digest) = let chunkSpec = XFTPChunkSpec {filePath = sndFileEncPath filePrefixPath, chunkOffset, chunkSize} in SndFileChunk {sndFileId, sndFileEntityId, userId, numRecipients, sndChunkId, chunkNo, chunkSpec, filePrefixPath, digest, replicas = []} @@ -2203,23 +2203,21 @@ updateSndFileStatus db sndFileId status = do updatedAt <- getCurrentTime DB.execute db "UPDATE snd_files SET status = ?, updated_at = ? WHERE snd_file_id = ?" (status, updatedAt, sndFileId) -updateSndFileEncrypted :: DB.Connection -> DBSndFileId -> FileDigest -> [XFTPChunkSpec] -> IO () -updateSndFileEncrypted db sndFileId digest chunkSpecs = do +updateSndFileEncrypted :: DB.Connection -> DBSndFileId -> FileDigest -> [(XFTPChunkSpec, FileDigest)] -> IO () +updateSndFileEncrypted db sndFileId digest chunkSpecsDigests = do updatedAt <- getCurrentTime DB.execute db "UPDATE snd_files SET status = ?, digest = ?, updated_at = ? WHERE snd_file_id = ?" (SFSEncrypted, digest, updatedAt, sndFileId) - forM_ (zip [1 ..] chunkSpecs) $ \(chunkNo :: Int, XFTPChunkSpec {chunkOffset, chunkSize}) -> - DB.execute db "INSERT INTO snd_file_chunks (snd_file_id, chunk_no, chunk_offset, chunk_size) VALUES (?,?,?,?)" (sndFileId, chunkNo, chunkOffset, chunkSize) + forM_ (zip [1 ..] chunkSpecsDigests) $ \(chunkNo :: Int, (XFTPChunkSpec {chunkOffset, chunkSize}, chunkDigest)) -> + DB.execute db "INSERT INTO snd_file_chunks (snd_file_id, chunk_no, chunk_offset, chunk_size, digest) VALUES (?,?,?,?,?)" (sndFileId, chunkNo, chunkOffset, chunkSize, chunkDigest) updateSndFileComplete :: DB.Connection -> DBSndFileId -> IO () updateSndFileComplete db sndFileId = do updatedAt <- getCurrentTime DB.execute db "UPDATE snd_files SET prefix_path = NULL, status = ?, updated_at = ? WHERE snd_file_id = ?" (SFSComplete, updatedAt, sndFileId) -createSndFileReplica :: DB.Connection -> Int64 -> FileDigest -> XFTPServer -> ChunkReplicaId -> C.APrivateSignKey -> [(ChunkReplicaId, C.APrivateSignKey)] -> IO () -createSndFileReplica db sndChunkId digest xftpServer sndId spKey rcvIdsKeys = do +createSndFileReplica :: DB.Connection -> Int64 -> XFTPServer -> ChunkReplicaId -> C.APrivateSignKey -> [(ChunkReplicaId, C.APrivateSignKey)] -> IO () +createSndFileReplica db sndChunkId xftpServer sndId spKey rcvIdsKeys = do srvId <- createXFTPServer_ db xftpServer - updatedAt <- getCurrentTime - DB.execute db "UPDATE snd_file_chunks SET digest = ?, updated_at = ? WHERE snd_file_chunk_id = ?" (digest, updatedAt, sndChunkId) DB.execute db [sql| @@ -2267,7 +2265,7 @@ getNextSndChunkToUpload db server@ProtocolServer {host, port, keyHash} = do pure replica {rcvIdsKeys} pure (chunk {replicas = replicas'} :: SndFileChunk) where - toChunk :: ((DBSndFileId, SndFileId, UserId, Int, FilePath) :. (Int64, Int, Int64, Word32, Maybe FileDigest) :. (Int64, ChunkReplicaId, C.APrivateSignKey, SndFileReplicaStatus, Maybe Int64, Int)) -> SndFileChunk + toChunk :: ((DBSndFileId, SndFileId, UserId, Int, FilePath) :. (Int64, Int, Int64, Word32, FileDigest) :. (Int64, ChunkReplicaId, C.APrivateSignKey, SndFileReplicaStatus, Maybe Int64, Int)) -> SndFileChunk toChunk ((sndFileId, sndFileEntityId, userId, numRecipients, filePrefixPath) :. (sndChunkId, chunkNo, chunkOffset, chunkSize, digest) :. (sndChunkReplicaId, replicaId, replicaKey, replicaStatus, delay, retries)) = let chunkSpec = XFTPChunkSpec {filePath = sndFileEncPath filePrefixPath, chunkOffset, chunkSize} in SndFileChunk