diff --git a/src/util/util/ipc/listener/RabbitMqSingleListener.ts b/src/util/util/ipc/listener/RabbitMqSingleListener.ts index 127d79cc2..b5fd5efd5 100644 --- a/src/util/util/ipc/listener/RabbitMqSingleListener.ts +++ b/src/util/util/ipc/listener/RabbitMqSingleListener.ts @@ -24,8 +24,8 @@ import { randomUUID } from "node:crypto"; export class RabbitMqSingleListener extends BaseEventListener { private readonly host: string; - private connection: ChannelModel; - private channel: Channel; + private connection?: ChannelModel; + private channel?: Channel; eventEmitter: EventEmitter; constructor(host: string) { @@ -90,8 +90,10 @@ export class RabbitMqSingleListener extends BaseEventListener { } async close(): Promise { - await this.channel.close(); - await this.connection.close(); + await this.channel?.close(); + this.channel = undefined; + await this.connection?.close(); + this.connection = undefined; } async listen(event: string, callback: (event: EventOpts) => unknown): Promise<() => Promise> { diff --git a/src/util/util/ipc/writer/RabbitMqSingleWriter.ts b/src/util/util/ipc/writer/RabbitMqSingleWriter.ts index 9be6299b5..28781c6db 100644 --- a/src/util/util/ipc/writer/RabbitMqSingleWriter.ts +++ b/src/util/util/ipc/writer/RabbitMqSingleWriter.ts @@ -18,12 +18,12 @@ import { BaseEventWriter } from "./BaseEventWriter"; import amqp, { Channel, ChannelModel } from "amqplib"; -import { Event } from "@spacebar/util"; +import { Event, sleep } from "@spacebar/util"; export class RabbitMqSingleWriter extends BaseEventWriter { private readonly host: string; - private connection: ChannelModel; - private channel: Channel; + private connection?: ChannelModel; + private channel?: Channel; constructor(host: string) { super(); @@ -31,19 +31,52 @@ export class RabbitMqSingleWriter extends BaseEventWriter { } async init(): Promise { - console.log(`[RabbitMQ] Connecting to: ${this.host}`); - this.connection = await amqp.connect(this.host, { - timeout: 1000 * 60, - noDelay: true, - }); + while (!this.connection) { + try { + console.log(`[RabbitMQSingleWriter] Connecting to: ${this.host}`); + this.connection = await amqp.connect(this.host, { + timeout: 1000 * 60, + noDelay: true, + }); + console.log(`[RabbitMQSingleWriter] Connected to: ${this.host}`); + } catch (e) { + console.log(`[RabbitMQSingleWriter] Failed to connect to to: ${this.host}: ${e}`); + await sleep(1000); + } + } this.channel = await this.connection.createChannel(); + + for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) { + process.on(sig, this.close); + } + + this.connection.on("error", (err) => { + console.error("[RabbitMQSingleWriter] Connection error:", err); + }); + + this.connection.on("close", () => { + console.error("[RabbitMQSingleWriter] Connection closed"); + sleep(1000).then(() => { + this.init().catch((e) => console.error("[RabbitMQSingleWriter] Failed to schedule reconnection:", e)); + }); + }); } + async close(): Promise { - await this.channel.close(); - await this.connection.close(); + await this.channel?.close(); + this.channel = undefined; + await this.connection?.close(); + this.connection = undefined; } async emit(event: Event): Promise { + if (!this.connection) { + throw new Error("RabbitMqSingleWriter#emit called without connection being initialised!"); + } + if (!this.channel) { + throw new Error("RabbitMqSingleWriter#emit called without channel being initialised!"); + } + // todo check if channel is closed if ((this.channel as unknown as { closed?: boolean }).closed) this.channel = await this.connection.createChannel(); await this.channel.assertExchange("-", "fanout", {