From 0d6824d2853dd50bdb882429c3528f6e0fd8d4de Mon Sep 17 00:00:00 2001 From: Rory& Date: Thu, 28 May 2026 10:38:39 +0200 Subject: [PATCH] Process lifecycle scripts --- package-lock.json | Bin 326825 -> 327405 bytes src/api/Server.ts | 15 +++-- src/api/routes/stop.ts | 5 +- src/bundle/Server.ts | 20 +++--- src/cdn/Server.ts | 11 +++- src/gateway/Server.ts | 36 ++++++---- src/gateway/events/Close.ts | 60 +++++++++-------- src/gateway/events/Connection.ts | 41 +++++++++--- src/util/util/Database.ts | 7 +- src/util/util/ProcessLifecycle.ts | 62 ++++++++++++++++++ .../ipc/listener/RabbitMqSingleListener.ts | 28 ++++++-- .../util/ipc/listener/UnixSocketListener.ts | 43 ++++++++++-- .../util/ipc/writer/RabbitMqSingleWriter.ts | 6 +- src/util/util/ipc/writer/UnixSocketWriter.ts | 24 +++++-- src/webrtc/Server.ts | 24 ++++--- 15 files changed, 282 insertions(+), 100 deletions(-) create mode 100644 src/util/util/ProcessLifecycle.ts diff --git a/package-lock.json b/package-lock.json index 0cd867fdbe65257eae4bb44b48b77b60fec4b583..05ebdfacd4c2362b2d15c244498505fe57edfcbd 100644 GIT binary patch delta 282 zcmZ4aQ~2#);SJh+(~n3n3QLw}RO)6H>*nRBr0N!x=H+GPr7Kw}D8(61-)O+3G}*zQ zqd9=DJ%Eo9h?#(xd3yjK%gtTSGbrHJnvW0K-kks4%?=^9p8>}cRqXcm^2Vh|cpmY(RHR27zQ5*6YR zIwMyT(bDMq>c%Lt<0_ delta 49 zcmaF+S9s-5;SJh+lbK^=Cnrd9OuxIBQMq{uU;7e1Mj&PaV&?5j_*k~>+MauwrNI~g D_9PQy diff --git a/src/api/Server.ts b/src/api/Server.ts index 1d9fe2ab2..f9f1c708f 100644 --- a/src/api/Server.ts +++ b/src/api/Server.ts @@ -16,15 +16,17 @@ along with this program. If not, see . */ +import path from "node:path"; +import { Request, Response, Router } from "express"; +import morgan from "morgan"; +import { Server, ServerOptions } from "lambert-server"; +import { red } from "picocolors"; import { Config, ConnectionConfig, ConnectionLoader, Email, JSONReplacer, WebAuthn, initDatabase, initEvent, registerRoutes, getDatabase, getRevInfoOrFail } from "@spacebar/util"; import { Authentication, CORS, ImageProxy, BodyParser, ErrorHandler, initRateLimits, initTranslation } from "./middlewares"; -import { Request, Response, Router } from "express"; -import { Server, ServerOptions } from "lambert-server"; -import morgan from "morgan"; -import path from "node:path"; -import { red } from "picocolors"; import { initInstance } from "./util/handlers/Instance"; import { route } from "./util"; +import { ProcessLifecycle } from "../util/util/ProcessLifecycle"; +import { Monitoring } from "../util/monitoring/Monitoring"; const ASSETS_FOLDER = path.join(__dirname, "..", "..", "assets"); const PUBLIC_ASSETS_FOLDER = path.join(ASSETS_FOLDER, "public"); @@ -50,6 +52,8 @@ export class SpacebarServer extends Server { } async start() { + await Monitoring.init(); + Monitoring.attach(this.app); await initDatabase(); await Config.init(); await initEvent(); @@ -196,6 +200,7 @@ export class SpacebarServer extends Server { if (logRequests) console.log(red(`Warning: Request logging is enabled! This will spam your console!\nTo disable this, unset the 'LOG_REQUESTS' environment variable!`)); + await ProcessLifecycle.Ready(); return super.start(); } } diff --git a/src/api/routes/stop.ts b/src/api/routes/stop.ts index 4f8697350..b6ecd04ce 100644 --- a/src/api/routes/stop.ts +++ b/src/api/routes/stop.ts @@ -18,6 +18,7 @@ import { route } from "@spacebar/api"; import { Request, Response, Router } from "express"; +import { ProcessLifecycle } from "../../util/util/ProcessLifecycle"; const router: Router = Router({ mergeParams: true }); @@ -35,7 +36,9 @@ router.post( (req: Request, res: Response) => { console.log(`/stop was called by ${req.user_id} at ${new Date()}`); res.sendStatus(200); - process.kill(process.pid, "SIGTERM"); + ProcessLifecycle.Shutdown().catch((e) => { + console.error("Failed to shut down:", e); + }); }, ); diff --git a/src/bundle/Server.ts b/src/bundle/Server.ts index 91c3dcf42..c396d1000 100644 --- a/src/bundle/Server.ts +++ b/src/bundle/Server.ts @@ -16,21 +16,19 @@ along with this program. If not, see . */ -import morgan from "morgan"; - -process.on("unhandledRejection", console.error); -process.on("uncaughtException", console.error); - import http from "node:http"; +import fs from "node:fs"; +import cluster from "node:cluster"; +import morgan from "morgan"; +import express from "express"; +import { green, bold } from "picocolors"; import * as Api from "@spacebar/api"; import * as Gateway from "@spacebar/gateway"; import * as Webrtc from "@spacebar/webrtc"; import { CDNServer } from "@spacebar/cdn"; -import express from "express"; -import { green, bold } from "picocolors"; import { Config, initDatabase } from "@spacebar/util"; -import fs from "node:fs"; -import cluster from "node:cluster"; +import { ProcessLifecycle } from "../util/util/ProcessLifecycle"; +import { Monitoring } from "../util/monitoring/Monitoring"; const app = express(); const server = http.createServer(); @@ -48,8 +46,7 @@ const webrtc = new Webrtc.Server({ production, }); -process.on("SIGTERM", async () => { - console.log("Shutting down due to SIGTERM"); +ProcessLifecycle.eventEmitter.on("stopping", async () => { await gateway.stop(); await cdn.stop(); await api.stop(); @@ -58,6 +55,7 @@ process.on("SIGTERM", async () => { }); async function main() { + await Monitoring.init(); await initDatabase(); await Config.init(); diff --git a/src/cdn/Server.ts b/src/cdn/Server.ts index 8260407ec..8964161dc 100644 --- a/src/cdn/Server.ts +++ b/src/cdn/Server.ts @@ -16,13 +16,15 @@ along with this program. If not, see . */ +import path from "node:path"; +import morgan from "morgan"; import { Server, ServerOptions } from "lambert-server"; import { Attachment, Config, initDatabase, registerRoutes } from "@spacebar/util"; import { CORS, BodyParser } from "@spacebar/api"; -import path from "node:path"; import guildProfilesRoute from "./routes/guild-profiles"; -import morgan from "morgan"; import { storage } from "./util"; +import { ProcessLifecycle } from "../util/util/ProcessLifecycle"; +import { Monitoring } from "../util/monitoring/Monitoring"; export type CDNServerOptions = ServerOptions; @@ -34,6 +36,8 @@ export class CDNServer extends Server { } async start() { + await Monitoring.init(); + Monitoring.attach(this.app); await initDatabase(); await Config.init(); @@ -71,6 +75,7 @@ export class CDNServer extends Server { this.app.use("/guilds/:guild_id/users/:user_id/banners", guildProfilesRoute); if (process.env.LOG_ROUTES !== "false") console.log("[Server] Route /guilds/:guild_id/users/:user_id/banners registered"); + await ProcessLifecycle.Ready(); return super.start(); } @@ -89,6 +94,8 @@ export class CDNServer extends Server { } async stop() { + await ProcessLifecycle.Shutdown(); + await ProcessLifecycle.Finalize(); return super.stop(); } } diff --git a/src/gateway/Server.ts b/src/gateway/Server.ts index 632fbc529..430419752 100644 --- a/src/gateway/Server.ts +++ b/src/gateway/Server.ts @@ -16,21 +16,22 @@ along with this program. If not, see . */ -import dotenv from "dotenv"; -dotenv.config({ quiet: true }); -import { checkToken, closeDatabase, Config, initDatabase, initEvent, Rights } from "@spacebar/util"; -import ws from "ws"; -import { Connection, openConnections } from "./events/Connection"; import http from "node:http"; -import { cleanupOnStartup } from "./util"; -import { randomString } from "@spacebar/api"; import { setInterval } from "node:timers"; +import ws from "ws"; +import { checkToken, Config, initDatabase, initEvent, Rights } from "@spacebar/util"; +import { randomString } from "@spacebar/api"; // TODO: move to util +import { Connection, openConnections } from "./events/Connection"; +import { cleanupOnStartup } from "./util"; +import { ProcessLifecycle } from "../util/util/ProcessLifecycle"; +import { Monitoring } from "../util/monitoring/Monitoring"; export class Server { public ws: ws.Server; public port: number; public server: http.Server; public production: boolean; + private monitoringLoop: NodeJS.Timeout; constructor({ port, server, production }: { port: number; server?: http.Server; production?: boolean }) { this.port = port; @@ -42,7 +43,7 @@ export class Server { const eluP = [1, 5, 15].map(() => performance.eventLoopUtilization()); const cpu = [1, 5, 15].map(() => process.cpuUsage()); let sec = 0; - setInterval(() => { + const monitoringLoop = setInterval(() => { sec += 1; // for some reason this behaves differently from cpuUsage, so we need an absolute reference as "previous" const eluC = performance.eventLoopUtilization(); @@ -67,7 +68,9 @@ export class Server { res.setHeader("Set-Cookie", `__sb_sessid=${randomString(32)}; Secure; HttpOnly; SameSite=None; Path=/`); } const requestUrl = new URL(`http://${req.headers.host}${req.url}`); - if (requestUrl.pathname === "/_spacebar/gateway/admin/introspect") { + if (requestUrl.pathname === "/metrics") { + return await Monitoring.handleRawRequest(req, res); + } else if (requestUrl.pathname === "/_spacebar/gateway/admin/introspect") { if (!req.headers.authorization) { return res.writeHead(401).end("Unauthorized"); } else { @@ -150,6 +153,8 @@ export class Server { res.writeHead(200).end("Online"); }); + + ProcessLifecycle.eventEmitter.on("stopping", () => clearTimeout(monitoringLoop)); } this.server.on("upgrade", (request, socket, head) => { @@ -167,6 +172,7 @@ export class Server { } async start(): Promise { + await Monitoring.init(); await initDatabase(); await Config.init(); await initEvent(); @@ -177,14 +183,16 @@ export class Server { this.server.listen(this.port); console.log(`[Gateway] online on 0.0.0.0:${this.port}`); } + + await ProcessLifecycle.Ready(); } async stop() { + await ProcessLifecycle.Shutdown(); + clearInterval(this.monitoringLoop); this.ws.clients.forEach((x) => x.close()); - this.ws.close(() => { - this.server.close(() => { - closeDatabase(); - }); - }); + this.ws.close(); + this.server.close(); + await ProcessLifecycle.Finalize(); } } diff --git a/src/gateway/events/Close.ts b/src/gateway/events/Close.ts index 594149057..6a3c6cb32 100644 --- a/src/gateway/events/Close.ts +++ b/src/gateway/events/Close.ts @@ -19,6 +19,7 @@ import { WebSocket } from "@spacebar/gateway"; import { emitEvent, Member, PresenceUpdateEvent, Session, SessionsReplace, User, VoiceState, VoiceStateUpdateEvent, distributePresenceUpdate } from "@spacebar/util"; import { randomString } from "@spacebar/api"; +import { ProcessLifecycle } from "../../util/util/ProcessLifecycle"; export async function Close(this: WebSocket, code: number, reason: Buffer) { console.log("[WebSocket] closed", code, reason.toString()); @@ -32,36 +33,37 @@ export async function Close(this: WebSocket, code: number, reason: Buffer) { const authSessionId = this.session?.session_id; const closedAt = Date.now(); - setTimeout(async () => { - console.log("Handling presence update after disconnect"); - try { - if (authSessionId && this.user_id) { - const s = await Session.findOne({ - where: { user_id: this.user_id, session_id: authSessionId }, - }); - if (s && (s.last_seen?.getTime() ?? 0) <= closedAt) { - console.log("... updating session"); - await Session.update({ user_id: this.user_id, session_id: authSessionId }, { status: "offline", activities: [], client_status: {} }); - this.session = await Session.findOneOrFail({ where: { session_id: this.session_id } }); - console.log("... distributing PRESENCE_UPDATE"); - await distributePresenceUpdate(this.user_id, { - event: "PRESENCE_UPDATE", - data: { - user: (await User.findOneOrFail({ where: { id: this.user_id } })).toPublicUser(), - status: this.session!.getPublicStatus(), - client_status: this.session!.client_status, - activities: this.session!.activities, - }, - origin: "GATEWAY_CLOSE", - transaction_id: `IDENT_${this.user_id}_${randomString()}`, - } satisfies PresenceUpdateEvent); - console.log("... done!"); - } else console.log("... Discarding presence update as the session reactivated"); + if (!(ProcessLifecycle.state === "stopping" || ProcessLifecycle.state === "stopped")) + setTimeout(async () => { + console.log("Handling presence update after disconnect"); + try { + if (authSessionId && this.user_id) { + const s = await Session.findOne({ + where: { user_id: this.user_id, session_id: authSessionId }, + }); + if (s && (s.last_seen?.getTime() ?? 0) <= closedAt) { + console.log("... updating session"); + await Session.update({ user_id: this.user_id, session_id: authSessionId }, { status: "offline", activities: [], client_status: {} }); + this.session = await Session.findOneOrFail({ where: { session_id: this.session_id } }); + console.log("... distributing PRESENCE_UPDATE"); + await distributePresenceUpdate(this.user_id, { + event: "PRESENCE_UPDATE", + data: { + user: (await User.findOneOrFail({ where: { id: this.user_id } })).toPublicUser(), + status: this.session!.getPublicStatus(), + client_status: this.session!.client_status, + activities: this.session!.activities, + }, + origin: "GATEWAY_CLOSE", + transaction_id: `IDENT_${this.user_id}_${randomString()}`, + } satisfies PresenceUpdateEvent); + console.log("... done!"); + } else console.log("... Discarding presence update as the session reactivated"); + } + } catch (e) { + console.error("[WebSocket] Close session cleanup failed", code, e); } - } catch (e) { - console.error("[WebSocket] Close session cleanup failed", code, e); - } - }, 10_000); + }, 10_000); const voiceState = await VoiceState.findOne({ where: { user_id: this.user_id }, diff --git a/src/gateway/events/Connection.ts b/src/gateway/events/Connection.ts index 6f67c0c59..8daa5998f 100644 --- a/src/gateway/events/Connection.ts +++ b/src/gateway/events/Connection.ts @@ -29,6 +29,9 @@ import { Deflate, Inflate } from "fast-zlib"; import { URL } from "node:url"; import { Config } from "@spacebar/util"; import { Decoder, Encoder } from "@toondepauw/node-zstd"; +import { ProcessLifecycle } from "../../util/util/ProcessLifecycle"; +import { Monitoring } from "../../util/monitoring/Monitoring"; +import { Gauge } from "prom-client"; // TODO: check rate limit // TODO: specify rate limit in config @@ -36,23 +39,43 @@ import { Decoder, Encoder } from "@toondepauw/node-zstd"; export const openConnections: WebSocket[] = []; +const openConnectionCount = Monitoring.attachMetric( + "spacebar_gateway_open_connection_count", + new Gauge({ + name: "spacebar_gateway_open_connection_count", + help: "The total number of HTTP requests received", + }), +); + export async function Connection(this: WS.Server, socket: WebSocket, request: IncomingMessage) { openConnections.push(socket); + openConnectionCount.set(openConnections.length); socket.on("close", () => { const index = openConnections.indexOf(socket); if (index !== -1) openConnections.splice(index, 1); + openConnectionCount.set(openConnections.length); }); - for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) { - process.on(sig, async () => { - await Send(socket, { - op: OPCODES.Reconnect, - s: socket.sequence++, - d: Math.round(Math.random() * 5000), - }); - socket.close(1000); + const onShutdown = async () => { + await Send(socket, { + op: OPCODES.Reconnect, + s: socket.sequence++, + d: Math.round(Math.random() * 5000), }); - } + + const closeListeners = socket.listeners("close"); + for (const listener of closeListeners) { + socket.off("close", listener); + // noinspection JSVoidFunctionReturnValueUsed - awaiting results + const res = listener.call(socket, 1000, 0) as void | Promise; + if (res) await res; + } + + socket.close(1000); + }; + + if (ProcessLifecycle.state == "stopping" || ProcessLifecycle.state == "stopped") return await onShutdown(); + ProcessLifecycle.eventEmitter.on("stopping", onShutdown); const forwardedFor = Config.get().security.forwardedFor; const ipAddress = forwardedFor ? (request.headers[forwardedFor.toLowerCase()] as string) : request.socket.remoteAddress; diff --git a/src/util/util/Database.ts b/src/util/util/Database.ts index 004fd3f5e..ecbb7110c 100644 --- a/src/util/util/Database.ts +++ b/src/util/util/Database.ts @@ -23,6 +23,7 @@ import { DataSource } from "typeorm"; // noinspection ES6PreferShortImport import { ConfigEntity } from "../entities/Config"; import fs from "node:fs"; +import { ProcessLifecycle } from "./ProcessLifecycle"; // UUID extension option is only supported with postgres // We want to generate all id's with Snowflakes that's why we have our own BaseEntity class @@ -127,11 +128,13 @@ export async function initDatabase(): Promise { } } - console.log(`[Database] ${green("Connected")}`); + ProcessLifecycle.eventEmitter.on("stopped", async () => await closeDatabase()); + console.log(`[Database] ${green("Connected")}`); return dbConnection; } export async function closeDatabase() { - await dbConnection?.destroy(); + if (DataSourceOptions.isInitialized) await DataSourceOptions.destroy(); + if (dbConnection?.isInitialized) await dbConnection?.destroy(); } diff --git a/src/util/util/ProcessLifecycle.ts b/src/util/util/ProcessLifecycle.ts new file mode 100644 index 000000000..6b4a6d778 --- /dev/null +++ b/src/util/util/ProcessLifecycle.ts @@ -0,0 +1,62 @@ +/* + Spacebar: A FOSS re-implementation and extension of the Discord.com backend. + Copyright (C) 2026 Spacebar and Spacebar Contributors + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + +import EventEmitter from "node:events"; +import whyIsNodeRunning from "why-is-node-running"; + +interface ProcessLifecycleEvents { + starting: unknown[]; + running: unknown[]; + stopping: unknown[]; + stopped: unknown[]; +} + +export class ProcessLifecycle { + static state: keyof ProcessLifecycleEvents = "starting"; + static eventEmitter: EventEmitter = new EventEmitter(); + + // to be ran after startup is finished + static async Ready() { + await this.emitAsync((this.state = "running")); + } + + // to be ran at the start of shutdown + static async Shutdown() { + await this.emitAsync((this.state = "stopping")); + } + + // to be ran at the end of shutdown (clean up sockets, ...) + static async Finalize() { + await this.emitAsync((this.state = "stopped")); + } + + // emit, except it awaits promises + private static async emitAsync(eventName: keyof ProcessLifecycleEvents) { + for (const evt of this.eventEmitter.listeners(eventName)) { + // noinspection JSVoidFunctionReturnValueUsed - we want to handle async functions blocking aswell + const res = evt() as void | Promise; + if (res) await res; + } + } +} + +process.on("SIGUSR1", () => { + console.log("Handling SIGUSR1:"); + whyIsNodeRunning(); + console.log("\nProcess state:", ProcessLifecycle.state); +}); diff --git a/src/util/util/ipc/listener/RabbitMqSingleListener.ts b/src/util/util/ipc/listener/RabbitMqSingleListener.ts index d120e70ac..7e5867acc 100644 --- a/src/util/util/ipc/listener/RabbitMqSingleListener.ts +++ b/src/util/util/ipc/listener/RabbitMqSingleListener.ts @@ -17,21 +17,36 @@ */ import EventEmitter from "node:events"; -import { BaseEventListener } from "./BaseEventListener"; -import { EVENT, Event, EventOpts, sleep } from "@spacebar/util"; -import amqp, { Channel, ChannelModel } from "amqplib"; import { randomUUID } from "node:crypto"; +import { BaseEventListener } from "./BaseEventListener"; +import { arraySum, EVENT, Event, EventOpts, sleep } from "@spacebar/util"; +import amqp, { Channel, ChannelModel } from "amqplib"; +import { ProcessLifecycle } from "../../ProcessLifecycle"; +import { Monitoring } from "../../../monitoring/Monitoring"; +import { Gauge } from "prom-client"; export class RabbitMqSingleListener extends BaseEventListener { + static openListenersMetric: Gauge; private readonly host: string; private connection?: ChannelModel; private channel?: Channel; eventEmitter: EventEmitter; + openListenersMetric: Gauge.Internal; constructor(host: string) { super(); this.eventEmitter = new EventEmitter(); this.host = host; + + RabbitMqSingleListener.openListenersMetric = Monitoring.attachMetric( + "spacebar_ipc_unix_listener_open_listener_count", + new Gauge({ + name: "spacebar_ipc_rabbitmqsingle_listener_open_listener_count", + help: "Amount of open listeners on unix socket", + labelNames: ["host"], + }), + ); + this.openListenersMetric = RabbitMqSingleListener.openListenersMetric.labels({ host }); } async init() { @@ -50,9 +65,7 @@ export class RabbitMqSingleListener extends BaseEventListener { } this.channel = await this.connection.createChannel(); - for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) { - process.on(sig, () => this.close()); - } + ProcessLifecycle.eventEmitter.on("stopped", async () => await this.close()); this.connection.on("error", (err) => { console.error("[RabbitMQSingleListener] Connection error:", err); @@ -94,6 +107,7 @@ export class RabbitMqSingleListener extends BaseEventListener { this.channel = undefined; await this.connection?.close(); this.connection = undefined; + RabbitMqSingleListener.openListenersMetric.remove({ host: this.host }); } async listen(event: string, callback: (event: EventOpts) => unknown): Promise<() => Promise> { @@ -105,10 +119,12 @@ export class RabbitMqSingleListener extends BaseEventListener { }; this.eventEmitter.addListener(event, listener); + this.openListenersMetric.set(arraySum(this.eventEmitter.eventNames().map((e) => this.eventEmitter.listeners(e).length))); const cancel = async () => { this.eventEmitter.removeListener(event, listener); this.eventEmitter.setMaxListeners(this.eventEmitter.getMaxListeners() - 1); + this.openListenersMetric.set(arraySum(this.eventEmitter.eventNames().map((e) => this.eventEmitter.listeners(e).length))); }; this.eventEmitter.setMaxListeners(this.eventEmitter.getMaxListeners() + 1); diff --git a/src/util/util/ipc/listener/UnixSocketListener.ts b/src/util/util/ipc/listener/UnixSocketListener.ts index 9465e9cad..274b8ae12 100644 --- a/src/util/util/ipc/listener/UnixSocketListener.ts +++ b/src/util/util/ipc/listener/UnixSocketListener.ts @@ -20,18 +20,47 @@ import EventEmitter from "node:events"; import fs from "node:fs"; import net, { Server } from "node:net"; import { BaseEventListener } from "./BaseEventListener"; -import { EVENT, Event, EventOpts } from "@spacebar/util"; +import { arraySum, EVENT, Event, EventOpts } from "@spacebar/util"; +import { ProcessLifecycle } from "../../ProcessLifecycle"; +import { Gauge } from "prom-client"; +import { Monitoring } from "../../../monitoring/Monitoring"; export class UnixSocketListener extends BaseEventListener { + static openConnectionsMetric?: Gauge; + static openListenersMetric?: Gauge; + eventEmitter: EventEmitter; socketPath: string; server: Server; isInitialized = false; + openConnectionsMetric: Gauge.Internal; + openListenersMetric: Gauge.Internal; + isInitialized = false; constructor(socketPath: string) { super(); this.eventEmitter = new EventEmitter(); this.socketPath = socketPath; + + UnixSocketListener.openConnectionsMetric = Monitoring.attachMetric( + "spacebar_ipc_unix_listener_open_connection_count", + new Gauge({ + name: "spacebar_ipc_unix_listener_open_connection_count", + help: "Amount of open inbound connections on unix socket", + labelNames: ["path"], + }), + ); + this.openConnectionsMetric = UnixSocketListener.openConnectionsMetric.labels({ path: socketPath }); + + UnixSocketListener.openListenersMetric = Monitoring.attachMetric( + "spacebar_ipc_unix_listener_open_listener_count", + new Gauge({ + name: "spacebar_ipc_unix_listener_open_listener_count", + help: "Amount of open listeners on unix socket", + labelNames: ["path"], + }), + ); + this.openListenersMetric = UnixSocketListener.openListenersMetric.labels({ path: socketPath }); } async init() { @@ -49,6 +78,7 @@ export class UnixSocketListener extends BaseEventListener { this.server = net.createServer((socket) => { socket.on("connect", () => { console.log("[UnixSocketListener] Unix socket client connected, now at", this.server.connections, "connections..."); + this.openConnectionsMetric.set(this.server.connections); }); let buffer = Buffer.alloc(0); socket.on("data", (data: Buffer) => { @@ -71,16 +101,15 @@ export class UnixSocketListener extends BaseEventListener { }); socket.on("close", () => { console.log("[UnixSocketListener] Unix socket client disconnected"); + this.openConnectionsMetric.set(this.server.connections ?? 0); }); }); this.server.listen(this.socketPath, () => { - console.log(`[UnixSocketListener] Listening on ${this.socketPath}`); + console.log(`[UnixSocketListener] listening on ${this.socketPath}`); }); - for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) { - process.on(sig, () => this.close()); - } + ProcessLifecycle.eventEmitter.on("stopped", async () => await this.close()); this.isInitialized = true; } @@ -91,11 +120,13 @@ export class UnixSocketListener extends BaseEventListener { console.log("[UnixSocketListener] Closing unix socket server"); this.server.close(); + UnixSocketListener.openConnectionsMetric?.remove({ path: this.socketPath }); // clean up socket file try { fs.unlinkSync(this.socketPath); } catch (e) { + if (e instanceof Error && "errno" in e && e.errno == -2) return; console.error("[UnixSocketListener] Failed to unlink socket file:", e); } } @@ -109,10 +140,12 @@ export class UnixSocketListener extends BaseEventListener { }; this.eventEmitter.addListener(event, listener); + this.openListenersMetric.set(arraySum(this.eventEmitter.eventNames().map((e) => this.eventEmitter.listeners(e).length))); const cancel = async () => { this.eventEmitter.removeListener(event, listener); this.eventEmitter.setMaxListeners(this.eventEmitter.getMaxListeners() - 1); + this.openListenersMetric.set(arraySum(this.eventEmitter.eventNames().map((e) => this.eventEmitter.listeners(e).length))); }; this.eventEmitter.setMaxListeners(this.eventEmitter.getMaxListeners() + 1); diff --git a/src/util/util/ipc/writer/RabbitMqSingleWriter.ts b/src/util/util/ipc/writer/RabbitMqSingleWriter.ts index 72a0d08ce..ad00aa2b5 100644 --- a/src/util/util/ipc/writer/RabbitMqSingleWriter.ts +++ b/src/util/util/ipc/writer/RabbitMqSingleWriter.ts @@ -19,6 +19,7 @@ import { BaseEventWriter } from "./BaseEventWriter"; import amqp, { Channel, ChannelModel } from "amqplib"; import { Event, sleep } from "@spacebar/util"; +import { ProcessLifecycle } from "../../ProcessLifecycle"; export class RabbitMqSingleWriter extends BaseEventWriter { private readonly host: string; @@ -46,10 +47,7 @@ export class RabbitMqSingleWriter extends BaseEventWriter { } this.channel = await this.connection.createChannel(); - for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) { - process.on(sig, () => this.close()); - } - + ProcessLifecycle.eventEmitter.on("stopped", async () => await this.close()); this.connection.on("error", (err) => { console.error("[RabbitMQSingleWriter] Connection error:", err); }); diff --git a/src/util/util/ipc/writer/UnixSocketWriter.ts b/src/util/util/ipc/writer/UnixSocketWriter.ts index e3b91de8f..b50f53dcb 100644 --- a/src/util/util/ipc/writer/UnixSocketWriter.ts +++ b/src/util/util/ipc/writer/UnixSocketWriter.ts @@ -22,8 +22,13 @@ import path from "node:path"; import { red } from "picocolors"; import { BaseEventWriter } from "./BaseEventWriter"; import { Event, Stopwatch } from "@spacebar/util"; +import { ProcessLifecycle } from "../../ProcessLifecycle"; +import { Monitoring } from "../../../monitoring/Monitoring"; +import { Gauge } from "prom-client"; export class UnixSocketWriter extends BaseEventWriter { + private static openConnectionsMetric: Gauge; + socketPath: string; clients: { [key: string]: Socket } = {}; watcher?: FSWatcher; @@ -31,10 +36,21 @@ export class UnixSocketWriter extends BaseEventWriter { broadcastLock: Promise = Promise.resolve(); replayLock: Promise = Promise.resolve(); isInitializing = true; + openConnectionsMetric: Gauge.Internal; constructor(socketPath: string) { super(); this.socketPath = socketPath; + + UnixSocketWriter.openConnectionsMetric = Monitoring.attachMetric( + "spacebar_ipc_unix_writer_open_connection_count", + new Gauge({ + name: "spacebar_ipc_unix_writer_open_connection_count", + help: "Amount of open outbound connections on unix socket", + labelNames: ["path"], + }), + ); + this.openConnectionsMetric = UnixSocketWriter.openConnectionsMetric.labels({ path: socketPath }); } async init() { @@ -79,6 +95,7 @@ export class UnixSocketWriter extends BaseEventWriter { try { this.clients[fullPath] = net.createConnection(fullPath, () => { console.log("[UnixSocketWriter] Unix socket client connected to", fullPath); + this.openConnectionsMetric.set(Object.entries(this.clients).length); }); this.clients[fullPath].on("error", (err) => { @@ -93,6 +110,7 @@ export class UnixSocketWriter extends BaseEventWriter { this.clients[fullPath].on("close", () => { console.log("[UnixSocketWriter] Unix socket client closed:", fullPath); delete this.clients[fullPath]; + this.openConnectionsMetric.set(Object.entries(this.clients).length); }); } catch (e) { console.error("[UnixSocketWriter] Failed to create connection to", fullPath, ":", e); @@ -142,10 +160,7 @@ export class UnixSocketWriter extends BaseEventWriter { console.error("[UnixSocketWriter] Unix socket writer failed to read directory:", err); } - for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) { - process.on(sig, () => this.close()); - } - + ProcessLifecycle.eventEmitter.on("stopped", async () => await this.close()); this.isInitializing = false; } @@ -226,6 +241,7 @@ export class UnixSocketWriter extends BaseEventWriter { } } this.clients = {}; + UnixSocketWriter.openConnectionsMetric.remove({ path: this.socketPath }); } } diff --git a/src/webrtc/Server.ts b/src/webrtc/Server.ts index 0f8ead2ce..f3e8e2c26 100644 --- a/src/webrtc/Server.ts +++ b/src/webrtc/Server.ts @@ -15,14 +15,15 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ -import dotenv from "dotenv"; -dotenv.config({ quiet: true }); -import { closeDatabase, Config, initDatabase, initEvent, Session, TimeSpan } from "@spacebar/util"; + import http from "node:http"; import ws from "ws"; +import { green, yellow } from "picocolors"; +import { Config, initDatabase, initEvent } from "@spacebar/util"; import { Connection } from "./events/Connection"; import { loadWebRtcLibrary, mediaServer, WRTC_PORT_MAX, WRTC_PORT_MIN, WRTC_PUBLIC_IP } from "./util"; -import { green, yellow } from "picocolors"; +import { ProcessLifecycle } from "../util/util/ProcessLifecycle"; +import { Monitoring } from "../util/monitoring/Monitoring"; export class Server { public ws: ws.Server; @@ -36,8 +37,11 @@ export class Server { if (server) this.server = server; else { - this.server = http.createServer(function (req, res) { - res.writeHead(200).end("Online"); + this.server = http.createServer(async (req, res) => { + const requestUrl = new URL(`http://${req.headers.host}${req.url}`); + if (requestUrl.pathname === "/metrics") { + return await Monitoring.handleRawRequest(req, res); + } else res.writeHead(200).end("Online"); }); } @@ -59,6 +63,7 @@ export class Server { } async start(): Promise { + await Monitoring.init(); await initDatabase(); await Config.init(); await initEvent(); @@ -76,11 +81,14 @@ export class Server { this.server.listen(this.port); console.log(`[WebRTC] ${green(`online on 0.0.0.0:${this.port}`)}`); } + + await ProcessLifecycle.Ready(); } async stop() { - await closeDatabase(); + await ProcessLifecycle.Shutdown(); this.server.close(); - mediaServer?.stop(); + await mediaServer?.stop(); + await ProcessLifecycle.Finalize(); } }