run_server.js 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. // NodeJS script
  2. const fs = require('fs/promises');
  3. const fsConstants = require('fs').constants;
  4. const path = require('path');
  5. const {EOL} = require('os');
  6. const http = require('http');
  7. const handler = require('serve-handler');
  8. const IDLENGTH = 36; // UUID v4
  9. const port = process.env.DRAWIOPORT || 8700;
  10. const stateDir = process.env.DRAWIOSTATEDIR || ".";
  11. async function startServer() {
  12. const opsDir = path.join(stateDir, 'ops');
  13. const sessionDir = path.join(stateDir, 'sessions');
  14. try {
  15. process.stdout.write("Can read/write in directory '" + opsDir + "' ? ...");
  16. await fs.access(opsDir, fsConstants.R_OK | fsConstants.W_OK);
  17. process.stdout.write("OK" + EOL);
  18. process.stdout.write("Can read/write in directory '" + sessionDir + "' ? ...");
  19. await fs.access(sessionDir, fsConstants.R_OK | fsConstants.W_OK);
  20. process.stdout.write("OK" + EOL);
  21. } catch (e) {
  22. process.stdout.write(EOL + "Please make sure the following directories exist and are writable:" + EOL);
  23. process.stdout.write(" " + opsDir + EOL);
  24. process.stdout.write(" " + sessionDir + EOL);
  25. process.exit(1);
  26. }
  27. const { WebSocketServer } = require('ws');
  28. const { v4: uuidv4 } = require("uuid");
  29. function asyncSleep(ms) {
  30. return new Promise(resolve => setTimeout(resolve, ms));
  31. }
  32. const httpServer = http.createServer((request, response) => {
  33. // You pass two more arguments for config and middleware
  34. // More details here: https://github.com/vercel/serve-handler#options
  35. console.log(request.method, request.url)
  36. return handler(request, response);
  37. });
  38. const wsServer = new WebSocketServer({
  39. server: httpServer,
  40. path: "/websocket",
  41. });
  42. // Provides async reads and writes to files, while guaranteeing that for every file, a read always reads the latest write.
  43. class QueuedIO {
  44. constructor(path) {
  45. this.path = path;
  46. this.queues = new Map();
  47. }
  48. _queuedIO(f, filename, ...args) {
  49. const lastIO = this.queues.get(filename) || Promise.resolve();
  50. const nextIO = lastIO.then(() => f(path.join(this.path, filename), ...args));
  51. this.queues.set(filename, nextIO.catch(e => {/*console.log("silently caught", e)*/}));
  52. return nextIO;
  53. }
  54. write(filename, data) {
  55. // console.log("writing file", filename);
  56. return this._queuedIO((path, data) => fs.writeFile(path, data, {encoding: 'utf8'}), filename, data);
  57. }
  58. append(filename, data) {
  59. // console.log("appending to file", filename);
  60. const result = this._queuedIO(fs.appendFile, filename, data);
  61. // result.then(() => console.log("success")).catch(e => console.log("COULD NOT APPEND", e));
  62. return result;
  63. }
  64. stat(filename) {
  65. // console.log("statting file", filename);
  66. return this._queuedIO(fs.stat, filename);
  67. }
  68. read(filename) {
  69. // console.log("reading file", filename);
  70. return this._queuedIO(path => fs.readFile(path, {encoding:'utf8'}), filename);
  71. }
  72. writeJSON(filename, json) {
  73. // console.log("writeJSONing file", filename);
  74. return this.write(filename, JSON.stringify(json));
  75. }
  76. readJSON(filename) {
  77. // console.log("readJSONing file", filename);
  78. return this.read(filename).then(JSON.parse);
  79. }
  80. }
  81. const opsDB = new QueuedIO(opsDir);
  82. const sessionDB = new QueuedIO(sessionDir);
  83. // mapping of modelId to set of sockets
  84. const subscriptions = new Map();
  85. wsServer.on('connection', function(socket) {
  86. console.log("Client connected.")
  87. const mySubscriptions = new Set();
  88. function join(sessionId) {
  89. const subbedSockets = subscriptions.get(sessionId);
  90. if (subbedSockets) {
  91. subbedSockets.add(socket);
  92. } else {
  93. subscriptions.set(sessionId, new Set([socket]));
  94. }
  95. mySubscriptions.add(sessionId);
  96. }
  97. function leave(sessionId) {
  98. const subbedSockets = subscriptions.get(sessionId);
  99. if (subbedSockets) {
  100. subbedSockets.delete(socket);
  101. }
  102. mySubscriptions.delete(sessionId);
  103. }
  104. function leaveAll() {
  105. mySubscriptions.forEach(leave);
  106. }
  107. async function handle(req) {
  108. if (req.type === "ping") {
  109. return {type: "pong"};
  110. }
  111. else {
  112. console.log("-> ", req);
  113. if (req.type === "join") {
  114. const { reqId, sessionId } = req;
  115. leaveAll(); // a client can only join 1 session at a time
  116. join(sessionId);
  117. // Reply with all operations in session
  118. const session = await sessionDB.read(sessionId).catch(e => {
  119. if (e.code === 'ENOENT') {
  120. return ""; // Act as if session exists and is empty
  121. } else {
  122. throw e;
  123. }
  124. });
  125. const opIds = new Array(session.length / IDLENGTH);
  126. for (let i=0; i<opIds.length; i+=1) {
  127. const offset = i * IDLENGTH;
  128. opIds[i] = session.slice(offset, offset + IDLENGTH);
  129. }
  130. const ops = await Promise.all(opIds.map(async id => ({id, detail: await opsDB.readJSON(id)})));
  131. return {type: "ack", reqId, ops};
  132. }
  133. else if (req.type === "leave") {
  134. const { reqId } = req;
  135. leaveAll();
  136. return {type: "ack", reqId};
  137. }
  138. else if (req.type === "new_share") {
  139. const { reqId, ops } = req;
  140. const sessionId = uuidv4();
  141. leaveAll();
  142. join(sessionId);
  143. await Promise.all(ops.map(({id, detail}) => opsDB.writeJSON(id, detail)));
  144. await sessionDB.write(sessionId, ops.map(op=>op.id).join(''));
  145. return {type: "ack", reqId, sessionId};
  146. }
  147. else if (req.type === "new_edit") {
  148. const { reqId, sessionId, op: {id, detail} } = req;
  149. await opsDB.writeJSON(id, detail);
  150. await sessionDB.append(sessionId, id); // Creates file if it doesn't exist yet
  151. // Best effort broadcast to all subscribers
  152. (async () => {
  153. const subbedSockets = subscriptions.get(sessionId) || new Set();
  154. const broadcastMsg = {type:"pub_edit", sessionId, op: {id, detail}};
  155. console.log("BROADCAST to", subbedSockets.size-1,"subscribers...", broadcastMsg);
  156. const stringified = JSON.stringify(broadcastMsg);
  157. // await asyncSleep(3000);
  158. subbedSockets.forEach(subbed => {
  159. if (subbed !== socket) { // don't echo
  160. try {
  161. subbed.send(stringified); // forward incoming request to all
  162. } catch (e) {
  163. console.log("Error:", e, "Closing socket...")
  164. subbed.close(); // in case of an error, it is the client's responsibility to re-subscribe and 'catch up'.
  165. }
  166. }
  167. });
  168. })();
  169. return {type: "ack", reqId};
  170. }
  171. else if (req.type === "update_cursor") {
  172. mySubscriptions.forEach(sessionId => {
  173. const sockets = subscriptions.get(sessionId) || new Set();
  174. const stringified = JSON.stringify(req);
  175. sockets.forEach(s => {
  176. if (s === socket) {
  177. return;
  178. }
  179. try {
  180. s.send(stringified);
  181. } catch (e) {
  182. console.log("Error:", e, "Closing socket...")
  183. s.close(); // in case of an error, it is the client's responsibility to re-subscribe and 'catch up'.
  184. }
  185. });
  186. });
  187. }
  188. }
  189. }
  190. socket.on('message', function(data) {
  191. const req = JSON.parse(data);
  192. handle(req).then(json => {
  193. if (json) {
  194. if (json.type !== 'pong')
  195. console.log("<- ", json);
  196. socket.send(JSON.stringify(json));
  197. }
  198. }).catch(e => {
  199. console.log("Error handling", req, e);
  200. })
  201. });
  202. socket.on('close', function() {
  203. leaveAll();
  204. console.log("Client disconnected.");
  205. })
  206. });
  207. httpServer.listen(port);
  208. console.log("Listening on", port);
  209. }
  210. startServer().catch(e => {
  211. console.log("Error in startServer:", e);
  212. process.exit(1);
  213. });