run_server.js 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. // NodeJS script
  2. const fs = require('fs/promises');
  3. const fsConstants = require('fs').constants;
  4. const path = require('path');
  5. const {EOL} = require('os');
  6. const http = require('http');
  7. const handler = require('serve-handler');
  8. const IDLENGTH = 36; // UUID v4
  9. const port = process.env.DRAWIOPORT || 8700;
  10. const stateDir = process.env.DRAWIOSTATEDIR || ".";
  11. async function startServer() {
  // Both state directories must already be present and readable/writable;
  // this check only probes them and exits the process when either probe fails.
  const opsDir = path.join(stateDir, 'ops');
  const sessionDir = path.join(stateDir, 'sessions');
  try {
    for (const dir of [opsDir, sessionDir]) {
      process.stdout.write("Can read/write in directory '" + dir + "' ? ...");
      await fs.access(dir, fsConstants.R_OK | fsConstants.W_OK);
      process.stdout.write("OK" + EOL);
    }
  } catch (e) {
    const helpLines = [
      EOL + "Please make sure the following directories exist and are writable:" + EOL,
      " " + opsDir + EOL,
      " " + sessionDir + EOL,
    ];
    for (const line of helpLines) {
      process.stdout.write(line);
    }
    process.exit(1);
  }
  27. const { WebSocketServer } = require('ws');
  28. const { v4: uuidv4 } = require("uuid");
  /**
   * Returns a promise that fulfills (with no value) after roughly `ms` milliseconds.
   */
  function asyncSleep(ms) {
    return new Promise((wake) => {
      setTimeout(wake, ms);
    });
  }
  // Every plain HTTP request is logged and then delegated to serve-handler.
  // serve-handler accepts two more optional arguments (config and middleware):
  // https://github.com/vercel/serve-handler#options
  function logAndServe(request, response) {
    console.log(request.method, request.url);
    return handler(request, response);
  }
  const httpServer = http.createServer(logAndServe);

  // The WebSocket server is attached to the same HTTP server, at path /websocket.
  const wsOptions = { server: httpServer, path: "/websocket" };
  const wsServer = new WebSocketServer(wsOptions);
  // Provides async reads and writes to files, while guaranteeing that for every
  // file, a read always reads the latest write: operations on the same file are
  // chained one after another, operations on different files run independently.
  class QueuedIO {
    /**
     * @param {string} path Directory against which all file names are resolved.
     */
    constructor(path) {
      this.path = path;
      // Target path -> promise that settles (never rejects) once the most
      // recently queued operation on that file has finished.
      this.queues = new Map();
    }

    /**
     * Maps a file name to its location under the base directory.
     * Throws when the name would resolve to the base directory itself or to
     * anything outside of it (e.g. names containing "../").
     */
    _resolve(filename) {
      const base = path.resolve(this.path);
      const relative = path.relative(base, path.resolve(base, filename));
      const escapes =
        relative === '' ||
        relative === '..' ||
        relative.startsWith('..' + path.sep) ||
        path.isAbsolute(relative);
      if (escapes) {
        throw new Error("Refusing to access '" + filename + "' outside of '" + this.path + "'");
      }
      return path.join(this.path, filename);
    }

    /**
     * Runs `f(targetPath, ...args)` after every previously queued operation on
     * the same file has settled.
     *
     * @param {Function} f Function performing the I/O; receives the target path first.
     * @param {string} filename File name relative to the base directory.
     * @returns {Promise} Settles with the outcome of `f`; rejects without
     *   touching the disk when the file name is not acceptable.
     */
    _queuedIO(f, filename, ...args) {
      let target;
      try {
        target = this._resolve(filename);
      } catch (e) {
        return Promise.reject(e);
      }
      // Keyed by the normalized target so that different spellings of the same
      // file ("a", "./a") share one queue.
      const lastIO = this.queues.get(target) || Promise.resolve();
      const nextIO = lastIO.then(() => f(target, ...args));
      // The queue tail swallows failures, so that one failed operation does not
      // block later ones; the caller still sees the failure through `nextIO`.
      // Once the tail settles and nothing newer was queued, the entry is dropped
      // so the map does not grow with every file ever touched.
      const forget = () => {
        if (this.queues.get(target) === tail) {
          this.queues.delete(target);
        }
      };
      const tail = nextIO.then(forget, forget);
      this.queues.set(target, tail);
      return nextIO;
    }

    /** Replaces the file's content with `data` (UTF-8). */
    write(filename, data) {
      return this._queuedIO(
        (file, contents) => fs.writeFile(file, contents, {encoding: 'utf8'}),
        filename,
        data,
      );
    }

    /** Appends `data` to the file. */
    append(filename, data) {
      return this._queuedIO(fs.appendFile, filename, data);
    }

    /** Resolves with the result of `fs.stat` for the file. */
    stat(filename) {
      return this._queuedIO(fs.stat, filename);
    }

    /** Resolves with the file's content as a UTF-8 string. */
    read(filename) {
      return this._queuedIO((file) => fs.readFile(file, {encoding: 'utf8'}), filename);
    }

    /** Serializes `json` with JSON.stringify and writes it to the file. */
    writeJSON(filename, json) {
      return this.write(filename, JSON.stringify(json));
    }

    /** Reads the file and parses its content with JSON.parse. */
    readJSON(filename) {
      return this.read(filename).then(JSON.parse);
    }
  }
  // State that survives restarts lives on disk: one file per session in
  // sessionDir and one file per operation in opsDir.
  const sessionDB = new QueuedIO(sessionDir);
  const opsDB = new QueuedIO(opsDir);
  /**
   * The set of sockets currently subscribed to one shared session.
   */
  class Session {
    /**
     * @param {string} id Session id.
     */
    constructor(id) {
      this.id = id;
      this.users = new Set();
    }

    /** Subscribes `socket` to this session's broadcasts. */
    join(socket) {
      this.users.add(socket);
    }

    /** Unsubscribes `socket`. */
    leave(socket) {
      this.users.delete(socket);
    }

    /**
     * Sends `json`, wrapped in a {type: "broadcast", sessionId, msg} envelope,
     * to every subscribed socket except `skip`.
     *
     * @param {object} json Message to forward.
     * @param {object} [skip] Socket that must not receive the message
     *   (normally the sender, to avoid an echo).
     */
    broadcast(json, skip) {
      if (json.type !== "update_cursor") {
        // Only subtract the skipped socket when it is actually subscribed.
        const recipients = this.users.size - (this.users.has(skip) ? 1 : 0);
        console.log("BROADCAST to", recipients, "subscribers...", json);
      }
      // The session id is stored in `this.id`.
      const stringified = JSON.stringify({type: "broadcast", sessionId: this.id, msg: json});
      for (const socket of this.users) {
        if (socket === skip) {
          continue; // don't echo
        }
        try {
          socket.send(stringified); // forward incoming request to all
        } catch (e) {
          console.log("Error:", e, "Closing socket...");
          socket.close(); // in case of an error, it is the client's responsibility to re-subscribe and 'catch up'.
        }
      }
    }
  }
  // Registry of Session objects, keyed by session id.
  const sessions = new Map();

  /**
   * Returns the Session registered under `id`, creating and registering it
   * on first use.
   */
  function getSession(id) {
    if (!sessions.has(id)) {
      sessions.set(id, new Session(id));
    }
    return sessions.get(id);
  }

  // Ids of all cells that are currently locked, shared by every connection.
  const lockedCellsGlobal = new Set();
  123. wsServer.on('connection', (socket, req) => {
  124. const reqUrl = new URL(req.url, req.headers.origin);
  125. const myId = reqUrl.searchParams.get('me');
  126. console.log("Client connected.", myId);
  127. socket.sendJSON = function(json) {
  128. if (json.type !== "pong") {
  129. console.log("<-", json);
  130. }
  131. this.send(JSON.stringify(json));
  132. }
  133. function commonHandler(req) {
  134. if (req.type === "ping") {
  135. socket.sendJSON({type: "pong"});
  136. return true;
  137. }
  138. }
  139. class LeftHandler {
  140. async handle(req) {
  141. console.log("LeftHandler", req.type);
  142. if (commonHandler(req)) {
  143. console.log("PING");
  144. return this;
  145. }
  146. else if (req.type === "join") {
  147. const { reqId, sessionId, userId } = req;
  148. // Reply with all operations in session
  149. const session = await sessionDB.read(sessionId).catch(e => {
  150. if (e.code === 'ENOENT') {
  151. return ""; // Act as if session exists and is empty
  152. } else {
  153. throw e;
  154. }
  155. });
  156. const opIds = new Array(session.length / IDLENGTH);
  157. for (let i=0; i<opIds.length; i+=1) {
  158. const offset = i * IDLENGTH;
  159. opIds[i] = session.slice(offset, offset + IDLENGTH);
  160. }
  161. const ops = await Promise.all(opIds.map(async id => ({id, detail: await opsDB.readJSON(id)})));
  162. socket.sendJSON({type: "ack", reqId, ops});
  163. this.close();
  164. return new JoinedHandler(sessionId);
  165. }
  166. else if (req.type === "new_share") {
  167. console.log("HEY");
  168. const { reqId, ops } = req;
  169. const sessionId = uuidv4();
  170. await Promise.all(ops.map(({id, detail}) => opsDB.writeJSON(id, detail)));
  171. await sessionDB.write(sessionId, ops.map(op=>op.id).join(''));
  172. socket.sendJSON({type: "ack", reqId, sessionId});
  173. this.close();
  174. return new JoinedHandler(sessionId);
  175. }
  176. else {
  177. console.log("No handler for request:", req.type);
  178. return this;
  179. }
  180. }
  181. close() {}
  182. }
  183. class JoinedHandler {
  184. constructor(sessionId) {
  185. this.session = getSession(sessionId);
  186. this.lockedCells = new Set();
  187. this.session.join(socket);
  188. }
  189. async handle(req) {
  190. if (commonHandler(req)) {
  191. return this;
  192. }
  193. else if (req.type === "broadcast") {
  194. const { msg } = req;
  195. this.session.broadcast(msg, socket);
  196. }
  197. else if (req.type === "leave") {
  198. const { reqId } = req;
  199. socket.sendJSON({type: "ack", reqId});
  200. this.close();
  201. return new LeftHandler();
  202. }
  203. else if (req.type === "new_edit") {
  204. const { reqId, op: {id, detail} } = req;
  205. await opsDB.writeJSON(id, detail);
  206. await sessionDB.append(this.session.id, id); // Creates file if it doesn't exist yet
  207. // Best effort broadcast to all subscribers
  208. this.session.broadcast({type: "pub_edit", op: {id, detail}}, socket);
  209. // Send ACK
  210. socket.sendJSON({type: "ack", reqId});
  211. }
  212. else if (req.type === "request_lock") {
  213. const { reqId, cellIds } = req;
  214. let canLock = true;
  215. for (const cellId of cellIds) {
  216. if (lockedCellsGlobal.has(cellId)) {
  217. canLock = false;
  218. break;
  219. }
  220. }
  221. if (canLock) {
  222. for (const cellId of cellIds) {
  223. lockedCellsGlobal.add(cellId);
  224. this.lockedCells.add(cellId);
  225. }
  226. }
  227. socket.sendJSON({ type: "ack", reqId, success: canLock });
  228. }
  229. else if (req.type === "release_lock") {
  230. const { reqId, cellIds } = req;
  231. for (const cellId of cellIds) {
  232. if (this.lockedCells.has(cellId)) {
  233. lockedCellsGlobal.delete(cellId);
  234. this.lockedCells.delete(cellId);
  235. }
  236. }
  237. }
  238. else {
  239. console.log("No handler for request:", req.type);
  240. }
  241. return this;
  242. }
  243. close() {
  244. this.session.leave(socket);
  245. this.session.broadcast({type: "update_selection", userId: myId, selectedIds: []})
  246. }
  247. }
  248. let handlerPromise = Promise.resolve(new LeftHandler());
  249. function queuedHandler(req) {
  250. handlerPromise = handlerPromise.then(async handler => {
  251. try {
  252. return await handler.handle(req);
  253. } catch (e) {
  254. console.log("Error handling", req, e);
  255. return handler;
  256. }
  257. });
  258. }
  259. socket.on('message', function(data) {
  260. const req = JSON.parse(data);
  261. if (req.type !== "ping" && !(req.type === "broadcast" && req.msg.type === "update_cursor")) {
  262. console.log("->", req);
  263. }
  264. queuedHandler(req);
  265. });
  266. socket.on('close', function() {
  267. handlerPromise.then(h => h.close());
  268. console.log("Client disconnected.");
  269. })
  270. });
  // Announce the port only once the server is actually bound to it; the
  // callback runs when the server emits 'listening'.
  httpServer.listen(port, () => {
    console.log("Listening on", port);
  });
  273. }
  274. startServer().catch(e => {
  275. console.log("Error in startServer:", e);
  276. process.exit(1);
  277. });