import {inspect} from "util"; // NodeJS library import {createHash} from "crypto"; import {Buffer} from "buffer"; import * as _ from "lodash"; import {UUID, PrimitiveValue} from "./types"; import {bufferXOR} from "./buffer_xor"; import {Delta} from "./delta"; export interface PrimitiveDelta extends Delta { } export class NodeCreation implements PrimitiveDelta { readonly id: UUID; readonly hash: Buffer; readonly description: string; // Inverse dependency: Deletions of this node. deletions: Array = []; // append-only // Inverse dependency: Creation outgoing edges. outgoingEdges: Array = []; // append-only // Inverse dependency: All the times this node was the target of an edge update. incomingEdges: Array = []; // append-only constructor(hash: Buffer, id: UUID) { this.hash = hash; this.id = id; this.description = "NEW("+this.id.value.toString().slice(0,8)+")"; } getDependencies(): [] { return []; } getTypedDependencies(): [] { return []; } getConflicts(): [] { return []; } getHash(): Buffer { return this.hash; } getDescription(): string { return this.description; } // pretty print to console under NodeJS [inspect.custom](depth: number, options: object) { return "NodeCreation{" + inspect(this.id, options) + "}"; } toString(): string { return this[inspect.custom](0, {}); } serialize(): any { return { type: "NodeCreation", id: this.id.value, } } *iterPrimitiveDeltas(): Iterable { yield this; } } export class NodeDeletion implements PrimitiveDelta { readonly hash: Buffer; readonly description: string; // Dependency: The node being deleted. readonly creation: NodeCreation; // Dependency: All outgoing edges of the deleted node must be deleted also. readonly deletedOutgoingEdges: Array; // Dependency: For every time the deleted node was target of an edge, the deletion depends on the EdgeUpdate that sets this edge to have a different target. readonly afterIncomingEdges: Array; // Conflicts: Concurrent deletion of the same node. deleteConflicts: Array = []; // Conflicts: Concurrent creation of an edge with as source the deleted node. edgeSourceConflicts: Array = []; // Conflicts: Concurrent update and deletion of an edge (because its source node is deleted). updateConflicts: Array = []; // Conflicts: Concurrent creation/update of an edge with as target the deleted node. edgeTargetConflicts: Array = []; // Parameters: // deletedOutgoingEdges: For every outgoing edge of this node being deleted, must explicitly specify the most recent EdgeCreation/EdgeUpdate on this edge, to make it explicit that this deletion happens AFTER the EdgeCreation/EdgeUpdate (instead of concurrently, which is a conflict). // afterIncomingEdges: For every edge that is or was (once) incoming to this node, must explicitly specify an EdgeUpdate/NodeDeletion that makes this edge point somewhere else (no longer to this node). constructor(hash: Buffer, creation: NodeCreation, deletedOutgoingEdges: Array, afterIncomingEdges: Array) { this.hash = hash; this.creation = creation; this.deletedOutgoingEdges = deletedOutgoingEdges; this.afterIncomingEdges = afterIncomingEdges; // Check some assertions if (_.uniq(deletedOutgoingEdges).length !== deletedOutgoingEdges.length) { throw new Error("Assertion failed: deletedOutgoingEdges contains duplicates."); } if (_.uniq(afterIncomingEdges).length !== afterIncomingEdges.length) { throw new Error("Assertion failed: deletedOutgoingEdges contains duplicates."); } for (const supposedlyOutgoingEdge of this.deletedOutgoingEdges) { if (supposedlyOutgoingEdge.getCreation().source !== this.creation) { throw new Error("Assertion failed: Every element of delOutgoings must be an EdgeCreation or EdgeUpdate of an outgoing edge of the deleted node.") } } for (const supposedlyIncomingEdge of this.afterIncomingEdges) { if (supposedlyIncomingEdge instanceof NodeDeletion) { // should check if the NodeDeletion deletes an edge that was *once* let isReallyIncomingEdge = false; for (const deletedEdge of supposedlyIncomingEdge.deletedOutgoingEdges) { let current: EdgeCreation|EdgeUpdate|undefined = deletedEdge; while (current !== undefined) { if (current.target.getTarget() === this.creation) { isReallyIncomingEdge = true; break; } if (current instanceof EdgeUpdate) current = current.overwrites else break; } if (isReallyIncomingEdge) break; } if (!isReallyIncomingEdge) { throw new Error("Assertion failed: NodeDeletion in afterIncomingEdges does not delete an incoming edge."); } } else { if (supposedlyIncomingEdge.target.getTarget() === this.creation) { throw new Error("Assertion failed: Every element in afterIncomingEdges MUST set the target of the edge to SOME PLACE ELSE."); } if (![...supposedlyIncomingEdge.iterUpdates()].some(e => e.target.getTarget() === this.creation)) { throw new Error("Assertion failed: None of the EdgeUpdates of supposedlyIncomingEdge set the target to the node being deleted."); } } } this.description = "DEL("+this.creation.id.value.toString().slice(0,8)+")" // Detect conflicts // Delete/delete for (const concurrentDeletion of this.creation.deletions) { // Symmetric: this.deleteConflicts.push(concurrentDeletion); concurrentDeletion.deleteConflicts.push(this); } // Concurrently created outgoing edges of this node for (const outgoingEdgeCreation of this.creation.outgoingEdges) { if (!this.deletedOutgoingEdges.some(edge => edge.getCreation() === outgoingEdgeCreation)) { // Conflict: The deleted node has an outgoing edge that this deletion does not depend on. // Symmetric this.edgeSourceConflicts.push(outgoingEdgeCreation); outgoingEdgeCreation.deleteSourceConflicts.push(this); } } // Related to previous conflict type: Concurrent edge updates for (const deletedEdge of this.deletedOutgoingEdges) { for (const concurrentEdgeUpdate of deletedEdge.overwrittenBy) { if (this.afterIncomingEdges.includes(concurrentEdgeUpdate)) { // This is a special case that can occur when a node with a self-edge is deleted. // Not a conflict. } else { // Conflict: Edge concurrently updated and deleted. // Symmetric this.updateConflicts.push(concurrentEdgeUpdate); concurrentEdgeUpdate.updateConflicts.push(this); } } } function overwritesEdge(op: EdgeCreation | EdgeUpdate | NodeDeletion, edge: EdgeUpdate | EdgeCreation) { if (op === edge) { return true; } if (op instanceof EdgeUpdate) { return overwritesEdge(op.overwrites, edge); } if (op instanceof NodeDeletion) { return op.deletedOutgoingEdges.some(deletedEdge => overwritesEdge(deletedEdge, edge)); } return false; } // Concurrently updated incoming edges of this node for (const incomingEdge of this.creation.incomingEdges) { // every incoming edge of deleted node must have been overwritten by an EdgeUpdate that is an explicit dependency: if (!this.afterIncomingEdges.some(edge => overwritesEdge(edge, incomingEdge))) { // Symmetric this.edgeTargetConflicts.push(incomingEdge); incomingEdge.target.addDeleteTargetConflict(this); } } // Create inverse dependencies this.creation.deletions.push(this); for (const deletedEdge of this.deletedOutgoingEdges) { // NodeDeletion acts a bit as an EdgeUpdate here deletedEdge.overwrittenBy.push(this); } } getDependencies(): Array { return Array().concat( [this.creation], this.deletedOutgoingEdges, this.afterIncomingEdges, ); } getTypedDependencies(): Array<[Delta, string]> { return Array<[Delta, string]>().concat( [[this.creation, "DEL"]], this.deletedOutgoingEdges.map(edge => ([edge, "D"])), this.afterIncomingEdges.map(edge => ([edge, "A"])), ); } getConflicts(): Array { return Array().concat( this.deleteConflicts, this.edgeSourceConflicts, this.edgeTargetConflicts, this.updateConflicts, ); } getHash(): Buffer { return this.hash; } getDescription(): string { return this.description; } // pretty print to console under NodeJS [inspect.custom](depth: number, options: object) { return "NodeDeletion{" + inspect(this.creation.id, options) + ",delEdges=" + this.deletedOutgoingEdges.map(e => inspect(e, options)).join(",") + ",after=" + this.afterIncomingEdges.map(e => inspect(e, options)).join(",") + "}"; } toString(): string { return this[inspect.custom](0, {}); } serialize(): any { return { type: "NodeDeletion", creation: this.creation.hash.toString('base64'), deletedOutgoingEdges: this.deletedOutgoingEdges.map(d => d.hash.toString('base64')), afterIncomingEdges: this.afterIncomingEdges.map(d => d.hash.toString('base64')), }; } *iterPrimitiveDeltas(): Iterable { yield this; } } // Target of an edge can be: another node, nothing (edge doesn't exist) or a value (i.e., string, number or boolean) export type EdgeTargetType = NodeCreation | null | PrimitiveValue; // Target of an edge can be either: (1) another node, (2) a value or (3) null (hides the edge - initially all edges are assumed to be null). export interface SetsTarget { getTarget(): EdgeTargetType; addDeleteTargetConflict(nodeDeletion: NodeDeletion); getDeleteTargetConflicts(): ReadonlyArray; getDependencies(): ReadonlyArray; getTypedDependencies(): ReadonlyArray<[NodeCreation, string]>; getHash(): Buffer; serialize(): any; } // Common functionality in EdgeCreation and EdgeUpdate: both set the target of an edge, and this can conflict with the deletion of the target. class SetsTargetToNode implements SetsTarget { // Dependency private readonly targetNode: NodeCreation; // Conflict: Concurrent deletion of target node. private deleteTargetConflicts: Array = []; // append-only constructor(targetNode: NodeCreation, edgeOperation: EdgeCreation|EdgeUpdate) { this.targetNode = targetNode; // Concurrent deletion of target node if (this.targetNode instanceof NodeCreation) { for (const targetDeletion of this.targetNode.deletions) { if (targetDeletion.afterIncomingEdges.some(edge => { while (true) { if (edge === edgeOperation) return true; if (edge instanceof EdgeUpdate && edge.overwrites instanceof EdgeUpdate) edge = edge.overwrites; else return false; } })) { // this can never happen - something is very wrong if you get this error: throw new Error("Assertion failed - did not expect existing deletion to be aware of a new edge update"); } // Symmetric this.deleteTargetConflicts.push(targetDeletion); targetDeletion.edgeTargetConflicts.push(edgeOperation); } // Create inverse dependency this.targetNode.incomingEdges.push(edgeOperation); } } getTarget(): EdgeTargetType { return this.targetNode; } addDeleteTargetConflict(nodeDeletion: NodeDeletion) { this.deleteTargetConflicts.push(nodeDeletion); } getDeleteTargetConflicts(): ReadonlyArray { return this.deleteTargetConflicts; } getDependencies(): ReadonlyArray { return [this.targetNode]; } getTypedDependencies(): ReadonlyArray<[NodeCreation, string]> { return [[this.targetNode, "TGT"]]; } getHash(): Buffer { return this.targetNode.hash; } [inspect.custom](depth: number, options: object) { return inspect(this.targetNode.id, options); } serialize(): any { return { type: "node", creation: this.targetNode.hash.toString('base64'), } } } class SetsTargetToValue implements SetsTarget { readonly value: PrimitiveValue | null; constructor(value: PrimitiveValue | null) { this.value = value; } getTarget(): EdgeTargetType { return this.value; } addDeleteTargetConflict(nodeDeletion: NodeDeletion) { throw new Error("Assertion error: SetsTargetToValue cannot be involved in conflict with NodeDeletion"); } getDeleteTargetConflicts(): ReadonlyArray { return []; } getDependencies(): ReadonlyArray { return []; } getTypedDependencies(): ReadonlyArray<[NodeCreation, string]> { return []; } getHash(): Buffer { return Buffer.from(JSON.stringify(this.value)); } [inspect.custom](depth: number, options: object) { return inspect(this.value, options); } serialize(): any { return { type: "value", value: this.value, } } } function makeSetsTarget(target: EdgeTargetType, edgeOperation: EdgeCreation|EdgeUpdate) { if (target instanceof NodeCreation) { return new SetsTargetToNode(target, edgeOperation); } else { return new SetsTargetToValue(target); } } export class EdgeCreation implements PrimitiveDelta { // Dependencies readonly source: NodeCreation; readonly label: string; readonly target: SetsTarget; readonly hash: Buffer; readonly description: string; // Inverse dependency // NodeDeletion if source of edge is deleted. overwrittenBy: Array = []; // append-only // Conflicts: Concurrent creations of the same edge. createConflicts: Array = []; // append-only // Conflicts: Concurrent deletions of source node. deleteSourceConflicts: Array = []; // append-only constructor(hash: Buffer, source: NodeCreation, label: string, target: EdgeTargetType) { this.hash = hash; this.source = source; this.label = label; this.target = makeSetsTarget(target, this); this.description = "EDG("+this.label+")"; // Detect conflicts // Create/create for (const outgoingEdge of this.source.outgoingEdges) { if (outgoingEdge.label === this.label) { // Symmetric: this.createConflicts.push(outgoingEdge); outgoingEdge.createConflicts.push(this); } } // Concurrent deletions of source node for (const sourceDeletion of this.source.deletions) { if (sourceDeletion.deletedOutgoingEdges.some(edge => edge.getCreation() === this)) { // this can never happen - something is very wrong if you get this error: throw new Error("Assertion failed - did not expect existing deletion to be aware of a new edge creation"); } // Symmetric this.deleteSourceConflicts.push(sourceDeletion); sourceDeletion.edgeSourceConflicts.push(this); } // Create inverse dependency this.source.outgoingEdges.push(this); } // Helper getCreation(): EdgeCreation { return this; } getDependencies(): Array { return [this.source, ...this.target.getDependencies()]; } getTypedDependencies(): Array<[NodeCreation, string]> { return [[this.source, "SRC"], ...this.target.getTypedDependencies()]; } getConflicts(): Array { return Array().concat( this.createConflicts, this.deleteSourceConflicts, this.target.getDeleteTargetConflicts(), ); } getHash(): Buffer { return this.hash; } getDescription(): string { return this.description; } // pretty print to console under NodeJS [inspect.custom](depth: number, options: object) { return "EdgeCreation{src=" + inspect(this.source.id, options) + ",tgt=" + inspect(this.target, options) + ",label=" + this.label + "}"; } toString(): string { return this[inspect.custom](0, {}); } serialize(): any { return { type: "EdgeCreation", source: this.source.hash.toString('base64'), label: this.label, target: this.target.serialize(), }; } *iterPrimitiveDeltas(): Iterable { yield this; } } export class EdgeUpdate implements PrimitiveDelta { // Dependencies readonly overwrites: EdgeCreation | EdgeUpdate; readonly target: SetsTarget; readonly hash: Buffer; readonly description: string; // Inverse dependency // NodeDeletion if source of edge is deleted. overwrittenBy: Array = []; // append-only // Conflicts: Concurrent updates updateConflicts: Array = []; // append-only constructor(hash: Buffer, overwrites: EdgeCreation | EdgeUpdate, newTarget: EdgeTargetType) { this.hash = hash; this.overwrites = overwrites; this.target = makeSetsTarget(newTarget, this); this.description = "EDG"; // Detect conflicts // Concurrent updates (by EdgeUpdate or NodeDeletion) for (const concurrentUpdate of this.overwrites.overwrittenBy) { // Symmetric this.updateConflicts.push(concurrentUpdate); concurrentUpdate.updateConflicts.push(this); } // Create inverse dependency this.overwrites.overwrittenBy.push(this); } // Helper getCreation(): EdgeCreation { return this.overwrites.getCreation(); } getDependencies(): Array { return [this.overwrites, ...this.target.getDependencies()]; } getTypedDependencies(): Array<[Delta, string]> { return [[this.overwrites, "UPD"], ...this.target.getTypedDependencies()]; } getConflicts(): Array { return Array().concat( this.updateConflicts, this.target.getDeleteTargetConflicts(), ); } getHash(): Buffer { return this.hash; } getDescription(): string { return this.description; } // pretty print to console under NodeJS [inspect.custom](depth: number, options: object) { return "EdgeUpdate{upd=" + inspect(this.overwrites, options) + ",tgt=" + inspect(this.target, options) + "}"; } toString(): string { return this[inspect.custom](0, {}); } serialize(): any { return { type: "EdgeUpdate", overwrites: this.overwrites.hash.toString('base64'), target: this.target.serialize(), }; } *iterUpdates() { let current: EdgeUpdate | EdgeCreation = this; while (true) { yield current; if (current instanceof EdgeUpdate) { current = current.overwrites; } else { return; } } } *iterPrimitiveDeltas(): Iterable { yield this; } } function targetToHash(target: EdgeTargetType): any { // just needs to return something "unique" to feed into the hash function if (target instanceof NodeCreation) { return target.hash; } else { return "value" + JSON.stringify(target); } } // Ensures that deltas with the same hash are only created once. export class PrimitiveRegistry { deltas: Map = new Map(); private createIdempotent(hash: Buffer, callback) { const base64 = hash.toString('base64'); return this.deltas.get(base64) || (() => { const delta = callback(); this.deltas.set(base64, delta); return delta; })(); } newNodeCreation(id: UUID): NodeCreation { const hash = createHash('sha256') .update(JSON.stringify(id.value)) // prevent collisions between 'true' (actual boolean) and '"true"' (string "true"), or 42 (number) and "42" (string) .digest(); return this.createIdempotent(hash, () => new NodeCreation(hash, id)); } newNodeDeletion(creation: NodeCreation, deletedOutgoingEdges: Array, afterIncomingEdges: Array): NodeDeletion { // hash will be calculated based on all EdgeCreations/EdgeUpdates/NodeDeletions that we depend on, by XOR (insensitive to array order). let hash = createHash('sha256').update(creation.hash); let union = Buffer.alloc(32); // all zeroes - neutral element for XOR for (const deletedOutgoing of deletedOutgoingEdges) { union = bufferXOR(union, deletedOutgoing.getHash()); } for (const afterIncoming of afterIncomingEdges) { union = bufferXOR(union, afterIncoming.getHash()); } const buf = hash.update(union).digest(); return this.createIdempotent(buf, () => new NodeDeletion(buf, creation, deletedOutgoingEdges, afterIncomingEdges)); } newEdgeCreation(source: NodeCreation, label: string, target: EdgeTargetType): EdgeCreation { const hash = createHash('sha256') .update(source.hash) .update('create').update(label) .update('target=').update(targetToHash(target)) .digest(); return this.createIdempotent(hash, () => new EdgeCreation(hash, source, label, target)); } newEdgeUpdate(overwrites: EdgeCreation | EdgeUpdate, target: EdgeTargetType): EdgeUpdate { const hash = createHash('sha256') .update(overwrites.hash) .update('target=').update(targetToHash(target)) .digest(); return this.createIdempotent(hash, () => new EdgeUpdate(hash, overwrites, target)); } }