mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-06-07 08:51:59 +00:00
FADD batching, parallel delete and redirect download
This commit is contained in:
+47
-19
@@ -15,12 +15,12 @@ import {
|
||||
} from "./protocol/description.js"
|
||||
import type {FileInfo} from "./protocol/commands.js"
|
||||
import {
|
||||
createXFTPChunk, uploadXFTPChunk, downloadXFTPChunk, downloadXFTPChunkRaw,
|
||||
createXFTPChunk, addXFTPRecipients, uploadXFTPChunk, downloadXFTPChunk, downloadXFTPChunkRaw,
|
||||
deleteXFTPChunk, ackXFTPChunk, type XFTPClientAgent
|
||||
} from "./client.js"
|
||||
export {newXFTPAgent, closeXFTPAgent, type XFTPClientAgent, type TransportConfig,
|
||||
XFTPRetriableError, XFTPPermanentError, isRetriable, categorizeError, humanReadableMessage,
|
||||
ackXFTPChunk} from "./client.js"
|
||||
ackXFTPChunk, addXFTPRecipients} from "./client.js"
|
||||
import {processDownloadedFile, decryptReceivedChunk} from "./download.js"
|
||||
import type {XFTPServer} from "./protocol/address.js"
|
||||
import {formatXFTPServer, parseXFTPServer} from "./protocol/address.js"
|
||||
@@ -96,6 +96,7 @@ export function encryptFileForUpload(source: Uint8Array, fileName: string): Encr
|
||||
}
|
||||
|
||||
const DEFAULT_REDIRECT_THRESHOLD = 400
|
||||
const MAX_RECIPIENTS_PER_REQUEST = 200
|
||||
|
||||
export interface UploadOptions {
|
||||
onProgress?: (uploaded: number, total: number) => void
|
||||
@@ -150,14 +151,24 @@ export async function uploadFile(
|
||||
size: spec.chunkSize,
|
||||
digest: chunkDigest
|
||||
}
|
||||
const rcvKeysForChunk = rcvKps.map(kp => encodePubKeyEd25519(kp.publicKey))
|
||||
const {senderId, recipientIds} = await createXFTPChunk(
|
||||
agent, server, sndKp.privateKey, fileInfo, rcvKeysForChunk, auth ?? null
|
||||
const firstBatch = Math.min(numRecipients, MAX_RECIPIENTS_PER_REQUEST)
|
||||
const firstBatchKeys = rcvKps.slice(0, firstBatch).map(kp => encodePubKeyEd25519(kp.publicKey))
|
||||
const {senderId, recipientIds: firstIds} = await createXFTPChunk(
|
||||
agent, server, sndKp.privateKey, fileInfo, firstBatchKeys, auth ?? null
|
||||
)
|
||||
const allRecipientIds = [...firstIds]
|
||||
let added = firstBatch
|
||||
while (added < numRecipients) {
|
||||
const batchSize = Math.min(numRecipients - added, MAX_RECIPIENTS_PER_REQUEST)
|
||||
const batchKeys = rcvKps.slice(added, added + batchSize).map(kp => encodePubKeyEd25519(kp.publicKey))
|
||||
const moreIds = await addXFTPRecipients(agent, server, sndKp.privateKey, senderId, batchKeys)
|
||||
allRecipientIds.push(...moreIds)
|
||||
added += batchSize
|
||||
}
|
||||
await uploadXFTPChunk(agent, server, sndKp.privateKey, senderId, chunkData)
|
||||
sentChunks[index] = {
|
||||
chunkNo, senderId, senderKey: sndKp.privateKey,
|
||||
recipients: recipientIds.map((rid, ri) => ({
|
||||
recipients: allRecipientIds.map((rid, ri) => ({
|
||||
recipientId: rid, recipientKey: rcvKps[ri].privateKey
|
||||
})),
|
||||
chunkSize: spec.chunkSize, digest: chunkDigest, server
|
||||
@@ -365,15 +376,24 @@ async function resolveRedirect(
|
||||
fd: FileDescription
|
||||
): Promise<FileDescription> {
|
||||
const plaintextChunks: Uint8Array[] = new Array(fd.chunks.length)
|
||||
const byServer = new Map<string, typeof fd.chunks>()
|
||||
for (const chunk of fd.chunks) {
|
||||
const replica = chunk.replicas[0]
|
||||
if (!replica) throw new Error("resolveRedirect: chunk has no replicas")
|
||||
const server = parseXFTPServer(replica.server)
|
||||
const seed = decodePrivKeyEd25519(replica.replicaKey)
|
||||
const kp = ed25519KeyPairFromSeed(seed)
|
||||
const data = await downloadXFTPChunk(agent, server, kp.privateKey, replica.replicaId, chunk.digest)
|
||||
plaintextChunks[chunk.chunkNo - 1] = data
|
||||
const srv = chunk.replicas[0]?.server ?? ""
|
||||
if (!byServer.has(srv)) byServer.set(srv, [])
|
||||
byServer.get(srv)!.push(chunk)
|
||||
}
|
||||
await Promise.all([...byServer.entries()].map(async ([srv, chunks]) => {
|
||||
const server = parseXFTPServer(srv)
|
||||
for (const chunk of chunks) {
|
||||
const replica = chunk.replicas[0]
|
||||
if (!replica) throw new Error("resolveRedirect: chunk has no replicas")
|
||||
const seed = decodePrivKeyEd25519(replica.replicaKey)
|
||||
const kp = ed25519KeyPairFromSeed(seed)
|
||||
const data = await downloadXFTPChunk(agent, server, kp.privateKey, replica.replicaId, chunk.digest)
|
||||
plaintextChunks[chunk.chunkNo - 1] = data
|
||||
await ackXFTPChunk(agent, server, kp.privateKey, replica.replicaId)
|
||||
}
|
||||
}))
|
||||
const totalSize = plaintextChunks.reduce((s, c) => s + c.length, 0)
|
||||
if (totalSize !== fd.size) throw new Error("resolveRedirect: redirect file size mismatch")
|
||||
const digest = sha512Streaming(plaintextChunks)
|
||||
@@ -391,14 +411,22 @@ async function resolveRedirect(
|
||||
// -- Delete
|
||||
|
||||
export async function deleteFile(agent: XFTPClientAgent, sndDescription: FileDescription): Promise<void> {
|
||||
const byServer = new Map<string, typeof sndDescription.chunks>()
|
||||
for (const chunk of sndDescription.chunks) {
|
||||
const replica = chunk.replicas[0]
|
||||
if (!replica) throw new Error("deleteFile: chunk has no replicas")
|
||||
const server = parseXFTPServer(replica.server)
|
||||
const seed = decodePrivKeyEd25519(replica.replicaKey)
|
||||
const kp = ed25519KeyPairFromSeed(seed)
|
||||
await deleteXFTPChunk(agent, server, kp.privateKey, replica.replicaId)
|
||||
const srv = chunk.replicas[0]?.server ?? ""
|
||||
if (!byServer.has(srv)) byServer.set(srv, [])
|
||||
byServer.get(srv)!.push(chunk)
|
||||
}
|
||||
await Promise.all([...byServer.entries()].map(async ([srv, chunks]) => {
|
||||
const server = parseXFTPServer(srv)
|
||||
for (const chunk of chunks) {
|
||||
const replica = chunk.replicas[0]
|
||||
if (!replica) throw new Error("deleteFile: chunk has no replicas")
|
||||
const seed = decodePrivKeyEd25519(replica.replicaKey)
|
||||
const kp = ed25519KeyPairFromSeed(seed)
|
||||
await deleteXFTPChunk(agent, server, kp.privateKey, replica.replicaId)
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
// -- Internal
|
||||
|
||||
Reference in New Issue
Block a user