import urllib
import urllib2
import json
import random
from urllib2 import URLError
import sys
import time

# Location of the HUTN compiler, relative to the working directory
COMPILER_PATH = "interface/HUTN"

# Connection/interaction states of the wrapper (stored in the module-level `mode`)
MODE_UNCONNECTED = 0
MODE_UNAUTHORIZED = 1
MODE_MODELLING = 2
MODE_MODIFY = 3
MODE_DIALOG = 4
MODE_MANUAL = 5

# Bind to the compiler (might have to update path manually!)
sys.path.append(COMPILER_PATH)
from hutn_compiler.compiler import main as do_compile


# Exceptions
class ModelverseException(Exception):
    """Base class of every exception raised by this wrapper."""


class UnknownError(ModelverseException):
    pass


class UnknownIdentifier(ModelverseException):
    pass


class UnknownType(ModelverseException):
    pass


class NotAnAssociation(ModelverseException):
    pass


class UnsupportedValue(ModelverseException):
    pass


class CompilationError(ModelverseException):
    pass


class NoSuchAttribute(ModelverseException):
    pass


class UnknownModel(ModelverseException):
    pass


class ConnectionError(ModelverseException):
    pass


class ModelExists(ModelverseException):
    pass


class ElementExists(ModelverseException):
    # Raised by _handle_output for an "Element exists: " reply; the class was
    # referenced there without being defined, which resulted in a NameError.
    pass


class PermissionDenied(ModelverseException):
    pass


class InvalidMode(ModelverseException):
    pass


class InterfaceMismatch(ModelverseException):
    pass


# Helper functions and configuration: do not use yourself!
taskname = None
address = None
last_output = None
mode = MODE_UNCONNECTED
prev_mode = None


def _require_mode(*allowed):
    """Raise InvalidMode unless the current `mode` is one of `allowed`."""
    if mode not in allowed:
        raise InvalidMode()


def _post(params):
    """POST the url-encoded `params` to `address` and return the reply body."""
    request = urllib2.Request(address, urllib.urlencode(params))
    return urllib2.urlopen(request).read()


def _input(value):
    """Send `value`, JSON-encoded, as input of the current task.

    Lists are sent under the "data" key, any other value under "value".
    """
    # Ugly json encoding of primitives
    key = "data" if isinstance(value, list) else "value"
    _post({"op": "set_input", key: json.dumps(value), "taskname": taskname})


def _input_raw(value, taskname):
    """Send `value` as-is (no JSON encoding) as input of the task `taskname`."""
    _post({"op": "set_input", "value": value, "taskname": taskname})


def _normalize_code(code):
    """Drop blank lines and strip the common leading tabs from `code`.

    The result always ends with a newline. Raises ValueError when `code`
    contains no non-blank line (min() of an empty list); every caller in this
    module turns that into a CompilationError.
    """
    lines = [line for line in code.split("\n") if line.strip() != ""]
    # NOTE(review): the original comment speaks of changing *multiple* spaces
    # to a tab, yet the literal replaced here is a single space, so every
    # space becomes a tab. Confirm the intended indent width.
    lines = [line.replace(" ", "\t") for line in lines]
    initial_tabs = min([len(line) - len(line.lstrip("\t")) for line in lines])
    lines = [line[initial_tabs:] for line in lines]
    lines.append("")
    return "\n".join(lines)


def _compile_AL(code):
    """Compile action language `code` and return the compiler's result."""
    # The compiler works on files, so go through a scratch file
    with open("__constraint.alc", "w") as f:
        f.write(_normalize_code(code))
    return do_compile("__constraint.alc", COMPILER_PATH + "/grammars/actionlanguage.g", "CS")


def _compile_model(code):
    """Compile model `code`; the result is the compiler output plus "exit"."""
    with open("__model.mvc", "w") as f:
        f.write(_normalize_code(code))
    return do_compile("__model.mvc", COMPILER_PATH + "/grammars/modelling.g", "M") + ["exit"]


def _compile_or_raise(compiler, code):
    """Run `compiler(code)`, converting any failure into CompilationError."""
    try:
        return compiler(code)
    except Exception as e:
        raise CompilationError(e)


