123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530 |
- import matplotlib
- matplotlib.use("TkAgg")
- from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
- from matplotlib.figure import Figure
- from matplotlib import rcParams
- rcParams.update({'figure.autolayout': True})
- from Tkinter import *
- from PIL import Image, ImageTk
- import tkSimpleDialog
- import urllib
- import urllib2
- import json
- import time
# Grid geometry, in pixels: the canvas is a 10 x 10 grid of JUMP-sized cells.
JUMP = 50
MAX_WIDTH = MAX_HEIGHT = 10 * JUMP
# A click within this many pixels of a link counts as a hit on that link.
CLICK_TOLERANCE = 5

# Server endpoint and task name used for every request.
address = "http://127.0.0.1:8001"
taskname = "test"

# Mutable UI state shared by the handlers further down.
names = {}                 # element name -> canvas text item showing its label
name = 0                   # counter used to generate the next element name
setting_initial = False    # toggled by the Control key handlers
request_new_state = False  # raised by the polling thread on REQUEST_CURRENT_STATE

# Widgets: the Tk root has to exist before any variable or widget is created.
root = Tk()
event_entry = StringVar()
canvas = Canvas(root, bg="white", width=MAX_WIDTH, height=MAX_HEIGHT)
class FakeLayer():
    """In-memory stand-in for MvLayer that performs no network I/O.

    It records element types, link endpoints and attribute values in plain
    dictionaries so the editor can be exercised without a server.  The
    simulation-related operations are accepted and ignored.
    """

    def __init__(self, address):
        """Create an empty model; *address* is accepted but not used."""
        self.types = {}
        self.sources = {}
        self.targets = {}
        self.attrs = {}

    def send_event(self, event):
        """Accept and ignore an input event."""
        pass

    def read_available_attributes(self, name):
        """Return the (attribute, type) pairs of *name*: always just a name."""
        return [("name", "String")]

    def read_attribute(self, name, attr):
        """Return the stored value of *attr* on *name*, or None if unset."""
        return self.attrs.get(name, {}).get(attr, None)

    def set_attribute(self, name, attr, value):
        """Store *value* as attribute *attr* of element *name*."""
        self.attrs.setdefault(name, {})[attr] = value

    def instantiate_element(self, name, block_type):
        """Record a new element *name* of type *block_type*."""
        self.types[name] = block_type

    def instantiate_link(self, name, link_type, source, target):
        """Record a new link *name* of type *link_type* from *source* to *target*."""
        self.types[name] = link_type
        self.sources[name] = source
        self.targets[name] = target
        self.attrs[name] = {}

    def simulate(self):
        """Accept and ignore a request to start simulating."""
        pass

    def pause(self):
        """Accept and ignore a request to pause."""
        pass

    def delete(self, block=None):
        """Forget everything recorded for element *block*.

        The parameter mirrors MvLayer.delete(block) so both layers can be
        called the same way; it defaults to None so the former zero-argument
        call keeps working (and then removes nothing).
        """
        for table in (self.types, self.sources, self.targets, self.attrs):
            table.pop(block, None)

    def switch_initial(self, name):
        """Accept and ignore a change of initial state."""
        pass

    def auto_sanitize(self, auto):
        """Accept and ignore a change of sanitisation mode."""
        pass

    def set_current(self, name):
        """Accept and ignore a current-state answer."""
        pass

    def poll(self):
        """Accept and ignore a poll request."""
        pass
# Reply queues: appended to by poll() and popped by the MvLayer read methods.
attribute, available_attrs = [], []

# Simulation traces appended to by poll().  Every entry is a
# (simulation_time, label) pair, for example:
#   inp_evts  = [(2, "arm"), (6, "detected")]
#   state     = [(0, "idle"), (2, "armed"), (6, "detected"), (8, "idle")]
#   outp_evts = [(6, "soundAlarm")]
inp_evts, state, outp_evts = [], [], []

# Background colour of the root window; poll() changes it to signal status.
do_color = "grey"
def _message_payload(message):
    """Return the text following the first space of *message*.

    Raises ValueError when the message has no payload part.
    """
    _, separator, payload = message.partition(" ")
    if not separator:
        raise ValueError("message carries no payload")
    return payload


