From 2627bd053b9ee90b40be192841998bddee2e1f71 Mon Sep 17 00:00:00 2001 From: Kpa-clawbot Date: Thu, 28 May 2026 12:16:29 -0700 Subject: [PATCH] fix(#1465): observer.last_seen always uses ingest time, not envelope (#1466) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary `observer.last_seen` (and `last_packet_at`) answer "when did the analyzer last hear from this observer" — fundamentally an ingest-time question. Previously both the status-message handler and the packet-message handler passed the MQTT envelope timestamp into `UpsertObserverAt` / `stmtUpdateObserverLastSeen`, which let buggy observer clocks drag `last_seen` hours into the past even when the timestamp parsed cleanly as RFC3339 (so #1464's naive-clamp didn't catch it). California observers on `analyzer.00id.net` consistently appeared 3-7h stale for this reason. ## Fix - `cmd/ingestor/main.go` status handler: pass `""` to `UpsertObserverAt` so it falls back to `time.Now()`. - `cmd/ingestor/main.go` packet-path observer upsert: same. - `cmd/ingestor/db.go` `InsertTransmission`'s `stmtUpdateObserverLastSeen.Exec` call: use `ingestNow` for both `last_seen` and `last_packet_at` (was `rxTime`). Per-packet rxTime semantics (`transmissions.first_seen`, `observations.timestamp`) are unchanged — those continue to use envelope time with the naive-clamp / 14h-future / 30d-past guards from #1463 / #1464. Per-hop SNR-vs-time analysis still works. ## TDD - Red: `test(#1465): observer.last_seen uses ingest time even with well-formed envelope (red)` - 3 new tests in `observer_lastseen_1465_test.go`: status-past, status-future, packet-path-past. - Status-past and packet-path-past assertions failed on master (envelope time stored verbatim). - Green: `fix(#1465): observer.last_seen always uses ingest time, not envelope` - All 3 new tests pass. - Pre-existing `TestInsertTransmissionUpdatesObserverLastSeen` and `TestLastPacketAtUpdatedOnPacketOnly` were encoding the buggy behavior; updated to assert ingest-time semantics. - Full `go test ./cmd/ingestor/...` green. ## Refs - Refs #1463 (root-cause investigation) - Refs #1464 (naive-clamp fix that handled malformed timestamps) - Closes #1465 --------- Co-authored-by: openclaw-bot --- cmd/ingestor/db.go | 8 +- cmd/ingestor/db_test.go | 30 ++++-- cmd/ingestor/main.go | 11 +- cmd/ingestor/observer_lastseen_1465_test.go | 109 ++++++++++++++++++++ 4 files changed, 146 insertions(+), 12 deletions(-) create mode 100644 cmd/ingestor/observer_lastseen_1465_test.go diff --git a/cmd/ingestor/db.go b/cmd/ingestor/db.go index 79771f3f..0f128775 100644 --- a/cmd/ingestor/db.go +++ b/cmd/ingestor/db.go @@ -748,9 +748,11 @@ func (s *Store) InsertTransmission(data *PacketData) (bool, error) { err := s.stmtGetObserverRowid.QueryRow(data.ObserverID).Scan(&rowid) if err == nil { observerIdx = &rowid - // Update observer last_seen and last_packet_at on every packet to prevent - // low-traffic observers from appearing offline (#463) - _, _ = s.stmtUpdateObserverLastSeen.Exec(ingestNow, rxTime, ingestNow, rxTime, rowid) + // observer.last_seen and last_packet_at answer "when did the analyzer + // last hear from this observer" — both are ingest-time questions. + // Per-packet rxTime is stored separately on observations/transmissions + // using envelope time (see InsertTransmission above). See #1465. + _, _ = s.stmtUpdateObserverLastSeen.Exec(ingestNow, ingestNow, ingestNow, ingestNow, rowid) } } diff --git a/cmd/ingestor/db_test.go b/cmd/ingestor/db_test.go index cc8d3e2c..3d964d14 100644 --- a/cmd/ingestor/db_test.go +++ b/cmd/ingestor/db_test.go @@ -554,18 +554,26 @@ func TestInsertTransmissionUpdatesObserverLastSeen(t *testing.T) { PathJSON: "[]", DecodedJSON: `{"type":"TXT_MSG"}`, } + before := time.Now().Unix() if _, err := s.InsertTransmission(data); err != nil { t.Fatal(err) } + after := time.Now().Unix() - // Verify last_seen was updated + // Verify last_seen was updated to INGEST time, not envelope time (#1465). var lastSeenAfter string s.db.QueryRow("SELECT last_seen FROM observers WHERE id = ?", "obs1").Scan(&lastSeenAfter) if lastSeenAfter == oldTime { t.Error("observer last_seen was NOT updated after packet insertion — low-traffic observers will appear offline") } - if lastSeenAfter != "2026-03-25T01:00:00Z" { - t.Errorf("expected last_seen=2026-03-25T01:00:00Z, got %s", lastSeenAfter) + ls, err := time.Parse(time.RFC3339, lastSeenAfter) + if err != nil { + t.Fatalf("last_seen %q not RFC3339: %v", lastSeenAfter, err) + } + if ls.Unix() < before-5 || ls.Unix() > after+5 { + t.Errorf("expected last_seen ≈ server now (in [%d, %d]), got %s (epoch %d). "+ + "observer.last_seen must use ingest time, not envelope time (#1465).", + before, after, lastSeenAfter, ls.Unix()) } } @@ -598,18 +606,26 @@ func TestLastPacketAtUpdatedOnPacketOnly(t *testing.T) { PathJSON: "[]", DecodedJSON: `{"type":"TXT_MSG"}`, } + before := time.Now().Unix() if _, err := s.InsertTransmission(data); err != nil { t.Fatal(err) } + after := time.Now().Unix() s.db.QueryRow("SELECT last_packet_at FROM observers WHERE id = ?", "obs1").Scan(&lastPacketAt) if !lastPacketAt.Valid { t.Fatal("expected last_packet_at to be non-NULL after InsertTransmission") } - // InsertTransmission uses `now = data.Timestamp || time.Now()`, so last_packet_at - // should match the packet's Timestamp when provided (same source-of-truth as last_seen). - if lastPacketAt.String != "2026-04-24T12:00:00Z" { - t.Errorf("expected last_packet_at=2026-04-24T12:00:00Z, got %s", lastPacketAt.String) + // last_packet_at, like last_seen, is "when did the analyzer last receive a + // packet from this observer" — an ingest-time question, independent of the + // envelope timestamp. See #1465. + lp, err := time.Parse(time.RFC3339, lastPacketAt.String) + if err != nil { + t.Fatalf("last_packet_at %q not RFC3339: %v", lastPacketAt.String, err) + } + if lp.Unix() < before-5 || lp.Unix() > after+5 { + t.Errorf("expected last_packet_at ≈ server now (in [%d, %d]), got %s (epoch %d)", + before, after, lastPacketAt.String, lp.Unix()) } // UpsertObserver again (status path) — last_packet_at should NOT change diff --git a/cmd/ingestor/main.go b/cmd/ingestor/main.go index da63e4b9..cdd63e89 100644 --- a/cmd/ingestor/main.go +++ b/cmd/ingestor/main.go @@ -487,7 +487,11 @@ func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message, name, _ := msg["origin"].(string) iata := parts[1] meta := extractObserverMeta(msg) - if err := store.UpsertObserverAt(observerID, name, iata, meta, resolveRxTime(msg, tag)); err != nil { + // observer.last_seen is "when did the analyzer last hear from this + // observer" — fundamentally an ingest-time question. Passing "" makes + // UpsertObserverAt use time.Now(), independent of the envelope timestamp + // (which can be stale/skewed even when well-formed). See #1465. + if err := store.UpsertObserverAt(observerID, name, iata, meta, ""); err != nil { log.Printf("MQTT [%s] observer status error: %v", tag, err) } // Insert metrics sample from status message @@ -709,7 +713,10 @@ func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message, if mqttMsg.Region != "" { effectiveRegion = mqttMsg.Region } - if err := store.UpsertObserverAt(observerID, origin, effectiveRegion, nil, mqttMsg.Timestamp); err != nil { + // Same as the status-path call above: observer.last_seen is ingest + // time, not envelope time. Per-packet rxTime (stored in observations + // via InsertTransmission) still uses envelope time. See #1465. + if err := store.UpsertObserverAt(observerID, origin, effectiveRegion, nil, ""); err != nil { log.Printf("MQTT [%s] observer upsert error: %v", tag, err) } } diff --git a/cmd/ingestor/observer_lastseen_1465_test.go b/cmd/ingestor/observer_lastseen_1465_test.go new file mode 100644 index 00000000..dedd7ab7 --- /dev/null +++ b/cmd/ingestor/observer_lastseen_1465_test.go @@ -0,0 +1,109 @@ +package main + +// Regression tests for issue #1465 — observer.last_seen MUST always reflect +// ingest time (server wall clock), never the MQTT envelope timestamp. Observers +// with broken clocks (wrong TZ, RTC drift, replayed retained messages) must +// NOT be able to drag the analyzer's "last heard from" field into the past +// or future. +// +// Per-packet rxTime semantics (envelope time with naive-clamp from #1464) +// are out of scope here — those continue to use envelope time. This file +// asserts only the observer.last_seen path. + +import ( + "testing" + "time" +) + +// Status path: envelope timestamp is a well-formed RFC3339 value 3h in the +// past. observer.last_seen must be server wall clock, NOT the envelope value. +func TestStatusMessage_ObserverLastSeen_AlwaysIngestTime_PastEnvelope_1465(t *testing.T) { + store := newTestStore(t) + source := MQTTSource{Name: "test"} + + stale := time.Now().UTC().Add(-3 * time.Hour).Format(time.RFC3339) + before := time.Now().Unix() + + payload := []byte(`{"status":"online","origin":"obs-past","timestamp":"` + stale + `"}`) + msg := &mockMessage{topic: "meshcore/SJC/obs-past/status", payload: payload} + + handleMessage(store, "test", source, msg, nil, nil, &Config{}) + after := time.Now().Unix() + + var lastSeen string + if err := store.db.QueryRow(`SELECT last_seen FROM observers WHERE id = ?`, "obs-past").Scan(&lastSeen); err != nil { + t.Fatalf("scan last_seen: %v", err) + } + ls, err := time.Parse(time.RFC3339, lastSeen) + if err != nil { + t.Fatalf("last_seen %q not RFC3339: %v", lastSeen, err) + } + if ls.Unix() < before-5 || ls.Unix() > after+5 { + t.Errorf("observer.last_seen = %q (epoch %d); want in [%d, %d] (server wall clock). "+ + "Envelope reported well-formed stale %q (3h ago) — must NOT drag last_seen into the past. Issue #1465.", + lastSeen, ls.Unix(), before, after, stale) + } +} + +// Status path: envelope timestamp 5 min in the future. observer.last_seen +// must still be server wall clock. +func TestStatusMessage_ObserverLastSeen_AlwaysIngestTime_FutureEnvelope_1465(t *testing.T) { + store := newTestStore(t) + source := MQTTSource{Name: "test"} + + future := time.Now().UTC().Add(5 * time.Minute).Format(time.RFC3339) + before := time.Now().Unix() + + payload := []byte(`{"status":"online","origin":"obs-future","timestamp":"` + future + `"}`) + msg := &mockMessage{topic: "meshcore/SJC/obs-future/status", payload: payload} + + handleMessage(store, "test", source, msg, nil, nil, &Config{}) + after := time.Now().Unix() + + var lastSeen string + if err := store.db.QueryRow(`SELECT last_seen FROM observers WHERE id = ?`, "obs-future").Scan(&lastSeen); err != nil { + t.Fatalf("scan last_seen: %v", err) + } + ls, err := time.Parse(time.RFC3339, lastSeen) + if err != nil { + t.Fatalf("last_seen %q not RFC3339: %v", lastSeen, err) + } + if ls.Unix() < before-5 || ls.Unix() > after+5 { + t.Errorf("observer.last_seen = %q (epoch %d); want in [%d, %d] (server wall clock). "+ + "Envelope reported well-formed future %q (5 min ahead) — must NOT drag last_seen into the future. Issue #1465.", + lastSeen, ls.Unix(), before, after, future) + } +} + +// Packet path: a transmission whose envelope timestamp is 3h in the past +// MUST still bump observer.last_seen to server wall clock — observer is +// clearly alive (we just ingested a packet from it), regardless of what +// its clock claims. +func TestPacketMessage_ObserverLastSeen_AlwaysIngestTime_PastEnvelope_1465(t *testing.T) { + store := newTestStore(t) + source := MQTTSource{Name: "test"} + + stale := time.Now().UTC().Add(-3 * time.Hour).Format(time.RFC3339) + before := time.Now().Unix() + + rawHex := "0A00D69FD7A5A7475DB07337749AE61FA53A4788E976" + payload := []byte(`{"raw":"` + rawHex + `","SNR":5.5,"RSSI":-100.0,"origin":"obs-pkt","timestamp":"` + stale + `"}`) + msg := &mockMessage{topic: "meshcore/SJC/obs-pkt/packets", payload: payload} + + handleMessage(store, "test", source, msg, nil, nil, &Config{}) + after := time.Now().Unix() + + var lastSeen string + if err := store.db.QueryRow(`SELECT last_seen FROM observers WHERE id = ?`, "obs-pkt").Scan(&lastSeen); err != nil { + t.Fatalf("scan last_seen: %v", err) + } + ls, err := time.Parse(time.RFC3339, lastSeen) + if err != nil { + t.Fatalf("last_seen %q not RFC3339: %v", lastSeen, err) + } + if ls.Unix() < before-5 || ls.Unix() > after+5 { + t.Errorf("packet-path observer.last_seen = %q (epoch %d); want in [%d, %d] (server wall clock). "+ + "Envelope stale = %q. Observer just delivered a packet; last_seen must be NOW. Issue #1465.", + lastSeen, ls.Unix(), before, after, stale) + } +}