"""Tkinter front-end for drawing block diagrams and plotting simulation output.

The main window shows a grid canvas on which blocks are created by clicking
and links are created by dragging between two blocks.  Every modelling
operation is forwarded to a "layer" object: ``MvLayer`` talks to an HTTP
server at ``address``, ``FakeLayer`` is an in-memory stand-in.  A second
window plots the probe values that the server reports during simulation.

This script targets Python 2 (``Tkinter``, ``urllib2``, ``tkSimpleDialog``).
"""

import matplotlib
matplotlib.use("TkAgg")
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure

from Tkinter import *
import tkSimpleDialog
import urllib
import urllib2
import json
import time
import threading
from contextlib import closing

# Size (in pixels) of one grid cell; blocks snap to this grid.
JUMP = 40
MAX_WIDTH = 20 * JUMP
MAX_HEIGHT = 20 * JUMP

address = "http://127.0.0.1:8001"
username = "test"

# Modes in which a click creates a block / a drag creates a link.
BLOCK_TYPES = (
    "AdditionBlock",
    "NegatorBlock",
    "ConstantBlock",
    "MultiplyBlock",
    "InverseBlock",
    "DelayBlock",
)
LINK_TYPES = ("Link", "InitialCondition")

root = Tk()
canvas = Canvas(root, width=MAX_WIDTH, height=MAX_HEIGHT, bg="white")
canvas.pack()

# Counter used to generate unique element names ("0", "1", ...).
name = 0


class FakeLayer(object):
    """In-memory stand-in for ``MvLayer`` that performs no network I/O."""

    def __init__(self, address):
        # ``address`` is accepted for signature parity with MvLayer; unused.
        self.types = {}
        self.sources = {}
        self.targets = {}
        self.attrs = {}

    def read_available_attributes(self, name):
        """Return a list of ``[attribute, type]`` pairs for element ``name``.

        The pair shape matches what ``InterfaceCore.clicked`` unpacks.
        "ConstantBlock" is accepted next to the original "const" because the
        UI only ever instantiates blocks under the former name.
        """
        if self.types.get(name) in ("const", "ConstantBlock"):
            return [["value", "Float"]]
        return []

    def read_attribute(self, name, attr):
        """Return the stored value of ``attr`` on ``name``, or None if unset."""
        return self.attrs.get(name, {}).get(attr, None)

    def set_attribute(self, name, attr, value):
        """Store ``value`` as attribute ``attr`` of element ``name``."""
        self.attrs.setdefault(name, {})[attr] = value

    def instantiate_block(self, name, block_type):
        """Register a block called ``name`` of type ``block_type``."""
        self.types[name] = block_type
        # Blocks need an attribute dict too, otherwise set_attribute fails.
        self.attrs.setdefault(name, {})

    def instantiate_link(self, name, link_type, source, target):
        """Register a link called ``name`` from ``source`` to ``target``."""
        self.types[name] = link_type
        self.sources[name] = source
        self.targets[name] = target
        self.attrs[name] = {}

    def simulate(self):
        pass

    def step(self):
        pass

    def pause(self):
        pass


# Queues filled by the polling thread and drained by the UI thread.
attribute = []        # answers to read_attribute requests
available_attrs = []  # answers to read_available_attributes requests
simulation = []       # (time, {probe name: value}) tuples
# Example shape:
# simulation = [(1, {"a": 1, "b": 2}), (2, {"a": 3}), (3, {"a": 4, "b": 6})]


def _post(url, fields):
    """POST the form-encoded ``fields`` to ``url`` and return the raw body.

    The response object is closed explicitly instead of being left to the
    garbage collector.
    """
    request = urllib2.Request(url, urllib.urlencode(fields))
    with closing(urllib2.urlopen(request)) as response:
        return response.read()


def _payload(message):
    """Return everything after the first space of ``message``."""
    return message.split(" ", 1)[1]


