mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 16:26:02 +00:00
xftp: client function to create new chunk (#712)
* xftp: client function to create new chunk * remove chunk digest update * rename
This commit is contained in:
committed by
GitHub
parent
6d9af2ec17
commit
2edc7529f9
@@ -5,6 +5,7 @@
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE PatternSynonyms #-}
|
||||
{-# LANGUAGE RankNTypes #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE TupleSections #-}
|
||||
@@ -48,7 +49,7 @@ import Simplex.FileTransfer.Client (XFTPChunkSpec (..))
|
||||
import Simplex.FileTransfer.Client.Main
|
||||
import Simplex.FileTransfer.Crypto
|
||||
import Simplex.FileTransfer.Description
|
||||
import Simplex.FileTransfer.Protocol (FileInfo (..), FileParty (..), FilePartyI, SFileParty (..))
|
||||
import Simplex.FileTransfer.Protocol (FileParty (..), FilePartyI, SFileParty (..))
|
||||
import Simplex.FileTransfer.Transport (XFTPRcvChunkSpec (..))
|
||||
import Simplex.FileTransfer.Types
|
||||
import Simplex.FileTransfer.Util (removePath, uniqueCombine)
|
||||
@@ -186,7 +187,7 @@ runXFTPRcvWorker c srv doWork = do
|
||||
chunkPath <- uniqueCombine fsFileTmpPath $ show chunkNo
|
||||
let chunkSpec = XFTPRcvChunkSpec chunkPath (unFileSize chunkSize) (unFileDigest digest)
|
||||
relChunkPath = fileTmpPath </> takeFileName chunkPath
|
||||
agentXFTPDownloadChunk c userId replica chunkSpec
|
||||
agentXFTPDownloadChunk c userId rcvChunkId replica chunkSpec
|
||||
(complete, progress) <- withStore c $ \db -> runExceptT $ do
|
||||
RcvFile {size = FileSize total, chunks} <-
|
||||
ExceptT $ updateRcvFileChunkReceived db (rcvChunkReplicaId replica) rcvChunkId rcvFileId relChunkPath
|
||||
@@ -398,14 +399,10 @@ runXFTPSndPrepareWorker c doWork = do
|
||||
chunkDigests <- map FileDigest <$> mapM (liftIO . getChunkDigest) chunkSpecs
|
||||
pure (FileDigest digest, zip chunkSpecs chunkDigests)
|
||||
createChunk :: Int -> SndFileChunk -> m ()
|
||||
createChunk numRecipients' SndFileChunk {sndChunkId, userId, chunkSpec = XFTPChunkSpec {chunkSize}, digest = FileDigest chDigest} = do
|
||||
(sndKey, spKey) <- liftIO $ C.generateSignatureKeyPair C.SEd25519
|
||||
rKeys <- liftIO $ L.fromList <$> replicateM numRecipients' (C.generateSignatureKeyPair C.SEd25519)
|
||||
let fileInfo = FileInfo {sndKey, size = fromIntegral chunkSize, digest = chDigest}
|
||||
createChunk numRecipients' ch = do
|
||||
srvAuth@(ProtoServerWithAuth srv _) <- getServer
|
||||
(sndId, rIds) <- agentXFTPCreateChunk c userId srvAuth spKey fileInfo (L.map fst rKeys)
|
||||
let rcvIdsKeys = L.toList $ L.map ChunkReplicaId rIds `L.zip` L.map snd rKeys
|
||||
withStore' c $ \db -> createSndFileReplica db sndChunkId srv (ChunkReplicaId sndId) spKey rcvIdsKeys
|
||||
replica <- agentXFTPNewChunk c ch numRecipients' srvAuth
|
||||
withStore' c $ \db -> createSndFileReplica db ch replica
|
||||
addXFTPSndWorker c $ Just srv
|
||||
getServer :: m XFTPServerWithAuth
|
||||
getServer = do
|
||||
|
||||
@@ -64,7 +64,7 @@ import qualified Simplex.Messaging.Crypto.Lazy as LC
|
||||
import Simplex.Messaging.Encoding
|
||||
import Simplex.Messaging.Encoding.String (StrEncoding (..))
|
||||
import Simplex.Messaging.Parsers (parseAll)
|
||||
import Simplex.Messaging.Protocol (ProtoServerWithAuth (..), SenderId, SndPrivateSignKey, SndPublicVerifyKey, XFTPServer, XFTPServerWithAuth)
|
||||
import Simplex.Messaging.Protocol (ProtoServerWithAuth (..), SenderId, SndPrivateSignKey, XFTPServer, XFTPServerWithAuth)
|
||||
import Simplex.Messaging.Server.CLI (getCliCommand')
|
||||
import Simplex.Messaging.Util (ifM, tshow, whenM)
|
||||
import System.Exit (exitFailure)
|
||||
@@ -328,7 +328,8 @@ cliSendFileOpts SendOptions {filePath, outputDir, numRecipients, xftpServers, re
|
||||
logInfo $ "uploading chunk " <> tshow chunkNo <> " to " <> showServer xftpServer <> "..."
|
||||
(sndKey, spKey) <- liftIO $ C.generateSignatureKeyPair C.SEd25519
|
||||
rKeys <- liftIO $ L.fromList <$> replicateM numRecipients (C.generateSignatureKeyPair C.SEd25519)
|
||||
ch@FileInfo {digest} <- liftIO $ getChunkInfo sndKey chunkSpec
|
||||
digest <- liftIO $ getChunkDigest chunkSpec
|
||||
let ch = FileInfo {sndKey, size = fromIntegral chunkSize, digest}
|
||||
c <- withRetry retryCount $ getXFTPServerClient a xftpServer
|
||||
(sndId, rIds) <- withRetry retryCount $ createXFTPChunk c spKey ch (L.map fst rKeys) auth
|
||||
withReconnect a xftpServer retryCount $ \c' -> uploadXFTPChunk c' spKey sndId chunkSpec
|
||||
@@ -409,11 +410,6 @@ cliSendFileOpts SendOptions {filePath, outputDir, numRecipients, xftpServers, re
|
||||
B.writeFile fdSndPath $ strEncode fdSnd
|
||||
pure (fdRcvPaths, fdSndPath)
|
||||
|
||||
getChunkInfo :: SndPublicVerifyKey -> XFTPChunkSpec -> IO FileInfo
|
||||
getChunkInfo sndKey spec@XFTPChunkSpec {chunkSize} = do
|
||||
digest <- getChunkDigest spec
|
||||
pure FileInfo {sndKey, size = fromIntegral chunkSize, digest}
|
||||
|
||||
getChunkDigest :: XFTPChunkSpec -> IO ByteString
|
||||
getChunkDigest XFTPChunkSpec {filePath = chunkPath, chunkOffset, chunkSize} =
|
||||
withFile chunkPath ReadMode $ \h -> do
|
||||
|
||||
@@ -177,6 +177,14 @@ data SndFileChunk = SndFileChunk
|
||||
sndChunkSize :: SndFileChunk -> Word32
|
||||
sndChunkSize SndFileChunk {chunkSpec = XFTPChunkSpec {chunkSize}} = chunkSize
|
||||
|
||||
data NewSndChunkReplica = NewSndChunkReplica
|
||||
{ server :: XFTPServer,
|
||||
replicaId :: ChunkReplicaId,
|
||||
replicaKey :: C.APrivateSignKey,
|
||||
rcvIdsKeys :: [(ChunkReplicaId, C.APrivateSignKey)]
|
||||
}
|
||||
deriving (Eq, Show)
|
||||
|
||||
data SndFileChunkReplica = SndFileChunkReplica
|
||||
{ sndChunkReplicaId :: Int64,
|
||||
server :: XFTPServer,
|
||||
|
||||
@@ -53,7 +53,7 @@ module Simplex.Messaging.Agent.Client
|
||||
agentNtfCheckSubscription,
|
||||
agentNtfDeleteSubscription,
|
||||
agentXFTPDownloadChunk,
|
||||
agentXFTPCreateChunk,
|
||||
agentXFTPNewChunk,
|
||||
agentXFTPUploadChunk,
|
||||
agentXFTPAddRecipients,
|
||||
agentCbEncrypt,
|
||||
@@ -108,6 +108,7 @@ import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Either (lefts, partitionEithers)
|
||||
import Data.Functor (($>))
|
||||
import Data.Int (Int64)
|
||||
import Data.List (foldl', partition)
|
||||
import Data.List.NonEmpty (NonEmpty (..), (<|))
|
||||
import qualified Data.List.NonEmpty as L
|
||||
@@ -122,12 +123,12 @@ import Data.Word (Word16)
|
||||
import qualified Database.SQLite.Simple as DB
|
||||
import GHC.Generics (Generic)
|
||||
import Network.Socket (HostName)
|
||||
import Simplex.FileTransfer.Client (XFTPChunkSpec, XFTPClient, XFTPClientConfig (..), XFTPClientError)
|
||||
import Simplex.FileTransfer.Client (XFTPChunkSpec (..), XFTPClient, XFTPClientConfig (..), XFTPClientError)
|
||||
import qualified Simplex.FileTransfer.Client as X
|
||||
import Simplex.FileTransfer.Description (ChunkReplicaId (..), kb)
|
||||
import Simplex.FileTransfer.Description (ChunkReplicaId (..), FileDigest (..), kb)
|
||||
import Simplex.FileTransfer.Protocol (FileInfo (..), FileResponse, XFTPErrorType (DIGEST))
|
||||
import Simplex.FileTransfer.Transport (XFTPRcvChunkSpec (..))
|
||||
import Simplex.FileTransfer.Types (RcvFileChunkReplica (..), SndFileChunkReplica (..))
|
||||
import Simplex.FileTransfer.Types (NewSndChunkReplica (..), RcvFileChunkReplica (..), SndFileChunk (..), SndFileChunkReplica (..))
|
||||
import Simplex.FileTransfer.Util (uniqueCombine)
|
||||
import Simplex.Messaging.Agent.Env.SQLite
|
||||
import Simplex.Messaging.Agent.Lock
|
||||
@@ -168,7 +169,6 @@ import Simplex.Messaging.Protocol
|
||||
RcvNtfPublicDhKey,
|
||||
RecipientId,
|
||||
SMPMsgMeta (..),
|
||||
SenderId,
|
||||
SndPublicVerifyKey,
|
||||
XFTPServer,
|
||||
XFTPServerWithAuth,
|
||||
@@ -669,13 +669,13 @@ withNtfClient c srv = withLogClient c (0, srv, Nothing)
|
||||
withXFTPClient ::
|
||||
(AgentMonad m, ProtocolServerClient err msg) =>
|
||||
AgentClient ->
|
||||
(UserId, ProtoServer msg, EntityId) ->
|
||||
(UserId, ProtoServer msg, Int64) ->
|
||||
ByteString ->
|
||||
(Client msg -> ExceptT (ProtocolClientError err) IO b) ->
|
||||
m b
|
||||
withXFTPClient c (userId, srv, fId) cmdStr action = do
|
||||
tSess <- mkTransportSession c userId srv fId
|
||||
withLogClient c tSess (strEncode fId) cmdStr action
|
||||
withXFTPClient c (userId, srv, chunkId) cmdStr action = do
|
||||
tSess <- mkTransportSession c userId srv $ bshow chunkId
|
||||
withLogClient c tSess (bshow chunkId) cmdStr action
|
||||
|
||||
liftClient :: (AgentMonad m, Show err, Encoding err) => (err -> AgentErrorType) -> HostName -> ExceptT (ProtocolClientError err) IO a -> m a
|
||||
liftClient protocolError_ = liftError . protocolClientError protocolError_
|
||||
@@ -1066,13 +1066,21 @@ agentNtfDeleteSubscription :: AgentMonad m => AgentClient -> NtfSubscriptionId -
|
||||
agentNtfDeleteSubscription c subId NtfToken {ntfServer, ntfPrivKey} =
|
||||
withNtfClient c ntfServer subId "SDEL" $ \ntf -> ntfDeleteSubscription ntf ntfPrivKey subId
|
||||
|
||||
agentXFTPDownloadChunk :: AgentMonad m => AgentClient -> UserId -> RcvFileChunkReplica -> XFTPRcvChunkSpec -> m ()
|
||||
agentXFTPDownloadChunk c userId RcvFileChunkReplica {server, replicaId = ChunkReplicaId fId, replicaKey} chunkSpec =
|
||||
withXFTPClient c (userId, server, fId) "FGET" $ \xftp -> X.downloadXFTPChunk xftp replicaKey fId chunkSpec
|
||||
agentXFTPDownloadChunk :: AgentMonad m => AgentClient -> UserId -> Int64 -> RcvFileChunkReplica -> XFTPRcvChunkSpec -> m ()
|
||||
agentXFTPDownloadChunk c userId rcvChunkId RcvFileChunkReplica {server, replicaId = ChunkReplicaId fId, replicaKey} chunkSpec =
|
||||
withXFTPClient c (userId, server, rcvChunkId) "FGET" $ \xftp -> X.downloadXFTPChunk xftp replicaKey fId chunkSpec
|
||||
|
||||
agentXFTPCreateChunk :: AgentMonad m => AgentClient -> UserId -> XFTPServerWithAuth -> C.APrivateSignKey -> FileInfo -> NonEmpty C.APublicVerifyKey -> m (SenderId, NonEmpty RecipientId)
|
||||
agentXFTPCreateChunk c userId srv spKey file rcps =
|
||||
undefined
|
||||
agentXFTPNewChunk :: AgentMonad m => AgentClient -> SndFileChunk -> Int -> XFTPServerWithAuth -> m NewSndChunkReplica
|
||||
agentXFTPNewChunk c SndFileChunk {userId, sndChunkId, chunkSpec = XFTPChunkSpec {chunkSize}, digest = FileDigest digest} numRecipients' (ProtoServerWithAuth srv auth) = do
|
||||
(sndKey, replicaKey) <- liftIO $ C.generateSignatureKeyPair C.SEd25519
|
||||
rKeys <- liftIO $ L.fromList <$> replicateM numRecipients' (C.generateSignatureKeyPair C.SEd25519)
|
||||
let fileInfo = FileInfo {sndKey, size = fromIntegral chunkSize, digest}
|
||||
logServer "-->" c srv "" "FNEW"
|
||||
tSess <- mkTransportSession c userId srv $ bshow sndChunkId
|
||||
(sndId, rIds) <- withClient c tSess "FNEW" $ \xftp -> X.createXFTPChunk xftp replicaKey fileInfo (L.map fst rKeys) auth
|
||||
logServer "<--" c srv "" $ B.unwords ["SIDS", logSecret sndId]
|
||||
let rcvIdsKeys = L.toList $ L.map ChunkReplicaId rIds `L.zip` L.map snd rKeys
|
||||
pure NewSndChunkReplica {server = srv, replicaId = ChunkReplicaId sndId, replicaKey, rcvIdsKeys}
|
||||
|
||||
agentXFTPUploadChunk :: AgentMonad m => AgentClient -> UserId -> SndFileChunkReplica -> XFTPChunkSpec -> m ()
|
||||
agentXFTPUploadChunk c usedId SndFileChunkReplica {server, replicaId = ChunkReplicaId fId, replicaKey} chunkSpec =
|
||||
|
||||
@@ -2158,7 +2158,7 @@ getSndFile db sndFileId = runExceptT $ do
|
||||
(Only chunkId)
|
||||
forM replicas $ \replica@SndFileChunkReplica {sndChunkReplicaId} -> do
|
||||
rcvIdsKeys <- getChunkReplicaRecipients_ db sndChunkReplicaId
|
||||
pure replica {rcvIdsKeys}
|
||||
pure (replica :: SndFileChunkReplica) {rcvIdsKeys}
|
||||
where
|
||||
toReplica :: (Int64, ChunkReplicaId, C.APrivateSignKey, SndFileReplicaStatus, Maybe Int64, Int, NonEmpty TransportHost, ServiceName, C.KeyHash) -> SndFileChunkReplica
|
||||
toReplica (sndChunkReplicaId, replicaId, replicaKey, replicaStatus, delay, retries, host, port, keyHash) =
|
||||
@@ -2215,9 +2215,9 @@ updateSndFileComplete db sndFileId = do
|
||||
updatedAt <- getCurrentTime
|
||||
DB.execute db "UPDATE snd_files SET prefix_path = NULL, status = ?, updated_at = ? WHERE snd_file_id = ?" (SFSComplete, updatedAt, sndFileId)
|
||||
|
||||
createSndFileReplica :: DB.Connection -> Int64 -> XFTPServer -> ChunkReplicaId -> C.APrivateSignKey -> [(ChunkReplicaId, C.APrivateSignKey)] -> IO ()
|
||||
createSndFileReplica db sndChunkId xftpServer sndId spKey rcvIdsKeys = do
|
||||
srvId <- createXFTPServer_ db xftpServer
|
||||
createSndFileReplica :: DB.Connection -> SndFileChunk -> NewSndChunkReplica -> IO ()
|
||||
createSndFileReplica db SndFileChunk {sndChunkId} NewSndChunkReplica {server, replicaId, replicaKey, rcvIdsKeys} = do
|
||||
srvId <- createXFTPServer_ db server
|
||||
DB.execute
|
||||
db
|
||||
[sql|
|
||||
@@ -2225,7 +2225,7 @@ createSndFileReplica db sndChunkId xftpServer sndId spKey rcvIdsKeys = do
|
||||
(snd_file_chunk_id, replica_number, xftp_server_id, replica_id, replica_key, replica_status)
|
||||
VALUES (?,?,?,?,?,?)
|
||||
|]
|
||||
(sndChunkId, 1 :: Int, srvId, sndId, spKey, SFRSCreated)
|
||||
(sndChunkId, 1 :: Int, srvId, replicaId, replicaKey, SFRSCreated)
|
||||
rId <- insertedRowId db
|
||||
forM_ rcvIdsKeys $ \(rcvId, rcvKey) -> do
|
||||
DB.execute
|
||||
@@ -2262,7 +2262,7 @@ getNextSndChunkToUpload db server@ProtocolServer {host, port, keyHash} = do
|
||||
forM chunk_ $ \chunk@SndFileChunk {replicas} -> do
|
||||
replicas' <- forM replicas $ \replica@SndFileChunkReplica {sndChunkReplicaId} -> do
|
||||
rcvIdsKeys <- getChunkReplicaRecipients_ db sndChunkReplicaId
|
||||
pure replica {rcvIdsKeys}
|
||||
pure (replica :: SndFileChunkReplica) {rcvIdsKeys}
|
||||
pure (chunk {replicas = replicas'} :: SndFileChunk)
|
||||
where
|
||||
toChunk :: ((DBSndFileId, SndFileId, UserId, Int, FilePath) :. (Int64, Int, Int64, Word32, FileDigest) :. (Int64, ChunkReplicaId, C.APrivateSignKey, SndFileReplicaStatus, Maybe Int64, Int)) -> SndFileChunk
|
||||
@@ -2298,7 +2298,7 @@ addSndChunkReplicaRecipients db r@SndFileChunkReplica {sndChunkReplicaId} rcvIds
|
||||
|]
|
||||
(sndChunkReplicaId, rcvId, rcvKey)
|
||||
rcvIdsKeys' <- getChunkReplicaRecipients_ db sndChunkReplicaId
|
||||
pure r {rcvIdsKeys = rcvIdsKeys'}
|
||||
pure (r :: SndFileChunkReplica) {rcvIdsKeys = rcvIdsKeys'}
|
||||
|
||||
updateSndChunkReplicaStatus :: DB.Connection -> Int64 -> SndFileReplicaStatus -> IO ()
|
||||
updateSndChunkReplicaStatus db replicaId status = do
|
||||
|
||||
Reference in New Issue
Block a user