Compare commits

...

8 Commits

Author SHA1 Message Date
you
8cee2da2e1 fix: add sleep before poller data insert to prevent race condition in tests
The poller's Start() calls GetMaxTransmissionID() to initialize its cursor.
When the test goroutine inserts data between go poller.Start() and the
actual GetMaxTransmissionID() call, the poller's cursor skips past the
test data and never broadcasts it, causing a timeout.

Adding a 100ms sleep after go poller.Start() ensures the poller has
initialized its cursors before the test inserts new data.
2026-03-29 15:01:39 +00:00
Kpa-clawbot
b7210e4e31 Merge branch 'master' into fix/live-view-starburst 2026-03-29 07:15:26 -07:00
you
206d9bd64a fix: use per-PR concurrency group to prevent cross-PR cancellation
The flat 'deploy' concurrency group caused ALL PRs to share one queue,
so pushing to any PR would cancel CI runs on other PRs.

Changed to deploy-${{ github.event.pull_request.number || github.ref }}
so each PR gets its own concurrency group while re-pushes to the same
PR still cancel the previous run.
2026-03-29 14:14:57 +00:00
you
ab6fbec158 fix: force single SQLite connection in test DBs to prevent in-memory table visibility issues
SQLite :memory: databases create separate databases per connection.
When the connection pool opens multiple connections (e.g. poller goroutine
vs main test goroutine), tables created on one connection are invisible
to others. Setting MaxOpenConns(1) ensures all queries use the same
in-memory database, fixing TestPollerBroadcastsMultipleObservations.
2026-03-29 07:11:45 -07:00
Kpa-clawbot
39c4c3e16e fix: per-observation WS broadcast for live view starburst — fixes #237
IngestNewFromDB now broadcasts one message per observation (not per
transmission). IngestNewObservations also broadcasts late arrivals.
Tests verify multi-observer packets produce multiple WS messages.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-29 07:11:45 -07:00
efiten
3f54632b07 fix: cache /stats and GetNodeHashSizeInfo to eliminate slow API calls
- /api/stats: 10s server-side cache — was running 5 SQLite COUNT queries
  on every call, taking ~1500ms with 28 concurrent WS clients polling every 15s
- GetNodeHashSizeInfo: 15s cache — was doing a full O(n) scan + JSON unmarshal
  of all advert packets in memory on every /nodes request, taking ~1200ms

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-29 07:09:05 -07:00
Kpa-clawbot
609b12541e fix: add extra_hosts host.docker.internal to all services — fixes #238
Linux Docker doesn't resolve host.docker.internal by default.
Required when MQTT sources in config.json point to the host machine.
Harmless on Docker Desktop where it already works.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-28 18:58:31 -07:00
Kpa-clawbot
4369e58a3c Merge pull request #235 from Kpa-clawbot/fix/compose-build-directive
fix: docker-compose prod/staging need build: directive — fixes pull access denied
2026-03-28 18:36:21 -07:00
9 changed files with 5011 additions and 4660 deletions

View File

@@ -17,7 +17,7 @@ on:
- 'docs/**'
concurrency:
group: deploy
group: deploy-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
env:

File diff suppressed because it is too large Load Diff

View File

