diff --git a/cmd/server/issue804_repeater_region_test.go b/cmd/server/issue804_repeater_region_test.go new file mode 100644 index 00000000..3a4c3b86 --- /dev/null +++ b/cmd/server/issue804_repeater_region_test.go @@ -0,0 +1,147 @@ +package main + +import ( + "testing" + "time" +) + +// TestIssue804_AnalyticsAttributesByRepeaterRegion verifies that analytics +// (specifically GetAnalyticsHashSizes) attribute multi-byte nodes to the +// REPEATER's home region, not the observer that happened to hear the relay. +// +// Scenario from #804: +// - PDX-Repeater is a multi-byte (hashSize=2) repeater whose ZERO-HOP direct +// adverts are only heard by obs-PDX (a PDX observer). That zero-hop direct +// advert is the most reliable home-region signal — it cannot have been +// relayed. +// - A flood advert from PDX-Repeater (hashSize=2) propagates and is heard by +// obs-SJC (a SJC observer) via a multi-hop relay path. +// - When the user asks for region=SJC analytics, the PDX-Repeater MUST NOT +// pollute SJC's multiByteNodes — it lives in PDX. +// - The result should also expose attributionMethod="repeater" so the API +// consumer knows which method was used. +// +// Pre-fix behavior: PDX-Repeater appears in SJC's multiByteNodes because the +// filter is observer-based. This test fails on the pre-fix code at the +// "want PDX-Repeater EXCLUDED" assertion. +func TestIssue804_AnalyticsAttributesByRepeaterRegion(t *testing.T) { + db := setupTestDB(t) + defer db.Close() + + now := time.Now().UTC() + recent := now.Add(-1 * time.Hour).Format(time.RFC3339) + recentEpoch := now.Add(-1 * time.Hour).Unix() + + // Observers: one in PDX, one in SJC + db.conn.Exec(`INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count) + VALUES ('obs-pdx', 'Obs PDX', 'PDX', ?, '2026-01-01T00:00:00Z', 100)`, recent) + db.conn.Exec(`INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count) + VALUES ('obs-sjc', 'Obs SJC', 'SJC', ?, '2026-01-01T00:00:00Z', 100)`, recent) + + // PDX-Repeater node (lives in Portland) + pdxPK := "pdx0000000000001" + db.conn.Exec(`INSERT INTO nodes (public_key, name, role) + VALUES (?, 'PDX-Repeater', 'repeater')`, pdxPK) + + // SJC-Repeater node (lives in San Jose) — sanity baseline + sjcPK := "sjc0000000000001" + db.conn.Exec(`INSERT INTO nodes (public_key, name, role) + VALUES (?, 'SJC-Repeater', 'repeater')`, sjcPK) + + pdxDecoded := `{"pubKey":"` + pdxPK + `","name":"PDX-Repeater","type":"ADVERT","flags":{"isRepeater":true}}` + sjcDecoded := `{"pubKey":"` + sjcPK + `","name":"SJC-Repeater","type":"ADVERT","flags":{"isRepeater":true}}` + + // 1) PDX-Repeater zero-hop DIRECT advert heard only by obs-PDX. + // Establishes PDX as the repeater's home region. + // raw_hex header 0x12 = route_type 2 (direct), payload_type 4 + // pathByte 0x40 (hashSize bits=01 → 2, hop_count=0) + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json) + VALUES ('1240aabbccdd', 'pdx_zh_direct', ?, 2, 4, ?)`, recent, pdxDecoded) + db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp) + VALUES (1, 1, 12.0, -85, '[]', ?)`, recentEpoch) + + // 2) PDX-Repeater FLOOD advert with hashSize=2 (reliable). + // Heard ONLY by obs-SJC via a relay path (this is the polluting case). + // raw_hex header 0x11 = route_type 1 (flood), payload_type 4 + // pathByte 0x41 (hashSize bits=01 → 2, hop_count=1) + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json) + VALUES ('1141aabbccdd', 'pdx_flood', ?, 1, 4, ?)`, recent, pdxDecoded) + db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp) + VALUES (2, 2, 8.0, -95, '["aa11"]', ?)`, recentEpoch) + + // 3) SJC-Repeater zero-hop DIRECT advert heard only by obs-SJC. + // Establishes SJC as the repeater's home region. + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json) + VALUES ('1240ccddeeff', 'sjc_zh_direct', ?, 2, 4, ?)`, recent, sjcDecoded) + db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp) + VALUES (3, 2, 14.0, -82, '[]', ?)`, recentEpoch) + + // 4) SJC-Repeater FLOOD advert with hashSize=2, heard by obs-SJC. + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json) + VALUES ('1141ccddeeff', 'sjc_flood', ?, 1, 4, ?)`, recent, sjcDecoded) + db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp) + VALUES (4, 2, 11.0, -88, '["cc22"]', ?)`, recentEpoch) + + store := NewPacketStore(db, nil) + store.Load() + + t.Run("region=SJC excludes PDX-Repeater (heard but not home)", func(t *testing.T) { + result := store.GetAnalyticsHashSizes("SJC") + + mb, ok := result["multiByteNodes"].([]map[string]interface{}) + if !ok { + t.Fatal("expected multiByteNodes slice") + } + + var foundPDX, foundSJC bool + for _, n := range mb { + pk, _ := n["pubkey"].(string) + if pk == pdxPK { + foundPDX = true + } + if pk == sjcPK { + foundSJC = true + } + } + + if foundPDX { + t.Errorf("PDX-Repeater leaked into SJC analytics — region attribution still observer-based (#804 not fixed)") + } + if !foundSJC { + t.Errorf("SJC-Repeater missing from SJC analytics — fix over-filtered") + } + }) + + t.Run("API exposes attributionMethod", func(t *testing.T) { + result := store.GetAnalyticsHashSizes("SJC") + method, ok := result["attributionMethod"].(string) + if !ok { + t.Fatal("expected attributionMethod string field on result") + } + if method != "repeater" { + t.Errorf("attributionMethod = %q, want %q", method, "repeater") + } + }) + + t.Run("region=PDX excludes SJC-Repeater", func(t *testing.T) { + result := store.GetAnalyticsHashSizes("PDX") + mb, _ := result["multiByteNodes"].([]map[string]interface{}) + + var foundPDX, foundSJC bool + for _, n := range mb { + pk, _ := n["pubkey"].(string) + if pk == pdxPK { + foundPDX = true + } + if pk == sjcPK { + foundSJC = true + } + } + if !foundPDX { + t.Errorf("PDX-Repeater missing from PDX analytics") + } + if foundSJC { + t.Errorf("SJC-Repeater leaked into PDX analytics") + } + }) +} diff --git a/cmd/server/store.go b/cmd/server/store.go index 23f75206..721bfe51 100644 --- a/cmd/server/store.go +++ b/cmd/server/store.go @@ -2441,6 +2441,145 @@ func (s *PacketStore) fetchAndCacheRegionObs(region string) map[string]bool { return m } +// iataMatchesRegion returns true if iata matches any of the comma-separated +// region codes in regionParam. Comparison is case-insensitive and trim-tolerant. +// Empty iata never matches; empty regionParam never matches. +// +// #804: shared helper used by analytics to attribute transmissions to a node's +// HOME region (derived from observers that hear its zero-hop direct adverts) +// rather than to the observer that happened to relay a packet. +func iataMatchesRegion(iata, regionParam string) bool { + if iata == "" || regionParam == "" { + return false + } + codes := normalizeRegionCodes(regionParam) + if len(codes) == 0 { + return false + } + got := strings.TrimSpace(strings.ToUpper(iata)) + if got == "" { + return false + } + for _, c := range codes { + if c == got { + return true + } + } + return false +} + +// computeNodeHomeRegions returns a pubkey → IATA map deriving each node's +// HOME region from zero-hop DIRECT adverts. A zero-hop direct advert is the +// most authoritative location signal because the path byte is set locally on +// the originating radio and the packet has not been relayed: the observer +// that hears it is necessarily within direct RF range of the originator. +// +// When a node has zero-hop direct adverts heard by observers from multiple +// regions, the most-frequently-observed region wins (geographic plurality). +// +// Caller must hold s.mu (read or write). Returns empty map (not nil) if no +// observers are loaded or no zero-hop direct adverts have been seen. +// +// #804: feeds analytics region-attribution so a multi-byte repeater whose +// flood adverts get relayed across regions is still attributed to its home. +func (s *PacketStore) computeNodeHomeRegions() map[string]string { + // Build observer → IATA map. observers table is small (≪ packets), so a + // single DB read here is acceptable; resolveRegionObservers does similar. + obsIATA := make(map[string]string, 64) + if s.db != nil { + if observers, err := s.db.GetObservers(); err == nil { + for _, o := range observers { + if o.IATA != nil && *o.IATA != "" { + obsIATA[o.ID] = strings.TrimSpace(strings.ToUpper(*o.IATA)) + } + } + } + } + if len(obsIATA) == 0 { + return map[string]string{} + } + + // Tally zero-hop direct ADVERT region observations per pubkey. + type tally struct { + counts map[string]int + } + per := make(map[string]*tally, 256) + + for _, tx := range s.packets { + if tx.RawHex == "" || len(tx.RawHex) < 4 { + continue + } + if tx.PayloadType == nil || *tx.PayloadType != PayloadADVERT { + continue + } + if tx.DecodedJSON == "" { + continue + } + header, err := strconv.ParseUint(tx.RawHex[:2], 16, 8) + if err != nil { + continue + } + routeType := header & 0x03 + if routeType != uint64(RouteDirect) && routeType != uint64(RouteTransportDirect) { + continue + } + // Path byte index — for direct/transport-direct it's at offset 1 + // (matches the analytics decoder's pathByteIdx logic). + if len(tx.RawHex) < 4 { + continue + } + pathByte, err := strconv.ParseUint(tx.RawHex[2:4], 16, 8) + if err != nil { + continue + } + hopCount := pathByte & 0x3F + if hopCount != 0 { + continue + } + + var d map[string]interface{} + if json.Unmarshal([]byte(tx.DecodedJSON), &d) != nil { + continue + } + pk, _ := d["pubKey"].(string) + if pk == "" { + pk, _ = d["public_key"].(string) + } + if pk == "" { + continue + } + + for _, obs := range tx.Observations { + iata := obsIATA[obs.ObserverID] + if iata == "" { + continue + } + t := per[pk] + if t == nil { + t = &tally{counts: map[string]int{}} + per[pk] = t + } + t.counts[iata]++ + } + } + + out := make(map[string]string, len(per)) + for pk, t := range per { + var bestIATA string + bestCount := 0 + for iata, n := range t.counts { + if n > bestCount || (n == bestCount && iata < bestIATA) { + bestCount = n + bestIATA = iata + } + } + if bestIATA != "" { + out[pk] = bestIATA + } + } + return out +} + // enrichObs returns a map with observation fields + transmission fields. func (s *PacketStore) enrichObs(obs *StoreObs) map[string]interface{} { tx := s.byTxID[obs.TransmissionID] @@ -5666,6 +5805,16 @@ func (s *PacketStore) computeAnalyticsHashSizes(region string) map[string]interf regionObs = s.resolveRegionObservers(region) } + // #804: derive each node's HOME region from zero-hop direct adverts (the + // most authoritative location signal — those packets cannot have been + // relayed). When non-empty, multi-byte node attribution prefers this + // over observer-region. Falls back to observer-region when unknown. + nodeHomeRegion := s.computeNodeHomeRegions() + attributionMethod := "observer" + if region != "" && len(nodeHomeRegion) > 0 { + attributionMethod = "repeater" + } + allNodes, pm := s.getCachedNodesAndPM() // Build pubkey→role map for filtering by node type. @@ -5684,18 +5833,6 @@ func (s *PacketStore) computeAnalyticsHashSizes(region string) map[string]interf if tx.RawHex == "" { continue } - if regionObs != nil { - match := false - for _, obs := range tx.Observations { - if regionObs[obs.ObserverID] { - match = true - break - } - } - if !match { - continue - } - } // Parse header and path byte if len(tx.RawHex) < 4 { @@ -5725,52 +5862,84 @@ func (s *PacketStore) computeAnalyticsHashSizes(region string) map[string]interf continue } - // Track originator from advert packets (including zero-hop adverts, - // keyed by pubKey so same-name nodes don't merge). + // #804: pre-extract originator pubkey for ADVERT packets so we can + // (a) relax observer-region filter when the originator's HOME region + // matches the requested region (a flood relay heard outside the + // home region must still attribute to the home), and + // (b) reuse the parsed values below without re-parsing. + var advertPK, advertName string + var advertParsed bool if tx.PayloadType != nil && *tx.PayloadType == PayloadADVERT && tx.DecodedJSON != "" { var d map[string]interface{} if json.Unmarshal([]byte(tx.DecodedJSON), &d) == nil { - pk := "" if v, ok := d["pubKey"].(string); ok { - pk = v + advertPK = v } else if v, ok := d["public_key"].(string); ok { - pk = v + advertPK = v } - if pk != "" { - name := "" - if n, ok := d["name"].(string); ok { - name = n - } - if name == "" { - if len(pk) >= 8 { - name = pk[:8] - } else { - name = pk - } - } - // Skip zero-hop direct adverts for hash_size — the - // path byte is locally generated and unreliable. - // Still count the packet and update lastSeen. - isZeroHop := (routeType == uint64(RouteDirect) || routeType == uint64(RouteTransportDirect)) && (actualPathByte&0x3F) == 0 - if byNode[pk] == nil { - role := nodeRoleByPK[pk] // empty if unknown - initHS := hashSize - if isZeroHop { - initHS = 0 - } - byNode[pk] = map[string]interface{}{ - "hashSize": initHS, "packets": 0, - "lastSeen": tx.FirstSeen, "name": name, - "role": role, - } - } - byNode[pk]["packets"] = byNode[pk]["packets"].(int) + 1 - if !isZeroHop { - byNode[pk]["hashSize"] = hashSize - } - byNode[pk]["lastSeen"] = tx.FirstSeen + if n, ok := d["name"].(string); ok { + advertName = n + } + advertParsed = advertPK != "" + } + } + + if regionObs != nil { + match := false + for _, obs := range tx.Observations { + if regionObs[obs.ObserverID] { + match = true + break } } + // #804: allow ADVERTs from a node whose HOME region matches the + // requested region even if no observer in that region heard this + // particular packet (e.g. flood relay heard only by an out-of- + // region observer). Conservative: only ADVERTs (the source is + // known by pubkey) and only when home is established. + if !match && advertParsed { + if home, ok := nodeHomeRegion[advertPK]; ok && iataMatchesRegion(home, region) { + match = true + } + } + if !match { + continue + } + } + + // Track originator from advert packets (including zero-hop adverts, + // keyed by pubKey so same-name nodes don't merge). + if advertParsed { + pk := advertPK + name := advertName + if name == "" { + if len(pk) >= 8 { + name = pk[:8] + } else { + name = pk + } + } + // Skip zero-hop direct adverts for hash_size — the + // path byte is locally generated and unreliable. + // Still count the packet and update lastSeen. + isZeroHop := (routeType == uint64(RouteDirect) || routeType == uint64(RouteTransportDirect)) && (actualPathByte&0x3F) == 0 + if byNode[pk] == nil { + role := nodeRoleByPK[pk] // empty if unknown + initHS := hashSize + if isZeroHop { + initHS = 0 + } + byNode[pk] = map[string]interface{}{ + "hashSize": initHS, "packets": 0, + "lastSeen": tx.FirstSeen, "name": name, + "role": role, + } + } + byNode[pk]["packets"] = byNode[pk]["packets"].(int) + 1 + if !isZeroHop { + byNode[pk]["hashSize"] = hashSize + } + byNode[pk]["lastSeen"] = tx.FirstSeen } // Distribution/hourly/uniqueHops only for packets with relay hops @@ -5851,6 +6020,15 @@ func (s *PacketStore) computeAnalyticsHashSizes(region string) map[string]interf // Multi-byte nodes multiByteNodes := make([]map[string]interface{}, 0) for pk, data := range byNode { + // #804: when a region filter is active, prefer the repeater's HOME + // region over the observer that happened to relay it. Falls back to + // the (already-applied) observer-region filter when the node's home + // region is unknown. + if region != "" { + if home, ok := nodeHomeRegion[pk]; ok && !iataMatchesRegion(home, region) { + continue + } + } if data["hashSize"].(int) > 1 { multiByteNodes = append(multiByteNodes, map[string]interface{}{ "name": data["name"], "hashSize": data["hashSize"], @@ -5865,11 +6043,17 @@ func (s *PacketStore) computeAnalyticsHashSizes(region string) map[string]interf // Distribution by repeaters: count unique REPEATER nodes per hash size distributionByRepeaters := map[string]int{"1": 0, "2": 0, "3": 0} - for _, data := range byNode { + for pk, data := range byNode { role, _ := data["role"].(string) if !strings.Contains(strings.ToLower(role), "repeater") { continue } + // #804: same repeater-region preference as multiByteNodes. + if region != "" { + if home, ok := nodeHomeRegion[pk]; ok && !iataMatchesRegion(home, region) { + continue + } + } hs := data["hashSize"].(int) key := strconv.Itoa(hs) distributionByRepeaters[key]++ @@ -5882,6 +6066,7 @@ func (s *PacketStore) computeAnalyticsHashSizes(region string) map[string]interf "hourly": hourly, "topHops": topHops, "multiByteNodes": multiByteNodes, + "attributionMethod": attributionMethod, } }