diff --git a/cmd/ingestor/db.go b/cmd/ingestor/db.go index 79771f3f..0f128775 100644 --- a/cmd/ingestor/db.go +++ b/cmd/ingestor/db.go @@ -748,9 +748,11 @@ func (s *Store) InsertTransmission(data *PacketData) (bool, error) { err := s.stmtGetObserverRowid.QueryRow(data.ObserverID).Scan(&rowid) if err == nil { observerIdx = &rowid - // Update observer last_seen and last_packet_at on every packet to prevent - // low-traffic observers from appearing offline (#463) - _, _ = s.stmtUpdateObserverLastSeen.Exec(ingestNow, rxTime, ingestNow, rxTime, rowid) + // observer.last_seen and last_packet_at answer "when did the analyzer + // last hear from this observer" — both are ingest-time questions. + // Per-packet rxTime is stored separately on observations/transmissions + // using envelope time (see InsertTransmission above). See #1465. + _, _ = s.stmtUpdateObserverLastSeen.Exec(ingestNow, ingestNow, ingestNow, ingestNow, rowid) } } diff --git a/cmd/ingestor/db_test.go b/cmd/ingestor/db_test.go index cc8d3e2c..3d964d14 100644 --- a/cmd/ingestor/db_test.go +++ b/cmd/ingestor/db_test.go @@ -554,18 +554,26 @@ func TestInsertTransmissionUpdatesObserverLastSeen(t *testing.T) { PathJSON: "[]", DecodedJSON: `{"type":"TXT_MSG"}`, } + before := time.Now().Unix() if _, err := s.InsertTransmission(data); err != nil { t.Fatal(err) } + after := time.Now().Unix() - // Verify last_seen was updated + // Verify last_seen was updated to INGEST time, not envelope time (#1465). var lastSeenAfter string s.db.QueryRow("SELECT last_seen FROM observers WHERE id = ?", "obs1").Scan(&lastSeenAfter) if lastSeenAfter == oldTime { t.Error("observer last_seen was NOT updated after packet insertion — low-traffic observers will appear offline") } - if lastSeenAfter != "2026-03-25T01:00:00Z" { - t.Errorf("expected last_seen=2026-03-25T01:00:00Z, got %s", lastSeenAfter) + ls, err := time.Parse(time.RFC3339, lastSeenAfter) + if err != nil { + t.Fatalf("last_seen %q not RFC3339: %v", lastSeenAfter, err) + } + if ls.Unix() < before-5 || ls.Unix() > after+5 { + t.Errorf("expected last_seen ≈ server now (in [%d, %d]), got %s (epoch %d). "+ + "observer.last_seen must use ingest time, not envelope time (#1465).", + before, after, lastSeenAfter, ls.Unix()) } } @@ -598,18 +606,26 @@ func TestLastPacketAtUpdatedOnPacketOnly(t *testing.T) { PathJSON: "[]", DecodedJSON: `{"type":"TXT_MSG"}`, } + before := time.Now().Unix() if _, err := s.InsertTransmission(data); err != nil { t.Fatal(err) } + after := time.Now().Unix() s.db.QueryRow("SELECT last_packet_at FROM observers WHERE id = ?", "obs1").Scan(&lastPacketAt) if !lastPacketAt.Valid { t.Fatal("expected last_packet_at to be non-NULL after InsertTransmission") } - // InsertTransmission uses `now = data.Timestamp || time.Now()`, so last_packet_at - // should match the packet's Timestamp when provided (same source-of-truth as last_seen). - if lastPacketAt.String != "2026-04-24T12:00:00Z" { - t.Errorf("expected last_packet_at=2026-04-24T12:00:00Z, got %s", lastPacketAt.String) + // last_packet_at, like last_seen, is "when did the analyzer last receive a + // packet from this observer" — an ingest-time question, independent of the + // envelope timestamp. See #1465. + lp, err := time.Parse(time.RFC3339, lastPacketAt.String) + if err != nil { + t.Fatalf("last_packet_at %q not RFC3339: %v", lastPacketAt.String, err) + } + if lp.Unix() < before-5 || lp.Unix() > after+5 { + t.Errorf("expected last_packet_at ≈ server now (in [%d, %d]), got %s (epoch %d)", + before, after, lastPacketAt.String, lp.Unix()) } // UpsertObserver again (status path) — last_packet_at should NOT change diff --git a/cmd/ingestor/main.go b/cmd/ingestor/main.go index da63e4b9..cdd63e89 100644 --- a/cmd/ingestor/main.go +++ b/cmd/ingestor/main.go @@ -487,7 +487,11 @@ func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message, name, _ := msg["origin"].(string) iata := parts[1] meta := extractObserverMeta(msg) - if err := store.UpsertObserverAt(observerID, name, iata, meta, resolveRxTime(msg, tag)); err != nil { + // observer.last_seen is "when did the analyzer last hear from this + // observer" — fundamentally an ingest-time question. Passing "" makes + // UpsertObserverAt use time.Now(), independent of the envelope timestamp + // (which can be stale/skewed even when well-formed). See #1465. + if err := store.UpsertObserverAt(observerID, name, iata, meta, ""); err != nil { log.Printf("MQTT [%s] observer status error: %v", tag, err) } // Insert metrics sample from status message @@ -709,7 +713,10 @@ func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message, if mqttMsg.Region != "" { effectiveRegion = mqttMsg.Region } - if err := store.UpsertObserverAt(observerID, origin, effectiveRegion, nil, mqttMsg.Timestamp); err != nil { + // Same as the status-path call above: observer.last_seen is ingest + // time, not envelope time. Per-packet rxTime (stored in observations + // via InsertTransmission) still uses envelope time. See #1465. + if err := store.UpsertObserverAt(observerID, origin, effectiveRegion, nil, ""); err != nil { log.Printf("MQTT [%s] observer upsert error: %v", tag, err) } } diff --git a/cmd/ingestor/observer_lastseen_1465_test.go b/cmd/ingestor/observer_lastseen_1465_test.go new file mode 100644 index 00000000..dedd7ab7 --- /dev/null +++ b/cmd/ingestor/observer_lastseen_1465_test.go @@ -0,0 +1,109 @@ +package main + +// Regression tests for issue #1465 — observer.last_seen MUST always reflect +// ingest time (server wall clock), never the MQTT envelope timestamp. Observers +// with broken clocks (wrong TZ, RTC drift, replayed retained messages) must +// NOT be able to drag the analyzer's "last heard from" field into the past +// or future. +// +// Per-packet rxTime semantics (envelope time with naive-clamp from #1464) +// are out of scope here — those continue to use envelope time. This file +// asserts only the observer.last_seen path. + +import ( + "testing" + "time" +) + +// Status path: envelope timestamp is a well-formed RFC3339 value 3h in the +// past. observer.last_seen must be server wall clock, NOT the envelope value. +func TestStatusMessage_ObserverLastSeen_AlwaysIngestTime_PastEnvelope_1465(t *testing.T) { + store := newTestStore(t) + source := MQTTSource{Name: "test"} + + stale := time.Now().UTC().Add(-3 * time.Hour).Format(time.RFC3339) + before := time.Now().Unix() + + payload := []byte(`{"status":"online","origin":"obs-past","timestamp":"` + stale + `"}`) + msg := &mockMessage{topic: "meshcore/SJC/obs-past/status", payload: payload} + + handleMessage(store, "test", source, msg, nil, nil, &Config{}) + after := time.Now().Unix() + + var lastSeen string + if err := store.db.QueryRow(`SELECT last_seen FROM observers WHERE id = ?`, "obs-past").Scan(&lastSeen); err != nil { + t.Fatalf("scan last_seen: %v", err) + } + ls, err := time.Parse(time.RFC3339, lastSeen) + if err != nil { + t.Fatalf("last_seen %q not RFC3339: %v", lastSeen, err) + } + if ls.Unix() < before-5 || ls.Unix() > after+5 { + t.Errorf("observer.last_seen = %q (epoch %d); want in [%d, %d] (server wall clock). "+ + "Envelope reported well-formed stale %q (3h ago) — must NOT drag last_seen into the past. Issue #1465.", + lastSeen, ls.Unix(), before, after, stale) + } +} + +// Status path: envelope timestamp 5 min in the future. observer.last_seen +// must still be server wall clock. +func TestStatusMessage_ObserverLastSeen_AlwaysIngestTime_FutureEnvelope_1465(t *testing.T) { + store := newTestStore(t) + source := MQTTSource{Name: "test"} + + future := time.Now().UTC().Add(5 * time.Minute).Format(time.RFC3339) + before := time.Now().Unix() + + payload := []byte(`{"status":"online","origin":"obs-future","timestamp":"` + future + `"}`) + msg := &mockMessage{topic: "meshcore/SJC/obs-future/status", payload: payload} + + handleMessage(store, "test", source, msg, nil, nil, &Config{}) + after := time.Now().Unix() + + var lastSeen string + if err := store.db.QueryRow(`SELECT last_seen FROM observers WHERE id = ?`, "obs-future").Scan(&lastSeen); err != nil { + t.Fatalf("scan last_seen: %v", err) + } + ls, err := time.Parse(time.RFC3339, lastSeen) + if err != nil { + t.Fatalf("last_seen %q not RFC3339: %v", lastSeen, err) + } + if ls.Unix() < before-5 || ls.Unix() > after+5 { + t.Errorf("observer.last_seen = %q (epoch %d); want in [%d, %d] (server wall clock). "+ + "Envelope reported well-formed future %q (5 min ahead) — must NOT drag last_seen into the future. Issue #1465.", + lastSeen, ls.Unix(), before, after, future) + } +} + +// Packet path: a transmission whose envelope timestamp is 3h in the past +// MUST still bump observer.last_seen to server wall clock — observer is +// clearly alive (we just ingested a packet from it), regardless of what +// its clock claims. +func TestPacketMessage_ObserverLastSeen_AlwaysIngestTime_PastEnvelope_1465(t *testing.T) { + store := newTestStore(t) + source := MQTTSource{Name: "test"} + + stale := time.Now().UTC().Add(-3 * time.Hour).Format(time.RFC3339) + before := time.Now().Unix() + + rawHex := "0A00D69FD7A5A7475DB07337749AE61FA53A4788E976" + payload := []byte(`{"raw":"` + rawHex + `","SNR":5.5,"RSSI":-100.0,"origin":"obs-pkt","timestamp":"` + stale + `"}`) + msg := &mockMessage{topic: "meshcore/SJC/obs-pkt/packets", payload: payload} + + handleMessage(store, "test", source, msg, nil, nil, &Config{}) + after := time.Now().Unix() + + var lastSeen string + if err := store.db.QueryRow(`SELECT last_seen FROM observers WHERE id = ?`, "obs-pkt").Scan(&lastSeen); err != nil { + t.Fatalf("scan last_seen: %v", err) + } + ls, err := time.Parse(time.RFC3339, lastSeen) + if err != nil { + t.Fatalf("last_seen %q not RFC3339: %v", lastSeen, err) + } + if ls.Unix() < before-5 || ls.Unix() > after+5 { + t.Errorf("packet-path observer.last_seen = %q (epoch %d); want in [%d, %d] (server wall clock). "+ + "Envelope stale = %q. Observer just delivered a packet; last_seen must be NOW. Issue #1465.", + lastSeen, ls.Unix(), before, after, stale) + } +}