From ab4a8a31ec6825f865c40baa611b26e10a0b32c8 Mon Sep 17 00:00:00 2001 From: Rory& Date: Fri, 27 Feb 2026 15:39:55 +0100 Subject: [PATCH] IPC: unix socket reinit + backlog system --- src/util/util/Event.ts | 74 ++++++++++++++++++++++++++++++------------ 1 file changed, 53 insertions(+), 21 deletions(-) diff --git a/src/util/util/Event.ts b/src/util/util/Event.ts index 08507dc76..2e6cf8206 100644 --- a/src/util/util/Event.ts +++ b/src/util/util/Event.ts @@ -333,6 +333,10 @@ class UnixSocketWriter { socketPath: string; clients: { [key: string]: Socket } = {}; watcher?: FSWatcher; + backlog: Event[] = []; + broadcastLock: Promise = Promise.resolve(); + replayLock: Promise = Promise.resolve(); + isInitializing = true; constructor(socketPath: string) { this.socketPath = socketPath; @@ -442,6 +446,8 @@ class UnixSocketWriter { } catch (err) { console.error("[Events] Unix socket writer failed to read directory:", err); } + + this.isInitializing = false; } async emit(event: Event) { @@ -450,33 +456,59 @@ class UnixSocketWriter { // check if there are any listeners const clientCount = Object.entries(this.clients).length; if (clientCount === 0) { - console.warn("[Events] Unix socket writer has no connected clients to emit to"); + console.warn("[Events] Unix socket writer has no connected clients to emit to, backlog size:", this.backlog.length + 1); + this.backlog.push(event); + if (!this.isInitializing) { + this.isInitializing = true; + console.log("[Events] Re-initializing unix socket writer due to new event with no listeners"); + await this.close(); + await this.init(); + } return; } - const tsw = Stopwatch.startNew(); - const payloadBuf = Buffer.from(JSON.stringify({ id: (event.guild_id || event.channel_id || event.user_id) as string, event })); - const lenBuf = Buffer.alloc(4); - lenBuf.writeUInt32BE(payloadBuf.length, 0); - const framed = Buffer.concat([lenBuf, payloadBuf]); + await this.replayLock; + await (this.replayLock = Promise.resolve().then(async () => { + if (this.backlog.length > 0) { + console.log(`[Events] Replaying ${this.backlog.length} backlog events`); + for (const backlogEvent of this.backlog) { + await this.broadcast(backlogEvent); + } + this.backlog = []; + } + })); - for (const [socketPath, socket] of Object.entries(this.clients)) { - if (socket.destroyed) { - console.log("[Events] Unix socket writer found destroyed socket, removing:", socketPath); - delete this.clients[socketPath]; - continue; + await this.broadcast(event); + } + + private async broadcast(event: Event) { + await this.broadcastLock; + return await (this.broadcastLock = new Promise((res) => { + const tsw = Stopwatch.startNew(); + const payloadBuf = Buffer.from(JSON.stringify({ id: (event.guild_id || event.channel_id || event.user_id) as string, event })); + const lenBuf = Buffer.alloc(4); + lenBuf.writeUInt32BE(payloadBuf.length, 0); + const framed = Buffer.concat([lenBuf, payloadBuf]); + + for (const [socketPath, socket] of Object.entries(this.clients)) { + if (socket.destroyed) { + console.log("[Events] Unix socket writer found destroyed socket, removing:", socketPath); + delete this.clients[socketPath]; + continue; + } + + try { + socket.write(framed); + } catch (e) { + console.error("[Events] Unix socket writer failed to write to socket", socketPath, ":", e); + } } - try { - socket.write(framed); - } catch (e) { - console.error("[Events] Unix socket writer failed to write to socket", socketPath, ":", e); - } - } - - if (tsw.elapsed().totalMilliseconds > 5) - // else it's too noisy - console.log(`[Events] Unix socket writer emitted to ${Object.entries(this.clients).length} sockets in ${tsw.elapsed().totalMilliseconds}ms`); + if (tsw.elapsed().totalMilliseconds > 5) + // else it's too noisy + console.log(`[Events] Unix socket writer emitted to ${Object.entries(this.clients).length} sockets in ${tsw.elapsed().totalMilliseconds}ms`); + res(); + })); } async close() {