feat: add optional MQTT region field (#788) (#1012)

## Summary

Add optional `region` field to MQTT source config and JSON payload,
enabling publishers to explicitly provide region data without relying
solely on topic path structure.

## Changes

- **`MQTTSource.Region`** — new optional config field. When set, acts as
default region for all messages from that source (useful when a broker
serves a single region).
- **`MQTTPacketMessage.Region`** — new optional JSON payload field.
Publishers can include `"region": "PDX"` in their MQTT messages.
- **`PacketData.Region`** — carries the resolved region through to
storage.
- **Priority resolution**: payload `region` > topic-derived region >
source config `region`
- Observer IATA is updated with the effective region on every packet.

## Config example

```json
{
  "mqttSources": [
    {
      "name": "cascadia",
      "broker": "tcp://cascadia-broker:1883",
      "topics": ["meshcore/#"],
      "region": "PDX"
    }
  ]
}
```

## Payload example

```json
{"raw": "0a1b2c...", "SNR": 5.2, "region": "PDX"}
```

## TDD

- Red commit: `980304c` (tests fail at compile — fields don't exist)
- Green commit: `4caf88b` (implementation, all tests pass)

## Unblocks

- #804, #770, #730 (all depend on region being available on
observations)

Fixes #788

---------

Co-authored-by: you <you@example.com>
This commit is contained in:
Kpa-clawbot
2026-05-03 11:21:54 -07:00
committed by GitHub
parent c186129d47
commit b0e4d2fa18
6 changed files with 68 additions and 2 deletions
+1
View File
@@ -23,6 +23,7 @@ type MQTTSource struct {
Topics []string `json:"topics"`
IATAFilter []string `json:"iataFilter,omitempty"`
ConnectTimeoutSec int `json:"connectTimeoutSec,omitempty"`
Region string `json:"region,omitempty"`
}
// ConnectTimeoutOrDefault returns the per-source connect timeout in seconds,
+19
View File
@@ -375,3 +375,22 @@ func TestObserverIATAWhitelistJSON(t *testing.T) {
t.Error("ARN should be allowed after loading from JSON")
}
}
func TestMQTTSourceRegionField(t *testing.T) {
dir := t.TempDir()
cfgPath := filepath.Join(dir, "config.json")
os.WriteFile(cfgPath, []byte(`{
"dbPath": "/tmp/test.db",
"mqttSources": [
{"name": "cascadia", "broker": "tcp://localhost:1883", "topics": ["meshcore/#"], "region": "PDX"}
]
}`), 0o644)
cfg, err := LoadConfig(cfgPath)
if err != nil {
t.Fatal(err)
}
if cfg.MQTTSources[0].Region != "PDX" {
t.Fatalf("expected region PDX, got %q", cfg.MQTTSources[0].Region)
}
}
+9
View File
@@ -1062,6 +1062,7 @@ type PacketData struct {
PathJSON string
DecodedJSON string
ChannelHash string // grouping key for channel queries (#762)
Region string // observer region: payload > topic > source config (#788)
}
// nilIfEmpty returns nil for empty strings (for nullable DB columns).
@@ -1080,6 +1081,7 @@ type MQTTPacketMessage struct {
Score *float64 `json:"score"`
Direction *string `json:"direction"`
Origin string `json:"origin"`
Region string `json:"region,omitempty"` // optional region override (#788)
}
// BuildPacketData constructs a PacketData from a decoded packet and MQTT message.
@@ -1119,6 +1121,13 @@ func BuildPacketData(msg *MQTTPacketMessage, decoded *DecodedPacket, observerID,
DecodedJSON: PayloadJSON(&decoded.Payload),
}
// Region priority: payload field > topic-derived parameter (#788)
if msg.Region != "" {
pd.Region = msg.Region
} else {
pd.Region = region
}
// Populate channel_hash for fast channel queries (#762)
if decoded.Header.PayloadType == PayloadGRP_TXT {
if decoded.Payload.Type == "CHAN" && decoded.Payload.Channel != "" {
+23
View File
@@ -2353,3 +2353,26 @@ func TestCleanupLegacyNullHashTimestamp(t *testing.T) {
}
s2.Close()
}
func TestBuildPacketDataRegionFromPayload(t *testing.T) {
msg := &MQTTPacketMessage{Raw: "0102030405060708", Region: "PDX"}
decoded := &DecodedPacket{
Header: Header{RouteType: 1, PayloadType: 3},
}
pkt := BuildPacketData(msg, decoded, "obs1", "SJC")
// When payload has region, it should override the topic-derived region
if pkt.Region != "PDX" {
t.Fatalf("expected region PDX from payload, got %q", pkt.Region)
}
}
func TestBuildPacketDataRegionFallsBackToTopic(t *testing.T) {
msg := &MQTTPacketMessage{Raw: "0102030405060708"}
decoded := &DecodedPacket{
Header: Header{RouteType: 1, PayloadType: 3},
}
pkt := BuildPacketData(msg, decoded, "obs1", "SJC")
if pkt.Region != "SJC" {
t.Fatalf("expected region SJC from topic, got %q", pkt.Region)
}
}
+14 -1
View File
@@ -345,8 +345,16 @@ func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message,
if len(parts) > 1 {
region = parts[1]
}
// Fallback to source-level region config when topic has no region (#788)
if region == "" && source.Region != "" {
region = source.Region
}
mqttMsg := &MQTTPacketMessage{Raw: rawHex}
// Parse optional region from JSON payload (#788)
if v, ok := msg["region"].(string); ok && v != "" {
mqttMsg.Region = v
}
if v, ok := msg["SNR"]; ok {
if f, ok := toFloat64(v); ok {
mqttMsg.SNR = &f
@@ -446,7 +454,12 @@ func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message,
// Upsert observer
if observerID != "" {
origin, _ := msg["origin"].(string)
if err := store.UpsertObserver(observerID, origin, region, nil); err != nil {
// Use effective region: payload > topic > source config (#788)
effectiveRegion := region
if mqttMsg.Region != "" {
effectiveRegion = mqttMsg.Region
}
if err := store.UpsertObserver(observerID, origin, effectiveRegion, nil); err != nil {
log.Printf("MQTT [%s] observer upsert error: %v", tag, err)
}
}
+2 -1
View File
@@ -132,6 +132,7 @@
"OAK",
"MRY"
],
"region": "SJC",
"connectTimeoutSec": 45
}
],
@@ -227,7 +228,7 @@
"maxAgeDays": 5,
"_comment": "Neighbor edges older than this many days are pruned on startup and daily. Default: 5."
},
"_comment_mqttSources": "Each source connects to an MQTT broker. topics: what to subscribe to. iataFilter: only ingest packets from these regions (optional).",
"_comment_mqttSources": "Each source connects to an MQTT broker. topics: what to subscribe to. iataFilter: only ingest packets from these regions (optional). region: default IATA region for this source — used when packet/topic doesn't specify one (optional, priority: payload > topic > this field).",
"_comment_channelKeys": "Hex keys for decrypting channel messages. Key name = channel display name. public channel key is well-known.",
"_comment_hashChannels": "Channel names whose keys are derived via SHA256. Key = SHA256(name)[:16]. Listed here so the ingestor can auto-derive keys.",
"_comment_defaultRegion": "IATA code shown by default in region filters.",