From b9e7535e1c37487561e79e4aa4eb223aaa1a87e4 Mon Sep 17 00:00:00 2001 From: you Date: Sat, 2 May 2026 18:26:46 +0000 Subject: [PATCH] feat(ingestor): make MQTT connect timeout configurable (#931) Add mqttConnectTimeoutSeconds config field (default 10s) so operators can tune the MQTT connect timeout for high-latency brokers without code changes. Wired into buildMQTTOpts and logged at startup. Closes #931 --- cmd/ingestor/config.go | 12 ++++++++++++ cmd/ingestor/config_test.go | 33 +++++++++++++++++++++++++++++++++ cmd/ingestor/main.go | 7 ++++--- cmd/ingestor/mqtt_opts_test.go | 8 ++++---- config.example.json | 2 ++ 5 files changed, 55 insertions(+), 7 deletions(-) diff --git a/cmd/ingestor/config.go b/cmd/ingestor/config.go index e6ab2d26..eb380352 100644 --- a/cmd/ingestor/config.go +++ b/cmd/ingestor/config.go @@ -44,6 +44,9 @@ type Config struct { ValidateSignatures *bool `json:"validateSignatures,omitempty"` DB *DBConfig `json:"db,omitempty"` + // MQTTConnectTimeoutSeconds overrides the MQTT connect timeout (default 10s). + MQTTConnectTimeoutSeconds int `json:"mqttConnectTimeoutSeconds,omitempty"` + // ObserverBlacklist is a list of observer public keys to drop at ingest. // Messages from blacklisted observers are silently discarded — no DB writes, // no UpsertObserver, no observations, no metrics. @@ -107,6 +110,15 @@ func (c *Config) MetricsRetentionDays() int { return 30 } +// MQTTConnectTimeoutOrDefault returns the configured MQTT connect timeout in +// seconds, or 10 if not set (matching the paho default used historically). +func (c *Config) MQTTConnectTimeoutOrDefault() int { + if c.MQTTConnectTimeoutSeconds > 0 { + return c.MQTTConnectTimeoutSeconds + } + return 10 +} + // NodeDaysOrDefault returns the configured retention.nodeDays or 7 if not set. func (c *Config) NodeDaysOrDefault() int { if c.Retention != nil && c.Retention.NodeDays > 0 { diff --git a/cmd/ingestor/config_test.go b/cmd/ingestor/config_test.go index 76b76f10..efe5cfc8 100644 --- a/cmd/ingestor/config_test.go +++ b/cmd/ingestor/config_test.go @@ -284,3 +284,36 @@ func TestLoadConfigWithAllFields(t *testing.T) { t.Errorf("iataFilter=%v", src.IATAFilter) } } + +func TestMQTTConnectTimeoutOrDefault(t *testing.T) { + // Default when unset + cfg := &Config{} + if got := cfg.MQTTConnectTimeoutOrDefault(); got != 10 { + t.Errorf("default: got %d, want 10", got) + } + + // Custom value + cfg.MQTTConnectTimeoutSeconds = 45 + if got := cfg.MQTTConnectTimeoutOrDefault(); got != 45 { + t.Errorf("custom: got %d, want 45", got) + } + + // Zero treated as unset + cfg.MQTTConnectTimeoutSeconds = 0 + if got := cfg.MQTTConnectTimeoutOrDefault(); got != 10 { + t.Errorf("zero: got %d, want 10", got) + } +} + +func TestMQTTConnectTimeoutFromJSON(t *testing.T) { + dir := t.TempDir() + cfgPath := dir + "/config.json" + os.WriteFile(cfgPath, []byte(`{"mqttConnectTimeoutSeconds": 5}`), 0644) + cfg, err := LoadConfig(cfgPath) + if err != nil { + t.Fatal(err) + } + if got := cfg.MQTTConnectTimeoutOrDefault(); got != 5 { + t.Errorf("from JSON: got %d, want 5", got) + } +} diff --git a/cmd/ingestor/main.go b/cmd/ingestor/main.go index 9984b13a..a4e9662d 100644 --- a/cmd/ingestor/main.go +++ b/cmd/ingestor/main.go @@ -129,7 +129,8 @@ func main() { tag = source.Broker } - opts := buildMQTTOpts(source) + opts := buildMQTTOpts(source, cfg.MQTTConnectTimeoutOrDefault()) + log.Printf("MQTT [%s] connect timeout: %ds", tag, cfg.MQTTConnectTimeoutOrDefault()) opts.SetOnConnectHandler(func(c mqtt.Client) { log.Printf("MQTT [%s] connected to %s", tag, source.Broker) @@ -196,14 +197,14 @@ func main() { // buildMQTTOpts creates MQTT client options for a source with bounded reconnect // backoff, connect timeout, and TLS/auth configuration. -func buildMQTTOpts(source MQTTSource) *mqtt.ClientOptions { +func buildMQTTOpts(source MQTTSource, connectTimeoutSec int) *mqtt.ClientOptions { opts := mqtt.NewClientOptions(). AddBroker(source.Broker). SetAutoReconnect(true). SetConnectRetry(true). SetOrderMatters(true). SetMaxReconnectInterval(30 * time.Second). - SetConnectTimeout(10 * time.Second). + SetConnectTimeout(time.Duration(connectTimeoutSec) * time.Second). SetWriteTimeout(10 * time.Second) if source.Username != "" { diff --git a/cmd/ingestor/mqtt_opts_test.go b/cmd/ingestor/mqtt_opts_test.go index e8aceec4..6f44b4cc 100644 --- a/cmd/ingestor/mqtt_opts_test.go +++ b/cmd/ingestor/mqtt_opts_test.go @@ -10,7 +10,7 @@ func TestBuildMQTTOpts_ReconnectSettings(t *testing.T) { Broker: "tcp://localhost:1883", Name: "test", } - opts := buildMQTTOpts(source) + opts := buildMQTTOpts(source, 10) if opts.MaxReconnectInterval != 30*time.Second { t.Errorf("MaxReconnectInterval = %v, want 30s", opts.MaxReconnectInterval) @@ -35,7 +35,7 @@ func TestBuildMQTTOpts_Credentials(t *testing.T) { Username: "user1", Password: "pass1", } - opts := buildMQTTOpts(source) + opts := buildMQTTOpts(source, 10) if opts.Username != "user1" { t.Errorf("Username = %q, want %q", opts.Username, "user1") @@ -51,7 +51,7 @@ func TestBuildMQTTOpts_TLS_InsecureSkipVerify(t *testing.T) { Broker: "ssl://broker:8883", RejectUnauthorized: &f, } - opts := buildMQTTOpts(source) + opts := buildMQTTOpts(source, 10) if opts.TLSConfig == nil { t.Fatal("TLSConfig should be set") @@ -65,7 +65,7 @@ func TestBuildMQTTOpts_TLS_SSL_Prefix(t *testing.T) { source := MQTTSource{ Broker: "ssl://broker:8883", } - opts := buildMQTTOpts(source) + opts := buildMQTTOpts(source, 10) if opts.TLSConfig == nil { t.Fatal("TLSConfig should be set for ssl:// brokers") diff --git a/config.example.json b/config.example.json index 7e8e80a3..f9f1e084 100644 --- a/config.example.json +++ b/config.example.json @@ -14,6 +14,8 @@ "incrementalVacuumPages": 1024, "_comment": "vacuumOnStartup: run one-time full VACUUM to enable incremental auto-vacuum on existing DBs (blocks startup for minutes on large DBs; requires 2x DB file size in free disk space). incrementalVacuumPages: free pages returned to OS after each retention reaper cycle (default 1024). See #919." }, + "mqttConnectTimeoutSeconds": 10, + "_commentMqttConnectTimeout": "MQTT connect timeout in seconds (default 10). Increase for high-latency brokers.", "https": { "cert": "/path/to/cert.pem", "key": "/path/to/key.pem",