123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467 |
- import matplotlib
- matplotlib.use("TkAgg")
- from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
- from matplotlib.figure import Figure
- from Tkinter import *
- from PIL import Image, ImageTk
- import tkSimpleDialog
- import urllib
- import urllib2
- import json
- import time
- JUMP = 50
- MAX_WIDTH = 20 * JUMP
- MAX_HEIGHT = 20 * JUMP
- address = "http://127.0.0.1:8001"
- taskname = "test"
- root = Tk()
- canvas = Canvas(root, width=MAX_WIDTH, height=MAX_HEIGHT, bg="white")
- name = 0
- class FakeLayer():
- def __init__(self, address):
- self.types = {}
- self.sources = {}
- self.targets = {}
- self.attrs = {}
- def read_available_attributes(self, name):
- if self.types[name] == "const":
- return ["value"]
- else:
- return []
- def read_attribute(self, name, attr):
- return self.attr.get(name, {}).get(attr, None)
- def set_attribute(self, name, attr, value):
- self.attrs[name][attr] = value
- def instantiate_block(self, name, block_type):
- self.types[name] = block_type
- def instantiate_link(self, name, link_type, source, target):
- self.types[name] = link_type
- self.sources[name] = source
- self.targets[name] = target
- self.attrs[name] = {}
- def simulate(self):
- pass
- def step(self):
- pass
- def pause(self):
- pass
- attribute = []
- available_attrs = []
- simulation = []
- def poll(address):
- working_available_attrs = []
- working_simulation = None
- while 1:
- returnvalue = json.loads(urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "get_output", "taskname": taskname}))).read())
- print("Process " + str(returnvalue))
- if (returnvalue.startswith("AVAILABLE_ATTR_VALUE")):
- working_available_attrs.append([json.loads(returnvalue.split(" ", 1)[1]), None])
- elif (returnvalue.startswith("AVAILABLE_ATTR_TYPE")):
- working_available_attrs[-1][1] = json.loads(returnvalue.split(" ", 1)[1])
- elif (returnvalue.startswith("AVAILABLE_ATTR_END")):
- available_attrs.append(working_available_attrs)
- working_available_attrs = []
- elif (returnvalue.startswith("ATTR_VALUE")):
- v = returnvalue.split(" ", 1)[1]
- if v == "None":
- v = None
- else:
- v = json.loads(v)
- attribute.append(v)
- elif (returnvalue.startswith("SIM_TIME")):
- working_simulation = (json.loads(returnvalue.split(" ", 1)[1]), {})
- elif (returnvalue.startswith("SIM_PROBE")):
- blockname, blockvalue = returnvalue.split(" ", 1)[1].rsplit(" ", 1)
- working_simulation[1][json.loads(blockname)] = json.loads(blockvalue)
- elif (returnvalue.startswith("SIM_END")):
- simulation.append(working_simulation)
- working_simulation = None
- elif (returnvalue.startswith("CONFORMANCE_OK")):
- root.configure(background="grey")
- elif (returnvalue.startswith("CONFORMANCE_FAIL")):
- root.configure(background="red")
- elif (returnvalue.startswith("ALGEBRAIC_LOOP")):
- root.configure(background="blue")
- else:
- print("Error: got unknown result: " + returnvalue)
- class MvLayer():
- def __init__(self, address):
- import threading
- self.address = address
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % taskname, "taskname": "task_manager"}))).read()
- thrd = threading.Thread(target=poll, args=[address])
- thrd.daemon = True
- thrd.start()
- def read_available_attributes(self, name):
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"read_available_attributes"', "taskname": taskname}))).read()
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % name, "taskname": taskname}))).read()
- while not available_attrs:
- time.sleep(0.1)
- return available_attrs.pop(0)
- def read_attribute(self, name, attr):
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"read_attribute"', "taskname": taskname}))).read()
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % name, "taskname": taskname}))).read()
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % attr, "taskname": taskname}))).read()
- while not attribute:
- time.sleep(0.1)
- return attribute.pop(0)
- def set_attribute(self, name, attr, value):
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"set_attribute"', "taskname": taskname}))).read()
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % name, "taskname": taskname}))).read()
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % attr, "taskname": taskname}))).read()
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": json.dumps(value), "taskname": taskname}))).read()
- def instantiate_block(self, name, block_type):
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"instantiate_node"', "taskname": taskname}))).read()
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (block_type), "taskname": taskname}))).read()
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (name), "taskname": taskname}))).read()
- def instantiate_link(self, name, link_type, source, target):
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"instantiate_association"', "taskname": taskname}))).read()
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (link_type), "taskname": taskname}))).read()
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (name), "taskname": taskname}))).read()
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (source), "taskname": taskname}))).read()
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (target), "taskname": taskname}))).read()
- def simulate(self):
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"simulate"', "taskname": taskname}))).read()
- def step(self):
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"step"', "taskname": taskname}))).read()
- def pause(self):
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"pause"', "taskname": taskname}))).read()
- def delete(self, block):
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"delete_element"', "taskname": taskname}))).read()
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (block), "taskname": taskname}))).read()
- def lower(value):
- return value / JUMP * JUMP
- def upper(value):
- return (value / JUMP + 1) * JUMP
- def avg(a, b):
- return float(a + b) / 2
- class InterfaceCore():
- mode = ""
- drawn = set()
- refs = dict()
- #mv = MvLayer(address)
- mv = FakeLayer(address)
- def set_mode(self, mode):
- self.mode = mode
- def delete(self, x, y):
- lname = self.find((x, y))
- self.mv.delete(lname)
- for i in self.refs[lname]:
- canvas.delete(i)
- del self.refs[lname]
- self.drawn = set([e for e in self.drawn if e[4] != lname])
- def clicked(self, event):
- if self.find((event.x, event.y)):
- # Something already there, so don't add, but modify
- lname = self.find((event.x, event.y))
- attrs = self.mv.read_available_attributes(lname)
- print("Managing " + str(attrs))
- if not attrs:
- print("No attrs to manage!")
- for attr, t in attrs:
- print("Reading data from " + str(attr))
- old_value = self.mv.read_attribute(lname, attr)
- if old_value == "None":
- old_value = None
- new_value = tkSimpleDialog.askstring("Attribute modification", attr, initialvalue=old_value)
- if t == "Float":
- new_value = float(new_value)
- elif t == "String":
- new_value = str(new_value)
- else:
- print("Got unknown type: " + str(t))
- self.mv.set_attribute(lname, attr, new_value)
- elif self.mode not in ["AdditionBlock", "NegatorBlock", "ConstantBlock", "MultiplyBlock", "ConstantBlock", "InverseBlock", "DelayBlock", "IntegratorBlock", "DerivatorBlock", "ProbeBlock"]:
- print("Cannot create something not guaranteed to be block type!")
- else:
- global name
- x = event.x
- y = event.y
- self.mv.instantiate_block(str(name), self.mode)
- r = canvas.create_rectangle(lower(x), lower(y), upper(x), upper(y), fill="white")
- t = canvas.create_image(avg(lower(x), upper(x)), avg(lower(y), upper(y)), image=photos[self.mode])
- b = (lower(x), lower(y), upper(x), upper(y), str(name))
- self.drawn.add(b)
- self.refs[str(name)] = [r, t]
- name += 1
- def find(self, location):
- x, y = location
- for e in self.drawn:
- if (e[0] <= x and
- e[1] <= y and
- e[2] >= x and
- e[3] >= y):
- return e[4]
- print("Found nothing at that location!")
- return []
- def draw(self, start, end):
- source = self.find(start)
- target = self.find(end)
- print("Connect from %s to %s" % (source, target))
- if source and target:
- if self.mode not in ["Link", "InitialCondition"]:
- print("Cannot create something not guaranteed to be link type!")
- elif source == target:
- print("Cannot create link to self")
- else:
- global name
- self.mv.instantiate_link(str(name), self.mode, source, target)
- self.refs[str(name)] = [canvas.create_line(start[0], start[1], end[0], end[1], fill="black" if self.mode == "Link" else "red", arrow=LAST)]
- name += 1
- core = InterfaceCore()
- def clicked(event):
- core.clicked(event)
- def draw(event):
- global start_location
- start_location = (event.x, event.y)
- def release(event):
- core.draw(start_location, (event.x, event.y))
- def simulate():
- core.mv.simulate()
- def step():
- core.mv.step()
- def pause():
- core.mv.pause()
- def addition():
- core.set_mode("AdditionBlock")
- def negation():
- core.set_mode("NegatorBlock")
- def link():
- core.set_mode("Link")
- def multiply():
- core.set_mode("MultiplyBlock")
- def constant():
- core.set_mode("ConstantBlock")
- def inverse():
- core.set_mode("InverseBlock")
- def ic():
- core.set_mode("InitialCondition")
- def delay():
- core.set_mode("DelayBlock")
- def derivator():
- core.set_mode("DerivatorBlock")
- def integrator():
- core.set_mode("IntegratorBlock")
- def delete(event):
- core.delete(event.x, event.y)
- def probe():
- core.set_mode("ProbeBlock")
- images = {
- "ConstantBlock": Image.open("figures/constant.png"),
- "AdditionBlock": Image.open("figures/summation.png"),
- "NegatorBlock": Image.open("figures/negation.png"),
- "MultiplyBlock": Image.open("figures/multiply.png"),
- "InverseBlock": Image.open("figures/inverse.png"),
- "DerivatorBlock": Image.open("figures/derivative.png"),
- "IntegratorBlock": Image.open("figures/integrator.png"),
- "DelayBlock": Image.open("figures/delay.png"),
- "ProbeBlock": Image.open("figures/probe.png"),
- }
- hsize = 20
- for k, v in images.items():
- old_vsize, old_hsize = v.size
- vsize = int(old_vsize * hsize / float(old_hsize))
- images[k] = v.resize((vsize, hsize), Image.ANTIALIAS)
- photos = {k: ImageTk.PhotoImage(v) for k, v in images.items()}
- bound_functions = {
- "ConstantBlock": constant,
- "AdditionBlock": addition,
- "MultiplyBlock": multiply,
- "NegatorBlock": negation,
- "InverseBlock": inverse,
- "DerivatorBlock": derivator,
- "IntegratorBlock": integrator,
- "DelayBlock": delay,
- "ProbeBlock": probe,
- }
- buttons = []
- for k, v in bound_functions.items():
- buttons.append(Button(root, image=photos[k], command=v))
-
- buttons.extend([
- Button(root, text="Link", command=link, bg="blue"),
- Button(root, text="InitialCondition", command=ic, bg="blue"),
- Button(root, text="SIM", command=simulate, bg="green"),
- Button(root, text="STEP", command=step, bg="green"),
- Button(root, text="PAUSE", command=pause, bg="green"),
- ])
- for i, b in enumerate(buttons):
- b.grid(row=0, column=i)
- canvas.grid(row=1,column=0,columnspan=len(buttons))
- core.canvas = canvas
- for i in range(JUMP, MAX_HEIGHT, JUMP):
- canvas.create_line(0, i, MAX_HEIGHT, i, fill="grey")
- for i in range(JUMP, MAX_WIDTH, JUMP):
- canvas.create_line(i, 0, i, MAX_WIDTH, fill="grey")
- canvas.bind("<Button-1>", clicked)
- canvas.bind("<Button-2>", delete)
- canvas.bind("<Button-3>", draw)
- canvas.bind("<ButtonRelease-3>", release)
- visual = Toplevel(root)
- probes = {}
- values = {}
- # Example:
- # simulation = [(1, {"a": 1, "b": 2}), (2, {"a": 3}), (3, {"a": 4, "b": 6})]
- # Class from StackOverflow
- class VerticalScrolledFrame(Frame):
- """A pure Tkinter scrollable frame that actually works!
- * Use the 'interior' attribute to place widgets inside the scrollable frame
- * Construct and pack/place/grid normally
- * This frame only allows vertical scrolling
- """
- def __init__(self, parent, *args, **kw):
- Frame.__init__(self, parent, *args, **kw)
- # create a canvas object and a vertical scrollbar for scrolling it
- vscrollbar = Scrollbar(self, orient=VERTICAL)
- vscrollbar.pack(fill=Y, side=RIGHT, expand=FALSE)
- canvas = Canvas(self, bd=0, highlightthickness=0,
- yscrollcommand=vscrollbar.set)
- canvas.pack(side=LEFT, fill=BOTH, expand=TRUE)
- vscrollbar.config(command=canvas.yview)
- # reset the view
- canvas.xview_moveto(0)
- canvas.yview_moveto(0)
- # create a frame inside the canvas which will be scrolled with it
- self.interior = interior = Frame(canvas)
- interior_id = canvas.create_window(0, 0, window=interior,
- anchor=NW)
- # track changes to the canvas and frame width and sync them,
- # also updating the scrollbar
- def _configure_interior(event):
- # update the scrollbars to match the size of the inner frame
- size = (interior.winfo_reqwidth(), interior.winfo_reqheight())
- canvas.config(scrollregion="0 0 %s %s" % size)
- if interior.winfo_reqwidth() != canvas.winfo_width():
- # update the canvas's width to fit the inner frame
- canvas.config(width=interior.winfo_reqwidth())
- interior.bind('<Configure>', _configure_interior)
- def _configure_canvas(event):
- if interior.winfo_reqwidth() != canvas.winfo_width():
- # update the inner frame's width to fill the canvas
- canvas.itemconfigure(interior_id, width=canvas.winfo_width())
- canvas.bind('<Configure>', _configure_canvas)
- frame = VerticalScrolledFrame(visual)
- frame.pack(fill=BOTH,expand=True)
- def update_graphs():
- while simulation:
- t, results = simulation.pop(0)
- for k, v in results.items():
- if k in probes:
- fcanvas, a = probes[k]
- else:
- f = Figure()
- a = f.add_subplot(111)
- a.plot([], [])
- print(k)
- f.suptitle(k)
- fcanvas = FigureCanvasTkAgg(f, frame.interior)
- fcanvas.show()
- fcanvas.get_tk_widget().pack()
- probes[k] = (fcanvas, a)
- values[k] = ([], [])
- values[k][0].append(t)
- values[k][1].append(v)
- a.clear()
- a.plot(values[k][0], values[k][1], linestyle="none", marker="o")
- fcanvas.draw()
- root.after(100, update_graphs)
- root.after(100, update_graphs)
- root.mainloop()
|