diff --git a/cmd/server/channels_message_order_1366_test.go b/cmd/server/channels_message_order_1366_test.go new file mode 100644 index 00000000..a4f6c149 --- /dev/null +++ b/cmd/server/channels_message_order_1366_test.go @@ -0,0 +1,354 @@ +package main + +// Regression tests for issue #1366: Channel view shows stale timestamps +// because GetChannelMessages emits tx.FirstSeen (first-observation time) +// when the operator-visible expectation is the latest observation time +// (tx.LatestSeen). For repeated heartbeat-style messages whose tx.Hash is +// stable, FirstSeen stays pinned to the very first observation while the +// real-world transmission keeps repeating, producing a multi-hour gap +// between the channel view and the operator's live MeshCore client. +// +// Server-side UTC clocks are trusted; client-reported sender_timestamp +// is NOT (firmware lacks reliable wall-clock on many builds). Therefore +// the fix uses tx.LatestSeen (== max observation timestamp), NOT +// sender_timestamp. sender_timestamp remains exposed in the response +// for debug surfaces but MUST NOT be the rendered field. + +import ( + "strconv" + "testing" + "time" +) + +// TestChannelMessages_TimestampUsesLatestSeen: a CHAN tx with multiple +// observations spanning hours must render with the LATEST observation +// timestamp, not the first-seen ingest time. +func TestChannelMessages_TimestampUsesLatestSeen(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + + now := time.Now().UTC() + firstSeen := now.Add(-7 * time.Hour).Format(time.RFC3339) + firstSeenEpoch := now.Add(-7 * time.Hour).Unix() + laterEpoch := now.Add(-5 * time.Minute).Unix() + _ = laterEpoch + + db.conn.Exec(`INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count) + VALUES ('obsA', 'ObsA', 'SJC', ?, '2026-01-01T00:00:00Z', 10)`, firstSeen) + db.conn.Exec(`INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count) + VALUES ('obsB', 'ObsB', 'LAX', ?, '2026-01-01T00:00:00Z', 10)`, firstSeen) + + // One transmission with two observations: T0 (7h ago) and T1 (5m ago). + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash) + VALUES ('AA01', 'hash_repeated_msg', ?, 1, 5, + '{"type":"CHAN","channel":"#test","text":"Heartbeat: ping","sender":"Heartbeat","sender_timestamp":` + + strconv.FormatInt(firstSeenEpoch, 10) + `}', + '#test')`, firstSeen) + db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp) + VALUES (1, 1, 10.0, -90, '["aa"]', ?)`, firstSeenEpoch) + db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp) + VALUES (1, 2, 11.0, -88, '["bb"]', ?)`, laterEpoch) + + store := NewPacketStore(db, nil) + store.Load() + + msgs, total := store.GetChannelMessages("#test", 10, 0) + if total != 1 { + t.Fatalf("want 1 msg, got %d (msgs=%+v)", total, msgs) + } + got, _ := msgs[0]["timestamp"].(string) + gotParsed, err := time.Parse(time.RFC3339, got) + if err != nil { + // Try the milli-second precision form that SQLite strftime emits. + gotParsed, err = time.Parse("2006-01-02T15:04:05.000Z", got) + if err != nil { + gotParsed, err = time.Parse("2006-01-02T15:04:05.000Z07:00", got) + } + } + if err != nil { + t.Fatalf("timestamp not parseable: %q (%v)", got, err) + } + // LatestSeen should equal the laterEpoch observation (±1s). + if delta := gotParsed.Unix() - laterEpoch; delta < -1 || delta > 1 { + t.Errorf("timestamp: want ~%s (LatestSeen, observation at T-5m), got %q (Δ=%ds — likely FirstSeen, issue #1366)", + time.Unix(laterEpoch, 0).UTC().Format(time.RFC3339), got, delta) + } + + // first_seen MUST also be exposed separately so the UI/debug can see + // when the analyzer first heard the packet (older than `timestamp`). + fs, _ := msgs[0]["first_seen"].(string) + if fs == "" { + t.Errorf("first_seen field must be exposed alongside timestamp; got empty") + } + if fs == got { + t.Errorf("first_seen should differ from latest-seen timestamp (both = %q)", got) + } +} + +// TestChannelMessages_TimestampNotSenderTimestamp: a CHAN tx whose +// decoded sender_timestamp is wildly off (e.g. client with bad RTC) +// must NOT cause the rendered timestamp to drift. Rendered timestamp +// must remain server UTC (LatestSeen/FirstSeen), regardless of what +// the client claimed. +func TestChannelMessages_TimestampNotSenderTimestamp(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + + now := time.Now().UTC() + firstSeen := now.Add(-10 * time.Minute).Format(time.RFC3339) + firstSeenEpoch := now.Add(-10 * time.Minute).Unix() + + // Client claims it sent the message in year 2000 (bad RTC). + badSenderTs := int64(946684800) // 2000-01-01 UTC + + db.conn.Exec(`INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count) + VALUES ('obsX', 'ObsX', 'SJC', ?, '2026-01-01T00:00:00Z', 1)`, firstSeen) + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash) + VALUES ('BB01', 'hash_bad_clock', ?, 1, 5, + '{"type":"CHAN","channel":"#bad","text":"Alice: ping","sender":"Alice","sender_timestamp":` + + strconv.FormatInt(badSenderTs, 10) + `}', + '#bad')`, firstSeen) + db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp) + VALUES (1, 1, 10.0, -90, '["aa"]', ?)`, firstSeenEpoch) + + store := NewPacketStore(db, nil) + store.Load() + + msgs, total := store.GetChannelMessages("#bad", 10, 0) + if total != 1 { + t.Fatalf("want 1 msg, got %d", total) + } + got, _ := msgs[0]["timestamp"].(string) + // MUST be the server-side observation time, parseable as RFC3339, and + // within ~1h of now — NOT the year-2000 client value. + parsed, err := time.Parse(time.RFC3339, got) + if err != nil { + t.Fatalf("timestamp not RFC3339: %q (%v)", got, err) + } + if parsed.Year() < now.Year() { + t.Errorf("rendered timestamp %q took on the client's bad sender_timestamp (year %d) instead of server UTC", + got, parsed.Year()) + } +} + +// TestChannelMessages_TimestampIsUTCZ: rendered timestamp MUST end with +// 'Z' (or +00:00) so the browser does NOT interpret it as a local-zone +// string and shift by the operator's TZ offset. +func TestChannelMessages_TimestampIsUTCZ(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + + now := time.Now().UTC() + fs := now.Add(-30 * time.Minute).Format(time.RFC3339) + ep := now.Add(-30 * time.Minute).Unix() + + db.conn.Exec(`INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count) + VALUES ('obsZ', 'ObsZ', 'SJC', ?, '2026-01-01T00:00:00Z', 1)`, fs) + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash) + VALUES ('ZZ01', 'hash_zone_check', ?, 1, 5, + '{"type":"CHAN","channel":"#zone","text":"Carol: ping","sender":"Carol"}', + '#zone')`, fs) + db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp) + VALUES (1, 1, 11.0, -89, '["zz"]', ?)`, ep) + + store := NewPacketStore(db, nil) + store.Load() + + msgs, _ := store.GetChannelMessages("#zone", 10, 0) + if len(msgs) != 1 { + t.Fatalf("want 1 msg, got %d", len(msgs)) + } + ts, _ := msgs[0]["timestamp"].(string) + if ts == "" { + t.Fatal("empty timestamp") + } + n := len(ts) + if !(ts[n-1] == 'Z' || (n >= 6 && ts[n-6:] == "+00:00")) { + t.Errorf("timestamp not UTC-suffixed (Z/+00:00): %q", ts) + } +} + +// TestChannelMessages_OrderedByLatestSeen: adversarial follow-up to #1366 +// (PR #1368). The earlier fix only adjusted the rendered `timestamp` +// field; page SELECTION and SORT ORDER on both the in-memory and DB +// paths still used FirstSeen. This test pins the contract: +// +// - tx-A: FirstSeen 24h ago, LatestSeen NOW (via a fresh observation). +// - tx-B: FirstSeen 1h ago, LatestSeen 1h ago (single observation). +// +// Both paths MUST: +// 1. Return BOTH transmissions in a small (limit=10) page — tx-A must +// not be excluded because its FirstSeen is old. +// 2. Return tx-A AFTER tx-B (newest-LatestSeen-LAST), matching the +// tail-of-msgOrder convention used by the rest of the API and +// the frontend's scrollToBottom(). +func TestChannelMessages_OrderedByLatestSeen_InMemory(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + + now := time.Now().UTC() + tOld := now.Add(-24 * time.Hour) + tMid := now.Add(-1 * time.Hour) + tNewest := now.Add(-30 * time.Minute) + tFresh := now.Add(-1 * time.Minute) + + tOldStr := tOld.Format(time.RFC3339) + tMidStr := tMid.Format(time.RFC3339) + tNewestStr := tNewest.Format(time.RFC3339) + + db.conn.Exec(`INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count) + VALUES ('obsO', 'ObsO', 'SJC', ?, '2026-01-01T00:00:00Z', 10)`, tOldStr) + + // tx-A: FirstSeen 24h ago, LatestSeen NOW (T-1m). Old insertion order. + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash) + VALUES ('AAAA', 'order_hash_a', ?, 1, 5, + '{"type":"CHAN","channel":"#ord","text":"Alpha: hb","sender":"Alpha"}', '#ord')`, tOldStr) + db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp) + VALUES (1, 1, 10.0, -90, '["aa"]', ?)`, tOld.Unix()) + db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp) + VALUES (1, 1, 11.0, -88, '["aa"]', ?)`, tFresh.Unix()) + + // tx-B: FirstSeen 1h ago, LatestSeen 1h ago. OLDEST. + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash) + VALUES ('BBBB', 'order_hash_b', ?, 1, 5, + '{"type":"CHAN","channel":"#ord","text":"Bravo: msg","sender":"Bravo"}', '#ord')`, tMidStr) + db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp) + VALUES (2, 1, 9.0, -91, '["bb"]', ?)`, tMid.Unix()) + + // tx-C: FirstSeen 30m ago, LatestSeen 30m ago. Middle. + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash) + VALUES ('CCCC', 'order_hash_c', ?, 1, 5, + '{"type":"CHAN","channel":"#ord","text":"Charlie: msg","sender":"Charlie"}', '#ord')`, tNewestStr) + db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp) + VALUES (3, 1, 9.0, -91, '["cc"]', ?)`, tNewest.Unix()) + + store := NewPacketStore(db, nil) + store.Load() + + // Full-page: ordering check (fix #1 gates this — without sort, + // msgOrder is insertion order and Alpha lands FIRST, not LAST). + msgsAll, totalAll := store.GetChannelMessages("#ord", 10, 0) + if totalAll != 3 { + t.Fatalf("in-memory: want total=3, got %d", totalAll) + } + if len(msgsAll) != 3 { + t.Fatalf("in-memory: want 3 msgs, got %d", len(msgsAll)) + } + wantOrder := []string{"Bravo", "Charlie", "Alpha"} + for i, want := range wantOrder { + got, _ := msgsAll[i]["sender"].(string) + if got != want { + t.Errorf("in-memory: msg[%d] want sender=%q, got %q (LatestSeen ASC, fix #1)", i, want, got) + } + } + + // Small page (limit=2): tx-A (Alpha) MUST be included because its + // LatestSeen is freshest, even though FirstSeen is oldest. Without + // fix #1, the in-memory path takes msgOrder[total-2:] which would + // drop Alpha (it sits at msgOrder[0] by insertion order). + msgsPage, _ := store.GetChannelMessages("#ord", 2, 0) + if len(msgsPage) != 2 { + t.Fatalf("in-memory: want 2 msgs at limit=2, got %d", len(msgsPage)) + } + hasAlpha := false + for _, m := range msgsPage { + if s, _ := m["sender"].(string); s == "Alpha" { + hasAlpha = true + } + } + if !hasAlpha { + t.Errorf("in-memory: tx-A (Alpha) excluded from limit=2 page — FirstSeen-based tail selection bug (fix #1 reverted?)") + } +} + +func TestChannelMessages_OrderedByLatestSeen_DB(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + + now := time.Now().UTC() + tOld := now.Add(-24 * time.Hour) + tMid := now.Add(-1 * time.Hour) + tNewest := now.Add(-30 * time.Minute) + tFresh := now.Add(-1 * time.Minute) + + tOldStr := tOld.Format(time.RFC3339) + tMidStr := tMid.Format(time.RFC3339) + tNewestStr := tNewest.Format(time.RFC3339) + + db.conn.Exec(`INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count) + VALUES ('obsD', 'ObsD', 'SJC', ?, '2026-01-01T00:00:00Z', 10)`, tOldStr) + + // tx-A: FirstSeen 24h ago, observations at T-24h and T-1m (LatestSeen + // = T-1m, the FRESHEST). Despite the freshest LatestSeen, a + // FirstSeen-DESC selection would push it OFF a small page. + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash) + VALUES ('AADB', 'order_db_hash_a', ?, 1, 5, + '{"type":"CHAN","channel":"#ordb","text":"Alpha: hb","sender":"Alpha"}', '#ordb')`, tOldStr) + db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp) + VALUES (1, 1, 10.0, -90, '["aa"]', ?)`, tOld.Unix()) + db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp) + VALUES (1, 1, 11.0, -88, '["aa"]', ?)`, tFresh.Unix()) + + // tx-B: FirstSeen 1h ago, LatestSeen 1h ago. OLDEST LatestSeen. + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash) + VALUES ('BBDB', 'order_db_hash_b', ?, 1, 5, + '{"type":"CHAN","channel":"#ordb","text":"Bravo: msg","sender":"Bravo"}', '#ordb')`, tMidStr) + db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp) + VALUES (2, 1, 9.0, -91, '["bb"]', ?)`, tMid.Unix()) + + // tx-C: FirstSeen 30m ago, LatestSeen 30m ago. Middle LatestSeen. + // With FirstSeen-DESC selection + limit=2, page = [tx-C, tx-B] and + // tx-A is EXCLUDED — that's the selection bug fix #2 gates. + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash) + VALUES ('CCDB', 'order_db_hash_c', ?, 1, 5, + '{"type":"CHAN","channel":"#ordb","text":"Charlie: msg","sender":"Charlie"}', '#ordb')`, tNewestStr) + db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp) + VALUES (3, 1, 9.0, -91, '["cc"]', ?)`, tNewest.Unix()) + + msgs, total, err := db.GetChannelMessages("#ordb", 2, 0) + if err != nil { + t.Fatal(err) + } + if total != 3 { + t.Fatalf("DB: want total=3, got %d", total) + } + if len(msgs) != 2 { + t.Fatalf("DB: want 2 msgs in page (limit=2), got %d", len(msgs)) + } + // Selection (fix #2): the page MUST include tx-A (Alpha) because its + // LatestSeen is the newest — even though its FirstSeen is the OLDEST. + // With limit=2 + LatestSeen-DESC selection, page = [Alpha, Charlie]. + // Returned ASC by LatestSeen (newest LAST, fix #3) = [Charlie, Alpha]. + sender0, _ := msgs[0]["sender"].(string) + sender1, _ := msgs[1]["sender"].(string) + if sender0 != "Charlie" || sender1 != "Alpha" { + t.Errorf("DB: want order [Charlie, Alpha] (page selected by LatestSeen DESC, returned ASC, fix #2+#3), got [%q, %q]", + sender0, sender1) + } + hasAlpha := false + for _, m := range msgs { + if s, _ := m["sender"].(string); s == "Alpha" { + hasAlpha = true + } + } + if !hasAlpha { + t.Errorf("DB: tx-A (Alpha) excluded from page — FirstSeen-based selection bug (fix #2 reverted?)") + } + + // Also exercise large-page case (limit > total): ordering-only check. + msgsAll, totalAll, err := db.GetChannelMessages("#ordb", 10, 0) + if err != nil { + t.Fatal(err) + } + if totalAll != 3 || len(msgsAll) != 3 { + t.Fatalf("DB: want all 3 msgs at limit=10, got total=%d len=%d", totalAll, len(msgsAll)) + } + // Expected ASC by LatestSeen: Bravo (T-1h), Charlie (T-30m), Alpha (T-1m). + wantOrder := []string{"Bravo", "Charlie", "Alpha"} + for i, want := range wantOrder { + got, _ := msgsAll[i]["sender"].(string) + if got != want { + t.Errorf("DB: msg[%d] want sender=%q, got %q (full order: must be LatestSeen ASC, fix #3)", i, want, got) + } + } +} diff --git a/cmd/server/db.go b/cmd/server/db.go index e5141076..0a69ecfb 100644 --- a/cmd/server/db.go +++ b/cmd/server/db.go @@ -1633,27 +1633,38 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region . return nil, 0, err } - // 2) Page of transmission IDs — newest LIMIT msgs minus OFFSET, returned - // in ASC order to match prior API contract (tail of message log). - pageSQL := `SELECT t.id FROM ( - SELECT id FROM transmissions - WHERE channel_hash = ? AND payload_type = 5 - ORDER BY first_seen DESC - LIMIT ? OFFSET ? - ) t` - // When a region filter is in play, we must filter on the inner subquery - // against the transmissions table — re-use the same EXISTS form but - // wrap so we still get DESC-then-ASC pagination. + // 2) Page of transmission IDs — newest LIMIT msgs minus OFFSET. + // Issue #1366 follow-up (fix #2): select page by latest observation + // timestamp (LatestSeen) DESC, NOT by t.first_seen DESC — otherwise + // a heartbeat tx whose FirstSeen is 24h old but whose latest + // observation is fresh gets pushed off page 1. + // + // PR #1368 perf fix: use a correlated subquery for MAX(timestamp) per + // transmission. With the composite index idx_observations_tx_ts + // (transmission_id, timestamp) sqlite resolves MAX as an index-only + // rightmost-leaf lookup — total O(N_tx · log N_obs). The previously- + // used grouped derived table (`GROUP BY transmission_id` over the + // whole observations table) scanned all observation rows (O(N_obs)) + // and blew the 1.5s perf budget on 1500 tx × 50 obs under -race. + // LEFT JOIN + GROUP BY t.id was even slower because GROUP BY forced + // a temp B-tree on the full transmissions×observations join. + // + // The returned page is in newest-LatestSeen-FIRST (DESC) order. + // The Go side re-orders the emitted rows ASC below (fix #3) so the + // contract matches the in-memory path's tail-of-msgOrder convention. + pageSQL := `SELECT t.id, + COALESCE((SELECT MAX(timestamp) FROM observations WHERE transmission_id = t.id), 0) AS latest_obs_epoch + FROM transmissions t + WHERE t.channel_hash = ? AND t.payload_type = 5 + ORDER BY latest_obs_epoch DESC, t.id DESC + LIMIT ? OFFSET ?` if len(regionCodes) > 0 { - pageSQL = `SELECT id FROM ( - SELECT t.id, t.first_seen FROM transmissions t - WHERE t.channel_hash = ? AND t.payload_type = 5` + regionFilter + ` - ORDER BY t.first_seen DESC - LIMIT ? OFFSET ? - ) sub - ORDER BY first_seen ASC` - } else { - pageSQL += ` ORDER BY (SELECT first_seen FROM transmissions WHERE id = t.id) ASC` + pageSQL = `SELECT t.id, + COALESCE((SELECT MAX(timestamp) FROM observations WHERE transmission_id = t.id), 0) AS latest_obs_epoch + FROM transmissions t + WHERE t.channel_hash = ? AND t.payload_type = 5` + regionFilter + ` + ORDER BY latest_obs_epoch DESC, t.id DESC + LIMIT ? OFFSET ?` } pageArgs := []interface{}{channelHash} pageArgs = append(pageArgs, regionArgs...) @@ -1666,7 +1677,8 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region . pageIDs := make([]int, 0, limit) for idRows.Next() { var id int - if err := idRows.Scan(&id); err == nil { + var le sql.NullInt64 + if err := idRows.Scan(&id, &le); err == nil { pageIDs = append(pageIDs, id) } } @@ -1688,7 +1700,7 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region . var obsSQL string if db.isV3 { obsSQL = `SELECT o.id, t.id, t.hash, t.decoded_json, t.first_seen, - obs.id, obs.name, o.snr, o.path_json + obs.id, obs.name, o.snr, o.path_json, o.timestamp FROM observations o JOIN transmissions t ON t.id = o.transmission_id LEFT JOIN observers obs ON obs.rowid = o.observer_idx @@ -1696,7 +1708,7 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region . ORDER BY o.id ASC` } else { obsSQL = `SELECT o.id, t.id, t.hash, t.decoded_json, t.first_seen, - o.observer_id, o.observer_name, o.snr, o.path_json + o.observer_id, o.observer_name, o.snr, o.path_json, o.timestamp FROM observations o JOIN transmissions t ON t.id = o.transmission_id WHERE t.id IN (` + strings.Join(idPlaceholders, ",") + `) @@ -1710,8 +1722,9 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region . defer rows.Close() type msg struct { - Data map[string]interface{} - Repeats int + Data map[string]interface{} + Repeats int + LatestEpoch int64 // max observation timestamp (unix seconds) — issue #1366 } msgMap := make(map[int]*msg, len(pageIDs)) @@ -1719,12 +1732,16 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region . var pktID, txID int var pktHash, dj, fs, obsID, obsName, pathJSON sql.NullString var snr sql.NullFloat64 - rows.Scan(&pktID, &txID, &pktHash, &dj, &fs, &obsID, &obsName, &snr, &pathJSON) + var obsTs sql.NullInt64 + rows.Scan(&pktID, &txID, &pktHash, &dj, &fs, &obsID, &obsName, &snr, &pathJSON, &obsTs) if !dj.Valid { continue } if existing, ok := msgMap[txID]; ok { existing.Repeats++ + if obsTs.Valid && obsTs.Int64 > existing.LatestEpoch { + existing.LatestEpoch = obsTs.Int64 + } continue } var decoded map[string]interface{} @@ -1759,6 +1776,7 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region . "sender": displaySender, "text": displayText, "timestamp": nullStr(fs), + "first_seen": nullStr(fs), "sender_timestamp": senderTs, "packetId": pktID, "packetHash": nullStr(pktHash), @@ -1769,6 +1787,9 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region . }, Repeats: 1, } + if obsTs.Valid { + m.LatestEpoch = obsTs.Int64 + } if obsName.Valid { m.Data["observers"] = []string{obsName.String} } else if obsID.Valid { @@ -1777,7 +1798,16 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region . msgMap[txID] = m } - messages := make([]map[string]interface{}, 0, len(pageIDs)) + // Issue #1366 follow-up: emit batch sorted by LatestSeen ascending + // (newest LAST) — matches the in-memory path's tail-of-msgOrder + // convention and the frontend's scrollToBottom() behavior. pageIDs + // order is not LatestSeen-ordered for in-page rows after fix #2. + type emitted struct { + latestEpoch int64 + txID int + data map[string]interface{} + } + rowsOut := make([]emitted, 0, len(pageIDs)) for _, id := range pageIDs { m, ok := msgMap[id] if !ok { @@ -1787,7 +1817,22 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region . continue } m.Data["repeats"] = m.Repeats - messages = append(messages, m.Data) + // Issue #1366: emit LatestSeen (max obs timestamp) as the rendered + // `timestamp` field. `first_seen` stays alongside for debug. + if m.LatestEpoch > 0 { + m.Data["timestamp"] = time.Unix(m.LatestEpoch, 0).UTC().Format(time.RFC3339) + } + rowsOut = append(rowsOut, emitted{latestEpoch: m.LatestEpoch, txID: id, data: m.Data}) + } + sort.SliceStable(rowsOut, func(i, j int) bool { + if rowsOut[i].latestEpoch != rowsOut[j].latestEpoch { + return rowsOut[i].latestEpoch < rowsOut[j].latestEpoch + } + return rowsOut[i].txID < rowsOut[j].txID + }) + messages := make([]map[string]interface{}, 0, len(rowsOut)) + for _, e := range rowsOut { + messages = append(messages, e.data) } return messages, total, nil diff --git a/cmd/server/db_test.go b/cmd/server/db_test.go index 613361f2..89e636c5 100644 --- a/cmd/server/db_test.go +++ b/cmd/server/db_test.go @@ -120,6 +120,16 @@ func setupTestDB(t *testing.T) *DB { WHERE id = NEW.id; END; CREATE INDEX IF NOT EXISTS idx_transmissions_from_pubkey ON transmissions(from_pubkey); + + -- Mirror prod indexes from internal/dbschema/dbschema.go so query plans + -- in tests match prod. idx_observations_transmission_id is required by + -- GetChannelMessages's grouped MAX(timestamp) per tx aggregate + -- (issue #1366 / PR #1368): without it the perf test on 1500 tx × 50 obs + -- blows the 1.5s budget under -race. + CREATE INDEX IF NOT EXISTS idx_observations_transmission_id ON observations(transmission_id); + CREATE INDEX IF NOT EXISTS idx_observations_timestamp ON observations(timestamp); + CREATE INDEX IF NOT EXISTS idx_observations_tx_ts ON observations(transmission_id, timestamp); + CREATE INDEX IF NOT EXISTS idx_transmissions_channel_hash ON transmissions(channel_hash); ` if _, err := conn.Exec(schema); err != nil { t.Fatal(err) diff --git a/cmd/server/store.go b/cmd/server/store.go index e793af2c..bab6c599 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -4791,6 +4791,19 @@ func (s *PacketStore) GetChannelMessages(channelHash string, limit, offset int, senderTs := decoded.SenderTimestamp + // Issue #1366: emit tx.LatestSeen (max observation timestamp, + // server UTC) as the rendered timestamp — NOT tx.FirstSeen, + // which stays pinned at the first-ever observation of a hash + // and lags reality for heartbeat-style retransmissions. Fall + // back to FirstSeen only when LatestSeen is empty (no obs). + // sender_timestamp from the decoded payload is NOT used as the + // rendered field: client RTCs are unreliable. It remains in + // the response for debug surfaces. + displayTs := tx.LatestSeen + if displayTs == "" { + displayTs = tx.FirstSeen + } + observers := []string{} obsName := tx.ObserverName if obsName == "" { @@ -4804,7 +4817,8 @@ func (s *PacketStore) GetChannelMessages(channelHash string, limit, offset int, Data: map[string]interface{}{ "sender": displaySender, "text": displayText, - "timestamp": strOrNil(tx.FirstSeen), + "timestamp": strOrNil(displayTs), + "first_seen": strOrNil(tx.FirstSeen), "sender_timestamp": senderTs, "packetId": tx.ID, "packetHash": strOrNil(tx.Hash), @@ -4821,6 +4835,18 @@ func (s *PacketStore) GetChannelMessages(channelHash string, limit, offset int, } } + // Issue #1366 follow-up: msgOrder is in tx insertion order + // (≈ FirstSeen ascending). Re-sort by the rendered timestamp field + // (= LatestSeen, set above) ascending, so the page tail = newest + // LatestSeen. Without this, a long-running heartbeat with old + // FirstSeen but fresh LatestSeen ends up at the head of msgOrder + // and gets sliced off by the tail selection below. + sort.SliceStable(msgOrder, func(i, j int) bool { + ti, _ := msgMap[msgOrder[i]].Data["timestamp"].(string) + tj, _ := msgMap[msgOrder[j]].Data["timestamp"].(string) + return ti < tj + }) + total := len(msgOrder) // Return latest messages (tail) start := total - limit - offset diff --git a/internal/dbschema/dbschema.go b/internal/dbschema/dbschema.go index 42241d63..d47b0bf7 100644 --- a/internal/dbschema/dbschema.go +++ b/internal/dbschema/dbschema.go @@ -161,6 +161,10 @@ func ensureServerIndexes(rw *sql.DB) error { `CREATE INDEX IF NOT EXISTS idx_transmissions_payload_type ON transmissions(payload_type)`, `CREATE INDEX IF NOT EXISTS idx_observations_timestamp ON observations(timestamp)`, `CREATE INDEX IF NOT EXISTS idx_observations_transmission_id ON observations(transmission_id)`, + // Composite covers GetChannelMessages' grouped MAX(timestamp) per + // transmission_id (issue #1366 / PR #1368). With this index sqlite can + // satisfy the aggregate index-only without touching the heap. + `CREATE INDEX IF NOT EXISTS idx_observations_tx_ts ON observations(transmission_id, timestamp)`, } for _, s := range stmts { if _, err := rw.Exec(s); err != nil {