// NodeJS script
//
// Serves static files over HTTP and hosts a WebSocket endpoint ("/websocket")
// through which clients create, join and leave shared sessions, exchange
// edits ("ops") and broadcast messages, and lock/unlock cells.
// Ops and sessions are persisted as files under DRAWIOSTATEDIR.
const fs = require('fs/promises');
const fsConstants = require('fs').constants;
const path = require('path');
const {EOL} = require('os');
const http = require('http');
const handler = require('serve-handler');

// Every op id is expected to be exactly this long: a session file is the plain
// concatenation of op ids and is split back into ids by this fixed width.
const IDLENGTH = 36; // UUID v4

const port = process.env.DRAWIOPORT || 8700;
const stateDir = process.env.DRAWIOSTATEDIR || ".";

/**
 * Checks that the state directories are usable, then starts the HTTP server
 * and the WebSocket server on `port`.
 *
 * Exits the process with status 1 when '<stateDir>/ops' or
 * '<stateDir>/sessions' is missing or not readable/writable.
 *
 * @returns {Promise<void>} resolves once `listen` has been called.
 */
async function startServer() {
  const opsDir = path.join(stateDir, 'ops');
  const sessionDir = path.join(stateDir, 'sessions');

  try {
    process.stdout.write("Can read/write in directory '" + opsDir + "' ? ...");
    await fs.access(opsDir, fsConstants.R_OK | fsConstants.W_OK);
    process.stdout.write("OK" + EOL);
    process.stdout.write("Can read/write in directory '" + sessionDir + "' ? ...");
    await fs.access(sessionDir, fsConstants.R_OK | fsConstants.W_OK);
    process.stdout.write("OK" + EOL);
  } catch (e) {
    process.stdout.write(EOL + "Please make sure the following directories exist and are writable:" + EOL);
    process.stdout.write(" " + opsDir + EOL);
    process.stdout.write(" " + sessionDir + EOL);
    process.exit(1);
  }

  // Loaded only after the directory check has passed.
  const { WebSocketServer } = require('ws');
  const { v4: uuidv4 } = require("uuid");

  /**
   * Resolves after `ms` milliseconds. Not called anywhere in this file.
   * @param {number} ms
   * @returns {Promise<void>}
   */
  function asyncSleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  const httpServer = http.createServer((request, response) => {
    // You pass two more arguments for config and middleware
    // More details here: https://github.com/vercel/serve-handler#options
    console.log(request.method, request.url)
    return handler(request, response);
  });

  const wsServer = new WebSocketServer({
    server: httpServer,
    path: "/websocket",
  });

  /**
   * Provides async reads and writes to files in one directory, while
   * guaranteeing that for every file, a read always reads the latest write:
   * operations on the same file name are chained one after another.
   */
  class QueuedIO {
    /**
     * @param {string} path directory that all file names are resolved against.
     */
    constructor(path) {
      this.path = path;
      // file name -> promise that settles when the last queued operation on
      // that file has finished (never rejects).
      this.queues = new Map();
    }

    /**
     * Runs `f(fullPath, ...args)` after all previously queued operations on
     * `filename` have finished.
     *
     * @param {Function} f operation taking the full path as first argument.
     * @param {string} filename plain file name inside this directory.
     * @param {...*} args extra arguments forwarded to `f`.
     * @returns {Promise<*>} result of `f`; rejects if `f` fails or if
     *   `filename` is not a plain file name.
     */
    _queuedIO(f, filename, ...args) {
      // File names come from client messages (session ids, op ids). Refuse
      // anything that could escape the directory (separators, '.', '..').
      const isPlainName = typeof filename === 'string'
        && filename !== ''
        && filename !== '.'
        && filename !== '..'
        && path.basename(filename) === filename;
      if (!isPlainName) {
        return Promise.reject(new Error("Invalid file name: " + JSON.stringify(filename)));
      }

      const lastIO = this.queues.get(filename) || Promise.resolve();
      const nextIO = lastIO.then(() => f(path.join(this.path, filename), ...args));

      // The queue tail deliberately swallows the failure: the caller still
      // sees it through `nextIO`, but later operations must not be blocked.
      const tail = nextIO.catch(() => {});
      this.queues.set(filename, tail);

      // Drop the entry once the file is idle so the map does not grow with
      // every file ever touched.
      tail.then(() => {
        if (this.queues.get(filename) === tail) {
          this.queues.delete(filename);
        }
      });

      return nextIO;
    }

    /** Replaces the content of `filename` with the UTF-8 string `data`. */
    write(filename, data) {
      return this._queuedIO(
        (file, content) => fs.writeFile(file, content, {encoding: 'utf8'}),
        filename,
        data);
    }

    /** Appends `data` to `filename`. */
    append(filename, data) {
      return this._queuedIO((file, content) => fs.appendFile(file, content), filename, data);
    }

    /** Resolves with the `fs.stat` result for `filename`. */
    stat(filename) {
      return this._queuedIO(file => fs.stat(file), filename);
    }

    /** Resolves with the content of `filename` as a UTF-8 string. */
    read(filename) {
      return this._queuedIO(file => fs.readFile(file, {encoding: 'utf8'}), filename);
    }

    /** Serializes `json` with JSON.stringify and writes it to `filename`. */
    writeJSON(filename, json) {
      return this.write(filename, JSON.stringify(json));
    }

    /** Reads `filename` and parses its content with JSON.parse. */
    readJSON(filename) {
      return this.read(filename).then(text => JSON.parse(text));
    }
  }

  // non-volatile state
  const opsDB = new QueuedIO(opsDir);
  const sessionDB = new QueuedIO(sessionDir);

  /**
   * In-memory set of sockets currently joined to one session.
   */
  class Session {
    /**
     * @param {string} id session id.
     */
    constructor(id) {
      this.id = id;
      this.users = new Set();
    }

    join(socket) {
      this.users.add(socket);
    }

    leave(socket) {
      this.users.delete(socket);
    }

    /**
     * Sends `json`, wrapped in a "broadcast" envelope, to every joined socket
     * except `skip`.
     *
     * @param {object} json message to forward.
     * @param {object} [skip] socket that must not receive the message
     *   (the sender); omit to send to everyone.
     */
    broadcast(json, skip) {
      if (json.type !== "update_cursor") {
        // Only subtract the skipped socket when it is actually a member.
        const receivers = this.users.size - (this.users.has(skip) ? 1 : 0);
        console.log("BROADCAST to", receivers, "subscribers...", json);
      }
      // The session id is stored in `this.id`; the class has no `sessionId`.
      const stringified = JSON.stringify({type: "broadcast", sessionId: this.id, msg: json});
      for (const user of this.users) {
        if (user === skip) {
          continue; // don't echo
        }
        try {
          user.send(stringified); // forward incoming request to all
        } catch (e) {
          console.log("Error:", e, "Closing socket...")
          // in case of an error, it is the client's responsibility to
          // re-subscribe and 'catch up'.
          user.close();
        }
      }
    }
  }

  // mapping of sessionId to Session
  const sessions = new Map();

  /**
   * Returns the Session for `id`, creating and registering it on first use.
   * @param {string} id
   * @returns {Session}
   */
  function getSession(id) {
    let session = sessions.get(id);
    if (session === undefined) {
      session = new Session(id);
      sessions.set(id, session);
    }
    return session;
  }

  // Cell ids currently locked by any connection, across all sessions.
  const lockedCellsGlobal = new Set();

  wsServer.on('connection', (socket, req) => {
    // Only the query string is needed. A fixed base is used because the
    // Origin header may be absent or "null", which would make `new URL` throw.
    const reqUrl = new URL(req.url, "http://localhost");
    const myId = reqUrl.searchParams.get('me');
    console.log("Client connected.", myId);

    socket.sendJSON = function(json) {
      if (json.type !== "pong") {
        console.log("<-", json);
      }
      this.send(JSON.stringify(json));
    }

    /**
     * Handles requests that are valid in every state.
     * @returns {boolean} true when the request was handled here.
     */
    function commonHandler(req) {
      if (req.type === "ping") {
        socket.sendJSON({type: "pong"});
        return true;
      }
      return false;
    }

    /**
     * State of a connection that is not joined to any session.
     * `handle` returns the handler to use for the next request.
     */
    class LeftHandler {
      async handle(req) {
        console.log("LeftHandler", req.type);
        if (commonHandler(req)) {
          console.log("PING");
          return this;
        }
        else if (req.type === "join") {
          const { reqId, sessionId } = req;
          // Reply with all operations in session
          const session = await sessionDB.read(sessionId).catch(e => {
            if (e.code === 'ENOENT') {
              return ""; // Act as if session exists and is empty
            } else {
              throw e;
            }
          });
          // NOTE(review): the id-slicing below was reconstructed from a
          // garbled span in the original file; confirm against history.
          // A session file is a concatenation of fixed-width op ids.
          const opCount = Math.floor(session.length / IDLENGTH);
          const opIds = Array.from(
            {length: opCount},
            (_, i) => session.slice(i * IDLENGTH, (i + 1) * IDLENGTH));
          const ops = await Promise.all(
            opIds.map(async id => ({id, detail: await opsDB.readJSON(id)})));
          socket.sendJSON({type: "ack", reqId, ops});
          this.close();
          return new JoinedHandler(sessionId);
        }
        else if (req.type === "new_share") {
          console.log("HEY");
          const { reqId, ops } = req;
          const sessionId = uuidv4();
          await Promise.all(ops.map(({id, detail}) => opsDB.writeJSON(id, detail)));
          await sessionDB.write(sessionId, ops.map(op => op.id).join(''));
          socket.sendJSON({type: "ack", reqId, sessionId});
          this.close();
          return new JoinedHandler(sessionId);
        }
        else {
          console.log("No handler for request:", req.type);
          return this;
        }
      }

      close() {}
    }

    /**
     * State of a connection that is joined to a session.
     * `handle` returns the handler to use for the next request.
     */
    class JoinedHandler {
      /**
       * Joins this connection's socket to the session `sessionId`.
       * @param {string} sessionId
       */
      constructor(sessionId) {
        this.session = getSession(sessionId);
        // Cell ids locked through this handler; subset of lockedCellsGlobal.
        this.lockedCells = new Set();
        this.session.join(socket);
      }

      async handle(req) {
        if (commonHandler(req)) {
          return this;
        }
        else if (req.type === "broadcast") {
          const { msg } = req;
          this.session.broadcast(msg, socket);
        }
        else if (req.type === "leave") {
          const { reqId } = req;
          socket.sendJSON({type: "ack", reqId});
          this.close();
          return new LeftHandler();
        }
        else if (req.type === "new_edit") {
          const { reqId, op: {id, detail} } = req;
          await opsDB.writeJSON(id, detail);
          await sessionDB.append(this.session.id, id); // Creates file if it doesn't exist yet
          // Best effort broadcast to all subscribers
          this.session.broadcast({type: "pub_edit", op: {id, detail}}, socket);
          // Send ACK
          socket.sendJSON({type: "ack", reqId});
        }
        else if (req.type === "request_lock") {
          const { reqId, cellIds } = req;
          // All-or-nothing: lock only if none of the cells is locked already.
          let canLock = true;
          for (const cellId of cellIds) {
            if (lockedCellsGlobal.has(cellId)) {
              canLock = false;
              break;
            }
          }
          if (canLock) {
            for (const cellId of cellIds) {
              lockedCellsGlobal.add(cellId);
              this.lockedCells.add(cellId);
            }
          }
          socket.sendJSON({ type: "ack", reqId, success: canLock });
        }
        else if (req.type === "release_lock") {
          // NOTE(review): unlike "request_lock", no ack is sent here; confirm
          // that the client does not wait for one.
          const { cellIds } = req;
          for (const cellId of cellIds) {
            // Only locks taken through this handler may be released by it.
            if (this.lockedCells.has(cellId)) {
              lockedCellsGlobal.delete(cellId);
              this.lockedCells.delete(cellId);
            }
          }
        }
        else {
          console.log("No handler for request:", req.type);
        }
        return this;
      }

      /**
       * Leaves the session, releases every lock still held through this
       * handler, and tells the remaining users that this user's selection is
       * now empty.
       */
      close() {
        this.session.leave(socket);
        // Without this, cells locked by a departed client stay locked forever.
        for (const cellId of this.lockedCells) {
          lockedCellsGlobal.delete(cellId);
        }
        this.lockedCells.clear();
        this.session.broadcast({type: "update_selection", userId: myId, selectedIds: []})
      }
    }

    // Requests of one connection are handled strictly one after another; the
    // promise resolves to the handler (state) for the next request.
    let handlerPromise = Promise.resolve(new LeftHandler());

    function queuedHandler(req) {
      handlerPromise = handlerPromise.then(async current => {
        try {
          return await current.handle(req);
        } catch (e) {
          console.log("Error handling", req, e);
          return current; // stay in the same state after a failed request
        }
      });
    }

    socket.on('message', function(data) {
      // Anything thrown here would be an uncaught exception, so malformed
      // client input must be rejected explicitly.
      let req;
      try {
        req = JSON.parse(data);
      } catch (e) {
        console.log("Ignoring malformed message:", e.message);
        return;
      }
      if (req === null || typeof req !== "object") {
        console.log("Ignoring malformed message:", req);
        return;
      }
      const isCursorUpdate = req.type === "broadcast"
        && Boolean(req.msg)
        && req.msg.type === "update_cursor";
      if (req.type !== "ping" && !isCursorUpdate) {
        console.log("->", req);
      }
      queuedHandler(req);
    });

    // An 'error' event without a listener is thrown by the event emitter.
    socket.on('error', function(e) {
      console.log("Socket error:", e);
    });

    socket.on('close', function() {
      handlerPromise
        .then(h => h.close())
        .catch(e => console.log("Error closing handler:", e));
      console.log("Client disconnected.");
    })
  });

  httpServer.listen(port);
  console.log("Listening on", port);
}

startServer().catch(e => {
  console.log("Error in startServer:", e);
  process.exit(1);
});