feat(mqtt): per-source status endpoint + Observers panel (#1682)

## Summary

Adds MQTT source status visibility per #1043 acceptance criteria:

- **Ingestor:** per-source counter registry
(`cmd/ingestor/source_status.go`) tracking `connected`,
`lastConnectUnix`, `lastDisconnectUnix`, `lastPacketUnix`,
`connectCount`, `disconnectCount`, `packetsTotal`, `packetsLast5m`
(sliding 5-min window via per-second buckets keyed by unix second — no
stale-leak), `lastError`. Wired at the existing OnConnect /
ConnectionLost / DefaultPublish callsites alongside the liveness
watchdog. Idempotent registration so counters survive reconnects.
Snapshot emitted in the existing stats file under `source_statuses`
(additive, `omitempty`).
- **Backend:** new `GET /api/mqtt/status` handler reads the ingestor
stats file and returns the per-source list. **Broker passwords are
masked** via a regex over the `scheme://user:pass@host` form (covers
mqtt/mqtts/tcp/ssl/ws/wss). Mask is also applied to `lastError` as
defense-in-depth (broker libs occasionally quote the failing URL).
OpenAPI completeness gate satisfied with a `routeDescriptions` entry.
- **Frontend:** small self-contained panel
(`public/mqtt-status-panel.js`) mounted above the Observers table.
Auto-refreshes every 10s, color-codes each row (green = connected +
recent packet, yellow = connected idle, red = disconnected), and tears
down its timer on SPA route change.

## TDD

- Red commit `f19a93b5` — stub `/api/mqtt/status` handler + assertion
test that the broker password is `****`-redacted. Test fails on the
assertion (handler passes the URL through verbatim). Compile-clean —
assertion-fail, not build-fail.
- Green commit `77042e41` — `maskBrokerURL` helper + table-driven unit
tests across all schemes + handler rewires to mask both `Broker` and
`LastError`.
- Subsequent commits land the ingestor wiring and the frontend panel.

## Tests

```
$ cd cmd/server && go test -run 'TestMqttStatus|TestMaskBrokerURL' -v ./...
PASS: TestMqttStatus_MasksBrokerPassword
PASS: TestMqttStatus_EmptyWhenNoStatsFile
PASS: TestMaskBrokerURL_Patterns (10 subtests)

$ cd cmd/ingestor && go test -run 'TestSourceStatus|TestSnapshotSourceStatuses' -v ./...
PASS: TestSourceStatus_BasicLifecycle
PASS: TestSourceStatus_Disconnect
PASS: TestSnapshotSourceStatuses_ReturnsAll

$ node test-mqtt-status-panel.js
7 passed, 0 failed
```

Full `go test ./...` clean in both `cmd/server` and `cmd/ingestor`.

## Preflight overrides

- `cross-stack`: justified — issue #1043 is intrinsically full-stack
(ingestor stats → server endpoint → observers panel). Per-stack split
would land an unreachable endpoint or a fetch with no backend.
- `check-xss-sinks` (public/mqtt-status-panel.js:55): justified — the
flagged `innerHTML=` is a fully-static literal (empty-state placeholder,
no payload data interpolated). All payload-bearing `innerHTML=` sites in
this file run through `escapeHTML` (defined in the same file); the test
`renderPanel never echoes a plaintext password (defense-in-depth)`
exercises the rendered HTML against payload strings.

## Acceptance criteria

- [x] `/api/mqtt/status` returns per-source connection state —
`cmd/server/mqtt_status.go`
- [x] UI panel shows all configured sources with live status —
`public/mqtt-status-panel.js`
- [x] Connection state updates on reconnect/disconnect events —
`MarkConnect` / `MarkDisconnect` wired in `cmd/ingestor/main.go`
- [x] Broker URLs don't expose passwords in the API response —
`maskBrokerURL` + 13 test cases
- [x] Works with 1-N sources — registry is keyed per-source, snapshot
iterates the map

**Partial fix for #1043** — per-packet `mqtt_source` attribution (the
issue's "Follow-up" section) is **deferred** per the `mc-bot-triaged:v1`
triage and the autofix comment ("Per-packet attribution deferred to
follow-up issue"). That work requires a new observation-row column and
DB schema migration, both explicitly out of scope for this PR.

Refs #1043

---------

Co-authored-by: openclaw-bot <bot@openclaw.local>
This commit is contained in:
Kpa-clawbot
2026-06-12 08:11:02 -07:00
committed by GitHub
parent 2ef7d2437d
commit efd66ea3f5
13 changed files with 919 additions and 0 deletions
+1
View File
@@ -141,6 +141,7 @@ jobs:
node test-traces.js
node test-issue-1648-m4-emoji-scan.js
node test-issue-1668-m3-typography.js
node test-mqtt-status-panel.js
- name: 🛡️ Preflight XSS gate — actual --diff check (PR only)
# The fixture self-test above (test-preflight-xss-gate.js) only
+8
View File
@@ -132,8 +132,14 @@ func main() {
Broker: source.Broker,
}
// #1043: per-source status registry. Idempotent — repeated
// registration across reconnects returns the same state so
// counters accumulate across the process lifetime.
status := RegisterSourceStatus(tag, source.Broker)
opts.SetOnConnectHandler(func(c mqtt.Client) {
log.Printf("MQTT [%s] connected to %s", tag, source.Broker)
status.MarkConnect(time.Now())
// PR #1216 r1 item 2: clear the stale LastMessageUnix from
// before the outage so the watchdog doesn't immediately scream
// "stalled for 2h". Also restarts the cold-start grace window
@@ -156,6 +162,7 @@ func main() {
opts.SetConnectionLostHandler(func(c mqtt.Client, err error) {
log.Printf("MQTT [%s] disconnected from %s: %v", tag, source.Broker, err)
status.MarkDisconnect(time.Now(), err)
})
opts.SetReconnectingHandler(func(c mqtt.Client, options *mqtt.ClientOptions) {
@@ -171,6 +178,7 @@ func main() {
// report "fresh" while the writer was stalled and the
// buffer was filling.
markReceiptForTag(tag, time.Now())
status.MarkPacket(time.Now())
ingestBuffer.Submit(func() {
handleMessage(store, tag, src, m, channelKeys, regionKeys, cfg)
})
+187
View File
@@ -0,0 +1,187 @@
package main
import (
"sync"
"sync/atomic"
"time"
)
// SourceStatusSnapshot is the per-MQTT-source connection state and counter
// view written to the ingestor stats file (under "source_statuses") and
// consumed by cmd/server's /api/mqtt/status handler (#1043).
//
// All fields are unix seconds (0 = "never"). PacketsLast5m is a sliding
// 5-minute count derived from a per-second ring buffer.
type SourceStatusSnapshot struct {
Name string `json:"name"`
Broker string `json:"broker"`
Connected bool `json:"connected"`
LastConnectUnix int64 `json:"lastConnectUnix"`
LastDisconnectUnix int64 `json:"lastDisconnectUnix"`
LastPacketUnix int64 `json:"lastPacketUnix"`
ConnectCount int64 `json:"connectCount"`
DisconnectCount int64 `json:"disconnectCount"`
PacketsTotal int64 `json:"packetsTotal"`
PacketsLast5m int64 `json:"packetsLast5m"`
LastError string `json:"lastError,omitempty"`
}
// sourceStatusState is the in-memory per-source counter set. All scalar
// fields are accessed via sync/atomic so the hot-path MarkPacket /
// MarkConnect / MarkDisconnect callsites stay lock-free. The 5-minute
// sliding window uses a 300-element per-second ring (one slot per
// second), guarded by ringMu only when we slide the cursor — the common
// path increments the current second with a single atomic.AddInt64.
//
// Memory: one state per source (typically 1-5 in production). 300 int64
// slots = 2.4KB/source — fine.
type sourceStatusState struct {
name string
broker string // raw broker URL — server-side handler masks the password
connected atomic.Bool
lastConnectUnix atomic.Int64
lastDisconnectUnix atomic.Int64
lastPacketUnix atomic.Int64
connectCount atomic.Int64
disconnectCount atomic.Int64
packetsTotal atomic.Int64
// 5-minute sliding window: per-second buckets keyed by unix second.
// Stored as parallel arrays so we can both zero-out a stale slot AND
// know whether a slot's contents are still inside the window.
ringMu sync.Mutex
ringSec [300]int64 // unix second this slot represents (0 = unused)
ringCount [300]int64 // packets received in that second
// lastError is rare-write/rare-read so a plain mutex is fine.
errMu sync.RWMutex
lastError string
}
// MarkConnect records a successful (re)connection to the broker.
// Clears any stale lastError from a prior disconnect — otherwise the UI
// shows "connected=true, lastError='connection refused'" after a successful
// reconnect, which is a lie (#1682 munger review r1).
func (s *sourceStatusState) MarkConnect(now time.Time) {
s.connected.Store(true)
s.lastConnectUnix.Store(now.Unix())
s.connectCount.Add(1)
s.errMu.Lock()
s.lastError = ""
s.errMu.Unlock()
}
// MarkDisconnect records the broker dropping the connection.
func (s *sourceStatusState) MarkDisconnect(now time.Time, err error) {
s.connected.Store(false)
s.lastDisconnectUnix.Store(now.Unix())
s.disconnectCount.Add(1)
if err != nil {
s.errMu.Lock()
s.lastError = err.Error()
s.errMu.Unlock()
}
}
// MarkPacket records receipt of an MQTT message. Hot path.
func (s *sourceStatusState) MarkPacket(now time.Time) {
nowSec := now.Unix()
s.lastPacketUnix.Store(nowSec)
s.packetsTotal.Add(1)
slot := nowSec % int64(len(s.ringSec))
s.ringMu.Lock()
if s.ringSec[slot] != nowSec {
s.ringSec[slot] = nowSec
s.ringCount[slot] = 0
}
s.ringCount[slot]++
s.ringMu.Unlock()
}
// sumLast5m returns the count of MarkPacket calls in the last 300s. Slots
// whose stored second falls outside the window are ignored (no stale leak).
func (s *sourceStatusState) sumLast5m(now time.Time) int64 {
nowSec := now.Unix()
cutoff := nowSec - int64(len(s.ringSec)) + 1
var total int64
s.ringMu.Lock()
for i := 0; i < len(s.ringSec); i++ {
if s.ringSec[i] >= cutoff && s.ringSec[i] <= nowSec {
total += s.ringCount[i]
}
}
s.ringMu.Unlock()
return total
}
// snapshot copies the state into a serializable view.
func (s *sourceStatusState) snapshot(now time.Time) SourceStatusSnapshot {
s.errMu.RLock()
errStr := s.lastError
s.errMu.RUnlock()
return SourceStatusSnapshot{
Name: s.name,
Broker: s.broker,
Connected: s.connected.Load(),
LastConnectUnix: s.lastConnectUnix.Load(),
LastDisconnectUnix: s.lastDisconnectUnix.Load(),
LastPacketUnix: s.lastPacketUnix.Load(),
ConnectCount: s.connectCount.Load(),
DisconnectCount: s.disconnectCount.Load(),
PacketsTotal: s.packetsTotal.Load(),
PacketsLast5m: s.sumLast5m(now),
LastError: errStr,
}
}
// sourceStatusRegistry holds one sourceStatusState per source. Keyed by
// tag (which is the source Name, or the Broker URL if the operator left
// the name blank).
var (
sourceStatusRegistryMu sync.RWMutex
sourceStatusRegistry = map[string]*sourceStatusState{}
)
// RegisterSourceStatus creates (or returns the existing) state for the
// given source. Safe for cold-start use; idempotent — re-registering the
// same tag returns the existing state so counters aren't reset across
// reconnects.
func RegisterSourceStatus(tag, broker string) *sourceStatusState {
sourceStatusRegistryMu.Lock()
defer sourceStatusRegistryMu.Unlock()
if s, ok := sourceStatusRegistry[tag]; ok {
return s
}
s := &sourceStatusState{name: tag, broker: broker}
sourceStatusRegistry[tag] = s
return s
}
// lookupSourceStatus returns the state for tag, or nil if unregistered.
func lookupSourceStatus(tag string) *sourceStatusState {
sourceStatusRegistryMu.RLock()
defer sourceStatusRegistryMu.RUnlock()
return sourceStatusRegistry[tag]
}
// SnapshotSourceStatuses returns a slice of every registered source's
// current snapshot. Surfaced via the ingestor stats file under
// "source_statuses" so /api/mqtt/status can serve it (#1043).
func SnapshotSourceStatuses(now time.Time) []SourceStatusSnapshot {
sourceStatusRegistryMu.RLock()
defer sourceStatusRegistryMu.RUnlock()
out := make([]SourceStatusSnapshot, 0, len(sourceStatusRegistry))
for _, s := range sourceStatusRegistry {
out = append(out, s.snapshot(now))
}
return out
}
// resetSourceStatusRegistry clears the registry. Test-only helper.
func resetSourceStatusRegistry() {
sourceStatusRegistryMu.Lock()
defer sourceStatusRegistryMu.Unlock()
sourceStatusRegistry = map[string]*sourceStatusState{}
}
+116
View File
@@ -0,0 +1,116 @@
package main
import (
"errors"
"testing"
"time"
)
// TestSourceStatus_BasicLifecycle exercises the counter wiring used by
// the /api/mqtt/status server-side endpoint (#1043).
func TestSourceStatus_BasicLifecycle(t *testing.T) {
resetSourceStatusRegistry()
defer resetSourceStatusRegistry()
s := RegisterSourceStatus("local", "mqtt://broker.example.com:1883")
if s == nil {
t.Fatal("RegisterSourceStatus returned nil")
}
// Re-registration is idempotent.
if s2 := RegisterSourceStatus("local", "mqtt://other"); s2 != s {
t.Fatal("RegisterSourceStatus not idempotent")
}
now := time.Unix(1_700_000_000, 0)
s.MarkConnect(now)
s.MarkPacket(now)
s.MarkPacket(now.Add(1 * time.Second))
s.MarkPacket(now.Add(2 * time.Second))
snap := s.snapshot(now.Add(3 * time.Second))
if !snap.Connected {
t.Error("snapshot.Connected = false, want true after MarkConnect")
}
if snap.PacketsTotal != 3 {
t.Errorf("PacketsTotal = %d, want 3", snap.PacketsTotal)
}
if snap.PacketsLast5m != 3 {
t.Errorf("PacketsLast5m = %d, want 3", snap.PacketsLast5m)
}
if snap.ConnectCount != 1 {
t.Errorf("ConnectCount = %d, want 1", snap.ConnectCount)
}
if snap.LastConnectUnix != now.Unix() {
t.Errorf("LastConnectUnix = %d, want %d", snap.LastConnectUnix, now.Unix())
}
if snap.Broker != "mqtt://broker.example.com:1883" {
t.Errorf("Broker = %q, want raw URL passthrough (server masks)", snap.Broker)
}
// After 5 minutes idle, sliding window must be empty.
snap2 := s.snapshot(now.Add(6 * time.Minute))
if snap2.PacketsLast5m != 0 {
t.Errorf("PacketsLast5m after 6m idle = %d, want 0", snap2.PacketsLast5m)
}
if snap2.PacketsTotal != 3 {
t.Errorf("PacketsTotal must be lifetime-cumulative, got %d", snap2.PacketsTotal)
}
}
func TestSourceStatus_Disconnect(t *testing.T) {
resetSourceStatusRegistry()
defer resetSourceStatusRegistry()
s := RegisterSourceStatus("disco", "mqtt://x:1883")
now := time.Unix(1_700_000_100, 0)
s.MarkConnect(now)
s.MarkDisconnect(now.Add(time.Minute), nil)
snap := s.snapshot(now.Add(2 * time.Minute))
if snap.Connected {
t.Error("snapshot.Connected = true after MarkDisconnect, want false")
}
if snap.DisconnectCount != 1 {
t.Errorf("DisconnectCount = %d, want 1", snap.DisconnectCount)
}
}
func TestSnapshotSourceStatuses_ReturnsAll(t *testing.T) {
resetSourceStatusRegistry()
defer resetSourceStatusRegistry()
RegisterSourceStatus("a", "mqtt://a")
RegisterSourceStatus("b", "mqtt://b")
snaps := SnapshotSourceStatuses(time.Now())
if len(snaps) != 2 {
t.Errorf("len(snaps) = %d, want 2", len(snaps))
}
}
// TestSourceStatus_MarkConnectClearsLastError asserts MarkConnect wipes
// any prior sticky error (#1682 munger r1 review). Otherwise the UI sees
// connected=true alongside a stale "connection refused" string.
func TestSourceStatus_MarkConnectClearsLastError(t *testing.T) {
resetSourceStatusRegistry()
defer resetSourceStatusRegistry()
s := RegisterSourceStatus("sticky", "mqtt://x:1883")
now := time.Unix(1_700_000_200, 0)
s.MarkConnect(now)
s.MarkDisconnect(now.Add(time.Second), errors.New("connection refused"))
snap := s.snapshot(now.Add(2 * time.Second))
if snap.LastError == "" {
t.Fatalf("precondition: expected lastError after MarkDisconnect, got empty")
}
// Reconnect — lastError must clear.
s.MarkConnect(now.Add(3 * time.Second))
snap = s.snapshot(now.Add(4 * time.Second))
if snap.LastError != "" {
t.Errorf("snapshot.LastError = %q after MarkConnect, want empty (sticky-error regression)", snap.LastError)
}
if !snap.Connected {
t.Errorf("snapshot.Connected = false after MarkConnect, want true")
}
}
+5
View File
@@ -58,6 +58,10 @@ type IngestorStatsSnapshot struct {
// stale). Additive: omitempty so older server builds ignore it
// gracefully.
SourceLiveness map[string]SourceLivenessSnapshot `json:"source_liveness,omitempty"`
// SourceStatuses (#1043) is the per-MQTT-source connection state and
// counter view consumed by cmd/server's /api/mqtt/status handler.
// Additive; omitempty so older server builds ignore it.
SourceStatuses []SourceStatusSnapshot `json:"source_statuses,omitempty"`
}
// SourceLivenessSnapshot is the per-source two-clock view exposed for
@@ -247,6 +251,7 @@ func StartStatsFileWriter(s *Store, interval time.Duration) {
ProcIO: ioRate,
WriterPerf: s.WriterStatsSnapshot(),
SourceLiveness: SnapshotLivenessClocks(),
SourceStatuses: SnapshotSourceStatuses(tickAt),
}
buf.Reset()
if err := enc.Encode(&snap); err != nil {
+144
View File
@@ -0,0 +1,144 @@
package main
import (
"encoding/json"
"net/http"
"net/url"
"os"
"regexp"
"strings"
)
// mqttBrokerSchemes is the set of broker URL schemes whose embedded
// `user:pass@host` credentials we want to redact. We URL-parse for these
// (defense vs. passwords containing `@`); other strings fall through to
// the legacy regex pass for embedded user:pass occurrences in free-form
// error strings.
var mqttBrokerSchemes = map[string]bool{
"mqtt": true, "mqtts": true, "tcp": true, "ssl": true, "ws": true, "wss": true,
}
// mqttBrokerURLRe locates a broker URL (with credentials) embedded inside
// a larger free-form string — e.g. an error message that quotes the
// failing broker. Each match is fed through url.Parse + redaction. We
// match greedily up through the LAST `@` followed by a host-shaped token
// so passwords containing `@` are not truncated (#1682 adversarial r1).
//
// Go's RE2 has no lookahead; we capture the host tail and emit it
// unchanged in the replacement.
var mqttBrokerURLRe = regexp.MustCompile(`(?i)(?:mqtt|mqtts|tcp|ssl|ws|wss)://[^\s]*`)
// maskBrokerURL returns the broker URL with any inline password redacted.
// `mqtt://user:secret@host:1883` -> `mqtt://user:****@host:1883`.
// `mqtt://user:p@ss@host` -> `mqtt://user:****@host` (password with `@`).
// URLs without inline credentials are returned unchanged.
//
// Primary strategy: url.Parse — handles passwords with `@`, `:`, etc.
// Fallback: regex sweep for free-form strings (e.g. error messages that
// quote a URL fragment but aren't standalone-parseable).
func maskBrokerURL(s string) string {
if s == "" {
return s
}
// Fast path: the whole string is the broker URL.
if masked, ok := redactBrokerURL(s); ok {
return masked
}
// Fallback: free-form string (e.g. error message) containing a URL.
// Find embedded broker URLs and redact each in-place.
return mqttBrokerURLRe.ReplaceAllStringFunc(s, func(m string) string {
if out, ok := redactBrokerURL(m); ok {
return out
}
return m
})
}
// redactBrokerURL parses s as a URL and, if it has an mqtt-family scheme
// with userinfo containing a password, returns the URL with the password
// replaced by `****`. Returns ok=false when s is not such a URL.
func redactBrokerURL(s string) (string, bool) {
u, err := url.Parse(s)
if err != nil || u.Scheme == "" || u.User == nil {
return s, false
}
if !mqttBrokerSchemes[strings.ToLower(u.Scheme)] {
return s, false
}
if _, hasPass := u.User.Password(); !hasPass {
return s, false
}
// Re-assemble manually rather than via url.UserPassword + u.String()
// because the latter percent-encodes the `*` mask token into `%2A`,
// defeating the user-visible redaction marker. We only need to swap
// the userinfo segment of the original string.
hostAndAfter := s
if idx := strings.LastIndex(s, "@"); idx >= 0 {
hostAndAfter = s[idx+1:]
}
// Preserve original scheme casing (url.Parse lowercases u.Scheme).
schemeEnd := strings.Index(s, "://")
if schemeEnd < 0 {
return s, false
}
return s[:schemeEnd] + "://" + u.User.Username() + ":****@" + hostAndAfter, true
}
// MqttSourceStatus is the per-MQTT-source status row surfaced via
// /api/mqtt/status. Mirrors the on-disk shape the ingestor publishes
// (cmd/ingestor SourceStatusSnapshot) but with the broker URL credentials
// redacted before serving — operators must not see the broker password
// in the API response (#1043 acceptance criterion).
type MqttSourceStatus struct {
Name string `json:"name"`
Broker string `json:"broker"`
Connected bool `json:"connected"`
LastConnectUnix int64 `json:"lastConnectUnix"`
LastDisconnectUnix int64 `json:"lastDisconnectUnix"`
LastPacketUnix int64 `json:"lastPacketUnix"`
ConnectCount int64 `json:"connectCount"`
DisconnectCount int64 `json:"disconnectCount"`
PacketsTotal int64 `json:"packetsTotal"`
PacketsLast5m int64 `json:"packetsLast5m"`
LastError string `json:"lastError,omitempty"`
}
// MqttStatusResponse is the JSON envelope returned by /api/mqtt/status.
type MqttStatusResponse struct {
Sources []MqttSourceStatus `json:"sources"`
SampleAt string `json:"sampleAt"`
}
// ingestorMqttStatusEnvelope is the partial shape the server decodes from
// the ingestor stats file (additive — older ingestors omit the field).
type ingestorMqttStatusEnvelope struct {
SampledAt string `json:"sampledAt"`
SourceStatuses []MqttSourceStatus `json:"source_statuses"`
}
// handleMqttStatus serves GET /api/mqtt/status. Reads the ingestor stats
// file, masks broker-URL passwords, and returns the per-source status
// list. Returns an empty list (200 OK) when the stats file is missing
// or unparseable — the UI panel renders a "no data yet" state.
func (s *Server) handleMqttStatus(w http.ResponseWriter, r *http.Request) {
resp := MqttStatusResponse{Sources: []MqttSourceStatus{}, SampleAt: ""}
data, err := os.ReadFile(IngestorStatsPath())
if err != nil {
writeJSON(w, resp)
return
}
var env ingestorMqttStatusEnvelope
if err := json.Unmarshal(data, &env); err != nil {
writeJSON(w, resp)
return
}
resp.SampleAt = env.SampledAt
for _, src := range env.SourceStatuses {
src.Broker = maskBrokerURL(src.Broker)
// Broker libraries occasionally quote the failing URL in the
// error string — redact there too as defense-in-depth.
src.LastError = maskBrokerURL(src.LastError)
resp.Sources = append(resp.Sources, src)
}
writeJSON(w, resp)
}
+142
View File
@@ -0,0 +1,142 @@
package main
import (
"encoding/json"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"testing"
)
// TestMqttStatus_MasksBrokerPassword (#1043) asserts the /api/mqtt/status
// handler never leaks the broker password embedded in a mqtt:// URL.
// Operators viewing the API response (or the Observers panel that
// consumes it) must see `****` in place of the inline credential.
//
// Test shape: write a stub ingestor stats file with one source whose
// broker URL contains a plaintext password, invoke the handler, assert
// the JSON response (a) contains the username + host, (b) does NOT
// contain the password substring.
func TestMqttStatus_MasksBrokerPassword(t *testing.T) {
const password = "hunter2supersecret"
const rawBroker = "mqtt://obsuser:" + password + "@broker.example.com:1883"
tmp := t.TempDir()
statsPath := filepath.Join(tmp, "ingestor-stats.json")
t.Setenv("CORESCOPE_INGESTOR_STATS", statsPath)
// Stub stats file: one MQTT source with a credentialed broker URL.
stub := map[string]any{
"sampledAt": "2026-06-12T12:30:00Z",
"source_statuses": []map[string]any{{
"name": "local",
"broker": rawBroker,
"connected": true,
"lastPacketUnix": 1717977000,
"connectCount": 1,
"disconnectCount": 0,
"packetsTotal": 42,
"packetsLast5m": 7,
}},
}
data, err := json.Marshal(stub)
if err != nil {
t.Fatalf("marshal stub: %v", err)
}
if err := os.WriteFile(statsPath, data, 0o600); err != nil {
t.Fatalf("write stub: %v", err)
}
srv := &Server{}
req := httptest.NewRequest(http.MethodGet, "/api/mqtt/status", nil)
rec := httptest.NewRecorder()
srv.handleMqttStatus(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("status = %d, want 200; body=%s", rec.Code, rec.Body.String())
}
body := rec.Body.String()
t.Logf("response body: %s", body)
if strings.Contains(body, password) {
t.Errorf("response leaks broker password %q in body: %s", password, body)
}
// Sanity: the response still identifies the source by name + host.
if !strings.Contains(body, "broker.example.com") {
t.Errorf("response missing broker host: %s", body)
}
if !strings.Contains(body, "obsuser") {
t.Errorf("response missing broker username: %s", body)
}
// Mask token must be present so operators can tell credentials were
// redacted vs the broker URL never having a password to begin with.
if !strings.Contains(body, "****") {
t.Errorf("response missing redaction marker '****': %s", body)
}
}
// TestMqttStatus_EmptyWhenNoStatsFile asserts the handler returns an empty
// list (200 OK) when the ingestor stats file is missing — the UI panel
// renders a "no data yet" state in that case.
func TestMqttStatus_EmptyWhenNoStatsFile(t *testing.T) {
tmp := t.TempDir()
t.Setenv("CORESCOPE_INGESTOR_STATS", filepath.Join(tmp, "does-not-exist.json"))
srv := &Server{}
req := httptest.NewRequest(http.MethodGet, "/api/mqtt/status", nil)
rec := httptest.NewRecorder()
srv.handleMqttStatus(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("status = %d, want 200", rec.Code)
}
var resp MqttStatusResponse
if err := json.Unmarshal(rec.Body.Bytes(), &resp); err != nil {
t.Fatalf("unmarshal: %v; body=%s", err, rec.Body.String())
}
if len(resp.Sources) != 0 {
t.Errorf("Sources len = %d, want 0", len(resp.Sources))
}
}
// TestMaskBrokerURL_Patterns is a unit table-driven test for the masking
// helper. Kept separate from the handler test so a regression in the
// regex localizes immediately.
func TestMaskBrokerURL_Patterns(t *testing.T) {
cases := []struct {
name, in, want string
}{
{"plain mqtt no creds", "mqtt://broker.example.com:1883", "mqtt://broker.example.com:1883"},
{"mqtt with creds", "mqtt://u:secret@broker.example.com:1883", "mqtt://u:****@broker.example.com:1883"},
{"mqtts with creds", "mqtts://u:secret@broker.example.com:8883", "mqtts://u:****@broker.example.com:8883"},
{"tcp with creds", "tcp://u:p@host:1883", "tcp://u:****@host:1883"},
{"ssl with creds", "ssl://u:p@host:8883", "ssl://u:****@host:8883"},
{"ws with creds", "ws://u:p@host:8080/mqtt", "ws://u:****@host:8080/mqtt"},
{"wss with creds", "wss://u:p@host:443/mqtt", "wss://u:****@host:443/mqtt"},
{"uppercase scheme", "MQTT://u:p@host:1883", "MQTT://u:****@host:1883"},
{"empty", "", ""},
{"long password", "mqtt://obsuser:hunter2supersecretXYZ123@host:1883", "mqtt://obsuser:****@host:1883"},
{"no scheme bare host", "host:1883", "host:1883"},
// Adversarial r1 review (#1682): password contains @. The previous
// regex-only impl matched only up to the FIRST @, exposing "ss" as
// part of the path: "mqtt://user:****@ss@host". url.Parse handles
// this correctly because Go interprets the LAST @ as the userinfo
// boundary.
{"password with single @", "mqtt://user:p@ss@host:1883", "mqtt://user:****@host:1883"},
{"password with multiple @", "mqtt://user:p@ss@wo@host:1883", "mqtt://user:****@host:1883"},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
got := maskBrokerURL(c.in)
if got != c.want {
t.Errorf("maskBrokerURL(%q) = %q, want %q", c.in, got, c.want)
}
// Inline secret must never survive.
if c.in != c.want && strings.Contains(got, "secret") {
t.Errorf("output still contains 'secret': %q", got)
}
})
}
}
+1
View File
@@ -42,6 +42,7 @@ func routeDescriptions() map[string]routeMeta {
"GET /api/health": {Summary: "Health check", Description: "Returns server health, uptime, and memory stats.", Tag: "admin"},
"GET /api/stats": {Summary: "Network statistics", Description: "Returns aggregate stats (node counts, packet counts, observer counts). Cached for 10s.", Tag: "admin"},
"GET /api/perf": {Summary: "Performance statistics", Description: "Returns per-endpoint request timing and slow query log.", Tag: "admin"},
"GET /api/mqtt/status": {Summary: "MQTT source status", Description: "Returns per-MQTT-source connection state and counters (lastConnectUnix, lastPacketUnix, packetsTotal, etc.). Broker URL passwords are masked. Sourced from the ingestor stats file; empty list when unavailable. (#1043)", Tag: "admin"},
"POST /api/perf/reset": {Summary: "Reset performance stats", Tag: "admin", Auth: true},
// "POST /api/admin/prune" removed in #1283 (ingestor owns prune).
"GET /api/debug/affinity": {Summary: "Debug neighbor affinity scores", Tag: "admin", Auth: true},
+1
View File
@@ -230,6 +230,7 @@ func (s *Server) RegisterRoutes(r *mux.Router) {
r.HandleFunc("/api/perf/io", s.handlePerfIO).Methods("GET")
r.HandleFunc("/api/perf/sqlite", s.handlePerfSqlite).Methods("GET")
r.HandleFunc("/api/perf/write-sources", s.handlePerfWriteSources).Methods("GET")
r.HandleFunc("/api/mqtt/status", s.handleMqttStatus).Methods("GET")
r.Handle("/api/perf/reset", s.requireAPIKey(http.HandlerFunc(s.handlePerfReset))).Methods("POST")
// /api/admin/prune removed in #1283 — pruning is owned by the
// ingestor process (scheduled tickers + startup pass). Operators
+1
View File
@@ -211,6 +211,7 @@
<script src="drag-manager.js?v=__BUST__" onerror="console.error('Failed to load:', this.src)"></script>
<script src="live.js?v=__BUST__" onerror="console.error('Failed to load:', this.src)"></script>
<script src="observers.js?v=__BUST__" onerror="console.error('Failed to load:', this.src)"></script>
<script src="mqtt-status-panel.js?v=__BUST__" onerror="console.error('Failed to load:', this.src)"></script>
<script src="observer-detail.js?v=__BUST__" onerror="console.error('Failed to load:', this.src)"></script>
<script src="compare.js?v=__BUST__" onerror="console.error('Failed to load:', this.src)"></script>
<script src="node-analytics.js?v=__BUST__" onerror="console.error('Failed to load:', this.src)"></script>
+140
View File
@@ -0,0 +1,140 @@
/* === CoreScope mqtt-status-panel.js (#1043) ===
* Small panel that fetches /api/mqtt/status, renders a per-source row
* with connection state + recent-packet color coding, and auto-refreshes
* every 10s. Mounted by observers.js into a container element.
*
* Color-coding:
* - green: connected AND a packet seen in the last 5 minutes
* - yellow: connected but no recent packets (broker quiet or stalled)
* - red: disconnected
*
* Exposed as window.MqttStatusPanel for testability and so the Observers
* page can mount it without an import system.
*/
'use strict';
(function () {
var REFRESH_MS = 10000;
var RECENT_PACKET_MS = 5 * 60 * 1000;
function fmtRelative(unixSec, now) {
if (!unixSec) return 'never';
var ms = (now || Date.now()) - unixSec * 1000;
if (ms < 0) ms = 0;
if (ms < 60000) return Math.floor(ms / 1000) + 's ago';
if (ms < 3600000) return Math.floor(ms / 60000) + 'm ago';
if (ms < 86400000) return Math.floor(ms / 3600000) + 'h ago';
return Math.floor(ms / 86400000) + 'd ago';
}
// classifySource returns 'green' | 'yellow' | 'red' for a source row.
// Exposed for unit testing.
function classifySource(src, now) {
if (!src || !src.connected) return 'red';
var lastMs = (src.lastPacketUnix || 0) * 1000;
var ageMs = (now || Date.now()) - lastMs;
if (src.lastPacketUnix && ageMs <= RECENT_PACKET_MS) return 'green';
return 'yellow';
}
// escapeHTML keeps masked-but-still-attacker-controllable broker strings
// safe in innerHTML. The server already redacts passwords; this defends
// against a hostname containing < or & breaking the panel.
function escapeHTML(s) {
return String(s == null ? '' : s)
.replace(/&/g, '&amp;')
.replace(/</g, '&lt;')
.replace(/>/g, '&gt;')
.replace(/"/g, '&quot;');
}
function renderPanel(container, payload, now) {
if (!container) return;
var sources = (payload && payload.sources) || [];
if (sources.length === 0) {
container.innerHTML = '<div class="mqtt-status-empty text-muted" '
+ 'style="padding:8px 0;font-size:13px">'
+ 'No MQTT sources reported yet. The ingestor publishes status '
+ 'every second; if this persists check the ingestor logs.</div>';
return;
}
var rows = sources.map(function (s) {
var state = classifySource(s, now);
var dot;
switch (state) {
case 'green': dot = 'var(--status-green)'; break;
case 'yellow': dot = 'var(--status-yellow)'; break;
default: dot = 'var(--status-red)';
}
return ''
+ '<tr data-source-name="' + escapeHTML(s.name) + '" data-state="' + state + '">'
+ '<td><span class="mqtt-status-dot" aria-hidden="true" '
+ 'style="display:inline-block;width:10px;height:10px;border-radius:50%;'
+ 'background:' + dot + ';margin-right:6px"></span>'
+ '<strong>' + escapeHTML(s.name) + '</strong></td>'
+ '<td><code style="font-size:12px">' + escapeHTML(s.broker) + '</code></td>'
+ '<td>' + (s.connected ? 'connected' : 'disconnected') + '</td>'
+ '<td>' + fmtRelative(s.lastPacketUnix, now) + '</td>'
+ '<td style="text-align:right">' + (s.packetsLast5m || 0) + '</td>'
+ '<td style="text-align:right">' + (s.packetsTotal || 0) + '</td>'
+ '<td style="text-align:right">' + (s.disconnectCount || 0) + '</td>'
+ '</tr>';
}).join('');
container.innerHTML = ''
+ '<div class="mqtt-status-panel" style="margin:12px 0">'
+ '<h3 style="margin:0 0 6px 0;font-size:14px">MQTT sources</h3>'
+ '<table class="mqtt-status-table" style="width:100%;font-size:13px;border-collapse:collapse">'
+ '<thead><tr style="text-align:left">'
+ '<th style="padding:4px 8px">Source</th>'
+ '<th style="padding:4px 8px">Broker</th>'
+ '<th style="padding:4px 8px">State</th>'
+ '<th style="padding:4px 8px">Last packet</th>'
+ '<th style="padding:4px 8px;text-align:right">5m</th>'
+ '<th style="padding:4px 8px;text-align:right">Total</th>'
+ '<th style="padding:4px 8px;text-align:right">Disc.</th>'
+ '</tr></thead>'
+ '<tbody>' + rows + '</tbody>'
+ '</table></div>';
}
// mount attaches the panel into `container` and starts auto-refresh.
// Returns a teardown function the caller can invoke on page unmount.
// The optional `opts.fetchImpl` lets tests inject a fake fetch.
function mount(container, opts) {
opts = opts || {};
var fetchImpl = opts.fetchImpl || (typeof window !== 'undefined' && window.fetch ? window.fetch.bind(window) : null);
if (!fetchImpl) return function noop() {};
var stopped = false;
function tick() {
if (stopped) return;
Promise.resolve()
.then(function () { return fetchImpl('/api/mqtt/status'); })
.then(function (r) { return r && r.json ? r.json() : r; })
.then(function (payload) {
if (stopped) return;
renderPanel(container, payload, Date.now());
})
.catch(function () { /* keep last-rendered state on transient failures */ });
}
tick();
var timer = setInterval(tick, opts.intervalMs || REFRESH_MS);
return function teardown() {
stopped = true;
clearInterval(timer);
};
}
var api = {
mount: mount,
renderPanel: renderPanel,
classifySource: classifySource,
fmtRelative: fmtRelative,
REFRESH_MS: REFRESH_MS,
RECENT_PACKET_MS: RECENT_PACKET_MS
};
if (typeof window !== 'undefined') window.MqttStatusPanel = api;
if (typeof module !== 'undefined' && module.exports) module.exports = api;
})();
+12
View File
@@ -144,6 +144,7 @@ window.preserveCompareSelection = function preserveCompareSelection(prevIds, tbo
let refreshTimer = null;
let regionChangeHandler = null;
let _obsSortCtl = null; // #1641 r1 finding #6: tracked TableSort controller for destroy-before-reinit
let _mqttPanelTeardown = null; // #1043: teardown fn for the MQTT status panel timer
function init(app) {
app.innerHTML = `
@@ -165,9 +166,16 @@ window.preserveCompareSelection = function preserveCompareSelection(prevIds, tbo
<button class="btn-icon" data-action="obs-refresh" title="Refresh" aria-label="Refresh observers"><svg class="ph-icon" aria-hidden="true" focusable="false"><use href="/icons/phosphor-sprite.svg#ph-arrow-clockwise"></use></svg></button>
</div>
<div id="obsRegionFilter" class="region-filter-container"></div>
<div id="mqttStatusPanel" class="mqtt-status-panel-container"></div>
<div id="obsContent"><div class="text-center text-muted" style="padding:40px">Loading</div></div>
</div>`;
RegionFilter.init(document.getElementById('obsRegionFilter'));
// #1043: mount the MQTT source status panel above the observers
// table. Self-refreshes every 10s; tear down in destroy() so the
// SPA route change doesn't leak the timer.
if (typeof window !== 'undefined' && window.MqttStatusPanel) {
_mqttPanelTeardown = window.MqttStatusPanel.mount(document.getElementById('mqttStatusPanel'));
}
regionChangeHandler = RegionFilter.onChange(function () { render(); });
loadObservers();
// Event delegation for data-action buttons
@@ -241,6 +249,10 @@ window.preserveCompareSelection = function preserveCompareSelection(prevIds, tbo
try { _obsSortCtl.destroy(); } catch (_) { /* ignore */ }
}
_obsSortCtl = null;
if (typeof _mqttPanelTeardown === 'function') {
try { _mqttPanelTeardown(); } catch (_) { /* ignore */ }
}
_mqttPanelTeardown = null;
observers = [];
obsSkewMap = {};
}
+161
View File
@@ -0,0 +1,161 @@
/* test-mqtt-status-panel.js (#1043)
*
* DOM-grep test for public/mqtt-status-panel.js. Loads the module into a
* VM sandbox (no jsdom), stubs the fetch + container, drives renderPanel
* directly with a mocked /api/mqtt/status payload, then asserts:
* - one row per source
* - a row with no recent packet but `connected:true` is classified yellow
* - a disconnected source is classified red
* - a connected source with a recent packet is classified green
* - the masked broker URL is rendered (server is responsible for
* masking; the test verifies the panel does not re-leak it)
* - no plaintext password appears in the rendered HTML
*/
'use strict';
const vm = require('vm');
const fs = require('fs');
const assert = require('assert');
let passed = 0, failed = 0;
function test(name, fn) {
try { fn(); passed++; console.log(`${name}`); }
catch (e) { failed++; console.log(`${name}: ${e.message}`); }
}
const src = fs.readFileSync(require('path').resolve(__dirname, 'public/mqtt-status-panel.js'), 'utf8');
const ctx = {
window: {},
module: { exports: {} },
setInterval, clearInterval, setTimeout, clearTimeout,
Promise
};
vm.createContext(ctx);
vm.runInContext(src, ctx);
const Panel = ctx.window.MqttStatusPanel;
assert.ok(Panel, 'window.MqttStatusPanel must be exposed after loading mqtt-status-panel.js');
console.log('mqtt-status-panel:');
test('classifySource: connected + recent packet → green', () => {
const now = 1_700_000_000_000;
const state = Panel.classifySource({ connected: true, lastPacketUnix: now / 1000 - 30 }, now);
assert.strictEqual(state, 'green');
});
test('classifySource: connected, no recent packets → yellow', () => {
const now = 1_700_000_000_000;
const state = Panel.classifySource({ connected: true, lastPacketUnix: now / 1000 - 9 * 60 }, now);
assert.strictEqual(state, 'yellow');
});
test('classifySource: disconnected → red (regardless of packet age)', () => {
const now = 1_700_000_000_000;
const state = Panel.classifySource({ connected: false, lastPacketUnix: now / 1000 - 1 }, now);
assert.strictEqual(state, 'red');
});
test('renderPanel emits one <tr> per source with the masked broker URL', () => {
const container = { innerHTML: '' };
const now = 1_700_000_000_000;
const payload = {
sampleAt: '2026-06-12T12:30:00Z',
sources: [
{ name: 'local', broker: 'mqtt://obsuser:****@broker.example.com:1883',
connected: true, lastPacketUnix: now / 1000 - 10, packetsTotal: 999, packetsLast5m: 42, disconnectCount: 0 },
{ name: 'cascadia', broker: 'mqtts://cascadia.example.com:8883',
connected: true, lastPacketUnix: now / 1000 - 9 * 60, packetsTotal: 50, packetsLast5m: 0, disconnectCount: 2 },
{ name: 'dead', broker: 'mqtt://dead.example.com:1883',
connected: false, lastPacketUnix: 0, packetsTotal: 0, packetsLast5m: 0, disconnectCount: 7 }
]
};
Panel.renderPanel(container, payload, now);
const html = container.innerHTML;
// 3 <tr data-source-name="...">
const rowMatches = html.match(/<tr data-source-name=/g) || [];
assert.strictEqual(rowMatches.length, 3, `expected 3 rows, got ${rowMatches.length}: ${html}`);
// State data attributes wired through.
assert.ok(/data-source-name="local"[^>]*data-state="green"/.test(html), 'local row must be green');
assert.ok(/data-source-name="cascadia"[^>]*data-state="yellow"/.test(html), 'cascadia row must be yellow (connected, idle)');
assert.ok(/data-source-name="dead"[^>]*data-state="red"/.test(html), 'dead row must be red (disconnected)');
// Masked broker URL rendered verbatim; password placeholder visible.
assert.ok(html.includes('mqtt://obsuser:****@broker.example.com:1883'),
'masked broker URL must be present in rendered HTML');
// Counts rendered.
assert.ok(html.includes('999'), 'packetsTotal must be rendered');
assert.ok(html.includes('42'), 'packetsLast5m must be rendered');
// Status dots must use CSS variables, not hardcoded hex (#1682 adversarial r1).
// Regression: hex literals (#3fbf3f / #e4a800 / #e04848) bypass theming.
assert.ok(html.includes('background:var(--status-green)') || html.includes('background: var(--status-green)'),
'green dot must use var(--status-green): ' + html);
assert.ok(html.includes('background:var(--status-yellow)') || html.includes('background: var(--status-yellow)'),
'yellow dot must use var(--status-yellow): ' + html);
assert.ok(html.includes('background:var(--status-red)') || html.includes('background: var(--status-red)'),
'red dot must use var(--status-red): ' + html);
assert.ok(!/#3fbf3f|#e4a800|#e04848/i.test(html),
'panel must not emit hardcoded hex colors for status dots: ' + html);
});
test('renderPanel never echoes a plaintext password (defense-in-depth)', () => {
// The panel only renders what the server sends. If the server fails
// to mask, the panel must NOT introduce its own leak — and a smoke
// test here would catch a regression that adds a debug data-* with
// the unmasked URL.
const container = { innerHTML: '' };
const now = 1_700_000_000_000;
// Already-masked input — the server-side regex is tested in Go.
Panel.renderPanel(container, { sources: [
{ name: 'local', broker: 'mqtt://obsuser:****@host:1883', connected: true, lastPacketUnix: now / 1000 - 5,
packetsTotal: 1, packetsLast5m: 1, disconnectCount: 0 }
]}, now);
assert.ok(!/hunter2|password=|p4ssw0rd/i.test(container.innerHTML),
'plaintext password substrings must not appear in rendered HTML');
});
test('renderPanel handles empty source list with placeholder text', () => {
const container = { innerHTML: '' };
Panel.renderPanel(container, { sources: [] }, Date.now());
assert.ok(container.innerHTML.includes('No MQTT sources'),
`empty state should render placeholder; got: ${container.innerHTML}`);
});
test('mount fetches /api/mqtt/status and renders a row from the response', async () => {
const container = { innerHTML: '' };
const now = Date.now();
const fakePayload = {
sources: [{
name: 'local',
broker: 'mqtt://obsuser:****@broker.example.com:1883',
connected: true,
lastPacketUnix: Math.floor(now / 1000) - 5,
packetsTotal: 3,
packetsLast5m: 3,
disconnectCount: 0
}]
};
let fetchedURL = null;
const fakeFetch = (url) => {
fetchedURL = url;
return Promise.resolve({ json: () => Promise.resolve(fakePayload) });
};
const teardown = Panel.mount(container, { fetchImpl: fakeFetch, intervalMs: 60_000 });
// mount's initial tick is async; spin until container is populated.
await new Promise((resolve) => setTimeout(resolve, 50));
teardown();
assert.strictEqual(fetchedURL, '/api/mqtt/status', 'mount must hit /api/mqtt/status');
assert.ok(container.innerHTML.includes('mqtt://obsuser:****@broker.example.com:1883'),
`panel did not render after fetch; html=${container.innerHTML}`);
});
(async () => {
// Tiny sleep so the async test above resolves before the process exits.
await new Promise((r) => setTimeout(r, 100));
console.log(`\n${passed} passed, ${failed} failed`);
process.exit(failed === 0 ? 0 : 1);
})();