diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml
index 9e70c6b7..b827d226 100644
--- a/.github/workflows/deploy.yml
+++ b/.github/workflows/deploy.yml
@@ -100,6 +100,7 @@ jobs:
node test-channel-modal-ux.js
node test-channel-issue-1087.js
node test-channel-issue-1101.js
+ node test-observer-iata-1188.js
node test-pull-to-reconnect-1091.js
node test-channel-fluid-layout.js
@@ -228,6 +229,7 @@ jobs:
BASE_URL=http://localhost:13581 node test-channel-issue-1087-e2e.js 2>&1 | tee -a e2e-output.txt
BASE_URL=http://localhost:13581 node test-channel-issue-1111-e2e.js 2>&1 | tee -a e2e-output.txt
BASE_URL=http://localhost:13581 node test-map-modal-fluid-e2e.js 2>&1 | tee -a e2e-output.txt
+ BASE_URL=http://localhost:13581 node test-observer-iata-1188-e2e.js 2>&1 | tee -a e2e-output.txt
CHROMIUM_REQUIRE=1 BASE_URL=http://localhost:13581 node test-nav-fluid-1055-e2e.js 2>&1 | tee -a e2e-output.txt
CHROMIUM_REQUIRE=1 BASE_URL=http://localhost:13581 node test-nav-priority-1102-e2e.js 2>&1 | tee -a e2e-output.txt
CHROMIUM_REQUIRE=1 BASE_URL=http://localhost:13581 node test-nav-more-floor-1139-e2e.js 2>&1 | tee -a e2e-output.txt
diff --git a/cmd/server/bounded_load_test.go b/cmd/server/bounded_load_test.go
index 3a0c5bef..db425dde 100644
--- a/cmd/server/bounded_load_test.go
+++ b/cmd/server/bounded_load_test.go
@@ -162,7 +162,7 @@ func createTestDBWithAgedPackets(t *testing.T, numRecent, numOld int) string {
}
execOrFail(`CREATE TABLE transmissions (id INTEGER PRIMARY KEY, raw_hex TEXT, hash TEXT, first_seen TEXT, route_type INTEGER, payload_type INTEGER, payload_version INTEGER, decoded_json TEXT)`)
execOrFail(`CREATE TABLE observations (id INTEGER PRIMARY KEY, transmission_id INTEGER, observer_id TEXT, observer_name TEXT, direction TEXT, snr REAL, rssi REAL, score INTEGER, path_json TEXT, timestamp TEXT, raw_hex TEXT)`)
- execOrFail(`CREATE TABLE observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT)`)
+ execOrFail(`CREATE TABLE observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT, iata TEXT)`)
execOrFail(`CREATE TABLE nodes (pubkey TEXT PRIMARY KEY, name TEXT, role TEXT, lat REAL, lon REAL, last_seen TEXT, first_seen TEXT, frequency REAL)`)
execOrFail(`CREATE TABLE schema_version (version INTEGER)`)
execOrFail(`INSERT INTO schema_version (version) VALUES (1)`)
@@ -321,7 +321,7 @@ func createTestDBAt(tb testing.TB, dbPath string, numTx int) {
direction TEXT, snr REAL, rssi REAL, score INTEGER,
path_json TEXT, timestamp TEXT, raw_hex TEXT
)`)
- execOrFail(`CREATE TABLE IF NOT EXISTS observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT)`)
+ execOrFail(`CREATE TABLE IF NOT EXISTS observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT, iata TEXT)`)
execOrFail(`CREATE TABLE IF NOT EXISTS nodes (
pubkey TEXT PRIMARY KEY, name TEXT, role TEXT, lat REAL, lon REAL,
last_seen TEXT, first_seen TEXT, frequency REAL
@@ -372,7 +372,7 @@ func createTestDBWithObs(tb testing.TB, dbPath string, numTx int) {
id INTEGER PRIMARY KEY, transmission_id INTEGER, observer_id TEXT, observer_name TEXT,
direction TEXT, snr REAL, rssi REAL, score INTEGER, path_json TEXT, timestamp TEXT, raw_hex TEXT
)`)
- execOrFail(`CREATE TABLE IF NOT EXISTS observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT)`)
+ execOrFail(`CREATE TABLE IF NOT EXISTS observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT, iata TEXT)`)
execOrFail(`CREATE TABLE IF NOT EXISTS nodes (
pubkey TEXT PRIMARY KEY, name TEXT, role TEXT, lat REAL, lon REAL,
last_seen TEXT, first_seen TEXT, frequency REAL
diff --git a/cmd/server/db.go b/cmd/server/db.go
index b080f79c..1be8ad6a 100644
--- a/cmd/server/db.go
+++ b/cmd/server/db.go
@@ -7,6 +7,7 @@ import (
"log"
"math"
"os"
+ "sort"
"strings"
"sync"
"time"
@@ -89,7 +90,7 @@ func (db *DB) transmissionBaseSQL() (selectCols, observerJoin string) {
if db.isV3 {
selectCols = `t.id, t.raw_hex, t.hash, t.first_seen, t.route_type, t.payload_type, t.decoded_json,
COALESCE((SELECT COUNT(*) FROM observations WHERE transmission_id = t.id), 0) AS observation_count,
- obs.id AS observer_id, obs.name AS observer_name,
+ obs.id AS observer_id, obs.name AS observer_name, COALESCE(obs.iata, '') AS observer_iata,
o.snr, o.rssi, o.path_json, o.direction`
observerJoin = `LEFT JOIN observations o ON o.id = (
SELECT id FROM observations WHERE transmission_id = t.id
@@ -99,12 +100,13 @@ func (db *DB) transmissionBaseSQL() (selectCols, observerJoin string) {
} else {
selectCols = `t.id, t.raw_hex, t.hash, t.first_seen, t.route_type, t.payload_type, t.decoded_json,
COALESCE((SELECT COUNT(*) FROM observations WHERE transmission_id = t.id), 0) AS observation_count,
- o.observer_id, o.observer_name,
+ o.observer_id, o.observer_name, COALESCE(obs2.iata, '') AS observer_iata,
o.snr, o.rssi, o.path_json, o.direction`
observerJoin = `LEFT JOIN observations o ON o.id = (
SELECT id FROM observations WHERE transmission_id = t.id
ORDER BY length(COALESCE(path_json,'')) DESC LIMIT 1
- )`
+ )
+ LEFT JOIN observers obs2 ON obs2.id = o.observer_id`
}
return
}
@@ -113,12 +115,12 @@ func (db *DB) transmissionBaseSQL() (selectCols, observerJoin string) {
// Returns a map matching the Node.js packet-store transmission shape.
func (db *DB) scanTransmissionRow(rows *sql.Rows) map[string]interface{} {
var id, observationCount int
- var rawHex, hash, firstSeen, decodedJSON, observerID, observerName, pathJSON, direction sql.NullString
+ var rawHex, hash, firstSeen, decodedJSON, observerID, observerName, observerIATA, pathJSON, direction sql.NullString
var routeType, payloadType sql.NullInt64
var snr, rssi sql.NullFloat64
if err := rows.Scan(&id, &rawHex, &hash, &firstSeen, &routeType, &payloadType, &decodedJSON,
- &observationCount, &observerID, &observerName, &snr, &rssi, &pathJSON, &direction); err != nil {
+ &observationCount, &observerID, &observerName, &observerIATA, &snr, &rssi, &pathJSON, &direction); err != nil {
return nil
}
@@ -134,6 +136,7 @@ func (db *DB) scanTransmissionRow(rows *sql.Rows) map[string]interface{} {
"observation_count": observationCount,
"observer_id": nullStr(observerID),
"observer_name": nullStr(observerName),
+ "observer_iata": nullStr(observerIATA),
"snr": nullFloat(snr),
"rssi": nullFloat(rssi),
"path_json": nullStr(pathJSON),
@@ -469,15 +472,20 @@ func (db *DB) QueryGroupedPackets(q PacketQuery) (*PacketResult, error) {
db.conn.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM transmissions t %s", w), args...).Scan(&total)
}
- // Build grouped query using transmissions table with correlated subqueries
+ // Build grouped query using transmissions table with correlated subqueries.
+ // #1189 R2: distinct_iatas is a NEW column — comma-separated DISTINCT IATA
+ // codes across all observers of the transmission, with empty/NULL IATAs
+ // excluded. Frontend needs this on the DEFAULT COLLAPSED VIEW (where
+ // p._children is empty), so we compute it server-side.
var querySQL string
if db.isV3 {
querySQL = fmt.Sprintf(`SELECT t.hash, t.first_seen, t.raw_hex, t.decoded_json, t.payload_type, t.route_type,
COALESCE((SELECT COUNT(*) FROM observations oi WHERE oi.transmission_id = t.id), 0) AS count,
COALESCE((SELECT COUNT(DISTINCT oi.observer_idx) FROM observations oi WHERE oi.transmission_id = t.id), 0) AS observer_count,
COALESCE((SELECT MAX(strftime('%%Y-%%m-%%dT%%H:%%M:%%fZ', oi.timestamp, 'unixepoch')) FROM observations oi WHERE oi.transmission_id = t.id), t.first_seen) AS latest,
- obs.id AS observer_id, obs.name AS observer_name,
- o.snr, o.rssi, o.path_json
+ obs.id AS observer_id, obs.name AS observer_name, COALESCE(obs.iata, '') AS observer_iata,
+ o.snr, o.rssi, o.path_json,
+ COALESCE((SELECT GROUP_CONCAT(DISTINCT obi.iata) FROM observations oi JOIN observers obi ON obi.rowid = oi.observer_idx WHERE oi.transmission_id = t.id AND obi.iata IS NOT NULL AND obi.iata != ''), '') AS distinct_iatas
FROM transmissions t
LEFT JOIN observations o ON o.id = (
SELECT id FROM observations WHERE transmission_id = t.id
@@ -490,13 +498,15 @@ func (db *DB) QueryGroupedPackets(q PacketQuery) (*PacketResult, error) {
COALESCE((SELECT COUNT(*) FROM observations oi WHERE oi.transmission_id = t.id), 0) AS count,
COALESCE((SELECT COUNT(DISTINCT oi.observer_id) FROM observations oi WHERE oi.transmission_id = t.id), 0) AS observer_count,
COALESCE((SELECT MAX(oi.timestamp) FROM observations oi WHERE oi.transmission_id = t.id), t.first_seen) AS latest,
- o.observer_id, o.observer_name,
- o.snr, o.rssi, o.path_json
+ o.observer_id, o.observer_name, COALESCE(obs2.iata, '') AS observer_iata,
+ o.snr, o.rssi, o.path_json,
+ COALESCE((SELECT GROUP_CONCAT(DISTINCT obi.iata) FROM observations oi JOIN observers obi ON obi.id = oi.observer_id WHERE oi.transmission_id = t.id AND obi.iata IS NOT NULL AND obi.iata != ''), '') AS distinct_iatas
FROM transmissions t
LEFT JOIN observations o ON o.id = (
SELECT id FROM observations WHERE transmission_id = t.id
ORDER BY length(COALESCE(path_json,'')) DESC LIMIT 1
)
+ LEFT JOIN observers obs2 ON obs2.id = o.observer_id
%s ORDER BY latest DESC LIMIT ? OFFSET ?`, w)
}
@@ -512,14 +522,14 @@ func (db *DB) QueryGroupedPackets(q PacketQuery) (*PacketResult, error) {
packets := make([]map[string]interface{}, 0)
for rows.Next() {
- var hash, firstSeen, rawHex, decodedJSON, latest, observerID, observerName, pathJSON sql.NullString
+ var hash, firstSeen, rawHex, decodedJSON, latest, observerID, observerName, observerIATA, pathJSON, distinctIatasCSV sql.NullString
var payloadType, routeType sql.NullInt64
var count, observerCount int
var snr, rssi sql.NullFloat64
if err := rows.Scan(&hash, &firstSeen, &rawHex, &decodedJSON, &payloadType, &routeType,
&count, &observerCount, &latest,
- &observerID, &observerName, &snr, &rssi, &pathJSON); err != nil {
+ &observerID, &observerName, &observerIATA, &snr, &rssi, &pathJSON, &distinctIatasCSV); err != nil {
continue
}
@@ -532,6 +542,8 @@ func (db *DB) QueryGroupedPackets(q PacketQuery) (*PacketResult, error) {
"latest": nullStr(latest),
"observer_id": nullStr(observerID),
"observer_name": nullStr(observerName),
+ "observer_iata": nullStr(observerIATA),
+ "distinct_iatas": parseDistinctIatasCSV(nullStr(distinctIatasCSV)),
"path_json": nullStr(pathJSON),
"payload_type": nullInt(payloadType),
"route_type": nullInt(routeType),
@@ -545,6 +557,29 @@ func (db *DB) QueryGroupedPackets(q PacketQuery) (*PacketResult, error) {
return &PacketResult{Packets: packets, Total: total}, nil
}
+// parseDistinctIatasCSV turns SQLite GROUP_CONCAT output ("SJC,SFO,OAK") into
+// a sorted, deduped []string. Returns an empty (non-nil) slice when the input
+// is empty/nil so JSON serialization stays consistent (`[]` not `null`).
+func parseDistinctIatasCSV(v interface{}) []string {
+ s, ok := v.(string)
+ if !ok || s == "" {
+ return []string{}
+ }
+ parts := strings.Split(s, ",")
+ seen := make(map[string]bool, len(parts))
+ out := make([]string, 0, len(parts))
+ for _, p := range parts {
+ code := strings.TrimSpace(p)
+ if code == "" || seen[code] {
+ continue
+ }
+ seen[code] = true
+ out = append(out, code)
+ }
+ sort.Strings(out)
+ return out
+}
+
func (db *DB) buildPacketWhere(q PacketQuery) ([]string, []interface{}) {
var where []string
var args []interface{}
@@ -971,16 +1006,17 @@ func (db *DB) getObservationsForTransmissions(txIDs []int) map[int][]map[string]
var querySQL string
if db.isV3 {
- querySQL = fmt.Sprintf(`SELECT o.transmission_id, o.id, obs.id AS observer_id, obs.name AS observer_name,
+ querySQL = fmt.Sprintf(`SELECT o.transmission_id, o.id, obs.id AS observer_id, obs.name AS observer_name, COALESCE(obs.iata, '') AS observer_iata,
o.direction, o.snr, o.rssi, o.path_json, strftime('%%Y-%%m-%%dT%%H:%%M:%%fZ', o.timestamp, 'unixepoch') AS obs_timestamp
FROM observations o
LEFT JOIN observers obs ON obs.rowid = o.observer_idx
WHERE o.transmission_id IN (%s)
ORDER BY o.timestamp DESC`, strings.Join(placeholders, ","))
} else {
- querySQL = fmt.Sprintf(`SELECT o.transmission_id, o.id, o.observer_id, o.observer_name,
+ querySQL = fmt.Sprintf(`SELECT o.transmission_id, o.id, o.observer_id, o.observer_name, COALESCE(obs.iata, '') AS observer_iata,
o.direction, o.snr, o.rssi, o.path_json, o.timestamp AS obs_timestamp
FROM observations o
+ LEFT JOIN observers obs ON obs.id = o.observer_id
WHERE o.transmission_id IN (%s)
ORDER BY o.timestamp DESC`, strings.Join(placeholders, ","))
}
@@ -993,10 +1029,10 @@ func (db *DB) getObservationsForTransmissions(txIDs []int) map[int][]map[string]
for rows.Next() {
var txID, obsID int
- var observerID, observerName, direction, pathJSON, obsTimestamp sql.NullString
+ var observerID, observerName, observerIATA, direction, pathJSON, obsTimestamp sql.NullString
var snr, rssi sql.NullFloat64
- if err := rows.Scan(&txID, &obsID, &observerID, &observerName, &direction,
+ if err := rows.Scan(&txID, &obsID, &observerID, &observerName, &observerIATA, &direction,
&snr, &rssi, &pathJSON, &obsTimestamp); err != nil {
continue
}
@@ -1011,6 +1047,7 @@ func (db *DB) getObservationsForTransmissions(txIDs []int) map[int][]map[string]
"transmission_id": txID,
"observer_id": nullStr(observerID),
"observer_name": nullStr(observerName),
+ "observer_iata": nullStr(observerIATA),
"snr": nullFloat(snr),
"rssi": nullFloat(rssi),
"path_json": nullStr(pathJSON),
diff --git a/cmd/server/hot_startup_test.go b/cmd/server/hot_startup_test.go
index da9285a0..a6455f42 100644
--- a/cmd/server/hot_startup_test.go
+++ b/cmd/server/hot_startup_test.go
@@ -37,7 +37,7 @@ func createTestDBMultiDay(t *testing.T, numDays, txPerDay int) string {
}
execOrFail(`CREATE TABLE transmissions (id INTEGER PRIMARY KEY, raw_hex TEXT, hash TEXT, first_seen TEXT, route_type INTEGER, payload_type INTEGER, payload_version INTEGER, decoded_json TEXT)`)
execOrFail(`CREATE TABLE observations (id INTEGER PRIMARY KEY, transmission_id INTEGER, observer_id TEXT, observer_name TEXT, direction TEXT, snr REAL, rssi REAL, score INTEGER, path_json TEXT, timestamp TEXT, raw_hex TEXT)`)
- execOrFail(`CREATE TABLE observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT)`)
+ execOrFail(`CREATE TABLE observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT, iata TEXT)`)
execOrFail(`CREATE TABLE nodes (pubkey TEXT PRIMARY KEY, name TEXT, role TEXT, lat REAL, lon REAL, last_seen TEXT, first_seen TEXT, frequency REAL)`)
execOrFail(`CREATE TABLE schema_version (version INTEGER)`)
execOrFail(`INSERT INTO schema_version (version) VALUES (1)`)
diff --git a/cmd/server/issue1189_distinct_iatas_test.go b/cmd/server/issue1189_distinct_iatas_test.go
new file mode 100644
index 00000000..3a096fe9
--- /dev/null
+++ b/cmd/server/issue1189_distinct_iatas_test.go
@@ -0,0 +1,108 @@
+package main
+
+import (
+ "sort"
+ "strings"
+ "testing"
+ "time"
+)
+
+// TestQueryGroupedPacketsReturnsDistinctIATAs (#1189 R2):
+// The default collapsed grouped view must already expose the DISTINCT set
+// of observer IATA codes for each transmission — frontend can't compute it
+// because p._children is empty until the user expands the row (or applies a
+// non-default sort). Previously the cell showed a single IATA + "+N" of
+// observer count, which conflates SAME-region redundancy with CROSS-region
+// reception. R1 added a frontend helper but it only fired on the expanded
+// view; this test gates the server-side fix.
+//
+// Seeds one transmission with observations from two IATAs (SJC, SFO) and
+// asserts the grouped row carries distinct_iatas containing both codes.
+func TestQueryGroupedPacketsReturnsDistinctIATAs(t *testing.T) {
+ db := setupTestDB(t)
+ defer db.Close()
+
+ now := time.Now().UTC()
+ recentEpoch := now.Add(-1 * time.Hour).Unix()
+
+ // Observers: SJC + SFO + a third with no IATA (should be excluded).
+ db.conn.Exec(`INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count)
+ VALUES ('obsA', 'A', 'SJC', ?, '2026-01-01T00:00:00Z', 10)`, now.Format(time.RFC3339))
+ db.conn.Exec(`INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count)
+ VALUES ('obsB', 'B', 'SFO', ?, '2026-01-01T00:00:00Z', 10)`, now.Format(time.RFC3339))
+ db.conn.Exec(`INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count)
+ VALUES ('obsC', 'C', '', ?, '2026-01-01T00:00:00Z', 10)`, now.Format(time.RFC3339))
+
+ // One transmission with 3 observations (SJC, SFO, no-IATA).
+ db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
+ VALUES ('AABB', 'deadbeefcafef00d', ?, 1, 4, '{}')`, now.Format(time.RFC3339))
+ // v3 schema: observer_idx = observers.rowid (auto-assigned 1,2,3 in insert order).
+ db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
+ VALUES (1, 1, 12.0, -80, '["aa"]', ?)`, recentEpoch)
+ db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
+ VALUES (1, 2, 8.0, -90, '["aa"]', ?)`, recentEpoch-30)
+ db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
+ VALUES (1, 3, 5.0, -95, '["aa"]', ?)`, recentEpoch-60)
+
+ result, err := db.QueryGroupedPackets(PacketQuery{Limit: 50})
+ if err != nil {
+ t.Fatalf("QueryGroupedPackets: %v", err)
+ }
+ if result.Total != 1 {
+ t.Fatalf("expected 1 grouped tx, got %d", result.Total)
+ }
+
+ row := result.Packets[0]
+ raw, ok := row["distinct_iatas"]
+ if !ok {
+ t.Fatalf("expected distinct_iatas key in grouped row, got: %#v", row)
+ }
+ iatas, ok := raw.([]string)
+ if !ok {
+ t.Fatalf("expected distinct_iatas to be []string, got %T (%v)", raw, raw)
+ }
+ sort.Strings(iatas)
+ want := []string{"SFO", "SJC"}
+ if strings.Join(iatas, ",") != strings.Join(want, ",") {
+ t.Fatalf("distinct_iatas = %v, want %v (must exclude empty-IATA observers, dedupe)", iatas, want)
+ }
+}
+
+// TestQueryGroupedPacketsDistinctIATAsEmptyWhenNoIATA (#1189 R2):
+// Group whose observers all have no IATA → distinct_iatas should be empty
+// (or absent / empty slice) — must NOT carry stale data from another group.
+func TestQueryGroupedPacketsDistinctIATAsEmptyWhenNoIATA(t *testing.T) {
+ db := setupTestDB(t)
+ defer db.Close()
+
+ now := time.Now().UTC()
+ recentEpoch := now.Add(-1 * time.Hour).Unix()
+
+ db.conn.Exec(`INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count)
+ VALUES ('obsX', 'X', '', ?, '2026-01-01T00:00:00Z', 1)`, now.Format(time.RFC3339))
+ db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
+ VALUES ('AA', '1111222233334444', ?, 1, 4, '{}')`, now.Format(time.RFC3339))
+ db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
+ VALUES (1, 1, 10.0, -85, '[]', ?)`, recentEpoch)
+
+ result, err := db.QueryGroupedPackets(PacketQuery{Limit: 50})
+ if err != nil {
+ t.Fatalf("QueryGroupedPackets: %v", err)
+ }
+ if result.Total != 1 {
+ t.Fatalf("expected 1 grouped tx, got %d", result.Total)
+ }
+ row := result.Packets[0]
+ raw, ok := row["distinct_iatas"]
+ if !ok {
+ // absent key acceptable — treat as empty
+ return
+ }
+ iatas, ok := raw.([]string)
+ if !ok {
+ t.Fatalf("distinct_iatas should be []string, got %T", raw)
+ }
+ if len(iatas) != 0 {
+ t.Fatalf("distinct_iatas should be empty for no-IATA group, got %v", iatas)
+ }
+}
diff --git a/cmd/server/main.go b/cmd/server/main.go
index e178afa2..82dd2e99 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -216,6 +216,15 @@ func main() {
log.Printf("[store] warning: could not add observers.last_packet_at column: %v", err)
}
+ // Ensure observers.iata column exists (#1188 read paths COALESCE(obs.iata, '')
+ // in Store.Load() / IngestNewFromDB / IngestNewObservations; ingestor migration
+ // adds it but server may run against DBs ingestor never touched (e2e fixture)
+ // OR pre-iata operator DBs upgraded to this build — without this migration
+ // the first SELECT crashes with "no such column: obs.iata" (#1189 R1).
+ if err := ensureObserverIATAColumn(dbPath); err != nil {
+ log.Printf("[store] warning: could not add observers.iata column: %v", err)
+ }
+
// Ensure nodes.foreign_advert column exists (#730 reads it on every /api/nodes
// scan; ingestor migration foreign_advert_v1 adds it but server may run against
// DBs ingestor never touched, e.g. e2e fixture).
diff --git a/cmd/server/neighbor_api_test.go b/cmd/server/neighbor_api_test.go
index 1510c515..40ce23c1 100644
--- a/cmd/server/neighbor_api_test.go
+++ b/cmd/server/neighbor_api_test.go
@@ -476,10 +476,10 @@ func TestBuildNodeInfoMap_ObserverEnrichment(t *testing.T) {
// Create tables
for _, stmt := range []string{
"CREATE TABLE nodes (public_key TEXT, name TEXT, role TEXT, lat REAL, lon REAL)",
- "CREATE TABLE observers (id TEXT, name TEXT)",
+ "CREATE TABLE observers (id TEXT, name TEXT, iata TEXT)",
"INSERT INTO nodes VALUES ('AAAA1111', 'Repeater-1', 'repeater', 0, 0)",
- "INSERT INTO observers VALUES ('BBBB2222', 'Observer-Alpha')",
- "INSERT INTO observers VALUES ('AAAA1111', 'Obs-also-repeater')",
+ "INSERT INTO observers VALUES ('BBBB2222', 'Observer-Alpha', '')",
+ "INSERT INTO observers VALUES ('AAAA1111', 'Obs-also-repeater', '')",
} {
if _, err := conn.Exec(stmt); err != nil {
t.Fatalf("exec %q: %v", stmt, err)
diff --git a/cmd/server/neighbor_persist.go b/cmd/server/neighbor_persist.go
index d9d271b5..c1cbf373 100644
--- a/cmd/server/neighbor_persist.go
+++ b/cmd/server/neighbor_persist.go
@@ -353,6 +353,44 @@ func ensureLastPacketAtColumn(dbPath string) error {
return nil
}
+// ensureObserverIATAColumn adds the iata column to observers if missing.
+// The column was originally added by ingestor migration (cmd/ingestor/db.go) to
+// label each observer with a 3-letter regional IATA code. When the server starts
+// against a DB that was never touched by the ingestor (e.g. the e2e fixture,
+// or a pre-iata operator DB upgraded to this build), every SELECT that joins
+// COALESCE(obs.iata, '') panics with "no such column: obs.iata" — crashing
+// Store.Load() / IngestNewFromDB / IngestNewObservations on startup (#1189 R1).
+func ensureObserverIATAColumn(dbPath string) error {
+ rw, err := cachedRW(dbPath)
+ if err != nil {
+ return err
+ }
+
+ rows, err := rw.Query("PRAGMA table_info(observers)")
+ if err != nil {
+ return err
+ }
+ defer rows.Close()
+
+ for rows.Next() {
+ var cid int
+ var colName string
+ var colType sql.NullString
+ var notNull, pk int
+ var dflt sql.NullString
+ if rows.Scan(&cid, &colName, &colType, ¬Null, &dflt, &pk) == nil && colName == "iata" {
+ return nil // already exists
+ }
+ }
+
+ _, err = rw.Exec("ALTER TABLE observers ADD COLUMN iata TEXT")
+ if err != nil {
+ return fmt.Errorf("add iata column: %w", err)
+ }
+ log.Println("[store] Added iata column to observers")
+ return nil
+}
+
// ensureForeignAdvertColumn adds the foreign_advert column to nodes/inactive_nodes
// if missing (#730). The column is added by the ingestor migration foreign_advert_v1
// — but the server may run against a DB the ingestor has never touched (e2e fixture,
diff --git a/cmd/server/neighbor_persist_test.go b/cmd/server/neighbor_persist_test.go
index 40594e4e..0e843e56 100644
--- a/cmd/server/neighbor_persist_test.go
+++ b/cmd/server/neighbor_persist_test.go
@@ -597,3 +597,91 @@ func TestEnsureLastPacketAtColumn(t *testing.T) {
t.Fatalf("idempotent call failed: %v", err)
}
}
+
+// TestEnsureObserverIATAColumn validates the #1189 R1 fix: an operator with
+// a pre-iata observers schema (no `iata TEXT` column) must not panic on
+// startup. The migration must idempotently ALTER TABLE ADD COLUMN, and
+// queries that COALESCE(obs.iata, '') must succeed after the migration runs.
+func TestEnsureObserverIATAColumn(t *testing.T) {
+ dir := t.TempDir()
+ dbPath := dir + "/test.db"
+
+ // Pre-iata schema (matches what shipped before #1188 landed).
+ db, err := sql.Open("sqlite", dbPath)
+ if err != nil {
+ t.Fatal(err)
+ }
+ _, err = db.Exec(`CREATE TABLE observers (
+ id TEXT PRIMARY KEY,
+ name TEXT,
+ last_seen TEXT,
+ first_seen TEXT,
+ packet_count INTEGER DEFAULT 0,
+ inactive INTEGER DEFAULT 0,
+ last_packet_at TEXT DEFAULT NULL
+ )`)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if _, err := db.Exec(`INSERT INTO observers (id, name) VALUES ('obs1', 'Observer One')`); err != nil {
+ t.Fatal(err)
+ }
+ db.Close()
+
+ // Prove the bug exists pre-migration: a SELECT that COALESCEs obs.iata must fail.
+ db0, err := sql.Open("sqlite", dbPath)
+ if err != nil {
+ t.Fatal(err)
+ }
+ var probe string
+ preErr := db0.QueryRow(`SELECT COALESCE(iata, '') FROM observers WHERE id='obs1'`).Scan(&probe)
+ db0.Close()
+ if preErr == nil {
+ t.Fatal("expected SELECT on missing iata column to fail BEFORE migration; got success")
+ }
+
+ // First call: should add the column.
+ if err := ensureObserverIATAColumn(dbPath); err != nil {
+ t.Fatalf("first call failed: %v", err)
+ }
+
+ // Verify column exists.
+ db2, err := sql.Open("sqlite", dbPath)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer db2.Close()
+
+ var found bool
+ rows, err := db2.Query("PRAGMA table_info(observers)")
+ if err != nil {
+ t.Fatal(err)
+ }
+ for rows.Next() {
+ var cid int
+ var colName string
+ var colType sql.NullString
+ var notNull, pk int
+ var dflt sql.NullString
+ if rows.Scan(&cid, &colName, &colType, ¬Null, &dflt, &pk) == nil && colName == "iata" {
+ found = true
+ }
+ }
+ rows.Close()
+ if !found {
+ t.Fatal("iata column not found after migration")
+ }
+
+ // The query that previously panicked must now succeed (empty string default).
+ if err := db2.QueryRow(`SELECT COALESCE(iata, '') FROM observers WHERE id='obs1'`).Scan(&probe); err != nil {
+ t.Fatalf("post-migration SELECT failed: %v", err)
+ }
+ if probe != "" {
+ t.Fatalf("expected empty iata for legacy row, got %q", probe)
+ }
+
+ // Idempotency: second call must succeed.
+ if err := ensureObserverIATAColumn(dbPath); err != nil {
+ t.Fatalf("idempotent call failed: %v", err)
+ }
+}
diff --git a/cmd/server/packets_observer_iata_test.go b/cmd/server/packets_observer_iata_test.go
new file mode 100644
index 00000000..4fe4385f
--- /dev/null
+++ b/cmd/server/packets_observer_iata_test.go
@@ -0,0 +1,121 @@
+// Test (#1188): /api/packets response must include observer_iata per packet
+// so the frontend can render the IATA inline without per-row observer lookups.
+package main
+
+import (
+ "encoding/json"
+ "net/http/httptest"
+ "testing"
+)
+
+// TestPacketsEndpointIncludesObserverIATA asserts the ungrouped packets endpoint
+// surfaces the joined observer's IATA on each packet row.
+func TestPacketsEndpointIncludesObserverIATA(t *testing.T) {
+ _, router := setupTestServer(t)
+ req := httptest.NewRequest("GET", "/api/packets?limit=10", nil)
+ w := httptest.NewRecorder()
+ router.ServeHTTP(w, req)
+
+ if w.Code != 200 {
+ t.Fatalf("expected 200, got %d", w.Code)
+ }
+ var body map[string]interface{}
+ if err := json.Unmarshal(w.Body.Bytes(), &body); err != nil {
+ t.Fatalf("invalid JSON: %v", err)
+ }
+ packets, ok := body["packets"].([]interface{})
+ if !ok || len(packets) == 0 {
+ t.Fatal("expected non-empty packets array")
+ }
+
+ // Seeded observers: obs1 → SJC, obs2 → SFO. At least one packet row
+ // must carry a non-empty observer_iata string.
+ gotIATA := false
+ for _, p := range packets {
+ m, _ := p.(map[string]interface{})
+ if m == nil {
+ continue
+ }
+ if _, present := m["observer_iata"]; !present {
+ t.Fatalf("packet missing observer_iata field; got keys: %v", keysOfMap(m))
+ }
+ if s, _ := m["observer_iata"].(string); s != "" {
+ gotIATA = true
+ }
+ }
+ if !gotIATA {
+ t.Fatalf("expected at least one packet with non-empty observer_iata (seed has SJC/SFO)")
+ }
+}
+
+// TestPacketsGroupedIncludesObserverIATA asserts the grouped (groupByHash)
+// view also surfaces observer_iata for the header row.
+func TestPacketsGroupedIncludesObserverIATA(t *testing.T) {
+ _, router := setupTestServer(t)
+ req := httptest.NewRequest("GET", "/api/packets?groupByHash=true&limit=10", nil)
+ w := httptest.NewRecorder()
+ router.ServeHTTP(w, req)
+
+ if w.Code != 200 {
+ t.Fatalf("expected 200, got %d", w.Code)
+ }
+ var body map[string]interface{}
+ json.Unmarshal(w.Body.Bytes(), &body)
+ packets, _ := body["packets"].([]interface{})
+ if len(packets) == 0 {
+ t.Fatal("expected non-empty grouped packets")
+ }
+ gotIATA := false
+ for _, p := range packets {
+ m, _ := p.(map[string]interface{})
+ if _, present := m["observer_iata"]; !present {
+ t.Fatalf("grouped packet missing observer_iata field; got keys: %v", keysOfMap(m))
+ }
+ if s, _ := m["observer_iata"].(string); s != "" {
+ gotIATA = true
+ }
+ }
+ if !gotIATA {
+ t.Fatalf("expected at least one grouped packet with non-empty observer_iata")
+ }
+}
+
+// TestPacketDetailObservationsIncludeIATA asserts /api/packets/{id} returns
+// per-observation observer_iata so the detail pane can render it.
+func TestPacketDetailObservationsIncludeIATA(t *testing.T) {
+ _, router := setupTestServer(t)
+ // transmission_id 1 has two observations (obs1 SJC, obs2 SFO) from seedTestData
+ req := httptest.NewRequest("GET", "/api/packets/1", nil)
+ w := httptest.NewRecorder()
+ router.ServeHTTP(w, req)
+ if w.Code != 200 {
+ t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
+ }
+ var body map[string]interface{}
+ json.Unmarshal(w.Body.Bytes(), &body)
+ obs, _ := body["observations"].([]interface{})
+ if len(obs) == 0 {
+ t.Fatalf("expected observations in detail response; body: %s", w.Body.String())
+ }
+ gotIATA := false
+ for _, o := range obs {
+ m, _ := o.(map[string]interface{})
+ if _, present := m["observer_iata"]; !present {
+ t.Fatalf("observation missing observer_iata field; got keys: %v", keysOfMap(m))
+ }
+ if s, _ := m["observer_iata"].(string); s != "" {
+ gotIATA = true
+ }
+ }
+ if !gotIATA {
+ t.Fatalf("expected at least one observation with non-empty observer_iata")
+ }
+}
+
+func keysOfMap(m map[string]interface{}) []string {
+ out := make([]string, 0, len(m))
+ for k := range m {
+ out = append(out, k)
+ }
+ return out
+}
diff --git a/cmd/server/routes.go b/cmd/server/routes.go
index dbc16dd4..b4d47442 100644
--- a/cmd/server/routes.go
+++ b/cmd/server/routes.go
@@ -2490,6 +2490,7 @@ func mapSliceToTransmissions(maps []map[string]interface{}) []TransmissionResp {
}
tx.ObserverID = m["observer_id"]
tx.ObserverName = m["observer_name"]
+ tx.ObserverIATA = m["observer_iata"]
tx.SNR = m["snr"]
tx.RSSI = m["rssi"]
tx.PathJSON = m["path_json"]
@@ -2512,6 +2513,7 @@ func mapSliceToObservations(maps []map[string]interface{}) []ObservationResp {
obs.Hash = m["hash"]
obs.ObserverID = m["observer_id"]
obs.ObserverName = m["observer_name"]
+ obs.ObserverIATA = m["observer_iata"]
obs.SNR = m["snr"]
obs.RSSI = m["rssi"]
obs.PathJSON = m["path_json"]
diff --git a/cmd/server/store.go b/cmd/server/store.go
index 0da491b7..773aaebc 100644
--- a/cmd/server/store.go
+++ b/cmd/server/store.go
@@ -37,6 +37,7 @@ type StoreTx struct {
// Display fields from longest-path observation
ObserverID string
ObserverName string
+ ObserverIATA string
SNR *float64
RSSI *float64
PathJSON string
@@ -59,6 +60,7 @@ type StoreObs struct {
TransmissionID int
ObserverID string
ObserverName string
+ ObserverIATA string
Direction string
SNR *float64
RSSI *float64
@@ -555,7 +557,7 @@ func (s *PacketStore) Load() error {
if s.db.isV3 {
loadSQL = `SELECT t.id, t.raw_hex, t.hash, t.first_seen, t.route_type,
t.payload_type, t.payload_version, t.decoded_json,
- o.id, obs.id, obs.name, o.direction,
+ o.id, obs.id, obs.name, COALESCE(obs.iata, ''), o.direction,
o.snr, o.rssi, o.score, o.path_json, strftime('%Y-%m-%dT%H:%M:%fZ', o.timestamp, 'unixepoch')` + obsRawHexCol + rpCol + `
FROM transmissions t
LEFT JOIN observations o ON o.transmission_id = t.id
@@ -564,10 +566,11 @@ func (s *PacketStore) Load() error {
} else {
loadSQL = `SELECT t.id, t.raw_hex, t.hash, t.first_seen, t.route_type,
t.payload_type, t.payload_version, t.decoded_json,
- o.id, o.observer_id, o.observer_name, o.direction,
+ o.id, o.observer_id, o.observer_name, COALESCE(obs.iata, ''), o.direction,
o.snr, o.rssi, o.score, o.path_json, o.timestamp` + obsRawHexCol + rpCol + `
FROM transmissions t
- LEFT JOIN observations o ON o.transmission_id = t.id` + filterClause + `
+ LEFT JOIN observations o ON o.transmission_id = t.id
+ LEFT JOIN observers obs ON obs.id = o.observer_id` + filterClause + `
ORDER BY t.first_seen ASC, o.timestamp DESC`
}
@@ -584,7 +587,7 @@ func (s *PacketStore) Load() error {
var rawHex, hash, firstSeen, decodedJSON sql.NullString
var routeType, payloadType, payloadVersion sql.NullInt64
var obsID sql.NullInt64
- var observerID, observerName, direction, pathJSON, obsTimestamp sql.NullString
+ var observerID, observerName, observerIATA, direction, pathJSON, obsTimestamp sql.NullString
var snr, rssi sql.NullFloat64
var score sql.NullInt64
var obsRawHex sql.NullString
@@ -592,7 +595,7 @@ func (s *PacketStore) Load() error {
scanArgs := []interface{}{&txID, &rawHex, &hash, &firstSeen, &routeType, &payloadType,
&payloadVersion, &decodedJSON,
- &obsID, &observerID, &observerName, &direction,
+ &obsID, &observerID, &observerName, &observerIATA, &direction,
&snr, &rssi, &score, &pathJSON, &obsTimestamp}
if s.db.hasObsRawHex {
scanArgs = append(scanArgs, &obsRawHex)
@@ -651,6 +654,7 @@ func (s *PacketStore) Load() error {
TransmissionID: txID,
ObserverID: obsIDStr,
ObserverName: nullStrVal(observerName),
+ ObserverIATA: nullStrVal(observerIATA),
Direction: nullStrVal(direction),
SNR: nullFloatPtr(snr),
RSSI: nullFloatPtr(rssi),
@@ -1142,6 +1146,7 @@ func pickBestObservation(tx *StoreTx) {
}
tx.ObserverID = best.ObserverID
tx.ObserverName = best.ObserverName
+ tx.ObserverIATA = best.ObserverIATA
tx.SNR = best.SNR
tx.RSSI = best.RSSI
tx.PathJSON = best.PathJSON
@@ -1428,6 +1433,11 @@ func groupedTxsToPage(txs []*StoreTx, total, offset, limit int) *PacketResult {
packets := make([]map[string]interface{}, len(page))
for i, tx := range page {
+ // #1189 R2: compute distinct IATA set across all observations.
+ // Frontend uses this in the default collapsed view to show CROSS-region
+ // reception at a glance — see groupedObserverIataBadgesHtml in
+ // public/packets.js.
+ distinctIatas := storeTxDistinctIatas(tx)
m := map[string]interface{}{
"hash": strOrNil(tx.Hash),
"first_seen": strOrNil(tx.FirstSeen),
@@ -1437,6 +1447,8 @@ func groupedTxsToPage(txs []*StoreTx, total, offset, limit int) *PacketResult {
"latest": strOrNil(tx.LatestSeen),
"observer_id": strOrNil(tx.ObserverID),
"observer_name": strOrNil(tx.ObserverName),
+ "observer_iata": strOrNil(tx.ObserverIATA),
+ "distinct_iatas": distinctIatas,
"path_json": strOrNil(tx.PathJSON),
"payload_type": intPtrOrNil(tx.PayloadType),
"route_type": intPtrOrNil(tx.RouteType),
@@ -1452,6 +1464,34 @@ func groupedTxsToPage(txs []*StoreTx, total, offset, limit int) *PacketResult {
return &PacketResult{Packets: packets, Total: total}
}
+// storeTxDistinctIatas (#1189 R2) returns a sorted, deduped list of observer
+// IATA codes for a StoreTx, excluding empty values. Returns an empty
+// (non-nil) []string when the tx has no IATA'd observations so JSON
+// serialization stays consistent across the in-memory store and SQL
+// fallback paths (db.go's parseDistinctIatasCSV does the same).
+func storeTxDistinctIatas(tx *StoreTx) []string {
+ if tx == nil {
+ return []string{}
+ }
+ seen := make(map[string]bool)
+ // Include the header observer's IATA (some hot-path StoreTx records the
+ // chosen observer fields directly without re-populating Observations).
+ if tx.ObserverIATA != "" {
+ seen[tx.ObserverIATA] = true
+ }
+ for _, o := range tx.Observations {
+ if o != nil && o.ObserverIATA != "" {
+ seen[o.ObserverIATA] = true
+ }
+ }
+ out := make([]string, 0, len(seen))
+ for k := range seen {
+ out = append(out, k)
+ }
+ sort.Strings(out)
+ return out
+}
+
// GetStoreStats returns aggregate counts (packet data from memory, node/observer from DB).
func (s *PacketStore) GetStoreStats() (*Stats, error) {
s.mu.RLock()
@@ -1906,7 +1946,7 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
if s.db.isV3 {
querySQL = `SELECT t.id, t.raw_hex, t.hash, t.first_seen, t.route_type,
t.payload_type, t.payload_version, t.decoded_json,
- o.id, obs.id, obs.name, o.direction,
+ o.id, obs.id, obs.name, COALESCE(obs.iata, ''), o.direction,
o.snr, o.rssi, o.score, o.path_json, strftime('%Y-%m-%dT%H:%M:%fZ', o.timestamp, 'unixepoch')` + obsRHCol + `
FROM transmissions t
LEFT JOIN observations o ON o.transmission_id = t.id
@@ -1916,10 +1956,11 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
} else {
querySQL = `SELECT t.id, t.raw_hex, t.hash, t.first_seen, t.route_type,
t.payload_type, t.payload_version, t.decoded_json,
- o.id, o.observer_id, o.observer_name, o.direction,
+ o.id, o.observer_id, o.observer_name, COALESCE(obs.iata, ''), o.direction,
o.snr, o.rssi, o.score, o.path_json, o.timestamp` + obsRHCol + `
FROM transmissions t
LEFT JOIN observations o ON o.transmission_id = t.id
+ LEFT JOIN observers obs ON obs.id = o.observer_id
WHERE t.id > ?
ORDER BY t.id ASC, o.timestamp DESC`
}
@@ -1933,14 +1974,14 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
// Scan into temp structures
type tempRow struct {
- txID int
- rawHex, hash, firstSeen, decodedJSON string
- routeType, payloadType *int
- obsID *int
- observerID, observerName, direction, pathJSON, obsTS string
- obsRawHex string
- snr, rssi *float64
- score *int
+ txID int
+ rawHex, hash, firstSeen, decodedJSON string
+ routeType, payloadType *int
+ obsID *int
+ observerID, observerName, observerIATA, direction, pathJSON, obsTS string
+ obsRawHex string
+ snr, rssi *float64
+ score *int
}
var tempRows []tempRow
@@ -1952,14 +1993,14 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
var rawHex, hash, firstSeen, decodedJSON sql.NullString
var routeType, payloadType, payloadVersion sql.NullInt64
var obsIDVal sql.NullInt64
- var observerID, observerName, direction, pathJSON, obsTimestamp sql.NullString
+ var observerID, observerName, observerIATA, direction, pathJSON, obsTimestamp sql.NullString
var snrVal, rssiVal sql.NullFloat64
var scoreVal sql.NullInt64
var obsRawHex sql.NullString
scanArgs2 := []interface{}{&txID, &rawHex, &hash, &firstSeen, &routeType, &payloadType,
&payloadVersion, &decodedJSON,
- &obsIDVal, &observerID, &observerName, &direction,
+ &obsIDVal, &observerID, &observerName, &observerIATA, &direction,
&snrVal, &rssiVal, &scoreVal, &pathJSON, &obsTimestamp}
if s.db.hasObsRawHex {
scanArgs2 = append(scanArgs2, &obsRawHex)
@@ -1986,6 +2027,7 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
payloadType: nullIntPtr(payloadType),
observerID: nullStrVal(observerID),
observerName: nullStrVal(observerName),
+ observerIATA: nullStrVal(observerIATA),
direction: nullStrVal(direction),
pathJSON: nullStrVal(pathJSON),
obsTS: nullStrVal(obsTimestamp),
@@ -2088,6 +2130,7 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
TransmissionID: r.txID,
ObserverID: r.observerID,
ObserverName: r.observerName,
+ ObserverIATA: r.observerIATA,
Direction: r.direction,
SNR: r.snr,
RSSI: r.rssi,
@@ -2341,7 +2384,7 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]
obsRHCol2 = ", o.raw_hex"
}
if s.db.isV3 {
- querySQL = `SELECT o.id, o.transmission_id, obs.id, obs.name, o.direction,
+ querySQL = `SELECT o.id, o.transmission_id, obs.id, obs.name, COALESCE(obs.iata, ''), o.direction,
o.snr, o.rssi, o.score, o.path_json, strftime('%Y-%m-%dT%H:%M:%fZ', o.timestamp, 'unixepoch')` + obsRHCol2 + `
FROM observations o
LEFT JOIN observers obs ON obs.rowid = o.observer_idx
@@ -2349,9 +2392,10 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]
ORDER BY o.id ASC
LIMIT ?`
} else {
- querySQL = `SELECT o.id, o.transmission_id, o.observer_id, o.observer_name, o.direction,
+ querySQL = `SELECT o.id, o.transmission_id, o.observer_id, o.observer_name, COALESCE(obs.iata, ''), o.direction,
o.snr, o.rssi, o.score, o.path_json, o.timestamp` + obsRHCol2 + `
FROM observations o
+ LEFT JOIN observers obs ON obs.id = o.observer_id
WHERE o.id > ?
ORDER BY o.id ASC
LIMIT ?`
@@ -2369,6 +2413,7 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]
txID int
observerID string
observerName string
+ observerIATA string
direction string
snr, rssi *float64
score *int
@@ -2380,12 +2425,12 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]
var obsRows []obsRow
for rows.Next() {
var oid, txID int
- var observerID, observerName, direction, pathJSON, ts sql.NullString
+ var observerID, observerName, observerIATA, direction, pathJSON, ts sql.NullString
var snr, rssi sql.NullFloat64
var score sql.NullInt64
var obsRawHex sql.NullString
- scanArgs3 := []interface{}{&oid, &txID, &observerID, &observerName, &direction,
+ scanArgs3 := []interface{}{&oid, &txID, &observerID, &observerName, &observerIATA, &direction,
&snr, &rssi, &score, &pathJSON, &ts}
if s.db.hasObsRawHex {
scanArgs3 = append(scanArgs3, &obsRawHex)
@@ -2399,6 +2444,7 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]
txID: txID,
observerID: nullStrVal(observerID),
observerName: nullStrVal(observerName),
+ observerIATA: nullStrVal(observerIATA),
direction: nullStrVal(direction),
snr: nullFloatPtr(snr),
rssi: nullFloatPtr(rssi),
@@ -2455,6 +2501,7 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]
TransmissionID: r.txID,
ObserverID: r.observerID,
ObserverName: r.observerName,
+ ObserverIATA: r.observerIATA,
Direction: r.direction,
SNR: r.snr,
RSSI: r.rssi,
@@ -3079,6 +3126,7 @@ func (s *PacketStore) enrichObs(obs *StoreObs) map[string]interface{} {
"timestamp": strOrNil(obs.Timestamp),
"observer_id": strOrNil(obs.ObserverID),
"observer_name": strOrNil(obs.ObserverName),
+ "observer_iata": strOrNil(obs.ObserverIATA),
"direction": strOrNil(obs.Direction),
"snr": floatPtrOrNil(obs.SNR),
"rssi": floatPtrOrNil(obs.RSSI),
@@ -3123,6 +3171,7 @@ func txToMap(tx *StoreTx, includeObservations ...bool) map[string]interface{} {
"observation_count": tx.ObservationCount,
"observer_id": strOrNil(tx.ObserverID),
"observer_name": strOrNil(tx.ObserverName),
+ "observer_iata": strOrNil(tx.ObserverIATA),
"snr": floatPtrOrNil(tx.SNR),
"rssi": floatPtrOrNil(tx.RSSI),
"path_json": strOrNil(tx.PathJSON),
@@ -3142,6 +3191,7 @@ func txToMap(tx *StoreTx, includeObservations ...bool) map[string]interface{} {
"id": o.ID,
"observer_id": strOrNil(o.ObserverID),
"observer_name": strOrNil(o.ObserverName),
+ "observer_iata": strOrNil(o.ObserverIATA),
"snr": floatPtrOrNil(o.SNR),
"rssi": floatPtrOrNil(o.RSSI),
"path_json": strOrNil(o.PathJSON),
diff --git a/cmd/server/topology_dedup_test.go b/cmd/server/topology_dedup_test.go
index dacf04ee..73a7efab 100644
--- a/cmd/server/topology_dedup_test.go
+++ b/cmd/server/topology_dedup_test.go
@@ -34,7 +34,7 @@ func TestTopologyDedup_RepeatersMergeByPubkey(t *testing.T) {
id INTEGER PRIMARY KEY, transmission_id INTEGER, observer_id TEXT, observer_name TEXT,
direction TEXT, snr REAL, rssi REAL, score INTEGER, path_json TEXT, timestamp TEXT, raw_hex TEXT
)`)
- exec(`CREATE TABLE observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT)`)
+ exec(`CREATE TABLE observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT, iata TEXT)`)
exec(`CREATE TABLE nodes (
public_key TEXT PRIMARY KEY, name TEXT, role TEXT, lat REAL, lon REAL,
last_seen TEXT, frequency REAL
@@ -158,7 +158,7 @@ func TestTopologyDedup_AmbiguousPrefixNotMerged(t *testing.T) {
id INTEGER PRIMARY KEY, transmission_id INTEGER, observer_id TEXT, observer_name TEXT,
direction TEXT, snr REAL, rssi REAL, score INTEGER, path_json TEXT, timestamp TEXT, raw_hex TEXT
)`)
- exec(`CREATE TABLE observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT)`)
+ exec(`CREATE TABLE observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT, iata TEXT)`)
exec(`CREATE TABLE nodes (
public_key TEXT PRIMARY KEY, name TEXT, role TEXT, lat REAL, lon REAL,
last_seen TEXT, frequency REAL
@@ -264,7 +264,7 @@ func TestTopologyDedup_PairsMergeByPubkey(t *testing.T) {
id INTEGER PRIMARY KEY, transmission_id INTEGER, observer_id TEXT, observer_name TEXT,
direction TEXT, snr REAL, rssi REAL, score INTEGER, path_json TEXT, timestamp TEXT, raw_hex TEXT
)`)
- exec(`CREATE TABLE observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT)`)
+ exec(`CREATE TABLE observers (rowid INTEGER PRIMARY KEY, id TEXT, name TEXT, iata TEXT)`)
exec(`CREATE TABLE nodes (
public_key TEXT PRIMARY KEY, name TEXT, role TEXT, lat REAL, lon REAL,
last_seen TEXT, frequency REAL
diff --git a/cmd/server/types.go b/cmd/server/types.go
index 78560180..7797658e 100644
--- a/cmd/server/types.go
+++ b/cmd/server/types.go
@@ -264,6 +264,7 @@ type TransmissionResp struct {
ObservationCount int `json:"observation_count"`
ObserverID interface{} `json:"observer_id"`
ObserverName interface{} `json:"observer_name"`
+ ObserverIATA interface{} `json:"observer_iata"`
SNR interface{} `json:"snr"`
RSSI interface{} `json:"rssi"`
PathJSON interface{} `json:"path_json"`
@@ -278,6 +279,7 @@ type ObservationResp struct {
Hash interface{} `json:"hash,omitempty"`
ObserverID interface{} `json:"observer_id"`
ObserverName interface{} `json:"observer_name"`
+ ObserverIATA interface{} `json:"observer_iata"`
SNR interface{} `json:"snr"`
RSSI interface{} `json:"rssi"`
PathJSON interface{} `json:"path_json"`
@@ -295,6 +297,7 @@ type GroupedPacketResp struct {
Latest string `json:"latest"`
ObserverID interface{} `json:"observer_id"`
ObserverName interface{} `json:"observer_name"`
+ ObserverIATA interface{} `json:"observer_iata"`
PathJSON interface{} `json:"path_json"`
PayloadType int `json:"payload_type"`
RouteType int `json:"route_type"`
diff --git a/public/live.js b/public/live.js
index 83ddafd3..a29a5271 100644
--- a/public/live.js
+++ b/public/live.js
@@ -53,6 +53,24 @@
}
function setObserverIataMap(m) { observerIataMap = m || {}; }
+ // #1189 R2 mesh-operator fix: live feed must show the observer's IATA pill
+ // alongside the existing 👁 N badge so operators on /live can tell SAME-
+ // region from CROSS-region reception at a glance (same affordance as the
+ // /packets table). Mirrors `obsIataBadge` in public/packets.js — kept as a
+ // local helper for now (live.js and packets.js are separate IIFEs with no
+ // shared module). TODO: extract `obsIataBadge` into shared packet-helpers.js
+ // and have both surfaces import it.
+ function obsIataBadgeHtml(pkt) {
+ if (!pkt) return '';
+ var iata = pkt.observer_iata;
+ if (!iata && pkt.observer_id) iata = observerIataMap && observerIataMap[pkt.observer_id];
+ if (!iata) return '';
+ var esc = (typeof escapeHtml === 'function')
+ ? escapeHtml(iata)
+ : String(iata).replace(/&/g,'&').replace(//g,'>').replace(/"/g,'"').replace(/'/g,''');
+ return '' + esc + '';
+ }
+
/**
* Build observer_id → IATA map from the /api/observers response.
* The endpoint returns `{ observers: [...], server_time: "..." }`
@@ -2183,6 +2201,7 @@
const preview = text ? ' ' + (text.length > 35 ? text.slice(0, 35) + '…' : text) : '';
const hopStr = longestHops.length ? `${longestHops.length}⇢` : '';
const obsBadge = group.count > 1 ? `👁 ${group.count}` : '';
+ const iataBadge = obsIataBadgeHtml(pkt);
var _ccPayload = (pkt.decoded || {}).payload || {};
var _ccChan1 = (typeName === 'GRP_TXT' || typeName === 'CHAN') ? (_ccPayload.channel || null) : null;
@@ -2197,7 +2216,7 @@
item.innerHTML = `
${typeName}
- ${dotHtml1}${transportBadge(pkt.route_type)}${hopStr}${obsBadge}
+ ${dotHtml1}${transportBadge(pkt.route_type)}${hopStr}${obsBadge}${iataBadge}
${escapeHtml(preview)}
${formatLiveTimestampHtml(group.latestTs || Date.now())}
`;
@@ -3251,6 +3270,7 @@
const preview = text ? ' ' + (text.length > 35 ? text.slice(0, 35) + '…' : text) : '';
const hopStr = hops.length ? `${hops.length}⇢` : '';
const obsBadge = pkt.observation_count > 1 ? `👁 ${pkt.observation_count}` : '';
+ const iataBadge = obsIataBadgeHtml(pkt);
const anomalyIcon = (pkt.decoded && pkt.decoded.anomaly) ? '⚠️' : '';
var _ccPayload2 = (pkt.decoded || {}).payload || {};
var _ccChan = (typeName === 'GRP_TXT' || typeName === 'CHAN') ? (_ccPayload2.channel || null) : null;
@@ -3270,7 +3290,7 @@
item.innerHTML = `
${typeName}
- ${dotHtml}${transportBadge(pkt.route_type)}${hopStr}${obsBadge}${anomalyIcon}
+ ${dotHtml}${transportBadge(pkt.route_type)}${hopStr}${obsBadge}${iataBadge}${anomalyIcon}
${escapeHtml(preview)}
${formatLiveTimestampHtml(pkt._ts || Date.now())}
`;
@@ -3337,6 +3357,7 @@
const preview = text ? ' ' + (text.length > 35 ? text.slice(0, 35) + '…' : text) : '';
const hopStr = hops.length ? `${hops.length}⇢` : '';
const obsBadge = incomingObs > 1 ? `👁 ${incomingObs}` : '';
+ const iataBadge = obsIataBadgeHtml(pkt);
var _ccPayload3 = (pkt.decoded || {}).payload || {};
var _ccChan3 = (typeName === 'GRP_TXT' || typeName === 'CHAN') ? (_ccPayload3.channel || null) : null;
var dotHtml3 = _ccChan3 ? _feedColorDot(_ccChan3) : '';
@@ -3357,7 +3378,7 @@
item.innerHTML = `
${typeName}
- ${dotHtml3}${transportBadge(pkt.route_type)}${hopStr}${obsBadge}
+ ${dotHtml3}${transportBadge(pkt.route_type)}${hopStr}${obsBadge}${iataBadge}
${escapeHtml(preview)}
${formatLiveTimestampHtml(pkt._ts || Date.now())}
`;
diff --git a/public/packet-filter.js b/public/packet-filter.js
index 7aad6956..42154b3a 100644
--- a/public/packet-filter.js
+++ b/public/packet-filter.js
@@ -23,10 +23,10 @@
var TK = {
FIELD: 'FIELD', OP: 'OP', STRING: 'STRING', NUMBER: 'NUMBER', BOOL: 'BOOL',
DURATION: 'DURATION',
- AND: 'AND', OR: 'OR', NOT: 'NOT', LPAREN: 'LPAREN', RPAREN: 'RPAREN'
+ AND: 'AND', OR: 'OR', NOT: 'NOT', LPAREN: 'LPAREN', RPAREN: 'RPAREN', COMMA: 'COMMA'
};
- var OP_WORDS = { contains: true, starts_with: true, ends_with: true, after: true, before: true, between: true };
+ var OP_WORDS = { contains: true, starts_with: true, ends_with: true, after: true, before: true, between: true, in: true };
// Duration unit → seconds. Used for `age < 1h`-style filters.
var DURATION_UNITS = { s: 1, m: 60, h: 3600, d: 86400, w: 604800 };
@@ -50,6 +50,7 @@
if (input[i] === '!') { tokens.push({ type: TK.NOT, value: '!' }); i++; continue; }
if (input[i] === '(') { tokens.push({ type: TK.LPAREN }); i++; continue; }
if (input[i] === ')') { tokens.push({ type: TK.RPAREN }); i++; continue; }
+ if (input[i] === ',') { tokens.push({ type: TK.COMMA, value: ',' }); i++; continue; }
// quoted string
if (input[i] === '"') {
var j = i + 1;
@@ -179,6 +180,28 @@
return { type: 'comparison', field: field, op: op, value: lo, value2: hi };
}
+ // `in` takes a parenthesized list of values: `field in (a, b, c)`
+ if (op === 'in') {
+ if (!peek() || peek().type !== TK.LPAREN) {
+ throw new Error("Expected '(' after 'in'");
+ }
+ advance(); // consume '('
+ var values = [];
+ if (!peek() || peek().type === TK.RPAREN) {
+ throw new Error("Empty value list for 'in'");
+ }
+ values.push(parseValue(field, op));
+ while (peek() && peek().type === TK.COMMA) {
+ advance(); // consume ','
+ values.push(parseValue(field, op));
+ }
+ if (!peek() || peek().type !== TK.RPAREN) {
+ throw new Error("Expected ')' or ',' in 'in' list");
+ }
+ advance(); // consume ')'
+ return { type: 'comparison', field: field, op: op, values: values };
+ }
+
var value = parseValue(field, op);
if (op === 'after' || op === 'before') validateTimeValue(field, op, value);
return { type: 'comparison', field: field, op: op, value: value };
@@ -233,6 +256,7 @@
}
if (field === 'observer') return packet.observer_name || '';
if (field === 'observer_id') return packet.observer_id || '';
+ if (field === 'observer_iata' || field === 'iata') return packet.observer_iata || '';
if (field === 'observations') return packet.observation_count || 0;
if (field === 'time' || field === 'timestamp') {
// Returns ms-since-epoch or null. Falls back to first_seen when timestamp absent
@@ -304,6 +328,16 @@
if (fieldVal == null || fieldVal === undefined) return false;
+ // `in` operator: membership in a list of values (case-insensitive for strings)
+ if (op === 'in') {
+ var list = ast.values || [];
+ var lhs = String(fieldVal).toLowerCase();
+ for (var iv = 0; iv < list.length; iv++) {
+ if (String(list[iv]).toLowerCase() === lhs) return true;
+ }
+ return false;
+ }
+
// Temporal ops: after / before / between operate on epoch-ms.
if (op === 'after' || op === 'before' || op === 'between') {
var lhsMs = typeof fieldVal === 'number' ? fieldVal : Date.parse(fieldVal);
@@ -397,6 +431,8 @@
{ name: 'hops', desc: 'Number of hops in the path' },
{ name: 'observer', desc: 'Observer station name' },
{ name: 'observer_id', desc: 'Observer pubkey/id' },
+ { name: 'observer_iata', desc: 'Observer IATA region code (e.g. SJC, SFO)' },
+ { name: 'iata', desc: 'Alias of observer_iata' },
{ name: 'observations', desc: 'Number of observations of this packet' },
{ name: 'path', desc: 'Hop path (joined with arrows)' },
{ name: 'payload_bytes', desc: 'Payload size in bytes (size - 2 header bytes)' },
@@ -428,6 +464,7 @@
{ op: 'after', desc: 'Datetime after (ISO or epoch)', example: 'time after "2025-01-01"' },
{ op: 'before', desc: 'Datetime before', example: 'time before "2025-12-31"' },
{ op: 'between', desc: 'Datetime between two values', example: 'time between "2025-01-01" "2025-02-01"' },
+ { op: 'in', desc: 'Value in a list (case-insensitive for strings)', example: 'iata in ("SJC","SFO")' },
];
// Canonical type names (firmware payload types)
@@ -611,6 +648,18 @@
c = compile('observer == "kpabap"');
assert(c.filter({ observer_name: 'kpabap' }), 'observer');
+ // Observer IATA (#1188)
+ c = compile('observer_iata == "SJC"');
+ assert(c.filter({ observer_iata: 'SJC' }), 'observer_iata ==');
+ assert(!c.filter({ observer_iata: 'SFO' }), 'observer_iata != mismatch');
+ c = compile('iata == "SJC"');
+ assert(c.filter({ observer_iata: 'SJC' }), 'iata alias');
+ c = compile('iata in ("SJC","SFO")');
+ assert(c.filter({ observer_iata: 'SFO' }), 'iata in (...)');
+ assert(!c.filter({ observer_iata: 'LAX' }), 'iata in (...) mismatch');
+ c = compile('observer_iata contains "S"');
+ assert(c.filter({ observer_iata: 'SJC' }), 'observer_iata contains');
+
console.log('\nAll tests passed!');
module.exports = { parse: parse, evaluate: evaluate, compile: compile };
}
diff --git a/public/packets.js b/public/packets.js
index 3deb925a..cd83c3ef 100644
--- a/public/packets.js
+++ b/public/packets.js
@@ -465,6 +465,75 @@
if (!o) return id;
return o.iata ? `${o.name} (${o.iata})` : o.name;
}
+ // Compact IATA pill (#1188) — renders next to observer name. Prefers
+ // packet.observer_iata (now joined on the server) and falls back to the
+ // observer lookup map for callers that haven't been updated yet.
+ function obsIataBadge(packet) {
+ if (!packet) return '';
+ let iata = packet.observer_iata;
+ if (!iata) {
+ const o = packet.observer_id ? observerMap.get(packet.observer_id) : null;
+ iata = o && o.iata;
+ }
+ return iata ? `${escapeHtml(iata)}` : '';
+ }
+ // Plain observer name without the trailing IATA — used when the IATA is
+ // rendered separately as a badge (so the cell doesn't show "Name (SJC) SJC").
+ function obsNameOnly(id) {
+ if (!id) return '—';
+ const o = observerMap.get(id);
+ if (!o) return id;
+ return o.name;
+ }
+ // #1189 R1 mesh-operator feedback: in a grouped row the old cell showed ONE
+ // observer's IATA + `+N` — operators couldn't tell whether the N additional
+ // observers were SAME-region (redundant copies) or CROSS-region (interesting
+ // multi-site reception). This helper returns the cell's badge HTML showing
+ // the DISTINCT IATA set: `