diff --git a/src/gateway/opcodes/GuildSync.ts b/src/gateway/opcodes/GuildSync.ts new file mode 100644 index 000000000..03540ded3 --- /dev/null +++ b/src/gateway/opcodes/GuildSync.ts @@ -0,0 +1,127 @@ +/* + Spacebar: A FOSS re-implementation and extension of the Discord.com backend. + Copyright (C) 2023 Spacebar and Spacebar Contributors + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + +import { + getDatabase, + getPermission, + listenEvent, + Member, + Role, + Session, + User, + Presence, + Channel, + Permissions, + arrayPartition, + timePromise, + Stopwatch, + Guild, +} from "@spacebar/util"; +import { WebSocket, Payload, handlePresenceUpdate, OPCODES, Send } from "@spacebar/gateway"; +import murmur from "murmurhash-js/murmurhash3_gc"; +import { check } from "./instanceOf"; +import { LazyRequestSchema, PublicMember } from "@spacebar/schemas"; +import { In } from "typeorm"; + +// TODO: only show roles/members that have access to this channel +// TODO: config: to list all members (even those who are offline) sorted by role, or just those who are online +// TODO: rewrite typeorm + +const getMostRelevantSession = (sessions: Session[]) => { + const statusMap = { + online: 0, + idle: 1, + dnd: 2, + invisible: 3, + offline: 4, + }; + // sort sessions by relevance + sessions = sessions.sort((a, b) => { + return statusMap[a.status] - statusMap[b.status] + ((a.activities?.length ?? 0) - (b.activities?.length ?? 0)) * 2; + }); + + return sessions[0]; +}; + +export async function onGuildSync(this: WebSocket, { d }: Payload) { + const sw = Stopwatch.startNew(); + if (!Array.isArray(d)) throw new Error("Invalid payload for GUILD_SYNC"); + const guild_ids = d as string[]; + + const joinedGuildIds = await Member.find({ where: { id: this.user_id, guild_id: In(guild_ids) }, select: { guild_id: true } }).then((members) => + members.map((m) => m.guild_id), + ); + + const tasks = joinedGuildIds.map((guildId) => timePromise(async () => handleGuildSync(this, guildId))); + // not awaiting lol + Promise.all(tasks) + .then((res) => { + console.log(`[Gateway] GUILD_SYNC processed ${guild_ids.length} guilds in ${sw.elapsed().totalMilliseconds}ms:`, { + ...Object.fromEntries( + res.map((r) => [r.result.id, `${r.result.id}: ${r.result.members.length}U/${r.result.presences.length}P in ${r.elapsed.totalMilliseconds}ms`]), + ), + }); + }) + .catch((err) => { + console.error("[Gateway] Error processing GUILD_SYNC:", err); + }); +} + +interface GuildSyncResult { + id: string; + presences: Presence[]; + members: PublicMember[]; +} + +async function handleGuildSync(ws: WebSocket, guild_id: string) { + const res: GuildSyncResult = { id: guild_id, presences: [], members: [] }; + + const members = await Member.find({ where: { guild_id }, relations: ["user", "roles", "guild"] }); + res.members = members.map((m) => m.toPublicMember()); + + const sessions = await Session.find({ where: { user_id: In(members.map((m) => m.id)) }, order: { user_id: "ASC" } }); + const sessionsByUserId = new Map(); + for (const session of sessions) { + if (!sessionsByUserId.has(session.user_id)) sessionsByUserId.set(session.user_id, []); + sessionsByUserId.get(session.user_id)!.push(session); + } + + for (const member of members) { + const userSessions = sessionsByUserId.get(member.id) || []; + if (userSessions.length === 0) continue; + + const mostRelevantSession = getMostRelevantSession(userSessions); + const presence: Presence = { + user: member.user.toPublicUser(), + guild_id: guild_id, + status: mostRelevantSession.getPublicStatus(), + activities: mostRelevantSession.activities, + client_status: mostRelevantSession.client_status, + }; + res.presences.push(presence); + } + + await Send(ws, { + op: OPCODES.Dispatch, + t: "GUILD_SYNC", + s: ws.sequence++, + d: res, + }); + + return res; +} diff --git a/src/gateway/opcodes/Heartbeat.ts b/src/gateway/opcodes/Heartbeat.ts index b9b62be3d..ab222bdf4 100644 --- a/src/gateway/opcodes/Heartbeat.ts +++ b/src/gateway/opcodes/Heartbeat.ts @@ -1,6 +1,6 @@ /* Spacebar: A FOSS re-implementation and extension of the Discord.com backend. - Copyright (C) 2023 Spacebar and Spacebar Contributors + Copyright (C) 2025 Spacebar and Spacebar Contributors This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published @@ -16,14 +16,29 @@ along with this program. If not, see . */ -import { WebSocket } from "@spacebar/gateway"; +import { OPCODES, Payload, WebSocket } from "@spacebar/gateway"; import { setHeartbeat } from "../util/Heartbeat"; import { Send } from "../util/Send"; -export async function onHeartbeat(this: WebSocket) { +interface QoSData { + seq: number | null; + qos: QoSPayload; +} + +export interface QoSPayload { + ver: number; + active: boolean; + reasons: string[]; +} + +export async function onHeartbeat(this: WebSocket, data: Payload) { // TODO: validate payload setHeartbeat(this); + if (data.op === OPCODES.SetQoS) { + this.qos = (data.d as QoSData).qos; + } + await Send(this, { op: 11, d: {} }); } diff --git a/src/gateway/opcodes/index.ts b/src/gateway/opcodes/index.ts index cba9e545b..0955fe519 100644 --- a/src/gateway/opcodes/index.ts +++ b/src/gateway/opcodes/index.ts @@ -28,6 +28,7 @@ import { onGuildSubscriptionsBulk } from "./GuildSubscriptionsBulk"; import { onStreamCreate } from "./StreamCreate"; import { onStreamDelete } from "./StreamDelete"; import { onStreamWatch } from "./StreamWatch"; +import { onGuildSync } from "./GuildSync"; export type OPCodeHandler = (this: WebSocket, data: Payload) => unknown; @@ -42,10 +43,13 @@ export default { 8: onRequestGuildMembers, // 9: Invalid Session // 10: Hello + 12: onGuildSync, // technically deprecated, bt should be less finnicky? // 13: Dm_update 14: onLazyRequest, 18: onStreamCreate, 19: onStreamDelete, 20: onStreamWatch, 37: onGuildSubscriptionsBulk, + 40: onHeartbeat, // same as 1, except with extra data + 41: () => {}, // "Update Time Spent Session ID", just tracking nonsense } as { [key: number]: OPCodeHandler }; diff --git a/src/gateway/util/WebSocket.ts b/src/gateway/util/WebSocket.ts index 09bae2d02..ea6f0701e 100644 --- a/src/gateway/util/WebSocket.ts +++ b/src/gateway/util/WebSocket.ts @@ -23,6 +23,7 @@ import { Capabilities } from "./Capabilities"; import { ZstdCompress } from "zlib"; import { ZstdDecompress } from "node:zlib"; import { Decoder, Encoder } from "@toondepauw/node-zstd"; +import { QoSPayload } from "../opcodes/Heartbeat"; export interface WebSocket extends WS { version: number; @@ -48,4 +49,5 @@ export interface WebSocket extends WS { listen_options: ListenEventOpts; capabilities?: Capabilities; large_threshold: number; + qos?: QoSPayload; }