def poll(address):
    """Fetch server output forever and file each message into the shared state.

    address -- URL the ``get_output`` requests are posted to.

    Each reply is a JSON string whose prefix selects the handling:

    * AVAILABLE_ATTR_VALUE / _TYPE / _END build one [name, type] list and
      append it to ``available_attrs``.
    * ATTR_VALUE appends one decoded value (or None) to ``attribute``.
    * SIM_TIME sets the time stamp used for the following SIM_STATE,
      SIM_EVENT and SIM_RAISE entries (``state``, ``inp_evts``, ``outp_evts``).
    * CONFORMANCE_OK / CONFORMANCE_FAIL / REQUEST_CURRENT_STATE set
      ``do_color``; the last one also raises ``request_new_state``.

    The function never returns.  A failed request is retried after a pause
    and a malformed message is reported and skipped, so a single bad
    exchange does not end the loop.
    """
    global do_color
    global request_new_state
    simulation_time = None
    working_available_attrs = []
    while 1:
        request = urllib2.Request(address, urllib.urlencode({"op": "get_output", "taskname": taskname}))
        try:
            raw = urllib2.urlopen(request).read()
        except IOError as exc:
            # urllib2.URLError derives from IOError.  Wait before retrying so
            # an unreachable server is not hammered.
            print("Error: could not fetch output: " + str(exc))
            time.sleep(1)
            continue
        try:
            returnvalue = json.loads(raw)
        except ValueError as exc:
            print("Error: got undecodable result: " + str(exc))
            continue
        print("Process " + str(returnvalue))
        if not hasattr(returnvalue, "startswith"):
            # Valid JSON, but not a string: none of the prefixes can match.
            print("Error: got unknown result: " + str(returnvalue))
            continue
        try:
            if returnvalue.startswith("AVAILABLE_ATTR_VALUE"):
                working_available_attrs.append([json.loads(_message_payload(returnvalue)), None])
            elif returnvalue.startswith("AVAILABLE_ATTR_TYPE"):
                if not working_available_attrs:
                    raise ValueError("attribute type without a preceding attribute")
                working_available_attrs[-1][1] = json.loads(_message_payload(returnvalue))
            elif returnvalue.startswith("AVAILABLE_ATTR_END"):
                available_attrs.append(working_available_attrs)
                working_available_attrs = []
            elif returnvalue.startswith("ATTR_VALUE"):
                v = _message_payload(returnvalue)
                # A bare None is not JSON, so it is mapped by hand.
                attribute.append(None if v == "None" else json.loads(v))
            elif returnvalue.startswith("SIM_TIME"):
                simulation_time = json.loads(_message_payload(returnvalue))
            elif returnvalue.startswith("SIM_STATE"):
                state.append((simulation_time, _message_payload(returnvalue)))
            elif returnvalue.startswith("SIM_EVENT"):
                inp_evts.append((simulation_time, _message_payload(returnvalue)))
            elif returnvalue.startswith("SIM_RAISE"):
                outp_evts.append((simulation_time, _message_payload(returnvalue)))
            elif returnvalue.startswith("CONFORMANCE_OK"):
                do_color = "grey"
            elif returnvalue.startswith("CONFORMANCE_FAIL"):
                do_color = "red"
            elif returnvalue.startswith("REQUEST_CURRENT_STATE"):
                do_color = "blue"
                request_new_state = True
            else:
                print("Error: got unknown result: " + returnvalue)
        except ValueError as exc:
            print("Error: malformed message " + repr(returnvalue) + ": " + str(exc))
