mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-03-30 13:35:42 +00:00
Compare commits
4 Commits
optimize/f
...
fix/live-v
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8cee2da2e1 | ||
|
|
b7210e4e31 | ||
|
|
ab6fbec158 | ||
|
|
39c4c3e16e |
File diff suppressed because it is too large
Load Diff
@@ -17,6 +17,8 @@ func setupTestDB(t *testing.T) *DB {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// Force single connection so all goroutines share the same in-memory DB
|
||||
conn.SetMaxOpenConns(1)
|
||||
|
||||
// Create schema matching MeshCore Analyzer v3
|
||||
schema := `
|
||||
|
||||
@@ -1,403 +1,506 @@
|
||||
package main
|
||||
|
||||
// parity_test.go — Golden fixture shape tests.
|
||||
// Validates that Go API responses match the shape of Node.js API responses.
|
||||
// Shapes were captured from the production Node.js server and stored in
|
||||
// testdata/golden/shapes.json.
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// shapeSpec describes the expected JSON structure from the Node.js server.
|
||||
type shapeSpec struct {
|
||||
Type string `json:"type"`
|
||||
Keys map[string]shapeSpec `json:"keys,omitempty"`
|
||||
ElementShape *shapeSpec `json:"elementShape,omitempty"`
|
||||
DynamicKeys bool `json:"dynamicKeys,omitempty"`
|
||||
ValueShape *shapeSpec `json:"valueShape,omitempty"`
|
||||
RequiredKeys map[string]shapeSpec `json:"requiredKeys,omitempty"`
|
||||
}
|
||||
|
||||
// loadShapes reads testdata/golden/shapes.json relative to this source file.
|
||||
func loadShapes(t *testing.T) map[string]shapeSpec {
|
||||
t.Helper()
|
||||
_, thisFile, _, _ := runtime.Caller(0)
|
||||
dir := filepath.Dir(thisFile)
|
||||
data, err := os.ReadFile(filepath.Join(dir, "testdata", "golden", "shapes.json"))
|
||||
if err != nil {
|
||||
t.Fatalf("cannot load shapes.json: %v", err)
|
||||
}
|
||||
var shapes map[string]shapeSpec
|
||||
if err := json.Unmarshal(data, &shapes); err != nil {
|
||||
t.Fatalf("cannot parse shapes.json: %v", err)
|
||||
}
|
||||
return shapes
|
||||
}
|
||||
|
||||
// validateShape recursively checks that `actual` matches the expected `spec`.
|
||||
// `path` tracks the JSON path for error messages.
|
||||
// Returns a list of mismatch descriptions.
|
||||
func validateShape(actual interface{}, spec shapeSpec, path string) []string {
|
||||
var errs []string
|
||||
|
||||
switch spec.Type {
|
||||
case "null", "nullable":
|
||||
// nullable means: value can be null OR matching type. Accept anything.
|
||||
return nil
|
||||
case "nullable_number":
|
||||
// Can be null or number
|
||||
if actual != nil {
|
||||
if _, ok := actual.(float64); !ok {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected number or null, got %T", path, actual))
|
||||
}
|
||||
}
|
||||
return errs
|
||||
case "string":
|
||||
if actual == nil {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected string, got null", path))
|
||||
} else if _, ok := actual.(string); !ok {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected string, got %T", path, actual))
|
||||
}
|
||||
case "number":
|
||||
if actual == nil {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected number, got null", path))
|
||||
} else if _, ok := actual.(float64); !ok {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected number, got %T (%v)", path, actual, actual))
|
||||
}
|
||||
case "boolean":
|
||||
if actual == nil {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected boolean, got null", path))
|
||||
} else if _, ok := actual.(bool); !ok {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected boolean, got %T", path, actual))
|
||||
}
|
||||
case "array":
|
||||
if actual == nil {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected array, got null (arrays must be [] not null)", path))
|
||||
return errs
|
||||
}
|
||||
arr, ok := actual.([]interface{})
|
||||
if !ok {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected array, got %T", path, actual))
|
||||
return errs
|
||||
}
|
||||
if spec.ElementShape != nil && len(arr) > 0 {
|
||||
errs = append(errs, validateShape(arr[0], *spec.ElementShape, path+"[0]")...)
|
||||
}
|
||||
case "object":
|
||||
if actual == nil {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected object, got null", path))
|
||||
return errs
|
||||
}
|
||||
obj, ok := actual.(map[string]interface{})
|
||||
if !ok {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected object, got %T", path, actual))
|
||||
return errs
|
||||
}
|
||||
|
||||
if spec.DynamicKeys {
|
||||
// Object with dynamic keys — validate value shapes
|
||||
if spec.ValueShape != nil && len(obj) > 0 {
|
||||
for k, v := range obj {
|
||||
errs = append(errs, validateShape(v, *spec.ValueShape, path+"."+k)...)
|
||||
break // check just one sample
|
||||
}
|
||||
}
|
||||
if spec.RequiredKeys != nil {
|
||||
for rk, rs := range spec.RequiredKeys {
|
||||
v, exists := obj[rk]
|
||||
if !exists {
|
||||
errs = append(errs, fmt.Sprintf("%s: missing required key %q in dynamic-key object", path, rk))
|
||||
} else {
|
||||
errs = append(errs, validateShape(v, rs, path+"."+rk)...)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if spec.Keys != nil {
|
||||
// Object with known keys — check each expected key exists and has correct type
|
||||
for key, keySpec := range spec.Keys {
|
||||
val, exists := obj[key]
|
||||
if !exists {
|
||||
errs = append(errs, fmt.Sprintf("%s: missing field %q (expected %s)", path, key, keySpec.Type))
|
||||
} else {
|
||||
errs = append(errs, validateShape(val, keySpec, path+"."+key)...)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return errs
|
||||
}
|
||||
|
||||
// parityEndpoint defines one endpoint to test for parity.
|
||||
type parityEndpoint struct {
|
||||
name string // key in shapes.json
|
||||
path string // HTTP path to request
|
||||
}
|
||||
|
||||
func TestParityShapes(t *testing.T) {
|
||||
shapes := loadShapes(t)
|
||||
_, router := setupTestServer(t)
|
||||
|
||||
endpoints := []parityEndpoint{
|
||||
{"stats", "/api/stats"},
|
||||
{"nodes", "/api/nodes?limit=5"},
|
||||
{"packets", "/api/packets?limit=5"},
|
||||
{"packets_grouped", "/api/packets?limit=5&groupByHash=true"},
|
||||
{"observers", "/api/observers"},
|
||||
{"channels", "/api/channels"},
|
||||
{"channel_messages", "/api/channels/0000000000000000/messages?limit=5"},
|
||||
{"analytics_rf", "/api/analytics/rf?days=7"},
|
||||
{"analytics_topology", "/api/analytics/topology?days=7"},
|
||||
{"analytics_hash_sizes", "/api/analytics/hash-sizes?days=7"},
|
||||
{"analytics_distance", "/api/analytics/distance?days=7"},
|
||||
{"analytics_subpaths", "/api/analytics/subpaths?days=7"},
|
||||
{"bulk_health", "/api/nodes/bulk-health"},
|
||||
{"health", "/api/health"},
|
||||
{"perf", "/api/perf"},
|
||||
}
|
||||
|
||||
for _, ep := range endpoints {
|
||||
t.Run("Parity_"+ep.name, func(t *testing.T) {
|
||||
spec, ok := shapes[ep.name]
|
||||
if !ok {
|
||||
t.Fatalf("no shape spec found for %q in shapes.json", ep.name)
|
||||
}
|
||||
|
||||
req := httptest.NewRequest("GET", ep.path, nil)
|
||||
w := httptest.NewRecorder()
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != 200 {
|
||||
t.Fatalf("GET %s returned %d, expected 200. Body: %s",
|
||||
ep.path, w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var body interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &body); err != nil {
|
||||
t.Fatalf("GET %s returned invalid JSON: %v\nBody: %s",
|
||||
ep.path, err, w.Body.String())
|
||||
}
|
||||
|
||||
mismatches := validateShape(body, spec, ep.path)
|
||||
if len(mismatches) > 0 {
|
||||
t.Errorf("Go %s has %d shape mismatches vs Node.js golden:\n %s",
|
||||
ep.path, len(mismatches), strings.Join(mismatches, "\n "))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestParityNodeDetail tests node detail endpoint shape.
|
||||
// Uses a known test node public key from seeded data.
|
||||
func TestParityNodeDetail(t *testing.T) {
|
||||
shapes := loadShapes(t)
|
||||
_, router := setupTestServer(t)
|
||||
|
||||
spec, ok := shapes["node_detail"]
|
||||
if !ok {
|
||||
t.Fatal("no shape spec for node_detail in shapes.json")
|
||||
}
|
||||
|
||||
req := httptest.NewRequest("GET", "/api/nodes/aabbccdd11223344", nil)
|
||||
w := httptest.NewRecorder()
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != 200 {
|
||||
t.Fatalf("node detail returned %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var body interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &body)
|
||||
|
||||
mismatches := validateShape(body, spec, "/api/nodes/{pubkey}")
|
||||
if len(mismatches) > 0 {
|
||||
t.Errorf("Go node detail has %d shape mismatches vs Node.js golden:\n %s",
|
||||
len(mismatches), strings.Join(mismatches, "\n "))
|
||||
}
|
||||
}
|
||||
|
||||
// TestParityArraysNotNull verifies that array-typed fields in Go responses are
|
||||
// [] (empty array) rather than null. This is a common Go/JSON pitfall where
|
||||
// nil slices marshal as null instead of [].
|
||||
// Uses shapes.json to know which fields SHOULD be arrays.
|
||||
func TestParityArraysNotNull(t *testing.T) {
|
||||
shapes := loadShapes(t)
|
||||
_, router := setupTestServer(t)
|
||||
|
||||
endpoints := []struct {
|
||||
name string
|
||||
path string
|
||||
}{
|
||||
{"stats", "/api/stats"},
|
||||
{"nodes", "/api/nodes?limit=5"},
|
||||
{"packets", "/api/packets?limit=5"},
|
||||
{"packets_grouped", "/api/packets?limit=5&groupByHash=true"},
|
||||
{"observers", "/api/observers"},
|
||||
{"channels", "/api/channels"},
|
||||
{"bulk_health", "/api/nodes/bulk-health"},
|
||||
{"analytics_rf", "/api/analytics/rf?days=7"},
|
||||
{"analytics_topology", "/api/analytics/topology?days=7"},
|
||||
{"analytics_hash_sizes", "/api/analytics/hash-sizes?days=7"},
|
||||
{"analytics_distance", "/api/analytics/distance?days=7"},
|
||||
{"analytics_subpaths", "/api/analytics/subpaths?days=7"},
|
||||
}
|
||||
|
||||
for _, ep := range endpoints {
|
||||
t.Run("NullArrayCheck_"+ep.name, func(t *testing.T) {
|
||||
spec, ok := shapes[ep.name]
|
||||
if !ok {
|
||||
t.Skipf("no shape spec for %s", ep.name)
|
||||
}
|
||||
|
||||
req := httptest.NewRequest("GET", ep.path, nil)
|
||||
w := httptest.NewRecorder()
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != 200 {
|
||||
t.Skipf("GET %s returned %d, skipping null-array check", ep.path, w.Code)
|
||||
}
|
||||
|
||||
var body interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &body)
|
||||
|
||||
nullArrays := findNullArrays(body, spec, ep.path)
|
||||
if len(nullArrays) > 0 {
|
||||
t.Errorf("Go %s has null where [] expected:\n %s\n"+
|
||||
"Go nil slices marshal as null — initialize with make() or literal",
|
||||
ep.path, strings.Join(nullArrays, "\n "))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// findNullArrays walks JSON data alongside a shape spec and returns paths
|
||||
// where the spec says the field should be an array but Go returned null.
|
||||
func findNullArrays(actual interface{}, spec shapeSpec, path string) []string {
|
||||
var nulls []string
|
||||
|
||||
switch spec.Type {
|
||||
case "array":
|
||||
if actual == nil {
|
||||
nulls = append(nulls, fmt.Sprintf("%s: null (should be [])", path))
|
||||
} else if arr, ok := actual.([]interface{}); ok && spec.ElementShape != nil {
|
||||
for i, elem := range arr {
|
||||
nulls = append(nulls, findNullArrays(elem, *spec.ElementShape, fmt.Sprintf("%s[%d]", path, i))...)
|
||||
}
|
||||
}
|
||||
case "object":
|
||||
obj, ok := actual.(map[string]interface{})
|
||||
if !ok || obj == nil {
|
||||
return nulls
|
||||
}
|
||||
if spec.Keys != nil {
|
||||
for key, keySpec := range spec.Keys {
|
||||
if val, exists := obj[key]; exists {
|
||||
nulls = append(nulls, findNullArrays(val, keySpec, path+"."+key)...)
|
||||
} else if keySpec.Type == "array" {
|
||||
// Key missing entirely — also a null-array problem
|
||||
nulls = append(nulls, fmt.Sprintf("%s.%s: missing (should be [])", path, key))
|
||||
}
|
||||
}
|
||||
}
|
||||
if spec.DynamicKeys && spec.ValueShape != nil {
|
||||
for k, v := range obj {
|
||||
nulls = append(nulls, findNullArrays(v, *spec.ValueShape, path+"."+k)...)
|
||||
break // sample one
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nulls
|
||||
}
|
||||
|
||||
// TestParityHealthEngine verifies Go health endpoint declares engine=go
|
||||
// while Node declares engine=node (or omits it). The Go server must always
|
||||
// identify itself.
|
||||
func TestParityHealthEngine(t *testing.T) {
|
||||
_, router := setupTestServer(t)
|
||||
|
||||
req := httptest.NewRequest("GET", "/api/health", nil)
|
||||
w := httptest.NewRecorder()
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
var body map[string]interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &body)
|
||||
|
||||
engine, ok := body["engine"]
|
||||
if !ok {
|
||||
t.Error("health response missing 'engine' field (Go server must include engine=go)")
|
||||
} else if engine != "go" {
|
||||
t.Errorf("health engine=%v, expected 'go'", engine)
|
||||
}
|
||||
}
|
||||
|
||||
// TestValidateShapeFunction directly tests the shape validator itself.
|
||||
func TestValidateShapeFunction(t *testing.T) {
|
||||
t.Run("string match", func(t *testing.T) {
|
||||
errs := validateShape("hello", shapeSpec{Type: "string"}, "$.x")
|
||||
if len(errs) != 0 {
|
||||
t.Errorf("unexpected errors: %v", errs)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("string mismatch", func(t *testing.T) {
|
||||
errs := validateShape(42.0, shapeSpec{Type: "string"}, "$.x")
|
||||
if len(errs) != 1 {
|
||||
t.Errorf("expected 1 error, got %d: %v", len(errs), errs)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("null array rejected", func(t *testing.T) {
|
||||
errs := validateShape(nil, shapeSpec{Type: "array"}, "$.arr")
|
||||
if len(errs) != 1 || !strings.Contains(errs[0], "null") {
|
||||
t.Errorf("expected null-array error, got: %v", errs)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("empty array OK", func(t *testing.T) {
|
||||
errs := validateShape([]interface{}{}, shapeSpec{Type: "array"}, "$.arr")
|
||||
if len(errs) != 0 {
|
||||
t.Errorf("unexpected errors for empty array: %v", errs)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("missing object key", func(t *testing.T) {
|
||||
spec := shapeSpec{Type: "object", Keys: map[string]shapeSpec{
|
||||
"name": {Type: "string"},
|
||||
"age": {Type: "number"},
|
||||
}}
|
||||
obj := map[string]interface{}{"name": "test"}
|
||||
errs := validateShape(obj, spec, "$.user")
|
||||
if len(errs) != 1 || !strings.Contains(errs[0], "age") {
|
||||
t.Errorf("expected missing age error, got: %v", errs)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("nullable allows null", func(t *testing.T) {
|
||||
errs := validateShape(nil, shapeSpec{Type: "nullable"}, "$.x")
|
||||
if len(errs) != 0 {
|
||||
t.Errorf("nullable should accept null: %v", errs)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("dynamic keys validates value shape", func(t *testing.T) {
|
||||
spec := shapeSpec{
|
||||
Type: "object",
|
||||
DynamicKeys: true,
|
||||
ValueShape: &shapeSpec{Type: "number"},
|
||||
}
|
||||
obj := map[string]interface{}{"a": 1.0, "b": 2.0}
|
||||
errs := validateShape(obj, spec, "$.dyn")
|
||||
if len(errs) != 0 {
|
||||
t.Errorf("unexpected errors: %v", errs)
|
||||
}
|
||||
})
|
||||
}
|
||||
package main
|
||||
|
||||
// parity_test.go — Golden fixture shape tests.
|
||||
// Validates that Go API responses match the shape of Node.js API responses.
|
||||
// Shapes were captured from the production Node.js server and stored in
|
||||
// testdata/golden/shapes.json.
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// shapeSpec describes the expected JSON structure from the Node.js server.
|
||||
type shapeSpec struct {
|
||||
Type string `json:"type"`
|
||||
Keys map[string]shapeSpec `json:"keys,omitempty"`
|
||||
ElementShape *shapeSpec `json:"elementShape,omitempty"`
|
||||
DynamicKeys bool `json:"dynamicKeys,omitempty"`
|
||||
ValueShape *shapeSpec `json:"valueShape,omitempty"`
|
||||
RequiredKeys map[string]shapeSpec `json:"requiredKeys,omitempty"`
|
||||
}
|
||||
|
||||
// loadShapes reads testdata/golden/shapes.json relative to this source file.
|
||||
func loadShapes(t *testing.T) map[string]shapeSpec {
|
||||
t.Helper()
|
||||
_, thisFile, _, _ := runtime.Caller(0)
|
||||
dir := filepath.Dir(thisFile)
|
||||
data, err := os.ReadFile(filepath.Join(dir, "testdata", "golden", "shapes.json"))
|
||||
if err != nil {
|
||||
t.Fatalf("cannot load shapes.json: %v", err)
|
||||
}
|
||||
var shapes map[string]shapeSpec
|
||||
if err := json.Unmarshal(data, &shapes); err != nil {
|
||||
t.Fatalf("cannot parse shapes.json: %v", err)
|
||||
}
|
||||
return shapes
|
||||
}
|
||||
|
||||
// validateShape recursively checks that `actual` matches the expected `spec`.
|
||||
// `path` tracks the JSON path for error messages.
|
||||
// Returns a list of mismatch descriptions.
|
||||
func validateShape(actual interface{}, spec shapeSpec, path string) []string {
|
||||
var errs []string
|
||||
|
||||
switch spec.Type {
|
||||
case "null", "nullable":
|
||||
// nullable means: value can be null OR matching type. Accept anything.
|
||||
return nil
|
||||
case "nullable_number":
|
||||
// Can be null or number
|
||||
if actual != nil {
|
||||
if _, ok := actual.(float64); !ok {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected number or null, got %T", path, actual))
|
||||
}
|
||||
}
|
||||
return errs
|
||||
case "string":
|
||||
if actual == nil {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected string, got null", path))
|
||||
} else if _, ok := actual.(string); !ok {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected string, got %T", path, actual))
|
||||
}
|
||||
case "number":
|
||||
if actual == nil {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected number, got null", path))
|
||||
} else if _, ok := actual.(float64); !ok {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected number, got %T (%v)", path, actual, actual))
|
||||
}
|
||||
case "boolean":
|
||||
if actual == nil {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected boolean, got null", path))
|
||||
} else if _, ok := actual.(bool); !ok {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected boolean, got %T", path, actual))
|
||||
}
|
||||
case "array":
|
||||
if actual == nil {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected array, got null (arrays must be [] not null)", path))
|
||||
return errs
|
||||
}
|
||||
arr, ok := actual.([]interface{})
|
||||
if !ok {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected array, got %T", path, actual))
|
||||
return errs
|
||||
}
|
||||
if spec.ElementShape != nil && len(arr) > 0 {
|
||||
errs = append(errs, validateShape(arr[0], *spec.ElementShape, path+"[0]")...)
|
||||
}
|
||||
case "object":
|
||||
if actual == nil {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected object, got null", path))
|
||||
return errs
|
||||
}
|
||||
obj, ok := actual.(map[string]interface{})
|
||||
if !ok {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected object, got %T", path, actual))
|
||||
return errs
|
||||
}
|
||||
|
||||
if spec.DynamicKeys {
|
||||
// Object with dynamic keys — validate value shapes
|
||||
if spec.ValueShape != nil && len(obj) > 0 {
|
||||
for k, v := range obj {
|
||||
errs = append(errs, validateShape(v, *spec.ValueShape, path+"."+k)...)
|
||||
break // check just one sample
|
||||
}
|
||||
}
|
||||
if spec.RequiredKeys != nil {
|
||||
for rk, rs := range spec.RequiredKeys {
|
||||
v, exists := obj[rk]
|
||||
if !exists {
|
||||
errs = append(errs, fmt.Sprintf("%s: missing required key %q in dynamic-key object", path, rk))
|
||||
} else {
|
||||
errs = append(errs, validateShape(v, rs, path+"."+rk)...)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if spec.Keys != nil {
|
||||
// Object with known keys — check each expected key exists and has correct type
|
||||
for key, keySpec := range spec.Keys {
|
||||
val, exists := obj[key]
|
||||
if !exists {
|
||||
errs = append(errs, fmt.Sprintf("%s: missing field %q (expected %s)", path, key, keySpec.Type))
|
||||
} else {
|
||||
errs = append(errs, validateShape(val, keySpec, path+"."+key)...)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return errs
|
||||
}
|
||||
|
||||
// parityEndpoint defines one endpoint to test for parity.
|
||||
type parityEndpoint struct {
|
||||
name string // key in shapes.json
|
||||
path string // HTTP path to request
|
||||
}
|
||||
|
||||
func TestParityShapes(t *testing.T) {
|
||||
shapes := loadShapes(t)
|
||||
_, router := setupTestServer(t)
|
||||
|
||||
endpoints := []parityEndpoint{
|
||||
{"stats", "/api/stats"},
|
||||
{"nodes", "/api/nodes?limit=5"},
|
||||
{"packets", "/api/packets?limit=5"},
|
||||
{"packets_grouped", "/api/packets?limit=5&groupByHash=true"},
|
||||
{"observers", "/api/observers"},
|
||||
{"channels", "/api/channels"},
|
||||
{"channel_messages", "/api/channels/0000000000000000/messages?limit=5"},
|
||||
{"analytics_rf", "/api/analytics/rf?days=7"},
|
||||
{"analytics_topology", "/api/analytics/topology?days=7"},
|
||||
{"analytics_hash_sizes", "/api/analytics/hash-sizes?days=7"},
|
||||
{"analytics_distance", "/api/analytics/distance?days=7"},
|
||||
{"analytics_subpaths", "/api/analytics/subpaths?days=7"},
|
||||
{"bulk_health", "/api/nodes/bulk-health"},
|
||||
{"health", "/api/health"},
|
||||
{"perf", "/api/perf"},
|
||||
}
|
||||
|
||||
for _, ep := range endpoints {
|
||||
t.Run("Parity_"+ep.name, func(t *testing.T) {
|
||||
spec, ok := shapes[ep.name]
|
||||
if !ok {
|
||||
t.Fatalf("no shape spec found for %q in shapes.json", ep.name)
|
||||
}
|
||||
|
||||
req := httptest.NewRequest("GET", ep.path, nil)
|
||||
w := httptest.NewRecorder()
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != 200 {
|
||||
t.Fatalf("GET %s returned %d, expected 200. Body: %s",
|
||||
ep.path, w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var body interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &body); err != nil {
|
||||
t.Fatalf("GET %s returned invalid JSON: %v\nBody: %s",
|
||||
ep.path, err, w.Body.String())
|
||||
}
|
||||
|
||||
mismatches := validateShape(body, spec, ep.path)
|
||||
if len(mismatches) > 0 {
|
||||
t.Errorf("Go %s has %d shape mismatches vs Node.js golden:\n %s",
|
||||
ep.path, len(mismatches), strings.Join(mismatches, "\n "))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestParityNodeDetail tests node detail endpoint shape.
|
||||
// Uses a known test node public key from seeded data.
|
||||
func TestParityNodeDetail(t *testing.T) {
|
||||
shapes := loadShapes(t)
|
||||
_, router := setupTestServer(t)
|
||||
|
||||
spec, ok := shapes["node_detail"]
|
||||
if !ok {
|
||||
t.Fatal("no shape spec for node_detail in shapes.json")
|
||||
}
|
||||
|
||||
req := httptest.NewRequest("GET", "/api/nodes/aabbccdd11223344", nil)
|
||||
w := httptest.NewRecorder()
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != 200 {
|
||||
t.Fatalf("node detail returned %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var body interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &body)
|
||||
|
||||
mismatches := validateShape(body, spec, "/api/nodes/{pubkey}")
|
||||
if len(mismatches) > 0 {
|
||||
t.Errorf("Go node detail has %d shape mismatches vs Node.js golden:\n %s",
|
||||
len(mismatches), strings.Join(mismatches, "\n "))
|
||||
}
|
||||
}
|
||||
|
||||
// TestParityArraysNotNull verifies that array-typed fields in Go responses are
|
||||
// [] (empty array) rather than null. This is a common Go/JSON pitfall where
|
||||
// nil slices marshal as null instead of [].
|
||||
// Uses shapes.json to know which fields SHOULD be arrays.
|
||||
func TestParityArraysNotNull(t *testing.T) {
|
||||
shapes := loadShapes(t)
|
||||
_, router := setupTestServer(t)
|
||||
|
||||
endpoints := []struct {
|
||||
name string
|
||||
path string
|
||||
}{
|
||||
{"stats", "/api/stats"},
|
||||
{"nodes", "/api/nodes?limit=5"},
|
||||
{"packets", "/api/packets?limit=5"},
|
||||
{"packets_grouped", "/api/packets?limit=5&groupByHash=true"},
|
||||
{"observers", "/api/observers"},
|
||||
{"channels", "/api/channels"},
|
||||
{"bulk_health", "/api/nodes/bulk-health"},
|
||||
{"analytics_rf", "/api/analytics/rf?days=7"},
|
||||
{"analytics_topology", "/api/analytics/topology?days=7"},
|
||||
{"analytics_hash_sizes", "/api/analytics/hash-sizes?days=7"},
|
||||
{"analytics_distance", "/api/analytics/distance?days=7"},
|
||||
{"analytics_subpaths", "/api/analytics/subpaths?days=7"},
|
||||
}
|
||||
|
||||
for _, ep := range endpoints {
|
||||
t.Run("NullArrayCheck_"+ep.name, func(t *testing.T) {
|
||||
spec, ok := shapes[ep.name]
|
||||
if !ok {
|
||||
t.Skipf("no shape spec for %s", ep.name)
|
||||
}
|
||||
|
||||
req := httptest.NewRequest("GET", ep.path, nil)
|
||||
w := httptest.NewRecorder()
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != 200 {
|
||||
t.Skipf("GET %s returned %d, skipping null-array check", ep.path, w.Code)
|
||||
}
|
||||
|
||||
var body interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &body)
|
||||
|
||||
nullArrays := findNullArrays(body, spec, ep.path)
|
||||
if len(nullArrays) > 0 {
|
||||
t.Errorf("Go %s has null where [] expected:\n %s\n"+
|
||||
"Go nil slices marshal as null — initialize with make() or literal",
|
||||
ep.path, strings.Join(nullArrays, "\n "))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// findNullArrays walks JSON data alongside a shape spec and returns paths
|
||||
// where the spec says the field should be an array but Go returned null.
|
||||
func findNullArrays(actual interface{}, spec shapeSpec, path string) []string {
|
||||
var nulls []string
|
||||
|
||||
switch spec.Type {
|
||||
case "array":
|
||||
if actual == nil {
|
||||
nulls = append(nulls, fmt.Sprintf("%s: null (should be [])", path))
|
||||
} else if arr, ok := actual.([]interface{}); ok && spec.ElementShape != nil {
|
||||
for i, elem := range arr {
|
||||
nulls = append(nulls, findNullArrays(elem, *spec.ElementShape, fmt.Sprintf("%s[%d]", path, i))...)
|
||||
}
|
||||
}
|
||||
case "object":
|
||||
obj, ok := actual.(map[string]interface{})
|
||||
if !ok || obj == nil {
|
||||
return nulls
|
||||
}
|
||||
if spec.Keys != nil {
|
||||
for key, keySpec := range spec.Keys {
|
||||
if val, exists := obj[key]; exists {
|
||||
nulls = append(nulls, findNullArrays(val, keySpec, path+"."+key)...)
|
||||
} else if keySpec.Type == "array" {
|
||||
// Key missing entirely — also a null-array problem
|
||||
nulls = append(nulls, fmt.Sprintf("%s.%s: missing (should be [])", path, key))
|
||||
}
|
||||
}
|
||||
}
|
||||
if spec.DynamicKeys && spec.ValueShape != nil {
|
||||
for k, v := range obj {
|
||||
nulls = append(nulls, findNullArrays(v, *spec.ValueShape, path+"."+k)...)
|
||||
break // sample one
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nulls
|
||||
}
|
||||
|
||||
// TestParityHealthEngine verifies Go health endpoint declares engine=go
|
||||
// while Node declares engine=node (or omits it). The Go server must always
|
||||
// identify itself.
|
||||
func TestParityHealthEngine(t *testing.T) {
|
||||
_, router := setupTestServer(t)
|
||||
|
||||
req := httptest.NewRequest("GET", "/api/health", nil)
|
||||
w := httptest.NewRecorder()
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
var body map[string]interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &body)
|
||||
|
||||
engine, ok := body["engine"]
|
||||
if !ok {
|
||||
t.Error("health response missing 'engine' field (Go server must include engine=go)")
|
||||
} else if engine != "go" {
|
||||
t.Errorf("health engine=%v, expected 'go'", engine)
|
||||
}
|
||||
}
|
||||
|
||||
// TestValidateShapeFunction directly tests the shape validator itself.
|
||||
func TestValidateShapeFunction(t *testing.T) {
|
||||
t.Run("string match", func(t *testing.T) {
|
||||
errs := validateShape("hello", shapeSpec{Type: "string"}, "$.x")
|
||||
if len(errs) != 0 {
|
||||
t.Errorf("unexpected errors: %v", errs)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("string mismatch", func(t *testing.T) {
|
||||
errs := validateShape(42.0, shapeSpec{Type: "string"}, "$.x")
|
||||
if len(errs) != 1 {
|
||||
t.Errorf("expected 1 error, got %d: %v", len(errs), errs)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("null array rejected", func(t *testing.T) {
|
||||
errs := validateShape(nil, shapeSpec{Type: "array"}, "$.arr")
|
||||
if len(errs) != 1 || !strings.Contains(errs[0], "null") {
|
||||
t.Errorf("expected null-array error, got: %v", errs)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("empty array OK", func(t *testing.T) {
|
||||
errs := validateShape([]interface{}{}, shapeSpec{Type: "array"}, "$.arr")
|
||||
if len(errs) != 0 {
|
||||
t.Errorf("unexpected errors for empty array: %v", errs)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("missing object key", func(t *testing.T) {
|
||||
spec := shapeSpec{Type: "object", Keys: map[string]shapeSpec{
|
||||
"name": {Type: "string"},
|
||||
"age": {Type: "number"},
|
||||
}}
|
||||
obj := map[string]interface{}{"name": "test"}
|
||||
errs := validateShape(obj, spec, "$.user")
|
||||
if len(errs) != 1 || !strings.Contains(errs[0], "age") {
|
||||
t.Errorf("expected missing age error, got: %v", errs)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("nullable allows null", func(t *testing.T) {
|
||||
errs := validateShape(nil, shapeSpec{Type: "nullable"}, "$.x")
|
||||
if len(errs) != 0 {
|
||||
t.Errorf("nullable should accept null: %v", errs)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("dynamic keys validates value shape", func(t *testing.T) {
|
||||
spec := shapeSpec{
|
||||
Type: "object",
|
||||
DynamicKeys: true,
|
||||
ValueShape: &shapeSpec{Type: "number"},
|
||||
}
|
||||
obj := map[string]interface{}{"a": 1.0, "b": 2.0}
|
||||
errs := validateShape(obj, spec, "$.dyn")
|
||||
if len(errs) != 0 {
|
||||
t.Errorf("unexpected errors: %v", errs)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestParityWSMultiObserverGolden(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
defer db.Close()
|
||||
seedTestData(t, db)
|
||||
hub := NewHub()
|
||||
store := NewPacketStore(db)
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store load failed: %v", err)
|
||||
}
|
||||
|
||||
poller := NewPoller(db, hub, 50*time.Millisecond)
|
||||
poller.store = store
|
||||
|
||||
client := &Client{send: make(chan []byte, 256)}
|
||||
hub.Register(client)
|
||||
defer hub.Unregister(client)
|
||||
|
||||
go poller.Start()
|
||||
defer poller.Stop()
|
||||
|
||||
// Wait for poller to initialize its lastID/lastObsID cursors before
|
||||
// inserting new data; otherwise the poller may snapshot a lastID that
|
||||
// already includes the test data and never broadcast it.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
now := time.Now().UTC().Format(time.RFC3339)
|
||||
if _, err := db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
|
||||
VALUES ('BEEF', 'goldenstarburst237', ?, 1, 4, '{"pubKey":"aabbccdd11223344","type":"ADVERT"}')`, now); err != nil {
|
||||
t.Fatalf("insert tx failed: %v", err)
|
||||
}
|
||||
var txID int
|
||||
if err := db.conn.QueryRow(`SELECT id FROM transmissions WHERE hash='goldenstarburst237'`).Scan(&txID); err != nil {
|
||||
t.Fatalf("query tx id failed: %v", err)
|
||||
}
|
||||
ts := time.Now().Unix()
|
||||
if _, err := db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
|
||||
VALUES (?, 1, 11.0, -88, '["p1"]', ?),
|
||||
(?, 2, 9.0, -92, '["p1","p2"]', ?),
|
||||
(?, 1, 7.0, -96, '["p1","p2","p3"]', ?)`,
|
||||
txID, ts, txID, ts+1, txID, ts+2); err != nil {
|
||||
t.Fatalf("insert obs failed: %v", err)
|
||||
}
|
||||
|
||||
type golden struct {
|
||||
Hash string
|
||||
Count int
|
||||
Paths []string
|
||||
ObserverIDs []string
|
||||
}
|
||||
expected := golden{
|
||||
Hash: "goldenstarburst237",
|
||||
Count: 3,
|
||||
Paths: []string{`["p1"]`, `["p1","p2"]`, `["p1","p2","p3"]`},
|
||||
ObserverIDs: []string{"obs1", "obs2"},
|
||||
}
|
||||
|
||||
gotPaths := make([]string, 0, expected.Count)
|
||||
gotObservers := make(map[string]bool)
|
||||
deadline := time.After(2 * time.Second)
|
||||
for len(gotPaths) < expected.Count {
|
||||
select {
|
||||
case raw := <-client.send:
|
||||
var msg map[string]interface{}
|
||||
if err := json.Unmarshal(raw, &msg); err != nil {
|
||||
t.Fatalf("unmarshal ws message failed: %v", err)
|
||||
}
|
||||
if msg["type"] != "packet" {
|
||||
continue
|
||||
}
|
||||
data, _ := msg["data"].(map[string]interface{})
|
||||
if data == nil || data["hash"] != expected.Hash {
|
||||
continue
|
||||
}
|
||||
if path, ok := data["path_json"].(string); ok {
|
||||
gotPaths = append(gotPaths, path)
|
||||
}
|
||||
if oid, ok := data["observer_id"].(string); ok && oid != "" {
|
||||
gotObservers[oid] = true
|
||||
}
|
||||
case <-deadline:
|
||||
t.Fatalf("timed out waiting for %d ws messages, got %d", expected.Count, len(gotPaths))
|
||||
}
|
||||
}
|
||||
|
||||
sort.Strings(gotPaths)
|
||||
sort.Strings(expected.Paths)
|
||||
if len(gotPaths) != len(expected.Paths) {
|
||||
t.Fatalf("path count mismatch: got %d want %d", len(gotPaths), len(expected.Paths))
|
||||
}
|
||||
for i := range expected.Paths {
|
||||
if gotPaths[i] != expected.Paths[i] {
|
||||
t.Fatalf("path mismatch at %d: got %q want %q", i, gotPaths[i], expected.Paths[i])
|
||||
}
|
||||
}
|
||||
for _, oid := range expected.ObserverIDs {
|
||||
if !gotObservers[oid] {
|
||||
t.Fatalf("missing expected observer %q in ws messages", oid)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1039,7 +1039,7 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
|
||||
}
|
||||
}
|
||||
|
||||
// Build broadcast maps (same shape as Node.js WS broadcast)
|
||||
// Build broadcast maps (same shape as Node.js WS broadcast), one per observation.
|
||||
result := make([]map[string]interface{}, 0, len(broadcastOrder))
|
||||
for _, txID := range broadcastOrder {
|
||||
tx := broadcastTxs[txID]
|
||||
@@ -1055,32 +1055,34 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
|
||||
decoded["payload"] = payload
|
||||
}
|
||||
}
|
||||
// Build the nested packet object (packets.js checks m.data.packet)
|
||||
pkt := map[string]interface{}{
|
||||
"id": tx.ID,
|
||||
"raw_hex": strOrNil(tx.RawHex),
|
||||
"hash": strOrNil(tx.Hash),
|
||||
"first_seen": strOrNil(tx.FirstSeen),
|
||||
"timestamp": strOrNil(tx.FirstSeen),
|
||||
"route_type": intPtrOrNil(tx.RouteType),
|
||||
"payload_type": intPtrOrNil(tx.PayloadType),
|
||||
"decoded_json": strOrNil(tx.DecodedJSON),
|
||||
"observer_id": strOrNil(tx.ObserverID),
|
||||
"observer_name": strOrNil(tx.ObserverName),
|
||||
"snr": floatPtrOrNil(tx.SNR),
|
||||
"rssi": floatPtrOrNil(tx.RSSI),
|
||||
"path_json": strOrNil(tx.PathJSON),
|
||||
"direction": strOrNil(tx.Direction),
|
||||
"observation_count": tx.ObservationCount,
|
||||
for _, obs := range tx.Observations {
|
||||
// Build the nested packet object (packets.js checks m.data.packet)
|
||||
pkt := map[string]interface{}{
|
||||
"id": tx.ID,
|
||||
"raw_hex": strOrNil(tx.RawHex),
|
||||
"hash": strOrNil(tx.Hash),
|
||||
"first_seen": strOrNil(tx.FirstSeen),
|
||||
"timestamp": strOrNil(tx.FirstSeen),
|
||||
"route_type": intPtrOrNil(tx.RouteType),
|
||||
"payload_type": intPtrOrNil(tx.PayloadType),
|
||||
"decoded_json": strOrNil(tx.DecodedJSON),
|
||||
"observer_id": strOrNil(obs.ObserverID),
|
||||
"observer_name": strOrNil(obs.ObserverName),
|
||||
"snr": floatPtrOrNil(obs.SNR),
|
||||
"rssi": floatPtrOrNil(obs.RSSI),
|
||||
"path_json": strOrNil(obs.PathJSON),
|
||||
"direction": strOrNil(obs.Direction),
|
||||
"observation_count": tx.ObservationCount,
|
||||
}
|
||||
// Broadcast map: top-level fields for live.js + nested packet for packets.js
|
||||
broadcastMap := make(map[string]interface{}, len(pkt)+2)
|
||||
for k, v := range pkt {
|
||||
broadcastMap[k] = v
|
||||
}
|
||||
broadcastMap["decoded"] = decoded
|
||||
broadcastMap["packet"] = pkt
|
||||
result = append(result, broadcastMap)
|
||||
}
|
||||
// Broadcast map: top-level fields for live.js + nested packet for packets.js
|
||||
broadcastMap := make(map[string]interface{}, len(pkt)+2)
|
||||
for k, v := range pkt {
|
||||
broadcastMap[k] = v
|
||||
}
|
||||
broadcastMap["decoded"] = decoded
|
||||
broadcastMap["packet"] = pkt
|
||||
result = append(result, broadcastMap)
|
||||
}
|
||||
|
||||
// Invalidate analytics caches since new data was ingested
|
||||
@@ -1101,7 +1103,7 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
|
||||
// IngestNewObservations loads new observations for transmissions already in the
|
||||
// store. This catches observations that arrive after IngestNewFromDB has already
|
||||
// advanced past the transmission's ID (fixes #174).
|
||||
func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) int {
|
||||
func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]interface{} {
|
||||
if limit <= 0 {
|
||||
limit = 500
|
||||
}
|
||||
@@ -1127,7 +1129,7 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) int {
|
||||
rows, err := s.db.conn.Query(querySQL, sinceObsID, limit)
|
||||
if err != nil {
|
||||
log.Printf("[store] ingest observations query error: %v", err)
|
||||
return sinceObsID
|
||||
return nil
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
@@ -1170,20 +1172,16 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) int {
|
||||
}
|
||||
|
||||
if len(obsRows) == 0 {
|
||||
return sinceObsID
|
||||
return nil
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
newMaxObsID := sinceObsID
|
||||
updatedTxs := make(map[int]*StoreTx)
|
||||
broadcastMaps := make([]map[string]interface{}, 0, len(obsRows))
|
||||
|
||||
for _, r := range obsRows {
|
||||
if r.obsID > newMaxObsID {
|
||||
newMaxObsID = r.obsID
|
||||
}
|
||||
|
||||
// Already ingested (e.g. by IngestNewFromDB in same cycle)
|
||||
if _, exists := s.byObsID[r.obsID]; exists {
|
||||
continue
|
||||
@@ -1226,6 +1224,43 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) int {
|
||||
}
|
||||
s.totalObs++
|
||||
updatedTxs[r.txID] = tx
|
||||
|
||||
decoded := map[string]interface{}{
|
||||
"header": map[string]interface{}{
|
||||
"payloadTypeName": resolvePayloadTypeName(tx.PayloadType),
|
||||
},
|
||||
}
|
||||
if tx.DecodedJSON != "" {
|
||||
var payload map[string]interface{}
|
||||
if json.Unmarshal([]byte(tx.DecodedJSON), &payload) == nil {
|
||||
decoded["payload"] = payload
|
||||
}
|
||||
}
|
||||
|
||||
pkt := map[string]interface{}{
|
||||
"id": tx.ID,
|
||||
"raw_hex": strOrNil(tx.RawHex),
|
||||
"hash": strOrNil(tx.Hash),
|
||||
"first_seen": strOrNil(tx.FirstSeen),
|
||||
"timestamp": strOrNil(tx.FirstSeen),
|
||||
"route_type": intPtrOrNil(tx.RouteType),
|
||||
"payload_type": intPtrOrNil(tx.PayloadType),
|
||||
"decoded_json": strOrNil(tx.DecodedJSON),
|
||||
"observer_id": strOrNil(obs.ObserverID),
|
||||
"observer_name": strOrNil(obs.ObserverName),
|
||||
"snr": floatPtrOrNil(obs.SNR),
|
||||
"rssi": floatPtrOrNil(obs.RSSI),
|
||||
"path_json": strOrNil(obs.PathJSON),
|
||||
"direction": strOrNil(obs.Direction),
|
||||
"observation_count": tx.ObservationCount,
|
||||
}
|
||||
broadcastMap := make(map[string]interface{}, len(pkt)+2)
|
||||
for k, v := range pkt {
|
||||
broadcastMap[k] = v
|
||||
}
|
||||
broadcastMap["decoded"] = decoded
|
||||
broadcastMap["packet"] = pkt
|
||||
broadcastMaps = append(broadcastMaps, broadcastMap)
|
||||
}
|
||||
|
||||
// Re-pick best observation for updated transmissions and update subpath index
|
||||
@@ -1280,7 +1315,7 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) int {
|
||||
// analytics caches cleared; no per-cycle log to avoid stdout overhead
|
||||
}
|
||||
|
||||
return newMaxObsID
|
||||
return broadcastMaps
|
||||
}
|
||||
|
||||
// MaxTransmissionID returns the highest transmission ID in the store.
|
||||
|
||||
@@ -1,229 +1,245 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
var upgrader = websocket.Upgrader{
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 4096,
|
||||
CheckOrigin: func(r *http.Request) bool { return true },
|
||||
}
|
||||
|
||||
// Hub manages WebSocket clients and broadcasts.
|
||||
type Hub struct {
|
||||
mu sync.RWMutex
|
||||
clients map[*Client]bool
|
||||
}
|
||||
|
||||
// Client is a single WebSocket connection.
|
||||
type Client struct {
|
||||
conn *websocket.Conn
|
||||
send chan []byte
|
||||
}
|
||||
|
||||
func NewHub() *Hub {
|
||||
return &Hub{
|
||||
clients: make(map[*Client]bool),
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Hub) ClientCount() int {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
return len(h.clients)
|
||||
}
|
||||
|
||||
func (h *Hub) Register(c *Client) {
|
||||
h.mu.Lock()
|
||||
h.clients[c] = true
|
||||
h.mu.Unlock()
|
||||
log.Printf("[ws] client connected (%d total)", h.ClientCount())
|
||||
}
|
||||
|
||||
func (h *Hub) Unregister(c *Client) {
|
||||
h.mu.Lock()
|
||||
if _, ok := h.clients[c]; ok {
|
||||
delete(h.clients, c)
|
||||
close(c.send)
|
||||
}
|
||||
h.mu.Unlock()
|
||||
log.Printf("[ws] client disconnected (%d total)", h.ClientCount())
|
||||
}
|
||||
|
||||
// Broadcast sends a message to all connected clients.
|
||||
func (h *Hub) Broadcast(msg interface{}) {
|
||||
data, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
log.Printf("[ws] marshal error: %v", err)
|
||||
return
|
||||
}
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
for c := range h.clients {
|
||||
select {
|
||||
case c.send <- data:
|
||||
default:
|
||||
// Client buffer full — drop
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ServeWS handles the WebSocket upgrade and runs the client.
|
||||
func (h *Hub) ServeWS(w http.ResponseWriter, r *http.Request) {
|
||||
conn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
log.Printf("[ws] upgrade error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
client := &Client{
|
||||
conn: conn,
|
||||
send: make(chan []byte, 256),
|
||||
}
|
||||
h.Register(client)
|
||||
|
||||
go client.writePump()
|
||||
go client.readPump(h)
|
||||
}
|
||||
|
||||
// wsOrStatic upgrades WebSocket requests at any path, serves static files otherwise.
|
||||
func wsOrStatic(hub *Hub, static http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if strings.EqualFold(r.Header.Get("Upgrade"), "websocket") {
|
||||
hub.ServeWS(w, r)
|
||||
return
|
||||
}
|
||||
static.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *Client) readPump(hub *Hub) {
|
||||
defer func() {
|
||||
hub.Unregister(c)
|
||||
c.conn.Close()
|
||||
}()
|
||||
c.conn.SetReadLimit(512)
|
||||
c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
c.conn.SetPongHandler(func(string) error {
|
||||
c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
return nil
|
||||
})
|
||||
for {
|
||||
_, _, err := c.conn.ReadMessage()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) writePump() {
|
||||
ticker := time.NewTicker(30 * time.Second)
|
||||
defer func() {
|
||||
ticker.Stop()
|
||||
c.conn.Close()
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case message, ok := <-c.send:
|
||||
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||
if !ok {
|
||||
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
|
||||
return
|
||||
}
|
||||
if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil {
|
||||
return
|
||||
}
|
||||
case <-ticker.C:
|
||||
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Poller watches for new transmissions in SQLite and broadcasts them.
|
||||
type Poller struct {
|
||||
db *DB
|
||||
hub *Hub
|
||||
store *PacketStore // optional: if set, new transmissions are ingested into memory
|
||||
interval time.Duration
|
||||
stop chan struct{}
|
||||
}
|
||||
|
||||
func NewPoller(db *DB, hub *Hub, interval time.Duration) *Poller {
|
||||
return &Poller{db: db, hub: hub, interval: interval, stop: make(chan struct{})}
|
||||
}
|
||||
|
||||
func (p *Poller) Start() {
|
||||
lastID := p.db.GetMaxTransmissionID()
|
||||
lastObsID := p.db.GetMaxObservationID()
|
||||
log.Printf("[poller] starting from transmission ID %d, obs ID %d, interval %v", lastID, lastObsID, p.interval)
|
||||
|
||||
ticker := time.NewTicker(p.interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if p.store != nil {
|
||||
// Ingest new transmissions into in-memory store and broadcast
|
||||
newTxs, newMax := p.store.IngestNewFromDB(lastID, 100)
|
||||
if newMax > lastID {
|
||||
lastID = newMax
|
||||
}
|
||||
// Ingest new observations for existing transmissions (fixes #174)
|
||||
newObsMax := p.store.IngestNewObservations(lastObsID, 500)
|
||||
if newObsMax > lastObsID {
|
||||
lastObsID = newObsMax
|
||||
}
|
||||
if len(newTxs) > 0 {
|
||||
log.Printf("[broadcast] sending %d packets to %d clients (lastID now %d)", len(newTxs), p.hub.ClientCount(), lastID)
|
||||
}
|
||||
for _, tx := range newTxs {
|
||||
p.hub.Broadcast(WSMessage{
|
||||
Type: "packet",
|
||||
Data: tx,
|
||||
})
|
||||
}
|
||||
} else {
|
||||
// Fallback: direct DB query (used when store is nil, e.g. tests)
|
||||
newTxs, err := p.db.GetNewTransmissionsSince(lastID, 100)
|
||||
if err != nil {
|
||||
log.Printf("[poller] error: %v", err)
|
||||
continue
|
||||
}
|
||||
for _, tx := range newTxs {
|
||||
id, _ := tx["id"].(int)
|
||||
if id > lastID {
|
||||
lastID = id
|
||||
}
|
||||
// Copy packet fields for the nested packet (avoids circular ref)
|
||||
pkt := make(map[string]interface{}, len(tx))
|
||||
for k, v := range tx {
|
||||
pkt[k] = v
|
||||
}
|
||||
tx["packet"] = pkt
|
||||
p.hub.Broadcast(WSMessage{
|
||||
Type: "packet",
|
||||
Data: tx,
|
||||
})
|
||||
}
|
||||
}
|
||||
case <-p.stop:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Poller) Stop() {
|
||||
close(p.stop)
|
||||
}
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
var upgrader = websocket.Upgrader{
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 4096,
|
||||
CheckOrigin: func(r *http.Request) bool { return true },
|
||||
}
|
||||
|
||||
// Hub manages WebSocket clients and broadcasts.
|
||||
type Hub struct {
|
||||
mu sync.RWMutex
|
||||
clients map[*Client]bool
|
||||
}
|
||||
|
||||
// Client is a single WebSocket connection.
|
||||
type Client struct {
|
||||
conn *websocket.Conn
|
||||
send chan []byte
|
||||
}
|
||||
|
||||
func NewHub() *Hub {
|
||||
return &Hub{
|
||||
clients: make(map[*Client]bool),
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Hub) ClientCount() int {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
return len(h.clients)
|
||||
}
|
||||
|
||||
func (h *Hub) Register(c *Client) {
|
||||
h.mu.Lock()
|
||||
h.clients[c] = true
|
||||
h.mu.Unlock()
|
||||
log.Printf("[ws] client connected (%d total)", h.ClientCount())
|
||||
}
|
||||
|
||||
func (h *Hub) Unregister(c *Client) {
|
||||
h.mu.Lock()
|
||||
if _, ok := h.clients[c]; ok {
|
||||
delete(h.clients, c)
|
||||
close(c.send)
|
||||
}
|
||||
h.mu.Unlock()
|
||||
log.Printf("[ws] client disconnected (%d total)", h.ClientCount())
|
||||
}
|
||||
|
||||
// Broadcast sends a message to all connected clients.
|
||||
func (h *Hub) Broadcast(msg interface{}) {
|
||||
data, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
log.Printf("[ws] marshal error: %v", err)
|
||||
return
|
||||
}
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
for c := range h.clients {
|
||||
select {
|
||||
case c.send <- data:
|
||||
default:
|
||||
// Client buffer full — drop
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ServeWS handles the WebSocket upgrade and runs the client.
|
||||
func (h *Hub) ServeWS(w http.ResponseWriter, r *http.Request) {
|
||||
conn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
log.Printf("[ws] upgrade error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
client := &Client{
|
||||
conn: conn,
|
||||
send: make(chan []byte, 256),
|
||||
}
|
||||
h.Register(client)
|
||||
|
||||
go client.writePump()
|
||||
go client.readPump(h)
|
||||
}
|
||||
|
||||
// wsOrStatic upgrades WebSocket requests at any path, serves static files otherwise.
|
||||
func wsOrStatic(hub *Hub, static http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if strings.EqualFold(r.Header.Get("Upgrade"), "websocket") {
|
||||
hub.ServeWS(w, r)
|
||||
return
|
||||
}
|
||||
static.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *Client) readPump(hub *Hub) {
|
||||
defer func() {
|
||||
hub.Unregister(c)
|
||||
c.conn.Close()
|
||||
}()
|
||||
c.conn.SetReadLimit(512)
|
||||
c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
c.conn.SetPongHandler(func(string) error {
|
||||
c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
return nil
|
||||
})
|
||||
for {
|
||||
_, _, err := c.conn.ReadMessage()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) writePump() {
|
||||
ticker := time.NewTicker(30 * time.Second)
|
||||
defer func() {
|
||||
ticker.Stop()
|
||||
c.conn.Close()
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case message, ok := <-c.send:
|
||||
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||
if !ok {
|
||||
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
|
||||
return
|
||||
}
|
||||
if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil {
|
||||
return
|
||||
}
|
||||
case <-ticker.C:
|
||||
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Poller watches for new transmissions in SQLite and broadcasts them.
|
||||
type Poller struct {
|
||||
db *DB
|
||||
hub *Hub
|
||||
store *PacketStore // optional: if set, new transmissions are ingested into memory
|
||||
interval time.Duration
|
||||
stop chan struct{}
|
||||
}
|
||||
|
||||
func NewPoller(db *DB, hub *Hub, interval time.Duration) *Poller {
|
||||
return &Poller{db: db, hub: hub, interval: interval, stop: make(chan struct{})}
|
||||
}
|
||||
|
||||
func (p *Poller) Start() {
|
||||
lastID := p.db.GetMaxTransmissionID()
|
||||
lastObsID := p.db.GetMaxObservationID()
|
||||
log.Printf("[poller] starting from transmission ID %d, obs ID %d, interval %v", lastID, lastObsID, p.interval)
|
||||
|
||||
ticker := time.NewTicker(p.interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if p.store != nil {
|
||||
// Ingest new transmissions into in-memory store and broadcast
|
||||
newTxs, newMax := p.store.IngestNewFromDB(lastID, 100)
|
||||
if newMax > lastID {
|
||||
lastID = newMax
|
||||
}
|
||||
// Ingest new observations for existing transmissions (fixes #174)
|
||||
nextObsID := lastObsID
|
||||
if err := p.db.conn.QueryRow(`
|
||||
SELECT COALESCE(MAX(id), ?) FROM (
|
||||
SELECT id FROM observations
|
||||
WHERE id > ?
|
||||
ORDER BY id ASC
|
||||
LIMIT 500
|
||||
)`, lastObsID, lastObsID).Scan(&nextObsID); err != nil {
|
||||
nextObsID = lastObsID
|
||||
}
|
||||
newObs := p.store.IngestNewObservations(lastObsID, 500)
|
||||
if nextObsID > lastObsID {
|
||||
lastObsID = nextObsID
|
||||
}
|
||||
if len(newTxs) > 0 {
|
||||
log.Printf("[broadcast] sending %d packets to %d clients (lastID now %d)", len(newTxs), p.hub.ClientCount(), lastID)
|
||||
}
|
||||
for _, tx := range newTxs {
|
||||
p.hub.Broadcast(WSMessage{
|
||||
Type: "packet",
|
||||
Data: tx,
|
||||
})
|
||||
}
|
||||
for _, obs := range newObs {
|
||||
p.hub.Broadcast(WSMessage{
|
||||
Type: "packet",
|
||||
Data: obs,
|
||||
})
|
||||
}
|
||||
} else {
|
||||
// Fallback: direct DB query (used when store is nil, e.g. tests)
|
||||
newTxs, err := p.db.GetNewTransmissionsSince(lastID, 100)
|
||||
if err != nil {
|
||||
log.Printf("[poller] error: %v", err)
|
||||
continue
|
||||
}
|
||||
for _, tx := range newTxs {
|
||||
id, _ := tx["id"].(int)
|
||||
if id > lastID {
|
||||
lastID = id
|
||||
}
|
||||
// Copy packet fields for the nested packet (avoids circular ref)
|
||||
pkt := make(map[string]interface{}, len(tx))
|
||||
for k, v := range tx {
|
||||
pkt[k] = v
|
||||
}
|
||||
tx["packet"] = pkt
|
||||
p.hub.Broadcast(WSMessage{
|
||||
Type: "packet",
|
||||
Data: tx,
|
||||
})
|
||||
}
|
||||
}
|
||||
case <-p.stop:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Poller) Stop() {
|
||||
close(p.stop)
|
||||
}
|
||||
|
||||
@@ -1,275 +1,415 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
func TestHubBroadcast(t *testing.T) {
|
||||
hub := NewHub()
|
||||
|
||||
if hub.ClientCount() != 0 {
|
||||
t.Errorf("expected 0 clients, got %d", hub.ClientCount())
|
||||
}
|
||||
|
||||
// Create a test server with WebSocket endpoint
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
hub.ServeWS(w, r)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
// Connect a WebSocket client
|
||||
wsURL := "ws" + srv.URL[4:] // replace http with ws
|
||||
conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("dial error: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
// Wait for registration
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
if hub.ClientCount() != 1 {
|
||||
t.Errorf("expected 1 client, got %d", hub.ClientCount())
|
||||
}
|
||||
|
||||
// Broadcast a message
|
||||
hub.Broadcast(map[string]interface{}{
|
||||
"type": "packet",
|
||||
"data": map[string]interface{}{"id": 1, "hash": "test123"},
|
||||
})
|
||||
|
||||
// Read the message
|
||||
conn.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
_, msg, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
t.Fatalf("read error: %v", err)
|
||||
}
|
||||
if len(msg) == 0 {
|
||||
t.Error("expected non-empty message")
|
||||
}
|
||||
|
||||
// Disconnect
|
||||
conn.Close()
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
func TestPollerCreation(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
defer db.Close()
|
||||
seedTestData(t, db)
|
||||
hub := NewHub()
|
||||
|
||||
poller := NewPoller(db, hub, 100*time.Millisecond)
|
||||
if poller == nil {
|
||||
t.Fatal("expected poller")
|
||||
}
|
||||
|
||||
// Start and stop
|
||||
go poller.Start()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
poller.Stop()
|
||||
}
|
||||
|
||||
func TestHubMultipleClients(t *testing.T) {
|
||||
hub := NewHub()
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
hub.ServeWS(w, r)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
wsURL := "ws" + srv.URL[4:]
|
||||
|
||||
// Connect two clients
|
||||
conn1, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("dial error: %v", err)
|
||||
}
|
||||
defer conn1.Close()
|
||||
|
||||
conn2, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("dial error: %v", err)
|
||||
}
|
||||
defer conn2.Close()
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
if hub.ClientCount() != 2 {
|
||||
t.Errorf("expected 2 clients, got %d", hub.ClientCount())
|
||||
}
|
||||
|
||||
// Broadcast and both should receive
|
||||
hub.Broadcast(map[string]interface{}{"type": "test", "data": "hello"})
|
||||
|
||||
conn1.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
_, msg1, err := conn1.ReadMessage()
|
||||
if err != nil {
|
||||
t.Fatalf("conn1 read error: %v", err)
|
||||
}
|
||||
if len(msg1) == 0 {
|
||||
t.Error("expected non-empty message on conn1")
|
||||
}
|
||||
|
||||
conn2.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
_, msg2, err := conn2.ReadMessage()
|
||||
if err != nil {
|
||||
t.Fatalf("conn2 read error: %v", err)
|
||||
}
|
||||
if len(msg2) == 0 {
|
||||
t.Error("expected non-empty message on conn2")
|
||||
}
|
||||
|
||||
// Disconnect one
|
||||
conn1.Close()
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Remaining client should still work
|
||||
hub.Broadcast(map[string]interface{}{"type": "test2"})
|
||||
|
||||
conn2.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
_, msg3, err := conn2.ReadMessage()
|
||||
if err != nil {
|
||||
t.Fatalf("conn2 read error after disconnect: %v", err)
|
||||
}
|
||||
if len(msg3) == 0 {
|
||||
t.Error("expected non-empty message")
|
||||
}
|
||||
}
|
||||
|
||||
func TestBroadcastFullBuffer(t *testing.T) {
|
||||
hub := NewHub()
|
||||
|
||||
// Create a client with tiny buffer (1)
|
||||
client := &Client{
|
||||
send: make(chan []byte, 1),
|
||||
}
|
||||
hub.mu.Lock()
|
||||
hub.clients[client] = true
|
||||
hub.mu.Unlock()
|
||||
|
||||
// Fill the buffer
|
||||
client.send <- []byte("first")
|
||||
|
||||
// This broadcast should drop the message (buffer full)
|
||||
hub.Broadcast(map[string]interface{}{"type": "dropped"})
|
||||
|
||||
// Channel should still only have the first message
|
||||
select {
|
||||
case msg := <-client.send:
|
||||
if string(msg) != "first" {
|
||||
t.Errorf("expected 'first', got %s", string(msg))
|
||||
}
|
||||
default:
|
||||
t.Error("expected message in channel")
|
||||
}
|
||||
|
||||
// Clean up
|
||||
hub.mu.Lock()
|
||||
delete(hub.clients, client)
|
||||
hub.mu.Unlock()
|
||||
}
|
||||
|
||||
func TestBroadcastMarshalError(t *testing.T) {
|
||||
hub := NewHub()
|
||||
|
||||
// Marshal error: functions can't be marshaled to JSON
|
||||
hub.Broadcast(map[string]interface{}{"bad": func() {}})
|
||||
// Should not panic — just log and return
|
||||
}
|
||||
|
||||
func TestPollerBroadcastsNewData(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
defer db.Close()
|
||||
seedTestData(t, db)
|
||||
hub := NewHub()
|
||||
|
||||
// Create a client to receive broadcasts
|
||||
client := &Client{
|
||||
send: make(chan []byte, 256),
|
||||
}
|
||||
hub.mu.Lock()
|
||||
hub.clients[client] = true
|
||||
hub.mu.Unlock()
|
||||
|
||||
poller := NewPoller(db, hub, 50*time.Millisecond)
|
||||
go poller.Start()
|
||||
|
||||
// Insert new data to trigger broadcast
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type)
|
||||
VALUES ('EEFF', 'newhash123456789', '2026-01-16T10:00:00Z', 1, 4)`)
|
||||
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
poller.Stop()
|
||||
|
||||
// Check if client received broadcast with packet field (fixes #162)
|
||||
select {
|
||||
case msg := <-client.send:
|
||||
if len(msg) == 0 {
|
||||
t.Error("expected non-empty broadcast message")
|
||||
}
|
||||
var parsed map[string]interface{}
|
||||
if err := json.Unmarshal(msg, &parsed); err != nil {
|
||||
t.Fatalf("failed to parse broadcast: %v", err)
|
||||
}
|
||||
if parsed["type"] != "packet" {
|
||||
t.Errorf("expected type=packet, got %v", parsed["type"])
|
||||
}
|
||||
data, ok := parsed["data"].(map[string]interface{})
|
||||
if !ok {
|
||||
t.Fatal("expected data to be an object")
|
||||
}
|
||||
// packets.js filters on m.data.packet — must exist
|
||||
pkt, ok := data["packet"]
|
||||
if !ok || pkt == nil {
|
||||
t.Error("expected data.packet to exist (required by packets.js WS handler)")
|
||||
}
|
||||
pktMap, ok := pkt.(map[string]interface{})
|
||||
if !ok {
|
||||
t.Fatal("expected data.packet to be an object")
|
||||
}
|
||||
// Verify key fields exist in nested packet (timestamp required by packets.js)
|
||||
for _, field := range []string{"id", "hash", "payload_type", "timestamp"} {
|
||||
if _, exists := pktMap[field]; !exists {
|
||||
t.Errorf("expected data.packet.%s to exist", field)
|
||||
}
|
||||
}
|
||||
default:
|
||||
// Might not have received due to timing
|
||||
}
|
||||
|
||||
// Clean up
|
||||
hub.mu.Lock()
|
||||
delete(hub.clients, client)
|
||||
hub.mu.Unlock()
|
||||
}
|
||||
|
||||
func TestHubRegisterUnregister(t *testing.T) {
|
||||
hub := NewHub()
|
||||
|
||||
client := &Client{
|
||||
send: make(chan []byte, 256),
|
||||
}
|
||||
|
||||
hub.Register(client)
|
||||
if hub.ClientCount() != 1 {
|
||||
t.Errorf("expected 1 client after register, got %d", hub.ClientCount())
|
||||
}
|
||||
|
||||
hub.Unregister(client)
|
||||
if hub.ClientCount() != 0 {
|
||||
t.Errorf("expected 0 clients after unregister, got %d", hub.ClientCount())
|
||||
}
|
||||
|
||||
// Unregister again should be safe
|
||||
hub.Unregister(client)
|
||||
if hub.ClientCount() != 0 {
|
||||
t.Errorf("expected 0 clients, got %d", hub.ClientCount())
|
||||
}
|
||||
}
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
func TestHubBroadcast(t *testing.T) {
|
||||
hub := NewHub()
|
||||
|
||||
if hub.ClientCount() != 0 {
|
||||
t.Errorf("expected 0 clients, got %d", hub.ClientCount())
|
||||
}
|
||||
|
||||
// Create a test server with WebSocket endpoint
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
hub.ServeWS(w, r)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
// Connect a WebSocket client
|
||||
wsURL := "ws" + srv.URL[4:] // replace http with ws
|
||||
conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("dial error: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
// Wait for registration
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
if hub.ClientCount() != 1 {
|
||||
t.Errorf("expected 1 client, got %d", hub.ClientCount())
|
||||
}
|
||||
|
||||
// Broadcast a message
|
||||
hub.Broadcast(map[string]interface{}{
|
||||
"type": "packet",
|
||||
"data": map[string]interface{}{"id": 1, "hash": "test123"},
|
||||
})
|
||||
|
||||
// Read the message
|
||||
conn.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
_, msg, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
t.Fatalf("read error: %v", err)
|
||||
}
|
||||
if len(msg) == 0 {
|
||||
t.Error("expected non-empty message")
|
||||
}
|
||||
|
||||
// Disconnect
|
||||
conn.Close()
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
func TestPollerCreation(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
defer db.Close()
|
||||
seedTestData(t, db)
|
||||
hub := NewHub()
|
||||
|
||||
poller := NewPoller(db, hub, 100*time.Millisecond)
|
||||
if poller == nil {
|
||||
t.Fatal("expected poller")
|
||||
}
|
||||
|
||||
// Start and stop
|
||||
go poller.Start()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
poller.Stop()
|
||||
}
|
||||
|
||||
func TestHubMultipleClients(t *testing.T) {
|
||||
hub := NewHub()
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
hub.ServeWS(w, r)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
wsURL := "ws" + srv.URL[4:]
|
||||
|
||||
// Connect two clients
|
||||
conn1, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("dial error: %v", err)
|
||||
}
|
||||
defer conn1.Close()
|
||||
|
||||
conn2, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("dial error: %v", err)
|
||||
}
|
||||
defer conn2.Close()
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
if hub.ClientCount() != 2 {
|
||||
t.Errorf("expected 2 clients, got %d", hub.ClientCount())
|
||||
}
|
||||
|
||||
// Broadcast and both should receive
|
||||
hub.Broadcast(map[string]interface{}{"type": "test", "data": "hello"})
|
||||
|
||||
conn1.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
_, msg1, err := conn1.ReadMessage()
|
||||
if err != nil {
|
||||
t.Fatalf("conn1 read error: %v", err)
|
||||
}
|
||||
if len(msg1) == 0 {
|
||||
t.Error("expected non-empty message on conn1")
|
||||
}
|
||||
|
||||
conn2.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
_, msg2, err := conn2.ReadMessage()
|
||||
if err != nil {
|
||||
t.Fatalf("conn2 read error: %v", err)
|
||||
}
|
||||
if len(msg2) == 0 {
|
||||
t.Error("expected non-empty message on conn2")
|
||||
}
|
||||
|
||||
// Disconnect one
|
||||
conn1.Close()
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Remaining client should still work
|
||||
hub.Broadcast(map[string]interface{}{"type": "test2"})
|
||||
|
||||
conn2.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
_, msg3, err := conn2.ReadMessage()
|
||||
if err != nil {
|
||||
t.Fatalf("conn2 read error after disconnect: %v", err)
|
||||
}
|
||||
if len(msg3) == 0 {
|
||||
t.Error("expected non-empty message")
|
||||
}
|
||||
}
|
||||
|
||||
func TestBroadcastFullBuffer(t *testing.T) {
|
||||
hub := NewHub()
|
||||
|
||||
// Create a client with tiny buffer (1)
|
||||
client := &Client{
|
||||
send: make(chan []byte, 1),
|
||||
}
|
||||
hub.mu.Lock()
|
||||
hub.clients[client] = true
|
||||
hub.mu.Unlock()
|
||||
|
||||
// Fill the buffer
|
||||
client.send <- []byte("first")
|
||||
|
||||
// This broadcast should drop the message (buffer full)
|
||||
hub.Broadcast(map[string]interface{}{"type": "dropped"})
|
||||
|
||||
// Channel should still only have the first message
|
||||
select {
|
||||
case msg := <-client.send:
|
||||
if string(msg) != "first" {
|
||||
t.Errorf("expected 'first', got %s", string(msg))
|
||||
}
|
||||
default:
|
||||
t.Error("expected message in channel")
|
||||
}
|
||||
|
||||
// Clean up
|
||||
hub.mu.Lock()
|
||||
delete(hub.clients, client)
|
||||
hub.mu.Unlock()
|
||||
}
|
||||
|
||||
func TestBroadcastMarshalError(t *testing.T) {
|
||||
hub := NewHub()
|
||||
|
||||
// Marshal error: functions can't be marshaled to JSON
|
||||
hub.Broadcast(map[string]interface{}{"bad": func() {}})
|
||||
// Should not panic — just log and return
|
||||
}
|
||||
|
||||
func TestPollerBroadcastsNewData(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
defer db.Close()
|
||||
seedTestData(t, db)
|
||||
hub := NewHub()
|
||||
|
||||
// Create a client to receive broadcasts
|
||||
client := &Client{
|
||||
send: make(chan []byte, 256),
|
||||
}
|
||||
hub.mu.Lock()
|
||||
hub.clients[client] = true
|
||||
hub.mu.Unlock()
|
||||
|
||||
poller := NewPoller(db, hub, 50*time.Millisecond)
|
||||
go poller.Start()
|
||||
|
||||
// Insert new data to trigger broadcast
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type)
|
||||
VALUES ('EEFF', 'newhash123456789', '2026-01-16T10:00:00Z', 1, 4)`)
|
||||
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
poller.Stop()
|
||||
|
||||
// Check if client received broadcast with packet field (fixes #162)
|
||||
select {
|
||||
case msg := <-client.send:
|
||||
if len(msg) == 0 {
|
||||
t.Error("expected non-empty broadcast message")
|
||||
}
|
||||
var parsed map[string]interface{}
|
||||
if err := json.Unmarshal(msg, &parsed); err != nil {
|
||||
t.Fatalf("failed to parse broadcast: %v", err)
|
||||
}
|
||||
if parsed["type"] != "packet" {
|
||||
t.Errorf("expected type=packet, got %v", parsed["type"])
|
||||
}
|
||||
data, ok := parsed["data"].(map[string]interface{})
|
||||
if !ok {
|
||||
t.Fatal("expected data to be an object")
|
||||
}
|
||||
// packets.js filters on m.data.packet — must exist
|
||||
pkt, ok := data["packet"]
|
||||
if !ok || pkt == nil {
|
||||
t.Error("expected data.packet to exist (required by packets.js WS handler)")
|
||||
}
|
||||
pktMap, ok := pkt.(map[string]interface{})
|
||||
if !ok {
|
||||
t.Fatal("expected data.packet to be an object")
|
||||
}
|
||||
// Verify key fields exist in nested packet (timestamp required by packets.js)
|
||||
for _, field := range []string{"id", "hash", "payload_type", "timestamp"} {
|
||||
if _, exists := pktMap[field]; !exists {
|
||||
t.Errorf("expected data.packet.%s to exist", field)
|
||||
}
|
||||
}
|
||||
default:
|
||||
// Might not have received due to timing
|
||||
}
|
||||
|
||||
// Clean up
|
||||
hub.mu.Lock()
|
||||
delete(hub.clients, client)
|
||||
hub.mu.Unlock()
|
||||
}
|
||||
|
||||
func TestPollerBroadcastsMultipleObservations(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
defer db.Close()
|
||||
seedTestData(t, db)
|
||||
hub := NewHub()
|
||||
|
||||
client := &Client{
|
||||
send: make(chan []byte, 256),
|
||||
}
|
||||
hub.mu.Lock()
|
||||
hub.clients[client] = true
|
||||
hub.mu.Unlock()
|
||||
defer func() {
|
||||
hub.mu.Lock()
|
||||
delete(hub.clients, client)
|
||||
hub.mu.Unlock()
|
||||
}()
|
||||
|
||||
poller := NewPoller(db, hub, 50*time.Millisecond)
|
||||
store := NewPacketStore(db)
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store load failed: %v", err)
|
||||
}
|
||||
poller.store = store
|
||||
go poller.Start()
|
||||
defer poller.Stop()
|
||||
|
||||
// Wait for poller to initialize its lastID/lastObsID cursors before
|
||||
// inserting new data; otherwise the poller may snapshot a lastID that
|
||||
// already includes the test data and never broadcast it.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
now := time.Now().UTC().Format(time.RFC3339)
|
||||
if _, err := db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
|
||||
VALUES ('FACE', 'starbursthash237a', ?, 1, 4, '{"pubKey":"aabbccdd11223344","type":"ADVERT"}')`, now); err != nil {
|
||||
t.Fatalf("insert tx failed: %v", err)
|
||||
}
|
||||
var txID int
|
||||
if err := db.conn.QueryRow(`SELECT id FROM transmissions WHERE hash='starbursthash237a'`).Scan(&txID); err != nil {
|
||||
t.Fatalf("query tx id failed: %v", err)
|
||||
}
|
||||
ts := time.Now().Unix()
|
||||
if _, err := db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
|
||||
VALUES (?, 1, 14.0, -82, '["aa"]', ?),
|
||||
(?, 2, 10.5, -90, '["aa","bb"]', ?),
|
||||
(?, 1, 7.0, -96, '["aa","bb","cc"]', ?)`,
|
||||
txID, ts, txID, ts+1, txID, ts+2); err != nil {
|
||||
t.Fatalf("insert observations failed: %v", err)
|
||||
}
|
||||
|
||||
deadline := time.After(2 * time.Second)
|
||||
var dataMsgs []map[string]interface{}
|
||||
for len(dataMsgs) < 3 {
|
||||
select {
|
||||
case raw := <-client.send:
|
||||
var parsed map[string]interface{}
|
||||
if err := json.Unmarshal(raw, &parsed); err != nil {
|
||||
t.Fatalf("unmarshal ws msg failed: %v", err)
|
||||
}
|
||||
if parsed["type"] != "packet" {
|
||||
continue
|
||||
}
|
||||
data, ok := parsed["data"].(map[string]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if data["hash"] == "starbursthash237a" {
|
||||
dataMsgs = append(dataMsgs, data)
|
||||
}
|
||||
case <-deadline:
|
||||
t.Fatalf("timed out waiting for 3 observation broadcasts, got %d", len(dataMsgs))
|
||||
}
|
||||
}
|
||||
|
||||
if len(dataMsgs) != 3 {
|
||||
t.Fatalf("expected 3 messages, got %d", len(dataMsgs))
|
||||
}
|
||||
|
||||
paths := make([]string, 0, 3)
|
||||
observers := make(map[string]bool)
|
||||
for _, m := range dataMsgs {
|
||||
hash, _ := m["hash"].(string)
|
||||
if hash != "starbursthash237a" {
|
||||
t.Fatalf("unexpected hash %q", hash)
|
||||
}
|
||||
p, _ := m["path_json"].(string)
|
||||
paths = append(paths, p)
|
||||
if oid, ok := m["observer_id"].(string); ok && oid != "" {
|
||||
observers[oid] = true
|
||||
}
|
||||
}
|
||||
sort.Strings(paths)
|
||||
wantPaths := []string{`["aa","bb","cc"]`, `["aa","bb"]`, `["aa"]`}
|
||||
sort.Strings(wantPaths)
|
||||
for i := range wantPaths {
|
||||
if paths[i] != wantPaths[i] {
|
||||
t.Fatalf("path mismatch at %d: got %q want %q", i, paths[i], wantPaths[i])
|
||||
}
|
||||
}
|
||||
if len(observers) < 2 {
|
||||
t.Fatalf("expected observations from >=2 observers, got %d", len(observers))
|
||||
}
|
||||
}
|
||||
|
||||
func TestIngestNewObservationsBroadcast(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
defer db.Close()
|
||||
seedTestData(t, db)
|
||||
store := NewPacketStore(db)
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store load failed: %v", err)
|
||||
}
|
||||
|
||||
maxObs := db.GetMaxObservationID()
|
||||
now := time.Now().Unix()
|
||||
if _, err := db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
|
||||
VALUES (1, 2, 6.0, -100, '["aa","zz"]', ?),
|
||||
(1, 1, 5.0, -101, '["aa","yy"]', ?)`, now, now+1); err != nil {
|
||||
t.Fatalf("insert new observations failed: %v", err)
|
||||
}
|
||||
|
||||
maps := store.IngestNewObservations(maxObs, 500)
|
||||
if len(maps) != 2 {
|
||||
t.Fatalf("expected 2 broadcast maps, got %d", len(maps))
|
||||
}
|
||||
for _, m := range maps {
|
||||
if m["hash"] != "abc123def4567890" {
|
||||
t.Fatalf("unexpected hash in map: %v", m["hash"])
|
||||
}
|
||||
path, ok := m["path_json"].(string)
|
||||
if !ok || path == "" {
|
||||
t.Fatalf("missing path_json in map: %#v", m)
|
||||
}
|
||||
if _, ok := m["observer_id"]; !ok {
|
||||
t.Fatalf("missing observer_id in map: %#v", m)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestHubRegisterUnregister(t *testing.T) {
|
||||
hub := NewHub()
|
||||
|
||||
client := &Client{
|
||||
send: make(chan []byte, 256),
|
||||
}
|
||||
|
||||
hub.Register(client)
|
||||
if hub.ClientCount() != 1 {
|
||||
t.Errorf("expected 1 client after register, got %d", hub.ClientCount())
|
||||
}
|
||||
|
||||
hub.Unregister(client)
|
||||
if hub.ClientCount() != 0 {
|
||||
t.Errorf("expected 0 clients after unregister, got %d", hub.ClientCount())
|
||||
}
|
||||
|
||||
// Unregister again should be safe
|
||||
hub.Unregister(client)
|
||||
if hub.ClientCount() != 0 {
|
||||
t.Errorf("expected 0 clients, got %d", hub.ClientCount())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user