123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547 |
- import {Buffer} from "buffer";
- import {PrimitiveValue, UUID} from "./types";
- type ConflictType = "U/U" | "R/U" | "R*/U" | "U/D" | "D/D";
- export interface DeltaVisitor {
- visitNodeCreation(delta: NodeCreation);
- visitReadAllOutgoing(delta: ReadAllOutgoing);
- visitEdgeCreation(delta: EdgeCreation);
- visitEdgeUpdate(delta: EdgeUpdate);
- visitNodeDeletion(delta: NodeDeletion);
- visitTransaction(delta: Transaction);
- }
- export abstract class Delta {
- readonly hash: Buffer;
- readonly description: string;
- readonly conflictsWith: [Delta, ConflictType][] = [];
- readonly partOf: Transaction[] = [];
- constructor(hash: Buffer, description: string) {
- this.hash = hash;
- this.description = description;
- }
- abstract getDependencies(): readonly [Delta,string][];
- hasTransitiveDependency(other: Delta) {
- if (this === other) return true;
- for (const [d] of this.getDependencies()) {
- if (d.hasTransitiveDependency(other)) return true;
- }
- return false;
- }
- serialize(): any {
- return {
- hash: this.hash.toString('hex'),
- }
- }
- abstract iterPrimitiveDeltas(): Generator<PrimitiveDelta>;
- abstract accept(visitor: DeltaVisitor);
- }
- export abstract class PrimitiveDelta extends Delta {
- // constructor(hash: Buffer, description: string) {
- // super(has, description);
- // }
- *iterPrimitiveDeltas() {
- yield this;
- }
- }
- function registerConflict(d1: Delta, d2: Delta, type: ConflictType) {
- d1.conflictsWith.push([d2, type]);
- d2.conflictsWith.push([d1, type]);
- }
- export class NodeCreation extends PrimitiveDelta {
- readonly id: UUID;
- // Inverse dependencies
- readonly outgoingEdges: Map<string, EdgeCreation> = new Map();
- readonly incomingEdges: EdgeUpdate[] = []; // all deltas EVER that set this node as target of an edge.
- readonly readAllOutgoing: ReadAllOutgoing[] = [];
- readonly deletions: NodeDeletion[] = [];
- constructor(hash: Buffer, id: UUID) {
- super(hash, `NEW(${JSON.stringify(id)})`);
- this.id = id;
- }
- getDependencies(): readonly [Delta,string][] {
- return [];
- }
- registerEdgeUpdate(u: EdgeUpdate) {
- // A new outgoing edge will always conflict with all existing deletions.
- for (const d of this.deletions) {
- registerConflict(d, u, "U/D");
- }
- }
- registerIncomingEdge(u: EdgeUpdate) {
- // A new incoming edge always will conflict with all existing deletions.
- for (const d of this.deletions) {
- registerConflict(d, u, "U/D");
- }
- this.incomingEdges.push(u);
- }
- registerDeletion(d: NodeDeletion) {
- // A new deletion will conflict with all earliest incoming/outgoing edge operations that the deletion does not transitively explicitly depend on.
- for (const o of this.outgoingEdges.values()) {
- for (const u of o.iterOverwriters(u => !d.afterSrc.some(a => a.hasTransitiveDependency(u)))) {
- registerConflict(d, u, "U/D");
- }
- }
- for (const i of this.incomingEdges.filter(i => !d.afterTgt.some(a => a.hasTransitiveDependency(i)))) {
- registerConflict(i, d, "U/D");
- }
- // Delete/Delete conflict
- for (const other of this.deletions) {
- if (other !== d) {
- registerConflict(d, other, "D/D");
- }
- }
- this.deletions.push(d);
- }
-
- registerReadAllOutgoing(r: ReadAllOutgoing) {
- // ReadAllOutgoing will conflict with all outgoing edge creations that are not a dependency of ReadAllOutgoing
- for (const o of this.outgoingEdges.values()) {
- if (!r.after.includes(o)) {
- registerConflict(r, o, "R*/U");
- }
- }
- this.readAllOutgoing.push(r);
- }
- registerEdgeCreation(o: EdgeCreation) {
- for (const r of this.readAllOutgoing) {
- if (!o.after.includes(r)) {
- registerConflict(r, o, "R*/U");
- }
- }
- this.outgoingEdges.set(o.hash.toString('hex'), o);
- }
- accept(visitor: DeltaVisitor) {
- visitor.visitNodeCreation(this);
- };
- serialize(): any {
- return {...super.serialize(), type: "NodeCreation", id: this.id};
- }
- }
- // This delta represents getting all outgoing edges of a node.
- // For instance, when getting all the elements of a set (e.g., when checking some constraint), or iterating over a dictionary.
- // This operation will be conflicting with all concurrent EdgeCreations.
- export class ReadAllOutgoing extends PrimitiveDelta {
- // Dependencies:
- readonly node: NodeCreation;
- readonly after: readonly EdgeCreation[];
- constructor(hash: Buffer, node: NodeCreation, after: readonly EdgeCreation[]) {
- super(hash, `R*(${node.id})`);
- this.node = node;
- this.after = after;
- // Register inverse dependencies:
- node.registerReadAllOutgoing(this);
- }
- getDependencies(): readonly [Delta, string][] {
- return [
- [this.node, "N"],
- ...([] as [Delta,string][]).concat(...this.after.map(a => a.overwrittenBy.map(u => [u, "A"] as [Delta, string]))),
- ];
- }
- accept(visitor: DeltaVisitor) {
- visitor.visitReadAllOutgoing(this);
- }
- }
- export class NodeDeletion extends PrimitiveDelta {
- // Dependencies:
- node: NodeCreation;
- afterSrc: readonly EdgeUpdate[];
- afterTgt: readonly EdgeUpdate[];
- constructor(hash: Buffer, node: NodeCreation, afterSrc: readonly EdgeUpdate[], afterTgt: readonly EdgeUpdate[]) {
- super(hash, `DEL(${JSON.stringify(node.id)})`);
- this.node = node;
- this.afterSrc = afterSrc;
- this.afterTgt = afterTgt;
- if (afterSrc.some(a => a.target.value !== null)) {
- throw new Error("NodeDeletion can only depend on EdgeUpdates that set outgoing edges to null.");
- }
- if (afterTgt.some(a => a.target.value === node)) {
- throw new Error("NodeDeletion cannot depend on EdgeUpdates that set incoming edges to node being deleted.");
- }
- // Register our dependencies' inverse dependencies + detect conflicts:
- node.registerDeletion(this);
- }
- getDependencies(): [Delta, string][] {
- return [
- [this.node, "D"],
- ...this.afterSrc.map(u => [u, "A"] as [Delta, string]),
- ...this.afterTgt.map(u => [u, "A"] as [Delta, string]),
- ];
- }
- accept(visitor: DeltaVisitor) {
- visitor.visitNodeDeletion(this);
- };
- serialize(): any {
- return {
- ...super.serialize(),
- type: "NodeDeletion",
- node: this.node.hash.toString('hex'),
- afterSrc: this.afterSrc.map(u => u.hash.toString('hex')),
- afterTgt: this.afterTgt.map(u => u.hash.toString('hex')),
- };
- }
- }
- // Detects write/write and write/read conflicts.
- export abstract class Edge extends PrimitiveDelta {
- readonly source: NodeCreation;
- readonly label: string;
- // Inverse dependencies
- readonly overwrittenBy: EdgeUpdate[] = [];
- readonly readBy: EdgeUpdate[] = [];
- constructor(hash: Buffer, description: string, source: NodeCreation, label: string) {
- super(hash, description);
- this.source = source;
- this.label = label;
- }
- abstract getDependencies(): readonly [Delta,string][];
- // Iterate over all overwriters, from early to late, depth-first.
- // When in some branch the 'condition' is satisfied, the satisfying element is yielded, and the descendants ignored.
- *iterOverwriters(filter: (u: EdgeUpdate) => boolean): Iterable<EdgeUpdate> {
- for (const ovr of this.overwrittenBy) {
- if (filter(ovr)) {
- yield ovr;
- }
- else {
- yield* ovr.iterOverwriters(filter);
- }
- }
- }
- registerWrite(write: EdgeUpdate) {
- for (const other of this.overwrittenBy) {
- // A write conflicts with all other writes:
- registerConflict(write, other, "U/U");
- }
- for (const read of this.readBy) {
- if (read !== write && !write.afterReads.includes(read)) {
- // A write conflicts with all reads:
- registerConflict(read, write, "R/U");
- }
- }
- this.overwrittenBy.push(write);
- // Also check conflicts with deletions of source:
- this.source.registerEdgeUpdate(write);
- }
- registerRead(read: EdgeUpdate) {
- for (const write of this.overwrittenBy) {
- if (read !== write) {
- // A read conflicts with all writes:
- registerConflict(read, write, "R/U");
- }
- }
- this.readBy.push(read);
- }
- }
- // Represents the addition of an outgoing edge to a node, without assigning a target to the edge yet.
- // The edge can be given a target with an EdgeUpdate.
- // EdgeCreation conflicts with any concurrent 'ReadAllOutgoing'.
- export class EdgeCreation extends Edge {
- // Dependencies:
- readonly after: readonly ReadAllOutgoing[];
- constructor(hash: Buffer, source: NodeCreation, label: string, after: readonly ReadAllOutgoing[]) {
- super(hash, `E(${JSON.stringify(source.id)}.${label})`, source, label);
- this.after = after;
- // Register our dependencies' inverse dependencies + detect conflicts:
- source.registerEdgeCreation(this);
- }
- getDependencies(): readonly [Delta, string][] {
- return [
- [this.source, "SRC"],
- ...this.after.map(a => [a, "A"] as [Delta,string]),
- ];
- }
- accept(visitor: DeltaVisitor) {
- visitor.visitEdgeCreation(this);
- }
- serialize() {
- return {
- ...super.serialize(),
- type: "EdgeCreation",
- source: this.source.hash.toString('hex'),
- label: this.label,
- after: this.after.map(a => a.hash.toString('hex')),
- };
- }
- }
- export class EdgeUpdate extends Edge {
- // Dependencies
- readonly overwrites: Edge;
- readonly reads: readonly Edge[];
- readonly target: Target;
- readonly afterReads: readonly EdgeUpdate[];
- // Inverse dependencies
- // readonly overwritable: ExistingEdge;
- constructor(hash: Buffer, overwrites: Edge, target: Target, reads: readonly Edge[], afterReads: readonly EdgeUpdate[]) {
- super(hash, `U(${JSON.stringify(overwrites.source.id)}.${overwrites.label}=${target.value instanceof NodeCreation ? target.value.description : target.value})`, overwrites.source, overwrites.label);
- // Record our own dependencies:
- this.overwrites = overwrites;
- this.target = target;
- this.reads = reads;
- this.afterReads = afterReads;
- // Register our dependencies' inverse dependencies + detect conflicts:
- overwrites.registerWrite(this);
- target.registerDependency(this);
- reads.forEach(r => r.registerRead(this));
- }
- // Makes code slightly easier to read
- overwrite() {
- return this;
- }
- // Makes code slightly easier to read
- read() {
- return this;
- }
- getDependencies(): readonly [Delta,string][] {
- return [
- [this.overwrites, "U"],
- ...this.target.getDependencies(),
- ...this.reads.map(r => [r, "R"] as [Delta,string]),
- ...this.afterReads.map(a => [a, "A"] as [Delta, string]),
- ];
- }
- accept(visitor: DeltaVisitor) {
- visitor.visitEdgeUpdate(this);
- };
- serialize(): any {
- return {
- ...super.serialize(),
- type: "EdgeUpdate",
- overwrites: this.overwrites.serialize(),
- target: this.target.serialize(),
- reads: this.reads.map(r => r.serialize()),
- afterReads: this.afterReads.map(a => a.hash.toString('hex')),
- };
- }
- }
- // Target of an edge
- export interface Target {
- value: NodeCreation | PrimitiveValue;
- registerDependency(u: EdgeUpdate): void;
- getDependencies(): readonly [Delta, string][];
- serialize(): any;
- }
- export class TargetNode implements Target {
- value: NodeCreation;
- constructor(value: NodeCreation) {
- this.value = value;
- }
- registerDependency(u: EdgeUpdate): void {
- this.value.registerIncomingEdge(u);
- }
- getDependencies(): readonly [Delta, string][] {
- return [[this.value, "TGT"]];
- }
- serialize() {
- return {type: "TargetNode", node: this.value.hash.toString('hex')};
- }
- }
- export class TargetValue implements Target {
- value: PrimitiveValue;
- constructor(value: PrimitiveValue) {
- this.value = value;
- }
- registerDependency(): void {}
- getDependencies(): readonly [Delta, string][] {
- return [];
- }
- serialize() {
- return {type: "TargetValue", value: this.value};
- }
- }
- // One of the possibilities explored was to have a separate delta for Reading an Edge.
- // Problem is that deltas have content-based IDs, and all concurrent Reads (of the same Write) would be identical.
- // If two EdgeUpdates both read the same value, but one of them also overwrites this value, then there would not be a R/U conflict because they both perform the 'same' Read (same ID), but there should really be a conflict.
- // To solve this issue, every Read would need an additional unique ID attribute, but this is overkill: two concurrent identical EdgeUpdates that depend on an identical Read, would then become different (conflicting) operations, while they do exactly the same thing.
- // So I conclude that it's better for Reads to really just be dependencies of EdgeUpdates.
- // export class EdgeRead extends PrimitiveDelta {
- // // Every read has a unique ID (otherwise, different concurrent reads would be represented by the same delta)
- // readonly id: UUID;
- // // Dependencies
- // readonly reads: ExistingEdge;
- // constructor(hash: Buffer, id: UUID, reads: ExistingEdge) {
- // super(hash, `R(${reads.label})`);
- // // Record our own dependencies:
- // this.id = id;
- // this.reads = reads;
- // reads.registerRead(this);
- // }
- // getDependencies(): readonly [Delta, string][] {
- // return [[this.reads.delta, "R"]];
- // }
- // serialize(): any {
- // return {
- // ...super.serialize(),
- // id: this.id,
- // reads: this.reads.serialize(),
- // };
- // }
- // }
- export class Transaction extends Delta {
- readonly deltas: readonly Delta[];
- readonly dependencies: readonly [Transaction, string][];
- constructor(hash: Buffer, deltas: readonly Delta[], description: string, dependencies: readonly [Transaction, string][]) {
- super(hash, description);
- this.deltas = deltas;
- this.dependencies = dependencies;
- // TODO: validate dependencies.
- // Derive conflicts from deltas
- for (const delta of deltas) {
- for (const [conflictingDelta, conflictType] of delta.conflictsWith) {
- if (deltas.includes(conflictingDelta)) {
- throw new Error("Cannot create a composite delta out of conflicting deltas");
- }
- const conflictingTxs = conflictingDelta.partOf;
- for (const otherTx of conflictingTxs) {
- if (!this.conflictsWith.some(([c]) => c === otherTx)) {
- registerConflict(this, otherTx, conflictType);
- }
- }
- }
- }
- for (const [dependency] of dependencies) {
- for (const [c, conflictKind] of this.conflictsWith) {
- if (c === dependency) {
- // // only for debugging, print all the detailed dependencies:
- // const detailedDependencies: [Delta,Delta,string][] = [];
- // for (const d of deltas) {
- // for (const d2 of dependency.deltas) {
- // for (const [dd, kind] of d.getDependencies()) {
- // if (dd === d2) {
- // detailedDependencies.push([d, d2, kind]);
- // }
- // }
- // }
- // }
- // console.log({tx: deltas, dependency: dependency.deltas, detailedDependencies});
- throw new Error(`Assertion failed: Transaction '${description}' (${hash.toString('hex').substring(0,8)}) depends on another conflicting (${conflictKind}) transaction '${dependency.description}' (${dependency.hash.toString('hex').substring(0,8)}).`);
- }
- }
- }
- deltas.forEach(d => d.partOf.push(this));
- }
- getDependencies(): readonly [Delta,string][] {
- return this.dependencies;
- }
- serialize(): any {
- return {
- ...super.serialize(),
- type: "Transaction",
- deltas: this.deltas.map(d => d.hash.toString('hex')),
- description: this.description,
- dependencies: this.dependencies.map(([d, kind]) => ({tx: d.hash.toString('hex'), kind})),
- };
- }
- accept(visitor: DeltaVisitor) {
- visitor.visitTransaction(this);
- };
- *iterPrimitiveDeltas() {
- for (const d of this.deltas) {
- yield* d.iterPrimitiveDeltas();
- }
- }
- }
- // Given a set of deltas that we are trying to glue together in a (new) transaction, what other transactions should this new transaction depend on?
- // Argument 'currentDeltas' is a set of deltas to be considered as possible dependencies. Typically you only want to consider the deltas that make up the current version. This is decide which transaction to depend on, if a delta is contained by multiple transactions. If this argument is left undefined, then an error will be thrown if one of the deltas is contained by multiple transactions.
- export function findTxDependencies(deltas: Delta[], candidates?: Set<Delta>): [Transaction, string][] {
- const txDependencies: Map<Transaction,Set<string>> = new Map();
- for (const delta of deltas) {
- const dependencies = delta.getDependencies();
- for (const [dependency, kind] of dependencies) {
- if (!deltas.includes(dependency)) {
- const txs = dependency.partOf;
- const filteredTxs =
- candidates !== undefined
- ? txs.filter(tx => candidates!.has(tx))
- : txs;
- if (filteredTxs.length > 1) {
- // This error can never occur when passing a proper 'candidates' argument.
- throw new Error("Error: One of the composite's dependencies is contained by multiple composites.");
- }
- if (filteredTxs.length === 0) {
- throw new Error("Assertion failed: delta " + delta.description + " depends on " + dependency.description + " but this dependency could not be found in a composite.");
- }
- const [tx] = filteredTxs;
- const kinds = txDependencies.get(tx) || (() => {
- const kinds = new Set<string>();
- txDependencies.set(tx, kinds);
- return kinds;
- })();
- kinds.add(kind);
- }
- }
- }
- return [...txDependencies.entries()].map(([tx, kinds]) => [tx, [...kinds.values()].join(',')]);
- }
|