mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-05-20 03:25:05 +00:00
2e28aa3e04
RED: `235b65b4` (CI will surface the URL after the PR is opened) — `test(#1229): tier-1 must prefer multi-observer edges`. GREEN: `841fc5de`.

## Summary

Implements **Option C** from issue #1229: edge source-diversity confidence weighting. Each neighbor-graph edge already tracks the set of distinct observers that contributed to it (`NeighborEdge.Observers`). This PR is the first to consume that signal in the disambiguator. The tier-1 score in `pm.resolveWithContext` becomes `Score(now) × Confidence()`, where:

```
Confidence() = min(1.0, max(1, |Observers|) / 3.0)
```

- 1 observer → 1/3 weight (single-source, suspect)
- 2 observers → 2/3 weight
- ≥3 observers → 1.0 (saturated, full historical weight)

A 6-observer edge (30 obs) now beats a 1-observer edge (25 obs) by 3.6× (vs. 1.2× before). That is enough to clear `affinityConfidenceRatio` and skip the tier-2 geo fallback that was misresolving in cross-region cases. This stacks with the geo-rejection filter merged in #1228/#1230 to give two independent defenses against cross-region prefix-collision pollution.

## Why C over A/B

- **A (per-observer graphs):** N× memory cost, biggest refactor surface.
- **B (per-region/IATA segmented):** requires region attribution on every packet plus per-region cache plumbing; deferred as a follow-up.
- **C:** smallest diff (~30 lines), no schema migration, leverages an existing field, composes additively with #1228.

A and B remain valid follow-ups if C proves insufficient.

## Backward compatibility (persistence)

The `neighbor_edges` schema is **unchanged**. `Observers` is rebuilt by `BuildFromStoreWithOptions` from live observations on every graph refresh (5-min TTL). Persisted rows carry an empty set only during the post-restart warm-up. `Confidence()` defaults n→1 when `|Observers|==0`, so legacy rows resolve with single-observer (degraded but non-zero) confidence rather than disappearing. This is a defensive default.
## Tests

- `cmd/server/hop_disambig_confidence_test.go:48` — RED-then-GREEN E2E.
  - Setup: two `8a` candidates from the same anchor. candX is placed geo-near with 1 observer × 25 obs; candY is placed geo-far with 6 observers × 5 obs.
  - Without confidence weighting, tier-1 falls through (1.2× ratio) and tier-2 picks the wrong (geo-near) candX.
  - With confidence weighting, tier-1 fires and picks candY.
  - Asserts `method == "neighbor_affinity"` to pin the resolver path.
- `TestNeighborEdge_ObserverSetIsDistinct` — guards the source-diversity counter against double-counting same-observer contributions. It also pins the `Confidence()` formula at both endpoints (single → fractional, ≥3 → 1.0).

All existing tier-1 tests (`hop_disambig_tier1_test.go`) continue to pass. They seed with a single observer, so their weights drop from 1.0 to 1/3 uniformly across candidates, preserving the ratio-guard outcome.

Fixes #1229

---------

Co-authored-by: bot <bot@corescope.local>
775 lines
24 KiB
Go
775 lines
24 KiB
Go
package main
|
||
|
||
import (
|
||
"encoding/json"
|
||
"fmt"
|
||
"log"
|
||
"math"
|
||
"strings"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
)
|
||
|
||
// ─── Constants ─────────────────────────────────────────────────────────────────
|
||
|
||
const (
	// affinitySaturationCount is the observation count at which an edge's
	// count term in Score saturates at 1.0.
	affinitySaturationCount = 100

	// affinityHalfLifeHours is the half-life of Score's time decay: 7 days.
	affinityHalfLifeHours = 168.0

	// neighborGraphTTL is how long a built graph stays fresh (see IsStale).
	neighborGraphTTL = 5 * time.Minute

	// affinityConfidenceRatio: auto-resolve only when the best candidate's
	// score is at least this multiple of the second-best's.
	affinityConfidenceRatio = 3.0

	// affinityMinObservations is the minimum edge observation count before
	// auto-resolution is attempted.
	affinityMinObservations = 3

	// affinityObserverSaturation: edges contributed by this many distinct
	// observers (or more) earn the full confidence weight (multiplier 1.0);
	// fewer observers earn a proportional fraction. Issue #1229 (Option C).
	affinityObserverSaturation = 3.0
)

// affinityLambda is the per-hour exponential decay rate, ln(2) / half-life,
// computed once at package initialisation.
var affinityLambda = math.Ln2 / affinityHalfLifeHours
|
||
|
||
// ─── Data model ────────────────────────────────────────────────────────────────
|
||
|
||
// edgeKey is the canonical map key for an undirected edge. Its endpoints are
// stored in lexicographic order (A <= B), so (x, y) and (y, x) produce the
// same key. For ambiguous edges whose far endpoint is unknown, one side is the
// pseudo-node "prefix:" + <raw hop prefix>; which side depends on sort order.
type edgeKey struct {
	A, B string
}

