conformance.py 34 KB


  1. from services.bottom.V0 import Bottom
  2. from services import od
  3. from services.primitives.actioncode_type import ActionCode
  4. from uuid import UUID
  5. from state.base import State
  6. from typing import Dict, Tuple, Set, Any, List
  7. from pprint import pprint
  8. import functools
  9. # based on https://stackoverflow.com/a/39381428
  10. # Parses and executes a block of Python code, and returns the eval result of the last statement
  11. import ast
  12. def exec_then_eval(code, _globals, _locals):
  13. block = ast.parse(code, mode='exec')
  14. # assumes last node is an expression
  15. last = ast.Expression(block.body.pop().value)
  16. exec(compile(block, '<string>', mode='exec'), _globals, _locals)
  17. return eval(compile(last, '<string>', mode='eval'), _globals, _locals)
  18. def render_conformance_check_result(error_list):
  19. if len(error_list) == 0:
  20. return "OK"
  21. else:
  22. return f"There were {len(error_list)} errors: \n {'\n '.join(error_list)}"
  23. class Conformance:
  24. def __init__(self, state: State, model: UUID, type_model: UUID):
  25. self.state = state
  26. self.bottom = Bottom(state)
  27. type_model_id = state.read_dict(state.read_root(), "SCD")
  28. self.scd_model = UUID(state.read_value(type_model_id))
  29. self.model = model
  30. self.type_model = type_model
  31. self.type_mapping: Dict[str, str] = {}
  32. self.model_names = {
  33. # map model elements to their names to prevent iterating too much
  34. self.bottom.read_outgoing_elements(self.model, e)[0]: e
  35. for e in self.bottom.read_keys(self.model)
  36. }
  37. self.type_model_names = {
  38. # map type model elements to their names to prevent iterating too much
  39. self.bottom.read_outgoing_elements(self.type_model, e)[0]
  40. : e for e in self.bottom.read_keys(self.type_model)
  41. }
  42. self.sub_types: Dict[str, Set[str]] = {
  43. k: set() for k in self.bottom.read_keys(self.type_model)
  44. }
  45. self.primitive_values: Dict[UUID, Any] = {}
  46. self.abstract_types: List[str] = []
  47. self.multiplicities: Dict[str, Tuple] = {}
  48. self.source_multiplicities: Dict[str, Tuple] = {}
  49. self.target_multiplicities: Dict[str, Tuple] = {}
  50. self.structures = {}
  51. self.matches = {}
  52. self.candidates = {}
  53. def check_nominal(self, *, log=False):
  54. """
  55. Perform a nominal conformance check
  56. Args:
  57. log: boolean indicating whether to log errors
  58. Returns:
  59. Boolean indicating whether the check has passed
  60. """
  61. errors = []
  62. errors += self.check_typing()
  63. errors += self.check_link_typing()
  64. errors += self.check_multiplicities()
  65. errors += self.check_constraints()
  66. return errors
  67. # def check_structural(self, *, build_morphisms=True, log=False):
  68. # """
  69. # Perform a structural conformance check
  70. # Args:
  71. # build_morphisms: boolean indicating whether to create morpishm links
  72. # log: boolean indicating whether to log errors
  73. # Returns:
  74. # Boolean indicating whether the check has passed
  75. # """
  76. # try:
  77. # self.precompute_structures()
  78. # self.match_structures()
  79. # if build_morphisms:
  80. # self.build_morphisms()
  81. # self.check_nominal(log=log)
  82. # return True
  83. # except RuntimeError as e:
  84. # if log:
  85. # print(e)
  86. # return False
  87. def read_attribute(self, element: UUID, attr_name: str):
  88. """
  89. Read an attribute value attached to an element
  90. Args:
  91. element: UUID of the element
  92. attr_name: name of the attribute to read
  93. Returns:
  94. The value of hte attribute, if no attribute with given name is found, returns None
  95. """
  96. if element in self.type_model_names:
  97. # type model element
  98. element_name = self.type_model_names[element]
  99. model = self.type_model
  100. else:
  101. # model element
  102. element_name = self.model_names[element]
  103. model = self.model
  104. try:
  105. attr_elem, = self.bottom.read_outgoing_elements(model, f"{element_name}.{attr_name}")
  106. return self.primitive_values.get(attr_elem, self.bottom.read_value(UUID(self.bottom.read_value(attr_elem))))
  107. except ValueError:
  108. return None
  109. def precompute_sub_types(self):
  110. """
  111. Creates an internal representation of sub-type hierarchies that is
  112. more easily queryable that the state graph
  113. """
  114. # collect inheritance link instances
  115. inh_element, = self.bottom.read_outgoing_elements(self.scd_model, "Inheritance")
  116. inh_links = []
  117. for tm_element, tm_name in self.type_model_names.items():
  118. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  119. if inh_element in morphisms:
  120. # we have an instance of an inheritance link
  121. inh_links.append(tm_element)
  122. # for each inheritance link we add the parent and child to the sub types map
  123. for link in inh_links:
  124. tm_source = self.bottom.read_edge_source(link)
  125. tm_target = self.bottom.read_edge_target(link)
  126. parent_name = self.type_model_names[tm_target]
  127. child_name = self.type_model_names[tm_source]
  128. self.sub_types[parent_name].add(child_name)
  129. # iteratively expand the sub type hierarchies in the sub types map
  130. stop = False
  131. while not stop:
  132. stop = True
  133. for child_name, child_children in self.sub_types.items():
  134. for parent_name, parent_children in self.sub_types.items():
  135. if child_name in parent_children:
  136. original_size = len(parent_children)
  137. parent_children.update(child_children)
  138. if len(parent_children) != original_size:
  139. stop = False
  140. def deref_primitive_values(self):
  141. """
  142. Prefetch the values stored in referenced primitive type models
  143. """
  144. ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
  145. string_element, = self.bottom.read_outgoing_elements(self.scd_model, "String")
  146. boolean_element, = self.bottom.read_outgoing_elements(self.scd_model, "Boolean")
  147. integer_element, = self.bottom.read_outgoing_elements(self.scd_model, "Integer")
  148. t_deref = []
  149. t_refs = []
  150. for tm_element, tm_name in self.type_model_names.items():
  151. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  152. if ref_element in morphisms:
  153. t_refs.append(self.type_model_names[tm_element])
  154. elif string_element in morphisms:
  155. t_deref.append(tm_element)
  156. elif boolean_element in morphisms:
  157. t_deref.append(tm_element)
  158. elif integer_element in morphisms:
  159. t_deref.append(tm_element)
  160. for elem in t_deref:
  161. primitive_model = UUID(self.bottom.read_value(elem))
  162. primitive_value_node, = self.bottom.read_outgoing_elements(primitive_model)
  163. primitive_value = self.bottom.read_value(primitive_value_node)
  164. self.primitive_values[elem] = primitive_value
  165. for m_name, tm_name in self.type_mapping.items():
  166. if tm_name in t_refs:
  167. # dereference
  168. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  169. primitive_model = UUID(self.bottom.read_value(m_element))
  170. try:
  171. primitive_value_node, = self.bottom.read_outgoing_elements(primitive_model)
  172. primitive_value = self.bottom.read_value(primitive_value_node)
  173. self.primitive_values[m_element] = primitive_value
  174. except ValueError:
  175. pass # multiple elements in model indicate that we're not dealing with a primitive
  176. def precompute_multiplicities(self):
  177. """
  178. Creates an internal representation of type multiplicities that is
  179. more easily queryable that the state graph
  180. """
  181. for tm_element, tm_name in self.type_model_names.items():
  182. # class abstract flags and multiplicities
  183. abstract = self.read_attribute(tm_element, "abstract")
  184. lc = self.read_attribute(tm_element, "lower_cardinality")
  185. uc = self.read_attribute(tm_element, "upper_cardinality")
  186. if abstract:
  187. self.abstract_types.append(tm_name)
  188. if lc or uc:
  189. mult = (
  190. lc if lc != None else float("-inf"),
  191. uc if uc != None else float("inf")
  192. )
  193. self.multiplicities[tm_name] = mult
  194. # multiplicities for associations
  195. slc = self.read_attribute(tm_element, "source_lower_cardinality")
  196. suc = self.read_attribute(tm_element, "source_upper_cardinality")
  197. if slc or suc:
  198. mult = (
  199. slc if slc != None else float("-inf"),
  200. suc if suc != None else float("inf")
  201. )
  202. self.source_multiplicities[tm_name] = mult
  203. tlc = self.read_attribute(tm_element, "target_lower_cardinality")
  204. tuc = self.read_attribute(tm_element, "target_upper_cardinality")
  205. if tlc or tuc:
  206. mult = (
  207. tlc if tlc != None else float("-inf"),
  208. tuc if tuc != None else float("inf")
  209. )
  210. self.target_multiplicities[tm_name] = mult
  211. # optional for attribute links
  212. opt = self.read_attribute(tm_element, "optional")
  213. if opt != None:
  214. self.source_multiplicities[tm_name] = (0, float('inf'))
  215. self.target_multiplicities[tm_name] = (0 if opt else 1, 1)
  216. def get_type(self, element: UUID):
  217. """
  218. Retrieve the type of an element (wrt. current type model)
  219. """
  220. morphisms = self.bottom.read_outgoing_elements(element, "Morphism")
  221. tm_element, = [m for m in morphisms if m in self.type_model_names.keys()]
  222. return tm_element
  223. def check_typing(self):
  224. """
  225. for each element of model check whether a morphism
  226. link exists to some element of type_model
  227. """
  228. errors = []
  229. ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
  230. model_names = self.bottom.read_keys(self.model)
  231. for m_name in model_names:
  232. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  233. try:
  234. tm_element = self.get_type(m_element)
  235. tm_name = self.type_model_names[tm_element]
  236. self.type_mapping[m_name] = tm_name
  237. if ref_element in self.bottom.read_outgoing_elements(tm_element, "Morphism"):
  238. sub_m = UUID(self.bottom.read_value(m_element))
  239. sub_tm = UUID(self.bottom.read_value(tm_element))
  240. nested_errors = Conformance(self.state, sub_m, sub_tm).check_nominal()
  241. errors += [f"In ModelRef ({m_name}):" + err for err in nested_errors]
  242. except ValueError as e:
  243. import traceback
  244. traceback.format_exc(e)
  245. # no or too many morphism links found
  246. errors.append(f"Incorrectly typed element: {m_name}")
  247. return errors
  248. def check_link_typing(self):
  249. """
  250. for each link, check whether its source and target are of a valid type
  251. """
  252. errors = []
  253. self.precompute_sub_types()
  254. for m_name, tm_name in self.type_mapping.items():
  255. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  256. m_source = self.bottom.read_edge_source(m_element)
  257. m_target = self.bottom.read_edge_target(m_element)
  258. if m_source == None or m_target == None:
  259. # element is not a link
  260. continue
  261. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  262. tm_source = self.bottom.read_edge_source(tm_element)
  263. tm_target = self.bottom.read_edge_target(tm_element)
  264. # check if source is typed correctly
  265. source_name = self.model_names[m_source]
  266. source_type_actual = self.type_mapping[source_name]
  267. source_type_expected = self.type_model_names[tm_source]
  268. if source_type_actual != source_type_expected:
  269. if source_type_actual not in self.sub_types[source_type_expected]:
  270. errors.append(f"Invalid source type {source_type_actual} for element {m_name}")
  271. # check if target is typed correctly
  272. target_name = self.model_names[m_target]
  273. target_type_actual = self.type_mapping[target_name]
  274. target_type_expected = self.type_model_names[tm_target]
  275. if target_type_actual != target_type_expected:
  276. if target_type_actual not in self.sub_types[target_type_expected]:
  277. errors.append(f"Invalid target type {target_type_actual} for element {m_name}")
  278. return errors
  279. def check_multiplicities(self):
  280. """
  281. Check whether multiplicities for all types are respected
  282. """
  283. self.deref_primitive_values()
  284. self.precompute_multiplicities()
  285. errors = []
  286. for tm_name in self.type_model_names.values():
  287. # abstract classes
  288. if tm_name in self.abstract_types:
  289. type_count = list(self.type_mapping.values()).count(tm_name)
  290. if type_count > 0:
  291. errors.append(f"Invalid instantiation of abstract class: {tm_name}")
  292. # class multiplicities
  293. if tm_name in self.multiplicities:
  294. lc, uc = self.multiplicities[tm_name]
  295. type_count = list(self.type_mapping.values()).count(tm_name)
  296. for sub_type in self.sub_types[tm_name]:
  297. type_count += list(self.type_mapping.values()).count(sub_type)
  298. if type_count < lc or type_count > uc:
  299. errors.append(f"Cardinality of type exceeds valid multiplicity range: {tm_name} ({type_count})")
  300. # association source multiplicities
  301. if tm_name in self.source_multiplicities:
  302. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  303. tm_tgt_element = self.bottom.read_edge_target(tm_element)
  304. tm_tgt_name = self.type_model_names[tm_tgt_element]
  305. lc, uc = self.source_multiplicities[tm_name]
  306. for tgt_obj_name, t in self.type_mapping.items():
  307. if t == tm_tgt_name or t in self.sub_types[tm_tgt_name]:
  308. count = 0
  309. tgt_obj_node, = self.bottom.read_outgoing_elements(self.model, tgt_obj_name)
  310. incoming = self.bottom.read_incoming_edges(tgt_obj_node)
  311. for i in incoming:
  312. try:
  313. if self.type_mapping[self.model_names[i]] == tm_name:
  314. count += 1
  315. except KeyError:
  316. pass # for elements not part of model, e.g. morphism links
  317. if count < lc or count > uc:
  318. errors.append(f"Source cardinality of type {tm_name} ({count}) out of bounds ({lc}..{uc}) in {tgt_obj_name}.")
  319. # association target multiplicities
  320. if tm_name in self.target_multiplicities:
  321. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  322. # tm_target_element = self.bottom.read_edge_target(tm_element)
  323. tm_src_element = self.bottom.read_edge_source(tm_element)
  324. tm_src_name = self.type_model_names[tm_src_element]
  325. lc, uc = self.target_multiplicities[tm_name]
  326. # print("checking assoc", tm_name, "source", tm_src_name)
  327. # print("subtypes of", tm_src_name, self.sub_types[tm_src_name])
  328. for src_obj_name, t in self.type_mapping.items():
  329. if t == tm_src_name or t in self.sub_types[tm_src_name]:
  330. # print("got obj", src_obj_name, "of type", t)
  331. count = 0
  332. src_obj_node, = self.bottom.read_outgoing_elements(self.model, src_obj_name)
  333. # outgoing = self.bottom.read_incoming_edges(src_obj_node)
  334. outgoing = self.bottom.read_outgoing_edges(src_obj_node)
  335. for o in outgoing:
  336. try:
  337. if self.type_mapping[self.model_names[o]] == tm_name:
  338. # print("have an outgoing edge", self.model_names[o], self.type_mapping[self.model_names[o]], "---> increase counter")
  339. count += 1
  340. except KeyError:
  341. pass # for elements not part of model, e.g. morphism links
  342. if count < lc or count > uc:
  343. errors.append(f"Target cardinality of type {tm_name} ({count}) out of bounds ({lc}..{uc}) in {src_obj_name}.")
  344. # else:
  345. # print(f"OK: Target cardinality of type {tm_name} ({count}) within bounds ({lc}..{uc}) in {src_obj_name}.")
  346. return errors
  347. def evaluate_constraint(self, code, **kwargs):
  348. """
  349. Evaluate constraint code (Python code)
  350. """
  351. funcs = {
  352. 'read_value': self.state.read_value,
  353. 'get_value': lambda el: od.read_primitive_value(self.bottom, el, self.type_model)[0],
  354. 'get_target': lambda el: self.bottom.read_edge_target(el),
  355. 'get_source': lambda el: self.bottom.read_edge_source(el),
  356. 'get_slot': od.OD(self.type_model, self.model, self.state).get_slot,
  357. 'get_all_instances': self.get_all_instances,
  358. 'get_name': lambda el: [name for name in self.bottom.read_keys(self.model) if self.bottom.read_outgoing_elements(self.model, name)[0] == el][0],
  359. 'get_type_name': self.get_type_name,
  360. 'get_outgoing': self.get_outgoing,
  361. 'get_incoming': self.get_incoming,
  362. }
  363. # print("evaluating constraint ...", code)
  364. loc = {**kwargs, }
  365. result = exec_then_eval(
  366. code,
  367. {'__builtins__': {'isinstance': isinstance, 'print': print,
  368. 'int': int, 'float': float, 'bool': bool, 'str': str, 'tuple': tuple, 'len': len, 'set': set, 'dict': dict},
  369. **funcs
  370. }, # globals
  371. loc # locals
  372. )
  373. # print('result =', result)
  374. return result
  375. def get_type_name(self, element: UUID):
  376. type_node = self.bottom.read_outgoing_elements(element, "Morphism")[0]
  377. for type_name in self.bottom.read_keys(self.type_model):
  378. if self.bottom.read_outgoing_elements(self.type_model, type_name)[0] == type_node:
  379. return type_name
  380. def get_all_instances(self, type_name: str, include_subtypes=True):
  381. result = [e_name for e_name, t_name in self.type_mapping.items() if t_name == type_name]
  382. if include_subtypes:
  383. for subtype_name in self.sub_types[type_name]:
  384. # print(subtype_name, 'is subtype of ')
  385. result += [e_name for e_name, t_name in self.type_mapping.items() if t_name == subtype_name]
  386. result_with_ids = [ (e_name, self.bottom.read_outgoing_elements(self.model, e_name)[0]) for e_name in result]
  387. return result_with_ids
  388. def get_outgoing(self, element: UUID, assoc_or_attr_name: str):
  389. return od.find_outgoing_typed_by(self.bottom, src=element, type_node=self.bottom.read_outgoing_elements(self.type_model, assoc_or_attr_name)[0])
  390. def get_incoming(self, element: UUID, assoc_or_attr_name: str):
  391. return od.find_incoming_typed_by(self.bottom, tgt=element, type_node=self.bottom.read_outgoing_elements(self.type_model, assoc_or_attr_name)[0])
  392. def check_constraints(self):
  393. """
  394. Check whether all constraints defined for a model are respected
  395. """
  396. errors = []
  397. def get_code(tm_name):
  398. constraints = self.bottom.read_outgoing_elements(self.type_model, f"{tm_name}.constraint")
  399. if len(constraints) == 1:
  400. constraint = constraints[0]
  401. code = ActionCode(UUID(self.bottom.read_value(constraint)), self.bottom.state).read()
  402. return code
  403. def check_result(result, description):
  404. if not isinstance(result, bool):
  405. raise Exception(f"{description} evaluation result is not boolean! Instead got {result}")
  406. if not result:
  407. errors.append(f"{description} not satisfied.")
  408. # local constraints
  409. for m_name, tm_name in self.type_mapping.items():
  410. code = get_code(tm_name)
  411. if code != None:
  412. # print('code:', code)
  413. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  414. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  415. result = self.evaluate_constraint(code, this=m_element)
  416. description = f"Local constraint of \"{tm_name}\" in \"{m_name}\""
  417. check_result(result, description)
  418. # global constraints
  419. glob_constraints = []
  420. # find global constraints...
  421. glob_constraint_type, = self.bottom.read_outgoing_elements(self.scd_model, "GlobalConstraint")
  422. for tm_name in self.bottom.read_keys(self.type_model):
  423. tm_node, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  424. # print(key, node)
  425. for type_of_node in self.bottom.read_outgoing_elements(tm_node, "Morphism"):
  426. if type_of_node == glob_constraint_type:
  427. # node is GlobalConstraint
  428. glob_constraints.append(tm_name)
  429. # evaluate them
  430. for tm_name in glob_constraints:
  431. code = get_code(tm_name)
  432. if code != None:
  433. result = self.evaluate_constraint(code, model=self.model)
  434. description = f"Global constraint \"{tm_name}\""
  435. check_result(result, description)
  436. return errors
  437. def precompute_structures(self):
  438. """
  439. Make an internal representation of type structures such that comparing type structures is easier
  440. """
  441. self.precompute_sub_types()
  442. scd_elements = self.bottom.read_outgoing_elements(self.scd_model)
  443. # collect types
  444. class_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class")
  445. association_element, = self.bottom.read_outgoing_elements(self.scd_model, "Association")
  446. for tm_element, tm_name in self.type_model_names.items():
  447. # retrieve elements that tm_element is a morphism of
  448. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  449. morphism, = [m for m in morphisms if m in scd_elements]
  450. # check if tm_element is a morphism of AttributeLink
  451. if class_element == morphism or association_element == morphism:
  452. self.structures[tm_name] = set()
  453. # collect type structures
  454. # retrieve AttributeLink to check whether element is a morphism of AttributeLink
  455. attr_link_element, = self.bottom.read_outgoing_elements(self.scd_model, "AttributeLink")
  456. for tm_element, tm_name in self.type_model_names.items():
  457. # retrieve elements that tm_element is a morphism of
  458. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  459. morphism, = [m for m in morphisms if m in scd_elements]
  460. # check if tm_element is a morphism of AttributeLink
  461. if attr_link_element == morphism:
  462. # retrieve attributes of attribute link, i.e. 'name' and 'optional'
  463. attrs = self.bottom.read_outgoing_elements(tm_element)
  464. name_model_node, = filter(lambda x: self.type_model_names.get(x, "").endswith(".name"), attrs)
  465. opt_model_node, = filter(lambda x: self.type_model_names.get(x, "").endswith(".optional"), attrs)
  466. # get attr name value
  467. name_model = UUID(self.bottom.read_value(name_model_node))
  468. name_node, = self.bottom.read_outgoing_elements(name_model)
  469. name = self.bottom.read_value(name_node)
  470. # get attr opt value
  471. opt_model = UUID(self.bottom.read_value(opt_model_node))
  472. opt_node, = self.bottom.read_outgoing_elements(opt_model)
  473. opt = self.bottom.read_value(opt_node)
  474. # get attr type name
  475. source_type_node = self.bottom.read_edge_source(tm_element)
  476. source_type_name = self.type_model_names[source_type_node]
  477. target_type_node = self.bottom.read_edge_target(tm_element)
  478. target_type_name = self.type_model_names[target_type_node]
  479. # add attribute to the structure of its source type
  480. # attribute is stored as a (name, optional, type) triple
  481. self.structures.setdefault(source_type_name, set()).add((name, opt, target_type_name))
  482. # extend structures of sub types with attrs of super types
  483. for super_type, sub_types in self.sub_types.items():
  484. for sub_type in sub_types:
  485. self.structures.setdefault(sub_type, set()).update(self.structures[super_type])
  486. # filter out abstract types, as they cannot be instantiated
  487. # retrieve Class_abstract to check whether element is a morphism of Class_abstract
  488. class_abs_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class_abstract")
  489. for tm_element, tm_name in self.type_model_names.items():
  490. # retrieve elements that tm_element is a morphism of
  491. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  492. morphism, = [m for m in morphisms if m in scd_elements]
  493. # check if tm_element is a morphism of Class_abstract
  494. if class_abs_element == morphism:
  495. # retrieve 'abstract' attribute value
  496. target_node = self.bottom.read_edge_target(tm_element)
  497. abst_model = UUID(self.bottom.read_value(target_node))
  498. abst_node, = self.bottom.read_outgoing_elements(abst_model)
  499. is_abstract = self.bottom.read_value(abst_node)
  500. # retrieve type name
  501. source_node = self.bottom.read_edge_source(tm_element)
  502. type_name = self.type_model_names[source_node]
  503. if is_abstract:
  504. self.structures.pop(type_name)
  505. def match_structures(self):
  506. """
  507. Try to match the structure of each element in the instance model to some element in the type model
  508. """
  509. ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
  510. # matching
  511. for m_element, m_name in self.model_names.items():
  512. is_edge = self.bottom.read_edge_source(m_element) != None
  513. print('element:', m_element, 'name:', m_name, 'is_edge', is_edge)
  514. for type_name, structure in self.structures.items():
  515. tm_element, = self.bottom.read_outgoing_elements(self.type_model, type_name)
  516. type_is_edge = self.bottom.read_edge_source(tm_element) != None
  517. if is_edge == type_is_edge:
  518. print(' type_name:', type_name, 'type_is_edge:', type_is_edge, "structure:", structure)
  519. mismatch = False
  520. matched = 0
  521. for name, optional, attr_type in structure:
  522. print(' name:', name, "optional:", optional, "attr_type:", attr_type)
  523. try:
  524. attr, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{name}")
  525. attr_tm, = self.bottom.read_outgoing_elements(self.type_model, attr_type)
  526. # if attribute is a modelref, we need to check whether it
  527. # linguistically conforms to the specified type
  528. # if its an internally defined attribute, this will be checked by constraints
  529. morphisms = self.bottom.read_outgoing_elements(attr_tm, "Morphism")
  530. attr_conforms = True
  531. if ref_element in morphisms:
  532. # check conformance of reference model
  533. type_model_uuid = UUID(self.bottom.read_value(attr_tm))
  534. model_uuid = UUID(self.bottom.read_value(attr))
  535. attr_conforms = Conformance(self.state, model_uuid, type_model_uuid)\
  536. .check_nominal()
  537. else:
  538. # eval constraints
  539. code = self.read_attribute(attr_tm, "constraint")
  540. if code != None:
  541. attr_conforms = self.evaluate_constraint(code, this=attr)
  542. if attr_conforms:
  543. matched += 1
  544. print(" attr_conforms -> matched:", matched)
  545. except ValueError as e:
  546. # attr not found or failed parsing UUID
  547. if optional:
  548. print(" skipping:", e)
  549. continue
  550. else:
  551. # did not match mandatory attribute
  552. print(" breaking:", e)
  553. mismatch = True
  554. break
  555. print(' matched:', matched, 'len(structure):', len(structure))
  556. # if matched == len(structure):
  557. if not mismatch:
  558. print(' add to candidates:', m_name, type_name)
  559. self.candidates.setdefault(m_name, set()).add(type_name)
  560. # filter out candidates for links based on source and target types
  561. for m_element, m_name in self.model_names.items():
  562. is_edge = self.bottom.read_edge_source(m_element) != None
  563. if is_edge and m_name in self.candidates:
  564. m_source = self.bottom.read_edge_source(m_element)
  565. m_target = self.bottom.read_edge_target(m_element)
  566. print(self.candidates)
  567. source_candidates = self.candidates[self.model_names[m_source]]
  568. target_candidates = self.candidates[self.model_names[m_target]]
  569. remove = set()
  570. for candidate_name in self.candidates[m_name]:
  571. candidate_element, = self.bottom.read_outgoing_elements(self.type_model, candidate_name)
  572. candidate_source = self.type_model_names[self.bottom.read_edge_source(candidate_element)]
  573. if candidate_source not in source_candidates:
  574. if len(source_candidates.intersection(set(self.sub_types[candidate_source]))) == 0:
  575. remove.add(candidate_name)
  576. candidate_target = self.type_model_names[self.bottom.read_edge_target(candidate_element)]
  577. if candidate_target not in target_candidates:
  578. if len(target_candidates.intersection(set(self.sub_types[candidate_target]))) == 0:
  579. remove.add(candidate_name)
  580. self.candidates[m_name] = self.candidates[m_name].difference(remove)
  581. def build_morphisms(self):
  582. """
  583. Build the morphisms between an instance and a type model that structurally match
  584. """
  585. if not all([len(c) == 1 for c in self.candidates.values()]):
  586. raise RuntimeError("Cannot build incomplete or ambiguous morphism.")
  587. mapping = {k: v.pop() for k, v in self.candidates.items()}
  588. for m_name, tm_name in mapping.items():
  589. # morphism to class/assoc
  590. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  591. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  592. self.bottom.create_edge(m_element, tm_element, "Morphism")
  593. # morphism for attributes and attribute links
  594. structure = self.structures[tm_name]
  595. for attr_name, _, attr_type in structure:
  596. try:
  597. # attribute node
  598. attr_element, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{attr_name}")
  599. attr_type_element, = self.bottom.read_outgoing_elements(self.type_model, attr_type)
  600. self.bottom.create_edge(attr_element, attr_type_element, "Morphism")
  601. # attribute link
  602. attr_link_element, = self.bottom.read_outgoing_elements(self.model, f"{m_name}_{attr_name}")
  603. attr_link_type_element, = self.bottom.read_outgoing_elements(self.type_model, f"{tm_name}_{attr_name}")
  604. self.bottom.create_edge(attr_link_element, attr_link_type_element, "Morphism")
  605. except ValueError:
  606. pass
  607. if __name__ == '__main__':
  608. from state.devstate import DevState as State
  609. s = State()
  610. from bootstrap.scd import bootstrap_scd
  611. scd = bootstrap_scd(s)
  612. from bootstrap.pn import bootstrap_pn
  613. ltm_pn = bootstrap_pn(s, "PN")
  614. ltm_pn_lola = bootstrap_pn(s, "PNlola")
  615. from services.pn import PN
  616. my_pn = s.create_node()
  617. PNserv = PN(my_pn, s)
  618. PNserv.create_place("p1", 5)
  619. PNserv.create_place("p2", 0)
  620. PNserv.create_transition("t1")
  621. PNserv.create_p2t("p1", "t1", 1)
  622. PNserv.create_t2p("t1", "p2", 1)
  623. cf = Conformance(s, my_pn, ltm_pn_lola)
  624. # cf = Conformance(s, scd, ltm_pn, scd)
  625. cf.precompute_structures()
  626. cf.match_structures()
  627. cf.build_morphisms()
  628. print(cf.check_nominal())