From f4bc4a223d7c002316b9beea066b05b3b643c826 Mon Sep 17 00:00:00 2001 From: Rory& Date: Sun, 17 May 2026 02:14:27 +0200 Subject: [PATCH] Add NixOS test for starting with rabbitmq-single ipc --- default.nix | 17 ++++++++-- nix/tests/test-bundle-starts.nix | 17 ++++++++-- src/util/util/ipc/Event.ts | 4 +-- .../ipc/listener/RabbitMqSingleListener.ts | 32 ++++++++++++------- .../util/ipc/writer/RabbitMqSingleWriter.ts | 4 +-- 5 files changed, 54 insertions(+), 20 deletions(-) diff --git a/default.nix b/default.nix index 6af2bae2a..d47a30d63 100644 --- a/default.nix +++ b/default.nix @@ -82,8 +82,6 @@ let # set +x runHook postInstall ''; - - passthru.tests = pkgs.testers.runNixOSTest (import ./nix/tests/test-bundle-starts.nix self); }; in pkgs.stdenv.mkDerivation { @@ -122,5 +120,18 @@ pkgs.stdenv.mkDerivation { makeWrapper ${pkgs.nodejs_24}/bin/node $out/bin/apply-migrations --prefix NODE_PATH : $out/node_modules --add-flags --enable-source-maps --add-flags $out/dist/apply-migrations.js ''; - passthru.tests = pkgs.testers.runNixOSTest (import ./nix/tests/test-bundle-starts.nix self); + passthru.tests = pkgs.runCommand "spacebar-server-ts-all-tests" rec { + bundleStarts = pkgs.testers.runNixOSTest (import ./nix/tests/test-bundle-starts.nix { inherit self; }); + bundleStartsRabbitMq = pkgs.testers.runNixOSTest ( + import ./nix/tests/test-bundle-starts.nix { + inherit self; + withIpc = "rabbitmq-single"; + } + ); + + nativeBuildInputs = [ + bundleStarts + bundleStartsRabbitMq + ]; + } "touch $out"; } diff --git a/nix/tests/test-bundle-starts.nix b/nix/tests/test-bundle-starts.nix index 86b998cf7..2e088f6de 100644 --- a/nix/tests/test-bundle-starts.nix +++ b/nix/tests/test-bundle-starts.nix @@ -1,4 +1,7 @@ -self: +{ + self, + withIpc ? "unix", +}: { config, lib, @@ -8,11 +11,13 @@ self: let sb = import ../lib/mkEndpoint.nix; + isRabbitMqTest = lib.strings.hasPrefix "rabbitmq" withIpc; in { - name = "test-bundle-starts"; + name = "test-bundle-starts" + lib.optionalString (withIpc != "unix") ("_ipc=" + withIpc); skipTypeCheck = true; skipLint = true; + globalTimeout = 120; nodes.machine = { imports = [ self.nixosModules.default ]; @@ -30,12 +35,20 @@ in LOG_REQUESTS = "-"; # Log all requests LOG_VALIDATION_ERRORS = true; }; + ipcMethod = withIpc; + + settings = { + rabbitmq = { + host = lib.mkIf isRabbitMqTest "amqp://guest:guest@127.0.0.1:5672"; + }; + }; nginx.enable = true; }; in lib.trace ("Testing with config: " + builtins.toJSON cfg) cfg; services.nginx.enable = true; + services.rabbitmq.enable = isRabbitMqTest; services.postgresql = { enable = true; initdbArgs = [ diff --git a/src/util/util/ipc/Event.ts b/src/util/util/ipc/Event.ts index 9e523ce6c..e836c269e 100644 --- a/src/util/util/ipc/Event.ts +++ b/src/util/util/ipc/Event.ts @@ -52,7 +52,7 @@ export async function emitEvent(payload: Omit) { export async function initEvent() { if (process.env.EVENT_TRANSMISSION === "rabbitmq-single") { if (!Config.get().rabbitmq.host!) { - throw new Error("[Events] RabbitMQ is not configured."); + throw new Error("[Events] rabbitmq.host is not configured."); } if (!writer) { @@ -112,7 +112,7 @@ export interface ProcessEvent { export async function listenEvent(event: string, callback: (event: EventOpts) => unknown, opts?: ListenEventOpts): Promise<() => Promise> { if (process.env.EVENT_TRANSMISSION === "rabbitmq-single") { if (!Config.get().rabbitmq.host) { - throw new Error("[Events] EVENT_SOCKET_PATH is not configured."); + throw new Error("[Events] rabbitmq.host is not configured."); } if (!listener) { listener = new RabbitMqSingleListener(Config.get().rabbitmq.host!); diff --git a/src/util/util/ipc/listener/RabbitMqSingleListener.ts b/src/util/util/ipc/listener/RabbitMqSingleListener.ts index b323db4be..127d79cc2 100644 --- a/src/util/util/ipc/listener/RabbitMqSingleListener.ts +++ b/src/util/util/ipc/listener/RabbitMqSingleListener.ts @@ -18,7 +18,7 @@ import EventEmitter from "node:events"; import { BaseEventListener } from "./BaseEventListener"; -import { EVENT, Event, EventOpts } from "@spacebar/util"; +import { EVENT, Event, EventOpts, sleep } from "@spacebar/util"; import amqp, { Channel, ChannelModel } from "amqplib"; import { randomUUID } from "node:crypto"; @@ -35,11 +35,19 @@ export class RabbitMqSingleListener extends BaseEventListener { } async init() { - console.log(`[RabbitMQSingleListener] Connecting to: ${this.host}`); - this.connection = await amqp.connect(this.host, { - timeout: 1000 * 60, - noDelay: true, - }); + while (!this.connection) { + try { + console.log(`[RabbitMQSingleListener] Connecting to: ${this.host}`); + this.connection = await amqp.connect(this.host, { + timeout: 1000 * 60, + noDelay: true, + }); + console.log(`[RabbitMQSingleListener] Connected to: ${this.host}`); + } catch (e) { + console.log(`[RabbitMQSingleListener] Failed to connect to to: ${this.host}: ${e}`); + await sleep(1000); + } + } this.channel = await this.connection.createChannel(); for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) { @@ -52,19 +60,21 @@ export class RabbitMqSingleListener extends BaseEventListener { this.connection.on("close", () => { console.error("[RabbitMQSingleListener] Connection closed"); - this.init().catch((e) => console.error("[RabbitMQSingleListener] Failed to schedule reconnection:", e)); + sleep(1000).then(() => { + this.init().catch((e) => console.error("[RabbitMQSingleListener] Failed to schedule reconnection:", e)); + }); }); // actually set up event receiving? - await this.channel.assertExchange("", "fanout", { durable: false }); - const q = await this.channel.assertQueue("", { - exclusive: true, + await this.channel.assertExchange("-", "fanout", { durable: false }); + const q = await this.channel.assertQueue("-", { + exclusive: false, autoDelete: true, messageTtl: 5000, }); const consumerTag = randomUUID(); - await this.channel.bindQueue(q.queue, "", ""); + await this.channel.bindQueue(q.queue, "-", ""); await this.channel.consume( q.queue, (opts) => { diff --git a/src/util/util/ipc/writer/RabbitMqSingleWriter.ts b/src/util/util/ipc/writer/RabbitMqSingleWriter.ts index 5bad33076..9be6299b5 100644 --- a/src/util/util/ipc/writer/RabbitMqSingleWriter.ts +++ b/src/util/util/ipc/writer/RabbitMqSingleWriter.ts @@ -46,14 +46,14 @@ export class RabbitMqSingleWriter extends BaseEventWriter { async emit(event: Event): Promise { // todo check if channel is closed if ((this.channel as unknown as { closed?: boolean }).closed) this.channel = await this.connection.createChannel(); - await this.channel.assertExchange("", "fanout", { + await this.channel.assertExchange("-", "fanout", { durable: false, // ensure that messages arent written to disk }); let success = false; try { success = this.channel.publish( - "", + "-", "", Buffer.from(JSON.stringify({ id: (event.guild_id || event.channel_id || event.user_id || event.session_id) as string, event })), {},