From 380b1b1e287c07efd66302efa10988a8ed99a33a Mon Sep 17 00:00:00 2001 From: efiten Date: Sat, 28 Mar 2026 22:04:17 +0100 Subject: [PATCH] =?UTF-8?q?fix:=20address=20review=20=E2=80=94=20observati?= =?UTF-8?q?on=20ordering,=20stale=20comments,=20affected=20query=20functio?= =?UTF-8?q?ns?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Load() SQL: keep o.timestamp DESC (consistent with IngestNewFromDB) so pickBestObservation tie-breaking is identical on both load paths - GetTimestamps: scan from tail instead of head (was breaking on first item assuming it was the newest, now correctly reads from newest end) - QueryMultiNodePackets: apply same DESC/ASC tail-read pagination as QueryPackets (was sorting for ASC and assuming DESC as-is) - GetNodeHealth recentPackets: read from tail to return 20 newest items (was reading from head = 20 oldest items) - Remove stale "Prepend (newest first)" comments, replace with accurate "oldest-first; new items go to tail" wording Co-Authored-By: Claude Sonnet 4.6 --- cmd/server/store.go | 56 ++++++++++++++++++++++++--------------------- 1 file changed, 30 insertions(+), 26 deletions(-) diff --git a/cmd/server/store.go b/cmd/server/store.go index f546ef09..9d3b3dd4 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -176,7 +176,7 @@ func (s *PacketStore) Load() error { FROM transmissions t LEFT JOIN observations o ON o.transmission_id = t.id LEFT JOIN observers obs ON obs.rowid = o.observer_idx - ORDER BY t.first_seen ASC, o.timestamp ASC` + ORDER BY t.first_seen ASC, o.timestamp DESC` } else { loadSQL = `SELECT t.id, t.raw_hex, t.hash, t.first_seen, t.route_type, t.payload_type, t.payload_version, t.decoded_json, @@ -184,7 +184,7 @@ func (s *PacketStore) Load() error { o.snr, o.rssi, o.score, o.path_json, o.timestamp FROM transmissions t LEFT JOIN observations o ON o.transmission_id = t.id - ORDER BY t.first_seen ASC, o.timestamp ASC` + ORDER BY t.first_seen ASC, o.timestamp DESC` } rows, err := s.db.conn.Query(loadSQL) @@ -723,15 +723,16 @@ func (s *PacketStore) GetTimestamps(since string) []string { s.mu.RLock() defer s.mu.RUnlock() - // packets sorted newest first — scan from start until older than since + // packets sorted oldest-first — scan from tail until we reach items older than since var result []string - for _, tx := range s.packets { + for i := len(s.packets) - 1; i >= 0; i-- { + tx := s.packets[i] if tx.FirstSeen <= since { break } result = append(result, tx.FirstSeen) } - // Reverse to get ASC order + // result is currently newest-first; reverse to return ASC order for i, j := 0, len(result)-1; i < j; i, j = i+1, j-1 { result[i], result[j] = result[j], result[i] } @@ -781,23 +782,30 @@ func (s *PacketStore) QueryMultiNodePackets(pubkeys []string, limit, offset int, total := len(filtered) - if order == "ASC" { - sort.Slice(filtered, func(i, j int) bool { - return filtered[i].FirstSeen < filtered[j].FirstSeen - }) - } - + // filtered is oldest-first (built by iterating s.packets forward). + // Apply same DESC/ASC pagination logic as QueryPackets. if offset >= total { return &PacketResult{Packets: []map[string]interface{}{}, Total: total} } - end := offset + limit - if end > total { - end = total + pageSize := limit + if offset+pageSize > total { + pageSize = total - offset } - packets := make([]map[string]interface{}, 0, end-offset) - for _, tx := range filtered[offset:end] { - packets = append(packets, txToMap(tx)) + packets := make([]map[string]interface{}, 0, pageSize) + if order == "ASC" { + for _, tx := range filtered[offset : offset+pageSize] { + packets = append(packets, txToMap(tx)) + } + } else { + endIdx := total - offset + startIdx := endIdx - pageSize + if startIdx < 0 { + startIdx = 0 + } + for i := endIdx - 1; i >= startIdx; i-- { + packets = append(packets, txToMap(filtered[i])) + } } return &PacketResult{Packets: packets, Total: total} } @@ -930,13 +938,12 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac DecodedJSON: r.decodedJSON, } s.byHash[r.hash] = tx - // Prepend (newest first) - s.packets = append(s.packets, tx) + s.packets = append(s.packets, tx) // oldest-first; new items go to tail s.byTxID[r.txID] = tx s.indexByNode(tx) if tx.PayloadType != nil { pt := *tx.PayloadType - // Prepend to maintain newest-first order (matches Load ordering) + // Append to maintain oldest-first order (matches Load ordering) // so GetChannelMessages reverse iteration stays correct s.byPayloadType[pt] = append(s.byPayloadType[pt], tx) } @@ -1083,8 +1090,6 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac s.cacheMu.Unlock() } - log.Printf("[poller] IngestNewFromDB: found %d new txs, maxID %d->%d", len(result), sinceID, newMaxID) - return result, newMaxID } @@ -1267,8 +1272,7 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) int { s.subpathCache = make(map[string]*cachedResult) s.cacheMu.Unlock() - log.Printf("[poller] IngestNewObservations: updated %d existing txs, maxObsID %d->%d", - len(updatedTxs), sinceObsID, newMaxObsID) + // analytics caches cleared; no per-cycle log to avoid stdout overhead } return newMaxObsID @@ -4072,13 +4076,13 @@ func (s *PacketStore) GetNodeHealth(pubkey string) (map[string]interface{}, erro lhVal = lastHeard } - // Recent packets (up to 20, newest first — packets are already sorted DESC) + // Recent packets (up to 20, newest first — read from tail of oldest-first slice) recentLimit := 20 if len(packets) < recentLimit { recentLimit = len(packets) } recentPackets := make([]map[string]interface{}, 0, recentLimit) - for i := 0; i < recentLimit; i++ { + for i := len(packets) - 1; i >= len(packets)-recentLimit; i-- { p := txToMap(packets[i]) delete(p, "observations") recentPackets = append(recentPackets, p)