import matplotlib matplotlib.use("TkAgg") from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg from matplotlib.figure import Figure from Tkinter import * from PIL import Image, ImageTk import tkSimpleDialog import urllib import urllib2 import json import time JUMP = 50 MAX_WIDTH = 20 * JUMP MAX_HEIGHT = 20 * JUMP address = "http://127.0.0.1:8001" username = "test" root = Tk() event_entry = StringVar() canvas = Canvas(root, width=MAX_WIDTH, height=MAX_HEIGHT, bg="white") name = 0 class FakeLayer(): def __init__(self, address): self.types = {} self.sources = {} self.targets = {} self.attrs = {} def read_available_attributes(self, name): if self.types[name] == "const": return ["value"] else: return [] def read_attribute(self, name, attr): return self.attr.get(name, {}).get(attr, None) def set_attribute(self, name, attr, value): self.attrs[name][attr] = value def instantiate_block(self, name, block_type): self.types[name] = block_type def instantiate_link(self, name, link_type, source, target): self.types[name] = link_type self.sources[name] = source self.targets[name] = target self.attrs[name] = {} def simulate(self): pass def step(self): pass def pause(self): pass attribute = [] available_attrs = [] simulation = [] def poll(address): working_available_attrs = [] working_simulation = None while 1: returnvalue = json.loads(urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "get_output", "username": username}))).read()) print("Process " + str(returnvalue)) if (returnvalue.startswith("AVAILABLE_ATTR_VALUE")): working_available_attrs.append([json.loads(returnvalue.split(" ", 1)[1]), None]) elif (returnvalue.startswith("AVAILABLE_ATTR_TYPE")): working_available_attrs[-1][1] = json.loads(returnvalue.split(" ", 1)[1]) elif (returnvalue.startswith("AVAILABLE_ATTR_END")): available_attrs.append(working_available_attrs) working_available_attrs = [] elif (returnvalue.startswith("ATTR_VALUE")): v = returnvalue.split(" ", 1)[1] if v == "None": v = None else: v = json.loads(v) attribute.append(v) elif (returnvalue.startswith("SIM_TIME")): working_simulation = (json.loads(returnvalue.split(" ", 1)[1]), {}) elif (returnvalue.startswith("SIM_PROBE")): blockname, blockvalue = returnvalue.split(" ", 1)[1].rsplit(" ", 1) working_simulation[1][json.loads(blockname)] = json.loads(blockvalue) elif (returnvalue.startswith("SIM_END")): simulation.append(working_simulation) working_simulation = None elif (returnvalue.startswith("CONFORMANCE_OK")): root.configure(background="grey") elif (returnvalue.startswith("CONFORMANCE_FAIL")): root.configure(background="red") elif (returnvalue.startswith("ALGEBRAIC_LOOP")): root.configure(background="blue") else: print("Error: got unknown result: " + returnvalue) class MvLayer(): def __init__(self, address): import threading self.address = address urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % username, "username": "user_manager"}))).read() thrd = threading.Thread(target=poll, args=[address]) thrd.daemon = True thrd.start() def read_available_attributes(self, name): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"read_available_attributes"', "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % name, "username": username}))).read() while not available_attrs: time.sleep(0.1) return available_attrs.pop(0) def read_attribute(self, name, attr): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"read_attribute"', "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % name, "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % attr, "username": username}))).read() while not attribute: time.sleep(0.1) return attribute.pop(0) def set_attribute(self, name, attr, value): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"set_attribute"', "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % name, "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % attr, "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": json.dumps(value), "username": username}))).read() def instantiate_block(self, name, block_type): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"instantiate_node"', "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (block_type), "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (name), "username": username}))).read() def instantiate_link(self, name, link_type, source, target): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"instantiate_association"', "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (link_type), "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (name), "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (source), "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (target), "username": username}))).read() def simulate(self): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"simulate"', "username": username}))).read() def step(self): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"step"', "username": username}))).read() def pause(self): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"pause"', "username": username}))).read() def delete(self, block): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"delete_element"', "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (block), "username": username}))).read() def lower(value): return value / JUMP * JUMP def upper(value): return (value / JUMP + 1) * JUMP def avg(a, b): return float(a + b) / 2 class InterfaceCore(): drawn = set() refs = dict() #mv = MvLayer(address) mv = FakeLayer(address) def delete(self, x, y): lname = self.find((x, y)) self.mv.delete(lname) for i in self.refs[lname]: canvas.delete(i) del self.refs[lname] self.drawn = set([e for e in self.drawn if e[4] != lname]) def clicked(self, event): if self.find((event.x, event.y)): # Something already there, so don't add, but modify lname = self.find((event.x, event.y)) attrs = self.mv.read_available_attributes(lname) print("Managing " + str(attrs)) if not attrs: print("No attrs to manage!") for attr, t in attrs: print("Reading data from " + str(attr)) old_value = self.mv.read_attribute(lname, attr) if old_value == "None": old_value = None new_value = tkSimpleDialog.askstring("Attribute modification", attr, initialvalue=old_value) if t == "Float": new_value = float(new_value) elif t == "String": new_value = str(new_value) else: print("Got unknown type: " + str(t)) self.mv.set_attribute(lname, attr, new_value) else: global name x = event.x y = event.y self.mv.instantiate_block(str(name), "State") r = canvas.create_oval(lower(x), lower(y), upper(x), upper(y), fill="white") b = (lower(x), lower(y), upper(x), upper(y), str(name)) self.refs[str(name)] = [r] name += 1 def find(self, location): def sqrt(a): return a**0.5 x, y = location matches = [] for e in self.drawn: x1, y1, x2, y2, x0, y0 = e[0], e[1], e[2], e[3], x, y distance = abs((y2 - y1) * x0 - (x2 - x1) * y0 + x2 * y1 - y2 * x1) / sqrt((y2 - y1) ** 2 + (x2 - x1) ** 2) matches.append((distance, e[4])) if matches: minimal_distance, name = sorted(matches)[0] print("Distance to " + name + " is " + str(minimal_distance)) if minimal_distance < 10: return name return None def draw(self, start, end): source = self.find(start) target = self.find(end) print("Connect from %s to %s" % (source, target)) if source and target: global name self.mv.instantiate_link(str(name), "Transition", source, target) self.refs[str(name)] = [canvas.create_line(start[0], start[1], end[0], end[1], fill="black", arrow=LAST)] self.drawn.add([start[0], start[1], end[0], end[1], str(name)]) name += 1 core = InterfaceCore() def clicked(event): core.clicked(event) def draw(event): global start_location start_location = (event.x, event.y) def release(event): core.draw(start_location, (event.x, event.y)) def simulate(): core.mv.simulate() def pause(): core.mv.pause() def delete(event): core.delete(event.x, event.y) def add_event(): core.add_event(event_entry.get()) buttons = [ Button(root, text="START", command=simulate), Button(root, text="PAUSE", command=pause), Entry(root, textvariable=event_entry), Button(root, text="EVENT", command=add_event), ] for i, b in enumerate(buttons): b.grid(row=0, column=i) canvas.grid(row=1,column=0,columnspan=len(buttons)) core.canvas = canvas for i in range(JUMP, MAX_HEIGHT, JUMP): canvas.create_line(0, i, MAX_HEIGHT, i, fill="grey") for i in range(JUMP, MAX_WIDTH, JUMP): canvas.create_line(i, 0, i, MAX_WIDTH, fill="grey") canvas.bind("", clicked) canvas.bind("", delete) canvas.bind("", draw) canvas.bind("", release) visual = Toplevel(root) # Example: # simulation = [(1, "A"), (3, "B"), (4, "A")] simulation = [(1, "A"), (3, "B"), (4, "A")] frame = Frame(visual) frame.pack(fill=BOTH,expand=True) f = Figure() a = f.add_subplot(111) a.plot([], []) f.suptitle("Events") fcanvas = FigureCanvasTkAgg(f, frame) fcanvas.show() fcanvas.get_tk_widget().pack() reverse_lookup = {} def update_graphs(): times = [x[0] for x in simulation] events = [reverse_lookup.setdefault(x[1], len(reverse_lookup)) for x in simulation] lookup = [None] * len(reverse_lookup) for k, v in reverse_lookup.items(): lookup[v] = k a.clear() a.plot(times, events, linestyle="none", marker="o") print("Lookup: " + str(lookup)) print("Events:" + str(events)) f.get_axes()[0].set_yticks(range(len(lookup))) f.get_axes()[0].set_yticklabels(lookup) fcanvas.draw() root.after(100, update_graphs) root.after(100, update_graphs) root.mainloop()