123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418 |
- from modelverse_kernel.primitives import PrimitiveFinished
- def reverseKeyLookup(a, b, **remainder):
- edges = yield [("RO", [a])]
- expanded_edges = yield [("RE", [i]) for i in edges]
- for i, edge in enumerate(expanded_edges):
- if b == edge[1]:
- # Found our edge: edges[i]
- outgoing = yield [("RO", [edges[i]])]
- result = yield [("RE", [outgoing[0]])]
- raise PrimitiveFinished(result[1])
- result = yield [("CNV", ["(unknown: %s)" % b])]
- raise PrimitiveFinished(result)
- def read_attribute(a, b, c, **remainder):
- def make_list(v, l):
- return [v] if l else v
- model_dict, b_val, c_val, type_mapping = \
- yield [("RD", [a, "model"]),
- ("RV", [b]),
- ("RV", [c]),
- ("RD", [a, "type_mapping"]),
- ]
- model_instance = \
- yield [("RD", [model_dict, b_val])]
- edges = yield [("RO", [model_instance])]
- edge_types = yield [("RDN", [type_mapping, i]) for i in edges]
- edge_types = make_list(edge_types, len(edges) == 1)
- type_edge_val = yield [("RE", [i]) for i in edge_types]
- type_edge_val = make_list(type_edge_val, len(edges) == 1)
- src_nodes = set([i[0] for i in type_edge_val])
- found_edges = yield [("RDE", [i, c_val]) for i in src_nodes]
- found_edges = make_list(found_edges, len(src_nodes) == 1)
- for e1 in found_edges:
- if e1 is not None:
- # Found an edge!
- for i, e2 in enumerate(edge_types):
- if e1 == e2:
- # The instance of this edge is the one we want!
- edge = edges[i]
- edge_val = yield [("RE", [edge])]
- result = edge_val[1]
- raise PrimitiveFinished(result)
- else:
- result = yield [("RR", [])]
- raise PrimitiveFinished(result)
- raise Exception("Error in reading edge!")
- def precompute_cardinalities(a, **remainder):
- result = yield [("CN", [])]
- # Read out all edges from the metamodel
- a = yield [("RD", [a, "metamodel"])]
- model_dict = yield [("RD", [a, "model"])]
- model_keys = yield [("RDK", [model_dict])]
- type_mapping = yield [("RD", [a, "type_mapping"])]
- elems = yield [("RDN", [model_dict, k]) for k in model_keys]
- model_keys_str= yield [("RV", [i]) for i in model_keys]
- elem_to_name = dict(zip(elems, model_keys_str))
- edges = yield [("RE", [i]) for i in elems]
- elems = [elems[i] for i, edge_val in enumerate(edges) if edge_val is not None]
- # Now we have all edges in the metamodel
- # Read out the type of the Association defining all cardinalities
- metamodel = yield [("RD", [a, "metamodel"])]
- metametamodel = yield [("RD", [metamodel, "metamodel"])]
- metametamodel_dict = \
- yield [("RD", [metametamodel, "model"])]
- assoc = yield [("RD", [metametamodel_dict, "Association"])]
- slc, suc, tlc, tuc = \
- yield [("RDE", [assoc, "source_lower_cardinality"]),
- ("RDE", [assoc, "source_upper_cardinality"]),
- ("RDE", [assoc, "target_lower_cardinality"]),
- ("RDE", [assoc, "target_upper_cardinality"]),
- ]
- # All that we now have to do is find, for each edge, whether or not it has an edge typed by any of these links!
- # Just find all links typed by these links!
- types = yield [("RDN", [type_mapping, i]) for i in elems]
- cardinalities = {}
- for i, edge_type in enumerate(types):
- if edge_type == slc:
- t = "slc"
- elif edge_type == suc:
- t = "suc"
- elif edge_type == tlc:
- t = "tlc"
- elif edge_type == tuc:
- t = "tuc"
- else:
- continue
-
- # Found a link, so add it
- source, destination = yield [("RE", [elems[i]])]
- # The edge gives the "source" the cardinality found in "destination"
- cardinalities.setdefault(elem_to_name[source], {})[t] = destination
- # Now we have to translate the "cardinalities" Python dictionary to a Modelverse dictionary
- nodes = yield [("CN", []) for i in cardinalities]
- yield [("CD", [result, i, node]) for i, node in zip(cardinalities.keys(), nodes)]
- l = cardinalities.keys()
- values = yield [("RD", [result, i]) for i in l]
- for i, value in enumerate(values):
- cards = cardinalities[l[i]]
- yield [("CD", [value, card_type, cards[card_type]]) for card_type in cards]
- raise PrimitiveFinished(result)
- def set_copy(a, **remainder):
- b = yield [("CN", [])]
- links = yield [("RO", [a])]
- exp_links = yield [("RE", [i]) for i in links]
- if len(links) == 1:
- exp_links = [exp_links]
- _ = yield [("CE", [b, i[1]]) for i in exp_links]
- raise PrimitiveFinished(b)
- def allInstances(a, b, **remainder):
- b_val = yield [("RV", [b])]
- model_dict= yield [("RD", [a, "model"])]
- metamodel = yield [("RD", [a, "metamodel"])]
- mm_dict = yield [("RD", [metamodel, "model"])]
- typing = yield [("RD", [a, "type_mapping"])]
- elem_keys = yield [("RDK", [model_dict])]
- elems = yield [("RDN", [model_dict, i]) for i in elem_keys]
- mms = yield [("RDN", [typing, i]) for i in elems]
- # Have the type for each name
- types_to_name_nodes = {}
- for key, mm in zip(elem_keys, mms):
- types_to_name_nodes.setdefault(mm, set()).add(key)
- # And now we have the inverse mapping: for each type, we have the node containing the name
- # Get the inheritance link type
- inheritance_type = yield [("RD", [metamodel, "inheritance"])]
- # Now we figure out which types are valid for the specified model
- desired_types = set()
- mm_element = yield [("RD", [mm_dict, b_val])]
- work_list = []
- work_list.append(mm_element)
- mm_typing = yield [("RD", [metamodel, "type_mapping"])]
- while work_list:
- mm_element = work_list.pop()
- if mm_element in desired_types:
- # Already been here, so stop
- continue
- # New element, so continue
- desired_types.add(mm_element)
- # Follow all inheritance links that COME IN this node, as all these are subtypes and should also match
- incoming = yield [("RI", [mm_element])]
- for i in incoming:
- t = yield [("RDN", [mm_typing, i])]
- if t == inheritance_type:
- e = yield [("RE", [i])]
- # Add the source of the inheritance link to the work list
- work_list.append(e[0])
- # Now desired_types holds all the direct types that we are interested in!
- # Construct the result out of all models that are direct instances of our specified type
- final = set()
- for t in desired_types:
- final |= types_to_name_nodes.get(t, set())
- # Result is a Python set with nodes, so just make this a Mv set
- result = yield [("CN", [])]
- v = yield [("RV", [i]) for i in final]
- _ = yield [("CE", [result, i]) for i in final]
- raise PrimitiveFinished(result)
- def add_AL(a, b, **remainder):
- worklist = [(b, "funcdef")]
- added = set()
- type_cache = {}
- model_dict = yield [("RD", [a, "model"])]
- metamodel = yield [("RD", [a, "metamodel"])]
- metamodel_dict = yield [("RD", [metamodel, "model"])]
- type_map = yield [("RD", [a, "type_mapping"])]
- outgoing = yield [("RO", [model_dict])]
- edges = yield [("RE", [i]) for i in outgoing]
- added |= set([i[1] for i in edges])
- result = yield [("CNV", ["__%s" % b])]
- # All the action language elements and their expected output links
- type_links = {
- "if": [("cond", ""), ("then", ""), ("else", ""), ("next", "")],
- "while": [("cond", ""), ("body", ""), ("next", "")],
- "assign": [("var", ""), ("value", ""), ("next", "")],
- "break": [("while", "while")],
- "continue": [("while", "while")],
- "return": [("value", "")],
- "resolve": [("var", "")],
- "access": [("var", "")],
- "constant": [("node", "")],
- "output": [("node", ""), ("next", "")],
- "global": [("var", "String"), ("next", "")],
- "param": [("name", "String"), ("value", ""), ("next_param", "param")],
- "funcdef": [("body", ""), ("next", "")],
- "call": [("func", ""), ("params", "param"), ("last_param", "param"), ("next", "")],
- }
- # Already add some often used types to the type cache, so we don't have to check for their presence
- to_str, string = yield [("RD", [metamodel_dict, "to_str"]),
- ("RD", [metamodel_dict, "String"])]
- type_cache = {"to_str": to_str,
- "String": string}
- while worklist:
- # Fetch the element and see if we need to add it
- worknode, expected_type = worklist.pop(0)
- if worknode in added:
- continue
- # Determine type of element
- if expected_type == "":
- value = yield [("RV", [worknode])]
- if (isinstance(value, dict)) and ("value" in value):
- v = value["value"]
- if v in ["if", "while", "assign", "call", "break", "continue", "return", "resolve", "access", "constant", "global", "declare"]:
- expected_type = v
- else:
- expected_type = "Any"
- else:
- expected_type = "Any"
- # Fill the cache
- if expected_type not in type_cache:
- type_cache[expected_type] = yield [("RD", [metamodel_dict, expected_type])]
- # Need to add it now
- yield [("CD", [model_dict, "__%s" % worknode, worknode])]
- added.add(worknode)
- # NOTE can't just use CD here, as the key is a node and not a value
- t1 = yield [("CE", [type_map, type_cache[expected_type]])]
- t2 = yield [("CE", [t1, worknode])]
- if t1 is None or t2 is None:
- raise Exception("ERROR")
- # Now add all its outgoing links, depending on the type we actually saw
- links = type_links.get(expected_type, [])
- for link in links:
- link_name, destination_type = link
- # Check if the link actually exists
- destination = yield [("RD", [worknode, link_name])]
- if destination is not None:
- # If so, we add it and continue
- edge = yield [("RDE", [worknode, link_name])]
- edge_outlinks = yield [("RO", [edge])]
- edge_outlink = edge_outlinks[0]
- edge_name = yield [("RE", [edge_outlink])]
- edge_name = edge_name[1]
- # Now add: edge, edge_outlink, edge_name
- # Add 'edge'
- yield [("CD", [model_dict, "__%s" % edge, edge])]
- added.add(edge)
- link_type = "%s_%s" % (expected_type, link_name)
- if link_type not in type_cache:
- type_cache[link_type] = yield [("RD", [metamodel_dict, link_type])]
- t = yield [("CE", [type_map, type_cache[link_type]])]
- yield [("CE", [t, edge])]
- # Add 'edge_outlink'
- yield [("CD", [model_dict, "__%s" % edge_outlink, edge_outlink])]
- added.add(edge_outlink)
- t = yield [("CE", [type_map, type_cache["to_str"]])]
- yield [("CE", [t, edge_outlink])]
- # Add 'edge_name' (if not present)
- if edge_name not in added:
- yield [("CD", [model_dict, "__%s" % edge_name, edge_name])]
- t = yield [("CE", [type_map, type_cache["String"]])]
- yield [("CE", [t, edge_name])]
- added.add(edge_name)
- # Add the destination to the worklist
- worklist.append((destination, destination_type))
- raise PrimitiveFinished(result)
- def get_superclasses(a, b, **remainder):
- inheritance = yield [("RD", [a, "inheritance"])]
- model_dict = yield [("RD", [a, "model"])]
- b_v = yield [("RV", [b])]
- subclass = yield [("RD", [model_dict, b_v])]
- type_mapping = yield [("RD", [a, "type_mapping"])]
- names = yield [("RDK", [model_dict])]
- elems = yield [("RDN", [model_dict, i]) for i in names]
- elem_to_name = dict(zip(elems, names))
- result = yield [("CN", [])]
- worklist = [subclass]
- while worklist:
- subclass = worklist.pop()
- res = elem_to_name[subclass]
- yield [("CE", [result, res])]
- outgoing = yield [("RO", [subclass])]
- types = yield [("RDN", [type_mapping, i]) for i in outgoing]
- types = [types] if len(outgoing) == 1 else types
- for i, t in enumerate(types):
- if t == inheritance:
- # Found an inheritance link!
- elem = outgoing[i]
- src, dst = \
- yield [("RE", [elem])]
- # Find elem in elems
- worklist.append(dst)
- raise PrimitiveFinished(result)
- def selectPossibleIncoming(a, b, c, **remainder):
- model_dict = yield [("RD", [a, "model"])]
- limit_set_links = \
- yield [("RO", [c])]
- limit_set = yield [("RE", [i]) for i in limit_set_links]
- limit_set_names = \
- [i[1] for i in limit_set]
- limit_set_names = [limit_set_names] if len(limit_set) == 1 else limit_set_names
- name_values = yield [("RV", [i]) for i in limit_set_names]
- limit_set = yield [("RD", [model_dict, i]) for i in name_values]
- limit_set = [limit_set] if len(limit_set_names) == 1 else limit_set
- try:
- gen = get_superclasses(a, b)
- inp = None
- while 1:
- inp = yield gen.send(inp)
- except PrimitiveFinished as e:
- superclasses = e.result
- vals = yield [("RO", [superclasses])]
- superclasses = yield [("RE", [i]) for i in vals]
- superclasses = [superclasses] if len(vals) == 1 else superclasses
- superclasses = [i[1] for i in superclasses]
- superclass_names = yield [("RV", [i]) for i in superclasses]
- superclass_names = [superclass_names] if len(superclasses) == 1 else superclass_names
- elems = yield [("RD", [model_dict, i]) for i in superclass_names]
- elems = [elems] if len(superclasses) == 1 else elems
- result = yield [("CN", [])]
- for i, edge in enumerate(limit_set):
- src, dst = yield [("RE", [edge])]
- if dst in elems:
- yield [("CE", [result, limit_set_names[i]])]
- raise PrimitiveFinished(result)
- def selectPossibleOutgoing(a, b, c, **remainder):
- model_dict = yield [("RD", [a, "model"])]
- limit_set_links = \
- yield [("RO", [c])]
- limit_set = yield [("RE", [i]) for i in limit_set_links]
- limit_set_names = \
- [i[1] for i in limit_set]
- limit_set_names = [limit_set_names] if len(limit_set) == 1 else limit_set_names
- name_values = yield [("RV", [i]) for i in limit_set_names]
- limit_set = yield [("RD", [model_dict, i]) for i in name_values]
- limit_set = [limit_set] if len(limit_set_names) == 1 else limit_set
- try:
- gen = get_superclasses(a, b)
- inp = None
- while 1:
- inp = yield gen.send(inp)
- except PrimitiveFinished as e:
- superclasses = e.result
- vals = yield [("RO", [superclasses])]
- superclasses = yield [("RE", [i]) for i in vals]
- superclasses = [superclasses] if len(vals) == 1 else superclasses
- superclasses = [i[1] for i in superclasses]
- superclass_names = yield [("RV", [i]) for i in superclasses]
- superclass_names = [superclass_names] if len(superclasses) == 1 else superclass_names
- elems = yield [("RD", [model_dict, i]) for i in superclass_names]
- elems = [elems] if len(superclasses) == 1 else elems
- result = yield [("CN", [])]
- for i, edge in enumerate(limit_set):
- src, dst = yield [("RE", [edge])]
- if src in elems:
- yield [("CE", [result, limit_set_names[i]])]
- raise PrimitiveFinished(result)
- def read_symbols(a, b, **remainder):
- b_v = yield [("RV", [b])]
- obj = yield [("RD", [a, b_v])]
- node = yield [("RD", [obj, "symbols"])]
- keys = yield [("RDK", [node])]
- keys_v = yield [("RV", [i]) for i in keys]
- keys_v = [keys_v] if not isinstance(keys_v, list) else keys_v
- is_in = yield [("RD", [node, i]) for i in keys_v]
- is_in = [is_in] if not isinstance(is_in, list) else is_in
- is_in_v = yield [("RV", [i]) for i in is_in]
- is_in_v = [is_in_v] if not isinstance(is_in_v, list) else is_in_v
- result_v = ["%s:%s\n" % (key, "1" if value else "0") for key, value in zip(keys_v, is_in_v)]
- result_v = "".join(result_v)
- result = yield [("CNV", [result_v])]
- raise PrimitiveFinished(result)
|