diff --git a/src/api/middlewares/RateLimit.ts b/src/api/middlewares/RateLimit.ts index c749df6ad..8a8c17f6b 100644 --- a/src/api/middlewares/RateLimit.ts +++ b/src/api/middlewares/RateLimit.ts @@ -16,7 +16,7 @@ along with this program. If not, see . */ -import { Config, getRights, listenEvent } from "@spacebar/util"; +import { Config, getRights, listenEvent, RabbitMQ } from "@spacebar/util"; import { NextFunction, Request, Response, Router } from "express"; import { API_PREFIX_TRAILING_SLASH } from "./Authentication"; @@ -154,9 +154,21 @@ export async function initRateLimits(app: Router) { const { routes, global, ip, error, enabled } = Config.get().limits.rate; if (!enabled) return; console.log("Enabling rate limits..."); - await listenEvent(EventRateLimit, (event) => { - Cache.set(event.channel_id as string, event.data); - event.acknowledge?.(); + + // Set up rate limit event listener + const setupRateLimitListener = async () => { + await listenEvent(EventRateLimit, (event) => { + Cache.set(event.channel_id as string, event.data); + event.acknowledge?.(); + }); + }; + + await setupRateLimitListener(); + + // Re-establish listener on RabbitMQ reconnection + RabbitMQ.on("reconnected", async () => { + console.log("[RateLimit] RabbitMQ reconnected, re-establishing rate limit listener"); + await setupRateLimitListener(); }); // await RateLimit.delete({ expires_at: LessThan(new Date().toISOString()) }); // cleans up if not already deleted, morethan -> older date // const limits = await RateLimit.find({ blocked: true }); diff --git a/src/gateway/listener/listener.ts b/src/gateway/listener/listener.ts index 1f0d386d2..deec5014e 100644 --- a/src/gateway/listener/listener.ts +++ b/src/gateway/listener/listener.ts @@ -94,39 +94,87 @@ export async function setupListener(this: WebSocket) { console.error(`[RabbitMQ] [user-${this.user_id}] Channel Error (Handled):`, err); }; - if (RabbitMQ.connection) { - console.log("[RabbitMQ] setupListener: opts.channel = ", typeof opts.channel, "with channel id", opts.channel?.ch); - opts.channel = await RabbitMQ.connection.createChannel(); + // Function to set up all event listeners (used for initial setup and reconnection) + const setupEventListeners = async () => { + if (RabbitMQ.connection) { + console.log(`[RabbitMQ] [user-${this.user_id}] Setting up channel and event listeners`); + opts.channel = await RabbitMQ.connection.createChannel(); - opts.channel.on("error", handleChannelError); - opts.channel.queues = {}; - console.log("[RabbitMQ] channel created: ", typeof opts.channel, "with channel id", opts.channel?.ch); - } + opts.channel.on("error", handleChannelError); + opts.channel.queues = {}; + console.log("[RabbitMQ] channel created: ", typeof opts.channel, "with channel id", opts.channel?.ch); + } - this.events[this.user_id] = await listenEvent(this.user_id, consumer, opts); + this.events[this.user_id] = await listenEvent(this.user_id, consumer, opts); - relationships.forEach(async (relationship) => { - this.events[relationship.to_id] = await listenEvent(relationship.to_id, handlePresenceUpdate.bind(this), opts); - }); + await Promise.all( + relationships.map(async (relationship) => { + this.events[relationship.to_id] = await listenEvent(relationship.to_id, handlePresenceUpdate.bind(this), opts); + }), + ); - dm_channels.forEach(async (channel) => { - this.events[channel.id] = await listenEvent(channel.id, consumer, opts); - }); - - guilds.forEach(async (guild) => { - const permission = await getPermission(this.user_id, guild.id); - this.permissions[guild.id] = permission; - this.events[guild.id] = await listenEvent(guild.id, consumer, opts); - - guild.channels.forEach(async (channel) => { - if (permission.overwriteChannel(channel.permission_overwrites ?? []).has("VIEW_CHANNEL")) { + await Promise.all( + dm_channels.map(async (channel) => { this.events[channel.id] = await listenEvent(channel.id, consumer, opts); - } - }); - }); + }), + ); + + await Promise.all( + guilds.map(async (guild) => { + const permission = await getPermission(this.user_id, guild.id); + this.permissions[guild.id] = permission; + this.events[guild.id] = await listenEvent(guild.id, consumer, opts); + + await Promise.all( + guild.channels.map(async (channel) => { + if (permission.overwriteChannel(channel.permission_overwrites ?? []).has("VIEW_CHANNEL")) { + this.events[channel.id] = await listenEvent(channel.id, consumer, opts); + } + }), + ); + }), + ); + }; + + // Initial setup + await setupEventListeners(); + + // Handle RabbitMQ reconnection - re-establish all subscriptions + const handleReconnect = async () => { + console.log(`[RabbitMQ] [user-${this.user_id}] Connection restored, re-establishing subscriptions`); + try { + // Clear old event handlers (they're now invalid) + this.events = {}; + this.member_events = {}; + opts.channel = undefined; + + // re-establish all subscriptions + await setupEventListeners(); + console.log(`[RabbitMQ] [user-${this.user_id}] Successfully re-established subscriptions`); + } catch (e) { + console.error(`[RabbitMQ] [user-${this.user_id}] Failed to re-establish subscriptions:`, e); + // close the WebSocket - will force client to reconnect and redo subscription setup + this.close(4000, "Failed to re-establish event subscriptions"); + } + }; + + const handleDisconnect = () => { + console.log(`[RabbitMQ] [user-${this.user_id}] Connection lost, waiting for reconnection`); + // mark channel invalid + if (opts.channel) { + opts.channel.off("error", handleChannelError); + } + opts.channel = undefined; + }; + + // Subscribe to RabbitMQ connection events + RabbitMQ.on("reconnected", handleReconnect); + RabbitMQ.on("disconnected", handleDisconnect); this.once("close", async () => { - // console.log("[Events] setupListener: close for", this.user_id, "=", typeof opts.channel, "with channel id", opts.channel?.ch); + // Unsubscribe from RabbitMQ events + RabbitMQ.off("reconnected", handleReconnect); + RabbitMQ.off("disconnected", handleDisconnect); // wait for event consumer cancellation await Promise.all( @@ -138,7 +186,11 @@ export async function setupListener(this: WebSocket) { await Promise.all(Object.values(this.member_events).map((x) => x())); if (opts.channel) { - await opts.channel.close(); + try { + await opts.channel.close(); + } catch { + // Channel might already be closed + } opts.channel.off("error", handleChannelError); } }); diff --git a/src/util/util/Event.ts b/src/util/util/Event.ts index 4ec1cb5f9..27b1986e7 100644 --- a/src/util/util/Event.ts +++ b/src/util/util/Event.ts @@ -36,19 +36,33 @@ export async function emitEvent(payload: Omit) { if (RabbitMQ.connection) { const data = typeof payload.data === "object" ? JSON.stringify(payload.data) : payload.data; // use rabbitmq for event transmission - const channel = await RabbitMQ.getSafeChannel(); - try { - await channel.assertExchange(id, "fanout", { - durable: false, - }); - // assertQueue isn't needed, because a queue will automatically created if it doesn't exist - const successful = channel.publish(id, "", Buffer.from(`${data}`), { type: payload.event }); - if (!successful) throw new Error("failed to send event"); - } catch (e) { - // todo: should we retry publishng the event? - console.log("[RabbitMQ] ", e); - } + const publishEvent = async (retryCount = 0): Promise => { + const channel = await RabbitMQ.getSafeChannel(); + try { + await channel.assertExchange(id, "fanout", { + durable: false, + }); + + // assertQueue isn't needed, because a queue will automatically created if it doesn't exist + const successful = channel.publish(id, "", Buffer.from(`${data}`), { type: payload.event }); + if (!successful) throw new Error("failed to send event"); + } catch (e) { + // Check if this is a channel closed error and if we should retry + const errorMessage = e instanceof Error ? e.message : String(e); + const isChannelError = errorMessage.includes("Channel closed") || errorMessage.includes("IllegalOperationError") || errorMessage.includes("RESOURCE_ERROR"); + + if (isChannelError && retryCount < 1) { + console.log("[RabbitMQ] Channel error detected, retrying with new channel..."); + // Force the cached channel to be discarded by calling getSafeChannel which will create a new one + return publishEvent(retryCount + 1); + } + + console.log("[RabbitMQ] ", e); + } + }; + + await publishEvent(); } else if (process.env.EVENT_TRANSMISSION === "unix" && process.env.EVENT_SOCKET_PATH) { if (!unixSocketWriter) { unixSocketWriter = new UnixSocketWriter(process.env.EVENT_SOCKET_PATH); @@ -64,19 +78,26 @@ export async function emitEvent(payload: Omit) { export async function initEvent() { await RabbitMQ.init(); // does nothing if rabbitmq is not setup - if (RabbitMQ.connection) { - // empty on purpose? - } else { - // use event emitter - // use process messages - } - await listenEvent("spacebar", async (event) => { - console.log("[Event] Received spacebar event:", event); - if ((event.event as string) === "SB_RELOAD_CONFIG") { - console.log("[Event] Reloading config due to RELOAD_CONFIG event"); - await Config.init(true); - } + // Set up the spacebar event listener (used for config reload, etc.) + const setupSpacebarListener = async () => { + console.log("[Event] Setting up spacebar event listener"); + await listenEvent("spacebar", async (event) => { + console.log("[Event] Received spacebar event:", event); + if ((event.event as string) === "SB_RELOAD_CONFIG") { + console.log("[Event] Reloading config due to RELOAD_CONFIG event"); + await Config.init(true); + } + }); + }; + + // Initial setup + await setupSpacebarListener(); + + // Re-establish listener on reconnection + RabbitMQ.on("reconnected", async () => { + console.log("[Event] RabbitMQ reconnected, re-establishing spacebar listener"); + await setupSpacebarListener(); }); } @@ -142,16 +163,29 @@ export async function listenEvent(event: string, callback: (event: EventOpts) => async function rabbitListen(channel: Channel, id: string, callback: (event: EventOpts) => unknown, opts?: { acknowledge?: boolean }): Promise<() => Promise> { await channel.assertExchange(id, "fanout", { durable: false }); + // messageTtl ensures any orphaned messages are cleaned up quickly if the consumer disconnects. const q = await channel.assertQueue("", { exclusive: true, autoDelete: true, + messageTtl: 5000, // Messages expire after 5 seconds if not consumed }); const consumerTag = randomUUID(); const cancel = async () => { - await channel.cancel(consumerTag); - await channel.unbindQueue(q.queue, id, ""); + try { + // Order matters here to prevent RESOURCE_ERROR, due to potential race condition: + // 1. Unbind first - stops new messages from being routed to this queue + await channel.unbindQueue(q.queue, id, ""); + // 2. Cancel consumer - with autoDelete: true, this triggers queue deletion + // after RabbitMQ ensures no messages are in-flight to this queue + await channel.cancel(consumerTag); + // Don't explicitly delete the queue - let autoDelete handle it safely. + // Explicitly deleting can race with in-flight message delivery. + } catch (e) { + // Channel might already be closed or queue already deleted - that's fine + console.log("[RabbitMQ] Error during consumer cancel (may be expected):", e instanceof Error ? e.message : e); + } }; await channel.bindQueue(q.queue, id, ""); diff --git a/src/util/util/RabbitMQ.ts b/src/util/util/RabbitMQ.ts index 777b77ee3..288a7fa9a 100644 --- a/src/util/util/RabbitMQ.ts +++ b/src/util/util/RabbitMQ.ts @@ -18,75 +18,156 @@ import amqp, { Channel, ChannelModel } from "amqplib"; import { Config } from "./Config"; +import EventEmitter from "events"; export class RabbitMQ { public static connection: ChannelModel | null = null; public static channel: Channel | null = null; + // Event emitter for connection state changes + private static events = new EventEmitter(); + + // Reconnection state + private static isReconnecting = false; + private static reconnectAttempts = 0; + private static readonly BASE_RECONNECT_DELAY_MS = 500; // reconnect after 500 milliseconds delay + + // Track if event listeners have been set up (to avoid duplicates) + private static connectionListenersAttached = false; + + /** + * Subscribe to connection events. + * - 'reconnected': Fired after successful reconnection. Consumers should re-establish subscriptions. + * - 'disconnected': Fired when connection is lost. + */ + static on(event: "reconnected" | "disconnected", listener: () => void) { + this.events.on(event, listener); + } + + static off(event: "reconnected" | "disconnected", listener: () => void) { + this.events.off(event, listener); + } + static async init() { const host = Config.get().rabbitmq.host; if (!host) return; - console.log(`[RabbitMQ] connect: ${host}`); - this.connection = await amqp.connect(host, { - timeout: 1000 * 60, - }); - console.log(`[RabbitMQ] connected`); - // log connection errors + await this.connect(host); + } + + private static async connect(host: string): Promise { + try { + console.log(`[RabbitMQ] Connecting to: ${host}`); + this.connection = await amqp.connect(host, { + timeout: 1000 * 60, + }); + console.log(`[RabbitMQ] Connected successfully`); + + // Reset reconnection state on successful connect + this.reconnectAttempts = 0; + this.isReconnecting = false; + + // Only attach listeners once per connection object + if (!this.connectionListenersAttached) { + this.attachConnectionListeners(host); + this.connectionListenersAttached = true; + } + + // Pre-create the shared channel + await this.getSafeChannel(); + + // Notify subscribers that connection is (re-)established + this.events.emit("reconnected"); + } catch (error) { + console.error("[RabbitMQ] Connection failed:", error); + await this.scheduleReconnect(host); + } + } + + private static attachConnectionListeners(host: string) { + if (!this.connection) return; + this.connection.on("error", (err) => { - console.error("[RabbitMQ] Connection Error:", err); + console.error("[RabbitMQ] Connection error:", err); + // Don't reconnect here - wait for 'close' event }); this.connection.on("close", () => { - console.error("[RabbitMQ] connection closed"); - // TODO: Add reconnection logic here if the connection crashes?? - // will be a pain since we will have to reconstruct entire state - }); + console.error("[RabbitMQ] Connection closed"); + this.channel = null; + this.connection = null; + this.connectionListenersAttached = false; - await this.getSafeChannel(); // why is this here? + // Notify subscribers that connection is lost + this.events.emit("disconnected"); + + // Schedule reconnection + this.scheduleReconnect(host); + }); + } + + private static async scheduleReconnect(host: string): Promise { + if (this.isReconnecting) { + console.log("[RabbitMQ] Reconnection already in progress, skipping"); + return; + } + + this.isReconnecting = true; + this.reconnectAttempts++; + + console.log(`[RabbitMQ] Scheduling reconnection attempt ${this.reconnectAttempts} in ${this.BASE_RECONNECT_DELAY_MS}ms`); + + await new Promise((resolve) => setTimeout(resolve, this.BASE_RECONNECT_DELAY_MS)); + + try { + await this.connect(host); + } catch { + // connect() will schedule another reconnect on failure + console.log("[RabbitMQ] Reconnection attempt failed, will retry"); + } } static async getSafeChannel(): Promise { - if (!this.connection) return Promise.reject(); - if (this.channel) return this.channel; + if (!this.connection) { + return Promise.reject(new Error("[RabbitMQ] No connection available")); + } + + // Check if cached channel is still usable + if (this.channel) { + // amqplib channels have a 'closed' property when closed + const isClosed = (this.channel as unknown as { closed?: boolean }).closed; + if (!isClosed) { + return this.channel; + } + console.log("[RabbitMQ] Cached channel is closed, creating new one"); + this.channel = null; + } try { this.channel = await this.connection.createChannel(); - console.log(`[RabbitMQ] channel created`); + console.log("[RabbitMQ] Channel created"); - // log channel errors this.channel.on("error", (err) => { - console.error("[RabbitMQ] Channel Error:", err); + console.error("[RabbitMQ] Channel error:", err); }); this.channel.on("close", () => { - console.log("[RabbitMQ] channel closed"); + console.log("[RabbitMQ] Channel closed"); this.channel = null; }); - this.connection.on("error", (err) => { - console.error("[RabbitMQ] connection error, setting channel to null and reconnecting:", err); - this.channel = null; - this.connection = null; - this.init(); - }); - - this.connection.on("close", () => { - console.log("[RabbitMQ] connection closed, setting channel to null and reconnecting"); - this.channel = null; - this.connection = null; - this.init(); - }); - return this.channel; } catch (e) { console.error("[RabbitMQ] Failed to create channel:", e); - console.error("[RabbitMQ] Forcing reconnect!"); - this.connection = null; this.channel = null; - await this.init(); - return await this.getSafeChannel(); - // return Promise.reject(e); + throw e; } } + + /** + * Check if RabbitMQ is currently connected and ready. + */ + static isConnected(): boolean { + return this.connection !== null && !this.isReconnecting; + } }