Browse Source

Basics of the FSA environment (just plain copy/paste from CBD)

Yentl Van Tendeloo 8 years ago
parent
commit
5bdbf138c8
1 changed files with 465 additions and 0 deletions
  1. 465 0
      interface/FSA/main.py

+ 465 - 0
interface/FSA/main.py

@@ -0,0 +1,465 @@
+import matplotlib
+matplotlib.use("TkAgg")
+from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
+from matplotlib.figure import Figure
+
+from Tkinter import *
+from PIL import Image, ImageTk
+import tkSimpleDialog
+
+import urllib
+import urllib2
+import json
+
+import time
+
+JUMP = 50
+MAX_WIDTH = 20 * JUMP
+MAX_HEIGHT = 20 * JUMP
+address = "http://127.0.0.1:8001"
+username = "test"
+
+root = Tk()
+
+canvas = Canvas(root, width=MAX_WIDTH, height=MAX_HEIGHT, bg="white")
+
+name = 0
+
+class FakeLayer():
+    def __init__(self, address):
+        self.types = {}
+        self.sources = {}
+        self.targets = {}
+        self.attrs = {}
+
+    def read_available_attributes(self, name):
+        if self.types[name] == "const":
+            return ["value"]
+        else:
+            return []
+
+    def read_attribute(self, name, attr):
+        return self.attr.get(name, {}).get(attr, None)
+
+    def set_attribute(self, name, attr, value):
+        self.attrs[name][attr] = value
+
+    def instantiate_block(self, name, block_type):
+        self.types[name] = block_type
+
+    def instantiate_link(self, name, link_type, source, target):
+        self.types[name] = link_type
+        self.sources[name] = source
+        self.targets[name] = target
+        self.attrs[name] = {}
+
+    def simulate(self):
+        pass
+
+    def step(self):
+        pass
+
+    def pause(self):
+        pass
+
+attribute = []
+available_attrs = []
+simulation = []
+
+def poll(address):
+    working_available_attrs = []
+    working_simulation = None
+
+    while 1:
+        returnvalue = json.loads(urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "get_output", "username": username}))).read())
+        print("Process " + str(returnvalue))
+        if (returnvalue.startswith("AVAILABLE_ATTR_VALUE")):
+            working_available_attrs.append([json.loads(returnvalue.split(" ", 1)[1]), None])
+        elif (returnvalue.startswith("AVAILABLE_ATTR_TYPE")):
+            working_available_attrs[-1][1] = json.loads(returnvalue.split(" ", 1)[1])
+        elif (returnvalue.startswith("AVAILABLE_ATTR_END")):
+            available_attrs.append(working_available_attrs)
+            working_available_attrs = []
+        elif (returnvalue.startswith("ATTR_VALUE")):
+            v = returnvalue.split(" ", 1)[1]
+            if v == "None":
+                v = None
+            else:
+                v = json.loads(v)
+            attribute.append(v)
+        elif (returnvalue.startswith("SIM_TIME")):
+            working_simulation = (json.loads(returnvalue.split(" ", 1)[1]), {})
+        elif (returnvalue.startswith("SIM_PROBE")):
+            blockname, blockvalue = returnvalue.split(" ", 1)[1].rsplit(" ", 1)
+            working_simulation[1][json.loads(blockname)] = json.loads(blockvalue)
+        elif (returnvalue.startswith("SIM_END")):
+            simulation.append(working_simulation)
+            working_simulation = None
+        elif (returnvalue.startswith("CONFORMANCE_OK")):
+            root.configure(background="grey")
+        elif (returnvalue.startswith("CONFORMANCE_FAIL")):
+            root.configure(background="red")
+        elif (returnvalue.startswith("ALGEBRAIC_LOOP")):
+            root.configure(background="blue")
+        else:
+            print("Error: got unknown result: " + returnvalue)
+
+class MvLayer():
+    def __init__(self, address):
+        import threading
+
+        self.address = address
+        urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % username, "username": "user_manager"}))).read()
+        thrd = threading.Thread(target=poll, args=[address])
+        thrd.daemon = True
+        thrd.start()
+
+    def read_available_attributes(self, name):
+        urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"read_available_attributes"', "username": username}))).read()
+        urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % name, "username": username}))).read()
+
+        while not available_attrs:
+            time.sleep(0.1)
+        return available_attrs.pop(0)
+
+    def read_attribute(self, name, attr):
+        urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"read_attribute"', "username": username}))).read()
+        urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % name, "username": username}))).read()
+        urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % attr, "username": username}))).read()
+
+        while not attribute:
+            time.sleep(0.1)
+        return attribute.pop(0)
+
+    def set_attribute(self, name, attr, value):
+        urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"set_attribute"', "username": username}))).read()
+        urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % name, "username": username}))).read()
+        urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % attr, "username": username}))).read()
+        urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": json.dumps(value), "username": username}))).read()
+
+    def instantiate_block(self, name, block_type):
+        urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"instantiate_node"', "username": username}))).read()
+        urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (block_type), "username": username}))).read()
+        urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (name), "username": username}))).read()
+
+    def instantiate_link(self, name, link_type, source, target):
+        urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"instantiate_association"', "username": username}))).read()
+        urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (link_type), "username": username}))).read()
+        urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (name), "username": username}))).read()
+        urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (source), "username": username}))).read()
+        urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (target), "username": username}))).read()
+
+    def simulate(self):
+        urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"simulate"', "username": username}))).read()
+
+    def step(self):
+        urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"step"', "username": username}))).read()
+
+    def pause(self):
+        urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"pause"', "username": username}))).read()
+
+    def delete(self, block):
+        urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"delete_element"', "username": username}))).read()
+        urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": '"%s"' % (block), "username": username}))).read()
+
+def lower(value):
+    return value / JUMP * JUMP
+
+def upper(value):
+    return (value / JUMP + 1) * JUMP
+
+def avg(a, b):
+    return float(a + b) / 2
+
+class InterfaceCore():
+    mode = ""
+    drawn = set()
+    refs = dict()
+
+    #mv = MvLayer(address)
+    mv = FakeLayer(address)
+
+    def set_mode(self, mode):
+        self.mode = mode
+
+    def delete(self, x, y):
+        lname = self.find((x, y))
+        self.mv.delete(lname)
+        for i in self.refs[lname]:
+            canvas.delete(i)
+        del self.refs[lname]
+        self.drawn = set([e for e in self.drawn if e[4] != lname])
+
+    def clicked(self, event):
+        if self.find((event.x, event.y)):
+            # Something already there, so don't add, but modify
+            lname = self.find((event.x, event.y))
+
+            attrs = self.mv.read_available_attributes(lname)
+            print("Managing " + str(attrs))
+
+            if not attrs:
+                print("No attrs to manage!")
+
+            for attr, t in attrs:
+                print("Reading data from " + str(attr))
+                old_value = self.mv.read_attribute(lname, attr)
+                if old_value == "None":
+                    old_value = None
+
+                new_value = tkSimpleDialog.askstring("Attribute modification", attr, initialvalue=old_value)
+                if t == "Float":
+                    new_value = float(new_value)
+                elif t == "String":
+                    new_value = str(new_value)
+                else:
+                    print("Got unknown type: " + str(t))
+                self.mv.set_attribute(lname, attr, new_value)
+
+        elif self.mode not in ["AdditionBlock", "NegatorBlock", "ConstantBlock", "MultiplyBlock", "ConstantBlock", "InverseBlock", "DelayBlock", "IntegratorBlock", "DerivatorBlock", "ProbeBlock"]:
+            print("Cannot create something not guaranteed to be block type!")
+        else:
+            global name
+            x = event.x
+            y = event.y
+            self.mv.instantiate_block(str(name), self.mode)
+            r = canvas.create_rectangle(lower(x), lower(y), upper(x), upper(y), fill="white")
+            t = canvas.create_image(avg(lower(x), upper(x)), avg(lower(y), upper(y)), image=photos[self.mode])
+            b = (lower(x), lower(y), upper(x), upper(y), str(name))
+            self.drawn.add(b)
+            self.refs[str(name)] = [r, t]
+            name += 1
+
+    def find(self, location):
+        x, y = location
+        for e in self.drawn:
+            if (e[0] <= x and
+                e[1] <= y and
+                e[2] >= x and
+                e[3] >= y):
+                    return e[4]
+
+        print("Found nothing at that location!")
+        return []
+
+    def draw(self, start, end):
+        source = self.find(start)
+        target = self.find(end)
+
+        print("Connect from %s to %s" % (source, target))
+
+        if source and target:
+            if self.mode not in ["Link", "InitialCondition"]:
+                print("Cannot create something not guaranteed to be link type!")
+            elif source == target:
+                print("Cannot create link to self")
+            else:
+                global name
+                self.mv.instantiate_link(str(name), self.mode, source, target)
+                self.refs[str(name)] = [canvas.create_line(start[0], start[1], end[0], end[1], fill="black" if self.mode == "Link" else "red", arrow=LAST)]
+                name += 1
+
+core = InterfaceCore()
+
+def clicked(event):
+    core.clicked(event)
+
+def draw(event):
+    global start_location
+    start_location = (event.x, event.y)
+
+def release(event):
+    core.draw(start_location, (event.x, event.y))
+
+def simulate():
+    core.mv.simulate()
+
+def step():
+    core.mv.step()
+
+def pause():
+    core.mv.pause()
+
+def addition():
+    core.set_mode("AdditionBlock")
+
+def negation():
+    core.set_mode("NegatorBlock")
+
+def link():
+    core.set_mode("Link")
+
+def multiply():
+    core.set_mode("MultiplyBlock")
+
+def constant():
+    core.set_mode("ConstantBlock")
+
+def inverse():
+    core.set_mode("InverseBlock")
+
+def ic():
+    core.set_mode("InitialCondition")
+
+def delay():
+    core.set_mode("DelayBlock")
+
+def derivator():
+    core.set_mode("DerivatorBlock")
+
+def integrator():
+    core.set_mode("IntegratorBlock")
+
+def delete(event):
+    core.delete(event.x, event.y)
+
+def probe():
+    core.set_mode("ProbeBlock")
+
+images = {
+        "ConstantBlock": Image.open("figures/constant.png"),
+        "AdditionBlock": Image.open("figures/summation.png"),
+        "NegatorBlock": Image.open("figures/negation.png"),
+        "MultiplyBlock": Image.open("figures/multiply.png"),
+        "InverseBlock": Image.open("figures/inverse.png"),
+        "DerivatorBlock": Image.open("figures/derivative.png"),
+        "IntegratorBlock": Image.open("figures/integrator.png"),
+        "DelayBlock": Image.open("figures/delay.png"),
+        "ProbeBlock": Image.open("figures/probe.png"),
+    }
+
+hsize = 20
+for k, v in images.items():
+    old_vsize, old_hsize = v.size
+    vsize = int(old_vsize * hsize / float(old_hsize))
+    images[k] = v.resize((vsize, hsize), Image.ANTIALIAS)
+
+photos = {k: ImageTk.PhotoImage(v) for k, v in images.items()}
+
+bound_functions = {
+        "ConstantBlock": constant,
+        "AdditionBlock": addition,
+        "MultiplyBlock": multiply,
+        "NegatorBlock": negation,
+        "InverseBlock": inverse,
+        "DerivatorBlock": derivator,
+        "IntegratorBlock": integrator,
+        "DelayBlock": delay,
+        "ProbeBlock": probe,
+}
+
+buttons = []
+for k, v in bound_functions.items():
+    buttons.append(Button(root, image=photos[k], command=v))
+    
+buttons.extend([
+        Button(root, text="Link", command=link, bg="blue"),
+        Button(root, text="InitialCondition", command=ic, bg="blue"),
+        Button(root, text="SIM", command=simulate, bg="green"),
+        Button(root, text="STEP", command=step, bg="green"),
+        Button(root, text="PAUSE", command=pause, bg="green"),
+    ])
+
+for i, b in enumerate(buttons):
+    b.grid(row=0, column=i)
+
+canvas.grid(row=1,column=0,columnspan=len(buttons))
+
+core.canvas = canvas
+
+for i in range(JUMP, MAX_HEIGHT, JUMP):
+    canvas.create_line(0, i, MAX_HEIGHT, i, fill="grey")
+for i in range(JUMP, MAX_WIDTH, JUMP):
+    canvas.create_line(i, 0, i, MAX_WIDTH, fill="grey")
+
+canvas.bind("<Button-1>", clicked)
+canvas.bind("<Button-2>", delete)
+canvas.bind("<Button-3>", draw)
+canvas.bind("<ButtonRelease-3>", release)
+
+visual = Toplevel(root)
+
+probes = {}
+values = {}
+
+# Example:
+#    simulation = [(1, {"a": 1, "b": 2}), (2, {"a": 3}), (3, {"a": 4, "b": 6})]
+
+# Class from StackOverflow
+class VerticalScrolledFrame(Frame):
+    """A pure Tkinter scrollable frame that actually works!
+    * Use the 'interior' attribute to place widgets inside the scrollable frame
+    * Construct and pack/place/grid normally
+    * This frame only allows vertical scrolling
+
+    """
+    def __init__(self, parent, *args, **kw):
+        Frame.__init__(self, parent, *args, **kw)            
+
+        # create a canvas object and a vertical scrollbar for scrolling it
+        vscrollbar = Scrollbar(self, orient=VERTICAL)
+        vscrollbar.pack(fill=Y, side=RIGHT, expand=FALSE)
+        canvas = Canvas(self, bd=0, highlightthickness=0,
+                        yscrollcommand=vscrollbar.set)
+        canvas.pack(side=LEFT, fill=BOTH, expand=TRUE)
+        vscrollbar.config(command=canvas.yview)
+
+        # reset the view
+        canvas.xview_moveto(0)
+        canvas.yview_moveto(0)
+
+        # create a frame inside the canvas which will be scrolled with it
+        self.interior = interior = Frame(canvas)
+        interior_id = canvas.create_window(0, 0, window=interior,
+                                           anchor=NW)
+
+        # track changes to the canvas and frame width and sync them,
+        # also updating the scrollbar
+        def _configure_interior(event):
+            # update the scrollbars to match the size of the inner frame
+            size = (interior.winfo_reqwidth(), interior.winfo_reqheight())
+            canvas.config(scrollregion="0 0 %s %s" % size)
+            if interior.winfo_reqwidth() != canvas.winfo_width():
+                # update the canvas's width to fit the inner frame
+                canvas.config(width=interior.winfo_reqwidth())
+        interior.bind('<Configure>', _configure_interior)
+
+        def _configure_canvas(event):
+            if interior.winfo_reqwidth() != canvas.winfo_width():
+                # update the inner frame's width to fill the canvas
+                canvas.itemconfigure(interior_id, width=canvas.winfo_width())
+        canvas.bind('<Configure>', _configure_canvas)
+
+frame = VerticalScrolledFrame(visual)
+frame.pack(fill=BOTH,expand=True)
+
+def update_graphs():
+    while simulation:
+        t, results = simulation.pop(0)
+        for k, v in results.items():
+            if k in probes:
+                fcanvas, a = probes[k]
+            else:
+                f = Figure()
+                a = f.add_subplot(111)
+                a.plot([], [])
+                print(k)
+                f.suptitle(k)
+
+                fcanvas = FigureCanvasTkAgg(f, frame.interior)
+                fcanvas.show()
+                fcanvas.get_tk_widget().pack()
+
+                probes[k] = (fcanvas, a)
+                values[k] = ([], [])
+
+            values[k][0].append(t)
+            values[k][1].append(v)
+
+            a.clear()
+            a.plot(values[k][0], values[k][1], linestyle="none", marker="o")
+            fcanvas.draw()
+    root.after(100, update_graphs)
+
+root.after(100, update_graphs)
+root.mainloop()