Merge remote-tracking branch 'oh64/master'

This commit is contained in:
Rory&
2026-02-03 23:37:53 +01:00

View File

@@ -194,6 +194,19 @@ class UnixSocketListener {
async init() {
const net = await import("net");
const fs = await import("fs");
// remove stale socket file if it exists
// can happen if there's a PID conflict (across containers/PID namespaces)
try {
if (fs.existsSync(this.socketPath)) {
fs.unlinkSync(this.socketPath);
console.log("[Events] Removed stale socket file:", this.socketPath);
}
} catch (e) {
console.error("[Events] Failed to remove stale socket:", e);
}
const server = net.createServer((socket) => {
socket.on("connect", () => {
console.log("[Events] Unix socket client connected");
@@ -229,6 +242,14 @@ class UnixSocketListener {
const shutdown = () => {
console.log("[Events] Closing unix socket server");
server.close();
// clean up socket file
try {
fs.unlinkSync(this.socketPath);
} catch (e) {
console.error("[Events] Failed to unlink socket file:", e);
}
process.exit(0);
};
for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) {
@@ -260,7 +281,7 @@ class UnixSocketListener {
class UnixSocketWriter {
socketPath: string;
clients: { [key: string]: Socket } = {};
watcher: FSWatcher;
watcher?: FSWatcher;
constructor(socketPath: string) {
this.socketPath = socketPath;
@@ -276,51 +297,129 @@ class UnixSocketWriter {
const connect = (file: string) => {
const fullPath = path.join(this.socketPath, file);
this.clients[fullPath] = net.createConnection(fullPath, () => {
console.log("[Events] Unix socket client connected to", fullPath);
});
this.clients[fullPath].on("error", (err) => {
console.error("[Events] Unix socket client error on", fullPath, ":", err);
});
// avoid duplicate connections
if (this.clients[fullPath] && !this.clients[fullPath].destroyed) {
console.log("[Events] Unix socket client already connected to", fullPath);
return;
}
// clean up old connection if it exists
if (this.clients[fullPath]) {
try {
this.clients[fullPath].destroy();
} catch (e) {
// ignore
}
delete this.clients[fullPath];
}
// check if it's actually a socket file (not a ghost/regular file)
try {
const stats = fs.statSync(fullPath);
if (!stats.isSocket()) {
console.log("[Events] Ignoring non-socket file:", fullPath);
return;
}
} catch (e) {
console.log("[Events] Cannot stat socket file:", fullPath);
return;
}
try {
this.clients[fullPath] = net.createConnection(fullPath, () => {
console.log("[Events] Unix socket client connected to", fullPath);
});
this.clients[fullPath].on("error", (err) => {
console.error("[Events] Unix socket client error on", fullPath, ":", err);
// clean up after error
if (this.clients[fullPath]) {
delete this.clients[fullPath];
}
});
// handle clean socket closure
this.clients[fullPath].on("close", () => {
console.log("[Events] Unix socket client closed:", fullPath);
delete this.clients[fullPath];
});
} catch (e) {
console.error("[Events] Failed to create connection to", fullPath, ":", e);
delete this.clients[fullPath];
}
};
// connect to all sockets, now and in the future
this.watcher = fs.watch(this.socketPath, {}, (eventType, filename) => {
console.log("[Events] Unix socket writer received watch sig", eventType, filename);
connect(filename!);
if (eventType === "rename" && filename?.endsWith(".sock")) {
try {
const fullPath = path.join(this.socketPath, filename!);
if (fs.existsSync(fullPath)) {
connect(filename!);
} else {
if (this.clients[fullPath]) {
console.log("[Events] Unix socket writer detected removed socket:", fullPath);
try {
this.clients[fullPath].destroy();
} catch (e) {
// socket may already be destroyed
}
delete this.clients[fullPath];
}
}
} catch (e) {
// don't
}
}
});
this.watcher.on("error", (err) => {
console.error("[Events] Unix socket watcher error:", err);
});
// connect to existing sockets if any
fs.readdir(this.socketPath, (err, files) => {
if (err) return console.error("[Events] Unix socket writer failed to read directory:", err);
try {
const files = fs.readdirSync(this.socketPath);
console.log("[Events] Unix socket writer found existing sockets:", files);
files.forEach((file) => {
connect(file);
if (file.endsWith(".sock")) {
connect(file);
}
});
});
} catch (err) {
console.error("[Events] Unix socket writer failed to read directory:", err);
}
}
async emit(event: Event) {
if (!this.clients) throw new Error("UnixSocketWriter not initialized");
// check if there are any listeners
const clientCount = Object.entries(this.clients).length;
if (clientCount === 0) {
console.warn("[Events] Unix socket writer has no connected clients to emit to");
return;
}
const tsw = Stopwatch.startNew();
const payloadBuf = Buffer.from(JSON.stringify({ id: (event.guild_id || event.channel_id || event.user_id) as string, event }));
const lenBuf = Buffer.alloc(4);
lenBuf.writeUInt32BE(payloadBuf.length, 0);
const framed = Buffer.concat([lenBuf, payloadBuf]);
for (const socket of Object.entries(this.clients)) {
if (socket[1].destroyed) {
console.log("[Events] Unix socket writer found destroyed socket, removing:", socket[0]);
delete this.clients[socket[0]];
for (const [socketPath, socket] of Object.entries(this.clients)) {
if (socket.destroyed) {
console.log("[Events] Unix socket writer found destroyed socket, removing:", socketPath);
delete this.clients[socketPath];
continue;
}
try {
socket[1].write(framed);
socket.write(framed);
} catch (e) {
console.error("[Events] Unix socket writer failed to write to socket", socket[0], ":", e);
console.error("[Events] Unix socket writer failed to write to socket", socketPath, ":", e);
}
}
@@ -328,4 +427,22 @@ class UnixSocketWriter {
// else it's too noisy
console.log(`[Events] Unix socket writer emitted to ${Object.entries(this.clients).length} sockets in ${tsw.elapsed().totalMilliseconds}ms`);
}
async close() {
console.log("[Events] Closing Unix socket writer");
if (this.watcher) {
this.watcher.close();
this.watcher = undefined;
}
for (const [path, socket] of Object.entries(this.clients)) {
try {
socket.destroy();
} catch (e) {
console.error("[Events] Error closing socket", path, ":", e);
}
}
this.clients = {};
}
}