Merge remote-tracking branch 's074/add-rabbitmq-error-handling'

This commit is contained in:
Rory&
2026-02-03 23:42:14 +01:00
4 changed files with 273 additions and 94 deletions
+16 -4
View File
@@ -16,7 +16,7 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import { Config, getRights, listenEvent } from "@spacebar/util";
import { Config, getRights, listenEvent, RabbitMQ } from "@spacebar/util";
import { NextFunction, Request, Response, Router } from "express";
import { API_PREFIX_TRAILING_SLASH } from "./Authentication";
@@ -154,9 +154,21 @@ export async function initRateLimits(app: Router) {
const { routes, global, ip, error, enabled } = Config.get().limits.rate;
if (!enabled) return;
console.log("Enabling rate limits...");
await listenEvent(EventRateLimit, (event) => {
Cache.set(event.channel_id as string, event.data);
event.acknowledge?.();
// Registers the consumer for EventRateLimit: each received event's payload is
// written into Cache under the event's channel_id. Kept as a named function so
// the same registration can be repeated later.
const setupRateLimitListener = async () => {
    await listenEvent(EventRateLimit, (rateLimitEvent) => {
        const cacheKey = rateLimitEvent.channel_id as string;
        Cache.set(cacheKey, rateLimitEvent.data);
        // acknowledge is optional on the event, so it is only invoked when present
        rateLimitEvent.acknowledge?.();
    });
};
await setupRateLimitListener();
// Re-establish listener on RabbitMQ reconnection
RabbitMQ.on("reconnected", async () => {
console.log("[RateLimit] RabbitMQ reconnected, re-establishing rate limit listener");
await setupRateLimitListener();
});
// await RateLimit.delete({ expires_at: LessThan(new Date().toISOString()) }); // cleans up if not already deleted, morethan -> older date
// const limits = await RateLimit.find({ blocked: true });
+79 -27
View File
@@ -94,39 +94,87 @@ export async function setupListener(this: WebSocket) {
console.error(`[RabbitMQ] [user-${this.user_id}] Channel Error (Handled):`, err);
};
if (RabbitMQ.connection) {
console.log("[RabbitMQ] setupListener: opts.channel = ", typeof opts.channel, "with channel id", opts.channel?.ch);
opts.channel = await RabbitMQ.connection.createChannel();
// Function to set up all event listeners (used for initial setup and reconnection)
const setupEventListeners = async () => {
if (RabbitMQ.connection) {
console.log(`[RabbitMQ] [user-${this.user_id}] Setting up channel and event listeners`);
opts.channel = await RabbitMQ.connection.createChannel();
opts.channel.on("error", handleChannelError);
opts.channel.queues = {};
console.log("[RabbitMQ] channel created: ", typeof opts.channel, "with channel id", opts.channel?.ch);
}
opts.channel.on("error", handleChannelError);
opts.channel.queues = {};
console.log("[RabbitMQ] channel created: ", typeof opts.channel, "with channel id", opts.channel?.ch);
}
this.events[this.user_id] = await listenEvent(this.user_id, consumer, opts);
this.events[this.user_id] = await listenEvent(this.user_id, consumer, opts);
relationships.forEach(async (relationship) => {
this.events[relationship.to_id] = await listenEvent(relationship.to_id, handlePresenceUpdate.bind(this), opts);
});
await Promise.all(
relationships.map(async (relationship) => {
this.events[relationship.to_id] = await listenEvent(relationship.to_id, handlePresenceUpdate.bind(this), opts);
}),
);
dm_channels.forEach(async (channel) => {
this.events[channel.id] = await listenEvent(channel.id, consumer, opts);
});
guilds.forEach(async (guild) => {
const permission = await getPermission(this.user_id, guild.id);
this.permissions[guild.id] = permission;
this.events[guild.id] = await listenEvent(guild.id, consumer, opts);
guild.channels.forEach(async (channel) => {
if (permission.overwriteChannel(channel.permission_overwrites ?? []).has("VIEW_CHANNEL")) {
await Promise.all(
dm_channels.map(async (channel) => {
this.events[channel.id] = await listenEvent(channel.id, consumer, opts);
}
});
});
}),
);
await Promise.all(
guilds.map(async (guild) => {
const permission = await getPermission(this.user_id, guild.id);
this.permissions[guild.id] = permission;
this.events[guild.id] = await listenEvent(guild.id, consumer, opts);
await Promise.all(
guild.channels.map(async (channel) => {
if (permission.overwriteChannel(channel.permission_overwrites ?? []).has("VIEW_CHANNEL")) {
this.events[channel.id] = await listenEvent(channel.id, consumer, opts);
}
}),
);
}),
);
};
// Initial setup
await setupEventListeners();
// Runs when RabbitMQ announces a restored connection: forgets everything tied
// to the previous connection and subscribes again from scratch. If that fails,
// the WebSocket is closed so the client reconnects and repeats the full setup.
const handleReconnect = async () => {
    console.log(`[RabbitMQ] [user-${this.user_id}] Connection restored, re-establishing subscriptions`);

    try {
        // The stored cancel callbacks and the channel belong to the old connection
        this.events = {};
        this.member_events = {};
        opts.channel = undefined;

        await setupEventListeners();
    } catch (err) {
        console.error(`[RabbitMQ] [user-${this.user_id}] Failed to re-establish subscriptions:`, err);
        this.close(4000, "Failed to re-establish event subscriptions");
        return;
    }

    console.log(`[RabbitMQ] [user-${this.user_id}] Successfully re-established subscriptions`);
};
// Runs when RabbitMQ announces a lost connection: detaches this session's
// error handler from the current channel (if any) and drops the reference.
const handleDisconnect = () => {
    console.log(`[RabbitMQ] [user-${this.user_id}] Connection lost, waiting for reconnection`);

    opts.channel?.off("error", handleChannelError);
    opts.channel = undefined;
};
// Subscribe to RabbitMQ connection events
RabbitMQ.on("reconnected", handleReconnect);
RabbitMQ.on("disconnected", handleDisconnect);
this.once("close", async () => {
// console.log("[Events] setupListener: close for", this.user_id, "=", typeof opts.channel, "with channel id", opts.channel?.ch);
// Unsubscribe from RabbitMQ events
RabbitMQ.off("reconnected", handleReconnect);
RabbitMQ.off("disconnected", handleDisconnect);
// wait for event consumer cancellation
await Promise.all(
@@ -138,7 +186,11 @@ export async function setupListener(this: WebSocket) {
await Promise.all(Object.values(this.member_events).map((x) => x()));
if (opts.channel) {
await opts.channel.close();
try {
await opts.channel.close();
} catch {
// Channel might already be closed
}
opts.channel.off("error", handleChannelError);
}
});
+60 -26
View File
@@ -36,19 +36,33 @@ export async function emitEvent(payload: Omit<Event, "created_at">) {
if (RabbitMQ.connection) {
const data = typeof payload.data === "object" ? JSON.stringify(payload.data) : payload.data; // use rabbitmq for event transmission
const channel = await RabbitMQ.getSafeChannel();
try {
await channel.assertExchange(id, "fanout", {
durable: false,
});
// assertQueue isn't needed, because a queue will automatically created if it doesn't exist
const successful = channel.publish(id, "", Buffer.from(`${data}`), { type: payload.event });
if (!successful) throw new Error("failed to send event");
} catch (e) {
// todo: should we retry publishng the event?
console.log("[RabbitMQ] ", e);
}
// Publishes the event to the fanout exchange named `id`.
// A failure whose message looks channel-related is retried once with a channel
// obtained from a fresh getSafeChannel() call; any other failure (or a second
// channel failure) is only logged. A rejection from getSafeChannel() itself is
// not caught here and propagates to the caller.
const publishEvent = async (retryCount = 0): Promise<void> => {
    for (let attempt = retryCount; ; attempt++) {
        const channel = await RabbitMQ.getSafeChannel();

        try {
            await channel.assertExchange(id, "fanout", { durable: false });
            // assertQueue is skipped on purpose (per the original note, a missing queue is created automatically)
            const sent = channel.publish(id, "", Buffer.from(`${data}`), { type: payload.event });
            if (!sent) throw new Error("failed to send event");
            return;
        } catch (e) {
            const errorMessage = e instanceof Error ? e.message : String(e);
            const channelErrorMarkers = ["Channel closed", "IllegalOperationError", "RESOURCE_ERROR"];
            const isChannelError = channelErrorMarkers.some((marker) => errorMessage.includes(marker));

            if (!isChannelError || attempt >= 1) {
                console.log("[RabbitMQ] ", e);
                return;
            }

            console.log("[RabbitMQ] Channel error detected, retrying with new channel...");
            // fall through to the next iteration, which asks getSafeChannel() again
        }
    }
};
await publishEvent();
} else if (process.env.EVENT_TRANSMISSION === "unix" && process.env.EVENT_SOCKET_PATH) {
if (!unixSocketWriter) {
unixSocketWriter = new UnixSocketWriter(process.env.EVENT_SOCKET_PATH);
@@ -64,19 +78,26 @@ export async function emitEvent(payload: Omit<Event, "created_at">) {
export async function initEvent() {
await RabbitMQ.init(); // does nothing if rabbitmq is not setup
if (RabbitMQ.connection) {
// empty on purpose?
} else {
// use event emitter
// use process messages
}
await listenEvent("spacebar", async (event) => {
console.log("[Event] Received spacebar event:", event);
if ((event.event as string) === "SB_RELOAD_CONFIG") {
console.log("[Event] Reloading config due to RELOAD_CONFIG event");
await Config.init(true);
}
// Subscribes to the "spacebar" event id. Every received event is logged; an
// event named SB_RELOAD_CONFIG additionally triggers Config.init(true).
const setupSpacebarListener = async () => {
    console.log("[Event] Setting up spacebar event listener");

    await listenEvent("spacebar", async (incoming) => {
        console.log("[Event] Received spacebar event:", incoming);

        const isReloadRequest = (incoming.event as string) === "SB_RELOAD_CONFIG";
        if (!isReloadRequest) return;

        console.log("[Event] Reloading config due to RELOAD_CONFIG event");
        await Config.init(true);
    });
};
// Initial setup
await setupSpacebarListener();
// Re-establish listener on reconnection
RabbitMQ.on("reconnected", async () => {
console.log("[Event] RabbitMQ reconnected, re-establishing spacebar listener");
await setupSpacebarListener();
});
}
@@ -142,16 +163,29 @@ export async function listenEvent(event: string, callback: (event: EventOpts) =>
async function rabbitListen(channel: Channel, id: string, callback: (event: EventOpts) => unknown, opts?: { acknowledge?: boolean }): Promise<() => Promise<void>> {
await channel.assertExchange(id, "fanout", { durable: false });
// messageTtl ensures any orphaned messages are cleaned up quickly if the consumer disconnects.
const q = await channel.assertQueue("", {
exclusive: true,
autoDelete: true,
messageTtl: 5000, // Messages expire after 5 seconds if not consumed
});
const consumerTag = randomUUID();
const cancel = async () => {
await channel.cancel(consumerTag);
await channel.unbindQueue(q.queue, id, "");
try {
// Order matters here to prevent RESOURCE_ERROR, due to potential race condition:
// 1. Unbind first - stops new messages from being routed to this queue
await channel.unbindQueue(q.queue, id, "");
// 2. Cancel consumer - with autoDelete: true, this triggers queue deletion
// after RabbitMQ ensures no messages are in-flight to this queue
await channel.cancel(consumerTag);
// Don't explicitly delete the queue - let autoDelete handle it safely.
// Explicitly deleting can race with in-flight message delivery.
} catch (e) {
// Channel might already be closed or queue already deleted - that's fine
console.log("[RabbitMQ] Error during consumer cancel (may be expected):", e instanceof Error ? e.message : e);
}
};
await channel.bindQueue(q.queue, id, "");
+118 -37
View File
@@ -18,75 +18,156 @@
import amqp, { Channel, ChannelModel } from "amqplib";
import { Config } from "./Config";
import EventEmitter from "events";
export class RabbitMQ {
public static connection: ChannelModel | null = null;
public static channel: Channel | null = null;
// Event emitter for connection state changes
private static events = new EventEmitter();
// Reconnection state
private static isReconnecting = false;
private static reconnectAttempts = 0;
private static readonly BASE_RECONNECT_DELAY_MS = 500; // reconnect after 500 milliseconds delay
// Track if event listeners have been set up (to avoid duplicates)
private static connectionListenersAttached = false;
/**
 * Subscribe to connection state events on the class-private emitter.
 * - 'reconnected': Emitted by connect() after each successful connect (the first one included).
 *   Consumers should re-establish their subscriptions when it fires.
 * - 'disconnected': Emitted when the connection is lost.
 *
 * @param event Which connection event to listen for.
 * @param listener Callback invoked with no arguments. Pass the same function reference to off() to remove it.
 */
static on(event: "reconnected" | "disconnected", listener: () => void) {
this.events.on(event, listener);
}
/**
 * Remove a listener previously registered through on().
 *
 * @param event The connection event the listener was registered for.
 * @param listener The same function reference that was passed to on().
 */
static off(event: "reconnected" | "disconnected", listener: () => void) {
this.events.off(event, listener);
}
static async init() {
const host = Config.get().rabbitmq.host;
if (!host) return;
console.log(`[RabbitMQ] connect: ${host}`);
this.connection = await amqp.connect(host, {
timeout: 1000 * 60,
});
console.log(`[RabbitMQ] connected`);
// log connection errors
await this.connect(host);
}
/**
 * Opens the AMQP connection to `host` and prepares the shared channel.
 *
 * On success it resets the reconnect bookkeeping, attaches the connection-level
 * listeners (unless they are flagged as already attached), pre-creates the
 * shared channel and emits "reconnected". That event is emitted on the very
 * first successful connect as well as on later ones.
 *
 * Any error thrown inside the try block is logged and handed to
 * scheduleReconnect(); the error itself is not rethrown to the caller.
 *
 * @param host Connection string passed straight to amqp.connect().
 */
private static async connect(host: string): Promise<void> {
try {
console.log(`[RabbitMQ] Connecting to: ${host}`);
this.connection = await amqp.connect(host, {
// 60 000 ms; presumably the socket/connect timeout (confirm against amqplib's socket options)
timeout: 1000 * 60,
});
console.log(`[RabbitMQ] Connected successfully`);
// Reset reconnection state on successful connect
this.reconnectAttempts = 0;
this.isReconnecting = false;
// Only attach listeners once per connection object
// (the flag is cleared again by the connection's "close" handler)
if (!this.connectionListenersAttached) {
this.attachConnectionListeners(host);
this.connectionListenersAttached = true;
}
// Pre-create the shared channel
// NOTE(review): isReconnecting was already cleared above, so if this call throws,
// the catch below enters scheduleReconnect() while this.connection is still set.
await this.getSafeChannel();
// Notify subscribers that connection is (re-)established
this.events.emit("reconnected");
} catch (error) {
console.error("[RabbitMQ] Connection failed:", error);
// Awaited, so this call only returns once scheduleReconnect() has returned
await this.scheduleReconnect(host);
}
}
private static attachConnectionListeners(host: string) {
if (!this.connection) return;
this.connection.on("error", (err) => {
console.error("[RabbitMQ] Connection Error:", err);
console.error("[RabbitMQ] Connection error:", err);
// Don't reconnect here - wait for 'close' event
});
this.connection.on("close", () => {
console.error("[RabbitMQ] connection closed");
// TODO: Add reconnection logic here if the connection crashes??
// will be a pain since we will have to reconstruct entire state
});
console.error("[RabbitMQ] Connection closed");
this.channel = null;
this.connection = null;
this.connectionListenersAttached = false;
await this.getSafeChannel(); // why is this here?
// Notify subscribers that connection is lost
this.events.emit("disconnected");
// Schedule reconnection
this.scheduleReconnect(host);
});
}
/**
 * Keeps retrying `connect(host)` with a fixed delay until a connection exists.
 *
 * Only one retry loop runs at a time: a call made while `isReconnecting` is
 * set returns immediately. This matters because connect() calls back into this
 * method from its own failure path. Previously the flag stayed set after a
 * failed attempt, so that nested call was skipped and nothing ever retried
 * again. The loop below owns the retries instead, and the flag is always
 * released when the loop ends.
 *
 * @param host Connection string forwarded to connect().
 * @returns Resolves once `this.connection` is set, or immediately when another
 *          retry loop is already running. Callers that await this (connect(),
 *          and through it init()) therefore wait until the broker is reachable.
 */
private static async scheduleReconnect(host: string): Promise<void> {
    if (this.isReconnecting) {
        console.log("[RabbitMQ] Reconnection already in progress, skipping");
        return;
    }

    try {
        do {
            // (Re)claim the loop on every pass: a successful-but-incomplete connect() may have cleared the flag
            this.isReconnecting = true;
            this.reconnectAttempts++;

            console.log(`[RabbitMQ] Scheduling reconnection attempt ${this.reconnectAttempts} in ${this.BASE_RECONNECT_DELAY_MS}ms`);

            await new Promise((resolve) => setTimeout(resolve, this.BASE_RECONNECT_DELAY_MS));

            try {
                // connect() handles its own errors; its nested scheduleReconnect() call is a no-op while the flag is set
                await this.connect(host);
            } catch {
                console.log("[RabbitMQ] Reconnection attempt failed, will retry");
            }
        } while (!this.connection);
    } finally {
        // Always release the guard so a later connection loss can start a new retry loop
        this.isReconnecting = false;
    }
}
static async getSafeChannel(): Promise<Channel> {
if (!this.connection) return Promise.reject();
if (this.channel) return this.channel;
if (!this.connection) {
return Promise.reject(new Error("[RabbitMQ] No connection available"));
}
// Check if cached channel is still usable
if (this.channel) {
// amqplib channels have a 'closed' property when closed
const isClosed = (this.channel as unknown as { closed?: boolean }).closed;
if (!isClosed) {
return this.channel;
}
console.log("[RabbitMQ] Cached channel is closed, creating new one");
this.channel = null;
}
try {
this.channel = await this.connection.createChannel();
console.log(`[RabbitMQ] channel created`);
console.log("[RabbitMQ] Channel created");
// log channel errors
this.channel.on("error", (err) => {
console.error("[RabbitMQ] Channel Error:", err);
console.error("[RabbitMQ] Channel error:", err);
});
this.channel.on("close", () => {
console.log("[RabbitMQ] channel closed");
console.log("[RabbitMQ] Channel closed");
this.channel = null;
});
this.connection.on("error", (err) => {
console.error("[RabbitMQ] connection error, setting channel to null and reconnecting:", err);
this.channel = null;
this.connection = null;
this.init();
});
this.connection.on("close", () => {
console.log("[RabbitMQ] connection closed, setting channel to null and reconnecting");
this.channel = null;
this.connection = null;
this.init();
});
return this.channel;
} catch (e) {
console.error("[RabbitMQ] Failed to create channel:", e);
console.error("[RabbitMQ] Forcing reconnect!");
this.connection = null;
this.channel = null;
await this.init();
return await this.getSafeChannel();
// return Promise.reject(e);
throw e;
}
}
/**
 * Reports whether a connection object is currently held and no reconnect is in progress.
 *
 * @returns `true` only when `connection` is non-null and `isReconnecting` is false.
 */
static isConnected(): boolean {
    if (this.connection === null) {
        return false;
    }
    return !this.isReconnecting;
}
}