Compare commits

..

13 Commits

Author SHA1 Message Date
Kpa-clawbot
678c4ce8da ci: force bash shell for all workflow steps
Self-hosted runner defaults to PowerShell on Windows, causing bash
syntax (if/then/fi, curl line continuations) to fail with parse errors.
Setting defaults.run.shell=bash at workflow level fixes all steps.
2026-03-29 15:39:08 +00:00
Kpa-clawbot
752f25382a fix: update TestTransportCodes to match new byte order
The test data still used the old byte order (header, pathByte, transport_codes)
but the decoder now expects (header, transport_codes, pathByte). Reorder the
test hex string accordingly.
2026-03-29 15:37:31 +00:00
you
3bbd986d41 fix: add sleep before poller data insert to prevent race condition in tests
The poller's Start() calls GetMaxTransmissionID() to initialize its cursor.
When the test goroutine inserts data between go poller.Start() and the
actual GetMaxTransmissionID() call, the poller's cursor skips past the
test data and never broadcasts it, causing a timeout.

Adding a 100ms sleep after go poller.Start() ensures the poller has
initialized its cursor before the test inserts new data.
2026-03-29 08:32:37 -07:00
you
712fa15a8c fix: force single SQLite connection in test DBs to prevent in-memory table visibility issues
SQLite :memory: databases create separate databases per connection.
When the connection pool opens multiple connections (e.g. poller goroutine
vs main test goroutine), tables created on one connection are invisible
to others. Setting MaxOpenConns(1) ensures all queries use the same
in-memory database, fixing TestPollerBroadcastsMultipleObservations.
2026-03-29 08:32:37 -07:00
Kpa-clawbot
ab03b142f5 fix: per-observation WS broadcast for live view starburst — fixes #237
IngestNewFromDB now broadcasts one message per observation (not per
transmission). IngestNewObservations also broadcasts late arrivals.
Tests verify multi-observer packets produce multiple WS messages.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-29 08:32:37 -07:00
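A minimal sketch of the per-observation broadcast described above. The types and the send callback are hypothetical, not the project's actual IngestNewFromDB/hub API:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Observation and Transmission are illustrative shapes only.
type Observation struct {
	ObserverID string  `json:"observerId"`
	RSSI       float64 `json:"rssi"`
}

type Transmission struct {
	ID           int
	Observations []Observation
}

// broadcastPerObservation emits one WS message per observation instead
// of one per transmission, so a packet heard by several observers
// produces one starburst event per observer.
func broadcastPerObservation(tx Transmission, send func([]byte)) int {
	for _, obs := range tx.Observations {
		msg, _ := json.Marshal(map[string]interface{}{
			"transmissionId": tx.ID,
			"observation":    obs,
		})
		send(msg)
	}
	return len(tx.Observations)
}

func main() {
	tx := Transmission{ID: 7, Observations: []Observation{
		{ObserverID: "a", RSSI: -90},
		{ObserverID: "b", RSSI: -101},
	}}
	n := broadcastPerObservation(tx, func(b []byte) { fmt.Println(string(b)) })
	fmt.Println(n) // 2
}
```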
you
def95aae64 fix: align packet decoder with MeshCore firmware spec
Compared decoder.js against the MeshCore firmware source (Dispatcher.cpp,
Packet.h, Mesh.cpp, AdvertDataHelpers.h) and fixed all mismatches:

1. Field order: transport codes now parsed BEFORE path_length byte,
   matching the spec: [header][transport_codes?][path_length][path][payload]

2. ACK payload: was incorrectly decoded as dest(1)+src(1)+ackHash(4).
   Firmware shows ACK is just checksum(4) — no dest/src hashes.

3. TRACE payload: was incorrectly decoded as flags(1)+tag(4)+dest(6)+src(1).
   Firmware shows tag(4)+authCode(4)+flags(1)+pathData.

4. ADVERT appdata: added missing feature1 (0x20 flag) and feature2
   (0x40 flag) parsing — 2-byte fields between location and name.

5. Transport code field naming: renamed nextHop/lastHop to code1/code2
   to match spec terminology (transport_code_1/transport_code_2).

6. Fixed incorrect field size labels in packets.js hex breakdown:
   dest/src are 1 byte, MAC is 2 bytes (not 6B/6B/4B).

7. Fixed ANON_REQ/PATH comment typos (dest was listed as 6 bytes,
   MAC as 4 bytes — both wrong, code was already correct).

All 329 tests pass (66 decoder + 263 spec/golden).
2026-03-29 08:32:16 -07:00
you
1b09c733f5 ci: restrict self-hosted jobs to Linux runners
The Windows self-hosted runner picks up jobs and fails because bash
scripts run in PowerShell. Node.js tests need Chromium/Playwright
(Linux-only), and build/deploy/publish use Docker (Linux-only).

Changes:
- node-test: runs-on: [self-hosted, Linux]
- build: runs-on: [self-hosted, Linux]
- deploy: runs-on: [self-hosted, Linux]
- publish: runs-on: [self-hosted, Linux]
- go-test: unchanged (ubuntu-latest)
2026-03-29 14:58:15 +00:00
Kpa-clawbot
553c0e4963 ci: bump GitHub Actions to Node 24 compatible versions
checkout v4→v5, setup-go v5→v6, setup-node v4→v5,
upload-artifact v4→v5, download-artifact v4→v5

Fixes the Node.js 20 deprecation warning.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-29 07:51:48 -07:00
efiten
8ede8427c8 fix: round Go Runtime floats to 1dp, prevent nav stats dot wrapping
- perf.js: toFixed(1) on all ms/MB values in Go Runtime section
- style.css: white-space: nowrap on .nav-stats to prevent the · separator
  from wrapping onto its own line

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-29 07:51:26 -07:00
you
8e66c68d6f fix: cache hit rate excludes stale hits + debounce bulk-health invalidation
Two cache bugs fixed:

1. Hit rate formula excluded stale hits — reported rate was artificially low
   because stale-while-revalidate responses (which ARE cache hits from the
   caller's perspective) were not counted. Changed formula from
   hits/(hits+misses) to (hits+staleHits)/(hits+staleHits+misses).

2. Bulk-health cache invalidated on every advert packet — in a mesh with
   dozens of nodes advertising every few seconds, this caused the expensive
   bulk-health query to be recomputed on nearly every request, defeating
   the cache entirely. Switched to 30s debounced invalidation via
   debouncedInvalidateBulkHealth().

Added regression test for hit rate formula in test-server-routes.js.
2026-03-29 07:51:08 -07:00
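The corrected formula in item 1, as a sketch (hitRate is a hypothetical helper, not the server's code):

```go
package main

import "fmt"

// hitRate counts stale-while-revalidate responses as hits, per the
// fixed formula: (hits + staleHits) / (hits + staleHits + misses).
func hitRate(hits, staleHits, misses int) float64 {
	total := hits + staleHits + misses
	if total == 0 {
		return 0
	}
	return float64(hits+staleHits) / float64(total)
}

func main() {
	// Old formula hits/(hits+misses) = 80/90, about 0.889; the fixed
	// formula counts the 10 stale hits too: 90/100 = 0.900.
	fmt.Printf("%.3f\n", hitRate(80, 10, 10)) // 0.900
}
```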
you
37396823ad fix: align Go packet decoder with MeshCore firmware spec
Match the C++ firmware wire format (Packet::writeTo/readFrom):

1. Field order: transport codes are parsed BEFORE path_length byte,
   matching firmware's header → transport_codes → path_len → path → payload

2. ACK payload: just 4-byte CRC checksum, not dest+src+ackHash.
   Firmware createAck() writes only ack_crc (4 bytes).

3. TRACE payload: tag(4) + authCode(4) + flags(1) + pathData,
   matching firmware createTrace() and onRecvPacket() TRACE handler.

4. ADVERT features: parse feat1 (0x20) and feat2 (0x40) optional
   2-byte fields between location and name, matching AdvertDataBuilder
   and AdvertDataParser in the firmware.

5. Transport code naming: code1/code2 instead of nextHop/lastHop,
   matching firmware's transport_codes[0]/transport_codes[1] naming.

Fixes applied to both cmd/ingestor/decoder.go and cmd/server/decoder.go.
Tests updated to match new behavior.
2026-03-29 07:50:51 -07:00
efiten
092d0809f0 fix: stop wiping analytics cache on every ingest cycle
The 15s TTL already handles freshness — clearing all cache maps on
every 1-second poll meant entries were never reused, giving 0% server
hit rate and forcing every analytics request back to SQLite.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-29 07:48:38 -07:00
you
074f3d3760 ci: cancel workflow run immediately when any test job fails
When go-test or node-test fails, the workflow run is now cancelled
via the GitHub API so the sibling job doesn't sit queued/running.

Also fixed build job to need both go-test AND node-test (was only
waiting on go-test despite the pipeline comment saying both gate it).
2026-03-29 14:20:22 +00:00
19 changed files with 5257 additions and 4823 deletions


@@ -20,6 +20,10 @@ concurrency:
   group: deploy-${{ github.event.pull_request.number || github.ref }}
   cancel-in-progress: true
 
+defaults:
+  run:
+    shell: bash
+
 env:
   FORCE_JAVASCRIPT_ACTIONS_TO_NODE24: true
@@ -122,6 +126,13 @@ jobs:
           echo "| Server | ${SERVER_COV}% |" >> $GITHUB_STEP_SUMMARY
           echo "| Ingestor | ${INGESTOR_COV}% |" >> $GITHUB_STEP_SUMMARY
+      - name: Cancel workflow on failure
+        if: failure()
+        run: |
+          curl -s -X POST \
+            -H "Authorization: Bearer ${{ github.token }}" \
+            "https://api.github.com/repos/${{ github.repository }}/actions/runs/${{ github.run_id }}/cancel"
+
       - name: Upload Go coverage badges
         if: always()
         uses: actions/upload-artifact@v5
@@ -136,7 +147,7 @@ jobs:
   # ───────────────────────────────────────────────────────────────
   node-test:
     name: "🧪 Node.js Tests"
-    runs-on: self-hosted
+    runs-on: [self-hosted, Linux]
     steps:
       - name: Checkout code
         uses: actions/checkout@v5
@@ -263,6 +274,13 @@ jobs:
           BASE_URL=http://localhost:13581 node test-e2e-playwright.js || true
           kill $SERVER_PID 2>/dev/null || true
+      - name: Cancel workflow on failure
+        if: failure()
+        run: |
+          curl -s -X POST \
+            -H "Authorization: Bearer ${{ github.token }}" \
+            "https://api.github.com/repos/${{ github.repository }}/actions/runs/${{ github.run_id }}/cancel"
+
       - name: Upload Node.js test badges
         if: always()
         uses: actions/upload-artifact@v5
@@ -278,8 +296,8 @@ jobs:
   build:
     name: "🏗️ Build Docker Image"
     if: github.event_name == 'push'
-    needs: [go-test]
-    runs-on: self-hosted
+    needs: [go-test, node-test]
+    runs-on: [self-hosted, Linux]
     steps:
       - name: Checkout code
         uses: actions/checkout@v5
@@ -304,7 +322,7 @@ jobs:
     name: "🚀 Deploy Staging"
     if: github.event_name == 'push'
     needs: [build]
-    runs-on: self-hosted
+    runs-on: [self-hosted, Linux]
     steps:
       - name: Checkout code
         uses: actions/checkout@v5
@@ -349,7 +367,7 @@ jobs:
     name: "📝 Publish Badges & Summary"
     if: github.event_name == 'push'
     needs: [deploy]
-    runs-on: self-hosted
+    runs-on: [self-hosted, Linux]
     steps:
       - name: Checkout code
         uses: actions/checkout@v5


@@ -72,8 +72,8 @@ type Header struct {
 // TransportCodes are present on TRANSPORT_FLOOD and TRANSPORT_DIRECT routes.
 type TransportCodes struct {
-    NextHop string `json:"nextHop"`
-    LastHop string `json:"lastHop"`
+    Code1 string `json:"code1"`
+    Code2 string `json:"code2"`
 }
 
 // Path holds decoded path/hop information.
@@ -92,6 +92,8 @@ type AdvertFlags struct {
     Room bool `json:"room"`
     Sensor bool `json:"sensor"`
     HasLocation bool `json:"hasLocation"`
+    HasFeat1 bool `json:"hasFeat1"`
+    HasFeat2 bool `json:"hasFeat2"`
     HasName bool `json:"hasName"`
 }
@@ -111,6 +113,8 @@ type Payload struct {
     Lat *float64 `json:"lat,omitempty"`
     Lon *float64 `json:"lon,omitempty"`
     Name string `json:"name,omitempty"`
+    Feat1 *int `json:"feat1,omitempty"`
+    Feat2 *int `json:"feat2,omitempty"`
     BatteryMv *int `json:"battery_mv,omitempty"`
     TemperatureC *float64 `json:"temperature_c,omitempty"`
     ChannelHash int `json:"channelHash,omitempty"`
@@ -123,6 +127,8 @@ type Payload struct {
     EphemeralPubKey string `json:"ephemeralPubKey,omitempty"`
     PathData string `json:"pathData,omitempty"`
     Tag uint32 `json:"tag,omitempty"`
+    AuthCode uint32 `json:"authCode,omitempty"`
+    TraceFlags *int `json:"traceFlags,omitempty"`
     RawHex string `json:"raw,omitempty"`
     Error string `json:"error,omitempty"`
 }
@@ -199,14 +205,13 @@ func decodeEncryptedPayload(typeName string, buf []byte) Payload {
 }
 
 func decodeAck(buf []byte) Payload {
-    if len(buf) < 6 {
+    if len(buf) < 4 {
         return Payload{Type: "ACK", Error: "too short", RawHex: hex.EncodeToString(buf)}
     }
+    checksum := binary.LittleEndian.Uint32(buf[0:4])
     return Payload{
         Type: "ACK",
-        DestHash: hex.EncodeToString(buf[0:1]),
-        SrcHash: hex.EncodeToString(buf[1:2]),
-        ExtraHash: hex.EncodeToString(buf[2:6]),
+        ExtraHash: fmt.Sprintf("%08x", checksum),
     }
 }
@@ -231,6 +236,8 @@ func decodeAdvert(buf []byte) Payload {
     if len(appdata) > 0 {
         flags := appdata[0]
         advType := int(flags & 0x0F)
+        hasFeat1 := flags&0x20 != 0
+        hasFeat2 := flags&0x40 != 0
         p.Flags = &AdvertFlags{
             Raw: int(flags),
             Type: advType,
@@ -239,6 +246,8 @@ func decodeAdvert(buf []byte) Payload {
             Room: advType == 3,
             Sensor: advType == 4,
             HasLocation: flags&0x10 != 0,
+            HasFeat1: hasFeat1,
+            HasFeat2: hasFeat2,
             HasName: flags&0x80 != 0,
         }
@@ -252,6 +261,16 @@ func decodeAdvert(buf []byte) Payload {
         p.Lon = &lon
         off += 8
     }
+    if hasFeat1 && len(appdata) >= off+2 {
+        feat1 := int(binary.LittleEndian.Uint16(appdata[off : off+2]))
+        p.Feat1 = &feat1
+        off += 2
+    }
+    if hasFeat2 && len(appdata) >= off+2 {
+        feat2 := int(binary.LittleEndian.Uint16(appdata[off : off+2]))
+        p.Feat2 = &feat2
+        off += 2
+    }
     if p.Flags.HasName {
         // Find null terminator to separate name from trailing telemetry bytes
         nameEnd := len(appdata)
@@ -469,15 +488,22 @@ func decodePathPayload(buf []byte) Payload {
 }
 
 func decodeTrace(buf []byte) Payload {
-    if len(buf) < 12 {
+    if len(buf) < 9 {
         return Payload{Type: "TRACE", Error: "too short", RawHex: hex.EncodeToString(buf)}
     }
-    return Payload{
-        Type: "TRACE",
-        DestHash: hex.EncodeToString(buf[5:11]),
-        SrcHash: hex.EncodeToString(buf[11:12]),
-        Tag: binary.LittleEndian.Uint32(buf[1:5]),
+    tag := binary.LittleEndian.Uint32(buf[0:4])
+    authCode := binary.LittleEndian.Uint32(buf[4:8])
+    flags := int(buf[8])
+    p := Payload{
+        Type: "TRACE",
+        Tag: tag,
+        AuthCode: authCode,
+        TraceFlags: &flags,
     }
+    if len(buf) > 9 {
+        p.PathData = hex.EncodeToString(buf[9:])
+    }
+    return p
 }
 
 func decodePayload(payloadType int, buf []byte, channelKeys map[string]string) Payload {
@@ -520,8 +546,7 @@ func DecodePacket(hexString string, channelKeys map[string]string) (*DecodedPacket, error) {
     }
 
     header := decodeHeader(buf[0])
-    pathByte := buf[1]
-    offset := 2
+    offset := 1
 
     var tc *TransportCodes
     if isTransportRoute(header.RouteType) {
@@ -529,12 +554,18 @@ func DecodePacket(hexString string, channelKeys map[string]string) (*DecodedPacket, error) {
             return nil, fmt.Errorf("packet too short for transport codes")
         }
         tc = &TransportCodes{
-            NextHop: strings.ToUpper(hex.EncodeToString(buf[offset : offset+2])),
-            LastHop: strings.ToUpper(hex.EncodeToString(buf[offset+2 : offset+4])),
+            Code1: strings.ToUpper(hex.EncodeToString(buf[offset : offset+2])),
+            Code2: strings.ToUpper(hex.EncodeToString(buf[offset+2 : offset+4])),
         }
         offset += 4
     }
+    if offset >= len(buf) {
+        return nil, fmt.Errorf("packet too short (no path byte)")
+    }
+    pathByte := buf[offset]
+    offset++
 
     path, bytesConsumed := decodePath(pathByte, buf, offset)
     offset += bytesConsumed
@@ -562,16 +593,24 @@ func ComputeContentHash(rawHex string) string {
         return rawHex
     }
 
-    pathByte := buf[1]
+    headerByte := buf[0]
+    offset := 1
+    if isTransportRoute(int(headerByte & 0x03)) {
+        offset += 4
+    }
+    if offset >= len(buf) {
+        if len(rawHex) >= 16 {
+            return rawHex[:16]
+        }
+        return rawHex
+    }
+    pathByte := buf[offset]
+    offset++
     hashSize := int((pathByte>>6)&0x3) + 1
     hashCount := int(pathByte & 0x3F)
     pathBytes := hashSize * hashCount
-    headerByte := buf[0]
-    payloadStart := 2 + pathBytes
-    if isTransportRoute(int(headerByte & 0x03)) {
-        payloadStart += 4
-    }
+    payloadStart := offset + pathBytes
     if payloadStart > len(buf) {
         if len(rawHex) >= 16 {
             return rawHex[:16]


@@ -129,7 +129,8 @@ func TestDecodePath3ByteHashes(t *testing.T) {
 func TestTransportCodes(t *testing.T) {
     // Route type 0 (TRANSPORT_FLOOD) should have transport codes
-    hex := "1400" + "AABB" + "CCDD" + "1A" + strings.Repeat("00", 10)
+    // Firmware order: header + transport_codes(4) + path_len + path + payload
+    hex := "14" + "AABB" + "CCDD" + "00" + strings.Repeat("00", 10)
     pkt, err := DecodePacket(hex, nil)
     if err != nil {
         t.Fatal(err)
@@ -140,11 +141,11 @@ func TestTransportCodes(t *testing.T) {
     if pkt.TransportCodes == nil {
         t.Fatal("transportCodes should not be nil for TRANSPORT_FLOOD")
     }
-    if pkt.TransportCodes.NextHop != "AABB" {
-        t.Errorf("nextHop=%s, want AABB", pkt.TransportCodes.NextHop)
+    if pkt.TransportCodes.Code1 != "AABB" {
+        t.Errorf("code1=%s, want AABB", pkt.TransportCodes.Code1)
     }
-    if pkt.TransportCodes.LastHop != "CCDD" {
-        t.Errorf("lastHop=%s, want CCDD", pkt.TransportCodes.LastHop)
+    if pkt.TransportCodes.Code2 != "CCDD" {
+        t.Errorf("code2=%s, want CCDD", pkt.TransportCodes.Code2)
     }
 
     // Route type 1 (FLOOD) should NOT have transport codes
@@ -537,10 +538,11 @@ func TestDecodeTraceShort(t *testing.T) {
 func TestDecodeTraceValid(t *testing.T) {
     buf := make([]byte, 16)
-    buf[0] = 0x00
-    buf[1] = 0x01 // tag LE uint32 = 1
-    buf[5] = 0xAA // destHash start
-    buf[11] = 0xBB
+    // tag(4) + authCode(4) + flags(1) + pathData
+    binary.LittleEndian.PutUint32(buf[0:4], 1) // tag = 1
+    binary.LittleEndian.PutUint32(buf[4:8], 0xDEADBEEF) // authCode
+    buf[8] = 0x02 // flags
+    buf[9] = 0xAA // path data
     p := decodeTrace(buf)
     if p.Error != "" {
         t.Errorf("unexpected error: %s", p.Error)
@@ -548,9 +550,18 @@ func TestDecodeTraceValid(t *testing.T) {
     if p.Tag != 1 {
         t.Errorf("tag=%d, want 1", p.Tag)
     }
+    if p.AuthCode != 0xDEADBEEF {
+        t.Errorf("authCode=%d, want 0xDEADBEEF", p.AuthCode)
+    }
+    if p.TraceFlags == nil || *p.TraceFlags != 2 {
+        t.Errorf("traceFlags=%v, want 2", p.TraceFlags)
+    }
     if p.Type != "TRACE" {
         t.Errorf("type=%s, want TRACE", p.Type)
     }
+    if p.PathData == "" {
+        t.Error("pathData should not be empty")
+    }
 }
 
 func TestDecodeAdvertShort(t *testing.T) {
@@ -833,10 +844,9 @@ func TestComputeContentHashShortHex(t *testing.T) {
 }
 
 func TestComputeContentHashTransportRoute(t *testing.T) {
-    // Route type 0 (TRANSPORT_FLOOD) with no path hops + 4 transport code bytes
-    // header=0x14 (TRANSPORT_FLOOD, ADVERT), path=0x00 (0 hops)
-    // transport codes = 4 bytes, then payload
-    hex := "1400" + "AABBCCDD" + strings.Repeat("EE", 10)
+    // Route type 0 (TRANSPORT_FLOOD) with transport codes then path=0x00 (0 hops)
+    // header=0x14 (TRANSPORT_FLOOD, ADVERT), transport(4), path=0x00
+    hex := "14" + "AABBCCDD" + "00" + strings.Repeat("EE", 10)
     hash := ComputeContentHash(hex)
     if len(hash) != 16 {
         t.Errorf("hash length=%d, want 16", len(hash))
@@ -870,12 +880,10 @@ func TestComputeContentHashPayloadBeyondBufferLongHex(t *testing.T) {
 func TestComputeContentHashTransportBeyondBuffer(t *testing.T) {
     // Transport route (0x00 = TRANSPORT_FLOOD) with path claiming some bytes
-    // total buffer too short for transport codes + path
-    // header=0x00, pathByte=0x02 (2 hops, 1-byte hash), then only 2 more bytes
-    // payloadStart = 2 + 2 + 4(transport) = 8, but buffer only 6 bytes
-    hex := "0002" + "AABB" + strings.Repeat("CC", 6) // 20 chars = 10 bytes
+    // header=0x00, transport(4), pathByte=0x02 (2 hops, 1-byte hash)
+    // offset=1+4+1+2=8, buffer needs to be >= 8
+    hex := "00" + "AABB" + "CCDD" + "02" + strings.Repeat("CC", 6) // 20 chars = 10 bytes
     hash := ComputeContentHash(hex)
-    // payloadStart = 2 + 2 + 4 = 8, buffer is 10 bytes → should work
     if len(hash) != 16 {
         t.Errorf("hash length=%d, want 16", len(hash))
     }
@@ -913,8 +921,8 @@ func TestDecodePacketWithNewlines(t *testing.T) {
 }
 
 func TestDecodePacketTransportRouteTooShort(t *testing.T) {
-    // TRANSPORT_FLOOD (route=0) but only 3 bytes total → too short for transport codes
-    _, err := DecodePacket("140011", nil)
+    // TRANSPORT_FLOOD (route=0) but only 2 bytes total → too short for transport codes
+    _, err := DecodePacket("1400", nil)
     if err == nil {
         t.Error("expected error for transport route with too-short buffer")
     }
@@ -931,16 +939,19 @@ func TestDecodeAckShort(t *testing.T) {
 }
 
 func TestDecodeAckValid(t *testing.T) {
-    buf := []byte{0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF}
+    buf := []byte{0xAA, 0xBB, 0xCC, 0xDD}
     p := decodeAck(buf)
     if p.Error != "" {
         t.Errorf("unexpected error: %s", p.Error)
     }
-    if p.DestHash != "aa" {
-        t.Errorf("destHash=%s, want aa", p.DestHash)
+    if p.ExtraHash != "ddccbbaa" {
+        t.Errorf("extraHash=%s, want ddccbbaa", p.ExtraHash)
     }
-    if p.ExtraHash != "ccddeeff" {
-        t.Errorf("extraHash=%s, want ccddeeff", p.ExtraHash)
+    if p.DestHash != "" {
+        t.Errorf("destHash should be empty, got %s", p.DestHash)
+    }
+    if p.SrcHash != "" {
+        t.Errorf("srcHash should be empty, got %s", p.SrcHash)
     }
 }

File diff suppressed because it is too large


@@ -17,6 +17,8 @@ func setupTestDB(t *testing.T) *DB {
     if err != nil {
         t.Fatal(err)
     }
+    // Force single connection so all goroutines share the same in-memory DB
+    conn.SetMaxOpenConns(1)
 
     // Create schema matching MeshCore Analyzer v3
     schema := `


@@ -54,8 +54,8 @@ type Header struct {
 // TransportCodes are present on TRANSPORT_FLOOD and TRANSPORT_DIRECT routes.
 type TransportCodes struct {
-    NextHop string `json:"nextHop"`
-    LastHop string `json:"lastHop"`
+    Code1 string `json:"code1"`
+    Code2 string `json:"code2"`
 }
 
 // Path holds decoded path/hop information.
@@ -74,6 +74,8 @@ type AdvertFlags struct {
     Room bool `json:"room"`
     Sensor bool `json:"sensor"`
     HasLocation bool `json:"hasLocation"`
+    HasFeat1 bool `json:"hasFeat1"`
+    HasFeat2 bool `json:"hasFeat2"`
     HasName bool `json:"hasName"`
 }
@@ -97,6 +99,8 @@ type Payload struct {
     EphemeralPubKey string `json:"ephemeralPubKey,omitempty"`
     PathData string `json:"pathData,omitempty"`
     Tag uint32 `json:"tag,omitempty"`
+    AuthCode uint32 `json:"authCode,omitempty"`
+    TraceFlags *int `json:"traceFlags,omitempty"`
     RawHex string `json:"raw,omitempty"`
     Error string `json:"error,omitempty"`
 }
@@ -173,14 +177,13 @@ func decodeEncryptedPayload(typeName string, buf []byte) Payload {
 }
 
 func decodeAck(buf []byte) Payload {
-    if len(buf) < 6 {
+    if len(buf) < 4 {
         return Payload{Type: "ACK", Error: "too short", RawHex: hex.EncodeToString(buf)}
     }
+    checksum := binary.LittleEndian.Uint32(buf[0:4])
     return Payload{
         Type: "ACK",
-        DestHash: hex.EncodeToString(buf[0:1]),
-        SrcHash: hex.EncodeToString(buf[1:2]),
-        ExtraHash: hex.EncodeToString(buf[2:6]),
+        ExtraHash: fmt.Sprintf("%08x", checksum),
     }
 }
@@ -205,6 +208,8 @@ func decodeAdvert(buf []byte) Payload {
     if len(appdata) > 0 {
         flags := appdata[0]
         advType := int(flags & 0x0F)
+        hasFeat1 := flags&0x20 != 0
+        hasFeat2 := flags&0x40 != 0
         p.Flags = &AdvertFlags{
             Raw: int(flags),
             Type: advType,
@@ -213,6 +218,8 @@ func decodeAdvert(buf []byte) Payload {
             Room: advType == 3,
             Sensor: advType == 4,
             HasLocation: flags&0x10 != 0,
+            HasFeat1: hasFeat1,
+            HasFeat2: hasFeat2,
             HasName: flags&0x80 != 0,
         }
@@ -226,6 +233,12 @@ func decodeAdvert(buf []byte) Payload {
         p.Lon = &lon
         off += 8
     }
+    if hasFeat1 && len(appdata) >= off+2 {
+        off += 2 // skip feat1 bytes (reserved for future use)
+    }
+    if hasFeat2 && len(appdata) >= off+2 {
+        off += 2 // skip feat2 bytes (reserved for future use)
+    }
     if p.Flags.HasName {
         name := string(appdata[off:])
         name = strings.TrimRight(name, "\x00")
@@ -276,15 +289,22 @@ func decodePathPayload(buf []byte) Payload {
 }
 
 func decodeTrace(buf []byte) Payload {
-    if len(buf) < 12 {
+    if len(buf) < 9 {
         return Payload{Type: "TRACE", Error: "too short", RawHex: hex.EncodeToString(buf)}
     }
-    return Payload{
-        Type: "TRACE",
-        DestHash: hex.EncodeToString(buf[5:11]),
-        SrcHash: hex.EncodeToString(buf[11:12]),
-        Tag: binary.LittleEndian.Uint32(buf[1:5]),
+    tag := binary.LittleEndian.Uint32(buf[0:4])
+    authCode := binary.LittleEndian.Uint32(buf[4:8])
+    flags := int(buf[8])
+    p := Payload{
+        Type: "TRACE",
+        Tag: tag,
+        AuthCode: authCode,
+        TraceFlags: &flags,
     }
+    if len(buf) > 9 {
+        p.PathData = hex.EncodeToString(buf[9:])
+    }
+    return p
 }
 
 func decodePayload(payloadType int, buf []byte) Payload {
@@ -327,8 +347,7 @@ func DecodePacket(hexString string) (*DecodedPacket, error) {
     }
 
     header := decodeHeader(buf[0])
-    pathByte := buf[1]
-    offset := 2
+    offset := 1
 
     var tc *TransportCodes
     if isTransportRoute(header.RouteType) {
@@ -336,12 +355,18 @@ func DecodePacket(hexString string) (*DecodedPacket, error) {
             return nil, fmt.Errorf("packet too short for transport codes")
         }
         tc = &TransportCodes{
-            NextHop: strings.ToUpper(hex.EncodeToString(buf[offset : offset+2])),
-            LastHop: strings.ToUpper(hex.EncodeToString(buf[offset+2 : offset+4])),
+            Code1: strings.ToUpper(hex.EncodeToString(buf[offset : offset+2])),
+            Code2: strings.ToUpper(hex.EncodeToString(buf[offset+2 : offset+4])),
        }
         offset += 4
     }
+    if offset >= len(buf) {
+        return nil, fmt.Errorf("packet too short (no path byte)")
+    }
+    pathByte := buf[offset]
+    offset++
 
     path, bytesConsumed := decodePath(pathByte, buf, offset)
     offset += bytesConsumed
@@ -367,16 +392,24 @@ func ComputeContentHash(rawHex string) string {
         return rawHex
     }
 
-    pathByte := buf[1]
+    headerByte := buf[0]
+    offset := 1
+    if isTransportRoute(int(headerByte & 0x03)) {
+        offset += 4
+    }
+    if offset >= len(buf) {
+        if len(rawHex) >= 16 {
+            return rawHex[:16]
+        }
+        return rawHex
+    }
+    pathByte := buf[offset]
+    offset++
     hashSize := int((pathByte>>6)&0x3) + 1
     hashCount := int(pathByte & 0x3F)
     pathBytes := hashSize * hashCount
-    headerByte := buf[0]
-    payloadStart := 2 + pathBytes
-    if isTransportRoute(int(headerByte & 0x03)) {
-        payloadStart += 4
-    }
+    payloadStart := offset + pathBytes
     if payloadStart > len(buf) {
         if len(rawHex) >= 16 {
             return rawHex[:16]


@@ -1,403 +1,506 @@
 package main
 
 // parity_test.go — Golden fixture shape tests.
 // Validates that Go API responses match the shape of Node.js API responses.
 // Shapes were captured from the production Node.js server and stored in
 // testdata/golden/shapes.json.
 
 import (
     "encoding/json"
     "fmt"
     "net/http/httptest"
     "os"
     "path/filepath"
     "runtime"
+    "sort"
     "strings"
     "testing"
+    "time"
 )
 
 // shapeSpec describes the expected JSON structure from the Node.js server.
 type shapeSpec struct {
     Type string `json:"type"`
     Keys map[string]shapeSpec `json:"keys,omitempty"`
     ElementShape *shapeSpec `json:"elementShape,omitempty"`
     DynamicKeys bool `json:"dynamicKeys,omitempty"`
     ValueShape *shapeSpec `json:"valueShape,omitempty"`
     RequiredKeys map[string]shapeSpec `json:"requiredKeys,omitempty"`
 }
 
 // loadShapes reads testdata/golden/shapes.json relative to this source file.
 func loadShapes(t *testing.T) map[string]shapeSpec {
     t.Helper()
     _, thisFile, _, _ := runtime.Caller(0)
     dir := filepath.Dir(thisFile)
     data, err := os.ReadFile(filepath.Join(dir, "testdata", "golden", "shapes.json"))
     if err != nil {
         t.Fatalf("cannot load shapes.json: %v", err)
     }
     var shapes map[string]shapeSpec
     if err := json.Unmarshal(data, &shapes); err != nil {
         t.Fatalf("cannot parse shapes.json: %v", err)
     }
     return shapes
 }
 
 // validateShape recursively checks that `actual` matches the expected `spec`.
 // `path` tracks the JSON path for error messages.
 // Returns a list of mismatch descriptions.
 func validateShape(actual interface{}, spec shapeSpec, path string) []string {
     var errs []string
 
     switch spec.Type {
     case "null", "nullable":
         // nullable means: value can be null OR matching type. Accept anything.
         return nil
     case "nullable_number":
         // Can be null or number
         if actual != nil {
             if _, ok := actual.(float64); !ok {
                 errs = append(errs, fmt.Sprintf("%s: expected number or null, got %T", path, actual))
             }
         }
         return errs
     case "string":
         if actual == nil {
             errs = append(errs, fmt.Sprintf("%s: expected string, got null", path))
         } else if _, ok := actual.(string); !ok {
             errs = append(errs, fmt.Sprintf("%s: expected string, got %T", path, actual))
         }
     case "number":
         if actual == nil {
             errs = append(errs, fmt.Sprintf("%s: expected number, got null", path))
         } else if _, ok := actual.(float64); !ok {
             errs = append(errs, fmt.Sprintf("%s: expected number, got %T (%v)", path, actual, actual))
} } else if _, ok := actual.(float64); !ok {
case "boolean": errs = append(errs, fmt.Sprintf("%s: expected number, got %T (%v)", path, actual, actual))
if actual == nil { }
errs = append(errs, fmt.Sprintf("%s: expected boolean, got null", path)) case "boolean":
} else if _, ok := actual.(bool); !ok { if actual == nil {
errs = append(errs, fmt.Sprintf("%s: expected boolean, got %T", path, actual)) errs = append(errs, fmt.Sprintf("%s: expected boolean, got null", path))
} } else if _, ok := actual.(bool); !ok {
case "array": errs = append(errs, fmt.Sprintf("%s: expected boolean, got %T", path, actual))
if actual == nil { }
errs = append(errs, fmt.Sprintf("%s: expected array, got null (arrays must be [] not null)", path)) case "array":
return errs if actual == nil {
} errs = append(errs, fmt.Sprintf("%s: expected array, got null (arrays must be [] not null)", path))
arr, ok := actual.([]interface{}) return errs
if !ok { }
errs = append(errs, fmt.Sprintf("%s: expected array, got %T", path, actual)) arr, ok := actual.([]interface{})
return errs if !ok {
} errs = append(errs, fmt.Sprintf("%s: expected array, got %T", path, actual))
if spec.ElementShape != nil && len(arr) > 0 { return errs
errs = append(errs, validateShape(arr[0], *spec.ElementShape, path+"[0]")...) }
} if spec.ElementShape != nil && len(arr) > 0 {
case "object": errs = append(errs, validateShape(arr[0], *spec.ElementShape, path+"[0]")...)
if actual == nil { }
errs = append(errs, fmt.Sprintf("%s: expected object, got null", path)) case "object":
return errs if actual == nil {
} errs = append(errs, fmt.Sprintf("%s: expected object, got null", path))
obj, ok := actual.(map[string]interface{}) return errs
if !ok { }
errs = append(errs, fmt.Sprintf("%s: expected object, got %T", path, actual)) obj, ok := actual.(map[string]interface{})
return errs if !ok {
} errs = append(errs, fmt.Sprintf("%s: expected object, got %T", path, actual))
return errs
if spec.DynamicKeys { }
// Object with dynamic keys — validate value shapes
if spec.ValueShape != nil && len(obj) > 0 { if spec.DynamicKeys {
for k, v := range obj { // Object with dynamic keys — validate value shapes
errs = append(errs, validateShape(v, *spec.ValueShape, path+"."+k)...) if spec.ValueShape != nil && len(obj) > 0 {
break // check just one sample for k, v := range obj {
} errs = append(errs, validateShape(v, *spec.ValueShape, path+"."+k)...)
} break // check just one sample
if spec.RequiredKeys != nil { }
for rk, rs := range spec.RequiredKeys { }
v, exists := obj[rk] if spec.RequiredKeys != nil {
if !exists { for rk, rs := range spec.RequiredKeys {
errs = append(errs, fmt.Sprintf("%s: missing required key %q in dynamic-key object", path, rk)) v, exists := obj[rk]
} else { if !exists {
errs = append(errs, validateShape(v, rs, path+"."+rk)...) errs = append(errs, fmt.Sprintf("%s: missing required key %q in dynamic-key object", path, rk))
} } else {
} errs = append(errs, validateShape(v, rs, path+"."+rk)...)
} }
} else if spec.Keys != nil { }
// Object with known keys — check each expected key exists and has correct type }
for key, keySpec := range spec.Keys { } else if spec.Keys != nil {
val, exists := obj[key] // Object with known keys — check each expected key exists and has correct type
if !exists { for key, keySpec := range spec.Keys {
errs = append(errs, fmt.Sprintf("%s: missing field %q (expected %s)", path, key, keySpec.Type)) val, exists := obj[key]
} else { if !exists {
errs = append(errs, validateShape(val, keySpec, path+"."+key)...) errs = append(errs, fmt.Sprintf("%s: missing field %q (expected %s)", path, key, keySpec.Type))
} } else {
} errs = append(errs, validateShape(val, keySpec, path+"."+key)...)
} }
} }
}
return errs }
}
return errs
// parityEndpoint defines one endpoint to test for parity. }
type parityEndpoint struct {
name string // key in shapes.json // parityEndpoint defines one endpoint to test for parity.
path string // HTTP path to request type parityEndpoint struct {
} name string // key in shapes.json
path string // HTTP path to request
func TestParityShapes(t *testing.T) { }
shapes := loadShapes(t)
_, router := setupTestServer(t) func TestParityShapes(t *testing.T) {
shapes := loadShapes(t)
endpoints := []parityEndpoint{ _, router := setupTestServer(t)
{"stats", "/api/stats"},
{"nodes", "/api/nodes?limit=5"}, endpoints := []parityEndpoint{
{"packets", "/api/packets?limit=5"}, {"stats", "/api/stats"},
{"packets_grouped", "/api/packets?limit=5&groupByHash=true"}, {"nodes", "/api/nodes?limit=5"},
{"observers", "/api/observers"}, {"packets", "/api/packets?limit=5"},
{"channels", "/api/channels"}, {"packets_grouped", "/api/packets?limit=5&groupByHash=true"},
{"channel_messages", "/api/channels/0000000000000000/messages?limit=5"}, {"observers", "/api/observers"},
{"analytics_rf", "/api/analytics/rf?days=7"}, {"channels", "/api/channels"},
{"analytics_topology", "/api/analytics/topology?days=7"}, {"channel_messages", "/api/channels/0000000000000000/messages?limit=5"},
{"analytics_hash_sizes", "/api/analytics/hash-sizes?days=7"}, {"analytics_rf", "/api/analytics/rf?days=7"},
{"analytics_distance", "/api/analytics/distance?days=7"}, {"analytics_topology", "/api/analytics/topology?days=7"},
{"analytics_subpaths", "/api/analytics/subpaths?days=7"}, {"analytics_hash_sizes", "/api/analytics/hash-sizes?days=7"},
{"bulk_health", "/api/nodes/bulk-health"}, {"analytics_distance", "/api/analytics/distance?days=7"},
{"health", "/api/health"}, {"analytics_subpaths", "/api/analytics/subpaths?days=7"},
{"perf", "/api/perf"}, {"bulk_health", "/api/nodes/bulk-health"},
} {"health", "/api/health"},
{"perf", "/api/perf"},
for _, ep := range endpoints { }
t.Run("Parity_"+ep.name, func(t *testing.T) {
spec, ok := shapes[ep.name] for _, ep := range endpoints {
if !ok { t.Run("Parity_"+ep.name, func(t *testing.T) {
t.Fatalf("no shape spec found for %q in shapes.json", ep.name) spec, ok := shapes[ep.name]
} if !ok {
t.Fatalf("no shape spec found for %q in shapes.json", ep.name)
req := httptest.NewRequest("GET", ep.path, nil) }
w := httptest.NewRecorder()
router.ServeHTTP(w, req) req := httptest.NewRequest("GET", ep.path, nil)
w := httptest.NewRecorder()
if w.Code != 200 { router.ServeHTTP(w, req)
t.Fatalf("GET %s returned %d, expected 200. Body: %s",
ep.path, w.Code, w.Body.String()) if w.Code != 200 {
} t.Fatalf("GET %s returned %d, expected 200. Body: %s",
ep.path, w.Code, w.Body.String())
var body interface{} }
if err := json.Unmarshal(w.Body.Bytes(), &body); err != nil {
t.Fatalf("GET %s returned invalid JSON: %v\nBody: %s", var body interface{}
ep.path, err, w.Body.String()) if err := json.Unmarshal(w.Body.Bytes(), &body); err != nil {
} t.Fatalf("GET %s returned invalid JSON: %v\nBody: %s",
ep.path, err, w.Body.String())
mismatches := validateShape(body, spec, ep.path) }
if len(mismatches) > 0 {
t.Errorf("Go %s has %d shape mismatches vs Node.js golden:\n %s", mismatches := validateShape(body, spec, ep.path)
ep.path, len(mismatches), strings.Join(mismatches, "\n ")) if len(mismatches) > 0 {
} t.Errorf("Go %s has %d shape mismatches vs Node.js golden:\n %s",
}) ep.path, len(mismatches), strings.Join(mismatches, "\n "))
} }
} })
}
// TestParityNodeDetail tests node detail endpoint shape. }
// Uses a known test node public key from seeded data.
func TestParityNodeDetail(t *testing.T) { // TestParityNodeDetail tests node detail endpoint shape.
shapes := loadShapes(t) // Uses a known test node public key from seeded data.
_, router := setupTestServer(t) func TestParityNodeDetail(t *testing.T) {
shapes := loadShapes(t)
spec, ok := shapes["node_detail"] _, router := setupTestServer(t)
if !ok {
t.Fatal("no shape spec for node_detail in shapes.json") spec, ok := shapes["node_detail"]
} if !ok {
t.Fatal("no shape spec for node_detail in shapes.json")
req := httptest.NewRequest("GET", "/api/nodes/aabbccdd11223344", nil) }
w := httptest.NewRecorder()
router.ServeHTTP(w, req) req := httptest.NewRequest("GET", "/api/nodes/aabbccdd11223344", nil)
w := httptest.NewRecorder()
if w.Code != 200 { router.ServeHTTP(w, req)
t.Fatalf("node detail returned %d: %s", w.Code, w.Body.String())
} if w.Code != 200 {
t.Fatalf("node detail returned %d: %s", w.Code, w.Body.String())
var body interface{} }
json.Unmarshal(w.Body.Bytes(), &body)
var body interface{}
mismatches := validateShape(body, spec, "/api/nodes/{pubkey}") json.Unmarshal(w.Body.Bytes(), &body)
if len(mismatches) > 0 {
t.Errorf("Go node detail has %d shape mismatches vs Node.js golden:\n %s", mismatches := validateShape(body, spec, "/api/nodes/{pubkey}")
len(mismatches), strings.Join(mismatches, "\n ")) if len(mismatches) > 0 {
} t.Errorf("Go node detail has %d shape mismatches vs Node.js golden:\n %s",
} len(mismatches), strings.Join(mismatches, "\n "))
}
// TestParityArraysNotNull verifies that array-typed fields in Go responses are }
// [] (empty array) rather than null. This is a common Go/JSON pitfall where
// nil slices marshal as null instead of []. // TestParityArraysNotNull verifies that array-typed fields in Go responses are
// Uses shapes.json to know which fields SHOULD be arrays. // [] (empty array) rather than null. This is a common Go/JSON pitfall where
func TestParityArraysNotNull(t *testing.T) { // nil slices marshal as null instead of [].
shapes := loadShapes(t) // Uses shapes.json to know which fields SHOULD be arrays.
_, router := setupTestServer(t) func TestParityArraysNotNull(t *testing.T) {
shapes := loadShapes(t)
endpoints := []struct { _, router := setupTestServer(t)
name string
path string endpoints := []struct {
}{ name string
{"stats", "/api/stats"}, path string
{"nodes", "/api/nodes?limit=5"}, }{
{"packets", "/api/packets?limit=5"}, {"stats", "/api/stats"},
{"packets_grouped", "/api/packets?limit=5&groupByHash=true"}, {"nodes", "/api/nodes?limit=5"},
{"observers", "/api/observers"}, {"packets", "/api/packets?limit=5"},
{"channels", "/api/channels"}, {"packets_grouped", "/api/packets?limit=5&groupByHash=true"},
{"bulk_health", "/api/nodes/bulk-health"}, {"observers", "/api/observers"},
{"analytics_rf", "/api/analytics/rf?days=7"}, {"channels", "/api/channels"},
{"analytics_topology", "/api/analytics/topology?days=7"}, {"bulk_health", "/api/nodes/bulk-health"},
{"analytics_hash_sizes", "/api/analytics/hash-sizes?days=7"}, {"analytics_rf", "/api/analytics/rf?days=7"},
{"analytics_distance", "/api/analytics/distance?days=7"}, {"analytics_topology", "/api/analytics/topology?days=7"},
{"analytics_subpaths", "/api/analytics/subpaths?days=7"}, {"analytics_hash_sizes", "/api/analytics/hash-sizes?days=7"},
} {"analytics_distance", "/api/analytics/distance?days=7"},
{"analytics_subpaths", "/api/analytics/subpaths?days=7"},
for _, ep := range endpoints { }
t.Run("NullArrayCheck_"+ep.name, func(t *testing.T) {
spec, ok := shapes[ep.name] for _, ep := range endpoints {
if !ok { t.Run("NullArrayCheck_"+ep.name, func(t *testing.T) {
t.Skipf("no shape spec for %s", ep.name) spec, ok := shapes[ep.name]
} if !ok {
t.Skipf("no shape spec for %s", ep.name)
req := httptest.NewRequest("GET", ep.path, nil) }
w := httptest.NewRecorder()
router.ServeHTTP(w, req) req := httptest.NewRequest("GET", ep.path, nil)
w := httptest.NewRecorder()
if w.Code != 200 { router.ServeHTTP(w, req)
t.Skipf("GET %s returned %d, skipping null-array check", ep.path, w.Code)
} if w.Code != 200 {
t.Skipf("GET %s returned %d, skipping null-array check", ep.path, w.Code)
var body interface{} }
json.Unmarshal(w.Body.Bytes(), &body)
var body interface{}
nullArrays := findNullArrays(body, spec, ep.path) json.Unmarshal(w.Body.Bytes(), &body)
if len(nullArrays) > 0 {
t.Errorf("Go %s has null where [] expected:\n %s\n"+ nullArrays := findNullArrays(body, spec, ep.path)
"Go nil slices marshal as null — initialize with make() or literal", if len(nullArrays) > 0 {
ep.path, strings.Join(nullArrays, "\n ")) t.Errorf("Go %s has null where [] expected:\n %s\n"+
} "Go nil slices marshal as null — initialize with make() or literal",
}) ep.path, strings.Join(nullArrays, "\n "))
} }
} })
}
// findNullArrays walks JSON data alongside a shape spec and returns paths }
// where the spec says the field should be an array but Go returned null.
func findNullArrays(actual interface{}, spec shapeSpec, path string) []string { // findNullArrays walks JSON data alongside a shape spec and returns paths
var nulls []string // where the spec says the field should be an array but Go returned null.
func findNullArrays(actual interface{}, spec shapeSpec, path string) []string {
switch spec.Type { var nulls []string
case "array":
if actual == nil { switch spec.Type {
nulls = append(nulls, fmt.Sprintf("%s: null (should be [])", path)) case "array":
} else if arr, ok := actual.([]interface{}); ok && spec.ElementShape != nil { if actual == nil {
for i, elem := range arr { nulls = append(nulls, fmt.Sprintf("%s: null (should be [])", path))
nulls = append(nulls, findNullArrays(elem, *spec.ElementShape, fmt.Sprintf("%s[%d]", path, i))...) } else if arr, ok := actual.([]interface{}); ok && spec.ElementShape != nil {
} for i, elem := range arr {
} nulls = append(nulls, findNullArrays(elem, *spec.ElementShape, fmt.Sprintf("%s[%d]", path, i))...)
case "object": }
obj, ok := actual.(map[string]interface{}) }
if !ok || obj == nil { case "object":
return nulls obj, ok := actual.(map[string]interface{})
} if !ok || obj == nil {
if spec.Keys != nil { return nulls
for key, keySpec := range spec.Keys { }
if val, exists := obj[key]; exists { if spec.Keys != nil {
nulls = append(nulls, findNullArrays(val, keySpec, path+"."+key)...) for key, keySpec := range spec.Keys {
} else if keySpec.Type == "array" { if val, exists := obj[key]; exists {
// Key missing entirely — also a null-array problem nulls = append(nulls, findNullArrays(val, keySpec, path+"."+key)...)
nulls = append(nulls, fmt.Sprintf("%s.%s: missing (should be [])", path, key)) } else if keySpec.Type == "array" {
} // Key missing entirely — also a null-array problem
} nulls = append(nulls, fmt.Sprintf("%s.%s: missing (should be [])", path, key))
} }
if spec.DynamicKeys && spec.ValueShape != nil { }
for k, v := range obj { }
nulls = append(nulls, findNullArrays(v, *spec.ValueShape, path+"."+k)...) if spec.DynamicKeys && spec.ValueShape != nil {
break // sample one for k, v := range obj {
} nulls = append(nulls, findNullArrays(v, *spec.ValueShape, path+"."+k)...)
} break // sample one
} }
}
return nulls }
}
return nulls
// TestParityHealthEngine verifies Go health endpoint declares engine=go }
// while Node declares engine=node (or omits it). The Go server must always
// identify itself. // TestParityHealthEngine verifies Go health endpoint declares engine=go
func TestParityHealthEngine(t *testing.T) { // while Node declares engine=node (or omits it). The Go server must always
_, router := setupTestServer(t) // identify itself.
func TestParityHealthEngine(t *testing.T) {
req := httptest.NewRequest("GET", "/api/health", nil) _, router := setupTestServer(t)
w := httptest.NewRecorder()
router.ServeHTTP(w, req) req := httptest.NewRequest("GET", "/api/health", nil)
w := httptest.NewRecorder()
var body map[string]interface{} router.ServeHTTP(w, req)
json.Unmarshal(w.Body.Bytes(), &body)
var body map[string]interface{}
engine, ok := body["engine"] json.Unmarshal(w.Body.Bytes(), &body)
if !ok {
t.Error("health response missing 'engine' field (Go server must include engine=go)") engine, ok := body["engine"]
} else if engine != "go" { if !ok {
t.Errorf("health engine=%v, expected 'go'", engine) t.Error("health response missing 'engine' field (Go server must include engine=go)")
} } else if engine != "go" {
} t.Errorf("health engine=%v, expected 'go'", engine)
}
// TestValidateShapeFunction directly tests the shape validator itself. }
func TestValidateShapeFunction(t *testing.T) {
t.Run("string match", func(t *testing.T) { // TestValidateShapeFunction directly tests the shape validator itself.
errs := validateShape("hello", shapeSpec{Type: "string"}, "$.x") func TestValidateShapeFunction(t *testing.T) {
if len(errs) != 0 { t.Run("string match", func(t *testing.T) {
t.Errorf("unexpected errors: %v", errs) errs := validateShape("hello", shapeSpec{Type: "string"}, "$.x")
} if len(errs) != 0 {
}) t.Errorf("unexpected errors: %v", errs)
}
t.Run("string mismatch", func(t *testing.T) { })
errs := validateShape(42.0, shapeSpec{Type: "string"}, "$.x")
if len(errs) != 1 { t.Run("string mismatch", func(t *testing.T) {
t.Errorf("expected 1 error, got %d: %v", len(errs), errs) errs := validateShape(42.0, shapeSpec{Type: "string"}, "$.x")
} if len(errs) != 1 {
}) t.Errorf("expected 1 error, got %d: %v", len(errs), errs)
}
t.Run("null array rejected", func(t *testing.T) { })
errs := validateShape(nil, shapeSpec{Type: "array"}, "$.arr")
if len(errs) != 1 || !strings.Contains(errs[0], "null") { t.Run("null array rejected", func(t *testing.T) {
t.Errorf("expected null-array error, got: %v", errs) errs := validateShape(nil, shapeSpec{Type: "array"}, "$.arr")
} if len(errs) != 1 || !strings.Contains(errs[0], "null") {
}) t.Errorf("expected null-array error, got: %v", errs)
}
t.Run("empty array OK", func(t *testing.T) { })
errs := validateShape([]interface{}{}, shapeSpec{Type: "array"}, "$.arr")
if len(errs) != 0 { t.Run("empty array OK", func(t *testing.T) {
t.Errorf("unexpected errors for empty array: %v", errs) errs := validateShape([]interface{}{}, shapeSpec{Type: "array"}, "$.arr")
} if len(errs) != 0 {
}) t.Errorf("unexpected errors for empty array: %v", errs)
}
t.Run("missing object key", func(t *testing.T) { })
spec := shapeSpec{Type: "object", Keys: map[string]shapeSpec{
"name": {Type: "string"}, t.Run("missing object key", func(t *testing.T) {
"age": {Type: "number"}, spec := shapeSpec{Type: "object", Keys: map[string]shapeSpec{
}} "name": {Type: "string"},
obj := map[string]interface{}{"name": "test"} "age": {Type: "number"},
errs := validateShape(obj, spec, "$.user") }}
if len(errs) != 1 || !strings.Contains(errs[0], "age") { obj := map[string]interface{}{"name": "test"}
t.Errorf("expected missing age error, got: %v", errs) errs := validateShape(obj, spec, "$.user")
} if len(errs) != 1 || !strings.Contains(errs[0], "age") {
}) t.Errorf("expected missing age error, got: %v", errs)
}
t.Run("nullable allows null", func(t *testing.T) { })
errs := validateShape(nil, shapeSpec{Type: "nullable"}, "$.x")
if len(errs) != 0 { t.Run("nullable allows null", func(t *testing.T) {
t.Errorf("nullable should accept null: %v", errs) errs := validateShape(nil, shapeSpec{Type: "nullable"}, "$.x")
} if len(errs) != 0 {
}) t.Errorf("nullable should accept null: %v", errs)
}
t.Run("dynamic keys validates value shape", func(t *testing.T) { })
spec := shapeSpec{
Type: "object", t.Run("dynamic keys validates value shape", func(t *testing.T) {
DynamicKeys: true, spec := shapeSpec{
ValueShape: &shapeSpec{Type: "number"}, Type: "object",
} DynamicKeys: true,
obj := map[string]interface{}{"a": 1.0, "b": 2.0} ValueShape: &shapeSpec{Type: "number"},
errs := validateShape(obj, spec, "$.dyn") }
if len(errs) != 0 { obj := map[string]interface{}{"a": 1.0, "b": 2.0}
t.Errorf("unexpected errors: %v", errs) errs := validateShape(obj, spec, "$.dyn")
} if len(errs) != 0 {
}) t.Errorf("unexpected errors: %v", errs)
} }
})
}
func TestParityWSMultiObserverGolden(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
seedTestData(t, db)
hub := NewHub()
store := NewPacketStore(db)
if err := store.Load(); err != nil {
t.Fatalf("store load failed: %v", err)
}
poller := NewPoller(db, hub, 50*time.Millisecond)
poller.store = store
client := &Client{send: make(chan []byte, 256)}
hub.Register(client)
defer hub.Unregister(client)
go poller.Start()
defer poller.Stop()
// Wait for poller to initialize its lastID/lastObsID cursors before
// inserting new data; otherwise the poller may snapshot a lastID that
// already includes the test data and never broadcast it.
time.Sleep(100 * time.Millisecond)
now := time.Now().UTC().Format(time.RFC3339)
if _, err := db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
VALUES ('BEEF', 'goldenstarburst237', ?, 1, 4, '{"pubKey":"aabbccdd11223344","type":"ADVERT"}')`, now); err != nil {
t.Fatalf("insert tx failed: %v", err)
}
var txID int
if err := db.conn.QueryRow(`SELECT id FROM transmissions WHERE hash='goldenstarburst237'`).Scan(&txID); err != nil {
t.Fatalf("query tx id failed: %v", err)
}
ts := time.Now().Unix()
if _, err := db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
VALUES (?, 1, 11.0, -88, '["p1"]', ?),
(?, 2, 9.0, -92, '["p1","p2"]', ?),
(?, 1, 7.0, -96, '["p1","p2","p3"]', ?)`,
txID, ts, txID, ts+1, txID, ts+2); err != nil {
t.Fatalf("insert obs failed: %v", err)
}
type golden struct {
Hash string
Count int
Paths []string
ObserverIDs []string
}
expected := golden{
Hash: "goldenstarburst237",
Count: 3,
Paths: []string{`["p1"]`, `["p1","p2"]`, `["p1","p2","p3"]`},
ObserverIDs: []string{"obs1", "obs2"},
}
gotPaths := make([]string, 0, expected.Count)
gotObservers := make(map[string]bool)
deadline := time.After(2 * time.Second)
for len(gotPaths) < expected.Count {
select {
case raw := <-client.send:
var msg map[string]interface{}
if err := json.Unmarshal(raw, &msg); err != nil {
t.Fatalf("unmarshal ws message failed: %v", err)
}
if msg["type"] != "packet" {
continue
}
data, _ := msg["data"].(map[string]interface{})
if data == nil || data["hash"] != expected.Hash {
continue
}
if path, ok := data["path_json"].(string); ok {
gotPaths = append(gotPaths, path)
}
if oid, ok := data["observer_id"].(string); ok && oid != "" {
gotObservers[oid] = true
}
case <-deadline:
t.Fatalf("timed out waiting for %d ws messages, got %d", expected.Count, len(gotPaths))
}
}
sort.Strings(gotPaths)
sort.Strings(expected.Paths)
if len(gotPaths) != len(expected.Paths) {
t.Fatalf("path count mismatch: got %d want %d", len(gotPaths), len(expected.Paths))
}
for i := range expected.Paths {
if gotPaths[i] != expected.Paths[i] {
t.Fatalf("path mismatch at %d: got %q want %q", i, gotPaths[i], expected.Paths[i])
}
}
for _, oid := range expected.ObserverIDs {
if !gotObservers[oid] {
t.Fatalf("missing expected observer %q in ws messages", oid)
}
}
}


@@ -1039,7 +1039,7 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
 		}
 	}
-	// Build broadcast maps (same shape as Node.js WS broadcast)
+	// Build broadcast maps (same shape as Node.js WS broadcast), one per observation.
 	result := make([]map[string]interface{}, 0, len(broadcastOrder))
 	for _, txID := range broadcastOrder {
 		tx := broadcastTxs[txID]
@@ -1055,32 +1055,34 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
 				decoded["payload"] = payload
 			}
 		}
-		// Build the nested packet object (packets.js checks m.data.packet)
-		pkt := map[string]interface{}{
-			"id":                tx.ID,
-			"raw_hex":           strOrNil(tx.RawHex),
-			"hash":              strOrNil(tx.Hash),
-			"first_seen":        strOrNil(tx.FirstSeen),
-			"timestamp":         strOrNil(tx.FirstSeen),
-			"route_type":        intPtrOrNil(tx.RouteType),
-			"payload_type":      intPtrOrNil(tx.PayloadType),
-			"decoded_json":      strOrNil(tx.DecodedJSON),
-			"observer_id":       strOrNil(tx.ObserverID),
-			"observer_name":     strOrNil(tx.ObserverName),
-			"snr":               floatPtrOrNil(tx.SNR),
-			"rssi":              floatPtrOrNil(tx.RSSI),
-			"path_json":         strOrNil(tx.PathJSON),
-			"direction":         strOrNil(tx.Direction),
-			"observation_count": tx.ObservationCount,
-		}
-		// Broadcast map: top-level fields for live.js + nested packet for packets.js
-		broadcastMap := make(map[string]interface{}, len(pkt)+2)
-		for k, v := range pkt {
-			broadcastMap[k] = v
-		}
-		broadcastMap["decoded"] = decoded
-		broadcastMap["packet"] = pkt
-		result = append(result, broadcastMap)
+		for _, obs := range tx.Observations {
+			// Build the nested packet object (packets.js checks m.data.packet)
+			pkt := map[string]interface{}{
+				"id":                tx.ID,
+				"raw_hex":           strOrNil(tx.RawHex),
+				"hash":              strOrNil(tx.Hash),
+				"first_seen":        strOrNil(tx.FirstSeen),
+				"timestamp":         strOrNil(tx.FirstSeen),
+				"route_type":        intPtrOrNil(tx.RouteType),
+				"payload_type":      intPtrOrNil(tx.PayloadType),
+				"decoded_json":      strOrNil(tx.DecodedJSON),
+				"observer_id":       strOrNil(obs.ObserverID),
+				"observer_name":     strOrNil(obs.ObserverName),
+				"snr":               floatPtrOrNil(obs.SNR),
+				"rssi":              floatPtrOrNil(obs.RSSI),
+				"path_json":         strOrNil(obs.PathJSON),
+				"direction":         strOrNil(obs.Direction),
+				"observation_count": tx.ObservationCount,
+			}
+			// Broadcast map: top-level fields for live.js + nested packet for packets.js
+			broadcastMap := make(map[string]interface{}, len(pkt)+2)
+			for k, v := range pkt {
+				broadcastMap[k] = v
+			}
+			broadcastMap["decoded"] = decoded
+			broadcastMap["packet"] = pkt
+			result = append(result, broadcastMap)
+		}
 	}
 	// Invalidate analytics caches since new data was ingested
@@ -1101,7 +1103,7 @@ func (s *PacketStore) IngestNewFromDB(sinceID, limit int) ([]map[string]interfac
 // IngestNewObservations loads new observations for transmissions already in the
 // store. This catches observations that arrive after IngestNewFromDB has already
 // advanced past the transmission's ID (fixes #174).
-func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) int {
+func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) []map[string]interface{} {
 	if limit <= 0 {
 		limit = 500
 	}
@@ -1127,7 +1129,7 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) int {
 	rows, err := s.db.conn.Query(querySQL, sinceObsID, limit)
 	if err != nil {
 		log.Printf("[store] ingest observations query error: %v", err)
-		return sinceObsID
+		return nil
 	}
 	defer rows.Close()
@@ -1170,20 +1172,16 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) int {
 	}
 	if len(obsRows) == 0 {
-		return sinceObsID
+		return nil
 	}
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	newMaxObsID := sinceObsID
 	updatedTxs := make(map[int]*StoreTx)
+	broadcastMaps := make([]map[string]interface{}, 0, len(obsRows))
 	for _, r := range obsRows {
-		if r.obsID > newMaxObsID {
-			newMaxObsID = r.obsID
-		}
 		// Already ingested (e.g. by IngestNewFromDB in same cycle)
 		if _, exists := s.byObsID[r.obsID]; exists {
 			continue
@@ -1226,6 +1224,43 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) int {
 		}
 		s.totalObs++
 		updatedTxs[r.txID] = tx
+		decoded := map[string]interface{}{
+			"header": map[string]interface{}{
+				"payloadTypeName": resolvePayloadTypeName(tx.PayloadType),
+			},
+		}
+		if tx.DecodedJSON != "" {
+			var payload map[string]interface{}
+			if json.Unmarshal([]byte(tx.DecodedJSON), &payload) == nil {
+				decoded["payload"] = payload
+			}
+		}
+		pkt := map[string]interface{}{
+			"id":                tx.ID,
+			"raw_hex":           strOrNil(tx.RawHex),
+			"hash":              strOrNil(tx.Hash),
+			"first_seen":        strOrNil(tx.FirstSeen),
+			"timestamp":         strOrNil(tx.FirstSeen),
+			"route_type":        intPtrOrNil(tx.RouteType),
+			"payload_type":      intPtrOrNil(tx.PayloadType),
+			"decoded_json":      strOrNil(tx.DecodedJSON),
+			"observer_id":       strOrNil(obs.ObserverID),
+			"observer_name":     strOrNil(obs.ObserverName),
+			"snr":               floatPtrOrNil(obs.SNR),
+			"rssi":              floatPtrOrNil(obs.RSSI),
+			"path_json":         strOrNil(obs.PathJSON),
+			"direction":         strOrNil(obs.Direction),
+			"observation_count": tx.ObservationCount,
+		}
+		broadcastMap := make(map[string]interface{}, len(pkt)+2)
+		for k, v := range pkt {
+			broadcastMap[k] = v
+		}
+		broadcastMap["decoded"] = decoded
+		broadcastMap["packet"] = pkt
+		broadcastMaps = append(broadcastMaps, broadcastMap)
 	}
 	// Re-pick best observation for updated transmissions and update subpath index
@@ -1280,7 +1315,7 @@ func (s *PacketStore) IngestNewObservations(sinceObsID, limit int) int {
 		// analytics caches cleared; no per-cycle log to avoid stdout overhead
 	}
-	return newMaxObsID
+	return broadcastMaps
 }
 // MaxTransmissionID returns the highest transmission ID in the store.


@@ -1,229 +1,245 @@
package main
import (
"encoding/json"
"log"
"net/http"
"strings"
"sync"
"time"
"github.com/gorilla/websocket"
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 4096,
CheckOrigin: func(r *http.Request) bool { return true },
}
// Hub manages WebSocket clients and broadcasts.
type Hub struct {
mu sync.RWMutex
clients map[*Client]bool
}
// Client is a single WebSocket connection.
type Client struct {
conn *websocket.Conn
send chan []byte
}
func NewHub() *Hub {
return &Hub{
clients: make(map[*Client]bool),
}
}
func (h *Hub) ClientCount() int {
h.mu.RLock()
defer h.mu.RUnlock()
return len(h.clients)
}
func (h *Hub) Register(c *Client) {
h.mu.Lock()
h.clients[c] = true
h.mu.Unlock()
log.Printf("[ws] client connected (%d total)", h.ClientCount())
}
func (h *Hub) Unregister(c *Client) {
h.mu.Lock()
if _, ok := h.clients[c]; ok {
delete(h.clients, c)
close(c.send)
}
h.mu.Unlock()
log.Printf("[ws] client disconnected (%d total)", h.ClientCount())
}
// Broadcast sends a message to all connected clients.
func (h *Hub) Broadcast(msg interface{}) {
data, err := json.Marshal(msg)
if err != nil {
log.Printf("[ws] marshal error: %v", err)
return
}
h.mu.RLock()
defer h.mu.RUnlock()
for c := range h.clients {
select {
case c.send <- data:
default:
// Client buffer full — drop
}
}
}
// ServeWS handles the WebSocket upgrade and runs the client.
func (h *Hub) ServeWS(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("[ws] upgrade error: %v", err)
return
}
client := &Client{
conn: conn,
send: make(chan []byte, 256),
}
h.Register(client)
go client.writePump()
go client.readPump(h)
}
// wsOrStatic upgrades WebSocket requests at any path, serves static files otherwise.
func wsOrStatic(hub *Hub, static http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.EqualFold(r.Header.Get("Upgrade"), "websocket") {
hub.ServeWS(w, r)
return
}
static.ServeHTTP(w, r)
})
}
func (c *Client) readPump(hub *Hub) {
defer func() {
hub.Unregister(c)
c.conn.Close()
}()
c.conn.SetReadLimit(512)
c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
c.conn.SetPongHandler(func(string) error {
c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
return nil
})
for {
_, _, err := c.conn.ReadMessage()
if err != nil {
break
}
}
}
func (c *Client) writePump() {
ticker := time.NewTicker(30 * time.Second)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if !ok {
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil {
return
}
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
// Poller watches for new transmissions in SQLite and broadcasts them.
type Poller struct {
db *DB
hub *Hub
store *PacketStore // optional: if set, new transmissions are ingested into memory
interval time.Duration
stop chan struct{}
}
func NewPoller(db *DB, hub *Hub, interval time.Duration) *Poller {
return &Poller{db: db, hub: hub, interval: interval, stop: make(chan struct{})}
}
func (p *Poller) Start() {
lastID := p.db.GetMaxTransmissionID()
lastObsID := p.db.GetMaxObservationID()
log.Printf("[poller] starting from transmission ID %d, obs ID %d, interval %v", lastID, lastObsID, p.interval)
ticker := time.NewTicker(p.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if p.store != nil {
// Ingest new transmissions into in-memory store and broadcast
newTxs, newMax := p.store.IngestNewFromDB(lastID, 100)
if newMax > lastID {
lastID = newMax
}
// Ingest new observations for existing transmissions (fixes #174)
nextObsID := lastObsID
if err := p.db.conn.QueryRow(`
SELECT COALESCE(MAX(id), ?) FROM (
SELECT id FROM observations
WHERE id > ?
ORDER BY id ASC
LIMIT 500
)`, lastObsID, lastObsID).Scan(&nextObsID); err != nil {
nextObsID = lastObsID
}
newObs := p.store.IngestNewObservations(lastObsID, 500)
if nextObsID > lastObsID {
lastObsID = nextObsID
}
if len(newTxs) > 0 {
log.Printf("[broadcast] sending %d packets to %d clients (lastID now %d)", len(newTxs), p.hub.ClientCount(), lastID)
}
for _, tx := range newTxs {
p.hub.Broadcast(WSMessage{
Type: "packet",
Data: tx,
})
}
for _, obs := range newObs {
p.hub.Broadcast(WSMessage{
Type: "packet",
Data: obs,
})
}
} else {
// Fallback: direct DB query (used when store is nil, e.g. tests)
newTxs, err := p.db.GetNewTransmissionsSince(lastID, 100)
if err != nil {
log.Printf("[poller] error: %v", err)
continue
}
for _, tx := range newTxs {
id, _ := tx["id"].(int)
if id > lastID {
lastID = id
}
// Copy packet fields for the nested packet (avoids circular ref)
pkt := make(map[string]interface{}, len(tx))
for k, v := range tx {
pkt[k] = v
}
tx["packet"] = pkt
p.hub.Broadcast(WSMessage{
Type: "packet",
Data: tx,
})
}
}
case <-p.stop:
return
}
}
}
func (p *Poller) Stop() {
close(p.stop)
}


@@ -1,275 +1,415 @@
package main
import (
"encoding/json"
"net/http"
"net/http/httptest"
"sort"
"testing"
"time"
"github.com/gorilla/websocket"
)
func TestHubBroadcast(t *testing.T) {
hub := NewHub()
if hub.ClientCount() != 0 {
t.Errorf("expected 0 clients, got %d", hub.ClientCount())
}
// Create a test server with WebSocket endpoint
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
hub.ServeWS(w, r)
}))
defer srv.Close()
// Connect a WebSocket client
wsURL := "ws" + srv.URL[4:] // replace http with ws
conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
if err != nil {
t.Fatalf("dial error: %v", err)
}
defer conn.Close()
// Wait for registration
time.Sleep(50 * time.Millisecond)
if hub.ClientCount() != 1 {
t.Errorf("expected 1 client, got %d", hub.ClientCount())
}
// Broadcast a message
hub.Broadcast(map[string]interface{}{
"type": "packet",
"data": map[string]interface{}{"id": 1, "hash": "test123"},
})
// Read the message
conn.SetReadDeadline(time.Now().Add(2 * time.Second))
_, msg, err := conn.ReadMessage()
if err != nil {
t.Fatalf("read error: %v", err)
}
if len(msg) == 0 {
t.Error("expected non-empty message")
}
// Disconnect
conn.Close()
time.Sleep(100 * time.Millisecond)
}
func TestPollerCreation(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
seedTestData(t, db)
hub := NewHub()
poller := NewPoller(db, hub, 100*time.Millisecond)
if poller == nil {
t.Fatal("expected poller")
}
// Start and stop
go poller.Start()
time.Sleep(200 * time.Millisecond)
poller.Stop()
}
func TestHubMultipleClients(t *testing.T) {
hub := NewHub()
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
hub.ServeWS(w, r)
}))
defer srv.Close()
wsURL := "ws" + srv.URL[4:]
// Connect two clients
conn1, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
if err != nil {
t.Fatalf("dial error: %v", err)
}
defer conn1.Close()
conn2, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
if err != nil {
t.Fatalf("dial error: %v", err)
}
defer conn2.Close()
time.Sleep(100 * time.Millisecond)
if hub.ClientCount() != 2 {
t.Errorf("expected 2 clients, got %d", hub.ClientCount())
}
// Broadcast and both should receive
hub.Broadcast(map[string]interface{}{"type": "test", "data": "hello"})
conn1.SetReadDeadline(time.Now().Add(2 * time.Second))
_, msg1, err := conn1.ReadMessage()
if err != nil {
t.Fatalf("conn1 read error: %v", err)
}
if len(msg1) == 0 {
t.Error("expected non-empty message on conn1")
}
conn2.SetReadDeadline(time.Now().Add(2 * time.Second))
_, msg2, err := conn2.ReadMessage()
if err != nil {
t.Fatalf("conn2 read error: %v", err)
}
if len(msg2) == 0 {
t.Error("expected non-empty message on conn2")
}
// Disconnect one
conn1.Close()
time.Sleep(100 * time.Millisecond)
// Remaining client should still work
hub.Broadcast(map[string]interface{}{"type": "test2"})
conn2.SetReadDeadline(time.Now().Add(2 * time.Second))
_, msg3, err := conn2.ReadMessage()
if err != nil {
t.Fatalf("conn2 read error after disconnect: %v", err)
}
if len(msg3) == 0 {
t.Error("expected non-empty message")
}
}
func TestBroadcastFullBuffer(t *testing.T) {
hub := NewHub()
// Create a client with tiny buffer (1)
client := &Client{
send: make(chan []byte, 1),
}
hub.mu.Lock()
hub.clients[client] = true
hub.mu.Unlock()
// Fill the buffer
client.send <- []byte("first")
// This broadcast should drop the message (buffer full)
hub.Broadcast(map[string]interface{}{"type": "dropped"})
// Channel should still only have the first message
select {
case msg := <-client.send:
if string(msg) != "first" {
t.Errorf("expected 'first', got %s", string(msg))
}
default:
t.Error("expected message in channel")
}
// Clean up
hub.mu.Lock()
delete(hub.clients, client)
hub.mu.Unlock()
}
func TestBroadcastMarshalError(t *testing.T) {
hub := NewHub()
// Marshal error: functions can't be marshaled to JSON
hub.Broadcast(map[string]interface{}{"bad": func() {}})
// Should not panic — just log and return
}
func TestPollerBroadcastsNewData(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
seedTestData(t, db)
hub := NewHub()
// Create a client to receive broadcasts
client := &Client{
send: make(chan []byte, 256),
}
hub.mu.Lock()
hub.clients[client] = true
hub.mu.Unlock()
poller := NewPoller(db, hub, 50*time.Millisecond)
go poller.Start()
// Insert new data to trigger broadcast
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type)
VALUES ('EEFF', 'newhash123456789', '2026-01-16T10:00:00Z', 1, 4)`)
time.Sleep(200 * time.Millisecond)
poller.Stop()
// Check if client received broadcast with packet field (fixes #162)
select {
case msg := <-client.send:
if len(msg) == 0 {
t.Error("expected non-empty broadcast message")
}
var parsed map[string]interface{}
if err := json.Unmarshal(msg, &parsed); err != nil {
t.Fatalf("failed to parse broadcast: %v", err)
}
if parsed["type"] != "packet" {
t.Errorf("expected type=packet, got %v", parsed["type"])
}
data, ok := parsed["data"].(map[string]interface{})
if !ok {
t.Fatal("expected data to be an object")
}
// packets.js filters on m.data.packet — must exist
pkt, ok := data["packet"]
if !ok || pkt == nil {
t.Error("expected data.packet to exist (required by packets.js WS handler)")
}
pktMap, ok := pkt.(map[string]interface{})
if !ok {
t.Fatal("expected data.packet to be an object")
}
// Verify key fields exist in nested packet (timestamp required by packets.js)
for _, field := range []string{"id", "hash", "payload_type", "timestamp"} {
if _, exists := pktMap[field]; !exists {
t.Errorf("expected data.packet.%s to exist", field)
}
}
default:
// Might not have received due to timing
}
// Clean up
hub.mu.Lock()
delete(hub.clients, client)
hub.mu.Unlock()
}
func TestPollerBroadcastsMultipleObservations(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
seedTestData(t, db)
hub := NewHub()
client := &Client{
send: make(chan []byte, 256),
}
hub.mu.Lock()
hub.clients[client] = true
hub.mu.Unlock()
defer func() {
hub.mu.Lock()
delete(hub.clients, client)
hub.mu.Unlock()
}()
poller := NewPoller(db, hub, 50*time.Millisecond)
store := NewPacketStore(db)
if err := store.Load(); err != nil {
t.Fatalf("store load failed: %v", err)
}
poller.store = store
go poller.Start()
defer poller.Stop()
// Wait for poller to initialize its lastID/lastObsID cursors before
// inserting new data; otherwise the poller may snapshot a lastID that
// already includes the test data and never broadcast it.
time.Sleep(100 * time.Millisecond)
now := time.Now().UTC().Format(time.RFC3339)
if _, err := db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
VALUES ('FACE', 'starbursthash237a', ?, 1, 4, '{"pubKey":"aabbccdd11223344","type":"ADVERT"}')`, now); err != nil {
t.Fatalf("insert tx failed: %v", err)
}
var txID int
if err := db.conn.QueryRow(`SELECT id FROM transmissions WHERE hash='starbursthash237a'`).Scan(&txID); err != nil {
t.Fatalf("query tx id failed: %v", err)
}
ts := time.Now().Unix()
if _, err := db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
VALUES (?, 1, 14.0, -82, '["aa"]', ?),
(?, 2, 10.5, -90, '["aa","bb"]', ?),
(?, 1, 7.0, -96, '["aa","bb","cc"]', ?)`,
txID, ts, txID, ts+1, txID, ts+2); err != nil {
t.Fatalf("insert observations failed: %v", err)
}
deadline := time.After(2 * time.Second)
var dataMsgs []map[string]interface{}
for len(dataMsgs) < 3 {
select {
case raw := <-client.send:
var parsed map[string]interface{}
if err := json.Unmarshal(raw, &parsed); err != nil {
t.Fatalf("unmarshal ws msg failed: %v", err)
}
if parsed["type"] != "packet" {
continue
}
data, ok := parsed["data"].(map[string]interface{})
if !ok {
continue
}
if data["hash"] == "starbursthash237a" {
dataMsgs = append(dataMsgs, data)
}
case <-deadline:
t.Fatalf("timed out waiting for 3 observation broadcasts, got %d", len(dataMsgs))
}
}
if len(dataMsgs) != 3 {
t.Fatalf("expected 3 messages, got %d", len(dataMsgs))
}
paths := make([]string, 0, 3)
observers := make(map[string]bool)
for _, m := range dataMsgs {
hash, _ := m["hash"].(string)
if hash != "starbursthash237a" {
t.Fatalf("unexpected hash %q", hash)
}
p, _ := m["path_json"].(string)
paths = append(paths, p)
if oid, ok := m["observer_id"].(string); ok && oid != "" {
observers[oid] = true
}
}
sort.Strings(paths)
wantPaths := []string{`["aa","bb","cc"]`, `["aa","bb"]`, `["aa"]`}
sort.Strings(wantPaths)
for i := range wantPaths {
if paths[i] != wantPaths[i] {
t.Fatalf("path mismatch at %d: got %q want %q", i, paths[i], wantPaths[i])
}
}
if len(observers) < 2 {
t.Fatalf("expected observations from >=2 observers, got %d", len(observers))
}
}
func TestIngestNewObservationsBroadcast(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
seedTestData(t, db)
store := NewPacketStore(db)
if err := store.Load(); err != nil {
t.Fatalf("store load failed: %v", err)
}
maxObs := db.GetMaxObservationID()
now := time.Now().Unix()
if _, err := db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
VALUES (1, 2, 6.0, -100, '["aa","zz"]', ?),
(1, 1, 5.0, -101, '["aa","yy"]', ?)`, now, now+1); err != nil {
t.Fatalf("insert new observations failed: %v", err)
}
maps := store.IngestNewObservations(maxObs, 500)
if len(maps) != 2 {
t.Fatalf("expected 2 broadcast maps, got %d", len(maps))
}
for _, m := range maps {
if m["hash"] != "abc123def4567890" {
t.Fatalf("unexpected hash in map: %v", m["hash"])
}
path, ok := m["path_json"].(string)
if !ok || path == "" {
t.Fatalf("missing path_json in map: %#v", m)
}
if _, ok := m["observer_id"]; !ok {
t.Fatalf("missing observer_id in map: %#v", m)
}
}
}
func TestHubRegisterUnregister(t *testing.T) {
hub := NewHub()
client := &Client{
send: make(chan []byte, 256),
}
hub.Register(client)
if hub.ClientCount() != 1 {
t.Errorf("expected 1 client after register, got %d", hub.ClientCount())
}
hub.Unregister(client)
if hub.ClientCount() != 0 {
t.Errorf("expected 0 clients after unregister, got %d", hub.ClientCount())
}
// Unregister again should be safe
hub.Unregister(client)
if hub.ClientCount() != 0 {
t.Errorf("expected 0 clients, got %d", hub.ClientCount())
}
}
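The sleep in `TestPollerBroadcastsMultipleObservations` works around a cursor race: `Start()` snapshots the current max ID, and only rows with a higher ID are ever broadcast. A minimal model of that race, with all names illustrative rather than the repo's types:

```go
package main

import "fmt"

// fakePoller models the cursor behavior: start() snapshots a max ID,
// and poll() only broadcasts rows with id > cursor. Rows inserted
// before the snapshot is taken are silently skipped — hence the
// test's sleep before inserting data.
type fakePoller struct{ cursor int }

func (p *fakePoller) start(maxID int) { p.cursor = maxID }

func (p *fakePoller) poll(rows []int) (broadcast []int) {
	for _, id := range rows {
		if id > p.cursor {
			broadcast = append(broadcast, id)
			p.cursor = id
		}
	}
	return broadcast
}

func main() {
	// Race: row 5 inserted BEFORE the snapshot — cursor starts at 5, row skipped
	p := &fakePoller{}
	p.start(5)
	fmt.Println(len(p.poll([]int{5}))) // 0

	// Correct ordering: snapshot first, then insert — row is broadcast
	q := &fakePoller{}
	q.start(4)
	fmt.Println(q.poll([]int{5})) // [5]
}
```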


@@ -2,8 +2,8 @@
* MeshCore Packet Decoder
* Custom implementation — does NOT use meshcore-decoder library (known path_length bug).
*
* Packet layout (per firmware docs/packet_format.md):
* [header(1)] [transportCodes?(4)] [pathLength(1)] [path hops] [payload...]
*
* Header byte (LSB first):
* bits 1-0: routeType (0=TRANSPORT_FLOOD, 1=FLOOD, 2=DIRECT, 3=TRANSPORT_DIRECT)
@@ -42,7 +42,7 @@ const PAYLOAD_TYPES = {
0x0F: 'RAW_CUSTOM',
};
// Route types that carry transport codes (2x uint16_t, 4 bytes total)
const TRANSPORT_ROUTES = new Set([0, 3]); // TRANSPORT_FLOOD, TRANSPORT_DIRECT
// --- Header parsing ---
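The header-byte layout documented above can be sketched as a pair of bit-mask helpers. Note the payload-type bit position (bits 5-2) is an assumption on my part, but it is consistent with the Test 1 vector at the bottom of this file (header 0x11 → routeType 1 = FLOOD, payloadType 4 = ADVERT); written as a Go sketch here even though the decoder itself is JavaScript:

```go
package main

import "fmt"

// decodeHeaderByte extracts routeType from bits 1-0 (per the comment
// above). payloadType in bits 5-2 is an assumption, consistent with
// header 0x11 decoding to FLOOD + ADVERT.
func decodeHeaderByte(b byte) (routeType, payloadType int) {
	return int(b & 0x03), int((b >> 2) & 0x0F)
}

// hasTransportCodes mirrors TRANSPORT_ROUTES: only TRANSPORT_FLOOD (0)
// and TRANSPORT_DIRECT (3) carry the 4 transport-code bytes.
func hasTransportCodes(routeType int) bool {
	return routeType == 0 || routeType == 3
}

func main() {
	rt, pt := decodeHeaderByte(0x11)
	fmt.Println(rt, pt, hasTransportCodes(rt)) // 1 4 false
}
```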
@@ -94,13 +94,11 @@ function decodeEncryptedPayload(buf) {
};
}
/** ACK: checksum(4) — CRC of message timestamp + text + sender pubkey (per Mesh.cpp createAck) */
function decodeAck(buf) {
if (buf.length < 4) return { error: 'too short', raw: buf.toString('hex') };
return {
ackChecksum: buf.subarray(0, 4).toString('hex'),
};
}
@@ -125,6 +123,8 @@ function decodeAdvert(buf) {
room: advType === 3,
sensor: advType === 4,
hasLocation: !!(flags & 0x10),
hasFeat1: !!(flags & 0x20),
hasFeat2: !!(flags & 0x40),
hasName: !!(flags & 0x80),
};
@@ -134,6 +134,14 @@ function decodeAdvert(buf) {
result.lon = appdata.readInt32LE(off + 4) / 1e6;
off += 8;
}
if (result.flags.hasFeat1 && appdata.length >= off + 2) {
result.feat1 = appdata.readUInt16LE(off);
off += 2;
}
if (result.flags.hasFeat2 && appdata.length >= off + 2) {
result.feat2 = appdata.readUInt16LE(off);
off += 2;
}
if (result.flags.hasName) {
// Find null terminator to separate name from trailing telemetry bytes
let nameEnd = appdata.length;
@@ -231,7 +239,7 @@ function decodeGrpTxt(buf, channelKeys) {
return { type: 'GRP_TXT', channelHash, channelHashHex, decryptionStatus: 'no_key', mac, encryptedData };
}
/** ANON_REQ: dest(1) + ephemeral_pubkey(32) + MAC(2) + encrypted */
function decodeAnonReq(buf) {
if (buf.length < 35) return { error: 'too short', raw: buf.toString('hex') };
return {
@@ -242,7 +250,7 @@ function decodeAnonReq(buf) {
};
}
/** PATH: dest(1) + src(1) + MAC(2) + path_data */
function decodePath_payload(buf) {
if (buf.length < 4) return { error: 'too short', raw: buf.toString('hex') };
return {
@@ -253,14 +261,14 @@ function decodePath_payload(buf) {
};
}
/** TRACE: tag(4) + authCode(4) + flags(1) + pathData (per Mesh.cpp onRecvPacket TRACE) */
function decodeTrace(buf) {
if (buf.length < 9) return { error: 'too short', raw: buf.toString('hex') };
return {
tag: buf.readUInt32LE(0),
authCode: buf.subarray(4, 8).toString('hex'),
flags: buf[8],
pathData: buf.subarray(9).toString('hex'),
};
}
@@ -289,20 +297,22 @@ function decodePacket(hexString, channelKeys) {
if (buf.length < 2) throw new Error('Packet too short (need at least header + pathLength)');
const header = decodeHeader(buf[0]);
let offset = 1;
// Transport codes for TRANSPORT_FLOOD / TRANSPORT_DIRECT — BEFORE path_length per spec
let transportCodes = null;
if (TRANSPORT_ROUTES.has(header.routeType)) {
if (buf.length < offset + 4) throw new Error('Packet too short for transport codes');
transportCodes = {
code1: buf.subarray(offset, offset + 2).toString('hex').toUpperCase(),
code2: buf.subarray(offset + 2, offset + 4).toString('hex').toUpperCase(),
};
offset += 4;
}
// Path length byte — AFTER transport codes per spec
const pathByte = buf[offset++];
// Path
const path = decodePath(pathByte, buf, offset);
offset += path.bytesConsumed;
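The corrected field order — header, then transport codes (for transport routes only), then the path-length byte — can be illustrated with a small Go port of the same parsing logic. This is a sketch for illustration, not the repo's decoder, which is JavaScript:

```go
package main

import "fmt"

// parsePrefix follows the corrected byte order: header(1), then for
// TRANSPORT_FLOOD (0) / TRANSPORT_DIRECT (3) four transport-code
// bytes, THEN the path-length byte. rest holds everything after it.
func parsePrefix(pkt []byte) (codes []byte, pathLen byte, rest []byte, err error) {
	if len(pkt) < 2 {
		return nil, 0, nil, fmt.Errorf("packet too short")
	}
	off := 1
	routeType := pkt[0] & 0x03
	if routeType == 0 || routeType == 3 { // transport routes carry codes
		if len(pkt) < off+5 {
			return nil, 0, nil, fmt.Errorf("too short for transport codes")
		}
		codes = pkt[off : off+4]
		off += 4
	}
	pathLen = pkt[off] // path_length comes AFTER transport codes
	return codes, pathLen, pkt[off+1:], nil
}

func main() {
	// header 0x00 = TRANSPORT_FLOOD, codes AABBCCDD, pathLen 1, one hop byte
	pkt := []byte{0x00, 0xAA, 0xBB, 0xCC, 0xDD, 0x01, 0x2A}
	codes, pathLen, rest, _ := parsePrefix(pkt)
	fmt.Printf("%x %d %x\n", codes, pathLen, rest) // aabbccdd 1 2a
}
```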
@@ -386,7 +396,7 @@ module.exports = { decodePacket, validateAdvert, hasNonPrintableChars, ROUTE_TYP
 // --- Tests ---
 if (require.main === module) {
-console.log('=== Test 1: ADVERT, FLOOD, 5 hops (2-byte hashes), "Test Repeater" ===');
+console.log('=== Test 1: ADVERT, FLOOD, 5 hops (2-byte hashes), "Kpa Roof Solar" ===');
 const pkt1 = decodePacket(
 '11451000D818206D3AAC152C8A91F89957E6D30CA51F36E28790228971C473B755F244F718754CF5EE4A2FD58D944466E42CDED140C66D0CC590183E32BAF40F112BE8F3F2BDF6012B4B2793C52F1D36F69EE054D9A05593286F78453E56C0EC4A3EB95DDA2A7543FCCC00B939CACC009278603902FC12BCF84B706120526F6F6620536F6C6172'
 );
@@ -402,7 +412,7 @@ if (require.main === module) {
 assert(pkt1.path.hops[0] === '1000', 'first hop should be 1000');
 assert(pkt1.path.hops[1] === 'D818', 'second hop should be D818');
 assert(pkt1.transportCodes === null, 'FLOOD has no transport codes');
-assert(pkt1.payload.name === 'Test Repeater', 'name should be "Test Repeater"');
+assert(pkt1.payload.name === 'Kpa Roof Solar', 'name should be "Kpa Roof Solar"');
 console.log('✅ Test 1 passed\n');
 console.log('=== Test 2: ADVERT, FLOOD, 0 hops (zero-path) ===');


@@ -22,9 +22,9 @@
 <meta name="twitter:title" content="CoreScope">
 <meta name="twitter:description" content="Real-time MeshCore LoRa mesh network analyzer — live packet visualization, node tracking, channel decryption, and route analysis.">
 <meta name="twitter:image" content="https://raw.githubusercontent.com/Kpa-clawbot/corescope/master/public/og-image.png">
-<link rel="stylesheet" href="style.css?v=1774731523">
-<link rel="stylesheet" href="home.css?v=1774731523">
-<link rel="stylesheet" href="live.css?v=1774731523">
+<link rel="stylesheet" href="style.css?v=1774786038">
+<link rel="stylesheet" href="home.css?v=1774786038">
+<link rel="stylesheet" href="live.css?v=1774786038">
 <link rel="stylesheet" href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css"
 integrity="sha256-p4NxAoJBhIIN+hmNHrzRCf9tD/miZyoHS5obTRR9BMY="
 crossorigin="anonymous">
@@ -81,29 +81,29 @@
 <main id="app" role="main"></main>
 <script src="vendor/qrcode.js"></script>
-<script src="roles.js?v=1774731523"></script>
-<script src="customize.js?v=1774731523" onerror="console.error('Failed to load:', this.src)"></script>
-<script src="region-filter.js?v=1774731523"></script>
-<script src="hop-resolver.js?v=1774731523"></script>
-<script src="hop-display.js?v=1774731523"></script>
-<script src="app.js?v=1774731523"></script>
-<script src="home.js?v=1774731523"></script>
-<script src="packet-filter.js?v=1774731523"></script>
-<script src="packets.js?v=1774731523"></script>
-<script src="map.js?v=1774731523" onerror="console.error('Failed to load:', this.src)"></script>
-<script src="channels.js?v=1774731523" onerror="console.error('Failed to load:', this.src)"></script>
-<script src="nodes.js?v=1774731523" onerror="console.error('Failed to load:', this.src)"></script>
-<script src="traces.js?v=1774731523" onerror="console.error('Failed to load:', this.src)"></script>
-<script src="analytics.js?v=1774731523" onerror="console.error('Failed to load:', this.src)"></script>
-<script src="audio.js?v=1774731523" onerror="console.error('Failed to load:', this.src)"></script>
-<script src="audio-v1-constellation.js?v=1774731523" onerror="console.error('Failed to load:', this.src)"></script>
-<script src="audio-v2-constellation.js?v=1774731523" onerror="console.error('Failed to load:', this.src)"></script>
-<script src="audio-lab.js?v=1774731523" onerror="console.error('Failed to load:', this.src)"></script>
-<script src="live.js?v=1774731523" onerror="console.error('Failed to load:', this.src)"></script>
-<script src="observers.js?v=1774731523" onerror="console.error('Failed to load:', this.src)"></script>
-<script src="observer-detail.js?v=1774731523" onerror="console.error('Failed to load:', this.src)"></script>
-<script src="compare.js?v=1774731523" onerror="console.error('Failed to load:', this.src)"></script>
-<script src="node-analytics.js?v=1774731523" onerror="console.error('Failed to load:', this.src)"></script>
-<script src="perf.js?v=1774731523" onerror="console.error('Failed to load:', this.src)"></script>
+<script src="roles.js?v=1774786038"></script>
+<script src="customize.js?v=1774786038" onerror="console.error('Failed to load:', this.src)"></script>
+<script src="region-filter.js?v=1774786038"></script>
+<script src="hop-resolver.js?v=1774786038"></script>
+<script src="hop-display.js?v=1774786038"></script>
+<script src="app.js?v=1774786038"></script>
+<script src="home.js?v=1774786038"></script>
+<script src="packet-filter.js?v=1774786038"></script>
+<script src="packets.js?v=1774786038"></script>
+<script src="map.js?v=1774786038" onerror="console.error('Failed to load:', this.src)"></script>
+<script src="channels.js?v=1774786038" onerror="console.error('Failed to load:', this.src)"></script>
+<script src="nodes.js?v=1774786038" onerror="console.error('Failed to load:', this.src)"></script>
+<script src="traces.js?v=1774786038" onerror="console.error('Failed to load:', this.src)"></script>
+<script src="analytics.js?v=1774786038" onerror="console.error('Failed to load:', this.src)"></script>
+<script src="audio.js?v=1774786038" onerror="console.error('Failed to load:', this.src)"></script>
+<script src="audio-v1-constellation.js?v=1774786038" onerror="console.error('Failed to load:', this.src)"></script>
+<script src="audio-v2-constellation.js?v=1774786038" onerror="console.error('Failed to load:', this.src)"></script>
+<script src="audio-lab.js?v=1774786038" onerror="console.error('Failed to load:', this.src)"></script>
+<script src="live.js?v=1774786038" onerror="console.error('Failed to load:', this.src)"></script>
+<script src="observers.js?v=1774786038" onerror="console.error('Failed to load:', this.src)"></script>
+<script src="observer-detail.js?v=1774786038" onerror="console.error('Failed to load:', this.src)"></script>
+<script src="compare.js?v=1774786038" onerror="console.error('Failed to load:', this.src)"></script>
+<script src="node-analytics.js?v=1774786038" onerror="console.error('Failed to load:', this.src)"></script>
+<script src="perf.js?v=1774786038" onerror="console.error('Failed to load:', this.src)"></script>
 </body>
 </html>


@@ -1512,14 +1512,12 @@
 rows += fieldRow(off + 1, 'Sender', decoded.sender || '—', '');
 if (decoded.sender_timestamp) rows += fieldRow(off + 2, 'Sender Time', decoded.sender_timestamp, '');
 } else if (decoded.type === 'ACK') {
-rows += fieldRow(off, 'Dest Hash (6B)', decoded.destHash || '', '');
-rows += fieldRow(off + 6, 'Src Hash (6B)', decoded.srcHash || '', '');
-rows += fieldRow(off + 12, 'Extra (6B)', decoded.extraHash || '', '');
+rows += fieldRow(off, 'Checksum (4B)', decoded.ackChecksum || '', '');
 } else if (decoded.destHash !== undefined) {
-rows += fieldRow(off, 'Dest Hash (6B)', decoded.destHash || '', '');
-rows += fieldRow(off + 6, 'Src Hash (6B)', decoded.srcHash || '', '');
-rows += fieldRow(off + 12, 'MAC (4B)', decoded.mac || '', '');
-rows += fieldRow(off + 16, 'Encrypted Data', truncate(decoded.encryptedData || '', 30), '');
+rows += fieldRow(off, 'Dest Hash (1B)', decoded.destHash || '', '');
+rows += fieldRow(off + 1, 'Src Hash (1B)', decoded.srcHash || '', '');
+rows += fieldRow(off + 2, 'MAC (2B)', decoded.mac || '', '');
+rows += fieldRow(off + 4, 'Encrypted Data', truncate(decoded.encryptedData || '', 30), '');
 } else {
 rows += fieldRow(off, 'Raw', truncate(buf.slice(off * 2), 40), '');
 }


@@ -40,12 +40,12 @@
 html += `<h3>🔧 Go Runtime</h3><div style="display:flex;gap:16px;flex-wrap:wrap;margin:8px 0;">
 <div class="perf-card"><div class="perf-num">${gr.goroutines}</div><div class="perf-label">Goroutines</div></div>
 <div class="perf-card"><div class="perf-num">${gr.numGC}</div><div class="perf-label">GC Collections</div></div>
-<div class="perf-card"><div class="perf-num" style="color:${gcColor}">${gr.pauseTotalMs}ms</div><div class="perf-label">GC Pause Total</div></div>
-<div class="perf-card"><div class="perf-num">${gr.lastPauseMs}ms</div><div class="perf-label">Last GC Pause</div></div>
-<div class="perf-card"><div class="perf-num">${gr.heapAllocMB}MB</div><div class="perf-label">Heap Alloc</div></div>
-<div class="perf-card"><div class="perf-num">${gr.heapSysMB}MB</div><div class="perf-label">Heap Sys</div></div>
-<div class="perf-card"><div class="perf-num">${gr.heapInuseMB}MB</div><div class="perf-label">Heap Inuse</div></div>
-<div class="perf-card"><div class="perf-num">${gr.heapIdleMB}MB</div><div class="perf-label">Heap Idle</div></div>
+<div class="perf-card"><div class="perf-num" style="color:${gcColor}">${(+gr.pauseTotalMs).toFixed(1)}ms</div><div class="perf-label">GC Pause Total</div></div>
+<div class="perf-card"><div class="perf-num">${(+gr.lastPauseMs).toFixed(1)}ms</div><div class="perf-label">Last GC Pause</div></div>
+<div class="perf-card"><div class="perf-num">${(+gr.heapAllocMB).toFixed(1)}MB</div><div class="perf-label">Heap Alloc</div></div>
+<div class="perf-card"><div class="perf-num">${(+gr.heapSysMB).toFixed(1)}MB</div><div class="perf-label">Heap Sys</div></div>
+<div class="perf-card"><div class="perf-num">${(+gr.heapInuseMB).toFixed(1)}MB</div><div class="perf-label">Heap Inuse</div></div>
+<div class="perf-card"><div class="perf-num">${(+gr.heapIdleMB).toFixed(1)}MB</div><div class="perf-label">Heap Idle</div></div>
 <div class="perf-card"><div class="perf-num">${gr.numCPU}</div><div class="perf-label">CPUs</div></div>
 <div class="perf-card"><div class="perf-num">${health.websocket.clients}</div><div class="perf-label">WS Clients</div></div>
 </div>`;


@@ -155,7 +155,7 @@ a:focus-visible, button:focus-visible, input:focus-visible, select:focus-visible
 /* === Nav Stats === */
 .nav-stats {
 display: flex; gap: 12px; align-items: center; font-size: 12px; color: var(--nav-text-muted);
-font-family: var(--mono); margin-right: 4px;
+font-family: var(--mono); margin-right: 4px; white-space: nowrap;
 }
 .nav-stats .stat-val { color: var(--nav-text); font-weight: 600; transition: color 0.3s ease; }
 .nav-stats .stat-val.updated { color: var(--accent); }


@@ -207,6 +207,13 @@ class TTLCache {
 if (key.startsWith(prefix)) this.store.delete(key);
 }
 }
+debouncedInvalidateBulkHealth() {
+if (this._bulkHealthTimer) return;
+this._bulkHealthTimer = setTimeout(() => {
+this._bulkHealthTimer = null;
+this.invalidate('bulk-health');
+}, 30000);
+}
 debouncedInvalidateAll() {
 if (this._debounceTimer) return;
 this._debounceTimer = setTimeout(() => {
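The debounce pattern behind `debouncedInvalidateBulkHealth` can be sketched in isolation: many triggers inside the window collapse into a single `invalidate()` call. This is an illustrative stand-alone version, not the TTLCache method itself; `makeDebouncedInvalidator` is a hypothetical helper, and the delay is a parameter here where the server hard-codes 30000ms.

```javascript
// Returns a function that schedules at most one cache.invalidate(prefix)
// per delayMs window, no matter how many times it is called in a burst.
function makeDebouncedInvalidator(cache, prefix, delayMs) {
  let timer = null;
  return function debounced() {
    if (timer) return; // a flush is already scheduled; coalesce this call
    timer = setTimeout(() => {
      timer = null;               // allow the next burst to schedule again
      cache.invalidate(prefix);   // one invalidation covers the whole burst
    }, delayMs);
  };
}
```

Under a storm of adverts this turns N `bulk-health` invalidations per window into one, which is the point of the hunk above.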
@@ -410,7 +417,7 @@ app.get('/api/perf', (req, res) => {
 avgMs: perfStats.requests ? Math.round(perfStats.totalMs / perfStats.requests * 10) / 10 : 0,
 endpoints: Object.fromEntries(sorted),
 slowQueries: perfStats.slowQueries.slice(-20),
-cache: { size: cache.size, hits: cache.hits, misses: cache.misses, staleHits: cache.staleHits, recomputes: cache.recomputes, hitRate: cache.hits + cache.misses > 0 ? Math.round(cache.hits / (cache.hits + cache.misses) * 1000) / 10 : 0 },
+cache: { size: cache.size, hits: cache.hits, misses: cache.misses, staleHits: cache.staleHits, recomputes: cache.recomputes, hitRate: cache.hits + cache.staleHits + cache.misses > 0 ? Math.round((cache.hits + cache.staleHits) / (cache.hits + cache.staleHits + cache.misses) * 1000) / 10 : 0 },
 packetStore: pktStore.getStats(),
 sqlite: (() => {
 try {
@@ -519,7 +526,7 @@ app.get('/api/health', (req, res) => {
 misses: cache.misses,
 staleHits: cache.staleHits,
 recomputes: cache.recomputes,
-hitRate: cache.hits + cache.misses > 0 ? Math.round(cache.hits / (cache.hits + cache.misses) * 1000) / 10 : 0,
+hitRate: cache.hits + cache.staleHits + cache.misses > 0 ? Math.round((cache.hits + cache.staleHits) / (cache.hits + cache.staleHits + cache.misses) * 1000) / 10 : 0,
 },
 websocket: {
 clients: wsClients,
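The corrected hit-rate formula used in both endpoints above can be pulled out as a small helper to make the change concrete: stale hits are still served from cache, so they belong in both the numerator and the denominator. `cacheHitRate` is a hypothetical extraction for illustration; the server inlines the expression.

```javascript
// Percentage of lookups served from cache (fresh or stale),
// rounded to one decimal place, matching the inline ternary above.
function cacheHitRate({ hits, staleHits, misses }) {
  const total = hits + staleHits + misses;
  if (total === 0) return 0; // avoid division by zero on a cold cache
  return Math.round((hits + staleHits) / total * 1000) / 10;
}
```

With 3 hits, 2 stale hits, and 5 misses this reports 50, which is the exact scenario the new server-route test asserts; the old formula would have reported 37.5 by dropping the stale hits.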
@@ -723,7 +730,7 @@ for (const source of mqttSources) {
 // Invalidate this node's caches on advert
 cache.invalidate('node:' + p.pubKey);
 cache.invalidate('health:' + p.pubKey);
-cache.invalidate('bulk-health');
+cache.debouncedInvalidateBulkHealth();
 // Cross-reference: if this node's pubkey matches an existing observer, backfill observer name
 if (p.name && p.pubKey) {


@@ -122,13 +122,14 @@ console.log('── Spec Tests: Transport Codes ──');
 {
 // Route type 0 (TRANSPORT_FLOOD) and 3 (TRANSPORT_DIRECT) should have 4-byte transport codes
-// Route type 0: header byte = 0bPPPPPP00, e.g. 0x14 = payloadType 5 (GRP_TXT), routeType 0
-const hex = '1400' + 'AABB' + 'CCDD' + '1A' + '00'.repeat(10); // transport codes + GRP_TXT payload
+// Route type 0: header=0x14 = payloadType 5 (GRP_TXT), routeType 0 (TRANSPORT_FLOOD)
+// Format: header(1) + transportCodes(4) + pathByte(1) + payload
+const hex = '14' + 'AABB' + 'CCDD' + '00' + '1A' + '00'.repeat(10); // transport codes + pathByte + GRP_TXT payload
 const p = decodePacket(hex);
 assertEq(p.header.routeType, 0, 'transport: routeType=0 (TRANSPORT_FLOOD)');
 assert(p.transportCodes !== null, 'transport: transportCodes present for TRANSPORT_FLOOD');
-assertEq(p.transportCodes.nextHop, 'AABB', 'transport: nextHop');
-assertEq(p.transportCodes.lastHop, 'CCDD', 'transport: lastHop');
+assertEq(p.transportCodes.code1, 'AABB', 'transport: code1');
+assertEq(p.transportCodes.code2, 'CCDD', 'transport: code2');
 }
 {
@@ -257,13 +258,13 @@ console.log('── Spec Tests: Advert Payload ──');
 console.log('── Spec Tests: Encrypted Payload Format ──');
-// NOTE: Spec says v1 encrypted payloads have dest(1) + src(1) + MAC(2) + ciphertext
-// But decoder reads dest(6) + src(6) + MAC(4) + ciphertext
-// This is a known discrepancy — the decoder matches production behavior, not the spec.
-// The spec may describe the firmware's internal addressing while the OTA format differs,
-// or the decoder may be parsing the fields differently. Production data validates the decoder.
+// Spec says v1 encrypted payloads: dest(1)+src(1)+MAC(2)+cipher — decoder matches this.
 {
-note('Spec says v1 encrypted payloads: dest(1)+src(1)+MAC(2)+cipher, but decoder reads dest(6)+src(6)+MAC(4)+cipher — decoder matches prod data');
+const hex = '0100' + 'AA' + 'BB' + 'CCDD' + '00'.repeat(10);
+const p = decodePacket(hex);
+assertEq(p.payload.destHash, 'aa', 'encrypted payload: dest is 1 byte');
+assertEq(p.payload.srcHash, 'bb', 'encrypted payload: src is 1 byte');
+assertEq(p.payload.mac, 'ccdd', 'encrypted payload: MAC is 2 bytes');
 }
 console.log('── Spec Tests: validateAdvert ──');
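The v1 encrypted-payload layout the spec test above checks (dest(1) + src(1) + MAC(2) + ciphertext) can be sketched as a stand-alone slicer. This is illustrative only: `parseEncryptedPayload` is a hypothetical helper operating on the payload bytes after the routing fields, with field names borrowed from the test (`destHash`, `srcHash`, `mac`).

```javascript
// Slice a v1 encrypted payload per the spec: dest(1) + src(1) + MAC(2) + ciphertext.
function parseEncryptedPayload(payloadHex) {
  const buf = Buffer.from(payloadHex, 'hex');
  if (buf.length < 4) throw new Error('Encrypted payload too short');
  return {
    destHash: buf.subarray(0, 1).toString('hex'),       // 1-byte dest hash
    srcHash: buf.subarray(1, 2).toString('hex'),        // 1-byte src hash
    mac: buf.subarray(2, 4).toString('hex'),            // 2-byte MAC
    encryptedData: buf.subarray(4).toString('hex'),     // remainder is ciphertext
  };
}
```

Feeding it the payload portion of the spec-test vector ('AA' + 'BB' + 'CCDD' + zeros) yields the same `aa`/`bb`/`ccdd` values the test asserts.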


@@ -28,22 +28,22 @@ test('FLOOD + ADVERT = 0x11', () => {
 });
 test('TRANSPORT_FLOOD = routeType 0', () => {
-// 0x00 = TRANSPORT_FLOOD + REQ(0), needs transport codes + 16 byte payload
-const hex = '0000' + 'AABB' + 'CCDD' + '00'.repeat(16);
+// header=0x00 (TRANSPORT_FLOOD + REQ), transportCodes=AABB+CCDD, pathByte=0x00, payload
+const hex = '00' + 'AABB' + 'CCDD' + '00' + '00'.repeat(16);
 const p = decodePacket(hex);
 assert.strictEqual(p.header.routeType, 0);
 assert.strictEqual(p.header.routeTypeName, 'TRANSPORT_FLOOD');
 assert.notStrictEqual(p.transportCodes, null);
-assert.strictEqual(p.transportCodes.nextHop, 'AABB');
-assert.strictEqual(p.transportCodes.lastHop, 'CCDD');
+assert.strictEqual(p.transportCodes.code1, 'AABB');
+assert.strictEqual(p.transportCodes.code2, 'CCDD');
 });
 test('TRANSPORT_DIRECT = routeType 3', () => {
-const hex = '0300' + '1122' + '3344' + '00'.repeat(16);
+const hex = '03' + '1122' + '3344' + '00' + '00'.repeat(16);
 const p = decodePacket(hex);
 assert.strictEqual(p.header.routeType, 3);
 assert.strictEqual(p.header.routeTypeName, 'TRANSPORT_DIRECT');
-assert.strictEqual(p.transportCodes.nextHop, '1122');
+assert.strictEqual(p.transportCodes.code1, '1122');
 });
 test('DIRECT = routeType 2, no transport codes', () => {
@@ -358,9 +358,7 @@ test('ACK decode', () => {
 const hex = '0D00' + '00'.repeat(18);
 const p = decodePacket(hex);
 assert.strictEqual(p.payload.type, 'ACK');
-assert(p.payload.destHash);
-assert(p.payload.srcHash);
-assert(p.payload.extraHash);
+assert(p.payload.ackChecksum);
 });
 test('ACK too short', () => {
@@ -424,9 +422,9 @@ test('TRACE decode', () => {
 const hex = '2500' + '00'.repeat(12);
 const p = decodePacket(hex);
 assert.strictEqual(p.payload.type, 'TRACE');
-assert.strictEqual(p.payload.flags, 0);
 assert(p.payload.tag !== undefined);
-assert(p.payload.destHash);
+assert(p.payload.authCode !== undefined);
+assert.strictEqual(p.payload.flags, 0);
 });
 test('TRACE too short', () => {
@@ -460,16 +458,18 @@ test('Transport route too short throws', () => {
 assert.throws(() => decodePacket('0000'), /too short for transport/);
 });
-test('Corrupt packet #183 — path overflow capped to buffer', () => {
+test('Corrupt packet #183 — TRANSPORT_DIRECT with correct field order', () => {
 const hex = 'BBAD6797EC8751D500BF95A1A776EF580E665BCBF6A0BBE03B5E730707C53489B8C728FD3FB902397197E1263CEC21E52465362243685DBBAD6797EC8751C90A75D9FD8213155D';
 const p = decodePacket(hex);
 assert.strictEqual(p.header.routeType, 3, 'routeType should be TRANSPORT_DIRECT');
 assert.strictEqual(p.header.payloadTypeName, 'UNKNOWN');
-// pathByte 0xAD claims 45 hops × 3 bytes = 135, but only 65 bytes available
+// transport codes are bytes 1-4, pathByte=0x87 at byte 5
+assert.strictEqual(p.transportCodes.code1, 'AD67');
+assert.strictEqual(p.transportCodes.code2, '97EC');
+// pathByte 0x87: hashSize=3, hashCount=7
 assert.strictEqual(p.path.hashSize, 3);
-assert.strictEqual(p.path.hashCount, 21, 'hashCount capped to fit buffer');
-assert.strictEqual(p.path.hops.length, 21);
-assert.strictEqual(p.path.truncated, true);
+assert.strictEqual(p.path.hashCount, 7);
+assert.strictEqual(p.path.hops.length, 7);
 // No empty strings in hops
 assert(p.path.hops.every(h => h.length > 0), 'no empty hops');
 });
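The path-byte values in this diff are consistent with one encoding: 0x45 decodes to 5 hops of 2-byte hashes (Test 1), 0x87 to 7 hops of 3-byte hashes, and the old-order misread 0xAD to 45 claimed hops of 3 bytes. That suggests low 6 bits = hop count and top 2 bits = hash size minus one. This sketch is an inference from those examples, not a statement of the MeshCore spec, and `decodePathByte` is a hypothetical helper rather than the project's `decodePath`.

```javascript
// Inferred pathByte layout (from the test vectors in this diff):
//   bits 0-5: number of hop hashes in the path
//   bits 6-7: hash size in bytes, stored as (size - 1)
function decodePathByte(pathByte) {
  const hashCount = pathByte & 0x3f;      // low 6 bits: hop count
  const hashSize = (pathByte >> 6) + 1;   // top 2 bits: hash size, 1-based
  return { hashSize, hashCount, pathBytes: hashSize * hashCount };
}
```

This also explains the old corrupt-packet behavior: reading 0xAD where the path byte was expected claimed 45 × 3 = 135 path bytes, which overflowed the 65-byte buffer, while the correctly positioned 0x87 fits exactly.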


@@ -1254,6 +1254,24 @@ seedTestData();
 lastPathSeenMap.delete(liveNode);
 });
+// ── Cache hit rate includes stale hits ──
+await t('Cache hitRate includes staleHits in formula', async () => {
+cache.clear();
+cache.hits = 0;
+cache.misses = 0;
+cache.staleHits = 0;
+// Simulate: 3 hits, 2 stale hits, 5 misses => rate = (3+2)/(3+2+5) = 50%
+cache.hits = 3;
+cache.staleHits = 2;
+cache.misses = 5;
+const r = await request(app).get('/api/health').expect(200);
+assert(r.body.cache.hitRate === 50, 'hitRate should be (hits+staleHits)/(hits+staleHits+misses) = 50%, got ' + r.body.cache.hitRate);
+// Reset
+cache.hits = 0;
+cache.misses = 0;
+cache.staleHits = 0;
+});
 // ── Summary ──
 console.log(`\n═══ Server Route Tests: ${passed} passed, ${failed} failed ═══`);
 if (failed > 0) process.exit(1);