def _output(expected=None):
    """Fetch and JSON-decode the next output of the current task.

    The value is remembered in `last_output`. Raises UnknownError when the
    request or the decoding fails, and InterfaceMismatch when `expected` is
    given and differs from the received value.
    """
    global last_output
    try:
        last_output = json.loads(_post({"op": "get_output", "taskname": taskname}))
    except Exception as e:
        # Not a bare except: KeyboardInterrupt/SystemExit must keep propagating
        raise UnknownError(e)
    if expected is not None and last_output != expected:
        raise InterfaceMismatch(_last_output(), expected)
    return last_output


def _last_output():
    """Return the value most recently fetched by _output()."""
    return last_output


def _error_detail(message):
    """Return the text after the first ": ", or `message` if there is none."""
    _, separator, detail = message.partition(": ")
    return detail if separator else message


# Raise common exceptions
def _handle_output(requested=None, split=None):
    """Fetch the next output and translate well-known error replies.

    When the reply starts with `requested` it is returned: unchanged if
    `split` is None, otherwise the right-stripped part after the first
    occurrence of `split` ("" when `split` does not occur). Replies with a
    known error prefix raise the matching exception; anything else raises
    InterfaceMismatch.
    """
    value = _output()
    if value.startswith("Model exists: "):
        raise ModelExists(_error_detail(value))
    elif value.startswith("Permission denied"):
        # This prefix has no ": ", so the detail part may be missing
        raise PermissionDenied(_error_detail(value))
    elif value.startswith("Model not found: "):
        raise UnknownModel(_error_detail(value))
    elif value.startswith("Element not found: "):
        raise UnknownIdentifier(_error_detail(value))
    elif value.startswith("Element exists: "):
        raise ElementExists(_error_detail(value))
    elif value.startswith("Attribute not found: "):
        raise NoSuchAttribute(_error_detail(value))
    elif requested is not None and value.startswith(requested):
        if split is None:
            return value
        parts = value.strip().split(split, 1)
        if len(parts) == 1:
            return ""
        return parts[1].rstrip()
    else:
        raise InterfaceMismatch(value)


def _parse_pairs(lines):
    """Turn "left:right" lines into a set of stripped (left, right) tuples."""
    pairs = set()
    for line in lines:
        left, right = line.split(":")
        pairs.add((left.strip(), right.strip()))
    return pairs


def _encode_model_dicts(input_models_dict, output_models_dict):
    """Flatten both dictionaries to key, value, ..., "" (one "" per dict)."""
    encoded = []
    for models in (input_models_dict, output_models_dict):
        for key, value in models.items():
            encoded += [key, value]
        encoded.append("")
    return encoded


def _run_dialog(callback):
    """Feed every output to `callback` until "Success" or "Failure" arrives.

    A non-None return value of `callback` is sent back as input. Returns True
    when the terminating message was "Success".
    """
    global mode
    # Everything we get is part of the dialog, except the termination string
    while _output() not in ["Success", "Failure"]:
        mode = MODE_DIALOG
        reply = callback(_last_output())
        mode = MODE_MODELLING
        if reply is not None:
            _input(reply)
    return _last_output() == "Success"


def _run_manual(callback):
    """Run `callback()` in MODE_MANUAL and leave the manual session again."""
    global mode
    # Skip over the begin of mini_modify
    for _ in range(4):
        _output()
    mode = MODE_MANUAL
    callback()
    # Finished, so leave
    _input("exit")
    mode = MODE_MODELLING


def _begin_modify(model_name):
    """Enter modify mode for `model_name`; InvalidMode if that did not work."""
    model_modify(model_name)
    _require_mode(MODE_MODIFY)


# Main MvC operations
def init(address_param="http://127.0.0.1:8001", timeout=20.0):
    """Starts up the connection to the Modelverse.

    Registers a random task name with the "task_manager" task at
    `address_param`, retrying every 0.1s. Raises ConnectionError once
    `timeout` seconds have passed without a successful request.
    """
    global mode
    global address
    global taskname
    address = address_param
    start_time = time.time()
    taskname = random.random()
    while True:
        try:
            _input_raw('"%s"' % taskname, "task_manager")
            mode = MODE_UNAUTHORIZED
            break
        except URLError as e:
            if time.time() - start_time > timeout:
                raise ConnectionError(e.reason)
            time.sleep(0.1)


