socket2event.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. """
  2. Socket 2 Event wrapper
  3. Author: Yentl Van Tendeloo
  4. This maps socket communication to events, and vice versa, allowing for a
  5. Statechart to use (blocking) sockets. It sends events to the socket_in port,
  6. and listens for commands on the socket_out port. As this runs on its own
  7. thread, you will need to start the code by running
  8. "boot_translation_service(controller)" before using the ports.
  9. """
  10. import threading
  11. from sccd.runtime.statecharts_core import Event
  12. import socket
  13. import sys
  14. send_data_queues = {}
  15. send_events = {}
  16. recv_events = {}
  17. run_sockets = {}
  18. def start_socket_threads(controller, sock):
  19. recv_events[sock] = recv_event = threading.Event()
  20. send_events[sock] = send_event = threading.Event()
  21. send_data_queues[sock] = send_data_queue = []
  22. run_sockets[sock] = True
  23. thrd = threading.Thread(target=receive_from_socket, args=[controller, sock, recv_event])
  24. thrd.daemon = True
  25. thrd.start()
  26. thrd = threading.Thread(target=send_to_socket, args=[controller, sock, send_data_queue, send_event])
  27. thrd.daemon = True
  28. thrd.start()
  29. def receive_from_socket(controller, sock, recv_event):
  30. try:
  31. while 1:
  32. recv_event.wait()
  33. recv_event.clear()
  34. if not run_sockets[sock]:
  35. break
  36. data = sock.recv(2**16)
  37. controller.addInput(Event("received_socket", "socket_in", [sock, data]))
  38. except socket.error as e:
  39. controller.addInput(Event("error_socket", "socket_in", [sock, e]))
  40. def send_to_socket(controller, sock, data_queue, send_event):
  41. while run_sockets[sock]:
  42. send_event.wait()
  43. send_event.clear()
  44. while data_queue:
  45. data = data_queue.pop(0)
  46. if sys.version_info[0] > 2:
  47. if isinstance(data, str):
  48. data = data.encode()
  49. send = sock.send(data)
  50. controller.addInput(Event("sent_socket", "socket_in", [sock, send]))
  51. if not run_sockets[sock]:
  52. break
  53. def _accept(controller, sock):
  54. conn, addr = sock.accept()
  55. start_socket_threads(controller, conn)
  56. controller.addInput(Event("accepted_socket", "socket_in", [sock, conn]))
  57. def _connect(controller, sock, destination):
  58. sock.connect(destination)
  59. controller.addInput(Event("connected_socket", "socket_in", [sock]))
  60. def _close(controller, sock):
  61. run_sockets[sock] = False
  62. send_events[sock].set()
  63. recv_events[sock].set()
  64. sock.close()
  65. controller.addInput(Event("closed_socket", "socket_in", [sock]))
  66. def _bind(controller, sock, addr):
  67. sock.bind(addr)
  68. controller.addInput(Event("bound_socket", "socket_in", [sock]))
  69. def _listen(controller, sock):
  70. sock.listen(1)
  71. controller.addInput(Event("listened_socket", "socket_in", [sock]))
  72. def _wrapper_func(*args):
  73. func = args[0]
  74. controller = args[1]
  75. sock = args[2]
  76. try:
  77. func(*args[1:])
  78. except socket.error as e:
  79. #print("ERROR " + str(e))
  80. controller.addInput(Event("error_socket", "socket_in", [sock, e]))
  81. except Exception as e:
  82. print("UNKNOWN ERROR " + str(e))
  83. controller.addInput(Event("unknown_error_socket", "socket_in", [sock, e]))
  84. raise
  85. def _start_on_daemon_thread(func, args):
  86. new_args = [func]
  87. new_args.extend(args)
  88. args = new_args
  89. thrd = threading.Thread(target=_wrapper_func, args=args)
  90. thrd.daemon = True
  91. thrd.start()
  92. def boot_translation_service(controller):
  93. socket_out = controller.addOutputListener("socket_out")
  94. _start_on_daemon_thread(_poll, [controller, socket_out])
  95. def _poll(controller, socket_out):
  96. while 1:
  97. evt = socket_out.fetch(-1)
  98. name, params = evt.getName(), evt.getParameters()
  99. if name == "accept_socket":
  100. _start_on_daemon_thread(_accept, [controller, params[0]])
  101. elif name == "recv_socket":
  102. recv_events[params[0]].set()
  103. elif name == "connect_socket":
  104. _start_on_daemon_thread(_connect, [controller, params[0], params[1]])
  105. elif name == "create_socket":
  106. sock = socket.socket()
  107. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  108. start_socket_threads(controller, sock)
  109. if len(params) == 1:
  110. # In case we get an ID to prove which socket it is
  111. controller.addInput(Event("created_socket", "socket_in", [sock, params[0]]))
  112. else:
  113. # Don't care and just send out the socket
  114. controller.addInput(Event("created_socket", "socket_in", [sock]))
  115. elif name == "close_socket":
  116. _start_on_daemon_thread(_close, [controller, params[0]])
  117. elif name == "send_socket":
  118. send_data_queues[params[0]].append(params[1])
  119. send_events[params[0]].set()
  120. elif name == "bind_socket":
  121. _start_on_daemon_thread(_bind, [controller, params[0], params[1]])
  122. elif name == "listen_socket":
  123. _start_on_daemon_thread(_listen, [controller, params[0]])
  124. elif name == "stop":
  125. break