feat: validate advert signatures on ingest, reject corrupt packets (#794)

## Summary

Validates ed25519 signatures on ADVERT packets during MQTT ingest.
Packets with invalid signatures are rejected before storage, preventing
corrupt/truncated adverts from polluting the database.

## Changes

### Ingestor (`cmd/ingestor/`)

- **Signature validation on ingest**: After decoding an ADVERT, checks
`SignatureValid` from the decoder. Invalid signatures → packet dropped,
never stored.
- **Config flag**: `validateSignatures` (default `true`). Set to `false`
to disable validation for backward compatibility with existing installs.
- **`dropped_packets` table**: New SQLite table recording every rejected
packet with full attribution:
- `hash`, `raw_hex`, `reason`, `observer_id`, `observer_name`,
`node_pubkey`, `node_name`, `dropped_at`
  - Indexed on `observer_id` and `node_pubkey` for investigation queries
- **`SignatureDrops` counter**: New atomic counter in `DBStats`, logged
in periodic stats output as `sig_drops=N`
- **Retention**: `dropped_packets` pruned alongside metrics on the same
`retention.metricsDays` schedule

### Server (`cmd/server/`)

- **`GET /api/dropped-packets`** (API key required): Returns recent
drops with optional `?observer=` and `?pubkey=` filters, `?limit=`
(default 100, max 500)
- **`signatureDrops`** field added to `/api/stats` response (count from
`dropped_packets` table)

### Tests (8 new)

| Test | What it verifies |
|------|-----------------|
| `TestSigValidation_ValidAdvertStored` | Valid advert passes validation
and is stored |
| `TestSigValidation_TamperedSignatureDropped` | Tampered signature →
dropped, recorded in `dropped_packets` with correct fields |
| `TestSigValidation_TruncatedAppdataDropped` | Truncated appdata
invalidates signature → dropped |
| `TestSigValidation_DisabledByConfig` | `validateSignatures: false`
skips validation, stores tampered packet |
| `TestSigValidation_DropCounterIncrements` | Counter increments
correctly across multiple drops |
| `TestSigValidation_LogContainsFields` | `dropped_packets` row contains
hash, reason, observer, pubkey, name |
| `TestPruneDroppedPackets` | Old entries pruned, recent entries
retained |
| `TestShouldValidateSignatures_Default` | Config helper returns correct
defaults |

### Config example

```json
{
  "validateSignatures": true
}
```

Fixes #793

---------

Co-authored-by: you <you@example.com>
This commit is contained in:
Kpa-clawbot
2026-04-18 11:39:13 -07:00
committed by GitHub
parent d596becca3
commit bf674ebfa2
10 changed files with 605 additions and 67 deletions
+10 -1
View File
@@ -39,7 +39,8 @@ type Config struct {
HashChannels []string `json:"hashChannels,omitempty"`
Retention *RetentionConfig `json:"retention,omitempty"`
Metrics *MetricsConfig `json:"metrics,omitempty"`
GeoFilter *GeoFilterConfig `json:"geo_filter,omitempty"`
GeoFilter *GeoFilterConfig `json:"geo_filter,omitempty"`
ValidateSignatures *bool `json:"validateSignatures,omitempty"`
}
// GeoFilterConfig is an alias for the shared geofilter.Config type.
@@ -57,6 +58,14 @@ type MetricsConfig struct {
SampleIntervalSec int `json:"sampleIntervalSec"`
}
// ShouldValidateSignatures returns true (default) unless explicitly disabled.
func (c *Config) ShouldValidateSignatures() bool {
if c.ValidateSignatures != nil {
return *c.ValidateSignatures
}
return true
}
// MetricsSampleInterval returns the configured sample interval or 300s default.
func (c *Config) MetricsSampleInterval() int {
if c.Metrics != nil && c.Metrics.SampleIntervalSec > 0 {
+19 -19
View File
@@ -158,7 +158,7 @@ func TestHandleMessageChannelMessage(t *testing.T) {
payload := []byte(`{"text":"Alice: Hello everyone","channel_idx":3,"SNR":5.0,"RSSI":-95,"score":10,"direction":"rx","sender_timestamp":1700000000}`)
msg := &mockMessage{topic: "meshcore/message/channel/2", payload: payload}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil {
@@ -218,7 +218,7 @@ func TestHandleMessageChannelMessageEmptyText(t *testing.T) {
store, source := newTestContext(t)
msg := &mockMessage{topic: "meshcore/message/channel/1", payload: []byte(`{"text":""}`)}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil {
@@ -233,7 +233,7 @@ func TestHandleMessageChannelNoSender(t *testing.T) {
store, source := newTestContext(t)
msg := &mockMessage{topic: "meshcore/message/channel/1", payload: []byte(`{"text":"no sender here"}`)}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM nodes").Scan(&count); err != nil {
@@ -250,7 +250,7 @@ func TestHandleMessageDirectMessage(t *testing.T) {
payload := []byte(`{"text":"Bob: Hey there","sender_timestamp":1700000000,"SNR":3.0,"rssi":-100,"Score":8,"Direction":"tx"}`)
msg := &mockMessage{topic: "meshcore/message/direct/abc123", payload: payload}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil {
@@ -294,7 +294,7 @@ func TestHandleMessageDirectMessageEmptyText(t *testing.T) {
store, source := newTestContext(t)
msg := &mockMessage{topic: "meshcore/message/direct/abc", payload: []byte(`{"text":""}`)}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil {
@@ -309,7 +309,7 @@ func TestHandleMessageDirectNoSender(t *testing.T) {
store, source := newTestContext(t)
msg := &mockMessage{topic: "meshcore/message/direct/xyz", payload: []byte(`{"text":"message with no colon"}`)}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil {
@@ -328,7 +328,7 @@ func TestHandleMessageUppercaseScoreDirection(t *testing.T) {
payload := []byte(`{"raw":"` + rawHex + `","Score":9.0,"Direction":"tx"}`)
msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var score *float64
var direction *string
@@ -349,7 +349,7 @@ func TestHandleMessageChannelLowercaseFields(t *testing.T) {
payload := []byte(`{"text":"Test: msg","snr":3.0,"rssi":-90,"Score":5,"Direction":"rx"}`)
msg := &mockMessage{topic: "meshcore/message/channel/0", payload: payload}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil {
@@ -365,7 +365,7 @@ func TestHandleMessageDirectLowercaseFields(t *testing.T) {
payload := []byte(`{"text":"Test: msg","snr":2.0,"rssi":-85,"score":7,"direction":"tx"}`)
msg := &mockMessage{topic: "meshcore/message/direct/xyz", payload: payload}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil {
@@ -388,7 +388,7 @@ func TestHandleMessageAdvertWithTelemetry(t *testing.T) {
payload: []byte(`{"raw":"` + rawHex + `"}`),
}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
// Should have created transmission, node, and observer
var txCount, nodeCount, obsCount int
@@ -428,7 +428,7 @@ func TestHandleMessageAdvertGeoFiltered(t *testing.T) {
topic: "meshcore/SJC/obs1/packets",
payload: []byte(`{"raw":"` + rawHex + `"}`),
}
handleMessage(store, "test", source, msg, nil, gf)
handleMessage(store, "test", source, msg, nil, &Config{GeoFilter: gf})
// Geo-filtered adverts should not create nodes
var nodeCount int
@@ -665,7 +665,7 @@ func TestHandleMessageCorruptedAdvertNoNode(t *testing.T) {
topic: "meshcore/SJC/obs1/packets",
payload: []byte(`{"raw":"` + rawHex + `"}`),
}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM nodes").Scan(&count); err != nil {
@@ -687,7 +687,7 @@ func TestHandleMessageNonAdvertPacket(t *testing.T) {
topic: "meshcore/SJC/obs1/packets",
payload: []byte(`{"raw":"` + rawHex + `"}`),
}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil {
@@ -864,7 +864,7 @@ func TestHandleMessageChannelLongSender(t *testing.T) {
longText := "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA: msg"
payload := []byte(`{"text":"` + longText + `"}`)
msg := &mockMessage{topic: "meshcore/message/channel/1", payload: payload}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM nodes").Scan(&count); err != nil {
@@ -883,7 +883,7 @@ func TestHandleMessageDirectLongSender(t *testing.T) {
longText := "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB: msg"
payload := []byte(`{"text":"` + longText + `"}`)
msg := &mockMessage{topic: "meshcore/message/direct/abc", payload: payload}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil {
@@ -900,7 +900,7 @@ func TestHandleMessageDirectUppercaseScoreDirection(t *testing.T) {
payload := []byte(`{"text":"X: hi","Score":6,"Direction":"rx"}`)
msg := &mockMessage{topic: "meshcore/message/direct/d1", payload: payload}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil {
@@ -930,7 +930,7 @@ func TestHandleMessageChannelUppercaseScoreDirection(t *testing.T) {
payload := []byte(`{"text":"Y: hi","Score":4,"Direction":"tx"}`)
msg := &mockMessage{topic: "meshcore/message/channel/5", payload: payload}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil {
@@ -961,7 +961,7 @@ func TestHandleMessageRawLowercaseScore(t *testing.T) {
rawHex := "0A00D69FD7A5A7475DB07337749AE61FA53A4788E976"
payload := []byte(`{"raw":"` + rawHex + `","score":3.5}`)
msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var score *float64
if err := store.db.QueryRow("SELECT score FROM observations LIMIT 1").Scan(&score); err != nil {
@@ -980,7 +980,7 @@ func TestHandleMessageStatusNoOrigin(t *testing.T) {
topic: "meshcore/LAX/obs5/status",
payload: []byte(`{"model":"L1"}`),
}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var count int
if err := store.db.QueryRow("SELECT COUNT(*) FROM observers WHERE id = 'obs5'").Scan(&count); err != nil {
+71 -1
View File
@@ -22,6 +22,7 @@ type DBStats struct {
NodeUpserts atomic.Int64
ObserverUpserts atomic.Int64
WriteErrors atomic.Int64
SignatureDrops atomic.Int64
}
// Store wraps the SQLite database for packet ingestion.
@@ -381,6 +382,32 @@ func applySchema(db *sql.DB) error {
log.Println("[migration] channel_hash column added and backfilled")
}
// Migration: dropped_packets table for signature validation failures (#793)
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'dropped_packets_v1'")
if row.Scan(&migDone) != nil {
log.Println("[migration] Creating dropped_packets table...")
_, err := db.Exec(`
CREATE TABLE IF NOT EXISTS dropped_packets (
id INTEGER PRIMARY KEY AUTOINCREMENT,
hash TEXT,
raw_hex TEXT,
reason TEXT NOT NULL,
observer_id TEXT,
observer_name TEXT,
node_pubkey TEXT,
node_name TEXT,
dropped_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_dropped_observer ON dropped_packets(observer_id);
CREATE INDEX IF NOT EXISTS idx_dropped_node ON dropped_packets(node_pubkey);
`)
if err != nil {
return fmt.Errorf("dropped_packets schema: %w", err)
}
db.Exec(`INSERT INTO _migrations (name) VALUES ('dropped_packets_v1')`)
log.Println("[migration] dropped_packets table created")
}
return nil
}
@@ -758,13 +785,14 @@ func (s *Store) Checkpoint() {
// LogStats logs current operational metrics.
func (s *Store) LogStats() {
log.Printf("[stats] tx_inserted=%d tx_dupes=%d obs_inserted=%d node_upserts=%d observer_upserts=%d write_errors=%d",
log.Printf("[stats] tx_inserted=%d tx_dupes=%d obs_inserted=%d node_upserts=%d observer_upserts=%d write_errors=%d sig_drops=%d",
s.Stats.TransmissionsInserted.Load(),
s.Stats.DuplicateTransmissions.Load(),
s.Stats.ObservationsInserted.Load(),
s.Stats.NodeUpserts.Load(),
s.Stats.ObserverUpserts.Load(),
s.Stats.WriteErrors.Load(),
s.Stats.SignatureDrops.Load(),
)
}
@@ -819,6 +847,48 @@ func (s *Store) RemoveStaleObservers(observerDays int) (int64, error) {
return removed, nil
}
// DroppedPacket holds data for a packet rejected during ingest.
type DroppedPacket struct {
Hash string
RawHex string
Reason string
ObserverID string
ObserverName string
NodePubKey string
NodeName string
}
// InsertDroppedPacket records a rejected packet in the dropped_packets table.
func (s *Store) InsertDroppedPacket(dp *DroppedPacket) error {
_, err := s.db.Exec(
`INSERT INTO dropped_packets (hash, raw_hex, reason, observer_id, observer_name, node_pubkey, node_name) VALUES (?, ?, ?, ?, ?, ?, ?)`,
dp.Hash, dp.RawHex, dp.Reason, dp.ObserverID, dp.ObserverName, dp.NodePubKey, dp.NodeName,
)
if err != nil {
s.Stats.WriteErrors.Add(1)
return fmt.Errorf("insert dropped packet: %w", err)
}
s.Stats.SignatureDrops.Add(1)
return nil
}
// PruneDroppedPackets removes dropped_packets older than retentionDays.
func (s *Store) PruneDroppedPackets(retentionDays int) (int64, error) {
if retentionDays <= 0 {
return 0, nil
}
cutoff := time.Now().UTC().AddDate(0, 0, -retentionDays).Format(time.RFC3339)
result, err := s.db.Exec(`DELETE FROM dropped_packets WHERE dropped_at < ?`, cutoff)
if err != nil {
return 0, fmt.Errorf("prune dropped packets: %w", err)
}
n, _ := result.RowsAffected()
if n > 0 {
log.Printf("Pruned %d dropped packet(s) older than %d days", n, retentionDays)
}
return n, nil
}
// PacketData holds the data needed to insert a packet into the DB.
type PacketData struct {
RawHex string
+27 -4
View File
@@ -68,6 +68,7 @@ func main() {
// Metrics retention: prune old metrics on startup
metricsDays := cfg.MetricsRetentionDays()
store.PruneOldMetrics(metricsDays)
store.PruneDroppedPackets(metricsDays)
// Daily ticker for node retention
retentionTicker := time.NewTicker(1 * time.Hour)
@@ -92,6 +93,7 @@ func main() {
go func() {
for range metricsRetentionTicker.C {
store.PruneOldMetrics(metricsDays)
store.PruneDroppedPackets(metricsDays)
}
}()
@@ -160,7 +162,7 @@ func main() {
// Capture source for closure
src := source
opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) {
handleMessage(store, tag, src, m, channelKeys, cfg.GeoFilter)
handleMessage(store, tag, src, m, channelKeys, cfg)
})
client := mqtt.NewClient(opts)
@@ -195,7 +197,7 @@ func main() {
log.Println("Done.")
}
func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message, channelKeys map[string]string, geoFilter *GeoFilterConfig) {
func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message, channelKeys map[string]string, cfg *Config) {
defer func() {
if r := recover(); r != nil {
log.Printf("MQTT [%s] panic in handler: %v", tag, r)
@@ -262,7 +264,8 @@ func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message,
// Format 1: Raw packet (meshcoretomqtt / Cisien format)
rawHex, _ := msg["raw"].(string)
if rawHex != "" {
decoded, err := DecodePacket(rawHex, channelKeys, false)
validateSigs := cfg.ShouldValidateSignatures()
decoded, err := DecodePacket(rawHex, channelKeys, validateSigs)
if err != nil {
log.Printf("MQTT [%s] decode error: %v", tag, err)
return
@@ -322,7 +325,27 @@ func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message,
log.Printf("MQTT [%s] skipping corrupted ADVERT: %s", tag, reason)
return
}
if !NodePassesGeoFilter(decoded.Payload.Lat, decoded.Payload.Lon, geoFilter) {
// Signature validation: drop adverts with invalid ed25519 signatures
if validateSigs && decoded.Payload.SignatureValid != nil && !*decoded.Payload.SignatureValid {
hash := ComputeContentHash(rawHex)
truncPK := decoded.Payload.PubKey
if len(truncPK) > 16 {
truncPK = truncPK[:16]
}
log.Printf("MQTT [%s] DROPPED invalid signature: hash=%s name=%s observer=%s pubkey=%s",
tag, hash, decoded.Payload.Name, firstNonEmpty(mqttMsg.Origin, observerID), truncPK)
store.InsertDroppedPacket(&DroppedPacket{
Hash: hash,
RawHex: rawHex,
Reason: "invalid signature",
ObserverID: observerID,
ObserverName: mqttMsg.Origin,
NodePubKey: decoded.Payload.PubKey,
NodeName: decoded.Payload.Name,
})
return
}
if !NodePassesGeoFilter(decoded.Payload.Lat, decoded.Payload.Lon, cfg.GeoFilter) {
return
}
pktData := BuildPacketData(mqttMsg, decoded, observerID, region)
+22 -22
View File
@@ -130,7 +130,7 @@ func TestHandleMessageRawPacket(t *testing.T) {
payload := []byte(`{"raw":"` + rawHex + `","SNR":5.5,"RSSI":-100.0,"origin":"myobs"}`)
msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var count int
store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count)
@@ -147,7 +147,7 @@ func TestHandleMessageRawPacketAdvert(t *testing.T) {
payload := []byte(`{"raw":"` + rawHex + `"}`)
msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
// Should create a node from the ADVERT
var count int
@@ -169,7 +169,7 @@ func TestHandleMessageInvalidJSON(t *testing.T) {
msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: []byte(`not json`)}
// Should not panic
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var count int
store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count)
@@ -186,7 +186,7 @@ func TestHandleMessageStatusTopic(t *testing.T) {
payload: []byte(`{"origin":"MyObserver"}`),
}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var name, iata string
err := store.db.QueryRow("SELECT name, iata FROM observers WHERE id = 'obs1'").Scan(&name, &iata)
@@ -207,11 +207,11 @@ func TestHandleMessageSkipStatusTopics(t *testing.T) {
// meshcore/status should be skipped
msg1 := &mockMessage{topic: "meshcore/status", payload: []byte(`{"raw":"0A00"}`)}
handleMessage(store, "test", source, msg1, nil, nil)
handleMessage(store, "test", source, msg1, nil, &Config{})
// meshcore/events/connection should be skipped
msg2 := &mockMessage{topic: "meshcore/events/connection", payload: []byte(`{"raw":"0A00"}`)}
handleMessage(store, "test", source, msg2, nil, nil)
handleMessage(store, "test", source, msg2, nil, &Config{})
var count int
store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count)
@@ -230,7 +230,7 @@ func TestHandleMessageIATAFilter(t *testing.T) {
topic: "meshcore/SJC/obs1/packets",
payload: []byte(`{"raw":"` + rawHex + `"}`),
}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var count int
store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count)
@@ -243,7 +243,7 @@ func TestHandleMessageIATAFilter(t *testing.T) {
topic: "meshcore/LAX/obs2/packets",
payload: []byte(`{"raw":"` + rawHex + `"}`),
}
handleMessage(store, "test", source, msg2, nil, nil)
handleMessage(store, "test", source, msg2, nil, &Config{})
store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count)
if count != 1 {
@@ -261,7 +261,7 @@ func TestHandleMessageIATAFilterNoRegion(t *testing.T) {
topic: "meshcore",
payload: []byte(`{"raw":"` + rawHex + `"}`),
}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
// No region part → filter doesn't apply, message goes through
// Actually the code checks len(parts) > 1 for IATA filter
@@ -277,7 +277,7 @@ func TestHandleMessageNoRawHex(t *testing.T) {
topic: "meshcore/SJC/obs1/packets",
payload: []byte(`{"type":"companion","data":"something"}`),
}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var count int
store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count)
@@ -295,7 +295,7 @@ func TestHandleMessageBadRawHex(t *testing.T) {
topic: "meshcore/SJC/obs1/packets",
payload: []byte(`{"raw":"ZZZZ"}`),
}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var count int
store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count)
@@ -312,7 +312,7 @@ func TestHandleMessageWithSNRRSSIAsNumbers(t *testing.T) {
payload := []byte(`{"raw":"` + rawHex + `","SNR":7.2,"RSSI":-95}`)
msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var snr, rssi *float64
store.db.QueryRow("SELECT snr, rssi FROM observations LIMIT 1").Scan(&snr, &rssi)
@@ -331,7 +331,7 @@ func TestHandleMessageMinimalTopic(t *testing.T) {
topic: "meshcore/SJC",
payload: []byte(`{"raw":"` + rawHex + `"}`),
}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var count int
store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count)
@@ -352,7 +352,7 @@ func TestHandleMessageCorruptedAdvert(t *testing.T) {
topic: "meshcore/SJC/obs1/packets",
payload: []byte(`{"raw":"` + rawHex + `"}`),
}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
// Transmission should be inserted (even if advert is invalid)
var count int
@@ -378,7 +378,7 @@ func TestHandleMessageNoObserverID(t *testing.T) {
topic: "packets",
payload: []byte(`{"raw":"` + rawHex + `","origin":"obs1"}`),
}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var count int
store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count)
@@ -400,7 +400,7 @@ func TestHandleMessageSNRNotFloat(t *testing.T) {
// SNR as a string value — should not parse as float
payload := []byte(`{"raw":"` + rawHex + `","SNR":"bad","RSSI":"bad"}`)
msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var count int
store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count)
@@ -416,7 +416,7 @@ func TestHandleMessageOriginExtraction(t *testing.T) {
rawHex := "0A00D69FD7A5A7475DB07337749AE61FA53A4788E976"
payload := []byte(`{"raw":"` + rawHex + `","origin":"MyOrigin"}`)
msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
// Verify origin was extracted to observer name
var name string
@@ -439,7 +439,7 @@ func TestHandleMessagePanicRecovery(t *testing.T) {
}
// Should not panic — the defer/recover should catch it
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
}
func TestHandleMessageStatusOriginFallback(t *testing.T) {
@@ -451,7 +451,7 @@ func TestHandleMessageStatusOriginFallback(t *testing.T) {
topic: "meshcore/SJC/obs1/status",
payload: []byte(`{"type":"status"}`),
}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var name string
err := store.db.QueryRow("SELECT name FROM observers WHERE id = 'obs1'").Scan(&name)
@@ -640,7 +640,7 @@ func TestHandleMessageWithLowercaseSNRRSSI(t *testing.T) {
payload := []byte(`{"raw":"` + rawHex + `","snr":5.5,"rssi":-102}`)
msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var snr, rssi *float64
store.db.QueryRow("SELECT snr, rssi FROM observations LIMIT 1").Scan(&snr, &rssi)
@@ -661,7 +661,7 @@ func TestHandleMessageSNRRSSIUppercaseWins(t *testing.T) {
payload := []byte(`{"raw":"` + rawHex + `","SNR":7.2,"snr":1.0,"RSSI":-95,"rssi":-50}`)
msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var snr, rssi *float64
store.db.QueryRow("SELECT snr, rssi FROM observations LIMIT 1").Scan(&snr, &rssi)
@@ -681,7 +681,7 @@ func TestHandleMessageNoSNRRSSI(t *testing.T) {
payload := []byte(`{"raw":"` + rawHex + `"}`)
msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload}
handleMessage(store, "test", source, msg, nil, nil)
handleMessage(store, "test", source, msg, nil, &Config{})
var snr, rssi *float64
store.db.QueryRow("SELECT snr, rssi FROM observations LIMIT 1").Scan(&snr, &rssi)
+339
View File
@@ -0,0 +1,339 @@
package main
import (
"crypto/ed25519"
"encoding/binary"
"encoding/hex"
"strings"
"testing"
)
// buildAdvertHex constructs a full ADVERT packet hex string.
// header(1) + pathByte(1) + pubkey(32) + timestamp(4) + signature(64) + appdata
func buildAdvertHex(pubKey ed25519.PublicKey, privKey ed25519.PrivateKey, timestamp uint32, appdata []byte) string {
// Build signed message: pubkey(32) + timestamp(4 LE) + appdata
msg := make([]byte, 32+4+len(appdata))
copy(msg[0:32], pubKey)
binary.LittleEndian.PutUint32(msg[32:36], timestamp)
copy(msg[36:], appdata)
sig := ed25519.Sign(privKey, msg)
// Payload: pubkey(32) + timestamp(4) + signature(64) + appdata
payload := make([]byte, 0, 100+len(appdata))
payload = append(payload, pubKey...)
ts := make([]byte, 4)
binary.LittleEndian.PutUint32(ts, timestamp)
payload = append(payload, ts...)
payload = append(payload, sig...)
payload = append(payload, appdata...)
// Header: ADVERT (0x04 << 2) | FLOOD (1) = 0x11, pathByte=0 (no hops)
header := byte(0x11)
pathByte := byte(0x00)
pkt := append([]byte{header, pathByte}, payload...)
return hex.EncodeToString(pkt)
}
// makeAppdata builds minimal appdata: flags(1) + name
func makeAppdata(name string) []byte {
flags := byte(0x81) // hasName=true, type=companion(1)
data := []byte{flags}
data = append(data, []byte(name)...)
data = append(data, 0x00) // null terminator
return data
}
func TestSigValidation_ValidAdvertStored(t *testing.T) {
dbPath := t.TempDir() + "/test.db"
store, err := OpenStoreWithInterval(dbPath, 300)
if err != nil {
t.Fatal(err)
}
defer store.Close()
pub, priv, _ := ed25519.GenerateKey(nil)
appdata := makeAppdata("TestNode")
rawHex := buildAdvertHex(pub, priv, 1700000000, appdata)
source := MQTTSource{Name: "test"}
msg := newMockMsg("meshcore/US/obs1/packet", `{"raw":"`+rawHex+`","origin":"TestObs"}`)
cfg := &Config{}
handleMessage(store, "test", source, msg, nil, cfg)
// Verify packet was stored
var count int
store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count)
if count == 0 {
t.Fatal("valid advert should be stored, got 0 transmissions")
}
}
func TestSigValidation_TamperedSignatureDropped(t *testing.T) {
dbPath := t.TempDir() + "/test.db"
store, err := OpenStoreWithInterval(dbPath, 300)
if err != nil {
t.Fatal(err)
}
defer store.Close()
pub, priv, _ := ed25519.GenerateKey(nil)
appdata := makeAppdata("BadNode")
rawHex := buildAdvertHex(pub, priv, 1700000000, appdata)
// Tamper with signature (flip a byte in the signature area)
// Signature starts at offset 2 (header+path) + 32 (pubkey) + 4 (timestamp) = 38
// That's byte 38 in the packet, hex chars 76-77
rawBytes := []byte(rawHex)
if rawBytes[76] == '0' {
rawBytes[76] = 'f'
} else {
rawBytes[76] = '0'
}
tamperedHex := string(rawBytes)
source := MQTTSource{Name: "test"}
msg := newMockMsg("meshcore/US/obs1/packet", `{"raw":"`+tamperedHex+`","origin":"TestObs"}`)
cfg := &Config{}
handleMessage(store, "test", source, msg, nil, cfg)
// Verify packet was NOT stored in transmissions
var txCount int
store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&txCount)
if txCount != 0 {
t.Fatalf("tampered advert should be dropped, got %d transmissions", txCount)
}
// Verify it was recorded in dropped_packets
var dropCount int
store.db.QueryRow("SELECT COUNT(*) FROM dropped_packets").Scan(&dropCount)
if dropCount == 0 {
t.Fatal("tampered advert should be recorded in dropped_packets")
}
// Verify drop counter incremented
if store.Stats.SignatureDrops.Load() != 1 {
t.Fatalf("expected 1 signature drop, got %d", store.Stats.SignatureDrops.Load())
}
// Verify dropped_packets has correct fields
var reason, nodeKey, nodeName, obsID string
store.db.QueryRow("SELECT reason, node_pubkey, node_name, observer_id FROM dropped_packets LIMIT 1").Scan(&reason, &nodeKey, &nodeName, &obsID)
if reason != "invalid signature" {
t.Fatalf("expected reason 'invalid signature', got %q", reason)
}
if nodeKey == "" {
t.Fatal("dropped packet should have node_pubkey")
}
if !strings.Contains(nodeName, "BadNode") {
t.Fatalf("expected node_name to contain 'BadNode', got %q", nodeName)
}
if obsID != "obs1" {
t.Fatalf("expected observer_id 'obs1', got %q", obsID)
}
}
func TestSigValidation_TruncatedAppdataDropped(t *testing.T) {
dbPath := t.TempDir() + "/test.db"
store, err := OpenStoreWithInterval(dbPath, 300)
if err != nil {
t.Fatal(err)
}
defer store.Close()
pub, priv, _ := ed25519.GenerateKey(nil)
appdata := makeAppdata("TruncNode")
rawHex := buildAdvertHex(pub, priv, 1700000000, appdata)
// Sign was computed with full appdata. Now truncate the raw hex to remove
// some appdata bytes, making the signature invalid.
// Truncate last 4 hex chars (2 bytes of appdata)
truncatedHex := rawHex[:len(rawHex)-4]
source := MQTTSource{Name: "test"}
msg := newMockMsg("meshcore/US/obs1/packet", `{"raw":"`+truncatedHex+`","origin":"TestObs"}`)
cfg := &Config{}
handleMessage(store, "test", source, msg, nil, cfg)
var txCount int
store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&txCount)
if txCount != 0 {
t.Fatalf("truncated advert should be dropped, got %d transmissions", txCount)
}
}
func TestSigValidation_DisabledByConfig(t *testing.T) {
dbPath := t.TempDir() + "/test.db"
store, err := OpenStoreWithInterval(dbPath, 300)
if err != nil {
t.Fatal(err)
}
defer store.Close()
pub, priv, _ := ed25519.GenerateKey(nil)
appdata := makeAppdata("NoValNode")
rawHex := buildAdvertHex(pub, priv, 1700000000, appdata)
// Tamper with signature
rawBytes := []byte(rawHex)
if rawBytes[76] == '0' {
rawBytes[76] = 'f'
} else {
rawBytes[76] = '0'
}
tamperedHex := string(rawBytes)
source := MQTTSource{Name: "test"}
msg := newMockMsg("meshcore/US/obs1/packet", `{"raw":"`+tamperedHex+`","origin":"TestObs"}`)
falseVal := false
cfg := &Config{ValidateSignatures: &falseVal}
handleMessage(store, "test", source, msg, nil, cfg)
// With validation disabled, tampered packet should be stored
var txCount int
store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&txCount)
if txCount == 0 {
t.Fatal("with validateSignatures=false, tampered advert should be stored")
}
}
func TestSigValidation_DropCounterIncrements(t *testing.T) {
dbPath := t.TempDir() + "/test.db"
store, err := OpenStoreWithInterval(dbPath, 300)
if err != nil {
t.Fatal(err)
}
defer store.Close()
pub, priv, _ := ed25519.GenerateKey(nil)
source := MQTTSource{Name: "test"}
cfg := &Config{}
for i := 0; i < 3; i++ {
appdata := makeAppdata("Node")
rawHex := buildAdvertHex(pub, priv, uint32(1700000000+i), appdata)
// Tamper
rawBytes := []byte(rawHex)
if rawBytes[76] == '0' {
rawBytes[76] = 'f'
} else {
rawBytes[76] = '0'
}
msg := newMockMsg("meshcore/US/obs1/packet", `{"raw":"`+string(rawBytes)+`","origin":"Obs"}`)
handleMessage(store, "test", source, msg, nil, cfg)
}
if store.Stats.SignatureDrops.Load() != 3 {
t.Fatalf("expected 3 signature drops, got %d", store.Stats.SignatureDrops.Load())
}
}
func TestSigValidation_LogContainsFields(t *testing.T) {
// This test verifies the dropped_packets row has all required fields
dbPath := t.TempDir() + "/test.db"
store, err := OpenStoreWithInterval(dbPath, 300)
if err != nil {
t.Fatal(err)
}
defer store.Close()
pub, priv, _ := ed25519.GenerateKey(nil)
appdata := makeAppdata("LogTestNode")
rawHex := buildAdvertHex(pub, priv, 1700000000, appdata)
// Tamper
rawBytes := []byte(rawHex)
if rawBytes[76] == '0' {
rawBytes[76] = 'f'
} else {
rawBytes[76] = '0'
}
source := MQTTSource{Name: "test"}
msg := newMockMsg("meshcore/US/obs1/packet", `{"raw":"`+string(rawBytes)+`","origin":"MyObserver"}`)
cfg := &Config{}
handleMessage(store, "test", source, msg, nil, cfg)
var hash, reason, obsID, obsName, pubkey, nodeName string
err = store.db.QueryRow("SELECT hash, reason, observer_id, observer_name, node_pubkey, node_name FROM dropped_packets LIMIT 1").
Scan(&hash, &reason, &obsID, &obsName, &pubkey, &nodeName)
if err != nil {
t.Fatal(err)
}
if hash == "" {
t.Error("dropped packet should have hash")
}
if reason != "invalid signature" {
t.Errorf("expected reason 'invalid signature', got %q", reason)
}
if obsID != "obs1" {
t.Errorf("expected observer_id 'obs1', got %q", obsID)
}
if obsName != "MyObserver" {
t.Errorf("expected observer_name 'MyObserver', got %q", obsName)
}
if pubkey == "" {
t.Error("dropped packet should have node_pubkey")
}
if !strings.Contains(nodeName, "LogTestNode") {
t.Errorf("expected node_name containing 'LogTestNode', got %q", nodeName)
}
}
func TestPruneDroppedPackets(t *testing.T) {
dbPath := t.TempDir() + "/test.db"
store, err := OpenStoreWithInterval(dbPath, 300)
if err != nil {
t.Fatal(err)
}
defer store.Close()
// Insert an old dropped packet
store.db.Exec(`INSERT INTO dropped_packets (hash, reason, dropped_at) VALUES ('old', 'test', datetime('now', '-60 days'))`)
store.db.Exec(`INSERT INTO dropped_packets (hash, reason, dropped_at) VALUES ('new', 'test', datetime('now'))`)
n, err := store.PruneDroppedPackets(30)
if err != nil {
t.Fatal(err)
}
if n != 1 {
t.Fatalf("expected 1 pruned, got %d", n)
}
var count int
store.db.QueryRow("SELECT COUNT(*) FROM dropped_packets").Scan(&count)
if count != 1 {
t.Fatalf("expected 1 remaining, got %d", count)
}
}
func TestShouldValidateSignatures_Default(t *testing.T) {
cfg := &Config{}
if !cfg.ShouldValidateSignatures() {
t.Fatal("default should be true")
}
falseVal := false
cfg2 := &Config{ValidateSignatures: &falseVal}
if cfg2.ShouldValidateSignatures() {
t.Fatal("explicit false should be false")
}
trueVal := true
cfg3 := &Config{ValidateSignatures: &trueVal}
if !cfg3.ShouldValidateSignatures() {
t.Fatal("explicit true should be true")
}
}
// newMockMsg creates a minimal mqtt.Message for testing.
func newMockMsg(topic, payload string) *mockMessage {
return &mockMessage{topic: topic, payload: []byte(payload)}
}
+68
View File
@@ -2252,3 +2252,71 @@ func (db *DB) TouchNodeLastSeen(pubkey string, timestamp string) error {
)
return err
}
// GetDroppedPackets returns recently dropped packets, newest first.
func (db *DB) GetDroppedPackets(limit int, observerID, nodePubkey string) ([]map[string]interface{}, error) {
if limit <= 0 || limit > 500 {
limit = 100
}
query := `SELECT id, hash, raw_hex, reason, observer_id, observer_name, node_pubkey, node_name, dropped_at FROM dropped_packets`
var conditions []string
var args []interface{}
if observerID != "" {
conditions = append(conditions, "observer_id = ?")
args = append(args, observerID)
}
if nodePubkey != "" {
conditions = append(conditions, "node_pubkey = ?")
args = append(args, nodePubkey)
}
if len(conditions) > 0 {
query += " WHERE " + strings.Join(conditions, " AND ")
}
query += " ORDER BY dropped_at DESC LIMIT ?"
args = append(args, limit)
rows, err := db.conn.Query(query, args...)
if err != nil {
return nil, err
}
defer rows.Close()
var results []map[string]interface{}
for rows.Next() {
var id int
var hash, rawHex, reason, obsID, obsName, pubkey, name, droppedAt sql.NullString
if err := rows.Scan(&id, &hash, &rawHex, &reason, &obsID, &obsName, &pubkey, &name, &droppedAt); err != nil {
continue
}
row := map[string]interface{}{
"id": id,
"hash": nullStr(hash),
"reason": nullStr(reason),
"observer_id": nullStr(obsID),
"observer_name": nullStr(obsName),
"node_pubkey": nullStr(pubkey),
"node_name": nullStr(name),
"dropped_at": nullStr(droppedAt),
}
// Only include raw_hex if explicitly requested (it's large)
if rawHex.Valid {
row["raw_hex"] = rawHex.String
}
results = append(results, row)
}
if results == nil {
results = []map[string]interface{}{}
}
return results, nil
}
// GetSignatureDropCount returns the total number of dropped packets.
func (db *DB) GetSignatureDropCount() int64 {
var count int64
// Table may not exist yet if ingestor hasn't run the migration
err := db.conn.QueryRow("SELECT COUNT(*) FROM dropped_packets").Scan(&count)
if err != nil {
return 0
}
return count
}
+27 -20
View File
@@ -6,10 +6,17 @@ import (
"fmt"
"strings"
"testing"
"time"
_ "modernc.org/sqlite"
)
// recentTS returns a timestamp string N hours ago, ensuring test data
// stays within the 7-day advert window used by computeNodeHashSizeInfo.
func recentTS(hoursAgo int) string {
return time.Now().UTC().Add(-time.Duration(hoursAgo) * time.Hour).Format("2006-01-02T15:04:05.000Z")
}
// setupCapabilityTestDB creates a minimal in-memory DB with nodes table.
func setupCapabilityTestDB(t *testing.T) *DB {
t.Helper()
@@ -69,7 +76,7 @@ func makeTestAdvert(pubkey string, hashSize int) *StoreTx {
PayloadType: &pt,
DecodedJSON: string(decoded),
PathJSON: `["` + prefix + `"]`,
FirstSeen: "2026-04-11T00:00:00.000Z",
FirstSeen: recentTS(24),
}
}
@@ -80,7 +87,7 @@ func TestMultiByteCapability_Confirmed(t *testing.T) {
defer db.conn.Close()
db.conn.Exec("INSERT INTO nodes (public_key, name, role, last_seen) VALUES (?, ?, ?, ?)",
"aabbccdd11223344", "RepA", "repeater", "2026-04-11T00:00:00Z")
"aabbccdd11223344", "RepA", "repeater", recentTS(24))
store := NewPacketStore(db, nil)
addTestPacket(store, makeTestAdvert("aabbccdd11223344", 2))
@@ -107,7 +114,7 @@ func TestMultiByteCapability_Suspected(t *testing.T) {
defer db.conn.Close()
db.conn.Exec("INSERT INTO nodes (public_key, name, role, last_seen) VALUES (?, ?, ?, ?)",
"aabbccdd11223344", "RepB", "repeater", "2026-04-10T00:00:00Z")
"aabbccdd11223344", "RepB", "repeater", recentTS(48))
store := NewPacketStore(db, nil)
@@ -119,7 +126,7 @@ func TestMultiByteCapability_Suspected(t *testing.T) {
RawHex: rawHex,
PayloadType: &pt,
PathJSON: `["aabb"]`,
FirstSeen: "2026-04-10T00:00:00.000Z",
FirstSeen: recentTS(48),
}
addTestPacket(store, pkt)
@@ -145,7 +152,7 @@ func TestMultiByteCapability_Unknown(t *testing.T) {
defer db.conn.Close()
db.conn.Exec("INSERT INTO nodes (public_key, name, role, last_seen) VALUES (?, ?, ?, ?)",
"aabbccdd11223344", "RepC", "repeater", "2026-04-08T00:00:00Z")
"aabbccdd11223344", "RepC", "repeater", recentTS(72))
store := NewPacketStore(db, nil)
@@ -173,9 +180,9 @@ func TestMultiByteCapability_PrefixCollision(t *testing.T) {
// Two repeaters sharing 1-byte prefix "aa"
db.conn.Exec("INSERT INTO nodes (public_key, name, role, last_seen) VALUES (?, ?, ?, ?)",
"aabb000000000001", "RepConfirmed", "repeater", "2026-04-11T00:00:00Z")
"aabb000000000001", "RepConfirmed", "repeater", recentTS(24))
db.conn.Exec("INSERT INTO nodes (public_key, name, role, last_seen) VALUES (?, ?, ?, ?)",
"aacc000000000002", "RepOther", "repeater", "2026-04-11T00:00:00Z")
"aacc000000000002", "RepOther", "repeater", recentTS(24))
store := NewPacketStore(db, nil)
@@ -190,7 +197,7 @@ func TestMultiByteCapability_PrefixCollision(t *testing.T) {
RawHex: rawHex,
PayloadType: &pt,
PathJSON: `["aa"]`,
FirstSeen: "2026-04-10T00:00:00.000Z",
FirstSeen: recentTS(48),
}
addTestPacket(store, pkt)
@@ -221,7 +228,7 @@ func TestMultiByteCapability_TraceExcluded(t *testing.T) {
defer db.conn.Close()
db.conn.Exec("INSERT INTO nodes (public_key, name, role, last_seen) VALUES (?, ?, ?, ?)",
"aabbccdd11223344", "RepTrace", "repeater", "2026-04-10T00:00:00Z")
"aabbccdd11223344", "RepTrace", "repeater", recentTS(48))
store := NewPacketStore(db, nil)
@@ -233,7 +240,7 @@ func TestMultiByteCapability_TraceExcluded(t *testing.T) {
RawHex: rawHex,
PayloadType: &pt,
PathJSON: `["aabb"]`,
FirstSeen: "2026-04-10T00:00:00.000Z",
FirstSeen: recentTS(48),
}
addTestPacket(store, pkt)
@@ -253,7 +260,7 @@ func TestMultiByteCapability_NonTraceStillSuspected(t *testing.T) {
defer db.conn.Close()
db.conn.Exec("INSERT INTO nodes (public_key, name, role, last_seen) VALUES (?, ?, ?, ?)",
"aabbccdd11223344", "RepNonTrace", "repeater", "2026-04-10T00:00:00Z")
"aabbccdd11223344", "RepNonTrace", "repeater", recentTS(48))
store := NewPacketStore(db, nil)
@@ -265,7 +272,7 @@ func TestMultiByteCapability_NonTraceStillSuspected(t *testing.T) {
RawHex: rawHex,
PayloadType: &pt,
PathJSON: `["aabb"]`,
FirstSeen: "2026-04-10T00:00:00.000Z",
FirstSeen: recentTS(48),
}
addTestPacket(store, pkt)
@@ -285,7 +292,7 @@ func TestMultiByteCapability_ConfirmedUnaffectedByTraceExclusion(t *testing.T) {
defer db.conn.Close()
db.conn.Exec("INSERT INTO nodes (public_key, name, role, last_seen) VALUES (?, ?, ?, ?)",
"aabbccdd11223344", "RepConfirmedTrace", "repeater", "2026-04-11T00:00:00Z")
"aabbccdd11223344", "RepConfirmedTrace", "repeater", recentTS(24))
store := NewPacketStore(db, nil)
@@ -300,7 +307,7 @@ func TestMultiByteCapability_ConfirmedUnaffectedByTraceExclusion(t *testing.T) {
RawHex: rawHex,
PayloadType: &pt,
PathJSON: `["aabb"]`,
FirstSeen: "2026-04-10T00:00:00.000Z",
FirstSeen: recentTS(48),
}
addTestPacket(store, pkt)
@@ -320,7 +327,7 @@ func TestMultiByteCapability_CompanionConfirmed(t *testing.T) {
defer db.conn.Close()
db.conn.Exec("INSERT INTO nodes (public_key, name, role, last_seen) VALUES (?, ?, ?, ?)",
"aabbccdd11223344", "CompA", "companion", "2026-04-11T00:00:00Z")
"aabbccdd11223344", "CompA", "companion", recentTS(24))
store := NewPacketStore(db, nil)
addTestPacket(store, makeTestAdvert("aabbccdd11223344", 2))
@@ -347,11 +354,11 @@ func TestMultiByteCapability_RoleColumnPopulated(t *testing.T) {
defer db.conn.Close()
db.conn.Exec("INSERT INTO nodes (public_key, name, role, last_seen) VALUES (?, ?, ?, ?)",
"aabb000000000001", "Rep1", "repeater", "2026-04-11T00:00:00Z")
"aabb000000000001", "Rep1", "repeater", recentTS(24))
db.conn.Exec("INSERT INTO nodes (public_key, name, role, last_seen) VALUES (?, ?, ?, ?)",
"ccdd000000000002", "Comp1", "companion", "2026-04-11T00:00:00Z")
"ccdd000000000002", "Comp1", "companion", recentTS(24))
db.conn.Exec("INSERT INTO nodes (public_key, name, role, last_seen) VALUES (?, ?, ?, ?)",
"eeff000000000003", "Room1", "room_server", "2026-04-11T00:00:00Z")
"eeff000000000003", "Room1", "room_server", recentTS(24))
store := NewPacketStore(db, nil)
addTestPacket(store, makeTestAdvert("aabb000000000001", 2))
@@ -386,7 +393,7 @@ func TestMultiByteCapability_AdopterEvidenceTakesPrecedence(t *testing.T) {
defer db.conn.Close()
db.conn.Exec("INSERT INTO nodes (public_key, name, role, last_seen) VALUES (?, ?, ?, ?)",
"aabbccdd11223344", "RepAdopter", "repeater", "2026-04-11T00:00:00Z")
"aabbccdd11223344", "RepAdopter", "repeater", recentTS(24))
store := NewPacketStore(db, nil)
@@ -398,7 +405,7 @@ func TestMultiByteCapability_AdopterEvidenceTakesPrecedence(t *testing.T) {
RawHex: rawHex,
PayloadType: &pt,
PathJSON: `["aabb"]`,
FirstSeen: "2026-04-10T00:00:00.000Z",
FirstSeen: recentTS(48),
}
addTestPacket(store, pkt)
+21
View File
@@ -124,6 +124,7 @@ func (s *Server) RegisterRoutes(r *mux.Router) {
r.Handle("/api/perf/reset", s.requireAPIKey(http.HandlerFunc(s.handlePerfReset))).Methods("POST")
r.Handle("/api/admin/prune", s.requireAPIKey(http.HandlerFunc(s.handleAdminPrune))).Methods("POST")
r.Handle("/api/debug/affinity", s.requireAPIKey(http.HandlerFunc(s.handleDebugAffinity))).Methods("GET")
r.Handle("/api/dropped-packets", s.requireAPIKey(http.HandlerFunc(s.handleDroppedPackets))).Methods("GET")
// Packet endpoints
r.HandleFunc("/api/packets/observations", s.handleBatchObservations).Methods("POST")
@@ -589,6 +590,7 @@ func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) {
},
Backfilling: backfilling,
BackfillProgress: backfillProgress,
SignatureDrops: s.db.GetSignatureDropCount(),
}
s.statsMu.Lock()
@@ -2606,3 +2608,22 @@ func (s *Server) filterBlacklistedFromSubpaths(data map[string]interface{}) map[
}
return data
}
// handleDroppedPackets returns recently dropped packets for investigation.
func (s *Server) handleDroppedPackets(w http.ResponseWriter, r *http.Request) {
limit := 100
if v := r.URL.Query().Get("limit"); v != "" {
if n, err := strconv.Atoi(v); err == nil && n > 0 {
limit = n
}
}
observerID := r.URL.Query().Get("observer")
nodePubkey := r.URL.Query().Get("pubkey")
results, err := s.db.GetDroppedPackets(limit, observerID, nodePubkey)
if err != nil {
writeError(w, 500, err.Error())
return
}
writeJSON(w, results)
}
+1
View File
@@ -70,6 +70,7 @@ type StatsResponse struct {
Counts RoleCounts `json:"counts"`
Backfilling bool `json:"backfilling"`
BackfillProgress float64 `json:"backfillProgress"`
SignatureDrops int64 `json:"signatureDrops,omitempty"`
}
// ─── Health ────────────────────────────────────────────────────────────────────