Make ProjectionNode reduction single-step (#1100)
GHCR - Development Branches / ghcr-publish (push) Failing after 32s
Docker Hub - Develop / docker-latest (push) Failing after 33s
Tests / Build & Lint (push) Failing after 4m29s
Tests / Integration tests (push) Failing after 26s
Tests / Application Service Integration tests (push) Failing after 20s
Tests / Unit tests (push) Successful in 5m47s

We figured out while working on https://github.com/the-draupnir-project/Draupnir/pull/1099 that actually in both cases if delta reducers are incorrect and get fixed, then downstream nodes have to use `reduceRebuild`.

The single-step projection nodes are much simpler though and don't leak internal state into the diff.

Closes https://github.com/the-draupnir-project/planning/issues/123

* Allow projections to pass node state in the delta.

https://github.com/the-draupnir-project/planning/issues/123.

The existing projections have been changed to the new API, but they
probably could be optimised now that we have this feature.

* Make the node state delta authoritative in existing projections.

https://github.com/the-draupnir-project/planning/issues/123

* Make projections single step

https://github.com/the-draupnir-project/planning/issues/120

* Make Projection responsible for producing the diff.

https://github.com/the-draupnir-project/planning/issues/123

* Changeset.
This commit is contained in:
Gnuxie
2026-04-20 16:50:44 +01:00
committed by GitHub
parent ecfcc1c2c3
commit a4c4158443
9 changed files with 228 additions and 206 deletions
+11
View File
@@ -0,0 +1,11 @@
---
"@the-draupnir-project/matrix-protection-suite": minor
---
Projections now produce the next node in a single step, which simplifies their
implementation.
Previously we thought that the two-step system had stronger guarantees for
correctness when persisting projection nodes, but we found in implementation
that the two approaches were almost identical, with two-step being needlessly
complicated https://github.com/the-draupnir-project/planning/issues/120
+5 -2
View File
@@ -36,16 +36,19 @@ function watchDeltaForMemberBanIntents(
watchedPoliciesDelta,
setMembershipRevision
);
return memberBanIntentProjectionNode.reduceInput(
const nextNode = memberBanIntentProjectionNode.reduceInput(
setMembershipPoliciesRevisionDelta
);
return memberBanIntentProjectionNode.diff(nextNode);
}
function watchDeltaForServerBanIntents(
watchedPoliciesDelta: PolicyRuleChange[],
serverBanIntentProjectionNode: ServerBanIntentProjectionNode
) {
return serverBanIntentProjectionNode.reduceInput(watchedPoliciesDelta);
const nextNode =
serverBanIntentProjectionNode.reduceInput(watchedPoliciesDelta);
return serverBanIntentProjectionNode.diff(nextNode);
}
export type WatchPolicyRoomPreview = {
@@ -67,12 +67,17 @@ export class ProjectionOutputHelper<
input: ExtractInputDeltaShapes<ExtractInputProjectionNodes<TProjectionNode>>
): void {
const previousNode = this.currentNode;
const delta = previousNode.reduceInput(input);
this.currentNode = previousNode.reduceDelta(delta) as TProjectionNode;
this.currentNode = previousNode.reduceInput(input) as TProjectionNode;
const downstreamDelta = previousNode.diff(this.currentNode);
for (const output of this.outputs) {
output.applyInput(delta);
output.applyInput(downstreamDelta);
}
this.emitter.emit("projection", this.currentNode, delta, previousNode);
this.emitter.emit(
"projection",
this.currentNode,
downstreamDelta,
previousNode
);
}
addOutput(projection: Projection): this {
@@ -1,4 +1,4 @@
// SPDX-FileCopyrightText: 2025 Gnuxie <Gnuxie@protonmail.com>
// SPDX-FileCopyrightText: 2025 - 2026 Gnuxie <Gnuxie@protonmail.com>
//
// SPDX-License-Identifier: Apache-2.0
//
@@ -27,23 +27,44 @@ export type ExtractInputProjectionNodes<
export type ProjectionNode<
TInputs extends ProjectionNode[] | unknown[] = unknown[],
TDeltaShape = unknown,
TDownstreamDeltaShape = unknown,
TAccessMixin = Record<never, never>,
> = {
readonly ulid: ULID;
// Whether the projection has no state at all.
isEmpty(): boolean;
reduceInput(input: ExtractInputDeltaShapes<TInputs>): TDeltaShape;
reduceDelta(
input: TDeltaShape
): ProjectionNode<TInputs, TDeltaShape, TAccessMixin>;
/**
* Produces the initial delta, can only be used when the revision is empty.
* Otherwise you must use reduceRebuild.
* Produces the externally visible delta with the difference between two
* nodes. This delta is for downstream projections to consume, not the
* projection itself. To replay and reproduce the projection node
* from deltas you have to use the input deltas.
*/
reduceInitialInputs(input: TInputs): TDeltaShape;
// only needed for persistent storage
reduceRebuild?(inputs: TInputs): TDeltaShape;
diff(
nextNode: ProjectionNode<TInputs, TDownstreamDeltaShape, TAccessMixin>
): TDownstreamDeltaShape;
/**
* Reduces an input delta into the next projection node.
*/
reduceInput(
input: ExtractInputDeltaShapes<TInputs>
): ProjectionNode<TInputs, TDownstreamDeltaShape, TAccessMixin>;
/**
* Produces the initial node from the current input projection nodes. This can
* only be used when the node is empty. Otherwise use reduceRebuild.
*/
reduceInitialInputs(
input: TInputs
): ProjectionNode<TInputs, TDownstreamDeltaShape, TAccessMixin>;
/**
* Reconciles this node against the current input projection nodes. This is
* intended for persistence/rebuild flows after reducer bugs are fixed.
*
* The projection or orchestration layer is responsible for publishing the
* corrective downstream delta by diffing this node against the corrected node.
*/
reduceRebuild?(
inputs: TInputs
): ProjectionNode<TInputs, TDownstreamDeltaShape, TAccessMixin>;
} & TAccessMixin;
export type AnyProjectionNode = ProjectionNode<never>;
@@ -34,10 +34,10 @@ export class StandardMemberBanIntentProjection
) {
const node =
StandardMemberBanIntentProjectionNode.create(monotonicFactory());
const delta = node.reduceInitialInputs([
const initialNode = node.reduceInitialInputs([
membershipPolicyRevisionIssuer.currentRevision as unknown as MemberBanInputProjectionNode,
]);
super(node.reduceDelta(delta));
super(initialNode);
this.membershipPolicyRevisionIssuer.on(
"revision",
this.handleUpstreamRevision
@@ -1,4 +1,4 @@
// SPDX-FileCopyrightText: 2025 Gnuxie <Gnuxie@protonmail.com>
// SPDX-FileCopyrightText: 2025 - 2026 Gnuxie <Gnuxie@protonmail.com>
//
// SPDX-License-Identifier: Apache-2.0
//
@@ -13,7 +13,6 @@ import {
ProjectionNode,
} from "../../../Projection/ProjectionNode";
import {
MemberPolicyMatch,
MemberPolicyMatches,
MembershipPolicyRevision,
MembershipPolicyRevisionDelta,
@@ -37,16 +36,16 @@ export type MemberBanInputProjectionNode = ProjectionNode<
> &
MembershipPolicyRevision;
// use add/remove for steady state intents
// When the intent becomes effectual, matches will be removed
// upstream and so this model will remain consistent
export interface MemberBanIntentProjectionDelta {
add: MemberPolicyMatch[];
remove: MemberPolicyMatch[];
ban: StringUserID[];
recall: StringUserID[];
}
type MemberBanIntentMap = PersistentMap<
StringUserID,
List<LiteralPolicyRule | GlobPolicyRule>
>;
function isPolicyRelevant(policy: LiteralPolicyRule | GlobPolicyRule): boolean {
return (
policy.recommendation === Recommendation.Ban ||
@@ -59,63 +58,20 @@ export type MemberBanIntentProjectionNode = ProjectionNode<
MemberBanIntentProjectionDelta,
{
allMembersWithRules(): MemberPolicyMatches[];
isMemberBanned(member: StringUserID): boolean;
allRulesMatchingMember(
member: StringUserID
): (LiteralPolicyRule | GlobPolicyRule)[];
}
>;
export const MemberBanIntentProjectionNodeHelper = Object.freeze({
reduceMembershipPolicyDelta(
input: MembershipPolicyRevisionDelta
): Pick<MemberBanIntentProjectionDelta, "add" | "remove"> {
const output: Pick<MemberBanIntentProjectionDelta, "add" | "remove"> = {
add: [],
remove: [],
};
for (const added of input.addedMemberMatches) {
if (isPolicyRelevant(added.policy)) {
output.add.push(added);
}
}
for (const removed of input.removedMemberMatches) {
if (isPolicyRelevant(removed.policy)) {
output.remove.push(removed);
}
}
return output;
},
reduceIntentDelta(
input: Pick<MemberBanIntentProjectionDelta, "add" | "remove">,
policies: PersistentMap<
StringUserID,
List<LiteralPolicyRule | GlobPolicyRule>
>
): MemberBanIntentProjectionDelta {
const intents = ListMultiMap.deriveIntents(
policies,
input.add.map((match) => match.policy),
input.remove.map((match) => match.policy),
(rule) => rule.entity as StringUserID
);
return {
...input,
ban: intents.intend,
recall: intents.recall,
};
},
});
// Upstream inputs are not yet converted to projections, so have to be never[]
// for now.
export class StandardMemberBanIntentProjectionNode implements MemberBanIntentProjectionNode {
public readonly ulid: ULID;
constructor(
private readonly ulidFactory: ULIDFactory,
private readonly intents: PersistentMap<
StringUserID,
List<LiteralPolicyRule | GlobPolicyRule>
>
private readonly intents: MemberBanIntentMap
) {
this.ulid = ulidFactory();
}
@@ -133,29 +89,42 @@ export class StandardMemberBanIntentProjectionNode implements MemberBanIntentPro
return this.intents.isEmpty();
}
reduceInput(
input: ExtractInputDeltaShapes<[MemberBanInputProjectionNode]>
public diff(
nextNode: MemberBanIntentProjectionNode
): MemberBanIntentProjectionDelta {
return MemberBanIntentProjectionNodeHelper.reduceIntentDelta(
MemberBanIntentProjectionNodeHelper.reduceMembershipPolicyDelta(input),
this.intents
);
const ban: StringUserID[] = [];
const recall: StringUserID[] = [];
for (const member of nextNode.allMembersWithRules()) {
if (!this.isMemberBanned(member.userID)) {
ban.push(member.userID);
}
}
for (const userID of this.intents.keys()) {
if (!nextNode.isMemberBanned(userID)) {
recall.push(userID);
}
}
return { ban, recall };
}
reduceDelta(
input: MemberBanIntentProjectionDelta
): StandardMemberBanIntentProjectionNode {
reduceInput(
input: ExtractInputDeltaShapes<[MemberBanInputProjectionNode]>
): MemberBanIntentProjectionNode {
let nextIntents = this.intents;
nextIntents = ListMultiMap.addValues(
nextIntents,
input.add.map((match) => match.policy),
(rule) => rule.entity as StringUserID
);
nextIntents = ListMultiMap.removeValues(
nextIntents,
input.remove.map((match) => match.policy),
(rule) => rule.entity as StringUserID
);
for (const added of input.addedMemberMatches) {
if (isPolicyRelevant(added.policy)) {
nextIntents = ListMultiMap.add(nextIntents, added.userID, added.policy);
}
}
for (const removed of input.removedMemberMatches) {
if (isPolicyRelevant(removed.policy)) {
nextIntents = ListMultiMap.remove(
nextIntents,
removed.userID,
removed.policy
);
}
}
return new StandardMemberBanIntentProjectionNode(
this.ulidFactory,
nextIntents
@@ -164,7 +133,7 @@ export class StandardMemberBanIntentProjectionNode implements MemberBanIntentPro
reduceInitialInputs([membershipPolicyRevision]: [
MemberBanInputProjectionNode,
]): MemberBanIntentProjectionDelta {
]): MemberBanIntentProjectionNode {
if (!this.isEmpty()) {
throw new TypeError(
"This can only be called on an empty projection node"
@@ -173,15 +142,22 @@ export class StandardMemberBanIntentProjectionNode implements MemberBanIntentPro
const matches = membershipPolicyRevision
.allMembersWithRules()
.map((member) =>
member.policies.map((policy) => ({ userID: member.userID, policy }))
member.policies
.filter(isPolicyRelevant)
.map((policy) => ({ userID: member.userID, policy }))
)
.flat();
return {
add: matches,
ban: matches.map((match) => match.userID),
remove: [],
recall: [],
};
let nextIntents = PersistentMap<
StringUserID,
List<LiteralPolicyRule | GlobPolicyRule>
>();
for (const match of matches) {
nextIntents = ListMultiMap.add(nextIntents, match.userID, match.policy);
}
return new StandardMemberBanIntentProjectionNode(
this.ulidFactory,
nextIntents
);
}
allMembersWithRules(): MemberPolicyMatches[] {
@@ -197,6 +173,10 @@ export class StandardMemberBanIntentProjectionNode implements MemberBanIntentPro
);
}
isMemberBanned(member: StringUserID): boolean {
return this.intents.has(member);
}
allRulesMatchingMember(
member: StringUserID
): (LiteralPolicyRule | GlobPolicyRule)[] {
@@ -47,15 +47,13 @@ test("ACL compilation works and does not ban our server", async function () {
content: UnredactedPolicyContent;
}
).expect("Should be able to parse the policy rule");
const nodeWithBans = emptyNode.reduceDelta(
emptyNode.reduceInput(
[banPolicyRule, ourBanPolicyRule].map((policy) => ({
rule: policy,
sender: policy.sourceEvent.sender,
event: policy.sourceEvent,
changeType: PolicyRuleChangeType.Added,
}))
)
const nodeWithBans = emptyNode.reduceInput(
[banPolicyRule, ourBanPolicyRule].map((policy) => ({
rule: policy,
sender: policy.sourceEvent.sender,
event: policy.sourceEvent,
changeType: PolicyRuleChangeType.Added,
}))
);
const acl = compileServerACL(ourServerName, nodeWithBans).safeAclContent();
expect(acl.deny?.at(0)).toBe(badServer);
@@ -34,10 +34,10 @@ export class StandardServerBanIntentProjection
) {
const node =
StandardServerBanIntentProjectionNode.create(monotonicFactory());
const delta = node.reduceInitialInputs([
const initialNode = node.reduceInitialInputs([
policyListRevisionIssuer.currentRevision as unknown as PolicyListBridgeProjectionNode,
]);
super(node.reduceDelta(delta));
super(initialNode);
this.policyListRevisionIssuer.on("revision", this.handleUpstreamRevision);
}
@@ -1,4 +1,4 @@
// SPDX-FileCopyrightText: 2025 Gnuxie <Gnuxie@protonmail.com>
// SPDX-FileCopyrightText: 2025 - 2026 Gnuxie <Gnuxie@protonmail.com>
//
// SPDX-License-Identifier: Apache-2.0
//
@@ -28,10 +28,13 @@ import { ListMultiMap } from "../../../Projection/ListMultiMap";
export type ServerBanIntentProjectionDelta = {
deny: StringServerName[];
recall: StringServerName[];
add: (LiteralPolicyRule | GlobPolicyRule)[];
remove: (LiteralPolicyRule | GlobPolicyRule)[];
};
type ServerBanIntentMap = PersistentMap<
StringServerName,
List<LiteralPolicyRule | GlobPolicyRule>
>;
// is there a way that we can adapt this so that it can possibly be swapped
// to a lazy ban style protection if acls become exhausted.
// not withiout addressing the issues in the member protection tbh.
@@ -40,75 +43,15 @@ export type ServerBanIntentProjectionNode = ProjectionNode<
ServerBanIntentProjectionDelta,
{
deny: StringServerName[];
isServerDenied(serverName: StringServerName): boolean;
}
>;
export const ServerBanIntentProjectionHelper = Object.freeze({
reducePolicyDelta(
input: PolicyRuleChange[]
): Pick<ServerBanIntentProjectionDelta, "add" | "remove"> {
const output: Pick<ServerBanIntentProjectionDelta, "add" | "remove"> = {
add: [],
remove: [],
} satisfies Pick<ServerBanIntentProjectionDelta, "add" | "remove">;
for (const change of input) {
if (change.rule.kind !== PolicyRuleType.Server) {
continue;
} else if (change.rule.matchType === PolicyRuleMatchType.HashedLiteral) {
continue;
}
switch (change.changeType) {
case PolicyRuleChangeType.Added:
case PolicyRuleChangeType.RevealedLiteral: {
output.add.push(change.rule);
break;
}
case PolicyRuleChangeType.Modified: {
output.add.push(change.rule);
if (change.previousRule === undefined) {
throw new TypeError("Things are very wrong");
}
output.remove.push(change.previousRule as LiteralPolicyRule);
break;
}
case PolicyRuleChangeType.Removed: {
output.remove.push(change.rule);
break;
}
}
}
return output;
},
reduceIntentDelta(
input: Pick<ServerBanIntentProjectionDelta, "add" | "remove">,
policies: PersistentMap<
StringServerName,
List<GlobPolicyRule | LiteralPolicyRule>
>
): ServerBanIntentProjectionDelta {
const intents = ListMultiMap.deriveIntents(
policies,
input.add,
input.remove,
(rule) => rule.entity as StringServerName
);
return {
...input,
deny: intents.intend,
recall: intents.recall,
};
},
});
export class StandardServerBanIntentProjectionNode implements ServerBanIntentProjectionNode {
public readonly ulid: ULID;
constructor(
private readonly ulidFactory: ULIDFactory,
private readonly policies: PersistentMap<
StringServerName,
List<LiteralPolicyRule | GlobPolicyRule>
>
private readonly policies: ServerBanIntentMap
) {
this.ulid = ulidFactory();
}
@@ -122,15 +65,84 @@ export class StandardServerBanIntentProjectionNode implements ServerBanIntentPro
);
}
reduceInput(input: PolicyRuleChange[]): ServerBanIntentProjectionDelta {
return ServerBanIntentProjectionHelper.reduceIntentDelta(
ServerBanIntentProjectionHelper.reducePolicyDelta(input),
this.policies
diff(
nextNode: ServerBanIntentProjectionNode
): ServerBanIntentProjectionDelta {
const deny: StringServerName[] = [];
const recall: StringServerName[] = [];
for (const serverName of nextNode.deny) {
if (!this.isServerDenied(serverName)) {
deny.push(serverName);
}
}
for (const serverName of this.policies.keys()) {
if (!nextNode.isServerDenied(serverName)) {
recall.push(serverName);
}
}
return { deny, recall };
}
reduceInput(input: PolicyRuleChange[]): ServerBanIntentProjectionNode {
let nextPolicies = this.policies;
for (const change of input) {
if (
change.rule.kind !== PolicyRuleType.Server ||
change.rule.matchType === PolicyRuleMatchType.HashedLiteral
) {
continue;
}
switch (change.changeType) {
case PolicyRuleChangeType.Added:
case PolicyRuleChangeType.RevealedLiteral: {
nextPolicies = ListMultiMap.add(
nextPolicies,
change.rule.entity as StringServerName,
change.rule
);
break;
}
case PolicyRuleChangeType.Modified: {
if (change.previousRule === undefined) {
throw new TypeError("Things are very wrong");
}
nextPolicies = ListMultiMap.add(
nextPolicies,
change.rule.entity as StringServerName,
change.rule
);
if (
change.previousRule.kind !== PolicyRuleType.Server ||
change.previousRule.matchType === PolicyRuleMatchType.HashedLiteral
) {
break;
}
nextPolicies = ListMultiMap.remove(
nextPolicies,
change.previousRule.entity as StringServerName,
change.previousRule
);
break;
}
case PolicyRuleChangeType.Removed: {
nextPolicies = ListMultiMap.remove(
nextPolicies,
change.rule.entity as StringServerName,
change.rule
);
break;
}
}
}
return new StandardServerBanIntentProjectionNode(
this.ulidFactory,
nextPolicies
);
}
reduceInitialInputs([policyListRevision]: [
PolicyListBridgeProjectionNode,
]): ServerBanIntentProjectionDelta {
]): ServerBanIntentProjectionNode {
if (!this.isEmpty()) {
throw new TypeError("Cannot reduce initial inputs when inialised");
}
@@ -144,40 +156,32 @@ export class StandardServerBanIntentProjectionNode implements ServerBanIntentPro
Recommendation.Takedown
),
].filter((rule) => rule.matchType !== PolicyRuleMatchType.HashedLiteral);
const names = new Set(serverPolicies.map((policy) => policy.entity));
return {
add: serverPolicies,
deny: [...names] as StringServerName[],
remove: [],
recall: [],
};
}
isEmpty(): boolean {
return this.policies.size === 0;
}
reduceDelta(
input: ServerBanIntentProjectionDelta
): ServerBanIntentProjectionNode {
let nextPolicies = this.policies;
nextPolicies = ListMultiMap.addValues(
nextPolicies,
input.add,
(rule) => rule.entity as StringServerName
);
nextPolicies = ListMultiMap.removeValues(
nextPolicies,
input.remove,
(rule) => rule.entity as StringServerName
);
let nextPolicies = PersistentMap<
StringServerName,
List<LiteralPolicyRule | GlobPolicyRule>
>();
for (const policy of serverPolicies) {
nextPolicies = ListMultiMap.add(
nextPolicies,
policy.entity as StringServerName,
policy
);
}
return new StandardServerBanIntentProjectionNode(
this.ulidFactory,
nextPolicies
);
}
isEmpty(): boolean {
return this.policies.size === 0;
}
get deny(): StringServerName[] {
return [...this.policies.keys()];
}
isServerDenied(serverName: StringServerName): boolean {
return this.policies.has(serverName);
}
}