@@ -17,6 +17,8 @@ func setupTestDB(t *testing.T) *DB {
if err != nil {
t.Fatal(err)
}
// Force single connection so all goroutines share the same in-memory DB
conn.SetMaxOpenConns(1)
// Create schema matching MeshCore Analyzer v3
schema := `

View File

@@ -1,403 +1,506 @@
package main
// parity_test.go — Golden fixture shape tests.
// Validates that Go API responses match the shape of Node.js API responses.
// Shapes were captured from the production Node.js server and stored in
// testdata/golden/shapes.json.
import (
"encoding/json"
"fmt"
"net/http/httptest"
"os"
"path/filepath"
"runtime"
"strings"
"testing"
)
// shapeSpec describes the expected JSON structure from the Node.js server.
type shapeSpec struct {
Type string `json:"type"`
Keys map[string]shapeSpec `json:"keys,omitempty"`
ElementShape *shapeSpec `json:"elementShape,omitempty"`
DynamicKeys bool `json:"dynamicKeys,omitempty"`
ValueShape *shapeSpec `json:"valueShape,omitempty"`
RequiredKeys map[string]shapeSpec `json:"requiredKeys,omitempty"`
}
// loadShapes reads testdata/golden/shapes.json relative to this source file.
func loadShapes(t *testing.T) map[string]shapeSpec {
t.Helper()
_, thisFile, _, _ := runtime.Caller(0)
dir := filepath.Dir(thisFile)
data, err := os.ReadFile(filepath.Join(dir, "testdata", "golden", "shapes.json"))
if err != nil {
t.Fatalf("cannot load shapes.json: %v", err)
}
var shapes map[string]shapeSpec
if err := json.Unmarshal(data, &shapes); err != nil {
t.Fatalf("cannot parse shapes.json: %v", err)
}
return shapes
}
// validateShape recursively checks that `actual` matches the expected `spec`.
// `path` tracks the JSON path for error messages.
// Returns a list of mismatch descriptions.
func validateShape(actual interface{}, spec shapeSpec, path string) []string {
var errs []string
switch spec.Type {
case "null", "nullable":
// nullable means: value can be null OR matching type. Accept anything.
return nil
case "nullable_number":
// Can be null or number
if actual != nil {
if _, ok := actual.(float64); !ok {
errs = append(errs, fmt.Sprintf("%s: expected number or null, got %T", path, actual))
}
}
return errs
case "string":
if actual == nil {
errs = append(errs, fmt.Sprintf("%s: expected string, got null", path))
} else if _, ok := actual.(string); !ok {
errs = append(errs, fmt.Sprintf("%s: expected string, got %T", path, actual))
}
case "number":
if actual == nil {
errs = append(errs, fmt.Sprintf("%s: expected number, got null", path))
} else if _, ok := actual.(float64); !ok {
errs = append(errs, fmt.Sprintf("%s: expected number, got %T (%v)", path, actual, actual))
}
case "boolean":
if actual == nil {
errs = append(errs, fmt.Sprintf("%s: expected boolean, got null", path))
} else if _, ok := actual.(bool); !ok {
errs = append(errs, fmt.Sprintf("%s: expected boolean, got %T", path, actual))
}
case "array":
if actual == nil {
errs = append(errs, fmt.Sprintf("%s: expected array, got null (arrays must be [] not null)", path))
return errs
}
arr, ok := actual.([]interface{})
if !ok {
errs = append(errs, fmt.Sprintf("%s: expected array, got %T", path, actual))
return errs
}
if spec.ElementShape != nil && len(arr) > 0 {
errs = append(errs, validateShape(arr[0], *spec.ElementShape, path+"[0]")...)
}
case "object":
if actual == nil {
errs = append(errs, fmt.Sprintf("%s: expected object, got null", path))
return errs
}
obj, ok := actual.(map[string]interface{})
if !ok {
errs = append(errs, fmt.Sprintf("%s: expected object, got %T", path, actual))
return errs
}
if spec.DynamicKeys {
// Object with dynamic keys — validate value shapes
if spec.ValueShape != nil && len(obj) > 0 {
for k, v := range obj {
errs = append(errs, validateShape(v, *spec.ValueShape, path+"."+k)...)
break // check just one sample
}
}
if spec.RequiredKeys != nil {
for rk, rs := range spec.RequiredKeys {
v, exists := obj[rk]
if !exists {
errs = append(errs, fmt.Sprintf("%s: missing required key %q in dynamic-key object", path, rk))
} else {
errs = append(errs, validateShape(v, rs, path+"."+rk)...)
}
}
}
} else if spec.Keys != nil {
// Object with known keys — check each expected key exists and has correct type
for key, keySpec := range spec.Keys {
val, exists := obj[key]
if !exists {
errs = append(errs, fmt.Sprintf("%s: missing field %q (expected %s)", path, key, keySpec.Type))
} else {
errs = append(errs, validateShape(val, keySpec, path+"."+key)...)
}
}
}
}
return errs
}
// parityEndpoint defines one endpoint to test for parity.
type parityEndpoint struct {
name string // key in shapes.json
path string // HTTP path to request
}
func TestParityShapes(t *testing.T) {
shapes := loadShapes(t)
_, router := setupTestServer(t)
endpoints := []parityEndpoint{
{"stats", "/api/stats"},
{"nodes", "/api/nodes?limit=5"},
{"packets", "/api/packets?limit=5"},
{"packets_grouped", "/api/packets?limit=5&groupByHash=true"},
{"observers", "/api/observers"},
{"channels", "/api/channels"},
{"channel_messages", "/api/channels/0000000000000000/messages?limit=5"},
{"analytics_rf", "/api/analytics/rf?days=7"},
{"analytics_topology", "/api/analytics/topology?days=7"},
{"analytics_hash_sizes", "/api/analytics/hash-sizes?days=7"},
{"analytics_distance", "/api/analytics/distance?days=7"},
{"analytics_subpaths", "/api/analytics/subpaths?days=7"},
{"bulk_health", "/api/nodes/bulk-health"},
{"health", "/api/health"},
{"perf", "/api/perf"},
}
for _, ep := range endpoints {
t.Run("Parity_"+ep.name, func(t *testing.T) {
spec, ok := shapes[ep.name]
if !ok {
t.Fatalf("no shape spec found for %q in shapes.json", ep.name)
}
req := httptest.NewRequest("GET", ep.path, nil)
w := httptest.NewRecorder()
router.ServeHTTP(w, req)
if w.Code != 200 {
t.Fatalf("GET %s returned %d, expected 200. Body: %s",
ep.path, w.Code, w.Body.String())
}
var body interface{}
if err := json.Unmarshal(w.Body.Bytes(), &body); err != nil {
t.Fatalf("GET %s returned invalid JSON: %v\nBody: %s",
ep.path, err, w.Body.String())
}
mismatches := validateShape(body, spec, ep.path)
if len(mismatches) > 0 {
t.Errorf("Go %s has %d shape mismatches vs Node.js golden:\n %s",
ep.path, len(mismatches), strings.Join(mismatches, "\n "))
}
})
}
}
// TestParityNodeDetail tests node detail endpoint shape.
// Uses a known test node public key from seeded data.
func TestParityNodeDetail(t *testing.T) {
shapes := loadShapes(t)
_, router := setupTestServer(t)
spec, ok := shapes["node_detail"]
if !ok {
t.Fatal("no shape spec for node_detail in shapes.json")
}
req := httptest.NewRequest("GET", "/api/nodes/aabbccdd11223344", nil)
w := httptest.NewRecorder()
router.ServeHTTP(w, req)
if w.Code != 200 {
t.Fatalf("node detail returned %d: %s", w.Code, w.Body.String())
}
var body interface{}
json.Unmarshal(w.Body.Bytes(), &body)
mismatches := validateShape(body, spec, "/api/nodes/{pubkey}")
if len(mismatches) > 0 {
t.Errorf("Go node detail has %d shape mismatches vs Node.js golden:\n %s",
len(mismatches), strings.Join(mismatches, "\n "))
}
}
// TestParityArraysNotNull verifies that array-typed fields in Go responses are
// [] (empty array) rather than null. This is a common Go/JSON pitfall where
// nil slices marshal as null instead of [].
// Uses shapes.json to know which fields SHOULD be arrays.
func TestParityArraysNotNull(t *testing.T) {
shapes := loadShapes(t)
_, router := setupTestServer(t)
endpoints := []struct {
name string
path string
}{
{"stats", "/api/stats"},
{"nodes", "/api/nodes?limit=5"},
{"packets", "/api/packets?limit=5"},
{"packets_grouped", "/api/packets?limit=5&groupByHash=true"},
{"observers", "/api/observers"},
{"channels", "/api/channels"},
{"bulk_health", "/api/nodes/bulk-health"},
{"analytics_rf", "/api/analytics/rf?days=7"},
{"analytics_topology", "/api/analytics/topology?days=7"},
{"analytics_hash_sizes", "/api/analytics/hash-sizes?days=7"},
{"analytics_distance", "/api/analytics/distance?days=7"},
{"analytics_subpaths", "/api/analytics/subpaths?days=7"},
}
for _, ep := range endpoints {
t.Run("NullArrayCheck_"+ep.name, func(t *testing.T) {
spec, ok := shapes[ep.name]
if !ok {
t.Skipf("no shape spec for %s", ep.name)
}
req := httptest.NewRequest("GET", ep.path, nil)
w := httptest.NewRecorder()
router.ServeHTTP(w, req)
if w.Code != 200 {
t.Skipf("GET %s returned %d, skipping null-array check", ep.path, w.Code)
}
var body interface{}
json.Unmarshal(w.Body.Bytes(), &body)
nullArrays := findNullArrays(body, spec, ep.path)
if len(nullArrays) > 0 {
t.Errorf("Go %s has null where [] expected:\n %s\n"+
"Go nil slices marshal as null — initialize with make() or literal",
ep.path, strings.Join(nullArrays, "\n "))
}
})
}
}
// findNullArrays walks JSON data alongside a shape spec and returns paths
// where the spec says the field should be an array but Go returned null.
func findNullArrays(actual interface{}, spec shapeSpec, path string) []string {
var nulls []string
switch spec.Type {
case "array":
if actual == nil {
nulls = append(nulls, fmt.Sprintf("%s: null (should be [])", path))
} else if arr, ok := actual.([]interface{}); ok && spec.ElementShape != nil {
for i, elem := range arr {
nulls = append(nulls, findNullArrays(elem, *spec.ElementShape, fmt.Sprintf("%s[%d]", path, i))...)
}
}
case "object":
obj, ok := actual.(map[string]interface{})
if !ok || obj == nil {
return nulls
}
if spec.Keys != nil {
for key, keySpec := range spec.Keys {
if val, exists := obj[key]; exists {
nulls = append(nulls, findNullArrays(val, keySpec, path+"."+key)...)
} else if keySpec.Type == "array" {
// Key missing entirely — also a null-array problem
nulls = append(nulls, fmt.Sprintf("%s.%s: missing (should be [])", path, key))
}
}
}
if spec.DynamicKeys && spec.ValueShape != nil {
for k, v := range obj {
nulls = append(nulls, findNullArrays(v, *spec.ValueShape, path+"."+k)...)
break // sample one
}
}
}
return nulls
}
// TestParityHealthEngine verifies Go health endpoint declares engine=go
// while Node declares engine=node (or omits it). The Go server must always
// identify itself.
func TestParityHealthEngine(t *testing.T) {
_, router := setupTestServer(t)
req := httptest.NewRequest("GET", "/api/health", nil)
w := httptest.NewRecorder()
router.ServeHTTP(w, req)
var body map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &body)
engine, ok := body["engine"]
if !ok {
t.Error("health response missing 'engine' field (Go server must include engine=go)")
} else if engine != "go" {
t.Errorf("health engine=%v, expected 'go'", engine)
}
}
// TestValidateShapeFunction directly tests the shape validator itself.
func TestValidateShapeFunction(t *testing.T) {
t.Run("string match", func(t *testing.T) {
errs := validateShape("hello", shapeSpec{Type: "string"}, "$.x")
if len(errs) != 0 {
t.Errorf("unexpected errors: %v", errs)
}
})
t.Run("string mismatch", func(t *testing.T) {
errs := validateShape(42.0, shapeSpec{Type: "string"}, "$.x")
if len(errs) != 1 {
t.Errorf("expected 1 error, got %d: %v", len(errs), errs)
}
})
t.Run("null array rejected", func(t *testing.T) {
errs := validateShape(nil, shapeSpec{Type: "array"}, "$.arr")
if len(errs) != 1 || !strings.Contains(errs[0], "null") {
t.Errorf("expected null-array error, got: %v", errs)
}
})
t.Run("empty array OK", func(t *testing.T) {
errs := validateShape([]interface{}{}, shapeSpec{Type: "array"}, "$.arr")
if len(errs) != 0 {
t.Errorf("unexpected errors for empty array: %v", errs)
}
})
t.Run("missing object key", func(t *testing.T) {
spec := shapeSpec{Type: "object", Keys: map[string]shapeSpec{
"name": {Type: "string"},
"age": {Type: "number"},
}}
obj := map[string]interface{}{"name": "test"}
errs := validateShape(obj, spec, "$.user")
if len(errs) != 1 || !strings.Contains(errs[0], "age") {
t.Errorf("expected missing age error, got: %v", errs)
}
})
t.Run("nullable allows null", func(t *testing.T) {
errs := validateShape(nil, shapeSpec{Type: "nullable"}, "$.x")
if len(errs) != 0 {
t.Errorf("nullable should accept null: %v", errs)
}
})
t.Run("dynamic keys validates value shape", func(t *testing.T) {
spec := shapeSpec{
Type: "object",
DynamicKeys: true,
ValueShape: &shapeSpec{Type: "number"},
}
obj := map[string]interface{}{"a": 1.0, "b": 2.0}
errs := validateShape(obj, spec, "$.dyn")
if len(errs) != 0 {
t.Errorf("unexpected errors: %v", errs)
}
})
}
package main
// parity_test.go — Golden fixture shape tests.
// Validates that Go API responses match the shape of Node.js API responses.
// Shapes were captured from the production Node.js server and stored in
// testdata/golden/shapes.json.
import (
"encoding/json"
"fmt"
"net/http/httptest"
"os"
"path/filepath"
"runtime"
"sort"
"strings"
"testing"
"time"
)
// shapeSpec describes the expected JSON structure from the Node.js server.
type shapeSpec struct {
Type string `json:"type"`
Keys map[string]shapeSpec `json:"keys,omitempty"`
ElementShape *shapeSpec `json:"elementShape,omitempty"`
DynamicKeys bool `json:"dynamicKeys,omitempty"`
ValueShape *shapeSpec `json:"valueShape,omitempty"`
RequiredKeys map[string]shapeSpec `json:"requiredKeys,omitempty"`
}
// loadShapes reads testdata/golden/shapes.json relative to this source file.
func loadShapes(t *testing.T) map[string]shapeSpec {
t.Helper()
_, thisFile, _, _ := runtime.Caller(0)
dir := filepath.Dir(thisFile)
data, err := os.ReadFile(filepath.Join(dir, "testdata", "golden", "shapes.json"))
if err != nil {
t.Fatalf("cannot load shapes.json: %v", err)
}
var shapes map[string]shapeSpec
if err := json.Unmarshal(data, &shapes); err != nil {
t.Fatalf("cannot parse shapes.json: %v", err)
}
return shapes
}
// validateShape recursively checks that `actual` matches the expected `spec`.
// `path` tracks the JSON path for error messages.
// Returns a list of mismatch descriptions.
func validateShape(actual interface{}, spec shapeSpec, path string) []string {
var errs []string
switch spec.Type {
case "null", "nullable":
// nullable means: value can be null OR matching type. Accept anything.
return nil
case "nullable_number":
// Can be null or number
if actual != nil {
if _, ok := actual.(float64); !ok {
errs = append(errs, fmt.Sprintf("%s: expected number or null, got %T", path, actual))
}
}
return errs
case "string":
if actual == nil {
errs = append(errs, fmt.Sprintf("%s: expected string, got null", path))
} else if _, ok := actual.(string); !ok {
errs = append(errs, fmt.Sprintf("%s: expected string, got %T", path, actual))
}
case "number":
if actual == nil {
errs = append(errs, fmt.Sprintf("%s: expected number, got null", path))
} else if _, ok := actual.(float64); !ok {
errs = append(errs, fmt.Sprintf("%s: expected number, got %T (%v)", path, actual, actual))
}
case "boolean":
if actual == nil {
errs = append(errs, fmt.Sprintf("%s: expected boolean, got null", path))
} else if _, ok := actual.(bool); !ok {
errs = append(errs, fmt.Sprintf("%s: expected boolean, got %T", path, actual))
}
case "array":
if actual == nil {
errs = append(errs, fmt.Sprintf("%s: expected array, got null (arrays must be [] not null)", path))
return errs
}
arr, ok := actual.([]interface{})
if !ok {
errs = append(errs, fmt.Sprintf("%s: expected array, got %T", path, actual))
return errs
}
if spec.ElementShape != nil && len(arr) > 0 {
errs = append(errs, validateShape(arr[0], *spec.ElementShape, path+"[0]")...)
}
case "object":
if actual == nil {
errs = append(errs, fmt.Sprintf("%s: expected object, got null", path))
return errs
}
obj, ok := actual.(map[string]interface{})
if !ok {
errs = append(errs, fmt.Sprintf("%s: expected object, got %T", path, actual))
return errs
}
if spec.DynamicKeys {
// Object with dynamic keys — validate value shapes
if spec.ValueShape != nil && len(obj) > 0 {
for k, v := range obj {
errs = append(errs, validateShape(v, *spec.ValueShape, path+"."+k)...)
break // check just one sample
}
}
if spec.RequiredKeys != nil {
for rk, rs := range spec.RequiredKeys {
v, exists := obj[rk]
if !exists {
errs = append(errs, fmt.Sprintf("%s: missing required key %q in dynamic-key object", path, rk))
} else {
errs = append(errs, validateShape(v, rs, path+"."+rk)...)
}
}
}
} else if spec.Keys != nil {
// Object with known keys — check each expected key exists and has correct type
for key, keySpec := range spec.Keys {
val, exists := obj[key]
if !exists {
errs = append(errs, fmt.Sprintf("%s: missing field %q (expected %s)", path, key, keySpec.Type))
} else {
errs = append(errs, validateShape(val, keySpec, path+"."+key)...)
}
}
}
}
return errs
}
// parityEndpoint defines one endpoint to test for parity.
type parityEndpoint struct {
name string // key in shapes.json
path string // HTTP path to request
}
func TestParityShapes(t *testing.T) {
shapes := loadShapes(t)
_, router := setupTestServer(t)
endpoints := []parityEndpoint{
{"stats", "/api/stats"},
{"nodes", "/api/nodes?limit=5"},
{"packets", "/api/packets?limit=5"},
{"packets_grouped", "/api/packets?limit=5&groupByHash=true"},
{"observers", "/api/observers"},
{"channels", "/api/channels"},
{"channel_messages", "/api/channels/0000000000000000/messages?limit=5"},
{"analytics_rf", "/api/analytics/rf?days=7"},
{"analytics_topology", "/api/analytics/topology?days=7"},
{"analytics_hash_sizes", "/api/analytics/hash-sizes?days=7"},
{"analytics_distance", "/api/analytics/distance?days=7"},
{"analytics_subpaths", "/api/analytics/subpaths?days=7"},
{"bulk_health", "/api/nodes/bulk-health"},
{"health", "/api/health"},
{"perf", "/api/perf"},
}
for _, ep := range endpoints {
t.Run("Parity_"+ep.name, func(t *testing.T) {
spec, ok := shapes[ep.name]
if !ok {
t.Fatalf("no shape spec found for %q in shapes.json", ep.name)
}
req := httptest.NewRequest("GET", ep.path, nil)
w := httptest.NewRecorder()
router.ServeHTTP(w, req)
if w.Code != 200 {
t.Fatalf("GET %s returned %d, expected 200. Body: %s",
ep.path, w.Code, w.Body.String())
}
var body interface{}
if err := json.Unmarshal(w.Body.Bytes(), &body); err != nil {
t.Fatalf("GET %s returned invalid JSON: %v\nBody: %s",
ep.path, err, w.Body.String())
}
mismatches := validateShape(body, spec, ep.path)
if len(mismatches) > 0 {
t.Errorf("Go %s has %d shape mismatches vs Node.js golden:\n %s",
ep.path, len(mismatches), strings.Join(mismatches, "\n "))
}
})
}
}
// TestParityNodeDetail tests node detail endpoint shape.
// Uses a known test node public key from seeded data.
func TestParityNodeDetail(t *testing.T) {
shapes := loadShapes(t)
_, router := setupTestServer(t)
spec, ok := shapes["node_detail"]
if !ok {
t.Fatal("no shape spec for node_detail in shapes.json")
}
req := httptest.NewRequest("GET", "/api/nodes/aabbccdd11223344", nil)
w := httptest.NewRecorder()
router.ServeHTTP(w, req)
if w.Code != 200 {
t.Fatalf("node detail returned %d: %s", w.Code, w.Body.String())
}
var body interface{}
json.Unmarshal(w.Body.Bytes(), &body)
mismatches := validateShape(body, spec, "/api/nodes/{pubkey}")
if len(mismatches) > 0 {
t.Errorf("Go node detail has %d shape mismatches vs Node.js golden:\n %s",
len(mismatches), strings.Join(mismatches, "\n "))
}
}
// TestParityArraysNotNull verifies that array-typed fields in Go responses are
// [] (empty array) rather than null. This is a common Go/JSON pitfall where
// nil slices marshal as null instead of [].
// Uses shapes.json to know which fields SHOULD be arrays.
func TestParityArraysNotNull(t *testing.T) {
shapes := loadShapes(t)
_, router := setupTestServer(t)
endpoints := []struct {
name string
path string
}{
{"stats", "/api/stats"},
{"nodes", "/api/nodes?limit=5"},
{"packets", "/api/packets?limit=5"},
{"packets_grouped", "/api/packets?limit=5&groupByHash=true"},
{"observers", "/api/observers"},
{"channels", "/api/channels"},
{"bulk_health", "/api/nodes/bulk-health"},
{"analytics_rf", "/api/analytics/rf?days=7"},
{"analytics_topology", "/api/analytics/topology?days=7"},
{"analytics_hash_sizes", "/api/analytics/hash-sizes?days=7"},
{"analytics_distance", "/api/analytics/distance?days=7"},
{"analytics_subpaths", "/api/analytics/subpaths?days=7"},
}
for _, ep := range endpoints {
t.Run("NullArrayCheck_"+ep.name, func(t *testing.T) {
spec, ok := shapes[ep.name]
if !ok {
t.Skipf("no shape spec for %s", ep.name)
}
req := httptest.NewRequest("GET", ep.path, nil)
w := httptest.NewRecorder()
router.ServeHTTP(w, req)
if w.Code != 200 {
t.Skipf("GET %s returned %d, skipping null-array check", ep.path, w.Code)
}
var body interface{}
json.Unmarshal(w.Body.Bytes(), &body)
nullArrays := findNullArrays(body, spec, ep.path)
if len(nullArrays) > 0 {
t.Errorf("Go %s has null where [] expected:\n %s\n"+
"Go nil slices marshal as null — initialize with make() or literal",
ep.path, strings.Join(nullArrays, "\n "))
}
})
}
}
// findNullArrays walks JSON data alongside a shape spec and returns paths
// where the spec says the field should be an array but Go returned null.
func findNullArrays(actual interface{}, spec shapeSpec, path string) []string {
var nulls []string
switch spec.Type {
case "array":
if actual == nil {
nulls = append(nulls, fmt.Sprintf("%s: null (should be [])", path))
} else if arr, ok := actual.([]interface{}); ok && spec.ElementShape != nil {
for i, elem := range arr {
nulls = append(nulls, findNullArrays(elem, *spec.ElementShape, fmt.Sprintf("%s[%d]", path, i))...)
}
}
case "object":
obj, ok := actual.(map[string]interface{})
if !ok || obj == nil {
return nulls
}
if spec.Keys != nil {
for key, keySpec := range spec.Keys {
if val, exists := obj[key]; exists {
nulls = append(nulls, findNullArrays(val, keySpec, path+"."+key)...)
} else if keySpec.Type == "array" {
// Key missing entirely — also a null-array problem
nulls = append(nulls, fmt.Sprintf("%s.%s: missing (should be [])", path, key))
}
}
}
if spec.DynamicKeys && spec.ValueShape != nil {
for k, v := range obj {
nulls = append(nulls, findNullArrays(v, *spec.ValueShape, path+"."+k)...)
break // sample one
}
}
}
return nulls
}
// TestParityHealthEngine verifies Go health endpoint declares engine=go
// while Node declares engine=node (or omits it). The Go server must always
// identify itself.
func TestParityHealthEngine(t *testing.T) {
_, router := setupTestServer(t)
req := httptest.NewRequest("GET", "/api/health", nil)
w := httptest.NewRecorder()
router.ServeHTTP(w, req)
var body map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &body)
engine, ok := body["engine"]
if !ok {
t.Error("health response missing 'engine' field (Go server must include engine=go)")
} else if engine != "go" {
t.Errorf("health engine=%v, expected 'go'", engine)
}
}
// TestValidateShapeFunction directly tests the shape validator itself.
func TestValidateShapeFunction(t *testing.T) {
t.Run("string match", func(t *testing.T) {
errs := validateShape("hello", shapeSpec{Type: "string"}, "$.x")
if len(errs) != 0 {
t.Errorf("unexpected errors: %v", errs)
}
})
t.Run("string mismatch", func(t *testing.T) {
errs := validateShape(42.0, shapeSpec{Type: "string"}, "$.x")
if len(errs) != 1 {
t.Errorf("expected 1 error, got %d: %v", len(errs), errs)
}
})
t.Run("null array rejected", func(t *testing.T) {
errs := validateShape(nil, shapeSpec{Type: "array"}, "$.arr")
if len(errs) != 1 || !strings.Contains(errs[0], "null") {
t.Errorf("expected null-array error, got: %v", errs)
}
})
t.Run("empty array OK", func(t *testing.T) {
errs := validateShape([]interface{}{}, shapeSpec{Type: "array"}, "$.arr")
if len(errs) != 0 {
t.Errorf("unexpected errors for empty array: %v", errs)
}
})
t.Run("missing object key", func(t *testing.T) {
spec := shapeSpec{Type: "object", Keys: map[string]shapeSpec{
"name": {Type: "string"},
"age": {Type: "number"},
}}
obj := map[string]interface{}{"name": "test"}
errs := validateShape(obj, spec, "$.user")
if len(errs) != 1 || !strings.Contains(errs[0], "age") {
t.Errorf("expected missing age error, got: %v", errs)
}
})
t.Run("nullable allows null", func(t *testing.T) {
errs := validateShape(nil, shapeSpec{Type: "nullable"}, "$.x")
if len(errs) != 0 {
t.Errorf("nullable should accept null: %v", errs)
}
})
t.Run("dynamic keys validates value shape", func(t *testing.T) {
spec := shapeSpec{
Type: "object",
DynamicKeys: true,
ValueShape: &shapeSpec{Type: "number"},
}
obj := map[string]interface{}{"a": 1.0, "b": 2.0}
errs := validateShape(obj, spec, "$.dyn")
if len(errs) != 0 {
t.Errorf("unexpected errors: %v", errs)
}
})
}
func TestParityWSMultiObserverGolden(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
seedTestData(t, db)
hub := NewHub()
store := NewPacketStore(db)
if err := store.Load(); err != nil {
t.Fatalf("store load failed: %v", err)
}
poller := NewPoller(db, hub, 50*time.Millisecond)
poller.store = store
client := &Client{send: make(chan []byte, 256)}
hub.Register(client)
defer hub.Unregister(client)
go poller.Start()
defer poller.Stop()
// Wait for poller to initialize its lastID/lastObsID cursors before
// inserting new data; otherwise the poller may snapshot a lastID that
// already includes the test data and never broadcast it.
time.Sleep(100 * time.Millisecond)
now := time.Now().UTC().Format(time.RFC3339)
if _, err := db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
VALUES ('BEEF', 'goldenstarburst237', ?, 1, 4, '{"pubKey":"aabbccdd11223344","type":"ADVERT"}')`, now); err != nil {
t.Fatalf("insert tx failed: %v", err)
}
var txID int
if err := db.conn.QueryRow(`SELECT id FROM transmissions WHERE hash='goldenstarburst237'`).Scan(&txID); err != nil {
t.Fatalf("query tx id failed: %v", err)
}
ts := time.Now().Unix()
if _, err := db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
VALUES (?, 1, 11.0, -88, '["p1"]', ?),
(?, 2, 9.0, -92, '["p1","p2"]', ?),
(?, 1, 7.0, -96, '["p1","p2","p3"]', ?)`,
txID, ts, txID, ts+1, txID, ts+2); err != nil {
t.Fatalf("insert obs failed: %v", err)
}
type golden struct {
Hash string
Count int
Paths []string
ObserverIDs []string
}
expected := golden{
Hash: "goldenstarburst237",
Count: 3,
Paths: []string{`["p1"]`, `["p1","p2"]`, `["p1","p2","p3"]`},
ObserverIDs: []string{"obs1", "obs2"},
}
gotPaths := make([]string, 0, expected.Count)
gotObservers := make(map[string]bool)
deadline := time.After(2 * time.Second)
for len(gotPaths) < expected.Count {
select {
case raw := <-client.send:
var msg map[string]interface{}
if err := json.Unmarshal(raw, &msg); err != nil {
t.Fatalf("unmarshal ws message failed: %v", err)
}
if msg["type"] != "packet" {
continue
}
data, _ := msg["data"].(map[string]interface{})
if data == nil || data["hash"] != expected.Hash {
continue
}
if path, ok := data["path_json"].(string); ok {
gotPaths = append(gotPaths, path)
}
if oid, ok := data["observer_id"].(string); ok && oid != "" {
gotObservers[oid] = true
}
case <-deadline:
t.Fatalf("timed out waiting for %d ws messages, got %d", expected.Count, len(gotPaths))
}
}
sort.Strings(gotPaths)
sort.Strings(expected.Paths)
if len(gotPaths) != len(expected.Paths) {
t.Fatalf("path count mismatch: got %d want %d", len(gotPaths), len(expected.Paths))
}
for i := range expected.Paths {
if gotPaths[i] != expected.Paths[i] {
t.Fatalf("path mismatch at %d: got %q want %q", i, gotPaths[i], expected.Paths[i])
}
}
for _, oid := range expected.ObserverIDs {
if !gotObservers[oid] {
t.Fatalf("missing expected observer %q in ws messages", oid)
}
}
}

View File

@@ -33,6 +33,11 @@ type Server struct {
memStatsMu sync.Mutex
memStatsCache runtime.MemStats
memStatsCachedAt time.Time
// Cached /api/stats response — recomputed at most once every 10s
statsMu sync.Mutex
statsCache *StatsResponse
statsCachedAt time.Time
}
// PerfStats tracks request performance.
@@ -380,6 +385,17 @@ func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
}
func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) {
const statsTTL = 10 * time.Second
s.statsMu.Lock()
if s.statsCache != nil && time.Since(s.statsCachedAt) < statsTTL {
cached := s.statsCache
s.statsMu.Unlock()
writeJSON(w, cached)
return
}
s.statsMu.Unlock()
var stats *Stats
var err error
if s.store != nil {
@@ -392,7 +408,7 @@ func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) {
return
}
counts := s.db.GetRoleCounts()
writeJSON(w, StatsResponse{
resp := &StatsResponse{
TotalPackets: stats.TotalPackets,
TotalTransmissions: &stats.TotalTransmissions,
TotalObservations: stats.TotalObservations,
@@ -411,7 +427,14 @@ func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) {
Companions: counts["companions"],
Sensors: counts["sensors"],
},
})
}
s.statsMu.Lock()
s.statsCache = resp
s.statsCachedAt = time.Now()
s.statsMu.Unlock()
writeJSON(w, resp)
}
func (s *Server) handlePerf(w http.ResponseWriter, r *http.Request) {

View File

@@ -98,6 +98,11 @@ type PacketStore struct {
// computed during Load() and incrementally updated on ingest.
distHops []distHopRecord
distPaths []distPathRecord
// Cached GetNodeHashSizeInfo result — recomputed at most once every 15s
hashSizeInfoMu sync.Mutex
hashSizeInfoCache map[string]*hashSizeNodeInfo
hashSizeInfoAt time.Time
}
// Precomputed distance records for fast analytics aggregation.
@@ -1034,7 +1039,7 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
}
}
// Build broadcast maps (same shape as Node.js WS broadcast)
// Build broadcast maps (same shape as Node.js WS broadcast), one per observation.
result := make([]map[string]interface{}, 0, len(broadcastOrder))
for _, txID := range broadcastOrder {
tx := broadcastTxs[txID]
@@ -1050,32 +1055,34 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
decoded["payload"] = payload
}
}
// Build the nested packet object (packets.js checks m.data.packet)
pkt := map[string]interface{}{
"id": tx.ID,
"raw_hex": strOrNil(tx.RawHex),
"hash": strOrNil(tx.Hash),
"first_seen": strOrNil(tx.FirstSeen),
"timestamp": strOrNil(tx.FirstSeen),
"route_type": intPtrOrNil(tx.RouteType),
"payload_type": intPtrOrNil(tx.PayloadType),
"decoded_json": strOrNil(tx.DecodedJSON),
"observer_id": strOrNil(tx.ObserverID),
"observer_name": strOrNil(tx.ObserverName),
"snr": floatPtrOrNil(tx.SNR),
"rssi": floatPtrOrNil(tx.RSSI),
"path_json": strOrNil(tx.PathJSON),
"direction": strOrNil(tx.Direction),
"observation_count": tx.ObservationCount,
for _, obs := range tx.Observations {
// Build the nested packet object (packets.js checks m.data.packet)
pkt := map[string]interface{}{
"id": tx.ID,
"raw_hex": strOrNil(tx.RawHex),
"hash": strOrNil(tx.Hash),
"first_seen": strOrNil(tx.FirstSeen),
"timestamp": strOrNil(tx.FirstSeen),
"route_type": intPtrOrNil(tx.RouteType),
"payload_type": intPtrOrNil(tx.PayloadType),
"decoded_json": strOrNil(tx.DecodedJSON),
"observer_id": strOrNil(obs.ObserverID),
"observer_name": strOrNil(obs.ObserverName),
"snr": floatPtrOrNil(obs.SNR),
"rssi": floatPtrOrNil(obs.RSSI),
"path_json": strOrNil(obs.PathJSON),
"direction": strOrNil(obs.Direction),
"observation_count": tx.ObservationCount,
}
// Broadcast map: top-level fields for live.js + nested packet for packets.js
broadcastMap := make(map[string]interface{}, len(pkt)+2)
for k, v := range pkt {
broadcastMap[k] = v
}
broadcastMap["decoded"] = decoded
broadcastMap["packet"] = pkt
result = append(result, broadcastMap)
}
// Broadcast map: top-level fields for live.js + nested packet for packets.js
broadcastMap := make(map[string]interface{}, len(pkt)+2)
for k, v := range pkt {
broadcastMap[k] = v
}
broadcastMap["decoded"] = decoded
broadcastMap["packet"] = pkt
result = append(result, broadcastMap)
}
// Invalidate analytics caches since new data was ingested
@@ -1096,7 +1103,7 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
// IngestNewObservations loads new observations for transmissions already in the
// store. This catches observations that arrive after IngestNewFromDB has already
// advanced past the transmission's ID (fixes #174).
func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) int {
func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]interface{} {
if limit <= 0 {
limit = 500
}
@@ -1122,7 +1129,7 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) int {
rows, err := s.db.conn.Query(querySQL, sinceObsID, limit)
if err != nil {
log.Printf("[store] ingest observations query error: %v", err)
return sinceObsID
return nil
}
defer rows.Close()
@@ -1165,20 +1172,16 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) int {
}
if len(obsRows) == 0 {
return sinceObsID
return nil
}
s.mu.Lock()
defer s.mu.Unlock()
newMaxObsID := sinceObsID
updatedTxs := make(map[int]*StoreTx)
broadcastMaps := make([]map[string]interface{}, 0, len(obsRows))
for _, r := range obsRows {
if r.obsID > newMaxObsID {
newMaxObsID = r.obsID
}
// Already ingested (e.g. by IngestNewFromDB in same cycle)
if _, exists := s.byObsID[r.obsID]; exists {
continue
@@ -1221,6 +1224,43 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) int {
}
s.totalObs++
updatedTxs[r.txID] = tx
decoded := map[string]interface{}{
"header": map[string]interface{}{
"payloadTypeName": resolvePayloadTypeName(tx.PayloadType),
},
}
if tx.DecodedJSON != "" {
var payload map[string]interface{}
if json.Unmarshal([]byte(tx.DecodedJSON), &payload) == nil {
decoded["payload"] = payload
}
}
pkt := map[string]interface{}{
"id": tx.ID,
"raw_hex": strOrNil(tx.RawHex),
"hash": strOrNil(tx.Hash),
"first_seen": strOrNil(tx.FirstSeen),
"timestamp": strOrNil(tx.FirstSeen),
"route_type": intPtrOrNil(tx.RouteType),
"payload_type": intPtrOrNil(tx.PayloadType),
"decoded_json": strOrNil(tx.DecodedJSON),
"observer_id": strOrNil(obs.ObserverID),
"observer_name": strOrNil(obs.ObserverName),
"snr": floatPtrOrNil(obs.SNR),
"rssi": floatPtrOrNil(obs.RSSI),
"path_json": strOrNil(obs.PathJSON),
"direction": strOrNil(obs.Direction),
"observation_count": tx.ObservationCount,
}
broadcastMap := make(map[string]interface{}, len(pkt)+2)
for k, v := range pkt {
broadcastMap[k] = v
}
broadcastMap["decoded"] = decoded
broadcastMap["packet"] = pkt
broadcastMaps = append(broadcastMaps, broadcastMap)
}
// Re-pick best observation for updated transmissions and update subpath index
@@ -1275,7 +1315,7 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) int {
// analytics caches cleared; no per-cycle log to avoid stdout overhead
}
return newMaxObsID
return broadcastMaps
}
// MaxTransmissionID returns the highest transmission ID in the store.
@@ -3722,8 +3762,26 @@ type hashSizeNodeInfo struct {
Inconsistent bool
}
// GetNodeHashSizeInfo scans advert packets to compute per-node hash size data.
// GetNodeHashSizeInfo returns cached per-node hash size data, recomputing at most every 15s.
func (s *PacketStore) GetNodeHashSizeInfo() map[string]*hashSizeNodeInfo {
const ttl = 15 * time.Second
s.hashSizeInfoMu.Lock()
if s.hashSizeInfoCache != nil && time.Since(s.hashSizeInfoAt) < ttl {
cached := s.hashSizeInfoCache
s.hashSizeInfoMu.Unlock()
return cached
}
s.hashSizeInfoMu.Unlock()
result := s.computeNodeHashSizeInfo()
s.hashSizeInfoMu.Lock()
s.hashSizeInfoCache = result
s.hashSizeInfoAt = time.Now()
s.hashSizeInfoMu.Unlock()
return result
}
// computeNodeHashSizeInfo scans advert packets to compute per-node hash size data.
func (s *PacketStore) computeNodeHashSizeInfo() map[string]*hashSizeNodeInfo {
s.mu.RLock()
defer s.mu.RUnlock()

View File

@@ -1,229 +1,245 @@
package main
import (
"encoding/json"
"log"
"net/http"
"strings"
"sync"
"time"
"github.com/gorilla/websocket"
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 4096,
CheckOrigin: func(r *http.Request) bool { return true },
}
// Hub manages WebSocket clients and broadcasts.
type Hub struct {
mu sync.RWMutex
clients map[*Client]bool
}
// Client is a single WebSocket connection.
type Client struct {
conn *websocket.Conn
send chan []byte
}
func NewHub() *Hub {
return &Hub{
clients: make(map[*Client]bool),
}
}
func (h *Hub) ClientCount() int {
h.mu.RLock()
defer h.mu.RUnlock()
return len(h.clients)
}
func (h *Hub) Register(c *Client) {
h.mu.Lock()
h.clients[c] = true
h.mu.Unlock()
log.Printf("[ws] client connected (%d total)", h.ClientCount())
}
func (h *Hub) Unregister(c *Client) {
h.mu.Lock()
if _, ok := h.clients[c]; ok {
delete(h.clients, c)
close(c.send)
}
h.mu.Unlock()
log.Printf("[ws] client disconnected (%d total)", h.ClientCount())
}
// Broadcast sends a message to all connected clients.
func (h *Hub) Broadcast(msg interface{}) {
data, err := json.Marshal(msg)
if err != nil {
log.Printf("[ws] marshal error: %v", err)
return
}
h.mu.RLock()
defer h.mu.RUnlock()
for c := range h.clients {
select {
case c.send <- data:
default:
// Client buffer full — drop
}
}
}
// ServeWS handles the WebSocket upgrade and runs the client.
func (h *Hub) ServeWS(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("[ws] upgrade error: %v", err)
return
}
client := &Client{
conn: conn,
send: make(chan []byte, 256),
}
h.Register(client)
go client.writePump()
go client.readPump(h)
}
// wsOrStatic upgrades WebSocket requests at any path, serves static files otherwise.
func wsOrStatic(hub *Hub, static http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.EqualFold(r.Header.Get("Upgrade"), "websocket") {
hub.ServeWS(w, r)
return
}
static.ServeHTTP(w, r)
})
}
func (c *Client) readPump(hub *Hub) {
defer func() {
hub.Unregister(c)
c.conn.Close()
}()
c.conn.SetReadLimit(512)
c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
c.conn.SetPongHandler(func(string) error {
c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
return nil
})
for {
_, _, err := c.conn.ReadMessage()
if err != nil {
break
}
}
}
func (c *Client) writePump() {
ticker := time.NewTicker(30 * time.Second)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if !ok {
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil {
return
}
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
// Poller watches for new transmissions in SQLite and broadcasts them.
type Poller struct {
db *DB
hub *Hub
store *PacketStore // optional: if set, new transmissions are ingested into memory
interval time.Duration
stop chan struct{}
}
func NewPoller(db *DB, hub *Hub, interval time.Duration) *Poller {
return &Poller{db: db, hub: hub, interval: interval, stop: make(chan struct{})}
}
func (p *Poller) Start() {
lastID := p.db.GetMaxTransmissionID()
lastObsID := p.db.GetMaxObservationID()
log.Printf("[poller] starting from transmission ID %d, obs ID %d, interval %v", lastID, lastObsID, p.interval)
ticker := time.NewTicker(p.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if p.store != nil {
// Ingest new transmissions into in-memory store and broadcast
newTxs, newMax := p.store.IngestNewFromDB(lastID, 100)
if newMax > lastID {
lastID = newMax
}
// Ingest new observations for existing transmissions (fixes #174)
newObsMax := p.store.IngestNewObservations(lastObsID, 500)
if newObsMax > lastObsID {
lastObsID = newObsMax
}
if len(newTxs) > 0 {
log.Printf("[broadcast] sending %d packets to %d clients (lastID now %d)", len(newTxs), p.hub.ClientCount(), lastID)
}
for _, tx := range newTxs {
p.hub.Broadcast(WSMessage{
Type: "packet",
Data: tx,
})
}
} else {
// Fallback: direct DB query (used when store is nil, e.g. tests)
newTxs, err := p.db.GetNewTransmissionsSince(lastID, 100)
if err != nil {
log.Printf("[poller] error: %v", err)
continue
}
for _, tx := range newTxs {
id, _ := tx["id"].(int)
if id > lastID {
lastID = id
}
// Copy packet fields for the nested packet (avoids circular ref)
pkt := make(map[string]interface{}, len(tx))
for k, v := range tx {
pkt[k] = v
}
tx["packet"] = pkt
p.hub.Broadcast(WSMessage{
Type: "packet",
Data: tx,
})
}
}
case <-p.stop:
return
}
}
}
func (p *Poller) Stop() {
close(p.stop)
}
package main
import (
"encoding/json"
"log"
"net/http"
"strings"
"sync"
"time"
"github.com/gorilla/websocket"
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 4096,
CheckOrigin: func(r *http.Request) bool { return true },
}
// Hub manages WebSocket clients and broadcasts.
type Hub struct {
mu sync.RWMutex
clients map[*Client]bool
}
// Client is a single WebSocket connection.
type Client struct {
conn *websocket.Conn
send chan []byte
}
func NewHub() *Hub {
return &Hub{
clients: make(map[*Client]bool),
}
}
func (h *Hub) ClientCount() int {
h.mu.RLock()
defer h.mu.RUnlock()
return len(h.clients)
}
func (h *Hub) Register(c *Client) {
h.mu.Lock()
h.clients[c] = true
h.mu.Unlock()
log.Printf("[ws] client connected (%d total)", h.ClientCount())
}
func (h *Hub) Unregister(c *Client) {
h.mu.Lock()
if _, ok := h.clients[c]; ok {
delete(h.clients, c)
close(c.send)
}
h.mu.Unlock()
log.Printf("[ws] client disconnected (%d total)", h.ClientCount())
}
// Broadcast sends a message to all connected clients.
func (h *Hub) Broadcast(msg interface{}) {
data, err := json.Marshal(msg)
if err != nil {
log.Printf("[ws] marshal error: %v", err)
return
}
h.mu.RLock()
defer h.mu.RUnlock()
for c := range h.clients {
select {
case c.send <- data:
default:
// Client buffer full — drop
}
}
}
// ServeWS handles the WebSocket upgrade and runs the client.
func (h *Hub) ServeWS(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("[ws] upgrade error: %v", err)
return
}
client := &Client{
conn: conn,
send: make(chan []byte, 256),
}
h.Register(client)
go client.writePump()
go client.readPump(h)
}
// wsOrStatic upgrades WebSocket requests at any path, serves static files otherwise.
func wsOrStatic(hub *Hub, static http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.EqualFold(r.Header.Get("Upgrade"), "websocket") {
hub.ServeWS(w, r)
return
}
static.ServeHTTP(w, r)
})
}
func (c *Client) readPump(hub *Hub) {
defer func() {
hub.Unregister(c)
c.conn.Close()
}()
c.conn.SetReadLimit(512)
c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
c.conn.SetPongHandler(func(string) error {
c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
return nil
})
for {
_, _, err := c.conn.ReadMessage()
if err != nil {
break
}
}
}
func (c *Client) writePump() {
ticker := time.NewTicker(30 * time.Second)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if !ok {
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil {
return
}
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
// Poller watches for new transmissions in SQLite and broadcasts them.
type Poller struct {
db *DB
hub *Hub
store *PacketStore // optional: if set, new transmissions are ingested into memory
interval time.Duration
stop chan struct{}
}
func NewPoller(db *DB, hub *Hub, interval time.Duration) *Poller {
return &Poller{db: db, hub: hub, interval: interval, stop: make(chan struct{})}
}
func (p *Poller) Start() {
lastID := p.db.GetMaxTransmissionID()
lastObsID := p.db.GetMaxObservationID()
log.Printf("[poller] starting from transmission ID %d, obs ID %d, interval %v", lastID, lastObsID, p.interval)
ticker := time.NewTicker(p.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if p.store != nil {
// Ingest new transmissions into in-memory store and broadcast
newTxs, newMax := p.store.IngestNewFromDB(lastID, 100)
if newMax > lastID {
lastID = newMax
}
// Ingest new observations for existing transmissions (fixes #174)
nextObsID := lastObsID
if err := p.db.conn.QueryRow(`
SELECT COALESCE(MAX(id), ?) FROM (
SELECT id FROM observations
WHERE id > ?
ORDER BY id ASC
LIMIT 500
)`, lastObsID, lastObsID).Scan(&nextObsID); err != nil {
nextObsID = lastObsID
}
newObs := p.store.IngestNewObservations(lastObsID, 500)
if nextObsID > lastObsID {
lastObsID = nextObsID
}
if len(newTxs) > 0 {
log.Printf("[broadcast] sending %d packets to %d clients (lastID now %d)", len(newTxs), p.hub.ClientCount(), lastID)
}
for _, tx := range newTxs {
p.hub.Broadcast(WSMessage{
Type: "packet",
Data: tx,
})
}
for _, obs := range newObs {
p.hub.Broadcast(WSMessage{
Type: "packet",
Data: obs,
})
}
} else {
// Fallback: direct DB query (used when store is nil, e.g. tests)
newTxs, err := p.db.GetNewTransmissionsSince(lastID, 100)
if err != nil {
log.Printf("[poller] error: %v", err)
continue
}
for _, tx := range newTxs {
id, _ := tx["id"].(int)
if id > lastID {
lastID = id
}
// Copy packet fields for the nested packet (avoids circular ref)
pkt := make(map[string]interface{}, len(tx))
for k, v := range tx {
pkt[k] = v
}
tx["packet"] = pkt
p.hub.Broadcast(WSMessage{
Type: "packet",
Data: tx,
})
}
}
case <-p.stop:
return
}
}
}
func (p *Poller) Stop() {
close(p.stop)
}

View File

@@ -1,275 +1,415 @@
package main
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/gorilla/websocket"
)
func TestHubBroadcast(t *testing.T) {
hub := NewHub()
if hub.ClientCount() != 0 {
t.Errorf("expected 0 clients, got %d", hub.ClientCount())
}
// Create a test server with WebSocket endpoint
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
hub.ServeWS(w, r)
}))
defer srv.Close()
// Connect a WebSocket client
wsURL := "ws" + srv.URL[4:] // replace http with ws
conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
if err != nil {
t.Fatalf("dial error: %v", err)
}
defer conn.Close()
// Wait for registration
time.Sleep(50 * time.Millisecond)
if hub.ClientCount() != 1 {
t.Errorf("expected 1 client, got %d", hub.ClientCount())
}
// Broadcast a message
hub.Broadcast(map[string]interface{}{
"type": "packet",
"data": map[string]interface{}{"id": 1, "hash": "test123"},
})
// Read the message
conn.SetReadDeadline(time.Now().Add(2 * time.Second))
_, msg, err := conn.ReadMessage()
if err != nil {
t.Fatalf("read error: %v", err)
}
if len(msg) == 0 {
t.Error("expected non-empty message")
}
// Disconnect
conn.Close()
time.Sleep(100 * time.Millisecond)
}
func TestPollerCreation(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
seedTestData(t, db)
hub := NewHub()
poller := NewPoller(db, hub, 100*time.Millisecond)
if poller == nil {
t.Fatal("expected poller")
}
// Start and stop
go poller.Start()
time.Sleep(200 * time.Millisecond)
poller.Stop()
}
func TestHubMultipleClients(t *testing.T) {
hub := NewHub()
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
hub.ServeWS(w, r)
}))
defer srv.Close()
wsURL := "ws" + srv.URL[4:]
// Connect two clients
conn1, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
if err != nil {
t.Fatalf("dial error: %v", err)
}
defer conn1.Close()
conn2, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
if err != nil {
t.Fatalf("dial error: %v", err)
}
defer conn2.Close()
time.Sleep(100 * time.Millisecond)
if hub.ClientCount() != 2 {
t.Errorf("expected 2 clients, got %d", hub.ClientCount())
}
// Broadcast and both should receive
hub.Broadcast(map[string]interface{}{"type": "test", "data": "hello"})
conn1.SetReadDeadline(time.Now().Add(2 * time.Second))
_, msg1, err := conn1.ReadMessage()
if err != nil {
t.Fatalf("conn1 read error: %v", err)
}
if len(msg1) == 0 {
t.Error("expected non-empty message on conn1")
}
conn2.SetReadDeadline(time.Now().Add(2 * time.Second))
_, msg2, err := conn2.ReadMessage()
if err != nil {
t.Fatalf("conn2 read error: %v", err)
}
if len(msg2) == 0 {
t.Error("expected non-empty message on conn2")
}
// Disconnect one
conn1.Close()
time.Sleep(100 * time.Millisecond)
// Remaining client should still work
hub.Broadcast(map[string]interface{}{"type": "test2"})
conn2.SetReadDeadline(time.Now().Add(2 * time.Second))
_, msg3, err := conn2.ReadMessage()
if err != nil {
t.Fatalf("conn2 read error after disconnect: %v", err)
}
if len(msg3) == 0 {
t.Error("expected non-empty message")
}
}
func TestBroadcastFullBuffer(t *testing.T) {
hub := NewHub()
// Create a client with tiny buffer (1)
client := &Client{
send: make(chan []byte, 1),
}
hub.mu.Lock()
hub.clients[client] = true
hub.mu.Unlock()
// Fill the buffer
client.send <- []byte("first")
// This broadcast should drop the message (buffer full)
hub.Broadcast(map[string]interface{}{"type": "dropped"})
// Channel should still only have the first message
select {
case msg := <-client.send:
if string(msg) != "first" {
t.Errorf("expected 'first', got %s", string(msg))
}
default:
t.Error("expected message in channel")
}
// Clean up
hub.mu.Lock()
delete(hub.clients, client)
hub.mu.Unlock()
}
func TestBroadcastMarshalError(t *testing.T) {
hub := NewHub()
// Marshal error: functions can't be marshaled to JSON
hub.Broadcast(map[string]interface{}{"bad": func() {}})
// Should not panic — just log and return
}
func TestPollerBroadcastsNewData(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
seedTestData(t, db)
hub := NewHub()
// Create a client to receive broadcasts
client := &Client{
send: make(chan []byte, 256),
}
hub.mu.Lock()
hub.clients[client] = true
hub.mu.Unlock()
poller := NewPoller(db, hub, 50*time.Millisecond)
go poller.Start()
// Insert new data to trigger broadcast
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type)
VALUES ('EEFF', 'newhash123456789', '2026-01-16T10:00:00Z', 1, 4)`)
time.Sleep(200 * time.Millisecond)
poller.Stop()
// Check if client received broadcast with packet field (fixes #162)
select {
case msg := <-client.send:
if len(msg) == 0 {
t.Error("expected non-empty broadcast message")
}
var parsed map[string]interface{}
if err := json.Unmarshal(msg, &parsed); err != nil {
t.Fatalf("failed to parse broadcast: %v", err)
}
if parsed["type"] != "packet" {
t.Errorf("expected type=packet, got %v", parsed["type"])
}
data, ok := parsed["data"].(map[string]interface{})
if !ok {
t.Fatal("expected data to be an object")
}
// packets.js filters on m.data.packet — must exist
pkt, ok := data["packet"]
if !ok || pkt == nil {
t.Error("expected data.packet to exist (required by packets.js WS handler)")
}
pktMap, ok := pkt.(map[string]interface{})
if !ok {
t.Fatal("expected data.packet to be an object")
}
// Verify key fields exist in nested packet (timestamp required by packets.js)
for _, field := range []string{"id", "hash", "payload_type", "timestamp"} {
if _, exists := pktMap[field]; !exists {
t.Errorf("expected data.packet.%s to exist", field)
}
}
default:
// Might not have received due to timing
}
// Clean up
hub.mu.Lock()
delete(hub.clients, client)
hub.mu.Unlock()
}
func TestHubRegisterUnregister(t *testing.T) {
hub := NewHub()
client := &Client{
send: make(chan []byte, 256),
}
hub.Register(client)
if hub.ClientCount() != 1 {
t.Errorf("expected 1 client after register, got %d", hub.ClientCount())
}
hub.Unregister(client)
if hub.ClientCount() != 0 {
t.Errorf("expected 0 clients after unregister, got %d", hub.ClientCount())
}
// Unregister again should be safe
hub.Unregister(client)
if hub.ClientCount() != 0 {
t.Errorf("expected 0 clients, got %d", hub.ClientCount())
}
}
package main
import (
"encoding/json"
"net/http"
"net/http/httptest"
"sort"
"testing"
"time"
"github.com/gorilla/websocket"
)
func TestHubBroadcast(t *testing.T) {
hub := NewHub()
if hub.ClientCount() != 0 {
t.Errorf("expected 0 clients, got %d", hub.ClientCount())
}
// Create a test server with WebSocket endpoint
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
hub.ServeWS(w, r)
}))
defer srv.Close()
// Connect a WebSocket client
wsURL := "ws" + srv.URL[4:] // replace http with ws
conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
if err != nil {
t.Fatalf("dial error: %v", err)
}
defer conn.Close()
// Wait for registration
time.Sleep(50 * time.Millisecond)
if hub.ClientCount() != 1 {
t.Errorf("expected 1 client, got %d", hub.ClientCount())
}
// Broadcast a message
hub.Broadcast(map[string]interface{}{
"type": "packet",
"data": map[string]interface{}{"id": 1, "hash": "test123"},
})
// Read the message
conn.SetReadDeadline(time.Now().Add(2 * time.Second))
_, msg, err := conn.ReadMessage()
if err != nil {
t.Fatalf("read error: %v", err)
}
if len(msg) == 0 {
t.Error("expected non-empty message")
}
// Disconnect
conn.Close()
time.Sleep(100 * time.Millisecond)
}
func TestPollerCreation(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
seedTestData(t, db)
hub := NewHub()
poller := NewPoller(db, hub, 100*time.Millisecond)
if poller == nil {
t.Fatal("expected poller")
}
// Start and stop
go poller.Start()
time.Sleep(200 * time.Millisecond)
poller.Stop()
}
func TestHubMultipleClients(t *testing.T) {
hub := NewHub()
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
hub.ServeWS(w, r)
}))
defer srv.Close()
wsURL := "ws" + srv.URL[4:]
// Connect two clients
conn1, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
if err != nil {
t.Fatalf("dial error: %v", err)
}
defer conn1.Close()
conn2, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
if err != nil {
t.Fatalf("dial error: %v", err)
}
defer conn2.Close()
time.Sleep(100 * time.Millisecond)
if hub.ClientCount() != 2 {
t.Errorf("expected 2 clients, got %d", hub.ClientCount())
}
// Broadcast and both should receive
hub.Broadcast(map[string]interface{}{"type": "test", "data": "hello"})
conn1.SetReadDeadline(time.Now().Add(2 * time.Second))
_, msg1, err := conn1.ReadMessage()
if err != nil {
t.Fatalf("conn1 read error: %v", err)
}
if len(msg1) == 0 {
t.Error("expected non-empty message on conn1")
}
conn2.SetReadDeadline(time.Now().Add(2 * time.Second))
_, msg2, err := conn2.ReadMessage()
if err != nil {
t.Fatalf("conn2 read error: %v", err)
}
if len(msg2) == 0 {
t.Error("expected non-empty message on conn2")
}
// Disconnect one
conn1.Close()
time.Sleep(100 * time.Millisecond)
// Remaining client should still work
hub.Broadcast(map[string]interface{}{"type": "test2"})
conn2.SetReadDeadline(time.Now().Add(2 * time.Second))
_, msg3, err := conn2.ReadMessage()
if err != nil {
t.Fatalf("conn2 read error after disconnect: %v", err)
}
if len(msg3) == 0 {
t.Error("expected non-empty message")
}
}
func TestBroadcastFullBuffer(t *testing.T) {
hub := NewHub()
// Create a client with tiny buffer (1)
client := &Client{
send: make(chan []byte, 1),
}
hub.mu.Lock()
hub.clients[client] = true
hub.mu.Unlock()
// Fill the buffer
client.send <- []byte("first")
// This broadcast should drop the message (buffer full)
hub.Broadcast(map[string]interface{}{"type": "dropped"})
// Channel should still only have the first message
select {
case msg := <-client.send:
if string(msg) != "first" {
t.Errorf("expected 'first', got %s", string(msg))
}
default:
t.Error("expected message in channel")
}
// Clean up
hub.mu.Lock()
delete(hub.clients, client)
hub.mu.Unlock()
}
func TestBroadcastMarshalError(t *testing.T) {
hub := NewHub()
// Marshal error: functions can't be marshaled to JSON
hub.Broadcast(map[string]interface{}{"bad": func() {}})
// Should not panic — just log and return
}
func TestPollerBroadcastsNewData(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
seedTestData(t, db)
hub := NewHub()
// Create a client to receive broadcasts
client := &Client{
send: make(chan []byte, 256),
}
hub.mu.Lock()
hub.clients[client] = true
hub.mu.Unlock()
poller := NewPoller(db, hub, 50*time.Millisecond)
go poller.Start()
// Insert new data to trigger broadcast
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type)
VALUES ('EEFF', 'newhash123456789', '2026-01-16T10:00:00Z', 1, 4)`)
time.Sleep(200 * time.Millisecond)
poller.Stop()
// Check if client received broadcast with packet field (fixes #162)
select {
case msg := <-client.send:
if len(msg) == 0 {
t.Error("expected non-empty broadcast message")
}
var parsed map[string]interface{}
if err := json.Unmarshal(msg, &parsed); err != nil {
t.Fatalf("failed to parse broadcast: %v", err)
}
if parsed["type"] != "packet" {
t.Errorf("expected type=packet, got %v", parsed["type"])
}
data, ok := parsed["data"].(map[string]interface{})
if !ok {
t.Fatal("expected data to be an object")
}
// packets.js filters on m.data.packet — must exist
pkt, ok := data["packet"]
if !ok || pkt == nil {
t.Error("expected data.packet to exist (required by packets.js WS handler)")
}
pktMap, ok := pkt.(map[string]interface{})
if !ok {
t.Fatal("expected data.packet to be an object")
}
// Verify key fields exist in nested packet (timestamp required by packets.js)
for _, field := range []string{"id", "hash", "payload_type", "timestamp"} {
if _, exists := pktMap[field]; !exists {
t.Errorf("expected data.packet.%s to exist", field)
}
}
default:
// Might not have received due to timing
}
// Clean up
hub.mu.Lock()
delete(hub.clients, client)
hub.mu.Unlock()
}
func TestPollerBroadcastsMultipleObservations(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
seedTestData(t, db)
hub := NewHub()
client := &Client{
send: make(chan []byte, 256),
}
hub.mu.Lock()
hub.clients[client] = true
hub.mu.Unlock()
defer func() {
hub.mu.Lock()
delete(hub.clients, client)
hub.mu.Unlock()
}()
poller := NewPoller(db, hub, 50*time.Millisecond)
store := NewPacketStore(db)
if err := store.Load(); err != nil {
t.Fatalf("store load failed: %v", err)
}
poller.store = store
go poller.Start()
defer poller.Stop()
// Wait for poller to initialize its lastID/lastObsID cursors before
// inserting new data; otherwise the poller may snapshot a lastID that
// already includes the test data and never broadcast it.
time.Sleep(100 * time.Millisecond)
now := time.Now().UTC().Format(time.RFC3339)
if _, err := db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
VALUES ('FACE', 'starbursthash237a', ?, 1, 4, '{"pubKey":"aabbccdd11223344","type":"ADVERT"}')`, now); err != nil {
t.Fatalf("insert tx failed: %v", err)
}
var txID int
if err := db.conn.QueryRow(`SELECT id FROM transmissions WHERE hash='starbursthash237a'`).Scan(&txID); err != nil {
t.Fatalf("query tx id failed: %v", err)
}
ts := time.Now().Unix()
if _, err := db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
VALUES (?, 1, 14.0, -82, '["aa"]', ?),
(?, 2, 10.5, -90, '["aa","bb"]', ?),
(?, 1, 7.0, -96, '["aa","bb","cc"]', ?)`,
txID, ts, txID, ts+1, txID, ts+2); err != nil {
t.Fatalf("insert observations failed: %v", err)
}
deadline := time.After(2 * time.Second)
var dataMsgs []map[string]interface{}
for len(dataMsgs) < 3 {
select {
case raw := <-client.send:
var parsed map[string]interface{}
if err := json.Unmarshal(raw, &parsed); err != nil {
t.Fatalf("unmarshal ws msg failed: %v", err)
}
if parsed["type"] != "packet" {
continue
}
data, ok := parsed["data"].(map[string]interface{})
if !ok {
continue
}
if data["hash"] == "starbursthash237a" {
dataMsgs = append(dataMsgs, data)
}
case <-deadline:
t.Fatalf("timed out waiting for 3 observation broadcasts, got %d", len(dataMsgs))
}
}
if len(dataMsgs) != 3 {
t.Fatalf("expected 3 messages, got %d", len(dataMsgs))
}
paths := make([]string, 0, 3)
observers := make(map[string]bool)
for _, m := range dataMsgs {
hash, _ := m["hash"].(string)
if hash != "starbursthash237a" {
t.Fatalf("unexpected hash %q", hash)
}
p, _ := m["path_json"].(string)
paths = append(paths, p)
if oid, ok := m["observer_id"].(string); ok && oid != "" {
observers[oid] = true
}
}
sort.Strings(paths)
wantPaths := []string{`["aa","bb","cc"]`, `["aa","bb"]`, `["aa"]`}
sort.Strings(wantPaths)
for i := range wantPaths {
if paths[i] != wantPaths[i] {
t.Fatalf("path mismatch at %d: got %q want %q", i, paths[i], wantPaths[i])
}
}
if len(observers) < 2 {
t.Fatalf("expected observations from >=2 observers, got %d", len(observers))
}
}
func TestIngestNewObservationsBroadcast(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
seedTestData(t, db)
store := NewPacketStore(db)
if err := store.Load(); err != nil {
t.Fatalf("store load failed: %v", err)
}
maxObs := db.GetMaxObservationID()
now := time.Now().Unix()
if _, err := db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
VALUES (1, 2, 6.0, -100, '["aa","zz"]', ?),
(1, 1, 5.0, -101, '["aa","yy"]', ?)`, now, now+1); err != nil {
t.Fatalf("insert new observations failed: %v", err)
}
maps := store.IngestNewObservations(maxObs, 500)
if len(maps) != 2 {
t.Fatalf("expected 2 broadcast maps, got %d", len(maps))
}
for _, m := range maps {
if m["hash"] != "abc123def4567890" {
t.Fatalf("unexpected hash in map: %v", m["hash"])
}
path, ok := m["path_json"].(string)
if !ok || path == "" {
t.Fatalf("missing path_json in map: %#v", m)
}
if _, ok := m["observer_id"]; !ok {
t.Fatalf("missing observer_id in map: %#v", m)
}
}
}
func TestHubRegisterUnregister(t *testing.T) {
hub := NewHub()
client := &Client{
send: make(chan []byte, 256),
}
hub.Register(client)
if hub.ClientCount() != 1 {
t.Errorf("expected 1 client after register, got %d", hub.ClientCount())
}
hub.Unregister(client)
if hub.ClientCount() != 0 {
t.Errorf("expected 0 clients after unregister, got %d", hub.ClientCount())
}
// Unregister again should be safe
hub.Unregister(client)
if hub.ClientCount() != 0 {
t.Errorf("expected 0 clients, got %d", hub.ClientCount())
}
}

View File

@@ -9,6 +9,8 @@ services:
image: corescope:latest
container_name: corescope-prod
restart: unless-stopped
extra_hosts:
- "host.docker.internal:host-gateway"
ports:
- "${PROD_HTTP_PORT:-80}:${PROD_HTTP_PORT:-80}"
- "${PROD_HTTPS_PORT:-443}:${PROD_HTTPS_PORT:-443}"
@@ -31,6 +33,8 @@ services:
image: corescope:latest
container_name: corescope-staging
restart: unless-stopped
extra_hosts:
- "host.docker.internal:host-gateway"
ports:
- "${STAGING_HTTP_PORT:-81}:${STAGING_HTTP_PORT:-81}"
- "${STAGING_MQTT_PORT:-1884}:1883"
@@ -59,6 +63,8 @@ services:
image: corescope-go:latest
container_name: corescope-staging-go
restart: unless-stopped
extra_hosts:
- "host.docker.internal:host-gateway"
ports:
- "${STAGING_GO_HTTP_PORT:-82}:80"
- "${STAGING_GO_MQTT_PORT:-1885}:1883"