diff --git a/cmd/server/db.go b/cmd/server/db.go index f82d7b2..51d9cf7 100644 --- a/cmd/server/db.go +++ b/cmd/server/db.go @@ -14,6 +14,7 @@ import ( // DB wraps a read-only connection to the MeshCore SQLite database. type DB struct { conn *sql.DB + isV3 bool // v3 schema: observer_idx in observations (vs observer_id in v2) } // OpenDB opens a read-only SQLite connection with WAL mode. @@ -29,13 +30,92 @@ func OpenDB(path string) (*DB, error) { conn.Close() return nil, fmt.Errorf("ping failed: %w", err) } - return &DB{conn: conn}, nil + d := &DB{conn: conn} + d.detectSchema() + return d, nil } func (db *DB) Close() error { return db.conn.Close() } +// detectSchema checks if the observations table uses v3 schema (observer_idx). +func (db *DB) detectSchema() { + rows, err := db.conn.Query("PRAGMA table_info(observations)") + if err != nil { + return + } + defer rows.Close() + for rows.Next() { + var cid int + var colName string + var colType sql.NullString + var notNull, pk int + var dflt sql.NullString + if rows.Scan(&cid, &colName, &colType, ¬Null, &dflt, &pk) == nil && colName == "observer_idx" { + db.isV3 = true + return + } + } +} + +// transmissionBaseSQL returns the SELECT columns and JOIN clause for transmission-centric queries. +func (db *DB) transmissionBaseSQL() (selectCols, observerJoin string) { + if db.isV3 { + selectCols = `t.id, t.raw_hex, t.hash, t.first_seen, t.route_type, t.payload_type, t.decoded_json, + COALESCE((SELECT COUNT(*) FROM observations WHERE transmission_id = t.id), 0) AS observation_count, + obs.id AS observer_id, obs.name AS observer_name, + o.snr, o.rssi, o.path_json, o.direction` + observerJoin = `LEFT JOIN observations o ON o.id = ( + SELECT id FROM observations WHERE transmission_id = t.id + ORDER BY length(COALESCE(path_json,'')) DESC LIMIT 1 + ) + LEFT JOIN observers obs ON obs.rowid = o.observer_idx` + } else { + selectCols = `t.id, t.raw_hex, t.hash, t.first_seen, t.route_type, t.payload_type, t.decoded_json, + COALESCE((SELECT COUNT(*) FROM observations WHERE transmission_id = t.id), 0) AS observation_count, + o.observer_id, o.observer_name, + o.snr, o.rssi, o.path_json, o.direction` + observerJoin = `LEFT JOIN observations o ON o.id = ( + SELECT id FROM observations WHERE transmission_id = t.id + ORDER BY length(COALESCE(path_json,'')) DESC LIMIT 1 + )` + } + return +} + +// scanTransmissionRow scans a row from the transmission-centric query. +// Returns a map matching the Node.js packet-store transmission shape. +func (db *DB) scanTransmissionRow(rows *sql.Rows) map[string]interface{} { + var id, observationCount int + var rawHex, hash, firstSeen, decodedJSON, observerID, observerName, pathJSON, direction sql.NullString + var routeType, payloadType sql.NullInt64 + var snr, rssi sql.NullFloat64 + + if err := rows.Scan(&id, &rawHex, &hash, &firstSeen, &routeType, &payloadType, &decodedJSON, + &observationCount, &observerID, &observerName, &snr, &rssi, &pathJSON, &direction); err != nil { + return nil + } + + return map[string]interface{}{ + "id": id, + "raw_hex": nullStr(rawHex), + "hash": nullStr(hash), + "first_seen": nullStr(firstSeen), + "timestamp": nullStr(firstSeen), + "route_type": nullInt(routeType), + "payload_type": nullInt(payloadType), + "decoded_json": nullStr(decodedJSON), + "observation_count": observationCount, + "observer_id": nullStr(observerID), + "observer_name": nullStr(observerName), + "snr": nullFloat(snr), + "rssi": nullFloat(rssi), + "path_json": nullStr(pathJSON), + "direction": nullStr(direction), + } +} + // Node represents a row from the nodes table. type Node struct { PublicKey string `json:"public_key"` @@ -159,7 +239,7 @@ type PacketResult struct { Total int `json:"total"` } -// QueryPackets returns paginated, filtered packets from packets_v view. +// QueryPackets returns paginated, filtered packets as transmissions (matching Node.js shape). func (db *DB) QueryPackets(q PacketQuery) (*PacketResult, error) { if q.Limit <= 0 { q.Limit = 50 @@ -168,27 +248,30 @@ func (db *DB) QueryPackets(q PacketQuery) (*PacketResult, error) { q.Order = "DESC" } - where, args := db.buildPacketWhere(q) + where, args := db.buildTransmissionWhere(q) w := "" if len(where) > 0 { w = "WHERE " + strings.Join(where, " AND ") } + // Count transmissions (not observations) var total int if len(where) == 0 { - // Fast path: no filters, use direct table count - db.conn.QueryRow("SELECT COUNT(*) FROM observations").Scan(&total) + db.conn.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&total) } else { - countSQL := fmt.Sprintf("SELECT COUNT(*) FROM packets_v %s", w) - if err := db.conn.QueryRow(countSQL, args...).Scan(&total); err != nil { - return nil, err - } + countSQL := fmt.Sprintf("SELECT COUNT(*) FROM transmissions t %s", w) + db.conn.QueryRow(countSQL, args...).Scan(&total) } - querySQL := fmt.Sprintf("SELECT id, raw_hex, timestamp, observer_id, observer_name, direction, snr, rssi, score, hash, route_type, payload_type, payload_version, path_json, decoded_json, created_at FROM packets_v %s ORDER BY timestamp %s LIMIT ? OFFSET ?", w, q.Order) - args = append(args, q.Limit, q.Offset) + selectCols, observerJoin := db.transmissionBaseSQL() + querySQL := fmt.Sprintf("SELECT %s FROM transmissions t %s %s ORDER BY t.first_seen %s LIMIT ? OFFSET ?", + selectCols, observerJoin, w, q.Order) - rows, err := db.conn.Query(querySQL, args...) + qArgs := make([]interface{}, len(args)) + copy(qArgs, args) + qArgs = append(qArgs, q.Limit, q.Offset) + + rows, err := db.conn.Query(querySQL, qArgs...) if err != nil { return nil, err } @@ -196,7 +279,7 @@ func (db *DB) QueryPackets(q PacketQuery) (*PacketResult, error) { packets := make([]map[string]interface{}, 0) for rows.Next() { - p := scanPacketRow(rows) + p := db.scanTransmissionRow(rows) if p != nil { packets = append(packets, p) } @@ -217,7 +300,9 @@ func (db *DB) QueryGroupedPackets(q PacketQuery) (*PacketResult, error) { } qry := fmt.Sprintf(`SELECT hash, COUNT(*) as count, COUNT(DISTINCT observer_id) as observer_count, - MAX(timestamp) as latest, MIN(observer_id) as observer_id, MIN(observer_name) as observer_name, + MAX(timestamp) as latest, + (SELECT first_seen FROM transmissions WHERE hash = packets_v.hash LIMIT 1) as first_seen, + MIN(observer_id) as observer_id, MIN(observer_name) as observer_name, MIN(path_json) as path_json, MIN(payload_type) as payload_type, MIN(route_type) as route_type, MIN(raw_hex) as raw_hex, MIN(decoded_json) as decoded_json, MIN(snr) as snr, MIN(rssi) as rssi FROM packets_v %s GROUP BY hash ORDER BY latest DESC LIMIT ? OFFSET ?`, w) @@ -231,11 +316,11 @@ func (db *DB) QueryGroupedPackets(q PacketQuery) (*PacketResult, error) { packets := make([]map[string]interface{}, 0) for rows.Next() { - var hash, latest, observerID, observerName, pathJSON, rawHex, decodedJSON sql.NullString + var hash, latest, firstSeen, observerID, observerName, pathJSON, rawHex, decodedJSON sql.NullString var count, observerCount int var payloadType, routeType sql.NullInt64 var snr, rssi sql.NullFloat64 - if err := rows.Scan(&hash, &count, &observerCount, &latest, &observerID, &observerName, &pathJSON, &payloadType, &routeType, &rawHex, &decodedJSON, &snr, &rssi); err != nil { + if err := rows.Scan(&hash, &count, &observerCount, &latest, &firstSeen, &observerID, &observerName, &pathJSON, &payloadType, &routeType, &rawHex, &decodedJSON, &snr, &rssi); err != nil { continue } p := map[string]interface{}{ @@ -244,7 +329,7 @@ func (db *DB) QueryGroupedPackets(q PacketQuery) (*PacketResult, error) { "observer_count": observerCount, "observation_count": count, "latest": nullStr(latest), - "first_seen": nullStr(latest), + "first_seen": nullStr(firstSeen), "observer_id": nullStr(observerID), "observer_name": nullStr(observerName), "path_json": nullStr(pathJSON), @@ -306,6 +391,56 @@ func (db *DB) buildPacketWhere(q PacketQuery) ([]string, []interface{}) { return where, args } +// buildTransmissionWhere builds WHERE clauses for transmission-centric queries. +// Uses t. prefix for transmission columns and EXISTS subqueries for observation filters. +func (db *DB) buildTransmissionWhere(q PacketQuery) ([]string, []interface{}) { + var where []string + var args []interface{} + + if q.Type != nil { + where = append(where, "t.payload_type = ?") + args = append(args, *q.Type) + } + if q.Route != nil { + where = append(where, "t.route_type = ?") + args = append(args, *q.Route) + } + if q.Hash != "" { + where = append(where, "t.hash = ?") + args = append(args, strings.ToLower(q.Hash)) + } + if q.Since != "" { + where = append(where, "t.first_seen > ?") + args = append(args, q.Since) + } + if q.Until != "" { + where = append(where, "t.first_seen < ?") + args = append(args, q.Until) + } + if q.Node != "" { + pk := db.resolveNodePubkey(q.Node) + where = append(where, "t.decoded_json LIKE ?") + args = append(args, "%"+pk+"%") + } + if q.Observer != "" { + if db.isV3 { + where = append(where, "EXISTS (SELECT 1 FROM observations oi JOIN observers obi ON obi.rowid = oi.observer_idx WHERE oi.transmission_id = t.id AND obi.id = ?)") + } else { + where = append(where, "EXISTS (SELECT 1 FROM observations oi WHERE oi.transmission_id = t.id AND oi.observer_id = ?)") + } + args = append(args, q.Observer) + } + if q.Region != "" { + if db.isV3 { + where = append(where, "EXISTS (SELECT 1 FROM observations oi JOIN observers obi ON obi.rowid = oi.observer_idx WHERE oi.transmission_id = t.id AND obi.iata = ?)") + } else { + where = append(where, "EXISTS (SELECT 1 FROM observations oi JOIN observers obi ON obi.id = oi.observer_id WHERE oi.transmission_id = t.id AND obi.iata = ?)") + } + args = append(args, q.Region) + } + return where, args +} + func (db *DB) resolveNodePubkey(nodeIDOrName string) string { var pk string err := db.conn.QueryRow("SELECT public_key FROM nodes WHERE public_key = ? OR name = ? LIMIT 1", nodeIDOrName, nodeIDOrName).Scan(&pk) @@ -328,52 +463,36 @@ func (db *DB) GetPacketByID(id int) (map[string]interface{}, error) { return nil, nil } -// GetTransmissionByID fetches from transmissions table. +// GetTransmissionByID fetches from transmissions table with observer data. func (db *DB) GetTransmissionByID(id int) (map[string]interface{}, error) { - var txID int - var rawHex, hash, firstSeen, decodedJSON, createdAt sql.NullString - var routeType, payloadType, payloadVersion sql.NullInt64 - err := db.conn.QueryRow("SELECT id, raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json, created_at FROM transmissions WHERE id = ?", id). - Scan(&txID, &rawHex, &hash, &firstSeen, &routeType, &payloadType, &payloadVersion, &decodedJSON, &createdAt) + selectCols, observerJoin := db.transmissionBaseSQL() + querySQL := fmt.Sprintf("SELECT %s FROM transmissions t %s WHERE t.id = ?", selectCols, observerJoin) + + rows, err := db.conn.Query(querySQL, id) if err != nil { return nil, err } - return map[string]interface{}{ - "id": txID, - "raw_hex": nullStr(rawHex), - "hash": nullStr(hash), - "first_seen": nullStr(firstSeen), - "timestamp": nullStr(firstSeen), - "route_type": nullInt(routeType), - "payload_type": nullInt(payloadType), - "payload_version": nullInt(payloadVersion), - "decoded_json": nullStr(decodedJSON), - "created_at": nullStr(createdAt), - }, nil + defer rows.Close() + if rows.Next() { + return db.scanTransmissionRow(rows), nil + } + return nil, nil } -// GetPacketByHash fetches a transmission by content hash. +// GetPacketByHash fetches a transmission by content hash with observer data. func (db *DB) GetPacketByHash(hash string) (map[string]interface{}, error) { - var txID int - var rawHex, h, firstSeen, decodedJSON, createdAt sql.NullString - var routeType, payloadType, payloadVersion sql.NullInt64 - err := db.conn.QueryRow("SELECT id, raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json, created_at FROM transmissions WHERE hash = ?", strings.ToLower(hash)). - Scan(&txID, &rawHex, &h, &firstSeen, &routeType, &payloadType, &payloadVersion, &decodedJSON, &createdAt) + selectCols, observerJoin := db.transmissionBaseSQL() + querySQL := fmt.Sprintf("SELECT %s FROM transmissions t %s WHERE t.hash = ?", selectCols, observerJoin) + + rows, err := db.conn.Query(querySQL, strings.ToLower(hash)) if err != nil { return nil, err } - return map[string]interface{}{ - "id": txID, - "raw_hex": nullStr(rawHex), - "hash": nullStr(h), - "first_seen": nullStr(firstSeen), - "timestamp": nullStr(firstSeen), - "route_type": nullInt(routeType), - "payload_type": nullInt(payloadType), - "payload_version": nullInt(payloadVersion), - "decoded_json": nullStr(decodedJSON), - "created_at": nullStr(createdAt), - }, nil + defer rows.Close() + if rows.Next() { + return db.scanTransmissionRow(rows), nil + } + return nil, nil } // GetObservationsForHash returns all observations for a given hash. @@ -526,6 +645,44 @@ func (db *DB) GetRecentPacketsForNode(pubkey string, name string, limit int) ([] return packets, nil } +// GetRecentTransmissionsForNode returns recent transmissions referencing a node (Node.js-compatible shape). +func (db *DB) GetRecentTransmissionsForNode(pubkey string, name string, limit int) ([]map[string]interface{}, error) { + if limit <= 0 { + limit = 20 + } + pk := "%" + pubkey + "%" + np := "%" + name + "%" + + selectCols, observerJoin := db.transmissionBaseSQL() + + var querySQL string + var args []interface{} + if name != "" { + querySQL = fmt.Sprintf("SELECT %s FROM transmissions t %s WHERE t.decoded_json LIKE ? OR t.decoded_json LIKE ? ORDER BY t.first_seen DESC LIMIT ?", + selectCols, observerJoin) + args = []interface{}{pk, np, limit} + } else { + querySQL = fmt.Sprintf("SELECT %s FROM transmissions t %s WHERE t.decoded_json LIKE ? ORDER BY t.first_seen DESC LIMIT ?", + selectCols, observerJoin) + args = []interface{}{pk, limit} + } + + rows, err := db.conn.Query(querySQL, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + packets := make([]map[string]interface{}, 0) + for rows.Next() { + p := db.scanTransmissionRow(rows) + if p != nil { + packets = append(packets, p) + } + } + return packets, nil +} + // GetObservers returns all observers sorted by last_seen DESC. func (db *DB) GetObservers() ([]Observer, error) { rows, err := db.conn.Query("SELECT id, name, iata, last_seen, first_seen, packet_count, model, firmware, client_version, radio, battery_mv, uptime_secs, noise_floor FROM observers ORDER BY last_seen DESC") @@ -678,7 +835,7 @@ func (db *DB) GetNodeHealth(pubkey string) (map[string]interface{}, error) { } // Recent packets - recentPackets, _ := db.GetRecentPacketsForNode(pubkey, name, 20) + recentPackets, _ := db.GetRecentTransmissionsForNode(pubkey, name, 20) return map[string]interface{}{ "node": node, @@ -1065,14 +1222,17 @@ func scanNodeRow(rows *sql.Rows) map[string]interface{} { return nil } return map[string]interface{}{ - "public_key": pk, - "name": nullStr(name), - "role": nullStr(role), - "lat": nullFloat(lat), - "lon": nullFloat(lon), - "last_seen": nullStr(lastSeen), - "first_seen": nullStr(firstSeen), - "advert_count": advertCount, + "public_key": pk, + "name": nullStr(name), + "role": nullStr(role), + "lat": nullFloat(lat), + "lon": nullFloat(lon), + "last_seen": nullStr(lastSeen), + "first_seen": nullStr(firstSeen), + "advert_count": advertCount, + "last_heard": nullStr(lastSeen), + "hash_size": nil, + "hash_size_inconsistent": false, } } diff --git a/cmd/server/db_test.go b/cmd/server/db_test.go index f9b3b14..0dd87c6 100644 --- a/cmd/server/db_test.go +++ b/cmd/server/db_test.go @@ -86,7 +86,7 @@ func setupTestDB(t *testing.T) *DB { t.Fatal(err) } - return &DB{conn: conn} + return &DB{conn: conn, isV3: true} } func seedTestData(t *testing.T, db *DB) { @@ -170,11 +170,32 @@ func TestQueryPackets(t *testing.T) { if err != nil { t.Fatal(err) } - if result.Total != 3 { - t.Errorf("expected 3 total packets, got %d", result.Total) + // Transmission-centric: 2 unique transmissions (not 3 observations) + if result.Total != 2 { + t.Errorf("expected 2 total transmissions, got %d", result.Total) } - if len(result.Packets) != 3 { - t.Errorf("expected 3 packets, got %d", len(result.Packets)) + if len(result.Packets) != 2 { + t.Errorf("expected 2 packets, got %d", len(result.Packets)) + } + // Verify transmission shape has required fields + if len(result.Packets) > 0 { + p := result.Packets[0] + if _, ok := p["first_seen"]; !ok { + t.Error("expected first_seen field in packet") + } + if _, ok := p["observation_count"]; !ok { + t.Error("expected observation_count field in packet") + } + if _, ok := p["timestamp"]; !ok { + t.Error("expected timestamp field in packet") + } + // Should NOT have observation-level fields at top + if _, ok := p["created_at"]; ok { + t.Error("did not expect created_at in transmission-level response") + } + if _, ok := p["score"]; ok { + t.Error("did not expect score in transmission-level response") + } } } @@ -188,8 +209,9 @@ func TestQueryPacketsWithTypeFilter(t *testing.T) { if err != nil { t.Fatal(err) } - if result.Total != 2 { - t.Errorf("expected 2 ADVERT packets, got %d", result.Total) + // 1 transmission with payload_type=4 (has 2 observations, but we return transmissions) + if result.Total != 1 { + t.Errorf("expected 1 ADVERT transmission, got %d", result.Total) } } @@ -477,9 +499,9 @@ func TestGetTransmissionByIDNotFound(t *testing.T) { defer db.Close() seedTestData(t, db) - _, err := db.GetTransmissionByID(9999) - if err == nil { - t.Error("expected error for nonexistent transmission") + result, _ := db.GetTransmissionByID(9999) + if result != nil { + t.Error("expected nil result for nonexistent transmission") } } @@ -488,9 +510,9 @@ func TestGetPacketByHashNotFound(t *testing.T) { defer db.Close() seedTestData(t, db) - _, err := db.GetPacketByHash("nonexistenthash1") - if err == nil { - t.Error("expected error for nonexistent hash") + result, _ := db.GetPacketByHash("nonexistenthash1") + if result != nil { + t.Error("expected nil result for nonexistent hash") } } @@ -737,8 +759,9 @@ func TestBuildPacketWhereFilters(t *testing.T) { if err != nil { t.Fatal(err) } - if result.Total != 2 { - t.Errorf("expected 2 results for hash filter, got %d", result.Total) + // 1 transmission with this hash (has 2 observations, but transmission-centric) + if result.Total != 1 { + t.Errorf("expected 1 result for hash filter, got %d", result.Total) } }) diff --git a/cmd/server/routes.go b/cmd/server/routes.go index 1b4d170..a206f67 100644 --- a/cmd/server/routes.go +++ b/cmd/server/routes.go @@ -486,7 +486,7 @@ func (s *Server) handleNodeDetail(w http.ResponseWriter, r *http.Request) { if n, ok := node["name"]; ok && n != nil { name = fmt.Sprintf("%v", n) } - recentAdverts, _ := s.db.GetRecentPacketsForNode(pubkey, name, 20) + recentAdverts, _ := s.db.GetRecentTransmissionsForNode(pubkey, name, 20) writeJSON(w, map[string]interface{}{ "node": node, diff --git a/cmd/server/routes_test.go b/cmd/server/routes_test.go index 00a4c32..ecc9322 100644 --- a/cmd/server/routes_test.go +++ b/cmd/server/routes_test.go @@ -71,8 +71,8 @@ func TestPacketsEndpoint(t *testing.T) { if !ok { t.Fatal("expected packets array") } - if len(packets) != 3 { - t.Errorf("expected 3 packets, got %d", len(packets)) + if len(packets) != 2 { + t.Errorf("expected 2 packets (transmissions), got %d", len(packets)) } } @@ -1510,7 +1510,8 @@ func TestHandlerErrorPackets(t *testing.T) { router := mux.NewRouter() srv.RegisterRoutes(router) - db.conn.Exec("DROP VIEW IF EXISTS packets_v") + // Drop transmissions table to trigger error in transmission-centric query + db.conn.Exec("DROP TABLE IF EXISTS transmissions") req := httptest.NewRequest("GET", "/api/packets?limit=10", nil) w := httptest.NewRecorder()