import urllib
import urllib2
import json
import random
from urllib2 import URLError
import sys

# Directory holding the HUTN compiler package (might have to update the path
# manually!).
COMPILER_PATH = "interface/HUTN"

# Bind to the compiler: the directory must be on sys.path before the import.
sys.path.append(COMPILER_PATH)
from hutn_compiler.compiler import main as do_compile


# Exceptions
class ModelverseException(Exception):
    """Base class of every exception defined by this module."""


class UnknownException(ModelverseException):
    """Listed in the 'raises' notes of several operations in this module."""


class UnknownIdentifier(ModelverseException):
    """The Modelverse replied "Element not found: "."""


class UnknownType(ModelverseException):
    """Listed in the 'raises' notes of instantiate() and read_incoming()."""


class NotAnAssociation(ModelverseException):
    """Listed in the 'raises' notes of the read_association_* operations."""


class UnsupportedValue(ModelverseException):
    """Listed in the 'raises' notes of attr_assign() and attr_assign_code()."""


class CompilationError(ModelverseException):
    """Compiling model or action language code failed."""


class NoSuchAttribute(ModelverseException):
    """The Modelverse replied "Attribute not found: "."""


class UnknownModel(ModelverseException):
    """The Modelverse replied "Model not found: "."""


class ConnectionError(ModelverseException):
    """init() could not reach the Modelverse (wraps the URLError reason)."""


class ModelExists(ModelverseException):
    """The Modelverse replied "Model exists: "."""


class PermissionDenied(ModelverseException):
    """The Modelverse replied "Permission denied", or login() was refused."""


class InvalidMode(ModelverseException):
    """An operation was called while the module-level mode did not allow it."""


class InterfaceMismatch(ModelverseException):
    """The Modelverse sent a reply this wrapper did not expect."""


# Helper functions and configuration: do not use yourself!
# Connection state shared by every function below.
taskname = random.random()  # task identifier sent along with every request
address = None              # URL of the Modelverse; set by init()
last_output = None          # most recently decoded reply; set by _output()
# Interface state machine:
#   0 = not connected
#   1 = connected, login pending   (set by init())
#   2 = logged in, model management (set by login() and model_exit())
#   3 = modifying a single model    (set by model_modify())
mode = 0


class ElementExists(ModelverseException):
    """The Modelverse replied "Element exists: ".

    _handle_output() referenced this name without it being defined, so that
    reply used to surface as a NameError.
    """


def _post(params):
    """POST the dictionary *params* to the Modelverse and return the raw reply."""
    response = urllib2.urlopen(urllib2.Request(address, urllib.urlencode(params)))
    try:
        return response.read()
    finally:
        # Do not rely on garbage collection to release the connection.
        response.close()


def _input(value):
    """Send *value*, JSON encoded, as input for the current task."""
    # Lists travel under the "data" key, every other value under "value".
    key = "data" if isinstance(value, list) else "value"
    _post({"op": "set_input", key: json.dumps(value), "taskname": taskname})


def _input_raw(value, taskname):
    """Send *value* as-is (no JSON encoding) as input for task *taskname*."""
    _post({"op": "set_input", "value": value, "taskname": taskname})


def _normalize_indentation(code):
    """Prepare *code* for the compiler.

    Drops blank lines, turns space indentation into tabs, removes the
    indentation shared by all lines and terminates the text with a newline.
    """
    lines = [line for line in code.split("\n") if line.strip() != ""]
    # First change multiple spaces to a tab.
    # NOTE(review): four spaces per level is assumed; replacing every single
    # space would also corrupt the spaces inside statements.
    lines = [line.replace("    ", "\t") for line in lines]
    # min() of an empty list raises ValueError: all-blank code has no indent.
    if lines:
        common = min([len(line) - len(line.lstrip("\t")) for line in lines])
    else:
        common = 0
    lines = [line[common:] for line in lines]
    lines.append("")
    return "\n".join(lines)


