From 1085754f52321b452cecb033903dca39cd0da576 Mon Sep 17 00:00:00 2001 From: dank074 Date: Sat, 13 Dec 2025 13:43:02 -0600 Subject: [PATCH] try to handle race condition again --- src/util/util/Event.ts | 20 +++++++++++++------- src/util/util/RabbitMQ.ts | 15 +++++++++++++++ 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/src/util/util/Event.ts b/src/util/util/Event.ts index ce5c25d25..db4e8066e 100644 --- a/src/util/util/Event.ts +++ b/src/util/util/Event.ts @@ -29,13 +29,18 @@ export async function emitEvent(payload: Omit) { if (RabbitMQ.connection) { const data = typeof payload.data === "object" ? JSON.stringify(payload.data) : payload.data; // use rabbitmq for event transmission - await RabbitMQ.channel?.assertExchange(id, "fanout", { - durable: false, - }); + const channel = await RabbitMQ.getSafeChannel(); + try { + await channel.assertExchange(id, "fanout", { + durable: false, + }); - // assertQueue isn't needed, because a queue will automatically created if it doesn't exist - const successful = RabbitMQ.channel?.publish(id, "", Buffer.from(`${data}`), { type: payload.event }); - if (!successful) throw new Error("failed to send event"); + // assertQueue isn't needed, because a queue will automatically created if it doesn't exist + const successful = channel.publish(id, "", Buffer.from(`${data}`), { type: payload.event }); + if (!successful) throw new Error("failed to send event"); + } catch (e) { + console.log("[RabbitMQ] ", e); + } } else if (process.env.EVENT_TRANSMISSION === "process") { process.send?.({ type: "event", event: payload, id } as ProcessEvent); } else { @@ -72,7 +77,8 @@ export interface ProcessEvent { export async function listenEvent(event: string, callback: (event: EventOpts) => unknown, opts?: ListenEventOpts): Promise<() => Promise> { if (RabbitMQ.connection) { - const channel = opts?.channel || RabbitMQ.channel; + const rabbitMQChannel = await RabbitMQ.getSafeChannel(); + const channel = opts?.channel || rabbitMQChannel; if (!channel) throw new Error("[Events] An event was sent without an associated channel"); return await rabbitListen(channel, event, callback, { acknowledge: opts?.acknowledge, diff --git a/src/util/util/RabbitMQ.ts b/src/util/util/RabbitMQ.ts index 89e8e140c..f11f701fb 100644 --- a/src/util/util/RabbitMQ.ts +++ b/src/util/util/RabbitMQ.ts @@ -23,6 +23,7 @@ export const RabbitMQ: { connection: ChannelModel | null; channel: Channel | null; init: () => Promise; + getSafeChannel: () => Promise; } = { connection: null, channel: null, @@ -46,6 +47,13 @@ export const RabbitMQ: { // will be a pain since we will have to reconstruct entire state }); + await this.getSafeChannel(); + }, + getSafeChannel: async function () { + if (!this.connection) return Promise.reject(); + + if (this.channel) return this.channel; + this.channel = await this.connection.createChannel(); console.log(`[RabbitMQ] channel created`); @@ -53,5 +61,12 @@ export const RabbitMQ: { this.channel.on("error", (err) => { console.error("[RabbitMQ] Channel Error:", err); }); + + this.channel.on("close", () => { + console.log("[RabbitMQ] channel closed"); + this.channel = null; + }); + + return this.channel; }, };