# Client-side wrapper functions for talking to a Modelverse server over HTTP.
- import urllib
- import urllib2
- import json
- import random
- from urllib2 import URLError
- import sys
- import time
- import threading
# Directory of the HUTN compiler; appended to sys.path and used as the
# prefix for the grammar files passed to the compiler.
COMPILER_PATH = "interface/HUTN"

# Interaction modes stored in the module-level ``mode`` variable.
(
    MODE_UNCONNECTED,
    MODE_UNAUTHORIZED,
    MODE_MODELLING,
    MODE_MODIFY,
    MODE_DIALOG,
    MODE_MANUAL,
    MODE_SERVICE,
) = range(7)
- # Bind to the compiler (might have to update path manually!)
- sys.path.append(COMPILER_PATH)
- from hutn_compiler.compiler import main as do_compile
# Exceptions
class ModelverseException(Exception):
    """Base class of every exception defined by this module."""


class UnknownError(ModelverseException):
    """Reading or decoding a reply from the server failed."""


class UnknownIdentifier(ModelverseException):
    """The server replied "Element not found: ..."."""


class CompilationError(ModelverseException):
    """Compiling model or Action Language code failed."""


class NoSuchAttribute(ModelverseException):
    """The server replied "Attribute not found: ..."."""


class UnknownModel(ModelverseException):
    """The server replied "Model not found: ..."."""


class ConnectionError(ModelverseException):
    """No connection to the server could be made within the timeout."""


class ModelExists(ModelverseException):
    """The server replied "Model exists: ..."."""


class PermissionDenied(ModelverseException):
    """The server refused the operation or the password was wrong."""


class InvalidMode(ModelverseException):
    """The requested operation is not reachable from the current mode."""


class InterfaceMismatch(ModelverseException):
    """The server sent a reply this wrapper did not expect."""


class UnknownMetamodellingHierarchy(ModelverseException):
    """No metamodel is registered for the given model."""


# The three classes below are raised elsewhere in this module but were never
# defined, so reaching those code paths failed with a NameError instead.
class UnsupportedValue(ModelverseException):
    """A value is not one of the primitive types that can be sent."""


class UnfamiliarMetamodel(UnknownMetamodellingHierarchy):
    """A model was used whose metamodel has not been registered."""


class ElementExists(ModelverseException):
    """The server replied "Element exists: ..."."""
# Helper functions and configuration: do not use yourself!
# Connection state shared by the functions of this module.
taskname = address = last_output = None
prev_mode = current_model = None
mode = MODE_UNCONNECTED
# Maps a model name to the name of the metamodel registered for it.
registered_metamodels = {}
def _check_type(value):
    """Reject values that are not of a supported primitive type.

    Raises UnsupportedValue (carrying the value and its type) unless
    ``value`` is an int, long, float, str, unicode or bool.
    """
    primitive_types = (int, long, float, str, unicode, bool)
    if isinstance(value, primitive_types):
        return
    raise UnsupportedValue("%s : %s" % (value, str(type(value))))
- def _dict_to_list(python_dict):
- lst = []
- for k, v in python_dict.items():
- lst += [k, v]
- return lst
def _goto_mode(new_mode, model_name=None):
    """Switch to ``new_mode``, performing the transition if one is known.

    Supported automatic transitions:
    * manual -> modify: allowed only for the model currently open
      (or when no model name is given); nothing is sent to the server.
    * modelling -> modify: opens ``model_name`` with its registered metamodel.
    * modify -> modify on another model: leaves the current model first.
    * modify -> modelling: leaves the current model.
    * already in ``new_mode``: nothing to do.

    Raises:
        InvalidMode: no automatic transition exists, or another model than
            the open one was requested while in manual mode.
        UnknownMetamodellingHierarchy: ``model_name`` has no registered
            metamodel.
    """
    if mode == MODE_MANUAL and new_mode == MODE_MODIFY:
        if model_name is not None and current_model != model_name:
            raise InvalidMode("Mode error: cannot modify other models!")
        return
    elif mode == MODE_MODELLING and new_mode == MODE_MODIFY:
        # Are in root view, but want to modify a model
        if model_name not in registered_metamodels:
            # The original raised an undefined name here (NameError); use the
            # exception verify() raises for the same situation.
            raise UnknownMetamodellingHierarchy(model_name)
        _model_modify(model_name, registered_metamodels[model_name])
    elif (mode == MODE_MODIFY and new_mode == MODE_MODIFY
            and model_name is not None and current_model != model_name):
        # Are in modify mode, but want to modify a different model
        if model_name not in registered_metamodels:
            raise UnknownMetamodellingHierarchy(model_name)
        _model_exit()
        _model_modify(model_name, registered_metamodels[model_name])
    elif mode == MODE_MODIFY and new_mode == MODE_MODELLING:
        _model_exit()
    elif mode == new_mode:
        return
    else:
        # Go to a mode that we have no automatic transfer to: raise exception
        raise InvalidMode("Required mode: %s, current mode: %s" % (new_mode, mode))
