conformance.py 34 KB


  1. from services.bottom.V0 import Bottom
  2. from services.primitives.actioncode_type import ActionCode
  3. from uuid import UUID
  4. from state.base import State
  5. from typing import Dict, Tuple, Set, Any, List
  6. from pprint import pprint
  7. import traceback
  8. from concrete_syntax.common import indent
  9. from api.cd import CDAPI
  10. from api.od import ODAPI
  11. import functools
  12. # based on https://stackoverflow.com/a/39381428
  13. # Parses and executes a block of Python code, and returns the eval result of the last statement
  14. import ast
  15. def exec_then_eval(code, _globals, _locals):
  16. block = ast.parse(code, mode='exec')
  17. # assumes last node is an expression
  18. last = ast.Expression(block.body.pop().value)
  19. exec(compile(block, '<string>', mode='exec'), _globals, _locals)
  20. return eval(compile(last, '<string>', mode='eval'), _globals, _locals)
  21. def render_conformance_check_result(error_list):
  22. if len(error_list) == 0:
  23. return "CONFORM"
  24. else:
  25. joined = ''.join(('\n ▸ ' + err for err in error_list))
  26. return f"NOT CONFORM, {len(error_list)} errors:{joined}"
  27. class Conformance:
  28. def __init__(self, state: State, model: UUID, type_model: UUID, constraint_check_subtypes=True):
  29. self.state = state
  30. self.bottom = Bottom(state)
  31. type_model_id = state.read_dict(state.read_root(), "SCD")
  32. self.scd_model = UUID(state.read_value(type_model_id))
  33. self.model = model
  34. self.type_model = type_model
  35. self.constraint_check_subtypes = constraint_check_subtypes # for a class-level constraint, also check the constraint on the subtypes of that class? In other words, are constraints inherited.
  36. self.type_mapping: Dict[str, str] = {}
  37. self.model_names = {
  38. # map model elements to their names to prevent iterating too much
  39. self.bottom.read_outgoing_elements(self.model, e)[0]: e
  40. for e in self.bottom.read_keys(self.model)
  41. }
  42. self.type_model_names = {
  43. # map type model elements to their names to prevent iterating too much
  44. self.bottom.read_outgoing_elements(self.type_model, e)[0]
  45. : e for e in self.bottom.read_keys(self.type_model)
  46. }
  47. self.sub_types: Dict[str, Set[str]] = {
  48. k: set() for k in self.bottom.read_keys(self.type_model)
  49. }
  50. self.primitive_values: Dict[UUID, Any] = {}
  51. self.abstract_types: List[str] = []
  52. self.multiplicities: Dict[str, Tuple] = {}
  53. self.source_multiplicities: Dict[str, Tuple] = {}
  54. self.target_multiplicities: Dict[str, Tuple] = {}
  55. self.structures = {}
  56. self.matches = {}
  57. self.candidates = {}
  58. self.odapi = ODAPI(state, model, type_model)
  59. def check_nominal(self, *, log=False):
  60. """
  61. Perform a nominal conformance check
  62. Args:
  63. log: boolean indicating whether to log errors
  64. Returns:
  65. Boolean indicating whether the check has passed
  66. """
  67. errors = []
  68. errors += self.check_typing()
  69. errors += self.check_link_typing()
  70. errors += self.check_multiplicities()
  71. errors += self.check_constraints()
  72. return errors
  73. # def check_structural(self, *, build_morphisms=True, log=False):
  74. # """
  75. # Perform a structural conformance check
  76. # Args:
  77. # build_morphisms: boolean indicating whether to create morpishm links
  78. # log: boolean indicating whether to log errors
  79. # Returns:
  80. # Boolean indicating whether the check has passed
  81. # """
  82. # try:
  83. # self.precompute_structures()
  84. # self.match_structures()
  85. # if build_morphisms:
  86. # self.build_morphisms()
  87. # self.check_nominal(log=log)
  88. # return True
  89. # except RuntimeError as e:
  90. # if log:
  91. # print(e)
  92. # return False
  93. def read_attribute(self, element: UUID, attr_name: str):
  94. """
  95. Read an attribute value attached to an element
  96. Args:
  97. element: UUID of the element
  98. attr_name: name of the attribute to read
  99. Returns:
  100. The value of hte attribute, if no attribute with given name is found, returns None
  101. """
  102. if element in self.type_model_names:
  103. # type model element
  104. element_name = self.type_model_names[element]
  105. model = self.type_model
  106. else:
  107. # model element
  108. element_name = self.model_names[element]
  109. model = self.model
  110. try:
  111. attr_elem, = self.bottom.read_outgoing_elements(model, f"{element_name}.{attr_name}")
  112. return self.primitive_values.get(attr_elem, self.bottom.read_value(UUID(self.bottom.read_value(attr_elem))))
  113. except ValueError:
  114. return None
  115. def precompute_sub_types(self):
  116. """
  117. Creates an internal representation of sub-type hierarchies that is
  118. more easily queryable that the state graph
  119. """
  120. # collect inheritance link instances
  121. inh_element, = self.bottom.read_outgoing_elements(self.scd_model, "Inheritance")
  122. inh_links = []
  123. for tm_element, tm_name in self.type_model_names.items():
  124. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  125. if inh_element in morphisms:
  126. # we have an instance of an inheritance link
  127. inh_links.append(tm_element)
  128. # for each inheritance link we add the parent and child to the sub types map
  129. for link in inh_links:
  130. tm_source = self.bottom.read_edge_source(link)
  131. tm_target = self.bottom.read_edge_target(link)
  132. parent_name = self.type_model_names[tm_target]
  133. child_name = self.type_model_names[tm_source]
  134. self.sub_types[parent_name].add(child_name)
  135. # iteratively expand the sub type hierarchies in the sub types map
  136. stop = False
  137. while not stop:
  138. stop = True
  139. for child_name, child_children in self.sub_types.items():
  140. for parent_name, parent_children in self.sub_types.items():
  141. if child_name in parent_children:
  142. original_size = len(parent_children)
  143. parent_children.update(child_children)
  144. if len(parent_children) != original_size:
  145. stop = False
  146. def deref_primitive_values(self):
  147. """
  148. Prefetch the values stored in referenced primitive type models
  149. """
  150. ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
  151. string_element, = self.bottom.read_outgoing_elements(self.scd_model, "String")
  152. boolean_element, = self.bottom.read_outgoing_elements(self.scd_model, "Boolean")
  153. integer_element, = self.bottom.read_outgoing_elements(self.scd_model, "Integer")
  154. t_deref = []
  155. t_refs = []
  156. for tm_element, tm_name in self.type_model_names.items():
  157. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  158. if ref_element in morphisms:
  159. t_refs.append(self.type_model_names[tm_element])
  160. elif string_element in morphisms:
  161. t_deref.append(tm_element)
  162. elif boolean_element in morphisms:
  163. t_deref.append(tm_element)
  164. elif integer_element in morphisms:
  165. t_deref.append(tm_element)
  166. for elem in t_deref:
  167. primitive_model = UUID(self.bottom.read_value(elem))
  168. primitive_value_node, = self.bottom.read_outgoing_elements(primitive_model)
  169. primitive_value = self.bottom.read_value(primitive_value_node)
  170. self.primitive_values[elem] = primitive_value
  171. for m_name, tm_name in self.type_mapping.items():
  172. if tm_name in t_refs:
  173. # dereference
  174. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  175. primitive_model = UUID(self.bottom.read_value(m_element))
  176. try:
  177. primitive_value_node, = self.bottom.read_outgoing_elements(primitive_model)
  178. primitive_value = self.bottom.read_value(primitive_value_node)
  179. self.primitive_values[m_element] = primitive_value
  180. except ValueError:
  181. pass # multiple elements in model indicate that we're not dealing with a primitive
  182. def precompute_multiplicities(self):
  183. """
  184. Creates an internal representation of type multiplicities that is
  185. more easily queryable that the state graph
  186. """
  187. for tm_element, tm_name in self.type_model_names.items():
  188. # class abstract flags and multiplicities
  189. abstract = self.read_attribute(tm_element, "abstract")
  190. lc = self.read_attribute(tm_element, "lower_cardinality")
  191. uc = self.read_attribute(tm_element, "upper_cardinality")
  192. if abstract:
  193. self.abstract_types.append(tm_name)
  194. if lc or uc:
  195. mult = (
  196. lc if lc != None else float("-inf"),
  197. uc if uc != None else float("inf")
  198. )
  199. self.multiplicities[tm_name] = mult
  200. # multiplicities for associations
  201. slc = self.read_attribute(tm_element, "source_lower_cardinality")
  202. suc = self.read_attribute(tm_element, "source_upper_cardinality")
  203. if slc or suc:
  204. mult = (
  205. # slc if slc != None else float("-inf"),
  206. slc if slc != None else 0,
  207. suc if suc != None else float("inf")
  208. )
  209. self.source_multiplicities[tm_name] = mult
  210. tlc = self.read_attribute(tm_element, "target_lower_cardinality")
  211. tuc = self.read_attribute(tm_element, "target_upper_cardinality")
  212. if tlc or tuc:
  213. mult = (
  214. # tlc if tlc != None else float("-inf"),
  215. tlc if tlc != None else 0,
  216. tuc if tuc != None else float("inf")
  217. )
  218. self.target_multiplicities[tm_name] = mult
  219. # optional for attribute links
  220. opt = self.read_attribute(tm_element, "optional")
  221. if opt != None:
  222. self.source_multiplicities[tm_name] = (0, float('inf'))
  223. self.target_multiplicities[tm_name] = (0 if opt else 1, 1)
  224. def get_type(self, element: UUID):
  225. """
  226. Retrieve the type of an element (wrt. current type model)
  227. """
  228. morphisms = self.bottom.read_outgoing_elements(element, "Morphism")
  229. tm_element, = [m for m in morphisms if m in self.type_model_names.keys()]
  230. return tm_element
  231. def check_typing(self):
  232. """
  233. for each element of model check whether a morphism
  234. link exists to some element of type_model
  235. """
  236. errors = []
  237. ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
  238. model_names = self.bottom.read_keys(self.model)
  239. for m_name in model_names:
  240. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  241. try:
  242. tm_element = self.get_type(m_element)
  243. tm_name = self.type_model_names[tm_element]
  244. self.type_mapping[m_name] = tm_name
  245. if ref_element in self.bottom.read_outgoing_elements(tm_element, "Morphism"):
  246. sub_m = UUID(self.bottom.read_value(m_element))
  247. sub_tm = UUID(self.bottom.read_value(tm_element))
  248. nested_errors = Conformance(self.state, sub_m, sub_tm).check_nominal()
  249. errors += [f"In ModelRef ({m_name}):" + err for err in nested_errors]
  250. except ValueError as e:
  251. import traceback
  252. traceback.format_exc(e)
  253. # no or too many morphism links found
  254. errors.append(f"Incorrectly typed element: '{m_name}'")
  255. return errors
  256. def check_link_typing(self):
  257. """
  258. for each link, check whether its source and target are of a valid type
  259. """
  260. errors = []
  261. self.precompute_sub_types()
  262. for m_name, tm_name in self.type_mapping.items():
  263. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  264. m_source = self.bottom.read_edge_source(m_element)
  265. m_target = self.bottom.read_edge_target(m_element)
  266. if m_source == None or m_target == None:
  267. # element is not a link
  268. continue
  269. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  270. tm_source = self.bottom.read_edge_source(tm_element)
  271. tm_target = self.bottom.read_edge_target(tm_element)
  272. # check if source is typed correctly
  273. source_name = self.model_names[m_source]
  274. source_type_actual = self.type_mapping[source_name]
  275. source_type_expected = self.type_model_names[tm_source]
  276. if source_type_actual != source_type_expected:
  277. if source_type_actual not in self.sub_types[source_type_expected]:
  278. errors.append(f"Invalid source type '{source_type_actual}' for element '{m_name}'")
  279. # check if target is typed correctly
  280. target_name = self.model_names[m_target]
  281. target_type_actual = self.type_mapping[target_name]
  282. target_type_expected = self.type_model_names[tm_target]
  283. if target_type_actual != target_type_expected:
  284. if target_type_actual not in self.sub_types[target_type_expected]:
  285. errors.append(f"Invalid target type '{target_type_actual}' for element '{m_name}'")
  286. return errors
  287. def check_multiplicities(self):
  288. """
  289. Check whether multiplicities for all types are respected
  290. """
  291. self.deref_primitive_values()
  292. self.precompute_multiplicities()
  293. errors = []
  294. for type_name in self.type_model_names.values():
  295. # abstract classes
  296. if type_name in self.abstract_types:
  297. type_count = list(self.type_mapping.values()).count(type_name)
  298. if type_count > 0:
  299. errors.append(f"Invalid instantiation of abstract class: '{type_name}'")
  300. # class multiplicities
  301. if type_name in self.multiplicities:
  302. lc, uc = self.multiplicities[type_name]
  303. type_count = list(self.type_mapping.values()).count(type_name)
  304. for sub_type in self.sub_types[type_name]:
  305. type_count += list(self.type_mapping.values()).count(sub_type)
  306. if type_count < lc or type_count > uc:
  307. errors.append(f"Cardinality of type exceeds valid multiplicity range: '{type_name}' ({type_count})")
  308. # association/attribute source multiplicities
  309. if type_name in self.source_multiplicities:
  310. # type is an association
  311. type_obj, = self.bottom.read_outgoing_elements(self.type_model, type_name)
  312. tgt_type_obj = self.bottom.read_edge_target(type_obj)
  313. tgt_type_name = self.type_model_names[tgt_type_obj]
  314. lc, uc = self.source_multiplicities[type_name]
  315. for obj_name, obj_type_name in self.type_mapping.items():
  316. if obj_type_name == tgt_type_name or obj_type_name in self.sub_types[tgt_type_name]:
  317. # obj's type has this incoming association -> now we will count the number of links typed by it
  318. count = 0
  319. obj, = self.bottom.read_outgoing_elements(self.model, obj_name)
  320. incoming = self.bottom.read_incoming_edges(obj)
  321. for i in incoming:
  322. try:
  323. type_of_incoming_link = self.type_mapping[self.model_names[i]]
  324. if type_of_incoming_link == type_name or type_of_incoming_link in self.sub_types[type_name]:
  325. count += 1
  326. except KeyError:
  327. pass # for elements not part of model, e.g. morphism links
  328. if count < lc or count > uc:
  329. errors.append(f"Source cardinality of type '{type_name}' ({count}) out of bounds ({lc}..{uc}) in '{obj_name}'.")
  330. # association/attribute target multiplicities
  331. if type_name in self.target_multiplicities:
  332. # type is an association
  333. type_obj, = self.bottom.read_outgoing_elements(self.type_model, type_name)
  334. src_type_obj = self.bottom.read_edge_source(type_obj)
  335. src_type_name = self.type_model_names[src_type_obj]
  336. lc, uc = self.target_multiplicities[type_name]
  337. for obj_name, obj_type_name in self.type_mapping.items():
  338. if obj_type_name == src_type_name or obj_type_name in self.sub_types[src_type_name]:
  339. # obj's type has this outgoing association -> now we will count the number of links typed by it
  340. count = 0
  341. obj, = self.bottom.read_outgoing_elements(self.model, obj_name)
  342. outgoing = self.bottom.read_outgoing_edges(obj)
  343. for o in outgoing:
  344. try:
  345. type_of_outgoing_link = self.type_mapping[self.model_names[o]]
  346. if type_of_outgoing_link == type_name or type_of_outgoing_link in self.sub_types[type_name]:
  347. count += 1
  348. except KeyError:
  349. pass # for elements not part of model, e.g. morphism links
  350. if count < lc or count > uc:
  351. errors.append(f"Target cardinality of type '{type_name}' ({count}) out of bounds ({lc}..{uc}) in '{obj_name}'.")
  352. return errors
  353. def evaluate_constraint(self, code, **kwargs):
  354. """
  355. Evaluate constraint code (Python code)
  356. """
  357. funcs = {
  358. 'read_value': self.state.read_value,
  359. 'get': self.odapi.get,
  360. 'get_value': self.odapi.get_value,
  361. 'get_target': self.odapi.get_target,
  362. 'get_source': self.odapi.get_source,
  363. 'get_slot': self.odapi.get_slot,
  364. 'get_slot_value': self.odapi.get_slot_value,
  365. 'get_all_instances': self.odapi.get_all_instances,
  366. 'get_name': self.odapi.get_name,
  367. 'get_type_name': self.odapi.get_type_name,
  368. 'get_outgoing': self.odapi.get_outgoing,
  369. 'get_incoming': self.odapi.get_incoming,
  370. 'has_slot': self.odapi.has_slot,
  371. }
  372. # print("evaluating constraint ...", code)
  373. loc = {**kwargs, }
  374. result = exec_then_eval(
  375. code,
  376. {'__builtins__': {'isinstance': isinstance, 'print': print,
  377. 'int': int, 'float': float, 'bool': bool, 'str': str, 'tuple': tuple, 'len': len, 'set': set, 'dict': dict},
  378. **funcs
  379. }, # globals
  380. loc # locals
  381. )
  382. return result
  383. def check_constraints(self):
  384. """
  385. Check whether all constraints defined for a model are respected
  386. """
  387. errors = []
  388. def get_code(tm_name):
  389. constraints = self.bottom.read_outgoing_elements(self.type_model, f"{tm_name}.constraint")
  390. if len(constraints) == 1:
  391. constraint = constraints[0]
  392. code = ActionCode(UUID(self.bottom.read_value(constraint)), self.bottom.state).read()
  393. return code
  394. def check_result(result, description):
  395. if result == None:
  396. return # OK
  397. if isinstance(result, str):
  398. errors.append(f"{description} not satisfied. Reason: {result}")
  399. elif isinstance(result, bool):
  400. if not result:
  401. errors.append(f"{description} not satisfied.")
  402. elif isinstance(result, list):
  403. if len(result) > 0:
  404. reasons = indent('\n'.join(result), 4)
  405. errors.append(f"{description} not satisfied. Reasons:\n{reasons}")
  406. else:
  407. raise Exception(f"{description} evaluation result should be boolean or string! Instead got {result}")
  408. # local constraints
  409. for type_name in self.bottom.read_keys(self.type_model):
  410. code = get_code(type_name)
  411. if code != None:
  412. instances = self.odapi.get_all_instances(type_name, include_subtypes=self.constraint_check_subtypes)
  413. for obj_name, obj_id in instances:
  414. description = f"Local constraint of \"{type_name}\" in \"{obj_name}\""
  415. # print(description)
  416. try:
  417. result = self.evaluate_constraint(code, this=obj_id) # may raise
  418. check_result(result, description)
  419. except:
  420. errors.append(f"Runtime error during evaluation of {description}:\n{indent(traceback.format_exc(), 6)}")
  421. # global constraints
  422. glob_constraints = []
  423. # find global constraints...
  424. glob_constraint_type, = self.bottom.read_outgoing_elements(self.scd_model, "GlobalConstraint")
  425. for tm_name in self.bottom.read_keys(self.type_model):
  426. tm_node, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  427. # print(key, node)
  428. for type_of_node in self.bottom.read_outgoing_elements(tm_node, "Morphism"):
  429. if type_of_node == glob_constraint_type:
  430. # node is GlobalConstraint
  431. glob_constraints.append(tm_name)
  432. # evaluate them (each constraint once)
  433. for tm_name in glob_constraints:
  434. code = get_code(tm_name)
  435. if code != None:
  436. description = f"Global constraint \"{tm_name}\""
  437. try:
  438. result = self.evaluate_constraint(code, model=self.model)
  439. except:
  440. errors.append(f"Runtime error during evaluation of {description}:\n{indent(traceback.format_exc(), 6)}")
  441. check_result(result, description)
  442. return errors
  443. def precompute_structures(self):
  444. """
  445. Make an internal representation of type structures such that comparing type structures is easier
  446. """
  447. self.precompute_sub_types()
  448. scd_elements = self.bottom.read_outgoing_elements(self.scd_model)
  449. # collect types
  450. class_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class")
  451. association_element, = self.bottom.read_outgoing_elements(self.scd_model, "Association")
  452. for tm_element, tm_name in self.type_model_names.items():
  453. # retrieve elements that tm_element is a morphism of
  454. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  455. morphism, = [m for m in morphisms if m in scd_elements]
  456. # check if tm_element is a morphism of AttributeLink
  457. if class_element == morphism or association_element == morphism:
  458. self.structures[tm_name] = set()
  459. # collect type structures
  460. # retrieve AttributeLink to check whether element is a morphism of AttributeLink
  461. attr_link_element, = self.bottom.read_outgoing_elements(self.scd_model, "AttributeLink")
  462. for tm_element, tm_name in self.type_model_names.items():
  463. # retrieve elements that tm_element is a morphism of
  464. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  465. morphism, = [m for m in morphisms if m in scd_elements]
  466. # check if tm_element is a morphism of AttributeLink
  467. if attr_link_element == morphism:
  468. # retrieve attributes of attribute link, i.e. 'name' and 'optional'
  469. attrs = self.bottom.read_outgoing_elements(tm_element)
  470. name_model_node, = filter(lambda x: self.type_model_names.get(x, "").endswith(".name"), attrs)
  471. opt_model_node, = filter(lambda x: self.type_model_names.get(x, "").endswith(".optional"), attrs)
  472. # get attr name value
  473. name_model = UUID(self.bottom.read_value(name_model_node))
  474. name_node, = self.bottom.read_outgoing_elements(name_model)
  475. name = self.bottom.read_value(name_node)
  476. # get attr opt value
  477. opt_model = UUID(self.bottom.read_value(opt_model_node))
  478. opt_node, = self.bottom.read_outgoing_elements(opt_model)
  479. opt = self.bottom.read_value(opt_node)
  480. # get attr type name
  481. source_type_node = self.bottom.read_edge_source(tm_element)
  482. source_type_name = self.type_model_names[source_type_node]
  483. target_type_node = self.bottom.read_edge_target(tm_element)
  484. target_type_name = self.type_model_names[target_type_node]
  485. # add attribute to the structure of its source type
  486. # attribute is stored as a (name, optional, type) triple
  487. self.structures.setdefault(source_type_name, set()).add((name, opt, target_type_name))
  488. # extend structures of sub types with attrs of super types
  489. for super_type, sub_types in self.sub_types.items():
  490. for sub_type in sub_types:
  491. self.structures.setdefault(sub_type, set()).update(self.structures[super_type])
  492. # filter out abstract types, as they cannot be instantiated
  493. # retrieve Class_abstract to check whether element is a morphism of Class_abstract
  494. class_abs_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class_abstract")
  495. for tm_element, tm_name in self.type_model_names.items():
  496. # retrieve elements that tm_element is a morphism of
  497. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  498. morphism, = [m for m in morphisms if m in scd_elements]
  499. # check if tm_element is a morphism of Class_abstract
  500. if class_abs_element == morphism:
  501. # retrieve 'abstract' attribute value
  502. target_node = self.bottom.read_edge_target(tm_element)
  503. abst_model = UUID(self.bottom.read_value(target_node))
  504. abst_node, = self.bottom.read_outgoing_elements(abst_model)
  505. is_abstract = self.bottom.read_value(abst_node)
  506. # retrieve type name
  507. source_node = self.bottom.read_edge_source(tm_element)
  508. type_name = self.type_model_names[source_node]
  509. if is_abstract:
  510. self.structures.pop(type_name)
  511. def match_structures(self):
  512. """
  513. Try to match the structure of each element in the instance model to some element in the type model
  514. """
  515. ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
  516. # matching
  517. for m_element, m_name in self.model_names.items():
  518. is_edge = self.bottom.read_edge_source(m_element) != None
  519. print('element:', m_element, 'name:', m_name, 'is_edge', is_edge)
  520. for type_name, structure in self.structures.items():
  521. tm_element, = self.bottom.read_outgoing_elements(self.type_model, type_name)
  522. type_is_edge = self.bottom.read_edge_source(tm_element) != None
  523. if is_edge == type_is_edge:
  524. print(' type_name:', type_name, 'type_is_edge:', type_is_edge, "structure:", structure)
  525. mismatch = False
  526. matched = 0
  527. for name, optional, attr_type in structure:
  528. print(' name:', name, "optional:", optional, "attr_type:", attr_type)
  529. try:
  530. attr, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{name}")
  531. attr_tm, = self.bottom.read_outgoing_elements(self.type_model, attr_type)
  532. # if attribute is a modelref, we need to check whether it
  533. # linguistically conforms to the specified type
  534. # if its an internally defined attribute, this will be checked by constraints
  535. morphisms = self.bottom.read_outgoing_elements(attr_tm, "Morphism")
  536. attr_conforms = True
  537. if ref_element in morphisms:
  538. # check conformance of reference model
  539. type_model_uuid = UUID(self.bottom.read_value(attr_tm))
  540. model_uuid = UUID(self.bottom.read_value(attr))
  541. attr_conforms = Conformance(self.state, model_uuid, type_model_uuid)\
  542. .check_nominal()
  543. else:
  544. # eval constraints
  545. code = self.read_attribute(attr_tm, "constraint")
  546. if code != None:
  547. attr_conforms = self.evaluate_constraint(code, this=attr)
  548. if attr_conforms:
  549. matched += 1
  550. print(" attr_conforms -> matched:", matched)
  551. except ValueError as e:
  552. # attr not found or failed parsing UUID
  553. if optional:
  554. print(" skipping:", e)
  555. continue
  556. else:
  557. # did not match mandatory attribute
  558. print(" breaking:", e)
  559. mismatch = True
  560. break
  561. print(' matched:', matched, 'len(structure):', len(structure))
  562. # if matched == len(structure):
  563. if not mismatch:
  564. print(' add to candidates:', m_name, type_name)
  565. self.candidates.setdefault(m_name, set()).add(type_name)
  566. # filter out candidates for links based on source and target types
  567. for m_element, m_name in self.model_names.items():
  568. is_edge = self.bottom.read_edge_source(m_element) != None
  569. if is_edge and m_name in self.candidates:
  570. m_source = self.bottom.read_edge_source(m_element)
  571. m_target = self.bottom.read_edge_target(m_element)
  572. print(self.candidates)
  573. source_candidates = self.candidates[self.model_names[m_source]]
  574. target_candidates = self.candidates[self.model_names[m_target]]
  575. remove = set()
  576. for candidate_name in self.candidates[m_name]:
  577. candidate_element, = self.bottom.read_outgoing_elements(self.type_model, candidate_name)
  578. candidate_source = self.type_model_names[self.bottom.read_edge_source(candidate_element)]
  579. if candidate_source not in source_candidates:
  580. if len(source_candidates.intersection(set(self.sub_types[candidate_source]))) == 0:
  581. remove.add(candidate_name)
  582. candidate_target = self.type_model_names[self.bottom.read_edge_target(candidate_element)]
  583. if candidate_target not in target_candidates:
  584. if len(target_candidates.intersection(set(self.sub_types[candidate_target]))) == 0:
  585. remove.add(candidate_name)
  586. self.candidates[m_name] = self.candidates[m_name].difference(remove)
  587. def build_morphisms(self):
  588. """
  589. Build the morphisms between an instance and a type model that structurally match
  590. """
  591. if not all([len(c) == 1 for c in self.candidates.values()]):
  592. raise RuntimeError("Cannot build incomplete or ambiguous morphism.")
  593. mapping = {k: v.pop() for k, v in self.candidates.items()}
  594. for m_name, tm_name in mapping.items():
  595. # morphism to class/assoc
  596. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  597. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  598. self.bottom.create_edge(m_element, tm_element, "Morphism")
  599. # morphism for attributes and attribute links
  600. structure = self.structures[tm_name]
  601. for attr_name, _, attr_type in structure:
  602. try:
  603. # attribute node
  604. attr_element, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{attr_name}")
  605. attr_type_element, = self.bottom.read_outgoing_elements(self.type_model, attr_type)
  606. self.bottom.create_edge(attr_element, attr_type_element, "Morphism")
  607. # attribute link
  608. attr_link_element, = self.bottom.read_outgoing_elements(self.model, f"{m_name}_{attr_name}")
  609. attr_link_type_element, = self.bottom.read_outgoing_elements(self.type_model, f"{tm_name}_{attr_name}")
  610. self.bottom.create_edge(attr_link_element, attr_link_type_element, "Morphism")
  611. except ValueError:
  612. pass
  613. if __name__ == '__main__':
  614. from state.devstate import DevState as State
  615. s = State()
  616. from bootstrap.scd import bootstrap_scd
  617. scd = bootstrap_scd(s)
  618. from bootstrap.pn import bootstrap_pn
  619. ltm_pn = bootstrap_pn(s, "PN")
  620. ltm_pn_lola = bootstrap_pn(s, "PNlola")
  621. from services.pn import PN
  622. my_pn = s.create_node()
  623. PNserv = PN(my_pn, s)
  624. PNserv.create_place("p1", 5)
  625. PNserv.create_place("p2", 0)
  626. PNserv.create_transition("t1")
  627. PNserv.create_p2t("p1", "t1", 1)
  628. PNserv.create_t2p("t1", "p2", 1)
  629. cf = Conformance(s, my_pn, ltm_pn_lola)
  630. # cf = Conformance(s, scd, ltm_pn, scd)
  631. cf.precompute_structures()
  632. cf.match_structures()
  633. cf.build_morphisms()
  634. print(cf.check_nominal())