mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-04-26 12:57:22 +00:00
## Problem Channel API endpoints scan entire DB — 2.4s for channel list, 30s for messages. ## Fix - Added `channel_hash` column to transmissions (populated on ingest, backfilled on startup) - `GetChannels()` rewrites to GROUP BY channel_hash (one row per channel vs scanning every packet) - `GetChannelMessages()` filters by channel_hash at SQL level with proper LIMIT/OFFSET - 60s cache for channel list - Index: `idx_tx_channel_hash` for fast lookups Expected: 2.4s → <100ms for list, 30s → <500ms for messages. Fixes #762 --------- Co-authored-by: you <you@example.com>
This commit is contained in:
@@ -345,6 +345,28 @@ func applySchema(db *sql.DB) error {
|
||||
log.Println("[migration] packets_sent/packets_recv columns added")
|
||||
}
|
||||
|
||||
// Migration: add channel_hash column for fast channel queries (#762)
|
||||
row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'channel_hash_v1'")
|
||||
if row.Scan(&migDone) != nil {
|
||||
log.Println("[migration] Adding channel_hash column to transmissions...")
|
||||
db.Exec(`ALTER TABLE transmissions ADD COLUMN channel_hash TEXT DEFAULT NULL`)
|
||||
db.Exec(`CREATE INDEX IF NOT EXISTS idx_tx_channel_hash ON transmissions(channel_hash) WHERE payload_type = 5`)
|
||||
// Backfill: extract channel name for decrypted (CHAN) packets
|
||||
res, err := db.Exec(`UPDATE transmissions SET channel_hash = json_extract(decoded_json, '$.channel') WHERE payload_type = 5 AND channel_hash IS NULL AND json_extract(decoded_json, '$.type') = 'CHAN'`)
|
||||
if err == nil {
|
||||
n, _ := res.RowsAffected()
|
||||
log.Printf("[migration] Backfilled channel_hash for %d CHAN packets", n)
|
||||
}
|
||||
// Backfill: extract channelHashHex for encrypted (GRP_TXT) packets, prefixed with 'enc_'
|
||||
res, err = db.Exec(`UPDATE transmissions SET channel_hash = 'enc_' || json_extract(decoded_json, '$.channelHashHex') WHERE payload_type = 5 AND channel_hash IS NULL AND json_extract(decoded_json, '$.type') = 'GRP_TXT'`)
|
||||
if err == nil {
|
||||
n, _ := res.RowsAffected()
|
||||
log.Printf("[migration] Backfilled channel_hash for %d GRP_TXT packets", n)
|
||||
}
|
||||
db.Exec(`INSERT INTO _migrations (name) VALUES ('channel_hash_v1')`)
|
||||
log.Println("[migration] channel_hash column added and backfilled")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -357,8 +379,8 @@ func (s *Store) prepareStatements() error {
|
||||
}
|
||||
|
||||
s.stmtInsertTransmission, err = s.db.Prepare(`
|
||||
INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json, channel_hash)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||
`)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -481,7 +503,7 @@ func (s *Store) InsertTransmission(data *PacketData) (bool, error) {
|
||||
result, err := s.stmtInsertTransmission.Exec(
|
||||
data.RawHex, hash, now,
|
||||
data.RouteType, data.PayloadType, data.PayloadVersion,
|
||||
data.DecodedJSON,
|
||||
data.DecodedJSON, nilIfEmpty(data.ChannelHash),
|
||||
)
|
||||
if err != nil {
|
||||
s.Stats.WriteErrors.Add(1)
|
||||
@@ -773,6 +795,15 @@ type PacketData struct {
|
||||
PayloadVersion int
|
||||
PathJSON string
|
||||
DecodedJSON string
|
||||
ChannelHash string // grouping key for channel queries (#762)
|
||||
}
|
||||
|
||||
// nilIfEmpty returns nil for empty strings (for nullable DB columns).
|
||||
func nilIfEmpty(s string) interface{} {
|
||||
if s == "" {
|
||||
return nil
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// MQTTPacketMessage is the JSON payload from an MQTT raw packet message.
|
||||
@@ -794,7 +825,7 @@ func BuildPacketData(msg *MQTTPacketMessage, decoded *DecodedPacket, observerID,
|
||||
pathJSON = string(b)
|
||||
}
|
||||
|
||||
return &PacketData{
|
||||
pd := &PacketData{
|
||||
RawHex: msg.Raw,
|
||||
Timestamp: now,
|
||||
ObserverID: observerID,
|
||||
@@ -810,4 +841,15 @@ func BuildPacketData(msg *MQTTPacketMessage, decoded *DecodedPacket, observerID,
|
||||
PathJSON: pathJSON,
|
||||
DecodedJSON: PayloadJSON(&decoded.Payload),
|
||||
}
|
||||
|
||||
// Populate channel_hash for fast channel queries (#762)
|
||||
if decoded.Header.PayloadType == PayloadGRP_TXT {
|
||||
if decoded.Payload.Type == "CHAN" && decoded.Payload.Channel != "" {
|
||||
pd.ChannelHash = decoded.Payload.Channel
|
||||
} else if decoded.Payload.Type == "GRP_TXT" && decoded.Payload.ChannelHashHex != "" {
|
||||
pd.ChannelHash = "enc_" + decoded.Payload.ChannelHashHex
|
||||
}
|
||||
}
|
||||
|
||||
return pd
|
||||
}
|
||||
|
||||
@@ -440,6 +440,7 @@ func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message,
|
||||
PayloadType: 5, // GRP_TXT
|
||||
PathJSON: "[]",
|
||||
DecodedJSON: string(decodedJSON),
|
||||
ChannelHash: channelName, // fast channel queries (#762)
|
||||
}
|
||||
|
||||
if _, err := store.InsertTransmission(pktData); err != nil {
|
||||
|
||||
@@ -41,7 +41,7 @@ func setupTestDBv2(t *testing.T) *DB {
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT, raw_hex TEXT NOT NULL,
|
||||
hash TEXT NOT NULL UNIQUE, first_seen TEXT NOT NULL,
|
||||
route_type INTEGER, payload_type INTEGER, payload_version INTEGER,
|
||||
decoded_json TEXT, created_at TEXT DEFAULT (datetime('now'))
|
||||
decoded_json TEXT, channel_hash TEXT DEFAULT NULL, created_at TEXT DEFAULT (datetime('now'))
|
||||
);
|
||||
CREATE TABLE observations (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
|
||||
263
cmd/server/db.go
263
cmd/server/db.go
@@ -8,6 +8,7 @@ import (
|
||||
"math"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
_ "modernc.org/sqlite"
|
||||
@@ -19,6 +20,12 @@ type DB struct {
|
||||
path string // filesystem path to the database file
|
||||
isV3 bool // v3 schema: observer_idx in observations (vs observer_id in v2)
|
||||
hasResolvedPath bool // observations table has resolved_path column
|
||||
|
||||
// Channel list cache (60s TTL) — avoids repeated GROUP BY scans (#762)
|
||||
channelsCacheMu sync.Mutex
|
||||
channelsCacheKey string
|
||||
channelsCacheRes []map[string]interface{}
|
||||
channelsCacheExp time.Time
|
||||
}
|
||||
|
||||
// OpenDB opens a read-only SQLite connection with WAL mode.
|
||||
@@ -1158,6 +1165,16 @@ func (db *DB) GetChannels(region ...string) ([]map[string]interface{}, error) {
|
||||
if len(region) > 0 {
|
||||
regionParam = region[0]
|
||||
}
|
||||
|
||||
// Check cache (60s TTL)
|
||||
db.channelsCacheMu.Lock()
|
||||
if db.channelsCacheRes != nil && db.channelsCacheKey == regionParam && time.Now().Before(db.channelsCacheExp) {
|
||||
res := db.channelsCacheRes
|
||||
db.channelsCacheMu.Unlock()
|
||||
return res, nil
|
||||
}
|
||||
db.channelsCacheMu.Unlock()
|
||||
|
||||
regionCodes := normalizeRegionCodes(regionParam)
|
||||
|
||||
var querySQL string
|
||||
@@ -1171,27 +1188,54 @@ func (db *DB) GetChannels(region ...string) ([]map[string]interface{}, error) {
|
||||
}
|
||||
regionPlaceholder := strings.Join(placeholders, ",")
|
||||
if db.isV3 {
|
||||
querySQL = fmt.Sprintf(`SELECT DISTINCT t.decoded_json, t.first_seen
|
||||
querySQL = fmt.Sprintf(`SELECT t.channel_hash,
|
||||
COUNT(*) AS msg_count,
|
||||
MAX(t.first_seen) AS last_activity,
|
||||
(SELECT t2.decoded_json FROM transmissions t2
|
||||
WHERE t2.channel_hash = t.channel_hash AND t2.payload_type = 5
|
||||
ORDER BY t2.first_seen DESC LIMIT 1) AS sample_json
|
||||
FROM transmissions t
|
||||
JOIN observations o ON o.transmission_id = t.id
|
||||
LEFT JOIN observers obs ON obs.rowid = o.observer_idx
|
||||
WHERE t.payload_type = 5
|
||||
AND t.channel_hash IS NOT NULL
|
||||
AND t.channel_hash NOT LIKE 'enc_%%'
|
||||
AND obs.rowid IS NOT NULL AND UPPER(TRIM(obs.iata)) IN (%s)
|
||||
ORDER BY t.first_seen ASC`, regionPlaceholder)
|
||||
GROUP BY t.channel_hash
|
||||
ORDER BY last_activity DESC`, regionPlaceholder)
|
||||
} else {
|
||||
querySQL = fmt.Sprintf(`SELECT DISTINCT t.decoded_json, t.first_seen
|
||||
querySQL = fmt.Sprintf(`SELECT t.channel_hash,
|
||||
COUNT(*) AS msg_count,
|
||||
MAX(t.first_seen) AS last_activity,
|
||||
(SELECT t2.decoded_json FROM transmissions t2
|
||||
WHERE t2.channel_hash = t.channel_hash AND t2.payload_type = 5
|
||||
ORDER BY t2.first_seen DESC LIMIT 1) AS sample_json
|
||||
FROM transmissions t
|
||||
JOIN observations o ON o.transmission_id = t.id
|
||||
WHERE t.payload_type = 5
|
||||
AND t.channel_hash IS NOT NULL
|
||||
AND t.channel_hash NOT LIKE 'enc_%%'
|
||||
AND EXISTS (
|
||||
SELECT 1 FROM observers obs
|
||||
WHERE obs.id = o.observer_id
|
||||
AND UPPER(TRIM(obs.iata)) IN (%s)
|
||||
)
|
||||
ORDER BY t.first_seen ASC`, regionPlaceholder)
|
||||
GROUP BY t.channel_hash
|
||||
ORDER BY last_activity DESC`, regionPlaceholder)
|
||||
}
|
||||
} else {
|
||||
querySQL = `SELECT decoded_json, first_seen FROM transmissions WHERE payload_type = 5 ORDER BY first_seen ASC`
|
||||
querySQL = `SELECT channel_hash,
|
||||
COUNT(*) AS msg_count,
|
||||
MAX(first_seen) AS last_activity,
|
||||
(SELECT t2.decoded_json FROM transmissions t2
|
||||
WHERE t2.channel_hash = t.channel_hash AND t2.payload_type = 5
|
||||
ORDER BY t2.first_seen DESC LIMIT 1) AS sample_json
|
||||
FROM transmissions t
|
||||
WHERE payload_type = 5
|
||||
AND channel_hash IS NOT NULL
|
||||
AND channel_hash NOT LIKE 'enc_%%'
|
||||
GROUP BY channel_hash
|
||||
ORDER BY last_activity DESC`
|
||||
}
|
||||
|
||||
rows, err := db.conn.Query(querySQL, args...)
|
||||
@@ -1200,68 +1244,55 @@ func (db *DB) GetChannels(region ...string) ([]map[string]interface{}, error) {
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
channelMap := map[string]map[string]interface{}{}
|
||||
channels := make([]map[string]interface{}, 0)
|
||||
for rows.Next() {
|
||||
var dj, fs sql.NullString
|
||||
rows.Scan(&dj, &fs)
|
||||
if !dj.Valid {
|
||||
var chHash, lastActivity, sampleJSON sql.NullString
|
||||
var msgCount int
|
||||
if err := rows.Scan(&chHash, &msgCount, &lastActivity, &sampleJSON); err != nil {
|
||||
continue
|
||||
}
|
||||
var decoded map[string]interface{}
|
||||
if json.Unmarshal([]byte(dj.String), &decoded) != nil {
|
||||
continue
|
||||
}
|
||||
dtype, _ := decoded["type"].(string)
|
||||
if dtype != "CHAN" {
|
||||
continue
|
||||
}
|
||||
// Filter out garbage-decrypted channel names/messages (pre-#197 data still in DB)
|
||||
chanStr, _ := decoded["channel"].(string)
|
||||
textStr, _ := decoded["text"].(string)
|
||||
if hasGarbageChars(chanStr) || hasGarbageChars(textStr) {
|
||||
continue
|
||||
}
|
||||
channelName, _ := decoded["channel"].(string)
|
||||
channelName := nullStr(chHash)
|
||||
if channelName == "" {
|
||||
channelName = "unknown"
|
||||
continue
|
||||
}
|
||||
key := channelName
|
||||
|
||||
ch, exists := channelMap[key]
|
||||
if !exists {
|
||||
ch = map[string]interface{}{
|
||||
"hash": key, "name": channelName,
|
||||
"lastMessage": nil, "lastSender": nil,
|
||||
"messageCount": 0, "lastActivity": nullStr(fs),
|
||||
}
|
||||
channelMap[key] = ch
|
||||
}
|
||||
ch["messageCount"] = ch["messageCount"].(int) + 1
|
||||
if fs.Valid {
|
||||
ch["lastActivity"] = fs.String
|
||||
}
|
||||
if text, ok := decoded["text"].(string); ok && text != "" {
|
||||
idx := strings.Index(text, ": ")
|
||||
if idx > 0 {
|
||||
ch["lastMessage"] = text[idx+2:]
|
||||
} else {
|
||||
ch["lastMessage"] = text
|
||||
}
|
||||
if sender, ok := decoded["sender"].(string); ok {
|
||||
ch["lastSender"] = sender
|
||||
var lastMessage, lastSender interface{}
|
||||
if sampleJSON.Valid {
|
||||
var decoded map[string]interface{}
|
||||
if json.Unmarshal([]byte(sampleJSON.String), &decoded) == nil {
|
||||
if text, ok := decoded["text"].(string); ok && text != "" {
|
||||
idx := strings.Index(text, ": ")
|
||||
if idx > 0 {
|
||||
lastMessage = text[idx+2:]
|
||||
} else {
|
||||
lastMessage = text
|
||||
}
|
||||
if sender, ok := decoded["sender"].(string); ok {
|
||||
lastSender = sender
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
channels = append(channels, map[string]interface{}{
|
||||
"hash": channelName, "name": channelName,
|
||||
"lastMessage": lastMessage, "lastSender": lastSender,
|
||||
"messageCount": msgCount, "lastActivity": nullStr(lastActivity),
|
||||
})
|
||||
}
|
||||
|
||||
channels := make([]map[string]interface{}, 0, len(channelMap))
|
||||
for _, ch := range channelMap {
|
||||
channels = append(channels, ch)
|
||||
}
|
||||
// Store in cache (60s TTL)
|
||||
db.channelsCacheMu.Lock()
|
||||
db.channelsCacheRes = channels
|
||||
db.channelsCacheKey = regionParam
|
||||
db.channelsCacheExp = time.Now().Add(60 * time.Second)
|
||||
db.channelsCacheMu.Unlock()
|
||||
|
||||
return channels, nil
|
||||
}
|
||||
|
||||
// GetEncryptedChannels returns channels where all messages are undecryptable (no key).
|
||||
// These have decoded_json with type "GRP_TXT" and decryptionStatus "no_key".
|
||||
// Uses channel_hash column (prefixed with 'enc_') for fast grouped queries.
|
||||
func (db *DB) GetEncryptedChannels(region ...string) ([]map[string]interface{}, error) {
|
||||
regionParam := ""
|
||||
if len(region) > 0 {
|
||||
@@ -1280,27 +1311,42 @@ func (db *DB) GetEncryptedChannels(region ...string) ([]map[string]interface{},
|
||||
}
|
||||
regionPlaceholder := strings.Join(placeholders, ",")
|
||||
if db.isV3 {
|
||||
querySQL = fmt.Sprintf(`SELECT DISTINCT t.decoded_json, t.first_seen
|
||||
querySQL = fmt.Sprintf(`SELECT t.channel_hash,
|
||||
COUNT(*) AS msg_count,
|
||||
MAX(t.first_seen) AS last_activity
|
||||
FROM transmissions t
|
||||
JOIN observations o ON o.transmission_id = t.id
|
||||
LEFT JOIN observers obs ON obs.rowid = o.observer_idx
|
||||
WHERE t.payload_type = 5
|
||||
AND t.channel_hash LIKE 'enc_%%'
|
||||
AND obs.rowid IS NOT NULL AND UPPER(TRIM(obs.iata)) IN (%s)
|
||||
ORDER BY t.first_seen ASC`, regionPlaceholder)
|
||||
GROUP BY t.channel_hash
|
||||
ORDER BY last_activity DESC`, regionPlaceholder)
|
||||
} else {
|
||||
querySQL = fmt.Sprintf(`SELECT DISTINCT t.decoded_json, t.first_seen
|
||||
querySQL = fmt.Sprintf(`SELECT t.channel_hash,
|
||||
COUNT(*) AS msg_count,
|
||||
MAX(t.first_seen) AS last_activity
|
||||
FROM transmissions t
|
||||
JOIN observations o ON o.transmission_id = t.id
|
||||
WHERE t.payload_type = 5
|
||||
AND t.channel_hash LIKE 'enc_%%'
|
||||
AND EXISTS (
|
||||
SELECT 1 FROM observers obs
|
||||
WHERE obs.id = o.observer_id
|
||||
AND UPPER(TRIM(obs.iata)) IN (%s)
|
||||
)
|
||||
ORDER BY t.first_seen ASC`, regionPlaceholder)
|
||||
GROUP BY t.channel_hash
|
||||
ORDER BY last_activity DESC`, regionPlaceholder)
|
||||
}
|
||||
} else {
|
||||
querySQL = `SELECT decoded_json, first_seen FROM transmissions WHERE payload_type = 5 ORDER BY first_seen ASC`
|
||||
querySQL = `SELECT channel_hash,
|
||||
COUNT(*) AS msg_count,
|
||||
MAX(first_seen) AS last_activity
|
||||
FROM transmissions
|
||||
WHERE payload_type = 5
|
||||
AND channel_hash LIKE 'enc_%%'
|
||||
GROUP BY channel_hash
|
||||
ORDER BY last_activity DESC`
|
||||
}
|
||||
|
||||
rows, err := db.conn.Query(querySQL, args...)
|
||||
@@ -1309,64 +1355,22 @@ func (db *DB) GetEncryptedChannels(region ...string) ([]map[string]interface{},
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
type encChanInfo struct {
|
||||
hash string
|
||||
messageCount int
|
||||
lastActivity string
|
||||
}
|
||||
channelMap := map[string]*encChanInfo{}
|
||||
|
||||
channels := make([]map[string]interface{}, 0)
|
||||
for rows.Next() {
|
||||
var dj, fs sql.NullString
|
||||
if err := rows.Scan(&dj, &fs); err != nil { continue }
|
||||
if !dj.Valid {
|
||||
var chHash, lastActivity sql.NullString
|
||||
var msgCount int
|
||||
if err := rows.Scan(&chHash, &msgCount, &lastActivity); err != nil {
|
||||
continue
|
||||
}
|
||||
var decoded map[string]interface{}
|
||||
if json.Unmarshal([]byte(dj.String), &decoded) != nil {
|
||||
continue
|
||||
}
|
||||
dtype, _ := decoded["type"].(string)
|
||||
// Only include undecryptable GRP_TXT packets (not CHAN)
|
||||
if dtype != "GRP_TXT" {
|
||||
continue
|
||||
}
|
||||
ds, _ := decoded["decryptionStatus"].(string)
|
||||
if ds != "no_key" {
|
||||
continue
|
||||
}
|
||||
// Group by channelHashHex
|
||||
chHash, _ := decoded["channelHashHex"].(string)
|
||||
if chHash == "" {
|
||||
if chNum, ok := decoded["channelHash"].(float64); ok {
|
||||
chHash = fmt.Sprintf("%02X", int(chNum))
|
||||
}
|
||||
}
|
||||
if chHash == "" {
|
||||
chHash = "?"
|
||||
}
|
||||
key := chHash
|
||||
|
||||
ch, exists := channelMap[key]
|
||||
if !exists {
|
||||
ch = &encChanInfo{hash: key, lastActivity: nullStrVal(fs)}
|
||||
channelMap[key] = ch
|
||||
}
|
||||
ch.messageCount++
|
||||
if fs.Valid && fs.String > ch.lastActivity {
|
||||
ch.lastActivity = fs.String
|
||||
}
|
||||
}
|
||||
|
||||
channels := make([]map[string]interface{}, 0, len(channelMap))
|
||||
for _, ch := range channelMap {
|
||||
fullHash := nullStrVal(chHash) // e.g. "enc_3A"
|
||||
hexPart := strings.TrimPrefix(fullHash, "enc_")
|
||||
channels = append(channels, map[string]interface{}{
|
||||
"hash": "enc_" + ch.hash,
|
||||
"name": "Encrypted (0x" + ch.hash + ")",
|
||||
"hash": fullHash,
|
||||
"name": "Encrypted (0x" + hexPart + ")",
|
||||
"lastMessage": nil,
|
||||
"lastSender": nil,
|
||||
"messageCount": ch.messageCount,
|
||||
"lastActivity": ch.lastActivity,
|
||||
"messageCount": msgCount,
|
||||
"lastActivity": nullStr(lastActivity),
|
||||
"encrypted": true,
|
||||
})
|
||||
}
|
||||
@@ -1397,15 +1401,16 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region .
|
||||
regionPlaceholders = strings.Join(placeholders, ",")
|
||||
}
|
||||
|
||||
// Fetch messages with channel_hash filter (pagination applied in Go after dedup)
|
||||
var querySQL string
|
||||
args := make([]interface{}, 0, len(regionArgs))
|
||||
args := []interface{}{channelHash}
|
||||
if db.isV3 {
|
||||
querySQL = `SELECT o.id, t.hash, t.decoded_json, t.first_seen,
|
||||
obs.id, obs.name, o.snr, o.path_json
|
||||
FROM observations o
|
||||
JOIN transmissions t ON t.id = o.transmission_id
|
||||
LEFT JOIN observers obs ON obs.rowid = o.observer_idx
|
||||
WHERE t.payload_type = 5`
|
||||
WHERE t.channel_hash = ? AND t.payload_type = 5`
|
||||
if len(regionCodes) > 0 {
|
||||
querySQL += fmt.Sprintf(" AND obs.rowid IS NOT NULL AND UPPER(TRIM(obs.iata)) IN (%s)", regionPlaceholders)
|
||||
args = append(args, regionArgs...)
|
||||
@@ -1417,14 +1422,11 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region .
|
||||
o.observer_id, o.observer_name, o.snr, o.path_json
|
||||
FROM observations o
|
||||
JOIN transmissions t ON t.id = o.transmission_id
|
||||
WHERE t.payload_type = 5`
|
||||
WHERE t.channel_hash = ? AND t.payload_type = 5`
|
||||
if len(regionCodes) > 0 {
|
||||
querySQL += fmt.Sprintf(` AND EXISTS (
|
||||
SELECT 1
|
||||
FROM observers obs
|
||||
WHERE obs.id = o.observer_id
|
||||
AND UPPER(TRIM(obs.iata)) IN (%s)
|
||||
)`, regionPlaceholders)
|
||||
SELECT 1 FROM observers obs WHERE obs.id = o.observer_id
|
||||
AND UPPER(TRIM(obs.iata)) IN (%s))`, regionPlaceholders)
|
||||
args = append(args, regionArgs...)
|
||||
}
|
||||
querySQL += `
|
||||
@@ -1456,17 +1458,6 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region .
|
||||
if json.Unmarshal([]byte(dj.String), &decoded) != nil {
|
||||
continue
|
||||
}
|
||||
dtype, _ := decoded["type"].(string)
|
||||
if dtype != "CHAN" {
|
||||
continue
|
||||
}
|
||||
ch, _ := decoded["channel"].(string)
|
||||
if ch == "" {
|
||||
ch = "unknown"
|
||||
}
|
||||
if ch != channelHash {
|
||||
continue
|
||||
}
|
||||
|
||||
text, _ := decoded["text"].(string)
|
||||
sender, _ := decoded["sender"].(string)
|
||||
@@ -1526,18 +1517,18 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region .
|
||||
}
|
||||
}
|
||||
|
||||
total := len(msgOrder)
|
||||
// Return latest messages (tail)
|
||||
start := total - limit - offset
|
||||
// Return latest messages (tail) with pagination
|
||||
msgTotal := len(msgOrder)
|
||||
start := msgTotal - limit - offset
|
||||
if start < 0 {
|
||||
start = 0
|
||||
}
|
||||
end := total - offset
|
||||
end := msgTotal - offset
|
||||
if end < 0 {
|
||||
end = 0
|
||||
}
|
||||
if end > total {
|
||||
end = total
|
||||
if end > msgTotal {
|
||||
end = msgTotal
|
||||
}
|
||||
|
||||
messages := make([]map[string]interface{}, 0)
|
||||
@@ -1548,7 +1539,7 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region .
|
||||
messages = append(messages, m.Data)
|
||||
}
|
||||
|
||||
return messages, total, nil
|
||||
return messages, msgTotal, nil
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -60,6 +60,7 @@ func setupTestDB(t *testing.T) *DB {
|
||||
payload_type INTEGER,
|
||||
payload_version INTEGER,
|
||||
decoded_json TEXT,
|
||||
channel_hash TEXT DEFAULT NULL,
|
||||
created_at TEXT DEFAULT (datetime('now'))
|
||||
);
|
||||
|
||||
@@ -124,10 +125,10 @@ func seedTestData(t *testing.T, db *DB) {
|
||||
VALUES ('1122334455667788', 'TestRoom', 'room', 37.4, -121.9, ?, '2026-01-01T00:00:00Z', 5)`, twoDaysAgo)
|
||||
|
||||
// Seed transmissions
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
|
||||
VALUES ('AABB', 'abc123def4567890', ?, 1, 4, '{"pubKey":"aabbccdd11223344","name":"TestRepeater","type":"ADVERT","timestamp":1700000000,"timestampISO":"2023-11-14T22:13:20.000Z","signature":"abcdef","flags":{"isRepeater":true},"lat":37.5,"lon":-122.0}')`, recent)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
|
||||
VALUES ('CCDD', '1234567890abcdef', ?, 1, 5, '{"type":"CHAN","channel":"#test","text":"Hello: World","sender":"TestUser"}')`, yesterday)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash)
|
||||
VALUES ('AABB', 'abc123def4567890', ?, 1, 4, '{"pubKey":"aabbccdd11223344","name":"TestRepeater","type":"ADVERT","timestamp":1700000000,"timestampISO":"2023-11-14T22:13:20.000Z","signature":"abcdef","flags":{"isRepeater":true},"lat":37.5,"lon":-122.0}', '#test')`, recent)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash)
|
||||
VALUES ('CCDD', '1234567890abcdef', ?, 1, 5, '{"type":"CHAN","channel":"#test","text":"Hello: World","sender":"TestUser"}', '#test')`, yesterday)
|
||||
// Second ADVERT for same node with different hash_size (raw_hex byte 0x1F → hs=1 vs 0xBB → hs=3)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
|
||||
VALUES ('AA1F', 'def456abc1230099', ?, 1, 4, '{"pubKey":"aabbccdd11223344","name":"TestRepeater","type":"ADVERT","timestamp":1700000100,"timestampISO":"2023-11-14T22:14:40.000Z","signature":"fedcba","flags":{"isRepeater":true},"lat":37.5,"lon":-122.0}')`, yesterday)
|
||||
@@ -735,12 +736,12 @@ func TestGetChannelMessagesRegionFiltering(t *testing.T) {
|
||||
|
||||
db.conn.Exec(`INSERT INTO observers (id, name, iata) VALUES ('obs1', 'Observer One', 'SJC')`)
|
||||
db.conn.Exec(`INSERT INTO observers (id, name, iata) VALUES ('obs2', 'Observer Two', ' sfo ')`)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash)
|
||||
VALUES ('AA', 'chanregion0001', ?, 1, 5,
|
||||
'{"type":"CHAN","channel":"#region","text":"SjcUser: One","sender":"SjcUser"}')`, ts1)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
|
||||
'{"type":"CHAN","channel":"#region","text":"SjcUser: One","sender":"SjcUser"}', '#region')`, ts1)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash)
|
||||
VALUES ('BB', 'chanregion0002', ?, 1, 5,
|
||||
'{"type":"CHAN","channel":"#region","text":"SfoUser: Two","sender":"SfoUser"}')`, ts2)
|
||||
'{"type":"CHAN","channel":"#region","text":"SfoUser: Two","sender":"SfoUser"}', '#region')`, ts2)
|
||||
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
|
||||
VALUES (1, 1, 10.0, -90, '[]', ?)`, epoch1)
|
||||
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
|
||||
@@ -1119,6 +1120,7 @@ func setupTestDBV2(t *testing.T) *DB {
|
||||
payload_type INTEGER,
|
||||
payload_version INTEGER,
|
||||
decoded_json TEXT,
|
||||
channel_hash TEXT DEFAULT NULL,
|
||||
created_at TEXT DEFAULT (datetime('now'))
|
||||
);
|
||||
|
||||
@@ -1202,12 +1204,12 @@ func TestGetChannelMessagesDedup(t *testing.T) {
|
||||
db.conn.Exec(`INSERT INTO observers (id, name, iata) VALUES ('obs2', 'Observer Two', 'SFO')`)
|
||||
|
||||
// Insert two transmissions with same hash to test dedup
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash)
|
||||
VALUES ('AA', 'chanmsg00000001', '2026-01-15T10:00:00Z', 1, 5,
|
||||
'{"type":"CHAN","channel":"#general","text":"User1: Hello","sender":"User1"}')`)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
|
||||
'{"type":"CHAN","channel":"#general","text":"User1: Hello","sender":"User1"}', '#general')`)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash)
|
||||
VALUES ('BB', 'chanmsg00000002', '2026-01-15T10:01:00Z', 1, 5,
|
||||
'{"type":"CHAN","channel":"#general","text":"User2: World","sender":"User2"}')`)
|
||||
'{"type":"CHAN","channel":"#general","text":"User2: World","sender":"User2"}', '#general')`)
|
||||
|
||||
// Observations: first msg seen by two observers (dedup), second by one
|
||||
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
|
||||
@@ -1251,9 +1253,9 @@ func TestGetChannelMessagesNoSender(t *testing.T) {
|
||||
defer db.Close()
|
||||
|
||||
db.conn.Exec(`INSERT INTO observers (id, name, iata) VALUES ('obs1', 'Observer One', 'SJC')`)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash)
|
||||
VALUES ('CC', 'chanmsg00000003', '2026-01-15T10:02:00Z', 1, 5,
|
||||
'{"type":"CHAN","channel":"#noname","text":"plain text no colon"}')`)
|
||||
'{"type":"CHAN","channel":"#noname","text":"plain text no colon"}', '#noname')`)
|
||||
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
|
||||
VALUES (1, 1, 12.0, -90, null, 1736935300)`)
|
||||
|
||||
@@ -1356,9 +1358,9 @@ func TestGetChannelMessagesObserverFallback(t *testing.T) {
|
||||
defer db.Close()
|
||||
|
||||
// Observer with ID but no name entry (observer_idx won't match)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash)
|
||||
VALUES ('AA', 'chanmsg00000004', '2026-01-15T10:00:00Z', 1, 5,
|
||||
'{"type":"CHAN","channel":"#obs","text":"Sender: Test","sender":"Sender"}')`)
|
||||
'{"type":"CHAN","channel":"#obs","text":"Sender: Test","sender":"Sender"}', '#obs')`)
|
||||
// Observation without observer (observer_idx = NULL)
|
||||
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
|
||||
VALUES (1, NULL, 12.0, -90, null, 1736935200)`)
|
||||
@@ -1380,12 +1382,12 @@ func TestGetChannelsMultiple(t *testing.T) {
|
||||
defer db.Close()
|
||||
|
||||
db.conn.Exec(`INSERT INTO observers (id, name, iata) VALUES ('obs1', 'Observer', 'SJC')`)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash)
|
||||
VALUES ('AA', 'chan1hash', '2026-01-15T10:00:00Z', 1, 5,
|
||||
'{"type":"CHAN","channel":"#alpha","text":"Alice: Hello","sender":"Alice"}')`)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
|
||||
'{"type":"CHAN","channel":"#alpha","text":"Alice: Hello","sender":"Alice"}', '#alpha')`)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash)
|
||||
VALUES ('BB', 'chan2hash', '2026-01-15T10:01:00Z', 1, 5,
|
||||
'{"type":"CHAN","channel":"#beta","text":"Bob: World","sender":"Bob"}')`)
|
||||
'{"type":"CHAN","channel":"#beta","text":"Bob: World","sender":"Bob"}', '#beta')`)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
|
||||
VALUES ('CC', 'chan3hash', '2026-01-15T10:02:00Z', 1, 5,
|
||||
'{"type":"CHAN","channel":"","text":"No channel"}')`)
|
||||
@@ -1468,13 +1470,13 @@ func TestGetChannelsStaleMessage(t *testing.T) {
|
||||
db.conn.Exec(`INSERT INTO observers (id, name, iata) VALUES ('obs2', 'Observer2', 'SFO')`)
|
||||
|
||||
// Older message (first_seen T1)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash)
|
||||
VALUES ('AA', 'oldhash1', '2026-01-15T10:00:00Z', 1, 5,
|
||||
'{"type":"CHAN","channel":"#test","text":"Alice: Old message","sender":"Alice"}')`)
|
||||
'{"type":"CHAN","channel":"#test","text":"Alice: Old message","sender":"Alice"}', '#test')`)
|
||||
// Newer message (first_seen T2 > T1)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash)
|
||||
VALUES ('BB', 'newhash2', '2026-01-15T10:05:00Z', 1, 5,
|
||||
'{"type":"CHAN","channel":"#test","text":"Bob: New message","sender":"Bob"}')`)
|
||||
'{"type":"CHAN","channel":"#test","text":"Bob: New message","sender":"Bob"}', '#test')`)
|
||||
|
||||
// Observations: older message re-observed AFTER newer message (stale scenario)
|
||||
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, timestamp)
|
||||
@@ -1512,16 +1514,16 @@ func TestGetChannelsRegionFiltering(t *testing.T) {
|
||||
db.conn.Exec(`INSERT INTO observers (id, name, iata) VALUES ('obs2', 'Observer2', 'SFO')`)
|
||||
|
||||
// Channel message seen only in SJC
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash)
|
||||
VALUES ('AA', 'hash1', '2026-01-15T10:00:00Z', 1, 5,
|
||||
'{"type":"CHAN","channel":"#sjc-only","text":"Alice: Hello SJC","sender":"Alice"}')`)
|
||||
'{"type":"CHAN","channel":"#sjc-only","text":"Alice: Hello SJC","sender":"Alice"}', '#sjc-only')`)
|
||||
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, timestamp)
|
||||
VALUES (1, 1, 12.0, -90, 1736935200)`)
|
||||
|
||||
// Channel message seen only in SFO
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash)
|
||||
VALUES ('BB', 'hash2', '2026-01-15T10:05:00Z', 1, 5,
|
||||
'{"type":"CHAN","channel":"#sfo-only","text":"Bob: Hello SFO","sender":"Bob"}')`)
|
||||
'{"type":"CHAN","channel":"#sfo-only","text":"Bob: Hello SFO","sender":"Bob"}', '#sfo-only')`)
|
||||
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, timestamp)
|
||||
VALUES (2, 2, 14.0, -88, 1736935500)`)
|
||||
|
||||
|
||||
@@ -15,10 +15,10 @@ func seedEncryptedChannelData(t *testing.T, db *DB) {
|
||||
recentEpoch := now.Add(-1 * time.Hour).Unix()
|
||||
|
||||
// Two encrypted GRP_TXT packets on channel hash "A1B2"
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
|
||||
VALUES ('EE01', 'enc_hash_001', ?, 1, 5, '{"type":"GRP_TXT","channelHashHex":"A1B2","decryptionStatus":"no_key"}')`, recent)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json)
|
||||
VALUES ('EE02', 'enc_hash_002', ?, 1, 5, '{"type":"GRP_TXT","channelHashHex":"A1B2","decryptionStatus":"no_key"}')`, recent)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash)
|
||||
VALUES ('EE01', 'enc_hash_001', ?, 1, 5, '{"type":"GRP_TXT","channelHashHex":"A1B2","decryptionStatus":"no_key"}', 'enc_A1B2')`, recent)
|
||||
db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash)
|
||||
VALUES ('EE02', 'enc_hash_002', ?, 1, 5, '{"type":"GRP_TXT","channelHashHex":"A1B2","decryptionStatus":"no_key"}', 'enc_A1B2')`, recent)
|
||||
|
||||
// Observations for both
|
||||
db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp)
|
||||
|
||||
@@ -27,7 +27,7 @@ func createTestDBWithSchema(t *testing.T) (*DB, string) {
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
raw_hex TEXT, hash TEXT UNIQUE, first_seen TEXT,
|
||||
route_type INTEGER, payload_type INTEGER, payload_version INTEGER,
|
||||
decoded_json TEXT
|
||||
decoded_json TEXT, channel_hash TEXT DEFAULT NULL
|
||||
)`)
|
||||
conn.Exec(`CREATE TABLE observers (
|
||||
id TEXT PRIMARY KEY, name TEXT, iata TEXT
|
||||
|
||||
Binary file not shown.
Reference in New Issue
Block a user