diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 85628dd7..99134290 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -141,6 +141,7 @@ jobs: node test-traces.js node test-issue-1648-m4-emoji-scan.js node test-issue-1668-m3-typography.js + node test-mqtt-status-panel.js - name: π‘οΈ Preflight XSS gate β actual --diff check (PR only) # The fixture self-test above (test-preflight-xss-gate.js) only diff --git a/cmd/ingestor/main.go b/cmd/ingestor/main.go index 37806bd2..a623da6f 100644 --- a/cmd/ingestor/main.go +++ b/cmd/ingestor/main.go @@ -132,8 +132,14 @@ func main() { Broker: source.Broker, } + // #1043: per-source status registry. Idempotent β repeated + // registration across reconnects returns the same state so + // counters accumulate across the process lifetime. + status := RegisterSourceStatus(tag, source.Broker) + opts.SetOnConnectHandler(func(c mqtt.Client) { log.Printf("MQTT [%s] connected to %s", tag, source.Broker) + status.MarkConnect(time.Now()) // PR #1216 r1 item 2: clear the stale LastMessageUnix from // before the outage so the watchdog doesn't immediately scream // "stalled for 2h". Also restarts the cold-start grace window @@ -156,6 +162,7 @@ func main() { opts.SetConnectionLostHandler(func(c mqtt.Client, err error) { log.Printf("MQTT [%s] disconnected from %s: %v", tag, source.Broker, err) + status.MarkDisconnect(time.Now(), err) }) opts.SetReconnectingHandler(func(c mqtt.Client, options *mqtt.ClientOptions) { @@ -171,6 +178,7 @@ func main() { // report "fresh" while the writer was stalled and the // buffer was filling. markReceiptForTag(tag, time.Now()) + status.MarkPacket(time.Now()) ingestBuffer.Submit(func() { handleMessage(store, tag, src, m, channelKeys, regionKeys, cfg) }) diff --git a/cmd/ingestor/source_status.go b/cmd/ingestor/source_status.go new file mode 100644 index 00000000..d18637de --- /dev/null +++ b/cmd/ingestor/source_status.go @@ -0,0 +1,187 @@ +package main + +import ( + "sync" + "sync/atomic" + "time" +) + +// SourceStatusSnapshot is the per-MQTT-source connection state and counter +// view written to the ingestor stats file (under "source_statuses") and +// consumed by cmd/server's /api/mqtt/status handler (#1043). +// +// All fields are unix seconds (0 = "never"). PacketsLast5m is a sliding +// 5-minute count derived from a per-second ring buffer. +type SourceStatusSnapshot struct { + Name string `json:"name"` + Broker string `json:"broker"` + Connected bool `json:"connected"` + LastConnectUnix int64 `json:"lastConnectUnix"` + LastDisconnectUnix int64 `json:"lastDisconnectUnix"` + LastPacketUnix int64 `json:"lastPacketUnix"` + ConnectCount int64 `json:"connectCount"` + DisconnectCount int64 `json:"disconnectCount"` + PacketsTotal int64 `json:"packetsTotal"` + PacketsLast5m int64 `json:"packetsLast5m"` + LastError string `json:"lastError,omitempty"` +} + +// sourceStatusState is the in-memory per-source counter set. All scalar +// fields are accessed via sync/atomic so the hot-path MarkPacket / +// MarkConnect / MarkDisconnect callsites stay lock-free. The 5-minute +// sliding window uses a 300-element per-second ring (one slot per +// second), guarded by ringMu only when we slide the cursor β the common +// path increments the current second with a single atomic.AddInt64. +// +// Memory: one state per source (typically 1-5 in production). 300 int64 +// slots = 2.4KB/source β fine. +type sourceStatusState struct { + name string + broker string // raw broker URL β server-side handler masks the password + + connected atomic.Bool + lastConnectUnix atomic.Int64 + lastDisconnectUnix atomic.Int64 + lastPacketUnix atomic.Int64 + connectCount atomic.Int64 + disconnectCount atomic.Int64 + packetsTotal atomic.Int64 + + // 5-minute sliding window: per-second buckets keyed by unix second. + // Stored as parallel arrays so we can both zero-out a stale slot AND + // know whether a slot's contents are still inside the window. + ringMu sync.Mutex + ringSec [300]int64 // unix second this slot represents (0 = unused) + ringCount [300]int64 // packets received in that second + + // lastError is rare-write/rare-read so a plain mutex is fine. + errMu sync.RWMutex + lastError string +} + +// MarkConnect records a successful (re)connection to the broker. +// Clears any stale lastError from a prior disconnect β otherwise the UI +// shows "connected=true, lastError='connection refused'" after a successful +// reconnect, which is a lie (#1682 munger review r1). +func (s *sourceStatusState) MarkConnect(now time.Time) { + s.connected.Store(true) + s.lastConnectUnix.Store(now.Unix()) + s.connectCount.Add(1) + s.errMu.Lock() + s.lastError = "" + s.errMu.Unlock() +} + +// MarkDisconnect records the broker dropping the connection. +func (s *sourceStatusState) MarkDisconnect(now time.Time, err error) { + s.connected.Store(false) + s.lastDisconnectUnix.Store(now.Unix()) + s.disconnectCount.Add(1) + if err != nil { + s.errMu.Lock() + s.lastError = err.Error() + s.errMu.Unlock() + } +} + +// MarkPacket records receipt of an MQTT message. Hot path. +func (s *sourceStatusState) MarkPacket(now time.Time) { + nowSec := now.Unix() + s.lastPacketUnix.Store(nowSec) + s.packetsTotal.Add(1) + + slot := nowSec % int64(len(s.ringSec)) + s.ringMu.Lock() + if s.ringSec[slot] != nowSec { + s.ringSec[slot] = nowSec + s.ringCount[slot] = 0 + } + s.ringCount[slot]++ + s.ringMu.Unlock() +} + +// sumLast5m returns the count of MarkPacket calls in the last 300s. Slots +// whose stored second falls outside the window are ignored (no stale leak). +func (s *sourceStatusState) sumLast5m(now time.Time) int64 { + nowSec := now.Unix() + cutoff := nowSec - int64(len(s.ringSec)) + 1 + var total int64 + s.ringMu.Lock() + for i := 0; i < len(s.ringSec); i++ { + if s.ringSec[i] >= cutoff && s.ringSec[i] <= nowSec { + total += s.ringCount[i] + } + } + s.ringMu.Unlock() + return total +} + +// snapshot copies the state into a serializable view. +func (s *sourceStatusState) snapshot(now time.Time) SourceStatusSnapshot { + s.errMu.RLock() + errStr := s.lastError + s.errMu.RUnlock() + return SourceStatusSnapshot{ + Name: s.name, + Broker: s.broker, + Connected: s.connected.Load(), + LastConnectUnix: s.lastConnectUnix.Load(), + LastDisconnectUnix: s.lastDisconnectUnix.Load(), + LastPacketUnix: s.lastPacketUnix.Load(), + ConnectCount: s.connectCount.Load(), + DisconnectCount: s.disconnectCount.Load(), + PacketsTotal: s.packetsTotal.Load(), + PacketsLast5m: s.sumLast5m(now), + LastError: errStr, + } +} + +// sourceStatusRegistry holds one sourceStatusState per source. Keyed by +// tag (which is the source Name, or the Broker URL if the operator left +// the name blank). +var ( + sourceStatusRegistryMu sync.RWMutex + sourceStatusRegistry = map[string]*sourceStatusState{} +) + +// RegisterSourceStatus creates (or returns the existing) state for the +// given source. Safe for cold-start use; idempotent β re-registering the +// same tag returns the existing state so counters aren't reset across +// reconnects. +func RegisterSourceStatus(tag, broker string) *sourceStatusState { + sourceStatusRegistryMu.Lock() + defer sourceStatusRegistryMu.Unlock() + if s, ok := sourceStatusRegistry[tag]; ok { + return s + } + s := &sourceStatusState{name: tag, broker: broker} + sourceStatusRegistry[tag] = s + return s +} + +// lookupSourceStatus returns the state for tag, or nil if unregistered. +func lookupSourceStatus(tag string) *sourceStatusState { + sourceStatusRegistryMu.RLock() + defer sourceStatusRegistryMu.RUnlock() + return sourceStatusRegistry[tag] +} + +// SnapshotSourceStatuses returns a slice of every registered source's +// current snapshot. Surfaced via the ingestor stats file under +// "source_statuses" so /api/mqtt/status can serve it (#1043). +func SnapshotSourceStatuses(now time.Time) []SourceStatusSnapshot { + sourceStatusRegistryMu.RLock() + defer sourceStatusRegistryMu.RUnlock() + out := make([]SourceStatusSnapshot, 0, len(sourceStatusRegistry)) + for _, s := range sourceStatusRegistry { + out = append(out, s.snapshot(now)) + } + return out +} + +// resetSourceStatusRegistry clears the registry. Test-only helper. +func resetSourceStatusRegistry() { + sourceStatusRegistryMu.Lock() + defer sourceStatusRegistryMu.Unlock() + sourceStatusRegistry = map[string]*sourceStatusState{} +} diff --git a/cmd/ingestor/source_status_test.go b/cmd/ingestor/source_status_test.go new file mode 100644 index 00000000..06b94d42 --- /dev/null +++ b/cmd/ingestor/source_status_test.go @@ -0,0 +1,116 @@ +package main + +import ( + "errors" + "testing" + "time" +) + +// TestSourceStatus_BasicLifecycle exercises the counter wiring used by +// the /api/mqtt/status server-side endpoint (#1043). +func TestSourceStatus_BasicLifecycle(t *testing.T) { + resetSourceStatusRegistry() + defer resetSourceStatusRegistry() + + s := RegisterSourceStatus("local", "mqtt://broker.example.com:1883") + if s == nil { + t.Fatal("RegisterSourceStatus returned nil") + } + // Re-registration is idempotent. + if s2 := RegisterSourceStatus("local", "mqtt://other"); s2 != s { + t.Fatal("RegisterSourceStatus not idempotent") + } + + now := time.Unix(1_700_000_000, 0) + s.MarkConnect(now) + s.MarkPacket(now) + s.MarkPacket(now.Add(1 * time.Second)) + s.MarkPacket(now.Add(2 * time.Second)) + + snap := s.snapshot(now.Add(3 * time.Second)) + if !snap.Connected { + t.Error("snapshot.Connected = false, want true after MarkConnect") + } + if snap.PacketsTotal != 3 { + t.Errorf("PacketsTotal = %d, want 3", snap.PacketsTotal) + } + if snap.PacketsLast5m != 3 { + t.Errorf("PacketsLast5m = %d, want 3", snap.PacketsLast5m) + } + if snap.ConnectCount != 1 { + t.Errorf("ConnectCount = %d, want 1", snap.ConnectCount) + } + if snap.LastConnectUnix != now.Unix() { + t.Errorf("LastConnectUnix = %d, want %d", snap.LastConnectUnix, now.Unix()) + } + if snap.Broker != "mqtt://broker.example.com:1883" { + t.Errorf("Broker = %q, want raw URL passthrough (server masks)", snap.Broker) + } + + // After 5 minutes idle, sliding window must be empty. + snap2 := s.snapshot(now.Add(6 * time.Minute)) + if snap2.PacketsLast5m != 0 { + t.Errorf("PacketsLast5m after 6m idle = %d, want 0", snap2.PacketsLast5m) + } + if snap2.PacketsTotal != 3 { + t.Errorf("PacketsTotal must be lifetime-cumulative, got %d", snap2.PacketsTotal) + } +} + +func TestSourceStatus_Disconnect(t *testing.T) { + resetSourceStatusRegistry() + defer resetSourceStatusRegistry() + + s := RegisterSourceStatus("disco", "mqtt://x:1883") + now := time.Unix(1_700_000_100, 0) + s.MarkConnect(now) + s.MarkDisconnect(now.Add(time.Minute), nil) + + snap := s.snapshot(now.Add(2 * time.Minute)) + if snap.Connected { + t.Error("snapshot.Connected = true after MarkDisconnect, want false") + } + if snap.DisconnectCount != 1 { + t.Errorf("DisconnectCount = %d, want 1", snap.DisconnectCount) + } +} + +func TestSnapshotSourceStatuses_ReturnsAll(t *testing.T) { + resetSourceStatusRegistry() + defer resetSourceStatusRegistry() + + RegisterSourceStatus("a", "mqtt://a") + RegisterSourceStatus("b", "mqtt://b") + snaps := SnapshotSourceStatuses(time.Now()) + if len(snaps) != 2 { + t.Errorf("len(snaps) = %d, want 2", len(snaps)) + } +} + +// TestSourceStatus_MarkConnectClearsLastError asserts MarkConnect wipes +// any prior sticky error (#1682 munger r1 review). Otherwise the UI sees +// connected=true alongside a stale "connection refused" string. +func TestSourceStatus_MarkConnectClearsLastError(t *testing.T) { + resetSourceStatusRegistry() + defer resetSourceStatusRegistry() + + s := RegisterSourceStatus("sticky", "mqtt://x:1883") + now := time.Unix(1_700_000_200, 0) + s.MarkConnect(now) + s.MarkDisconnect(now.Add(time.Second), errors.New("connection refused")) + + snap := s.snapshot(now.Add(2 * time.Second)) + if snap.LastError == "" { + t.Fatalf("precondition: expected lastError after MarkDisconnect, got empty") + } + + // Reconnect β lastError must clear. + s.MarkConnect(now.Add(3 * time.Second)) + snap = s.snapshot(now.Add(4 * time.Second)) + if snap.LastError != "" { + t.Errorf("snapshot.LastError = %q after MarkConnect, want empty (sticky-error regression)", snap.LastError) + } + if !snap.Connected { + t.Errorf("snapshot.Connected = false after MarkConnect, want true") + } +} diff --git a/cmd/ingestor/stats_file.go b/cmd/ingestor/stats_file.go index 429a1f06..7af3065d 100644 --- a/cmd/ingestor/stats_file.go +++ b/cmd/ingestor/stats_file.go @@ -58,6 +58,10 @@ type IngestorStatsSnapshot struct { // stale). Additive: omitempty so older server builds ignore it // gracefully. SourceLiveness map[string]SourceLivenessSnapshot `json:"source_liveness,omitempty"` + // SourceStatuses (#1043) is the per-MQTT-source connection state and + // counter view consumed by cmd/server's /api/mqtt/status handler. + // Additive; omitempty so older server builds ignore it. + SourceStatuses []SourceStatusSnapshot `json:"source_statuses,omitempty"` } // SourceLivenessSnapshot is the per-source two-clock view exposed for @@ -247,6 +251,7 @@ func StartStatsFileWriter(s *Store, interval time.Duration) { ProcIO: ioRate, WriterPerf: s.WriterStatsSnapshot(), SourceLiveness: SnapshotLivenessClocks(), + SourceStatuses: SnapshotSourceStatuses(tickAt), } buf.Reset() if err := enc.Encode(&snap); err != nil { diff --git a/cmd/server/mqtt_status.go b/cmd/server/mqtt_status.go new file mode 100644 index 00000000..65857c54 --- /dev/null +++ b/cmd/server/mqtt_status.go @@ -0,0 +1,144 @@ +package main + +import ( + "encoding/json" + "net/http" + "net/url" + "os" + "regexp" + "strings" +) + +// mqttBrokerSchemes is the set of broker URL schemes whose embedded +// `user:pass@host` credentials we want to redact. We URL-parse for these +// (defense vs. passwords containing `@`); other strings fall through to +// the legacy regex pass for embedded user:pass occurrences in free-form +// error strings. +var mqttBrokerSchemes = map[string]bool{ + "mqtt": true, "mqtts": true, "tcp": true, "ssl": true, "ws": true, "wss": true, +} + +// mqttBrokerURLRe locates a broker URL (with credentials) embedded inside +// a larger free-form string β e.g. an error message that quotes the +// failing broker. Each match is fed through url.Parse + redaction. We +// match greedily up through the LAST `@` followed by a host-shaped token +// so passwords containing `@` are not truncated (#1682 adversarial r1). +// +// Go's RE2 has no lookahead; we capture the host tail and emit it +// unchanged in the replacement. +var mqttBrokerURLRe = regexp.MustCompile(`(?i)(?:mqtt|mqtts|tcp|ssl|ws|wss)://[^\s]*`) + +// maskBrokerURL returns the broker URL with any inline password redacted. +// `mqtt://user:secret@host:1883` -> `mqtt://user:****@host:1883`. +// `mqtt://user:p@ss@host` -> `mqtt://user:****@host` (password with `@`). +// URLs without inline credentials are returned unchanged. +// +// Primary strategy: url.Parse β handles passwords with `@`, `:`, etc. +// Fallback: regex sweep for free-form strings (e.g. error messages that +// quote a URL fragment but aren't standalone-parseable). +func maskBrokerURL(s string) string { + if s == "" { + return s + } + // Fast path: the whole string is the broker URL. + if masked, ok := redactBrokerURL(s); ok { + return masked + } + // Fallback: free-form string (e.g. error message) containing a URL. + // Find embedded broker URLs and redact each in-place. + return mqttBrokerURLRe.ReplaceAllStringFunc(s, func(m string) string { + if out, ok := redactBrokerURL(m); ok { + return out + } + return m + }) +} + +// redactBrokerURL parses s as a URL and, if it has an mqtt-family scheme +// with userinfo containing a password, returns the URL with the password +// replaced by `****`. Returns ok=false when s is not such a URL. +func redactBrokerURL(s string) (string, bool) { + u, err := url.Parse(s) + if err != nil || u.Scheme == "" || u.User == nil { + return s, false + } + if !mqttBrokerSchemes[strings.ToLower(u.Scheme)] { + return s, false + } + if _, hasPass := u.User.Password(); !hasPass { + return s, false + } + // Re-assemble manually rather than via url.UserPassword + u.String() + // because the latter percent-encodes the `*` mask token into `%2A`, + // defeating the user-visible redaction marker. We only need to swap + // the userinfo segment of the original string. + hostAndAfter := s + if idx := strings.LastIndex(s, "@"); idx >= 0 { + hostAndAfter = s[idx+1:] + } + // Preserve original scheme casing (url.Parse lowercases u.Scheme). + schemeEnd := strings.Index(s, "://") + if schemeEnd < 0 { + return s, false + } + return s[:schemeEnd] + "://" + u.User.Username() + ":****@" + hostAndAfter, true +} + +// MqttSourceStatus is the per-MQTT-source status row surfaced via +// /api/mqtt/status. Mirrors the on-disk shape the ingestor publishes +// (cmd/ingestor SourceStatusSnapshot) but with the broker URL credentials +// redacted before serving β operators must not see the broker password +// in the API response (#1043 acceptance criterion). +type MqttSourceStatus struct { + Name string `json:"name"` + Broker string `json:"broker"` + Connected bool `json:"connected"` + LastConnectUnix int64 `json:"lastConnectUnix"` + LastDisconnectUnix int64 `json:"lastDisconnectUnix"` + LastPacketUnix int64 `json:"lastPacketUnix"` + ConnectCount int64 `json:"connectCount"` + DisconnectCount int64 `json:"disconnectCount"` + PacketsTotal int64 `json:"packetsTotal"` + PacketsLast5m int64 `json:"packetsLast5m"` + LastError string `json:"lastError,omitempty"` +} + +// MqttStatusResponse is the JSON envelope returned by /api/mqtt/status. +type MqttStatusResponse struct { + Sources []MqttSourceStatus `json:"sources"` + SampleAt string `json:"sampleAt"` +} + +// ingestorMqttStatusEnvelope is the partial shape the server decodes from +// the ingestor stats file (additive β older ingestors omit the field). +type ingestorMqttStatusEnvelope struct { + SampledAt string `json:"sampledAt"` + SourceStatuses []MqttSourceStatus `json:"source_statuses"` +} + +// handleMqttStatus serves GET /api/mqtt/status. Reads the ingestor stats +// file, masks broker-URL passwords, and returns the per-source status +// list. Returns an empty list (200 OK) when the stats file is missing +// or unparseable β the UI panel renders a "no data yet" state. +func (s *Server) handleMqttStatus(w http.ResponseWriter, r *http.Request) { + resp := MqttStatusResponse{Sources: []MqttSourceStatus{}, SampleAt: ""} + data, err := os.ReadFile(IngestorStatsPath()) + if err != nil { + writeJSON(w, resp) + return + } + var env ingestorMqttStatusEnvelope + if err := json.Unmarshal(data, &env); err != nil { + writeJSON(w, resp) + return + } + resp.SampleAt = env.SampledAt + for _, src := range env.SourceStatuses { + src.Broker = maskBrokerURL(src.Broker) + // Broker libraries occasionally quote the failing URL in the + // error string β redact there too as defense-in-depth. + src.LastError = maskBrokerURL(src.LastError) + resp.Sources = append(resp.Sources, src) + } + writeJSON(w, resp) +} diff --git a/cmd/server/mqtt_status_test.go b/cmd/server/mqtt_status_test.go new file mode 100644 index 00000000..db3415e1 --- /dev/null +++ b/cmd/server/mqtt_status_test.go @@ -0,0 +1,142 @@ +package main + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "testing" +) + +// TestMqttStatus_MasksBrokerPassword (#1043) asserts the /api/mqtt/status +// handler never leaks the broker password embedded in a mqtt:// URL. +// Operators viewing the API response (or the Observers panel that +// consumes it) must see `****` in place of the inline credential. +// +// Test shape: write a stub ingestor stats file with one source whose +// broker URL contains a plaintext password, invoke the handler, assert +// the JSON response (a) contains the username + host, (b) does NOT +// contain the password substring. +func TestMqttStatus_MasksBrokerPassword(t *testing.T) { + const password = "hunter2supersecret" + const rawBroker = "mqtt://obsuser:" + password + "@broker.example.com:1883" + + tmp := t.TempDir() + statsPath := filepath.Join(tmp, "ingestor-stats.json") + t.Setenv("CORESCOPE_INGESTOR_STATS", statsPath) + + // Stub stats file: one MQTT source with a credentialed broker URL. + stub := map[string]any{ + "sampledAt": "2026-06-12T12:30:00Z", + "source_statuses": []map[string]any{{ + "name": "local", + "broker": rawBroker, + "connected": true, + "lastPacketUnix": 1717977000, + "connectCount": 1, + "disconnectCount": 0, + "packetsTotal": 42, + "packetsLast5m": 7, + }}, + } + data, err := json.Marshal(stub) + if err != nil { + t.Fatalf("marshal stub: %v", err) + } + if err := os.WriteFile(statsPath, data, 0o600); err != nil { + t.Fatalf("write stub: %v", err) + } + + srv := &Server{} + req := httptest.NewRequest(http.MethodGet, "/api/mqtt/status", nil) + rec := httptest.NewRecorder() + srv.handleMqttStatus(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200; body=%s", rec.Code, rec.Body.String()) + } + body := rec.Body.String() + t.Logf("response body: %s", body) + + if strings.Contains(body, password) { + t.Errorf("response leaks broker password %q in body: %s", password, body) + } + // Sanity: the response still identifies the source by name + host. + if !strings.Contains(body, "broker.example.com") { + t.Errorf("response missing broker host: %s", body) + } + if !strings.Contains(body, "obsuser") { + t.Errorf("response missing broker username: %s", body) + } + // Mask token must be present so operators can tell credentials were + // redacted vs the broker URL never having a password to begin with. + if !strings.Contains(body, "****") { + t.Errorf("response missing redaction marker '****': %s", body) + } +} + +// TestMqttStatus_EmptyWhenNoStatsFile asserts the handler returns an empty +// list (200 OK) when the ingestor stats file is missing β the UI panel +// renders a "no data yet" state in that case. +func TestMqttStatus_EmptyWhenNoStatsFile(t *testing.T) { + tmp := t.TempDir() + t.Setenv("CORESCOPE_INGESTOR_STATS", filepath.Join(tmp, "does-not-exist.json")) + + srv := &Server{} + req := httptest.NewRequest(http.MethodGet, "/api/mqtt/status", nil) + rec := httptest.NewRecorder() + srv.handleMqttStatus(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200", rec.Code) + } + var resp MqttStatusResponse + if err := json.Unmarshal(rec.Body.Bytes(), &resp); err != nil { + t.Fatalf("unmarshal: %v; body=%s", err, rec.Body.String()) + } + if len(resp.Sources) != 0 { + t.Errorf("Sources len = %d, want 0", len(resp.Sources)) + } +} + +// TestMaskBrokerURL_Patterns is a unit table-driven test for the masking +// helper. Kept separate from the handler test so a regression in the +// regex localizes immediately. +func TestMaskBrokerURL_Patterns(t *testing.T) { + cases := []struct { + name, in, want string + }{ + {"plain mqtt no creds", "mqtt://broker.example.com:1883", "mqtt://broker.example.com:1883"}, + {"mqtt with creds", "mqtt://u:secret@broker.example.com:1883", "mqtt://u:****@broker.example.com:1883"}, + {"mqtts with creds", "mqtts://u:secret@broker.example.com:8883", "mqtts://u:****@broker.example.com:8883"}, + {"tcp with creds", "tcp://u:p@host:1883", "tcp://u:****@host:1883"}, + {"ssl with creds", "ssl://u:p@host:8883", "ssl://u:****@host:8883"}, + {"ws with creds", "ws://u:p@host:8080/mqtt", "ws://u:****@host:8080/mqtt"}, + {"wss with creds", "wss://u:p@host:443/mqtt", "wss://u:****@host:443/mqtt"}, + {"uppercase scheme", "MQTT://u:p@host:1883", "MQTT://u:****@host:1883"}, + {"empty", "", ""}, + {"long password", "mqtt://obsuser:hunter2supersecretXYZ123@host:1883", "mqtt://obsuser:****@host:1883"}, + {"no scheme bare host", "host:1883", "host:1883"}, + // Adversarial r1 review (#1682): password contains @. The previous + // regex-only impl matched only up to the FIRST @, exposing "ss" as + // part of the path: "mqtt://user:****@ss@host". url.Parse handles + // this correctly because Go interprets the LAST @ as the userinfo + // boundary. + {"password with single @", "mqtt://user:p@ss@host:1883", "mqtt://user:****@host:1883"}, + {"password with multiple @", "mqtt://user:p@ss@wo@host:1883", "mqtt://user:****@host:1883"}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + got := maskBrokerURL(c.in) + if got != c.want { + t.Errorf("maskBrokerURL(%q) = %q, want %q", c.in, got, c.want) + } + // Inline secret must never survive. + if c.in != c.want && strings.Contains(got, "secret") { + t.Errorf("output still contains 'secret': %q", got) + } + }) + } +} diff --git a/cmd/server/openapi.go b/cmd/server/openapi.go index c17099fa..a6b65af3 100644 --- a/cmd/server/openapi.go +++ b/cmd/server/openapi.go @@ -42,6 +42,7 @@ func routeDescriptions() map[string]routeMeta { "GET /api/health": {Summary: "Health check", Description: "Returns server health, uptime, and memory stats.", Tag: "admin"}, "GET /api/stats": {Summary: "Network statistics", Description: "Returns aggregate stats (node counts, packet counts, observer counts). Cached for 10s.", Tag: "admin"}, "GET /api/perf": {Summary: "Performance statistics", Description: "Returns per-endpoint request timing and slow query log.", Tag: "admin"}, + "GET /api/mqtt/status": {Summary: "MQTT source status", Description: "Returns per-MQTT-source connection state and counters (lastConnectUnix, lastPacketUnix, packetsTotal, etc.). Broker URL passwords are masked. Sourced from the ingestor stats file; empty list when unavailable. (#1043)", Tag: "admin"}, "POST /api/perf/reset": {Summary: "Reset performance stats", Tag: "admin", Auth: true}, // "POST /api/admin/prune" removed in #1283 (ingestor owns prune). "GET /api/debug/affinity": {Summary: "Debug neighbor affinity scores", Tag: "admin", Auth: true}, diff --git a/cmd/server/routes.go b/cmd/server/routes.go index 2f30f3ec..7b3670df 100644 --- a/cmd/server/routes.go +++ b/cmd/server/routes.go @@ -230,6 +230,7 @@ func (s *Server) RegisterRoutes(r *mux.Router) { r.HandleFunc("/api/perf/io", s.handlePerfIO).Methods("GET") r.HandleFunc("/api/perf/sqlite", s.handlePerfSqlite).Methods("GET") r.HandleFunc("/api/perf/write-sources", s.handlePerfWriteSources).Methods("GET") + r.HandleFunc("/api/mqtt/status", s.handleMqttStatus).Methods("GET") r.Handle("/api/perf/reset", s.requireAPIKey(http.HandlerFunc(s.handlePerfReset))).Methods("POST") // /api/admin/prune removed in #1283 β pruning is owned by the // ingestor process (scheduled tickers + startup pass). Operators diff --git a/public/index.html b/public/index.html index 4b1f79de..1a1f914f 100644 --- a/public/index.html +++ b/public/index.html @@ -211,6 +211,7 @@ + diff --git a/public/mqtt-status-panel.js b/public/mqtt-status-panel.js new file mode 100644 index 00000000..f0b2642b --- /dev/null +++ b/public/mqtt-status-panel.js @@ -0,0 +1,140 @@ +/* === CoreScope β mqtt-status-panel.js (#1043) === + * Small panel that fetches /api/mqtt/status, renders a per-source row + * with connection state + recent-packet color coding, and auto-refreshes + * every 10s. Mounted by observers.js into a container element. + * + * Color-coding: + * - green: connected AND a packet seen in the last 5 minutes + * - yellow: connected but no recent packets (broker quiet or stalled) + * - red: disconnected + * + * Exposed as window.MqttStatusPanel for testability and so the Observers + * page can mount it without an import system. + */ +'use strict'; + +(function () { + var REFRESH_MS = 10000; + var RECENT_PACKET_MS = 5 * 60 * 1000; + + function fmtRelative(unixSec, now) { + if (!unixSec) return 'never'; + var ms = (now || Date.now()) - unixSec * 1000; + if (ms < 0) ms = 0; + if (ms < 60000) return Math.floor(ms / 1000) + 's ago'; + if (ms < 3600000) return Math.floor(ms / 60000) + 'm ago'; + if (ms < 86400000) return Math.floor(ms / 3600000) + 'h ago'; + return Math.floor(ms / 86400000) + 'd ago'; + } + + // classifySource returns 'green' | 'yellow' | 'red' for a source row. + // Exposed for unit testing. + function classifySource(src, now) { + if (!src || !src.connected) return 'red'; + var lastMs = (src.lastPacketUnix || 0) * 1000; + var ageMs = (now || Date.now()) - lastMs; + if (src.lastPacketUnix && ageMs <= RECENT_PACKET_MS) return 'green'; + return 'yellow'; + } + + // escapeHTML keeps masked-but-still-attacker-controllable broker strings + // safe in innerHTML. The server already redacts passwords; this defends + // against a hostname containing < or & breaking the panel. + function escapeHTML(s) { + return String(s == null ? '' : s) + .replace(/&/g, '&') + .replace(//g, '>') + .replace(/"/g, '"'); + } + + function renderPanel(container, payload, now) { + if (!container) return; + var sources = (payload && payload.sources) || []; + if (sources.length === 0) { + container.innerHTML = '
' + escapeHTML(s.broker) + '| Source | ' + + 'Broker | ' + + 'State | ' + + 'Last packet | ' + + '5m | ' + + 'Total | ' + + 'Disc. | ' + + '
|---|