ws.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. '''This file is part of AToMPM - A Tool for Multi-Paradigm Modelling
  2. Copyright 2011 by the AToMPM team and licensed under the LGPL
  3. See COPYING.lesser and README.md in the root of this project for full details'''
  4. import re, threading, json, logging
  5. import sys
  6. if sys.version_info[0] < 3:
  7. import httplib as httplib
  8. import websocket._app as websocket
  9. else:
  10. import http.client as httplib
  11. import websocket._app as websocket
  12. '''
  13. a friendly wrapper around python-websockets that doubles as a socketio client
  14. _opened true when the socket is first opened
  15. _chlogh a reference to an object that implements onchangelog(), this
  16. method is called upon reception of changelogs from the asworker
  17. we're subscribed to
  18. subscribed describes the current state of our subscription to our asworker
  19. None: don't know yet
  20. True: subscribed
  21. False: subscription failed
  22. _ws the python-websocket '''
  23. class WebSocket :
  24. #socket.io messages types
  25. DISCONNECT = '0'
  26. CONNECT = '1'
  27. HEARTBEAT = '2'
  28. MESSAGE = '3'
  29. JSON_MESSAGE = '4'
  30. EVENT = '5'
  31. ACK = '6'
  32. ERROR = '7'
  33. NOOP = '8'
  34. def __init__(self,chlogh=None) :
  35. assert chlogh == None or 'onchangelog' in dir(chlogh)
  36. self._opened = False
  37. self._chlogh = chlogh
  38. self.subscribed = None
  39. self.connect()
  40. def _start_ws(self, hskey):
  41. self._ws = websocket.WebSocketApp(
  42. 'ws://127.0.0.1:8124/socket.io/1/websocket/' + hskey,
  43. on_message = self._onmessage,
  44. on_open = self._onopen)
  45. self._ws.run_forever()
  46. '''
  47. connect to the socketio server
  48. 1. perform the HTTP handshake
  49. 2. open a websocket connection
  50. REF:: https://github.com/LearnBoost/socket.io-spec '''
  51. def connect(self) :
  52. conn = httplib.HTTPConnection('127.0.0.1:8124')
  53. conn.request('POST','/socket.io/1/')
  54. resp = conn.getresponse()
  55. if resp.status == 200 :
  56. resp = resp.read()
  57. try: #handle bytes
  58. resp = resp.decode()
  59. except AttributeError:
  60. pass
  61. hskey = resp.split(':')[0]
  62. # start the websocket on a different thread as it loops forever
  63. thr = threading.Thread(target = self._start_ws, args = (hskey, ))
  64. thr.start()
  65. else :
  66. raise Exception('websocket initialization failed :: '+str(resp.reason))
  67. '''
  68. close the socket '''
  69. def close(self, ws) :
  70. self._ws.close()
  71. '''
  72. parse and handle incoming message '''
  73. def _onmessage(self,ws, msg) :
  74. logging.debug('## msg recvd '+msg)
  75. msgType = msg[0]
  76. if msgType == WebSocket.CONNECT :
  77. return
  78. elif msgType == WebSocket.ERROR :
  79. raise Exception('received error from socketio :: '+str(msg))
  80. elif msgType == WebSocket.HEARTBEAT :
  81. self._ws.send('2::')
  82. elif msgType == WebSocket.EVENT :
  83. msg = json.loads(msg[len(WebSocket.EVENT+':::'):])
  84. if msg['name'] != 'message' :
  85. raise Exception('received unexpected socketio event :: '+str(msg))
  86. msg = msg['args'][0]
  87. if 'statusCode' in msg and msg['statusCode'] != None :
  88. #on POST /changeListener response
  89. if msg['statusCode'] == 201 :
  90. self.subscribed = True
  91. else :
  92. self.subscribed = False
  93. elif self._chlogh and self.subscribed :
  94. self._chlogh.onchangelog(msg['data'])
  95. else :
  96. pass
  97. '''
  98. mark socket connection as opened '''
  99. def _onopen(self, ws) :
  100. self._opened = True
  101. '''
  102. subscribe to specified asworker '''
  103. def subscribe(self,aswid) :
  104. if not self._opened :
  105. t = threading.Timer(0.25,self.subscribe,[aswid])
  106. t.start()
  107. else :
  108. self._ws.send(
  109. '4:::{"method":"POST","url":"/changeListener?wid='+aswid+'"}')