IPC: unix socket reinit + backlog system

This commit is contained in:
Rory&
2026-02-27 15:39:55 +01:00
parent 98373234c4
commit ab4a8a31ec
+53 -21
View File
@@ -333,6 +333,10 @@ class UnixSocketWriter {
socketPath: string;
clients: { [key: string]: Socket } = {};
watcher?: FSWatcher;
backlog: Event[] = [];
broadcastLock: Promise<void> = Promise.resolve();
replayLock: Promise<void> = Promise.resolve();
isInitializing = true;
constructor(socketPath: string) {
this.socketPath = socketPath;
@@ -442,6 +446,8 @@ class UnixSocketWriter {
} catch (err) {
console.error("[Events] Unix socket writer failed to read directory:", err);
}
this.isInitializing = false;
}
async emit(event: Event) {
@@ -450,33 +456,59 @@ class UnixSocketWriter {
// check if there are any listeners
const clientCount = Object.entries(this.clients).length;
if (clientCount === 0) {
console.warn("[Events] Unix socket writer has no connected clients to emit to");
console.warn("[Events] Unix socket writer has no connected clients to emit to, backlog size:", this.backlog.length + 1);
this.backlog.push(event);
if (!this.isInitializing) {
this.isInitializing = true;
console.log("[Events] Re-initializing unix socket writer due to new event with no listeners");
await this.close();
await this.init();
}
return;
}
const tsw = Stopwatch.startNew();
const payloadBuf = Buffer.from(JSON.stringify({ id: (event.guild_id || event.channel_id || event.user_id) as string, event }));
const lenBuf = Buffer.alloc(4);
lenBuf.writeUInt32BE(payloadBuf.length, 0);
const framed = Buffer.concat([lenBuf, payloadBuf]);
await this.replayLock;
await (this.replayLock = Promise.resolve().then(async () => {
if (this.backlog.length > 0) {
console.log(`[Events] Replaying ${this.backlog.length} backlog events`);
for (const backlogEvent of this.backlog) {
await this.broadcast(backlogEvent);
}
this.backlog = [];
}
}));
for (const [socketPath, socket] of Object.entries(this.clients)) {
if (socket.destroyed) {
console.log("[Events] Unix socket writer found destroyed socket, removing:", socketPath);
delete this.clients[socketPath];
continue;
await this.broadcast(event);
}
private async broadcast(event: Event) {
await this.broadcastLock;
return await (this.broadcastLock = new Promise((res) => {
const tsw = Stopwatch.startNew();
const payloadBuf = Buffer.from(JSON.stringify({ id: (event.guild_id || event.channel_id || event.user_id) as string, event }));
const lenBuf = Buffer.alloc(4);
lenBuf.writeUInt32BE(payloadBuf.length, 0);
const framed = Buffer.concat([lenBuf, payloadBuf]);
for (const [socketPath, socket] of Object.entries(this.clients)) {
if (socket.destroyed) {
console.log("[Events] Unix socket writer found destroyed socket, removing:", socketPath);
delete this.clients[socketPath];
continue;
}
try {
socket.write(framed);
} catch (e) {
console.error("[Events] Unix socket writer failed to write to socket", socketPath, ":", e);
}
}
try {
socket.write(framed);
} catch (e) {
console.error("[Events] Unix socket writer failed to write to socket", socketPath, ":", e);
}
}
if (tsw.elapsed().totalMilliseconds > 5)
// else it's too noisy
console.log(`[Events] Unix socket writer emitted to ${Object.entries(this.clients).length} sockets in ${tsw.elapsed().totalMilliseconds}ms`);
if (tsw.elapsed().totalMilliseconds > 5)
// else it's too noisy
console.log(`[Events] Unix socket writer emitted to ${Object.entries(this.clients).length} sockets in ${tsw.elapsed().totalMilliseconds}ms`);
res();
}));
}
async close() {