fix(#1465): observer.last_seen always uses ingest time, not envelope (#1466)

## Summary

`observer.last_seen` (and `last_packet_at`) answer "when did the
analyzer last hear from this observer" — fundamentally an ingest-time
question. Previously both the status-message handler and the
packet-message handler passed the MQTT envelope timestamp into
`UpsertObserverAt` / `stmtUpdateObserverLastSeen`, which let buggy
observer clocks drag `last_seen` hours into the past even when the
timestamp parsed cleanly as RFC3339 (so #1464's naive-clamp didn't catch
it).

California observers on `analyzer.00id.net` consistently appeared 3-7h
stale for this reason.

## Fix

- `cmd/ingestor/main.go` status handler: pass `""` to `UpsertObserverAt`
so it falls back to `time.Now()`.
- `cmd/ingestor/main.go` packet-path observer upsert: same.
- `cmd/ingestor/db.go` `InsertTransmission`'s
`stmtUpdateObserverLastSeen.Exec` call: use `ingestNow` for both
`last_seen` and `last_packet_at` (was `rxTime`).

Per-packet rxTime semantics (`transmissions.first_seen`,
`observations.timestamp`) are unchanged — those continue to use envelope
time with the naive-clamp / 14h-future / 30d-past guards from #1463 /
#1464. Per-hop SNR-vs-time analysis still works.

## TDD

- Red: `test(#1465): observer.last_seen uses ingest time even with
well-formed envelope (red)`
- 3 new tests in `observer_lastseen_1465_test.go`: status-past,
status-future, packet-path-past.
- Status-past and packet-path-past assertions failed on master (envelope
time stored verbatim).
- Green: `fix(#1465): observer.last_seen always uses ingest time, not
envelope`
  - All 3 new tests pass.
- Pre-existing `TestInsertTransmissionUpdatesObserverLastSeen` and
`TestLastPacketAtUpdatedOnPacketOnly` were encoding the buggy behavior;
updated to assert ingest-time semantics.
  - Full `go test ./cmd/ingestor/...` green.

## Refs

- Refs #1463 (root-cause investigation)
- Refs #1464 (naive-clamp fix that handled malformed timestamps)
- Closes #1465

---------

Co-authored-by: openclaw-bot <bot@openclaw.local>
This commit is contained in:
Kpa-clawbot
2026-05-28 12:16:29 -07:00
committed by GitHub
parent 4e5e141182
commit 2627bd053b
4 changed files with 146 additions and 12 deletions
+5 -3
View File
@@ -748,9 +748,11 @@ func (s *Store) InsertTransmission(data *PacketData) (bool, error) {
err := s.stmtGetObserverRowid.QueryRow(data.ObserverID).Scan(&rowid)
if err == nil {
observerIdx = &rowid
// Update observer last_seen and last_packet_at on every packet to prevent
// low-traffic observers from appearing offline (#463)
_, _ = s.stmtUpdateObserverLastSeen.Exec(ingestNow, rxTime, ingestNow, rxTime, rowid)
// observer.last_seen and last_packet_at answer "when did the analyzer
// last hear from this observer" — both are ingest-time questions.
// Per-packet rxTime is stored separately on observations/transmissions
// using envelope time (see InsertTransmission above). See #1465.
_, _ = s.stmtUpdateObserverLastSeen.Exec(ingestNow, ingestNow, ingestNow, ingestNow, rowid)
}
}
+23 -7
View File
@@ -554,18 +554,26 @@ func TestInsertTransmissionUpdatesObserverLastSeen(t *testing.T) {
PathJSON: "[]",
DecodedJSON: `{"type":"TXT_MSG"}`,
}
before := time.Now().Unix()
if _, err := s.InsertTransmission(data); err != nil {
t.Fatal(err)
}
after := time.Now().Unix()
// Verify last_seen was updated
// Verify last_seen was updated to INGEST time, not envelope time (#1465).
var lastSeenAfter string
s.db.QueryRow("SELECT last_seen FROM observers WHERE id = ?", "obs1").Scan(&lastSeenAfter)
if lastSeenAfter == oldTime {
t.Error("observer last_seen was NOT updated after packet insertion — low-traffic observers will appear offline")
}
if lastSeenAfter != "2026-03-25T01:00:00Z" {
t.Errorf("expected last_seen=2026-03-25T01:00:00Z, got %s", lastSeenAfter)
ls, err := time.Parse(time.RFC3339, lastSeenAfter)
if err != nil {
t.Fatalf("last_seen %q not RFC3339: %v", lastSeenAfter, err)
}
if ls.Unix() < before-5 || ls.Unix() > after+5 {
t.Errorf("expected last_seen ≈ server now (in [%d, %d]), got %s (epoch %d). "+
"observer.last_seen must use ingest time, not envelope time (#1465).",
before, after, lastSeenAfter, ls.Unix())
}
}
@@ -598,18 +606,26 @@ func TestLastPacketAtUpdatedOnPacketOnly(t *testing.T) {
PathJSON: "[]",
DecodedJSON: `{"type":"TXT_MSG"}`,
}
before := time.Now().Unix()
if _, err := s.InsertTransmission(data); err != nil {
t.Fatal(err)
}
after := time.Now().Unix()
s.db.QueryRow("SELECT last_packet_at FROM observers WHERE id = ?", "obs1").Scan(&lastPacketAt)
if !lastPacketAt.Valid {
t.Fatal("expected last_packet_at to be non-NULL after InsertTransmission")
}
// InsertTransmission uses `now = data.Timestamp || time.Now()`, so last_packet_at
// should match the packet's Timestamp when provided (same source-of-truth as last_seen).
if lastPacketAt.String != "2026-04-24T12:00:00Z" {
t.Errorf("expected last_packet_at=2026-04-24T12:00:00Z, got %s", lastPacketAt.String)
// last_packet_at, like last_seen, is "when did the analyzer last receive a
// packet from this observer" — an ingest-time question, independent of the
// envelope timestamp. See #1465.
lp, err := time.Parse(time.RFC3339, lastPacketAt.String)
if err != nil {
t.Fatalf("last_packet_at %q not RFC3339: %v", lastPacketAt.String, err)
}
if lp.Unix() < before-5 || lp.Unix() > after+5 {
t.Errorf("expected last_packet_at ≈ server now (in [%d, %d]), got %s (epoch %d)",
before, after, lastPacketAt.String, lp.Unix())
}
// UpsertObserver again (status path) — last_packet_at should NOT change
+9 -2
View File
@@ -487,7 +487,11 @@ func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message,
name, _ := msg["origin"].(string)
iata := parts[1]
meta := extractObserverMeta(msg)
if err := store.UpsertObserverAt(observerID, name, iata, meta, resolveRxTime(msg, tag)); err != nil {
// observer.last_seen is "when did the analyzer last hear from this
// observer" — fundamentally an ingest-time question. Passing "" makes
// UpsertObserverAt use time.Now(), independent of the envelope timestamp
// (which can be stale/skewed even when well-formed). See #1465.
if err := store.UpsertObserverAt(observerID, name, iata, meta, ""); err != nil {
log.Printf("MQTT [%s] observer status error: %v", tag, err)
}
// Insert metrics sample from status message
@@ -709,7 +713,10 @@ func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message,
if mqttMsg.Region != "" {
effectiveRegion = mqttMsg.Region
}
if err := store.UpsertObserverAt(observerID, origin, effectiveRegion, nil, mqttMsg.Timestamp); err != nil {
// Same as the status-path call above: observer.last_seen is ingest
// time, not envelope time. Per-packet rxTime (stored in observations
// via InsertTransmission) still uses envelope time. See #1465.
if err := store.UpsertObserverAt(observerID, origin, effectiveRegion, nil, ""); err != nil {
log.Printf("MQTT [%s] observer upsert error: %v", tag, err)
}
}
+109
View File
@@ -0,0 +1,109 @@
package main
// Regression tests for issue #1465 — observer.last_seen MUST always reflect
// ingest time (server wall clock), never the MQTT envelope timestamp. Observers
// with broken clocks (wrong TZ, RTC drift, replayed retained messages) must
// NOT be able to drag the analyzer's "last heard from" field into the past
// or future.
//
// Per-packet rxTime semantics (envelope time with naive-clamp from #1464)
// are out of scope here — those continue to use envelope time. This file
// asserts only the observer.last_seen path.
import (
"testing"
"time"
)
// Status path: envelope timestamp is a well-formed RFC3339 value 3h in the
// past. observer.last_seen must be server wall clock, NOT the envelope value.
func TestStatusMessage_ObserverLastSeen_AlwaysIngestTime_PastEnvelope_1465(t *testing.T) {
store := newTestStore(t)
source := MQTTSource{Name: "test"}
stale := time.Now().UTC().Add(-3 * time.Hour).Format(time.RFC3339)
before := time.Now().Unix()
payload := []byte(`{"status":"online","origin":"obs-past","timestamp":"` + stale + `"}`)
msg := &mockMessage{topic: "meshcore/SJC/obs-past/status", payload: payload}
handleMessage(store, "test", source, msg, nil, nil, &Config{})
after := time.Now().Unix()
var lastSeen string
if err := store.db.QueryRow(`SELECT last_seen FROM observers WHERE id = ?`, "obs-past").Scan(&lastSeen); err != nil {
t.Fatalf("scan last_seen: %v", err)
}
ls, err := time.Parse(time.RFC3339, lastSeen)
if err != nil {
t.Fatalf("last_seen %q not RFC3339: %v", lastSeen, err)
}
if ls.Unix() < before-5 || ls.Unix() > after+5 {
t.Errorf("observer.last_seen = %q (epoch %d); want in [%d, %d] (server wall clock). "+
"Envelope reported well-formed stale %q (3h ago) — must NOT drag last_seen into the past. Issue #1465.",
lastSeen, ls.Unix(), before, after, stale)
}
}
// Status path: envelope timestamp 5 min in the future. observer.last_seen
// must still be server wall clock.
func TestStatusMessage_ObserverLastSeen_AlwaysIngestTime_FutureEnvelope_1465(t *testing.T) {
store := newTestStore(t)
source := MQTTSource{Name: "test"}
future := time.Now().UTC().Add(5 * time.Minute).Format(time.RFC3339)
before := time.Now().Unix()
payload := []byte(`{"status":"online","origin":"obs-future","timestamp":"` + future + `"}`)
msg := &mockMessage{topic: "meshcore/SJC/obs-future/status", payload: payload}
handleMessage(store, "test", source, msg, nil, nil, &Config{})
after := time.Now().Unix()
var lastSeen string
if err := store.db.QueryRow(`SELECT last_seen FROM observers WHERE id = ?`, "obs-future").Scan(&lastSeen); err != nil {
t.Fatalf("scan last_seen: %v", err)
}
ls, err := time.Parse(time.RFC3339, lastSeen)
if err != nil {
t.Fatalf("last_seen %q not RFC3339: %v", lastSeen, err)
}
if ls.Unix() < before-5 || ls.Unix() > after+5 {
t.Errorf("observer.last_seen = %q (epoch %d); want in [%d, %d] (server wall clock). "+
"Envelope reported well-formed future %q (5 min ahead) — must NOT drag last_seen into the future. Issue #1465.",
lastSeen, ls.Unix(), before, after, future)
}
}
// Packet path: a transmission whose envelope timestamp is 3h in the past
// MUST still bump observer.last_seen to server wall clock — observer is
// clearly alive (we just ingested a packet from it), regardless of what
// its clock claims.
func TestPacketMessage_ObserverLastSeen_AlwaysIngestTime_PastEnvelope_1465(t *testing.T) {
store := newTestStore(t)
source := MQTTSource{Name: "test"}
stale := time.Now().UTC().Add(-3 * time.Hour).Format(time.RFC3339)
before := time.Now().Unix()
rawHex := "0A00D69FD7A5A7475DB07337749AE61FA53A4788E976"
payload := []byte(`{"raw":"` + rawHex + `","SNR":5.5,"RSSI":-100.0,"origin":"obs-pkt","timestamp":"` + stale + `"}`)
msg := &mockMessage{topic: "meshcore/SJC/obs-pkt/packets", payload: payload}
handleMessage(store, "test", source, msg, nil, nil, &Config{})
after := time.Now().Unix()
var lastSeen string
if err := store.db.QueryRow(`SELECT last_seen FROM observers WHERE id = ?`, "obs-pkt").Scan(&lastSeen); err != nil {
t.Fatalf("scan last_seen: %v", err)
}
ls, err := time.Parse(time.RFC3339, lastSeen)
if err != nil {
t.Fatalf("last_seen %q not RFC3339: %v", lastSeen, err)
}
if ls.Unix() < before-5 || ls.Unix() > after+5 {
t.Errorf("packet-path observer.last_seen = %q (epoch %d); want in [%d, %d] (server wall clock). "+
"Envelope stale = %q. Observer just delivered a packet; last_seen must be NOW. Issue #1465.",
lastSeen, ls.Unix(), before, after, stale)
}
}