Add NixOS test for starting with rabbitmq-single ipc

This commit is contained in:
Rory&
2026-05-17 02:14:27 +02:00
parent 92c07ba445
commit f4bc4a223d
5 changed files with 54 additions and 20 deletions
+2 -2
View File
@@ -52,7 +52,7 @@ export async function emitEvent(payload: Omit<Event, "created_at">) {
export async function initEvent() {
if (process.env.EVENT_TRANSMISSION === "rabbitmq-single") {
if (!Config.get().rabbitmq.host!) {
throw new Error("[Events] RabbitMQ is not configured.");
throw new Error("[Events] rabbitmq.host is not configured.");
}
if (!writer) {
@@ -112,7 +112,7 @@ export interface ProcessEvent {
export async function listenEvent(event: string, callback: (event: EventOpts) => unknown, opts?: ListenEventOpts): Promise<() => Promise<void>> {
if (process.env.EVENT_TRANSMISSION === "rabbitmq-single") {
if (!Config.get().rabbitmq.host) {
throw new Error("[Events] EVENT_SOCKET_PATH is not configured.");
throw new Error("[Events] rabbitmq.host is not configured.");
}
if (!listener) {
listener = new RabbitMqSingleListener(Config.get().rabbitmq.host!);
@@ -18,7 +18,7 @@
import EventEmitter from "node:events";
import { BaseEventListener } from "./BaseEventListener";
import { EVENT, Event, EventOpts } from "@spacebar/util";
import { EVENT, Event, EventOpts, sleep } from "@spacebar/util";
import amqp, { Channel, ChannelModel } from "amqplib";
import { randomUUID } from "node:crypto";
@@ -35,11 +35,19 @@ export class RabbitMqSingleListener extends BaseEventListener {
}
async init() {
console.log(`[RabbitMQSingleListener] Connecting to: ${this.host}`);
this.connection = await amqp.connect(this.host, {
timeout: 1000 * 60,
noDelay: true,
});
while (!this.connection) {
try {
console.log(`[RabbitMQSingleListener] Connecting to: ${this.host}`);
this.connection = await amqp.connect(this.host, {
timeout: 1000 * 60,
noDelay: true,
});
console.log(`[RabbitMQSingleListener] Connected to: ${this.host}`);
} catch (e) {
console.log(`[RabbitMQSingleListener] Failed to connect to to: ${this.host}: ${e}`);
await sleep(1000);
}
}
this.channel = await this.connection.createChannel();
for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) {
@@ -52,19 +60,21 @@ export class RabbitMqSingleListener extends BaseEventListener {
this.connection.on("close", () => {
console.error("[RabbitMQSingleListener] Connection closed");
this.init().catch((e) => console.error("[RabbitMQSingleListener] Failed to schedule reconnection:", e));
sleep(1000).then(() => {
this.init().catch((e) => console.error("[RabbitMQSingleListener] Failed to schedule reconnection:", e));
});
});
// actually set up event receiving?
await this.channel.assertExchange("", "fanout", { durable: false });
const q = await this.channel.assertQueue("", {
exclusive: true,
await this.channel.assertExchange("-", "fanout", { durable: false });
const q = await this.channel.assertQueue("-", {
exclusive: false,
autoDelete: true,
messageTtl: 5000,
});
const consumerTag = randomUUID();
await this.channel.bindQueue(q.queue, "", "");
await this.channel.bindQueue(q.queue, "-", "");
await this.channel.consume(
q.queue,
(opts) => {
@@ -46,14 +46,14 @@ export class RabbitMqSingleWriter extends BaseEventWriter {
async emit(event: Event): Promise<void> {
// todo check if channel is closed
if ((this.channel as unknown as { closed?: boolean }).closed) this.channel = await this.connection.createChannel();
await this.channel.assertExchange("", "fanout", {
await this.channel.assertExchange("-", "fanout", {
durable: false, // ensure that messages arent written to disk
});
let success = false;
try {
success = this.channel.publish(
"",
"-",
"",
Buffer.from(JSON.stringify({ id: (event.guild_id || event.channel_id || event.user_id || event.session_id) as string, event })),
{},