mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-03-30 17:05:58 +00:00
Compare commits
3 Commits
fix/live-v
...
optimize/f
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1191cfc017 | ||
|
|
68bdaaca99 | ||
|
|
074f3d3760 |
85
.github/workflows/deploy.yml
vendored
85
.github/workflows/deploy.yml
vendored
@@ -41,10 +41,10 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v5
|
||||
|
||||
- name: Set up Go 1.22
|
||||
uses: actions/setup-go@v5
|
||||
uses: actions/setup-go@v6
|
||||
with:
|
||||
go-version: '1.22'
|
||||
cache-dependency-path: |
|
||||
@@ -122,9 +122,16 @@ jobs:
|
||||
echo "| Server | ${SERVER_COV}% |" >> $GITHUB_STEP_SUMMARY
|
||||
echo "| Ingestor | ${INGESTOR_COV}% |" >> $GITHUB_STEP_SUMMARY
|
||||
|
||||
- name: Cancel workflow on failure
|
||||
if: failure()
|
||||
run: |
|
||||
curl -s -X POST \
|
||||
-H "Authorization: Bearer ${{ github.token }}" \
|
||||
"https://api.github.com/repos/${{ github.repository }}/actions/runs/${{ github.run_id }}/cancel"
|
||||
|
||||
- name: Upload Go coverage badges
|
||||
if: always()
|
||||
uses: actions/upload-artifact@v4
|
||||
uses: actions/upload-artifact@v5
|
||||
with:
|
||||
name: go-badges
|
||||
path: .badges/go-*.json
|
||||
@@ -136,15 +143,15 @@ jobs:
|
||||
# ───────────────────────────────────────────────────────────────
|
||||
node-test:
|
||||
name: "🧪 Node.js Tests"
|
||||
runs-on: self-hosted
|
||||
runs-on: [self-hosted, Linux]
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v5
|
||||
with:
|
||||
fetch-depth: 2
|
||||
|
||||
- name: Set up Node.js 22
|
||||
uses: actions/setup-node@v4
|
||||
uses: actions/setup-node@v5
|
||||
with:
|
||||
node-version: '22'
|
||||
|
||||
@@ -190,7 +197,11 @@ jobs:
|
||||
|
||||
- name: Install Playwright browser
|
||||
if: steps.changes.outputs.frontend == 'true'
|
||||
run: npx playwright install chromium --with-deps 2>/dev/null || true
|
||||
run: |
|
||||
# Install chromium (skips download if already cached on self-hosted runner)
|
||||
npx playwright install chromium 2>/dev/null || true
|
||||
# Install system deps only if missing (apt-get is slow)
|
||||
npx playwright install-deps chromium 2>/dev/null || true
|
||||
|
||||
- name: Instrument frontend JS for coverage
|
||||
if: steps.changes.outputs.frontend == 'true'
|
||||
@@ -220,19 +231,32 @@ jobs:
|
||||
sleep 1
|
||||
done
|
||||
|
||||
- name: Run Playwright E2E tests
|
||||
- name: Run Playwright E2E + coverage collection concurrently
|
||||
if: steps.changes.outputs.frontend == 'true'
|
||||
run: BASE_URL=http://localhost:13581 node test-e2e-playwright.js 2>&1 | tee e2e-output.txt
|
||||
run: |
|
||||
# Run E2E tests and coverage collection in parallel — both use the same server
|
||||
BASE_URL=http://localhost:13581 node test-e2e-playwright.js 2>&1 | tee e2e-output.txt &
|
||||
E2E_PID=$!
|
||||
BASE_URL=http://localhost:13581 node scripts/collect-frontend-coverage.js 2>&1 | tee fe-coverage-output.txt &
|
||||
COV_PID=$!
|
||||
|
||||
- name: Collect frontend coverage report
|
||||
# Wait for both — E2E must pass, coverage is best-effort
|
||||
E2E_EXIT=0
|
||||
wait $E2E_PID || E2E_EXIT=$?
|
||||
wait $COV_PID || true
|
||||
|
||||
# Fail if E2E failed
|
||||
[ $E2E_EXIT -ne 0 ] && exit $E2E_EXIT
|
||||
true
|
||||
|
||||
- name: Generate frontend coverage badges
|
||||
if: always() && steps.changes.outputs.frontend == 'true'
|
||||
run: |
|
||||
BASE_URL=http://localhost:13581 node scripts/collect-frontend-coverage.js 2>&1 | tee fe-coverage-output.txt
|
||||
|
||||
E2E_PASS=$(grep -oP '[0-9]+(?=/)' e2e-output.txt | tail -1)
|
||||
|
||||
mkdir -p .badges
|
||||
if [ -f .nyc_output/frontend-coverage.json ]; then
|
||||
# Merge E2E + coverage collector data if both exist
|
||||
if [ -f .nyc_output/frontend-coverage.json ] || [ -f .nyc_output/e2e-coverage.json ]; then
|
||||
npx nyc report --reporter=text-summary --reporter=text 2>&1 | tee fe-report.txt
|
||||
FE_COVERAGE=$(grep 'Statements' fe-report.txt | head -1 | grep -oP '[\d.]+(?=%)' || echo "0")
|
||||
FE_COVERAGE=${FE_COVERAGE:-0}
|
||||
@@ -259,13 +283,24 @@ jobs:
|
||||
fuser -k 13581/tcp 2>/dev/null || true
|
||||
PORT=13581 node server.js &
|
||||
SERVER_PID=$!
|
||||
sleep 5
|
||||
# Wait for server to be ready (up to 15s)
|
||||
for i in $(seq 1 15); do
|
||||
curl -sf http://localhost:13581/api/stats > /dev/null 2>&1 && break
|
||||
sleep 1
|
||||
done
|
||||
BASE_URL=http://localhost:13581 node test-e2e-playwright.js || true
|
||||
kill $SERVER_PID 2>/dev/null || true
|
||||
|
||||
- name: Cancel workflow on failure
|
||||
if: failure()
|
||||
run: |
|
||||
curl -s -X POST \
|
||||
-H "Authorization: Bearer ${{ github.token }}" \
|
||||
"https://api.github.com/repos/${{ github.repository }}/actions/runs/${{ github.run_id }}/cancel"
|
||||
|
||||
- name: Upload Node.js test badges
|
||||
if: always()
|
||||
uses: actions/upload-artifact@v4
|
||||
uses: actions/upload-artifact@v5
|
||||
with:
|
||||
name: node-badges
|
||||
path: .badges/
|
||||
@@ -278,14 +313,14 @@ jobs:
|
||||
build:
|
||||
name: "🏗️ Build Docker Image"
|
||||
if: github.event_name == 'push'
|
||||
needs: [go-test]
|
||||
runs-on: self-hosted
|
||||
needs: [go-test, node-test]
|
||||
runs-on: [self-hosted, Linux]
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v5
|
||||
|
||||
- name: Set up Node.js 22
|
||||
uses: actions/setup-node@v4
|
||||
uses: actions/setup-node@v5
|
||||
with:
|
||||
node-version: '22'
|
||||
|
||||
@@ -304,10 +339,10 @@ jobs:
|
||||
name: "🚀 Deploy Staging"
|
||||
if: github.event_name == 'push'
|
||||
needs: [build]
|
||||
runs-on: self-hosted
|
||||
runs-on: [self-hosted, Linux]
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v5
|
||||
|
||||
- name: Start staging on port 82
|
||||
run: |
|
||||
@@ -349,21 +384,21 @@ jobs:
|
||||
name: "📝 Publish Badges & Summary"
|
||||
if: github.event_name == 'push'
|
||||
needs: [deploy]
|
||||
runs-on: self-hosted
|
||||
runs-on: [self-hosted, Linux]
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v5
|
||||
|
||||
- name: Download Go coverage badges
|
||||
continue-on-error: true
|
||||
uses: actions/download-artifact@v4
|
||||
uses: actions/download-artifact@v5
|
||||
with:
|
||||
name: go-badges
|
||||
path: .badges/
|
||||
|
||||
- name: Download Node.js test badges
|
||||
continue-on-error: true
|
||||
uses: actions/download-artifact@v4
|
||||
uses: actions/download-artifact@v5
|
||||
with:
|
||||
name: node-badges
|
||||
path: .badges/
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -17,8 +17,6 @@ func setupTestDB(t *testing.T) *DB {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// Force single connection so all goroutines share the same in-memory DB
|
||||
conn.SetMaxOpenConns(1)
|
||||
|
||||
// Create schema matching MeshCore Analyzer v3
|
||||
schema := `
|
||||
|
||||
@@ -1,506 +1,403 @@
|
||||
package main
|
||||
|
||||
// parity_test.go — Golden fixture shape tests.
|
||||
// Validates that Go API responses match the shape of Node.js API responses.
|
||||
// Shapes were captured from the production Node.js server and stored in
|
||||
// testdata/golden/shapes.json.
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// shapeSpec describes the expected JSON structure from the Node.js server.
|
||||
type shapeSpec struct {
|
||||
Type string `json:"type"`
|
||||
Keys map[string]shapeSpec `json:"keys,omitempty"`
|
||||
ElementShape *shapeSpec `json:"elementShape,omitempty"`
|
||||
DynamicKeys bool `json:"dynamicKeys,omitempty"`
|
||||
ValueShape *shapeSpec `json:"valueShape,omitempty"`
|
||||
RequiredKeys map[string]shapeSpec `json:"requiredKeys,omitempty"`
|
||||
}
|
||||
|
||||
// loadShapes reads testdata/golden/shapes.json relative to this source file.
|
||||
func loadShapes(t *testing.T) map[string]shapeSpec {
|
||||
t.Helper()
|
||||
_, thisFile, _, _ := runtime.Caller(0)
|
||||
dir := filepath.Dir(thisFile)
|
||||
data, err := os.ReadFile(filepath.Join(dir, "testdata", "golden", "shapes.json"))
|
||||
if err != nil {
|
||||
t.Fatalf("cannot load shapes.json: %v", err)
|
||||
}
|
||||
var shapes map[string]shapeSpec
|
||||
if err := json.Unmarshal(data, &shapes); err != nil {
|
||||
t.Fatalf("cannot parse shapes.json: %v", err)
|
||||
}
|
||||
return shapes
|
||||
}
|
||||
|
||||
// validateShape recursively checks that `actual` matches the expected `spec`.
|
||||
// `path` tracks the JSON path for error messages.
|
||||
// Returns a list of mismatch descriptions.
|
||||
func validateShape(actual interface{}, spec shapeSpec, path string) []string {
|
||||
var errs []string
|
||||
|
||||
switch spec.Type {
|
||||
case "null", "nullable":
|
||||
// nullable means: value can be null OR matching type. Accept anything.
|
||||
return nil
|
||||
case "nullable_number":
|
||||
// Can be null or number
|
||||
if actual != nil {
|
||||
if _, ok := actual.(float64); !ok {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected number or null, got %T", path, actual))
|
||||
}
|
||||
}
|
||||
return errs
|
||||
case "string":
|
||||
if actual == nil {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected string, got null", path))
|
||||
} else if _, ok := actual.(string); !ok {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected string, got %T", path, actual))
|
||||
}
|
||||
case "number":
|
||||
if actual == nil {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected number, got null", path))
|
||||
} else if _, ok := actual.(float64); !ok {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected number, got %T (%v)", path, actual, actual))
|
||||
}
|
||||
case "boolean":
|
||||
if actual == nil {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected boolean, got null", path))
|
||||
} else if _, ok := actual.(bool); !ok {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected boolean, got %T", path, actual))
|
||||
}
|
||||
case "array":
|
||||
if actual == nil {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected array, got null (arrays must be [] not null)", path))
|
||||
return errs
|
||||
}
|
||||
arr, ok := actual.([]interface{})
|
||||
if !ok {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected array, got %T", path, actual))
|
||||
return errs
|
||||
}
|
||||
if spec.ElementShape != nil && len(arr) > 0 {
|
||||
errs = append(errs, validateShape(arr[0], *spec.ElementShape, path+"[0]")...)
|
||||
}
|
||||
case "object":
|
||||
if actual == nil {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected object, got null", path))
|
||||
return errs
|
||||
}
|
||||
obj, ok := actual.(map[string]interface{})
|
||||
if !ok {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected object, got %T", path, actual))
|
||||
return errs
|
||||
}
|
||||
|
||||
if spec.DynamicKeys {
|
||||
// Object with dynamic keys — validate value shapes
|
||||
if spec.ValueShape != nil && len(obj) > 0 {
|
||||
for k, v := range obj {
|
||||
errs = append(errs, validateShape(v, *spec.ValueShape, path+"."+k)...)
|
||||
break // check just one sample
|
||||
}
|
||||
}
|
||||
if spec.RequiredKeys != nil {
|
||||
for rk, rs := range spec.RequiredKeys {
|
||||
v, exists := obj[rk]
|
||||
if !exists {
|
||||
errs = append(errs, fmt.Sprintf("%s: missing required key %q in dynamic-key object", path, rk))
|
||||
} else {
|
||||
errs = append(errs, validateShape(v, rs, path+"."+rk)...)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if spec.Keys != nil {
|
||||
// Object with known keys — check each expected key exists and has correct type
|
||||
for key, keySpec := range spec.Keys {
|
||||
val, exists := obj[key]
|
||||
if !exists {
|
||||
errs = append(errs, fmt.Sprintf("%s: missing field %q (expected %s)", path, key, keySpec.Type))
|
||||
} else {
|
||||
errs = append(errs, validateShape(val, keySpec, path+"."+key)...)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return errs
|
||||
}
|
||||
|
||||
// parityEndpoint defines one endpoint to test for parity.
|
||||
type parityEndpoint struct {
|
||||
name string // key in shapes.json
|
||||
path string // HTTP path to request
|
||||
}
|
||||
|
||||
func TestParityShapes(t *testing.T) {
|
||||
shapes := loadShapes(t)
|
||||
_, router := setupTestServer(t)
|
||||
|
||||
endpoints := []parityEndpoint{
|
||||
{"stats", "/api/stats"},
|
||||
{"nodes", "/api/nodes?limit=5"},
|
||||
{"packets", "/api/packets?limit=5"},
|
||||
{"packets_grouped", "/api/packets?limit=5&groupByHash=true"},
|
||||
{"observers", "/api/observers"},
|
||||
{"channels", "/api/channels"},
|
||||
{"channel_messages", "/api/channels/0000000000000000/messages?limit=5"},
|
||||
{"analytics_rf", "/api/analytics/rf?days=7"},
|
||||
{"analytics_topology", "/api/analytics/topology?days=7"},
|
||||
{"analytics_hash_sizes", "/api/analytics/hash-sizes?days=7"},
|
||||
{"analytics_distance", "/api/analytics/distance?days=7"},
|
||||
{"analytics_subpaths", "/api/analytics/subpaths?days=7"},
|
||||
{"bulk_health", "/api/nodes/bulk-health"},
|
||||
{"health", "/api/health"},
|
||||
{"perf", "/api/perf"},
|
||||
}
|
||||
|
||||
for _, ep := range endpoints {
|
||||
t.Run("Parity_"+ep.name, func(t *testing.T) {
|
||||
spec, ok := shapes[ep.name]
|
||||
if !ok {
|
||||
t.Fatalf("no shape spec found for %q in shapes.json", ep.name)
|
||||
}
|
||||
|
||||
req := httptest.NewRequest("GET", ep.path, nil)
|
||||
w := httptest.NewRecorder()
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != 200 {
|
||||
t.Fatalf("GET %s returned %d, expected 200. Body: %s",
|
||||
ep.path, w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var body interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &body); err != nil {
|
||||
t.Fatalf("GET %s returned invalid JSON: %v\nBody: %s",
|
||||
ep.path, err, w.Body.String())
|
||||
}
|
||||
|
||||
mismatches := validateShape(body, spec, ep.path)
|
||||
if len(mismatches) > 0 {
|
||||
t.Errorf("Go %s has %d shape mismatches vs Node.js golden:\n %s",
|
||||
ep.path, len(mismatches), strings.Join(mismatches, "\n "))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestParityNodeDetail tests node detail endpoint shape.
|
||||
// Uses a known test node public key from seeded data.
|
||||
func TestParityNodeDetail(t *testing.T) {
|
||||
shapes := loadShapes(t)
|
||||
_, router := setupTestServer(t)
|
||||
|
||||
spec, ok := shapes["node_detail"]
|
||||
if !ok {
|
||||
t.Fatal("no shape spec for node_detail in shapes.json")
|
||||
}
|
||||
|
||||
req := httptest.NewRequest("GET", "/api/nodes/aabbccdd11223344", nil)
|
||||
w := httptest.NewRecorder()
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != 200 {
|
||||
t.Fatalf("node detail returned %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var body interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &body)
|
||||
|
||||
mismatches := validateShape(body, spec, "/api/nodes/{pubkey}")
|
||||
if len(mismatches) > 0 {
|
||||
t.Errorf("Go node detail has %d shape mismatches vs Node.js golden:\n %s",
|
||||
len(mismatches), strings.Join(mismatches, "\n "))
|
||||
}
|
||||
}
|
||||
|
||||
// TestParityArraysNotNull verifies that array-typed fields in Go responses are
|
||||
// [] (empty array) rather than null. This is a common Go/JSON pitfall where
|
||||
// nil slices marshal as null instead of [].
|
||||
// Uses shapes.json to know which fields SHOULD be arrays.
|
||||
func TestParityArraysNotNull(t *testing.T) {
|
||||
shapes := loadShapes(t)
|
||||
_, router := setupTestServer(t)
|
||||
|
||||
endpoints := []struct {
|
||||
name string
|
||||
path string
|
||||
}{
|
||||
{"stats", "/api/stats"},
|
||||
{"nodes", "/api/nodes?limit=5"},
|
||||
{"packets", "/api/packets?limit=5"},
|
||||
{"packets_grouped", "/api/packets?limit=5&groupByHash=true"},
|
||||
{"observers", "/api/observers"},
|
||||
{"channels", "/api/channels"},
|
||||
{"bulk_health", "/api/nodes/bulk-health"},
|
||||
{"analytics_rf", "/api/analytics/rf?days=7"},
|
||||
{"analytics_topology", "/api/analytics/topology?days=7"},
|
||||
{"analytics_hash_sizes", "/api/analytics/hash-sizes?days=7"},
|
||||
{"analytics_distance", "/api/analytics/distance?days=7"},
|
||||
{"analytics_subpaths", "/api/analytics/subpaths?days=7"},
|
||||
}
|
||||
|
||||
for _, ep := range endpoints {
|
||||
t.Run("NullArrayCheck_"+ep.name, func(t *testing.T) {
|
||||
spec, ok := shapes[ep.name]
|
||||
if !ok {
|
||||
t.Skipf("no shape spec for %s", ep.name)
|
||||
}
|
||||
|
||||
req := httptest.NewRequest("GET", ep.path, nil)
|
||||
w := httptest.NewRecorder()
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != 200 {
|
||||
t.Skipf("GET %s returned %d, skipping null-array check", ep.path, w.Code)
|
||||
}
|
||||
|
||||
var body interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &body)
|
||||
|
||||
nullArrays := findNullArrays(body, spec, ep.path)
|
||||
if len(nullArrays) > 0 {
|
||||
t.Errorf("Go %s has null where [] expected:\n %s\n"+
|
||||
"Go nil slices marshal as null — initialize with make() or literal",
|
||||
ep.path, strings.Join(nullArrays, "\n "))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// findNullArrays walks JSON data alongside a shape spec and returns paths
|
||||
// where the spec says the field should be an array but Go returned null.
|
||||
func findNullArrays(actual interface{}, spec shapeSpec, path string) []string {
|
||||
var nulls []string
|
||||
|
||||
switch spec.Type {
|
||||
case "array":
|
||||
if actual == nil {
|
||||
nulls = append(nulls, fmt.Sprintf("%s: null (should be [])", path))
|
||||
} else if arr, ok := actual.([]interface{}); ok && spec.ElementShape != nil {
|
||||
for i, elem := range arr {
|
||||
nulls = append(nulls, findNullArrays(elem, *spec.ElementShape, fmt.Sprintf("%s[%d]", path, i))...)
|
||||
}
|
||||
}
|
||||
case "object":
|
||||
obj, ok := actual.(map[string]interface{})
|
||||
if !ok || obj == nil {
|
||||
return nulls
|
||||
}
|
||||
if spec.Keys != nil {
|
||||
for key, keySpec := range spec.Keys {
|
||||
if val, exists := obj[key]; exists {
|
||||
nulls = append(nulls, findNullArrays(val, keySpec, path+"."+key)...)
|
||||
} else if keySpec.Type == "array" {
|
||||
// Key missing entirely — also a null-array problem
|
||||
nulls = append(nulls, fmt.Sprintf("%s.%s: missing (should be [])", path, key))
|
||||
}
|
||||
}
|
||||
}
|
||||
if spec.DynamicKeys && spec.ValueShape != nil {
|
||||
for k, v := range obj {
|
||||
nulls = append(nulls, findNullArrays(v, *spec.ValueShape, path+"."+k)...)
|
||||
break // sample one
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nulls
|
||||
}
|
||||
|
||||
// TestParityHealthEngine verifies Go health endpoint declares engine=go
|
||||
// while Node declares engine=node (or omits it). The Go server must always
|
||||
// identify itself.
|
||||
func TestParityHealthEngine(t *testing.T) {
|
||||
_, router := setupTestServer(t)
|
||||
|
||||
req := httptest.NewRequest("GET", "/api/health", nil)
|
||||
w := httptest.NewRecorder()
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
var body map[string]interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &body)
|
||||
|
||||
engine, ok := body["engine"]
|
||||
if !ok {
|
||||
t.Error("health response missing 'engine' field (Go server must include engine=go)")
|
||||
} else if engine != "go" {
|
||||
t.Errorf("health engine=%v, expected 'go'", engine)
|
||||
}
|
||||
}
|
||||
|
||||
// TestValidateShapeFunction directly tests the shape validator itself.
|
||||
func TestValidateShapeFunction(t *testing.T) {
|
||||
t.Run("string match", func(t *testing.T) {
|
||||
errs := validateShape("hello", shapeSpec{Type: "string"}, "$.x")
|
||||
if len(errs) != 0 {
|
||||
t.Errorf("unexpected errors: %v", errs)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("string mismatch", func(t *testing.T) {
|
||||
errs := validateShape(42.0, shapeSpec{Type: "string"}, "$.x")
|
||||
if len(errs) != 1 {
|
||||
t.Errorf("expected 1 error, got %d: %v", len(errs), errs)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("null array rejected", func(t *testing.T) {
|
||||
errs := validateShape(nil, shapeSpec{Type: "array"}, "$.arr")
|
||||
if len(errs) != 1 || !strings.Contains(errs[0], "null") {
|
||||
t.Errorf("expected null-array error, got: %v", errs)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("empty array OK", func(t *testing.T) {
|
||||
errs := validateShape([]interface{}{}, shapeSpec{Type: "array"}, "$.arr")
|
||||
if len(errs) != 0 {
|
||||
t.Errorf("unexpected errors for empty array: %v", errs)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("missing object key", func(t *testing.T) {
|
||||
spec := shapeSpec{Type: "object", Keys: map[string]shapeSpec{
|
||||
"name": {Type: "string"},
|
||||
"age": {Type: "number"},
|
||||
}}
|
||||
obj := map[string]interface{}{"name": "test"}
|
||||
errs := validateShape(obj, spec, "$.user")
|
||||
if len(errs) != 1 || !strings.Contains(errs[0], "age") {
|
||||
t.Errorf("expected missing age error, got: %v", errs)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("nullable allows null", func(t *testing.T) {
|
||||
errs := validateShape(nil, shapeSpec{Type: "nullable"}, "$.x")
|
||||
if len(errs) != 0 {
|
||||
t.Errorf("nullable should accept null: %v", errs)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("dynamic keys validates value shape", func(t *testing.T) {
|
||||
spec := shapeSpec{
|
||||
Type: "object",
|
||||
DynamicKeys: true,
|
||||
ValueShape: &shapeSpec{Type: "number"},
|
||||
}
|
||||
obj := map[string]interface{}{"a": 1.0, "b": 2.0}
|
||||
errs := validateShape(obj, spec, "$.dyn")
|
||||
if len(errs) != 0 {
|
||||
t.Errorf("unexpected errors: %v", errs)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestParityWSMultiObserverGolden(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
defer db.Close()
|
||||
seedTestData(t, db)
|
||||
hub := NewHub()
|
||||
store := NewPacketStore(db)
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store load failed: %v", err)
|
||||
}
|
||||
|
||||
poller := NewPoller(db, hub, 50*time.Millisecond)
|
||||
poller.store = store
|
||||
|
||||
client := &Client{send: make(chan []byte, 256)}
|
||||
hub.Register(client)
|
||||
defer hub.Unregister(client)
|
||||
|
||||
go poller.Start()
|
||||
defer poller.Stop()
|
||||
|
||||
// Wait for poller to initialize its lastID/lastObsID cursors before
|
||||
// inserting new data; otherwise the poller may snapshot a lastID that
|
||||
// already includes the test data and never broadcast it.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
now := time.Now().UTC().Format(time.RFC3339)
|
||||
if _, err := db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
|
||||
VALUES ('BEEF', 'goldenstarburst237', ?, 1, 4, '{"pubKey":"aabbccdd11223344","type":"ADVERT"}')`, now); err != nil {
|
||||
t.Fatalf("insert tx failed: %v", err)
|
||||
}
|
||||
var txID int
|
||||
if err := db.conn.QueryRow(`SELECT id FROM transmissions WHERE hash='goldenstarburst237'`).Scan(&txID); err != nil {
|
||||
t.Fatalf("query tx id failed: %v", err)
|
||||
}
|
||||
ts := time.Now().Unix()
|
||||
if _, err := db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
|
||||
VALUES (?, 1, 11.0, -88, '["p1"]', ?),
|
||||
(?, 2, 9.0, -92, '["p1","p2"]', ?),
|
||||
(?, 1, 7.0, -96, '["p1","p2","p3"]', ?)`,
|
||||
txID, ts, txID, ts+1, txID, ts+2); err != nil {
|
||||
t.Fatalf("insert obs failed: %v", err)
|
||||
}
|
||||
|
||||
type golden struct {
|
||||
Hash string
|
||||
Count int
|
||||
Paths []string
|
||||
ObserverIDs []string
|
||||
}
|
||||
expected := golden{
|
||||
Hash: "goldenstarburst237",
|
||||
Count: 3,
|
||||
Paths: []string{`["p1"]`, `["p1","p2"]`, `["p1","p2","p3"]`},
|
||||
ObserverIDs: []string{"obs1", "obs2"},
|
||||
}
|
||||
|
||||
gotPaths := make([]string, 0, expected.Count)
|
||||
gotObservers := make(map[string]bool)
|
||||
deadline := time.After(2 * time.Second)
|
||||
for len(gotPaths) < expected.Count {
|
||||
select {
|
||||
case raw := <-client.send:
|
||||
var msg map[string]interface{}
|
||||
if err := json.Unmarshal(raw, &msg); err != nil {
|
||||
t.Fatalf("unmarshal ws message failed: %v", err)
|
||||
}
|
||||
if msg["type"] != "packet" {
|
||||
continue
|
||||
}
|
||||
data, _ := msg["data"].(map[string]interface{})
|
||||
if data == nil || data["hash"] != expected.Hash {
|
||||
continue
|
||||
}
|
||||
if path, ok := data["path_json"].(string); ok {
|
||||
gotPaths = append(gotPaths, path)
|
||||
}
|
||||
if oid, ok := data["observer_id"].(string); ok && oid != "" {
|
||||
gotObservers[oid] = true
|
||||
}
|
||||
case <-deadline:
|
||||
t.Fatalf("timed out waiting for %d ws messages, got %d", expected.Count, len(gotPaths))
|
||||
}
|
||||
}
|
||||
|
||||
sort.Strings(gotPaths)
|
||||
sort.Strings(expected.Paths)
|
||||
if len(gotPaths) != len(expected.Paths) {
|
||||
t.Fatalf("path count mismatch: got %d want %d", len(gotPaths), len(expected.Paths))
|
||||
}
|
||||
for i := range expected.Paths {
|
||||
if gotPaths[i] != expected.Paths[i] {
|
||||
t.Fatalf("path mismatch at %d: got %q want %q", i, gotPaths[i], expected.Paths[i])
|
||||
}
|
||||
}
|
||||
for _, oid := range expected.ObserverIDs {
|
||||
if !gotObservers[oid] {
|
||||
t.Fatalf("missing expected observer %q in ws messages", oid)
|
||||
}
|
||||
}
|
||||
}
|
||||
package main
|
||||
|
||||
// parity_test.go — Golden fixture shape tests.
|
||||
// Validates that Go API responses match the shape of Node.js API responses.
|
||||
// Shapes were captured from the production Node.js server and stored in
|
||||
// testdata/golden/shapes.json.
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// shapeSpec describes the expected JSON structure from the Node.js server.
|
||||
type shapeSpec struct {
|
||||
Type string `json:"type"`
|
||||
Keys map[string]shapeSpec `json:"keys,omitempty"`
|
||||
ElementShape *shapeSpec `json:"elementShape,omitempty"`
|
||||
DynamicKeys bool `json:"dynamicKeys,omitempty"`
|
||||
ValueShape *shapeSpec `json:"valueShape,omitempty"`
|
||||
RequiredKeys map[string]shapeSpec `json:"requiredKeys,omitempty"`
|
||||
}
|
||||
|
||||
// loadShapes reads testdata/golden/shapes.json relative to this source file.
|
||||
func loadShapes(t *testing.T) map[string]shapeSpec {
|
||||
t.Helper()
|
||||
_, thisFile, _, _ := runtime.Caller(0)
|
||||
dir := filepath.Dir(thisFile)
|
||||
data, err := os.ReadFile(filepath.Join(dir, "testdata", "golden", "shapes.json"))
|
||||
if err != nil {
|
||||
t.Fatalf("cannot load shapes.json: %v", err)
|
||||
}
|
||||
var shapes map[string]shapeSpec
|
||||
if err := json.Unmarshal(data, &shapes); err != nil {
|
||||
t.Fatalf("cannot parse shapes.json: %v", err)
|
||||
}
|
||||
return shapes
|
||||
}
|
||||
|
||||
// validateShape recursively checks that `actual` matches the expected `spec`.
|
||||
// `path` tracks the JSON path for error messages.
|
||||
// Returns a list of mismatch descriptions.
|
||||
func validateShape(actual interface{}, spec shapeSpec, path string) []string {
|
||||
var errs []string
|
||||
|
||||
switch spec.Type {
|
||||
case "null", "nullable":
|
||||
// nullable means: value can be null OR matching type. Accept anything.
|
||||
return nil
|
||||
case "nullable_number":
|
||||
// Can be null or number
|
||||
if actual != nil {
|
||||
if _, ok := actual.(float64); !ok {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected number or null, got %T", path, actual))
|
||||
}
|
||||
}
|
||||
return errs
|
||||
case "string":
|
||||
if actual == nil {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected string, got null", path))
|
||||
} else if _, ok := actual.(string); !ok {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected string, got %T", path, actual))
|
||||
}
|
||||
case "number":
|
||||
if actual == nil {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected number, got null", path))
|
||||
} else if _, ok := actual.(float64); !ok {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected number, got %T (%v)", path, actual, actual))
|
||||
}
|
||||
case "boolean":
|
||||
if actual == nil {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected boolean, got null", path))
|
||||
} else if _, ok := actual.(bool); !ok {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected boolean, got %T", path, actual))
|
||||
}
|
||||
case "array":
|
||||
if actual == nil {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected array, got null (arrays must be [] not null)", path))
|
||||
return errs
|
||||
}
|
||||
arr, ok := actual.([]interface{})
|
||||
if !ok {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected array, got %T", path, actual))
|
||||
return errs
|
||||
}
|
||||
if spec.ElementShape != nil && len(arr) > 0 {
|
||||
errs = append(errs, validateShape(arr[0], *spec.ElementShape, path+"[0]")...)
|
||||
}
|
||||
case "object":
|
||||
if actual == nil {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected object, got null", path))
|
||||
return errs
|
||||
}
|
||||
obj, ok := actual.(map[string]interface{})
|
||||
if !ok {
|
||||
errs = append(errs, fmt.Sprintf("%s: expected object, got %T", path, actual))
|
||||
return errs
|
||||
}
|
||||
|
||||
if spec.DynamicKeys {
|
||||
// Object with dynamic keys — validate value shapes
|
||||
if spec.ValueShape != nil && len(obj) > 0 {
|
||||
for k, v := range obj {
|
||||
errs = append(errs, validateShape(v, *spec.ValueShape, path+"."+k)...)
|
||||
break // check just one sample
|
||||
}
|
||||
}
|
||||
if spec.RequiredKeys != nil {
|
||||
for rk, rs := range spec.RequiredKeys {
|
||||
v, exists := obj[rk]
|
||||
if !exists {
|
||||
errs = append(errs, fmt.Sprintf("%s: missing required key %q in dynamic-key object", path, rk))
|
||||
} else {
|
||||
errs = append(errs, validateShape(v, rs, path+"."+rk)...)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if spec.Keys != nil {
|
||||
// Object with known keys — check each expected key exists and has correct type
|
||||
for key, keySpec := range spec.Keys {
|
||||
val, exists := obj[key]
|
||||
if !exists {
|
||||
errs = append(errs, fmt.Sprintf("%s: missing field %q (expected %s)", path, key, keySpec.Type))
|
||||
} else {
|
||||
errs = append(errs, validateShape(val, keySpec, path+"."+key)...)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return errs
|
||||
}
|
||||
|
||||
// parityEndpoint defines one endpoint to test for parity.
|
||||
type parityEndpoint struct {
|
||||
name string // key in shapes.json
|
||||
path string // HTTP path to request
|
||||
}
|
||||
|
||||
func TestParityShapes(t *testing.T) {
|
||||
shapes := loadShapes(t)
|
||||
_, router := setupTestServer(t)
|
||||
|
||||
endpoints := []parityEndpoint{
|
||||
{"stats", "/api/stats"},
|
||||
{"nodes", "/api/nodes?limit=5"},
|
||||
{"packets", "/api/packets?limit=5"},
|
||||
{"packets_grouped", "/api/packets?limit=5&groupByHash=true"},
|
||||
{"observers", "/api/observers"},
|
||||
{"channels", "/api/channels"},
|
||||
{"channel_messages", "/api/channels/0000000000000000/messages?limit=5"},
|
||||
{"analytics_rf", "/api/analytics/rf?days=7"},
|
||||
{"analytics_topology", "/api/analytics/topology?days=7"},
|
||||
{"analytics_hash_sizes", "/api/analytics/hash-sizes?days=7"},
|
||||
{"analytics_distance", "/api/analytics/distance?days=7"},
|
||||
{"analytics_subpaths", "/api/analytics/subpaths?days=7"},
|
||||
{"bulk_health", "/api/nodes/bulk-health"},
|
||||
{"health", "/api/health"},
|
||||
{"perf", "/api/perf"},
|
||||
}
|
||||
|
||||
for _, ep := range endpoints {
|
||||
t.Run("Parity_"+ep.name, func(t *testing.T) {
|
||||
spec, ok := shapes[ep.name]
|
||||
if !ok {
|
||||
t.Fatalf("no shape spec found for %q in shapes.json", ep.name)
|
||||
}
|
||||
|
||||
req := httptest.NewRequest("GET", ep.path, nil)
|
||||
w := httptest.NewRecorder()
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != 200 {
|
||||
t.Fatalf("GET %s returned %d, expected 200. Body: %s",
|
||||
ep.path, w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var body interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &body); err != nil {
|
||||
t.Fatalf("GET %s returned invalid JSON: %v\nBody: %s",
|
||||
ep.path, err, w.Body.String())
|
||||
}
|
||||
|
||||
mismatches := validateShape(body, spec, ep.path)
|
||||
if len(mismatches) > 0 {
|
||||
t.Errorf("Go %s has %d shape mismatches vs Node.js golden:\n %s",
|
||||
ep.path, len(mismatches), strings.Join(mismatches, "\n "))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestParityNodeDetail tests node detail endpoint shape.
|
||||
// Uses a known test node public key from seeded data.
|
||||
func TestParityNodeDetail(t *testing.T) {
|
||||
shapes := loadShapes(t)
|
||||
_, router := setupTestServer(t)
|
||||
|
||||
spec, ok := shapes["node_detail"]
|
||||
if !ok {
|
||||
t.Fatal("no shape spec for node_detail in shapes.json")
|
||||
}
|
||||
|
||||
req := httptest.NewRequest("GET", "/api/nodes/aabbccdd11223344", nil)
|
||||
w := httptest.NewRecorder()
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != 200 {
|
||||
t.Fatalf("node detail returned %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var body interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &body)
|
||||
|
||||
mismatches := validateShape(body, spec, "/api/nodes/{pubkey}")
|
||||
if len(mismatches) > 0 {
|
||||
t.Errorf("Go node detail has %d shape mismatches vs Node.js golden:\n %s",
|
||||
len(mismatches), strings.Join(mismatches, "\n "))
|
||||
}
|
||||
}
|
||||
|
||||
// TestParityArraysNotNull verifies that array-typed fields in Go responses are
|
||||
// [] (empty array) rather than null. This is a common Go/JSON pitfall where
|
||||
// nil slices marshal as null instead of [].
|
||||
// Uses shapes.json to know which fields SHOULD be arrays.
|
||||
func TestParityArraysNotNull(t *testing.T) {
|
||||
shapes := loadShapes(t)
|
||||
_, router := setupTestServer(t)
|
||||
|
||||
endpoints := []struct {
|
||||
name string
|
||||
path string
|
||||
}{
|
||||
{"stats", "/api/stats"},
|
||||
{"nodes", "/api/nodes?limit=5"},
|
||||
{"packets", "/api/packets?limit=5"},
|
||||
{"packets_grouped", "/api/packets?limit=5&groupByHash=true"},
|
||||
{"observers", "/api/observers"},
|
||||
{"channels", "/api/channels"},
|
||||
{"bulk_health", "/api/nodes/bulk-health"},
|
||||
{"analytics_rf", "/api/analytics/rf?days=7"},
|
||||
{"analytics_topology", "/api/analytics/topology?days=7"},
|
||||
{"analytics_hash_sizes", "/api/analytics/hash-sizes?days=7"},
|
||||
{"analytics_distance", "/api/analytics/distance?days=7"},
|
||||
{"analytics_subpaths", "/api/analytics/subpaths?days=7"},
|
||||
}
|
||||
|
||||
for _, ep := range endpoints {
|
||||
t.Run("NullArrayCheck_"+ep.name, func(t *testing.T) {
|
||||
spec, ok := shapes[ep.name]
|
||||
if !ok {
|
||||
t.Skipf("no shape spec for %s", ep.name)
|
||||
}
|
||||
|
||||
req := httptest.NewRequest("GET", ep.path, nil)
|
||||
w := httptest.NewRecorder()
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != 200 {
|
||||
t.Skipf("GET %s returned %d, skipping null-array check", ep.path, w.Code)
|
||||
}
|
||||
|
||||
var body interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &body)
|
||||
|
||||
nullArrays := findNullArrays(body, spec, ep.path)
|
||||
if len(nullArrays) > 0 {
|
||||
t.Errorf("Go %s has null where [] expected:\n %s\n"+
|
||||
"Go nil slices marshal as null — initialize with make() or literal",
|
||||
ep.path, strings.Join(nullArrays, "\n "))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// findNullArrays walks JSON data alongside a shape spec and returns paths
|
||||
// where the spec says the field should be an array but Go returned null.
|
||||
func findNullArrays(actual interface{}, spec shapeSpec, path string) []string {
|
||||
var nulls []string
|
||||
|
||||
switch spec.Type {
|
||||
case "array":
|
||||
if actual == nil {
|
||||
nulls = append(nulls, fmt.Sprintf("%s: null (should be [])", path))
|
||||
} else if arr, ok := actual.([]interface{}); ok && spec.ElementShape != nil {
|
||||
for i, elem := range arr {
|
||||
nulls = append(nulls, findNullArrays(elem, *spec.ElementShape, fmt.Sprintf("%s[%d]", path, i))...)
|
||||
}
|
||||
}
|
||||
case "object":
|
||||
obj, ok := actual.(map[string]interface{})
|
||||
if !ok || obj == nil {
|
||||
return nulls
|
||||
}
|
||||
if spec.Keys != nil {
|
||||
for key, keySpec := range spec.Keys {
|
||||
if val, exists := obj[key]; exists {
|
||||
nulls = append(nulls, findNullArrays(val, keySpec, path+"."+key)...)
|
||||
} else if keySpec.Type == "array" {
|
||||
// Key missing entirely — also a null-array problem
|
||||
nulls = append(nulls, fmt.Sprintf("%s.%s: missing (should be [])", path, key))
|
||||
}
|
||||
}
|
||||
}
|
||||
if spec.DynamicKeys && spec.ValueShape != nil {
|
||||
for k, v := range obj {
|
||||
nulls = append(nulls, findNullArrays(v, *spec.ValueShape, path+"."+k)...)
|
||||
break // sample one
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nulls
|
||||
}
|
||||
|
||||
// TestParityHealthEngine verifies Go health endpoint declares engine=go
|
||||
// while Node declares engine=node (or omits it). The Go server must always
|
||||
// identify itself.
|
||||
func TestParityHealthEngine(t *testing.T) {
|
||||
_, router := setupTestServer(t)
|
||||
|
||||
req := httptest.NewRequest("GET", "/api/health", nil)
|
||||
w := httptest.NewRecorder()
|
||||
router.ServeHTTP(w, req)
|
||||
|
||||
var body map[string]interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &body)
|
||||
|
||||
engine, ok := body["engine"]
|
||||
if !ok {
|
||||
t.Error("health response missing 'engine' field (Go server must include engine=go)")
|
||||
} else if engine != "go" {
|
||||
t.Errorf("health engine=%v, expected 'go'", engine)
|
||||
}
|
||||
}
|
||||
|
||||
// TestValidateShapeFunction directly tests the shape validator itself.
|
||||
func TestValidateShapeFunction(t *testing.T) {
|
||||
t.Run("string match", func(t *testing.T) {
|
||||
errs := validateShape("hello", shapeSpec{Type: "string"}, "$.x")
|
||||
if len(errs) != 0 {
|
||||
t.Errorf("unexpected errors: %v", errs)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("string mismatch", func(t *testing.T) {
|
||||
errs := validateShape(42.0, shapeSpec{Type: "string"}, "$.x")
|
||||
if len(errs) != 1 {
|
||||
t.Errorf("expected 1 error, got %d: %v", len(errs), errs)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("null array rejected", func(t *testing.T) {
|
||||
errs := validateShape(nil, shapeSpec{Type: "array"}, "$.arr")
|
||||
if len(errs) != 1 || !strings.Contains(errs[0], "null") {
|
||||
t.Errorf("expected null-array error, got: %v", errs)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("empty array OK", func(t *testing.T) {
|
||||
errs := validateShape([]interface{}{}, shapeSpec{Type: "array"}, "$.arr")
|
||||
if len(errs) != 0 {
|
||||
t.Errorf("unexpected errors for empty array: %v", errs)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("missing object key", func(t *testing.T) {
|
||||
spec := shapeSpec{Type: "object", Keys: map[string]shapeSpec{
|
||||
"name": {Type: "string"},
|
||||
"age": {Type: "number"},
|
||||
}}
|
||||
obj := map[string]interface{}{"name": "test"}
|
||||
errs := validateShape(obj, spec, "$.user")
|
||||
if len(errs) != 1 || !strings.Contains(errs[0], "age") {
|
||||
t.Errorf("expected missing age error, got: %v", errs)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("nullable allows null", func(t *testing.T) {
|
||||
errs := validateShape(nil, shapeSpec{Type: "nullable"}, "$.x")
|
||||
if len(errs) != 0 {
|
||||
t.Errorf("nullable should accept null: %v", errs)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("dynamic keys validates value shape", func(t *testing.T) {
|
||||
spec := shapeSpec{
|
||||
Type: "object",
|
||||
DynamicKeys: true,
|
||||
ValueShape: &shapeSpec{Type: "number"},
|
||||
}
|
||||
obj := map[string]interface{}{"a": 1.0, "b": 2.0}
|
||||
errs := validateShape(obj, spec, "$.dyn")
|
||||
if len(errs) != 0 {
|
||||
t.Errorf("unexpected errors: %v", errs)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1039,7 +1039,7 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
|
||||
}
|
||||
}
|
||||
|
||||
// Build broadcast maps (same shape as Node.js WS broadcast), one per observation.
|
||||
// Build broadcast maps (same shape as Node.js WS broadcast)
|
||||
result := make([]map[string]interface{}, 0, len(broadcastOrder))
|
||||
for _, txID := range broadcastOrder {
|
||||
tx := broadcastTxs[txID]
|
||||
@@ -1055,34 +1055,32 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
|
||||
decoded["payload"] = payload
|
||||
}
|
||||
}
|
||||
for _, obs := range tx.Observations {
|
||||
// Build the nested packet object (packets.js checks m.data.packet)
|
||||
pkt := map[string]interface{}{
|
||||
"id": tx.ID,
|
||||
"raw_hex": strOrNil(tx.RawHex),
|
||||
"hash": strOrNil(tx.Hash),
|
||||
"first_seen": strOrNil(tx.FirstSeen),
|
||||
"timestamp": strOrNil(tx.FirstSeen),
|
||||
"route_type": intPtrOrNil(tx.RouteType),
|
||||
"payload_type": intPtrOrNil(tx.PayloadType),
|
||||
"decoded_json": strOrNil(tx.DecodedJSON),
|
||||
"observer_id": strOrNil(obs.ObserverID),
|
||||
"observer_name": strOrNil(obs.ObserverName),
|
||||
"snr": floatPtrOrNil(obs.SNR),
|
||||
"rssi": floatPtrOrNil(obs.RSSI),
|
||||
"path_json": strOrNil(obs.PathJSON),
|
||||
"direction": strOrNil(obs.Direction),
|
||||
"observation_count": tx.ObservationCount,
|
||||
}
|
||||
// Broadcast map: top-level fields for live.js + nested packet for packets.js
|
||||
broadcastMap := make(map[string]interface{}, len(pkt)+2)
|
||||
for k, v := range pkt {
|
||||
broadcastMap[k] = v
|
||||
}
|
||||
broadcastMap["decoded"] = decoded
|
||||
broadcastMap["packet"] = pkt
|
||||
result = append(result, broadcastMap)
|
||||
// Build the nested packet object (packets.js checks m.data.packet)
|
||||
pkt := map[string]interface{}{
|
||||
"id": tx.ID,
|
||||
"raw_hex": strOrNil(tx.RawHex),
|
||||
"hash": strOrNil(tx.Hash),
|
||||
"first_seen": strOrNil(tx.FirstSeen),
|
||||
"timestamp": strOrNil(tx.FirstSeen),
|
||||
"route_type": intPtrOrNil(tx.RouteType),
|
||||
"payload_type": intPtrOrNil(tx.PayloadType),
|
||||
"decoded_json": strOrNil(tx.DecodedJSON),
|
||||
"observer_id": strOrNil(tx.ObserverID),
|
||||
"observer_name": strOrNil(tx.ObserverName),
|
||||
"snr": floatPtrOrNil(tx.SNR),
|
||||
"rssi": floatPtrOrNil(tx.RSSI),
|
||||
"path_json": strOrNil(tx.PathJSON),
|
||||
"direction": strOrNil(tx.Direction),
|
||||
"observation_count": tx.ObservationCount,
|
||||
}
|
||||
// Broadcast map: top-level fields for live.js + nested packet for packets.js
|
||||
broadcastMap := make(map[string]interface{}, len(pkt)+2)
|
||||
for k, v := range pkt {
|
||||
broadcastMap[k] = v
|
||||
}
|
||||
broadcastMap["decoded"] = decoded
|
||||
broadcastMap["packet"] = pkt
|
||||
result = append(result, broadcastMap)
|
||||
}
|
||||
|
||||
// Invalidate analytics caches since new data was ingested
|
||||
@@ -1103,7 +1101,7 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
|
||||
// IngestNewObservations loads new observations for transmissions already in the
|
||||
// store. This catches observations that arrive after IngestNewFromDB has already
|
||||
// advanced past the transmission's ID (fixes #174).
|
||||
func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]interface{} {
|
||||
func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) int {
|
||||
if limit <= 0 {
|
||||
limit = 500
|
||||
}
|
||||
@@ -1129,7 +1127,7 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]
|
||||
rows, err := s.db.conn.Query(querySQL, sinceObsID, limit)
|
||||
if err != nil {
|
||||
log.Printf("[store] ingest observations query error: %v", err)
|
||||
return nil
|
||||
return sinceObsID
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
@@ -1172,16 +1170,20 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]
|
||||
}
|
||||
|
||||
if len(obsRows) == 0 {
|
||||
return nil
|
||||
return sinceObsID
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
newMaxObsID := sinceObsID
|
||||
updatedTxs := make(map[int]*StoreTx)
|
||||
broadcastMaps := make([]map[string]interface{}, 0, len(obsRows))
|
||||
|
||||
for _, r := range obsRows {
|
||||
if r.obsID > newMaxObsID {
|
||||
newMaxObsID = r.obsID
|
||||
}
|
||||
|
||||
// Already ingested (e.g. by IngestNewFromDB in same cycle)
|
||||
if _, exists := s.byObsID[r.obsID]; exists {
|
||||
continue
|
||||
@@ -1224,43 +1226,6 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]
|
||||
}
|
||||
s.totalObs++
|
||||
updatedTxs[r.txID] = tx
|
||||
|
||||
decoded := map[string]interface{}{
|
||||
"header": map[string]interface{}{
|
||||
"payloadTypeName": resolvePayloadTypeName(tx.PayloadType),
|
||||
},
|
||||
}
|
||||
if tx.DecodedJSON != "" {
|
||||
var payload map[string]interface{}
|
||||
if json.Unmarshal([]byte(tx.DecodedJSON), &payload) == nil {
|
||||
decoded["payload"] = payload
|
||||
}
|
||||
}
|
||||
|
||||
pkt := map[string]interface{}{
|
||||
"id": tx.ID,
|
||||
"raw_hex": strOrNil(tx.RawHex),
|
||||
"hash": strOrNil(tx.Hash),
|
||||
"first_seen": strOrNil(tx.FirstSeen),
|
||||
"timestamp": strOrNil(tx.FirstSeen),
|
||||
"route_type": intPtrOrNil(tx.RouteType),
|
||||
"payload_type": intPtrOrNil(tx.PayloadType),
|
||||
"decoded_json": strOrNil(tx.DecodedJSON),
|
||||
"observer_id": strOrNil(obs.ObserverID),
|
||||
"observer_name": strOrNil(obs.ObserverName),
|
||||
"snr": floatPtrOrNil(obs.SNR),
|
||||
"rssi": floatPtrOrNil(obs.RSSI),
|
||||
"path_json": strOrNil(obs.PathJSON),
|
||||
"direction": strOrNil(obs.Direction),
|
||||
"observation_count": tx.ObservationCount,
|
||||
}
|
||||
broadcastMap := make(map[string]interface{}, len(pkt)+2)
|
||||
for k, v := range pkt {
|
||||
broadcastMap[k] = v
|
||||
}
|
||||
broadcastMap["decoded"] = decoded
|
||||
broadcastMap["packet"] = pkt
|
||||
broadcastMaps = append(broadcastMaps, broadcastMap)
|
||||
}
|
||||
|
||||
// Re-pick best observation for updated transmissions and update subpath index
|
||||
@@ -1315,7 +1280,7 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]
|
||||
// analytics caches cleared; no per-cycle log to avoid stdout overhead
|
||||
}
|
||||
|
||||
return broadcastMaps
|
||||
return newMaxObsID
|
||||
}
|
||||
|
||||
// MaxTransmissionID returns the highest transmission ID in the store.
|
||||
|
||||
@@ -1,245 +1,229 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
var upgrader = websocket.Upgrader{
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 4096,
|
||||
CheckOrigin: func(r *http.Request) bool { return true },
|
||||
}
|
||||
|
||||
// Hub manages WebSocket clients and broadcasts.
|
||||
type Hub struct {
|
||||
mu sync.RWMutex
|
||||
clients map[*Client]bool
|
||||
}
|
||||
|
||||
// Client is a single WebSocket connection.
|
||||
type Client struct {
|
||||
conn *websocket.Conn
|
||||
send chan []byte
|
||||
}
|
||||
|
||||
func NewHub() *Hub {
|
||||
return &Hub{
|
||||
clients: make(map[*Client]bool),
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Hub) ClientCount() int {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
return len(h.clients)
|
||||
}
|
||||
|
||||
func (h *Hub) Register(c *Client) {
|
||||
h.mu.Lock()
|
||||
h.clients[c] = true
|
||||
h.mu.Unlock()
|
||||
log.Printf("[ws] client connected (%d total)", h.ClientCount())
|
||||
}
|
||||
|
||||
func (h *Hub) Unregister(c *Client) {
|
||||
h.mu.Lock()
|
||||
if _, ok := h.clients[c]; ok {
|
||||
delete(h.clients, c)
|
||||
close(c.send)
|
||||
}
|
||||
h.mu.Unlock()
|
||||
log.Printf("[ws] client disconnected (%d total)", h.ClientCount())
|
||||
}
|
||||
|
||||
// Broadcast sends a message to all connected clients.
|
||||
func (h *Hub) Broadcast(msg interface{}) {
|
||||
data, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
log.Printf("[ws] marshal error: %v", err)
|
||||
return
|
||||
}
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
for c := range h.clients {
|
||||
select {
|
||||
case c.send <- data:
|
||||
default:
|
||||
// Client buffer full — drop
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ServeWS handles the WebSocket upgrade and runs the client.
|
||||
func (h *Hub) ServeWS(w http.ResponseWriter, r *http.Request) {
|
||||
conn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
log.Printf("[ws] upgrade error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
client := &Client{
|
||||
conn: conn,
|
||||
send: make(chan []byte, 256),
|
||||
}
|
||||
h.Register(client)
|
||||
|
||||
go client.writePump()
|
||||
go client.readPump(h)
|
||||
}
|
||||
|
||||
// wsOrStatic upgrades WebSocket requests at any path, serves static files otherwise.
|
||||
func wsOrStatic(hub *Hub, static http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if strings.EqualFold(r.Header.Get("Upgrade"), "websocket") {
|
||||
hub.ServeWS(w, r)
|
||||
return
|
||||
}
|
||||
static.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *Client) readPump(hub *Hub) {
|
||||
defer func() {
|
||||
hub.Unregister(c)
|
||||
c.conn.Close()
|
||||
}()
|
||||
c.conn.SetReadLimit(512)
|
||||
c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
c.conn.SetPongHandler(func(string) error {
|
||||
c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
return nil
|
||||
})
|
||||
for {
|
||||
_, _, err := c.conn.ReadMessage()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) writePump() {
|
||||
ticker := time.NewTicker(30 * time.Second)
|
||||
defer func() {
|
||||
ticker.Stop()
|
||||
c.conn.Close()
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case message, ok := <-c.send:
|
||||
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||
if !ok {
|
||||
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
|
||||
return
|
||||
}
|
||||
if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil {
|
||||
return
|
||||
}
|
||||
case <-ticker.C:
|
||||
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Poller watches for new transmissions in SQLite and broadcasts them.
|
||||
type Poller struct {
|
||||
db *DB
|
||||
hub *Hub
|
||||
store *PacketStore // optional: if set, new transmissions are ingested into memory
|
||||
interval time.Duration
|
||||
stop chan struct{}
|
||||
}
|
||||
|
||||
func NewPoller(db *DB, hub *Hub, interval time.Duration) *Poller {
|
||||
return &Poller{db: db, hub: hub, interval: interval, stop: make(chan struct{})}
|
||||
}
|
||||
|
||||
func (p *Poller) Start() {
|
||||
lastID := p.db.GetMaxTransmissionID()
|
||||
lastObsID := p.db.GetMaxObservationID()
|
||||
log.Printf("[poller] starting from transmission ID %d, obs ID %d, interval %v", lastID, lastObsID, p.interval)
|
||||
|
||||
ticker := time.NewTicker(p.interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if p.store != nil {
|
||||
// Ingest new transmissions into in-memory store and broadcast
|
||||
newTxs, newMax := p.store.IngestNewFromDB(lastID, 100)
|
||||
if newMax > lastID {
|
||||
lastID = newMax
|
||||
}
|
||||
// Ingest new observations for existing transmissions (fixes #174)
|
||||
nextObsID := lastObsID
|
||||
if err := p.db.conn.QueryRow(`
|
||||
SELECT COALESCE(MAX(id), ?) FROM (
|
||||
SELECT id FROM observations
|
||||
WHERE id > ?
|
||||
ORDER BY id ASC
|
||||
LIMIT 500
|
||||
)`, lastObsID, lastObsID).Scan(&nextObsID); err != nil {
|
||||
nextObsID = lastObsID
|
||||
}
|
||||
newObs := p.store.IngestNewObservations(lastObsID, 500)
|
||||
if nextObsID > lastObsID {
|
||||
lastObsID = nextObsID
|
||||
}
|
||||
if len(newTxs) > 0 {
|
||||
log.Printf("[broadcast] sending %d packets to %d clients (lastID now %d)", len(newTxs), p.hub.ClientCount(), lastID)
|
||||
}
|
||||
for _, tx := range newTxs {
|
||||
p.hub.Broadcast(WSMessage{
|
||||
Type: "packet",
|
||||
Data: tx,
|
||||
})
|
||||
}
|
||||
for _, obs := range newObs {
|
||||
p.hub.Broadcast(WSMessage{
|
||||
Type: "packet",
|
||||
Data: obs,
|
||||
})
|
||||
}
|
||||
} else {
|
||||
// Fallback: direct DB query (used when store is nil, e.g. tests)
|
||||
newTxs, err := p.db.GetNewTransmissionsSince(lastID, 100)
|
||||
if err != nil {
|
||||
log.Printf("[poller] error: %v", err)
|
||||
continue
|
||||
}
|
||||
for _, tx := range newTxs {
|
||||
id, _ := tx["id"].(int)
|
||||
if id > lastID {
|
||||
lastID = id
|
||||
}
|
||||
// Copy packet fields for the nested packet (avoids circular ref)
|
||||
pkt := make(map[string]interface{}, len(tx))
|
||||
for k, v := range tx {
|
||||
pkt[k] = v
|
||||
}
|
||||
tx["packet"] = pkt
|
||||
p.hub.Broadcast(WSMessage{
|
||||
Type: "packet",
|
||||
Data: tx,
|
||||
})
|
||||
}
|
||||
}
|
||||
case <-p.stop:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Poller) Stop() {
|
||||
close(p.stop)
|
||||
}
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
var upgrader = websocket.Upgrader{
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 4096,
|
||||
CheckOrigin: func(r *http.Request) bool { return true },
|
||||
}
|
||||
|
||||
// Hub manages WebSocket clients and broadcasts.
|
||||
type Hub struct {
|
||||
mu sync.RWMutex
|
||||
clients map[*Client]bool
|
||||
}
|
||||
|
||||
// Client is a single WebSocket connection.
|
||||
type Client struct {
|
||||
conn *websocket.Conn
|
||||
send chan []byte
|
||||
}
|
||||
|
||||
func NewHub() *Hub {
|
||||
return &Hub{
|
||||
clients: make(map[*Client]bool),
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Hub) ClientCount() int {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
return len(h.clients)
|
||||
}
|
||||
|
||||
func (h *Hub) Register(c *Client) {
|
||||
h.mu.Lock()
|
||||
h.clients[c] = true
|
||||
h.mu.Unlock()
|
||||
log.Printf("[ws] client connected (%d total)", h.ClientCount())
|
||||
}
|
||||
|
||||
func (h *Hub) Unregister(c *Client) {
|
||||
h.mu.Lock()
|
||||
if _, ok := h.clients[c]; ok {
|
||||
delete(h.clients, c)
|
||||
close(c.send)
|
||||
}
|
||||
h.mu.Unlock()
|
||||
log.Printf("[ws] client disconnected (%d total)", h.ClientCount())
|
||||
}
|
||||
|
||||
// Broadcast sends a message to all connected clients.
|
||||
func (h *Hub) Broadcast(msg interface{}) {
|
||||
data, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
log.Printf("[ws] marshal error: %v", err)
|
||||
return
|
||||
}
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
for c := range h.clients {
|
||||
select {
|
||||
case c.send <- data:
|
||||
default:
|
||||
// Client buffer full — drop
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ServeWS handles the WebSocket upgrade and runs the client.
|
||||
func (h *Hub) ServeWS(w http.ResponseWriter, r *http.Request) {
|
||||
conn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
log.Printf("[ws] upgrade error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
client := &Client{
|
||||
conn: conn,
|
||||
send: make(chan []byte, 256),
|
||||
}
|
||||
h.Register(client)
|
||||
|
||||
go client.writePump()
|
||||
go client.readPump(h)
|
||||
}
|
||||
|
||||
// wsOrStatic upgrades WebSocket requests at any path, serves static files otherwise.
|
||||
func wsOrStatic(hub *Hub, static http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if strings.EqualFold(r.Header.Get("Upgrade"), "websocket") {
|
||||
hub.ServeWS(w, r)
|
||||
return
|
||||
}
|
||||
static.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *Client) readPump(hub *Hub) {
|
||||
defer func() {
|
||||
hub.Unregister(c)
|
||||
c.conn.Close()
|
||||
}()
|
||||
c.conn.SetReadLimit(512)
|
||||
c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
c.conn.SetPongHandler(func(string) error {
|
||||
c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
return nil
|
||||
})
|
||||
for {
|
||||
_, _, err := c.conn.ReadMessage()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) writePump() {
|
||||
ticker := time.NewTicker(30 * time.Second)
|
||||
defer func() {
|
||||
ticker.Stop()
|
||||
c.conn.Close()
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case message, ok := <-c.send:
|
||||
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||
if !ok {
|
||||
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
|
||||
return
|
||||
}
|
||||
if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil {
|
||||
return
|
||||
}
|
||||
case <-ticker.C:
|
||||
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Poller watches for new transmissions in SQLite and broadcasts them.
|
||||
type Poller struct {
|
||||
db *DB
|
||||
hub *Hub
|
||||
store *PacketStore // optional: if set, new transmissions are ingested into memory
|
||||
interval time.Duration
|
||||
stop chan struct{}
|
||||
}
|
||||
|
||||
func NewPoller(db *DB, hub *Hub, interval time.Duration) *Poller {
|
||||
return &Poller{db: db, hub: hub, interval: interval, stop: make(chan struct{})}
|
||||
}
|
||||
|
||||
func (p *Poller) Start() {
|
||||
lastID := p.db.GetMaxTransmissionID()
|
||||
lastObsID := p.db.GetMaxObservationID()
|
||||
log.Printf("[poller] starting from transmission ID %d, obs ID %d, interval %v", lastID, lastObsID, p.interval)
|
||||
|
||||
ticker := time.NewTicker(p.interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if p.store != nil {
|
||||
// Ingest new transmissions into in-memory store and broadcast
|
||||
newTxs, newMax := p.store.IngestNewFromDB(lastID, 100)
|
||||
if newMax > lastID {
|
||||
lastID = newMax
|
||||
}
|
||||
// Ingest new observations for existing transmissions (fixes #174)
|
||||
newObsMax := p.store.IngestNewObservations(lastObsID, 500)
|
||||
if newObsMax > lastObsID {
|
||||
lastObsID = newObsMax
|
||||
}
|
||||
if len(newTxs) > 0 {
|
||||
log.Printf("[broadcast] sending %d packets to %d clients (lastID now %d)", len(newTxs), p.hub.ClientCount(), lastID)
|
||||
}
|
||||
for _, tx := range newTxs {
|
||||
p.hub.Broadcast(WSMessage{
|
||||
Type: "packet",
|
||||
Data: tx,
|
||||
})
|
||||
}
|
||||
} else {
|
||||
// Fallback: direct DB query (used when store is nil, e.g. tests)
|
||||
newTxs, err := p.db.GetNewTransmissionsSince(lastID, 100)
|
||||
if err != nil {
|
||||
log.Printf("[poller] error: %v", err)
|
||||
continue
|
||||
}
|
||||
for _, tx := range newTxs {
|
||||
id, _ := tx["id"].(int)
|
||||
if id > lastID {
|
||||
lastID = id
|
||||
}
|
||||
// Copy packet fields for the nested packet (avoids circular ref)
|
||||
pkt := make(map[string]interface{}, len(tx))
|
||||
for k, v := range tx {
|
||||
pkt[k] = v
|
||||
}
|
||||
tx["packet"] = pkt
|
||||
p.hub.Broadcast(WSMessage{
|
||||
Type: "packet",
|
||||
Data: tx,
|
||||
})
|
||||
}
|
||||
}
|
||||
case <-p.stop:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Poller) Stop() {
|
||||
close(p.stop)
|
||||
}
|
||||
|
||||
@@ -1,415 +1,275 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
func TestHubBroadcast(t *testing.T) {
|
||||
hub := NewHub()
|
||||
|
||||
if hub.ClientCount() != 0 {
|
||||
t.Errorf("expected 0 clients, got %d", hub.ClientCount())
|
||||
}
|
||||
|
||||
// Create a test server with WebSocket endpoint
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
hub.ServeWS(w, r)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
// Connect a WebSocket client
|
||||
wsURL := "ws" + srv.URL[4:] // replace http with ws
|
||||
conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("dial error: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
// Wait for registration
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
if hub.ClientCount() != 1 {
|
||||
t.Errorf("expected 1 client, got %d", hub.ClientCount())
|
||||
}
|
||||
|
||||
// Broadcast a message
|
||||
hub.Broadcast(map[string]interface{}{
|
||||
"type": "packet",
|
||||
"data": map[string]interface{}{"id": 1, "hash": "test123"},
|
||||
})
|
||||
|
||||
// Read the message
|
||||
conn.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
_, msg, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
t.Fatalf("read error: %v", err)
|
||||
}
|
||||
if len(msg) == 0 {
|
||||
t.Error("expected non-empty message")
|
||||
}
|
||||
|
||||
// Disconnect
|
||||
conn.Close()
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
func TestPollerCreation(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
defer db.Close()
|
||||
seedTestData(t, db)
|
||||
hub := NewHub()
|
||||
|
||||
poller := NewPoller(db, hub, 100*time.Millisecond)
|
||||
if poller == nil {
|
||||
t.Fatal("expected poller")
|
||||
}
|
||||
|
||||
// Start and stop
|
||||
go poller.Start()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
poller.Stop()
|
||||
}
|
||||
|
||||
func TestHubMultipleClients(t *testing.T) {
|
||||
hub := NewHub()
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
hub.ServeWS(w, r)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
wsURL := "ws" + srv.URL[4:]
|
||||
|
||||
// Connect two clients
|
||||
conn1, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("dial error: %v", err)
|
||||
}
|
||||
defer conn1.Close()
|
||||
|
||||
conn2, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("dial error: %v", err)
|
||||
}
|
||||
defer conn2.Close()
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
if hub.ClientCount() != 2 {
|
||||
t.Errorf("expected 2 clients, got %d", hub.ClientCount())
|
||||
}
|
||||
|
||||
// Broadcast and both should receive
|
||||
hub.Broadcast(map[string]interface{}{"type": "test", "data": "hello"})
|
||||
|
||||
conn1.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
_, msg1, err := conn1.ReadMessage()
|
||||
if err != nil {
|
||||
t.Fatalf("conn1 read error: %v", err)
|
||||
}
|
||||
if len(msg1) == 0 {
|
||||
t.Error("expected non-empty message on conn1")
|
||||
}
|
||||
|
||||
conn2.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
_, msg2, err := conn2.ReadMessage()
|
||||
if err != nil {
|
||||
t.Fatalf("conn2 read error: %v", err)
|
||||
}
|
||||
if len(msg2) == 0 {
|
||||
t.Error("expected non-empty message on conn2")
|
||||
}
|
||||
|
||||
// Disconnect one
|
||||
conn1.Close()
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Remaining client should still work
|
||||
hub.Broadcast(map[string]interface{}{"type": "test2"})
|
||||
|
||||
conn2.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
_, msg3, err := conn2.ReadMessage()
|
||||
if err != nil {
|
||||
t.Fatalf("conn2 read error after disconnect: %v", err)
|
||||
}
|
||||
if len(msg3) == 0 {
|
||||
t.Error("expected non-empty message")
|
||||
}
|
||||
}
|
||||
|
||||
func TestBroadcastFullBuffer(t *testing.T) {
|
||||
hub := NewHub()
|
||||
|
||||
// Create a client with tiny buffer (1)
|
||||
client := &Client{
|
||||
send: make(chan []byte, 1),
|
||||
}
|
||||
hub.mu.Lock()
|
||||
hub.clients[client] = true
|
||||
hub.mu.Unlock()
|
||||
|
||||
// Fill the buffer
|
||||
client.send <- []byte("first")
|
||||
|
||||
// This broadcast should drop the message (buffer full)
|
||||
hub.Broadcast(map[string]interface{}{"type": "dropped"})
|
||||
|
||||
// Channel should still only have the first message
|
||||
select {
|
||||
case msg := <-client.send:
|
||||
if string(msg) != "first" {
|
||||
t.Errorf("expected 'first', got %s", string(msg))
|
||||
}
|
||||
default:
|
||||
t.Error("expected message in channel")
|
||||
}
|
||||
|
||||
// Clean up
|
||||
hub.mu.Lock()
|
||||
delete(hub.clients, client)
|
||||
hub.mu.Unlock()
|
||||
}
|
||||
|
||||
func TestBroadcastMarshalError(t *testing.T) {
|
||||
hub := NewHub()
|
||||
|
||||
// Marshal error: functions can't be marshaled to JSON
|
||||
hub.Broadcast(map[string]interface{}{"bad": func() {}})
|
||||
// Should not panic — just log and return
|
||||
}
|
||||
|
||||
func TestPollerBroadcastsNewData(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
defer db.Close()
|
||||
seedTestData(t, db)
|
||||
hub := NewHub()
|
||||
|
||||
// Create a client to receive broadcasts
|
||||
client := &Client{
|
||||
send: make(chan []byte, 256),
|
||||
}
|
||||
hub.mu.Lock()
|
||||
hub.clients[client] = true
|
||||
hub.mu.Unlock()
|
||||
|
||||
poller := NewPoller(db, hub, 50*time.Millisecond)
|
||||
go poller.Start()
|
||||
|
||||
// Insert new data to trigger broadcast
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type)
|
||||
VALUES ('EEFF', 'newhash123456789', '2026-01-16T10:00:00Z', 1, 4)`)
|
||||
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
poller.Stop()
|
||||
|
||||
// Check if client received broadcast with packet field (fixes #162)
|
||||
select {
|
||||
case msg := <-client.send:
|
||||
if len(msg) == 0 {
|
||||
t.Error("expected non-empty broadcast message")
|
||||
}
|
||||
var parsed map[string]interface{}
|
||||
if err := json.Unmarshal(msg, &parsed); err != nil {
|
||||
t.Fatalf("failed to parse broadcast: %v", err)
|
||||
}
|
||||
if parsed["type"] != "packet" {
|
||||
t.Errorf("expected type=packet, got %v", parsed["type"])
|
||||
}
|
||||
data, ok := parsed["data"].(map[string]interface{})
|
||||
if !ok {
|
||||
t.Fatal("expected data to be an object")
|
||||
}
|
||||
// packets.js filters on m.data.packet — must exist
|
||||
pkt, ok := data["packet"]
|
||||
if !ok || pkt == nil {
|
||||
t.Error("expected data.packet to exist (required by packets.js WS handler)")
|
||||
}
|
||||
pktMap, ok := pkt.(map[string]interface{})
|
||||
if !ok {
|
||||
t.Fatal("expected data.packet to be an object")
|
||||
}
|
||||
// Verify key fields exist in nested packet (timestamp required by packets.js)
|
||||
for _, field := range []string{"id", "hash", "payload_type", "timestamp"} {
|
||||
if _, exists := pktMap[field]; !exists {
|
||||
t.Errorf("expected data.packet.%s to exist", field)
|
||||
}
|
||||
}
|
||||
default:
|
||||
// Might not have received due to timing
|
||||
}
|
||||
|
||||
// Clean up
|
||||
hub.mu.Lock()
|
||||
delete(hub.clients, client)
|
||||
hub.mu.Unlock()
|
||||
}
|
||||
|
||||
func TestPollerBroadcastsMultipleObservations(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
defer db.Close()
|
||||
seedTestData(t, db)
|
||||
hub := NewHub()
|
||||
|
||||
client := &Client{
|
||||
send: make(chan []byte, 256),
|
||||
}
|
||||
hub.mu.Lock()
|
||||
hub.clients[client] = true
|
||||
hub.mu.Unlock()
|
||||
defer func() {
|
||||
hub.mu.Lock()
|
||||
delete(hub.clients, client)
|
||||
hub.mu.Unlock()
|
||||
}()
|
||||
|
||||
poller := NewPoller(db, hub, 50*time.Millisecond)
|
||||
store := NewPacketStore(db)
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store load failed: %v", err)
|
||||
}
|
||||
poller.store = store
|
||||
go poller.Start()
|
||||
defer poller.Stop()
|
||||
|
||||
// Wait for poller to initialize its lastID/lastObsID cursors before
|
||||
// inserting new data; otherwise the poller may snapshot a lastID that
|
||||
// already includes the test data and never broadcast it.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
now := time.Now().UTC().Format(time.RFC3339)
|
||||
if _, err := db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
|
||||
VALUES ('FACE', 'starbursthash237a', ?, 1, 4, '{"pubKey":"aabbccdd11223344","type":"ADVERT"}')`, now); err != nil {
|
||||
t.Fatalf("insert tx failed: %v", err)
|
||||
}
|
||||
var txID int
|
||||
if err := db.conn.QueryRow(`SELECT id FROM transmissions WHERE hash='starbursthash237a'`).Scan(&txID); err != nil {
|
||||
t.Fatalf("query tx id failed: %v", err)
|
||||
}
|
||||
ts := time.Now().Unix()
|
||||
if _, err := db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
|
||||
VALUES (?, 1, 14.0, -82, '["aa"]', ?),
|
||||
(?, 2, 10.5, -90, '["aa","bb"]', ?),
|
||||
(?, 1, 7.0, -96, '["aa","bb","cc"]', ?)`,
|
||||
txID, ts, txID, ts+1, txID, ts+2); err != nil {
|
||||
t.Fatalf("insert observations failed: %v", err)
|
||||
}
|
||||
|
||||
deadline := time.After(2 * time.Second)
|
||||
var dataMsgs []map[string]interface{}
|
||||
for len(dataMsgs) < 3 {
|
||||
select {
|
||||
case raw := <-client.send:
|
||||
var parsed map[string]interface{}
|
||||
if err := json.Unmarshal(raw, &parsed); err != nil {
|
||||
t.Fatalf("unmarshal ws msg failed: %v", err)
|
||||
}
|
||||
if parsed["type"] != "packet" {
|
||||
continue
|
||||
}
|
||||
data, ok := parsed["data"].(map[string]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if data["hash"] == "starbursthash237a" {
|
||||
dataMsgs = append(dataMsgs, data)
|
||||
}
|
||||
case <-deadline:
|
||||
t.Fatalf("timed out waiting for 3 observation broadcasts, got %d", len(dataMsgs))
|
||||
}
|
||||
}
|
||||
|
||||
if len(dataMsgs) != 3 {
|
||||
t.Fatalf("expected 3 messages, got %d", len(dataMsgs))
|
||||
}
|
||||
|
||||
paths := make([]string, 0, 3)
|
||||
observers := make(map[string]bool)
|
||||
for _, m := range dataMsgs {
|
||||
hash, _ := m["hash"].(string)
|
||||
if hash != "starbursthash237a" {
|
||||
t.Fatalf("unexpected hash %q", hash)
|
||||
}
|
||||
p, _ := m["path_json"].(string)
|
||||
paths = append(paths, p)
|
||||
if oid, ok := m["observer_id"].(string); ok && oid != "" {
|
||||
observers[oid] = true
|
||||
}
|
||||
}
|
||||
sort.Strings(paths)
|
||||
wantPaths := []string{`["aa","bb","cc"]`, `["aa","bb"]`, `["aa"]`}
|
||||
sort.Strings(wantPaths)
|
||||
for i := range wantPaths {
|
||||
if paths[i] != wantPaths[i] {
|
||||
t.Fatalf("path mismatch at %d: got %q want %q", i, paths[i], wantPaths[i])
|
||||
}
|
||||
}
|
||||
if len(observers) < 2 {
|
||||
t.Fatalf("expected observations from >=2 observers, got %d", len(observers))
|
||||
}
|
||||
}
|
||||
|
||||
func TestIngestNewObservationsBroadcast(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
defer db.Close()
|
||||
seedTestData(t, db)
|
||||
store := NewPacketStore(db)
|
||||
if err := store.Load(); err != nil {
|
||||
t.Fatalf("store load failed: %v", err)
|
||||
}
|
||||
|
||||
maxObs := db.GetMaxObservationID()
|
||||
now := time.Now().Unix()
|
||||
if _, err := db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
|
||||
VALUES (1, 2, 6.0, -100, '["aa","zz"]', ?),
|
||||
(1, 1, 5.0, -101, '["aa","yy"]', ?)`, now, now+1); err != nil {
|
||||
t.Fatalf("insert new observations failed: %v", err)
|
||||
}
|
||||
|
||||
maps := store.IngestNewObservations(maxObs, 500)
|
||||
if len(maps) != 2 {
|
||||
t.Fatalf("expected 2 broadcast maps, got %d", len(maps))
|
||||
}
|
||||
for _, m := range maps {
|
||||
if m["hash"] != "abc123def4567890" {
|
||||
t.Fatalf("unexpected hash in map: %v", m["hash"])
|
||||
}
|
||||
path, ok := m["path_json"].(string)
|
||||
if !ok || path == "" {
|
||||
t.Fatalf("missing path_json in map: %#v", m)
|
||||
}
|
||||
if _, ok := m["observer_id"]; !ok {
|
||||
t.Fatalf("missing observer_id in map: %#v", m)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestHubRegisterUnregister(t *testing.T) {
|
||||
hub := NewHub()
|
||||
|
||||
client := &Client{
|
||||
send: make(chan []byte, 256),
|
||||
}
|
||||
|
||||
hub.Register(client)
|
||||
if hub.ClientCount() != 1 {
|
||||
t.Errorf("expected 1 client after register, got %d", hub.ClientCount())
|
||||
}
|
||||
|
||||
hub.Unregister(client)
|
||||
if hub.ClientCount() != 0 {
|
||||
t.Errorf("expected 0 clients after unregister, got %d", hub.ClientCount())
|
||||
}
|
||||
|
||||
// Unregister again should be safe
|
||||
hub.Unregister(client)
|
||||
if hub.ClientCount() != 0 {
|
||||
t.Errorf("expected 0 clients, got %d", hub.ClientCount())
|
||||
}
|
||||
}
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
func TestHubBroadcast(t *testing.T) {
|
||||
hub := NewHub()
|
||||
|
||||
if hub.ClientCount() != 0 {
|
||||
t.Errorf("expected 0 clients, got %d", hub.ClientCount())
|
||||
}
|
||||
|
||||
// Create a test server with WebSocket endpoint
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
hub.ServeWS(w, r)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
// Connect a WebSocket client
|
||||
wsURL := "ws" + srv.URL[4:] // replace http with ws
|
||||
conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("dial error: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
// Wait for registration
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
if hub.ClientCount() != 1 {
|
||||
t.Errorf("expected 1 client, got %d", hub.ClientCount())
|
||||
}
|
||||
|
||||
// Broadcast a message
|
||||
hub.Broadcast(map[string]interface{}{
|
||||
"type": "packet",
|
||||
"data": map[string]interface{}{"id": 1, "hash": "test123"},
|
||||
})
|
||||
|
||||
// Read the message
|
||||
conn.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
_, msg, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
t.Fatalf("read error: %v", err)
|
||||
}
|
||||
if len(msg) == 0 {
|
||||
t.Error("expected non-empty message")
|
||||
}
|
||||
|
||||
// Disconnect
|
||||
conn.Close()
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
func TestPollerCreation(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
defer db.Close()
|
||||
seedTestData(t, db)
|
||||
hub := NewHub()
|
||||
|
||||
poller := NewPoller(db, hub, 100*time.Millisecond)
|
||||
if poller == nil {
|
||||
t.Fatal("expected poller")
|
||||
}
|
||||
|
||||
// Start and stop
|
||||
go poller.Start()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
poller.Stop()
|
||||
}
|
||||
|
||||
func TestHubMultipleClients(t *testing.T) {
|
||||
hub := NewHub()
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
hub.ServeWS(w, r)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
wsURL := "ws" + srv.URL[4:]
|
||||
|
||||
// Connect two clients
|
||||
conn1, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("dial error: %v", err)
|
||||
}
|
||||
defer conn1.Close()
|
||||
|
||||
conn2, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("dial error: %v", err)
|
||||
}
|
||||
defer conn2.Close()
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
if hub.ClientCount() != 2 {
|
||||
t.Errorf("expected 2 clients, got %d", hub.ClientCount())
|
||||
}
|
||||
|
||||
// Broadcast and both should receive
|
||||
hub.Broadcast(map[string]interface{}{"type": "test", "data": "hello"})
|
||||
|
||||
conn1.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
_, msg1, err := conn1.ReadMessage()
|
||||
if err != nil {
|
||||
t.Fatalf("conn1 read error: %v", err)
|
||||
}
|
||||
if len(msg1) == 0 {
|
||||
t.Error("expected non-empty message on conn1")
|
||||
}
|
||||
|
||||
conn2.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
_, msg2, err := conn2.ReadMessage()
|
||||
if err != nil {
|
||||
t.Fatalf("conn2 read error: %v", err)
|
||||
}
|
||||
if len(msg2) == 0 {
|
||||
t.Error("expected non-empty message on conn2")
|
||||
}
|
||||
|
||||
// Disconnect one
|
||||
conn1.Close()
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Remaining client should still work
|
||||
hub.Broadcast(map[string]interface{}{"type": "test2"})
|
||||
|
||||
conn2.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
_, msg3, err := conn2.ReadMessage()
|
||||
if err != nil {
|
||||
t.Fatalf("conn2 read error after disconnect: %v", err)
|
||||
}
|
||||
if len(msg3) == 0 {
|
||||
t.Error("expected non-empty message")
|
||||
}
|
||||
}
|
||||
|
||||
func TestBroadcastFullBuffer(t *testing.T) {
|
||||
hub := NewHub()
|
||||
|
||||
// Create a client with tiny buffer (1)
|
||||
client := &Client{
|
||||
send: make(chan []byte, 1),
|
||||
}
|
||||
hub.mu.Lock()
|
||||
hub.clients[client] = true
|
||||
hub.mu.Unlock()
|
||||
|
||||
// Fill the buffer
|
||||
client.send <- []byte("first")
|
||||
|
||||
// This broadcast should drop the message (buffer full)
|
||||
hub.Broadcast(map[string]interface{}{"type": "dropped"})
|
||||
|
||||
// Channel should still only have the first message
|
||||
select {
|
||||
case msg := <-client.send:
|
||||
if string(msg) != "first" {
|
||||
t.Errorf("expected 'first', got %s", string(msg))
|
||||
}
|
||||
default:
|
||||
t.Error("expected message in channel")
|
||||
}
|
||||
|
||||
// Clean up
|
||||
hub.mu.Lock()
|
||||
delete(hub.clients, client)
|
||||
hub.mu.Unlock()
|
||||
}
|
||||
|
||||
func TestBroadcastMarshalError(t *testing.T) {
|
||||
hub := NewHub()
|
||||
|
||||
// Marshal error: functions can't be marshaled to JSON
|
||||
hub.Broadcast(map[string]interface{}{"bad": func() {}})
|
||||
// Should not panic — just log and return
|
||||
}
|
||||
|
||||
func TestPollerBroadcastsNewData(t *testing.T) {
|
||||
db := setupTestDB(t)
|
||||
defer db.Close()
|
||||
seedTestData(t, db)
|
||||
hub := NewHub()
|
||||
|
||||
// Create a client to receive broadcasts
|
||||
client := &Client{
|
||||
send: make(chan []byte, 256),
|
||||
}
|
||||
hub.mu.Lock()
|
||||
hub.clients[client] = true
|
||||
hub.mu.Unlock()
|
||||
|
||||
poller := NewPoller(db, hub, 50*time.Millisecond)
|
||||
go poller.Start()
|
||||
|
||||
// Insert new data to trigger broadcast
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type)
|
||||
VALUES ('EEFF', 'newhash123456789', '2026-01-16T10:00:00Z', 1, 4)`)
|
||||
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
poller.Stop()
|
||||
|
||||
// Check if client received broadcast with packet field (fixes #162)
|
||||
select {
|
||||
case msg := <-client.send:
|
||||
if len(msg) == 0 {
|
||||
t.Error("expected non-empty broadcast message")
|
||||
}
|
||||
var parsed map[string]interface{}
|
||||
if err := json.Unmarshal(msg, &parsed); err != nil {
|
||||
t.Fatalf("failed to parse broadcast: %v", err)
|
||||
}
|
||||
if parsed["type"] != "packet" {
|
||||
t.Errorf("expected type=packet, got %v", parsed["type"])
|
||||
}
|
||||
data, ok := parsed["data"].(map[string]interface{})
|
||||
if !ok {
|
||||
t.Fatal("expected data to be an object")
|
||||
}
|
||||
// packets.js filters on m.data.packet — must exist
|
||||
pkt, ok := data["packet"]
|
||||
if !ok || pkt == nil {
|
||||
t.Error("expected data.packet to exist (required by packets.js WS handler)")
|
||||
}
|
||||
pktMap, ok := pkt.(map[string]interface{})
|
||||
if !ok {
|
||||
t.Fatal("expected data.packet to be an object")
|
||||
}
|
||||
// Verify key fields exist in nested packet (timestamp required by packets.js)
|
||||
for _, field := range []string{"id", "hash", "payload_type", "timestamp"} {
|
||||
if _, exists := pktMap[field]; !exists {
|
||||
t.Errorf("expected data.packet.%s to exist", field)
|
||||
}
|
||||
}
|
||||
default:
|
||||
// Might not have received due to timing
|
||||
}
|
||||
|
||||
// Clean up
|
||||
hub.mu.Lock()
|
||||
delete(hub.clients, client)
|
||||
hub.mu.Unlock()
|
||||
}
|
||||
|
||||
func TestHubRegisterUnregister(t *testing.T) {
|
||||
hub := NewHub()
|
||||
|
||||
client := &Client{
|
||||
send: make(chan []byte, 256),
|
||||
}
|
||||
|
||||
hub.Register(client)
|
||||
if hub.ClientCount() != 1 {
|
||||
t.Errorf("expected 1 client after register, got %d", hub.ClientCount())
|
||||
}
|
||||
|
||||
hub.Unregister(client)
|
||||
if hub.ClientCount() != 0 {
|
||||
t.Errorf("expected 0 clients after unregister, got %d", hub.ClientCount())
|
||||
}
|
||||
|
||||
// Unregister again should be safe
|
||||
hub.Unregister(client)
|
||||
if hub.ClientCount() != 0 {
|
||||
t.Errorf("expected 0 clients, got %d", hub.ClientCount())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -64,9 +64,9 @@ async function collectCoverage() {
|
||||
// ══════════════════════════════════════════════
|
||||
console.log(' [coverage] Home page — chooser...');
|
||||
// Clear localStorage to get chooser
|
||||
await page.goto(BASE, { waitUntil: 'networkidle', timeout: 15000 }).catch(() => {});
|
||||
await page.goto(BASE, { waitUntil: 'domcontentloaded', timeout: 10000 }).catch(() => {});
|
||||
await page.evaluate(() => localStorage.clear()).catch(() => {});
|
||||
await page.goto(`${BASE}/#/home`, { waitUntil: 'networkidle', timeout: 15000 }).catch(() => {});
|
||||
await page.goto(`${BASE}/#/home`, { waitUntil: 'domcontentloaded', timeout: 10000 }).catch(() => {});
|
||||
|
||||
// Click "I'm new"
|
||||
await safeClick('#chooseNew');
|
||||
@@ -105,7 +105,7 @@ async function collectCoverage() {
|
||||
|
||||
// Switch to experienced mode
|
||||
await page.evaluate(() => localStorage.clear()).catch(() => {});
|
||||
await page.goto(`${BASE}/#/home`, { waitUntil: 'networkidle', timeout: 15000 }).catch(() => {});
|
||||
await page.goto(`${BASE}/#/home`, { waitUntil: 'domcontentloaded', timeout: 10000 }).catch(() => {});
|
||||
await safeClick('#chooseExp');
|
||||
|
||||
// Interact with experienced home page
|
||||
@@ -120,7 +120,7 @@ async function collectCoverage() {
|
||||
// NODES PAGE
|
||||
// ══════════════════════════════════════════════
|
||||
console.log(' [coverage] Nodes page...');
|
||||
await page.goto(`${BASE}/#/nodes`, { waitUntil: 'networkidle', timeout: 15000 }).catch(() => {});
|
||||
await page.goto(`${BASE}/#/nodes`, { waitUntil: 'domcontentloaded', timeout: 10000 }).catch(() => {});
|
||||
|
||||
// Sort by EVERY column
|
||||
for (const col of ['name', 'public_key', 'role', 'last_seen', 'advert_count']) {
|
||||
@@ -168,7 +168,7 @@ async function collectCoverage() {
|
||||
try {
|
||||
const firstNodeKey = await page.$eval('#nodesBody tr td:nth-child(2)', el => el.textContent.trim());
|
||||
if (firstNodeKey) {
|
||||
await page.goto(`${BASE}/#/nodes/${firstNodeKey}`, { waitUntil: 'networkidle', timeout: 15000 }).catch(() => {});
|
||||
await page.goto(`${BASE}/#/nodes/${firstNodeKey}`, { waitUntil: 'domcontentloaded', timeout: 10000 }).catch(() => {});
|
||||
|
||||
// Click tabs on detail page
|
||||
await clickAll('.tab-btn, [data-tab]', 10);
|
||||
@@ -191,7 +191,7 @@ async function collectCoverage() {
|
||||
try {
|
||||
const firstKey = await page.$eval('#nodesBody tr td:nth-child(2)', el => el.textContent.trim()).catch(() => null);
|
||||
if (firstKey) {
|
||||
await page.goto(`${BASE}/#/nodes/${firstKey}?scroll=paths`, { waitUntil: 'networkidle', timeout: 15000 }).catch(() => {});
|
||||
await page.goto(`${BASE}/#/nodes/${firstKey}?scroll=paths`, { waitUntil: 'domcontentloaded', timeout: 10000 }).catch(() => {});
|
||||
}
|
||||
} catch {}
|
||||
|
||||
@@ -199,7 +199,7 @@ async function collectCoverage() {
|
||||
// PACKETS PAGE
|
||||
// ══════════════════════════════════════════════
|
||||
console.log(' [coverage] Packets page...');
|
||||
await page.goto(`${BASE}/#/packets`, { waitUntil: 'networkidle', timeout: 15000 }).catch(() => {});
|
||||
await page.goto(`${BASE}/#/packets`, { waitUntil: 'domcontentloaded', timeout: 10000 }).catch(() => {});
|
||||
|
||||
// Open filter bar
|
||||
await safeClick('#filterToggleBtn');
|
||||
@@ -285,13 +285,13 @@ async function collectCoverage() {
|
||||
} catch {}
|
||||
|
||||
// Navigate to specific packet by hash
|
||||
await page.goto(`${BASE}/#/packets/deadbeef`, { waitUntil: 'networkidle', timeout: 15000 }).catch(() => {});
|
||||
await page.goto(`${BASE}/#/packets/deadbeef`, { waitUntil: 'domcontentloaded', timeout: 10000 }).catch(() => {});
|
||||
|
||||
// ══════════════════════════════════════════════
|
||||
// MAP PAGE
|
||||
// ══════════════════════════════════════════════
|
||||
console.log(' [coverage] Map page...');
|
||||
await page.goto(`${BASE}/#/map`, { waitUntil: 'networkidle', timeout: 15000 }).catch(() => {});
|
||||
await page.goto(`${BASE}/#/map`, { waitUntil: 'domcontentloaded', timeout: 10000 }).catch(() => {});
|
||||
|
||||
// Toggle controls panel
|
||||
await safeClick('#mapControlsToggle');
|
||||
@@ -345,7 +345,7 @@ async function collectCoverage() {
|
||||
// ANALYTICS PAGE
|
||||
// ══════════════════════════════════════════════
|
||||
console.log(' [coverage] Analytics page...');
|
||||
await page.goto(`${BASE}/#/analytics`, { waitUntil: 'networkidle', timeout: 15000 }).catch(() => {});
|
||||
await page.goto(`${BASE}/#/analytics`, { waitUntil: 'domcontentloaded', timeout: 10000 }).catch(() => {});
|
||||
|
||||
// Click EVERY analytics tab
|
||||
const analyticsTabs = ['overview', 'rf', 'topology', 'channels', 'hashsizes', 'collisions', 'subpaths', 'nodes', 'distance'];
|
||||
@@ -383,7 +383,7 @@ async function collectCoverage() {
|
||||
|
||||
// Deep-link to each analytics tab via URL
|
||||
for (const tab of analyticsTabs) {
|
||||
await page.goto(`${BASE}/#/analytics?tab=${tab}`, { waitUntil: 'networkidle', timeout: 15000 }).catch(() => {});
|
||||
await page.goto(`${BASE}/#/analytics?tab=${tab}`, { waitUntil: 'domcontentloaded', timeout: 10000 }).catch(() => {});
|
||||
}
|
||||
|
||||
// Region filter on analytics
|
||||
@@ -396,7 +396,7 @@ async function collectCoverage() {
|
||||
// CUSTOMIZE
|
||||
// ══════════════════════════════════════════════
|
||||
console.log(' [coverage] Customizer...');
|
||||
await page.goto(BASE, { waitUntil: 'networkidle', timeout: 15000 }).catch(() => {});
|
||||
await page.goto(BASE, { waitUntil: 'domcontentloaded', timeout: 10000 }).catch(() => {});
|
||||
await safeClick('#customizeToggle');
|
||||
|
||||
// Click EVERY customizer tab
|
||||
@@ -503,7 +503,7 @@ async function collectCoverage() {
|
||||
// CHANNELS PAGE
|
||||
// ══════════════════════════════════════════════
|
||||
console.log(' [coverage] Channels page...');
|
||||
await page.goto(`${BASE}/#/channels`, { waitUntil: 'networkidle', timeout: 15000 }).catch(() => {});
|
||||
await page.goto(`${BASE}/#/channels`, { waitUntil: 'domcontentloaded', timeout: 10000 }).catch(() => {});
|
||||
// Click channel rows/items
|
||||
await clickAll('.channel-item, .channel-row, .channel-card', 3);
|
||||
await clickAll('table tbody tr', 3);
|
||||
@@ -512,7 +512,7 @@ async function collectCoverage() {
|
||||
try {
|
||||
const channelHash = await page.$eval('table tbody tr td:first-child', el => el.textContent.trim()).catch(() => null);
|
||||
if (channelHash) {
|
||||
await page.goto(`${BASE}/#/channels/${channelHash}`, { waitUntil: 'networkidle', timeout: 15000 }).catch(() => {});
|
||||
await page.goto(`${BASE}/#/channels/${channelHash}`, { waitUntil: 'domcontentloaded', timeout: 10000 }).catch(() => {});
|
||||
}
|
||||
} catch {}
|
||||
|
||||
@@ -520,7 +520,7 @@ async function collectCoverage() {
|
||||
// LIVE PAGE
|
||||
// ══════════════════════════════════════════════
|
||||
console.log(' [coverage] Live page...');
|
||||
await page.goto(`${BASE}/#/live`, { waitUntil: 'networkidle', timeout: 15000 }).catch(() => {});
|
||||
await page.goto(`${BASE}/#/live`, { waitUntil: 'domcontentloaded', timeout: 10000 }).catch(() => {});
|
||||
|
||||
// VCR controls
|
||||
await safeClick('#vcrPauseBtn');
|
||||
@@ -603,14 +603,14 @@ async function collectCoverage() {
|
||||
// TRACES PAGE
|
||||
// ══════════════════════════════════════════════
|
||||
console.log(' [coverage] Traces page...');
|
||||
await page.goto(`${BASE}/#/traces`, { waitUntil: 'networkidle', timeout: 15000 }).catch(() => {});
|
||||
await page.goto(`${BASE}/#/traces`, { waitUntil: 'domcontentloaded', timeout: 10000 }).catch(() => {});
|
||||
await clickAll('table tbody tr', 3);
|
||||
|
||||
// ══════════════════════════════════════════════
|
||||
// OBSERVERS PAGE
|
||||
// ══════════════════════════════════════════════
|
||||
console.log(' [coverage] Observers page...');
|
||||
await page.goto(`${BASE}/#/observers`, { waitUntil: 'networkidle', timeout: 15000 }).catch(() => {});
|
||||
await page.goto(`${BASE}/#/observers`, { waitUntil: 'domcontentloaded', timeout: 10000 }).catch(() => {});
|
||||
// Click observer rows
|
||||
const obsRows = await page.$$('table tbody tr, .observer-card, .observer-row');
|
||||
for (let i = 0; i < Math.min(obsRows.length, 3); i++) {
|
||||
@@ -631,7 +631,7 @@ async function collectCoverage() {
|
||||
// PERF PAGE
|
||||
// ══════════════════════════════════════════════
|
||||
console.log(' [coverage] Perf page...');
|
||||
await page.goto(`${BASE}/#/perf`, { waitUntil: 'networkidle', timeout: 15000 }).catch(() => {});
|
||||
await page.goto(`${BASE}/#/perf`, { waitUntil: 'domcontentloaded', timeout: 10000 }).catch(() => {});
|
||||
await safeClick('#perfRefresh');
|
||||
await safeClick('#perfReset');
|
||||
|
||||
@@ -641,14 +641,14 @@ async function collectCoverage() {
|
||||
console.log(' [coverage] App.js — router + global...');
|
||||
|
||||
// Navigate to bad route to trigger error/404
|
||||
await page.goto(`${BASE}/#/nonexistent-route`, { waitUntil: 'networkidle', timeout: 15000 }).catch(() => {});
|
||||
await page.goto(`${BASE}/#/nonexistent-route`, { waitUntil: 'domcontentloaded', timeout: 10000 }).catch(() => {});
|
||||
|
||||
// Navigate to every route via hash
|
||||
const allRoutes = ['home', 'nodes', 'packets', 'map', 'live', 'channels', 'traces', 'observers', 'analytics', 'perf'];
|
||||
for (const route of allRoutes) {
|
||||
try {
|
||||
await page.evaluate((r) => { location.hash = '#/' + r; }, route);
|
||||
await page.waitForLoadState('networkidle').catch(() => {});
|
||||
await new Promise(r => setTimeout(r, 200));
|
||||
} catch {}
|
||||
}
|
||||
|
||||
@@ -788,14 +788,14 @@ async function collectCoverage() {
|
||||
console.log(' [coverage] Region filter...');
|
||||
try {
|
||||
// Open region filter on nodes page
|
||||
await page.goto(`${BASE}/#/nodes`, { waitUntil: 'networkidle', timeout: 15000 }).catch(() => {});
|
||||
await page.goto(`${BASE}/#/nodes`, { waitUntil: 'domcontentloaded', timeout: 10000 }).catch(() => {});
|
||||
await safeClick('#nodesRegionFilter');
|
||||
await clickAll('#nodesRegionFilter input[type="checkbox"]', 3);
|
||||
} catch {}
|
||||
|
||||
// Region filter on packets
|
||||
try {
|
||||
await page.goto(`${BASE}/#/packets`, { waitUntil: 'networkidle', timeout: 15000 }).catch(() => {});
|
||||
await page.goto(`${BASE}/#/packets`, { waitUntil: 'domcontentloaded', timeout: 10000 }).catch(() => {});
|
||||
await safeClick('#packetsRegionFilter');
|
||||
await clickAll('#packetsRegionFilter input[type="checkbox"]', 3);
|
||||
} catch {}
|
||||
@@ -807,7 +807,7 @@ async function collectCoverage() {
|
||||
for (const route of allRoutes) {
|
||||
try {
|
||||
await page.evaluate((r) => { location.hash = '#/' + r; }, route);
|
||||
await page.waitForLoadState('networkidle').catch(() => {});
|
||||
await new Promise(r => setTimeout(r, 200));
|
||||
} catch {}
|
||||
}
|
||||
|
||||
|
||||
@@ -909,6 +909,19 @@ async function run() {
|
||||
assert(hexDump, 'Hex dump should be visible after selecting a packet');
|
||||
});
|
||||
|
||||
// Extract frontend coverage if instrumented server is running
|
||||
try {
|
||||
const coverage = await page.evaluate(() => window.__coverage__);
|
||||
if (coverage) {
|
||||
const fs = require('fs');
|
||||
const path = require('path');
|
||||
const outDir = path.join(__dirname, '.nyc_output');
|
||||
if (!fs.existsSync(outDir)) fs.mkdirSync(outDir, { recursive: true });
|
||||
fs.writeFileSync(path.join(outDir, 'e2e-coverage.json'), JSON.stringify(coverage));
|
||||
console.log(`Frontend coverage from E2E: ${Object.keys(coverage).length} files`);
|
||||
}
|
||||
} catch {}
|
||||
|
||||
await browser.close();
|
||||
|
||||
// Summary
|
||||
|
||||
Reference in New Issue
Block a user