// makeEdgeKey returns the order-independent key for the endpoints a and b.
func makeEdgeKey(a, b string) edgeKey {
	if b < a {
		return edgeKey{A: b, B: a}
	}
	return edgeKey{A: a, B: b}
}
|
||
|
||
// NeighborEdge represents a weighted, undirected first-hop neighbor relationship.
//
// Resolved edges connect two real pubkeys, with NodeA <= NodeB in
// lexicographic order. Ambiguous edges are created when a hop prefix matches
// zero or several nodes. They leave NodeB empty and list the possible far
// endpoints in Candidates.
type NeighborEdge struct {
	NodeA      string          // full pubkey
	NodeB      string          // full pubkey, or "" if unresolved/ambiguous
	Prefix     string          // raw hop prefix that established this edge
	Count      int             // total observations
	FirstSeen  time.Time       // earliest observation timestamp seen for this edge
	LastSeen   time.Time       // most recent observation timestamp; drives Score's time decay
	SNRSum     float64         // running sum for average
	SNRCount   int             // how many SNR samples
	Observers  map[string]bool // distinct observer pubkeys that witnessed this edge (feeds Confidence)
	Ambiguous  bool            // multiple candidates or zero candidates
	Candidates []string        // candidate pubkeys when ambiguous
	// Resolved is true once an originally ambiguous edge has been pinned to a
	// single node. That happens either by Phase 1.5 context resolution
	// (resolveAmbiguousEdges) or by Phase 2 Jaccard disambiguation.
	Resolved bool
}
|
||
|
||
// Score computes the affinity score at query time with time decay.
|
||
func (e *NeighborEdge) Score(now time.Time) float64 {
|
||
countFactor := math.Min(1.0, float64(e.Count)/float64(affinitySaturationCount))
|
||
hoursSince := now.Sub(e.LastSeen).Hours()
|
||
if hoursSince < 0 {
|
||
hoursSince = 0
|
||
}
|
||
decay := math.Exp(-affinityLambda * hoursSince)
|
||
return countFactor * decay
|
||
}
|
||
|
||
// Confidence returns a source-diversity multiplier in (0, 1] derived from the
|
||
// number of distinct observers that have contributed to this edge. Issue #1229
|
||
// (Option C): edges corroborated by multiple independent observers should
|
||
// outrank edges seen by a single observer at the same raw score.
|
||
//
|
||
// Formula: min(1.0, max(1, |Observers|) / affinityObserverSaturation).
|
||
// With saturation=3, a single observer yields 1/3, two observers 2/3, and
|
||
// three-or-more observers saturate at 1.0 — full historical weight. Edges
|
||
// with an empty observer set (legacy persisted rows lacking the column;
|
||
// see neighbor_persist.go backward-compat) default to a count of 1 so they
|
||
// behave like single-observer edges rather than disappearing — defensive.
|
||
func (e *NeighborEdge) Confidence() float64 {
|
||
n := float64(len(e.Observers))
|
||
if n < 1 {
|
||
n = 1
|
||
}
|
||
c := n / affinityObserverSaturation
|
||
if c > 1.0 {
|
||
c = 1.0
|
||
}
|
||
return c
|
||
}
|
||
|
||
// AvgSNR returns the average SNR, or 0 if no samples.
|
||
func (e *NeighborEdge) AvgSNR() float64 {
|
||
if e.SNRCount == 0 {
|
||
return 0
|
||
}
|
||
return e.SNRSum / float64(e.SNRCount)
|
||
}
|
||
|
||
// ─── NeighborGraph ─────────────────────────────────────────────────────────────
|
||
|
||
// NeighborGraph is a cached, in-memory first-hop neighbor affinity graph.
|
||
type NeighborGraph struct {
|
||
mu sync.RWMutex
|
||
edges map[edgeKey]*NeighborEdge
|
||
byNode map[string][]*NeighborEdge // pubkey → edges involving this node
|
||
builtAt time.Time
|
||
logFn func(prefix, msg string) // optional structured logging callback
|
||
|
||
// RejectedEdgesGeoFar counts edges dropped at build time because both
|
||
// endpoints had GPS and their haversine distance exceeded the
|
||
// configurable threshold (NeighborGraphConfig.MaxEdgeKm, default 500).
|
||
// Accessed via sync/atomic. See issue #1228.
|
||
RejectedEdgesGeoFar uint64
|
||
|
||
// maxEdgeKm is the geo-sanity threshold copied from config at build
|
||
// time. 0 means "no limit" / filter disabled.
|
||
maxEdgeKm float64
|
||
|
||
// nodeGeo maps lowercased pubkey → (lat, lon, hasGPS) for geo-sanity
|
||
// checks during upsertEdge. Populated by the builder; empty for graphs
|
||
// constructed via NewNeighborGraph directly (geo filter inert).
|
||
nodeGeo map[string]nodeGeoInfo
|
||
}
|
||
|
||
// nodeGeoInfo is the per-node location record the graph caches so that
// upsertEdge can run its geo-sanity check. HasGPS reports whether Lat/Lon are
// meaningful.
type nodeGeoInfo struct {
	Lat    float64
	Lon    float64
	HasGPS bool
}
|
||
|
||
// NewNeighborGraph creates an empty graph.
|
||
func NewNeighborGraph() *NeighborGraph {
|
||
return &NeighborGraph{
|
||
edges: make(map[edgeKey]*NeighborEdge),
|
||
byNode: make(map[string][]*NeighborEdge),
|
||
}
|
||
}
|
||
|
||
// Neighbors returns all edges for a given node pubkey.
|
||
func (g *NeighborGraph) Neighbors(pubkey string) []*NeighborEdge {
|
||
g.mu.RLock()
|
||
defer g.mu.RUnlock()
|
||
return g.byNode[strings.ToLower(pubkey)]
|
||
}
|
||
|
||
// AllEdges returns all edges in the graph.
|
||
func (g *NeighborGraph) AllEdges() []*NeighborEdge {
|
||
g.mu.RLock()
|
||
defer g.mu.RUnlock()
|
||
out := make([]*NeighborEdge, 0, len(g.edges))
|
||
for _, e := range g.edges {
|
||
out = append(out, e)
|
||
}
|
||
return out
|
||
}
|
||
|
||
// MarkAmbiguous flips the Ambiguous flag on the edge between pubkeyA and
|
||
// pubkeyB (key direction-agnostic) to the supplied value. Returns true if
|
||
// the edge existed and was updated.
|
||
//
|
||
// This helper exists so tests don't have to mutate *NeighborEdge fields
|
||
// returned from AllEdges()/Neighbors() — those mutations work today only
|
||
// because the map stores pointers, which is a hidden coupling. Routing
|
||
// the flip through a method makes the intent explicit and lets the graph
|
||
// take its own write-lock.
|
||
func (g *NeighborGraph) MarkAmbiguous(pubkeyA, pubkeyB string, ambiguous bool) bool {
|
||
g.mu.Lock()
|
||
defer g.mu.Unlock()
|
||
key := makeEdgeKey(strings.ToLower(pubkeyA), strings.ToLower(pubkeyB))
|
||
e, ok := g.edges[key]
|
||
if !ok {
|
||
return false
|
||
}
|
||
e.Ambiguous = ambiguous
|
||
return true
|
||
}
|
||
|
||
// IsStale returns true if the graph cache has expired.
|
||
func (g *NeighborGraph) IsStale() bool {
|
||
g.mu.RLock()
|
||
defer g.mu.RUnlock()
|
||
return g.builtAt.IsZero() || time.Since(g.builtAt) > neighborGraphTTL
|
||
}
|
||
|
||
// ─── Builder ───────────────────────────────────────────────────────────────────
|
||
|
||
// BuildFromStore constructs the neighbor graph from all packets in the store.
|
||
// The store's read-lock must NOT be held by the caller.
|
||
func BuildFromStore(store *PacketStore) *NeighborGraph {
|
||
return BuildFromStoreWithOptions(store, BuildOptions{MaxEdgeKm: DefaultMaxEdgeKm})
|
||
}
|
||
|
||
// BuildOptions tunes BuildFromStoreWithOptions. The zero value builds without
// disambiguation logging and with the geo-sanity filter disabled.
type BuildOptions struct {
	// EnableLog turns on structured disambiguation logging.
	EnableLog bool
	// MaxEdgeKm is the geo-sanity threshold in kilometres; 0 disables the filter.
	MaxEdgeKm float64
}

