Better error handling in rabbitmqsinglelistener/writer

This commit is contained in:
Rory&
2026-05-17 02:46:48 +02:00
committed by Rory&
parent b053758d1c
commit fc60a9be8f
2 changed files with 49 additions and 14 deletions
@@ -24,8 +24,8 @@ import { randomUUID } from "node:crypto";
export class RabbitMqSingleListener extends BaseEventListener {
private readonly host: string;
private connection: ChannelModel;
private channel: Channel;
private connection?: ChannelModel;
private channel?: Channel;
eventEmitter: EventEmitter;
constructor(host: string) {
@@ -90,8 +90,10 @@ export class RabbitMqSingleListener extends BaseEventListener {
}
async close(): Promise<void> {
await this.channel.close();
await this.connection.close();
await this.channel?.close();
this.channel = undefined;
await this.connection?.close();
this.connection = undefined;
}
async listen(event: string, callback: (event: EventOpts) => unknown): Promise<() => Promise<void>> {
@@ -18,12 +18,12 @@
import { BaseEventWriter } from "./BaseEventWriter";
import amqp, { Channel, ChannelModel } from "amqplib";
import { Event } from "@spacebar/util";
import { Event, sleep } from "@spacebar/util";
export class RabbitMqSingleWriter extends BaseEventWriter {
private readonly host: string;
private connection: ChannelModel;
private channel: Channel;
private connection?: ChannelModel;
private channel?: Channel;
constructor(host: string) {
super();
@@ -31,19 +31,52 @@ export class RabbitMqSingleWriter extends BaseEventWriter {
}
async init(): Promise<void> {
console.log(`[RabbitMQ] Connecting to: ${this.host}`);
this.connection = await amqp.connect(this.host, {
timeout: 1000 * 60,
noDelay: true,
});
while (!this.connection) {
try {
console.log(`[RabbitMQSingleWriter] Connecting to: ${this.host}`);
this.connection = await amqp.connect(this.host, {
timeout: 1000 * 60,
noDelay: true,
});
console.log(`[RabbitMQSingleWriter] Connected to: ${this.host}`);
} catch (e) {
console.log(`[RabbitMQSingleWriter] Failed to connect to to: ${this.host}: ${e}`);
await sleep(1000);
}
}
this.channel = await this.connection.createChannel();
for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) {
process.on(sig, this.close);
}
this.connection.on("error", (err) => {
console.error("[RabbitMQSingleWriter] Connection error:", err);
});
this.connection.on("close", () => {
console.error("[RabbitMQSingleWriter] Connection closed");
sleep(1000).then(() => {
this.init().catch((e) => console.error("[RabbitMQSingleWriter] Failed to schedule reconnection:", e));
});
});
}
async close(): Promise<void> {
await this.channel.close();
await this.connection.close();
await this.channel?.close();
this.channel = undefined;
await this.connection?.close();
this.connection = undefined;
}
async emit(event: Event): Promise<void> {
if (!this.connection) {
throw new Error("RabbitMqSingleWriter#emit called without connection being initialised!");
}
if (!this.channel) {
throw new Error("RabbitMqSingleWriter#emit called without channel being initialised!");
}
// todo check if channel is closed
if ((this.channel as unknown as { closed?: boolean }).closed) this.channel = await this.connection.createChannel();
await this.channel.assertExchange("-", "fanout", {