diff --git a/cmd/ingestor/config.go b/cmd/ingestor/config.go index c39730a0..70c18fbe 100644 --- a/cmd/ingestor/config.go +++ b/cmd/ingestor/config.go @@ -39,7 +39,8 @@ type Config struct { HashChannels []string `json:"hashChannels,omitempty"` Retention *RetentionConfig `json:"retention,omitempty"` Metrics *MetricsConfig `json:"metrics,omitempty"` - GeoFilter *GeoFilterConfig `json:"geo_filter,omitempty"` + GeoFilter *GeoFilterConfig `json:"geo_filter,omitempty"` + ValidateSignatures *bool `json:"validateSignatures,omitempty"` } // GeoFilterConfig is an alias for the shared geofilter.Config type. @@ -57,6 +58,14 @@ type MetricsConfig struct { SampleIntervalSec int `json:"sampleIntervalSec"` } +// ShouldValidateSignatures returns true (default) unless explicitly disabled. +func (c *Config) ShouldValidateSignatures() bool { + if c.ValidateSignatures != nil { + return *c.ValidateSignatures + } + return true +} + // MetricsSampleInterval returns the configured sample interval or 300s default. func (c *Config) MetricsSampleInterval() int { if c.Metrics != nil && c.Metrics.SampleIntervalSec > 0 { diff --git a/cmd/ingestor/coverage_boost_test.go b/cmd/ingestor/coverage_boost_test.go index 292f080a..90f82b48 100644 --- a/cmd/ingestor/coverage_boost_test.go +++ b/cmd/ingestor/coverage_boost_test.go @@ -158,7 +158,7 @@ func TestHandleMessageChannelMessage(t *testing.T) { payload := []byte(`{"text":"Alice: Hello everyone","channel_idx":3,"SNR":5.0,"RSSI":-95,"score":10,"direction":"rx","sender_timestamp":1700000000}`) msg := &mockMessage{topic: "meshcore/message/channel/2", payload: payload} - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil { @@ -218,7 +218,7 @@ func TestHandleMessageChannelMessageEmptyText(t *testing.T) { store, source := newTestContext(t) msg := &mockMessage{topic: "meshcore/message/channel/1", payload: []byte(`{"text":""}`)} - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil { @@ -233,7 +233,7 @@ func TestHandleMessageChannelNoSender(t *testing.T) { store, source := newTestContext(t) msg := &mockMessage{topic: "meshcore/message/channel/1", payload: []byte(`{"text":"no sender here"}`)} - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM nodes").Scan(&count); err != nil { @@ -250,7 +250,7 @@ func TestHandleMessageDirectMessage(t *testing.T) { payload := []byte(`{"text":"Bob: Hey there","sender_timestamp":1700000000,"SNR":3.0,"rssi":-100,"Score":8,"Direction":"tx"}`) msg := &mockMessage{topic: "meshcore/message/direct/abc123", payload: payload} - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil { @@ -294,7 +294,7 @@ func TestHandleMessageDirectMessageEmptyText(t *testing.T) { store, source := newTestContext(t) msg := &mockMessage{topic: "meshcore/message/direct/abc", payload: []byte(`{"text":""}`)} - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil { @@ -309,7 +309,7 @@ func TestHandleMessageDirectNoSender(t *testing.T) { store, source := newTestContext(t) msg := &mockMessage{topic: "meshcore/message/direct/xyz", payload: []byte(`{"text":"message with no colon"}`)} - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil { @@ -328,7 +328,7 @@ func TestHandleMessageUppercaseScoreDirection(t *testing.T) { payload := []byte(`{"raw":"` + rawHex + `","Score":9.0,"Direction":"tx"}`) msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload} - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var score *float64 var direction *string @@ -349,7 +349,7 @@ func TestHandleMessageChannelLowercaseFields(t *testing.T) { payload := []byte(`{"text":"Test: msg","snr":3.0,"rssi":-90,"Score":5,"Direction":"rx"}`) msg := &mockMessage{topic: "meshcore/message/channel/0", payload: payload} - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil { @@ -365,7 +365,7 @@ func TestHandleMessageDirectLowercaseFields(t *testing.T) { payload := []byte(`{"text":"Test: msg","snr":2.0,"rssi":-85,"score":7,"direction":"tx"}`) msg := &mockMessage{topic: "meshcore/message/direct/xyz", payload: payload} - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil { @@ -388,7 +388,7 @@ func TestHandleMessageAdvertWithTelemetry(t *testing.T) { payload: []byte(`{"raw":"` + rawHex + `"}`), } - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) // Should have created transmission, node, and observer var txCount, nodeCount, obsCount int @@ -428,7 +428,7 @@ func TestHandleMessageAdvertGeoFiltered(t *testing.T) { topic: "meshcore/SJC/obs1/packets", payload: []byte(`{"raw":"` + rawHex + `"}`), } - handleMessage(store, "test", source, msg, nil, gf) + handleMessage(store, "test", source, msg, nil, &Config{GeoFilter: gf}) // Geo-filtered adverts should not create nodes var nodeCount int @@ -665,7 +665,7 @@ func TestHandleMessageCorruptedAdvertNoNode(t *testing.T) { topic: "meshcore/SJC/obs1/packets", payload: []byte(`{"raw":"` + rawHex + `"}`), } - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM nodes").Scan(&count); err != nil { @@ -687,7 +687,7 @@ func TestHandleMessageNonAdvertPacket(t *testing.T) { topic: "meshcore/SJC/obs1/packets", payload: []byte(`{"raw":"` + rawHex + `"}`), } - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil { @@ -864,7 +864,7 @@ func TestHandleMessageChannelLongSender(t *testing.T) { longText := "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA: msg" payload := []byte(`{"text":"` + longText + `"}`) msg := &mockMessage{topic: "meshcore/message/channel/1", payload: payload} - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM nodes").Scan(&count); err != nil { @@ -883,7 +883,7 @@ func TestHandleMessageDirectLongSender(t *testing.T) { longText := "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB: msg" payload := []byte(`{"text":"` + longText + `"}`) msg := &mockMessage{topic: "meshcore/message/direct/abc", payload: payload} - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil { @@ -900,7 +900,7 @@ func TestHandleMessageDirectUppercaseScoreDirection(t *testing.T) { payload := []byte(`{"text":"X: hi","Score":6,"Direction":"rx"}`) msg := &mockMessage{topic: "meshcore/message/direct/d1", payload: payload} - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil { @@ -930,7 +930,7 @@ func TestHandleMessageChannelUppercaseScoreDirection(t *testing.T) { payload := []byte(`{"text":"Y: hi","Score":4,"Direction":"tx"}`) msg := &mockMessage{topic: "meshcore/message/channel/5", payload: payload} - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count); err != nil { @@ -961,7 +961,7 @@ func TestHandleMessageRawLowercaseScore(t *testing.T) { rawHex := "0A00D69FD7A5A7475DB07337749AE61FA53A4788E976" payload := []byte(`{"raw":"` + rawHex + `","score":3.5}`) msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload} - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var score *float64 if err := store.db.QueryRow("SELECT score FROM observations LIMIT 1").Scan(&score); err != nil { @@ -980,7 +980,7 @@ func TestHandleMessageStatusNoOrigin(t *testing.T) { topic: "meshcore/LAX/obs5/status", payload: []byte(`{"model":"L1"}`), } - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var count int if err := store.db.QueryRow("SELECT COUNT(*) FROM observers WHERE id = 'obs5'").Scan(&count); err != nil { diff --git a/cmd/ingestor/db.go b/cmd/ingestor/db.go index ad9b781f..79e1a895 100644 --- a/cmd/ingestor/db.go +++ b/cmd/ingestor/db.go @@ -22,6 +22,7 @@ type DBStats struct { NodeUpserts atomic.Int64 ObserverUpserts atomic.Int64 WriteErrors atomic.Int64 + SignatureDrops atomic.Int64 } // Store wraps the SQLite database for packet ingestion. @@ -381,6 +382,32 @@ func applySchema(db *sql.DB) error { log.Println("[migration] channel_hash column added and backfilled") } + // Migration: dropped_packets table for signature validation failures (#793) + row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'dropped_packets_v1'") + if row.Scan(&migDone) != nil { + log.Println("[migration] Creating dropped_packets table...") + _, err := db.Exec(` + CREATE TABLE IF NOT EXISTS dropped_packets ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + hash TEXT, + raw_hex TEXT, + reason TEXT NOT NULL, + observer_id TEXT, + observer_name TEXT, + node_pubkey TEXT, + node_name TEXT, + dropped_at DATETIME DEFAULT CURRENT_TIMESTAMP + ); + CREATE INDEX IF NOT EXISTS idx_dropped_observer ON dropped_packets(observer_id); + CREATE INDEX IF NOT EXISTS idx_dropped_node ON dropped_packets(node_pubkey); + `) + if err != nil { + return fmt.Errorf("dropped_packets schema: %w", err) + } + db.Exec(`INSERT INTO _migrations (name) VALUES ('dropped_packets_v1')`) + log.Println("[migration] dropped_packets table created") + } + return nil } @@ -758,13 +785,14 @@ func (s *Store) Checkpoint() { // LogStats logs current operational metrics. func (s *Store) LogStats() { - log.Printf("[stats] tx_inserted=%d tx_dupes=%d obs_inserted=%d node_upserts=%d observer_upserts=%d write_errors=%d", + log.Printf("[stats] tx_inserted=%d tx_dupes=%d obs_inserted=%d node_upserts=%d observer_upserts=%d write_errors=%d sig_drops=%d", s.Stats.TransmissionsInserted.Load(), s.Stats.DuplicateTransmissions.Load(), s.Stats.ObservationsInserted.Load(), s.Stats.NodeUpserts.Load(), s.Stats.ObserverUpserts.Load(), s.Stats.WriteErrors.Load(), + s.Stats.SignatureDrops.Load(), ) } @@ -819,6 +847,48 @@ func (s *Store) RemoveStaleObservers(observerDays int) (int64, error) { return removed, nil } +// DroppedPacket holds data for a packet rejected during ingest. +type DroppedPacket struct { + Hash string + RawHex string + Reason string + ObserverID string + ObserverName string + NodePubKey string + NodeName string +} + +// InsertDroppedPacket records a rejected packet in the dropped_packets table. +func (s *Store) InsertDroppedPacket(dp *DroppedPacket) error { + _, err := s.db.Exec( + `INSERT INTO dropped_packets (hash, raw_hex, reason, observer_id, observer_name, node_pubkey, node_name) VALUES (?, ?, ?, ?, ?, ?, ?)`, + dp.Hash, dp.RawHex, dp.Reason, dp.ObserverID, dp.ObserverName, dp.NodePubKey, dp.NodeName, + ) + if err != nil { + s.Stats.WriteErrors.Add(1) + return fmt.Errorf("insert dropped packet: %w", err) + } + s.Stats.SignatureDrops.Add(1) + return nil +} + +// PruneDroppedPackets removes dropped_packets older than retentionDays. +func (s *Store) PruneDroppedPackets(retentionDays int) (int64, error) { + if retentionDays <= 0 { + return 0, nil + } + cutoff := time.Now().UTC().AddDate(0, 0, -retentionDays).Format(time.RFC3339) + result, err := s.db.Exec(`DELETE FROM dropped_packets WHERE dropped_at < ?`, cutoff) + if err != nil { + return 0, fmt.Errorf("prune dropped packets: %w", err) + } + n, _ := result.RowsAffected() + if n > 0 { + log.Printf("Pruned %d dropped packet(s) older than %d days", n, retentionDays) + } + return n, nil +} + // PacketData holds the data needed to insert a packet into the DB. type PacketData struct { RawHex string diff --git a/cmd/ingestor/main.go b/cmd/ingestor/main.go index 67544865..76697686 100644 --- a/cmd/ingestor/main.go +++ b/cmd/ingestor/main.go @@ -68,6 +68,7 @@ func main() { // Metrics retention: prune old metrics on startup metricsDays := cfg.MetricsRetentionDays() store.PruneOldMetrics(metricsDays) + store.PruneDroppedPackets(metricsDays) // Daily ticker for node retention retentionTicker := time.NewTicker(1 * time.Hour) @@ -92,6 +93,7 @@ func main() { go func() { for range metricsRetentionTicker.C { store.PruneOldMetrics(metricsDays) + store.PruneDroppedPackets(metricsDays) } }() @@ -160,7 +162,7 @@ func main() { // Capture source for closure src := source opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) { - handleMessage(store, tag, src, m, channelKeys, cfg.GeoFilter) + handleMessage(store, tag, src, m, channelKeys, cfg) }) client := mqtt.NewClient(opts) @@ -195,7 +197,7 @@ func main() { log.Println("Done.") } -func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message, channelKeys map[string]string, geoFilter *GeoFilterConfig) { +func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message, channelKeys map[string]string, cfg *Config) { defer func() { if r := recover(); r != nil { log.Printf("MQTT [%s] panic in handler: %v", tag, r) @@ -262,7 +264,8 @@ func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message, // Format 1: Raw packet (meshcoretomqtt / Cisien format) rawHex, _ := msg["raw"].(string) if rawHex != "" { - decoded, err := DecodePacket(rawHex, channelKeys, false) + validateSigs := cfg.ShouldValidateSignatures() + decoded, err := DecodePacket(rawHex, channelKeys, validateSigs) if err != nil { log.Printf("MQTT [%s] decode error: %v", tag, err) return @@ -322,7 +325,27 @@ func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message, log.Printf("MQTT [%s] skipping corrupted ADVERT: %s", tag, reason) return } - if !NodePassesGeoFilter(decoded.Payload.Lat, decoded.Payload.Lon, geoFilter) { + // Signature validation: drop adverts with invalid ed25519 signatures + if validateSigs && decoded.Payload.SignatureValid != nil && !*decoded.Payload.SignatureValid { + hash := ComputeContentHash(rawHex) + truncPK := decoded.Payload.PubKey + if len(truncPK) > 16 { + truncPK = truncPK[:16] + } + log.Printf("MQTT [%s] DROPPED invalid signature: hash=%s name=%s observer=%s pubkey=%s", + tag, hash, decoded.Payload.Name, firstNonEmpty(mqttMsg.Origin, observerID), truncPK) + store.InsertDroppedPacket(&DroppedPacket{ + Hash: hash, + RawHex: rawHex, + Reason: "invalid signature", + ObserverID: observerID, + ObserverName: mqttMsg.Origin, + NodePubKey: decoded.Payload.PubKey, + NodeName: decoded.Payload.Name, + }) + return + } + if !NodePassesGeoFilter(decoded.Payload.Lat, decoded.Payload.Lon, cfg.GeoFilter) { return } pktData := BuildPacketData(mqttMsg, decoded, observerID, region) diff --git a/cmd/ingestor/main_test.go b/cmd/ingestor/main_test.go index 9d9cec8c..11935826 100644 --- a/cmd/ingestor/main_test.go +++ b/cmd/ingestor/main_test.go @@ -130,7 +130,7 @@ func TestHandleMessageRawPacket(t *testing.T) { payload := []byte(`{"raw":"` + rawHex + `","SNR":5.5,"RSSI":-100.0,"origin":"myobs"}`) msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload} - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var count int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) @@ -147,7 +147,7 @@ func TestHandleMessageRawPacketAdvert(t *testing.T) { payload := []byte(`{"raw":"` + rawHex + `"}`) msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload} - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) // Should create a node from the ADVERT var count int @@ -169,7 +169,7 @@ func TestHandleMessageInvalidJSON(t *testing.T) { msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: []byte(`not json`)} // Should not panic - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var count int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) @@ -186,7 +186,7 @@ func TestHandleMessageStatusTopic(t *testing.T) { payload: []byte(`{"origin":"MyObserver"}`), } - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var name, iata string err := store.db.QueryRow("SELECT name, iata FROM observers WHERE id = 'obs1'").Scan(&name, &iata) @@ -207,11 +207,11 @@ func TestHandleMessageSkipStatusTopics(t *testing.T) { // meshcore/status should be skipped msg1 := &mockMessage{topic: "meshcore/status", payload: []byte(`{"raw":"0A00"}`)} - handleMessage(store, "test", source, msg1, nil, nil) + handleMessage(store, "test", source, msg1, nil, &Config{}) // meshcore/events/connection should be skipped msg2 := &mockMessage{topic: "meshcore/events/connection", payload: []byte(`{"raw":"0A00"}`)} - handleMessage(store, "test", source, msg2, nil, nil) + handleMessage(store, "test", source, msg2, nil, &Config{}) var count int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) @@ -230,7 +230,7 @@ func TestHandleMessageIATAFilter(t *testing.T) { topic: "meshcore/SJC/obs1/packets", payload: []byte(`{"raw":"` + rawHex + `"}`), } - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var count int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) @@ -243,7 +243,7 @@ func TestHandleMessageIATAFilter(t *testing.T) { topic: "meshcore/LAX/obs2/packets", payload: []byte(`{"raw":"` + rawHex + `"}`), } - handleMessage(store, "test", source, msg2, nil, nil) + handleMessage(store, "test", source, msg2, nil, &Config{}) store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) if count != 1 { @@ -261,7 +261,7 @@ func TestHandleMessageIATAFilterNoRegion(t *testing.T) { topic: "meshcore", payload: []byte(`{"raw":"` + rawHex + `"}`), } - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) // No region part → filter doesn't apply, message goes through // Actually the code checks len(parts) > 1 for IATA filter @@ -277,7 +277,7 @@ func TestHandleMessageNoRawHex(t *testing.T) { topic: "meshcore/SJC/obs1/packets", payload: []byte(`{"type":"companion","data":"something"}`), } - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var count int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) @@ -295,7 +295,7 @@ func TestHandleMessageBadRawHex(t *testing.T) { topic: "meshcore/SJC/obs1/packets", payload: []byte(`{"raw":"ZZZZ"}`), } - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var count int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) @@ -312,7 +312,7 @@ func TestHandleMessageWithSNRRSSIAsNumbers(t *testing.T) { payload := []byte(`{"raw":"` + rawHex + `","SNR":7.2,"RSSI":-95}`) msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload} - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var snr, rssi *float64 store.db.QueryRow("SELECT snr, rssi FROM observations LIMIT 1").Scan(&snr, &rssi) @@ -331,7 +331,7 @@ func TestHandleMessageMinimalTopic(t *testing.T) { topic: "meshcore/SJC", payload: []byte(`{"raw":"` + rawHex + `"}`), } - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var count int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) @@ -352,7 +352,7 @@ func TestHandleMessageCorruptedAdvert(t *testing.T) { topic: "meshcore/SJC/obs1/packets", payload: []byte(`{"raw":"` + rawHex + `"}`), } - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) // Transmission should be inserted (even if advert is invalid) var count int @@ -378,7 +378,7 @@ func TestHandleMessageNoObserverID(t *testing.T) { topic: "packets", payload: []byte(`{"raw":"` + rawHex + `","origin":"obs1"}`), } - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var count int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) @@ -400,7 +400,7 @@ func TestHandleMessageSNRNotFloat(t *testing.T) { // SNR as a string value — should not parse as float payload := []byte(`{"raw":"` + rawHex + `","SNR":"bad","RSSI":"bad"}`) msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload} - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var count int store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) @@ -416,7 +416,7 @@ func TestHandleMessageOriginExtraction(t *testing.T) { rawHex := "0A00D69FD7A5A7475DB07337749AE61FA53A4788E976" payload := []byte(`{"raw":"` + rawHex + `","origin":"MyOrigin"}`) msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload} - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) // Verify origin was extracted to observer name var name string @@ -439,7 +439,7 @@ func TestHandleMessagePanicRecovery(t *testing.T) { } // Should not panic — the defer/recover should catch it - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) } func TestHandleMessageStatusOriginFallback(t *testing.T) { @@ -451,7 +451,7 @@ func TestHandleMessageStatusOriginFallback(t *testing.T) { topic: "meshcore/SJC/obs1/status", payload: []byte(`{"type":"status"}`), } - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var name string err := store.db.QueryRow("SELECT name FROM observers WHERE id = 'obs1'").Scan(&name) @@ -640,7 +640,7 @@ func TestHandleMessageWithLowercaseSNRRSSI(t *testing.T) { payload := []byte(`{"raw":"` + rawHex + `","snr":5.5,"rssi":-102}`) msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload} - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var snr, rssi *float64 store.db.QueryRow("SELECT snr, rssi FROM observations LIMIT 1").Scan(&snr, &rssi) @@ -661,7 +661,7 @@ func TestHandleMessageSNRRSSIUppercaseWins(t *testing.T) { payload := []byte(`{"raw":"` + rawHex + `","SNR":7.2,"snr":1.0,"RSSI":-95,"rssi":-50}`) msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload} - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var snr, rssi *float64 store.db.QueryRow("SELECT snr, rssi FROM observations LIMIT 1").Scan(&snr, &rssi) @@ -681,7 +681,7 @@ func TestHandleMessageNoSNRRSSI(t *testing.T) { payload := []byte(`{"raw":"` + rawHex + `"}`) msg := &mockMessage{topic: "meshcore/SJC/obs1/packets", payload: payload} - handleMessage(store, "test", source, msg, nil, nil) + handleMessage(store, "test", source, msg, nil, &Config{}) var snr, rssi *float64 store.db.QueryRow("SELECT snr, rssi FROM observations LIMIT 1").Scan(&snr, &rssi) diff --git a/cmd/ingestor/sig_validate_ingest_test.go b/cmd/ingestor/sig_validate_ingest_test.go new file mode 100644 index 00000000..8dafa181 --- /dev/null +++ b/cmd/ingestor/sig_validate_ingest_test.go @@ -0,0 +1,339 @@ +package main + +import ( + "crypto/ed25519" + "encoding/binary" + "encoding/hex" + "strings" + "testing" +) + +// buildAdvertHex constructs a full ADVERT packet hex string. +// header(1) + pathByte(1) + pubkey(32) + timestamp(4) + signature(64) + appdata +func buildAdvertHex(pubKey ed25519.PublicKey, privKey ed25519.PrivateKey, timestamp uint32, appdata []byte) string { + // Build signed message: pubkey(32) + timestamp(4 LE) + appdata + msg := make([]byte, 32+4+len(appdata)) + copy(msg[0:32], pubKey) + binary.LittleEndian.PutUint32(msg[32:36], timestamp) + copy(msg[36:], appdata) + + sig := ed25519.Sign(privKey, msg) + + // Payload: pubkey(32) + timestamp(4) + signature(64) + appdata + payload := make([]byte, 0, 100+len(appdata)) + payload = append(payload, pubKey...) + ts := make([]byte, 4) + binary.LittleEndian.PutUint32(ts, timestamp) + payload = append(payload, ts...) + payload = append(payload, sig...) + payload = append(payload, appdata...) + + // Header: ADVERT (0x04 << 2) | FLOOD (1) = 0x11, pathByte=0 (no hops) + header := byte(0x11) + pathByte := byte(0x00) + + pkt := append([]byte{header, pathByte}, payload...) + return hex.EncodeToString(pkt) +} + +// makeAppdata builds minimal appdata: flags(1) + name +func makeAppdata(name string) []byte { + flags := byte(0x81) // hasName=true, type=companion(1) + data := []byte{flags} + data = append(data, []byte(name)...) + data = append(data, 0x00) // null terminator + return data +} + +func TestSigValidation_ValidAdvertStored(t *testing.T) { + dbPath := t.TempDir() + "/test.db" + store, err := OpenStoreWithInterval(dbPath, 300) + if err != nil { + t.Fatal(err) + } + defer store.Close() + + pub, priv, _ := ed25519.GenerateKey(nil) + appdata := makeAppdata("TestNode") + rawHex := buildAdvertHex(pub, priv, 1700000000, appdata) + + source := MQTTSource{Name: "test"} + msg := newMockMsg("meshcore/US/obs1/packet", `{"raw":"`+rawHex+`","origin":"TestObs"}`) + cfg := &Config{} + + handleMessage(store, "test", source, msg, nil, cfg) + + // Verify packet was stored + var count int + store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&count) + if count == 0 { + t.Fatal("valid advert should be stored, got 0 transmissions") + } +} + +func TestSigValidation_TamperedSignatureDropped(t *testing.T) { + dbPath := t.TempDir() + "/test.db" + store, err := OpenStoreWithInterval(dbPath, 300) + if err != nil { + t.Fatal(err) + } + defer store.Close() + + pub, priv, _ := ed25519.GenerateKey(nil) + appdata := makeAppdata("BadNode") + rawHex := buildAdvertHex(pub, priv, 1700000000, appdata) + + // Tamper with signature (flip a byte in the signature area) + // Signature starts at offset 2 (header+path) + 32 (pubkey) + 4 (timestamp) = 38 + // That's byte 38 in the packet, hex chars 76-77 + rawBytes := []byte(rawHex) + if rawBytes[76] == '0' { + rawBytes[76] = 'f' + } else { + rawBytes[76] = '0' + } + tamperedHex := string(rawBytes) + + source := MQTTSource{Name: "test"} + msg := newMockMsg("meshcore/US/obs1/packet", `{"raw":"`+tamperedHex+`","origin":"TestObs"}`) + cfg := &Config{} + + handleMessage(store, "test", source, msg, nil, cfg) + + // Verify packet was NOT stored in transmissions + var txCount int + store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&txCount) + if txCount != 0 { + t.Fatalf("tampered advert should be dropped, got %d transmissions", txCount) + } + + // Verify it was recorded in dropped_packets + var dropCount int + store.db.QueryRow("SELECT COUNT(*) FROM dropped_packets").Scan(&dropCount) + if dropCount == 0 { + t.Fatal("tampered advert should be recorded in dropped_packets") + } + + // Verify drop counter incremented + if store.Stats.SignatureDrops.Load() != 1 { + t.Fatalf("expected 1 signature drop, got %d", store.Stats.SignatureDrops.Load()) + } + + // Verify dropped_packets has correct fields + var reason, nodeKey, nodeName, obsID string + store.db.QueryRow("SELECT reason, node_pubkey, node_name, observer_id FROM dropped_packets LIMIT 1").Scan(&reason, &nodeKey, &nodeName, &obsID) + if reason != "invalid signature" { + t.Fatalf("expected reason 'invalid signature', got %q", reason) + } + if nodeKey == "" { + t.Fatal("dropped packet should have node_pubkey") + } + if !strings.Contains(nodeName, "BadNode") { + t.Fatalf("expected node_name to contain 'BadNode', got %q", nodeName) + } + if obsID != "obs1" { + t.Fatalf("expected observer_id 'obs1', got %q", obsID) + } +} + +func TestSigValidation_TruncatedAppdataDropped(t *testing.T) { + dbPath := t.TempDir() + "/test.db" + store, err := OpenStoreWithInterval(dbPath, 300) + if err != nil { + t.Fatal(err) + } + defer store.Close() + + pub, priv, _ := ed25519.GenerateKey(nil) + appdata := makeAppdata("TruncNode") + rawHex := buildAdvertHex(pub, priv, 1700000000, appdata) + + // Sign was computed with full appdata. Now truncate the raw hex to remove + // some appdata bytes, making the signature invalid. + // Truncate last 4 hex chars (2 bytes of appdata) + truncatedHex := rawHex[:len(rawHex)-4] + + source := MQTTSource{Name: "test"} + msg := newMockMsg("meshcore/US/obs1/packet", `{"raw":"`+truncatedHex+`","origin":"TestObs"}`) + cfg := &Config{} + + handleMessage(store, "test", source, msg, nil, cfg) + + var txCount int + store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&txCount) + if txCount != 0 { + t.Fatalf("truncated advert should be dropped, got %d transmissions", txCount) + } +} + +func TestSigValidation_DisabledByConfig(t *testing.T) { + dbPath := t.TempDir() + "/test.db" + store, err := OpenStoreWithInterval(dbPath, 300) + if err != nil { + t.Fatal(err) + } + defer store.Close() + + pub, priv, _ := ed25519.GenerateKey(nil) + appdata := makeAppdata("NoValNode") + rawHex := buildAdvertHex(pub, priv, 1700000000, appdata) + + // Tamper with signature + rawBytes := []byte(rawHex) + if rawBytes[76] == '0' { + rawBytes[76] = 'f' + } else { + rawBytes[76] = '0' + } + tamperedHex := string(rawBytes) + + source := MQTTSource{Name: "test"} + msg := newMockMsg("meshcore/US/obs1/packet", `{"raw":"`+tamperedHex+`","origin":"TestObs"}`) + falseVal := false + cfg := &Config{ValidateSignatures: &falseVal} + + handleMessage(store, "test", source, msg, nil, cfg) + + // With validation disabled, tampered packet should be stored + var txCount int + store.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&txCount) + if txCount == 0 { + t.Fatal("with validateSignatures=false, tampered advert should be stored") + } +} + +func TestSigValidation_DropCounterIncrements(t *testing.T) { + dbPath := t.TempDir() + "/test.db" + store, err := OpenStoreWithInterval(dbPath, 300) + if err != nil { + t.Fatal(err) + } + defer store.Close() + + pub, priv, _ := ed25519.GenerateKey(nil) + source := MQTTSource{Name: "test"} + cfg := &Config{} + + for i := 0; i < 3; i++ { + appdata := makeAppdata("Node") + rawHex := buildAdvertHex(pub, priv, uint32(1700000000+i), appdata) + // Tamper + rawBytes := []byte(rawHex) + if rawBytes[76] == '0' { + rawBytes[76] = 'f' + } else { + rawBytes[76] = '0' + } + msg := newMockMsg("meshcore/US/obs1/packet", `{"raw":"`+string(rawBytes)+`","origin":"Obs"}`) + handleMessage(store, "test", source, msg, nil, cfg) + } + + if store.Stats.SignatureDrops.Load() != 3 { + t.Fatalf("expected 3 signature drops, got %d", store.Stats.SignatureDrops.Load()) + } +} + +func TestSigValidation_LogContainsFields(t *testing.T) { + // This test verifies the dropped_packets row has all required fields + dbPath := t.TempDir() + "/test.db" + store, err := OpenStoreWithInterval(dbPath, 300) + if err != nil { + t.Fatal(err) + } + defer store.Close() + + pub, priv, _ := ed25519.GenerateKey(nil) + appdata := makeAppdata("LogTestNode") + rawHex := buildAdvertHex(pub, priv, 1700000000, appdata) + + // Tamper + rawBytes := []byte(rawHex) + if rawBytes[76] == '0' { + rawBytes[76] = 'f' + } else { + rawBytes[76] = '0' + } + + source := MQTTSource{Name: "test"} + msg := newMockMsg("meshcore/US/obs1/packet", `{"raw":"`+string(rawBytes)+`","origin":"MyObserver"}`) + cfg := &Config{} + + handleMessage(store, "test", source, msg, nil, cfg) + + var hash, reason, obsID, obsName, pubkey, nodeName string + err = store.db.QueryRow("SELECT hash, reason, observer_id, observer_name, node_pubkey, node_name FROM dropped_packets LIMIT 1"). + Scan(&hash, &reason, &obsID, &obsName, &pubkey, &nodeName) + if err != nil { + t.Fatal(err) + } + + if hash == "" { + t.Error("dropped packet should have hash") + } + if reason != "invalid signature" { + t.Errorf("expected reason 'invalid signature', got %q", reason) + } + if obsID != "obs1" { + t.Errorf("expected observer_id 'obs1', got %q", obsID) + } + if obsName != "MyObserver" { + t.Errorf("expected observer_name 'MyObserver', got %q", obsName) + } + if pubkey == "" { + t.Error("dropped packet should have node_pubkey") + } + if !strings.Contains(nodeName, "LogTestNode") { + t.Errorf("expected node_name containing 'LogTestNode', got %q", nodeName) + } +} + +func TestPruneDroppedPackets(t *testing.T) { + dbPath := t.TempDir() + "/test.db" + store, err := OpenStoreWithInterval(dbPath, 300) + if err != nil { + t.Fatal(err) + } + defer store.Close() + + // Insert an old dropped packet + store.db.Exec(`INSERT INTO dropped_packets (hash, reason, dropped_at) VALUES ('old', 'test', datetime('now', '-60 days'))`) + store.db.Exec(`INSERT INTO dropped_packets (hash, reason, dropped_at) VALUES ('new', 'test', datetime('now'))`) + + n, err := store.PruneDroppedPackets(30) + if err != nil { + t.Fatal(err) + } + if n != 1 { + t.Fatalf("expected 1 pruned, got %d", n) + } + + var count int + store.db.QueryRow("SELECT COUNT(*) FROM dropped_packets").Scan(&count) + if count != 1 { + t.Fatalf("expected 1 remaining, got %d", count) + } +} + +func TestShouldValidateSignatures_Default(t *testing.T) { + cfg := &Config{} + if !cfg.ShouldValidateSignatures() { + t.Fatal("default should be true") + } + + falseVal := false + cfg2 := &Config{ValidateSignatures: &falseVal} + if cfg2.ShouldValidateSignatures() { + t.Fatal("explicit false should be false") + } + + trueVal := true + cfg3 := &Config{ValidateSignatures: &trueVal} + if !cfg3.ShouldValidateSignatures() { + t.Fatal("explicit true should be true") + } +} + +// newMockMsg creates a minimal mqtt.Message for testing. +func newMockMsg(topic, payload string) *mockMessage { + return &mockMessage{topic: topic, payload: []byte(payload)} +} diff --git a/cmd/server/db.go b/cmd/server/db.go index aadf7772..bb3a71a8 100644 --- a/cmd/server/db.go +++ b/cmd/server/db.go @@ -2252,3 +2252,71 @@ func (db *DB) TouchNodeLastSeen(pubkey string, timestamp string) error { ) return err } + +// GetDroppedPackets returns recently dropped packets, newest first. +func (db *DB) GetDroppedPackets(limit int, observerID, nodePubkey string) ([]map[string]interface{}, error) { + if limit <= 0 || limit > 500 { + limit = 100 + } + query := `SELECT id, hash, raw_hex, reason, observer_id, observer_name, node_pubkey, node_name, dropped_at FROM dropped_packets` + var conditions []string + var args []interface{} + if observerID != "" { + conditions = append(conditions, "observer_id = ?") + args = append(args, observerID) + } + if nodePubkey != "" { + conditions = append(conditions, "node_pubkey = ?") + args = append(args, nodePubkey) + } + if len(conditions) > 0 { + query += " WHERE " + strings.Join(conditions, " AND ") + } + query += " ORDER BY dropped_at DESC LIMIT ?" + args = append(args, limit) + + rows, err := db.conn.Query(query, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + var results []map[string]interface{} + for rows.Next() { + var id int + var hash, rawHex, reason, obsID, obsName, pubkey, name, droppedAt sql.NullString + if err := rows.Scan(&id, &hash, &rawHex, &reason, &obsID, &obsName, &pubkey, &name, &droppedAt); err != nil { + continue + } + row := map[string]interface{}{ + "id": id, + "hash": nullStr(hash), + "reason": nullStr(reason), + "observer_id": nullStr(obsID), + "observer_name": nullStr(obsName), + "node_pubkey": nullStr(pubkey), + "node_name": nullStr(name), + "dropped_at": nullStr(droppedAt), + } + // Only include raw_hex if explicitly requested (it's large) + if rawHex.Valid { + row["raw_hex"] = rawHex.String + } + results = append(results, row) + } + if results == nil { + results = []map[string]interface{}{} + } + return results, nil +} + +// GetSignatureDropCount returns the total number of dropped packets. +func (db *DB) GetSignatureDropCount() int64 { + var count int64 + // Table may not exist yet if ingestor hasn't run the migration + err := db.conn.QueryRow("SELECT COUNT(*) FROM dropped_packets").Scan(&count) + if err != nil { + return 0 + } + return count +} diff --git a/cmd/server/multibyte_capability_test.go b/cmd/server/multibyte_capability_test.go index 4f77b3d8..6e48477c 100644 --- a/cmd/server/multibyte_capability_test.go +++ b/cmd/server/multibyte_capability_test.go @@ -6,10 +6,17 @@ import ( "fmt" "strings" "testing" + "time" _ "modernc.org/sqlite" ) +// recentTS returns a timestamp string N hours ago, ensuring test data +// stays within the 7-day advert window used by computeNodeHashSizeInfo. +func recentTS(hoursAgo int) string { + return time.Now().UTC().Add(-time.Duration(hoursAgo) * time.Hour).Format("2006-01-02T15:04:05.000Z") +} + // setupCapabilityTestDB creates a minimal in-memory DB with nodes table. func setupCapabilityTestDB(t *testing.T) *DB { t.Helper() @@ -69,7 +76,7 @@ func makeTestAdvert(pubkey string, hashSize int) *StoreTx { PayloadType: &pt, DecodedJSON: string(decoded), PathJSON: `["` + prefix + `"]`, - FirstSeen: "2026-04-11T00:00:00.000Z", + FirstSeen: recentTS(24), } } @@ -80,7 +87,7 @@ func TestMultiByteCapability_Confirmed(t *testing.T) { defer db.conn.Close() db.conn.Exec("INSERT INTO nodes (public_key, name, role, last_seen) VALUES (?, ?, ?, ?)", - "aabbccdd11223344", "RepA", "repeater", "2026-04-11T00:00:00Z") + "aabbccdd11223344", "RepA", "repeater", recentTS(24)) store := NewPacketStore(db, nil) addTestPacket(store, makeTestAdvert("aabbccdd11223344", 2)) @@ -107,7 +114,7 @@ func TestMultiByteCapability_Suspected(t *testing.T) { defer db.conn.Close() db.conn.Exec("INSERT INTO nodes (public_key, name, role, last_seen) VALUES (?, ?, ?, ?)", - "aabbccdd11223344", "RepB", "repeater", "2026-04-10T00:00:00Z") + "aabbccdd11223344", "RepB", "repeater", recentTS(48)) store := NewPacketStore(db, nil) @@ -119,7 +126,7 @@ func TestMultiByteCapability_Suspected(t *testing.T) { RawHex: rawHex, PayloadType: &pt, PathJSON: `["aabb"]`, - FirstSeen: "2026-04-10T00:00:00.000Z", + FirstSeen: recentTS(48), } addTestPacket(store, pkt) @@ -145,7 +152,7 @@ func TestMultiByteCapability_Unknown(t *testing.T) { defer db.conn.Close() db.conn.Exec("INSERT INTO nodes (public_key, name, role, last_seen) VALUES (?, ?, ?, ?)", - "aabbccdd11223344", "RepC", "repeater", "2026-04-08T00:00:00Z") + "aabbccdd11223344", "RepC", "repeater", recentTS(72)) store := NewPacketStore(db, nil) @@ -173,9 +180,9 @@ func TestMultiByteCapability_PrefixCollision(t *testing.T) { // Two repeaters sharing 1-byte prefix "aa" db.conn.Exec("INSERT INTO nodes (public_key, name, role, last_seen) VALUES (?, ?, ?, ?)", - "aabb000000000001", "RepConfirmed", "repeater", "2026-04-11T00:00:00Z") + "aabb000000000001", "RepConfirmed", "repeater", recentTS(24)) db.conn.Exec("INSERT INTO nodes (public_key, name, role, last_seen) VALUES (?, ?, ?, ?)", - "aacc000000000002", "RepOther", "repeater", "2026-04-11T00:00:00Z") + "aacc000000000002", "RepOther", "repeater", recentTS(24)) store := NewPacketStore(db, nil) @@ -190,7 +197,7 @@ func TestMultiByteCapability_PrefixCollision(t *testing.T) { RawHex: rawHex, PayloadType: &pt, PathJSON: `["aa"]`, - FirstSeen: "2026-04-10T00:00:00.000Z", + FirstSeen: recentTS(48), } addTestPacket(store, pkt) @@ -221,7 +228,7 @@ func TestMultiByteCapability_TraceExcluded(t *testing.T) { defer db.conn.Close() db.conn.Exec("INSERT INTO nodes (public_key, name, role, last_seen) VALUES (?, ?, ?, ?)", - "aabbccdd11223344", "RepTrace", "repeater", "2026-04-10T00:00:00Z") + "aabbccdd11223344", "RepTrace", "repeater", recentTS(48)) store := NewPacketStore(db, nil) @@ -233,7 +240,7 @@ func TestMultiByteCapability_TraceExcluded(t *testing.T) { RawHex: rawHex, PayloadType: &pt, PathJSON: `["aabb"]`, - FirstSeen: "2026-04-10T00:00:00.000Z", + FirstSeen: recentTS(48), } addTestPacket(store, pkt) @@ -253,7 +260,7 @@ func TestMultiByteCapability_NonTraceStillSuspected(t *testing.T) { defer db.conn.Close() db.conn.Exec("INSERT INTO nodes (public_key, name, role, last_seen) VALUES (?, ?, ?, ?)", - "aabbccdd11223344", "RepNonTrace", "repeater", "2026-04-10T00:00:00Z") + "aabbccdd11223344", "RepNonTrace", "repeater", recentTS(48)) store := NewPacketStore(db, nil) @@ -265,7 +272,7 @@ func TestMultiByteCapability_NonTraceStillSuspected(t *testing.T) { RawHex: rawHex, PayloadType: &pt, PathJSON: `["aabb"]`, - FirstSeen: "2026-04-10T00:00:00.000Z", + FirstSeen: recentTS(48), } addTestPacket(store, pkt) @@ -285,7 +292,7 @@ func TestMultiByteCapability_ConfirmedUnaffectedByTraceExclusion(t *testing.T) { defer db.conn.Close() db.conn.Exec("INSERT INTO nodes (public_key, name, role, last_seen) VALUES (?, ?, ?, ?)", - "aabbccdd11223344", "RepConfirmedTrace", "repeater", "2026-04-11T00:00:00Z") + "aabbccdd11223344", "RepConfirmedTrace", "repeater", recentTS(24)) store := NewPacketStore(db, nil) @@ -300,7 +307,7 @@ func TestMultiByteCapability_ConfirmedUnaffectedByTraceExclusion(t *testing.T) { RawHex: rawHex, PayloadType: &pt, PathJSON: `["aabb"]`, - FirstSeen: "2026-04-10T00:00:00.000Z", + FirstSeen: recentTS(48), } addTestPacket(store, pkt) @@ -320,7 +327,7 @@ func TestMultiByteCapability_CompanionConfirmed(t *testing.T) { defer db.conn.Close() db.conn.Exec("INSERT INTO nodes (public_key, name, role, last_seen) VALUES (?, ?, ?, ?)", - "aabbccdd11223344", "CompA", "companion", "2026-04-11T00:00:00Z") + "aabbccdd11223344", "CompA", "companion", recentTS(24)) store := NewPacketStore(db, nil) addTestPacket(store, makeTestAdvert("aabbccdd11223344", 2)) @@ -347,11 +354,11 @@ func TestMultiByteCapability_RoleColumnPopulated(t *testing.T) { defer db.conn.Close() db.conn.Exec("INSERT INTO nodes (public_key, name, role, last_seen) VALUES (?, ?, ?, ?)", - "aabb000000000001", "Rep1", "repeater", "2026-04-11T00:00:00Z") + "aabb000000000001", "Rep1", "repeater", recentTS(24)) db.conn.Exec("INSERT INTO nodes (public_key, name, role, last_seen) VALUES (?, ?, ?, ?)", - "ccdd000000000002", "Comp1", "companion", "2026-04-11T00:00:00Z") + "ccdd000000000002", "Comp1", "companion", recentTS(24)) db.conn.Exec("INSERT INTO nodes (public_key, name, role, last_seen) VALUES (?, ?, ?, ?)", - "eeff000000000003", "Room1", "room_server", "2026-04-11T00:00:00Z") + "eeff000000000003", "Room1", "room_server", recentTS(24)) store := NewPacketStore(db, nil) addTestPacket(store, makeTestAdvert("aabb000000000001", 2)) @@ -386,7 +393,7 @@ func TestMultiByteCapability_AdopterEvidenceTakesPrecedence(t *testing.T) { defer db.conn.Close() db.conn.Exec("INSERT INTO nodes (public_key, name, role, last_seen) VALUES (?, ?, ?, ?)", - "aabbccdd11223344", "RepAdopter", "repeater", "2026-04-11T00:00:00Z") + "aabbccdd11223344", "RepAdopter", "repeater", recentTS(24)) store := NewPacketStore(db, nil) @@ -398,7 +405,7 @@ func TestMultiByteCapability_AdopterEvidenceTakesPrecedence(t *testing.T) { RawHex: rawHex, PayloadType: &pt, PathJSON: `["aabb"]`, - FirstSeen: "2026-04-10T00:00:00.000Z", + FirstSeen: recentTS(48), } addTestPacket(store, pkt) diff --git a/cmd/server/routes.go b/cmd/server/routes.go index 37d120e1..d3bbac2c 100644 --- a/cmd/server/routes.go +++ b/cmd/server/routes.go @@ -124,6 +124,7 @@ func (s *Server) RegisterRoutes(r *mux.Router) { r.Handle("/api/perf/reset", s.requireAPIKey(http.HandlerFunc(s.handlePerfReset))).Methods("POST") r.Handle("/api/admin/prune", s.requireAPIKey(http.HandlerFunc(s.handleAdminPrune))).Methods("POST") r.Handle("/api/debug/affinity", s.requireAPIKey(http.HandlerFunc(s.handleDebugAffinity))).Methods("GET") + r.Handle("/api/dropped-packets", s.requireAPIKey(http.HandlerFunc(s.handleDroppedPackets))).Methods("GET") // Packet endpoints r.HandleFunc("/api/packets/observations", s.handleBatchObservations).Methods("POST") @@ -589,6 +590,7 @@ func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) { }, Backfilling: backfilling, BackfillProgress: backfillProgress, + SignatureDrops: s.db.GetSignatureDropCount(), } s.statsMu.Lock() @@ -2606,3 +2608,22 @@ func (s *Server) filterBlacklistedFromSubpaths(data map[string]interface{}) map[ } return data } + +// handleDroppedPackets returns recently dropped packets for investigation. +func (s *Server) handleDroppedPackets(w http.ResponseWriter, r *http.Request) { + limit := 100 + if v := r.URL.Query().Get("limit"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + limit = n + } + } + observerID := r.URL.Query().Get("observer") + nodePubkey := r.URL.Query().Get("pubkey") + + results, err := s.db.GetDroppedPackets(limit, observerID, nodePubkey) + if err != nil { + writeError(w, 500, err.Error()) + return + } + writeJSON(w, results) +} diff --git a/cmd/server/types.go b/cmd/server/types.go index 24769df0..3171518b 100644 --- a/cmd/server/types.go +++ b/cmd/server/types.go @@ -70,6 +70,7 @@ type StatsResponse struct { Counts RoleCounts `json:"counts"` Backfilling bool `json:"backfilling"` BackfillProgress float64 `json:"backfillProgress"` + SignatureDrops int64 `json:"signatureDrops,omitempty"` } // ─── Health ────────────────────────────────────────────────────────────────────