fix: address review — observation ordering, stale comments, affected query functions

- Load() SQL: keep o.timestamp DESC (consistent with IngestNewFromDB) so
  pickBestObservation tie-breaking is identical on both load paths
- GetTimestamps: scan from tail instead of head (was breaking on first item
  assuming it was the newest, now correctly reads from newest end)
- QueryMultiNodePackets: apply same DESC/ASC tail-read pagination as
  QueryPackets (was sorting for ASC and assuming DESC as-is)
- GetNodeHealth recentPackets: read from tail to return 20 newest items
  (was reading from head = 20 oldest items)
- Remove stale "Prepend (newest first)" comments, replace with accurate
  "oldest-first; new items go to tail" wording

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
efiten
2026-03-28 22:04:17 +01:00
committed by KpaBap
parent 03cfd114da
commit 380b1b1e28
+30 -26
View File
@@ -176,7 +176,7 @@ func (s *PacketStore) Load() error {
FROM transmissions t
LEFT JOIN observations o ON o.transmission_id = t.id
LEFT JOIN observers obs ON obs.rowid = o.observer_idx
ORDER BY t.first_seen ASC, o.timestamp ASC`
ORDER BY t.first_seen ASC, o.timestamp DESC`
} else {
loadSQL = `SELECT t.id, t.raw_hex, t.hash, t.first_seen, t.route_type,
t.payload_type, t.payload_version, t.decoded_json,
@@ -184,7 +184,7 @@ func (s *PacketStore) Load() error {
o.snr, o.rssi, o.score, o.path_json, o.timestamp
FROM transmissions t
LEFT JOIN observations o ON o.transmission_id = t.id
ORDER BY t.first_seen ASC, o.timestamp ASC`
ORDER BY t.first_seen ASC, o.timestamp DESC`
}
rows, err := s.db.conn.Query(loadSQL)
@@ -723,15 +723,16 @@ func (s *PacketStore) GetTimestamps(since string) []string {
s.mu.RLock()
defer s.mu.RUnlock()
// packets sorted newest first — scan from start until older than since
// packets sorted oldest-first — scan from tail until we reach items older than since
var result []string
for _, tx := range s.packets {
for i := len(s.packets) - 1; i >= 0; i-- {
tx := s.packets[i]
if tx.FirstSeen <= since {
break
}
result = append(result, tx.FirstSeen)
}
// Reverse to get ASC order
// result is currently newest-first; reverse to return ASC order
for i, j := 0, len(result)-1; i < j; i, j = i+1, j-1 {
result[i], result[j] = result[j], result[i]
}
@@ -781,23 +782,30 @@ func (s *PacketStore) QueryMultiNodePackets(pubkeys []string, limit, offset int,
total := len(filtered)
if order == "ASC" {
sort.Slice(filtered, func(i, j int) bool {
return filtered[i].FirstSeen < filtered[j].FirstSeen
})
}
// filtered is oldest-first (built by iterating s.packets forward).
// Apply same DESC/ASC pagination logic as QueryPackets.
if offset >= total {
return &PacketResult{Packets: []map[string]interface{}{}, Total: total}
}
end := offset + limit
if end > total {
end = total
pageSize := limit
if offset+pageSize > total {
pageSize = total - offset
}
packets := make([]map[string]interface{}, 0, end-offset)
for _, tx := range filtered[offset:end] {
packets = append(packets, txToMap(tx))
packets := make([]map[string]interface{}, 0, pageSize)
if order == "ASC" {
for _, tx := range filtered[offset : offset+pageSize] {
packets = append(packets, txToMap(tx))
}
} else {
endIdx := total - offset
startIdx := endIdx - pageSize
if startIdx < 0 {
startIdx = 0
}
for i := endIdx - 1; i >= startIdx; i-- {
packets = append(packets, txToMap(filtered[i]))
}
}
return &PacketResult{Packets: packets, Total: total}
}
@@ -930,13 +938,12 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
DecodedJSON: r.decodedJSON,
}
s.byHash[r.hash] = tx
// Prepend (newest first)
s.packets = append(s.packets, tx)
s.packets = append(s.packets, tx) // oldest-first; new items go to tail
s.byTxID[r.txID] = tx
s.indexByNode(tx)
if tx.PayloadType != nil {
pt := *tx.PayloadType
// Prepend to maintain newest-first order (matches Load ordering)
// Append to maintain oldest-first order (matches Load ordering)
// so GetChannelMessages reverse iteration stays correct
s.byPayloadType[pt] = append(s.byPayloadType[pt], tx)
}
@@ -1083,8 +1090,6 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
s.cacheMu.Unlock()
}
log.Printf("[poller] IngestNewFromDB: found %d new txs, maxID %d->%d", len(result), sinceID, newMaxID)
return result, newMaxID
}
@@ -1267,8 +1272,7 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) int {
s.subpathCache = make(map[string]*cachedResult)
s.cacheMu.Unlock()
log.Printf("[poller] IngestNewObservations: updated %d existing txs, maxObsID %d->%d",
len(updatedTxs), sinceObsID, newMaxObsID)
// analytics caches cleared; no per-cycle log to avoid stdout overhead
}
return newMaxObsID
@@ -4072,13 +4076,13 @@ func (s *PacketStore) GetNodeHealth(pubkey string) (map[string]interface{}, erro
lhVal = lastHeard
}
// Recent packets (up to 20, newest first — packets are already sorted DESC)
// Recent packets (up to 20, newest first — read from tail of oldest-first slice)
recentLimit := 20
if len(packets) < recentLimit {
recentLimit = len(packets)
}
recentPackets := make([]map[string]interface{}, 0, recentLimit)
for i := 0; i < recentLimit; i++ {
for i := len(packets) - 1; i >= len(packets)-recentLimit; i-- {
p := txToMap(packets[i])
delete(p, "observations")
recentPackets = append(recentPackets, p)