From 0eea04bd690faebb914bfec2b192ecc5ce718baa Mon Sep 17 00:00:00 2001 From: David Teller Date: Wed, 8 Jun 2022 11:49:43 +0200 Subject: [PATCH] A background queue for kicking (#296) A background queue for kicking --- config/default.yaml | 7 + src/Mjolnir.ts | 5 + src/commands/KickCommand.ts | 8 +- src/config.ts | 6 + src/queues/ThrottlingQueue.ts | 201 ++++++++++++++++++++++++++ test/integration/throttleQueueTest.ts | 84 +++++++++++ 6 files changed, 308 insertions(+), 3 deletions(-) create mode 100644 src/queues/ThrottlingQueue.ts create mode 100644 test/integration/throttleQueueTest.ts diff --git a/config/default.yaml b/config/default.yaml index bac872b..dcaae67 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -116,6 +116,13 @@ protectedRooms: # Explicitly add these rooms as a protected room list if you want them protected. protectAllJoinedRooms: false +# Increase this delay to have Mjölnir wait longer between two consecutive backgrounded +# operations. The total duration of operations will be longer, but the homeserver won't +# be affected as much. Conversely, decrease this delay to have Mjölnir chain operations +# faster. The total duration of operations will generally be shorter, but the performance +# of the homeserver may be more impacted. +backgroundDelayMS: 500 + # Server administration commands, these commands will only work if Mjolnir is # a global server administrator, and the bot's server is a Synapse instance. admin: diff --git a/src/Mjolnir.ts b/src/Mjolnir.ts index 29f0b8c..ce3dd23 100644 --- a/src/Mjolnir.ts +++ b/src/Mjolnir.ts @@ -48,6 +48,7 @@ import { replaceRoomIdsWithPills } from "./utils"; import RuleServer from "./models/RuleServer"; import { RoomMemberManager } from "./RoomMembers"; import { ProtectedRoomActivityTracker } from "./queues/ProtectedRoomActivityTracker"; +import { ThrottlingQueue } from "./queues/ThrottlingQueue"; const levelToFn = { [LogLevel.DEBUG.toString()]: LogService.debug, @@ -95,6 +96,8 @@ export class Mjolnir { private unprotectedWatchedListRooms: string[] = []; private webapis: WebAPIs; private protectedRoomActivityTracker: ProtectedRoomActivityTracker; + public taskQueue: ThrottlingQueue; + /** * Adds a listener to the client that will automatically accept invitations. * @param {MatrixClient} client @@ -258,6 +261,8 @@ export class Mjolnir { // Setup join/leave listener this.roomJoins = new RoomMemberManager(this.client); + + this.taskQueue = new ThrottlingQueue(this, config.backgroundDelayMS); } public get lists(): BanList[] { diff --git a/src/commands/KickCommand.ts b/src/commands/KickCommand.ts index d0fca6a..deec02c 100644 --- a/src/commands/KickCommand.ts +++ b/src/commands/KickCommand.ts @@ -23,7 +23,7 @@ export async function execKickCommand(roomId: string, event: any, mjolnir: Mjoln const userId = parts[2]; let rooms = [...Object.keys(mjolnir.protectedRooms)]; - let reason; + let reason: string | undefined; if (parts.length > 3) { let reasonIndex = 3; if (parts[3].startsWith("#") || parts[3].startsWith("!")) { @@ -40,11 +40,13 @@ export async function execKickCommand(roomId: string, event: any, mjolnir: Mjoln await mjolnir.logMessage(LogLevel.INFO, "KickCommand", `Kicking ${userId} in ${targetRoomId} for ${reason}`, targetRoomId); if (!config.noop) { - await mjolnir.client.kickUser(userId, targetRoomId, reason); + await mjolnir.taskQueue.push(async () => { + return mjolnir.client.kickUser(userId, targetRoomId, reason); + }); } else { await mjolnir.logMessage(LogLevel.WARN, "KickCommand", `Tried to kick ${userId} in ${targetRoomId} but the bot is running in no-op mode.`, targetRoomId); } } - await mjolnir.client.unstableApis.addReactionToEvent(roomId, event['event_id'], '✅'); + return mjolnir.client.unstableApis.addReactionToEvent(roomId, event['event_id'], '✅'); } diff --git a/src/config.ts b/src/config.ts index 9a60560..dcaffb7 100644 --- a/src/config.ts +++ b/src/config.ts @@ -48,6 +48,11 @@ interface IConfig { fasterMembershipChecks: boolean; automaticallyRedactForReasons: string[]; // case-insensitive globs protectAllJoinedRooms: boolean; + /** + * Backgrounded tasks: number of milliseconds to wait between the completion + * of one background task and the start of the next one. + */ + backgroundDelayMS: number; admin?: { enableMakeRoomAdminCommand?: boolean; } @@ -116,6 +121,7 @@ const defaultConfig: IConfig = { fasterMembershipChecks: false, automaticallyRedactForReasons: ["spam", "advertising"], protectAllJoinedRooms: false, + backgroundDelayMS: 500, commands: { allowNoPrefix: false, additionalPrefixes: [], diff --git a/src/queues/ThrottlingQueue.ts b/src/queues/ThrottlingQueue.ts new file mode 100644 index 0000000..b63c9c8 --- /dev/null +++ b/src/queues/ThrottlingQueue.ts @@ -0,0 +1,201 @@ +/* +Copyright 2022 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import { extractRequestError, LogLevel } from "matrix-bot-sdk"; +import { Mjolnir } from "../Mjolnir"; + +export type Task = (queue: ThrottlingQueue) => Promise; + +/** + * A queue for backgrounding tasks without hammering servers too much. + */ +export class ThrottlingQueue { + /** + * The pending tasks. + */ + private _tasks: (() => Promise)[] | null; + + /** + * A timeout for the next task to execute. + */ + private timeout: ReturnType | null; + + /** + * How long we should wait between the completion of a tasks and the start of the next task. + * Any >=0 number is good. + */ + private _delayMS: number; + + /** + * Construct an empty queue. + * + * This queue will start executing whenever `push()` is called and stop + * whenever it runs out of tasks to execute. + * + * @param delayMS The default delay between executing two tasks, in ms. + */ + constructor(private mjolnir: Mjolnir, delayMS: number) { + this.timeout = null; + this.delayMS = delayMS; + this._tasks = []; + } + + /** + * Stop the queue, make sure we can never use it again. + */ + public dispose() { + this.stop(); + this._tasks = null; + } + + /** + * The number of tasks waiting to be executed. + */ + get length(): number { + return this.tasks.length; + } + + /** + * Push a new task onto the queue. + * + * @param task Some code to execute. + * @return A promise resolved/rejected once `task` is complete. + */ + public push(task: Task): Promise { + // Wrap `task` into a `Promise` to inform enqueuer when + // the task is complete. + return new Promise((resolve, reject) => { + const wrapper = async () => { + try { + const result: T = await task(this); + resolve(result); + } catch (ex) { + reject(ex); + }; + }; + this.tasks.push(wrapper); + this.start(); + }); + } + + /** + * Block a queue for a number of milliseconds. + * + * This method is meant typically to be used by a `Task` that receives a 429 (Too Many Requests) to reschedule + * itself for later, after giving the server a little room to breathe. If you need this, do not forget to + * re-`push()` with the failing `Task`. You may call `block()` and `push()` in either order. + * + * @param durationMS A number of milliseconds to wait until resuming operations. + */ + public block(durationMS: number) { + if (!this.tasks) { + throw new TypeError("Cannot `block()` on a ThrottlingQueue that has already been disposed of."); + } + this.stop(); + this.timeout = setTimeout(async () => this.step(), durationMS); + } + + /** + * Start the loop to execute pending tasks. + * + * Does nothing if the loop is already started. + */ + private start() { + if (this.timeout) { + // Already started. + return; + } + if (!this.tasks.length) { + // Nothing to do. + return; + } + this.timeout = setTimeout(async () => this.step(), this._delayMS); + } + + /** + * Stop the loop to execute pending tasks. + * + * Does nothing if the loop is already stopped. A loop stopped with `stop()` may be + * resumed by calling `push()` or `start()`. + */ + private stop() { + if (!this.timeout) { + // Already stopped. + return; + } + clearTimeout(this.timeout); + this.timeout = null; + } + + /** + * Change the delay between completion of an event and the start of the next event. + * + * This will be used next time a task is completed. + */ + set delayMS(delayMS: number) { + if (delayMS < 0) { + throw new TypeError(`Invalid delay ${delayMS}. Need a non-negative number of ms.`); + } + this._delayMS = delayMS; + } + + /** + * Return the delay between completion of an event and the start of the next event. + */ + get delayMS(): number { + return this._delayMS; + } + + /** + * Execute one step of the loop, then prepare the following step. + * + * 1. If there is no task, do nothing and stop. + * 2. Otherwise, execute task. + * 3. Once task is complete (whether succeeded or failed), retrigger the loop. + */ + private async step() { + // Pull task. + const task = this.tasks.shift(); + if (!task) { + // Nothing to do. + // Stop the loop until we have something to do. + this.stop(); + return; + } + try { + await task(); + } catch (ex) { + await this.mjolnir.logMessage( + LogLevel.WARN, + 'Error while executing task', + extractRequestError(ex) + ); + } finally { + this.stop(); + this.start(); + } + } + + /** + * Return `tasks`, unless the queue has been disposed of. + */ + private get tasks(): (() => Promise)[] { + if (this._tasks === null) { + throw new TypeError("This Throttling Queue has been disposed of and shouldn't be used anymore"); + } + return this._tasks; + } +} diff --git a/test/integration/throttleQueueTest.ts b/test/integration/throttleQueueTest.ts new file mode 100644 index 0000000..829e3f3 --- /dev/null +++ b/test/integration/throttleQueueTest.ts @@ -0,0 +1,84 @@ +import { strict as assert } from "assert"; + +import { UserID } from "matrix-bot-sdk"; +import { ThrottlingQueue } from "../../src/queues/ThrottlingQueue"; + +describe("Test: ThrottlingQueue", function() { + it("Tasks enqueued with `push()` are executed exactly once and in the right order", async function() { + this.timeout(20000); + + const queue = new ThrottlingQueue(this.mjolnir, 10); + let state = new Map(); + let promises: Promise[] = []; + for (let counter = 0; counter < 10; ++counter) { + const i = counter; + const promise = queue.push(async () => { + if (state.get(i)) { + throw new Error(`We shouldn't have set state[${i}] yet`); + } + state.set(i, true); + for (let j = 0; j < i; ++j) { + if (!state.get(j)) { + throw new Error(`We should have set state[${j}] already`); + } + } + }); + promises.push(promise); + } + await Promise.all(promises); + for (let i = 0; i < 10; ++i) { + if (!state.get(i)) { + throw new Error(`This is the end of the test, we should have set state[${i}]`); + } + } + + // Give code a little bit more time to trip itself, in case `promises` are accidentally + // resolved too early. + await new Promise(resolve => setTimeout(resolve, 1000)); + + queue.dispose(); + }); + + it("Tasks enqueued with `push()` are executed exactly once and in the right order, even if we call `block()` at some point", async function() { + this.timeout(20000); + const queue = new ThrottlingQueue(this.mjolnir, 10); + let state = new Map(); + let promises: Promise[] = []; + for (let counter = 0; counter < 10; ++counter) { + const i = counter; + promises.push(queue.push(async () => { + if (state.get(i)) { + throw new Error(`We shouldn't have set state[${i}] yet`); + } + state.set(i, true); + for (let j = 0; j < i; ++j) { + queue.block(100); + if (!state.get(j)) { + throw new Error(`We should have set state[${j}] already`); + } + } + if (i % 2 === 0) { + // Arbitrary call to `delay()`. + queue.block(20); + } + })); + } + + queue.block(100); + + await Promise.all(promises); + for (let i = 0; i < 10; ++i) { + if (!state.get(i)) { + throw new Error(`This is the end of the test, we should have set state[${i}]`); + } + } + + // Give code a little bit more time to trip itself, in case `promises` are accidentally + // resolved too early. + await new Promise(resolve => setTimeout(resolve, 1000)); + + queue.dispose(); + }); +}); + +