def login(username, password):
    """Log in a user, if user doesn't exist, it is created.

    Raises InvalidMode outside MODE_UNAUTHORIZED, PermissionDenied on a wrong
    password and InterfaceMismatch on any unexpected reply.
    """
    global mode
    _require_mode(MODE_UNAUTHORIZED)
    _output("Log on as which user?")
    _input(username)
    prompt = _output()
    if prompt == "Password for existing user?":
        _input(password)
        reply = _output()
        if reply == "Wrong password!":
            raise PermissionDenied()
        if reply != "Welcome to the Model Management Interface v2.0!":
            raise InterfaceMismatch(reply)
    elif prompt == "This is a new user: please give password!":
        _input(password)
        _output("Please repeat the password")
        _input(password)
        reply = _output()
        if reply != "Passwords match!":
            # We just sent the same password twice, so even "Not the same
            # password!" means that the interface changed
            raise InterfaceMismatch(reply)
        _output("Welcome to the Model Management Interface v2.0!")
    else:
        raise InterfaceMismatch(prompt)
    _output("Use the 'help' command for a list of possible commands")
    _input("quiet")
    mode = MODE_MODELLING


def model_add(model_name, metamodel_name, model_code=None):
    """Instantiate a new model.

    `model_code`, if given, is compiled and uploaded as the model content.
    Raises CompilationError when compilation fails; server errors are raised
    by _handle_output.
    """
    _require_mode(MODE_MODELLING)
    # Do this before creating the model, as otherwise compilation errors would make us inconsistent
    if model_code is not None:
        compiled = _compile_or_raise(_compile_model, model_code)
    else:
        compiled = ["exit"]
    _input(["model_add", metamodel_name, model_name])
    _handle_output("Waiting for model constructors...")
    _input(compiled)
    _output("Success")


def model_delete(model_name):
    """Delete an existing model."""
    _require_mode(MODE_MODELLING)
    _input(["model_delete", model_name])
    _handle_output("Success")


def model_modify(model_name):
    """Modify an existing model.

    Returns True when the server reports "r/w" access and False otherwise.
    When called in MODE_MANUAL nothing is sent: only the mode is switched and
    None is returned.
    """
    global mode
    global prev_mode
    if mode == MODE_MANUAL:
        prev_mode = MODE_MANUAL
        mode = MODE_MODIFY
        return None
    _require_mode(MODE_MODELLING)
    prev_mode = MODE_MODELLING
    _input(["model_modify", model_name])
    _handle_output("Success")
    # Mode has changed
    mode = MODE_MODIFY
    _output("Model loaded, ready for commands!")
    write = "r/w" in _output()
    _output("Use 'help' command for a list of possible commands")
    return write


def model_list():
    """List all models as a set of (model, metamodel) tuples."""
    _require_mode(MODE_MODELLING)
    _input("model_list")
    output = _handle_output("Success: ", split=" ")
    if output == "":
        return set()
    return _parse_pairs(output.strip().split("\n"))


def model_list_full():
    """List full information on all models.

    Returns a set of (model, metamodel, owner, group, permissions) tuples.
    """
    _require_mode(MODE_MODELLING)
    _input("model_list_full")
    output = _handle_output("Success: ", split=" ")
    if output == "":
        return set()
    result = set()
    for line in output.strip().split("\n"):
        info, metamodel = line.split(":")
        # The part before the colon holds: permissions owner group name
        perm, own, grp, name = info.strip().split(" ")
        result.add((name, metamodel.strip(), own, grp, perm))
    return result


def verify(model):
    """Verify if a model conforms to its metamodel; returns the reply text."""
    _require_mode(MODE_MODELLING)
    _input(["verify", model])
    return _handle_output("Success: ", split=" ")


def model_overwrite(model_name, new_model=None):
    """Upload a new model and overwrite an existing model.

    In MODE_MODELLING the model `model_name` is overwritten; in MODE_MODIFY
    and MODE_MANUAL the "upload" command is sent instead. Raises
    CompilationError when `new_model` does not compile.
    """
    _require_mode(MODE_MODELLING, MODE_MODIFY, MODE_MANUAL)
    if new_model is not None:
        compiled = _compile_or_raise(_compile_model, new_model)
    else:
        compiled = ["exit"]
    if mode == MODE_MODELLING:
        _input(["model_overwrite", model_name])
    else:
        _input("upload")
    _handle_output("Waiting for model constructors...")
    _input(compiled)
    _output("Success")