def _input(value, port=None):
    """Send ``value``, JSON-encoded, as input for a task on the server.

    ``port`` names the receiving task and defaults to the module-level
    ``taskname``. Lists are posted in the "data" field, every other value
    in the "value" field.
    """
    if port is None:
        port = taskname
    field = "data" if isinstance(value, list) else "value"
    encoded = json.dumps(value)
    body = urllib.urlencode({"op": "set_input", field: encoded, "taskname": port})
    urllib2.urlopen(urllib2.Request(address, body)).read()
def _input_raw(value, taskname):
    """Post ``value`` as-is (no JSON encoding) as input for ``taskname``."""
    body = urllib.urlencode({"op": "set_input", "value": value, "taskname": taskname})
    request = urllib2.Request(address, body)
    urllib2.urlopen(request).read()
    #TODO check that this is actually a Modelverse!
def _normalize_indentation(code):
    """Drop blank lines and the common leading tab indentation of ``code``.

    Returns the remaining lines joined by newlines, with a trailing newline.
    Raises ValueError when ``code`` contains no non-blank line (callers
    convert any exception into CompilationError).
    """
    lines = [line for line in code.split("\n") if line.strip() != ""]
    # NOTE(review): the literal below is reproduced as it appears in the
    # source, but the accompanying comment ("change multiple spaces to a
    # tab") suggests it was a run of several spaces — confirm.
    lines = [line.replace(" ", "\t") for line in lines]
    common_tabs = min([len(line) - len(line.lstrip("\t")) for line in lines])
    lines = [line[common_tabs:] for line in lines]
    lines.append("")
    return "\n".join(lines)


def _compile_AL(code):
    """Compile Action Language ``code`` and return the compiler's result.

    The normalized code is written to ".code.alc" in the working directory
    and compiled with the actionlanguage grammar in "CS" mode.
    """
    with open(".code.alc", "w") as f:
        f.write(_normalize_indentation(code))
    return do_compile(".code.alc", COMPILER_PATH + "/grammars/actionlanguage.g", "CS")


def _compile_model(code):
    """Compile model ``code`` and return the result followed by "exit".

    The normalized code is written to ".model.mvc" in the working directory
    and compiled with the modelling grammar in "M" mode.
    """
    with open(".model.mvc", "w") as f:
        f.write(_normalize_indentation(code))
    return do_compile(".model.mvc", COMPILER_PATH + "/grammars/modelling.g", "M") + ["exit"]
def _output(expected=None, port=None):
    """Fetch the next output of a task and return it, JSON-decoded.

    Args:
        expected: when not None, the value the output must be equal to.
        port: task to read from; defaults to the module-level ``taskname``.

    The decoded value is also stored in the module-level ``last_output``.

    Raises:
        UnknownError: the request or the decoding of the reply failed.
        InterfaceMismatch: the output differs from ``expected``.
    """
    global last_output
    if port is None:
        port = taskname
    try:
        query = urllib.urlencode({"op": "get_output", "taskname": port})
        last_output = json.loads(urllib2.urlopen(urllib2.Request(address, query)).read())
    except Exception:
        # Was a bare "except:", which also converted KeyboardInterrupt and
        # SystemExit into UnknownError; let those propagate.
        raise UnknownError()
    if expected is not None and last_output != expected:
        raise InterfaceMismatch(_last_output(), expected)
    return last_output
def _last_output():
    """Return the value most recently stored in ``last_output``."""
    return last_output
# Raise common exceptions
def _handle_output(requested=None, split=None):
    """Read the next output and translate known error replies to exceptions.

    Args:
        requested: prefix the reply must start with to count as a success.
        split: when given, the reply is split once on this separator and
            only the part after it is returned ("" if the separator is
            absent); when None, the whole reply is returned.

    Raises:
        ModelExists, PermissionDenied, UnknownModel, UnknownIdentifier,
        ElementExists, NoSuchAttribute: for the corresponding error prefix.
        InterfaceMismatch: the reply matches neither an error prefix nor
            ``requested``.
    """
    value = _output()
    if value.startswith("Model exists: "):
        raise ModelExists(value.split(": ", 1)[1])
    elif value.startswith("Permission denied"):
        # This prefix has no ": " in it, so the reply may come without a
        # detail part; [-1] then yields the whole reply instead of raising
        # IndexError.
        raise PermissionDenied(value.split(": ", 1)[-1])
    elif value.startswith("Model not found: "):
        raise UnknownModel(value.split(": ", 1)[1])
    elif value.startswith("Element not found: "):
        raise UnknownIdentifier(value.split(": ", 1)[1])
    elif value.startswith("Element exists: "):
        raise ElementExists(value.split(": ", 1)[1])
    elif value.startswith("Attribute not found: "):
        raise NoSuchAttribute(value.split(": ", 1)[1])
    elif requested is not None and value.startswith(requested):
        if split is None:
            return value
        parts = value.strip().split(split, 1)
        if len(parts) == 1:
            return ""
        return parts[1].rstrip()
    else:
        raise InterfaceMismatch(value)

