mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 16:26:02 +00:00
xftp: transform SndFile to recipient descriptions; non optional chunk digest (#710)
This commit is contained in:
@@ -36,9 +36,10 @@ import qualified Data.ByteString.Char8 as B
|
||||
import qualified Data.ByteString.Lazy.Char8 as LB
|
||||
import Data.Composition ((.:))
|
||||
import Data.Int (Int64)
|
||||
import Data.List (foldl', isSuffixOf, partition)
|
||||
import Data.List (foldl', isSuffixOf, partition, sortOn)
|
||||
import Data.List.NonEmpty (nonEmpty)
|
||||
import qualified Data.List.NonEmpty as L
|
||||
import Data.Map (Map)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Text (Text)
|
||||
import Data.Time.Clock (getCurrentTime)
|
||||
@@ -371,9 +372,9 @@ runXFTPSndPrepareWorker c doWork = do
|
||||
when (status == SFSEncrypting) $
|
||||
whenM (doesFileExist fsEncPath) $ removeFile fsEncPath
|
||||
withStore' c $ \db -> updateSndFileStatus db sndFileId SFSEncrypting
|
||||
(digest, chunkSpecs) <- encryptFileForUpload sndFile encPath
|
||||
(digest, chunkSpecsDigests) <- encryptFileForUpload sndFile encPath
|
||||
withStore c $ \db -> do
|
||||
updateSndFileEncrypted db sndFileId digest chunkSpecs
|
||||
updateSndFileEncrypted db sndFileId digest chunkSpecsDigests
|
||||
getSndFile db sndFileId
|
||||
else pure sndFile
|
||||
maxRecipients <- asks (xftpMaxRecipientsPerRequest . config)
|
||||
@@ -382,7 +383,7 @@ runXFTPSndPrepareWorker c doWork = do
|
||||
forM_ chunks $ createChunk numRecipients'
|
||||
withStore' c $ \db -> updateSndFileStatus db sndFileId SFSUploading
|
||||
where
|
||||
encryptFileForUpload :: SndFile -> FilePath -> m (FileDigest, [XFTPChunkSpec])
|
||||
encryptFileForUpload :: SndFile -> FilePath -> m (FileDigest, [(XFTPChunkSpec, FileDigest)])
|
||||
encryptFileForUpload SndFile {key, nonce, filePath} encPath = do
|
||||
let fileName = takeFileName filePath
|
||||
fileSize <- fromInteger <$> getFileSize filePath
|
||||
@@ -395,17 +396,17 @@ runXFTPSndPrepareWorker c doWork = do
|
||||
void $ liftError (INTERNAL . show) $ encryptFile filePath fileHdr key nonce fileSize' encSize encPath
|
||||
digest <- liftIO $ LC.sha512Hash <$> LB.readFile encPath
|
||||
let chunkSpecs = prepareChunkSpecs encPath chunkSizes
|
||||
pure (FileDigest digest, chunkSpecs)
|
||||
chunkDigests <- map FileDigest <$> mapM (liftIO . getChunkDigest) chunkSpecs
|
||||
pure (FileDigest digest, zip chunkSpecs chunkDigests)
|
||||
createChunk :: Int -> SndFileChunk -> m ()
|
||||
createChunk numRecipients' SndFileChunk {sndChunkId, userId, chunkSpec} = do
|
||||
createChunk numRecipients' SndFileChunk {sndChunkId, userId, chunkSpec = XFTPChunkSpec {chunkSize}, digest = FileDigest chDigest} = do
|
||||
(sndKey, spKey) <- liftIO $ C.generateSignatureKeyPair C.SEd25519
|
||||
rKeys <- liftIO $ L.fromList <$> replicateM numRecipients' (C.generateSignatureKeyPair C.SEd25519)
|
||||
ch@FileInfo {digest} <- liftIO $ getChunkInfo sndKey chunkSpec
|
||||
-- TODO with retry on temporary errors
|
||||
let fileInfo = FileInfo {sndKey, size = fromIntegral chunkSize, digest = chDigest}
|
||||
srvAuth@(ProtoServerWithAuth srv _) <- getServer
|
||||
(sndId, rIds) <- agentXFTPCreateChunk c userId srvAuth spKey ch (L.map fst rKeys)
|
||||
(sndId, rIds) <- agentXFTPCreateChunk c userId srvAuth spKey fileInfo (L.map fst rKeys)
|
||||
let rcvIdsKeys = L.toList $ L.map ChunkReplicaId rIds `L.zip` L.map snd rKeys
|
||||
withStore' c $ \db -> createSndFileReplica db sndChunkId (FileDigest digest) srv (ChunkReplicaId sndId) spKey rcvIdsKeys
|
||||
withStore' c $ \db -> createSndFileReplica db sndChunkId srv (ChunkReplicaId sndId) spKey rcvIdsKeys
|
||||
addXFTPSndWorker c $ Just srv
|
||||
getServer :: m XFTPServerWithAuth
|
||||
getServer = do
|
||||
@@ -455,8 +456,7 @@ runXFTPSndWorker c srv doWork = do
|
||||
let complete = all chunkUploaded chunks
|
||||
-- TODO calculate progress, notify SFPROG
|
||||
when complete $ do
|
||||
sndDescr <- sndFileToSndDescr sf
|
||||
rcvDescrs <- sndFileToRcvDescrs sf
|
||||
(sndDescr, rcvDescrs) <- sndFileToDescrs sf
|
||||
notify c sndFileEntityId $ SFDONE sndDescr rcvDescrs
|
||||
forM_ prefixPath (removePath <=< toFSFilePath)
|
||||
withStore' c $ \db -> updateSndFileStatus db sndFileId SFSComplete
|
||||
@@ -473,25 +473,60 @@ runXFTPSndWorker c srv doWork = do
|
||||
let rcvIdsKeys' = L.toList $ L.map ChunkReplicaId rIds `L.zip` L.map snd rKeys
|
||||
cr' <- withStore' c $ \db -> addSndChunkReplicaRecipients db cr rcvIdsKeys'
|
||||
addRecipients ch cr'
|
||||
sndFileToSndDescr :: SndFile -> m (ValidFileDescription 'FSender)
|
||||
sndFileToSndDescr SndFile {digest = Nothing} = throwError $ INTERNAL "snd file has no digest"
|
||||
sndFileToSndDescr SndFile {chunks = []} = throwError $ INTERNAL "snd file has no chunks"
|
||||
sndFileToSndDescr SndFile {digest = Just digest, key, nonce, chunks = chunks@(fstChunk : _)} = do
|
||||
sndFileToDescrs :: SndFile -> m (ValidFileDescription 'FSender, [ValidFileDescription 'FRecipient])
|
||||
sndFileToDescrs SndFile {digest = Nothing} = throwError $ INTERNAL "snd file has no digest"
|
||||
sndFileToDescrs SndFile {chunks = []} = throwError $ INTERNAL "snd file has no chunks"
|
||||
sndFileToDescrs SndFile {digest = Just digest, key, nonce, chunks = chunks@(fstChunk : _)} = do
|
||||
let chunkSize = FileSize $ sndChunkSize fstChunk
|
||||
size = FileSize $ sum $ map (fromIntegral . sndChunkSize) chunks
|
||||
descrChunks <- mapM toDescrChunk chunks
|
||||
let fd = FileDescription {party = SFSender, size, digest, key, nonce, chunkSize, chunks = descrChunks}
|
||||
either (throwError . INTERNAL) pure $ validateFileDescription' fd
|
||||
-- snd description
|
||||
sndDescrChunks <- mapM toSndDescrChunk chunks
|
||||
let fdSnd = FileDescription {party = SFSender, size, digest, key, nonce, chunkSize, chunks = sndDescrChunks}
|
||||
validFdSnd <- either (throwError . INTERNAL) pure $ validateFileDescription fdSnd
|
||||
-- rcv descriptions
|
||||
let fdRcv = FileDescription {party = SFRecipient, size, digest, key, nonce, chunkSize, chunks = []}
|
||||
fdRcvs = createRcvFileDescriptions fdRcv chunks
|
||||
validFdRcvs <- either (throwError . INTERNAL) pure $ mapM validateFileDescription fdRcvs
|
||||
pure (validFdSnd, validFdRcvs)
|
||||
toSndDescrChunk :: SndFileChunk -> m FileChunk
|
||||
toSndDescrChunk SndFileChunk {replicas = []} = throwError $ INTERNAL "snd file chunk has no replicas"
|
||||
toSndDescrChunk ch@SndFileChunk {chunkNo, digest = chDigest, replicas = (SndFileChunkReplica {server, replicaId, replicaKey} : _)} = do
|
||||
let chunkSize = FileSize $ sndChunkSize ch
|
||||
replicas = [FileChunkReplica {server, replicaId, replicaKey}]
|
||||
pure FileChunk {chunkNo, digest = chDigest, chunkSize, replicas}
|
||||
createRcvFileDescriptions :: FileDescription 'FRecipient -> [SndFileChunk] -> [FileDescription 'FRecipient]
|
||||
createRcvFileDescriptions fd sndChunks = map (\chunks -> (fd :: (FileDescription 'FRecipient)) {chunks}) rcvChunks
|
||||
where
|
||||
toDescrChunk :: SndFileChunk -> m FileChunk
|
||||
toDescrChunk SndFileChunk {digest = Nothing} = throwError $ INTERNAL "snd file chunk has no digest"
|
||||
toDescrChunk SndFileChunk {replicas = []} = throwError $ INTERNAL "snd file chunk has no replicas"
|
||||
toDescrChunk ch@SndFileChunk {chunkNo, digest = Just chDigest, replicas = (SndFileChunkReplica {server, replicaId, replicaKey} : _)} = do
|
||||
rcvReplicas :: [SentRecipientReplica]
|
||||
rcvReplicas = concatMap toSentRecipientReplicas sndChunks
|
||||
toSentRecipientReplicas :: SndFileChunk -> [SentRecipientReplica]
|
||||
toSentRecipientReplicas ch@SndFileChunk {chunkNo, digest, replicas} =
|
||||
let chunkSize = FileSize $ sndChunkSize ch
|
||||
replicas = [FileChunkReplica {server, replicaId, replicaKey}]
|
||||
pure FileChunk {chunkNo, digest = chDigest, chunkSize, replicas}
|
||||
sndFileToRcvDescrs :: SndFile -> m [ValidFileDescription 'FRecipient]
|
||||
sndFileToRcvDescrs SndFile {} = do
|
||||
undefined
|
||||
in concatMap
|
||||
( \SndFileChunkReplica {server, rcvIdsKeys} ->
|
||||
zipWith
|
||||
(\rcvNo (replicaId, replicaKey) -> SentRecipientReplica {chunkNo, server, rcvNo, replicaId, replicaKey, digest, chunkSize})
|
||||
[1 ..]
|
||||
rcvIdsKeys
|
||||
)
|
||||
replicas
|
||||
rcvChunks :: [[FileChunk]]
|
||||
rcvChunks = map (sortChunks . M.elems) $ M.elems $ foldl' addRcvChunk M.empty rcvReplicas
|
||||
sortChunks :: [FileChunk] -> [FileChunk]
|
||||
sortChunks = map reverseReplicas . sortOn (chunkNo :: FileChunk -> Int)
|
||||
reverseReplicas ch@FileChunk {replicas} = (ch :: FileChunk) {replicas = reverse replicas}
|
||||
addRcvChunk :: Map Int (Map Int FileChunk) -> SentRecipientReplica -> Map Int (Map Int FileChunk)
|
||||
addRcvChunk m SentRecipientReplica {chunkNo, server, rcvNo, replicaId, replicaKey, digest, chunkSize} =
|
||||
M.alter (Just . addOrChangeRecipient) rcvNo m
|
||||
where
|
||||
addOrChangeRecipient :: Maybe (Map Int FileChunk) -> Map Int FileChunk
|
||||
addOrChangeRecipient = \case
|
||||
Just m' -> M.alter (Just . addOrChangeChunk) chunkNo m'
|
||||
_ -> M.singleton chunkNo $ FileChunk {chunkNo, digest, chunkSize, replicas = [replica']}
|
||||
addOrChangeChunk :: Maybe FileChunk -> FileChunk
|
||||
addOrChangeChunk = \case
|
||||
Just ch@FileChunk {replicas} -> ch {replicas = replica' : replicas}
|
||||
_ -> FileChunk {chunkNo, digest, chunkSize, replicas = [replica']}
|
||||
replica' = FileChunkReplica {server, replicaId, replicaKey}
|
||||
chunkUploaded SndFileChunk {replicas} =
|
||||
any (\SndFileChunkReplica {replicaStatus} -> replicaStatus == SFRSUploaded) replicas
|
||||
|
||||
@@ -22,7 +22,8 @@ module Simplex.FileTransfer.Client.Main
|
||||
chunkSize3,
|
||||
maxFileSize,
|
||||
fileSizeLen,
|
||||
getChunkInfo,
|
||||
getChunkDigest,
|
||||
SentRecipientReplica (..),
|
||||
)
|
||||
where
|
||||
|
||||
@@ -33,6 +34,7 @@ import Control.Monad.Except
|
||||
import Crypto.Random (getRandomBytes)
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import Data.Bifunctor (first)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import qualified Data.ByteString.Lazy.Char8 as LB
|
||||
import Data.Char (toLower)
|
||||
@@ -408,11 +410,15 @@ cliSendFileOpts SendOptions {filePath, outputDir, numRecipients, xftpServers, re
|
||||
pure (fdRcvPaths, fdSndPath)
|
||||
|
||||
getChunkInfo :: SndPublicVerifyKey -> XFTPChunkSpec -> IO FileInfo
|
||||
getChunkInfo sndKey XFTPChunkSpec {filePath = chunkPath, chunkOffset, chunkSize} =
|
||||
getChunkInfo sndKey spec@XFTPChunkSpec {chunkSize} = do
|
||||
digest <- getChunkDigest spec
|
||||
pure FileInfo {sndKey, size = fromIntegral chunkSize, digest}
|
||||
|
||||
getChunkDigest :: XFTPChunkSpec -> IO ByteString
|
||||
getChunkDigest XFTPChunkSpec {filePath = chunkPath, chunkOffset, chunkSize} =
|
||||
withFile chunkPath ReadMode $ \h -> do
|
||||
hSeek h AbsoluteSeek $ fromIntegral chunkOffset
|
||||
digest <- LC.sha256Hash <$> LB.hGet h (fromIntegral chunkSize)
|
||||
pure FileInfo {sndKey, size = fromIntegral chunkSize, digest}
|
||||
LC.sha256Hash <$> LB.hGet h (fromIntegral chunkSize)
|
||||
|
||||
cliReceiveFile :: ReceiveOptions -> ExceptT CLIError IO ()
|
||||
cliReceiveFile ReceiveOptions {fileDescription, filePath, retryCount, tempPath, verbose, yes} =
|
||||
|
||||
@@ -25,7 +25,6 @@ module Simplex.FileTransfer.Description
|
||||
YAMLFileDescription (..), -- for tests
|
||||
YAMLServerReplicas (..), -- for tests
|
||||
validateFileDescription,
|
||||
validateFileDescription',
|
||||
groupReplicasByServer,
|
||||
replicaServer,
|
||||
fdSeparator,
|
||||
@@ -183,7 +182,7 @@ instance FilePartyI p => StrEncoding (ValidFileDescription p) where
|
||||
|
||||
instance StrEncoding AValidFileDescription where
|
||||
strEncode (AVFD fd) = strEncode fd
|
||||
strDecode = validateFileDescription <=< strDecode
|
||||
strDecode = (\(AFD fd) -> AVFD <$> validateFileDescription fd) <=< strDecode
|
||||
strP = strDecode <$?> A.takeByteString
|
||||
|
||||
instance FilePartyI p => StrEncoding (FileDescription p) where
|
||||
@@ -196,26 +195,14 @@ instance StrEncoding AFileDescription where
|
||||
strDecode = decodeFileDescription <=< first show . Y.decodeEither'
|
||||
strP = strDecode <$?> A.takeByteString
|
||||
|
||||
validateFileDescription :: AFileDescription -> Either String AValidFileDescription
|
||||
validateFileDescription = \case
|
||||
AFD fd@FileDescription {size, chunks}
|
||||
| chunkNos /= [1 .. length chunks] -> Left "chunk numbers are not sequential"
|
||||
| chunksSize chunks /= unFileSize size -> Left "chunks total size is different than file size"
|
||||
| otherwise -> Right $ AVFD (ValidFD fd)
|
||||
where
|
||||
chunkNos = map (chunkNo :: FileChunk -> Int) chunks
|
||||
chunksSize = fromIntegral . foldl' (\s FileChunk {chunkSize} -> s + unFileSize chunkSize) 0
|
||||
|
||||
-- TODO refactor
|
||||
validateFileDescription' :: FileDescription p -> Either String (ValidFileDescription p)
|
||||
validateFileDescription' = \case
|
||||
fd@FileDescription {size, chunks}
|
||||
| chunkNos /= [1 .. length chunks] -> Left "chunk numbers are not sequential"
|
||||
| chunksSize chunks /= unFileSize size -> Left "chunks total size is different than file size"
|
||||
| otherwise -> Right $ ValidFD fd
|
||||
where
|
||||
chunkNos = map (chunkNo :: FileChunk -> Int) chunks
|
||||
chunksSize = fromIntegral . foldl' (\s FileChunk {chunkSize} -> s + unFileSize chunkSize) 0
|
||||
validateFileDescription :: FileDescription p -> Either String (ValidFileDescription p)
|
||||
validateFileDescription fd@FileDescription {size, chunks}
|
||||
| chunkNos /= [1 .. length chunks] = Left "chunk numbers are not sequential"
|
||||
| chunksSize chunks /= unFileSize size = Left "chunks total size is different than file size"
|
||||
| otherwise = Right $ ValidFD fd
|
||||
where
|
||||
chunkNos = map (chunkNo :: FileChunk -> Int) chunks
|
||||
chunksSize = fromIntegral . foldl' (\s FileChunk {chunkSize} -> s + unFileSize chunkSize) 0
|
||||
|
||||
encodeFileDescription :: FileDescription p -> YAMLFileDescription
|
||||
encodeFileDescription FileDescription {party, size, digest, key, nonce, chunkSize, chunks} =
|
||||
|
||||
@@ -169,7 +169,7 @@ data SndFileChunk = SndFileChunk
|
||||
chunkNo :: Int,
|
||||
chunkSpec :: XFTPChunkSpec,
|
||||
filePrefixPath :: FilePath,
|
||||
digest :: Maybe FileDigest,
|
||||
digest :: FileDigest,
|
||||
replicas :: [SndFileChunkReplica]
|
||||
}
|
||||
deriving (Eq, Show)
|
||||
|
||||
@@ -2137,7 +2137,7 @@ getSndFile db sndFileId = runExceptT $ do
|
||||
replicas' <- getChunkReplicas sndChunkId
|
||||
pure (chunk {replicas = replicas'} :: SndFileChunk)
|
||||
where
|
||||
toChunk :: (Int64, Int, Int64, Word32, Maybe FileDigest) -> SndFileChunk
|
||||
toChunk :: (Int64, Int, Int64, Word32, FileDigest) -> SndFileChunk
|
||||
toChunk (sndChunkId, chunkNo, chunkOffset, chunkSize, digest) =
|
||||
let chunkSpec = XFTPChunkSpec {filePath = sndFileEncPath filePrefixPath, chunkOffset, chunkSize}
|
||||
in SndFileChunk {sndFileId, sndFileEntityId, userId, numRecipients, sndChunkId, chunkNo, chunkSpec, filePrefixPath, digest, replicas = []}
|
||||
@@ -2203,23 +2203,21 @@ updateSndFileStatus db sndFileId status = do
|
||||
updatedAt <- getCurrentTime
|
||||
DB.execute db "UPDATE snd_files SET status = ?, updated_at = ? WHERE snd_file_id = ?" (status, updatedAt, sndFileId)
|
||||
|
||||
updateSndFileEncrypted :: DB.Connection -> DBSndFileId -> FileDigest -> [XFTPChunkSpec] -> IO ()
|
||||
updateSndFileEncrypted db sndFileId digest chunkSpecs = do
|
||||
updateSndFileEncrypted :: DB.Connection -> DBSndFileId -> FileDigest -> [(XFTPChunkSpec, FileDigest)] -> IO ()
|
||||
updateSndFileEncrypted db sndFileId digest chunkSpecsDigests = do
|
||||
updatedAt <- getCurrentTime
|
||||
DB.execute db "UPDATE snd_files SET status = ?, digest = ?, updated_at = ? WHERE snd_file_id = ?" (SFSEncrypted, digest, updatedAt, sndFileId)
|
||||
forM_ (zip [1 ..] chunkSpecs) $ \(chunkNo :: Int, XFTPChunkSpec {chunkOffset, chunkSize}) ->
|
||||
DB.execute db "INSERT INTO snd_file_chunks (snd_file_id, chunk_no, chunk_offset, chunk_size) VALUES (?,?,?,?)" (sndFileId, chunkNo, chunkOffset, chunkSize)
|
||||
forM_ (zip [1 ..] chunkSpecsDigests) $ \(chunkNo :: Int, (XFTPChunkSpec {chunkOffset, chunkSize}, chunkDigest)) ->
|
||||
DB.execute db "INSERT INTO snd_file_chunks (snd_file_id, chunk_no, chunk_offset, chunk_size, digest) VALUES (?,?,?,?,?)" (sndFileId, chunkNo, chunkOffset, chunkSize, chunkDigest)
|
||||
|
||||
updateSndFileComplete :: DB.Connection -> DBSndFileId -> IO ()
|
||||
updateSndFileComplete db sndFileId = do
|
||||
updatedAt <- getCurrentTime
|
||||
DB.execute db "UPDATE snd_files SET prefix_path = NULL, status = ?, updated_at = ? WHERE snd_file_id = ?" (SFSComplete, updatedAt, sndFileId)
|
||||
|
||||
createSndFileReplica :: DB.Connection -> Int64 -> FileDigest -> XFTPServer -> ChunkReplicaId -> C.APrivateSignKey -> [(ChunkReplicaId, C.APrivateSignKey)] -> IO ()
|
||||
createSndFileReplica db sndChunkId digest xftpServer sndId spKey rcvIdsKeys = do
|
||||
createSndFileReplica :: DB.Connection -> Int64 -> XFTPServer -> ChunkReplicaId -> C.APrivateSignKey -> [(ChunkReplicaId, C.APrivateSignKey)] -> IO ()
|
||||
createSndFileReplica db sndChunkId xftpServer sndId spKey rcvIdsKeys = do
|
||||
srvId <- createXFTPServer_ db xftpServer
|
||||
updatedAt <- getCurrentTime
|
||||
DB.execute db "UPDATE snd_file_chunks SET digest = ?, updated_at = ? WHERE snd_file_chunk_id = ?" (digest, updatedAt, sndChunkId)
|
||||
DB.execute
|
||||
db
|
||||
[sql|
|
||||
@@ -2267,7 +2265,7 @@ getNextSndChunkToUpload db server@ProtocolServer {host, port, keyHash} = do
|
||||
pure replica {rcvIdsKeys}
|
||||
pure (chunk {replicas = replicas'} :: SndFileChunk)
|
||||
where
|
||||
toChunk :: ((DBSndFileId, SndFileId, UserId, Int, FilePath) :. (Int64, Int, Int64, Word32, Maybe FileDigest) :. (Int64, ChunkReplicaId, C.APrivateSignKey, SndFileReplicaStatus, Maybe Int64, Int)) -> SndFileChunk
|
||||
toChunk :: ((DBSndFileId, SndFileId, UserId, Int, FilePath) :. (Int64, Int, Int64, Word32, FileDigest) :. (Int64, ChunkReplicaId, C.APrivateSignKey, SndFileReplicaStatus, Maybe Int64, Int)) -> SndFileChunk
|
||||
toChunk ((sndFileId, sndFileEntityId, userId, numRecipients, filePrefixPath) :. (sndChunkId, chunkNo, chunkOffset, chunkSize, digest) :. (sndChunkReplicaId, replicaId, replicaKey, replicaStatus, delay, retries)) =
|
||||
let chunkSpec = XFTPChunkSpec {filePath = sndFileEncPath filePrefixPath, chunkOffset, chunkSize}
|
||||
in SndFileChunk
|
||||
|
||||
Reference in New Issue
Block a user