import matplotlib matplotlib.use("TkAgg") from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg from matplotlib.figure import Figure from Tkinter import * from PIL import Image, ImageTk import tkSimpleDialog import urllib import urllib2 import json import time JUMP = 50 MAX_WIDTH = 20 * JUMP MAX_HEIGHT = 20 * JUMP address = "http://127.0.0.1:8001" username = "test" root = Tk() canvas = Canvas(root, width=MAX_WIDTH, height=MAX_HEIGHT, bg="white") name = 0 class FakeLayer(): def __init__(self, address): self.types = {} self.sources = {} self.targets = {} self.attrs = {} def read_available_attributes(self, name): if self.types[name] == "const": return ["value"] else: return [] def read_attribute(self, name, attr): return self.attr.get(name, {}).get(attr, None) def set_attribute(self, name, attr, value): self.attrs[name][attr] = value def instantiate_block(self, name, block_type): self.types[name] = block_type def instantiate_link(self, name, link_type, source, target): self.types[name] = link_type self.sources[name] = source self.targets[name] = target self.attrs[name] = {} def simulate(self): pass def step(self): pass def pause(self): pass attribute = [] available_attrs = [] simulation = [] def poll(address): working_available_attrs = [] working_simulation = None while 1: returnvalue = json.loads(urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "get_output", "username": username}))).read()) print("Process " + str(returnvalue)) if (returnvalue.startswith("AVAILABLE_ATTR_VALUE")): working_available_attrs.append([json.loads(returnvalue.split(" ", 1)[1]), None]) elif (returnvalue.startswith("AVAILABLE_ATTR_TYPE")): working_available_attrs[-1][1] = json.loads(returnvalue.split(" ", 1)[1]) elif (returnvalue.startswith("AVAILABLE_ATTR_END")): available_attrs.append(working_available_attrs) working_available_attrs = [] elif (returnvalue.startswith("ATTR_VALUE")): v = returnvalue.split(" ", 1)[1] if v == "None": v = None else: v = json.loads(v) attribute.append(v) elif (returnvalue.startswith("SIM_TIME")): working_simulation = (json.loads(returnvalue.split(" ", 1)[1]), {}) elif (returnvalue.startswith("SIM_PROBE")): blockname, blockvalue = returnvalue.split(" ", 1)[1].rsplit(" ", 1) working_simulation[1][json.loads(blockname)] = json.loads(blockvalue) elif (returnvalue.startswith("SIM_END")): simulation.append(working_simulation) working_simulation = None elif (returnvalue.startswith("CONFORMANCE_OK")): root.configure(background="grey") elif (returnvalue.startswith("CONFORMANCE_FAIL")): root.configure(background="red") elif (returnvalue.startswith("ALGEBRAIC_LOOP")): root.configure(background="blue") else: print("Error: got unknown result: " + returnvalue) class MvLayer(): def __init__(self, address): import threading self.address = address urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % username, "username": "user_manager"}))).read() thrd = threading.Thread(target=poll, args=[address]) thrd.daemon = True thrd.start() def read_available_attributes(self, name): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"read_available_attributes"', "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % name, "username": username}))).read() while not available_attrs: time.sleep(0.1) return available_attrs.pop(0) def read_attribute(self, name, attr): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"read_attribute"', "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % name, "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % attr, "username": username}))).read() while not attribute: time.sleep(0.1) return attribute.pop(0) def set_attribute(self, name, attr, value): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"set_attribute"', "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % name, "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % attr, "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": json.dumps(value), "username": username}))).read() def instantiate_block(self, name, block_type): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"instantiate_node"', "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (block_type), "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (name), "username": username}))).read() def instantiate_link(self, name, link_type, source, target): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"instantiate_association"', "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (link_type), "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (name), "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (source), "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (target), "username": username}))).read() def simulate(self): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"simulate"', "username": username}))).read() def step(self): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"step"', "username": username}))).read() def pause(self): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"pause"', "username": username}))).read() def delete(self, block): urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"delete_element"', "username": username}))).read() urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (block), "username": username}))).read() def lower(value): return value / JUMP * JUMP def upper(value): return (value / JUMP + 1) * JUMP def avg(a, b): return float(a + b) / 2 class InterfaceCore(): mode = "" drawn = set() refs = dict() #mv = MvLayer(address) mv = FakeLayer(address) def set_mode(self, mode): self.mode = mode def delete(self, x, y): lname = self.find((x, y)) self.mv.delete(lname) for i in self.refs[lname]: canvas.delete(i) del self.refs[lname] self.drawn = set([e for e in self.drawn if e[4] != lname]) def clicked(self, event): if self.find((event.x, event.y)): # Something already there, so don't add, but modify lname = self.find((event.x, event.y)) attrs = self.mv.read_available_attributes(lname) print("Managing " + str(attrs)) if not attrs: print("No attrs to manage!") for attr, t in attrs: print("Reading data from " + str(attr)) old_value = self.mv.read_attribute(lname, attr) if old_value == "None": old_value = None new_value = tkSimpleDialog.askstring("Attribute modification", attr, initialvalue=old_value) if t == "Float": new_value = float(new_value) elif t == "String": new_value = str(new_value) else: print("Got unknown type: " + str(t)) self.mv.set_attribute(lname, attr, new_value) elif self.mode not in ["AdditionBlock", "NegatorBlock", "ConstantBlock", "MultiplyBlock", "ConstantBlock", "InverseBlock", "DelayBlock", "IntegratorBlock", "DerivatorBlock", "ProbeBlock"]: print("Cannot create something not guaranteed to be block type!") else: global name x = event.x y = event.y self.mv.instantiate_block(str(name), self.mode) r = canvas.create_rectangle(lower(x), lower(y), upper(x), upper(y), fill="white") t = canvas.create_image(avg(lower(x), upper(x)), avg(lower(y), upper(y)), image=photos[self.mode]) b = (lower(x), lower(y), upper(x), upper(y), str(name)) self.drawn.add(b) self.refs[str(name)] = [r, t] name += 1 def find(self, location): x, y = location for e in self.drawn: if (e[0] <= x and e[1] <= y and e[2] >= x and e[3] >= y): return e[4] print("Found nothing at that location!") return [] def draw(self, start, end): source = self.find(start) target = self.find(end) print("Connect from %s to %s" % (source, target)) if source and target: if self.mode not in ["Link", "InitialCondition"]: print("Cannot create something not guaranteed to be link type!") elif source == target: print("Cannot create link to self") else: global name self.mv.instantiate_link(str(name), self.mode, source, target) self.refs[str(name)] = [canvas.create_line(start[0], start[1], end[0], end[1], fill="black" if self.mode == "Link" else "red", arrow=LAST)] name += 1 core = InterfaceCore() def clicked(event): core.clicked(event) def draw(event): global start_location start_location = (event.x, event.y) def release(event): core.draw(start_location, (event.x, event.y)) def simulate(): core.mv.simulate() def step(): core.mv.step() def pause(): core.mv.pause() def addition(): core.set_mode("AdditionBlock") def negation(): core.set_mode("NegatorBlock") def link(): core.set_mode("Link") def multiply(): core.set_mode("MultiplyBlock") def constant(): core.set_mode("ConstantBlock") def inverse(): core.set_mode("InverseBlock") def ic(): core.set_mode("InitialCondition") def delay(): core.set_mode("DelayBlock") def derivator(): core.set_mode("DerivatorBlock") def integrator(): core.set_mode("IntegratorBlock") def delete(event): core.delete(event.x, event.y) def probe(): core.set_mode("ProbeBlock") images = { "ConstantBlock": Image.open("figures/constant.png"), "AdditionBlock": Image.open("figures/summation.png"), "NegatorBlock": Image.open("figures/negation.png"), "MultiplyBlock": Image.open("figures/multiply.png"), "InverseBlock": Image.open("figures/inverse.png"), "DerivatorBlock": Image.open("figures/derivative.png"), "IntegratorBlock": Image.open("figures/integrator.png"), "DelayBlock": Image.open("figures/delay.png"), "ProbeBlock": Image.open("figures/probe.png"), } hsize = 20 for k, v in images.items(): old_vsize, old_hsize = v.size vsize = int(old_vsize * hsize / float(old_hsize)) images[k] = v.resize((vsize, hsize), Image.ANTIALIAS) photos = {k: ImageTk.PhotoImage(v) for k, v in images.items()} bound_functions = { "ConstantBlock": constant, "AdditionBlock": addition, "MultiplyBlock": multiply, "NegatorBlock": negation, "InverseBlock": inverse, "DerivatorBlock": derivator, "IntegratorBlock": integrator, "DelayBlock": delay, "ProbeBlock": probe, } buttons = [] for k, v in bound_functions.items(): buttons.append(Button(root, image=photos[k], command=v)) buttons.extend([ Button(root, text="Link", command=link, bg="blue"), Button(root, text="InitialCondition", command=ic, bg="blue"), Button(root, text="SIM", command=simulate, bg="green"), Button(root, text="STEP", command=step, bg="green"), Button(root, text="PAUSE", command=pause, bg="green"), ]) for i, b in enumerate(buttons): b.grid(row=0, column=i) canvas.grid(row=1,column=0,columnspan=len(buttons)) core.canvas = canvas for i in range(JUMP, MAX_HEIGHT, JUMP): canvas.create_line(0, i, MAX_HEIGHT, i, fill="grey") for i in range(JUMP, MAX_WIDTH, JUMP): canvas.create_line(i, 0, i, MAX_WIDTH, fill="grey") canvas.bind("", clicked) canvas.bind("", delete) canvas.bind("", draw) canvas.bind("", release) visual = Toplevel(root) probes = {} values = {} # Example: # simulation = [(1, {"a": 1, "b": 2}), (2, {"a": 3}), (3, {"a": 4, "b": 6})] # Class from StackOverflow class VerticalScrolledFrame(Frame): """A pure Tkinter scrollable frame that actually works! * Use the 'interior' attribute to place widgets inside the scrollable frame * Construct and pack/place/grid normally * This frame only allows vertical scrolling """ def __init__(self, parent, *args, **kw): Frame.__init__(self, parent, *args, **kw) # create a canvas object and a vertical scrollbar for scrolling it vscrollbar = Scrollbar(self, orient=VERTICAL) vscrollbar.pack(fill=Y, side=RIGHT, expand=FALSE) canvas = Canvas(self, bd=0, highlightthickness=0, yscrollcommand=vscrollbar.set) canvas.pack(side=LEFT, fill=BOTH, expand=TRUE) vscrollbar.config(command=canvas.yview) # reset the view canvas.xview_moveto(0) canvas.yview_moveto(0) # create a frame inside the canvas which will be scrolled with it self.interior = interior = Frame(canvas) interior_id = canvas.create_window(0, 0, window=interior, anchor=NW) # track changes to the canvas and frame width and sync them, # also updating the scrollbar def _configure_interior(event): # update the scrollbars to match the size of the inner frame size = (interior.winfo_reqwidth(), interior.winfo_reqheight()) canvas.config(scrollregion="0 0 %s %s" % size) if interior.winfo_reqwidth() != canvas.winfo_width(): # update the canvas's width to fit the inner frame canvas.config(width=interior.winfo_reqwidth()) interior.bind('', _configure_interior) def _configure_canvas(event): if interior.winfo_reqwidth() != canvas.winfo_width(): # update the inner frame's width to fill the canvas canvas.itemconfigure(interior_id, width=canvas.winfo_width()) canvas.bind('', _configure_canvas) frame = VerticalScrolledFrame(visual) frame.pack(fill=BOTH,expand=True) def update_graphs(): while simulation: t, results = simulation.pop(0) for k, v in results.items(): if k in probes: fcanvas, a = probes[k] else: f = Figure() a = f.add_subplot(111) a.plot([], []) print(k) f.suptitle(k) fcanvas = FigureCanvasTkAgg(f, frame.interior) fcanvas.show() fcanvas.get_tk_widget().pack() probes[k] = (fcanvas, a) values[k] = ([], []) values[k][0].append(t) values[k][1].append(v) a.clear() a.plot(values[k][0], values[k][1], linestyle="none", marker="o") fcanvas.draw() root.after(100, update_graphs) root.after(100, update_graphs) root.mainloop()