def _model_modify(model_name, metamodel_name):
    """Open ``model_name`` for modification and enter modify mode.

    When called in manual mode, only the mode bookkeeping is updated and
    nothing is sent to the server.
    """
    global mode, prev_mode, current_model

    if mode == MODE_MANUAL:
        prev_mode = MODE_MANUAL
        mode = MODE_MODIFY
        return None

    _goto_mode(MODE_MODELLING)
    prev_mode = MODE_MODELLING

    request = ["model_modify", model_name, metamodel_name]
    _input(request)
    _handle_output("Success")

    current_model = model_name
    # Mode has changed
    mode = MODE_MODIFY
    _output("Model loaded, ready for commands!")
def _model_exit():
    """Leave modify mode.

    If modify mode was entered from manual mode, simply go back to manual
    mode; otherwise send "exit" and return to modelling mode. Raises
    InvalidMode when not in modify mode.
    """
    global mode, prev_mode

    if prev_mode == MODE_MANUAL:
        mode = MODE_MANUAL
        return
    if not mode == MODE_MODIFY:
        raise InvalidMode()

    _input("exit")
    _output("Success")
    mode = MODE_MODELLING
def alter_context(model_name, metamodel_name):
    """Register ``metamodel_name`` as the metamodel to use for ``model_name``."""
    registered_metamodels.update({model_name: metamodel_name})
# Main MvC operations
def init(address_param="http://127.0.0.1:8001", timeout=20.0):
    """Connect to the Modelverse at ``address_param``.

    A random task name is generated and announced to the "task_manager"
    task. On URLError the request is retried every 0.1 seconds; once more
    than ``timeout`` seconds have passed, ConnectionError is raised. On
    success the mode becomes MODE_UNAUTHORIZED.
    """
    global mode, address, taskname

    address = address_param
    began = time.time()
    taskname = random.random()
    while True:
        try:
            _input_raw('"%s"' % taskname, "task_manager")
        except URLError as e:
            if time.time() - began > timeout:
                raise ConnectionError(e.reason)
            time.sleep(0.1)
        else:
            mode = MODE_UNAUTHORIZED
            break
def login(username, password):
    """Log in as ``username``; an unknown user is created with ``password``.

    Raises PermissionDenied on a wrong password and InterfaceMismatch on
    any reply that does not follow the expected dialogue.
    """
    global mode
    help_hint = "Use the 'help' command for a list of possible commands"
    welcome = "Welcome to the Model Management Interface v2.0!"

    _goto_mode(MODE_UNAUTHORIZED)
    _output("Log on as which user?")
    _input(username)

    prompt = _output()
    if prompt == "Password for existing user?":
        _input(password)
        answer = _output()
        if answer == "Wrong password!":
            raise PermissionDenied()
        if answer != welcome:
            raise InterfaceMismatch(answer)
        _output(help_hint)
    elif prompt == "This is a new user: please give password!":
        _input(password)
        _output("Please repeat the password")
        _input(password)
        answer = _output()
        if answer != "Passwords match!":
            # Includes "Not the same password!": the same password was sent
            # twice, so that reply means the interface changed.
            raise InterfaceMismatch(answer)
        _output(welcome)
        _output(help_hint)
    else:
        raise InterfaceMismatch(prompt)

    _input("quiet")
    mode = MODE_MODELLING
def model_add(model_name, metamodel_name, model_code=None):
    """Create model ``model_name`` as an instance of ``metamodel_name``.

    ``model_code``, when given, is compiled and uploaded as the model's
    content; compilation failures raise CompilationError. The metamodel is
    registered for the model afterwards.
    """
    _goto_mode(MODE_MODELLING)

    # Compile before creating the model, so that a compilation error does not
    # leave us in an inconsistent state.
    if model_code is None:
        constructors = ["exit"]
    else:
        try:
            constructors = _compile_model(model_code)
        except Exception as e:
            raise CompilationError(e)

    _input(["model_add", metamodel_name, model_name])
    _handle_output("Waiting for model constructors...")
    _input(constructors)
    _output("Success")
    registered_metamodels[model_name] = metamodel_name
def upload_code(code):
    """Compile Action Language ``code`` and send the result as input.

    Raises CompilationError when compilation fails.
    """
    try:
        constructors = _compile_AL(code)
    except Exception as e:
        raise CompilationError(e)
    else:
        _input(constructors)
def model_delete(model_name):
    """Delete the model named ``model_name``."""
    _goto_mode(MODE_MODELLING)
    request = ["model_delete", model_name]
    _input(request)
    _handle_output("Success")
