from services.bottom.V0 import Bottom from uuid import UUID from state.base import State from typing import Dict, Tuple, Set, Any, List from pprint import pprint class Conformance: def __init__(self, state: State, model: UUID, type_model: UUID): self.state = state self.bottom = Bottom(state) type_model_id = state.read_dict(state.read_root(), "SCD") self.scd_model = UUID(state.read_value(type_model_id)) self.model = model self.type_model = type_model self.type_mapping: Dict[str, str] = {} self.model_names = { # map model elements to their names to prevent iterating too much self.bottom.read_outgoing_elements(self.model, e)[0]: e for e in self.bottom.read_keys(self.model) } self.type_model_names = { # map type model elements to their names to prevent iterating too much self.bottom.read_outgoing_elements(self.type_model, e)[0]: e for e in self.bottom.read_keys(self.type_model) } self.sub_types: Dict[str, Set[str]] = { k: set() for k in self.bottom.read_keys(self.type_model) } self.primitive_values: Dict[UUID, Any] = {} self.abstract_types: List[str] = [] self.multiplicities: Dict[str, Tuple] = {} self.source_multiplicities: Dict[str, Tuple] = {} self.target_multiplicities: Dict[str, Tuple] = {} self.structures = {} self.matches = {} self.candidates = {} def check_nominal(self, *, log=False): """ Perform a nominal conformance check Args: log: boolean indicating whether to log errors Returns: Boolean indicating whether the check has passed """ try: self.check_typing() self.check_link_typing() self.check_multiplicities() self.check_constraints() return True except RuntimeError as e: if log: print(e) return False def check_structural(self, *, build_morphisms=True, log=False): """ Perform a structural conformance check Args: build_morphisms: boolean indicating whether to create morpishm links log: boolean indicating whether to log errors Returns: Boolean indicating whether the check has passed """ try: self.precompute_structures() self.match_structures() if build_morphisms: self.build_morphisms() self.check_nominal(log=log) return True except RuntimeError as e: if log: print(e) return False def read_attribute(self, element: UUID, attr_name: str): """ Read an attribute value attached to an element Args: element: UUID of the element attr_name: name of the attribute to read Returns: The value of hte attribute, if no attribute with given name is found, returns None """ if element in self.type_model_names: # type model element element_name = self.type_model_names[element] model = self.type_model else: # model element element_name = self.model_names[element] model = self.model try: attr_elem, = self.bottom.read_outgoing_elements(model, f"{element_name}.{attr_name}") return self.primitive_values.get(attr_elem, self.bottom.read_value(attr_elem)) except ValueError: return None def precompute_sub_types(self): """ Creates an internal representation of sub-type hierarchies that is more easily queryable that the state graph """ # collect inheritance link instances inh_element, = self.bottom.read_outgoing_elements(self.scd_model, "Inheritance") inh_links = [] for tm_element, tm_name in self.type_model_names.items(): morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism") if inh_element in morphisms: # we have an instance of an inheritance link inh_links.append(tm_element) # for each inheritance link we add the parent and child to the sub types map for link in inh_links: tm_source = self.bottom.read_edge_source(link) tm_target = self.bottom.read_edge_target(link) parent_name = self.type_model_names[tm_target] child_name = self.type_model_names[tm_source] self.sub_types[parent_name].add(child_name) # iteratively expand the sub type hierarchies in the sub types map stop = False while not stop: stop = True for child_name, child_children in self.sub_types.items(): for parent_name, parent_children in self.sub_types.items(): if child_name in parent_children: original_size = len(parent_children) parent_children.update(child_children) if len(parent_children) != original_size: stop = False def deref_primitive_values(self): """ Prefetch the values stored in referenced primitive type models """ ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef") string_element, = self.bottom.read_outgoing_elements(self.scd_model, "String") boolean_element, = self.bottom.read_outgoing_elements(self.scd_model, "Boolean") integer_element, = self.bottom.read_outgoing_elements(self.scd_model, "Integer") t_deref = [] t_refs = [] for tm_element, tm_name in self.type_model_names.items(): morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism") if ref_element in morphisms: t_refs.append(self.type_model_names[tm_element]) elif string_element in morphisms: t_deref.append(tm_element) elif boolean_element in morphisms: t_deref.append(tm_element) elif integer_element in morphisms: t_deref.append(tm_element) for elem in t_deref: primitive_model = UUID(self.bottom.read_value(elem)) primitive_value_node, = self.bottom.read_outgoing_elements(primitive_model) primitive_value = self.bottom.read_value(primitive_value_node) self.primitive_values[elem] = primitive_value for m_name, tm_name in self.type_mapping.items(): if tm_name in t_refs: # dereference m_element, = self.bottom.read_outgoing_elements(self.model, m_name) primitive_model = UUID(self.bottom.read_value(m_element)) try: primitive_value_node, = self.bottom.read_outgoing_elements(primitive_model) primitive_value = self.bottom.read_value(primitive_value_node) self.primitive_values[m_element] = primitive_value except ValueError: pass # multiple elements in model indicate that we're not dealing with a primitive def precompute_multiplicities(self): """ Creates an internal representation of type multiplicities that is more easily queryable that the state graph """ for tm_element, tm_name in self.type_model_names.items(): # class abstract flags and multiplicities abstract = self.read_attribute(tm_element, "abstract") lc = self.read_attribute(tm_element, "lower_cardinality") uc = self.read_attribute(tm_element, "upper_cardinality") if abstract: self.abstract_types.append(tm_name) if lc or uc: mult = ( lc if lc is not None else float("-inf"), uc if uc is not None else float("inf") ) self.multiplicities[tm_name] = mult # multiplicities for associations slc = self.read_attribute(tm_element, "source_lower_cardinality") suc = self.read_attribute(tm_element, "source_upper_cardinality") if slc or suc: mult = ( slc if slc is not None else float("-inf"), suc if suc is not None else float("inf") ) self.source_multiplicities[tm_name] = mult tlc = self.read_attribute(tm_element, "target_lower_cardinality") tuc = self.read_attribute(tm_element, "target_upper_cardinality") if tlc or tuc: mult = ( tlc if tlc is not None else float("-inf"), tuc if tuc is not None else float("inf") ) self.target_multiplicities[tm_name] = mult # optional for attribute links opt = self.read_attribute(tm_element, "optional") if opt is not None: self.source_multiplicities[tm_name] = (0 if opt else 1, 1) self.target_multiplicities[tm_name] = (0, 1) def get_type(self, element: UUID): """ Retrieve the type of an element (wrt. current type model) """ morphisms = self.bottom.read_outgoing_elements(element, "Morphism") tm_element, = [m for m in morphisms if m in self.type_model_names.keys()] return tm_element def check_typing(self): """ for each element of model check whether a morphism link exists to some element of type_model """ ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef") model_names = self.bottom.read_keys(self.model) for m_name in model_names: m_element, = self.bottom.read_outgoing_elements(self.model, m_name) try: tm_element = self.get_type(m_element) tm_name = self.type_model_names[tm_element] self.type_mapping[m_name] = tm_name if ref_element in self.bottom.read_outgoing_elements(tm_element, "Morphism"): sub_m = UUID(self.bottom.read_value(m_element)) sub_tm = UUID(self.bottom.read_value(tm_element)) if not Conformance(self.state, sub_m, sub_tm).check_nominal(): raise RuntimeError(f"Incorrectly model reference: {m_name}") except ValueError: # no or too many morphism links found raise RuntimeError(f"Incorrectly typed element: {m_name}") return True def check_link_typing(self): """ for each link, check whether its source and target are of a valid type """ self.precompute_sub_types() for m_name, tm_name in self.type_mapping.items(): m_element, = self.bottom.read_outgoing_elements(self.model, m_name) m_source = self.bottom.read_edge_source(m_element) m_target = self.bottom.read_edge_target(m_element) if m_source is None or m_target is None: # element is not a link continue tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name) tm_source = self.bottom.read_edge_source(tm_element) tm_target = self.bottom.read_edge_target(tm_element) # check if source is typed correctly source_name = self.model_names[m_source] source_type_actual = self.type_mapping[source_name] source_type_expected = self.type_model_names[tm_source] if source_type_actual != source_type_expected: if source_type_actual not in self.sub_types[source_type_expected]: raise RuntimeError(f"Invalid source type {source_type_actual} for element {m_name}") # check if target is typed correctly target_name = self.model_names[m_target] target_type_actual = self.type_mapping[target_name] target_type_expected = self.type_model_names[tm_target] if target_type_actual != target_type_expected: if target_type_actual not in self.sub_types[target_type_expected]: raise RuntimeError(f"Invalid target type {target_type_actual} for element {m_name}") return True def check_multiplicities(self): """ Check whether multiplicities for all types are respected """ self.deref_primitive_values() self.precompute_multiplicities() for tm_name in self.type_model_names.values(): # abstract classes if tm_name in self.abstract_types: type_count = list(self.type_mapping.values()).count(tm_name) if type_count > 0: raise RuntimeError(f"Invalid instantiation of abstract class: {tm_name}") # class multiplicities if tm_name in self.multiplicities: lc, uc = self.multiplicities[tm_name] type_count = list(self.type_mapping.values()).count(tm_name) for sub_type in self.sub_types[tm_name]: type_count += list(self.type_mapping.values()).count(sub_type) if type_count < lc or type_count > uc: raise RuntimeError(f"Cardinality of type exceeds valid multiplicity range: {tm_name} ({type_count})") # association source multiplicities if tm_name in self.source_multiplicities: tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name) tm_source_element = self.bottom.read_edge_source(tm_element) tm_source_name = self.type_model_names[tm_source_element] lc, uc = self.source_multiplicities[tm_name] for i, t in self.type_mapping.items(): if t == tm_source_name or t in self.sub_types[tm_source_name]: count = 0 i_element, = self.bottom.read_outgoing_elements(self.model, i) outgoing = self.bottom.read_outgoing_edges(i_element) for o in outgoing: try: if self.type_mapping[self.model_names[o]] == tm_name: count += 1 except KeyError: pass # for elements not part of model, e.g. morphism links if count < lc or count > uc: raise RuntimeError(f"Source cardinality of type {tm_name} exceeds valid multiplicity range in {i}.") # association target multiplicities if tm_name in self.target_multiplicities: tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name) tm_target_element = self.bottom.read_edge_target(tm_element) tm_target_name = self.type_model_names[tm_target_element] lc, uc = self.target_multiplicities[tm_name] for i, t in self.type_mapping.items(): if t == tm_target_name or t in self.sub_types[tm_target_name]: count = 0 i_element, = self.bottom.read_outgoing_elements(self.model, i) outgoing = self.bottom.read_incoming_edges(i_element) for o in outgoing: try: if self.type_mapping[self.model_names[o]] == tm_name: count += 1 except KeyError: pass # for elements not part of model, e.g. morphism links if count < lc or count > uc: print(f"Target cardinality of type {tm_name} exceeds valid multiplicity range in {i}.") return False return True def evaluate_constraint(self, code, **kwargs): """ Evaluate constraint code (Python code) """ funcs = { 'read_value': self.state.read_value } return eval( code, {'__builtins__': {'isinstance': isinstance, 'print': print, 'int': int, 'float': float, 'bool': bool, 'str': str, 'tuple': tuple} }, # globals {**kwargs, **funcs} # locals ) def check_constraints(self): """ Check whether all constraints defined for a model are respected """ # local constraints for m_name, tm_name in self.type_mapping.items(): if tm_name != "GlobalConstraint": tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name) code = self.read_attribute(tm_element, "constraint") if code is not None: morphisms = self.bottom.read_incoming_elements(tm_element, "Morphism") morphisms = [m for m in morphisms if m in self.model_names] for m_element in morphisms: if not self.evaluate_constraint(code, element=m_element): raise RuntimeError(f"Local constraint of {tm_name} not satisfied in {m_name}.") # global constraints for m_name, tm_name in self.type_mapping.items(): if tm_name == "GlobalConstraint": tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name) code = self.read_attribute(tm_element, "constraint") if code is not None: if not self.evaluate_constraint(code, model=self.model): raise RuntimeError(f"Global constraint {tm_name} not satisfied.") return True def precompute_structures(self): """ Make an internal representation of type structures such that comparing type structures is easier """ self.precompute_sub_types() scd_elements = self.bottom.read_outgoing_elements(self.scd_model) # collect types class_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class") association_element, = self.bottom.read_outgoing_elements(self.scd_model, "Association") for tm_element, tm_name in self.type_model_names.items(): # retrieve elements that tm_element is a morphism of morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism") morphism, = [m for m in morphisms if m in scd_elements] # check if tm_element is a morphism of AttributeLink if class_element == morphism or association_element == morphism: self.structures[tm_name] = set() # collect type structures # retrieve AttributeLink to check whether element is a morphism of AttributeLink attr_link_element, = self.bottom.read_outgoing_elements(self.scd_model, "AttributeLink") for tm_element, tm_name in self.type_model_names.items(): # retrieve elements that tm_element is a morphism of morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism") morphism, = [m for m in morphisms if m in scd_elements] # check if tm_element is a morphism of AttributeLink if attr_link_element == morphism: # retrieve attributes of attribute link, i.e. 'name' and 'optional' attrs = self.bottom.read_outgoing_elements(tm_element) name_model_node, = filter(lambda x: self.type_model_names.get(x, "").endswith(".name"), attrs) opt_model_node, = filter(lambda x: self.type_model_names.get(x, "").endswith(".optional"), attrs) # get attr name value name_model = UUID(self.bottom.read_value(name_model_node)) name_node, = self.bottom.read_outgoing_elements(name_model) name = self.bottom.read_value(name_node) # get attr opt value opt_model = UUID(self.bottom.read_value(opt_model_node)) opt_node, = self.bottom.read_outgoing_elements(opt_model) opt = self.bottom.read_value(opt_node) # get attr type name source_type_node = self.bottom.read_edge_source(tm_element) source_type_name = self.type_model_names[source_type_node] target_type_node = self.bottom.read_edge_target(tm_element) target_type_name = self.type_model_names[target_type_node] # add attribute to the structure of its source type # attribute is stored as a (name, optional, type) triple self.structures.setdefault(source_type_name, set()).add((name, opt, target_type_name)) # extend structures of sub types with attrs of super types for super_type, sub_types in self.sub_types.items(): for sub_type in sub_types: self.structures.setdefault(sub_type, set()).update(self.structures[super_type]) # filter out abstract types, as they cannot be instantiated # retrieve Class_abstract to check whether element is a morphism of Class_abstract class_abs_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class_abstract") for tm_element, tm_name in self.type_model_names.items(): # retrieve elements that tm_element is a morphism of morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism") morphism, = [m for m in morphisms if m in scd_elements] # check if tm_element is a morphism of Class_abstract if class_abs_element == morphism: # retrieve 'abstract' attribute value target_node = self.bottom.read_edge_target(tm_element) abst_model = UUID(self.bottom.read_value(target_node)) abst_node, = self.bottom.read_outgoing_elements(abst_model) is_abstract = self.bottom.read_value(abst_node) # retrieve type name source_node = self.bottom.read_edge_source(tm_element) type_name = self.type_model_names[source_node] if is_abstract: self.structures.pop(type_name) def match_structures(self): """ Try to match the structure of each element in the instance model to some element in the type model """ ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef") # matching for m_element, m_name in self.model_names.items(): is_edge = self.bottom.read_edge_source(m_element) is not None for type_name, structure in self.structures.items(): tm_element, = self.bottom.read_outgoing_elements(self.type_model, type_name) type_is_edge = self.bottom.read_edge_source(tm_element) is not None if is_edge == type_is_edge: mismatch = False matched = 0 for name, optional, attr_type in structure: try: attr, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{name}") attr_tm, = self.bottom.read_outgoing_elements(self.type_model, attr_type) # if attribute is a modelref, we need to check whether it # linguistically conforms to the specified type # if its an internally defined attribute, this will be checked by constraints morphisms = self.bottom.read_outgoing_elements(attr_tm, "Morphism") attr_conforms = True if ref_element in morphisms: # check conformance of reference model type_model_uuid = UUID(self.bottom.read_value(attr_tm)) model_uuid = UUID(self.bottom.read_value(attr)) attr_conforms = Conformance(self.state, model_uuid, type_model_uuid)\ .check_nominal() else: # eval constraints code = self.read_attribute(attr_tm, "constraint") if code is not None: attr_conforms = self.evaluate_constraint(code, element=attr) if attr_conforms: matched += 1 except ValueError: # attr not found or failed parsing UUID if optional: continue else: mismatch = True break # if matched == len(structure): if not mismatch: self.candidates.setdefault(m_name, set()).add(type_name) # filter out candidates for links based on source and target types for m_element, m_name in self.model_names.items(): is_edge = self.bottom.read_edge_source(m_element) is not None if is_edge and m_name in self.candidates: m_source = self.bottom.read_edge_source(m_element) m_target = self.bottom.read_edge_target(m_element) source_candidates = self.candidates[self.model_names[m_source]] target_candidates = self.candidates[self.model_names[m_target]] remove = set() for candidate_name in self.candidates[m_name]: candidate_element, = self.bottom.read_outgoing_elements(self.type_model, candidate_name) candidate_source = self.type_model_names[self.bottom.read_edge_source(candidate_element)] if candidate_source not in source_candidates: if len(source_candidates.intersection(set(self.sub_types[candidate_source]))) == 0: remove.add(candidate_name) candidate_target = self.type_model_names[self.bottom.read_edge_target(candidate_element)] if candidate_target not in target_candidates: if len(target_candidates.intersection(set(self.sub_types[candidate_target]))) == 0: remove.add(candidate_name) self.candidates[m_name] = self.candidates[m_name].difference(remove) def build_morphisms(self): """ Build the morphisms between an instance and a type model that structurally match """ if not all([len(c) == 1 for c in self.candidates.values()]): raise RuntimeError("Cannot build incomplete or ambiguous morphism.") mapping = {k: v.pop() for k, v in self.candidates.items()} for m_name, tm_name in mapping.items(): # morphism to class/assoc m_element, = self.bottom.read_outgoing_elements(self.model, m_name) tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name) self.bottom.create_edge(m_element, tm_element, "Morphism") # morphism for attributes and attribute links structure = self.structures[tm_name] for attr_name, _, attr_type in structure: try: # attribute node attr_element, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{attr_name}") attr_type_element, = self.bottom.read_outgoing_elements(self.type_model, attr_type) self.bottom.create_edge(attr_element, attr_type_element, "Morphism") # attribute link attr_link_element, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{attr_name}_link") attr_link_type_element, = self.bottom.read_outgoing_elements(self.type_model, f"{tm_name}_{attr_name}") self.bottom.create_edge(attr_link_element, attr_link_type_element, "Morphism") except ValueError: pass if __name__ == '__main__': from state.devstate import DevState as State s = State() from bootstrap.scd import bootstrap_scd scd = bootstrap_scd(s) from bootstrap.pn import bootstrap_pn ltm_pn = bootstrap_pn(s, "PN") ltm_pn_lola = bootstrap_pn(s, "PNlola") from services.pn import PN my_pn = s.create_node() PNserv = PN(my_pn, s) PNserv.create_place("p1", 5) PNserv.create_place("p2", 0) PNserv.create_transition("t1") PNserv.create_p2t("p1", "t1", 1) PNserv.create_t2p("t1", "p2", 1) cf = Conformance(s, my_pn, ltm_pn_lola) # cf = Conformance(s, scd, ltm_pn, scd) cf.precompute_structures() cf.match_structures() cf.build_morphisms() print(cf.check_nominal())