123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613 |
- import urllib
- import urllib2
- import json
- import random
- from urllib2 import URLError
- import sys
- # Bind to the compiler
- sys.path.append("../interface/HUTN")
- from hutn_compiler.compiler import main as do_compile
- # Helper functions and configuration: do not use yourself!
- taskname = random.random()
- address = None
- last_output = None
- mode = 0
- def _input(value):
- # Ugly json encoding of primitives
- if isinstance(value, type([])):
- value = json.dumps(value)
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "data": value, "taskname": taskname}))).read()
- else:
- value = json.dumps(value)
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": value, "taskname": taskname}))).read()
- def _input_raw(value, taskname):
- # Ugly json encoding of primitives
- urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "set_input", "value": value, "taskname": taskname}))).read()
- def _compile_AL(code):
- # Compile an action language file and send the compiled code
- code_fragments = code.split("\n")
- code_fragments = [i for i in code_fragments if i.strip() != ""]
- code_fragments = [i.replace(" ", "\t") for i in code_fragments]
- initial_tabs = min([len(i) - len(i.lstrip("\t")) for i in code_fragments])
- code_fragments = [i[initial_tabs:] for i in code_fragments]
- code_fragments.append("")
- code = "\n".join(code_fragments)
- with open("__constraint.alc", "w") as f:
- f.write(code)
- f.flush()
- _input(do_compile("__constraint.alc", "../interface/HUTN/grammars/actionlanguage.g", "CS"))
- def _compile_model(code):
- # Compile a model and send the compiled graph
- # First change multiple spaces to a tab
- code_fragments = code.split("\n")
- code_fragments = [i for i in code_fragments if i.strip() != ""]
- code_fragments = [i.replace(" ", "\t") for i in code_fragments]
- initial_tabs = min([len(i) - len(i.lstrip("\t")) for i in code_fragments])
- code_fragments = [i[initial_tabs:] for i in code_fragments]
- code_fragments.append("")
- code = "\n".join(code_fragments)
- with open("__model.mvc", "w") as f:
- f.write(code)
- f.flush()
- _input(do_compile("__model.mvc", "../interface/HUTN/grammars/modelling.g", "M") + ["exit"])
- def _output():
- try:
- global last_output
- last_output = json.loads(urllib2.urlopen(urllib2.Request(address, urllib.urlencode({"op": "get_output", "taskname": taskname}))).read())
- return last_output
- except:
- raise UnknownError()
- def _last_output():
- return last_output
- def _consume_to_end():
- while (_output() not in ["Ready for command...", "Please give your command."]):
- pass
- # Exceptions
- class ModelverseException(Exception):
- pass
- class UnknownException(ModelverseException):
- pass
- class UnknownIdentifier(ModelverseException):
- pass
- class UnknownType(ModelverseException):
- pass
- class UnsupportedValue(ModelverseException):
- pass
- class CompilationError(ModelverseException):
- pass
- class NoSuchAttribute(ModelverseException):
- pass
- class UnknownModel(ModelverseException):
- pass
- class ConnectionError(ModelverseException):
- pass
- class ModelExists(ModelverseException):
- pass
- class PermissionDenied(ModelverseException):
- pass
- class InvalidMode(ModelverseException):
- pass
- # Main MvC operations
- def init(address_param="http://localhost:8001"):
- """Starts up the connection to the Modelverse."""
- # return None
- # raises ConnectionError
- # raises UnknownError
- # raises InvalidMode
- global mode
- if mode != 0:
- raise InvalidMode()
- global address
- address = address_param
- try:
- _input_raw('"%s"' % taskname, "task_manager")
- mode = 1
- except URLError as e:
- raise ConnectionError(e.reason)
- def login(username, password):
- """Log in an existing user."""
- # return None
- # raises UnknownError
- # raises PermissionDenied
- global mode
- if mode != 1:
- raise InvalidMode()
- _input(username)
- _input(password)
- _consume_to_end()
- mode = 2
- def register(username, password):
- """Register a new user."""
- # return None
- # raises UnknownError
- # raises UserExists
- global mode
- if mode != 1:
- raise InvalidMode()
- _input(username)
- _input(password)
- _input(password)
- _consume_to_end()
- mode = 2
- def model_add(model_name, metamodel_name, model_code=None):
- """Instantiate a new model."""
- # return None
- # raises UnknownModel
- # raises ModelExists
- # raises UnknownError
- # raises PermissionDenied
- # raises CompilationError
- if mode != 2:
- raise InvalidMode()
- _input("model_add")
- _output()
- _output()
- _input(metamodel_name)
- if _output() == "Model name?":
- _input(model_name)
- if _output() == "Waiting for model constructors...":
- try:
- if model_code == None:
- _input("exit")
- else:
- _compile_model(model_code)
- except Exception:
- raise CompilationError()
- _consume_to_end()
- else:
- _consume_to_end()
- raise ModelExists()
- elif _last_output() == "Could not find type model!":
- _consume_to_end()
- raise UnknownModel()
- elif _last_output() == "Permission denied":
- _consume_to_end()
- raise PermissionDenied()
- def model_modify(model_name):
- """Modify an existing model."""
- # return is_write
- # raises UnknownModel
- # raises PermissionDenied
- # raises UnknownError
- global mode
- if mode != 2:
- raise InvalidMode()
- _input("model_modify")
- _output()
- _input(model_name)
- if _output() == "Permission denied":
- _consume_to_end()
- raise PermissionDenied()
- elif _last_output() == "Could not find model!":
- _consume_to_end()
- raise UnknownModel()
- elif _last_output() == "Model loaded, ready for commands!":
- mode = 3
- if ("r/w" in _output()):
- write = True
- else:
- write = False
- _consume_to_end()
- return write
- else:
- _consume_to_end()
- raise UnknownException()
- def model_list():
- """List all models."""
- # return [(model1, metamodel1), (model2, metamodel2), ...]
- # raises UnknownError
- if mode != 2:
- raise InvalidMode()
- _input("model_list")
- lst = []
- while (_output() != "Ready for command..."):
- v = _last_output()
- m, mm = v.split(":")
- m = m.strip()
- mm = mm.strip()
- lst.append((m, mm))
- return lst
- def model_list_full():
- """List full information on all models."""
- # return [(model1, metamodel1, owner1, group1, permissions1), (model2, metamodel2, owner2, group2, permissions2), ...]
- # raises UnknownError
- if mode != 2:
- raise InvalidMode()
- _input("model_list_full")
- lst = []
- while (_output() != "Ready for command..."):
- v = _last_output()
- m, mm = v.split(":")
- m = m.strip()
- mm = mm.strip()
- perm, own, grp, m = m.split(" ")
- lst.append((m, mm, own, grp, perm))
- return lst
- def model_overwrite(model_name, new_model):
- """Upload a new model and overwrite an existing model."""
- # return None
- # raises UnknownModel
- # raises PermissionDenied
- # raises CompilationError
- # raises UnknownError
- if mode != 2:
- raise InvalidMode()
- _input("model_overwrite")
- _input(model_name)
- _compile_model(new_model)
- def user_logout():
- """Log out the current user. A new login will be required afterwards."""
- # return None
- # raises UnknownException
- global mode
- if mode != 2:
- raise InvalidMode()
- _input("exit")
- mode = 1
- def user_delete():
- """Removes the current user. A new login will be required afterwards."""
- # return None
- # raises UnknownException
- global mode
- if mode != 2:
- raise InvalidMode()
- _input("self-destruct")
- mode = 1
- # Actual operations on the model
- def element_list():
- """Return a list of all IDs and the type of the element"""
- # return [(name1, type1), (name2, type2), ...]
- # raises UnknownError
- if mode != 3:
- raise InvalidMode()
- _input("list")
- lst = []
- _output()
- while (_output() != "Please give your command."):
- v = _last_output()
- m, mm = v.split(":")
- m = m.strip()
- mm = mm.strip()
- lst.append((m, mm))
- return lst
- def types():
- """Return a list of all types usable in the model"""
- # return [type1, type2, ...]
- # raises UnknownError
- if mode != 3:
- raise InvalidMode()
- _input("types")
- _output()
- lst = []
- while (_output() != "Please give your command."):
- v = _last_output()
- m, mm = v.split(":")
- m = m.strip()
- lst.append(m)
- return lst
- def read(ID):
- """Return a tuple of information on the element: its type and source/target (None if not an edge)"""
- # return (type, (source, target))
- # raises UnknownError
- # raises UnknownIdentifier
- if mode != 3:
- raise InvalidMode()
- _input("read")
- _output()
- _input(ID)
- _output()
- if _last_output() == "Unknown element; aborting":
- _consume_to_end()
- raise UnknownIdentifier()
- else:
- t = _output().split(":")[1].strip()
- if (not _output().startswith("Source:")):
- rval = (t, None)
- else:
- src = _last_output().split(":")[1].strip()
- trg = _output().split(":")[1].strip()
- rval = (t, (src, trg))
- while (_output() != "Please give your command."):
- pass
- return rval
- def read_attrs(ID):
- """Return a dictionary of attribute value pairs"""
- # return {attr1: value1, attr2: value2, ...}
- # raises UnknownError
- # raises UnknownIdentifier
- if mode != 3:
- raise InvalidMode()
- _input("read")
- _output()
- _input(ID)
- _output()
- if _last_output() == "Unknown element; aborting":
- _consume_to_end()
- raise UnknownIdentifier()
- else:
- rval = {}
- while (_output() != "Attributes:"):
- pass
- while (_output() != "Please give your command."):
- r = _last_output()
- key, value = r.split(":")
- _, value = value.split("=")
- key = json.loads(key.strip())
- value = value.strip()
- value = None if value == "None" else json.loads(value)
- rval[key] = value
- return rval
- def instantiate(typename, edge=None, ID=""):
- """Create a new instance of the specified typename, between the selected elements (if not None), and with the provided ID (if any)"""
- # return instantiated_ID
- # raises UnknownError
- # raises UnknownType
- # raises UnknownIdentifier
- # raises NotAnEdge
- if mode != 3:
- raise InvalidMode()
- _input("instantiate")
- if (_output() == "Permission denied"):
- _consume_to_end()
- raise PermissionDenied()
- else:
- _input(typename)
- if (_output() == "Name of new element?"):
- _input(ID)
- if (_output() == "Element already exists; aborting"):
- _consume_to_end()
- raise ElementExists()
- if (edge is not None):
- if (_last_output() == "Source name?"):
- _input(edge[0])
- if (_output() != "Destination name?"):
- _consume_to_end()
- raise UnknownIdentifier()
- _input(edge[1])
- ID = _output()
- if (ID == "Unknown destination; aborting"):
- _consume_to_end()
- raise UnknownIdentifier()
- else:
- _consume_to_end()
- return ID
- else:
- _consume_to_end()
- raise NotAnEdge()
- else:
- ID = _last_output()
- _consume_to_end()
- return ID
- elif (_last_output() == "Permission denied"):
- _consume_to_end()
- raise PermissionDenied()
- elif (_last_output() == "Unknown type specified; aborting"):
- _consume_to_end()
- raise UnknownType()
- else:
- print(9)
- print(_last_output())
- def delete(ID):
- """Delete the element with the given ID"""
- # return None
- # raises UnknownError
- # raises UnknownIdentifier
- if mode != 3:
- raise InvalidMode()
- _input("delete")
- _input(ID)
- def attr_assign(ID, attr, value):
- """Assign a value to an attribute"""
- # return None
- # raises UnknownError
- # raises UnknownIdentifier
- # raises NoSuchAttribute
- # raises UnsupportedValue
- if mode != 3:
- raise InvalidMode()
- _input("attr_modify")
- _input(ID)
- _input(attr)
- _input(value)
- _consume_to_end()
- def attr_assign_code(ID, attr, code):
- """Assign a piece of Action Language code to the attribute"""
- # return None
- # raises UnknownError
- # raises UnknownIdentifier
- # raises NoSuchAttribute
- # raises UnsupportedValue
- if mode != 3:
- raise InvalidMode()
- _input("attr_modify")
- _input(ID)
- _input(attr)
- _compile_AL(code)
- def upload(new_model):
- """Overwrite the current model with the provided model"""
- # return None
- # raises UnknownError
- # raises CompilationError
- if mode != 3:
- raise InvalidMode()
- _input("upload")
- _compile_model(new_model)
- def read_outgoing(ID, typename):
- """Returns a list of all outgoing associations of a specific type ("" = all)"""
- # return [name1, name2, ...]
- # raises UnknownError
- # raises UnknownIdentifier
- # raises UnknownType
- if mode != 3:
- raise InvalidMode()
- _input("read_outgoing")
- _input(ID)
- _input(typename)
- lst = []
- while (_output() != "Please give your command."):
- lst.append(_last_output())
- return lst
- def read_incoming(ID, typename):
- """Returns a list of all incoming associations of a specific type ("" = all)"""
- # return [name1, name2, ...]
- # raises UnknownError
- # raises UnknownIdentifier
- # raises UnknownType
- if mode != 3:
- raise InvalidMode()
- _input("read_incoming")
- _input(ID)
- _input(typename)
- lst = []
- while (_output() != "Please give your command."):
- lst.append(_last_output())
- return lst
- def model_exit():
- """Leave model modify mode."""
- # return None
- # raises UnknownError
- global mode
- if mode != 3:
- raise InvalidMode()
- _input("exit")
- _consume_to_end()
- mode = 2
- def transformation_add_MT_language():
- """Create a new Model Transformation language out of a set of metamodels."""
- raise NotImplementedError()
- def transformation_add_MT():
- """Create a new model transformation."""
- raise NotImplementedError()
- def transformation_add_AL():
- """Create a new action language fragment."""
- raise NotImplementedError()
- def transformation_add_MANUAL():
- """Create a new manual model operation."""
- raise NotImplementedError()
- def transformation_execute():
- """Execute an existing model operation."""
- raise NotImplementedError()
- def transformation_list():
- """List existing model operations."""
- raise NotImplementedError()
- def transformation_list_full():
- """List detailed information on model operations."""
- raise NotImplementedError()
- def transformation_detail():
- """List full details of a a model operation."""
- raise NotImplementedError()
- def transformation_RAMify():
- """Ramify an existing metamodel."""
- raise NotImplementedError()
- def process_execute():
- """Execute a process model."""
- raise NotImplementedError()
- def permission_modify():
- """Modify permissions of a model."""
- raise NotImplementedError()
- def permission_owner():
- """Modify the owning user of a model."""
- raise NotImplementedError()
- def permission_group():
- """Modify the owning group of a model."""
- raise NotImplementedError()
- def group_create():
- """Create a new group."""
- raise NotImplementedError()
- def group_delete():
- """Delete a group of which you are an owner."""
- raise NotImplementedError()
- def group_owner_add():
- """Add a new owning user to a group you own."""
- raise NotImplementedError()
- def group_owner_delete():
- """Delete an owning user to a group you own."""
- raise NotImplementedError()
- def group_join():
- """Add a new user to a group you own."""
- raise NotImplementedError()
- def group_kick():
- """Delete a user from a group you own."""
- raise NotImplementedError()
- def group_list():
- """List existing groups."""
- raise NotImplementedError()
- def admin_promote():
- """Promote a user to admin status."""
- raise NotImplementedError()
- def admin_demote():
- """Demote a user from admin status."""
- raise NotImplementedError()
|