fix(#804): attribute analytics by repeater home region, not observer (#1025)

Fixes #804.

## Problem
Analytics filtered region purely by **observer** region: a multi-byte
repeater whose home is PDX would leak into SJC results whenever its
flood
adverts were relayed past an SJC observer. Per-node groupings
(`multiByteNodes`, `distributionByRepeaters`) inherited the same bug.

## Fix

Two new helpers in `cmd/server/store.go`:

- `iataMatchesRegion(iata, regionParam)` — case-insensitive IATA→region
  match using the existing `normalizeRegionCodes` parser.
- `computeNodeHomeRegions()` — derives each node's HOME IATA from its
  zero-hop DIRECT adverts. Path byte for those packets is set locally on
  the originating radio and the packet has not been relayed, so the
  observer that hears it must be in direct RF range. Plurality vote when
  zero-hop adverts span multiple regions.

`computeAnalyticsHashSizes` now applies these in two ways:

1. **Observer-region filter is relaxed for ADVERT packets** when the
   originator's home region matches the requested region. A flood advert
   from a PDX repeater that's only heard by an SJC observer still
   attributes to PDX.
2. **Per-node grouping** (`multiByteNodes`, `distributionByRepeaters`)
   excludes nodes whose HOME region disagrees with the requested region.
   Falls back to the observer-region filter when home is unknown.

Adds `attributionMethod` to the response (`"observer"` or `"repeater"`)
so operators can tell which method was applied.

## Backwards compatibility

- No region filter requested → behavior unchanged (`attributionMethod`
  is `"observer"`).
- Region filter requested but no zero-hop direct adverts seen for a node
  → falls back to the prior observer-region check for that node.
- Operators without IATA-tagged observers see no change.

## TDD

- **Red commit** (`c35d349`): adds
`TestIssue804_AnalyticsAttributesByRepeaterRegion`
with three subtests (PDX leak into SJC, attributionMethod field present,
  SJC leak into PDX). Compiles, runs, fails on assertions.
- **Green commit** (`11b157f`): the implementation. All subtests pass,
  full `cmd/server` package green.

## Files changed
- `cmd/server/store.go` — helpers + analytics filter logic (+236/-51)
- `cmd/server/issue804_repeater_region_test.go` — new test (+147)

---------

Co-authored-by: CoreScope Bot <bot@corescope.local>
Co-authored-by: openclaw-bot <bot@openclaw.local>
This commit is contained in:
Kpa-clawbot
2026-05-03 20:10:02 -07:00
committed by GitHub
parent 019ace3645
commit 9f55ef802b
2 changed files with 383 additions and 51 deletions
+147
View File
@@ -0,0 +1,147 @@
package main
import (
"testing"
"time"
)
// TestIssue804_AnalyticsAttributesByRepeaterRegion verifies that analytics
// (specifically GetAnalyticsHashSizes) attribute multi-byte nodes to the
// REPEATER's home region, not the observer that happened to hear the relay.
//
// Scenario from #804:
// - PDX-Repeater is a multi-byte (hashSize=2) repeater whose ZERO-HOP direct
// adverts are only heard by obs-PDX (a PDX observer). That zero-hop direct
// advert is the most reliable home-region signal — it cannot have been
// relayed.
// - A flood advert from PDX-Repeater (hashSize=2) propagates and is heard by
// obs-SJC (a SJC observer) via a multi-hop relay path.
// - When the user asks for region=SJC analytics, the PDX-Repeater MUST NOT
// pollute SJC's multiByteNodes — it lives in PDX.
// - The result should also expose attributionMethod="repeater" so the API
// consumer knows which method was used.
//
// Pre-fix behavior: PDX-Repeater appears in SJC's multiByteNodes because the
// filter is observer-based. This test fails on the pre-fix code at the
// "want PDX-Repeater EXCLUDED" assertion.
func TestIssue804_AnalyticsAttributesByRepeaterRegion(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
now := time.Now().UTC()
recent := now.Add(-1 * time.Hour).Format(time.RFC3339)
recentEpoch := now.Add(-1 * time.Hour).Unix()
// Observers: one in PDX, one in SJC
db.conn.Exec(`INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count)
VALUES ('obs-pdx', 'Obs PDX', 'PDX', ?, '2026-01-01T00:00:00Z', 100)`, recent)
db.conn.Exec(`INSERT INTO observers (id, name, iata, last_seen, first_seen, packet_count)
VALUES ('obs-sjc', 'Obs SJC', 'SJC', ?, '2026-01-01T00:00:00Z', 100)`, recent)
// PDX-Repeater node (lives in Portland)
pdxPK := "pdx0000000000001"
db.conn.Exec(`INSERT INTO nodes (public_key, name, role)
VALUES (?, 'PDX-Repeater', 'repeater')`, pdxPK)
// SJC-Repeater node (lives in San Jose) — sanity baseline
sjcPK := "sjc0000000000001"
db.conn.Exec(`INSERT INTO nodes (public_key, name, role)
VALUES (?, 'SJC-Repeater', 'repeater')`, sjcPK)
pdxDecoded := `{"pubKey":"` + pdxPK + `","name":"PDX-Repeater","type":"ADVERT","flags":{"isRepeater":true}}`
sjcDecoded := `{"pubKey":"` + sjcPK + `","name":"SJC-Repeater","type":"ADVERT","flags":{"isRepeater":true}}`
// 1) PDX-Repeater zero-hop DIRECT advert heard only by obs-PDX.
// Establishes PDX as the repeater's home region.
// raw_hex header 0x12 = route_type 2 (direct), payload_type 4
// pathByte 0x40 (hashSize bits=01 → 2, hop_count=0)
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
VALUES ('1240aabbccdd', 'pdx_zh_direct', ?, 2, 4, ?)`, recent, pdxDecoded)
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
VALUES (1, 1, 12.0, -85, '[]', ?)`, recentEpoch)
// 2) PDX-Repeater FLOOD advert with hashSize=2 (reliable).
// Heard ONLY by obs-SJC via a relay path (this is the polluting case).
// raw_hex header 0x11 = route_type 1 (flood), payload_type 4
// pathByte 0x41 (hashSize bits=01 → 2, hop_count=1)
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
VALUES ('1141aabbccdd', 'pdx_flood', ?, 1, 4, ?)`, recent, pdxDecoded)
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
VALUES (2, 2, 8.0, -95, '["aa11"]', ?)`, recentEpoch)
// 3) SJC-Repeater zero-hop DIRECT advert heard only by obs-SJC.
// Establishes SJC as the repeater's home region.
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
VALUES ('1240ccddeeff', 'sjc_zh_direct', ?, 2, 4, ?)`, recent, sjcDecoded)
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
VALUES (3, 2, 14.0, -82, '[]', ?)`, recentEpoch)
// 4) SJC-Repeater FLOOD advert with hashSize=2, heard by obs-SJC.
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
VALUES ('1141ccddeeff', 'sjc_flood', ?, 1, 4, ?)`, recent, sjcDecoded)
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
VALUES (4, 2, 11.0, -88, '["cc22"]', ?)`, recentEpoch)
store := NewPacketStore(db, nil)
store.Load()
t.Run("region=SJC excludes PDX-Repeater (heard but not home)", func(t *testing.T) {
result := store.GetAnalyticsHashSizes("SJC")
mb, ok := result["multiByteNodes"].([]map[string]interface{})
if !ok {
t.Fatal("expected multiByteNodes slice")
}
var foundPDX, foundSJC bool
for _, n := range mb {
pk, _ := n["pubkey"].(string)
if pk == pdxPK {
foundPDX = true
}
if pk == sjcPK {
foundSJC = true
}
}
if foundPDX {
t.Errorf("PDX-Repeater leaked into SJC analytics — region attribution still observer-based (#804 not fixed)")
}
if !foundSJC {
t.Errorf("SJC-Repeater missing from SJC analytics — fix over-filtered")
}
})
t.Run("API exposes attributionMethod", func(t *testing.T) {
result := store.GetAnalyticsHashSizes("SJC")
method, ok := result["attributionMethod"].(string)
if !ok {
t.Fatal("expected attributionMethod string field on result")
}
if method != "repeater" {
t.Errorf("attributionMethod = %q, want %q", method, "repeater")
}
})
t.Run("region=PDX excludes SJC-Repeater", func(t *testing.T) {
result := store.GetAnalyticsHashSizes("PDX")
mb, _ := result["multiByteNodes"].([]map[string]interface{})
var foundPDX, foundSJC bool
for _, n := range mb {
pk, _ := n["pubkey"].(string)
if pk == pdxPK {
foundPDX = true
}
if pk == sjcPK {
foundSJC = true
}
}
if !foundPDX {
t.Errorf("PDX-Repeater missing from PDX analytics")
}
if foundSJC {
t.Errorf("SJC-Repeater leaked into PDX analytics")
}
})
}
+236 -51
View File
@@ -2441,6 +2441,145 @@ func (s *PacketStore) fetchAndCacheRegionObs(region string) map[string]bool {
return m
}
// iataMatchesRegion returns true if iata matches any of the comma-separated
// region codes in regionParam. Comparison is case-insensitive and trim-tolerant.
// Empty iata never matches; empty regionParam never matches.
//
// #804: shared helper used by analytics to attribute transmissions to a node's
// HOME region (derived from observers that hear its zero-hop direct adverts)
// rather than to the observer that happened to relay a packet.
func iataMatchesRegion(iata, regionParam string) bool {
if iata == "" || regionParam == "" {
return false
}
codes := normalizeRegionCodes(regionParam)
if len(codes) == 0 {
return false
}
got := strings.TrimSpace(strings.ToUpper(iata))
if got == "" {
return false
}
for _, c := range codes {
if c == got {
return true
}
}
return false
}
// computeNodeHomeRegions returns a pubkey → IATA map deriving each node's
// HOME region from zero-hop DIRECT adverts. A zero-hop direct advert is the
// most authoritative location signal because the path byte is set locally on
// the originating radio and the packet has not been relayed: the observer
// that hears it is necessarily within direct RF range of the originator.
//
// When a node has zero-hop direct adverts heard by observers from multiple
// regions, the most-frequently-observed region wins (geographic plurality).
//
// Caller must hold s.mu (read or write). Returns empty map (not nil) if no
// observers are loaded or no zero-hop direct adverts have been seen.
//
// #804: feeds analytics region-attribution so a multi-byte repeater whose
// flood adverts get relayed across regions is still attributed to its home.
func (s *PacketStore) computeNodeHomeRegions() map[string]string {
// Build observer → IATA map. observers table is small (≪ packets), so a
// single DB read here is acceptable; resolveRegionObservers does similar.
obsIATA := make(map[string]string, 64)
if s.db != nil {
if observers, err := s.db.GetObservers(); err == nil {
for _, o := range observers {
if o.IATA != nil && *o.IATA != "" {
obsIATA[o.ID] = strings.TrimSpace(strings.ToUpper(*o.IATA))
}
}
}
}
if len(obsIATA) == 0 {
return map[string]string{}
}
// Tally zero-hop direct ADVERT region observations per pubkey.
type tally struct {
counts map[string]int
}
per := make(map[string]*tally, 256)
for _, tx := range s.packets {
if tx.RawHex == "" || len(tx.RawHex) < 4 {
continue
}
if tx.PayloadType == nil || *tx.PayloadType != PayloadADVERT {
continue
}
if tx.DecodedJSON == "" {
continue
}
header, err := strconv.ParseUint(tx.RawHex[:2], 16, 8)
if err != nil {
continue
}
routeType := header & 0x03
if routeType != uint64(RouteDirect) && routeType != uint64(RouteTransportDirect) {
continue
}
// Path byte index — for direct/transport-direct it's at offset 1
// (matches the analytics decoder's pathByteIdx logic).
if len(tx.RawHex) < 4 {
continue
}
pathByte, err := strconv.ParseUint(tx.RawHex[2:4], 16, 8)
if err != nil {
continue
}
hopCount := pathByte & 0x3F
if hopCount != 0 {
continue
}
var d map[string]interface{}
if json.Unmarshal([]byte(tx.DecodedJSON), &d) != nil {
continue
}
pk, _ := d["pubKey"].(string)
if pk == "" {
pk, _ = d["public_key"].(string)
}
if pk == "" {
continue
}
for _, obs := range tx.Observations {
iata := obsIATA[obs.ObserverID]
if iata == "" {
continue
}
t := per[pk]
if t == nil {
t = &tally{counts: map[string]int{}}
per[pk] = t
}
t.counts[iata]++
}
}
out := make(map[string]string, len(per))
for pk, t := range per {
var bestIATA string
bestCount := 0
for iata, n := range t.counts {
if n > bestCount || (n == bestCount && iata < bestIATA) {
bestCount = n
bestIATA = iata
}
}
if bestIATA != "" {
out[pk] = bestIATA
}
}
return out
}
// enrichObs returns a map with observation fields + transmission fields.
func (s *PacketStore) enrichObs(obs *StoreObs) map[string]interface{} {
tx := s.byTxID[obs.TransmissionID]
@@ -5666,6 +5805,16 @@ func (s *PacketStore) computeAnalyticsHashSizes(region string) map[string]interf
regionObs = s.resolveRegionObservers(region)
}
// #804: derive each node's HOME region from zero-hop direct adverts (the
// most authoritative location signal — those packets cannot have been
// relayed). When non-empty, multi-byte node attribution prefers this
// over observer-region. Falls back to observer-region when unknown.
nodeHomeRegion := s.computeNodeHomeRegions()
attributionMethod := "observer"
if region != "" && len(nodeHomeRegion) > 0 {
attributionMethod = "repeater"
}
allNodes, pm := s.getCachedNodesAndPM()
// Build pubkey→role map for filtering by node type.
@@ -5684,18 +5833,6 @@ func (s *PacketStore) computeAnalyticsHashSizes(region string) map[string]interf
if tx.RawHex == "" {
continue
}
if regionObs != nil {
match := false
for _, obs := range tx.Observations {
if regionObs[obs.ObserverID] {
match = true
break
}
}
if !match {
continue
}
}
// Parse header and path byte
if len(tx.RawHex) < 4 {
@@ -5725,52 +5862,84 @@ func (s *PacketStore) computeAnalyticsHashSizes(region string) map[string]interf
continue
}
// Track originator from advert packets (including zero-hop adverts,
// keyed by pubKey so same-name nodes don't merge).
// #804: pre-extract originator pubkey for ADVERT packets so we can
// (a) relax observer-region filter when the originator's HOME region
// matches the requested region (a flood relay heard outside the
// home region must still attribute to the home), and
// (b) reuse the parsed values below without re-parsing.
var advertPK, advertName string
var advertParsed bool
if tx.PayloadType != nil && *tx.PayloadType == PayloadADVERT && tx.DecodedJSON != "" {
var d map[string]interface{}
if json.Unmarshal([]byte(tx.DecodedJSON), &d) == nil {
pk := ""
if v, ok := d["pubKey"].(string); ok {
pk = v
advertPK = v
} else if v, ok := d["public_key"].(string); ok {
pk = v
advertPK = v
}
if pk != "" {
name := ""
if n, ok := d["name"].(string); ok {
name = n
}
if name == "" {
if len(pk) >= 8 {
name = pk[:8]
} else {
name = pk
}
}
// Skip zero-hop direct adverts for hash_size — the
// path byte is locally generated and unreliable.
// Still count the packet and update lastSeen.
isZeroHop := (routeType == uint64(RouteDirect) || routeType == uint64(RouteTransportDirect)) && (actualPathByte&0x3F) == 0
if byNode[pk] == nil {
role := nodeRoleByPK[pk] // empty if unknown
initHS := hashSize
if isZeroHop {
initHS = 0
}
byNode[pk] = map[string]interface{}{
"hashSize": initHS, "packets": 0,
"lastSeen": tx.FirstSeen, "name": name,
"role": role,
}
}
byNode[pk]["packets"] = byNode[pk]["packets"].(int) + 1
if !isZeroHop {
byNode[pk]["hashSize"] = hashSize
}
byNode[pk]["lastSeen"] = tx.FirstSeen
if n, ok := d["name"].(string); ok {
advertName = n
}
advertParsed = advertPK != ""
}
}
if regionObs != nil {
match := false
for _, obs := range tx.Observations {
if regionObs[obs.ObserverID] {
match = true
break
}
}
// #804: allow ADVERTs from a node whose HOME region matches the
// requested region even if no observer in that region heard this
// particular packet (e.g. flood relay heard only by an out-of-
// region observer). Conservative: only ADVERTs (the source is
// known by pubkey) and only when home is established.
if !match && advertParsed {
if home, ok := nodeHomeRegion[advertPK]; ok && iataMatchesRegion(home, region) {
match = true
}
}
if !match {
continue
}
}
// Track originator from advert packets (including zero-hop adverts,
// keyed by pubKey so same-name nodes don't merge).
if advertParsed {
pk := advertPK
name := advertName
if name == "" {
if len(pk) >= 8 {
name = pk[:8]
} else {
name = pk
}
}
// Skip zero-hop direct adverts for hash_size — the
// path byte is locally generated and unreliable.
// Still count the packet and update lastSeen.
isZeroHop := (routeType == uint64(RouteDirect) || routeType == uint64(RouteTransportDirect)) && (actualPathByte&0x3F) == 0
if byNode[pk] == nil {
role := nodeRoleByPK[pk] // empty if unknown
initHS := hashSize
if isZeroHop {
initHS = 0
}
byNode[pk] = map[string]interface{}{
"hashSize": initHS, "packets": 0,
"lastSeen": tx.FirstSeen, "name": name,
"role": role,
}
}
byNode[pk]["packets"] = byNode[pk]["packets"].(int) + 1
if !isZeroHop {
byNode[pk]["hashSize"] = hashSize
}
byNode[pk]["lastSeen"] = tx.FirstSeen
}
// Distribution/hourly/uniqueHops only for packets with relay hops
@@ -5851,6 +6020,15 @@ func (s *PacketStore) computeAnalyticsHashSizes(region string) map[string]interf
// Multi-byte nodes
multiByteNodes := make([]map[string]interface{}, 0)
for pk, data := range byNode {
// #804: when a region filter is active, prefer the repeater's HOME
// region over the observer that happened to relay it. Falls back to
// the (already-applied) observer-region filter when the node's home
// region is unknown.
if region != "" {
if home, ok := nodeHomeRegion[pk]; ok && !iataMatchesRegion(home, region) {
continue
}
}
if data["hashSize"].(int) > 1 {
multiByteNodes = append(multiByteNodes, map[string]interface{}{
"name": data["name"], "hashSize": data["hashSize"],
@@ -5865,11 +6043,17 @@ func (s *PacketStore) computeAnalyticsHashSizes(region string) map[string]interf
// Distribution by repeaters: count unique REPEATER nodes per hash size
distributionByRepeaters := map[string]int{"1": 0, "2": 0, "3": 0}
for _, data := range byNode {
for pk, data := range byNode {
role, _ := data["role"].(string)
if !strings.Contains(strings.ToLower(role), "repeater") {
continue
}
// #804: same repeater-region preference as multiByteNodes.
if region != "" {
if home, ok := nodeHomeRegion[pk]; ok && !iataMatchesRegion(home, region) {
continue
}
}
hs := data["hashSize"].(int)
key := strconv.Itoa(hs)
distributionByRepeaters[key]++
@@ -5882,6 +6066,7 @@ func (s *PacketStore) computeAnalyticsHashSizes(region string) map[string]interf
"hourly": hourly,
"topHops": topHops,
"multiByteNodes": multiByteNodes,
"attributionMethod": attributionMethod,
}
}