import matplotlib matplotlib.use("TkAgg") from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg from matplotlib.figure import Figure from matplotlib import rcParams rcParams.update({'figure.autolayout': True}) from Tkinter import * from PIL import Image, ImageTk import tkSimpleDialog import urllib import urllib2 import json import time JUMP = 50 MAX_WIDTH = 10 * JUMP MAX_HEIGHT = 10 * JUMP CLICK_TOLERANCE = 5 address = "http://127.0.0.1:8001" username = "test" root = Tk() names = {} event_entry = StringVar() setting_initial = False request_new_state = False canvas = Canvas(root, width=MAX_WIDTH, height=MAX_HEIGHT, bg="white") name = 0 class FakeLayer(): def __init__(self, address): self.types = {} self.sources = {} self.targets = {} self.attrs = {} def send_event(self, event): pass def read_available_attributes(self, name): return [("name", "String")] def read_attribute(self, name, attr): return self.attrs.get(name, {}).get(attr, None) def set_attribute(self, name, attr, value): self.attrs.setdefault(name, {})[attr] = value def instantiate_element(self, name, block_type): self.types[name] = block_type def instantiate_link(self, name, link_type, source, target): self.types[name] = link_type self.sources[name] = source self.targets[name] = target self.attrs[name] = {} def simulate(self): pass def pause(self): pass def delete(self): pass def switch_initial(self, name): pass def auto_sanitize(self, auto): pass def set_current(self, name): pass attribute = [] available_attrs = [] inp_evts = [] state = [] outp_evts = [] #inp_evts = [(2, "arm"), (6, "detected")] #state = [(0, "idle"), (2, "armed"), (6, "detected"), (8, "idle")] #outp_evts = [(6, "soundAlarm"), ] def poll(address): simulation_time = None working_available_attrs = [] while 1: returnvalue = json.loads(urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "get_output", "username": username}))).read()) print("Process " + str(returnvalue)) if (returnvalue.startswith("AVAILABLE_ATTR_VALUE")): working_available_attrs.append([json.loads(returnvalue.split(" ", 1)[1]), None]) elif (returnvalue.startswith("AVAILABLE_ATTR_TYPE")): working_available_attrs[-1][1] = json.loads(returnvalue.split(" ", 1)[1]) elif (returnvalue.startswith("AVAILABLE_ATTR_END")): available_attrs.append(working_available_attrs) working_available_attrs = [] elif (returnvalue.startswith("ATTR_VALUE")): v = returnvalue.split(" ", 1)[1] if v == "None": v = None else: v = json.loads(v) attribute.append(v) elif (returnvalue.startswith("SIM_TIME")): simulation_time = json.loads(returnvalue.split(" ", 1)[1]) elif (returnvalue.startswith("SIM_STATE")): state.append((simulation_time, returnvalue.split(" ", 1)[1])) elif (returnvalue.startswith("SIM_EVENT")): inp_evts.append((simulation_time, returnvalue.split(" ", 1)[1])) elif (returnvalue.startswith("SIM_RAISE")): outp_evts.append((simulation_time, returnvalue.split(" ", 1)[1])) elif (returnvalue.startswith("CONFORMANCE_OK")): root.configure(background="grey") elif (returnvalue.startswith("CONFORMANCE_FAIL")): root.configure(background="red") elif (returnvalue.startswith("REQUEST_CURRENT_STATE"): root.configure(background="blue") global request_new_state request_new_state = True else: print("Error: got unknown result: " + returnvalue) class MvLayer(): def __init__(self, address): import threading self.address = address urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % username, "username": "user_manager"}))).read() thrd = threading.Thread(target=poll, args=[address]) thrd.daemon = True thrd.start() def read_available_attributes(self, name): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"read_available_attributes"', "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % name, "username": username}))).read() while not available_attrs: time.sleep(0.1) return available_attrs.pop(0) def read_attribute(self, name, attr): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"read_attribute"', "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % name, "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % attr, "username": username}))).read() while not attribute: time.sleep(0.1) return attribute.pop(0) def set_attribute(self, name, attr, value): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"set_attribute"', "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % name, "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % attr, "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": json.dumps(value), "username": username}))).read() def instantiate_element(self, name, block_type): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"instantiate_node"', "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (block_type), "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (name), "username": username}))).read() def instantiate_link(self, name, link_type, source, target): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"instantiate_association"', "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (link_type), "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (name), "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (source), "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (target), "username": username}))).read() def simulate(self): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"simulate"', "username": username}))).read() def pause(self): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"pause"', "username": username}))).read() def send_event(self, event): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"event"', "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (event), "username": username}))).read() def delete(self, block): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"delete_element"', "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (block), "username": username}))).read() def switch_initial(self, name): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"switch_initial"', "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (name), "username": username}))).read() def auto_sanitize(self, auto): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"auto_sanitize"', "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": "true" if auto else "false", "username": username}))).read() def set_current(self, name): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (name), "username": username}))).read() def lower(value): return value / JUMP * JUMP def upper(value): return (value / JUMP + 1) * JUMP def avg(a, b): return float(a + b) / 2 class InterfaceCore(): drawn = set() refs = dict() #mv = MvLayer(address) mv = FakeLayer(address) def delete(self, x, y): lname = self.find((x, y)) self.mv.delete(lname) for i in self.refs[lname]: canvas.delete(i) del self.refs[lname] if lname in names: self.canvas.delete(names[lname]) del names[lname] self.drawn = set([e for e in self.drawn if e[4] != lname]) def clicked(self, event): if self.find((event.x, event.y)): # Something already there, so don't add, but modify lname = self.find((event.x, event.y)) if request_new_state: global request_new_state request_new_state = True self.mv.set_current(lname) else: attrs = self.mv.read_available_attributes(lname) for attr, t in attrs: old_value = self.mv.read_attribute(lname, attr) if old_value == "None": old_value = None new_value = tkSimpleDialog.askstring("Attribute modification", attr, initialvalue=old_value) canvas.focus_set() if t == "Float": new_value = float(new_value) elif t == "String": new_value = str(new_value) elif t == "Natural": new_value = int(new_value) else: print("Got unknown type: " + str(t)) self.mv.set_attribute(lname, attr, new_value) if attr == "name": if lname in names: self.canvas.delete(names[lname]) del names[lname] entry = [x for x in self.drawn if str(x[4]) == str(lname)][0] xc, xy = avg(entry[0], entry[2]), avg(entry[1], entry[3]) names[lname] = self.canvas.create_text(xc, xy, text=new_value) else: global name x = event.x y = event.y self.mv.instantiate_element(str(name), "State") r = canvas.create_oval(lower(x), lower(y), upper(x), upper(y), fill="white") b = (lower(x), lower(y), upper(x), upper(y), str(name), "NODE") self.drawn.add(b) self.refs[str(name)] = [r] name += 1 def switch_initial(self, evt): if self.find((evt.x, evt.y)): lname = self.find((evt.x, evt.y)) self.mv.switch_initial(lname) # Update visual representation for f in self.refs.values(): self.canvas.itemconfigure(f, width=1) self.canvas.itemconfigure(self.refs[lname], width=4) def find(self, location): def sqrt(a): return a**0.5 x, y = location matches = [] for e in self.drawn: x1, y1, x2, y2, x0, y0 = e[0], e[1], e[2], e[3], x, y if e[5] == "LINK": distance = abs((y2 - y1) * x0 - (x2 - x1) * y0 + x2 * y1 - y2 * x1) / sqrt((y2 - y1) ** 2 + (x2 - x1) ** 2) - CLICK_TOLERANCE elif e[5] == "NODE": if x0 <= x2 and x0 > x1 and y0 <= y2 and y0 > y1: distance = 0.0 else: distance = float("inf") matches.append((distance, e[4])) if matches: minimal_distance, name = sorted(matches)[0] if minimal_distance <= 0: return name return None def draw(self, start, end): source = self.find(start) target = self.find(end) if source and target: global name self.mv.instantiate_link(str(name), "Transition", source, target) self.refs[str(name)] = [canvas.create_line(start[0], start[1], end[0], end[1], fill="black", arrow=LAST)] self.drawn.add((start[0], start[1], end[0], end[1], str(name), "LINK")) name += 1 def add_event(self, event): self.mv.send_event(event) core = InterfaceCore() def clicked(event): if setting_initial: core.switch_initial(event) else: core.clicked(event) def draw(event): global start_location start_location = (event.x, event.y) def release(event): core.draw(start_location, (event.x, event.y)) def simulate(): core.mv.simulate() def pause(): core.mv.pause() def delete(event): core.delete(event.x, event.y) def add_event(): core.add_event(event_entry.get()) def control_released(evt): global setting_initial setting_initial = False def control_pressed(evt): global setting_initial setting_initial = True def auto_sanitize(): core.mv.auto_sanitize(True) def manual_sanitize(): core.mv.auto_sanitize(False) buttons = [ Button(root, text="START", command=simulate), Button(root, text="PAUSE", command=pause), Button(root, text="AUTO", command=auto_sanitize), Button(root, text="MANUAL", command=manual_sanitize), Entry(root, textvariable=event_entry), Button(root, text="EVENT", command=add_event), ] for i, b in enumerate(buttons): b.grid(row=0, column=i) canvas.grid(row=1,column=0,columnspan=len(buttons)) core.canvas = canvas for i in range(JUMP, MAX_HEIGHT, JUMP): canvas.create_line(0, i, MAX_HEIGHT, i, fill="grey") for i in range(JUMP, MAX_WIDTH, JUMP): canvas.create_line(i, 0, i, MAX_WIDTH, fill="grey") canvas.focus_set() canvas.bind("", control_pressed) canvas.bind("", control_released) canvas.bind("", clicked) canvas.bind("", delete) canvas.bind("", draw) canvas.bind("", release) visual = Toplevel(root) # Example: # simulation = [(1, "A"), (3, "B"), (4, "A")] #inp_evts = [(2, "arm"), (6, "detected")] #state = [(0, "idle"), (2, "armed"), (6, "detected"), (8, "idle")] #outp_evts = [(6, "soundAlarm"), ] inp_evts = [] state = [] outp_evts = [] frame = Frame(visual) frame.pack(fill=BOTH,expand=True) figsize = (6, 3) f_inp = Figure(figsize=figsize) a_inp = f_inp.add_subplot(111) a_inp.plot([], []) f_inp.suptitle("Inputs") fcanvas_inp = FigureCanvasTkAgg(f_inp, frame) fcanvas_inp.show() fcanvas_inp.get_tk_widget().pack() f_state = Figure(figsize=figsize) a_state = f_state.add_subplot(111) a_state.plot([], []) f_state.suptitle("State") fcanvas_state = FigureCanvasTkAgg(f_state, frame) fcanvas_state.show() fcanvas_state.get_tk_widget().pack() f_outp = Figure(figsize=figsize) a_outp = f_outp.add_subplot(111) a_outp.plot([], []) f_outp.suptitle("Outputs") fcanvas_outp = FigureCanvasTkAgg(f_outp, frame) fcanvas_outp.show() fcanvas_outp.get_tk_widget().pack() reverse_lookup_inp = {} reverse_lookup_state = {} reverse_lookup_outp = {} def update_graphs(): # Input events times = [x[0] for x in inp_evts] events = [reverse_lookup_inp.setdefault(x[1], len(reverse_lookup_inp)) for x in inp_evts] lookup = [None] * len(reverse_lookup_inp) for k, v in reverse_lookup_inp.items(): lookup[v] = k a_inp.clear() a_inp.plot(times, events, linestyle="none", marker="o") #f_inp.get_axes()[0].set_xbound(lower=0.0, upper=10.0) f_inp.get_axes()[0].set_yticks(range(-2, len(lookup) + 2)) f_inp.get_axes()[0].set_yticklabels(["", ""] + lookup + ["", ""]) fcanvas_inp.draw() # States times = [x[0] for x in state] events = [reverse_lookup_state.setdefault(x[1], len(reverse_lookup_state)) for x in state] lookup = [None] * len(reverse_lookup_state) for k, v in reverse_lookup_state.items(): lookup[v] = k a_state.clear() a_state.plot(times, events, linestyle="solid", marker="o", drawstyle="steps-post") #f_state.get_axes()[0].set_xbound(lower=0.0, upper=10.0) f_state.get_axes()[0].set_yticks(range(-2, len(lookup) + 2)) f_state.get_axes()[0].set_yticklabels(["", ""] + lookup + ["", ""]) fcanvas_state.draw() # Input events times = [x[0] for x in outp_evts] events = [reverse_lookup_outp.setdefault(x[1], len(reverse_lookup_outp)) for x in outp_evts] lookup = [None] * len(reverse_lookup_outp) for k, v in reverse_lookup_outp.items(): lookup[v] = k a_outp.clear() a_outp.plot(times, events, linestyle="none", marker="o") #f_outp.get_axes()[0].set_xbound(lower=0.0, upper=10.0) f_outp.get_axes()[0].set_yticks(range(-2, len(lookup) + 2)) f_outp.get_axes()[0].set_yticklabels(["", ""] + lookup + ["", ""]) fcanvas_outp.draw() root.after(500, update_graphs) root.after(500, update_graphs) root.mainloop()