mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-06-03 23:51:22 +00:00
feat(ingestor): document and test ws:// / wss:// WebSocket MQTT broker support (#902)
## Summary
CoreScope's ingestor already supports WebSocket MQTT connections today —
`paho.mqtt.golang` v1.5.0 handles `ws://` and `wss://` natively via
gorilla/websocket. However this support was **undocumented, untested,
and had a TLS gap** for `wss://` connections.
This PR closes those gaps without any breaking changes.
## Changes
### `cmd/ingestor/config.go`
- Added godoc comment to `ResolvedSources()` explaining all four
supported schemes and which ones require translation vs. pass-through
- `ws://` and `wss://` explicitly documented as native paho schemes
requiring no mapping
### `cmd/ingestor/main.go`
- Extended TLS config to cover `wss://` in addition to `ssl://`
- Before: `wss://` connections would use paho's default TLS (no explicit
`tls.Config` set), which works for valid certs but doesn't apply the
same predictable setup as `ssl://`
- After: both `ssl://` and `wss://` get `tls.Config{}` (system CA pool),
matching behavior; `rejectUnauthorized: false` still works for
self-signed certs on both schemes
### `cmd/ingestor/config_test.go`
Two new tests:
- `TestResolvedSourcesSchemeMapping`: validates all six scheme
variations (`mqtt://`, `mqtts://`, `tcp://`, `ssl://`, `ws://`,
`wss://`) including paths like `wss://host/mqtt`
- `TestLoadConfigWSSource`: full round-trip of a dual-source config (TCP
+ wss:// with username/password), verifies scheme unchanged through
`LoadConfig` and `ResolvedSources`
### `config.example.json`
- Added `wsmqtt` example entry showing `wss://` with username/password
- Updated `_comment_mqttSources` to enumerate all supported schemes:
`mqtt://`, `mqtts://`, `ws://`, `wss://`
## Motivation
We run
[meshcore-mqtt-broker](https://github.com/andrewjfreyer/meshcore-mqtt-broker)
(a WebSocket MQTT bridge with JWT auth) alongside Mosquitto, and
subscribe to both via `mqttSources`. The dual-source config works in
production but nothing in the docs or example config made this
discoverable for other operators.
## Testing
```
cd cmd/ingestor && go test ./...
ok github.com/corescope/ingestor 1.568s
```
All existing tests pass. Two new tests added.
## No breaking changes
- Existing configs: no change in behavior
- `ws://` / `wss://` configs that were already working: same behavior +
explicit TLS setup for `wss://`
This commit is contained in:
+10
-1
@@ -286,15 +286,24 @@ func LoadConfig(path string) (*Config, error) {
|
||||
}
|
||||
|
||||
// ResolvedSources returns the final list of MQTT sources to connect to.
|
||||
//
|
||||
// Scheme mapping:
|
||||
//
|
||||
// mqtt:// → tcp:// (paho plain TCP)
|
||||
// mqtts:// → ssl:// (paho TLS over TCP)
|
||||
// ws:// (paho WebSocket — passed through, no mapping needed)
|
||||
// wss:// (paho WebSocket TLS — passed through, no mapping needed)
|
||||
func (c *Config) ResolvedSources() []MQTTSource {
|
||||
for i := range c.MQTTSources {
|
||||
// paho uses tcp:// and ssl:// not mqtt:// and mqtts://
|
||||
// paho uses tcp:// and ssl:// for plain MQTT; ws:// and wss:// are accepted natively.
|
||||
b := c.MQTTSources[i].Broker
|
||||
if strings.HasPrefix(b, "mqtt://") {
|
||||
c.MQTTSources[i].Broker = "tcp://" + b[7:]
|
||||
} else if strings.HasPrefix(b, "mqtts://") {
|
||||
c.MQTTSources[i].Broker = "ssl://" + b[8:]
|
||||
}
|
||||
// ws:// and wss:// pass through unchanged — paho handles WebSocket
|
||||
// connections natively via gorilla/websocket.
|
||||
}
|
||||
return c.MQTTSources
|
||||
}
|
||||
|
||||
@@ -394,3 +394,93 @@ func TestMQTTSourceRegionField(t *testing.T) {
|
||||
t.Fatalf("expected region PDX, got %q", cfg.MQTTSources[0].Region)
|
||||
}
|
||||
}
|
||||
|
||||
// TestResolvedSourcesSchemeMapping verifies that mqtt:// and mqtts:// are translated
|
||||
// to the paho-native tcp:// and ssl:// schemes, while ws:// and wss:// pass through
|
||||
// unchanged (paho handles WebSocket connections natively).
|
||||
func TestResolvedSourcesSchemeMapping(t *testing.T) {
|
||||
tests := []struct {
|
||||
input string
|
||||
want string
|
||||
}{
|
||||
{"mqtt://host:1883", "tcp://host:1883"},
|
||||
{"mqtts://host:8883", "ssl://host:8883"},
|
||||
{"tcp://host:1883", "tcp://host:1883"},
|
||||
{"ssl://host:8883", "ssl://host:8883"},
|
||||
{"ws://host:9001", "ws://host:9001"},
|
||||
{"wss://host:9001", "wss://host:9001"},
|
||||
{"ws://host:9001/mqtt", "ws://host:9001/mqtt"},
|
||||
{"wss://host:9001/mqtt", "wss://host:9001/mqtt"},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
cfg := &Config{
|
||||
MQTTSources: []MQTTSource{
|
||||
{Name: "test", Broker: tt.input, Topics: []string{"meshcore/#"}},
|
||||
},
|
||||
}
|
||||
sources := cfg.ResolvedSources()
|
||||
if got := sources[0].Broker; got != tt.want {
|
||||
t.Errorf("ResolvedSources(%q) = %q, want %q", tt.input, got, tt.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestLoadConfigWSSource verifies that a WebSocket MQTT source round-trips through
|
||||
// LoadConfig correctly — username/password preserved, scheme unchanged.
|
||||
func TestLoadConfigWSSource(t *testing.T) {
|
||||
t.Setenv("DB_PATH", "")
|
||||
t.Setenv("MQTT_BROKER", "")
|
||||
|
||||
dir := t.TempDir()
|
||||
cfgPath := filepath.Join(dir, "config.json")
|
||||
os.WriteFile(cfgPath, []byte(`{
|
||||
"dbPath": "test.db",
|
||||
"mqttSources": [
|
||||
{
|
||||
"name": "local-tcp",
|
||||
"broker": "mqtt://localhost:1883",
|
||||
"topics": ["meshcore/#"]
|
||||
},
|
||||
{
|
||||
"name": "wsmqtt-ws",
|
||||
"broker": "wss://wsmqtt.example.com/mqtt",
|
||||
"username": "corescope",
|
||||
"password": "s3cr3t",
|
||||
"topics": ["meshcore/#"]
|
||||
}
|
||||
]
|
||||
}`), 0o644)
|
||||
|
||||
cfg, err := LoadConfig(cfgPath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(cfg.MQTTSources) != 2 {
|
||||
t.Fatalf("mqttSources len=%d, want 2", len(cfg.MQTTSources))
|
||||
}
|
||||
|
||||
tcp := cfg.MQTTSources[0]
|
||||
if tcp.Name != "local-tcp" {
|
||||
t.Errorf("name=%s, want local-tcp", tcp.Name)
|
||||
}
|
||||
|
||||
ws := cfg.MQTTSources[1]
|
||||
if ws.Name != "wsmqtt-ws" {
|
||||
t.Errorf("name=%s, want wsmqtt-ws", ws.Name)
|
||||
}
|
||||
if ws.Broker != "wss://wsmqtt.example.com/mqtt" {
|
||||
t.Errorf("broker=%s, want wss://wsmqtt.example.com/mqtt", ws.Broker)
|
||||
}
|
||||
if ws.Username != "corescope" {
|
||||
t.Errorf("username=%s, want corescope", ws.Username)
|
||||
}
|
||||
if ws.Password != "s3cr3t" {
|
||||
t.Errorf("password=%s, want s3cr3t", ws.Password)
|
||||
}
|
||||
|
||||
sources := cfg.ResolvedSources()
|
||||
if sources[1].Broker != "wss://wsmqtt.example.com/mqtt" {
|
||||
t.Errorf("ResolvedSources wss broker=%s, want unchanged", sources[1].Broker)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -436,7 +436,9 @@ func buildMQTTOpts(source MQTTSource) *mqtt.ClientOptions {
|
||||
}
|
||||
if source.RejectUnauthorized != nil && !*source.RejectUnauthorized {
|
||||
opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: true})
|
||||
} else if strings.HasPrefix(source.Broker, "ssl://") {
|
||||
} else if strings.HasPrefix(source.Broker, "ssl://") || strings.HasPrefix(source.Broker, "wss://") {
|
||||
// TLS with system CA pool — valid for ssl:// MQTT brokers and
|
||||
// wss:// WebSocket brokers behind a publicly-trusted certificate.
|
||||
opts.SetTLSConfig(&tls.Config{})
|
||||
}
|
||||
return opts
|
||||
|
||||
+11
-2
@@ -137,6 +137,16 @@
|
||||
],
|
||||
"region": "SJC",
|
||||
"connectTimeoutSec": 45
|
||||
},
|
||||
{
|
||||
"_comment": "WebSocket MQTT broker (e.g. meshcore-mqtt-broker). Use ws:// for plain WebSocket or wss:// for TLS. Username/password supported.",
|
||||
"name": "wsmqtt",
|
||||
"broker": "wss://wsmqtt.example.com/mqtt",
|
||||
"username": "corescope",
|
||||
"password": "your-password",
|
||||
"topics": [
|
||||
"meshcore/#"
|
||||
]
|
||||
}
|
||||
],
|
||||
"channelKeys": {
|
||||
@@ -264,7 +274,7 @@
|
||||
"criticalMv": 3000,
|
||||
"_comment": "Voltage cutoffs (millivolts) for the per-node battery trend chart on /node-analytics. Latest sample below lowMv shows the node as ⚠️ Low; below criticalMv shows 🪫 Critical. Both default to 3300 / 3000 if omitted. Source data: observer_metrics.battery_mv populated from observer status messages; only nodes that are themselves observers (matching pubkey ↔ observer id) yield a series. Issue #663."
|
||||
},
|
||||
"_comment_mqttSources": "Each source connects to an MQTT broker. topics: what to subscribe to. iataFilter: only ingest packets from these regions (optional). region: default IATA region for this source — used when packet/topic doesn't specify one (optional, priority: payload > topic > this field).",
|
||||
"_comment_mqttSources": "Each source connects to an MQTT broker. Supported schemes: mqtt:// (plain TCP), mqtts:// (TLS), ws:// (WebSocket), wss:// (WebSocket TLS). topics: what to subscribe to. iataFilter: only ingest packets from these regions (optional). region: default IATA region for this source — used when packet/topic doesn't specify one (optional, priority: payload > topic > this field).",
|
||||
"compression": {
|
||||
"gzip": false,
|
||||
"websocket": false,
|
||||
@@ -281,7 +291,6 @@
|
||||
]
|
||||
},
|
||||
"_comment_compression": "Opt-in HTTP gzip middleware + WebSocket permessage-deflate. Both default to false — enable ONLY when your upstream reverse proxy is NOT already compressing. gzip: enables the gzipMiddleware wrapper around the HTTP handler. websocket: sets gorilla websocket Upgrader.EnableCompression. level: gzip compression level 1..9 (1=BestSpeed, 9=BestCompression, default 6). minSizeBytes: advisory minimum response size below which compression would not pay off. contentTypes: MIME allow-list — only responses with these Content-Type values are compressed. Already-compressed types (image/*, video/*, audio/*, application/zip, application/x-gzip, application/pdf, application/octet-stream) are always skipped, as are responses whose handler already set Content-Encoding. Omit contentTypes to use the built-in default allow-list.",
|
||||
"_comment_mqttSources": "Each source connects to an MQTT broker. topics: what to subscribe to. iataFilter: only ingest packets from these regions (optional).",
|
||||
"_comment_channelKeys": "Hex keys for decrypting channel messages. Key name = channel display name. public channel key is well-known.",
|
||||
"_comment_hashChannels": "Channel names whose keys are derived via SHA256. Key = SHA256(name)[:16]. Listed here so the ingestor can auto-derive keys.",
|
||||
"hashRegions": [
|
||||
|
||||
Reference in New Issue
Block a user