diff --git a/cmd/ingestor/config.go b/cmd/ingestor/config.go index a6b02e8b..fb56aeef 100644 --- a/cmd/ingestor/config.go +++ b/cmd/ingestor/config.go @@ -23,6 +23,7 @@ type MQTTSource struct { Topics []string `json:"topics"` IATAFilter []string `json:"iataFilter,omitempty"` ConnectTimeoutSec int `json:"connectTimeoutSec,omitempty"` + Region string `json:"region,omitempty"` } // ConnectTimeoutOrDefault returns the per-source connect timeout in seconds, diff --git a/cmd/ingestor/config_test.go b/cmd/ingestor/config_test.go index 574c2a5c..cb9e6098 100644 --- a/cmd/ingestor/config_test.go +++ b/cmd/ingestor/config_test.go @@ -375,3 +375,22 @@ func TestObserverIATAWhitelistJSON(t *testing.T) { t.Error("ARN should be allowed after loading from JSON") } } + +func TestMQTTSourceRegionField(t *testing.T) { + dir := t.TempDir() + cfgPath := filepath.Join(dir, "config.json") + os.WriteFile(cfgPath, []byte(`{ + "dbPath": "/tmp/test.db", + "mqttSources": [ + {"name": "cascadia", "broker": "tcp://localhost:1883", "topics": ["meshcore/#"], "region": "PDX"} + ] + }`), 0o644) + + cfg, err := LoadConfig(cfgPath) + if err != nil { + t.Fatal(err) + } + if cfg.MQTTSources[0].Region != "PDX" { + t.Fatalf("expected region PDX, got %q", cfg.MQTTSources[0].Region) + } +} diff --git a/cmd/ingestor/db.go b/cmd/ingestor/db.go index f66a3dcb..b110317e 100644 --- a/cmd/ingestor/db.go +++ b/cmd/ingestor/db.go @@ -1062,6 +1062,7 @@ type PacketData struct { PathJSON string DecodedJSON string ChannelHash string // grouping key for channel queries (#762) + Region string // observer region: payload > topic > source config (#788) } // nilIfEmpty returns nil for empty strings (for nullable DB columns). @@ -1080,6 +1081,7 @@ type MQTTPacketMessage struct { Score *float64 `json:"score"` Direction *string `json:"direction"` Origin string `json:"origin"` + Region string `json:"region,omitempty"` // optional region override (#788) } // BuildPacketData constructs a PacketData from a decoded packet and MQTT message. @@ -1119,6 +1121,13 @@ func BuildPacketData(msg *MQTTPacketMessage, decoded *DecodedPacket, observerID, DecodedJSON: PayloadJSON(&decoded.Payload), } + // Region priority: payload field > topic-derived parameter (#788) + if msg.Region != "" { + pd.Region = msg.Region + } else { + pd.Region = region + } + // Populate channel_hash for fast channel queries (#762) if decoded.Header.PayloadType == PayloadGRP_TXT { if decoded.Payload.Type == "CHAN" && decoded.Payload.Channel != "" { diff --git a/cmd/ingestor/db_test.go b/cmd/ingestor/db_test.go index d11c2968..c3f95933 100644 --- a/cmd/ingestor/db_test.go +++ b/cmd/ingestor/db_test.go @@ -2353,3 +2353,26 @@ func TestCleanupLegacyNullHashTimestamp(t *testing.T) { } s2.Close() } + +func TestBuildPacketDataRegionFromPayload(t *testing.T) { + msg := &MQTTPacketMessage{Raw: "0102030405060708", Region: "PDX"} + decoded := &DecodedPacket{ + Header: Header{RouteType: 1, PayloadType: 3}, + } + pkt := BuildPacketData(msg, decoded, "obs1", "SJC") + // When payload has region, it should override the topic-derived region + if pkt.Region != "PDX" { + t.Fatalf("expected region PDX from payload, got %q", pkt.Region) + } +} + +func TestBuildPacketDataRegionFallsBackToTopic(t *testing.T) { + msg := &MQTTPacketMessage{Raw: "0102030405060708"} + decoded := &DecodedPacket{ + Header: Header{RouteType: 1, PayloadType: 3}, + } + pkt := BuildPacketData(msg, decoded, "obs1", "SJC") + if pkt.Region != "SJC" { + t.Fatalf("expected region SJC from topic, got %q", pkt.Region) + } +} diff --git a/cmd/ingestor/main.go b/cmd/ingestor/main.go index 5af44c9d..6dda8b99 100644 --- a/cmd/ingestor/main.go +++ b/cmd/ingestor/main.go @@ -345,8 +345,16 @@ func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message, if len(parts) > 1 { region = parts[1] } + // Fallback to source-level region config when topic has no region (#788) + if region == "" && source.Region != "" { + region = source.Region + } mqttMsg := &MQTTPacketMessage{Raw: rawHex} + // Parse optional region from JSON payload (#788) + if v, ok := msg["region"].(string); ok && v != "" { + mqttMsg.Region = v + } if v, ok := msg["SNR"]; ok { if f, ok := toFloat64(v); ok { mqttMsg.SNR = &f @@ -446,7 +454,12 @@ func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message, // Upsert observer if observerID != "" { origin, _ := msg["origin"].(string) - if err := store.UpsertObserver(observerID, origin, region, nil); err != nil { + // Use effective region: payload > topic > source config (#788) + effectiveRegion := region + if mqttMsg.Region != "" { + effectiveRegion = mqttMsg.Region + } + if err := store.UpsertObserver(observerID, origin, effectiveRegion, nil); err != nil { log.Printf("MQTT [%s] observer upsert error: %v", tag, err) } } diff --git a/config.example.json b/config.example.json index e54c37c0..71847f9d 100644 --- a/config.example.json +++ b/config.example.json @@ -132,6 +132,7 @@ "OAK", "MRY" ], + "region": "SJC", "connectTimeoutSec": 45 } ], @@ -227,7 +228,7 @@ "maxAgeDays": 5, "_comment": "Neighbor edges older than this many days are pruned on startup and daily. Default: 5." }, - "_comment_mqttSources": "Each source connects to an MQTT broker. topics: what to subscribe to. iataFilter: only ingest packets from these regions (optional).", + "_comment_mqttSources": "Each source connects to an MQTT broker. topics: what to subscribe to. iataFilter: only ingest packets from these regions (optional). region: default IATA region for this source — used when packet/topic doesn't specify one (optional, priority: payload > topic > this field).", "_comment_channelKeys": "Hex keys for decrypting channel messages. Key name = channel display name. public channel key is well-known.", "_comment_hashChannels": "Channel names whose keys are derived via SHA256. Key = SHA256(name)[:16]. Listed here so the ingestor can auto-derive keys.", "_comment_defaultRegion": "IATA code shown by default in region filters.",