A background queue for kicking (#296)

A background queue for kicking
This commit is contained in:
David Teller
2022-06-08 11:49:43 +02:00
committed by GitHub
parent 558cbb3cae
commit 0eea04bd69
6 changed files with 308 additions and 3 deletions

View File

@@ -116,6 +116,13 @@ protectedRooms:
# Explicitly add these rooms as a protected room list if you want them protected.
protectAllJoinedRooms: false
# Increase this delay to have Mjölnir wait longer between two consecutive backgrounded
# operations. The total duration of operations will be longer, but the homeserver won't
# be affected as much. Conversely, decrease this delay to have Mjölnir chain operations
# faster. The total duration of operations will generally be shorter, but the performance
# of the homeserver may be more impacted.
backgroundDelayMS: 500
# Server administration commands, these commands will only work if Mjolnir is
# a global server administrator, and the bot's server is a Synapse instance.
admin:

View File

@@ -48,6 +48,7 @@ import { replaceRoomIdsWithPills } from "./utils";
import RuleServer from "./models/RuleServer";
import { RoomMemberManager } from "./RoomMembers";
import { ProtectedRoomActivityTracker } from "./queues/ProtectedRoomActivityTracker";
import { ThrottlingQueue } from "./queues/ThrottlingQueue";
const levelToFn = {
[LogLevel.DEBUG.toString()]: LogService.debug,
@@ -95,6 +96,8 @@ export class Mjolnir {
private unprotectedWatchedListRooms: string[] = [];
private webapis: WebAPIs;
private protectedRoomActivityTracker: ProtectedRoomActivityTracker;
public taskQueue: ThrottlingQueue;
/**
* Adds a listener to the client that will automatically accept invitations.
* @param {MatrixClient} client
@@ -258,6 +261,8 @@ export class Mjolnir {
// Setup join/leave listener
this.roomJoins = new RoomMemberManager(this.client);
this.taskQueue = new ThrottlingQueue(this, config.backgroundDelayMS);
}
public get lists(): BanList[] {

View File

@@ -23,7 +23,7 @@ export async function execKickCommand(roomId: string, event: any, mjolnir: Mjoln
const userId = parts[2];
let rooms = [...Object.keys(mjolnir.protectedRooms)];
let reason;
let reason: string | undefined;
if (parts.length > 3) {
let reasonIndex = 3;
if (parts[3].startsWith("#") || parts[3].startsWith("!")) {
@@ -40,11 +40,13 @@ export async function execKickCommand(roomId: string, event: any, mjolnir: Mjoln
await mjolnir.logMessage(LogLevel.INFO, "KickCommand", `Kicking ${userId} in ${targetRoomId} for ${reason}`, targetRoomId);
if (!config.noop) {
await mjolnir.client.kickUser(userId, targetRoomId, reason);
await mjolnir.taskQueue.push(async () => {
return mjolnir.client.kickUser(userId, targetRoomId, reason);
});
} else {
await mjolnir.logMessage(LogLevel.WARN, "KickCommand", `Tried to kick ${userId} in ${targetRoomId} but the bot is running in no-op mode.`, targetRoomId);
}
}
await mjolnir.client.unstableApis.addReactionToEvent(roomId, event['event_id'], '✅');
return mjolnir.client.unstableApis.addReactionToEvent(roomId, event['event_id'], '✅');
}

View File

@@ -48,6 +48,11 @@ interface IConfig {
fasterMembershipChecks: boolean;
automaticallyRedactForReasons: string[]; // case-insensitive globs
protectAllJoinedRooms: boolean;
/**
* Backgrounded tasks: number of milliseconds to wait between the completion
* of one background task and the start of the next one.
*/
backgroundDelayMS: number;
admin?: {
enableMakeRoomAdminCommand?: boolean;
}
@@ -116,6 +121,7 @@ const defaultConfig: IConfig = {
fasterMembershipChecks: false,
automaticallyRedactForReasons: ["spam", "advertising"],
protectAllJoinedRooms: false,
backgroundDelayMS: 500,
commands: {
allowNoPrefix: false,
additionalPrefixes: [],

View File

@@ -0,0 +1,201 @@
/*
Copyright 2022 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import { extractRequestError, LogLevel } from "matrix-bot-sdk";
import { Mjolnir } from "../Mjolnir";
export type Task<T> = (queue: ThrottlingQueue) => Promise<T>;
/**
* A queue for backgrounding tasks without hammering servers too much.
*/
export class ThrottlingQueue {
/**
* The pending tasks.
*/
private _tasks: (() => Promise<void>)[] | null;
/**
* A timeout for the next task to execute.
*/
private timeout: ReturnType<typeof setTimeout> | null;
/**
* How long we should wait between the completion of a tasks and the start of the next task.
* Any >=0 number is good.
*/
private _delayMS: number;
/**
* Construct an empty queue.
*
* This queue will start executing whenever `push()` is called and stop
* whenever it runs out of tasks to execute.
*
* @param delayMS The default delay between executing two tasks, in ms.
*/
constructor(private mjolnir: Mjolnir, delayMS: number) {
this.timeout = null;
this.delayMS = delayMS;
this._tasks = [];
}
/**
* Stop the queue, make sure we can never use it again.
*/
public dispose() {
this.stop();
this._tasks = null;
}
/**
* The number of tasks waiting to be executed.
*/
get length(): number {
return this.tasks.length;
}
/**
* Push a new task onto the queue.
*
* @param task Some code to execute.
* @return A promise resolved/rejected once `task` is complete.
*/
public push<T>(task: Task<T>): Promise<T> {
// Wrap `task` into a `Promise` to inform enqueuer when
// the task is complete.
return new Promise((resolve, reject) => {
const wrapper = async () => {
try {
const result: T = await task(this);
resolve(result);
} catch (ex) {
reject(ex);
};
};
this.tasks.push(wrapper);
this.start();
});
}
/**
* Block a queue for a number of milliseconds.
*
* This method is meant typically to be used by a `Task` that receives a 429 (Too Many Requests) to reschedule
* itself for later, after giving the server a little room to breathe. If you need this, do not forget to
* re-`push()` with the failing `Task`. You may call `block()` and `push()` in either order.
*
* @param durationMS A number of milliseconds to wait until resuming operations.
*/
public block(durationMS: number) {
if (!this.tasks) {
throw new TypeError("Cannot `block()` on a ThrottlingQueue that has already been disposed of.");
}
this.stop();
this.timeout = setTimeout(async () => this.step(), durationMS);
}
/**
* Start the loop to execute pending tasks.
*
* Does nothing if the loop is already started.
*/
private start() {
if (this.timeout) {
// Already started.
return;
}
if (!this.tasks.length) {
// Nothing to do.
return;
}
this.timeout = setTimeout(async () => this.step(), this._delayMS);
}
/**
* Stop the loop to execute pending tasks.
*
* Does nothing if the loop is already stopped. A loop stopped with `stop()` may be
* resumed by calling `push()` or `start()`.
*/
private stop() {
if (!this.timeout) {
// Already stopped.
return;
}
clearTimeout(this.timeout);
this.timeout = null;
}
/**
* Change the delay between completion of an event and the start of the next event.
*
* This will be used next time a task is completed.
*/
set delayMS(delayMS: number) {
if (delayMS < 0) {
throw new TypeError(`Invalid delay ${delayMS}. Need a non-negative number of ms.`);
}
this._delayMS = delayMS;
}
/**
* Return the delay between completion of an event and the start of the next event.
*/
get delayMS(): number {
return this._delayMS;
}
/**
* Execute one step of the loop, then prepare the following step.
*
* 1. If there is no task, do nothing and stop.
* 2. Otherwise, execute task.
* 3. Once task is complete (whether succeeded or failed), retrigger the loop.
*/
private async step() {
// Pull task.
const task = this.tasks.shift();
if (!task) {
// Nothing to do.
// Stop the loop until we have something to do.
this.stop();
return;
}
try {
await task();
} catch (ex) {
await this.mjolnir.logMessage(
LogLevel.WARN,
'Error while executing task',
extractRequestError(ex)
);
} finally {
this.stop();
this.start();
}
}
/**
* Return `tasks`, unless the queue has been disposed of.
*/
private get tasks(): (() => Promise<void>)[] {
if (this._tasks === null) {
throw new TypeError("This Throttling Queue has been disposed of and shouldn't be used anymore");
}
return this._tasks;
}
}

View File

@@ -0,0 +1,84 @@
import { strict as assert } from "assert";
import { UserID } from "matrix-bot-sdk";
import { ThrottlingQueue } from "../../src/queues/ThrottlingQueue";
describe("Test: ThrottlingQueue", function() {
it("Tasks enqueued with `push()` are executed exactly once and in the right order", async function() {
this.timeout(20000);
const queue = new ThrottlingQueue(this.mjolnir, 10);
let state = new Map();
let promises: Promise<void>[] = [];
for (let counter = 0; counter < 10; ++counter) {
const i = counter;
const promise = queue.push(async () => {
if (state.get(i)) {
throw new Error(`We shouldn't have set state[${i}] yet`);
}
state.set(i, true);
for (let j = 0; j < i; ++j) {
if (!state.get(j)) {
throw new Error(`We should have set state[${j}] already`);
}
}
});
promises.push(promise);
}
await Promise.all(promises);
for (let i = 0; i < 10; ++i) {
if (!state.get(i)) {
throw new Error(`This is the end of the test, we should have set state[${i}]`);
}
}
// Give code a little bit more time to trip itself, in case `promises` are accidentally
// resolved too early.
await new Promise(resolve => setTimeout(resolve, 1000));
queue.dispose();
});
it("Tasks enqueued with `push()` are executed exactly once and in the right order, even if we call `block()` at some point", async function() {
this.timeout(20000);
const queue = new ThrottlingQueue(this.mjolnir, 10);
let state = new Map();
let promises: Promise<void>[] = [];
for (let counter = 0; counter < 10; ++counter) {
const i = counter;
promises.push(queue.push(async () => {
if (state.get(i)) {
throw new Error(`We shouldn't have set state[${i}] yet`);
}
state.set(i, true);
for (let j = 0; j < i; ++j) {
queue.block(100);
if (!state.get(j)) {
throw new Error(`We should have set state[${j}] already`);
}
}
if (i % 2 === 0) {
// Arbitrary call to `delay()`.
queue.block(20);
}
}));
}
queue.block(100);
await Promise.all(promises);
for (let i = 0; i < 10; ++i) {
if (!state.get(i)) {
throw new Error(`This is the end of the test, we should have set state[${i}]`);
}
}
// Give code a little bit more time to trip itself, in case `promises` are accidentally
// resolved too early.
await new Promise(resolve => setTimeout(resolve, 1000));
queue.dispose();
});
});