fix: channels stale latest message from observation-timestamp ordering, fixes #171

db.GetChannels() queried packets_v (observation-level rows) ordered by
observation timestamp and always overwrote lastMessage. When an older
message had a later re-observation, it would overwrite the correct
latest message with stale data.

Fix: query transmissions table directly (one row per unique message)
ordered by first_seen. This ensures lastMessage always reflects the
most recently sent message, not the most recently observed one.

Also fix db.GetChannelMessages() to use first_seen ordering with
schema-aware queries (v2/v3), and add missing distCache/subpathCache
invalidation on packet ingestion.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Kpa-clawbot
2026-03-27 16:01:54 -07:00
parent 30bcfff45b
commit 64bf3744e2
4 changed files with 85 additions and 13 deletions
+34 -11
View File
@@ -1213,8 +1213,11 @@ func (db *DB) GetTraces(hash string) ([]map[string]interface{}, error) {
}
// GetChannels returns channel list from GRP_TXT packets.
// Queries transmissions directly (not packets_v) to avoid observation-level
// duplicates that could cause stale lastMessage when an older message has
// a later re-observation timestamp.
func (db *DB) GetChannels() ([]map[string]interface{}, error) {
rows, err := db.conn.Query(`SELECT decoded_json, timestamp FROM packets_v WHERE payload_type = 5 ORDER BY timestamp ASC`)
rows, err := db.conn.Query(`SELECT decoded_json, first_seen FROM transmissions WHERE payload_type = 5 ORDER BY first_seen ASC`)
if err != nil {
return nil, err
}
@@ -1222,8 +1225,8 @@ func (db *DB) GetChannels() ([]map[string]interface{}, error) {
channelMap := map[string]map[string]interface{}{}
for rows.Next() {
var dj, ts sql.NullString
rows.Scan(&dj, &ts)
var dj, fs sql.NullString
rows.Scan(&dj, &fs)
if !dj.Valid {
continue
}
@@ -1246,13 +1249,13 @@ func (db *DB) GetChannels() ([]map[string]interface{}, error) {
ch = map[string]interface{}{
"hash": key, "name": channelName,
"lastMessage": nil, "lastSender": nil,
"messageCount": 0, "lastActivity": nullStr(ts),
"messageCount": 0, "lastActivity": nullStr(fs),
}
channelMap[key] = ch
}
ch["messageCount"] = ch["messageCount"].(int) + 1
if ts.Valid {
ch["lastActivity"] = ts.String
if fs.Valid {
ch["lastActivity"] = fs.String
}
if text, ok := decoded["text"].(string); ok && text != "" {
idx := strings.Index(text, ": ")
@@ -1275,12 +1278,32 @@ func (db *DB) GetChannels() ([]map[string]interface{}, error) {
}
// GetChannelMessages returns messages for a specific channel.
// Uses transmission-level ordering (first_seen) to ensure correct message
// sequence even when observations arrive out of order.
func (db *DB) GetChannelMessages(channelHash string, limit, offset int) ([]map[string]interface{}, int, error) {
if limit <= 0 {
limit = 100
}
rows, err := db.conn.Query(`SELECT id, hash, decoded_json, timestamp, observer_id, observer_name, snr, path_json
FROM packets_v WHERE payload_type = 5 ORDER BY timestamp ASC`)
var querySQL string
if db.isV3 {
querySQL = `SELECT o.id, t.hash, t.decoded_json, t.first_seen,
obs.id, obs.name, o.snr, o.path_json
FROM observations o
JOIN transmissions t ON t.id = o.transmission_id
LEFT JOIN observers obs ON obs.rowid = o.observer_idx
WHERE t.payload_type = 5
ORDER BY t.first_seen ASC`
} else {
querySQL = `SELECT o.id, t.hash, t.decoded_json, t.first_seen,
o.observer_id, o.observer_name, o.snr, o.path_json
FROM observations o
JOIN transmissions t ON t.id = o.transmission_id
WHERE t.payload_type = 5
ORDER BY t.first_seen ASC`
}
rows, err := db.conn.Query(querySQL)
if err != nil {
return nil, 0, err
}
@@ -1295,9 +1318,9 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int) ([]map[s
for rows.Next() {
var pktID int
var pktHash, dj, ts, obsID, obsName, pathJSON sql.NullString
var pktHash, dj, fs, obsID, obsName, pathJSON sql.NullString
var snr sql.NullFloat64
rows.Scan(&pktID, &pktHash, &dj, &ts, &obsID, &obsName, &snr, &pathJSON)
rows.Scan(&pktID, &pktHash, &dj, &fs, &obsID, &obsName, &snr, &pathJSON)
if !dj.Valid {
continue
}
@@ -1354,7 +1377,7 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int) ([]map[s
Data: map[string]interface{}{
"sender": displaySender,
"text": displayText,
"timestamp": nullStr(ts),
"timestamp": nullStr(fs),
"sender_timestamp": senderTs,
"packetId": pktID,
"packetHash": nullStr(pktHash),
+47
View File
@@ -1339,6 +1339,53 @@ func TestNullHelpers(t *testing.T) {
}
}
// TestGetChannelsStaleMessage verifies that GetChannels returns the newest message
// per channel even when an older message has a later observation timestamp.
// This is the regression test for #171.
func TestGetChannelsStaleMessage(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
db.conn.Exec(`INSERT INTO observers (id, name, iata) VALUES ('obs1', 'Observer1', 'SJC')`)
db.conn.Exec(`INSERT INTO observers (id, name, iata) VALUES ('obs2', 'Observer2', 'SFO')`)
// Older message (first_seen T1)
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
VALUES ('AA', 'oldhash1', '2026-01-15T10:00:00Z', 1, 5,
'{"type":"CHAN","channel":"#test","text":"Alice: Old message","sender":"Alice"}')`)
// Newer message (first_seen T2 > T1)
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
VALUES ('BB', 'newhash2', '2026-01-15T10:05:00Z', 1, 5,
'{"type":"CHAN","channel":"#test","text":"Bob: New message","sender":"Bob"}')`)
// Observations: older message re-observed AFTER newer message (stale scenario)
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, timestamp)
VALUES (1, 1, 12.0, -90, 1736935200)`) // old msg first obs
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, timestamp)
VALUES (2, 1, 14.0, -88, 1736935500)`) // new msg obs
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, timestamp)
VALUES (1, 2, 10.0, -95, 1736935800)`) // old msg re-observed LATER
channels, err := db.GetChannels()
if err != nil {
t.Fatal(err)
}
if len(channels) != 1 {
t.Fatalf("expected 1 channel, got %d", len(channels))
}
ch := channels[0]
if ch["lastMessage"] != "New message" {
t.Errorf("expected lastMessage='New message' (newest by first_seen), got %q", ch["lastMessage"])
}
if ch["lastSender"] != "Bob" {
t.Errorf("expected lastSender='Bob', got %q", ch["lastSender"])
}
if ch["messageCount"] != 2 {
t.Errorf("expected messageCount=2 (unique transmissions), got %v", ch["messageCount"])
}
}
func TestMain(m *testing.M) {
os.Exit(m.Run())
}
+2 -2
View File
@@ -1538,7 +1538,7 @@ func TestHandlerErrorChannels(t *testing.T) {
router := mux.NewRouter()
srv.RegisterRoutes(router)
db.conn.Exec("DROP VIEW IF EXISTS packets_v")
db.conn.Exec("DROP TABLE IF EXISTS transmissions")
req := httptest.NewRequest("GET", "/api/channels", nil)
w := httptest.NewRecorder()
@@ -1710,7 +1710,7 @@ func TestHandlerErrorChannelMessages(t *testing.T) {
router := mux.NewRouter()
srv.RegisterRoutes(router)
db.conn.Exec("DROP VIEW IF EXISTS packets_v")
db.conn.Exec("DROP TABLE IF EXISTS observations")
req := httptest.NewRequest("GET", "/api/channels/%23test/messages", nil)
w := httptest.NewRecorder()
+2
View File
@@ -980,6 +980,8 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
s.topoCache = make(map[string]*cachedResult)
s.hashCache = make(map[string]*cachedResult)
s.chanCache = make(map[string]*cachedResult)
s.distCache = make(map[string]*cachedResult)
s.subpathCache = make(map[string]*cachedResult)
s.cacheMu.Unlock()
}