|
|
@@ -1,7 +1,9 @@
|
|
|
import {Buffer} from "buffer";
|
|
|
import {PrimitiveValue, UUID} from "./types";
|
|
|
+import { bufferXOR, buffersXOR } from "../util/buffer_xor";
|
|
|
+import { createHash } from "crypto";
|
|
|
|
|
|
-type ConflictType = "U/U" | "U/R" | "U/D" | "D/D";
|
|
|
+type ConflictType = "U/U" | "R/U" | "R*/U" | "U/D" | "D/D";
|
|
|
|
|
|
export abstract class Delta {
|
|
|
readonly hash: Buffer;
|
|
|
@@ -26,7 +28,7 @@ export abstract class Delta {
|
|
|
|
|
|
serialize(): any {
|
|
|
return {
|
|
|
- hash: this.hash.toString('base64'),
|
|
|
+ hash: this.hash.toString('hex'),
|
|
|
type: this.constructor.name,
|
|
|
}
|
|
|
}
|
|
|
@@ -54,6 +56,7 @@ export class NodeCreation extends PrimitiveDelta {
|
|
|
// Inverse dependencies
|
|
|
readonly outgoingEdges: Map<string, NewEdge> = new Map();
|
|
|
readonly incomingEdges: EdgeUpdate[] = []; // all deltas EVER that set this node as target of an edge.
|
|
|
+ readonly readAllOutgoing: ReadAllOutgoing[] = [];
|
|
|
readonly deletions: NodeDeletion[] = [];
|
|
|
|
|
|
constructor(hash: Buffer, id: UUID) {
|
|
|
@@ -65,11 +68,12 @@ export class NodeCreation extends PrimitiveDelta {
|
|
|
return [];
|
|
|
}
|
|
|
|
|
|
- createOutgoingEdge(label: string): NewEdge {
|
|
|
- return this.outgoingEdges.get(label) || (() => {
|
|
|
- const ovr = new NewEdge(this, label);
|
|
|
- this.outgoingEdges.set(label, ovr);
|
|
|
- return ovr;
|
|
|
+ createOutgoingEdge(label: string, after: ReadAllOutgoing[] = []): NewEdge {
|
|
|
+ const edgeId = label + ':' + buffersXOR(...after.map(a => a.hash)).toString('hex');
|
|
|
+ return this.outgoingEdges.get(edgeId) || (() => {
|
|
|
+ const newEdge = new NewEdge(this, label, after, edgeId);
|
|
|
+ this.outgoingEdges.set(edgeId, newEdge);
|
|
|
+ return newEdge;
|
|
|
})();
|
|
|
}
|
|
|
|
|
|
@@ -78,6 +82,11 @@ export class NodeCreation extends PrimitiveDelta {
|
|
|
for (const d of this.deletions) {
|
|
|
registerConflict(d, u, "U/D");
|
|
|
}
|
|
|
+ for (const r of this.readAllOutgoing) {
|
|
|
+ if (!(u.overwrites as NewEdge).after.includes(r)) {
|
|
|
+ registerConflict(r, u, "R*/U");
|
|
|
+ }
|
|
|
+ }
|
|
|
// outgoing edge already stored in Edge.
|
|
|
}
|
|
|
|
|
|
@@ -90,7 +99,7 @@ export class NodeCreation extends PrimitiveDelta {
|
|
|
}
|
|
|
|
|
|
registerDeletion(d: NodeDeletion) {
|
|
|
- // A new deletion will conflict with all earliest incoming/outgoing edge operations that the deletion does not explicitly depend on.
|
|
|
+ // A new deletion will conflict with all earliest incoming/outgoing edge operations that the deletion does not transitively explicitly depend on.
|
|
|
for (const o of this.outgoingEdges.values()) {
|
|
|
for (const u of o.iterOverwriters(u => !d.afterSrc.some(a => a.hasTransitiveDependency(u)))) {
|
|
|
registerConflict(d, u, "U/D");
|
|
|
@@ -99,7 +108,7 @@ export class NodeCreation extends PrimitiveDelta {
|
|
|
for (const i of this.incomingEdges.filter(i => !d.afterTgt.some(a => a.hasTransitiveDependency(i)))) {
|
|
|
registerConflict(i, d, "U/D");
|
|
|
}
|
|
|
- // NodeDeletion/delete conflict
|
|
|
+ // Delete/Delete conflict
|
|
|
for (const other of this.deletions) {
|
|
|
if (other !== d) {
|
|
|
registerConflict(d, other, "D/D");
|
|
|
@@ -107,12 +116,49 @@ export class NodeCreation extends PrimitiveDelta {
|
|
|
}
|
|
|
this.deletions.push(d);
|
|
|
}
|
|
|
+
|
|
|
+ registerReadAllOutgoing(r: ReadAllOutgoing) {
|
|
|
+ // ReadAllOutgoing will conflict with all outgoing edge creations that are not a dependency of ReadAllOutgoing
|
|
|
+ // (in essence just a R/W conflict)
|
|
|
+ for (const o of this.outgoingEdges.values()) {
|
|
|
+ if (!r.after.includes(o)) {
|
|
|
+ // TODO: We could turn NewEdge into an actual delta type. Then we could just conflict with that delta (possibly more efficient, maybe more elegant)
|
|
|
+ for (const u of o.overwrittenBy) {
|
|
|
+ registerConflict(r, u, "R*/U");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ this.readAllOutgoing.push(r);
|
|
|
+ }
|
|
|
|
|
|
serialize(): any {
|
|
|
return {...super.serialize(), id: this.id};
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// This delta represents getting all outgoing edges of a node.
|
|
|
+// For instance, when getting all the elements of a set (e.g., when checking some constraint), or iterating over a dictionary.
|
|
|
+export class ReadAllOutgoing extends PrimitiveDelta {
|
|
|
+ // Dependencies:
|
|
|
+ readonly node: NodeCreation;
|
|
|
+ readonly after: readonly NewEdge[];
|
|
|
+
|
|
|
+ constructor(hash: Buffer, node: NodeCreation, after: NewEdge[]) {
|
|
|
+ super(hash, `R*(${node.id})`);
|
|
|
+ this.node = node;
|
|
|
+ this.after = after;
|
|
|
+ // Register inverse dependencies:
|
|
|
+ node.registerReadAllOutgoing(this);
|
|
|
+ }
|
|
|
+
|
|
|
+ getDependencies(): [Delta, string][] {
|
|
|
+ return [
|
|
|
+ [this.node, "N"],
|
|
|
+ ...([] as [Delta,string][]).concat(...this.after.map(a => a.overwrittenBy.map(u => [u, "A"] as [Delta, string]))),
|
|
|
+ ];
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
export class NodeDeletion extends PrimitiveDelta {
|
|
|
// Dependencies:
|
|
|
node: NodeCreation;
|
|
|
@@ -147,9 +193,9 @@ export class NodeDeletion extends PrimitiveDelta {
|
|
|
serialize(): any {
|
|
|
return {
|
|
|
...super.serialize(),
|
|
|
- node: this.node.hash.toString('base64'),
|
|
|
- afterSrc: this.afterSrc.map(u => u.hash.toString('base64')),
|
|
|
- afterTgt: this.afterTgt.map(u => u.hash.toString('base64')),
|
|
|
+ node: this.node.hash.toString('hex'),
|
|
|
+ afterSrc: this.afterSrc.map(u => u.hash.toString('hex')),
|
|
|
+ afterTgt: this.afterTgt.map(u => u.hash.toString('hex')),
|
|
|
};
|
|
|
}
|
|
|
}
|
|
|
@@ -160,8 +206,8 @@ export abstract class Edge {
|
|
|
readonly label: string;
|
|
|
|
|
|
// Inverse dependencies
|
|
|
- readonly concurrentlyWrittenBy: EdgeUpdate[] = [];
|
|
|
- readonly concurrentlyReadBy: EdgeUpdate[] = [];
|
|
|
+ readonly overwrittenBy: EdgeUpdate[] = [];
|
|
|
+ readonly readBy: EdgeUpdate[] = [];
|
|
|
|
|
|
constructor(source: NodeCreation, label: string) {
|
|
|
this.source = source;
|
|
|
@@ -174,7 +220,7 @@ export abstract class Edge {
|
|
|
// Iterate over all overwriters, from early to late, depth-first.
|
|
|
// When in some branch the 'condition' is satisfied, the satisfying element is yielded, and the descendants ignored.
|
|
|
*iterOverwriters(filter: (u: EdgeUpdate) => boolean): Iterable<EdgeUpdate> {
|
|
|
- for (const ovr of this.concurrentlyWrittenBy) {
|
|
|
+ for (const ovr of this.overwrittenBy) {
|
|
|
if (filter(ovr)) {
|
|
|
yield ovr;
|
|
|
}
|
|
|
@@ -185,37 +231,42 @@ export abstract class Edge {
|
|
|
}
|
|
|
|
|
|
registerWrite(write: EdgeUpdate) {
|
|
|
- for (const other of this.concurrentlyWrittenBy) {
|
|
|
+ for (const other of this.overwrittenBy) {
|
|
|
// A write conflicts with all other writes:
|
|
|
registerConflict(write, other, "U/U");
|
|
|
}
|
|
|
- for (const read of this.concurrentlyReadBy) {
|
|
|
+ for (const read of this.readBy) {
|
|
|
if (read !== write) {
|
|
|
// A write conflicts with all reads:
|
|
|
- registerConflict(read, write, "U/R");
|
|
|
+ registerConflict(read, write, "R/U");
|
|
|
}
|
|
|
}
|
|
|
- this.concurrentlyWrittenBy.push(write);
|
|
|
+ this.overwrittenBy.push(write);
|
|
|
|
|
|
// Also check conflicts with deletions of source:
|
|
|
this.source.registerOutgoingEdge(write);
|
|
|
}
|
|
|
|
|
|
registerRead(read: EdgeUpdate) {
|
|
|
- for (const write of this.concurrentlyWrittenBy) {
|
|
|
+ for (const write of this.overwrittenBy) {
|
|
|
if (read !== write) {
|
|
|
// A read conflicts with all writes:
|
|
|
- registerConflict(read, write, "U/R");
|
|
|
+ registerConflict(read, write, "R/U");
|
|
|
}
|
|
|
}
|
|
|
- this.concurrentlyReadBy.push(read);
|
|
|
+ this.readBy.push(read);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// An Edge that does not yet have a target
|
|
|
export class NewEdge extends Edge {
|
|
|
- constructor(source: NodeCreation, label: string) {
|
|
|
+ readonly after: readonly ReadAllOutgoing[];
|
|
|
+ readonly edgeId: string;
|
|
|
+
|
|
|
+ constructor(source: NodeCreation, label: string, after: readonly ReadAllOutgoing[], edgeId: string) {
|
|
|
super(source, label);
|
|
|
+ this.after = after;
|
|
|
+ this.edgeId = edgeId;
|
|
|
}
|
|
|
|
|
|
getDependencies(): [Delta,string][] {
|
|
|
@@ -225,7 +276,7 @@ export class NewEdge extends Edge {
|
|
|
serialize() {
|
|
|
return {
|
|
|
type: "NewEdge",
|
|
|
- source: this.source.hash.toString('base64'),
|
|
|
+ source: this.source.hash.toString('hex'),
|
|
|
label: this.label,
|
|
|
};
|
|
|
}
|
|
|
@@ -247,7 +298,7 @@ export class ExistingEdge extends Edge {
|
|
|
serialize() {
|
|
|
return {
|
|
|
type: "ExistingEdge",
|
|
|
- overwrites: this.delta.hash.toString('base64'),
|
|
|
+ overwrites: this.delta.hash.toString('hex'),
|
|
|
};
|
|
|
}
|
|
|
}
|
|
|
@@ -273,7 +324,7 @@ export class TargetNode implements Target {
|
|
|
return [[this.value, "TGT"]];
|
|
|
}
|
|
|
serialize() {
|
|
|
- return {type: "TargetNode", node: this.value.hash.toString('base64')};
|
|
|
+ return {type: "TargetNode", node: this.value.hash.toString('hex')};
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -321,8 +372,14 @@ export class EdgeUpdate extends PrimitiveDelta {
|
|
|
return this.overwritable;
|
|
|
}
|
|
|
|
|
|
+ // Makes code slightly easier to read
|
|
|
+ read() {
|
|
|
+ return this.overwritable;
|
|
|
+ }
|
|
|
+
|
|
|
getDependencies(): [Delta,string][] {
|
|
|
return this.overwrites.getDependencies()
|
|
|
+ .concat(...this.reads.map(r => r.getDependencies()))
|
|
|
.concat(this.target.getDependencies());
|
|
|
}
|
|
|
|
|
|
@@ -336,6 +393,30 @@ export class EdgeUpdate extends PrimitiveDelta {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// export class EdgeRead extends PrimitiveDelta {
|
|
|
+// // Every read has a unique ID (otherwise, different concurrent reads would be represented by the same delta)
|
|
|
+// readonly id: UUID;
|
|
|
+
|
|
|
+// // Dependencies
|
|
|
+// readonly reads: ExistingEdge;
|
|
|
+
|
|
|
+// constructor(hash: Buffer, id: UUID, reads: ExistingEdge) {
|
|
|
+// super(hash, `R(${reads.label})`);
|
|
|
+// // Record our own dependencies:
|
|
|
+// this.reads = reads;
|
|
|
+
|
|
|
+// reads.registerRead(this);
|
|
|
+// }
|
|
|
+
|
|
|
+// serialize(): any {
|
|
|
+// return {
|
|
|
+// ...super.serialize(),
|
|
|
+// id: this.id,
|
|
|
+// reads: this.reads.map(r => r.serialize()),
|
|
|
+// };
|
|
|
+// }
|
|
|
+// }
|
|
|
+
|
|
|
export class Transaction extends Delta {
|
|
|
readonly deltas: Delta[];
|
|
|
readonly dependencies: Transaction[];
|
|
|
@@ -376,9 +457,9 @@ export class Transaction extends Delta {
|
|
|
serialize(): any {
|
|
|
return {
|
|
|
...super.serialize(),
|
|
|
- deltas: this.deltas.map(d => d.hash.toString('base64')),
|
|
|
+ deltas: this.deltas.map(d => d.hash.toString('hex')),
|
|
|
description: this.description,
|
|
|
- dependencies: this.dependencies.map(d => d.hash.toString('base64')),
|
|
|
+ dependencies: this.dependencies.map(d => d.hash.toString('hex')),
|
|
|
};
|
|
|
}
|
|
|
|