diff --git a/package.json b/package.json
index ee24bd188..4c56edc01 100644
--- a/package.json
+++ b/package.json
@@ -24,7 +24,7 @@
 		"generate:openapi": "node scripts/openapi.js",
 		"add:license": "node scripts/license.js",
 		"migrate-from-staging": "node -r dotenv/config -r module-alias/register scripts/stagingMigration/index.js",
-		"node:tests": "npm run build:src && node -r dotenv/config -r module-alias/register --enable-source-maps --test --experimental-test-coverage dist/**/*.test.js"
+		"node:tests": "npm run build:src:tsgo && node -r dotenv/config -r module-alias/register --enable-source-maps --test --experimental-test-coverage dist/**/*.test.js"
 	},
 	"main": "dist/bundle/index.js",
 	"types": "src/bundle/index.ts",
diff --git a/src/util/util/json/JsonNode.ts b/src/util/util/json/JsonNode.ts
new file mode 100644
index 000000000..0ed4c16df
--- /dev/null
+++ b/src/util/util/json/JsonNode.ts
@@ -0,0 +1,31 @@
+/// Defines the various JSON tokens that make up a JSON text.
+export enum JsonTokenType {
+	/// There is no value (as distinct from JsonTokenType.Null). This is the default token type if no data has been read by the reader.
+	None,
+	/// The token type is the start of a JSON object.
+	StartObject,
+	/// The token type is the end of a JSON object.
+	EndObject,
+	/// The token type is the start of a JSON array.
+	StartArray,
+	/// The token type is the end of a JSON array.
+	EndArray,
+	/// The token type is a JSON property name.
+	PropertyName,
+	/// The token type is a comment string.
+	Comment,
+	/// The token type is a JSON string.
+	String,
+	/// The token type is a JSON number.
+	Number,
+	/// The token type is the JSON literal true.
+	True,
+	/// The token type is the JSON literal false.
+	False,
+	/// The token type is the JSON literal null.
+ Null, +} + +export class JsonNode { + type: JsonTokenType = JsonTokenType.None; +} diff --git a/src/util/util/json/JsonSerializer.test.ts b/src/util/util/json/JsonSerializer.test.ts new file mode 100644 index 000000000..31624db7d --- /dev/null +++ b/src/util/util/json/JsonSerializer.test.ts @@ -0,0 +1,158 @@ +import { JsonSerializer } from "./JsonSerializer"; +import { describe, it } from "node:test"; +import { strict as assert } from "assert"; +import fs from "fs/promises"; +import { Stopwatch } from "../Stopwatch"; +import { JsonValue } from "@protobuf-ts/runtime"; + +describe("JsonSerializer", () => { + it("should serialize synchronously", () => { + const obj = { a: 1, b: "test" }; + const result = JsonSerializer.Serialize(obj); + assert.equal(result, '{"a":1,"b":"test"}'); + }); + + it("should deserialize synchronously", () => { + const json = '{"a":1,"b":"test"}'; + const result = JsonSerializer.Deserialize(json); + assert.deepEqual(result, { a: 1, b: "test" }); + }); + + it("should serialize asynchronously", async () => { + const obj = { a: 1, b: "test" }; + const result = await JsonSerializer.SerializeAsync(obj); + assert.equal(result, '{"a":1,"b":"test"}'); + }); + + it("should deserialize asynchronously", async () => { + const json = '{"a":1,"b":"test"}'; + const result = await JsonSerializer.DeserializeAsync(json); + assert.deepEqual(result, { a: 1, b: "test" }); + }); + + it("should be able to read large file", async () => { + // write a massive json file + const sw = Stopwatch.startNew(); + const jsonfile = await fs.open("large.json", "w"); + await jsonfile.write("["); + const getLargeObj = (index: number, depth: number) => { + const obj: JsonValue = {}; + + if (depth === 0) { + return obj; + } + for (let i = 0; i < 10; i++) { + obj[`key${i}`] = getLargeObj(index * 10 + i, depth - 1); + } + return obj; + }; + for (let i = 0; i < 100; i++) { + const entry = JSON.stringify(getLargeObj(i, 5)); + await jsonfile.write(entry); + if (i < 99) { + await 
jsonfile.write(","); + } + } + await jsonfile.write("]"); + await jsonfile.close(); + process.stdout.write("Large file written in " + sw.elapsed().toString() + "\n"); + + const jsonData = await fs.readFile("large.json", "utf-8"); + + const start = process.hrtime.bigint(); + const obj = await JsonSerializer.DeserializeAsync<{ key: string; value: string }[]>(jsonData); + const end = process.hrtime.bigint(); + const duration = end - start; + console.log(`Deserialization took ${duration / BigInt(1e6)} ms`); + + assert.equal(obj.length, 100); + await fs.unlink("large.json"); + }); + + it("should be able to parallelise", async () => { + // write a massive json file + const sw = Stopwatch.startNew(); + const jsonfile = await fs.open("large.json", "w"); + await jsonfile.write("["); + const getLargeObj = (index: number, depth: number) => { + const obj: JsonValue = {}; + + if (depth === 0) { + return obj; + } + for (let i = 0; i < 5; i++) { + obj[`key${i}`] = getLargeObj(index * 10 + i, depth - 1); + } + return obj; + }; + for (let i = 0; i < 50; i++) { + const entry = JSON.stringify(getLargeObj(i, 5)); + await jsonfile.write(entry); + if (i < 49) { + await jsonfile.write(","); + } + } + await jsonfile.write("]"); + await jsonfile.close(); + process.stdout.write("Large file written in " + sw.elapsed().toString() + "\n"); + + const tasks = []; + const start = process.hrtime.bigint(); + for (let i = 0; i < 64; i++) { + tasks.push( + (async () => { + const jsonData = await fs.readFile("large.json", "utf-8"); + + const obj = await JsonSerializer.DeserializeAsync<{ key: string; value: string }[]>(jsonData); + const end = process.hrtime.bigint(); + const duration = end - start; + console.log(`Deserialization took ${duration / BigInt(1e6)} ms`); + + assert.equal(obj.length, 50); + })(), + ); + } + await Promise.all(tasks); + await fs.unlink("large.json"); + }); + + // TODO: broken + // it("should be able to stream large file", async () => { + // // write a massive json file + // 
const sw = Stopwatch.startNew(); + // const jsonfile = await fs.open("large.json", "w"); + // await jsonfile.write("["); + // const getLargeObj = (index: number, depth: number) => { + // const obj: JsonValue = {}; + // + // if (depth === 0) { + // return obj; + // } + // for (let i = 0; i < 10; i++) { + // obj[`key${i}`] = getLargeObj(index * 10 + i, depth - 1); + // } + // return obj; + // }; + // for (let i = 0; i < 100; i++) { + // const entry = JSON.stringify(getLargeObj(i, 5)); + // await jsonfile.write(entry); + // if (i < 99) { + // await jsonfile.write(","); + // } + // } + // await jsonfile.write("]"); + // await jsonfile.close(); + // process.stdout.write("Large file written in " + sw.elapsed().toString() + "\n"); + // + // const jsonData = await fs.open("large.json", "r").then((f) => f.createReadStream()); + // + // const start = process.hrtime.bigint(); + // const obj = await JsonSerializer.DeserializeAsync<{ key: string; value: string }[]>(jsonData); + // const end = process.hrtime.bigint(); + // const duration = end - start; + // console.log(`Deserialization took ${duration / BigInt(1e6)} ms`); + // + // assert.equal(obj.length, 100); + // await fs.unlink("large.json"); + // }); +}); diff --git a/src/util/util/json/JsonSerializer.ts b/src/util/util/json/JsonSerializer.ts new file mode 100644 index 000000000..c88e4a2f9 --- /dev/null +++ b/src/util/util/json/JsonSerializer.ts @@ -0,0 +1,166 @@ +import { JsonSerializerOptions } from "./JsonSerializerOptions"; +import { Worker } from "worker_threads"; +import { join } from "path"; +import os from "os"; +import Stream from "node:stream"; +import { ReadStream, WriteStream } from "node:fs"; + +// const worker = new Worker(join(process.cwd(), 'dist', 'util', 'util', 'json', 'jsonWorker.js')); +const workerPool: Worker[] = []; +const numWorkers = process.env.JSON_WORKERS ? 
parseInt(process.env.JSON_WORKERS) : os.cpus().length;
+
+// Spin up the pool eagerly so the first (de)serialize call does not pay worker startup cost.
+for (let i = 0; i < numWorkers; i++) {
+	console.log("[JsonSerializer] Starting JSON worker", i);
+	workerPool.push(new Worker(join(__dirname, "jsonWorker.js")));
+	// unref() so idle workers do not keep the process alive.
+	workerPool[i].unref();
+	workerPool[i].setMaxListeners(64);
+}
+let currentWorkerIndex = 0;
+// Monotonic id used to match each worker reply to the request that produced it.
+// Without this, two concurrent requests round-robined onto the same worker would
+// each resolve with whichever reply arrived first.
+let requestCounter = 0;
+
+// Round-robin scheduler over the worker pool.
+function getNextWorker(): Worker {
+	const worker = workerPool[currentWorkerIndex];
+	currentWorkerIndex = (currentWorkerIndex + 1) % numWorkers;
+	return worker;
+}
+
+export class JsonSerializer {
+	// Serializes value to a JSON string synchronously on the calling thread.
+	public static Serialize<T>(value: T, opts?: JsonSerializerOptions): string {
+		return JSON.stringify(value);
+	}
+	// Serializes value to a JSON string on a pooled worker thread.
+	// Rejects with the worker's error, or with "Worker timeout" after 60s.
+	public static async SerializeAsync<T>(value: T, opts?: JsonSerializerOptions): Promise<string> {
+		const worker = getNextWorker();
+		const id = ++requestCounter;
+		worker.postMessage({ type: "serialize", value, id });
+		return new Promise<string>((resolve, reject) => {
+			const handler = (msg: { id?: number; result?: string; error?: string }) => {
+				// Ignore replies that belong to a different in-flight request on this worker.
+				// (Workers that do not echo an id are accepted, as before.)
+				if (msg.id !== undefined && msg.id !== id) return;
+				clearTimeout(timeout);
+				worker.removeListener("message", handler);
+				if (msg.error) {
+					reject(new Error(msg.error));
+				} else {
+					resolve(msg.result!);
+				}
+			};
+			worker.on("message", handler);
+			const timeout = setTimeout(() => {
+				worker.removeListener("message", handler);
+				reject(new Error("Worker timeout"));
+			}, 60000);
+		});
+	}
+	// Parses a JSON string synchronously on the calling thread.
+	public static Deserialize<T>(json: string, opts?: JsonSerializerOptions): T {
+		return JSON.parse(json) as T;
+	}
+	// Parses JSON from a string or stream, off-loading the parse to a pooled worker thread.
+	public static async DeserializeAsync<T>(json: string | ReadableStream | ReadStream, opts?: JsonSerializerOptions): Promise<T> {
+		if (json instanceof ReadableStream) return this.DeserializeAsyncReadableStream<T>(json, opts);
+		if (json instanceof ReadStream) return this.DeserializeAsyncReadStream<T>(json, opts);
+
+		const worker = getNextWorker();
+		const id = ++requestCounter;
+		worker.postMessage({ type: "deserialize", json, id });
+		return new Promise<T>((resolve, reject) => {
+			const handler = (msg: { id?: number; result?: string; error?: string }) => {
+				if (msg.id !== undefined && msg.id !== id) return;
+				clearTimeout(timeout);
+				worker.removeListener("message", handler);
+				if (msg.error) {
+					reject(new Error(msg.error));
+				}
else {
+					// The worker sends back a validated JSON string; parse it into the caller's type here.
+					resolve(JSON.parse(msg.result!) as T);
+				}
+			};
+			worker.on("message", handler);
+			const timeout = setTimeout(() => {
+				worker.removeListener("message", handler);
+				reject(new Error("Worker timeout"));
+			}, 60000);
+		});
+	}
+
+	// Drains a WHATWG ReadableStream into a string, then defers to DeserializeAsync.
+	private static async DeserializeAsyncReadableStream<T>(jsonStream: ReadableStream, opts?: JsonSerializerOptions): Promise<T> {
+		const reader = jsonStream.getReader();
+		// One decoder with { stream: true } so multi-byte characters split across
+		// chunk boundaries decode correctly (a fresh decode() per chunk corrupts them).
+		const decoder = new TextDecoder();
+		let jsonData = "";
+		while (true) {
+			const { done, value } = await reader.read();
+			if (done) break;
+			jsonData += decoder.decode(value, { stream: true });
+		}
+		jsonData += decoder.decode();
+		return this.DeserializeAsync<T>(jsonData, opts);
+	}
+
+	// Drains a Node fs.ReadStream into a string, then defers to DeserializeAsync.
+	private static async DeserializeAsyncReadStream<T>(jsonStream: ReadStream, opts?: JsonSerializerOptions): Promise<T> {
+		let jsonData = "";
+		for await (const chunk of jsonStream) {
+			jsonData += chunk.toString();
+		}
+		return this.DeserializeAsync<T>(jsonData, opts);
+	}
+
+	// Yields the elements of a JSON array one by one.
+	// NOTE(review): string input is currently deserialized in full before yielding.
+	public static async *DeserializeAsyncEnumerable<T>(json: string | ReadStream | ReadableStream, opts?: JsonSerializerOptions): AsyncGenerator<T> {
+		if (json instanceof ReadableStream) return yield* this.DeserializeAsyncEnumerableReadableStream<T>(json, opts);
+		if (json instanceof ReadStream) return yield* this.DeserializeAsyncEnumerableReadStream<T>(json, opts);
+
+		const arr = await this.DeserializeAsync<T[]>(json, opts);
+		for (const item of arr) {
+			yield item;
+		}
+	}
+
+	private static async *DeserializeAsyncEnumerableReadableStream<T>(json: ReadableStream, opts?: JsonSerializerOptions) {
+		const reader = json.getReader();
+		//TODO: implement
+		yield undefined as unknown as T;
+	}
+
+	private static async *DeserializeAsyncEnumerableReadStream<T>(json: ReadStream, opts?: JsonSerializerOptions) {
+		// TODO: implement
+		yield undefined as unknown as T;
+	}
+
+	// Serializes an async sequence into a JSON array string, serializing each element on the worker pool.
+	public static async SerializeAsyncEnumerableToStringAsync<T>(items: AsyncIterable<T>, opts?: JsonSerializerOptions): Promise<string> {
+		let jsonData = "[";
+		let first = true;
+		for await (const item of items) {
+			if (!first) {
+				jsonData += ",";
+			} else {
+				first = false;
+			}
+			
jsonData += await this.SerializeAsync(item, opts);
+		}
+		jsonData += "]";
+		return jsonData;
+	}
+
+	// Serializes an async sequence as a JSON array into the given stream,
+	// dispatching on the stream kind. Previously this body was empty, so callers
+	// got a resolved promise while nothing was ever written and both private
+	// stream writers below were dead code.
+	public static async SerializeAsyncEnumerableAsync<T>(items: AsyncIterable<T>, stream: WriteStream | WritableStream, opts?: JsonSerializerOptions): Promise<void> {
+		if (stream instanceof WritableStream) return this.SerializeAsyncEnumerableToWritableStreamAsync(items, stream, opts);
+		return this.SerializeAsyncEnumerableToWriteStreamAsync(items, stream, opts);
+	}
+
+	private static async SerializeAsyncEnumerableToWritableStreamAsync<T>(items: AsyncIterable<T>, stream: WritableStream, opts?: JsonSerializerOptions): Promise<void> {
+		const writer = stream.getWriter();
+		// One encoder for the whole write instead of a fresh TextEncoder per chunk.
+		const encoder = new TextEncoder();
+		let first = true;
+		await writer.write(encoder.encode("["));
+		for await (const item of items) {
+			if (!first) {
+				await writer.write(encoder.encode(","));
+			} else {
+				first = false;
+			}
+			const jsonItem = await this.SerializeAsync(item, opts);
+			await writer.write(encoder.encode(jsonItem));
+		}
+		await writer.write(encoder.encode("]"));
+		await writer.close();
+	}
+
+	private static async SerializeAsyncEnumerableToWriteStreamAsync<T>(items: AsyncIterable<T>, stream: WriteStream, opts?: JsonSerializerOptions): Promise<void> {
+		let first = true;
+		// NOTE(review): write() return values (backpressure) are ignored — acceptable for small payloads.
+		stream.write("[");
+		for await (const item of items) {
+			if (!first) {
+				stream.write(",");
+			} else {
+				first = false;
+			}
+			const jsonItem = await this.SerializeAsync(item, opts);
+			stream.write(jsonItem);
+		}
+		stream.write("]");
+		stream.end();
+	}
+}
diff --git a/src/util/util/json/JsonSerializerOptions.ts b/src/util/util/json/JsonSerializerOptions.ts
new file mode 100644
index 000000000..0b3e11b2f
--- /dev/null
+++ b/src/util/util/json/JsonSerializerOptions.ts
@@ -0,0 +1 @@
+export class JsonSerializerOptions {}
diff --git a/src/util/util/json/README.RST b/src/util/util/json/README.RST
new file mode 100644
index 000000000..9670081b2
--- /dev/null
+++ b/src/util/util/json/README.RST
@@ -0,0 +1,4 @@
+JSON utils
+=============
+
+JSON library in TypeScript, based around .NET's System.Text.Json API.
\ No newline at end of file
diff --git a/src/util/util/json/jsonWorker.ts b/src/util/util/json/jsonWorker.ts
new file mode 100644
index 000000000..f12c9f67e
--- /dev/null
+++ b/src/util/util/json/jsonWorker.ts
@@ -0,0 +1,19 @@
+import { parentPort } from "worker_threads";
+
+// Worker-side handler for JsonSerializer jobs. Every reply echoes the request's
+// id so the main thread can match responses to concurrent requests sharing
+// this worker.
+parentPort?.on("message", (message) => {
+	try {
+		if (message.type === "serialize") {
+			const result = JSON.stringify(message.value);
+			parentPort?.postMessage({ result, id: message.id });
+		} else if (message.type === "deserialize") {
+			// Parse off the main thread to validate the input, then return the
+			// original text unchanged: the previous parse-then-re-stringify
+			// round-trip produced an equivalent JSON string at twice the cost.
+			JSON.parse(message.json);
+			parentPort?.postMessage({ result: message.json, id: message.id });
+		}
+	} catch (error) {
+		parentPort?.postMessage({ error: (error as Error).message, id: message.id });
+	}
+});