import urllib import urllib2 import json import random from urllib2 import URLError import sys # Bind to the compiler sys.path.append("../interface/HUTN") from hutn_compiler.compiler import main as do_compile # Helper functions and configuration: do not use yourself! taskname = random.random() address = None last_output = None mode = 0 def _input(value): # Ugly json encoding of primitives if isinstance(value, type([])): value = json.dumps(value) urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "data": value, "taskname": taskname}))).read() else: value = json.dumps(value) urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": value, "taskname": taskname}))).read() def _input_raw(value, taskname): # Ugly json encoding of primitives urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": value, "taskname": taskname}))).read() def _compile_AL(code): # Compile an action language file and send the compiled code code_fragments = code.split("\n") code_fragments = [i for i in code_fragments if i.strip() != ""] code_fragments = [i.replace(" ", "\t") for i in code_fragments] initial_tabs = min([len(i) - len(i.lstrip("\t")) for i in code_fragments]) code_fragments = [i[initial_tabs:] for i in code_fragments] code_fragments.append("") code = "\n".join(code_fragments) with open("__constraint.alc", "w") as f: f.write(code) f.flush() _input(do_compile("__constraint.alc", "../interface/HUTN/grammars/actionlanguage.g", "CS")) def _compile_model(code): # Compile a model and send the compiled graph # First change multiple spaces to a tab code_fragments = code.split("\n") code_fragments = [i for i in code_fragments if i.strip() != ""] code_fragments = [i.replace(" ", "\t") for i in code_fragments] initial_tabs = min([len(i) - len(i.lstrip("\t")) for i in code_fragments]) code_fragments = [i[initial_tabs:] for i in code_fragments] code_fragments.append("") code = "\n".join(code_fragments) with open("__model.mvc", "w") as f: f.write(code) f.flush() _input(do_compile("__model.mvc", "../interface/HUTN/grammars/modelling.g", "M") + ["exit"]) def _output(): try: global last_output last_output = json.loads(urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "get_output", "taskname": taskname}))).read()) return last_output except: raise UnknownError() def _last_output(): return last_output def _consume_to_end(): while (_output() not in ["Ready for command...", "Please give your command."]): pass # Exceptions class ModelverseException(Exception): pass class UnknownError(ModelverseException): pass class UnknownIdentifier(ModelverseException): pass class UnknownType(ModelverseException): pass class UnsupportedValue(ModelverseException): pass class CompilationError(ModelverseException): pass class NoSuchAttribute(ModelverseException): pass class UnknownModel(ModelverseException): pass class ConnectionError(ModelverseException): pass class ModelExists(ModelverseException): pass class PermissionDenied(ModelverseException): pass class InvalidMode(ModelverseException): pass # Main MvC operations def init(address_param="http://localhost:8001"): """Starts up the connection to the Modelverse.""" # return None # raises ConnectionError # raises UnknownError # raises InvalidMode global mode if mode != 0: raise InvalidMode() global address address = address_param try: _input_raw('"%s"' % taskname, "task_manager") mode = 1 except URLError as e: raise ConnectionError(e.reason) def login(username, password): """Log in an existing user.""" # return None # raises UnknownError # raises PermissionDenied global mode if mode != 1: raise InvalidMode() _input(username) _input(password) _consume_to_end() mode = 2 def register(username, password): """Register a new user.""" # return None # raises UnknownError # raises UserExists global mode if mode != 1: raise InvalidMode() _input(username) _input(password) _input(password) _consume_to_end() mode = 2 def model_add(model_name, metamodel_name, model_code): """Instantiate a new model.""" # return None # raises UnknownModel # raises ModelExists # raises UnknownError # raises PermissionDenied # raises CompilationError if mode != 2: raise InvalidMode() _input("model_add") _output() _output() _input(metamodel_name) if _output() == "Model name?": _input(model_name) if _output() == "Waiting for model constructors...": try: _compile_model(model_code) except Exception: raise CompilationError() _consume_to_end() else: _consume_to_end() raise ModelExists() elif _last_output() == "Could not find type model!": _consume_to_end() raise UnknownModel() elif _last_output() == "Permission denied": _consume_to_end() raise PermissionDenied() def model_modify(model_name): """Modify an existing model.""" # return is_write # raises UnknownModel # raises PermissionDenied # raises UnknownError global mode if mode != 2: raise InvalidMode() _input("model_modify") _output() _input(model_name) if _output() == "Permission denied": _consume_to_end() raise PermissionDenied() elif _last_output() == "Could not find model!": _consume_to_end() raise UnknownModel() elif _last_output() == "Model loaded, ready for commands!": mode = 3 if ("r/w" in _output()): write = True else: write = False _consume_to_end() return write else: _consume_to_end() raise UnknownException() def model_list(): """List all models.""" # return [(model1, metamodel1), (model2, metamodel2), ...] # raises UnknownError if mode != 2: raise InvalidMode() _input("model_list") lst = [] while (_output() != "Ready for command..."): v = _last_output() m, mm = v.split(":") m = m.strip() mm = mm.strip() lst.append((m, mm)) return lst def model_list_full(): """List full information on all models.""" # return [(model1, metamodel1, owner1, group1, permissions1), (model2, metamodel2, owner2, group2, permissions2), ...] # raises UnknownError if mode != 2: raise InvalidMode() _input("model_list_full") lst = [] while (_output() != "Ready for command..."): v = _last_output() m, mm = v.split(":") m = m.strip() mm = mm.strip() perm, own, grp, m = m.split(" ") lst.append((m, mm, own, grp, perm)) return lst def model_overwrite(model_name, new_model): """Upload a new model and overwrite an existing model.""" # return None # raises UnknownModel # raises PermissionDenied # raises CompilationError # raises UnknownError if mode != 2: raise InvalidMode() _input("model_overwrite") _input(model_name) _compile_model(new_model) def user_logout(): """Log out the current user. A new login will be required afterwards.""" # return None # raises UnknownException global mode if mode != 2: raise InvalidMode() _input("exit") mode = 1 def user_delete(): """Removes the current user. A new login will be required afterwards.""" # return None # raises UnknownException global mode if mode != 2: raise InvalidMode() _input("self-destruct") mode = 1 # Actual operations on the model def element_list(): """Return a list of all IDs and the type of the element""" # return [(name1, type1), (name2, type2), ...] # raises UnknownError if mode != 3: raise InvalidMode() _input("list") lst = [] _output() while (_output() != "Please give your command."): v = _last_output() m, mm = v.split(":") m = m.strip() mm = mm.strip() lst.append((m, mm)) return lst def types(): """Return a list of all types usable in the model""" # return [type1, type2, ...] # raises UnknownError if mode != 3: raise InvalidMode() _input("types") _output() lst = [] while (_output() != "Please give your command."): v = _last_output() m, mm = v.split(":") m = m.strip() lst.append(m) return lst def read(ID): """Return a tuple of information on the element: its type and source/target (None if not an edge)""" # return (type, (source, target)) # raises UnknownError # raises UnknownIdentifier if mode != 3: raise InvalidMode() _input("read") _output() _input(ID) _output() if _last_output() == "Unknown element; aborting": _consume_to_end() raise UnknownIdentifier() else: t = _output().split(":")[1].strip() if (not _output().startswith("Source:")): rval = (t, None) else: src = _last_output().split(":")[1].strip() trg = _output().split(":")[1].strip() rval = (t, (src, trg)) while (_output() != "Please give your command."): pass return rval def read_attrs(ID): """Return a dictionary of attribute value pairs""" # return {attr1: value1, attr2: value2, ...} # raises UnknownError # raises UnknownIdentifier if mode != 3: raise InvalidMode() _input("read") _output() _input(ID) _output() if _last_output() == "Unknown element; aborting": _consume_to_end() raise UnknownIdentifier() else: rval = {} while (_output() != "Attributes:"): pass while (_output() != "Please give your command."): r = _last_output() key, value = r.split(":") _, value = value.split("=") key = key.strip() value = value.strip() value = None if value == "None" else json.loads(value) rval[key] = value return rval def instantiate(typename, edge=None, ID=""): """Create a new instance of the specified typename, between the selected elements (if not None), and with the provided ID (if any)""" # return instantiated_ID # raises UnknownError # raises UnknownType # raises UnknownIdentifier if mode != 3: raise InvalidMode() _input("instantiate") _input(typename) _input(ID) if (edge is not None): _input(edge[0]) _input(edge[1]) def delete(ID): """Delete the element with the given ID""" # return None # raises UnknownError # raises UnknownIdentifier if mode != 3: raise InvalidMode() _input("delete") _input(ID) def attr_assign(ID, attr, value): """Assign a value to an attribute""" # return None # raises UnknownError # raises UnknownIdentifier # raises NoSuchAttribute # raises UnsupportedValue if mode != 3: raise InvalidMode() _input("attr_modify") _input(ID) _input(attr) _input(value) def attr_assign_code(ID, attr, code): """Assign a piece of Action Language code to the attribute""" # return None # raises UnknownError # raises UnknownIdentifier # raises NoSuchAttribute # raises UnsupportedValue if mode != 3: raise InvalidMode() _input("attr_modify") _input(ID) _input(attr) _compile_AL(code) def upload(new_model): """Overwrite the current model with the provided model""" # return None # raises UnknownError # raises CompilationError if mode != 3: raise InvalidMode() _input("upload") _compile_model(new_model) def read_outgoing(ID, typename): """Returns a list of all outgoing associations of a specific type ("" = all)""" # return [name1, name2, ...] # raises UnknownError # raises UnknownIdentifier # raises UnknownType if mode != 3: raise InvalidMode() _input("read_outgoing") _input(ID) _input(typename) lst = [] while (_output() != "Please give your command."): lst.append(_last_output()) return lst def read_incoming(ID, typename): """Returns a list of all incoming associations of a specific type ("" = all)""" # return [name1, name2, ...] # raises UnknownError # raises UnknownIdentifier # raises UnknownType if mode != 3: raise InvalidMode() _input("read_incoming") _input(ID) _input(typename) lst = [] while (_output() != "Please give your command."): lst.append(_last_output()) return lst def model_exit(): """Leave model modify mode.""" # return None # raises UnknownError global mode if mode != 3: raise InvalidMode() _input("exit") mode = 2 def transformation_add_MT_language(): """Create a new Model Transformation language out of a set of metamodels.""" raise NotImplementedError() def transformation_add_MT(): """Create a new model transformation.""" raise NotImplementedError() def transformation_add_AL(): """Create a new action language fragment.""" raise NotImplementedError() def transformation_add_MANUAL(): """Create a new manual model operation.""" raise NotImplementedError() def transformation_execute(): """Execute an existing model operation.""" raise NotImplementedError() def transformation_list(): """List existing model operations.""" raise NotImplementedError() def transformation_list_full(): """List detailed information on model operations.""" raise NotImplementedError() def transformation_detail(): """List full details of a a model operation.""" raise NotImplementedError() def transformation_RAMify(): """Ramify an existing metamodel.""" raise NotImplementedError() def process_execute(): """Execute a process model.""" raise NotImplementedError() def permission_modify(): """Modify permissions of a model.""" raise NotImplementedError() def permission_owner(): """Modify the owning user of a model.""" raise NotImplementedError() def permission_group(): """Modify the owning group of a model.""" raise NotImplementedError() def group_create(): """Create a new group.""" raise NotImplementedError() def group_delete(): """Delete a group of which you are an owner.""" raise NotImplementedError() def group_owner_add(): """Add a new owning user to a group you own.""" raise NotImplementedError() def group_owner_delete(): """Delete an owning user to a group you own.""" raise NotImplementedError() def group_join(): """Add a new user to a group you own.""" raise NotImplementedError() def group_kick(): """Delete a user from a group you own.""" raise NotImplementedError() def group_list(): """List existing groups.""" raise NotImplementedError() def admin_promote(): """Promote a user to admin status.""" raise NotImplementedError() def admin_demote(): """Demote a user from admin status.""" raise NotImplementedError()