| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219 |
- // NodeJS script
- const fs = require('fs/promises');
- const fsConstants = require('fs').constants;
- const path = require('path');
- const {EOL} = require('os');
- const http = require('http');
- const handler = require('serve-handler');
// Length in characters of a single id (a UUID v4 string).
const IDLENGTH = 36;
// Optional overrides from the environment; an unset or empty variable falls
// back to the built-in default.
const { DRAWIOPORT: portOverride, DRAWIOSTATEDIR: stateDirOverride } = process.env;
const port = portOverride ? portOverride : 8700;
const stateDir = stateDirOverride ? stateDirOverride : ".";
- async function startServer() {
// All persistent state lives in two sub-directories of stateDir.
const opsDir = path.join(stateDir, 'ops');
const sessionDir = path.join(stateDir, 'sessions');
try {
  // Probe each directory for read+write access, reporting progress on stdout.
  for (const dir of [opsDir, sessionDir]) {
    process.stdout.write(`Can read/write in directory '${dir}' ? ...`);
    await fs.access(dir, fsConstants.R_OK | fsConstants.W_OK);
    process.stdout.write(`OK${EOL}`);
  }
} catch (accessError) {
  // The first failed probe aborts startup with a hint that lists both directories.
  const hintLines = [
    `${EOL}Please make sure the following directories exist and are writable:${EOL}`,
    ` ${opsDir}${EOL}`,
    ` ${sessionDir}${EOL}`,
  ];
  for (const line of hintLines) {
    process.stdout.write(line);
  }
  process.exit(1);
}
- const { WebSocketServer } = require('ws');
- const { v4: uuidv4 } = require("uuid");
// Returns a promise that resolves (with no value) once `ms` milliseconds have elapsed.
function asyncSleep(ms) {
  return new Promise((wake) => {
    setTimeout(wake, ms);
  });
}
// Logs every plain HTTP request and lets serve-handler answer it.
// serve-handler accepts two further arguments (config and middleware), see
// https://github.com/vercel/serve-handler#options
function serveStatic(request, response) {
  console.log(request.method, request.url);
  return handler(request, response);
}
const httpServer = http.createServer(serveStatic);

// The WebSocket server is attached to the same HTTP server, on the path "/websocket".
const wsOptions = {
  server: httpServer,
  path: "/websocket",
};
const wsServer = new WebSocketServer(wsOptions);
/**
 * Async reads and writes of files inside one directory. Operations on the same
 * file name are chained one after another, so a read always sees every write
 * that was requested before it. Operations on different file names do not wait
 * on each other.
 */
class QueuedIO {
  /**
   * @param {string} path - Directory that every file name is resolved against.
   */
  constructor(path) {
    this.path = path;
    // file name -> promise that settles when the most recently queued
    // operation on that file has finished (it never rejects)
    this.queues = new Map();
  }

  /**
   * Runs `f(fullPath, ...args)` once every earlier operation on `filename` is done.
   *
   * @param {Function} f - Receives the full path followed by `args`; may return a promise.
   * @param {string} filename - Bare file name inside the directory.
   * @param {...*} args - Extra arguments forwarded to `f`.
   * @returns {Promise<*>} Settles like `f`; rejects without calling `f` when
   *   `filename` is not a bare file name.
   */
  _queuedIO(f, filename, ...args) {
    // Only a bare file name is accepted, so that an operation can never be
    // redirected outside this.path (e.g. by a name such as "../x").
    const isBareName = typeof filename === 'string'
      && filename !== ''
      && filename !== '.'
      && filename !== '..'
      && path.basename(filename) === filename;
    if (!isBareName) {
      return Promise.reject(new Error("Invalid file name: " + JSON.stringify(filename)));
    }

    const fullPath = path.join(this.path, filename);
    const lastIO = this.queues.get(filename) || Promise.resolve();
    const nextIO = lastIO.then(() => f(fullPath, ...args));

    // The queue tail ignores the outcome: a failed operation is reported to its
    // own caller through nextIO and must not block the operations behind it.
    const tail = nextIO.then(() => {}, () => {});
    this.queues.set(filename, tail);
    // Forget the file once nothing is pending for it, so the map does not grow
    // with every file name ever used.
    tail.then(() => {
      if (this.queues.get(filename) === tail) {
        this.queues.delete(filename);
      }
    });
    return nextIO;
  }

  /** Replaces the file content with the string `data` (UTF-8). */
  write(filename, data) {
    return this._queuedIO(
      (fullPath, contents) => fs.writeFile(fullPath, contents, {encoding: 'utf8'}),
      filename,
      data,
    );
  }

  /** Appends `data` to the file via fs.appendFile. */
  append(filename, data) {
    return this._queuedIO(fs.appendFile, filename, data);
  }

  /** Resolves with the fs.stat result for the file. */
  stat(filename) {
    return this._queuedIO(fs.stat, filename);
  }

  /** Resolves with the file content as a UTF-8 string. */
  read(filename) {
    return this._queuedIO((fullPath) => fs.readFile(fullPath, {encoding: 'utf8'}), filename);
  }

  /** Serializes `json` with JSON.stringify and writes it to the file. */
  writeJSON(filename, json) {
    return this.write(filename, JSON.stringify(json));
  }

  /** Reads the file and resolves with its content parsed as JSON. */
  readJSON(filename) {
    return this.read(filename).then((text) => JSON.parse(text));
  }
}
// One queued store per state directory: opsDB holds the operation payloads,
// sessionDB holds the per-session lists of operation ids.
const [opsDB, sessionDB] = [opsDir, sessionDir].map((dir) => new QueuedIO(dir));
// sessionId -> Set of the sockets currently subscribed to that session
const subscriptions = new Map();
- wsServer.on('connection', function(socket) {
- console.log("Client connected.")
- const mySubscriptions = new Set();
// Subscribes this connection's socket to `sessionId`, creating the session's
// subscriber set on first use, and records the session as one of ours.
function join(sessionId) {
  let members = subscriptions.get(sessionId);
  if (!members) {
    members = new Set();
    subscriptions.set(sessionId, members);
  }
  members.add(socket);
  mySubscriptions.add(sessionId);
}
// Unsubscribes this connection's socket from `sessionId`. Unknown sessions are
// ignored. A session whose last subscriber leaves is removed from
// `subscriptions`, so the map does not keep an empty set for every session
// that was ever joined.
function leave(sessionId) {
  const subbedSockets = subscriptions.get(sessionId);
  if (subbedSockets) {
    subbedSockets.delete(socket);
    if (subbedSockets.size === 0) {
      subscriptions.delete(sessionId);
    }
  }
  mySubscriptions.delete(sessionId);
}
// Unsubscribes this connection's socket from every session it has joined.
function leaveAll() {
  for (const sessionId of mySubscriptions) {
    leave(sessionId);
  }
}
// Throws unless `id` is a bare file name. Ids arrive from the client and are
// used as file names, so anything that could address a file outside the state
// directories (separators, "." or "..") is refused.
function assertFileNameId(id, what) {
  const ok = typeof id === "string"
    && id !== ""
    && id !== "."
    && id !== ".."
    && path.basename(id) === id;
  if (!ok) {
    throw new Error("Invalid " + what + ": " + JSON.stringify(id));
  }
}

// Throws unless `id` is a file-name-safe string of exactly IDLENGTH characters.
// Session files are a plain concatenation of operation ids, so an id of any
// other length would misalign every id stored after it.
function assertOpId(id) {
  assertFileNameId(id, "operation id");
  if (id.length !== IDLENGTH) {
    throw new Error("Invalid operation id (expected " + IDLENGTH + " characters): " + JSON.stringify(id));
  }
}

// Forwards a stored edit to every subscriber of the session except this socket.
// A subscriber whose send() throws is closed; it is then the client's
// responsibility to re-subscribe and catch up.
function broadcastEdit(sessionId, op) {
  const recipients = [...(subscriptions.get(sessionId) || [])].filter(subbed => subbed !== socket); // don't echo
  const broadcastMsg = {type: "pub_edit", sessionId, op};
  console.log("BROADCAST to", recipients.length, "subscribers...", broadcastMsg);
  const stringified = JSON.stringify(broadcastMsg);
  for (const subbed of recipients) {
    try {
      subbed.send(stringified);
    } catch (e) {
      console.log("Error:", e, "Closing socket...");
      subbed.close();
    }
  }
}

/**
 * Computes the reply for one parsed client request.
 *
 * Request types:
 *  - "ping": replies {type: "pong"}; not logged.
 *  - "join": leaves every session this socket is in, subscribes it to
 *    `sessionId` and replies with all stored operations of that session.
 *    A session without a file is treated as existing and empty.
 *  - "leave": unsubscribes this socket from every session.
 *  - "new_share": stores `ops` under a freshly generated session id,
 *    subscribes this socket to it and replies with that id.
 *  - "new_edit": stores one operation, appends its id to the session file
 *    (created if missing) and forwards the operation to the other subscribers.
 * Any other type resolves to undefined.
 *
 * @param {object} req - Parsed request; the fields read depend on `req.type`.
 * @returns {Promise<object|undefined>} The reply to send to the client.
 * @throws {Error} (as a rejection) when an id is not usable as a file name, an
 *   operation id is not IDLENGTH characters long, a session file is not a
 *   whole number of ids, or the underlying storage call fails.
 */
async function handle(req) {
  if (req.type === "ping") {
    return {type: "pong"};
  }
  console.log("-> ", req);

  if (req.type === "join") {
    const { reqId, sessionId } = req;
    assertFileNameId(sessionId, "session id");
    leaveAll(); // a client can only join 1 session at a time
    join(sessionId);
    const session = await sessionDB.read(sessionId).catch(e => {
      if (e.code === 'ENOENT') {
        return ""; // Act as if session exists and is empty
      }
      throw e;
    });
    if (session.length % IDLENGTH !== 0) {
      throw new Error("Session file " + JSON.stringify(sessionId) + " is corrupt: length "
        + session.length + " is not a multiple of " + IDLENGTH);
    }
    // The session file is a concatenation of fixed-length operation ids.
    const opIds = Array.from(
      {length: session.length / IDLENGTH},
      (_, i) => session.slice(i * IDLENGTH, (i + 1) * IDLENGTH),
    );
    const ops = await Promise.all(opIds.map(async id => ({id, detail: await opsDB.readJSON(id)})));
    return {type: "ack", reqId, ops};
  }

  if (req.type === "leave") {
    const { reqId } = req;
    leaveAll();
    return {type: "ack", reqId};
  }

  if (req.type === "new_share") {
    const { reqId, ops } = req;
    if (!Array.isArray(ops)) {
      throw new Error("new_share requires an array of ops");
    }
    // Validate everything before touching subscriptions or storage.
    ops.forEach(({id}) => assertOpId(id));
    const sessionId = uuidv4();
    leaveAll();
    join(sessionId);
    await Promise.all(ops.map(({id, detail}) => opsDB.writeJSON(id, detail)));
    await sessionDB.write(sessionId, ops.map(op => op.id).join(''));
    return {type: "ack", reqId, sessionId};
  }

  if (req.type === "new_edit") {
    const { reqId, sessionId, op: {id, detail} } = req;
    assertFileNameId(sessionId, "session id");
    assertOpId(id);
    await opsDB.writeJSON(id, detail);
    await sessionDB.append(sessionId, id); // Creates file if it doesn't exist yet
    // Best effort broadcast: the edit is already stored, so a failure here
    // must not prevent the sender from being acknowledged.
    try {
      broadcastEdit(sessionId, {id, detail});
    } catch (e) {
      console.log("Error:", e, "while broadcasting edit", id);
    }
    return {type: "ack", reqId};
  }

  return undefined;
}
// Every incoming message is expected to be one JSON-encoded request. The reply
// computed by handle() is sent back on the same socket.
socket.on('message', function(data) {
  let req;
  try {
    req = JSON.parse(data);
  } catch (e) {
    // A malformed message must not throw out of the event listener: the
    // exception would be uncaught and terminate the whole server process.
    console.log("Ignoring message that is not valid JSON:", e.message);
    return;
  }
  handle(req).then(json => {
    if (json === undefined) {
      // handle() produced no reply (request type it does not know)
      console.log("No reply for request", req);
      return;
    }
    if (json.type !== 'pong') {
      console.log("<- ", json);
    }
    socket.send(JSON.stringify(json));
  }).catch(e => {
    console.log("Error handling", req, e);
  });
});
// When the connection closes, drop all of this socket's subscriptions.
socket.on('close', () => {
  leaveAll();
  console.log("Client disconnected.");
});
- });
// Start accepting connections. The message is printed from the listen
// callback, i.e. only once the server is actually listening, so it no longer
// appears when binding the port fails.
httpServer.listen(port, () => {
  console.log("Listening on", port);
});
- }
// Logs a startup failure and terminates the process with exit code 1.
function exitOnStartupFailure(err) {
  console.log("Error in startServer:", err);
  process.exit(1);
}

// Entry point: run the server.
startServer().then(undefined, exitOnStartupFailure);
|