feat(websocket): update WebSocket connection handling with heartbeat, reconnection logic, and support for ping/pong messages

This commit is contained in:
Ivan
2026-04-14 19:34:36 -05:00
parent d784f1c75f
commit 97fd6049ea
4 changed files with 337 additions and 24 deletions

View File

@@ -1,13 +1,24 @@
import mitt from "mitt";
import { reconnectDelayWithJitterMs } from "./wsConnectionSupport";
const PING_INTERVAL_MS = 25000;
const PONG_TIMEOUT_MS = 12000;
const BASE_RECONNECT_MS = 1000;
const MAX_RECONNECT_MS = 60000;
const JITTER_MAX_MS = 400;
class WebSocketConnection {
constructor() {
this.emitter = mitt();
this.ws = null;
this.pingInterval = null;
this.reconnectTimeout = null;
this._heartbeatInterval = null;
this._pongTimeout = null;
this._reconnectTimeout = null;
this._reconnectAttempt = 0;
this.initialized = false;
this.destroyed = false;
this._hadSuccessfulOpen = false;
this._pendingReconnectUi = false;
}
async connect() {
@@ -20,48 +31,149 @@ class WebSocketConnection {
this.initialized = true;
this.reconnect();
if (this.pingInterval) clearInterval(this.pingInterval);
this.pingInterval = setInterval(() => {
this.ping();
}, 30000);
}
// add event listener
on(event, handler) {
this.emitter.on(event, handler);
}
// remove event listener
off(event, handler) {
this.emitter.off(event, handler);
}
// emit event
emit(type, event) {
this.emitter.emit(type, event);
}
_clearHeartbeat() {
if (this._heartbeatInterval != null) {
clearInterval(this._heartbeatInterval);
this._heartbeatInterval = null;
}
}
_clearPongTimeout() {
if (this._pongTimeout != null) {
clearTimeout(this._pongTimeout);
this._pongTimeout = null;
}
}
_stopHeartbeat() {
this._clearHeartbeat();
this._clearPongTimeout();
}
_sendAppPing() {
if (this.destroyed || !this.ws || this.ws.readyState !== WebSocket.OPEN) {
return;
}
try {
this.ws.send(JSON.stringify({ type: "ping" }));
} catch {
return;
}
this._clearPongTimeout();
this._pongTimeout = setTimeout(() => {
this._pongTimeout = null;
if (this.destroyed || !this.ws) {
return;
}
try {
this.ws.close(4000, "heartbeat timeout");
} catch {
// ignore
}
}, PONG_TIMEOUT_MS);
}
_startHeartbeat() {
this._stopHeartbeat();
this._heartbeatInterval = setInterval(() => {
this._sendAppPing();
}, PING_INTERVAL_MS);
this._sendAppPing();
}
reconnect() {
if (!this.initialized || this.destroyed || typeof window === "undefined" || !window.location) {
return;
}
// connect to websocket
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
return;
}
if (this.ws) {
try {
this.ws.close();
} catch {
// ignore
}
this.ws = null;
}
const wsUrl = window.location.origin.replace(/^https/, "wss").replace(/^http/, "ws") + "/ws";
this.ws = new WebSocket(wsUrl);
// auto reconnect when websocket closes
this.ws.addEventListener("open", () => {
if (this.destroyed) {
return;
}
if (this._reconnectTimeout != null) {
clearTimeout(this._reconnectTimeout);
this._reconnectTimeout = null;
}
this._reconnectAttempt = 0;
this._stopHeartbeat();
this._startHeartbeat();
const isReconnect = this._pendingReconnectUi;
this._pendingReconnectUi = false;
this._hadSuccessfulOpen = true;
this.emit("connected", { isReconnect });
});
this.ws.addEventListener("close", () => {
if (this.destroyed) return;
this.reconnectTimeout = setTimeout(() => {
this._stopHeartbeat();
if (this.destroyed) {
return;
}
if (this._hadSuccessfulOpen) {
this._pendingReconnectUi = true;
}
this.emit("disconnected");
const delay = reconnectDelayWithJitterMs(
this._reconnectAttempt,
BASE_RECONNECT_MS,
MAX_RECONNECT_MS,
JITTER_MAX_MS
);
this._reconnectAttempt += 1;
if (this._reconnectTimeout != null) {
clearTimeout(this._reconnectTimeout);
}
this._reconnectTimeout = setTimeout(() => {
this._reconnectTimeout = null;
if (!this.destroyed) {
this.reconnect();
}
}, 1000);
}, delay);
});
this.ws.addEventListener("error", () => {
// close event will follow; reconnect scheduled there
});
// emit data received from websocket
this.ws.onmessage = (message) => {
try {
const data = JSON.parse(message.data);
if (data && data.type === "pong") {
this._clearPongTimeout();
return;
}
} catch {
// non-json: forward
}
this.emit("message", message);
};
}
@@ -69,16 +181,19 @@ class WebSocketConnection {
destroy() {
this.destroyed = true;
this.initialized = false;
if (this.pingInterval) {
clearInterval(this.pingInterval);
this.pingInterval = null;
}
if (this.reconnectTimeout) {
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = null;
this._hadSuccessfulOpen = false;
this._pendingReconnectUi = false;
this._stopHeartbeat();
if (this._reconnectTimeout != null) {
clearTimeout(this._reconnectTimeout);
this._reconnectTimeout = null;
}
if (this.ws) {
this.ws.close();
try {
this.ws.close();
} catch {
// ignore
}
this.ws = null;
}
}
@@ -97,7 +212,7 @@ class WebSocketConnection {
})
);
} catch {
// ignore error
// ignore
}
}
}