def model_list():
    """Return the set of ``(model, metamodel)`` name pairs on the server."""
    _goto_mode(MODE_MODELLING)
    _input("model_list")
    listing = _handle_output("Success: ", split=" ")
    models = set()
    if listing == "":
        return models
    for entry in listing.strip().split("\n"):
        # Each line reads "<model> : <metamodel>".
        name, metamodel = entry.split(":")
        models.add((name.strip(), metamodel.strip()))
    return models
def model_list_full():
    """Return a set of ``(model, metamodel, owner, group, permissions)``."""
    _goto_mode(MODE_MODELLING)
    _input("model_list_full")
    listing = _handle_output("Success: ", split=" ")
    models = set()
    if listing == "":
        return models
    for entry in listing.strip().split("\n"):
        # Each line reads "<perm> <owner> <group> <model> : <metamodel>".
        details, metamodel = entry.split(":")
        permissions, owner, group, name = details.strip().split(" ")
        models.add((name, metamodel.strip(), owner, group, permissions))
    return models
def verify(model_name, metamodel_name=None):
    """Ask the server to verify ``model_name`` against a metamodel.

    Without ``metamodel_name`` the registered metamodel is used;
    UnknownMetamodellingHierarchy is raised if none is registered.
    Returns the text following "Success: " in the reply.
    """
    _goto_mode(MODE_MODELLING)
    if metamodel_name is None:
        try:
            metamodel_name = registered_metamodels[model_name]
        except KeyError:
            raise UnknownMetamodellingHierarchy(model_name)
    _input(["verify", model_name, metamodel_name])
    return _handle_output("Success: ", split=" ")
def model_overwrite(model_name, new_model=None, metamodel_name=None):
    """Replace the content of ``model_name`` with the compiled ``new_model``.

    Without ``new_model`` only "exit" is uploaded. When ``metamodel_name``
    is given, it is registered for the model afterwards. Compilation
    failures raise CompilationError.
    """
    _goto_mode(MODE_MODIFY, model_name)

    if new_model is None:
        constructors = ["exit"]
    else:
        try:
            constructors = _compile_model(new_model)
        except Exception as e:
            raise CompilationError(e)

    _input("upload")
    _handle_output("Waiting for model constructors...")
    _input(constructors)
    _output("Success")

    if metamodel_name is not None:
        registered_metamodels[model_name] = metamodel_name
def user_logout():
    """Send "exit" from modelling mode and mark the client unconnected."""
    global mode
    _goto_mode(MODE_MODELLING)
    command = "exit"
    _input(command)
    mode = MODE_UNCONNECTED
def user_delete():
    """Send "self-destruct" to remove the current user; then unconnected."""
    global mode
    _goto_mode(MODE_MODELLING)
    command = "self-destruct"
    _input(command)
    mode = MODE_UNCONNECTED
def model_render(model_name, mapper_name):
    """Fetch a rendered version of ``model_name`` using ``mapper_name``.

    Returns the JSON-decoded text following "Success: " in the reply.
    """
    _goto_mode(MODE_MODELLING)
    _input(["model_render", model_name, mapper_name])
    rendered = _handle_output("Success: ", split=" ")
    return json.loads(rendered)
def transformation_between(source, target):
    """Return the set of lines the server lists for ``source`` -> ``target``."""
    _goto_mode(MODE_MODELLING)
    _input(["transformation_between", source, target])
    listing = _handle_output("Success: ", split=" ")
    return set(listing.split("\n")) if listing != "" else set()
def transformation_add_MT(source_metamodels, target_metamodels, operation_name, code, callback=lambda: None):
    """Create a new model transformation.

    Args:
        source_metamodels: dictionary describing the source metamodels.
        target_metamodels: dictionary describing the target metamodels.
        operation_name: name of the new operation.
        code: model code of the transformation; compiled before upload.
        callback: called without arguments, in manual mode, so the merged
            metamodel can be modified (e.g. to add traceability links).
            Only invoked when at least one metamodel is given.

    Raises:
        CompilationError: ``code`` could not be compiled.
    """
    global mode
    _goto_mode(MODE_MODELLING)
    try:
        compiled = _compile_model(code)
    except Exception as e:
        raise CompilationError(e)

    mv_dict_rep = _dict_to_list(source_metamodels) + [""] + _dict_to_list(target_metamodels) + [""]
    _input(["transformation_add_MT"] + mv_dict_rep + [operation_name])

    # Possibly modify the merged metamodel first (add traceability links)
    if len(source_metamodels) + len(target_metamodels) > 0:
        mode = MODE_MANUAL
        _output("Model loaded, ready for commands!")
        callback()
        _input("exit")
        mode = MODE_MODELLING

    # Done, so RAMify and upload the model
    _handle_output("Waiting for model constructors...")
    _input(compiled)
    _handle_output("Success")
