mirror of
https://github.com/spacebarchat/server.git
synced 2026-04-14 13:45:49 +00:00
try to handle race condition again
This commit is contained in:
@@ -29,13 +29,18 @@ export async function emitEvent(payload: Omit<Event, "created_at">) {
|
||||
|
||||
if (RabbitMQ.connection) {
|
||||
const data = typeof payload.data === "object" ? JSON.stringify(payload.data) : payload.data; // use rabbitmq for event transmission
|
||||
await RabbitMQ.channel?.assertExchange(id, "fanout", {
|
||||
durable: false,
|
||||
});
|
||||
const channel = await RabbitMQ.getSafeChannel();
|
||||
try {
|
||||
await channel.assertExchange(id, "fanout", {
|
||||
durable: false,
|
||||
});
|
||||
|
||||
// assertQueue isn't needed, because a queue will automatically created if it doesn't exist
|
||||
const successful = RabbitMQ.channel?.publish(id, "", Buffer.from(`${data}`), { type: payload.event });
|
||||
if (!successful) throw new Error("failed to send event");
|
||||
// assertQueue isn't needed, because a queue will automatically created if it doesn't exist
|
||||
const successful = channel.publish(id, "", Buffer.from(`${data}`), { type: payload.event });
|
||||
if (!successful) throw new Error("failed to send event");
|
||||
} catch (e) {
|
||||
console.log("[RabbitMQ] ", e);
|
||||
}
|
||||
} else if (process.env.EVENT_TRANSMISSION === "process") {
|
||||
process.send?.({ type: "event", event: payload, id } as ProcessEvent);
|
||||
} else {
|
||||
@@ -72,7 +77,8 @@ export interface ProcessEvent {
|
||||
|
||||
export async function listenEvent(event: string, callback: (event: EventOpts) => unknown, opts?: ListenEventOpts): Promise<() => Promise<void>> {
|
||||
if (RabbitMQ.connection) {
|
||||
const channel = opts?.channel || RabbitMQ.channel;
|
||||
const rabbitMQChannel = await RabbitMQ.getSafeChannel();
|
||||
const channel = opts?.channel || rabbitMQChannel;
|
||||
if (!channel) throw new Error("[Events] An event was sent without an associated channel");
|
||||
return await rabbitListen(channel, event, callback, {
|
||||
acknowledge: opts?.acknowledge,
|
||||
|
||||
@@ -23,6 +23,7 @@ export const RabbitMQ: {
|
||||
connection: ChannelModel | null;
|
||||
channel: Channel | null;
|
||||
init: () => Promise<void>;
|
||||
getSafeChannel: () => Promise<Channel>;
|
||||
} = {
|
||||
connection: null,
|
||||
channel: null,
|
||||
@@ -46,6 +47,13 @@ export const RabbitMQ: {
|
||||
// will be a pain since we will have to reconstruct entire state
|
||||
});
|
||||
|
||||
await this.getSafeChannel();
|
||||
},
|
||||
getSafeChannel: async function () {
|
||||
if (!this.connection) return Promise.reject();
|
||||
|
||||
if (this.channel) return this.channel;
|
||||
|
||||
this.channel = await this.connection.createChannel();
|
||||
console.log(`[RabbitMQ] channel created`);
|
||||
|
||||
@@ -53,5 +61,12 @@ export const RabbitMQ: {
|
||||
this.channel.on("error", (err) => {
|
||||
console.error("[RabbitMQ] Channel Error:", err);
|
||||
});
|
||||
|
||||
this.channel.on("close", () => {
|
||||
console.log("[RabbitMQ] channel closed");
|
||||
this.channel = null;
|
||||
});
|
||||
|
||||
return this.channel;
|
||||
},
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user