From 3c0d8b507b235d2cca6122b284e6263d2521803e Mon Sep 17 00:00:00 2001 From: Rory& Date: Sat, 16 May 2026 20:36:15 +0200 Subject: [PATCH] Add untested rabbitmq single-channel IPC --- src/util/util/index.ts | 2 +- src/util/util/ipc/Event.ts | 17 ++- src/util/util/{ => ipc}/RabbitMQ.ts | 4 +- .../ipc/listener/RabbitMqSingleListener.ts | 106 ++++++++++++++++++ .../util/ipc/listener/UnixSocketListener.ts | 20 ++-- .../util/ipc/writer/RabbitMqSingleWriter.ts | 70 ++++++++++++ src/util/util/ipc/writer/UnixSocketWriter.ts | 46 ++++---- 7 files changed, 228 insertions(+), 37 deletions(-) rename src/util/util/{ => ipc}/RabbitMQ.ts (99%) create mode 100644 src/util/util/ipc/listener/RabbitMqSingleListener.ts create mode 100644 src/util/util/ipc/writer/RabbitMqSingleWriter.ts diff --git a/src/util/util/index.ts b/src/util/util/index.ts index 9daef50b1..f6de6a955 100644 --- a/src/util/util/index.ts +++ b/src/util/util/index.ts @@ -37,7 +37,7 @@ export * from "./Logo"; export * from "./MessageFlags"; export * from "./networking"; export * from "./Permissions"; -export * from "./RabbitMQ"; +export * from "./ipc/RabbitMQ"; export * from "./Regex"; export * from "./Rights"; export * from "./Snowflake"; diff --git a/src/util/util/ipc/Event.ts b/src/util/util/ipc/Event.ts index a332d6e0c..4899b5d83 100644 --- a/src/util/util/ipc/Event.ts +++ b/src/util/util/ipc/Event.ts @@ -20,13 +20,15 @@ import { Channel } from "amqplib"; import { randomUUID } from "node:crypto"; import EventEmitter from "node:events"; import path from "node:path"; -import { RabbitMQ } from "../RabbitMQ"; +import { RabbitMQ } from "./RabbitMQ"; import { EVENT, Event } from "../../interfaces"; import { Config } from "../Config"; import { BaseEventListener } from "./listener/BaseEventListener"; import { BaseEventWriter } from "./writer/BaseEventWriter"; import { UnixSocketWriter } from "./writer/UnixSocketWriter"; import { UnixSocketListener } from "./listener/UnixSocketListener"; +import { RabbitMqSingleListener } from "./listener/RabbitMqSingleListener"; +import { yellow } from "picocolors"; export const events = new EventEmitter(); let listener: BaseEventListener | null = null; @@ -131,12 +133,25 @@ export interface ProcessEvent { export async function listenEvent(event: string, callback: (event: EventOpts) => unknown, opts?: ListenEventOpts): Promise<() => Promise> { if (RabbitMQ.connection) { + if (process.env.EVENT_TRANSMISSION !== "rabbitmq-legacy") { + console.warn(yellow("[Events] Warning:"), "RabbitMQ replication without configuring EVENT_TRANSMISSION is deprecated."); + console.warn(yellow("[Events] Warning:"), "Set EVENT_TRANSMISSION to 'rabbitmq-legacy' in environment variables to silence this warning."); + } const rabbitMQChannel = await RabbitMQ.getSafeChannel(); const channel = opts?.channel || rabbitMQChannel; if (!channel) throw new Error("[Events] An event was sent without an associated channel"); return await rabbitListen(channel, event, callback, { acknowledge: opts?.acknowledge, }); + } else if (process.env.EVENT_TRANSMISSION === "rabbitmq-single") { + if (!Config.get().rabbitmq.host) { + console.error("[Events] RabbitMQ is not configured."); + } + if (!listener) { + listener = new RabbitMqSingleListener(Config.get().rabbitmq.host!); + await listener.init(); + } + return await listener.listen(event, callback); } else if (process.env.EVENT_TRANSMISSION === "unix" && process.env.EVENT_SOCKET_PATH) { if (!unixSocketListener) { listener = unixSocketListener = new UnixSocketListener(path.join(process.env.EVENT_SOCKET_PATH, `${process.pid}.sock`)); diff --git a/src/util/util/RabbitMQ.ts b/src/util/util/ipc/RabbitMQ.ts similarity index 99% rename from src/util/util/RabbitMQ.ts rename to src/util/util/ipc/RabbitMQ.ts index 328a2df72..fe6e6d1ba 100644 --- a/src/util/util/RabbitMQ.ts +++ b/src/util/util/ipc/RabbitMQ.ts @@ -16,9 +16,9 @@ along with this program. If not, see . */ -import amqp, { Channel, ChannelModel } from "amqplib"; -import { Config } from "./Config"; import EventEmitter from "node:events"; +import amqp, { Channel, ChannelModel } from "amqplib"; +import { Config } from "../Config"; export class RabbitMQ { public static connection: ChannelModel | null = null; diff --git a/src/util/util/ipc/listener/RabbitMqSingleListener.ts b/src/util/util/ipc/listener/RabbitMqSingleListener.ts new file mode 100644 index 000000000..bff8f9402 --- /dev/null +++ b/src/util/util/ipc/listener/RabbitMqSingleListener.ts @@ -0,0 +1,106 @@ +/* + Spacebar: A FOSS re-implementation and extension of the Discord.com backend. + Copyright (C) 2026 Spacebar and Spacebar Contributors + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + +import EventEmitter from "node:events"; +import { BaseEventListener } from "./BaseEventListener"; +import { EVENT, Event, EventOpts, RabbitMQ } from "@spacebar/util"; +import amqp, { Channel, ChannelModel } from "amqplib"; +import { randomUUID } from "node:crypto"; + +export class RabbitMqSingleListener extends BaseEventListener { + private readonly host: string; + private connection: ChannelModel; + private channel: Channel; + eventEmitter: EventEmitter; + + constructor(host: string) { + super(); + this.eventEmitter = new EventEmitter(); + this.host = host; + } + + async init() { + console.log(`[RabbitMQSingleListener] Connecting to: ${this.host}`); + this.connection = await amqp.connect(this.host, { + timeout: 1000 * 60, + noDelay: true, + }); + this.channel = await this.connection.createChannel(); + + for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) { + process.on(sig, this.close); + } + + this.connection.on("error", (err) => { + console.error("[RabbitMQSingleListener] Connection error:", err); + }); + + this.connection.on("close", () => { + console.error("[RabbitMQSingleListener] Connection closed"); + this.init().catch((e) => console.error("[RabbitMQSingleListener] Failed to schedule reconnection:", e)); + }); + + // actually set up event receiving? + await this.channel.assertExchange("", "fanout", { durable: false }); + const q = await this.channel.assertQueue("", { + exclusive: true, + autoDelete: true, + messageTtl: 5000, + }); + + const consumerTag = randomUUID(); + await this.channel.bindQueue(q.queue, "", ""); + await this.channel.consume( + q.queue, + (opts) => { + if (!opts) return; + const data = JSON.parse(opts.content.toString()) as { id: EVENT; event: Event }; + + this.eventEmitter.emit(data.id, data.event); + }, + { + consumerTag, + }, + ); + } + + async close(): Promise { + await this.channel.close(); + await this.connection.close(); + } + + async listen(event: string, callback: (event: EventOpts) => unknown): Promise<() => Promise> { + const listener = (data: Event) => { + callback({ + ...data, + cancel, + }); + }; + + this.eventEmitter.addListener(event, listener); + + const cancel = async () => { + this.eventEmitter.removeListener(event, listener); + this.eventEmitter.setMaxListeners(this.eventEmitter.getMaxListeners() - 1); + }; + + this.eventEmitter.setMaxListeners(this.eventEmitter.getMaxListeners() + 1); + + return cancel; + } +} diff --git a/src/util/util/ipc/listener/UnixSocketListener.ts b/src/util/util/ipc/listener/UnixSocketListener.ts index 14d07d387..8ead85bda 100644 --- a/src/util/util/ipc/listener/UnixSocketListener.ts +++ b/src/util/util/ipc/listener/UnixSocketListener.ts @@ -20,7 +20,7 @@ import EventEmitter from "node:events"; import fs from "node:fs"; import net, { Server } from "node:net"; import { BaseEventListener } from "./BaseEventListener"; -import { Event, EventOpts } from "@spacebar/util"; +import { EVENT, Event, EventOpts } from "@spacebar/util"; export class UnixSocketListener extends BaseEventListener { eventEmitter: EventEmitter; @@ -39,15 +39,15 @@ export class UnixSocketListener extends BaseEventListener { try { if (fs.existsSync(this.socketPath)) { fs.unlinkSync(this.socketPath); - console.log("[Events] Removed stale socket file:", this.socketPath); + console.log("[UnixSocketListener] Removed stale socket file:", this.socketPath); } } catch (e) { - console.error("[Events] Failed to remove stale socket:", e); + console.error("[UnixSocketListener] Failed to remove stale socket:", e); } this.server = net.createServer((socket) => { socket.on("connect", () => { - console.log("[Events] Unix socket client connected"); + console.log("[UnixSocketListener] Unix socket client connected"); }); let buffer = Buffer.alloc(0); socket.on("data", (data: Buffer) => { @@ -58,18 +58,18 @@ export class UnixSocketListener extends BaseEventListener { const msgBuf = buffer.subarray(4, 4 + msgLen); buffer = buffer.subarray(4 + msgLen); try { - const payload = JSON.parse(msgBuf.toString()); + const payload = JSON.parse(msgBuf.toString()) as { id: EVENT; event: Event }; this.eventEmitter.emit(payload.id, payload.event); } catch (e) { - console.error("[Events] Failed to parse unix socket data:", e); + console.error("[UnixSocketListener] Failed to parse unix socket data:", e); } } }); socket.on("error", (err) => { - console.error("[Events] Unix socket error:", err); + console.error("[UnixSocketListener] Unix socket error:", err); }); socket.on("close", () => { - console.log("[Events] Unix socket client disconnected"); + console.log("[UnixSocketListener] Unix socket client disconnected"); }); }); @@ -83,14 +83,14 @@ export class UnixSocketListener extends BaseEventListener { } async close(): Promise { - console.log("[Events] Closing unix socket server"); + console.log("[UnixSocketListener] Closing unix socket server"); this.server.close(); // clean up socket file try { fs.unlinkSync(this.socketPath); } catch (e) { - console.error("[Events] Failed to unlink socket file:", e); + console.error("[UnixSocketListener] Failed to unlink socket file:", e); } process.exit(0); diff --git a/src/util/util/ipc/writer/RabbitMqSingleWriter.ts b/src/util/util/ipc/writer/RabbitMqSingleWriter.ts new file mode 100644 index 000000000..5bad33076 --- /dev/null +++ b/src/util/util/ipc/writer/RabbitMqSingleWriter.ts @@ -0,0 +1,70 @@ +/* + Spacebar: A FOSS re-implementation and extension of the Discord.com backend. + Copyright (C) 2026 Spacebar and Spacebar Contributors + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + +import { BaseEventWriter } from "./BaseEventWriter"; +import amqp, { Channel, ChannelModel } from "amqplib"; +import { Event } from "@spacebar/util"; + +export class RabbitMqSingleWriter extends BaseEventWriter { + private readonly host: string; + private connection: ChannelModel; + private channel: Channel; + + constructor(host: string) { + super(); + this.host = host; + } + + async init(): Promise { + console.log(`[RabbitMQ] Connecting to: ${this.host}`); + this.connection = await amqp.connect(this.host, { + timeout: 1000 * 60, + noDelay: true, + }); + this.channel = await this.connection.createChannel(); + } + async close(): Promise { + await this.channel.close(); + await this.connection.close(); + } + + async emit(event: Event): Promise { + // todo check if channel is closed + if ((this.channel as unknown as { closed?: boolean }).closed) this.channel = await this.connection.createChannel(); + await this.channel.assertExchange("", "fanout", { + durable: false, // ensure that messages arent written to disk + }); + + let success = false; + try { + success = this.channel.publish( + "", + "", + Buffer.from(JSON.stringify({ id: (event.guild_id || event.channel_id || event.user_id || event.session_id) as string, event })), + {}, + ); + } catch (e) { + console.error("[RabbitMqSingleWriter] Got error while publishing event:", e); + } + + if (!success) { + console.log("[RabbitMqSingleWriter] Publishing message was not successful, retrying..."); + await this.emit(event); + } + } +} diff --git a/src/util/util/ipc/writer/UnixSocketWriter.ts b/src/util/util/ipc/writer/UnixSocketWriter.ts index 84839593e..53b9564ce 100644 --- a/src/util/util/ipc/writer/UnixSocketWriter.ts +++ b/src/util/util/ipc/writer/UnixSocketWriter.ts @@ -40,22 +40,22 @@ export class UnixSocketWriter extends BaseEventWriter { async init() { if (!fs.opendirSync(this.socketPath)) throw new Error("Unix socket path does not exist or is not a directory: " + this.socketPath); - console.log("[Events] Unix socket writer initializing for", this.socketPath); + console.log("[UnixSocketWriter] Unix socket writer initializing for", this.socketPath); const connect = (file: string) => { const fullPath = path.join(this.socketPath, file); const pid = Number(path.basename(file, ".sock")); - console.log("[Events] Attempting to connect to unix socket:", fullPath, "| proc:", getPidCmdline(pid) ?? red("No such pid: " + pid)); + console.log("[UnixSocketWriter] Attempting to connect to unix socket:", fullPath, "| proc:", getPidCmdline(pid) ?? red("No such pid: " + pid)); // avoid duplicate connections if (this.clients[fullPath] && !this.clients[fullPath].destroyed) { - console.log("[Events] Unix socket client already connected to", fullPath); + console.log("[UnixSocketWriter] Unix socket client already connected to", fullPath); return; } // clean up old connection if it exists if (this.clients[fullPath]) { - console.log("[Events] Removing stale unix socket client for", fullPath); + console.log("[UnixSocketWriter] Removing stale unix socket client for", fullPath); try { this.clients[fullPath].destroy(); } catch (e) { @@ -68,21 +68,21 @@ export class UnixSocketWriter extends BaseEventWriter { try { const stats = fs.statSync(fullPath); if (!stats.isSocket()) { - console.log("[Events] Ignoring non-socket file:", fullPath); + console.log("[UnixSocketWriter] Ignoring non-socket file:", fullPath); return; } } catch (e) { - console.log("[Events] Cannot stat socket file:", fullPath); + console.log("[UnixSocketWriter] Cannot stat socket file:", fullPath); return; } try { this.clients[fullPath] = net.createConnection(fullPath, () => { - console.log("[Events] Unix socket client connected to", fullPath); + console.log("[UnixSocketWriter] Unix socket client connected to", fullPath); }); this.clients[fullPath].on("error", (err) => { - console.error("[Events] Unix socket client error on", fullPath, ":", err); + console.error("[UnixSocketWriter] Unix socket client error on", fullPath, ":", err); // clean up after error if (this.clients[fullPath]) { delete this.clients[fullPath]; @@ -91,18 +91,18 @@ export class UnixSocketWriter extends BaseEventWriter { // handle clean socket closure this.clients[fullPath].on("close", () => { - console.log("[Events] Unix socket client closed:", fullPath); + console.log("[UnixSocketWriter] Unix socket client closed:", fullPath); delete this.clients[fullPath]; }); } catch (e) { - console.error("[Events] Failed to create connection to", fullPath, ":", e); + console.error("[UnixSocketWriter] Failed to create connection to", fullPath, ":", e); delete this.clients[fullPath]; } }; // connect to all sockets, now and in the future this.watcher = fs.watch(this.socketPath, {}, (eventType, filename) => { - console.log("[Events] Unix socket writer received watch sig", eventType, filename); + console.log("[UnixSocketWriter] Unix socket writer received watch sig", eventType, filename); if (eventType === "rename" && filename?.endsWith(".sock")) { try { const fullPath = path.join(this.socketPath, filename!); @@ -110,7 +110,7 @@ export class UnixSocketWriter extends BaseEventWriter { connect(filename!); } else { if (this.clients[fullPath]) { - console.log("[Events] Unix socket writer detected removed socket:", fullPath); + console.log("[UnixSocketWriter] Unix socket writer detected removed socket:", fullPath); try { this.clients[fullPath].destroy(); } catch (e) { @@ -126,20 +126,20 @@ export class UnixSocketWriter extends BaseEventWriter { }); this.watcher.on("error", (err) => { - console.error("[Events] Unix socket watcher error:", err); + console.error("[UnixSocketWriter] Unix socket watcher error:", err); }); // connect to existing sockets if any try { const files = fs.readdirSync(this.socketPath); - console.log("[Events] Unix socket writer found existing sockets:", files); + console.log("[UnixSocketWriter] Unix socket writer found existing sockets:", files); files.forEach((file) => { if (file.endsWith(".sock")) { connect(file); } }); } catch (err) { - console.error("[Events] Unix socket writer failed to read directory:", err); + console.error("[UnixSocketWriter] Unix socket writer failed to read directory:", err); } this.isInitializing = false; @@ -151,11 +151,11 @@ export class UnixSocketWriter extends BaseEventWriter { // check if there are any listeners const clientCount = Object.entries(this.clients).length; if (clientCount === 0) { - console.warn("[Events] Unix socket writer has no connected clients to emit to, backlog size:", this.backlog.length + 1); + console.warn("[UnixSocketWriter] Unix socket writer has no connected clients to emit to, backlog size:", this.backlog.length + 1); this.backlog.push(event); if (!this.isInitializing) { this.isInitializing = true; - console.log("[Events] Re-initializing unix socket writer due to new event with no listeners"); + console.log("[UnixSocketWriter] Re-initializing unix socket writer due to new event with no listeners"); await this.close(); await this.init(); } @@ -165,7 +165,7 @@ export class UnixSocketWriter extends BaseEventWriter { await this.replayLock; await (this.replayLock = Promise.resolve().then(async () => { if (this.backlog.length > 0) { - console.log(`[Events] Replaying ${this.backlog.length} backlog events`); + console.log(`[UnixSocketWriter] Replaying ${this.backlog.length} backlog events`); for (const backlogEvent of this.backlog) { await this.broadcast(backlogEvent); } @@ -187,7 +187,7 @@ export class UnixSocketWriter extends BaseEventWriter { for (const [socketPath, socket] of Object.entries(this.clients)) { if (socket.destroyed) { - console.log("[Events] Unix socket writer found destroyed socket, removing:", socketPath); + console.log("[UnixSocketWriter] Unix socket writer found destroyed socket, removing:", socketPath); delete this.clients[socketPath]; continue; } @@ -195,19 +195,19 @@ export class UnixSocketWriter extends BaseEventWriter { try { socket.write(framed); } catch (e) { - console.error("[Events] Unix socket writer failed to write to socket", socketPath, ":", e); + console.error("[UnixSocketWriter] Unix socket writer failed to write to socket", socketPath, ":", e); } } if (tsw.elapsed().totalMilliseconds > 5) // else it's too noisy - console.log(`[Events] Unix socket writer emitted to ${Object.entries(this.clients).length} sockets in ${tsw.elapsed().totalMilliseconds}ms`); + console.log(`[UnixSocketWriter] Unix socket writer emitted to ${Object.entries(this.clients).length} sockets in ${tsw.elapsed().totalMilliseconds}ms`); res(); })); } async close() { - console.log("[Events] Closing Unix socket writer"); + console.log("[UnixSocketWriter] Closing Unix socket writer"); if (this.watcher) { this.watcher.close(); @@ -218,7 +218,7 @@ export class UnixSocketWriter extends BaseEventWriter { try { socket.destroy(); } catch (e) { - console.error("[Events] Error closing socket", path, ":", e); + console.error("[UnixSocketWriter] Error closing socket", path, ":", e); } } this.clients = {};