Process lifecycle scripts

This commit is contained in:
Rory&
2026-05-28 10:38:39 +02:00
parent ac3160e833
commit 0d6824d285
15 changed files with 282 additions and 100 deletions
BIN
View File
Binary file not shown.
+10 -5
View File
@@ -16,15 +16,17 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import path from "node:path";
import { Request, Response, Router } from "express";
import morgan from "morgan";
import { Server, ServerOptions } from "lambert-server";
import { red } from "picocolors";
import { Config, ConnectionConfig, ConnectionLoader, Email, JSONReplacer, WebAuthn, initDatabase, initEvent, registerRoutes, getDatabase, getRevInfoOrFail } from "@spacebar/util";
import { Authentication, CORS, ImageProxy, BodyParser, ErrorHandler, initRateLimits, initTranslation } from "./middlewares";
import { Request, Response, Router } from "express";
import { Server, ServerOptions } from "lambert-server";
import morgan from "morgan";
import path from "node:path";
import { red } from "picocolors";
import { initInstance } from "./util/handlers/Instance";
import { route } from "./util";
import { ProcessLifecycle } from "../util/util/ProcessLifecycle";
import { Monitoring } from "../util/monitoring/Monitoring";
const ASSETS_FOLDER = path.join(__dirname, "..", "..", "assets");
const PUBLIC_ASSETS_FOLDER = path.join(ASSETS_FOLDER, "public");
@@ -50,6 +52,8 @@ export class SpacebarServer extends Server {
}
async start() {
await Monitoring.init();
Monitoring.attach(this.app);
await initDatabase();
await Config.init();
await initEvent();
@@ -196,6 +200,7 @@ export class SpacebarServer extends Server {
if (logRequests) console.log(red(`Warning: Request logging is enabled! This will spam your console!\nTo disable this, unset the 'LOG_REQUESTS' environment variable!`));
await ProcessLifecycle.Ready();
return super.start();
}
}
+4 -1
View File
@@ -18,6 +18,7 @@
import { route } from "@spacebar/api";
import { Request, Response, Router } from "express";
import { ProcessLifecycle } from "../../util/util/ProcessLifecycle";
const router: Router = Router({ mergeParams: true });
@@ -35,7 +36,9 @@ router.post(
(req: Request, res: Response) => {
console.log(`/stop was called by ${req.user_id} at ${new Date()}`);
res.sendStatus(200);
process.kill(process.pid, "SIGTERM");
ProcessLifecycle.Shutdown().catch((e) => {
console.error("Failed to shut down:", e);
});
},
);
+9 -11
View File
@@ -16,21 +16,19 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import morgan from "morgan";
process.on("unhandledRejection", console.error);
process.on("uncaughtException", console.error);
import http from "node:http";
import fs from "node:fs";
import cluster from "node:cluster";
import morgan from "morgan";
import express from "express";
import { green, bold } from "picocolors";
import * as Api from "@spacebar/api";
import * as Gateway from "@spacebar/gateway";
import * as Webrtc from "@spacebar/webrtc";
import { CDNServer } from "@spacebar/cdn";
import express from "express";
import { green, bold } from "picocolors";
import { Config, initDatabase } from "@spacebar/util";
import fs from "node:fs";
import cluster from "node:cluster";
import { ProcessLifecycle } from "../util/util/ProcessLifecycle";
import { Monitoring } from "../util/monitoring/Monitoring";
const app = express();
const server = http.createServer();
@@ -48,8 +46,7 @@ const webrtc = new Webrtc.Server({
production,
});
process.on("SIGTERM", async () => {
console.log("Shutting down due to SIGTERM");
ProcessLifecycle.eventEmitter.on("stopping", async () => {
await gateway.stop();
await cdn.stop();
await api.stop();
@@ -58,6 +55,7 @@ process.on("SIGTERM", async () => {
});
async function main() {
await Monitoring.init();
await initDatabase();
await Config.init();
+9 -2
View File
@@ -16,13 +16,15 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import path from "node:path";
import morgan from "morgan";
import { Server, ServerOptions } from "lambert-server";
import { Attachment, Config, initDatabase, registerRoutes } from "@spacebar/util";
import { CORS, BodyParser } from "@spacebar/api";
import path from "node:path";
import guildProfilesRoute from "./routes/guild-profiles";
import morgan from "morgan";
import { storage } from "./util";
import { ProcessLifecycle } from "../util/util/ProcessLifecycle";
import { Monitoring } from "../util/monitoring/Monitoring";
export type CDNServerOptions = ServerOptions;
@@ -34,6 +36,8 @@ export class CDNServer extends Server {
}
async start() {
await Monitoring.init();
Monitoring.attach(this.app);
await initDatabase();
await Config.init();
@@ -71,6 +75,7 @@ export class CDNServer extends Server {
this.app.use("/guilds/:guild_id/users/:user_id/banners", guildProfilesRoute);
if (process.env.LOG_ROUTES !== "false") console.log("[Server] Route /guilds/:guild_id/users/:user_id/banners registered");
await ProcessLifecycle.Ready();
return super.start();
}
@@ -89,6 +94,8 @@ export class CDNServer extends Server {
}
async stop() {
await ProcessLifecycle.Shutdown();
await ProcessLifecycle.Finalize();
return super.stop();
}
}
+22 -14
View File
@@ -16,21 +16,22 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import dotenv from "dotenv";
dotenv.config({ quiet: true });
import { checkToken, closeDatabase, Config, initDatabase, initEvent, Rights } from "@spacebar/util";
import ws from "ws";
import { Connection, openConnections } from "./events/Connection";
import http from "node:http";
import { cleanupOnStartup } from "./util";
import { randomString } from "@spacebar/api";
import { setInterval } from "node:timers";
import ws from "ws";
import { checkToken, Config, initDatabase, initEvent, Rights } from "@spacebar/util";
import { randomString } from "@spacebar/api"; // TODO: move to util
import { Connection, openConnections } from "./events/Connection";
import { cleanupOnStartup } from "./util";
import { ProcessLifecycle } from "../util/util/ProcessLifecycle";
import { Monitoring } from "../util/monitoring/Monitoring";
export class Server {
public ws: ws.Server;
public port: number;
public server: http.Server;
public production: boolean;
private monitoringLoop: NodeJS.Timeout;
constructor({ port, server, production }: { port: number; server?: http.Server; production?: boolean }) {
this.port = port;
@@ -42,7 +43,7 @@ export class Server {
const eluP = [1, 5, 15].map(() => performance.eventLoopUtilization());
const cpu = [1, 5, 15].map(() => process.cpuUsage());
let sec = 0;
setInterval(() => {
const monitoringLoop = setInterval(() => {
sec += 1;
// for some reason this behaves differently from cpuUsage, so we need an absolute reference as "previous"
const eluC = performance.eventLoopUtilization();
@@ -67,7 +68,9 @@ export class Server {
res.setHeader("Set-Cookie", `__sb_sessid=${randomString(32)}; Secure; HttpOnly; SameSite=None; Path=/`);
}
const requestUrl = new URL(`http://${req.headers.host}${req.url}`);
if (requestUrl.pathname === "/_spacebar/gateway/admin/introspect") {
if (requestUrl.pathname === "/metrics") {
return await Monitoring.handleRawRequest(req, res);
} else if (requestUrl.pathname === "/_spacebar/gateway/admin/introspect") {
if (!req.headers.authorization) {
return res.writeHead(401).end("Unauthorized");
} else {
@@ -150,6 +153,8 @@ export class Server {
res.writeHead(200).end("Online");
});
ProcessLifecycle.eventEmitter.on("stopping", () => clearTimeout(monitoringLoop));
}
this.server.on("upgrade", (request, socket, head) => {
@@ -167,6 +172,7 @@ export class Server {
}
async start(): Promise<void> {
await Monitoring.init();
await initDatabase();
await Config.init();
await initEvent();
@@ -177,14 +183,16 @@ export class Server {
this.server.listen(this.port);
console.log(`[Gateway] online on 0.0.0.0:${this.port}`);
}
await ProcessLifecycle.Ready();
}
async stop() {
await ProcessLifecycle.Shutdown();
clearInterval(this.monitoringLoop);
this.ws.clients.forEach((x) => x.close());
this.ws.close(() => {
this.server.close(() => {
closeDatabase();
});
});
this.ws.close();
this.server.close();
await ProcessLifecycle.Finalize();
}
}
+31 -29
View File
@@ -19,6 +19,7 @@
import { WebSocket } from "@spacebar/gateway";
import { emitEvent, Member, PresenceUpdateEvent, Session, SessionsReplace, User, VoiceState, VoiceStateUpdateEvent, distributePresenceUpdate } from "@spacebar/util";
import { randomString } from "@spacebar/api";
import { ProcessLifecycle } from "../../util/util/ProcessLifecycle";
export async function Close(this: WebSocket, code: number, reason: Buffer) {
console.log("[WebSocket] closed", code, reason.toString());
@@ -32,36 +33,37 @@ export async function Close(this: WebSocket, code: number, reason: Buffer) {
const authSessionId = this.session?.session_id;
const closedAt = Date.now();
setTimeout(async () => {
console.log("Handling presence update after disconnect");
try {
if (authSessionId && this.user_id) {
const s = await Session.findOne({
where: { user_id: this.user_id, session_id: authSessionId },
});
if (s && (s.last_seen?.getTime() ?? 0) <= closedAt) {
console.log("... updating session");
await Session.update({ user_id: this.user_id, session_id: authSessionId }, { status: "offline", activities: [], client_status: {} });
this.session = await Session.findOneOrFail({ where: { session_id: this.session_id } });
console.log("... distributing PRESENCE_UPDATE");
await distributePresenceUpdate(this.user_id, {
event: "PRESENCE_UPDATE",
data: {
user: (await User.findOneOrFail({ where: { id: this.user_id } })).toPublicUser(),
status: this.session!.getPublicStatus(),
client_status: this.session!.client_status,
activities: this.session!.activities,
},
origin: "GATEWAY_CLOSE",
transaction_id: `IDENT_${this.user_id}_${randomString()}`,
} satisfies PresenceUpdateEvent);
console.log("... done!");
} else console.log("... Discarding presence update as the session reactivated");
if (!(ProcessLifecycle.state === "stopping" || ProcessLifecycle.state === "stopped"))
setTimeout(async () => {
console.log("Handling presence update after disconnect");
try {
if (authSessionId && this.user_id) {
const s = await Session.findOne({
where: { user_id: this.user_id, session_id: authSessionId },
});
if (s && (s.last_seen?.getTime() ?? 0) <= closedAt) {
console.log("... updating session");
await Session.update({ user_id: this.user_id, session_id: authSessionId }, { status: "offline", activities: [], client_status: {} });
this.session = await Session.findOneOrFail({ where: { session_id: this.session_id } });
console.log("... distributing PRESENCE_UPDATE");
await distributePresenceUpdate(this.user_id, {
event: "PRESENCE_UPDATE",
data: {
user: (await User.findOneOrFail({ where: { id: this.user_id } })).toPublicUser(),
status: this.session!.getPublicStatus(),
client_status: this.session!.client_status,
activities: this.session!.activities,
},
origin: "GATEWAY_CLOSE",
transaction_id: `IDENT_${this.user_id}_${randomString()}`,
} satisfies PresenceUpdateEvent);
console.log("... done!");
} else console.log("... Discarding presence update as the session reactivated");
}
} catch (e) {
console.error("[WebSocket] Close session cleanup failed", code, e);
}
} catch (e) {
console.error("[WebSocket] Close session cleanup failed", code, e);
}
}, 10_000);
}, 10_000);
const voiceState = await VoiceState.findOne({
where: { user_id: this.user_id },
+32 -9
View File
@@ -29,6 +29,9 @@ import { Deflate, Inflate } from "fast-zlib";
import { URL } from "node:url";
import { Config } from "@spacebar/util";
import { Decoder, Encoder } from "@toondepauw/node-zstd";
import { ProcessLifecycle } from "../../util/util/ProcessLifecycle";
import { Monitoring } from "../../util/monitoring/Monitoring";
import { Gauge } from "prom-client";
// TODO: check rate limit
// TODO: specify rate limit in config
@@ -36,23 +39,43 @@ import { Decoder, Encoder } from "@toondepauw/node-zstd";
export const openConnections: WebSocket[] = [];
const openConnectionCount = Monitoring.attachMetric(
"spacebar_gateway_open_connection_count",
new Gauge({
name: "spacebar_gateway_open_connection_count",
help: "The total number of HTTP requests received",
}),
);
export async function Connection(this: WS.Server, socket: WebSocket, request: IncomingMessage) {
openConnections.push(socket);
openConnectionCount.set(openConnections.length);
socket.on("close", () => {
const index = openConnections.indexOf(socket);
if (index !== -1) openConnections.splice(index, 1);
openConnectionCount.set(openConnections.length);
});
for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) {
process.on(sig, async () => {
await Send(socket, {
op: OPCODES.Reconnect,
s: socket.sequence++,
d: Math.round(Math.random() * 5000),
});
socket.close(1000);
const onShutdown = async () => {
await Send(socket, {
op: OPCODES.Reconnect,
s: socket.sequence++,
d: Math.round(Math.random() * 5000),
});
}
const closeListeners = socket.listeners("close");
for (const listener of closeListeners) {
socket.off("close", listener);
// noinspection JSVoidFunctionReturnValueUsed - awaiting results
const res = listener.call(socket, 1000, 0) as void | Promise<void>;
if (res) await res;
}
socket.close(1000);
};
if (ProcessLifecycle.state == "stopping" || ProcessLifecycle.state == "stopped") return await onShutdown();
ProcessLifecycle.eventEmitter.on("stopping", onShutdown);
const forwardedFor = Config.get().security.forwardedFor;
const ipAddress = forwardedFor ? (request.headers[forwardedFor.toLowerCase()] as string) : request.socket.remoteAddress;
+5 -2
View File
@@ -23,6 +23,7 @@ import { DataSource } from "typeorm";
// noinspection ES6PreferShortImport
import { ConfigEntity } from "../entities/Config";
import fs from "node:fs";
import { ProcessLifecycle } from "./ProcessLifecycle";
// UUID extension option is only supported with postgres
// We want to generate all id's with Snowflakes that's why we have our own BaseEntity class
@@ -127,11 +128,13 @@ export async function initDatabase(): Promise<DataSource> {
}
}
console.log(`[Database] ${green("Connected")}`);
ProcessLifecycle.eventEmitter.on("stopped", async () => await closeDatabase());
console.log(`[Database] ${green("Connected")}`);
return dbConnection;
}
export async function closeDatabase() {
await dbConnection?.destroy();
if (DataSourceOptions.isInitialized) await DataSourceOptions.destroy();
if (dbConnection?.isInitialized) await dbConnection?.destroy();
}
+62
View File
@@ -0,0 +1,62 @@
/*
Spacebar: A FOSS re-implementation and extension of the Discord.com backend.
Copyright (C) 2026 Spacebar and Spacebar Contributors
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import EventEmitter from "node:events";
import whyIsNodeRunning from "why-is-node-running";
interface ProcessLifecycleEvents {
starting: unknown[];
running: unknown[];
stopping: unknown[];
stopped: unknown[];
}
export class ProcessLifecycle {
static state: keyof ProcessLifecycleEvents = "starting";
static eventEmitter: EventEmitter<ProcessLifecycleEvents> = new EventEmitter();
// to be ran after startup is finished
static async Ready() {
await this.emitAsync((this.state = "running"));
}
// to be ran at the start of shutdown
static async Shutdown() {
await this.emitAsync((this.state = "stopping"));
}
// to be ran at the end of shutdown (clean up sockets, ...)
static async Finalize() {
await this.emitAsync((this.state = "stopped"));
}
// emit, except it awaits promises
private static async emitAsync(eventName: keyof ProcessLifecycleEvents) {
for (const evt of this.eventEmitter.listeners(eventName)) {
// noinspection JSVoidFunctionReturnValueUsed - we want to handle async functions blocking aswell
const res = evt() as void | Promise<void>;
if (res) await res;
}
}
}
process.on("SIGUSR1", () => {
console.log("Handling SIGUSR1:");
whyIsNodeRunning();
console.log("\nProcess state:", ProcessLifecycle.state);
});
@@ -17,21 +17,36 @@
*/
import EventEmitter from "node:events";
import { BaseEventListener } from "./BaseEventListener";
import { EVENT, Event, EventOpts, sleep } from "@spacebar/util";
import amqp, { Channel, ChannelModel } from "amqplib";
import { randomUUID } from "node:crypto";
import { BaseEventListener } from "./BaseEventListener";
import { arraySum, EVENT, Event, EventOpts, sleep } from "@spacebar/util";
import amqp, { Channel, ChannelModel } from "amqplib";
import { ProcessLifecycle } from "../../ProcessLifecycle";
import { Monitoring } from "../../../monitoring/Monitoring";
import { Gauge } from "prom-client";
export class RabbitMqSingleListener extends BaseEventListener {
static openListenersMetric: Gauge;
private readonly host: string;
private connection?: ChannelModel;
private channel?: Channel;
eventEmitter: EventEmitter;
openListenersMetric: Gauge.Internal<string>;
constructor(host: string) {
super();
this.eventEmitter = new EventEmitter();
this.host = host;
RabbitMqSingleListener.openListenersMetric = Monitoring.attachMetric(
"spacebar_ipc_unix_listener_open_listener_count",
new Gauge({
name: "spacebar_ipc_rabbitmqsingle_listener_open_listener_count",
help: "Amount of open listeners on unix socket",
labelNames: ["host"],
}),
);
this.openListenersMetric = RabbitMqSingleListener.openListenersMetric.labels({ host });
}
async init() {
@@ -50,9 +65,7 @@ export class RabbitMqSingleListener extends BaseEventListener {
}
this.channel = await this.connection.createChannel();
for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) {
process.on(sig, () => this.close());
}
ProcessLifecycle.eventEmitter.on("stopped", async () => await this.close());
this.connection.on("error", (err) => {
console.error("[RabbitMQSingleListener] Connection error:", err);
@@ -94,6 +107,7 @@ export class RabbitMqSingleListener extends BaseEventListener {
this.channel = undefined;
await this.connection?.close();
this.connection = undefined;
RabbitMqSingleListener.openListenersMetric.remove({ host: this.host });
}
async listen(event: string, callback: (event: EventOpts) => unknown): Promise<() => Promise<void>> {
@@ -105,10 +119,12 @@ export class RabbitMqSingleListener extends BaseEventListener {
};
this.eventEmitter.addListener(event, listener);
this.openListenersMetric.set(arraySum(this.eventEmitter.eventNames().map((e) => this.eventEmitter.listeners(e).length)));
const cancel = async () => {
this.eventEmitter.removeListener(event, listener);
this.eventEmitter.setMaxListeners(this.eventEmitter.getMaxListeners() - 1);
this.openListenersMetric.set(arraySum(this.eventEmitter.eventNames().map((e) => this.eventEmitter.listeners(e).length)));
};
this.eventEmitter.setMaxListeners(this.eventEmitter.getMaxListeners() + 1);
@@ -20,18 +20,47 @@ import EventEmitter from "node:events";
import fs from "node:fs";
import net, { Server } from "node:net";
import { BaseEventListener } from "./BaseEventListener";
import { EVENT, Event, EventOpts } from "@spacebar/util";
import { arraySum, EVENT, Event, EventOpts } from "@spacebar/util";
import { ProcessLifecycle } from "../../ProcessLifecycle";
import { Gauge } from "prom-client";
import { Monitoring } from "../../../monitoring/Monitoring";
export class UnixSocketListener extends BaseEventListener {
static openConnectionsMetric?: Gauge;
static openListenersMetric?: Gauge;
eventEmitter: EventEmitter;
socketPath: string;
server: Server;
isInitialized = false;
openConnectionsMetric: Gauge.Internal<string>;
openListenersMetric: Gauge.Internal<string>;
isInitialized = false;
constructor(socketPath: string) {
super();
this.eventEmitter = new EventEmitter();
this.socketPath = socketPath;
UnixSocketListener.openConnectionsMetric = Monitoring.attachMetric(
"spacebar_ipc_unix_listener_open_connection_count",
new Gauge({
name: "spacebar_ipc_unix_listener_open_connection_count",
help: "Amount of open inbound connections on unix socket",
labelNames: ["path"],
}),
);
this.openConnectionsMetric = UnixSocketListener.openConnectionsMetric.labels({ path: socketPath });
UnixSocketListener.openListenersMetric = Monitoring.attachMetric(
"spacebar_ipc_unix_listener_open_listener_count",
new Gauge({
name: "spacebar_ipc_unix_listener_open_listener_count",
help: "Amount of open listeners on unix socket",
labelNames: ["path"],
}),
);
this.openListenersMetric = UnixSocketListener.openListenersMetric.labels({ path: socketPath });
}
async init() {
@@ -49,6 +78,7 @@ export class UnixSocketListener extends BaseEventListener {
this.server = net.createServer((socket) => {
socket.on("connect", () => {
console.log("[UnixSocketListener] Unix socket client connected, now at", this.server.connections, "connections...");
this.openConnectionsMetric.set(this.server.connections);
});
let buffer = Buffer.alloc(0);
socket.on("data", (data: Buffer) => {
@@ -71,16 +101,15 @@ export class UnixSocketListener extends BaseEventListener {
});
socket.on("close", () => {
console.log("[UnixSocketListener] Unix socket client disconnected");
this.openConnectionsMetric.set(this.server.connections ?? 0);
});
});
this.server.listen(this.socketPath, () => {
console.log(`[UnixSocketListener] Listening on ${this.socketPath}`);
console.log(`[UnixSocketListener] listening on ${this.socketPath}`);
});
for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) {
process.on(sig, () => this.close());
}
ProcessLifecycle.eventEmitter.on("stopped", async () => await this.close());
this.isInitialized = true;
}
@@ -91,11 +120,13 @@ export class UnixSocketListener extends BaseEventListener {
console.log("[UnixSocketListener] Closing unix socket server");
this.server.close();
UnixSocketListener.openConnectionsMetric?.remove({ path: this.socketPath });
// clean up socket file
try {
fs.unlinkSync(this.socketPath);
} catch (e) {
if (e instanceof Error && "errno" in e && e.errno == -2) return;
console.error("[UnixSocketListener] Failed to unlink socket file:", e);
}
}
@@ -109,10 +140,12 @@ export class UnixSocketListener extends BaseEventListener {
};
this.eventEmitter.addListener(event, listener);
this.openListenersMetric.set(arraySum(this.eventEmitter.eventNames().map((e) => this.eventEmitter.listeners(e).length)));
const cancel = async () => {
this.eventEmitter.removeListener(event, listener);
this.eventEmitter.setMaxListeners(this.eventEmitter.getMaxListeners() - 1);
this.openListenersMetric.set(arraySum(this.eventEmitter.eventNames().map((e) => this.eventEmitter.listeners(e).length)));
};
this.eventEmitter.setMaxListeners(this.eventEmitter.getMaxListeners() + 1);
@@ -19,6 +19,7 @@
import { BaseEventWriter } from "./BaseEventWriter";
import amqp, { Channel, ChannelModel } from "amqplib";
import { Event, sleep } from "@spacebar/util";
import { ProcessLifecycle } from "../../ProcessLifecycle";
export class RabbitMqSingleWriter extends BaseEventWriter {
private readonly host: string;
@@ -46,10 +47,7 @@ export class RabbitMqSingleWriter extends BaseEventWriter {
}
this.channel = await this.connection.createChannel();
for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) {
process.on(sig, () => this.close());
}
ProcessLifecycle.eventEmitter.on("stopped", async () => await this.close());
this.connection.on("error", (err) => {
console.error("[RabbitMQSingleWriter] Connection error:", err);
});
+20 -4
View File
@@ -22,8 +22,13 @@ import path from "node:path";
import { red } from "picocolors";
import { BaseEventWriter } from "./BaseEventWriter";
import { Event, Stopwatch } from "@spacebar/util";
import { ProcessLifecycle } from "../../ProcessLifecycle";
import { Monitoring } from "../../../monitoring/Monitoring";
import { Gauge } from "prom-client";
export class UnixSocketWriter extends BaseEventWriter {
private static openConnectionsMetric: Gauge;
socketPath: string;
clients: { [key: string]: Socket } = {};
watcher?: FSWatcher;
@@ -31,10 +36,21 @@ export class UnixSocketWriter extends BaseEventWriter {
broadcastLock: Promise<void> = Promise.resolve();
replayLock: Promise<void> = Promise.resolve();
isInitializing = true;
openConnectionsMetric: Gauge.Internal<string>;
constructor(socketPath: string) {
super();
this.socketPath = socketPath;
UnixSocketWriter.openConnectionsMetric = Monitoring.attachMetric(
"spacebar_ipc_unix_writer_open_connection_count",
new Gauge({
name: "spacebar_ipc_unix_writer_open_connection_count",
help: "Amount of open outbound connections on unix socket",
labelNames: ["path"],
}),
);
this.openConnectionsMetric = UnixSocketWriter.openConnectionsMetric.labels({ path: socketPath });
}
async init() {
@@ -79,6 +95,7 @@ export class UnixSocketWriter extends BaseEventWriter {
try {
this.clients[fullPath] = net.createConnection(fullPath, () => {
console.log("[UnixSocketWriter] Unix socket client connected to", fullPath);
this.openConnectionsMetric.set(Object.entries(this.clients).length);
});
this.clients[fullPath].on("error", (err) => {
@@ -93,6 +110,7 @@ export class UnixSocketWriter extends BaseEventWriter {
this.clients[fullPath].on("close", () => {
console.log("[UnixSocketWriter] Unix socket client closed:", fullPath);
delete this.clients[fullPath];
this.openConnectionsMetric.set(Object.entries(this.clients).length);
});
} catch (e) {
console.error("[UnixSocketWriter] Failed to create connection to", fullPath, ":", e);
@@ -142,10 +160,7 @@ export class UnixSocketWriter extends BaseEventWriter {
console.error("[UnixSocketWriter] Unix socket writer failed to read directory:", err);
}
for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) {
process.on(sig, () => this.close());
}
ProcessLifecycle.eventEmitter.on("stopped", async () => await this.close());
this.isInitializing = false;
}
@@ -226,6 +241,7 @@ export class UnixSocketWriter extends BaseEventWriter {
}
}
this.clients = {};
UnixSocketWriter.openConnectionsMetric.remove({ path: this.socketPath });
}
}
+16 -8
View File
@@ -15,14 +15,15 @@
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import dotenv from "dotenv";
dotenv.config({ quiet: true });
import { closeDatabase, Config, initDatabase, initEvent, Session, TimeSpan } from "@spacebar/util";
import http from "node:http";
import ws from "ws";
import { green, yellow } from "picocolors";
import { Config, initDatabase, initEvent } from "@spacebar/util";
import { Connection } from "./events/Connection";
import { loadWebRtcLibrary, mediaServer, WRTC_PORT_MAX, WRTC_PORT_MIN, WRTC_PUBLIC_IP } from "./util";
import { green, yellow } from "picocolors";
import { ProcessLifecycle } from "../util/util/ProcessLifecycle";
import { Monitoring } from "../util/monitoring/Monitoring";
export class Server {
public ws: ws.Server;
@@ -36,8 +37,11 @@ export class Server {
if (server) this.server = server;
else {
this.server = http.createServer(function (req, res) {
res.writeHead(200).end("Online");
this.server = http.createServer(async (req, res) => {
const requestUrl = new URL(`http://${req.headers.host}${req.url}`);
if (requestUrl.pathname === "/metrics") {
return await Monitoring.handleRawRequest(req, res);
} else res.writeHead(200).end("Online");
});
}
@@ -59,6 +63,7 @@ export class Server {
}
async start(): Promise<void> {
await Monitoring.init();
await initDatabase();
await Config.init();
await initEvent();
@@ -76,11 +81,14 @@ export class Server {
this.server.listen(this.port);
console.log(`[WebRTC] ${green(`online on 0.0.0.0:${this.port}`)}`);
}
await ProcessLifecycle.Ready();
}
async stop() {
await closeDatabase();
await ProcessLifecycle.Shutdown();
this.server.close();
mediaServer?.stop();
await mediaServer?.stop();
await ProcessLifecycle.Finalize();
}
}