socket2event.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. import threading
  2. from sccd.runtime.statecharts_core import Event
  3. import socket
  4. send_data_queues = {}
  5. send_events = {}
  6. recv_events = {}
  7. run_sockets = {}
  8. def start_socket_threads(controller, sock):
  9. recv_events[sock] = recv_event = threading.Event()
  10. send_events[sock] = send_event = threading.Event()
  11. send_data_queues[sock] = send_data_queue = []
  12. run_sockets[sock] = True
  13. thrd = threading.Thread(target=receive_from_socket, args=[controller, sock, recv_event])
  14. thrd.daemon = True
  15. thrd.start()
  16. thrd = threading.Thread(target=send_to_socket, args=[controller, sock, send_data_queue, send_event])
  17. thrd.daemon = True
  18. thrd.start()
  19. def receive_from_socket(controller, sock, recv_event):
  20. while 1:
  21. recv_event.wait()
  22. recv_event.clear()
  23. if not run_sockets[sock]:
  24. break
  25. data = sock.recv(2**16)
  26. controller.addInput(Event("received_socket", "socket_in", [sock, data]))
  27. def send_to_socket(controller, sock, data_queue, send_event):
  28. while run_sockets[sock]:
  29. send_event.wait()
  30. send_event.clear()
  31. while data_queue:
  32. send = sock.send(data_queue.pop(0))
  33. controller.addInput(Event("sent_socket", "socket_in", [sock, send]))
  34. if not run_sockets[sock]:
  35. break
  36. def _accept(controller, sock):
  37. conn, addr = sock.accept()
  38. start_socket_threads(controller, conn)
  39. controller.addInput(Event("accepted_socket", "socket_in", [sock, conn]))
  40. def _connect(controller, sock, destination):
  41. sock.connect(destination)
  42. controller.addInput(Event("connected_socket", "socket_in", [sock]))
  43. def _close(controller, sock):
  44. run_sockets[sock] = False
  45. send_events[sock].set()
  46. recv_events[sock].set()
  47. sock.close()
  48. controller.addInput(Event("closed_socket", "socket_in", [sock]))
  49. def _bind(controller, sock, addr):
  50. sock.bind(addr)
  51. controller.addInput(Event("bound_socket", "socket_in", [sock]))
  52. def _listen(controller, sock):
  53. sock.listen(1)
  54. controller.addInput(Event("listened_socket", "socket_in", [sock]))
  55. def _wrapper_func(*args):
  56. func = args[0]
  57. controller = args[1]
  58. sock = args[2]
  59. try:
  60. func(*args[1:])
  61. except socket.error as e:
  62. print("ERROR " + str(e))
  63. controller.addInput(Event("error_socket", "socket_in", [sock, e]))
  64. except Exception as e:
  65. print("UNKNOWN ERROR " + str(e))
  66. controller.addInput(Event("unknown_error_socket", "socket_in", [sock, e]))
  67. def _start_on_daemon_thread(func, args):
  68. new_args = [func]
  69. new_args.extend(args)
  70. args = new_args
  71. thrd = threading.Thread(target=_wrapper_func, args=args)
  72. thrd.daemon = True
  73. thrd.start()
  74. def boot_translation_service(controller):
  75. socket_out = controller.addOutputListener("socket_out")
  76. _start_on_daemon_thread(_poll, [controller, socket_out])
  77. def _poll(controller, socket_out):
  78. while 1:
  79. evt = socket_out.fetch(-1)
  80. name, params = evt.getName(), evt.getParameters()
  81. if name == "accept_socket":
  82. _start_on_daemon_thread(_accept, [controller, params[0]])
  83. elif name == "recv_socket":
  84. recv_events[params[0]].set()
  85. elif name == "connect_socket":
  86. _start_on_daemon_thread(_connect, [controller, params[0], params[1]])
  87. elif name == "create_socket":
  88. sock = socket.socket()
  89. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  90. start_socket_threads(controller, sock)
  91. controller.addInput(Event("created_socket", "socket_in", [sock]))
  92. elif name == "close_socket":
  93. _start_on_daemon_thread(_close, [controller, params[0]])
  94. elif name == "send_socket":
  95. send_data_queues[params[0]].append(params[1])
  96. send_events[params[0]].set()
  97. elif name == "bind_socket":
  98. _start_on_daemon_thread(_bind, [controller, params[0], params[1]])
  99. elif name == "listen_socket":
  100. _start_on_daemon_thread(_listen, [controller, params[0]])
  101. elif name == "stop":
  102. break