mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-06-28 05:41:38 +00:00
fix(ingestor): subscribe to MQTT before startup maintenance, buffer until writer is free (#1608) (#1609)
## Summary

Closes #1608.

The ingestor's MQTT connect/subscribe loop ran **last** in `main()`, after the synchronous startup-maintenance block. Because all writes share a single SQLite writer (#1283), that maintenance — and the connect loop after it — serialize behind any long-running async migration. The subscription therefore came up minutes late (observed ~4.5 min after the v3.8.3 `obs_observer_ts_idx_v1` index build over ~4.9M rows), and QoS-0 packets published in that window were dropped.

This decouples **receipt** from **write**:

- New `IngestBuffer` — a bounded FIFO drained by a **single** gated consumer goroutine.
- The MQTT subscription is brought up first; its publish handler stamps source liveness at receipt and enqueues a `handleMessage` closure.
- Startup maintenance runs, then `WaitForAsyncMigrations()`, then `IngestBuffer.Ready()` opens the gate and the backlog drains.

A single consumer preserves the single-writer invariant (#1283); buffering replays the original messages, so it introduces **no duplicates** (unlike a QoS-1 broker queue). Broker-agnostic — helps direct-connect and bridged operators alike.

## Changes

- `cmd/ingestor/ingest_buffer.go` — `IngestBuffer` (`Submit`/`Start`/`Ready`/`Dropped`/`Pending`); non-blocking submit with drop-on-full counter; single consumer.
- `cmd/ingestor/config.go` — `ingestBufferSize` knob (default 50000).
- `cmd/ingestor/main.go` — reorder boot: connect/subscribe **before** startup maintenance; stamp liveness at receipt; `Ready()` after maintenance + `WaitForAsyncMigrations()`; periodic stats log buffer `pending`/`dropped`.

## Test plan

- [x] `go test ./...` in `cmd/ingestor` — `IngestBuffer` suite covers gating-until-ready, FIFO order, drop-on-full, serial execution (single-writer), and concurrent-submit.
- [ ] `go test -race` in CI (concurrency on `IngestBuffer`).
- [ ] Manual: restart with a pending heavy migration → `subscribed to meshcore/#` appears within seconds; `[ingest-buffer] write path ready` after the migration; packets received during the window are written after `Ready()` (0 dropped under normal traffic); stall watchdog stays quiet (liveness stamped at receipt).

## Out of scope

A hard crash while messages sit in the in-memory buffer still loses them; crash-durability requires broker-side persistence, which is topology-specific. This PR closes the startup-migration and deploy loss windows.

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -80,6 +80,12 @@ type Config struct {
|
||||
// NeighborEdgesMaxAgeDays controls neighbor_edges row retention
|
||||
// (#1287 — moved from cmd/server). 0 = default 5.
|
||||
NeighborEdgesMaxAgeDays int `json:"neighborEdgesMaxAgeDays,omitempty"`
|
||||
|
||||
// IngestBufferSize caps the in-memory queue (number of MQTT messages) held
|
||||
// while the single SQLite writer is blocked by startup migrations/prunes
|
||||
// (#1608). Received messages are drained once the write path is ready.
|
||||
// 0 / unset => default. Bounded memory.
|
||||
IngestBufferSize int `json:"ingestBufferSize,omitempty"`
|
||||
}
|
||||
|
||||
// NeighborEdgesDaysOrDefault returns the configured pruning window or 5.
|
||||
@@ -90,6 +96,17 @@ func (c *Config) NeighborEdgesDaysOrDefault() int {
|
||||
return c.NeighborEdgesMaxAgeDays
|
||||
}
|
||||
|
||||
// IngestBufferSizeOrDefault returns the ingest buffer capacity. Default 50000:
|
||||
// at typical mesh rates (~1-2 msg/s) that is many minutes of headroom while a
|
||||
// startup migration holds the writer; each queued item is a small closure, so
|
||||
// worst-case memory stays in the tens of MB.
|
||||
func (c *Config) IngestBufferSizeOrDefault() int {
|
||||
if c.IngestBufferSize > 0 {
|
||||
return c.IngestBufferSize
|
||||
}
|
||||
return 50000
|
||||
}
|
||||
|
||||
// GeoFilterConfig is an alias for the shared geofilter.Config type.
|
||||
type GeoFilterConfig = geofilter.Config
|
||||
|
||||
|
||||
@@ -484,3 +484,15 @@ func TestLoadConfigWSSource(t *testing.T) {
|
||||
t.Errorf("ResolvedSources wss broker=%s, want unchanged", sources[1].Broker)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIngestBufferSizeOrDefault(t *testing.T) {
|
||||
if got := (&Config{}).IngestBufferSizeOrDefault(); got != 50000 {
|
||||
t.Fatalf("default: want 50000, got %d", got)
|
||||
}
|
||||
if got := (&Config{IngestBufferSize: 10}).IngestBufferSizeOrDefault(); got != 10 {
|
||||
t.Fatalf("override: want 10, got %d", got)
|
||||
}
|
||||
if got := (&Config{IngestBufferSize: -5}).IngestBufferSizeOrDefault(); got != 50000 {
|
||||
t.Fatalf("invalid negative should fall back to default, got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,79 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// IngestBuffer decouples MQTT message receipt from DB writes (#1608).
//
// On boot the ingestor must subscribe to MQTT immediately, but the single
// SQLite writer (#1283) can be held for minutes by a startup migration
// (e.g. a large CREATE INDEX) or prune. Without buffering, every QoS-0 packet
// received in that window is lost. IngestBuffer holds received work in a
// bounded FIFO and a single consumer goroutine drains it once Ready() is
// called — i.e. once the write path is free.
//
// A single consumer preserves the single-writer invariant: jobs run one at a
// time, exactly as paho's in-order handler did before. Submit never blocks the
// MQTT delivery goroutine; if the buffer is full it drops and counts (bounded
// memory). Buffering replays the original messages, so it introduces NO
// duplicates (contrast: a QoS-1 broker-queue would).
type IngestBuffer struct {
	jobs      chan func()   // bounded FIFO of pending work; capacity fixed by NewIngestBuffer
	ready     chan struct{} // closed by Ready() to release the consumer
	dropped   atomic.Int64  // number of jobs rejected by Submit because jobs was full
	startOnce sync.Once     // guarantees Start launches at most one consumer
	readyOnce sync.Once     // guarantees ready is closed at most once
}

// NewIngestBuffer returns a buffer holding up to capacity pending jobs.
// A capacity below 1 is clamped to 1.
func NewIngestBuffer(capacity int) *IngestBuffer {
	if capacity < 1 {
		capacity = 1
	}
	return &IngestBuffer{
		jobs:  make(chan func(), capacity),
		ready: make(chan struct{}),
	}
}

// Submit enqueues a job without blocking. If the buffer is full the job is
// dropped and the dropped counter is incremented. A nil job is ignored: it is
// neither queued nor counted as dropped. Safe for concurrent callers.
func (b *IngestBuffer) Submit(job func()) {
	if job == nil {
		// Calling a nil func panics; queued, it would crash the single
		// consumer goroutine (and with it the process) when drained.
		return
	}
	select {
	case b.jobs <- job:
	default:
		n := b.dropped.Add(1)
		// Log the first drop and then every 1000th to avoid flooding the log.
		if n == 1 || n%1000 == 0 {
			log.Printf("[ingest-buffer] WARNING: buffer full, dropped %d message(s) — raise ingestBufferSize", n)
		}
	}
}

// Start launches the consumer goroutine and returns immediately. The consumer
// (not Start itself) waits until Ready() is called, then drains buffered jobs
// and runs newly-submitted ones serially, in FIFO order. Idempotent: repeated
// calls never launch a second consumer.
func (b *IngestBuffer) Start() {
	b.startOnce.Do(func() {
		go func() {
			<-b.ready
			for job := range b.jobs {
				job()
			}
		}()
	})
}

// Ready signals that the write path is available; the consumer begins
// draining. Idempotent.
func (b *IngestBuffer) Ready() {
	b.readyOnce.Do(func() { close(b.ready) })
}

// Dropped returns the number of jobs dropped due to a full buffer.
func (b *IngestBuffer) Dropped() int64 { return b.dropped.Load() }

// Pending returns the current queue depth (best-effort; for observability).
func (b *IngestBuffer) Pending() int { return len(b.jobs) }
|
||||
@@ -0,0 +1,116 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestIngestBuffer_BuffersUntilReady(t *testing.T) {
|
||||
b := NewIngestBuffer(10)
|
||||
var ran atomic.Int64
|
||||
b.Start()
|
||||
for i := 0; i < 3; i++ {
|
||||
b.Submit(func() { ran.Add(1) })
|
||||
}
|
||||
time.Sleep(30 * time.Millisecond)
|
||||
if ran.Load() != 0 {
|
||||
t.Fatalf("jobs ran before Ready(): %d", ran.Load())
|
||||
}
|
||||
b.Ready()
|
||||
deadline := time.Now().Add(time.Second)
|
||||
for ran.Load() < 3 && time.Now().Before(deadline) {
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
}
|
||||
if ran.Load() != 3 {
|
||||
t.Fatalf("want 3 ran after Ready, got %d", ran.Load())
|
||||
}
|
||||
}
|
||||
|
||||
func TestIngestBuffer_FIFOOrder(t *testing.T) {
|
||||
b := NewIngestBuffer(10)
|
||||
out := make(chan int, 5)
|
||||
b.Start()
|
||||
for i := 0; i < 5; i++ {
|
||||
i := i
|
||||
b.Submit(func() { out <- i })
|
||||
}
|
||||
b.Ready()
|
||||
for want := 0; want < 5; want++ {
|
||||
select {
|
||||
case got := <-out:
|
||||
if got != want {
|
||||
t.Fatalf("order: want %d got %d", want, got)
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Fatalf("timeout waiting for job %d", want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestIngestBuffer_DropsWhenFull(t *testing.T) {
|
||||
b := NewIngestBuffer(2) // never Ready()'d -> nothing drains
|
||||
for i := 0; i < 5; i++ {
|
||||
b.Submit(func() {})
|
||||
}
|
||||
if got := b.Dropped(); got != 3 {
|
||||
t.Fatalf("want 3 dropped (cap 2, 5 submitted), got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIngestBuffer_ProcessesAfterReady(t *testing.T) {
|
||||
b := NewIngestBuffer(10)
|
||||
b.Start()
|
||||
b.Ready()
|
||||
done := make(chan struct{})
|
||||
b.Submit(func() { close(done) })
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("job submitted after Ready was not processed")
|
||||
}
|
||||
}
|
||||
|
||||
func TestIngestBuffer_SerialExecution(t *testing.T) {
|
||||
b := NewIngestBuffer(50)
|
||||
var inFlight atomic.Int32
|
||||
var overlap atomic.Bool
|
||||
var wg sync.WaitGroup
|
||||
b.Start()
|
||||
const n = 20
|
||||
wg.Add(n)
|
||||
for i := 0; i < n; i++ {
|
||||
b.Submit(func() {
|
||||
if inFlight.Add(1) > 1 {
|
||||
overlap.Store(true)
|
||||
}
|
||||
time.Sleep(time.Millisecond)
|
||||
inFlight.Add(-1)
|
||||
wg.Done()
|
||||
})
|
||||
}
|
||||
b.Ready()
|
||||
wg.Wait()
|
||||
if overlap.Load() {
|
||||
t.Fatal("jobs overlapped — consumer is not serial (violates single-writer)")
|
||||
}
|
||||
}
|
||||
|
||||
func TestIngestBuffer_ConcurrentSubmitSafe(t *testing.T) {
|
||||
b := NewIngestBuffer(20000)
|
||||
b.Start()
|
||||
var wg sync.WaitGroup
|
||||
for g := 0; g < 8; g++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for i := 0; i < 1000; i++ {
|
||||
b.Submit(func() {})
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
b.Ready()
|
||||
// Assertion is the absence of a race/panic; run under -race in CI.
|
||||
}
|
||||
+160
-131
@@ -75,6 +75,151 @@ func main() {
|
||||
// Check auto_vacuum mode and optionally migrate (#919)
|
||||
store.CheckAutoVacuum(cfg)
|
||||
|
||||
channelKeys := loadChannelKeys(cfg, *configPath)
|
||||
if len(channelKeys) > 0 {
|
||||
log.Printf("Loaded %d channel keys for GRP_TXT decryption", len(channelKeys))
|
||||
} else {
|
||||
log.Printf("No channel keys loaded — GRP_TXT packets will not be decrypted")
|
||||
}
|
||||
|
||||
regionKeys := loadRegionKeys(cfg)
|
||||
store.BackfillDefaultScopeAsync(regionKeys)
|
||||
|
||||
// Subscribe-early + buffer (#1608): the MQTT subscription is brought up
|
||||
// before startup maintenance so no packets are missed while the single
|
||||
// SQLite writer is blocked (e.g. a large CREATE INDEX migration). Received
|
||||
// messages are buffered here and drained once Ready() is called below.
|
||||
ingestBuffer := NewIngestBuffer(cfg.IngestBufferSizeOrDefault())
|
||||
ingestBuffer.Start()
|
||||
|
||||
// Connect to each MQTT source
|
||||
var clients []mqtt.Client
|
||||
connectedCount := 0
|
||||
for _, source := range sources {
|
||||
tag := source.Name
|
||||
if tag == "" {
|
||||
tag = source.Broker
|
||||
}
|
||||
|
||||
opts := buildMQTTOpts(source)
|
||||
connectTimeout := source.ConnectTimeoutOrDefault()
|
||||
log.Printf("MQTT [%s] connect timeout: %ds", tag, connectTimeout)
|
||||
|
||||
// Pre-allocate the liveness pointer so OnConnect can reset its
|
||||
// stale-message clock on reconnect (PR #1216 r1 item 2). IsConnectedFn
|
||||
// is wired below once the client exists.
|
||||
liveness := &SourceLivenessState{
|
||||
Tag: tag,
|
||||
Broker: source.Broker,
|
||||
}
|
||||
|
||||
opts.SetOnConnectHandler(func(c mqtt.Client) {
|
||||
log.Printf("MQTT [%s] connected to %s", tag, source.Broker)
|
||||
// PR #1216 r1 item 2: clear the stale LastMessageUnix from
|
||||
// before the outage so the watchdog doesn't immediately scream
|
||||
// "stalled for 2h". Also restarts the cold-start grace window
|
||||
// and clears the alert cooldown so a fresh stall edge can fire.
|
||||
liveness.MarkReconnected(time.Now())
|
||||
topics := source.Topics
|
||||
if len(topics) == 0 {
|
||||
topics = []string{"meshcore/#"}
|
||||
}
|
||||
for _, t := range topics {
|
||||
token := c.Subscribe(t, 0, nil)
|
||||
token.Wait()
|
||||
if token.Error() != nil {
|
||||
log.Printf("MQTT [%s] subscribe error for %s: %v", tag, t, token.Error())
|
||||
} else {
|
||||
log.Printf("MQTT [%s] subscribed to %s", tag, t)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
opts.SetConnectionLostHandler(func(c mqtt.Client, err error) {
|
||||
log.Printf("MQTT [%s] disconnected from %s: %v", tag, source.Broker, err)
|
||||
})
|
||||
|
||||
opts.SetReconnectingHandler(func(c mqtt.Client, options *mqtt.ClientOptions) {
|
||||
log.Printf("MQTT [%s] reconnecting to %s", tag, source.Broker)
|
||||
})
|
||||
|
||||
// Capture source for closure
|
||||
src := source
|
||||
opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) {
|
||||
// Stamp liveness at RECEIPT (not at processing) so the stall
|
||||
// watchdog reflects broker liveness even while the buffer is
|
||||
// gated during startup (#1608). handleMessage also stamps it,
|
||||
// which is a harmless cheap re-store once draining begins.
|
||||
markLivenessForTag(tag, time.Now())
|
||||
ingestBuffer.Submit(func() {
|
||||
handleMessage(store, tag, src, m, channelKeys, regionKeys, cfg)
|
||||
})
|
||||
})
|
||||
|
||||
client := mqtt.NewClient(opts)
|
||||
// Wire IsConnectedFn now that the client exists, then register.
|
||||
// Registration BEFORE Connect so the attempt counter is available
|
||||
// to OnConnectAttempt on the very first dial.
|
||||
liveness.IsConnectedFn = client.IsConnected
|
||||
// #1335: wire force-reconnect so the watchdog can drop a
|
||||
// half-open TCP socket and re-dial when paho.IsConnected==true
|
||||
// but no messages have flowed past the stall threshold. Throttled
|
||||
// per source by the watchdog itself (forceReconnectThrottle).
|
||||
// Disconnect(250) gives in-flight publishes 250ms to drain;
|
||||
// Connect() returns immediately and paho's reconnect machinery
|
||||
// takes over from there. Captured-by-value `client` is the same
|
||||
// pointer used everywhere else for this source.
|
||||
liveness.ForceReconnectFn = func() {
|
||||
client.Disconnect(250)
|
||||
client.Connect()
|
||||
}
|
||||
// PR #1216 r2 item 3: tag collisions used to log.Fatalf, which
|
||||
// killed the entire ingestor over one config typo and recreated
|
||||
// the #1212 total-ingest-stop class this PR exists to prevent.
|
||||
// registerLivenessOrSkip logs ERROR + skips liveness registration
|
||||
// for the duplicate; the MQTT source still attempts to connect,
|
||||
// it just isn't tracked by the watchdog. First registration
|
||||
// remains authoritative.
|
||||
registerLivenessOrSkip(liveness)
|
||||
token := client.Connect()
|
||||
// With ConnectRetry=true, token.Wait() blocks forever for unreachable brokers.
|
||||
// WaitTimeout lets startup proceed; the client keeps retrying in the background
|
||||
// and OnConnect fires (subscribing) when it eventually connects (#910).
|
||||
if !token.WaitTimeout(time.Duration(connectTimeout) * time.Second) {
|
||||
log.Printf("MQTT [%s] initial connection timed out — retrying in background", tag)
|
||||
clients = append(clients, client)
|
||||
continue
|
||||
}
|
||||
if token.Error() != nil {
|
||||
log.Printf("MQTT [%s] connection failed (non-fatal): %v", tag, token.Error())
|
||||
// BL1 fix: Disconnect to stop Paho's internal retry goroutines.
|
||||
// With ConnectRetry=true, Connect() spawns background goroutines
|
||||
// that leak if the client is simply discarded.
|
||||
client.Disconnect(0)
|
||||
continue
|
||||
}
|
||||
connectedCount++
|
||||
clients = append(clients, client)
|
||||
}
|
||||
|
||||
// BL2 fix: require at least one immediately-connected source. Timed-out
|
||||
// clients are retrying in background (tracked in clients) but don't count
|
||||
// as "connected" — a single unreachable broker must not silently run with
|
||||
// zero active connections.
|
||||
if connectedCount == 0 {
|
||||
// Clean up any timed-out clients still retrying
|
||||
for _, c := range clients {
|
||||
c.Disconnect(0)
|
||||
}
|
||||
log.Fatal("no MQTT sources connected — all timed out or failed. Check broker is running (default: mqtt://localhost:1883). Set MQTT_BROKER env var or configure mqttSources in config.json")
|
||||
}
|
||||
|
||||
if connectedCount < len(clients) {
|
||||
log.Printf("Running — %d MQTT source(s) connected, %d retrying in background", connectedCount, len(clients)-connectedCount)
|
||||
} else {
|
||||
log.Printf("Running — %d MQTT source(s) connected", connectedCount)
|
||||
}
|
||||
|
||||
// Node retention: move stale nodes to inactive_nodes on startup
|
||||
nodeDays := cfg.NodeDaysOrDefault()
|
||||
store.MoveStaleNodes(nodeDays)
|
||||
@@ -103,6 +248,18 @@ func main() {
|
||||
vacuumPages := cfg.IncrementalVacuumPages()
|
||||
store.RunIncrementalVacuum(vacuumPages)
|
||||
|
||||
// Gate open: the synchronous startup writes above cannot return until the
|
||||
// single SQLite writer is free, which means any blocking async migration
|
||||
// (e.g. the CREATE INDEX) has finished. WaitForAsyncMigrations() makes that
|
||||
// explicit. Now drain everything the subscription buffered during startup.
|
||||
store.WaitForAsyncMigrations()
|
||||
ingestBuffer.Ready()
|
||||
if d := ingestBuffer.Dropped(); d > 0 {
|
||||
log.Printf("[ingest-buffer] write path ready; draining backlog (dropped %d during startup — consider raising ingestBufferSize)", d)
|
||||
} else {
|
||||
log.Printf("[ingest-buffer] write path ready; draining backlog (0 dropped)")
|
||||
}
|
||||
|
||||
// Daily ticker for node retention
|
||||
retentionTicker := time.NewTicker(1 * time.Hour)
|
||||
go func() {
|
||||
@@ -192,6 +349,9 @@ func main() {
|
||||
go func() {
|
||||
for range statsTicker.C {
|
||||
store.LogStats()
|
||||
if d := ingestBuffer.Dropped(); d > 0 || ingestBuffer.Pending() > 0 {
|
||||
log.Printf("[ingest-buffer] pending=%d dropped_total=%d", ingestBuffer.Pending(), d)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -238,137 +398,6 @@ func main() {
|
||||
defer stopNeighborBuilder()
|
||||
log.Printf("[neighbor-build] enabled (interval=%s)", NeighborEdgesBuilderInterval)
|
||||
|
||||
channelKeys := loadChannelKeys(cfg, *configPath)
|
||||
if len(channelKeys) > 0 {
|
||||
log.Printf("Loaded %d channel keys for GRP_TXT decryption", len(channelKeys))
|
||||
} else {
|
||||
log.Printf("No channel keys loaded — GRP_TXT packets will not be decrypted")
|
||||
}
|
||||
|
||||
regionKeys := loadRegionKeys(cfg)
|
||||
store.BackfillDefaultScopeAsync(regionKeys)
|
||||
|
||||
// Connect to each MQTT source
|
||||
var clients []mqtt.Client
|
||||
connectedCount := 0
|
||||
for _, source := range sources {
|
||||
tag := source.Name
|
||||
if tag == "" {
|
||||
tag = source.Broker
|
||||
}
|
||||
|
||||
opts := buildMQTTOpts(source)
|
||||
connectTimeout := source.ConnectTimeoutOrDefault()
|
||||
log.Printf("MQTT [%s] connect timeout: %ds", tag, connectTimeout)
|
||||
|
||||
// Pre-allocate the liveness pointer so OnConnect can reset its
|
||||
// stale-message clock on reconnect (PR #1216 r1 item 2). IsConnectedFn
|
||||
// is wired below once the client exists.
|
||||
liveness := &SourceLivenessState{
|
||||
Tag: tag,
|
||||
Broker: source.Broker,
|
||||
}
|
||||
|
||||
opts.SetOnConnectHandler(func(c mqtt.Client) {
|
||||
log.Printf("MQTT [%s] connected to %s", tag, source.Broker)
|
||||
// PR #1216 r1 item 2: clear the stale LastMessageUnix from
|
||||
// before the outage so the watchdog doesn't immediately scream
|
||||
// "stalled for 2h". Also restarts the cold-start grace window
|
||||
// and clears the alert cooldown so a fresh stall edge can fire.
|
||||
liveness.MarkReconnected(time.Now())
|
||||
topics := source.Topics
|
||||
if len(topics) == 0 {
|
||||
topics = []string{"meshcore/#"}
|
||||
}
|
||||
for _, t := range topics {
|
||||
token := c.Subscribe(t, 0, nil)
|
||||
token.Wait()
|
||||
if token.Error() != nil {
|
||||
log.Printf("MQTT [%s] subscribe error for %s: %v", tag, t, token.Error())
|
||||
} else {
|
||||
log.Printf("MQTT [%s] subscribed to %s", tag, t)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
opts.SetConnectionLostHandler(func(c mqtt.Client, err error) {
|
||||
log.Printf("MQTT [%s] disconnected from %s: %v", tag, source.Broker, err)
|
||||
})
|
||||
|
||||
opts.SetReconnectingHandler(func(c mqtt.Client, options *mqtt.ClientOptions) {
|
||||
log.Printf("MQTT [%s] reconnecting to %s", tag, source.Broker)
|
||||
})
|
||||
|
||||
// Capture source for closure
|
||||
src := source
|
||||
opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) {
|
||||
handleMessage(store, tag, src, m, channelKeys, regionKeys, cfg)
|
||||
})
|
||||
|
||||
client := mqtt.NewClient(opts)
|
||||
// Wire IsConnectedFn now that the client exists, then register.
|
||||
// Registration BEFORE Connect so the attempt counter is available
|
||||
// to OnConnectAttempt on the very first dial.
|
||||
liveness.IsConnectedFn = client.IsConnected
|
||||
// #1335: wire force-reconnect so the watchdog can drop a
|
||||
// half-open TCP socket and re-dial when paho.IsConnected==true
|
||||
// but no messages have flowed past the stall threshold. Throttled
|
||||
// per source by the watchdog itself (forceReconnectThrottle).
|
||||
// Disconnect(250) gives in-flight publishes 250ms to drain;
|
||||
// Connect() returns immediately and paho's reconnect machinery
|
||||
// takes over from there. Captured-by-value `client` is the same
|
||||
// pointer used everywhere else for this source.
|
||||
liveness.ForceReconnectFn = func() {
|
||||
client.Disconnect(250)
|
||||
client.Connect()
|
||||
}
|
||||
// PR #1216 r2 item 3: tag collisions used to log.Fatalf, which
|
||||
// killed the entire ingestor over one config typo and recreated
|
||||
// the #1212 total-ingest-stop class this PR exists to prevent.
|
||||
// registerLivenessOrSkip logs ERROR + skips liveness registration
|
||||
// for the duplicate; the MQTT source still attempts to connect,
|
||||
// it just isn't tracked by the watchdog. First registration
|
||||
// remains authoritative.
|
||||
registerLivenessOrSkip(liveness)
|
||||
token := client.Connect()
|
||||
// With ConnectRetry=true, token.Wait() blocks forever for unreachable brokers.
|
||||
// WaitTimeout lets startup proceed; the client keeps retrying in the background
|
||||
// and OnConnect fires (subscribing) when it eventually connects (#910).
|
||||
if !token.WaitTimeout(time.Duration(connectTimeout) * time.Second) {
|
||||
log.Printf("MQTT [%s] initial connection timed out — retrying in background", tag)
|
||||
clients = append(clients, client)
|
||||
continue
|
||||
}
|
||||
if token.Error() != nil {
|
||||
log.Printf("MQTT [%s] connection failed (non-fatal): %v", tag, token.Error())
|
||||
// BL1 fix: Disconnect to stop Paho's internal retry goroutines.
|
||||
// With ConnectRetry=true, Connect() spawns background goroutines
|
||||
// that leak if the client is simply discarded.
|
||||
client.Disconnect(0)
|
||||
continue
|
||||
}
|
||||
connectedCount++
|
||||
clients = append(clients, client)
|
||||
}
|
||||
|
||||
// BL2 fix: require at least one immediately-connected source. Timed-out
|
||||
// clients are retrying in background (tracked in clients) but don't count
|
||||
// as "connected" — a single unreachable broker must not silently run with
|
||||
// zero active connections.
|
||||
if connectedCount == 0 {
|
||||
// Clean up any timed-out clients still retrying
|
||||
for _, c := range clients {
|
||||
c.Disconnect(0)
|
||||
}
|
||||
log.Fatal("no MQTT sources connected — all timed out or failed. Check broker is running (default: mqtt://localhost:1883). Set MQTT_BROKER env var or configure mqttSources in config.json")
|
||||
}
|
||||
|
||||
if connectedCount < len(clients) {
|
||||
log.Printf("Running — %d MQTT source(s) connected, %d retrying in background", connectedCount, len(clients)-connectedCount)
|
||||
} else {
|
||||
log.Printf("Running — %d MQTT source(s) connected", connectedCount)
|
||||
}
|
||||
|
||||
// #1212: per-source stall watchdog. Detects "silently dead" sources
|
||||
// where the client reports connected but no messages have flowed. Logs
|
||||
// a WARN line every minute for any source silent for >5m. Scan every
|
||||
|
||||
Reference in New Issue
Block a user