fix(ingestor): subscribe to MQTT before startup maintenance, buffer until writer is free (#1608) (#1609)

## Summary

Closes #1608.

The ingestor's MQTT connect/subscribe loop ran **last** in `main()`,
after the synchronous startup-maintenance block. Because all writes
share a single SQLite writer (#1283), that maintenance — and the connect
loop after it — serialize behind any long-running async migration. The
subscription therefore came up minutes late (observed ~4.5 min after the
v3.8.3 `obs_observer_ts_idx_v1` index build over ~4.9M rows), and QoS-0
packets published in that window were dropped.

This decouples **receipt** from **write**:
- New `IngestBuffer` — a bounded FIFO drained by a **single** gated
consumer goroutine.
- The MQTT subscription is brought up first; its publish handler stamps
source liveness at receipt and enqueues a `handleMessage` closure.
- Startup maintenance runs, then `WaitForAsyncMigrations()`, then
`IngestBuffer.Ready()` opens the gate and the backlog drains.

A single consumer preserves the single-writer invariant (#1283);
buffering replays the original messages, so it introduces **no
duplicates** (unlike a QoS-1 broker queue). Broker-agnostic — helps
direct-connect and bridged operators alike.

## Changes

- `cmd/ingestor/ingest_buffer.go` — `IngestBuffer`
(`Submit`/`Start`/`Ready`/`Dropped`/`Pending`); non-blocking submit with
drop-on-full counter; single consumer.
- `cmd/ingestor/config.go` — `ingestBufferSize` knob (default 50000).
- `cmd/ingestor/main.go` — reorder boot: connect/subscribe **before**
startup maintenance; stamp liveness at receipt; `Ready()` after
maintenance + `WaitForAsyncMigrations()`; periodic stats log buffer
`pending`/`dropped`.

## Test plan

- [x] `go test ./...` in `cmd/ingestor` — `IngestBuffer` suite covers
gating-until-ready, FIFO order, drop-on-full, serial execution
(single-writer), and concurrent-submit.
- [ ] `go test -race` in CI (concurrency on `IngestBuffer`).
- [ ] Manual: restart with a pending heavy migration → `subscribed to
meshcore/#` appears within seconds; `[ingest-buffer] write path ready`
after the migration; packets received during the window are written
after `Ready()` (0 dropped under normal traffic); stall watchdog stays
quiet (liveness stamped at receipt).

## Out of scope

A hard crash while messages sit in the in-memory buffer still loses
them; crash-durability requires broker-side persistence, which is
topology-specific. This PR closes the startup-migration and deploy loss
windows.

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
efiten
2026-06-07 06:05:53 +02:00
committed by GitHub
parent 9612f08e46
commit 18810b5c13
5 changed files with 384 additions and 131 deletions
+17
View File
@@ -80,6 +80,12 @@ type Config struct {
// NeighborEdgesMaxAgeDays controls neighbor_edges row retention
// (#1287 — moved from cmd/server). 0 = default 5.
NeighborEdgesMaxAgeDays int `json:"neighborEdgesMaxAgeDays,omitempty"`
// IngestBufferSize caps the in-memory queue (number of MQTT messages) held
// while the single SQLite writer is blocked by startup migrations/prunes
// (#1608). Received messages are drained once the write path is ready.
// 0 / unset => default. Bounded memory.
IngestBufferSize int `json:"ingestBufferSize,omitempty"`
}
// NeighborEdgesDaysOrDefault returns the configured pruning window or 5.
@@ -90,6 +96,17 @@ func (c *Config) NeighborEdgesDaysOrDefault() int {
return c.NeighborEdgesMaxAgeDays
}
// IngestBufferSizeOrDefault returns the ingest buffer capacity. Default 50000:
// at typical mesh rates (~1-2 msg/s) that is many minutes of headroom while a
// startup migration holds the writer; each queued item is a small closure, so
// worst-case memory stays in the tens of MB.
func (c *Config) IngestBufferSizeOrDefault() int {
if c.IngestBufferSize > 0 {
return c.IngestBufferSize
}
return 50000
}
// GeoFilterConfig is an alias for the shared geofilter.Config type.
type GeoFilterConfig = geofilter.Config
+12
View File
@@ -484,3 +484,15 @@ func TestLoadConfigWSSource(t *testing.T) {
t.Errorf("ResolvedSources wss broker=%s, want unchanged", sources[1].Broker)
}
}
func TestIngestBufferSizeOrDefault(t *testing.T) {
if got := (&Config{}).IngestBufferSizeOrDefault(); got != 50000 {
t.Fatalf("default: want 50000, got %d", got)
}
if got := (&Config{IngestBufferSize: 10}).IngestBufferSizeOrDefault(); got != 10 {
t.Fatalf("override: want 10, got %d", got)
}
if got := (&Config{IngestBufferSize: -5}).IngestBufferSizeOrDefault(); got != 50000 {
t.Fatalf("invalid negative should fall back to default, got %d", got)
}
}
+79
View File
@@ -0,0 +1,79 @@
package main
import (
"log"
"sync"
"sync/atomic"
)
// IngestBuffer decouples MQTT message receipt from DB writes (#1608).
//
// On boot the ingestor must subscribe to MQTT immediately, but the single
// SQLite writer (#1283) can be held for minutes by a startup migration
// (e.g. a large CREATE INDEX) or prune. Without buffering, every QoS-0 packet
// received in that window is lost. IngestBuffer holds received work in a
// bounded FIFO and a single consumer goroutine drains it once Ready() is
// called — i.e. once the write path is free.
//
// A single consumer preserves the single-writer invariant: jobs run one at a
// time, exactly as paho's in-order handler did before. Submit never blocks the
// MQTT delivery goroutine; if the buffer is full it drops and counts (bounded
// memory). Buffering replays the original messages, so it introduces NO
// duplicates (contrast: a QoS-1 broker-queue would).
type IngestBuffer struct {
jobs chan func()
ready chan struct{}
dropped atomic.Int64
startOnce sync.Once
readyOnce sync.Once
}
// NewIngestBuffer returns a buffer holding up to capacity pending jobs.
func NewIngestBuffer(capacity int) *IngestBuffer {
if capacity < 1 {
capacity = 1
}
return &IngestBuffer{
jobs: make(chan func(), capacity),
ready: make(chan struct{}),
}
}
// Submit enqueues a job without blocking. If the buffer is full the job is
// dropped and the dropped counter is incremented. Safe for concurrent callers.
func (b *IngestBuffer) Submit(job func()) {
select {
case b.jobs <- job:
default:
n := b.dropped.Add(1)
if n == 1 || n%1000 == 0 {
log.Printf("[ingest-buffer] WARNING: buffer full, dropped %d message(s) — raise ingestBufferSize", n)
}
}
}
// Start launches the consumer goroutine. It blocks until Ready() is called,
// then drains buffered jobs and runs newly-submitted ones serially, in FIFO
// order. Idempotent.
func (b *IngestBuffer) Start() {
b.startOnce.Do(func() {
go func() {
<-b.ready
for job := range b.jobs {
job()
}
}()
})
}
// Ready signals that the write path is available; the consumer begins
// draining. Idempotent.
func (b *IngestBuffer) Ready() {
b.readyOnce.Do(func() { close(b.ready) })
}
// Dropped returns the number of jobs dropped due to a full buffer.
func (b *IngestBuffer) Dropped() int64 { return b.dropped.Load() }
// Pending returns the current queue depth (best-effort; for observability).
func (b *IngestBuffer) Pending() int { return len(b.jobs) }
+116
View File
@@ -0,0 +1,116 @@
package main
import (
"sync"
"sync/atomic"
"testing"
"time"
)
func TestIngestBuffer_BuffersUntilReady(t *testing.T) {
b := NewIngestBuffer(10)
var ran atomic.Int64
b.Start()
for i := 0; i < 3; i++ {
b.Submit(func() { ran.Add(1) })
}
time.Sleep(30 * time.Millisecond)
if ran.Load() != 0 {
t.Fatalf("jobs ran before Ready(): %d", ran.Load())
}
b.Ready()
deadline := time.Now().Add(time.Second)
for ran.Load() < 3 && time.Now().Before(deadline) {
time.Sleep(5 * time.Millisecond)
}
if ran.Load() != 3 {
t.Fatalf("want 3 ran after Ready, got %d", ran.Load())
}
}
func TestIngestBuffer_FIFOOrder(t *testing.T) {
b := NewIngestBuffer(10)
out := make(chan int, 5)
b.Start()
for i := 0; i < 5; i++ {
i := i
b.Submit(func() { out <- i })
}
b.Ready()
for want := 0; want < 5; want++ {
select {
case got := <-out:
if got != want {
t.Fatalf("order: want %d got %d", want, got)
}
case <-time.After(time.Second):
t.Fatalf("timeout waiting for job %d", want)
}
}
}
func TestIngestBuffer_DropsWhenFull(t *testing.T) {
b := NewIngestBuffer(2) // never Ready()'d -> nothing drains
for i := 0; i < 5; i++ {
b.Submit(func() {})
}
if got := b.Dropped(); got != 3 {
t.Fatalf("want 3 dropped (cap 2, 5 submitted), got %d", got)
}
}
func TestIngestBuffer_ProcessesAfterReady(t *testing.T) {
b := NewIngestBuffer(10)
b.Start()
b.Ready()
done := make(chan struct{})
b.Submit(func() { close(done) })
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("job submitted after Ready was not processed")
}
}
func TestIngestBuffer_SerialExecution(t *testing.T) {
b := NewIngestBuffer(50)
var inFlight atomic.Int32
var overlap atomic.Bool
var wg sync.WaitGroup
b.Start()
const n = 20
wg.Add(n)
for i := 0; i < n; i++ {
b.Submit(func() {
if inFlight.Add(1) > 1 {
overlap.Store(true)
}
time.Sleep(time.Millisecond)
inFlight.Add(-1)
wg.Done()
})
}
b.Ready()
wg.Wait()
if overlap.Load() {
t.Fatal("jobs overlapped — consumer is not serial (violates single-writer)")
}
}
func TestIngestBuffer_ConcurrentSubmitSafe(t *testing.T) {
b := NewIngestBuffer(20000)
b.Start()
var wg sync.WaitGroup
for g := 0; g < 8; g++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 1000; i++ {
b.Submit(func() {})
}
}()
}
wg.Wait()
b.Ready()
// Assertion is the absence of a race/panic; run under -race in CI.
}
+160 -131
View File
@@ -75,6 +75,151 @@ func main() {
// Check auto_vacuum mode and optionally migrate (#919)
store.CheckAutoVacuum(cfg)
channelKeys := loadChannelKeys(cfg, *configPath)
if len(channelKeys) > 0 {
log.Printf("Loaded %d channel keys for GRP_TXT decryption", len(channelKeys))
} else {
log.Printf("No channel keys loaded — GRP_TXT packets will not be decrypted")
}
regionKeys := loadRegionKeys(cfg)
store.BackfillDefaultScopeAsync(regionKeys)
// Subscribe-early + buffer (#1608): the MQTT subscription is brought up
// before startup maintenance so no packets are missed while the single
// SQLite writer is blocked (e.g. a large CREATE INDEX migration). Received
// messages are buffered here and drained once Ready() is called below.
ingestBuffer := NewIngestBuffer(cfg.IngestBufferSizeOrDefault())
ingestBuffer.Start()
// Connect to each MQTT source
var clients []mqtt.Client
connectedCount := 0
for _, source := range sources {
tag := source.Name
if tag == "" {
tag = source.Broker
}
opts := buildMQTTOpts(source)
connectTimeout := source.ConnectTimeoutOrDefault()
log.Printf("MQTT [%s] connect timeout: %ds", tag, connectTimeout)
// Pre-allocate the liveness pointer so OnConnect can reset its
// stale-message clock on reconnect (PR #1216 r1 item 2). IsConnectedFn
// is wired below once the client exists.
liveness := &SourceLivenessState{
Tag: tag,
Broker: source.Broker,
}
opts.SetOnConnectHandler(func(c mqtt.Client) {
log.Printf("MQTT [%s] connected to %s", tag, source.Broker)
// PR #1216 r1 item 2: clear the stale LastMessageUnix from
// before the outage so the watchdog doesn't immediately scream
// "stalled for 2h". Also restarts the cold-start grace window
// and clears the alert cooldown so a fresh stall edge can fire.
liveness.MarkReconnected(time.Now())
topics := source.Topics
if len(topics) == 0 {
topics = []string{"meshcore/#"}
}
for _, t := range topics {
token := c.Subscribe(t, 0, nil)
token.Wait()
if token.Error() != nil {
log.Printf("MQTT [%s] subscribe error for %s: %v", tag, t, token.Error())
} else {
log.Printf("MQTT [%s] subscribed to %s", tag, t)
}
}
})
opts.SetConnectionLostHandler(func(c mqtt.Client, err error) {
log.Printf("MQTT [%s] disconnected from %s: %v", tag, source.Broker, err)
})
opts.SetReconnectingHandler(func(c mqtt.Client, options *mqtt.ClientOptions) {
log.Printf("MQTT [%s] reconnecting to %s", tag, source.Broker)
})
// Capture source for closure
src := source
opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) {
// Stamp liveness at RECEIPT (not at processing) so the stall
// watchdog reflects broker liveness even while the buffer is
// gated during startup (#1608). handleMessage also stamps it,
// which is a harmless cheap re-store once draining begins.
markLivenessForTag(tag, time.Now())
ingestBuffer.Submit(func() {
handleMessage(store, tag, src, m, channelKeys, regionKeys, cfg)
})
})
client := mqtt.NewClient(opts)
// Wire IsConnectedFn now that the client exists, then register.
// Registration BEFORE Connect so the attempt counter is available
// to OnConnectAttempt on the very first dial.
liveness.IsConnectedFn = client.IsConnected
// #1335: wire force-reconnect so the watchdog can drop a
// half-open TCP socket and re-dial when paho.IsConnected==true
// but no messages have flowed past the stall threshold. Throttled
// per source by the watchdog itself (forceReconnectThrottle).
// Disconnect(250) gives in-flight publishes 250ms to drain;
// Connect() returns immediately and paho's reconnect machinery
// takes over from there. Captured-by-value `client` is the same
// pointer used everywhere else for this source.
liveness.ForceReconnectFn = func() {
client.Disconnect(250)
client.Connect()
}
// PR #1216 r2 item 3: tag collisions used to log.Fatalf, which
// killed the entire ingestor over one config typo and recreated
// the #1212 total-ingest-stop class this PR exists to prevent.
// registerLivenessOrSkip logs ERROR + skips liveness registration
// for the duplicate; the MQTT source still attempts to connect,
// it just isn't tracked by the watchdog. First registration
// remains authoritative.
registerLivenessOrSkip(liveness)
token := client.Connect()
// With ConnectRetry=true, token.Wait() blocks forever for unreachable brokers.
// WaitTimeout lets startup proceed; the client keeps retrying in the background
// and OnConnect fires (subscribing) when it eventually connects (#910).
if !token.WaitTimeout(time.Duration(connectTimeout) * time.Second) {
log.Printf("MQTT [%s] initial connection timed out — retrying in background", tag)
clients = append(clients, client)
continue
}
if token.Error() != nil {
log.Printf("MQTT [%s] connection failed (non-fatal): %v", tag, token.Error())
// BL1 fix: Disconnect to stop Paho's internal retry goroutines.
// With ConnectRetry=true, Connect() spawns background goroutines
// that leak if the client is simply discarded.
client.Disconnect(0)
continue
}
connectedCount++
clients = append(clients, client)
}
// BL2 fix: require at least one immediately-connected source. Timed-out
// clients are retrying in background (tracked in clients) but don't count
// as "connected" — a single unreachable broker must not silently run with
// zero active connections.
if connectedCount == 0 {
// Clean up any timed-out clients still retrying
for _, c := range clients {
c.Disconnect(0)
}
log.Fatal("no MQTT sources connected — all timed out or failed. Check broker is running (default: mqtt://localhost:1883). Set MQTT_BROKER env var or configure mqttSources in config.json")
}
if connectedCount < len(clients) {
log.Printf("Running — %d MQTT source(s) connected, %d retrying in background", connectedCount, len(clients)-connectedCount)
} else {
log.Printf("Running — %d MQTT source(s) connected", connectedCount)
}
// Node retention: move stale nodes to inactive_nodes on startup
nodeDays := cfg.NodeDaysOrDefault()
store.MoveStaleNodes(nodeDays)
@@ -103,6 +248,18 @@ func main() {
vacuumPages := cfg.IncrementalVacuumPages()
store.RunIncrementalVacuum(vacuumPages)
// Gate open: the synchronous startup writes above cannot return until the
// single SQLite writer is free, which means any blocking async migration
// (e.g. the CREATE INDEX) has finished. WaitForAsyncMigrations() makes that
// explicit. Now drain everything the subscription buffered during startup.
store.WaitForAsyncMigrations()
ingestBuffer.Ready()
if d := ingestBuffer.Dropped(); d > 0 {
log.Printf("[ingest-buffer] write path ready; draining backlog (dropped %d during startup — consider raising ingestBufferSize)", d)
} else {
log.Printf("[ingest-buffer] write path ready; draining backlog (0 dropped)")
}
// Daily ticker for node retention
retentionTicker := time.NewTicker(1 * time.Hour)
go func() {
@@ -192,6 +349,9 @@ func main() {
go func() {
for range statsTicker.C {
store.LogStats()
if d := ingestBuffer.Dropped(); d > 0 || ingestBuffer.Pending() > 0 {
log.Printf("[ingest-buffer] pending=%d dropped_total=%d", ingestBuffer.Pending(), d)
}
}
}()
@@ -238,137 +398,6 @@ func main() {
defer stopNeighborBuilder()
log.Printf("[neighbor-build] enabled (interval=%s)", NeighborEdgesBuilderInterval)
channelKeys := loadChannelKeys(cfg, *configPath)
if len(channelKeys) > 0 {
log.Printf("Loaded %d channel keys for GRP_TXT decryption", len(channelKeys))
} else {
log.Printf("No channel keys loaded — GRP_TXT packets will not be decrypted")
}
regionKeys := loadRegionKeys(cfg)
store.BackfillDefaultScopeAsync(regionKeys)
// Connect to each MQTT source
var clients []mqtt.Client
connectedCount := 0
for _, source := range sources {
tag := source.Name
if tag == "" {
tag = source.Broker
}
opts := buildMQTTOpts(source)
connectTimeout := source.ConnectTimeoutOrDefault()
log.Printf("MQTT [%s] connect timeout: %ds", tag, connectTimeout)
// Pre-allocate the liveness pointer so OnConnect can reset its
// stale-message clock on reconnect (PR #1216 r1 item 2). IsConnectedFn
// is wired below once the client exists.
liveness := &SourceLivenessState{
Tag: tag,
Broker: source.Broker,
}
opts.SetOnConnectHandler(func(c mqtt.Client) {
log.Printf("MQTT [%s] connected to %s", tag, source.Broker)
// PR #1216 r1 item 2: clear the stale LastMessageUnix from
// before the outage so the watchdog doesn't immediately scream
// "stalled for 2h". Also restarts the cold-start grace window
// and clears the alert cooldown so a fresh stall edge can fire.
liveness.MarkReconnected(time.Now())
topics := source.Topics
if len(topics) == 0 {
topics = []string{"meshcore/#"}
}
for _, t := range topics {
token := c.Subscribe(t, 0, nil)
token.Wait()
if token.Error() != nil {
log.Printf("MQTT [%s] subscribe error for %s: %v", tag, t, token.Error())
} else {
log.Printf("MQTT [%s] subscribed to %s", tag, t)
}
}
})
opts.SetConnectionLostHandler(func(c mqtt.Client, err error) {
log.Printf("MQTT [%s] disconnected from %s: %v", tag, source.Broker, err)
})
opts.SetReconnectingHandler(func(c mqtt.Client, options *mqtt.ClientOptions) {
log.Printf("MQTT [%s] reconnecting to %s", tag, source.Broker)
})
// Capture source for closure
src := source
opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) {
handleMessage(store, tag, src, m, channelKeys, regionKeys, cfg)
})
client := mqtt.NewClient(opts)
// Wire IsConnectedFn now that the client exists, then register.
// Registration BEFORE Connect so the attempt counter is available
// to OnConnectAttempt on the very first dial.
liveness.IsConnectedFn = client.IsConnected
// #1335: wire force-reconnect so the watchdog can drop a
// half-open TCP socket and re-dial when paho.IsConnected==true
// but no messages have flowed past the stall threshold. Throttled
// per source by the watchdog itself (forceReconnectThrottle).
// Disconnect(250) gives in-flight publishes 250ms to drain;
// Connect() returns immediately and paho's reconnect machinery
// takes over from there. Captured-by-value `client` is the same
// pointer used everywhere else for this source.
liveness.ForceReconnectFn = func() {
client.Disconnect(250)
client.Connect()
}
// PR #1216 r2 item 3: tag collisions used to log.Fatalf, which
// killed the entire ingestor over one config typo and recreated
// the #1212 total-ingest-stop class this PR exists to prevent.
// registerLivenessOrSkip logs ERROR + skips liveness registration
// for the duplicate; the MQTT source still attempts to connect,
// it just isn't tracked by the watchdog. First registration
// remains authoritative.
registerLivenessOrSkip(liveness)
token := client.Connect()
// With ConnectRetry=true, token.Wait() blocks forever for unreachable brokers.
// WaitTimeout lets startup proceed; the client keeps retrying in the background
// and OnConnect fires (subscribing) when it eventually connects (#910).
if !token.WaitTimeout(time.Duration(connectTimeout) * time.Second) {
log.Printf("MQTT [%s] initial connection timed out — retrying in background", tag)
clients = append(clients, client)
continue
}
if token.Error() != nil {
log.Printf("MQTT [%s] connection failed (non-fatal): %v", tag, token.Error())
// BL1 fix: Disconnect to stop Paho's internal retry goroutines.
// With ConnectRetry=true, Connect() spawns background goroutines
// that leak if the client is simply discarded.
client.Disconnect(0)
continue
}
connectedCount++
clients = append(clients, client)
}
// BL2 fix: require at least one immediately-connected source. Timed-out
// clients are retrying in background (tracked in clients) but don't count
// as "connected" — a single unreachable broker must not silently run with
// zero active connections.
if connectedCount == 0 {
// Clean up any timed-out clients still retrying
for _, c := range clients {
c.Disconnect(0)
}
log.Fatal("no MQTT sources connected — all timed out or failed. Check broker is running (default: mqtt://localhost:1883). Set MQTT_BROKER env var or configure mqttSources in config.json")
}
if connectedCount < len(clients) {
log.Printf("Running — %d MQTT source(s) connected, %d retrying in background", connectedCount, len(clients)-connectedCount)
} else {
log.Printf("Running — %d MQTT source(s) connected", connectedCount)
}
// #1212: per-source stall watchdog. Detects "silently dead" sources
// where the client reports connected but no messages have flowed. Logs
// a WARN line every minute for any source silent for >5m. Scan every