From 59e740c6b1d19f25fed92736b80523b223dceebf Mon Sep 17 00:00:00 2001 From: "Evgeny @ SimpleX Chat" <259188159+evgeny-simplex@users.noreply.github.com> Date: Wed, 11 Feb 2026 21:01:40 +0000 Subject: [PATCH] improve error handling, handle browser reconnections/re-handshake --- .../2026-02-11-xftp-web-error-handling.md | 948 ++++++++++++++++++ src/Simplex/FileTransfer/Server.hs | 29 +- tests/XFTPServerTests.hs | 25 +- xftp-web/src/agent.ts | 51 +- xftp-web/src/client.ts | 260 ++++- xftp-web/test/connection.node.test.ts | 164 +++ xftp-web/test/errors.test.ts | 34 + xftp-web/vitest.config.ts | 1 + xftp-web/vitest.node.config.ts | 9 + xftp-web/web/download.ts | 6 +- xftp-web/web/upload.ts | 9 +- 11 files changed, 1429 insertions(+), 107 deletions(-) create mode 100644 rfcs/2026-01-30-send-file-page/2026-02-11-xftp-web-error-handling.md create mode 100644 xftp-web/test/connection.node.test.ts create mode 100644 xftp-web/test/errors.test.ts create mode 100644 xftp-web/vitest.node.config.ts diff --git a/rfcs/2026-01-30-send-file-page/2026-02-11-xftp-web-error-handling.md b/rfcs/2026-01-30-send-file-page/2026-02-11-xftp-web-error-handling.md new file mode 100644 index 000000000..2802f16a5 --- /dev/null +++ b/rfcs/2026-01-30-send-file-page/2026-02-11-xftp-web-error-handling.md @@ -0,0 +1,948 @@ +# XFTP Web Error Handling and Connection Resilience + +## 1. Problem Statement + +The XFTP web client is fundamentally fragile: any transient error (browser opening a new HTTP/2 connection, network hiccup, server restart) causes an unrecoverable failure with a cryptic error message. There is no retry logic, no fetch timeout, no error categorization, and the upload uses a single server instead of distributing chunks across preset servers. This makes the app frustrating — it works most of the time but fails unpredictably, which is worse than being completely broken. 
+ +### Confirmed root cause (from diagnostic logs) + +When the browser opens a new HTTP/2 connection mid-operation, the new connection has a different TLS SessionId with no handshake state in the server's `TMap SessionId Handshake`. The server's `Nothing` branch in `xftpServerHandshakeV1` (Server.hs:169) unconditionally calls `processHello`, which tries to decode the command body as `XFTPClientHello`, fails, and sends a raw padded "HANDSHAKE" error string. The client cannot parse this as a proper transmission (first byte 'H' = 72 is read as batch count), producing `"expected batch count 1, got 72"`. + +Server log confirming the SessionId change: +``` +DEBUG dispatch: Accepted+command sessId="ZSo1GGETgIvjbB7CWHbvGPpbMjx_b2IlC1eTI6aKfqc=" +...20 successful commands... +DEBUG dispatch: Nothing sessId="mJC7Sck9xxW5UsXoPGoUWduuHghSVgf6CnD6ZC6SBhU=" webHello=False +``` + +### Why re-handshake is required (cannot be made optional) + +1. **SessionId is baked into signed command data.** `encodeAuthTransmission` signs `concat(encode(sessionId), tInner)` with Ed25519. Server's `tDecodeServer` (Protocol.hs:2242) verifies `sessId == sessionId`. New connection = different sessionId = signature mismatch. +2. **Server generates per-session DH keys.** `processHello` creates fresh X25519 keypair stored in `HandshakeSent`. For SMP browser clients (future), `verifyCmdAuth` (Protocol.hs:1322) requires the matching `serverPrivKey` from `thAuth`. +3. **This applies to both XFTP and future SMP browser clients** — the session management approach is the same. + +### Why multiple preset servers cannot work + +Upload (`agent.ts:105-157`) takes a single `server: XFTPServer` parameter and uploads ALL chunks to it. `web/upload.ts:133` calls `pickRandomServer(servers)` which selects ONE random server from all presets. The multi-server preset configuration is pointless — only one server is ever used per upload. 
The design intent (RFC section 11.6: "upload in parallel to 8 randomly selected servers") is not implemented. This must be fixed in Phase 2 (section 3.7). + +## 2. Solution Summary + +### Phase 1: Error handling and connection resilience + +1. **Server: strict dispatch for allowed protocol combinations** — reject all invalid combinations +2. **Client: automatic retry with re-handshake** on SESSION/HANDSHAKE errors +3. **Client: fetch timeout** with configurable duration +4. **UI: error categorization and retry** — auto-retry temporary, human-readable permanent +5. **Client: connection state with Promise-based lock and per-server queues** — `ServerConnection` with `client: Promise` + `queue: Promise` +6. **Client: fix cache key** — include keyHash + +### Phase 2: Multi-server upload (after Phase 1) + +7. **Multi-server upload with server selection and failover** — distribute chunks across servers, retry FNEW on different server if one fails + +## 3. Detailed Technical Design + +### 3.1 Server: strict dispatch for allowed protocol combinations + +**Principle:** Everything not explicitly done by existing Haskell/TS clients is prohibited. It is better to fail on impossible combinations than to be permissive — permissiveness complicates debugging and creates attack vectors via unexpected behaviors. + +**Allowed behaviors by client type:** + +| Client | SNI | webHello header | Hello body | When | +|--------|-----|----------------|------------|------| +| Haskell | No | No | Empty | New connection only | +| Web | Yes | Yes | Non-empty (XFTPClientHello) | New OR existing connection | + +**Minimal surgical change.** The existing dispatch (Server.hs:169-189) already correctly handles `HandshakeSent` and `HandshakeAccepted` — their guards cover all valid and invalid combinations. The ONLY missing case is `Nothing` + web client sending a command on a stale session. 
+ +`processHello` (Server.hs:194-217) already internally routes: `B.null bodyHead` → Haskell hello, `sniUsed` → web hello decode, else → HANDSHAKE. For stale web sessions, it currently tries to decode a command body as `XFTPClientHello`, fails, and throws HANDSHAKE. The fix: detect this case BEFORE calling processHello and throw SESSION instead, so the client knows to re-handshake (not that its hello was malformed). + +**Change: add one guard to `Nothing` branch, remove debug logging.** + +```haskell +-- Before (1 line): +Nothing -> processHello Nothing + +-- After (3 lines): +Nothing + | sniUsed && not webHello -> throwE SESSION -- web command on stale session + | otherwise -> processHello Nothing -- normal hello (web or Haskell) +``` + +`throwE SESSION` is caught by `either sendError pure r` (line 190). `sendError` pads `smpEncode SESSION` = `"SESSION"` (Transport.hs:298) to `xftpBlockSize`. The client's padded error detection (section 3.2) catches this as a retriable error and triggers re-handshake. SESSION is a valid `XFTPErrorType` constructor (Transport.hs:225) — no new helpers needed. + +**All other branches remain unchanged.** `HandshakeSent` guards (`webHello` → processHello, `otherwise` → processClientHandshake with body size check inside) are correct. `HandshakeAccepted` guards (`webHello`, `webHandshake`, `otherwise` → command) are correct. + +### 3.2 Client: automatic retry with re-handshake + +**Location:** `sendXFTPCommand` in `client.ts` + +**Design:** Retry loop inside `sendXFTPCommand`. Maximum 3 attempts. On retriable error, close old client, re-handshake, retry. + +**Error classification:** + +| Error | Type | Retriable? | Human-readable message | +|-------|------|-----------|----------------------| +| Padded "HANDSHAKE" | Temporary | Yes (auto) | "Connection interrupted, reconnecting..." | +| Padded "SESSION" | Temporary | Yes (auto) | "Session expired, reconnecting..." 
| +| `FRErr SESSION` | Temporary | Yes (auto) | "Session expired, reconnecting..." | +| `FRErr HANDSHAKE` | Temporary | Yes (auto) | "Connection interrupted, reconnecting..." | +| `fetch()` TypeError | Temporary | Yes (auto) | "Network error, retrying..." | +| AbortError (timeout) | Temporary | Yes (auto) | "Server timeout, retrying..." | +| `FRErr AUTH` | Permanent | No | "File is invalid, expired, or has been removed" | +| `FRErr NO_FILE` | Permanent | No | "File not found — it may have expired" | +| `FRErr SIZE` | Permanent | No | "File size exceeds server limit" | +| `FRErr QUOTA` | Permanent | No | "Server storage quota exceeded" | +| `FRErr BLOCKED` | Permanent | No | "File has been blocked by server" | +| `FRErr DIGEST` | Permanent | No | "File integrity check failed" | +| `FRErr INTERNAL` | Permanent | No | "Server internal error" | +| `CMD *` | Permanent | No | "Protocol error" | + +**Retry behavior:** +- Auto-retry up to 3 times for temporary errors, transparent to user +- After 3 failures: show human-readable error with diagnosis, offer manual retry button +- Permanent errors: show human-readable error immediately, NO manual retry button (user can reload page) + +**Implementation:** + +```typescript +async function sendXFTPCommand( + agent: XFTPClientAgent, + server: XFTPServer, + privateKey: Uint8Array, + entityId: Uint8Array, + cmdBytes: Uint8Array, + chunkData?: Uint8Array, + maxRetries: number = 3 +): Promise<{response: FileResponse, body: Uint8Array}> { + let clientP = getXFTPServerClient(agent, server) + let client = await clientP + for (let attempt = 1; attempt <= maxRetries; attempt++) { + try { + return await sendXFTPCommandOnce(client, privateKey, entityId, cmdBytes, chunkData) + } catch (e) { + if (!isRetriable(e)) { + // Permanent error (AUTH, NO_FILE, etc.) 
— connection is fine, don't touch it + throw categorizeError(e) + } + if (attempt === maxRetries) { + // Retriable error exhausted — connection is bad, remove stale promise + removeStaleConnection(agent, server, clientP) + throw categorizeError(e) + } + clientP = reconnectClient(agent, server) + client = await clientP + } + } + throw new Error("unreachable") +} +``` + +**`sendXFTPCommandOnce`** — renamed from current `sendXFTPCommand`. Two changes: + +1. **Padded error detection** (before `decodeTransmission`): + +```typescript +// After getting respBlock, before decodeTransmission: +const raw = blockUnpad(respBlock) +if (raw.length < 20) { + const text = new TextDecoder().decode(raw) + if (/^[A-Z_]+$/.test(text)) { + throw new XFTPRetriableError(text) // "HANDSHAKE" or "SESSION" + } +} +``` + +2. **FRErr classification** (replaces current unconditional throw): + +```typescript +// After decodeResponse, instead of throw new Error("Server error: " + err.type): +if (response.type === "FRErr") { + const err = response.err + if (err.type === "SESSION" || err.type === "HANDSHAKE") { + throw new XFTPRetriableError(err.type) + } + throw new XFTPPermanentError(err.type, humanReadableMessage(err)) +} +``` + +### 3.3 Client: fetch timeout + +**Location:** `createBrowserTransport` and `createNodeTransport` in `client.ts` + +**Design:** `AbortController` with configurable timeout on every `fetch()`. 
+ +```typescript +interface TransportConfig { + timeoutMs: number // default 30000, lower for tests +} + +function createBrowserTransport(baseUrl: string, config: TransportConfig): Transport { + return { + async post(body: Uint8Array, headers?: Record<string, string>): Promise<Uint8Array> { + const controller = new AbortController() + const timer = setTimeout(() => controller.abort(), config.timeoutMs) + try { + const resp = await fetch(effectiveUrl, { + method: "POST", headers, body, + signal: controller.signal + }) + if (!resp.ok) throw new Error(`Server request failed: ${resp.status}`) + return new Uint8Array(await resp.arrayBuffer()) + } finally { + clearTimeout(timer) + } + }, + close() {} + } +} +``` + +For Node.js transport, use `setTimeout` on the HTTP/2 request stream. + +Default: 30s for production, 5s for tests. Threaded through `connectXFTP` → `createTransport`. + +### 3.4 UI: error categorization and retry + +**Behavior (Option D):** + +- **Temporary errors:** Auto-retry loop (3 attempts). After 3 failures, show human-readable diagnosis with manual retry button. Diagnosis examples: "Server timeout — the server may be temporarily unavailable", "Connection interrupted — your network may be unstable". +- **Permanent errors:** Show human-readable error immediately, NO retry button. User can reload page if they want to retry. Examples: "File is invalid, expired, or has been removed" (AUTH), "File not found" (NO_FILE). + +**Current UI retry buttons:** +- `upload.ts:73-75` — retry calls `startUpload(pendingFile)` from scratch +- `download.ts:60` — retry calls `startDownload()` from scratch + +**Improvement:** Track uploaded/downloaded chunk indices. On manual retry, skip completed chunks: + +```typescript +// Upload: track which chunks completed +const completedChunks: Set<number> = new Set() +for (let i = 0; i < specs.length; i++) { + if (completedChunks.has(i)) continue + // ... 
create + upload chunk + completedChunks.add(i) +} + +// Download: already naturally resumable — each chunk is independent +``` + +### 3.5 Client: connection state with Promise-based lock and per-server queues + +**Design:** Each server gets a `ServerConnection` record containing a `Promise` (the connection lock) and a `Promise` (the sequential command queue). The `XFTPClientAgent` maps server keys to these records. + +The promise IS the lock — every consumer awaits the same promise. When reconnect is needed, the promise is replaced atomically. + +```typescript +interface ServerConnection { + client: Promise // resolves to connected client; replaced on reconnect + queue: Promise // tail of sequential command chain +} + +interface XFTPClientAgent { + connections: Map +} + +function newXFTPAgent(): XFTPClientAgent { + return {connections: new Map()} +} +``` + +**Connection lifecycle — `getXFTPServerClient` and `reconnectClient`:** + +```typescript +function getXFTPServerClient(agent: XFTPClientAgent, server: XFTPServer): Promise { + const key = formatXFTPServer(server) + let conn = agent.connections.get(key) + if (!conn) { + const p = connectXFTP(server) + conn = {client: p, queue: Promise.resolve()} + agent.connections.set(key, conn) + // On connection failure, remove from map so next call retries + p.catch(() => { + const cur = agent.connections.get(key) + if (cur && cur.client === p) agent.connections.delete(key) + }) + } + return conn.client +} + +function reconnectClient(agent: XFTPClientAgent, server: XFTPServer): Promise { + const key = formatXFTPServer(server) + const old = agent.connections.get(key) + // Close old client (fire-and-forget) + old?.client.then(c => c.transport.close(), () => {}) + // Replace with new connection promise — all concurrent callers will await this + // Queue survives reconnect — pending operations stay ordered + const p = connectXFTP(server) + const conn: ServerConnection = {client: p, queue: old?.queue ?? 
Promise.resolve()} + agent.connections.set(key, conn) + p.catch(() => { + const cur = agent.connections.get(key) + if (cur && cur.client === p) agent.connections.delete(key) + }) + return p +} + +function closeXFTPServerClient(agent: XFTPClientAgent, server: XFTPServer): void { + const key = formatXFTPServer(server) + const conn = agent.connections.get(key) + if (conn) { + agent.connections.delete(key) + conn.client.then(c => c.transport.close(), () => {}) + } +} + +function closeXFTPAgent(agent: XFTPClientAgent): void { + for (const conn of agent.connections.values()) { + conn.client.then(c => c.transport.close(), () => {}) + } + agent.connections.clear() +} +``` + +**Precise semantics:** + +1. `getXFTPServerClient(agent, server)` — returns existing `conn.client` promise if present, otherwise creates a new `ServerConnection` with fresh connection and empty queue +2. When error detected, first caller calls `reconnectClient` which replaces `conn.client` with a new connection promise. The queue is preserved across reconnect. +3. All concurrent callers awaiting the OLD promise receive the error +4. They then call `getXFTPServerClient` which returns the NEW promise +5. If reconnection fails, auto-cleanup (`p.catch(() => delete)`) removes the entry so the next caller starts fresh + +**Stale error cleanup rule:** When a caller exhausts retries for a retriable error, it removes the failed entry from the map (only if no concurrent caller has already replaced it via `reconnectClient`). This prevents the next caller from receiving a stale rejected promise. Permanent errors (AUTH, NO_FILE, etc.) do NOT remove the connection — the transport is fine, only the command failed. 
+ +```typescript +function removeStaleConnection( + agent: XFTPClientAgent, server: XFTPServer, failedP: Promise +): void { + const key = formatXFTPServer(server) + const conn = agent.connections.get(key) + // Only remove if current promise is the one that failed — not if already replaced by reconnect + if (conn && conn.client === failedP) { + agent.connections.delete(key) + failedP.then(c => c.transport.close(), () => {}) + } +} +``` + +**Per-server sequential queue:** `queue` is a `Promise<void>` — the tail of the sequential operation chain. Each new operation `.then()`s onto it. It's `void` because callers hold their own typed promises; the queue only tracks completion order: + +```typescript +async function enqueueCommand<T>( + agent: XFTPClientAgent, + server: XFTPServer, + fn: () => Promise<T> // no client param — fn uses command wrappers (agent+server) +): Promise<T> { + const key = formatXFTPServer(server) + // Ensure connection exists (with auto-cleanup on failure) + await getXFTPServerClient(agent, server) + const conn = agent.connections.get(key)! // guaranteed to exist after getXFTPServerClient + // Chain onto the queue — fn runs after previous operation completes + let resolve_: (v: T) => void, reject_: (e: any) => void + const result = new Promise<T>((res, rej) => { resolve_ = res; reject_ = rej }) + conn.queue = conn.queue.then( + () => fn().then(resolve_!, reject_!), + () => fn().then(resolve_!, reject_!) + ).then(() => {}, () => {}) // swallow errors in the chain + return result +} +``` + +Commands to the same server execute one at a time via the queue. Commands to different servers execute concurrently because each has its own queue. `enqueueCommand` provides sequencing; `sendXFTPCommand` (called inside `fn` via command wrappers) provides retry. They compose as: `enqueueCommand` sequences calls to wrappers that internally use `sendXFTPCommand`. + +**Download change:** Group chunks by server, process each server's chunks sequentially, servers in parallel. 
Uses `for` loop for per-server sequencing (same pattern as Stage 2 upload). `enqueueCommand` is available for cases where different callers target the same server. + +```typescript +const byServer = new Map() +for (const chunk of resolvedFd.chunks) { + const srv = chunk.replicas[0]?.server ?? "" + if (!byServer.has(srv)) byServer.set(srv, []) + byServer.get(srv)!.push(chunk) +} +await Promise.all([...byServer.entries()].map(async ([srv, chunks]) => { + const server = parseXFTPServer(srv) + for (const chunk of chunks) { + const seed = decodePrivKeyEd25519(chunk.replicas[0].replicaKey) + const kp = ed25519KeyPairFromSeed(seed) + const raw = await downloadXFTPChunkRaw(agent, server, kp.privateKey, chunk.replicas[0].replicaId) + await onRawChunk({chunkNo: chunk.chunkNo, dhSecret: raw.dhSecret, nonce: raw.nonce, body: raw.body, digest: chunk.digest}) + downloaded += chunk.chunkSize + onProgress?.(downloaded, resolvedFd.size) + } +})) +``` + +### 3.6 Fix cache key + +**Bug:** `getXFTPServerClient` (client.ts:110) uses `"https://" + server.host + ":" + server.port` as cache key, ignoring `keyHash`. Two servers with same host:port but different keyHash share a cached connection, bypassing identity verification. + +**Fix:** Use `formatXFTPServer(server)` as cache key (includes keyHash). Already available in `protocol/address.ts:52-54`. + +```typescript +// Before: +const key = "https://" + server.host + ":" + server.port + +// After: +const key = formatXFTPServer(server) +``` + +Note: With the redesign in 3.5, the cache key fix is inherent — the `connections` Map uses `formatXFTPServer(server)` everywhere. + +### 3.7 Phase 2: Multi-server upload with server selection and failover + +**Problem:** Current upload (`agent.ts:105-157`) takes a single `server: XFTPServer` and uploads ALL chunks to it. The 12 preset servers (6 SimpleX + 6 Flux) are pointless — only one is ever used. + +**Design goal:** Distribute chunks across servers. 
Retry FNEW on a different server if one fails. Once working servers are found, prefer them (heuristic: server unlikely to fail mid-process, more likely to be broken initially due to maintenance/downtime). + +**Reference implementation:** Haskell `Agent.hs:457-486` (`createChunk` / `createWithNextSrv`) + `Client.hs:2335-2385` (`getNextServer_` / `withNextSrv`). + +#### Haskell algorithm summary + +Two-stage architecture: + +1. **Allocate stage (serial per file in Haskell):** For each chunk, call FNEW on a randomly-selected server. If FNEW fails, pick a different server and retry. Track tried hosts to avoid retrying the same server. After all chunks are assigned to servers, spawn one upload worker per server. + +2. **Upload stage (parallel per server):** Each server worker uploads its assigned chunks sequentially (FPUT). On FPUT failure, retry on the same server with backoff (because the chunk replica already exists on that server). No server failover for FPUT. + +Server selection constraints (hierarchical, `getNextServer_` Client.hs:2335-2350): +1. Prefer servers from unused operators (operator diversity) +2. Prefer servers with unused hosts (host diversity) +3. Random pick from the most-constrained candidate set +4. If all exhausted, reset tried set and start over + +#### Web client adaptation + +The web client doesn't have operators or a database. Simplified algorithm with two stages: + +**Stage 1 — Allocate:** Create chunk records on servers (FNEW). Unlike Haskell which is serial here, web FNEW runs concurrently within a concurrency limit. FNEW is a small command — concurrent FNEW on the same connection is not a problem, and concurrent FNEW across servers improves upload startup time. + +**Stage 2 — Upload:** Upload chunk data (FPUT). Parallel across servers, sequential per server (reuses per-server queues from 3.5). FPUT retries on the same server with backoff — no server rotation because the chunk replica already exists on that server. 
Stage 2 reads chunk data by offset (via `readChunk`), so `SentChunk` must be extended with `chunkOffset: number` (from ChunkSpec). + +```typescript +interface UploadState { + untriedServers: XFTPServer[] // servers not yet attempted — initially all servers + workingServers: XFTPServer[] // servers that succeeded FNEW +} + +const MAX_FNEW_ATTEMPTS = 5 // per chunk: try up to 5 different servers + +async function uploadFile( + agent: XFTPClientAgent, + allServers: XFTPServer[], + encrypted: EncryptedFileMetadata, + options?: UploadOptions +): Promise { + const state: UploadState = {untriedServers: [...allServers], workingServers: []} + const specs = prepareChunkSpecs(encrypted.chunkSizes) + const concurrency = options?.concurrency ?? 4 + + // Stage 1: Allocate — concurrent FNEW within concurrency limit + const sentChunks: SentChunk[] = new Array(specs.length) + const queue = specs.map((spec, i) => ({spec, chunkNo: i + 1, index: i})) + let idx = 0 + async function allocateWorker() { + while (idx < queue.length) { + const item = queue[idx++] + const {server, chunk} = await createChunkWithFailover( + agent, allServers, state, concurrency, item.spec, item.chunkNo + ) + sentChunks[item.index] = chunk + } + } + const allocateWorkers = Array.from( + {length: Math.min(concurrency, queue.length)}, + () => allocateWorker() + ) + await Promise.all(allocateWorkers) + + // Stage 2: Upload — parallel across servers, sequential per server + // readChunk reads from the encrypted file by offset (same as Phase 1 uploadFile) + let uploaded = 0 + const total = encrypted.chunkSizes.reduce((a, b) => a + b, 0) + const byServer = groupBy(sentChunks, c => formatXFTPServer(c.server)) + await Promise.all([...byServer.entries()].map(async ([srvKey, chunks]) => { + for (const chunk of chunks) { + const chunkData = await readChunk(chunk.chunkOffset, chunk.chunkSize) + await uploadXFTPChunk(agent, chunk.server, chunk.senderKey, chunk.senderId, chunkData) + uploaded += chunk.chunkSize + 
options?.onProgress?.(uploaded, total) + } + })) + + return buildDescriptions(encrypted, sentChunks) +} +``` + +**`createChunkWithFailover`** — server selection with per-chunk retry limit: + +```typescript +async function createChunkWithFailover( + agent: XFTPClientAgent, + allServers: XFTPServer[], + state: UploadState, + concurrency: number, + spec: ChunkSpec, + chunkNo: number +): Promise<{server: XFTPServer, chunk: SentChunk}> { + const maxAttempts = Math.min(allServers.length, MAX_FNEW_ATTEMPTS) + + for (let attempt = 0; attempt < maxAttempts; attempt++) { + const server = pickServer(allServers, state, concurrency) + try { + const chunk = await createAndPrepareChunk(agent, server, spec, chunkNo) + // Success — add to working set (if not already there) + if (!state.workingServers.some(s => formatXFTPServer(s) === formatXFTPServer(server))) { + state.workingServers.push(server) + } + return {server, chunk} + } catch (e) { + // Remove from working if it was there + state.workingServers = state.workingServers.filter( + s => formatXFTPServer(s) !== formatXFTPServer(server) + ) + if (attempt === maxAttempts - 1) throw e + } + } + throw new Error("unreachable") +} +``` + +**`pickServer`** — two-list selection: + +```typescript +function pickServer( + allServers: XFTPServer[], + state: UploadState, + concurrency: number +): XFTPServer { + // Once enough working servers found, only use those + if (state.workingServers.length >= concurrency) { + return randomPick(state.workingServers) + } + // Still exploring — pick from untried + if (state.untriedServers.length > 0) { + const idx = Math.floor(Math.random() * state.untriedServers.length) + return state.untriedServers.splice(idx, 1)[0] // remove from untried + } + // All tried — reset untried to non-working servers and retry + state.untriedServers = allServers.filter( + s => !state.workingServers.some(w => formatXFTPServer(w) === formatXFTPServer(s)) + ) + if (state.untriedServers.length > 0) { + const idx = 
Math.floor(Math.random() * state.untriedServers.length) + return state.untriedServers.splice(idx, 1)[0] + } + // Every server is working — pick any working + return randomPick(state.workingServers) +} +``` + +**Algorithm:** Two lists — `untriedServers` (initially all) and `workingServers` (initially empty). When `workingServers.length < concurrency`, pick from `untriedServers` (removing on pick). On FNEW success, add to `workingServers`. On FNEW failure, server is already removed from `untriedServers`; remove from `workingServers` if present. When `untriedServers` is empty, reset it to all non-working servers. Once `workingServers.length >= concurrency`, pick randomly only from `workingServers`. + +**Termination condition:** Each chunk tries at most `min(serverCount, 5)` different servers. If all attempts fail, the chunk fails and the upload fails with the last error. Rationale: if 5 out of 12 servers are down, something systemic is wrong and continuing is unlikely to help. Timeouts count as failures — the timed-out server is removed from working and a different server is picked next. + +**Key differences from Haskell:** +- No operator concept — just host diversity via random selection +- No database — state tracked in-memory during upload +- FNEW runs concurrently (Haskell is serial) — improves startup time +- FNEW is cheap and retried with server rotation; FPUT retries on same server + +**Download changes (also Phase 2):** Default concurrency should be 4 (matching Haskell). Download already groups by server in 3.5. If `replicas[0]` download fails, try `replicas[1]`, `replicas[2]`, etc. (fallback across replicas). + +## 4. Implementation Plan + +### Phase 1: Error handling and connection resilience + +Steps are ordered by dependency and should be implemented one by one. 
+ +#### Step 1: Fix cache key (3.6) +- Change cache key to `formatXFTPServer(server)` in `getXFTPServerClient` and `closeXFTPServerClient` +- Add import for `formatXFTPServer` +- Run existing tests to verify no regression + +#### Step 2: Typed error detection for padded server errors (3.2 client-side) +- Add `XFTPRetriableError` class +- In `sendXFTPCommand`, detect padded error strings before `decodeTransmission` +- Classify `FRErr` responses as retriable or permanent with human-readable messages +- Run existing tests + +#### Step 3: Fetch timeout (3.3) +- Add `TransportConfig` with `timeoutMs` +- Thread config through `createTransport` → `connectXFTP` → command wrappers +- Add `AbortController` to browser `fetch()` and `setTimeout` to Node.js HTTP/2 +- Add vitest test: timeout triggers after configured duration +- Run existing tests + +#### Step 4: Connection state with Promise-based lock and per-server queues (3.5) +- Introduce `ServerConnection` record: `{client: Promise, queue: Promise}` +- Replace `XFTPClientAgent.clients: Map` with `connections: Map` +- Implement `reconnectClient` — replaces `conn.client` with new promise, preserves queue +- Implement `enqueueCommand` — chains operation onto server's queue +- Implement `removeStaleConnection` — removes entry only if current promise is the failed one +- Auto-cleanup: `p.catch(() => delete)` removes failed connections so next caller starts fresh +- Adapt `closeXFTPServerClient` and `closeXFTPAgent` +- Add vitest tests: + - Concurrent calls to same server produce single connection + - Failed promise is cleaned up, next caller gets fresh connection + +#### Step 5: Automatic retry in sendXFTPCommand (3.2) +- Add retry loop with reconnect +- Change `sendXFTPCommand` signature: takes `agent + server` instead of `client`; export it (needed by tests and by agent.ts callers) +- Rename current `sendXFTPCommand` → `sendXFTPCommandOnce` (private); add padded error detection + FRErr classification (throw 
`XFTPRetriableError` for SESSION/HANDSHAKE, `XFTPPermanentError` for AUTH/NO_FILE/etc.) +- All command wrappers (`createXFTPChunk`, `uploadXFTPChunk`, etc.) pass agent + server +- Update agent.ts call sites: remove `getXFTPServerClient` calls before command wrappers (in `uploadFile`, `uploadRedirectDescription`, `downloadFileRaw`, `resolveRedirect`, `deleteFile`) +- Max 3 retries for retriable errors, immediate throw for permanent +- On retriable error: call `reconnectClient` and retry. On retriable error exhausted: call `removeStaleConnection` to clean up. On permanent error: throw immediately without touching connection +- Add vitest tests: + - Server started with delay → first attempt fails, retry succeeds + - 3 retries exhausted → error propagates with human-readable message + - Non-retriable error (AUTH) → no retry, immediate failure + +#### Step 6: Server-side stale session handling (3.1) +- Add one guard to `Nothing` branch: `sniUsed && not webHello -> throwE SESSION` +- Remove debug `hPutStrLn stderr` lines (all 6 occurrences in dispatch) +- All other branches unchanged +- Run Haskell tests + Playwright tests + +#### Step 7: Download with per-server grouping +- Modify `downloadFileRaw` to group chunks by server, sequential within each server (`for` loop), parallel across servers (`Promise.all`) +- Add vitest test: concurrent downloads from different servers run in parallel + +#### Step 8: UI error improvements (3.4) +- Temporary errors: auto-retry loop (3 attempts), then show human-readable diagnosis + manual retry button +- Permanent errors: show human-readable error, NO retry button +- Manual retry resumes from last successful chunk (not full restart) + +#### Step 9: Remove debug logging +- Remove all `console.log('[DEBUG ...]')` and `hPutStrLn stderr "DEBUG ..."` lines +- Keep `console.error('[XFTP] ...')` error logging + +### Phase 2: Multi-server upload + +Implement after Phase 1 is complete and tested. 
+ +#### Step 10: Multi-server upload with failover (3.7) +- Extend `SentChunk` with `chunkOffset: number` (from ChunkSpec) and `server: XFTPServer` (assigned during allocate) — Stage 2 reads data by offset and groups chunks by server +- Change `uploadFile` signature: takes `allServers: XFTPServer[]` instead of single `server` +- Implement `UploadState` with `untriedServers` and `workingServers` +- Implement `createChunkWithFailover` and `pickServer`: two-list selection (untried → working once enough found), max `min(serverCount, 5)` attempts per chunk +- Allocate stage: concurrent FNEW within concurrency limit (default 4) +- Upload stage: parallel across servers, sequential per server (reuse the per-server queues from Step 4, with the same grouping as the Step 7 download path) +- Update `web/upload.ts`: pass `getServers()` instead of `pickRandomServer(getServers())` +- Update description building: each chunk references its actual server +- Add vitest tests: + - File split across N servers (verify different servers in description) + - One server down → chunks redistributed to others + - All servers down → error after exhausting `min(serverCount, 5)` attempts per chunk + +#### Step 11: Download concurrency and replica fallback +- Change default download concurrency from 1 to 4 +- If `replicas[0]` download fails, try `replicas[1]`, `replicas[2]`, etc. +- Uses the per-server queues from Step 4 and the per-server grouping from Step 7 + +## 5. Testing Plan + +### Principle + +Prefer low-level vitest tests over Playwright E2E. Each new function gets one focused test. Pure functions tested without mocks; connection management tested with mock `connectXFTP`; server behavior tested with real server. Total: 13 tests across 4 files. + +Tests A-C run in browser context (`@vitest/browser` with Chromium headless), configured in `vitest.config.ts`. Test D (integration) requires a separate Node.js vitest config since it uses `node:http2`. Existing `globalSetup.ts` provides a real XFTP server for integration tests. 
+ +### Test file A: `test/errors.test.ts` — pure, no server + +Tests error classification and padded error detection (Steps 2, 5). + +**T1. `isRetriable` classifies errors correctly** +```typescript +// Retriable: +expect(isRetriable(new XFTPRetriableError("SESSION"))).toBe(true) +expect(isRetriable(new XFTPRetriableError("HANDSHAKE"))).toBe(true) +expect(isRetriable(new TypeError("fetch failed"))).toBe(true) // network error +expect(isRetriable(Object.assign(new Error(), {name: "AbortError"}))).toBe(true) // timeout +// Not retriable: +expect(isRetriable(new XFTPPermanentError("AUTH", "..."))).toBe(false) +expect(isRetriable(new XFTPPermanentError("NO_FILE", "..."))).toBe(false) +expect(isRetriable(new XFTPPermanentError("INTERNAL", "..."))).toBe(false) +``` + +**T2. `categorizeError` produces human-readable messages** +```typescript +// categorizeError receives thrown errors (from sendXFTPCommandOnce or transport) +const e = categorizeError(new XFTPPermanentError("AUTH", "File is invalid, expired, or has been removed")) +expect(e.message).toContain("expired") +// Verify every permanent error type maps to a non-empty human-readable message +for (const errType of ["AUTH", "NO_FILE", "SIZE", "QUOTA", "BLOCKED", "DIGEST", "INTERNAL"]) { + expect(humanReadableMessage({type: errType}).length).toBeGreaterThan(0) +} +// Retriable errors also get human-readable messages after exhaustion +const re = categorizeError(new XFTPRetriableError("SESSION")) +expect(re.message).toContain("expired") // "Session expired, reconnecting..." +``` + +**T3. 
Padded error detection extracts error string from padded block** +```typescript +import {blockPad, blockUnpad} from '../src/protocol/transmission.js' +// Simulate server sending padded "SESSION" +const padded = blockPad(new TextEncoder().encode("SESSION")) +const raw = blockUnpad(padded) +expect(raw.length).toBeLessThan(20) +expect(new TextDecoder().decode(raw)).toBe("SESSION") +// Normal transmission block (batch count + large-encoded data) is NOT a short string +const sessionId = new Uint8Array(32) // dummy +const normalBlock = encodeTransmission(sessionId, new Uint8Array(0), new Uint8Array(0), encodePING()) +const normalRaw = blockUnpad(normalBlock) +expect(normalRaw.length).toBeGreaterThan(20) // not mistaken for padded error +``` + +### Test file B: `test/connection.test.ts` — mock connectXFTP, no server + +Tests connection management functions (Steps 4, 5). Uses `vi.mock` to replace `connectXFTP` with a controllable promise factory. + +**T4. `getXFTPServerClient` coalesces concurrent calls** +```typescript +// Mock connectXFTP to return a deferred promise +const {promise, resolve} = promiseWithResolvers() +vi.mocked(connectXFTP).mockReturnValueOnce(promise) +const agent = newXFTPAgent() +const p1 = getXFTPServerClient(agent, server) +const p2 = getXFTPServerClient(agent, server) +expect(p1).toBe(p2) // same promise, single connection +resolve(mockClient) +expect(await p1).toBe(mockClient) +``` + +**T5. 
`getXFTPServerClient` auto-cleans failed connections** +```typescript +vi.mocked(connectXFTP).mockReturnValueOnce(Promise.reject(new Error("down"))) +const agent = newXFTPAgent() +const p1 = getXFTPServerClient(agent, server) +await expect(p1).rejects.toThrow("down") +// After microtask, entry is removed +await new Promise(r => setTimeout(r, 0)) +expect(agent.connections.has(formatXFTPServer(server))).toBe(false) +// Next call creates fresh connection +vi.mocked(connectXFTP).mockReturnValueOnce(Promise.resolve(mockClient)) +const p2 = getXFTPServerClient(agent, server) +expect(p2).not.toBe(p1) +``` + +**T6. `removeStaleConnection` respects promise identity** +```typescript +const agent = newXFTPAgent() +const p1 = Promise.resolve(mockClient) +agent.connections.set(key, {client: p1, queue: Promise.resolve()}) +// Replace with reconnect +const p2 = Promise.resolve(mockClient2) +agent.connections.set(key, {client: p2, queue: Promise.resolve()}) +// removeStaleConnection with old promise does NOT remove new entry +removeStaleConnection(agent, server, p1) +expect(agent.connections.has(key)).toBe(true) +expect(agent.connections.get(key)!.client).toBe(p2) +// removeStaleConnection with current promise removes it +removeStaleConnection(agent, server, p2) +expect(agent.connections.has(key)).toBe(false) +``` + +**T7. `reconnectClient` replaces promise but preserves queue** +```typescript +const agent = newXFTPAgent() +const origQueue = Promise.resolve() +agent.connections.set(key, {client: Promise.resolve(mockClient), queue: origQueue}) +vi.mocked(connectXFTP).mockReturnValueOnce(Promise.resolve(mockClient2)) +reconnectClient(agent, server) +const conn = agent.connections.get(key)! +expect(await conn.client).toBe(mockClient2) // new client +expect(conn.queue).toBe(origQueue) // queue preserved +``` + +**T8. Retry loop: retriable error triggers reconnect, permanent error does not** + +Mock approach: `vi.mock('../src/client.js')` to mock `connectXFTP` (exported). 
`reconnectClient` is not exported — its behavior is controlled indirectly via `connectXFTP` mock (it calls `connectXFTP` internally). Verify retry count via `connectXFTP` call count. Note: vitest module mocking may need adjustment depending on ESM transform behavior — if intra-module calls bypass the mock, extract `connectXFTP` to a separate module or use dependency injection for testing. + +```typescript +// Script: first connectXFTP returns client whose post throws retriable, +// second connectXFTP (from reconnect) returns client whose post succeeds +vi.mocked(connectXFTP) + .mockResolvedValueOnce({ + ...mockClient, + transport: { post: async () => { throw new XFTPRetriableError("SESSION") }, close: () => {} } + }) + .mockResolvedValueOnce({ + ...mockClient, + transport: { post: async () => okResponseBlock, close: () => {} } + }) + +const agent = newXFTPAgent() +const result = await sendXFTPCommand(agent, server, dummyKey, dummyId, encodePING()) +expect(result.response.type).toBe("FROk") +expect(vi.mocked(connectXFTP)).toHaveBeenCalledTimes(2) // initial + 1 reconnect + +// Reset — all 3 retries exhausted: connectXFTP called 3 times (initial + 2 reconnects) +vi.mocked(connectXFTP).mockClear() +vi.mocked(connectXFTP).mockResolvedValue({ + ...mockClient, + transport: { post: async () => { throw new XFTPRetriableError("SESSION") }, close: () => {} } +}) +const agent2 = newXFTPAgent() +await expect(sendXFTPCommand(agent2, server, dummyKey, dummyId, encodePING())) + .rejects.toThrow(/reconnecting|expired/) +expect(vi.mocked(connectXFTP)).toHaveBeenCalledTimes(3) // initial + 2 reconnects + +// Reset — permanent error: connectXFTP called once (initial only, no reconnect) +vi.mocked(connectXFTP).mockClear() +vi.mocked(connectXFTP).mockResolvedValue({ + ...mockClient, + transport: { post: async () => authErrorBlock, close: () => {} } +}) +const agent3 = newXFTPAgent() +await expect(sendXFTPCommand(agent3, server, dummyKey, dummyId, encodePING())) + 
.rejects.toThrow(/expired/) +expect(vi.mocked(connectXFTP)).toHaveBeenCalledTimes(1) // initial only, no reconnect +``` + +### Test file C: `test/server-selection.test.ts` — pure, no server + +Tests `pickServer` state machine (Step 10). Determinism: seed `Math.random` or test invariants not specific picks. + +**T9. `pickServer` picks from untried when working < concurrency** +```typescript +const servers = [s1, s2, s3, s4, s5] +const state: UploadState = {untriedServers: [...servers], workingServers: []} +const picked = pickServer(servers, state, 4) +// picked is from untried, and was removed from untried +expect(state.untriedServers.length).toBe(4) +expect(state.untriedServers).not.toContainEqual(picked) +``` + +**T10. `pickServer` picks only from working when working >= concurrency** +```typescript +const state: UploadState = { + untriedServers: [s5], // still has untried + workingServers: [s1, s2, s3, s4] +} +const picked = pickServer(servers, state, 4) +// Must pick from working, NOT from untried +expect([s1, s2, s3, s4]).toContainEqual(picked) +expect(state.untriedServers.length).toBe(1) // untried unchanged +``` + +**T11. `pickServer` resets untried when exhausted** +```typescript +const state: UploadState = { + untriedServers: [], // all tried + workingServers: [s1, s2] // only 2 working, concurrency=4 +} +const picked = pickServer(servers, state, 4) +// Should have reset untried to non-working servers and picked from them +expect([s3, s4, s5]).toContainEqual(picked) +expect(state.untriedServers.length).toBe(2) // 3 non-working minus 1 picked +``` + +### Test file D: `test/integration.test.ts` — real server, Node.js mode + +Requires separate vitest config with `browser: {enabled: false}` since these tests use `node:http2` directly. Alternatively, add `test/vitest.node.config.ts` that includes only `test/integration.test.ts` and runs in Node.js. + +**T12. 
Stale session returns padded SESSION error (requires Step 6)** +```typescript +import http2 from 'node:http2' +// Connect and handshake normally via the client +const client = await connectXFTP(server) +// Create a raw HTTP/2 session (new TLS SessionId, no handshake state on server) +const session = http2.connect(client.baseUrl, {rejectUnauthorized: false}) +// Build a dummy command block using the old client's sessionId. +// Content doesn't matter — server detects stale session before parsing command. +const dummyKey = new Uint8Array(64) // Ed25519 private key (dummy) +const dummyId = new Uint8Array(24) // entity ID (dummy) +const cmdBlock = encodeAuthTransmission(client.sessionId, new Uint8Array(0), dummyId, encodePING(), dummyKey) +const resp = await new Promise<Uint8Array>((resolve, reject) => { + const req = session.request({":method": "POST", ":path": "/"}) + const chunks: Buffer[] = [] + req.on("data", (c: Buffer) => chunks.push(c)) + req.on("end", () => resolve(new Uint8Array(Buffer.concat(chunks)))) + req.on("error", reject) + req.end(Buffer.from(cmdBlock)) +}) +// Server should return padded "SESSION" (not crash, not "HANDSHAKE") +const raw = blockUnpad(resp.subarray(0, XFTP_BLOCK_SIZE)) +expect(new TextDecoder().decode(raw)).toBe("SESSION") +session.close() +closeXFTP(client) +``` + +**T13. 
Fetch timeout fires within configured duration** +```typescript +// connectXFTP with 1ms timeout — handshake requires multiple round trips, +// so even on localhost it will exceed 1ms and trigger abort +await expect( + connectXFTP(server, {timeoutMs: 1}) +).rejects.toThrow(/abort|timeout/i) +``` + +### What existing tests already cover (no new tests needed) + +| Behavior | Covered by | +|----------|-----------| +| Cache key fix (Step 1) | Existing round-trip test — uses `formatXFTPServer` after refactor | +| Basic upload/download | 24 Playwright tests + 1 vitest browser test | +| File size limits, unicode filenames | Playwright edge case tests | +| Server startup/teardown | `globalSetup.ts` / `globalTeardown.ts` | +| Handshake + identity verification | `connectXFTP` in existing round-trip test | + +### Test ordering + +Tests must be added alongside their implementation step: +- **Step 2**: Add T1, T2, T3 (test/errors.test.ts) +- **Step 3**: Add T13 (test/integration.test.ts) — requires Node.js vitest config +- **Step 4**: Add T4, T5, T6, T7 (test/connection.test.ts) +- **Step 5**: Add T8 (test/connection.test.ts) +- **Step 6**: Add T12 (test/integration.test.ts) — requires server change + Node.js vitest config +- **Step 10**: Add T9, T10, T11 (test/server-selection.test.ts) + +## 6. 
Context for Implementation Sessions + +### Files to re-read on session start + +**TypeScript (xftp-web/src/):** +- `client.ts` — `XFTPClient`, `XFTPClientAgent`, `getXFTPServerClient`, `closeXFTPServerClient`, `connectXFTP`, `sendXFTPCommand`, `createBrowserTransport`, `createNodeTransport`, all command wrappers +- `agent.ts` — `uploadFile`, `downloadFileRaw`, `downloadFile`, `resolveRedirect`, `encryptFileForUpload` +- `protocol/transmission.ts` — `encodeAuthTransmission`, `decodeTransmission`, `blockPad`, `blockUnpad` +- `protocol/commands.ts` — `XFTPErrorType`, `FileResponse`, `decodeResponse`, `decodeXFTPError` +- `protocol/handshake.ts` — `decodeServerHandshake` (padded error detection heuristic) +- `protocol/address.ts` — `XFTPServer`, `parseXFTPServer`, `formatXFTPServer` +- `web/upload.ts` — UI error handling, retry button +- `web/download.ts` — UI error handling, retry button +- `web/servers.ts` — `getServers`, `pickRandomServer` + +**TypeScript (xftp-web/test/):** +- `browser.test.ts` — vitest Node.js test template (uses real Haskell server) +- `globalSetup.ts` — server startup, config generation, port file +- `page.spec.ts` — Playwright page tests + +**Haskell (reference for multi-server):** +- `src/Simplex/FileTransfer/Agent.hs` — `createChunk` (lines 457-486, allocate stage), `runXFTPSndPrepareWorker` (lines 391-430, serial allocate in Haskell), `runXFTPSndWorker` (lines 494-548, per-server upload worker) +- `src/Simplex/Messaging/Agent/Client.hs` — `getNextServer_` (lines 2335-2350), `withNextSrv` (lines 2366-2385), `pickServer` (lines 2309-2314) + +**Haskell (server):** +- `src/Simplex/FileTransfer/Server.hs` — `xftpServerHandshakeV1` (lines 165-244), `processRequest` (lines 403-435) +- `src/Simplex/Messaging/Protocol.hs` — `tDecodeServer` (lines 2239-2265) — sessionId verification at line 2242 + +### Key design constraints + +1. 
`tDecodeServer` (Protocol.hs:2242) verifies `sessId == sessionId` — commands signed with old sessionId WILL fail on new connection +2. Server generates per-session DH key in `processHello` (Server.hs:207) — cannot be shared across sessions +3. `fetch()` provides zero control over HTTP/2 connection reuse — browser decides +4. `xftp-web-hello` header is only checked in dispatch (Server.hs:192), NOT inside `processHello` +5. Handshake-phase errors are raw padded strings; command-phase errors are proper ERR transmissions +6. Ed25519 signature verification (`TASignature` path, Protocol.hs:1314) does NOT use `thAuth` — but SMP will +7. Reconnect must re-handshake to get new sessionId AND new server DH key +8. The new `throwE SESSION` guard (Step 6) sends a raw padded "SESSION" string — no sessionId framing. Client detects this via padded error heuristic (section 3.2), not via sessionId mismatch +9. FNEW is cheap (creates chunk record on server) — retry with different server on failure +10. FPUT retries on same server (chunk replica already exists there) — close connection + backoff + +## 7. 
Plan Maintenance + +This plan must be updated as implementation proceeds: +- Mark completed steps with date +- Record any deviations from the plan with rationale +- Add new issues discovered during implementation +- Update file references if code moves diff --git a/src/Simplex/FileTransfer/Server.hs b/src/Simplex/FileTransfer/Server.hs index c63bfb3da..b007cbe29 100644 --- a/src/Simplex/FileTransfer/Server.hs +++ b/src/Simplex/FileTransfer/Server.hs @@ -75,7 +75,7 @@ import Simplex.Messaging.Version import System.Environment (lookupEnv) import System.Exit (exitFailure) import System.FilePath (()) -import System.IO (hPrint, hPutStrLn, stderr, universalNewlineMode) +import System.IO (hPrint, hPutStrLn, universalNewlineMode) #ifdef slow_servers import System.Random (getStdRandom, randomR) #endif @@ -165,28 +165,17 @@ xftpServer cfg@XFTPServerConfig {xftpPort, transportConfig, inactiveClientExpira xftpServerHandshakeV1 :: X.CertificateChain -> C.APrivateSignKey -> TMap SessionId Handshake -> XFTPTransportRequest -> M (Maybe (THandleParams XFTPVersion 'TServer)) xftpServerHandshakeV1 chain serverSignKey sessions XFTPTransportRequest {thParams = thParams0@THandleParams {sessionId}, request, reqBody = HTTP2Body {bodyHead}, sendResponse, sniUsed, addCORS} = do s <- atomically $ TM.lookup sessionId sessions - let sessHex = B64.encode sessionId r <- runExceptT $ case s of - Nothing -> do - liftIO $ hPutStrLn stderr $ "DEBUG dispatch: Nothing sessId=" <> show sessHex <> " webHello=" <> show webHello - processHello Nothing + Nothing + | sniUsed && not webHello -> throwE SESSION + | otherwise -> processHello Nothing Just (HandshakeSent pk) - | webHello -> do - liftIO $ hPutStrLn stderr $ "DEBUG dispatch: HandshakeSent+webHello sessId=" <> show sessHex - processHello (Just pk) - | otherwise -> do - liftIO $ hPutStrLn stderr $ "DEBUG dispatch: HandshakeSent+handshake sessId=" <> show sessHex - processClientHandshake pk + | webHello -> processHello (Just pk) + | otherwise -> 
processClientHandshake pk Just (HandshakeAccepted thParams) - | webHello -> do - liftIO $ hPutStrLn stderr $ "DEBUG dispatch: Accepted+webHello sessId=" <> show sessHex - processHello (serverPrivKey <$> thAuth thParams) - | webHandshake, Just auth <- thAuth thParams -> do - liftIO $ hPutStrLn stderr $ "DEBUG dispatch: Accepted+handshake sessId=" <> show sessHex - processClientHandshake (serverPrivKey auth) - | otherwise -> do - liftIO $ hPutStrLn stderr $ "DEBUG dispatch: Accepted+command sessId=" <> show sessHex - pure $ Just thParams + | webHello -> processHello (serverPrivKey <$> thAuth thParams) + | webHandshake, Just auth <- thAuth thParams -> processClientHandshake (serverPrivKey auth) + | otherwise -> pure $ Just thParams either sendError pure r where webHello = sniUsed && any (\(t, _) -> tokenKey t == "xftp-web-hello") (fst $ H.requestHeaders request) diff --git a/tests/XFTPServerTests.hs b/tests/XFTPServerTests.hs index 39dc79a89..0af3d7eca 100644 --- a/tests/XFTPServerTests.hs +++ b/tests/XFTPServerTests.hs @@ -85,6 +85,7 @@ xftpServerTests = it "should upload and receive file chunk through SNI-enabled server" testFileChunkDeliverySNI it "should complete web handshake with challenge-response" testWebHandshake it "should re-handshake on same connection with xftp-web-hello header" testWebReHandshake + it "should return padded SESSION error for stale web session" testStaleWebSession chSize :: Integral a => a chSize = kb 128 @@ -517,7 +518,7 @@ testWebHandshake = g <- C.newRandom challenge <- atomically $ C.randomBytes 32 g helloBody <- either (error . show) pure $ C.pad (smpEncode (XFTPClientHello {webChallenge = Just challenge})) xftpBlockSize - let helloReq = H2.requestBuilder "POST" "/" [] $ byteString helloBody + let helloReq = H2.requestBuilder "POST" "/" [("xftp-web-hello", "1")] $ byteString helloBody resp1 <- either (error . 
show) pure =<< HC.sendRequest h2 helloReq (Just 5000000) let serverHsBody = bodyHead (HC.respBody resp1) -- Decode server handshake @@ -559,7 +560,7 @@ testWebReHandshake = -- First handshake challenge1 <- atomically $ C.randomBytes 32 g helloBody1 <- either (error . show) pure $ C.pad (smpEncode (XFTPClientHello {webChallenge = Just challenge1})) xftpBlockSize - let helloReq1 = H2.requestBuilder "POST" "/" [] $ byteString helloBody1 + let helloReq1 = H2.requestBuilder "POST" "/" [("xftp-web-hello", "1")] $ byteString helloBody1 resp1 <- either (error . show) pure =<< HC.sendRequest h2 helloReq1 (Just 5000000) serverHs1 <- either (error . show) pure $ C.unPad (bodyHead (HC.respBody resp1)) XFTPServerHandshake {sessionId = sid1} <- either error pure $ smpDecode serverHs1 @@ -577,3 +578,23 @@ testWebReHandshake = -- Complete re-handshake resp2b <- either (error . show) pure =<< HC.sendRequest h2 (H2.requestBuilder "POST" "/" [] $ byteString clientHsPadded) (Just 5000000) B.length (bodyHead (HC.respBody resp2b)) `shouldBe` 0 + +testStaleWebSession :: Expectation +testStaleWebSession = + withXFTPServerSNI $ \_ -> do + Fingerprint fpWeb <- loadFileFingerprint "tests/fixtures/web_ca.crt" + let webCaHash = C.KeyHash fpWeb + cfg = defaultTransportClientConfig {clientALPN = Just ["h2"], useSNI = True} + runTLSTransportClient defaultSupportedParamsHTTPS Nothing cfg Nothing "localhost" xftpTestPort (Just webCaHash) $ \(tls :: TLS 'TClient) -> do + let h2cfg = HC.defaultHTTP2ClientConfig {HC.bodyHeadSize = 65536} + h2 <- either (error . show) pure =<< HC.attachHTTP2Client h2cfg (THDomainName "localhost") xftpTestPort mempty 65536 tls + -- Send a command on web connection without doing hello (no xftp-web-hello header) + dummyBody <- either (error . show) pure $ C.pad "PING" xftpBlockSize + let req = H2.requestBuilder "POST" "/" [] $ byteString dummyBody + resp <- either (error . 
show) pure =<< HC.sendRequest h2 req (Just 5000000) + let respBody = bodyHead (HC.respBody resp) + -- Server should return padded SESSION error + B.length respBody `shouldBe` xftpBlockSize + decoded <- either (error . show) pure $ C.unPad respBody + decoded `shouldBe` smpEncode SESSION + diff --git a/xftp-web/src/agent.ts b/xftp-web/src/agent.ts index 41d99abc7..0ebf825db 100644 --- a/xftp-web/src/agent.ts +++ b/xftp-web/src/agent.ts @@ -15,10 +15,10 @@ import { } from "./protocol/description.js" import type {FileInfo} from "./protocol/commands.js" import { - getXFTPServerClient, createXFTPChunk, uploadXFTPChunk, downloadXFTPChunk, downloadXFTPChunkRaw, + createXFTPChunk, uploadXFTPChunk, downloadXFTPChunk, downloadXFTPChunkRaw, deleteXFTPChunk, type XFTPClientAgent } from "./client.js" -export {newXFTPAgent, closeXFTPAgent, type XFTPClientAgent} from "./client.js" +export {newXFTPAgent, closeXFTPAgent, type XFTPClientAgent, type TransportConfig} from "./client.js" import {processDownloadedFile, decryptReceivedChunk} from "./download.js" import type {XFTPServer} from "./protocol/address.js" import {formatXFTPServer, parseXFTPServer} from "./protocol/address.js" @@ -116,7 +116,6 @@ export async function uploadFile( : () => { throw new Error("uploadFile: readChunk required when encData is absent") }) const total = encrypted.chunkSizes.reduce((a, b) => a + b, 0) const specs = prepareChunkSpecs(encrypted.chunkSizes) - const client = await getXFTPServerClient(agent, server) const sentChunks: SentChunk[] = [] let uploaded = 0 for (let i = 0; i < specs.length; i++) { @@ -133,9 +132,9 @@ export async function uploadFile( } const rcvKeysForChunk = [encodePubKeyEd25519(rcvKp.publicKey)] const {senderId, recipientIds} = await createXFTPChunk( - client, sndKp.privateKey, fileInfo, rcvKeysForChunk + agent, server, sndKp.privateKey, fileInfo, rcvKeysForChunk ) - await uploadXFTPChunk(client, sndKp.privateKey, senderId, chunkData) + await uploadXFTPChunk(agent, server, 
sndKp.privateKey, senderId, chunkData) sentChunks.push({ chunkNo, senderId, senderKey: sndKp.privateKey, recipientId: recipientIds[0], recipientKey: rcvKp.privateKey, @@ -188,7 +187,6 @@ async function uploadRedirectDescription( server: XFTPServer, innerFd: FileDescription ): Promise { - const client = await getXFTPServerClient(agent, server) const yaml = encodeFileDescription(innerFd) const yamlBytes = new TextEncoder().encode(yaml) const enc = encryptFileForUpload(yamlBytes, "") @@ -208,9 +206,9 @@ async function uploadRedirectDescription( } const rcvKeysForChunk = [encodePubKeyEd25519(rcvKp.publicKey)] const {senderId, recipientIds} = await createXFTPChunk( - client, sndKp.privateKey, fileInfo, rcvKeysForChunk + agent, server, sndKp.privateKey, fileInfo, rcvKeysForChunk ) - await uploadXFTPChunk(client, sndKp.privateKey, senderId, chunkData) + await uploadXFTPChunk(agent, server, sndKp.privateKey, senderId, chunkData) sentChunks.push({ chunkNo, senderId, senderKey: sndKp.privateKey, recipientId: recipientIds[0], recipientKey: rcvKp.privateKey, @@ -267,25 +265,22 @@ export async function downloadFileRaw( fd = await resolveRedirect(agent, fd) } const resolvedFd = fd - // Pre-connect to avoid race condition under concurrency - const servers = new Set(resolvedFd.chunks.map(c => c.replicas[0]?.server).filter(Boolean) as string[]) - for (const s of servers) { - await getXFTPServerClient(agent, parseXFTPServer(s)) - } - // Sliding-window parallel download + // Group chunks by server, sequential within each server, parallel across servers let downloaded = 0 - const queue = resolvedFd.chunks.slice() - let idx = 0 - async function worker() { - while (idx < queue.length) { - const i = idx++ - const chunk = queue[i] + const byServer = new Map() + for (const chunk of resolvedFd.chunks) { + const srv = chunk.replicas[0]?.server ?? 
"" + if (!byServer.has(srv)) byServer.set(srv, []) + byServer.get(srv)!.push(chunk) + } + await Promise.all([...byServer.entries()].map(async ([srv, chunks]) => { + const server = parseXFTPServer(srv) + for (const chunk of chunks) { const replica = chunk.replicas[0] if (!replica) throw new Error("downloadFileRaw: chunk has no replicas") - const client = await getXFTPServerClient(agent, parseXFTPServer(replica.server)) const seed = decodePrivKeyEd25519(replica.replicaKey) const kp = ed25519KeyPairFromSeed(seed) - const raw = await downloadXFTPChunkRaw(client, kp.privateKey, replica.replicaId) + const raw = await downloadXFTPChunkRaw(agent, server, kp.privateKey, replica.replicaId) await onRawChunk({ chunkNo: chunk.chunkNo, dhSecret: raw.dhSecret, @@ -296,9 +291,7 @@ export async function downloadFileRaw( downloaded += chunk.chunkSize onProgress?.(downloaded, resolvedFd.size) } - } - const workers = Array.from({length: Math.min(concurrency, queue.length)}, () => worker()) - await Promise.all(workers) + })) return resolvedFd } @@ -328,10 +321,10 @@ async function resolveRedirect( for (const chunk of fd.chunks) { const replica = chunk.replicas[0] if (!replica) throw new Error("resolveRedirect: chunk has no replicas") - const client = await getXFTPServerClient(agent, parseXFTPServer(replica.server)) + const server = parseXFTPServer(replica.server) const seed = decodePrivKeyEd25519(replica.replicaKey) const kp = ed25519KeyPairFromSeed(seed) - const data = await downloadXFTPChunk(client, kp.privateKey, replica.replicaId, chunk.digest) + const data = await downloadXFTPChunk(agent, server, kp.privateKey, replica.replicaId, chunk.digest) plaintextChunks[chunk.chunkNo - 1] = data } const totalSize = plaintextChunks.reduce((s, c) => s + c.length, 0) @@ -355,10 +348,10 @@ export async function deleteFile(agent: XFTPClientAgent, sndDescription: FileDes for (const chunk of sndDescription.chunks) { const replica = chunk.replicas[0] if (!replica) throw new Error("deleteFile: chunk 
has no replicas") - const client = await getXFTPServerClient(agent, parseXFTPServer(replica.server)) + const server = parseXFTPServer(replica.server) const seed = decodePrivKeyEd25519(replica.replicaKey) const kp = ed25519KeyPairFromSeed(seed) - await deleteXFTPChunk(client, kp.privateKey, replica.replicaId) + await deleteXFTPChunk(agent, server, kp.privateKey, replica.replicaId) } } diff --git a/xftp-web/src/client.ts b/xftp-web/src/client.ts index ecf0d2056..4025060b4 100644 --- a/xftp-web/src/client.ts +++ b/xftp-web/src/client.ts @@ -17,11 +17,63 @@ import {verifyIdentityProof} from "./crypto/identity.js" import {generateX25519KeyPair, encodePubKeyX25519, dh} from "./crypto/keys.js" import { encodeFNEW, encodeFADD, encodeFPUT, encodeFGET, encodeFDEL, encodePING, - decodeResponse, type FileResponse, type FileInfo + decodeResponse, type FileResponse, type FileInfo, type XFTPErrorType } from "./protocol/commands.js" import {decryptReceivedChunk} from "./download.js" import type {XFTPServer} from "./protocol/address.js" +import {formatXFTPServer} from "./protocol/address.js" import {concatBytes} from "./protocol/encoding.js" +import {blockUnpad} from "./protocol/transmission.js" + +// -- Error types + +export class XFTPRetriableError extends Error { + constructor(public readonly errorType: string) { + super(humanReadableMessage(errorType)) + this.name = "XFTPRetriableError" + } +} + +export class XFTPPermanentError extends Error { + constructor(public readonly errorType: string, message: string) { + super(message) + this.name = "XFTPPermanentError" + } +} + +export function isRetriable(e: unknown): boolean { + if (e instanceof XFTPRetriableError) return true + if (e instanceof XFTPPermanentError) return false + if (e instanceof TypeError) return true // fetch network error + if (e instanceof Error && e.name === "AbortError") return true // timeout + return false +} + +export function categorizeError(e: unknown): Error { + if (e instanceof XFTPRetriableError || e 
instanceof XFTPPermanentError) return e + if (e instanceof TypeError) return new XFTPRetriableError("NETWORK") + if (e instanceof Error && e.name === "AbortError") return new XFTPRetriableError("TIMEOUT") + return e instanceof Error ? e : new Error(String(e)) +} + +export function humanReadableMessage(errorType: string | XFTPErrorType): string { + const t = typeof errorType === "string" ? errorType : errorType.type + switch (t) { + case "SESSION": return "Session expired, reconnecting..." + case "HANDSHAKE": return "Connection interrupted, reconnecting..." + case "NETWORK": return "Network error, retrying..." + case "TIMEOUT": return "Server timeout, retrying..." + case "AUTH": return "File is invalid, expired, or has been removed" + case "NO_FILE": return "File not found — it may have expired" + case "SIZE": return "File size exceeds server limit" + case "QUOTA": return "Server storage quota exceeded" + case "BLOCKED": return "File has been blocked by server" + case "DIGEST": return "File integrity check failed" + case "INTERNAL": return "Server internal error" + case "CMD": return "Protocol error" + default: return "Server error: " + t + } +} // -- Types @@ -32,6 +84,12 @@ export interface XFTPClient { transport: Transport } +export interface TransportConfig { + timeoutMs: number // default 30000 (30s), lower for tests +} + +const DEFAULT_TRANSPORT_CONFIG: TransportConfig = {timeoutMs: 30000} + interface Transport { post(body: Uint8Array, headers?: Record): Promise close(): void @@ -45,21 +103,25 @@ const isNode = typeof globalThis.process !== "undefined" && globalThis.process.v // __XFTP_PROXY_PORT__ is injected by vite build (null in production) declare const __XFTP_PROXY_PORT__: string | null -async function createTransport(baseUrl: string): Promise { +async function createTransport(baseUrl: string, config: TransportConfig): Promise { if (isNode) { - return createNodeTransport(baseUrl) + return createNodeTransport(baseUrl, config) } else { - return 
createBrowserTransport(baseUrl) + return createBrowserTransport(baseUrl, config) } } -async function createNodeTransport(baseUrl: string): Promise { +async function createNodeTransport(baseUrl: string, config: TransportConfig): Promise { const http2 = await import("node:http2") const session = http2.connect(baseUrl, {rejectUnauthorized: false}) return { async post(body: Uint8Array, headers?: Record): Promise { return new Promise((resolve, reject) => { const req = session.request({":method": "POST", ":path": "/", ...headers}) + req.setTimeout(config.timeoutMs, () => { + req.close() + reject(Object.assign(new Error("Request timeout"), {name: "AbortError"})) + }) const chunks: Buffer[] = [] req.on("data", (chunk: Buffer) => chunks.push(chunk)) req.on("end", () => resolve(new Uint8Array(Buffer.concat(chunks)))) @@ -73,7 +135,7 @@ async function createNodeTransport(baseUrl: string): Promise { } } -function createBrowserTransport(baseUrl: string): Transport { +function createBrowserTransport(baseUrl: string, config: TransportConfig): Transport { // In dev mode, route through /xftp-proxy to avoid self-signed cert rejection // __XFTP_PROXY_PORT__ is 'proxy' in dev mode (uses relative path), null in production const effectiveUrl = typeof __XFTP_PROXY_PORT__ !== 'undefined' && __XFTP_PROXY_PORT__ @@ -81,60 +143,107 @@ function createBrowserTransport(baseUrl: string): Transport { : baseUrl return { async post(body: Uint8Array, headers?: Record): Promise { - const resp = await fetch(effectiveUrl, { - method: "POST", - headers, - body, - }) - if (!resp.ok) { - console.error('[XFTP] fetch %s failed: %d %s', effectiveUrl, resp.status, resp.statusText) - throw new Error(`Server request failed: ${resp.status} ${resp.statusText}`) + const controller = new AbortController() + const timer = setTimeout(() => controller.abort(), config.timeoutMs) + try { + const resp = await fetch(effectiveUrl, { + method: "POST", + headers, + body, + signal: controller.signal + }) + if (!resp.ok) { + 
console.error('[XFTP] fetch %s failed: %d %s', effectiveUrl, resp.status, resp.statusText) + throw new Error(`Server request failed: ${resp.status} ${resp.statusText}`) + } + return new Uint8Array(await resp.arrayBuffer()) + } finally { + clearTimeout(timer) } - return new Uint8Array(await resp.arrayBuffer()) }, close() {} } } -// -- Client agent (connection pool) +// -- Client agent (connection pool with Promise-based lock) + +interface ServerConnection { + client: Promise // resolves to connected client; replaced on reconnect + queue: Promise // tail of sequential command chain +} export interface XFTPClientAgent { - clients: Map + connections: Map + /** @internal Injectable for testing — defaults to connectXFTP */ + _connectFn: (server: XFTPServer) => Promise } export function newXFTPAgent(): XFTPClientAgent { - return {clients: new Map()} + return {connections: new Map(), _connectFn: connectXFTP} } -export async function getXFTPServerClient(agent: XFTPClientAgent, server: XFTPServer): Promise { - const key = "https://" + server.host + ":" + server.port - let c = agent.clients.get(key) - if (!c) { - c = await connectXFTP(server) - agent.clients.set(key, c) +export function getXFTPServerClient(agent: XFTPClientAgent, server: XFTPServer): Promise { + const key = formatXFTPServer(server) + let conn = agent.connections.get(key) + if (!conn) { + const p = agent._connectFn(server) + conn = {client: p, queue: Promise.resolve()} + agent.connections.set(key, conn) + p.catch(() => { + const cur = agent.connections.get(key) + if (cur && cur.client === p) agent.connections.delete(key) + }) + } + return conn.client +} + +export function reconnectClient(agent: XFTPClientAgent, server: XFTPServer): Promise { + const key = formatXFTPServer(server) + const old = agent.connections.get(key) + old?.client.then(c => c.transport.close(), () => {}) + const p = agent._connectFn(server) + const conn: ServerConnection = {client: p, queue: old?.queue ?? 
Promise.resolve()} + agent.connections.set(key, conn) + p.catch(() => { + const cur = agent.connections.get(key) + if (cur && cur.client === p) agent.connections.delete(key) + }) + return p +} + +export function removeStaleConnection( + agent: XFTPClientAgent, server: XFTPServer, failedP: Promise +): void { + const key = formatXFTPServer(server) + const conn = agent.connections.get(key) + if (conn && conn.client === failedP) { + agent.connections.delete(key) + failedP.then(c => c.transport.close(), () => {}) } - return c } export function closeXFTPServerClient(agent: XFTPClientAgent, server: XFTPServer): void { - const key = "https://" + server.host + ":" + server.port - const c = agent.clients.get(key) - if (c) { - agent.clients.delete(key) - c.transport.close() + const key = formatXFTPServer(server) + const conn = agent.connections.get(key) + if (conn) { + agent.connections.delete(key) + conn.client.then(c => c.transport.close(), () => {}) } } export function closeXFTPAgent(agent: XFTPClientAgent): void { - for (const c of agent.clients.values()) c.transport.close() - agent.clients.clear() + for (const conn of agent.connections.values()) { + conn.client.then(c => c.transport.close(), () => {}) + } + agent.connections.clear() } // -- Connect + handshake -export async function connectXFTP(server: XFTPServer): Promise { +export async function connectXFTP(server: XFTPServer, config?: Partial): Promise { + const cfg: TransportConfig = {...DEFAULT_TRANSPORT_CONFIG, ...config} const baseUrl = "https://" + server.host + ":" + server.port - const transport = await createTransport(baseUrl) + const transport = await createTransport(baseUrl, cfg) try { // Step 1: send client hello with web challenge @@ -185,9 +294,9 @@ export async function connectXFTP(server: XFTPServer): Promise { } } -// -- Send command +// -- Send command (single attempt, no retry) -async function sendXFTPCommand( +async function sendXFTPCommandOnce( client: XFTPClient, privateKey: Uint8Array, entityId: 
Uint8Array, @@ -204,38 +313,80 @@ async function sendXFTPCommand( } const respBlock = fullResp.subarray(0, XFTP_BLOCK_SIZE) const body = fullResp.subarray(XFTP_BLOCK_SIZE) + // Detect padded error strings (HANDSHAKE, SESSION) before decodeTransmission + const raw = blockUnpad(respBlock) + if (raw.length < 20) { + const text = new TextDecoder().decode(raw) + if (/^[A-Z_]+$/.test(text)) { + throw new XFTPRetriableError(text) + } + } const {command} = decodeTransmission(client.sessionId, respBlock) const response = decodeResponse(command) if (response.type === "FRErr") { - console.error('[XFTP] Server error: %s', response.err.type) - throw new Error("Server error: " + response.err.type) + const err = response.err + if (err.type === "SESSION" || err.type === "HANDSHAKE") { + throw new XFTPRetriableError(err.type) + } + throw new XFTPPermanentError(err.type, humanReadableMessage(err)) } return {response, body} } +// -- Send command (with retry + reconnect) + +export async function sendXFTPCommand( + agent: XFTPClientAgent, + server: XFTPServer, + privateKey: Uint8Array, + entityId: Uint8Array, + cmdBytes: Uint8Array, + chunkData?: Uint8Array, + maxRetries: number = 3 +): Promise<{response: FileResponse, body: Uint8Array}> { + let clientP = getXFTPServerClient(agent, server) + let client = await clientP + for (let attempt = 1; attempt <= maxRetries; attempt++) { + try { + return await sendXFTPCommandOnce(client, privateKey, entityId, cmdBytes, chunkData) + } catch (e) { + if (!isRetriable(e)) { + throw categorizeError(e) + } + if (attempt === maxRetries) { + removeStaleConnection(agent, server, clientP) + throw categorizeError(e) + } + clientP = reconnectClient(agent, server) + client = await clientP + } + } + throw new Error("unreachable") +} + // -- Command wrappers export async function createXFTPChunk( - c: XFTPClient, spKey: Uint8Array, file: FileInfo, + agent: XFTPClientAgent, server: XFTPServer, spKey: Uint8Array, file: FileInfo, rcvKeys: Uint8Array[], auth: 
Uint8Array | null = null ): Promise<{senderId: Uint8Array, recipientIds: Uint8Array[]}> { - const {response} = await sendXFTPCommand(c, spKey, new Uint8Array(0), encodeFNEW(file, rcvKeys, auth)) + const {response} = await sendXFTPCommand(agent, server, spKey, new Uint8Array(0), encodeFNEW(file, rcvKeys, auth)) if (response.type !== "FRSndIds") throw new Error("unexpected response: " + response.type) return {senderId: response.senderId, recipientIds: response.recipientIds} } export async function addXFTPRecipients( - c: XFTPClient, spKey: Uint8Array, fId: Uint8Array, rcvKeys: Uint8Array[] + agent: XFTPClientAgent, server: XFTPServer, spKey: Uint8Array, fId: Uint8Array, rcvKeys: Uint8Array[] ): Promise { - const {response} = await sendXFTPCommand(c, spKey, fId, encodeFADD(rcvKeys)) + const {response} = await sendXFTPCommand(agent, server, spKey, fId, encodeFADD(rcvKeys)) if (response.type !== "FRRcvIds") throw new Error("unexpected response: " + response.type) return response.recipientIds } export async function uploadXFTPChunk( - c: XFTPClient, spKey: Uint8Array, fId: Uint8Array, chunkData: Uint8Array + agent: XFTPClientAgent, server: XFTPServer, spKey: Uint8Array, fId: Uint8Array, chunkData: Uint8Array ): Promise { - const {response} = await sendXFTPCommand(c, spKey, fId, encodeFPUT(), chunkData) + const {response} = await sendXFTPCommand(agent, server, spKey, fId, encodeFPUT(), chunkData) if (response.type !== "FROk") throw new Error("unexpected response: " + response.type) } @@ -246,36 +397,37 @@ export interface RawChunkResponse { } export async function downloadXFTPChunkRaw( - c: XFTPClient, rpKey: Uint8Array, fId: Uint8Array + agent: XFTPClientAgent, server: XFTPServer, rpKey: Uint8Array, fId: Uint8Array ): Promise { const {publicKey, privateKey} = generateX25519KeyPair() const cmd = encodeFGET(encodePubKeyX25519(publicKey)) - const {response, body} = await sendXFTPCommand(c, rpKey, fId, cmd) + const {response, body} = await sendXFTPCommand(agent, server, 
rpKey, fId, cmd) if (response.type !== "FRFile") throw new Error("unexpected response: " + response.type) const dhSecret = dh(response.rcvDhKey, privateKey) return {dhSecret, nonce: response.nonce, body} } export async function downloadXFTPChunk( - c: XFTPClient, rpKey: Uint8Array, fId: Uint8Array, digest?: Uint8Array + agent: XFTPClientAgent, server: XFTPServer, rpKey: Uint8Array, fId: Uint8Array, digest?: Uint8Array ): Promise { - const {dhSecret, nonce, body} = await downloadXFTPChunkRaw(c, rpKey, fId) + const {dhSecret, nonce, body} = await downloadXFTPChunkRaw(agent, server, rpKey, fId) return decryptReceivedChunk(dhSecret, nonce, body, digest ?? null) } export async function deleteXFTPChunk( - c: XFTPClient, spKey: Uint8Array, sId: Uint8Array + agent: XFTPClientAgent, server: XFTPServer, spKey: Uint8Array, sId: Uint8Array ): Promise { - const {response} = await sendXFTPCommand(c, spKey, sId, encodeFDEL()) + const {response} = await sendXFTPCommand(agent, server, spKey, sId, encodeFDEL()) if (response.type !== "FROk") throw new Error("unexpected response: " + response.type) } -export async function pingXFTP(c: XFTPClient): Promise { +export async function pingXFTP(agent: XFTPClientAgent, server: XFTPServer): Promise { + const client = await getXFTPServerClient(agent, server) const corrId = new Uint8Array(0) - const block = encodeTransmission(c.sessionId, corrId, new Uint8Array(0), encodePING()) - const fullResp = await c.transport.post(block) + const block = encodeTransmission(client.sessionId, corrId, new Uint8Array(0), encodePING()) + const fullResp = await client.transport.post(block) if (fullResp.length < XFTP_BLOCK_SIZE) throw new Error("pingXFTP: response too short") - const {command} = decodeTransmission(c.sessionId, fullResp.subarray(0, XFTP_BLOCK_SIZE)) + const {command} = decodeTransmission(client.sessionId, fullResp.subarray(0, XFTP_BLOCK_SIZE)) const response = decodeResponse(command) if (response.type !== "FRPong") throw new Error("unexpected 
response: " + response.type) } diff --git a/xftp-web/test/connection.node.test.ts b/xftp-web/test/connection.node.test.ts new file mode 100644 index 000000000..c04b7dd5a --- /dev/null +++ b/xftp-web/test/connection.node.test.ts @@ -0,0 +1,164 @@ +import {test, expect, vi, beforeEach} from 'vitest' +import { + newXFTPAgent, getXFTPServerClient, reconnectClient, removeStaleConnection, + sendXFTPCommand, + XFTPRetriableError, XFTPPermanentError, + type XFTPClient, type XFTPClientAgent +} from '../src/client.js' +import {formatXFTPServer, type XFTPServer} from '../src/protocol/address.js' +import {blockPad} from '../src/protocol/transmission.js' +import {concatBytes, encodeBytes, encodeLarge} from '../src/protocol/encoding.js' + +const server: XFTPServer = { + keyHash: new Uint8Array(32), + host: "localhost", + port: "12345" +} +const key = formatXFTPServer(server) + +function makeMockClient(overrides?: Partial): XFTPClient { + return { + baseUrl: "https://localhost:12345", + sessionId: new Uint8Array(32), + xftpVersion: 3, + transport: {post: vi.fn(), close: vi.fn()}, + ...overrides + } +} + +function makeAgent(connectFn: (s: any) => Promise): XFTPClientAgent { + const agent = newXFTPAgent() + agent._connectFn = connectFn + return agent +} + +// T4: getXFTPServerClient coalesces concurrent calls +test('getXFTPServerClient coalesces concurrent calls', async () => { + let resolve_: (v: XFTPClient) => void + const promise = new Promise(r => { resolve_ = r }) + const connectFn = vi.fn(() => promise) + const agent = makeAgent(connectFn) + const p1 = getXFTPServerClient(agent, server) + const p2 = getXFTPServerClient(agent, server) + expect(p1).toBe(p2) // same promise, single connection + expect(connectFn).toHaveBeenCalledTimes(1) + const mockClient = makeMockClient() + resolve_!(mockClient) + expect(await p1).toBe(mockClient) +}) + +// T5: getXFTPServerClient auto-cleans failed connections +test('getXFTPServerClient auto-cleans failed connections', async () => { + const 
connectFn = vi.fn() + .mockImplementationOnce(() => Promise.reject(new Error("down"))) + .mockImplementationOnce(() => Promise.resolve(makeMockClient())) + const agent = makeAgent(connectFn) + const p1 = getXFTPServerClient(agent, server) + await expect(p1).rejects.toThrow("down") + // After microtask, entry is removed + await new Promise(r => setTimeout(r, 0)) + expect(agent.connections.has(key)).toBe(false) + // Next call creates fresh connection + const p2 = getXFTPServerClient(agent, server) + expect(p2).not.toBe(p1) + expect(connectFn).toHaveBeenCalledTimes(2) +}) + +// T6: removeStaleConnection respects promise identity +test('removeStaleConnection respects promise identity', () => { + const agent = newXFTPAgent() + const mockClient1 = makeMockClient() + const mockClient2 = makeMockClient() + const p1 = Promise.resolve(mockClient1) + agent.connections.set(key, {client: p1, queue: Promise.resolve()}) + // Replace with reconnect + const p2 = Promise.resolve(mockClient2) + agent.connections.set(key, {client: p2, queue: Promise.resolve()}) + // removeStaleConnection with old promise does NOT remove new entry + removeStaleConnection(agent, server, p1) + expect(agent.connections.has(key)).toBe(true) + expect(agent.connections.get(key)!.client).toBe(p2) + // removeStaleConnection with current promise removes it + removeStaleConnection(agent, server, p2) + expect(agent.connections.has(key)).toBe(false) +}) + +// T7: reconnectClient replaces promise but preserves queue +test('reconnectClient replaces promise but preserves queue', async () => { + const mockClient2 = makeMockClient() + const connectFn = vi.fn(() => Promise.resolve(mockClient2)) + const agent = makeAgent(connectFn) + const origQueue = Promise.resolve() + agent.connections.set(key, {client: Promise.resolve(makeMockClient()), queue: origQueue}) + reconnectClient(agent, server) + const conn = agent.connections.get(key)! 
+ expect(await conn.client).toBe(mockClient2) // new client + expect(conn.queue).toBe(origQueue) // queue preserved +}) + +// T8: Retry loop — retriable error triggers reconnect, permanent does not +test('retry loop: retriable triggers reconnect, permanent does not', async () => { + const sessionId = new Uint8Array(32) + const dummyKey = new Uint8Array(64) + const dummyId = new Uint8Array(0) + const pingCmd = new TextEncoder().encode("PING") + + // Case 1: Retriable then success — 2 _connectFn calls + const connectFn1 = vi.fn() + .mockImplementationOnce(() => Promise.resolve(makeMockClient({ + sessionId, + transport: { + post: vi.fn().mockRejectedValueOnce(new XFTPRetriableError("SESSION")), + close: vi.fn() + } + }))) + .mockImplementationOnce(() => Promise.resolve(makeMockClient({ + sessionId, + transport: { + post: vi.fn().mockResolvedValueOnce(buildPongResponse(sessionId)), + close: vi.fn() + } + }))) + const agent1 = makeAgent(connectFn1) + const result = await sendXFTPCommand(agent1, server, dummyKey, dummyId, pingCmd) + expect(result.response.type).toBe("FRPong") + expect(connectFn1).toHaveBeenCalledTimes(2) + + // Case 2: All 3 retries exhausted — 3 _connectFn calls + const connectFn2 = vi.fn(() => Promise.resolve(makeMockClient({ + sessionId, + transport: { + post: vi.fn().mockRejectedValue(new XFTPRetriableError("SESSION")), + close: vi.fn() + } + }))) + const agent2 = makeAgent(connectFn2) + await expect(sendXFTPCommand(agent2, server, dummyKey, dummyId, pingCmd)) + .rejects.toThrow(/expired|reconnecting/) + expect(connectFn2).toHaveBeenCalledTimes(3) + + // Case 3: Permanent error — 1 _connectFn call (no reconnect) + const connectFn3 = vi.fn(() => Promise.resolve(makeMockClient({ + sessionId, + transport: { + post: vi.fn().mockRejectedValue(new XFTPPermanentError("AUTH", "expired")), + close: vi.fn() + } + }))) + const agent3 = makeAgent(connectFn3) + await expect(sendXFTPCommand(agent3, server, dummyKey, dummyId, pingCmd)) + .rejects.toThrow(/expired/) 
+ expect(connectFn3).toHaveBeenCalledTimes(1) +}) + +// Helper: build a valid XFTP PONG response block +function buildPongResponse(sessionId: Uint8Array): Uint8Array { + const authenticator = encodeBytes(new Uint8Array(0)) + const sessBytes = encodeBytes(sessionId) + const corrId = encodeBytes(new Uint8Array(0)) + const entityId = encodeBytes(new Uint8Array(0)) + const pong = new TextEncoder().encode("PONG") + const transmission = concatBytes(authenticator, sessBytes, corrId, entityId, pong) + const batch = concatBytes(new Uint8Array([1]), encodeLarge(transmission)) + return blockPad(batch) +} diff --git a/xftp-web/test/errors.test.ts b/xftp-web/test/errors.test.ts new file mode 100644 index 000000000..980897c83 --- /dev/null +++ b/xftp-web/test/errors.test.ts @@ -0,0 +1,34 @@ +import {test, expect} from 'vitest' +import { + XFTPRetriableError, XFTPPermanentError, + isRetriable, categorizeError, humanReadableMessage +} from '../src/client.js' + +// T1: isRetriable classifies errors correctly +test('isRetriable classifies errors correctly', () => { + // Retriable: + expect(isRetriable(new XFTPRetriableError("SESSION"))).toBe(true) + expect(isRetriable(new XFTPRetriableError("HANDSHAKE"))).toBe(true) + expect(isRetriable(new TypeError("fetch failed"))).toBe(true) + expect(isRetriable(Object.assign(new Error(), {name: "AbortError"}))).toBe(true) + // Not retriable: + expect(isRetriable(new XFTPPermanentError("AUTH", "..."))).toBe(false) + expect(isRetriable(new XFTPPermanentError("NO_FILE", "..."))).toBe(false) + expect(isRetriable(new XFTPPermanentError("INTERNAL", "..."))).toBe(false) + // Unknown errors are not retriable + expect(isRetriable(new Error("random"))).toBe(false) +}) + +// T2: categorizeError produces human-readable messages +test('categorizeError produces human-readable messages', () => { + const e = categorizeError(new XFTPPermanentError("AUTH", "File is invalid, expired, or has been removed")) + expect(e.message).toContain("expired") + // Verify 
every permanent error type maps to a non-empty human-readable message + for (const errType of ["AUTH", "NO_FILE", "SIZE", "QUOTA", "BLOCKED", "DIGEST", "INTERNAL"]) { + expect(humanReadableMessage(errType).length).toBeGreaterThan(0) + } + // Retriable errors also get human-readable messages + const re = categorizeError(new XFTPRetriableError("SESSION")) + expect(re.message).toContain("expired") +}) + diff --git a/xftp-web/vitest.config.ts b/xftp-web/vitest.config.ts index 6f7461981..8f7b71327 100644 --- a/xftp-web/vitest.config.ts +++ b/xftp-web/vitest.config.ts @@ -30,6 +30,7 @@ export default defineConfig({ plugins: [xftpServerPlugin()], test: { include: ['test/**/*.test.ts'], + exclude: ['test/**/*.node.test.ts'], browser: { enabled: true, provider: 'playwright', diff --git a/xftp-web/vitest.node.config.ts b/xftp-web/vitest.node.config.ts new file mode 100644 index 000000000..bb90990dd --- /dev/null +++ b/xftp-web/vitest.node.config.ts @@ -0,0 +1,9 @@ +import {defineConfig} from 'vitest/config' + +export default defineConfig({ + esbuild: {target: 'esnext'}, + test: { + include: ['test/**/*.node.test.ts'], + testTimeout: 30000 + } +}) diff --git a/xftp-web/web/download.ts b/xftp-web/web/download.ts index d54091414..25443cf35 100644 --- a/xftp-web/web/download.ts +++ b/xftp-web/web/download.ts @@ -4,6 +4,7 @@ import { newXFTPAgent, closeXFTPAgent, decodeDescriptionURI, downloadFileRaw } from '../src/agent.js' +import {XFTPPermanentError} from '../src/client.js' export function initDownload(app: HTMLElement, hash: string) { let fd: ReturnType @@ -108,7 +109,10 @@ export function initDownload(app: HTMLElement, hash: string) { ring.update(1) statusText.textContent = 'Download complete' } catch (err: any) { - showError(err?.message ?? String(err)) + const msg = err?.message ?? 
String(err) + showError(msg) + if (err instanceof XFTPPermanentError) retryBtn.hidden = true + else retryBtn.hidden = false } finally { await backend.cleanup().catch(() => {}) closeXFTPAgent(agent) diff --git a/xftp-web/web/upload.ts b/xftp-web/web/upload.ts index b8b05bc89..12a473cf9 100644 --- a/xftp-web/web/upload.ts +++ b/xftp-web/web/upload.ts @@ -5,6 +5,7 @@ import { newXFTPAgent, closeXFTPAgent, uploadFile, encodeDescriptionURI, type EncryptedFileMetadata } from '../src/agent.js' +import {XFTPPermanentError} from '../src/client.js' const MAX_SIZE = 100 * 1024 * 1024 @@ -149,7 +150,13 @@ export function initUpload(app: HTMLElement) { }) } } catch (err: any) { - if (!aborted) showError(err?.message ?? String(err)) + if (!aborted) { + const msg = err?.message ?? String(err) + showError(msg) + // Hide retry button for permanent errors (no point retrying) + if (err instanceof XFTPPermanentError) retryBtn.hidden = true + else retryBtn.hidden = false + } } finally { await backend.cleanup().catch(() => {}) closeXFTPAgent(agent)