def transformation_add_AL(source_metamodels, target_metamodels, operation_name, code, callback=lambda: None):
    """Create a new, executable Action Language operation.

    ``callback`` is invoked without arguments in manual mode when at least
    one metamodel is given. Raises CompilationError if ``code`` does not
    compile.
    """
    global mode
    _goto_mode(MODE_MODELLING)
    try:
        constructors = _compile_AL(code)
    except Exception as e:
        raise CompilationError(e)

    signature = _dict_to_list(source_metamodels) + [""] + _dict_to_list(target_metamodels) + [""]
    _input(["transformation_add_AL"] + signature + [operation_name])

    # Possibly modify the merged metamodel first (add traceability links)
    if source_metamodels or target_metamodels:
        mode = MODE_MANUAL
        _output("Model loaded, ready for commands!")
        callback()
        _input("exit")
        mode = MODE_MODELLING

    _handle_output("Waiting for code constructors...")
    _input(constructors)
    _output("Success")
def transformation_add_MANUAL(source_metamodels, target_metamodels, operation_name, callback=lambda: None):
    """Create a new manual model operation.

    ``callback`` is invoked without arguments in manual mode when at least
    one metamodel is given.
    """
    global mode
    _goto_mode(MODE_MODELLING)

    signature = _dict_to_list(source_metamodels) + [""] + _dict_to_list(target_metamodels) + [""]
    _input(["transformation_add_MANUAL"] + signature + [operation_name])

    # Possibly modify the merged metamodel first (add traceability links)
    if source_metamodels or target_metamodels:
        mode = MODE_MANUAL
        _output("Model loaded, ready for commands!")
        callback()
        _input("exit")
        mode = MODE_MODELLING

    _handle_output("Success")
def transformation_execute_AL(operation_name, input_models_dict, output_models_dict, callback=lambda i: None):
    """Execute an Action Language operation; return True on success.

    Every output received before the terminating "Success"/"Failure" is
    passed to ``callback`` (in dialog mode); a non-None return value of the
    callback is sent back as input.
    """
    global mode
    _goto_mode(MODE_MODELLING)

    bindings = _dict_to_list(input_models_dict) + [""] + _dict_to_list(output_models_dict) + [""]
    _input(["transformation_execute", operation_name] + bindings)
    _handle_output("Success: ready for AL execution")

    # Everything we get now is part of the dialog, except the strings that
    # signal termination of the transformation.
    while True:
        message = _output()
        if message in ["Success", "Failure"]:
            break
        mode = MODE_DIALOG
        reply = callback(message)
        mode = MODE_MODELLING
        if reply is not None:
            _input(reply)

    # Got termination message, so we are done!
    return True if message == "Success" else False
def transformation_execute_MANUAL(operation_name, input_models_dict, output_models_dict, callback=lambda: None):
    """Execute a manual model operation; return True on success.

    Args:
        operation_name: name of the operation to execute.
        input_models_dict: dictionary describing the input models.
        output_models_dict: dictionary describing the output models.
        callback: called without arguments while in manual mode to perform
            the manual operation. The default used to take one argument
            although it is invoked with none, so relying on the default
            raised TypeError.

    Returns:
        True if the server answers "Success" after the operation, else False.
    """
    global mode
    _goto_mode(MODE_MODELLING)
    mv_dict_rep = _dict_to_list(input_models_dict) + [""] + _dict_to_list(output_models_dict) + [""]
    _input(["transformation_execute", operation_name] + mv_dict_rep)
    _handle_output("Success: ready for MANUAL execution")

    # Skip over the begin of mini_modify
    _handle_output("Please perform manual operation ")
    _output("Model loaded, ready for commands!")

    # The callback performs the manual modifications.
    mode = MODE_MANUAL
    callback()

    # Finished, so leave
    _input("exit")
    mode = MODE_MODELLING

    # Got termination message, so we are done!
    return _output() == "Success"
def transformation_execute_MT(operation_name, input_models_dict, output_models_dict, callback=lambda i: None):
    """Execute a model transformation; return True on success.

    Every output received before the terminating "Success"/"Failure" is
    passed to ``callback`` (in dialog mode); a non-None return value of the
    callback is sent back as input.
    """
    global mode
    _goto_mode(MODE_MODELLING)

    bindings = _dict_to_list(input_models_dict) + [""] + _dict_to_list(output_models_dict) + [""]
    _input(["transformation_execute", operation_name] + bindings)
    _handle_output("Success: ready for MT execution")

    # Everything we get now is part of the dialog, except the strings that
    # signal termination of the transformation.
    while True:
        message = _output()
        if message in ["Success", "Failure"]:
            break
        mode = MODE_DIALOG
        reply = callback(message)
        mode = MODE_MODELLING
        if reply is not None:
            _input(reply)

    # Got termination message, so we are done!
    return True if message == "Success" else False
def transformation_list():
    """Return a set of ``(type, name)`` pairs of the existing operations."""
    _goto_mode(MODE_MODELLING)
    _input("transformation_list")
    listing = _handle_output("Success: ", split=" ")
    operations = set()
    if listing == "":
        return operations
    for entry in listing.strip().split("\n"):
        kind, rest = entry.strip().split(" ", 1)
        # The type is wrapped in one character on each side; drop both.
        kind = kind[1:-1].strip()
        name = rest.strip().split(":")[0].strip()
        operations.add((kind, name))
    return operations
