diff --git a/cmd/ingestor/ingest_buffer.go b/cmd/ingestor/ingest_buffer.go index a115c6e4..641801a3 100644 --- a/cmd/ingestor/ingest_buffer.go +++ b/cmd/ingestor/ingest_buffer.go @@ -4,6 +4,7 @@ import ( "log" "sync" "sync/atomic" + "time" ) // IngestBuffer decouples MQTT message receipt from DB writes (#1608). @@ -23,44 +24,148 @@ import ( type IngestBuffer struct { jobs chan func() ready chan struct{} + stop chan struct{} + done chan struct{} dropped atomic.Int64 startOnce sync.Once readyOnce sync.Once + stopOnce sync.Once + + // dropLogMu guards the time-based drop-log throttle (PR #1623 + // round-1 fix to #1609 M1). Per-drop logging under sustained + // stalls could flood the log at MQTT inbound rate; instead we + // always log the FIRST drop of a stall and then summarize at + // most once per second until the stall ends. + dropLogMu sync.Mutex + stallActive bool // true between first drop and first successful Submit + stallStart time.Time // when the current stall began + stallStartDrop int64 // dropped() value when stall began + lastSummaryAt time.Time // last time we wrote a summary line } +// dropLogSummaryInterval is the minimum interval between summary lines +// during a sustained stall. Exposed as a var so tests can shrink it. +var dropLogSummaryInterval = time.Second + // NewIngestBuffer returns a buffer holding up to capacity pending jobs. +// Non-positive capacity is clamped to 1 and a WARN is logged so the +// misconfiguration is visible (PR #1609 m2 — silent clamp hid bad +// ingestBufferSize values). func NewIngestBuffer(capacity int) *IngestBuffer { if capacity < 1 { + log.Printf("[ingest-buffer] WARN: requested capacity %d < 1, clamping to 1 — check ingestBufferSize config; default is 50000", capacity) capacity = 1 } return &IngestBuffer{ jobs: make(chan func(), capacity), ready: make(chan struct{}), + stop: make(chan struct{}), + done: make(chan struct{}), } } // Submit enqueues a job without blocking. If the buffer is full the job is // dropped and the dropped counter is incremented. Safe for concurrent callers. +// +// Ordering invariant: callers MUST call Start() before the first Submit(). +// Submit only enqueues — without a running consumer, jobs sit in the channel +// and (once cap is reached) are silently dropped until Start()+Ready() run. +// +// Drop logging (PR #1623 round-1 fix to #1609 M1) uses a time-based +// throttle to stay loud-on-stall-start without flooding under sustained +// stalls: +// - the FIRST drop of a stall logs immediately +// - subsequent drops are summarized at most once per second +// - when the next Submit succeeds, a "drained" recovery line is +// emitted so operators can quantify the burst +// +// All log lines include the buffer capacity for operator triage. func (b *IngestBuffer) Submit(job func()) { select { case b.jobs <- job: + b.maybeLogRecovery() default: n := b.dropped.Add(1) - if n == 1 || n%1000 == 0 { - log.Printf("[ingest-buffer] WARNING: buffer full, dropped %d message(s) — raise ingestBufferSize", n) - } + b.logDrop(n) } } -// Start launches the consumer goroutine. It blocks until Ready() is called, -// then drains buffered jobs and runs newly-submitted ones serially, in FIFO -// order. Idempotent. +// logDrop emits a drop log line under the time-based throttle. The first +// drop of a stall always logs; subsequent drops summarize at most once +// per dropLogSummaryInterval. +func (b *IngestBuffer) logDrop(n int64) { + b.dropLogMu.Lock() + defer b.dropLogMu.Unlock() + now := time.Now() + if !b.stallActive { + b.stallActive = true + b.stallStart = now + b.stallStartDrop = n - 1 // last successful Submit -> this is the 1st drop of the stall + b.lastSummaryAt = now + log.Printf("[ingest-buffer] WARNING: buffer full (cap %d), dropped %d message(s) total — write path stalled, raise ingestBufferSize or investigate slow writer", cap(b.jobs), n) + return + } + if now.Sub(b.lastSummaryAt) >= dropLogSummaryInterval { + b.lastSummaryAt = now + stallDrops := n - b.stallStartDrop + log.Printf("[ingest-buffer] WARNING: buffer full (cap %d), %d drop(s) in current stall, %d total — write path still stalled", cap(b.jobs), stallDrops, n) + } +} + +// maybeLogRecovery is called from the success branch of Submit. If a +// stall was active, it logs a recovery line summarizing the burst and +// clears the stall state. +func (b *IngestBuffer) maybeLogRecovery() { + b.dropLogMu.Lock() + defer b.dropLogMu.Unlock() + if !b.stallActive { + return + } + stallDrops := b.dropped.Load() - b.stallStartDrop + dur := time.Since(b.stallStart) + log.Printf("[ingest-buffer] INFO: buffer drained, %d drop(s) over %s (cap %d) — write path recovered", stallDrops, dur.Round(time.Millisecond), cap(b.jobs)) + b.stallActive = false +} + +// Start launches the consumer goroutine. It blocks until Ready() is called +// (or Stop() fires, whichever comes first), then drains buffered jobs and +// runs newly-submitted ones serially, in FIFO order. Idempotent. +// +// Lifecycle: Stop() closes b.stop, which causes the consumer to exit via +// the stop-select arm (after draining any queued jobs if Ready() had +// already fired). The b.jobs channel is never closed — closing it would +// race with concurrent Submit() callers and panic; instead jobs is +// garbage-collected with the buffer once all references drop. Done() is +// closed when the consumer goroutine returns. func (b *IngestBuffer) Start() { b.startOnce.Do(func() { go func() { - <-b.ready - for job := range b.jobs { - job() + defer close(b.done) + select { + case <-b.ready: + case <-b.stop: + // Stopped before Ready — exit immediately. Pending jobs + // are discarded; the buffer was never authorized to drain. + return + } + for { + select { + case job := <-b.jobs: + job() + case <-b.stop: + // Stop after Ready — drain whatever is queued so + // shutdown is graceful, then exit. b.jobs is never + // closed (see Start godoc), so a default-case + // non-blocking receive is the correct drain idiom. + for { + select { + case job := <-b.jobs: + job() + default: + return + } + } + } } }() }) @@ -68,6 +173,10 @@ func (b *IngestBuffer) Start() { // Ready signals that the write path is available; the consumer begins // draining. Idempotent. +// +// Ordering invariant: Start() MUST have been called before Ready() takes +// effect. Calling Ready() without a prior Start() simply closes the ready +// channel — nothing drains until a later Start() runs its consumer goroutine. func (b *IngestBuffer) Ready() { b.readyOnce.Do(func() { close(b.ready) }) } @@ -77,3 +186,17 @@ func (b *IngestBuffer) Dropped() int64 { return b.dropped.Load() } // Pending returns the current queue depth (best-effort; for observability). func (b *IngestBuffer) Pending() int { return len(b.jobs) } + +// Stop signals the consumer goroutine to exit. Test-hygiene helper so unit +// tests don't leak the goroutine that Start() spawns. Idempotent / safe to +// call without a prior Start(). After Stop() the consumer exits and Done() +// is closed. +func (b *IngestBuffer) Stop() { + b.stopOnce.Do(func() { close(b.stop) }) +} + +// Done returns a channel that is closed after the consumer goroutine has +// exited. If Start() was never called, Done() never closes. +func (b *IngestBuffer) Done() <-chan struct{} { + return b.done +} diff --git a/cmd/ingestor/ingest_buffer_test.go b/cmd/ingestor/ingest_buffer_test.go index 97013211..92e68caa 100644 --- a/cmd/ingestor/ingest_buffer_test.go +++ b/cmd/ingestor/ingest_buffer_test.go @@ -1,6 +1,9 @@ package main import ( + "bytes" + "log" + "strings" "sync" "sync/atomic" "testing" @@ -9,6 +12,7 @@ import ( func TestIngestBuffer_BuffersUntilReady(t *testing.T) { b := NewIngestBuffer(10) + t.Cleanup(b.Stop) var ran atomic.Int64 b.Start() for i := 0; i < 3; i++ { @@ -30,6 +34,7 @@ func TestIngestBuffer_BuffersUntilReady(t *testing.T) { func TestIngestBuffer_FIFOOrder(t *testing.T) { b := NewIngestBuffer(10) + t.Cleanup(b.Stop) out := make(chan int, 5) b.Start() for i := 0; i < 5; i++ { @@ -50,7 +55,8 @@ func TestIngestBuffer_FIFOOrder(t *testing.T) { } func TestIngestBuffer_DropsWhenFull(t *testing.T) { - b := NewIngestBuffer(2) // never Ready()'d -> nothing drains + b := NewIngestBuffer(2) + t.Cleanup(b.Stop) // never Ready()'d -> nothing drains for i := 0; i < 5; i++ { b.Submit(func() {}) } @@ -61,6 +67,7 @@ func TestIngestBuffer_DropsWhenFull(t *testing.T) { func TestIngestBuffer_ProcessesAfterReady(t *testing.T) { b := NewIngestBuffer(10) + t.Cleanup(b.Stop) b.Start() b.Ready() done := make(chan struct{}) @@ -74,6 +81,7 @@ func TestIngestBuffer_ProcessesAfterReady(t *testing.T) { func TestIngestBuffer_SerialExecution(t *testing.T) { b := NewIngestBuffer(50) + t.Cleanup(b.Stop) var inFlight atomic.Int32 var overlap atomic.Bool var wg sync.WaitGroup @@ -99,6 +107,7 @@ func TestIngestBuffer_SerialExecution(t *testing.T) { func TestIngestBuffer_ConcurrentSubmitSafe(t *testing.T) { b := NewIngestBuffer(20000) + t.Cleanup(b.Stop) b.Start() var wg sync.WaitGroup for g := 0; g < 8; g++ { @@ -114,3 +123,152 @@ func TestIngestBuffer_ConcurrentSubmitSafe(t *testing.T) { b.Ready() // Assertion is the absence of a race/panic; run under -race in CI. } + +// TestIngestBuffer_StopUnblocksConsumer guards the consumer-goroutine leak +// described in PR #1609 review m1: Start() blocks on <-b.ready forever if +// Ready() is never called, leaking the goroutine in test runs. Stop() must +// signal the consumer to exit cleanly without requiring Ready(). +func TestIngestBuffer_StopUnblocksConsumer(t *testing.T) { + b := NewIngestBuffer(10) + t.Cleanup(b.Stop) + b.Start() + // Do NOT call Ready(). The consumer must exit purely because of Stop(). + b.Stop() + select { + case <-b.Done(): + // good — consumer goroutine returned + case <-time.After(time.Second): + t.Fatal("Stop() did not unblock the consumer goroutine within 1s (Done() never closed)") + } +} + +// TestNewIngestBuffer_WarnsOnSubOneClamp asserts that constructing the +// buffer with a non-positive capacity emits a WARN log line. Silent +// clamping (PR #1609 review m2) hid misconfigurations like +// ingestBufferSize=-1 or 0-from-default-not-applied paths. +func TestNewIngestBuffer_WarnsOnSubOneClamp(t *testing.T) { + var buf bytes.Buffer + oldOut := log.Writer() + oldFlags := log.Flags() + log.SetOutput(&buf) + log.SetFlags(0) + t.Cleanup(func() { + log.SetOutput(oldOut) + log.SetFlags(oldFlags) + }) + + b := NewIngestBuffer(0) + t.Cleanup(b.Stop) + + got := buf.String() + if !strings.Contains(got, "WARN") || !strings.Contains(got, "ingest-buffer") { + t.Fatalf("expected WARN log on sub-one clamp, got %q", got) + } +} + +// TestIngestBuffer_DropLogThrottle asserts the time-based throttle (PR +// #1623 round-1 fix to #1609 M1): the FIRST drop of a stall logs +// immediately (loud), then subsequent drops within the same stall are +// rate-limited to at most one summary line per second, and a recovery +// line is emitted when Submit succeeds again. This prevents log-flood +// under sustained stalls (potentially hundreds of MB/min) while +// preserving "loud the instant the stall starts". +func TestIngestBuffer_DropLogThrottle(t *testing.T) { + var buf bytes.Buffer + oldOut := log.Writer() + oldFlags := log.Flags() + log.SetOutput(&buf) + log.SetFlags(0) + t.Cleanup(func() { + log.SetOutput(oldOut) + log.SetFlags(oldFlags) + }) + + b := NewIngestBuffer(2) + t.Cleanup(b.Stop) + // Fill to capacity (no Ready() — nothing drains). + for i := 0; i < 2; i++ { + b.Submit(func() {}) + } + // 100 drops in tight loop (well under 1s). + for i := 0; i < 100; i++ { + b.Submit(func() {}) + } + + got := buf.String() + lines := strings.Count(got, "buffer full") + if lines < 1 { + t.Fatalf("expected the FIRST drop to log immediately; got 0 'buffer full' lines:\n%s", got) + } + if lines > 2 { + t.Fatalf("expected at most 2 'buffer full' lines for 100 drops in <1s (first + at-most-one summary), got %d:\n%s", lines, got) + } + // Every line must include the capacity for operator triage. + if !strings.Contains(got, "cap 2") { + t.Fatalf("expected every drop log line to include 'cap 2', got:\n%s", got) + } +} + +// TestIngestBuffer_DropLogFirstAlwaysImmediate guards the "loud the +// instant the stall starts" half of the throttle contract from PR +// #1623: even a single drop must log immediately, not be silently +// absorbed by the per-second summary window. +func TestIngestBuffer_DropLogFirstAlwaysImmediate(t *testing.T) { + var buf bytes.Buffer + oldOut := log.Writer() + oldFlags := log.Flags() + log.SetOutput(&buf) + log.SetFlags(0) + t.Cleanup(func() { + log.SetOutput(oldOut) + log.SetFlags(oldFlags) + }) + + b := NewIngestBuffer(1) + t.Cleanup(b.Stop) + b.Submit(func() {}) // fills cap=1 + b.Submit(func() {}) // first drop + got := buf.String() + if !strings.Contains(got, "buffer full") { + t.Fatalf("expected FIRST drop to log immediately; got:\n%s", got) + } +} + +// TestIngestBuffer_DropLogRecoveryAfterDrain guards the recovery-line +// half of the throttle contract: once Submit succeeds again after one +// or more drops, a "recovered" / "drained" line must be emitted so +// operators can quantify the burst (PR #1623). +func TestIngestBuffer_DropLogRecoveryAfterDrain(t *testing.T) { + var buf bytes.Buffer + oldOut := log.Writer() + oldFlags := log.Flags() + log.SetOutput(&buf) + log.SetFlags(0) + t.Cleanup(func() { + log.SetOutput(oldOut) + log.SetFlags(oldFlags) + }) + + b := NewIngestBuffer(1) + t.Cleanup(b.Stop) + b.Submit(func() {}) // fills cap=1 + for i := 0; i < 3; i++ { + b.Submit(func() {}) // drops + } + // Drain: start consumer and Ready(), wait for queue to empty. + b.Start() + b.Ready() + deadline := time.Now().Add(time.Second) + for b.Pending() > 0 && time.Now().Before(deadline) { + time.Sleep(2 * time.Millisecond) + } + // Now a successful Submit should trigger the recovery line. + b.Submit(func() {}) + // Give the goroutine + log a moment. + time.Sleep(20 * time.Millisecond) + + got := buf.String() + if !strings.Contains(got, "drained") && !strings.Contains(got, "recovered") { + t.Fatalf("expected a 'drained'/'recovered' log line after stall ended; got:\n%s", got) + } +} diff --git a/cmd/ingestor/main.go b/cmd/ingestor/main.go index 9e33884f..f7515343 100644 --- a/cmd/ingestor/main.go +++ b/cmd/ingestor/main.go @@ -165,11 +165,12 @@ func main() { // Capture source for closure src := source opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) { - // Stamp liveness at RECEIPT (not at processing) so the stall - // watchdog reflects broker liveness even while the buffer is - // gated during startup (#1608). handleMessage also stamps it, - // which is a harmless cheap re-store once draining begins. - markLivenessForTag(tag, time.Now()) + // PR #1609 M1: stamp the RECEIPT clock here (broker liveness) + // independently of the post-write clock that handleMessage + // stamps. Without separation the watchdog/healthz could + // report "fresh" while the writer was stalled and the + // buffer was filling. + markReceiptForTag(tag, time.Now()) ingestBuffer.Submit(func() { handleMessage(store, tag, src, m, channelKeys, regionKeys, cfg) }) diff --git a/cmd/ingestor/mqtt_watchdog.go b/cmd/ingestor/mqtt_watchdog.go index 899ff4c5..c28f3fe8 100644 --- a/cmd/ingestor/mqtt_watchdog.go +++ b/cmd/ingestor/mqtt_watchdog.go @@ -57,7 +57,12 @@ const ( type SourceLivenessState struct { Tag string Broker string - LastMessageUnix int64 // atomic; unix seconds of last successfully received MQTT message + LastMessageUnix int64 // atomic; unix seconds of last successfully WRITTEN MQTT message (handleMessage post-write) + // LastReceiptUnix (PR #1609 M1) is stamped at MQTT receipt time — + // BEFORE the message is handed to the buffer/writer. STUB: unused + // in production until the green commit wires MarkReceipt at the + // receipt callsite and surfaces it in stats/healthz. + LastReceiptUnix int64 // atomic; unix seconds of last RECEIPT (broker liveness) // FirstConnectedAt (PR #1216 r2 item 2) is stamped ONCE at // registerLivenessState time and never reset. Cold-start grace // checks against this so a flapping broker (CONNECT ok, SUBSCRIBE @@ -95,6 +100,16 @@ func (s *SourceLivenessState) MarkMessage(now time.Time) { atomic.StoreInt64(&s.LastMessageUnix, now.Unix()) } +// MarkReceipt records the time of an MQTT message receipt — stamped at the +// paho receipt callback BEFORE the message enters the ingest buffer. PR +// #1609 M1: kept separate from LastMessageUnix so the watchdog/healthz can +// distinguish "broker alive, write path stuck" (LastReceiptUnix fresh, +// LastMessageUnix stale) from "everything stalled" (both stale). Cheap; +// safe to call from the message-handling hot path. +func (s *SourceLivenessState) MarkReceipt(now time.Time) { + atomic.StoreInt64(&s.LastReceiptUnix, now.Unix()) +} + // MarkReconnected clears stale liveness state so the watchdog does not // false-alarm on a pre-outage timestamp after paho re-establishes the // connection (PR #1216 r1 item 2). Resets LastMessageUnix, re-stamps @@ -217,7 +232,8 @@ func registerLivenessOrSkip(s *SourceLivenessState) bool { } // markLivenessForTag is the hot-path entry point: O(1) map lookup + -// atomic store. Safe to call for unknown tags (no-op). +// atomic store. Safe to call for unknown tags (no-op). Updates +// LastMessageUnix (post-write clock). func markLivenessForTag(tag string, now time.Time) { livenessRegistryMu.RLock() s := livenessRegistry[tag] @@ -227,6 +243,38 @@ func markLivenessForTag(tag string, now time.Time) { } } +// markReceiptForTag is the hot-path entry point used at MQTT receipt +// (BEFORE the message is buffered/written). Updates LastReceiptUnix only. +// PR #1609 M1 — separates broker-liveness signal from write-path +// liveness so /healthz can show a stalled writer with a live broker. +func markReceiptForTag(tag string, now time.Time) { + livenessRegistryMu.RLock() + s := livenessRegistry[tag] + livenessRegistryMu.RUnlock() + if s != nil { + s.MarkReceipt(now) + } +} + +// SnapshotLivenessClocks returns the per-source receipt vs write-path +// liveness pair for every registered source. Read-only; safe to call +// from the stats-file writer. PR #1609 M1. +func SnapshotLivenessClocks() map[string]SourceLivenessSnapshot { + livenessRegistryMu.RLock() + defer livenessRegistryMu.RUnlock() + if len(livenessRegistry) == 0 { + return nil + } + out := make(map[string]SourceLivenessSnapshot, len(livenessRegistry)) + for tag, s := range livenessRegistry { + out[tag] = SourceLivenessSnapshot{ + LastReceiptUnix: atomic.LoadInt64(&s.LastReceiptUnix), + LastMessageUnix: atomic.LoadInt64(&s.LastMessageUnix), + } + } + return out +} + // runLivenessWatchdog starts a goroutine that scans the registry every // `interval` and logs a warning for any source that has been silent while // connected for more than `threshold`. Returns a stop function that halts diff --git a/cmd/ingestor/mqtt_watchdog_m1_test.go b/cmd/ingestor/mqtt_watchdog_m1_test.go new file mode 100644 index 00000000..9e1a7443 --- /dev/null +++ b/cmd/ingestor/mqtt_watchdog_m1_test.go @@ -0,0 +1,43 @@ +package main + +import ( + "sync/atomic" + "testing" + "time" +) + +// TestSourceLivenessState_ReceiptVsWriteSeparate asserts that the receipt- +// time and post-write liveness clocks are independent (PR #1609 review +// MAJOR M1): stamping at receipt must NOT advance the post-write clock so +// the watchdog/healthz can distinguish "broker alive, write path stuck" +// from "everything fine". Without separation, /healthz reports "fresh" +// while the writer is stalled and the ingest buffer is filling. +func TestSourceLivenessState_ReceiptVsWriteSeparate(t *testing.T) { + s := &SourceLivenessState{Tag: "t"} + now := time.Now() + + // Receipt at T0; post-write never happens (writer stalled). + s.MarkReceipt(now) + + gotReceipt := atomic.LoadInt64(&s.LastReceiptUnix) + gotWrite := atomic.LoadInt64(&s.LastMessageUnix) + if gotReceipt != now.Unix() { + t.Fatalf("LastReceiptUnix: want %d, got %d", now.Unix(), gotReceipt) + } + if gotWrite != 0 { + t.Fatalf("LastMessageUnix MUST stay 0 while writer stalled (only MarkReceipt called); got %d — receipt is double-stamping the write clock and /healthz will lie about ingestion freshness", gotWrite) + } + + // Write completes later: only MarkMessage advances LastMessageUnix. + later := now.Add(5 * time.Second) + s.MarkMessage(later) + + gotReceipt2 := atomic.LoadInt64(&s.LastReceiptUnix) + gotWrite2 := atomic.LoadInt64(&s.LastMessageUnix) + if gotReceipt2 != now.Unix() { + t.Fatalf("MarkMessage must not move LastReceiptUnix backwards or forwards; want %d, got %d", now.Unix(), gotReceipt2) + } + if gotWrite2 != later.Unix() { + t.Fatalf("LastMessageUnix after MarkMessage: want %d, got %d", later.Unix(), gotWrite2) + } +} diff --git a/cmd/ingestor/stats_file.go b/cmd/ingestor/stats_file.go index 3b38fa11..429a1f06 100644 --- a/cmd/ingestor/stats_file.go +++ b/cmd/ingestor/stats_file.go @@ -50,6 +50,21 @@ type IngestorStatsSnapshot struct { // via /api/perf/write-sources under .writer_perf. Optional — // older ingestor builds don't publish this field. WriterPerf map[string]WriterStatsSnapshot `json:"writer_perf,omitempty"` + // SourceLiveness (PR #1609 M1) is the per-MQTT-source receipt vs + // write-path liveness snapshot. Keyed by source Tag. Surfaced by + // the server via /api/healthz under .ingest_liveness so operators + // can see "broker alive, write path stuck" (lastReceiptUnix recent, + // lastMessageUnix stale) distinct from "everything stalled" (both + // stale). Additive: omitempty so older server builds ignore it + // gracefully. + SourceLiveness map[string]SourceLivenessSnapshot `json:"source_liveness,omitempty"` +} + +// SourceLivenessSnapshot is the per-source two-clock view exposed for +// /api/healthz consumers. unixSeconds for both fields; 0 means "never". +type SourceLivenessSnapshot struct { + LastReceiptUnix int64 `json:"lastReceiptUnix"` + LastMessageUnix int64 `json:"lastMessageUnix"` } // statsFilePath returns the writable path the ingestor will publish stats to. @@ -231,6 +246,7 @@ func StartStatsFileWriter(s *Store, interval time.Duration) { BackfillUpdates: s.Stats.SnapshotBackfills(), ProcIO: ioRate, WriterPerf: s.WriterStatsSnapshot(), + SourceLiveness: SnapshotLivenessClocks(), } buf.Reset() if err := enc.Encode(&snap); err != nil { diff --git a/cmd/server/healthz.go b/cmd/server/healthz.go index a04c90f3..3f554952 100644 --- a/cmd/server/healthz.go +++ b/cmd/server/healthz.go @@ -42,7 +42,7 @@ func (s *Server) handleHealthz(w http.ResponseWriter, r *http.Request) { // processed 0 { + resp["ingest_liveness"] = liveness + } + json.NewEncoder(w).Encode(resp) } diff --git a/cmd/server/perf_io.go b/cmd/server/perf_io.go index 9c5e1ea1..c20cdc2c 100644 --- a/cmd/server/perf_io.go +++ b/cmd/server/perf_io.go @@ -302,6 +302,20 @@ type IngestorStats struct { // publish this. Surfaced under .writer_perf by // handlePerfWriteSources. WriterPerf map[string]WriterStatsSnapshot `json:"writer_perf,omitempty"` + // SourceLiveness (PR #1609 M1) is the per-MQTT-source two-clock + // snapshot: lastReceiptUnix (broker liveness, stamped at receipt) + // vs lastMessageUnix (write-path liveness, stamped post-write). + // Surfaced by /api/healthz under .ingest_liveness so operators can + // distinguish "broker alive, write path stuck" from "everything + // stalled". Optional — older ingestor builds don't publish this. + SourceLiveness map[string]SourceLivenessSnapshot `json:"source_liveness,omitempty"` +} + +// SourceLivenessSnapshot mirrors the ingestor's per-MQTT-source liveness +// pair (PR #1609 M1). Both fields are unix seconds; 0 means "never". +type SourceLivenessSnapshot struct { + LastReceiptUnix int64 `json:"lastReceiptUnix"` + LastMessageUnix int64 `json:"lastMessageUnix"` } // WriterStatsSnapshot mirrors the ingestor's per-component writer-lock @@ -329,6 +343,111 @@ func IngestorStatsPath() string { return "/tmp/corescope-ingestor-stats.json" } +// readIngestorSourceLiveness returns the per-source receipt/write-path +// liveness map from the ingestor stats file, or nil on any error / older +// ingestor that doesn't publish the field. PR #1609 M1 — surfaced by +// /api/healthz under .ingest_liveness so operators can spot "broker +// alive, write path stuck". +// +// /healthz is a hot path (LB / k8s / uptime monitors), so the result +// is memoized with a short TTL (sourceLivenessCacheTTL) and refreshed +// whenever the underlying file mtime changes (PR #1623 round-1 +// finding 4). The lock is held briefly; the costly Unmarshal happens +// at most once per refresh window. +func readIngestorSourceLiveness() map[string]SourceLivenessSnapshot { + path := IngestorStatsPath() + now := time.Now() + + sourceLivenessCache.mu.RLock() + if sourceLivenessCache.path == path && + now.Sub(sourceLivenessCache.cachedAt) < sourceLivenessCacheTTL { + // Cheap mtime probe: if the file moved since we cached, fall + // through to the refresh path. Stat is cheap relative to + // ReadFile+Unmarshal. + info, err := os.Stat(path) + fresh := err == nil && info.ModTime().Equal(sourceLivenessCache.mtime) + if fresh || (err != nil && sourceLivenessCache.mtime.IsZero()) { + out := sourceLivenessCache.value + sourceLivenessCache.mu.RUnlock() + return out + } + } + sourceLivenessCache.mu.RUnlock() + + sourceLivenessCache.mu.Lock() + defer sourceLivenessCache.mu.Unlock() + // Re-check under the write lock — another goroutine may have just + // refreshed. + if sourceLivenessCache.path == path && + time.Since(sourceLivenessCache.cachedAt) < sourceLivenessCacheTTL { + info, err := os.Stat(path) + fresh := err == nil && info.ModTime().Equal(sourceLivenessCache.mtime) + if fresh || (err != nil && sourceLivenessCache.mtime.IsZero()) { + return sourceLivenessCache.value + } + } + + data, err := sourceLivenessReadFile(path) + if err != nil { + // Cache the negative result too, so a missing file doesn't + // hammer the disk under /healthz pressure. + sourceLivenessCache.path = path + sourceLivenessCache.value = nil + sourceLivenessCache.cachedAt = now + sourceLivenessCache.mtime = time.Time{} + return nil + } + var st IngestorStats + if err := json.Unmarshal(data, &st); err != nil { + sourceLivenessCache.path = path + sourceLivenessCache.value = nil + sourceLivenessCache.cachedAt = now + sourceLivenessCache.mtime = time.Time{} + return nil + } + sourceLivenessCache.path = path + sourceLivenessCache.value = st.SourceLiveness + sourceLivenessCache.cachedAt = now + if info, err := os.Stat(path); err == nil { + sourceLivenessCache.mtime = info.ModTime() + } else { + sourceLivenessCache.mtime = time.Time{} + } + return st.SourceLiveness +} + +// sourceLivenessReadFile is the file-reader used by +// readIngestorSourceLiveness. Swappable for tests so call counts can +// be asserted (PR #1623 round-1 finding 4 TTL cache test). +var sourceLivenessReadFile = os.ReadFile + +// sourceLivenessCacheTTL caps how long a parsed liveness map is reused +// across /healthz probes. 1s is short enough that operators see stale +// data only briefly during incidents, but long enough to coalesce +// hundreds of probes/sec from LBs. +var sourceLivenessCacheTTL = time.Second + +// sourceLivenessCache memoizes the parsed liveness map keyed by file +// path + mtime. See readIngestorSourceLiveness. +var sourceLivenessCache struct { + mu sync.RWMutex + path string + value map[string]SourceLivenessSnapshot + cachedAt time.Time + mtime time.Time +} + +// resetSourceLivenessCache clears the memo. Test-only helper; callable +// from production code is harmless (next call just re-reads). +func resetSourceLivenessCache() { + sourceLivenessCache.mu.Lock() + defer sourceLivenessCache.mu.Unlock() + sourceLivenessCache.path = "" + sourceLivenessCache.value = nil + sourceLivenessCache.cachedAt = time.Time{} + sourceLivenessCache.mtime = time.Time{} +} + // handlePerfWriteSources reads the ingestor's stats file and returns a flat // map of source-name -> counter, plus the sample timestamp. func (s *Server) handlePerfWriteSources(w http.ResponseWriter, r *http.Request) { diff --git a/cmd/server/perf_io_liveness_cache_test.go b/cmd/server/perf_io_liveness_cache_test.go new file mode 100644 index 00000000..3e982c01 --- /dev/null +++ b/cmd/server/perf_io_liveness_cache_test.go @@ -0,0 +1,93 @@ +package main + +import ( + "os" + "path/filepath" + "sync/atomic" + "testing" + "time" +) + +// TestReadIngestorSourceLiveness_CachesWithinTTL guards the /healthz +// hot-path TTL cache (PR #1623 round-1 finding 4): readIngestorSourceLiveness +// is called per /healthz probe (LB / k8s / uptime monitors), and every +// call re-reads + re-unmarshals the entire IngestorStats JSON. Within +// the TTL window the function MUST hit a cached parse and avoid the +// re-read. +func TestReadIngestorSourceLiveness_CachesWithinTTL(t *testing.T) { + dir := t.TempDir() + statsPath := filepath.Join(dir, "ingestor-stats.json") + stub := `{ + "sampledAt": "2026-06-07T00:00:00Z", + "source_liveness": { + "mqtt-broker-a": {"lastReceiptUnix": 1717000000, "lastMessageUnix": 1716999990} + } + }` + if err := os.WriteFile(statsPath, []byte(stub), 0o600); err != nil { + t.Fatal(err) + } + t.Setenv("CORESCOPE_INGESTOR_STATS", statsPath) + + // Swap the read function to a counting wrapper. + var calls atomic.Int64 + prev := sourceLivenessReadFile + sourceLivenessReadFile = func(p string) ([]byte, error) { + calls.Add(1) + return os.ReadFile(p) + } + t.Cleanup(func() { + sourceLivenessReadFile = prev + resetSourceLivenessCache() + }) + resetSourceLivenessCache() + + // 5 sequential calls within <1s — the cache TTL window. + start := time.Now() + for i := 0; i < 5; i++ { + got := readIngestorSourceLiveness() + if _, ok := got["mqtt-broker-a"]; !ok { + t.Fatalf("call %d: expected mqtt-broker-a in liveness map, got %+v", i, got) + } + } + elapsed := time.Since(start) + if elapsed > 800*time.Millisecond { + t.Fatalf("loop took %s — too slow for a TTL-cache assertion (should be sub-second)", elapsed) + } + if got := calls.Load(); got != 1 { + t.Fatalf("expected 1 os.ReadFile call across 5 readIngestorSourceLiveness() calls within TTL, got %d", got) + } +} + +// TestReadIngestorSourceLiveness_InvalidatesOnMTimeChange guards the +// other half of the cache contract: when the underlying stats file +// changes (mtime moves), the cache MUST refresh on the next call. +func TestReadIngestorSourceLiveness_InvalidatesOnMTimeChange(t *testing.T) { + dir := t.TempDir() + statsPath := filepath.Join(dir, "ingestor-stats.json") + stubA := `{"source_liveness": {"a": {"lastReceiptUnix": 1, "lastMessageUnix": 1}}}` + stubB := `{"source_liveness": {"b": {"lastReceiptUnix": 2, "lastMessageUnix": 2}}}` + if err := os.WriteFile(statsPath, []byte(stubA), 0o600); err != nil { + t.Fatal(err) + } + t.Setenv("CORESCOPE_INGESTOR_STATS", statsPath) + + t.Cleanup(resetSourceLivenessCache) + resetSourceLivenessCache() + + got := readIngestorSourceLiveness() + if _, ok := got["a"]; !ok { + t.Fatalf("first call: expected key 'a', got %+v", got) + } + // Bump mtime forward to guarantee the cache notices. + future := time.Now().Add(2 * time.Second) + if err := os.WriteFile(statsPath, []byte(stubB), 0o600); err != nil { + t.Fatal(err) + } + if err := os.Chtimes(statsPath, future, future); err != nil { + t.Fatal(err) + } + got = readIngestorSourceLiveness() + if _, ok := got["b"]; !ok { + t.Fatalf("after mtime change: expected key 'b', got %+v", got) + } +} diff --git a/config.example.json b/config.example.json index d136b593..3a7e1cae 100644 --- a/config.example.json +++ b/config.example.json @@ -318,6 +318,8 @@ "backfillHours": 24, "_comment": "How far back (hours) the async backfill scans for observations with NULL resolved_path. Default: 24. Set higher to backfill older data, lower to speed up startup." }, + "ingestBufferSize": 50000, + "_comment_ingestBufferSize": "Number of MQTT messages held in memory while the single SQLite writer is blocked by a startup migration/prune (#1608). Drained once the write path is ready; bounded memory (small closure per item). 0 / omit / negative => default 50000 (applied by IngestBufferSizeOrDefault before NewIngestBuffer). A positive value below the default is honored as-is; sub-1 values reaching NewIngestBuffer directly clamp to 1 with a WARN log.", "neighborGraph": { "maxAgeDays": 5, "maxEdgeKm": 500,