View File

@@ -0,0 +1,40 @@
/**
* @param {number} attemptIndex 0 = first retry after disconnect
* @param {number} baseMs
* @param {number} maxMs
*/
export function getNextReconnectDelayMs(attemptIndex, baseMs, maxMs) {
const raw = baseMs * 2 ** Math.max(0, attemptIndex);
return Math.min(maxMs, Math.floor(raw));
}
/**
* Human-readable duration for disconnected banner (count-up).
* @param {number} elapsedMs
*/
export function formatDisconnectedDuration(elapsedMs) {
let t = Math.max(0, Math.floor(elapsedMs));
const s = Math.floor(t / 1000);
if (s < 60) {
return `${s}s`;
}
const m = Math.floor(s / 60);
const secRem = s % 60;
if (m < 60) {
return secRem > 0 ? `${m}m ${secRem}s` : `${m}m`;
}
const h = Math.floor(m / 60);
const minRem = m % 60;
if (h < 24) {
return minRem > 0 ? `${h}h ${minRem}m` : `${h}h`;
}
const d = Math.floor(h / 24);
const hrRem = h % 24;
return hrRem > 0 ? `${d}d ${hrRem}h` : `${d}d`;
}
export function reconnectDelayWithJitterMs(attemptIndex, baseMs, maxMs, jitterMaxMs) {
const base = getNextReconnectDelayMs(attemptIndex, baseMs, maxMs);
const jitter = jitterMaxMs > 0 ? Math.floor(Math.random() * jitterMaxMs) : 0;
return base + jitter;
}

View File

