diff --git a/src/util/util/Event.ts b/src/util/util/Event.ts index e2be9b196..4ec1cb5f9 100644 --- a/src/util/util/Event.ts +++ b/src/util/util/Event.ts @@ -194,6 +194,19 @@ class UnixSocketListener { async init() { const net = await import("net"); + const fs = await import("fs"); + + // remove stale socket file if it exists + // can happen if there's a PID conflict (across containers/PID namespaces) + try { + if (fs.existsSync(this.socketPath)) { + fs.unlinkSync(this.socketPath); + console.log("[Events] Removed stale socket file:", this.socketPath); + } + } catch (e) { + console.error("[Events] Failed to remove stale socket:", e); + } + const server = net.createServer((socket) => { socket.on("connect", () => { console.log("[Events] Unix socket client connected"); @@ -229,6 +242,14 @@ class UnixSocketListener { const shutdown = () => { console.log("[Events] Closing unix socket server"); server.close(); + + // clean up socket file + try { + fs.unlinkSync(this.socketPath); + } catch (e) { + console.error("[Events] Failed to unlink socket file:", e); + } + process.exit(0); }; for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) { @@ -260,7 +281,7 @@ class UnixSocketListener { class UnixSocketWriter { socketPath: string; clients: { [key: string]: Socket } = {}; - watcher: FSWatcher; + watcher?: FSWatcher; constructor(socketPath: string) { this.socketPath = socketPath; @@ -276,51 +297,129 @@ class UnixSocketWriter { const connect = (file: string) => { const fullPath = path.join(this.socketPath, file); - this.clients[fullPath] = net.createConnection(fullPath, () => { - console.log("[Events] Unix socket client connected to", fullPath); - }); - this.clients[fullPath].on("error", (err) => { - console.error("[Events] Unix socket client error on", fullPath, ":", err); - }); + // avoid duplicate connections + if (this.clients[fullPath] && !this.clients[fullPath].destroyed) { + console.log("[Events] Unix socket client already connected to", fullPath); + return; + } + + // clean up old connection if it exists + if (this.clients[fullPath]) { + try { + this.clients[fullPath].destroy(); + } catch (e) { + // ignore + } + delete this.clients[fullPath]; + } + + // check if it's actually a socket file (not a ghost/regular file) + try { + const stats = fs.statSync(fullPath); + if (!stats.isSocket()) { + console.log("[Events] Ignoring non-socket file:", fullPath); + return; + } + } catch (e) { + console.log("[Events] Cannot stat socket file:", fullPath); + return; + } + + try { + this.clients[fullPath] = net.createConnection(fullPath, () => { + console.log("[Events] Unix socket client connected to", fullPath); + }); + + this.clients[fullPath].on("error", (err) => { + console.error("[Events] Unix socket client error on", fullPath, ":", err); + // clean up after error + if (this.clients[fullPath]) { + delete this.clients[fullPath]; + } + }); + + // handle clean socket closure + this.clients[fullPath].on("close", () => { + console.log("[Events] Unix socket client closed:", fullPath); + delete this.clients[fullPath]; + }); + } catch (e) { + console.error("[Events] Failed to create connection to", fullPath, ":", e); + delete this.clients[fullPath]; + } }; // connect to all sockets, now and in the future this.watcher = fs.watch(this.socketPath, {}, (eventType, filename) => { console.log("[Events] Unix socket writer received watch sig", eventType, filename); - connect(filename!); + if (eventType === "rename" && filename?.endsWith(".sock")) { + try { + const fullPath = path.join(this.socketPath, filename!); + if (fs.existsSync(fullPath)) { + connect(filename!); + } else { + if (this.clients[fullPath]) { + console.log("[Events] Unix socket writer detected removed socket:", fullPath); + try { + this.clients[fullPath].destroy(); + } catch (e) { + // socket may already be destroyed + } + delete this.clients[fullPath]; + } + } + } catch (e) { + // don't + } + } + }); + + this.watcher.on("error", (err) => { + console.error("[Events] Unix socket watcher error:", err); }); // connect to existing sockets if any - fs.readdir(this.socketPath, (err, files) => { - if (err) return console.error("[Events] Unix socket writer failed to read directory:", err); - + try { + const files = fs.readdirSync(this.socketPath); console.log("[Events] Unix socket writer found existing sockets:", files); files.forEach((file) => { - connect(file); + if (file.endsWith(".sock")) { + connect(file); + } }); - }); + } catch (err) { + console.error("[Events] Unix socket writer failed to read directory:", err); + } } async emit(event: Event) { if (!this.clients) throw new Error("UnixSocketWriter not initialized"); + // check if there are any listeners + const clientCount = Object.entries(this.clients).length; + if (clientCount === 0) { + console.warn("[Events] Unix socket writer has no connected clients to emit to"); + return; + } + const tsw = Stopwatch.startNew(); const payloadBuf = Buffer.from(JSON.stringify({ id: (event.guild_id || event.channel_id || event.user_id) as string, event })); const lenBuf = Buffer.alloc(4); lenBuf.writeUInt32BE(payloadBuf.length, 0); const framed = Buffer.concat([lenBuf, payloadBuf]); - for (const socket of Object.entries(this.clients)) { - if (socket[1].destroyed) { - console.log("[Events] Unix socket writer found destroyed socket, removing:", socket[0]); - delete this.clients[socket[0]]; + + for (const [socketPath, socket] of Object.entries(this.clients)) { + if (socket.destroyed) { + console.log("[Events] Unix socket writer found destroyed socket, removing:", socketPath); + delete this.clients[socketPath]; continue; } try { - socket[1].write(framed); + socket.write(framed); } catch (e) { - console.error("[Events] Unix socket writer failed to write to socket", socket[0], ":", e); + console.error("[Events] Unix socket writer failed to write to socket", socketPath, ":", e); } } @@ -328,4 +427,22 @@ class UnixSocketWriter { // else it's too noisy console.log(`[Events] Unix socket writer emitted to ${Object.entries(this.clients).length} sockets in ${tsw.elapsed().totalMilliseconds}ms`); } + + async close() { + console.log("[Events] Closing Unix socket writer"); + + if (this.watcher) { + this.watcher.close(); + this.watcher = undefined; + } + + for (const [path, socket] of Object.entries(this.clients)) { + try { + socket.destroy(); + } catch (e) { + console.error("[Events] Error closing socket", path, ":", e); + } + } + this.clients = {}; + } }