Merge branch 'spacebarchat:master' into fix/gh-embeds-new-commits-title-link

This commit is contained in:
Cyber
2025-12-12 22:21:09 +01:00
committed by GitHub
5 changed files with 71 additions and 108 deletions

View File

@@ -548,7 +548,7 @@ router.delete(
// TODO: handle other read state types
if (body.read_state_type != ReadStateType.CHANNEL) return res.status(204).send();
const readState = await ReadState.findOne({where: {channel_id}});
const readState = await ReadState.findOne({ where: { channel_id, user_id: req.user_id } });
if (readState) {
await readState.remove();
}

View File

@@ -36,7 +36,7 @@ import { WebSocket } from "@spacebar/gateway";
import { Channel as AMQChannel } from "amqplib";
import { Recipient } from "@spacebar/util";
import * as console from "node:console";
import { PublicMember, RelationshipType } from "@spacebar/schemas"
import { PublicMember, RelationshipType } from "@spacebar/schemas";
import { bgRedBright } from "picocolors";
// TODO: close connection on Invalidated Token
@@ -46,10 +46,7 @@ import { bgRedBright } from "picocolors";
// Sharding: calculate if the current shard id matches the formula: shard_id = (guild_id >> 22) % num_shards
// https://discord.com/developers/docs/topics/gateway#sharding
export function handlePresenceUpdate(
this: WebSocket,
{ event, acknowledge, data }: EventOpts,
) {
export function handlePresenceUpdate(this: WebSocket, { event, acknowledge, data }: EventOpts) {
acknowledge?.();
if (event === EVENTEnum.PresenceUpdate) {
return Send(this, {
@@ -92,32 +89,24 @@ export async function setupListener(this: WebSocket) {
this.listen_options = opts;
const consumer = consume.bind(this);
const handleChannelError = (err: unknown) => {
console.error(`[RabbitMQ] [user-${this.user_id}] Channel Error (Handled):`, err);
};
console.log("[RabbitMQ] setupListener: open for ", this.user_id);
if (RabbitMQ.connection) {
console.log(
"[RabbitMQ] setupListener: opts.channel = ",
typeof opts.channel,
"with channel id",
opts.channel?.ch,
);
console.log("[RabbitMQ] setupListener: opts.channel = ", typeof opts.channel, "with channel id", opts.channel?.ch);
opts.channel = await RabbitMQ.connection.createChannel();
opts.channel.on("error", handleChannelError);
opts.channel.queues = {};
console.log(
"[RabbitMQ] channel created: ",
typeof opts.channel,
"with channel id",
opts.channel?.ch,
);
console.log("[RabbitMQ] channel created: ", typeof opts.channel, "with channel id", opts.channel?.ch);
}
this.events[this.user_id] = await listenEvent(this.user_id, consumer, opts);
relationships.forEach(async (relationship) => {
this.events[relationship.to_id] = await listenEvent(
relationship.to_id,
handlePresenceUpdate.bind(this),
opts,
);
this.events[relationship.to_id] = await listenEvent(relationship.to_id, handlePresenceUpdate.bind(this), opts);
});
dm_channels.forEach(async (channel) => {
@@ -130,33 +119,27 @@ export async function setupListener(this: WebSocket) {
this.events[guild.id] = await listenEvent(guild.id, consumer, opts);
guild.channels.forEach(async (channel) => {
if (
permission
.overwriteChannel(channel.permission_overwrites ?? [])
.has("VIEW_CHANNEL")
) {
this.events[channel.id] = await listenEvent(
channel.id,
consumer,
opts,
);
if (permission.overwriteChannel(channel.permission_overwrites ?? []).has("VIEW_CHANNEL")) {
this.events[channel.id] = await listenEvent(channel.id, consumer, opts);
}
});
});
this.once("close", () => {
console.log(
"[RabbitMQ] setupListener: close for",
this.user_id,
"=",
typeof opts.channel,
"with channel id",
opts.channel?.ch,
this.once("close", async () => {
console.log("[RabbitMQ] setupListener: close for", this.user_id, "=", typeof opts.channel, "with channel id", opts.channel?.ch);
// wait for event consumer cancellation
await Promise.all(
Object.values(this.events).map((x) => {
if (x) return x();
else return Promise.resolve();
}),
);
if (opts.channel) opts.channel.close();
else {
Object.values(this.events).forEach((x) => x?.());
Object.values(this.member_events).forEach((x) => x());
await Promise.all(Object.values(this.member_events).map((x) => x()));
if (opts.channel) {
await opts.channel.close();
opts.channel.off("error", handleChannelError);
}
});
}
@@ -180,11 +163,7 @@ async function consume(this: WebSocket, opts: EventOpts) {
break;
case "GUILD_MEMBER_ADD":
if (this.member_events[data.user.id]) break; // already subscribed
this.member_events[data.user.id] = await listenEvent(
data.user.id,
handlePresenceUpdate.bind(this),
this.listen_options,
);
this.member_events[data.user.id] = await listenEvent(data.user.id, handlePresenceUpdate.bind(this), this.listen_options);
break;
case "GUILD_MEMBER_UPDATE":
if (!this.member_events[data.user.id]) break;
@@ -197,32 +176,20 @@ async function consume(this: WebSocket, opts: EventOpts) {
delete this.events[id];
break;
case "CHANNEL_CREATE":
if (
!permission
.overwriteChannel(data.permission_overwrites)
.has("VIEW_CHANNEL")
) {
if (!permission.overwriteChannel(data.permission_overwrites).has("VIEW_CHANNEL")) {
return;
}
this.events[id] = await listenEvent(id, consumer, listenOpts);
break;
case "RELATIONSHIP_ADD":
this.events[data.user.id] = await listenEvent(
data.user.id,
handlePresenceUpdate.bind(this),
this.listen_options,
);
this.events[data.user.id] = await listenEvent(data.user.id, handlePresenceUpdate.bind(this), this.listen_options);
break;
case "GUILD_CREATE":
this.events[id] = await listenEvent(id, consumer, listenOpts);
break;
case "CHANNEL_UPDATE": {
const exists = this.events[id];
if (
permission
.overwriteChannel(data.permission_overwrites)
.has("VIEW_CHANNEL")
) {
if (permission.overwriteChannel(data.permission_overwrites).has("VIEW_CHANNEL")) {
if (exists) break;
this.events[id] = await listenEvent(id, consumer, listenOpts);
} else {
@@ -294,20 +261,19 @@ async function consume(this: WebSocket, opts: EventOpts) {
case "MESSAGE_UPDATE":
// console.log(this.request)
if (data["attachments"])
data["attachments"] =
Message.prototype.withSignedAttachments.call(
data,
new NewUrlUserSignatureData({
ip: this.ipAddress,
userAgent: this.userAgent,
}),
).attachments;
data["attachments"] = Message.prototype.withSignedAttachments.call(
data,
new NewUrlUserSignatureData({
ip: this.ipAddress,
userAgent: this.userAgent,
}),
).attachments;
break;
default:
break;
}
if(event === "GUILD_MEMBER_ADD") {
if (event === "GUILD_MEMBER_ADD") {
if ((data as PublicMember).roles === undefined || (data as PublicMember).roles === null) {
console.log(bgRedBright("[Gateway]"), "[GUILD_MEMBER_ADD] roles is undefined, setting to empty array!", opts.origin ?? "(Event origin not defined)", data);
(data as PublicMember).roles = [];

View File

@@ -44,8 +44,8 @@ export interface WebSocket extends WS {
intents: Intents;
sequence: number;
permissions: Record<string, Permissions>;
events: Record<string, undefined | (() => unknown)>;
member_events: Record<string, () => unknown>;
events: Record<string, undefined | (() => Promise<unknown>)>;
member_events: Record<string, () => Promise<unknown>>;
listen_options: ListenEventOpts;
capabilities?: Capabilities;
large_threshold: number;

View File

@@ -20,30 +20,21 @@ import { Channel } from "amqplib";
import { RabbitMQ } from "./RabbitMQ";
import EventEmitter from "events";
import { EVENT, Event } from "../interfaces";
import { randomUUID } from "crypto";
export const events = new EventEmitter();
export async function emitEvent(payload: Omit<Event, "created_at">) {
const id = (payload.guild_id ||
payload.channel_id ||
payload.user_id) as string;
const id = (payload.guild_id || payload.channel_id || payload.user_id) as string;
if (!id) return console.error("event doesn't contain any id", payload);
if (RabbitMQ.connection) {
const data =
typeof payload.data === "object"
? JSON.stringify(payload.data)
: payload.data; // use rabbitmq for event transmission
const data = typeof payload.data === "object" ? JSON.stringify(payload.data) : payload.data; // use rabbitmq for event transmission
await RabbitMQ.channel?.assertExchange(id, "fanout", {
durable: false,
});
// assertQueue isn't needed, because a queue will automatically created if it doesn't exist
const successful = RabbitMQ.channel?.publish(
id,
"",
Buffer.from(`${data}`),
{ type: payload.event },
);
const successful = RabbitMQ.channel?.publish(id, "", Buffer.from(`${data}`), { type: payload.event });
if (!successful) throw new Error("failed to send event");
} else if (process.env.EVENT_TRANSMISSION === "process") {
process.send?.({ type: "event", event: payload, id } as ProcessEvent);
@@ -79,17 +70,10 @@ export interface ProcessEvent {
id: string;
}
export async function listenEvent(
event: string,
callback: (event: EventOpts) => unknown,
opts?: ListenEventOpts,
) {
export async function listenEvent(event: string, callback: (event: EventOpts) => unknown, opts?: ListenEventOpts): Promise<() => Promise<void>> {
if (RabbitMQ.connection) {
const channel = opts?.channel || RabbitMQ.channel;
if (!channel)
throw new Error(
"[Events] An event was sent without an associated channel",
);
if (!channel) throw new Error("[Events] An event was sent without an associated channel");
return await rabbitListen(channel, event, callback, {
acknowledge: opts?.acknowledge,
});
@@ -101,9 +85,7 @@ export async function listenEvent(
const listener = (msg: ProcessEvent) => {
// eslint-disable-next-line @typescript-eslint/no-unused-expressions
msg.type === "event" &&
msg.id === event &&
callback({ ...msg.event, cancel });
msg.type === "event" && msg.id === event && callback({ ...msg.event, cancel });
};
// TODO: assert the type is correct?
@@ -124,20 +106,17 @@ export async function listenEvent(
}
}
async function rabbitListen(
channel: Channel,
id: string,
callback: (event: EventOpts) => unknown,
opts?: { acknowledge?: boolean },
) {
async function rabbitListen(channel: Channel, id: string, callback: (event: EventOpts) => unknown, opts?: { acknowledge?: boolean }): Promise<() => Promise<void>> {
await channel.assertExchange(id, "fanout", { durable: false });
const q = await channel.assertQueue("", {
exclusive: true,
autoDelete: true,
});
const consumerTag = randomUUID();
const cancel = async () => {
await channel.cancel(q.queue);
await channel.cancel(consumerTag);
await channel.unbindQueue(q.queue, id, "");
};
@@ -163,6 +142,7 @@ async function rabbitListen(
},
{
noAck: !opts?.acknowledge,
consumerTag: consumerTag,
},
);

View File

@@ -34,7 +34,24 @@ export const RabbitMQ: {
timeout: 1000 * 60,
});
console.log(`[RabbitMQ] connected`);
// log connection errors
this.connection.on("error", (err) => {
console.error("[RabbitMQ] Connection Error:", err);
});
this.connection.on("close", () => {
console.error("[RabbitMQ] connection closed");
// TODO: Add reconnection logic here if the connection crashes??
// will be a pain since we will have to reconstruct entire state
});
this.channel = await this.connection.createChannel();
console.log(`[RabbitMQ] channel created`);
// log channel errors
this.channel.on("error", (err) => {
console.error("[RabbitMQ] Channel Error:", err);
});
},
};