def poll(address):
    """Fetch server output forever and dispatch it to the module-level queues.

    Each reply is a JSON-encoded string starting with a keyword:

    * ``AVAILABLE_ATTR_VALUE`` / ``AVAILABLE_ATTR_TYPE`` / ``AVAILABLE_ATTR_END``
      build up one list of ``[name, type]`` pairs, appended to
      ``available_attrs`` on ``..._END``.
    * ``ATTR_VALUE`` appends one decoded value to ``attribute``.
    * ``SIM_TIME`` / ``SIM_PROBE`` / ``SIM_END`` build up one
      ``(time, {probe: value})`` tuple, appended to ``simulation`` on
      ``SIM_END``.
    * ``CONFORMANCE_OK`` / ``CONFORMANCE_FAIL`` recolour the root window.

    Runs in a background thread (started by ``MvLayer``) and never returns.
    """
    working_available_attrs = []
    working_simulation = None
    request_fields = {"op": "get_output", "username": username}

    while 1:
        returnvalue = json.loads(_post(address, request_fields))
        print("Process " + str(returnvalue))

        # NOTE: "AVAILABLE_ATTR_VALUE" is tested before "ATTR_VALUE" on
        # purpose; the order of the prefix checks below matters.
        if returnvalue.startswith("AVAILABLE_ATTR_VALUE"):
            working_available_attrs.append([json.loads(_payload(returnvalue)), None])
        elif returnvalue.startswith("AVAILABLE_ATTR_TYPE"):
            if working_available_attrs:
                working_available_attrs[-1][1] = json.loads(_payload(returnvalue))
            else:
                # A type without a preceding value would otherwise raise
                # IndexError and silently kill this thread.
                print("Error: got unknown result: " + returnvalue)
        elif returnvalue.startswith("AVAILABLE_ATTR_END"):
            available_attrs.append(working_available_attrs)
            working_available_attrs = []
        elif returnvalue.startswith("ATTR_VALUE"):
            v = _payload(returnvalue)
            attribute.append(None if v == "None" else json.loads(v))
        elif returnvalue.startswith("SIM_TIME"):
            working_simulation = (json.loads(_payload(returnvalue)), {})
        elif returnvalue.startswith("SIM_PROBE"):
            if working_simulation is None:
                # A probe outside SIM_TIME ... SIM_END cannot be stored.
                print("Error: got unknown result: " + returnvalue)
            else:
                blockname, blockvalue = _payload(returnvalue).rsplit(" ", 1)
                working_simulation[1][json.loads(blockname)] = json.loads(blockvalue)
        elif returnvalue.startswith("SIM_END"):
            simulation.append(working_simulation)
            working_simulation = None
        # NOTE(review): the two configure() calls below touch Tk from this
        # background thread; Tkinter is not generally thread-safe - confirm
        # this is acceptable or marshal the call through root.after().
        elif returnvalue.startswith("CONFORMANCE_OK"):
            root.configure(background="grey")
        elif returnvalue.startswith("CONFORMANCE_FAIL"):
            root.configure(background="red")
        else:
            print("Error: got unknown result: " + returnvalue)