def _compile_AL(code):
    """Compile action language *code*; return what the compiler returns."""
    normalized = _normalize_indentation(code)
    with open("__constraint.alc", "w") as f:
        f.write(normalized)
    return do_compile("__constraint.alc", COMPILER_PATH + "/grammars/actionlanguage.g", "CS")


def _compile_model(code):
    """Compile model *code*; return the compiler result followed by "exit"."""
    normalized = _normalize_indentation(code)
    with open("__model.mvc", "w") as f:
        f.write(normalized)
    return do_compile("__model.mvc", COMPILER_PATH + "/grammars/modelling.g", "M") + ["exit"]


def _compile_model_or_exit(model_code):
    """Return the constructors for *model_code*, or just ["exit"] for None.

    Raises CompilationError (wrapping the cause) if compilation fails.
    """
    if model_code is None:
        return ["exit"]
    try:
        return _compile_model(model_code)
    except Exception as e:
        raise CompilationError(e)


def _output(expected=None):
    """Fetch and decode the next reply, remembering it in last_output.

    Raises UnknownException if the request or the JSON decoding fails, and
    InterfaceMismatch if *expected* is given and differs from the reply.
    """
    global last_output
    try:
        last_output = json.loads(_post({"op": "get_output", "taskname": taskname}))
    except Exception as e:
        # "except Exception" lets KeyboardInterrupt/SystemExit pass through.
        raise UnknownException(e)
    if expected is not None and last_output != expected:
        raise InterfaceMismatch(_last_output(), expected)
    return last_output


def _last_output():
    """Return the reply most recently fetched by _output()."""
    return last_output


# Reply prefixes that signal an error, checked in this order.
_ERROR_PREFIXES = [
    ("Model exists: ", ModelExists),
    ("Permission denied", PermissionDenied),
    ("Model not found: ", UnknownModel),
    ("Element not found: ", UnknownIdentifier),
    ("Element exists: ", ElementExists),
    ("Attribute not found: ", NoSuchAttribute),
]


# Raise common exceptions
def _handle_output(requested=None, split=None):
    """Fetch the next reply and translate error replies into exceptions.

    If the reply starts with *requested* it is returned: unchanged when
    *split* is None, otherwise only the part after the first occurrence of
    *split* ("" if there is none).

    Raises ModelExists, PermissionDenied, UnknownModel, UnknownIdentifier,
    ElementExists or NoSuchAttribute for the matching error reply, and
    InterfaceMismatch for any other reply.
    """
    value = _output()
    if not hasattr(value, "startswith"):
        # A non-string reply (number, list, ...) is not part of the protocol.
        raise InterfaceMismatch(value)
    for prefix, exception in _ERROR_PREFIXES:
        if value.startswith(prefix):
            raise exception(value)
    if requested is None or not value.startswith(requested):
        raise InterfaceMismatch(value)
    if split is None:
        return value
    parts = value.strip().split(split, 1)
    if len(parts) == 1:
        return ""
    return parts[1].rstrip()


def _parse_pairs(output):
    """Parse lines of the form "left : right" into a list of (left, right)."""
    output = output.strip()
    if output == "":
        return []
    pairs = []
    for line in output.split("\n"):
        left, right = line.split(":")
        pairs.append((left.strip(), right.strip()))
    return pairs


def _model_request(model_name, command, requested, split=None):
    """Run one command inside model *model_name* and return its reply.

    Enters modify mode, sends *command*, reads the reply with
    _handle_output(requested, split) and always leaves modify mode again.
    """
    model_modify(model_name)
    if mode != 3:
        raise InvalidMode()
    try:
        _input(command)
        return _handle_output(requested, split=split)
    finally:
        model_exit()


# Main MvC operations
def init(address_param="http://127.0.0.1:8001"):
    """Starts up the connection to the Modelverse.

    Returns None. Raises InvalidMode if already connected and ConnectionError
    if the Modelverse cannot be reached.
    """
    global mode, address
    if mode != 0:
        raise InvalidMode()
    address = address_param
    try:
        # Register our task with the task manager.
        _input_raw('"%s"' % taskname, "task_manager")
        mode = 1
    except URLError as e:
        raise ConnectionError(e.reason)


