mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-06-03 18:51:18 +00:00
fix(#1366): channels view shows latest message time — backend emits LatestSeen, not FirstSeen (#1368)
Red commit: 702d82eb5e (CI: see Actions
tab for fix/issue-1366)
## What
Channel view emits the max observation timestamp (`tx.LatestSeen`)
instead of the analyzer's first-observation time (`tx.FirstSeen`) as the
rendered `timestamp` field. A new `first_seen` field is exposed
alongside for debug surfaces. `sender_timestamp` continues to be
returned in the JSON response but is intentionally NOT used as the
rendered time (client clocks are unreliable).
## Root cause
Two parallel call sites both emitted the wrong field:
- `cmd/server/store.go` — `GetChannelMessages` (~line 4807): set
`entry.Data["timestamp"] = strOrNil(tx.FirstSeen)` for every new dedup
entry. `tx.FirstSeen` is the analyzer's first-ever observation time of a
`transmissions.hash` row; for heartbeat-style packets (e.g. `BlorkoBot
🤖` posting the same status line periodically), the hash is stable, so
FirstSeen stays pinned at the very first observation while the message
keeps retransmitting hours later. Operator sees "old" message timestamps
for live messages.
- `cmd/server/db.go` — `GetChannelMessages` (~line 1757): same problem
against the SQLite-backed query path. Used `nullStr(fs)` (where `fs` is
`t.first_seen`) for the `timestamp` field.
### Repro from staging
Same packet, same hash `aba4f0493249de57`, sender `BlorkoBot 🤖`:
- `/api/channels/%23test/messages` → `timestamp: "2026-05-25T15:53:20Z"`
(FirstSeen, 7h+ in the past)
- `/api/packets?hash=aba4f0493249de57` → `first_seen:
"2026-05-25T22:53:19Z"` (latest obs), `observation_count: 84`
The packets view used max-obs correctly; the channels view did not. 7h
gap matches operator screenshot.
## TDD red → green
Red: `cmd/server/channels_message_order_1366_test.go` — three tests:
- `TestChannelMessages_TimestampUsesLatestSeen`: seeds a CHAN tx with
observations 7h apart, asserts returned `timestamp` ≈ latest observation
epoch (±1s). Fails under FirstSeen with Δ=−25200s.
- `TestChannelMessages_TimestampNotSenderTimestamp`: seeds a CHAN tx
whose decoded `sender_timestamp` is year-2000 (bad RTC). Asserts the
rendered `timestamp` parses to current year — guards against the
tempting "just use sender_timestamp" alt-fix that would let bad client
clocks corrupt the view.
- `TestChannelMessages_TimestampIsUTCZ`: asserts the emitted string is
unambiguously UTC (suffix `Z` or `+00:00`) so browsers don't apply a
local-zone shift.
Green commit changes:
- `store.go`: emit `tx.LatestSeen` (with FirstSeen fallback if no obs);
add `first_seen` field.
- `db.go`: join `o.timestamp` per-observation, track max epoch per tx,
emit RFC3339 UTC at the end; add `first_seen` field.
`sender_timestamp` remains in the response — unchanged shape, frontend
never read it for the rendered time (verified: only `msg.timestamp` is
consumed in `public/channels.js:1902`).
## Manual verification (post-merge)
1. Deploy to staging.
2. Curl `/api/channels/%23test/messages?limit=5` and
`/api/packets?hash=<recent>`. The channel `timestamp` field MUST equal
the packets `first_seen` (max obs) for the same hash, NOT lag it.
3. Send a fresh GRP_TXT via a MeshCore client into a watched channel.
Within 15s, refresh the Channels view at `/channels`. The new message
MUST render at the bottom with the correct (current) time.
## Why not `sender_timestamp`?
It's a per-client field, decoded from the payload. Many MeshCore
firmware builds run without RTC/NTP/GPS and report bogus values.
Trusting it for display would propagate bad client clocks into the
analyzer UI — the analyzer is the source of truth for UTC, not the
client.
Fixes #1366
---------
Co-authored-by: CoreScope Bot <bot@corescope>
Co-authored-by: bot <bot@kpa-clawbot.dev>
Co-authored-by: openclaw-bot <bot@openclaw.local>
This commit is contained in:
@@ -0,0 +1,354 @@
|
||||
package main
|
||||
|
||||
// Regression tests for issue #1366: Channel view shows stale timestamps
|
||||
// because GetChannelMessages emits tx.FirstSeen (first-observation time)
|
||||
// when the operator-visible expectation is the latest observation time
|
||||
// (tx.LatestSeen). For repeated heartbeat-style messages whose tx.Hash is
|
||||
// stable, FirstSeen stays pinned to the very first observation while the
|
||||
// real-world transmission keeps repeating, producing a multi-hour gap
|
||||
// between the channel view and the operator's live MeshCore client.
|
||||
//
|
||||
// Server-side UTC clocks are trusted; client-reported sender_timestamp
|
||||
// is NOT (firmware lacks reliable wall-clock on many builds). Therefore
|
||||
// the fix uses tx.LatestSeen (== max observation timestamp), NOT
|
||||
// sender_timestamp. sender_timestamp remains exposed in the response
|
||||
// for debug surfaces but MUST NOT be the rendered field.
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TestChannelMessages_TimestampUsesLatestSeen: a CHAN tx with multiple
|
||||
// observations spanning hours must render with the LATEST observation
|
||||
// timestamp, not the first-seen ingest time.
|
||||
func TestChannelMessages_TimestampUsesLatestSeen(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
defer db.Close()
|
||||
|
||||
now := time.Now().UTC()
|
||||
firstSeen := now.Add(-7 * time.Hour).Format(time.RFC3339)
|
||||
firstSeenEpoch := now.Add(-7 * time.Hour).Unix()
|
||||
laterEpoch := now.Add(-5 * time.Minute).Unix()
|
||||
_ = laterEpoch
|
||||
|
||||
db.conn.Exec(`INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count)
|
||||
VALUES ('obsA', 'ObsA', 'SJC', ?, '2026-01-01T00:00:00Z', 10)`, firstSeen)
|
||||
db.conn.Exec(`INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count)
|
||||
VALUES ('obsB', 'ObsB', 'LAX', ?, '2026-01-01T00:00:00Z', 10)`, firstSeen)
|
||||
|
||||
// One transmission with two observations: T0 (7h ago) and T1 (5m ago).
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash)
|
||||
VALUES ('AA01', 'hash_repeated_msg', ?, 1, 5,
|
||||
'{"type":"CHAN","channel":"#test","text":"Heartbeat: ping","sender":"Heartbeat","sender_timestamp":` +
|
||||
strconv.FormatInt(firstSeenEpoch, 10) + `}',
|
||||
'#test')`, firstSeen)
|
||||
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
|
||||
VALUES (1, 1, 10.0, -90, '["aa"]', ?)`, firstSeenEpoch)
|
||||
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
|
||||
VALUES (1, 2, 11.0, -88, '["bb"]', ?)`, laterEpoch)
|
||||
|
||||
store := NewPacketStore(db, nil)
|
||||
store.Load()
|
||||
|
||||
msgs, total := store.GetChannelMessages("#test", 10, 0)
|
||||
if total != 1 {
|
||||
t.Fatalf("want 1 msg, got %d (msgs=%+v)", total, msgs)
|
||||
}
|
||||
got, _ := msgs[0]["timestamp"].(string)
|
||||
gotParsed, err := time.Parse(time.RFC3339, got)
|
||||
if err != nil {
|
||||
// Try the milli-second precision form that SQLite strftime emits.
|
||||
gotParsed, err = time.Parse("2006-01-02T15:04:05.000Z", got)
|
||||
if err != nil {
|
||||
gotParsed, err = time.Parse("2006-01-02T15:04:05.000Z07:00", got)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("timestamp not parseable: %q (%v)", got, err)
|
||||
}
|
||||
// LatestSeen should equal the laterEpoch observation (±1s).
|
||||
if delta := gotParsed.Unix() - laterEpoch; delta < -1 || delta > 1 {
|
||||
t.Errorf("timestamp: want ~%s (LatestSeen, observation at T-5m), got %q (Δ=%ds — likely FirstSeen, issue #1366)",
|
||||
time.Unix(laterEpoch, 0).UTC().Format(time.RFC3339), got, delta)
|
||||
}
|
||||
|
||||
// first_seen MUST also be exposed separately so the UI/debug can see
|
||||
// when the analyzer first heard the packet (older than `timestamp`).
|
||||
fs, _ := msgs[0]["first_seen"].(string)
|
||||
if fs == "" {
|
||||
t.Errorf("first_seen field must be exposed alongside timestamp; got empty")
|
||||
}
|
||||
if fs == got {
|
||||
t.Errorf("first_seen should differ from latest-seen timestamp (both = %q)", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestChannelMessages_TimestampNotSenderTimestamp: a CHAN tx whose
|
||||
// decoded sender_timestamp is wildly off (e.g. client with bad RTC)
|
||||
// must NOT cause the rendered timestamp to drift. Rendered timestamp
|
||||
// must remain server UTC (LatestSeen/FirstSeen), regardless of what
|
||||
// the client claimed.
|
||||
func TestChannelMessages_TimestampNotSenderTimestamp(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
defer db.Close()
|
||||
|
||||
now := time.Now().UTC()
|
||||
firstSeen := now.Add(-10 * time.Minute).Format(time.RFC3339)
|
||||
firstSeenEpoch := now.Add(-10 * time.Minute).Unix()
|
||||
|
||||
// Client claims it sent the message in year 2000 (bad RTC).
|
||||
badSenderTs := int64(946684800) // 2000-01-01 UTC
|
||||
|
||||
db.conn.Exec(`INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count)
|
||||
VALUES ('obsX', 'ObsX', 'SJC', ?, '2026-01-01T00:00:00Z', 1)`, firstSeen)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash)
|
||||
VALUES ('BB01', 'hash_bad_clock', ?, 1, 5,
|
||||
'{"type":"CHAN","channel":"#bad","text":"Alice: ping","sender":"Alice","sender_timestamp":` +
|
||||
strconv.FormatInt(badSenderTs, 10) + `}',
|
||||
'#bad')`, firstSeen)
|
||||
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
|
||||
VALUES (1, 1, 10.0, -90, '["aa"]', ?)`, firstSeenEpoch)
|
||||
|
||||
store := NewPacketStore(db, nil)
|
||||
store.Load()
|
||||
|
||||
msgs, total := store.GetChannelMessages("#bad", 10, 0)
|
||||
if total != 1 {
|
||||
t.Fatalf("want 1 msg, got %d", total)
|
||||
}
|
||||
got, _ := msgs[0]["timestamp"].(string)
|
||||
// MUST be the server-side observation time, parseable as RFC3339, and
|
||||
// within ~1h of now — NOT the year-2000 client value.
|
||||
parsed, err := time.Parse(time.RFC3339, got)
|
||||
if err != nil {
|
||||
t.Fatalf("timestamp not RFC3339: %q (%v)", got, err)
|
||||
}
|
||||
if parsed.Year() < now.Year() {
|
||||
t.Errorf("rendered timestamp %q took on the client's bad sender_timestamp (year %d) instead of server UTC",
|
||||
got, parsed.Year())
|
||||
}
|
||||
}
|
||||
|
||||
// TestChannelMessages_TimestampIsUTCZ: rendered timestamp MUST end with
|
||||
// 'Z' (or +00:00) so the browser does NOT interpret it as a local-zone
|
||||
// string and shift by the operator's TZ offset.
|
||||
func TestChannelMessages_TimestampIsUTCZ(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
defer db.Close()
|
||||
|
||||
now := time.Now().UTC()
|
||||
fs := now.Add(-30 * time.Minute).Format(time.RFC3339)
|
||||
ep := now.Add(-30 * time.Minute).Unix()
|
||||
|
||||
db.conn.Exec(`INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count)
|
||||
VALUES ('obsZ', 'ObsZ', 'SJC', ?, '2026-01-01T00:00:00Z', 1)`, fs)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash)
|
||||
VALUES ('ZZ01', 'hash_zone_check', ?, 1, 5,
|
||||
'{"type":"CHAN","channel":"#zone","text":"Carol: ping","sender":"Carol"}',
|
||||
'#zone')`, fs)
|
||||
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
|
||||
VALUES (1, 1, 11.0, -89, '["zz"]', ?)`, ep)
|
||||
|
||||
store := NewPacketStore(db, nil)
|
||||
store.Load()
|
||||
|
||||
msgs, _ := store.GetChannelMessages("#zone", 10, 0)
|
||||
if len(msgs) != 1 {
|
||||
t.Fatalf("want 1 msg, got %d", len(msgs))
|
||||
}
|
||||
ts, _ := msgs[0]["timestamp"].(string)
|
||||
if ts == "" {
|
||||
t.Fatal("empty timestamp")
|
||||
}
|
||||
n := len(ts)
|
||||
if !(ts[n-1] == 'Z' || (n >= 6 && ts[n-6:] == "+00:00")) {
|
||||
t.Errorf("timestamp not UTC-suffixed (Z/+00:00): %q", ts)
|
||||
}
|
||||
}
|
||||
|
||||
// TestChannelMessages_OrderedByLatestSeen: adversarial follow-up to #1366
|
||||
// (PR #1368). The earlier fix only adjusted the rendered `timestamp`
|
||||
// field; page SELECTION and SORT ORDER on both the in-memory and DB
|
||||
// paths still used FirstSeen. This test pins the contract:
|
||||
//
|
||||
// - tx-A: FirstSeen 24h ago, LatestSeen NOW (via a fresh observation).
|
||||
// - tx-B: FirstSeen 1h ago, LatestSeen 1h ago (single observation).
|
||||
//
|
||||
// Both paths MUST:
|
||||
// 1. Return BOTH transmissions in a small (limit=10) page — tx-A must
|
||||
// not be excluded because its FirstSeen is old.
|
||||
// 2. Return tx-A AFTER tx-B (newest-LatestSeen-LAST), matching the
|
||||
// tail-of-msgOrder convention used by the rest of the API and
|
||||
// the frontend's scrollToBottom().
|
||||
func TestChannelMessages_OrderedByLatestSeen_InMemory(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
defer db.Close()
|
||||
|
||||
now := time.Now().UTC()
|
||||
tOld := now.Add(-24 * time.Hour)
|
||||
tMid := now.Add(-1 * time.Hour)
|
||||
tNewest := now.Add(-30 * time.Minute)
|
||||
tFresh := now.Add(-1 * time.Minute)
|
||||
|
||||
tOldStr := tOld.Format(time.RFC3339)
|
||||
tMidStr := tMid.Format(time.RFC3339)
|
||||
tNewestStr := tNewest.Format(time.RFC3339)
|
||||
|
||||
db.conn.Exec(`INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count)
|
||||
VALUES ('obsO', 'ObsO', 'SJC', ?, '2026-01-01T00:00:00Z', 10)`, tOldStr)
|
||||
|
||||
// tx-A: FirstSeen 24h ago, LatestSeen NOW (T-1m). Old insertion order.
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash)
|
||||
VALUES ('AAAA', 'order_hash_a', ?, 1, 5,
|
||||
'{"type":"CHAN","channel":"#ord","text":"Alpha: hb","sender":"Alpha"}', '#ord')`, tOldStr)
|
||||
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
|
||||
VALUES (1, 1, 10.0, -90, '["aa"]', ?)`, tOld.Unix())
|
||||
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
|
||||
VALUES (1, 1, 11.0, -88, '["aa"]', ?)`, tFresh.Unix())
|
||||
|
||||
// tx-B: FirstSeen 1h ago, LatestSeen 1h ago. OLDEST.
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash)
|
||||
VALUES ('BBBB', 'order_hash_b', ?, 1, 5,
|
||||
'{"type":"CHAN","channel":"#ord","text":"Bravo: msg","sender":"Bravo"}', '#ord')`, tMidStr)
|
||||
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
|
||||
VALUES (2, 1, 9.0, -91, '["bb"]', ?)`, tMid.Unix())
|
||||
|
||||
// tx-C: FirstSeen 30m ago, LatestSeen 30m ago. Middle.
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash)
|
||||
VALUES ('CCCC', 'order_hash_c', ?, 1, 5,
|
||||
'{"type":"CHAN","channel":"#ord","text":"Charlie: msg","sender":"Charlie"}', '#ord')`, tNewestStr)
|
||||
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
|
||||
VALUES (3, 1, 9.0, -91, '["cc"]', ?)`, tNewest.Unix())
|
||||
|
||||
store := NewPacketStore(db, nil)
|
||||
store.Load()
|
||||
|
||||
// Full-page: ordering check (fix #1 gates this — without sort,
|
||||
// msgOrder is insertion order and Alpha lands FIRST, not LAST).
|
||||
msgsAll, totalAll := store.GetChannelMessages("#ord", 10, 0)
|
||||
if totalAll != 3 {
|
||||
t.Fatalf("in-memory: want total=3, got %d", totalAll)
|
||||
}
|
||||
if len(msgsAll) != 3 {
|
||||
t.Fatalf("in-memory: want 3 msgs, got %d", len(msgsAll))
|
||||
}
|
||||
wantOrder := []string{"Bravo", "Charlie", "Alpha"}
|
||||
for i, want := range wantOrder {
|
||||
got, _ := msgsAll[i]["sender"].(string)
|
||||
if got != want {
|
||||
t.Errorf("in-memory: msg[%d] want sender=%q, got %q (LatestSeen ASC, fix #1)", i, want, got)
|
||||
}
|
||||
}
|
||||
|
||||
// Small page (limit=2): tx-A (Alpha) MUST be included because its
|
||||
// LatestSeen is freshest, even though FirstSeen is oldest. Without
|
||||
// fix #1, the in-memory path takes msgOrder[total-2:] which would
|
||||
// drop Alpha (it sits at msgOrder[0] by insertion order).
|
||||
msgsPage, _ := store.GetChannelMessages("#ord", 2, 0)
|
||||
if len(msgsPage) != 2 {
|
||||
t.Fatalf("in-memory: want 2 msgs at limit=2, got %d", len(msgsPage))
|
||||
}
|
||||
hasAlpha := false
|
||||
for _, m := range msgsPage {
|
||||
if s, _ := m["sender"].(string); s == "Alpha" {
|
||||
hasAlpha = true
|
||||
}
|
||||
}
|
||||
if !hasAlpha {
|
||||
t.Errorf("in-memory: tx-A (Alpha) excluded from limit=2 page — FirstSeen-based tail selection bug (fix #1 reverted?)")
|
||||
}
|
||||
}
|
||||
|
||||
func TestChannelMessages_OrderedByLatestSeen_DB(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
defer db.Close()
|
||||
|
||||
now := time.Now().UTC()
|
||||
tOld := now.Add(-24 * time.Hour)
|
||||
tMid := now.Add(-1 * time.Hour)
|
||||
tNewest := now.Add(-30 * time.Minute)
|
||||
tFresh := now.Add(-1 * time.Minute)
|
||||
|
||||
tOldStr := tOld.Format(time.RFC3339)
|
||||
tMidStr := tMid.Format(time.RFC3339)
|
||||
tNewestStr := tNewest.Format(time.RFC3339)
|
||||
|
||||
db.conn.Exec(`INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count)
|
||||
VALUES ('obsD', 'ObsD', 'SJC', ?, '2026-01-01T00:00:00Z', 10)`, tOldStr)
|
||||
|
||||
// tx-A: FirstSeen 24h ago, observations at T-24h and T-1m (LatestSeen
|
||||
// = T-1m, the FRESHEST). Despite the freshest LatestSeen, a
|
||||
// FirstSeen-DESC selection would push it OFF a small page.
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash)
|
||||
VALUES ('AADB', 'order_db_hash_a', ?, 1, 5,
|
||||
'{"type":"CHAN","channel":"#ordb","text":"Alpha: hb","sender":"Alpha"}', '#ordb')`, tOldStr)
|
||||
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
|
||||
VALUES (1, 1, 10.0, -90, '["aa"]', ?)`, tOld.Unix())
|
||||
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
|
||||
VALUES (1, 1, 11.0, -88, '["aa"]', ?)`, tFresh.Unix())
|
||||
|
||||
// tx-B: FirstSeen 1h ago, LatestSeen 1h ago. OLDEST LatestSeen.
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash)
|
||||
VALUES ('BBDB', 'order_db_hash_b', ?, 1, 5,
|
||||
'{"type":"CHAN","channel":"#ordb","text":"Bravo: msg","sender":"Bravo"}', '#ordb')`, tMidStr)
|
||||
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
|
||||
VALUES (2, 1, 9.0, -91, '["bb"]', ?)`, tMid.Unix())
|
||||
|
||||
// tx-C: FirstSeen 30m ago, LatestSeen 30m ago. Middle LatestSeen.
|
||||
// With FirstSeen-DESC selection + limit=2, page = [tx-C, tx-B] and
|
||||
// tx-A is EXCLUDED — that's the selection bug fix #2 gates.
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash)
|
||||
VALUES ('CCDB', 'order_db_hash_c', ?, 1, 5,
|
||||
'{"type":"CHAN","channel":"#ordb","text":"Charlie: msg","sender":"Charlie"}', '#ordb')`, tNewestStr)
|
||||
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
|
||||
VALUES (3, 1, 9.0, -91, '["cc"]', ?)`, tNewest.Unix())
|
||||
|
||||
msgs, total, err := db.GetChannelMessages("#ordb", 2, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if total != 3 {
|
||||
t.Fatalf("DB: want total=3, got %d", total)
|
||||
}
|
||||
if len(msgs) != 2 {
|
||||
t.Fatalf("DB: want 2 msgs in page (limit=2), got %d", len(msgs))
|
||||
}
|
||||
// Selection (fix #2): the page MUST include tx-A (Alpha) because its
|
||||
// LatestSeen is the newest — even though its FirstSeen is the OLDEST.
|
||||
// With limit=2 + LatestSeen-DESC selection, page = [Alpha, Charlie].
|
||||
// Returned ASC by LatestSeen (newest LAST, fix #3) = [Charlie, Alpha].
|
||||
sender0, _ := msgs[0]["sender"].(string)
|
||||
sender1, _ := msgs[1]["sender"].(string)
|
||||
if sender0 != "Charlie" || sender1 != "Alpha" {
|
||||
t.Errorf("DB: want order [Charlie, Alpha] (page selected by LatestSeen DESC, returned ASC, fix #2+#3), got [%q, %q]",
|
||||
sender0, sender1)
|
||||
}
|
||||
hasAlpha := false
|
||||
for _, m := range msgs {
|
||||
if s, _ := m["sender"].(string); s == "Alpha" {
|
||||
hasAlpha = true
|
||||
}
|
||||
}
|
||||
if !hasAlpha {
|
||||
t.Errorf("DB: tx-A (Alpha) excluded from page — FirstSeen-based selection bug (fix #2 reverted?)")
|
||||
}
|
||||
|
||||
// Also exercise large-page case (limit > total): ordering-only check.
|
||||
msgsAll, totalAll, err := db.GetChannelMessages("#ordb", 10, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if totalAll != 3 || len(msgsAll) != 3 {
|
||||
t.Fatalf("DB: want all 3 msgs at limit=10, got total=%d len=%d", totalAll, len(msgsAll))
|
||||
}
|
||||
// Expected ASC by LatestSeen: Bravo (T-1h), Charlie (T-30m), Alpha (T-1m).
|
||||
wantOrder := []string{"Bravo", "Charlie", "Alpha"}
|
||||
for i, want := range wantOrder {
|
||||
got, _ := msgsAll[i]["sender"].(string)
|
||||
if got != want {
|
||||
t.Errorf("DB: msg[%d] want sender=%q, got %q (full order: must be LatestSeen ASC, fix #3)", i, want, got)
|
||||
}
|
||||
}
|
||||
}
|
||||
+73
-28
@@ -1633,27 +1633,38 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region .
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
// 2) Page of transmission IDs — newest LIMIT msgs minus OFFSET, returned
|
||||
// in ASC order to match prior API contract (tail of message log).
|
||||
pageSQL := `SELECT t.id FROM (
|
||||
SELECT id FROM transmissions
|
||||
WHERE channel_hash = ? AND payload_type = 5
|
||||
ORDER BY first_seen DESC
|
||||
LIMIT ? OFFSET ?
|
||||
) t`
|
||||
// When a region filter is in play, we must filter on the inner subquery
|
||||
// against the transmissions table — re-use the same EXISTS form but
|
||||
// wrap so we still get DESC-then-ASC pagination.
|
||||
// 2) Page of transmission IDs — newest LIMIT msgs minus OFFSET.
|
||||
// Issue #1366 follow-up (fix #2): select page by latest observation
|
||||
// timestamp (LatestSeen) DESC, NOT by t.first_seen DESC — otherwise
|
||||
// a heartbeat tx whose FirstSeen is 24h old but whose latest
|
||||
// observation is fresh gets pushed off page 1.
|
||||
//
|
||||
// PR #1368 perf fix: use a correlated subquery for MAX(timestamp) per
|
||||
// transmission. With the composite index idx_observations_tx_ts
|
||||
// (transmission_id, timestamp) sqlite resolves MAX as an index-only
|
||||
// rightmost-leaf lookup — total O(N_tx · log N_obs). The previously-
|
||||
// used grouped derived table (`GROUP BY transmission_id` over the
|
||||
// whole observations table) scanned all observation rows (O(N_obs))
|
||||
// and blew the 1.5s perf budget on 1500 tx × 50 obs under -race.
|
||||
// LEFT JOIN + GROUP BY t.id was even slower because GROUP BY forced
|
||||
// a temp B-tree on the full transmissions×observations join.
|
||||
//
|
||||
// The returned page is in newest-LatestSeen-FIRST (DESC) order.
|
||||
// The Go side re-orders the emitted rows ASC below (fix #3) so the
|
||||
// contract matches the in-memory path's tail-of-msgOrder convention.
|
||||
pageSQL := `SELECT t.id,
|
||||
COALESCE((SELECT MAX(timestamp) FROM observations WHERE transmission_id = t.id), 0) AS latest_obs_epoch
|
||||
FROM transmissions t
|
||||
WHERE t.channel_hash = ? AND t.payload_type = 5
|
||||
ORDER BY latest_obs_epoch DESC, t.id DESC
|
||||
LIMIT ? OFFSET ?`
|
||||
if len(regionCodes) > 0 {
|
||||
pageSQL = `SELECT id FROM (
|
||||
SELECT t.id, t.first_seen FROM transmissions t
|
||||
WHERE t.channel_hash = ? AND t.payload_type = 5` + regionFilter + `
|
||||
ORDER BY t.first_seen DESC
|
||||
LIMIT ? OFFSET ?
|
||||
) sub
|
||||
ORDER BY first_seen ASC`
|
||||
} else {
|
||||
pageSQL += ` ORDER BY (SELECT first_seen FROM transmissions WHERE id = t.id) ASC`
|
||||
pageSQL = `SELECT t.id,
|
||||
COALESCE((SELECT MAX(timestamp) FROM observations WHERE transmission_id = t.id), 0) AS latest_obs_epoch
|
||||
FROM transmissions t
|
||||
WHERE t.channel_hash = ? AND t.payload_type = 5` + regionFilter + `
|
||||
ORDER BY latest_obs_epoch DESC, t.id DESC
|
||||
LIMIT ? OFFSET ?`
|
||||
}
|
||||
pageArgs := []interface{}{channelHash}
|
||||
pageArgs = append(pageArgs, regionArgs...)
|
||||
@@ -1666,7 +1677,8 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region .
|
||||
pageIDs := make([]int, 0, limit)
|
||||
for idRows.Next() {
|
||||
var id int
|
||||
if err := idRows.Scan(&id); err == nil {
|
||||
var le sql.NullInt64
|
||||
if err := idRows.Scan(&id, &le); err == nil {
|
||||
pageIDs = append(pageIDs, id)
|
||||
}
|
||||
}
|
||||
@@ -1688,7 +1700,7 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region .
|
||||
var obsSQL string
|
||||
if db.isV3 {
|
||||
obsSQL = `SELECT o.id, t.id, t.hash, t.decoded_json, t.first_seen,
|
||||
obs.id, obs.name, o.snr, o.path_json
|
||||
obs.id, obs.name, o.snr, o.path_json, o.timestamp
|
||||
FROM observations o
|
||||
JOIN transmissions t ON t.id = o.transmission_id
|
||||
LEFT JOIN observers obs ON obs.rowid = o.observer_idx
|
||||
@@ -1696,7 +1708,7 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region .
|
||||
ORDER BY o.id ASC`
|
||||
} else {
|
||||
obsSQL = `SELECT o.id, t.id, t.hash, t.decoded_json, t.first_seen,
|
||||
o.observer_id, o.observer_name, o.snr, o.path_json
|
||||
o.observer_id, o.observer_name, o.snr, o.path_json, o.timestamp
|
||||
FROM observations o
|
||||
JOIN transmissions t ON t.id = o.transmission_id
|
||||
WHERE t.id IN (` + strings.Join(idPlaceholders, ",") + `)
|
||||
@@ -1710,8 +1722,9 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region .
|
||||
defer rows.Close()
|
||||
|
||||
type msg struct {
|
||||
Data map[string]interface{}
|
||||
Repeats int
|
||||
Data map[string]interface{}
|
||||
Repeats int
|
||||
LatestEpoch int64 // max observation timestamp (unix seconds) — issue #1366
|
||||
}
|
||||
msgMap := make(map[int]*msg, len(pageIDs))
|
||||
|
||||
@@ -1719,12 +1732,16 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region .
|
||||
var pktID, txID int
|
||||
var pktHash, dj, fs, obsID, obsName, pathJSON sql.NullString
|
||||
var snr sql.NullFloat64
|
||||
rows.Scan(&pktID, &txID, &pktHash, &dj, &fs, &obsID, &obsName, &snr, &pathJSON)
|
||||
var obsTs sql.NullInt64
|
||||
rows.Scan(&pktID, &txID, &pktHash, &dj, &fs, &obsID, &obsName, &snr, &pathJSON, &obsTs)
|
||||
if !dj.Valid {
|
||||
continue
|
||||
}
|
||||
if existing, ok := msgMap[txID]; ok {
|
||||
existing.Repeats++
|
||||
if obsTs.Valid && obsTs.Int64 > existing.LatestEpoch {
|
||||
existing.LatestEpoch = obsTs.Int64
|
||||
}
|
||||
continue
|
||||
}
|
||||
var decoded map[string]interface{}
|
||||
@@ -1759,6 +1776,7 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region .
|
||||
"sender": displaySender,
|
||||
"text": displayText,
|
||||
"timestamp": nullStr(fs),
|
||||
"first_seen": nullStr(fs),
|
||||
"sender_timestamp": senderTs,
|
||||
"packetId": pktID,
|
||||
"packetHash": nullStr(pktHash),
|
||||
@@ -1769,6 +1787,9 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region .
|
||||
},
|
||||
Repeats: 1,
|
||||
}
|
||||
if obsTs.Valid {
|
||||
m.LatestEpoch = obsTs.Int64
|
||||
}
|
||||
if obsName.Valid {
|
||||
m.Data["observers"] = []string{obsName.String}
|
||||
} else if obsID.Valid {
|
||||
@@ -1777,7 +1798,16 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region .
|
||||
msgMap[txID] = m
|
||||
}
|
||||
|
||||
messages := make([]map[string]interface{}, 0, len(pageIDs))
|
||||
// Issue #1366 follow-up: emit batch sorted by LatestSeen ascending
|
||||
// (newest LAST) — matches the in-memory path's tail-of-msgOrder
|
||||
// convention and the frontend's scrollToBottom() behavior. pageIDs
|
||||
// order is not LatestSeen-ordered for in-page rows after fix #2.
|
||||
type emitted struct {
|
||||
latestEpoch int64
|
||||
txID int
|
||||
data map[string]interface{}
|
||||
}
|
||||
rowsOut := make([]emitted, 0, len(pageIDs))
|
||||
for _, id := range pageIDs {
|
||||
m, ok := msgMap[id]
|
||||
if !ok {
|
||||
@@ -1787,7 +1817,22 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region .
|
||||
continue
|
||||
}
|
||||
m.Data["repeats"] = m.Repeats
|
||||
messages = append(messages, m.Data)
|
||||
// Issue #1366: emit LatestSeen (max obs timestamp) as the rendered
|
||||
// `timestamp` field. `first_seen` stays alongside for debug.
|
||||
if m.LatestEpoch > 0 {
|
||||
m.Data["timestamp"] = time.Unix(m.LatestEpoch, 0).UTC().Format(time.RFC3339)
|
||||
}
|
||||
rowsOut = append(rowsOut, emitted{latestEpoch: m.LatestEpoch, txID: id, data: m.Data})
|
||||
}
|
||||
sort.SliceStable(rowsOut, func(i, j int) bool {
|
||||
if rowsOut[i].latestEpoch != rowsOut[j].latestEpoch {
|
||||
return rowsOut[i].latestEpoch < rowsOut[j].latestEpoch
|
||||
}
|
||||
return rowsOut[i].txID < rowsOut[j].txID
|
||||
})
|
||||
messages := make([]map[string]interface{}, 0, len(rowsOut))
|
||||
for _, e := range rowsOut {
|
||||
messages = append(messages, e.data)
|
||||
}
|
||||
|
||||
return messages, total, nil
|
||||
|
||||
@@ -120,6 +120,16 @@ func setupTestDB(t *testing.T) *DB {
|
||||
WHERE id = NEW.id;
|
||||
END;
|
||||
CREATE INDEX IF NOT EXISTS idx_transmissions_from_pubkey ON transmissions(from_pubkey);
|
||||
|
||||
-- Mirror prod indexes from internal/dbschema/dbschema.go so query plans
|
||||
-- in tests match prod. idx_observations_transmission_id is required by
|
||||
-- GetChannelMessages's grouped MAX(timestamp) per tx aggregate
|
||||
-- (issue #1366 / PR #1368): without it the perf test on 1500 tx × 50 obs
|
||||
-- blows the 1.5s budget under -race.
|
||||
CREATE INDEX IF NOT EXISTS idx_observations_transmission_id ON observations(transmission_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_observations_timestamp ON observations(timestamp);
|
||||
CREATE INDEX IF NOT EXISTS idx_observations_tx_ts ON observations(transmission_id, timestamp);
|
||||
CREATE INDEX IF NOT EXISTS idx_transmissions_channel_hash ON transmissions(channel_hash);
|
||||
`
|
||||
if _, err := conn.Exec(schema); err != nil {
|
||||
t.Fatal(err)
|
||||
|
||||
+27
-1
@@ -4791,6 +4791,19 @@ func (s *PacketStore) GetChannelMessages(channelHash string, limit, offset int,
|
||||
|
||||
senderTs := decoded.SenderTimestamp
|
||||
|
||||
// Issue #1366: emit tx.LatestSeen (max observation timestamp,
|
||||
// server UTC) as the rendered timestamp — NOT tx.FirstSeen,
|
||||
// which stays pinned at the first-ever observation of a hash
|
||||
// and lags reality for heartbeat-style retransmissions. Fall
|
||||
// back to FirstSeen only when LatestSeen is empty (no obs).
|
||||
// sender_timestamp from the decoded payload is NOT used as the
|
||||
// rendered field: client RTCs are unreliable. It remains in
|
||||
// the response for debug surfaces.
|
||||
displayTs := tx.LatestSeen
|
||||
if displayTs == "" {
|
||||
displayTs = tx.FirstSeen
|
||||
}
|
||||
|
||||
observers := []string{}
|
||||
obsName := tx.ObserverName
|
||||
if obsName == "" {
|
||||
@@ -4804,7 +4817,8 @@ func (s *PacketStore) GetChannelMessages(channelHash string, limit, offset int,
|
||||
Data: map[string]interface{}{
|
||||
"sender": displaySender,
|
||||
"text": displayText,
|
||||
"timestamp": strOrNil(tx.FirstSeen),
|
||||
"timestamp": strOrNil(displayTs),
|
||||
"first_seen": strOrNil(tx.FirstSeen),
|
||||
"sender_timestamp": senderTs,
|
||||
"packetId": tx.ID,
|
||||
"packetHash": strOrNil(tx.Hash),
|
||||
@@ -4821,6 +4835,18 @@ func (s *PacketStore) GetChannelMessages(channelHash string, limit, offset int,
|
||||
}
|
||||
}
|
||||
|
||||
// Issue #1366 follow-up: msgOrder is in tx insertion order
|
||||
// (≈ FirstSeen ascending). Re-sort by the rendered timestamp field
|
||||
// (= LatestSeen, set above) ascending, so the page tail = newest
|
||||
// LatestSeen. Without this, a long-running heartbeat with old
|
||||
// FirstSeen but fresh LatestSeen ends up at the head of msgOrder
|
||||
// and gets sliced off by the tail selection below.
|
||||
sort.SliceStable(msgOrder, func(i, j int) bool {
|
||||
ti, _ := msgMap[msgOrder[i]].Data["timestamp"].(string)
|
||||
tj, _ := msgMap[msgOrder[j]].Data["timestamp"].(string)
|
||||
return ti < tj
|
||||
})
|
||||
|
||||
total := len(msgOrder)
|
||||
// Return latest messages (tail)
|
||||
start := total - limit - offset
|
||||
|
||||
@@ -161,6 +161,10 @@ func ensureServerIndexes(rw *sql.DB) error {
|
||||
`CREATE INDEX IF NOT EXISTS idx_transmissions_payload_type ON transmissions(payload_type)`,
|
||||
`CREATE INDEX IF NOT EXISTS idx_observations_timestamp ON observations(timestamp)`,
|
||||
`CREATE INDEX IF NOT EXISTS idx_observations_transmission_id ON observations(transmission_id)`,
|
||||
// Composite covers GetChannelMessages' grouped MAX(timestamp) per
|
||||
// transmission_id (issue #1366 / PR #1368). With this index sqlite can
|
||||
// satisfy the aggregate index-only without touching the heap.
|
||||
`CREATE INDEX IF NOT EXISTS idx_observations_tx_ts ON observations(transmission_id, timestamp)`,
|
||||
}
|
||||
for _, s := range stmts {
|
||||
if _, err := rw.Exec(s); err != nil {
|
||||
|
||||
Reference in New Issue
Block a user