From c940f16f37b1852ea8bd81b45f8056e511a0c3a7 Mon Sep 17 00:00:00 2001 From: "Evgeny @ SimpleX Chat" <259188159+evgeny-simplex@users.noreply.github.com> Date: Fri, 13 Mar 2026 10:14:47 +0000 Subject: [PATCH] update agent specs --- spec/modules/Simplex/Messaging/Agent.md | 266 ++++++++++++------ .../modules/Simplex/Messaging/Agent/Client.md | 125 ++++++-- 2 files changed, 283 insertions(+), 108 deletions(-) diff --git a/spec/modules/Simplex/Messaging/Agent.md b/spec/modules/Simplex/Messaging/Agent.md index 0b1e9cec1..a52be2156 100644 --- a/spec/modules/Simplex/Messaging/Agent.md +++ b/spec/modules/Simplex/Messaging/Agent.md @@ -10,25 +10,41 @@ ## Overview -This module is the top-level messaging agent, consumed by simplex-chat and other client applications. It passes specific worker bodies, task queries, and handler logic into the frameworks defined in [Agent/Client.hs](./Agent/Client.md), and implements the orchestration policies: duplex handshake, queue rotation, ratchet synchronization, message integrity validation. +This module is the top-level SimpleX agent, consumed by simplex-chat and other client applications. It passes specific worker bodies, task queries, and handler logic into the frameworks defined in [Agent/Client.hs](./Agent/Client.md), and implements the orchestration policies: duplex handshake, queue rotation, ratchet synchronization, message integrity validation. -The agent starts four threads (in `getSMPAgentClient_`): `subscriber` (main event loop), `runNtfSupervisor` (notification token management), `cleanupManager` (periodic garbage collection), and `logServersStats` (statistics reporting). These threads are raced via `raceAny_` — if any exits, all are cancelled. +### Agent startup — backgroundMode + +`getSMPAgentClient_` accepts a `backgroundMode` flag that fundamentally changes agent capabilities: +- **Normal mode** (`backgroundMode = False`): starts four threads raced via `raceAny_` — `subscriber` (main event loop), `runNtfSupervisor` (notification management), `cleanupManager` (garbage collection), `logServersStats` (statistics). Also restores persisted server statistics. If any thread crashes, all are cancelled; statistics are saved in a `finally` block. +- **Background mode** (`backgroundMode = True`): starts only the `subscriber` thread. No cleanup, no notifications, no stats persistence. Used when the agent needs minimal receive-only operation. + +Thread crashes are caught by the `run` wrapper: if the agent is still active (`acThread` is set), the exception is reported as `CRITICAL True` to `subQ`. If the agent is being disposed, crashes are silently ignored. + +### Service + entity session mode prohibition + +Service certificates and entity transport session mode (`TSMEntity`) are mutually exclusive. This is checked in four places: `getSMPAgentClient_`, `setNetworkConfig`, `createUser'`, `setUserService'`. If violated, throws `CMD PROHIBITED`. The constraint exists because service certificates associate multiple queues under one identity, which contradicts entity session mode's goal of preventing queue correlation. ## Split-phase connection creation -`prepareConnectionLink` and `createConnectionForLink` separate link preparation (key generation, link formatting — no network) from queue creation (single network call). This prevents the race where a link is published before the queue exists on the router. The link can be shared out-of-band after `prepareConnectionLink`, and `createConnectionForLink` is called only when the user is ready to accept connections. +`prepareConnectionLink` and `createConnectionForLink` separate link preparation (key generation, link formatting — no network) from queue creation (single network call). This prevents the race where a link is published before the queue exists on the router. + +**Sender ID derivation.** The sender ID is deterministic: `SMP.EntityId $ B.take 24 $ C.sha3_384 corrId` where `corrId` is a random nonce. `createConnectionForLink` validates `actualSndId == sndId` — if the router returns a different sender ID, the connection is rejected. See source comment: "the remaining 24 bytes are reserved, possibly for notifier ID in the new notifications protocol." + +**PQ restriction.** `IKUsePQ` is prohibited for prepared links — throws `CMD PROHIBITED`. PQ keys are too large for the short link format. ## Subscriber loop — processSMPTransmissions -The subscriber thread reads batches from `msgQ` (filled by SMP protocol clients) and dispatches to `processSMPTransmissions`. Key non-obvious behaviors: +The subscriber thread reads batches from `msgQ` (filled by SMP protocol clients) and dispatches to `processSMPTransmissions`. Each batch is processed within `agentOperationBracket c AORcvNetwork waitUntilActive`, tying into the operation suspension cascade. -**Batch UP notification accumulation.** Successful subscription confirmations (`processSubOk`) append to a shared `upConnIds` TVar across the batch. A single `UP` event is emitted after all transmissions in the batch are processed, not per-transmission. Similarly, `serviceRQs` accumulates service-associated receive queues for batch processing via `processRcvServiceAssocs`. +**Batch UP notification accumulation.** Successful subscription confirmations (`processSubOk`) append to a shared `upConnIds` TVar across the batch. A single `UP` event is emitted after all transmissions are processed, not per-transmission. Similarly, `serviceRQs` accumulates service-associated receive queues for batch processing via `processRcvServiceAssocs`. -**Double validation for subscription results.** `isPendingSub` checks two conditions atomically: the queue must be in the pending map AND the client session must still be active. If either fails, the subscription result is counted as ignored (statistics only). This handles the race where a subscription response arrives after the client disconnected and a new client connected. +**Double validation for subscription results.** `isPendingSub` checks two conditions atomically: the queue must be in the pending map AND the client session must still be active (`activeClientSession`). If either fails, the result is counted as ignored (statistics only). This handles the race where a subscription response arrives after reconnection. -**subQ overflow to pendingMsgs.** `processSMP` writes events to `subQ` (bounded TBQueue) but when it's full, events go into a `pendingMsgs` TVar instead. After processing completes, pending messages are drained in reverse order. This prevents the message processing thread from blocking on a full queue, which would stall the entire SMP client. +**SUB response piggybacking MSG.** When a SUB response arrives as `Right msg@SMP.MSG {}`, the connection is marked UP (via `processSubOk`) AND the MSG is processed. The UP notification happens even if the MSG processing fails — the connection is up regardless. -**END/ENDS session validation.** Both `END` (single queue) and `ENDS` (service) check `activeClientSession` before removing subscriptions. If the session doesn't match (stale disconnect), the event is logged but ignored. This prevents a delayed END from a disconnected client from removing subscriptions that a new client established. +**subQ overflow to pendingMsgs.** `processSMP` writes events to `subQ` (bounded TBQueue) but when full, events go into a `pendingMsgs` TVar. After processing, pending messages are drained in reverse order (LIFO). This prevents the message processing thread from blocking on a full queue, which would stall the entire SMP client. + +**END/ENDS session validation.** Both check `activeClientSession` before removing subscriptions. If the session doesn't match (stale disconnect), the event is logged but ignored. ## Message processing — processSMP @@ -36,40 +52,44 @@ The subscriber thread reads batches from `msgQ` (filled by SMP protocol clients) ### Four e2e key states -The MSG handler discriminates on `(e2eDhSecret, e2ePubKey_)` — the per-queue shared secret and the incoming public key: +The MSG handler discriminates on `(e2eDhSecret, e2ePubKey_)`: -- `(Nothing, Just key)` — **Handshake phase**: no shared secret yet, public key present. Computes DH, decrypts with per-queue E2E. Dispatches to `smpConfirmation` (if AgentConfirmation) or `smpInvitation` (if AgentInvitation). -- `(Just dh, Nothing)` — **Established phase**: shared secret exists, no new key. This is normal message flow. Dispatches to `AgentRatchetKey` (ratchet renegotiation) or `AgentMsgEnvelope` (double-ratchet encrypted message). -- `(Just dh, Just _)` — **Repeated confirmation**: both present. Only AgentConfirmation is accepted (this is a retry because ACK failed), everything else is rejected. +- `(Nothing, Just key)` — **Handshake**: computes DH, decrypts with per-queue E2E. Dispatches to `smpConfirmation` or `smpInvitation`. +- `(Just dh, Nothing)` — **Established**: normal message flow. Dispatches to `AgentRatchetKey` or `AgentMsgEnvelope`. +- `(Just dh, Just _)` — **Repeated confirmation**: only AgentConfirmation is accepted (ACK for previous one failed), everything else is rejected. - `(Nothing, Nothing)` — **Error**: no keys at all. ### ACK semantics -ACK is NOT automatic for `A_MSG` — the function returns `ACKPending` and the user must call `ackMessage`. ACK IS automatic for all control messages (HELLO, QADD, QKEY, QUSE, QTEST, EREADY, A_RCVD). This is because `A_MSG` delivery to the user application must be confirmed before the message is removed from the router. +ACK is NOT automatic for `A_MSG` — the function returns `ACKPending` and the user must call `ackMessage`. ACK IS automatic for all control messages (HELLO, QADD, QKEY, QUSE, QTEST, EREADY, A_RCVD). -`handleNotifyAck` wraps each MSG processing branch: if any error occurs, it sends `ERR` to the client but still ACKs the SMP message. This prevents a processing error from causing infinite re-delivery of the same message. +`handleNotifyAck` wraps the MSG processing: if any error occurs, it sends `ERR` to the client but still ACKs the SMP message. This prevents a processing error from causing infinite re-delivery. ### agentClientMsg — transactional message processing -The inner function `agentClientMsg` performs ratchet decryption, message parsing, and integrity checking inside a single `withStore` transaction with `lockConnForUpdate`. This serializes all message processing for a given connection, preventing concurrent ratchet state modifications. The function returns the pre-decryption ratchet state (`rcPrev`) alongside the message — this is needed by `ereadyMsg` to decide whether to send EREADY. +Performs ratchet decryption, message parsing, and integrity checking inside a single `withStore` transaction with `lockConnForUpdate`. This serializes all message processing for a given connection, preventing concurrent ratchet state modifications. Returns the pre-decryption ratchet state (`rcPrev`) alongside the message — needed by `ereadyMsg` to decide whether to send EREADY. + +### Additional queue status transitions on message receipt + +When receiving an `AgentMsgEnvelope` on a non-Active queue, the queue is set to Active. For primary queues during rotation (`dbReplaceQueueId` is set), the new queue is set as primary and the old queue is scheduled for deletion via `ICQDelete`. This is how the receiving side completes queue rotation — any message on the new queue triggers cleanup of the old one. ### Duplicate message handling Three paths for `A_DUPLICATE` errors: -1. **Stored and user-acked**: `getLastMsg` finds it with `userAck = True` → `ackDel` (delete from router). -2. **Stored, A_MSG, not user-acked**: re-notify the user with `MSG` event and return `ACKPending`. The user may not have seen the original notification. -3. **Not stored or non-A_MSG**: verify via `checkDuplicateHash` that the encrypted hash exists in the DB. If it doesn't, the error is re-thrown (it's a real decryption failure, not a duplicate). +1. **Stored and user-acked**: `getLastMsg` finds it with `userAck = True` → `ackDel`. +2. **Stored, A_MSG, not user-acked**: re-notify the user with `MSG` event and return `ACKPending`. The user may not have seen the original. +3. **Not stored or non-A_MSG**: `checkDuplicateHash` verifies the encrypted hash exists in the DB. If not, re-throws (real decryption failure, not duplicate). -For crypto errors (`A_CRYPTO`): the encrypted message hash is checked for existence. If the hash already exists, the error is silently suppressed (it's a duplicate that failed decryption differently). If not, `notifySync` classifies the error via `cryptoErrToSyncState` and may trigger ratchet resynchronization. +For crypto errors (`A_CRYPTO`): if the encrypted hash already exists, suppressed (duplicate). If not, `notifySync` classifies via `cryptoErrToSyncState` (RSAllowed or RSRequired) and updates the connection's ratchet sync state. ### resetRatchetSync on successful decryption -When a double-ratchet message is successfully decrypted and the connection's ratchet sync state is not `RSOk` or `RSStarted`, the state is reset to `RSOk` and `RSYNC RSOk` is notified. This means successful message delivery is the recovery signal for ratchet desynchronization. +When a double-ratchet message is successfully decrypted and the connection's ratchet sync state is not `RSOk` or `RSStarted`, the state is reset to `RSOk` and `RSYNC RSOk` is notified. Successful message delivery is the recovery signal for ratchet desynchronization. -### updateConnVersion on every message +### updateConnVersion — monotonic upgrade -Every received `AgentMsgEnvelope` triggers `updateConnVersion`, which upgrades the connection's agreed agent version if the message's version is higher and compatible. This is a monotonic upgrade — versions only increase. The `safeVersionRange` construction handles the case where the sender's version is higher than the receiver's maximum — it creates a range from `minVersion` to the sender's version. +Every received `AgentMsgEnvelope` triggers `updateConnVersion`. If the message's agent version is higher than the current agreed version and compatible, the agreed version is upgraded. Versions only increase. `safeVersionRange` handles the case where the sender's version exceeds the receiver's maximum — creates a range from `minVersion` to the sender's version. ## Duplex handshake @@ -81,29 +101,41 @@ Receives AgentConfirmation with `e2eEncryption = Just sndParams`. Initializes th ### Accepting party (DuplexConnection) -Receives AgentConfirmation with `e2eEncryption = Nothing` and `AgentConnInfo` (not `AgentConnInfoReply`). The ratchet was already initialized during `joinConnection`. If `senderKey` is present, enqueues `ICDuplexSecure` (the queue needs to be secured with SKEY). If absent (sender already secured via LKEY), sends `CON` immediately. +Receives AgentConfirmation with `e2eEncryption = Nothing` and `AgentConnInfo` (not `AgentConnInfoReply`). The ratchet was already initialized during `joinConnection`. If `senderKey` is present, enqueues `ICDuplexSecure` (queue needs securing with SKEY). If absent (sender already secured via LKEY), sends `CON` immediately and sets the queue Active. ### HELLO exchange HELLO is processed in `helloMsg`. The key dispatch is on `sndStatus`: - `sndStatus == Active`: this side already sent HELLO, so receiving HELLO means both sides are connected → emit `CON`. -- Otherwise: this side hasn't sent HELLO yet → enqueue HELLO reply via `enqueueDuplexHello`. +- Otherwise: this side hasn't sent HELLO yet → enqueue HELLO reply. -HELLO is not used at all in fast duplex connection (v9+ SMP with SKEY — the sender secures the queue directly, skipping the HELLO exchange). +HELLO is not used in fast duplex connection (v9+ SMP with SKEY). + +### startJoinInvitation — retry-safe ratchet creation + +When retrying a join (existing `SndQueue`), `startJoinInvitation` tries to get the existing ratchet via `getSndRatchet` before creating a new one. If the ratchet exists, it reuses it. If not (error), it logs a non-blocking error via `nonBlockingWriteTBQueue` and creates a fresh ratchet. This prevents a retry from corrupting an already-established ratchet. The same pattern appears in `mkJoinInvitation` for contact URI joins. + +### PQ support negotiation + +PQ support is the AND of four conditions: the local client's PQ preference, the peer's agent version (>= `pqdrSMPAgentVersion`), the E2E encryption version (>= `pqRatchetE2EEncryptVersion`), and the connection's current PQ support. This negotiation happens at `joinConn` and `smpConfirmation` time via `versionPQSupport_` and `pqSupportAnd`. ## Queue rotation Four agent messages implement queue rotation. See [agent-protocol.md](../../../../protocol/agent-protocol.md#rotating-messaging-queue) for the protocol. Implementation-specific details: -**QADD** (processed by sender in `qAddMsg`): Creates a new `SndQueue` with DH key exchange. Before creating the new queue, deletes any previous pending replacement (`delSqs` partitioned by `dbReplaceQId`). Responds with `QKEY`. The replacement chain means multiple consecutive rotation requests are handled correctly — only the latest replacement survives. +**QADD** (processed by sender in `qAddMsg`): Creates a new `SndQueue` with DH key exchange. Deletes any previous pending replacement (`delSqs` partitioned by `dbReplaceQId`). Responds with `QKEY`. The replacement chain means consecutive rotation requests are handled correctly — only the latest survives. -**QKEY** (processed by recipient in `qKeyMsg`): Validates that the queue is `New` or `Confirmed` and the switch status is `RSSendingQADD`. Enqueues `ICQSecure` to secure the queue asynchronously — the actual KEY command is sent by `runCommandProcessing`. +**QKEY** (processed by recipient in `qKeyMsg`): Validates queue is `New` or `Confirmed` and switch status is `RSSendingQADD`. Enqueues `ICQSecure` for async processing. -**QUSE** (processed by sender in `qUseMsg`): Marks the new queue as `Secured`. Sends `QTEST` **only to the new queue**, not the old one. The old queue is deleted after QTEST is successfully delivered (handled in `runSmpQueueMsgDelivery`). +**QUSE** (processed by sender in `qUseMsg`): Marks new queue `Secured`. Sends `QTEST` **only to the new queue**. -**QTEST** (no handler): Comment explains — any message received on the new queue triggers deletion of the old queue via the `dbReplaceQueueId` logic in `processSMP`'s AgentMsgEnvelope branch. QTEST exists only to ensure at least one message traverses the new queue. +**QTEST** (no handler in processSMP): Any message on the new queue triggers old queue deletion via `dbReplaceQueueId` logic. QTEST exists only to ensure at least one message traverses the new queue. -**Ratchet sync guard**: All four handlers check `ratchetSyncSendProhibited` before proceeding. Queue rotation is blocked during ratchet desynchronization. +**Sender-side completion in delivery handler.** When `AM_QTEST_` is successfully sent in `runSmpQueueMsgDelivery`, the old send queue is removed from the connection: pending messages are deleted, the queue record is removed, and the old queue's delivery worker is deleted from `smpDeliveryWorkers` (stopping its thread). This happens inside `withConnLockNotify` to prevent deadlock with the subscriber. + +**ICQDelete error tolerance.** In `runCommandProcessing`, if deleting the old receive queue fails with a permanent error (e.g., queue already gone on router), `finalizeSwitch` still runs — the local switch completes. Only temporary errors prevent completion. + +**Ratchet sync guard**: All four message handlers check `ratchetSyncSendProhibited` before proceeding. ## Ratchet synchronization — newRatchetKey @@ -111,128 +143,192 @@ When an `AgentRatchetKey` message is received, `newRatchetKey` handles ratchet r ### Hash-ordering for initialization role -Both parties generate key pairs and exchange them. The party whose `rkHash(k1, k2)` is **lower** (lexicographic comparison) initializes as the **receiving** ratchet; the other initializes as **sending** and sends EREADY. This deterministic ordering breaks the symmetry when both parties simultaneously request ratchet sync. +Both parties generate key pairs and exchange them. The party whose `rkHash(k1, k2)` is **lower** (lexicographic comparison) initializes the **receiving** ratchet; the other initializes **sending** and sends EREADY. This breaks the symmetry when both parties simultaneously request ratchet sync. ### State machine -The current `ratchetSyncState` determines behavior: - `RSOk`, `RSAllowed`, `RSRequired` → **receiving client**: generate new keys, send `AgentRatchetKey` reply, then proceed with hash-ordering. -- `RSStarted` → **initiating client**: use the keys already stored (from `synchronizeRatchet'`), proceed with hash-ordering. -- `RSAgreed` → **error**: ratchet was already re-established but another key arrived. Sets state to `RSRequired` and throws `RATCHET_SYNC`. This handles the edge case where both parties initiate simultaneously and one has already completed. +- `RSStarted` → **initiating client**: use keys already stored (from `synchronizeRatchet'`), proceed with hash-ordering. +- `RSAgreed` → **error**: sets state to `RSRequired`, throws `RATCHET_SYNC`. Handles the edge case where both parties initiate simultaneously and one has completed. ### Deduplication -`checkRatchetKeyHashExists` prevents processing the same ratchet key message twice. The hash is stored before processing, so a duplicate delivery is detected and short-circuited via `ratchetExists`. +`checkRatchetKeyHashExists` prevents processing the same ratchet key twice. The hash is stored atomically before processing begins. ### EREADY -Sent when the ratchet was initialized as receiving (`rcSnd` is `Nothing` in the pre-decryption ratchet state). Carries `lastExternalSndId` so the other party knows which messages were sent with the old ratchet. Processed by `ereadyMsg`, which checks `rcPrev` (the ratchet state before decrypting the current message) for the same condition — if the pre-decryption ratchet had no send chain, it sends EREADY. +Sent when the ratchet was initialized as receiving (`rcSnd` is `Nothing` in the pre-decryption ratchet state). Carries `lastExternalSndId` so the other party knows which messages were sent with the old ratchet. ## Message integrity — checkMsgIntegrity -Sequential external sender ID + previous message hash chain. Five outcomes: -- **MsgOk**: `extSndId == prevExtSndId + 1` AND hashes match. -- **MsgBadId**: `extSndId < prevExtSndId` — message from the past. -- **MsgDuplicate**: `extSndId == prevExtSndId` — same ID as last message. -- **MsgSkipped**: `extSndId > prevExtSndId + 1` — gap in sequence, reports range of skipped IDs. -- **MsgBadHash**: IDs are sequential but hashes don't match — message was modified or a different message was inserted. +Sequential external sender ID + previous message hash chain. Five outcomes: `MsgOk` (sequential + hashes match), `MsgBadId` (ID from the past), `MsgDuplicate` (same ID), `MsgSkipped` (gap in sequence), `MsgBadHash` (sequential but hashes differ). -The integrity result is stored in `MsgMeta` and delivered to the client application. The agent does not reject messages with integrity failures — it reports them and continues processing. This is intentional: the client application decides the policy. +The integrity result is delivered to the client application via `MsgMeta`. The agent does not reject messages with integrity failures — it reports them and continues processing. The client decides the policy. ## Async command processing — runCommandProcessing -Uses the worker framework from [Agent/Client.hs](./Agent/Client.md#worker-framework). The worker body calls `withWork` with `getPendingServerCommand` as the task source. +Uses the worker framework from [Agent/Client.hs](./Agent/Client.md#worker-framework). Keyed by `(connId, server)` — each connection/server combination gets its own command worker. Uses `AOSndNetwork` for operation suspension. ### Internal commands -The command processor dispatches internal commands that are enqueued by message handlers and other agent operations: - -- **ICAllowSecure / ICDuplexSecure**: Complete the duplex handshake by securing the queue and sending confirmation. `ICAllowSecure` is the user-initiated path (from `allowConnection`), `ICDuplexSecure` is the automatic path (from receiving AgentConnInfo with senderKey). -- **ICQSecure / ICQDelete**: Queue rotation — secure the new queue (KEY command) and delete the old queue. -- **ICAck / ICAckDel**: Send ACK to the SMP router, optionally deleting the internal message record. -- **ICDeleteConn / ICDeleteRcvQueue**: Connection and queue cleanup. +- **ICAllowSecure**: User-initiated handshake completion (from `allowConnection`). On DuplexConnection (SKEY retry), if the error is temporary and the send queue's server differs from the command's server, the command is **moved** to the correct server queue via `updateCommandServer` + `getAsyncCmdWorker`. Returns `CCMoved` instead of `CCCompleted`. +- **ICDuplexSecure**: Automatic handshake completion (from receiving AgentConnInfo with senderKey). Secures queue and sends HELLO. +- **ICQSecure / ICQDelete**: Queue rotation — secure the new queue (KEY) and delete the old queue. +- **ICAck / ICAckDel**: Send ACK to the router, optionally deleting the internal message record. +- **ICDeleteConn**: No longer used, but may exist in old databases — cleaned up by deleting the command record. +- **ICDeleteRcvQueue**: Queue cleanup during rotation. ### Retry semantics -`runCommandProcessing` has two retry intervals: zero (immediate retry via `0`) for commands that fail with temporary errors, and `asyncCmdRetryInterval` for stuck commands. `tryMoveableCommand` attempts to skip a stuck command by marking it with a future `connId` so `getPendingServerCommand` returns the next one instead. +`tryMoveableCommand` wraps execution with `withRetryInterval`: waits for `waitWhileSuspended` and `waitForUserNetwork`, then executes. Temporary/host errors trigger retry via `retrySndOp`. On success, the command is deleted. On permanent error, the error is notified and the command is deleted. `retrySndOp` separates `endAgentOperation`/`beginAgentOperation` into separate `atomically` blocks — see source comment: if `beginAgentOperation` blocks, `SUSPENDED` won't be sent. -### withConnLockNotify +### withConnLockNotify — deadlock prevention -Wraps command execution with `withConnLock` plus automatic error notification to `subQ`. This ensures that even if a command fails, the client application is notified. +Returns `Maybe ATransmission` and writes to `subQ` **after** releasing the lock. This prevents deadlock: if the lock holder writes to a full `subQ` while the subscriber thread needs the lock to process a message, both block indefinitely. ## Message delivery — runSmpQueueMsgDelivery -Per-queue delivery loop using the worker framework. Each `SndQueue` has its own delivery worker (keyed by queue address in `smpDeliveryWorkers`). +Per-queue delivery loop. Each `SndQueue` has its own worker keyed by queue address in `smpDeliveryWorkers`, paired with a `TMVar ()` retry lock (via `getAgentWorker'`). + +### Deferred encryption + +Message bodies are NOT encrypted at enqueue time. `enqueueMessageB` advances the ratchet header (`agentRatchetEncryptHeader`) and validates padding (`rcCheckCanPad`), but stores only the body reference (`sndMsgBodyId`) and encryption key (`encryptKey`, `paddedLen`). The actual message body encoding (`encodeAgentMsgStr`) and encryption (`rcEncryptMsg`) happen at delivery time. This allows the same body to be shared across multiple send queues via `sndMsgBodyId` — each delivery encrypts independently with its connection's ratchet. + +For confirmation and ratchet key messages (AM_CONN_INFO, AM_CONN_INFO_REPLY, AM_RATCHET_INFO), the body is pre-encrypted and stored in `msgBody` directly — no deferred encryption. ### Per-message-type error handling -Error handling differs by message type and SMP error: +**QUOTA**: Checks `internalTs` against `quotaExceededTimeout`. If the message is older than the timeout, expires it and all subsequent expired messages in the queue (via `getExpiredSndMessages` → bulk `MERRS` notification). If not expired, sends `MWARN` and retries with `RISlow`. For confirmation messages (AM_CONN_INFO/AM_CONN_INFO_REPLY), QUOTA is treated as `NOT_AVAILABLE`. -**QUOTA**: The queue has exceeded its message quota. Sets `quotaExceededTs` and starts an expiry timer if `messageExpireInterval` is configured. Does NOT retry — the sender must wait for the recipient to drain messages (signaled by `A_QCONT`). +**AUTH**: Per message type: +- `AM_CONN_INFO` / `AM_CONN_INFO_REPLY` / `AM_RATCHET_INFO`: connection error `NOT_AVAILABLE` +- `AM_HELLO_` with receive queue (initiating party): `NOT_AVAILABLE`. Without receive queue (joining party): `NOT_ACCEPTED`. +- `AM_A_MSG_` / `AM_A_RCVD_` / `AM_QCONT_` / `AM_EREADY_`: delete message and notify `MERR`. +- Queue rotation messages (`AM_QADD_` through `AM_QTEST_`): queue error with descriptive string. -**AUTH**: Different response per message type: -- `A_MSG_` (user message): sends `SENT` with `SndMsgRcvQueued` status to the client. The message was accepted by the router but auth failed on the receive side — likely the queue was replaced during rotation. -- Other types: sends `MERR` error to the client. -- In both cases, if `messageExpireInterval` is configured, expired messages are deleted. +**Timeout/network errors**: message-type-aware timeout — `AM_HELLO_` uses `helloTimeout`, all others use `messageTimeout`. If expired, uses `notifyDelMsgs` which expires the current message AND fetches all expired messages for the queue in bulk. If `serverHostError`, sends `MWARN` before retrying. Non-host temporary errors retry silently. -**Timeout/network errors**: retried with the worker framework's built-in retry. The `retryLock` TMVar (paired with each delivery worker — see `getAgentWorker'` in [Agent/Client.md](./Agent/Client.md#getagentworker--lifecycle-management)) provides external retry signaling from `A_QCONT`. +### Delivery success handling + +On successful send, per message type: +- `AM_CONN_INFO` with `senderCanSecure` (fast handshake): sends `CON` + sets status `Active`. +- `AM_CONN_INFO` without `senderCanSecure`: sets status `Confirmed` only. +- `AM_CONN_INFO_REPLY`: sets status `Confirmed`. +- `AM_HELLO_`: sets status `Active`. If receive queue exists AND its status is `Active`, sends `CON` (accepting party in v2). +- `AM_A_MSG_`: sends `SENT msgId proxySrv_` to notify the client. +- `AM_QKEY_`: re-reads connection and sends `SWITCH QDSnd SPConfirmed`. +- `AM_QTEST_`: see "Sender-side completion" under Queue rotation above. +- All other types: no notification. + +After success, the delivery record is deleted. For `AM_A_MSG_`, `keepForReceipt = True` — the record is kept until a receipt is received. + +### withRetryLock2 — external retry signaling + +The delivery loop uses `withRetryLock2` which combines the standard retry interval with `qLock` (the `TMVar ()` paired with the worker). When `A_QCONT` is received, the handler puts `()` into the retry lock, causing the retry to fire immediately instead of waiting for the backoff interval. See `continueSending` in `processSMP`. + +### submitPendingMsg — operation counting + +`submitPendingMsg` increments `opsInProgress` on `msgDeliveryOp` BEFORE spawning the delivery worker. This means the operation is counted even before the worker starts, ensuring the suspension cascade waits for all enqueued deliveries. ## Batch message sending — sendMessagesB_ -`sendMessagesB_` sends messages to multiple connections. When multiple messages have the same body (common for group messages), the body is encrypted once and referenced via `VRRef` for subsequent connections. `vrCopyMap` tracks `ByteString → (VRValue encrypted)` mappings. This is a performance optimization — ratchet encryption is expensive, and group messages go to many connections with identical plaintext. +### MsgReq grouping contract -The function partitions connections by send queue and builds per-queue delivery batches. Each connection's message is encrypted with its own ratchet but the plaintext body lookup avoids redundant work. +Messages to the same connection must be contiguous in the traversable, with only the first having a non-empty `connId`. Subsequent messages for the same connection must have empty `connId`. This is validated by `addConnId` which rejects duplicate `connId` values and empty first `connId`. The `getConn_` function uses a `TVar prev` to cache the last connection lookup, avoiding redundant database reads. + +### Connection locking + +`withConnLocks` takes locks for ALL connections in the batch before processing. This prevents concurrent sends to the same connection from interleaving ratchet state updates. + +### PQ support monotonic upgrade + +When `pqEnc == PQEncOn` but the connection has `pqSupport == PQSupportOff`, PQ support is upgraded via `setConnPQSupport`. PQ support can only be enabled, never disabled. The upgrade IDs are accumulated via `mapAccumL` and applied in a single batch database write. + +### VRValue/VRRef — database body deduplication + +VRValue/VRRef deduplication operates at the **database body storage** level, not encryption. `enqueueMessageB` tracks an `IntMap (Maybe Int64, AMessage)` mapping integer indices to database body IDs (`sndMsgBodyId`): + +- `VRValue (Just i) body`: stores the body in `snd_message_bodies`, records the `sndMsgBodyId`, and associates it with index `i` for future reference. +- `VRRef i`: looks up index `i` to get the previously stored `sndMsgBodyId`, and creates a new `snd_messages` record linked to the same body. + +Encryption is NOT deduplicated — each connection's ratchet header is independently advanced at enqueue time, and each delivery encrypts the body independently. The optimization is purely about avoiding redundant database storage of identical message bodies (common for group messages). + +### Error propagation constraint + +When a connection type is wrong (e.g., SndConnection, NewConnection), the error is returned per-message but the batch continues. See source comment: "we can't fail here, as it may prevent delivery of subsequent messages that reference the body of the failed message." If a VRValue message fails, subsequent VRRef messages that reference it would break. ## Subscription management +### subscribeConnections_ + +Partitions connections by type. SndConnection with `Confirmed` status returns success (it's not subscribed, just waiting). SndConnection with `Active` status returns `CONN SIMPLEX` (can't subscribe a send-only connection). After subscribing queues, resumes delivery workers for connections with pending deliveries (via `getConnectionsForDelivery`). + +**Multi-queue result combining.** For connections with multiple receive queues, results are combined using a priority system: Active+Success (1) > Active+Error (2) > non-Active+Success (3) > non-Active+Error (4). The highest-priority (lowest number) result is used. This ensures that if at least one Active queue subscribes successfully, the connection reports success. + ### subscribeAllConnections' -Batch subscription with throttling: `maxPending` limits how many pending subscriptions exist simultaneously. When the pending count exceeds the limit, the function waits before enqueuing more. This prevents memory exhaustion on reconnection when thousands of connections need resubscription. +**Active user priority.** If `activeUserId_` is provided, that user's subscriptions are processed first (`sortOn`). -Service subscriptions are attempted first (`subscribeClientServices'`). If a service subscription succeeds, its associated queues don't need individual SUB commands — they're covered by the service subscription. Queues not associated with any service are subscribed individually. +**Service subscription with fallback.** Service subscriptions are attempted first. If a service subscription fails with `SSErrorServiceId` or zero subscribed queues, the queues are unassociated from the service and subscribed individually. If the error is a client-level error (not a service-specific error), the same fallback applies. -### resubscribeConnection' +**Pending throttle.** `maxPending` limits concurrent pending subscriptions. The counter is incremented inside the database transaction (before leaving `withStore'`) and decremented in a `finally` block. When the count exceeds the limit, `subscribeUserServer` blocks in STM via `retry`. -Individual connection resubscription. Checks connection status and queue status before subscribing — deleted or suspended connections are skipped. Used for targeted resubscription after specific operations (e.g., after `allowConnection`). +### resubscribeConnections' + +Filters out connections that already have active subscriptions (via `hasActiveSubscription`). For store errors, returns `True` for `isActiveConn` — this causes the error to be processed by `subscribeConnections_` which will report it. ## Notification token lifecycle -`registerNtfToken'` → `verifyNtfToken'` → `checkNtfToken'` → `deleteNtfToken'` manage push notification token registration with the NTF server. Token verification uses a challenge-response flow where the NTF server sends a verification code through the push notification channel, and the client confirms receipt. +`registerNtfToken'` is a complex state machine. Key non-obvious behavior: on `NTF AUTH` error during token operations, the token is removed and re-registered from scratch (see `withToken` catch of `NTF AUTH`). Device token changes trigger `replaceToken`, which attempts an in-place replacement; if that fails with a permanent error, the token is removed and recreated. ## Cleanup manager -Runs periodically (configurable interval, typically 1 minute). Operations: -- **Delete marked connections**: connections in "deleted" or "deleted-waiting-delivery" states -- **Delete expired/deleted files**: both receive and send files, with configurable TTLs -- **Clean temp paths**: remove temporary file paths from completed transfers -- **Delete orphaned users**: users with no remaining connections get `DEL_USER` notification +Runs periodically with a `cleanupStepInterval` delay BETWEEN each cleanup operation (not just between cycles). This prevents cleanup from monopolizing database access. -Each cleanup operation catches errors individually (`catchAllErrors`) — a failure in one doesn't prevent others from running. The manager uses `waitActive` to pause during agent suspension, with `tryAny` to handle the case where the agent is being shut down. +Additional cleanup not previously mentioned: +- **Expired receive message hashes**: `deleteRcvMsgHashesExpired` +- **Expired send messages**: `deleteSndMsgsExpired` +- **Expired ratchet key hashes**: `deleteRatchetKeyHashesExpired` +- **Expired notification tokens**: `deleteExpiredNtfTokensToDelete` +- **Expired send chunk replicas**: `deleteDeletedSndChunkReplicasExpired` ## Agent suspension -`suspendAgent` triggers the operation suspension cascade defined in [Agent/Client.md](./Agent/Client.md#operation-suspension-cascade). `foregroundAgent` resumes operations. The cascade ordering (RcvNetwork → MsgDelivery → SndNetwork → Database) ensures that receiving stops first, then in-flight message delivery completes, then sending stops, and finally database operations complete. +`suspendAgent` has two modes: +- **Immediate** (`maxDelay = 0`): sets `ASSuspended` and suspends all operations immediately. +- **Gradual** (`maxDelay > 0`): sets `ASSuspending` and triggers the cascade (NtfNetwork independent; RcvNetwork → MsgDelivery → SndNetwork → Database). A timeout thread fires after `maxDelay` and forces suspension of sending and database if still suspending. + +`foregroundAgent` resumes in reverse order: database → sending → delivery → receiving → notifications. ## connectReplyQueues — background duplex upgrade Used during async command processing to complete the duplex handshake. Handles two cases: -- **Fresh connection** (`sq_ = Nothing`): upgrades `RcvConnection` to `DuplexConnection` by creating a new send queue. -- **SKEY retry** (`sq_ = Just sq`): connection is already duplex from a previous attempt. Reuses the existing send queue. - -Both paths then secure the queue and enqueue the confirmation. +- **Fresh connection** (`sq_ = Nothing`): upgrades `RcvConnection` to `DuplexConnection`. +- **SKEY retry** (`sq_ = Just sq`): connection is already duplex. See source comment: "in case of SKEY retry the connection is already duplex." ## secureConfirmQueue vs secureConfirmQueueAsync -Two paths for sending the confirmation message during duplex handshake: -- **secureConfirmQueue** (synchronous): secures the queue and sends confirmation directly via network. Used in `joinConnection` (foreground user-initiated path). -- **secureConfirmQueueAsync** (asynchronous): secures the queue, stores the confirmation in the database, and submits to the delivery worker. Used in `allowConnection` (background path via `ICAllowSecure`). +- **secureConfirmQueue** (synchronous): secures queue and sends confirmation directly via network. Used in `joinConnection`. +- **secureConfirmQueueAsync** (asynchronous): secures queue, stores confirmation, submits to delivery worker. Used in `allowConnection` (via `ICAllowSecure`). -Both call `agentSecureSndQueue` first, which returns whether the initiator's ratchet should be created on confirmation (v7+ behavior). +Both call `agentSecureSndQueue`, which returns `initiatorRatchetOnConf` — whether the initiator's ratchet should be created on confirmation (v7+ behavior). When the queue was already secured (retry), returns the same flag without re-securing. ## smpConfirmation — version compatibility -The confirmation handler accepts messages where the agent version or client version is either within the configured range OR at-or-below the already-agreed version. See source comment: "checking agreed versions to continue connection in case of client/agent version downgrades." This means a downgraded client can still complete in-progress handshakes. +The confirmation handler accepts messages where the agent version or client version is either within the configured range OR at-or-below the already-agreed version. See source comment. This means a downgraded client can still complete in-progress handshakes. ## smpInvitation — contact address handling -Invitation messages received on a contact address connection are passed through even if version-incompatible. See source comment: "show connection request even if invitation via contact address is not compatible." The client application sees the `REQ` event with `PQSupportOff` when incompatible, allowing it to display the request to the user (who may choose to respond from a compatible client). +Invitation messages received on a contact address are passed through even if version-incompatible. See source comment. The client application sees `REQ` with `PQSupportOff` when incompatible. + +## ackMessage' — receipt sending + +After ACKing a message, if the user provides receipt info (`rcptInfo_`), a receipt message (`A_RCVD`) is enqueued. Receipts are only allowed for `AM_A_MSG_` type. If the user ACKs without receipt info and the message already has a receipt with `MROk` status, the corresponding sent message is deleted from the database — it's confirmed delivered. + +## acceptContactAsync' — rollback on failure + +See source comment. Unlike the synchronous `acceptContact'` which takes a lock first, `acceptContactAsync'` marks the invitation as accepted before joining. On failure, `unacceptInvitation` rolls back. The comment notes this could be improved with an invitation lock map. + +## prepareConnectionToJoin — race prevention + +See source comment. Creates a connection record without queues, returning a `ConnId`. The caller saves this ID before the peer can send a confirmation. Without this, the sequence "joinConnection → peer sends confirmation → caller saves ConnId" could result in the confirmation arriving before the caller has the ID. diff --git a/spec/modules/Simplex/Messaging/Agent/Client.md b/spec/modules/Simplex/Messaging/Agent/Client.md index e5d83675b..f1f4965b6 100644 --- a/spec/modules/Simplex/Messaging/Agent/Client.md +++ b/spec/modules/Simplex/Messaging/Agent/Client.md @@ -8,7 +8,7 @@ ## Overview -This module defines `AgentClient`, the central state container for the messaging agent, and all reusable infrastructure that Agent.hs and other consumers (NtfSubSupervisor.hs, FileTransfer/Agent.hs, simplex-chat) build upon. It covers: +This module defines `AgentClient`, the central state container for the SimpleX agent, and all reusable infrastructure that Agent.hs and other consumers (NtfSubSupervisor.hs, FileTransfer/Agent.hs, simplex-chat) build upon. It covers: - **Protocol client lifecycle**: lazy singleton connections to SMP/NTF/XFTP routers via `SessionVar` pattern, with disconnect callbacks and reconnection workers - **Worker framework**: `getAgentWorker` (lifecycle, restart rate limiting, crash recovery) + `withWork`/`withWork_`/`withWorkItems` (task retrieval with doWork flag atomics) @@ -20,14 +20,17 @@ The module is consumed by Agent.hs (which passes specific worker bodies, task qu ## AgentClient — central state container -`AgentClient` has ~50 fields, almost all TVars or TMaps. Key architectural groupings: +`AgentClient` has ~43 fields, almost all TVars or TMaps. Key architectural groupings: - **Event queues**: `subQ` (events to client application), `msgQ` (messages from SMP routers) - **Protocol client pools**: `smpClients`, `ntfClients`, `xftpClients` — all are TMaps of `TransportSession` → `SessionVar`, implementing lazy singletons via `getSessVar` - **Subscription tracking**: `currentSubs` (TSessionSubs, active+pending per transport session), `removedSubs` (failed subscriptions with errors), `subscrConns` (set of connection IDs currently subscribed) -- **Worker pools**: `smpDeliveryWorkers`, `asyncCmdWorkers`, `smpSubWorkers` — TMaps keyed by work address/connection +- **Worker pools**: `smpDeliveryWorkers`, `asyncCmdWorkers` — TMaps keyed by work address/connection. `smpSubWorkers` — TMaps keyed by transport session for resubscription. - **Operation states**: `ntfNetworkOp`, `rcvNetworkOp`, `msgDeliveryOp`, `sndNetworkOp`, `databaseOp` -- **Locking**: `connLocks`, `invLocks`, `deleteLock`, `getMsgLocks` +- **Locking**: `connLocks`, `invLocks`, `deleteLock`, `getMsgLocks`, `clientNoticesLock` +- **Service state**: `useClientServices` (per-user boolean controlling whether service certificates are used) +- **Proxy routing**: `smpProxiedRelays` (maps destination transport session → proxy server used) +- **Network state**: `userNetworkInfo`, `userNetworkUpdated`, `useNetworkConfig` (slow/fast pair) All TVars are initialized in `newAgentClient`. The `active` TVar is the global kill switch — `closeAgentClient` sets it to `False`, and all protocol client getters check it first. @@ -36,22 +39,43 @@ All TVars are initialized in `newAgentClient`. The `active` TVar is the global k Protocol client connections (SMP, NTF, XFTP) use a lazy singleton pattern implemented by [Session.hs](../../../Session.md): 1. **`getSessVar`** atomically checks the TMap. Returns `Left newVar` if absent (caller must connect), `Right existingVar` if present (caller waits for the TMVar). -2. **`newProtocolClient`** wraps the connection attempt. On success, fills the `sessionVar` TMVar with `Right client`. On failure, fills with `Left (error, maybeRetryTime)` and re-throws. +2. **`newProtocolClient`** wraps the connection attempt. On success, fills the `sessionVar` TMVar with `Right client` and writes a `CONNECT` event to `subQ`. On failure, fills with `Left (error, maybeRetryTime)` and re-throws. 3. **`waitForProtocolClient`** reads the TMVar with a timeout. If the stored error has an expiry time that has passed, it removes the SessionVar and retries from scratch — this is the `persistErrorInterval` retry mechanism. +### Error caching with persistErrorInterval + +When `newProtocolClient` fails and `persistErrorInterval > 0`, the error is cached with an expiry timestamp (`Just ts`). Future connection attempts during the interval immediately receive the cached error from `waitForProtocolClient` without attempting a connection. When `persistErrorInterval == 0`, the SessionVar is removed immediately on failure, so the next attempt starts a fresh connection. This prevents connection storms to unreachable routers. + ### SessionVar compare-and-swap `removeSessVar` (Session.hs) only removes a SessionVar from the map if its `sessionVarId` matches the current entry. The `sessionVarId` is a monotonically increasing counter from `workerSeq`. This prevents a stale disconnection callback from removing a *new* client that was created after the old one disconnected. Without this, the sequence "client A disconnects → client B connects → client A's callback runs" would incorrectly remove client B. +### SMP connection — service credentials and session setup + +`smpConnectClient` connects an SMP client, with two important post-connection steps: + +1. **Session ID registration**: `SS.setSessionId` records the TLS session ID in `currentSubs`, linking the transport session to the actual TLS connection for later session validation. + +2. **Service credential synchronization** (`updateClientService`): After connecting, compares client-side and server-side service state. Four cases: + - Both have service and IDs match → update DB (no-op if same) + - Both have service but IDs differ → update DB and remove old queue-service associations + - Client has service, server doesn't → delete client service (handles server version downgrade) + - Server has service, client doesn't → log error (should not happen in normal flow) + +On connection failure, `smpConnectClient` triggers `resubscribeSMPSession` before re-throwing the error. This ensures pending subscriptions get retry logic even when the initial connection attempt fails. + ### SMP disconnect callback -`smpClientDisconnected` is the most complex disconnect handler (NTF/XFTP have simpler versions that just remove the SessionVar): +`smpClientDisconnected` is the most complex disconnect handler (NTF/XFTP have simpler versions that remove the SessionVar and write a `DISCONNECT` event): 1. `removeSessVar` atomically removes the client if still current 2. If `active`, moves active subscriptions to pending (only those matching the disconnecting client's `sessionId` — see next section) 3. Removes proxied relay sessions that this client created -4. Fires `DOWN` events for affected connections -5. Triggers `resubscribeSMPSession` to spawn a reconnection worker +4. Fires `DISCONNECT`, `DOWN`, and `SERVICE_DOWN` events for affected connections +5. Releases GET locks for affected queues +6. Triggers resubscription (see below) + +**Resubscription mode switching**: The disconnect handler chooses between two resubscription paths based on whether the session mode matches the entity presence: `(mode == TSMEntity) == isJust cId`. When they match, it calls `resubscribeSMPSession` which handles both service and queue resubscription in a single worker. When they don't match (e.g., entity-mode session disconnects but there's also a shared session), it separately resubscribes the service and queues, because they belong to different transport sessions. ### Session-aware subscription cleanup @@ -65,6 +89,8 @@ Unifies SMP/NTF/XFTP client management with associated types: SMP is special: `SMPConnectedClient` bundles the protocol client with `proxiedRelays :: TMap SMPServer ProxiedRelayVar`, a per-connection map of relay sessions for proxy routing. +XFTP is special in a different way: its `getProtocolServerClient` ignores the `NetworkRequestMode` parameter and always uses `NRMBackground` for `waitForProtocolClient`. This means XFTP connections always use background timing regardless of the caller's request mode. + ## Worker framework Defined here, consumed by Agent.hs, NtfSubSupervisor.hs, FileTransfer/Agent.hs, and simplex-chat. Two separable parts: @@ -75,8 +101,7 @@ Creates or reuses a worker for a given key. Workers are stored in a TMap keyed b - **Create-or-reuse**: atomically checks the map. If absent, creates a new `Worker` (with `doWork` TMVar pre-filled with `()`). If present and `hasWork=True`, signals the existing worker. - **Fork**: `runWorkerAsync` takes the `action` TMVar. If `Nothing` (worker idle), it starts work. If `Just weakThreadId` (worker running), it puts the value back and returns. This bracket ensures at-most-one concurrent execution. -- **Restart rate limiting**: on worker exit (success or error), checks `restartCount` against `maxWorkerRestartsPerMin`. If under the limit, restarts with `hasWorkToDo` signal. If over the limit, deletes the worker from the map and sends a `CRITICAL True` error. -- **Worker identity**: `workerId` (from `workerSeq`) prevents a stale restart from interfering with a new worker that replaced it in the map. +- **Restart rate limiting**: on worker exit (success or error), `restartOrDelete` checks `restartCount` against `maxWorkerRestartsPerMin`. If under the limit, resets `action` to `Nothing` (idle), signals `hasWorkToDo`, and reports `INTERNAL` error. If over the limit, deletes the worker from the map and sends a `CRITICAL True` error. The restart only happens if the worker's `workerId` still matches the map entry — a stale restart from a replaced worker silently no-ops. `getAgentWorker'` is the generic version with custom worker wrapper — used by `smpDeliveryWorkers` which pairs each Worker with a `TMVar ()` retry lock. @@ -90,16 +115,20 @@ Takes `getWork` (fetch next task) and `action` (process it) as separate paramete - **Work item error** (`isWorkItemError`): the worker stops and sends `CRITICAL False`. The next iteration would likely produce the same error, so stopping prevents infinite loops. - **Store error**: the flag is re-set and an `INTERNAL` error is reported. The assumption is that store errors are transient (e.g., DB busy) and retrying may succeed. -`withWorkItems` handles batched work — a list of items where some may have individual errors. If all items are work-item errors, the worker stops. If only some are, the worker continues with the successful items and reports errors. +`withWorkItems` handles batched work — a list of items where some may have individual errors. If all items are work-item errors, the worker stops. If only some are, the worker continues with the successful items and reports errors via `ERRS` event. ### runWorkerAsync — at-most-one execution Uses a bracket on the `action` TMVar: - `takeTMVar action` — blocks if another thread is starting the worker (TMVar empty during start) -- If the taken value is `Nothing` — worker is idle, start it. Store `Just weakThreadId` in the TMVar. +- If the taken value is `Nothing` — worker is idle, start it. Store `Just weakThreadId` in the TMVar via `forkIO`. - If `Just _` — worker is already running, put it back and return. -The `Weak ThreadId` in `action` is a weak reference — it doesn't prevent the worker thread from being garbage collected. This is the cleanup mechanism: if the thread dies without explicitly clearing `action`, the weak reference becomes stale and the next `runWorkerAsync` call will detect it as idle. +The `Weak ThreadId` in `action` is a weak reference — it doesn't prevent the worker thread from being garbage collected. It is used by `cancelWorker`, which calls `deRefWeak` to get the thread ID and kills it; if the thread was already GC'd, the kill is a no-op. The primary lifecycle management is through the `restartOrDelete` chain in `getAgentWorker'`, not the weak reference. + +### throwWhenNoDelivery — delivery worker self-termination + +Delivery workers call `throwWhenNoDelivery` to check if their entry still exists in the `smpDeliveryWorkers` map. If the worker was removed (delivery complete), it throws `ThreadKilled` to terminate the worker thread. This is distinct from `throwWhenInactive` (which checks global `active` state) — it allows individual workers to be stopped without shutting down the entire agent. ## Operation suspension cascade @@ -118,10 +147,12 @@ The cascade means: **`beginAgentOperation`** retries (blocks in STM) if the operation is suspended. This provides backpressure: new operations wait until the operation is resumed. -**`agentOperationBracket`** wraps an operation with begin/end. All database access goes through `withStore` which brackets with `AODatabase`. This ensures graceful shutdown propagates: suspending `AORcvNetwork` eventually suspends all downstream operations, and `notifySuspended` only fires when all in-flight operations have completed. +**`agentOperationBracket`** wraps an operation with begin/end. It takes a `check` function that runs before `beginAgentOperation` — typically `throwWhenInactive`, which throws `ThreadKilled` if the agent is inactive. All database access goes through `withStore` which brackets with `AODatabase`. This ensures graceful shutdown propagates: suspending `AORcvNetwork` eventually suspends all downstream operations, and `notifySuspended` only fires when all in-flight operations have completed. **`waitWhileSuspended`** vs **`waitUntilForeground`**: `waitWhileSuspended` proceeds during `ASSuspending` (allowing in-flight operations to complete), while `waitUntilForeground` blocks during both `ASSuspending` and `ASSuspended`. +**`waitForUserNetwork`**: bounded wait for network — if the network doesn't come online within `userNetworkInterval`, proceeds anyway. Uses `registerDelay` for the timeout. + ## Subscription management ### subscribeQueues — batch-by-transport-session @@ -133,17 +164,28 @@ The cascade means: 3. `addPendingSubs` marks all queues as pending before the RPC 4. `mapConcurrently` subscribes each session batch in parallel -### subscribeSessQueues_ — post-hoc session validation +### subscribeSessQueues_ — post-hoc session validation and atomicity -After the subscription RPC completes, `subscribeSessQueues_` validates `activeClientSession` — checking that the SessionVar still holds the same client that was used for the RPC. If the client was replaced during the RPC (reconnection happened), the results are discarded and resubscription is triggered. This is optimistic execution with post-hoc validation: do the work, then check if it's still valid. +After the subscription RPC completes, `subscribeSessQueues_` validates `activeClientSession` — checking that the SessionVar still holds the same client that was used for the RPC. If the client was replaced during the RPC (reconnection happened), the results are discarded (errors converted to temporary `BROKER NETWORK` to ensure retry) and resubscription is triggered. + +The post-RPC processing runs under `uninterruptibleMask_` for atomicity. The sequence is: +1. **Atomically**: `processSubResults` partitions results and updates subscription state; if there are client notices, takes `clientNoticesLock` TMVar +2. **IO**: `processRcvServiceAssocs` updates service associations in the DB +3. **IO**: `processClientNotices` updates notice state, always releases `clientNoticesLock` in `finally` + +The `clientNoticesLock` TMVar serializes notice processing across concurrent subscription batches. + +**UP events for newly-active connections only**: After processing, UP events are sent only for connections that were NOT already active before this batch — existing active subscriptions (from `SS.getActiveSubs`) are excluded to prevent duplicate notifications. + +**Client close on all-temporary-error**: When ALL subscription results are temporary errors, no connections were already active, and the session is still current, the SMP client session is closed. This forces a fresh connection on the next attempt rather than reusing a potentially broken one. ### processSubResults — partitioning -Subscription results are partitioned into four categories: -1. **Failed with client notice** — queue has a server-side notice (e.g., queue status change) -2. **Failed permanently** — non-temporary error, queue is removed from pending and added to `removedSubs` -3. **Failed temporarily** — error is transient, queue stays in pending for retry on reconnect -4. **Subscribed** — moved from pending to active. Further split into: queues whose service ID matches the session service (added as service-associated) and others. +Subscription results are partitioned into five categories: +1. **Failed with client notice** — error has an associated server-side notice (e.g., queue status change). Queue is treated as failed (removed from pending, added to `removedSubs`) AND the notice is recorded for processing. +2. **Failed permanently** — non-temporary error without notice, queue is removed from pending and added to `removedSubs` +3. **Failed temporarily** — error is transient, queue stays in pending unchanged for retry on reconnect +4. **Subscribed** — moved from pending to active. Further split into: queues whose service ID matches the session service (added as service-associated) and others. If the queue had a tracked `clientNoticeId`, it is cleared (notice resolved by successful subscription). 5. **Ignored** — queue was not in the pending map (already activated by a concurrent path), counted for statistics only ### Resubscription worker @@ -155,6 +197,8 @@ Subscription results are partitioned into four categories: 3. Resubscribes service and queues 4. Loops until no pending subs remain +**Spawn guard**: Before creating a new worker, `resubscribeSMPSession` checks `SS.hasPendingSubs`. If there are no pending subs, it returns without spawning. This prevents creating idle workers. + **Cleanup blocks on TMVar fill** — the `cleanup` STM action retries (`whenM (isEmptyTMVar $ sessionVar v) retry`) until the async handle is inserted. This prevents the race where cleanup runs before the worker async is stored, which would leave a terminated worker in the map. ## Proxy routing — sendOrProxySMPCommand @@ -166,6 +210,21 @@ Implements SMP proxy/direct routing with fallback: 3. If proxying fails with a host error and `smpProxyFallback` allows it: falls back to direct connection 4. `deleteRelaySession` carefully validates that the current relay session matches the one that failed before removing it (prevents removing a concurrently-created replacement session) +**NO_SESSION retry limit**: On `NO_SESSION`, `sendViaProxy` is called recursively with `Just proxySrv` to reuse the same proxy server. If the recursive call also gets `NO_SESSION`, it throws `proxyError` instead of recursing again — `proxySrv_` is `Just`, so the `Nothing` branch (which recurses) is not taken. This limits retry to exactly one attempt. + +**Proxy selection caching** (`smpProxiedRelays`): When `getSMPProxyClient` selects a proxy for a destination, it atomically inserts the proxy→destination mapping into `smpProxiedRelays`. If a mapping already exists (another thread selected a proxy for the same destination), the existing mapping is used. On relay creation failure with non-host errors, both the relay session and proxy mapping are removed. On host errors, they are preserved to allow fallback logic. + +## Service credentials lifecycle + +`getServiceCredentials` manages per-user, per-server service certificate credentials: + +1. Checks `useClientServices` — if the user has services disabled, returns `Nothing` +2. Looks up existing credentials in DB via `getClientServiceCredentials` +3. If none exist, generates new TLS credentials on-the-fly (`genCredentials`) and stores them +4. Extracts the private signing key from the X.509 certificate + +The generated credentials are Ed25519 self-signed certificates with `simplex` organization, valid for ~2740 years. The certificate chain and hash are bundled into `ServiceCredentials` for the SMP handshake. + ## withStore — database access bracket `withStore` wraps database access with `agentOperationBracket c AODatabase`, ensuring the operation suspension cascade is respected. SQLite errors are classified: @@ -174,6 +233,8 @@ Implements SMP proxy/direct routing with fallback: `SEAgentError` is a special wrapper that allows agent-level errors to be threaded through store operations — used when "transaction-like" access is needed but the operation involves agent logic, not just DB queries. See source comment: "network IO should NOT be used inside AgentStoreMonad." +`withStoreBatch` / `withStoreBatch'` run multiple DB operations in a single transaction, catching exceptions per-operation to report individual failures. The entire batch is within one `agentOperationBracket`. + ## Server selection — getNextServer / withNextSrv Server selection has two-level diversity: @@ -184,6 +245,14 @@ Server selection has two-level diversity: `withNextSrv` is designed for retry loops — it re-reads user servers on each call (allowing configuration changes during retries) and tracks `triedHosts` across attempts. When all hosts are tried, the tried set is reset (`S.empty`), creating a round-robin effect. +## Locking primitives + +**`withConnLock`**: Per-connection lock via `connLocks` TMap. Non-obvious: `withConnLock'` with empty `ConnId` is a no-op (identity function) — allows agent operations on entities without real connection IDs to skip locking. + +**`withConnLocks`**: Takes a `Set ConnId` and acquires locks for all connections. Uses `withGetLocks` which acquires all locks concurrently via `forConcurrently`. Note: concurrent acquisition of overlapping lock sets from different threads could theoretically deadlock, so callers must ensure non-overlapping lock sets or use a higher-level coordination. + +**`getMapLock`**: Creates a lock on first access and caches it in the TMap. Locks are never removed — the TMap grows monotonically. + ## Network configuration — slow/fast selection `getNetworkConfig` selects between slow and fast network configs based on `userNetworkInfo`: @@ -198,11 +267,15 @@ Both configs are stored together in `useNetworkConfig :: TVar (NetworkConfig, Ne 2. Closes all protocol server clients (SMP, NTF, XFTP) by swapping maps to empty and forking close threads 3. Clears proxied relays 4. Cancels resubscription workers — forks cancellation threads (fire-and-forget, `closeAgentClient` may return before all workers are cancelled) -5. Clears delivery and async command workers +5. Clears delivery and async command workers (delivery workers are also cancelled via `cancelWorker`) 6. Clears subscription state The cancellation of resubscription workers reads the TMVar first (to get the Async handle), then calls `uninterruptibleCancel`. This is wrapped in a forked thread to avoid blocking the shutdown sequence. +**`closeClient_` edge case**: When closing individual clients, `closeClient_` handles `BlockedIndefinitelyOnSTM` — which occurs if the SessionVar TMVar was never filled (connection attempt in progress when shutdown started). The exception is caught and treated as a no-op. + +**`reconnectServerClients` vs `closeProtocolServerClients`**: `closeProtocolServerClients` swaps the map to empty and closes all clients — no new connections can be made to those sessions. `reconnectServerClients` reads the map without clearing it and closes current clients — the disconnect callbacks trigger reconnection, effectively forcing fresh connections while keeping the session entries. + ## Transport session modes `TransportSessionMode` (`TSMEntity` vs other) determines whether the transport session key includes the entity ID (connection/queue ID). When `TSMEntity`, each queue gets its own TLS connection to the router. When not, queues to the same router share a connection. This is controlled by `sessionMode` in the network config. @@ -213,9 +286,15 @@ The cancellation of resubscription workers reads the TMVar first (to get the Asy `getQueueMessage` creates a TMVar lock keyed by `(server, rcvId)` and takes it before sending GET. This prevents concurrent GET and SUB on the same queue (SUB is checked via `hasGetLock` in `checkQueues`). The lock is released by `releaseGetLock` after ACK or on error. +The lock creation uses `TM.alterF` to atomically create-or-reuse: if no lock exists, creates a new `TMVar ()` and immediately takes it; if one exists, takes it. This avoids a race between two concurrent GET attempts on the same queue. + ## Error classification — temporaryAgentError Classifies errors as temporary (retryable) or permanent. Notable non-obvious classifications: - `TEHandshake BAD_SERVICE` is temporary — it indicates a DB error on the router, not a permanent rejection -- `CRITICAL True` is temporary — `True` means the error shows a restart button, implying the user should retry +- `CRITICAL True` is temporary — `True` means the error shows a restart button, implying the user should retry. `CRITICAL False` is permanent. - `INACTIVE` is temporary — the agent may be reactivated +- `SMP.PROXY NO_SESSION` via proxy is temporary — session can be re-established +- `SMP.STORE _` is temporary — server-side store error, not a client issue + +`temporaryOrHostError` extends `temporaryAgentError` to also include host-related errors (`HOST`, `TRANSPORT TEVersion`). Used in subscription management where host errors should trigger resubscription rather than permanent failure.