conformance.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598
  1. from services.bottom.V0 import Bottom
  2. from uuid import UUID
  3. from state.base import State
  4. from typing import Dict, Tuple, Set, Any, List
  5. from pprint import pprint
  6. import functools
  7. class Conformance:
  8. def __init__(self, state: State, model: UUID, type_model: UUID):
  9. self.state = state
  10. self.bottom = Bottom(state)
  11. type_model_id = state.read_dict(state.read_root(), "SCD")
  12. self.scd_model = UUID(state.read_value(type_model_id))
  13. self.model = model
  14. self.type_model = type_model
  15. self.type_mapping: Dict[str, str] = {}
  16. self.model_names = {
  17. # map model elements to their names to prevent iterating too much
  18. self.bottom.read_outgoing_elements(self.model, e)[0]: e
  19. for e in self.bottom.read_keys(self.model)
  20. }
  21. self.type_model_names = {
  22. # map type model elements to their names to prevent iterating too much
  23. self.bottom.read_outgoing_elements(self.type_model, e)[0]
  24. : e for e in self.bottom.read_keys(self.type_model)
  25. }
  26. self.sub_types: Dict[str, Set[str]] = {
  27. k: set() for k in self.bottom.read_keys(self.type_model)
  28. }
  29. self.primitive_values: Dict[UUID, Any] = {}
  30. self.abstract_types: List[str] = []
  31. self.multiplicities: Dict[str, Tuple] = {}
  32. self.source_multiplicities: Dict[str, Tuple] = {}
  33. self.target_multiplicities: Dict[str, Tuple] = {}
  34. self.structures = {}
  35. self.matches = {}
  36. self.candidates = {}
  37. def check_nominal(self, *, log=False):
  38. """
  39. Perform a nominal conformance check
  40. Args:
  41. log: boolean indicating whether to log errors
  42. Returns:
  43. Boolean indicating whether the check has passed
  44. """
  45. errors = []
  46. errors += self.check_typing()
  47. errors += self.check_link_typing()
  48. errors += self.check_multiplicities()
  49. errors += self.check_constraints()
  50. return errors
  51. # def check_structural(self, *, build_morphisms=True, log=False):
  52. # """
  53. # Perform a structural conformance check
  54. # Args:
  55. # build_morphisms: boolean indicating whether to create morpishm links
  56. # log: boolean indicating whether to log errors
  57. # Returns:
  58. # Boolean indicating whether the check has passed
  59. # """
  60. # try:
  61. # self.precompute_structures()
  62. # self.match_structures()
  63. # if build_morphisms:
  64. # self.build_morphisms()
  65. # self.check_nominal(log=log)
  66. # return True
  67. # except RuntimeError as e:
  68. # if log:
  69. # print(e)
  70. # return False
  71. def read_attribute(self, element: UUID, attr_name: str):
  72. """
  73. Read an attribute value attached to an element
  74. Args:
  75. element: UUID of the element
  76. attr_name: name of the attribute to read
  77. Returns:
  78. The value of hte attribute, if no attribute with given name is found, returns None
  79. """
  80. if element in self.type_model_names:
  81. # type model element
  82. element_name = self.type_model_names[element]
  83. model = self.type_model
  84. else:
  85. # model element
  86. element_name = self.model_names[element]
  87. model = self.model
  88. try:
  89. attr_elem, = self.bottom.read_outgoing_elements(model, f"{element_name}.{attr_name}")
  90. return self.primitive_values.get(attr_elem, self.bottom.read_value(attr_elem))
  91. except ValueError:
  92. return None
  93. def precompute_sub_types(self):
  94. """
  95. Creates an internal representation of sub-type hierarchies that is
  96. more easily queryable that the state graph
  97. """
  98. # collect inheritance link instances
  99. inh_element, = self.bottom.read_outgoing_elements(self.scd_model, "Inheritance")
  100. inh_links = []
  101. for tm_element, tm_name in self.type_model_names.items():
  102. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  103. if inh_element in morphisms:
  104. # we have an instance of an inheritance link
  105. inh_links.append(tm_element)
  106. # for each inheritance link we add the parent and child to the sub types map
  107. for link in inh_links:
  108. tm_source = self.bottom.read_edge_source(link)
  109. tm_target = self.bottom.read_edge_target(link)
  110. parent_name = self.type_model_names[tm_target]
  111. child_name = self.type_model_names[tm_source]
  112. self.sub_types[parent_name].add(child_name)
  113. # iteratively expand the sub type hierarchies in the sub types map
  114. stop = False
  115. while not stop:
  116. stop = True
  117. for child_name, child_children in self.sub_types.items():
  118. for parent_name, parent_children in self.sub_types.items():
  119. if child_name in parent_children:
  120. original_size = len(parent_children)
  121. parent_children.update(child_children)
  122. if len(parent_children) != original_size:
  123. stop = False
  124. def deref_primitive_values(self):
  125. """
  126. Prefetch the values stored in referenced primitive type models
  127. """
  128. ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
  129. string_element, = self.bottom.read_outgoing_elements(self.scd_model, "String")
  130. boolean_element, = self.bottom.read_outgoing_elements(self.scd_model, "Boolean")
  131. integer_element, = self.bottom.read_outgoing_elements(self.scd_model, "Integer")
  132. t_deref = []
  133. t_refs = []
  134. for tm_element, tm_name in self.type_model_names.items():
  135. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  136. if ref_element in morphisms:
  137. t_refs.append(self.type_model_names[tm_element])
  138. elif string_element in morphisms:
  139. t_deref.append(tm_element)
  140. elif boolean_element in morphisms:
  141. t_deref.append(tm_element)
  142. elif integer_element in morphisms:
  143. t_deref.append(tm_element)
  144. for elem in t_deref:
  145. primitive_model = UUID(self.bottom.read_value(elem))
  146. primitive_value_node, = self.bottom.read_outgoing_elements(primitive_model)
  147. primitive_value = self.bottom.read_value(primitive_value_node)
  148. self.primitive_values[elem] = primitive_value
  149. for m_name, tm_name in self.type_mapping.items():
  150. if tm_name in t_refs:
  151. # dereference
  152. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  153. primitive_model = UUID(self.bottom.read_value(m_element))
  154. try:
  155. primitive_value_node, = self.bottom.read_outgoing_elements(primitive_model)
  156. primitive_value = self.bottom.read_value(primitive_value_node)
  157. self.primitive_values[m_element] = primitive_value
  158. except ValueError:
  159. pass # multiple elements in model indicate that we're not dealing with a primitive
  160. def precompute_multiplicities(self):
  161. """
  162. Creates an internal representation of type multiplicities that is
  163. more easily queryable that the state graph
  164. """
  165. for tm_element, tm_name in self.type_model_names.items():
  166. # class abstract flags and multiplicities
  167. abstract = self.read_attribute(tm_element, "abstract")
  168. lc = self.read_attribute(tm_element, "lower_cardinality")
  169. uc = self.read_attribute(tm_element, "upper_cardinality")
  170. if abstract:
  171. self.abstract_types.append(tm_name)
  172. if lc or uc:
  173. mult = (
  174. lc if lc != None else float("-inf"),
  175. uc if uc != None else float("inf")
  176. )
  177. self.multiplicities[tm_name] = mult
  178. # multiplicities for associations
  179. slc = self.read_attribute(tm_element, "source_lower_cardinality")
  180. suc = self.read_attribute(tm_element, "source_upper_cardinality")
  181. if slc or suc:
  182. mult = (
  183. slc if slc != None else float("-inf"),
  184. suc if suc != None else float("inf")
  185. )
  186. self.source_multiplicities[tm_name] = mult
  187. tlc = self.read_attribute(tm_element, "target_lower_cardinality")
  188. tuc = self.read_attribute(tm_element, "target_upper_cardinality")
  189. if tlc or tuc:
  190. mult = (
  191. tlc if tlc != None else float("-inf"),
  192. tuc if tuc != None else float("inf")
  193. )
  194. self.target_multiplicities[tm_name] = mult
  195. # optional for attribute links
  196. opt = self.read_attribute(tm_element, "optional")
  197. if opt != None:
  198. self.source_multiplicities[tm_name] = (0, float('inf'))
  199. self.target_multiplicities[tm_name] = (0 if opt else 1, 1)
  200. def get_type(self, element: UUID):
  201. """
  202. Retrieve the type of an element (wrt. current type model)
  203. """
  204. morphisms = self.bottom.read_outgoing_elements(element, "Morphism")
  205. tm_element, = [m for m in morphisms if m in self.type_model_names.keys()]
  206. return tm_element
  207. def check_typing(self):
  208. """
  209. for each element of model check whether a morphism
  210. link exists to some element of type_model
  211. """
  212. errors = []
  213. ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
  214. model_names = self.bottom.read_keys(self.model)
  215. for m_name in model_names:
  216. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  217. try:
  218. tm_element = self.get_type(m_element)
  219. tm_name = self.type_model_names[tm_element]
  220. self.type_mapping[m_name] = tm_name
  221. if ref_element in self.bottom.read_outgoing_elements(tm_element, "Morphism"):
  222. sub_m = UUID(self.bottom.read_value(m_element))
  223. sub_tm = UUID(self.bottom.read_value(tm_element))
  224. nested_errors = Conformance(self.state, sub_m, sub_tm).check_nominal()
  225. errors += [f"In ModelRef ({m_name}):" + err for err in nested_errors]
  226. except ValueError as e:
  227. import traceback
  228. traceback.format_exc(e)
  229. # no or too many morphism links found
  230. errors.append(f"Incorrectly typed element: {m_name}")
  231. return errors
  232. def check_link_typing(self):
  233. """
  234. for each link, check whether its source and target are of a valid type
  235. """
  236. errors = []
  237. self.precompute_sub_types()
  238. for m_name, tm_name in self.type_mapping.items():
  239. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  240. m_source = self.bottom.read_edge_source(m_element)
  241. m_target = self.bottom.read_edge_target(m_element)
  242. if m_source == None or m_target == None:
  243. # element is not a link
  244. continue
  245. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  246. tm_source = self.bottom.read_edge_source(tm_element)
  247. tm_target = self.bottom.read_edge_target(tm_element)
  248. # check if source is typed correctly
  249. source_name = self.model_names[m_source]
  250. source_type_actual = self.type_mapping[source_name]
  251. source_type_expected = self.type_model_names[tm_source]
  252. if source_type_actual != source_type_expected:
  253. if source_type_actual not in self.sub_types[source_type_expected]:
  254. errors.append(f"Invalid source type {source_type_actual} for element {m_name}")
  255. # check if target is typed correctly
  256. target_name = self.model_names[m_target]
  257. target_type_actual = self.type_mapping[target_name]
  258. target_type_expected = self.type_model_names[tm_target]
  259. if target_type_actual != target_type_expected:
  260. if target_type_actual not in self.sub_types[target_type_expected]:
  261. errors.append(f"Invalid target type {target_type_actual} for element {m_name}")
  262. return errors
  263. def check_multiplicities(self):
  264. """
  265. Check whether multiplicities for all types are respected
  266. """
  267. self.deref_primitive_values()
  268. self.precompute_multiplicities()
  269. errors = []
  270. for tm_name in self.type_model_names.values():
  271. # abstract classes
  272. if tm_name in self.abstract_types:
  273. type_count = list(self.type_mapping.values()).count(tm_name)
  274. if type_count > 0:
  275. errors.append(f"Invalid instantiation of abstract class: {tm_name}")
  276. # class multiplicities
  277. if tm_name in self.multiplicities:
  278. lc, uc = self.multiplicities[tm_name]
  279. type_count = list(self.type_mapping.values()).count(tm_name)
  280. for sub_type in self.sub_types[tm_name]:
  281. type_count += list(self.type_mapping.values()).count(sub_type)
  282. if type_count < lc or type_count > uc:
  283. errors.append(f"Cardinality of type exceeds valid multiplicity range: {tm_name} ({type_count})")
  284. # association source multiplicities
  285. if tm_name in self.source_multiplicities:
  286. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  287. tm_tgt_element = self.bottom.read_edge_target(tm_element)
  288. tm_tgt_name = self.type_model_names[tm_tgt_element]
  289. lc, uc = self.source_multiplicities[tm_name]
  290. for tgt_obj_name, t in self.type_mapping.items():
  291. if t == tm_tgt_name or t in self.sub_types[tm_tgt_name]:
  292. count = 0
  293. tgt_obj_node, = self.bottom.read_outgoing_elements(self.model, tgt_obj_name)
  294. incoming = self.bottom.read_incoming_edges(tgt_obj_node)
  295. for i in incoming:
  296. try:
  297. if self.type_mapping[self.model_names[i]] == tm_name:
  298. count += 1
  299. except KeyError:
  300. pass # for elements not part of model, e.g. morphism links
  301. if count < lc or count > uc:
  302. errors.append(f"Source cardinality of type {tm_name} ({count}) out of bounds ({lc}..{uc}) in {tgt_obj_name}.")
  303. # association target multiplicities
  304. if tm_name in self.target_multiplicities:
  305. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  306. # tm_target_element = self.bottom.read_edge_target(tm_element)
  307. tm_src_element = self.bottom.read_edge_source(tm_element)
  308. tm_src_name = self.type_model_names[tm_src_element]
  309. lc, uc = self.target_multiplicities[tm_name]
  310. # print("checking assoc", tm_name, "source", tm_src_name)
  311. # print("subtypes of", tm_src_name, self.sub_types[tm_src_name])
  312. for src_obj_name, t in self.type_mapping.items():
  313. if t == tm_src_name or t in self.sub_types[tm_src_name]:
  314. # print("got obj", src_obj_name, "of type", t)
  315. count = 0
  316. src_obj_node, = self.bottom.read_outgoing_elements(self.model, src_obj_name)
  317. # outgoing = self.bottom.read_incoming_edges(src_obj_node)
  318. outgoing = self.bottom.read_outgoing_edges(src_obj_node)
  319. for o in outgoing:
  320. try:
  321. if self.type_mapping[self.model_names[o]] == tm_name:
  322. # print("have an outgoing edge", self.model_names[o], self.type_mapping[self.model_names[o]], "---> increase counter")
  323. count += 1
  324. except KeyError:
  325. pass # for elements not part of model, e.g. morphism links
  326. if count < lc or count > uc:
  327. errors.append(f"Target cardinality of type {tm_name} ({count}) out of bounds ({lc}..{uc}) in {src_obj_name}.")
  328. # else:
  329. # print(f"OK: Target cardinality of type {tm_name} ({count}) within bounds ({lc}..{uc}) in {src_obj_name}.")
  330. return errors
  331. def evaluate_constraint(self, code, **kwargs):
  332. """
  333. Evaluate constraint code (Python code)
  334. """
  335. funcs = {
  336. 'read_value': self.state.read_value
  337. }
  338. return eval(
  339. code,
  340. {'__builtins__': {'isinstance': isinstance, 'print': print,
  341. 'int': int, 'float': float, 'bool': bool, 'str': str, 'tuple': tuple}
  342. }, # globals
  343. {**kwargs, **funcs} # locals
  344. )
  345. def check_constraints(self):
  346. """
  347. Check whether all constraints defined for a model are respected
  348. """
  349. # local constraints
  350. errors = []
  351. for m_name, tm_name in self.type_mapping.items():
  352. if tm_name != "GlobalConstraint":
  353. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  354. code = self.read_attribute(tm_element, "constraint")
  355. if code != None:
  356. morphisms = self.bottom.read_incoming_elements(tm_element, "Morphism")
  357. morphisms = [m for m in morphisms if m in self.model_names]
  358. for m_element in morphisms:
  359. if not self.evaluate_constraint(code, element=m_element):
  360. errors.append(f"Local constraint of {tm_name} not satisfied in {m_name}.")
  361. # global constraints
  362. for m_name, tm_name in self.type_mapping.items():
  363. if tm_name == "GlobalConstraint":
  364. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  365. code = self.read_attribute(tm_element, "constraint")
  366. if code != None:
  367. if not self.evaluate_constraint(code, model=self.model):
  368. errors.append(f"Global constraint {tm_name} not satisfied.")
  369. return errors
  370. def precompute_structures(self):
  371. """
  372. Make an internal representation of type structures such that comparing type structures is easier
  373. """
  374. self.precompute_sub_types()
  375. scd_elements = self.bottom.read_outgoing_elements(self.scd_model)
  376. # collect types
  377. class_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class")
  378. association_element, = self.bottom.read_outgoing_elements(self.scd_model, "Association")
  379. for tm_element, tm_name in self.type_model_names.items():
  380. # retrieve elements that tm_element is a morphism of
  381. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  382. morphism, = [m for m in morphisms if m in scd_elements]
  383. # check if tm_element is a morphism of AttributeLink
  384. if class_element == morphism or association_element == morphism:
  385. self.structures[tm_name] = set()
  386. # collect type structures
  387. # retrieve AttributeLink to check whether element is a morphism of AttributeLink
  388. attr_link_element, = self.bottom.read_outgoing_elements(self.scd_model, "AttributeLink")
  389. for tm_element, tm_name in self.type_model_names.items():
  390. # retrieve elements that tm_element is a morphism of
  391. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  392. morphism, = [m for m in morphisms if m in scd_elements]
  393. # check if tm_element is a morphism of AttributeLink
  394. if attr_link_element == morphism:
  395. # retrieve attributes of attribute link, i.e. 'name' and 'optional'
  396. attrs = self.bottom.read_outgoing_elements(tm_element)
  397. name_model_node, = filter(lambda x: self.type_model_names.get(x, "").endswith(".name"), attrs)
  398. opt_model_node, = filter(lambda x: self.type_model_names.get(x, "").endswith(".optional"), attrs)
  399. # get attr name value
  400. name_model = UUID(self.bottom.read_value(name_model_node))
  401. name_node, = self.bottom.read_outgoing_elements(name_model)
  402. name = self.bottom.read_value(name_node)
  403. # get attr opt value
  404. opt_model = UUID(self.bottom.read_value(opt_model_node))
  405. opt_node, = self.bottom.read_outgoing_elements(opt_model)
  406. opt = self.bottom.read_value(opt_node)
  407. # get attr type name
  408. source_type_node = self.bottom.read_edge_source(tm_element)
  409. source_type_name = self.type_model_names[source_type_node]
  410. target_type_node = self.bottom.read_edge_target(tm_element)
  411. target_type_name = self.type_model_names[target_type_node]
  412. # add attribute to the structure of its source type
  413. # attribute is stored as a (name, optional, type) triple
  414. self.structures.setdefault(source_type_name, set()).add((name, opt, target_type_name))
  415. # extend structures of sub types with attrs of super types
  416. for super_type, sub_types in self.sub_types.items():
  417. for sub_type in sub_types:
  418. self.structures.setdefault(sub_type, set()).update(self.structures[super_type])
  419. # filter out abstract types, as they cannot be instantiated
  420. # retrieve Class_abstract to check whether element is a morphism of Class_abstract
  421. class_abs_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class_abstract")
  422. for tm_element, tm_name in self.type_model_names.items():
  423. # retrieve elements that tm_element is a morphism of
  424. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  425. morphism, = [m for m in morphisms if m in scd_elements]
  426. # check if tm_element is a morphism of Class_abstract
  427. if class_abs_element == morphism:
  428. # retrieve 'abstract' attribute value
  429. target_node = self.bottom.read_edge_target(tm_element)
  430. abst_model = UUID(self.bottom.read_value(target_node))
  431. abst_node, = self.bottom.read_outgoing_elements(abst_model)
  432. is_abstract = self.bottom.read_value(abst_node)
  433. # retrieve type name
  434. source_node = self.bottom.read_edge_source(tm_element)
  435. type_name = self.type_model_names[source_node]
  436. if is_abstract:
  437. self.structures.pop(type_name)
  438. def match_structures(self):
  439. """
  440. Try to match the structure of each element in the instance model to some element in the type model
  441. """
  442. ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
  443. # matching
  444. for m_element, m_name in self.model_names.items():
  445. is_edge = self.bottom.read_edge_source(m_element) != None
  446. print('element:', m_element, 'name:', m_name, 'is_edge', is_edge)
  447. for type_name, structure in self.structures.items():
  448. tm_element, = self.bottom.read_outgoing_elements(self.type_model, type_name)
  449. type_is_edge = self.bottom.read_edge_source(tm_element) != None
  450. if is_edge == type_is_edge:
  451. print(' type_name:', type_name, 'type_is_edge:', type_is_edge, "structure:", structure)
  452. mismatch = False
  453. matched = 0
  454. for name, optional, attr_type in structure:
  455. print(' name:', name, "optional:", optional, "attr_type:", attr_type)
  456. try:
  457. attr, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{name}")
  458. attr_tm, = self.bottom.read_outgoing_elements(self.type_model, attr_type)
  459. # if attribute is a modelref, we need to check whether it
  460. # linguistically conforms to the specified type
  461. # if its an internally defined attribute, this will be checked by constraints
  462. morphisms = self.bottom.read_outgoing_elements(attr_tm, "Morphism")
  463. attr_conforms = True
  464. if ref_element in morphisms:
  465. # check conformance of reference model
  466. type_model_uuid = UUID(self.bottom.read_value(attr_tm))
  467. model_uuid = UUID(self.bottom.read_value(attr))
  468. attr_conforms = Conformance(self.state, model_uuid, type_model_uuid)\
  469. .check_nominal()
  470. else:
  471. # eval constraints
  472. code = self.read_attribute(attr_tm, "constraint")
  473. if code != None:
  474. attr_conforms = self.evaluate_constraint(code, element=attr)
  475. if attr_conforms:
  476. matched += 1
  477. print(" attr_conforms -> matched:", matched)
  478. except ValueError as e:
  479. # attr not found or failed parsing UUID
  480. if optional:
  481. print(" skipping:", e)
  482. continue
  483. else:
  484. # did not match mandatory attribute
  485. print(" breaking:", e)
  486. mismatch = True
  487. break
  488. print(' matched:', matched, 'len(structure):', len(structure))
  489. # if matched == len(structure):
  490. if not mismatch:
  491. print(' add to candidates:', m_name, type_name)
  492. self.candidates.setdefault(m_name, set()).add(type_name)
  493. # filter out candidates for links based on source and target types
  494. for m_element, m_name in self.model_names.items():
  495. is_edge = self.bottom.read_edge_source(m_element) != None
  496. if is_edge and m_name in self.candidates:
  497. m_source = self.bottom.read_edge_source(m_element)
  498. m_target = self.bottom.read_edge_target(m_element)
  499. print(self.candidates)
  500. source_candidates = self.candidates[self.model_names[m_source]]
  501. target_candidates = self.candidates[self.model_names[m_target]]
  502. remove = set()
  503. for candidate_name in self.candidates[m_name]:
  504. candidate_element, = self.bottom.read_outgoing_elements(self.type_model, candidate_name)
  505. candidate_source = self.type_model_names[self.bottom.read_edge_source(candidate_element)]
  506. if candidate_source not in source_candidates:
  507. if len(source_candidates.intersection(set(self.sub_types[candidate_source]))) == 0:
  508. remove.add(candidate_name)
  509. candidate_target = self.type_model_names[self.bottom.read_edge_target(candidate_element)]
  510. if candidate_target not in target_candidates:
  511. if len(target_candidates.intersection(set(self.sub_types[candidate_target]))) == 0:
  512. remove.add(candidate_name)
  513. self.candidates[m_name] = self.candidates[m_name].difference(remove)
  514. def build_morphisms(self):
  515. """
  516. Build the morphisms between an instance and a type model that structurally match
  517. """
  518. if not all([len(c) == 1 for c in self.candidates.values()]):
  519. raise RuntimeError("Cannot build incomplete or ambiguous morphism.")
  520. mapping = {k: v.pop() for k, v in self.candidates.items()}
  521. for m_name, tm_name in mapping.items():
  522. # morphism to class/assoc
  523. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  524. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  525. self.bottom.create_edge(m_element, tm_element, "Morphism")
  526. # morphism for attributes and attribute links
  527. structure = self.structures[tm_name]
  528. for attr_name, _, attr_type in structure:
  529. try:
  530. # attribute node
  531. attr_element, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{attr_name}")
  532. attr_type_element, = self.bottom.read_outgoing_elements(self.type_model, attr_type)
  533. self.bottom.create_edge(attr_element, attr_type_element, "Morphism")
  534. # attribute link
  535. attr_link_element, = self.bottom.read_outgoing_elements(self.model, f"{m_name}_{attr_name}")
  536. attr_link_type_element, = self.bottom.read_outgoing_elements(self.type_model, f"{tm_name}_{attr_name}")
  537. self.bottom.create_edge(attr_link_element, attr_link_type_element, "Morphism")
  538. except ValueError:
  539. pass
  540. if __name__ == '__main__':
  541. from state.devstate import DevState as State
  542. s = State()
  543. from bootstrap.scd import bootstrap_scd
  544. scd = bootstrap_scd(s)
  545. from bootstrap.pn import bootstrap_pn
  546. ltm_pn = bootstrap_pn(s, "PN")
  547. ltm_pn_lola = bootstrap_pn(s, "PNlola")
  548. from services.pn import PN
  549. my_pn = s.create_node()
  550. PNserv = PN(my_pn, s)
  551. PNserv.create_place("p1", 5)
  552. PNserv.create_place("p2", 0)
  553. PNserv.create_transition("t1")
  554. PNserv.create_p2t("p1", "t1", 1)
  555. PNserv.create_t2p("t1", "p2", 1)
  556. cf = Conformance(s, my_pn, ltm_pn_lola)
  557. # cf = Conformance(s, scd, ltm_pn, scd)
  558. cf.precompute_structures()
  559. cf.match_structures()
  560. cf.build_morphisms()
  561. print(cf.check_nominal())