diff --git a/src/api/routes/channels/#channel_id/messages/index.ts b/src/api/routes/channels/#channel_id/messages/index.ts index 8790241d8..43c4984e0 100644 --- a/src/api/routes/channels/#channel_id/messages/index.ts +++ b/src/api/routes/channels/#channel_id/messages/index.ts @@ -548,7 +548,7 @@ router.delete( // TODO: handle other read state types if (body.read_state_type != ReadStateType.CHANNEL) return res.status(204).send(); - const readState = await ReadState.findOne({where: {channel_id}}); + const readState = await ReadState.findOne({ where: { channel_id, user_id: req.user_id } }); if (readState) { await readState.remove(); } diff --git a/src/gateway/listener/listener.ts b/src/gateway/listener/listener.ts index d7c3d23e5..75cb6bc61 100644 --- a/src/gateway/listener/listener.ts +++ b/src/gateway/listener/listener.ts @@ -36,7 +36,7 @@ import { WebSocket } from "@spacebar/gateway"; import { Channel as AMQChannel } from "amqplib"; import { Recipient } from "@spacebar/util"; import * as console from "node:console"; -import { PublicMember, RelationshipType } from "@spacebar/schemas" +import { PublicMember, RelationshipType } from "@spacebar/schemas"; import { bgRedBright } from "picocolors"; // TODO: close connection on Invalidated Token @@ -46,10 +46,7 @@ import { bgRedBright } from "picocolors"; // Sharding: calculate if the current shard id matches the formula: shard_id = (guild_id >> 22) % num_shards // https://discord.com/developers/docs/topics/gateway#sharding -export function handlePresenceUpdate( - this: WebSocket, - { event, acknowledge, data }: EventOpts, -) { +export function handlePresenceUpdate(this: WebSocket, { event, acknowledge, data }: EventOpts) { acknowledge?.(); if (event === EVENTEnum.PresenceUpdate) { return Send(this, { @@ -92,32 +89,24 @@ export async function setupListener(this: WebSocket) { this.listen_options = opts; const consumer = consume.bind(this); + const handleChannelError = (err: unknown) => { + console.error(`[RabbitMQ] [user-${this.user_id}] Channel Error (Handled):`, err); + }; + console.log("[RabbitMQ] setupListener: open for ", this.user_id); if (RabbitMQ.connection) { - console.log( - "[RabbitMQ] setupListener: opts.channel = ", - typeof opts.channel, - "with channel id", - opts.channel?.ch, - ); + console.log("[RabbitMQ] setupListener: opts.channel = ", typeof opts.channel, "with channel id", opts.channel?.ch); opts.channel = await RabbitMQ.connection.createChannel(); + + opts.channel.on("error", handleChannelError); opts.channel.queues = {}; - console.log( - "[RabbitMQ] channel created: ", - typeof opts.channel, - "with channel id", - opts.channel?.ch, - ); + console.log("[RabbitMQ] channel created: ", typeof opts.channel, "with channel id", opts.channel?.ch); } this.events[this.user_id] = await listenEvent(this.user_id, consumer, opts); relationships.forEach(async (relationship) => { - this.events[relationship.to_id] = await listenEvent( - relationship.to_id, - handlePresenceUpdate.bind(this), - opts, - ); + this.events[relationship.to_id] = await listenEvent(relationship.to_id, handlePresenceUpdate.bind(this), opts); }); dm_channels.forEach(async (channel) => { @@ -130,33 +119,27 @@ export async function setupListener(this: WebSocket) { this.events[guild.id] = await listenEvent(guild.id, consumer, opts); guild.channels.forEach(async (channel) => { - if ( - permission - .overwriteChannel(channel.permission_overwrites ?? []) - .has("VIEW_CHANNEL") - ) { - this.events[channel.id] = await listenEvent( - channel.id, - consumer, - opts, - ); + if (permission.overwriteChannel(channel.permission_overwrites ?? []).has("VIEW_CHANNEL")) { + this.events[channel.id] = await listenEvent(channel.id, consumer, opts); } }); }); - this.once("close", () => { - console.log( - "[RabbitMQ] setupListener: close for", - this.user_id, - "=", - typeof opts.channel, - "with channel id", - opts.channel?.ch, + this.once("close", async () => { + console.log("[RabbitMQ] setupListener: close for", this.user_id, "=", typeof opts.channel, "with channel id", opts.channel?.ch); + + // wait for event consumer cancellation + await Promise.all( + Object.values(this.events).map((x) => { + if (x) return x(); + else return Promise.resolve(); + }), ); - if (opts.channel) opts.channel.close(); - else { - Object.values(this.events).forEach((x) => x?.()); - Object.values(this.member_events).forEach((x) => x()); + await Promise.all(Object.values(this.member_events).map((x) => x())); + + if (opts.channel) { + await opts.channel.close(); + opts.channel.off("error", handleChannelError); } }); } @@ -180,11 +163,7 @@ async function consume(this: WebSocket, opts: EventOpts) { break; case "GUILD_MEMBER_ADD": if (this.member_events[data.user.id]) break; // already subscribed - this.member_events[data.user.id] = await listenEvent( - data.user.id, - handlePresenceUpdate.bind(this), - this.listen_options, - ); + this.member_events[data.user.id] = await listenEvent(data.user.id, handlePresenceUpdate.bind(this), this.listen_options); break; case "GUILD_MEMBER_UPDATE": if (!this.member_events[data.user.id]) break; @@ -197,32 +176,20 @@ async function consume(this: WebSocket, opts: EventOpts) { delete this.events[id]; break; case "CHANNEL_CREATE": - if ( - !permission - .overwriteChannel(data.permission_overwrites) - .has("VIEW_CHANNEL") - ) { + if (!permission.overwriteChannel(data.permission_overwrites).has("VIEW_CHANNEL")) { return; } this.events[id] = await listenEvent(id, consumer, listenOpts); break; case "RELATIONSHIP_ADD": - this.events[data.user.id] = await listenEvent( - data.user.id, - handlePresenceUpdate.bind(this), - this.listen_options, - ); + this.events[data.user.id] = await listenEvent(data.user.id, handlePresenceUpdate.bind(this), this.listen_options); break; case "GUILD_CREATE": this.events[id] = await listenEvent(id, consumer, listenOpts); break; case "CHANNEL_UPDATE": { const exists = this.events[id]; - if ( - permission - .overwriteChannel(data.permission_overwrites) - .has("VIEW_CHANNEL") - ) { + if (permission.overwriteChannel(data.permission_overwrites).has("VIEW_CHANNEL")) { if (exists) break; this.events[id] = await listenEvent(id, consumer, listenOpts); } else { @@ -294,20 +261,19 @@ async function consume(this: WebSocket, opts: EventOpts) { case "MESSAGE_UPDATE": // console.log(this.request) if (data["attachments"]) - data["attachments"] = - Message.prototype.withSignedAttachments.call( - data, - new NewUrlUserSignatureData({ - ip: this.ipAddress, - userAgent: this.userAgent, - }), - ).attachments; + data["attachments"] = Message.prototype.withSignedAttachments.call( + data, + new NewUrlUserSignatureData({ + ip: this.ipAddress, + userAgent: this.userAgent, + }), + ).attachments; break; default: break; } - if(event === "GUILD_MEMBER_ADD") { + if (event === "GUILD_MEMBER_ADD") { if ((data as PublicMember).roles === undefined || (data as PublicMember).roles === null) { console.log(bgRedBright("[Gateway]"), "[GUILD_MEMBER_ADD] roles is undefined, setting to empty array!", opts.origin ?? "(Event origin not defined)", data); (data as PublicMember).roles = []; diff --git a/src/gateway/util/WebSocket.ts b/src/gateway/util/WebSocket.ts index ea6f0701e..00ef9e98f 100644 --- a/src/gateway/util/WebSocket.ts +++ b/src/gateway/util/WebSocket.ts @@ -44,8 +44,8 @@ export interface WebSocket extends WS { intents: Intents; sequence: number; permissions: Record; - events: Record unknown)>; - member_events: Record unknown>; + events: Record Promise)>; + member_events: Record Promise>; listen_options: ListenEventOpts; capabilities?: Capabilities; large_threshold: number; diff --git a/src/util/util/Event.ts b/src/util/util/Event.ts index f56d66646..ce5c25d25 100644 --- a/src/util/util/Event.ts +++ b/src/util/util/Event.ts @@ -20,30 +20,21 @@ import { Channel } from "amqplib"; import { RabbitMQ } from "./RabbitMQ"; import EventEmitter from "events"; import { EVENT, Event } from "../interfaces"; +import { randomUUID } from "crypto"; export const events = new EventEmitter(); export async function emitEvent(payload: Omit) { - const id = (payload.guild_id || - payload.channel_id || - payload.user_id) as string; + const id = (payload.guild_id || payload.channel_id || payload.user_id) as string; if (!id) return console.error("event doesn't contain any id", payload); if (RabbitMQ.connection) { - const data = - typeof payload.data === "object" - ? JSON.stringify(payload.data) - : payload.data; // use rabbitmq for event transmission + const data = typeof payload.data === "object" ? JSON.stringify(payload.data) : payload.data; // use rabbitmq for event transmission await RabbitMQ.channel?.assertExchange(id, "fanout", { durable: false, }); // assertQueue isn't needed, because a queue will automatically created if it doesn't exist - const successful = RabbitMQ.channel?.publish( - id, - "", - Buffer.from(`${data}`), - { type: payload.event }, - ); + const successful = RabbitMQ.channel?.publish(id, "", Buffer.from(`${data}`), { type: payload.event }); if (!successful) throw new Error("failed to send event"); } else if (process.env.EVENT_TRANSMISSION === "process") { process.send?.({ type: "event", event: payload, id } as ProcessEvent); @@ -79,17 +70,10 @@ export interface ProcessEvent { id: string; } -export async function listenEvent( - event: string, - callback: (event: EventOpts) => unknown, - opts?: ListenEventOpts, -) { +export async function listenEvent(event: string, callback: (event: EventOpts) => unknown, opts?: ListenEventOpts): Promise<() => Promise> { if (RabbitMQ.connection) { const channel = opts?.channel || RabbitMQ.channel; - if (!channel) - throw new Error( - "[Events] An event was sent without an associated channel", - ); + if (!channel) throw new Error("[Events] An event was sent without an associated channel"); return await rabbitListen(channel, event, callback, { acknowledge: opts?.acknowledge, }); @@ -101,9 +85,7 @@ export async function listenEvent( const listener = (msg: ProcessEvent) => { // eslint-disable-next-line @typescript-eslint/no-unused-expressions - msg.type === "event" && - msg.id === event && - callback({ ...msg.event, cancel }); + msg.type === "event" && msg.id === event && callback({ ...msg.event, cancel }); }; // TODO: assert the type is correct? @@ -124,20 +106,17 @@ export async function listenEvent( } } -async function rabbitListen( - channel: Channel, - id: string, - callback: (event: EventOpts) => unknown, - opts?: { acknowledge?: boolean }, -) { +async function rabbitListen(channel: Channel, id: string, callback: (event: EventOpts) => unknown, opts?: { acknowledge?: boolean }): Promise<() => Promise> { await channel.assertExchange(id, "fanout", { durable: false }); const q = await channel.assertQueue("", { exclusive: true, autoDelete: true, }); + const consumerTag = randomUUID(); + const cancel = async () => { - await channel.cancel(q.queue); + await channel.cancel(consumerTag); await channel.unbindQueue(q.queue, id, ""); }; @@ -163,6 +142,7 @@ async function rabbitListen( }, { noAck: !opts?.acknowledge, + consumerTag: consumerTag, }, ); diff --git a/src/util/util/RabbitMQ.ts b/src/util/util/RabbitMQ.ts index 1a61aee93..89e8e140c 100644 --- a/src/util/util/RabbitMQ.ts +++ b/src/util/util/RabbitMQ.ts @@ -34,7 +34,24 @@ export const RabbitMQ: { timeout: 1000 * 60, }); console.log(`[RabbitMQ] connected`); + + // log connection errors + this.connection.on("error", (err) => { + console.error("[RabbitMQ] Connection Error:", err); + }); + + this.connection.on("close", () => { + console.error("[RabbitMQ] connection closed"); + // TODO: Add reconnection logic here if the connection crashes?? + // will be a pain since we will have to reconstruct entire state + }); + this.channel = await this.connection.createChannel(); console.log(`[RabbitMQ] channel created`); + + // log channel errors + this.channel.on("error", (err) => { + console.error("[RabbitMQ] Channel Error:", err); + }); }, };