test: add load test with throughput and latency metrics

TestLoadTestThroughput: 1000 messages × 4 writes each = 4000 writes,
20 concurrent goroutines. Reports msgs/sec, p50/p95/p99 latency,
SQLITE_BUSY count, and total errors. Hard-asserts zero BUSY errors.
This commit is contained in:
you
2026-03-28 16:54:06 +00:00
parent cef8156a86
commit 331dc0090e

View File

@@ -4,7 +4,9 @@ import (
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"sync/atomic"
"testing"
"time"
)
@@ -790,3 +792,144 @@ func TestDBStats(t *testing.T) {
// LogStats should not panic
s.LogStats()
}
func TestLoadTestThroughput(t *testing.T) {
s, err := OpenStore(tempDBPath(t))
if err != nil {
t.Fatal(err)
}
defer s.Close()
// Pre-create observer
if err := s.UpsertObserver("obs1", "Observer1", "SJC"); err != nil {
t.Fatal(err)
}
const totalMessages = 1000
const goroutines = 20
perGoroutine := totalMessages / goroutines
// Simulate full pipeline: InsertTransmission + UpsertNode + UpsertObserver + IncrementAdvertCount
// This matches the real handleMessage write pattern for ADVERT packets
latencies := make([]time.Duration, totalMessages)
var busyErrors atomic.Int64
var totalErrors atomic.Int64
errCh := make(chan error, totalMessages)
done := make(chan struct{})
start := time.Now()
for g := 0; g < goroutines; g++ {
go func(gIdx int) {
defer func() { done <- struct{}{} }()
for i := 0; i < perGoroutine; i++ {
msgStart := time.Now()
idx := gIdx*perGoroutine + i
hash := fmt.Sprintf("load_%04d_%04d____", gIdx, i)
snr := 5.0
rssi := -100.0
data := &PacketData{
RawHex: "0A00D69F",
Timestamp: time.Now().UTC().Format(time.RFC3339),
ObserverID: "obs1",
Hash: hash[:16],
RouteType: 2,
PayloadType: 4,
PathJSON: "[]",
DecodedJSON: `{"type":"ADVERT","pubKey":"` + hash[:16] + `"}`,
SNR: &snr,
RSSI: &rssi,
}
_, err := s.InsertTransmission(data)
if err != nil {
totalErrors.Add(1)
if strings.Contains(err.Error(), "database is locked") || strings.Contains(err.Error(), "SQLITE_BUSY") {
busyErrors.Add(1)
}
errCh <- err
continue
}
lat := 37.0 + float64(gIdx)*0.001
lon := -122.0 + float64(i)*0.001
pubKey := fmt.Sprintf("node_%04d_%04d____", gIdx, i)
if err := s.UpsertNode(pubKey[:16], "Node", "repeater", &lat, &lon, data.Timestamp); err != nil {
totalErrors.Add(1)
if strings.Contains(err.Error(), "locked") || strings.Contains(err.Error(), "BUSY") {
busyErrors.Add(1)
}
}
if err := s.IncrementAdvertCount(pubKey[:16]); err != nil {
totalErrors.Add(1)
}
obsID := fmt.Sprintf("obs_%04d_%04d_____", gIdx, i)
if err := s.UpsertObserver(obsID[:16], "Obs", "SJC"); err != nil {
totalErrors.Add(1)
if strings.Contains(err.Error(), "locked") || strings.Contains(err.Error(), "BUSY") {
busyErrors.Add(1)
}
}
latencies[idx] = time.Since(msgStart)
}
}(g)
}
for g := 0; g < goroutines; g++ {
<-done
}
close(errCh)
elapsed := time.Since(start)
// Calculate p50, p95, p99
validLatencies := make([]time.Duration, 0, totalMessages)
for _, l := range latencies {
if l > 0 {
validLatencies = append(validLatencies, l)
}
}
sort.Slice(validLatencies, func(i, j int) bool { return validLatencies[i] < validLatencies[j] })
p50 := validLatencies[len(validLatencies)*50/100]
p95 := validLatencies[len(validLatencies)*95/100]
p99 := validLatencies[len(validLatencies)*99/100]
msgsPerSec := float64(totalMessages) / elapsed.Seconds()
t.Logf("=== LOAD TEST RESULTS ===")
t.Logf("Messages: %d (%d goroutines × %d each)", totalMessages, goroutines, perGoroutine)
t.Logf("Writes/msg: 4 (InsertTx + UpsertNode + IncrAdvertCount + UpsertObserver)")
t.Logf("Total writes: %d", totalMessages*4)
t.Logf("Duration: %s", elapsed.Round(time.Millisecond))
t.Logf("Throughput: %.1f msgs/sec (%.1f writes/sec)", msgsPerSec, msgsPerSec*4)
t.Logf("Latency p50: %s", p50.Round(time.Microsecond))
t.Logf("Latency p95: %s", p95.Round(time.Microsecond))
t.Logf("Latency p99: %s", p99.Round(time.Microsecond))
t.Logf("SQLITE_BUSY: %d", busyErrors.Load())
t.Logf("Total errors: %d", totalErrors.Load())
t.Logf("Stats: tx=%d dupes=%d obs=%d nodes=%d observers=%d write_err=%d",
s.Stats.TransmissionsInserted.Load(),
s.Stats.DuplicateTransmissions.Load(),
s.Stats.ObservationsInserted.Load(),
s.Stats.NodeUpserts.Load(),
s.Stats.ObserverUpserts.Load(),
s.Stats.WriteErrors.Load(),
)
// Hard assertions
if busyErrors.Load() > 0 {
t.Errorf("SQLITE_BUSY errors: %d (expected 0)", busyErrors.Load())
}
if totalErrors.Load() > 0 {
t.Errorf("Total errors: %d (expected 0)", totalErrors.Load())
}
var txCount int
s.db.QueryRow("SELECT COUNT(*) FROM transmissions").Scan(&txCount)
if txCount != totalMessages {
t.Errorf("transmissions=%d, want %d", txCount, totalMessages)
}
}