From 64bf3744e2146de3874fa7fc62112b68dcd5ab32 Mon Sep 17 00:00:00 2001 From: Kpa-clawbot <259247574+Kpa-clawbot@users.noreply.github.com> Date: Fri, 27 Mar 2026 16:01:54 -0700 Subject: [PATCH] fix: channels stale latest message from observation-timestamp ordering, fixes #171 db.GetChannels() queried packets_v (observation-level rows) ordered by observation timestamp and always overwrote lastMessage. When an older message had a later re-observation, it would overwrite the correct latest message with stale data. Fix: query transmissions table directly (one row per unique message) ordered by first_seen. This ensures lastMessage always reflects the most recently sent message, not the most recently observed one. Also fix db.GetChannelMessages() to use first_seen ordering with schema-aware queries (v2/v3), and add missing distCache/subpathCache invalidation on packet ingestion. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- cmd/server/db.go | 45 ++++++++++++++++++++++++++++--------- cmd/server/db_test.go | 47 +++++++++++++++++++++++++++++++++++++++ cmd/server/routes_test.go | 4 ++-- cmd/server/store.go | 2 ++ 4 files changed, 85 insertions(+), 13 deletions(-) diff --git a/cmd/server/db.go b/cmd/server/db.go index 26ef31a6..9379851f 100644 --- a/cmd/server/db.go +++ b/cmd/server/db.go @@ -1213,8 +1213,11 @@ func (db *DB) GetTraces(hash string) ([]map[string]interface{}, error) { } // GetChannels returns channel list from GRP_TXT packets. +// Queries transmissions directly (not packets_v) to avoid observation-level +// duplicates that could cause stale lastMessage when an older message has +// a later re-observation timestamp. func (db *DB) GetChannels() ([]map[string]interface{}, error) { - rows, err := db.conn.Query(`SELECT decoded_json, timestamp FROM packets_v WHERE payload_type = 5 ORDER BY timestamp ASC`) + rows, err := db.conn.Query(`SELECT decoded_json, first_seen FROM transmissions WHERE payload_type = 5 ORDER BY first_seen ASC`) if err != nil { return nil, err } @@ -1222,8 +1225,8 @@ func (db *DB) GetChannels() ([]map[string]interface{}, error) { channelMap := map[string]map[string]interface{}{} for rows.Next() { - var dj, ts sql.NullString - rows.Scan(&dj, &ts) + var dj, fs sql.NullString + rows.Scan(&dj, &fs) if !dj.Valid { continue } @@ -1246,13 +1249,13 @@ func (db *DB) GetChannels() ([]map[string]interface{}, error) { ch = map[string]interface{}{ "hash": key, "name": channelName, "lastMessage": nil, "lastSender": nil, - "messageCount": 0, "lastActivity": nullStr(ts), + "messageCount": 0, "lastActivity": nullStr(fs), } channelMap[key] = ch } ch["messageCount"] = ch["messageCount"].(int) + 1 - if ts.Valid { - ch["lastActivity"] = ts.String + if fs.Valid { + ch["lastActivity"] = fs.String } if text, ok := decoded["text"].(string); ok && text != "" { idx := strings.Index(text, ": ") @@ -1275,12 +1278,32 @@ func (db *DB) GetChannels() ([]map[string]interface{}, error) { } // GetChannelMessages returns messages for a specific channel. +// Uses transmission-level ordering (first_seen) to ensure correct message +// sequence even when observations arrive out of order. func (db *DB) GetChannelMessages(channelHash string, limit, offset int) ([]map[string]interface{}, int, error) { if limit <= 0 { limit = 100 } - rows, err := db.conn.Query(`SELECT id, hash, decoded_json, timestamp, observer_id, observer_name, snr, path_json - FROM packets_v WHERE payload_type = 5 ORDER BY timestamp ASC`) + + var querySQL string + if db.isV3 { + querySQL = `SELECT o.id, t.hash, t.decoded_json, t.first_seen, + obs.id, obs.name, o.snr, o.path_json + FROM observations o + JOIN transmissions t ON t.id = o.transmission_id + LEFT JOIN observers obs ON obs.rowid = o.observer_idx + WHERE t.payload_type = 5 + ORDER BY t.first_seen ASC` + } else { + querySQL = `SELECT o.id, t.hash, t.decoded_json, t.first_seen, + o.observer_id, o.observer_name, o.snr, o.path_json + FROM observations o + JOIN transmissions t ON t.id = o.transmission_id + WHERE t.payload_type = 5 + ORDER BY t.first_seen ASC` + } + + rows, err := db.conn.Query(querySQL) if err != nil { return nil, 0, err } @@ -1295,9 +1318,9 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int) ([]map[s for rows.Next() { var pktID int - var pktHash, dj, ts, obsID, obsName, pathJSON sql.NullString + var pktHash, dj, fs, obsID, obsName, pathJSON sql.NullString var snr sql.NullFloat64 - rows.Scan(&pktID, &pktHash, &dj, &ts, &obsID, &obsName, &snr, &pathJSON) + rows.Scan(&pktID, &pktHash, &dj, &fs, &obsID, &obsName, &snr, &pathJSON) if !dj.Valid { continue } @@ -1354,7 +1377,7 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int) ([]map[s Data: map[string]interface{}{ "sender": displaySender, "text": displayText, - "timestamp": nullStr(ts), + "timestamp": nullStr(fs), "sender_timestamp": senderTs, "packetId": pktID, "packetHash": nullStr(pktHash), diff --git a/cmd/server/db_test.go b/cmd/server/db_test.go index cb69afa9..385d0325 100644 --- a/cmd/server/db_test.go +++ b/cmd/server/db_test.go @@ -1339,6 +1339,53 @@ func TestNullHelpers(t *testing.T) { } } +// TestGetChannelsStaleMessage verifies that GetChannels returns the newest message +// per channel even when an older message has a later observation timestamp. +// This is the regression test for #171. +func TestGetChannelsStaleMessage(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + + db.conn.Exec(`INSERT INTO observers (id, name, iata) VALUES ('obs1', 'Observer1', 'SJC')`) + db.conn.Exec(`INSERT INTO observers (id, name, iata) VALUES ('obs2', 'Observer2', 'SFO')`) + + // Older message (first_seen T1) + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json) + VALUES ('AA', 'oldhash1', '2026-01-15T10:00:00Z', 1, 5, + '{"type":"CHAN","channel":"#test","text":"Alice: Old message","sender":"Alice"}')`) + // Newer message (first_seen T2 > T1) + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json) + VALUES ('BB', 'newhash2', '2026-01-15T10:05:00Z', 1, 5, + '{"type":"CHAN","channel":"#test","text":"Bob: New message","sender":"Bob"}')`) + + // Observations: older message re-observed AFTER newer message (stale scenario) + db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, timestamp) + VALUES (1, 1, 12.0, -90, 1736935200)`) // old msg first obs + db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, timestamp) + VALUES (2, 1, 14.0, -88, 1736935500)`) // new msg obs + db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, timestamp) + VALUES (1, 2, 10.0, -95, 1736935800)`) // old msg re-observed LATER + + channels, err := db.GetChannels() + if err != nil { + t.Fatal(err) + } + if len(channels) != 1 { + t.Fatalf("expected 1 channel, got %d", len(channels)) + } + ch := channels[0] + + if ch["lastMessage"] != "New message" { + t.Errorf("expected lastMessage='New message' (newest by first_seen), got %q", ch["lastMessage"]) + } + if ch["lastSender"] != "Bob" { + t.Errorf("expected lastSender='Bob', got %q", ch["lastSender"]) + } + if ch["messageCount"] != 2 { + t.Errorf("expected messageCount=2 (unique transmissions), got %v", ch["messageCount"]) + } +} + func TestMain(m *testing.M) { os.Exit(m.Run()) } diff --git a/cmd/server/routes_test.go b/cmd/server/routes_test.go index a4cac080..bc2b9de9 100644 --- a/cmd/server/routes_test.go +++ b/cmd/server/routes_test.go @@ -1538,7 +1538,7 @@ func TestHandlerErrorChannels(t *testing.T) { router := mux.NewRouter() srv.RegisterRoutes(router) - db.conn.Exec("DROP VIEW IF EXISTS packets_v") + db.conn.Exec("DROP TABLE IF EXISTS transmissions") req := httptest.NewRequest("GET", "/api/channels", nil) w := httptest.NewRecorder() @@ -1710,7 +1710,7 @@ func TestHandlerErrorChannelMessages(t *testing.T) { router := mux.NewRouter() srv.RegisterRoutes(router) - db.conn.Exec("DROP VIEW IF EXISTS packets_v") + db.conn.Exec("DROP TABLE IF EXISTS observations") req := httptest.NewRequest("GET", "/api/channels/%23test/messages", nil) w := httptest.NewRecorder() diff --git a/cmd/server/store.go b/cmd/server/store.go index add7d3f4..9e233bbe 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -980,6 +980,8 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac s.topoCache = make(map[string]*cachedResult) s.hashCache = make(map[string]*cachedResult) s.chanCache = make(map[string]*cachedResult) + s.distCache = make(map[string]*cachedResult) + s.subpathCache = make(map[string]*cachedResult) s.cacheMu.Unlock() }