Compare commits

...

2 Commits

Author SHA1 Message Date
openclaw-bot 4ea12087f2 fix(mqtt): persistent session + parallel handler (#1337)
paho defaults (CleanSession=true, empty random ClientID per reconnect,
Order=true) caused the staging ingestor to receive ~7 msg/h while
mosquitto_sub on the same broker/creds/topics received ~6720/h — a 200x
gap. Every watchdog-driven reconnect (~every 5min) made the broker treat
us as a brand-new session and drop the queued backlog.

buildMQTTOpts now sets:
  - SetClientID("corescope-ingestor-<hostname>-<source-tag>")
    persistent + unique across sources, stable across restarts
  - SetCleanSession(false)
    broker keeps subscription state across reconnects and replays the
    backlog we missed
  - SetKeepAlive(30 * time.Second)
    paho-level half-open detection (was unset; relying on OS keepalive)
  - SetOrderMatters(false)
    handler dispatch is parallel; one slow packet no longer stalls all
    others under burst load

The existing watchdog (#1212/#1216) is untouched. Reconnect throttle
(MaxReconnectInterval=30s) is unchanged — no reconnect storm.

Fixes #1337
2026-05-24 03:00:42 +00:00
openclaw-bot 2fd579bc6e test(mqtt): RED — pin persistent-session paho opts for #1337
Three tests that fail on master:
- TestBuildMQTTOpts_PersistentSession_Issue1337 — asserts CleanSession=false,
  non-empty ClientID embedding hostname+source name, KeepAlive=30s, Order=false
- TestBuildMQTTOpts_ClientIDStableAcrossBuilds_Issue1337 — same source name +
  hostname must yield identical ClientID across two builds (otherwise reconnect
  = new session = broker drops the backlog)
- TestBuildMQTTOpts_ClientIDUniquePerSource_Issue1337 — distinct source names
  must yield distinct ClientIDs (duplicate ClientID = broker disconnects the
  older session, infinite flap)

Refs #1337
2026-05-24 02:59:10 +00:00
2 changed files with 106 additions and 1 deletions
+21 -1
View File
@@ -364,11 +364,31 @@ func buildMQTTOpts(source MQTTSource) *mqtt.ClientOptions {
if tag == "" {
tag = source.Broker
}
// #1337: paho defaults silently throttle delivery on this broker.
// - CleanSession=true + empty ClientID (random per reconnect) made the
// broker treat every reconnect as a brand-new session and discard the
// backlog it had queued since the previous disconnect. With watchdog
// reconnects every ~5min on staging, this lost ~99% of messages.
// - Order=true serialized the default publish handler; one slow packet
// blocked all others, compounding the loss under bursts.
// Fix: persistent unique ClientID + CleanSession=false (broker keeps
// our subscription state across reconnects and forwards what we missed),
// explicit KeepAlive so half-open TCP is detected at the paho layer, and
// Order=false for parallel handler dispatch.
hostname, _ := os.Hostname()
if hostname == "" {
hostname = "unknown-host"
}
clientID := "corescope-ingestor-" + hostname + "-" + tag
opts := mqtt.NewClientOptions().
AddBroker(source.Broker).
SetClientID(clientID).
SetCleanSession(false).
SetKeepAlive(30 * time.Second).
SetOrderMatters(false).
SetAutoReconnect(true).
SetConnectRetry(true).
SetOrderMatters(true).
SetMaxReconnectInterval(30 * time.Second).
SetConnectTimeout(10 * time.Second).
SetWriteTimeout(10 * time.Second)
+85
View File
@@ -0,0 +1,85 @@
package main
import (
"os"
"strings"
"testing"
"time"
)
// Issue #1337: paho client misconfigured — ingestor receives 200× fewer
// messages than mosquitto_sub on the same broker/creds/topics. Root cause
// (hypothesis 1+5): paho defaults — CleanSession=true, empty ClientID
// (auto-random per reconnect), Order=true (handler serialized) — combined
// with the reconnect-every-5min watchdog meant the broker dropped queued
// messages on every reconnect AND the handler couldn't keep up under load.
//
// These tests pin the four paho options that fix the gap:
// 1. CleanSession=false — broker keeps the subscription state across
// reconnects instead of treating each dial
// as a brand-new session.
// 2. ClientID = persistent — broker recognizes the returning session.
// Empty ClientID makes paho generate a fresh
// random one on every reconnect, which is
// treated as a new client by the broker.
// 3. KeepAlive = 30s — half-open TCP detected at the paho layer
// instead of waiting for OS keepalive.
// 4. Order = false — handler dispatch is parallel; one slow
// packet does not block all the others.
//
// All four must be set in buildMQTTOpts. This test fails on master.
func TestBuildMQTTOpts_PersistentSession_Issue1337(t *testing.T) {
source := MQTTSource{
Broker: "ssl://broker.example:8883",
Name: "sjc-test",
}
opts := buildMQTTOpts(source)
if opts.CleanSession {
t.Error("CleanSession must be false (#1337): broker drops queued msgs across reconnects when true")
}
host, _ := os.Hostname()
if opts.ClientID == "" {
t.Fatal("ClientID must be set to a persistent value (#1337): empty = paho generates random per reconnect, broker treats every reconnect as new session")
}
if !strings.Contains(opts.ClientID, "sjc-test") {
t.Errorf("ClientID must embed source name for uniqueness across sources, got %q", opts.ClientID)
}
if host != "" && !strings.Contains(opts.ClientID, host) {
t.Errorf("ClientID must embed hostname for uniqueness across deployments, got %q (host=%q)", opts.ClientID, host)
}
if opts.KeepAlive != int64((30 * time.Second).Seconds()) {
t.Errorf("KeepAlive must be 30s (#1337): got %ds — needed so paho detects half-open TCP", opts.KeepAlive)
}
if opts.Order {
t.Error("Order must be false (#1337): default true serializes handler dispatch; a slow packet stalls all others")
}
}
// Stability: ClientID must be deterministic for a given (hostname, source)
// across two builds. Otherwise reconnect = new session = lost backlog.
func TestBuildMQTTOpts_ClientIDStableAcrossBuilds_Issue1337(t *testing.T) {
source := MQTTSource{Broker: "ssl://broker.example:8883", Name: "stable-test"}
a := buildMQTTOpts(source).ClientID
b := buildMQTTOpts(source).ClientID
if a == "" {
t.Fatal("ClientID empty")
}
if a != b {
t.Errorf("ClientID must be stable across buildMQTTOpts calls (#1337): %q vs %q — random = broker drops session on reconnect", a, b)
}
}
// Distinct sources must NOT share a ClientID — broker disconnects the older
// session whenever a duplicate ClientID connects, causing flapping.
func TestBuildMQTTOpts_ClientIDUniquePerSource_Issue1337(t *testing.T) {
a := buildMQTTOpts(MQTTSource{Broker: "ssl://a:8883", Name: "alpha"}).ClientID
b := buildMQTTOpts(MQTTSource{Broker: "ssl://b:8883", Name: "beta"}).ClientID
if a == b {
t.Errorf("distinct sources must get distinct ClientIDs (#1337): both got %q — duplicate IDs cause broker to disconnect the older one, infinite flap", a)
}
}