diff --git a/cmd/ingestor/db.go b/cmd/ingestor/db.go index cd5992e..439ef1a 100644 --- a/cmd/ingestor/db.go +++ b/cmd/ingestor/db.go @@ -345,6 +345,28 @@ func applySchema(db *sql.DB) error { log.Println("[migration] packets_sent/packets_recv columns added") } + // Migration: add channel_hash column for fast channel queries (#762) + row = db.QueryRow("SELECT 1 FROM _migrations WHERE name = 'channel_hash_v1'") + if row.Scan(&migDone) != nil { + log.Println("[migration] Adding channel_hash column to transmissions...") + db.Exec(`ALTER TABLE transmissions ADD COLUMN channel_hash TEXT DEFAULT NULL`) + db.Exec(`CREATE INDEX IF NOT EXISTS idx_tx_channel_hash ON transmissions(channel_hash) WHERE payload_type = 5`) + // Backfill: extract channel name for decrypted (CHAN) packets + res, err := db.Exec(`UPDATE transmissions SET channel_hash = json_extract(decoded_json, '$.channel') WHERE payload_type = 5 AND channel_hash IS NULL AND json_extract(decoded_json, '$.type') = 'CHAN'`) + if err == nil { + n, _ := res.RowsAffected() + log.Printf("[migration] Backfilled channel_hash for %d CHAN packets", n) + } + // Backfill: extract channelHashHex for encrypted (GRP_TXT) packets, prefixed with 'enc_' + res, err = db.Exec(`UPDATE transmissions SET channel_hash = 'enc_' || json_extract(decoded_json, '$.channelHashHex') WHERE payload_type = 5 AND channel_hash IS NULL AND json_extract(decoded_json, '$.type') = 'GRP_TXT'`) + if err == nil { + n, _ := res.RowsAffected() + log.Printf("[migration] Backfilled channel_hash for %d GRP_TXT packets", n) + } + db.Exec(`INSERT INTO _migrations (name) VALUES ('channel_hash_v1')`) + log.Println("[migration] channel_hash column added and backfilled") + } + return nil } @@ -357,8 +379,8 @@ func (s *Store) prepareStatements() error { } s.stmtInsertTransmission, err = s.db.Prepare(` - INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json) - VALUES (?, ?, ?, ?, ?, ?, ?) + INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, payload_version, decoded_json, channel_hash) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) `) if err != nil { return err @@ -481,7 +503,7 @@ func (s *Store) InsertTransmission(data *PacketData) (bool, error) { result, err := s.stmtInsertTransmission.Exec( data.RawHex, hash, now, data.RouteType, data.PayloadType, data.PayloadVersion, - data.DecodedJSON, + data.DecodedJSON, nilIfEmpty(data.ChannelHash), ) if err != nil { s.Stats.WriteErrors.Add(1) @@ -773,6 +795,15 @@ type PacketData struct { PayloadVersion int PathJSON string DecodedJSON string + ChannelHash string // grouping key for channel queries (#762) +} + +// nilIfEmpty returns nil for empty strings (for nullable DB columns). +func nilIfEmpty(s string) interface{} { + if s == "" { + return nil + } + return s } // MQTTPacketMessage is the JSON payload from an MQTT raw packet message. @@ -794,7 +825,7 @@ func BuildPacketData(msg *MQTTPacketMessage, decoded *DecodedPacket, observerID, pathJSON = string(b) } - return &PacketData{ + pd := &PacketData{ RawHex: msg.Raw, Timestamp: now, ObserverID: observerID, @@ -810,4 +841,15 @@ func BuildPacketData(msg *MQTTPacketMessage, decoded *DecodedPacket, observerID, PathJSON: pathJSON, DecodedJSON: PayloadJSON(&decoded.Payload), } + + // Populate channel_hash for fast channel queries (#762) + if decoded.Header.PayloadType == PayloadGRP_TXT { + if decoded.Payload.Type == "CHAN" && decoded.Payload.Channel != "" { + pd.ChannelHash = decoded.Payload.Channel + } else if decoded.Payload.Type == "GRP_TXT" && decoded.Payload.ChannelHashHex != "" { + pd.ChannelHash = "enc_" + decoded.Payload.ChannelHashHex + } + } + + return pd } diff --git a/cmd/ingestor/main.go b/cmd/ingestor/main.go index be333a8..d1da305 100644 --- a/cmd/ingestor/main.go +++ b/cmd/ingestor/main.go @@ -440,6 +440,7 @@ func handleMessage(store *Store, tag string, source MQTTSource, m mqtt.Message, PayloadType: 5, // GRP_TXT PathJSON: "[]", DecodedJSON: string(decodedJSON), + ChannelHash: channelName, // fast channel queries (#762) } if _, err := store.InsertTransmission(pktData); err != nil { diff --git a/cmd/server/coverage_test.go b/cmd/server/coverage_test.go index 3b8f15e..09f5a69 100644 --- a/cmd/server/coverage_test.go +++ b/cmd/server/coverage_test.go @@ -41,7 +41,7 @@ func setupTestDBv2(t *testing.T) *DB { id INTEGER PRIMARY KEY AUTOINCREMENT, raw_hex TEXT NOT NULL, hash TEXT NOT NULL UNIQUE, first_seen TEXT NOT NULL, route_type INTEGER, payload_type INTEGER, payload_version INTEGER, - decoded_json TEXT, created_at TEXT DEFAULT (datetime('now')) + decoded_json TEXT, channel_hash TEXT DEFAULT NULL, created_at TEXT DEFAULT (datetime('now')) ); CREATE TABLE observations ( id INTEGER PRIMARY KEY AUTOINCREMENT, diff --git a/cmd/server/db.go b/cmd/server/db.go index e3d8363..227a199 100644 --- a/cmd/server/db.go +++ b/cmd/server/db.go @@ -8,6 +8,7 @@ import ( "math" "os" "strings" + "sync" "time" _ "modernc.org/sqlite" @@ -19,6 +20,12 @@ type DB struct { path string // filesystem path to the database file isV3 bool // v3 schema: observer_idx in observations (vs observer_id in v2) hasResolvedPath bool // observations table has resolved_path column + + // Channel list cache (60s TTL) — avoids repeated GROUP BY scans (#762) + channelsCacheMu sync.Mutex + channelsCacheKey string + channelsCacheRes []map[string]interface{} + channelsCacheExp time.Time } // OpenDB opens a read-only SQLite connection with WAL mode. @@ -1158,6 +1165,16 @@ func (db *DB) GetChannels(region ...string) ([]map[string]interface{}, error) { if len(region) > 0 { regionParam = region[0] } + + // Check cache (60s TTL) + db.channelsCacheMu.Lock() + if db.channelsCacheRes != nil && db.channelsCacheKey == regionParam && time.Now().Before(db.channelsCacheExp) { + res := db.channelsCacheRes + db.channelsCacheMu.Unlock() + return res, nil + } + db.channelsCacheMu.Unlock() + regionCodes := normalizeRegionCodes(regionParam) var querySQL string @@ -1171,27 +1188,54 @@ func (db *DB) GetChannels(region ...string) ([]map[string]interface{}, error) { } regionPlaceholder := strings.Join(placeholders, ",") if db.isV3 { - querySQL = fmt.Sprintf(`SELECT DISTINCT t.decoded_json, t.first_seen + querySQL = fmt.Sprintf(`SELECT t.channel_hash, + COUNT(*) AS msg_count, + MAX(t.first_seen) AS last_activity, + (SELECT t2.decoded_json FROM transmissions t2 + WHERE t2.channel_hash = t.channel_hash AND t2.payload_type = 5 + ORDER BY t2.first_seen DESC LIMIT 1) AS sample_json FROM transmissions t JOIN observations o ON o.transmission_id = t.id LEFT JOIN observers obs ON obs.rowid = o.observer_idx WHERE t.payload_type = 5 + AND t.channel_hash IS NOT NULL + AND t.channel_hash NOT LIKE 'enc_%%' AND obs.rowid IS NOT NULL AND UPPER(TRIM(obs.iata)) IN (%s) - ORDER BY t.first_seen ASC`, regionPlaceholder) + GROUP BY t.channel_hash + ORDER BY last_activity DESC`, regionPlaceholder) } else { - querySQL = fmt.Sprintf(`SELECT DISTINCT t.decoded_json, t.first_seen + querySQL = fmt.Sprintf(`SELECT t.channel_hash, + COUNT(*) AS msg_count, + MAX(t.first_seen) AS last_activity, + (SELECT t2.decoded_json FROM transmissions t2 + WHERE t2.channel_hash = t.channel_hash AND t2.payload_type = 5 + ORDER BY t2.first_seen DESC LIMIT 1) AS sample_json FROM transmissions t JOIN observations o ON o.transmission_id = t.id WHERE t.payload_type = 5 + AND t.channel_hash IS NOT NULL + AND t.channel_hash NOT LIKE 'enc_%%' AND EXISTS ( SELECT 1 FROM observers obs WHERE obs.id = o.observer_id AND UPPER(TRIM(obs.iata)) IN (%s) ) - ORDER BY t.first_seen ASC`, regionPlaceholder) + GROUP BY t.channel_hash + ORDER BY last_activity DESC`, regionPlaceholder) } } else { - querySQL = `SELECT decoded_json, first_seen FROM transmissions WHERE payload_type = 5 ORDER BY first_seen ASC` + querySQL = `SELECT channel_hash, + COUNT(*) AS msg_count, + MAX(first_seen) AS last_activity, + (SELECT t2.decoded_json FROM transmissions t2 + WHERE t2.channel_hash = t.channel_hash AND t2.payload_type = 5 + ORDER BY t2.first_seen DESC LIMIT 1) AS sample_json + FROM transmissions t + WHERE payload_type = 5 + AND channel_hash IS NOT NULL + AND channel_hash NOT LIKE 'enc_%%' + GROUP BY channel_hash + ORDER BY last_activity DESC` } rows, err := db.conn.Query(querySQL, args...) @@ -1200,68 +1244,55 @@ func (db *DB) GetChannels(region ...string) ([]map[string]interface{}, error) { } defer rows.Close() - channelMap := map[string]map[string]interface{}{} + channels := make([]map[string]interface{}, 0) for rows.Next() { - var dj, fs sql.NullString - rows.Scan(&dj, &fs) - if !dj.Valid { + var chHash, lastActivity, sampleJSON sql.NullString + var msgCount int + if err := rows.Scan(&chHash, &msgCount, &lastActivity, &sampleJSON); err != nil { continue } - var decoded map[string]interface{} - if json.Unmarshal([]byte(dj.String), &decoded) != nil { - continue - } - dtype, _ := decoded["type"].(string) - if dtype != "CHAN" { - continue - } - // Filter out garbage-decrypted channel names/messages (pre-#197 data still in DB) - chanStr, _ := decoded["channel"].(string) - textStr, _ := decoded["text"].(string) - if hasGarbageChars(chanStr) || hasGarbageChars(textStr) { - continue - } - channelName, _ := decoded["channel"].(string) + channelName := nullStr(chHash) if channelName == "" { - channelName = "unknown" + continue } - key := channelName - ch, exists := channelMap[key] - if !exists { - ch = map[string]interface{}{ - "hash": key, "name": channelName, - "lastMessage": nil, "lastSender": nil, - "messageCount": 0, "lastActivity": nullStr(fs), - } - channelMap[key] = ch - } - ch["messageCount"] = ch["messageCount"].(int) + 1 - if fs.Valid { - ch["lastActivity"] = fs.String - } - if text, ok := decoded["text"].(string); ok && text != "" { - idx := strings.Index(text, ": ") - if idx > 0 { - ch["lastMessage"] = text[idx+2:] - } else { - ch["lastMessage"] = text - } - if sender, ok := decoded["sender"].(string); ok { - ch["lastSender"] = sender + var lastMessage, lastSender interface{} + if sampleJSON.Valid { + var decoded map[string]interface{} + if json.Unmarshal([]byte(sampleJSON.String), &decoded) == nil { + if text, ok := decoded["text"].(string); ok && text != "" { + idx := strings.Index(text, ": ") + if idx > 0 { + lastMessage = text[idx+2:] + } else { + lastMessage = text + } + if sender, ok := decoded["sender"].(string); ok { + lastSender = sender + } + } } } + + channels = append(channels, map[string]interface{}{ + "hash": channelName, "name": channelName, + "lastMessage": lastMessage, "lastSender": lastSender, + "messageCount": msgCount, "lastActivity": nullStr(lastActivity), + }) } - channels := make([]map[string]interface{}, 0, len(channelMap)) - for _, ch := range channelMap { - channels = append(channels, ch) - } + // Store in cache (60s TTL) + db.channelsCacheMu.Lock() + db.channelsCacheRes = channels + db.channelsCacheKey = regionParam + db.channelsCacheExp = time.Now().Add(60 * time.Second) + db.channelsCacheMu.Unlock() + return channels, nil } // GetEncryptedChannels returns channels where all messages are undecryptable (no key). -// These have decoded_json with type "GRP_TXT" and decryptionStatus "no_key". +// Uses channel_hash column (prefixed with 'enc_') for fast grouped queries. func (db *DB) GetEncryptedChannels(region ...string) ([]map[string]interface{}, error) { regionParam := "" if len(region) > 0 { @@ -1280,27 +1311,42 @@ func (db *DB) GetEncryptedChannels(region ...string) ([]map[string]interface{}, } regionPlaceholder := strings.Join(placeholders, ",") if db.isV3 { - querySQL = fmt.Sprintf(`SELECT DISTINCT t.decoded_json, t.first_seen + querySQL = fmt.Sprintf(`SELECT t.channel_hash, + COUNT(*) AS msg_count, + MAX(t.first_seen) AS last_activity FROM transmissions t JOIN observations o ON o.transmission_id = t.id LEFT JOIN observers obs ON obs.rowid = o.observer_idx WHERE t.payload_type = 5 + AND t.channel_hash LIKE 'enc_%%' AND obs.rowid IS NOT NULL AND UPPER(TRIM(obs.iata)) IN (%s) - ORDER BY t.first_seen ASC`, regionPlaceholder) + GROUP BY t.channel_hash + ORDER BY last_activity DESC`, regionPlaceholder) } else { - querySQL = fmt.Sprintf(`SELECT DISTINCT t.decoded_json, t.first_seen + querySQL = fmt.Sprintf(`SELECT t.channel_hash, + COUNT(*) AS msg_count, + MAX(t.first_seen) AS last_activity FROM transmissions t JOIN observations o ON o.transmission_id = t.id WHERE t.payload_type = 5 + AND t.channel_hash LIKE 'enc_%%' AND EXISTS ( SELECT 1 FROM observers obs WHERE obs.id = o.observer_id AND UPPER(TRIM(obs.iata)) IN (%s) ) - ORDER BY t.first_seen ASC`, regionPlaceholder) + GROUP BY t.channel_hash + ORDER BY last_activity DESC`, regionPlaceholder) } } else { - querySQL = `SELECT decoded_json, first_seen FROM transmissions WHERE payload_type = 5 ORDER BY first_seen ASC` + querySQL = `SELECT channel_hash, + COUNT(*) AS msg_count, + MAX(first_seen) AS last_activity + FROM transmissions + WHERE payload_type = 5 + AND channel_hash LIKE 'enc_%%' + GROUP BY channel_hash + ORDER BY last_activity DESC` } rows, err := db.conn.Query(querySQL, args...) @@ -1309,64 +1355,22 @@ func (db *DB) GetEncryptedChannels(region ...string) ([]map[string]interface{}, } defer rows.Close() - type encChanInfo struct { - hash string - messageCount int - lastActivity string - } - channelMap := map[string]*encChanInfo{} - + channels := make([]map[string]interface{}, 0) for rows.Next() { - var dj, fs sql.NullString - if err := rows.Scan(&dj, &fs); err != nil { continue } - if !dj.Valid { + var chHash, lastActivity sql.NullString + var msgCount int + if err := rows.Scan(&chHash, &msgCount, &lastActivity); err != nil { continue } - var decoded map[string]interface{} - if json.Unmarshal([]byte(dj.String), &decoded) != nil { - continue - } - dtype, _ := decoded["type"].(string) - // Only include undecryptable GRP_TXT packets (not CHAN) - if dtype != "GRP_TXT" { - continue - } - ds, _ := decoded["decryptionStatus"].(string) - if ds != "no_key" { - continue - } - // Group by channelHashHex - chHash, _ := decoded["channelHashHex"].(string) - if chHash == "" { - if chNum, ok := decoded["channelHash"].(float64); ok { - chHash = fmt.Sprintf("%02X", int(chNum)) - } - } - if chHash == "" { - chHash = "?" - } - key := chHash - - ch, exists := channelMap[key] - if !exists { - ch = &encChanInfo{hash: key, lastActivity: nullStrVal(fs)} - channelMap[key] = ch - } - ch.messageCount++ - if fs.Valid && fs.String > ch.lastActivity { - ch.lastActivity = fs.String - } - } - - channels := make([]map[string]interface{}, 0, len(channelMap)) - for _, ch := range channelMap { + fullHash := nullStrVal(chHash) // e.g. "enc_3A" + hexPart := strings.TrimPrefix(fullHash, "enc_") channels = append(channels, map[string]interface{}{ - "hash": "enc_" + ch.hash, - "name": "Encrypted (0x" + ch.hash + ")", + "hash": fullHash, + "name": "Encrypted (0x" + hexPart + ")", "lastMessage": nil, "lastSender": nil, - "messageCount": ch.messageCount, - "lastActivity": ch.lastActivity, + "messageCount": msgCount, + "lastActivity": nullStr(lastActivity), "encrypted": true, }) } @@ -1397,15 +1401,16 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region . regionPlaceholders = strings.Join(placeholders, ",") } + // Fetch messages with channel_hash filter (pagination applied in Go after dedup) var querySQL string - args := make([]interface{}, 0, len(regionArgs)) + args := []interface{}{channelHash} if db.isV3 { querySQL = `SELECT o.id, t.hash, t.decoded_json, t.first_seen, obs.id, obs.name, o.snr, o.path_json FROM observations o JOIN transmissions t ON t.id = o.transmission_id LEFT JOIN observers obs ON obs.rowid = o.observer_idx - WHERE t.payload_type = 5` + WHERE t.channel_hash = ? AND t.payload_type = 5` if len(regionCodes) > 0 { querySQL += fmt.Sprintf(" AND obs.rowid IS NOT NULL AND UPPER(TRIM(obs.iata)) IN (%s)", regionPlaceholders) args = append(args, regionArgs...) @@ -1417,14 +1422,11 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region . o.observer_id, o.observer_name, o.snr, o.path_json FROM observations o JOIN transmissions t ON t.id = o.transmission_id - WHERE t.payload_type = 5` + WHERE t.channel_hash = ? AND t.payload_type = 5` if len(regionCodes) > 0 { querySQL += fmt.Sprintf(` AND EXISTS ( - SELECT 1 - FROM observers obs - WHERE obs.id = o.observer_id - AND UPPER(TRIM(obs.iata)) IN (%s) - )`, regionPlaceholders) + SELECT 1 FROM observers obs WHERE obs.id = o.observer_id + AND UPPER(TRIM(obs.iata)) IN (%s))`, regionPlaceholders) args = append(args, regionArgs...) } querySQL += ` @@ -1456,17 +1458,6 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region . if json.Unmarshal([]byte(dj.String), &decoded) != nil { continue } - dtype, _ := decoded["type"].(string) - if dtype != "CHAN" { - continue - } - ch, _ := decoded["channel"].(string) - if ch == "" { - ch = "unknown" - } - if ch != channelHash { - continue - } text, _ := decoded["text"].(string) sender, _ := decoded["sender"].(string) @@ -1526,18 +1517,18 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region . } } - total := len(msgOrder) - // Return latest messages (tail) - start := total - limit - offset + // Return latest messages (tail) with pagination + msgTotal := len(msgOrder) + start := msgTotal - limit - offset if start < 0 { start = 0 } - end := total - offset + end := msgTotal - offset if end < 0 { end = 0 } - if end > total { - end = total + if end > msgTotal { + end = msgTotal } messages := make([]map[string]interface{}, 0) @@ -1548,7 +1539,7 @@ func (db *DB) GetChannelMessages(channelHash string, limit, offset int, region . messages = append(messages, m.Data) } - return messages, total, nil + return messages, msgTotal, nil } diff --git a/cmd/server/db_test.go b/cmd/server/db_test.go index 2783cdb..f47fd04 100644 --- a/cmd/server/db_test.go +++ b/cmd/server/db_test.go @@ -60,6 +60,7 @@ func setupTestDB(t *testing.T) *DB { payload_type INTEGER, payload_version INTEGER, decoded_json TEXT, + channel_hash TEXT DEFAULT NULL, created_at TEXT DEFAULT (datetime('now')) ); @@ -124,10 +125,10 @@ func seedTestData(t *testing.T, db *DB) { VALUES ('1122334455667788', 'TestRoom', 'room', 37.4, -121.9, ?, '2026-01-01T00:00:00Z', 5)`, twoDaysAgo) // Seed transmissions - db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json) - VALUES ('AABB', 'abc123def4567890', ?, 1, 4, '{"pubKey":"aabbccdd11223344","name":"TestRepeater","type":"ADVERT","timestamp":1700000000,"timestampISO":"2023-11-14T22:13:20.000Z","signature":"abcdef","flags":{"isRepeater":true},"lat":37.5,"lon":-122.0}')`, recent) - db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json) - VALUES ('CCDD', '1234567890abcdef', ?, 1, 5, '{"type":"CHAN","channel":"#test","text":"Hello: World","sender":"TestUser"}')`, yesterday) + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash) + VALUES ('AABB', 'abc123def4567890', ?, 1, 4, '{"pubKey":"aabbccdd11223344","name":"TestRepeater","type":"ADVERT","timestamp":1700000000,"timestampISO":"2023-11-14T22:13:20.000Z","signature":"abcdef","flags":{"isRepeater":true},"lat":37.5,"lon":-122.0}', '#test')`, recent) + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash) + VALUES ('CCDD', '1234567890abcdef', ?, 1, 5, '{"type":"CHAN","channel":"#test","text":"Hello: World","sender":"TestUser"}', '#test')`, yesterday) // Second ADVERT for same node with different hash_size (raw_hex byte 0x1F → hs=1 vs 0xBB → hs=3) db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json) VALUES ('AA1F', 'def456abc1230099', ?, 1, 4, '{"pubKey":"aabbccdd11223344","name":"TestRepeater","type":"ADVERT","timestamp":1700000100,"timestampISO":"2023-11-14T22:14:40.000Z","signature":"fedcba","flags":{"isRepeater":true},"lat":37.5,"lon":-122.0}')`, yesterday) @@ -735,12 +736,12 @@ func TestGetChannelMessagesRegionFiltering(t *testing.T) { db.conn.Exec(`INSERT INTO observers (id, name, iata) VALUES ('obs1', 'Observer One', 'SJC')`) db.conn.Exec(`INSERT INTO observers (id, name, iata) VALUES ('obs2', 'Observer Two', ' sfo ')`) - db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json) + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash) VALUES ('AA', 'chanregion0001', ?, 1, 5, - '{"type":"CHAN","channel":"#region","text":"SjcUser: One","sender":"SjcUser"}')`, ts1) - db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json) + '{"type":"CHAN","channel":"#region","text":"SjcUser: One","sender":"SjcUser"}', '#region')`, ts1) + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash) VALUES ('BB', 'chanregion0002', ?, 1, 5, - '{"type":"CHAN","channel":"#region","text":"SfoUser: Two","sender":"SfoUser"}')`, ts2) + '{"type":"CHAN","channel":"#region","text":"SfoUser: Two","sender":"SfoUser"}', '#region')`, ts2) db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp) VALUES (1, 1, 10.0, -90, '[]', ?)`, epoch1) db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp) @@ -1119,6 +1120,7 @@ func setupTestDBV2(t *testing.T) *DB { payload_type INTEGER, payload_version INTEGER, decoded_json TEXT, + channel_hash TEXT DEFAULT NULL, created_at TEXT DEFAULT (datetime('now')) ); @@ -1202,12 +1204,12 @@ func TestGetChannelMessagesDedup(t *testing.T) { db.conn.Exec(`INSERT INTO observers (id, name, iata) VALUES ('obs2', 'Observer Two', 'SFO')`) // Insert two transmissions with same hash to test dedup - db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json) + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash) VALUES ('AA', 'chanmsg00000001', '2026-01-15T10:00:00Z', 1, 5, - '{"type":"CHAN","channel":"#general","text":"User1: Hello","sender":"User1"}')`) - db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json) + '{"type":"CHAN","channel":"#general","text":"User1: Hello","sender":"User1"}', '#general')`) + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash) VALUES ('BB', 'chanmsg00000002', '2026-01-15T10:01:00Z', 1, 5, - '{"type":"CHAN","channel":"#general","text":"User2: World","sender":"User2"}')`) + '{"type":"CHAN","channel":"#general","text":"User2: World","sender":"User2"}', '#general')`) // Observations: first msg seen by two observers (dedup), second by one db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp) @@ -1251,9 +1253,9 @@ func TestGetChannelMessagesNoSender(t *testing.T) { defer db.Close() db.conn.Exec(`INSERT INTO observers (id, name, iata) VALUES ('obs1', 'Observer One', 'SJC')`) - db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json) + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash) VALUES ('CC', 'chanmsg00000003', '2026-01-15T10:02:00Z', 1, 5, - '{"type":"CHAN","channel":"#noname","text":"plain text no colon"}')`) + '{"type":"CHAN","channel":"#noname","text":"plain text no colon"}', '#noname')`) db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp) VALUES (1, 1, 12.0, -90, null, 1736935300)`) @@ -1356,9 +1358,9 @@ func TestGetChannelMessagesObserverFallback(t *testing.T) { defer db.Close() // Observer with ID but no name entry (observer_idx won't match) - db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json) + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash) VALUES ('AA', 'chanmsg00000004', '2026-01-15T10:00:00Z', 1, 5, - '{"type":"CHAN","channel":"#obs","text":"Sender: Test","sender":"Sender"}')`) + '{"type":"CHAN","channel":"#obs","text":"Sender: Test","sender":"Sender"}', '#obs')`) // Observation without observer (observer_idx = NULL) db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp) VALUES (1, NULL, 12.0, -90, null, 1736935200)`) @@ -1380,12 +1382,12 @@ func TestGetChannelsMultiple(t *testing.T) { defer db.Close() db.conn.Exec(`INSERT INTO observers (id, name, iata) VALUES ('obs1', 'Observer', 'SJC')`) - db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json) + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash) VALUES ('AA', 'chan1hash', '2026-01-15T10:00:00Z', 1, 5, - '{"type":"CHAN","channel":"#alpha","text":"Alice: Hello","sender":"Alice"}')`) - db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json) + '{"type":"CHAN","channel":"#alpha","text":"Alice: Hello","sender":"Alice"}', '#alpha')`) + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash) VALUES ('BB', 'chan2hash', '2026-01-15T10:01:00Z', 1, 5, - '{"type":"CHAN","channel":"#beta","text":"Bob: World","sender":"Bob"}')`) + '{"type":"CHAN","channel":"#beta","text":"Bob: World","sender":"Bob"}', '#beta')`) db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json) VALUES ('CC', 'chan3hash', '2026-01-15T10:02:00Z', 1, 5, '{"type":"CHAN","channel":"","text":"No channel"}')`) @@ -1468,13 +1470,13 @@ func TestGetChannelsStaleMessage(t *testing.T) { db.conn.Exec(`INSERT INTO observers (id, name, iata) VALUES ('obs2', 'Observer2', 'SFO')`) // Older message (first_seen T1) - db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json) + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash) VALUES ('AA', 'oldhash1', '2026-01-15T10:00:00Z', 1, 5, - '{"type":"CHAN","channel":"#test","text":"Alice: Old message","sender":"Alice"}')`) + '{"type":"CHAN","channel":"#test","text":"Alice: Old message","sender":"Alice"}', '#test')`) // Newer message (first_seen T2 > T1) - db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json) + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash) VALUES ('BB', 'newhash2', '2026-01-15T10:05:00Z', 1, 5, - '{"type":"CHAN","channel":"#test","text":"Bob: New message","sender":"Bob"}')`) + '{"type":"CHAN","channel":"#test","text":"Bob: New message","sender":"Bob"}', '#test')`) // Observations: older message re-observed AFTER newer message (stale scenario) db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, timestamp) @@ -1512,16 +1514,16 @@ func TestGetChannelsRegionFiltering(t *testing.T) { db.conn.Exec(`INSERT INTO observers (id, name, iata) VALUES ('obs2', 'Observer2', 'SFO')`) // Channel message seen only in SJC - db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json) + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash) VALUES ('AA', 'hash1', '2026-01-15T10:00:00Z', 1, 5, - '{"type":"CHAN","channel":"#sjc-only","text":"Alice: Hello SJC","sender":"Alice"}')`) + '{"type":"CHAN","channel":"#sjc-only","text":"Alice: Hello SJC","sender":"Alice"}', '#sjc-only')`) db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, timestamp) VALUES (1, 1, 12.0, -90, 1736935200)`) // Channel message seen only in SFO - db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json) + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash) VALUES ('BB', 'hash2', '2026-01-15T10:05:00Z', 1, 5, - '{"type":"CHAN","channel":"#sfo-only","text":"Bob: Hello SFO","sender":"Bob"}')`) + '{"type":"CHAN","channel":"#sfo-only","text":"Bob: Hello SFO","sender":"Bob"}', '#sfo-only')`) db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, timestamp) VALUES (2, 2, 14.0, -88, 1736935500)`) diff --git a/cmd/server/encrypted_channels_test.go b/cmd/server/encrypted_channels_test.go index c9c76aa..d8632b8 100644 --- a/cmd/server/encrypted_channels_test.go +++ b/cmd/server/encrypted_channels_test.go @@ -15,10 +15,10 @@ func seedEncryptedChannelData(t *testing.T, db *DB) { recentEpoch := now.Add(-1 * time.Hour).Unix() // Two encrypted GRP_TXT packets on channel hash "A1B2" - db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json) - VALUES ('EE01', 'enc_hash_001', ?, 1, 5, '{"type":"GRP_TXT","channelHashHex":"A1B2","decryptionStatus":"no_key"}')`, recent) - db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json) - VALUES ('EE02', 'enc_hash_002', ?, 1, 5, '{"type":"GRP_TXT","channelHashHex":"A1B2","decryptionStatus":"no_key"}')`, recent) + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash) + VALUES ('EE01', 'enc_hash_001', ?, 1, 5, '{"type":"GRP_TXT","channelHashHex":"A1B2","decryptionStatus":"no_key"}', 'enc_A1B2')`, recent) + db.conn.Exec(`INSERT INTO transmissions (raw_hex, hash, first_seen, route_type, payload_type, decoded_json, channel_hash) + VALUES ('EE02', 'enc_hash_002', ?, 1, 5, '{"type":"GRP_TXT","channelHashHex":"A1B2","decryptionStatus":"no_key"}', 'enc_A1B2')`, recent) // Observations for both db.conn.Exec(`INSERT INTO observations (transmission_id, observer_idx, snr, rssi, path_json, timestamp) diff --git a/cmd/server/neighbor_persist_test.go b/cmd/server/neighbor_persist_test.go index c39f138..25a2044 100644 --- a/cmd/server/neighbor_persist_test.go +++ b/cmd/server/neighbor_persist_test.go @@ -27,7 +27,7 @@ func createTestDBWithSchema(t *testing.T) (*DB, string) { id INTEGER PRIMARY KEY AUTOINCREMENT, raw_hex TEXT, hash TEXT UNIQUE, first_seen TEXT, route_type INTEGER, payload_type INTEGER, payload_version INTEGER, - decoded_json TEXT + decoded_json TEXT, channel_hash TEXT DEFAULT NULL )`) conn.Exec(`CREATE TABLE observers ( id TEXT PRIMARY KEY, name TEXT, iata TEXT diff --git a/test-fixtures/e2e-fixture.db b/test-fixtures/e2e-fixture.db index 3a8ad83..f1c7a05 100644 Binary files a/test-fixtures/e2e-fixture.db and b/test-fixtures/e2e-fixture.db differ