1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015 |
- import urllib
- import urllib2
- import json
- import random
- from urllib2 import URLError
- import sys
- import time
- import threading
- COMPILER_PATH = "interface/HUTN"
- MODE_UNCONNECTED = 0
- MODE_UNAUTHORIZED = 1
- MODE_MODELLING = 2
- MODE_MODIFY = 3
- MODE_DIALOG = 4
- MODE_MANUAL = 5
- MODE_SERVICE = 6
- # Bind to the compiler (might have to update path manually!)
- sys.path.append(COMPILER_PATH)
- from hutn_compiler.compiler import main as do_compile
- # Exceptions
- class ModelverseException(Exception):
- pass
- class UnknownError(ModelverseException):
- pass
- class UnknownIdentifier(ModelverseException):
- pass
- class CompilationError(ModelverseException):
- pass
- class NoSuchAttribute(ModelverseException):
- pass
- class UnknownModel(ModelverseException):
- pass
- class ConnectionError(ModelverseException):
- pass
- class ModelExists(ModelverseException):
- pass
- class PermissionDenied(ModelverseException):
- pass
- class InvalidMode(ModelverseException):
- pass
- class InterfaceMismatch(ModelverseException):
- pass
- class UnknownMetamodellingHierarchy(ModelverseException):
- pass
- # Helper functions and configuration: do not use yourself!
- taskname = None
- address = None
- last_output = None
- mode = MODE_UNCONNECTED
- prev_mode = None
- current_model = None
- registered_metamodels = {}
- def _get_metamodel(model):
- global registered_metamodels
- try:
- return registered_metamodels[model]
- except KeyError:
- raise UnknownMetamodellingHierarchy(model)
- def _check_type(value):
- if not isinstance(value, (int, long, float, str, unicode, bool)):
- raise UnsupportedValue("%s : %s" % (value, str(type(value))))
- def _check_type_list(value):
- if isinstance(value, list):
- [_check_type(i) for i in value]
- else:
- _check_type(value)
- def _dict_to_list(python_dict):
- lst = []
- for k, v in python_dict.items():
- lst += [k, v]
- return lst
- def _goto_mode(new_mode, model_name=None):
- global mode
- if mode == MODE_MANUAL and new_mode == MODE_MODIFY:
- if model_name != None and current_model != model_name:
- raise InvalidMode("Mode error: cannot modify other models!")
- else:
- return
- elif mode == MODE_MODELLING and new_mode == MODE_MODIFY:
- # Are in root view, but want to modify a model
- _model_modify(model_name, _get_metamodel(model_name))
- elif mode == MODE_MODIFY and new_mode == MODE_MODIFY and model_name != None and current_model != model_name:
- # Are in modify mode, but want to modify a different model
- mm = _get_metamodel(model_name)
- _model_exit()
- _model_modify(model_name, mm)
- elif mode == MODE_MODIFY and new_mode == MODE_MODELLING:
- _model_exit()
- elif mode == new_mode:
- return
- else:
- # Go to a mode that we have no automatic transfer to: raise exception
- raise InvalidMode("Required mode: %s, current mode: %s" % (new_mode, mode))
- def _input(value, port=None):
- # Ugly json encoding of primitives
- #print("[IN] %s" % value)
- if port is None:
- port = taskname
- if isinstance(value, type([])):
- value = json.dumps(value)
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "data": value, "taskname": port}))).read()
- else:
- value = json.dumps(value)
- #print("Set input: " + str(value))
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": value, "taskname": port}))).read()
- def _input_raw(value, taskname):
- # Ugly json encoding of primitives
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": value, "taskname": taskname}))).read()
- #TODO check that this is actually a Modelverse!
- def _compile_AL(code):
- # Compile an action language file and send the compiled code
- code_fragments = code.split("\n")
- code_fragments = [i for i in code_fragments if i.strip() != ""]
- code_fragments = [i.replace(" ", "\t") for i in code_fragments]
- initial_tabs = min([len(i) - len(i.lstrip("\t")) for i in code_fragments])
- code_fragments = [i[initial_tabs:] for i in code_fragments]
- code_fragments.append("")
- code = "\n".join(code_fragments)
- with open(".code.alc", "w") as f:
- f.write(code)
- f.flush()
- compiled = do_compile(".code.alc", COMPILER_PATH + "/grammars/actionlanguage.g", "CS")
- return compiled
- def _compile_model(code):
- # Compile a model and send the compiled graph
- # First change multiple spaces to a tab
- code_fragments = code.split("\n")
- code_fragments = [i for i in code_fragments if i.strip() != ""]
- code_fragments = [i.replace(" ", "\t") for i in code_fragments]
- initial_tabs = min([len(i) - len(i.lstrip("\t")) for i in code_fragments])
- code_fragments = [i[initial_tabs:] for i in code_fragments]
- code_fragments.append("")
- code = "\n".join(code_fragments)
- with open(".model.mvc", "w") as f:
- f.write(code)
- f.flush()
- return do_compile(".model.mvc", COMPILER_PATH + "/grammars/modelling.g", "M")
- def _output(expected=None,port=None):
- if port is None:
- port = taskname
- try:
- global last_output
- last_output = json.loads(urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "get_output", "taskname": port}))).read())
- #print("[OUT] %s" % last_output)
- except:
- raise UnknownError()
- if expected is not None and last_output != expected:
- raise InterfaceMismatch(_last_output(), expected)
- return last_output
- def _last_output():
- return last_output
- # Raise common exceptions
- def _handle_output(requested=None, split=None):
- value = _output()
- if value.startswith("Model exists: "):
- raise ModelExists(value.split(": ", 1)[1])
- elif value.startswith("Permission denied"):
- raise PermissionDenied(value.split(": ", 1)[1])
- elif value.startswith("Model not found: "):
- raise UnknownModel(value.split(": ", 1)[1])
- elif value.startswith("Element not found: "):
- raise UnknownIdentifier(value.split(": ", 1)[1])
- elif value.startswith("Element exists: "):
- raise ElementExists(value.split(": ", 1)[1])
- elif value.startswith("Attribute not found: "):
- raise NoSuchAttribute(value.split(": ", 1)[1])
- elif requested is not None and value.startswith(requested):
- if split is None:
- return value
- else:
- splitted = value.strip().split(split, 1)
- if len(splitted) == 1:
- return ""
- else:
- return splitted[1].rstrip()
- else:
- raise InterfaceMismatch(value)
-
- def _model_modify(model_name, metamodel_name):
- """Modify an existing model."""
- global mode
- global prev_mode
- if mode == MODE_MANUAL:
- prev_mode = MODE_MANUAL
- mode = MODE_MODIFY
- return None
- _goto_mode(MODE_MODELLING)
- prev_mode = MODE_MODELLING
- _input(["model_modify", model_name, metamodel_name])
- _handle_output("Success")
- global current_model
- current_model = model_name
- # Mode has changed
- mode = MODE_MODIFY
- _output("Model loaded, ready for commands!")
- def _model_exit():
- """Leave model modify mode."""
- global mode
- global prev_mode
- if prev_mode == MODE_MANUAL:
- mode = MODE_MANUAL
- return
- if mode != MODE_MODIFY:
- raise InvalidMode()
- _input("exit")
- _output("Success")
- mode = MODE_MODELLING
- def alter_context(model_name, metamodel_name):
- global registered_metamodels
- registered_metamodels[model_name] = metamodel_name
- # Main MvC operations
- def init(address_param="http://127.0.0.1:8001", timeout=20.0):
- """Starts up the connection to the Modelverse."""
- global mode
- global address
- global taskname
- address = address_param
- start_time = time.time()
- taskname = random.random()
- while 1:
- try:
- _input_raw('"%s"' % taskname, "task_manager")
- mode = MODE_UNAUTHORIZED
- break
- except URLError as e:
- if time.time() - start_time > timeout:
- raise ConnectionError(e.reason)
- else:
- time.sleep(0.1)
- def login(username, password):
- """Log in a user, if user doesn't exist, it is created."""
- global mode
- _goto_mode(MODE_UNAUTHORIZED)
- _output("Log on as which user?")
- _input(username)
- if _output() == "Password for existing user?":
- _input(password)
- if _output() == "Welcome to the Model Management Interface v2.0!":
- _output("Use the 'help' command for a list of possible commands")
- _input("quiet")
- mode = MODE_MODELLING
- elif _last_output() == "Wrong password!":
- raise PermissionDenied()
- else:
- raise InterfaceMismatch(_last_output())
- elif _last_output() == "This is a new user: please give password!":
- _input(password)
- _output("Please repeat the password")
- _input(password)
- if _output() == "Passwords match!":
- _output("Welcome to the Model Management Interface v2.0!")
- _output("Use the 'help' command for a list of possible commands")
- _input("quiet")
- mode = MODE_MODELLING
- elif _last_output() == "Not the same password!":
- # We just sent the same password, so it should be identical, unless the interface changed
- raise InterfaceMismatch(_last_output())
- else:
- raise InterfaceMismatch(_last_output())
- else:
- raise InterfaceMismatch(_last_output())
- def model_add(model_name, metamodel_name, model_code=None):
- """Instantiate a new model."""
- _goto_mode(MODE_MODELLING)
- # Do this before creating the model, as otherwise compilation errors would make us inconsistent
- if model_code is not None:
- try:
- compiled = _compile_model(model_code)
- except Exception as e:
- raise CompilationError(e)
- else:
- compiled = [0]
- _input(["model_add", metamodel_name, model_name])
- _handle_output("Waiting for model constructors...")
- _input(compiled)
- _output("Success")
- global registered_metamodels
- registered_metamodels[model_name] = metamodel_name
- def upload_code(code):
- try:
- compiled = _compile_AL(code)
- except Exception as e:
- raise CompilationError(e)
- _input(compiled)
- def model_delete(model_name):
- """Delete an existing model."""
- _goto_mode(MODE_MODELLING)
- _input(["model_delete", model_name])
- _handle_output("Success")
- def model_list(location):
- """List all models."""
- _goto_mode(MODE_MODELLING)
- _input(["model_list", location])
- return set(_handle_output("Success: ", split=" ").split("\n"))
- def model_list_full(location):
- """List full information on all models."""
- _goto_mode(MODE_MODELLING)
- _input(["model_list_full", location])
- output = _handle_output("Success: ", split=" ")
- if output == "":
- return set([])
- lst = set([])
- value = output.strip().split("\n")
- for v in value:
- m = v.strip()
- perm, own, grp, m = m.split(" ", 3)
- lst.add((m, own, grp, perm))
- return lst
- def verify(model_name, metamodel_name=None):
- """Verify if a model conforms to its metamodel."""
- _goto_mode(MODE_MODELLING)
- if metamodel_name is None:
- metamodel_name = _get_metamodel(model_name)
- _input(["verify", model_name, metamodel_name])
- return _handle_output("Success: ", split=" ")
- def model_overwrite(model_name, new_model=None, metamodel_name=None):
- """Upload a new model and overwrite an existing model."""
- _goto_mode(MODE_MODIFY, model_name)
- if new_model is not None:
- try:
- compiled = _compile_model(new_model)
- except Exception as e:
- raise CompilationError(e)
- else:
- compiled = [0]
- _input("upload")
- _handle_output("Waiting for model constructors...")
- _input(compiled)
- _output("Success")
- if metamodel_name is not None:
- global registered_metamodels
- registered_metamodels[model_name] = metamodel_name
- def user_logout():
- """Log out the current user and break the connection."""
- global mode
- _goto_mode(MODE_MODELLING)
- _input("exit")
- mode = MODE_UNCONNECTED
- def user_delete():
- """Removes the current user and break the connection."""
- global mode
- _goto_mode(MODE_MODELLING)
- _input("self-destruct")
- mode = MODE_UNCONNECTED
- def model_render(model_name, mapper_name):
- """Fetch a rendered verion of a model."""
- _goto_mode(MODE_MODELLING)
- _input(["model_render", model_name, mapper_name])
- return json.loads(_handle_output("Success: ", split=" "))
- def transformation_between(source, target):
- _goto_mode(MODE_MODELLING)
- _input(["transformation_between", source, target])
- output = _handle_output("Success: ", split=" ")
- if output == "":
- return set([])
- return set([v for v in output.split("\n")])
- def transformation_add_MT(source_metamodels, target_metamodels, operation_name, code, callback=lambda: None):
- """Create a new model transformation."""
- global mode
- _goto_mode(MODE_MODELLING)
- import time
- start = time.time()
- try:
- compiled = _compile_model(code)
- except Exception as e:
- raise CompilationError(e)
- #print("Compilation took: %ss" % (time.time() - start))
- start = time.time()
- mv_dict_rep = _dict_to_list(source_metamodels) + [""] + _dict_to_list(target_metamodels) + [""]
- _input(["transformation_add_MT"] + mv_dict_rep + [operation_name])
- # Possibly modify the merged metamodel first (add tracability links)
- if len(source_metamodels) + len(target_metamodels) > 0:
- mode = MODE_MANUAL
- _output("Model loaded, ready for commands!")
- callback()
- _input("exit")
- mode = MODE_MODELLING
- #print("Callbacks took: %ss" % (time.time() - start))
- start = time.time()
- # Done, so RAMify and upload the model
- _handle_output("Waiting for model constructors...")
- _input(compiled)
- _handle_output("Success")
- #print("Upload and RAMify took: %ss" % (time.time() - start))
- def transformation_add_AL(source_metamodels, target_metamodels, operation_name, code, callback=lambda: None):
- """Create a new action language model, which can be executed."""
- global mode
- _goto_mode(MODE_MODELLING)
- try:
- compiled = _compile_AL(code)
- except Exception as e:
- raise CompilationError(e)
- mv_dict_rep = _dict_to_list(source_metamodels) + [""] + _dict_to_list(target_metamodels) + [""]
- _input(["transformation_add_AL"] + mv_dict_rep + [operation_name])
- # Possibly modify the merged metamodel first (add tracability links)
- if len(source_metamodels) + len(target_metamodels) > 0:
- mode = MODE_MANUAL
- _output("Model loaded, ready for commands!")
- callback()
- _input("exit")
- mode = MODE_MODELLING
- _handle_output("Waiting for code constructors...")
- _input(compiled)
- _output("Success")
- def transformation_add_MANUAL(source_metamodels, target_metamodels, operation_name, callback=lambda: None):
- """Create a new manual model operation."""
- global mode
- _goto_mode(MODE_MODELLING)
- mv_dict_rep = _dict_to_list(source_metamodels) + [""] + _dict_to_list(target_metamodels) + [""]
- _input(["transformation_add_MANUAL"] + mv_dict_rep + [operation_name])
- # Possibly modify the merged metamodel first (add tracability links)
- if len(source_metamodels) + len(target_metamodels) > 0:
- mode = MODE_MANUAL
- _output("Model loaded, ready for commands!")
- callback()
- _input("exit")
- mode = MODE_MODELLING
- _handle_output("Success")
- def transformation_execute_AL(operation_name, input_models_dict, output_models_dict, statechart=None):
- """Execute an existing model operation."""
- global mode
- _goto_mode(MODE_MODELLING)
- mv_dict_rep = _dict_to_list(input_models_dict) + [""] + _dict_to_list(output_models_dict) + [""]
- _input(["transformation_execute", operation_name] + mv_dict_rep)
- _handle_output("Success: ready for AL execution")
- if statechart is not None:
- inputs = [None]
- outputs = [None]
- thrd = threading.Thread(target=exec_statechart, args=[statechart, inputs, outputs])
- thrd.daemon = True
- thrd.start()
- # On this remote thread, we are running the statechart, producing output and consuming input
- # We bind to this input and output by passing references to the function, which it must update to reflect the input/output queue
- while 1:
- # Poll for outputs to send
- try:
- outp = outputs[0].fetch(0)
- print("Got SC output: " + str(outp))
- _input(outp)
- except Queue.Empty:
- if not thrd.is_alive():
- # No more outputs, and the SC is not even alive anymore, so stop
- break
- # Got termination message, so we are done!
- if _output() == "Success":
- return True
- else:
- return False
- def transformation_execute_MANUAL(operation_name, input_models_dict, output_models_dict, callback=lambda i: None):
- """Execute an existing model operation."""
- global mode
- _goto_mode(MODE_MODELLING)
- mv_dict_rep = _dict_to_list(input_models_dict) + [""] + _dict_to_list(output_models_dict) + [""]
- _input(["transformation_execute", operation_name] + mv_dict_rep)
- _handle_output("Success: ready for MANUAL execution")
- # Skip over the begin of mini_modify
- _handle_output("Please perform manual operation ")
- _output("Model loaded, ready for commands!")
- # We are now executing, so everything we get is part of the dialog, except if it is the string for transformation termination
- mode = MODE_MANUAL
- callback()
- # Finished, so leave
- _input("exit")
- mode = MODE_MODELLING
- # Got termination message, so we are done!
- if _output() == "Success":
- return True
- else:
- return False
- def transformation_execute_MT(operation_name, input_models_dict, output_models_dict, statechart=None):
- """Execute an existing model operation."""
- global mode
- _goto_mode(MODE_MODELLING)
- mv_dict_rep = _dict_to_list(input_models_dict) + [""] + _dict_to_list(output_models_dict) + [""]
- _input(["transformation_execute", operation_name] + mv_dict_rep)
- _handle_output("Success: ready for MT execution")
- # We are now executing, so everything we get is part of the dialog, except if it is the string for transformation termination
- while _output() not in ["Success", "Failure"]:
- mode = MODE_DIALOG
- reply = callback(_last_output())
- mode = MODE_MODELLING
- if reply is not None:
- _input(reply)
- # Got termination message, so we are done!
- if _last_output() == "Success":
- return True
- else:
- return False
- def transformation_list():
- """List existing model operations."""
- _goto_mode(MODE_MODELLING)
- _input("transformation_list")
- output = _handle_output("Success: ", split=" ")
- if output == "":
- return set([])
- lst = set([])
- value = output.strip().split("\n")
- for v in value:
- t, m = v.strip().split(" ", 1)
- t = t[1:-1].strip()
- m = m.strip().split(":")[0].strip()
- lst.add((t, m))
- return lst
- def process_execute(process_name, prefix, callbacks):
- """Execute a process model."""
- global mode
- _goto_mode(MODE_MODELLING)
- _input(["process_execute", process_name, prefix])
- _handle_output("Success")
- reuse = False
- while reuse or _output() != "Success":
- reuse = False
- output = _last_output()
- if output.startswith("Enacting "):
- # Next activity!
- t = output.split(" ", 1)[1].split(":", 1)[0]
- name = output.split(": ", 1)[1]
- if name in callbacks:
- callback = callbacks[name]
- if t == "ModelTransformation" or t == "ActionLanguage":
- while not (_output().startswith("Enacting ") or _last_output() == "Success"):
- mode = MODE_DIALOG
- reply = callback(_last_output())
- mode = MODE_MODELLING
- if reply is not None:
- _input(reply)
- if _last_output().startswith("Enacting "):
- reuse = True
- elif t == "ManualOperation":
- _handle_output("Please perform manual operation ")
- _output("Model loaded, ready for commands!")
- mode = MODE_MANUAL
- callback()
- _input("exit")
- mode = MODE_MODELLING
- def permission_modify(model_name, permissions):
- """Modify permissions of a model."""
- _goto_mode(MODE_MODELLING)
- _input(["permission_modify", model_name, permissions])
- _handle_output("Success")
- def permission_owner(model_name, owner):
- """Modify the owning user of a model."""
- _goto_mode(MODE_MODELLING)
- _input(["permission_owner", model_name, owner])
- _handle_output("Success")
- def permission_group(model_name, group):
- """Modify the owning group of a model."""
- _goto_mode(MODE_MODELLING)
- _input(["permission_group", model_name, group])
- _handle_output("Success")
- def group_create(group_name):
- """Create a new group."""
- _goto_mode(MODE_MODELLING)
- _input(["group_create", group_name])
- _handle_output("Success")
- def group_delete(group_name):
- """Delete a group of which you are an owner."""
- _goto_mode(MODE_MODELLING)
- _input(["group_delete", group_name])
- _handle_output("Success")
- def group_owner_add(group_name, user_name):
- """Add a new owning user to a group you own."""
- _goto_mode(MODE_MODELLING)
- _input(["owner_add", group_name, user_name])
- _handle_output("Success")
- def group_owner_delete(group_name, user_name):
- """Delete an owning user to a group you own."""
- _goto_mode(MODE_MODELLING)
- _input(["owner_delete", group_name, user_name])
- _handle_output("Success")
- def group_join(group_name, user_name):
- """Add a new user to a group you own."""
- _goto_mode(MODE_MODELLING)
- _input(["group_join", group_name, user_name])
- _handle_output("Success")
- def group_kick(group_name, user_name):
- """Delete a user from a group you own."""
- _goto_mode(MODE_MODELLING)
- _input(["group_kick", group_name, user_name])
- _handle_output("Success")
- def group_list():
- """List existing groups."""
- _goto_mode(MODE_MODELLING)
- _input(["group_list"])
- _handle_output("Success")
- def admin_promote(user_name):
- """Promote a user to admin status."""
- _goto_mode(MODE_MODELLING)
- _input(["admin_promote", user_name])
- _handle_output("Success")
- def admin_demote():
- """Demote a user from admin status."""
- _goto_mode(MODE_MODELLING)
- _input(["admin_demote", user_name])
- _handle_output("Success")
- # Actual operations on the model
- def element_list(model_name):
- """Return a list of all IDs and the type of the element"""
- _goto_mode(MODE_MODIFY, model_name)
- _input("list_full")
- lst = set([])
- output = _handle_output("Success: ", split=" ")
- if output == "":
- return set([])
- for v in output.split("\n"):
- m, mm = v.split(":")
- m = m.strip()
- mm = mm.strip()
- lst.add((m, mm))
- return lst
- def types(model_name):
- """Return a list of all types usable in the model"""
- _goto_mode(MODE_MODIFY, model_name)
- _input("types")
- lst = set([])
- output = _handle_output("Success: ", split=" ")
- if output == "":
- return set([])
- for v in output.split("\n"):
- m, mm = v.split(":")
- m = m.strip()
- lst.add(m)
- return lst
- def types_full(model_name):
- """Return a list of full types usable in the model"""
- _goto_mode(MODE_MODIFY, model_name)
- _input("types")
- lst = set([])
- output = _handle_output("Success: ", split=" ")
- if output == "":
- return set([])
- for v in output.split("\n"):
- m, mm = v.split(":")
- m = m.strip()
- mm = mm.strip()
- lst.add((m, mm))
- return lst
- def read(model_name, ID):
- """Return a tuple of information on the element: its type and source/target (None if not an edge)"""
- _goto_mode(MODE_MODIFY, model_name)
- _input(["read", ID])
- output = _handle_output("Success: ", split=" ")
- v = output.split("\n")
- t = v[1].split(":")[1].strip()
- if (not v[2].startswith("Source:")):
- rval = (t, None)
- else:
- src = v[2].split(":")[1].strip()
- trg = v[3].split(":")[1].strip()
- rval = (t, (src, trg))
- return rval
- def read_attrs(model_name, ID):
- """Return a dictionary of attribute value pairs"""
- _goto_mode(MODE_MODIFY, model_name)
- _input(["read", ID])
- output = _handle_output("Success: ", split=" ")
- v = output.split("\n")
- searching = True
- rval = {}
- for r in v:
- if searching:
- if r == "Attributes:":
- # Start working on attributes
- searching = False
- else:
- key, value = r.split(":", 1)
- _, value = value.split("=", 1)
- key = json.loads(key.strip())
- value = value.strip()
- if value == "None":
- value = None
- elif value == "True":
- value = True
- elif value == "False":
- value = False
- else:
- value = json.loads(value)
- rval[key] = value
- return rval
- def instantiate(model_name, typename, edge=None, ID=""):
- """Create a new instance of the specified typename, between the selected elements (if not None), and with the provided ID (if any)"""
- _goto_mode(MODE_MODIFY, model_name)
- if edge is None:
- _input(["instantiate_node", typename, ID])
- else:
- _input(["instantiate_edge", typename, ID, edge[0], edge[1]])
- return _handle_output("Success: ", split=" ")
- def delete_element(model_name, ID):
- """Delete the element with the given ID"""
- _goto_mode(MODE_MODIFY, model_name)
- _input(["delete", ID])
- _handle_output("Success")
- def attr_assign(model_name, ID, attr, value):
- """Assign a value to an attribute"""
- _check_type(value)
- _goto_mode(MODE_MODIFY, model_name)
- _input(["attr_add", ID, attr, value])
- _handle_output("Success")
- def attr_assign_code(model_name, ID, attr, code):
- """Assign a piece of Action Language code to the attribute"""
- _check_type(code)
- try:
- compiled = _compile_AL(code)
- except Exception as e:
- raise CompilationError(e)
- _goto_mode(MODE_MODIFY, model_name)
- _input(["attr_add", ID, attr])
- _handle_output("Waiting for code constructors...")
- _input(compiled)
- _output("Success")
- def attr_delete(model_name, ID, attr):
- """Remove an attribute."""
- _goto_mode(MODE_MODIFY, model_name)
- _input(["attr_del", ID, attr])
- _handle_output("Success")
- def read_outgoing(model_name, ID, typename):
- """Returns a list of all outgoing associations of a specific type ("" = all)"""
- _goto_mode(MODE_MODIFY, model_name)
- _input(["read_outgoing", ID, typename])
- output = _handle_output("Success: ", split=" ")
- if output == "":
- return set([])
- else:
- return set(output.split("\n"))
- def read_incoming(model_name, ID, typename):
- """Returns a list of all incoming associations of a specific type ("" = all)"""
- _goto_mode(MODE_MODIFY, model_name)
- _input(["read_incoming", ID, typename])
- output = _handle_output("Success: ", split=" ")
- if output == "":
- return set([])
- else:
- return set(output.split("\n"))
- def read_association_source(model_name, ID):
- """Returns the source of an association."""
- _goto_mode(MODE_MODIFY, model_name)
- _input(["read_association_source", ID])
- return _handle_output("Success: ", split=" ")
- def read_association_destination(model_name, ID):
- """Returns the destination of an association."""
- _goto_mode(MODE_MODIFY, model_name)
- _input(["read_association_destination", ID])
- return _handle_output("Success: ", split=" ")
- ##### To document:
- def service_register(name, function):
- """Register a function as a service with a specific name."""
- def service_process(port):
- while 1:
- thrd = threading.Thread(target=function, args=[service_get(port)])
- thrd.daemon = True
- thrd.start()
- global mode
- _goto_mode(MODE_MODELLING)
- _input(["service_register", name])
- # Now we are in service-mode
- mode = MODE_SERVICE
- port = _handle_output("Success: ", split=" ")
- # Process events in the background!
- threading.Thread(target=service_process, args=[port]).start()
- def service_stop():
- """Stop the currently executing process."""
- _goto_mode(MODE_SERVICE)
- _input("service_stop")
- _handle_output("Success")
- global mode
- mode = MODE_MODELLING
- def service_get(port):
- """Get the values on the specified port."""
- _goto_mode(MODE_SERVICE)
- return _output(port=port)
- def service_set(port, value):
- """Set a value on a specified port."""
- _check_type_list(value)
- _goto_mode(MODE_SERVICE)
- _input(value, port=port)
- def user_password(user, password):
- """Change a user's password."""
- raise NotImplementedError()
- def transformation_read_signature(transformation):
- """Reads an operation's signature, specifying the names and their required types."""
- raise NotImplementedError()
- def element_list_nice(model_name):
- """Fetches a nice representation of models."""
- _goto_mode(MODE_MODELLING)
- _input(["element_list_nice", model_name, _get_metamodel(model_name)])
- data = _handle_output("Success: ", split=" ")
- try:
- return json.loads(data)
- except:
- print(data)
- raise
- def connections_between(model_name, source_element, target_element):
- """Gets a list of all allowed connections between the source and target element in the model."""
- _goto_mode(MODE_MODIFY, model_name)
- _input(["connections_between", source_element, target_element])
- output = _handle_output("Success: ", split=" ")
- if output == "":
- return set([])
- else:
- return set(output.split("\n"))
- def define_attribute(model_name, node, attr_name, attr_type):
- """Create a new attribute, which can be instantiated one meta-level below."""
- _goto_mode(MODE_MODIFY, model_name)
- _input(["define_attribute", node, attr_name, attr_type])
- return _handle_output("Success: ", split=" ")
- def all_instances(model_name, type_name):
- """Returns a list of all elements of a specific type."""
- _goto_mode(MODE_MODIFY, model_name)
- _input(["all_instances", type_name])
- output = _handle_output("Success: ", split=" ")
- if output == "":
- return set([])
- else:
- return set(output.split("\n"))
- def service_poll(port):
- """Checks whether or not the Modelverse side has any input ready to be processed."""
- raise NotImplementedError()
- def user_name(user, username):
- """Change a user's name."""
- raise NotImplementedError()
- def remove_conformance(model_name, metamodel_name):
- """Remove a metamodel for a model."""
- _goto_mode(MODE_MODELLING)
- _input(["remove_conformance", model_name, metamodel_name])
- _handle_output("Success")
- def add_conformance(model_name, metamodel_name, partial_type_mapping=None):
- """Add a metamodel for a model."""
- raise NotImplementedError()
- _goto_mode(MODE_MODELLING)
- _input(["add_conformance", model_name, metamodel_name])
- _handle_output("Success")
- def folder_create(folder_name):
- _goto_mode(MODE_MODELLING)
- _input(["folder_create", folder_name])
- _handle_output("Success")
|