diff --git a/cmd/ingestor/main.go b/cmd/ingestor/main.go index 4f669ae0..da63e4b9 100644 --- a/cmd/ingestor/main.go +++ b/cmd/ingestor/main.go @@ -1086,7 +1086,7 @@ func resolveRxTime(msg map[string]interface{}, tag string) string { if raw == "" { return now.Format(time.RFC3339) } - t, err := parseEnvelopeTime(raw) + t, naive, err := parseEnvelopeTime(raw) if err != nil { log.Printf("MQTT [%s] unparseable timestamp %q, using ingest time", tag, raw) return now.Format(time.RFC3339) @@ -1105,13 +1105,30 @@ func resolveRxTime(msg map[string]interface{}, tag string) string { log.Printf("MQTT [%s] stale timestamp %q (>30d old), using ingest time", tag, raw) return now.Format(time.RFC3339) } - // Soft clamp: naive local-clock timestamps from UTC+N observers are parsed - // as-if UTC, making them appear N hours in the future. A UTC+2 observer's - // live packet looks 2h ahead, but it is NOT a buffered packet — the whole - // point of using rxTime is to preserve the past timestamp for packets that - // were buffered offline. If rxTime is ahead of now, the packet is live and - // ingest time is the correct value. This also prevents storing future - // timestamps that would show ⚠️ in the UI for every packet from UTC+N nodes. + // Symmetric naive-timestamp clamp (issue #1463). Naive (zone-less) ISO + // values from observers in non-UTC zones are parsed as-if UTC, leaving a + // residual offset equal to the observer's UTC offset: + // - UTC+N observer → value appears N hours in the future + // - UTC-N observer → value appears N hours in the past + // The past case was silently stored verbatim, poisoning last_seen and + // rendering UTC-N observers perpetually "Stale" in the UI. Collapse any + // naive value more than 15 min off server-now to now() — well-behaved + // observers (Z-suffixed or explicit offset) are untouched regardless of + // skew so legitimate buffered uploads remain accurate. + const naiveTolerance = 15 * time.Minute + if naive { + delta := t.Sub(now) + if delta < 0 { + delta = -delta + } + if delta > naiveTolerance { + log.Printf("MQTT [%s] naive timestamp %q off by %s, using ingest time", tag, raw, delta.Round(time.Second)) + return now.Format(time.RFC3339) + } + } + // Legacy soft clamp for zone-aware near-future values: any value ahead of + // now is from a slightly skewed observer clock — collapse to now so we + // don't render ⚠️ in the UI for live packets from those nodes. if t.After(now) { return now.Format(time.RFC3339) } @@ -1121,19 +1138,22 @@ func resolveRxTime(msg map[string]interface{}, tag string) string { // parseEnvelopeTime parses the MQTT envelope timestamp. Two on-wire forms // occur: zone-aware ISO8601 (RFC3339), and a naive local-clock ISO string // with no zone (python datetime.isoformat()). Zone-aware layouts are tried -// first; naive layouts are assumed UTC, leaving a bounded residual offset -// equal to the observer's UTC offset for naive-timestamp uploaders. -func parseEnvelopeTime(s string) (time.Time, error) { +// first; naive layouts are assumed UTC but the caller is informed via the +// returned `naive` flag so it can apply a symmetric clamp (see issue #1463). +func parseEnvelopeTime(s string) (time.Time, bool, error) { + // Zone-aware first — RFC3339 demands Z or ±HH:MM. + if t, err := time.Parse(time.RFC3339, s); err == nil { + return t, false, nil + } for _, layout := range []string{ - time.RFC3339, // 2026-05-16T10:00:00Z / +02:00 "2006-01-02T15:04:05.999999", // python isoformat w/ microseconds "2006-01-02T15:04:05", // naive ISO } { if t, err := time.Parse(layout, s); err == nil { - return t, nil + return t, true, nil } } - return time.Time{}, fmt.Errorf("unrecognized timestamp layout: %q", s) + return time.Time{}, false, fmt.Errorf("unrecognized timestamp layout: %q", s) } // deriveHashtagChannelKey derives an AES-128 key from a channel name. diff --git a/cmd/ingestor/rxtime_test.go b/cmd/ingestor/rxtime_test.go index 2a29baa7..9fc4b9b2 100644 --- a/cmd/ingestor/rxtime_test.go +++ b/cmd/ingestor/rxtime_test.go @@ -7,23 +7,27 @@ import ( func TestParseEnvelopeTime(t *testing.T) { cases := []struct { - name string - in string - ok bool + name string + in string + ok bool + wantNaive bool }{ - {"rfc3339 utc", "2026-05-16T10:00:00Z", true}, - {"rfc3339 offset", "2026-05-16T12:00:00+02:00", true}, - {"naive iso", "2026-05-16T10:00:00", true}, - {"naive iso micros", "2026-05-16T10:00:00.123456", true}, - {"garbage", "not-a-time", false}, - {"empty", "", false}, + {"rfc3339 utc", "2026-05-16T10:00:00Z", true, false}, + {"rfc3339 offset", "2026-05-16T12:00:00+02:00", true, false}, + {"naive iso", "2026-05-16T10:00:00", true, true}, + {"naive iso micros", "2026-05-16T10:00:00.123456", true, true}, + {"garbage", "not-a-time", false, false}, + {"empty", "", false, false}, } for _, c := range cases { t.Run(c.name, func(t *testing.T) { - _, err := parseEnvelopeTime(c.in) + _, naive, err := parseEnvelopeTime(c.in) if (err == nil) != c.ok { t.Fatalf("parseEnvelopeTime(%q): want ok=%v, got err=%v", c.in, c.ok, err) } + if err == nil && naive != c.wantNaive { + t.Fatalf("parseEnvelopeTime(%q): want naive=%v, got %v", c.in, c.wantNaive, naive) + } }) } }