def process_execute(process_name, prefix, callbacks):
    """Execute the process model ``process_name``.

    ``callbacks`` maps an activity name to a callable. For model
    transformation and action language activities the callable receives
    each output and may return a reply to send; for manual operations it
    is called without arguments in manual mode. Activities without an
    entry in ``callbacks`` get no special handling.
    """
    global mode
    _goto_mode(MODE_MODELLING)
    _input(["process_execute", process_name, prefix])
    _handle_output("Success")

    while _output() != "Success":
        message = _last_output()
        if not message.startswith("Enacting "):
            continue

        # Next activity! The message reads "Enacting <type>: <name>".
        activity_type = message.split(" ", 1)[1].split(":", 1)[0]
        activity_name = message.split(": ", 1)[1]
        if activity_name not in callbacks:
            continue
        callback = callbacks[activity_name]

        if activity_type in ("ModelTransformation", "ActionLanguage"):
            while not (_output().startswith("Enacting ") or _last_output() == "Success"):
                mode = MODE_DIALOG
                reply = callback(_last_output())
                mode = MODE_MODELLING
                if reply is not None:
                    _input(reply)
        elif activity_type == "ManualOperation":
            _handle_output("Please perform manual operation ")
            _output("Model loaded, ready for commands!")
            mode = MODE_MANUAL
            callback()
            _input("exit")
            mode = MODE_MODELLING
def permission_modify(model_name, permissions):
    """Set the permissions of ``model_name`` to ``permissions``."""
    _goto_mode(MODE_MODELLING)
    request = ["permission_modify", model_name, permissions]
    _input(request)
    _handle_output("Success")
def permission_owner(model_name, owner):
    """Make ``owner`` the owning user of ``model_name``."""
    _goto_mode(MODE_MODELLING)
    request = ["permission_owner", model_name, owner]
    _input(request)
    _handle_output("Success")
def permission_group(model_name, group):
    """Make ``group`` the owning group of ``model_name``."""
    _goto_mode(MODE_MODELLING)
    request = ["permission_group", model_name, group]
    _input(request)
    _handle_output("Success")
def group_create(group_name):
    """Create a new group named ``group_name``."""
    _goto_mode(MODE_MODELLING)
    request = ["group_create", group_name]
    _input(request)
    _handle_output("Success")
def group_delete(group_name):
    """Delete the group ``group_name`` (of which you are an owner)."""
    _goto_mode(MODE_MODELLING)
    request = ["group_delete", group_name]
    _input(request)
    _handle_output("Success")
def group_owner_add(group_name, user_name):
    """Add ``user_name`` as an owner of ``group_name``."""
    _goto_mode(MODE_MODELLING)
    request = ["owner_add", group_name, user_name]
    _input(request)
    _handle_output("Success")
def group_owner_delete(group_name, user_name):
    """Remove ``user_name`` from the owners of ``group_name``."""
    _goto_mode(MODE_MODELLING)
    request = ["owner_delete", group_name, user_name]
    _input(request)
    _handle_output("Success")
def group_join(group_name, user_name):
    """Add ``user_name`` as a member of ``group_name``."""
    _goto_mode(MODE_MODELLING)
    request = ["group_join", group_name, user_name]
    _input(request)
    _handle_output("Success")
def group_kick(group_name, user_name):
    """Remove ``user_name`` from the members of ``group_name``."""
    _goto_mode(MODE_MODELLING)
    request = ["group_kick", group_name, user_name]
    _input(request)
    _handle_output("Success")
def group_list():
    """List existing groups.

    Returns:
        The server's reply, which starts with "Success". The reply used to
        be discarded, so callers could not see the listed groups.
        NOTE(review): the format of the listing is not visible in this
        file, so the reply is returned unparsed — confirm against the
        server before parsing it further.
    """
    _goto_mode(MODE_MODELLING)
    _input(["group_list"])
    return _handle_output("Success")
def admin_promote(user_name):
    """Promote ``user_name`` to admin status."""
    _goto_mode(MODE_MODELLING)
    request = ["admin_promote", user_name]
    _input(request)
    _handle_output("Success")
def admin_demote(user_name):
    """Demote ``user_name`` from admin status.

    Args:
        user_name: name of the user to demote. This parameter was missing,
            so the body picked up the module-level function ``user_name``
            instead and the request could not be JSON-encoded.
    """
    _goto_mode(MODE_MODELLING)
    _input(["admin_demote", user_name])
    _handle_output("Success")
# Actual operations on the model
def element_list(model_name):
    """Return a set of ``(ID, type)`` pairs for the elements of the model."""
    _goto_mode(MODE_MODIFY, model_name)
    _input("list_full")
    elements = set()
    listing = _handle_output("Success: ", split=" ")
    if listing == "":
        return elements
    for entry in listing.split("\n"):
        identifier, type_name = entry.split(":")
        elements.add((identifier.strip(), type_name.strip()))
    return elements
