123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313 |
- // NodeJS script
- const fs = require('fs/promises');
- const fsConstants = require('fs').constants;
- const path = require('path');
- const {EOL} = require('os');
- const http = require('http');
- const handler = require('serve-handler');
- const IDLENGTH = 36; // UUID v4
- const port = process.env.DRAWIOPORT || 8700;
- const stateDir = process.env.DRAWIOSTATEDIR || ".";
- async function startServer() {
// Directories holding the persisted state: operation files and session files.
const opsDir = path.join(stateDir, 'ops');
const sessionDir = path.join(stateDir, 'sessions');
// Fail fast at startup when either directory is missing or not readable and
// writable, instead of failing later on the first client request.
try {
  for (const dir of [opsDir, sessionDir]) {
    process.stdout.write("Can read/write in directory '" + dir + "' ? ...");
    await fs.access(dir, fsConstants.R_OK | fsConstants.W_OK);
    process.stdout.write("OK" + EOL);
  }
} catch (e) {
  process.stdout.write(EOL + "Please make sure the following directories exist and are writable:" + EOL);
  process.stdout.write(" " + opsDir + EOL);
  process.stdout.write(" " + sessionDir + EOL);
  // Include the underlying error so the user can tell which directory failed
  // and whether it is missing or lacks permissions.
  process.stdout.write("Cause: " + (e && e.message ? e.message : String(e)) + EOL);
  process.exit(1);
}
- const { WebSocketServer } = require('ws');
- const { v4: uuidv4 } = require("uuid");
/**
 * Returns a promise that is fulfilled, with no value, once `ms` milliseconds
 * have elapsed.
 */
function asyncSleep(ms) {
  return new Promise((done) => {
    setTimeout(done, ms);
  });
}
// HTTP server: logs each request line, then hands the request to serve-handler.
// serve-handler accepts two further optional arguments (config and middleware),
// see https://github.com/vercel/serve-handler#options
const httpServer = http.createServer(function (request, response) {
  console.log(request.method, request.url);
  return handler(request, response);
});
// WebSocket server attached to the same HTTP server, on the /websocket path.
const wsServer = new WebSocketServer({ server: httpServer, path: "/websocket" });
/**
 * Asynchronous access to the files of one directory, serialized per file.
 *
 * Operations on the same filename run strictly in the order they were
 * requested, so a read always observes the latest previously requested write.
 * Operations on different filenames are independent of each other.
 *
 * Filenames must be plain names inside the directory: anything that is empty,
 * is "." or "..", or contains a path separator is rejected, so that names taken
 * from client requests cannot address files outside the directory.
 */
class QueuedIO {
  /**
   * @param {string} baseDir - directory that every filename is resolved against
   */
  constructor(baseDir) {
    this.path = baseDir;
    // filename -> promise that settles once the most recently queued operation
    // on that file has finished. Entries are removed when a file becomes idle.
    this.queues = new Map();
  }

  /**
   * Queues `f(fullPath, ...args)` behind all earlier operations on `filename`.
   *
   * @param {Function} f - receives the full path followed by `args`
   * @param {string} filename - plain file name inside this directory
   * @param {...*} args - extra arguments forwarded to `f`
   * @returns {Promise<*>} settles with the outcome of `f`; rejects without
   *   touching the file system when `filename` is not a plain file name
   */
  _queuedIO(f, filename, ...args) {
    const isPlainName = typeof filename === 'string'
      && filename !== ''
      && filename !== '.'
      && filename !== '..'
      && path.basename(filename) === filename;
    if (!isPlainName) {
      return Promise.reject(new Error("Invalid file name: " + String(filename)));
    }
    const lastIO = this.queues.get(filename) || Promise.resolve();
    const nextIO = lastIO.then(() => f(path.join(this.path, filename), ...args));
    // The queue tail never rejects, so one failed operation does not block the
    // operations queued behind it. Once the tail settles and nothing newer has
    // been queued, the entry is dropped so the map does not grow with every
    // file ever touched.
    const release = () => {
      if (this.queues.get(filename) === tail) {
        this.queues.delete(filename);
      }
    };
    const tail = nextIO.then(release, release);
    this.queues.set(filename, tail);
    return nextIO;
  }

  /** Replaces the content of `filename` with the string `data` (UTF-8). */
  write(filename, data) {
    return this._queuedIO((fullPath, content) => fs.writeFile(fullPath, content, {encoding: 'utf8'}), filename, data);
  }

  /** Appends `data` to `filename` via fs.appendFile. */
  append(filename, data) {
    return this._queuedIO(fs.appendFile, filename, data);
  }

  /** Resolves to the fs.stat result for `filename`. */
  stat(filename) {
    return this._queuedIO(fs.stat, filename);
  }

  /** Resolves to the content of `filename` decoded as a UTF-8 string. */
  read(filename) {
    return this._queuedIO(fullPath => fs.readFile(fullPath, {encoding: 'utf8'}), filename);
  }

  /** Serializes `json` with JSON.stringify and writes it to `filename`. */
  writeJSON(filename, json) {
    return this.write(filename, JSON.stringify(json));
  }

  /** Reads `filename` and resolves to its content parsed with JSON.parse. */
  readJSON(filename) {
    return this.read(filename).then(text => JSON.parse(text));
  }
}
- // non-volatile state
- const opsDB = new QueuedIO(opsDir);
- const sessionDB = new QueuedIO(sessionDir);
/**
 * The set of sockets currently joined to one session, with a helper to send a
 * message to all of them.
 */
class Session {
  /**
   * @param {string} id - session identifier, included in every broadcast
   */
  constructor(id) {
    this.id = id;
    this.users = new Set();
  }

  /** Adds `socket` to the session's recipients. */
  join(socket) {
    this.users.add(socket);
  }

  /** Removes `socket` from the session's recipients. */
  leave(socket) {
    this.users.delete(socket);
  }

  /**
   * Sends `json`, wrapped as {type: "broadcast", sessionId, msg}, to every
   * joined socket except `skip`.
   *
   * @param {object} json - message to forward
   * @param {object} [skip] - socket that must not receive the message
   *   (typically the sender); omit to reach everyone
   */
  broadcast(json, skip) {
    if (json.type !== "update_cursor") {
      // Count only sockets that will actually be addressed.
      const recipients = this.users.size - (this.users.has(skip) ? 1 : 0);
      console.log("BROADCAST to", recipients, "subscribers...", json);
    }
    const stringified = JSON.stringify({type: "broadcast", sessionId: this.id, msg: json});
    for (const socket of this.users) {
      if (socket === skip) {
        continue; // don't echo
      }
      try {
        socket.send(stringified);
      } catch (e) {
        console.log("Error:", e, "Closing socket...");
        socket.close(); // in case of an error, it is the client's responsibility to re-subscribe and 'catch up'.
      }
    }
  }
}
// Registry of Session objects, keyed by session id.
const sessions = new Map();

/**
 * Returns the Session registered under `id`, creating and registering a new
 * one the first time that id is requested.
 */
function getSession(id) {
  if (!sessions.has(id)) {
    sessions.set(id, new Session(id));
  }
  return sessions.get(id);
}
- const lockedCellsGlobal = new Set();
- wsServer.on('connection', (socket, req) => {
// req.url holds only the path and query of the upgrade request, so a base is
// needed to parse it. A fixed placeholder base is used because only the query
// string is read, and the Origin header is absent for clients that do not
// send one; an undefined base would make the URL constructor throw.
const reqUrl = new URL(req.url, "ws://localhost");
// Identifier the client supplied for itself in the "me" query parameter
// (null when the parameter is missing).
const myId = reqUrl.searchParams.get('me');
console.log("Client connected.", myId);
// Helper attached to this connection's socket: serializes `json` and sends it.
// Every outgoing message except "pong" replies is logged first.
socket.sendJSON = function(json) {
  const isPong = json.type === "pong";
  if (!isPong) {
    console.log("<-", json);
  }
  const payload = JSON.stringify(json);
  this.send(payload);
};
/**
 * Handles the requests that are treated the same in every connection state.
 * Currently that is only "ping", which is answered with a "pong".
 *
 * @returns {true|undefined} true when the request was handled here
 */
function commonHandler(req) {
  if (req.type !== "ping") {
    return;
  }
  socket.sendJSON({type: "pong"});
  return true;
}
/**
 * Request handler for a connection that has not joined a session.
 *
 * handle() resolves to the handler that must process the connection's next
 * request: this same object, or a new JoinedHandler once a "join" or
 * "new_share" request has been answered.
 */
class LeftHandler {
  /**
   * @param {object} req - parsed client request; `req.type` selects the action
   * @returns {Promise<LeftHandler|JoinedHandler>} handler for the next request
   */
  async handle(req) {
    console.log("LeftHandler", req.type);
    if (commonHandler(req)) {
      return this;
    }
    if (req.type === "join") {
      const { reqId, sessionId } = req;
      // A session file is the concatenation of fixed-length operation IDs.
      const session = await sessionDB.read(sessionId).catch(e => {
        if (e.code === 'ENOENT') {
          return ""; // Act as if session exists and is empty
        }
        throw e;
      });
      // Round down so that content whose length is not a whole number of IDs
      // (e.g. a trailing newline) does not make the join fail; the incomplete
      // tail is reported and ignored.
      const opCount = Math.floor(session.length / IDLENGTH);
      const leftover = session.length - opCount * IDLENGTH;
      if (leftover !== 0) {
        console.log("Session", sessionId, "ends with", leftover, "characters that do not form an operation ID; ignoring them.");
      }
      const opIds = [];
      for (let offset = 0; offset < opCount * IDLENGTH; offset += IDLENGTH) {
        opIds.push(session.slice(offset, offset + IDLENGTH));
      }
      // Reply with all operations in session
      const ops = await Promise.all(opIds.map(async id => ({id, detail: await opsDB.readJSON(id)})));
      socket.sendJSON({type: "ack", reqId, ops});
      this.close();
      return new JoinedHandler(sessionId);
    }
    if (req.type === "new_share") {
      const { reqId, ops } = req;
      const sessionId = uuidv4();
      // Store every operation first, then the session file listing their IDs.
      await Promise.all(ops.map(({id, detail}) => opsDB.writeJSON(id, detail)));
      await sessionDB.write(sessionId, ops.map(op => op.id).join(''));
      socket.sendJSON({type: "ack", reqId, sessionId});
      this.close();
      return new JoinedHandler(sessionId);
    }
    console.log("No handler for request:", req.type);
    return this;
  }

  /** Nothing to release: a connection outside a session holds no state. */
  close() {}
}
/**
 * Request handler for a connection that has joined a session.
 *
 * Constructing it adds the connection's socket to the session. handle()
 * resolves to the handler for the connection's next request: this same object,
 * or a new LeftHandler after a "leave" request.
 */
class JoinedHandler {
  /**
   * @param {string} sessionId - id of the session to join
   */
  constructor(sessionId) {
    this.session = getSession(sessionId);
    // Cells locked through this handler; each is also in lockedCellsGlobal.
    this.lockedCells = new Set();
    this.session.join(socket);
  }

  /**
   * @param {object} req - parsed client request; `req.type` selects the action
   * @returns {Promise<JoinedHandler|LeftHandler>} handler for the next request
   */
  async handle(req) {
    if (commonHandler(req)) {
      return this;
    }
    switch (req.type) {
      case "broadcast": {
        // Forward the payload to the other members of the session.
        const { msg } = req;
        this.session.broadcast(msg, socket);
        break;
      }
      case "leave": {
        const { reqId } = req;
        socket.sendJSON({type: "ack", reqId});
        this.close();
        return new LeftHandler();
      }
      case "new_edit": {
        const { reqId, op: {id, detail} } = req;
        // Persist the operation before recording it in the session.
        await opsDB.writeJSON(id, detail);
        await sessionDB.append(this.session.id, id); // Creates file if it doesn't exist yet
        // Best effort broadcast to all subscribers
        this.session.broadcast({type: "pub_edit", op: {id, detail}}, socket);
        // Send ACK
        socket.sendJSON({type: "ack", reqId});
        break;
      }
      case "request_lock": {
        // All-or-nothing: the lock is granted only if none of the requested
        // cells is currently locked.
        const { reqId, cellIds } = req;
        let canLock = true;
        for (const cellId of cellIds) {
          if (lockedCellsGlobal.has(cellId)) {
            canLock = false;
            break;
          }
        }
        if (canLock) {
          for (const cellId of cellIds) {
            lockedCellsGlobal.add(cellId);
            this.lockedCells.add(cellId);
          }
        }
        socket.sendJSON({ type: "ack", reqId, success: canLock });
        break;
      }
      case "release_lock": {
        // Only cells locked through this handler can be released by it.
        // NOTE(review): unlike "request_lock", no ack is sent here; confirm
        // against the client whether one is expected.
        const { cellIds } = req;
        for (const cellId of cellIds) {
          if (this.lockedCells.has(cellId)) {
            lockedCellsGlobal.delete(cellId);
            this.lockedCells.delete(cellId);
          }
        }
        break;
      }
      default:
        console.log("No handler for request:", req.type);
    }
    return this;
  }

  /**
   * Ends the membership: releases the locks still held through this handler,
   * removes the socket from the session, and broadcasts an empty selection
   * for this user to the remaining members.
   */
  close() {
    // Without this, cells locked by a user who leaves or disconnects would
    // stay in lockedCellsGlobal and could never be locked again.
    for (const cellId of this.lockedCells) {
      lockedCellsGlobal.delete(cellId);
    }
    this.lockedCells.clear();
    this.session.leave(socket);
    this.session.broadcast({type: "update_selection", userId: myId, selectedIds: []});
  }
}
// Promise for the handler that will process this connection's next request.
// Starts out as a LeftHandler, since a new connection is in no session.
let handlerPromise = Promise.resolve(new LeftHandler());

/**
 * Queues `req` behind all earlier requests of this connection. The handler
 * returned for one request processes the next one; when handling throws, the
 * error is logged and the same handler stays in charge.
 */
function queuedHandler(req) {
  const processRequest = async (current) => {
    try {
      return await current.handle(req);
    } catch (e) {
      console.log("Error handling", req, e);
      return current;
    }
  };
  handlerPromise = handlerPromise.then(processRequest);
}
// Incoming message: parse it, log it (except pings and cursor updates), and
// queue it for this connection's current handler.
socket.on('message', function(data) {
  let req;
  try {
    req = JSON.parse(data);
  } catch (e) {
    // An exception escaping this listener would take the whole server down,
    // so malformed input from one client is dropped instead.
    console.log("Ignoring message that is not valid JSON:", e.message);
    return;
  }
  if (req === null || typeof req !== "object") {
    console.log("Ignoring message that is not a JSON object:", req);
    return;
  }
  // A "broadcast" without a msg must not throw while deciding whether to log.
  const isCursorUpdate = req.type === "broadcast" && req.msg != null && req.msg.type === "update_cursor";
  if (req.type !== "ping" && !isCursorUpdate) {
    console.log("->", req);
  }
  queuedHandler(req);
});
// Without a listener, an 'error' event on the socket would be thrown as an
// uncaught exception.
socket.on('error', function(e) {
  console.log("Socket error:", e);
});
// Connection closed: once all queued requests are processed, let the handler
// that is current at that point clean up.
socket.on('close', function() {
  handlerPromise
    .then(h => h.close())
    .catch(e => console.log("Error while closing handler:", e));
  console.log("Client disconnected.");
});
- });
- httpServer.listen(port);
- console.log("Listening on", port);
- }
- startServer().catch(e => {
- console.log("Error in startServer:", e);
- process.exit(1);
- });
|