Add untested rabbitmq single-channel IPC

This commit is contained in:
Rory&
2026-05-16 20:36:15 +02:00
parent 6459a3332d
commit 3c0d8b507b
7 changed files with 228 additions and 37 deletions
+1 -1
View File
@@ -37,7 +37,7 @@ export * from "./Logo";
export * from "./MessageFlags";
export * from "./networking";
export * from "./Permissions";
export * from "./RabbitMQ";
export * from "./ipc/RabbitMQ";
export * from "./Regex";
export * from "./Rights";
export * from "./Snowflake";
+16 -1
View File
@@ -20,13 +20,15 @@ import { Channel } from "amqplib";
import { randomUUID } from "node:crypto";
import EventEmitter from "node:events";
import path from "node:path";
import { RabbitMQ } from "../RabbitMQ";
import { RabbitMQ } from "./RabbitMQ";
import { EVENT, Event } from "../../interfaces";
import { Config } from "../Config";
import { BaseEventListener } from "./listener/BaseEventListener";
import { BaseEventWriter } from "./writer/BaseEventWriter";
import { UnixSocketWriter } from "./writer/UnixSocketWriter";
import { UnixSocketListener } from "./listener/UnixSocketListener";
import { RabbitMqSingleListener } from "./listener/RabbitMqSingleListener";
import { yellow } from "picocolors";
export const events = new EventEmitter();
let listener: BaseEventListener | null = null;
@@ -131,12 +133,25 @@ export interface ProcessEvent {
export async function listenEvent(event: string, callback: (event: EventOpts) => unknown, opts?: ListenEventOpts): Promise<() => Promise<void>> {
if (RabbitMQ.connection) {
if (process.env.EVENT_TRANSMISSION !== "rabbitmq-legacy") {
console.warn(yellow("[Events] Warning:"), "RabbitMQ replication without configuring EVENT_TRANSMISSION is deprecated.");
console.warn(yellow("[Events] Warning:"), "Set EVENT_TRANSMISSION to 'rabbitmq-legacy' in environment variables to silence this warning.");
}
const rabbitMQChannel = await RabbitMQ.getSafeChannel();
const channel = opts?.channel || rabbitMQChannel;
if (!channel) throw new Error("[Events] An event was sent without an associated channel");
return await rabbitListen(channel, event, callback, {
acknowledge: opts?.acknowledge,
});
} else if (process.env.EVENT_TRANSMISSION === "rabbitmq-single") {
if (!Config.get().rabbitmq.host) {
console.error("[Events] RabbitMQ is not configured.");
}
if (!listener) {
listener = new RabbitMqSingleListener(Config.get().rabbitmq.host!);
await listener.init();
}
return await listener.listen(event, callback);
} else if (process.env.EVENT_TRANSMISSION === "unix" && process.env.EVENT_SOCKET_PATH) {
if (!unixSocketListener) {
listener = unixSocketListener = new UnixSocketListener(path.join(process.env.EVENT_SOCKET_PATH, `${process.pid}.sock`));
@@ -16,9 +16,9 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import amqp, { Channel, ChannelModel } from "amqplib";
import { Config } from "./Config";
import EventEmitter from "node:events";
import amqp, { Channel, ChannelModel } from "amqplib";
import { Config } from "../Config";
export class RabbitMQ {
public static connection: ChannelModel | null = null;
@@ -0,0 +1,106 @@
/*
Spacebar: A FOSS re-implementation and extension of the Discord.com backend.
Copyright (C) 2026 Spacebar and Spacebar Contributors
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import EventEmitter from "node:events";
import { BaseEventListener } from "./BaseEventListener";
import { EVENT, Event, EventOpts, RabbitMQ } from "@spacebar/util";
import amqp, { Channel, ChannelModel } from "amqplib";
import { randomUUID } from "node:crypto";
export class RabbitMqSingleListener extends BaseEventListener {
private readonly host: string;
private connection: ChannelModel;
private channel: Channel;
eventEmitter: EventEmitter;
constructor(host: string) {
super();
this.eventEmitter = new EventEmitter();
this.host = host;
}
async init() {
console.log(`[RabbitMQSingleListener] Connecting to: ${this.host}`);
this.connection = await amqp.connect(this.host, {
timeout: 1000 * 60,
noDelay: true,
});
this.channel = await this.connection.createChannel();
for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) {
process.on(sig, this.close);
}
this.connection.on("error", (err) => {
console.error("[RabbitMQSingleListener] Connection error:", err);
});
this.connection.on("close", () => {
console.error("[RabbitMQSingleListener] Connection closed");
this.init().catch((e) => console.error("[RabbitMQSingleListener] Failed to schedule reconnection:", e));
});
// actually set up event receiving?
await this.channel.assertExchange("", "fanout", { durable: false });
const q = await this.channel.assertQueue("", {
exclusive: true,
autoDelete: true,
messageTtl: 5000,
});
const consumerTag = randomUUID();
await this.channel.bindQueue(q.queue, "", "");
await this.channel.consume(
q.queue,
(opts) => {
if (!opts) return;
const data = JSON.parse(opts.content.toString()) as { id: EVENT; event: Event };
this.eventEmitter.emit(data.id, data.event);
},
{
consumerTag,
},
);
}
async close(): Promise<void> {
await this.channel.close();
await this.connection.close();
}
async listen(event: string, callback: (event: EventOpts) => unknown): Promise<() => Promise<void>> {
const listener = (data: Event) => {
callback({
...data,
cancel,
});
};
this.eventEmitter.addListener(event, listener);
const cancel = async () => {
this.eventEmitter.removeListener(event, listener);
this.eventEmitter.setMaxListeners(this.eventEmitter.getMaxListeners() - 1);
};
this.eventEmitter.setMaxListeners(this.eventEmitter.getMaxListeners() + 1);
return cancel;
}
}
@@ -20,7 +20,7 @@ import EventEmitter from "node:events";
import fs from "node:fs";
import net, { Server } from "node:net";
import { BaseEventListener } from "./BaseEventListener";
import { Event, EventOpts } from "@spacebar/util";
import { EVENT, Event, EventOpts } from "@spacebar/util";
export class UnixSocketListener extends BaseEventListener {
eventEmitter: EventEmitter;
@@ -39,15 +39,15 @@ export class UnixSocketListener extends BaseEventListener {
try {
if (fs.existsSync(this.socketPath)) {
fs.unlinkSync(this.socketPath);
console.log("[Events] Removed stale socket file:", this.socketPath);
console.log("[UnixSocketListener] Removed stale socket file:", this.socketPath);
}
} catch (e) {
console.error("[Events] Failed to remove stale socket:", e);
console.error("[UnixSocketListener] Failed to remove stale socket:", e);
}
this.server = net.createServer((socket) => {
socket.on("connect", () => {
console.log("[Events] Unix socket client connected");
console.log("[UnixSocketListener] Unix socket client connected");
});
let buffer = Buffer.alloc(0);
socket.on("data", (data: Buffer) => {
@@ -58,18 +58,18 @@ export class UnixSocketListener extends BaseEventListener {
const msgBuf = buffer.subarray(4, 4 + msgLen);
buffer = buffer.subarray(4 + msgLen);
try {
const payload = JSON.parse(msgBuf.toString());
const payload = JSON.parse(msgBuf.toString()) as { id: EVENT; event: Event };
this.eventEmitter.emit(payload.id, payload.event);
} catch (e) {
console.error("[Events] Failed to parse unix socket data:", e);
console.error("[UnixSocketListener] Failed to parse unix socket data:", e);
}
}
});
socket.on("error", (err) => {
console.error("[Events] Unix socket error:", err);
console.error("[UnixSocketListener] Unix socket error:", err);
});
socket.on("close", () => {
console.log("[Events] Unix socket client disconnected");
console.log("[UnixSocketListener] Unix socket client disconnected");
});
});
@@ -83,14 +83,14 @@ export class UnixSocketListener extends BaseEventListener {
}
async close(): Promise<void> {
console.log("[Events] Closing unix socket server");
console.log("[UnixSocketListener] Closing unix socket server");
this.server.close();
// clean up socket file
try {
fs.unlinkSync(this.socketPath);
} catch (e) {
console.error("[Events] Failed to unlink socket file:", e);
console.error("[UnixSocketListener] Failed to unlink socket file:", e);
}
process.exit(0);
@@ -0,0 +1,70 @@
/*
Spacebar: A FOSS re-implementation and extension of the Discord.com backend.
Copyright (C) 2026 Spacebar and Spacebar Contributors
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import { BaseEventWriter } from "./BaseEventWriter";
import amqp, { Channel, ChannelModel } from "amqplib";
import { Event } from "@spacebar/util";
export class RabbitMqSingleWriter extends BaseEventWriter {
private readonly host: string;
private connection: ChannelModel;
private channel: Channel;
constructor(host: string) {
super();
this.host = host;
}
async init(): Promise<void> {
console.log(`[RabbitMQ] Connecting to: ${this.host}`);
this.connection = await amqp.connect(this.host, {
timeout: 1000 * 60,
noDelay: true,
});
this.channel = await this.connection.createChannel();
}
async close(): Promise<void> {
await this.channel.close();
await this.connection.close();
}
async emit(event: Event): Promise<void> {
// todo check if channel is closed
if ((this.channel as unknown as { closed?: boolean }).closed) this.channel = await this.connection.createChannel();
await this.channel.assertExchange("", "fanout", {
durable: false, // ensure that messages arent written to disk
});
let success = false;
try {
success = this.channel.publish(
"",
"",
Buffer.from(JSON.stringify({ id: (event.guild_id || event.channel_id || event.user_id || event.session_id) as string, event })),
{},
);
} catch (e) {
console.error("[RabbitMqSingleWriter] Got error while publishing event:", e);
}
if (!success) {
console.log("[RabbitMqSingleWriter] Publishing message was not successful, retrying...");
await this.emit(event);
}
}
}
+23 -23
View File
@@ -40,22 +40,22 @@ export class UnixSocketWriter extends BaseEventWriter {
async init() {
if (!fs.opendirSync(this.socketPath)) throw new Error("Unix socket path does not exist or is not a directory: " + this.socketPath);
console.log("[Events] Unix socket writer initializing for", this.socketPath);
console.log("[UnixSocketWriter] Unix socket writer initializing for", this.socketPath);
const connect = (file: string) => {
const fullPath = path.join(this.socketPath, file);
const pid = Number(path.basename(file, ".sock"));
console.log("[Events] Attempting to connect to unix socket:", fullPath, "| proc:", getPidCmdline(pid) ?? red("No such pid: " + pid));
console.log("[UnixSocketWriter] Attempting to connect to unix socket:", fullPath, "| proc:", getPidCmdline(pid) ?? red("No such pid: " + pid));
// avoid duplicate connections
if (this.clients[fullPath] && !this.clients[fullPath].destroyed) {
console.log("[Events] Unix socket client already connected to", fullPath);
console.log("[UnixSocketWriter] Unix socket client already connected to", fullPath);
return;
}
// clean up old connection if it exists
if (this.clients[fullPath]) {
console.log("[Events] Removing stale unix socket client for", fullPath);
console.log("[UnixSocketWriter] Removing stale unix socket client for", fullPath);
try {
this.clients[fullPath].destroy();
} catch (e) {
@@ -68,21 +68,21 @@ export class UnixSocketWriter extends BaseEventWriter {
try {
const stats = fs.statSync(fullPath);
if (!stats.isSocket()) {
console.log("[Events] Ignoring non-socket file:", fullPath);
console.log("[UnixSocketWriter] Ignoring non-socket file:", fullPath);
return;
}
} catch (e) {
console.log("[Events] Cannot stat socket file:", fullPath);
console.log("[UnixSocketWriter] Cannot stat socket file:", fullPath);
return;
}
try {
this.clients[fullPath] = net.createConnection(fullPath, () => {
console.log("[Events] Unix socket client connected to", fullPath);
console.log("[UnixSocketWriter] Unix socket client connected to", fullPath);
});
this.clients[fullPath].on("error", (err) => {
console.error("[Events] Unix socket client error on", fullPath, ":", err);
console.error("[UnixSocketWriter] Unix socket client error on", fullPath, ":", err);
// clean up after error
if (this.clients[fullPath]) {
delete this.clients[fullPath];
@@ -91,18 +91,18 @@ export class UnixSocketWriter extends BaseEventWriter {
// handle clean socket closure
this.clients[fullPath].on("close", () => {
console.log("[Events] Unix socket client closed:", fullPath);
console.log("[UnixSocketWriter] Unix socket client closed:", fullPath);
delete this.clients[fullPath];
});
} catch (e) {
console.error("[Events] Failed to create connection to", fullPath, ":", e);
console.error("[UnixSocketWriter] Failed to create connection to", fullPath, ":", e);
delete this.clients[fullPath];
}
};
// connect to all sockets, now and in the future
this.watcher = fs.watch(this.socketPath, {}, (eventType, filename) => {
console.log("[Events] Unix socket writer received watch sig", eventType, filename);
console.log("[UnixSocketWriter] Unix socket writer received watch sig", eventType, filename);
if (eventType === "rename" && filename?.endsWith(".sock")) {
try {
const fullPath = path.join(this.socketPath, filename!);
@@ -110,7 +110,7 @@ export class UnixSocketWriter extends BaseEventWriter {
connect(filename!);
} else {
if (this.clients[fullPath]) {
console.log("[Events] Unix socket writer detected removed socket:", fullPath);
console.log("[UnixSocketWriter] Unix socket writer detected removed socket:", fullPath);
try {
this.clients[fullPath].destroy();
} catch (e) {
@@ -126,20 +126,20 @@ export class UnixSocketWriter extends BaseEventWriter {
});
this.watcher.on("error", (err) => {
console.error("[Events] Unix socket watcher error:", err);
console.error("[UnixSocketWriter] Unix socket watcher error:", err);
});
// connect to existing sockets if any
try {
const files = fs.readdirSync(this.socketPath);
console.log("[Events] Unix socket writer found existing sockets:", files);
console.log("[UnixSocketWriter] Unix socket writer found existing sockets:", files);
files.forEach((file) => {
if (file.endsWith(".sock")) {
connect(file);
}
});
} catch (err) {
console.error("[Events] Unix socket writer failed to read directory:", err);
console.error("[UnixSocketWriter] Unix socket writer failed to read directory:", err);
}
this.isInitializing = false;
@@ -151,11 +151,11 @@ export class UnixSocketWriter extends BaseEventWriter {
// check if there are any listeners
const clientCount = Object.entries(this.clients).length;
if (clientCount === 0) {
console.warn("[Events] Unix socket writer has no connected clients to emit to, backlog size:", this.backlog.length + 1);
console.warn("[UnixSocketWriter] Unix socket writer has no connected clients to emit to, backlog size:", this.backlog.length + 1);
this.backlog.push(event);
if (!this.isInitializing) {
this.isInitializing = true;
console.log("[Events] Re-initializing unix socket writer due to new event with no listeners");
console.log("[UnixSocketWriter] Re-initializing unix socket writer due to new event with no listeners");
await this.close();
await this.init();
}
@@ -165,7 +165,7 @@ export class UnixSocketWriter extends BaseEventWriter {
await this.replayLock;
await (this.replayLock = Promise.resolve().then(async () => {
if (this.backlog.length > 0) {
console.log(`[Events] Replaying ${this.backlog.length} backlog events`);
console.log(`[UnixSocketWriter] Replaying ${this.backlog.length} backlog events`);
for (const backlogEvent of this.backlog) {
await this.broadcast(backlogEvent);
}
@@ -187,7 +187,7 @@ export class UnixSocketWriter extends BaseEventWriter {
for (const [socketPath, socket] of Object.entries(this.clients)) {
if (socket.destroyed) {
console.log("[Events] Unix socket writer found destroyed socket, removing:", socketPath);
console.log("[UnixSocketWriter] Unix socket writer found destroyed socket, removing:", socketPath);
delete this.clients[socketPath];
continue;
}
@@ -195,19 +195,19 @@ export class UnixSocketWriter extends BaseEventWriter {
try {
socket.write(framed);
} catch (e) {
console.error("[Events] Unix socket writer failed to write to socket", socketPath, ":", e);
console.error("[UnixSocketWriter] Unix socket writer failed to write to socket", socketPath, ":", e);
}
}
if (tsw.elapsed().totalMilliseconds > 5)
// else it's too noisy
console.log(`[Events] Unix socket writer emitted to ${Object.entries(this.clients).length} sockets in ${tsw.elapsed().totalMilliseconds}ms`);
console.log(`[UnixSocketWriter] Unix socket writer emitted to ${Object.entries(this.clients).length} sockets in ${tsw.elapsed().totalMilliseconds}ms`);
res();
}));
}
async close() {
console.log("[Events] Closing Unix socket writer");
console.log("[UnixSocketWriter] Closing Unix socket writer");
if (this.watcher) {
this.watcher.close();
@@ -218,7 +218,7 @@ export class UnixSocketWriter extends BaseEventWriter {
try {
socket.destroy();
} catch (e) {
console.error("[Events] Error closing socket", path, ":", e);
console.error("[UnixSocketWriter] Error closing socket", path, ":", e);
}
}
this.clients = {};