def types(model_name):
    """Return the set of type names usable in the model."""
    _goto_mode(MODE_MODIFY, model_name)
    _input("types")
    names = set()
    listing = _handle_output("Success: ", split=" ")
    if listing == "":
        return names
    for entry in listing.split("\n"):
        # Each line must be "<name> : <type>"; only the name is kept.
        name, _ = entry.split(":")
        names.add(name.strip())
    return names
def types_full(model_name):
    """Return a set of ``(name, type)`` pairs usable in the model.

    Sends the same "types" command as ``types`` but keeps both columns.
    """
    _goto_mode(MODE_MODIFY, model_name)
    _input("types")
    pairs = set()
    listing = _handle_output("Success: ", split=" ")
    if listing == "":
        return pairs
    for entry in listing.split("\n"):
        name, type_name = entry.split(":")
        pairs.add((name.strip(), type_name.strip()))
    return pairs
def read(model_name, ID):
    """Return ``(type, edge)`` for element ``ID``.

    ``edge`` is a ``(source, target)`` pair, or None when the third line of
    the reply does not start with "Source:" (the element is not an edge).
    """
    _goto_mode(MODE_MODIFY, model_name)
    _input(["read", ID])
    lines = _handle_output("Success: ", split=" ").split("\n")
    type_name = lines[1].split(":")[1].strip()
    if not lines[2].startswith("Source:"):
        return (type_name, None)
    source = lines[2].split(":")[1].strip()
    target = lines[3].split(":")[1].strip()
    return (type_name, (source, target))
def read_attrs(model_name, ID):
    """Return a dictionary mapping attribute names of ``ID`` to values.

    Lines before the "Attributes:" marker are skipped. Each later line
    reads ``<json key> : <type> = <value>``, where the value is "None",
    "True", "False" or a JSON literal.
    """
    _goto_mode(MODE_MODIFY, model_name)
    _input(["read", ID])
    lines = _handle_output("Success: ", split=" ").split("\n")

    literals = {"None": None, "True": True, "False": False}
    attributes = {}
    in_attributes = False
    for line in lines:
        if not in_attributes:
            if line == "Attributes:":
                # Start working on attributes
                in_attributes = True
            continue
        raw_key, remainder = line.split(":", 1)
        _, raw_value = remainder.split("=", 1)
        key = json.loads(raw_key.strip())
        raw_value = raw_value.strip()
        if raw_value in literals:
            attributes[key] = literals[raw_value]
        else:
            attributes[key] = json.loads(raw_value)
    return attributes
def instantiate(model_name, typename, edge=None, ID=""):
    """Create an instance of ``typename`` and return the reply's payload.

    With ``edge`` (a source/target pair) an edge is created between the two
    elements, otherwise a node. ``ID`` is the identifier to request, if any.
    """
    _goto_mode(MODE_MODIFY, model_name)
    if edge is not None:
        request = ["instantiate_edge", typename, ID, edge[0], edge[1]]
    else:
        request = ["instantiate_node", typename, ID]
    _input(request)
    return _handle_output("Success: ", split=" ")
def delete_element(model_name, ID):
    """Delete element ``ID`` from ``model_name``."""
    _goto_mode(MODE_MODIFY, model_name)
    request = ["delete", ID]
    _input(request)
    _handle_output("Success")
def attr_assign(model_name, ID, attr, value):
    """Assign primitive ``value`` to attribute ``attr`` of element ``ID``."""
    _check_type(value)
    _goto_mode(MODE_MODIFY, model_name)
    request = ["attr_add", ID, attr, value]
    _input(request)
    _handle_output("Success")
def attr_assign_code(model_name, ID, attr, code):
    """Assign compiled Action Language ``code`` to attribute ``attr``.

    Raises CompilationError when ``code`` does not compile; this happens
    before anything is sent to the server.
    """
    _check_type(code)
    try:
        constructors = _compile_AL(code)
    except Exception as e:
        raise CompilationError(e)

    _goto_mode(MODE_MODIFY, model_name)
    request = ["attr_add", ID, attr]
    _input(request)
    _handle_output("Waiting for code constructors...")
    _input(constructors)
    _output("Success")
def attr_delete(model_name, ID, attr):
    """Remove attribute ``attr`` from element ``ID``."""
    _goto_mode(MODE_MODIFY, model_name)
    request = ["attr_del", ID, attr]
    _input(request)
    _handle_output("Success")
def read_outgoing(model_name, ID, typename):
    """Return the set of outgoing associations of ``ID`` ("" = all types)."""
    _goto_mode(MODE_MODIFY, model_name)
    _input(["read_outgoing", ID, typename])
    listing = _handle_output("Success: ", split=" ")
    return set(listing.split("\n")) if listing != "" else set()
