123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954 |
- import urllib
- import urllib2
- import json
- import random
- from urllib2 import URLError
- import sys
- import time
- COMPILER_PATH = "interface/HUTN"
- MODE_UNCONNECTED = 0
- MODE_UNAUTHORIZED = 1
- MODE_MODELLING = 2
- MODE_MODIFY = 3
- MODE_DIALOG = 4
- MODE_MANUAL = 5
- # Bind to the compiler (might have to update path manually!)
- sys.path.append(COMPILER_PATH)
- from hutn_compiler.compiler import main as do_compile
- # Exceptions
- class ModelverseException(Exception):
- pass
- class UnknownError(ModelverseException):
- pass
- class UnknownIdentifier(ModelverseException):
- pass
- class UnknownType(ModelverseException):
- pass
- class NotAnAssociation(ModelverseException):
- pass
- class UnsupportedValue(ModelverseException):
- pass
- class CompilationError(ModelverseException):
- pass
- class NoSuchAttribute(ModelverseException):
- pass
- class UnknownModel(ModelverseException):
- pass
- class ConnectionError(ModelverseException):
- pass
- class ModelExists(ModelverseException):
- pass
- class PermissionDenied(ModelverseException):
- pass
- class InvalidMode(ModelverseException):
- pass
- class InterfaceMismatch(ModelverseException):
- pass
- # Helper functions and configuration: do not use yourself!
- taskname = random.random()
- address = None
- last_output = None
- mode = MODE_UNCONNECTED
- prev_mode = None
- def _input(value):
- # Ugly json encoding of primitives
- if isinstance(value, type([])):
- value = json.dumps(value)
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "data": value, "taskname": taskname}))).read()
- else:
- value = json.dumps(value)
- #print("Set input: " + str(value))
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": value, "taskname": taskname}))).read()
- def _input_raw(value, taskname):
- # Ugly json encoding of primitives
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": value, "taskname": taskname}))).read()
- def _compile_AL(code):
- # Compile an action language file and send the compiled code
- code_fragments = code.split("\n")
- code_fragments = [i for i in code_fragments if i.strip() != ""]
- code_fragments = [i.replace(" ", "\t") for i in code_fragments]
- initial_tabs = min([len(i) - len(i.lstrip("\t")) for i in code_fragments])
- code_fragments = [i[initial_tabs:] for i in code_fragments]
- code_fragments.append("")
- code = "\n".join(code_fragments)
- with open("__constraint.alc", "w") as f:
- f.write(code)
- f.flush()
- return do_compile("__constraint.alc", COMPILER_PATH + "/grammars/actionlanguage.g", "CS")
- def _compile_model(code):
- # Compile a model and send the compiled graph
- # First change multiple spaces to a tab
- code_fragments = code.split("\n")
- code_fragments = [i for i in code_fragments if i.strip() != ""]
- code_fragments = [i.replace(" ", "\t") for i in code_fragments]
- initial_tabs = min([len(i) - len(i.lstrip("\t")) for i in code_fragments])
- code_fragments = [i[initial_tabs:] for i in code_fragments]
- code_fragments.append("")
- code = "\n".join(code_fragments)
- with open("__model.mvc", "w") as f:
- f.write(code)
- f.flush()
- return do_compile("__model.mvc", COMPILER_PATH + "/grammars/modelling.g", "M") + ["exit"]
- def _output(expected=None):
- try:
- global last_output
- last_output = json.loads(urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "get_output", "taskname": taskname}))).read())
- except:
- raise UnknownError()
- if expected is not None and last_output != expected:
- raise InterfaceMismatch(_last_output(), expected)
- return last_output
- def _last_output():
- return last_output
- # Raise common exceptions
- def _handle_output(requested=None, split=None):
- value = _output()
- if value.startswith("Model exists: "):
- raise ModelExists(value.split(": ", 1)[1])
- elif value.startswith("Permission denied"):
- raise PermissionDenied(value.split(": ", 1)[1])
- elif value.startswith("Model not found: "):
- raise UnknownModel(value.split(": ", 1)[1])
- elif value.startswith("Element not found: "):
- raise UnknownIdentifier(value.split(": ", 1)[1])
- elif value.startswith("Element exists: "):
- raise ElementExists(value.split(": ", 1)[1])
- elif value.startswith("Attribute not found: "):
- raise NoSuchAttribute(value.split(": ", 1)[1])
- elif requested is not None and value.startswith(requested):
- if split is None:
- return value
- else:
- splitted = value.strip().split(split, 1)
- if len(splitted) == 1:
- return ""
- else:
- return splitted[1].rstrip()
- else:
- raise InterfaceMismatch(value)
-
- # Main MvC operations
- def init(address_param="http://127.0.0.1:8001", timeout=10.0):
- """Starts up the connection to the Modelverse."""
- # return None
- # raises ConnectionError
- # raises UnknownError
- # raises InvalidMode
- global mode
- if mode != MODE_UNCONNECTED:
- raise InvalidMode()
- global address
- address = address_param
- start_time = time.time()
- while 1:
- try:
- _input_raw('"%s"' % taskname, "task_manager")
- mode = MODE_UNAUTHORIZED
- break
- except URLError as e:
- if time.time() - start_time > timeout:
- raise ConnectionError(e.reason)
- else:
- time.sleep(0.1)
- def login(username, password):
- """Log in a user, if user doesn't exist, it is created."""
- # return None
- # raises UnknownError
- # raises PermissionDenied
- # raises InterfaceMismatch
- global mode
- if mode != MODE_UNAUTHORIZED:
- raise InvalidMode()
- _output("Log on as which user?")
- _input(username)
- if _output() == "Password for existing user?":
- _input(password)
- if _output() == "Welcome to the Model Management Interface v2.0!":
- _output("Use the 'help' command for a list of possible commands")
- _input("quiet")
- mode = MODE_MODELLING
- elif _last_output() == "Wrong password!":
- raise PermissionDenied()
- else:
- raise InterfaceMismatch(_last_output())
- elif _last_output() == "This is a new user: please give password!":
- _input(password)
- _output("Please repeat the password")
- _input(password)
- if _output() == "Passwords match!":
- _output("Welcome to the Model Management Interface v2.0!")
- _output("Use the 'help' command for a list of possible commands")
- _input("quiet")
- mode = MODE_MODELLING
- elif _last_output() == "Not the same password!":
- # We just sent the same password, so it should be identical, unless the interface changed
- raise InterfaceMismatch(_last_output())
- else:
- raise InterfaceMismatch(_last_output())
- else:
- raise InterfaceMismatch(_last_output())
- def model_add(model_name, metamodel_name, model_code=None):
- """Instantiate a new model."""
- # return None
- # raises UnknownModel
- # raises ModelExists
- # raises UnknownError
- # raises PermissionDenied
- # raises CompilationError
- if mode != MODE_MODELLING:
- raise InvalidMode()
- # Do this before creating the model, as otherwise compilation errors would make us inconsistent
- if model_code is not None:
- try:
- compiled = _compile_model(model_code)
- except:
- raise CompilationError()
- else:
- compiled = ["exit"]
- _input(["model_add", metamodel_name, model_name])
- _handle_output("Waiting for model constructors...")
- _input(compiled)
- _output("Success")
- def model_modify(model_name):
- """Modify an existing model."""
- # return is_write
- # raises UnknownModel
- # raises PermissionDenied
- # raises UnknownError
- global mode
- global prev_mode
- if mode == MODE_MANUAL:
- prev_mode = MODE_MANUAL
- mode = MODE_MODIFY
- return None
- if mode != MODE_MODELLING:
- raise InvalidMode()
- prev_mode = MODE_MODELLING
- _input(["model_modify", model_name])
- _handle_output("Success")
- # Mode has changed
- mode = MODE_MODIFY
- _output("Model loaded, ready for commands!")
- if ("r/w" in _output()):
- write = True
- else:
- write = False
- _output("Use 'help' command for a list of possible commands")
- return write
- def model_list():
- """List all models."""
- # return [(model1, metamodel1), (model2, metamodel2), ...]
- # raises UnknownError
- if mode != MODE_MODELLING:
- raise InvalidMode()
- _input("model_list")
- output = _handle_output("Success: ", split=" ")
- if output == "":
- return set([])
- lst = set([])
- value = output.strip().split("\n")
- for v in value:
- m, mm = v.split(":")
- m = m.strip()
- mm = mm.strip()
- lst.add((m, mm))
- return lst
- def model_list_full():
- """List full information on all models."""
- # return [(model1, metamodel1, owner1, group1, permissions1), (model2, metamodel2, owner2, group2, permissions2), ...]
- # raises UnknownError
- if mode != MODE_MODELLING:
- raise InvalidMode()
- _input("model_list_full")
- output = _handle_output("Success: ", split=" ")
- if output == "":
- return set([])
- lst = set([])
- value = output.strip().split("\n")
- for v in value:
- print("READ " + v)
- m, mm = v.split(":")
- m = m.strip()
- mm = mm.strip()
- perm, own, grp, m = m.split(" ")
- lst.add((m, mm, own, grp, perm))
- return lst
- def verify(model):
- """Verify if a model conforms to its metamodel."""
- # return "verification_result"
- # raises UnknownError
- # raises UnknownModel
- if mode != MODE_MODELLING:
- raise InvalidMode()
- _input(["verify", model])
- return _handle_output("Success: ", split=" ")
- def model_overwrite(model_name, new_model=None):
- """Upload a new model and overwrite an existing model."""
- # return None
- # raises UnknownModel
- # raises PermissionDenied
- # raises CompilationError
- # raises UnknownError
- if mode != MODE_MODELLING:
- raise InvalidMode()
- if new_model is not None:
- try:
- compiled = _compile_model(new_model)
- except Exception as e:
- raise CompilationError(e)
- else:
- compiled = ["exit"]
- _input(["model_overwrite", model_name])
- _handle_output("Waiting for model constructors...")
- _input(compiled)
- _output("Success")
- def user_logout():
- """Log out the current user and break the connection."""
- # return None
- # raises UnknownError
- global mode
- if mode != MODE_MODELLING:
- raise InvalidMode()
- _input("exit")
- mode = MODE_UNCONNECTED
- def user_delete():
- """Removes the current user and break the connection."""
- # return None
- # raises UnknownError
- global mode
- if mode != MODE_MODELLING:
- raise InvalidMode()
- _input("self-destruct")
- mode = MODE_UNCONNECTED
- def model_render(model, mapper):
- """Fetch a rendered verion of a model."""
- # return JSON_representation
- # raises UnknownError
- # raises UnknownIdentifier
- # raises InterfaceMismatch
- global mode
- if mode != MODE_MODELLING:
- raise InvalidMode()
- _input(["model_render", model, mapper])
- return _handle_output("Success: ", split=" ")
- def transformation_between(source, target):
- global mode
- if mode != MODE_MODELLING:
- raise InvalidMode()
- _input(["transformation_between", source, target])
- output = _handle_output("Success: ", split=" ")
- if output == "":
- return set([])
- lst = set([v for v in output.split("\n")])
- def transformation_add_MT_language(metamodels, RAMified_name):
- """Create a new Model Transformation language out of a set of metamodels."""
- global mode
- if mode != MODE_MODELLING:
- raise InvalidMode()
- _input(["transformation_add_MT_language"] + metamodels + ["", RAMified_name])
- _handle_output("Success")
- def transformation_add_MT(RAMified_metamodel, source_metamodels, target_metamodels, operation_name, code):
- """Create a new model transformation."""
- global mode
- if mode != MODE_MODELLING:
- raise InvalidMode()
- try:
- compiled = _compile_model(code)
- except Exception as e:
- raise CompilationError(e)
- _input(["transformation_add_MT", RAMified_metamodel] + source_metamodels + [""] + target_metamodels + ["", operation_name])
- _handle_output("Waiting for model constructors...")
- _input(compiled)
- _handle_output("Success")
- def transformation_add_AL(source_metamodels, target_metamodels, operation_name, code):
- """Create a new action language model, which can be executed."""
- global mode
- if mode != MODE_MODELLING:
- raise InvalidMode()
- try:
- compiled = _compile_AL(code)
- except Exception as e:
- raise CompilationError(e)
- _input(["transformation_add_AL"] + source_metamodels + [""] + target_metamodels + [""] + [operation_name])
- _handle_output("Waiting for code constructors...")
- _input(compiled)
- _output("Success")
- def transformation_add_MANUAL(source_metamodels, target_metamodels, operation_name):
- """Create a new manual model operation."""
- global mode
- if mode != MODE_MODELLING:
- raise InvalidMode()
- _input(["transformation_add_MANUAL"] + source_metamodels + [""] + target_metamodels + [""] + [operation_name])
- _handle_output("Success")
- def transformation_execute_AL(operation_name, input_models_dict, output_models_dict, callback=lambda i: None):
- """Execute an existing model operation."""
- global mode
- if mode != MODE_MODELLING:
- raise InvalidMode()
- mv_dict_rep = []
- for key, value in input_models_dict.items():
- mv_dict_rep += [key, value]
- mv_dict_rep += [""]
- for key, value in output_models_dict.items():
- mv_dict_rep += [key, value]
- mv_dict_rep += [""]
- _input(["transformation_execute", operation_name] + mv_dict_rep)
- _handle_output("Success: ready for AL execution")
- # We are now executing, so everything we get is part of the dialog, except if it is the string for transformation termination
- while _output() not in ["Success", "Failure"]:
- mode = MODE_DIALOG
- reply = callback(_last_output())
- mode = MODE_MODELLING
- if reply is not None:
- _input(reply)
- # Got termination message, so we are done!
- if _last_output() == "Success":
- return True
- else:
- return False
- def transformation_execute_MANUAL(operation_name, input_models_dict, output_models_dict, callback=lambda i: None):
- """Execute an existing model operation."""
- global mode
- if mode != MODE_MODELLING:
- raise InvalidMode()
- mv_dict_rep = []
- for key, value in input_models_dict.items():
- mv_dict_rep += [key, value]
- mv_dict_rep += [""]
- for key, value in output_models_dict.items():
- mv_dict_rep += [key, value]
- mv_dict_rep += [""]
- _input(["transformation_execute", operation_name] + mv_dict_rep)
- _handle_output("Success: ready for MANUAL execution")
- # Skip over the begin of mini_modify
- _output()
- _output()
- _output()
- _output()
- # We are now executing, so everything we get is part of the dialog, except if it is the string for transformation termination
- mode = MODE_MANUAL
- callback()
- # Finished, so leave
- _input("exit")
- mode = MODE_MODELLING
- # Got termination message, so we are done!
- if _output() == "Success":
- return True
- else:
- return False
- def transformation_execute_MT(operation_name, input_models_dict, output_models_dict, callback=lambda i: None):
- """Execute an existing model operation."""
- global mode
- if mode != MODE_MODELLING:
- raise InvalidMode()
- mv_dict_rep = []
- for key, value in input_models_dict.items():
- mv_dict_rep += [key, value]
- mv_dict_rep += [""]
- for key, value in output_models_dict.items():
- mv_dict_rep += [key, value]
- mv_dict_rep += [""]
- _input(["transformation_execute", operation_name] + mv_dict_rep)
- _handle_output("Success: ready for MT execution")
- # We are now executing, so everything we get is part of the dialog, except if it is the string for transformation termination
- while _output() not in ["Success", "Failure"]:
- reply = callback(_last_output())
- if reply is not None:
- _input(reply)
- # Got termination message, so we are done!
- if _last_output() == "Success":
- return True
- else:
- return False
- def transformation_list():
- """List existing model operations."""
- global mode
- if mode != MODE_MODELLING:
- raise InvalidMode()
- _input("transformation_list")
- output = _handle_output("Success: ", split=" ")
- if output == "":
- return set([])
- lst = set([])
- value = output.strip().split("\n")
- for v in value:
- t, m = v.strip().split(" ", 1)
- t = t[1:-1].strip()
- m = m.strip().split(":")[0].strip()
- lst.add((t, m))
- return lst
- def transformation_detail():
- """List full details of a a model operation."""
- raise NotImplementedError()
- def transformation_RAMify(metamodel_name, RAMified_metamodel_name):
- """Ramify an existing metamodel."""
- global mode
- if mode != MODE_MODELLING:
- raise InvalidMode()
- _input(["transformation_RAMify", metamodel_name, RAMified_metamodel_name])
- _handle_output("Success")
- def process_execute():
- """Execute a process model."""
- raise NotImplementedError()
- def permission_modify():
- """Modify permissions of a model."""
- raise NotImplementedError()
- def permission_owner():
- """Modify the owning user of a model."""
- raise NotImplementedError()
- def permission_group():
- """Modify the owning group of a model."""
- raise NotImplementedError()
- def group_create():
- """Create a new group."""
- raise NotImplementedError()
- def group_delete():
- """Delete a group of which you are an owner."""
- raise NotImplementedError()
- def group_owner_add():
- """Add a new owning user to a group you own."""
- raise NotImplementedError()
- def group_owner_delete():
- """Delete an owning user to a group you own."""
- raise NotImplementedError()
- def group_join():
- """Add a new user to a group you own."""
- raise NotImplementedError()
- def group_kick():
- """Delete a user from a group you own."""
- raise NotImplementedError()
- def group_list():
- """List existing groups."""
- raise NotImplementedError()
- def admin_promote():
- """Promote a user to admin status."""
- raise NotImplementedError()
- def admin_demote():
- """Demote a user from admin status."""
- raise NotImplementedError()
- # Actual operations on the model
- def element_list(model_name):
- """Return a list of all IDs and the type of the element"""
- # return [(name1, type1), (name2, type2), ...]
- # raises UnknownError
- model_modify(model_name)
- if mode != MODE_MODIFY:
- raise InvalidMode()
- try:
- _input("list_full")
- lst = set([])
- output = _handle_output("Success: ", split=" ")
- if output == "":
- return set([])
- for v in output.split("\n"):
- m, mm = v.split(":")
- m = m.strip()
- mm = mm.strip()
- lst.add((m, mm))
- return lst
- finally:
- model_exit()
- def types(model_name):
- """Return a list of all types usable in the model"""
- # return [type1, type2, ...]
- # raises UnknownError
- model_modify(model_name)
- if mode != MODE_MODIFY:
- raise InvalidMode()
- try:
- _input("types")
- lst = set([])
- output = _handle_output("Success: ", split=" ")
- if output == "":
- return set([])
- for v in output.split("\n"):
- m, mm = v.split(":")
- m = m.strip()
- lst.add(m)
- return lst
- finally:
- model_exit()
- def types_full(model_name):
- """Return a list of full types usable in the model"""
- # return [(type1, typetype1), (type2, typetype2), ...]
- # raises UnknownError
- model_modify(model_name)
- if mode != MODE_MODIFY:
- raise InvalidMode()
- try:
- _input("types")
- lst = set([])
- output = _handle_output("Success: ", split=" ")
- if output == "":
- return set([])
- for v in output.split("\n"):
- m, mm = v.split(":")
- m = m.strip()
- mm = mm.strip()
- lst.add((m, mm))
- return lst
- finally:
- model_exit()
- def read(model_name, ID):
- """Return a tuple of information on the element: its type and source/target (None if not an edge)"""
- # return (type, (source, target))
- # raises UnknownError
- # raises UnknownIdentifier
- model_modify(model_name)
- if mode != MODE_MODIFY:
- raise InvalidMode()
- try:
- _input(["read", ID])
- output = _handle_output("Success: ", split=" ")
- v = output.split("\n")
- t = v[1].split(":")[1].strip()
- if (not v[0].startswith("Source:")):
- rval = (t, None)
- else:
- src = v[0].split(":")[1].strip()
- trg = v[1].split(":")[1].strip()
- rval = (t, (src, trg))
- return rval
- finally:
- model_exit()
- def read_attrs(model_name, ID):
- """Return a dictionary of attribute value pairs"""
- # return {attr1: value1, attr2: value2, ...}
- # raises UnknownError
- # raises UnknownIdentifier
- model_modify(model_name)
- if mode != MODE_MODIFY:
- raise InvalidMode()
- try:
- _input(["read", ID])
- output = _handle_output("Success: ", split=" ")
- v = output.split("\n")
- searching = True
- rval = {}
- for r in v:
- if searching:
- if r == "Attributes:":
- # Start working on attributes
- searching = False
- else:
- key, value = r.split(":", 1)
- _, value = value.split("=", 1)
- key = json.loads(key.strip())
- value = value.strip()
- if value == "None":
- value = None
- elif value == "True":
- value = True
- elif value == "False":
- value = False
- else:
- value = json.loads(value)
- rval[key] = value
- return rval
- finally:
- model_exit()
- def instantiate(model_name, typename, edge=None, ID=""):
- """Create a new instance of the specified typename, between the selected elements (if not None), and with the provided ID (if any)"""
- # return instantiated_ID
- # raises UnknownError
- # raises UnknownType
- # raises UnknownIdentifier
- # raises NotAnEdge
- model_modify(model_name)
- if mode != MODE_MODIFY:
- raise InvalidMode()
- try:
- if edge is None:
- _input(["instantiate_node", typename, ID])
- else:
- _input(["instantiate_edge", typename, ID, edge[0], edge[1]])
- return _handle_output("Success: ", split=" ")
- finally:
- model_exit()
- def delete_element(model_name, ID):
- """Delete the element with the given ID"""
- # return None
- # raises UnknownError
- # raises UnknownIdentifier
- model_modify(model_name)
- if mode != MODE_MODIFY:
- raise InvalidMode()
- try:
- _input(["delete", ID])
- _handle_output("Success")
- finally:
- model_exit()
- def attr_assign(model_name, ID, attr, value):
- """Assign a value to an attribute"""
- # return None
- # raises UnknownError
- # raises UnknownIdentifier
- # raises NoSuchAttribute
- # raises UnsupportedValue
- model_modify(model_name)
- if mode != MODE_MODIFY:
- raise InvalidMode()
- try:
- _input(["attr_add", ID, attr, value])
- _handle_output("Success")
- finally:
- model_exit()
- def attr_assign_code(model_name, ID, attr, code):
- """Assign a piece of Action Language code to the attribute"""
- # return None
- # raises UnknownError
- # raises UnknownIdentifier
- # raises NoSuchAttribute
- # raises UnsupportedValue
- try:
- compiled = _compile_AL(code)
- except Exception as e:
- raise CompilationError(e)
- model_modify(model_name)
- if mode != MODE_MODIFY:
- raise InvalidMode()
- try:
- _input(["attr_add", ID, attr])
- _handle_output("Waiting for code constructors...")
- _input(compiled)
- _output("Success")
- finally:
- model_exit()
- def attr_delete(model_name, ID, attr):
- """Remove an attribute."""
- model_modify(model_name)
- if mode != MODE_MODIFY:
- raise InvalidMode()
- try:
- _input(["attr_del", ID, attr])
- _handle_output("Success")
- finally:
- model_exit()
- def read_outgoing(model_name, ID, typename):
- """Returns a list of all outgoing associations of a specific type ("" = all)"""
- # return [name1, name2, ...]
- # raises UnknownError
- # raises UnknownIdentifier
- model_modify(model_name)
- if mode != MODE_MODIFY:
- raise InvalidMode()
- try:
- _input(["read_outgoing", ID, typename])
- output = _handle_output("Success: ", split=" ")
- if output == "":
- return set([])
- else:
- return set(output.split("\n"))
- finally:
- model_exit()
- def read_incoming(model_name, ID, typename):
- """Returns a list of all incoming associations of a specific type ("" = all)"""
- # return [name1, name2, ...]
- # raises UnknownError
- # raises UnknownIdentifier
- # raises UnknownType
- model_modify(model_name)
- if mode != MODE_MODIFY:
- raise InvalidMode()
- try:
- _input(["read_incoming", ID, typename])
- output = _handle_output("Success: ", split=" ")
- if output == "":
- return set([])
- else:
- return set(output.split("\n"))
- finally:
- model_exit()
- def read_association_source(model_name, ID):
- """Returns the source of an association."""
- # returns name
- # raises UnknownError
- # raises UnknownIdentifier
- # raises NotAnAssociation
- model_modify(model_name)
- if mode != MODE_MODIFY:
- raise InvalidMode()
- try:
- _input(["read_association_source", ID])
- return _handle_output("Success: ", split=" ")
- finally:
- model_exit()
- def read_association_destination(model_name, ID):
- """Returns the destination of an association."""
- # returns name
- # raises UnknownError
- # raises UnknownIdentifier
- # raises NotAnAssociation
- model_modify(model_name)
- if mode != MODE_MODIFY:
- raise InvalidMode()
- try:
- _input(["read_association_destination", ID])
- return _handle_output("Success: ", split=" ")
- finally:
- model_exit()
- def model_exit():
- """Leave model modify mode."""
- # return None
- # raises UnknownError
- global mode
- global prev_mode
- if prev_mode == MODE_MANUAL:
- mode = MODE_MANUAL
- return
- if mode != MODE_MODIFY:
- raise InvalidMode()
- _input("exit")
- _output("Success")
- mode = MODE_MODELLING
|