diff --git a/xftp-web/src/agent.ts b/xftp-web/src/agent.ts index 3205afdc7..db3d605d5 100644 --- a/xftp-web/src/agent.ts +++ b/xftp-web/src/agent.ts @@ -16,10 +16,11 @@ import { import type {FileInfo} from "./protocol/commands.js" import { createXFTPChunk, uploadXFTPChunk, downloadXFTPChunk, downloadXFTPChunkRaw, - deleteXFTPChunk, type XFTPClientAgent + deleteXFTPChunk, ackXFTPChunk, type XFTPClientAgent } from "./client.js" export {newXFTPAgent, closeXFTPAgent, type XFTPClientAgent, type TransportConfig, - XFTPRetriableError, XFTPPermanentError, isRetriable, categorizeError, humanReadableMessage} from "./client.js" + XFTPRetriableError, XFTPPermanentError, isRetriable, categorizeError, humanReadableMessage, + ackXFTPChunk} from "./client.js" import {processDownloadedFile, decryptReceivedChunk} from "./download.js" import type {XFTPServer} from "./protocol/address.js" import {formatXFTPServer, parseXFTPServer} from "./protocol/address.js" @@ -31,8 +32,7 @@ interface SentChunk { chunkNo: number senderId: Uint8Array senderKey: Uint8Array // 64B libsodium Ed25519 private key - recipientId: Uint8Array - recipientKey: Uint8Array // 64B libsodium Ed25519 private key + recipients: {recipientId: Uint8Array, recipientKey: Uint8Array}[] chunkSize: number digest: Uint8Array // SHA-256 server: XFTPServer @@ -50,7 +50,7 @@ export interface EncryptedFileInfo extends EncryptedFileMetadata { } export interface UploadResult { - rcvDescription: FileDescription + rcvDescriptions: FileDescription[] sndDescription: FileDescription uri: string // base64url-encoded compressed YAML (no leading #) } @@ -101,6 +101,8 @@ export interface UploadOptions { onProgress?: (uploaded: number, total: number) => void redirectThreshold?: number readChunk?: (offset: number, size: number) => Promise + auth?: Uint8Array + numRecipients?: number } export async function uploadFile( @@ -110,7 +112,7 @@ export async function uploadFile( options?: UploadOptions ): Promise { if (servers.length === 0) throw new Error("uploadFile: servers list is empty") - const {onProgress, redirectThreshold, readChunk: readChunkOpt} = options ?? {} + const {onProgress, redirectThreshold, readChunk: readChunkOpt, auth, numRecipients = 1} = options ?? {} const readChunk: (offset: number, size: number) => Promise = readChunkOpt ? readChunkOpt : ('encData' in encrypted @@ -118,49 +120,71 @@ export async function uploadFile( : () => { throw new Error("uploadFile: readChunk required when encData is absent") }) const total = encrypted.chunkSizes.reduce((a, b) => a + b, 0) const specs = prepareChunkSpecs(encrypted.chunkSizes) - const sentChunks: SentChunk[] = [] - let uploaded = 0 - for (let i = 0; i < specs.length; i++) { - const spec = specs[i] - const chunkNo = i + 1 - const server = servers[Math.floor(Math.random() * servers.length)] - const sndKp = generateEd25519KeyPair() - const rcvKp = generateEd25519KeyPair() - const chunkData = await readChunk(spec.chunkOffset, spec.chunkSize) - const chunkDigest = getChunkDigest(chunkData) - console.log(`[AGENT-DBG] upload chunk=${chunkNo} offset=${spec.chunkOffset} size=${spec.chunkSize} digest=${_dbgHex(chunkDigest, 32)} data[0..8]=${_dbgHex(chunkData)} data[-8..]=${_dbgHex(chunkData.slice(-8))}`) - const fileInfo: FileInfo = { - sndKey: encodePubKeyEd25519(sndKp.publicKey), - size: spec.chunkSize, - digest: chunkDigest - } - const rcvKeysForChunk = [encodePubKeyEd25519(rcvKp.publicKey)] - const {senderId, recipientIds} = await createXFTPChunk( - agent, server, sndKp.privateKey, fileInfo, rcvKeysForChunk - ) - await uploadXFTPChunk(agent, server, sndKp.privateKey, senderId, chunkData) - sentChunks.push({ - chunkNo, senderId, senderKey: sndKp.privateKey, - recipientId: recipientIds[0], recipientKey: rcvKp.privateKey, - chunkSize: spec.chunkSize, digest: chunkDigest, server - }) - uploaded += spec.chunkSize - onProgress?.(uploaded, total) + + // Pre-assign servers and group by server (matching Haskell groupAllOn) + const chunkJobs = specs.map((spec, i) => ({ + index: i, + spec, + server: servers[Math.floor(Math.random() * servers.length)] + })) + const byServer = new Map() + for (const job of chunkJobs) { + const key = formatXFTPServer(job.server) + if (!byServer.has(key)) byServer.set(key, []) + byServer.get(key)!.push(job) } - const rcvDescription = buildDescription("recipient", encrypted, sentChunks) - const sndDescription = buildDescription("sender", encrypted, sentChunks) - let uri = encodeDescriptionURI(rcvDescription) - let finalRcvDescription = rcvDescription + + // Upload groups in parallel, sequential within each group + const sentChunks: SentChunk[] = new Array(specs.length) + let uploaded = 0 + await Promise.all([...byServer.values()].map(async (jobs) => { + for (const {index, spec, server} of jobs) { + const chunkNo = index + 1 + const sndKp = generateEd25519KeyPair() + const rcvKps = Array.from({length: numRecipients}, () => generateEd25519KeyPair()) + const chunkData = await readChunk(spec.chunkOffset, spec.chunkSize) + const chunkDigest = getChunkDigest(chunkData) + console.log(`[AGENT-DBG] upload chunk=${chunkNo} offset=${spec.chunkOffset} size=${spec.chunkSize} digest=${_dbgHex(chunkDigest, 32)} data[0..8]=${_dbgHex(chunkData)} data[-8..]=${_dbgHex(chunkData.slice(-8))}`) + const fileInfo: FileInfo = { + sndKey: encodePubKeyEd25519(sndKp.publicKey), + size: spec.chunkSize, + digest: chunkDigest + } + const rcvKeysForChunk = rcvKps.map(kp => encodePubKeyEd25519(kp.publicKey)) + const {senderId, recipientIds} = await createXFTPChunk( + agent, server, sndKp.privateKey, fileInfo, rcvKeysForChunk, auth ?? null + ) + await uploadXFTPChunk(agent, server, sndKp.privateKey, senderId, chunkData) + sentChunks[index] = { + chunkNo, senderId, senderKey: sndKp.privateKey, + recipients: recipientIds.map((rid, ri) => ({ + recipientId: rid, recipientKey: rcvKps[ri].privateKey + })), + chunkSize: spec.chunkSize, digest: chunkDigest, server + } + uploaded += spec.chunkSize + onProgress?.(uploaded, total) + } + })) + + const rcvDescriptions = Array.from({length: numRecipients}, (_, ri) => + buildDescription("recipient", ri, encrypted, sentChunks) + ) + const sndDescription = buildDescription("sender", 0, encrypted, sentChunks) + let uri = encodeDescriptionURI(rcvDescriptions[0]) + let finalRcvDescriptions = rcvDescriptions const threshold = redirectThreshold ?? DEFAULT_REDIRECT_THRESHOLD if (uri.length > threshold && sentChunks.length > 1) { - finalRcvDescription = await uploadRedirectDescription(agent, servers, rcvDescription) - uri = encodeDescriptionURI(finalRcvDescription) + const redirected = await uploadRedirectDescription(agent, servers, rcvDescriptions[0], auth) + finalRcvDescriptions = [redirected, ...rcvDescriptions.slice(1)] + uri = encodeDescriptionURI(redirected) } - return {rcvDescription: finalRcvDescription, sndDescription, uri} + return {rcvDescriptions: finalRcvDescriptions, sndDescription, uri} } function buildDescription( party: "recipient" | "sender", + recipientIndex: number, enc: EncryptedFileMetadata, chunks: SentChunk[] ): FileDescription { @@ -178,8 +202,8 @@ function buildDescription( digest: c.digest, replicas: [{ server: formatXFTPServer(c.server), - replicaId: party === "recipient" ? c.recipientId : c.senderId, - replicaKey: encodePrivKeyEd25519(party === "recipient" ? c.recipientKey : c.senderKey) + replicaId: party === "recipient" ? c.recipients[recipientIndex].recipientId : c.senderId, + replicaKey: encodePrivKeyEd25519(party === "recipient" ? c.recipients[recipientIndex].recipientKey : c.senderKey) }] })), redirect: null @@ -189,37 +213,52 @@ function buildDescription( async function uploadRedirectDescription( agent: XFTPClientAgent, servers: XFTPServer[], - innerFd: FileDescription + innerFd: FileDescription, + auth?: Uint8Array ): Promise { const yaml = encodeFileDescription(innerFd) const yamlBytes = new TextEncoder().encode(yaml) const enc = encryptFileForUpload(yamlBytes, "") const specs = prepareChunkSpecs(enc.chunkSizes) - const sentChunks: SentChunk[] = [] - for (let i = 0; i < specs.length; i++) { - const spec = specs[i] - const chunkNo = i + 1 - const server = servers[Math.floor(Math.random() * servers.length)] - const sndKp = generateEd25519KeyPair() - const rcvKp = generateEd25519KeyPair() - const chunkData = enc.encData.subarray(spec.chunkOffset, spec.chunkOffset + spec.chunkSize) - const chunkDigest = getChunkDigest(chunkData) - const fileInfo: FileInfo = { - sndKey: encodePubKeyEd25519(sndKp.publicKey), - size: spec.chunkSize, - digest: chunkDigest - } - const rcvKeysForChunk = [encodePubKeyEd25519(rcvKp.publicKey)] - const {senderId, recipientIds} = await createXFTPChunk( - agent, server, sndKp.privateKey, fileInfo, rcvKeysForChunk - ) - await uploadXFTPChunk(agent, server, sndKp.privateKey, senderId, chunkData) - sentChunks.push({ - chunkNo, senderId, senderKey: sndKp.privateKey, - recipientId: recipientIds[0], recipientKey: rcvKp.privateKey, - chunkSize: spec.chunkSize, digest: chunkDigest, server - }) + + const chunkJobs = specs.map((spec, i) => ({ + index: i, + spec, + server: servers[Math.floor(Math.random() * servers.length)] + })) + const byServer = new Map() + for (const job of chunkJobs) { + const key = formatXFTPServer(job.server) + if (!byServer.has(key)) byServer.set(key, []) + byServer.get(key)!.push(job) } + + const sentChunks: SentChunk[] = new Array(specs.length) + await Promise.all([...byServer.values()].map(async (jobs) => { + for (const {index, spec, server} of jobs) { + const chunkNo = index + 1 + const sndKp = generateEd25519KeyPair() + const rcvKp = generateEd25519KeyPair() + const chunkData = enc.encData.subarray(spec.chunkOffset, spec.chunkOffset + spec.chunkSize) + const chunkDigest = getChunkDigest(chunkData) + const fileInfo: FileInfo = { + sndKey: encodePubKeyEd25519(sndKp.publicKey), + size: spec.chunkSize, + digest: chunkDigest + } + const rcvKeysForChunk = [encodePubKeyEd25519(rcvKp.publicKey)] + const {senderId, recipientIds} = await createXFTPChunk( + agent, server, sndKp.privateKey, fileInfo, rcvKeysForChunk, auth ?? null + ) + await uploadXFTPChunk(agent, server, sndKp.privateKey, senderId, chunkData) + sentChunks[index] = { + chunkNo, senderId, senderKey: sndKp.privateKey, + recipients: [{recipientId: recipientIds[0], recipientKey: rcvKp.privateKey}], + chunkSize: spec.chunkSize, digest: chunkDigest, server + } + } + })) + return { party: "recipient", size: enc.chunkSizes.reduce((a, b) => a + b, 0), @@ -233,8 +272,8 @@ async function uploadRedirectDescription( digest: c.digest, replicas: [{ server: formatXFTPServer(c.server), - replicaId: c.recipientId, - replicaKey: encodePrivKeyEd25519(c.recipientKey) + replicaId: c.recipients[0].recipientId, + replicaKey: encodePrivKeyEd25519(c.recipients[0].recipientKey) }] })), redirect: {size: innerFd.size, digest: innerFd.digest} @@ -253,7 +292,6 @@ export interface RawDownloadedChunk { export interface DownloadRawOptions { onProgress?: (downloaded: number, total: number) => void - concurrency?: number } export async function downloadFileRaw( @@ -264,7 +302,7 @@ export async function downloadFileRaw( ): Promise { const err = validateFileDescription(fd) if (err) throw new Error("downloadFileRaw: " + err) - const {onProgress, concurrency = 1} = options ?? {} + const {onProgress} = options ?? {} // Resolve redirect on main thread (redirect data is small) if (fd.redirect !== null) { console.log(`[AGENT-DBG] resolving redirect: outer size=${fd.size} chunks=${fd.chunks.length}`) @@ -296,6 +334,7 @@ export async function downloadFileRaw( body: raw.body, digest: chunk.digest }) + await ackXFTPChunk(agent, server, kp.privateKey, replica.replicaId) downloaded += chunk.chunkSize onProgress?.(downloaded, resolvedFd.size) } diff --git a/xftp-web/web/upload.ts b/xftp-web/web/upload.ts index 0faa70c0a..cf63ba379 100644 --- a/xftp-web/web/upload.ts +++ b/xftp-web/web/upload.ts @@ -3,9 +3,8 @@ import {getServers} from './servers.js' import {createProgressRing} from './progress.js' import { newXFTPAgent, closeXFTPAgent, uploadFile, encodeDescriptionURI, - type EncryptedFileMetadata + XFTPPermanentError, type EncryptedFileMetadata } from '../src/agent.js' -import {XFTPPermanentError} from '../src/client.js' const MAX_SIZE = 100 * 1024 * 1024