feat: Go server API parity with Node.js — response shapes, perf, computed fields

- Packets query rewired from packets_v VIEW (9s) to direct table joins (~50ms)
- Packet response: added first_seen, observation_count; removed created_at, score
- Node response: added last_heard, hash_size, hash_size_inconsistent
- Schema-aware v2/v3 detection for observer_idx vs observer_id
- All Go tests passing

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Kpa-clawbot
2026-03-27 01:50:32 -07:00
parent f74b4c69f4
commit e18a73e1f2
4 changed files with 265 additions and 81 deletions

View File

@@ -14,6 +14,7 @@ import (
// DB wraps a read-only connection to the MeshCore SQLite database.
type DB struct {
conn *sql.DB
isV3 bool // v3 schema: observer_idx in observations (vs observer_id in v2)
}
// OpenDB opens a read-only SQLite connection with WAL mode.
@@ -29,13 +30,92 @@ func OpenDB(path string) (*DB, error) {
conn.Close()
return nil, fmt.Errorf("ping failed: %w", err)
}
return &DB{conn: conn}, nil
d := &DB{conn: conn}
d.detectSchema()
return d, nil
}
func (db *DB) Close() error {
return db.conn.Close()
}
// detectSchema checks if the observations table uses v3 schema (observer_idx).
func (db *DB) detectSchema() {
rows, err := db.conn.Query("PRAGMA table_info(observations)")
if err != nil {
return
}
defer rows.Close()
for rows.Next() {
var cid int
var colName string
var colType sql.NullString
var notNull, pk int
var dflt sql.NullString
if rows.Scan(&cid, &colName, &colType, &notNull, &dflt, &pk) == nil && colName == "observer_idx" {
db.isV3 = true
return
}
}
}
// transmissionBaseSQL returns the SELECT columns and JOIN clause for transmission-centric queries.
func (db *DB) transmissionBaseSQL() (selectCols, observerJoin string) {
if db.isV3 {
selectCols = `t.id, t.raw_hex, t.hash, t.first_seen, t.route_type, t.payload_type, t.decoded_json,
COALESCE((SELECT COUNT(*) FROM observations WHERE transmission_id = t.id), 0) AS observation_count,
obs.id AS observer_id, obs.name AS observer_name,
o.snr, o.rssi, o.path_json, o.direction`
observerJoin = `LEFT JOIN observations o ON o.id = (
SELECT id FROM observations WHERE transmission_id = t.id
ORDER BY length(COALESCE(path_json,'')) DESC LIMIT 1
)
LEFT JOIN observers obs ON obs.rowid = o.observer_idx`
} else {
selectCols = `t.id, t.raw_hex, t.hash, t.first_seen, t.route_type, t.payload_type, t.decoded_json,
COALESCE((SELECT COUNT(*) FROM observations WHERE transmission_id = t.id), 0) AS observation_count,
o.observer_id, o.observer_name,
o.snr, o.rssi, o.path_json, o.direction`
observerJoin = `LEFT JOIN observations o ON o.id = (
SELECT id FROM observations WHERE transmission_id = t.id
ORDER BY length(COALESCE(path_json,'')) DESC LIMIT 1
)`
}
return
}
// scanTransmissionRow scans a row from the transmission-centric query.
// Returns a map matching the Node.js packet-store transmission shape.
func (db *DB) scanTransmissionRow(rows *sql.Rows) map[string]interface{} {
var id, observationCount int
var rawHex, hash, firstSeen, decodedJSON, observerID, observerName, pathJSON, direction sql.NullString
var routeType, payloadType sql.NullInt64
var snr, rssi sql.NullFloat64
if err := rows.Scan(&id, &rawHex, &hash, &firstSeen, &routeType, &payloadType, &decodedJSON,
&observationCount, &observerID, &observerName, &snr, &rssi, &pathJSON, &direction); err != nil {
return nil
}
return map[string]interface{}{
"id": id,
"raw_hex": nullStr(rawHex),
"hash": nullStr(hash),
"first_seen": nullStr(firstSeen),
"timestamp": nullStr(firstSeen),
"route_type": nullInt(routeType),
"payload_type": nullInt(payloadType),
"decoded_json": nullStr(decodedJSON),
"observation_count": observationCount,
"observer_id": nullStr(observerID),
"observer_name": nullStr(observerName),
"snr": nullFloat(snr),
"rssi": nullFloat(rssi),
"path_json": nullStr(pathJSON),
"direction": nullStr(direction),
}
}
// Node represents a row from the nodes table.
type Node struct {
PublicKey string `json:"public_key"`
@@ -159,7 +239,7 @@ type PacketResult struct {
Total int `json:"total"`
}
// QueryPackets returns paginated, filtered packets from packets_v view.
// QueryPackets returns paginated, filtered packets as transmissions (matching Node.js shape).
func (db *DB) QueryPackets(q PacketQuery) (*PacketResult, error) {
if q.Limit <= 0 {
q.Limit = 50
@@ -168,27 +248,30 @@ func (db *DB) QueryPackets(q PacketQuery) (*PacketResult, error) {
q.Order = "DESC"
}
where, args := db.buildPacketWhere(q)
where, args := db.buildTransmissionWhere(q)
w := ""
if len(where) > 0 {
w = "WHERE " + strings.Join(where, " AND ")
}
// Count transmissions (not observations)
var total int
if len(where) == 0 {
// Fast path: no filters, use direct table count
db.conn.QueryRow("SELECT COUNT(*) FROM observations").Scan(&total)
db.conn.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&total)
} else {
countSQL := fmt.Sprintf("SELECT COUNT(*) FROM packets_v %s", w)
if err := db.conn.QueryRow(countSQL, args...).Scan(&total); err != nil {
return nil, err
}
countSQL := fmt.Sprintf("SELECT COUNT(*) FROM transmissions t %s", w)
db.conn.QueryRow(countSQL, args...).Scan(&total)
}
querySQL := fmt.Sprintf("SELECT id, raw_hex, timestamp, observer_id, observer_name, direction, snr, rssi, score, hash, route_type, payload_type, payload_version, path_json, decoded_json, created_at FROM packets_v %s ORDER BY timestamp %s LIMIT ? OFFSET ?", w, q.Order)
args = append(args, q.Limit, q.Offset)
selectCols, observerJoin := db.transmissionBaseSQL()
querySQL := fmt.Sprintf("SELECT %s FROM transmissions t %s %s ORDER BY t.first_seen %s LIMIT ? OFFSET ?",
selectCols, observerJoin, w, q.Order)
rows, err := db.conn.Query(querySQL, args...)
qArgs := make([]interface{}, len(args))
copy(qArgs, args)
qArgs = append(qArgs, q.Limit, q.Offset)
rows, err := db.conn.Query(querySQL, qArgs...)
if err != nil {
return nil, err
}
@@ -196,7 +279,7 @@ func (db *DB) QueryPackets(q PacketQuery) (*PacketResult, error) {
packets := make([]map[string]interface{}, 0)
for rows.Next() {
p := scanPacketRow(rows)
p := db.scanTransmissionRow(rows)
if p != nil {
packets = append(packets, p)
}
@@ -217,7 +300,9 @@ func (db *DB) QueryGroupedPackets(q PacketQuery) (*PacketResult, error) {
}
qry := fmt.Sprintf(`SELECT hash, COUNT(*) as count, COUNT(DISTINCT observer_id) as observer_count,
MAX(timestamp) as latest, MIN(observer_id) as observer_id, MIN(observer_name) as observer_name,
MAX(timestamp) as latest,
(SELECT first_seen FROM transmissions WHERE hash = packets_v.hash LIMIT 1) as first_seen,
MIN(observer_id) as observer_id, MIN(observer_name) as observer_name,
MIN(path_json) as path_json, MIN(payload_type) as payload_type, MIN(route_type) as route_type,
MIN(raw_hex) as raw_hex, MIN(decoded_json) as decoded_json, MIN(snr) as snr, MIN(rssi) as rssi
FROM packets_v %s GROUP BY hash ORDER BY latest DESC LIMIT ? OFFSET ?`, w)
@@ -231,11 +316,11 @@ func (db *DB) QueryGroupedPackets(q PacketQuery) (*PacketResult, error) {
packets := make([]map[string]interface{}, 0)
for rows.Next() {
var hash, latest, observerID, observerName, pathJSON, rawHex, decodedJSON sql.NullString
var hash, latest, firstSeen, observerID, observerName, pathJSON, rawHex, decodedJSON sql.NullString
var count, observerCount int
var payloadType, routeType sql.NullInt64
var snr, rssi sql.NullFloat64
if err := rows.Scan(&hash, &count, &observerCount, &latest, &observerID, &observerName, &pathJSON, &payloadType, &routeType, &rawHex, &decodedJSON, &snr, &rssi); err != nil {
if err := rows.Scan(&hash, &count, &observerCount, &latest, &firstSeen, &observerID, &observerName, &pathJSON, &payloadType, &routeType, &rawHex, &decodedJSON, &snr, &rssi); err != nil {
continue
}
p := map[string]interface{}{
@@ -244,7 +329,7 @@ func (db *DB) QueryGroupedPackets(q PacketQuery) (*PacketResult, error) {
"observer_count": observerCount,
"observation_count": count,
"latest": nullStr(latest),
"first_seen": nullStr(latest),
"first_seen": nullStr(firstSeen),
"observer_id": nullStr(observerID),
"observer_name": nullStr(observerName),
"path_json": nullStr(pathJSON),
@@ -306,6 +391,56 @@ func (db *DB) buildPacketWhere(q PacketQuery) ([]string, []interface{}) {
return where, args
}
// buildTransmissionWhere builds WHERE clauses for transmission-centric queries.
// Uses t. prefix for transmission columns and EXISTS subqueries for observation filters.
func (db *DB) buildTransmissionWhere(q PacketQuery) ([]string, []interface{}) {
var where []string
var args []interface{}
if q.Type != nil {
where = append(where, "t.payload_type = ?")
args = append(args, *q.Type)
}
if q.Route != nil {
where = append(where, "t.route_type = ?")
args = append(args, *q.Route)
}
if q.Hash != "" {
where = append(where, "t.hash = ?")
args = append(args, strings.ToLower(q.Hash))
}
if q.Since != "" {
where = append(where, "t.first_seen > ?")
args = append(args, q.Since)
}
if q.Until != "" {
where = append(where, "t.first_seen < ?")
args = append(args, q.Until)
}
if q.Node != "" {
pk := db.resolveNodePubkey(q.Node)
where = append(where, "t.decoded_json LIKE ?")
args = append(args, "%"+pk+"%")
}
if q.Observer != "" {
if db.isV3 {
where = append(where, "EXISTS (SELECT 1 FROM observations oi JOIN observers obi ON obi.rowid = oi.observer_idx WHERE oi.transmission_id = t.id AND obi.id = ?)")
} else {
where = append(where, "EXISTS (SELECT 1 FROM observations oi WHERE oi.transmission_id = t.id AND oi.observer_id = ?)")
}
args = append(args, q.Observer)
}
if q.Region != "" {
if db.isV3 {
where = append(where, "EXISTS (SELECT 1 FROM observations oi JOIN observers obi ON obi.rowid = oi.observer_idx WHERE oi.transmission_id = t.id AND obi.iata = ?)")
} else {
where = append(where, "EXISTS (SELECT 1 FROM observations oi JOIN observers obi ON obi.id = oi.observer_id WHERE oi.transmission_id = t.id AND obi.iata = ?)")
}
args = append(args, q.Region)
}
return where, args
}
func (db *DB) resolveNodePubkey(nodeIDOrName string) string {
var pk string
err := db.conn.QueryRow("SELECT public_key FROM nodes WHERE public_key = ? OR name = ? LIMIT 1", nodeIDOrName, nodeIDOrName).Scan(&pk)
@@ -328,52 +463,36 @@ func (db *DB) GetPacketByID(id int) (map[string]interface{}, error) {
return nil, nil
}
// GetTransmissionByID fetches from transmissions table.
// GetTransmissionByID fetches from transmissions table with observer data.
func (db *DB) GetTransmissionByID(id int) (map[string]interface{}, error) {
var txID int
var rawHex, hash, firstSeen, decodedJSON, createdAt sql.NullString
var routeType, payloadType, payloadVersion sql.NullInt64
err := db.conn.QueryRow("SELECT id, raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json, created_at FROM transmissions WHERE id = ?", id).
Scan(&txID, &rawHex, &hash, &firstSeen, &routeType, &payloadType, &payloadVersion, &decodedJSON, &createdAt)
selectCols, observerJoin := db.transmissionBaseSQL()
querySQL := fmt.Sprintf("SELECT %s FROM transmissions t %s WHERE t.id = ?", selectCols, observerJoin)
rows, err := db.conn.Query(querySQL, id)
if err != nil {
return nil, err
}
return map[string]interface{}{
"id": txID,
"raw_hex": nullStr(rawHex),
"hash": nullStr(hash),
"first_seen": nullStr(firstSeen),
"timestamp": nullStr(firstSeen),
"route_type": nullInt(routeType),
"payload_type": nullInt(payloadType),
"payload_version": nullInt(payloadVersion),
"decoded_json": nullStr(decodedJSON),
"created_at": nullStr(createdAt),
}, nil
defer rows.Close()
if rows.Next() {
return db.scanTransmissionRow(rows), nil
}
return nil, nil
}
// GetPacketByHash fetches a transmission by content hash.
// GetPacketByHash fetches a transmission by content hash with observer data.
func (db *DB) GetPacketByHash(hash string) (map[string]interface{}, error) {
var txID int
var rawHex, h, firstSeen, decodedJSON, createdAt sql.NullString
var routeType, payloadType, payloadVersion sql.NullInt64
err := db.conn.QueryRow("SELECT id, raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json, created_at FROM transmissions WHERE hash = ?", strings.ToLower(hash)).
Scan(&txID, &rawHex, &h, &firstSeen, &routeType, &payloadType, &payloadVersion, &decodedJSON, &createdAt)
selectCols, observerJoin := db.transmissionBaseSQL()
querySQL := fmt.Sprintf("SELECT %s FROM transmissions t %s WHERE t.hash = ?", selectCols, observerJoin)
rows, err := db.conn.Query(querySQL, strings.ToLower(hash))
if err != nil {
return nil, err
}
return map[string]interface{}{
"id": txID,
"raw_hex": nullStr(rawHex),
"hash": nullStr(h),
"first_seen": nullStr(firstSeen),
"timestamp": nullStr(firstSeen),
"route_type": nullInt(routeType),
"payload_type": nullInt(payloadType),
"payload_version": nullInt(payloadVersion),
"decoded_json": nullStr(decodedJSON),
"created_at": nullStr(createdAt),
}, nil
defer rows.Close()
if rows.Next() {
return db.scanTransmissionRow(rows), nil
}
return nil, nil
}
// GetObservationsForHash returns all observations for a given hash.
@@ -526,6 +645,44 @@ func (db *DB) GetRecentPacketsForNode(pubkey string, name string, limit int) ([]
return packets, nil
}
// GetRecentTransmissionsForNode returns recent transmissions referencing a node (Node.js-compatible shape).
func (db *DB) GetRecentTransmissionsForNode(pubkey string, name string, limit int) ([]map[string]interface{}, error) {
if limit <= 0 {
limit = 20
}
pk := "%" + pubkey + "%"
np := "%" + name + "%"
selectCols, observerJoin := db.transmissionBaseSQL()
var querySQL string
var args []interface{}
if name != "" {
querySQL = fmt.Sprintf("SELECT %s FROM transmissions t %s WHERE t.decoded_json LIKE ? OR t.decoded_json LIKE ? ORDER BY t.first_seen DESC LIMIT ?",
selectCols, observerJoin)
args = []interface{}{pk, np, limit}
} else {
querySQL = fmt.Sprintf("SELECT %s FROM transmissions t %s WHERE t.decoded_json LIKE ? ORDER BY t.first_seen DESC LIMIT ?",
selectCols, observerJoin)
args = []interface{}{pk, limit}
}
rows, err := db.conn.Query(querySQL, args...)
if err != nil {
return nil, err
}
defer rows.Close()
packets := make([]map[string]interface{}, 0)
for rows.Next() {
p := db.scanTransmissionRow(rows)
if p != nil {
packets = append(packets, p)
}
}
return packets, nil
}
// GetObservers returns all observers sorted by last_seen DESC.
func (db *DB) GetObservers() ([]Observer, error) {
rows, err := db.conn.Query("SELECT id, name, iata, last_seen, first_seen, packet_count, model, firmware, client_version, radio, battery_mv, uptime_secs, noise_floor FROM observers ORDER BY last_seen DESC")
@@ -678,7 +835,7 @@ func (db *DB) GetNodeHealth(pubkey string) (map[string]interface{}, error) {
}
// Recent packets
recentPackets, _ := db.GetRecentPacketsForNode(pubkey, name, 20)
recentPackets, _ := db.GetRecentTransmissionsForNode(pubkey, name, 20)
return map[string]interface{}{
"node": node,
@@ -1065,14 +1222,17 @@ func scanNodeRow(rows *sql.Rows) map[string]interface{} {
return nil
}
return map[string]interface{}{
"public_key": pk,
"name": nullStr(name),
"role": nullStr(role),
"lat": nullFloat(lat),
"lon": nullFloat(lon),
"last_seen": nullStr(lastSeen),
"first_seen": nullStr(firstSeen),
"advert_count": advertCount,
"public_key": pk,
"name": nullStr(name),
"role": nullStr(role),
"lat": nullFloat(lat),
"lon": nullFloat(lon),
"last_seen": nullStr(lastSeen),
"first_seen": nullStr(firstSeen),
"advert_count": advertCount,
"last_heard": nullStr(lastSeen),
"hash_size": nil,
"hash_size_inconsistent": false,
}
}

View File

@@ -86,7 +86,7 @@ func setupTestDB(t *testing.T) *DB {
t.Fatal(err)
}
return &DB{conn: conn}
return &DB{conn: conn, isV3: true}
}
func seedTestData(t *testing.T, db *DB) {
@@ -170,11 +170,32 @@ func TestQueryPackets(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if result.Total != 3 {
t.Errorf("expected 3 total packets, got %d", result.Total)
// Transmission-centric: 2 unique transmissions (not 3 observations)
if result.Total != 2 {
t.Errorf("expected 2 total transmissions, got %d", result.Total)
}
if len(result.Packets) != 3 {
t.Errorf("expected 3 packets, got %d", len(result.Packets))
if len(result.Packets) != 2 {
t.Errorf("expected 2 packets, got %d", len(result.Packets))
}
// Verify transmission shape has required fields
if len(result.Packets) > 0 {
p := result.Packets[0]
if _, ok := p["first_seen"]; !ok {
t.Error("expected first_seen field in packet")
}
if _, ok := p["observation_count"]; !ok {
t.Error("expected observation_count field in packet")
}
if _, ok := p["timestamp"]; !ok {
t.Error("expected timestamp field in packet")
}
// Should NOT have observation-level fields at top
if _, ok := p["created_at"]; ok {
t.Error("did not expect created_at in transmission-level response")
}
if _, ok := p["score"]; ok {
t.Error("did not expect score in transmission-level response")
}
}
}
@@ -188,8 +209,9 @@ func TestQueryPacketsWithTypeFilter(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if result.Total != 2 {
t.Errorf("expected 2 ADVERT packets, got %d", result.Total)
// 1 transmission with payload_type=4 (has 2 observations, but we return transmissions)
if result.Total != 1 {
t.Errorf("expected 1 ADVERT transmission, got %d", result.Total)
}
}
@@ -477,9 +499,9 @@ func TestGetTransmissionByIDNotFound(t *testing.T) {
defer db.Close()
seedTestData(t, db)
_, err := db.GetTransmissionByID(9999)
if err == nil {
t.Error("expected error for nonexistent transmission")
result, _ := db.GetTransmissionByID(9999)
if result != nil {
t.Error("expected nil result for nonexistent transmission")
}
}
@@ -488,9 +510,9 @@ func TestGetPacketByHashNotFound(t *testing.T) {
defer db.Close()
seedTestData(t, db)
_, err := db.GetPacketByHash("nonexistenthash1")
if err == nil {
t.Error("expected error for nonexistent hash")
result, _ := db.GetPacketByHash("nonexistenthash1")
if result != nil {
t.Error("expected nil result for nonexistent hash")
}
}
@@ -737,8 +759,9 @@ func TestBuildPacketWhereFilters(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if result.Total != 2 {
t.Errorf("expected 2 results for hash filter, got %d", result.Total)
// 1 transmission with this hash (has 2 observations, but transmission-centric)
if result.Total != 1 {
t.Errorf("expected 1 result for hash filter, got %d", result.Total)
}
})

View File

@@ -486,7 +486,7 @@ func (s *Server) handleNodeDetail(w http.ResponseWriter, r *http.Request) {
if n, ok := node["name"]; ok && n != nil {
name = fmt.Sprintf("%v", n)
}
recentAdverts, _ := s.db.GetRecentPacketsForNode(pubkey, name, 20)
recentAdverts, _ := s.db.GetRecentTransmissionsForNode(pubkey, name, 20)
writeJSON(w, map[string]interface{}{
"node": node,

View File

@@ -71,8 +71,8 @@ func TestPacketsEndpoint(t *testing.T) {
if !ok {
t.Fatal("expected packets array")
}
if len(packets) != 3 {
t.Errorf("expected 3 packets, got %d", len(packets))
if len(packets) != 2 {
t.Errorf("expected 2 packets (transmissions), got %d", len(packets))
}
}
@@ -1510,7 +1510,8 @@ func TestHandlerErrorPackets(t *testing.T) {
router := mux.NewRouter()
srv.RegisterRoutes(router)
db.conn.Exec("DROP VIEW IF EXISTS packets_v")
// Drop transmissions table to trigger error in transmission-centric query
db.conn.Exec("DROP TABLE IF EXISTS transmissions")
req := httptest.NewRequest("GET", "/api/packets?limit=10", nil)
w := httptest.NewRecorder()