diff --git a/cmd/ingestor/config.go b/cmd/ingestor/config.go index a43552af..a6b02e8b 100644 --- a/cmd/ingestor/config.go +++ b/cmd/ingestor/config.go @@ -55,6 +55,16 @@ type Config struct { ValidateSignatures *bool `json:"validateSignatures,omitempty"` DB *DBConfig `json:"db,omitempty"` + // ObserverIATAWhitelist restricts which observer IATA regions are processed. + // When non-empty, only observers whose IATA code (from the MQTT topic) matches + // one of these entries are accepted. Case-insensitive. An empty list means all + // IATA codes are allowed. This applies globally, unlike the per-source iataFilter. + ObserverIATAWhitelist []string `json:"observerIATAWhitelist,omitempty"` + + // obsIATAWhitelistCached is the lazily-built uppercase set for O(1) lookups. + obsIATAWhitelistCached map[string]bool + obsIATAWhitelistOnce sync.Once + // ObserverBlacklist is a list of observer public keys to drop at ingest. // Messages from blacklisted observers are silently discarded — no DB writes, // no UpsertObserver, no observations, no metrics. @@ -150,6 +160,25 @@ func (c *Config) IsObserverBlacklisted(id string) bool { return c.obsBlacklistSetCached[strings.ToLower(strings.TrimSpace(id))] } +// IsObserverIATAAllowed returns true if the given IATA code is permitted. +// When ObserverIATAWhitelist is empty, all codes are allowed. +func (c *Config) IsObserverIATAAllowed(iata string) bool { + if c == nil || len(c.ObserverIATAWhitelist) == 0 { + return true + } + c.obsIATAWhitelistOnce.Do(func() { + m := make(map[string]bool, len(c.ObserverIATAWhitelist)) + for _, code := range c.ObserverIATAWhitelist { + trimmed := strings.ToUpper(strings.TrimSpace(code)) + if trimmed != "" { + m[trimmed] = true + } + } + c.obsIATAWhitelistCached = m + }) + return c.obsIATAWhitelistCached[strings.ToUpper(strings.TrimSpace(iata))] +} + // LoadConfig reads configuration from a JSON file, with env var overrides. // If the config file does not exist, sensible defaults are used (zero-config startup). func LoadConfig(path string) (*Config, error) { diff --git a/cmd/ingestor/config_test.go b/cmd/ingestor/config_test.go index d2694f4a..574c2a5c 100644 --- a/cmd/ingestor/config_test.go +++ b/cmd/ingestor/config_test.go @@ -317,3 +317,61 @@ func TestConnectTimeoutFromJSON(t *testing.T) { t.Errorf("from JSON: got %d, want 5", got) } } + +func TestObserverIATAWhitelist(t *testing.T) { + // Config with whitelist set + cfg := Config{ + ObserverIATAWhitelist: []string{"ARN", "got"}, + } + + // Matching (case-insensitive) + if !cfg.IsObserverIATAAllowed("ARN") { + t.Error("ARN should be allowed") + } + if !cfg.IsObserverIATAAllowed("arn") { + t.Error("arn (lowercase) should be allowed") + } + if !cfg.IsObserverIATAAllowed("GOT") { + t.Error("GOT should be allowed") + } + + // Non-matching + if cfg.IsObserverIATAAllowed("SJC") { + t.Error("SJC should NOT be allowed") + } + + // Empty string not allowed + if cfg.IsObserverIATAAllowed("") { + t.Error("empty IATA should NOT be allowed") + } +} + +func TestObserverIATAWhitelistEmpty(t *testing.T) { + // No whitelist = allow all + cfg := Config{} + if !cfg.IsObserverIATAAllowed("SJC") { + t.Error("with no whitelist, all IATAs should be allowed") + } + if !cfg.IsObserverIATAAllowed("") { + t.Error("with no whitelist, even empty IATA should be allowed") + } +} + +func TestObserverIATAWhitelistJSON(t *testing.T) { + json := `{ + "dbPath": "test.db", + "observerIATAWhitelist": ["ARN", "GOT"] + }` + tmp := t.TempDir() + "/config.json" + os.WriteFile(tmp, []byte(json), 0644) + cfg, err := LoadConfig(tmp) + if err != nil { + t.Fatal(err) + } + if len(cfg.ObserverIATAWhitelist) != 2 { + t.Fatalf("expected 2 entries, got %d", len(cfg.ObserverIATAWhitelist)) + } + if !cfg.IsObserverIATAAllowed("ARN") { + t.Error("ARN should be allowed after loading from JSON") + } +} diff --git a/cmd/ingestor/main.go b/cmd/ingestor/main.go index 2ae93bbe..5af44c9d 100644 --- a/cmd/ingestor/main.go +++ b/cmd/ingestor/main.go @@ -274,8 +274,14 @@ func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message, return } + // Global observer IATA whitelist: if configured, drop messages from observers + // in non-whitelisted IATA regions. Applies to ALL message types (status + packets). + if len(parts) > 1 && !cfg.IsObserverIATAAllowed(parts[1]) { + return + } + // Status topic: meshcore///status - // IATA filter does NOT apply here — observer metadata (noise_floor, battery, etc.) + // Per-source IATA filter does NOT apply here — observer metadata (noise_floor, battery, etc.) // is region-independent and should be accepted from all observers regardless of // which IATA regions are configured for packet ingestion. if len(parts) >= 4 && parts[3] == "status" { diff --git a/cmd/ingestor/main_test.go b/cmd/ingestor/main_test.go index d90f4e88..ce5b3116 100644 --- a/cmd/ingestor/main_test.go +++ b/cmd/ingestor/main_test.go @@ -904,3 +904,34 @@ func TestBL2_ZeroConnectedFatals(t *testing.T) { t.Log("BL2 confirmed: old guard len(clients)==0 would NOT fatal; new guard connectedCount==0 correctly catches zero-connected state") } } + +func TestHandleMessageObserverIATAWhitelist(t *testing.T) { + store := newTestStore(t) + source := MQTTSource{Name: "test"} + cfg := &Config{ + ObserverIATAWhitelist: []string{"ARN"}, + } + + // Message from non-whitelisted region GOT — should be dropped + handleMessage(store, "test", source, &mockMessage{ + topic: "meshcore/GOT/obs1/status", + payload: []byte(`{"origin":"node1","noise_floor":-110}`), + }, nil, cfg) + + var count int + store.db.QueryRow("SELECT COUNT(*) FROM observers WHERE id='obs1'").Scan(&count) + if count != 0 { + t.Error("observer from non-whitelisted IATA GOT should be dropped") + } + + // Message from whitelisted region ARN — should be accepted + handleMessage(store, "test", source, &mockMessage{ + topic: "meshcore/ARN/obs2/status", + payload: []byte(`{"origin":"node2","noise_floor":-105}`), + }, nil, cfg) + + store.db.QueryRow("SELECT COUNT(*) FROM observers WHERE id='obs2'").Scan(&count) + if count != 1 { + t.Errorf("observer from whitelisted IATA ARN should be accepted, got count=%d", count) + } +} diff --git a/config.example.json b/config.example.json index 87bd9415..e54c37c0 100644 --- a/config.example.json +++ b/config.example.json @@ -3,6 +3,8 @@ "apiKey": "your-secret-api-key-here", "nodeBlacklist": [], "_comment_nodeBlacklist": "Public keys of nodes to hide from all API responses. Use for trolls, offensive names, or nodes reporting false data that operators refuse to fix.", + "observerIATAWhitelist": [], + "_comment_observerIATAWhitelist": "Global IATA region whitelist. When non-empty, only observers whose IATA code (from MQTT topic) matches are processed. Case-insensitive. Empty = allow all. Unlike per-source iataFilter, this applies across all MQTT sources.", "retention": { "nodeDays": 7, "observerDays": 14,