def read_incoming(model_name, ID, typename):
    """Return the set of incoming associations of ``ID`` ("" = all types)."""
    _goto_mode(MODE_MODIFY, model_name)
    _input(["read_incoming", ID, typename])
    listing = _handle_output("Success: ", split=" ")
    return set(listing.split("\n")) if listing != "" else set()
def read_association_source(model_name, ID):
    """Return the source of association ``ID``."""
    _goto_mode(MODE_MODIFY, model_name)
    _input(["read_association_source", ID])
    source = _handle_output("Success: ", split=" ")
    return source
def read_association_destination(model_name, ID):
    """Return the destination of association ``ID``."""
    _goto_mode(MODE_MODIFY, model_name)
    _input(["read_association_destination", ID])
    destination = _handle_output("Success: ", split=" ")
    return destination
##### To document:
def service_register(name, function):
    """Register ``function`` as the service called ``name``.

    After registration the mode is MODE_SERVICE. A background thread then
    keeps reading values from the port returned by the server and runs
    ``function`` on each value in its own daemon thread.
    """
    global mode

    def service_process(port):
        # Hand each incoming value to ``function`` in a separate thread.
        while 1:
            incoming = service_get(port)
            worker = threading.Thread(target=function, args=[incoming])
            worker.daemon = True
            worker.start()

    _goto_mode(MODE_MODELLING)
    _input(["service_register", name])

    # Now we are in service-mode
    mode = MODE_SERVICE
    port = _handle_output("Success: ", split=" ")

    # Process events in the background!
    dispatcher = threading.Thread(target=service_process, args=[port])
    dispatcher.start()
def service_stop():
    """Stop the running service and return to modelling mode."""
    global mode
    _goto_mode(MODE_SERVICE)
    command = "service_stop"
    _input(command)
    _handle_output("Success")
    mode = MODE_MODELLING
def service_get(port):
    """Return the next value available on ``port`` (service mode only)."""
    _goto_mode(MODE_SERVICE)
    received = _output(port=port)
    return received
def service_set(port, value):
    """Send primitive ``value`` to ``port`` (service mode only)."""
    _check_type(value)
    _goto_mode(MODE_SERVICE)
    _input(value, port=port)
def user_password(user, password):
    """Change a user's password.

    Not implemented: always raises NotImplementedError.
    """
    raise NotImplementedError()
def transformation_read_signature(transformation):
    """Read an operation's signature: the names and their required types.

    Not implemented: always raises NotImplementedError.
    """
    raise NotImplementedError()
def element_list_nice(model_name):
    """Fetch a JSON-decoded "nice" representation of ``model_name``.

    The model's registered metamodel is sent along; a KeyError is raised
    when none is registered.
    """
    _goto_mode(MODE_MODELLING)
    metamodel_name = registered_metamodels[model_name]
    _input(["element_list_nice", model_name, metamodel_name])
    payload = _handle_output("Success: ", split=" ")
    return json.loads(payload)
def connections_between(model_name, source_element, target_element):
    """Return the set of connections allowed between the two elements."""
    _goto_mode(MODE_MODIFY, model_name)
    _input(["connections_between", source_element, target_element])
    listing = _handle_output("Success: ", split=" ")
    return set(listing.split("\n")) if listing != "" else set()
def define_attribute(model_name, node, attr_name, attr_type):
    """Define attribute ``attr_name`` of type ``attr_type`` on ``node``.

    Returns the text following "Success: " in the reply.
    """
    _goto_mode(MODE_MODIFY, model_name)
    request = ["define_attribute", node, attr_name, attr_type]
    _input(request)
    return _handle_output("Success: ", split=" ")
def all_instances(model_name, type_name):
    """Return the set of all elements of type ``type_name`` in the model."""
    _goto_mode(MODE_MODIFY, model_name)
    _input(["all_instances", type_name])
    listing = _handle_output("Success: ", split=" ")
    return set(listing.split("\n")) if listing != "" else set()
def service_poll(port):
    """Check whether the Modelverse has input ready on ``port``.

    Not implemented: always raises NotImplementedError.
    """
    raise NotImplementedError()
def user_name(user, username):
    """Change a user's name.

    Not implemented: always raises NotImplementedError.
    """
    raise NotImplementedError()
def remove_conformance(model_name, metamodel_name):
    """Remove ``metamodel_name`` as a metamodel of ``model_name``."""
    _goto_mode(MODE_MODELLING)
    request = ["remove_conformance", model_name, metamodel_name]
    _input(request)
    _handle_output("Success")
def add_conformance(model_name, metamodel_name, partial_type_mapping=None):
    """Add ``metamodel_name`` as a metamodel of ``model_name``.

    Not implemented: always raises NotImplementedError.
    """
    raise NotImplementedError()
    # NOTE(review): the statements below are unreachable because of the
    # raise above, and ``partial_type_mapping`` is not used by them.
    _goto_mode(MODE_MODELLING)
    request = ["add_conformance", model_name, metamodel_name]
    _input(request)
    _handle_output("Success")
|