def login(username, password):
    """Log in a user, if user doesn't exist, it is created.

    Returns None. Raises InvalidMode unless init() was called first,
    PermissionDenied on a wrong password, and InterfaceMismatch on any
    unexpected reply.
    """
    global mode
    if mode != 1:
        raise InvalidMode()

    _output("Log on as which user?")
    _input(username)
    reply = _output()
    if reply == "Password for existing user?":
        _input(password)
        reply = _output()
        if reply == "Wrong password!":
            raise PermissionDenied()
        if reply != "Welcome to the Model Management Interface v2.0!":
            raise InterfaceMismatch(reply)
    elif reply == "This is a new user: please give password!":
        _input(password)
        _output("Please repeat the password")
        _input(password)
        reply = _output()
        if reply != "Passwords match!":
            # We just sent the same password twice, so even "Not the same
            # password!" means that the interface changed.
            raise InterfaceMismatch(reply)
        _output("Welcome to the Model Management Interface v2.0!")
    else:
        raise InterfaceMismatch(reply)

    _output("Use the 'help' command for a list of possible commands")
    _input("quiet")
    mode = 2


def model_add(model_name, metamodel_name, model_code=None):
    """Instantiate a new model.

    Returns None. Raises InvalidMode unless logged in, CompilationError if
    *model_code* does not compile, and the errors of _handle_output().
    """
    if mode != 2:
        raise InvalidMode()
    # Do this before creating the model, as otherwise compilation errors
    # would make us inconsistent
    compiled = _compile_model_or_exit(model_code)
    _input(["model_add", metamodel_name, model_name])
    _handle_output("Waiting for model constructors...")
    _input(compiled)
    _output("Success")


def model_modify(model_name):
    """Modify an existing model.

    Returns True if the reply about the loaded model contains "r/w", else
    False. Raises InvalidMode unless logged in, and the errors of
    _handle_output().
    """
    global mode
    if mode != 2:
        raise InvalidMode()
    _input(["model_modify", model_name])
    _handle_output("Success")
    # Mode has changed
    mode = 3
    _output("Model loaded, ready for commands!")
    write = "r/w" in _output()
    _output("Use 'help' command for a list of possible commands")
    return write


def model_list():
    """List all models.

    Returns [(model1, metamodel1), (model2, metamodel2), ...].
    Raises InvalidMode unless logged in, and the errors of _handle_output().
    """
    if mode != 2:
        raise InvalidMode()
    _input("model_list")
    return _parse_pairs(_handle_output("Success: ", split=" "))


def model_list_full():
    """List full information on all models.

    Returns [(model1, metamodel1, owner1, group1, permissions1), ...].
    Raises InvalidMode unless logged in.
    """
    if mode != 2:
        raise InvalidMode()
    _input("model_list_full")
    lst = []
    for v in _output().strip().split("\n"):
        if v.strip() == "":
            # An empty listing has no "permissions owner group model : mm" lines.
            continue
        m, mm = v.split(":")
        perm, own, grp, m = m.strip().split(" ")
        lst.append((m, mm.strip(), own, grp, perm))
    return lst


def verify(model):
    """Verify if a model conforms to its metamodel.

    Returns the verification result sent by the Modelverse.
    Raises InvalidMode unless logged in, and the errors of _handle_output().
    """
    if mode != 2:
        raise InvalidMode()
    _input(["verify", model])
    return _handle_output("Success: ", split=" ")


def model_overwrite(model_name, new_model=None):
    """Upload a new model and overwrite an existing model.

    Returns None. Raises InvalidMode unless logged in, CompilationError if
    *new_model* does not compile, and the errors of _handle_output().
    """
    if mode != 2:
        raise InvalidMode()
    compiled = _compile_model_or_exit(new_model)
    _input(["model_overwrite", model_name])
    _handle_output("Waiting for model constructors...")
    _input(compiled)
    _output("Success")


