Various gateway stuff

This commit is contained in:
Rory&
2025-12-02 19:47:20 +01:00
parent 58b6cabf9c
commit dc5dadec04
4 changed files with 151 additions and 3 deletions

View File

@@ -0,0 +1,127 @@
/*
Spacebar: A FOSS re-implementation and extension of the Discord.com backend.
Copyright (C) 2023 Spacebar and Spacebar Contributors
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import {
getDatabase,
getPermission,
listenEvent,
Member,
Role,
Session,
User,
Presence,
Channel,
Permissions,
arrayPartition,
timePromise,
Stopwatch,
Guild,
} from "@spacebar/util";
import { WebSocket, Payload, handlePresenceUpdate, OPCODES, Send } from "@spacebar/gateway";
import murmur from "murmurhash-js/murmurhash3_gc";
import { check } from "./instanceOf";
import { LazyRequestSchema, PublicMember } from "@spacebar/schemas";
import { In } from "typeorm";
// TODO: only show roles/members that have access to this channel
// TODO: config: to list all members (even those who are offline) sorted by role, or just those who are online
// TODO: rewrite typeorm
const getMostRelevantSession = (sessions: Session[]) => {
const statusMap = {
online: 0,
idle: 1,
dnd: 2,
invisible: 3,
offline: 4,
};
// sort sessions by relevance
sessions = sessions.sort((a, b) => {
return statusMap[a.status] - statusMap[b.status] + ((a.activities?.length ?? 0) - (b.activities?.length ?? 0)) * 2;
});
return sessions[0];
};
export async function onGuildSync(this: WebSocket, { d }: Payload) {
const sw = Stopwatch.startNew();
if (!Array.isArray(d)) throw new Error("Invalid payload for GUILD_SYNC");
const guild_ids = d as string[];
const joinedGuildIds = await Member.find({ where: { id: this.user_id, guild_id: In(guild_ids) }, select: { guild_id: true } }).then((members) =>
members.map((m) => m.guild_id),
);
const tasks = joinedGuildIds.map((guildId) => timePromise(async () => handleGuildSync(this, guildId)));
// not awaiting lol
Promise.all(tasks)
.then((res) => {
console.log(`[Gateway] GUILD_SYNC processed ${guild_ids.length} guilds in ${sw.elapsed().totalMilliseconds}ms:`, {
...Object.fromEntries(
res.map((r) => [r.result.id, `${r.result.id}: ${r.result.members.length}U/${r.result.presences.length}P in ${r.elapsed.totalMilliseconds}ms`]),
),
});
})
.catch((err) => {
console.error("[Gateway] Error processing GUILD_SYNC:", err);
});
}
interface GuildSyncResult {
id: string;
presences: Presence[];
members: PublicMember[];
}
async function handleGuildSync(ws: WebSocket, guild_id: string) {
const res: GuildSyncResult = { id: guild_id, presences: [], members: [] };
const members = await Member.find({ where: { guild_id }, relations: ["user", "roles", "guild"] });
res.members = members.map((m) => m.toPublicMember());
const sessions = await Session.find({ where: { user_id: In(members.map((m) => m.id)) }, order: { user_id: "ASC" } });
const sessionsByUserId = new Map<string, Session[]>();
for (const session of sessions) {
if (!sessionsByUserId.has(session.user_id)) sessionsByUserId.set(session.user_id, []);
sessionsByUserId.get(session.user_id)!.push(session);
}
for (const member of members) {
const userSessions = sessionsByUserId.get(member.id) || [];
if (userSessions.length === 0) continue;
const mostRelevantSession = getMostRelevantSession(userSessions);
const presence: Presence = {
user: member.user.toPublicUser(),
guild_id: guild_id,
status: mostRelevantSession.getPublicStatus(),
activities: mostRelevantSession.activities,
client_status: mostRelevantSession.client_status,
};
res.presences.push(presence);
}
await Send(ws, {
op: OPCODES.Dispatch,
t: "GUILD_SYNC",
s: ws.sequence++,
d: res,
});
return res;
}

View File

@@ -1,6 +1,6 @@
/*
Spacebar: A FOSS re-implementation and extension of the Discord.com backend.
Copyright (C) 2023 Spacebar and Spacebar Contributors
Copyright (C) 2025 Spacebar and Spacebar Contributors
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
@@ -16,14 +16,29 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import { WebSocket } from "@spacebar/gateway";
import { OPCODES, Payload, WebSocket } from "@spacebar/gateway";
import { setHeartbeat } from "../util/Heartbeat";
import { Send } from "../util/Send";
export async function onHeartbeat(this: WebSocket) {
interface QoSData {
seq: number | null;
qos: QoSPayload;
}
export interface QoSPayload {
ver: number;
active: boolean;
reasons: string[];
}
export async function onHeartbeat(this: WebSocket, data: Payload) {
// TODO: validate payload
setHeartbeat(this);
if (data.op === OPCODES.SetQoS) {
this.qos = (data.d as QoSData).qos;
}
await Send(this, { op: 11, d: {} });
}

View File

@@ -28,6 +28,7 @@ import { onGuildSubscriptionsBulk } from "./GuildSubscriptionsBulk";
import { onStreamCreate } from "./StreamCreate";
import { onStreamDelete } from "./StreamDelete";
import { onStreamWatch } from "./StreamWatch";
import { onGuildSync } from "./GuildSync";
export type OPCodeHandler = (this: WebSocket, data: Payload) => unknown;
@@ -42,10 +43,13 @@ export default {
8: onRequestGuildMembers,
// 9: Invalid Session
// 10: Hello
12: onGuildSync, // technically deprecated, bt should be less finnicky?
// 13: Dm_update
14: onLazyRequest,
18: onStreamCreate,
19: onStreamDelete,
20: onStreamWatch,
37: onGuildSubscriptionsBulk,
40: onHeartbeat, // same as 1, except with extra data
41: () => {}, // "Update Time Spent Session ID", just tracking nonsense
} as { [key: number]: OPCodeHandler };

View File

@@ -23,6 +23,7 @@ import { Capabilities } from "./Capabilities";
import { ZstdCompress } from "zlib";
import { ZstdDecompress } from "node:zlib";
import { Decoder, Encoder } from "@toondepauw/node-zstd";
import { QoSPayload } from "../opcodes/Heartbeat";
export interface WebSocket extends WS {
version: number;
@@ -48,4 +49,5 @@ export interface WebSocket extends WS {
listen_options: ListenEventOpts;
capabilities?: Capabilities;
large_threshold: number;
qos?: QoSPayload;
}