class MvLayer():
    """Model layer that forwards every operation to the server over HTTP.

    Each operation is sent as a series of ``set_input`` POST requests: an
    operation keyword followed by its arguments, one request per value.
    Replies are not read here; they are collected by the module-level
    ``poll`` function running in a background thread.
    """

    def __init__(self, address):
        """Announce the task, start the polling thread and the colour refresh.

        address -- URL that every request of this layer is posted to.
        """
        import threading
        self.address = address
        # The task name is sent to the "task_manager" task; every later input
        # is addressed to the task itself.
        self._post(self._quote(taskname), "task_manager")
        thrd = threading.Thread(target=poll, args=[address])
        thrd.daemon = True
        thrd.start()

        def color():
            # Scheduled with after() so the widget is only touched from the
            # Tk event loop; Tkinter widgets must not be driven from a
            # worker thread.
            root.configure(bg=do_color)
            root.after(100, color)
        root.after(100, color)

    def _quote(self, text):
        """Return *text* encoded as a JSON string literal."""
        return json.dumps("%s" % (text,))

    def _post(self, value, task=None):
        """POST one already JSON-encoded *value* as input of *task*.

        *task* defaults to the module-level ``taskname``.
        """
        if task is None:
            task = taskname
        data = urllib.urlencode({"op": "set_input", "value": value, "taskname": task})
        response = urllib2.urlopen(urllib2.Request(self.address, data))
        try:
            response.read()
        finally:
            response.close()

    def _send(self, *words):
        """POST each of *words*, in order, as a JSON string."""
        for word in words:
            self._post(self._quote(word))

    def _await_reply(self, replies):
        """Block until the polling thread has put an entry in *replies*; pop it."""
        while 1:
            try:
                print("Attribute: " + str(replies))
                return replies.pop(0)
            except IndexError:
                time.sleep(0.1)

    def read_available_attributes(self, name):
        """Return the list of [attribute, type] pairs reported for *name*."""
        self._send("read_available_attributes", name)
        return self._await_reply(available_attrs)

    def read_attribute(self, name, attr):
        """Return the value the server reports for attribute *attr* of *name*."""
        print("Sending read_attribute")
        self._send("read_attribute", name, attr)
        print("Waiting for attribute")
        return self._await_reply(attribute)

    def set_attribute(self, name, attr, value):
        """Set attribute *attr* of *name* to *value* (sent JSON-encoded)."""
        self._send("set_attribute", name, attr)
        self._post(json.dumps(value))

    def instantiate_element(self, name, block_type):
        """Create node *name* of type *block_type*."""
        self._send("instantiate_node", block_type, name)

    def instantiate_link(self, name, link_type, source, target):
        """Create association *name* of type *link_type* from *source* to *target*."""
        self._send("instantiate_association", link_type, name, source, target)

    def simulate(self):
        """Send the ``simulate`` operation."""
        self._send("simulate")

    def pause(self):
        """Send the ``pause`` operation."""
        self._send("pause")

    def send_event(self, event):
        """Send input event *event*."""
        print("SENDING EVENT")
        self._send("event", event)

    def delete(self, block):
        """Delete element *block*."""
        self._send("delete_element", block)

    def switch_initial(self, name):
        """Send ``switch_initial`` for element *name*."""
        self._send("switch_initial", name)

    def auto_sanitize(self, auto):
        """Send ``auto_sanitize`` followed by the JSON boolean for *auto*."""
        self._send("auto_sanitize")
        self._post("true" if auto else "false")

    def set_current(self, name):
        """Send *name* on its own, with no operation keyword in front."""
        # NOTE(review): presumably the answer to REQUEST_CURRENT_STATE -- confirm.
        self._send(name)

    def poll(self):
        """Send the ``poll`` operation."""
        self._send("poll")
def lower(value):
    """Return the largest multiple of JUMP that is not greater than *value*."""
    # Explicit floor division: plain "/" only floors for integers under
    # Python 2 without "from __future__ import division".
    return value // JUMP * JUMP
