_app.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. """
  2. websocket - WebSocket client library for Python
  3. Copyright (C) 2010 Hiroki Ohtani(liris)
  4. This library is free software; you can redistribute it and/or
  5. modify it under the terms of the GNU Lesser General Public
  6. License as published by the Free Software Foundation; either
  7. version 2.1 of the License, or (at your option) any later version.
  8. This library is distributed in the hope that it will be useful,
  9. but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  11. Lesser General Public License for more details.
  12. You should have received a copy of the GNU Lesser General Public
  13. License along with this library; if not, write to the Free Software
  14. Foundation, Inc., 51 Franklin Street, Fifth Floor,
  15. Boston, MA 02110-1335 USA
  16. """
  17. """
  18. WebSocketApp provides higher level APIs.
  19. """
  20. import select
  21. import sys
  22. import threading
  23. import time
  24. import traceback
  25. import six
  26. from ._abnf import ABNF
  27. from ._core import WebSocket, getdefaulttimeout
  28. from ._exceptions import *
  29. from . import _logging
  30. __all__ = ["WebSocketApp"]
  31. class Dispatcher:
  32. def __init__(self, app, ping_timeout):
  33. self.app = app
  34. self.ping_timeout = ping_timeout
  35. def read(self, sock, read_callback, check_callback):
  36. while self.app.sock.connected:
  37. r, w, e = select.select(
  38. (self.app.sock.sock, ), (), (), self.ping_timeout) # Use a 10 second timeout to avoid to wait forever on close
  39. if r:
  40. if not read_callback():
  41. break
  42. check_callback()
  43. class SSLDispacther:
  44. def __init__(self, app, ping_timeout):
  45. self.app = app
  46. self.ping_timeout = ping_timeout
  47. def read(self, sock, read_callback, check_callback):
  48. while self.app.sock.connected:
  49. r = self.select()
  50. if r:
  51. if not read_callback():
  52. break
  53. check_callback()
  54. def select(self):
  55. sock = self.app.sock.sock
  56. if sock.pending():
  57. return [sock,]
  58. r, w, e = select.select((sock, ), (), (), self.ping_timeout)
  59. return r
  60. class WebSocketApp(object):
  61. """
  62. Higher level of APIs are provided.
  63. The interface is like JavaScript WebSocket object.
  64. """
  65. def __init__(self, url, header=None,
  66. on_open=None, on_message=None, on_error=None,
  67. on_close=None, on_ping=None, on_pong=None,
  68. on_cont_message=None,
  69. keep_running=True, get_mask_key=None, cookie=None,
  70. subprotocols=None,
  71. on_data=None):
  72. """
  73. url: websocket url.
  74. header: custom header for websocket handshake.
  75. on_open: callable object which is called at opening websocket.
  76. this function has one argument. The argument is this class object.
  77. on_message: callable object which is called when received data.
  78. on_message has 2 arguments.
  79. The 1st argument is this class object.
  80. The 2nd argument is utf-8 string which we get from the server.
  81. on_error: callable object which is called when we get error.
  82. on_error has 2 arguments.
  83. The 1st argument is this class object.
  84. The 2nd argument is exception object.
  85. on_close: callable object which is called when closed the connection.
  86. this function has one argument. The argument is this class object.
  87. on_cont_message: callback object which is called when receive continued
  88. frame data.
  89. on_cont_message has 3 arguments.
  90. The 1st argument is this class object.
  91. The 2nd argument is utf-8 string which we get from the server.
  92. The 3rd argument is continue flag. if 0, the data continue
  93. to next frame data
  94. on_data: callback object which is called when a message received.
  95. This is called before on_message or on_cont_message,
  96. and then on_message or on_cont_message is called.
  97. on_data has 4 argument.
  98. The 1st argument is this class object.
  99. The 2nd argument is utf-8 string which we get from the server.
  100. The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came.
  101. The 4th argument is continue flag. if 0, the data continue
  102. keep_running: this parameter is obosleted and ignored it.
  103. get_mask_key: a callable to produce new mask keys,
  104. see the WebSocket.set_mask_key's docstring for more information
  105. subprotocols: array of available sub protocols. default is None.
  106. """
  107. self.url = url
  108. self.header = header if header is not None else []
  109. self.cookie = cookie
  110. self.on_open = on_open
  111. self.on_message = on_message
  112. self.on_data = on_data
  113. self.on_error = on_error
  114. self.on_close = on_close
  115. self.on_ping = on_ping
  116. self.on_pong = on_pong
  117. self.on_cont_message = on_cont_message
  118. self.keep_running = False
  119. self.get_mask_key = get_mask_key
  120. self.sock = None
  121. self.last_ping_tm = 0
  122. self.last_pong_tm = 0
  123. self.subprotocols = subprotocols
  124. def send(self, data, opcode=ABNF.OPCODE_TEXT):
  125. """
  126. send message.
  127. data: message to send. If you set opcode to OPCODE_TEXT,
  128. data must be utf-8 string or unicode.
  129. opcode: operation code of data. default is OPCODE_TEXT.
  130. """
  131. if not self.sock or self.sock.send(data, opcode) == 0:
  132. raise WebSocketConnectionClosedException(
  133. "Connection is already closed.")
  134. def close(self, **kwargs):
  135. """
  136. close websocket connection.
  137. """
  138. self.keep_running = False
  139. if self.sock:
  140. self.sock.close(**kwargs)
  141. def _send_ping(self, interval, event):
  142. while not event.wait(interval):
  143. self.last_ping_tm = time.time()
  144. if self.sock:
  145. try:
  146. self.sock.ping()
  147. except Exception as ex:
  148. _logging.warning("send_ping routine terminated: {}".format(ex))
  149. break
  150. def run_forever(self, sockopt=None, sslopt=None,
  151. ping_interval=0, ping_timeout=None,
  152. http_proxy_host=None, http_proxy_port=None,
  153. http_no_proxy=None, http_proxy_auth=None,
  154. skip_utf8_validation=False,
  155. host=None, origin=None, dispatcher=None):
  156. """
  157. run event loop for WebSocket framework.
  158. This loop is infinite loop and is alive during websocket is available.
  159. sockopt: values for socket.setsockopt.
  160. sockopt must be tuple
  161. and each element is argument of sock.setsockopt.
  162. sslopt: ssl socket optional dict.
  163. ping_interval: automatically send "ping" command
  164. every specified period(second)
  165. if set to 0, not send automatically.
  166. ping_timeout: timeout(second) if the pong message is not received.
  167. http_proxy_host: http proxy host name.
  168. http_proxy_port: http proxy port. If not set, set to 80.
  169. http_no_proxy: host names, which doesn't use proxy.
  170. skip_utf8_validation: skip utf8 validation.
  171. host: update host header.
  172. origin: update origin header.
  173. """
  174. if not ping_timeout or ping_timeout <= 0:
  175. ping_timeout = None
  176. if ping_timeout and ping_interval and ping_interval <= ping_timeout:
  177. raise WebSocketException("Ensure ping_interval > ping_timeout")
  178. if sockopt is None:
  179. sockopt = []
  180. if sslopt is None:
  181. sslopt = {}
  182. if self.sock:
  183. raise WebSocketException("socket is already opened")
  184. thread = None
  185. close_frame = None
  186. self.keep_running = True
  187. self.last_ping_tm = 0
  188. self.last_pong_tm = 0
  189. def teardown():
  190. if thread and thread.isAlive():
  191. event.set()
  192. thread.join()
  193. self.keep_running = False
  194. self.sock.close()
  195. close_args = self._get_close_args(
  196. close_frame.data if close_frame else None)
  197. self._callback(self.on_close, *close_args)
  198. self.sock = None
  199. try:
  200. self.sock = WebSocket(
  201. self.get_mask_key, sockopt=sockopt, sslopt=sslopt,
  202. fire_cont_frame=self.on_cont_message and True or False,
  203. skip_utf8_validation=skip_utf8_validation)
  204. self.sock.settimeout(getdefaulttimeout())
  205. self.sock.connect(
  206. self.url, header=self.header, cookie=self.cookie,
  207. http_proxy_host=http_proxy_host,
  208. http_proxy_port=http_proxy_port, http_no_proxy=http_no_proxy,
  209. http_proxy_auth=http_proxy_auth, subprotocols=self.subprotocols,
  210. host=host, origin=origin)
  211. if not dispatcher:
  212. dispatcher = self.create_dispatcher(ping_timeout)
  213. self._callback(self.on_open)
  214. if ping_interval:
  215. event = threading.Event()
  216. thread = threading.Thread(
  217. target=self._send_ping, args=(ping_interval, event))
  218. thread.setDaemon(True)
  219. thread.start()
  220. def read():
  221. if not self.keep_running:
  222. return teardown()
  223. op_code, frame = self.sock.recv_data_frame(True)
  224. if op_code == ABNF.OPCODE_CLOSE:
  225. close_frame = frame
  226. return teardown()
  227. elif op_code == ABNF.OPCODE_PING:
  228. self._callback(self.on_ping, frame.data)
  229. elif op_code == ABNF.OPCODE_PONG:
  230. self.last_pong_tm = time.time()
  231. self._callback(self.on_pong, frame.data)
  232. elif op_code == ABNF.OPCODE_CONT and self.on_cont_message:
  233. self._callback(self.on_data, frame.data,
  234. frame.opcode, frame.fin)
  235. self._callback(self.on_cont_message,
  236. frame.data, frame.fin)
  237. else:
  238. data = frame.data
  239. if six.PY3 and op_code == ABNF.OPCODE_TEXT:
  240. data = data.decode("utf-8")
  241. self._callback(self.on_data, data, frame.opcode, True)
  242. self._callback(self.on_message, data)
  243. return True
  244. def check():
  245. if ping_timeout and self.last_ping_tm \
  246. and time.time() - self.last_ping_tm > ping_timeout \
  247. and self.last_ping_tm - self.last_pong_tm > ping_timeout:
  248. raise WebSocketTimeoutException("ping/pong timed out")
  249. return True
  250. dispatcher.read(self.sock.sock, read, check)
  251. except (Exception, KeyboardInterrupt, SystemExit) as e:
  252. self._callback(self.on_error, e)
  253. if isinstance(e, SystemExit):
  254. # propagate SystemExit further
  255. raise
  256. teardown()
  257. def create_dispatcher(self, ping_timeout):
  258. timeout = ping_timeout or 10
  259. if self.sock.is_ssl():
  260. return SSLDispacther(self, timeout)
  261. return Dispatcher(self, timeout)
  262. def _get_close_args(self, data):
  263. """ this functions extracts the code, reason from the close body
  264. if they exists, and if the self.on_close except three arguments """
  265. import inspect
  266. # if the on_close callback is "old", just return empty list
  267. if sys.version_info < (3, 0):
  268. if not self.on_close or len(inspect.getargspec(self.on_close).args) != 3:
  269. return []
  270. else:
  271. if not self.on_close or len(inspect.getfullargspec(self.on_close).args) != 3:
  272. return []
  273. if data and len(data) >= 2:
  274. code = 256 * six.byte2int(data[0:1]) + six.byte2int(data[1:2])
  275. reason = data[2:].decode('utf-8')
  276. return [code, reason]
  277. return [None, None]
  278. def _callback(self, callback, *args):
  279. if callback:
  280. try:
  281. callback(self, *args)
  282. except Exception as e:
  283. _logging.error("error from callback {}: {}".format(callback, e))
  284. if _logging.isEnabledForDebug():
  285. _, _, tb = sys.exc_info()
  286. traceback.print_tb(tb)