mirror of
https://github.com/Kpa-clawbot/meshcore-analyzer.git
synced 2026-05-13 07:24:48 +00:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| b9e7535e1c |
@@ -44,6 +44,9 @@ type Config struct {
|
||||
ValidateSignatures *bool `json:"validateSignatures,omitempty"`
|
||||
DB *DBConfig `json:"db,omitempty"`
|
||||
|
||||
// MQTTConnectTimeoutSeconds overrides the MQTT connect timeout (default 10s).
|
||||
MQTTConnectTimeoutSeconds int `json:"mqttConnectTimeoutSeconds,omitempty"`
|
||||
|
||||
// ObserverBlacklist is a list of observer public keys to drop at ingest.
|
||||
// Messages from blacklisted observers are silently discarded — no DB writes,
|
||||
// no UpsertObserver, no observations, no metrics.
|
||||
@@ -107,6 +110,15 @@ func (c *Config) MetricsRetentionDays() int {
|
||||
return 30
|
||||
}
|
||||
|
||||
// MQTTConnectTimeoutOrDefault returns the configured MQTT connect timeout in
|
||||
// seconds, or 10 if not set (matching the paho default used historically).
|
||||
func (c *Config) MQTTConnectTimeoutOrDefault() int {
|
||||
if c.MQTTConnectTimeoutSeconds > 0 {
|
||||
return c.MQTTConnectTimeoutSeconds
|
||||
}
|
||||
return 10
|
||||
}
|
||||
|
||||
// NodeDaysOrDefault returns the configured retention.nodeDays or 7 if not set.
|
||||
func (c *Config) NodeDaysOrDefault() int {
|
||||
if c.Retention != nil && c.Retention.NodeDays > 0 {
|
||||
|
||||
@@ -284,3 +284,36 @@ func TestLoadConfigWithAllFields(t *testing.T) {
|
||||
t.Errorf("iataFilter=%v", src.IATAFilter)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMQTTConnectTimeoutOrDefault(t *testing.T) {
|
||||
// Default when unset
|
||||
cfg := &Config{}
|
||||
if got := cfg.MQTTConnectTimeoutOrDefault(); got != 10 {
|
||||
t.Errorf("default: got %d, want 10", got)
|
||||
}
|
||||
|
||||
// Custom value
|
||||
cfg.MQTTConnectTimeoutSeconds = 45
|
||||
if got := cfg.MQTTConnectTimeoutOrDefault(); got != 45 {
|
||||
t.Errorf("custom: got %d, want 45", got)
|
||||
}
|
||||
|
||||
// Zero treated as unset
|
||||
cfg.MQTTConnectTimeoutSeconds = 0
|
||||
if got := cfg.MQTTConnectTimeoutOrDefault(); got != 10 {
|
||||
t.Errorf("zero: got %d, want 10", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMQTTConnectTimeoutFromJSON(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
cfgPath := dir + "/config.json"
|
||||
os.WriteFile(cfgPath, []byte(`{"mqttConnectTimeoutSeconds": 5}`), 0644)
|
||||
cfg, err := LoadConfig(cfgPath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if got := cfg.MQTTConnectTimeoutOrDefault(); got != 5 {
|
||||
t.Errorf("from JSON: got %d, want 5", got)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -129,7 +129,8 @@ func main() {
|
||||
tag = source.Broker
|
||||
}
|
||||
|
||||
opts := buildMQTTOpts(source)
|
||||
opts := buildMQTTOpts(source, cfg.MQTTConnectTimeoutOrDefault())
|
||||
log.Printf("MQTT [%s] connect timeout: %ds", tag, cfg.MQTTConnectTimeoutOrDefault())
|
||||
|
||||
opts.SetOnConnectHandler(func(c mqtt.Client) {
|
||||
log.Printf("MQTT [%s] connected to %s", tag, source.Broker)
|
||||
@@ -196,14 +197,14 @@ func main() {
|
||||
|
||||
// buildMQTTOpts creates MQTT client options for a source with bounded reconnect
|
||||
// backoff, connect timeout, and TLS/auth configuration.
|
||||
func buildMQTTOpts(source MQTTSource) *mqtt.ClientOptions {
|
||||
func buildMQTTOpts(source MQTTSource, connectTimeoutSec int) *mqtt.ClientOptions {
|
||||
opts := mqtt.NewClientOptions().
|
||||
AddBroker(source.Broker).
|
||||
SetAutoReconnect(true).
|
||||
SetConnectRetry(true).
|
||||
SetOrderMatters(true).
|
||||
SetMaxReconnectInterval(30 * time.Second).
|
||||
SetConnectTimeout(10 * time.Second).
|
||||
SetConnectTimeout(time.Duration(connectTimeoutSec) * time.Second).
|
||||
SetWriteTimeout(10 * time.Second)
|
||||
|
||||
if source.Username != "" {
|
||||
|
||||
@@ -10,7 +10,7 @@ func TestBuildMQTTOpts_ReconnectSettings(t *testing.T) {
|
||||
Broker: "tcp://localhost:1883",
|
||||
Name: "test",
|
||||
}
|
||||
opts := buildMQTTOpts(source)
|
||||
opts := buildMQTTOpts(source, 10)
|
||||
|
||||
if opts.MaxReconnectInterval != 30*time.Second {
|
||||
t.Errorf("MaxReconnectInterval = %v, want 30s", opts.MaxReconnectInterval)
|
||||
@@ -35,7 +35,7 @@ func TestBuildMQTTOpts_Credentials(t *testing.T) {
|
||||
Username: "user1",
|
||||
Password: "pass1",
|
||||
}
|
||||
opts := buildMQTTOpts(source)
|
||||
opts := buildMQTTOpts(source, 10)
|
||||
|
||||
if opts.Username != "user1" {
|
||||
t.Errorf("Username = %q, want %q", opts.Username, "user1")
|
||||
@@ -51,7 +51,7 @@ func TestBuildMQTTOpts_TLS_InsecureSkipVerify(t *testing.T) {
|
||||
Broker: "ssl://broker:8883",
|
||||
RejectUnauthorized: &f,
|
||||
}
|
||||
opts := buildMQTTOpts(source)
|
||||
opts := buildMQTTOpts(source, 10)
|
||||
|
||||
if opts.TLSConfig == nil {
|
||||
t.Fatal("TLSConfig should be set")
|
||||
@@ -65,7 +65,7 @@ func TestBuildMQTTOpts_TLS_SSL_Prefix(t *testing.T) {
|
||||
source := MQTTSource{
|
||||
Broker: "ssl://broker:8883",
|
||||
}
|
||||
opts := buildMQTTOpts(source)
|
||||
opts := buildMQTTOpts(source, 10)
|
||||
|
||||
if opts.TLSConfig == nil {
|
||||
t.Fatal("TLSConfig should be set for ssl:// brokers")
|
||||
|
||||
@@ -14,6 +14,8 @@
|
||||
"incrementalVacuumPages": 1024,
|
||||
"_comment": "vacuumOnStartup: run one-time full VACUUM to enable incremental auto-vacuum on existing DBs (blocks startup for minutes on large DBs; requires 2x DB file size in free disk space). incrementalVacuumPages: free pages returned to OS after each retention reaper cycle (default 1024). See #919."
|
||||
},
|
||||
"mqttConnectTimeoutSeconds": 10,
|
||||
"_commentMqttConnectTimeout": "MQTT connect timeout in seconds (default 10). Increase for high-latency brokers.",
|
||||
"https": {
|
||||
"cert": "/path/to/cert.pem",
|
||||
"key": "/path/to/key.pem",
|
||||
|
||||
Reference in New Issue
Block a user