fix(packets): order by ingest id, not rxTime — fresh activity visible on packets page (#1345) (#1349)

## Summary
Fixes #1345 — the packets page shows "no recent activity" while MQTT
ingest is healthy because the default `/api/packets` query was `ORDER BY
first_seen DESC`, and PR #1233 redefined `first_seen` as the observer's
radio receive time (rxTime). When an observer buffers offline and
uploads hours later, its packets land with hours-old `first_seen`
values; older-ingested packets with fresher rxTime then crowd the top of
the list and the visually freshest activity disappears.

## Fix
Switch the default ordering to `t.id DESC` (ingest order) on
`/api/packets` and the closely-related endpoints. `id` is monotonic with
ingest time and immune to buffered uploads.

Endpoints changed (all use the same fix for the same reason):

| Path | Function | File |
|------|----------|------|
| `GET /api/packets` (default) | `DB.QueryPackets`, `Store.QueryPackets`
| `cmd/server/db.go`, `cmd/server/store.go` |
| `GET /api/packets?nodes=…` | `DB.QueryMultiNodePackets`,
`Store.QueryMultiNodePackets` | same |
| Node detail "recent transmissions" |
`DB.GetRecentTransmissionsForNode` | `cmd/server/db.go` |

## `since=` semantic — preserved
`since=` still filters by `first_seen` (RFC3339 path uses the
observations.timestamp subquery), i.e. "packets the network received
since X." Buffered uploads of older packets are still excluded from a
`since=15m` view even if they were ingested in the last 15 minutes. Only
the **display order** changes; filtering by receive time is unchanged.

## Audit — NOT changed
- `Store.QueryGroupedPackets` already sorts by `LatestSeen` (max
observation timestamp), which is correct for the grouped view and immune
to the buffered-upload regression.
- `GetChannelMessages` and channel `sample_json` subqueries keep
`first_seen DESC` — channel message chronology is meaningful for message
UX; if buffered uploads become a problem here too it's a separate UX
call (out of scope for #1345).
- `s.packets` insertion ordering (Load + ingest) — untouched. The fix
sorts at query time so we don't perturb `oldestLoaded` invariants.

## Tests — TDD red → green
- Red: `508f4371` adds `cmd/server/packets_order_test.go` with two cases
— order assertion (failed on master with `[fresh, buffered]`) and
since-filter semantic (RFC3339 path uses observation timestamps).
- Green: `0fd685e7` switches the SQL + in-memory ordering. Tests pass;
full `cmd/server` suite green locally (44s).

## Out of scope
- Re-thinking #1233's first_seen semantics
- Adding a UI sort toggle (issue's option 2)
- Channel-message page ordering

## Preflight
Clean (`bash ~/.openclaw/skills/pr-preflight/scripts/run-all.sh
origin/master`).

---------

Co-authored-by: openclaw-bot <bot@openclaw.local>
This commit is contained in:
Kpa-clawbot
2026-05-25 22:32:00 -07:00
committed by GitHub
parent dc6c79cff8
commit 9d3dd8df0a
3 changed files with 143 additions and 5 deletions
+13 -3
View File
@@ -493,8 +493,14 @@ func (db *DB) QueryPackets(q PacketQuery) (*PacketResult, error) {
db.conn.QueryRow(countSQL, args...).Scan(&total)
}
// #1345: order by ingest id, NOT first_seen. PR #1233 made first_seen=rxTime,
// so buffered-then-uploaded observer packets with hours-old rxTime were
// sorting to the top/middle and hiding fresh ingest. Ordering by id keeps
// "latest activity" semantically equal to "what we ingested last" — which
// is what the packets page is showing. The `since=` filter still uses
// first_seen / observation timestamp, preserving "received-by-radio since X."
selectCols, observerJoin := db.transmissionBaseSQL()
querySQL := fmt.Sprintf("SELECT %s FROM transmissions t %s %s ORDER BY t.first_seen %s LIMIT ? OFFSET ?",
querySQL := fmt.Sprintf("SELECT %s FROM transmissions t %s %s ORDER BY t.id %s LIMIT ? OFFSET ?",
selectCols, observerJoin, w, q.Order)
qArgs := make([]interface{}, len(args))
@@ -1013,7 +1019,10 @@ func (db *DB) GetRecentTransmissionsForNode(pubkey string, limit int) ([]map[str
selectCols, observerJoin := db.transmissionBaseSQL()
querySQL := fmt.Sprintf("SELECT %s FROM transmissions t %s WHERE t.from_pubkey = ? ORDER BY t.first_seen DESC LIMIT ?",
// #1345: order by ingest id, not first_seen (=rxTime). Buffered observer
// uploads with old rxTime would otherwise displace fresh activity from
// the "recent transmissions for node" list.
querySQL := fmt.Sprintf("SELECT %s FROM transmissions t %s WHERE t.from_pubkey = ? ORDER BY t.id DESC LIMIT ?",
selectCols, observerJoin)
args := []interface{}{pubkey, limit}
@@ -2013,7 +2022,8 @@ func (db *DB) QueryMultiNodePackets(pubkeys []string, limit, offset int, order,
db.conn.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM transmissions t %s", w), args...).Scan(&total)
selectCols, observerJoin := db.transmissionBaseSQL()
querySQL := fmt.Sprintf("SELECT %s FROM transmissions t %s %s ORDER BY t.first_seen %s LIMIT ? OFFSET ?",
// #1345: order by ingest id (see QueryPackets comment above).
querySQL := fmt.Sprintf("SELECT %s FROM transmissions t %s %s ORDER BY t.id %s LIMIT ? OFFSET ?",
selectCols, observerJoin, w, order)
qArgs := make([]interface{}, len(args))
+114
View File
@@ -0,0 +1,114 @@
package main
import (
"testing"
"time"
)
// TestQueryPacketsOrdersByIngestID is the regression test for issue #1345.
//
// PR #1233 changed `first_seen` to be the observer's receive time (rxTime),
// not the moment the server ingested the row. When an observer buffers
// offline and uploads hours later, its packets land with old first_seen
// values. The /api/packets handler previously ordered by
// `first_seen DESC`, so buffered uploads with old rxTime appeared at the
// bottom while older-ingested packets with newer rxTime took the top —
// users on the packets page saw "no recent activity" even though MQTT
// ingest was active.
//
// Fix: default ordering for /api/packets is `t.id DESC` (ingest order).
// This test inserts two rows where row order by id and order by
// first_seen DISAGREE, then asserts the result is ordered by id DESC.
func TestQueryPacketsOrdersByIngestID(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
now := time.Now().UTC()
// Row A: ingested FIRST (lower id), rxTime "newer" (fresher first_seen)
freshFirstSeen := now.Add(-1 * time.Hour).Format(time.RFC3339)
// Row B: ingested SECOND (higher id), rxTime "older" — simulating a
// buffered observer upload that arrived after row A but contains a
// packet the radio received hours earlier.
bufferedFirstSeen := now.Add(-6 * time.Hour).Format(time.RFC3339)
if _, err := db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, payload_type)
VALUES ('AA', 'hashfresh00000001', ?, 4)`, freshFirstSeen); err != nil {
t.Fatal(err)
}
if _, err := db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, payload_type)
VALUES ('BB', 'hashbuffered00002', ?, 4)`, bufferedFirstSeen); err != nil {
t.Fatal(err)
}
result, err := db.QueryPackets(PacketQuery{Limit: 50, Order: "DESC"})
if err != nil {
t.Fatal(err)
}
if len(result.Packets) != 2 {
t.Fatalf("expected 2 packets, got %d", len(result.Packets))
}
// With first_seen DESC (the bug), the order would be [fresh, buffered]
// because the fresh row has the newer rxTime. With the fix (id DESC),
// order is [buffered, fresh] because the buffered row was ingested
// second and has the higher id.
first, _ := result.Packets[0]["hash"].(string)
second, _ := result.Packets[1]["hash"].(string)
if first != "hashbuffered00002" || second != "hashfresh00000001" {
t.Errorf("expected order [buffered, fresh] by ingest id DESC, got [%s, %s]",
first, second)
}
}
// TestQueryPacketsSinceFilterUsesFirstSeen documents the chosen semantic for
// the `since=` query param: it still filters by `first_seen` (radio receive
// time), NOT by ingest time. Rationale: callers using `since=` expect
// "packets the network received since X" — buffered uploads of older
// packets should still be EXCLUDED from a `since=15min` view even if
// they were ingested in the last 15 minutes. Display order is by ingest
// id (issue #1345 fix); filter semantic is unchanged.
func TestQueryPacketsSinceFilterUsesFirstSeen(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
now := time.Now().UTC()
recent := now.Add(-30 * time.Minute).Format(time.RFC3339)
old := now.Add(-6 * time.Hour).Format(time.RFC3339)
sinceCutoff := now.Add(-1 * time.Hour).Format(time.RFC3339)
recentEpoch := now.Add(-30 * time.Minute).Unix()
oldEpoch := now.Add(-6 * time.Hour).Unix()
if _, err := db.conn.Exec(`INSERT INTO observers (id, name, last_seen, first_seen, packet_count)
VALUES ('obs1', 'Obs1', ?, ?, 1)`, recent, recent); err != nil {
t.Fatal(err)
}
if _, err := db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, payload_type)
VALUES ('AA', 'recentrx00000001', ?, 4)`, recent); err != nil {
t.Fatal(err)
}
// Buffered upload — ingested SECOND, but rxTime is 6h ago.
if _, err := db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, payload_type)
VALUES ('BB', 'oldrxbuffered001', ?, 4)`, old); err != nil {
t.Fatal(err)
}
if _, err := db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
VALUES (1, 1, 10, -90, '[]', ?)`, recentEpoch); err != nil {
t.Fatal(err)
}
if _, err := db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
VALUES (2, 1, 10, -90, '[]', ?)`, oldEpoch); err != nil {
t.Fatal(err)
}
result, err := db.QueryPackets(PacketQuery{Limit: 50, Order: "DESC", Since: sinceCutoff})
if err != nil {
t.Fatal(err)
}
if len(result.Packets) != 1 {
t.Fatalf("since= should filter by first_seen (rxTime); expected 1 packet, got %d",
len(result.Packets))
}
h, _ := result.Packets[0]["hash"].(string)
if h != "recentrx00000001" {
t.Errorf("expected the rxTime-recent packet, got %s", h)
}
}
+16 -2
View File
@@ -1373,6 +1373,20 @@ func (s *PacketStore) QueryPackets(q PacketQuery) *PacketResult {
results := s.filterPackets(q)
total := len(results)
// #1345: order by ingest id, not insertion-into-s.packets order. After
// Load() (which orders by first_seen ASC) the slice is mostly id-ordered
// EXCEPT where rxTime ≠ ingest time — exactly the buffered-observer-upload
// case that hides fresh activity. Sort by ID DESC so "page 0" is always
// the most-recently-ingested transmissions, matching the DB-path fix.
// Cost: O(n log n) on the filtered set per query; acceptable for the
// typical filter-then-paginate flow (filterPackets already O(n)).
sortedByID := make([]*StoreTx, len(results))
copy(sortedByID, results)
sort.Slice(sortedByID, func(i, j int) bool {
return sortedByID[i].ID < sortedByID[j].ID
})
results = sortedByID
// results is oldest-first (ASC). For DESC (default) read backwards from the tail;
// for ASC read forwards. Both are O(page_size) — no sort copy needed.
start := q.Offset
@@ -1956,9 +1970,9 @@ func (s *PacketStore) QueryMultiNodePackets(pubkeys []string, limit, offset int,
filtered = append(filtered, tx)
}
}
// Sort oldest-first to match pagination expectations (same as s.packets order).
// #1345: sort by ingest id, not first_seen (=rxTime).
sort.Slice(filtered, func(i, j int) bool {
return filtered[i].FirstSeen < filtered[j].FirstSeen
return filtered[i].ID < filtered[j].ID
})
total := len(filtered)