network_client.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. import threading
  2. import socket
  3. import sys
  4. import io
  5. import queue
  6. from dataclasses import dataclass
  7. from typing import *
  8. SERVERS = [ "localhost:9000", "localhost:9001" ]
  9. # returns the number of hardcoded servers of the client
  10. def get_nr_of_servers():
  11. return len(SERVERS)
  12. # gets the information of the server with the index provided
  13. def get_server(i):
  14. return SERVERS[i]
  15. from sccd.action_lang.static.types import *
  16. SCCD_EXPORTS = {
  17. "get_nr_of_servers": (get_nr_of_servers, SCCDFunction([], SCCDInt)),
  18. "get_server": (get_server, SCCDFunction([SCCDInt], SCCDString)),
  19. }
  20. from sccd.controller.controller import *
  21. from sccd.realtime.eventloop import *
  22. class NetworkClient:
  23. def __init__(self, eventloop: ThreadSafeEventLoop):
  24. self.eventloop = eventloop
  25. self.queue = queue.Queue() # output events from the statechart (for us to handle) are added to this queue
  26. self.recv_thread = None
  27. # Starts the network client in a new thread.
  28. # Networking stuff must run in its own thread, because socket IO is blocking.
  29. def start(self):
  30. def event_handler_thread():
  31. while True:
  32. name, params = self.queue.get()
  33. if name == "connect":
  34. address = params[0]
  35. host, port = address.split(':')
  36. port = int(port)
  37. # print("NetworkClient: attempting to connect...")
  38. try:
  39. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  40. sock.connect((host, port))
  41. def recv_thread():
  42. file = sock.makefile()
  43. for line in file:
  44. if line.startswith("ACK JOIN"):
  45. self.eventloop.add_input_now_threadsafe("network", "joined")
  46. elif line.startswith("ACK LEAVE"):
  47. self.eventloop.add_input_now_threadsafe("network", "left")
  48. elif line.startswith("MSG"):
  49. msg = line[4:]
  50. self.eventloop.add_input_now_threadsafe("network", "receive_message", [msg])
  51. elif line.startswith("ALIVE"):
  52. self.eventloop.add_input_now_threadsafe("network", "alive")
  53. self.recv_thread = threading.Thread(target=recv_thread)
  54. self.recv_thread.daemon = True
  55. self.recv_thread.start()
  56. # print("NetworkClient: connected: started recv_thread")
  57. self.eventloop.add_input_now_threadsafe("network", "connected")
  58. except ConnectionError:
  59. pass
  60. elif name == "disconnect":
  61. sock.close()
  62. # print("NetworkClient: waiting for recv_thread...")
  63. self.recv_thread.join()
  64. self.recv_thread = None
  65. # print("NetworkClient: recv_thread is done.")
  66. self.eventloop.add_input_now_threadsafe("network", "disconnected")
  67. elif name == "join":
  68. sock.send(("JOIN " + str(params[0]) + "\n").encode('utf-8'))
  69. elif name == "leave":
  70. sock.send("LEAVE\n".encode('utf-8'))
  71. elif name == "poll":
  72. sock.send("POLL\n".encode('utf-8'))
  73. elif name == "send_message":
  74. sock.send(("MSG " + str(params[0]) + "\n").encode('utf-8'))
  75. t = threading.Thread(target=event_handler_thread)
  76. t.daemon = True
  77. t.start()
  78. def add_input(self, name, params):
  79. self.queue.put((name, params))