conformance.py 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671
  1. from services.bottom.V0 import Bottom
  2. from services import od
  3. from services.primitives.actioncode_type import ActionCode
  4. from uuid import UUID
  5. from state.base import State
  6. from typing import Dict, Tuple, Set, Any, List
  7. from pprint import pprint
  8. import functools
  9. # based on https://stackoverflow.com/a/39381428
  10. # Parses and executes a block of Python code, and returns the eval result of the last statement
  11. import ast
  12. def exec_then_eval(code, _globals, _locals):
  13. block = ast.parse(code, mode='exec')
  14. # assumes last node is an expression
  15. last = ast.Expression(block.body.pop().value)
  16. exec(compile(block, '<string>', mode='exec'), _globals, _locals)
  17. return eval(compile(last, '<string>', mode='eval'), _globals, _locals)
  18. class Conformance:
  19. def __init__(self, state: State, model: UUID, type_model: UUID):
  20. self.state = state
  21. self.bottom = Bottom(state)
  22. type_model_id = state.read_dict(state.read_root(), "SCD")
  23. self.scd_model = UUID(state.read_value(type_model_id))
  24. self.model = model
  25. self.type_model = type_model
  26. self.type_mapping: Dict[str, str] = {}
  27. self.model_names = {
  28. # map model elements to their names to prevent iterating too much
  29. self.bottom.read_outgoing_elements(self.model, e)[0]: e
  30. for e in self.bottom.read_keys(self.model)
  31. }
  32. self.type_model_names = {
  33. # map type model elements to their names to prevent iterating too much
  34. self.bottom.read_outgoing_elements(self.type_model, e)[0]
  35. : e for e in self.bottom.read_keys(self.type_model)
  36. }
  37. self.sub_types: Dict[str, Set[str]] = {
  38. k: set() for k in self.bottom.read_keys(self.type_model)
  39. }
  40. self.primitive_values: Dict[UUID, Any] = {}
  41. self.abstract_types: List[str] = []
  42. self.multiplicities: Dict[str, Tuple] = {}
  43. self.source_multiplicities: Dict[str, Tuple] = {}
  44. self.target_multiplicities: Dict[str, Tuple] = {}
  45. self.structures = {}
  46. self.matches = {}
  47. self.candidates = {}
  48. def check_nominal(self, *, log=False):
  49. """
  50. Perform a nominal conformance check
  51. Args:
  52. log: boolean indicating whether to log errors
  53. Returns:
  54. Boolean indicating whether the check has passed
  55. """
  56. errors = []
  57. errors += self.check_typing()
  58. errors += self.check_link_typing()
  59. errors += self.check_multiplicities()
  60. errors += self.check_constraints()
  61. return errors
  62. # def check_structural(self, *, build_morphisms=True, log=False):
  63. # """
  64. # Perform a structural conformance check
  65. # Args:
  66. # build_morphisms: boolean indicating whether to create morpishm links
  67. # log: boolean indicating whether to log errors
  68. # Returns:
  69. # Boolean indicating whether the check has passed
  70. # """
  71. # try:
  72. # self.precompute_structures()
  73. # self.match_structures()
  74. # if build_morphisms:
  75. # self.build_morphisms()
  76. # self.check_nominal(log=log)
  77. # return True
  78. # except RuntimeError as e:
  79. # if log:
  80. # print(e)
  81. # return False
  82. def read_attribute(self, element: UUID, attr_name: str):
  83. """
  84. Read an attribute value attached to an element
  85. Args:
  86. element: UUID of the element
  87. attr_name: name of the attribute to read
  88. Returns:
  89. The value of hte attribute, if no attribute with given name is found, returns None
  90. """
  91. if element in self.type_model_names:
  92. # type model element
  93. element_name = self.type_model_names[element]
  94. model = self.type_model
  95. else:
  96. # model element
  97. element_name = self.model_names[element]
  98. model = self.model
  99. try:
  100. attr_elem, = self.bottom.read_outgoing_elements(model, f"{element_name}.{attr_name}")
  101. return self.primitive_values.get(attr_elem, self.bottom.read_value(UUID(self.bottom.read_value(attr_elem))))
  102. except ValueError:
  103. return None
  104. def precompute_sub_types(self):
  105. """
  106. Creates an internal representation of sub-type hierarchies that is
  107. more easily queryable that the state graph
  108. """
  109. # collect inheritance link instances
  110. inh_element, = self.bottom.read_outgoing_elements(self.scd_model, "Inheritance")
  111. inh_links = []
  112. for tm_element, tm_name in self.type_model_names.items():
  113. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  114. if inh_element in morphisms:
  115. # we have an instance of an inheritance link
  116. inh_links.append(tm_element)
  117. # for each inheritance link we add the parent and child to the sub types map
  118. for link in inh_links:
  119. tm_source = self.bottom.read_edge_source(link)
  120. tm_target = self.bottom.read_edge_target(link)
  121. parent_name = self.type_model_names[tm_target]
  122. child_name = self.type_model_names[tm_source]
  123. self.sub_types[parent_name].add(child_name)
  124. # iteratively expand the sub type hierarchies in the sub types map
  125. stop = False
  126. while not stop:
  127. stop = True
  128. for child_name, child_children in self.sub_types.items():
  129. for parent_name, parent_children in self.sub_types.items():
  130. if child_name in parent_children:
  131. original_size = len(parent_children)
  132. parent_children.update(child_children)
  133. if len(parent_children) != original_size:
  134. stop = False
  135. def deref_primitive_values(self):
  136. """
  137. Prefetch the values stored in referenced primitive type models
  138. """
  139. ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
  140. string_element, = self.bottom.read_outgoing_elements(self.scd_model, "String")
  141. boolean_element, = self.bottom.read_outgoing_elements(self.scd_model, "Boolean")
  142. integer_element, = self.bottom.read_outgoing_elements(self.scd_model, "Integer")
  143. t_deref = []
  144. t_refs = []
  145. for tm_element, tm_name in self.type_model_names.items():
  146. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  147. if ref_element in morphisms:
  148. t_refs.append(self.type_model_names[tm_element])
  149. elif string_element in morphisms:
  150. t_deref.append(tm_element)
  151. elif boolean_element in morphisms:
  152. t_deref.append(tm_element)
  153. elif integer_element in morphisms:
  154. t_deref.append(tm_element)
  155. for elem in t_deref:
  156. primitive_model = UUID(self.bottom.read_value(elem))
  157. primitive_value_node, = self.bottom.read_outgoing_elements(primitive_model)
  158. primitive_value = self.bottom.read_value(primitive_value_node)
  159. self.primitive_values[elem] = primitive_value
  160. for m_name, tm_name in self.type_mapping.items():
  161. if tm_name in t_refs:
  162. # dereference
  163. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  164. primitive_model = UUID(self.bottom.read_value(m_element))
  165. try:
  166. primitive_value_node, = self.bottom.read_outgoing_elements(primitive_model)
  167. primitive_value = self.bottom.read_value(primitive_value_node)
  168. self.primitive_values[m_element] = primitive_value
  169. except ValueError:
  170. pass # multiple elements in model indicate that we're not dealing with a primitive
  171. def precompute_multiplicities(self):
  172. """
  173. Creates an internal representation of type multiplicities that is
  174. more easily queryable that the state graph
  175. """
  176. for tm_element, tm_name in self.type_model_names.items():
  177. # class abstract flags and multiplicities
  178. abstract = self.read_attribute(tm_element, "abstract")
  179. lc = self.read_attribute(tm_element, "lower_cardinality")
  180. uc = self.read_attribute(tm_element, "upper_cardinality")
  181. if abstract:
  182. self.abstract_types.append(tm_name)
  183. if lc or uc:
  184. mult = (
  185. lc if lc != None else float("-inf"),
  186. uc if uc != None else float("inf")
  187. )
  188. self.multiplicities[tm_name] = mult
  189. # multiplicities for associations
  190. slc = self.read_attribute(tm_element, "source_lower_cardinality")
  191. suc = self.read_attribute(tm_element, "source_upper_cardinality")
  192. if slc or suc:
  193. mult = (
  194. slc if slc != None else float("-inf"),
  195. suc if suc != None else float("inf")
  196. )
  197. self.source_multiplicities[tm_name] = mult
  198. tlc = self.read_attribute(tm_element, "target_lower_cardinality")
  199. tuc = self.read_attribute(tm_element, "target_upper_cardinality")
  200. if tlc or tuc:
  201. mult = (
  202. tlc if tlc != None else float("-inf"),
  203. tuc if tuc != None else float("inf")
  204. )
  205. self.target_multiplicities[tm_name] = mult
  206. # optional for attribute links
  207. opt = self.read_attribute(tm_element, "optional")
  208. if opt != None:
  209. self.source_multiplicities[tm_name] = (0, float('inf'))
  210. self.target_multiplicities[tm_name] = (0 if opt else 1, 1)
  211. def get_type(self, element: UUID):
  212. """
  213. Retrieve the type of an element (wrt. current type model)
  214. """
  215. morphisms = self.bottom.read_outgoing_elements(element, "Morphism")
  216. tm_element, = [m for m in morphisms if m in self.type_model_names.keys()]
  217. return tm_element
  218. def check_typing(self):
  219. """
  220. for each element of model check whether a morphism
  221. link exists to some element of type_model
  222. """
  223. errors = []
  224. ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
  225. model_names = self.bottom.read_keys(self.model)
  226. for m_name in model_names:
  227. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  228. try:
  229. tm_element = self.get_type(m_element)
  230. tm_name = self.type_model_names[tm_element]
  231. self.type_mapping[m_name] = tm_name
  232. if ref_element in self.bottom.read_outgoing_elements(tm_element, "Morphism"):
  233. sub_m = UUID(self.bottom.read_value(m_element))
  234. sub_tm = UUID(self.bottom.read_value(tm_element))
  235. nested_errors = Conformance(self.state, sub_m, sub_tm).check_nominal()
  236. errors += [f"In ModelRef ({m_name}):" + err for err in nested_errors]
  237. except ValueError as e:
  238. import traceback
  239. traceback.format_exc(e)
  240. # no or too many morphism links found
  241. errors.append(f"Incorrectly typed element: {m_name}")
  242. return errors
  243. def check_link_typing(self):
  244. """
  245. for each link, check whether its source and target are of a valid type
  246. """
  247. errors = []
  248. self.precompute_sub_types()
  249. for m_name, tm_name in self.type_mapping.items():
  250. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  251. m_source = self.bottom.read_edge_source(m_element)
  252. m_target = self.bottom.read_edge_target(m_element)
  253. if m_source == None or m_target == None:
  254. # element is not a link
  255. continue
  256. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  257. tm_source = self.bottom.read_edge_source(tm_element)
  258. tm_target = self.bottom.read_edge_target(tm_element)
  259. # check if source is typed correctly
  260. source_name = self.model_names[m_source]
  261. source_type_actual = self.type_mapping[source_name]
  262. source_type_expected = self.type_model_names[tm_source]
  263. if source_type_actual != source_type_expected:
  264. if source_type_actual not in self.sub_types[source_type_expected]:
  265. errors.append(f"Invalid source type {source_type_actual} for element {m_name}")
  266. # check if target is typed correctly
  267. target_name = self.model_names[m_target]
  268. target_type_actual = self.type_mapping[target_name]
  269. target_type_expected = self.type_model_names[tm_target]
  270. if target_type_actual != target_type_expected:
  271. if target_type_actual not in self.sub_types[target_type_expected]:
  272. errors.append(f"Invalid target type {target_type_actual} for element {m_name}")
  273. return errors
  274. def check_multiplicities(self):
  275. """
  276. Check whether multiplicities for all types are respected
  277. """
  278. self.deref_primitive_values()
  279. self.precompute_multiplicities()
  280. errors = []
  281. for tm_name in self.type_model_names.values():
  282. # abstract classes
  283. if tm_name in self.abstract_types:
  284. type_count = list(self.type_mapping.values()).count(tm_name)
  285. if type_count > 0:
  286. errors.append(f"Invalid instantiation of abstract class: {tm_name}")
  287. # class multiplicities
  288. if tm_name in self.multiplicities:
  289. lc, uc = self.multiplicities[tm_name]
  290. type_count = list(self.type_mapping.values()).count(tm_name)
  291. for sub_type in self.sub_types[tm_name]:
  292. type_count += list(self.type_mapping.values()).count(sub_type)
  293. if type_count < lc or type_count > uc:
  294. errors.append(f"Cardinality of type exceeds valid multiplicity range: {tm_name} ({type_count})")
  295. # association source multiplicities
  296. if tm_name in self.source_multiplicities:
  297. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  298. tm_tgt_element = self.bottom.read_edge_target(tm_element)
  299. tm_tgt_name = self.type_model_names[tm_tgt_element]
  300. lc, uc = self.source_multiplicities[tm_name]
  301. for tgt_obj_name, t in self.type_mapping.items():
  302. if t == tm_tgt_name or t in self.sub_types[tm_tgt_name]:
  303. count = 0
  304. tgt_obj_node, = self.bottom.read_outgoing_elements(self.model, tgt_obj_name)
  305. incoming = self.bottom.read_incoming_edges(tgt_obj_node)
  306. for i in incoming:
  307. try:
  308. if self.type_mapping[self.model_names[i]] == tm_name:
  309. count += 1
  310. except KeyError:
  311. pass # for elements not part of model, e.g. morphism links
  312. if count < lc or count > uc:
  313. errors.append(f"Source cardinality of type {tm_name} ({count}) out of bounds ({lc}..{uc}) in {tgt_obj_name}.")
  314. # association target multiplicities
  315. if tm_name in self.target_multiplicities:
  316. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  317. # tm_target_element = self.bottom.read_edge_target(tm_element)
  318. tm_src_element = self.bottom.read_edge_source(tm_element)
  319. tm_src_name = self.type_model_names[tm_src_element]
  320. lc, uc = self.target_multiplicities[tm_name]
  321. # print("checking assoc", tm_name, "source", tm_src_name)
  322. # print("subtypes of", tm_src_name, self.sub_types[tm_src_name])
  323. for src_obj_name, t in self.type_mapping.items():
  324. if t == tm_src_name or t in self.sub_types[tm_src_name]:
  325. # print("got obj", src_obj_name, "of type", t)
  326. count = 0
  327. src_obj_node, = self.bottom.read_outgoing_elements(self.model, src_obj_name)
  328. # outgoing = self.bottom.read_incoming_edges(src_obj_node)
  329. outgoing = self.bottom.read_outgoing_edges(src_obj_node)
  330. for o in outgoing:
  331. try:
  332. if self.type_mapping[self.model_names[o]] == tm_name:
  333. # print("have an outgoing edge", self.model_names[o], self.type_mapping[self.model_names[o]], "---> increase counter")
  334. count += 1
  335. except KeyError:
  336. pass # for elements not part of model, e.g. morphism links
  337. if count < lc or count > uc:
  338. errors.append(f"Target cardinality of type {tm_name} ({count}) out of bounds ({lc}..{uc}) in {src_obj_name}.")
  339. # else:
  340. # print(f"OK: Target cardinality of type {tm_name} ({count}) within bounds ({lc}..{uc}) in {src_obj_name}.")
  341. return errors
  342. def evaluate_constraint(self, code, **kwargs):
  343. """
  344. Evaluate constraint code (Python code)
  345. """
  346. funcs = {
  347. 'read_value': self.state.read_value,
  348. 'get_value': lambda el: od.read_primitive_value(self.bottom, el, self.type_model)[0],
  349. 'get_target': lambda el: self.bottom.read_edge_target(el),
  350. 'get_source': lambda el: self.bottom.read_edge_source(el),
  351. 'get_slot': od.OD(self.type_model, self.model, self.state).get_slot,
  352. 'get_all_instances': self.get_all_instances,
  353. 'get_name': lambda el: [name for name in self.bottom.read_keys(self.model) if self.bottom.read_outgoing_elements(self.model, name)[0] == el][0],
  354. 'get_type_name': self.get_type_name,
  355. 'get_outgoing': self.get_outgoing,
  356. 'get_incoming': self.get_incoming,
  357. }
  358. # print("evaluating constraint ...", code)
  359. loc = {**kwargs, }
  360. result = exec_then_eval(
  361. code,
  362. {'__builtins__': {'isinstance': isinstance, 'print': print,
  363. 'int': int, 'float': float, 'bool': bool, 'str': str, 'tuple': tuple, 'len': len, 'set': set, 'dict': dict},
  364. **funcs
  365. }, # globals
  366. loc # locals
  367. )
  368. # print('result =', result)
  369. return result
  370. def get_type_name(self, element: UUID):
  371. type_node = self.bottom.read_outgoing_elements(element, "Morphism")[0]
  372. for type_name in self.bottom.read_keys(self.type_model):
  373. if self.bottom.read_outgoing_elements(self.type_model, type_name)[0] == type_node:
  374. return type_name
  375. def get_all_instances(self, type_name: str, include_subtypes=True):
  376. result = [e_name for e_name, t_name in self.type_mapping.items() if t_name == type_name]
  377. if include_subtypes:
  378. for subtype_name in self.sub_types[type_name]:
  379. # print(subtype_name, 'is subtype of ')
  380. result += [e_name for e_name, t_name in self.type_mapping.items() if t_name == subtype_name]
  381. result_with_ids = [ (e_name, self.bottom.read_outgoing_elements(self.model, e_name)[0]) for e_name in result]
  382. return result_with_ids
  383. def get_outgoing(self, element: UUID, assoc_or_attr_name: str):
  384. return od.find_outgoing_typed_by(self.bottom, src=element, type_node=self.bottom.read_outgoing_elements(self.type_model, assoc_or_attr_name)[0])
  385. def get_incoming(self, element: UUID, assoc_or_attr_name: str):
  386. return od.find_incoming_typed_by(self.bottom, tgt=element, type_node=self.bottom.read_outgoing_elements(self.type_model, assoc_or_attr_name)[0])
  387. def check_constraints(self):
  388. """
  389. Check whether all constraints defined for a model are respected
  390. """
  391. errors = []
  392. def get_code(tm_name):
  393. constraints = self.bottom.read_outgoing_elements(self.type_model, f"{tm_name}.constraint")
  394. if len(constraints) == 1:
  395. constraint = constraints[0]
  396. code = ActionCode(UUID(self.bottom.read_value(constraint)), self.bottom.state).read()
  397. return code
  398. def check_result(result, description):
  399. if not isinstance(result, bool):
  400. raise Exception(f"{description} evaluation result is not boolean! Instead got {result}")
  401. if not result:
  402. errors.append(f"{description} not satisfied.")
  403. # local constraints
  404. for m_name, tm_name in self.type_mapping.items():
  405. code = get_code(tm_name)
  406. if code != None:
  407. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  408. morphisms = self.bottom.read_incoming_elements(tm_element, "Morphism")
  409. morphisms = [m for m in morphisms if m in self.model_names]
  410. for m_element in morphisms:
  411. result = self.evaluate_constraint(code, this=m_element)
  412. description = f"Local constraint of \"{tm_name}\" in \"{m_name}\""
  413. check_result(result, description)
  414. # global constraints
  415. glob_constraints = []
  416. # find global constraints...
  417. glob_constraint_type, = self.bottom.read_outgoing_elements(self.scd_model, "GlobalConstraint")
  418. for tm_name in self.bottom.read_keys(self.type_model):
  419. tm_node, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  420. # print(key, node)
  421. for type_of_node in self.bottom.read_outgoing_elements(tm_node, "Morphism"):
  422. if type_of_node == glob_constraint_type:
  423. # node is GlobalConstraint
  424. glob_constraints.append(tm_name)
  425. # evaluate them
  426. for tm_name in glob_constraints:
  427. code = get_code(tm_name)
  428. if code != None:
  429. result = self.evaluate_constraint(code, model=self.model)
  430. description = f"Global constraint \"{tm_name}\""
  431. check_result(result, description)
  432. return errors
  433. def precompute_structures(self):
  434. """
  435. Make an internal representation of type structures such that comparing type structures is easier
  436. """
  437. self.precompute_sub_types()
  438. scd_elements = self.bottom.read_outgoing_elements(self.scd_model)
  439. # collect types
  440. class_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class")
  441. association_element, = self.bottom.read_outgoing_elements(self.scd_model, "Association")
  442. for tm_element, tm_name in self.type_model_names.items():
  443. # retrieve elements that tm_element is a morphism of
  444. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  445. morphism, = [m for m in morphisms if m in scd_elements]
  446. # check if tm_element is a morphism of AttributeLink
  447. if class_element == morphism or association_element == morphism:
  448. self.structures[tm_name] = set()
  449. # collect type structures
  450. # retrieve AttributeLink to check whether element is a morphism of AttributeLink
  451. attr_link_element, = self.bottom.read_outgoing_elements(self.scd_model, "AttributeLink")
  452. for tm_element, tm_name in self.type_model_names.items():
  453. # retrieve elements that tm_element is a morphism of
  454. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  455. morphism, = [m for m in morphisms if m in scd_elements]
  456. # check if tm_element is a morphism of AttributeLink
  457. if attr_link_element == morphism:
  458. # retrieve attributes of attribute link, i.e. 'name' and 'optional'
  459. attrs = self.bottom.read_outgoing_elements(tm_element)
  460. name_model_node, = filter(lambda x: self.type_model_names.get(x, "").endswith(".name"), attrs)
  461. opt_model_node, = filter(lambda x: self.type_model_names.get(x, "").endswith(".optional"), attrs)
  462. # get attr name value
  463. name_model = UUID(self.bottom.read_value(name_model_node))
  464. name_node, = self.bottom.read_outgoing_elements(name_model)
  465. name = self.bottom.read_value(name_node)
  466. # get attr opt value
  467. opt_model = UUID(self.bottom.read_value(opt_model_node))
  468. opt_node, = self.bottom.read_outgoing_elements(opt_model)
  469. opt = self.bottom.read_value(opt_node)
  470. # get attr type name
  471. source_type_node = self.bottom.read_edge_source(tm_element)
  472. source_type_name = self.type_model_names[source_type_node]
  473. target_type_node = self.bottom.read_edge_target(tm_element)
  474. target_type_name = self.type_model_names[target_type_node]
  475. # add attribute to the structure of its source type
  476. # attribute is stored as a (name, optional, type) triple
  477. self.structures.setdefault(source_type_name, set()).add((name, opt, target_type_name))
  478. # extend structures of sub types with attrs of super types
  479. for super_type, sub_types in self.sub_types.items():
  480. for sub_type in sub_types:
  481. self.structures.setdefault(sub_type, set()).update(self.structures[super_type])
  482. # filter out abstract types, as they cannot be instantiated
  483. # retrieve Class_abstract to check whether element is a morphism of Class_abstract
  484. class_abs_element, = self.bottom.read_outgoing_elements(self.scd_model, "Class_abstract")
  485. for tm_element, tm_name in self.type_model_names.items():
  486. # retrieve elements that tm_element is a morphism of
  487. morphisms = self.bottom.read_outgoing_elements(tm_element, "Morphism")
  488. morphism, = [m for m in morphisms if m in scd_elements]
  489. # check if tm_element is a morphism of Class_abstract
  490. if class_abs_element == morphism:
  491. # retrieve 'abstract' attribute value
  492. target_node = self.bottom.read_edge_target(tm_element)
  493. abst_model = UUID(self.bottom.read_value(target_node))
  494. abst_node, = self.bottom.read_outgoing_elements(abst_model)
  495. is_abstract = self.bottom.read_value(abst_node)
  496. # retrieve type name
  497. source_node = self.bottom.read_edge_source(tm_element)
  498. type_name = self.type_model_names[source_node]
  499. if is_abstract:
  500. self.structures.pop(type_name)
  501. def match_structures(self):
  502. """
  503. Try to match the structure of each element in the instance model to some element in the type model
  504. """
  505. ref_element, = self.bottom.read_outgoing_elements(self.scd_model, "ModelRef")
  506. # matching
  507. for m_element, m_name in self.model_names.items():
  508. is_edge = self.bottom.read_edge_source(m_element) != None
  509. print('element:', m_element, 'name:', m_name, 'is_edge', is_edge)
  510. for type_name, structure in self.structures.items():
  511. tm_element, = self.bottom.read_outgoing_elements(self.type_model, type_name)
  512. type_is_edge = self.bottom.read_edge_source(tm_element) != None
  513. if is_edge == type_is_edge:
  514. print(' type_name:', type_name, 'type_is_edge:', type_is_edge, "structure:", structure)
  515. mismatch = False
  516. matched = 0
  517. for name, optional, attr_type in structure:
  518. print(' name:', name, "optional:", optional, "attr_type:", attr_type)
  519. try:
  520. attr, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{name}")
  521. attr_tm, = self.bottom.read_outgoing_elements(self.type_model, attr_type)
  522. # if attribute is a modelref, we need to check whether it
  523. # linguistically conforms to the specified type
  524. # if its an internally defined attribute, this will be checked by constraints
  525. morphisms = self.bottom.read_outgoing_elements(attr_tm, "Morphism")
  526. attr_conforms = True
  527. if ref_element in morphisms:
  528. # check conformance of reference model
  529. type_model_uuid = UUID(self.bottom.read_value(attr_tm))
  530. model_uuid = UUID(self.bottom.read_value(attr))
  531. attr_conforms = Conformance(self.state, model_uuid, type_model_uuid)\
  532. .check_nominal()
  533. else:
  534. # eval constraints
  535. code = self.read_attribute(attr_tm, "constraint")
  536. if code != None:
  537. attr_conforms = self.evaluate_constraint(code, this=attr)
  538. if attr_conforms:
  539. matched += 1
  540. print(" attr_conforms -> matched:", matched)
  541. except ValueError as e:
  542. # attr not found or failed parsing UUID
  543. if optional:
  544. print(" skipping:", e)
  545. continue
  546. else:
  547. # did not match mandatory attribute
  548. print(" breaking:", e)
  549. mismatch = True
  550. break
  551. print(' matched:', matched, 'len(structure):', len(structure))
  552. # if matched == len(structure):
  553. if not mismatch:
  554. print(' add to candidates:', m_name, type_name)
  555. self.candidates.setdefault(m_name, set()).add(type_name)
  556. # filter out candidates for links based on source and target types
  557. for m_element, m_name in self.model_names.items():
  558. is_edge = self.bottom.read_edge_source(m_element) != None
  559. if is_edge and m_name in self.candidates:
  560. m_source = self.bottom.read_edge_source(m_element)
  561. m_target = self.bottom.read_edge_target(m_element)
  562. print(self.candidates)
  563. source_candidates = self.candidates[self.model_names[m_source]]
  564. target_candidates = self.candidates[self.model_names[m_target]]
  565. remove = set()
  566. for candidate_name in self.candidates[m_name]:
  567. candidate_element, = self.bottom.read_outgoing_elements(self.type_model, candidate_name)
  568. candidate_source = self.type_model_names[self.bottom.read_edge_source(candidate_element)]
  569. if candidate_source not in source_candidates:
  570. if len(source_candidates.intersection(set(self.sub_types[candidate_source]))) == 0:
  571. remove.add(candidate_name)
  572. candidate_target = self.type_model_names[self.bottom.read_edge_target(candidate_element)]
  573. if candidate_target not in target_candidates:
  574. if len(target_candidates.intersection(set(self.sub_types[candidate_target]))) == 0:
  575. remove.add(candidate_name)
  576. self.candidates[m_name] = self.candidates[m_name].difference(remove)
  577. def build_morphisms(self):
  578. """
  579. Build the morphisms between an instance and a type model that structurally match
  580. """
  581. if not all([len(c) == 1 for c in self.candidates.values()]):
  582. raise RuntimeError("Cannot build incomplete or ambiguous morphism.")
  583. mapping = {k: v.pop() for k, v in self.candidates.items()}
  584. for m_name, tm_name in mapping.items():
  585. # morphism to class/assoc
  586. m_element, = self.bottom.read_outgoing_elements(self.model, m_name)
  587. tm_element, = self.bottom.read_outgoing_elements(self.type_model, tm_name)
  588. self.bottom.create_edge(m_element, tm_element, "Morphism")
  589. # morphism for attributes and attribute links
  590. structure = self.structures[tm_name]
  591. for attr_name, _, attr_type in structure:
  592. try:
  593. # attribute node
  594. attr_element, = self.bottom.read_outgoing_elements(self.model, f"{m_name}.{attr_name}")
  595. attr_type_element, = self.bottom.read_outgoing_elements(self.type_model, attr_type)
  596. self.bottom.create_edge(attr_element, attr_type_element, "Morphism")
  597. # attribute link
  598. attr_link_element, = self.bottom.read_outgoing_elements(self.model, f"{m_name}_{attr_name}")
  599. attr_link_type_element, = self.bottom.read_outgoing_elements(self.type_model, f"{tm_name}_{attr_name}")
  600. self.bottom.create_edge(attr_link_element, attr_link_type_element, "Morphism")
  601. except ValueError:
  602. pass
  603. if __name__ == '__main__':
  604. from state.devstate import DevState as State
  605. s = State()
  606. from bootstrap.scd import bootstrap_scd
  607. scd = bootstrap_scd(s)
  608. from bootstrap.pn import bootstrap_pn
  609. ltm_pn = bootstrap_pn(s, "PN")
  610. ltm_pn_lola = bootstrap_pn(s, "PNlola")
  611. from services.pn import PN
  612. my_pn = s.create_node()
  613. PNserv = PN(my_pn, s)
  614. PNserv.create_place("p1", 5)
  615. PNserv.create_place("p2", 0)
  616. PNserv.create_transition("t1")
  617. PNserv.create_p2t("p1", "t1", 1)
  618. PNserv.create_t2p("t1", "p2", 1)
  619. cf = Conformance(s, my_pn, ltm_pn_lola)
  620. # cf = Conformance(s, scd, ltm_pn, scd)
  621. cf.precompute_structures()
  622. cf.match_structures()
  623. cf.build_morphisms()
  624. print(cf.check_nominal())