@@ -0,0 +1,117 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
function makeWsImpl() {
return class MockWebSocket {
static CONNECTING = 0;
static OPEN = 1;
static CLOSING = 2;
static CLOSED = 3;
constructor(url) {
this.url = url;
this.readyState = MockWebSocket.CONNECTING;
this._listeners = { open: [], close: [], error: [], message: [] };
queueMicrotask(() => {
if (this.readyState === MockWebSocket.CLOSED) {
return;
}
this.readyState = MockWebSocket.OPEN;
this._listeners.open.forEach((fn) => fn());
});
}
addEventListener(type, fn) {
this._listeners[type]?.push(fn);
}
send(data) {
if (data.includes('"type":"ping"')) {
queueMicrotask(() => {
this._listeners.message.forEach((fn) => fn({ data: JSON.stringify({ type: "pong" }) }));
});
}
}
close(code, reason) {
if (this.readyState === MockWebSocket.CLOSED) {
return;
}
this.readyState = MockWebSocket.CLOSED;
queueMicrotask(() => {
this._listeners.close.forEach((fn) => fn({ code, reason }));
});
}
};
}
describe("WebSocketConnection module", () => {
beforeEach(() => {
vi.resetModules();
global.window = {
api: {},
location: { origin: "http://127.0.0.1:5173" },
};
});
afterEach(() => {
vi.useRealTimers();
});
it("emits connected then disconnected on close and reconnects with backoff", async () => {
const MockWS = makeWsImpl();
global.WebSocket = MockWS;
vi.useFakeTimers({ shouldAdvanceTime: true });
const { default: WebSocketConnection } = await import("../../meshchatx/src/frontend/js/WebSocketConnection.js");
const connected = vi.fn();
const disconnected = vi.fn();
WebSocketConnection.on("connected", connected);
WebSocketConnection.on("disconnected", disconnected);
await WebSocketConnection.connect();
await vi.waitUntil(() => connected.mock.calls.length >= 1);
expect(connected.mock.calls[0][0]).toEqual({ isReconnect: false });
const firstWs = WebSocketConnection.ws;
firstWs.close(1000, "test");
await vi.waitUntil(() => disconnected.mock.calls.length >= 1);
const delay = 1000;
await vi.advanceTimersByTimeAsync(delay + 500);
await vi.waitUntil(() => WebSocketConnection.ws && WebSocketConnection.ws !== firstWs);
await vi.waitUntil(() => connected.mock.calls.length >= 2);
expect(connected.mock.calls[1][0]).toEqual({ isReconnect: true });
WebSocketConnection.destroy();
});
it("strips pong from message stream", async () => {
const MockWS = makeWsImpl();
global.WebSocket = MockWS;
const { default: WebSocketConnection } = await import("../../meshchatx/src/frontend/js/WebSocketConnection.js");
const onMessage = vi.fn();
WebSocketConnection.on("message", onMessage);
await WebSocketConnection.connect();
await vi.waitUntil(() => WebSocketConnection.ws?.readyState === MockWS.OPEN);
const sock = WebSocketConnection.ws;
sock.onmessage({ data: JSON.stringify({ type: "pong" }) });
expect(onMessage).not.toHaveBeenCalled();
sock.onmessage({ data: JSON.stringify({ type: "config", config: {} }) });
expect(onMessage).toHaveBeenCalledTimes(1);
WebSocketConnection.destroy();
});
});

View File

@@ -0,0 +1,41 @@
import { describe, expect, it, vi } from "vitest";
import {
formatDisconnectedDuration,
getNextReconnectDelayMs,
reconnectDelayWithJitterMs,
} from "../../meshchatx/src/frontend/js/wsConnectionSupport";
describe("getNextReconnectDelayMs", () => {
it("doubles exponentially and caps at max", () => {
expect(getNextReconnectDelayMs(0, 1000, 60000)).toBe(1000);
expect(getNextReconnectDelayMs(1, 1000, 60000)).toBe(2000);
expect(getNextReconnectDelayMs(2, 1000, 60000)).toBe(4000);
expect(getNextReconnectDelayMs(16, 1000, 60000)).toBe(60000);
});
});
describe("reconnectDelayWithJitterMs", () => {
it("adds jitter in range", () => {
vi.spyOn(Math, "random").mockReturnValue(0.5);
expect(reconnectDelayWithJitterMs(0, 1000, 60000, 400)).toBe(1200);
vi.mocked(Math.random).mockRestore();
});
});
describe("formatDisconnectedDuration", () => {
it("formats seconds", () => {
expect(formatDisconnectedDuration(0)).toBe("0s");
expect(formatDisconnectedDuration(1500)).toBe("1s");
expect(formatDisconnectedDuration(59000)).toBe("59s");
});
it("formats minutes", () => {
expect(formatDisconnectedDuration(60000)).toBe("1m");
expect(formatDisconnectedDuration(125000)).toBe("2m 5s");
});
it("formats hours and days", () => {
expect(formatDisconnectedDuration(3600000)).toBe("1h");
expect(formatDisconnectedDuration(3720000)).toBe("1h 2m");
expect(formatDisconnectedDuration(86400000)).toBe("1d");
expect(formatDisconnectedDuration(90000000)).toBe("1d 1h");
});
});