def user_logout():
    """Log out the current user and break the connection.

    Returns None. Raises InvalidMode unless logged in.
    """
    global mode
    if mode != 2:
        raise InvalidMode()
    _input("exit")
    mode = 0


def user_delete():
    """Removes the current user and break the connection.

    Returns None. Raises InvalidMode unless logged in.
    """
    global mode
    if mode != 2:
        raise InvalidMode()
    _input("self-destruct")
    mode = 0


def model_render(model, mapper):
    """Fetch a rendered version of a model.

    Returns the rendering sent by the Modelverse.
    Raises InvalidMode unless logged in, and the errors of _handle_output().
    """
    if mode != 2:
        raise InvalidMode()
    _input(["model_render", model, mapper])
    return _handle_output("Success: ", split=" ")


def transformation_between(source, target):
    """List the transformations reported between *source* and *target*.

    Returns a list with one entry per line of the reply ([] if empty).
    Raises InvalidMode unless logged in, and the errors of _handle_output().
    """
    if mode != 2:
        raise InvalidMode()
    _input(["transformation_between", source, target])
    output = _handle_output("Success: ", split=" ")
    if output == "":
        return []
    # The list used to be built but never returned.
    return output.split("\n")


def transformation_add_MT_language():
    """Create a new Model Transformation language out of a set of metamodels."""
    raise NotImplementedError()


def transformation_add_MT():
    """Create a new model transformation."""
    raise NotImplementedError()


def transformation_add_AL():
    """Create a new action language fragment."""
    raise NotImplementedError()


def transformation_add_MANUAL():
    """Create a new manual model operation."""
    raise NotImplementedError()


def transformation_execute():
    """Execute an existing model operation."""
    raise NotImplementedError()


def transformation_list():
    """List existing model operations."""
    raise NotImplementedError()


def transformation_list_full():
    """List detailed information on model operations."""
    raise NotImplementedError()


def transformation_detail():
    """List full details of a model operation."""
    raise NotImplementedError()


def transformation_RAMify():
    """Ramify an existing metamodel."""
    raise NotImplementedError()


def process_execute():
    """Execute a process model."""
    raise NotImplementedError()


def permission_modify():
    """Modify permissions of a model."""
    raise NotImplementedError()


def permission_owner():
    """Modify the owning user of a model."""
    raise NotImplementedError()


def permission_group():
    """Modify the owning group of a model."""
    raise NotImplementedError()


def group_create():
    """Create a new group."""
    raise NotImplementedError()


def group_delete():
    """Delete a group of which you are an owner."""
    raise NotImplementedError()


def group_owner_add():
    """Add a new owning user to a group you own."""
    raise NotImplementedError()


def group_owner_delete():
    """Delete an owning user to a group you own."""
    raise NotImplementedError()


def group_join():
    """Add a new user to a group you own."""
    raise NotImplementedError()


def group_kick():
    """Delete a user from a group you own."""
    raise NotImplementedError()


def group_list():
    """List existing groups."""
    raise NotImplementedError()


def admin_promote():
    """Promote a user to admin status."""
    raise NotImplementedError()


def admin_demote():
    """Demote a user from admin status."""
    raise NotImplementedError()


# Actual operations on the model
def element_list(model_name):
    """Return a list of all IDs and the type of the element

    Returns [(name1, type1), (name2, type2), ...].
    """
    return _parse_pairs(_model_request(model_name, "list_full", "Success: ", split=" "))


def types(model_name):
    """Return a list of all types usable in the model

    Returns [type1, type2, ...].
    """
    pairs = _parse_pairs(_model_request(model_name, "types", "Success: ", split=" "))
    return [name for name, _ in pairs]


def types_full(model_name):
    """Return a list of full types usable in the model

    Returns [(type1, typetype1), (type2, typetype2), ...].
    """
    return _parse_pairs(_model_request(model_name, "types", "Success: ", split=" "))