class MvLayer(object):
    """Layer that forwards every operation to the HTTP server at ``address``."""

    def __init__(self, address):
        """Announce ``username`` to the server and start the polling thread."""
        self.address = address
        _post(address, {"op": "set_input",
                        "value": '"%s"' % username,
                        "username": "user_manager"})
        thrd = threading.Thread(target=poll, args=[address])
        thrd.daemon = True  # do not keep the process alive after the UI exits
        thrd.start()

    def _send_raw(self, value):
        """Send one already-encoded input ``value`` on behalf of ``username``."""
        _post(self.address, {"op": "set_input",
                             "value": value,
                             "username": username})

    def _send(self, *values):
        """Send each of ``values`` as a double-quoted string, in order.

        The values are wrapped in quotes verbatim (no escaping), exactly as
        the protocol was spoken before this helper existed.
        """
        for value in values:
            self._send_raw('"%s"' % value)

    def read_available_attributes(self, name):
        """Request the attributes of ``name`` and block until they arrive."""
        self._send("read_available_attributes", name)
        while not available_attrs:
            time.sleep(0.1)
        return available_attrs.pop(0)

    def read_attribute(self, name, attr):
        """Request the value of ``attr`` on ``name`` and block until it arrives."""
        self._send("read_attribute", name, attr)
        while not attribute:
            time.sleep(0.1)
        return attribute.pop(0)

    def set_attribute(self, name, attr, value):
        """Set ``attr`` of ``name`` to ``value`` (sent JSON-encoded)."""
        self._send("set_attribute", name, attr)
        self._send_raw(json.dumps(value))

    def instantiate_block(self, name, block_type):
        """Create a node called ``name`` of type ``block_type``."""
        self._send("instantiate_node", block_type, name)

    def instantiate_link(self, name, link_type, source, target):
        """Create an association ``name`` of ``link_type`` from source to target."""
        self._send("instantiate_association", link_type, name, source, target)

    def simulate(self):
        self._send("simulate")

    def step(self):
        self._send("step")

    def pause(self):
        self._send("pause")


def lower(value):
    """Return the largest multiple of JUMP that is <= ``value``."""
    return value // JUMP * JUMP