def upper(value):
    """Return the smallest multiple of JUMP that is strictly greater than *value*."""
    # Explicit floor division: plain "/" only floors for integers under
    # Python 2 without "from __future__ import division".
    return (value // JUMP + 1) * JUMP
def avg(a, b):
    """Return the arithmetic mean of *a* and *b* as a float."""
    total = a + b
    return float(total) / 2
class InterfaceCore():
    """Editing logic that keeps the canvas drawing and the model layer in step.

    ``drawn`` holds one tuple per element: (x1, y1, x2, y2, name, kind) with
    kind "NODE" or "LINK".  ``refs`` maps an element name to the list of
    canvas items that depict it.  The ``canvas`` attribute must be assigned
    on the instance before any method is called.
    """
    drawn = set()
    refs = dict()
    # Replace with FakeLayer(address) to run without a server.
    mv = MvLayer(address)

    def _drop_label(self, lname):
        """Remove the text label of *lname* from the canvas, if it has one."""
        item = names.pop(lname, None)
        if item is not None:
            self.canvas.delete(item)

    def _segment_distance(self, px, py, x1, y1, x2, y2):
        """Return the distance from (px, py) to the segment (x1, y1)-(x2, y2)."""
        dx, dy = x2 - x1, y2 - y1
        length_sq = float(dx * dx + dy * dy)
        if length_sq == 0:
            # Zero-length link: the segment is a single point.
            t = 0.0
        else:
            # Project onto the line, then clamp so points beyond either end
            # are measured against that end rather than the endless line.
            t = ((px - x1) * dx + (py - y1) * dy) / length_sq
            t = max(0.0, min(1.0, t))
        cx, cy = x1 + t * dx, y1 + t * dy
        return ((px - cx) ** 2 + (py - cy) ** 2) ** 0.5

    def delete(self, x, y):
        """Delete the element under (x, y), on the server and on the canvas.

        Does nothing when no element is there.
        """
        lname = self.find((x, y))
        if lname is None:
            return
        self.mv.delete(lname)
        for item in self.refs.pop(lname, []):
            self.canvas.delete(item)
        self._drop_label(lname)
        self.drawn = set(e for e in self.drawn if e[4] != lname)

    def clicked(self, event):
        """Handle a click at (event.x, event.y).

        On empty space a new state is created in that grid cell.  On an
        existing element the click either answers a pending request for the
        current state or, otherwise, opens the attribute dialogs.
        """
        global request_new_state
        lname = self.find((event.x, event.y))
        if lname is None:
            self._add_state(event.x, event.y)
        elif request_new_state:
            # The request is answered by this click, so clear the flag.
            request_new_state = False
            self.mv.set_current(lname)
        else:
            self._edit_attributes(lname)

    def _add_state(self, x, y):
        """Create a state filling the grid cell that contains (x, y)."""
        global name
        self.mv.instantiate_element(str(name), "State")
        x1, y1, x2, y2 = lower(x), lower(y), upper(x), upper(y)
        r = self.canvas.create_oval(x1, y1, x2, y2, fill="white")
        self.drawn.add((x1, y1, x2, y2, str(name), "NODE"))
        self.refs[str(name)] = [r]
        name += 1

    def _edit_attributes(self, lname):
        """Ask for a new value of every attribute of *lname* and store it.

        A cancelled dialog or a value that cannot be converted to the
        attribute's type leaves that attribute unchanged.
        """
        attrs = self.mv.read_available_attributes(lname)
        print("Processing attributes " + str(attrs))
        for attr, t in attrs:
            print("Reading attribute " + str(attr))
            old_value = self.mv.read_attribute(lname, attr)
            print("Got value " + str(old_value))
            if old_value == "None":
                old_value = None
            new_value = tkSimpleDialog.askstring("Attribute modification", attr, initialvalue=old_value)
            self.canvas.focus_set()
            if new_value is None:
                # askstring returns None when the dialog is cancelled.
                continue
            try:
                if t == "Float":
                    new_value = float(new_value)
                elif t == "String":
                    new_value = str(new_value)
                elif t == "Natural":
                    new_value = int(new_value)
                else:
                    print("Got unknown type: " + str(t))
            except ValueError:
                print("Invalid value for type " + str(t) + ": " + repr(new_value))
                continue
            self.mv.set_attribute(lname, attr, new_value)
            print("Set value")
            if attr in ["name", "event"]:
                # These attributes are shown as a label centred on the element.
                self._drop_label(lname)
                entry = [e for e in self.drawn if str(e[4]) == str(lname)][0]
                xc, yc = avg(entry[0], entry[2]), avg(entry[1], entry[3])
                names[lname] = self.canvas.create_text(xc, yc, text=new_value)

    def switch_initial(self, evt):
        """Make the element under the click initial and outline it thickly."""
        lname = self.find((evt.x, evt.y))
        if lname is None:
            return
        self.mv.switch_initial(lname)
        # Reset every outline first, then emphasise the chosen element.
        for items in self.refs.values():
            for item in items:
                self.canvas.itemconfigure(item, width=1)
        for item in self.refs[lname]:
            self.canvas.itemconfigure(item, width=4)

    def find(self, location):
        """Return the name of the element at *location*, or None.

        location -- (x, y) canvas coordinates.

        A node matches when the point lies in its cell, using the same
        half-open bounds [x1, x2) x [y1, y2) that lower()/upper() produce.
        A link matches when the point is within CLICK_TOLERANCE of its
        segment.  A matching link wins over a node at the same point; among
        matching links the closest one wins.
        """
        x0, y0 = location
        matches = []
        for e in self.drawn:
            x1, y1, x2, y2 = e[0], e[1], e[2], e[3]
            if e[5] == "LINK":
                distance = self._segment_distance(x0, y0, x1, y1, x2, y2) - CLICK_TOLERANCE
            elif e[5] == "NODE":
                inside = x1 <= x0 < x2 and y1 <= y0 < y2
                distance = 0.0 if inside else float("inf")
            else:
                continue
            matches.append((distance, e[4]))
        if matches:
            minimal_distance, found = min(matches)
            if minimal_distance <= 0:
                return found
        return None

    def draw(self, start, end):
        """Create a transition from the element at *start* to the one at *end*.

        Nothing happens unless both points hit an existing element.
        """
        global name
        source = self.find(start)
        target = self.find(end)
        if source is None or target is None:
            return
        self.mv.instantiate_link(str(name), "Transition", source, target)
        line = self.canvas.create_line(start[0], start[1], end[0], end[1], fill="black", arrow=LAST)
        self.refs[str(name)] = [line]
        self.drawn.add((start[0], start[1], end[0], end[1], str(name), "LINK"))
        name += 1

    def add_event(self, event):
        """Forward input event *event* to the model layer."""
        self.mv.send_event(event)
# Single controller instance shared by all the handlers below.
core = InterfaceCore()


def clicked(event):
    """Left-click handler: choose the initial state while Control is held, else edit."""
    handler = core.switch_initial if setting_initial else core.clicked
    handler(event)
# Where the current right-button drag began; None while no drag is pending.
start_location = None


def draw(event):
    """Right-button press handler: remember where the drag starts."""
    global start_location
    start_location = (event.x, event.y)


def release(event):
    """Right-button release handler: draw a link from the drag start to here.

    A release with no recorded press is ignored.
    """
    global start_location
    if start_location is None:
        return
    # Consume the start point so a later stray release cannot reuse it.
    begin, start_location = start_location, None
    core.draw(begin, (event.x, event.y))
def simulate():
    """START button callback: ask the model layer to simulate."""
    layer = core.mv
    layer.simulate()
def pause():
    """PAUSE button callback: ask the model layer to pause."""
    layer = core.mv
    layer.pause()
def delete(event):
    """Middle-click handler: delete whatever lies under the pointer."""
    x, y = event.x, event.y
    core.delete(x, y)
def add_event():
    """EVENT button callback: send the text of the entry field as an event."""
    text = event_entry.get()
    core.add_event(text)
def control_released(evt):
    """Key-release handler: leave initial-state selection mode."""
    # evt is required by the Tk binding but carries nothing needed here.
    global setting_initial
    setting_initial = False
def control_pressed(evt):
    """Key-press handler: enter initial-state selection mode."""
    # evt is required by the Tk binding but carries nothing needed here.
    global setting_initial
    setting_initial = True
def auto_sanitize():
    """AUTO button callback: switch automatic sanitisation on."""
    layer = core.mv
    layer.auto_sanitize(True)
def manual_sanitize():
    """MANUAL button callback: switch automatic sanitisation off."""
    layer = core.mv
    layer.auto_sanitize(False)
# Toolbar widgets in display order: each one gets its own column of row 0.
buttons = [
    Button(root, text="START", command=simulate),
    Button(root, text="PAUSE", command=pause),
    Button(root, text="AUTO", command=auto_sanitize),
    Button(root, text="MANUAL", command=manual_sanitize),
    Entry(root, textvariable=event_entry),
    Button(root, text="EVENT", command=add_event),
]
for i, b in enumerate(buttons):
    b.grid(row=0, column=i)

# The drawing canvas spans the whole width underneath the toolbar.
canvas.grid(row=1, column=0, columnspan=len(buttons))
core.canvas = canvas

# Grid lines every JUMP pixels.  Horizontal lines run across the full width
# and vertical lines down the full height, so the grid stays correct if the
# two dimensions ever differ.
for i in range(JUMP, MAX_HEIGHT, JUMP):
    canvas.create_line(0, i, MAX_WIDTH, i, fill="grey")
for i in range(JUMP, MAX_WIDTH, JUMP):
    canvas.create_line(i, 0, i, MAX_HEIGHT, fill="grey")

# The canvas needs keyboard focus to receive the Control key events.
canvas.focus_set()
canvas.bind("<KeyPress-Control_L>", control_pressed)
canvas.bind("<KeyRelease-Control_L>", control_released)
canvas.bind("<Button-1>", clicked)
canvas.bind("<Button-2>", delete)
canvas.bind("<Button-3>", draw)
canvas.bind("<ButtonRelease-3>", release)
def _make_plot(parent, title, size):
    """Create an empty titled figure packed into *parent*.

    Returns the (figure, axes, figure_canvas) triple.
    """
    figure = Figure(figsize=size)
    axes = figure.add_subplot(111)
    axes.plot([], [])
    figure.suptitle(title)
    figure_canvas = FigureCanvasTkAgg(figure, parent)
    # draw() rather than show(): show() is deprecated on FigureCanvasTkAgg.
    figure_canvas.draw()
    figure_canvas.get_tk_widget().pack()
    return figure, axes, figure_canvas


# Separate window holding the three trace plots.
visual = Toplevel(root)
frame = Frame(visual)
frame.pack(fill=BOTH, expand=True)

# The trace lists inp_evts, state and outp_evts are deliberately not reset
# here: the polling thread is already running and may have filled them.
figsize = (6, 3)
f_inp, a_inp, fcanvas_inp = _make_plot(frame, "Inputs", figsize)
f_state, a_state, fcanvas_state = _make_plot(frame, "State", figsize)
f_outp, a_outp, fcanvas_outp = _make_plot(frame, "Outputs", figsize)

# Label -> y position, one table per plot; a label keeps its position once
# it has been assigned.
reverse_lookup_inp = {}
reverse_lookup_state = {}
reverse_lookup_outp = {}

# Largest state time stamp plotted so far; used as the shared x-axis limit.
glob_max_x = 0.0
def _redraw_trace(axes, figure_canvas, trace, reverse_lookup, max_x, **plot_style):
    """Replot one trace of (time, label) pairs on *axes* and refresh its canvas.

    Labels are shown on the y axis; *reverse_lookup* maps each label to its
    y position and is extended with any label not seen before.
    """
    # Snapshot first: the polling thread may append while this runs, and
    # both lists below must be built from the same entries.
    trace = list(trace)
    times = [entry[0] for entry in trace]
    levels = [reverse_lookup.setdefault(entry[1], len(reverse_lookup)) for entry in trace]
    labels = [None] * len(reverse_lookup)
    for label, level in reverse_lookup.items():
        labels[level] = label
    axes.clear()
    axes.plot(times, levels, **plot_style)
    axes.set_xbound(lower=0.0, upper=max_x)
    # Two unlabelled ticks below and above keep the labels off the border.
    axes.set_yticks(range(-2, len(labels) + 2))
    axes.set_yticklabels(["", ""] + labels + ["", ""])
    figure_canvas.draw()


def update_graphs():
    """Send a poll request, redraw the three plots and reschedule in 500 ms."""
    global glob_max_x
    latest = state[-1][0] if state else None
    # The x limit never shrinks and stays above zero, so the axis range is
    # never empty (also when the only state so far is at time 0).
    glob_max_x = max(glob_max_x, latest or 0.0, 0.0001)
    core.mv.poll()
    # Input events
    _redraw_trace(a_inp, fcanvas_inp, inp_evts, reverse_lookup_inp, glob_max_x,
                  linestyle="none", marker="o")
    # States
    _redraw_trace(a_state, fcanvas_state, state, reverse_lookup_state, glob_max_x,
                  linestyle="solid", drawstyle="steps-post")
    # Output events
    _redraw_trace(a_outp, fcanvas_outp, outp_evts, reverse_lookup_outp, glob_max_x,
                  linestyle="none", marker="o")
    root.after(500, update_graphs)


# Start the refresh cycle, then hand control to Tk.
root.after(1, update_graphs)
root.mainloop()
|