def user_logout():
    """Log out the current user and break the connection."""
    global mode
    _require_mode(MODE_MODELLING)
    _input("exit")
    mode = MODE_UNCONNECTED


def user_delete():
    """Removes the current user and break the connection."""
    global mode
    _require_mode(MODE_MODELLING)
    _input("self-destruct")
    mode = MODE_UNCONNECTED


def model_render(model, mapper):
    """Fetch a rendered version of a model; returns the reply text."""
    _require_mode(MODE_MODELLING)
    _input(["model_render", model, mapper])
    return _handle_output("Success: ", split=" ")


def transformation_between(source, target):
    """Return the set of reply lines for "transformation_between"."""
    _require_mode(MODE_MODELLING)
    _input(["transformation_between", source, target])
    output = _handle_output("Success: ", split=" ")
    if output == "":
        return set()
    # The result used to be computed but never returned
    return set(output.split("\n"))


def transformation_add_MT_language(metamodels, RAMified_name):
    """Create a new Model Transformation language out of a set of metamodels."""
    _require_mode(MODE_MODELLING)
    _input(["transformation_add_MT_language"] + metamodels + ["", RAMified_name])
    _handle_output("Success")


def transformation_add_MT(RAMified_metamodel, source_metamodels, target_metamodels, operation_name, code):
    """Create a new model transformation."""
    _require_mode(MODE_MODELLING)
    compiled = _compile_or_raise(_compile_model, code)
    _input(["transformation_add_MT", RAMified_metamodel] + source_metamodels + [""] + target_metamodels + ["", operation_name])
    _handle_output("Waiting for model constructors...")
    _input(compiled)
    _handle_output("Success")


def transformation_add_AL(source_metamodels, target_metamodels, operation_name, code):
    """Create a new action language model, which can be executed."""
    _require_mode(MODE_MODELLING)
    compiled = _compile_or_raise(_compile_AL, code)
    _input(["transformation_add_AL"] + source_metamodels + [""] + target_metamodels + [""] + [operation_name])
    _handle_output("Waiting for code constructors...")
    _input(compiled)
    _output("Success")


def transformation_add_MANUAL(source_metamodels, target_metamodels, operation_name):
    """Create a new manual model operation."""
    _require_mode(MODE_MODELLING)
    _input(["transformation_add_MANUAL"] + source_metamodels + [""] + target_metamodels + [""] + [operation_name])
    _handle_output("Success")


def transformation_execute_AL(operation_name, input_models_dict, output_models_dict, callback=lambda i: None):
    """Execute an existing action language operation.

    `callback` receives every dialog output; a non-None return value is sent
    back as input. Returns True on "Success" and False on "Failure".
    """
    _require_mode(MODE_MODELLING)
    _input(["transformation_execute", operation_name] + _encode_model_dicts(input_models_dict, output_models_dict))
    _handle_output("Success: ready for AL execution")
    return _run_dialog(callback)


def transformation_execute_MANUAL(operation_name, input_models_dict, output_models_dict, callback=lambda *args: None):
    """Execute an existing manual operation.

    `callback` is called without arguments while the wrapper is in
    MODE_MANUAL. Returns True when the server answers "Success" afterwards.
    """
    # The default callback accepts any arguments: the previous default
    # required one argument but was invoked with none.
    _require_mode(MODE_MODELLING)
    _input(["transformation_execute", operation_name] + _encode_model_dicts(input_models_dict, output_models_dict))
    _handle_output("Success: ready for MANUAL execution")
    _run_manual(callback)
    # Got termination message, so we are done!
    return _output() == "Success"


def transformation_execute_MT(operation_name, input_models_dict, output_models_dict, callback=lambda i: None):
    """Execute an existing model transformation.

    `callback` receives every dialog output; a non-None return value is sent
    back as input. Returns True on "Success" and False on "Failure".
    """
    _require_mode(MODE_MODELLING)
    _input(["transformation_execute", operation_name] + _encode_model_dicts(input_models_dict, output_models_dict))
    _handle_output("Success: ready for MT execution")
    return _run_dialog(callback)