// DefaultMaxEdgeKm is the conservative built-in cap for the
// geo-implausibility filter (issue #1228). At 500 km it sits comfortably
// above any plausible terrestrial LoRa hop, including satellite-relayed cases.
const DefaultMaxEdgeKm = 500.0
|
||
|
||
// cachedToLower returns strings.ToLower(s), memoising the result in cache. This
// avoids re-allocating the same lower-cased pubkey over and over. cache is a
// plain map written without locking, so it belongs to a single goroutine.
func cachedToLower(cache map[string]string, s string) string {
	lowered, hit := cache[s]
	if !hit {
		lowered = strings.ToLower(s)
		cache[s] = lowered
	}
	return lowered
}
|
||
|
||
// BuildFromStoreWithLog constructs the neighbor graph, optionally logging disambiguation decisions.
|
||
// Kept for backward compatibility; new callers should use BuildFromStoreWithOptions.
|
||
func BuildFromStoreWithLog(store *PacketStore, enableLog bool) *NeighborGraph {
|
||
return BuildFromStoreWithOptions(store, BuildOptions{EnableLog: enableLog, MaxEdgeKm: DefaultMaxEdgeKm})
|
||
}
|
||
|
||
// BuildFromStoreWithOptions constructs the neighbor graph with explicit options.
|
||
func BuildFromStoreWithOptions(store *PacketStore, opts BuildOptions) *NeighborGraph {
|
||
g := NewNeighborGraph()
|
||
g.maxEdgeKm = opts.MaxEdgeKm
|
||
if opts.EnableLog {
|
||
g.logFn = func(prefix, msg string) {
|
||
log.Printf("[affinity] resolve %s: %s", prefix, msg)
|
||
}
|
||
}
|
||
|
||
store.mu.RLock()
|
||
// Snapshot what we need under lock.
|
||
packets := make([]*StoreTx, len(store.packets))
|
||
copy(packets, store.packets)
|
||
store.mu.RUnlock()
|
||
|
||
// Build prefix map for candidate resolution.
|
||
// Use cached nodes+PM (avoids DB call if cache is fresh).
|
||
allNodes, pm := store.getCachedNodesAndPM()
|
||
|
||
// Index node geo for upsertEdge geo-sanity checks (issue #1228).
|
||
geo := make(map[string]nodeGeoInfo, len(allNodes))
|
||
for _, n := range allNodes {
|
||
geo[strings.ToLower(n.PublicKey)] = nodeGeoInfo{Lat: n.Lat, Lon: n.Lon, HasGPS: n.HasGPS}
|
||
}
|
||
g.mu.Lock()
|
||
g.nodeGeo = geo
|
||
g.mu.Unlock()
|
||
|
||
// Local cache for strings.ToLower — pubkeys are immutable and repeat
|
||
// across hundreds of thousands of observations.
|
||
lowerCache := make(map[string]string, 256)
|
||
|
||
// Phase 1: Extract edges from every transmission + observation.
|
||
for _, tx := range packets {
|
||
isAdvert := tx.PayloadType != nil && *tx.PayloadType == PayloadADVERT
|
||
fromNode := extractFromNode(tx)
|
||
// Pre-compute lowered originator once per tx (not per observation).
|
||
fromLower := ""
|
||
if fromNode != "" {
|
||
fromLower = cachedToLower(lowerCache, fromNode)
|
||
}
|
||
|
||
for _, obs := range tx.Observations {
|
||
path := parsePathJSON(obs.PathJSON)
|
||
observerPK := cachedToLower(lowerCache, obs.ObserverID)
|
||
|
||
if len(path) == 0 {
|
||
// Zero-hop
|
||
if isAdvert && fromLower != "" {
|
||
if fromLower != observerPK { // self-edge guard
|
||
g.upsertEdge(fromLower, observerPK, "", observerPK, obs.SNR, parseTimestamp(obs.Timestamp))
|
||
}
|
||
}
|
||
continue
|
||
}
|
||
|
||
// Edge 1: originator ↔ path[0] — ADVERTs only
|
||
if isAdvert && fromLower != "" {
|
||
firstHop := cachedToLower(lowerCache, path[0])
|
||
if fromLower != firstHop { // self-edge guard (shouldn't happen but spec says check)
|
||
candidates := pm.m[firstHop]
|
||
g.upsertEdgeWithCandidates(fromLower, firstHop, candidates, observerPK, obs.SNR, parseTimestamp(obs.Timestamp), lowerCache)
|
||
}
|
||
}
|
||
|
||
// Edge 2: observer ↔ path[last] — ALL packet types
|
||
lastHop := cachedToLower(lowerCache, path[len(path)-1])
|
||
if observerPK != lastHop { // self-edge guard
|
||
candidates := pm.m[lastHop]
|
||
g.upsertEdgeWithCandidates(observerPK, lastHop, candidates, observerPK, obs.SNR, parseTimestamp(obs.Timestamp), lowerCache)
|
||
}
|
||
}
|
||
}
|
||
|
||
// Phase 1.5: Resolve ambiguous edges using full graph context.
|
||
resolveAmbiguousEdges(pm, g)
|
||
|
||
// Phase 2: Disambiguation via Jaccard similarity.
|
||
g.disambiguate()
|
||
|
||
g.mu.Lock()
|
||
g.builtAt = time.Now()
|
||
g.mu.Unlock()
|
||
|
||
return g
|
||
}
|
||
|
||
// extractFromNode pulls the originator pubkey from a StoreTx's DecodedJSON.
|
||
// ADVERTs use "pubKey", other packets may use "from_node" or "from".
|
||
// Uses the cached ParsedDecoded() accessor to avoid repeated json.Unmarshal.
|
||
func extractFromNode(tx *StoreTx) string {
|
||
decoded := tx.ParsedDecoded()
|
||
if decoded == nil {
|
||
return ""
|
||
}
|
||
// ADVERTs store the originator pubkey as "pubKey"; other packets may use
|
||
// "from_node" or "from". Check all three so we never miss the originator.
|
||
for _, field := range []string{"pubKey", "from_node", "from"} {
|
||
if v, ok := decoded[field]; ok {
|
||
if s, ok := v.(string); ok && s != "" {
|
||
return s
|
||
}
|
||
}
|
||
}
|
||
return ""
|
||
}
|
||
|
||
// jsonUnmarshalFast decodes the JSON text in data into v. It currently just
// converts data to bytes and delegates to encoding/json, and is kept as a
// single seam in case a faster decoder is swapped in later.
func jsonUnmarshalFast(data string, v interface{}) error {
	raw := []byte(data)
	return json.Unmarshal(raw, v)
}
|
||
|
||
// upsertEdge adds/updates an edge between two fully-known pubkeys.
|
||
func (g *NeighborGraph) upsertEdge(pubkeyA, pubkeyB, prefix, observer string, snr *float64, ts time.Time) {
|
||
// Geo-sanity guard (issue #1228): if both endpoints have known GPS and
|
||
// the haversine distance exceeds the configured threshold, drop the
|
||
// edge. When either lacks GPS we have no signal and accept.
|
||
if g.shouldRejectGeoFar(pubkeyA, pubkeyB) {
|
||
atomic.AddUint64(&g.RejectedEdgesGeoFar, 1)
|
||
return
|
||
}
|
||
key := makeEdgeKey(pubkeyA, pubkeyB)
|
||
|
||
g.mu.Lock()
|
||
defer g.mu.Unlock()
|
||
|
||
e, exists := g.edges[key]
|
||
if !exists {
|
||
e = &NeighborEdge{
|
||
NodeA: key.A,
|
||
NodeB: key.B,
|
||
Prefix: prefix,
|
||
Observers: make(map[string]bool),
|
||
FirstSeen: ts,
|
||
LastSeen: ts,
|
||
}
|
||
g.edges[key] = e
|
||
g.byNode[key.A] = append(g.byNode[key.A], e)
|
||
g.byNode[key.B] = append(g.byNode[key.B], e)
|
||
}
|
||
|
||
e.Count++
|
||
if ts.After(e.LastSeen) {
|
||
e.LastSeen = ts
|
||
}
|
||
if ts.Before(e.FirstSeen) {
|
||
e.FirstSeen = ts
|
||
}
|
||
if snr != nil {
|
||
e.SNRSum += *snr
|
||
e.SNRCount++
|
||
}
|
||
if observer != "" {
|
||
e.Observers[observer] = true
|
||
}
|
||
}
|
||
|
||
// upsertEdgeWithCandidates handles prefix-based edges that may be ambiguous.
|
||
func (g *NeighborGraph) upsertEdgeWithCandidates(knownPK, prefix string, candidates []nodeInfo, observer string, snr *float64, ts time.Time, lc map[string]string) {
|
||
if len(candidates) == 1 {
|
||
resolved := cachedToLower(lc, candidates[0].PublicKey)
|
||
if resolved == knownPK {
|
||
return // self-edge guard
|
||
}
|
||
g.upsertEdge(knownPK, resolved, prefix, observer, snr, ts)
|
||
return
|
||
}
|
||
|
||
// Filter out self from candidates
|
||
filtered := make([]string, 0, len(candidates))
|
||
for _, c := range candidates {
|
||
pk := cachedToLower(lc, c.PublicKey)
|
||
if pk != knownPK {
|
||
filtered = append(filtered, pk)
|
||
}
|
||
}
|
||
|
||
if len(filtered) == 1 {
|
||
g.upsertEdge(knownPK, filtered[0], prefix, observer, snr, ts)
|
||
return
|
||
}
|
||
|
||
// Ambiguous or orphan: use prefix-based key
|
||
pseudoB := "prefix:" + prefix
|
||
key := makeEdgeKey(knownPK, pseudoB)
|
||
|
||
g.mu.Lock()
|
||
defer g.mu.Unlock()
|
||
|
||
e, exists := g.edges[key]
|
||
if !exists {
|
||
e = &NeighborEdge{
|
||
NodeA: key.A,
|
||
NodeB: "",
|
||
Prefix: prefix,
|
||
Observers: make(map[string]bool),
|
||
Ambiguous: true,
|
||
Candidates: filtered,
|
||
FirstSeen: ts,
|
||
LastSeen: ts,
|
||
}
|
||
g.edges[key] = e
|
||
g.byNode[knownPK] = append(g.byNode[knownPK], e)
|
||
}
|
||
|
||
e.Count++
|
||
if ts.After(e.LastSeen) {
|
||
e.LastSeen = ts
|
||
}
|
||
if ts.Before(e.FirstSeen) {
|
||
e.FirstSeen = ts
|
||
}
|
||
if snr != nil {
|
||
e.SNRSum += *snr
|
||
e.SNRCount++
|
||
}
|
||
if observer != "" {
|
||
e.Observers[observer] = true
|
||
}
|
||
}
|
||
|
||
// ─── Phase 1.5: Context-based resolution of ambiguous edges ────────────────────
|
||
|
||
// resolveAmbiguousEdges attempts to resolve ambiguous prefix edges using the
|
||
// fully-built graph context. Called after Phase 1 (edge collection) completes
|
||
// so that affinity and geo proximity tiers have full neighbor data.
|
||
func resolveAmbiguousEdges(pm *prefixMap, graph *NeighborGraph) {
|
||
// Step 1: Collect ambiguous edges under read lock.
|
||
graph.mu.RLock()
|
||
type ambiguousEntry struct {
|
||
key edgeKey
|
||
edge *NeighborEdge
|
||
knownNode string
|
||
prefix string
|
||
}
|
||
var ambiguous []ambiguousEntry
|
||
for key, e := range graph.edges {
|
||
if !e.Ambiguous {
|
||
continue
|
||
}
|
||
knownNode := e.NodeA
|
||
if strings.HasPrefix(e.NodeA, "prefix:") {
|
||
knownNode = e.NodeB
|
||
}
|
||
if knownNode == "" {
|
||
continue
|
||
}
|
||
ambiguous = append(ambiguous, ambiguousEntry{key, e, knownNode, e.Prefix})
|
||
}
|
||
graph.mu.RUnlock()
|
||
|
||
// Step 2: Resolve each (no lock needed — resolveWithContext takes its own RLock).
|
||
type resolution struct {
|
||
ambiguousEntry
|
||
resolvedPK string
|
||
}
|
||
var resolutions []resolution
|
||
for _, ae := range ambiguous {
|
||
resolved, confidence, _ := pm.resolveWithContext(ae.prefix, []string{ae.knownNode}, graph)
|
||
if resolved == nil || confidence == "no_match" || confidence == "observation_count_fallback" || confidence == "gps_preference" {
|
||
continue
|
||
}
|
||
rpk := strings.ToLower(resolved.PublicKey)
|
||
if rpk == ae.knownNode {
|
||
continue // self-edge guard
|
||
}
|
||
resolutions = append(resolutions, resolution{ae, rpk})
|
||
}
|
||
|
||
// Step 3: Apply resolutions under write lock.
|
||
if len(resolutions) == 0 {
|
||
return
|
||
}
|
||
graph.mu.Lock()
|
||
for _, r := range resolutions {
|
||
// Verify edge still exists and is still ambiguous (could have been
|
||
// resolved by a prior iteration if two ambiguous edges resolve to same target).
|
||
e, ok := graph.edges[r.key]
|
||
if !ok || !e.Ambiguous {
|
||
continue
|
||
}
|
||
graph.resolveEdge(r.key, e, r.knownNode, r.resolvedPK)
|
||
}
|
||
graph.mu.Unlock()
|
||
}
|
||
|
||
// ─── Disambiguation ────────────────────────────────────────────────────────────
|
||
|
||
// disambiguate resolves ambiguous edges using Jaccard similarity of neighbor sets.
|
||
// Only fully-resolved edges are used as evidence (transitivity poisoning guard).
|
||
func (g *NeighborGraph) disambiguate() {
|
||
g.mu.Lock()
|
||
defer g.mu.Unlock()
|
||
|
||
// Build resolved neighbor sets: for each node, collect the set of nodes
|
||
// it has fully-resolved (non-ambiguous) edges with.
|
||
resolvedNeighbors := make(map[string]map[string]bool)
|
||
for _, e := range g.edges {
|
||
if e.Ambiguous || e.NodeB == "" {
|
||
continue
|
||
}
|
||
if resolvedNeighbors[e.NodeA] == nil {
|
||
resolvedNeighbors[e.NodeA] = make(map[string]bool)
|
||
}
|
||
if resolvedNeighbors[e.NodeB] == nil {
|
||
resolvedNeighbors[e.NodeB] = make(map[string]bool)
|
||
}
|
||
resolvedNeighbors[e.NodeA][e.NodeB] = true
|
||
resolvedNeighbors[e.NodeB][e.NodeA] = true
|
||
}
|
||
|
||
// Try to resolve each ambiguous edge.
|
||
for key, e := range g.edges {
|
||
if !e.Ambiguous || len(e.Candidates) < 2 {
|
||
continue
|
||
}
|
||
if e.Count < affinityMinObservations {
|
||
continue
|
||
}
|
||
|
||
// Determine the known node (the one that's a real pubkey, not the prefix side).
|
||
knownNode := e.NodeA
|
||
if strings.HasPrefix(e.NodeA, "prefix:") {
|
||
knownNode = e.NodeB
|
||
}
|
||
// If knownNode is empty (shouldn't happen for ambiguous edges with candidates), skip.
|
||
if knownNode == "" {
|
||
continue
|
||
}
|
||
|
||
knownNeighbors := resolvedNeighbors[knownNode]
|
||
|
||
type scored struct {
|
||
pubkey string
|
||
jaccard float64
|
||
}
|
||
var scores []scored
|
||
|
||
for _, cand := range e.Candidates {
|
||
candNeighbors := resolvedNeighbors[cand]
|
||
j := jaccardSimilarity(knownNeighbors, candNeighbors)
|
||
scores = append(scores, scored{cand, j})
|
||
}
|
||
|
||
if len(scores) < 2 {
|
||
continue
|
||
}
|
||
|
||
// Find best and second-best.
|
||
best, secondBest := scores[0], scores[1]
|
||
if secondBest.jaccard > best.jaccard {
|
||
best, secondBest = secondBest, best
|
||
}
|
||
for i := 2; i < len(scores); i++ {
|
||
if scores[i].jaccard > best.jaccard {
|
||
secondBest = best
|
||
best = scores[i]
|
||
} else if scores[i].jaccard > secondBest.jaccard {
|
||
secondBest = scores[i]
|
||
}
|
||
}
|
||
|
||
// Auto-resolve only if best >= 3× second-best AND enough observations.
|
||
if secondBest.jaccard == 0 {
|
||
// If second-best is 0 and best > 0, ratio is infinite → resolve.
|
||
if best.jaccard > 0 {
|
||
if g.logFn != nil {
|
||
g.logFn(e.Prefix, fmt.Sprintf("%s score=%d Jaccard=%.2f vs %s score=%d Jaccard=%.2f → neighbor_affinity (ratio ∞)",
|
||
best.pubkey[:minLen(best.pubkey, 8)], e.Count, best.jaccard,
|
||
secondBest.pubkey[:minLen(secondBest.pubkey, 8)], e.Count, secondBest.jaccard))
|
||
}
|
||
g.resolveEdge(key, e, knownNode, best.pubkey)
|
||
}
|
||
} else if best.jaccard/secondBest.jaccard >= affinityConfidenceRatio {
|
||
ratio := best.jaccard / secondBest.jaccard
|
||
if g.logFn != nil {
|
||
g.logFn(e.Prefix, fmt.Sprintf("%s score=%d Jaccard=%.2f vs %s score=%d Jaccard=%.2f → neighbor_affinity (ratio %.1f×)",
|
||
best.pubkey[:minLen(best.pubkey, 8)], e.Count, best.jaccard,
|
||
secondBest.pubkey[:minLen(secondBest.pubkey, 8)], e.Count, secondBest.jaccard, ratio))
|
||
}
|
||
g.resolveEdge(key, e, knownNode, best.pubkey)
|
||
} else {
|
||
// Ambiguous
|
||
if g.logFn != nil {
|
||
ratio := 0.0
|
||
if secondBest.jaccard > 0 {
|
||
ratio = best.jaccard / secondBest.jaccard
|
||
}
|
||
g.logFn(e.Prefix, fmt.Sprintf("scores too close (Jaccard %.2f vs %.2f, ratio %.1f×) → ambiguous, returning %d candidates",
|
||
best.jaccard, secondBest.jaccard, ratio, len(e.Candidates)))
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// resolveEdge converts an ambiguous edge to a resolved one.
|
||
// Must be called with g.mu held.
|
||
func (g *NeighborGraph) resolveEdge(oldKey edgeKey, e *NeighborEdge, knownNode, resolvedPK string) {
|
||
// Remove old edge.
|
||
delete(g.edges, oldKey)
|
||
g.removeFromByNode(oldKey.A, e)
|
||
g.removeFromByNode(oldKey.B, e)
|
||
|
||
// Update edge.
|
||
newKey := makeEdgeKey(knownNode, resolvedPK)
|
||
e.NodeA = newKey.A
|
||
e.NodeB = newKey.B
|
||
e.Ambiguous = false
|
||
e.Resolved = true
|
||
|
||
// Merge with existing edge if any.
|
||
if existing, ok := g.edges[newKey]; ok {
|
||
existing.Count += e.Count
|
||
if e.LastSeen.After(existing.LastSeen) {
|
||
existing.LastSeen = e.LastSeen
|
||
}
|
||
if e.FirstSeen.Before(existing.FirstSeen) {
|
||
existing.FirstSeen = e.FirstSeen
|
||
}
|
||
existing.SNRSum += e.SNRSum
|
||
existing.SNRCount += e.SNRCount
|
||
for obs := range e.Observers {
|
||
existing.Observers[obs] = true
|
||
}
|
||
return
|
||
}
|
||
|
||
g.edges[newKey] = e
|
||
g.byNode[newKey.A] = append(g.byNode[newKey.A], e)
|
||
g.byNode[newKey.B] = append(g.byNode[newKey.B], e)
|
||
}
|
||
|
||
// removeFromByNode removes an edge from the byNode index for the given key.
|
||
func (g *NeighborGraph) removeFromByNode(nodeKey string, edge *NeighborEdge) {
|
||
edges := g.byNode[nodeKey]
|
||
for i, e := range edges {
|
||
if e == edge {
|
||
g.byNode[nodeKey] = append(edges[:i], edges[i+1:]...)
|
||
return
|
||
}
|
||
}
|
||
}
|
||
|
||
// jaccardSimilarity computes |A ∩ B| / |A ∪ B| over the key sets of a and b.
// It returns 0 when both sets are empty (nil maps count as empty).
//
// Membership is by key, consistent with the len()-based union. The
// intersection is counted by walking the smaller map, so the cost is
// O(min(|A|, |B|)). When the sets are not both empty the union is at least
// max(|A|, |B|) >= 1, so the division is always defined.
func jaccardSimilarity(a, b map[string]bool) float64 {
	if len(a) == 0 && len(b) == 0 {
		return 0
	}

	small, large := a, b
	if len(small) > len(large) {
		small, large = large, small
	}

	intersection := 0
	for k := range small {
		if _, ok := large[k]; ok {
			intersection++
		}
	}

	union := len(a) + len(b) - intersection
	return float64(intersection) / float64(union)
}
|
||
|
||
// parseTimestamp parses an observation timestamp into a time.Time. It returns
// the zero time for an empty or unrecognised string.
//
// Accepted forms:
//   - RFC 3339 ("2006-01-02T15:04:05Z", "+hh:mm" offsets). time.Parse also
//     accepts a fractional-seconds field after the seconds, so this layout
//     covers "...05Z" and "...05.000Z" as well;
//   - "2006-01-02 15:04:05" (space-separated, no zone → UTC);
//   - "2006-01-02T15:04:05" (ISO 8601 without zone → UTC).
func parseTimestamp(s string) time.Time {
	if s == "" {
		return time.Time{}
	}
	// The loop variable is named layout (not fmt) to avoid shadowing the
	// imported fmt package.
	for _, layout := range []string{
		time.RFC3339,
		"2006-01-02 15:04:05",
		"2006-01-02T15:04:05",
	} {
		if t, err := time.Parse(layout, s); err == nil {
			return t
		}
	}
	return time.Time{}
}
|
||
|
||
|
||
// minLen returns min(n, len(s)). It is used to take a bounded prefix of s
// without risking an out-of-range slice.
func minLen(s string, n int) int {
	if l := len(s); l < n {
		return l
	}
	return n
}
|
||
|
||
// PruneOlderThan removes all edges with LastSeen before cutoff.
|
||
// Returns the number of edges removed.
|
||
func (g *NeighborGraph) PruneOlderThan(cutoff time.Time) int {
|
||
g.mu.Lock()
|
||
defer g.mu.Unlock()
|
||
|
||
pruned := 0
|
||
for key, edge := range g.edges {
|
||
if edge.LastSeen.Before(cutoff) {
|
||
// Remove from byNode index
|
||
g.removeFromByNode(edge.NodeA, edge)
|
||
if edge.NodeB != "" {
|
||
g.removeFromByNode(edge.NodeB, edge)
|
||
}
|
||
delete(g.edges, key)
|
||
pruned++
|
||
}
|
||
}
|
||
return pruned
|
||
}
|
||
|
||
// shouldRejectGeoFar reports whether the edge (a, b) is geographically
|
||
// implausible under the configured threshold. Both endpoints must have known
|
||
// GPS to trigger a rejection; if either lacks GPS the edge is accepted
|
||
// (issue #1228 — "no signal to reject").
|
||
//
|
||
// All log output is PII-truncated to the first 8 hex chars of each pubkey.
|
||
func (g *NeighborGraph) shouldRejectGeoFar(a, b string) bool {
|
||
if g == nil || g.maxEdgeKm <= 0 || g.nodeGeo == nil {
|
||
return false
|
||
}
|
||
if strings.HasPrefix(a, "prefix:") || strings.HasPrefix(b, "prefix:") {
|
||
return false
|
||
}
|
||
ga, oka := g.nodeGeo[a]
|
||
gb, okb := g.nodeGeo[b]
|
||
if !oka || !okb || !ga.HasGPS || !gb.HasGPS {
|
||
return false
|
||
}
|
||
d := haversineKm(ga.Lat, ga.Lon, gb.Lat, gb.Lon)
|
||
if d <= g.maxEdgeKm {
|
||
return false
|
||
}
|
||
// PII-truncated INFO log (8-char prefix max).
|
||
log.Printf("[neighbor-graph] reject geo-far edge %s↔%s distance=%.0fkm threshold=%.0fkm",
|
||
piiTruncPubkey(a), piiTruncPubkey(b), d, g.maxEdgeKm)
|
||
return true
|
||
}
|
||
|
||
// piiTruncPubkey shortens a pubkey to at most its first 8 characters for log
// output. The repo is public and observer/node pubkeys are PII-adjacent.
func piiTruncPubkey(pk string) string {
	const maxChars = 8
	if len(pk) > maxChars {
		pk = pk[:maxChars]
	}
	return pk
}
|