parallel upload, FACK on download, multi-recipient, auth passthrough

- upload chunks in parallel across server groups (matching Haskell)
- send FACK after each chunk download
- support numRecipients option for multi-recipient uploads
- pass auth option through to createXFTPChunk
- remove dead concurrency option from DownloadRawOptions
This commit is contained in:
shum
2026-02-18 14:38:05 +00:00
parent 17995b3cfc
commit 122d039645
2 changed files with 112 additions and 74 deletions
+111 -72
View File
@@ -16,10 +16,11 @@ import {
import type {FileInfo} from "./protocol/commands.js"
import {
createXFTPChunk, uploadXFTPChunk, downloadXFTPChunk, downloadXFTPChunkRaw,
deleteXFTPChunk, type XFTPClientAgent
deleteXFTPChunk, ackXFTPChunk, type XFTPClientAgent
} from "./client.js"
export {newXFTPAgent, closeXFTPAgent, type XFTPClientAgent, type TransportConfig,
XFTPRetriableError, XFTPPermanentError, isRetriable, categorizeError, humanReadableMessage} from "./client.js"
XFTPRetriableError, XFTPPermanentError, isRetriable, categorizeError, humanReadableMessage,
ackXFTPChunk} from "./client.js"
import {processDownloadedFile, decryptReceivedChunk} from "./download.js"
import type {XFTPServer} from "./protocol/address.js"
import {formatXFTPServer, parseXFTPServer} from "./protocol/address.js"
@@ -31,8 +32,7 @@ interface SentChunk {
chunkNo: number
senderId: Uint8Array
senderKey: Uint8Array // 64B libsodium Ed25519 private key
recipientId: Uint8Array
recipientKey: Uint8Array // 64B libsodium Ed25519 private key
recipients: {recipientId: Uint8Array, recipientKey: Uint8Array}[]
chunkSize: number
digest: Uint8Array // SHA-256
server: XFTPServer
@@ -50,7 +50,7 @@ export interface EncryptedFileInfo extends EncryptedFileMetadata {
}
export interface UploadResult {
rcvDescription: FileDescription
rcvDescriptions: FileDescription[]
sndDescription: FileDescription
uri: string // base64url-encoded compressed YAML (no leading #)
}
@@ -101,6 +101,8 @@ export interface UploadOptions {
onProgress?: (uploaded: number, total: number) => void
redirectThreshold?: number
readChunk?: (offset: number, size: number) => Promise<Uint8Array>
auth?: Uint8Array
numRecipients?: number
}
export async function uploadFile(
@@ -110,7 +112,7 @@ export async function uploadFile(
options?: UploadOptions
): Promise<UploadResult> {
if (servers.length === 0) throw new Error("uploadFile: servers list is empty")
const {onProgress, redirectThreshold, readChunk: readChunkOpt} = options ?? {}
const {onProgress, redirectThreshold, readChunk: readChunkOpt, auth, numRecipients = 1} = options ?? {}
const readChunk: (offset: number, size: number) => Promise<Uint8Array> = readChunkOpt
? readChunkOpt
: ('encData' in encrypted
@@ -118,49 +120,71 @@ export async function uploadFile(
: () => { throw new Error("uploadFile: readChunk required when encData is absent") })
const total = encrypted.chunkSizes.reduce((a, b) => a + b, 0)
const specs = prepareChunkSpecs(encrypted.chunkSizes)
const sentChunks: SentChunk[] = []
let uploaded = 0
for (let i = 0; i < specs.length; i++) {
const spec = specs[i]
const chunkNo = i + 1
const server = servers[Math.floor(Math.random() * servers.length)]
const sndKp = generateEd25519KeyPair()
const rcvKp = generateEd25519KeyPair()
const chunkData = await readChunk(spec.chunkOffset, spec.chunkSize)
const chunkDigest = getChunkDigest(chunkData)
console.log(`[AGENT-DBG] upload chunk=${chunkNo} offset=${spec.chunkOffset} size=${spec.chunkSize} digest=${_dbgHex(chunkDigest, 32)} data[0..8]=${_dbgHex(chunkData)} data[-8..]=${_dbgHex(chunkData.slice(-8))}`)
const fileInfo: FileInfo = {
sndKey: encodePubKeyEd25519(sndKp.publicKey),
size: spec.chunkSize,
digest: chunkDigest
}
const rcvKeysForChunk = [encodePubKeyEd25519(rcvKp.publicKey)]
const {senderId, recipientIds} = await createXFTPChunk(
agent, server, sndKp.privateKey, fileInfo, rcvKeysForChunk
)
await uploadXFTPChunk(agent, server, sndKp.privateKey, senderId, chunkData)
sentChunks.push({
chunkNo, senderId, senderKey: sndKp.privateKey,
recipientId: recipientIds[0], recipientKey: rcvKp.privateKey,
chunkSize: spec.chunkSize, digest: chunkDigest, server
})
uploaded += spec.chunkSize
onProgress?.(uploaded, total)
// Pre-assign servers and group by server (matching Haskell groupAllOn)
const chunkJobs = specs.map((spec, i) => ({
index: i,
spec,
server: servers[Math.floor(Math.random() * servers.length)]
}))
const byServer = new Map<string, typeof chunkJobs>()
for (const job of chunkJobs) {
const key = formatXFTPServer(job.server)
if (!byServer.has(key)) byServer.set(key, [])
byServer.get(key)!.push(job)
}
const rcvDescription = buildDescription("recipient", encrypted, sentChunks)
const sndDescription = buildDescription("sender", encrypted, sentChunks)
let uri = encodeDescriptionURI(rcvDescription)
let finalRcvDescription = rcvDescription
// Upload groups in parallel, sequential within each group
const sentChunks: SentChunk[] = new Array(specs.length)
let uploaded = 0
await Promise.all([...byServer.values()].map(async (jobs) => {
for (const {index, spec, server} of jobs) {
const chunkNo = index + 1
const sndKp = generateEd25519KeyPair()
const rcvKps = Array.from({length: numRecipients}, () => generateEd25519KeyPair())
const chunkData = await readChunk(spec.chunkOffset, spec.chunkSize)
const chunkDigest = getChunkDigest(chunkData)
console.log(`[AGENT-DBG] upload chunk=${chunkNo} offset=${spec.chunkOffset} size=${spec.chunkSize} digest=${_dbgHex(chunkDigest, 32)} data[0..8]=${_dbgHex(chunkData)} data[-8..]=${_dbgHex(chunkData.slice(-8))}`)
const fileInfo: FileInfo = {
sndKey: encodePubKeyEd25519(sndKp.publicKey),
size: spec.chunkSize,
digest: chunkDigest
}
const rcvKeysForChunk = rcvKps.map(kp => encodePubKeyEd25519(kp.publicKey))
const {senderId, recipientIds} = await createXFTPChunk(
agent, server, sndKp.privateKey, fileInfo, rcvKeysForChunk, auth ?? null
)
await uploadXFTPChunk(agent, server, sndKp.privateKey, senderId, chunkData)
sentChunks[index] = {
chunkNo, senderId, senderKey: sndKp.privateKey,
recipients: recipientIds.map((rid, ri) => ({
recipientId: rid, recipientKey: rcvKps[ri].privateKey
})),
chunkSize: spec.chunkSize, digest: chunkDigest, server
}
uploaded += spec.chunkSize
onProgress?.(uploaded, total)
}
}))
const rcvDescriptions = Array.from({length: numRecipients}, (_, ri) =>
buildDescription("recipient", ri, encrypted, sentChunks)
)
const sndDescription = buildDescription("sender", 0, encrypted, sentChunks)
let uri = encodeDescriptionURI(rcvDescriptions[0])
let finalRcvDescriptions = rcvDescriptions
const threshold = redirectThreshold ?? DEFAULT_REDIRECT_THRESHOLD
if (uri.length > threshold && sentChunks.length > 1) {
finalRcvDescription = await uploadRedirectDescription(agent, servers, rcvDescription)
uri = encodeDescriptionURI(finalRcvDescription)
const redirected = await uploadRedirectDescription(agent, servers, rcvDescriptions[0], auth)
finalRcvDescriptions = [redirected, ...rcvDescriptions.slice(1)]
uri = encodeDescriptionURI(redirected)
}
return {rcvDescription: finalRcvDescription, sndDescription, uri}
return {rcvDescriptions: finalRcvDescriptions, sndDescription, uri}
}
function buildDescription(
party: "recipient" | "sender",
recipientIndex: number,
enc: EncryptedFileMetadata,
chunks: SentChunk[]
): FileDescription {
@@ -178,8 +202,8 @@ function buildDescription(
digest: c.digest,
replicas: [{
server: formatXFTPServer(c.server),
replicaId: party === "recipient" ? c.recipientId : c.senderId,
replicaKey: encodePrivKeyEd25519(party === "recipient" ? c.recipientKey : c.senderKey)
replicaId: party === "recipient" ? c.recipients[recipientIndex].recipientId : c.senderId,
replicaKey: encodePrivKeyEd25519(party === "recipient" ? c.recipients[recipientIndex].recipientKey : c.senderKey)
}]
})),
redirect: null
@@ -189,37 +213,52 @@ function buildDescription(
async function uploadRedirectDescription(
agent: XFTPClientAgent,
servers: XFTPServer[],
innerFd: FileDescription
innerFd: FileDescription,
auth?: Uint8Array
): Promise<FileDescription> {
const yaml = encodeFileDescription(innerFd)
const yamlBytes = new TextEncoder().encode(yaml)
const enc = encryptFileForUpload(yamlBytes, "")
const specs = prepareChunkSpecs(enc.chunkSizes)
const sentChunks: SentChunk[] = []
for (let i = 0; i < specs.length; i++) {
const spec = specs[i]
const chunkNo = i + 1
const server = servers[Math.floor(Math.random() * servers.length)]
const sndKp = generateEd25519KeyPair()
const rcvKp = generateEd25519KeyPair()
const chunkData = enc.encData.subarray(spec.chunkOffset, spec.chunkOffset + spec.chunkSize)
const chunkDigest = getChunkDigest(chunkData)
const fileInfo: FileInfo = {
sndKey: encodePubKeyEd25519(sndKp.publicKey),
size: spec.chunkSize,
digest: chunkDigest
}
const rcvKeysForChunk = [encodePubKeyEd25519(rcvKp.publicKey)]
const {senderId, recipientIds} = await createXFTPChunk(
agent, server, sndKp.privateKey, fileInfo, rcvKeysForChunk
)
await uploadXFTPChunk(agent, server, sndKp.privateKey, senderId, chunkData)
sentChunks.push({
chunkNo, senderId, senderKey: sndKp.privateKey,
recipientId: recipientIds[0], recipientKey: rcvKp.privateKey,
chunkSize: spec.chunkSize, digest: chunkDigest, server
})
const chunkJobs = specs.map((spec, i) => ({
index: i,
spec,
server: servers[Math.floor(Math.random() * servers.length)]
}))
const byServer = new Map<string, typeof chunkJobs>()
for (const job of chunkJobs) {
const key = formatXFTPServer(job.server)
if (!byServer.has(key)) byServer.set(key, [])
byServer.get(key)!.push(job)
}
const sentChunks: SentChunk[] = new Array(specs.length)
await Promise.all([...byServer.values()].map(async (jobs) => {
for (const {index, spec, server} of jobs) {
const chunkNo = index + 1
const sndKp = generateEd25519KeyPair()
const rcvKp = generateEd25519KeyPair()
const chunkData = enc.encData.subarray(spec.chunkOffset, spec.chunkOffset + spec.chunkSize)
const chunkDigest = getChunkDigest(chunkData)
const fileInfo: FileInfo = {
sndKey: encodePubKeyEd25519(sndKp.publicKey),
size: spec.chunkSize,
digest: chunkDigest
}
const rcvKeysForChunk = [encodePubKeyEd25519(rcvKp.publicKey)]
const {senderId, recipientIds} = await createXFTPChunk(
agent, server, sndKp.privateKey, fileInfo, rcvKeysForChunk, auth ?? null
)
await uploadXFTPChunk(agent, server, sndKp.privateKey, senderId, chunkData)
sentChunks[index] = {
chunkNo, senderId, senderKey: sndKp.privateKey,
recipients: [{recipientId: recipientIds[0], recipientKey: rcvKp.privateKey}],
chunkSize: spec.chunkSize, digest: chunkDigest, server
}
}
}))
return {
party: "recipient",
size: enc.chunkSizes.reduce((a, b) => a + b, 0),
@@ -233,8 +272,8 @@ async function uploadRedirectDescription(
digest: c.digest,
replicas: [{
server: formatXFTPServer(c.server),
replicaId: c.recipientId,
replicaKey: encodePrivKeyEd25519(c.recipientKey)
replicaId: c.recipients[0].recipientId,
replicaKey: encodePrivKeyEd25519(c.recipients[0].recipientKey)
}]
})),
redirect: {size: innerFd.size, digest: innerFd.digest}
@@ -253,7 +292,6 @@ export interface RawDownloadedChunk {
export interface DownloadRawOptions {
onProgress?: (downloaded: number, total: number) => void
concurrency?: number
}
export async function downloadFileRaw(
@@ -264,7 +302,7 @@ export async function downloadFileRaw(
): Promise<FileDescription> {
const err = validateFileDescription(fd)
if (err) throw new Error("downloadFileRaw: " + err)
const {onProgress, concurrency = 1} = options ?? {}
const {onProgress} = options ?? {}
// Resolve redirect on main thread (redirect data is small)
if (fd.redirect !== null) {
console.log(`[AGENT-DBG] resolving redirect: outer size=${fd.size} chunks=${fd.chunks.length}`)
@@ -296,6 +334,7 @@ export async function downloadFileRaw(
body: raw.body,
digest: chunk.digest
})
await ackXFTPChunk(agent, server, kp.privateKey, replica.replicaId)
downloaded += chunk.chunkSize
onProgress?.(downloaded, resolvedFd.size)
}
+1 -2
View File
@@ -3,9 +3,8 @@ import {getServers} from './servers.js'
import {createProgressRing} from './progress.js'
import {
newXFTPAgent, closeXFTPAgent, uploadFile, encodeDescriptionURI,
type EncryptedFileMetadata
XFTPPermanentError, type EncryptedFileMetadata
} from '../src/agent.js'
import {XFTPPermanentError} from '../src/client.js'
const MAX_SIZE = 100 * 1024 * 1024