Simplex.Messaging.Agent.NtfSubSupervisor
Supervisor-worker architecture for notification subscription lifecycle management.
Source: Agent/NtfSubSupervisor.hs
Architecture
The notification system uses a supervisor with three worker pools, each keyed by router address:
| Pool | Key | Purpose |
|---|---|---|
| ntfWorkers | NtfServer | Create/check/delete/rotate subscriptions on the notification router |
| ntfSMPWorkers | SMPServer | Create/delete notifier credentials on the messaging router |
| ntfTknDelWorkers | NtfServer | Delete tokens on the notification router (background cleanup) |
The supervisor (runNtfSupervisor) reads commands from ntfSubQ and dispatches work to the appropriate pools. Workers are created lazily via getAgentWorker and process batches from the database.
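A minimal sketch of the lazy, per-router pool pattern follows. It assumes a worker is nothing more than a doWork signal held in a TMVar. The names Worker, WorkerPool, getOrCreateWorker and hasWorkToDo are hypothetical simplifications, not the real signature of getAgentWorker. The sketch shows only that a worker is created on the first lookup for a router address and reused afterwards.

```haskell
import Control.Concurrent.STM
import qualified Data.Map.Strict as M

-- Hypothetical worker record: only the "has work" signal is modelled here.
newtype Worker = Worker {doWork :: TMVar ()}

-- Hypothetical pool type: one pool per kind of work, keyed by router address
-- (an NtfServer or SMPServer in the real code).
type WorkerPool k = TVar (M.Map k Worker)

-- Return the worker for a router, creating it on first use. The Bool is True
-- when the worker was just created, telling the caller to fork its loop.
getOrCreateWorker :: Ord k => WorkerPool k -> k -> STM (Worker, Bool)
getOrCreateWorker pool key = do
  ws <- readTVar pool
  case M.lookup key ws of
    Just w -> pure (w, False)
    Nothing -> do
      w <- Worker <$> newEmptyTMVar
      writeTVar pool (M.insert key w ws)
      pure (w, True)

-- Signal pending work; a no-op if a signal is already set.
hasWorkToDo :: Worker -> STM ()
hasWorkToDo w = () <$ tryPutTMVar (doWork w) ()
```

Signalling a worker that is already signalled does nothing in this sketch, so repeated kicks collapse into a single wake-up.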
Non-obvious behavior
1. NSCCreate four-way partition
partitionQueueSubActions classifies each (queue, subscription) pair into one of four buckets:
- New sub: no existing subscription record; create from scratch
- Reset sub: credentials mismatch (SMP router changed, notifier ID changed, action was nulled by an error, or action is a delete); wipe and restart from SMP key exchange
- Continue SMP work: existing action is NSASMP and credentials are consistent; kick the SMP worker
- Continue NTF work: existing action is NSANtf and credentials are consistent; kick the NTF worker
The key decision point: when subAction_ is Nothing (set by workerErrors after permanent failures), the subscription is treated as needing a full reset. This interacts with the null-action sentinel pattern from AgentStore.
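The classification can be sketched as a pure function. The sketch relies on heavily reduced records. Queue and Sub keep only the router, notifier ID and action that the description says are compared, and the action constructors are trimmed. classify and partitionSubs are hypothetical names, not partitionQueueSubActions itself.

```haskell
-- Simplified action types (assumption: the real ones carry more data).
data NtfAction = NSACreate | NSACheck | NSADelete | NSARotate deriving (Eq, Show)
data SmpAction = NSASmpKey | NSASmpDelete deriving (Eq, Show)
data SubAction = NSANtf NtfAction | NSASMP SmpAction deriving (Eq, Show)

-- Hypothetical minimal records: just the fields the partition compares.
data Queue = Queue {qConnId :: String, qSmpServer :: String, qNotifierId :: Maybe String}
data Sub = Sub {sSmpServer :: String, sNotifierId :: Maybe String, sAction :: Maybe SubAction}

data Bucket = NewSub | ResetSub | ContinueSMP | ContinueNTF deriving (Eq, Show)

classify :: Queue -> Maybe Sub -> Bucket
classify _ Nothing = NewSub
classify q (Just s)
  | qSmpServer q /= sSmpServer s = ResetSub -- SMP router changed
  | qNotifierId q /= sNotifierId s = ResetSub -- notifier ID changed
  | otherwise = case sAction s of
      Nothing -> ResetSub -- action nulled after a permanent worker error
      Just (NSASMP NSASmpDelete) -> ResetSub
      Just (NSANtf NSADelete) -> ResetSub
      Just (NSASMP _) -> ContinueSMP
      Just (NSANtf _) -> ContinueNTF

-- Split a batch into (new, reset, continue SMP, continue NTF) connection IDs,
-- preserving input order within each bucket.
partitionSubs :: [(Queue, Maybe Sub)] -> ([String], [String], [String], [String])
partitionSubs = foldr step ([], [], [], [])
  where
    step (q, s) (n, r, smp, ntf) = case classify q s of
      NewSub -> (qConnId q : n, r, smp, ntf)
      ResetSub -> (n, qConnId q : r, smp, ntf)
      ContinueSMP -> (n, r, qConnId q : smp, ntf)
      ContinueNTF -> (n, r, smp, qConnId q : ntf)
```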
2. retrySubActions shrinking retry with TVar
retrySubActions holds the list of subs-to-retry in a TVar. Each iteration, the action function returns only the subs that got temporary errors (via splitResults). The TVar is overwritten with this shrinking list. On success or permanent error, subs drop out. This means retry batches get smaller over time.
splitResults implements a three-way partition: temporary or host errors → retry, permanent errors → null the action + notify, successes → continue pipeline.
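The following sketches the shrinking retry set and the three-way split under a simple ErrKind classification. retryShrinking and its maxAttempts bound are hypothetical and exist only so the example terminates. The real retrySubActions retries on an interval rather than for a fixed number of attempts.

```haskell
import Control.Concurrent.STM

-- Hypothetical error classification for a single item in a batch.
data ErrKind = Temporary | Host | Permanent deriving (Eq, Show)

-- Three-way split: (retry, permanent failures, successes).
splitResults :: [(a, Either ErrKind b)] -> ([a], [(a, ErrKind)], [(a, b)])
splitResults = foldr step ([], [], [])
  where
    step (x, Left Permanent) (r, f, s) = (r, (x, Permanent) : f, s)
    step (x, Left _) (r, f, s) = (x : r, f, s) -- Temporary or Host: retry
    step (x, Right y) (r, f, s) = (r, f, (x, y) : s)

-- Retry over a shrinking set: the TVar only ever holds the items that still
-- need another attempt. maxAttempts is an assumption so the sketch terminates.
retryShrinking :: Int -> ([a] -> IO [a]) -> [a] -> IO [a]
retryShrinking maxAttempts action initial = do
  toRetry <- newTVarIO initial
  let loop n = do
        xs <- readTVarIO toRetry
        if null xs || n >= maxAttempts
          then pure xs
          else do
            remaining <- action xs -- action returns only the items to retry
            atomically $ writeTVar toRetry remaining
            loop (n + 1)
  loop 0
```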
3. rescheduleWork deferred wake-up
When the NTF worker finds that all pending NSACheck actions have future timestamps, it does not spin-wait. Instead it:
- Takes the value out of its doWork TMVar, so the worker blocks on waitForWork
- Forks a thread that sleeps until the first action's timestamp
- The forked thread re-signals doWork when that time arrives
This is the mechanism for time-scheduled subscription health checks.
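The deferred wake-up can be sketched with a TMVar () as the doWork signal. The sketch has the worker wait with readTMVar and clears the signal with tryTakeTMVar. The function names mirror the description, but both bodies are assumptions.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM
import Control.Monad (void)
import Data.Time.Clock (UTCTime, addUTCTime, diffUTCTime, getCurrentTime)

-- The worker loop blocks here until someone signals doWork (read, not take).
waitForWork :: TMVar () -> IO ()
waitForWork doWork = void . atomically $ readTMVar doWork

-- Nothing is due yet: clear the signal so the next waitForWork blocks,
-- then fork a timer thread that re-signals at the first due timestamp.
rescheduleWork :: TMVar () -> UTCTime -> IO ()
rescheduleWork doWork firstDue = do
  atomically . void $ tryTakeTMVar doWork
  now <- getCurrentTime
  let delayMicros = max 0 (ceiling (realToFrac (diffUTCTime firstDue now) * 1000000 :: Double))
  void . forkIO $ do
    threadDelay delayMicros
    atomically . void $ tryPutTMVar doWork () -- wakes the blocked worker
```

addUTCTime is imported only for building a due time in a quick check, for example `rescheduleWork doWork (addUTCTime 0.05 now)`.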
4. checkSubs AUTH triggers full recreation
When the notification router returns AUTH for a subscription check, the subscription is not simply marked as failed — it is fully recreated from scratch by resetting to NSASMP NSASmpKey state. This handles the case where the notification router has lost its subscription state (restart, data loss). The SMP worker is kicked to re-establish notifier credentials.
Successful check results with statuses not in subscribeNtfStatuses also trigger recreation via recreateNtfSub.
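The decision after a check can be written as a small total function. The status constructors, the membership of subscribeStatuses and the NextStep type are assumptions made for the sketch. Only the two recreation rules come from the description above: AUTH recreates, and a successful status outside the subscribe set recreates.

```haskell
-- Hypothetical, reduced status set; the real status type has more values.
data SubStatus = NSNew | NSPending | NSActive | NSEnd | NSAuth deriving (Eq, Show)

-- Assumption: statuses that count as "still subscribed" in this sketch.
subscribeStatuses :: [SubStatus]
subscribeStatuses = [NSNew, NSPending, NSActive]

data CheckResult = CheckOk SubStatus | CheckAUTH | CheckTempErr | CheckPermErr String

data NextStep
  = ScheduleNextCheck -- still healthy
  | Recreate -- full recreation (AUTH restarts at NSASMP NSASmpKey)
  | RetryCheck -- temporary error: stays in the retry set
  | NullActionAndNotify String -- permanent error: the workerErrors path
  deriving (Eq, Show)

afterCheck :: CheckResult -> NextStep
afterCheck (CheckOk st)
  | st `elem` subscribeStatuses = ScheduleNextCheck
  | otherwise = Recreate -- e.g. the router reports the subscription as ended
afterCheck CheckAUTH = Recreate -- router lost its state: start over
afterCheck CheckTempErr = RetryCheck
afterCheck (CheckPermErr e) = NullActionAndNotify e
```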
5. deleteToken two-phase with restart survival
Token deletion splits into two phases:
- Store phase: remove the token from the active store and persist (server, privateKey, tokenId) to a deletion queue via addNtfTokenToDelete
- Network phase: runNtfTknDelWorker reads from the queue and performs the actual router-side deletion
On supervisor startup, startTknDelete scans for any pending deletion queue entries and launches workers. This ensures token cleanup survives agent restarts.
If the token has no router-side ID (ntfTokenId = Nothing), only the store phase runs — no worker is launched.
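The following sketches the store phase and the startup scan. The store is modelled as two TVars, and the deletion queue holds (server, privateKey, tokenId) triples as described. deleteTokenStorePhase and serversWithPendingDeletes are hypothetical names. The network phase, meaning the worker itself, is left out.

```haskell
import Control.Concurrent.STM
import Data.List (nub)

-- Hypothetical token record; tknId is Nothing until the router registers it.
data Token = Token {tknServer :: String, tknPrivKey :: String, tknId :: Maybe String}
  deriving (Eq, Show)

-- Hypothetical stand-in for the agent store.
data Store = Store
  { activeToken :: TVar (Maybe Token)
  , deleteQueue :: TVar [(String, String, String)] -- (server, privateKey, tokenId)
  }

-- Store phase, one transaction: forget the active token and, if the router
-- knows it, persist what the network phase needs. Returns the server whose
-- deletion worker should be started, if any.
deleteTokenStorePhase :: Store -> Token -> STM (Maybe String)
deleteTokenStorePhase st tkn = do
  writeTVar (activeToken st) Nothing
  case tknId tkn of
    Nothing -> pure Nothing -- never registered: no router-side work
    Just tid -> do
      modifyTVar' (deleteQueue st) (++ [(tknServer tkn, tknPrivKey tkn, tid)])
      pure (Just (tknServer tkn))

-- Startup scan: every server with a pending record gets a deletion worker.
serversWithPendingDeletes :: Store -> STM [String]
serversWithPendingDeletes st = nub . map (\(srv, _, _) -> srv) <$> readTVar (deleteQueue st)
```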
6. workerErrors nulls subscription action
When permanent (non-temporary, non-host) errors occur in batch operations, workerErrors sets the subscription's action to NULL in the database and notifies the client. The next NSCCreate for that connection will see subAction_ = Nothing in contOrReset and trigger a full subscription reset.
This null-action sentinel is the bridge between worker failure recovery and supervisor-driven re-creation.
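The null-action sentinel can be sketched with a map from connection ID to Maybe action, where Nothing is the sentinel. nullFailedActions and contOrResetSketch are hypothetical stand-ins for workerErrors and contOrReset. The sketch shows only the hand-off: a permanent error writes Nothing, and the next NSCCreate reads Nothing as "reset".

```haskell
import qualified Data.Map.Strict as M

-- Hypothetical reduced action type.
data Action = SmpKey | NtfCreate | NtfCheck deriving (Eq, Show)

-- Stand-in for the subscriptions table: connection ID -> current action.
-- Nothing is the null-action sentinel.
type SubTable = M.Map String (Maybe Action)

-- Hypothetical analogue of workerErrors: null the action of every connection
-- that failed permanently and produce one client notification per failure.
nullFailedActions :: [(String, String)] -> SubTable -> (SubTable, [String])
nullFailedActions errs table = (foldr nullIt table errs, map notice errs)
  where
    nullIt (connId, _) = M.adjust (const Nothing) connId
    notice (connId, err) = connId ++ ": " ++ err

data Decision = ContinueWith Action | FullReset deriving (Eq, Show)

-- Supervisor side: a nulled action forces a full reset on the next NSCCreate.
contOrResetSketch :: Maybe Action -> Decision
contOrResetSketch = maybe FullReset ContinueWith
```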
7. NSADelete and NSARotate are deprecated
These NTF worker actions are no longer generated by current code but are kept for processing legacy database records. They are explicitly not batched (processed one at a time via mapM). NSARotate deletes the subscription then re-queues NSCCreate back to the supervisor.
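The following sketches the unbatched legacy path. It uses a reduced LegacyAction type and a SupervisorCmd with a single CreateSub constructor, and both types are hypothetical. The sketch abstracts over the monad to keep it testable. In the agent, the callbacks would perform the router call and the ntfSubQ write.

```haskell
-- Hypothetical reduced type for legacy NTF actions still found in old databases.
data LegacyAction = LegacyDelete | LegacyRotate deriving (Eq, Show)

-- Hypothetical command sent back to the supervisor queue.
newtype SupervisorCmd = CreateSub String deriving (Eq, Show)

-- Process legacy records strictly one at a time (mapM_, no batching).
-- Rotation: delete the subscription, then hand creation back to the supervisor.
processLegacy :: Monad m => (String -> m ()) -> (SupervisorCmd -> m ()) -> [(String, LegacyAction)] -> m ()
processLegacy deleteSub enqueue = mapM_ step
  where
    step (connId, LegacyDelete) = deleteSub connId
    step (connId, LegacyRotate) = deleteSub connId >> enqueue (CreateSub connId)
```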
8. Stats counting groups by userId
incStatByUserId groups batch subscriptions by userId before incrementing stats counters, ensuring per-user counts are accurate even when a single batch contains subscriptions from multiple users.
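Grouping before incrementing can be sketched with Data.Map. The sketch builds one count per user for the batch and then merges it into the running counters. countByUser and addStats are hypothetical helpers, and representing stats as a Map from userId to Int is an assumption.

```haskell
import qualified Data.Map.Strict as M

-- Count batch items per user in one pass (userOf extracts the userId).
countByUser :: Ord u => (s -> u) -> [s] -> M.Map u Int
countByUser userOf subs = M.fromListWith (+) [(userOf s, 1) | s <- subs]

-- Merge one batch's per-user counts into the running counters.
addStats :: Ord u => M.Map u Int -> M.Map u Int -> M.Map u Int
addStats = M.unionWith (+)
```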
9. sendNtfSubCommand — gated on instant mode
sendNtfSubCommand only enqueues work if instant notifications are active (hasInstantNotifications checks NTActive status + NMInstant mode). In periodic mode, the entire subscription creation pipeline is dormant — no commands reach the supervisor.
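The following sketches the gate. The token types are reduced, with constructor lists trimmed for the example, and a bounded TBQueue stands in for ntfSubQ. sendNtfSubCommandSketch and hasInstant are hypothetical names.

```haskell
import Control.Concurrent.STM
import Control.Monad (when)

-- Hypothetical, reduced token model.
data TknStatus = NTRegistered | NTConfirmed | NTActive | NTExpired deriving (Eq, Show)
data NtfMode = NMPeriodic | NMInstant deriving (Eq, Show)
data NtfToken = NtfToken {tknStatus :: TknStatus, tknMode :: NtfMode}

-- Instant notifications need an active token in instant mode.
hasInstant :: Maybe NtfToken -> Bool
hasInstant (Just t) = tknStatus t == NTActive && tknMode t == NMInstant
hasInstant Nothing = False

-- Enqueue a supervisor command only when instant notifications are on;
-- otherwise the command is dropped and the pipeline stays dormant.
sendNtfSubCommandSketch :: TVar (Maybe NtfToken) -> TBQueue cmd -> cmd -> STM ()
sendNtfSubCommandSketch tokenVar queue cmd = do
  tkn <- readTVar tokenVar
  when (hasInstant tkn) $ writeTBQueue queue cmd
```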
10. deleteNotifierKeys — credential reset before disable
resetCredsGetQueue clears the queue's notification credentials in the store before sending the disable command to the SMP router. This "clean first" ordering means local state is already consistent even if the network call fails.
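The "clean first" ordering can be sketched with an IORef that holds the queue's notifier credentials. QueueNtfCreds and deleteNotifierKeysSketch are hypothetical. The router call is passed in as a function so the failure case is easy to exercise.

```haskell
import Control.Exception (SomeException, try)
import Data.IORef

-- Hypothetical queue record holding only the notifier credentials.
newtype QueueNtfCreds = QueueNtfCreds {notifierId :: Maybe String} deriving (Eq, Show)

-- Clear local credentials first, then tell the router. If the network call
-- throws, local state already has no notifier credentials.
-- (Catching SomeException keeps the sketch short; real code would be narrower.)
deleteNotifierKeysSketch :: IORef QueueNtfCreds -> (String -> IO ()) -> IO (Either SomeException ())
deleteNotifierKeysSketch credsRef disableOnRouter = do
  old <- atomicModifyIORef' credsRef (\c -> (c {notifierId = Nothing}, notifierId c))
  case old of
    Nothing -> pure (Right ()) -- nothing to disable on the router
    Just nId -> try (disableOnRouter nId)
```

Because the credentials are cleared before disableOnRouter runs, a failed network call leaves the local record without notifier credentials. The local record never keeps pointing at a half-disabled notifier.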
11. runNtfTknDelWorker — permanent error discards record
When token deletion gets a permanent (non-temporary, non-host) error, the deletion record is removed from the queue rather than retried. This prevents stuck deletion records from blocking the worker. The error is reported to the client.
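The discard rule can be sketched as a classification plus a queue update. DelErr, Outcome, classifyDelete and applyOutcome are hypothetical. The temporary/host/permanent split mirrors the one used by splitResults.

```haskell
-- Hypothetical error classes for a token-deletion attempt.
data DelErr = TempErr | HostErr | PermErr String deriving (Eq, Show)

data Outcome = Deleted | RetryLater | DiscardAndReport String deriving (Eq, Show)

classifyDelete :: Either DelErr () -> Outcome
classifyDelete (Right ()) = Deleted
classifyDelete (Left TempErr) = RetryLater
classifyDelete (Left HostErr) = RetryLater
classifyDelete (Left (PermErr e)) = DiscardAndReport e -- never retried

-- The record leaves the queue on success and on permanent error alike,
-- so a record that can never succeed cannot block the worker.
applyOutcome :: Outcome -> [rec] -> [rec]
applyOutcome RetryLater pending = pending
applyOutcome _ pending = drop 1 pending
```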
12. getNtfServer — random selection from multiple
When multiple notification routers are configured, one is selected randomly using randomR with a session-stable TVar generator. Single-router configurations skip the randomness.
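The following sketches session-stable random selection. The random package is not part of base, so a tiny linear congruential generator stands in for randomR here. Gen, nextInRange and pickServer are hypothetical names. NonEmpty encodes the assumption that at least one router is configured.

```haskell
import Control.Concurrent.STM
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as NE

-- Stand-in generator state (assumption: the real code keeps a random
-- generator in a TVar and calls randomR).
newtype Gen = Gen Int deriving (Eq, Show)

-- Hypothetical LCG step: a value in [lo, hi] and the next generator state.
nextInRange :: (Int, Int) -> Gen -> (Int, Gen)
nextInRange (lo, hi) (Gen s) = (lo + s' `mod` (hi - lo + 1), Gen s')
  where
    s' = (s * 1103515245 + 12345) `mod` 2147483648

-- Pick one router; a single configured router never touches the generator.
pickServer :: TVar Gen -> NonEmpty srv -> STM srv
pickServer _ (srv :| []) = pure srv
pickServer genVar srvs = do
  (i, g') <- nextInRange (0, NE.length srvs - 1) <$> readTVar genVar
  writeTVar genVar g' -- keep the advanced state for the rest of the session
  pure (srvs NE.!! i)
```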
13. closeNtfSupervisor — atomic swap then cancel
swapTVar atomically replaces the workers map with empty, then cancels all extracted workers. This ensures all existing workers at the point of shutdown are captured for cancellation. Prevention of new work is handled by the supervisor loop termination and operation bracket lifecycle, not by the swap itself.
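The swap-then-cancel pattern can be sketched with a TVar map of thread IDs. The sketch represents each worker by its ThreadId and cancels it with killThread; both choices are assumptions. closeWorkers is a hypothetical name.

```haskell
import Control.Concurrent
import Control.Concurrent.STM
import qualified Data.Map.Strict as M

-- Hypothetical: each worker is represented only by its thread.
type Workers = TVar (M.Map String ThreadId)

-- Atomically take every registered worker (leaving an empty map), then
-- cancel them outside the transaction. Returns how many were cancelled.
closeWorkers :: Workers -> IO Int
closeWorkers workers = do
  ws <- atomically $ swapTVar workers M.empty
  mapM_ killThread (M.elems ws)
  pure (M.size ws)
```

Doing the swap in STM and the cancellation in IO keeps the transaction free of side effects. The swap fixes exactly which workers get cancelled, and anything registered into the new empty map afterwards is untouched by this call.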