def transformation_list():
    """List existing model operations as a set of two-element tuples."""
    _require_mode(MODE_MODELLING)
    _input("transformation_list")
    output = _handle_output("Success: ", split=" ")
    if output == "":
        return set()
    result = set()
    for line in output.strip().split("\n"):
        first, second = line.strip().split(" ", 1)
        # The first field is wrapped in one delimiter character on each side
        first = first[1:-1].strip()
        second = second.strip().split(":")[0].strip()
        result.add((first, second))
    return result


def transformation_detail():
    """List full details of a model operation."""
    raise NotImplementedError()


def transformation_RAMify(metamodel_name, RAMified_metamodel_name):
    """Ramify an existing metamodel."""
    _require_mode(MODE_MODELLING)
    _input(["transformation_RAMify", metamodel_name, RAMified_metamodel_name])
    _handle_output("Success")


def process_execute(process_name, prefix, callbacks):
    """Execute a process model.

    `callbacks` maps activity names to callables; activities without an entry
    are not interacted with.
    """
    global mode
    _require_mode(MODE_MODELLING)
    _input(["process_execute", process_name, prefix])
    _handle_output("Success")
    while _output() != "Success":
        output = _last_output()
        if not output.startswith("Enacting "):
            continue
        # Next activity! Message is split as "Enacting <type>: <name>"
        activity_type = output.split(" ", 1)[1].split(":", 1)[0]
        name = output.split(": ", 1)[1]
        if name not in callbacks:
            continue
        callback = callbacks[name]
        if activity_type == "ModelTransformation" or activity_type == "ActionLanguage":
            while not (_output().startswith("Enacting ") or _last_output() == "Success"):
                mode = MODE_DIALOG
                reply = callback(_last_output())
                mode = MODE_MODELLING
                if reply is not None:
                    _input(reply)
        elif activity_type == "ManualOperation":
            _run_manual(callback)


def permission_modify():
    """Modify permissions of a model."""
    raise NotImplementedError()


def permission_owner():
    """Modify the owning user of a model."""
    raise NotImplementedError()


def permission_group():
    """Modify the owning group of a model."""
    raise NotImplementedError()


def group_create():
    """Create a new group."""
    raise NotImplementedError()


def group_delete():
    """Delete a group of which you are an owner."""
    raise NotImplementedError()


def group_owner_add():
    """Add a new owning user to a group you own."""
    raise NotImplementedError()


def group_owner_delete():
    """Delete an owning user to a group you own."""
    raise NotImplementedError()


def group_join():
    """Add a new user to a group you own."""
    raise NotImplementedError()


def group_kick():
    """Delete a user from a group you own."""
    raise NotImplementedError()


def group_list():
    """List existing groups."""
    raise NotImplementedError()


def admin_promote():
    """Promote a user to admin status."""
    raise NotImplementedError()


def admin_demote():
    """Demote a user from admin status."""
    raise NotImplementedError()


# Actual operations on the model
def element_list(model_name):
    """Return a set of (ID, type) tuples for all elements of the model."""
    _begin_modify(model_name)
    try:
        _input("list_full")
        output = _handle_output("Success: ", split=" ")
        if output == "":
            return set()
        return _parse_pairs(output.split("\n"))
    finally:
        model_exit()


def types(model_name):
    """Return the set of all types usable in the model."""
    _begin_modify(model_name)
    try:
        _input("types")
        output = _handle_output("Success: ", split=" ")
        if output == "":
            return set()
        return set([name for name, _ in _parse_pairs(output.split("\n"))])
    finally:
        model_exit()


def types_full(model_name):
    """Return a set of (type, type of type) tuples usable in the model."""
    _begin_modify(model_name)
    try:
        _input("types")
        output = _handle_output("Success: ", split=" ")
        if output == "":
            return set()
        return _parse_pairs(output.split("\n"))
    finally:
        model_exit()


def read(model_name, ID):
    """Return a tuple of information on the element.

    The result is (type, None), or (type, (source, target)) when the first
    reply line starts with "Source:".
    """
    _begin_modify(model_name)
    try:
        _input(["read", ID])
        output = _handle_output("Success: ", split=" ")
        v = output.split("\n")
        # NOTE(review): the type and the target are both read from line 1 of
        # the reply; confirm this against the server's reply layout.
        t = v[1].split(":")[1].strip()
        if not v[0].startswith("Source:"):
            return (t, None)
        src = v[0].split(":")[1].strip()
        trg = v[1].split(":")[1].strip()
        return (t, (src, trg))
    finally:
        model_exit()


