mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-04-26 21:25:13 +00:00
a8e1cea683
## Problem

The firmware computes the packet content hash as:

```
SHA256(payload_type_byte + [path_len for TRACE] + payload)
```

Where `payload_type_byte = (header >> 2) & 0x0F` — just the payload type bits (2-5). CoreScope was using the **full header byte** in its hash computation, which includes route type bits (0-1) and version bits (6-7). This meant the same logical packet produced different content hashes depending on route type — breaking dedup and packet lookup.

**Firmware reference:** `Packet.cpp::calculatePacketHash()` uses `getPayloadType()` which returns `(header >> PH_TYPE_SHIFT) & PH_TYPE_MASK`.

## Fix

- Extract only payload type bits: `payloadType := (headerByte >> 2) & 0x0F`
- Include `path_len` byte in hash for TRACE packets (matching firmware behavior)
- Applied to both `cmd/server/decoder.go` and `cmd/ingestor/decoder.go`

## Tests Added

- **Route type independence:** Same payload with FLOOD vs DIRECT route types produces identical hash
- **TRACE path_len inclusion:** TRACE packets with different `path_len` produce different hashes
- **Firmware compatibility:** Hash output matches manual computation of firmware algorithm

## Migration Impact

Existing packets in the DB have content hashes computed with the old (incorrect) formula. Options:

1. **Recompute hashes** via migration (recommended for clean state)
2. **Dual lookup** — check both old and new hash on queries (backward compat)
3. **Accept the break** — old hashes become stale, new packets get correct hashes

Recommend option 1 (migration) as a follow-up. The volume of affected packets depends on how many distinct route types were seen for the same logical packet.

Fixes #786

---------

Co-authored-by: you <you@example.com>
120 lines
3.3 KiB
Go
120 lines
3.3 KiB
Go
package main
|
|
|
|
import (
|
|
"log"
|
|
"time"
|
|
)
|
|
|
|
// migrateContentHashesAsync recomputes content hashes in batches after the
|
|
// server is already serving HTTP. Packets whose hash changes are updated in
|
|
// both the DB and the in-memory byHash index. The migration is idempotent:
|
|
// once all hashes match the current formula it completes instantly.
|
|
func migrateContentHashesAsync(store *PacketStore, batchSize int, yieldDuration time.Duration) {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
log.Printf("[hash-migrate] panic recovered: %v", r)
|
|
}
|
|
store.hashMigrationComplete.Store(true)
|
|
}()
|
|
|
|
// Snapshot the packet slice length under lock (packets only grow).
|
|
store.mu.RLock()
|
|
total := len(store.packets)
|
|
store.mu.RUnlock()
|
|
|
|
migrated := 0
|
|
for offset := 0; offset < total; offset += batchSize {
|
|
end := offset + batchSize
|
|
if end > total {
|
|
end = total
|
|
}
|
|
|
|
// Collect stale hashes in this batch under RLock.
|
|
type hashUpdate struct {
|
|
tx *StoreTx
|
|
oldHash string
|
|
newHash string
|
|
}
|
|
var updates []hashUpdate
|
|
|
|
store.mu.RLock()
|
|
for _, tx := range store.packets[offset:end] {
|
|
if tx.RawHex == "" {
|
|
continue
|
|
}
|
|
newHash := ComputeContentHash(tx.RawHex)
|
|
if newHash != tx.Hash {
|
|
updates = append(updates, hashUpdate{tx: tx, oldHash: tx.Hash, newHash: newHash})
|
|
}
|
|
}
|
|
store.mu.RUnlock()
|
|
|
|
if len(updates) == 0 {
|
|
continue
|
|
}
|
|
|
|
// Write batch to DB in a single transaction.
|
|
dbTx, err := store.db.conn.Begin()
|
|
if err != nil {
|
|
log.Printf("[hash-migrate] begin tx: %v", err)
|
|
continue
|
|
}
|
|
stmt, err := dbTx.Prepare("UPDATE transmissions SET hash = ? WHERE id = ?")
|
|
if err != nil {
|
|
log.Printf("[hash-migrate] prepare: %v", err)
|
|
dbTx.Rollback()
|
|
continue
|
|
}
|
|
|
|
for _, u := range updates {
|
|
if _, err := stmt.Exec(u.newHash, u.tx.ID); err != nil {
|
|
// UNIQUE constraint = two old hashes map to the same new hash (duplicate).
|
|
// Merge observations to the surviving tx, delete the duplicate.
|
|
log.Printf("[hash-migrate] tx %d collides — merging duplicate", u.tx.ID)
|
|
var survID int
|
|
if err2 := dbTx.QueryRow("SELECT id FROM transmissions WHERE hash = ?", u.newHash).Scan(&survID); err2 == nil {
|
|
dbTx.Exec("UPDATE observations SET transmission_id = ? WHERE transmission_id = ?", survID, u.tx.ID)
|
|
dbTx.Exec("DELETE FROM transmissions WHERE id = ?", u.tx.ID)
|
|
u.newHash = "" // mark for in-memory removal only
|
|
}
|
|
}
|
|
}
|
|
stmt.Close()
|
|
|
|
if err := dbTx.Commit(); err != nil {
|
|
log.Printf("[hash-migrate] commit: %v", err)
|
|
continue
|
|
}
|
|
|
|
// Update in-memory index under write lock.
|
|
store.mu.Lock()
|
|
for _, u := range updates {
|
|
delete(store.byHash, u.oldHash)
|
|
if u.newHash == "" {
|
|
// Merged duplicate — remove from packets slice and indexes.
|
|
delete(store.byTxID, u.tx.ID)
|
|
// Move observations to survivor if present.
|
|
if surv := store.byHash[ComputeContentHash(u.tx.RawHex)]; surv != nil {
|
|
for _, obs := range u.tx.Observations {
|
|
surv.Observations = append(surv.Observations, obs)
|
|
surv.ObservationCount++
|
|
}
|
|
}
|
|
} else {
|
|
u.tx.Hash = u.newHash
|
|
store.byHash[u.newHash] = u.tx
|
|
}
|
|
}
|
|
store.mu.Unlock()
|
|
|
|
migrated += len(updates)
|
|
|
|
// Yield to let HTTP handlers run.
|
|
time.Sleep(yieldDuration)
|
|
}
|
|
|
|
if migrated > 0 {
|
|
log.Printf("[hash-migrate] Migrated %d content hashes to new formula", migrated)
|
|
}
|
|
}
|