diff --git a/cmd/ingestor/db.go b/cmd/ingestor/db.go index 48aa4e7d..8894c4ae 100644 --- a/cmd/ingestor/db.go +++ b/cmd/ingestor/db.go @@ -197,7 +197,8 @@ func applySchema(db *sql.DB) error { last_packet_at TEXT DEFAULT NULL, clock_skew_seconds INTEGER DEFAULT NULL, clock_skew_count_24h INTEGER DEFAULT 0, - clock_last_naive_at TEXT DEFAULT NULL + clock_last_naive_at TEXT DEFAULT NULL, + can_relay INTEGER DEFAULT 1 ); CREATE INDEX IF NOT EXISTS idx_nodes_last_seen ON nodes(last_seen); @@ -727,8 +728,8 @@ func (s *Store) prepareStatements() error { } s.stmtUpsertObserver, err = s.db.Prepare(` - INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count, model, firmware, client_version, radio, battery_mv, uptime_secs, noise_floor) - VALUES (?, ?, ?, ?, ?, 1, ?, ?, ?, ?, ?, ?, ?) + INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count, model, firmware, client_version, radio, battery_mv, uptime_secs, noise_floor, can_relay) + VALUES (?, ?, ?, ?, ?, 1, ?, ?, ?, ?, ?, ?, ?, COALESCE(?, 1)) ON CONFLICT(id) DO UPDATE SET name = COALESCE(?, name), iata = COALESCE(?, iata), @@ -740,7 +741,8 @@ func (s *Store) prepareStatements() error { radio = COALESCE(?, radio), battery_mv = COALESCE(?, battery_mv), uptime_secs = COALESCE(?, uptime_secs), - noise_floor = COALESCE(?, noise_floor) + noise_floor = COALESCE(?, noise_floor), + can_relay = COALESCE(?, can_relay) `) if err != nil { return err @@ -973,6 +975,13 @@ type ObserverMeta struct { RecvErrors *int // cumulative CRC/decode failures since boot PacketsSent *int // cumulative packets sent since boot PacketsRecv *int // cumulative packets received since boot + // CanRelay reflects the firmware 1.16 /status `repeat` flag (#1290). + // nil means the firmware did not send the field — caller must + // preserve the existing observers.can_relay value (default 1). + // true → relay-capable (`repeat:on`); false → listener-only + // (`repeat:off`), which causes the server-side disambiguator to + // exclude this observer's pubkey from path-hop candidate sets. + CanRelay *bool } // UpsertObserver inserts or updates an observer using the current wall-clock @@ -995,7 +1004,7 @@ func (s *Store) UpsertObserverAt(id, name, iata string, meta *ObserverMeta, last normalizedIATA := strings.TrimSpace(strings.ToUpper(iata)) var model, firmware, clientVersion, radio interface{} - var batteryMv, uptimeSecs, noiseFloor interface{} + var batteryMv, uptimeSecs, noiseFloor, canRelay interface{} if meta != nil { if meta.Model != nil { model = *meta.Model @@ -1018,11 +1027,22 @@ func (s *Store) UpsertObserverAt(id, name, iata string, meta *ObserverMeta, last if meta.NoiseFloor != nil { noiseFloor = *meta.NoiseFloor } + // Issue #1290: nil → leave DB column unchanged (COALESCE in + // the prepared stmt); 0/1 written when firmware provided + // the `repeat` field. INSERT branch defaults to 1 via the + // COALESCE in the VALUES clause. + if meta.CanRelay != nil { + if *meta.CanRelay { + canRelay = 1 + } else { + canRelay = 0 + } + } } _, err := s.stmtUpsertObserver.Exec( - id, name, normalizedIATA, lastSeen, lastSeen, model, firmware, clientVersion, radio, batteryMv, uptimeSecs, noiseFloor, - name, normalizedIATA, ingestNow, lastSeen, model, firmware, clientVersion, radio, batteryMv, uptimeSecs, noiseFloor, + id, name, normalizedIATA, lastSeen, lastSeen, model, firmware, clientVersion, radio, batteryMv, uptimeSecs, noiseFloor, canRelay, + name, normalizedIATA, ingestNow, lastSeen, model, firmware, clientVersion, radio, batteryMv, uptimeSecs, noiseFloor, canRelay, ) if err != nil { s.Stats.WriteErrors.Add(1) diff --git a/cmd/ingestor/main.go b/cmd/ingestor/main.go index f7515343..37806bd2 100644 --- a/cmd/ingestor/main.go +++ b/cmd/ingestor/main.go @@ -1124,6 +1124,37 @@ func extractObserverMeta(msg map[string]interface{}) *ObserverMeta { } } + // Issue #1290: firmware 1.16 publishes a `repeat` flag at the top + // level of the /status JSON (MQTTMessageBuilder.cpp:58 — see + // agessaman/MeshCore mqtt-bridge-implementation-flex). Accept + // either a boolean or a case-insensitive `on|off|true|false|1|0` + // string. Missing field → leave CanRelay nil; the writer preserves + // the prior column value (default 1, back-compat). + if v, ok := msg["repeat"]; ok && v != nil { + switch t := v.(type) { + case bool: + b := t + meta.CanRelay = &b + hasData = true + case string: + s := strings.ToLower(strings.TrimSpace(t)) + switch s { + case "on", "true", "1", "yes": + b := true + meta.CanRelay = &b + hasData = true + case "off", "false", "0", "no": + b := false + meta.CanRelay = &b + hasData = true + } + case float64: + b := t != 0 + meta.CanRelay = &b + hasData = true + } + } + if !hasData { return nil } diff --git a/cmd/server/db.go b/cmd/server/db.go index dc8a5b93..23b0d223 100644 --- a/cmd/server/db.go +++ b/cmd/server/db.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "github.com/meshcore-analyzer/dbschema" "github.com/meshcore-analyzer/geofilter" _ "modernc.org/sqlite" ) @@ -251,6 +252,11 @@ type Observer struct { ClockSkewSeconds *int64 `json:"clock_skew_seconds"` ClockSkewCount24h int `json:"clock_skew_count_24h"` ClockLastNaiveAt *string `json:"clock_last_naive_at"` + // Issue #1290: firmware 1.16 `repeat: on|off` flag persisted by the + // ingestor. true = relay-capable (default for legacy observers). + // false = listener-only; cmd/server/store.go pm.resolveWithContext + // excludes these from path-hop candidate sets. + CanRelay bool `json:"can_relay"` } // Transmission represents a row from the transmissions table. @@ -1148,9 +1154,17 @@ func (db *DB) getObservationsForTransmissions(txIDs []int) map[int][]map[string] // GetObservers returns active observers (not soft-deleted) sorted by last_seen DESC. func (db *DB) GetObservers() ([]Observer, error) { + // Issue #1290: can_relay is read via COALESCE(can_relay, 1). The + // column is added by internal/dbschema; older test fixtures and + // pre-migration DBs may lack it, so we probe and fall back. + canRelayClause := "COALESCE(can_relay, 1)" + if hasCol, _ := dbschema.TableHasColumn(db.conn, "observers", "can_relay"); !hasCol { + canRelayClause = "1" + } rows, err := db.conn.Query(`SELECT id, name, iata, last_seen, first_seen, packet_count, model, firmware, client_version, radio, battery_mv, uptime_secs, noise_floor, last_packet_at, - clock_skew_seconds, clock_skew_count_24h, clock_last_naive_at + clock_skew_seconds, clock_skew_count_24h, clock_last_naive_at, + ` + canRelayClause + ` FROM observers WHERE inactive IS NULL OR inactive = 0 ORDER BY last_seen DESC`) if err != nil { return nil, err @@ -1163,11 +1177,13 @@ func (db *DB) GetObservers() ([]Observer, error) { var batteryMv, uptimeSecs, clockSkewSec sql.NullInt64 var clockSkewCount sql.NullInt64 var noiseFloor sql.NullFloat64 + var canRelay int if err := rows.Scan(&o.ID, &o.Name, &o.IATA, &o.LastSeen, &o.FirstSeen, &o.PacketCount, &o.Model, &o.Firmware, &o.ClientVersion, &o.Radio, &batteryMv, &uptimeSecs, &noiseFloor, &o.LastPacketAt, - &clockSkewSec, &clockSkewCount, &o.ClockLastNaiveAt); err != nil { + &clockSkewSec, &clockSkewCount, &o.ClockLastNaiveAt, &canRelay); err != nil { continue } + o.CanRelay = canRelay != 0 if batteryMv.Valid { v := int(batteryMv.Int64) o.BatteryMv = &v @@ -1190,22 +1206,58 @@ func (db *DB) GetObservers() ([]Observer, error) { return observers, nil } +// GetNonRelayObserverPubkeys returns the lowercase observer.id pubkeys +// for observers that have advertised `repeat:off` (#1290). The server's +// path-hop disambiguator consumes this to exclude listener-only nodes +// from the candidate set. Inactive observers are excluded for +// consistency with GetObservers; reactivation flips can_relay only on +// the next status message. +func (db *DB) GetNonRelayObserverPubkeys() ([]string, error) { + // Graceful no-op when can_relay column is absent (legacy DB / older + // test fixture). Avoids noisy schema-degradation log spam. + if hasCol, _ := dbschema.TableHasColumn(db.conn, "observers", "can_relay"); !hasCol { + return nil, nil + } + rows, err := db.conn.Query(`SELECT LOWER(id) FROM observers + WHERE COALESCE(can_relay, 1) = 0 + AND (inactive IS NULL OR inactive = 0)`) + if err != nil { + return nil, err + } + defer rows.Close() + var out []string + for rows.Next() { + var pk string + if err := rows.Scan(&pk); err == nil && pk != "" { + out = append(out, pk) + } + } + return out, rows.Err() +} + // GetObserverByID returns a single observer. func (db *DB) GetObserverByID(id string) (*Observer, error) { var o Observer var batteryMv, uptimeSecs, clockSkewSec sql.NullInt64 var clockSkewCount sql.NullInt64 var noiseFloor sql.NullFloat64 + var canRelay int + canRelayClause := "COALESCE(can_relay, 1)" + if hasCol, _ := dbschema.TableHasColumn(db.conn, "observers", "can_relay"); !hasCol { + canRelayClause = "1" + } err := db.conn.QueryRow(`SELECT id, name, iata, last_seen, first_seen, packet_count, model, firmware, client_version, radio, battery_mv, uptime_secs, noise_floor, last_packet_at, - clock_skew_seconds, clock_skew_count_24h, clock_last_naive_at + clock_skew_seconds, clock_skew_count_24h, clock_last_naive_at, + `+canRelayClause+` FROM observers WHERE id = ?`, id). Scan(&o.ID, &o.Name, &o.IATA, &o.LastSeen, &o.FirstSeen, &o.PacketCount, &o.Model, &o.Firmware, &o.ClientVersion, &o.Radio, &batteryMv, &uptimeSecs, &noiseFloor, &o.LastPacketAt, - &clockSkewSec, &clockSkewCount, &o.ClockLastNaiveAt) + &clockSkewSec, &clockSkewCount, &o.ClockLastNaiveAt, &canRelay) if err != nil { return nil, err } + o.CanRelay = canRelay != 0 if batteryMv.Valid { v := int(batteryMv.Int64) o.BatteryMv = &v diff --git a/cmd/server/routes.go b/cmd/server/routes.go index 687b5191..135b663e 100644 --- a/cmd/server/routes.go +++ b/cmd/server/routes.go @@ -2534,6 +2534,7 @@ func (s *Server) buildObserversDefaultResponse() (ObserverListResponse, error) { LastPacketAt: o.LastPacketAt, PacketsLastHour: plh, Lat: lat, Lon: lon, NodeRole: nodeRole, + CanRelay: o.CanRelay, } applyObserverNaiveClock(&resp, o, nowTime) result = append(result, resp) @@ -2578,6 +2579,7 @@ func (s *Server) handleObserverDetail(w http.ResponseWriter, r *http.Request) { NoiseFloor: obs.NoiseFloor, LastPacketAt: obs.LastPacketAt, PacketsLastHour: plh, + CanRelay: obs.CanRelay, } applyObserverNaiveClock(&resp, obs, time.Now().UTC()) return resp diff --git a/cmd/server/store.go b/cmd/server/store.go index 3edfd33a..73248f7e 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -6194,6 +6194,17 @@ func (s *PacketStore) getCachedNodesAndPM() ([]nodeInfo, *prefixMap) { nodes := s.getAllNodes() pm := buildPrefixMap(nodes) + // Issue #1290: exclude observers that advertised `repeat:off` from + // the path-hop candidate set. Failure is non-fatal — we log via the + // schema-degradation channel and proceed with an empty filter (i.e. + // pre-#1290 behavior). + if s.db != nil && s.db.conn != nil { + if pks, err := s.db.GetNonRelayObserverPubkeys(); err == nil { + pm.markNonRelay(pks) + } else { + s.logSchemaDegradationOnce("observers.can_relay read failed; path-hop disambiguator will not filter listener-only observers: " + err.Error()) + } + } s.cacheMu.Lock() s.nodeCache = nodes @@ -6257,6 +6268,25 @@ func (pm *prefixMap) resolve(hop string) *nodeInfo { func (pm *prefixMap) resolveWithContext(hop string, contextPubkeys []string, graph *NeighborGraph) (*nodeInfo, string, float64) { h := strings.ToLower(hop) candidates := pm.m[h] + // Issue #1290: drop observer-known listener-only nodes from the + // candidate set. By firmware contract a node that advertises + // `repeat:off` in its MQTT /status will never relay a packet, so it + // cannot legitimately be a hop in someone else's path. Filtering + // here shrinks ambiguous candidate sets without affecting any + // upstream caller (the returned shape and confidence labels are + // preserved; only no_match becomes more likely when the only + // matching prefix belonged to a listener). Empty pm.nonRelay + // preserves the pre-#1290 behavior exactly (back-compat). + if len(pm.nonRelay) > 0 && len(candidates) > 0 { + filtered := candidates[:0:0] + for i := range candidates { + if _, isListener := pm.nonRelay[strings.ToLower(candidates[i].PublicKey)]; isListener { + continue + } + filtered = append(filtered, candidates[i]) + } + candidates = filtered + } if len(candidates) == 0 { return nil, "no_match", 0 } @@ -8870,6 +8900,18 @@ func (s *PacketStore) GetNodeHealth(pubkey string) (map[string]interface{}, erro } observerRows := make([]map[string]interface{}, 0) + // Issue #1290: surface listener/repeater hint on node detail by + // looking up can_relay for each observer that heard this node. + // One-shot fetch of the non-relay set keeps this O(observers) on + // rare events; nil on error degrades to "neither badge" client-side. + nonRelaySet := map[string]struct{}{} + if s.db != nil && s.db.conn != nil { + if pks, err := s.db.GetNonRelayObserverPubkeys(); err == nil { + for _, pk := range pks { + nonRelaySet[strings.ToUpper(pk)] = struct{}{} + } + } + } for id, o := range observerStats { var avgSnr, avgRssi interface{} if o.snrCount > 0 { @@ -8878,9 +8920,14 @@ func (s *PacketStore) GetNodeHealth(pubkey string) (map[string]interface{}, erro if o.rssiCount > 0 { avgRssi = o.rssiSum / float64(o.rssiCount) } + canRelay := true + if _, isListener := nonRelaySet[strings.ToUpper(id)]; isListener { + canRelay = false + } observerRows = append(observerRows, map[string]interface{}{ "observer_id": id, "observer_name": o.name, "avgSnr": avgSnr, "avgRssi": avgRssi, "packetCount": o.count, + "can_relay": canRelay, }) } sort.Slice(observerRows, func(i, j int) bool { diff --git a/cmd/server/types.go b/cmd/server/types.go index 2a6ff9de..4c9d8d35 100644 --- a/cmd/server/types.go +++ b/cmd/server/types.go @@ -909,6 +909,10 @@ type ObserverResp struct { ClockSkewSeconds interface{} `json:"clock_skew_seconds"` ClockSkewCount24h int `json:"clock_skew_count_24h"` ClockLastNaiveAt interface{} `json:"clock_last_naive_at"` + // Issue #1290: firmware 1.16 `repeat` flag — true=repeater, + // false=listener-only. Drives the UI badge on observers list + + // node detail page. Defaults to true for legacy observers. + CanRelay bool `json:"can_relay"` } type ObserverListResponse struct { diff --git a/internal/dbschema/dbschema.go b/internal/dbschema/dbschema.go index 832d1716..e117c66c 100644 --- a/internal/dbschema/dbschema.go +++ b/internal/dbschema/dbschema.go @@ -82,6 +82,9 @@ func Apply(rw *sql.DB, logf Logger) error { if err := ensureObserverNaiveClockColumns(rw, logf); err != nil { return fmt.Errorf("ensure observers naive-clock columns: %w", err) } + if err := ensureObserverCanRelayColumn(rw, logf); err != nil { + return fmt.Errorf("ensure observers.can_relay: %w", err) + } return nil } @@ -138,6 +141,12 @@ func AssertReady(ro *sql.DB) error { mustCol("observers", "clock_skew_seconds") mustCol("observers", "clock_skew_count_24h") mustCol("observers", "clock_last_naive_at") + // Issue #1290: firmware 1.16 publishes a `repeat: on|off` flag in + // the MQTT /status JSON. Ingestor persists it as can_relay; server + // reads it to filter listener-only observers out of the path-hop + // disambiguator candidate set. Default 1 preserves prior behavior + // for legacy observers that never sent the field. + mustCol("observers", "can_relay") if len(missing) > 0 { return fmt.Errorf("schema not migrated by ingestor; restart ingestor first. missing: %s", @@ -522,3 +531,25 @@ func ensureObserverNaiveClockColumns(rw *sql.DB, logf Logger) error { } return nil } + +// ensureObserverCanRelayColumn adds the can_relay column to observers. +// Firmware 1.16 publishes a `repeat: on|off` flag in the MQTT /status +// JSON (#1290); the ingestor parses it and writes 0/1 here. The server's +// path-hop disambiguator (cmd/server/store.go pm.resolveWithContext) +// excludes observers with can_relay=0 from the candidate set. Default 1 +// preserves prior behavior for legacy observers (no repeat field). +func ensureObserverCanRelayColumn(rw *sql.DB, logf Logger) error { + has, err := TableHasColumn(rw, "observers", "can_relay") + if err != nil { + return err + } + if has { + return nil + } + // PREFLIGHT: async=true reason="single-column ALTER on observers (low-cardinality, ~1k rows in prod); DEFAULT 1 is a constant so SQLite does the rewrite as a metadata-only schema update, no row scan" + if _, err := rw.Exec("ALTER TABLE observers ADD COLUMN can_relay INTEGER DEFAULT 1"); err != nil { + return err + } + logf("[dbschema] added can_relay column to observers") + return nil +} diff --git a/public/nodes.js b/public/nodes.js index 548184f1..ce17d8fa 100644 --- a/public/nodes.js +++ b/public/nodes.js @@ -676,7 +676,7 @@ ${observers.map(o => ` - ${escapeHtml(o.observer_name || o.observer_id)} + ${escapeHtml(o.observer_name || o.observer_id)}${o.can_relay === false ? ' listener' : (o.can_relay === true ? ' repeater' : '')} ${o.iata ? escapeHtml(o.iata) : '—'} ${o.packetCount} ${o.avgSnr != null ? Number(o.avgSnr).toFixed(1) + ' dB' : '—'} diff --git a/public/observer-detail.js b/public/observer-detail.js index e97914a7..3ad5f654 100644 --- a/public/observer-detail.js +++ b/public/observer-detail.js @@ -180,6 +180,10 @@ window.ObserverDetailNaiveBanner = {
Status
${statusLabel}
+
+
Relay
+
${obs.can_relay === false ? 'listener' : 'repeater'}
+
Region
${obs.iata ? '' + escapeHtml(obs.iata) + '' : '—'}
diff --git a/public/observers.js b/public/observers.js index 7c19f35f..0aac88d9 100644 --- a/public/observers.js +++ b/public/observers.js @@ -297,7 +297,7 @@ window.ObserversSummary = (function () { const shape = h.cls === 'health-green' ? '●' : h.cls === 'health-yellow' ? '▲' : '✕'; return ` ${shape} ${h.label} - ${escapeHtml(o.name || o.id)}${window.ObserversNaiveChip.render(o)} + ${escapeHtml(o.name || o.id)}${window.ObserversNaiveChip.render(o)}${o.can_relay === false ? ' listener' : ' repeater'} ${o.iata ? `${o.iata}` : '—'} ${timeAgo(o.last_seen)} ${o.last_packet_at ? timeAgo(o.last_packet_at) : ''} diff --git a/public/style.css b/public/style.css index a17793c5..3cb88dc7 100644 --- a/public/style.css +++ b/public/style.css @@ -1146,6 +1146,24 @@ body.scroll-locked { overflow: hidden; } font-size: 10px; font-weight: 700; font-family: var(--mono); background: var(--nav-bg); color: var(--nav-text); letter-spacing: .5px; } +/* Issue #1290: listener vs repeater badge on observers list + detail. + * Driven by ObserverResp.can_relay (false = firmware reported repeat:off). + * Server-side disambiguator excludes listener observers from path-hop + * candidates; this badge surfaces the same hint to operators. Colors use + * existing theme vars so the badge tracks the active palette. */ +.badge-listener, +.badge-repeater { + display: inline-block; padding: 1px 5px; border-radius: 4px; + font-size: 10px; font-weight: 700; font-family: var(--mono); + letter-spacing: .5px; margin-left: 4px; vertical-align: middle; +} +.badge-listener { + background: var(--transport-badge-bg, #f59e0b20); + color: var(--transport-badge-fg, #d97706); +} +.badge-repeater { + background: var(--nav-bg); color: var(--nav-text); +} /* Observer IATA pill rendered inline next to observer name on packets (#1188). * Visually similar to .badge-region but distinct so the row badge and the * inline-with-observer badge can be styled independently in future themes. */