123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576 |
- from services.bottom.V0 import Bottom
- from uuid import UUID
- from state.base import State
- from typing import Dict, Tuple, Set, Any, List
- from pprint import pprint
- class Conformance:
- def __init__(self, state: State, model: UUID, type_model: UUID):
- self.state = state
- self.bottom = Bottom(state)
- type_model_id = state.read_dict(state.read_root(), "SCD")
- self.scd_model = UUID(state.read_value(type_model_id))
- self.model = model
- self.type_model = type_model
- self.type_mapping: Dict[str, str] = {}
- self.model_names = {
- # map model elements to their names to prevent iterating too much
- self.bottom.read_outgoing_elements(self.model, e)[0]: e
- for e in self.bottom.read_keys(self.model)
- }
- self.type_model_names = {
- # map type model elements to their names to prevent iterating too much
- self.bottom.read_outgoing_elements(self.type_model, e)[0]: e
- for e in self.bottom.read_keys(self.type_model)
- }
- self.sub_types: Dict[str, Set[str]] = {
- k: set() for k in self.bottom.read_keys(self.type_model)
- }
- self.primitive_values: Dict[UUID, Any] = {}
- self.abstract_types: List[str] = []
- self.multiplicities: Dict[str, Tuple] = {}
- self.source_multiplicities: Dict[str, Tuple] = {}
- self.target_multiplicities: Dict[str, Tuple] = {}
- self.structures = {}
- self.matches = {}
- self.candidates = {}
- def check_nominal(self, *, log=False):
- """
- Perform a nominal conformance check
- Args:
- log: boolean indicating whether to log errors
- Returns:
- Boolean indicating whether the check has passed
- """
- try:
- self.check_typing()
- self.check_link_typing()
- self.check_multiplicities()
- self.check_constraints()
- return True
- except RuntimeError as e:
- if log:
- print(e)
- return False
- def check_structural(self, *, build_morphisms=True, log=False):
- """
- Perform a structural conformance check
- Args:
- build_morphisms: boolean indicating whether to create morpishm links
- log: boolean indicating whether to log errors
- Returns:
- Boolean indicating whether the check has passed
- """
- try:
- self.precompute_structures()
- self.match_structures()
- if build_morphisms:
- self.build_morphisms()
- self.check_nominal(log=log)
- return True
- except RuntimeError as e:
- if log:
- print(e)
- return False
- def read_attribute(self, element: UUID, attr_name: str):
- """
- Read an attribute value attached to an element
- Args:
- element: UUID of the element
- attr_name: name of the attribute to read
- Returns:
- The value of hte attribute, if no attribute with given name is found, returns None
- """
- if element in self.type_model_names:
- # type model element
- element_name = self.type_model_names[element]
- model = self.type_model
- else:
- # model element
- element_name = self.model_names[element]
- model = self.model
- try:
- attr_elem, = self.bottom.read_outgoing_elements(model, f"{element_name}.{attr_name}")
- return self.primitive_values.get(attr_elem, self.bottom.read_value(attr_elem))
- except ValueError:
- return None
- def precompute_sub_types(self):
- """
- Creates an internal representation of sub-type hierarchies that is
- more easily queryable that the state graph
- """
- # collect inheritance link instances
- inh_element, = self.bottom.read_outgoing_elements(self.scd_model, "Inheritance")
- inh_links = []
- for tm_element, tm_name in self.type_model_names.items():
- morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
- if inh_element in morphisms:
- # we have an instance of an inheritance link
- inh_links.append(tm_element)
- # for each inheritance link we add the parent and child to the sub types map
- for link in inh_links:
- tm_source = self.bottom.read_edge_source(link)
- tm_target = self.bottom.read_edge_target(link)
- parent_name = self.type_model_names[tm_target]
- child_name = self.type_model_names[tm_source]
- self.sub_types[parent_name].add(child_name)
- # iteratively expand the sub type hierarchies in the sub types map
- stop = False
- while not stop:
- stop = True
- for child_name, child_children in self.sub_types.items():
- for parent_name, parent_children in self.sub_types.items():
- if child_name in parent_children:
- original_size = len(parent_children)
- parent_children.update(child_children)
- if len(parent_children) != original_size:
- stop = False
- def deref_primitive_values(self):
- """
- Prefetch the values stored in referenced primitive type models
- """
- ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
- string_element, = self.bottom.read_outgoing_elements(self.scd_model, "String")
- boolean_element, = self.bottom.read_outgoing_elements(self.scd_model, "Boolean")
- integer_element, = self.bottom.read_outgoing_elements(self.scd_model, "Integer")
- t_deref = []
- t_refs = []
- for tm_element, tm_name in self.type_model_names.items():
- morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
- if ref_element in morphisms:
- t_refs.append(self.type_model_names[tm_element])
- elif string_element in morphisms:
- t_deref.append(tm_element)
- elif boolean_element in morphisms:
- t_deref.append(tm_element)
- elif integer_element in morphisms:
- t_deref.append(tm_element)
- for elem in t_deref:
- primitive_model = UUID(self.bottom.read_value(elem))
- primitive_value_node, = self.bottom.read_outgoing_elements(primitive_model)
- primitive_value = self.bottom.read_value(primitive_value_node)
- self.primitive_values[elem] = primitive_value
- for m_name, tm_name in self.type_mapping.items():
- if tm_name in t_refs:
- # dereference
- m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
- primitive_model = UUID(self.bottom.read_value(m_element))
- try:
- primitive_value_node, = self.bottom.read_outgoing_elements(primitive_model)
- primitive_value = self.bottom.read_value(primitive_value_node)
- self.primitive_values[m_element] = primitive_value
- except ValueError:
- pass # multiple elements in model indicate that we're not dealing with a primitive
- def precompute_multiplicities(self):
- """
- Creates an internal representation of type multiplicities that is
- more easily queryable that the state graph
- """
- for tm_element, tm_name in self.type_model_names.items():
- # class abstract flags and multiplicities
- abstract = self.read_attribute(tm_element, "abstract")
- lc = self.read_attribute(tm_element, "lower_cardinality")
- uc = self.read_attribute(tm_element, "upper_cardinality")
- if abstract:
- self.abstract_types.append(tm_name)
- if lc or uc:
- mult = (
- lc if lc is not None else float("-inf"),
- uc if uc is not None else float("inf")
- )
- self.multiplicities[tm_name] = mult
- # multiplicities for associations
- slc = self.read_attribute(tm_element, "source_lower_cardinality")
- suc = self.read_attribute(tm_element, "source_upper_cardinality")
- if slc or suc:
- mult = (
- slc if slc is not None else float("-inf"),
- suc if suc is not None else float("inf")
- )
- self.source_multiplicities[tm_name] = mult
- tlc = self.read_attribute(tm_element, "target_lower_cardinality")
- tuc = self.read_attribute(tm_element, "target_upper_cardinality")
- if tlc or tuc:
- mult = (
- tlc if tlc is not None else float("-inf"),
- tuc if tuc is not None else float("inf")
- )
- self.target_multiplicities[tm_name] = mult
- # optional for attribute links
- opt = self.read_attribute(tm_element, "optional")
- if opt is not None:
- self.source_multiplicities[tm_name] = (0 if opt else 1, 1)
- self.target_multiplicities[tm_name] = (0, 1)
- def get_type(self, element: UUID):
- """
- Retrieve the type of an element (wrt. current type model)
- """
- morphisms = self.bottom.read_outgoing_elements(element, "Morphism")
- tm_element, = [m for m in morphisms if m in self.type_model_names.keys()]
- return tm_element
- def check_typing(self):
- """
- for each element of model check whether a morphism
- link exists to some element of type_model
- """
- ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
- model_names = self.bottom.read_keys(self.model)
- for m_name in model_names:
- m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
- try:
- tm_element = self.get_type(m_element)
- tm_name = self.type_model_names[tm_element]
- self.type_mapping[m_name] = tm_name
- if ref_element in self.bottom.read_outgoing_elements(tm_element, "Morphism"):
- sub_m = UUID(self.bottom.read_value(m_element))
- sub_tm = UUID(self.bottom.read_value(tm_element))
- if not Conformance(self.state, sub_m, sub_tm).check_nominal():
- raise RuntimeError(f"Incorrectly model reference: {m_name}")
- except ValueError:
- # no or too many morphism links found
- raise RuntimeError(f"Incorrectly typed element: {m_name}")
- return True
- def check_link_typing(self):
- """
- for each link, check whether its source and target are of a valid type
- """
- self.precompute_sub_types()
- for m_name, tm_name in self.type_mapping.items():
- m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
- m_source = self.bottom.read_edge_source(m_element)
- m_target = self.bottom.read_edge_target(m_element)
- if m_source is None or m_target is None:
- # element is not a link
- continue
- tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
- tm_source = self.bottom.read_edge_source(tm_element)
- tm_target = self.bottom.read_edge_target(tm_element)
- # check if source is typed correctly
- source_name = self.model_names[m_source]
- source_type_actual = self.type_mapping[source_name]
- source_type_expected = self.type_model_names[tm_source]
- if source_type_actual != source_type_expected:
- if source_type_actual not in self.sub_types[source_type_expected]:
- raise RuntimeError(f"Invalid source type {source_type_actual} for element {m_name}")
- # check if target is typed correctly
- target_name = self.model_names[m_target]
- target_type_actual = self.type_mapping[target_name]
- target_type_expected = self.type_model_names[tm_target]
- if target_type_actual != target_type_expected:
- if target_type_actual not in self.sub_types[target_type_expected]:
- raise RuntimeError(f"Invalid target type {target_type_actual} for element {m_name}")
- return True
- def check_multiplicities(self):
- """
- Check whether multiplicities for all types are respected
- """
- self.deref_primitive_values()
- self.precompute_multiplicities()
- for tm_name in self.type_model_names.values():
- # abstract classes
- if tm_name in self.abstract_types:
- type_count = list(self.type_mapping.values()).count(tm_name)
- if type_count > 0:
- raise RuntimeError(f"Invalid instantiation of abstract class: {tm_name}")
- # class multiplicities
- if tm_name in self.multiplicities:
- lc, uc = self.multiplicities[tm_name]
- type_count = list(self.type_mapping.values()).count(tm_name)
- for sub_type in self.sub_types[tm_name]:
- type_count += list(self.type_mapping.values()).count(sub_type)
- if type_count < lc or type_count > uc:
- raise RuntimeError(f"Cardinality of type exceeds valid multiplicity range: {tm_name} ({type_count})")
- # association source multiplicities
- if tm_name in self.source_multiplicities:
- tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
- tm_source_element = self.bottom.read_edge_source(tm_element)
- tm_source_name = self.type_model_names[tm_source_element]
- lc, uc = self.source_multiplicities[tm_name]
- for i, t in self.type_mapping.items():
- if t == tm_source_name or t in self.sub_types[tm_source_name]:
- count = 0
- i_element, = self.bottom.read_outgoing_elements(self.model, i)
- outgoing = self.bottom.read_outgoing_edges(i_element)
- for o in outgoing:
- try:
- if self.type_mapping[self.model_names[o]] == tm_name:
- count += 1
- except KeyError:
- pass # for elements not part of model, e.g. morphism links
- if count < lc or count > uc:
- raise RuntimeError(f"Source cardinality of type {tm_name} exceeds valid multiplicity range in {i}.")
- # association target multiplicities
- if tm_name in self.target_multiplicities:
- tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
- tm_target_element = self.bottom.read_edge_target(tm_element)
- tm_target_name = self.type_model_names[tm_target_element]
- lc, uc = self.target_multiplicities[tm_name]
- for i, t in self.type_mapping.items():
- if t == tm_target_name or t in self.sub_types[tm_target_name]:
- count = 0
- i_element, = self.bottom.read_outgoing_elements(self.model, i)
- outgoing = self.bottom.read_incoming_edges(i_element)
- for o in outgoing:
- try:
- if self.type_mapping[self.model_names[o]] == tm_name:
- count += 1
- except KeyError:
- pass # for elements not part of model, e.g. morphism links
- if count < lc or count > uc:
- print(f"Target cardinality of type {tm_name} exceeds valid multiplicity range in {i}.")
- return False
- return True
- def evaluate_constraint(self, code, **kwargs):
- """
- Evaluate constraint code (Python code)
- """
- funcs = {
- 'read_value': self.state.read_value
- }
- return eval(
- code,
- {'__builtins__': {'isinstance': isinstance, 'print': print,
- 'int': int, 'float': float, 'bool': bool, 'str': str, 'tuple': tuple}
- }, # globals
- {**kwargs, **funcs} # locals
- )
- def check_constraints(self):
- """
- Check whether all constraints defined for a model are respected
- """
- # local constraints
- for m_name, tm_name in self.type_mapping.items():
- if tm_name != "GlobalConstraint":
- tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
- code = self.read_attribute(tm_element, "constraint")
- if code is not None:
- morphisms = self.bottom.read_incoming_elements(tm_element, "Morphism")
- morphisms = [m for m in morphisms if m in self.model_names]
- for m_element in morphisms:
- if not self.evaluate_constraint(code, element=m_element):
- raise RuntimeError(f"Local constraint of {tm_name} not satisfied in {m_name}.")
- # global constraints
- for m_name, tm_name in self.type_mapping.items():
- if tm_name == "GlobalConstraint":
- tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
- code = self.read_attribute(tm_element, "constraint")
- if code is not None:
- if not self.evaluate_constraint(code, model=self.model):
- raise RuntimeError(f"Global constraint {tm_name} not satisfied.")
- return True
- def precompute_structures(self):
- """
- Make an internal representation of type structures such that comparing type structures is easier
- """
- self.precompute_sub_types()
- scd_elements = self.bottom.read_outgoing_elements(self.scd_model)
- # collect types
- class_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class")
- association_element, = self.bottom.read_outgoing_elements(self.scd_model, "Association")
- for tm_element, tm_name in self.type_model_names.items():
- # retrieve elements that tm_element is a morphism of
- morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
- morphism, = [m for m in morphisms if m in scd_elements]
- # check if tm_element is a morphism of AttributeLink
- if class_element == morphism or association_element == morphism:
- self.structures[tm_name] = set()
- # collect type structures
- # retrieve AttributeLink to check whether element is a morphism of AttributeLink
- attr_link_element, = self.bottom.read_outgoing_elements(self.scd_model, "AttributeLink")
- for tm_element, tm_name in self.type_model_names.items():
- # retrieve elements that tm_element is a morphism of
- morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
- morphism, = [m for m in morphisms if m in scd_elements]
- # check if tm_element is a morphism of AttributeLink
- if attr_link_element == morphism:
- # retrieve attributes of attribute link, i.e. 'name' and 'optional'
- attrs = self.bottom.read_outgoing_elements(tm_element)
- name_model_node, = filter(lambda x: self.type_model_names.get(x, "").endswith(".name"), attrs)
- opt_model_node, = filter(lambda x: self.type_model_names.get(x, "").endswith(".optional"), attrs)
- # get attr name value
- name_model = UUID(self.bottom.read_value(name_model_node))
- name_node, = self.bottom.read_outgoing_elements(name_model)
- name = self.bottom.read_value(name_node)
- # get attr opt value
- opt_model = UUID(self.bottom.read_value(opt_model_node))
- opt_node, = self.bottom.read_outgoing_elements(opt_model)
- opt = self.bottom.read_value(opt_node)
- # get attr type name
- source_type_node = self.bottom.read_edge_source(tm_element)
- source_type_name = self.type_model_names[source_type_node]
- target_type_node = self.bottom.read_edge_target(tm_element)
- target_type_name = self.type_model_names[target_type_node]
- # add attribute to the structure of its source type
- # attribute is stored as a (name, optional, type) triple
- self.structures.setdefault(source_type_name, set()).add((name, opt, target_type_name))
- # extend structures of sub types with attrs of super types
- for super_type, sub_types in self.sub_types.items():
- for sub_type in sub_types:
- self.structures.setdefault(sub_type, set()).update(self.structures[super_type])
- # filter out abstract types, as they cannot be instantiated
- # retrieve Class_abstract to check whether element is a morphism of Class_abstract
- class_abs_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class_abstract")
- for tm_element, tm_name in self.type_model_names.items():
- # retrieve elements that tm_element is a morphism of
- morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
- morphism, = [m for m in morphisms if m in scd_elements]
- # check if tm_element is a morphism of Class_abstract
- if class_abs_element == morphism:
- # retrieve 'abstract' attribute value
- target_node = self.bottom.read_edge_target(tm_element)
- abst_model = UUID(self.bottom.read_value(target_node))
- abst_node, = self.bottom.read_outgoing_elements(abst_model)
- is_abstract = self.bottom.read_value(abst_node)
- # retrieve type name
- source_node = self.bottom.read_edge_source(tm_element)
- type_name = self.type_model_names[source_node]
- if is_abstract:
- self.structures.pop(type_name)
- def match_structures(self):
- """
- Try to match the structure of each element in the instance model to some element in the type model
- """
- ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
- # matching
- for m_element, m_name in self.model_names.items():
- is_edge = self.bottom.read_edge_source(m_element) is not None
- for type_name, structure in self.structures.items():
- tm_element, = self.bottom.read_outgoing_elements(self.type_model, type_name)
- type_is_edge = self.bottom.read_edge_source(tm_element) is not None
- if is_edge == type_is_edge:
- mismatch = False
- matched = 0
- for name, optional, attr_type in structure:
- try:
- attr, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{name}")
- attr_tm, = self.bottom.read_outgoing_elements(self.type_model, attr_type)
- # if attribute is a modelref, we need to check whether it
- # linguistically conforms to the specified type
- # if its an internally defined attribute, this will be checked by constraints
- morphisms = self.bottom.read_outgoing_elements(attr_tm, "Morphism")
- attr_conforms = True
- if ref_element in morphisms:
- # check conformance of reference model
- type_model_uuid = UUID(self.bottom.read_value(attr_tm))
- model_uuid = UUID(self.bottom.read_value(attr))
- attr_conforms = Conformance(self.state, model_uuid, type_model_uuid)\
- .check_nominal()
- else:
- # eval constraints
- code = self.read_attribute(attr_tm, "constraint")
- if code is not None:
- attr_conforms = self.evaluate_constraint(code, element=attr)
- if attr_conforms:
- matched += 1
- except ValueError:
- # attr not found or failed parsing UUID
- if optional:
- continue
- else:
- mismatch = True
- break
- # if matched == len(structure):
- if not mismatch:
- self.candidates.setdefault(m_name, set()).add(type_name)
- # filter out candidates for links based on source and target types
- for m_element, m_name in self.model_names.items():
- is_edge = self.bottom.read_edge_source(m_element) is not None
- if is_edge and m_name in self.candidates:
- m_source = self.bottom.read_edge_source(m_element)
- m_target = self.bottom.read_edge_target(m_element)
- source_candidates = self.candidates[self.model_names[m_source]]
- target_candidates = self.candidates[self.model_names[m_target]]
- remove = set()
- for candidate_name in self.candidates[m_name]:
- candidate_element, = self.bottom.read_outgoing_elements(self.type_model, candidate_name)
- candidate_source = self.type_model_names[self.bottom.read_edge_source(candidate_element)]
- if candidate_source not in source_candidates:
- if len(source_candidates.intersection(set(self.sub_types[candidate_source]))) == 0:
- remove.add(candidate_name)
- candidate_target = self.type_model_names[self.bottom.read_edge_target(candidate_element)]
- if candidate_target not in target_candidates:
- if len(target_candidates.intersection(set(self.sub_types[candidate_target]))) == 0:
- remove.add(candidate_name)
- self.candidates[m_name] = self.candidates[m_name].difference(remove)
- def build_morphisms(self):
- """
- Build the morphisms between an instance and a type model that structurally match
- """
- if not all([len(c) == 1 for c in self.candidates.values()]):
- raise RuntimeError("Cannot build incomplete or ambiguous morphism.")
- mapping = {k: v.pop() for k, v in self.candidates.items()}
- for m_name, tm_name in mapping.items():
- # morphism to class/assoc
- m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
- tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
- self.bottom.create_edge(m_element, tm_element, "Morphism")
- # morphism for attributes and attribute links
- structure = self.structures[tm_name]
- for attr_name, _, attr_type in structure:
- try:
- # attribute node
- attr_element, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{attr_name}")
- attr_type_element, = self.bottom.read_outgoing_elements(self.type_model, attr_type)
- self.bottom.create_edge(attr_element, attr_type_element, "Morphism")
- # attribute link
- attr_link_element, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{attr_name}_link")
- attr_link_type_element, = self.bottom.read_outgoing_elements(self.type_model, f"{tm_name}_{attr_name}")
- self.bottom.create_edge(attr_link_element, attr_link_type_element, "Morphism")
- except ValueError:
- pass
- if __name__ == '__main__':
- from state.devstate import DevState as State
- s = State()
- from bootstrap.scd import bootstrap_scd
- scd = bootstrap_scd(s)
- from bootstrap.pn import bootstrap_pn
- ltm_pn = bootstrap_pn(s, "PN")
- ltm_pn_lola = bootstrap_pn(s, "PNlola")
- from services.pn import PN
- my_pn = s.create_node()
- PNserv = PN(my_pn, s)
- PNserv.create_place("p1", 5)
- PNserv.create_place("p2", 0)
- PNserv.create_transition("t1")
- PNserv.create_p2t("p1", "t1", 1)
- PNserv.create_t2p("t1", "p2", 1)
-
- cf = Conformance(s, my_pn, ltm_pn_lola)
- # cf = Conformance(s, scd, ltm_pn, scd)
- cf.precompute_structures()
- cf.match_structures()
- cf.build_morphisms()
- print(cf.check_nominal())
|