From 331dc0090ee5d9fe7888c3c3a05ae593c03aac60 Mon Sep 17 00:00:00 2001 From: you Date: Sat, 28 Mar 2026 16:54:06 +0000 Subject: [PATCH] test: add load test with throughput and latency metrics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit TestLoadTestThroughput: 1000 messages × 4 writes each = 4000 writes, 20 concurrent goroutines. Reports msgs/sec, p50/p95/p99 latency, SQLITE_BUSY count, and total errors. Hard-asserts zero BUSY errors. --- cmd/ingestor/db_test.go | 143 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 143 insertions(+) diff --git a/cmd/ingestor/db_test.go b/cmd/ingestor/db_test.go index e45ad2f..c5afb52 100644 --- a/cmd/ingestor/db_test.go +++ b/cmd/ingestor/db_test.go @@ -4,7 +4,9 @@ import ( "fmt" "os" "path/filepath" + "sort" "strings" + "sync/atomic" "testing" "time" ) @@ -790,3 +792,144 @@ func TestDBStats(t *testing.T) { // LogStats should not panic s.LogStats() } + +func TestLoadTestThroughput(t *testing.T) { + s, err := OpenStore(tempDBPath(t)) + if err != nil { + t.Fatal(err) + } + defer s.Close() + + // Pre-create observer + if err := s.UpsertObserver("obs1", "Observer1", "SJC"); err != nil { + t.Fatal(err) + } + + const totalMessages = 1000 + const goroutines = 20 + perGoroutine := totalMessages / goroutines + + // Simulate full pipeline: InsertTransmission + UpsertNode + UpsertObserver + IncrementAdvertCount + // This matches the real handleMessage write pattern for ADVERT packets + latencies := make([]time.Duration, totalMessages) + var busyErrors atomic.Int64 + var totalErrors atomic.Int64 + errCh := make(chan error, totalMessages) + done := make(chan struct{}) + + start := time.Now() + + for g := 0; g < goroutines; g++ { + go func(gIdx int) { + defer func() { done <- struct{}{} }() + for i := 0; i < perGoroutine; i++ { + msgStart := time.Now() + idx := gIdx*perGoroutine + i + hash := fmt.Sprintf("load_%04d_%04d____", gIdx, i) + snr := 5.0 + rssi := -100.0 + + data := &PacketData{ + RawHex: "0A00D69F", + Timestamp: time.Now().UTC().Format(time.RFC3339), + ObserverID: "obs1", + Hash: hash[:16], + RouteType: 2, + PayloadType: 4, + PathJSON: "[]", + DecodedJSON: `{"type":"ADVERT","pubKey":"` + hash[:16] + `"}`, + SNR: &snr, + RSSI: &rssi, + } + + _, err := s.InsertTransmission(data) + if err != nil { + totalErrors.Add(1) + if strings.Contains(err.Error(), "database is locked") || strings.Contains(err.Error(), "SQLITE_BUSY") { + busyErrors.Add(1) + } + errCh <- err + continue + } + + lat := 37.0 + float64(gIdx)*0.001 + lon := -122.0 + float64(i)*0.001 + pubKey := fmt.Sprintf("node_%04d_%04d____", gIdx, i) + if err := s.UpsertNode(pubKey[:16], "Node", "repeater", &lat, &lon, data.Timestamp); err != nil { + totalErrors.Add(1) + if strings.Contains(err.Error(), "locked") || strings.Contains(err.Error(), "BUSY") { + busyErrors.Add(1) + } + } + + if err := s.IncrementAdvertCount(pubKey[:16]); err != nil { + totalErrors.Add(1) + } + + obsID := fmt.Sprintf("obs_%04d_%04d_____", gIdx, i) + if err := s.UpsertObserver(obsID[:16], "Obs", "SJC"); err != nil { + totalErrors.Add(1) + if strings.Contains(err.Error(), "locked") || strings.Contains(err.Error(), "BUSY") { + busyErrors.Add(1) + } + } + + latencies[idx] = time.Since(msgStart) + } + }(g) + } + + for g := 0; g < goroutines; g++ { + <-done + } + close(errCh) + elapsed := time.Since(start) + + // Calculate p50, p95, p99 + validLatencies := make([]time.Duration, 0, totalMessages) + for _, l := range latencies { + if l > 0 { + validLatencies = append(validLatencies, l) + } + } + sort.Slice(validLatencies, func(i, j int) bool { return validLatencies[i] < validLatencies[j] }) + + p50 := validLatencies[len(validLatencies)*50/100] + p95 := validLatencies[len(validLatencies)*95/100] + p99 := validLatencies[len(validLatencies)*99/100] + msgsPerSec := float64(totalMessages) / elapsed.Seconds() + + t.Logf("=== LOAD TEST RESULTS ===") + t.Logf("Messages: %d (%d goroutines × %d each)", totalMessages, goroutines, perGoroutine) + t.Logf("Writes/msg: 4 (InsertTx + UpsertNode + IncrAdvertCount + UpsertObserver)") + t.Logf("Total writes: %d", totalMessages*4) + t.Logf("Duration: %s", elapsed.Round(time.Millisecond)) + t.Logf("Throughput: %.1f msgs/sec (%.1f writes/sec)", msgsPerSec, msgsPerSec*4) + t.Logf("Latency p50: %s", p50.Round(time.Microsecond)) + t.Logf("Latency p95: %s", p95.Round(time.Microsecond)) + t.Logf("Latency p99: %s", p99.Round(time.Microsecond)) + t.Logf("SQLITE_BUSY: %d", busyErrors.Load()) + t.Logf("Total errors: %d", totalErrors.Load()) + t.Logf("Stats: tx=%d dupes=%d obs=%d nodes=%d observers=%d write_err=%d", + s.Stats.TransmissionsInserted.Load(), + s.Stats.DuplicateTransmissions.Load(), + s.Stats.ObservationsInserted.Load(), + s.Stats.NodeUpserts.Load(), + s.Stats.ObserverUpserts.Load(), + s.Stats.WriteErrors.Load(), + ) + + // Hard assertions + if busyErrors.Load() > 0 { + t.Errorf("SQLITE_BUSY errors: %d (expected 0)", busyErrors.Load()) + } + if totalErrors.Load() > 0 { + t.Errorf("Total errors: %d (expected 0)", totalErrors.Load()) + } + + var txCount int + s.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&txCount) + if txCount != totalMessages { + t.Errorf("transmissions=%d, want %d", txCount, totalMessages) + } +}