conformance.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576
  1. from services.bottom.V0 import Bottom
  2. from uuid import UUID
  3. from state.base import State
  4. from typing import Dict, Tuple, Set, Any, List
  5. from pprint import pprint
  6. class Conformance:
  7. def __init__(self, state: State, model: UUID, type_model: UUID):
  8. self.state = state
  9. self.bottom = Bottom(state)
  10. type_model_id = state.read_dict(state.read_root(), "SCD")
  11. self.scd_model = UUID(state.read_value(type_model_id))
  12. self.model = model
  13. self.type_model = type_model
  14. self.type_mapping: Dict[str, str] = {}
  15. self.model_names = {
  16. # map model elements to their names to prevent iterating too much
  17. self.bottom.read_outgoing_elements(self.model, e)[0]: e
  18. for e in self.bottom.read_keys(self.model)
  19. }
  20. self.type_model_names = {
  21. # map type model elements to their names to prevent iterating too much
  22. self.bottom.read_outgoing_elements(self.type_model, e)[0]: e
  23. for e in self.bottom.read_keys(self.type_model)
  24. }
  25. self.sub_types: Dict[str, Set[str]] = {
  26. k: set() for k in self.bottom.read_keys(self.type_model)
  27. }
  28. self.primitive_values: Dict[UUID, Any] = {}
  29. self.abstract_types: List[str] = []
  30. self.multiplicities: Dict[str, Tuple] = {}
  31. self.source_multiplicities: Dict[str, Tuple] = {}
  32. self.target_multiplicities: Dict[str, Tuple] = {}
  33. self.structures = {}
  34. self.matches = {}
  35. self.candidates = {}
  36. def check_nominal(self, *, log=False):
  37. """
  38. Perform a nominal conformance check
  39. Args:
  40. log: boolean indicating whether to log errors
  41. Returns:
  42. Boolean indicating whether the check has passed
  43. """
  44. try:
  45. self.check_typing()
  46. self.check_link_typing()
  47. self.check_multiplicities()
  48. self.check_constraints()
  49. return True
  50. except RuntimeError as e:
  51. if log:
  52. print(e)
  53. return False
  54. def check_structural(self, *, build_morphisms=True, log=False):
  55. """
  56. Perform a structural conformance check
  57. Args:
  58. build_morphisms: boolean indicating whether to create morpishm links
  59. log: boolean indicating whether to log errors
  60. Returns:
  61. Boolean indicating whether the check has passed
  62. """
  63. try:
  64. self.precompute_structures()
  65. self.match_structures()
  66. if build_morphisms:
  67. self.build_morphisms()
  68. self.check_nominal(log=log)
  69. return True
  70. except RuntimeError as e:
  71. if log:
  72. print(e)
  73. return False
  74. def read_attribute(self, element: UUID, attr_name: str):
  75. """
  76. Read an attribute value attached to an element
  77. Args:
  78. element: UUID of the element
  79. attr_name: name of the attribute to read
  80. Returns:
  81. The value of hte attribute, if no attribute with given name is found, returns None
  82. """
  83. if element in self.type_model_names:
  84. # type model element
  85. element_name = self.type_model_names[element]
  86. model = self.type_model
  87. else:
  88. # model element
  89. element_name = self.model_names[element]
  90. model = self.model
  91. try:
  92. attr_elem, = self.bottom.read_outgoing_elements(model, f"{element_name}.{attr_name}")
  93. return self.primitive_values.get(attr_elem, self.bottom.read_value(attr_elem))
  94. except ValueError:
  95. return None
  96. def precompute_sub_types(self):
  97. """
  98. Creates an internal representation of sub-type hierarchies that is
  99. more easily queryable that the state graph
  100. """
  101. # collect inheritance link instances
  102. inh_element, = self.bottom.read_outgoing_elements(self.scd_model, "Inheritance")
  103. inh_links = []
  104. for tm_element, tm_name in self.type_model_names.items():
  105. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  106. if inh_element in morphisms:
  107. # we have an instance of an inheritance link
  108. inh_links.append(tm_element)
  109. # for each inheritance link we add the parent and child to the sub types map
  110. for link in inh_links:
  111. tm_source = self.bottom.read_edge_source(link)
  112. tm_target = self.bottom.read_edge_target(link)
  113. parent_name = self.type_model_names[tm_target]
  114. child_name = self.type_model_names[tm_source]
  115. self.sub_types[parent_name].add(child_name)
  116. # iteratively expand the sub type hierarchies in the sub types map
  117. stop = False
  118. while not stop:
  119. stop = True
  120. for child_name, child_children in self.sub_types.items():
  121. for parent_name, parent_children in self.sub_types.items():
  122. if child_name in parent_children:
  123. original_size = len(parent_children)
  124. parent_children.update(child_children)
  125. if len(parent_children) != original_size:
  126. stop = False
  127. def deref_primitive_values(self):
  128. """
  129. Prefetch the values stored in referenced primitive type models
  130. """
  131. ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
  132. string_element, = self.bottom.read_outgoing_elements(self.scd_model, "String")
  133. boolean_element, = self.bottom.read_outgoing_elements(self.scd_model, "Boolean")
  134. integer_element, = self.bottom.read_outgoing_elements(self.scd_model, "Integer")
  135. t_deref = []
  136. t_refs = []
  137. for tm_element, tm_name in self.type_model_names.items():
  138. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  139. if ref_element in morphisms:
  140. t_refs.append(self.type_model_names[tm_element])
  141. elif string_element in morphisms:
  142. t_deref.append(tm_element)
  143. elif boolean_element in morphisms:
  144. t_deref.append(tm_element)
  145. elif integer_element in morphisms:
  146. t_deref.append(tm_element)
  147. for elem in t_deref:
  148. primitive_model = UUID(self.bottom.read_value(elem))
  149. primitive_value_node, = self.bottom.read_outgoing_elements(primitive_model)
  150. primitive_value = self.bottom.read_value(primitive_value_node)
  151. self.primitive_values[elem] = primitive_value
  152. for m_name, tm_name in self.type_mapping.items():
  153. if tm_name in t_refs:
  154. # dereference
  155. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  156. primitive_model = UUID(self.bottom.read_value(m_element))
  157. try:
  158. primitive_value_node, = self.bottom.read_outgoing_elements(primitive_model)
  159. primitive_value = self.bottom.read_value(primitive_value_node)
  160. self.primitive_values[m_element] = primitive_value
  161. except ValueError:
  162. pass # multiple elements in model indicate that we're not dealing with a primitive
  163. def precompute_multiplicities(self):
  164. """
  165. Creates an internal representation of type multiplicities that is
  166. more easily queryable that the state graph
  167. """
  168. for tm_element, tm_name in self.type_model_names.items():
  169. # class abstract flags and multiplicities
  170. abstract = self.read_attribute(tm_element, "abstract")
  171. lc = self.read_attribute(tm_element, "lower_cardinality")
  172. uc = self.read_attribute(tm_element, "upper_cardinality")
  173. if abstract:
  174. self.abstract_types.append(tm_name)
  175. if lc or uc:
  176. mult = (
  177. lc if lc is not None else float("-inf"),
  178. uc if uc is not None else float("inf")
  179. )
  180. self.multiplicities[tm_name] = mult
  181. # multiplicities for associations
  182. slc = self.read_attribute(tm_element, "source_lower_cardinality")
  183. suc = self.read_attribute(tm_element, "source_upper_cardinality")
  184. if slc or suc:
  185. mult = (
  186. slc if slc is not None else float("-inf"),
  187. suc if suc is not None else float("inf")
  188. )
  189. self.source_multiplicities[tm_name] = mult
  190. tlc = self.read_attribute(tm_element, "target_lower_cardinality")
  191. tuc = self.read_attribute(tm_element, "target_upper_cardinality")
  192. if tlc or tuc:
  193. mult = (
  194. tlc if tlc is not None else float("-inf"),
  195. tuc if tuc is not None else float("inf")
  196. )
  197. self.target_multiplicities[tm_name] = mult
  198. # optional for attribute links
  199. opt = self.read_attribute(tm_element, "optional")
  200. if opt is not None:
  201. self.source_multiplicities[tm_name] = (0 if opt else 1, 1)
  202. self.target_multiplicities[tm_name] = (0, 1)
  203. def get_type(self, element: UUID):
  204. """
  205. Retrieve the type of an element (wrt. current type model)
  206. """
  207. morphisms = self.bottom.read_outgoing_elements(element, "Morphism")
  208. tm_element, = [m for m in morphisms if m in self.type_model_names.keys()]
  209. return tm_element
  210. def check_typing(self):
  211. """
  212. for each element of model check whether a morphism
  213. link exists to some element of type_model
  214. """
  215. ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
  216. model_names = self.bottom.read_keys(self.model)
  217. for m_name in model_names:
  218. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  219. try:
  220. tm_element = self.get_type(m_element)
  221. tm_name = self.type_model_names[tm_element]
  222. self.type_mapping[m_name] = tm_name
  223. if ref_element in self.bottom.read_outgoing_elements(tm_element, "Morphism"):
  224. sub_m = UUID(self.bottom.read_value(m_element))
  225. sub_tm = UUID(self.bottom.read_value(tm_element))
  226. if not Conformance(self.state, sub_m, sub_tm).check_nominal():
  227. raise RuntimeError(f"Incorrectly model reference: {m_name}")
  228. except ValueError:
  229. # no or too many morphism links found
  230. raise RuntimeError(f"Incorrectly typed element: {m_name}")
  231. return True
  232. def check_link_typing(self):
  233. """
  234. for each link, check whether its source and target are of a valid type
  235. """
  236. self.precompute_sub_types()
  237. for m_name, tm_name in self.type_mapping.items():
  238. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  239. m_source = self.bottom.read_edge_source(m_element)
  240. m_target = self.bottom.read_edge_target(m_element)
  241. if m_source is None or m_target is None:
  242. # element is not a link
  243. continue
  244. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  245. tm_source = self.bottom.read_edge_source(tm_element)
  246. tm_target = self.bottom.read_edge_target(tm_element)
  247. # check if source is typed correctly
  248. source_name = self.model_names[m_source]
  249. source_type_actual = self.type_mapping[source_name]
  250. source_type_expected = self.type_model_names[tm_source]
  251. if source_type_actual != source_type_expected:
  252. if source_type_actual not in self.sub_types[source_type_expected]:
  253. raise RuntimeError(f"Invalid source type {source_type_actual} for element {m_name}")
  254. # check if target is typed correctly
  255. target_name = self.model_names[m_target]
  256. target_type_actual = self.type_mapping[target_name]
  257. target_type_expected = self.type_model_names[tm_target]
  258. if target_type_actual != target_type_expected:
  259. if target_type_actual not in self.sub_types[target_type_expected]:
  260. raise RuntimeError(f"Invalid target type {target_type_actual} for element {m_name}")
  261. return True
  262. def check_multiplicities(self):
  263. """
  264. Check whether multiplicities for all types are respected
  265. """
  266. self.deref_primitive_values()
  267. self.precompute_multiplicities()
  268. for tm_name in self.type_model_names.values():
  269. # abstract classes
  270. if tm_name in self.abstract_types:
  271. type_count = list(self.type_mapping.values()).count(tm_name)
  272. if type_count > 0:
  273. raise RuntimeError(f"Invalid instantiation of abstract class: {tm_name}")
  274. # class multiplicities
  275. if tm_name in self.multiplicities:
  276. lc, uc = self.multiplicities[tm_name]
  277. type_count = list(self.type_mapping.values()).count(tm_name)
  278. for sub_type in self.sub_types[tm_name]:
  279. type_count += list(self.type_mapping.values()).count(sub_type)
  280. if type_count < lc or type_count > uc:
  281. raise RuntimeError(f"Cardinality of type exceeds valid multiplicity range: {tm_name} ({type_count})")
  282. # association source multiplicities
  283. if tm_name in self.source_multiplicities:
  284. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  285. tm_source_element = self.bottom.read_edge_source(tm_element)
  286. tm_source_name = self.type_model_names[tm_source_element]
  287. lc, uc = self.source_multiplicities[tm_name]
  288. for i, t in self.type_mapping.items():
  289. if t == tm_source_name or t in self.sub_types[tm_source_name]:
  290. count = 0
  291. i_element, = self.bottom.read_outgoing_elements(self.model, i)
  292. outgoing = self.bottom.read_outgoing_edges(i_element)
  293. for o in outgoing:
  294. try:
  295. if self.type_mapping[self.model_names[o]] == tm_name:
  296. count += 1
  297. except KeyError:
  298. pass # for elements not part of model, e.g. morphism links
  299. if count < lc or count > uc:
  300. raise RuntimeError(f"Source cardinality of type {tm_name} exceeds valid multiplicity range in {i}.")
  301. # association target multiplicities
  302. if tm_name in self.target_multiplicities:
  303. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  304. tm_target_element = self.bottom.read_edge_target(tm_element)
  305. tm_target_name = self.type_model_names[tm_target_element]
  306. lc, uc = self.target_multiplicities[tm_name]
  307. for i, t in self.type_mapping.items():
  308. if t == tm_target_name or t in self.sub_types[tm_target_name]:
  309. count = 0
  310. i_element, = self.bottom.read_outgoing_elements(self.model, i)
  311. outgoing = self.bottom.read_incoming_edges(i_element)
  312. for o in outgoing:
  313. try:
  314. if self.type_mapping[self.model_names[o]] == tm_name:
  315. count += 1
  316. except KeyError:
  317. pass # for elements not part of model, e.g. morphism links
  318. if count < lc or count > uc:
  319. print(f"Target cardinality of type {tm_name} exceeds valid multiplicity range in {i}.")
  320. return False
  321. return True
  322. def evaluate_constraint(self, code, **kwargs):
  323. """
  324. Evaluate constraint code (Python code)
  325. """
  326. funcs = {
  327. 'read_value': self.state.read_value
  328. }
  329. return eval(
  330. code,
  331. {'__builtins__': {'isinstance': isinstance, 'print': print,
  332. 'int': int, 'float': float, 'bool': bool, 'str': str, 'tuple': tuple}
  333. }, # globals
  334. {**kwargs, **funcs} # locals
  335. )
  336. def check_constraints(self):
  337. """
  338. Check whether all constraints defined for a model are respected
  339. """
  340. # local constraints
  341. for m_name, tm_name in self.type_mapping.items():
  342. if tm_name != "GlobalConstraint":
  343. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  344. code = self.read_attribute(tm_element, "constraint")
  345. if code is not None:
  346. morphisms = self.bottom.read_incoming_elements(tm_element, "Morphism")
  347. morphisms = [m for m in morphisms if m in self.model_names]
  348. for m_element in morphisms:
  349. if not self.evaluate_constraint(code, element=m_element):
  350. raise RuntimeError(f"Local constraint of {tm_name} not satisfied in {m_name}.")
  351. # global constraints
  352. for m_name, tm_name in self.type_mapping.items():
  353. if tm_name == "GlobalConstraint":
  354. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  355. code = self.read_attribute(tm_element, "constraint")
  356. if code is not None:
  357. if not self.evaluate_constraint(code, model=self.model):
  358. raise RuntimeError(f"Global constraint {tm_name} not satisfied.")
  359. return True
  360. def precompute_structures(self):
  361. """
  362. Make an internal representation of type structures such that comparing type structures is easier
  363. """
  364. self.precompute_sub_types()
  365. scd_elements = self.bottom.read_outgoing_elements(self.scd_model)
  366. # collect types
  367. class_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class")
  368. association_element, = self.bottom.read_outgoing_elements(self.scd_model, "Association")
  369. for tm_element, tm_name in self.type_model_names.items():
  370. # retrieve elements that tm_element is a morphism of
  371. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  372. morphism, = [m for m in morphisms if m in scd_elements]
  373. # check if tm_element is a morphism of AttributeLink
  374. if class_element == morphism or association_element == morphism:
  375. self.structures[tm_name] = set()
  376. # collect type structures
  377. # retrieve AttributeLink to check whether element is a morphism of AttributeLink
  378. attr_link_element, = self.bottom.read_outgoing_elements(self.scd_model, "AttributeLink")
  379. for tm_element, tm_name in self.type_model_names.items():
  380. # retrieve elements that tm_element is a morphism of
  381. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  382. morphism, = [m for m in morphisms if m in scd_elements]
  383. # check if tm_element is a morphism of AttributeLink
  384. if attr_link_element == morphism:
  385. # retrieve attributes of attribute link, i.e. 'name' and 'optional'
  386. attrs = self.bottom.read_outgoing_elements(tm_element)
  387. name_model_node, = filter(lambda x: self.type_model_names.get(x, "").endswith(".name"), attrs)
  388. opt_model_node, = filter(lambda x: self.type_model_names.get(x, "").endswith(".optional"), attrs)
  389. # get attr name value
  390. name_model = UUID(self.bottom.read_value(name_model_node))
  391. name_node, = self.bottom.read_outgoing_elements(name_model)
  392. name = self.bottom.read_value(name_node)
  393. # get attr opt value
  394. opt_model = UUID(self.bottom.read_value(opt_model_node))
  395. opt_node, = self.bottom.read_outgoing_elements(opt_model)
  396. opt = self.bottom.read_value(opt_node)
  397. # get attr type name
  398. source_type_node = self.bottom.read_edge_source(tm_element)
  399. source_type_name = self.type_model_names[source_type_node]
  400. target_type_node = self.bottom.read_edge_target(tm_element)
  401. target_type_name = self.type_model_names[target_type_node]
  402. # add attribute to the structure of its source type
  403. # attribute is stored as a (name, optional, type) triple
  404. self.structures.setdefault(source_type_name, set()).add((name, opt, target_type_name))
  405. # extend structures of sub types with attrs of super types
  406. for super_type, sub_types in self.sub_types.items():
  407. for sub_type in sub_types:
  408. self.structures.setdefault(sub_type, set()).update(self.structures[super_type])
  409. # filter out abstract types, as they cannot be instantiated
  410. # retrieve Class_abstract to check whether element is a morphism of Class_abstract
  411. class_abs_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class_abstract")
  412. for tm_element, tm_name in self.type_model_names.items():
  413. # retrieve elements that tm_element is a morphism of
  414. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  415. morphism, = [m for m in morphisms if m in scd_elements]
  416. # check if tm_element is a morphism of Class_abstract
  417. if class_abs_element == morphism:
  418. # retrieve 'abstract' attribute value
  419. target_node = self.bottom.read_edge_target(tm_element)
  420. abst_model = UUID(self.bottom.read_value(target_node))
  421. abst_node, = self.bottom.read_outgoing_elements(abst_model)
  422. is_abstract = self.bottom.read_value(abst_node)
  423. # retrieve type name
  424. source_node = self.bottom.read_edge_source(tm_element)
  425. type_name = self.type_model_names[source_node]
  426. if is_abstract:
  427. self.structures.pop(type_name)
  428. def match_structures(self):
  429. """
  430. Try to match the structure of each element in the instance model to some element in the type model
  431. """
  432. ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
  433. # matching
  434. for m_element, m_name in self.model_names.items():
  435. is_edge = self.bottom.read_edge_source(m_element) is not None
  436. for type_name, structure in self.structures.items():
  437. tm_element, = self.bottom.read_outgoing_elements(self.type_model, type_name)
  438. type_is_edge = self.bottom.read_edge_source(tm_element) is not None
  439. if is_edge == type_is_edge:
  440. mismatch = False
  441. matched = 0
  442. for name, optional, attr_type in structure:
  443. try:
  444. attr, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{name}")
  445. attr_tm, = self.bottom.read_outgoing_elements(self.type_model, attr_type)
  446. # if attribute is a modelref, we need to check whether it
  447. # linguistically conforms to the specified type
  448. # if its an internally defined attribute, this will be checked by constraints
  449. morphisms = self.bottom.read_outgoing_elements(attr_tm, "Morphism")
  450. attr_conforms = True
  451. if ref_element in morphisms:
  452. # check conformance of reference model
  453. type_model_uuid = UUID(self.bottom.read_value(attr_tm))
  454. model_uuid = UUID(self.bottom.read_value(attr))
  455. attr_conforms = Conformance(self.state, model_uuid, type_model_uuid)\
  456. .check_nominal()
  457. else:
  458. # eval constraints
  459. code = self.read_attribute(attr_tm, "constraint")
  460. if code is not None:
  461. attr_conforms = self.evaluate_constraint(code, element=attr)
  462. if attr_conforms:
  463. matched += 1
  464. except ValueError:
  465. # attr not found or failed parsing UUID
  466. if optional:
  467. continue
  468. else:
  469. mismatch = True
  470. break
  471. # if matched == len(structure):
  472. if not mismatch:
  473. self.candidates.setdefault(m_name, set()).add(type_name)
  474. # filter out candidates for links based on source and target types
  475. for m_element, m_name in self.model_names.items():
  476. is_edge = self.bottom.read_edge_source(m_element) is not None
  477. if is_edge and m_name in self.candidates:
  478. m_source = self.bottom.read_edge_source(m_element)
  479. m_target = self.bottom.read_edge_target(m_element)
  480. source_candidates = self.candidates[self.model_names[m_source]]
  481. target_candidates = self.candidates[self.model_names[m_target]]
  482. remove = set()
  483. for candidate_name in self.candidates[m_name]:
  484. candidate_element, = self.bottom.read_outgoing_elements(self.type_model, candidate_name)
  485. candidate_source = self.type_model_names[self.bottom.read_edge_source(candidate_element)]
  486. if candidate_source not in source_candidates:
  487. if len(source_candidates.intersection(set(self.sub_types[candidate_source]))) == 0:
  488. remove.add(candidate_name)
  489. candidate_target = self.type_model_names[self.bottom.read_edge_target(candidate_element)]
  490. if candidate_target not in target_candidates:
  491. if len(target_candidates.intersection(set(self.sub_types[candidate_target]))) == 0:
  492. remove.add(candidate_name)
  493. self.candidates[m_name] = self.candidates[m_name].difference(remove)
  494. def build_morphisms(self):
  495. """
  496. Build the morphisms between an instance and a type model that structurally match
  497. """
  498. if not all([len(c) == 1 for c in self.candidates.values()]):
  499. raise RuntimeError("Cannot build incomplete or ambiguous morphism.")
  500. mapping = {k: v.pop() for k, v in self.candidates.items()}
  501. for m_name, tm_name in mapping.items():
  502. # morphism to class/assoc
  503. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  504. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  505. self.bottom.create_edge(m_element, tm_element, "Morphism")
  506. # morphism for attributes and attribute links
  507. structure = self.structures[tm_name]
  508. for attr_name, _, attr_type in structure:
  509. try:
  510. # attribute node
  511. attr_element, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{attr_name}")
  512. attr_type_element, = self.bottom.read_outgoing_elements(self.type_model, attr_type)
  513. self.bottom.create_edge(attr_element, attr_type_element, "Morphism")
  514. # attribute link
  515. attr_link_element, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{attr_name}_link")
  516. attr_link_type_element, = self.bottom.read_outgoing_elements(self.type_model, f"{tm_name}_{attr_name}")
  517. self.bottom.create_edge(attr_link_element, attr_link_type_element, "Morphism")
  518. except ValueError:
  519. pass
  520. if __name__ == '__main__':
  521. from state.devstate import DevState as State
  522. s = State()
  523. from bootstrap.scd import bootstrap_scd
  524. scd = bootstrap_scd(s)
  525. from bootstrap.pn import bootstrap_pn
  526. ltm_pn = bootstrap_pn(s, "PN")
  527. ltm_pn_lola = bootstrap_pn(s, "PNlola")
  528. from services.pn import PN
  529. my_pn = s.create_node()
  530. PNserv = PN(my_pn, s)
  531. PNserv.create_place("p1", 5)
  532. PNserv.create_place("p2", 0)
  533. PNserv.create_transition("t1")
  534. PNserv.create_p2t("p1", "t1", 1)
  535. PNserv.create_t2p("t1", "p2", 1)
  536. cf = Conformance(s, my_pn, ltm_pn_lola)
  537. # cf = Conformance(s, scd, ltm_pn, scd)
  538. cf.precompute_structures()
  539. cf.match_structures()
  540. cf.build_morphisms()
  541. print(cf.check_nominal())