def upper(value):
    """Return the smallest multiple of JUMP that is > ``value``."""
    return (value // JUMP + 1) * JUMP


def avg(a, b):
    """Return the arithmetic mean of ``a`` and ``b`` as a float."""
    return float(a + b) / 2


class InterfaceCore(object):
    """Translates canvas gestures into layer operations and canvas items."""

    def __init__(self):
        self.mode = ""
        # Set of (x0, y0, x1, y1, name) tuples, one per drawn block.
        self.drawn = set()
        # Maps element name -> list of canvas item ids drawn for it.
        self.refs = dict()
        self.mv = MvLayer(address)
        #self.mv = FakeLayer(address)

    def set_mode(self, mode):
        """Select the element type created by the next click or drag."""
        self.mode = mode

    def clicked(self, event):
        """Edit the block under the cursor, or create a new one there."""
        lname = self.find((event.x, event.y))
        if lname:
            # Something already there, so don't add, but modify
            self._edit_attributes(lname)
        elif self.mode not in BLOCK_TYPES:
            print("Cannot create something not guaranteed to be block type!")
        else:
            self._create_block(event.x, event.y)

    def _edit_attributes(self, lname):
        """Ask the user for a new value for every attribute of ``lname``."""
        attrs = self.mv.read_available_attributes(lname)
        print("Managing " + str(attrs))
        if not attrs:
            print("No attrs to manage!")
        for attr, t in attrs:
            print("Reading data from " + str(attr))
            old_value = self.mv.read_attribute(lname, attr)
            if old_value == "None":
                old_value = None
            new_value = tkSimpleDialog.askstring("Attribute modification", attr,
                                                 initialvalue=old_value)
            if new_value is None:
                # Dialog was cancelled: leave this attribute untouched.
                continue
            if t == "Float":
                try:
                    new_value = float(new_value)
                except ValueError:
                    print("Not a valid Float: " + str(new_value))
                    continue
            else:
                print("Got unknown type: " + str(t))
            self.mv.set_attribute(lname, attr, new_value)

    def _create_block(self, x, y):
        """Instantiate a block of the current mode in the grid cell at (x, y)."""
        global name
        block_name = str(name)
        left, top, right, bottom = lower(x), lower(y), upper(x), upper(y)
        self.mv.instantiate_block(block_name, self.mode)
        r = canvas.create_rectangle(left, top, right, bottom, fill="white")
        t = canvas.create_text(avg(left, right), avg(top, bottom),
                               text=self.mode, fill="black")
        self.drawn.add((left, top, right, bottom, block_name))
        self.refs[block_name] = [r, t]
        name += 1

    def find(self, location):
        """Return the name of the block containing ``location``, else None."""
        x, y = location
        for e in self.drawn:
            if e[0] <= x <= e[2] and e[1] <= y <= e[3]:
                return e[4]
        print("Found nothing at that location!")
        return None

    def draw(self, start, end):
        """Create a link of the current mode between the blocks at start/end."""
        source = self.find(start)
        target = self.find(end)
        print("Connect from %s to %s" % (source, target))
        if not (source and target):
            return
        if self.mode not in LINK_TYPES:
            print("Cannot create something not guaranteed to be link type!")
            return
        global name
        link_name = str(name)
        self.mv.instantiate_link(link_name, self.mode, source, target)
        self.refs[link_name] = [canvas.create_line(start[0], start[1],
                                                   end[0], end[1],
                                                   fill="black", arrow=LAST)]
        name += 1


core = InterfaceCore()

# Position of the last button press that started a drag; None until then.
start_location = None


def clicked(event):
    core.clicked(event)


def draw(event):
    """Remember where a drag started."""
    global start_location
    start_location = (event.x, event.y)


def release(event):
    """Finish a drag by linking its start point to the release point."""
    if start_location is None:
        # Release without a recorded press: nothing to connect.
        return
    core.draw(start_location, (event.x, event.y))


def simulate():
    core.mv.simulate()


def step():
    core.mv.step()


def pause():
    core.mv.pause()


def addition():
    core.set_mode("AdditionBlock")


def negation():
    core.set_mode("NegatorBlock")


def link():
    core.set_mode("Link")


def multiply():
    core.set_mode("MultiplyBlock")


def constant():
    core.set_mode("ConstantBlock")


def inverse():
    core.set_mode("InverseBlock")


def ic():
    core.set_mode("InitialCondition")


def delay():
    core.set_mode("DelayBlock")


# Toolbar, in display order.
for _text, _command in (
        ("+", addition),
        ("-x", negation),
        ("*", multiply),
        ("1/x", inverse),
        ("Constant", constant),
        ("Delay", delay),
        ("Link", link),
        ("InitialCondition", ic),
        ("SIM", simulate),
        ("STEP", step),
        ("PAUSE", pause),
):
    Button(root, text=_text, command=_command).pack()

core.canvas = canvas

# Grid: horizontal lines span the full width, vertical ones the full height.
for i in range(JUMP, MAX_HEIGHT, JUMP):
    canvas.create_line(0, i, MAX_WIDTH, i, fill="grey")
for i in range(JUMP, MAX_WIDTH, JUMP):
    canvas.create_line(i, 0, i, MAX_HEIGHT, fill="grey")

# NOTE(review): the event sequences were empty strings in the source (Tk
# rejects an empty sequence).  They are restored here as left click to
# create/edit and right-button drag to link - confirm the intended buttons.
canvas.bind("<Button-1>", clicked)
canvas.bind("<Button-3>", draw)
canvas.bind("<ButtonRelease-3>", release)

# Separate window holding one matplotlib figure per probe.
visual = Toplevel(root)
probes = {}   # probe name -> (figure canvas, axes)
values = {}   # probe name -> ([times], [values])


def update_graphs():
    """Drain ``simulation`` into the per-probe plots, then reschedule itself."""
    while simulation:
        t, results = simulation.pop(0)
        for k, v in results.items():
            if k in probes:
                fcanvas, a = probes[k]
            else:
                # First sample for this probe: create its figure and widget.
                f = Figure(figsize=(5, 5), dpi=100)
                a = f.add_subplot(111)
                a.plot([], [])
                fcanvas = FigureCanvasTkAgg(f, visual)
                # draw() replaces the deprecated show() alias.
                fcanvas.draw()
                fcanvas.get_tk_widget().pack()
                probes[k] = (fcanvas, a)
                values[k] = ([], [])

            values[k][0].append(t)
            values[k][1].append(v)

            # Redraw the complete series for this probe.
            a.clear()
            a.plot(values[k][0], values[k][1], linestyle="none", marker="o")
            fcanvas.draw()
    root.after(50, update_graphs)


root.after(50, update_graphs)
root.mainloop()