def read(model_name, ID):
    """Return a tuple of information on the element: its type and source/target (None if not an edge)

    Returns (type, (source, target)), or (type, None).
    """
    output = _model_request(model_name, ["read", ID], "Success: ", split=" ")
    v = output.split("\n")
    t = v[1].split(":")[1].strip()
    if not v[0].startswith("Source:"):
        return (t, None)
    # NOTE(review): the type above and the target below are both read from
    # v[1], so one of the indices is probably wrong for edges. The reply
    # format is not visible here, so the original indices are kept - confirm
    # against the Modelverse.
    src = v[0].split(":")[1].strip()
    trg = v[1].split(":")[1].strip()
    return (t, (src, trg))


def read_attrs(model_name, ID):
    """Return a dictionary of attribute value pairs

    Returns {attr1: value1, attr2: value2, ...}.
    """
    output = _model_request(model_name, ["read", ID], "Success: ", split=" ")
    searching = True
    rval = {}
    for r in output.split("\n"):
        if searching:
            # Skip everything up to and including the "Attributes:" header.
            if r == "Attributes:":
                searching = False
            continue
        # Lines are split at the first ":" and the value at the first "=".
        key, value = r.split(":", 1)
        _, value = value.split("=", 1)
        key = json.loads(key.strip())
        value = value.strip()
        if value == "None":
            value = None
        elif value == "True":
            value = True
        elif value == "False":
            value = False
        else:
            value = json.loads(value)
        rval[key] = value
    return rval


def instantiate(model_name, typename, edge=None):
    """Create a new instance of the specified typename, between the selected elements (if not None)

    *edge* is None for a node, or a (source, target) pair for an edge.
    Returns the reply of the Modelverse after "Success: ".
    """
    if edge is None:
        command = ["instantiate_node", typename, ""]
    else:
        command = ["instantiate_edge", typename, "", edge[0], edge[1]]
    return _model_request(model_name, command, "Success: ", split=" ")


def delete_element(model_name, ID):
    """Delete the element with the given ID"""
    _model_request(model_name, ["delete", ID], "Success")


def attr_assign(model_name, ID, attr, value):
    """Assign a value to an attribute"""
    _model_request(model_name, ["attr_add", ID, attr, value], "Success")


def attr_assign_code(model_name, ID, attr, code):
    """Assign a piece of Action Language code to the attribute

    Raises CompilationError if *code* does not compile; compilation happens
    before the model is opened.
    """
    try:
        compiled = _compile_AL(code)
    except Exception as e:
        raise CompilationError(e)

    model_modify(model_name)
    if mode != 3:
        raise InvalidMode()
    try:
        _input(["attr_add", ID, attr])
        _handle_output("Waiting for code constructors...")
        _input(compiled)
        _output("Success")
    finally:
        model_exit()


def attr_delete(model_name, ID, attr):
    """Remove an attribute."""
    _model_request(model_name, ["attr_del", ID, attr], "Success")


def read_outgoing(model_name, ID, typename):
    """Returns a list of all outgoing associations of a specific type ("" = all)

    Returns [name1, name2, ...].
    """
    output = _model_request(model_name, ["read_outgoing", ID, typename], "Success: ", split=" ")
    if output == "":
        return []
    return output.split("\n")


def read_incoming(model_name, ID, typename):
    """Returns a list of all incoming associations of a specific type ("" = all)

    Returns [name1, name2, ...].
    """
    output = _model_request(model_name, ["read_incoming", ID, typename], "Success: ", split=" ")
    if output == "":
        return []
    return output.split("\n")


def read_association_source(model_name, ID):
    """Returns the source of an association."""
    return _model_request(model_name, ["read_association_source", ID], "Success: ", split=" ")


def read_association_destination(model_name, ID):
    """Returns the destination of an association."""
    return _model_request(model_name, ["read_association_destination", ID], "Success: ", split=" ")


def model_exit():
    """Leave model modify mode.

    Returns None. Raises InvalidMode unless a model is being modified.
    """
    global mode
    if mode != 3:
        raise InvalidMode()
    _input("exit")
    _output("Success")
    mode = 2