def read_attrs(model_name, ID):
    """Return a dictionary of attribute value pairs of the element."""
    literals = {"None": None, "True": True, "False": False}
    _begin_modify(model_name)
    try:
        _input(["read", ID])
        output = _handle_output("Success: ", split=" ")
        lines = iter(output.split("\n"))
        # Skip everything up to and including the "Attributes:" header
        for line in lines:
            if line == "Attributes:":
                break
        attributes = {}
        for line in lines:
            key, rest = line.split(":", 1)
            _, value = rest.split("=", 1)
            key = json.loads(key.strip())
            value = value.strip()
            if value in literals:
                value = literals[value]
            else:
                value = json.loads(value)
            attributes[key] = value
        return attributes
    finally:
        model_exit()


def instantiate(model_name, typename, edge=None, ID=""):
    """Create a new instance of the specified typename.

    When `edge` is given, an edge from edge[0] to edge[1] is created,
    otherwise a node. `ID` is the requested identifier (if any). Returns the
    text following "Success: " in the reply.
    """
    _begin_modify(model_name)
    try:
        if edge is None:
            _input(["instantiate_node", typename, ID])
        else:
            _input(["instantiate_edge", typename, ID, edge[0], edge[1]])
        return _handle_output("Success: ", split=" ")
    finally:
        model_exit()


def delete_element(model_name, ID):
    """Delete the element with the given ID."""
    _begin_modify(model_name)
    try:
        _input(["delete", ID])
        _handle_output("Success")
    finally:
        model_exit()


def attr_assign(model_name, ID, attr, value):
    """Assign a value to an attribute."""
    _begin_modify(model_name)
    try:
        _input(["attr_add", ID, attr, value])
        _handle_output("Success")
    finally:
        model_exit()


def attr_assign_code(model_name, ID, attr, code):
    """Assign a piece of Action Language code to the attribute.

    Raises CompilationError when `code` does not compile; in that case modify
    mode is never entered.
    """
    compiled = _compile_or_raise(_compile_AL, code)
    _begin_modify(model_name)
    try:
        _input(["attr_add", ID, attr])
        _handle_output("Waiting for code constructors...")
        _input(compiled)
        _output("Success")
    finally:
        model_exit()


def attr_delete(model_name, ID, attr):
    """Remove an attribute."""
    _begin_modify(model_name)
    try:
        _input(["attr_del", ID, attr])
        _handle_output("Success")
    finally:
        model_exit()


def read_outgoing(model_name, ID, typename):
    """Returns the set of all outgoing associations of a specific type ("" = all)."""
    _begin_modify(model_name)
    try:
        _input(["read_outgoing", ID, typename])
        output = _handle_output("Success: ", split=" ")
        if output == "":
            return set()
        return set(output.split("\n"))
    finally:
        model_exit()


def read_incoming(model_name, ID, typename):
    """Returns the set of all incoming associations of a specific type ("" = all)."""
    _begin_modify(model_name)
    try:
        _input(["read_incoming", ID, typename])
        output = _handle_output("Success: ", split=" ")
        if output == "":
            return set()
        return set(output.split("\n"))
    finally:
        model_exit()


def read_association_source(model_name, ID):
    """Returns the source of an association."""
    _begin_modify(model_name)
    try:
        _input(["read_association_source", ID])
        return _handle_output("Success: ", split=" ")
    finally:
        model_exit()


def read_association_destination(model_name, ID):
    """Returns the destination of an association."""
    _begin_modify(model_name)
    try:
        _input(["read_association_destination", ID])
        return _handle_output("Success: ", split=" ")
    finally:
        model_exit()


def model_exit():
    """Leave model modify mode.

    When modify mode was entered from MODE_MANUAL nothing is sent and the
    mode is simply switched back to MODE_MANUAL.
    """
    global mode
    if prev_mode == MODE_MANUAL:
        mode = MODE_MANUAL
        return
    _require_mode(MODE_MODIFY)
    _input("exit")
    _output("Success")
    mode = MODE_MODELLING