diff --git a/spec/README.md b/spec/README.md index 83ce5097c..7154aa957 100644 --- a/spec/README.md +++ b/spec/README.md @@ -60,5 +60,8 @@ Function documentation format: - [remote-control.md](remote-control.md) — XRCP implementation - [compression.md](compression.md) — Zstd compression +### Cross-cutting Features +- [rcv-services.md](rcv-services.md) — Service certificates for high-volume SMP clients (bulk subscription) + ### Security - [security-invariants.md](security-invariants.md) — All security invariants diff --git a/spec/rcv-services.md b/spec/rcv-services.md new file mode 100644 index 000000000..b0d97d9f7 --- /dev/null +++ b/spec/rcv-services.md @@ -0,0 +1,741 @@ +# Receive Services (Service Certificates) + +> Cross-cutting specification for the rcv-services feature: service certificates enabling high-volume SMP clients (notification routers, chat relays, directory services) to bulk-subscribe to queues. + +**Source branch**: `rcv-services` +**Protocol reference**: [`protocol/simplex-messaging.md`](../protocol/simplex-messaging.md) +**Phase**: 3.0a (Protocol + Transport + Server), 3.0b (Client + Agent + Store + NTF) + +## Overview + +A **service client** is a high-volume SMP client that presents a TLS client certificate during handshake. The server assigns it a persistent `ServiceId` derived from the certificate fingerprint. Individual queues are then **associated** with this ServiceId via per-queue `SUB` commands carrying a service signature. Once associated, the service client can **bulk-subscribe** all its queues in a single `SUBS` command instead of issuing per-queue `SUB` commands on each reconnection. + +This matters for notification servers, chat relays, and directory services that manage thousands to millions of queues per SMP server. Without service certificates, reconnection requires O(n) SUB commands; with them, it requires O(1) SUBS. 
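The reconnection cost described above can be made concrete with a small sketch. This is not the project's code: `Cmd`, `reconnectCmds`, and the string queue IDs are hypothetical stand-ins, and the `SUBS` constructor here carries only a count (the real command also carries an `idsHash`).

```haskell
import Data.Int (Int64)

-- Hypothetical, simplified types; the real protocol types live in Protocol.hs.
type QueueId = String

data Cmd
  = SUB QueueId -- per-queue subscription
  | SUBS Int64 -- bulk subscription (idsHash omitted in this sketch)
  deriving (Eq, Show)

-- Commands a client has to send after reconnecting.
-- Without service association: one SUB per queue, O(n).
-- With all queues already associated: a single SUBS, O(1).
reconnectCmds :: Bool -> [QueueId] -> [Cmd]
reconnectCmds associated qs
  | associated = [SUBS (fromIntegral (length qs))]
  | otherwise = map SUB qs
```

Loading this in GHCi, `length (reconnectCmds False qs)` grows with the number of queues, while `reconnectCmds True qs` always has exactly one element.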
+ +### Design summary + +``` +Service client SMP Server + | | + |---- TLS + service cert --------->| Three-way handshake + |<--- ServiceId -------------------| (Transport layer) + | | + |---- SUB + service sig ---------->| Per-queue association + |<--- SOK(ServiceId) --------------| (Protocol layer, one-time) + | ...repeat per queue... | + | | + |---- SUBS count idsHash --------->| Bulk subscribe + |<--- SOKS count' idsHash' --------| (count/hash from server) + |<--- MSG ... MSG ... MSG ---------| Buffered messages + |<--- ALLS ------------------------| All delivered + | | +``` + +## Version gates + +| Constant | Value | Gate | Source | +|----------|-------|------|--------| +| `serviceCertsSMPVersion` | 16 | Service handshake, `SOK`, `useServiceAuth` | Transport.hs:214 | +| `rcvServiceSMPVersion` | 19 | `SUBS`/`NSUBS` parameters, `SOKS`/`ENDS` idsHash, messaging service role in handshake | Transport.hs:223 | + +The two-version split means: +- v16-18 servers accept service certificates and per-queue `SUB` with service auth, but `SUBS`/`NSUBS` send no count/hash parameters (bare command tag only). +- v19+ servers send and receive full count + idsHash with `SUBS`/`NSUBS`/`SOKS`/`ENDS`. +- Messaging services (`SRMessaging`) are only included in the client handshake at v >= 19. Notifier services (`SRNotifier`) are included at v >= 16. + +## Types + +### ServiceId + +`ServiceId` is an `EntityId` (24-byte base64url-encoded identifier) assigned by the server during the three-way handshake. It is derived from the service certificate fingerprint via `getCreateService` in QueueStore. + +### SMPServiceRole + +```haskell +data SMPServiceRole = SRMessaging | SRNotifier | SRProxy +-- Wire: "M" | "N" | "P" +``` +Source: Transport.hs:594 + +### Party (service-related constructors) + +```haskell +data Party = ... | RecipientService | NotifierService | ... 
+``` +Source: Protocol.hs:335-346 + +The `ServiceParty` type family constrains to `RecipientService | NotifierService` only: +```haskell +type family ServiceParty (p :: Party) :: Constraint where + ServiceParty RecipientService = () + ServiceParty NotifierService = () + ServiceParty p = (Int ~ Bool, TypeError ...) -- compile-time error +``` +Source: Protocol.hs:430-434 + +### IdsHash + +16-byte XOR of MD5 hashes, used for drift detection between client and server subscription state. + +```haskell +newtype IdsHash = IdsHash {unIdsHash :: BS.ByteString} + +instance Semigroup IdsHash where + (IdsHash s1) <> (IdsHash s2) = IdsHash $! BS.pack $ BS.zipWith xor s1 s2 + +instance Monoid IdsHash where + mempty = IdsHash $ BS.replicate 16 0 + +queueIdHash :: QueueId -> IdsHash +queueIdHash = IdsHash . C.md5Hash . unEntityId +``` +Source: Protocol.hs:1501-1526 + +**Key property**: XOR is self-inverse, so `addServiceSubs` and `subtractServiceSubs` both use `<>` (XOR) for the hash component: +```haskell +addServiceSubs (n', idsHash') (n, idsHash) = (n + n', idsHash <> idsHash') +subtractServiceSubs (n', idsHash') (n, idsHash) + | n > n' = (n - n', idsHash <> idsHash') + | otherwise = (0, mempty) +``` +Source: Protocol.hs:1528-1534 + +### ServiceSub / ServiceSubResult / ServiceSubError + +Client-side types for comparing expected vs actual subscription state: +```haskell +data ServiceSub = ServiceSub + { smpServiceId :: ServiceId, + smpQueueCount :: Int64, + smpQueueIdsHash :: IdsHash } + +data ServiceSubResult = ServiceSubResult (Maybe ServiceSubError) ServiceSub + +data ServiceSubError + = SSErrorServiceId {expectedServiceId, subscribedServiceId :: ServiceId} + | SSErrorQueueCount {expectedQueueCount, subscribedQueueCount :: Int64} + | SSErrorQueueIdsHash {expectedQueueIdsHash, subscribedQueueIdsHash :: IdsHash} +``` +Source: Protocol.hs:1476-1499 + +`serviceSubResult` compares expected vs actual, returning the first mismatch (priority: serviceId > count > idsHash). 
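The mismatch priority described above (serviceId, then count, then idsHash) can be sketched as follows. This is an illustration under simplifying assumptions, not the source of `serviceSubResult`: `ServiceId` and `IdsHash` are modelled as plain strings, the error constructors take positional fields, and `firstMismatch` is a hypothetical name.

```haskell
import Data.Int (Int64)

-- Simplified stand-ins: the real ServiceId and IdsHash wrap ByteStrings.
type ServiceId = String
type IdsHash = String

data ServiceSub = ServiceSub
  { smpServiceId :: ServiceId,
    smpQueueCount :: Int64,
    smpQueueIdsHash :: IdsHash
  }
  deriving (Eq, Show)

-- Each constructor holds (expected, subscribed).
data ServiceSubError
  = SSErrorServiceId ServiceId ServiceId
  | SSErrorQueueCount Int64 Int64
  | SSErrorQueueIdsHash IdsHash IdsHash
  deriving (Eq, Show)

-- Return the first mismatch, checking serviceId, then count, then hash.
firstMismatch :: ServiceSub -> ServiceSub -> Maybe ServiceSubError
firstMismatch expected subscribed
  | smpServiceId expected /= smpServiceId subscribed =
      Just $ SSErrorServiceId (smpServiceId expected) (smpServiceId subscribed)
  | smpQueueCount expected /= smpQueueCount subscribed =
      Just $ SSErrorQueueCount (smpQueueCount expected) (smpQueueCount subscribed)
  | smpQueueIdsHash expected /= smpQueueIdsHash subscribed =
      Just $ SSErrorQueueIdsHash (smpQueueIdsHash expected) (smpQueueIdsHash subscribed)
  | otherwise = Nothing
```

Because the guards are evaluated in order, a response that differs in both ServiceId and count reports only the ServiceId mismatch.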
+ +### STMService (QueueStore) + +```haskell +data STMService = STMService + { serviceRec :: ServiceRec, + serviceRcvQueues :: TVar (Set RecipientId, IdsHash), + serviceNtfQueues :: TVar (Set NotifierId, IdsHash) } +``` +Source: QueueStore/STM.hs:64-68 + +Tracks the set of queue IDs and their cumulative XOR hash per service, per role (receive vs notify). + +## Transport layer: service handshake + +### Three-way handshake + +Standard SMP handshake is two messages: server sends `SMPServerHandshake`, client sends `SMPClientHandshake`. Service clients extend this to three messages: + +1. **Server -> Client**: `SMPServerHandshake` (standard, with session ID and auth key) +2. **Client -> Server**: `SMPClientHandshake` with `clientService :: Maybe SMPClientHandshakeService` +3. **Server -> Client**: `SMPServerHandshakeResponse {serviceId}` or `SMPServerHandshakeError {handshakeError}` + +Source: Transport.hs:752-791 (server), Transport.hs:796-848 (client) + +### SMPClientHandshakeService + +```haskell +data SMPClientHandshakeService = SMPClientHandshakeService + { serviceRole :: SMPServiceRole, + serviceCertKey :: CertChainPubKey } +``` +Source: Transport.hs:582-585 + +The `serviceCertKey` contains the TLS client certificate chain and a proof-of-possession: the service's Ed25519 session key signed by the service's X.509 signing key (`C.signX509 serviceSignKey $ C.publicToX509 k`). + +### Server-side validation (`getClientService`) + +1. Verify certificate chain matches TLS peer certificate: `getPeerCertChain c == cc` +2. Extract identity certificate and service key from chain +3. Verify signed session key: `C.verifyX509 serviceCertKey exact` +4. Compute fingerprint: `XV.getFingerprint idCert X.HashSHA256` +5. Call `getService` callback (QueueStore.getCreateService) to get/create ServiceId +6. 
Send `SMPServerHandshakeResponse {serviceId}` back to client + +Source: Transport.hs:775-791 + +### Client-side reception (`getClientService`) + +Client receives either `SMPServerHandshakeResponse {serviceId}` (success) or `SMPServerHandshakeError {handshakeError}` (failure). On success, stores `THClientService {serviceId, serviceRole, serviceCertHash, serviceKey}`. + +Source: Transport.hs:843-847 + +### Version-gated service role filtering (`mkClientService`) + +```haskell +mkClientService v (ServiceCredentials {serviceRole, ...}, (k, _)) + | serviceRole == SRMessaging && v < rcvServiceSMPVersion = Nothing + | otherwise = Just SMPClientHandshakeService {..} +``` +Source: Transport.hs:838-842 + +Messaging services are suppressed below v19. Notifier services are sent at v16+. + +### ServiceCredentials (client-side persistent state) + +```haskell +data ServiceCredentials = ServiceCredentials + { serviceRole :: SMPServiceRole, + serviceCreds :: T.Credential, -- TLS certificate + private key + serviceCertHash :: XV.Fingerprint, + serviceSignKey :: C.APrivateSignKey } +``` +Source: Transport.hs:587-592 + +## Protocol layer: commands and messages + +### Commands + +| Command | Party | Entity | Auth | Description | +|---------|-------|--------|------|-------------| +| `SUB` | Recipient | QueueId | Queue key + optional service sig | Subscribe single queue; if service sig present, associates queue with service | +| `NSUB` | Notifier | NotifierId | Queue key + optional service sig | Subscribe single notifier; if service sig present, associates with service | +| `NEW` | Creator | NoEntity | Queue key + optional service sig | Create queue; if service sig present, associates at creation | +| `SUBS count idsHash` | RecipientService | ServiceId | Service session key | Bulk-subscribe all associated receive queues | +| `NSUBS count idsHash` | NotifierService | ServiceId | Service session key | Bulk-subscribe all associated notifier queues | + +### Double authenticator 
(`useServiceAuth`) + +Only `NEW`, `SUB`, and `NSUB` carry a service signature (when sent from a service connection): +```haskell +useServiceAuth = \case + Cmd _ (NEW _) -> True + Cmd _ SUB -> True + Cmd _ NSUB -> True + _ -> False +``` +Source: Protocol.hs:1737-1742 + +For these commands, `tEncodeAuth` appends both the primary queue key signature and an optional service Ed25519 signature. `SUBS`/`NSUBS` use the ServiceId as entity and are signed only by the service session key. + +### Broker messages (responses) + +| Message | Fields | Description | +|---------|--------|-------------| +| `SOK` | `Maybe ServiceId` | Per-queue subscription success; `Just serviceId` when queue was associated with service | +| `SOKS` | `Int64, IdsHash` | Bulk subscription success; server's actual count and hash | +| `ALLS` | (none) | Marker: all buffered messages for this SUBS have been delivered | +| `END` | (none) | Per-queue subscription ended (another client subscribed) | +| `ENDS` | `Int64, IdsHash` | Service subscription ended (another service client took over); server's count and hash at takeover time | + +### Wire encoding (version-dependent) + +**SUBS/NSUBS encoding:** +``` +v >= 19: tag SP count idsHash +v < 19: tag (bare, no parameters) +``` +Source: Protocol.hs:1769-1771, 1787-1789 + +**SOKS/ENDS encoding:** +``` +v >= 19: tag SP count idsHash +v < 19: tag SP count (no idsHash) +``` +Source: Protocol.hs:1951-1953 + +**SOKS/ENDS decoding:** +``` +v >= 19: tag -> resp <$> _smpP <*> smpP (count + idsHash) +v < 19: tag -> resp <$> _smpP <*> pure mempty (count only, mempty hash) +``` +Source: Protocol.hs:1996-1998 + +## Server layer + +### Client state (Env/STM.hs) + +Each connected client tracks: +```haskell +data Client s = Client + { ... + serviceSubscribed :: TVar Bool, -- has SUBS been received? + ntfServiceSubscribed :: TVar Bool, -- has NSUBS been received? 
+ serviceSubsCount :: TVar (Int64, IdsHash), -- running (count, hash) for receive queues + ntfServiceSubsCount :: TVar (Int64, IdsHash), -- running (count, hash) for notifier queues + ... } +``` +Source: Env/STM.hs:437-456 + +Server-global state: +```haskell +data ServerSubscribers s = ServerSubscribers + { subQ :: TQueue (ClientSub, ClientId), + queueSubscribers :: SubscribedClients s, -- per-queue lookup + serviceSubscribers :: SubscribedClients s, -- per-service lookup + totalServiceSubs :: TVar (Int64, IdsHash), -- global service sub count + subClients :: TVar IntSet, + pendingEvents :: TVar (IntMap (NonEmpty (EntityId, BrokerMsg))) } +``` +Source: Env/STM.hs:362-369 + +### ClientSub events + +```haskell +data ClientSub + = CSClient QueueId (Maybe ServiceId) (Maybe ServiceId) -- prev and new service IDs + | CSDeleted QueueId (Maybe ServiceId) -- prev service ID + | CSService ServiceId (Int64, IdsHash) -- service subscription change +``` +Source: Env/STM.hs:426-429 + +These are enqueued into `subQ` and processed by `serverThread` (the subscription event loop). + +### SUBS command flow + +``` +Client sends SUBS count idsHash + | + v +subscribeServiceMessages(serviceId, (count, idsHash)) Server.hs:1800 + | + +-- sharedSubscribeService(SRecipientService, ...) 
Server.hs:1849 + | | + | +-- If already subscribed: return cached (count, hash) + | | + | +-- First time: + | +-- getServiceQueueCountHash(party, serviceId) QueueStore + | | -> returns server's actual (count', idsHash') + | | + | +-- atomically: + | | writeTVar clientServiceSubscribed True + | | writeTVar clientServiceSubs (count', idsHash') + | | + | +-- Compute drift stats: + | | count == -1 && match -> srvSubOk++ (old NTF server) + | | diff > 0 -> srvSubMore++ (server has more) + | | diff < 0 -> srvSubFewer++ (server has fewer) + | | otherwise -> srvSubDiff++ (count match, hash mismatch) + | | + | +-- Enqueue CSService event to subQ + | + +-- If not already subscribed: + | fork "deliverServiceMessages" Server.hs:1806 + | | + | +-- foldRcvServiceMessages(serviceId, deliverQueueMsg, acc) + | | MsgStore + | +-- For each queue in service: + | | +-- Read queue record + first pending message + | | +-- Call deliverQueueMsg(acc, rId, result) Server.hs:1822 + | | | + | | +-- Error -> accumulate ERR + | | +-- No message -> skip + | | +-- Has message: + | | +-- getSubscription(rId) Server.hs:1835 + | | | If sub exists -> Nothing (skip, already delivering) + | | | Else -> create new Sub, insert in subscriptions + | | +-- setDelivered sub msg + | | +-- writeTBQueue msgQ [(corrId, rId, MSG ...)] + | | + | +-- After fold: write ALLS to msgQ + | + +-- Return SOKS count' idsHash' +``` + +### Per-queue SUB with service association + +`sharedSubscribeQueue` handles four cases (Server.hs:1738-1798): + +**Case 1: Service client, queue already associated with this service** (`queueServiceId == Just serviceId`) +- Duplicate association (retry after timeout/error) +- If no service sub exists yet, increment service queue count and enqueue CSClient +- Stats: `srvAssocDuplicate++` + +**Case 2: Service client, queue not yet associated** (new or different service) +- Call `setQueueService(queue, party, Just serviceId)` to update QueueStore +- Increment client's `serviceSubsCount` by `(1, 
queueIdHash rId)` +- Enqueue CSClient event +- Stats: `srvAssocNew++` or `srvAssocUpdated++` + +**Case 3: Non-service client, queue has service association** (downgrade) +- Call `setQueueService(queue, party, Nothing)` to remove association +- Stats: `srvAssocRemoved++` +- Create normal per-queue subscription + +**Case 4: Non-service client, no service association** (standard SUB) +- Create/return per-queue subscription as normal + +### Message delivery for service queues + +When a new message arrives for a queue (`tryDeliverMessage`, Server.hs:1985-2024): + +```haskell +getSubscribed = case rcvServiceId qr of + Just serviceId -> getSubscribedClient serviceId $ serviceSubscribers subscribers + Nothing -> getSubscribedClient rId $ queueSubscribers subscribers +``` + +If the queue has `rcvServiceId`, the server looks up the subscriber in `serviceSubscribers` (by ServiceId) rather than `queueSubscribers` (by QueueId). + +**On-demand Sub creation** (`newServiceDeliverySub`, Server.hs:2019-2024): When a message arrives for a service queue but no `Sub` exists in the client's `subscriptions` TMap, one is created on the fly. This handles messages arriving after SUBS but before the fold reaches that queue. 
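Both the fold and the on-demand path described above may try to create a Sub for the same queue. A minimal sketch of the check-then-insert pattern inside a single STM transaction is shown here; `Sub`, `createSubIfMissing`, and `demo` are hypothetical names, and the real `Sub` carries more state than this placeholder.

```haskell
import Control.Concurrent.STM (STM, TVar, atomically, modifyTVar', newTVarIO, readTVar)
import qualified Data.Map.Strict as M

-- Hypothetical stand-ins for the server's queue ID and Sub types.
type QueueId = String

newtype Sub = Sub {deliveredMsg :: Maybe String}
  deriving (Eq, Show)

-- Create a Sub for the queue unless one already exists.
-- Returns True if this call created it, False if another path got there first.
-- The lookup and the insert run in one STM transaction, so two callers
-- cannot both create a Sub for the same queue.
createSubIfMissing :: TVar (M.Map QueueId Sub) -> QueueId -> STM Bool
createSubIfMissing subs qId = do
  m <- readTVar subs
  if M.member qId m
    then pure False
    else do
      modifyTVar' subs (M.insert qId (Sub Nothing))
      pure True

-- Simulates the fold and the delivery path reaching the same queue in turn.
demo :: IO (Bool, Bool)
demo = do
  subs <- newTVarIO M.empty
  viaFold <- atomically (createSubIfMissing subs "q1")
  viaDelivery <- atomically (createSubIfMissing subs "q1")
  pure (viaFold, viaDelivery)
```

In this sketch whichever caller commits first creates the Sub and the other observes it and skips, regardless of which path that is.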
+ +### serverThread subscription event loop + +`serverThread` (Server.hs:250-351) processes `ClientSub` events from `subQ`: + +**CSClient** (per-queue subscription): +- If service association changed: end previous service subscription for that queue +- If new service: increment `totalServiceSubs`, end any per-queue subscriber, cancel previous service subscriber +- If no service: standard per-queue upsert + +**CSDeleted** (queue deletion): +- End both queue and service subscriptions + +**CSService** (bulk SUBS): +- Subtract changed subs from `totalServiceSubs` (because the client already has them counted) +- Cancel previous service subscriber for this ServiceId (sends ENDS to old client) + +**Service takeover** (`cancelServiceSubs`, Server.hs:317-321): +When a new service client subscribes (same ServiceId), the previous client's service subs are zeroed out: +```haskell +cancelServiceSubs serviceId = checkAnotherClient $ \c -> do + changedSubs <- swapTVar (clientServiceSubs c) (0, mempty) + pure [(c, CSADecreaseSubs changedSubs, (serviceId, ENDS n idsHash))] +``` +The previous client receives `ENDS count idsHash`. + +### Client disconnect cleanup + +`clientDisconnected` (Server.hs:1090-1121): +1. Set `connected = False` +2. Swap out all subscriptions and ntf subscriptions (clear TMap) +3. Cancel per-queue Subs +4. Update `queueSubscribers` (delete per-queue entries) and `serviceSubscribers` (delete service entry) +5. Subtract client's `serviceSubsCount` from `totalServiceSubs` +6. Kill delivery threads + +**Queue-service associations persist**: Only live subscription state is cleaned up. The `rcvServiceId` field on `QueueRec` and the `STMService` queue sets survive disconnect. On reconnection, `SUBS` resubscribes without re-associating. + +### Notification service subscription (`NSUBS`) + +`subscribeServiceNotifications` (Server.hs:1845-1847) is a thin wrapper around `sharedSubscribeService` with `SNotifierService` party. 
Unlike `SUBS`, it does NOT fork a delivery thread -- notification delivery is handled by the separate `deliverNtfsThread`. + +`deliverNtfsThread` (Server.hs:353) periodically scans `subClients` (which includes service subscribers) and delivers pending notifications. + +## QueueStore layer + +### getCreateService + +Lookup by certificate fingerprint; create if not found (Server/QueueStore/STM.hs:284-310): +1. `TM.lookup fp serviceCerts` -- fast IO lookup +2. If miss: STM transaction to double-check and create +3. If hit: verify service role matches; error `SERVICE` on role mismatch +4. On new service: log via store log + +### setQueueService + +Updates the `rcvServiceId` (or `ntfServiceId`) field on a `QueueRec` and maintains the service's queue set (Server/QueueStore/STM.hs:312-338): +1. Read queue record +2. If same service -> no-op +3. If different: `removeServiceQueue` from old, `addServiceQueue` to new +4. Update `QueueRec` in-place + +### addServiceQueue / removeServiceQueue + +Both use `setServiceQueues_` which XORs the queue's `queueIdHash` into the service's running hash (Server/QueueStore/STM.hs:383-398): +```haskell +update (s, idsHash) = + let !s' = updateSet qId s -- Set insert/delete + !idsHash' = queueIdHash qId <> idsHash -- XOR (self-inverse) + in (s', idsHash') +``` + +## Test coverage + +### Existing tests (ServerTests.hs) + +| Test | Lines | What it covers | +|------|-------|----------------| +| `testServiceDeliverSubscribe` | 682-742 | Create queue as service, reconnect, SUBS, message delivery, ALLS | +| `testServiceUpgradeAndDowngrade` | 744-859 | Regular SUB -> service SUB -> SUBS -> downgrade back to regular SUB | +| `testMessageServiceNotifications` | 1313-1388 | NSUB with service, service takeover (ENDS), NSUBS bulk subscribe | +| `testServiceNotificationsTwoRestarts` | 1390-1434 | NSUBS persistence across two server restarts | + +### Test gaps + +| Gap | Severity | Description | +|-----|----------|-------------| +| **TG-SVC-01** | High | 
No concurrent SUBS + regular SUB on same queue -- race between fold delivery and per-queue subscription | +| **TG-SVC-02** | High | No queue deletion during SUBS fold -- what happens when a queue is deleted mid-fold? | +| **TG-SVC-03** | Medium | No duplicate SUBS test -- what if client sends SUBS twice? (code returns cached count) | +| **TG-SVC-04** | Medium | No drift detection verification -- no test checks that stats are actually logged on count/hash mismatch | +| **TG-SVC-05** | Medium | No SUBS with 0 queues -- edge case where service has no associated queues | +| **TG-SVC-06** | Medium | No concurrent message delivery during fold -- messages sent while fold is in progress | +| **TG-SVC-07** | Low | No large-scale test -- fold performance with 10k+ queues | +| **TG-SVC-08** | Low | No test for `subtractServiceSubs` underflow (`n <= n'` -> `(0, mempty)`) | + +## Security invariants + +| ID | Invariant | Enforced by | Test | +|----|-----------|-------------|------| +| **SI-SVC-01** | Service certificate must match TLS peer certificate | `getClientService`: `getPeerCertChain c == cc` | Implicit in all service tests | +| **SI-SVC-02** | Service session key proof-of-possession: signed by X.509 key | `C.verifyX509 serviceCertKey exact` in `getClientService` | Implicit | +| **SI-SVC-03** | Only NEW, SUB, NSUB carry service signature | `useServiceAuth` pattern match | testServiceDeliverSubscribe (ERR SERVICE on unsigned) | +| **SI-SVC-04** | SUBS/NSUBS require service session key, not queue key | Entity is ServiceId, auth is service key | testServiceDeliverSubscribe (ERR CMD NO_AUTH on wrong key) | +| **SI-SVC-05** | Service role mismatch rejected | `getCreateService`: role check -> `Left SERVICE` | testServiceDeliverSubscribe (ERR SERVICE on wrong role) | +| **SI-SVC-06** | Non-service client cannot send SUBS | `ERR SERVICE` when no service handshake | testServiceUpgradeAndDowngrade (ERR SERVICE on plain client) | +| **SI-SVC-07** | Queue-service associations 
persist across disconnect | `clientDisconnected` only clears live state | testServiceNotificationsTwoRestarts | +| **SI-SVC-08** | Service takeover sends ENDS to previous client | `cancelServiceSubs` -> ENDS | testMessageServiceNotifications | +| **SI-SVC-09** | Drift is informational only -- server never rejects | `sharedSubscribeService` logs stats, always returns subs | No direct test (TG-SVC-04) | + +## Identified risks + +| ID | Risk | Severity | Description | +|----|------|----------|-------------| +| **R-SVC-01** | Postgres fold full table scan | High | `foldRcvServiceMessages` (Postgres.hs:127-139) uses `ROW_NUMBER() OVER (PARTITION BY recipient_id ORDER BY message_id ASC)` as a subquery joined to `msg_queues`. This window function scans the **entire `messages` table** before filtering. For a service with 100k+ queues and millions of messages, this query can be very slow. The STM backend iterates an in-memory Set (fast), and the Journal backend uses per-queue file locks (moderate). Only the Postgres path has this scaling problem. Consider rewriting to use a lateral join or per-queue subquery to avoid the full-table window. | +| **R-SVC-02** | `totalServiceSubs` accounting drift | Low | `totalServiceSubs` is incremented by `serverThread` when processing CSClient events (line 281), but `clientDisconnected` subtracts the full `clientServiceSubs` (line 1120) which was eagerly updated by `sharedSubscribeQueue`. If CSClient events are still pending in `subQ` at disconnect time, `totalServiceSubs` is decremented for increments that never happened, causing negative drift. `totalServiceSubs` is never read for any decision (only written), so this is cosmetic. Resets on server restart. Consider periodic reconciliation or removing the counter if unused. | +| **R-SVC-03** | Fold thread continues after service takeover | Needs analysis | When a second service client connects (same cert), `cancelServiceSubs` sends ENDS to the old client. 
But the old client's `deliverServiceMessages` fold thread (forked via `forkClient`, tracked in `endThreads`) keeps running -- it writes MSG to the old client's `msgQ` (captured in closure). The old client receives and can ACK these messages. After ALLS the thread exits. New messages route to the new client via `tryDeliverMessage`. Questions: (1) Can the old client's ACKs interfere with the new client's subscription state? (2) If the old client disconnects mid-fold, `clientDisconnected` kills the fold thread (line 1111) -- are partially-delivered Subs cleaned up correctly? (3) Could the fold's `getSubscription` (which inserts into old client's `subscriptions`) conflict with the old client's subscription TMap being swapped out by `clientDisconnected`? | +| **R-SVC-04** | Cert rotation = full re-association | Medium (operational) | `getCreateService` maps cert fingerprint -> ServiceId. A new cert = new fingerprint = new ServiceId. All existing queue associations remain on the old ServiceId. The service must re-SUB every queue with the new service signature -- O(n), exactly the cost SUBS was designed to avoid. Old fingerprint->ServiceId mappings remain in memory/DB (no GC). For a notification server with millions of queues, cert rotation means a full re-association storm. | +| **R-SVC-05** | Fold blocking | Low | `foldRcvServiceMessages` iterates all service queues sequentially, reading queue records and first messages. For services with many queues, this could take significant time. It runs in a forked thread, so it doesn't block the client's command processing, but the ALLS marker is delayed. No progress signal between SOKS and ALLS -- client doesn't know how many messages to expect. | +| **R-SVC-06** | XOR hash collision | Very Low | IdsHash uses XOR of MD5 hashes. XOR is commutative and associative, so different queue sets with the same XOR-combined hash would not be detected. 
Given 16-byte hashes, collision probability is negligible for realistic queue counts, but the hash provides no ordering information. | +| **R-SVC-07** | Count underflow in subtractServiceSubs | Very Low | If `n <= n'`, the function returns `(0, mempty)` -- a full reset. This is a defensive fallback but could mask accounting errors. | + +### Considered and dismissed + +- **Fold-delivery race**: Both the fold's `getSubscription` (Server.hs:1828) and `newServiceDeliverySub` (Server.hs:1999-2023) operate on the same `subscriptions clnt` TMap within `atomically` blocks. STM serialization ensures at most one creates the Sub; the other sees it and skips. No race exists. +- **Sub accumulation during fold**: Each service queue with a pending message gets a Sub created in the client's `subscriptions` TMap. This is necessary and correct -- the Sub holds the `delivered` TVar for ACK verification and `subThread` for delivery state. Without per-queue Subs the server cannot track what was delivered or verify ACKs. Subs are cleaned on ACK or disconnect. +- **Store log replay ordering**: `writeQueueStore` writes all services before queues. `addQueue_` (QueueStore/STM.hs:119-132) calls `addServiceQueue` when `rcvServiceId` is present in QueueRec, so snapshot replay correctly rebuilds STMService queue sets. Incremental `QueueService` log entries are always preceded by `NewService` because the handshake (which creates the service) happens before SUB (which associates queues). No ordering issue. 
+ +--- + +## SMP Client layer (Client.hs) + +### Service subscription command + +```haskell +subscribeService :: (PartyI p, ServiceParty p) => SMPClient -> SParty p -> Int64 -> IdsHash -> ExceptT SMPClientError IO ServiceSub +subscribeService c party n idsHash = case smpClientService c of + Just THClientService {serviceId, serviceKey} -> do + sendSMPCommand c NRMBackground (Just (C.APrivateAuthKey C.SEd25519 serviceKey)) serviceId subCmd >>= \case + SOKS n' idsHash' -> pure $ ServiceSub serviceId n' idsHash' + r -> throwE $ unexpectedResponse r + where subCmd = case party of + SRecipientService -> SUBS n idsHash + SNotifierService -> NSUBS n idsHash + Nothing -> throwE PCEServiceUnavailable +``` +Source: Client.hs:921-934 + +Entity is `serviceId`, auth key is the service session key (Ed25519). The client passes its expected count and hash; the server returns its own. + +### Per-queue SUB with service + +`subscribeSMPQueue` (Client.hs:843-846) and `subscribeSMPQueues` (Client.hs:850-855) send `SUB` commands. The response handler `processSUBResponse_` (Client.hs:867-872) accepts both `OK` (no service) and `SOK serviceId_` (service-associated). + +`nsubResponse_` (Client.hs:914-918) does the same for `NSUB`. + +### Dual signature scheme (`authTransmission`) + +When `serviceAuth = True` and `useServiceAuth` returns True for the command (Client.hs:1385-1403): + +1. The entity key signs over `serviceCertHash || transmission` (not just transmission) +2. The service key signs over `transmission` alone + +This prevents MITM service substitution inside TLS: an attacker cannot replace the service certificate hash without invalidating the entity key signature. 
+ +```haskell +(t', serviceSig) = case clientService =<< thAuth of + Just THClientService {serviceCertHash = XV.Fingerprint fp, serviceKey} | serviceAuth -> + (fp <> t, Just $ C.sign' serviceKey t) + _ -> (t, Nothing) +``` +Source: Client.hs:1398-1401 + +### Service runtime accessors + +```haskell +smpClientService :: SMPClient -> Maybe THClientService +smpClientService = thAuth . thParams >=> clientService + +smpClientServiceId :: SMPClient -> Maybe ServiceId +smpClientServiceId = fmap (\THClientService {serviceId} -> serviceId) . smpClientService +``` +Source: Client.hs:936-942 + +### Configuration + +`ProtocolClientConfig` (Client.hs:466-483) carries `serviceCredentials :: Maybe ServiceCredentials`. On handshake, the client generates a fresh Ed25519 key pair per connection and signs it with the service's X.509 key (via `mkClientService`). + +The `serviceAuth` flag is set to `thVersion >= serviceCertsSMPVersion` (Client.hs:230), enabling dual signatures on v16+ connections for the commands selected by `useServiceAuth` (`NEW`, `SUB`, `NSUB`). + +## Agent layer + +### Agent events + +Four service-specific events (Agent/Protocol.hs:401-404): + +| Event | Payload | When | +|-------|---------|------| +| `SERVICE_UP` | `SMPServer, ServiceSubResult` | SUBS succeeded; carries drift info | +| `SERVICE_DOWN` | `SMPServer, ServiceSub` | Server disconnected while service was subscribed | +| `SERVICE_ALL` | `SMPServer` | ALLS received — all buffered messages delivered | +| `SERVICE_END` | `SMPServer, ServiceSub` | ENDS received — another service client took over | + +### Service subscription flow (`Agent/Client.hs`) + +``` +subscribeClientService(c, withEvent, userId, srv, serviceSub) Client.hs:1743 + | + +-- withServiceClient(c, tSess, ...) 
Client.hs:1752 + | | + | +-- Get SMPClient for tSess + | +-- Check smpClientServiceId is Just -> smpServiceId + | + +-- setPendingServiceSub(tSess, serviceSub, currentSubs) TSessionSubs + | + +-- subscribeClientService_(c, withEvent, tSess, smp, serviceSub) Client.hs:1760 + | + +-- subscribeService smp SRecipientService n idsHash -> ServiceSub + +-- serviceSubResult expected subscribed -> ServiceSubResult + +-- atomically: setActiveServiceSub(tSess, sessId, subscribed) + +-- if withEvent: notify SERVICE_UP srv result +``` + +### Reconnection / resubscription (`Agent/Client.hs:1727-1740`) + +On service subscription failure during resubscription: +- `SSErrorServiceId` (server returned different ServiceId): fall back to `unassocSubscribeQueues` — removes all service associations for this server and resubscribes queues individually +- `clientServiceError`: same fallback +- Other errors: propagated + +### Startup subscription (`Agent.hs:1622-1641`) + +At agent startup, `subscribeService` is called in parallel per server. On `SSErrorServiceId` or `SSErrorQueueCount {n > 0, n' == 0}` (service exists but has no queues): falls back to unassociating queues and resubscribing individually. + +### Server disconnection (`Agent/Client.hs:787-800`) + +`serverDown` emits `SERVICE_DOWN`, then resubscribes: +- If session mode matches: full `resubscribeSMPSession` +- Otherwise: `resubscribeClientService` for service, then `subscribeQueues` for individual queues + +## TSessionSubs (Agent/TSessionSubs.hs) + +Per-session subscription state tracking, ~264 lines. 
+ +```haskell +data SessSubs = SessSubs + { subsSessId :: TVar (Maybe SessionId), + activeSubs :: TMap RecipientId RcvQueueSub, + pendingSubs :: TMap RecipientId RcvQueueSub, + activeServiceSub :: TVar (Maybe ServiceSub), + pendingServiceSub :: TVar (Maybe ServiceSub) } +``` +Source: TSessionSubs.hs:59-65 + +Key operations: +- `setPendingServiceSub`: stores expected ServiceSub before SUBS is sent +- `setActiveServiceSub`: promotes to active after SOKS, validates session ID +- `updateActiveService`: increments count/hash when per-queue SUBs with service signature succeed (used by `Client/Agent.hs` when individual SUBs return `SOK(Just serviceId)`) +- `deleteServiceSub`: clears both active and pending (on ENDS) + +## Agent Store (AgentStore.hs) + +### `client_services` table + +```sql +CREATE TABLE client_services( + user_id INTEGER NOT NULL REFERENCES users ON DELETE CASCADE, + host TEXT NOT NULL, port TEXT NOT NULL, + server_key_hash BLOB, + service_cert BLOB NOT NULL, + service_cert_hash BLOB NOT NULL, + service_priv_key BLOB NOT NULL, + service_id BLOB, -- assigned by server, NULL until first handshake + service_queue_count INTEGER NOT NULL DEFAULT 0, + service_queue_ids_hash BLOB NOT NULL DEFAULT x'00000000000000000000000000000000' +); +``` +Source: Agent/Store/SQLite/Migrations/M20260115_service_certs.hs:11-23 + +### `rcv_queues.rcv_service_assoc` + +Boolean column added to `rcv_queues`. When set, the queue is associated with the service for this server. SQLite triggers automatically maintain `service_queue_count` and `service_queue_ids_hash` on insert/delete/update of `rcv_queues` rows. + +Triggers: `tr_rcv_queue_insert`, `tr_rcv_queue_delete`, `tr_rcv_queue_update_remove`, `tr_rcv_queue_update_add` (same migration file, lines 30-76). All use `simplex_xor_md5_combine` — the SQLite equivalent of Haskell's `queueIdHash <>`. 
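The trigger-maintained count and hash can be illustrated with a small model. This is a minimal sketch, assuming that `simplex_xor_md5_combine` is a byte-wise XOR of 16-byte digests, as its name and the all-zero column default suggest. `IdsHash`, `addQueue` and `removeQueue` are hypothetical names for illustration only, and the digests are placeholder bytes rather than real MD5 output.

```haskell
import Data.Bits (xor)
import Data.Word (Word8)

-- Hypothetical stand-in for the 16-byte ids hash; not the library's type.
newtype IdsHash = IdsHash [Word8]
  deriving (Eq, Show)

-- Byte-wise XOR; assumed to mirror what simplex_xor_md5_combine computes.
instance Semigroup IdsHash where
  IdsHash a <> IdsHash b = IdsHash (zipWith xor a b)

-- Sixteen zero bytes, matching the column default in client_services.
instance Monoid IdsHash where
  mempty = IdsHash (replicate 16 0)

-- Models the insert / update_add triggers: count up, XOR the digest in.
addQueue :: IdsHash -> (Int, IdsHash) -> (Int, IdsHash)
addQueue d (n, h) = (n + 1, h <> d)

-- Models the delete / update_remove triggers: XOR is its own inverse.
removeQueue :: IdsHash -> (Int, IdsHash) -> (Int, IdsHash)
removeQueue d (n, h) = (n - 1, h <> d)

main :: IO ()
main = do
  let q1 = IdsHash (replicate 16 0xAB) -- placeholder digest
      q2 = IdsHash [1 .. 16]           -- placeholder digest
      start = (0, mempty)
      s12 = addQueue q2 (addQueue q1 start)
      s21 = addQueue q1 (addQueue q2 start)
  print (s12 == s21)                               -- association order is irrelevant
  print (removeQueue q1 s12 == addQueue q2 start)  -- removal undoes association
```

Because XOR is commutative and self-inverse, a trigger can update the stored hash from the single affected row without rescanning the table, and the client and server can compare aggregates regardless of the order in which queues were associated.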
+ +### Key CRUD operations + +| Function | What it does | +|----------|--------------| +| `getClientServiceCredentials` | Load cert + key for a server; returns `Maybe ((KeyHash, TLS.Credential), Maybe ServiceId)` | +| `getSubscriptionService` | Load `ServiceSub` (serviceId, count, hash) for reconnection | +| `setClientServiceId` | Store ServiceId after first handshake | +| `setRcvServiceAssocs` | Mark queues as service-associated (sets `rcv_service_assoc = 1`) | +| `removeRcvServiceAssocs` | Remove service association for all queues on a server | +| `unassocUserServerRcvQueueSubs` | Remove association and return queues for re-subscription | + +Source: AgentStore.hs:419-494, 2378-2414 + +### Service ID nullification on cert change + +`INSERT ... ON CONFLICT DO UPDATE SET ... service_id = NULL` (AgentStore.hs:429) — when service credentials are updated (new cert), the stored `service_id` is cleared, forcing a new handshake to get a fresh ServiceId. + +## Notification server (Notifications/Server.hs) + +The NTF server is the primary consumer of service certificates for `SRNotifier` role. + +### Configuration + +`NtfServerConfig.useServiceCreds :: Bool` (Env.hs:80) — controls whether the NTF server uses service certificates for SMP subscriptions. + +### Credential generation + +On first use per SMP server, `mkDbService` (Env.hs:126-142) generates a self-signed TLS certificate (valid ~2400 days) and stores it in the `smp_servers` table. The cert is reused across connections to the same SMP server. + +### Startup subscription + +`subscribeSrvSubs` (Server.hs:460-481): +1. If service credentials exist: send NSUBS first (one command for all associated queues) +2. 
Then subscribe remaining individual queues in batches via `subscribeQueuesNtfs` + +### Event handling + +| Event | Handler | +|-------|---------| +| `CAServiceSubscribed` | Log count/hash match or mismatch | +| `CAServiceDisconnected` | Log disconnection | +| `CAServiceSubError` | Log error (non-fatal; fatal errors go to `CAServiceUnavailable`) | +| `CAServiceUnavailable` | **Critical recovery path**: calls `removeServiceAndAssociations`, wipes service creds, resubscribes all queues individually | + +Source: Server.hs:567-602 + +### `removeServiceAndAssociations` (Store/Postgres.hs:620-652) + +Nuclear recovery: clears `ntf_service_id`, `ntf_service_cert*`, resets `smp_notifier_count`/`smp_notifier_ids_hash`, and removes all `ntf_service_assoc` flags from subscriptions. Used when the service subscription is irrecoverably broken (e.g., ServiceId mismatch after cert rotation). + +### NTF Postgres schema + +The `smp_servers` table stores per-SMP-server state: +- `ntf_service_id`, `ntf_service_cert`, `ntf_service_cert_hash`, `ntf_service_priv_key` — service identity +- `smp_notifier_count`, `smp_notifier_ids_hash` — maintained by Postgres triggers on the `subscriptions` table + +Triggers use `xor_combine` (Postgres equivalent of XOR hash combine) and fire on `ntf_service_assoc` changes. 
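The event handling table for the notification server can be read as a pure dispatch from event to action. The sketch below is illustrative only: `ServiceEvent`, `Action` and `handleEvent` are hypothetical simplifications, the payloads are invented for the example, and the real handlers in Server.hs perform logging and database work rather than returning a value.

```haskell
-- All names below are hypothetical stand-ins, not the server's real types.
data ServiceEvent
  = ServiceSubscribed (Int, String) (Int, String) -- expected vs. server-reported (count, idsHash)
  | ServiceDisconnected
  | ServiceSubError String
  | ServiceUnavailable
  deriving (Eq, Show)

data Action
  = LogInfo String
  | LogWarning String
  | RemoveServiceAndResubscribe -- stands for the removeServiceAndAssociations path
  deriving (Eq, Show)

-- One clause per row of the event handling table.
handleEvent :: ServiceEvent -> Action
handleEvent ev = case ev of
  ServiceSubscribed expected actual
    | expected == actual -> LogInfo "service subscribed, count and hash match"
    | otherwise ->
        LogWarning ("service subscribed with drift: expected " <> show expected <> ", got " <> show actual)
  ServiceDisconnected -> LogInfo "service disconnected"
  ServiceSubError e -> LogWarning ("service subscription error: " <> e)
  ServiceUnavailable -> RemoveServiceAndResubscribe

main :: IO ()
main =
  mapM_
    (print . handleEvent)
    [ ServiceSubscribed (3, "aa") (3, "aa"),
      ServiceSubscribed (3, "aa") (2, "bb"),
      ServiceUnavailable
    ]
```

Only the last case changes stored state in this model. The other three are observational, which matches the table's split between non-fatal errors and the recovery path.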
+ +## Agent test coverage + +### Existing tests + +| Test | File | What it covers | +|------|------|----------------| +| `testMigrateToServiceSubscriptions` | AgentTests/NotificationTests.hs:930-1016 | Full lifecycle: no service -> enable service (creates association) -> use service (NSUBS) -> disable service (downgrade to individual) -> re-enable | + +### Additional test gaps (Phase 3.0b) + +| Gap | Severity | Description | +|-----|----------|-------------| +| **TG-SVC-09** | Medium | No agent-level test for `SSErrorServiceId` recovery — the `unassocQueues` fallback path | +| **TG-SVC-10** | Medium | No agent-level test for concurrent reconnection — service resubscription racing with individual queue resubscription | +| **TG-SVC-11** | Medium | No test for `SERVICE_END` agent event handling — what does the agent do after receiving ENDS? | +| **TG-SVC-12** | Low | No test for SQLite trigger correctness — verifying `service_queue_count`/`service_queue_ids_hash` match expected values after insert/delete/update cycles |
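For TG-SVC-09, the decision that a test would need to pin down can be modelled as a pure function over the startup subscription errors described in the agent layer section. This is a sketch under assumptions: the constructors are simplified mirrors of `SSErrorServiceId` and `SSErrorQueueCount`, while `SSErrorOther`, `Recovery` and `startupRecovery` are hypothetical names. The spec does not state what happens for other errors at startup, so the last case is labelled only as "no fallback".

```haskell
-- Hypothetical, simplified mirror of the subscription errors named in the spec.
data ServiceSubError
  = SSErrorServiceId
  | SSErrorQueueCount Int Int -- expected count n, server-reported count n'
  | SSErrorOther String       -- placeholder for any other error
  deriving (Eq, Show)

data Recovery
  = UnassociateAndResubscribe -- drop service associations, subscribe queues individually
  | NoFallback                -- behaviour not specified here for startup
  deriving (Eq, Show)

startupRecovery :: ServiceSubError -> Recovery
startupRecovery SSErrorServiceId = UnassociateAndResubscribe
startupRecovery (SSErrorQueueCount n n')
  | n > 0 && n' == 0 = UnassociateAndResubscribe -- service exists but has no queues
startupRecovery _ = NoFallback

main :: IO ()
main =
  mapM_
    (print . startupRecovery)
    [ SSErrorServiceId,
      SSErrorQueueCount 5 0,
      SSErrorQueueCount 5 4,
      SSErrorOther "timeout"
    ]
```

An agent-level test would additionally need to check the side effects of the fallback, for example that the association flags are cleared and that each queue receives an individual `SUB`.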