feat(ingestor): make MQTT connect timeout configurable (#931)

Add mqttConnectTimeoutSeconds config field (default 10s) so operators
can tune the MQTT connect timeout for high-latency brokers without
code changes. Wired into buildMQTTOpts and logged at startup.

Closes #931
This commit is contained in:
you
2026-05-02 18:26:46 +00:00
parent 4f0f7bc6dd
commit b9e7535e1c
5 changed files with 55 additions and 7 deletions
+12
View File
@@ -44,6 +44,9 @@ type Config struct {
ValidateSignatures *bool `json:"validateSignatures,omitempty"`
DB *DBConfig `json:"db,omitempty"`
// MQTTConnectTimeoutSeconds overrides the MQTT connect timeout (default 10s).
MQTTConnectTimeoutSeconds int `json:"mqttConnectTimeoutSeconds,omitempty"`
// ObserverBlacklist is a list of observer public keys to drop at ingest.
// Messages from blacklisted observers are silently discarded — no DB writes,
// no UpsertObserver, no observations, no metrics.
@@ -107,6 +110,15 @@ func (c *Config) MetricsRetentionDays() int {
return 30
}
// MQTTConnectTimeoutOrDefault returns the configured MQTT connect timeout in
// seconds, or 10 if not set (matching the paho default used historically).
func (c *Config) MQTTConnectTimeoutOrDefault() int {
if c.MQTTConnectTimeoutSeconds > 0 {
return c.MQTTConnectTimeoutSeconds
}
return 10
}
// NodeDaysOrDefault returns the configured retention.nodeDays or 7 if not set.
func (c *Config) NodeDaysOrDefault() int {
if c.Retention != nil && c.Retention.NodeDays > 0 {
+33
View File
@@ -284,3 +284,36 @@ func TestLoadConfigWithAllFields(t *testing.T) {
t.Errorf("iataFilter=%v", src.IATAFilter)
}
}
func TestMQTTConnectTimeoutOrDefault(t *testing.T) {
// Default when unset
cfg := &Config{}
if got := cfg.MQTTConnectTimeoutOrDefault(); got != 10 {
t.Errorf("default: got %d, want 10", got)
}
// Custom value
cfg.MQTTConnectTimeoutSeconds = 45
if got := cfg.MQTTConnectTimeoutOrDefault(); got != 45 {
t.Errorf("custom: got %d, want 45", got)
}
// Zero treated as unset
cfg.MQTTConnectTimeoutSeconds = 0
if got := cfg.MQTTConnectTimeoutOrDefault(); got != 10 {
t.Errorf("zero: got %d, want 10", got)
}
}
func TestMQTTConnectTimeoutFromJSON(t *testing.T) {
dir := t.TempDir()
cfgPath := dir + "/config.json"
os.WriteFile(cfgPath, []byte(`{"mqttConnectTimeoutSeconds": 5}`), 0644)
cfg, err := LoadConfig(cfgPath)
if err != nil {
t.Fatal(err)
}
if got := cfg.MQTTConnectTimeoutOrDefault(); got != 5 {
t.Errorf("from JSON: got %d, want 5", got)
}
}
+4 -3
View File
@@ -129,7 +129,8 @@ func main() {
tag = source.Broker
}
opts := buildMQTTOpts(source)
opts := buildMQTTOpts(source, cfg.MQTTConnectTimeoutOrDefault())
log.Printf("MQTT [%s] connect timeout: %ds", tag, cfg.MQTTConnectTimeoutOrDefault())
opts.SetOnConnectHandler(func(c mqtt.Client) {
log.Printf("MQTT [%s] connected to %s", tag, source.Broker)
@@ -196,14 +197,14 @@ func main() {
// buildMQTTOpts creates MQTT client options for a source with bounded reconnect
// backoff, connect timeout, and TLS/auth configuration.
func buildMQTTOpts(source MQTTSource) *mqtt.ClientOptions {
func buildMQTTOpts(source MQTTSource, connectTimeoutSec int) *mqtt.ClientOptions {
opts := mqtt.NewClientOptions().
AddBroker(source.Broker).
SetAutoReconnect(true).
SetConnectRetry(true).
SetOrderMatters(true).
SetMaxReconnectInterval(30 * time.Second).
SetConnectTimeout(10 * time.Second).
SetConnectTimeout(time.Duration(connectTimeoutSec) * time.Second).
SetWriteTimeout(10 * time.Second)
if source.Username != "" {
+4 -4
View File
@@ -10,7 +10,7 @@ func TestBuildMQTTOpts_ReconnectSettings(t *testing.T) {
Broker: "tcp://localhost:1883",
Name: "test",
}
opts := buildMQTTOpts(source)
opts := buildMQTTOpts(source, 10)
if opts.MaxReconnectInterval != 30*time.Second {
t.Errorf("MaxReconnectInterval = %v, want 30s", opts.MaxReconnectInterval)
@@ -35,7 +35,7 @@ func TestBuildMQTTOpts_Credentials(t *testing.T) {
Username: "user1",
Password: "pass1",
}
opts := buildMQTTOpts(source)
opts := buildMQTTOpts(source, 10)
if opts.Username != "user1" {
t.Errorf("Username = %q, want %q", opts.Username, "user1")
@@ -51,7 +51,7 @@ func TestBuildMQTTOpts_TLS_InsecureSkipVerify(t *testing.T) {
Broker: "ssl://broker:8883",
RejectUnauthorized: &f,
}
opts := buildMQTTOpts(source)
opts := buildMQTTOpts(source, 10)
if opts.TLSConfig == nil {
t.Fatal("TLSConfig should be set")
@@ -65,7 +65,7 @@ func TestBuildMQTTOpts_TLS_SSL_Prefix(t *testing.T) {
source := MQTTSource{
Broker: "ssl://broker:8883",
}
opts := buildMQTTOpts(source)
opts := buildMQTTOpts(source, 10)
if opts.TLSConfig == nil {
t.Fatal("TLSConfig should be set for ssl:// brokers")
+2
View File
@@ -14,6 +14,8 @@
"incrementalVacuumPages": 1024,
"_comment": "vacuumOnStartup: run one-time full VACUUM to enable incremental auto-vacuum on existing DBs (blocks startup for minutes on large DBs; requires 2x DB file size in free disk space). incrementalVacuumPages: free pages returned to OS after each retention reaper cycle (default 1024). See #919."
},
"mqttConnectTimeoutSeconds": 10,
"_commentMqttConnectTimeout": "MQTT connect timeout in seconds (default 10). Increase for high-latency brokers.",
"https": {
"cert": "/path/to/cert.pem",
"key": "/path/to/key.pem",